
Commit

Merge pull request awslabs#191 from Bit-Quill/integ-update-flink-connector-dependency

Upgrade Flink Connector Dependency
sethusrinivasan authored Jul 5, 2024
2 parents 343a19d + 728a4dc commit 4d10828
Showing 7 changed files with 118 additions and 30 deletions.
64 changes: 57 additions & 7 deletions integrations/flink_connector/README.md
@@ -59,13 +59,63 @@ mvn clean compile && mvn package
cd ../sample-kinesis-to-timestream-app
aws s3 cp target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar s3://YOUR_BUCKET_NAME/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar
```
3. Follow the steps in [Create and Run the Kinesis Data Analytics Application](https://docs.aws.amazon.com/kinesisanalytics/latest/java/get-started-exercise.html#get-started-exercise-7)
- pick Apache Flink version 1.13.2
- in "Edit the IAM Policy" step, add Timestream Write permissions to the created policy
3. In the AWS Console, navigate to [Managed Apache Flink](https://console.aws.amazon.com/flink/home) to create the Apache Flink application
- Choose `Apache Flink Applications` in the navigation side bar
- Choose `Create streaming application`
- Pick Apache Flink version 1.18
- Enter an application name and optional description
- Choose `Create streaming application`

4. Follow **Getting Started** section from [sample data generator](/integrations/flink_connector/sample-data-generator) to send records to Kinesis.
5. The records now should be consumed by the sample application and written to Timestream table.
6. Query Timestream table using [AWS Console](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.queries.using-console) or AWS CLI:
4. In the `Application details` pane of your new application:
- Choose `IAM role` to add the required permissions to the application role
- From the IAM Role of your application, choose the `Policy name`
- From the selected policy, choose `Edit`
- Append the following permissions to the role policy `Statement`, replacing `<region>` with the region of your deployed application and `<account-id>` with the AWS account ID:

```json
{
"Effect": "Allow",
"Action": [
"timestream:CreateDatabase"
],
"Resource": "arn:aws:timestream:<region>:<account-id>:database/kdaflink"
},
{
"Effect": "Allow",
"Action": [
"timestream:WriteRecords",
"timestream:CreateTable"
],
"Resource": "arn:aws:timestream:<region>:<account-id>:database/kdaflink/table/kinesisdata"
},
{
"Effect": "Allow",
"Action": [
"timestream:DescribeEndpoints"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListShards",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:<region>:<account-id>:stream/TimestreamTestStream"
}
```
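As a sketch, the `<region>` and `<account-id>` placeholders above can be filled in from shell variables before the policy is saved and applied; the values, role name, and policy name below are hypothetical:

```shell
# Hypothetical values -- replace with your application's region and account ID.
REGION=us-east-1
ACCOUNT_ID=123456789012

# The Timestream database and table ARNs referenced by the statements above.
DB_ARN="arn:aws:timestream:${REGION}:${ACCOUNT_ID}:database/kdaflink"
TABLE_ARN="${DB_ARN}/table/kinesisdata"
echo "$DB_ARN"
echo "$TABLE_ARN"

# With the substituted statements saved into policy.json, the policy could be
# attached to the application role via the AWS CLI, e.g.:
#   aws iam put-role-policy --role-name <application-role-name> \
#       --policy-name TimestreamAccess --policy-document file://policy.json
```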

5. From the `Apache Flink applications` page, choose the application you previously created
6. Choose `Configure`
7. Under `Amazon S3 bucket` in `Application Code Location`, choose the S3 bucket that the application was uploaded to in `Step 2`
8. Under `Path to S3 object`, input the path to the application jar file
9. Choose `Save changes`
10. Choose `Run` and choose `Run` again
11. Follow the **Getting Started** section of the [sample data generator](/integrations/flink_connector/sample-data-generator) to send records to Kinesis.
12. The records should now be consumed by the sample application and written to the Timestream table.
13. Query the Timestream table using the [AWS Console](https://docs.aws.amazon.com/timestream/latest/developerguide/console_timestream.html#console_timestream.queries.using-console) or the AWS CLI:
```
aws timestream-query query --query-string "SELECT * FROM kdaflink.kinesisdata WHERE time >= ago(15m) LIMIT 10"
```
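If the CLI's default region differs from where the application was deployed, the same query can be issued with an explicit region flag; `us-east-1` below is an assumption:

```shell
# Build the query string once so it can be reused across invocations.
QUERY="SELECT * FROM kdaflink.kinesisdata WHERE time >= ago(15m) LIMIT 10"
echo "$QUERY"

# The --region flag mirrors where the Flink application runs, e.g.:
#   aws timestream-query query --query-string "$QUERY" --region us-east-1
```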
22 changes: 18 additions & 4 deletions integrations/flink_connector/flink-connector-timestream/pom.xml
@@ -21,17 +21,19 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-timestream</artifactId>
<groupId>com.amazonaws.samples.connectors.timestream</groupId>
<version>0.3-SNAPSHOT</version>
<version>0.5-SNAPSHOT</version>
<packaging>jar</packaging>

<name>flink-connector-timestream</name>
<url>https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/flink_connector/flink-connector-timestream</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<flink.version>1.18.1</flink.version>
<guava.version>33.2.1-jre</guava.version>
<java.version>1.11</java.version>
<jdk.version>11</jdk.version>
<awssdk.version>2.26.9</awssdk.version>
<next.jdk.version>12</next.jdk.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
@@ -42,7 +44,7 @@ under the License.
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.20.101</version>
<version>${awssdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -63,6 +65,11 @@ under the License.
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Flink Test Utils -->
<dependency>
@@ -84,12 +91,19 @@ under the License.
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>timestreamwrite</artifactId>
<version>${awssdk.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
<version>2.20.101</version>
<version>${awssdk.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- Test dependencies -->
@@ -1,6 +1,6 @@
package com.amazonaws.samples.connectors.timestream;

import org.apache.flink.shaded.guava30.com.google.common.base.Utf8;
import com.google.common.base.Utf8;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1,8 +1,10 @@
package com.amazonaws.samples.connectors.timestream;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -126,4 +128,19 @@ public void setSinkMetricGroup(SinkWriterMetricGroup sinkMetricGroup) {
public SinkWriterMetricGroup getSinkMetricGroup() {
return this.sinkMetricGroup;
}
}

@Override
public <IN> TypeSerializer<IN> createInputSerializer() {
return null;
}

@Override
public JobID getJobId() {
return null;
}

@Override
public boolean isObjectReuseEnabled() {
return false;
}
}
11 changes: 9 additions & 2 deletions integrations/flink_connector/sample-data-generator/README.md
@@ -5,7 +5,8 @@ This folder contains a script to generate a continuous stream of records that ar
---
## Dependencies
- Boto3
- numpy (Tested with version 1.18.5)
- distutils
- numpy (Tested with version 2.0.0)
- Python3 (Tested with version 3.5.2)

----
@@ -27,7 +28,13 @@ python3 -m venv venv
pip3 install numpy
```

3. Run the following command to continuously generate and ingest sample data into Kinesis.
3. Install setuptools

```
pip3 install setuptools
```

4. Run the following command to continuously generate and ingest sample data into Kinesis.

```
python3 kinesis_data_gen.py --stream <name_of_the_kinesis_stream> --region <the_region_of_kinesis_stream>
@@ -33,7 +33,7 @@ mvn clean compile && mvn package

You can run the application in your local environment by running the `StreamingJob` class:
```
mvn install exec:java -Dexec.mainClass="com.amazonaws.samples.kinesis2timestream.StreamingJob" -Dexec.args="--InputStreamName TimestreamTestStream --Region us-east-1 --TimestreamDbName kdaflink --TimestreamTableName kinesisdata" -Dexec.classpathScope=test
java -jar target/sample-kinesis-to-timestream-app-0.1-SNAPSHOT.jar --InputStreamName TimestreamTestStream --Region us-east-1 --TimestreamDbName kdaflink --TimestreamTableName kinesisdata
```

In another terminal, run the [python sample data generator](../sample-data-generator) concurrently.
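For a local end-to-end test, the generator invocation can mirror the stream and region passed to the application above; the command is assembled here for illustration:

```shell
# Stream name and region must match the --InputStreamName and --Region
# arguments given to the sample application.
STREAM=TimestreamTestStream
REGION=us-east-1
CMD="python3 kinesis_data_gen.py --stream $STREAM --region $REGION"
echo "$CMD"
```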
@@ -29,7 +29,8 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.3</flink.version>
<flink.version>1.18.1</flink.version>
<flink.kinesis.version>4.2.0-1.18</flink.kinesis.version>
<kda.version>2.0.0</kda.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<java.version>1.11</java.version>
@@ -38,14 +39,15 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
<slf4j.version>2.0.12</slf4j.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.17.203</version>
<version>2.26.9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -68,32 +70,32 @@ under the License.

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- For reading from Kinesis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.version}</version>
<version>${flink.kinesis.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -115,13 +117,13 @@ under the License.
<dependency>
<groupId>software.amazon.timestream</groupId>
<artifactId>flink-connector-timestream</artifactId>
<version>0.3</version>
<version>0.5</version>
</dependency>

<!-- sdk dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>timestreamwrite</artifactId>
<version>2.26.9</version>
</dependency>

<!-- Test & compile dependencies -->
@@ -149,13 +151,12 @@ under the License.
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.32</version>
<scope>runtime</scope>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
@@ -233,7 +234,6 @@ under the License.
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
