Skip to content

Commit

Permalink
Merge pull request #86 from ankitmashu/fix/sdm
Browse files Browse the repository at this point in the history
subs queue acknowledgement
  • Loading branch information
ananjaykumar2 authored May 28, 2024
2 parents ae1fc6e + 4470974 commit 2c71067
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ public Future<JsonObject> get(String key) {
JsonObject resultJson = new JsonObject().put("results", jsonArray);
promise.complete(resultJson);
} else {
LOGGER.info("key :{} : not found in cache/database server", key);
promise.fail("key not found");
LOGGER.info("Subscriber :{} : not found in cache/database server", key);
promise.fail("Subscriber doesn't exist/expired");
}
})
.onFailure(
failureHandler -> {
promise.fail("key not found");
promise.fail("Subscriber doesn't exist/expired");
});
}
return promise.future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ private Future<JsonObject> databaseOperations(JsonObject queries) {
Promise<JsonObject> promise = Promise.promise();
LOGGER.debug("Queries are : {}", queries.encode());
Future<JsonObject> insertInPostgres = postgresService.executeWriteQuery(queries);
LOGGER.debug(
"Queries from origin is {} , Query : {}", queries.getString(ORIGIN), queries.encode());
insertInPostgres
.onSuccess(
insertInImmudbHandler -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,57 +34,66 @@ public SubscriptionMonitoringConsumer(
this.msgService = msgService;
}


@Override
public void start() {
this.consume();
}

private void consume() {
client.start().onSuccess(successHandler -> {
client.basicConsumer(SUBSCRIPTION_MONITORING_QUEUE, options, receiveResultHandler -> {
if (receiveResultHandler.succeeded()) {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
long deliveryTag = message.envelope().getDeliveryTag();
Buffer body = message.body();
if (body != null) {
LOGGER.info("Subscription message received");
boolean isArrayReceived = isJsonArray(body);
LOGGER.debug("is message array received : {}", isArrayReceived);
if (isArrayReceived) {
JsonArray jsonArrayBody = body.toJsonArray();
jsonArrayBody.forEach(
json -> {
Future.future(e -> messagePush((JsonObject) json));
});
client.basicAck(deliveryTag, false);
mqConsumer.resume();
} else {
messagePush(new JsonObject(body)).onSuccess(
successResult -> {
LOGGER.info("Latest message published in databases.");
client.basicAck(deliveryTag, false);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
})
.onFailure(
failureHandler -> {
LOGGER.error("Error while publishing messages for processing "
+ failureHandler.getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
});
}
}
});
}
});
}).onFailure(failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for Latest message Q consumer.");
});
client
.start()
.onSuccess(
successHandler -> {
client.basicConsumer(
SUBSCRIPTION_MONITORING_QUEUE,
options,
receiveResultHandler -> {
if (receiveResultHandler.succeeded()) {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
long deliveryTag = message.envelope().getDeliveryTag();
Buffer body = message.body();
if (body != null) {
LOGGER.info("Subscription message received");
boolean isArrayReceived = isJsonArray(body);
LOGGER.debug("is message array received : {}", isArrayReceived);
if (isArrayReceived) {
JsonArray jsonArrayBody = body.toJsonArray();
jsonArrayBody.forEach(
json -> {
Future.future(e -> messagePush((JsonObject) json));
});
client.basicAck(deliveryTag, false);
} else {
messagePush(new JsonObject(body))
.onSuccess(
successResult -> {
LOGGER.info("Latest message published in RMQ.");
client.basicAck(deliveryTag, false);
})
.onFailure(
failureHandler -> {
LOGGER.error(
"Error while publishing messages for processing "
+ failureHandler.getMessage());
if (failureHandler
.getMessage()
.matches(
"(.*)Subscriber doesn't exist/expired(.*)")) {
client.basicAck(deliveryTag, false);
}
});
}
}
});
}
});
})
.onFailure(
failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for subscription message Q consumer.");
});
}

public boolean isJsonArray(Buffer jsonObjectBuffer) {
Expand All @@ -102,15 +111,18 @@ public boolean isJsonArray(Buffer jsonObjectBuffer) {

public Future<Void> messagePush(JsonObject json) {
Promise<Void> promise = Promise.promise();
msgService.processSubscriptionMonitoringMessages(json)
.onSuccess(processResult -> {
LOGGER.debug("Subscription message published for processing");
promise.complete();
})
.onFailure(processFailure -> {
LOGGER.error("Error while publishing message for processing");
promise.fail("Failed to send mesasge to processer service");
});
msgService
.processSubscriptionMonitoringMessages(json)
.onSuccess(
processResult -> {
LOGGER.debug("Subscription message published for processing");
promise.complete();
})
.onFailure(
processFailure -> {
LOGGER.error(processFailure.getMessage());
promise.fail(processFailure.getMessage());
});
return promise.future();
}
}

0 comments on commit 2c71067

Please sign in to comment.