Skip to content

Commit

Permalink
GC support
Browse files Browse the repository at this point in the history
1. Removed threading & new test
2. Improved time complexity.
3. Minor fixes.
  • Loading branch information
amsukdu committed Jun 23, 2019
1 parent 9846986 commit 5fec255
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 203 deletions.
6 changes: 4 additions & 2 deletions ring/func/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ def factory(
# keyword-only arguments from here
# building blocks
coder, miss_value, user_interface, storage_class,
maxsize=128,
default_action=Ellipsis,
coder_registry=Ellipsis,
# callback
Expand Down Expand Up @@ -638,7 +639,7 @@ class RingRope(RopeCore):
def __init__(self, *args, **kwargs):
super(RingRope, self).__init__(*args, **kwargs)
self.user_interface = self.user_interface_class(self)
self.storage = self.storage_class(self, storage_backend)
self.storage = self.storage_class(self, storage_backend, maxsize)
_ignorable_keys = suggest_ignorable_keys(
self.callable, ignorable_keys)
_key_prefix = suggest_key_prefix(self.callable, key_prefix)
Expand Down Expand Up @@ -747,9 +748,10 @@ class BaseStorage(object):
are mandatory; Otherwise not.
"""

def __init__(self, rope, backend):
def __init__(self, rope, backend, maxsize):
self.rope = rope
self.backend = backend
self.maxsize = maxsize

@abc.abstractmethod
def get(self, key): # pragma: no cover
Expand Down
126 changes: 52 additions & 74 deletions ring/func/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import time
import re
import hashlib
import threading, random, time

from . import base as fbase, lru_cache as lru_mod

Expand Down Expand Up @@ -206,82 +205,68 @@ def touch_value(self, key, expire):
except KeyError:
pass

class Gc(object):
class SizeMaintainer(object):
_DEBUG = False

def __init__(self, backend, target_size, expire_f=None):
from collections import abc
assert round(target_size) > 0, 'target_size has to be at least 1'
assert expire_f is None or callable(expire_f), 'expire_f has to be function or None'
assert isinstance(backend, type({})), 'backend has to be dict-like'
assert isinstance(backend, abc.MutableMapping), 'backend has to be dict-like'
self._backend = backend
self._target_size = round(target_size)
self._expire_f = expire_f
self._mutex = threading.Lock()

def run(self):
class WorkThread(threading.Thread):
DEBUG = False

def __init__(self, outer_instance):
threading.Thread.__init__(self)
self._backend = outer_instance._backend
self._target_size = outer_instance._target_size
self._expire_f = outer_instance._expire_f
self._mutex = outer_instance._mutex

def strategy_with_expire(self):
MAX_EXPIRE_RETRY_COUNT = 4
now = time.time()
retry_count = 0
keys = list(self._backend.keys())
while (len(self._backend) > self._target_size) and (retry_count < MAX_EXPIRE_RETRY_COUNT):
k = keys.pop(random.randrange(len(keys)))
val = self._backend.get(k, None)
if val is None:
continue
expire = self._expire_f(val)
if expire < now:
self._backend.pop(k, None)
self.print_d('{} removed from strategy_with_expire => size {}'.format(k, len(self._backend)))
else:
retry_count += 1
if len(keys) == 0:
keys = list(self._backend.keys())

def strategy_with_force(self):
keys = list(self._backend.keys())
while (len(self._backend) > self._target_size) and len(keys) > 0:
k = keys.pop(random.randrange(len(keys)))
self._backend.pop(k, None)
self.print_d('{} removed from strategy_with_force => size {}'.format(k, len(self._backend)))
if len(keys) == 0:
keys = list(self._backend.keys())

def run(self):
try:
self.print_d('gc started in size:{}'.format(len(self._backend)))
if self._expire_f is not None:
self.strategy_with_expire()
self.strategy_with_force()
self.print_d('gc ended in size:{}'.format(len(self._backend)))
finally:
self._mutex.release()


def print_d(self, str_):
if self.DEBUG:
print(str_)

self._mutex.acquire()
if (len(self._backend) > self._target_size):
WorkThread(self).start()
else:
self._mutex.release()
if (len(self._backend) <= self._target_size):
return

import random, time
def strategy_with_expire():
MAX_EXPIRE_RETRY_COUNT = 4
now = time.time()
retry_count = 0

keys = list(self._backend.keys())
random.shuffle(keys)
key_index = 0
while (len(self._backend) > self._target_size) and (retry_count < MAX_EXPIRE_RETRY_COUNT):
key = keys[key_index]
val = self._backend.get(key, None)
expire = self._expire_f(val)
if expire < now:
self._backend.pop(key, None)
key_index += 1
if self._DEBUG:
print('{} removed from strategy_with_expire => size {}'.format(key, len(self._backend)))
else:
retry_count += 1

def strategy_with_force():
keys = list(self._backend.keys())
random.shuffle(keys)
key_index = 0
while (len(self._backend) > self._target_size):
key = keys[key_index]
self._backend.pop(key, None)
key_index += 1
if self._DEBUG:
print('{} removed from strategy_with_force => size {}'.format(key, len(self._backend)))

if self._DEBUG:
print('gc started in size:{}'.format(len(self._backend)))

if self._expire_f is not None:
strategy_with_expire()
strategy_with_force()

if self._DEBUG:
print('gc ended in size:{}'.format(len(self._backend)))


class ExpirableDictStorage(fbase.CommonMixinStorage, fbase.StorageMixin):
maxsize = 128
in_memory_storage = True
now = time.time
_gc = None

def get_value(self, key):
_now = self.now()
Expand All @@ -302,9 +287,7 @@ def set_value(self, key, value, expire):
self.backend[key] = expired_time, value

if self.maxsize < len(self.backend):
if self._gc == None:
self._gc = Gc(self.backend, self.maxsize * 0.75, lambda x: x[0])
self._gc.run()
SizeMaintainer(self.backend, self.maxsize * 0.75, lambda x: x[0]).run()

def delete_value(self, key):
try:
Expand All @@ -330,8 +313,6 @@ def touch_value(self, key, expire):

class PersistentDictStorage(fbase.CommonMixinStorage, fbase.StorageMixin):
in_memory_storage = True
maxsize = 128
_gc = None

def get_value(self, key):
try:
Expand All @@ -343,9 +324,7 @@ def get_value(self, key):
def set_value(self, key, value, expire):
self.backend[key] = value
if self.maxsize < len(self.backend):
if self._gc == None:
self._gc = Gc(self.backend, self.maxsize * 0.75)
self._gc.run()
SizeMaintainer(self.backend, self.maxsize * 0.75).run()

def delete_value(self, key):
try:
Expand Down Expand Up @@ -549,11 +528,10 @@ def dict(
storage_class = PersistentDictStorage
else:
storage_class = ExpirableDictStorage
storage_class.maxsize = maxsize

return fbase.factory(
obj, key_prefix=key_prefix, on_manufactured=None,
user_interface=user_interface, storage_class=storage_class,
user_interface=user_interface, storage_class=storage_class, maxsize=maxsize,
miss_value=None, expire_default=expire, coder=coder,
**kwargs)

Expand Down
127 changes: 0 additions & 127 deletions tests/test_dict_gc.py

This file was deleted.

Loading

0 comments on commit 5fec255

Please sign in to comment.