Merge pull request #296 from Sanketika-Obsrv/I441-connector-deployment-fixes

I441 connector deployment fixes
ravismula authored Dec 26, 2024
2 parents 2e2ca31 + 9fa14dd commit 2e03922
Showing 6 changed files with 107 additions and 93 deletions.
@@ -10,6 +10,7 @@ metadata:
name: {{ printf "%s-%s" $jobName $component }}
namespace: {{ include "base.namespace" $ }}
labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
annotations:
checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
checksum/job-config: {{ $jobData | toYaml | sha256sum }}
@@ -27,6 +28,7 @@ spec:
labels:
app.kubernetes.io/name: {{ include "common.names.name" . }}
app.kubernetes.io/component: {{ printf "%s-%s" $jobName $component }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
component: {{ printf "%s-%s" $jobName $component }}
annotations:
checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
@@ -87,6 +89,7 @@ metadata:
name: {{ printf "%s-%s" $jobName $component }}
namespace: {{ include "base.namespace" . }}
labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
annotations:
checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
checksum/job-config: {{ $jobData | toYaml | sha256sum }}
@@ -104,9 +107,9 @@ spec:
labels:
app.kubernetes.io/name: {{ include "common.names.name" . }}
app.kubernetes.io/component: {{ printf "%s-%s" $jobName $component }}
component: {{ printf "%s-%s" $jobName $component }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
annotations:
checksum/config: {{ .Files.Glob "cFonfigs/*" | toYaml | sha256sum }}
checksum/config: {{ .Files.Glob "configs/*" | toYaml | sha256sum }}
checksum/job-config: {{ $jobData | toYaml | sha256sum }}
spec:
serviceAccountName: {{ include "base.serviceaccountname" . }}
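The app.kubernetes.io/connector label added to both templates makes every resource of a connector deployment addressable by selector. A minimal sketch of how that could be checked after an install, assuming kubectl is on the PATH; the namespace and connector id shown are placeholder values:

import subprocess

def list_connector_pods(namespace: str, connector_id: str) -> list:
    # Select pods carrying the label rendered by the templates above.
    cmd = [
        "kubectl", "get", "pods", "-n", namespace,
        "--selector", f"app.kubernetes.io/connector={connector_id}",
        "-o", "name",
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode())
    return result.stdout.decode().split()

# Hypothetical usage: list_connector_pods("flink", "kafka-connector-1.0.0")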
1 change: 1 addition & 0 deletions command-service/helm-charts/flink-connector/values.yaml
@@ -246,6 +246,7 @@ serviceMonitor:
# override flink_jobs
# flink_jobs:

connector_id: ""

commonAnnotations:
reloader.stakater.com/auto: "true"
@@ -1,9 +1,11 @@
apiVersion: batch/v1
kind: CronJob
metadata:
name: {{ include "base.cronReleaseName" . }}
name: {{ .Release.Name }}
namespace: {{ include "base.namespace" . }}
labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 4 }}
app.kubernetes.io/dataset: {{ .Values.dataset_id }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
{{- if .Values.commonAnnotations }}
annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
{{- end }}
@@ -19,6 +21,8 @@ spec:
{{- include "common.tplvalues.render" (dict "value" .Values.podAnnotations "context" $) | nindent 12 }}
{{- end }}
labels: {{- include "common.labels.standard" ( dict "customLabels" .Values.commonLabels "context" $ ) | nindent 12 }}
app.kubernetes.io/dataset: {{ .Values.dataset_id }}
app.kubernetes.io/connector: {{ .Values.connector_id }}
spec:
serviceAccountName: {{ .Values.serviceAccount.name }}
restartPolicy: {{ .Values.restartPolicy }}
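With app.kubernetes.io/dataset and app.kubernetes.io/connector now stamped on the CronJob and its pod template, cleanup can be driven purely by label selectors, which is what connector_command.py does below via jsonpath. A sketch of the same lookup using JSON output instead, which sidesteps the manual quote stripping (an alternative, not what the commit ships):

import json
import subprocess

def cronjobs_for_dataset(namespace: str, dataset_id: str) -> list:
    # JSON output parses cleanly; no need to strip quotes from jsonpath text.
    cmd = [
        "kubectl", "get", "cronjobs", "-n", namespace,
        "--selector", f"app.kubernetes.io/dataset={dataset_id}",
        "-o", "json",
    ]
    out = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if out.returncode != 0:
        return []
    return [item["metadata"]["name"] for item in json.loads(out.stdout)["items"]]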
3 changes: 3 additions & 0 deletions command-service/helm-charts/spark-connector-cron/values.yaml
@@ -142,6 +142,9 @@ instance_id: nyt-psql.1
main_class: org.sunbird.obsrv.connector.JDBCConnector
main_file: jdbc-connector-1.0.0.jar

dataset_id: ""
connector_id: ""

## Object Store Connector
# technology: python
# connector-source: object_store_connector-0.1.0
175 changes: 89 additions & 86 deletions command-service/src/command/connector_command.py
@@ -8,6 +8,7 @@
from model.data_models import Action, ActionResponse, CommandPayload, DatasetStatusType
from model.db_models import ConnectorInstance
from service.db_service import DatabaseService
import time


class ConnectorCommand(ICommand):
@@ -38,15 +39,12 @@ def execute(self, command_payload: CommandPayload, action: Action):

def _deploy_connectors(self, dataset_id, active_connectors, is_masterdata):
result = None
self._stop_connector_jobs(is_masterdata, self.connector_job_config["spark"]["namespace"], active_connectors, dataset_id)
self._stop_connector_jobs(is_masterdata, dataset_id)
result = self._install_jobs(dataset_id, active_connectors, is_masterdata)

return result

def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, dataset_id):
print(f"Uninstalling jobs for {namespace}..")
base_helm_chart = self.connector_job_config["spark"]["base_helm_chart"]

def _stop_connector_jobs(self, is_masterdata, dataset_id):
# managed_releases = []
# connector_jar_config = self.config.find("connector_job")
# masterdata_jar_config = self.config.find("masterdata_job")
@@ -57,34 +55,66 @@ def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, dataset_id):
# for release in masterdata_jar_config:
# managed_releases.append(release["release_name"])

helm_ls_cmd = ["helm", "ls", "--namespace", namespace]
helm_ls_result = subprocess.run(
helm_ls_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
## Clear all spark cronjobs for the dataset
spark_namespace = self.connector_job_config["spark"]["namespace"]
deployed_cron_jobs_cmd = [
"kubectl", "get", "cronjobs", "-n", spark_namespace,
"--selector", f"app.kubernetes.io/dataset={dataset_id}",
"-o", "jsonpath=\"{.items[*].metadata.name}\""
]

deployed_cron_jobs_result = subprocess.run(
deployed_cron_jobs_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if helm_ls_result.returncode == 0:
jobs = helm_ls_result.stdout.decode().splitlines()[1:]
job_names = {job.split()[0] for job in jobs if base_helm_chart in job}
spark_connector = {connector.id for connector in active_connectors if connector.connector_runtime == "spark"}
for release_name in spark_connector:
if release_name in job_names:
print(f"Uninstalling job {release_name} related to dataset'{dataset_id}'...")
helm_uninstall_cmd = [
"helm",
"uninstall",
release_name,
"--namespace",
namespace,
]
helm_uninstall_result = subprocess.run(
helm_uninstall_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if helm_uninstall_result.returncode == 0:
print(f"Successfully uninstalled job '{release_name}'...")
else:
print(f"Error uninstalling job '{release_name}': {helm_uninstall_result.stderr.decode()}")


if deployed_cron_jobs_result.returncode == 0 and len(deployed_cron_jobs_result.stdout.decode().replace("\"", "").splitlines()):
jobs = deployed_cron_jobs_result.stdout.decode().replace("\"", "").splitlines()[0].split()
for job in jobs:
print(f"Uninstalling job {job} related to dataset'{dataset_id}'...")
helm_uninstall_cmd = [
"helm",
"uninstall",
job,
"--namespace",
spark_namespace,
]
print("Uninstall command: ", " ".join(helm_uninstall_cmd))
helm_uninstall_result = subprocess.run(
helm_uninstall_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if helm_uninstall_result.returncode == 0:
print(f"Successfully uninstalled job '{job}'...")
else:
print(f"Error uninstalling job '{job}': {helm_uninstall_result.stderr.decode()}")

## Clear all flink connectors that are not active
flink_namespace = self.connector_job_config["flink"]["namespace"]
registered_connectors = self._get_registered_connectors(runtime="flink")
print("Registered flink connectors: ", registered_connectors)
for connector in registered_connectors:
if connector["instance_count"] == 0:
### Uninstall the helm chart
helm_uninstall_cmd = [
"helm",
"uninstall",
connector["id"].replace(".", "-").replace("_", "-"),
"--namespace",
flink_namespace,
]

print("Uninstall command: ", " ".join(helm_uninstall_cmd))
helm_uninstall_result = subprocess.run(
helm_uninstall_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if helm_uninstall_result.returncode == 0:
print(f"Successfully uninstalled deployment '{connector['id']}'...")
else:
print(f"Error uninstalling deployment '{connector['id']}': {helm_uninstall_result.stderr.decode()}")

def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
result = None
for connector in active_connectors:
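The flink cleanup above derives each Helm release name from the connector id via .replace(".", "-").replace("_", "-"), presumably so the name stays valid for the Kubernetes resources the release creates. A small helper capturing that convention as a sketch; the lowercasing and regex fallback are added assumptions, not part of this commit:

import re

def release_name_for_connector(connector_id: str) -> str:
    # The substitutions below mirror the uninstall path above, so install
    # and uninstall agree on the release name.
    name = connector_id.replace(".", "-").replace("_", "-")
    # Added assumption: lowercase and drop anything else invalid in a
    # DNS-1123 name.
    return re.sub(r"[^a-z0-9-]", "-", name.lower())

# e.g. release_name_for_connector("kafka_connector.1.0.0") == "kafka-connector-1-0-0"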
@@ -125,27 +155,27 @@ def _perform_flink_install(self, dataset_id, connector_instance):

deployment_exists = any(job_name in line for line in jobs.splitlines()[1:])
if deployment_exists:
restart_cmd = f"kubectl delete pods --selector app.kubernetes.io/name=flink,component={job_name}-jobmanager --namespace {namespace} && kubectl delete pods --selector app.kubernetes.io/name=flink,component={job_name}-taskmanager --namespace {namespace}".format(
restart_cmd = f"kubectl delete pods --selector app.kubernetes.io/name=flink,app.kubernetes.io/component={job_name}-jobmanager --namespace {namespace} && kubectl delete pods --selector app.kubernetes.io/name=flink,app.kubernetes.io/component={job_name}-taskmanager --namespace {namespace}".format(
namespace=namespace, job_name=job_name
)
print("Restart command: ", restart_cmd)
# Run the helm command
helm_install_result = subprocess.run(
restart_cmd_result = subprocess.run(
restart_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)
if helm_install_result.returncode == 0:
if restart_cmd_result.returncode == 0:
print(f"Job {job_name} restart succeeded...")
else:
err = True
print(f"Error restarting pod: {helm_ls_result.stderr.decode()}")
return ActionResponse(
status="ERROR",
status_code=500,
error_message="FLINK_HELM_LIST_EXCEPTION",
)
print(f"Error restarting pod: {helm_ls_result.stderr.decode()}")

if err is None:
result = ActionResponse(status="OK", status_code=200)
Expand All @@ -172,6 +202,8 @@ def _perform_flink_install(self, dataset_id, connector_instance):
"--namespace",
namespace,
"--create-namespace",
"--set", "namespace={}".format(namespace),
"--set", "connector_id={}".format(connector_instance.connector_id),
"--set-json",
f"""flink_jobs={set_json_value.replace(" ", "")}"""
]
@@ -201,8 +233,6 @@ def _perform_flink_install(self, dataset_id, connector_instance):
result = ActionResponse(status="OK", status_code=200)

return result
else:
self._stop_connector_jobs(is_masterdata=False, namespace="flink")
else:
print(f"Error checking Flink deployments: {helm_ls_result.stderr.decode()}")
return ActionResponse(
@@ -214,7 +244,11 @@ def _perform_flink_install(self, dataset_id, connector_instance):
def _perform_spark_install(self, dataset_id, connector_instance):
err = None
result = None
release_name = connector_instance.id
release_name = "{}-{}-{}".format(
dataset_id[:8].lower().replace("_", "-"),
connector_instance.connector_id[:8].lower().replace("_", "-"),
str(time.time_ns())
)
connector_source = connector_instance.connector_source
schedule = connector_instance.operations_config["schedule"]
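The generated release name concatenates the first 8 characters of the dataset id, the first 8 of the connector id, and a nanosecond timestamp, with underscores replaced since they are not valid in the Kubernetes names a release produces. Helm limits release names to 53 characters, and this scheme stays well inside it: 8 + 8 + 19 digits plus two hyphens is at most 37. The same construction as a standalone, testable sketch:

import time

def spark_release_name(dataset_id: str, connector_id: str) -> str:
    # 8 + 8 + 19 characters plus two hyphens = at most 37,
    # safely below Helm's 53-character release-name limit.
    return "{}-{}-{}".format(
        dataset_id[:8].lower().replace("_", "-"),
        connector_id[:8].lower().replace("_", "-"),
        str(time.time_ns()),
    )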

@@ -239,7 +273,11 @@ def _perform_spark_install(self, dataset_id, connector_instance):
"--set",
"technology={}".format(connector_instance.technology),
"--set",
"instance_id={}".format(release_name),
"dataset_id={}".format(dataset_id),
"--set",
"connector_id={}".format(connector_instance.connector_id),
"--set",
"instance_id={}".format(connector_instance.id),
"--set",
"connector_source={}".format(connector_source["source"]),
"--set",
@@ -304,7 +342,6 @@ def _get_masterdata_details(self, dataset_id):

return is_masterdata

## TODO: check for connector_id as well
def _get_live_instances(self, runtime, connector_instance):
has_live_instances = False
query = f"""
@@ -321,48 +358,14 @@ def _get_live_instances(self, runtime, connector_instance):

return has_live_instances

# def _perform_install(self, release):
# err = None
# result = None
# release_name = release["release_name"]
# helm_install_cmd = [
# "helm",
# "upgrade",
# "--install",
# release_name,
# self.connector_job_chart_dir,
# "--namespace",
# self.connector_job_ns,
# "--create-namespace",
# "--set",
# "file.path={}".format(release["jar"]),
# "--set",
# "class.name={}".format(release["class"]),
# "--set",
# "job.name={}".format(release_name),
# "--set",
# "args={}".format(",".join(release["args"])),
# "--set",
# "schedule={}".format(release["schedule"]),
# ]
# helm_install_result = subprocess.run(
# helm_install_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
# )
# if helm_install_result.returncode == 0:
# print(f"Job {release_name} deployment succeeded...")
# else:
# err = True
# result = ActionResponse(
# status="ERROR",
# status_code=500,
# error_message="FLINK_HELM_INSTALLATION_EXCEPTION",
# )
# print(
# f"Error re-installing job {release_name}: {helm_install_result.stderr.decode()}"
# )

# if err is None:
# result = ActionResponse(status="OK", status_code=200)

# return result

def _get_registered_connectors(self, runtime):
query = f"""
SELECT cr.id, COUNT(ci.id) AS instance_count
FROM connector_registry cr
LEFT JOIN connector_instances ci ON cr.id = ci.connector_id
WHERE cr.runtime = %s
GROUP BY cr.id
"""
params = (runtime,)
rows = self.db_service.execute_select_all(sql=query, params=params)
return rows
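In the query above, the LEFT JOIN keeps connectors that have no instances, and COUNT(ci.id) evaluates to 0 for them; those rows are exactly what _stop_connector_jobs uses to pick flink deployments to uninstall. The filtering step in isolation, as a sketch assuming the dict-like rows that execute_select_all returns:

def stale_connector_ids(rows) -> list:
    # Connectors with zero live instances are the uninstall candidates.
    return [row["id"] for row in rows if row["instance_count"] == 0]

# e.g. stale_connector_ids([{"id": "jdbc.1", "instance_count": 0}]) == ["jdbc.1"]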
8 changes: 4 additions & 4 deletions command-service/src/command/connector_registry.py
Original file line number Diff line number Diff line change
@@ -214,23 +214,23 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
query, params = self.build_insert_query(registry_meta)
success = self.execute_query(query, params)

subprocess.run(["rm", "-rf", self.extraction_path])
subprocess.run(["rm", "-rf", self.download_path])

if not success:
return RegistryResponse(
status="failure",
message=f"Failed to register connector {connector_id}",
statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
result.append(registry_meta.to_dict())

subprocess.run(["rm", "-rf", self.extraction_path])
subprocess.run(["rm", "-rf", self.download_path])

return RegistryResponse(
status="success",
connector_info=result,
message="Connectors registered successfully",
statusCode=status.HTTP_200_OK
)

else:
connector_id = (
self.metadata.get("metadata", {}).get("id", "").replace(" ", "-")
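The registry change above moves the temp-directory cleanup ahead of the failure return, so the download and extraction paths are removed on both outcomes. An equivalent shape using shutil in a try/finally, which also avoids shelling out to rm; a sketch of an alternative, not what the commit ships:

import shutil

def run_with_cleanup(action, extraction_path, download_path):
    # `action` is the registration step; cleanup runs on every exit path,
    # mirroring the reordered rm -rf calls above.
    try:
        return action()
    finally:
        shutil.rmtree(extraction_path, ignore_errors=True)
        shutil.rmtree(download_path, ignore_errors=True)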
