diff --git a/charts/docker-compose.yml b/charts/docker-compose.yml new file mode 100644 index 00000000..f7ba9166 --- /dev/null +++ b/charts/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3.4" + +services: + read.eventstore.db: + image: eventstore/eventstore:23.10.0-bookworm-slim + environment: + - EVENTSTORE_CLUSTER_SIZE=1 + - EVENTSTORE_EXT_TCP_PORT=1113 + - EVENTSTORE_HTTP_PORT=2113 + - EVENTSTORE_INSECURE=true + - EVENTSTORE_ENABLE_EXTERNAL_TCP=true + - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true + ports: + - "1113:1113" + - "2113:2113" + sink.eventstore.db: + image: eventstore/eventstore:23.10.0-bookworm-slim + environment: + - EVENTSTORE_CLUSTER_SIZE=1 + - EVENTSTORE_INT_TCP_PORT=1113 + - EVENTSTORE_HTTP_PORT=2113 + - EVENTSTORE_INSECURE=true + - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true + ports: + - "2114:2113" \ No newline at end of file diff --git a/charts/replicator/templates/configmap.yaml b/charts/replicator/templates/configmap.yaml index cd46fbe5..ed502c59 100644 --- a/charts/replicator/templates/configmap.yaml +++ b/charts/replicator/templates/configmap.yaml @@ -3,8 +3,6 @@ data: "appsettings.yaml": | replicator: {{ toYaml .Values.replicator | indent 6 }} - checkpoint: - path: "./checkpoint" kind: ConfigMap metadata: name: {{ template "replicator.fullname" . }} diff --git a/charts/replicator/values.yaml b/charts/replicator/values.yaml index 8bc92242..c0a8cca1 100644 --- a/charts/replicator/values.yaml +++ b/charts/replicator/values.yaml @@ -30,6 +30,10 @@ replicator: scavenge: true filters: [] transform: null + checkpoint: + type: file + path: ./checkpoint + checkpointAfter: 1000 transformJs: |- # restartOnFailure: false diff --git a/charts/transform.js b/charts/transform.js new file mode 100644 index 00000000..590aa7a9 --- /dev/null +++ b/charts/transform.js @@ -0,0 +1,12 @@ +function transform(original) { + log.info( + "Transforming event {Type} from {Stream}", + original.EventType, original.Stream + ); + + return { + ...original, + Stream: `transformed${original.Stream}`, + EventType: `V2.${original.EventType}` + } +} \ No newline at end of file diff --git a/src/EventStore.Replicator.Mongo/MongoCheckpointStore.cs b/src/EventStore.Replicator.Mongo/MongoCheckpointStore.cs index 69696d01..205f773d 100644 --- a/src/EventStore.Replicator.Mongo/MongoCheckpointStore.cs +++ b/src/EventStore.Replicator.Mongo/MongoCheckpointStore.cs @@ -7,16 +7,17 @@ namespace EventStore.Replicator.Mongo; public class MongoCheckpointStore : ICheckpointStore { static readonly ILog Log = LogProvider.GetCurrentClassLogger(); - readonly string _id; - readonly int _checkpointAfter; + readonly string _instanceId; readonly IMongoCollection _collection; + readonly int _checkpointAfter; - public MongoCheckpointStore(string connectionString, string database, string id, int checkpointAfter) { - _id = id; + public MongoCheckpointStore(string connectionString, string database, string instanceId, int checkpointAfter) { + _instanceId = instanceId; _checkpointAfter = checkpointAfter; - var settings = MongoClientSettings.FromConnectionString(connectionString); - var db = new MongoClient(settings).GetDatabase(database); - _collection = db.GetCollection("checkpoint"); + + _collection = new MongoClient(MongoClientSettings.FromConnectionString(connectionString)) + .GetDatabase(database) + .GetCollection("checkpoint"); } int _counter; @@ -29,14 +30,14 @@ public async ValueTask LoadCheckpoint(CancellationToken cancellationTo } var doc = await _collection - .Find(x => x.Id == _id).Limit(1) + .Find(x => x.Id == _instanceId).Limit(1) .SingleOrDefaultAsync(cancellationToken) .ConfigureAwait(false); if (doc == null) { Log.Info("No checkpoint file found, starting from the beginning"); - await _collection.InsertOneAsync(new Checkpoint(_id, Position.Start), cancellationToken: cancellationToken) + await _collection.InsertOneAsync(new Checkpoint(_instanceId, Position.Start), cancellationToken: cancellationToken) .ConfigureAwait(false); return Position.Start; } @@ -60,8 +61,8 @@ public async ValueTask Flush(CancellationToken cancellationToken) { if (_lastPosition == null) return; await _collection.ReplaceOneAsync( - Builders.Filter.Eq(x => x.Id, _id), - new Checkpoint(_id, _lastPosition), + Builders.Filter.Eq(x => x.Id, _instanceId), + new Checkpoint(_instanceId, _lastPosition), cancellationToken: cancellationToken ).ConfigureAwait(false); } diff --git a/src/es-replicator/Settings/ReplicatorSettings.cs b/src/es-replicator/Settings/ReplicatorSettings.cs index 2667e0c5..50b5e2a8 100644 --- a/src/es-replicator/Settings/ReplicatorSettings.cs +++ b/src/es-replicator/Settings/ReplicatorSettings.cs @@ -12,11 +12,12 @@ public record EsdbSettings { } public record Checkpoint { - public string Path { get; init; } public string Type { get; init; } = "file"; - public int CheckpointAfter { get; init; } = 1000; + public string Path { get; init; } + public string ConnectionString { get; init; } public string Database { get; init; } = "replicator"; public string InstanceId { get; init; } = "default"; + public int CheckpointAfter { get; init; } = 1000; } public record SinkSettings : EsdbSettings { diff --git a/src/es-replicator/Startup.cs b/src/es-replicator/Startup.cs index 747e3249..68f2c480 100644 --- a/src/es-replicator/Startup.cs +++ b/src/es-replicator/Startup.cs @@ -109,7 +109,7 @@ static void RegisterCheckpointStore(Checkpoint settings, IServiceCollection serv ICheckpointStore store = settings.Type switch { "file" => new FileCheckpointStore(settings.Path, settings.CheckpointAfter), "mongo" => new MongoCheckpointStore( - settings.Path, + settings.ConnectionString, settings.Database, settings.InstanceId, settings.CheckpointAfter