getDynamoLeaseOwnerExpectation(Lease
if (lease.getLeaseOwner() == null) {
eav = new ExpectedAttributeValue(false);
} else {
- new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
+ eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
}
result.put(LEASE_OWNER_KEY, eav);
diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/CWPublisherRunnable.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/CWPublisherRunnable.java
index 7047fd07c..50371ee4c 100644
--- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/CWPublisherRunnable.java
+++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/CWPublisherRunnable.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -157,7 +157,7 @@ public void runOnce() {
/**
* Overrideable for testing purposes.
*/
- long getTime() {
+ protected long getTime() {
return System.currentTimeMillis();
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
index 90767d062..8b74cabc5 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@@ -15,42 +15,38 @@
package com.amazonaws.services.kinesis.multilang;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintStream;
-import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
/**
* Main app that launches the worker that runs the multi-language record processor.
- *
+ *
* Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
* define these properties:
- *
+ *
*
* # The script that abides by the multi-language protocol. This script will
* # be executed by the MultiLangDaemon, which will communicate with this script
* # over STDIN and STDOUT according to the multi-language protocol.
* executableName = sampleapp.py
- *
+ *
* # The name of an Amazon Kinesis stream to process.
* streamName = words
- *
+ *
* # Used by the KCL as the name of this application. Will be used as the name
* # of a Amazon DynamoDB table which will store the lease and checkpoint
* # information for workers with this application name.
* applicationName = PythonKCLSample
- *
+ *
* # Users can change the credentials provider the KCL will use to retrieve credentials.
* # The DefaultAWSCredentialsProviderChain checks several other providers, which is
* # described here:
@@ -62,91 +58,47 @@ public class MultiLangDaemon implements Callable {
private static final Log LOG = LogFactory.getLog(MultiLangDaemon.class);
- private static final String USER_AGENT = "amazon-kinesis-multi-lang-daemon";
- private static final String VERSION = "1.0.0";
-
- private static final String PROP_EXECUTABLE_NAME = "executableName";
- private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
- private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
-
- private KinesisClientLibConfiguration configuration;
-
- private MultiLangRecordProcessorFactory recordProcessorFactory;
-
- private ExecutorService workerThreadPool;
-
- private String processingLanguage;
+ private Worker worker;
/**
* Constructor.
+ *
+ * @param configuration The KCL config to use.
+ * @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
+ * protocol.
+ * @param workerThreadPool The executor service to run the daemon in.
*/
- MultiLangDaemon(String processingLanguage,
- KinesisClientLibConfiguration configuration,
+ public MultiLangDaemon(KinesisClientLibConfiguration configuration,
MultiLangRecordProcessorFactory recordProcessorFactory,
ExecutorService workerThreadPool) {
- this.processingLanguage = processingLanguage;
- this.configuration = configuration;
- this.recordProcessorFactory = recordProcessorFactory;
- this.workerThreadPool = workerThreadPool;
+ this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
}
- static void printUsage(PrintStream stream, String message) {
- StringBuilder builder = new StringBuilder();
- if (message != null) {
- builder.append(message);
- }
- builder.append(String.format("java %s ", MultiLangDaemon.class.getCanonicalName()));
- stream.println(builder.toString());
- }
-
- static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
- Properties properties = new Properties();
- try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
- properties.load(propertiesStream);
- return properties;
- }
- }
-
- static boolean validateProperties(Properties properties) {
- return properties != null && properties.getProperty(PROP_EXECUTABLE_NAME) != null;
+ /**
+ *
+ * @param worker A worker to use instead of the default worker.
+ */
+ public MultiLangDaemon(Worker worker) {
+ this.worker = worker;
}
/**
- * This method will cause the MultiLangDaemon to read its configuration and build a worker with a
- * MultiLangRecordProcessorFactory for the executable specified in the provided properties.
+ * Utility for describing how to run this app.
+ *
+ * @param stream Where to output the usage info.
+ * @param messageToPrepend An optional error message to describe why the usage is being printed.
*/
- void prepare() {
- // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
- java.security.Security.setProperty("networkaddress.cache.ttl", "60");
-
- LOG.info("Using workerId: " + configuration.getWorkerIdentifier());
- LOG.info("Using credentials with access key id: "
- + configuration.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId());
-
- StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
- userAgent.append(" ");
- userAgent.append(USER_AGENT);
- userAgent.append("/");
- userAgent.append(VERSION);
-
- if (processingLanguage != null) {
- userAgent.append(" ");
- userAgent.append(processingLanguage);
- }
-
- if (recordProcessorFactory.getCommandArray().length > 0) {
- userAgent.append(" ");
- userAgent.append(recordProcessorFactory.getCommandArray()[0]);
+ public static void printUsage(PrintStream stream, String messageToPrepend) {
+ StringBuilder builder = new StringBuilder();
+ if (messageToPrepend != null) {
+ builder.append(messageToPrepend);
}
-
- LOG.debug(String.format("User Agent string is: %s", userAgent.toString()));
- configuration.withUserAgent(userAgent.toString());
+ builder.append(String.format("java %s ", MultiLangDaemon.class.getCanonicalName()));
+ stream.println(builder.toString());
}
@Override
public Integer call() throws Exception {
- prepare();
- Worker worker = new Worker(recordProcessorFactory, configuration, workerThreadPool);
int exitCode = 0;
try {
worker.run();
@@ -157,22 +109,6 @@ public Integer call() throws Exception {
return exitCode;
}
- private static int getMaxActiveThreads(Properties properties) {
- return Integer.parseInt(properties.getProperty(PROP_MAX_ACTIVE_THREADS, "0"));
- }
-
- private static ExecutorService getExecutorService(Properties properties) {
- int maxActiveThreads = getMaxActiveThreads(properties);
- LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
- if (maxActiveThreads <= 0) {
- LOG.info("Using a cached thread pool.");
- return Executors.newCachedThreadPool();
- } else {
- LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
- return Executors.newFixedThreadPool(maxActiveThreads);
- }
- }
-
/**
* @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
* well as the name of an executable.
@@ -183,44 +119,30 @@ public static void main(String[] args) {
printUsage(System.err, "You must provide a properties file");
System.exit(1);
}
- Properties properties = null;
+ MultiLangDaemonConfig config = null;
try {
- properties = loadProperties(Thread.currentThread().getContextClassLoader(), args[0]);
+ config = new MultiLangDaemonConfig(args[0]);
} catch (IOException e) {
printUsage(System.err, "You must provide a properties file");
System.exit(1);
+ } catch (IllegalArgumentException e) {
+ printUsage(System.err, e.getMessage());
+ System.exit(1);
}
- if (validateProperties(properties)) {
+ ExecutorService executorService = config.getExecutorService();
- // Configuration
- KinesisClientLibConfiguration kinesisClientLibConfiguration =
- new KinesisClientLibConfigurator().getConfiguration(properties);
- String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
+ // Daemon
+ MultiLangDaemon daemon = new MultiLangDaemon(
+ config.getKinesisClientLibConfiguration(),
+ config.getRecordProcessorFactory(),
+ executorService);
- ExecutorService executorService = getExecutorService(properties);
-
- // Factory
- MultiLangRecordProcessorFactory recordProcessorFactory =
- new MultiLangRecordProcessorFactory(executableName, executorService);
-
- // Daemon
- MultiLangDaemon daemon =
- new MultiLangDaemon(properties.getProperty(PROP_PROCESSING_LANGUAGE),
- kinesisClientLibConfiguration, recordProcessorFactory, executorService);
-
- LOG.info("Running " + kinesisClientLibConfiguration.getApplicationName() + " to process stream "
- + kinesisClientLibConfiguration.getStreamName() + " with executable " + executableName);
-
- Future future = executorService.submit(daemon);
- try {
- System.exit(future.get());
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Encountered an error while running daemon", e);
- }
- } else {
- printUsage(System.err, "Must provide an executable name in the properties file, "
- + "e.g. executableName = sampleapp.py");
+ Future future = executorService.submit(daemon);
+ try {
+ System.exit(future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Encountered an error while running daemon", e);
}
System.exit(1);
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
new file mode 100644
index 000000000..7793f12bc
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.multilang;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+/**
+ * This class captures the configuration needed to run the MultiLangDaemon.
+ */
+public class MultiLangDaemonConfig {
+
+ private static final Log LOG = LogFactory.getLog(MultiLangDaemonConfig.class);
+
+ private static final String USER_AGENT = "amazon-kinesis-multi-lang-daemon";
+ private static final String VERSION = "1.0.1";
+
+ private static final String PROP_EXECUTABLE_NAME = "executableName";
+ private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
+ private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
+
+ private KinesisClientLibConfiguration kinesisClientLibConfig;
+
+ private ExecutorService executorService;
+
+ private MultiLangRecordProcessorFactory recordProcessorFactory;
+
+ /**
+ * Constructor.
+ *
+ * @param propertiesFile The location of the properties file.
+ * @throws IOException Thrown when the properties file can't be accessed.
+ * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
+ */
+ public MultiLangDaemonConfig(String propertiesFile) throws IOException, IllegalArgumentException {
+ this(propertiesFile, Thread.currentThread().getContextClassLoader());
+ }
+
+ /**
+ *
+ * @param propertiesFile The location of the properties file.
+ * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
+ * unit test.
+ * @throws IOException Thrown when the properties file can't be accessed.
+ * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
+ */
+ public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader) throws IOException,
+ IllegalArgumentException {
+ this(propertiesFile, classLoader, new KinesisClientLibConfigurator());
+ }
+
+ /**
+ *
+ * @param propertiesFile The location of the properties file.
+ * @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
+ * unit test.
+ * @param configurator A configurator to use.
+ * @throws IOException Thrown when the properties file can't be accessed.
+ * @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
+ */
+ public MultiLangDaemonConfig(String propertiesFile,
+ ClassLoader classLoader,
+ KinesisClientLibConfigurator configurator) throws IOException, IllegalArgumentException {
+ Properties properties = loadProperties(classLoader, propertiesFile);
+ if (!validateProperties(properties)) {
+ throw new IllegalArgumentException("Must provide an executable name in the properties file, "
+ + "e.g. executableName = sampleapp.py");
+ }
+
+ String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
+ String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
+
+ kinesisClientLibConfig = configurator.getConfiguration(properties);
+ executorService = buildExecutorService(properties);
+ recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
+
+ LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
+ + kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
+ prepare(processingLanguage);
+ }
+
+ private void prepare(String processingLanguage) {
+ // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
+ java.security.Security.setProperty("networkaddress.cache.ttl", "60");
+
+ LOG.info("Using workerId: " + kinesisClientLibConfig.getWorkerIdentifier());
+ LOG.info("Using credentials with access key id: "
+ + kinesisClientLibConfig.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId());
+
+ StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
+ userAgent.append(" ");
+ userAgent.append(USER_AGENT);
+ userAgent.append("/");
+ userAgent.append(VERSION);
+
+ if (processingLanguage != null) {
+ userAgent.append(" ");
+ userAgent.append(processingLanguage);
+ }
+
+ if (recordProcessorFactory.getCommandArray().length > 0) {
+ userAgent.append(" ");
+ userAgent.append(recordProcessorFactory.getCommandArray()[0]);
+ }
+
+ LOG.info(String.format("MultiLangDaemon is adding the following fields to the User Agent: %s",
+ userAgent.toString()));
+ kinesisClientLibConfig.withUserAgent(userAgent.toString());
+ }
+
+ private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
+ Properties properties = new Properties();
+ try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
+ properties.load(propertiesStream);
+ return properties;
+ }
+ }
+
+ private static boolean validateProperties(Properties properties) {
+ return properties != null && properties.getProperty(PROP_EXECUTABLE_NAME) != null;
+ }
+
+ private static int getMaxActiveThreads(Properties properties) {
+ return Integer.parseInt(properties.getProperty(PROP_MAX_ACTIVE_THREADS, "0"));
+ }
+
+ private static ExecutorService buildExecutorService(Properties properties) {
+ int maxActiveThreads = getMaxActiveThreads(properties);
+ LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
+ if (maxActiveThreads <= 0) {
+ LOG.info("Using a cached thread pool.");
+ return Executors.newCachedThreadPool();
+ } else {
+ LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
+ return Executors.newFixedThreadPool(maxActiveThreads);
+ }
+ }
+
+ /**
+ *
+ * @return A KinesisClientLibConfiguration object based on the properties file provided.
+ */
+ public KinesisClientLibConfiguration getKinesisClientLibConfiguration() {
+ return kinesisClientLibConfig;
+ }
+
+ /**
+ *
+ * @return An executor service based on the properties file provided.
+ */
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ /**
+ *
+ * @return A MultiLangRecordProcessorFactory based on the properties file provided.
+ */
+ public MultiLangRecordProcessorFactory getRecordProcessorFactory() {
+ return recordProcessorFactory;
+ }
+}