Skip to content

Commit

Permalink
Improve helm configuration of File system Checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcummings committed Feb 12, 2024
1 parent f3dd8c1 commit 8ce9fa3
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 16 deletions.
25 changes: 25 additions & 0 deletions charts/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
version: "3.4"

services:
read.eventstore.db:
image: eventstore/eventstore:23.10.0-bookworm-slim
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_HTTP_PORT=2113
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_EXTERNAL_TCP=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
ports:
- "1113:1113"
- "2113:2113"
sink.eventstore.db:
image: eventstore/eventstore:23.10.0-bookworm-slim
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_INT_TCP_PORT=1113
- EVENTSTORE_HTTP_PORT=2113
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
ports:
- "2114:2113"
2 changes: 0 additions & 2 deletions charts/replicator/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ data:
"appsettings.yaml": |
replicator:
{{ toYaml .Values.replicator | indent 6 }}
checkpoint:
path: "./checkpoint"
kind: ConfigMap
metadata:
name: {{ template "replicator.fullname" . }}
Expand Down
4 changes: 4 additions & 0 deletions charts/replicator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ replicator:
scavenge: true
filters: []
transform: null
checkpoint:
type: file
path: ./checkpoint
checkpointAfter: 1000
transformJs: |-
# restartOnFailure: false
Expand Down
12 changes: 12 additions & 0 deletions charts/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
function transform(original) {
log.info(
"Transforming event {Type} from {Stream}",
original.EventType, original.Stream
);

return {
...original,
Stream: `transformed${original.Stream}`,
EventType: `V2.${original.EventType}`
}
}
23 changes: 12 additions & 11 deletions src/EventStore.Replicator.Mongo/MongoCheckpointStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ namespace EventStore.Replicator.Mongo;
public class MongoCheckpointStore : ICheckpointStore {
static readonly ILog Log = LogProvider.GetCurrentClassLogger();

readonly string _id;
readonly int _checkpointAfter;
readonly string _instanceId;
readonly IMongoCollection<Checkpoint> _collection;
readonly int _checkpointAfter;

public MongoCheckpointStore(string connectionString, string database, string id, int checkpointAfter) {
_id = id;
public MongoCheckpointStore(string connectionString, string database, string instanceId, int checkpointAfter) {
_instanceId = instanceId;
_checkpointAfter = checkpointAfter;
var settings = MongoClientSettings.FromConnectionString(connectionString);
var db = new MongoClient(settings).GetDatabase(database);
_collection = db.GetCollection<Checkpoint>("checkpoint");

_collection = new MongoClient(MongoClientSettings.FromConnectionString(connectionString))
.GetDatabase(database)
.GetCollection<Checkpoint>("checkpoint");
}

int _counter;
Expand All @@ -29,14 +30,14 @@ public async ValueTask<Position> LoadCheckpoint(CancellationToken cancellationTo
}

var doc = await _collection
.Find(x => x.Id == _id).Limit(1)
.Find(x => x.Id == _instanceId).Limit(1)
.SingleOrDefaultAsync(cancellationToken)
.ConfigureAwait(false);

if (doc == null) {
Log.Info("No checkpoint file found, starting from the beginning");

await _collection.InsertOneAsync(new Checkpoint(_id, Position.Start), cancellationToken: cancellationToken)
await _collection.InsertOneAsync(new Checkpoint(_instanceId, Position.Start), cancellationToken: cancellationToken)
.ConfigureAwait(false);
return Position.Start;
}
Expand All @@ -60,8 +61,8 @@ public async ValueTask Flush(CancellationToken cancellationToken) {
if (_lastPosition == null) return;

await _collection.ReplaceOneAsync(
Builders<Checkpoint>.Filter.Eq(x => x.Id, _id),
new Checkpoint(_id, _lastPosition),
Builders<Checkpoint>.Filter.Eq(x => x.Id, _instanceId),
new Checkpoint(_instanceId, _lastPosition),
cancellationToken: cancellationToken
).ConfigureAwait(false);
}
Expand Down
5 changes: 3 additions & 2 deletions src/es-replicator/Settings/ReplicatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ public record EsdbSettings {
}

public record Checkpoint {
public string Path { get; init; }
public string Type { get; init; } = "file";
public int CheckpointAfter { get; init; } = 1000;
public string Path { get; init; }
public string ConnectionString { get; init; }
public string Database { get; init; } = "replicator";
public string InstanceId { get; init; } = "default";
public int CheckpointAfter { get; init; } = 1000;
}

public record SinkSettings : EsdbSettings {
Expand Down
2 changes: 1 addition & 1 deletion src/es-replicator/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static void RegisterCheckpointStore(Checkpoint settings, IServiceCollection serv
ICheckpointStore store = settings.Type switch {
"file" => new FileCheckpointStore(settings.Path, settings.CheckpointAfter),
"mongo" => new MongoCheckpointStore(
settings.Path,
settings.ConnectionString,
settings.Database,
settings.InstanceId,
settings.CheckpointAfter
Expand Down

0 comments on commit 8ce9fa3

Please sign in to comment.