At a high-level this represents the subscribing of events/messages from Azure Service Bus to meet the stated requirements of when an Employee is terminated that the Employee's User Account will be automatically deactivated within OKTA.
This article contains an end-to-end walkthrough to build the requisite Azure Function.
Note: as of
Beef
(v5.9.0
), and correspondingCoreEx
(v3.9.0
), isolated Azure Function support was added. This documentation and example have been updated accordingly.
A new Security domain should be implemented within a new .NET Solution, independent of the Hr domain. It is conceivable that this new domain could contain its own APIs and data repository, etc. Maybe even leverage Beef ;-)
However, for the purposes of this sample, a new domain will not be implemented. The requisite functionality will be developed within the existing MyEf.Hr
solution, within a new MyEf.Hr.Security
namespace for basic separation and simplicity. (Disclaimer: obviously, this is not the recommended approach where implementing proper).
It is intended that the Security domain subscribing capabilities are also hosted in Azure, as such the architectural decision has been made to develop leveraging an Azure Function.
This will enable the subscriber function to essentially run in the background, and we will also be able to leverage the Azure Service Bus trigger to invoke the requisite logic per event/message received.
It is assumed for the purposes of this sample that the developer has at least some basic knowledge of Azure Functions, and has likely developed one in the past.
The CoreEx.Events
capabilities enables messaging subsystem agnostic subscribing functionality; of interest are the following.
Class | Description |
---|---|
EventSubscriberBase |
Provides the messaging platform host agnostic base functionality, such as the IErrorHandling configuration, being the corresponding ErrorHandling action per error type. Also encapsulates the underlying message to EventData deserialization and associated error handling. |
EventSubscriberOrchestrator |
Enables none or more subscribers (IEventSubscriber ) to be added, which are dynamically invoked at runtime when their EventSubscriberAttribute configuration matches the received EventData message content. For more information see CoreEx orchestrated subscribing. |
The CoreEx.Azure.ServiceBus
capabilities will be leveraged to manage the underlying processing once triggered. This has the advantage of enabling standardized, consistent, and tested, functionality to further industrialize event/messaging subscription services.
There are two Azure Service Bus subscribing capabilities that both inherit from the aforementioned EventSubscriberBase
. These each also leverage the ServiceBusSubscriberInvoker
that ensures consistency of logging, exception handling, associated ServiceBusMessageActions
management to perform the corresponding CompleteMessageAsync
or DeadLetterMessageAsync
, and finally message bubbling to enable retries where the error is considered transient in nature.
Class | Description |
---|---|
ServiceBusSubscriber |
Provides the standard ServiceBusReceivedMessage subscribe (receive) execution encapsulation to run the underlying function logic in a consistent manner. This is generally used where the EventData content is consistent; i.e. a single message type. |
ServiceBusOrchestratedSubscriber |
Provides the EventSubscriberOrchestrator -managed ServiceBusReceivedMessage subscribe (receive) execution encapsulation to run the underlying function logic in a consistent manner. For the most part, this is the preferred, most flexible option. |
This sample will leverage the ServiceBusOrchestratedSubscriber
to subscribe to the terminated events; whilst ignoring all other events. This has the advantage, that other events can be subscribed to over time by leveraging the same underlying Azure Function, whilst ensuring processing in order (where applicable).
For the most part, where multiple domains exists, then topics and subscriptions should be leveraged; as each consuming domain can process their own subscription (or subscriptions) independently of others. Additionally, the subscriptions capability enables filters thats can be used to select specific messages in advance of invoking the function code.
This sample has used a basic queue for simplicity.
From Visual Studio, add a new Project named MyEf.Hr.Security.Subscriptions
(within the existing MyEf.Hr
solution) leveraging the Azure Functions project template.
On the additional information page of the wizard, enter the following, then Create for the new SecuritySubscriberFunction
function.
Property | Value |
---|---|
Functions worker | .NET 8.0 Isolated (Long Term Support) |
Function | Service Bus Queue Trigger |
Connection string setting name | ServiceBusConnectionString |
Queue name | %ServiceBusQueueName% |
Then complete the following house cleaning tasks within the newly created project.
Update project dependencies as follows.
- Add the
CoreEx.Azure
,CoreEx.Validation
andMicrosoft.Extensions.Http.Polly
NuGet packages as dependencies. - Add
MyHr.Ef.Common
as a project reference dependency (within a real implemenation the*.Common
assemblies should be published as internal packages for reuse across domains; that is largely their purpose).
Open the host.json
file and replace with the contents from host.json
.
The bindings configuration documentation will assist with understanding and further configuring.
Open the local.settings.json
file and replace Values
JSON with the following; including your actual connection string secret.
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"ServiceBusConnectionString": "get-your-secret-and-paste-here",
"ServiceBusQueueName": "event-stream"
}
}
Create a new GlobalUsings.cs
file, then copy in the contents from GlobalUsings
replacing existing.
Open the Program.cs
file and replace with the contents from Program.cs
. This is essentially the same as the default; however, a Startup
class is now referenced using ConfigureHostStartup
. This is a CoreEx.Hosting
extension method that will enable the requisite Dependency Injection to also be used within unit testing.
Dependency Injection needs to be added, which is also accessible from unit tests. To enable, create a new Startup.cs
file, then copy in the contents from Startup
. For the most part the required CoreEx
services are being registered.
The Service Bus subscribing requires the following additional services registered.
Service | Description |
---|---|
EventSubscriberOrchestrator |
The AddEventSubscriberOrchestrator() is used to register the EventSubscriberOrchestrator as a singleton service. The delegate enables the opportunity to further configure options.Setting NotSubscribedHandling = ErrorHandling.CompleteAsSilent indicates that any messages not subscribed should be completed silently without any corresponding logging. Essentially, skipping any unsubscribed messages. The AddSubscribers(EventSubscriberOrchestrator.GetSubscribers<Startup>() adds all the IEventSubscriber types defined within the specified assembly. The are also additional methods to add specific subscribers manually where applicable. |
ServiceBusOrchestratedSubscriber |
The AddEventSubscriberOrchestrator() is used to register the ServiceBusOrchestratedSubscriber as a scoped service. The delegate enables the opportunity to further configure options.Similar to above, the likes of EventDataDeserializationErrorHandling and other IErrorHandling properties can be overridden to define the default error handling behaviours. |
OktaHttpClient |
The AddTypedHttpClient() is a standard .NET method to register a typed-HttpClient , in this case an OktaHttpClient . A name is also specified so it can be uniquely referenced; this is useful for the likes of unit testing. |
The aforementioned services registration code of interest is as follows.
.AddEventSubscriberOrchestrator((_, o) =>
{
o.NotSubscribedHandling = ErrorHandling.CompleteAsSilent;
o.AddSubscribers(EventSubscriberOrchestrator.GetSubscribers<Startup>());
})
.AddAzureServiceBusOrchestratedSubscriber((_, o) =>
{
o.EventDataDeserializationErrorHandling = ErrorHandling.HandleBySubscriber;
})
.AddTypedHttpClient<OktaHttpClient>("OktaApi", (sp, hc) =>
{
var settings = sp.GetRequiredService<SecuritySettings>();
hc.BaseAddress = new Uri(settings.OktaHttpClientBaseUri);
}).AddTransientHttpErrorPolicy(policy => policy.WaitAndRetryAsync(retryCount: 3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));
This represents the Azure Service Bus trigger subscription entry point; i.e. what is the registered function logic to be executed by the Azure Function runtime fabric. This requires the use of Dependency Injection to access the registered ServiceBusOrchestratedSubscriber
to orchestrate the underlying subscribers.
The function method signature must include the ServiceBusTrigger attribute to specify the queue or topic/subscription related properties, sessions support, as well as the Service Bus connection string name. Finally, for the ServiceBusOrchestratedSubscriber
to function correctly the ServiceBusReceivedMessage
and ServiceBusMessageActions
parameters must be specified and passed into the ServiceBusOrchestratedSubscriber.ReceiveAsync
.
Note: do not under any circumstances use
AutoCompleteMessages
as completion is managed internally.
Rename the Function1.cs
to SecuritySubscriberFunction.cs
where this did not occur correctly. Copy and replace the contents from the following.
namespace MyEf.Hr.Security.Subscriptions;
public class SecuritySubscriberFunction(ServiceBusOrchestratedSubscriber subscriber)
{
private readonly ServiceBusOrchestratedSubscriber _subscriber = subscriber.ThrowIfNull();
[Function(nameof(SecuritySubscriberFunction))]
public Task RunAsync([ServiceBusTrigger("%ServiceBusQueueName%", Connection = "ServiceBusConnectionString")] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
=> _subscriber.ReceiveAsync(message, messageActions, null, cancellationToken);
}
Create a new SecuritySettings.cs
file, then copy in the following contents. This is similar to the HrSettings
created earlier; however, the properties are specific to this domain. The key property is the OktaHttpClientBaseUri
to support the OKTA API endpoint specification.
namespace MyEf.Hr.Security.Subscriptions;
/// <summary>
/// Provides the <see cref="IConfiguration"/> settings.
/// </summary>
public class SecuritySettings : SettingsBase
{
/// <summary>
/// Gets the setting prefixes in order of precedence.
/// </summary>
public static string[] Prefixes { get; } = { "Security/", "Common/" };
/// <summary>
/// Initializes a new instance of the <see cref="SecuritySettings"/> class.
/// </summary>
/// <param name="configuration">The <see cref="IConfiguration"/>.</param>
public SecuritySettings(IConfiguration configuration) : base(configuration, Prefixes) => ValidationArgs.DefaultUseJsonNames = true;
/// <summary>
/// Gets the OKTA API base URI.
/// </summary>
public string OktaHttpClientBaseUri => GetRequiredValue<string>("OktaHttpClient__BaseUri");
}
Create a new corresponding appsettings.json
file, then copy in the following contents. Update the file properties; set Build Action to Content, and Copy to Output Directory to Copy if newer. The '*' within denotes that the configuration settings are accessed internally by CoreEx at runtime and therefore do not need to be specifically defined as SecuritySettings
properties.
Setting | Description |
---|---|
OktaHttpClient:BaseUri |
The base Uri for the external OKTA API. |
ServiceBusOrchestratedSubscriber.AbandonOnTransient * |
Indicates that the message should be explicitly abandoned where transient error occurs. |
ServiceBusOrchestratedSubscriber.RetryDelay * |
The timespan to delay (multiplied by delivery count) after each transient error is encountered; continues to lock message. |
{
"OktaHttpClient": {
"BaseUri": "https://dev-1234.okta.com"
},
"ServiceBusOrchestratedSubscriber": {
"AbandonOnTransient": true,
"RetryDelay": "00:00:30"
}
}
The external OKTA API must be accessed using an HttpClient
. Create a new OktaHttpClient.cs
file, then copy in the following contents. This inherits from the CoreEx TypedHttpClientBase<TSelf>
that encapsulates an underlying HttpClient
and adds extended fluent-style method-chaining capabilities and supporting SendAsync
logic. Also, of note is the usage of Result
which will enable simplified chaining and error handling.
The advantage of a typed client such as this, is that it can encapsulate all of the appropriate behavior, making it easier to understand and test. The constructor is using the injected SecuritySettings
to set the underlying HttpClient.BaseAddress
. Additionally, other DefaultOptions
can be set to ensure consistent behaviour of the underlying request/response; these can also be overridden per request where applicable.
For the purposes of deactivating a user the following OKTA capabilities are exposed:
Method | Description |
---|---|
GetIdentifier |
The underlying OKTA identifier is not persisted as part of the Employee data, only the related Email . This method will perform a search on the email and return the corresponding identifier where found. Also checks that the user is in a status that allows deactivation. See OKTA search user API documentation. |
DeactivateUser |
Deactivates the user by using the passed OKTA identifier. See OKTA deactivate user API documentation |
namespace MyEf.Hr.Security.Subscriptions;
public class OktaHttpClient : TypedHttpClientBase<OktaHttpClient>
{
public OktaHttpClient(HttpClient client) : base(client) => DefaultOptions.EnsureSuccess().ThrowKnownException();
/// <summary>
/// Gets the identifier for the email (see <see href="https://developer.okta.com/docs/reference/api/users/#list-users-with-search"/>).
/// </summary>
public async Task<Result<OktaUser>> GetUserAsync(Guid id, string email)
=> Result.GoFrom(await GetAsync<List<OktaUser>>($"/api/v1/users?search=profile.email eq \"{email}\"").ConfigureAwait(false))
.ThenAs(coll => coll.Count switch
{
0 => Result.NotFoundError($"Employee {id} with email {email} not found within OKTA."),
1 => Result.Ok(coll[0]),
_ => Result.NotFoundError($"Employee {id} with email {email} has multiple entries within OKTA.")
});
/// <summary>
/// Deactivates the specified user (<see href="https://developer.okta.com/docs/reference/api/users/#deactivate-user"/>)
/// </summary>
public async Task<Result> DeactivateUserAsync(string id) => Result.GoFrom(await PostAsync($"/api/v1/users/{id}/lifecycle/deactivate?sendEmail=true").ConfigureAwait(false));
/// <summary>
/// The basic OKTA user properties (see <see href="https://developer.okta.com/docs/reference/api/users/#user-object"/>)
/// </summary>
public class OktaUser
{
private static readonly string[] _statuses = ["STAGED", "PROVISIONED", "ACTIVE", "RECOVERY", "LOCKED_OUT", "PASSWORD_EXPIRED", "SUSPENDED"];
public string? Id { get; set; }
public string? Status { get; set; }
public bool IsDeactivatable => _statuses.Contains(Status, StringComparer.OrdinalIgnoreCase);
}
}
As discussed earlier a subscriber must implement IEventSubscriber
. To further enable the SubscriberBase
and SubscriberBase<TValue>
abstract base classes enable the requisite functionality. These provide overridable IErrorHandling
configuration, being the corresponding ErrorHandling
action per error type so that specific error handling can be defined per subscriber where needed explicitly.
Where using the SubscriberBase<TValue>
a ValueValidator
can be specified to automatically validate the value before the underlying logic is invoked. There is a ValueIsRequired
property to control whether the value is required; defaults to true
.
The underlying logic is implemented by overridding the applicable ReceiveAsync
method. The ServiceBusReceivedMessage
will have already been converted/deserialized to the appropriate EventData
or EventData<TValue>
depending.
To subscribe to the Employee terminated event raised from the MyEf.Hr
domain the following metdata will be used to uniquely match. These values will be declared using the EventSubscriberAttribute
for the subscriber.
Property | Value |
---|---|
EventData.Subject |
MyEf.Hr.Employee |
EventData.Action |
Terminated |
The required logic is as follows:
- The
EventData.Value
must be validated to ensureId
,Email
andTermination
data is present within the event/message data payload. A correspondingIValidator<T>
is required to enable. - Where any security-related error is returned from OKTA treat as transient and attempt an event/message retry (where possible).
- Where the data is not found then complete (versus dead letter) the event/message and write a warning to the log; the assumption being there is nothing that can be done where a corresponding user does not exist in OKTA.
- The business functionality, is to find the user by their email within OKTA, and where found, deactivate (in OKTA).
To implement, create a new Subscribers
folder witin the MyEf.Hr.Security.Subscriptions
project. Create a new EmployeeTerminatedSubcriber.cs
file, then copy in the following contents.
namespace MyEf.Hr.Security.Subscriptions.Subscribers;
[EventSubscriber("MyEf.Hr.Employee", "Terminated")]
public class EmployeeTerminatedSubcriber(OktaHttpClient okta, ILogger<EmployeeTerminatedSubcriber> logger) : SubscriberBase<Employee>(_employeeValidator)
{
private static readonly Validator<Employee> _employeeValidator = Validator.Create<Employee>()
.HasProperty(x => x.Id, p => p.Mandatory())
.HasProperty(x => x.Email, p => p.Mandatory().Email())
.HasProperty(x => x.Termination, p => p.Mandatory());
private readonly OktaHttpClient _okta = okta.ThrowIfNull();
private readonly ILogger _logger = logger.ThrowIfNull();
public override ErrorHandling SecurityHandling => ErrorHandling.Retry;
public override ErrorHandling NotFoundHandling => ErrorHandling.CompleteWithWarning;
public override Task<Result> ReceiveAsync(EventData<Employee> @event, EventSubscriberArgs args, CancellationToken cancellationToken)
=> Result.GoAsync(_okta.GetUserAsync(@event.Value.Id, @event.Value.Email!))
.When(user => !user.IsDeactivatable, user => _logger.LogWarning("Employee {EmployeeId} with email {Email} has User status of {UserStatus} and is therefore unable to be deactivated.", @event.Value.Id, @event.Value.Email, user.Status))
.WhenAsAsync(user => user.IsDeactivatable, user => _okta.DeactivateUserAsync(user.Id!));
}
There is generally no specific provision for the unit testing of Azure Functions, and related Azure Service Bus trigger, as it requires a dependent messaging subsystem, being Azure Service Bus. Also, at time of writing, there is not a standard means of hosting the Azure Function in process to verify in a unit testing context; especially where the likes of Dependency Injection (DI) is being leveraged.
However, given the complexity of logic that typically resides within there should be a strong desire to verfiy via unit tests. To enable Avanade has created UnitTestEx which supports the unit testing of Service Bus-trigger Azure Functions. This capability assumes that Dependency Injection (DI) is being leveraged, and will create the underlying host and enable a ServiceBusReceivedMessage
to be sent simulating the Azure Function runtime capability.
As the implemented subscriber is invoking OKTA, UnitTestEx also easily enables the mocking of HTTP requests/responses to verify both success and failure scenarios.
From Visual Studio, add a new Project named MyEf.Hr.Security.Test
(within the existing MyEf.Hr
solution) leveraging the NUnit Test Project project template.
Make the following house cleaning changes to the new project:
- Add the
UnitTestEx.NUnit
,CoreEx.UnitTesting.Azure.Functions
andCoreEx.UnitTesting.Azure.ServiceBus
NuGet packages as dependencies. - Add
MyHr.Ef.Security.Subscriptions
as a project reference dependency. - Rename
Usings.cs
toGlobalUsings.cs
and replace with content fromGlobalUsings
.
This test is to verfiy the SecuritySubscriberFunction
capabilities that are non underlying subscriber specific. The following will be tested.
- Verify that an invalid event/message is dead lettered.
- Verify that an non-subscriber event/message is completed silently.
Create a new SecuritySubscriberFunctionTest.cs
file, then replace with content from SecuritySubscriberFunctionTest
.
Review and execute the tests and ensure they all pass as expected.
This will be used by the upcoming subscriber test as an OKTA response, leverages an embedded resource as it is easier to maintain as a JSON file, than within the c# test code directly.
Create a new folder named Responses
, and add a new EmployeeTerminatedSubscriberTest_Success.json
file, then replace with content from EmployeeTerminatedSubscriberTest_Success
. Go to the file properties and set Build Action to Embedded Resource.
This test is to verify the EmployeeTerminatedSubscriber
capabilities, including the mocking of the underlying HTTP invocations to verify both success and failure scenarios.
To implement, create a new Subscribers
folder witin the MyEf.Hr.Security.Test
project. Create a new EmployeeTerminatedSubscriberTest.cs
file, then replace with content from EmployeeTerminatedSubscriberTest
.
Review and execute the tests and ensure they all pass as expected.
To achieve a basic test within a developers machine then the Function should be started using the emulator. Where corresponding events/messages exist within the Azure Service Bus they should start to be received and processed.
Note: that without an actual OKTA development account the OKTA endpoints cannot be consumed directly; i.e. they will always fail. For now, this is good enough to prove potential connectivity.
Where running locally ensure that the Azure Storage Emulator Azurite is installed and running; otherwise, errors will occur as the SingletonAttribute
added to the function requires access to storage to enable.
As Cloud Events are used as the serialization format, any events added manually will need to be in this format, example as follows:
{
"specversion": "1.0",
"id": "49d1b3f2-44f3-4ff2-9d75-847662a233e3",
"time": "2023-04-21T17:16:57.4014016Z",
"type": "myef.hr.common.entities.employee",
"source": "test",
"subject": "myef.hr.employee",
"action": "terminated",
"correlationid": "49d1b3f2-44f3-4ff2-9d75-847662a233e3",
"datacontenttype": "application/json",
"data": {
"id": "00000001-0000-0000-0000-000000000000",
"email": "[email protected]",
"termination": {}
}
}
Now that all the moving parts have been developed and configured an end-to-end integration test can be performed. This is initiated by invoking the HR domain-based APIs that result in an Azure Service Bus publish and subcribe.
The new Security domain that performs a Service Bus Subscribe of the Termination related events and proxies Okta (as our identity solution) automatically Deactivating the Employee's account is complete.
Next we will wrap up the sample - we are done - BOOM!