diff --git a/ring/func/asyncio.py b/ring/func/asyncio.py index efe8fda..ef6bf45 100644 --- a/ring/func/asyncio.py +++ b/ring/func/asyncio.py @@ -405,10 +405,10 @@ def set_many_values(self, keys, values, expire): class AioredisHashStorage(AioredisStorage): """Storage implementation for :class:`aioredis.Redis`.""" - def __init__(self, rope, backend): + def __init__(self, rope, backend, maxsize): storage_backend = backend[0] self.hash_key = backend[1] - super(AioredisHashStorage, self).__init__(rope, storage_backend) + super(AioredisHashStorage, self).__init__(rope, storage_backend, maxsize) @asyncio.coroutine def get_value(self, key): @@ -452,7 +452,7 @@ def set_many_values(self, keys, values, expire): def dict( obj, key_prefix=None, expire=None, coder=None, user_interface=CacheUserInterface, storage_class=None, - **kwargs): + maxsize=128, **kwargs): """:class:`dict` interface for :mod:`asyncio`. :see: :func:`ring.func.sync.dict` for common description. @@ -463,6 +463,7 @@ def dict( storage_class = fsync.PersistentDictStorage else: storage_class = fsync.ExpirableDictStorage + storage_class.maxsize = maxsize return fbase.factory( obj, key_prefix=key_prefix, on_manufactured=None, diff --git a/ring/func/base.py b/ring/func/base.py index f3bb8c0..778ee91 100644 --- a/ring/func/base.py +++ b/ring/func/base.py @@ -562,6 +562,7 @@ def factory( # keyword-only arguments from here # building blocks coder, miss_value, user_interface, storage_class, + maxsize=None, default_action=Ellipsis, coder_registry=Ellipsis, # callback @@ -638,7 +639,7 @@ class RingRope(RopeCore): def __init__(self, *args, **kwargs): super(RingRope, self).__init__(*args, **kwargs) self.user_interface = self.user_interface_class(self) - self.storage = self.storage_class(self, storage_backend) + self.storage = self.storage_class(self, storage_backend, maxsize) _ignorable_keys = suggest_ignorable_keys( self.callable, ignorable_keys) _key_prefix = suggest_key_prefix(self.callable, key_prefix) @@ -747,9 +748,10 @@ class BaseStorage(object): are mandatory; Otherwise not. """ - def __init__(self, rope, backend): + def __init__(self, rope, backend, maxsize): self.rope = rope self.backend = backend + self.maxsize = maxsize @abc.abstractmethod def get(self, key): # pragma: no cover diff --git a/ring/func/sync.py b/ring/func/sync.py index 4ce4ccd..7551b9d 100644 --- a/ring/func/sync.py +++ b/ring/func/sync.py @@ -205,9 +205,66 @@ def touch_value(self, key, expire): except KeyError: pass +class SizeMaintainer(object): + _DEBUG = False + + def __init__(self, backend, target_size, expire_f=None): + from collections import abc + assert round(target_size) > 0, 'target_size has to be at least 1' + assert expire_f is None or callable(expire_f), 'expire_f has to be function or None' + assert isinstance(backend, abc.MutableMapping), 'backend has to be dict-like' + self._backend = backend + self._target_size = round(target_size) + self._expire_f = expire_f + + def run(self): + if (len(self._backend) <= self._target_size): + return + + import random, time + def strategy_with_expire(): + MAX_EXPIRE_RETRY_COUNT = 4 + now = time.time() + retry_count = 0 + + keys = list(self._backend.keys()) + random.shuffle(keys) + key_index = 0 + while (len(self._backend) > self._target_size) and (retry_count < MAX_EXPIRE_RETRY_COUNT): + key = keys[key_index] + val = self._backend.get(key, None) + expire = self._expire_f(val) + if expire < now: + self._backend.pop(key, None) + key_index += 1 + if self._DEBUG: + print('{} removed from strategy_with_expire => size {}'.format(key, len(self._backend))) + else: + retry_count += 1 + + def strategy_with_force(): + keys = list(self._backend.keys()) + random.shuffle(keys) + key_index = 0 + while (len(self._backend) > self._target_size): + key = keys[key_index] + self._backend.pop(key, None) + key_index += 1 + if self._DEBUG: + print('{} removed from strategy_with_force => size {}'.format(key, len(self._backend))) + + if self._DEBUG: + print('gc started in size:{}'.format(len(self._backend))) + + if self._expire_f is not None: + strategy_with_expire() + strategy_with_force() + + if self._DEBUG: + print('gc ended in size:{}'.format(len(self._backend))) -class ExpirableDictStorage(fbase.CommonMixinStorage, fbase.StorageMixin): +class ExpirableDictStorage(fbase.CommonMixinStorage, fbase.StorageMixin): in_memory_storage = True now = time.time @@ -229,6 +286,9 @@ def set_value(self, key, value, expire): expired_time = _now + expire self.backend[key] = expired_time, value + if (self.maxsize is not None) and (self.maxsize < len(self.backend)): + SizeMaintainer(self.backend, self.maxsize * 0.75, lambda x: x[0]).run() + def delete_value(self, key): try: del self.backend[key] @@ -252,7 +312,6 @@ def touch_value(self, key, expire): class PersistentDictStorage(fbase.CommonMixinStorage, fbase.StorageMixin): - in_memory_storage = True def get_value(self, key): @@ -264,6 +323,8 @@ def get_value(self, key): def set_value(self, key, value, expire): self.backend[key] = value + if (self.maxsize is not None) and self.maxsize < len(self.backend): + SizeMaintainer(self.backend, self.maxsize * 0.75).run() def delete_value(self, key): try: @@ -354,10 +415,10 @@ def set_many_values(self, keys, values, expire): class RedisHashStorage(RedisStorage): - def __init__(self, rope, backend): + def __init__(self, rope, backend, maxsize): storage_backend = backend[0] self.hash_key = backend[1] - super(RedisHashStorage, self).__init__(rope, storage_backend) + super(RedisHashStorage, self).__init__(rope, storage_backend, maxsize) def get_value(self, key): value = self.backend.hget(self.hash_key, key) @@ -443,7 +504,7 @@ def lru( def dict( obj, key_prefix=None, expire=None, coder=None, user_interface=CacheUserInterface, storage_class=None, - **kwargs): + maxsize=128, **kwargs): """Basic Python :class:`dict` based cache. This backend is not designed for real products. Please carefully read the @@ -470,7 +531,7 @@ def dict( return fbase.factory( obj, key_prefix=key_prefix, on_manufactured=None, - user_interface=user_interface, storage_class=storage_class, + user_interface=user_interface, storage_class=storage_class, maxsize=maxsize, miss_value=None, expire_default=expire, coder=coder, **kwargs) diff --git a/tests/test_dict_size.py b/tests/test_dict_size.py new file mode 100644 index 0000000..97edb12 --- /dev/null +++ b/tests/test_dict_size.py @@ -0,0 +1,168 @@ + +import ring + + +def test_dict_size_persistence_1(): + cache = {} + MAX_SIZE = 1 + + @ring.dict(cache, maxsize=MAX_SIZE) + def f_persistent_1(i): + return i + + for i in range(MAX_SIZE * 100): + f_persistent_1(i) + assert len(cache) <= 1 + assert len(cache) <= 1 + +def test_dict_size_persistence_default(): + cache = {} + + @ring.dict(cache) + def f_persistent_default(i): + return i + + for i in range(1000): + f_persistent_default(i) + assert len(cache) <= 128 + assert len(cache) <= 128 + +def test_dict_size_persistence_1000(): + cache = {} + MAX_SIZE = 1000 + + @ring.dict(cache, maxsize=MAX_SIZE) + def f_persistent_default(i): + return i + + for i in range(MAX_SIZE * 100): + f_persistent_default(i) + assert len(cache) <= MAX_SIZE + assert len(cache) <= MAX_SIZE + +def test_dict_size_persistent_with_delete(): + cache = {} + MAX_SIZE = 10 + + @ring.dict(cache, maxsize=MAX_SIZE) + def f_persistent_with_delete(i): + return i + + for i in range(MAX_SIZE * 100): + f_persistent_with_delete(i) + assert len(cache) <= MAX_SIZE + if i % 17 == 0: + for pop_count in range(8): + if len(cache) > 0: + cache.popitem() + + + assert len(cache) <= MAX_SIZE + +def test_dict_size_persistent_infinite(): + cache = {} + MAX_SIZE = None + + @ring.dict(cache, maxsize=MAX_SIZE) + def f_persistent_infinite(i): + return i + + for i in range(10000): + f_persistent_infinite(i) + assert len(cache) <= 10000 + + assert len(cache) == 10000 + +def test_dict_size_expire_1(): + cache = {} + MAX_SIZE = 1 + + @ring.dict(cache, maxsize=MAX_SIZE, expire=1) + def f_expire_1(i): + return i + + for i in range(MAX_SIZE * 100): + f_expire_1(i) + assert len(cache) <= MAX_SIZE + assert len(cache) <= MAX_SIZE + +def test_dict_size_expire_default(): + cache = {} + + @ring.dict(cache, expire=1) + def f_expire_default(i): + return i + + for i in range(1000): + f_expire_default(i) + assert len(cache) <= 128 + assert len(cache) <= 128 + +def test_dict_size_expire_1000(): + cache = {} + MAX_SIZE = 1000 + + @ring.dict(cache, maxsize=MAX_SIZE, expire=1) + def f_expire_1(i): + return i + + for i in range(MAX_SIZE * 100): + f_expire_1(i) + assert len(cache) <= MAX_SIZE + + assert len(cache) <= MAX_SIZE + +def test_dict_size_expire_some(): + import time + cache = {} + MAX_SIZE = 150 + + @ring.dict(cache, maxsize=MAX_SIZE, expire=1) + def f_expire_some_expire(i): + return i + + for _ in range(5): + for i in range(100): + f_expire_some_expire(i) + assert len(cache) <= MAX_SIZE + time.sleep(1) + for i in range(100, 200): + f_expire_some_expire(i) + assert len(cache) <= MAX_SIZE + assert len(cache) <= MAX_SIZE + + +def test_dict_size_expire_with_delete(): + import time + cache = {} + + @ring.dict(cache, expire=1) + def f_expire_with_delete(i): + return i + + for i in range(1000): + f_expire_with_delete(i) + time.sleep(1) + for i in range(1000, 2000): + f_expire_with_delete(i) + assert len(cache) <= 128 + if i % 17 == 0: + for pop_count in range(8): + if len(cache) > 0: + cache.popitem() + + assert len(cache) <= 128 + +def test_dict_size_expire_infinite(): + cache = {} + MAX_SIZE = None + + @ring.dict(cache, maxsize=MAX_SIZE) + def f_expire_infinite(i): + return i + + for i in range(10000): + f_expire_infinite(i) + assert len(cache) <= 10000 + + assert len(cache) == 10000