Skip to content

Commit

Permalink
'Version 1.2.1 of the Amazon Kinesis Client Library'
Browse files Browse the repository at this point in the history
  • Loading branch information
Kurtis Norwood committed Jan 26, 2015
1 parent 9b1549e commit 0fc90ff
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 163 deletions.
27 changes: 15 additions & 12 deletions META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,35 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.2.0
Bundle-Version: 1.2.1
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
org.apache.commons.logging;bundle-version="1.1.1";visibility:=reexport,
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.1.1",
com.fasterxml.jackson.core.jackson-core;bundle-version="2.1.1",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1",
org.apache.httpcomponents.httpcore;bundle-version="4.2.0",
org.apache.httpcomponents.httpclient;bundle-version="4.2.0"
com.amazonaws.sdk;bundle-version="1.7.13",
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport,
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.3.2",
com.fasterxml.jackson.core.jackson-core;bundle-version="2.3.2",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
com.amazonaws.sdk;bundle-version="1.9.16",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.config,
com.amazonaws.services.kinesis.clientlibrary.exceptions,
com.amazonaws.services.kinesis.clientlibrary.exceptions.internal,
com.amazonaws.services.kinesis.clientlibrary.interfaces,
com.amazonaws.services.kinesis.clientlibrary.types,
com.amazonaws.services.kinesis.clientlibrary.proxies,
com.amazonaws.services.kinesis.clientlibrary.lib,
com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint,
com.amazonaws.services.kinesis.clientlibrary.lib.worker,
com.amazonaws.services.kinesis.clientlibrary.proxies,
com.amazonaws.services.kinesis.clientlibrary.types,
com.amazonaws.services.kinesis.leases,
com.amazonaws.services.kinesis.leases.exceptions,
com.amazonaws.services.kinesis.leases.impl,
com.amazonaws.services.kinesis.leases.interfaces,
com.amazonaws.services.kinesis.leases.util,
com.amazonaws.services.kinesis.metrics,
com.amazonaws.services.kinesis.metrics.impl,
com.amazonaws.services.kinesis.metrics.interfaces
com.amazonaws.services.kinesis.metrics.interfaces,
com.amazonaws.services.kinesis.multilang,
com.amazonaws.services.kinesis.multilang.messages,
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.

## Release Notes
### Release 1.2.1 (January 26, 2015)
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.

### Release 1.2 (October 21, 2014)
* **Multi-Language Support** Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.

Expand Down
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.2.0</version>
<version>1.2.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>

Expand All @@ -23,7 +23,7 @@
</licenses>

<properties>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<aws-java-sdk.version>1.9.16</aws-java-sdk.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -51,6 +51,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
Expand All @@ -64,6 +65,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.0";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1";

/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,6 +73,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
@Override
public synchronized void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
if (LOG.isDebugEnabled()) {
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", " + " token " + shardInfo.getConcurrencyToken()
+ " at largest permitted value " + this.largestPermittedCheckpointValue);
}
advancePosition(this.largestPermittedCheckpointValue);
}

Expand All @@ -86,13 +90,21 @@ public synchronized void checkpoint(String sequenceNumber)

// throws exception if sequence number shouldn't be checkpointed for this shard
sequenceNumberValidator.validateSequenceNumber(sequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Validated checkpoint sequence number " + sequenceNumber + " for " + shardInfo.getShardId()
+ ", token " + shardInfo.getConcurrencyToken());
}
/*
* If there isn't a last checkpoint value, we only care about checking the upper bound.
* If there is a last checkpoint value, we want to check both the lower and upper bound.
*/
if ((checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0)
&& checkpointValueComparator.compare(sequenceNumber, largestPermittedCheckpointValue) <= 0) {

if (LOG.isDebugEnabled()) {
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
+ " at specific sequence number " + sequenceNumber);
}
this.advancePosition(sequenceNumber);
} else {
throw new IllegalArgumentException("Could not checkpoint at sequence number " + sequenceNumber
Expand Down Expand Up @@ -162,15 +174,14 @@ void advancePosition(String sequenceNumber)
// Don't checkpoint a value we already successfully checkpointed
if (sequenceNumber != null && !sequenceNumber.equals(lastCheckpointValue)) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
+ " checkpoint to " + checkpointValue);
}
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointValue, shardInfo.getConcurrencyToken());
lastCheckpointValue = checkpointValue;
} catch (ThrottlingException e) {
throw e;
} catch (ShutdownException e) {
throw e;
} catch (InvalidStateException e) {
throw e;
} catch (KinesisClientLibDependencyException e) {
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,7 @@ public class Worker implements Runnable {
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager controlServer;

private boolean shutdown;
private volatile boolean shutdown;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,18 +227,22 @@ public String getWorkerIdentifier() {
* Stops background threads.
*/
public void stop() {
threadpool.shutdown();
try {
if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier()));
} else {
threadpool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
if (threadpool != null) {
threadpool.shutdown();
try {
if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier()));
} else {
threadpool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
leaseTaker.getWorkerIdentifier(),
STOP_WAIT_TIME_MILLIS));
}
} catch (InterruptedException e) {
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
}
} catch (InterruptedException e) {
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
} else {
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
}

leaseRenewer.clearCurrentlyHeldLeases();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -119,7 +119,7 @@ public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(Lease
if (lease.getLeaseOwner() == null) {
eav = new ExpectedAttributeValue(false);
} else {
new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
}

result.put(LEASE_OWNER_KEY, eav);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -157,7 +157,7 @@ public void runOnce() {
/**
* Overrideable for testing purposes.
*/
long getTime() {
protected long getTime() {
return System.currentTimeMillis();
}

Expand Down
Loading

0 comments on commit 0fc90ff

Please sign in to comment.