Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Use futures in the Source poll call and logging enhancements #28

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
<version>1.2.3</version>
<version>1.2.3-futures</version>
<packaging>jar</packaging>

<name>Kafka Redis Connector (Sink and Source)</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -29,7 +33,7 @@
*/
@Slf4j
public class RedisSourceTask extends SourceTask {
private static final long MAX_POLL_SIZE = 10_000L;
private long maxPollSize;

private RedisClient redisStandaloneClient;
private StatefulRedisPubSubConnection<String, String> redisStandalonePubSubConnection;
Expand Down Expand Up @@ -57,8 +61,12 @@ public void start(Map<String, String> props) {
throw new ConnectException("task configuration error", configException);
}

maxPollSize = config.getMaxPollRecords();
LOG.info("Using max poll size of {}", maxPollSize);

// Set up the subscriber for Redis
if (config.isRedisClusterEnabled()) {
LOG.info("Creating cluster Redis client");
redisClusterClient = RedisClusterClient.create(config.getRedisUri());
redisClusterClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
Expand Down Expand Up @@ -96,38 +104,44 @@ public void start(Map<String, String> props) {

@Override
public List<SourceRecord> poll() {
final List<SourceRecord> sourceRecords = new ArrayList<>();

while (true) {
final RedisMessage redisMessage = redisSubscriber.poll();

// No more events left, stop iterating
if (redisMessage == null) {
break;
}

final SourceRecord sourceRecord;

try {
sourceRecord = recordConverter.convert(redisMessage);
}
catch (Exception exception) {
throw new ConnectException("failed to convert redis message", exception);
}

sourceRecords.add(sourceRecord);

// Subscription events may come in faster than we can iterate over them here so return early once we hit the max
if (sourceRecords.size() >= MAX_POLL_SIZE) {
break;
}
final AtomicBoolean breakTask = new AtomicBoolean(false);
final ConcurrentLinkedQueue<SourceRecord> sourceRecords = new ConcurrentLinkedQueue<>();

while (!breakTask.get()) {
final CompletableFuture<RedisMessage> redisMessageFut =
CompletableFuture.supplyAsync(() -> redisSubscriber.poll());
CompletableFuture<SourceRecord> sourceRecordFut =
redisMessageFut.thenApply(
redisMessage ->
Optional.ofNullable(redisMessage)
.map(recordConverter::convert)
.orElseGet(() -> {
// No more events left, stop iterating
breakTask.set(true);
return null;
}));

CompletableFuture<SourceRecord> recordAddFut =
sourceRecordFut.whenComplete(
(sourceRecord, exception) -> {
if (exception != null) {
throw new ConnectException("failed to convert redis message", exception);
}
else if (sourceRecord != null) {
sourceRecords.offer(sourceRecord);
if (sourceRecords.size() >= maxPollSize) {
breakTask.set(true);
}
}
});
recordAddFut.join();
}

if (sourceRecords.size() >= 1) {
LOG.info("Writing {} record(s) to kafka", sourceRecords.size());
}

return sourceRecords;
return new ArrayList<>(sourceRecords);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public class RedisSourceConfig extends AbstractConfig {
private static final String REDIS_CHANNELS_PATTERN_ENABLED_DOC = "Redis channel(s) utilize patterns.";
private final boolean redisChannelPatternEnabled;

public static final String MAX_POLL_RECORDS = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single "
+ "call to poll(). Note, that <code>max.poll.records</code> does not impact the underlying fetching behavior. "
+ "The consumer will cache the records from each fetch request and returns them incrementally from each poll.";
private static final long MAX_POLL_RECORDS_DEFAULT = 10_000;
private final long maxPollRecords;

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(
TOPIC,
Expand All @@ -55,7 +62,13 @@ public class RedisSourceConfig extends AbstractConfig {
REDIS_CHANNELS_PATTERN_ENABLED,
Type.BOOLEAN,
Importance.HIGH,
REDIS_CHANNELS_PATTERN_ENABLED_DOC);
REDIS_CHANNELS_PATTERN_ENABLED_DOC)
.define(
MAX_POLL_RECORDS,
Type.LONG,
MAX_POLL_RECORDS_DEFAULT,
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC);

/**
* Configuration for Redis Source.
Expand All @@ -70,6 +83,7 @@ public RedisSourceConfig(final Map<?, ?> originals) {
this.redisClusterEnabled = getBoolean(REDIS_CLUSTER_ENABLED);
this.redisChannels = getList(REDIS_CHANNELS);
this.redisChannelPatternEnabled = getBoolean(REDIS_CHANNELS_PATTERN_ENABLED);
this.maxPollRecords = getLong(MAX_POLL_RECORDS);
}

/**
Expand Down Expand Up @@ -116,4 +130,13 @@ public List<String> getRedisChannels() {
public boolean isRedisChannelPatternEnabled() {
return this.redisChannelPatternEnabled;
}

/**
 * Returns the configured cap on records emitted by a single {@code poll()} call.
 *
 * @return value of the {@code max.poll.records} setting
 */
public long getMaxPollRecords() {
  return maxPollRecords;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -11,6 +12,7 @@
public class RecordConverter {
private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("io.github.jaredpetersen.kafkaconnectredis.RedisSubscriptionEventKey")
.field("nodeId", Schema.OPTIONAL_STRING_SCHEMA)
.field("channel", Schema.STRING_SCHEMA)
.field("pattern", Schema.OPTIONAL_STRING_SCHEMA);
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
Expand Down Expand Up @@ -43,6 +45,7 @@ public SourceRecord convert(RedisMessage redisMessage) {
final Struct key = new Struct(KEY_SCHEMA)
.put("channel", redisMessage.getChannel())
.put("pattern", redisMessage.getPattern());
Optional.ofNullable(redisMessage.getNodeId()).map(n -> key.put("nodeId", n));
final Struct value = new Struct(VALUE_SCHEMA)
.put("message", redisMessage.getMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@Value
@Builder
public class RedisMessage {
String nodeId;
String pattern;
String channel;
String message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,40 @@
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class RedisClusterListener extends RedisListener implements RedisClusterPubSubListener<String, String> {
/**
 * Create a cluster pub/sub listener that appends every received message to the
 * shared queue.
 *
 * @param messageQueue Queue shared with the source task; drained by its poll loop.
 */
public RedisClusterListener(ConcurrentLinkedQueue<RedisMessage> messageQueue) {
super(messageQueue);
}

/**
 * Handle a channel-subscription message from a cluster node and enqueue it for
 * the source task.
 *
 * @param node    Cluster node the message arrived on.
 * @param channel Channel the message was published to.
 * @param message Message payload.
 */
@Override
public void message(RedisClusterNode node, String channel, String message) {
  // SLF4J parameterized logging already defers string formatting, and the
  // arguments here are cheap getters, so no isDebugEnabled() guard is needed.
  LOG.debug("Received channel {} from node {}", channel, node.getNodeId());

  final RedisMessage redisMessage = RedisMessage.builder()
      .nodeId(node.getNodeId())
      .channel(channel)
      .message(message)
      .build();

  messageQueue.add(redisMessage);
}

/**
 * Handle a pattern-subscription message from a cluster node and enqueue it for
 * the source task.
 *
 * @param node    Cluster node the message arrived on.
 * @param pattern Subscription pattern that matched the channel.
 * @param channel Channel the message was published to.
 * @param message Message payload.
 */
@Override
public void message(RedisClusterNode node, String pattern, String channel, String message) {
  // Include the matched pattern — the channel-only message hid which
  // subscription produced this event. Parameterized SLF4J logging makes the
  // isDebugEnabled() guard redundant for these cheap arguments.
  LOG.debug("Received channel {} (pattern {}) from node {}", channel, pattern, node.getNodeId());

  final RedisMessage redisMessage = RedisMessage.builder()
      .nodeId(node.getNodeId())
      .pattern(pattern)
      .channel(channel)
      .message(message)
      .build();

  messageQueue.add(redisMessage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

@Slf4j
abstract class RedisListener {
private final ConcurrentLinkedQueue<RedisMessage> messageQueue;
protected final ConcurrentLinkedQueue<RedisMessage> messageQueue;

public RedisListener(ConcurrentLinkedQueue<RedisMessage> messageQueue) {
this.messageQueue = messageQueue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.jaredpetersen.kafkaconnectredis.source.listener;

import java.time.Instant;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -13,7 +14,9 @@
class RecordConverterTest {
@Test
void convertTransformsRedisMessageToSourceRecord() {
final String nodeId = UUID.randomUUID().toString();
final RedisMessage redisMessage = RedisMessage.builder()
.nodeId(nodeId)
.channel("mychannel")
.pattern("mypattern")
.message("some message")
Expand All @@ -28,6 +31,7 @@ void convertTransformsRedisMessageToSourceRecord() {
assertEquals(topic, sourceRecord.topic());
assertNull(sourceRecord.kafkaPartition());
assertEquals(Schema.Type.STRUCT, sourceRecord.keySchema().type());
assertEquals(redisMessage.getNodeId(), ((Struct) sourceRecord.key()).getString("nodeId"));
assertEquals(redisMessage.getChannel(), ((Struct) sourceRecord.key()).getString("channel"));
assertEquals(redisMessage.getPattern(), ((Struct) sourceRecord.key()).getString("pattern"));
assertEquals(Schema.Type.STRUCT, sourceRecord.valueSchema().type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ void messageAddsChannelMessageToQueue() {
final ConcurrentLinkedQueue<RedisMessage> queue = new ConcurrentLinkedQueue<>();
final RedisClusterListener redisClusterListener = new RedisClusterListener(queue);

final String nodeId = UUID.randomUUID().toString();
final String channel = "books";
final String message = "the best book ever";

redisClusterListener.message(
RedisClusterNode.of(UUID.randomUUID().toString()),
RedisClusterNode.of(nodeId),
channel,
message);

final RedisMessage expectedRedisMessage = RedisMessage.builder()
.nodeId(nodeId)
.channel(channel)
.message(message)
.build();
Expand All @@ -40,17 +42,19 @@ void messageAddsPatternMessageToQueue() {
final ConcurrentLinkedQueue<RedisMessage> queue = new ConcurrentLinkedQueue<>();
final RedisClusterListener redisClusterListener = new RedisClusterListener(queue);

final String nodeId = UUID.randomUUID().toString();
final String pattern = "b*";
final String channel = "books";
final String message = "the best book ever";

redisClusterListener.message(
RedisClusterNode.of(UUID.randomUUID().toString()),
RedisClusterNode.of(nodeId),
pattern,
channel,
message);

final RedisMessage expectedRedisMessage = RedisMessage.builder()
.nodeId(nodeId)
.pattern(pattern)
.channel(channel)
.message(message)
Expand Down