From 62e093abc403d59ebd09a289885737e9047650a2 Mon Sep 17 00:00:00 2001 From: Christoph Lorenz Date: Mon, 26 Mar 2018 08:59:15 +0200 Subject: [PATCH] [Issue #3] ACK messages, which are shifted into failed queue --- bdd/pom.xml | 2 +- engine/pom.xml | 2 +- .../engine/messagebroker/MessageBroker.java | 15 +++++++++++---- examples/pom.xml | 2 +- pom.xml | 2 +- 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/bdd/pom.xml b/bdd/pom.xml index 834dca90..89bb08a3 100644 --- a/bdd/pom.xml +++ b/bdd/pom.xml @@ -3,7 +3,7 @@ dc-flusswerk-parent de.digitalcollections.flusswerk - 2.0.1 + 2.0.2 4.0.0 diff --git a/engine/pom.xml b/engine/pom.xml index 11b0ebbc..761d1f98 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -4,7 +4,7 @@ de.digitalcollections.flusswerk dc-flusswerk-parent - 2.0.1 + 2.0.2 4.0.0 diff --git a/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/MessageBroker.java b/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/MessageBroker.java index 86aa9094..fcb828c3 100644 --- a/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/MessageBroker.java +++ b/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/MessageBroker.java @@ -169,6 +169,7 @@ public void fail(Message message) throws IOException { if (failedRoutingKey != null) { send(failedRoutingKey, message); } + ack(message); } private void retry(Message message) throws IOException { @@ -200,8 +201,11 @@ public Map getMessageCounts() throws IOException { public Map getFailedMessageCounts() throws IOException { Map result = new HashMap<>(); for (String inputQueue : routingConfig.getReadFrom()) { - String queue = routingConfig.getFailurePolicy(inputQueue).getFailedRoutingKey(); - result.put(queue, rabbitClient.getMessageCount(queue)); + FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue); + if ( failurePolicy != null ) { + String queue = failurePolicy.getFailedRoutingKey(); + result.put(queue, rabbitClient.getMessageCount(queue)); + } } return result; } @@ -209,8 +213,11 @@ public Map getFailedMessageCounts() throws IOException { public Map getRetryMessageCounts() throws IOException { Map result = new HashMap<>(); for (String inputQueue : routingConfig.getReadFrom()) { - String queue = routingConfig.getFailurePolicy(inputQueue).getRetryRoutingKey(); - result.put(queue, rabbitClient.getMessageCount(queue)); + FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue); + if ( failurePolicy != null ) { + String queue = failurePolicy.getRetryRoutingKey(); + result.put(queue, rabbitClient.getMessageCount(queue)); + } } return result; } diff --git a/examples/pom.xml b/examples/pom.xml index 525f2c29..42e3a522 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -5,7 +5,7 @@ de.digitalcollections.flusswerk dc-flusswerk-parent - 2.0.1 + 2.0.2 4.0.0 diff --git a/pom.xml b/pom.xml index 7a3689c8..156e28b2 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ de.digitalcollections.flusswerk dc-flusswerk-parent - 2.0.1 + 2.0.2 pom DigitalCollections: Flusswerk