From 91833841e57280b00ece272f3f46ab38114ecf18 Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 30 Jan 2025 17:28:32 +0100
Subject: [PATCH 1/7] initial commit
---
build/deploy-test-resources.yml | 1 +
build/templates/test-resources.bicep | 20 ++
build/variables/test.yml | 2 +
.../Arcus.Testing.Messaging.EventHubs.csproj | 41 ++++
.../EventHubEventFilter.cs | 129 ++++++++++++
.../TemporaryEventHub.cs | 197 ++++++++++++++++++
.../Arcus.Testing.Tests.Integration.csproj | 1 +
.../Configuration/EventHubsConfig.cs | 31 +++
.../Messaging/Fixture/EventHubsTestContext.cs | 131 ++++++++++++
.../Messaging/TemporaryEventHubTests.cs | 80 +++++++
src/Arcus.Testing.sln | 9 +
11 files changed, 642 insertions(+)
create mode 100644 src/Arcus.Testing.Messaging.EventHubs/Arcus.Testing.Messaging.EventHubs.csproj
create mode 100644 src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
create mode 100644 src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
create mode 100644 src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
create mode 100644 src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
create mode 100644 src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
diff --git a/build/deploy-test-resources.yml b/build/deploy-test-resources.yml
index f7a7c3a8..3122f92f 100644
--- a/build/deploy-test-resources.yml
+++ b/build/deploy-test-resources.yml
@@ -58,6 +58,7 @@ stages:
--parameters cosmosDb_mongoDb_databaseName=${{ variables['Arcus.Testing.Cosmos.MongoDb.DatabaseName'] }} `
--parameters cosmosDb_noSql_name=${{ variables['Arcus.Testing.Cosmos.NoSql.Name'] }} `
--parameters cosmosDb_noSql_databaseName=${{ variables['Arcus.Testing.Cosmos.NoSql.DatabaseName'] }} `
+ --parameters eventHubsNamespaceName=${{ variables['Arcus.Testing.EventHubs.Namespace'] }} `
--parameters keyVaultName=${{ parameters.keyVaultName }} `
--parameters servicePrincipal_objectId=$objectId
diff --git a/build/templates/test-resources.bicep b/build/templates/test-resources.bicep
index 8944087c..b5e42da7 100644
--- a/build/templates/test-resources.bicep
+++ b/build/templates/test-resources.bicep
@@ -19,6 +19,9 @@ param cosmosDb_noSql_name string
// Define the name of the CosmosDb NoSql database that will be created.
param cosmosDb_noSql_databaseName string
+// Define the name of the Azure EventHubs namespace that will be created.
+param eventHubsNamespaceName string
+
// Define the name of the key vault where the necessary secrets will be stored to access the deployed test resources.
param keyVaultName string
@@ -125,6 +128,23 @@ module cosmosDb_noSql 'br/public:avm/res/document-db/database-account:0.6.0' = {
}
}
+module eventHubsNamespace 'br/public:avm/res/event-hub/namespace:0.9.1' = {
+ name: 'eventHubsNamespaceDeployment'
+ params: {
+ name: eventHubsNamespaceName
+ location: location
+ skuName: 'Basic'
+ skuCapacity: 1
+ publicNetworkAccess: 'Enabled'
+ roleAssignments: [
+ {
+ principalId: servicePrincipal_objectId
+ roleDefinitionIdOrName: 'Event Hubs Data Owner'
+ }
+ ]
+ }
+}
+
module vault 'br/public:avm/res/key-vault/vault:0.6.1' = {
name: 'vaultDeployment'
params: {
diff --git a/build/variables/test.yml b/build/variables/test.yml
index 8c8b64f4..db3e31ee 100644
--- a/build/variables/test.yml
+++ b/build/variables/test.yml
@@ -7,6 +7,8 @@ variables:
Arcus.Testing.Cosmos.NoSql.Name: 'arcus-testing-cosmos-nosql'
Arcus.Testing.Cosmos.NoSql.DatabaseName: 'arcus-testing-cosmos-nosql-db'
+ Arcus.Testing.EventHubs.Namespace: 'arcus-testing-eventhubs'
+
Arcus.Testing.KeyVault.Name: 'arcus-testing-kv'
Arcus.Testing.StorageAccount.Name: 'arcustestingstorage'
Arcus.Testing.StorageAccount.Key.SecretName: 'Arcus-Testing-StorageAccount-Key'
\ No newline at end of file
diff --git a/src/Arcus.Testing.Messaging.EventHubs/Arcus.Testing.Messaging.EventHubs.csproj b/src/Arcus.Testing.Messaging.EventHubs/Arcus.Testing.Messaging.EventHubs.csproj
new file mode 100644
index 00000000..914a4dc5
--- /dev/null
+++ b/src/Arcus.Testing.Messaging.EventHubs/Arcus.Testing.Messaging.EventHubs.csproj
@@ -0,0 +1,41 @@
+
+
+
+ net8.0;net6.0
+ Arcus.Testing
+ Arcus
+ Arcus
+ Provides messaging capabilities for Azure EventHubs during Arcus testing
+ Copyright (c) Arcus
+ https://github.com/arcus-azure/arcus.testing
+ https://github.com/arcus-azure/arcus.testing
+ LICENSE
+ icon.png
+ README.md
+ Git
+ Azure;Testing;Messaging;EventHubs
+ Arcus.Testing.Messaging.EventHubs
+ true
+ true
+ true
+ $(WarningsNotAsErrors);NU1901;NU1902;NU1903;NU1904
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs b/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
new file mode 100644
index 00000000..285816a1
--- /dev/null
+++ b/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
@@ -0,0 +1,129 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.EventHubs.Consumer;
+
+namespace Arcus.Testing
+{
+ ///
+ /// Represents a configurable filter instance that selects a subset of s on an Azure EventHubs hub
+ /// (a.k.a. 'spy test fixture').
+ ///
+ public class EventHubEventFilter
+ {
+ private readonly EventHubConsumerClient _client;
+ private readonly ReadEventOptions _options = new() { MaximumWaitTime = TimeSpan.FromMinutes(1) };
+ private readonly Collection> _predicates = new();
+
+ private string _partitionId;
+ private EventPosition _startingPosition;
+
+ internal EventHubEventFilter(EventHubConsumerClient client)
+ {
+ ArgumentNullException.ThrowIfNull(client);
+ _client = client;
+ }
+
+ ///
+ ///
+ ///
+ /// The identifier of the Event Hub partition from which events will be received.
+ /// The position within the partition where the consumer should begin reading events.
+ ///
+ public EventHubEventFilter FromPartition(string partitionId, EventPosition startingPosition)
+ {
+ _partitionId = partitionId;
+ _startingPosition = startingPosition;
+ return this;
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public EventHubEventFilter Where(Func predicate)
+ {
+ ArgumentNullException.ThrowIfNull(predicate);
+ _predicates.Add(predicate);
+
+ return this;
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public EventHubEventFilter ReadWith(Action configureOptions)
+ {
+ configureOptions(_options);
+ return this;
+ }
+
+ ///
+ /// Gets the awaiter used to await the .
+ ///
+ public TaskAwaiter> GetAwaiter()
+ {
+ return ToListAsync().GetAwaiter();
+ }
+
+ ///
+ ///
+ ///
+ ///
+ public async Task AnyAsync()
+ {
+ return await AnyAsync(CancellationToken.None);
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task AnyAsync(CancellationToken cancellationToken)
+ {
+ List events = await ToListAsync(cancellationToken);
+ return events.Any();
+ }
+
+ ///
+ ///
+ ///
+ ///
+ public async Task> ToListAsync()
+ {
+ return await ToListAsync(CancellationToken.None);
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task> ToListAsync(CancellationToken cancellationToken)
+ {
+ IAsyncEnumerable reading =
+ _partitionId is null
+ ? _client.ReadEventsAsync(_options, cancellationToken)
+ : _client.ReadEventsFromPartitionAsync(_partitionId, _startingPosition, _options, cancellationToken);
+
+ var events = new List();
+ await foreach (PartitionEvent ev in reading)
+ {
+ if (!ev.Equals(default) && _predicates.All(predicate => predicate(ev)))
+ {
+ events.Add(ev);
+ }
+ }
+
+ return events;
+ }
+ }
+}
diff --git a/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
new file mode 100644
index 00000000..3068398b
--- /dev/null
+++ b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
@@ -0,0 +1,197 @@
+using System;
+using System.Threading.Tasks;
+using Azure;
+using Azure.Core;
+using Azure.Identity;
+using Azure.Messaging.EventHubs.Consumer;
+using Azure.ResourceManager;
+using Azure.ResourceManager.EventHubs;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace Arcus.Testing
+{
+ ///
+ /// Represents the available options for the when the test fixture is set up.
+ ///
+ public class OnSetupTemporaryEventHubOptions
+ {
+ internal EventHubData EventHubData { get; } = new();
+
+ ///
+ /// Configures the when an Azure EvenHubs hub is being created.
+ ///
+ /// The additional function to manipulate how the Azure EventHubs hub is created by the test fixture.
+ /// Thrown when the is null.
+ public OnSetupTemporaryEventHubOptions CreateHubWith(Action configureHub)
+ {
+ ArgumentNullException.ThrowIfNull(configureHub);
+
+ configureHub(EventHubData);
+ return this;
+ }
+ }
+
+ ///
+ /// Represents the available options for the when the test fixture is teardown.
+ ///
+ public class OnTeardownTemporaryEventHubOptions
+ {
+
+ }
+
+ ///
+ /// Represents the available options for the .
+ ///
+ public class TemporaryEventHubOptions
+ {
+ ///
+ /// Gets the available options when the test fixture is set up.
+ ///
+ public OnSetupTemporaryEventHubOptions OnSetup { get; } = new();
+
+ ///
+ /// Gets the available options when the test fixture is teardown.
+ ///
+ public OnTeardownTemporaryEventHubOptions OnTeardown { get; } = new();
+ }
+
+ ///
+ /// Represents a temporary Azure EventHubs hub that will be deleted when the instance is disposed.
+ ///
+ public class TemporaryEventHub : IAsyncDisposable
+ {
+ private readonly EventHubsNamespaceResource _eventHubsNamespace;
+ private readonly EventHubConsumerClient _consumerClient;
+ private readonly string _eventHubName;
+ private readonly bool _createdByUs;
+ private readonly ILogger _logger;
+
+ private TemporaryEventHub(
+ EventHubsNamespaceResource eventHubsNamespace,
+ EventHubConsumerClient consumerClient,
+ string eventHubName,
+ bool createdByUs,
+ ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(eventHubsNamespace);
+ ArgumentNullException.ThrowIfNull(consumerClient);
+
+ _eventHubsNamespace = eventHubsNamespace;
+ _consumerClient = consumerClient;
+ _eventHubName = eventHubName;
+ _createdByUs = createdByUs;
+ _logger = logger;
+ }
+
+ ///
+ /// Gets the filter client to search for events on the Azure EventHubs test-managed hub (a.k.a. 'spy test fixture').
+ ///
+ public EventHubEventFilter Events => new(_consumerClient);
+
+ ///
+ /// Creates a new instance of the which creates a new Azure EventHubs hub if it doesn't exist yet.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static async Task CreateIfNotExistsAsync(
+ ResourceIdentifier eventHubNamespaceResourceId,
+ string consumerGroup,
+ string eventHubName,
+ ILogger logger,
+ Action configureOptions)
+ {
+ ArgumentNullException.ThrowIfNull(eventHubNamespaceResourceId);
+
+ if (string.IsNullOrWhiteSpace(consumerGroup))
+ {
+ throw new ArgumentException("Requires a non-blank Azure EventHubs consumer group to set up the test fixture", nameof(consumerGroup));
+ }
+
+ var credential = new DefaultAzureCredential();
+ var arm = new ArmClient(credential);
+
+ EventHubsNamespaceResource resource =
+ await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
+ .GetAsync();
+
+ var consumerClient = new EventHubConsumerClient(consumerGroup, resource.Data.ServiceBusEndpoint, eventHubName, credential);
+
+ return await CreateIfNotExistsAsync(resource, consumerClient, eventHubName, logger, configureOptions);
+ }
+
+ ///
+ /// Creates a new instance of the which creates a new Azure EventHubs hub if it doesn't exist yet.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static async Task CreateIfNotExistsAsync(
+ EventHubsNamespaceResource eventHubsNamespace,
+ EventHubConsumerClient consumerClient,
+ string eventHubName,
+ ILogger logger,
+ Action configureOptions)
+ {
+ ArgumentNullException.ThrowIfNull(eventHubsNamespace);
+ logger ??= NullLogger.Instance;
+
+ if (string.IsNullOrWhiteSpace(eventHubName))
+ {
+ throw new ArgumentException("Requires a non-blank Azure EventHubs hub name to set up the test fixture", nameof(eventHubName));
+ }
+
+ var options = new TemporaryEventHubOptions();
+ configureOptions?.Invoke(options);
+
+ EventHubCollection eventHubs = eventHubsNamespace.GetEventHubs();
+ if (await eventHubs.ExistsAsync(eventHubName))
+ {
+ logger.LogDebug("[Test:Setup] Use already existing Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", eventHubName, eventHubsNamespace.Id.Name);
+
+ return new TemporaryEventHub(eventHubsNamespace, consumerClient, eventHubName, createdByUs: false, logger);
+ }
+
+ logger.LogDebug("[Test:Setup] Create new Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", eventHubName, eventHubsNamespace.Id.Name);
+ await eventHubs.CreateOrUpdateAsync(WaitUntil.Completed, eventHubName, options.OnSetup.EventHubData);
+
+ return new TemporaryEventHub(eventHubsNamespace, consumerClient, eventHubName, createdByUs: true, logger);
+ }
+
+ ///
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
+ ///
+ /// A task that represents the asynchronous dispose operation.
+ public async ValueTask DisposeAsync()
+ {
+ await using var disposables = new DisposableCollection(_logger);
+
+ if (_createdByUs)
+ {
+ disposables.Add(AsyncDisposable.Create(async () =>
+ {
+ NullableResponse eventHub =
+ await _eventHubsNamespace.GetEventHubs()
+ .GetIfExistsAsync(_eventHubName);
+
+ if (eventHub.HasValue && eventHub.Value != null)
+ {
+ _logger.LogDebug("[Test:Teardown] Delete Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", _eventHubName, _eventHubsNamespace.Id.Name);
+ await eventHub.Value.DeleteAsync(WaitUntil.Completed);
+ }
+ }));
+ }
+
+ disposables.Add(_consumerClient);
+ }
+ }
+}
diff --git a/src/Arcus.Testing.Tests.Integration/Arcus.Testing.Tests.Integration.csproj b/src/Arcus.Testing.Tests.Integration/Arcus.Testing.Tests.Integration.csproj
index fdada4c0..a7fe36fa 100644
--- a/src/Arcus.Testing.Tests.Integration/Arcus.Testing.Tests.Integration.csproj
+++ b/src/Arcus.Testing.Tests.Integration/Arcus.Testing.Tests.Integration.csproj
@@ -26,6 +26,7 @@
+
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs b/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
new file mode 100644
index 00000000..44abe4fc
--- /dev/null
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
@@ -0,0 +1,31 @@
+using Azure.Core;
+using Azure.ResourceManager.EventHubs;
+
+namespace Arcus.Testing.Tests.Integration.Messaging.Configuration
+{
+ public class EventHubsConfig
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public EventHubsConfig(ResourceIdentifier namespaceResourceId)
+ {
+ NamespaceResourceId = namespaceResourceId;
+ }
+
+ public ResourceIdentifier NamespaceResourceId { get; }
+ }
+
+ public static class TestConfigExtensions
+ {
+ public static EventHubsConfig GetEventHubs(this TestConfig config)
+ {
+ var resourceId = EventHubsNamespaceResource.CreateResourceIdentifier(
+ config["Arcus:SubscriptionId"],
+ config["Arcus:ResourceGroup:Name"],
+ config["Arcus:EventHubs:Namespace"]);
+
+ return new EventHubsConfig(resourceId);
+ }
+ }
+}
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs b/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
new file mode 100644
index 00000000..0b2457e4
--- /dev/null
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
@@ -0,0 +1,131 @@
+using System;
+using System.Collections.ObjectModel;
+using System.Threading.Tasks;
+using Arcus.Testing.Tests.Integration.Configuration;
+using Arcus.Testing.Tests.Integration.Fixture;
+using Arcus.Testing.Tests.Integration.Messaging.Configuration;
+using Azure;
+using Azure.Identity;
+using Azure.Messaging.EventHubs;
+using Azure.Messaging.EventHubs.Producer;
+using Azure.ResourceManager;
+using Azure.ResourceManager.EventHubs;
+using Azure.ResourceManager.EventHubs.Models;
+using Bogus;
+using Microsoft.Extensions.Logging;
+using Xunit;
+
+namespace Arcus.Testing.Tests.Integration.Messaging.Fixture
+{
+ public class EventHubsTestContext : IAsyncDisposable
+ {
+ private readonly TemporaryManagedIdentityConnection _connection;
+ private readonly EventHubsNamespaceResource _namespace;
+ private readonly Collection _eventHubNames = new();
+ private readonly ILogger _logger;
+
+ private static readonly Faker Bogus = new();
+
+ private EventHubsTestContext(
+ TemporaryManagedIdentityConnection connection,
+ EventHubsNamespaceResource @namespace,
+ ILogger logger)
+ {
+ _connection = connection;
+ _namespace = @namespace;
+ _logger = logger;
+ }
+
+ public static async Task GivenAsync(TestConfig config, ILogger logger)
+ {
+ ServicePrincipal servicePrincipal = config.GetServicePrincipal();
+ EventHubsConfig eventHubs = config.GetEventHubs();
+
+ var connection = TemporaryManagedIdentityConnection.Create(servicePrincipal);
+ var credential = new DefaultAzureCredential();
+
+ var arm = new ArmClient(credential);
+ EventHubsNamespaceResource resource =
+ await arm.GetEventHubsNamespaceResource(eventHubs.NamespaceResourceId)
+ .GetAsync();
+
+ return new EventHubsTestContext(connection, resource, logger);
+ }
+
+ public async Task WhenHubAvailableAsync()
+ {
+ string eventHubName = WhenHubNonAvailable();
+
+ _logger.LogDebug("[Test:Setup] Create existing Azure EventHbs '{EventHubName}' in namespace '{Namespace}'", eventHubName, _namespace.Id.Name);
+
+ var eventHubs = _namespace.GetEventHubs();
+ await eventHubs.CreateOrUpdateAsync(WaitUntil.Completed, eventHubName, new EventHubData
+ {
+ RetentionDescription = new RetentionDescription
+ {
+ CleanupPolicy = CleanupPolicyRetentionDescription.Delete,
+ RetentionTimeInHours = 1
+ }
+ });
+
+ return eventHubName;
+ }
+
+ public string WhenHubNonAvailable()
+ {
+ string eventHubName = $"hub-{Bogus.Random.Guid()}";
+ _eventHubNames.Add(eventHubName);
+
+ return eventHubName;
+ }
+
+ public async Task WhenEventAvailableOnHubAsync(string eventHubName, string partitionId = null)
+ {
+ var producerClient = new EventHubProducerClient(_namespace.Data.ServiceBusEndpoint, eventHubName, new DefaultAzureCredential());
+
+ var ev = new EventData(Bogus.Random.Bytes(10))
+ {
+ MessageId = $"id-{Bogus.Random.Guid()}"
+ };
+ await producerClient.SendAsync([ev], new SendEventOptions() { PartitionId = partitionId });
+
+ return ev;
+ }
+
+ public async Task ShouldHaveHubAsync(string evenHubName)
+ {
+ Assert.True(await _namespace.GetEventHubs().ExistsAsync(evenHubName), $"Azure EventHubs hub '{evenHubName}' should be available on the namespace, but it isn't");
+ }
+
+ public async Task ShouldNotHaveHubAsync(string eventHubName)
+ {
+ Assert.False(await _namespace.GetEventHubs().ExistsAsync(eventHubName), $"Azure EventHubs hub '{eventHubName}' should not be available on the namespace, but it is");
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await using var disposables = new DisposableCollection(_logger);
+
+ if (_eventHubNames.Count > 0)
+ {
+ disposables.Add(AsyncDisposable.Create(async () =>
+ {
+ EventHubCollection eventHubs = _namespace.GetEventHubs();
+ foreach (var eventHubName in _eventHubNames)
+ {
+ NullableResponse eventHub = await eventHubs.GetIfExistsAsync(eventHubName);
+ if (eventHub.HasValue && eventHub.Value != null)
+ {
+ _logger.LogDebug("[Test:Teardown] Fallback delete Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", eventHubName, _namespace.Id.Name);
+ await eventHub.Value.DeleteAsync(WaitUntil.Started);
+ }
+ }
+ }));
+ }
+
+ disposables.Add(_connection);
+
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs b/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
new file mode 100644
index 00000000..4c8292f2
--- /dev/null
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
@@ -0,0 +1,80 @@
+using System.Threading.Tasks;
+using Arcus.Testing.Tests.Integration.Messaging.Configuration;
+using Arcus.Testing.Tests.Integration.Messaging.Fixture;
+using Azure.Messaging.EventHubs;
+using Azure.ResourceManager.EventHubs.Models;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Arcus.Testing.Tests.Integration.Messaging
+{
+ public class TemporaryEventHubTests : IntegrationTest
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public TemporaryEventHubTests(ITestOutputHelper outputWriter) : base(outputWriter)
+ {
+ }
+
+ [Fact]
+ public async Task CreateTempHub_WithNonExistingHub_SucceedsByCreatingHubDuringLifetimeFixture()
+ {
+ // Arrange
+ await using var eventHubs = await GivenEventHubNamespaceAsync();
+
+ string eventHubName = eventHubs.WhenHubNonAvailable();
+
+ // Act
+ TemporaryEventHub temp = await CreateTempHubAsync(eventHubName);
+
+ // Assert
+ await eventHubs.ShouldHaveHubAsync(eventHubName);
+ await temp.DisposeAsync();
+ await eventHubs.ShouldNotHaveHubAsync(eventHubName);
+ }
+
+ [Fact]
+ public async Task CreateTempHub_WithExistingHub_SucceedsByLeavingHubAfterLifetimeFixture()
+ {
+ // Arrange
+ await using var eventHubs = await GivenEventHubNamespaceAsync();
+
+ string eventHubName = await eventHubs.WhenHubAvailableAsync();
+ EventData expected = await eventHubs.WhenEventAvailableOnHubAsync(eventHubName);
+
+ // Act
+ TemporaryEventHub temp = await CreateTempHubAsync(eventHubName);
+
+ // Assert
+ await eventHubs.ShouldHaveHubAsync(eventHubName);
+ Assert.Single(await temp.Events.Where(ev => ev.Data.MessageId == expected.MessageId).ToListAsync());
+
+ await temp.DisposeAsync();
+ await eventHubs.ShouldHaveHubAsync(eventHubName);
+ }
+
+ private async Task CreateTempHubAsync(string eventHubName)
+ {
+ EventHubsConfig config = Configuration.GetEventHubs();
+
+ return await TemporaryEventHub.CreateIfNotExistsAsync(
+ config.NamespaceResourceId, "$Default", eventHubName, Logger, options =>
+ {
+ options.OnSetup.CreateHubWith(hub =>
+ {
+ hub.RetentionDescription = new RetentionDescription
+ {
+ CleanupPolicy = CleanupPolicyRetentionDescription.Delete,
+ RetentionTimeInHours = 1
+ };
+ });
+ });
+ }
+
+ private async Task GivenEventHubNamespaceAsync()
+ {
+ return await EventHubsTestContext.GivenAsync(Configuration, Logger);
+ }
+ }
+}
diff --git a/src/Arcus.Testing.sln b/src/Arcus.Testing.sln
index a96e9690..39ae3b04 100644
--- a/src/Arcus.Testing.sln
+++ b/src/Arcus.Testing.sln
@@ -41,6 +41,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcus.Testing.Storage.Cosmo
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcus.Testing.Storage.Table", "Arcus.Testing.Storage.Table\Arcus.Testing.Storage.Table.csproj", "{74AB9F6E-791F-4609-96BD-15420C25AA72}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Messaging", "Messaging", "{321D9EE7-3A59-49E5-91D2-68BE8631FD09}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcus.Testing.Messaging.EventHubs", "Arcus.Testing.Messaging.EventHubs\Arcus.Testing.Messaging.EventHubs.csproj", "{01449C8F-3400-488E-B6D3-561B78B15310}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -101,6 +105,10 @@ Global
{74AB9F6E-791F-4609-96BD-15420C25AA72}.Debug|Any CPU.Build.0 = Debug|Any CPU
{74AB9F6E-791F-4609-96BD-15420C25AA72}.Release|Any CPU.ActiveCfg = Release|Any CPU
{74AB9F6E-791F-4609-96BD-15420C25AA72}.Release|Any CPU.Build.0 = Release|Any CPU
+ {01449C8F-3400-488E-B6D3-561B78B15310}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {01449C8F-3400-488E-B6D3-561B78B15310}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {01449C8F-3400-488E-B6D3-561B78B15310}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {01449C8F-3400-488E-B6D3-561B78B15310}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -118,6 +126,7 @@ Global
{2434BA10-0D39-4A86-9025-8E2439E08A8B} = {64013B91-7B1E-4F88-8FD7-CCF583D4A4CA}
{54A67E0F-270B-4979-9E72-C68EA8222F89} = {FA2E21E0-953E-4B84-9C47-C4F0A3833E4E}
{74AB9F6E-791F-4609-96BD-15420C25AA72} = {FA2E21E0-953E-4B84-9C47-C4F0A3833E4E}
+ {01449C8F-3400-488E-B6D3-561B78B15310} = {321D9EE7-3A59-49E5-91D2-68BE8631FD09}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E5382820-51FF-4B00-92BE-C78E80EA0841}
From 449216268bb4c41ae10f9b5cb715cab476d383ff Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 30 Jan 2025 17:35:16 +0100
Subject: [PATCH 2/7] pr-fix: correct role assignments
---
build/templates/test-resources.bicep | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/build/templates/test-resources.bicep b/build/templates/test-resources.bicep
index b5e42da7..eee121e3 100644
--- a/build/templates/test-resources.bicep
+++ b/build/templates/test-resources.bicep
@@ -139,7 +139,11 @@ module eventHubsNamespace 'br/public:avm/res/event-hub/namespace:0.9.1' = {
roleAssignments: [
{
principalId: servicePrincipal_objectId
- roleDefinitionIdOrName: 'Event Hubs Data Owner'
+ roleDefinitionIdOrName: 'Azure Event Hubs Data Sender'
+ }
+ {
+ principalId: servicePrincipal_objectId
+ roleDefinitionIdOrName: 'Azure Event Hubs Data Receiver'
}
]
}
From 438da16cb9fd7a2ed81cd5bb77a67724d1941668 Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Fri, 31 Jan 2025 09:32:29 +0100
Subject: [PATCH 3/7] pr-add: remaining event filtering system + feature docs
---
.../04-Azure/05-Messaging/02-eventhubs.md | 74 ++++++++
.../EventHubEventFilter.cs | 70 +++++--
.../TemporaryEventHub.cs | 177 ++++++++++++++----
.../Configuration/EventHubsConfig.cs | 16 +-
.../Messaging/Fixture/EventHubsTestContext.cs | 53 +++++-
.../Messaging/TemporaryEventHubTests.cs | 16 +-
.../Arcus.Testing.Tests.Unit.csproj | 1 +
.../Messaging/TemporaryEventHubTests.cs | 67 +++++++
8 files changed, 410 insertions(+), 64 deletions(-)
create mode 100644 docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
create mode 100644 src/Arcus.Testing.Tests.Unit/Messaging/TemporaryEventHubTests.cs
diff --git a/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md b/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
new file mode 100644
index 00000000..5a65f323
--- /dev/null
+++ b/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
@@ -0,0 +1,74 @@
+# Event Hubs
+The `Arcus.Testing.Messaging.EventHubs` package provides test fixtures related to Azure Event Hubs. By using the common testing practice 'clean environment', it provides a temporary hub.
+
+## Installation
+The following functionality is available when installing this package:
+
+```powershell
+PM> Install-Package -Name Arcus.Testing.Messaging.EventHubs
+```
+
+## Temporary hub
+The `TemporaryEventHub` provides a solution when the integration test requires an Azure Event Hub during the test run. A hub is created upon the upon the setup of the test fixture and is deleted again when the fixture is disposed.
+
+> ✨ Only when the test fixture was responsible for creating the hub, will the hub be deleted upon the fixture's disposal. This follows the 'clean environment' testing principle that describes that after the test run, the same state should be achieved as before the test run.
+
+```csharp
+using Arcus.Testing;
+
+ResourceIdentifier eventHubsNamespaceResourceId =
+ EventHubsNamespaceResource.CreateResourceIdentifier("", "");
+
+await using var hub = await TemporaryEventHub.CreateIfNotExistsAsync(
+ eventHubsNamespaceResourceId, consumerGroup: "$Default", "", logger);
+```
+
+> ⚡ Uses by default the [`DefaultAzureCredential`](https://learn.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential) but other type of authentication mechanisms are supported with overloads.
+
+### Customization
+The `TemporaryEventHub` allows testers to configure setup operations to manipulate the test fixture's behavior.
+
+```csharp
+using Arcus.Testing;
+
+await TemporaryEventHub.CreateIfNotExistsAsync(..., options =>
+{
+ // Options related to when the test fixture is set up.
+ // ---------------------------------------------------
+
+ // Change the default hub-creation behavior.
+ options.OnSetup.CreateHubWith((EventHubData hub) =>
+ {
+ hub.PartitionCount = 4;
+ });
+});
+```
+
+### Search for events
+The `TemporaryEventHub` is equipped with an event filtering system that allows testers to search for events during the lifetime of the test fixture. This can be useful to verify the current state of a hub, or as a test assertion to verify EventHubs-related implementations.
+
+```csharp
+using Arcus.Testing;
+
+await using TemporaryEventHub hub = ...
+
+IEnumerable events =
+ await hub.Events
+
+ // Get subset events currently on the hub.
+ .Where(ev => ev.Data.Properties.ContainsKey(" ev.Data.ContentType == "application/json")
+
+ // Get events only from a single partition.
+ .FromPartition("", EventPosition.Earliest)
+
+ // Configures the read options that will be associated with the search operation.
+ .ReadWith((ReadEventOptions opt) =>
+ {
+ opt.MaximumWaitTime = TimeSpan.FromSeconds(10);
+ opt.OwnerLevel = 10;
+ })
+
+ // Start searching for events.
+ .ToListAsync();
+```
\ No newline at end of file
diff --git a/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs b/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
index 285816a1..52096633 100644
--- a/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
+++ b/src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
@@ -29,23 +29,28 @@ internal EventHubEventFilter(EventHubConsumerClient client)
}
///
- ///
+ /// Indicate that only events from a given should be searched for.
///
/// The identifier of the Event Hub partition from which events will be received.
/// The position within the partition where the consumer should begin reading events.
- ///
+ /// Thrown when the is blank.
public EventHubEventFilter FromPartition(string partitionId, EventPosition startingPosition)
{
+ if (string.IsNullOrWhiteSpace(partitionId))
+ {
+ throw new ArgumentException("Requires a non-blank partition ID to search for events on an Azure Event Hub", nameof(partitionId));
+ }
+
_partitionId = partitionId;
_startingPosition = startingPosition;
return this;
}
///
- ///
+ /// Adds a to which the searched for events should match against.
///
- ///
- ///
+ /// The custom filter function to select a subset of events.
+ /// Thrown when the is null.>
public EventHubEventFilter Where(Func predicate)
{
ArgumentNullException.ThrowIfNull(predicate);
@@ -55,13 +60,33 @@ public EventHubEventFilter Where(Func predicate)
}
///
- ///
+ /// Configures the that will be associated with the event search operation.
+ /// Use for example the to shortcut the event searching early:
+ ///
+ ///
+ /// .ReadWith(options =>
+ /// {
+ /// options.MaximumWaitTime = TimeSpan.FromSeconds(10);
+ /// })
+ ///
+ ///
+ /// Or to change the when multiple event consumers are involved:
+ ///
+ ///
+ /// .ReadWith(options =>
+ /// {
+ /// options.OwnerLevel = 10;
+ /// })
+ ///
+ ///
///
///
///
public EventHubEventFilter ReadWith(Action configureOptions)
{
+ ArgumentNullException.ThrowIfNull(configureOptions);
configureOptions(_options);
+
return this;
}
@@ -74,19 +99,23 @@ public TaskAwaiter> GetAwaiter()
}
///
- ///
+ /// Determines whether the configured Azure Event Hub contains any matching events.
///
- ///
+ ///
+ /// if any events are found that matches the previously configured predicates; otherwise, .
+ ///
public async Task AnyAsync()
{
return await AnyAsync(CancellationToken.None);
}
///
- ///
+ /// Determines whether the configured Azure Event Hub contains any matching events.
///
- ///
- ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// if any events are found that matches the previously configured predicates; otherwise, .
+ ///
public async Task AnyAsync(CancellationToken cancellationToken)
{
List events = await ToListAsync(cancellationToken);
@@ -94,19 +123,17 @@ public async Task AnyAsync(CancellationToken cancellationToken)
}
///
- ///
+ /// Collects all events currently matching on the configured Azure Event Hub into a .
///
- ///
public async Task> ToListAsync()
{
return await ToListAsync(CancellationToken.None);
}
///
- ///
+ /// Collects all events currently matching on the configured Azure Event Hub into a .
///
- ///
- ///
+ /// An optional instance to signal the request to cancel the operation.
public async Task> ToListAsync(CancellationToken cancellationToken)
{
IAsyncEnumerable reading =
@@ -114,16 +141,21 @@ _partitionId is null
? _client.ReadEventsAsync(_options, cancellationToken)
: _client.ReadEventsFromPartitionAsync(_partitionId, _startingPosition, _options, cancellationToken);
- var events = new List();
+ var events = new Collection();
await foreach (PartitionEvent ev in reading)
{
- if (!ev.Equals(default) && _predicates.All(predicate => predicate(ev)))
+ if (ev.Data is null)
+ {
+ return events.ToList();
+ }
+
+ if (_predicates.All(predicate => predicate(ev)))
{
events.Add(ev);
}
}
- return events;
+ return events.ToList();
}
}
}
diff --git a/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
index 3068398b..b6b61292 100644
--- a/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
+++ b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
@@ -32,14 +32,6 @@ public OnSetupTemporaryEventHubOptions CreateHubWith(Action config
}
}
- ///
- /// Represents the available options for the when the test fixture is teardown.
- ///
- public class OnTeardownTemporaryEventHubOptions
- {
-
- }
-
///
/// Represents the available options for the .
///
@@ -49,11 +41,6 @@ public class TemporaryEventHubOptions
/// Gets the available options when the test fixture is set up.
///
public OnSetupTemporaryEventHubOptions OnSetup { get; } = new();
-
- ///
- /// Gets the available options when the test fixture is teardown.
- ///
- public OnTeardownTemporaryEventHubOptions OnTeardown { get; } = new();
}
///
@@ -64,14 +51,15 @@ public class TemporaryEventHub : IAsyncDisposable
private readonly EventHubsNamespaceResource _eventHubsNamespace;
private readonly EventHubConsumerClient _consumerClient;
private readonly string _eventHubName;
- private readonly bool _createdByUs;
+ private readonly bool _hubCreatedByUs, _consumerClientCreatedByUs;
private readonly ILogger _logger;
private TemporaryEventHub(
EventHubsNamespaceResource eventHubsNamespace,
EventHubConsumerClient consumerClient,
+ bool consumerClientCreatedByUs,
string eventHubName,
- bool createdByUs,
+ bool hubCreatedByUs,
ILogger logger)
{
ArgumentNullException.ThrowIfNull(eventHubsNamespace);
@@ -79,8 +67,9 @@ private TemporaryEventHub(
_eventHubsNamespace = eventHubsNamespace;
_consumerClient = consumerClient;
+ _consumerClientCreatedByUs = consumerClientCreatedByUs;
_eventHubName = eventHubName;
- _createdByUs = createdByUs;
+ _hubCreatedByUs = hubCreatedByUs;
_logger = logger;
}
@@ -90,23 +79,66 @@ private TemporaryEventHub(
public EventHubEventFilter Events => new(_consumerClient);
///
- /// Creates a new instance of the which creates a new Azure EventHubs hub if it doesn't exist yet.
+ /// Creates a new instance of the which creates a new Azure Event Hub if it doesn't exist yet.
+ ///
+ ///
+ /// The resource ID pointing to the Azure EventHubs namespace where a hub should be test-managed.
+ /// The resource ID can be easily constructed via the :
+ ///
+ ///
+ /// ResourceIdentifier eventHubsNamespaceResourceId =
+ /// EventHubsNamespaceResource.CreateResourceIdentifier("<subscription-id>", "<resource-group>", "<namespace-name>");
+ ///
+ ///
+ ///
+ /// The name of the consumer group this consumer is associated with. Events are read in the context of this group.
+ /// The name of the specific Azure Event Hub to associate the consumer with.
+ /// The instance to log diagnostic information during the lifetime of the test fixture.
+ ///
+ /// Thrown when the is null.
+ ///
+ ///
+ /// Thrown when the or the is blank.
+ ///
+ public static async Task CreateIfNotExistsAsync(
+ ResourceIdentifier eventHubsNamespaceResourceId,
+ string consumerGroup,
+ string eventHubName,
+ ILogger logger)
+ {
+ return await CreateIfNotExistsAsync(eventHubsNamespaceResourceId, consumerGroup, eventHubName, logger, configureOptions: null);
+ }
+
+ ///
+ /// Creates a new instance of the which creates a new Azure Event Hub if it doesn't exist yet.
///
- ///
- ///
- ///
- ///
- ///
- ///
- ///
+ ///
+ /// The resource ID pointing to the Azure EventHubs namespace where a hub should be test-managed.
+ /// The resource ID can be easily constructed via the :
+ ///
+ ///
+ /// ResourceIdentifier eventHubsNamespaceResourceId =
+ /// EventHubsNamespaceResource.CreateResourceIdentifier("<subscription-id>", "<resource-group>", "<namespace-name>");
+ ///
+ ///
+ /// /// The name of the consumer group this consumer is associated with. Events are read in the context of this group.
+ /// The name of the specific Azure Event Hub to associate the consumer with.
+ /// The instance to log diagnostic information during the lifetime of the test fixture.
+ /// The function to manipulate the test fixture's lifetime behavior.
+ ///
+ /// Thrown when the is null.
+ ///
+ ///
+ /// Thrown when the or the is blank.
+ ///
public static async Task CreateIfNotExistsAsync(
- ResourceIdentifier eventHubNamespaceResourceId,
+ ResourceIdentifier eventHubsNamespaceResourceId,
string consumerGroup,
string eventHubName,
ILogger logger,
Action configureOptions)
{
- ArgumentNullException.ThrowIfNull(eventHubNamespaceResourceId);
+ ArgumentNullException.ThrowIfNull(eventHubsNamespaceResourceId);
if (string.IsNullOrWhiteSpace(consumerGroup))
{
@@ -117,32 +149,92 @@ public static async Task CreateIfNotExistsAsync(
var arm = new ArmClient(credential);
EventHubsNamespaceResource resource =
- await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
+ await arm.GetEventHubsNamespaceResource(eventHubsNamespaceResourceId)
.GetAsync();
var consumerClient = new EventHubConsumerClient(consumerGroup, resource.Data.ServiceBusEndpoint, eventHubName, credential);
- return await CreateIfNotExistsAsync(resource, consumerClient, eventHubName, logger, configureOptions);
+ return await CreateIfNotExistsAsync(resource, consumerClient, consumerClientCreatedByUs: true, eventHubName, logger, configureOptions);
+ }
+
+ ///
+ /// Creates a new instance of the which creates a new Azure Event Hub if it doesn't exist yet.
+ ///
+ ///
+ /// The Azure EventHubs namespace resource where the Azure Event Hub should be test-managed.
+ /// The resource should be retrieved via the :
+ ///
+ ///
+ /// var credential = new DefaultAzureCredential();
+ /// var arm = new ArmClient(credential);
+ ///
+ /// EventHubsNamespaceResource eventHubsNamespace =
+ /// await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
+ /// .GetAsync();
+ ///
+ ///
+ ///
+ /// The client to read events from the test-managed Azure Event Hub.
+ /// The name of the specific Azure Event Hub to associate the consumer with.
+ /// The instance to log diagnostic information during the lifetime of the test fixture.
+ ///
+ /// Thrown when the or is null.
+ ///
+ /// Thrown when the is blank.
+ public static async Task CreateIfNotExistsAsync(
+ EventHubsNamespaceResource eventHubsNamespace,
+ EventHubConsumerClient consumerClient,
+ string eventHubName,
+ ILogger logger)
+ {
+ return await CreateIfNotExistsAsync(eventHubsNamespace, consumerClient, eventHubName, logger, configureOptions: null);
}
///
- /// Creates a new instance of the which creates a new Azure EventHubs hub if it doesn't exist yet.
+ /// Creates a new instance of the which creates a new Azure Event Hub if it doesn't exist yet.
///
- ///
- ///
- ///
- ///
- ///
- ///
- ///
+ ///
+ /// The Azure EventHubs namespace resource where the Azure Event Hub should be test-managed.
+ /// The resource should be retrieved via the :
+ ///
+ ///
+ /// var credential = new DefaultAzureCredential();
+ /// var arm = new ArmClient(credential);
+ ///
+ /// EventHubsNamespaceResource eventHubsNamespace =
+ /// await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
+ /// .GetAsync();
+ ///
+ ///
+ ///
+ /// The client to read events from the test-managed Azure Event Hub.
+ /// The name of the specific Azure Event Hub to associate the consumer with.
+ /// The instance to log diagnostic information during the lifetime of the test fixture.
+ /// The function to manipulate the test fixture's lifetime behavior.
+ ///
+ /// Thrown when the or is null.
+ ///
+ /// Thrown when the is blank.
public static async Task CreateIfNotExistsAsync(
EventHubsNamespaceResource eventHubsNamespace,
EventHubConsumerClient consumerClient,
string eventHubName,
ILogger logger,
Action configureOptions)
+ {
+ return await CreateIfNotExistsAsync(eventHubsNamespace, consumerClient, consumerClientCreatedByUs: false, eventHubName, logger, configureOptions);
+ }
+
+ private static async Task CreateIfNotExistsAsync(
+ EventHubsNamespaceResource eventHubsNamespace,
+ EventHubConsumerClient consumerClient,
+ bool consumerClientCreatedByUs,
+ string eventHubName,
+ ILogger logger,
+ Action configureOptions)
{
ArgumentNullException.ThrowIfNull(eventHubsNamespace);
+ ArgumentNullException.ThrowIfNull(consumerClient);
logger ??= NullLogger.Instance;
if (string.IsNullOrWhiteSpace(eventHubName))
@@ -158,13 +250,13 @@ public static async Task CreateIfNotExistsAsync(
{
logger.LogDebug("[Test:Setup] Use already existing Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", eventHubName, eventHubsNamespace.Id.Name);
- return new TemporaryEventHub(eventHubsNamespace, consumerClient, eventHubName, createdByUs: false, logger);
+ return new TemporaryEventHub(eventHubsNamespace, consumerClient, consumerClientCreatedByUs, eventHubName, hubCreatedByUs: false, logger);
}
logger.LogDebug("[Test:Setup] Create new Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", eventHubName, eventHubsNamespace.Id.Name);
await eventHubs.CreateOrUpdateAsync(WaitUntil.Completed, eventHubName, options.OnSetup.EventHubData);
- return new TemporaryEventHub(eventHubsNamespace, consumerClient, eventHubName, createdByUs: true, logger);
+ return new TemporaryEventHub(eventHubsNamespace, consumerClient, consumerClientCreatedByUs, eventHubName, hubCreatedByUs: true, logger);
}
///
@@ -175,7 +267,7 @@ public async ValueTask DisposeAsync()
{
await using var disposables = new DisposableCollection(_logger);
- if (_createdByUs)
+ if (_hubCreatedByUs)
{
disposables.Add(AsyncDisposable.Create(async () =>
{
@@ -191,7 +283,12 @@ await _eventHubsNamespace.GetEventHubs()
}));
}
- disposables.Add(_consumerClient);
+ if (_consumerClientCreatedByUs)
+ {
+ disposables.Add(_consumerClient);
+ }
+
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs b/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
index 44abe4fc..39232e35 100644
--- a/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/Configuration/EventHubsConfig.cs
@@ -1,8 +1,12 @@
-using Azure.Core;
+using System;
+using Azure.Core;
using Azure.ResourceManager.EventHubs;
namespace Arcus.Testing.Tests.Integration.Messaging.Configuration
{
+ ///
+ /// Represents a test configuration subsection where all Azure Event Hubs-related configuration is stored.
+ ///
public class EventHubsConfig
{
///
@@ -10,14 +14,24 @@ public class EventHubsConfig
///
public EventHubsConfig(ResourceIdentifier namespaceResourceId)
{
+ ArgumentNullException.ThrowIfNull(namespaceResourceId);
NamespaceResourceId = namespaceResourceId;
}
+ ///
+ /// Gets the resource ID of the configured Azure Event Hubs namespace.
+ ///
public ResourceIdentifier NamespaceResourceId { get; }
}
+ ///
+ /// Extensions on the to make retrieval more test-friendly.
+ ///
public static class TestConfigExtensions
{
+ ///
+ /// Loads the configuration subsection from the current test .
+ ///
public static EventHubsConfig GetEventHubs(this TestConfig config)
{
var resourceId = EventHubsNamespaceResource.CreateResourceIdentifier(
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs b/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
index 0b2457e4..861fe7fd 100644
--- a/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/Fixture/EventHubsTestContext.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using Arcus.Testing.Tests.Integration.Configuration;
@@ -7,6 +8,7 @@
using Azure;
using Azure.Identity;
using Azure.Messaging.EventHubs;
+using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Producer;
using Azure.ResourceManager;
using Azure.ResourceManager.EventHubs;
@@ -17,6 +19,9 @@
namespace Arcus.Testing.Tests.Integration.Messaging.Fixture
{
+ ///
+ /// Represents a test-friendly interaction with Azure EventHubs.
+ ///
public class EventHubsTestContext : IAsyncDisposable
{
private readonly TemporaryManagedIdentityConnection _connection;
@@ -36,6 +41,9 @@ private EventHubsTestContext(
_logger = logger;
}
+ ///
+ /// Creates an based on the current test .
+ ///
public static async Task GivenAsync(TestConfig config, ILogger logger)
{
ServicePrincipal servicePrincipal = config.GetServicePrincipal();
@@ -52,6 +60,10 @@ await arm.GetEventHubsNamespaceResource(eventHubs.NamespaceResourceId)
return new EventHubsTestContext(connection, resource, logger);
}
+ ///
+ /// Sets up an existing Azure Event Hub.
+ ///
+ /// The name of the newly set up Azure Event Hub.
public async Task WhenHubAvailableAsync()
{
string eventHubName = WhenHubNonAvailable();
@@ -61,6 +73,7 @@ public async Task WhenHubAvailableAsync()
var eventHubs = _namespace.GetEventHubs();
await eventHubs.CreateOrUpdateAsync(WaitUntil.Completed, eventHubName, new EventHubData
{
+ PartitionCount = 1,
RetentionDescription = new RetentionDescription
{
CleanupPolicy = CleanupPolicyRetentionDescription.Delete,
@@ -71,6 +84,10 @@ public async Task WhenHubAvailableAsync()
return eventHubName;
}
+ ///
+ /// Sets up a non-existing Azure Event Hub.
+ ///
+ /// The name of the non-existing Azure Event Hub.
public string WhenHubNonAvailable()
{
string eventHubName = $"hub-{Bogus.Random.Guid()}";
@@ -79,6 +96,11 @@ public string WhenHubNonAvailable()
return eventHubName;
}
+ ///
+ /// Place an event on an Azure Event Hub.
+ ///
+ /// The name of the hub where to place the event.
+ /// The optional partition ID where specifically the event should be placed.
public async Task WhenEventAvailableOnHubAsync(string eventHubName, string partitionId = null)
{
var producerClient = new EventHubProducerClient(_namespace.Data.ServiceBusEndpoint, eventHubName, new DefaultAzureCredential());
@@ -87,21 +109,33 @@ public async Task WhenEventAvailableOnHubAsync(string eventHubName, s
{
MessageId = $"id-{Bogus.Random.Guid()}"
};
+
+ _logger.LogDebug("[Test:Setup] Send event '{MessageId}' on Azure EventHubs hub '{EventHubName}' in namespace '{Namespace}'", ev.MessageId, eventHubName, _namespace.Id.Name);
await producerClient.SendAsync([ev], new SendEventOptions() { PartitionId = partitionId });
return ev;
}
- public async Task ShouldHaveHubAsync(string evenHubName)
+ ///
+ /// Verifies that there exists an Azure Event Hub with the given in the currently configured namespace.
+ ///
+ public async Task ShouldHaveHubAsync(string eventHubName)
{
- Assert.True(await _namespace.GetEventHubs().ExistsAsync(evenHubName), $"Azure EventHubs hub '{evenHubName}' should be available on the namespace, but it isn't");
+ Assert.True(await _namespace.GetEventHubs().ExistsAsync(eventHubName), $"Azure EventHubs hub '{eventHubName}' should be available on the namespace, but it isn't");
}
+ ///
+ /// Verifies that there does not exist an Azure Event Hub with the given in the currently configured namespace.
+ ///
public async Task ShouldNotHaveHubAsync(string eventHubName)
{
Assert.False(await _namespace.GetEventHubs().ExistsAsync(eventHubName), $"Azure EventHubs hub '{eventHubName}' should not be available on the namespace, but it is");
}
+ ///
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
+ ///
+ /// A task that represents the asynchronous dispose operation.
public async ValueTask DisposeAsync()
{
await using var disposables = new DisposableCollection(_logger);
@@ -128,4 +162,19 @@ public async ValueTask DisposeAsync()
GC.SuppressFinalize(this);
}
}
+
+ ///
+ /// Extensions on the to make interaction more test-friendly.
+ ///
+ public static class EventHubEventFilterExtensions
+ {
+ ///
+ /// Verifies that the configured indeed only found a single event.
+ ///
+ public static async Task ShouldHaveSingleAsync(this EventHubEventFilter filter)
+ {
+ List events = await filter.ReadWith(opt => opt.MaximumWaitTime = TimeSpan.FromSeconds(10)).ToListAsync();
+ Assert.True(events.Count == 1, $"Azure EventHubs hub should have a single event available, but there were '{events.Count}' events");
+ }
+ }
}
diff --git a/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs b/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
index 4c8292f2..4378ad86 100644
--- a/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
+++ b/src/Arcus.Testing.Tests.Integration/Messaging/TemporaryEventHubTests.cs
@@ -2,6 +2,7 @@
using Arcus.Testing.Tests.Integration.Messaging.Configuration;
using Arcus.Testing.Tests.Integration.Messaging.Fixture;
using Azure.Messaging.EventHubs;
+using Azure.Messaging.EventHubs.Consumer;
using Azure.ResourceManager.EventHubs.Models;
using Xunit;
using Xunit.Abstractions;
@@ -30,6 +31,14 @@ public async Task CreateTempHub_WithNonExistingHub_SucceedsByCreatingHubDuringLi
// Assert
await eventHubs.ShouldHaveHubAsync(eventHubName);
+
+ string partitionId = "0";
+ EventData expected = await eventHubs.WhenEventAvailableOnHubAsync(eventHubName, partitionId);
+
+ await temp.Events.FromPartition(partitionId, EventPosition.Earliest)
+ .Where(ev => ev.Data.MessageId == expected.MessageId)
+ .ShouldHaveSingleAsync();
+
await temp.DisposeAsync();
await eventHubs.ShouldNotHaveHubAsync(eventHubName);
}
@@ -48,7 +57,9 @@ public async Task CreateTempHub_WithExistingHub_SucceedsByLeavingHubAfterLifetim
// Assert
await eventHubs.ShouldHaveHubAsync(eventHubName);
- Assert.Single(await temp.Events.Where(ev => ev.Data.MessageId == expected.MessageId).ToListAsync());
+
+ await temp.Events.Where(ev => ev.Data.MessageId == expected.MessageId)
+ .ShouldHaveSingleAsync();
await temp.DisposeAsync();
await eventHubs.ShouldHaveHubAsync(eventHubName);
@@ -59,10 +70,11 @@ private async Task CreateTempHubAsync(string eventHubName)
EventHubsConfig config = Configuration.GetEventHubs();
return await TemporaryEventHub.CreateIfNotExistsAsync(
- config.NamespaceResourceId, "$Default", eventHubName, Logger, options =>
+ config.NamespaceResourceId, consumerGroup: "$Default", eventHubName, Logger, options =>
{
options.OnSetup.CreateHubWith(hub =>
{
+ hub.PartitionCount = 1;
hub.RetentionDescription = new RetentionDescription
{
CleanupPolicy = CleanupPolicyRetentionDescription.Delete,
diff --git a/src/Arcus.Testing.Tests.Unit/Arcus.Testing.Tests.Unit.csproj b/src/Arcus.Testing.Tests.Unit/Arcus.Testing.Tests.Unit.csproj
index 4df70ff4..2fb6a991 100644
--- a/src/Arcus.Testing.Tests.Unit/Arcus.Testing.Tests.Unit.csproj
+++ b/src/Arcus.Testing.Tests.Unit/Arcus.Testing.Tests.Unit.csproj
@@ -36,6 +36,7 @@
+
diff --git a/src/Arcus.Testing.Tests.Unit/Messaging/TemporaryEventHubTests.cs b/src/Arcus.Testing.Tests.Unit/Messaging/TemporaryEventHubTests.cs
new file mode 100644
index 00000000..1570dd97
--- /dev/null
+++ b/src/Arcus.Testing.Tests.Unit/Messaging/TemporaryEventHubTests.cs
@@ -0,0 +1,67 @@
+using System;
+using System.Threading.Tasks;
+using Azure.Core;
+using Azure.Messaging.EventHubs.Consumer;
+using Azure.ResourceManager.EventHubs;
+using Bogus;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Moq;
+using Xunit;
+
+namespace Arcus.Testing.Tests.Unit.Messaging
+{
+ public class TemporaryEventHubTests
+ {
+ private static readonly Faker Bogus = new();
+
+ private static ResourceIdentifier NamespaceResourceId => ResourceIdentifier.Root;
+ private static EventHubsNamespaceResource NamespaceResource => Mock.Of();
+ private static string ConsumerGroup => Bogus.Lorem.Word();
+ private static EventHubConsumerClient ConsumerClient => Mock.Of();
+ private static string EventHubName => Bogus.Lorem.Word();
+ private static ILogger Logger => NullLogger.Instance;
+ private static Action ConfigureOptions => _ => { };
+
+ [Theory]
+ [ClassData(typeof(Blanks))]
+ public async Task CreateTempHub_WithoutEventHubName_Fails(string eventHubName)
+ {
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResourceId, ConsumerGroup, eventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResourceId, ConsumerGroup, eventHubName, Logger, ConfigureOptions));
+
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResource, ConsumerClient, eventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResource, ConsumerClient, eventHubName, Logger, ConfigureOptions));
+ }
+
+ [Theory]
+ [ClassData(typeof(Blanks))]
+ public async Task CreateTempHub_WithoutConsumerGroup_Fails(string consumerGroup)
+ {
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResourceId, consumerGroup, EventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResourceId, consumerGroup, EventHubName, Logger, ConfigureOptions));
+ }
+
+ [Fact]
+ public async Task CreateTempHub_WithoutNamespaceResourceId_Fails()
+ {
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(eventHubsNamespaceResourceId: null, ConsumerGroup, EventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(eventHubsNamespaceResourceId: null, ConsumerGroup, EventHubName, Logger, ConfigureOptions));
+
+ }
+
+ [Fact]
+ public async Task CreateTempHub_WithoutNamespaceResource_Fails()
+ {
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(eventHubsNamespace: null, ConsumerClient, EventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(eventHubsNamespace: null, ConsumerClient, EventHubName, Logger, ConfigureOptions));
+ }
+
+ [Fact]
+ public async Task CreateTempHub_WithoutConsumerClient_Fails()
+ {
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResource, consumerClient: null, EventHubName, Logger));
+ await Assert.ThrowsAnyAsync(() => TemporaryEventHub.CreateIfNotExistsAsync(NamespaceResource, consumerClient: null, EventHubName, Logger, ConfigureOptions));
+ }
+ }
+}
From a63635a5ea127dc5cb88f07cc908a70919b31d7c Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Fri, 31 Jan 2025 09:34:04 +0100
Subject: [PATCH 4/7] pr-fix: correct xml docs for param
---
src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
index b6b61292..50b1b8cd 100644
--- a/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
+++ b/src/Arcus.Testing.Messaging.EventHubs/TemporaryEventHub.cs
@@ -121,7 +121,8 @@ public static async Task CreateIfNotExistsAsync(
/// EventHubsNamespaceResource.CreateResourceIdentifier("<subscription-id>", "<resource-group>", "<namespace-name>");
///
///
- /// /// The name of the consumer group this consumer is associated with. Events are read in the context of this group.
+ ///
+ /// The name of the consumer group this consumer is associated with. Events are read in the context of this group.
/// The name of the specific Azure Event Hub to associate the consumer with.
/// The instance to log diagnostic information during the lifetime of the test fixture.
/// The function to manipulate the test fixture's lifetime behavior.
From a33c51956858162d5086b8a2d3e94dda2619133f Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Fri, 31 Jan 2025 10:11:57 +0100
Subject: [PATCH 5/7] Update appsettings.json
---
src/Arcus.Testing.Tests.Integration/appsettings.json | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/Arcus.Testing.Tests.Integration/appsettings.json b/src/Arcus.Testing.Tests.Integration/appsettings.json
index 600e822c..f944988c 100644
--- a/src/Arcus.Testing.Tests.Integration/appsettings.json
+++ b/src/Arcus.Testing.Tests.Integration/appsettings.json
@@ -16,6 +16,9 @@
"Name": "#{Arcus.Testing.StorageAccount.Name}#",
"Key": "#{Arcus.Testing.StorageAccount.Key}#"
},
+ "EventHubs": {
+ "Namespace": "#{Arcus.Testing.EventHubs.Namespace}#"
+ },
"Cosmos": {
"MongoDb": {
"Name": "#{Arcus.Testing.Cosmos.MongoDb.Name}#",
@@ -27,4 +30,4 @@
}
}
}
-}
\ No newline at end of file
+}
From a9e4772c7f853251c54a921c3cb0b77c443576f9 Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Fri, 31 Jan 2025 11:24:58 +0100
Subject: [PATCH 6/7] pr-add: event hubs resource code example in feature docs
---
.../03-Features/04-Azure/05-Messaging/02-eventhubs.md | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md b/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
index 5a65f323..cddf05a1 100644
--- a/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
+++ b/docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
@@ -23,7 +23,15 @@ await using var hub = await TemporaryEventHub.CreateIfNotExistsAsync(
eventHubsNamespaceResourceId, consumerGroup: "$Default", "", logger);
```
-> ⚡ Uses by default the [`DefaultAzureCredential`](https://learn.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential) but other type of authentication mechanisms are supported with overloads.
+> ⚡ Uses by default the [`DefaultAzureCredential`](https://learn.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential) but other type of authentication mechanisms are supported with overloads that take in the `EventHubsNamespaceResource` directly:
+> ```csharp
+> var credential = new DefaultAzureCredential();
+> var arm = new ArmClient(credential);
+>
+> EventHubsNamespaceResource eventHubsNamespace =
+> await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
+> .GetAsync();
+> ```
### Customization
The `TemporaryEventHub` allows testers to configure setup operations to manipulate the test fixture's behavior.
From 8cf60ef20cead56ab18bf563efb60307f77e4d17 Mon Sep 17 00:00:00 2001
From: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Fri, 21 Feb 2025 13:57:02 +0100
Subject: [PATCH 7/7] fix(.sln): re-add service-bus to /Messaging solution
folder
---
src/Arcus.Testing.sln | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Arcus.Testing.sln b/src/Arcus.Testing.sln
index 80bccfc4..972d44d0 100644
--- a/src/Arcus.Testing.sln
+++ b/src/Arcus.Testing.sln
@@ -130,7 +130,7 @@ Global
{1E771531-C584-474E-A338-A29D6493D600} = {FA2E21E0-953E-4B84-9C47-C4F0A3833E4E}
{2321112D-0971-4465-AF85-42BAFF265C1B} = {0848C446-1D7E-43E1-83A8-5C1A981E3C6F}
{2434BA10-0D39-4A86-9025-8E2439E08A8B} = {64013B91-7B1E-4F88-8FD7-CCF583D4A4CA}
- {4F199527-761E-4F8A-AB44-8DE8D030518F} = {4992B48D-3C17-45A9-979B-B79CE1E987CB}
+ {4F199527-761E-4F8A-AB44-8DE8D030518F} = {321D9EE7-3A59-49E5-91D2-68BE8631FD09}
{54A67E0F-270B-4979-9E72-C68EA8222F89} = {FA2E21E0-953E-4B84-9C47-C4F0A3833E4E}
{74AB9F6E-791F-4609-96BD-15420C25AA72} = {FA2E21E0-953E-4B84-9C47-C4F0A3833E4E}
{01449C8F-3400-488E-B6D3-561B78B15310} = {321D9EE7-3A59-49E5-91D2-68BE8631FD09}