From 09b0b143b8d2d5607d112e0cbe5ad01410e30534 Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 10:59:21 +0200 Subject: [PATCH 1/6] Fix error handling when sending messages fails When sending outgoing messages fails processing should stop as retrying might cause situations that are very hard to debug (e.g. messages send multiple times). --- .../flusswerk/framework/engine/Engine.java | 68 ++++++++++++------- .../messagebroker/MessageBroker.java | 12 +++- .../flusswerk/framework/model/Message.java | 5 ++ .../framework/engine/EngineTest.java | 33 +++++---- 4 files changed, 80 insertions(+), 38 deletions(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index c447aeea..9e39b702 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -8,6 +8,7 @@ import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -139,34 +140,51 @@ public void start() { } @SuppressWarnings("unchecked") - void process(Message receivedMessage) { + void process(Message message) { + Collection messagesToSend = Collections.emptyList(); try { - Collection messagesToSend = flow.process(receivedMessage); - if (flow.hasMessagesToSend()) { - for (Message messageToSend : messagesToSend) { - messageBroker.send(messageToSend); - } - } - messageBroker.ack(receivedMessage); - processReport.reportSuccess(receivedMessage); + messagesToSend = flow.process(message); } catch (StopProcessingException e) { - try { - processReport.reportFail(receivedMessage, e); - messageBroker.fail(receivedMessage); - } catch (IOException e1) { - LOGGER.error("Could not fail message" + receivedMessage.getEnvelope().getBody(), e1); - } - } catch (RuntimeException | IOException e) { - try { - boolean isRejected = messageBroker.reject(receivedMessage); - if (isRejected) { - processReport.reportReject(receivedMessage, e); - } else { - processReport.reportFailAfterMaxRetries(receivedMessage, e); - } - } catch (IOException e1) { - LOGGER.error("Could not reject message" + receivedMessage.getEnvelope().getBody(), e1); + fail(message, e); + return; // processing was not successful → stop here + } catch (RuntimeException e) { + retryOrFail(message, e); + return; // processing was not successful → stop here + } + + // Data processing was successful, now handle the messaging + try { + messageBroker.send(messagesToSend); + messageBroker.ack(message); + processReport.reportSuccess(message); + } catch (Exception e) { + var stopProcessingException = + new StopProcessingException("Could not finish message handling").causedBy(e); + fail(message, stopProcessingException); + } + } + + private void retryOrFail(Message receivedMessage, RuntimeException e) { + try { + boolean isRejected = messageBroker.reject(receivedMessage); + if (isRejected) { + processReport.reportReject(receivedMessage, e); + } else { + processReport.reportFailAfterMaxRetries(receivedMessage, e); } + } catch (IOException fatalExecption) { + var body = receivedMessage.getEnvelope().getBody(); + LOGGER.error("Could not reject message" + body, fatalExecption); + } + } + + private void fail(Message message, StopProcessingException e) { + try { + processReport.reportFail(message, e); + messageBroker.fail(message); + } catch (IOException fatalExecption) { + var body = message.getEnvelope().getBody(); + LOGGER.error("Could not fail message" + body, fatalExecption); } } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/messagebroker/MessageBroker.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/messagebroker/MessageBroker.java index b50cdacc..60d5bc48 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/messagebroker/MessageBroker.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/messagebroker/MessageBroker.java @@ -50,6 +50,16 @@ public void send(Message message) throws IOException { send(routingConfig.getWriteTo(), message); } + /** + * Sends messages to the default output queue as JSON document. + * + * @param messages the message to send. + * @throws IOException if sending the message fails. + */ + public void send(Collection messages) throws IOException { + send(routingConfig.getWriteTo(), messages); + } + /** * Sends a message to a certain queue as JSON document. * @@ -71,7 +81,7 @@ public void send(String routingKey, Message message) throws IOException { * @param messages the messages to send. * @throws IOException if sending a message fails. */ - public void send(String routingKey, Collection messages) throws IOException { + public void send(String routingKey, Collection messages) throws IOException { for (Message message : messages) { send(routingKey, message); } diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/model/Message.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/model/Message.java index c156ebb9..1def2e1c 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/model/Message.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/model/Message.java @@ -38,4 +38,9 @@ public void setTracingId(String tracingId) { public String getTracingId() { return tracingId; } + + @Override + public String toString() { + return String.format("Message{hashcode=%s, tracingId=%s", hashCode(), tracingId); + } } diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/EngineTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/EngineTest.java index 2f329c0a..429a072f 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/EngineTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/EngineTest.java @@ -1,7 +1,8 @@ package com.github.dbmdz.flusswerk.framework.engine; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -30,13 +31,16 @@ class EngineTest { private MessageBroker messageBroker; + private Message message; + @BeforeEach void setUp() { messageBroker = mock(MessageBroker.class); + message = new Message(); } - private Flow flowSendingMessage() { - return flowWithTransformer(Function.identity()); + private Flow passthroughFlow() { + return FlowBuilder.messageProcessor(Message.class).process(m -> m).build(); } private Flow flowWithTransformer(Function transformer) { @@ -108,7 +112,6 @@ void processShouldRejectMessageOnFailure() throws IOException { var flow = flowWithTransformer(transformerWithException); Engine engine = new Engine(messageBroker, flow); - Message message = new Message(); engine.process(message); @@ -119,8 +122,7 @@ void processShouldRejectMessageOnFailure() throws IOException { @Test @DisplayName("should accept a message processed without failure") void processShouldAcceptMessageWithoutFailure() throws IOException { - Engine engine = new Engine(messageBroker, flowSendingMessage()); - Message message = new Message(); + Engine engine = new Engine(messageBroker, passthroughFlow()); engine.process(message); verify(messageBroker).ack(message); @@ -130,16 +132,15 @@ void processShouldAcceptMessageWithoutFailure() throws IOException { @Test @DisplayName("should send a message") void processShouldSendMessage() throws IOException { - Engine engine = new Engine(messageBroker, flowSendingMessage()); + Engine engine = new Engine(messageBroker, passthroughFlow()); engine.process(new Message()); - verify(messageBroker).send(any(Message.class)); + verify(messageBroker).send(anyCollection()); } @Test @DisplayName("should stop with retry for RetriableProcessException") void retryProcessExceptionShouldRejectTemporarily() throws IOException { Engine engine = new Engine(messageBroker, flowThrowing(RetryProcessingException.class)); - Message message = new Message(); engine.process(message); verify(messageBroker).reject(message); @@ -147,9 +148,8 @@ void retryProcessExceptionShouldRejectTemporarily() throws IOException { @Test @DisplayName("should stop processing for good for StopProcessingException") - void shoudlFailMessageForStopProcessingException() throws IOException { + void shouldFailMessageForStopProcessingException() throws IOException { Engine engine = new Engine(messageBroker, flowThrowing(StopProcessingException.class)); - Message message = new Message(); engine.process(message); verify(messageBroker).fail(message); @@ -161,8 +161,17 @@ void testFunctionalReporter() { final AtomicBoolean reportHasBeenCalled = new AtomicBoolean(false); ReportFunction reportFn = (r, msg, e) -> reportHasBeenCalled.set(true); Engine engine = - new Engine(messageBroker, flowThrowing(StopProcessingException.class), 4, reportFn); + new Engine(messageBroker, flowThrowing(StopProcessingException.class), reportFn); engine.process(new Message()); assertThat(reportHasBeenCalled.get()).isTrue(); } + + @Test + @DisplayName("should stop processing for good when sending messages fails") + void shouldStopProcessingWhenSendingFails() throws IOException { + doThrow(RuntimeException.class).when(messageBroker).send(anyCollection()); + Engine engine = new Engine(messageBroker, passthroughFlow()); + engine.process(message); + verify(messageBroker).fail(message); + } } From 8e6c679b187ff2071db3f696fd3293e290ad80a0 Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 11:03:43 +0200 Subject: [PATCH 2/6] Remove unused code --- .../java/com/github/dbmdz/flusswerk/framework/flow/Flow.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/flow/Flow.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/flow/Flow.java index 46f283fc..bbddeba8 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/flow/Flow.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/flow/Flow.java @@ -71,7 +71,4 @@ public Collection process(M message) { return result; } - public boolean hasMessagesToSend() { - return writer != null; - } } From ea69a79afa03c0deb9db927b66a2d71b109bdf83 Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 11:05:55 +0200 Subject: [PATCH 3/6] Remove unused code --- .../com/github/dbmdz/flusswerk/framework/engine/Engine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index 9e39b702..aca7e8b4 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -141,7 +141,7 @@ public void start() { @SuppressWarnings("unchecked") void process(Message message) { - Collection messagesToSend = Collections.emptyList(); + Collection messagesToSend; try { messagesToSend = flow.process(message); } catch (StopProcessingException e) { From 8bb7212d812c5f59c7edca82c45c5d652840e80d Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 11:07:08 +0200 Subject: [PATCH 4/6] Remove unused code --- .../java/com/github/dbmdz/flusswerk/framework/engine/Engine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index aca7e8b4..ec410fc5 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -8,7 +8,6 @@ import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; From 7add257669ce49d30c2e9fac5b449c7e8d1a8f66 Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 11:17:30 +0200 Subject: [PATCH 5/6] Update framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java Co-authored-by: Thomas Zirngibl --- .../com/github/dbmdz/flusswerk/framework/engine/Engine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index ec410fc5..afda2bd6 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -171,7 +171,7 @@ private void retryOrFail(Message receivedMessage, RuntimeException e) { } else { processReport.reportFailAfterMaxRetries(receivedMessage, e); } - } catch (IOException fatalExecption) { + } catch (IOException fatalException) { var body = receivedMessage.getEnvelope().getBody(); LOGGER.error("Could not reject message" + body, fatalExecption); } From 6de3556fbb8cd40e4ba5908eb34fc04758cb9f7c Mon Sep 17 00:00:00 2001 From: Marcus Bitzl Date: Fri, 5 Jun 2020 11:18:44 +0200 Subject: [PATCH 6/6] Fix typo --- .../com/github/dbmdz/flusswerk/framework/engine/Engine.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java index afda2bd6..e58ae5b1 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Engine.java @@ -173,7 +173,7 @@ private void retryOrFail(Message receivedMessage, RuntimeException e) { } } catch (IOException fatalException) { var body = receivedMessage.getEnvelope().getBody(); - LOGGER.error("Could not reject message" + body, fatalExecption); + LOGGER.error("Could not reject message" + body, fatalException); } } @@ -181,9 +181,9 @@ private void fail(Message message, StopProcessingException e) { try { processReport.reportFail(message, e); messageBroker.fail(message); - } catch (IOException fatalExecption) { + } catch (IOException fatalException) { var body = message.getEnvelope().getBody(); - LOGGER.error("Could not fail message" + body, fatalExecption); + LOGGER.error("Could not fail message" + body, fatalException); } }