Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Lineage for data and code #12

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8741986
:sparkles: [metadata] create a lineage server
YuanmingLeee Nov 20, 2023
ba784cb
:sparkles: [lineage] add lineage and run data model
YuanmingLeee Nov 23, 2023
f8f3b3b
:wrench: [model] Add methods for run data model
YuanmingLeee Nov 23, 2023
ef0301d
:wrench: [lineage] Add dao for run model
YuanmingLeee Nov 24, 2023
2b08ab2
:wrench: [model] Add run model update method
YuanmingLeee Nov 26, 2023
fc2c63b
:bug: [run] Fix bug in run model create and update APIs
YuanmingLeee Nov 27, 2023
ef794ba
:sparkles: [model] Add lineage data model
YuanmingLeee Nov 27, 2023
2ea0a45
:bug: [lineage] Fix lineage save API
YuanmingLeee Nov 29, 2023
bb9e032
:wrench: [lineage] Add lineage track for DataCI dataset
YuanmingLeee Nov 29, 2023
0ea3dfd
:bug: [lineage] Fix bug in output data not tracked by lineage
YuanmingLeee Nov 30, 2023
19f84e7
:wrench: [lineage] Modify lineage data object structure
YuanmingLeee Dec 1, 2023
324a2e1
:bug: [lineage] Fix bug in lineage save
YuanmingLeee Dec 4, 2023
3519be5
:wrench: [lineage] Add lineage get API design
YuanmingLeee Dec 5, 2023
9626941
:beer: Add lineage downstream/upstream query for stage
YuanmingLeee Dec 7, 2023
b1d9130
:recycle: [rename] Rename class BaseModel -> Job to be consistent wit…
YuanmingLeee Dec 7, 2023
64f5da0
:wrench: [refactor] Add a sub-class register for Job for better metho…
YuanmingLeee Dec 7, 2023
6c04604
:bug: [lineage] Fix bug in get upstream/downstream lineage
YuanmingLeee Dec 10, 2023
add5ea5
:white_check_mark: [test] Add test for lineage (WIP)
YuanmingLeee Dec 10, 2023
8785cdd
:bug: [lineage] Fix bug for lineage post API
YuanmingLeee Dec 10, 2023
47f5631
:bug: [lineage] Fix bug for lineage upstream/downstream due to common…
YuanmingLeee Dec 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataci/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import configparser
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from threading import Event as ThreadEvent
Expand Down Expand Up @@ -97,6 +98,7 @@ def load_config():
LOG_DIR = None
LOG_LEVEL = None
STORAGE_BACKEND = None
TIMEZONE = datetime.now(timezone.utc).astimezone().tzinfo

# DataCI Trigger and Scheduler server
SERVER_ADDRESS = '0.0.0.0'
Expand Down
58 changes: 56 additions & 2 deletions dataci/db/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,32 @@
# Drop all tables
# The whole script runs inside one `with db_connection:` block.
# Order: lineage and run (both declare foreign keys to job) are dropped first;
# job is dropped last, after workflow and stage, which also reference it.
# IF EXISTS keeps the script usable on a database where some tables are absent.
with db_connection:
db_connection.executescript("""
DROP TABLE IF EXISTS lineage;
DROP TABLE IF EXISTS run;
DROP TABLE IF EXISTS dataset_tag;
DROP TABLE IF EXISTS dataset;
DROP TABLE IF EXISTS workflow_dag_node;
DROP TABLE IF EXISTS stage_tag;
DROP TABLE IF EXISTS stage;
DROP TABLE IF EXISTS workflow_tag;
DROP TABLE IF EXISTS workflow;
DROP TABLE IF EXISTS job;
""")
logger.info('Drop all tables.')

# Create dataset table
with db_connection:
db_connection.executescript("""
-- job: one row per (workspace, name, version, type).
-- `type` is part of the key, so the same (workspace, name, version) can be
-- registered once for each distinct type.
-- The run and lineage tables reference this table through the full
-- four-column key.
CREATE TABLE job
(
workspace TEXT,
name TEXT,
version TEXT,
type TEXT,
PRIMARY KEY (workspace, name, version, type),
-- Repeats the primary-key columns; same PRIMARY KEY + UNIQUE pattern as the
-- workflow and stage tables.
UNIQUE (workspace, name, version, type)
);

CREATE TABLE workflow
(
workspace TEXT,
Expand All @@ -45,7 +58,9 @@
script_filelist TEXT,
script_hash TEXT,
PRIMARY KEY (workspace, name, version),
UNIQUE (workspace, name, version)
UNIQUE (workspace, name, version),
FOREIGN KEY (workspace, name, version)
REFERENCES job (workspace, name, version)
);

CREATE TABLE workflow_tag
Expand Down Expand Up @@ -73,7 +88,9 @@
script_filelist TEXT,
script_hash TEXT,
PRIMARY KEY (workspace, name, version),
UNIQUE (workspace, name, version)
UNIQUE (workspace, name, version),
FOREIGN KEY (workspace, name, version)
REFERENCES job (workspace, name, version)
);

CREATE TABLE stage_tag
Expand Down Expand Up @@ -133,5 +150,42 @@
FOREIGN KEY (workspace, name, version)
REFERENCES dataset (workspace, name, version)
);

-- run: one row per (workspace, name, version) of a run, with its status and
-- the job entry it is attached to (job_workspace, job_name, job_version,
-- job_type).
-- NOTE(review): create_time / update_time are stored as INTEGER; the unit
-- (seconds vs. milliseconds) is not visible here -- confirm against the DAO.
CREATE TABLE run
(
workspace TEXT,
name TEXT,
version INTEGER,
status TEXT,
job_workspace TEXT,
job_name TEXT,
job_version TEXT,
job_type TEXT,
create_time INTEGER,
update_time INTEGER,
PRIMARY KEY (workspace, name, version),
UNIQUE (name, version),
-- Deliberately no foreign key from (workspace, name, version) to job:
-- job's key is (workspace, name, version, type), and SQLite requires the
-- referenced columns to be the parent's PRIMARY KEY or a UNIQUE set.
-- A three-column reference raises "foreign key mismatch" on writes once
-- PRAGMA foreign_keys is ON.
FOREIGN KEY (job_workspace, job_name, job_version, job_type)
REFERENCES job (workspace, name, version, type)
);

-- lineage: one row per directed edge from an upstream job entry to a
-- downstream job entry. Each endpoint is stored as the full job key
-- (workspace, name, version, type).
CREATE TABLE lineage
(
upstream_workspace TEXT,
upstream_name TEXT,
upstream_version TEXT,
upstream_type TEXT,
downstream_workspace TEXT,
downstream_name TEXT,
downstream_version TEXT,
downstream_type TEXT,
-- The whole edge is the primary key, so a given upstream/downstream pair is
-- stored at most once.
PRIMARY KEY (upstream_workspace, upstream_name, upstream_version, upstream_type, downstream_workspace, downstream_name, downstream_version, downstream_type),
-- Both endpoints reference job through its four-column primary key.
FOREIGN KEY (upstream_workspace, upstream_name, upstream_version, upstream_type)
REFERENCES job (workspace, name, version, type),
FOREIGN KEY (downstream_workspace, downstream_name, downstream_version, downstream_type)
REFERENCES job (workspace, name, version, type)
);
""")
logger.info('Create all tables.')
Loading