Skip to content

Commit

Permalink
😊 改进 任务队模块,支持自定义 TaskId 和局部控制任务并行或串行执行策略
Browse files Browse the repository at this point in the history
  • Loading branch information
MonkSoul committed Mar 6, 2024
1 parent 35f91cb commit a732c72
Show file tree
Hide file tree
Showing 18 changed files with 180 additions and 78 deletions.
2 changes: 1 addition & 1 deletion framework/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net5.0;net6.0;net7.0;net8.0;net9.0</TargetFrameworks>
<Version>4.9.1.37</Version>
<Version>4.9.2</Version>
<ImplicitUsings>enable</ImplicitUsings>
<Authors>百小僧</Authors>
<Company>百签科技(广东)有限公司</Company>
Expand Down
13 changes: 11 additions & 2 deletions framework/Furion.Pure/App/Extensions/ObjectExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,21 @@ public static void CopyToSave(this byte[] bytes, string path)
/// 将流保存到本地磁盘
/// </summary>
/// <param name="stream"></param>
/// <param name="path"></param>
/// <param name="path">需包含文件名完整路径</param>
/// <returns></returns>
public static async Task CopyToSaveAsync(this Stream stream, string path)
{
// 空检查
if (string.IsNullOrWhiteSpace(path)) throw new ArgumentNullException(nameof(path));
if (string.IsNullOrWhiteSpace(path))
{
throw new ArgumentNullException(nameof(path));
}

// 文件名判断
if (string.IsNullOrWhiteSpace(Path.GetFileName(path)))
{
throw new ArgumentException("The parameter of <path> parameter must include the complete file name.");
}

using var fileStream = File.Create(path);
await stream.CopyToAsync(fileStream);
Expand Down
22 changes: 15 additions & 7 deletions framework/Furion.Pure/TaskQueue/Dependencies/ITaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,33 @@ public interface ITaskQueue
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <returns><see cref="Guid"/></returns>
Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null);
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
object Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null);

/// <summary>
/// 任务项入队
/// </summary>
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null);
ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null);

/// <summary>
/// 任务项入队
/// </summary>
/// <param name="taskHandler">任务处理委托</param>
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <param name="channel">任务通道</param>
/// <returns><see cref="Guid"/></returns>
Guid Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null);
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
object Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null);

/// <summary>
/// 任务项入队
Expand All @@ -51,8 +57,10 @@ public interface ITaskQueue
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null);
ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null);

/// <summary>
/// 任务项出队
Expand Down
41 changes: 28 additions & 13 deletions framework/Furion.Pure/TaskQueue/Dependencies/TaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ public TaskQueue(int capacity)
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <returns><see cref="Guid"/></returns>
public Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null)
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
public object Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null)
{
// 空检查
if (taskHandler == default)
Expand All @@ -57,7 +59,7 @@ public Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string
{
taskHandler(serviceProvider);
return ValueTask.CompletedTask;
}, delay, channel)
}, delay, channel, taskId, concurrent)
.AsTask().GetAwaiter().GetResult();
}

Expand All @@ -67,22 +69,30 @@ public Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null)
public async ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null)
{
// 空检查
if (taskHandler == default)
{
throw new ArgumentNullException(nameof(taskHandler));
}

// 仅支持 null,bool 类型
if (!(concurrent is null or bool))
{
throw new ArgumentNullException(nameof(concurrent));
}

// 创建任务 ID
var taskId = Guid.NewGuid();
var newTaskId = taskId ?? Guid.NewGuid();

// 写入管道队列
await _queue.Writer.WriteAsync(new TaskWrapper
{
TaskId = taskId,
TaskId = newTaskId,
Channel = channel ?? string.Empty,
Handler = async (serviceProvider, cancellationToken) =>
{
Expand All @@ -92,10 +102,11 @@ await _queue.Writer.WriteAsync(new TaskWrapper
}

await taskHandler(serviceProvider, cancellationToken);
}
},
Concurrent = concurrent
});

return taskId;
return newTaskId;
}

/// <summary>
Expand All @@ -105,13 +116,15 @@ await _queue.Writer.WriteAsync(new TaskWrapper
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="channel">任务通道</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <returns><see cref="Guid"/></returns>
public Guid Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null)
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
public object Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null)
{
var totalMilliseconds = Crontab.Parse(cronExpression, format)
.GetSleepMilliseconds(DateTime.Now);

return Enqueue(taskHandler, (int)totalMilliseconds, channel);
return Enqueue(taskHandler, (int)totalMilliseconds, channel, taskId, concurrent);
}

