Skip to content

Commit

Permalink
[Issue #3] ACK messages, which are shifted into failed queue
Browse files Browse the repository at this point in the history
  • Loading branch information
clorenz committed Mar 26, 2018
1 parent eab7851 commit 62e093a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 8 deletions.
2 changes: 1 addition & 1 deletion bdd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>dc-flusswerk-parent</artifactId>
<groupId>de.digitalcollections.flusswerk</groupId>
<version>2.0.1</version>
<version>2.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>de.digitalcollections.flusswerk</groupId>
<artifactId>dc-flusswerk-parent</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public void fail(Message message) throws IOException {
if (failedRoutingKey != null) {
send(failedRoutingKey, message);
}
ack(message);
}

private void retry(Message message) throws IOException {
Expand Down Expand Up @@ -200,17 +201,23 @@ public Map<String, Long> getMessageCounts() throws IOException {
public Map<String, Long> getFailedMessageCounts() throws IOException {
Map<String, Long> result = new HashMap<>();
for (String inputQueue : routingConfig.getReadFrom()) {
String queue = routingConfig.getFailurePolicy(inputQueue).getFailedRoutingKey();
result.put(queue, rabbitClient.getMessageCount(queue));
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue);
if ( failurePolicy != null ) {
String queue = failurePolicy.getFailedRoutingKey();
result.put(queue, rabbitClient.getMessageCount(queue));
}
}
return result;
}

public Map<String, Long> getRetryMessageCounts() throws IOException {
Map<String, Long> result = new HashMap<>();
for (String inputQueue : routingConfig.getReadFrom()) {
String queue = routingConfig.getFailurePolicy(inputQueue).getRetryRoutingKey();
result.put(queue, rabbitClient.getMessageCount(queue));
FailurePolicy failurePolicy = routingConfig.getFailurePolicy(inputQueue);
if ( failurePolicy != null ) {
String queue = failurePolicy.getRetryRoutingKey();
result.put(queue, rabbitClient.getMessageCount(queue));
}
}
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>de.digitalcollections.flusswerk</groupId>
<artifactId>dc-flusswerk-parent</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>de.digitalcollections.flusswerk</groupId>
<artifactId>dc-flusswerk-parent</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<packaging>pom</packaging>

<name>DigitalCollections: Flusswerk</name>
Expand Down

0 comments on commit 62e093a

Please sign in to comment.