Skip to content

Commit

Permalink
init rabbit mq
Browse files Browse the repository at this point in the history
  • Loading branch information
nhathoang989 committed Jan 20, 2024
1 parent 1e54ea1 commit b35831f
Show file tree
Hide file tree
Showing 36 changed files with 556 additions and 57 deletions.
3 changes: 3 additions & 0 deletions src/applications/mixcore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Mix.Lib.Middlewares;
using Mix.Shared.Services;
using Mix.Shared.Models.Configurations;
using Mix.Queue.Extensions;
var builder = MixCmsHelper.CreateWebApplicationBuilder(args);

if (builder.Environment.IsDevelopment())
Expand All @@ -25,7 +26,9 @@
var globalConfig = builder.Configuration.GetSection(MixAppSettingsSection.GlobalSettings)
.Get<GlobalSettingsModel>();
builder.Services.AddEndpointsApiExplorer();
builder.AddMixQueue();
builder.Services.AddMixServices(Assembly.GetExecutingAssembly(), builder.Configuration);

builder.Services.ApplyMigrations(globalConfig);
builder.Services.AddMixCors();
builder.Services.AddScoped<MixNavigationService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
},
"Mix": {
"ProjectId": ""
},
"RabitMqQueueSetting": {
"HostName": "localhost",
"UserName": "",
"Password": "",
"VHost": null,
"Port": 5672
}
}
}
2 changes: 1 addition & 1 deletion src/platform/mix.constant/Enums/MixQueueProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public enum MixQueueProvider
{
GOOGLE,
KAFKA,
RABITMQ,
AWS,
AZURE,
MIX
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;

Expand All @@ -13,8 +14,9 @@ public MixBackgroundTaskPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixBackgroundTaskPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixBackgroundTaskPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.library/Publishers/MixDbCommandPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;

Expand All @@ -13,8 +14,9 @@ public MixDbCommandPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixDbCommandPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixDbCommandPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/platform/mix.library/Publishers/MixPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
using Mix.Queue.Engines.MixQueue;
using Mix.RepoDb.Publishers;
using RabbitMQ.Client;

namespace Mix.Lib.Publishers
{
Expand All @@ -16,8 +18,9 @@ public MixPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration, IWebHostEnvironment environment,
MixEndpointService mixEndpointService,
ILogger<MixRepoDbPublisher> logger)
: base(topicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixRepoDbPublisher> logger,
IPooledObjectPolicy<IModel> rabbitMqObjectPolicy = null)
: base(topicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
using Mix.Queue.Engines.MixQueue;
Expand All @@ -15,8 +16,9 @@ public MixViewModelChangedPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixViewModelChangedPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixViewModelChangedPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Communicator.Models;
using Mix.Communicator.Services;
using Mix.Database.Entities.Account;
Expand Down Expand Up @@ -35,8 +36,9 @@ public MixBackgroundTaskSubscriber(
IPortalHubClientService portalHub,
MixDbEventService mixDbEventService,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixBackgroundTaskSubscriber> logger)
: base(TopicId, nameof(MixBackgroundTaskSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixBackgroundTaskSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixBackgroundTaskSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
PortalHub = portalHub;
MixDbEventService = mixDbEventService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Communicator.Models;
using Mix.Communicator.Services;
using Mix.Database.Entities.MixDb;
Expand All @@ -26,8 +27,9 @@ public MixDbCommandSubscriber(
IServiceProvider serviceProvider,
IConfiguration configuration,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixDbCommandSubscriber> logger)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixDbCommandSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Heart.Model;
using Mix.Lib.Interfaces;
using Mix.Lib.Subscribers.Handlers.MixViewModelChangedHandlers;
Expand All @@ -20,8 +21,9 @@ public MixViewModelChangedSubscriber(
IConfiguration configuration,
IMixTenantService mixTenantService,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixViewModelChangedSubscriber> logger)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixViewModelChangedSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
_mixTenantService = mixTenantService;
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.log/Publishers/MixLogPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Constant.Constants;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
Expand All @@ -17,8 +18,9 @@ public MixLogPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixLogPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixLogPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel>? rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.log/Subscribers/MixLogSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Constant.Constants;
using Mix.Constant.Enums;
using Mix.Database.Entities.Queue;
Expand Down Expand Up @@ -42,8 +43,9 @@ public MixLogSubscriber(
IMemoryQueueService<MessageQueueModel> queueService,
IMixQueueLog queueMessageLogService,
IAuditLogService auditLogService,
ILogger<MixLogSubscriber> logger)
: base(TopicId, nameof(MixLogSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixLogSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixLogSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
_queueMessageLogService = queueMessageLogService;
_portalHub = portalHub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class AzureQueuePublisher<T> : IQueuePublisher<T>
private static ServiceBusClient _client;
private readonly AzureQueueSetting _queueSetting;

public AzureQueuePublisher(QueueSetting queueSetting, string topicName)
public AzureQueuePublisher(IQueueSetting queueSetting, string topicName)
{
_queueSetting = queueSetting as AzureQueueSetting;
InitializeQueue(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class AzureQueueSubscriber<T> : IQueueSubscriber
private readonly AzureQueueSetting _queueSetting;
private readonly Func<T, Task> _messageHandler;
public AzureQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class GoogleQueuePublisher<T> : IQueuePublisher<T>
private PublisherClient _publisher;
private readonly GoogleQueueSetting _queueSetting;

public GoogleQueuePublisher(QueueSetting queueSetting, string topicName)
public GoogleQueuePublisher(IQueueSetting queueSetting, string topicName)
{
_queueSetting = queueSetting as GoogleQueueSetting;
InitializeQueue(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class GoogleQueueSubscriber<T> : IQueueSubscriber
private readonly Func<T, Task> _messageHandler;

public GoogleQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler)
Expand Down
2 changes: 1 addition & 1 deletion src/platform/mix.queue/Engines/Mix/MixQueuePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MixQueuePublisher<T> : IQueuePublisher<T>
private readonly MixEndpointService _mixEndpointService;
private GrpcChannelModel<MixMq.MixMqClient> _mixMqPublisher;

public MixQueuePublisher(QueueSetting queueSetting, string topicName, MixEndpointService mixEndpointService)
public MixQueuePublisher(IQueueSetting queueSetting, string topicName, MixEndpointService mixEndpointService)
{
_topicId = topicName;
_mixEndpointService = mixEndpointService;
Expand Down
2 changes: 1 addition & 1 deletion src/platform/mix.queue/Engines/Mix/MixQueueSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class MixQueueSubscriber<T> : BackgroundService, IQueueSubscriber
private AsyncServerStreamingCall<SubscribeReply> _call;
private CancellationToken _startCancellationToken;

Check warning on line 31 in src/platform/mix.queue/Engines/Mix/MixQueueSubscriber.cs

View workflow job for this annotation

GitHub Actions / build

The field 'MixQueueSubscriber<T>._startCancellationToken' is never used
public MixQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler,
Expand Down
21 changes: 16 additions & 5 deletions src/platform/mix.queue/Engines/PublisherBase.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Heart.Exceptions;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines.MixQueue;
using Mix.Queue.Interfaces;
using Mix.Queue.Models.QueueSetting;
using Mix.Shared.Services;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -25,22 +27,25 @@ public abstract class PublisherBase : BackgroundService
private readonly string _topicId;
private MixQueueProvider _provider;
protected ILogger<PublisherBase> _logger;
private readonly IPooledObjectPolicy<IModel> _rabbitMqObjectPolicy;
protected PublisherBase(
string topicId,
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<PublisherBase> logger)
ILogger<PublisherBase> logger,
IPooledObjectPolicy<IModel> rabbitMqObjectPolicy)
{
_queueService = queueService;
_configuration = configuration;
_topicId = topicId;
_mixEndpointService = mixEndpointService;
_logger = logger;
_rabbitMqObjectPolicy = rabbitMqObjectPolicy;
}

private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(
string topicName)
string topicId)
{
try
{
Expand All @@ -62,7 +67,7 @@ private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(

queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(
_provider, azureSetting, topicName, _mixEndpointService));
_provider, azureSetting, topicId, _mixEndpointService));
break;
case MixQueueProvider.GOOGLE:
var googleSettingPath = _configuration.GetSection("MessageQueueSetting:GoogleQueueSetting");
Expand All @@ -72,16 +77,22 @@ private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(

queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(
_provider, googleSetting, topicName, _mixEndpointService));
_provider, googleSetting, topicId, _mixEndpointService));
break;

case MixQueueProvider.RABITMQ:
queuePublishers.Add(
QueueEngineFactory.CreateRabbitMqPublisher<MessageQueueModel>(_rabbitMqObjectPolicy, topicId));
break;

case MixQueueProvider.MIX:
if (_mixEndpointService.MixMq != null)
{
var mixSettingPath = _configuration.GetSection("MessageQueueSetting:Mix");
var mixSetting = new MixQueueSetting();
mixSettingPath.Bind(mixSetting);
queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(_provider, mixSetting, topicName, _mixEndpointService));
QueueEngineFactory.CreatePublisher<MessageQueueModel>(_provider, mixSetting, topicId, _mixEndpointService));
}
break;
}
Expand Down
Loading

0 comments on commit b35831f

Please sign in to comment.