From 0bb3b0ebaea016654b8164e803dfaf5ea7907787 Mon Sep 17 00:00:00 2001 From: Konstantin Ryazantsev Date: Thu, 21 Mar 2024 15:48:35 +0300 Subject: [PATCH] LM-3196 implemented API trading blocking --- settings.yaml | 3 + .../Configuration/RabbitMqConfig.cs | 9 +- .../Configuration/RabbitMqConnection.cs | 8 ++ .../Configuration/ServicesConfig.cs | 2 +- .../{UserExxtensions.cs => UserExtensions.cs} | 4 +- src/HftApi/HftApi.csproj | 2 +- src/HftApi/Modules/AutofacModule.cs | 13 ++- .../ClientSettingsUpdatesSubscriber.cs | 102 ++++++++++++++++++ .../RabbitSubscribers/KeyUpdateSubscriber.cs | 58 +++++++--- .../ClientSettingsCashoutBlockUpdated.cs | 10 ++ .../Services/IBlockedClientsService.cs | 9 ++ .../Services/ITokenService.cs | 2 +- src/Lykke.HftApi.Services/AssetsService.cs | 4 +- .../BlockedClientsService.cs | 38 +++++++ .../Lykke.HftApi.Services.csproj | 3 +- src/Lykke.HftApi.Services/TokenService.cs | 25 +++-- 16 files changed, 252 insertions(+), 40 deletions(-) create mode 100644 src/HftApi.Common/Configuration/RabbitMqConnection.cs rename src/HftApi/Extensions/{UserExxtensions.cs => UserExtensions.cs} (93%) create mode 100644 src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs create mode 100644 src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs create mode 100644 src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs create mode 100644 src/Lykke.HftApi.Services/BlockedClientsService.cs diff --git a/settings.yaml b/settings.yaml index 78e9954..77f1fef 100644 --- a/settings.yaml +++ b/settings.yaml @@ -84,6 +84,9 @@ RabbitMq: settings-key: RabbitSpotPricesConnString ExchangeName: settings-key: PublicTradesExchangeName + ClientAccountFeedConnectionString: + settings-key: HFT-ClientAccountFeed + types: [RabbitMq] SagasRabbitMq: RabbitConnectionString: settings-key: RabbitSagas diff --git a/src/HftApi.Common/Configuration/RabbitMqConfig.cs b/src/HftApi.Common/Configuration/RabbitMqConfig.cs index 485e9b4..17cc000 100644 --- a/src/HftApi.Common/Configuration/RabbitMqConfig.cs +++ b/src/HftApi.Common/Configuration/RabbitMqConfig.cs @@ -1,4 +1,4 @@ -namespace HftApi.Common.Configuration +namespace HftApi.Common.Configuration { public class RabbitMqConfig { @@ -7,11 +7,6 @@ public class RabbitMqConfig public RabbitMqConnection Balances { get; set; } public RabbitMqConnection Orders { get; set; } public RabbitMqConnection PublicTrades { get; set; } - } - - public class RabbitMqConnection - { - public string ConnectionString { get; set; } - public string ExchangeName { get; set; } + public string ClientAccountFeedConnectionString { get; set; } } } diff --git a/src/HftApi.Common/Configuration/RabbitMqConnection.cs b/src/HftApi.Common/Configuration/RabbitMqConnection.cs new file mode 100644 index 0000000..833b93f --- /dev/null +++ b/src/HftApi.Common/Configuration/RabbitMqConnection.cs @@ -0,0 +1,8 @@ +namespace HftApi.Common.Configuration +{ + public class RabbitMqConnection + { + public string ConnectionString { get; set; } + public string ExchangeName { get; set; } + } +} diff --git a/src/HftApi.Common/Configuration/ServicesConfig.cs b/src/HftApi.Common/Configuration/ServicesConfig.cs index 5d72626..05deb7f 100644 --- a/src/HftApi.Common/Configuration/ServicesConfig.cs +++ b/src/HftApi.Common/Configuration/ServicesConfig.cs @@ -1,4 +1,4 @@ -using Lykke.Service.Kyc.Client; +using Lykke.Service.Kyc.Client; namespace HftApi.Common.Configuration { diff --git a/src/HftApi/Extensions/UserExxtensions.cs b/src/HftApi/Extensions/UserExtensions.cs similarity index 93% rename from src/HftApi/Extensions/UserExxtensions.cs rename to src/HftApi/Extensions/UserExtensions.cs index 13bdcf6..f169910 100644 --- a/src/HftApi/Extensions/UserExxtensions.cs +++ b/src/HftApi/Extensions/UserExtensions.cs @@ -1,9 +1,9 @@ -using System.Linq; +using System.Linq; using System.Security.Claims; namespace HftApi.Extensions { - public static class UserExxtensions + public static class UserExtensions { public static string GetClientId(this ClaimsPrincipal user) { diff --git a/src/HftApi/HftApi.csproj b/src/HftApi/HftApi.csproj index 3aa4d4a..f40505b 100644 --- a/src/HftApi/HftApi.csproj +++ b/src/HftApi/HftApi.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/HftApi/Modules/AutofacModule.cs b/src/HftApi/Modules/AutofacModule.cs index 099cb16..c95afdb 100644 --- a/src/HftApi/Modules/AutofacModule.cs +++ b/src/HftApi/Modules/AutofacModule.cs @@ -1,4 +1,4 @@ -using System; +using System; using Autofac; using AzureStorage; using AzureStorage.Tables; @@ -52,6 +52,9 @@ protected override void Load(ContainerBuilder builder) .WithParameter(TypedParameter.From(_config.Redis.OrderBooksCacheKeyPattern)) .SingleInstance(); + builder.RegisterType() + .As(); + var cache = new RedisCache(new RedisCacheOptions { Configuration = _config.Redis.RedisConfiguration, @@ -80,6 +83,12 @@ protected override void Load(ContainerBuilder builder) .WithParameter("exchangeName", _config.RabbitMq.HftInternal.ExchangeName) .SingleInstance(); + builder.RegisterType() + .As() + .AutoActivate() + .WithParameter("connectionString", _config.RabbitMq.ClientAccountFeedConnectionString) + .SingleInstance(); + builder.RegisterHftInternalClient(_config.Services.HftInternalServiceUrl); builder.RegisterType() @@ -187,7 +196,7 @@ protected override void Load(ContainerBuilder builder) builder.RegisterOperationsClient(_config.Services.OperationsServiceUrl); builder.RegisterClientDialogsClient(_config.Services.ClientDialogsServiceUrl); - + builder.RegisterInstance( new Swisschain.Sirius.Api.ApiClient.ApiClient(_config.Services.SiriusApiServiceClient.GrpcServiceUrl, _config.Services.SiriusApiServiceClient.ApiKey) ).As(); diff --git a/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs b/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs new file mode 100644 index 0000000..cce4824 --- /dev/null +++ b/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs @@ -0,0 +1,102 @@ +using System; +using System.Threading.Tasks; +using Autofac; +using Common.Log; +using HftApi.RabbitSubscribers.Messages; +using Lykke.Common.Log; +using Lykke.HftApi.Domain.Services; +using Lykke.RabbitMqBroker; +using Lykke.RabbitMqBroker.Subscriber; +using Lykke.Service.HftInternalService.Client; + +namespace HftApi.RabbitSubscribers +{ + public class ClientSettingsUpdatesSubscriber : IStartable, IDisposable + { + private readonly RabbitMqSubscriber _subscriber; + private readonly IHftInternalClient _hftInternalClient; + private readonly ITokenService _tokenService; + private readonly ILog _log; + + public ClientSettingsUpdatesSubscriber(ILogFactory logFactory, + string connectionString, + IHftInternalClient hftInternalClient, + ITokenService tokenService) + { + _hftInternalClient = hftInternalClient; + _tokenService = tokenService; + _log = logFactory.CreateLog(this); + + var subscriptionSettings = RabbitMqSubscriptionSettings + .ForSubscriber( + connectionString, + "client-account.client-settings-updated", + $"client-account.client-settings-updated.hft-api-v2-{Environment.MachineName}") + .UseRoutingKey(nameof(ClientSettingsCashoutBlockUpdated)); + + subscriptionSettings.DeadLetterExchangeName = null; + + var strategy = new DefaultErrorHandlingStrategy(logFactory, subscriptionSettings); + + _subscriber = new RabbitMqSubscriber(logFactory, subscriptionSettings, strategy) + .SetMessageDeserializer(new JsonMessageDeserializer()) + .SetMessageReadStrategy(new MessageReadWithTemporaryQueueStrategy()) + .Subscribe(HandleMessage); + } + + public async Task HandleMessage(ClientSettingsCashoutBlockUpdated evt) + { + // Race condition with KeyUpdatedEvent event handling is possible, but it is decided + // that it's acceptable since these events are not very frequent + + _log.Info($"Got client trades blocking update. Trades are blocked: {evt.TradesBlocked}", context: new + { + ClientId = evt.ClientId + }); + + var enabledClientApiKeys = await _hftInternalClient.Keys.GetKeys(evt.ClientId); + + if (evt.TradesBlocked) + { + foreach (var key in enabledClientApiKeys) + { + _tokenService.Remove(key.ApiKey); + + var apiKeyStart = key.ApiKey.Substring(0, 4); + + _log.Info($"API key has been cached", context: new + { + ClientId = evt.ClientId, + ApiKeyStart = apiKeyStart + }); + } + } + else + { + foreach (var key in enabledClientApiKeys) + { + _tokenService.Add(key.ApiKey); + + var apiKeyStart = key.ApiKey.Substring(0, 4); + + _log.Info($"API key has been evicted from the cache", context: new + { + ClientId = evt.ClientId, + ApiKeyStart = apiKeyStart + }); + } + } + } + + public void Start() + { + _subscriber.Start(); + } + + public void Dispose() + { + _subscriber?.Stop(); + _subscriber?.Dispose(); + } + } +} diff --git a/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs b/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs index b56d816..51df31c 100644 --- a/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs +++ b/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs @@ -1,12 +1,12 @@ -using System; +using System; using System.Threading.Tasks; using Autofac; +using Common.Log; using Lykke.Common.Log; using Lykke.HftApi.Domain.Services; using Lykke.RabbitMqBroker; using Lykke.RabbitMqBroker.Subscriber; using Lykke.Service.HftInternalService.Client.Messages; -using Microsoft.Extensions.Logging; namespace HftApi.RabbitSubscribers { @@ -16,25 +16,27 @@ public class KeyUpdateSubscriber : IStartable, IDisposable private readonly string _exchangeName; private readonly ITokenService _tokenService; private readonly ILogFactory _logFactory; + private readonly ILog _log; + private readonly IBlockedClientsService _blockedClients; private RabbitMqSubscriber _subscriber; public KeyUpdateSubscriber( string connectionString, string exchangeName, ITokenService tokenService, - ILoggerFactory loggerFactory, - ILogFactory logFactory) + ILogFactory logFactory, + IBlockedClientsService blockedClients) { + _log = logFactory.CreateLog(this); _connectionString = connectionString; _exchangeName = exchangeName; _tokenService = tokenService; _logFactory = logFactory; + _blockedClients = blockedClients; } public void Start() { - _tokenService.InitAsync().GetAwaiter().GetResult(); - var settings = RabbitMqSubscriptionSettings .ForSubscriber(_connectionString, _exchangeName, $"{nameof(KeyUpdateSubscriber)}-{Environment.MachineName}"); @@ -42,25 +44,57 @@ public void Start() _subscriber = new RabbitMqSubscriber(_logFactory, settings, - new ResilientErrorHandlingStrategy(_logFactory, settings, TimeSpan.FromSeconds(10))) + new ResilientErrorHandlingStrategy(_log, settings, TimeSpan.FromSeconds(10))) .SetMessageDeserializer(new JsonMessageDeserializer()) .Subscribe(ProcessMessageAsync) .CreateDefaultBinding() .Start(); + + _tokenService.InitAsync().GetAwaiter().GetResult(); } - private Task ProcessMessageAsync(KeyUpdatedEvent message) + private async Task ProcessMessageAsync(KeyUpdatedEvent message) { - if (message.IsDeleted) - _tokenService.Remove(message.Id); - else + // Race condition with ClientSettingsCashoutBlockUpdated event handling is possible, but it is decided + // that it's acceptable since these events are not very frequent + + var isClientBlocked = await _blockedClients.IsClientBlocked(message.ClientId); + var apiKeyStart = message.Id.Substring(0, 4); + + _log.Info($"API key deleted: {message.IsDeleted}. Client blocked: {isClientBlocked}", context: new + { + ClientId = message.ClientId, + WalletId = message.WalletId, + ApiKeyStart = apiKeyStart + }); + + if (!message.IsDeleted && !isClientBlocked) + { _tokenService.Add(message.Id); - return Task.CompletedTask; + _log.Info($"API key has been cached", context: new + { + ClientId = message.ClientId, + WalletId = message.WalletId, + ApiKeyStart = apiKeyStart + }); + } + else + { + _tokenService.Remove(message.Id); + + _log.Info($"API key has been evicted from the cache", context: new + { + ClientId = message.ClientId, + WalletId = message.WalletId, + ApiKeyStart = apiKeyStart + }); + } } public void Dispose() { + _subscriber?.Stop(); _subscriber?.Dispose(); } } diff --git a/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs b/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs new file mode 100644 index 0000000..4a68f8f --- /dev/null +++ b/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs @@ -0,0 +1,10 @@ +namespace HftApi.RabbitSubscribers.Messages +{ + // Owned by Lykke.Service.ClientAccount service + public class ClientSettingsCashoutBlockUpdated + { + public string ClientId { get; set; } + public bool CashOutBlocked { get; set; } + public bool TradesBlocked { get; set; } + } +} diff --git a/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs b/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs new file mode 100644 index 0000000..410fbce --- /dev/null +++ b/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Lykke.HftApi.Domain.Services +{ + public interface IBlockedClientsService + { + Task IsClientBlocked(string clientId); + } +} diff --git a/src/Lykke.HftApi.Domain/Services/ITokenService.cs b/src/Lykke.HftApi.Domain/Services/ITokenService.cs index 8e72bed..dd787f2 100644 --- a/src/Lykke.HftApi.Domain/Services/ITokenService.cs +++ b/src/Lykke.HftApi.Domain/Services/ITokenService.cs @@ -1,4 +1,4 @@ -using System.Threading.Tasks; +using System.Threading.Tasks; namespace Lykke.HftApi.Domain.Services { diff --git a/src/Lykke.HftApi.Services/AssetsService.cs b/src/Lykke.HftApi.Services/AssetsService.cs index 4044759..2ff20ca 100644 --- a/src/Lykke.HftApi.Services/AssetsService.cs +++ b/src/Lykke.HftApi.Services/AssetsService.cs @@ -1,12 +1,10 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Autofac; -using Lykke.HftApi.Domain; using Lykke.HftApi.Domain.Entities; using Lykke.HftApi.Domain.Entities.Assets; -using Lykke.HftApi.Domain.Exceptions; using Lykke.HftApi.Domain.Services; using Lykke.HftApi.Services.AssetsClient; diff --git a/src/Lykke.HftApi.Services/BlockedClientsService.cs b/src/Lykke.HftApi.Services/BlockedClientsService.cs new file mode 100644 index 0000000..a9b8652 --- /dev/null +++ b/src/Lykke.HftApi.Services/BlockedClientsService.cs @@ -0,0 +1,38 @@ +using System.Threading.Tasks; +using Common.Log; +using Lykke.Common.ApiLibrary.Exceptions; +using Lykke.Common.Log; +using Lykke.HftApi.Domain.Services; +using Lykke.Service.ClientAccount.Client; + +namespace Lykke.HftApi.Services +{ + public class BlockedClientsService : IBlockedClientsService + { + private readonly ILog _log; + private readonly IClientAccountClient _clientAccountClient; + + public BlockedClientsService(ILogFactory logFactory, IClientAccountClient clientAccountClient) + { + _log = logFactory.CreateLog(this); + _clientAccountClient = clientAccountClient; + } + + public async Task IsClientBlocked(string clientId) + { + try + { + return (await _clientAccountClient.ClientSettings.GetCashOutBlockSettingsAsync(clientId))?.TradesBlocked == true; + } + catch (ClientApiException ex) when (ex.Message == "Client not found") + { + _log.Warning("Client not found. It will be treated as blocked one", context: new + { + ClientId = clientId + }); + + return false; + } + } + } +} diff --git a/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj b/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj index 5c833e9..a540842 100644 --- a/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj +++ b/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj @@ -16,10 +16,11 @@ + - + diff --git a/src/Lykke.HftApi.Services/TokenService.cs b/src/Lykke.HftApi.Services/TokenService.cs index 2ab216f..bc047d1 100644 --- a/src/Lykke.HftApi.Services/TokenService.cs +++ b/src/Lykke.HftApi.Services/TokenService.cs @@ -1,4 +1,4 @@ -using System.Collections.Concurrent; +using System.Collections.Concurrent; using System.Threading.Tasks; using Lykke.HftApi.Domain.Services; using Lykke.Service.HftInternalService.Client; @@ -10,28 +10,35 @@ public class TokenService : ITokenService { private readonly IHftInternalClient _hftInternalClient; private readonly ILogger _logger; + private readonly IBlockedClientsService _blockedClients; private readonly ConcurrentDictionary _cache; public TokenService( IHftInternalClient hftInternalClient, - ILogger logger - ) + ILogger logger, + IBlockedClientsService blockedClients) { _hftInternalClient = hftInternalClient; _logger = logger; + _blockedClients = blockedClients; _cache = new ConcurrentDictionary(); } public async Task InitAsync() { - _logger.LogInformation("Getting key ids"); - var ids = await _hftInternalClient.Keys.GetAllKeyIds(); - _logger.LogInformation($"Caching {ids.Count} ids"); + _logger.LogInformation("API keys cache is being initialized"); - foreach (var id in ids) + var keys = await _hftInternalClient.Keys.GetAllKeys(); + + foreach (var key in keys) { - _cache.TryAdd(id, 0); + if (!await _blockedClients.IsClientBlocked(key.ClientId)) + { + _cache.TryAdd(key.ApiKey, 0); + } } + + _logger.LogInformation($"API keys cache has been initialized. {_cache.Count} active keys were added to the cache"); } public bool IsValid(string id) @@ -41,13 +48,11 @@ public bool IsValid(string id) public void Add(string id) { - _logger.LogInformation($"Adding {id} to cache"); _cache.TryAdd(id, 0); } public void Remove(string id) { - _logger.LogInformation($"Removing {id} from cache"); _cache.TryRemove(id, out _); } }