diff --git a/api-service/src/configs/Config.ts b/api-service/src/configs/Config.ts
index 51e0a7ab..cd74c111 100644
--- a/api-service/src/configs/Config.ts
+++ b/api-service/src/configs/Config.ts
@@ -42,8 +42,8 @@ export const config = {
             "connectionTimeout": process.env.kafka_connection_timeout ? parseInt(process.env.kafka_connection_timeout) : 5000
         },
         "topics": { // Default Kafka topics depend on type of dataset.
-            "createDataset": `${process.env.system_env || "local"}.ingest`,
-            "createMasterDataset": `${process.env.system_env || "local"}.masterdata.ingest`
+            "createDataset": `ingest`,
+            "createMasterDataset": `masterdata.ingest`
         }
     }
 },
diff --git a/command-service/src/command/connector_command.py b/command-service/src/command/connector_command.py
index 4a92ee17..b5073665 100644
--- a/command-service/src/command/connector_command.py
+++ b/command-service/src/command/connector_command.py
@@ -9,6 +9,8 @@
 from model.db_models import ConnectorInstance
 from service.db_service import DatabaseService
 import time
+from random import choice
+from string import ascii_lowercase
 
 
 class ConnectorCommand(ICommand):
@@ -94,12 +96,12 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
         registered_connectors = self._get_registered_connectors(runtime="flink")
         print("Registered flink connectors: ", registered_connectors)
         for connector in registered_connectors:
-            if connector["instance_count"] == 0:
+            if connector[1] == 0:
                 ### Uninstall the helm chart
                 helm_uninstall_cmd = [
                     "helm",
                     "uninstall",
-                    connector["id"].replace(".", "-").replace("_", "-"),
+                    connector[0].replace(".", "-").replace("_", "-"),
                     "--namespace",
                     flink_namespace,
                 ]
@@ -111,9 +113,9 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
                     stderr=subprocess.PIPE,
                 )
                 if helm_uninstall_result.returncode == 0:
-                    print(f"Successfully uninstalled deployment '{connector['id']}'...")
+                    print(f"Successfully uninstalled deployment '{connector[0]}'...")
                 else:
-                    print(f"Error uninstalling deployment '{connector['id']}': {helm_uninstall_result.stderr.decode()}")
+                    print(f"Error uninstalling deployment '{connector[0]}': {helm_uninstall_result.stderr.decode()}")
 
     def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
         result = None
@@ -130,6 +132,9 @@ def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
                 )
                 break
 
+        if result is None:
+            result = ActionResponse(status="OK", status_code=200)
+
         # if is_masterdata:
         #     print("Installing masterdata job")
         #     masterdata_jar_config = self.config.find("masterdata_job")
@@ -178,7 +183,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
             )
 
             if err is None:
-                result = ActionResponse(status="OK", status_code=200)
+                result = ActionResponse(status="OK", status_code=200)
 
             return result
         else:
@@ -244,10 +249,9 @@ def _perform_spark_install(self, dataset_id, connector_instance):
         err = None
         result = None
-        release_name = "{}-{}-{}".format(
-            dataset_id[:8].lower().replace("_", "-"),
-            connector_instance.connector_id[:8].lower().replace("_", "-"),
-            str(time.time_ns())
+        release_name = "{}-{}".format(
+            (dataset_id + "_" + connector_instance.connector_id).lower().replace("_", "-")[:54],
+            "".join(choice(ascii_lowercase) for i in range(6))
         )
         connector_source = connector_instance.connector_source
         schedule = connector_instance.operations_config["schedule"]
@@ -362,10 +366,10 @@ def _get_registered_connectors(self, runtime):
         query = f"""
             SELECT cr.id, COUNT(ci.id) AS instance_count
             FROM connector_registry cr
-            LEFT JOIN connector_instances ci ON cr.id = ci.connector_id
+            LEFT JOIN connector_instances ci ON (cr.id = ci.connector_id AND ci.status = %s)
             WHERE cr.runtime = %s
             GROUP BY cr.id
         """
-        params = (runtime,)
+        params = (DatasetStatusType.Live.name, runtime)
         rows = self.db_service.execute_select_all(sql=query, params=params)
         return rows
\ No newline at end of file
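
Note on the Spark release-name change: instead of truncating each id to 8 characters and appending a nanosecond timestamp, the name is now the combined dataset and connector id (lowercased, underscores replaced, capped at 54 characters) plus a 6-character random suffix, which keeps the name short, DNS-safe, and unique across repeated installs. A minimal standalone sketch of the scheme, assuming the same logic as the hunk above (the helper name and example ids are hypothetical):

```python
from random import choice
from string import ascii_lowercase

# Hypothetical helper mirroring the release-name logic in _perform_spark_install.
def make_release_name(dataset_id: str, connector_id: str) -> str:
    # Join the ids, lowercase, swap underscores for hyphens (underscores are
    # not valid in Kubernetes resource names), and cap the base at 54 chars.
    base = (dataset_id + "_" + connector_id).lower().replace("_", "-")[:54]
    # A short random suffix replaces time.time_ns(), bounding total length
    # at 61 characters while still avoiding collisions between installs.
    suffix = "".join(choice(ascii_lowercase) for _ in range(6))
    return "{}-{}".format(base, suffix)

# Example with made-up ids:
print(make_release_name("nyc_taxi_trips", "kafka_connector"))
# e.g. "nyc-taxi-trips-kafka-connector-qwerty"
```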
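The `_get_registered_connectors` change puts the status filter inside the LEFT JOIN's ON clause rather than the WHERE clause. This matters because `_stop_connector_jobs` checks `connector[1] == 0`: only an ON-clause filter keeps connectors with zero live instances in the result set. A small self-contained illustration using SQLite with hypothetical rows (the production code runs the query against `db_service`, not SQLite):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE connector_registry (id TEXT, runtime TEXT);
    CREATE TABLE connector_instances (id TEXT, connector_id TEXT, status TEXT);
    -- Hypothetical rows: one connector has only a retired instance.
    INSERT INTO connector_registry VALUES ('kafka-conn', 'flink'), ('s3-conn', 'flink');
    INSERT INTO connector_instances VALUES
        ('i1', 'kafka-conn', 'Live'),
        ('i2', 's3-conn', 'Retired');
""")

# Status filter in the ON clause: connectors with no Live instances survive
# the outer join and report instance_count = 0, enabling the cleanup check.
rows = conn.execute("""
    SELECT cr.id, COUNT(ci.id) AS instance_count
    FROM connector_registry cr
    LEFT JOIN connector_instances ci
           ON (cr.id = ci.connector_id AND ci.status = 'Live')
    WHERE cr.runtime = 'flink'
    GROUP BY cr.id
""").fetchall()
print(rows)  # [('kafka-conn', 1), ('s3-conn', 0)] (row order may vary)

# Moving "ci.status = 'Live'" into the WHERE clause would instead drop
# 's3-conn' entirely, since its NULL ci.status fails the predicate.
```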