Skip to content

Commit

Permalink
Merge branch 'feature/worker-cluster' into feature/2.6.0-union
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs
  • Loading branch information
felix-zhaolei committed Dec 6, 2024
2 parents 52b8019 + 5415829 commit bb97645
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -1,38 +1,33 @@
using System.Threading.Tasks;
using CAServer.AddressBook.Migrate;
using CAServer.Options;
using Microsoft.Extensions.DependencyInjection;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.EntityEventHandler.Core.Worker;

public class ContactMigrateWorker : AsyncPeriodicBackgroundWorkerBase
public class ContactMigrateWorker : ScheduledTaskBase
{
private readonly IOptionsMonitor<ContactMigrateOptions> _options;
private readonly IAddressBookMigrateService _service;
private readonly ILogger<ContactMigrateWorker> _logger;

public ContactMigrateWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
public ContactMigrateWorker(
IOptionsMonitor<ContactMigrateOptions> options, IAddressBookMigrateService service,
ILogger<ContactMigrateWorker> logger) : base(timer,
serviceScopeFactory)
ILogger<ContactMigrateWorker> logger)
{
_options = options;
_service = service;
_logger = logger;
timer.RunOnStart = true;
timer.Period = options.CurrentValue.Period * 1000;
Period = options.CurrentValue.Period * 1000;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
_logger.LogInformation("[AddressBookMigrate] ContactMigrateWorker start.");
if (!_options.CurrentValue.Open)
{
//lob
_logger.LogInformation("[AddressBookMigrate] migrate not open.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,36 @@
using CAServer.Grains.Grain.RedPackage;
using CAServer.Grains.State;
using CAServer.RedPackage.Dtos;
using Microsoft.Extensions.DependencyInjection;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Logging;
using Nest;
using Newtonsoft.Json;
using Orleans;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.EntityEventHandler.Core.Worker;

public class CryptoGiftPreGrabQuotaExpiredWorker : AsyncPeriodicBackgroundWorkerBase
public class CryptoGiftPreGrabQuotaExpiredWorker : ScheduledTaskBase
{
private const long ExtraDeviationMilliSeconds = 90000;
private readonly INESTRepository<RedPackageIndex, Guid> _redPackageIndexRepository;
private readonly IClusterClient _clusterClient;
private readonly ICryptoGiftProvider _cryptoGiftProvider;
private readonly ILogger<CryptoGiftPreGrabQuotaExpiredWorker> _logger;

public CryptoGiftPreGrabQuotaExpiredWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
public CryptoGiftPreGrabQuotaExpiredWorker(
INESTRepository<RedPackageIndex, Guid> redPackageIndexRepository,
ICryptoGiftProvider cryptoGiftProvider,
IClusterClient clusterClient,
ILogger<CryptoGiftPreGrabQuotaExpiredWorker> logger) : base(timer, serviceScopeFactory)
ILogger<CryptoGiftPreGrabQuotaExpiredWorker> logger)
{
_redPackageIndexRepository = redPackageIndexRepository;
_cryptoGiftProvider = cryptoGiftProvider;
_clusterClient = clusterClient;
_logger = logger;
Timer.Period = WorkerConst.CryptoGiftExpiredTimePeriod;
Period = WorkerConst.CryptoGiftExpiredTimePeriod;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
var yesterday = DateTimeOffset.Now.Subtract(TimeSpan.FromDays(1)).ToUnixTimeMilliseconds();
var mustQuery = new List<Func<QueryContainerDescriptor<RedPackageIndex>, QueryContainer>>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,32 @@
using System;
using System.Threading.Tasks;
using CAServer.PrivacyPermission;
using Microsoft.Extensions.DependencyInjection;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.EntityEventHandler.Core.Worker;

public class LoginGuardianChangeRecordReceiveWorker : AsyncPeriodicBackgroundWorkerBase
public class LoginGuardianChangeRecordReceiveWorker : ScheduledTaskBase
{
private readonly IGraphQLProvider _graphQlProvider;
private readonly Options.ChainOptions _chainOptions;
private readonly IPrivacyPermissionAppService _privacyPermissionAppService;
private readonly ILogger<LoginGuardianChangeRecordReceiveWorker> _logger;

public LoginGuardianChangeRecordReceiveWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
public LoginGuardianChangeRecordReceiveWorker(
IGraphQLProvider graphQlProvider, IOptions<Options.ChainOptions> chainOptions,
IPrivacyPermissionAppService privacyPermissionAppService,
ILogger<LoginGuardianChangeRecordReceiveWorker> logger) : base(
timer,
serviceScopeFactory)
ILogger<LoginGuardianChangeRecordReceiveWorker> logger)
{
_chainOptions = chainOptions.Value;
_graphQlProvider = graphQlProvider;
_privacyPermissionAppService = privacyPermissionAppService;
_logger = logger;
Timer.Period = WorkerConst.TimePeriod;
Period = WorkerConst.TimePeriod;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
foreach (var chainOptionsChainInfo in _chainOptions.ChainInfos)
{
Expand Down
17 changes: 7 additions & 10 deletions src/CAServer.EntityEventHandler.Core/Worker/ReferralRankWorker.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
using System.Threading.Tasks;
using CAServer.Growth;
using CAServer.Options;
using Microsoft.Extensions.DependencyInjection;
using CAServer.ScheduledTask;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Threading;

namespace CAServer.EntityEventHandler.Core.Worker;

public class ReferralRankWorker : AsyncPeriodicBackgroundWorkerBase
public class ReferralRankWorker : ScheduledTaskBase
{
private readonly IGrowthStatisticAppService _growthStatisticAppService;
private readonly ILogger<ReferralRankWorker> _logger;
private readonly ReferralRefreshTimeOptions _referralRefreshTimeOptions;


public ReferralRankWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,

public ReferralRankWorker(
IGrowthStatisticAppService growthStatisticAppService, ILogger<ReferralRankWorker> logger,
IOptionsSnapshot<ReferralRefreshTimeOptions> referralRefreshTimeOptions) : base(timer, serviceScopeFactory)
IOptionsSnapshot<ReferralRefreshTimeOptions> referralRefreshTimeOptions)
{
_growthStatisticAppService = growthStatisticAppService;
_logger = logger;
_referralRefreshTimeOptions = referralRefreshTimeOptions.Value;
Timer.Period = _referralRefreshTimeOptions.TimePeriod;
Period = _referralRefreshTimeOptions.TimePeriod;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
_logger.LogDebug("Sync referral data starting....");
await _growthStatisticAppService.CalculateReferralRankAsync();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,112 +1,41 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using CAServer.Commons;
using CAServer.Options;
using CAServer.ScheduledTask;
using CAServer.Tokens.TokenPrice;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.Caching;
using Volo.Abp.DistributedLocking;
using Volo.Abp.Threading;

namespace CAServer.EntityEventHandler.Core.Worker;

public class TokenPriceBackgroundWorker : AsyncPeriodicBackgroundWorkerBase
public class TokenPriceBackgroundWorker : ScheduledTaskBase
{
private readonly ILogger<TokenPriceBackgroundWorker> _logger;
private readonly IAbpDistributedLock _distributedLock;
private readonly IDistributedCache<string> _distributedCache;
private readonly TokenPriceWorkerOption _tokenPriceWorkerOption;
private readonly ITokenPriceService _tokenPriceService;

private readonly string _hostName;
private readonly string _workerNameKey;
private readonly string _workerLockKey;

public TokenPriceBackgroundWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory,
IAbpDistributedLock distributedLock, ILogger<TokenPriceBackgroundWorker> logger,
IDistributedCache<string> distributedCache, IOptionsMonitor<TokenPriceWorkerOption> tokenPriceWorkerOption,
ITokenPriceService tokenPriceService) : base(timer, serviceScopeFactory)
public TokenPriceBackgroundWorker(
ILogger<TokenPriceBackgroundWorker> logger,
IOptionsMonitor<TokenPriceWorkerOption> tokenPriceWorkerOption,
ITokenPriceService tokenPriceService)
{
_logger = logger;
_distributedLock = distributedLock;
_distributedCache = distributedCache;
_tokenPriceService = tokenPriceService;
_tokenPriceWorkerOption = tokenPriceWorkerOption.CurrentValue;

timer.Period = _tokenPriceWorkerOption.Period * 1000;
timer.RunOnStart = true;

_hostName = $"{HostHelper.GetLocalHostName()}_{Guid.NewGuid()}";
_workerNameKey = $"{_tokenPriceWorkerOption.Prefix}:{_tokenPriceWorkerOption.WorkerNameKey}";
_workerLockKey = $"{_tokenPriceWorkerOption.Prefix}:{_tokenPriceWorkerOption.WorkerLockKey}";
}

public override async Task StopAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("stopping token price background worker start...");
await base.StopAsync(cancellationToken);

try
{
var workName = await _distributedCache.GetAsync(_workerNameKey);
if (!workName.IsNullOrWhiteSpace() && workName != this._hostName)
{
return;
}

_logger.LogInformation("TokenPriceWorker:remove current worker... {0}", workName);
await _distributedCache.RemoveAsync(_workerNameKey);
_logger.LogInformation("TokenPriceWorker:remove current worker finished...");
}
catch (Exception e)
{
_logger.LogError(e, "TokenPriceWorker: stop Workder error.");
}
_logger.LogInformation("stoping token price background worker finished...");
Period = _tokenPriceWorkerOption.Period * 1000;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
protected override async Task DoWorkAsync()
{
_logger.LogInformation("token price background worker start... {0}", _hostName);
try
{
var workName = await _distributedCache.GetAsync(_workerNameKey);
if (!workName.IsNullOrWhiteSpace() && workName != this._hostName)
{
_logger.LogInformation("TokenPriceWorker: running worker: {0}", workName);
return;
}

using (var lockHandle = _distributedLock.TryAcquireAsync(_workerLockKey))
{
if (lockHandle == null)
{
_logger.LogWarning("TokenPriceWorker: lock fail.");
return;
}

_logger.LogInformation("TokenPriceWorker: update current worker... {0}", _hostName);
await _distributedCache.SetAsync(_workerNameKey, _hostName, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow =
TimeSpan.FromSeconds(_tokenPriceWorkerOption.Period + _tokenPriceWorkerOption.Period / 2)
});
_logger.LogInformation("TokenPriceWorker: update token price....");

await _tokenPriceService.RefreshCurrentPriceAsync();

_logger.LogInformation("TokenPriceWorker: update token price finished...");
}
_logger.LogInformation("TokenPriceWorker: update token price....");
await _tokenPriceService.RefreshCurrentPriceAsync();
_logger.LogInformation("TokenPriceWorker: update token price finished...");
}
catch (Exception e)
{
_logger.LogError(e, "TokenPriceWorker: task error.");
}
_logger.LogInformation("token price background worker finished...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using CAServer.Nightingale.Orleans.Filters;
using CAServer.Options;
using CAServer.Redis;
using CAServer.ScheduledTask;
using CAServer.Tokens.TokenPrice.Provider.FeiXiaoHao;
using GraphQL.Client.Abstractions;
using GraphQL.Client.Http;
Expand Down Expand Up @@ -190,21 +191,26 @@ public override void OnPreApplicationInitialization(ApplicationInitializationCon
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var backgroundWorkerManger = context.ServiceProvider.GetRequiredService<IBackgroundWorkerManager>();
backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<LoginGuardianChangeRecordReceiveWorker>());
backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<TokenPriceBackgroundWorker>());
backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<CryptoGiftPreGrabQuotaExpiredWorker>());
// backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<CryptoGiftStatisticsWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<InitReferralRankWorker>());
backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<ReferralRankWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<InitAddChatBotContactsWorker>());
backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<HamsterActivityWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<HamsterActivityWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<HamsterDataRepairWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<TonGiftsValidateWorker>());
//backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService<ContactMigrateWorker>());


context.AddWorker<LoginGuardianChangeRecordReceiveWorker>();
context.AddWorker<TokenPriceBackgroundWorker>();
context.AddWorker<CryptoGiftPreGrabQuotaExpiredWorker>();
context.AddWorker<ContactMigrateWorker>();
//context.AddWorker<ReferralRankWorker>();

InitRecurringJob(context.ServiceProvider);
ConfigurationProvidersHelper.DisplayConfigurationProviders(context);
}

private static void InitRecurringJob(IServiceProvider serviceProvider)
{
var jobsService = serviceProvider.GetRequiredService<IInitWorkersService>();
jobsService.InitRecurringWorkers();
}

public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
Expand Down

0 comments on commit bb97645

Please sign in to comment.