diff --git a/Zongsoft.Messaging.Kafka/src/KafkaUtility.cs b/Zongsoft.Messaging.Kafka/src/KafkaUtility.cs index 93e2137e..5ed5a7b1 100644 --- a/Zongsoft.Messaging.Kafka/src/KafkaUtility.cs +++ b/Zongsoft.Messaging.Kafka/src/KafkaUtility.cs @@ -28,22 +28,29 @@ */ using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; using System.Collections.Generic; -using System.Collections.Concurrent; using Confluent.Kafka; using Zongsoft.Common; -using Zongsoft.Components; using Zongsoft.Configuration; namespace Zongsoft.Messaging.Kafka { internal static class KafkaUtility { + private static readonly Dictionary ProducerConfigurationMapping = new() + { + { "Topic", null }, + { "SecurityProtocol", "security.protocol" }, + }; + + private static readonly Dictionary ConsumerConfigurationMapping = new() + { + { "Topic", null }, + { "SecurityProtocol", "security.protocol" }, + }; + public static ProducerConfig GetProducerOptions(IConnectionSettings settings) { if(settings == null) @@ -56,7 +63,10 @@ public static ProducerConfig GetProducerOptions(IConnectionSettings settings) }; foreach(var setting in settings) - config.Set(setting.Key, setting.Value); + { + if(ProducerConfigurationMapping.TryGetValue(setting.Key, out var key) && key != null) + config.Set(key, setting.Value); + } return config; } @@ -68,13 +78,16 @@ public static ConsumerConfig GetConsumerOptions(IConnectionSettings settings) var config = new ConsumerConfig { - GroupId = settings.Group, + GroupId = string.IsNullOrEmpty(settings.Group) ? $"G{Randomizer.GenerateString()}" : settings.Group, ClientId = settings.Client, BootstrapServers = settings.Server }; foreach(var setting in settings) - config.Set(setting.Key, setting.Value); + { + if(ConsumerConfigurationMapping.TryGetValue(setting.Key, out var key) && key != null) + config.Set(key, setting.Value); + } return config; } diff --git a/Zongsoft.Messaging.Kafka/src/Zongsoft.Messaging.Kafka.csproj b/Zongsoft.Messaging.Kafka/src/Zongsoft.Messaging.Kafka.csproj index a5c9a951..3e633f2f 100644 --- a/Zongsoft.Messaging.Kafka/src/Zongsoft.Messaging.Kafka.csproj +++ b/Zongsoft.Messaging.Kafka/src/Zongsoft.Messaging.Kafka.csproj @@ -1,6 +1,6 @@  - 6.8.0.0 + 6.8.1.0 Zongsoft Kafka Library This is a library about Kafka messaging development. Zongsoft.Messaging.Kafka