Skip to content

Commit

Permalink
修复 Kafka 消息队列扩展插件中的错误。 🏮
Browse files Browse the repository at this point in the history
  • Loading branch information
PopeyeZhong committed Jan 13, 2025
1 parent b4231ee commit c98be63
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
29 changes: 21 additions & 8 deletions Zongsoft.Messaging.Kafka/src/KafkaUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,29 @@
*/

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;

using Confluent.Kafka;

using Zongsoft.Common;
using Zongsoft.Components;
using Zongsoft.Configuration;

namespace Zongsoft.Messaging.Kafka
{
internal static class KafkaUtility
{
private static readonly Dictionary<string, string> ProducerConfigurationMapping = new()
{
{ "Topic", null },
{ "SecurityProtocol", "security.protocol" },
};

private static readonly Dictionary<string, string> ConsumerConfigurationMapping = new()
{
{ "Topic", null },
{ "SecurityProtocol", "security.protocol" },
};

public static ProducerConfig GetProducerOptions(IConnectionSettings settings)
{
if(settings == null)
Expand All @@ -56,7 +63,10 @@ public static ProducerConfig GetProducerOptions(IConnectionSettings settings)
};

foreach(var setting in settings)
config.Set(setting.Key, setting.Value);
{
if(ProducerConfigurationMapping.TryGetValue(setting.Key, out var key) && key != null)
config.Set(key, setting.Value);
}

return config;
}
Expand All @@ -68,13 +78,16 @@ public static ConsumerConfig GetConsumerOptions(IConnectionSettings settings)

var config = new ConsumerConfig
{
GroupId = settings.Group,
GroupId = string.IsNullOrEmpty(settings.Group) ? $"G{Randomizer.GenerateString()}" : settings.Group,
ClientId = settings.Client,
BootstrapServers = settings.Server
};

foreach(var setting in settings)
config.Set(setting.Key, setting.Value);
{
if(ConsumerConfigurationMapping.TryGetValue(setting.Key, out var key) && key != null)
config.Set(key, setting.Value);
}

return config;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Version>6.8.0.0</Version>
<Version>6.8.1.0</Version>
<Product>Zongsoft Kafka Library</Product>
<Description>This is a library about Kafka messaging development.</Description>
<RootNamespace>Zongsoft.Messaging.Kafka</RootNamespace>
Expand Down

0 comments on commit c98be63

Please sign in to comment.