.net core中使用redis 延迟队列
可以利用redis的有序集合(SortedSet),用时间戳排序实现,大概的流程如下。
三、关键思路&代码段
写入任务
使用任务下一次的执行时间按分钟生成key,将同一分钟待执行的任务放到一个key中,这一步主要思考的问题是:拆分队列,设置各自的过期时间,如:过期时间 = 执行时间 + 5分钟,保证过期的队列自动删除,不会造成后续因消费能力不足而导致redis持续膨胀。
IDictionary<double, string> dic = new Dictionary<double, string>() { { interLineCacheModel.NextHandTimeSpan, interLineCacheModel.CacheRoute } }; var taskKey = GetKey(interLineCacheModel.NextHandTime); await _buildInterLineRepository.ZAddAsync(taskKey, dic); await _buildInterLineRepository.ExpireAsync(taskKey, new DateTimeOffset(interLineCacheModel.NextHandTime.AddMinutes(5)).ToUnixTimeSeconds()); private string GetKey(DateTime dateTime) { return $"IRTask{dateTime.ToString("yyyyMMddHHmm")}"; }消费服务
因为是一个有序集合,所以队列会自动按时间戳的大小来排序,这样就自动实现了由近到远依次执行,使用当前时间生成key,来获取对应的task,每次可以获取1条或者N条。
var taskKey = GetKey(DateTime.Now); var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, 0);再拿到对应的执行时间戳,与当前时间做比较,如果还没有到执行时间就跳出队列。
var nextHandleTs = await _buildInterLineRepository.ZScoreAsync(taskKey, route); if (long.TryParse(nextHandleTs, out var nextHandleTimeSpan)) { var nextHandleTime = DateTimeOffset.FromUnixTimeMilliseconds(nextHandleTimeSpan).ToBeiJingDateTime(); if (nextHandleTime > DateTime.Now) { continue; } }最后一步,,使用ZRemAsync,以确保谁删除成功谁执行,解决多台机器同时拿到数据的问题
var success = await _buildInterLineRepository.ZRemAsync(taskKey, route) > 0; if (success) { //todo }注意事项:因为我们是按时间分钟来生成key,所以到时间临界点的时候,如果消费能力不足会导致key仍然有遗留任务,如果对此很敏感,可以在临界点将时间回退1秒,再获取出所有的任务。stop = -1 能拿出所有集合。
if second = 0 { addsecond(-1); var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, -1); }四、注册消费服务
方式很多,这里使用的是IHostedService实现后台任务,具体可以参考:https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.0&tabs=visual-studio
public class InterLineLoopService : IHostedService { private readonly ILog _log; private readonly IInterLineTaskService _interLineTaskService; public InterLineLoopService( ILog log, IInterLineTaskService interLineTaskService) { _log = Guard.ArgumentNotNull(nameof(log), log); _interLineTaskService = Guard.ArgumentNotNull(nameof(interLineTaskService), interLineTaskService); } public Task StartAsync(CancellationToken cancellationToken) { _log.Info("LoopDelayMessage start....", "Domain", "InterLineLoopService", "StartAsync"); return _interLineTaskService.LoopDelayMessage(); } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } }再到startup中注册下
services.AddHostedService<InterLineLoopService>();.net core中使用redis 延迟队列
温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/41448.html
- 上一篇:HTTPS(SSL)证书下载及配置
- 下一篇:Redhat 线下赛 WEB WP