Skip to content

Commit

Permalink
Merge pull request #1 from Cabazure/feature/storage
Browse files Browse the repository at this point in the history
Ensure storage container names are configured for each processor individually
  • Loading branch information
rickykaare authored Jan 9, 2025
2 parents 14d2263 + 42aa2d5 commit a078e0e
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 72 deletions.
5 changes: 3 additions & 2 deletions samples/EventHub/EventHub.Processor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
builder.Services.AddCabazureEventHub(b => b
.Configure(o => o
.WithConnection(connectionString)
.WithBlobStorage(blobsConnection, "container1", createIfNotExist: true))
.AddProcessor<MyEvent, MyEventprocessor>("eventhub"));
.WithBlobStorage(blobsConnection))
.AddProcessor<MyEvent, MyEventprocessor>("eventhub", "$default", b => b
.WithBlobContainer("container1", createIfNotExist: true)));

var app = builder.Build();

Expand Down
22 changes: 6 additions & 16 deletions src/Cabazure.Messaging.EventHub/CabazureEventHubOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,23 @@ public CabazureEventHubOptions WithConnection(string connectionString)
}

public CabazureEventHubOptions WithBlobStorage(
Uri containerUri,
TokenCredential credential,
bool createIfNotExist = false)
Uri serviceUri,
TokenCredential credential)
{
BlobStorage = new()
{
ContainerUri = containerUri,
ServiceUri = serviceUri,
Credential = credential,
CreateIfNotExist = createIfNotExist,
};
return this;
}

