
#OBS-I442 Removed env for topics #298

Merged · 2 commits · Dec 30, 2024
4 changes: 2 additions & 2 deletions api-service/src/configs/Config.ts
@@ -42,8 +42,8 @@ export const config = {
             "connectionTimeout": process.env.kafka_connection_timeout ? parseInt(process.env.kafka_connection_timeout) : 5000
         },
         "topics": { // Default Kafka topics depend on type of dataset.
-            "createDataset": `${process.env.system_env || "local"}.ingest`,
-            "createMasterDataset": `${process.env.system_env || "local"}.masterdata.ingest`
+            "createDataset": `ingest`,
+            "createMasterDataset": `masterdata.ingest`
         }
     }
 },
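With this change, the default dataset topics no longer carry the deployment-environment prefix (previously taken from system_env, defaulting to local): every environment now uses the bare ingest and masterdata.ingest topic names. A minimal before/after sketch, written in Python purely for illustration:

    import os

    # Before this PR: topic names were namespaced by deployment environment.
    def old_create_dataset_topic() -> str:
        return f"{os.environ.get('system_env') or 'local'}.ingest"

    # After this PR: one fixed topic name across all environments.
    def new_create_dataset_topic() -> str:
        return "ingest"

    print(old_create_dataset_topic())  # "local.ingest" when system_env is unset
    print(new_create_dataset_topic())  # "ingest"
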
26 changes: 15 additions & 11 deletions command-service/src/command/connector_command.py
@@ -9,6 +9,8 @@
 from model.db_models import ConnectorInstance
 from service.db_service import DatabaseService
 import time
+from random import choice
+from string import ascii_lowercase


 class ConnectorCommand(ICommand):
@@ -94,12 +96,12 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
         registered_connectors = self._get_registered_connectors(runtime="flink")
         print("Registered flink connectors: ", registered_connectors)
         for connector in registered_connectors:
-            if connector["instance_count"] == 0:
+            if connector[1] == 0:
                 ### Uninstall the helm chart
                 helm_uninstall_cmd = [
                     "helm",
                     "uninstall",
-                    connector["id"].replace(".", "-").replace("_", "-"),
+                    connector[0].replace(".", "-").replace("_", "-"),
                     "--namespace",
                     flink_namespace,
                 ]
@@ -111,9 +113,9 @@ def _stop_connector_jobs(self, is_masterdata, dataset_id):
                     stderr=subprocess.PIPE,
                 )
                 if helm_uninstall_result.returncode == 0:
-                    print(f"Successfully uninstalled deployment '{connector['id']}'...")
+                    print(f"Successfully uninstalled deployment '{connector[0]}'...")
                 else:
-                    print(f"Error uninstalling deployment '{connector['id']}': {helm_uninstall_result.stderr.decode()}")
+                    print(f"Error uninstalling deployment '{connector}': {helm_uninstall_result.stderr.decode()}")

     def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
         result = None
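The switch from connector["id"] and connector["instance_count"] to connector[0] and connector[1] matches the updated _get_registered_connectors in the last hunk below, which returns plain query rows: each connector is now a positional (id, instance_count) pair rather than a mapping. A minimal sketch of the new access pattern, with made-up row values:

    # Illustrative only: rows as a cursor-style fetch would return them,
    # assuming each row is a (id, instance_count) tuple rather than a dict.
    registered_connectors = [("kafka.connector", 0), ("debezium_connector", 2)]

    for connector in registered_connectors:
        if connector[1] == 0:  # instance_count: no live instances remain
            # "." and "_" are sanitized because they are invalid in Helm release names
            release = connector[0].replace(".", "-").replace("_", "-")
            print(f"would uninstall release '{release}'")
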
@@ -130,6 +132,9 @@ def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
                 )
                 break

+        if result is None:
+            result = ActionResponse(status="OK", status_code=200)
+
         # if is_masterdata:
         #     print("Installing masterdata job")
         #     masterdata_jar_config = self.config.find("masterdata_job")
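The new fallback guarantees _install_jobs returns a response even when the loop over active_connectors never assigns one (for example, a dataset with no active connectors); previously the method could fall through with result = None. A rough sketch of the behaviour, with ActionResponse stubbed as a dataclass purely for illustration:

    from dataclasses import dataclass

    @dataclass
    class ActionResponse:  # stand-in for the real model class
        status: str
        status_code: int

    def install_jobs(active_connectors):
        result = None
        for connector in active_connectors:
            result = ActionResponse(status="OK", status_code=200)  # install, then respond
            break
        # Without this fallback, an empty connector list left result as None.
        if result is None:
            result = ActionResponse(status="OK", status_code=200)
        return result

    print(install_jobs([]))  # ActionResponse(status='OK', status_code=200)
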
@@ -178,7 +183,7 @@ def _perform_flink_install(self, dataset_id, connector_instance):
             )

         if err is None:
-                result = ActionResponse(status="OK", status_code=200)
+            result = ActionResponse(status="OK", status_code=200)

             return result
         else:
@@ -244,10 +249,9 @@ def _perform_flink_install(self, dataset_id, connector_instance):
     def _perform_spark_install(self, dataset_id, connector_instance):
         err = None
         result = None
-        release_name = "{}-{}-{}".format(
-            dataset_id[:8].lower().replace("_", "-"),
-            connector_instance.connector_id[:8].lower().replace("_", "-"),
-            str(time.time_ns())
+        release_name = "{}-{}".format(
+            (dataset_id + "_" + connector_instance.connector_id).lower().replace("_", "-")[:54],
+            "".join(choice(ascii_lowercase) for i in range(6))
         )
         connector_source = connector_instance.connector_source
         schedule = connector_instance.operations_config["schedule"]
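The new release name keeps a longer sanitized prefix of the dataset and connector ids (capped at 54 characters) and swaps the nanosecond timestamp for a six-letter random suffix, so the whole name is at most 61 characters, presumably to stay within the 63-character limit on Kubernetes resource names while remaining unique per install. A standalone sketch with hypothetical ids:

    from random import choice
    from string import ascii_lowercase

    def make_release_name(dataset_id: str, connector_id: str) -> str:
        # Sanitized prefix capped at 54 chars; only underscores are replaced here.
        prefix = (dataset_id + "_" + connector_id).lower().replace("_", "-")[:54]
        # Random six-letter suffix instead of a nanosecond timestamp.
        suffix = "".join(choice(ascii_lowercase) for _ in range(6))
        return f"{prefix}-{suffix}"  # at most 54 + 1 + 6 = 61 characters

    # Hypothetical inputs, for illustration:
    print(make_release_name("nyc_taxi_trips", "kafka-connector"))
    # e.g. "nyc-taxi-trips-kafka-connector-qbxwma"
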
@@ -362,10 +366,10 @@ def _get_registered_connectors(self, runtime):
         query = f"""
             SELECT cr.id, COUNT(ci.id) AS instance_count
             FROM connector_registry cr
-            LEFT JOIN connector_instances ci ON cr.id = ci.connector_id
+            LEFT JOIN connector_instances ci ON (cr.id = ci.connector_id AND ci.status = %s)
             WHERE cr.runtime = %s
             GROUP BY cr.id
         """
-        params = (runtime,)
+        params = (DatasetStatusType.Live.name, runtime)
         rows = self.db_service.execute_select_all(sql=query, params=params)
         return rows
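Putting the status filter inside the LEFT JOIN's ON clause, rather than in WHERE, keeps every registered connector in the result while counting only its Live instances: a connector whose instances are all inactive still appears with instance_count = 0, which is what _stop_connector_jobs needs in order to uninstall its Helm release. Because the placeholders bind positionally, DatasetStatusType.Live.name must precede runtime in params. A self-contained demonstration using sqlite3 (with ? placeholders and an invented Retired status, purely for illustration):

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE connector_registry (id TEXT, runtime TEXT);
        CREATE TABLE connector_instances (id INTEGER, connector_id TEXT, status TEXT);
        INSERT INTO connector_registry VALUES ('kafka-connector', 'flink');
        INSERT INTO connector_instances VALUES (1, 'kafka-connector', 'Retired');
    """)

    # Filter inside ON: the connector row is kept, its Retired instance is
    # not counted, so instance_count is 0 and the job can be uninstalled.
    row = con.execute("""
        SELECT cr.id, COUNT(ci.id)
        FROM connector_registry cr
        LEFT JOIN connector_instances ci
            ON (cr.id = ci.connector_id AND ci.status = ?)
        WHERE cr.runtime = ?
        GROUP BY cr.id
    """, ("Live", "flink")).fetchone()
    print(row)  # ('kafka-connector', 0)

    # The same filter in the WHERE clause would discard the NULL-extended
    # rows, turning the LEFT JOIN into an inner join and dropping the
    # connector from the result entirely.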