diff --git a/src/CAServer.EntityEventHandler.Core/Worker/ContactMigrateWorker.cs b/src/CAServer.EntityEventHandler.Core/Worker/ContactMigrateWorker.cs index 49a0d06b4..a648c8850 100644 --- a/src/CAServer.EntityEventHandler.Core/Worker/ContactMigrateWorker.cs +++ b/src/CAServer.EntityEventHandler.Core/Worker/ContactMigrateWorker.cs @@ -1,38 +1,33 @@ using System.Threading.Tasks; using CAServer.AddressBook.Migrate; using CAServer.Options; -using Microsoft.Extensions.DependencyInjection; +using CAServer.ScheduledTask; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.Threading; namespace CAServer.EntityEventHandler.Core.Worker; -public class ContactMigrateWorker : AsyncPeriodicBackgroundWorkerBase +public class ContactMigrateWorker : ScheduledTaskBase { private readonly IOptionsMonitor _options; private readonly IAddressBookMigrateService _service; private readonly ILogger _logger; - public ContactMigrateWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory, + public ContactMigrateWorker( IOptionsMonitor options, IAddressBookMigrateService service, - ILogger logger) : base(timer, - serviceScopeFactory) + ILogger logger) { _options = options; _service = service; _logger = logger; - timer.RunOnStart = true; - timer.Period = options.CurrentValue.Period * 1000; + Period = options.CurrentValue.Period * 1000; } - protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + protected override async Task DoWorkAsync() { _logger.LogInformation("[AddressBookMigrate] ContactMigrateWorker start."); if (!_options.CurrentValue.Open) { - //lob _logger.LogInformation("[AddressBookMigrate] migrate not open."); return; } diff --git a/src/CAServer.EntityEventHandler.Core/Worker/CryptoGiftPreGrabQuotaExpiredWorker.cs b/src/CAServer.EntityEventHandler.Core/Worker/CryptoGiftPreGrabQuotaExpiredWorker.cs index 74f634a82..b1176b001 100644 --- a/src/CAServer.EntityEventHandler.Core/Worker/CryptoGiftPreGrabQuotaExpiredWorker.cs +++ b/src/CAServer.EntityEventHandler.Core/Worker/CryptoGiftPreGrabQuotaExpiredWorker.cs @@ -10,17 +10,15 @@ using CAServer.Grains.Grain.RedPackage; using CAServer.Grains.State; using CAServer.RedPackage.Dtos; -using Microsoft.Extensions.DependencyInjection; +using CAServer.ScheduledTask; using Microsoft.Extensions.Logging; using Nest; using Newtonsoft.Json; using Orleans; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.Threading; namespace CAServer.EntityEventHandler.Core.Worker; -public class CryptoGiftPreGrabQuotaExpiredWorker : AsyncPeriodicBackgroundWorkerBase +public class CryptoGiftPreGrabQuotaExpiredWorker : ScheduledTaskBase { private const long ExtraDeviationMilliSeconds = 90000; private readonly INESTRepository _redPackageIndexRepository; @@ -28,20 +26,20 @@ public class CryptoGiftPreGrabQuotaExpiredWorker : AsyncPeriodicBackgroundWorker private readonly ICryptoGiftProvider _cryptoGiftProvider; private readonly ILogger _logger; - public CryptoGiftPreGrabQuotaExpiredWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory, + public CryptoGiftPreGrabQuotaExpiredWorker( INESTRepository redPackageIndexRepository, ICryptoGiftProvider cryptoGiftProvider, IClusterClient clusterClient, - ILogger logger) : base(timer, serviceScopeFactory) + ILogger logger) { _redPackageIndexRepository = redPackageIndexRepository; _cryptoGiftProvider = cryptoGiftProvider; _clusterClient = clusterClient; _logger = logger; - Timer.Period = WorkerConst.CryptoGiftExpiredTimePeriod; + Period = WorkerConst.CryptoGiftExpiredTimePeriod; } - protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + protected override async Task DoWorkAsync() { var yesterday = DateTimeOffset.Now.Subtract(TimeSpan.FromDays(1)).ToUnixTimeMilliseconds(); var mustQuery = new List, QueryContainer>>(); diff --git a/src/CAServer.EntityEventHandler.Core/Worker/LoginGuardianChangeRecordReceiveWorker.cs b/src/CAServer.EntityEventHandler.Core/Worker/LoginGuardianChangeRecordReceiveWorker.cs index 905b0d2ed..14860ada4 100644 --- a/src/CAServer.EntityEventHandler.Core/Worker/LoginGuardianChangeRecordReceiveWorker.cs +++ b/src/CAServer.EntityEventHandler.Core/Worker/LoginGuardianChangeRecordReceiveWorker.cs @@ -1,36 +1,32 @@ using System; using System.Threading.Tasks; using CAServer.PrivacyPermission; -using Microsoft.Extensions.DependencyInjection; +using CAServer.ScheduledTask; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.Threading; namespace CAServer.EntityEventHandler.Core.Worker; -public class LoginGuardianChangeRecordReceiveWorker : AsyncPeriodicBackgroundWorkerBase +public class LoginGuardianChangeRecordReceiveWorker : ScheduledTaskBase { private readonly IGraphQLProvider _graphQlProvider; private readonly Options.ChainOptions _chainOptions; private readonly IPrivacyPermissionAppService _privacyPermissionAppService; private readonly ILogger _logger; - public LoginGuardianChangeRecordReceiveWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory, + public LoginGuardianChangeRecordReceiveWorker( IGraphQLProvider graphQlProvider, IOptions chainOptions, IPrivacyPermissionAppService privacyPermissionAppService, - ILogger logger) : base( - timer, - serviceScopeFactory) + ILogger logger) { _chainOptions = chainOptions.Value; _graphQlProvider = graphQlProvider; _privacyPermissionAppService = privacyPermissionAppService; _logger = logger; - Timer.Period = WorkerConst.TimePeriod; + Period = WorkerConst.TimePeriod; } - protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + protected override async Task DoWorkAsync() { foreach (var chainOptionsChainInfo in _chainOptions.ChainInfos) { diff --git a/src/CAServer.EntityEventHandler.Core/Worker/ReferralRankWorker.cs b/src/CAServer.EntityEventHandler.Core/Worker/ReferralRankWorker.cs index 401a1cdcd..3a5620bbb 100644 --- a/src/CAServer.EntityEventHandler.Core/Worker/ReferralRankWorker.cs +++ b/src/CAServer.EntityEventHandler.Core/Worker/ReferralRankWorker.cs @@ -1,32 +1,29 @@ using System.Threading.Tasks; using CAServer.Growth; using CAServer.Options; -using Microsoft.Extensions.DependencyInjection; +using CAServer.ScheduledTask; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.Threading; namespace CAServer.EntityEventHandler.Core.Worker; -public class ReferralRankWorker : AsyncPeriodicBackgroundWorkerBase +public class ReferralRankWorker : ScheduledTaskBase { private readonly IGrowthStatisticAppService _growthStatisticAppService; private readonly ILogger _logger; private readonly ReferralRefreshTimeOptions _referralRefreshTimeOptions; - - - public ReferralRankWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory, + + public ReferralRankWorker( IGrowthStatisticAppService growthStatisticAppService, ILogger logger, - IOptionsSnapshot referralRefreshTimeOptions) : base(timer, serviceScopeFactory) + IOptionsSnapshot referralRefreshTimeOptions) { _growthStatisticAppService = growthStatisticAppService; _logger = logger; _referralRefreshTimeOptions = referralRefreshTimeOptions.Value; - Timer.Period = _referralRefreshTimeOptions.TimePeriod; + Period = _referralRefreshTimeOptions.TimePeriod; } - protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + protected override async Task DoWorkAsync() { _logger.LogDebug("Sync referral data starting...."); await _growthStatisticAppService.CalculateReferralRankAsync(); diff --git a/src/CAServer.EntityEventHandler.Core/Worker/TokenPriceBackgroundWorker.cs b/src/CAServer.EntityEventHandler.Core/Worker/TokenPriceBackgroundWorker.cs index 603d62541..4a88beb69 100644 --- a/src/CAServer.EntityEventHandler.Core/Worker/TokenPriceBackgroundWorker.cs +++ b/src/CAServer.EntityEventHandler.Core/Worker/TokenPriceBackgroundWorker.cs @@ -1,112 +1,41 @@ using System; -using System.Threading; using System.Threading.Tasks; -using CAServer.Commons; using CAServer.Options; +using CAServer.ScheduledTask; using CAServer.Tokens.TokenPrice; -using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.Caching; -using Volo.Abp.DistributedLocking; -using Volo.Abp.Threading; namespace CAServer.EntityEventHandler.Core.Worker; -public class TokenPriceBackgroundWorker : AsyncPeriodicBackgroundWorkerBase +public class TokenPriceBackgroundWorker : ScheduledTaskBase { private readonly ILogger _logger; - private readonly IAbpDistributedLock _distributedLock; - private readonly IDistributedCache _distributedCache; private readonly TokenPriceWorkerOption _tokenPriceWorkerOption; private readonly ITokenPriceService _tokenPriceService; - private readonly string _hostName; - private readonly string _workerNameKey; - private readonly string _workerLockKey; - - public TokenPriceBackgroundWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory, - IAbpDistributedLock distributedLock, ILogger logger, - IDistributedCache distributedCache, IOptionsMonitor tokenPriceWorkerOption, - ITokenPriceService tokenPriceService) : base(timer, serviceScopeFactory) + public TokenPriceBackgroundWorker( + ILogger logger, + IOptionsMonitor tokenPriceWorkerOption, + ITokenPriceService tokenPriceService) { _logger = logger; - _distributedLock = distributedLock; - _distributedCache = distributedCache; _tokenPriceService = tokenPriceService; _tokenPriceWorkerOption = tokenPriceWorkerOption.CurrentValue; - - timer.Period = _tokenPriceWorkerOption.Period * 1000; - timer.RunOnStart = true; - - _hostName = $"{HostHelper.GetLocalHostName()}_{Guid.NewGuid()}"; - _workerNameKey = $"{_tokenPriceWorkerOption.Prefix}:{_tokenPriceWorkerOption.WorkerNameKey}"; - _workerLockKey = $"{_tokenPriceWorkerOption.Prefix}:{_tokenPriceWorkerOption.WorkerLockKey}"; - } - - public override async Task StopAsync(CancellationToken cancellationToken = default) - { - _logger.LogInformation("stopping token price background worker start..."); - await base.StopAsync(cancellationToken); - - try - { - var workName = await _distributedCache.GetAsync(_workerNameKey); - if (!workName.IsNullOrWhiteSpace() && workName != this._hostName) - { - return; - } - - _logger.LogInformation("TokenPriceWorker:remove current worker... {0}", workName); - await _distributedCache.RemoveAsync(_workerNameKey); - _logger.LogInformation("TokenPriceWorker:remove current worker finished..."); - } - catch (Exception e) - { - _logger.LogError(e, "TokenPriceWorker: stop Workder error."); - } - _logger.LogInformation("stoping token price background worker finished..."); + Period = _tokenPriceWorkerOption.Period * 1000; } - protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + protected override async Task DoWorkAsync() { - _logger.LogInformation("token price background worker start... {0}", _hostName); try { - var workName = await _distributedCache.GetAsync(_workerNameKey); - if (!workName.IsNullOrWhiteSpace() && workName != this._hostName) - { - _logger.LogInformation("TokenPriceWorker: running worker: {0}", workName); - return; - } - - using (var lockHandle = _distributedLock.TryAcquireAsync(_workerLockKey)) - { - if (lockHandle == null) - { - _logger.LogWarning("TokenPriceWorker: lock fail."); - return; - } - - _logger.LogInformation("TokenPriceWorker: update current worker... {0}", _hostName); - await _distributedCache.SetAsync(_workerNameKey, _hostName, new DistributedCacheEntryOptions - { - AbsoluteExpirationRelativeToNow = - TimeSpan.FromSeconds(_tokenPriceWorkerOption.Period + _tokenPriceWorkerOption.Period / 2) - }); - _logger.LogInformation("TokenPriceWorker: update token price...."); - - await _tokenPriceService.RefreshCurrentPriceAsync(); - - _logger.LogInformation("TokenPriceWorker: update token price finished..."); - } + _logger.LogInformation("TokenPriceWorker: update token price...."); + await _tokenPriceService.RefreshCurrentPriceAsync(); + _logger.LogInformation("TokenPriceWorker: update token price finished..."); } catch (Exception e) { _logger.LogError(e, "TokenPriceWorker: task error."); } - _logger.LogInformation("token price background worker finished..."); } } \ No newline at end of file diff --git a/src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs b/src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs index ee1fdbe5d..51de78b19 100644 --- a/src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs +++ b/src/CAServer.EntityEventHandler/CAServerEntityEventHandlerModule.cs @@ -13,6 +13,7 @@ using CAServer.Nightingale.Orleans.Filters; using CAServer.Options; using CAServer.Redis; +using CAServer.ScheduledTask; using CAServer.Tokens.TokenPrice.Provider.FeiXiaoHao; using GraphQL.Client.Abstractions; using GraphQL.Client.Http; @@ -190,21 +191,26 @@ public override void OnPreApplicationInitialization(ApplicationInitializationCon public override void OnApplicationInitialization(ApplicationInitializationContext context) { var backgroundWorkerManger = context.ServiceProvider.GetRequiredService(); - backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - // backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); + //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - //backgroundWorkerManger.AddAsync(context.ServiceProvider.GetService()); - + + context.AddWorker(); + context.AddWorker(); + context.AddWorker(); + context.AddWorker(); + //context.AddWorker(); + InitRecurringJob(context.ServiceProvider); ConfigurationProvidersHelper.DisplayConfigurationProviders(context); } + + private static void InitRecurringJob(IServiceProvider serviceProvider) + { + var jobsService = serviceProvider.GetRequiredService(); + jobsService.InitRecurringWorkers(); + } public override void OnApplicationShutdown(ApplicationShutdownContext context) {