Skip to content

Commit

Permalink
Added functionality to return empty closed PushStread in case of failure
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Nov 14, 2024
1 parent 26598d1 commit 463b6c4
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

import in.bytehue.messaging.mqtt5.provider.MessageReplyToPublisherProvider.Config;
import in.bytehue.messaging.mqtt5.provider.helper.SubscriptionAck;
import in.bytehue.messaging.mqtt5.provider.helper.ThreadFactoryBuilder;

//@formatter:off
Expand Down Expand Up @@ -156,9 +159,17 @@ public PushStream<Message> publishWithReplyMany(final Message requestMessage) {
@Override
public PushStream<Message> publishWithReplyMany(final Message requestMessage, final MessageContext replyToContext) {
final ReplyToDTO dto = new ReplyToDTO(requestMessage, replyToContext);

SubscriptionAck sub = null;
try {
sub = subscriber.replyToSubscribe(dto.subChannel, dto.pubChannel);
} catch (Exception e) {
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Message> eventSource = psp.createSimpleEventSource(Message.class);
eventSource.endOfStream();
return psp.createStream(eventSource);
}
// subscribe to the channel first
final PushStream<Message> stream = subscriber.replyToSubscribe(dto.subChannel, dto.pubChannel).stream()
final PushStream<Message> stream = sub.stream()
.filter(responseMessage -> matchCorrelationId(requestMessage, responseMessage));

// publish the request to the channel
Expand Down

0 comments on commit 463b6c4

Please sign in to comment.