From ae4824c6d001af0f21d0d51afe69eaf910289506 Mon Sep 17 00:00:00 2001
From: Ravi Mula
Date: Tue, 24 Dec 2024 11:45:24 +0530
Subject: [PATCH 1/3] #OBS-I441 fix spark connector deployments

---
 .../flink-connector/templates/deployment.yaml |   5 +-
 .../helm-charts/flink-connector/values.yaml   |   2 +
 .../templates/cronjob.yaml                    |   6 +-
 .../spark-connector-cron/values.yaml          |   3 +
 .../src/command/connector_command.py          | 106 +++++++++---------
 .../src/command/connector_registry.py         |   8 +-
 6 files changed, 70 insertions(+), 60 deletions(-)

diff --git a/command-service/helm-charts/flink-connector/templates/deployment.yaml b/command-service/helm-charts/flink-connector/templates/deployment.yaml
index 74fc9cca..99ceee79 100644
--- a/command-service/helm-charts/flink-connector/templates/deployment.yaml
+++ b/command-service/helm-charts/flink-connector/templates/deployment.yaml
@@ -104,9 +104,10 @@ spec:
       labels:
         app.kubernetes.io/name: {{ include "common.names.name" . }}
         app.kubernetes.io/component: {{ printf "%s-%s" $jobName $component }}
-        component: {{ printf "%s-%s" $jobName $component }}
+        dataset: {{ .Values.dataset_id }}
+        connector: {{ .Values.connector_id }}
       annotations:
-        checksum/config: {{ .Files.Glob "cFonfigs/*" | toYaml | sha256sum }}
+        checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
         checksum/job-config: {{ $jobData | toYaml | sha256sum }}
     spec:
       serviceAccountName: {{ include "base.serviceaccountname" . }}
diff --git a/command-service/helm-charts/flink-connector/values.yaml b/command-service/helm-charts/flink-connector/values.yaml
index 3076a343..fd9dcda8 100644
--- a/command-service/helm-charts/flink-connector/values.yaml
+++ b/command-service/helm-charts/flink-connector/values.yaml
@@ -246,6 +246,8 @@ serviceMonitor:
 # override flink_jobs
 # flink_jobs:
 
+dataset_id: ""
+connector_id: ""
 
 commonAnnotations:
   reloader.stakater.com/auto: "true"
\ No newline at end of file
diff --git a/command-service/helm-charts/spark-connector-cron/templates/cronjob.yaml b/command-service/helm-charts/spark-connector-cron/templates/cronjob.yaml
index 7d35b32b..a80831fc 100644
--- a/command-service/helm-charts/spark-connector-cron/templates/cronjob.yaml
+++ b/command-service/helm-charts/spark-connector-cron/templates/cronjob.yaml
@@ -1,9 +1,11 @@
 apiVersion: batch/v1
 kind: CronJob
 metadata:
-  name: {{ include "base.cronReleaseName" . }}
+  name: {{ .Release.Name }}
   namespace: {{ include "base.namespace" . }}
   labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
+    app.kubernetes.io/dataset: {{ .Values.dataset_id }}
+    app.kubernetes.io/connector: {{ .Values.connector_id }}
   {{- if .Values.commonAnnotations }}
   annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
   {{- end }}
@@ -19,6 +21,8 @@ spec:
           {{- include "common.tplvalues.render" (dict "value" .Values.podAnnotations "context" $) | nindent 12 }}
           {{- end }}
           labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 12 }}
+            app.kubernetes.io/dataset: {{ .Values.dataset_id }}
+            app.kubernetes.io/connector: {{ .Values.connector_id }}
         spec:
           serviceAccountName: {{ .Values.serviceAccount.name }}
           restartPolicy: {{ .Values.restartPolicy }}
diff --git a/command-service/helm-charts/spark-connector-cron/values.yaml b/command-service/helm-charts/spark-connector-cron/values.yaml
index 7ed3ba10..8069fe0e 100644
--- a/command-service/helm-charts/spark-connector-cron/values.yaml
+++ b/command-service/helm-charts/spark-connector-cron/values.yaml
@@ -142,6 +142,9 @@ instance_id: nyt-psql.1
 main_class: org.sunbird.obsrv.connector.JDBCConnector
 main_file: jdbc-connector-1.0.0.jar
 
+dataset_id: ""
+connector_id: ""
+
 ## Object Store Connector
 # technology: python
 # connector-source: object_store_connector-0.1.0
diff --git a/command-service/src/command/connector_command.py b/command-service/src/command/connector_command.py
index 8f5b145e..f91313c4 100644
--- a/command-service/src/command/connector_command.py
+++ b/command-service/src/command/connector_command.py
@@ -8,6 +8,7 @@
 from model.data_models import Action, ActionResponse, CommandPayload, DatasetStatusType
 from model.db_models import ConnectorInstance
 from service.db_service import DatabaseService
+from datetime import datetime
 
 
 class ConnectorCommand(ICommand):
@@ -47,6 +48,7 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
        print(f"Uninstalling jobs for {namespace}..")
         base_helm_chart = self.connector_job_config["spark"]["base_helm_chart"]
 
+
         # managed_releases = []
         # connector_jar_config = self.config.find("connector_job")
         # masterdata_jar_config = self.config.find("masterdata_job")
@@ -57,6 +59,40 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
         # for release in masterdata_jar_config:
         #     managed_releases.append(release["release_name"])
 
+        deployed_cron_jobs_cmd = [
+            "kubectl", "get", "cronjobs", "-n", namespace,
+            "--selector", f"app.kubernetes.io/dataset={dataset_id}",
+            "-o", "jsonpath=\"{.items[*].metadata.name}\""
+        ]
+        print(" ".join(deployed_cron_jobs_cmd))
+
+        deployed_cron_jobs_result = subprocess.run(
+            deployed_cron_jobs_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+        )
+
+        if deployed_cron_jobs_result.returncode == 0 and len(deployed_cron_jobs_result.stdout.decode().replace("\"", "").splitlines()):
+            jobs = deployed_cron_jobs_result.stdout.decode().replace("\"", "").splitlines()[0].split()
+            for job in jobs:
+                print(f"Uninstalling job {job} related to dataset '{dataset_id}'...")
+                helm_uninstall_cmd = [
+                    "helm",
+                    "uninstall",
+                    job,
+                    "--namespace",
+                    namespace,
+                ]
+                print("Uninstall command: ", " ".join(helm_uninstall_cmd))
+                helm_uninstall_result = subprocess.run(
+                    helm_uninstall_cmd,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                )
+                if helm_uninstall_result.returncode == 0:
+                    print(f"Successfully uninstalled job '{job}'...")
+                else:
+                    print(f"Error uninstalling job '{job}': {helm_uninstall_result.stderr.decode()}")
+
+        ## TODO: Add flink uninstall logic
         helm_ls_cmd = ["helm", "ls", "--namespace", namespace]
         helm_ls_result = subprocess.run(
             helm_ls_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@@ -67,7 +103,7 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
             spark_connector = {connector.id for connector in active_connectors if connector.connector_runtime == "spark"}
             for release_name in spark_connector:
                 if release_name in job_names:
-                    print(f"Uninstalling job {release_name} related to dataset'{dataset_id}'...")
+                    print(f"Uninstalling job {release_name} related to dataset '{dataset_id}'...")
                     helm_uninstall_cmd = [
                         "helm",
                         "uninstall",
@@ -75,6 +111,7 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
                         "--namespace",
                         namespace,
                     ]
+                    print("Uninstall command: ", " ".join(helm_uninstall_cmd))
                     helm_uninstall_result = subprocess.run(
                         helm_uninstall_cmd,
                         stdout=subprocess.PIPE,
@@ -84,7 +121,7 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
                         print(f"Successfully uninstalled job '{release_name}'...")
                     else:
                         print(f"Error uninstalling job '{release_name}': {helm_uninstall_result.stderr.decode()}")
-                    
+
     def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
         result = None
         for connector in active_connectors:
@@ -125,7 +162,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
             deployment_exists = any(job_name in line for line in jobs.splitlines()[1:])
 
             if deployment_exists:
-                restart_cmd = f"kubectl delete pods --selector app.kubernetes.io/name=flink,component={job_name}-jobmanager --namespace {namespace} && kubectl delete pods --selector app.kubernetes.io/name=flink,component={job_name}-taskmanager --namespace {namespace}".format(
+                restart_cmd = f"kubectl delete pods --selector app.kubernetes.io/name=flink,app.kubernetes.io/component={job_name}-jobmanager --namespace {namespace} && kubectl delete pods --selector app.kubernetes.io/name=flink,app.kubernetes.io/component={job_name}-taskmanager --namespace {namespace}".format(
                     namespace=namespace, job_name=job_name
                 )
                 print("Restart command: ", restart_cmd)
@@ -140,12 +177,12 @@ def _perform_flink_install(self, dataset_id, connector_instance):
                     print(f"Job {job_name} restart succeeded...")
                 else:
                     err = True
+                    print(f"Error restarting pod: {helm_ls_result.stderr.decode()}")
                     return ActionResponse(
                         status="ERROR",
                         status_code=500,
                         error_message="FLINK_HELM_LIST_EXCEPTION",
                     )
-                    print(f"Error restarting pod: {helm_ls_result.stderr.decode()}")
 
         if err is None:
             result = ActionResponse(status="OK", status_code=200)
@@ -172,6 +209,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
                 "--namespace",
                 namespace,
                 "--create-namespace",
+                "--set", "namespace={}".format(namespace),
                 "--set-json",
                 f"""flink_jobs={set_json_value.replace(" ", "")}"""
             ]
@@ -202,7 +240,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
 
                 return result
             else:
-                self._stop_connector_jobs(is_masterdata=False, namespace="flink")
+                self._stop_connector_jobs(is_masterdata=False, namespace=self.connector_job_config["flink"]["namespace"], active_connectors=[connector_instance], dataset_id=dataset_id)
         else:
             print(f"Error checking Flink deployments: {helm_ls_result.stderr.decode()}")
             return ActionResponse(
@@ -214,7 +252,11 @@
     def _perform_spark_install(self, dataset_id, connector_instance):
         err = None
         result = None
-        release_name = connector_instance.id
+        release_name = "{}-{}-{}".format(
+            dataset_id[:8].lower().replace("_", "-"),
+            connector_instance.connector_id[:8].lower().replace("_", "-"),
+            datetime.now().strftime("%Y%m%d%H%M%S")
+        )
         connector_source = connector_instance.connector_source
         schedule = connector_instance.operations_config["schedule"]
 
@@ -239,7 +281,11 @@ def _perform_spark_install(self, dataset_id, connector_instance):
             "--set",
             "technology={}".format(connector_instance.technology),
             "--set",
-            "instance_id={}".format(release_name),
+            "dataset_id={}".format(dataset_id),
+            "--set",
+            "connector_id={}".format(connector_instance.connector_id),
+            "--set",
+            "instance_id={}".format(connector_instance.id),
             "--set",
             "connector_source={}".format(connector_source["source"]),
             "--set",
@@ -320,49 +366,3 @@ def _get_live_instances(self, runtime, connector_instance):
                 has_live_instances = True
 
         return has_live_instances
-
-    # def _perform_install(self, release):
-    #     err = None
-    #     result = None
-    #     release_name = release["release_name"]
-    #     helm_install_cmd = [
-    #         "helm",
-    #         "upgrade",
-    #         "--install",
-    #         release_name,
-    #         self.connector_job_chart_dir,
-    #         "--namespace",
-    #         self.connector_job_ns,
-    #         "--create-namespace",
-    #         "--set",
-    #         "file.path={}".format(release["jar"]),
-    #         "--set",
-    #         "class.name={}".format(release["class"]),
-    #         "--set",
-    #         "job.name={}".format(release_name),
-    #         "--set",
-    #         "args={}".format(",".join(release["args"])),
-    #         "--set",
-    #         "schedule={}".format(release["schedule"]),
-    #     ]
-    #     helm_install_result = subprocess.run(
-    #         helm_install_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-    #     )
-    #     if helm_install_result.returncode == 0:
-    #         print(f"Job {release_name} deployment succeeded...")
-    #     else:
-    #         err = True
-    #         result = ActionResponse(
-    #             status="ERROR",
-    #             status_code=500,
-    #             error_message="FLINK_HELM_INSTALLATION_EXCEPTION",
-    #         )
-    #         print(
-    #             f"Error re-installing job {release_name}: {helm_install_result.stderr.decode()}"
-    #         )
-
-    #     if err is None:
-    #         result = ActionResponse(status="OK", status_code=200)
-
-    #     return result
-
diff --git a/command-service/src/command/connector_registry.py b/command-service/src/command/connector_registry.py
index 40edf5f7..140d59bc 100644
--- a/command-service/src/command/connector_registry.py
+++ b/command-service/src/command/connector_registry.py
@@ -214,9 +214,6 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
                     query, params = self.build_insert_query(registry_meta)
                     success = self.execute_query(query, params)
 
-                    subprocess.run(["rm", "-rf", self.extraction_path])
-                    subprocess.run(["rm", "-rf", self.download_path])
-
                     if not success:
                         return RegistryResponse(
                             status="failure",
@@ -224,13 +221,16 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
                             statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR,
                         )
                     result.append(registry_meta.to_dict())
+
+                subprocess.run(["rm", "-rf", self.extraction_path])
+                subprocess.run(["rm", "-rf", self.download_path])
+
                 return RegistryResponse(
                     status="success",
                     connector_info=result,
                     message="Connectors registered successfully",
                     statusCode=status.HTTP_200_OK
                 )
-
             else:
                 connector_id = (
                     self.metadata.get("metadata", {}).get("id", "").replace(" ", "-")
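
Note on PATCH 1/3: the dataset label added to the cronjob chart is what makes the
kubectl discovery in _stop_connector_jobs work, since cronjobs are now found by the
selector app.kubernetes.io/dataset=<dataset_id> instead of by guessing release names.
One subtlety worth recording: subprocess.run() is invoked without a shell, so the
double quotes embedded in the jsonpath argument reach kubectl verbatim and are echoed
as literal text around the output, which is why the code strips them before splitting.
A minimal sketch with made-up kubectl output:

    # Hypothetical stdout for two cronjobs of one dataset; the literal quotes
    # come from the quoted jsonpath template being passed without a shell.
    raw = '"dataset-a-jdbc-cron dataset-a-object-cron"'
    names = raw.replace('"', "").splitlines()[0].split()
    print(names)  # ['dataset-a-jdbc-cron', 'dataset-a-object-cron']
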
From 6bd53a940cab79b4b19327ca105e5cd8a8ebcfae Mon Sep 17 00:00:00 2001
From: SurabhiAngadi
Date: Tue, 24 Dec 2024 17:07:47 +0530
Subject: [PATCH 2/3] #OBS-I441: replace datetime with time module

---
 command-service/src/command/connector_command.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/command-service/src/command/connector_command.py b/command-service/src/command/connector_command.py
index f91313c4..8e3ac5bf 100644
--- a/command-service/src/command/connector_command.py
+++ b/command-service/src/command/connector_command.py
@@ -8,7 +8,7 @@
 from model.data_models import Action, ActionResponse, CommandPayload, DatasetStatusType
 from model.db_models import ConnectorInstance
 from service.db_service import DatabaseService
-from datetime import datetime
+import time
 
 
 class ConnectorCommand(ICommand):
@@ -255,7 +255,7 @@ def _perform_spark_install(self, dataset_id, connector_instance):
         release_name = "{}-{}-{}".format(
             dataset_id[:8].lower().replace("_", "-"),
             connector_instance.connector_id[:8].lower().replace("_", "-"),
-            datetime.now().strftime("%Y%m%d%H%M%S")
+            str(time.time_ns())
         )
         connector_source = connector_instance.connector_source
         schedule = connector_instance.operations_config["schedule"]
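
Note on PATCH 2/3: presumably the switch to time.time_ns() is about collision
resistance, since the %Y%m%d%H%M%S timestamp only changes once per second and two
installs of the same connector in quick succession could have produced identical
release names. A short sketch with hypothetical ids, checking the length budget
against Helm's 53-character release-name limit:

    import time

    dataset_id = "new-york-taxi-data"   # hypothetical
    connector_id = "jdbc_connector"     # hypothetical

    release_name = "{}-{}-{}".format(
        dataset_id[:8].lower().replace("_", "-"),
        connector_id[:8].lower().replace("_", "-"),
        str(time.time_ns()),
    )
    print(release_name)  # e.g. new-york-jdbc-con-1735015067123456789

    # 8 + 8 + 19 characters plus two hyphens is 37, comfortably under 53.
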
From 9fa14dd1fd1f0a7f915085bcd345770c7fcef3b5 Mon Sep 17 00:00:00 2001
From: Ravi Mula
Date: Thu, 26 Dec 2024 10:41:44 +0530
Subject: [PATCH 3/3] #OBS-I441 fix flink connector deployments

---
 .../flink-connector/templates/deployment.yaml |  6 +-
 .../helm-charts/flink-connector/values.yaml   |  1 -
 .../src/command/connector_command.py          | 89 ++++++++++---------
 3 files changed, 50 insertions(+), 46 deletions(-)

diff --git a/command-service/helm-charts/flink-connector/templates/deployment.yaml b/command-service/helm-charts/flink-connector/templates/deployment.yaml
index 99ceee79..9e2c9802 100644
--- a/command-service/helm-charts/flink-connector/templates/deployment.yaml
+++ b/command-service/helm-charts/flink-connector/templates/deployment.yaml
@@ -10,6 +10,7 @@ metadata:
   name: {{ printf "%s-%s" $jobName $component }}
   namespace: {{ include "base.namespace" $ }}
   labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
+    app.kubernetes.io/connector: {{ .Values.connector_id }}
   annotations:
     checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
     checksum/job-config: {{ $jobData | toYaml | sha256sum }}
@@ -27,6 +28,7 @@ spec:
       labels:
         app.kubernetes.io/name: {{ include "common.names.name" . }}
         app.kubernetes.io/component: {{ printf "%s-%s" $jobName $component }}
+        app.kubernetes.io/connector: {{ .Values.connector_id }}
         component: {{ printf "%s-%s" $jobName $component }}
       annotations:
         checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
@@ -87,6 +89,7 @@ metadata:
   name: {{ printf "%s-%s" $jobName $component }}
   namespace: {{ include "base.namespace" . }}
   labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
+    app.kubernetes.io/connector: {{ .Values.connector_id }}
   annotations:
     checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
     checksum/job-config: {{ $jobData | toYaml | sha256sum }}
@@ -104,8 +107,7 @@ spec:
       labels:
         app.kubernetes.io/name: {{ include "common.names.name" . }}
         app.kubernetes.io/component: {{ printf "%s-%s" $jobName $component }}
-        dataset: {{ .Values.dataset_id }}
-        connector: {{ .Values.connector_id }}
+        app.kubernetes.io/connector: {{ .Values.connector_id }}
       annotations:
         checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
         checksum/job-config: {{ $jobData | toYaml | sha256sum }}
diff --git a/command-service/helm-charts/flink-connector/values.yaml b/command-service/helm-charts/flink-connector/values.yaml
index fd9dcda8..ccabbc54 100644
--- a/command-service/helm-charts/flink-connector/values.yaml
+++ b/command-service/helm-charts/flink-connector/values.yaml
@@ -246,7 +246,6 @@ serviceMonitor:
 # override flink_jobs
 # flink_jobs:
 
-dataset_id: ""
 connector_id: ""
 
 commonAnnotations:
diff --git a/command-service/src/command/connector_command.py b/command-service/src/command/connector_command.py
index 8e3ac5bf..4a92ee17 100644
--- a/command-service/src/command/connector_command.py
+++ b/command-service/src/command/connector_command.py
@@ -39,16 +39,12 @@ def execute(self, command_payload: CommandPayload, action: Action):
 
     def _deploy_connectors(self, dataset_id, active_connectors, is_masterdata):
         result = None
-        self._stop_connector_jobs(is_masterdata, self.connector_job_config["spark"]["namespace"], active_connectors, dataset_id)
+        self._stop_connector_jobs(is_masterdata, dataset_id)
         result = self._install_jobs(dataset_id, active_connectors, is_masterdata)
 
         return result
 
-    def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, dataset_id):
-        print(f"Uninstalling jobs for {namespace}..")
-        base_helm_chart = self.connector_job_config["spark"]["base_helm_chart"]
-
-
+    def _stop_connector_jobs(self, is_masterdata, dataset_id):
         # managed_releases = []
         # connector_jar_config = self.config.find("connector_job")
         # masterdata_jar_config = self.config.find("masterdata_job")
@@ -59,12 +55,13 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
         # for release in masterdata_jar_config:
         #     managed_releases.append(release["release_name"])
 
+        ## Clear all spark cronjobs for the dataset
+        spark_namespace = self.connector_job_config["spark"]["namespace"]
         deployed_cron_jobs_cmd = [
-            "kubectl", "get", "cronjobs", "-n", namespace,
+            "kubectl", "get", "cronjobs", "-n", spark_namespace,
             "--selector", f"app.kubernetes.io/dataset={dataset_id}",
             "-o", "jsonpath=\"{.items[*].metadata.name}\""
         ]
-        print(" ".join(deployed_cron_jobs_cmd))
 
         deployed_cron_jobs_result = subprocess.run(
             deployed_cron_jobs_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@@ -79,7 +76,7 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
                     "uninstall",
                     job,
                     "--namespace",
-                    namespace,
+                    spark_namespace,
                 ]
                 print("Uninstall command: ", " ".join(helm_uninstall_cmd))
                 helm_uninstall_result = subprocess.run(
@@ -92,35 +89,31 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, data
                 else:
                     print(f"Error uninstalling job '{job}': {helm_uninstall_result.stderr.decode()}")
 
-        ## TODO: Add flink uninstall logic
-        helm_ls_cmd = ["helm", "ls", "--namespace", namespace]
-        helm_ls_result = subprocess.run(
-            helm_ls_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-        )
-        if helm_ls_result.returncode == 0:
-            jobs = helm_ls_result.stdout.decode().splitlines()[1:]
-            job_names = {job.split()[0] for job in jobs if base_helm_chart in job}
-            spark_connector = {connector.id for connector in active_connectors if connector.connector_runtime == "spark"}
-            for release_name in spark_connector:
-                if release_name in job_names:
-                    print(f"Uninstalling job {release_name} related to dataset '{dataset_id}'...")
-                    helm_uninstall_cmd = [
-                        "helm",
-                        "uninstall",
-                        release_name,
-                        "--namespace",
-                        namespace,
-                    ]
-                    print("Uninstall command: ", " ".join(helm_uninstall_cmd))
-                    helm_uninstall_result = subprocess.run(
-                        helm_uninstall_cmd,
-                        stdout=subprocess.PIPE,
-                        stderr=subprocess.PIPE,
-                    )
-                    if helm_uninstall_result.returncode == 0:
-                        print(f"Successfully uninstalled job '{release_name}'...")
-                    else:
-                        print(f"Error uninstalling job '{release_name}': {helm_uninstall_result.stderr.decode()}")
+        ## Clear all flink connectors that are not active
+        flink_namespace = self.connector_job_config["flink"]["namespace"]
+        registered_connectors = self._get_registered_connectors(runtime="flink")
+        print("Registered flink connectors: ", registered_connectors)
+        for connector in registered_connectors:
+            if connector["instance_count"] == 0:
+                ### Uninstall the helm chart
+                helm_uninstall_cmd = [
+                    "helm",
+                    "uninstall",
+                    connector["id"].replace(".", "-").replace("_", "-"),
+                    "--namespace",
+                    flink_namespace,
+                ]
+
+                print("Uninstall command: ", " ".join(helm_uninstall_cmd))
+                helm_uninstall_result = subprocess.run(
+                    helm_uninstall_cmd,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                )
+                if helm_uninstall_result.returncode == 0:
+                    print(f"Successfully uninstalled deployment '{connector['id']}'...")
+                else:
+                    print(f"Error uninstalling deployment '{connector['id']}': {helm_uninstall_result.stderr.decode()}")
 
     def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
         result = None
@@ -167,13 +160,13 @@ def _perform_flink_install(self, dataset_id, connector_instance):
                 )
                 print("Restart command: ", restart_cmd)
                 # Run the helm command
-                helm_install_result = subprocess.run(
+                restart_cmd_result = subprocess.run(
                     restart_cmd,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     shell=True,
                 )
-                if helm_install_result.returncode == 0:
+                if restart_cmd_result.returncode == 0:
                     print(f"Job {job_name} restart succeeded...")
                 else:
                     err = True
@@ -210,6 +203,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
                 namespace,
                 "--create-namespace",
                 "--set", "namespace={}".format(namespace),
+                "--set", "connector_id={}".format(connector_instance.connector_id),
                 "--set-json",
                 f"""flink_jobs={set_json_value.replace(" ", "")}"""
             ]
@@ -239,8 +233,6 @@ def _perform_flink_install(self, dataset_id, connector_instance):
             result = ActionResponse(status="OK", status_code=200)
 
                 return result
-            else:
-                self._stop_connector_jobs(is_masterdata=False, namespace=self.connector_job_config["flink"]["namespace"], active_connectors=[connector_instance], dataset_id=dataset_id)
         else:
             print(f"Error checking Flink deployments: {helm_ls_result.stderr.decode()}")
             return ActionResponse(
@@ -350,7 +342,6 @@ def _get_masterdata_details(self, dataset_id):
 
         return is_masterdata
 
-    ## TODO: check for connector_id as well
     def _get_live_instances(self, runtime, connector_instance):
         has_live_instances = False
         query = f"""
@@ -366,3 +357,15 @@ def _get_live_instances(self, runtime, connector_instance):
                 has_live_instances = True
 
         return has_live_instances
+
+    def _get_registered_connectors(self, runtime):
+        query = f"""
+            SELECT cr.id, COUNT(ci.id) AS instance_count
+            FROM connector_registry cr
+            LEFT JOIN connector_instances ci ON cr.id = ci.connector_id
+            WHERE cr.runtime = %s
+            GROUP BY cr.id
+        """
+        params = (runtime,)
+        rows = self.db_service.execute_select_all(sql=query, params=params)
+        return rows
\ No newline at end of file
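
Note on PATCH 3/3: the flink cleanup now works off the registry rather than the
active connector list. _get_registered_connectors() returns one row per registered
flink connector together with its live-instance count, and only connectors whose
count is zero are uninstalled. The id is normalised before use, presumably because
connector ids may contain "." and "_" characters that are unsuitable for Helm
release names (an assumption based on the replace calls in the patch). A minimal
sketch with made-up rows shaped like the query result:

    # Hypothetical rows from the LEFT JOIN in _get_registered_connectors()
    rows = [
        {"id": "kafka-connector-1.0.0", "instance_count": 2},  # in use: keep
        {"id": "object_store-1.0.0", "instance_count": 0},     # orphaned: uninstall
    ]

    for connector in rows:
        if connector["instance_count"] == 0:
            release = connector["id"].replace(".", "-").replace("_", "-")
            # "flink" stands in for the configured flink namespace
            print("helm", "uninstall", release, "--namespace", "flink")
            # -> helm uninstall object-store-1-0-0 --namespace flink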