From 83d42c681731d04044a48b1f786faeaa95860ad5 Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Thu, 27 Jun 2024 09:44:48 -0700
Subject: [PATCH 1/6] Updating flink dependency versions and fixing tests.

Signed-off-by: forestmvey
---
 .../flink-connector-timestream/pom.xml          |  9 ++++++++-
 .../timestream/TimestreamModelUtils.java        |  2 +-
 .../connectors/timestream/SinkInitContext.java  | 17 +++++++++++++++++
 .../sample-kinesis-to-timestream-app/pom.xml    |  2 +-
 4 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/integrations/flink_connector/flink-connector-timestream/pom.xml b/integrations/flink_connector/flink-connector-timestream/pom.xml
index 9c2acdff..5bf15ebc 100644
--- a/integrations/flink_connector/flink-connector-timestream/pom.xml
+++ b/integrations/flink_connector/flink-connector-timestream/pom.xml
@@ -29,7 +29,8 @@ under the License.
         UTF-8
-        1.17.1
+        1.18.1
+        33.2.1-jre
         1.11
         11
         12
@@ -90,6 +91,12 @@ under the License.
             software.amazon.awssdk
             aws-crt-client
             2.20.101
+
+
+            com.google.guava
+            guava
+            ${guava.version}
+
diff --git a/integrations/flink_connector/flink-connector-timestream/src/main/java/com/amazonaws/samples/connectors/timestream/TimestreamModelUtils.java b/integrations/flink_connector/flink-connector-timestream/src/main/java/com/amazonaws/samples/connectors/timestream/TimestreamModelUtils.java
index e89a33cd..91a9e112 100644
--- a/integrations/flink_connector/flink-connector-timestream/src/main/java/com/amazonaws/samples/connectors/timestream/TimestreamModelUtils.java
+++ b/integrations/flink_connector/flink-connector-timestream/src/main/java/com/amazonaws/samples/connectors/timestream/TimestreamModelUtils.java
@@ -1,6 +1,6 @@
 package com.amazonaws.samples.connectors.timestream;
 
-import org.apache.flink.shaded.guava30.com.google.common.base.Utf8;
+import com.google.common.base.Utf8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java b/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
index 8c254d3b..a78c4f3d 100644
--- a/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
+++ b/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
@@ -1,8 +1,10 @@
 package com.amazonaws.samples.connectors.timestream;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -126,4 +128,19 @@ public void setSinkMetricGroup(SinkWriterMetricGroup sinkMetricGroup) {
     public SinkWriterMetricGroup getSinkMetricGroup() {
         return this.sinkMetricGroup;
     }
+
+    @Override
+    public TypeSerializer createInputSerializer() {
+        return null;
+    }
+
+    @Override
+    public JobID getJobId() {
+        return null;
+    }
+
+    @Override
+    public boolean isObjectReuseEnabled() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
index af43d7db..e3069756 100644
--- a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
+++ b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
@@ -29,7 +29,7 @@ under the License.
         UTF-8
-        1.15.3
+        1.16.3
         2.0.0
         1.2.0
         1.11

From 5673d8c308a3ff0d1416acc874a0ec280d643993 Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Fri, 28 Jun 2024 08:49:10 -0700
Subject: [PATCH 2/6] Updating READMEs for sample application and flink connector.

Signed-off-by: forestmvey
---
 integrations/flink_connector/README.md  | 61 +++++++++++++++++--
 .../sample-data-generator/README.md     | 11 +++-
 2 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/integrations/flink_connector/README.md b/integrations/flink_connector/README.md
index 25db7d48..96bec465 100644
--- a/integrations/flink_connector/README.md
+++ b/integrations/flink_connector/README.md
@@ -59,13 +59,62 @@ mvn clean compile && mvn package
 cd ../sample-kinesis-to-timestream-app
 aws s3 cp target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar s3://YOUR_BUCKET_NAME/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar
 ```
-3. Follow the steps in [Create and Run the Kinesis Data Analytics Application](https://docs.aws.amazon.com/kinesisanalytics/latest/java/get-started-exercise.html#get-started-exercise-7)
-   - pick Apache Flink version 1.13.2
-   - in "Edit the IAM Policy" step, add Timestream Write permissions to the created policy
+3. In the AWS Console, navigate to [Managed Apache Flink](https://console.aws.amazon.com/flink/home) to create the Apache Flink application
+   - Choose `Apache Flink Applications` in the navigation side bar
+   - Choose `Create streaming application`
+   - pick Apache Flink version 1.18
+   - Enter an application name and optional description
+   - Choose `Create streaming application`
-4. Follow **Getting Started** section from [sample data generator](/integrations/flink_connector/sample-data-generator) to send records to Kinesis.
-5. The records now should be consumed by the sample application and written to Timestream table.
-6. Query Timestream table using [AWS Console](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.queries.using-console) or AWS CLI:
+4. In the `Application details` pane of your new application
+   - Choose `IAM role` to add add the required permissions to the application role
+   - From the IAM Role of your application, choose the `Policy name` for the policy
+   - From the selected policy, choose `Edit`
+   - Append the following permissions to the role policy `Statement` and replace `` with the region of your deployed application, and replace `` with the AWS account id:
+
+   ```json
+   {
+       "Effect": "Allow",
+       "Action": [
+           "timestream:CreateDatabase"
+       ],
+       "Resource": "arn:aws:timestream:::database/kdaflink"
+   },
+   {
+       "Effect": "Allow",
+       "Action": [
+           "timestream:WriteRecords",
+           "timestream:CreateTable"
+       ],
+       "Resource": "arn:aws:timestream:::database/kdaflink/table/kinesisdata"
+   },
+   {
+       "Effect": "Allow",
+       "Action": [
+           "timestream:DescribeEndpoints"
+       ],
+       "Resource": "*"
+   },
+   {
+       "Effect": "Allow",
+       "Action": [
+           "kinesis:GetShardIterator",
+           "kinesis:GetRecords",
+           "kinesis:ListShards"
+       ],
+       "Resource": "arn:aws:kinesis:::stream/TimestreamTestStream"
+   }
+   ```
+
+5. From the `Apache Flink applications` page, choose the application you previously created
+6. Choose `Configure`
+7. Under `Amazon S3 bucket` in `Application Code Location`, choose the S3 bucket that you uploaded the application to in `Step 2`
+8. Under `Path to S3 object`, input the path to the application jar file
+9. Choose `Save changes`
+10. Choose `Run` and choose `Rune` again
+11. Follow **Getting Started** section from [sample data generator](/integrations/flink_connector/sample-data-generator) to send records to Kinesis.
+12. The records now should be consumed by the sample application and written to Timestream table.
+13. Query Timestream table using [AWS Console](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.queries.using-console) or AWS CLI:
 
 ```
 aws timestream-query query --query-string "SELECT * FROM kdaflink.kinesisdata WHERE time >= ago (15m) LIMIT 10"
 ```
\ No newline at end of file
diff --git a/integrations/flink_connector/sample-data-generator/README.md b/integrations/flink_connector/sample-data-generator/README.md
index 834685bf..38b3abac 100644
--- a/integrations/flink_connector/sample-data-generator/README.md
+++ b/integrations/flink_connector/sample-data-generator/README.md
@@ -5,7 +5,8 @@ This folder contains a script to generate a continuous stream of records that ar
 ---
 ## Dependencies
 - Boto3
-- numpy (Tested with version 1.18.5)
+- distutils
+- numpy (Tested with version 2.0.0)
 - Python3 (Tested with version 3.5.2)
 
 ----
@@ -27,7 +28,13 @@ python3 -m venv venv
 pip3 install numpy
 ```
 
-3. Run the following command to continuously generate and ingest sample data into Kinesis.
+3. Install setuptools
+
+   ```
+   pip3 install setuptools
+   ```
+
+4. Run the following command to continuously generate and ingest sample data into Kinesis.
 
 ```
 python3 kinesis_data_gen.py --stream --region

From ff8a2568fa14168d124e7e6147370ae6125968fe Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Fri, 28 Jun 2024 09:21:01 -0700
Subject: [PATCH 3/6] Refining README grammar and fixing typos.

Signed-off-by: forestmvey
---
 integrations/flink_connector/README.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/integrations/flink_connector/README.md b/integrations/flink_connector/README.md
index 96bec465..77a4a86c 100644
--- a/integrations/flink_connector/README.md
+++ b/integrations/flink_connector/README.md
@@ -67,10 +67,10 @@ aws s3 cp target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar s3://YOUR_BUC
    - Choose `Create streaming application`
 4. In the `Application details` pane of your new application
-   - Choose `IAM role` to add add the required permissions to the application role
-   - From the IAM Role of your application, choose the `Policy name` for the policy
+   - Choose `IAM role` to add the required permissions to the application role
+   - From the IAM Role of your application, choose the `Policy name`
    - From the selected policy, choose `Edit`
-   - Append the following permissions to the role policy `Statement` and replace `` with the region of your deployed application, and replace `` with the AWS account id:
+   - Append the following permissions to the role policy `Statement`, replacing `` with the region of your deployed application and `` with the AWS account ID:
 
    ```json
    {
       "Effect": "Allow",
@@ -108,13 +108,13 @@ aws s3 cp target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar s3://YOUR_BUC
 5. From the `Apache Flink applications` page, choose the application you previously created
 6. Choose `Configure`
-7. Under `Amazon S3 bucket` in `Application Code Location`, choose the S3 bucket that you uploaded the application to in `Step 2`
+7. Under `Amazon S3 bucket` in `Application Code Location`, choose the S3 bucket that the application was uploaded to in `Step 2`
 8. Under `Path to S3 object`, input the path to the application jar file
 9. Choose `Save changes`
-10. Choose `Run` and choose `Rune` again
+10. Choose `Run` and choose `Run` again
 11. Follow **Getting Started** section from [sample data generator](/integrations/flink_connector/sample-data-generator) to send records to Kinesis.
 12. The records now should be consumed by the sample application and written to Timestream table.
 13. Query Timestream table using [AWS Console](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.queries.using-console) or AWS CLI:
 
 ```
 aws timestream-query query --query-string "SELECT * FROM kdaflink.kinesisdata WHERE time >= ago (15m) LIMIT 10"
-```
\ No newline at end of file
+```

From d5308506261548d5e8571508129ea8413a8ad622 Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Fri, 28 Jun 2024 10:40:31 -0700
Subject: [PATCH 4/6] Adding ending newline to test file.

Signed-off-by: forestmvey
---
 .../samples/connectors/timestream/SinkInitContext.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java b/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
index a78c4f3d..a88dacba 100644
--- a/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
+++ b/integrations/flink_connector/flink-connector-timestream/src/test/java/com/amazonaws/samples/connectors/timestream/SinkInitContext.java
@@ -143,4 +143,4 @@ public JobID getJobId() {
     public boolean isObjectReuseEnabled() {
         return false;
     }
-}
\ No newline at end of file
+}

From fc4caed7cb2a5976510380b602a82e6c5c297307 Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Wed, 3 Jul 2024 12:48:21 -0700
Subject: [PATCH 5/6] Refining documentation and dependencies for sample application. Updating documentation for executing jar rather than using mvn.

Signed-off-by: forestmvey
---
 integrations/flink_connector/README.md        |  3 ++-
 .../flink-connector-timestream/pom.xml        | 13 ++++++---
 .../README.md                                 |  2 +-
 .../sample-kinesis-to-timestream-app/pom.xml  | 27 ++++++++++---------
 4 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/integrations/flink_connector/README.md b/integrations/flink_connector/README.md
index 77a4a86c..f71826b4 100644
--- a/integrations/flink_connector/README.md
+++ b/integrations/flink_connector/README.md
@@ -100,7 +100,8 @@ aws s3 cp target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar s3://YOUR_BUC
        "Action": [
            "kinesis:GetShardIterator",
            "kinesis:GetRecords",
-           "kinesis:ListShards"
+           "kinesis:ListShards",
+           "kinesis:DescribeStream"
        ],
        "Resource": "arn:aws:kinesis:::stream/TimestreamTestStream"
    }
diff --git a/integrations/flink_connector/flink-connector-timestream/pom.xml b/integrations/flink_connector/flink-connector-timestream/pom.xml
index 5bf15ebc..6a129ebc 100644
--- a/integrations/flink_connector/flink-connector-timestream/pom.xml
+++ b/integrations/flink_connector/flink-connector-timestream/pom.xml
@@ -21,7 +21,7 @@ under the License.
     4.0.0
     flink-connector-timestream
     com.amazonaws.samples.connectors.timestream
-    0.3-SNAPSHOT
+    0.5-SNAPSHOT
     jar
     flink-connector-timestream
@@ -33,6 +33,7 @@ under the License.
         33.2.1-jre
         1.11
         11
+        2.26.9
         12
         ${java.version}
         ${java.version}
@@ -43,7 +44,7 @@ under the License.
                 software.amazon.awssdk
                 bom
-                2.20.101
+                ${awssdk.version}
                 pom
                 import
@@ -63,6 +64,11 @@ under the License.
             org.apache.flink
             flink-connector-base
             ${flink.version}
+
+
+            org.apache.flink
+            flink-core
+            ${flink.version}
@@ -85,12 +91,13 @@ under the License.
             software.amazon.awssdk
             timestreamwrite
+            ${awssdk.version}
             software.amazon.awssdk
             aws-crt-client
-            2.20.101
+            ${awssdk.version}
diff --git a/integrations/flink_connector/sample-kinesis-to-timestream-app/README.md b/integrations/flink_connector/sample-kinesis-to-timestream-app/README.md
index edbeaa8b..354bbbf3 100644
--- a/integrations/flink_connector/sample-kinesis-to-timestream-app/README.md
+++ b/integrations/flink_connector/sample-kinesis-to-timestream-app/README.md
@@ -33,7 +33,7 @@ mvn clean compile && mvn package
 You can run the application on your local environment by running `StreamingJobs` class:
 
 ```
-mvn install exec:java -Dexec.mainClass="com.amazonaws.samples.kinesis2timestream.StreamingJob" -Dexec.args="--InputStreamName TimestreamTestStream --Region us-east-1 --TimestreamDbName kdaflink --TimestreamTableName kinesisdata" -Dexec.classpathScope=test
+java -jar target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar --InputStreamName TimestreamTestStream --Region us-east-1 --TimestreamDbName kdaflink --TimestreamTableName kinesisdata
 ```
 
 In another terminal, concurrently, run the [python sample data generator](../sample-data-generator).
\ No newline at end of file
diff --git a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
index e3069756..fc71f279 100644
--- a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
+++ b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
@@ -29,7 +29,8 @@ under the License.
         UTF-8
-        1.16.3
+        1.18.1
+        4.2.0-1.18
         2.0.0
         1.2.0
         1.11
@@ -38,6 +39,7 @@ under the License.
         ${java.version}
         ${java.version}
         2.17.1
+        2.0.12
@@ -45,7 +47,7 @@
                 software.amazon.awssdk
                 bom
-                2.17.203
+                2.26.9
                 pom
                 import
@@ -73,27 +75,28 @@
             org.apache.flink
             flink-java
             ${flink.version}
-            provided
             org.apache.flink
             flink-streaming-java
             ${flink.version}
-            provided
             org.apache.flink
             flink-clients
             ${flink.version}
-            provided
-
+
+            org.apache.flink
+            flink-core
+            ${flink.version}
+
             org.apache.flink
             flink-connector-kinesis
-            ${flink.version}
+            ${flink.kinesis.version}
             com.amazonaws
@@ -115,13 +118,13 @@
             software.amazon.timestream
             flink-connector-timestream
-            0.3
+            0.5
-
             software.amazon.awssdk
             timestreamwrite
+            2.26.9
@@ -149,13 +152,12 @@
             org.slf4j
             slf4j-api
-            1.7.32
+            ${slf4j.version}
             org.slf4j
             slf4j-log4j12
-            1.7.32
-            runtime
+            ${slf4j.version}
             de.javakaffee
@@ -233,7 +235,6 @@
                         org.apache.flink:force-shading
                         com.google.code.findbugs:jsr305
-                        org.slf4j:*
                         log4j:*

From 230e2615b03573b5896cdbc7a0ebea17039bd382 Mon Sep 17 00:00:00 2001
From: forestmvey
Date: Wed, 3 Jul 2024 16:15:08 -0700
Subject: [PATCH 6/6] Minor formatting fixes.

Signed-off-by: forestmvey
---
 integrations/flink_connector/flink-connector-timestream/pom.xml  | 2 +-
 .../flink_connector/sample-kinesis-to-timestream-app/pom.xml     | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/integrations/flink_connector/flink-connector-timestream/pom.xml b/integrations/flink_connector/flink-connector-timestream/pom.xml
index 6a129ebc..84f0c14b 100644
--- a/integrations/flink_connector/flink-connector-timestream/pom.xml
+++ b/integrations/flink_connector/flink-connector-timestream/pom.xml
@@ -64,7 +64,7 @@ under the License.
             org.apache.flink
             flink-connector-base
             ${flink.version}
-
+
             org.apache.flink
             flink-core
             ${flink.version}
diff --git a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
index fc71f279..f4ab7485 100644
--- a/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
+++ b/integrations/flink_connector/sample-kinesis-to-timestream-app/pom.xml
@@ -70,7 +70,6 @@ under the License.
-
             org.apache.flink
             flink-java
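
Taken together, the README changes in patches 2 and 5 describe an end-to-end local test loop for the sample application. The commands below are a minimal consolidated sketch of that loop, assuming the defaults used throughout these patches (stream `TimestreamTestStream`, region `us-east-1`, database `kdaflink`, table `kinesisdata`) and assuming `kinesis_data_gen.py` accepts the stream name and region as the values of its `--stream` and `--region` flags:

```
# Build the sample application and run it locally against the stream and table
# used in the README examples.
cd integrations/flink_connector/sample-kinesis-to-timestream-app
mvn clean compile && mvn package
java -jar target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar \
  --InputStreamName TimestreamTestStream --Region us-east-1 \
  --TimestreamDbName kdaflink --TimestreamTableName kinesisdata

# In a second terminal, generate sample records into the Kinesis stream
# (flag values here are assumptions; see the sample-data-generator README).
cd integrations/flink_connector/sample-data-generator
python3 kinesis_data_gen.py --stream TimestreamTestStream --region us-east-1

# Verify that records are landing in Timestream.
aws timestream-query query \
  --query-string "SELECT * FROM kdaflink.kinesisdata WHERE time >= ago (15m) LIMIT 10"
```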