Skip to content

Commit

Permalink
Pass the current builder to the async put thread.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Nov 19, 2024
1 parent cdffb5b commit 44abead
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
11 changes: 9 additions & 2 deletions python/vineyard/core/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def put(
builder: Optional[BuilderContext] = None,
persist: bool = False,
name: Optional[str] = None,
as_async: bool = False,
**kwargs
):
"""Put python value to vineyard.
Expand Down Expand Up @@ -185,16 +186,22 @@ def put(
name: str, optional
If given, the name will be automatically associated with the resulted
object. Note that only take effect when the object is persisted.
as_async: bool, optional
If true, which means the object will be put to vineyard asynchronously.
Thus we need to use the passed builder.
kw:
User-specific argument that will be passed to the builder.
Returns:
ObjectID: The result object id will be returned.
"""
if builder is not None:
if builder is not None and not as_async:
return builder(client, value, **kwargs)

meta = get_current_builders().run(client, value, **kwargs)
if as_async:
meta = builder.run(client, value, **kwargs)
else:
meta = get_current_builders().run(client, value, **kwargs)

# the builders is expected to return an :class:`ObjectMeta`, or an
# :class:`Object` (in the `bytes_builder` and `memoryview` builder).
Expand Down
22 changes: 18 additions & 4 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from vineyard._C import VineyardException
from vineyard._C import _connect
from vineyard.core.builder import BuilderContext
from vineyard.core.builder import get_current_builders
from vineyard.core.builder import put
from vineyard.core.resolver import ResolverContext
from vineyard.core.resolver import get
Expand Down Expand Up @@ -839,10 +840,11 @@ def _put_internal(
builder: Optional[BuilderContext] = None,
persist: bool = False,
name: Optional[str] = None,
as_async: bool = False,
**kwargs,
):
try:
return put(self, value, builder, persist, name, **kwargs)
return put(self, value, builder, persist, name, as_async, **kwargs)
except NotEnoughMemoryException as exec:
with envvars(
{'VINEYARD_RPC_SKIP_RETRY': '1', 'VINEYARD_IPC_SKIP_RETRY': '1'}
Expand All @@ -868,7 +870,7 @@ def _put_internal(
host, port = meta[instance_id]['rpc_endpoint'].split(':')
self._rpc_client = _connect(host, port)
self.compression = previous_compression_state
return put(self, value, builder, persist, name, **kwargs)
return put(self, value, builder, persist, name, as_async, **kwargs)

@_apply_docstring(put)
def put(
Expand All @@ -881,16 +883,28 @@ def put(
**kwargs,
):
if as_async:

def _default_callback(future):
try:
result = future.result()
print(f"Successfully put object {result}", flush=True)
if isinstance(result, ObjectID):
print(f"Successfully put object {result}", flush=True)
elif isinstance(result, ObjectMeta):
print(f"Successfully put object {result.id}", flush=True)
except Exception as e:
print(f"Failed to put object: {e}", flush=True)

current_builder = builder or get_current_builders()

thread_pool = self.put_thread_pool
result = thread_pool.submit(
self._put_internal, value, builder, persist, name, **kwargs
self._put_internal,
value,
current_builder,
persist,
name,
as_async=True,
**kwargs,
)
result.add_done_callback(_default_callback)
return result
Expand Down

0 comments on commit 44abead

Please sign in to comment.