Skip to content

Commit

Permalink
Merge pull request #298 from Sanketika-Obsrv/Env-fixes
Browse files Browse the repository at this point in the history
#OBS-I442 Removed env for topics
  • Loading branch information
ravismula authored Dec 30, 2024
2 parents 382e6b8 + f12ef8d commit 0a8100d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
4 changes: 2 additions & 2 deletions api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ export const config = {
"connectionTimeout": process.env.kafka_connection_timeout ? parseInt(process.env.kafka_connection_timeout) : 5000
},
"topics": { // Default Kafka topics depend on type of dataset.
"createDataset": `${process.env.system_env || "local"}.ingest`,
"createMasterDataset": `${process.env.system_env || "local"}.masterdata.ingest`
"createDataset": `ingest`,
"createMasterDataset": `masterdata.ingest`
}
}
},
Expand Down
26 changes: 15 additions & 11 deletions command-service/src/command/connector_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from model.db_models import ConnectorInstance
from service.db_service import DatabaseService
import time
from random import choice
from string import ascii_lowercase


class ConnectorCommand(ICommand):
Expand Down Expand Up @@ -94,12 +96,12 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
registered_connectors = self._get_registered_connectors(runtime="flink")
print("Registered flink connectors: ", registered_connectors)
for connector in registered_connectors:
if connector["instance_count"] == 0:
if connector[1] == 0:
### Uninstall the helm chart
helm_uninstall_cmd = [
"helm",
"uninstall",
connector["id"].replace(".", "-").replace("_", "-"),
connector[0].replace(".", "-").replace("_", "-"),
"--namespace",
flink_namespace,
]
Expand All @@ -111,9 +113,9 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
stderr=subprocess.PIPE,
)
if helm_uninstall_result.returncode == 0:
print(f"Successfully uninstalled deployment '{connector['id']}'...")
print(f"Successfully uninstalled deployment '{connector[0]}'...")
else:
print(f"Error uninstalling deployment '{connector['id']}': {helm_uninstall_result.stderr.decode()}")
print(f"Error uninstalling deployment '{connector}': {helm_uninstall_result.stderr.decode()}")

def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
result = None
Expand All @@ -130,6 +132,9 @@ def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
)
break

if result is None:
result = ActionResponse(status="OK", status_code=200)

# if is_masterdata:
# print("Installing masterdata job")
# masterdata_jar_config = self.config.find("masterdata_job")
Expand Down Expand Up @@ -178,7 +183,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
)

if err is None:
result = ActionResponse(status="OK", status_code=200)
result = ActionResponse(status="OK", status_code=200)

return result
else:
Expand Down Expand Up @@ -244,10 +249,9 @@ def _perform_flink_install(self, dataset_id, connector_instance):
def _perform_spark_install(self, dataset_id, connector_instance):
err = None
result = None
release_name = "{}-{}-{}".format(
dataset_id[:8].lower().replace("_", "-"),
connector_instance.connector_id[:8].lower().replace("_", "-"),
str(time.time_ns())
release_name = "{}-{}".format(
(dataset_id + "_" + connector_instance.connector_id).lower().replace("_", "-")[:54],
"".join(choice(ascii_lowercase) for i in range(6))
)
connector_source = connector_instance.connector_source
schedule = connector_instance.operations_config["schedule"]
Expand Down Expand Up @@ -362,10 +366,10 @@ def _get_registered_connectors(self, runtime):
query = f"""
SELECT cr.id, COUNT(ci.id) AS instance_count
FROM connector_registry cr
LEFT JOIN connector_instances ci ON cr.id = ci.connector_id
LEFT JOIN connector_instances ci ON (cr.id = ci.connector_id AND ci.status = %s)
WHERE cr.runtime = %s
GROUP BY cr.id
"""
params = (runtime,)
params = (DatasetStatusType.Live.name, runtime)
rows = self.db_service.execute_select_all(sql=query, params=params)
return rows

0 comments on commit 0a8100d

Please sign in to comment.