Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add initial eventhubs test components #265

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions build/deploy-test-resources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ stages:
--parameters cosmosDb_mongoDb_databaseName=${{ variables['Arcus.Testing.Cosmos.MongoDb.DatabaseName'] }} `
--parameters cosmosDb_noSql_name=${{ variables['Arcus.Testing.Cosmos.NoSql.Name'] }} `
--parameters cosmosDb_noSql_databaseName=${{ variables['Arcus.Testing.Cosmos.NoSql.DatabaseName'] }} `
--parameters eventHubsNamespaceName=${{ variables['Arcus.Testing.EventHubs.Namespace'] }} `
--parameters serviceBusNamespaceName=${{ variables['Arcus.Testing.ServiceBus.Namespace'] }} `
--parameters keyVaultName=${{ parameters.keyVaultName }} `
--parameters servicePrincipal_objectId=$objectId
Expand Down
24 changes: 24 additions & 0 deletions build/templates/test-resources.bicep
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ param cosmosDb_noSql_name string
// Define the name of the CosmosDb NoSql database that will be created.
param cosmosDb_noSql_databaseName string

// Define the name of the Azure EventHubs namespace that will be created.
param eventHubsNamespaceName string

// Define the name of the Service bus namespace resource that will be created.
param serviceBusNamespaceName string

Expand Down Expand Up @@ -128,6 +131,27 @@ module cosmosDb_noSql 'br/public:avm/res/document-db/database-account:0.6.0' = {
}
}

module eventHubsNamespace 'br/public:avm/res/event-hub/namespace:0.9.1' = {
name: 'eventHubsNamespaceDeployment'
params: {
name: eventHubsNamespaceName
location: location
skuName: 'Basic'
skuCapacity: 1
publicNetworkAccess: 'Enabled'
roleAssignments: [
{
principalId: servicePrincipal_objectId
roleDefinitionIdOrName: 'Azure Event Hubs Data Sender'
}
{
principalId: servicePrincipal_objectId
roleDefinitionIdOrName: 'Azure Event Hubs Data Receiver'
}
]
}
}

