Skip to content

Commit

Permalink
Use spring integration for sample to case publish (#201)
Browse files Browse the repository at this point in the history
* Use spring integration for sample to case publish

* auto patch increment

* Slightly reorder logging

Co-authored-by: ras-rm-pr-bot <[email protected]>
  • Loading branch information
insacuri and ras-rm-pr-bot authored Aug 31, 2021
1 parent 637f9df commit 514555b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 66 deletions.
4 changes: 2 additions & 2 deletions _infra/helm/sample/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 12.0.4
version: 12.0.5

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 12.0.4
appVersion: 12.0.5
12 changes: 0 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-pubsub</artifactId>
Expand All @@ -263,14 +259,6 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>20.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>ma.glasnost.orika</groupId>
<artifactId>orika-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,17 @@ public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
public interface PubsubOutboundGateway {
void sendToPubsub(String text);
}

@Bean
@ServiceActivator(inputChannel = "sampleUnitChannel")
public MessageHandler caseNotificationMessageSender(PubSubTemplate pubsubTemplate) {
String topicId = appConfig.getGcp().getCaseNotificationTopic();
log.info("Application started with publisher for sample to case with topic Id {}", topicId);
return new PubSubMessageHandler(pubsubTemplate, topicId);
}

@MessagingGateway(defaultRequestChannel = "sampleUnitChannel")
public interface PubSubOutboundCaseNotificationGateway {
void sendToPubSub(String text);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -26,12 +17,13 @@
public class SampleUnitPublisher {
private static final Logger log = LoggerFactory.getLogger(SampleUnitPublisher.class);

@Autowired private PubSub pubSub;

@Autowired private ObjectMapper objectMapper;

@Autowired private SampleSvcApplication.PubSubOutboundSampleUnitGateway samplePublisher;

@Autowired
private SampleSvcApplication.PubSubOutboundCaseNotificationGateway caseNotificationPublisher;

/**
* send sample to collection exercise via PubSub
*
Expand All @@ -53,53 +45,20 @@ public void send(SampleUnit sampleUnit) {
}

/**
* Sends a sample unit to case via PubSub
* send sample to case via PubSub
*
* @param sampleUnit A sample unit to be sent
* @param sampleUnit to be sent
*/
public void sendSampleUnitToCase(SampleUnitParentDTO sampleUnit) {
log.debug(
"Entering sendSampleUnit",
kv("sample_unit_type", sampleUnit.getSampleUnitType()),
kv("sample_unit_ref", sampleUnit.getSampleUnitRef()));
try {
log.info("Publishing message to PubSub", kv("sampleUnitRef", sampleUnit.getSampleUnitRef()));
String message = objectMapper.writeValueAsString(sampleUnit);
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
Publisher publisher = pubSub.caseNotificationPublisher();
try {
log.info("Publishing message to PubSub");
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
ApiFutures.addCallback(
messageIdFuture,
new ApiFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
log.error(
"SampleUnit publish sent failure to PubSub.",
kv("error", apiException.getStatusCode().getCode()));
}
log.error("Error Publishing PubSub message", kv("message", message));
}

@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
log.info("SampleUnit publish sent successfully", kv("messageId", messageId));
}
},
MoreExecutors.directExecutor());
} finally {
publisher.shutdown();
pubSub.shutdown();
}
caseNotificationPublisher.sendToPubSub(message);
log.info(
"Sample unit to case publish sent successfully",
kv("sampleUnitRef", sampleUnit.getSampleUnitRef()));
} catch (JsonProcessingException e) {
log.error("Error while sampleUnit can not be parsed.", kv("sampleUnit", sampleUnit));
throw new RuntimeException(e);
} catch (IOException e) {
log.error("PubSub Error while processing sample unit distribution", e);
log.error("Error converting sample unit to JSON", kv("sampleUnit", sampleUnit));
throw new RuntimeException(e);
}
}
Expand Down

0 comments on commit 514555b

Please sign in to comment.