Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Nov 13, 2023
1 parent a66fb59 commit 170aee3
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 39 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/preview.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ on:
jobs:
nuget:
runs-on: self-hosted
# container:
# image: mcr.microsoft.com/dotnet/sdk:8.0
env:
DOTNET_INSTALL_DIR: ~/.dotnet

Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ on:

jobs:
nuget:
runs-on: ubuntu-latest
runs-on: self-hosted
env:
DOTNET_INSTALL_DIR: ~/.dotnet

steps:
- uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static class StoreFunctions {
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Append events result</returns>
/// <exception cref="Exception">Any exception that occurred in the event store</exception>
/// <exception cref="OptimisticConcurrencyException">Gets thrown if the expected stream version mismatches with <see cref="originalVersion"/></exception>
/// <exception cref="OptimisticConcurrencyException">Gets thrown if the expected stream version mismatches with the given original stream version</exception>
public static async Task<AppendEventsResult> Store(
this IEventWriter eventWriter,
StreamName streamName,
Expand Down Expand Up @@ -65,7 +65,7 @@ StreamEvent ToStreamEvent(object evt, int position) {
/// <param name="cancellationToken">Cancellation token</param>
/// <typeparam name="T">Aggregate type</typeparam>
/// <returns>Append event result</returns>
/// <exception cref="OptimisticConcurrencyException{T}"></exception>
/// <exception cref="OptimisticConcurrencyException{T}">Gets thrown if the expected stream version mismatches with the given original stream version</exception>
public static async Task<AppendEventsResult> Store<T>(
this IEventWriter eventWriter,
StreamName streamName,
Expand Down
18 changes: 16 additions & 2 deletions src/Core/src/Eventuous.Producers/RegistrationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ namespace Microsoft.Extensions.DependencyInjection;

[PublicAPI]
public static class RegistrationExtensions {
[Obsolete("Use AddProducer instead")]
public static void AddEventProducer<T>(this IServiceCollection services, T producer) where T : class, IEventProducer {
services.AddProducer(producer);
}

public static void AddProducer<T>(this IServiceCollection services, T producer) where T : class, IEventProducer {
services.TryAddSingleton(producer);
services.TryAddSingleton<IEventProducer>(sp => sp.GetRequiredService<T>());

Expand All @@ -20,13 +25,22 @@ public static void AddEventProducer<T>(this IServiceCollection services, T produ
}
}

public static void AddEventProducer<T>(this IServiceCollection services, Func<IServiceProvider, T> getProducer)
where T : class, IEventProducer {
[Obsolete("Use AddProducer instead")]
public static void AddEventProducer<T>(this IServiceCollection services, Func<IServiceProvider, T> getProducer) where T : class, IEventProducer {
services.AddProducer(getProducer);
}

public static void AddProducer<T>(this IServiceCollection services, Func<IServiceProvider, T> getProducer) where T : class, IEventProducer {
services.TryAddSingleton(getProducer);
AddCommon<T>(services);
}

[Obsolete("Use AddProducer instead")]
public static void AddEventProducer<T>(this IServiceCollection services) where T : class, IEventProducer {
services.AddProducer<T>();
}

public static void AddProducer<T>(this IServiceCollection services) where T : class, IEventProducer {
services.TryAddSingleton<T>();
AddCommon<T>(services);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@ public partial class MongoOperationBuilder<TEvent, T>
public InsertManyBuilder InsertMany => new();
public DeleteOneBuilder DeleteOne => new();
public DeleteManyBuilder DeleteMany => new();
public BulkWriteBuilder Bulk => new();
public BulkWriteBuilder Bulk => new();

public class MongoBulkOperationBuilders {
MongoBulkOperationBuilders() {}
MongoBulkOperationBuilders() { }
internal static MongoBulkOperationBuilders Instance { get; } = new();
// ReSharper disable once MemberCanBeMadeStatic.Global
public UpdateOneBuilder UpdateOne => new();
public UpdateOneBuilder UpdateOne => new();
// ReSharper disable once MemberCanBeMadeStatic.Global
public UpdateManyBuilder UpdateMany => new();
public UpdateManyBuilder UpdateMany => new();
// ReSharper disable once MemberCanBeMadeStatic.Global
public InsertOneBuilder InsertOne => new();
public InsertOneBuilder InsertOne => new();
// ReSharper disable once MemberCanBeMadeStatic.Global
public DeleteOneBuilder DeleteOne => new();
public DeleteOneBuilder DeleteOne => new();
// ReSharper disable once MemberCanBeMadeStatic.Global
public DeleteManyBuilder DeleteMany => new();
public DeleteManyBuilder DeleteMany => new();
}

public interface IMongoProjectorBuilder {
ProjectTypedEvent<T, TEvent> Build();
}

public interface IMongoBulkBuilderFactory {
BuildWriteModel<T, TEvent> GetBuilder();
}
Expand All @@ -46,7 +46,7 @@ public class FilterBuilder {
public Func<IMessageConsumeContext<TEvent>, FilterDefinition<T>> GetFilter => Ensure.NotNull(_filterFunc, "Filter function");

public void Filter(BuildFilter<TEvent, T> buildFilter)
=> _filterFunc = evt => buildFilter(evt, Builders<T>.Filter);
=> _filterFunc = evt => buildFilter(evt, Builders<T>.Filter);

public void Filter(Func<IMessageConsumeContext<TEvent>, T, bool> filter)
=> _filterFunc = evt => new ExpressionFilterDefinition<T>(x => filter(evt, x));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public BulkWriteBuilder Configure(Action<BulkWriteOptions> configure) {
ProjectTypedEvent<T, TEvent> IMongoProjectorBuilder.Build() =>
GetHandler(async (ctx, collection, token) => {
var options = Options<BulkWriteOptions>.New(_configureOptions);
var models = await _builders.Select(build => build(ctx)).WhenAll();
await collection.BulkWriteAsync(models, options, token);
var models = await _builders.Select(build => build(ctx)).WhenAll().NoContext();
await collection.BulkWriteAsync(models, options, token).NoContext();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,20 @@ public UpdateOneBuilder DefaultId()
ProjectTypedEvent<T, TEvent> IMongoProjectorBuilder.Build()
=> GetHandler(
async (ctx, collection, token) => {
var (update, options) = await GetUpdateWithOptions(ctx);
var (update, options) = await GetUpdateWithOptions(ctx).NoContext();
var filter = FilterBuilder.GetFilter(ctx);
// TODO: Make this an option (idempotence based on commit position)
// var filter = Builders<T>.Filter.And(
// Builders<T>.Filter.Lt(x => x.Position, ctx.GlobalPosition),
// FilterBuilder.GetFilter(ctx)
// );

await collection
.UpdateOneAsync(
filter,
update,
options,
token
);
await collection.UpdateOneAsync(filter, update, options, token).NoContext();
}
);

BuildWriteModel<T, TEvent> IMongoBulkBuilderFactory.GetBuilder() => async ctx => {
var (update, options) = await GetUpdateWithOptions(ctx);
var (update, options) = await GetUpdateWithOptions(ctx).NoContext();

return new UpdateOneModel<T>(FilterBuilder.GetFilter(ctx), update) {
Collation = options.Collation,
Expand All @@ -59,20 +53,14 @@ public class UpdateManyBuilder : UpdateBuilder<UpdateManyBuilder>, IMongoProject
ProjectTypedEvent<T, TEvent> IMongoProjectorBuilder.Build()
=> GetHandler(
async (ctx, collection, token) => {
var (update, options) = await GetUpdateWithOptions(ctx);

await collection.UpdateManyAsync(
FilterBuilder.GetFilter(ctx),
update,
options,
token
)
.NoContext();
var (update, options) = await GetUpdateWithOptions(ctx).NoContext();

await collection.UpdateManyAsync(FilterBuilder.GetFilter(ctx), update, options, token).NoContext();
}
);

BuildWriteModel<T, TEvent> IMongoBulkBuilderFactory.GetBuilder() => async ctx => {
var (update, options) = await GetUpdateWithOptions(ctx);
var (update, options) = await GetUpdateWithOptions(ctx).NoContext();

return new UpdateManyModel<T>(FilterBuilder.GetFilter(ctx), update) {
Collation = options.Collation,
Expand Down

0 comments on commit 170aee3

Please sign in to comment.