Flink upgrade to 1.17.2 (#100)
* #OBS-I210: Updated the hudi connector to read the atomic_creation setting from the configuration

* MasterDataset Data Product changes (#93)

* #OBS-I129: Enhanced the masterdata processor data product to remove only the datasources that are past the configured retention period

* #OBS-I129: Reading retention period value from the configuration

* #OBS-I129: Removed an unwanted line

* #OBS-I129: Code indentation changes

* #OBS-I129: Refactor - Reused the getDataSourceRefFormat method in other places

* #OBS-I182: Cache indexer job changes to process only live status of Master Datasets (#97)

* #OBS-I113 flink update to 1.17.2

* #OBS-I113 update entry class to sunbird

* #OBS-I113 update jackson versions and dependencies

* #OBS-I113 - Fixed test cases for metrics post flink upgrade, and added configuration to shade jackson if necessary

* #OBS-I113 update flink dockerfile to load jar from usrlib

* #OBS-I113 update hudi to use jackson 2.13.4

* #OBS-I113 update hudi dockerfile

---------

Co-authored-by: Manjunath Davanam <[email protected]>
Co-authored-by: Santhosh Vasabhaktula <[email protected]>
3 people authored Oct 7, 2024
1 parent c311704 commit 8185b31
Showing 19 changed files with 408 additions and 340 deletions.
56 changes: 35 additions & 21 deletions Dockerfile
@@ -1,47 +1,61 @@
-FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-core
+FROM maven:3.9.4-eclipse-temurin-11-focal AS build-core
 COPY . /app
 RUN mvn clean install -DskipTests -f /app/framework/pom.xml
 RUN mvn clean install -DskipTests -f /app/dataset-registry/pom.xml
 RUN mvn clean install -DskipTests -f /app/transformation-sdk/pom.xml

-FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-pipeline
+FROM maven:3.9.4-eclipse-temurin-11-focal AS build-pipeline
 COPY --from=build-core /root/.m2 /root/.m2
 COPY . /app
 RUN mvn clean package -DskipTests -f /app/pipeline/pom.xml

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as extractor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS extractor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/extractor/target/extractor-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/extractor/target/extractor-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as preprocessor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS preprocessor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/preprocessor/target/preprocessor-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/preprocessor/target/preprocessor-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as denormalizer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS denormalizer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/denormalizer/target/denormalizer-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/denormalizer/target/denormalizer-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as transformer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS transformer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/transformer/target/transformer-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/transformer/target/transformer-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as router-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS router-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as unified-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS unified-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/unified-pipeline/target/unified-pipeline-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/unified-pipeline/target/unified-pipeline-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as master-data-processor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS master-data-processor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/lib
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/usrlib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS lakehouse-connector-image
 USER flink
-RUN mkdir $FLINK_HOME/custom-lib
-COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib
+RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.17.2/flink-s3-fs-hadoop-1.17.2.jar
+RUN wget https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.15.0/hudi-flink1.17-bundle-0.15.0.jar
+RUN mv flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
+RUN mv flink-s3-fs-hadoop-1.17.2.jar $FLINK_HOME/lib
+RUN mv hudi-flink1.17-bundle-0.15.0.jar $FLINK_HOME/lib
+# RUN mkdir $FLINK_HOME/custom-lib
+COPY --from=build-pipeline /app/pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/lib

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS cache-indexer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/usrlib/
22 changes: 18 additions & 4 deletions data-products/pom.xml
@@ -15,8 +15,22 @@
 <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
 <java.target.runtime>11</java.target.runtime>
 <log4j.version>2.14.1</log4j.version>
+<!-- <jackson-bom.version>2.13.4.20221013</jackson-bom.version> -->
+<jackson-bom.version>2.17.2</jackson-bom.version>
 </properties>

+<dependencyManagement>
+<dependencies>
+<dependency>
+<groupId>com.fasterxml.jackson</groupId>
+<artifactId>jackson-bom</artifactId>
+<version>${jackson-bom.version}</version>
+<type>pom</type>
+<scope>import</scope>
+</dependency>
+</dependencies>
+</dependencyManagement>
+
 <dependencies>
 <dependency>
 <groupId>org.apache.spark</groupId>
@@ -67,17 +81,17 @@
 <dependency>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-annotations</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 </dependency>
 <dependency>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-core</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 </dependency>
 <dependency>
 <groupId>com.fasterxml.jackson.module</groupId>
 <artifactId>jackson-module-scala_${scala.maj.version}</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 <exclusions>
 <exclusion>
 <groupId>com.fasterxml.jackson.core</groupId>
@@ -88,7 +102,7 @@
 <dependency>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-databind</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 </dependency>
 <dependency>
 <groupId>org.sunbird.obsrv</groupId>
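Note on the change above: importing the jackson-bom and commenting out the hard-coded 2.15.2 pins hands every Jackson artifact (core, databind, annotations, module-scala) to a single BOM-managed version, 2.17.2 — the usual cure for mixed-version Jackson classpaths after a Flink upgrade. The same BOM import is repeated in framework/pom.xml below. A quick diagnostic sketch in Scala (illustrative only, not part of this commit) to confirm the alignment at runtime:

import com.fasterxml.jackson.core.json.{PackageVersion => CoreVersion}
import com.fasterxml.jackson.databind.cfg.{PackageVersion => DatabindVersion}

object JacksonVersionCheck extends App {
  // Both should report 2.17.2 once the BOM governs the classpath.
  println(s"jackson-core:     ${CoreVersion.VERSION}")
  println(s"jackson-databind: ${DatabindVersion.VERSION}")
  assert(CoreVersion.VERSION.toString == DatabindVersion.VERSION.toString,
    "mixed Jackson versions on the classpath")
}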
8 changes: 8 additions & 0 deletions dataset-registry/pom.xml
@@ -13,6 +13,7 @@
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <scala.maj.version>2.12</scala.maj.version>
 <scala.version>2.12.11</scala.version>
+<flink.version>1.17.2</flink.version>
 <java.target.runtime>11</java.target.runtime>
 <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
 <release-version>release-4.6.0</release-version>
@@ -80,6 +81,13 @@
 <type>test-jar</type>
 <scope>test</scope>
 </dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-runtime</artifactId>
+<version>${flink.version}</version>
+<scope>test</scope>
+<classifier>tests</classifier>
+</dependency>
 </dependencies>

 <build>
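The flink-runtime test-jar added above is what provides org.apache.flink.runtime.testutils.InMemoryReporter, which the test base below uses to read metrics. A minimal wiring sketch (assumes InMemoryReporter.createWithRetainedMetrics() and addToConfiguration(...) from Flink's test utilities; the job under test is elided):

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.InMemoryReporter
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Collect metrics in memory instead of exporting them, so tests can assert on counters.
val metricsReporter = InMemoryReporter.createWithRetainedMetrics()
val conf = new Configuration()
metricsReporter.addToConfiguration(conf)

// Local environment backed by the reporter; run the pipeline under test against it.
val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf)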
BaseSpecWithDatasetRegistry.scala
@@ -1,9 +1,12 @@
 package org.sunbird.obsrv.spec

 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.api.scala.metrics.ScalaGauge
+import org.apache.flink.runtime.testutils.InMemoryReporter
 import org.sunbird.obsrv.core.util.{PostgresConnect, PostgresConnectionConfig}
 import org.sunbird.spec.BaseSpecWithPostgres

+import scala.collection.JavaConverters._
 import scala.collection.mutable

 class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {
@@ -42,8 +45,8 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {
 }

 private def insertTestData(postgresConnect: PostgresConnect): Unit = {
-postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d1', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());")
-postgresConnect.execute("update datasets set denorm_config = '{\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"denorm_fields\":[{\"denorm_key\":\"vehicleCode\",\"redis_db\":2,\"denorm_out_field\":\"vehicleData\"}]}' where id='d1';")
+postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d1', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());")
+postgresConnect.execute("update datasets set denorm_config = '{\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"denorm_fields\":[{\"denorm_key\":\"vehicleCode\",\"redis_db\":2,\"denorm_out_field\":\"vehicleData\"}]}' where id='d1';")
 postgresConnect.execute("insert into dataset_transformations values('tf1', 'd1', 'dealer.email', '{\"type\":\"mask\",\"expr\":\"dealer.email\"}', 'Strict', 'System', 'System', now(), now());")
 postgresConnect.execute("insert into dataset_transformations values('tf2', 'd1', 'dealer.maskedPhone', '{\"type\":\"mask\",\"expr\": \"dealer.phone\"}', null, 'System', 'System', now(), now());")
 postgresConnect.execute("insert into datasets(id, type, data_schema, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date, tags) values ('d2', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now(), ARRAY['Tag1','Tag2']);")
@@ -60,4 +63,20 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {
 }).groupBy(f => f._1).mapValues(f => f.map(p => (p._2, p._3, p._4))).mapValues(f => f.groupBy(p => p._1).mapValues(q => q.map(r => (r._2, r._3)).toMap))
 }

+def getMetrics(metricsReporter: InMemoryReporter, dataset: String, debug: Option[Boolean] = None): Map[String, Long] = {
+val groups = metricsReporter.findGroups(dataset).asScala
+groups.map(group => metricsReporter.getMetricsByGroup(group).asScala)
+.map(group => group.map { case (k, v) =>
+val value = if(v.isInstanceOf[ScalaGauge[Long]]) v.asInstanceOf[ScalaGauge[Long]].getValue() else 0
+if (debug.isDefined && debug.get)
+Console.println("Metric", k, value)
+k -> value
+})
+.map(f => f.toMap)
+.reduce((map1, map2) => {
+val mergedMap = map2.map { case (k: String, v: Long) => k -> (v + map1.getOrElse(k, 0l)) }
+map1 ++ mergedMap
+})
+}
+
 }
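With the reporter wired in, the new getMetrics helper flattens every metric group matching a dataset id into a single Map[String, Long], summing gauge values that appear in more than one group. A usage sketch (the metric name is a placeholder, not necessarily one the pipeline emits):

// After env.execute(...) has run the job under test:
val metrics = getMetrics(metricsReporter, "d1", debug = Some(true))

// Assert on a flattened gauge; "validator-total-count" is illustrative only.
assert(metrics.getOrElse("validator-total-count", 0L) > 0L)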
31 changes: 29 additions & 2 deletions framework/pom.xml
@@ -19,7 +19,20 @@
 <jackson-jaxrs.version>1.9.13</jackson-jaxrs.version>
 <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
 <release-version>release-4.6.0</release-version>
+<!-- <jackson-bom.version>2.13.4.20221013</jackson-bom.version> -->
+<jackson-bom.version>2.17.2</jackson-bom.version>
 </properties>
+<dependencyManagement>
+<dependencies>
+<dependency>
+<groupId>com.fasterxml.jackson</groupId>
+<artifactId>jackson-bom</artifactId>
+<version>${jackson-bom.version}</version>
+<type>pom</type>
+<scope>import</scope>
+</dependency>
+</dependencies>
+</dependencyManagement>
 <dependencies>
 <dependency>
 <groupId>org.apache.flink</groupId>
@@ -30,6 +43,12 @@
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka</artifactId>
 <version>${flink.version}</version>
+<exclusions>
+<exclusion>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.kafka</groupId>
@@ -54,17 +73,25 @@
 <dependency>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-databind</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 </dependency>
 <dependency>
 <groupId>com.fasterxml.jackson.module</groupId>
 <artifactId>jackson-module-scala_${scala.maj.version}</artifactId>
-<version>2.15.2</version>
+<!-- <version>2.15.2</version> -->
 <exclusions>
 <exclusion>
 <groupId>com.fasterxml.jackson.core</groupId>
 <artifactId>jackson-databind</artifactId>
 </exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-core</artifactId>
+</exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-annotations</artifactId>
+</exclusion>
 </exclusions>
 </dependency>
 <dependency>
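The kafka-clients exclusion above ensures the explicitly declared org.apache.kafka dependency in this pom wins over the transitive version pulled in by flink-connector-kafka. A one-line diagnostic sketch (not project code) to confirm which client version actually lands on the classpath:

import org.apache.kafka.common.utils.AppInfoParser

// Prints the kafka-clients version resolved at runtime.
println(s"kafka-clients version: ${AppInfoParser.getVersion}")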