Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable delays #103

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/docs-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Setup Hugo
uses: peaceiris/actions-hugo@v2
uses: peaceiris/actions-hugo@v3
with:
hugo-version: 'latest'
extended: true
-
name: Install Node
uses: actions/setup-node@v4
with:
node-version: 19.9.0
node-version: 20
-
name: Install Dependencies
run: npm install
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull-request-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Check pull requests
uses: EventStore/Automations/pr-check@master
2 changes: 1 addition & 1 deletion .github/workflows/replicator-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/replicator-publish-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Set up QEMU
uses: docker/setup-qemu-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/replicator-publish-helm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Install Helm
uses: azure/setup-helm@v4
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
WORKDIR /app

ARG RUNTIME
RUN curl -sL https://deb.nodesource.com/setup_19.x | bash - \
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y --no-install-recommends nodejs \
&& npm install -g yarn

Expand All @@ -18,7 +18,7 @@
COPY ./src/es-replicator/ClientApp/yarn.lock ./src/es-replicator/ClientApp/
RUN cd ./src/es-replicator/ClientApp && yarn install

FROM builder as publish
FROM builder AS publish
ARG TARGETARCH
COPY ./src ./src
RUN dotnet publish ./src/es-replicator -c Release -a $TARGETARCH -clp:NoSummary --no-self-contained -o /app/publish
Expand All @@ -28,8 +28,8 @@
WORKDIR /app
COPY --from=publish /app/publish .

ENV ALLOWED_HOSTS "*"

Check warning on line 31 in Dockerfile

View workflow job for this annotation

GitHub Actions / docker

Legacy key/value format with whitespace separator should not be used

LegacyKeyValueFormat: "ENV key=value" should be used instead of legacy "ENV key value" format More info: https://docs.docker.com/go/dockerfile/rule/legacy-key-value-format/
ENV ASPNETCORE_URLS "http://*:5000"

Check warning on line 32 in Dockerfile

View workflow job for this annotation

GitHub Actions / docker

Legacy key/value format with whitespace separator should not be used

LegacyKeyValueFormat: "ENV key=value" should be used instead of legacy "ENV key value" format More info: https://docs.docker.com/go/dockerfile/rule/legacy-key-value-format/

EXPOSE 5000
ENTRYPOINT ["dotnet", "es-replicator.dll"]
46 changes: 24 additions & 22 deletions docs/content/docs/Configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,30 @@ The settings file has the `replicator` root level, all settings are children to

Available configuration options are:

| Option | Description |
|:----------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `replicator.reader.connectionString` | Connection string for the source cluster or instance |
| `replicator.reader.protocol` | Reader protocol (`tcp` or `grpc`) |
| `replicator.reader.pageSize` | Reader page size (only applicable for TCP protocol |
| `replicator.sink.connectionString` | Connection string for the target cluster or instance |
| `replicator.sink.protocol` | Writer protocol (`tcp` or `grpc`) |
| `replicator.sink.partitionCount` | Number of [partitioned]({{% ref "writers" %}}) concurrent writers |
| `replicator.sink.partitioner` | Custom JavaScript [partitioner]({{% ref "writers" %}}) |
| `replicator.sink.bufferSize` | Size of the sink buffer, `1000` events by default |
| `replicator.scavenge` | Enable real-time [scavenge]({{% ref "scavenge" %}}) |
| `replicator.runContinuously` | Set to `false` if you want Replicator to stop when it reaches the end of `$all` stream. Default is `true`, so the replication continues until you stop it explicitly. |
| `replicator.filters` | Add one or more of provided [filters]({{% ref "filters" %}}) |
| `replicator.transform` | Configure the [event transformation]({{% ref "Transforms" %}}) |
| `replicator.transform.bufferSize` | Size of the prepare buffer (filtering and transformations), `1000` events by default |
| `replicator.checkpoint.type` | Type of checkpoint store (`file` or `mongo`), `file` by default |
| `replicator.checkpoint.path` | The file path or connection string, `./checkpoint` by default |
| `replicator.checkpoint.checkpointAfter` | The number of events that must be replicated before a checkpoint is stored, `1000` events by default |
| `replicator.checkpoint.database` | The name of the Mongo database, `replicator` by default |
| `replicator.checkpoint.instanceId` | The name of the replicator instance to isolate checkpoints with in the Mongo database, `default` by default |
| `replicator.checkpoint.seeder.type` | Type of checkpoint seeder to use (`none` or `chaser`), `none` by default |
| `replicator.checkpoint.seeder.path` | The file path of the `chaser.chk`, empty by default |
| Option | Description |
|:---------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `replicator.reader.connectionString` | Connection string for the source cluster or instance |
| `replicator.reader.protocol` | Reader protocol (`tcp` or `grpc`) |
| `replicator.reader.pageSize` | Reader page size (only applicable for TCP protocol |
| `replicator.sink.connectionString` | Connection string for the target cluster or instance |
| `replicator.sink.protocol` | Writer protocol (`tcp` or `grpc`) |
| `replicator.sink.partitionCount` | Number of [partitioned]({{% ref "writers" %}}) concurrent writers |
| `replicator.sink.partitioner` | Custom JavaScript [partitioner]({{% ref "writers" %}}) |
| `replicator.sink.bufferSize` | Size of the sink buffer, `1000` events by default |
| `replicator.scavenge` | Enable real-time [scavenge]({{% ref "scavenge" %}}) |
| `replicator.runContinuously` | Set to `false` if you want Replicator to stop when it reaches the end of `$all` stream. Default is `true`, so the replication continues until you stop it explicitly. |
| `replicator.filters` | Add one or more of provided [filters]({{% ref "filters" %}}) |
| `replicator.transform` | Configure the [event transformation]({{% ref "Transforms" %}}) |
| `replicator.transform.bufferSize` | Size of the prepare buffer (filtering and transformations), `1000` events by default |
| `replicator.checkpoint.type` | Type of checkpoint store (`file` or `mongo`), `file` by default |
| `replicator.checkpoint.path` | The file path or connection string, `./checkpoint` by default |
| `replicator.checkpoint.checkpointAfter` | The number of events that must be replicated before a checkpoint is stored, `1000` events by default |
| `replicator.checkpoint.database` | The name of the Mongo database, `replicator` by default |
| `replicator.checkpoint.instanceId` | The name of the replicator instance to isolate checkpoints with in the Mongo database, `default` by default |
| `replicator.checkpoint.seeder.type` | Type of checkpoint seeder to use (`none` or `chaser`), `none` by default |
| `replicator.checkpoint.seeder.path` | The file path of the `chaser.chk`, empty by default |
| `replicator.restartDelayInSeconds` | The number of seconds between replication restarts, `5` by default |
| `replicator.reportMetricsFrequencyInSeconds` | The frequency at which to report certain metrics expressed in seconds, `5` by default |

## Enable verbose logging

Expand Down
18 changes: 10 additions & 8 deletions src/EventStore.Replicator/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@ CancellationToken stoppingToken
break;
}

Log.Info("Will restart in 5 sec");
Log.Info("Will restart in {0} sec", replicatorOptions.RestartDelay.TotalSeconds);

try {
await Task.Delay(5000, stoppingToken);
}
catch (OperationCanceledException) {
// stopping now
break;
if (replicatorOptions.RestartDelay != TimeSpan.Zero) {
try {
await Task.Delay(replicatorOptions.RestartDelay, stoppingToken);
}
catch (OperationCanceledException) {
// stopping now
break;
}
}
}
}
Expand Down Expand Up @@ -172,7 +174,7 @@ async Task Report() {
ReplicationMetrics.LastSourcePosition.Set(position.Value);
}

