Last active
December 6, 2025 15:56
-
-
Save zhaopan/cf86d76348463284e0f2b2ca1875a247 to your computer and use it in GitHub Desktop.
SysTaskQueueProducer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using FreeSql; | |
| using Mapster; | |
| using System.Collections.Concurrent; | |
| using Yitter.IdGenerator; | |
| using ZhonTai.Admin.Core; | |
| using ZhonTai.Admin.Core.Configs; | |
| using ZhonTai.Admin.Core.Consts; | |
| using ZhonTai.Admin.Core.Enums; | |
| using ZhonTai.Admin.Core.Extensions; | |
| using ZhonTai.Admin.Domain.TaskScheduler; | |
| using ZhonTai.Admin.Services.TaskScheduler.Dto; | |
| using ZhonTai.DynamicApi; | |
| using ZhonTai.DynamicApi.Attributes; | |
| namespace ZhonTai.Admin.Services.TaskScheduler; | |
| /// <summary> | |
| /// 队列相关处理 | |
| /// </summary> | |
| [Order(71)] | |
| [DynamicApi(Area = AdminConsts.AreaName)] | |
| public class SysTaskQueueService : BaseService, ISysTaskQueueService, IDynamicApi | |
| { | |
| #region Config | |
| /// <summary> | |
| /// 所有容错重试次数, 快速容错 | |
| /// </summary> | |
| private const int RETRY_COUNT = 3; | |
| /// <summary> | |
| /// 执行最大重试次数, 所有容错重试次数, 若达到最大限制, 则快速重试, 不成功便成仁 | |
| /// </summary> | |
| private const int MAX_EXEC_RETRY_COUNT = 15; | |
| /// <summary> | |
| /// 最大最大最大重试次数, 若超过则抛报异常 | |
| /// </summary> | |
| private const int MAX_RETRY_COUNT = 30; | |
| /// <summary> | |
| /// 暂存缓存Task条数 | |
| /// <para>调整为 50, 平衡批量性能与写入锁耗时, 避免卡死</para> | |
| /// </summary> | |
| private static readonly int QUEUE_LIMIT = 50; | |
| /// <summary> | |
| /// 批量获取任务的数量限制 | |
| /// <para>增加到 20 提高处理吞吐量, 同时保持对新任务的响应速度</para> | |
| /// </summary> | |
| private static readonly int BATCH_POP_LIMIT = 20; | |
| /// <summary> | |
| /// 暂存缓存Task处理的间隔时间(单位:秒) | |
| /// <para>稍微缩短间隔, 确保在低流量时也能及时写入</para> | |
| /// </summary> | |
| private static readonly int PUSH_INTERVAL = 2; | |
| /// <summary> | |
| /// 执行失败后重试的间隔时间(单位:秒) | |
| /// </summary> | |
| private static readonly int EXEC_INTERVAL = 5; | |
| /// <summary> | |
| /// 有序队列 | |
| /// </summary> | |
| private static readonly ConcurrentQueue<SysTaskQueueEntity> UNSAVE_TASK_QUEUE = new(); | |
| /// <summary> | |
| /// 服务运行状态 | |
| /// </summary> | |
| private static volatile bool _SERVICE_IS_RUNNING = false; | |
| /// <summary> | |
| /// 服务运行状态 | |
| /// </summary> | |
| public static bool SERVICE_IS_RUNNING => _SERVICE_IS_RUNNING; | |
| /// <summary> | |
| /// 执行空闲 | |
| /// </summary> | |
| private static volatile bool _execIdle = true; | |
| /// <summary> | |
| /// 执行空闲 | |
| /// </summary> | |
| public static bool ExecIdle => _execIdle; | |
| /// <summary> | |
| /// 写入空闲 | |
| /// </summary> | |
| private static volatile bool _appendIdle = true; | |
| /// <summary> | |
| /// 写入空闲 | |
| /// </summary> | |
| public static bool AppendIdle => _appendIdle; | |
| /// <summary> | |
| /// 空闲 | |
| /// </summary> | |
| public static bool Idle => _execIdle && _appendIdle; | |
| /// <summary> | |
| /// 缓存队列最后更新写入时间 | |
| /// </summary> | |
| public static DateTime LAST_APPEND_TIME { get; private set; } = DateTime.Now; | |
| /// <summary> | |
| /// 等待处理的队列数量 | |
| /// </summary> | |
| public static int TASK_QUEUE_COUNT => UNSAVE_TASK_QUEUE.Count; | |
| /// <summary> | |
| /// 当前任务执行完成数 | |
| /// </summary> | |
| private static volatile int _execCount = 0; | |
| /// <summary> | |
| /// 当前任务执行完成数 | |
| /// </summary> | |
| public static int ExecCount => _execCount; | |
| // 数据库配置 | |
| private static DbConfig _dbConfig; | |
| /// <summary> | |
| /// 数据库配置 | |
| /// </summary> | |
| private static DbConfig DbConfig | |
| { | |
| get | |
| { | |
| _dbConfig ??= AppInfo.GetOptions<DbConfig>(); | |
| return _dbConfig; | |
| } | |
| } | |
| /// <summary> | |
| /// FreeSql 实例(复用, 避免重复创建) | |
| /// </summary> | |
| private static IFreeSql _fsql; | |
| private static readonly object _fsqlLock = new(); | |
| /// <summary> | |
| /// 获取 FreeSql 实例 | |
| /// </summary> | |
| private static IFreeSql GetFreeSql() | |
| { | |
| if (_fsql == null) | |
| { | |
| lock (_fsqlLock) | |
| { | |
| _fsql ??= new FreeSqlBuilder() | |
| .UseConnectionString(DbConfig.Type, DbConfig.ConnectionString) | |
| .Build(); | |
| } | |
| } | |
| return _fsql; | |
| } | |
| #endregion Config | |
| #region Public | |
| public SysTaskQueueService() | |
| { | |
| } | |
| /// <summary> | |
| /// 启动 | |
| /// </summary> | |
| /// <returns></returns> | |
| public async Task Start() | |
| { | |
| await Run(); | |
| } | |
| /// <summary> | |
| /// 停止 | |
| /// </summary> | |
| public static async Task Stop() | |
| { | |
| // 处理剩余队列 | |
| await AppendTask(); | |
| _SERVICE_IS_RUNNING = false; | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info("TaskQueue Stop OK"); | |
| Console.ResetColor(); | |
| } | |
| /// <summary> | |
| /// 重启 | |
| /// </summary> | |
| public static async void ReStart() | |
| { | |
| await Stop(); | |
| await Run(); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info("TaskQueue ReStart OK"); | |
| Console.ResetColor(); | |
| } | |
| /// <summary> | |
| /// !!! 清空所有队列 !!! | |
| /// </summary> | |
| public static void Cleanup() | |
| { | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info("TaskQueue Cleanup OK"); | |
| Console.ResetColor(); | |
| throw new NullReferenceException(); | |
| } | |
| /// <summary> | |
| /// 添加处理队列 | |
| /// <para>加入同步队列, 根据同步机制, 由系统自动调度处理</para> | |
| /// </summary> | |
| /// <param name="taskList"></param> | |
| public static async Task Push(List<DTOSysTaskQueue> taskList) | |
| { | |
| if (taskList == null | |
| || taskList.Count < 1) | |
| { | |
| return; | |
| } | |
| foreach (var task in taskList) | |
| { | |
| if (task.RetryCount > MAX_RETRY_COUNT) throw new Exception($"任务队列最大重试次数, 不能大于 {MAX_RETRY_COUNT} "); | |
| } | |
| await Task.Factory.StartNew(async () => | |
| { | |
| try | |
| { | |
| var groupId = YitIdHelper.NextId();// 生成唯一分组ID | |
| var list = taskList.Adapt<List<SysTaskQueueEntity>>(); | |
| for (int i = 0; i < list.Count; i++) | |
| { | |
| list[i].GroupId = groupId.ToString(); | |
| UNSAVE_TASK_QUEUE.Enqueue(list[i]); | |
| } | |
| Interlocked.Add(ref _execCount, list.Count); | |
| CommonTools.DebugWriteLine($"SysTaskQueueManager Push {taskList.Count} Task, 总计条数 {TASK_QUEUE_COUNT} "); | |
| await Run(); | |
| } | |
| catch (Exception e) | |
| { | |
| AppInfo.Log.Error("TaskQueue Push ERROR", e); | |
| } | |
| }); | |
| } | |
| /// <summary> | |
| /// 添加处理队列 | |
| /// <para>加入同步队列, 根据同步机制, 由系统自动调度处理</para> | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| public static async Task Push(DTOSysTaskQueue task) | |
| { | |
| if (task == null) return; | |
| if (task.RetryCount > MAX_RETRY_COUNT) throw new Exception($"任务队列最大重试次数, 不能大于 {MAX_RETRY_COUNT} "); | |
| await Task.Factory.StartNew(async () => | |
| { | |
| try | |
| { | |
| var taskEnt = task.Adapt<SysTaskQueueEntity>(); | |
| UNSAVE_TASK_QUEUE.Enqueue(taskEnt); | |
| var currentCount = Interlocked.Increment(ref _execCount); | |
| var queueCount = UNSAVE_TASK_QUEUE.Count; | |
| CommonTools.DebugWriteLine($"SysTaskQueueManager Push 1 Task, 总计条数 {queueCount}, 总计条数 {currentCount} 次"); | |
| await Run(); | |
| } | |
| catch (Exception e) | |
| { | |
| AppInfo.Log.Error("TaskQueue Push ERROR", e); | |
| } | |
| }); | |
| } | |
| #endregion Public | |
| #region Private | |
| /// <summary> | |
| /// 开始 | |
| /// </summary> | |
| private static readonly object _runLock = new(); | |
| /// <summary> | |
| /// 执行 | |
| /// </summary> | |
| private static async Task Run() | |
| { | |
| lock (_runLock) | |
| { | |
| _SERVICE_IS_RUNNING = true; | |
| // PopTask - 避免重复启动 | |
| if (_execIdle) | |
| { | |
| _execIdle = false; | |
| Task.Factory.StartNew(async () => | |
| { | |
| await Task.Delay(PUSH_INTERVAL * 1000);// 延迟启动, 等待队列写入, 防止刚启动的时候, 队列还没写入 | |
| try | |
| { | |
| bool hasNext = true; | |
| int processedCount = 0; | |
| do | |
| { | |
| try | |
| { | |
| (hasNext, processedCount) = await Pop(); | |
| } | |
| catch (Exception e) | |
| { | |
| Console.ForegroundColor = ConsoleColor.Red; | |
| AppInfo.Log.Error("TaskQueue POP ERROR", e); | |
| Console.ResetColor(); | |
| await Task.Delay(EXEC_INTERVAL * 1000); // 若出错, 则等待N秒重试 | |
| } | |
| // 自适应等待 | |
| if (processedCount > 0) | |
| { | |
| // 如果处理了任务,做一个极短的 Yield,防止 CPU 100% 但又保持高吞吐 | |
| await Task.Delay(10); | |
| } | |
| else | |
| { | |
| // 如果没有任务,则进入轮询等待 | |
| await Task.Delay(200); | |
| } | |
| } while (_SERVICE_IS_RUNNING && hasNext); | |
| } | |
| finally | |
| { | |
| _execIdle = true; | |
| await Task.Delay(PUSH_INTERVAL * 1000); // 退出延迟前关闭, 防止立即有写入队列 | |
| if (Idle) | |
| { | |
| await Stop(); | |
| } | |
| Interlocked.Exchange(ref _execCount, 0); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info("TaskQueue PopTask DONE !!!"); | |
| CommonTools.DebugWriteLine($"TaskQueue PopTask DONE !!!"); | |
| Console.ResetColor(); | |
| // 双重检查:如果此刻又有新任务进来(Run在Idle复位前执行过),需要自我重启 | |
| if (!_appendIdle || !UNSAVE_TASK_QUEUE.IsEmpty) | |
| { | |
| CommonTools.DebugWriteLine($"TaskQueue PopTask RESTARTING (Race condition detected)"); | |
| // 不等待,异步触发 | |
| _ = Run(); | |
| } | |
| } | |
| }, TaskCreationOptions.LongRunning); | |
| Console.ForegroundColor = ConsoleColor.Green; | |
| AppInfo.Log.Info("TaskQueue PopTask Started"); | |
| Console.ResetColor(); | |
| } | |
| // AppendTask - 避免重复启动 | |
| if (_appendIdle) | |
| { | |
| _appendIdle = false; | |
| Task.Factory.StartNew(async () => | |
| { | |
| try | |
| { | |
| do | |
| { | |
| if (TASK_QUEUE_COUNT > QUEUE_LIMIT | |
| || (DateTime.Now - LAST_APPEND_TIME).TotalSeconds > PUSH_INTERVAL) | |
| { | |
| try | |
| { | |
| await AppendTask(); | |
| AppInfo.Log.Info($"TaskQueue Append Push OK, 队列剩余处理条数 {TASK_QUEUE_COUNT} 条"); | |
| } | |
| catch (Exception e) | |
| { | |
| Console.ForegroundColor = ConsoleColor.Red; | |
| AppInfo.Log.Error("TaskQueue Append Push ERROR", e); | |
| Console.ResetColor(); | |
| await Task.Delay(5 * 1000); // 若出错, 等待5秒重试 | |
| } | |
| finally | |
| { | |
| await Task.Delay(200); // 微小间隔 | |
| } | |
| } | |
| else | |
| { | |
| await Task.Delay(PUSH_INTERVAL * 1000); // 写入队列的间隔等待, 防止CPU过高 | |
| } | |
| } | |
| while (_SERVICE_IS_RUNNING && !UNSAVE_TASK_QUEUE.IsEmpty); | |
| } | |
| finally | |
| { | |
| _appendIdle = true; | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info("TaskQueue AppendTask DONE !!!"); | |
| CommonTools.DebugWriteLine($"TaskQueue AppendTask DONE !!!"); | |
| Console.ResetColor(); | |
| } | |
| }, TaskCreationOptions.LongRunning); | |
| Console.ForegroundColor = ConsoleColor.Green; | |
| AppInfo.Log.Info("TaskQueue AppendTask Started"); | |
| Console.ResetColor(); | |
| } | |
| } | |
| } | |
| /// <summary> | |
| /// 返回最近需要处理的一个队列, 并处理他 | |
| /// </summary> | |
| /// <returns></returns> | |
| internal static async Task<(bool Next, int Count)> Pop() | |
| { | |
| var fsql = GetFreeSql(); | |
| // 1. 批量获取任务 | |
| var tasks = await fsql.Select<SysTaskQueueEntity>() | |
| .Where(x => !x.Ignored) | |
| .OrderBy(x => x.CreateAt) | |
| .Take(BATCH_POP_LIMIT) | |
| .ToListAsync(); | |
| // 仅当【写入任务也空闲】且【队列也为空】时,才真正停止 Pop 轮询 | |
| // 防止:写入任务刚启动还未入库,或者队列刚有数据还未启动写入任务时,Pop 提前退出 | |
| if (tasks.Count == 0) return (!(_appendIdle && UNSAVE_TASK_QUEUE.IsEmpty), 0); | |
| // 2. 确保分组完整性 | |
| var groupIds = tasks | |
| .Where(x => !string.IsNullOrEmpty(x.GroupId)) | |
| .Select(x => x.GroupId) | |
| .Distinct() | |
| .ToList(); | |
| if (groupIds.Count > 0) | |
| { | |
| var fullGroups = await fsql.Select<SysTaskQueueEntity>() | |
| .Where(x => groupIds.Contains(x.GroupId) && !x.Ignored) | |
| .OrderBy(x => x.CreateAt) | |
| .ToListAsync(); | |
| var nonGroupTasks = tasks.Where(x => string.IsNullOrEmpty(x.GroupId)); | |
| tasks = [.. nonGroupTasks.Concat(fullGroups) | |
| .DistinctBy(x => x.Id) | |
| .OrderBy(x => x.CreateAt)]; | |
| } | |
| // 3. 依次处理每一条任务 | |
| foreach (var task in tasks) | |
| { | |
| await ProcessTask(task, fsql); | |
| } | |
| return (true, tasks.Count); | |
| } | |
| /// <summary> | |
| /// 处理单条任务(包含重试逻辑) | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <param name="fsql"></param> | |
| /// <returns></returns> | |
| private static async Task ProcessTask(SysTaskQueueEntity task | |
| , IFreeSql fsql) | |
| { | |
| bool execSuccess = false; // 执行是否成功 | |
| int execCount = task.ExecCount; // 执行次数 | |
| do | |
| { | |
| var now = DateTime.Now; | |
| execSuccess = ExecTask(task);// 执行任务 | |
| execCount++; | |
| // 必须执行通过 且 重试次未数达上限, 持续重试 | |
| // 若不需要重试 或 执行失败 则跳出循环 | |
| if (execSuccess) | |
| { | |
| await fsql.Delete<SysTaskQueueEntity>() | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Interlocked.Decrement(ref _execCount); | |
| Console.ForegroundColor = ConsoleColor.Green; | |
| AppInfo.Log.Info($"EXEC TASK OKAY! => DEL TASK, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| break; | |
| } | |
| else | |
| { | |
| // 强制忽略:立即忽略并退出(优先级高于Required) | |
| if (task.ForceIgnore) | |
| { | |
| // 检测冲突:Required和ForceIgnore同时为true是矛盾的 | |
| if (task.Required) | |
| { | |
| Console.ForegroundColor = ConsoleColor.Red; | |
| AppInfo.Log.Warn($"任务配置冲突:优先级 ForceIgnore > Required => TaskQueueType: {task.TaskQueueType}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| } | |
| await fsql.Update<SysTaskQueueEntity>() | |
| .Set(x => x.LastAt == now) | |
| .Set(x => x.ExecCount == execCount) | |
| .Set(x => x.Ignored == true) | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Interlocked.Decrement(ref _execCount); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Info($"EXEC ERROR 已被强制忽略 => TaskQueueType: {task.TaskQueueType}, ExecCount: {execCount}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| break; | |
| } | |
| // 必须执行:持续重试直到成功(除非达到最大容错次数) | |
| else if (task.Required) | |
| { | |
| // 检查最大容错次数(Required任务的安全阀) | |
| if (MAX_EXEC_RETRY_COUNT < execCount) | |
| { | |
| await fsql.Update<SysTaskQueueEntity>() | |
| .Set(x => x.LastAt == now) | |
| .Set(x => x.ExecCount == execCount) | |
| .Set(x => x.Ignored == true) | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Interlocked.Decrement(ref _execCount); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => 容错跳过, TaskQueueType: {task.TaskQueueType}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| break; | |
| } | |
| // Required任务会一直重试, 由MAX_EXEC_RETRY_COUNT控制最终容错 | |
| // 更新执行次数后继续重试 | |
| await fsql.Update<SysTaskQueueEntity>() | |
| .Set(x => x.LastAt == now) | |
| .Set(x => x.ExecCount == execCount) | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => TaskQueueType: {task.TaskQueueType}, ExecCount: {execCount}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| // 注意:这里不continue, 让代码继续执行延迟重试逻辑 | |
| } | |
| // 普通任务:检查是否达到重试次数上限 | |
| else | |
| { | |
| // 如果达到重试次数上限, 标记为忽略并退出 | |
| if (execCount > task.RetryCount) | |
| { | |
| await fsql.Update<SysTaskQueueEntity>() | |
| .Set(x => x.LastAt == now) | |
| .Set(x => x.ExecCount == execCount) | |
| .Set(x => x.Ignored == true) | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Interlocked.Decrement(ref _execCount); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => 达到重试次数上限, 已忽略 => TaskQueueType: {task.TaskQueueType}, ExecCount: {execCount}, RetryCount: {task.RetryCount}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| break; | |
| } | |
| // 未达到重试次数上限, 继续重试 | |
| else | |
| { | |
| await fsql.Update<SysTaskQueueEntity>() | |
| .Set(x => x.LastAt == now) | |
| .Set(x => x.ExecCount == execCount) | |
| .Where(x => x.Id == task.Id) | |
| .ExecuteAffrowsAsync(); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => TaskQueueType: {task.TaskQueueType}, ExecCount: {execCount}, RetryCount: {task.RetryCount}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| // 注意:这里不continue, 让代码继续执行延迟重试逻辑 | |
| } | |
| } | |
| } | |
| // 如果已经break, 不会执行到这里 | |
| // 检查是否应该继续重试(Required任务或普通任务未达上限) | |
| if (!execSuccess && !task.ForceIgnore && MAX_EXEC_RETRY_COUNT >= execCount) | |
| { | |
| // Required任务:始终继续重试 | |
| // 普通任务:如果未达到RetryCount上限, 继续重试 | |
| if (task.Required || (!task.Required && execCount <= task.RetryCount)) | |
| { | |
| // 如果重试次数大于 3 次, 则缓慢重试 | |
| if (execCount > RETRY_COUNT) | |
| { | |
| double ratio = (double)execCount / RETRY_COUNT; | |
| double preciseMS = ratio * 1000; | |
| var nextRetryMS = (int)Math.Ceiling(preciseMS); | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => 放缓重试速度 {nextRetryMS} ms 后重试, TaskQueueType: {task.TaskQueueType}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| await Task.Delay(nextRetryMS); | |
| } | |
| // 每5次, 重新获取数据, 防止硬拉数据导致程序卡死 | |
| if (execCount % 5 == 0) | |
| { | |
| Console.ForegroundColor = ConsoleColor.Yellow; | |
| AppInfo.Log.Error($"EXEC ERROR => 重新获取, TaskQueueType: {task.TaskQueueType}, TaskId: {task.Id}"); | |
| Console.ResetColor(); | |
| continue; // 注意:ProcessTask 内部循环, continue 针对的是 ProcessTask 内的 do-while | |
| } | |
| // 继续重试 | |
| continue; | |
| } | |
| } | |
| } | |
| while (!execSuccess); | |
| } | |
| /// <summary> | |
| /// 写入数据库 | |
| /// </summary> | |
| private static async Task AppendTask() | |
| { | |
| var list = new List<SysTaskQueueEntity>(); | |
| for (var i = 0; i < QUEUE_LIMIT && !UNSAVE_TASK_QUEUE.IsEmpty; i++) | |
| { | |
| if (UNSAVE_TASK_QUEUE.TryDequeue(out var item)) | |
| { | |
| // 仅当ID为空时生成,避免重试时ID被改变 | |
| if (item.Id == 0) item.Id = YitIdHelper.NextId(); | |
| list.Add(item); | |
| } | |
| } | |
| if (list.Count == 0) return; | |
| var fsql = GetFreeSql(); | |
| int count = 0; | |
| bool success; | |
| do | |
| { | |
| try | |
| { | |
| await fsql.InsertOrUpdate<SysTaskQueueEntity>() | |
| .SetSource(list) | |
| .ExecuteAffrowsAsync(); | |
| LAST_APPEND_TIME = DateTime.Now; | |
| success = true; | |
| break; | |
| } | |
| catch (Exception e) | |
| { | |
| success = false; | |
| Console.ForegroundColor = ConsoleColor.Red; | |
| AppInfo.Log.Error($"TaskQueue Append Push ERROR, 已重试 {count + 1} 次", e); | |
| Console.ResetColor(); | |
| } | |
| //重试3次 | |
| count++; | |
| } while (count < RETRY_COUNT); | |
| // 如果写入失败, 将数据重新放回队列头部, 避免数据丢失 | |
| if (!success) | |
| { | |
| // 按原顺序重新入队(从后往前入队, 这样先入队的会在队列前面) | |
| foreach (var item in list) | |
| { | |
| UNSAVE_TASK_QUEUE.Enqueue(item); | |
| } | |
| AppInfo.Log.Error($"TaskQueue Append Push 失败, 已将 {list.Count} 条数据重新放回队列, 将在下次重试"); | |
| } | |
| } | |
| /// <summary> | |
| /// 处理任务队列 | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| private static bool ExecTask(SysTaskQueueEntity task) | |
| { | |
| return GetTaskInvoker(task.TaskQueueType).Invoke(task); | |
| } | |
| /// <summary> | |
| /// 获取任务处理函数 | |
| /// </summary> | |
| /// <param name="type"></param> | |
| /// <returns></returns> | |
| private static Func<SysTaskQueueEntity, bool> GetTaskInvoker(ETaskQueueType type) | |
| { | |
| return type switch | |
| { | |
| ETaskQueueType.S1 => S1, | |
| ETaskQueueType.S2 => S2, | |
| ETaskQueueType.S3 => S3, | |
| ETaskQueueType.S4 => S4, | |
| _ => new Func<SysTaskQueueEntity, bool>(x => true),//不识别任务 执行失败 | |
| }; | |
| } | |
| #endregion Private | |
| #region Some Sync... | |
| /// <summary> | |
| /// 执行 | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| private static bool S1(SysTaskQueueEntity task) | |
| { | |
| Task.Delay(100).Wait(); | |
| return true; | |
| } | |
| /// <summary> | |
| /// 执行 | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| private static bool S2(SysTaskQueueEntity task) | |
| { | |
| Task.Delay(500).Wait(); | |
| return true; | |
| } | |
| /// <summary> | |
| /// 执行 | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| private static bool S3(SysTaskQueueEntity task) | |
| { | |
| return true; | |
| } | |
| /// <summary> | |
| /// 执行 | |
| /// </summary> | |
| /// <param name="task"></param> | |
| /// <returns></returns> | |
| private static bool S4(SysTaskQueueEntity task) | |
| { | |
| return true; | |
| } | |
| #endregion Some Sync... | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /// <summary> | |
| /// SysTaskQueue | |
| /// </summary> | |
| [Table(Name = "sys_task_queue")] | |
| [Index("idx_{tablename}_01", nameof(CreateAt) + " ASC")] | |
| [Index("idx_{tablename}_02", nameof(Ignored))] | |
| public class SysTaskQueueEntity : Entity | |
| { | |
| /// <summary> | |
| /// 名称 | |
| /// </summary> | |
| public string Name { get; set; } | |
| /// <summary> | |
| /// 队列类型 | |
| /// </summary> | |
| [Column(MapType = typeof(int))] | |
| public ETaskQueueType TaskQueueType { get; set; } | |
| /// <summary> | |
| /// 用户Id | |
| /// </summary> | |
| public long? UserId { get; set; } | |
| /// <summary> | |
| /// 分组Id | |
| /// </summary> | |
| public string GroupId { get; set; } | |
| /// <summary> | |
| /// 执行数据, JSON格式 | |
| /// </summary> | |
| public string Content { get; set; } | |
| /// <summary> | |
| /// 重试次数上限, 默认3次, 若出错, 则重试N次 | |
| /// </summary> | |
| public int RetryCount { get; set; } = 3; | |
| /// <summary> | |
| /// 执行次数 | |
| /// </summary> | |
| public int ExecCount { get; set; } = 0; | |
| /// <summary> | |
| /// 必须执行通过, 若出错, 则不断重试, 直到成功为止, 会阻塞后续队列执行 | |
| /// </summary> | |
| public bool Required { get; set; } = false; | |
| /// <summary> | |
| /// 强制忽略, 若出错, 则强制忽略 | |
| /// </summary> | |
| public bool ForceIgnore { get; set; } = false; | |
| /// <summary> | |
| /// 已忽略, 若标记为已忽略, 则不再执行 | |
| /// </summary> | |
| public bool Ignored { get; set; } = false; | |
| /// <summary> | |
| /// 备注 | |
| /// </summary> | |
| public string Remark { get; set; } | |
| /// <summary> | |
| /// 创建时间 | |
| /// </summary> | |
| public DateTime CreateAt { get; set; } = DateTime.Now; | |
| /// <summary> | |
| /// 修改时间 | |
| /// </summary> | |
| public DateTime LastAt { get; set; } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /// <summary> | |
| /// 任务队列类型 | |
| /// </summary> | |
| public enum ETaskQueueType | |
| { | |
| S1 = 1, | |
| S2 = 2, | |
| S3 = 4, | |
| S4 = 8, | |
| } | |
| /// <summary> | |
| /// 队列DTO | |
| /// </summary> | |
| public class DTOSysTaskQueue | |
| { | |
| /// <summary> | |
| /// Id | |
| /// </summary> | |
| public long Id { get; set; } | |
| /// <summary> | |
| /// 名称 | |
| /// </summary> | |
| public string Name { get; set; } | |
| /// <summary> | |
| /// 队列类型 | |
| /// </summary> | |
| public ETaskQueueType TaskQueueType { get; set; } | |
| /// <summary> | |
| /// 用户Id | |
| /// </summary> | |
| public long? UserId { get; set; } | |
| /// <summary> | |
| /// 分组Id | |
| /// </summary> | |
| public string GroupId { get; set; } | |
| /// <summary> | |
| /// 执行数据, JSON格式 | |
| /// </summary> | |
| public string Content { get; set; } | |
| /// <summary> | |
| /// 重试次数上限, 默认3次, 若出错, 则重试N次 | |
| /// </summary> | |
| public int RetryCount { get; set; } = 3; | |
| /// <summary> | |
| /// 执行次数 | |
| /// </summary> | |
| public int ExecCount { get; set; } = 0; | |
| /// <summary> | |
| /// 必须执行通过, 若出错, 则不断重试, 直到成功为止, 会阻塞后续队列执行 | |
| /// </summary> | |
| public bool Required { get; set; } = false; | |
| /// <summary> | |
| /// 强制忽略, 若出错, 则强制忽略 | |
| /// </summary> | |
| public bool ForceIgnore { get; set; } = false; | |
| /// <summary> | |
| /// 已忽略, 若标记为已忽略, 则不再执行 | |
| /// </summary> | |
| public bool Ignored { get; set; } = false; | |
| /// <summary> | |
| /// 备注 | |
| /// </summary> | |
| public string Remark { get; set; } | |
| /// <summary> | |
| /// 创建时间 | |
| /// </summary> | |
| public DateTime CreateAt { get; set; } = DateTime.Now; | |
| /// <summary> | |
| /// 修改时间 | |
| /// </summary> | |
| public DateTime LastAt { get; set; } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment