doc: add docstrings for public methods
pvanliefland committed Dec 28, 2023
1 parent 95fd979 commit 01e7c64
Showing 5 changed files with 53 additions and 30 deletions.
13 changes: 13 additions & 0 deletions openhexa/sdk/pipelines/run.py
@@ -25,6 +25,10 @@ def tmp_path(self):
return Path("~/tmp/")

def add_file_output(self, path: str):
"""Record a run output for a file creation operation.
This output will be visible in the web interface, on the pipeline run page.
"""
stripped_path = path.replace(workspace.files_path, "")
name = stripped_path.strip("/")
if self._connected:
@@ -45,6 +49,10 @@ def add_file_output(self, path: str):
print(f"Sending output with path {stripped_path}")

def add_database_output(self, table_name: str):
"""Record a run output for a database operation.
This output will be visible in the web interface, on the pipeline run page.
"""
if self._connected:
graphql(
"""
@@ -63,18 +71,23 @@ def add_database_output(self, table_name: str):
print(f"Sending output with table_name {table_name}")

def log_debug(self, message: str):
"""Log a message with the DEBUG priority."""
self._log_message("DEBUG", message)

def log_info(self, message: str):
"""Log a message with the INFO priority."""
self._log_message("INFO", message)

def log_warning(self, message: str):
"""Log a message with the WARNING priority."""
self._log_message("WARNING", message)

def log_error(self, message: str):
"""Log a message with the ERROR priority."""
self._log_message("ERROR", message)

def log_critical(self, message: str):
"""Log a message with the CRITICAL priority."""
self._log_message("CRITICAL", message)

def _log_message(
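For context, a rough usage sketch of the methods documented above, as they might be called from inside a pipeline task. The current_run import and the example paths are assumptions based on typical OpenHEXA pipeline code, not part of this commit:

from openhexa.sdk import current_run, workspace

def export_summary():
    current_run.log_info("Export started")  # INFO-priority log line shown on the run page
    output_path = f"{workspace.files_path}/exports/summary.csv"  # hypothetical path
    # ... write the CSV to output_path here ...
    current_run.add_file_output(output_path)           # file output shown on the run page
    current_run.add_database_output("summary_table")   # database output shown on the run page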
63 changes: 33 additions & 30 deletions openhexa/sdk/pipelines/task.py
@@ -12,7 +12,10 @@


class TaskCom:
"""Lightweight data transfer object allowing tasks to communicate."""
"""Lightweight data transfer object allowing tasks to communicate.
TaskCom instances also allow us to build the pipeline dependency graph.
"""

def __init__(self, task):
self.result = task.result
@@ -38,27 +41,11 @@ def __init__(self, function: typing.Callable):
self.active = False
self.pooled = False

def __call__(self, *task_args, **task_kwargs):
self.active = True # uncalled tasks will be skipped
# check that all inputs are tasks
self.task_args = task_args
self.task_kwargs = task_kwargs
return self
def is_ready(self) -> bool:
"""Determine whether the task is ready to be run.
def __repr__(self):
return self.name

def get_node_inputs(self):
inputs = []
for a in self.task_args:
if issubclass(type(a), Task):
inputs.append(a)
for k, a in self.task_kwargs.items():
if issubclass(type(a), Task):
inputs.append(a)
return inputs

def is_ready(self):
This involves checking whether tasks higher up in the dependency graph have been executed.
"""
if not self.active:
return False

@@ -71,24 +58,32 @@ def is_ready(self):

return True if self.end_time is None else False

def get_tasks_ready(self):
def get_ready_tasks(self) -> list[Task]:
"""Find and return all tasks that can be launched at this point in time."""
tasks = []
for a in self.task_args:
if issubclass(type(a), Task):
if a.is_ready():
tasks.append(a)
else:
tasks += a.get_tasks_ready()
tasks += a.get_ready_tasks()
for k, a in self.task_kwargs.items():
if issubclass(type(a), Task):
if a.is_ready():
tasks.append(a)
else:
tasks += a.get_tasks_ready()
tasks += a.get_ready_tasks()

return list(set(tasks))

def run(self):
def run(self) -> TaskCom:
"""Run the task.
Returns
-------
TaskCom
A TaskCom instance which can in turn be passed to other tasks.
"""
if self.end_time:
# already executed, return previous result
return self.result
@@ -118,10 +113,17 @@ def run(self):
# done!
return TaskCom(self)

def stateless_run(self):
self.result = None
self.start_time, self.end_time = None, None
return self.run()
def __call__(self, *task_args, **task_kwargs):
"""Wrap the task with args and kwargs and return it."""
self.active = True # uncalled tasks will be skipped
# check that all inputs are tasks
self.task_args = task_args
self.task_kwargs = task_kwargs

return self

def __repr__(self):
return self.name


class PipelineWithTask:
@@ -135,7 +137,8 @@ def __init__(
self.function = function
self.pipeline = pipeline

def __call__(self, *task_args, **task_kwargs):
def __call__(self, *task_args, **task_kwargs) -> Task:
"""Attach the new task to the decorated pipeline and return it."""
task = Task(self.function)(*task_args, **task_kwargs)
self.pipeline.tasks.append(task)
return task
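To make the dependency-graph behavior above concrete, here is a hypothetical wiring sketch. The @pipeline / .task decorator style follows the usual OpenHEXA pipeline examples, and all names are invented for illustration:

from openhexa.sdk import current_run, pipeline

@pipeline("demo-pipeline", name="Demo pipeline")
def demo_pipeline():
    count = count_rows()   # calling the task marks it active and records its inputs
    report(count)          # passing `count` makes report() depend on count_rows()

@demo_pipeline.task
def count_rows():
    return 42

@demo_pipeline.task
def report(count):
    current_run.log_info(f"Row count: {count}")

At run time, get_ready_tasks() walks these recorded arguments to find tasks whose upstream dependencies have finished, and each completed task hands its result downstream through a TaskCom instance.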
5 changes: 5 additions & 0 deletions openhexa/sdk/workspaces/connection.py
@@ -36,6 +36,11 @@ def __repr__(self):

@property
def url(self):
"""Provide a URL to the PostgreSQL database.
The URL follows the official PostgreSQL specification.
(See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING for more information)
"""
return f"postgresql://{self.username}:{self.password}" f"@{self.host}:{self.port}/{self.database_name}"


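As an illustration of how such a URL is typically consumed, a minimal sketch assuming SQLAlchemy and pandas; the connection identifier is made up and the postgresql_connection accessor is assumed from the workspace API, none of which this commit introduces:

import pandas as pd
from sqlalchemy import create_engine

from openhexa.sdk import workspace

con = workspace.postgresql_connection("my-postgres-db")  # hypothetical connection identifier
engine = create_engine(con.url)  # con.url is the postgresql://user:password@host:port/dbname string
df = pd.read_sql("SELECT 1 AS ok", engine)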
1 change: 1 addition & 0 deletions openhexa/sdk/workspaces/workspace.py
@@ -313,6 +313,7 @@ def __repr__(self):
return CustomConnection(**fields)

def create_dataset(self, identifier: str, name: str, description: str):
"""Create a new dataset."""
raise NotImplementedError("create_dataset is not implemented yet.")

def get_dataset(self, identifier: str) -> Dataset:
1 change: 1 addition & 0 deletions tests/test_dataset.py
@@ -16,6 +16,7 @@ class DatasetTest(TestCase):
)
@patch("openhexa.sdk.datasets.dataset.graphql")
def test_create_dataset_version(self, mock_graphql):
"""Ensure that dataset versions can be created."""
d = Dataset(
id="id",
slug="my-dataset",
