Skip to content

Commit

Permalink
LM-3196 implemented API trading blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
KonstantinRyazantsev committed Mar 21, 2024
1 parent f1712b3 commit 0bb3b0e
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 40 deletions.
3 changes: 3 additions & 0 deletions settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ RabbitMq:
settings-key: RabbitSpotPricesConnString
ExchangeName:
settings-key: PublicTradesExchangeName
ClientAccountFeedConnectionString:
settings-key: HFT-ClientAccountFeed
types: [RabbitMq]
SagasRabbitMq:
RabbitConnectionString:
settings-key: RabbitSagas
Expand Down
9 changes: 2 additions & 7 deletions src/HftApi.Common/Configuration/RabbitMqConfig.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace HftApi.Common.Configuration
namespace HftApi.Common.Configuration
{
public class RabbitMqConfig
{
Expand All @@ -7,11 +7,6 @@ public class RabbitMqConfig
public RabbitMqConnection Balances { get; set; }
public RabbitMqConnection Orders { get; set; }
public RabbitMqConnection PublicTrades { get; set; }
}

public class RabbitMqConnection
{
public string ConnectionString { get; set; }
public string ExchangeName { get; set; }
public string ClientAccountFeedConnectionString { get; set; }
}
}
8 changes: 8 additions & 0 deletions src/HftApi.Common/Configuration/RabbitMqConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace HftApi.Common.Configuration
{
public class RabbitMqConnection
{
public string ConnectionString { get; set; }
public string ExchangeName { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/HftApi.Common/Configuration/ServicesConfig.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Lykke.Service.Kyc.Client;
using Lykke.Service.Kyc.Client;

namespace HftApi.Common.Configuration
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System.Linq;
using System.Linq;
using System.Security.Claims;

namespace HftApi.Extensions
{
public static class UserExxtensions
public static class UserExtensions
{
public static string GetClientId(this ClaimsPrincipal user)
{
Expand Down
2 changes: 1 addition & 1 deletion src/HftApi/HftApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="Lykke.Messaging.RabbitMq" Version="2.2.2" />
<PackageReference Include="Lykke.Service.Balances.Client" Version="2.3.2" />
<PackageReference Include="Lykke.Service.ClientDialogs.Client" Version="1.0.1" />
<PackageReference Include="Lykke.Service.HftInternalService.Client" Version="2.1.1" />
<PackageReference Include="Lykke.Service.HftInternalService.Client" Version="2.4.0" />
<PackageReference Include="Lykke.Service.Kyc.Client" Version="1.5.0" />
<PackageReference Include="Lykke.Service.Operations.Client" Version="3.1.0" />
<PackageReference Include="Lykke.Service.Operations.Contracts" Version="3.1.1" />
Expand Down
13 changes: 11 additions & 2 deletions src/HftApi/Modules/AutofacModule.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using Autofac;
using AzureStorage;
using AzureStorage.Tables;
Expand Down Expand Up @@ -52,6 +52,9 @@ protected override void Load(ContainerBuilder builder)
.WithParameter(TypedParameter.From(_config.Redis.OrderBooksCacheKeyPattern))
.SingleInstance();

builder.RegisterType<BlockedClientsService>()
.As<IBlockedClientsService>();

var cache = new RedisCache(new RedisCacheOptions
{
Configuration = _config.Redis.RedisConfiguration,
Expand Down Expand Up @@ -80,6 +83,12 @@ protected override void Load(ContainerBuilder builder)
.WithParameter("exchangeName", _config.RabbitMq.HftInternal.ExchangeName)
.SingleInstance();

builder.RegisterType<ClientSettingsUpdatesSubscriber>()
.As<IStartable>()
.AutoActivate()
.WithParameter("connectionString", _config.RabbitMq.ClientAccountFeedConnectionString)
.SingleInstance();

builder.RegisterHftInternalClient(_config.Services.HftInternalServiceUrl);

builder.RegisterType<TokenService>()
Expand Down Expand Up @@ -187,7 +196,7 @@ protected override void Load(ContainerBuilder builder)
builder.RegisterOperationsClient(_config.Services.OperationsServiceUrl);

builder.RegisterClientDialogsClient(_config.Services.ClientDialogsServiceUrl);

builder.RegisterInstance(
new Swisschain.Sirius.Api.ApiClient.ApiClient(_config.Services.SiriusApiServiceClient.GrpcServiceUrl, _config.Services.SiriusApiServiceClient.ApiKey)
).As<Swisschain.Sirius.Api.ApiClient.IApiClient>();
Expand Down
102 changes: 102 additions & 0 deletions src/HftApi/RabbitSubscribers/ClientSettingsUpdatesSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using System.Threading.Tasks;
using Autofac;
using Common.Log;
using HftApi.RabbitSubscribers.Messages;
using Lykke.Common.Log;
using Lykke.HftApi.Domain.Services;
using Lykke.RabbitMqBroker;
using Lykke.RabbitMqBroker.Subscriber;
using Lykke.Service.HftInternalService.Client;

namespace HftApi.RabbitSubscribers
{
public class ClientSettingsUpdatesSubscriber : IStartable, IDisposable
{
private readonly RabbitMqSubscriber<ClientSettingsCashoutBlockUpdated> _subscriber;
private readonly IHftInternalClient _hftInternalClient;
private readonly ITokenService _tokenService;
private readonly ILog _log;

public ClientSettingsUpdatesSubscriber(ILogFactory logFactory,
string connectionString,
IHftInternalClient hftInternalClient,
ITokenService tokenService)
{
_hftInternalClient = hftInternalClient;
_tokenService = tokenService;
_log = logFactory.CreateLog(this);

var subscriptionSettings = RabbitMqSubscriptionSettings
.ForSubscriber(
connectionString,
"client-account.client-settings-updated",
$"client-account.client-settings-updated.hft-api-v2-{Environment.MachineName}")
.UseRoutingKey(nameof(ClientSettingsCashoutBlockUpdated));

subscriptionSettings.DeadLetterExchangeName = null;

var strategy = new DefaultErrorHandlingStrategy(logFactory, subscriptionSettings);

_subscriber = new RabbitMqSubscriber<ClientSettingsCashoutBlockUpdated>(logFactory, subscriptionSettings, strategy)
.SetMessageDeserializer(new JsonMessageDeserializer<ClientSettingsCashoutBlockUpdated>())
.SetMessageReadStrategy(new MessageReadWithTemporaryQueueStrategy())
.Subscribe(HandleMessage);
}

public async Task HandleMessage(ClientSettingsCashoutBlockUpdated evt)
{
// Race condition with KeyUpdatedEvent event handling is possible, but it is decided
// that it's acceptable since these events are not very frequent

_log.Info($"Got client trades blocking update. Trades are blocked: {evt.TradesBlocked}", context: new
{
ClientId = evt.ClientId
});

var enabledClientApiKeys = await _hftInternalClient.Keys.GetKeys(evt.ClientId);

if (evt.TradesBlocked)
{
foreach (var key in enabledClientApiKeys)
{
_tokenService.Remove(key.ApiKey);

var apiKeyStart = key.ApiKey.Substring(0, 4);

_log.Info($"API key has been cached", context: new
{
ClientId = evt.ClientId,
ApiKeyStart = apiKeyStart
});
}
}
else
{
foreach (var key in enabledClientApiKeys)
{
_tokenService.Add(key.ApiKey);

var apiKeyStart = key.ApiKey.Substring(0, 4);

_log.Info($"API key has been evicted from the cache", context: new
{
ClientId = evt.ClientId,
ApiKeyStart = apiKeyStart
});
}
}
}

public void Start()
{
_subscriber.Start();
}

public void Dispose()
{
_subscriber?.Stop();
_subscriber?.Dispose();
}
}
}
58 changes: 46 additions & 12 deletions src/HftApi/RabbitSubscribers/KeyUpdateSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using System;
using System;
using System.Threading.Tasks;
using Autofac;
using Common.Log;
using Lykke.Common.Log;
using Lykke.HftApi.Domain.Services;
using Lykke.RabbitMqBroker;
using Lykke.RabbitMqBroker.Subscriber;
using Lykke.Service.HftInternalService.Client.Messages;
using Microsoft.Extensions.Logging;

namespace HftApi.RabbitSubscribers
{
Expand All @@ -16,51 +16,85 @@ public class KeyUpdateSubscriber : IStartable, IDisposable
private readonly string _exchangeName;
private readonly ITokenService _tokenService;
private readonly ILogFactory _logFactory;
private readonly ILog _log;
private readonly IBlockedClientsService _blockedClients;
private RabbitMqSubscriber<KeyUpdatedEvent> _subscriber;

public KeyUpdateSubscriber(
string connectionString,
string exchangeName,
ITokenService tokenService,
ILoggerFactory loggerFactory,
ILogFactory logFactory)
ILogFactory logFactory,
IBlockedClientsService blockedClients)
{
_log = logFactory.CreateLog(this);
_connectionString = connectionString;
_exchangeName = exchangeName;
_tokenService = tokenService;
_logFactory = logFactory;
_blockedClients = blockedClients;
}

public void Start()
{
_tokenService.InitAsync().GetAwaiter().GetResult();

var settings = RabbitMqSubscriptionSettings
.ForSubscriber(_connectionString, _exchangeName, $"{nameof(KeyUpdateSubscriber)}-{Environment.MachineName}");

settings.DeadLetterExchangeName = null;

_subscriber = new RabbitMqSubscriber<KeyUpdatedEvent>(_logFactory,
settings,
new ResilientErrorHandlingStrategy(_logFactory, settings, TimeSpan.FromSeconds(10)))
new ResilientErrorHandlingStrategy(_log, settings, TimeSpan.FromSeconds(10)))
.SetMessageDeserializer(new JsonMessageDeserializer<KeyUpdatedEvent>())
.Subscribe(ProcessMessageAsync)
.CreateDefaultBinding()
.Start();

_tokenService.InitAsync().GetAwaiter().GetResult();
}

private Task ProcessMessageAsync(KeyUpdatedEvent message)
private async Task ProcessMessageAsync(KeyUpdatedEvent message)
{
if (message.IsDeleted)
_tokenService.Remove(message.Id);
else
// Race condition with ClientSettingsCashoutBlockUpdated event handling is possible, but it is decided
// that it's acceptable since these events are not very frequent

var isClientBlocked = await _blockedClients.IsClientBlocked(message.ClientId);
var apiKeyStart = message.Id.Substring(0, 4);

_log.Info($"API key deleted: {message.IsDeleted}. Client blocked: {isClientBlocked}", context: new
{
ClientId = message.ClientId,
WalletId = message.WalletId,
ApiKeyStart = apiKeyStart
});

if (!message.IsDeleted && !isClientBlocked)
{
_tokenService.Add(message.Id);

return Task.CompletedTask;
_log.Info($"API key has been cached", context: new
{
ClientId = message.ClientId,
WalletId = message.WalletId,
ApiKeyStart = apiKeyStart
});
}
else
{
_tokenService.Remove(message.Id);

_log.Info($"API key has been evicted from the cache", context: new
{
ClientId = message.ClientId,
WalletId = message.WalletId,
ApiKeyStart = apiKeyStart
});
}
}

public void Dispose()
{
_subscriber?.Stop();
_subscriber?.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace HftApi.RabbitSubscribers.Messages
{
// Owned by Lykke.Service.ClientAccount service
public class ClientSettingsCashoutBlockUpdated
{
public string ClientId { get; set; }
public bool CashOutBlocked { get; set; }
public bool TradesBlocked { get; set; }
}
}
9 changes: 9 additions & 0 deletions src/Lykke.HftApi.Domain/Services/IBlockedClientsService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace Lykke.HftApi.Domain.Services
{
public interface IBlockedClientsService
{
Task<bool> IsClientBlocked(string clientId);
}
}
2 changes: 1 addition & 1 deletion src/Lykke.HftApi.Domain/Services/ITokenService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Tasks;
using System.Threading.Tasks;

namespace Lykke.HftApi.Domain.Services
{
Expand Down
4 changes: 1 addition & 3 deletions src/Lykke.HftApi.Services/AssetsService.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Autofac;
using Lykke.HftApi.Domain;
using Lykke.HftApi.Domain.Entities;
using Lykke.HftApi.Domain.Entities.Assets;
using Lykke.HftApi.Domain.Exceptions;
using Lykke.HftApi.Domain.Services;
using Lykke.HftApi.Services.AssetsClient;

Expand Down
38 changes: 38 additions & 0 deletions src/Lykke.HftApi.Services/BlockedClientsService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Threading.Tasks;
using Common.Log;
using Lykke.Common.ApiLibrary.Exceptions;
using Lykke.Common.Log;
using Lykke.HftApi.Domain.Services;
using Lykke.Service.ClientAccount.Client;

namespace Lykke.HftApi.Services
{
public class BlockedClientsService : IBlockedClientsService
{
private readonly ILog _log;
private readonly IClientAccountClient _clientAccountClient;

public BlockedClientsService(ILogFactory logFactory, IClientAccountClient clientAccountClient)
{
_log = logFactory.CreateLog(this);
_clientAccountClient = clientAccountClient;
}

public async Task<bool> IsClientBlocked(string clientId)
{
try
{
return (await _clientAccountClient.ClientSettings.GetCashOutBlockSettingsAsync(clientId))?.TradesBlocked == true;
}
catch (ClientApiException ex) when (ex.Message == "Client not found")
{
_log.Warning("Client not found. It will be treated as blocked one", context: new
{
ClientId = clientId
});

return false;
}
}
}
}
Loading

0 comments on commit 0bb3b0e

Please sign in to comment.