Skip to content

Commit

Permalink
Added base mock implementation for unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Jan 24, 2025
1 parent fe2fc99 commit 08418c5
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 50 deletions.
7 changes: 7 additions & 0 deletions topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,21 @@
<artifactId>zstd-jni</artifactId>
<version>1.5.2-5</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit4-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
65 changes: 28 additions & 37 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;

Expand All @@ -27,18 +26,21 @@ public abstract class GrpcStreamRetrier {
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
protected final AtomicBoolean isStopped = new AtomicBoolean(false);

private final Logger logger;
private final ScheduledExecutorService scheduler;
private final RetryMode retryMode;
private final RetryPolicy retryPolicy = new DefaultRetryPolicy();
private final AtomicInteger retry = new AtomicInteger(-1);

protected GrpcStreamRetrier(RetryMode retryMode, ScheduledExecutorService scheduler) {
private volatile boolean connected = false;
private volatile int retryNumber = 0;

protected GrpcStreamRetrier(Logger logger, RetryMode retryMode, ScheduledExecutorService scheduler) {
this.logger = logger;
this.retryMode = retryMode;
this.scheduler = scheduler;
this.id = generateRandomId(ID_LENGTH);
}

protected abstract Logger getLogger();
protected abstract String getStreamName();
protected abstract void onStreamReconnect();
protected abstract void onShutdown(String reason);
Expand All @@ -51,34 +53,34 @@ protected static String generateRandomId(int length) {
.toString();
}

private void tryScheduleReconnect(int retryNumber) {
private void tryScheduleReconnect() {
if (!isReconnecting.compareAndSet(false, true)) {
getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id,
logger.info("[{}] should reconnect {} stream, but reconnect is already in progress", id,
getStreamName());
return;
}

retry.set(retryNumber);
long delay = retryPolicy.nextRetryMs(retryNumber, 0);
getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryNumber, getStreamName(), delay);
logger.warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryNumber, getStreamName(), delay);
try {
scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException exception) {
String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
"Shutting down " + getStreamName();
getLogger().error(errorMessage);
logger.error(errorMessage);
shutdownImpl(errorMessage);
}
}

protected void resetRetries() {
retry.set(0);
retryNumber = 0;
connected = true;
}

void reconnect() {
getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), retry.get());
logger.info("[{}] {} reconnect #{} started", id, getStreamName(), retryNumber);
if (!isReconnecting.compareAndSet(true, false)) {
getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen");
logger.warn("Couldn't reset reconnect flag. Shouldn't happen");
}
onStreamReconnect();
}
Expand All @@ -88,7 +90,7 @@ protected CompletableFuture<Void> shutdownImpl() {
}

protected CompletableFuture<Void> shutdownImpl(String reason) {
getLogger().info("[{}] Shutting down {}"
logger.info("[{}] Shutting down {}"
+ (reason == null || reason.isEmpty() ? "" : " with reason: " + reason), id, getStreamName());
isStopped.set(true);
return CompletableFuture.runAsync(() -> {
Expand All @@ -97,58 +99,47 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
}

protected void onSessionClosed(Status status, Throwable th) {
getLogger().info("[{}] onSessionClosed called", id);
logger.info("[{}] onSessionClosed called", id);

if (th != null) {
getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
logger.warn("[{}] Exception in {} stream session: ", id, getStreamName(), th);
} else {
if (status.isSuccess()) {
if (isStopped.get()) {
getLogger().info("[{}] {} stream session closed successfully", id, getStreamName());
logger.info("[{}] {} stream session closed successfully", id, getStreamName());
return;
} else {
getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName(),
logger.warn("[{}] {} stream session was closed on working {}", id, getStreamName(),
getStreamName());
}
} else {
getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
}
}

if (isStopped.get()) {
getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName());
logger.info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName());
return;
}

int currentRetry = nextRetryNumber();
if (currentRetry > 0) {
tryScheduleReconnect(currentRetry);
if (retryMode == RetryMode.ALWAYS || (retryMode == RetryMode.RECOVER && connected)) {
retryNumber++;
tryScheduleReconnect();
return;
}

if (!isStopped.compareAndSet(false, true)) {
getLogger().warn("[{}] Stopped by retry mode {} after {} retries. But {} is already shut down.", id,
retryMode, currentRetry, getStreamName());
logger.warn("[{}] Stopped by retry mode {} after {} retries. But {} is already shut down.", id,
retryMode, retryNumber, getStreamName());
return;
}

String errorMessage = "[" + id + "] Stopped by retry mode " + retryMode + " after " + currentRetry +
String errorMessage = "[" + id + "] Stopped by retry mode " + retryMode + " after " + retryNumber +
" retries. Shutting down " + getStreamName();
getLogger().error(errorMessage);
logger.warn(errorMessage);
shutdownImpl(errorMessage);
}

private int nextRetryNumber() {
int next = retry.get() + 1;
switch (retryMode) {
case RECOVER: return next;
case ALWAYS: return Math.max(1, next);
case NONE:
default:
return 0;
}
}

private static class DefaultRetryPolicy extends ExponentialBackoffRetry {

private static final int EXP_BACKOFF_BASE_MS = 256;
Expand Down
7 changes: 1 addition & 6 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
private final String consumerName;

public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
super(settings.getRetryMode(), topicRpc.getScheduler());
super(logger, settings.getRetryMode(), topicRpc.getScheduler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new ReadSessionImpl();
Expand Down Expand Up @@ -88,11 +88,6 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
logger.info(message.toString());
}

@Override
protected Logger getLogger() {
return logger;
}

@Override
protected String getStreamName() {
return "Reader";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
super(settings.getRetryMode(), topicRpc.getScheduler());
super(logger, settings.getRetryMode(), topicRpc.getScheduler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new WriteSessionImpl();
Expand All @@ -81,11 +81,6 @@ public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressi
logger.info(message);
}

@Override
protected Logger getLogger() {
return logger;
}

@Override
protected String getStreamName() {
return "Writer";
Expand Down
Loading

0 comments on commit 08418c5

Please sign in to comment.