await Task.Delay(5000, stoppingToken).ConfigureAwait(false);
await Task.Delay(replicatorOptions.ReportMetricsFrequency, stoppingToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) {
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Replicator/ReplicatorOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace EventStore.Replicator;

public record ReplicatorOptions(bool RestartOnFailure, bool RunContinuously);
public record ReplicatorOptions(bool RestartOnFailure, bool RunContinuously, TimeSpan RestartDelay, TimeSpan ReportMetricsFrequency);
18 changes: 10 additions & 8 deletions src/es-replicator/Settings/ReplicatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ public record Filter {
}

public record Replicator {
public EsdbSettings Reader { get; init; }
public SinkSettings Sink { get; init; }
public bool Scavenge { get; init; }
public bool RestartOnFailure { get; init; } = true;
public bool RunContinuously { get; init; } = true;
public Checkpoint Checkpoint { get; init; } = new();
public TransformSettings Transform { get; init; } = new();
public Filter[] Filters { get; init; }
public EsdbSettings Reader { get; init; }
public SinkSettings Sink { get; init; }
public bool Scavenge { get; init; }
public bool RestartOnFailure { get; init; } = true;
public bool RunContinuously { get; init; } = true;
public int RestartDelayInSeconds { get; init; } = 5;
public int ReportMetricsFrequencyInSeconds { get; init; } = 5;
public Checkpoint Checkpoint { get; init; } = new();
public TransformSettings Transform { get; init; } = new();
public Filter[] Filters { get; init; }
}

public static class ConfigExtensions {
Expand Down
7 changes: 6 additions & 1 deletion src/es-replicator/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public static void ConfigureServices(WebApplicationBuilder builder) {
);

services.AddSingleton(
new ReplicatorOptions(replicatorOptions.RestartOnFailure, replicatorOptions.RunContinuously)
new ReplicatorOptions(
replicatorOptions.RestartOnFailure,
replicatorOptions.RunContinuously,
TimeSpan.FromSeconds(replicatorOptions.RestartDelayInSeconds),
TimeSpan.FromSeconds(replicatorOptions.ReportMetricsFrequencyInSeconds)
)
);

RegisterCheckpointStore(replicatorOptions.Checkpoint, services);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task Verify() {
new PreparePipelineOptions(null, null),
new ChaserCheckpointSeeder(chaser_chk_copy, store),
store,
new ReplicatorOptions(false, false),
new ReplicatorOptions(false, false, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)),
CancellationToken.None
);

Expand Down
2 changes: 1 addition & 1 deletion test/EventStore.Replicator.Tests/CustomPartitionerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task ShouldKeepOrderWithinPartition() {
prepareOptions,
new NoCheckpointSeeder(),
_fixture.CheckpointStore,
new ReplicatorOptions(false, false),
new ReplicatorOptions(false, false, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)),
CancellationToken.None
);
await Timing.Measure("Replication", replication);
Expand Down