From 0f842315f1e90b3efa264b73f05f99785478151c Mon Sep 17 00:00:00 2001 From: matt cameron Date: Mon, 28 Oct 2024 23:21:52 +1100 Subject: [PATCH] ElasticSearchWriter Plugin (#880) * initial commit for WriteLocalDatabase plugin * feat: add identifier for document upsert --- .../elasticsearch/write_local/model/config.py | 27 ++++ .../elasticsearch/write_local/plugin.py | 135 ++++++++++++++++++ tracardi/service/setup/setup_plugins.py | 6 + 3 files changed, 168 insertions(+) create mode 100644 tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/model/config.py create mode 100644 tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/plugin.py diff --git a/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/model/config.py b/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/model/config.py new file mode 100644 index 00000000..b809a806 --- /dev/null +++ b/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/model/config.py @@ -0,0 +1,27 @@ +from typing import Optional + +from pydantic import field_validator +import json +from tracardi.domain.named_entity import NamedEntity +from tracardi.service.plugin.domain.config import PluginConfig + +class Config(PluginConfig): + index: str + documents: str + source: NamedEntity + identifier: str + log: Optional[bool] = False + + @field_validator("index") + @classmethod + def validate_index(cls, value): + if value is None or len(value) == 0: + raise ValueError("This field cannot be empty.") + return value + + @field_validator("documents") + @classmethod + def validate_documents(cls, value): + if value is None or len(value) == 0: + raise ValueError("This field cannot be empty.") + return value diff --git a/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/plugin.py b/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/plugin.py new file mode 100644 index 00000000..29a64002 --- /dev/null +++ b/tracardi/process_engine/action/v1/connectors/elasticsearch/write_local/plugin.py @@ -0,0 +1,135 @@ +import json + +from tracardi.domain.value_object.bulk_insert_result import BulkInsertResult +from tracardi.service.notation.dict_traverser import DictTraverser + +from tracardi.service.plugin.domain.register import Plugin, Spec, MetaData, Documentation, PortDoc, Form, FormGroup, \ + FormField, FormComponent +from tracardi.service.plugin.runner import ActionRunner +from tracardi.service.plugin.domain.result import Result +from .model.config import Config +from tracardi.service.storage.elastic.interface import raw as raw_db + + +def validate(config: dict): + config = Config(**config) + return config + +class WriteLocalDatabase(ActionRunner): + + config: Config + + async def set_up(self, init): + self.config = Config(**init) + + async def run(self, payload: dict, in_edge=None) -> Result: + + dot = self._get_dot_accessor(payload) + + try: + + index=dot[self.config.index] + documents=dot[self.config.documents] + identifier=dot[self.config.identifier] + + if isinstance(documents, str): + documents = json.loads(documents) + + if isinstance(documents, list) and identifier: + documents = [{identifier: item} for item in documents] + + if identifier: + for item in documents: + if identifier in item: + item["_id"] = item[identifier] + + result = await raw_db.bulk_upsert( + index=index, + data=documents + ) + + if isinstance(result, BulkInsertResult): + result_dict = result.dict() + + except Exception as e: + self.console.error(str(e)) + return Result(port="error", value={ + "message": str(e) + }) + + return Result(port="result", value=result_dict) + +def register() -> Plugin: + return Plugin( + start=False, + spec=Spec( + module=__name__, + className=WriteLocalDatabase.__name__, + inputs=["payload"], + outputs=["result", "error"], + version='1.0.3', + license="MIT", + author="Matt Cameron", + manual='write_data', + init={ + "index": None, + "document": None + }, + form=Form( + groups=[ + FormGroup( + name="Configuration", + fields=[ + FormField( + id="source", + name="Elasticsearch resource", + description="Please select your Elasticsearch resource.", + component=FormComponent(type="resource", props={"label": "Resource", + "tag": "elasticsearch"}) + ), + FormField( + id="index", + name="Index", + description="The index for where the documents will be upserted.", + component=FormComponent(type="dotPath", props={ + "label": "Index" + }) + ), + FormField( + id="documents", + name="Documents", + description="The documents to be upserted/inserted.", + component=FormComponent(type="dotPath", props={ + "label": "Documents" + }) + ), + FormField( + id="identifier", + name="Identifier", + description="The primary key to be used if documents are to be upserted.", + component=FormComponent(type="dotPath", props={ + "label": "Documents" + }) + ), + ] + ) + ] + ) + ), + metadata=MetaData( + name='Write data', + desc='Write local Elasticsearch database', + icon='elasticsearch', + group=["Databases"], + tags=['database', 'nosql', 'elastic'], + documentation=Documentation( + inputs={ + "payload": PortDoc(desc="This port takes payload object.") + }, + outputs={ + "result": PortDoc(desc="This port returns result of upserting ElasticSearch instance."), + "error": PortDoc(desc="This port returns error if an error occurs.") + } + ) + ) + ) diff --git a/tracardi/service/setup/setup_plugins.py b/tracardi/service/setup/setup_plugins.py index a697ccab..7fa60548 100644 --- a/tracardi/service/setup/setup_plugins.py +++ b/tracardi/service/setup/setup_plugins.py @@ -774,6 +774,12 @@ resource=None) ), + "tracardi.process_engine.action.v1.connectors.elasticsearch.write_local.plugin": PluginMetadata( + test=PluginTest( + init={'index': '', 'documents':'', 'identifier':''}, + resource=None) + ), + "tracardi.process_engine.action.v1.connectors.ghost.plugin": PluginMetadata( test=PluginTest(init={'uuid': ''}, resource={