feat(http): HTTPContext now supports requesting individual backends
HTTPContext(backends=["buckets"]) will instantiate and expose only the buckets backend client.

BREAKING CHANGE: by default, the Work object now instantiates only a buckets connection in its HTTP context.
shinybrar committed Jun 18, 2024
1 parent 3f616ab commit 569d502
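
The change is illustrated by the updated tests below. A minimal usage sketch, assuming a reachable buckets backend and the default workspace; the pipeline name is purely illustrative:

from workflow.definitions.work import Work
from workflow.http.context import HTTPContext

# Request a context scoped to a single backend; clients for the other
# backends (results, pipelines, ...) are not instantiated.
http = HTTPContext(backends=["buckets"])

# Deposit work through the scoped context, as the updated tests do.
work = Work(pipeline="example-pipeline", user="tester", site="local")
deposited = work.deposit(http=http)

# The buckets client is exposed directly on the context.
status = http.buckets.status(pipeline="example-pipeline")
print(deposited, status)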
Showing 20 changed files with 199 additions and 295 deletions.
12 changes: 7 additions & 5 deletions docker-compose.yml
@@ -25,12 +25,11 @@ services:
- TZ=Etc/UTC
networks:
- workflow
restart: always

managers:
image: chimefrb/pipelines:latest
container_name: managers
command: ["/bin/bash", "-c", "touch /etc/site.yml && python -m managers.server"]
command: ["/bin/bash", "-c", "python -m managers.server"]
ports:
- "8002:8002"
environment:
@@ -46,12 +45,13 @@
- SANIC_UPDATE_INTERVAL_SECONDS=40
- SANIC_SLEEP_INTERVAL_SECONDS=30
- SANIC_PURGE_TIME_SECONDS=3600
- SANIC_SITE_CONFIG_FILEPATH=/etc/site.yml
- DOCKER_HOST=unix:///var/run/docker.sock # Replace with production address or dind
- SANIC_DEPLOYER_REDEPLOY_WAIT_TIME=30
- SANIC_DEPLOYER_MAX_REPLICAS=256
- SANIC_WORKSPACE_FILEPATH=/etc/workspace.yml
- TZ=Etc/UTC
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./tests/workflow-site.yml:/etc/site.yml
- ./workflow/workspaces/development.yml:/etc/workspace.yml
networks:
- workflow

@@ -106,4 +106,6 @@ services:

networks:
workflow:
name: workflow
driver: bridge
attachable: true
2 changes: 1 addition & 1 deletion tests/test_archive.py
@@ -13,7 +13,7 @@
from workflow.utils import read

workspace_path = (
Path(__file__).parent.parent / "workflow" / "workspaces" / "sample-test.yaml"
Path(__file__).parent.parent / "workflow" / "workspaces" / "development.yml"
)
WORKSPACE = read.workspace(workspace_path)

29 changes: 15 additions & 14 deletions tests/test_buckets.py
@@ -1,16 +1,16 @@
from typing import Any, Dict, List, Union

from workflow.definitions.work import Work
from workflow.http.buckets import Buckets
from workflow.http.context import HTTPContext

buckets = Buckets(baseurl="http://localhost:8004")
pipeline = "test-buckets"
http = HTTPContext(backends=["buckets"])


def test_buckets_lifecycle():
"""Test case where withdrawing from a pipeline with work deposited."""
work = Work(pipeline=pipeline, user="tester", site="local")
response: Union[bool, List[str]] = work.deposit()
response: Union[bool, List[str]] = work.deposit(http=http)
assert isinstance(response, bool)
# Withdrawing from the bucket should return the work
withdrawn: Work = Work.withdraw(pipeline=pipeline)
@@ -19,31 +19,31 @@ def test_buckets_lifecycle():
# Update the work
withdrawn.update()
# Check that the work has been updated
response = buckets.view(
query={"pipeline": pipeline}, projection={"id": 1, "status": 1}
view = http.buckets.view(
query={"pipeline": pipeline}, projection={"id": True, "status": True}
)
assert response[0]["status"] == "success"
assert response[0]["id"] == withdrawn.id
assert view[0]["status"] == "success"
assert view[0]["id"] == withdrawn.id
assert withdrawn.delete() is True


def test_withdraw_from_multiple_buckets():
"""Test case where withdrawing from multiple buckets."""
pipelines = ["test-buckets-1", "test-buckets-2", "test-buckets-3"]
works: Dict[str, Any] = []
works: List[Dict[str, Any]] = []
for pipeline in pipelines:
work = Work(pipeline=pipeline, user="tester", site="local")
works.append(work.payload)
# Deposit works using the buckets API directly
ids: Union[bool, List[str]] = buckets.deposit(works, return_ids=True)
ids: Union[bool, List[str]] = http.buckets.deposit(works, return_ids=True)
assert isinstance(ids, list)
assert len(ids) == 3
# Withdraw all works
for attempt in range(len(pipelines)):
work: Work = Work.withdraw(pipeline=pipelines)
for _ in range(len(pipelines)):
work: Work = Work.withdraw(pipeline=pipelines) # type: ignore
assert work.id in ids
# Delete all works
assert buckets.delete_ids(ids) is True
assert http.buckets.delete_ids(ids) is True


def test_delete_many():
@@ -55,8 +55,9 @@ def test_delete_many():
).deposit()
# Delete all works with the tag "delete-many"
assert (
buckets.delete_many(pipeline=pipeline, tags=["delete-many"], force=True) is True
http.buckets.delete_many(pipeline=pipeline, tags=["delete-many"], force=True)
is True
)
# Check that the works have been deleted
status = buckets.status(pipeline=pipeline)
status = http.buckets.status(pipeline=pipeline)
assert status["total"] == 0
4 changes: 2 additions & 2 deletions tests/test_cli.py
@@ -63,7 +63,7 @@ def test_workflow_run_execution(self):
],
)

def test_workflow_run_execution_with_runspace(self):
def test_workflow_run_with_json_workspace(self):
runner = CliRunner()
directory = Path(workspaces.__file__).parent
devspace = directory / "development.yml"
@@ -78,7 +78,7 @@ def test_workflow_run_execution_with_runspace(self):
"--site=local",
"--lives=1",
"--sleep=1",
f"--runspace={runspace}",
f"--workspace={runspace}",
"some-pipeline-name",
],
)
3 changes: 1 addition & 2 deletions tests/test_validate.py
@@ -2,10 +2,9 @@

import pytest

from workflow.lifecycle.validation import command, function
from workflow.utils.validate import command, function


@pytest.mark.skip
def test_validate_function():
"""Test the validate function function."""
result = function("os.chmod")
8 changes: 5 additions & 3 deletions workflow/cli/pipelines.py
@@ -97,15 +97,17 @@ def count():
def ps(pipeline: str, id: str):
"""List a pipeline configuration in detail."""
http = HTTPContext()
query: str = json.dumps({"id": id})
projection: str = json.dumps({})
query: Dict[str, str] = {"id": id}
projection: Dict[str, str] = {}
console_content = None
column_max_width = 300
column_min_width = 40
try:
payload = http.pipelines.get_pipelines(
name=pipeline, query=query, projection=projection
)[0]
)[
0 # type: ignore
]
except IndexError:
error_text = Text("No Pipelines were found", style="red")
console_content = error_text
4 changes: 2 additions & 2 deletions workflow/cli/results.py
@@ -37,14 +37,14 @@ def results():
@results.command("version", help="Show the version.")
def version():
"""Show the version."""
http = HTTPContext()
http = HTTPContext(backends=["results"])
console.print(http.results.info())


@results.command("count", help="Count of results per pipeline.")
def count():
"""Count pipelines on results backend."""
http = HTTPContext()
http = HTTPContext(backends=["results"])
count_result = http.results.status()
table.add_column("Pipeline", max_width=50, justify="left", style="bright_blue")
table.add_column("Count", max_width=50, justify="left")
78 changes: 25 additions & 53 deletions workflow/cli/run.py
@@ -4,6 +4,7 @@
import signal
import time
from pathlib import Path
from sys import stderr, stdout
from threading import Event
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

@@ -13,10 +14,9 @@

from workflow import DEFAULT_WORKSPACE_PATH
from workflow.definitions.work import Work
from workflow.http.buckets import Buckets
from workflow.lifecycle import archive, container, execute, validation
from workflow.utils import read as reader
from workflow.utils import validate, write
from workflow.http.context import HTTPContext
from workflow.lifecycle import archive, container, execute
from workflow.utils import read, validate, write
from workflow.utils.logger import add_loki_handler, get_logger, set_tag, unset_tag

logger = get_logger("workflow.cli")
@@ -118,24 +118,7 @@ def run(
workspace: Union[str, Dict[Any, Any]],
log_level: str,
):
"""Fetch and perform work.
Args:
bucket (Tuple[str]): Bucket to fetch work from.
site (str): Site to filter work by.
tag (Tuple[str]): Tags to filter work by.
parent (Tuple[str]): Parent pipeline to filter work by.
function (str): Python function to execute.
command (str): Command to execute.
lives (int): Number of work to perform.
sleep (int): Time to sleep between work attempts.
workspace (str): Path or URL to the workspace file.
runspace (Optional[str]): Runtime JSON workspace.
log_level (str): Logging level.
Raises:
click.ClickException: _description_
"""
"""Fetch and perform work."""
# Set logging level
logger.root.setLevel(log_level)
logger.root.handlers[0].setLevel(log_level)
@@ -156,7 +139,7 @@
elif validate.url(workspace):
logger.info(f"Running with workspace from URL {workspace}")
# Fetch the workspace from the URL
payload = reader.url(workspace)
payload = read.url(workspace)
# Save the workspace to the default location
write.workspace(payload)

@@ -169,16 +152,12 @@

config: Dict[str, Any] = {}
try:
config = reader.workspace(DEFAULT_WORKSPACE_PATH)
config = read.workspace(DEFAULT_WORKSPACE_PATH)
except Exception as error:
logger.exception(error)
return
# Get the base urls from the config
baseurls = config.get("http", {}).get("baseurls", {})
buckets_url = baseurls.get("buckets", None)
loki_url = baseurls.get("loki", None)
products_url = baseurls.get("products", None)

# Reformat the tags, parent and buckets
tags: List[str] = list(tag)
parents: List[str] = list(parent)
@@ -198,10 +177,9 @@
"[bold red]Workspace [/bold red]",
extra=dict(markup=True, color="green"),
)
logger.info(f"Workspace: {workspace}")
logger.info(f"Buckets URL : {buckets_url}")
logger.info(f"Loki URL : {loki_url}")
logger.info(f"Prod URL : {products_url}")
logger.info(f"Buckets URL : {baseurls.get('buckets')}")
logger.info(f"Loki URL : {baseurls.get('loki')}")
logger.info(f"Prod URL : {baseurls.get('products')}")
logger.info(
"[bold red]Work Filters [/bold red]",
extra=dict(markup=True, color="green"),
@@ -221,35 +199,32 @@
"[bold red]Backend Checks [/bold red]",
extra=dict(markup=True, color="green"),
)
loki_status = add_loki_handler(logger, loki_url, config)
loki_status = add_loki_handler(logger, baseurls.get("loki"), config)
logger.info(f"Loki Logs: {'✅' if loki_status else '❌'}")

try:
Buckets(baseurl=buckets_url)
HTTPContext(backends=["buckets"]).buckets.info()
logger.info("Buckets : ✅")
except Exception as error:
logger.error(error)
raise click.ClickException("unable to connect to workflow backend")

logger.exception(error)
return
# Check if the function value provided is valid
if function:
validation.function(function)
validate.function(function)
logger.info("Function : ✅")

try:
logger.info(
"[bold]Starting Workflow Lifecycle[/bold]",
extra=dict(markup=True, color="green"),
)
slowdown: float = 1.0
if container.virtualization():
slowdown = 1000.0
console = Console(force_terminal=True, tab_size=4)
tty: bool = stdout.isatty() or stderr.isatty()
with console.status(
status="",
spinner="dots",
spinner="aesthetic" if tty else "dots",
spinner_style="bold green",
refresh_per_second=1,
speed=1 / slowdown,
refresh_per_second=10 if tty else 0.1,
):
lifecycle(
buckets,
@@ -259,7 +234,6 @@
site,
tags,
parents,
buckets_url,
config,
)
except Exception as error:
@@ -279,7 +253,6 @@ def lifecycle(
site: str,
tags: List[str],
parents: List[str],
base_url: str,
config: Dict[str, Any],
):
"""Run the workflow lifecycle."""
@@ -298,7 +271,7 @@ def quit(signo: int, _: Any):

# Run the lifecycle until the exit event is set or the lifetime is reached
while lifetime != 0 and not exit.is_set():
attempt(buckets, function, base_url, site, tags, parents, config)
attempt(buckets, function, site, tags, parents, config)
lifetime -= 1
logger.debug(f"sleeping: {sleep_time}s")
exit.wait(sleep_time)
@@ -308,7 +281,6 @@
def attempt(
buckets: List[str],
function: Optional[str],
base_url: str,
site: str,
tags: List[str],
parents: List[str],
@@ -336,7 +308,7 @@
try:
if function:
mode = "static"
user_func = validation.function(function)
user_func = validate.function(function)
else:
mode = "dynamic"
user_func = None
@@ -360,12 +332,12 @@

# Get the user function from the work object dynamically
if function:
user_func = validation.function(function)
user_func = validate.function(function)
work = execute.function(user_func, work)

# If we have a valid command, execute it
if command:
validation.command(command[0])
validate.command(command[0])
work = execute.command(command, work)
if int(work.timeout) + int(work.start) < time.time(): # type: ignore
raise TimeoutError("work timed out")
@@ -377,11 +349,11 @@
finally:
if work:
product_url = config.get("http", {}).get("baseurls", {}).get("products", "")
if any(work.notify.slack.dict().values()) and work.products:
if any(work.notify.slack.model_dump().values()) and work.products:
work.products = [
f"<{product_url}{product}|{product}>" for product in work.products
]
if any(work.notify.slack.dict().values()) and work.plots:
if any(work.notify.slack.model_dump().values()) and work.plots:
work.plots = [f"<{product_url}{plot}|{plot}>" for plot in work.plots]
work.update() # type: ignore
logger.info("work completed: ✅")
(Diffs for the remaining 12 changed files are not shown.)
