From c2963347bc40c9c4033777fed0abe164deb0b2bd Mon Sep 17 00:00:00 2001 From: Jakub Narloch Date: Sun, 16 Feb 2020 17:40:39 -0800 Subject: [PATCH] Adding support for EventBridge --- .idea/misc.xml | 13 -- spring-cloud-aws-dependencies/pom.xml | 2 +- spring-cloud-aws-messaging/pom.xml | 4 + .../config/annotation/EnableEventBridge.java | 35 ++++ .../annotation/EventBridgeConfiguration.java | 50 ++++++ .../core/EventBusMessageChannel.java | 76 +++++++++ .../core/EventsMessagingTemplate.java | 96 +++++++++++ .../DynamicEventBusDestinationResolver.java | 74 +++++++++ .../EventBridgeConfigurationTest.java | 149 ++++++++++++++++++ .../core/EventBusMessageChannelTest.java | 87 ++++++++++ .../core/EventsMessagingTemplateTest.java | 128 +++++++++++++++ ...ynamicEventBusDestinationResolverTest.java | 101 ++++++++++++ 12 files changed, 801 insertions(+), 14 deletions(-) delete mode 100644 .idea/misc.xml create mode 100644 spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EnableEventBridge.java create mode 100644 spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfiguration.java create mode 100644 spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannel.java create mode 100644 spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplate.java create mode 100644 spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolver.java create mode 100644 spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfigurationTest.java create mode 100644 spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannelTest.java create mode 100644 spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplateTest.java create mode 100644 spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolverTest.java diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index d30d09e20..000000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index a75606057..7bb837a40 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -31,7 +31,7 @@ Spring Cloud AWS Dependencies Spring Cloud AWS Dependencies - 1.11.415 + 1.11.624 1.1.1 1.0.0 1.3.2.RELEASE diff --git a/spring-cloud-aws-messaging/pom.xml b/spring-cloud-aws-messaging/pom.xml index 69d8b7ff8..c33336239 100644 --- a/spring-cloud-aws-messaging/pom.xml +++ b/spring-cloud-aws-messaging/pom.xml @@ -42,6 +42,10 @@ com.amazonaws aws-java-sdk-sqs + + com.amazonaws + aws-java-sdk-events + org.springframework spring-messaging diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EnableEventBridge.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EnableEventBridge.java new file mode 100644 index 000000000..14f263608 --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EnableEventBridge.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.config.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.context.annotation.Import; + +/** + * @author Jakub Narloch + * @since 2.3.0 + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Import({ EventBridgeConfiguration.class }) +public @interface EnableEventBridge { + +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfiguration.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfiguration.java new file mode 100644 index 000000000..5d3d4d43b --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfiguration.java @@ -0,0 +1,50 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.config.annotation; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEventsClient; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.aws.context.annotation.ConditionalOnMissingAmazonClient; +import org.springframework.cloud.aws.core.config.AmazonWebserviceClientFactoryBean; +import org.springframework.cloud.aws.core.region.RegionProvider; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Jakub Narloch + * @since 2.3.0 + */ +@Configuration(proxyBeanMethods = false) +public class EventBridgeConfiguration { + + @Autowired(required = false) + private AWSCredentialsProvider awsCredentialsProvider; + + @Autowired(required = false) + private RegionProvider regionProvider; + + @ConditionalOnMissingAmazonClient(AmazonCloudWatchEvents.class) + @Bean + public AmazonWebserviceClientFactoryBean amazonEvents() { + return new AmazonWebserviceClientFactoryBean<>(AmazonCloudWatchEventsClient.class, + this.awsCredentialsProvider, this.regionProvider); + } + +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannel.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannel.java new file mode 100644 index 000000000..86c392c78 --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannel.java @@ -0,0 +1,76 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.core; + +import java.util.Optional; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequest; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequestEntry; + +import org.springframework.messaging.Message; +import org.springframework.messaging.support.AbstractMessageChannel; + +/** + * @author Jakub Narloch + * @since 2.3.0 + */ +public class EventBusMessageChannel extends AbstractMessageChannel { + + /** + * The 'source' message header. + */ + public static final String EVENT_SOURCE_HEADER = "EVENT_SOURCE_HEADER"; + + /** + * The 'detail-type' message header. + */ + public static final String EVENT_DETAIL_TYPE_HEADER = "EVENT_DETAIL_TYPE_HEADER"; + + private final AmazonCloudWatchEvents amazonEvents; + + private final String eventBus; + + public EventBusMessageChannel(AmazonCloudWatchEvents amazonEvents, String eventBus) { + this.amazonEvents = amazonEvents; + this.eventBus = eventBus; + } + + @Override + protected boolean sendInternal(Message message, long timeout) { + PutEventsRequestEntry entry = new PutEventsRequestEntry() + .withEventBusName(eventBus).withSource(findEventSource(message)) + .withDetailType(findEventDetailType(message)) + .withDetail(message.getPayload().toString()); + amazonEvents.putEvents(new PutEventsRequest().withEntries(entry)); + return true; + } + + private static String findEventSource(Message message) { + return findHeaderValue(message, EVENT_SOURCE_HEADER); + } + + private static String findEventDetailType(Message message) { + return findHeaderValue(message, EVENT_DETAIL_TYPE_HEADER); + } + + private static String findHeaderValue(Message message, String header) { + return Optional.ofNullable(message.getHeaders().get(header)).map(Object::toString) + .orElse(null); + } + +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplate.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplate.java new file mode 100644 index 000000000..44395a508 --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplate.java @@ -0,0 +1,96 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.core; + +import java.util.HashMap; +import java.util.Map; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; + +import org.springframework.cloud.aws.core.env.ResourceIdResolver; +import org.springframework.cloud.aws.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate; +import org.springframework.cloud.aws.messaging.support.destination.DynamicEventBusDestinationResolver; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.core.DestinationResolver; + +/** + * @author Jakub Narloch + * @since 2.3.0 + */ +public class EventsMessagingTemplate + extends AbstractMessageChannelMessagingSendingTemplate { + + private final AmazonCloudWatchEvents amazonEvents; + + public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents) { + this(amazonEvents, (ResourceIdResolver) null, null); + } + + public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents, + ResourceIdResolver resourceIdResolver, MessageConverter messageConverter) { + super(new DynamicEventBusDestinationResolver(amazonEvents, resourceIdResolver)); + this.amazonEvents = amazonEvents; + initMessageConverter(messageConverter); + } + + public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents, + DestinationResolver destinationResolver, + MessageConverter messageConverter) { + super(destinationResolver); + this.amazonEvents = amazonEvents; + initMessageConverter(messageConverter); + } + + @Override + protected EventBusMessageChannel resolveMessageChannel( + String physicalResourceIdentifier) { + return new EventBusMessageChannel(this.amazonEvents, physicalResourceIdentifier); + } + + /** + * Convenience method that sends an event identified by {@literal source} and + * {@literal detailType} with the given {@literal message} to the + * {@literal destination}. + * @param source The event source + * @param detailType The event detail-type + * @param message The event body to send + */ + public void sendEvent(String source, String detailType, Object message) { + Map headers = new HashMap<>(); + headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source); + headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType); + this.convertAndSend(getRequiredDefaultDestination(), message, headers); + } + + /** + * Convenience method that sends an event identified by {@literal source} and + * {@literal detailType} with the given {@literal message} to the specific + * {@literal eventBus}. + * @param eventBus The event bus name + * @param source The event source + * @param detailType The event detail-type + * @param message The event body to send + */ + public void sendEvent(String eventBus, String source, String detailType, + Object message) { + Map headers = new HashMap<>(); + headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source); + headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType); + this.convertAndSend(eventBus, message, headers); + } + +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolver.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolver.java new file mode 100644 index 000000000..9921f5ac9 --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolver.java @@ -0,0 +1,74 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.support.destination; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.model.CreateEventBusRequest; + +import org.springframework.cloud.aws.core.env.ResourceIdResolver; +import org.springframework.cloud.aws.core.naming.AmazonResourceName; +import org.springframework.messaging.core.DestinationResolutionException; +import org.springframework.messaging.core.DestinationResolver; + +/** + * @author Jakub Narloch + * @since 2.3.0 + */ +public class DynamicEventBusDestinationResolver implements DestinationResolver { + + private final AmazonCloudWatchEvents amazonEvents; + + private final ResourceIdResolver resourceIdResolver; + + private boolean autoCreate; + + public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents) { + this(amazonEvents, null); + } + + public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents, + ResourceIdResolver resourceIdResolver) { + this.amazonEvents = amazonEvents; + this.resourceIdResolver = resourceIdResolver; + } + + public void setAutoCreate(boolean autoCreate) { + this.autoCreate = autoCreate; + } + + @Override + public String resolveDestination(String name) throws DestinationResolutionException { + if (autoCreate) { + amazonEvents.createEventBus(new CreateEventBusRequest().withName(name)) + .getEventBusArn(); + return name; + } + + String eventBusName = name; + if (resourceIdResolver != null) { + eventBusName = resourceIdResolver.resolveToPhysicalResourceId(name); + } + + if (eventBusName != null + && AmazonResourceName.isValidAmazonResourceName(eventBusName)) { + return AmazonResourceName.fromString(eventBusName).getResourceName(); + } + + return eventBusName; + } + +} diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfigurationTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfigurationTest.java new file mode 100644 index 000000000..6c32f22d0 --- /dev/null +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/config/annotation/EventBridgeConfigurationTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.config.annotation; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.cloud.aws.context.config.annotation.EnableContextRegion; +import org.springframework.context.annotation.Bean; +import org.springframework.mock.web.MockServletContext; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; +import org.springframework.web.servlet.config.annotation.EnableWebMvc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public class EventBridgeConfigurationTest { + + private AnnotationConfigWebApplicationContext webApplicationContext; + + @Before + public void setUp() throws Exception { + this.webApplicationContext = new AnnotationConfigWebApplicationContext(); + this.webApplicationContext.setServletContext(new MockServletContext()); + } + + @Test + public void enableEventBridge_withMinimalConfig__shouldBeUsedToCreateClient() + throws Exception { + // Arrange & Act + this.webApplicationContext.register(MinimalEventBridgeConfiguration.class); + this.webApplicationContext.refresh(); + AmazonCloudWatchEvents amazonEvents = this.webApplicationContext + .getBean(AmazonCloudWatchEvents.class); + + // Assert + assertThat(amazonEvents).isNotNull(); + } + + @Test + public void enableEventBridge_withProvidedCredentials_shouldBeUsedToCreateClient() + throws Exception { + // Arrange & Act + this.webApplicationContext + .register(EventBridgeConfigurationWithCredentials.class); + this.webApplicationContext.refresh(); + AmazonCloudWatchEvents amazonEvents = this.webApplicationContext + .getBean(AmazonCloudWatchEvents.class); + + // Assert + assertThat(amazonEvents).isNotNull(); + assertThat(ReflectionTestUtils.getField(amazonEvents, "awsCredentialsProvider")) + .isEqualTo( + EventBridgeConfigurationWithCredentials.AWS_CREDENTIALS_PROVIDER); + } + + @Test + public void enableEventBridge_withCustomAmazonSnsClient_shouldBeUsedToCreateClient() + throws Exception { + // Arrange & Act + this.webApplicationContext + .register(EventBridgeConfigurationWithCustomAmazonClient.class); + this.webApplicationContext.refresh(); + AmazonCloudWatchEvents amazonEvents = this.webApplicationContext + .getBean(AmazonCloudWatchEvents.class); + + // Assert + assertThat(amazonEvents).isNotNull(); + assertThat(amazonEvents) + .isEqualTo(EventBridgeConfigurationWithCustomAmazonClient.AMAZON_EVENTS); + } + + @Test + public void enableSns_withRegionProvided_shouldBeUsedToCreateClient() + throws Exception { + // Arrange & Act + this.webApplicationContext + .register(EventBridgeConfigurationWithRegionProvider.class); + this.webApplicationContext.refresh(); + AmazonCloudWatchEvents amazonEvents = this.webApplicationContext + .getBean(AmazonCloudWatchEvents.class); + + // Assert + assertThat(ReflectionTestUtils.getField(amazonEvents, "endpoint").toString()) + .isEqualTo("https://" + Region.getRegion(Regions.EU_WEST_1) + .getServiceEndpoint("events")); + } + + @EnableWebMvc + @EnableEventBridge + protected static class MinimalEventBridgeConfiguration { + + } + + @EnableWebMvc + @EnableEventBridge + protected static class EventBridgeConfigurationWithCredentials { + + public static final AWSCredentialsProvider AWS_CREDENTIALS_PROVIDER = mock( + AWSCredentialsProvider.class); + + @Bean + public AWSCredentialsProvider awsCredentialsProvider() { + return AWS_CREDENTIALS_PROVIDER; + } + + } + + @EnableWebMvc + @EnableEventBridge + protected static class EventBridgeConfigurationWithCustomAmazonClient { + + public static final AmazonCloudWatchEvents AMAZON_EVENTS = mock( + AmazonCloudWatchEvents.class); + + @Bean + public AmazonCloudWatchEvents amazonEvents() { + return AMAZON_EVENTS; + } + + } + + @EnableWebMvc + @EnableContextRegion(region = "eu-west-1") + @EnableEventBridge + protected static class EventBridgeConfigurationWithRegionProvider { + + } + +} diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannelTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannelTest.java new file mode 100644 index 000000000..d5ec6a77b --- /dev/null +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventBusMessageChannelTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.core; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequest; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequestEntry; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verify; + +/** + * @author Jakub Narloch + */ +public class EventBusMessageChannelTest { + + private Message message; + + @Before + public void setUp() throws Exception { + message = MessageBuilder.withPayload("Message content") + .setHeader(EventBusMessageChannel.EVENT_SOURCE_HEADER, "custom") + .setHeader(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, "My Event") + .build(); + } + + @Test + public void sendMessage_validTextMessageAndSubject_returnsTrue() throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + + MessageChannel messageChannel = new EventBusMessageChannel(amazonEvents, + "default"); + + // Act + boolean sent = messageChannel.send(message); + + // Assert + verify(amazonEvents, only()) + .putEvents(new PutEventsRequest().withEntries(new PutEventsRequestEntry() + .withEventBusName("default").withSource("custom") + .withDetailType("My Event").withDetail(message.getPayload()))); + assertThat(sent).isTrue(); + } + + @Test + public void sendMessage_validTextMessageAndTimeout_timeoutIsIgnored() + throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + MessageChannel messageChannel = new EventBusMessageChannel(amazonEvents, + "default"); + + // Act + boolean sent = messageChannel.send(message, 10); + + // Assert + verify(amazonEvents, only()) + .putEvents(new PutEventsRequest().withEntries(new PutEventsRequestEntry() + .withEventBusName("default").withSource("custom") + .withDetailType("My Event").withDetail(message.getPayload()))); + assertThat(sent).isTrue(); + } + +} diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplateTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplateTest.java new file mode 100644 index 000000000..180699210 --- /dev/null +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/core/EventsMessagingTemplateTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.core; + +import java.util.Locale; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.model.EventBus; +import com.amazonaws.services.cloudwatchevents.model.ListEventBusesRequest; +import com.amazonaws.services.cloudwatchevents.model.ListEventBusesResult; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequest; +import com.amazonaws.services.cloudwatchevents.model.PutEventsRequestEntry; +import org.junit.Test; + +import org.springframework.messaging.core.DestinationResolver; +import org.springframework.messaging.support.MessageBuilder; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class EventsMessagingTemplateTest { + + @Test + public void send_validTextMessage_usesEventBusChannel() throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + EventsMessagingTemplate eventsMessagingTemplate = new EventsMessagingTemplate( + amazonEvents); + String physicalEventBusName = "arn:aws:events:us-east-1:123456789012:event-bus/default"; + when(amazonEvents.listEventBuses(new ListEventBusesRequest())) + .thenReturn(new ListEventBusesResult().withEventBuses(new EventBus() + .withName("default").withArn(physicalEventBusName))); + eventsMessagingTemplate.setDefaultDestinationName("default"); + + // Act + eventsMessagingTemplate.send(MessageBuilder.withPayload("Message content") + .setHeader(EventBusMessageChannel.EVENT_SOURCE_HEADER, "custom") + .setHeader(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, "My Event") + .build()); + + // Assert + verify(amazonEvents) + .putEvents(new PutEventsRequest().withEntries(new PutEventsRequestEntry() + .withEventBusName("default").withSource("custom") + .withDetailType("My Event").withDetail("Message content"))); + } + + @Test + public void send_validTextMessageWithCustomDestinationResolver_usesEventBusChannel() + throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + EventsMessagingTemplate eventsMessagingTemplate = new EventsMessagingTemplate( + amazonEvents, + (DestinationResolver) name -> name.toUpperCase(Locale.ENGLISH), + null); + + // Act + eventsMessagingTemplate.send("test", MessageBuilder.withPayload("Message content") + .setHeader(EventBusMessageChannel.EVENT_SOURCE_HEADER, "custom") + .setHeader(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, "My Event") + .build()); + + // Assert + verify(amazonEvents).putEvents(new PutEventsRequest().withEntries( + new PutEventsRequestEntry().withEventBusName("TEST").withSource("custom") + .withDetailType("My Event").withDetail("Message content"))); + } + + @Test + public void convertAndSend_withDestinationPayloadAndSubject_shouldSetSourceAndDetailType() + throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + EventsMessagingTemplate eventsMessagingTemplate = new EventsMessagingTemplate( + amazonEvents); + String physicalEventBusName = "arn:aws:events:us-east-1:123456789012:event-bus/default"; + when(amazonEvents.listEventBuses(new ListEventBusesRequest())) + .thenReturn(new ListEventBusesResult().withEventBuses(new EventBus() + .withName("default").withArn(physicalEventBusName))); + + // Act + eventsMessagingTemplate.sendEvent(physicalEventBusName, "custom", "My Event", + "Message content"); + + // Assert + verify(amazonEvents) + .putEvents(new PutEventsRequest().withEntries(new PutEventsRequestEntry() + .withEventBusName("default").withSource("custom") + .withDetailType("My Event").withDetail("Message content"))); + } + + @Test + public void convertAndSend_withPayloadAndSubject_shouldSetSourceAndDetailType() + throws Exception { + // Arrange + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + EventsMessagingTemplate eventsMessagingTemplate = new EventsMessagingTemplate( + amazonEvents); + String physicalEventBusName = "arn:aws:events:us-east-1:123456789012:event-bus/default"; + eventsMessagingTemplate.setDefaultDestinationName(physicalEventBusName); + + // Act + eventsMessagingTemplate.sendEvent("custom", "My Event", "Message content"); + + // Assert + verify(amazonEvents) + .putEvents(new PutEventsRequest().withEntries(new PutEventsRequestEntry() + .withEventBusName("default").withSource("custom") + .withDetailType("My Event").withDetail("Message content"))); + } + +} diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolverTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolverTest.java new file mode 100644 index 000000000..ddec22d69 --- /dev/null +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/support/destination/DynamicEventBusDestinationResolverTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.support.destination; + +import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents; +import com.amazonaws.services.cloudwatchevents.model.CreateEventBusRequest; +import com.amazonaws.services.cloudwatchevents.model.CreateEventBusResult; +import com.amazonaws.services.cloudwatchevents.model.EventBus; +import com.amazonaws.services.cloudwatchevents.model.ListEventBusesRequest; +import com.amazonaws.services.cloudwatchevents.model.ListEventBusesResult; +import org.junit.Test; + +import org.springframework.cloud.aws.core.env.ResourceIdResolver; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DynamicEventBusDestinationResolverTest { + + @Test + public void resolveDestination_withAlreadyExistingArn_returnsArnWithoutValidatingIt() + throws Exception { + // Arrange + String eventBusArn = "arn:aws:events:us-east-1:123456789012:event-bus/test"; + String eventBusName = "test"; + + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + DynamicEventBusDestinationResolver resolver = new DynamicEventBusDestinationResolver( + amazonEvents); + + // Act + String resolvedDestinationName = resolver.resolveDestination(eventBusArn); + + // Assert + assertThat(resolvedDestinationName).isEqualTo(eventBusName); + } + + @Test + public void resolveDestination_withAutoCreateEnabled_shouldCreateEventBusDirectly() + throws Exception { + // Arrange + String eventBusArn = "arn:aws:events:us-east-1:123456789012:event-bus/test"; + String eventBusName = "test"; + + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + when(amazonEvents.createEventBus(new CreateEventBusRequest().withName("test"))) + .thenReturn(new CreateEventBusResult().withEventBusArn(eventBusArn)); + + DynamicEventBusDestinationResolver resolver = new DynamicEventBusDestinationResolver( + amazonEvents); + resolver.setAutoCreate(true); + + // Act + String resolvedDestinationName = resolver.resolveDestination("test"); + + // Assert + assertThat(resolvedDestinationName).isEqualTo(eventBusName); + } + + @Test + public void resolveDestination_withResourceIdResolver_shouldCallIt() + throws Exception { + // Arrange + String eventBusArn = "arn:aws:events:us-east-1:123456789012:event-bus/test"; + String eventBusName = "test"; + + ResourceIdResolver resourceIdResolver = mock(ResourceIdResolver.class); + when(resourceIdResolver.resolveToPhysicalResourceId(eventBusName)) + .thenReturn(eventBusArn); + + AmazonCloudWatchEvents amazonEvents = mock(AmazonCloudWatchEvents.class); + when(amazonEvents.listEventBuses(new ListEventBusesRequest())) + .thenReturn(new ListEventBusesResult().withEventBuses( + new EventBus().withName(eventBusName).withArn(eventBusArn))); + + DynamicEventBusDestinationResolver resolver = new DynamicEventBusDestinationResolver( + amazonEvents, resourceIdResolver); + + // Assert + String resolvedDestinationName = resolver.resolveDestination(eventBusName); + + // Assert + assertThat(resolvedDestinationName).isEqualTo(eventBusName); + } + +}