public CabazureEventHubOptions WithBlobStorage(
string connectionString,
string blobContainerName,
bool createIfNotExist = false)
string connectionString)
{
BlobStorage = new()
{
ConnectionString = connectionString,
ContainerName = blobContainerName,
CreateIfNotExist = createIfNotExist,
};
return this;
}
Expand All @@ -70,13 +64,9 @@ public CabazureEventHubOptions WithBlobStorage(

public class BlobStorageOptions
{
public Uri? ContainerUri { get; set; }
public Uri? ServiceUri { get; set; }

public TokenCredential? Credential { get; set; }

public string? ConnectionString { get; set; }

public string? ContainerName { get; set; }

public bool CreateIfNotExist { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public EventHubBuilder AddProcessor<TMessage, TProcessor>(
Action<EventHubProcessorBuilder>? builder = null)
where TProcessor : class, IMessageProcessor<TMessage>
{
var processorBuilder = new EventHubProcessorBuilder();
var processorBuilder = new EventHubProcessorBuilder(eventHubName);
builder?.Invoke(processorBuilder);

Services.AddLogging();
Expand All @@ -80,6 +80,7 @@ public EventHubBuilder AddProcessor<TMessage, TProcessor>(
ConnectionName,
eventHubName,
consumerGroup,
processorBuilder.BlobContainer,
processorBuilder.ProcessorOptions);

return new EventHubProcessorService<TMessage, TProcessor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

namespace Cabazure.Messaging.EventHub.DependencyInjection;

public class EventHubProcessorBuilder
public class EventHubProcessorBuilder(
string eventHubName)
{
public List<Func<IDictionary<string, object>, bool>> Filters { get; } = [];

public EventProcessorClientOptions? ProcessorOptions { get; private set; }

public BlobContainerOptions BlobContainer { get; private set; } = new(eventHubName, true);

public EventHubProcessorBuilder WithFilter(
Func<IDictionary<string, object>, bool> predicate)
{
Expand All @@ -21,4 +24,16 @@ public EventHubProcessorBuilder WithProcessorOptions(
ProcessorOptions = options;
return this;
}

public EventHubProcessorBuilder WithBlobContainer(
string containerName,
bool createIfNotExist = true)
{
BlobContainer = new(containerName, createIfNotExist);
return this;
}
}

public record BlobContainerOptions(
string ContainerName,
bool CreateIfNotExist = false);
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Cabazure.Messaging.EventHub.Internal;

public interface IBlobStorageClientProvider
{
BlobContainerClient GetClient(
BlobServiceClient GetClient(
string? connectionName = null);
}

Expand All @@ -15,32 +15,22 @@ public class BlobStorageClientProvider(
: IBlobStorageClientProvider
{
private sealed record ClientKey(string? Connection);
private readonly ConcurrentDictionary<ClientKey, BlobContainerClient> clients = new();
private readonly ConcurrentDictionary<ClientKey, BlobServiceClient> clients = new();

public BlobContainerClient GetClient(
public BlobServiceClient GetClient(
string? connectionName)
=> clients.GetOrAdd(
new(connectionName),
CreateClient);

private BlobContainerClient CreateClient(ClientKey key)
{
var options = monitor.Get(key.Connection);
var storageClient = options?.BlobStorage switch
private BlobServiceClient CreateClient(ClientKey key)
=> monitor.Get(key.Connection) switch
{
{ ConnectionString: { } cs, ContainerName: { } cont } => new BlobContainerClient(cs, cont),
{ ContainerUri: { } uri, Credential: { } cred } => new BlobContainerClient(uri, cred),
{ ContainerUri: { } uri } => new BlobContainerClient(uri, options.Credential),
{ BlobStorage.ConnectionString: { } cs } => new BlobServiceClient(cs),
{ BlobStorage: { ServiceUri: { } uri, Credential: { } cred } } => new BlobServiceClient(uri, cred),
{ BlobStorage.ServiceUri: { } uri, Credential: { } cred } => new BlobServiceClient(uri, cred),

_ => throw new ArgumentException(
$"Missing blob storage configuration for connection `{key.Connection}`"),
};

if (options.BlobStorage.CreateIfNotExist)
{
storageClient.CreateIfNotExists();
}

return storageClient;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Cabazure.Messaging.EventHub.DependencyInjection;
using Microsoft.Extensions.Options;

namespace Cabazure.Messaging.EventHub.Internal;
Expand All @@ -9,7 +11,8 @@ IEventHubProcessor Create(
string? connectionName,
string eventHubName,
string consumerGroup,
EventProcessorClientOptions? options = null);
BlobContainerOptions containerOptions,
EventProcessorClientOptions? processorOptions = null);
}

public class EventHubProcessorFactory(
Expand All @@ -21,27 +24,45 @@ public IEventHubProcessor Create(
string? connectionName,
string eventHubName,
string consumerGroup,
EventProcessorClientOptions? options = null)
BlobContainerOptions containerOptions,
EventProcessorClientOptions? processorOptions = null)
=> monitor.Get(connectionName) switch
{
{ FullyQualifiedNamespace: { } ns, Credential: { } cred }
=> new EventHubProcessorWrapper(
new EventProcessorClient(
storageProvider.GetClient(connectionName),
GetContainerClient(connectionName, containerOptions),
consumerGroup,
ns,
eventHubName,
cred,
options)),
{ ConnectionString: { } cs }
processorOptions)),
{
ConnectionString: { } cs
}
=> new EventHubProcessorWrapper(
new EventProcessorClient(
storageProvider.GetClient(connectionName),
GetContainerClient(connectionName, containerOptions),
consumerGroup,
cs,
eventHubName,
options)),
processorOptions)),
_ => throw new ArgumentException(
$"Missing configuration for Event Hub connection `{connectionName}`"),
};

private BlobContainerClient GetContainerClient(
string? connectionName,
BlobContainerOptions options)
{
var client = storageProvider.GetClient(connectionName);
var container = client.GetBlobContainerClient(options.ContainerName);

if (options.CreateIfNotExist)
{
container.CreateIfNotExists();
}

return container;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,32 @@ public void WithConnection_Sets_ConnectionString(
[Theory, AutoNSubstituteData]
public void WithBlobStorage_Sets_BlobStorageOptions_With_ContainerUri(
[NoAutoProperties] CabazureEventHubOptions sut,
Uri containerUri,
TokenCredential credential,
bool createIfNotExist)
Uri serviceUri,
TokenCredential credential)
{
sut.WithBlobStorage(containerUri, credential, createIfNotExist);
sut.WithBlobStorage(serviceUri, credential);
sut.BlobStorage
.Should()
.BeEquivalentTo(
new BlobStorageOptions
{
ContainerUri = containerUri,
ServiceUri = serviceUri,
Credential = credential,
CreateIfNotExist = createIfNotExist,
});
}

[Theory, AutoNSubstituteData]
public void WithBlobStorage_Sets_BlobStorageOptions_With_ConnectionString(
[NoAutoProperties] CabazureEventHubOptions sut,
string connectionString,
string blobContainerName,
bool createIfNotExist)
string connectionString)
{
sut.WithBlobStorage(connectionString, blobContainerName, createIfNotExist);
sut.WithBlobStorage(connectionString);
sut.BlobStorage
.Should()
.BeEquivalentTo(
new BlobStorageOptions
{
ConnectionString = connectionString,
ContainerName = blobContainerName,
CreateIfNotExist = createIfNotExist,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void AddProcessor_Calls_Builder(
}

[Theory, AutoNSubstituteData]
public void AddProcessor_Registers_EventHubProcessorService_Using_Factory(
public void AddProcessor_With_Default_BlobContainer_Registers_EventHubProcessorService_Using_Factory(
[Frozen(Matching.ImplementedInterfaces)]
ServiceCollection services,
string eventHubName,
Expand All @@ -250,7 +250,48 @@ public void AddProcessor_Registers_EventHubProcessorService_Using_Factory(
.Create(
sut.ConnectionName,
eventHubName,
consumerGroupName);
consumerGroupName,
new BlobContainerOptions(
ContainerName: eventHubName,
CreateIfNotExist: true));
result.Processor
.Should()
.Be(processor);
}

[Theory, AutoNSubstituteData]
public void AddProcessor_With_BlobCOntainerOptions_Registers_EventHubProcessorService_Using_Factory(
[Frozen(Matching.ImplementedInterfaces)]
ServiceCollection services,
string eventHubName,
string consumerGroupName,
BlobContainerOptions containerOptions,
EventHubBuilder sut,
IEventHubProcessorFactory factory,
[Substitute] TProcessor processor)
{
services.AddOptions<CabazureEventHubOptions>();
services.AddSingleton(factory);
services.AddSingleton(processor);

sut.AddProcessor<TMessage, TProcessor>(
eventHubName,
consumerGroupName,
b => b.WithBlobContainer(
containerOptions.ContainerName,
containerOptions.CreateIfNotExist));

var result = services
.BuildServiceProvider()
.GetRequiredService<EventHubProcessorService<TMessage, TProcessor>>();

factory
.Received(1)
.Create(
sut.ConnectionName,
eventHubName,
consumerGroupName,
containerOptions);
result.Processor
.Should()
.Be(processor);
Expand Down
Loading

0 comments on commit a078e0e

Please sign in to comment.