module serviceBusNamespace 'br/public:avm/res/service-bus/namespace:0.10.1' = {
name: 'serviceBusNamespaceDeployment'
params: {
Expand Down
3 changes: 2 additions & 1 deletion build/variables/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ variables:
Arcus.Testing.Cosmos.MongoDb.DatabaseName: 'arcus-testing-cosmos-mongo-db'
Arcus.Testing.Cosmos.NoSql.Name: 'arcus-testing-cosmos-nosql'
Arcus.Testing.Cosmos.NoSql.DatabaseName: 'arcus-testing-cosmos-nosql-db'
Arcus.Testing.KeyVault.Name: 'arcus-testing-kv'
Arcus.Testing.EventHubs.Namespace: 'arcus-testing-eventhubs'
Arcus.Testing.ServiceBus.Namespace: 'arcus-testing-servicebus'
Arcus.Testing.KeyVault.Name: 'arcus-testing-kv'
Arcus.Testing.StorageAccount.Name: 'arcustestingstorage'
Arcus.Testing.StorageAccount.Key.SecretName: 'Arcus-Testing-StorageAccount-Key'
82 changes: 82 additions & 0 deletions docs/preview/03-Features/04-Azure/05-Messaging/02-eventhubs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Event Hubs
The `Arcus.Testing.Messaging.EventHubs` package provides test fixtures related to Azure Event Hubs. By using the common testing practice 'clean environment', it provides a temporary hub.

## Installation
The following functionality is available when installing this package:

```powershell
PM> Install-Package -Name Arcus.Testing.Messaging.EventHubs
```

## Temporary hub
The `TemporaryEventHub` provides a solution when the integration test requires an Azure Event Hub during the test run. A hub is created upon the upon the setup of the test fixture and is deleted again when the fixture is disposed.

> ✨ Only when the test fixture was responsible for creating the hub, will the hub be deleted upon the fixture's disposal. This follows the 'clean environment' testing principle that describes that after the test run, the same state should be achieved as before the test run.

```csharp
using Arcus.Testing;

ResourceIdentifier eventHubsNamespaceResourceId =
EventHubsNamespaceResource.CreateResourceIdentifier("<subscription-id", "<resource-group>", "<namespace-name>");

await using var hub = await TemporaryEventHub.CreateIfNotExistsAsync(
eventHubsNamespaceResourceId, consumerGroup: "$Default", "<event-hub-name>", logger);
```

> ⚡ Uses by default the [`DefaultAzureCredential`](https://learn.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential) but other type of authentication mechanisms are supported with overloads that take in the `EventHubsNamespaceResource` directly:
> ```csharp
> var credential = new DefaultAzureCredential();
> var arm = new ArmClient(credential);
>
> EventHubsNamespaceResource eventHubsNamespace =
> await arm.GetEventHubsNamespaceResource(eventHubNamespaceResourceId)
> .GetAsync();
> ```

### Customization
The `TemporaryEventHub` allows testers to configure setup operations to manipulate the test fixture's behavior.

```csharp
using Arcus.Testing;

await TemporaryEventHub.CreateIfNotExistsAsync(..., options =>
{
// Options related to when the test fixture is set up.
// ---------------------------------------------------

// Change the default hub-creation behavior.
options.OnSetup.CreateHubWith((EventHubData hub) =>
{
hub.PartitionCount = 4;
});
});
```

### Search for events
The `TemporaryEventHub` is equipped with an event filtering system that allows testers to search for events during the lifetime of the test fixture. This can be useful to verify the current state of a hub, or as a test assertion to verify EventHubs-related implementations.

```csharp
using Arcus.Testing;

await using TemporaryEventHub hub = ...

IEnumerable<PartitionEvent> events =
await hub.Events

// Get subset events currently on the hub.
.Where(ev => ev.Data.Properties.ContainsKey("<my-key"))
.Where(ev => ev.Data.ContentType == "application/json")

// Get events only from a single partition.
.FromPartition("<partition-id>", EventPosition.Earliest)

// Configures the read options that will be associated with the search operation.
.ReadWith((ReadEventOptions opt) =>
{
opt.MaximumWaitTime = TimeSpan.FromSeconds(10);
opt.OwnerLevel = 10;
})

// Start searching for events.
.ToListAsync();
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net8.0;net6.0</TargetFrameworks>
<RootNamespace>Arcus.Testing</RootNamespace>
<Authors>Arcus</Authors>
<Company>Arcus</Company>
<Description>Provides messaging capabilities for Azure EventHubs during Arcus testing</Description>
<Copyright>Copyright (c) Arcus</Copyright>
<PackageProjectUrl>https://github.com/arcus-azure/arcus.testing</PackageProjectUrl>
<RepositoryUrl>https://github.com/arcus-azure/arcus.testing</RepositoryUrl>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageIcon>icon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<RepositoryType>Git</RepositoryType>
<PackageTags>Azure;Testing;Messaging;EventHubs</PackageTags>
<PackageId>Arcus.Testing.Messaging.EventHubs</PackageId>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsNotAsErrors>$(WarningsNotAsErrors);NU1901;NU1902;NU1903;NU1904</WarningsNotAsErrors>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
<None Include="..\..\LICENSE" Pack="true" PackagePath="\" />
<None Include="..\..\docs\static\img\icon.png" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="[1.13.2,2.0.0)" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="[5.11.5,6.0.0)" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="[5.11.5,6.0.0)" />
<PackageReference Include="Azure.ResourceManager.EventHubs" Version="[1.1.0,2.0.0)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcus.Testing.Core\Arcus.Testing.Core.csproj" />
</ItemGroup>

</Project>
161 changes: 161 additions & 0 deletions src/Arcus.Testing.Messaging.EventHubs/EventHubEventFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

namespace Arcus.Testing
{
/// <summary>
/// Represents a configurable filter instance that selects a subset of <see cref="PartitionEvent"/>s on an Azure EventHubs hub
/// (a.k.a. 'spy test fixture').
/// </summary>
public class EventHubEventFilter
{
private readonly EventHubConsumerClient _client;
private readonly ReadEventOptions _options = new() { MaximumWaitTime = TimeSpan.FromMinutes(1) };
private readonly Collection<Func<PartitionEvent, bool>> _predicates = new();

private string _partitionId;
private EventPosition _startingPosition;

internal EventHubEventFilter(EventHubConsumerClient client)
{
ArgumentNullException.ThrowIfNull(client);
_client = client;
}

/// <summary>
/// Indicate that only events from a given <paramref name="partitionId"/> should be searched for.
/// </summary>
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
/// <param name="startingPosition">The position within the partition where the consumer should begin reading events.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="partitionId"/> is blank.</exception>
public EventHubEventFilter FromPartition(string partitionId, EventPosition startingPosition)
{
if (string.IsNullOrWhiteSpace(partitionId))
{
throw new ArgumentException("Requires a non-blank partition ID to search for events on an Azure Event Hub", nameof(partitionId));
}

_partitionId = partitionId;
_startingPosition = startingPosition;
return this;
}

/// <summary>
/// Adds a <paramref name="predicate"/> to which the searched for events should match against.
/// </summary>
/// <param name="predicate">The custom filter function to select a subset of events.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="predicate"/> is <c>null</c>.</exception>>
public EventHubEventFilter Where(Func<PartitionEvent, bool> predicate)
{
ArgumentNullException.ThrowIfNull(predicate);
_predicates.Add(predicate);

return this;
}

/// <summary>
/// <para>Configures the <see cref="ReadEventOptions"/> that will be associated with the event search operation.</para>
/// <para>Use for example the <see cref="ReadEventOptions.MaximumWaitTime"/> to shortcut the event searching early:</para>
/// <example>
/// <code>
/// .ReadWith(options =>
/// {
/// options.MaximumWaitTime = TimeSpan.FromSeconds(10);
/// })
/// </code>
/// </example>
/// <para>Or to change the <see cref="ReadEventOptions.OwnerLevel"/> when multiple event consumers are involved:</para>
/// <example>
/// <code>
/// .ReadWith(options =>
/// {
/// options.OwnerLevel = 10;
/// })
/// </code>
/// </example>
/// </summary>
/// <param name="configureOptions"></param>
/// <returns></returns>
public EventHubEventFilter ReadWith(Action<ReadEventOptions> configureOptions)
{
ArgumentNullException.ThrowIfNull(configureOptions);
configureOptions(_options);

return this;
}

/// <summary>
/// Gets the awaiter used to await the <see cref="ToListAsync()"/>.
/// </summary>
public TaskAwaiter<List<PartitionEvent>> GetAwaiter()
{
return ToListAsync().GetAwaiter();
}

/// <summary>
/// Determines whether the configured Azure Event Hub contains any matching events.
/// </summary>
/// <returns>
/// <see langword="true" /> if any events are found that matches the previously configured predicates; otherwise, <see langword="false" />.
/// </returns>
public async Task<bool> AnyAsync()
{
return await AnyAsync(CancellationToken.None);
}

/// <summary>
/// Determines whether the configured Azure Event Hub contains any matching events.
/// </summary>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
/// <returns>
/// <see langword="true" /> if any events are found that matches the previously configured predicates; otherwise, <see langword="false" />.
/// </returns>
public async Task<bool> AnyAsync(CancellationToken cancellationToken)
{
List<PartitionEvent> events = await ToListAsync(cancellationToken);
return events.Any();
}

/// <summary>
/// Collects all events currently matching on the configured Azure Event Hub into a <see cref="List{T}"/>.
/// </summary>
public async Task<List<PartitionEvent>> ToListAsync()
{
return await ToListAsync(CancellationToken.None);
}

/// <summary>
/// Collects all events currently matching on the configured Azure Event Hub into a <see cref="List{T}"/>.
/// </summary>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
public async Task<List<PartitionEvent>> ToListAsync(CancellationToken cancellationToken)
{
IAsyncEnumerable<PartitionEvent> reading =
_partitionId is null
? _client.ReadEventsAsync(_options, cancellationToken)
: _client.ReadEventsFromPartitionAsync(_partitionId, _startingPosition, _options, cancellationToken);

var events = new Collection<PartitionEvent>();
await foreach (PartitionEvent ev in reading)
{
if (ev.Data is null)
{
return events.ToList();
}

if (_predicates.All(predicate => predicate(ev)))
{
events.Add(ev);
}
}

return events.ToList();
}
}
}
Loading
Loading