Skip to content

Commit

Permalink
Merge pull request #277 from Sanketika-Obsrv/connector_image_fix
Browse files Browse the repository at this point in the history
append base64 prefix upon connector register
  • Loading branch information
SanthoshVasabhaktula authored Nov 13, 2024
2 parents 1d2fec8 + 9880b2f commit 812ffbe
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions command-service/src/command/connector_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ def load_ui_metadata(self, extract_out_path):
def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
result = []
tenant = self.metadata.get("metadata", {}).get("tenant", "")

self.copy_connector_to_runtime(self.metadata['metadata']['runtime'], connector_source)

if tenant == "multiple":
connector_objects = self.metadata["connectors"]
for obj in connector_objects:
Expand Down Expand Up @@ -213,17 +213,17 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
)
query, params = self.build_insert_query(registry_meta)
success = self.execute_query(query, params)

subprocess.run(["rm", "-rf", self.extraction_path])
subprocess.run(["rm", "-rf", self.download_path])

if not success:
return RegistryResponse(
status="failure",
message=f"Failed to register connector {connector_id}",
statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
result.append(registry_meta.to_dict())
result.append(registry_meta.to_dict())
return RegistryResponse(
status="success",
connector_info=result,
Expand Down Expand Up @@ -266,10 +266,10 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
)
query, params = self.build_insert_query(registry_meta)
success = self.execute_query(query, params)

subprocess.run(["rm", "-rf", self.extraction_path])
subprocess.run(["rm", "-rf", self.download_path])

if not success:
return RegistryResponse(
status="failure",
Expand Down Expand Up @@ -325,16 +325,16 @@ def build_insert_query(self, registry_meta: ConnectorRegsitryv2):
ui_spec_json = json.dumps(registry_meta.ui_spec)
query =f"""
INSERT INTO connector_registry (
id, connector_id, name, type, category, version, description,
technology, runtime, licence, owner, iconurl, status, source_url,
id, connector_id, name, type, category, version, description,
technology, runtime, licence, owner, iconurl, status, source_url,
source, ui_spec, created_by, updated_by, created_date, updated_date, live_date
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s
) ON CONFLICT (
connector_id, version
) DO UPDATE SET
) DO UPDATE SET
id = %s,
name = %s,
type = %s,
Expand Down Expand Up @@ -396,14 +396,26 @@ def build_insert_query(self, registry_meta: ConnectorRegsitryv2):
datetime.now(),
)
return query, params

def load_file_bytes(self, rel_path: str) -> bytes | None:
file_path = Path(self.extraction_path)
for item in file_path.glob("*/{}".format(rel_path)):
try:
prefixes = {
".svg": "data:image/svg+xml;base64,",
".jpeg": "data:image/jpeg;base64,",
".jpg": "data:image/jpg;base64,",
".gif": "data:image/gif;base64,",
".webp": "data:image/webp;base64,",
".ico": "data:image/x-icon;base64,"
}

prefix = prefixes.get(item.suffix, "data:application/octet-stream;base64,")
print(f"Connector Registry | Image Suffix: {item.suffix} Base64 Prefix in Use: {prefix}")

with open(item, 'rb') as file:
file_content = file.read()
encoded = base64.b64encode(file_content).decode("ascii")
encoded = (prefix + base64.b64encode(file_content).decode("ascii")).strip()
except IsADirectoryError:
print(
f"Connector Registry | No value for icon URL given at metadata: {rel_path}"
Expand Down Expand Up @@ -433,11 +445,11 @@ def update_connector_registry(self, _id, ver):
print(
f"Connector Registry | An error occurred during the execution of Query: {e}"
)

def copy_connector_to_runtime(self, runtime: str, connector_source: str):
if runtime == "spark":
return self.copy_connector_to_spark(connector_source)


def copy_connector_to_spark(self, connector_source: str):
print(f"Connector Registry | copying {connector_source} to spark")
Expand Down Expand Up @@ -485,7 +497,7 @@ def copy_connector_to_spark(self, connector_source: str):
message="failed to copy the connector to spark",
statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

if self.metadata['metadata']['technology'] == "python":
pip_install_cmd = [
"kubectl",
Expand All @@ -507,7 +519,7 @@ def copy_connector_to_spark(self, connector_source: str):
message="failed to install the requirements on spark",
statusCode=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

return RegistryResponse(
status="success",
message="connector copied to spark successfully",
Expand Down

0 comments on commit 812ffbe

Please sign in to comment.