diff --git a/settings.yaml b/settings.yaml
index 78e9954..77f1fef 100644
--- a/settings.yaml
+++ b/settings.yaml
@@ -84,6 +84,9 @@ RabbitMq:
settings-key: RabbitSpotPricesConnString
ExchangeName:
settings-key: PublicTradesExchangeName
+ ClientAccountFeedConnectionString:
+ settings-key: HFT-ClientAccountFeed
+ types: [RabbitMq]
SagasRabbitMq:
RabbitConnectionString:
settings-key: RabbitSagas
diff --git a/src/HftApi.Common/Configuration/RabbitMqConfig.cs b/src/HftApi.Common/Configuration/RabbitMqConfig.cs
index 485e9b4..17cc000 100644
--- a/src/HftApi.Common/Configuration/RabbitMqConfig.cs
+++ b/src/HftApi.Common/Configuration/RabbitMqConfig.cs
@@ -1,4 +1,4 @@
-namespace HftApi.Common.Configuration
+namespace HftApi.Common.Configuration
{
public class RabbitMqConfig
{
@@ -7,11 +7,6 @@ public class RabbitMqConfig
public RabbitMqConnection Balances { get; set; }
public RabbitMqConnection Orders { get; set; }
public RabbitMqConnection PublicTrades { get; set; }
- }
-
- public class RabbitMqConnection
- {
- public string ConnectionString { get; set; }
- public string ExchangeName { get; set; }
+ public string ClientAccountFeedConnectionString { get; set; }
}
}
diff --git a/src/HftApi.Common/Configuration/RabbitMqConnection.cs b/src/HftApi.Common/Configuration/RabbitMqConnection.cs
new file mode 100644
index 0000000..833b93f
--- /dev/null
+++ b/src/HftApi.Common/Configuration/RabbitMqConnection.cs
@@ -0,0 +1,8 @@
+namespace HftApi.Common.Configuration
+{
+ public class RabbitMqConnection
+ {
+ public string ConnectionString { get; set; }
+ public string ExchangeName { get; set; }
+ }
+}
diff --git a/src/HftApi.Common/Configuration/ServicesConfig.cs b/src/HftApi.Common/Configuration/ServicesConfig.cs
index 5d72626..05deb7f 100644
--- a/src/HftApi.Common/Configuration/ServicesConfig.cs
+++ b/src/HftApi.Common/Configuration/ServicesConfig.cs
@@ -1,4 +1,4 @@
-using Lykke.Service.Kyc.Client;
+using Lykke.Service.Kyc.Client;
namespace HftApi.Common.Configuration
{
diff --git a/src/HftApi/Extensions/UserExxtensions.cs b/src/HftApi/Extensions/UserExtensions.cs
similarity index 93%
rename from src/HftApi/Extensions/UserExxtensions.cs
rename to src/HftApi/Extensions/UserExtensions.cs
index 13bdcf6..f169910 100644
--- a/src/HftApi/Extensions/UserExxtensions.cs
+++ b/src/HftApi/Extensions/UserExtensions.cs
@@ -1,9 +1,9 @@
-using System.Linq;
+using System.Linq;
using System.Security.Claims;
namespace HftApi.Extensions
{
- public static class UserExxtensions
+ public static class UserExtensions
{
public static string GetClientId(this ClaimsPrincipal user)
{
diff --git a/src/HftApi/HftApi.csproj b/src/HftApi/HftApi.csproj
index 3aa4d4a..f40505b 100644
--- a/src/HftApi/HftApi.csproj
+++ b/src/HftApi/HftApi.csproj
@@ -21,7 +21,7 @@
-
+
diff --git a/src/HftApi/Modules/AutofacModule.cs b/src/HftApi/Modules/AutofacModule.cs
index 099cb16..c95afdb 100644
--- a/src/HftApi/Modules/AutofacModule.cs
+++ b/src/HftApi/Modules/AutofacModule.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using Autofac;
using AzureStorage;
using AzureStorage.Tables;
@@ -52,6 +52,9 @@ protected override void Load(ContainerBuilder builder)
.WithParameter(TypedParameter.From(_config.Redis.OrderBooksCacheKeyPattern))
.SingleInstance();
+ builder.RegisterType()
+ .As();
+
var cache = new RedisCache(new RedisCacheOptions
{
Configuration = _config.Redis.RedisConfiguration,
@@ -80,6 +83,12 @@ protected override void Load(ContainerBuilder builder)
.WithParameter("exchangeName", _config.RabbitMq.HftInternal.ExchangeName)
.SingleInstance();
+ builder.RegisterType()
+ .As()
+ .AutoActivate()
+ .WithParameter("connectionString", _config.RabbitMq.ClientAccountFeedConnectionString)
+ .SingleInstance();
+
builder.RegisterHftInternalClient(_config.Services.HftInternalServiceUrl);
builder.RegisterType()
@@ -187,7 +196,7 @@ protected override void Load(ContainerBuilder builder)
builder.RegisterOperationsClient(_config.Services.OperationsServiceUrl);
builder.RegisterClientDialogsClient(_config.Services.ClientDialogsServiceUrl);
-
+
builder.RegisterInstance(
new Swisschain.Sirius.Api.ApiClient.ApiClient(_config.Services.SiriusApiServiceClient.GrpcServiceUrl, _config.Services.SiriusApiServiceClient.ApiKey)
).As();
diff --git a/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs b/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs
new file mode 100644
index 0000000..cce4824
--- /dev/null
+++ b/src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs
@@ -0,0 +1,102 @@
+using System;
+using System.Threading.Tasks;
+using Autofac;
+using Common.Log;
+using HftApi.RabbitSubscribers.Messages;
+using Lykke.Common.Log;
+using Lykke.HftApi.Domain.Services;
+using Lykke.RabbitMqBroker;
+using Lykke.RabbitMqBroker.Subscriber;
+using Lykke.Service.HftInternalService.Client;
+
+namespace HftApi.RabbitSubscribers
+{
+ public class ClientSettingsUpdatesSubscriber : IStartable, IDisposable
+ {
+ private readonly RabbitMqSubscriber _subscriber;
+ private readonly IHftInternalClient _hftInternalClient;
+ private readonly ITokenService _tokenService;
+ private readonly ILog _log;
+
+ public ClientSettingsUpdatesSubscriber(ILogFactory logFactory,
+ string connectionString,
+ IHftInternalClient hftInternalClient,
+ ITokenService tokenService)
+ {
+ _hftInternalClient = hftInternalClient;
+ _tokenService = tokenService;
+ _log = logFactory.CreateLog(this);
+
+ var subscriptionSettings = RabbitMqSubscriptionSettings
+ .ForSubscriber(
+ connectionString,
+ "client-account.client-settings-updated",
+ $"client-account.client-settings-updated.hft-api-v2-{Environment.MachineName}")
+ .UseRoutingKey(nameof(ClientSettingsCashoutBlockUpdated));
+
+ subscriptionSettings.DeadLetterExchangeName = null;
+
+ var strategy = new DefaultErrorHandlingStrategy(logFactory, subscriptionSettings);
+
+ _subscriber = new RabbitMqSubscriber(logFactory, subscriptionSettings, strategy)
+ .SetMessageDeserializer(new JsonMessageDeserializer())
+ .SetMessageReadStrategy(new MessageReadWithTemporaryQueueStrategy())
+ .Subscribe(HandleMessage);
+ }
+
+ public async Task HandleMessage(ClientSettingsCashoutBlockUpdated evt)
+ {
+ // Race condition with KeyUpdatedEvent event handling is possible, but it is decided
+ // that it's acceptable since these events are not very frequent
+
+ _log.Info($"Got client trades blocking update. Trades are blocked: {evt.TradesBlocked}", context: new
+ {
+ ClientId = evt.ClientId
+ });
+
+ var enabledClientApiKeys = await _hftInternalClient.Keys.GetKeys(evt.ClientId);
+
+ if (evt.TradesBlocked)
+ {
+ foreach (var key in enabledClientApiKeys)
+ {
+ _tokenService.Remove(key.ApiKey);
+
+ var apiKeyStart = key.ApiKey.Substring(0, 4);
+
+ _log.Info($"API key has been cached", context: new
+ {
+ ClientId = evt.ClientId,
+ ApiKeyStart = apiKeyStart
+ });
+ }
+ }
+ else
+ {
+ foreach (var key in enabledClientApiKeys)
+ {
+ _tokenService.Add(key.ApiKey);
+
+ var apiKeyStart = key.ApiKey.Substring(0, 4);
+
+ _log.Info($"API key has been evicted from the cache", context: new
+ {
+ ClientId = evt.ClientId,
+ ApiKeyStart = apiKeyStart
+ });
+ }
+ }
+ }
+
+ public void Start()
+ {
+ _subscriber.Start();
+ }
+
+ public void Dispose()
+ {
+ _subscriber?.Stop();
+ _subscriber?.Dispose();
+ }
+ }
+}
diff --git a/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs b/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs
index b56d816..51df31c 100644
--- a/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs
+++ b/src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs
@@ -1,12 +1,12 @@
-using System;
+using System;
using System.Threading.Tasks;
using Autofac;
+using Common.Log;
using Lykke.Common.Log;
using Lykke.HftApi.Domain.Services;
using Lykke.RabbitMqBroker;
using Lykke.RabbitMqBroker.Subscriber;
using Lykke.Service.HftInternalService.Client.Messages;
-using Microsoft.Extensions.Logging;
namespace HftApi.RabbitSubscribers
{
@@ -16,25 +16,27 @@ public class KeyUpdateSubscriber : IStartable, IDisposable
private readonly string _exchangeName;
private readonly ITokenService _tokenService;
private readonly ILogFactory _logFactory;
+ private readonly ILog _log;
+ private readonly IBlockedClientsService _blockedClients;
private RabbitMqSubscriber _subscriber;
public KeyUpdateSubscriber(
string connectionString,
string exchangeName,
ITokenService tokenService,
- ILoggerFactory loggerFactory,
- ILogFactory logFactory)
+ ILogFactory logFactory,
+ IBlockedClientsService blockedClients)
{
+ _log = logFactory.CreateLog(this);
_connectionString = connectionString;
_exchangeName = exchangeName;
_tokenService = tokenService;
_logFactory = logFactory;
+ _blockedClients = blockedClients;
}
public void Start()
{
- _tokenService.InitAsync().GetAwaiter().GetResult();
-
var settings = RabbitMqSubscriptionSettings
.ForSubscriber(_connectionString, _exchangeName, $"{nameof(KeyUpdateSubscriber)}-{Environment.MachineName}");
@@ -42,25 +44,57 @@ public void Start()
_subscriber = new RabbitMqSubscriber(_logFactory,
settings,
- new ResilientErrorHandlingStrategy(_logFactory, settings, TimeSpan.FromSeconds(10)))
+ new ResilientErrorHandlingStrategy(_log, settings, TimeSpan.FromSeconds(10)))
.SetMessageDeserializer(new JsonMessageDeserializer())
.Subscribe(ProcessMessageAsync)
.CreateDefaultBinding()
.Start();
+
+ _tokenService.InitAsync().GetAwaiter().GetResult();
}
- private Task ProcessMessageAsync(KeyUpdatedEvent message)
+ private async Task ProcessMessageAsync(KeyUpdatedEvent message)
{
- if (message.IsDeleted)
- _tokenService.Remove(message.Id);
- else
+ // Race condition with ClientSettingsCashoutBlockUpdated event handling is possible, but it is decided
+ // that it's acceptable since these events are not very frequent
+
+ var isClientBlocked = await _blockedClients.IsClientBlocked(message.ClientId);
+ var apiKeyStart = message.Id.Substring(0, 4);
+
+ _log.Info($"API key deleted: {message.IsDeleted}. Client blocked: {isClientBlocked}", context: new
+ {
+ ClientId = message.ClientId,
+ WalletId = message.WalletId,
+ ApiKeyStart = apiKeyStart
+ });
+
+ if (!message.IsDeleted && !isClientBlocked)
+ {
_tokenService.Add(message.Id);
- return Task.CompletedTask;
+ _log.Info($"API key has been cached", context: new
+ {
+ ClientId = message.ClientId,
+ WalletId = message.WalletId,
+ ApiKeyStart = apiKeyStart
+ });
+ }
+ else
+ {
+ _tokenService.Remove(message.Id);
+
+ _log.Info($"API key has been evicted from the cache", context: new
+ {
+ ClientId = message.ClientId,
+ WalletId = message.WalletId,
+ ApiKeyStart = apiKeyStart
+ });
+ }
}
public void Dispose()
{
+ _subscriber?.Stop();
_subscriber?.Dispose();
}
}
diff --git a/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs b/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs
new file mode 100644
index 0000000..4a68f8f
--- /dev/null
+++ b/src/HftApi/RabbitSubscribers/Messages/ClientSettingsCashoutBlockUpdated.cs
@@ -0,0 +1,10 @@
+namespace HftApi.RabbitSubscribers.Messages
+{
+ // Owned by Lykke.Service.ClientAccount service
+ public class ClientSettingsCashoutBlockUpdated
+ {
+ public string ClientId { get; set; }
+ public bool CashOutBlocked { get; set; }
+ public bool TradesBlocked { get; set; }
+ }
+}
diff --git a/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs b/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs
new file mode 100644
index 0000000..410fbce
--- /dev/null
+++ b/src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace Lykke.HftApi.Domain.Services
+{
+ public interface IBlockedClientsService
+ {
+ Task IsClientBlocked(string clientId);
+ }
+}
diff --git a/src/Lykke.HftApi.Domain/Services/ITokenService.cs b/src/Lykke.HftApi.Domain/Services/ITokenService.cs
index 8e72bed..dd787f2 100644
--- a/src/Lykke.HftApi.Domain/Services/ITokenService.cs
+++ b/src/Lykke.HftApi.Domain/Services/ITokenService.cs
@@ -1,4 +1,4 @@
-using System.Threading.Tasks;
+using System.Threading.Tasks;
namespace Lykke.HftApi.Domain.Services
{
diff --git a/src/Lykke.HftApi.Services/AssetsService.cs b/src/Lykke.HftApi.Services/AssetsService.cs
index 4044759..2ff20ca 100644
--- a/src/Lykke.HftApi.Services/AssetsService.cs
+++ b/src/Lykke.HftApi.Services/AssetsService.cs
@@ -1,12 +1,10 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Autofac;
-using Lykke.HftApi.Domain;
using Lykke.HftApi.Domain.Entities;
using Lykke.HftApi.Domain.Entities.Assets;
-using Lykke.HftApi.Domain.Exceptions;
using Lykke.HftApi.Domain.Services;
using Lykke.HftApi.Services.AssetsClient;
diff --git a/src/Lykke.HftApi.Services/BlockedClientsService.cs b/src/Lykke.HftApi.Services/BlockedClientsService.cs
new file mode 100644
index 0000000..a9b8652
--- /dev/null
+++ b/src/Lykke.HftApi.Services/BlockedClientsService.cs
@@ -0,0 +1,38 @@
+using System.Threading.Tasks;
+using Common.Log;
+using Lykke.Common.ApiLibrary.Exceptions;
+using Lykke.Common.Log;
+using Lykke.HftApi.Domain.Services;
+using Lykke.Service.ClientAccount.Client;
+
+namespace Lykke.HftApi.Services
+{
+ public class BlockedClientsService : IBlockedClientsService
+ {
+ private readonly ILog _log;
+ private readonly IClientAccountClient _clientAccountClient;
+
+ public BlockedClientsService(ILogFactory logFactory, IClientAccountClient clientAccountClient)
+ {
+ _log = logFactory.CreateLog(this);
+ _clientAccountClient = clientAccountClient;
+ }
+
+ public async Task IsClientBlocked(string clientId)
+ {
+ try
+ {
+ return (await _clientAccountClient.ClientSettings.GetCashOutBlockSettingsAsync(clientId))?.TradesBlocked == true;
+ }
+ catch (ClientApiException ex) when (ex.Message == "Client not found")
+ {
+ _log.Warning("Client not found. It will be treated as blocked one", context: new
+ {
+ ClientId = clientId
+ });
+
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj b/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj
index 5c833e9..a540842 100644
--- a/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj
+++ b/src/Lykke.HftApi.Services/Lykke.HftApi.Services.csproj
@@ -16,10 +16,11 @@
+
-
+
diff --git a/src/Lykke.HftApi.Services/TokenService.cs b/src/Lykke.HftApi.Services/TokenService.cs
index 2ab216f..bc047d1 100644
--- a/src/Lykke.HftApi.Services/TokenService.cs
+++ b/src/Lykke.HftApi.Services/TokenService.cs
@@ -1,4 +1,4 @@
-using System.Collections.Concurrent;
+using System.Collections.Concurrent;
using System.Threading.Tasks;
using Lykke.HftApi.Domain.Services;
using Lykke.Service.HftInternalService.Client;
@@ -10,28 +10,35 @@ public class TokenService : ITokenService
{
private readonly IHftInternalClient _hftInternalClient;
private readonly ILogger _logger;
+ private readonly IBlockedClientsService _blockedClients;
private readonly ConcurrentDictionary _cache;
public TokenService(
IHftInternalClient hftInternalClient,
- ILogger logger
- )
+ ILogger logger,
+ IBlockedClientsService blockedClients)
{
_hftInternalClient = hftInternalClient;
_logger = logger;
+ _blockedClients = blockedClients;
_cache = new ConcurrentDictionary();
}
public async Task InitAsync()
{
- _logger.LogInformation("Getting key ids");
- var ids = await _hftInternalClient.Keys.GetAllKeyIds();
- _logger.LogInformation($"Caching {ids.Count} ids");
+ _logger.LogInformation("API keys cache is being initialized");
- foreach (var id in ids)
+ var keys = await _hftInternalClient.Keys.GetAllKeys();
+
+ foreach (var key in keys)
{
- _cache.TryAdd(id, 0);
+ if (!await _blockedClients.IsClientBlocked(key.ClientId))
+ {
+ _cache.TryAdd(key.ApiKey, 0);
+ }
}
+
+ _logger.LogInformation($"API keys cache has been initialized. {_cache.Count} active keys were added to the cache");
}
public bool IsValid(string id)
@@ -41,13 +48,11 @@ public bool IsValid(string id)
public void Add(string id)
{
- _logger.LogInformation($"Adding {id} to cache");
_cache.TryAdd(id, 0);
}
public void Remove(string id)
{
- _logger.LogInformation($"Removing {id} from cache");
_cache.TryRemove(id, out _);
}
}