From 42aa2d54e5ff18c2378782e471dd421b0c374aca Mon Sep 17 00:00:00 2001 From: Ricky Kaare Engelharth Date: Thu, 9 Jan 2025 21:10:52 +0100 Subject: [PATCH] Ensure storage container names are configured for each processor individually --- .../EventHub/EventHub.Processor/Program.cs | 5 +- .../CabazureEventHubOptions.cs | 22 +++----- .../DependencyInjection/EventHubBuilder.cs | 3 +- .../EventHubProcessorBuilder.cs | 17 +++++- .../Internal/BlobStorageClientProvider.cs | 26 +++------ .../Internal/EventHubProcessorFactory.cs | 35 +++++++++--- .../CabazureEventHubOptionsTests.cs | 18 +++---- .../EventHubBuilderTests.cs | 45 +++++++++++++++- .../Internal/BlobStorageProviderTests.cs | 21 +++----- .../Internal/EventHubProcessorFactoryTests.cs | 53 +++++++++++++++++++ 10 files changed, 173 insertions(+), 72 deletions(-) diff --git a/samples/EventHub/EventHub.Processor/Program.cs b/samples/EventHub/EventHub.Processor/Program.cs index 1e7f89e..a0373c5 100644 --- a/samples/EventHub/EventHub.Processor/Program.cs +++ b/samples/EventHub/EventHub.Processor/Program.cs @@ -12,8 +12,9 @@ builder.Services.AddCabazureEventHub(b => b .Configure(o => o .WithConnection(connectionString) - .WithBlobStorage(blobsConnection, "container1", createIfNotExist: true)) - .AddProcessor("eventhub")); + .WithBlobStorage(blobsConnection)) + .AddProcessor("eventhub", "$default", b => b + .WithBlobContainer("container1", createIfNotExist: true))); var app = builder.Build(); diff --git a/src/Cabazure.Messaging.EventHub/CabazureEventHubOptions.cs b/src/Cabazure.Messaging.EventHub/CabazureEventHubOptions.cs index 96eccac..9dadd09 100644 --- a/src/Cabazure.Messaging.EventHub/CabazureEventHubOptions.cs +++ b/src/Cabazure.Messaging.EventHub/CabazureEventHubOptions.cs @@ -39,29 +39,23 @@ public CabazureEventHubOptions WithConnection(string connectionString) } public CabazureEventHubOptions WithBlobStorage( - Uri containerUri, - TokenCredential credential, - bool createIfNotExist = false) + Uri serviceUri, + TokenCredential credential) { BlobStorage = new() { - ContainerUri = containerUri, + ServiceUri = serviceUri, Credential = credential, - CreateIfNotExist = createIfNotExist, }; return this; } public CabazureEventHubOptions WithBlobStorage( - string connectionString, - string blobContainerName, - bool createIfNotExist = false) + string connectionString) { BlobStorage = new() { ConnectionString = connectionString, - ContainerName = blobContainerName, - CreateIfNotExist = createIfNotExist, }; return this; } @@ -70,13 +64,9 @@ public CabazureEventHubOptions WithBlobStorage( public class BlobStorageOptions { - public Uri? ContainerUri { get; set; } + public Uri? ServiceUri { get; set; } public TokenCredential? Credential { get; set; } public string? ConnectionString { get; set; } - - public string? ContainerName { get; set; } - - public bool CreateIfNotExist { get; set; } -} \ No newline at end of file +} diff --git a/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubBuilder.cs b/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubBuilder.cs index 86b3bf8..82d6d96 100644 --- a/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubBuilder.cs +++ b/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubBuilder.cs @@ -60,7 +60,7 @@ public EventHubBuilder AddProcessor( Action? builder = null) where TProcessor : class, IMessageProcessor { - var processorBuilder = new EventHubProcessorBuilder(); + var processorBuilder = new EventHubProcessorBuilder(eventHubName); builder?.Invoke(processorBuilder); Services.AddLogging(); @@ -80,6 +80,7 @@ public EventHubBuilder AddProcessor( ConnectionName, eventHubName, consumerGroup, + processorBuilder.BlobContainer, processorBuilder.ProcessorOptions); return new EventHubProcessorService( diff --git a/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubProcessorBuilder.cs b/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubProcessorBuilder.cs index de71512..ee477f7 100644 --- a/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubProcessorBuilder.cs +++ b/src/Cabazure.Messaging.EventHub/DependencyInjection/EventHubProcessorBuilder.cs @@ -2,12 +2,15 @@ namespace Cabazure.Messaging.EventHub.DependencyInjection; -public class EventHubProcessorBuilder +public class EventHubProcessorBuilder( + string eventHubName) { public List, bool>> Filters { get; } = []; public EventProcessorClientOptions? ProcessorOptions { get; private set; } + public BlobContainerOptions BlobContainer { get; private set; } = new(eventHubName, true); + public EventHubProcessorBuilder WithFilter( Func, bool> predicate) { @@ -21,4 +24,16 @@ public EventHubProcessorBuilder WithProcessorOptions( ProcessorOptions = options; return this; } + + public EventHubProcessorBuilder WithBlobContainer( + string containerName, + bool createIfNotExist = true) + { + BlobContainer = new(containerName, createIfNotExist); + return this; + } } + +public record BlobContainerOptions( + string ContainerName, + bool CreateIfNotExist = false); diff --git a/src/Cabazure.Messaging.EventHub/Internal/BlobStorageClientProvider.cs b/src/Cabazure.Messaging.EventHub/Internal/BlobStorageClientProvider.cs index d0ab855..c5b9377 100644 --- a/src/Cabazure.Messaging.EventHub/Internal/BlobStorageClientProvider.cs +++ b/src/Cabazure.Messaging.EventHub/Internal/BlobStorageClientProvider.cs @@ -6,7 +6,7 @@ namespace Cabazure.Messaging.EventHub.Internal; public interface IBlobStorageClientProvider { - BlobContainerClient GetClient( + BlobServiceClient GetClient( string? connectionName = null); } @@ -15,32 +15,22 @@ public class BlobStorageClientProvider( : IBlobStorageClientProvider { private sealed record ClientKey(string? Connection); - private readonly ConcurrentDictionary clients = new(); + private readonly ConcurrentDictionary clients = new(); - public BlobContainerClient GetClient( + public BlobServiceClient GetClient( string? connectionName) => clients.GetOrAdd( new(connectionName), CreateClient); - private BlobContainerClient CreateClient(ClientKey key) - { - var options = monitor.Get(key.Connection); - var storageClient = options?.BlobStorage switch + private BlobServiceClient CreateClient(ClientKey key) + => monitor.Get(key.Connection) switch { - { ConnectionString: { } cs, ContainerName: { } cont } => new BlobContainerClient(cs, cont), - { ContainerUri: { } uri, Credential: { } cred } => new BlobContainerClient(uri, cred), - { ContainerUri: { } uri } => new BlobContainerClient(uri, options.Credential), + { BlobStorage.ConnectionString: { } cs } => new BlobServiceClient(cs), + { BlobStorage: { ServiceUri: { } uri, Credential: { } cred } } => new BlobServiceClient(uri, cred), + { BlobStorage.ServiceUri: { } uri, Credential: { } cred } => new BlobServiceClient(uri, cred), _ => throw new ArgumentException( $"Missing blob storage configuration for connection `{key.Connection}`"), }; - - if (options.BlobStorage.CreateIfNotExist) - { - storageClient.CreateIfNotExists(); - } - - return storageClient; - } } diff --git a/src/Cabazure.Messaging.EventHub/Internal/EventHubProcessorFactory.cs b/src/Cabazure.Messaging.EventHub/Internal/EventHubProcessorFactory.cs index 687b4e3..0789f73 100644 --- a/src/Cabazure.Messaging.EventHub/Internal/EventHubProcessorFactory.cs +++ b/src/Cabazure.Messaging.EventHub/Internal/EventHubProcessorFactory.cs @@ -1,4 +1,6 @@ using Azure.Messaging.EventHubs; +using Azure.Storage.Blobs; +using Cabazure.Messaging.EventHub.DependencyInjection; using Microsoft.Extensions.Options; namespace Cabazure.Messaging.EventHub.Internal; @@ -9,7 +11,8 @@ IEventHubProcessor Create( string? connectionName, string eventHubName, string consumerGroup, - EventProcessorClientOptions? options = null); + BlobContainerOptions containerOptions, + EventProcessorClientOptions? processorOptions = null); } public class EventHubProcessorFactory( @@ -21,27 +24,45 @@ public IEventHubProcessor Create( string? connectionName, string eventHubName, string consumerGroup, - EventProcessorClientOptions? options = null) + BlobContainerOptions containerOptions, + EventProcessorClientOptions? processorOptions = null) => monitor.Get(connectionName) switch { { FullyQualifiedNamespace: { } ns, Credential: { } cred } => new EventHubProcessorWrapper( new EventProcessorClient( - storageProvider.GetClient(connectionName), + GetContainerClient(connectionName, containerOptions), consumerGroup, ns, eventHubName, cred, - options)), - { ConnectionString: { } cs } + processorOptions)), + { + ConnectionString: { } cs + } => new EventHubProcessorWrapper( new EventProcessorClient( - storageProvider.GetClient(connectionName), + GetContainerClient(connectionName, containerOptions), consumerGroup, cs, eventHubName, - options)), + processorOptions)), _ => throw new ArgumentException( $"Missing configuration for Event Hub connection `{connectionName}`"), }; + + private BlobContainerClient GetContainerClient( + string? connectionName, + BlobContainerOptions options) + { + var client = storageProvider.GetClient(connectionName); + var container = client.GetBlobContainerClient(options.ContainerName); + + if (options.CreateIfNotExist) + { + container.CreateIfNotExists(); + } + + return container; + } } diff --git a/test/Cabazure.Messaging.EventHub.Tests/CabazureEventHubOptionsTests.cs b/test/Cabazure.Messaging.EventHub.Tests/CabazureEventHubOptionsTests.cs index 79af002..d96cd08 100644 --- a/test/Cabazure.Messaging.EventHub.Tests/CabazureEventHubOptionsTests.cs +++ b/test/Cabazure.Messaging.EventHub.Tests/CabazureEventHubOptionsTests.cs @@ -37,38 +37,32 @@ public void WithConnection_Sets_ConnectionString( [Theory, AutoNSubstituteData] public void WithBlobStorage_Sets_BlobStorageOptions_With_ContainerUri( [NoAutoProperties] CabazureEventHubOptions sut, - Uri containerUri, - TokenCredential credential, - bool createIfNotExist) + Uri serviceUri, + TokenCredential credential) { - sut.WithBlobStorage(containerUri, credential, createIfNotExist); + sut.WithBlobStorage(serviceUri, credential); sut.BlobStorage .Should() .BeEquivalentTo( new BlobStorageOptions { - ContainerUri = containerUri, + ServiceUri = serviceUri, Credential = credential, - CreateIfNotExist = createIfNotExist, }); } [Theory, AutoNSubstituteData] public void WithBlobStorage_Sets_BlobStorageOptions_With_ConnectionString( [NoAutoProperties] CabazureEventHubOptions sut, - string connectionString, - string blobContainerName, - bool createIfNotExist) + string connectionString) { - sut.WithBlobStorage(connectionString, blobContainerName, createIfNotExist); + sut.WithBlobStorage(connectionString); sut.BlobStorage .Should() .BeEquivalentTo( new BlobStorageOptions { ConnectionString = connectionString, - ContainerName = blobContainerName, - CreateIfNotExist = createIfNotExist, }); } } diff --git a/test/Cabazure.Messaging.EventHub.Tests/DependencyInjection/EventHubBuilderTests.cs b/test/Cabazure.Messaging.EventHub.Tests/DependencyInjection/EventHubBuilderTests.cs index 6dd33b7..7e6f91b 100644 --- a/test/Cabazure.Messaging.EventHub.Tests/DependencyInjection/EventHubBuilderTests.cs +++ b/test/Cabazure.Messaging.EventHub.Tests/DependencyInjection/EventHubBuilderTests.cs @@ -224,7 +224,7 @@ public void AddProcessor_Calls_Builder( } [Theory, AutoNSubstituteData] - public void AddProcessor_Registers_EventHubProcessorService_Using_Factory( + public void AddProcessor_With_Default_BlobContainer_Registers_EventHubProcessorService_Using_Factory( [Frozen(Matching.ImplementedInterfaces)] ServiceCollection services, string eventHubName, @@ -250,7 +250,48 @@ public void AddProcessor_Registers_EventHubProcessorService_Using_Factory( .Create( sut.ConnectionName, eventHubName, - consumerGroupName); + consumerGroupName, + new BlobContainerOptions( + ContainerName: eventHubName, + CreateIfNotExist: true)); + result.Processor + .Should() + .Be(processor); + } + + [Theory, AutoNSubstituteData] + public void AddProcessor_With_BlobCOntainerOptions_Registers_EventHubProcessorService_Using_Factory( + [Frozen(Matching.ImplementedInterfaces)] + ServiceCollection services, + string eventHubName, + string consumerGroupName, + BlobContainerOptions containerOptions, + EventHubBuilder sut, + IEventHubProcessorFactory factory, + [Substitute] TProcessor processor) + { + services.AddOptions(); + services.AddSingleton(factory); + services.AddSingleton(processor); + + sut.AddProcessor( + eventHubName, + consumerGroupName, + b => b.WithBlobContainer( + containerOptions.ContainerName, + containerOptions.CreateIfNotExist)); + + var result = services + .BuildServiceProvider() + .GetRequiredService>(); + + factory + .Received(1) + .Create( + sut.ConnectionName, + eventHubName, + consumerGroupName, + containerOptions); result.Processor .Should() .Be(processor); diff --git a/test/Cabazure.Messaging.EventHub.Tests/Internal/BlobStorageProviderTests.cs b/test/Cabazure.Messaging.EventHub.Tests/Internal/BlobStorageProviderTests.cs index dd881ee..c434d46 100644 --- a/test/Cabazure.Messaging.EventHub.Tests/Internal/BlobStorageProviderTests.cs +++ b/test/Cabazure.Messaging.EventHub.Tests/Internal/BlobStorageProviderTests.cs @@ -8,8 +8,7 @@ namespace Cabazure.Messaging.EventHub.Tests.Internal; public class BlobStorageProviderTests { - private readonly Uri containerUri; - private readonly string containerName; + private readonly Uri serviceUri; private readonly string accountName; private readonly string connectionString; private readonly TokenCredential credential; @@ -17,8 +16,7 @@ public class BlobStorageProviderTests public BlobStorageProviderTests() { - containerUri = new Uri("https://localhost"); - containerName = "container1"; + serviceUri = new Uri("https://localhost"); accountName = "account1"; connectionString = $"AccountName={accountName};AccountKey=;"; credential = Substitute.For(); @@ -28,10 +26,8 @@ public BlobStorageProviderTests() BlobStorage = new() { ConnectionString = connectionString, - ContainerName = containerName, - ContainerUri = containerUri, + ServiceUri = serviceUri, Credential = credential, - CreateIfNotExist = false, }, }; } @@ -74,7 +70,7 @@ public void GetClient_Returns_Client( result .Should() - .BeOfType(); + .BeOfType(); } [Theory, AutoNSubstituteData] @@ -87,7 +83,7 @@ public void GetClient_Uses_Namespace_From_Options( { BlobStorage = new BlobStorageOptions { - ContainerUri = containerUri, + ServiceUri = serviceUri, Credential = credential, }, }; @@ -96,7 +92,7 @@ public void GetClient_Uses_Namespace_From_Options( result.Uri .Should() - .Be(containerUri); + .Be(serviceUri); } [Theory, AutoNSubstituteData] @@ -112,15 +108,14 @@ public void GetClient_Uses_Namespace_From_ConnectionString_In_Options( BlobStorage = new BlobStorageOptions { ConnectionString = connectionString, - ContainerName = connectionName, } }; monitor.Get(default).ReturnsForAnyArgs(options); var result = sut.GetClient(connectionName); - result.Name + result.AccountName .Should() - .Be(connectionName); + .Be(accountName); } [Theory, AutoNSubstituteData] diff --git a/test/Cabazure.Messaging.EventHub.Tests/Internal/EventHubProcessorFactoryTests.cs b/test/Cabazure.Messaging.EventHub.Tests/Internal/EventHubProcessorFactoryTests.cs index 3068c0c..71b7594 100644 --- a/test/Cabazure.Messaging.EventHub.Tests/Internal/EventHubProcessorFactoryTests.cs +++ b/test/Cabazure.Messaging.EventHub.Tests/Internal/EventHubProcessorFactoryTests.cs @@ -1,6 +1,8 @@ using System.Text.Json; using Azure.Core; using Azure.Messaging.EventHubs; +using Azure.Storage.Blobs; +using Cabazure.Messaging.EventHub.DependencyInjection; using Cabazure.Messaging.EventHub.Internal; using Microsoft.Extensions.Options; @@ -15,6 +17,7 @@ public void Create_Throws_If_No_Options( string connectionName, string eventHubName, string consumerGroup, + BlobContainerOptions containerOptions, [NoAutoProperties] EventProcessorClientOptions clientOptions) => FluentActions .Invoking(() => @@ -22,6 +25,7 @@ public void Create_Throws_If_No_Options( connectionName, eventHubName, consumerGroup, + containerOptions, clientOptions)) .Should() .Throw() @@ -34,17 +38,22 @@ public void Create_Gets_Options( JsonSerializerOptions serializerOptions, CabazureEventHubOptions options, [Frozen] IOptionsMonitor monitor, + [Frozen] IBlobStorageClientProvider storageProvider, + [Substitute] BlobServiceClient blobClient, EventHubProcessorFactory sut, string connectionName, string eventHubName, string consumerGroup, + BlobContainerOptions containerOptions, [NoAutoProperties] EventProcessorClientOptions clientOptions) { + storageProvider.GetClient(default).ReturnsForAnyArgs(blobClient); monitor.Get(default).ReturnsForAnyArgs(options); sut.Create( connectionName, eventHubName, consumerGroup, + containerOptions, clientOptions); monitor @@ -52,23 +61,57 @@ public void Create_Gets_Options( .Get(connectionName); } + [Theory, AutoNSubstituteData] + public void Create_Gets_StorageClient( + [Frozen, NoAutoProperties] + JsonSerializerOptions serializerOptions, + CabazureEventHubOptions options, + [Frozen] IOptionsMonitor monitor, + [Frozen] IBlobStorageClientProvider storageProvider, + [Substitute] BlobServiceClient blobClient, + EventHubProcessorFactory sut, + string connectionName, + string eventHubName, + string consumerGroup, + BlobContainerOptions containerOptions, + [NoAutoProperties] EventProcessorClientOptions clientOptions) + { + storageProvider.GetClient(default).ReturnsForAnyArgs(blobClient); + monitor.Get(default).ReturnsForAnyArgs(options); + sut.Create( + connectionName, + eventHubName, + consumerGroup, + containerOptions, + clientOptions); + + storageProvider + .Received(1) + .GetClient(connectionName); + } + [Theory, AutoNSubstituteData] public void Create_Returns_Client( [Frozen, NoAutoProperties] JsonSerializerOptions serializerOptions, CabazureEventHubOptions options, [Frozen] IOptionsMonitor monitor, + [Frozen] IBlobStorageClientProvider storageProvider, + [Substitute] BlobServiceClient blobClient, EventHubProcessorFactory sut, string connectionName, string eventHubName, string consumerGroup, + BlobContainerOptions containerOptions, [NoAutoProperties] EventProcessorClientOptions clientOptions) { + storageProvider.GetClient(default).ReturnsForAnyArgs(blobClient); monitor.Get(default).ReturnsForAnyArgs(options); var result = sut.Create( connectionName, eventHubName, consumerGroup, + containerOptions, clientOptions); result @@ -81,10 +124,13 @@ public void Create_Uses_Namespace_From_Options( [Frozen, NoAutoProperties] JsonSerializerOptions serializerOptions, [Frozen] IOptionsMonitor monitor, + [Frozen] IBlobStorageClientProvider storageProvider, + [Substitute] BlobServiceClient blobClient, EventHubProcessorFactory sut, string connectionName, string eventHubName, string consumerGroup, + BlobContainerOptions containerOptions, [NoAutoProperties] EventProcessorClientOptions clientOptions, string fqns, TokenCredential credential) @@ -94,11 +140,13 @@ public void Create_Uses_Namespace_From_Options( FullyQualifiedNamespace = fqns, Credential = credential, }; + storageProvider.GetClient(default).ReturnsForAnyArgs(blobClient); monitor.Get(default).ReturnsForAnyArgs(options); var result = sut.Create( connectionName, eventHubName, consumerGroup, + containerOptions, clientOptions); result.FullyQualifiedNamespace @@ -117,10 +165,13 @@ public void Create_Uses_Namespace_From_ConnectionString_In_Options( [Frozen, NoAutoProperties] JsonSerializerOptions serializerOptions, [Frozen] IOptionsMonitor monitor, + [Frozen] IBlobStorageClientProvider storageProvider, + [Substitute] BlobServiceClient blobClient, EventHubProcessorFactory sut, string connectionName, string eventHubName, string consumerGroup, + BlobContainerOptions containerOptions, [NoAutoProperties] EventProcessorClientOptions clientOptions, string fqns, TokenCredential credential) @@ -132,11 +183,13 @@ public void Create_Uses_Namespace_From_ConnectionString_In_Options( $"SharedAccessKeyName=RootManageSharedAccessKey;" + $"SharedAccessKey=SAS_KEY_VALUE;", }; + storageProvider.GetClient(default).ReturnsForAnyArgs(blobClient); monitor.Get(default).ReturnsForAnyArgs(options); var result = sut.Create( connectionName, eventHubName, consumerGroup, + containerOptions, clientOptions); result.FullyQualifiedNamespace