Skip to content

Commit

Permalink
get_or_update_many
Browse files Browse the repository at this point in the history
  • Loading branch information
youknowone committed May 31, 2018
1 parent 406d49a commit fdeb512
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 35 deletions.
46 changes: 28 additions & 18 deletions ring/func_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,6 @@ def touch(self, wire, **kwargs):
return self.ring.storage.touch(key)


@asyncio.coroutine
def execute_bulk_item(wire, args):
if isinstance(args, tuple):
result = yield from wire._ring.cwrapper.callable(
*(wire._preargs + args))
return result
elif isinstance(args, type_dict):
result = yield from wire._ring.cwrapper.callable(
*wire._preargs, **args)
return result
else:
raise TypeError(
"Each parameter of '_many' suffixed sub-functions must be an "
"instance of 'tuple' or 'dict'")


class BulkInterfaceMixin(fbase.AbstractBulkUserInterfaceMixin):
"""Bulk access interface mixin.
Expand All @@ -150,7 +134,7 @@ class BulkInterfaceMixin(fbase.AbstractBulkUserInterfaceMixin):
return_annotation=lambda a: List[a.get('return', Any)])
def execute_many(self, wire, *args_list):
return asyncio.gather(*(
execute_bulk_item(wire, args) for args in args_list))
fbase.execute_bulk_item(wire, args) for args in args_list))

@fbase.interface_attrs(
return_annotation=lambda a: List[Optional[a.get('return', Any)]])
Expand All @@ -159,14 +143,40 @@ def get_many(self, wire, *args_list):
return self.ring.storage.get_many(
keys, miss_value=self.ring.miss_value)

@fbase.interface_attrs(return_annotation=None)
@fbase.interface_attrs(
return_annotation=lambda a: List[a.get('return', Any)])
@asyncio.coroutine
def update_many(self, wire, *args_list):
keys = self.key_many(wire, *args_list)
values = yield from self.execute_many(wire, *args_list)
yield from self.ring.storage.set_many(keys, values)
return values

@fbase.interface_attrs(
return_annotation=lambda a: List[a.get('return', Any)])
@asyncio.coroutine
def get_or_update_many(self, wire, *args_list):
keys = self.key_many(wire, *args_list)
miss_value = object()
results = yield from self.ring.storage.get_many(
keys, miss_value=miss_value)

miss_indices = []
for i, akr in enumerate(zip(args_list, keys, results)):
args, key, result = akr
if result is not miss_value:
continue
miss_indices.append(i)

new_results = yield from asyncio.gather(*(
fbase.execute_bulk_item(wire, args_list[i]) for i in miss_indices))
new_keys = [keys[i] for i in miss_indices]
yield from self.ring.storage.set_many(new_keys, new_results)

for new_i, old_i in enumerate(miss_indices):
results[old_i] = new_results[new_i]
return results

@fbase.interface_attrs(return_annotation=None)
def set_many(self, wire, args_list, value_list):
keys = self.key_many(wire, *args_list)
Expand Down
11 changes: 11 additions & 0 deletions ring/func_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,17 @@ def create_bulk_key(interface, wire, args):
"instance of 'tuple' or 'dict'")


def execute_bulk_item(wire, args):
if isinstance(args, tuple):
return wire._ring.cwrapper.callable(*(wire._preargs + args))
elif isinstance(args, dict):
return wire._ring.cwrapper.callable(*wire._preargs, **args)
else:
raise TypeError(
"Each parameter of '_many' suffixed sub-functions must be an "
"instance of 'tuple' or 'dict'")


class AbstractBulkUserInterfaceMixin(object):
"""Bulk access interface mixin.
Expand Down
44 changes: 27 additions & 17 deletions ring/func_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
__all__ = ('dict', 'memcache', 'redis_py', 'redis', 'disk', )


type_dict = dict


class CacheUserInterface(fbase.BaseUserInterface):
"""General cache user interface provider.
Expand Down Expand Up @@ -76,17 +73,6 @@ def touch(self, wire, **kwargs):
self.ring.storage.touch(key)


def execute_bulk_item(wire, args):
if isinstance(args, tuple):
return wire._ring.cwrapper.callable(*(wire._preargs + args))
elif isinstance(args, type_dict):
return wire._ring.cwrapper.callable(*wire._preargs, **args)
else:
raise TypeError(
"Each parameter of '_many' suffixed sub-functions must be an "
"instance of 'tuple' or 'dict'")


class BulkInterfaceMixin(fbase.AbstractBulkUserInterfaceMixin):
"""Bulk access interface mixin.
Expand All @@ -97,7 +83,7 @@ class BulkInterfaceMixin(fbase.AbstractBulkUserInterfaceMixin):
@fbase.interface_attrs(
return_annotation=lambda a: List[a.get('return', Any)])
def execute_many(self, wire, *args_list):
values = [execute_bulk_item(wire, args) for args in args_list]
values = [fbase.execute_bulk_item(wire, args) for args in args_list]
return values

@fbase.interface_attrs(
Expand All @@ -108,13 +94,37 @@ def get_many(self, wire, *args_list):
keys, miss_value=self.ring.miss_value)
return results

@fbase.interface_attrs(return_annotation=None)
@fbase.interface_attrs(
return_annotation=lambda a: List[a.get('return', Any)])
def update_many(self, wire, *args_list):
keys = self.key_many(wire, *args_list)
values = [execute_bulk_item(wire, args) for args in args_list]
values = self.execute_many(wire, *args_list)
self.ring.storage.set_many(keys, values)
return values

@fbase.interface_attrs(
return_annotation=lambda a: List[a.get('return', Any)])
def get_or_update_many(self, wire, *args_list):
keys = self.key_many(wire, *args_list)
miss_value = object()
results = self.ring.storage.get_many(keys, miss_value=miss_value)

miss_indices = []
for i, akr in enumerate(zip(args_list, keys, results)):
args, key, result = akr
if result is not miss_value:
continue
miss_indices.append(i)

new_results = [
fbase.execute_bulk_item(wire, args_list[i]) for i in miss_indices]
new_keys = [keys[i] for i in miss_indices]
self.ring.storage.set_many(new_keys, new_results)

for new_i, old_i in enumerate(miss_indices):
results[old_i] = new_results[new_i]
return results

@fbase.interface_attrs(return_annotation=None)
def set_many(self, wire, args_list, value_list):
keys = self.key_many(wire, *args_list)
Expand Down
8 changes: 8 additions & 0 deletions tests/_test_func_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ def f(a):
(3,),
)
assert r == [b'foo', b'bar', b't3']
yield from f.delete(2)

r = yield from f.get_or_update_many(
(1,),
(2,),
(3,),
)
assert r == [b'foo', b't2', b't3']

with pytest.raises(AttributeError):
yield from f.delete_many()
Expand Down
7 changes: 7 additions & 0 deletions tests/test_memcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def f(a, b):
(5, 1),
)
assert mv == [503, 716, 501]
f.delete(1, 4)
mv = f.get_or_update_many(
(1, 2),
(1, 4),
(5, 1),
)
assert mv == [503, 104, 501]

with pytest.raises(AttributeError):
f.touch_many()
Expand Down

0 comments on commit fdeb512

Please sign in to comment.