Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Adding support for EventBridge #525

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions .idea/misc.xml

This file was deleted.

2 changes: 1 addition & 1 deletion spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<name>Spring Cloud AWS Dependencies</name>
<description>Spring Cloud AWS Dependencies</description>
<properties>
<aws-java-sdk.version>1.11.415</aws-java-sdk.version>
<aws-java-sdk.version>1.11.624</aws-java-sdk.version>
jmnarloch marked this conversation as resolved.
Show resolved Hide resolved
<elasticache.version>1.1.1</elasticache.version>
<jmemcached.version>1.0.0</jmemcached.version>
<spring-cloud-context.version>1.3.2.RELEASE</spring-cloud-context.version>
Expand Down
4 changes: 4 additions & 0 deletions spring-cloud-aws-messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-events</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.config.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({ EventBridgeConfiguration.class })
public @interface EnableEventBridge {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring Cloud has moved away from @Enable annotations for the most part. Seems like this could just be auto-configuration with @ConditionalOnClass guards


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.config.annotation;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEventsClient;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.context.annotation.ConditionalOnMissingAmazonClient;
import org.springframework.cloud.aws.core.config.AmazonWebserviceClientFactoryBean;
import org.springframework.cloud.aws.core.region.RegionProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
@Configuration(proxyBeanMethods = false)
public class EventBridgeConfiguration {

@Autowired(required = false)
private AWSCredentialsProvider awsCredentialsProvider;

@Autowired(required = false)
private RegionProvider regionProvider;

@ConditionalOnMissingAmazonClient(AmazonCloudWatchEvents.class)
@Bean
public AmazonWebserviceClientFactoryBean<AmazonCloudWatchEventsClient> amazonEvents() {
return new AmazonWebserviceClientFactoryBean<>(AmazonCloudWatchEventsClient.class,
this.awsCredentialsProvider, this.regionProvider);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.core;

import java.util.Optional;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.model.PutEventsRequest;
import com.amazonaws.services.cloudwatchevents.model.PutEventsRequestEntry;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.AbstractMessageChannel;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class EventBusMessageChannel extends AbstractMessageChannel {

/**
* The 'source' message header.
*/
public static final String EVENT_SOURCE_HEADER = "EVENT_SOURCE_HEADER";

/**
* The 'detail-type' message header.
*/
public static final String EVENT_DETAIL_TYPE_HEADER = "EVENT_DETAIL_TYPE_HEADER";

private final AmazonCloudWatchEvents amazonEvents;

private final String eventBus;

public EventBusMessageChannel(AmazonCloudWatchEvents amazonEvents, String eventBus) {
this.amazonEvents = amazonEvents;
this.eventBus = eventBus;
}

@Override
protected boolean sendInternal(Message<?> message, long timeout) {
PutEventsRequestEntry entry = new PutEventsRequestEntry()
.withEventBusName(eventBus).withSource(findEventSource(message))
.withDetailType(findEventDetailType(message))
.withDetail(message.getPayload().toString());
amazonEvents.putEvents(new PutEventsRequest().withEntries(entry));
return true;
}

private static String findEventSource(Message<?> message) {
return findHeaderValue(message, EVENT_SOURCE_HEADER);
}

private static String findEventDetailType(Message<?> message) {
return findHeaderValue(message, EVENT_DETAIL_TYPE_HEADER);
}

private static String findHeaderValue(Message<?> message, String header) {
return Optional.ofNullable(message.getHeaders().get(header)).map(Object::toString)
.orElse(null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.core;

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;

import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate;
import org.springframework.cloud.aws.messaging.support.destination.DynamicEventBusDestinationResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.DestinationResolver;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class EventsMessagingTemplate
extends AbstractMessageChannelMessagingSendingTemplate<EventBusMessageChannel> {

private final AmazonCloudWatchEvents amazonEvents;

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents) {
this(amazonEvents, (ResourceIdResolver) null, null);
}

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents,
ResourceIdResolver resourceIdResolver, MessageConverter messageConverter) {
super(new DynamicEventBusDestinationResolver(amazonEvents, resourceIdResolver));
this.amazonEvents = amazonEvents;
initMessageConverter(messageConverter);
}

public EventsMessagingTemplate(AmazonCloudWatchEvents amazonEvents,
DestinationResolver<String> destinationResolver,
MessageConverter messageConverter) {
super(destinationResolver);
this.amazonEvents = amazonEvents;
initMessageConverter(messageConverter);
}

@Override
protected EventBusMessageChannel resolveMessageChannel(
String physicalResourceIdentifier) {
return new EventBusMessageChannel(this.amazonEvents, physicalResourceIdentifier);
}

/**
* Convenience method that sends an event identified by {@literal source} and
* {@literal detailType} with the given {@literal message} to the
* {@literal destination}.
* @param source The event source
* @param detailType The event detail-type
* @param message The event body to send
*/
public void sendEvent(String source, String detailType, Object message) {
Map<String, Object> headers = new HashMap<>();
headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source);
headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType);
this.convertAndSend(getRequiredDefaultDestination(), message, headers);
}

/**
* Convenience method that sends an event identified by {@literal source} and
* {@literal detailType} with the given {@literal message} to the specific
* {@literal eventBus}.
* @param eventBus The event bus name
* @param source The event source
* @param detailType The event detail-type
* @param message The event body to send
*/
public void sendEvent(String eventBus, String source, String detailType,
Object message) {
Map<String, Object> headers = new HashMap<>();
headers.put(EventBusMessageChannel.EVENT_SOURCE_HEADER, source);
headers.put(EventBusMessageChannel.EVENT_DETAIL_TYPE_HEADER, detailType);
this.convertAndSend(eventBus, message, headers);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.support.destination;

import com.amazonaws.services.cloudwatchevents.AmazonCloudWatchEvents;
import com.amazonaws.services.cloudwatchevents.model.CreateEventBusRequest;

import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.core.naming.AmazonResourceName;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;

/**
* @author Jakub Narloch
* @since 2.3.0
*/
public class DynamicEventBusDestinationResolver implements DestinationResolver<String> {

private final AmazonCloudWatchEvents amazonEvents;

private final ResourceIdResolver resourceIdResolver;

private boolean autoCreate;

public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents) {
this(amazonEvents, null);
}

public DynamicEventBusDestinationResolver(AmazonCloudWatchEvents amazonEvents,
ResourceIdResolver resourceIdResolver) {
this.amazonEvents = amazonEvents;
this.resourceIdResolver = resourceIdResolver;
}

public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}

@Override
public String resolveDestination(String name) throws DestinationResolutionException {
if (autoCreate) {
amazonEvents.createEventBus(new CreateEventBusRequest().withName(name))
.getEventBusArn();
return name;
}

String eventBusName = name;
if (resourceIdResolver != null) {
eventBusName = resourceIdResolver.resolveToPhysicalResourceId(name);
}

if (eventBusName != null
&& AmazonResourceName.isValidAmazonResourceName(eventBusName)) {
return AmazonResourceName.fromString(eventBusName).getResourceName();
}

return eventBusName;
}

}
Loading