Skip to content

Commit

Permalink
Merge branch 'feature/worker-cluster' into feature/2.6.0-union
Browse files Browse the repository at this point in the history
  • Loading branch information
felix-zhaolei committed Dec 7, 2024
2 parents bb97645 + 5cec1ef commit e6f47ca
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@


<ItemGroup>
<PackageReference Include="Hangfire" Version="1.8.6" />
<PackageReference Include="Volo.Abp.BackgroundJobs.HangFire" Version="7.0.0" />
<PackageReference Include="Hangfire.Core" Version="1.8.3" />
<PackageReference Include="Hangfire.Mongo" Version="1.7.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
<ItemGroup>
<PackageReference Include="DistributedLock.Redis" Version="1.0.2" />
<PackageReference Include="MassTransit.Abstractions" Version="8.1.0" />
<PackageReference Include="Hangfire" Version="1.8.6" />
<PackageReference Include="Hangfire.Redis.StackExchange" Version="1.9.2" />
<PackageReference Include="Serilog.AspNetCore" Version="6.0.1" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.RollingFile" Version="3.3.0" />
Expand Down
78 changes: 67 additions & 11 deletions src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
using GraphQL.Client.Abstractions;
using GraphQL.Client.Http;
using GraphQL.Client.Serializer.Newtonsoft;
using Hangfire;
using Hangfire.Mongo;
using Hangfire.Mongo.CosmosDB;
using Hangfire.Mongo.Migration.Strategies;
using Hangfire.Mongo.Migration.Strategies.Backup;
using MassTransit;
using Medallion.Threading;
using Medallion.Threading.Redis;
Expand All @@ -26,6 +31,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using Orleans;
using Orleans.Configuration;
using Orleans.Providers.MongoDB.Configuration;
Expand Down Expand Up @@ -77,6 +83,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
ConfigureDistributedLocking(context, configuration);
ConfigureMassTransit(context, configuration);
ConfigureRedisCacheProvider(context, configuration);
ConfigureHangfire(context, configuration);
context.Services.AddSingleton<IClusterClient>(o =>
{
return new ClientBuilder()
Expand Down Expand Up @@ -106,17 +113,17 @@ public override void ConfigureServices(ServiceConfigurationContext context)
.Build();
});
}

private void ConfigureGraphQl(ServiceConfigurationContext context,
IConfiguration configuration)
{
{
Configure<GraphQLOptions>(configuration.GetSection("GraphQL"));
context.Services.AddSingleton(new GraphQLHttpClient(configuration["GraphQL:Configuration"],
new NewtonsoftJsonSerializer()));
context.Services.AddScoped<IGraphQLClient>(sp => sp.GetRequiredService<GraphQLHttpClient>());
}


private void ConfigureDistributedLocking(
ServiceConfigurationContext context,
IConfiguration configuration)
Expand All @@ -128,18 +135,18 @@ private void ConfigureDistributedLocking(
return new RedisDistributedSynchronizationProvider(connection.GetDatabase());
});
}

private void ConfigureRedisCacheProvider(
ServiceConfigurationContext context,
IConfiguration configuration)
{
var multiplexer = ConnectionMultiplexer
.Connect(configuration["Redis:Configuration"]);
context.Services.AddSingleton<IConnectionMultiplexer>(multiplexer);
context.Services.AddSingleton<ICacheProvider,RedisCacheProvider>();
context.Services.AddSingleton<ICacheProvider, RedisCacheProvider>();
}


private void ConfigureCache(IConfiguration configuration)
{
var cacheOptions = configuration.GetSection("Cache").Get<CacheOptions>();
Expand All @@ -154,7 +161,7 @@ private void ConfigureCache(IConfiguration configuration)
};
});
}

private void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
context.Services.AddMassTransit(x =>
Expand All @@ -163,7 +170,7 @@ private void ConfigureMassTransit(ServiceConfigurationContext context, IConfigur
// x.AddConsumer<OrderWsBroadcastConsumer>();
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(rabbitMqConfig.Connections.Default.HostName, (ushort)rabbitMqConfig.Connections.Default.Port,
cfg.Host(rabbitMqConfig.Connections.Default.HostName, (ushort)rabbitMqConfig.Connections.Default.Port,
"/", h =>
{
h.Username(rabbitMqConfig.Connections.Default.UserName);
Expand All @@ -187,7 +194,7 @@ public override void OnPreApplicationInitialization(ApplicationInitializationCon
var client = context.ServiceProvider.GetRequiredService<IClusterClient>();
AsyncHelper.RunSync(async () => await client.Connect());
}

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var backgroundWorkerManger = context.ServiceProvider.GetRequiredService<IBackgroundWorkerManager>();
Expand All @@ -201,11 +208,11 @@ public override void OnApplicationInitialization(ApplicationInitializationContex
context.AddWorker<CryptoGiftPreGrabQuotaExpiredWorker>();
context.AddWorker<ContactMigrateWorker>();
//context.AddWorker<ReferralRankWorker>();

InitRecurringJob(context.ServiceProvider);
ConfigurationProvidersHelper.DisplayConfigurationProviders(context);
}

private static void InitRecurringJob(IServiceProvider serviceProvider)
{
var jobsService = serviceProvider.GetRequiredService<IInitWorkersService>();
Expand All @@ -229,4 +236,53 @@ private void ConfigureTokenCleanupService()
{
Configure<TokenCleanupOptions>(x => x.IsCleanupEnabled = false);
}

private void ConfigureHangfire(ServiceConfigurationContext context, IConfiguration configuration)
{
var mongoType = configuration["Hangfire:MongoType"];
var connectionString = configuration["Hangfire:ConnectionString"];
if (connectionString.IsNullOrEmpty()) return;

if (mongoType.IsNullOrEmpty() ||
mongoType.Equals(MongoType.MongoDb.ToString(), StringComparison.OrdinalIgnoreCase))
{
context.Services.AddHangfire(x =>
{
x.UseMongoStorage(connectionString, new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection
});
});
}
else if (mongoType.Equals(MongoType.DocumentDb.ToString(), StringComparison.OrdinalIgnoreCase))
{
context.Services.AddHangfire(config =>
{
var mongoUrlBuilder = new MongoUrlBuilder(connectionString);
var mongoClient = new MongoClient(mongoUrlBuilder.ToMongoUrl());
var opt = new CosmosStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
BackupStrategy = new NoneMongoBackupStrategy(),
MigrationStrategy = new DropMongoMigrationStrategy(),
}
};
config.UseCosmosStorage(mongoClient, mongoUrlBuilder.DatabaseName, opt);
});
}

context.Services.AddHangfireServer(opt =>
{
opt.SchedulePollingInterval = TimeSpan.FromMilliseconds(3000);
opt.HeartbeatInterval = TimeSpan.FromMilliseconds(3000);
opt.Queues = new[] { "default", "notDefault" };
});
}
}

0 comments on commit e6f47ca

Please sign in to comment.