Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahbenizzy committed Jun 14, 2024
1 parent 89df73a commit f038c09
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
4 changes: 3 additions & 1 deletion ui/sdk/src/hamilton_sdk/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ def post_node_execute(
},
attribute_role="error",
)
task_attr_2 = task_attr.copy()
task_attr_2["name"] = "task_attr_2"
task_run.end_time = datetime.datetime.now(timezone.utc)
tracking_state.update_task(node_.name, task_run)
task_update = dict(
Expand All @@ -309,7 +311,7 @@ def post_node_execute(
)
self.client.update_tasks(
self.dw_run_ids[run_id],
attributes=[task_attr],
attributes=[task_attr, task_attr_2],
task_updates=[task_update],
in_samples=[task_run.is_in_sample],
)
Expand Down
21 changes: 13 additions & 8 deletions ui/sdk/src/hamilton_sdk/api/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import threading
import time
from collections import defaultdict
from functools import reduce
from typing import Any, Callable, Dict, List
from urllib.parse import urlencode

Expand Down Expand Up @@ -234,13 +233,19 @@ def flush(self, batch):
task_updates[task_update["node_name"]].append(task_update)

# this assumes correct ordering of the attributes and task_updates
attributes_list = [
reduce(lambda x, y: {**x, **y}, attributes[node_name]) for node_name in attributes
]
task_updates_list = [
reduce(lambda x, y: {**x, **y}, task_updates[node_name])
for node_name in task_updates
]
# attributes_list = [
# reduce(lambda x, y: {**x, **y}, attributes[node_name]) for node_name in attributes
# ]
# task_updates_list = [
# reduce(lambda x, y: {**x, **y}, task_updates[node_name])
# for node_name in task_updates
# ]
attributes_list = []
for node_name in attributes:
attributes_list.extend(attributes[node_name])
task_updates_list = []
for node_name in task_updates:
task_updates_list.extend(task_updates[node_name])

response = requests.put(
f"{self.base_url}/dag_runs_bulk?dag_run_id={dag_run_id}",
Expand Down

0 comments on commit f038c09

Please sign in to comment.