From 9880b2fc51e3da2c2e693803692cec46b6826845 Mon Sep 17 00:00:00 2001 From: Ravi Mula Date: Wed, 13 Nov 2024 13:33:30 +0530 Subject: [PATCH] append base64 prefix upon connector register --- .../src/command/connector_registry.py | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/command-service/src/command/connector_registry.py b/command-service/src/command/connector_registry.py index 358c6eb1..40edf5f7 100644 --- a/command-service/src/command/connector_registry.py +++ b/command-service/src/command/connector_registry.py @@ -176,9 +176,9 @@ def load_ui_metadata(self, extract_out_path): def process_metadata(self, rel_path, connector_source) -> RegistryResponse: result = [] tenant = self.metadata.get("metadata", {}).get("tenant", "") - + self.copy_connector_to_runtime(self.metadata['metadata']['runtime'], connector_source) - + if tenant == "multiple": connector_objects = self.metadata["connectors"] for obj in connector_objects: @@ -213,17 +213,17 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse: ) query, params = self.build_insert_query(registry_meta) success = self.execute_query(query, params) - + subprocess.run(["rm", "-rf", self.extraction_path]) subprocess.run(["rm", "-rf", self.download_path]) - + if not success: return RegistryResponse( status="failure", message=f"Failed to register connector {connector_id}", statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR, ) - result.append(registry_meta.to_dict()) + result.append(registry_meta.to_dict()) return RegistryResponse( status="success", connector_info=result, @@ -266,10 +266,10 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse: ) query, params = self.build_insert_query(registry_meta) success = self.execute_query(query, params) - + subprocess.run(["rm", "-rf", self.extraction_path]) subprocess.run(["rm", "-rf", self.download_path]) - + if not success: return RegistryResponse( status="failure", @@ -325,16 +325,16 @@ def build_insert_query(self, registry_meta: ConnectorRegsitryv2): ui_spec_json = json.dumps(registry_meta.ui_spec) query =f""" INSERT INTO connector_registry ( - id, connector_id, name, type, category, version, description, - technology, runtime, licence, owner, iconurl, status, source_url, + id, connector_id, name, type, category, version, description, + technology, runtime, licence, owner, iconurl, status, source_url, source, ui_spec, created_by, updated_by, created_date, updated_date, live_date ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) ON CONFLICT ( connector_id, version - ) DO UPDATE SET + ) DO UPDATE SET id = %s, name = %s, type = %s, @@ -396,14 +396,26 @@ def build_insert_query(self, registry_meta: ConnectorRegsitryv2): datetime.now(), ) return query, params - + def load_file_bytes(self, rel_path: str) -> bytes | None: file_path = Path(self.extraction_path) for item in file_path.glob("*/{}".format(rel_path)): try: + prefixes = { + ".svg": "data:image/svg+xml;base64,", + ".jpeg": "data:image/jpeg;base64,", + ".jpg": "data:image/jpg;base64,", + ".gif": "data:image/gif;base64,", + ".webp": "data:image/webp;base64,", + ".ico": "data:image/x-icon;base64," + } + + prefix = prefixes.get(item.suffix, "data:application/octet-stream;base64,") + print(f"Connector Registry | Image Suffix: {item.suffix} Base64 Prefix in Use: {prefix}") + with open(item, 'rb') as file: file_content = file.read() - encoded = base64.b64encode(file_content).decode("ascii") + encoded = (prefix + base64.b64encode(file_content).decode("ascii")).strip() except IsADirectoryError: print( f"Connector Registry | No value for icon URL given at metadata: {rel_path}" @@ -433,11 +445,11 @@ def update_connector_registry(self, _id, ver): print( f"Connector Registry | An error occurred during the execution of Query: {e}" ) - + def copy_connector_to_runtime(self, runtime: str, connector_source: str): if runtime == "spark": return self.copy_connector_to_spark(connector_source) - + def copy_connector_to_spark(self, connector_source: str): print(f"Connector Registry | copying {connector_source} to spark") @@ -485,7 +497,7 @@ def copy_connector_to_spark(self, connector_source: str): message="failed to copy the connector to spark", statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR, ) - + if self.metadata['metadata']['technology'] == "python": pip_install_cmd = [ "kubectl", @@ -507,7 +519,7 @@ def copy_connector_to_spark(self, connector_source: str): message="failed to install the requirements on spark", statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR, ) - + return RegistryResponse( status="success", message="connector copied to spark successfully",