From 811ce895eecd54c6f7bd9c792a7b21b7c4e35091 Mon Sep 17 00:00:00 2001 From: JeraldJF Date: Thu, 3 Oct 2024 13:27:58 +0530 Subject: [PATCH 1/5] #OBS-I230: fix: kafka command fixes --- command-service/src/command/kafka_command.py | 50 ++++++++++--------- command-service/src/config/service_config.yml | 4 +- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/command-service/src/command/kafka_command.py b/command-service/src/command/kafka_command.py index f27b583c..13691e1f 100644 --- a/command-service/src/command/kafka_command.py +++ b/command-service/src/command/kafka_command.py @@ -3,7 +3,7 @@ from config import Config from model.data_models import Action, ActionResponse, CommandPayload from service.http_service import HttpService -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import AdminClient, NewTopic, KafkaError from command.dataset_command import DatasetCommand class KafkaCommand(ICommand): @@ -14,36 +14,38 @@ def __init__(self, config: Config, http_service: HttpService, dataset_command: D def execute(self, command_payload: CommandPayload, action: Action): result = None - if action == Action.CREATE_KAFKA_TOPIC.name: - print( - f"Invoking CREATE_KAFKA_TOPIC command for dataset_id {command_payload.dataset_id}..." - ) - self.config_obj = Config() - dataset_id = command_payload.dataset_id - live_dataset, data_version = self.dataset_command._check_for_live_record( - dataset_id - ) - topic = live_dataset.router_config['topic'] - brokers = self.config_obj.find("kafka.brokers") - print(f"broker", brokers) - result = self.create_kafka_topic(topic, brokers, 1, 1) - return result + dataset_id = command_payload.dataset_id + live_dataset, data_version = self.dataset_command._check_for_live_record(dataset_id) + if live_dataset is None: + if action == Action.CREATE_KAFKA_TOPIC.name: + print( + f"Invoking CREATE_KAFKA_TOPIC command for dataset_id {dataset_id}..." + ) + draft_dataset_record = self.dataset_command._get_draft_dataset(dataset_id) + topic = draft_dataset_record.get("router_config")["topic"] + broker = self.config.find("kafka.brokers") + num_partitions = self.config.find("kafka.no_of_partitions") + replication_factor = self.config.find("kafka.replication_factor") + print(f"topic is", topic) + result = self.create_kafka_topic(topic, broker, num_partitions, replication_factor) + return result + return ActionResponse(status="OK", status_code=200) def create_kafka_topic(self, topic, broker, num_partitions, replication_factor): admin_client = AdminClient({'bootstrap.servers': broker}) - print(f"topic is",topic) new_topic = [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)] + errValue = ActionResponse(status="ERROR", status_code=500) try: fs = admin_client.create_topics(new_topic) for topic, f in fs.items(): - try: - f.result() - print(f"Topic '{topic}' created successfully") - except Exception as e: - print(f"Failed to create topic '{topic}': {e}") + f.result() + print(f"Topic '{topic}' created successfully") + return ActionResponse(status="OK", status_code=200) + except KafkaError as topicEx: + print(f"Error:", topicEx) + return errValue except Exception as e: - print(f"Error creating topic: {e}") - - return ActionResponse(status="OK", status_code=200) \ No newline at end of file + print(f"Error creating topic: {e}") + return errValue \ No newline at end of file diff --git a/command-service/src/config/service_config.yml b/command-service/src/config/service_config.yml index 6f12a413..80276238 100644 --- a/command-service/src/config/service_config.yml +++ b/command-service/src/config/service_config.yml @@ -15,8 +15,8 @@ flink: commands: PUBLISH_DATASET: workflow: - - MAKE_DATASET_LIVE - CREATE_KAFKA_TOPIC + - MAKE_DATASET_LIVE - SUBMIT_INGESTION_TASKS - START_PIPELINE_JOBS - DEPLOY_CONNECTORS @@ -167,6 +167,8 @@ kafka: brokers: localhost:9092 telemetry: topic: system.telemetry.events + replication_factor: 1 + no_of_partitions: 1 connector_jobs: spark: From 8b0ec90cfc43b1074d0296eed14ce536dcdbb321 Mon Sep 17 00:00:00 2001 From: JeraldJF Date: Fri, 4 Oct 2024 15:13:59 +0530 Subject: [PATCH 2/5] #OBS-I247: fix: feedback changes. --- command-service/src/command/kafka_command.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/command-service/src/command/kafka_command.py b/command-service/src/command/kafka_command.py index 13691e1f..60578d5c 100644 --- a/command-service/src/command/kafka_command.py +++ b/command-service/src/command/kafka_command.py @@ -3,7 +3,7 @@ from config import Config from model.data_models import Action, ActionResponse, CommandPayload from service.http_service import HttpService -from confluent_kafka.admin import AdminClient, NewTopic, KafkaError +from confluent_kafka.admin import AdminClient, NewTopic, KafkaError, KafkaException from command.dataset_command import DatasetCommand class KafkaCommand(ICommand): @@ -33,18 +33,21 @@ def execute(self, command_payload: CommandPayload, action: Action): def create_kafka_topic(self, topic, broker, num_partitions, replication_factor): - admin_client = AdminClient({'bootstrap.servers': broker}) - new_topic = [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)] errValue = ActionResponse(status="ERROR", status_code=500) try: + admin_client = AdminClient({'bootstrap.servers': broker}) + new_topic = [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)] fs = admin_client.create_topics(new_topic) for topic, f in fs.items(): f.result() print(f"Topic '{topic}' created successfully") return ActionResponse(status="OK", status_code=200) - except KafkaError as topicEx: - print(f"Error:", topicEx) + except (KafkaError, KafkaException) as kafkaErr: + print(f"Kafka exception:", kafkaErr) + return errValue + except RuntimeError as e: + print(f"Runtime exception: {e}") return errValue except Exception as e: print(f"Error creating topic: {e}") From e9a35f19af5a6405e5060a2bfd10cf21164c6d92 Mon Sep 17 00:00:00 2001 From: JeraldJF Date: Tue, 3 Dec 2024 11:11:26 +0530 Subject: [PATCH 3/5] #OBS-I375: fix: fixing the expression field having spaces within it in the ingestion schema. --- api-service/src/services/TableGenerator.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api-service/src/services/TableGenerator.ts b/api-service/src/services/TableGenerator.ts index 7030ad95..270bfec1 100644 --- a/api-service/src/services/TableGenerator.ts +++ b/api-service/src/services/TableGenerator.ts @@ -58,7 +58,7 @@ class BaseTableGenerator { const properties = this.flattenSchema(denormDataset.data_schema, type); const transformProps = _.map(properties, (prop) => { _.set(prop, "name", _.join([denormField.denorm_out_field, prop.name], ".")); - _.set(prop, "expr", _.replace(prop.expr, "$", "$." + denormField.denorm_out_field)); + _.set(prop, "expr", _.replace(prop.expr, "$", "$." + `['${denormField.denorm_out_field}']`)); return prop; }); dataFields.push(...transformProps); @@ -66,7 +66,7 @@ class BaseTableGenerator { } if (!_.isEmpty(transformations_config)) { const transformationFields = _.map(transformations_config, (tf) => ({ - expr: "$." + tf.field_key, + expr: "$." + `['${tf.field_key}']`, name: tf.field_key, data_type: tf.transformation_function.datatype, arrival_format: tf.transformation_function.datatype, @@ -90,7 +90,7 @@ class BaseTableGenerator { const properties = this.flattenSchema(denormDataset.data_schema, type); const transformProps = _.map(properties, (prop) => { _.set(prop, "name", _.join([_.replace(denormField.denorm_out_field, /\./g, "_"), prop.name], "_")); - _.set(prop, "expr", _.replace(prop.expr, "$", "$." + denormField.denorm_out_field)); + _.set(prop, "expr", _.replace(prop.expr, "$", "$." + `['${denormField.denorm_out_field}']`)); return prop; }); dataFields.push(...transformProps); @@ -98,7 +98,7 @@ class BaseTableGenerator { } if (!_.isEmpty(transformations_config)) { const transformationFields = _.map(transformations_config, (tf) => ({ - expr: "$." + tf.field_key, + expr: "$." + `['${tf.field_key}']`, name: _.replace(tf.field_key, /\./g, "_"), data_type: tf.transformation_function.datatype, arrival_format: tf.transformation_function.datatype, From 509f88d35814f6156897f3e4dd7d774bd8ae8f3b Mon Sep 17 00:00:00 2001 From: JeraldJF Date: Tue, 3 Dec 2024 11:37:13 +0530 Subject: [PATCH 4/5] #OBS-I375: fix: reverting changes for hudi spec --- api-service/src/services/TableGenerator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api-service/src/services/TableGenerator.ts b/api-service/src/services/TableGenerator.ts index 270bfec1..661a3919 100644 --- a/api-service/src/services/TableGenerator.ts +++ b/api-service/src/services/TableGenerator.ts @@ -90,7 +90,7 @@ class BaseTableGenerator { const properties = this.flattenSchema(denormDataset.data_schema, type); const transformProps = _.map(properties, (prop) => { _.set(prop, "name", _.join([_.replace(denormField.denorm_out_field, /\./g, "_"), prop.name], "_")); - _.set(prop, "expr", _.replace(prop.expr, "$", "$." + `['${denormField.denorm_out_field}']`)); + _.set(prop, "expr", _.replace(prop.expr, "$", "$." + denormField.denorm_out_field)); return prop; }); dataFields.push(...transformProps); @@ -98,7 +98,7 @@ class BaseTableGenerator { } if (!_.isEmpty(transformations_config)) { const transformationFields = _.map(transformations_config, (tf) => ({ - expr: "$." + `['${tf.field_key}']`, + expr: "$." + tf.field_key, name: _.replace(tf.field_key, /\./g, "_"), data_type: tf.transformation_function.datatype, arrival_format: tf.transformation_function.datatype, From 3ed101885d13709a0719ba738e3fd135b9bf0af7 Mon Sep 17 00:00:00 2001 From: JeraldJF Date: Tue, 3 Dec 2024 13:18:51 +0530 Subject: [PATCH 5/5] #OBS-I369: fix: aws storage service changes --- api-service/src/services/CloudServices/AWSStorageService.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api-service/src/services/CloudServices/AWSStorageService.ts b/api-service/src/services/CloudServices/AWSStorageService.ts index d83b65b1..61e6f627 100644 --- a/api-service/src/services/CloudServices/AWSStorageService.ts +++ b/api-service/src/services/CloudServices/AWSStorageService.ts @@ -16,10 +16,14 @@ export class AWSStorageService implements ICloudService { const accessKeyId = _.get(config, "identity") const secretAccessKey = _.get(config, "credential") const endpoint = _.get(config, "endpoint") + const s3ForcePathStyle = _.get(config, "s3ForcePathStyle") const configuration: any = { region, credentials: { accessKeyId, secretAccessKey } } if(endpoint) { configuration.endpoint = endpoint; } + if (s3ForcePathStyle) { + configuration.s3ForcePathStyle = s3ForcePathStyle; + } try { this.client = new S3Client(configuration); }