From 1861f12db7748f240e4963a9cf82b9f11eae7760 Mon Sep 17 00:00:00 2001 From: "Dosani, Adnan" Date: Fri, 22 May 2015 02:09:47 -0700 Subject: [PATCH] Version 1.3.0 of the Amazon Kinesis Client Library A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch. --- META-INF/MANIFEST.MF | 4 ++-- README.md | 3 +++ pom.xml | 4 ++-- .../lib/worker/KinesisClientLibConfiguration.java | 2 +- .../lib/worker/KinesisDataFetcher.java | 9 ++------- .../clientlibrary/lib/worker/ProcessTask.java | 14 ++++++++++++-- .../kinesis/leases/impl/KinesisClientLease.java | 2 +- .../leases/impl/KinesisClientLeaseSerializer.java | 8 ++------ .../services/kinesis/leases/impl/Lease.java | 6 ++---- .../kinesis/leases/impl/LeaseManager.java | 11 ++++++----- .../kinesis/leases/impl/LeaseRenewer.java | 3 ++- .../services/kinesis/leases/impl/LeaseTaker.java | 3 ++- .../kinesis/metrics/impl/MetricsHelper.java | 15 +++++++-------- 13 files changed, 44 insertions(+), 40 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index c7a8dc48a..165ab34ad 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.2.1 +Bundle-Version: 1.3.0 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.6", com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0", org.apache.httpcomponents.httpcore;bundle-version="4.3.2", org.apache.httpcomponents.httpclient;bundle-version="4.3.4" - com.amazonaws.sdk;bundle-version="1.9.16", + com.amazonaws.sdk;bundle-version="1.9.37", Export-Package: com.amazonaws.services.kinesis, com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary.config, diff --git a/README.md b/README.md index b2e1bcf0b..db248be0b 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.3.0 (May 22, 2015) +* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch. + ### Release 1.2.1 (January 26, 2015) * **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker. diff --git a/pom.xml b/pom.xml index 4f8ccf635..0eef743bd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.2.1 + 1.3.0 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. https://aws.amazon.com/kinesis @@ -23,7 +23,7 @@ - 1.9.16 + 1.9.37 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 74a3f2cd5..8f08ebb60 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.3.0"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 7b23c32f4..d890ac16d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,13 +14,10 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; @@ -57,17 +54,15 @@ public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { * @param maxRecords Max records to fetch * @return list of records of up to maxRecords size */ - public List getRecords(int maxRecords) { + public GetRecordsResult getRecords(int maxRecords) { if (!isInitialized) { throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization."); } - List records = null; GetRecordsResult response = null; if (nextIterator != null) { try { response = kinesisProxy.get(nextIterator, maxRecords); - records = response.getRecords(); nextIterator = response.getNextShardIterator(); } catch (ResourceNotFoundException e) { LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); @@ -80,7 +75,7 @@ public List getRecords(int maxRecords) { isShardEndReached = true; } - return records; + return response; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index d3f72d646..2886c2999 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; @@ -37,6 +38,8 @@ class ProcessTask implements ITask { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; + private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; + private static final Log LOG = LogFactory.getLog(ProcessTask.class); private final ShardInfo shardInfo; @@ -93,7 +96,14 @@ public TaskResult call() { boolean shardEndReached = true; return new TaskResult(null, shardEndReached); } - List records = getRecords(); + final GetRecordsResult getRecordsResult = getRecords(); + + if (getRecordsResult.getMillisBehindLatest() != null) { + scope.addData(MILLIS_BEHIND_LATEST_METRIC, getRecordsResult.getMillisBehindLatest(), + StandardUnit.Milliseconds); + } + + final List records = getRecordsResult.getRecords(); if (records.isEmpty()) { LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); @@ -180,7 +190,7 @@ private String getMaxSequenceNumber(IMetricsScope scope, List records) { * @throws KinesisClientLibException if reading checkpoints fails in the edge case where we haven't passed any * records to the client code yet */ - private List getRecords() throws KinesisClientLibException { + private GetRecordsResult getRecords() throws KinesisClientLibException { int maxRecords = streamConfig.getMaxRecords(); try { return dataFetcher.getRecords(maxRecords); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index a24dd9283..bd1c097d9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -49,7 +49,7 @@ public void update(T other) { } KinesisClientLease casted = (KinesisClientLease) other; - // Do not update ownerSwitchesSinceCheckpoint here - that field is maintained by the leasing library. + setOwnerSwitchesSinceCheckpoint(casted.ownerSwitchesSinceCheckpoint); setCheckpoint(casted.checkpoint); setParentShardIds(casted.parentShardIds); } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 55f8abc88..28e55d193 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -95,16 +95,12 @@ public Map getDynamoLeaseCounterUpdate(KinesisClie public Map getDynamoTakeLeaseUpdate(KinesisClientLease lease, String newOwner) { Map result = baseSerializer.getDynamoTakeLeaseUpdate(lease, newOwner); - Long ownerSwitchesSinceCheckpoint = lease.getOwnerSwitchesSinceCheckpoint(); String oldOwner = lease.getLeaseOwner(); if (oldOwner != null && !oldOwner.equals(newOwner)) { - ownerSwitchesSinceCheckpoint++; + result.put(OWNER_SWITCHES_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(1L), + AttributeAction.ADD)); } - result.put(OWNER_SWITCHES_KEY, - new AttributeValueUpdate(DynamoUtils.createAttributeValue(ownerSwitchesSinceCheckpoint), - AttributeAction.PUT)); - return result; } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java index b5f1a5ae4..3cb365fba 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.UUID; +import java.util.concurrent.TimeUnit; import com.amazonaws.util.json.JSONObject; @@ -31,11 +32,8 @@ public class Lease { * * Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two * values will be very large. We will consider leases to be expired if they are more than a year old. - * - * 365 days per year * 24 hours per day * 60 minutes per hour * 60 seconds per minute * 1000000000 - * nanoseconds/second */ - private static final long MAX_ABS_AGE_NANOS = 365 * 24 * 60 * 60 * 1000000000L; + private static final long MAX_ABS_AGE_NANOS = TimeUnit.DAYS.toNanos(365); private String leaseKey; private String leaseOwner; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index bb06fb4fb..e2970defe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -160,14 +161,14 @@ public boolean leaseTableExists() throws DependencyException { @Override public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException { - long sleepTimeRemaining = timeoutSeconds * 1000; + long sleepTimeRemaining = TimeUnit.SECONDS.toMillis(timeoutSeconds); while (!leaseTableExists()) { if (sleepTimeRemaining <= 0) { return false; } - long timeToSleepMillis = Math.min(1000 * secondsBetweenPolls, sleepTimeRemaining); + long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining); sleepTimeRemaining -= sleep(timeToSleepMillis); } @@ -385,7 +386,7 @@ public boolean takeLease(T lease, String owner) verifyNotNull(owner, "owner cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Taking lease with shardId %s from %s to %s", + LOG.debug(String.format("Taking lease with leaseKey %s from %s to %s", lease.getLeaseKey(), lease.getLeaseOwner() == null ? "nobody" : lease.getLeaseOwner(), owner)); @@ -428,7 +429,7 @@ public boolean evictLease(T lease) verifyNotNull(lease, "lease cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Voiding lease with shardId %s owned by %s", + LOG.debug(String.format("Evicting lease with leaseKey %s owned by %s", lease.getLeaseKey(), lease.getLeaseOwner())); } @@ -485,7 +486,7 @@ public void deleteLease(T lease) throws DependencyException, InvalidStateExcepti verifyNotNull(lease, "lease cannot be null"); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Deleting lease with shardId %s", lease.getLeaseKey())); + LOG.debug(String.format("Deleting lease with leaseKey %s", lease.getLeaseKey())); } DeleteItemRequest deleteRequest = new DeleteItemRequest(); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index 7f640074f..e36deeb62 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +58,7 @@ public class LeaseRenewer implements ILeaseRenewer { public LeaseRenewer(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; - this.leaseDurationNanos = leaseDurationMillis * 1000000L; + this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java index 4617f9276..7ae97f798 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.java @@ -25,6 +25,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -68,7 +69,7 @@ public Long call() { public LeaseTaker(ILeaseManager leaseManager, String workerIdentifier, long leaseDurationMillis) { this.leaseManager = leaseManager; this.workerIdentifier = workerIdentifier; - this.leaseDurationNanos = leaseDurationMillis * 1000000; + this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java index f1f44b0a5..a4d9d9d2e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/metrics/impl/MetricsHelper.java @@ -80,16 +80,16 @@ public static void addSuccessAndLatency(long startTimeMillis, boolean success) { public static void addSuccessAndLatency(String prefix, long startTimeMillis, boolean success) { addSuccessAndLatencyPerShard(null, prefix, startTimeMillis, success); } - + public static void addSuccessAndLatencyPerShard ( - String shardId, - String prefix, - long startTimeMillis, + String shardId, + String prefix, + long startTimeMillis, boolean success) { IMetricsScope scope = getMetricsScope(); String realPrefix = prefix == null ? "" : prefix + SEP; - + if (shardId != null) { scope.addDimension("ShardId", shardId); } @@ -103,10 +103,9 @@ public static void addSuccessAndLatencyPerShard ( public static void endScope() { IMetricsScope scope = getMetricsScope(); if (scope != null) { - Integer refCount = referenceCount.get(); - refCount--; + referenceCount.set(referenceCount.get() - 1); - if (refCount == 0) { + if (referenceCount.get() == 0) { scope.end(); currentScope.remove(); }