/// <summary>
Expand All @@ -121,13 +134,15 @@ public Guid Enqueue(Action<IServiceProvider> taskHandler, string cronExpression,
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
public ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null)
public ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null)
{
var totalMilliseconds = Crontab.Parse(cronExpression, format)
.GetSleepMilliseconds(DateTime.Now);

return EnqueueAsync(taskHandler, (int)totalMilliseconds, channel);
return EnqueueAsync(taskHandler, (int)totalMilliseconds, channel, taskId, concurrent);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public sealed class TaskHandlerEventArgs : EventArgs
/// <param name="taskId">任务 Id</param>
/// <param name="channel">任务通道</param>
/// <param name="success">任务处理委托调用结果</param>
public TaskHandlerEventArgs(Guid taskId, string channel, bool success)
public TaskHandlerEventArgs(object taskId, string channel, bool success)
{
TaskId = taskId;
Channel = channel;
Expand All @@ -26,7 +26,7 @@ public TaskHandlerEventArgs(Guid taskId, string channel, bool success)
/// <summary>
/// 任务 Id
/// </summary>
public Guid TaskId { get; }
public object TaskId { get; }

/// <summary>
/// 任务通道
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证。

using Furion.TaskQueue;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Microsoft.Extensions.DependencyInjection;

Expand Down Expand Up @@ -75,7 +76,7 @@ private static IServiceCollection AddInternalService(this IServiceCollection ser
taskQueueOptionsBuilder.Build();

// 注册后台任务队列接口/实例为单例,采用工厂方式创建
services.AddSingleton<ITaskQueue>(_ =>
services.TryAddSingleton<ITaskQueue>(_ =>
{
// 创建后台队列实例
return new TaskQueue(taskQueueOptionsBuilder.ChannelCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ private async Task BackgroundProcessing(CancellationToken stoppingToken)
// 出队
var taskWrapper = await _taskQueue.DequeueAsync(stoppingToken);

// 获取任务执行策略
var concurrent = taskWrapper.Concurrent == null
? _concurrent
: (bool)taskWrapper.Concurrent;

// 并行执行
if (_concurrent)
if (concurrent)
{
Parallel.For(0, 1, async _ =>
{
Expand Down
7 changes: 6 additions & 1 deletion framework/Furion.Pure/TaskQueue/Internal/TaskWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ public sealed class TaskWrapper
/// <summary>
/// 任务 ID
/// </summary>
public Guid TaskId { get; internal set; }
public object TaskId { get; internal set; }

/// <summary>
/// 任务处理委托
/// </summary>
public Func<IServiceProvider, CancellationToken, ValueTask> Handler { get; internal set; }

/// <summary>
/// 是否采用并行执行
/// </summary>
public object Concurrent { get; internal set; } = null;
}
30 changes: 19 additions & 11 deletions framework/Furion.Pure/TaskQueue/TaskQueued.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ public static class TaskQueued
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <returns><see cref="Guid"/></returns>
public static Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null)
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
public static object Enqueue(Action<IServiceProvider> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null)
{
var taskQueue = App.GetRequiredService<ITaskQueue>(App.RootServices);
return taskQueue.Enqueue(taskHandler, delay, channel);
return taskQueue.Enqueue(taskHandler, delay, channel, taskId, concurrent);
}

/// <summary>
Expand All @@ -31,25 +33,29 @@ public static Guid Enqueue(Action<IServiceProvider> taskHandler, int delay = 0,
/// <param name="taskHandler">任务处理委托</param>
/// <param name="delay">延迟时间(毫秒)</param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
public static async ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null)
public static async ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, int delay = 0, string channel = null, object taskId = null, object concurrent = null)
{
var taskQueue = App.GetRequiredService<ITaskQueue>(App.RootServices);
return await taskQueue.EnqueueAsync(taskHandler, delay, channel);
return await taskQueue.EnqueueAsync(taskHandler, delay, channel, taskId, concurrent);
}

/// <summary>
/// 任务项入队
/// </summary>
/// <param name="taskHandler">任务处理委托</param>
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="channel">任务通道</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <returns><see cref="Guid"/></returns>
public static Guid Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null)
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="object"/></returns>
public static object Enqueue(Action<IServiceProvider> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null)
{
var taskQueue = App.GetRequiredService<ITaskQueue>(App.RootServices);
return taskQueue.Enqueue(taskHandler, cronExpression, format, channel);
return taskQueue.Enqueue(taskHandler, cronExpression, format, channel, taskId, concurrent);
}

/// <summary>
Expand All @@ -59,10 +65,12 @@ public static Guid Enqueue(Action<IServiceProvider> taskHandler, string cronExpr
/// <param name="cronExpression">Cron 表达式</param>
/// <param name="format"><see cref="CronStringFormat"/></param>
/// <param name="channel">任务通道</param>
/// <param name="taskId">任务 Id</param>
/// <param name="concurrent">是否采用并行执行,仅支持 null,true,fale</param>
/// <returns><see cref="ValueTask"/></returns>
public static async ValueTask<Guid> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null)
public static async ValueTask<object> EnqueueAsync(Func<IServiceProvider, CancellationToken, ValueTask> taskHandler, string cronExpression, CronStringFormat format = CronStringFormat.Default, string channel = null, object taskId = null, object concurrent = null)
{
var taskQueue = App.GetRequiredService<ITaskQueue>(App.RootServices);
return await taskQueue.EnqueueAsync(taskHandler, cronExpression, format, channel);
return await taskQueue.EnqueueAsync(taskHandler, cronExpression, format, channel, taskId, concurrent);
}
}
13 changes: 11 additions & 2 deletions framework/Furion/App/Extensions/ObjectExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,21 @@ public static void CopyToSave(this byte[] bytes, string path)
/// 将流保存到本地磁盘
/// </summary>
/// <param name="stream"></param>
/// <param name="path"></param>
/// <param name="path">需包含文件名完整路径</param>
/// <returns></returns>
public static async Task CopyToSaveAsync(this Stream stream, string path)
{
// 空检查
if (string.IsNullOrWhiteSpace(path)) throw new ArgumentNullException(nameof(path));
if (string.IsNullOrWhiteSpace(path))
{
throw new ArgumentNullException(nameof(path));
}

// 文件名判断
if (string.IsNullOrWhiteSpace(Path.GetFileName(path)))
{
throw new ArgumentException("The parameter of <path> parameter must include the complete file name.");
}

using var fileStream = File.Create(path);
await stream.CopyToAsync(fileStream);
Expand Down
Loading

0 comments on commit a732c72

Please sign in to comment.