diff --git a/dataci/config.py b/dataci/config.py index 2eb95e7..181a77f 100644 --- a/dataci/config.py +++ b/dataci/config.py @@ -8,6 +8,7 @@ import configparser import logging import os +from datetime import datetime, timezone from pathlib import Path from textwrap import dedent from threading import Event as ThreadEvent @@ -97,6 +98,7 @@ def load_config(): LOG_DIR = None LOG_LEVEL = None STORAGE_BACKEND = None +TIMEZONE = datetime.now(timezone.utc).astimezone().tzinfo # DataCI Trigger and Scheduler server SERVER_ADDRESS = '0.0.0.0' diff --git a/dataci/db/init.py b/dataci/db/init.py index dee5627..23bcd51 100644 --- a/dataci/db/init.py +++ b/dataci/db/init.py @@ -17,6 +17,8 @@ # Drop all tables with db_connection: db_connection.executescript(""" + DROP TABLE IF EXISTS lineage; + DROP TABLE IF EXISTS run; DROP TABLE IF EXISTS dataset_tag; DROP TABLE IF EXISTS dataset; DROP TABLE IF EXISTS workflow_dag_node; @@ -24,12 +26,23 @@ DROP TABLE IF EXISTS stage; DROP TABLE IF EXISTS workflow_tag; DROP TABLE IF EXISTS workflow; + DROP TABLE IF EXISTS job; """) logger.info('Drop all tables.') # Create dataset table with db_connection: db_connection.executescript(""" + CREATE TABLE job + ( + workspace TEXT, + name TEXT, + version TEXT, + type TEXT, + PRIMARY KEY (workspace, name, version, type), + UNIQUE (workspace, name, version, type) + ); + CREATE TABLE workflow ( workspace TEXT, @@ -45,7 +58,9 @@ script_filelist TEXT, script_hash TEXT, PRIMARY KEY (workspace, name, version), - UNIQUE (workspace, name, version) + UNIQUE (workspace, name, version), + FOREIGN KEY (workspace, name, version) + REFERENCES job (workspace, name, version) ); CREATE TABLE workflow_tag @@ -73,7 +88,9 @@ script_filelist TEXT, script_hash TEXT, PRIMARY KEY (workspace, name, version), - UNIQUE (workspace, name, version) + UNIQUE (workspace, name, version), + FOREIGN KEY (workspace, name, version) + REFERENCES job (workspace, name, version) ); CREATE TABLE stage_tag @@ -133,5 +150,42 @@ FOREIGN KEY (workspace, name, version) REFERENCES dataset (workspace, name, version) ); + + CREATE TABLE run + ( + workspace TEXT, + name TEXT, + version INTEGER, + status TEXT, + job_workspace TEXT, + job_name TEXT, + job_version TEXT, + job_type TEXT, + create_time INTEGER, + update_time INTEGER, + PRIMARY KEY (workspace, name, version), + UNIQUE (name, version), + FOREIGN KEY (workspace, name, version) + REFERENCES job (workspace, name, version), + FOREIGN KEY (job_workspace, job_name, job_version, job_type) + REFERENCES job (workspace, name, version, type) + ); + + CREATE TABLE lineage + ( + upstream_workspace TEXT, + upstream_name TEXT, + upstream_version TEXT, + upstream_type TEXT, + downstream_workspace TEXT, + downstream_name TEXT, + downstream_version TEXT, + downstream_type TEXT, + PRIMARY KEY (upstream_workspace, upstream_name, upstream_version, upstream_type, downstream_workspace, downstream_name, downstream_version, downstream_type), + FOREIGN KEY (upstream_workspace, upstream_name, upstream_version, upstream_type) + REFERENCES job (workspace, name, version, type), + FOREIGN KEY (downstream_workspace, downstream_name, downstream_version, downstream_type) + REFERENCES job (workspace, name, version, type) + ); """) logger.info('Create all tables.') diff --git a/dataci/db/lineage.py b/dataci/db/lineage.py new file mode 100644 index 0000000..e088013 --- /dev/null +++ b/dataci/db/lineage.py @@ -0,0 +1,432 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Nov 27, 2023 +""" 
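Note on the schema changes above: the init script now registers every versioned asset (workflow, stage, dataset, run) as a row in the new job table, and the lineage table stores directed edges between pairs of job keys through composite foreign keys. As a rough illustration of how those two tables relate, here is a hypothetical direct SQLite lookup that resolves the upstream jobs of one downstream job; the workspace/name/version values are made up, DB_FILE is the database path already exposed by dataci.config, and the accessor functions that follow in dataci/db/lineage.py wrap essentially this kind of join.

import sqlite3

from dataci.config import DB_FILE  # default SQLite path; adjust if your config differs

# Illustrative only: resolve the upstream job rows for one downstream job by
# joining the lineage edge table back onto the job registry created above.
with sqlite3.connect(DB_FILE) as conn:
    rows = conn.execute(
        """
        SELECT j.workspace, j.name, j.version, j.type
        FROM lineage AS l
        JOIN job AS j
          ON  j.workspace = l.upstream_workspace
          AND j.name      = l.upstream_name
          AND j.version   = l.upstream_version
          AND j.type      = l.upstream_type
        WHERE l.downstream_workspace = ?
          AND l.downstream_name      = ?
          AND l.downstream_version   = ?
          AND l.downstream_type      = ?
        """,
        ('example', 'train_data', '1', 'dataset'),  # hypothetical identifiers
    ).fetchall()
print(rows)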
+import sqlite3 +from collections import OrderedDict + +from dataci.config import DB_FILE + + +def get_many_upstream_lineage(downstream_config): + """Config in downstream.""" + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + FROM lineage + WHERE ( + downstream_workspace = :workspace + AND downstream_name = :name + AND downstream_version = :version + AND downstream_type = :type + ) + """, + downstream_config, + ) + return [ + { + 'workspace': row[0], + 'name': row[1], + 'version': row[2], + 'type': row[3], + } for row in cur.fetchall() + ] + + +def get_many_downstream_lineage(upstream_config): + """Config in upstream.""" + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineage + WHERE ( + upstream_workspace = :workspace + AND upstream_name = :name + AND upstream_version = :version + AND upstream_type = :type + ) + """, + upstream_config, + ) + return [ + { + 'workspace': row[0], + 'name': row[1], + 'version': row[2], + 'type': row[3], + } for row in cur.fetchall() + ] + + +def list_many_upstream_lineage(downstream_configs): + """List all upstream lineage of downstream_configs.""" + # Return empty list if no downstream_configs, + # this prevents SQL syntax error when generating SQL statement + if len(downstream_configs) == 0: + return list() + + # Create a ordered dict to preserve the order of downstream_configs + od = OrderedDict() + for downstream_config in downstream_configs: + od[( + downstream_config['workspace'], + downstream_config['name'], + str(downstream_config['version']), + downstream_config['type'] + )] = list() + + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + sql_lineage_values = ',\n'.join([ + repr(( + downstream_config['workspace'], + downstream_config['name'], + str(downstream_config['version']), + downstream_config['type'], + )) + for downstream_config in downstream_configs + ]) + cur.execute( + f""" + WITH downstreams ( + downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + ) AS ( + VALUES {sql_lineage_values} + ) + ,lineages AS ( + SELECT upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineage + ) + SELECT upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineages + JOIN downstreams USING ( + downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + ) + ; + """ + ) + + for row in cur.fetchall(): + od[(row[4], row[5], row[6], row[7],)].append({ + 'workspace': row[0], + 'name': row[1], + 'version': row[2], + 'type': row[3], + }) + return list(od.values()) + + +def list_many_downstream_lineage(upstream_configs): + """List all downstream lineage of upstream_configs.""" + # Return empty list if no upstream_configs, + # this prevents SQL syntax error when generating SQL statement + if len(upstream_configs) == 0: + return list() + + # Create a ordered dict to preserve the order of upstream_configs + od = OrderedDict() + for upstream_config in upstream_configs: + od[( + upstream_config['workspace'], + upstream_config['name'], + str(upstream_config['version']), + upstream_config['type'] + )] = list() + + with sqlite3.connect(DB_FILE) as 
conn: + cur = conn.cursor() + sql_lineage_values = ',\n'.join([ + repr(( + upstream_config['workspace'], + upstream_config['name'], + str(upstream_config['version']), + upstream_config['type'], + )) + for upstream_config in upstream_configs + ]) + cur.execute( + f""" + WITH upstreams ( + upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ) AS ( + VALUES {sql_lineage_values} + ) + ,lineages AS ( + SELECT upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineage + ) + SELECT upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineages + JOIN upstreams USING ( + upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ) + ; + """ + ) + + for row in cur.fetchall(): + od[(row[0], row[1], row[2], row[3],)].append({ + 'workspace': row[4], + 'name': row[5], + 'version': row[6], + 'type': row[7], + }) + return list(od.values()) + + +def exist_one_lineage(upstream_config, downstream_config): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT EXISTS( + SELECT 1 + FROM lineage + WHERE upstream_workspace = :upstream_workspace + AND upstream_name = :upstream_name + AND upstream_version = :upstream_version + AND upstream_type = :upstream_type + AND downstream_workspace = :downstream_workspace + AND downstream_name = :downstream_name + AND downstream_version = :downstream_version + AND downstream_type = :downstream_type + ) + ; + """, + { + 'upstream_workspace': upstream_config['workspace'], + 'upstream_name': upstream_config['name'], + 'upstream_version': upstream_config['version'], + 'upstream_type': upstream_config['type'], + 'downstream_workspace': downstream_config['workspace'], + 'downstream_name': downstream_config['name'], + 'downstream_version': downstream_config['version'], + 'downstream_type': downstream_config['type'], + } + ) + return cur.fetchone()[0] + + +def exist_many_downstream_lineage(upstream_config, downstream_configs): + # Return empty list if no upstream_configs or downstream_configs, + # this prevents SQL syntax error when generating SQL statement + if len(downstream_configs) == 0: + return list() + + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + sql_lineage_values = ',\n'.join([ + repr(( + downstream_config['workspace'], + downstream_config['name'], + downstream_config['version'], + downstream_config['type'], + )) + for downstream_config in downstream_configs + ]) + cur.execute( + f""" + WITH downstreams ( + downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + ) AS ( + VALUES {sql_lineage_values} + ) + ,lineages AS ( + SELECT TRUE AS flg + ,upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineage + WHERE upstream_workspace = :upstream_workspace + AND upstream_name = :upstream_name + AND upstream_version = :upstream_version + AND upstream_type = :upstream_type + ) + SELECT COALESCE(flg, FALSE) AS flg + FROM downstreams + LEFT JOIN lineages USING ( + downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + ) + ; + """, + { + 'upstream_workspace': upstream_config['workspace'], + 'upstream_name': upstream_config['name'], + 'upstream_version': upstream_config['version'], + 'upstream_type': 
upstream_config['type'], + } + ) + return [bool(row[0]) for row in cur.fetchall()] + + +def exist_many_upstream_lineage(upstream_configs, downstream_config): + # Return empty list if no upstream_configs or downstream_configs, + # this prevents SQL syntax error when generating SQL statement + if len(upstream_configs) == 0: + return list() + + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + sql_lineage_values = ',\n'.join([ + repr(( + upstream_config['workspace'], + upstream_config['name'], + upstream_config['version'], + upstream_config['type'], + )) + for upstream_config in upstream_configs + ]) + cur.execute( + f""" + WITH upstreams ( + upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ) AS ( + VALUES {sql_lineage_values} + ) + ,lineages AS ( + SELECT TRUE AS flg + ,upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + FROM lineage + WHERE downstream_workspace = :downstream_workspace + AND downstream_name = :downstream_name + AND downstream_version = :downstream_version + AND downstream_type = :downstream_type + ) + SELECT COALESCE(flg, FALSE) AS flg + FROM upstreams + LEFT JOIN lineages USING ( + upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ) + ; + """, + { + 'downstream_workspace': downstream_config['workspace'], + 'downstream_name': downstream_config['name'], + 'downstream_version': downstream_config['version'], + 'downstream_type': downstream_config['type'], + } + ) + return [bool(row[0]) for row in cur.fetchall()] + + +def create_many_lineage(config): + # Permute all upstream and downstream lineage + lineage_configs = list() + for upstream_config in config['upstream']: + for downstream_config in config['downstream']: + lineage_configs.append({ + 'upstream_workspace': upstream_config['workspace'], + 'upstream_name': upstream_config['name'], + 'upstream_version': upstream_config['version'], + 'upstream_type': upstream_config['type'], + 'downstream_workspace': downstream_config['workspace'], + 'downstream_name': downstream_config['name'], + 'downstream_version': downstream_config['version'], + 'downstream_type': downstream_config['type'], + }) + + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.executemany( + """ + INSERT INTO lineage ( + upstream_workspace + ,upstream_name + ,upstream_version + ,upstream_type + ,downstream_workspace + ,downstream_name + ,downstream_version + ,downstream_type + ) + VALUES ( + :upstream_workspace + ,:upstream_name + ,:upstream_version + ,:upstream_type + ,:downstream_workspace + ,:downstream_name + ,:downstream_version + ,:downstream_type + ) + ; + """, + lineage_configs, + ) diff --git a/dataci/db/run.py b/dataci/db/run.py index 302b3a5..da7459b 100644 --- a/dataci/db/run.py +++ b/dataci/db/run.py @@ -5,71 +5,267 @@ Email: yuanmingleee@gmail.com Date: Mar 14, 2023 """ -from . 
import db_connection +import sqlite3 +from copy import deepcopy +from dataci.config import DB_FILE -def get_next_run_num(pipeline_name, pipeline_version): - with db_connection: - (next_run_id,), = db_connection.execute( + +def create_one_run(config: dict): + config = deepcopy(config) + job_config = config.pop('job') + config['job_workspace'] = job_config['workspace'] + config['job_name'] = job_config['name'] + config['job_version'] = job_config['version'] + config['job_type'] = job_config['type'] + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + # Create job + cur.execute( + """ + INSERT INTO job ( + workspace, + name, + version, + type + ) + VALUES (:workspace, :name, :version, :type) + ; + """, + config + ) + # Create run + cur.execute( """ - SELECT COALESCE(MAX(run_num), 0) + 1 AS next_run_id - FROM run - WHERE pipeline_name = ? - AND pipeline_version = ? + INSERT INTO run ( + workspace, + name, + version, + status, + job_workspace, + job_name, + job_version, + job_type, + create_time, + update_time + ) + VALUES (:workspace, :name, :version, :status, :job_workspace, :job_name, :job_version, :job_type, :create_time, :update_time) ; """, - (pipeline_name, pipeline_version) + config ) - return next_run_id + return cur.lastrowid -def create_one_run(run_dict): - pipeline_dict = run_dict['pipeline'] - with db_connection: - db_connection.execute( +def update_one_run(config): + config = deepcopy(config) + job_config = config.pop('job') + config['job_workspace'] = job_config['workspace'] + config['job_name'] = job_config['name'] + config['job_version'] = job_config['version'] + config['job_type'] = job_config['type'] + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( """ - INSERT INTO run(run_num, pipeline_name, pipeline_version) VALUES - (?,?,?) + UPDATE run + SET status = :status + , job_workspace = :job_workspace + , job_name = :job_name + , job_version = :job_version + , job_type = :job_type + , update_time = :update_time + WHERE name = :name + AND version = :version ; """, - (run_dict['run_num'], pipeline_dict['name'], - pipeline_dict['version']) + config ) + return cur.lastrowid -def get_many_runs(pipeline_name, pipeline_version): - with db_connection: - run_dict_iter = db_connection.execute( +def exist_run(name, version): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + (exists,), = cur.execute( """ - SELECT run.*, - timestamp - FROM ( - SELECT run_num, - pipeline_name, - pipeline_version + SELECT EXISTS( + SELECT 1 + FROM run + WHERE name = ? + AND version = ? + ) + ; + """, + (name, version) + ) + return exists + + +def get_latest_run_version(name): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + (version,), = cur.execute( + """ + SELECT MAX(version) + FROM run + WHERE name = ? + ; + """, + (name,) + ) + return version or 0 + + +def get_next_run_version(name): + return get_latest_run_version(name) + 1 + + +def get_one_run(name, version='latest'): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + if version == 'latest': + cur.execute( + """ + SELECT workspace + , name + , version + , status + , job_workspace + , job_name + , job_version + , job_type + , create_time + , update_time FROM run - WHERE pipeline_name GLOB ? - AND pipeline_version GLOB ? - ) run - JOIN ( - SELECT name, - version, - timestamp - FROM pipeline - WHERE name GLOB ? - AND version GLOB ? - ) pipeline - ON pipeline_name = name - AND pipeline_version = version + WHERE name = ? 
+ ORDER BY version DESC + LIMIT 1 + ; + """, + (name,) + ) + else: + cur.execute( + """ + SELECT workspace + , name + , version + , status + , job_workspace + , job_name + , job_version + , job_type + , create_time + , update_time + FROM run + WHERE name = ? + AND version = ? + ; + """, + (name, version) + ) + + config = cur.fetchone() + return { + 'workspace': config[0], + 'name': config[1], + 'version': config[2], + 'status': config[3], + 'job': { + 'workspace': config[4], + 'name': config[5], + 'version': config[6], + 'type': config[7], + }, + 'create_time': config[8], + 'update_time': config[9], + } if config else None + + +def list_run_by_job(workspace, name, version, type): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT workspace + , name + , version + , status + , job_workspace + , job_name + , job_version + , job_type + , create_time + , update_time + FROM run + WHERE job_workspace = ? + AND job_name = ? + AND job_version = ? + AND job_type = ? + ; + """, + (workspace, name, version, type) + ) + configs = cur.fetchall() + return [ + { + 'workspace': config[0], + 'name': config[1], + 'version': config[2], + 'status': config[3], + 'job': { + 'workspace': config[4], + 'name': config[5], + 'version': config[6], + 'type': config[7], + }, + 'create_time': config[8], + 'update_time': config[9], + } + for config in configs + ] + + +def list_run_by_job(workspace, name, version, type): + with sqlite3.connect(DB_FILE) as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT workspace + , name + , version + , status + , job_workspace + , job_name + , job_version + , job_type + , create_time + , update_time + FROM run + WHERE job_workspace = ? + AND job_name = ? + AND job_version = ? + AND job_type = ? ; """, - (pipeline_name, pipeline_version, pipeline_name, pipeline_version), + (workspace, name, version, type) ) - run_dict_list = list() - for run_po in run_dict_iter: - run_num, pipeline_name, pipeline_version, timestamp = run_po - run_dict_list.append({ - 'run_num': run_num, - 'pipeline': {'name': pipeline_name, 'version': pipeline_version, 'timestamp': timestamp}, - }) - return run_dict_list + configs = cur.fetchall() + return [ + { + 'workspace': config[0], + 'name': config[1], + 'version': config[2], + 'status': config[3], + 'job': { + 'workspace': config[4], + 'name': config[5], + 'version': config[6], + 'type': config[7], + }, + 'create_time': config[8], + 'update_time': config[9], + } + for config in configs + ] diff --git a/dataci/db/stage.py b/dataci/db/stage.py index 05834ab..c4780d1 100644 --- a/dataci/db/stage.py +++ b/dataci/db/stage.py @@ -24,6 +24,14 @@ def create_one_stage(stage_dict): cur = conn.cursor() cur.execute( """ + -- Insert into job first + INSERT INTO job (workspace, name, version, type) + VALUES (:workspace, :name, :version, :type) + ; + """, + stage_dict, + ) + cur.execute(""" INSERT INTO stage ( workspace, name, version, params, timestamp, script_dir, script_entry, script_filelist, script_hash ) diff --git a/dataci/db/workflow.py b/dataci/db/workflow.py index 5de40c8..aee8bd8 100644 --- a/dataci/db/workflow.py +++ b/dataci/db/workflow.py @@ -29,6 +29,14 @@ def create_one_workflow(config): with sqlite3.connect(DB_FILE) as conn: cur = conn.cursor() + cur.execute( + """ + INSERT INTO job (workspace, name, version, type) + VALUES (:workspace, :name, :version, :type) + ; + """, + workflow_dict, + ) cur.execute( """ INSERT INTO workflow ( @@ -381,7 +389,7 @@ def get_one_workflow_by_tag(workspace, name, tag): 'timestamp': 
config[4], 'params': '', 'flag': '', - 'trigger': json.loads(config[6]), + 'trigger': json.loads(config[6]) if config[6] is not None else list(), 'dag': { 'edge': json.loads(config[8]), }, @@ -396,7 +404,7 @@ def get_one_workflow_by_tag(workspace, name, tag): version = workflow_dict['version'] cur.execute( dedent(""" - SELECT stage_workspace, stage_name, stage_version, dag_node_id + SELECT stage_workspace, stage_name, stage_version, dag_node_id, dag_node_path FROM workflow_dag_node WHERE workflow_workspace=:workspace AND workflow_name=:name @@ -414,6 +422,7 @@ def get_one_workflow_by_tag(workspace, name, tag): 'workspace': node[0], 'name': node[1], 'version': node[2] if node[2] != '' else None, + 'path': node[4], } for node in cur.fetchall() } return workflow_dict diff --git a/dataci/decorators/base.py b/dataci/decorators/base.py index f8f93c9..33e3fff 100644 --- a/dataci/decorators/base.py +++ b/dataci/decorators/base.py @@ -48,8 +48,8 @@ def script(self): def test(self, *args, **kwargs): return self._stage.test(*args, **kwargs) - def dict(self): - return self._stage.dict() + def dict(self, id_only=False): + return self._stage.dict(id_only=id_only) def from_dict(self, config): self._stage.from_dict(config) @@ -66,3 +66,9 @@ def save(self): def publish(self): self._stage.publish() return self + + def upstream(self, n=1, type=None): + return self._stage.upstream(n, type) + + def downstream(self, n=1, type=None): + return self._stage.downstream(n, type) diff --git a/dataci/decorators/event.py b/dataci/decorators/event.py index 3ad13e1..2765ef2 100644 --- a/dataci/decorators/event.py +++ b/dataci/decorators/event.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from typing import Type, Union, TypeVar, Callable - from dataci.models.base import BaseModel + from dataci.models.base import Job T = TypeVar('T', bound=Callable) @@ -18,7 +18,7 @@ def event(name: str = None, producer: str = None): def wrapper(func: 'T') -> 'T': @wraps(func) - def inner_wrapper(self: 'Union[BaseModel, Type[BaseModel]]', *args, **kwargs): + def inner_wrapper(self: 'Union[Job, Type[Job]]', *args, **kwargs): # Prevent circular import from dataci.models import Event diff --git a/dataci/models/__init__.py b/dataci/models/__init__.py index 43089d8..0f0a7e2 100644 --- a/dataci/models/__init__.py +++ b/dataci/models/__init__.py @@ -5,13 +5,19 @@ Email: yuanmingleee@gmail.com Date: Feb 20, 2023 """ -from .base import BaseModel +from .base import Job from .dataset import Dataset from .event import Event +from .lineage import Lineage +from .run import Run from .stage import Stage from .workflow import Workflow from .workspace import Workspace __all__ = [ - 'BaseModel', 'Workspace', 'Dataset', 'Event', 'Workflow', 'Stage', + 'Job', 'Workspace', 'Dataset', 'Event', 'Workflow', 'Stage', 'Run', 'Lineage', ] + + +# Register subclasses of Job +getattr(Job, '_Job__register_job_type')() diff --git a/dataci/models/base.py b/dataci/models/base.py index 5f94d74..c2faea8 100644 --- a/dataci/models/base.py +++ b/dataci/models/base.py @@ -7,12 +7,17 @@ """ import abc import re +from dataclasses import dataclass, asdict +from typing import TYPE_CHECKING from dataci.config import DEFAULT_WORKSPACE from dataci.models.workspace import Workspace +if TYPE_CHECKING: + from typing import Dict, Type -class BaseModel(abc.ABC): + +class Job(abc.ABC): NAME_PATTERN = re.compile(r'^(?:[a-z]\w*\.)?[a-z]\w*$', flags=re.IGNORECASE) VERSION_PATTERN = re.compile(r'latest|v\d+|none|[\da-f]+', flags=re.IGNORECASE) GET_DATA_MODEL_IDENTIFIER_PATTERN = re.compile( @@ -22,11 
+27,12 @@ class BaseModel(abc.ABC): r'^(?:([a-z]\w*)\.)?([\w:.*[\]]+?)(?:@(\d+|latest|none|\*))?$', re.IGNORECASE ) type_name: str + __type_name_mapper__: 'Dict[str, Type[Job]]' = dict() def __init__(self, name, *args, **kwargs): # Prevent to pass invalid arguments to object.__init__ mro = type(self).mro() - for next_cls in mro[mro.index(BaseModel) + 1:]: + for next_cls in mro[mro.index(Job) + 1:]: if '__init__' in next_cls.__dict__: break else: @@ -55,7 +61,7 @@ def uri(self): return f'dataci://{self.workspace.name}/{self.type_name}/{self.name}/{self.version}' @abc.abstractmethod - def dict(self): + def dict(self, id_only=False): pass @classmethod @@ -72,8 +78,16 @@ def publish(self): pass @classmethod - @abc.abstractmethod - def get(cls, name, version=None, not_found_ok=False): + def get(cls, name, version=None, workspace=None, type=..., **kwargs) -> 'Job': + subcls = cls.__type_name_mapper__.get(type, None) + if not subcls: + raise ValueError(f'Invalid type {type}') + return subcls.get(name=name, version=version, workspace=workspace, **kwargs) + + def upstream(self, n=1, type=None): + pass + + def downstream(self, n=1, type=None): pass # @classmethod @@ -119,3 +133,23 @@ def parse_data_model_list_identifier(cls, identifier): version = str(version or '*').lower() return workspace, name, version + + @classmethod + def __register_job_type(cls): + """Register data class to job, this is essential load data class from job.""" + for sub_cls in cls.__subclasses__(): + cls.__type_name_mapper__[sub_cls.type_name] = sub_cls + + +@dataclass(frozen=True) +class JobView: + type: str + workspace: str + name: str + version: str = None + + def get(self) -> 'Job': + return Job.get(workspace=self.workspace, name=self.name, version=self.version, type=self.type) + + def dict(self): + return asdict(self) diff --git a/dataci/models/dataset.py b/dataci/models/dataset.py index 4a12ccf..d119723 100644 --- a/dataci/models/dataset.py +++ b/dataci/models/dataset.py @@ -32,7 +32,8 @@ ) from dataci.decorators.event import event from dataci.utils import hash_binary -from .base import BaseModel +from .base import Job +from .lineage import LineageGraph if TYPE_CHECKING: from typing import Optional, Union, Type @@ -220,7 +221,7 @@ def __len__(self): } -class Dataset(BaseModel): +class Dataset(Job): type_name = 'dataset' VERSION_PATTERN = re.compile(r'latest|none|\w+', flags=re.IGNORECASE) @@ -341,6 +342,13 @@ def from_dict(cls, config): return dataset_obj def dict(self, id_only=False): + if id_only: + return { + 'workspace': self.workspace.name, + 'type': self.type_name, + 'name': self.name, + 'version': self.version, + } config = { 'workspace': self.workspace.name, 'name': self.name, @@ -447,7 +455,17 @@ def publish(self, version_tag=None): return self.reload(config) @classmethod - def get(cls, name: str, version=None, not_found_ok=False, file_reader='auto', file_writer='csv'): + def get( + cls, + name: str, + workspace=None, + version=None, + not_found_ok=False, + file_reader='auto', + file_writer='csv', + **kwargs, + ): + name = workspace + '.' 
+ name if workspace else name workspace, name, version_or_tag = cls.parse_data_model_get_identifier(name, version) if version_or_tag is None or cls.VERSION_TAG_PATTERN.match(version_or_tag) is not None: @@ -515,3 +533,13 @@ def find(cls, dataset_identifier=None, tree_view=False, all=False): return dict(dataset_dict) return dataset_list + + def upstream(self, n=1, type=None): + """Get upstream""" + """Get upstream lineage.""" + g = LineageGraph.upstream(self, n, type) + return g + + def downstream(self, n=1, type=None): + """Get downstream lineage.""" + return LineageGraph.downstream(self, n, type) diff --git a/dataci/models/lineage.py b/dataci/models/lineage.py new file mode 100644 index 0000000..6292033 --- /dev/null +++ b/dataci/models/lineage.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Nov 22, 2023 +""" +import dataclasses +import warnings +from itertools import chain +from typing import TYPE_CHECKING + +import networkx as nx + +from dataci.db.lineage import ( + exist_many_downstream_lineage, + exist_many_upstream_lineage, + create_many_lineage, + list_many_upstream_lineage, + list_many_downstream_lineage, +) +from dataci.models.base import Job, JobView + +if TYPE_CHECKING: + from typing import List, Union + + +class Lineage(object): + + def __init__( + self, + upstream: 'Union[List[Job], Job, dict]', + downstream: 'Union[List[Job], Job, dict]', + ): + # only one of upstream and downstream can be list + if isinstance(upstream, list) and isinstance(downstream, list): + raise ValueError('Only one of upstream and downstream can be list.') + self._upstream = upstream if isinstance(upstream, list) else [upstream] + self._downstream = downstream if isinstance(downstream, list) else [downstream] + + def dict(self): + return { + 'upstream': [node.dict(id_only=True) for node in self.upstream], + 'downstream': [node.dict(id_only=True) for node in self.downstream], + } + + @classmethod + def from_dict(cls, config): + pass + + @property + def upstream(self) -> 'List[Job]': + """Lazy load upstream from database.""" + nodes = list() + for node in self._upstream: + if isinstance(node, Job): + nodes.append(node) + elif isinstance(node, dict): + nodes.append(Job.get(**node)) + else: + warnings.warn(f'Unable to parse upstream {node}') + self._upstream = nodes + return self._upstream + + @property + def downstream(self) -> 'List[Job]': + """Lazy load downstream from database.""" + nodes = list() + for node in self._downstream: + if isinstance(node, Job): + nodes.append(node) + elif isinstance(node, dict): + nodes.append(Job.get(**node)) + else: + warnings.warn(f'Unable to parse downstream {node}') + self._downstream = nodes + return self._downstream + + def save(self, exist_ok=True): + config = self.dict() + + if len(config['upstream']) == 1: + # Check if downstream lineage exists + upstream_config = config['upstream'][0] + lineage_exist_status_list = exist_many_downstream_lineage( + upstream_config, config['downstream'], + ) + + if any(lineage_exist_status_list): + if not exist_ok: + exist_downstreams = [ + downstream_config for downstream_config, exist in zip( + config['downstream'], lineage_exist_status_list + ) if exist + ] + raise ValueError(f"Lineage exists: {upstream_config} -> {exist_downstreams}") + else: + # Remove the existed lineage + config['downstream'] = [ + node for node, exist in zip(config['downstream'], lineage_exist_status_list) if not exist + ] + else: + # Check if upstream lineage exists + 
downstream_config = config['downstream'][0] + lineage_exist_status_list = exist_many_upstream_lineage( + config['upstream'], downstream_config, + ) + + if any(lineage_exist_status_list): + if not exist_ok: + exist_upstreams = [ + upstream_config for upstream_config, exist in zip( + config['upstream'], lineage_exist_status_list + ) if exist + ] + raise ValueError(f"Lineage exists: {exist_upstreams} -> {downstream_config}") + else: + # Remove the existed lineage + config['upstream'] = [ + node for node, exist in zip(config['upstream'], lineage_exist_status_list) if not exist + ] + + # Create dataset lineage + create_many_lineage(config) + + return self + +class LineageGraph: + + def get_vertices(self): + # Retrieves all vertices from the vertices table V. + pass + + def get_edges(self): + # Retrieves all edges from the edge table E. + pass + + def get_vertex(self, vertex_id): + # Retrieves a vertex from the vertices table V by its ID. + pass + + def get_edge(self, edge_id): + # Retrieves an edge from the edge table E by its ID. + pass + + @classmethod + def upstream(cls, job: 'Union[LineageAllowedType, dict]', n: 'int' = 1, type: 'str' = None) -> 'nx.DiGraph': + """Retrieves incoming edges that are connected to a vertex. + """ + from dataci.models import Job + + if isinstance(job, Job): + job_config = job.dict(id_only=True) + else: + job_config = job + job_view = JobView(**job_config) + + g = nx.DiGraph() + g.add_node(job_view) + job_views = {job_view} + # Retrieve upstream lineage up to n levels + for _ in range(n): + # (level n) -> . . + # job configs to query for next iteration, job configs to query and add to graph + job_views, job_views_add = set(), job_views + # Recursively query for upstream lineage until all lineage_configs are the same `type` as the argument + while len(job_views_add) > 0: + lineage_configs = list_many_upstream_lineage([job_view.dict() for job_view in job_views_add]) + for upstream_job_view, upstreams in zip(job_views_add, lineage_configs): + g.add_edges_from((JobView(**upstream), upstream_job_view) for upstream in upstreams) + job_views_add.clear() + # (level n-1) -> . . . x + # \/ \/ + # (level n) . . + # upstreams that are the same `type` as the argument (represented as dot ".") + # will be queried for next level of lineage + # the others (represented as cross "x") will query for next iteration in the loop, because they + # are not considered as a valid node for the current level + for upstream in chain.from_iterable(lineage_configs): + upstream_job_view = JobView(**upstream) + if type is None or upstream_job_view.type == type: + job_views.add(upstream_job_view) + else: + job_views_add.add(upstream_job_view) + return g + + + @classmethod + def downstream(cls, job: 'Union[Job, dict]', n: 'int' = 1, type: 'str' = None) -> 'nx.DiGraph': + """Retrieves outgoing edges that are connected to a vertex. + """ + from dataci.models import Job + + if isinstance(job, Job): + job_config = job.dict(id_only=True) + else: + job_config = job + job_view = JobView(**job_config) + + g = nx.DiGraph() + g.add_node(job_view) + job_views = {job_view} + # Retrieve downstream lineage up to n levels + for _ in range(n): + # (level n) -> . . 
+ # job configs to query for next iteration, job configs to query and add to graph + job_views, job_views_add = set(), job_views + # Recursively query for downstream lineage until all lineage_configs are the same `type` as the argument + while len(job_views_add) > 0: + lineage_configs = list_many_downstream_lineage([job_view.dict() for job_view in job_views_add]) + for upstream_job_view, downstreams in zip(job_views_add, lineage_configs): + g.add_edges_from((upstream_job_view, JobView(**downstream)) for downstream in downstreams) + job_views_add.clear() + # (level n) . . + # /\ /\ + # (level n+1) -> . . . x + # downstreams that are the same `type` as the argument (represented as dot ".") + # will be queried for next level of lineage + # the others (represented as cross "x") will query for next iteration in the loop, because they + # are not considered as a valid node for the current level + for downstream in chain.from_iterable(lineage_configs): + downstream_job_view = JobView(**downstream) + if type is None or downstream_job_view.type == type: + job_views.add(downstream_job_view) + else: + job_views_add.add(downstream_job_view) + return g diff --git a/dataci/models/run.py b/dataci/models/run.py new file mode 100644 index 0000000..5460740 --- /dev/null +++ b/dataci/models/run.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Mar 09, 2023 + +Run for pipeline. +""" +import re +import warnings +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +import networkx as nx + +from dataci.config import TIMEZONE +from dataci.db.run import ( + exist_run, + create_one_run, + get_next_run_version, + get_latest_run_version, + get_one_run, + list_run_by_job, + update_one_run +) +from dataci.models import Job +from dataci.models.lineage import LineageGraph + +if TYPE_CHECKING: + from typing import Optional, Union + + from dataci.models import Workflow, Stage + + +class Run(Job): + # run id (uuid) + NAME_PATTERN = re.compile(r'^[a-f0-9]{8}-?[a-f0-9]{4}-?[a-f0-9]{4}-?[a-f0-9]{4}-?[a-f0-9]{12}$', flags=re.IGNORECASE) + VERSION_PATTERN = re.compile(r'^\d+|latest$', flags=re.IGNORECASE) + GET_DATA_MODEL_IDENTIFIER_PATTERN = re.compile( + r'^(?:([a-z]\w*)\.)?([a-f0-9]{8}-?[a-f0-9]{4}-?[a-f0-9]{4}-?[a-f0-9]{4}-?[a-f0-9]{12})(\d+|latest$)?$', flags=re.IGNORECASE + ) + type_name = 'run' + + def __init__( + self, + name: str, + status: str, + job: 'Union[Workflow, Stage, dict]', + create_time: 'Optional[datetime]' = None, + update_time: 'Optional[datetime]' = None, + **kwargs + ): + # TODO: get workspace from job + super().__init__(name, **kwargs) + self.status: str = status + self._job = job + self.version = None + self.create_time = create_time + self.update_time = update_time + + @property + def try_num(self): + return self.version + + @property + def job(self) -> 'Job': + """Lazy load job (workflow or stage) from database.""" + from dataci.decorators.base import DecoratedOperatorStageMixin + if not isinstance(self._job, (Job, DecoratedOperatorStageMixin)): + self._job = Job.get(**self._job) + return self._job + + def dict(self, id_only=False): + if id_only: + return { + 'workspace': self.workspace.name, + 'name': self.name, + 'version': self.version, + 'type': self.type_name, + } + return { + 'workspace': self.workspace.name, + 'type': self.type_name, + 'name': self.name, + 'version': self.version, + 'status': self.status, + 'job': self.job.dict(id_only=True) if self.job else None, + 'create_time': 
int(self.create_time.replace(tzinfo=timezone.utc).timestamp()) if self.create_time else None, + 'update_time': int(self.update_time.replace(tzinfo=timezone.utc).timestamp()) if self.update_time else None, + } + + @classmethod + def from_dict(cls, config): + self = cls(**config) + return self.reload(config) + + def reload(self, config=None): + if config is None: + config = get_one_run(self.name, self.version) + self.version = config['version'] + self.status = config['status'] + self.create_time = datetime.fromtimestamp(config['create_time'], tz=TIMEZONE) + self.update_time = datetime.fromtimestamp(config['update_time'], tz=TIMEZONE) + return self + + def save(self, version=None): + # Get next run try number + version = self.version or get_next_run_version(self.name) + # Check if run exists + if exist_run(self.name, version): + # reload + config = get_one_run(self.name, version) + return self.reload(config) + + config = self.dict() + config['version'] = version + config['update_time'] = config['create_time'] + create_one_run(config) + return self.reload(config) + + def publish(self): + warnings.warn('Run.publish(...) is not implemented. Use Run.save() instead.', DeprecationWarning) + return self + + def update(self): + # Get latest run try number + version = self.version or get_latest_run_version(self.name) + # Check if run exists + run_prev = get_one_run(self.name, version) + if run_prev is None: + raise ValueError(f'Run {self.name}@{version} not found.') + # Update run by merging with previous run + config = self.dict() + config['version'] = version + config['create_time'] = run_prev['create_time'] + # Overwrite with previous field values if not set + for k, v in run_prev.items(): + if k not in config: + config[k] = v + + update_one_run(config) + return self.reload(config) + + @classmethod + def get(cls, name, version=None, workspace=None, not_found_ok=False, **kwargs): + """Get run by name and version.""" + workspace, name, version = cls.parse_data_model_get_identifier(name, version) + # If version not set, get the latest version + version = version or 'latest' + config = get_one_run(name, version) + if config is None: + if not_found_ok: + return + raise ValueError(f'Run {name}@{version} not found.') + + return cls.from_dict(config) + + @classmethod + def find_by_job(cls, workspace, name, version, type): + """Find run by job id.""" + configs = list_run_by_job(workspace=workspace, name=name, version=version, type=type) + + return [cls.from_dict(config) for config in configs] + + def upstream(self, n=1, type=None): + """Get upstream lineage.""" + g = LineageGraph.upstream(self, n, type) + return g + + def downstream(self, n=1, type=None): + """Get downstream lineage.""" + return LineageGraph.downstream(self, n, type) diff --git a/dataci/models/run/list.py b/dataci/models/run/list.py deleted file mode 100644 index f1d733e..0000000 --- a/dataci/models/run/list.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Author: Li Yuanming -Email: yuanmingleee@gmail.com -Date: Mar 16, 2023 -""" -from collections import defaultdict -from typing import TYPE_CHECKING - -from dataci.pipeline.list import LIST_PIPELINE_IDENTIFIER_PATTERN -from dataci.run import Run - -from dataci.db.run import get_many_runs - -if TYPE_CHECKING: - from typing import Optional - from dataci.repo import Repo - - -def list_run(pipeline_identifier=None, tree_view=True, repo: 'Optional[Repo]' = None): - pipeline_identifier = pipeline_identifier or '*' - matched = 
LIST_PIPELINE_IDENTIFIER_PATTERN.match(pipeline_identifier) - if not matched: - raise ValueError(f'Invalid pipeline identifier {pipeline_identifier}') - pipeline_name, pipeline_version = matched.groups() - pipeline_version = (pipeline_version or '').lower() + '*' - - # Check matched runs - run_dict_list = get_many_runs(pipeline_name, pipeline_version) - run_list = list() - for run_dict in run_dict_list: - run_dict['repo'] = repo - run_list.append(Run.from_dict(run_dict)) - - if tree_view: - run_dict = defaultdict(lambda: defaultdict(list)) - for run in run_list: - run_dict[run.pipeline.name][run.pipeline.version].append(run) - return run_dict - - return run_list diff --git a/dataci/models/run/run.py b/dataci/models/run/run.py deleted file mode 100644 index 0fea86f..0000000 --- a/dataci/models/run/run.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Author: Li Yuanming -Email: yuanmingleee@gmail.com -Date: Mar 09, 2023 - -Run for pipeline. -""" -import os -from copy import deepcopy -from shutil import rmtree, copy2 -from typing import TYPE_CHECKING - -from dataci.utils import symlink_force - -if TYPE_CHECKING: - from dataci.pipeline.pipeline import Pipeline - - -class Run(object): - from .save import save # type: ignore[misc] - - def __init__(self, pipeline: 'Pipeline', run_num: int, **kwargs): - self.pipeline = pipeline - self.run_num = run_num - - @property - def workdir(self): - return self.pipeline.workdir / 'runs' / str(self.run_num) - - def prepare(self): - from dataci.dataset import Dataset - - # Clean all for the run workdir - if self.workdir.exists(): - rmtree(self.workdir) - # Create workdir folder - self.workdir.mkdir(parents=True) - # Link code to work directory - (self.workdir / self.pipeline.CODE_DIR).symlink_to( - self.pipeline.workdir / self.pipeline.CODE_DIR, target_is_directory=True - ) - # TODO: better way to prepare input feat - # Create feat dir and link feat into the feat dir - (self.workdir / self.pipeline.FEAT_DIR).mkdir(parents=True) - for stage in self.pipeline.stages: - for dependency in stage.dependency: - if isinstance(dependency, Dataset): - # Link global dataset files path to local - local_file_path = os.path.join( - self.workdir / self.pipeline.FEAT_DIR, dependency.name + dependency.dataset_files.suffix) - symlink_force(dependency.dataset_files, local_file_path) - dependency = local_file_path - - # Copy pipeline definition file to work directory - copy2(self.pipeline.workdir / 'dvc.yaml', self.workdir / 'dvc.yaml', ) - - @property - def feat(self): - outputs = deepcopy(self.pipeline.outputs) - outputs_dict = dict() - for output in outputs: - output.rebase(self.workdir) - outputs_dict[output.name] = output - return outputs_dict - - def __cmp__(self, other): - if not isinstance(other, Run): - raise ValueError(f'Compare between type {type(Run)} and {type(other)} is invalid.') - if self.pipeline != other.pipeline: - raise ValueError( - f'Compare between two different pipeline {self.pipeline} and {other.pipeline} is invalid.' 
- ) - return self.run_num.__cmp__(other.run_num) - - def __str__(self): - return str(self.pipeline) + f'.run{self.run_num}' - - def dict(self): - return {'run_num': self.run_num, 'pipeline': self.pipeline.dict()} - - @classmethod - def from_dict(cls, config): - from dataci.pipeline.pipeline import Pipeline - - config['pipeline']['repo'] = config.get('repo', None) - config['pipeline'] = Pipeline.from_dict(config['pipeline']) - config['pipeline'].restore() - return cls(**config) diff --git a/dataci/models/run/save.py b/dataci/models/run/save.py deleted file mode 100644 index ca06b07..0000000 --- a/dataci/models/run/save.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Author: Li Yuanming -Email: yuanmingleee@gmail.com -Date: Mar 15, 2023 -""" -import logging -import os -from pathlib import Path -from typing import TYPE_CHECKING - -import yaml - -from dataci.db.run import create_one_run -from dataci.utils import cwd - -if TYPE_CHECKING: - from .run import Run - -logger = logging.getLogger(__name__) - - -def save(run: 'Run' = ...): - with cwd(run.workdir): - ##################################################################### - # Step 1: Recover pipeline feat cached file (.dvc) from .lock - # TODO: The reason due to https://github.com/iterative/dvc/issues/4428 - ##################################################################### - if os.path.exists('dvc.lock'): - with open('dvc.lock', 'r') as f: - run_cache_lock = yaml.safe_load(f) - for k, v in run_cache_lock['stages'].items(): - for out in v['outs']: - logger.info(f'Recover dvc file {out["path"]}.dvc') - with open(out['path'] + '.dvc', 'w') as f: - yaml.safe_dump({ - 'outs': [ - { - 'md5': out['md5'], - 'path': os.path.basename(out['path']), - 'size': out['size'] - } - ] - }, f) - ##################################################################### - # Step 2: Publish pipeline output feat - ##################################################################### - for output in run.pipeline.outputs: - output.publish() - - ##################################################################### - # Step 3: Publish run object to DB - ##################################################################### - create_one_run(run.dict()) - - ##################################################################### - # Step 4: Remove feat cached file (.dvc) - ##################################################################### - for dvc_file in Path(run.pipeline.FEAT_DIR).glob('**/*.dvc'): - dvc_file.unlink() diff --git a/dataci/models/stage.py b/dataci/models/stage.py index bd9bd1e..21289d1 100644 --- a/dataci/models/stage.py +++ b/dataci/models/stage.py @@ -11,9 +11,10 @@ import shutil from collections import defaultdict from datetime import datetime -from pathlib import Path from typing import TYPE_CHECKING +import networkx as nx + from dataci.db.stage import ( create_one_stage, exist_stage, @@ -22,7 +23,7 @@ get_next_stage_version_tag, get_many_stages, create_one_stage_tag ) -from .base import BaseModel +from .base import Job, JobView from .script import Script from ..utils import hash_binary, cwd @@ -30,7 +31,7 @@ from typing import Optional -class Stage(BaseModel): +class Stage(Job): """Stage mixin class. 
Attributes: @@ -74,11 +75,13 @@ def dict(self, id_only=False): if id_only: return { 'workspace': self.workspace.name, + 'type': self.type_name, 'name': self.name, 'version': self.version, } return { 'name': self.name, + 'type': self.type_name, 'workspace': self.workspace.name, 'version': self.version, 'version_tag': self.version_tag, @@ -183,9 +186,11 @@ def publish(self): return self.reload(config) @classmethod - def get_config(cls, name, version=None): + def get_config(cls, name, version=None, workspace=None): """Get the stage config from the workspace.""" - workspace, name, version_or_tag = cls.parse_data_model_get_identifier(name, version) + workspace_, name, version_or_tag = cls.parse_data_model_get_identifier(name, version) + # Override workspace if provided + workspace = workspace or workspace_ if version_or_tag == 'latest' or version_or_tag.startswith('v'): config = get_one_stage_by_tag(workspace, name, version_or_tag) else: @@ -194,14 +199,38 @@ def get_config(cls, name, version=None): return config @classmethod - def get(cls, name, version=None): + def get(cls, name, version=None, workspace=None, not_found_ok=False, **kwargs): """Get the stage from the workspace.""" - config = cls.get_config(name, version) + config = cls.get_config(name, version, workspace) if config is None: + if not not_found_ok: + raise ValueError(f'Stage {name}@{version} not found') return return cls.from_dict(config) + @classmethod + def get_by_workflow(cls, stage_name, workflow_name, workflow_version=None): + """Get the stage from the workspace.""" + from dataci.models.workflow import Workflow + + workflow_config = Workflow.get_config(workflow_name, workflow_version) + if workflow_config is None: + raise ValueError(f'Workflow {workflow_name}@{workflow_version} not found') + # Find stage version + for _, v in workflow_config['dag']['node'].items(): + if v['name'] == stage_name: + stage_version = v['version'] + break + else: + raise ValueError(f'Stage {stage_name} not found in workflow {workflow_name}@{workflow_version}') + + if '.' not in stage_name: + stage_workspace = workflow_config['workspace'] + stage_name = stage_workspace + '.' + stage_name + + return cls.get(stage_name, stage_version) + @classmethod def find(cls, stage_identifier, tree_view=False, all=False): """Find the stage from the workspace.""" @@ -215,3 +244,42 @@ def find(cls, stage_identifier, tree_view=False, all=False): return stage_dict return stages + + def upstream(self, n=1, type=None): + """Get the downstream stages of the stage. + TODO: type is miss-aligned with stage::upstream, only 'run' and 'dataset' are supported. + """ + from dataci.models.run import Run + + runs = Run.find_by_job(workspace=self.workspace.name, name=self.name, version=self.version, type=self.type_name) + graphs = list() + node_mapping = dict() + for run in runs: + g = run.upstream(n=n, type=type) + for node in g.nodes: + if node.type == Run.type_name: + node_mapping[node] = JobView(**node.get()._job) + nx.relabel_nodes(g, node_mapping, copy=False) + graphs.append(g) + # Merge all graphs + upstream_graph = nx.compose_all(graphs) + return upstream_graph + + def downstream(self, n=1, type=None): + """Get the downstream stages of the stage. 
+ """ + from dataci.models.run import Run + + runs = Run.find_by_job(workspace=self.workspace.name, name=self.name, version=self.version, type=self.type_name) + graphs = list() + node_mapping = dict() + for run in runs: + g = run.downstream(n=n, type=type) + for node in g.nodes: + if node.type == Run.type_name: + node_mapping[node] = JobView(**node.get()._job) + nx.relabel_nodes(g, node_mapping, copy=False) + graphs.append(g) + # Merge all graphs + downstream_graph = nx.compose_all(graphs) + return downstream_graph diff --git a/dataci/models/workflow.py b/dataci/models/workflow.py index 395dcba..f0562a6 100644 --- a/dataci/models/workflow.py +++ b/dataci/models/workflow.py @@ -28,7 +28,7 @@ get_next_workflow_version_id, create_one_workflow_tag, get_one_workflow_by_tag, get_one_workflow_by_version, ) -from .base import BaseModel +from .base import Job from .event import Event from .script import Script from .stage import Stage @@ -44,7 +44,7 @@ logger = logging.getLogger(__name__) -class Workflow(BaseModel, ABC): +class Workflow(Job, ABC): name_arg = 'name' type_name = 'workflow' @@ -91,7 +91,12 @@ def stage_script_paths(self): def dict(self, id_only=False): if id_only: - return {'workspace': self.workspace.name, 'name': self.name, 'version': self.version} + return { + 'workspace': self.workspace.name, + 'type': self.type_name, + 'name': self.name, + 'version': self.version + } # export the dag as a dict # 1. convert the dag to a list of edges # 2. convert each node from Stage to an id @@ -109,6 +114,7 @@ def dict(self, id_only=False): return { 'workspace': self.workspace.name, + 'type': self.type_name, 'name': self.name, 'version': self.version, 'dag': { @@ -339,9 +345,11 @@ def publish(self): return self.reload(config) @classmethod - def get(cls, name: str, version: str = None): - """Get a models from the workspace.""" - workspace, name, version = cls.parse_data_model_get_identifier(name, version) + def get_config(cls, name: str, version: str = None, workspace: str = None): + """Get workflow config only""" + workspace_, name, version = cls.parse_data_model_get_identifier(name, version) + # Override the workspace if specified + workspace = workspace or workspace_ if version is None or version == 'latest' or version.startswith('v'): # Get by tag @@ -351,6 +359,12 @@ def get(cls, name: str, version: str = None): if version.lower() == 'none': version = None config = get_one_workflow_by_version(workspace, name, version) + return config + + @classmethod + def get(cls, name: str, version: str = None, workspace: str = None, not_found_ok=False, **kwargs): + """Get a models from the workspace.""" + config = cls.get_config(name=name, version=version, workspace=workspace) if config is None: return diff --git a/dataci/plugins/decorators/airflow.py b/dataci/plugins/decorators/airflow.py index 9cd320a..8a2b6b6 100644 --- a/dataci/plugins/decorators/airflow.py +++ b/dataci/plugins/decorators/airflow.py @@ -60,7 +60,8 @@ def __call__(self, *args, **kwargs): self._stage.input_table[key] = ... 
elif isinstance(arg, _Dataset): # arg is a DataCI dataset self._stage.input_table[key] = { - 'name': arg.identifier, 'file_reader': arg.file_reader.NAME, 'file_writer': arg.file_writer.NAME + 'name': arg.identifier, 'file_reader': arg.file_reader.NAME, 'file_writer': arg.file_writer.NAME, + 'type': arg.type_name, } # Rewrite the argument with the dataset identifier bound.arguments[key] = arg.identifier diff --git a/dataci/plugins/orchestrator/airflow.py b/dataci/plugins/orchestrator/airflow.py index d3331e4..04df74d 100644 --- a/dataci/plugins/orchestrator/airflow.py +++ b/dataci/plugins/orchestrator/airflow.py @@ -227,6 +227,13 @@ def execute_callable(self) -> 'Any': bound.arguments[arg_name] = dataset.read() # For logging self.log.info(f'Input table {arg_name}: {dataset.identifier}') + # Load back to the input table + self.input_table[arg_name] = { + 'name': dataset.identifier, + 'type': dataset.type_name, + 'file_reader': dataset.file_reader.NAME, + 'file_writer': dataset.file_writer.NAME + } self.op_args, self.op_kwargs = bound.args, bound.kwargs # Run the stage by backend @@ -237,16 +244,18 @@ def execute_callable(self) -> 'Any': if self.multiple_outputs: dataset.dataset_files = ret[key] dataset.save() - ret[key] = { + self.output_table[key] = ret[key] = { 'name': dataset.identifier, + 'type': dataset.type_name, 'file_reader': dataset.file_reader.NAME, 'file_writer': dataset.file_writer.NAME } else: dataset.dataset_files = ret dataset.save() - ret = { + self.output_table[key] = ret = { 'name': dataset.identifier, + 'type': dataset.type_name, 'file_reader': dataset.file_reader.NAME, 'file_writer': dataset.file_writer.NAME } diff --git a/dataci/utils.py b/dataci/utils.py index 9dd3d8c..3ef5640 100644 --- a/dataci/utils.py +++ b/dataci/utils.py @@ -78,6 +78,10 @@ def hash_file(filepaths: 'Union[str, os.PathLike, List[Union[os.PathLike, str]]] return sha_hash.hexdigest() +def dict_to_frozenset(d): + return frozenset((k, d[k]) for k in sorted(d.keys())) + + def hash_binary(b: bytes): """ Compute the hash of a binary. 
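Taken together, the model-layer changes above give Dataset, Stage, Workflow and Run a common upstream()/downstream() interface backed by the lineage table, returning a networkx DiGraph whose nodes are JobView records. Below is a minimal sketch of the intended read path, assuming a dataset named example.train_data has already been published and has recorded lineage; the identifier is hypothetical, while the signatures match the methods added above.

from dataci.models import Dataset

# Load a published dataset and walk its recorded lineage.
dataset = Dataset.get('example.train_data', version='latest')

# upstream()/downstream() query the lineage table up to n hops and return a
# networkx DiGraph of JobView nodes (frozen type/workspace/name/version records).
graph = dataset.upstream(n=2, type='dataset')
for node in graph.nodes:
    # Each node is a JobView; node.get() resolves it back to the full model object
    # via the Job.__type_name_mapper__ registry populated in dataci/models/__init__.py.
    print(node.type, node.workspace, node.name, node.version)

Edges themselves are written with Lineage(upstream=..., downstream=...).save(exist_ok=True) from dataci/models/lineage.py above, where one side may be a single job and the other a list of jobs or id-only dicts.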
diff --git a/dataci/models/run/__init__.py b/metadata/__init__.py similarity index 63% rename from dataci/models/run/__init__.py rename to metadata/__init__.py index cd61e35..ca67a66 100644 --- a/dataci/models/run/__init__.py +++ b/metadata/__init__.py @@ -3,8 +3,5 @@ """ Author: Li Yuanming Email: yuanmingleee@gmail.com -Date: Mar 15, 2023 +Date: Nov 20, 2023 """ -from .run import Run - -__all__ = ['Run'] diff --git a/metadata/lineage_analysis.py b/metadata/lineage_analysis.py new file mode 100644 index 0000000..ba04601 --- /dev/null +++ b/metadata/lineage_analysis.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Nov 21, 2023 +""" +import json +from copy import deepcopy + +from metadata.models import RunEvent + +JSON_PATH = 'bash_example_metadata.json' +# JSON_PATH = 'lineagetest_postgres.json' + +if __name__ == '__main__': + import builtins + import rich + + builtins.print = rich.print + +with open(JSON_PATH) as f: + lines = f.readlines() + + + +def merge_dicts(a: dict, b: dict): + """Merge dictionaries b into a.""" + result = deepcopy(a) + for k, v in b.items(): + if isinstance(v, dict): + result[k] = merge_dicts(result.get(k, dict()), v) + else: + result[k] = v + return result + + +events = [] +for line in lines: + events.append(json.loads(line)) + +runs = dict() +for event in events: + run_id = event['run']['runId'] + if run_id not in runs: + runs[run_id] = event + else: + # Merge the new event into the existing one + existing_event = runs[run_id] + if event['eventTime'] >= existing_event['eventTime']: + runs[run_id] = merge_dicts(existing_event, event) + else: + runs[run_id] = merge_dicts(event, existing_event) + + +# r = runs['db33cca4-8d48-3ade-9111-384c86657c25'] +r = runs['1ff96850-b38d-3247-b2cc-7ebe66e7d0d7'] +# r2 = runs['3b80ac13-0b1b-38b5-a70d-97e15e33189f'] + +print(json.dumps(r)) diff --git a/metadata/models.py b/metadata/models.py new file mode 100644 index 0000000..68b14d0 --- /dev/null +++ b/metadata/models.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Nov 20, 2023 +""" +import re +from datetime import datetime +from enum import Enum +from typing import List, Optional, Dict, Any +from uuid import UUID + +from packaging import version +from pydantic import BaseModel, Field, AnyUrl, Extra, root_validator, validator + +SCHEMA_VERSION = "2-0-2" +SCHEMA_PATH_PATTERN = re.compile(r'^/spec/(\d+)-(\d+)-(\d+)/OpenLineage.json$') + + +def parse_schema_version(schema_url: AnyUrl) -> str: + match = SCHEMA_PATH_PATTERN.match(schema_url.path) + if match: + major, minor, micro = match.groups() + return f"{major}-{minor}-{micro}" + raise ValueError(f"Invalid schema url: {schema_url}") + + +class BaseEvent(BaseModel): + eventTime: datetime = Field(default_factory=datetime.utcnow, description="the time the event occurred at") + producer: AnyUrl = Field( + description="URI identifying the producer of this metadata. 
For example this could be a git url with a given tag or sha", + example="https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + ) + schemaURL: AnyUrl = Field( + description="The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this RunEvent", + example="https://openlineage.io/spec/0-0-1/OpenLineage.json", + ) + + @validator("schemaURL") + def check_schema_version(cls, v): + schema_version = parse_schema_version(v) + if version.parse(schema_version.replace('-', '.')) <= \ + version.parse(SCHEMA_VERSION.replace('-', '.')): + return v + raise ValueError(f"Invalid schema version: {v}") + +class BaseFacet(BaseModel, extra=Extra.allow): + """all fields of the base facet are prefixed with _ to avoid name conflicts in facets""" + + _producer: AnyUrl = Field( + description="URI identifying the producer of this metadata. For example this could be a git url with a given tag or sha", + example="https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" + ) + _schemaURL: AnyUrl = Field( + description="The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this facet", + example="https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet" + ) + +class RunFacet(BaseFacet): + """A Run Facet""" + +class Run(BaseModel): + runId: UUID = Field(description="The globally unique ID of the run associated with the job.") + facets: Optional[Dict[Any, RunFacet]] = Field( + default_factory=dict, + description="The run facets.", + ) + + +class JobFacet(BaseFacet): + """A Job Facet""" + _deleted: bool = Field( + description="set to true to delete a facet", + ) + + +class DatasetFacet(BaseFacet): + """A Dataset Facet""" + _deleted: bool = Field( + description="set to true to delete a facet", + ) + + +class InputDatasetFacet(DatasetFacet): + """An Input Dataset Facet""" + + +class OutputDatasetFacet(DatasetFacet): + """An Output Dataset Facet""" + + +class Job(BaseModel): + namespace: str = Field(description="The namespace containing that job", example="my-scheduler-namespace") + name: str = Field(description="The unique name for that job within that namespace", example="myjob.mytask") + facets: Optional[Dict[Any, JobFacet]] = Field( + default_factory=dict, + description="The job facets.", + ) + +class Dataset(BaseModel): + namespace: str = Field(description="The namespace containing that dataset", example="my-datasource-namespace") + name: str = Field(description="The unique name for that dataset within that namespace", example="instance.schema.table") + facets: Optional[Dict[Any, Any]] = Field( + default_factory=dict, + description="The facets for this dataset", + ) + + +class StaticDataset(Dataset): + """A Dataset sent within static metadata events""" + + +class InputDataset(Dataset): + """An input dataset""" + inputFacets: Optional[Dict[Any, InputDatasetFacet]] = Field( + default_factory=dict, + description="The input facets for this dataset.", + ) + + +class OutputDataset(Dataset): + """An output dataset""" + outputFacets: Optional[Dict[Any, OutputDatasetFacet]] = Field( + default_factory=dict, + description="The output facets for this dataset", + ) + + +class RunState(Enum): + START = "START" + RUNNING = "RUNNING" + COMPLETE = "COMPLETE" + ABORT = "ABORT" + FAIL = "FAIL" + OTHER = "OTHER" + + +class RunEvent(BaseEvent): + eventType: RunState = Field( + description="the current transition of the run state. 
It is required to issue 1 START event and 1 of [ COMPLETE, ABORT, FAIL ] event per run. Additional events with OTHER eventType can be added to the same run. For example to send additional metadata after the run is complete", + example="START|RUNNING|COMPLETE|ABORT|FAIL|OTHER", + # enum=["START", "RUNNING", "COMPLETE", "ABORT", "FAIL", "OTHER"], + ) + run: Run + job: Job + inputs: Optional[List[InputDataset]] = Field(default_factory=list, description="The set of **input** datasets.") + outputs: Optional[List[OutputDataset]] = Field(default_factory=list, description="The set of **output** datasets.") + + +class DatasetEvent(BaseEvent): + dataset: StaticDataset + + @root_validator + def check_not_required(cls, values): + if "job" in values or "run" in values: + raise ValueError("DatasetEvent should not contain `job` or `run`") + return values + + class Config: + schema_extra = {"not": {"required": ["job", "run"]}} + + +class JobEvent(BaseEvent): + job: Job + inputs: Optional[List[InputDataset]] = Field(default_factory=list, description="The set of **input** datasets.") + outputs: Optional[List[OutputDataset]] = Field(default_factory=list, description="The set of **output** datasets.") + + @root_validator + def check_not_required(cls, values): + if "run" in values: + raise ValueError("JobEvent should not contain `run`") + return values + + class Config: + schema_extra = {"not": {"required": ["run"]}} + + +if __name__ == '__main__': + import rich + import builtins + + builtins.print = rich.print + + print(RunEvent.schema_json(indent=2)) diff --git a/metadata/server.py b/metadata/server.py new file mode 100644 index 0000000..3fa9b92 --- /dev/null +++ b/metadata/server.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Nov 20, 2023 +""" +from typing import Union + +from fastapi import APIRouter, FastAPI + +from dataci.models import Run as RunModel, Lineage, Stage +from metadata.models import RunEvent, DatasetEvent, JobEvent, RunState + +app = FastAPI() + +api_router = APIRouter(prefix='/api/v1') + + +# Record lineage information as schema defined as OpenLineage (2-0-2) +# https://openlineage.io/apidocs/openapi/ +@api_router.post('/lineage', summary='Send an event related to the state of a run') +def post_lineage(event: Union[RunEvent, DatasetEvent, JobEvent]): + """Updates a run state for a job. + """ + # Skip if event is a test event (event job name cannot parse to workspace, job name and version) + name_parts = event.job.name.split('--') + if len(name_parts) != 3: + return {'status': 'skip'} + + # Parse job type + if '.' 
in event.job.name: + # Get job + job_workspace, job_name, job_version = name_parts + job_version, stage_name = job_version.split('.') + job = Stage.get_by_workflow(stage_name, f'{job_workspace}.{job_name}@{job_version}') + else: + job_workspace, job_name, job_version = name_parts + job = { + 'workspace': job_workspace, + 'type': 'workflow', + 'name': job_name, + 'version': job_version, + } + + # If event type is START, create a new run + if event.eventType == RunState.START: + run = RunModel( + name=str(event.run.runId), + status=event.eventType.value, + job=job, + create_time=event.eventTime, + ) + run.save() + else: + run = RunModel( + name=str(event.run.runId), + status=event.eventType.value, + job=job, + update_time=event.eventTime, + ) + run.update() + + # get parent run if exists + if 'parent' in event.run.facets: + parent_run_config = { + 'name': str(event.run.facets['parent'].run['runId']), + 'type': 'run', + } + else: + parent_run_config = None + + # Get input and output dataset + # Inputs: event.run.facets['unknownSourceAttribute'].unknownItems[0]['properties']['input_table'] + # Outputs: event.run.facets['unknownSourceAttribute'].unknownItems[0]['properties']['output_table'] + unknown_src_attr = event.run.facets.get('unknownSourceAttribute', object()) + ops_props = (getattr(unknown_src_attr, 'unknownItems', None) or [dict()])[0].get('properties', dict()) + # Input tables and parent run are upstream + inputs = list(ops_props.get('input_table', dict()).values()) + if parent_run_config is not None: + inputs.append(parent_run_config) + # Output tables are downstream + outputs = list(ops_props.get('output_table', dict()).values()) + + if len(inputs) > 0: + upstream_lineage = Lineage(upstream=inputs, downstream=run) + upstream_lineage.save() + if len(outputs) > 0: + downstream_lineage = Lineage(upstream=run, downstream=outputs) + downstream_lineage.save() + + return {'status': 'success'} + + +app.include_router(api_router) + +if __name__ == '__main__': + import uvicorn + + uvicorn.run(app, host='localhost', port=8000) diff --git a/requirements.txt b/requirements.txt index 88da1f9..c8eb696 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,8 @@ networkx>=3.1 uvicorn>=0.15.0 click==8.1.3 tqdm>=4.62.3 +pydantic>=1.8.2 pygraphviz>=1.11 +pyan3>=1.2.0 rich>=13.3.5 GitPython>=3.1.24 \ No newline at end of file diff --git a/tests/lineage/python_ops_pipeline.py b/tests/lineage/python_ops_pipeline.py new file mode 100644 index 0000000..367f804 --- /dev/null +++ b/tests/lineage/python_ops_pipeline.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Dec 10, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset, stage + + +@stage +def task1(df): + return df + + +@stage +def task2_0(df): + return df + + +@stage +def task2_1(df): + return df + + +@stage +def task3(df1, df2): + import pandas as pd + + return pd.concat([df1, df2]) + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def python_ops_pipeline(): + raw_dataset_train = Dataset.get('test.yelp_review_test@latest') + dataset1 = Dataset(name='test.task1_out', dataset_files=task1(raw_dataset_train)) + dataset2_0 = Dataset(name='test.task2_0_out', dataset_files=task2_0(dataset1)) + dataset2_1 = Dataset(name='test.task2_1_out', dataset_files=task2_1(dataset1)) + dataset3 = Dataset(name='test.task3_out', dataset_files=task3(dataset2_0, dataset2_1)) + + +# Build the pipeline +python_ops_dag = 
python_ops_pipeline() diff --git a/tests/test_lineage.py b/tests/test_lineage.py new file mode 100644 index 0000000..e097628 --- /dev/null +++ b/tests/test_lineage.py @@ -0,0 +1,47 @@ +import unittest +from pathlib import Path + +TEST_DIR = Path(__file__).parent + + +class TestLineage(unittest.TestCase): + def setUp(self): + """Set up test fixtures. + 1. Create a test workspace and set it as the current default workspace + 2. Save and publish a test dataset + 3. Save the test pipeline + """ + from dataci.models import workspace + from dataci.models import Dataset + + workspace.DEFAULT_WORKSPACE = 'test' + self.test_dataset = Dataset('yelp_review_test', dataset_files=[ + {'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0, + 'text': "I called Anytime on Friday afternoon about the number pad lock on my front door. After several questions, the gentleman asked me if I had changed the battery.", }, + {'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0, + 'text': "Friend took me for lunch. Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!", }, + {'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0, + 'text': "I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.", }, + ]) + self.assertEqual( + self.test_dataset.workspace.name, + 'test', + "Failed to set the default workspace to `test`." + ) + self.test_dataset.publish(version_tag='2020-10') + + from dataci.models import Workflow + + self.workflow = Workflow.from_path( + TEST_DIR / 'lineage', + entry_path='python_ops_pipeline.py' + ) + self.workflow.publish() + self.workflow.run() + + def test_lineage(self): + self.assertEqual(True, True) # add assertion here + + +if __name__ == '__main__': + unittest.main()
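For reference, a rough sketch of exercising the new /api/v1/lineage endpoint by hand while metadata/server.py is running (for example via its __main__ block). The payload follows the RunEvent model defined above, and the job name uses the workspace--name--version convention that post_lineage parses; the host, port, and field values are assumptions, and requests is not listed in requirements.txt (any HTTP client would do).

from datetime import datetime, timezone

import requests

event = {
    'eventType': 'START',
    'eventTime': datetime.now(timezone.utc).isoformat(),
    'producer': 'https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client',
    'schemaURL': 'https://openlineage.io/spec/2-0-2/OpenLineage.json',
    'run': {'runId': '1ff96850-b38d-3247-b2cc-7ebe66e7d0d7'},
    # Parsed by post_lineage into a workflow job config: workspace 'test',
    # name 'python_ops_pipeline', version 'v1' (hypothetical values).
    'job': {'namespace': 'default', 'name': 'test--python_ops_pipeline--v1'},
    'inputs': [],
    'outputs': [],
}

resp = requests.post('http://localhost:8000/api/v1/lineage', json=event)
print(resp.json())  # expected: {'status': 'success'} once the run is saved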