Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromevdl committed Apr 9, 2024
1 parent 789b91b commit 1bbad5b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -49,16 +48,16 @@ public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord>

@Override
public StreamsEventResponse processBatch(DynamodbEvent event, Context context) {
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();

for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) {
processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
}
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
.stream()
.map(eventRecord -> processBatchItem(eventRecord, context))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

return response;
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
}


@Override
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -60,13 +59,14 @@ public KinesisStreamsBatchMessageHandler(BiConsumer<KinesisEvent.KinesisEventRec

@Override
public StreamsEventResponse processBatch(KinesisEvent event, Context context) {
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();

for (KinesisEvent.KinesisEventRecord eventRecord : event.getRecords()) {
processBatchItem(eventRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure));
}
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = event.getRecords()
.stream()
.map(eventRecord -> processBatchItem(eventRecord, context))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

return response;
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,10 +65,10 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
// If we are working on a FIFO queue, when any message fails we should stop processing and return the
// rest of the batch as failed too. We use this variable to track when that has happened.
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
AtomicBoolean failWholeBatch = new AtomicBoolean(false);
final Boolean[] failWholeBatch = {false};

int messageCursor = 0;
for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) {
for (; messageCursor < event.getRecords().size() && !failWholeBatch[0]; messageCursor++) {
SQSEvent.SQSMessage message = event.getRecords().get(messageCursor);

String messageGroupId = message.getAttributes() != null ?
Expand All @@ -78,15 +77,15 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
processBatchItem(message, context).ifPresent(batchItemFailure -> {
response.getBatchItemFailures().add(batchItemFailure);
if (messageGroupId != null) {
failWholeBatch.set(true);
failWholeBatch[0] = true;
LOGGER.info(
"A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
, messageGroupId, message.getMessageId());
}
});
}

if (failWholeBatch.get()) {
if (failWholeBatch[0]) {
// Add the remaining messages to the batch item failures
event.getRecords()
.subList(messageCursor, event.getRecords().size())
Expand All @@ -100,8 +99,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
@Override
public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) {
if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) {
LOGGER.warn("FIFO queues are not supported in parallel mode, proceeding in sequence");
return processBatch(event, context);
throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead");
}

MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
Expand Down

0 comments on commit 1bbad5b

Please sign in to comment.