Skip to content

Commit

Permalink
Application bootstrap and config improvements (#18)
Browse files Browse the repository at this point in the history
* Application initialisation is done via
 NeatApp class

* Implemented global configs in UI and API

* - Small adjustments based on feedback
- Added new fast_graph (oxigraph) wf example


---------

Co-authored-by: Aleksandrs Livincovs <[email protected]>
  • Loading branch information
alivinco and cog-alivinco authored May 8, 2023
1 parent c4a260e commit 7800cde
Show file tree
Hide file tree
Showing 21 changed files with 968 additions and 210 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [0.11.0] - 11-05-24
- Refactored application bootrap procese and core application functions aggregated into NeatApp class.
- Small bug fixes.
- Fixed global configurations via UI and API.

## [0.10.4] - 28-04-24
- Added readme to publish process on pypi.org.

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up

version="0.10.4"
version="0.11.0"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.10.4"
__version__ = "0.11.0"
66 changes: 66 additions & 0 deletions cognite/neat/core/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging

from cognite.client import CogniteClient
from fastapi import FastAPI

from cognite.neat.core.data_classes.config import Config
from cognite.neat.core.utils import get_cognite_client_from_config
from cognite.neat.core.workflow.cdf_store import CdfStore
from cognite.neat.core.workflow.manager import WorkflowManager
from cognite.neat.core.workflow.triggers import TriggerManager


class NeatApp:
def __init__(self, config: Config, cdf_client: CogniteClient = None):
self.config = config
self.cdf_client: CogniteClient = None
self.cdf_store: CdfStore = None
self.workflow_manager: WorkflowManager = None
self.triggers_manager: TriggerManager = None
self.fast_api_app: FastAPI = None
self.cdf_client = cdf_client

def set_http_server(self, fast_api_app: FastAPI):
"""Set the http server to be used by the triggers manager"""
self.fast_api_app = fast_api_app

def start(self, config: Config = None):
logging.info("Starting NeatApp")
if config:
self.config = config
logging.info("Initializing global objects")
if not self.cdf_client:
self.cdf_client = get_cognite_client_from_config(self.config.cdf_client)
self.cdf_store = CdfStore(
self.cdf_client,
self.config.cdf_default_dataset_id,
workflows_storage_path=self.config.workflows_store_path,
rules_storage_path=self.config.rules_store_path,
)

# Automatically downloading workflows from CDF if enabled in config
if self.config.workflow_downloader_filter:
self.cdf_store.load_workflows_from_cfg_by_filter(self.config.workflow_downloader_filter)

self.workflow_manager = WorkflowManager(
self.cdf_client,
self.config.workflows_store_type,
self.config.workflows_store_path,
self.config.rules_store_path,
self.config.cdf_default_dataset_id,
)
self.workflow_manager.load_workflows_from_storage_v2()
self.triggers_manager = TriggerManager(workflow_manager=self.workflow_manager)
if self.fast_api_app:
self.triggers_manager.start_http_listeners(self.fast_api_app)
self.triggers_manager.start_time_schedulers()
logging.info("NeatApp started")

def stop(self):
logging.info("Stopping NeatApp")
self.triggers_manager.stop_scheduler_main_loop()
self.cdf_client = None
self.cdf_store = None
self.workflow_manager = None
self.triggers_manager = None
logging.info("NeatApp stopped")
27 changes: 15 additions & 12 deletions cognite/neat/core/workflow/cdf_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,23 @@ def __init__(
self.rules_storage_path = str(rules_storage_path)
self.workflows_storage_type = "file"

def init_cdf_resources(self):
def init_cdf_resources(self, resource_type="all"):
if self.client and self.data_set_id:
try:
list = self.client.labels.list(external_id_prefix="neat-")
if len(list) == 0:
labels = [
LabelDefinition(
external_id="neat-workflow", name="neat-workflow", data_set_id=self.data_set_id
),
LabelDefinition(external_id="neat-latest", name="neat-latest", data_set_id=self.data_set_id),
]
self.client.labels.create(labels)
else:
logging.debug("Labels already exists.")
if resource_type == "all" or resource_type == "labels":
list = self.client.labels.list(external_id_prefix="neat-")
if len(list) == 0:
labels = [
LabelDefinition(
external_id="neat-workflow", name="neat-workflow", data_set_id=self.data_set_id
),
LabelDefinition(
external_id="neat-latest", name="neat-latest", data_set_id=self.data_set_id
),
]
self.client.labels.create(labels)
else:
logging.debug("Labels already exists.")
except Exception as e:
logging.debug(f"Failed to create labels.{e}")

Expand Down
41 changes: 29 additions & 12 deletions cognite/neat/core/workflow/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,36 @@


class WorkflowManager:
"""Workflow manager is responsible for loading, saving and managing workflows
client: CogniteClient
registry_storage_type: str = "file"
workflows_storage_path: Path = Path("workflows")
rules_storage_path: Path = Path("rules")
data_set_id: int = None,
"""

def __init__(
self,
client: CogniteClient,
registry_storage_type: str,
workflows_storage_path: Path,
rules_storage_path: Path,
client: CogniteClient = None,
registry_storage_type: str = "file",
workflows_storage_path: Path = None,
rules_storage_path: Path = None,
data_set_id: int = None,
):
self.client = client
self.data_set_id = data_set_id
self.workflow_registry: dict[str, BaseWorkflow] = {}
self.workflows_storage_type = registry_storage_type
# todo use pathlib
self.workflows_storage_path = str(workflows_storage_path)
self.rules_storage_path = str(rules_storage_path)
self.workflows_storage_path = workflows_storage_path if workflows_storage_path else Path("workflows")
self.rules_storage_path = rules_storage_path if rules_storage_path else Path("rules")
self.task_builder = WorkflowTaskBuilder(client, self)

def update_cdf_client(self, client: CogniteClient):
self.client = client
self.task_builder = WorkflowTaskBuilder(client, self)
self.workflow_registry = {}
self.load_workflows_from_storage_v2()

def get_list_of_workflows(self):
return list(self.workflow_registry.keys())
Expand All @@ -55,7 +69,7 @@ def update_workflow(self, name: str, workflow: WorkflowDefinition):
def save_workflow_to_storage(self, name: str, custom_implementation_module: str = None):
"""Save workflow from memory to storage"""
if self.workflows_storage_type == "file":
full_path = os.path.join(self.workflows_storage_path, name, "workflow.yaml")
full_path = self.workflows_storage_path / name / "workflow.yaml"
wf = self.workflow_registry[name]
with open(full_path, "w") as f:
f.write(
Expand All @@ -65,16 +79,19 @@ def save_workflow_to_storage(self, name: str, custom_implementation_module: str
)

def load_workflows_from_storage_v2(self, dir_path: str = None):
if not dir_path:
"""Loads workflows from disk/storage into memory , initializes and register them in the workflow registry"""
if dir_path:
dir_path = Path(dir_path)
else:
dir_path = self.workflows_storage_path
sys.path.append(dir_path)
sys.path.append(str(dir_path))
for wf_module_name in os.listdir(dir_path):
wf_module_full_path = os.path.join(dir_path, wf_module_name)
if Path(wf_module_full_path).is_dir():
wf_module_full_path = dir_path / wf_module_name
if wf_module_full_path.is_dir():
try:
logging.info(f"Loading workflow {wf_module_name} from {wf_module_full_path}")
# metadata_file = f"{dir_path}//{module_name}.yaml"
metadata_file = os.path.join(dir_path, wf_module_name, "workflow.yaml")
metadata_file = dir_path / wf_module_name / "workflow.yaml"
logging.info(f"Loading workflow {wf_module_name} metadata from {metadata_file}")
if os.path.exists(metadata_file):
with open(metadata_file, "r") as f:
Expand Down
Loading

0 comments on commit 7800cde

Please sign in to comment.