Skip to content
This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Append token= to CREDENTIAL param of COPY command #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazonaws.services.kinesis.connectors.redshift;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;

/**
* Utility class to handle credentials.
*/
public class CredentialsUtil {

/**
* Build a credential argument for Redshift COPY command.
*
* @param provider Credential provider.
* @return credential
*/
public static String buildCredential(AWSCredentialsProvider provider) {
AWSCredentials credentials = provider.getCredentials();
StringBuilder builder = new StringBuilder();
builder
.append("aws_access_key_id=")
.append(credentials.getAWSAccessKeyId())
.append(";aws_secret_access_key=")
.append(credentials.getAWSSecretKey());
if (credentials instanceof AWSSessionCredentials) {
builder
.append(";token=")
.append(((AWSSessionCredentials) credentials).getSessionToken());
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Properties;

import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -55,8 +56,7 @@ public class RedshiftBasicEmitter extends S3Emitter {
private final String redshiftURL;
private final char redshiftDelimiter;
private final Properties loginProperties;
private final String accessKey;
private final String secretKey;
private final AWSCredentialsProvider credentialsProvider;

public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) {
super(configuration);
Expand All @@ -67,8 +67,7 @@ public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) {
loginProperties = new Properties();
loginProperties.setProperty("user", configuration.REDSHIFT_USERNAME);
loginProperties.setProperty("password", configuration.REDSHIFT_PASSWORD);
accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId();
secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey();
credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER;
}

@Override
Expand Down Expand Up @@ -115,8 +114,10 @@ private String generateCopyStatement(String s3File) {
StringBuilder exec = new StringBuilder();
exec.append("COPY " + redshiftTable + " ");
exec.append("FROM 's3://" + s3bucket + "/" + s3File + "' ");
exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey);
exec.append(";aws_secret_access_key=" + secretKey + "' ");
exec.append("CREDENTIALS '");
exec.append(CredentialsUtil.buildCredential(credentialsProvider));
exec.append("' ");

exec.append("DELIMITER '" + redshiftDelimiter + "'");
exec.append(";");
return exec.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Properties;

import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -71,8 +72,7 @@ public class RedshiftManifestEmitter implements IEmitter<String> {
private final String fileTable;
private final String fileKeyColumn;
private final char dataDelimiter;
private final String accessKey;
private final String secretKey;
private final AWSCredentialsProvider credentialsProvider;
private final String s3Endpoint;
private final AmazonS3Client s3Client;
private final boolean copyMandatory;
Expand All @@ -91,8 +91,7 @@ public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) {
if (s3Endpoint != null) {
s3Client.setEndpoint(s3Endpoint);
}
accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId();
secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey();
credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER;
loginProps = new Properties();
loginProps.setProperty("user", configuration.REDSHIFT_USERNAME);
loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD);
Expand Down Expand Up @@ -274,9 +273,7 @@ private void redshiftCopy(Connection conn, List<String> records) throws IOExcept
redshiftCopy.append("COPY " + dataTable + " ");
redshiftCopy.append("FROM 's3://" + s3Bucket + "/" + manifestFile + "' ");
redshiftCopy.append("CREDENTIALS '");
redshiftCopy.append("aws_access_key_id=" + accessKey);
redshiftCopy.append(";");
redshiftCopy.append("aws_secret_access_key=" + secretKey);
redshiftCopy.append(CredentialsUtil.buildCredential(credentialsProvider));
redshiftCopy.append("' ");
redshiftCopy.append("DELIMITER '" + dataDelimiter + "' ");
redshiftCopy.append("MANIFEST");
Expand Down