Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#111, #112 : Kinesis Channel issues fixed. #129

Merged
merged 8 commits into from
Sep 5, 2023
7 changes: 6 additions & 1 deletion channel/kinesis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ By using the Amazon Kinesis channel, TIBCO BusinessEvents can convert Kinesis da
| Field | Global Var? | Description |
|---|---|---|
Access Key | Yes | Key used in combination with the Secret Key to make programmatic requests to AWS. For example, AKIAIOSFODNN7EXAMPLE. The access key is similar to a user name used in a username-password pair.
Secret Key | Yes | Key used in combination with the access key to make programmatic requests to AWS. For example, wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY. The access key is similar to the user name used in a user name-password pair.
Secret Key | Yes | Key used in combination with the Access key to make programmatic requests to AWS. For example, wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY. The access key is similar to the user name used in a user name-password pair.
Session Token | Yes | Key used in combination with Access and Secret key to make programmatic requests to AWS.
Profile Name| Yes | You can store frequently used credentials and configuration settings in files. These files are divided into sections that are referenced by name. These sections are called profiles. Enter the profile name that contains the credentials and configuration settings that you want to use.
Assumed Role ARN | Yes | This option is used in combination with Role Session Name. When your account is role based and the operations can be performed by assuming another role, this authentication method should be used. In addition, access key, secret key and session token is required.
Role Session Name | Yes | This option is used in combination with Assume Role ARN. When your account is role based and the operations can be performed by assuming another role, this authentication method should be used. In addition, access key, secret key and session token is required.

* Three options are available #1 Profile Name (configuration and credential file setup is must). #2 Combination of Access Key, Secret Key and Session Token. #3 Combination of Assumed Role ARN and Role Session Name (Requires Access Key, Secret Key and Session Token).

## AWS Kinesis Destination Configuration Properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.tibco.be.custom.channel.BaseChannel;
import com.tibco.cep.kernel.service.logging.Level;

Expand All @@ -31,11 +36,32 @@ public void init() throws Exception {
String access_key = props.getProperty("access_key");
String secret_key = props.getProperty("secret_key");
String profile_name = props.getProperty("profile_name");
String session_token = props.getProperty("session_token");
String role_arn = props.getProperty("role_arn");
String region = props.getProperty("region.name");
String role_session_name = props.getProperty("role_session_name");

if (profile_name != null && !profile_name.isEmpty()) {
if (profile_name != null && !profile_name.isBlank()) {
credentialsProvider = new AWSCredentialsProviderChain(new ProfileCredentialsProvider(profile_name));
} else if (role_arn != null && !role_arn.isBlank()) {

AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(
new BasicSessionCredentials(access_key, secret_key, session_token)))
.withRegion(region).build();
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withRoleArn(role_arn).withRoleSessionName(role_session_name);
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
Credentials creds = assumeRoleResult.getCredentials();
credentialsProvider = new AWSStaticCredentialsProvider(
new BasicSessionCredentials(creds.getAccessKeyId(),
creds.getSecretAccessKey(),
creds.getSessionToken())
);

} else {
credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(access_key, secret_key));
credentialsProvider = new AWSStaticCredentialsProvider(
new BasicSessionCredentials(access_key, secret_key, session_token));

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.tibco.be.custom.channel.kinesis;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -9,6 +11,8 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.tibco.be.custom.channel.BaseDestination;
import com.tibco.be.custom.channel.BaseEventSerializer;
Expand Down Expand Up @@ -44,6 +48,13 @@ public void bind(EventProcessor ep) throws Exception {
*/
@Override
public void start() throws Exception {

this.createClient();
if(!this.isStreamExisiting()) {
getLogger().log(Level.DEBUG, "Kinesis Data Stream not found. - " + this.getUri());
throw new RuntimeException("Kinesis Data Stream is not available. Failed to start the channel.");
}

if (consumer != null) {
executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("KinesisConsumer-" + this.getUri()));
this.executor.submit(consumer);
Expand Down Expand Up @@ -98,13 +109,10 @@ public void send(EventWithId event, Map overrideData) throws Exception {

this.serializationProperties.put(KinesisProperties.KEY_DESTINATION_INCLUDE_EVENTTYPE, true);
Object record = serializer.serializeUserEvent(event, serializationProperties);
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
clientBuilder.setRegion((String) getChannel().getGlobalVariableValue(getDestinationProperties().getProperty(KinesisProperties.KEY_DESTINATION_REGION_NAME)));
KinesisChannel kinesischannel = (KinesisChannel) getChannel();
AWSCredentialsProvider credentialsProvider = kinesischannel.getCredentialsProvider();

clientBuilder.setCredentials(credentialsProvider);
kinesisClient = clientBuilder.build();

if (kinesisClient == null) {
this.createClient();
}

if (record instanceof PutRecordRequest) {
kinesisClient.putRecord((PutRecordRequest) record);
Expand Down Expand Up @@ -140,4 +148,32 @@ public void resume() {

}
}


private void createClient() {
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
clientBuilder.setRegion((String) getChannel().getGlobalVariableValue(getDestinationProperties().getProperty(KinesisProperties.KEY_DESTINATION_REGION_NAME)));
KinesisChannel kinesischannel = (KinesisChannel) getChannel();
AWSCredentialsProvider credentialsProvider = kinesischannel.getCredentialsProvider();

clientBuilder.setCredentials(credentialsProvider);
kinesisClient = clientBuilder.build();
}

private boolean isStreamExisiting() {
String streamName = (String) getChannel().getGlobalVariableValue(getDestinationProperties().getProperty(KinesisProperties.KEY_DESTINATION_STREAM_NAME));
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = kinesisClient.listStreams(listStreamsRequest);
List<String> streamNames = new ArrayList<String>();
streamNames.addAll(listStreamsResult.getStreamNames());
while (listStreamsResult.getHasMoreStreams())
{
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
}
listStreamsResult = kinesisClient.listStreams(listStreamsRequest);
streamNames.addAll(listStreamsResult.getStreamNames());
}
return streamNames.stream().anyMatch(name -> streamName.equals(name));
}
}
3 changes: 3 additions & 0 deletions channel/kinesis/src/main/resources/drivers.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
<properties>
<property name="access_key" displayName="Access Key" type="String" default="" mandatory="true" />
<property name="secret_key" displayName="Secret Key" type="String" default="" mandatory="true" />
<property name="session_token" displayName="Session Token" type="String" default="" mandatory="true" />
<property name="profile_name" displayName="Profile Name" type="String" default="" />
<property name="role_arn" displayName="Assumed Role ARN" type="String" default=""/>
<property name="role_session_name" displayName="Role Session Name" type="String" default=""/>
</properties>
<destinations>
<property name="IncludeEventType" displayName="Include Event Type" type="String" default="ALWAYS" mandatory="true" gvToggle="true" />
Expand Down