
Commit

Remove blocking operation from "addAssignListener"
bsideup committed May 16, 2018
1 parent f84a924 commit dee3613
Showing 4 changed files with 36 additions and 28 deletions.
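
In short: the subscribe flow used to block for up to 10 seconds (`.block(Duration.ofSeconds(10))`) while fetching the last acked offsets inside the assignment listener. The lookup is now kept as a `Mono` and `.cache()`d, and each per-partition publisher awaits it reactively: `delayUntil` performs the seek, then `thenMany` starts the record stream. The revocation signal also changes from a `UnicastProcessor` to a `ReplayProcessor` of capacity 1, presumably so a subscriber that attaches after a revocation still observes it. The Spring Boot 2.0.2 and Gradle 4.7 bumps ride along.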
2 changes: 1 addition & 1 deletion .dockerignore

@@ -5,4 +5,4 @@ Dockerfile
 .vscode/
 
 /protocol/generated/
-/examples/java/generated/
+/examples/**
4 changes: 2 additions & 2 deletions build.gradle

@@ -1,5 +1,5 @@
 plugins {
-    id "org.springframework.boot" version "2.0.0.RELEASE" apply false
+    id "org.springframework.boot" version "2.0.2.RELEASE" apply false
     id "com.google.protobuf" version "0.8.4" apply false
 }
 
@@ -27,7 +27,7 @@ configure(subprojects.findAll { !it.name.startsWith("examples/") }) {
         overriddenByDependencies = false
 
         imports {
-            mavenBom 'org.springframework.boot:spring-boot-dependencies:2.0.0.RELEASE'
+            mavenBom 'org.springframework.boot:spring-boot-dependencies:2.0.2.RELEASE'
             mavenBom 'org.testcontainers:testcontainers-bom:1.7.2'
         }
 
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties

@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.5.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.7-all.zip
56 changes: 32 additions & 24 deletions

@@ -10,9 +10,10 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.reactivestreams.Processor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.UnicastProcessor;
+import reactor.core.publisher.ReplayProcessor;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
@@ -21,7 +22,6 @@
 import reactor.kafka.sender.SenderRecord;
 
 import java.nio.ByteBuffer;
-import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -74,7 +74,7 @@ public Subscription subscribe(String topic, String groupId, Optional<String> aut
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0");
 
         return () -> Flux.create(assignmentsSink -> {
-            val revocations = new ConcurrentHashMap<Integer, UnicastProcessor<TopicPartition>>();
+            val revocations = new ConcurrentHashMap<Integer, Processor<TopicPartition, TopicPartition>>();
 
             val receiverRef = new AtomicReference<DefaultKafkaReceiver<ByteBuffer, ByteBuffer>>();
             val recordsFluxRef = new AtomicReference<Flux<Record>>();
@@ -98,51 +98,59 @@ public Subscription subscribe(String topic, String groupId, Optional<String> aut
                             )
                     )
                     .defaultIfEmpty(emptyMap())
-                    .block(Duration.ofSeconds(10));
+                    .cache();
 
             val kafkaReceiver = receiverRef.get();
             val recordFlux = recordsFluxRef.get();
 
             for (val partition : partitions) {
                 DefaultKafkaReceiverAccessor.pause(kafkaReceiver, partition.topicPartition());
 
-                val lastAckedOffset = lastAckedOffsets.get(partition.topicPartition().partition());
-                if (lastAckedOffset != null) {
-                    partition.seek(lastAckedOffset + 1);
-                }
-
                 val topicPartition = partition.topicPartition();
                 val partitionList = Arrays.asList(topicPartition);
                 val partitionNum = topicPartition.partition();
 
                 val requests = new AtomicLong();
 
-                val revocationsProcessor = UnicastProcessor.<TopicPartition>create();
-                revocations.put(partitionNum, revocationsProcessor);
+                val revocation = ReplayProcessor.<TopicPartition>create(1);
+                revocations.put(partitionNum, revocation);
 
                 assignmentsSink.next(
                         new DelegatingGroupedPublisher<>(
                                 partitionNum,
-                                recordFlux
-                                        .filter(it -> it.getPartition() == partitionNum)
-                                        .delayUntil(record -> {
-                                            if (requests.decrementAndGet() < 0) {
+                                lastAckedOffsets
+                                        .delayUntil(offsets -> {
+                                            val lastAckedOffset = offsets.get(partition.topicPartition().partition());
+                                            if (lastAckedOffset != null) {
                                                 return kafkaReceiver.doOnConsumer(consumer -> {
-                                                    if (requests.get() < 0) {
-                                                        consumer.pause(partitionList);
-                                                    }
+                                                    consumer.seek(topicPartition, lastAckedOffset + 1);
                                                     return true;
                                                 });
                                             } else {
                                                 return Mono.empty();
                                             }
                                         })
-                                        .doOnRequest(requested -> {
-                                            if (requests.addAndGet(requested) > 0) {
-                                                DefaultKafkaReceiverAccessor.resume(kafkaReceiver, topicPartition);
-                                            }
-                                        })
-                                        .takeUntilOther(revocationsProcessor)
+                                        .thenMany(Flux.defer(() -> recordFlux
+                                                .filter(it -> it.getPartition() == partitionNum)
+                                                .delayUntil(record -> {
+                                                    if (requests.decrementAndGet() < 0) {
+                                                        return kafkaReceiver.doOnConsumer(consumer -> {
+                                                            if (requests.get() < 0) {
+                                                                consumer.pause(partitionList);
+                                                            }
+                                                            return true;
+                                                        });
+                                                    } else {
+                                                        return Mono.empty();
+                                                    }
+                                                })
+                                                .doOnRequest(requested -> {
+                                                    if (requests.addAndGet(requested) > 0) {
+                                                        DefaultKafkaReceiverAccessor.resume(kafkaReceiver, topicPartition);
+                                                    }
+                                                })
+                                        ))
+                                        .takeUntilOther(revocation)
                         )
                 );
            }
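
The pattern behind the change, as a standalone sketch (hypothetical names: loadOffsets and seekTo stand in for the real offset query and the consumer.seek(...) call; this is the shape of the fix, not the project's code):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.Map;

public class NonBlockingAssignSketch {

    public static void main(String[] args) {
        // Before the commit (conceptually):
        //   Map<Integer, Long> offsets = loadOffsets().block(Duration.ofSeconds(10));
        // After: keep the lookup as a Mono. cache() runs it at most once and
        // replays the result to every per-partition pipeline that subscribes.
        Mono<Map<Integer, Long>> lastAckedOffsets = loadOffsets().cache();

        Flux<String> records = Flux.just("record-0", "record-1");

        lastAckedOffsets
                // Wait, without blocking a thread, for the seek side effect to complete...
                .delayUntil(NonBlockingAssignSketch::seekTo)
                // ...then switch to the record stream; defer() keeps it cold until then.
                .thenMany(Flux.defer(() -> records))
                .subscribe(System.out::println);
    }

    // Hypothetical stand-in for the offsets query that used to be block()-ed.
    private static Mono<Map<Integer, Long>> loadOffsets() {
        return Mono.just(Collections.singletonMap(0, 42L));
    }

    // Hypothetical stand-in for consumer.seek(topicPartition, lastAckedOffset + 1).
    private static Mono<Boolean> seekTo(Map<Integer, Long> offsets) {
        return Mono.fromCallable(() -> {
            System.out.println("seeking using " + offsets);
            return true;
        });
    }
}

The design point: cache() turns the one-shot lookup into a replayable result, so the assignment listener returns immediately and the cost of the lookup is paid inside the reactive chain rather than on the listener thread.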
