Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error handling when sending messages fails #181

Merged
merged 6 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,34 +139,51 @@ public void start() {
}

@SuppressWarnings("unchecked")
void process(Message receivedMessage) {
void process(Message message) {
Collection<? extends Message> messagesToSend;
try {
Collection<? extends Message> messagesToSend = flow.process(receivedMessage);
if (flow.hasMessagesToSend()) {
for (Message messageToSend : messagesToSend) {
messageBroker.send(messageToSend);
}
}
messageBroker.ack(receivedMessage);
processReport.reportSuccess(receivedMessage);
messagesToSend = flow.process(message);
} catch (StopProcessingException e) {
try {
processReport.reportFail(receivedMessage, e);
messageBroker.fail(receivedMessage);
} catch (IOException e1) {
LOGGER.error("Could not fail message" + receivedMessage.getEnvelope().getBody(), e1);
}
} catch (RuntimeException | IOException e) {
try {
boolean isRejected = messageBroker.reject(receivedMessage);
if (isRejected) {
processReport.reportReject(receivedMessage, e);
} else {
processReport.reportFailAfterMaxRetries(receivedMessage, e);
}
} catch (IOException e1) {
LOGGER.error("Could not reject message" + receivedMessage.getEnvelope().getBody(), e1);
fail(message, e);
return; // processing was not successful → stop here
} catch (RuntimeException e) {
retryOrFail(message, e);
return; // processing was not successful → stop here
}

// Data processing was successful, now handle the messaging
try {
messageBroker.send(messagesToSend);
messageBroker.ack(message);
processReport.reportSuccess(message);
} catch (Exception e) {
var stopProcessingException =
new StopProcessingException("Could not finish message handling").causedBy(e);
fail(message, stopProcessingException);
}
}

private void retryOrFail(Message receivedMessage, RuntimeException e) {
try {
boolean isRejected = messageBroker.reject(receivedMessage);
if (isRejected) {
processReport.reportReject(receivedMessage, e);
} else {
processReport.reportFailAfterMaxRetries(receivedMessage, e);
}
} catch (IOException fatalException) {
var body = receivedMessage.getEnvelope().getBody();
LOGGER.error("Could not reject message" + body, fatalException);
}
}

private void fail(Message message, StopProcessingException e) {
try {
processReport.reportFail(message, e);
messageBroker.fail(message);
} catch (IOException fatalException) {
var body = message.getEnvelope().getBody();
LOGGER.error("Could not fail message" + body, fatalException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,4 @@ public Collection<Message> process(M message) {
return result;
}

public boolean hasMessagesToSend() {
return writer != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public void send(Message message) throws IOException {
send(routingConfig.getWriteTo(), message);
}

/**
* Sends messages to the default output queue as JSON document.
*
* @param messages the message to send.
* @throws IOException if sending the message fails.
*/
public void send(Collection<? extends Message> messages) throws IOException {
send(routingConfig.getWriteTo(), messages);
}

/**
* Sends a message to a certain queue as JSON document.
*
Expand All @@ -71,7 +81,7 @@ public void send(String routingKey, Message message) throws IOException {
* @param messages the messages to send.
* @throws IOException if sending a message fails.
*/
public void send(String routingKey, Collection<Message> messages) throws IOException {
public void send(String routingKey, Collection<? extends Message> messages) throws IOException {
for (Message message : messages) {
send(routingKey, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public void setTracingId(String tracingId) {
public String getTracingId() {
return tracingId;
}

@Override
public String toString() {
return String.format("Message{hashcode=%s, tracingId=%s", hashCode(), tracingId);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.github.dbmdz.flusswerk.framework.engine;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -30,13 +31,16 @@ class EngineTest {

private MessageBroker messageBroker;

private Message message;

@BeforeEach
void setUp() {
messageBroker = mock(MessageBroker.class);
message = new Message();
}

private Flow<Message, String, String> flowSendingMessage() {
return flowWithTransformer(Function.identity());
private Flow<Message, Message, Message> passthroughFlow() {
return FlowBuilder.messageProcessor(Message.class).process(m -> m).build();
}

private Flow<Message, String, String> flowWithTransformer(Function<String, String> transformer) {
Expand Down Expand Up @@ -108,7 +112,6 @@ void processShouldRejectMessageOnFailure() throws IOException {
var flow = flowWithTransformer(transformerWithException);

Engine engine = new Engine(messageBroker, flow);
Message message = new Message();

engine.process(message);

Expand All @@ -119,8 +122,7 @@ void processShouldRejectMessageOnFailure() throws IOException {
@Test
@DisplayName("should accept a message processed without failure")
void processShouldAcceptMessageWithoutFailure() throws IOException {
Engine engine = new Engine(messageBroker, flowSendingMessage());
Message message = new Message();
Engine engine = new Engine(messageBroker, passthroughFlow());
engine.process(message);

verify(messageBroker).ack(message);
Expand All @@ -130,26 +132,24 @@ void processShouldAcceptMessageWithoutFailure() throws IOException {
@Test
@DisplayName("should send a message")
void processShouldSendMessage() throws IOException {
Engine engine = new Engine(messageBroker, flowSendingMessage());
Engine engine = new Engine(messageBroker, passthroughFlow());
engine.process(new Message());
verify(messageBroker).send(any(Message.class));
verify(messageBroker).send(anyCollection());
}

@Test
@DisplayName("should stop with retry for RetriableProcessException")
void retryProcessExceptionShouldRejectTemporarily() throws IOException {
Engine engine = new Engine(messageBroker, flowThrowing(RetryProcessingException.class));
Message message = new Message();
engine.process(message);

verify(messageBroker).reject(message);
}

@Test
@DisplayName("should stop processing for good for StopProcessingException")
void shoudlFailMessageForStopProcessingException() throws IOException {
void shouldFailMessageForStopProcessingException() throws IOException {
Engine engine = new Engine(messageBroker, flowThrowing(StopProcessingException.class));
Message message = new Message();
engine.process(message);

verify(messageBroker).fail(message);
Expand All @@ -161,8 +161,17 @@ void testFunctionalReporter() {
final AtomicBoolean reportHasBeenCalled = new AtomicBoolean(false);
ReportFunction reportFn = (r, msg, e) -> reportHasBeenCalled.set(true);
Engine engine =
new Engine(messageBroker, flowThrowing(StopProcessingException.class), 4, reportFn);
new Engine(messageBroker, flowThrowing(StopProcessingException.class), reportFn);
engine.process(new Message());
assertThat(reportHasBeenCalled.get()).isTrue();
}

@Test
@DisplayName("should stop processing for good when sending messages fails")
void shouldStopProcessingWhenSendingFails() throws IOException {
doThrow(RuntimeException.class).when(messageBroker).send(anyCollection());
Engine engine = new Engine(messageBroker, passthroughFlow());
engine.process(message);
verify(messageBroker).fail(message);
}
}