Skip to content

Commit

Permalink
1.6.0
Browse files Browse the repository at this point in the history
Add parameter 'done_max' to class Pool to limit number of results in queue.
Add parameter 'cnt_max' to function as_completed to limit number of results
    to read with one function call.
  • Loading branch information
brmmm3 committed Dec 11, 2019
1 parent 9ba9ad4 commit dab986a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
5 changes: 5 additions & 0 deletions Changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.6.0:
Add parameter 'done_max' to class Pool to limit number of results in queue.
Add parameter 'cnt_max' to function as_completed to limit number of results
to read with one function call.

1.5.2:
Bugfix: Add missing parameter in map-function when called with direct=False.

Expand Down
56 changes: 47 additions & 9 deletions fastthreadpool/fastthreadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,23 @@ class Pool(object): #p
#c cdef str child_name_prefix
#c cdef bint result_id, exc_stack, exc_args
#c cdef int _busy_cnt
#c cdef pythread.PyThread_type_lock _busy_lock
#c cdef pythread.PyThread_type_lock _busy_lock, _done_max_lock
#c cdef object _delayed, _scheduled, _jobs, _jobs_append, _jobs_appendleft, _done, _failed
#c cdef Semaphore _done_cnt, _failed_cnt
#c cdef bint _shutdown, _shutdown_children
#c cdef object logger
#c cdef object init_callback, init_args, done_callback, finish_callback
#c cdef object _thr_done, _thr_failed
#c cdef int _done_max

#c def __cinit__(self, int max_children=-9999, str child_name_prefix="", init_callback=None,
#c init_args=None, finish_callback=None, done_callback=None, failed_callback=None,
#c int log_level=0, bint result_id=False, bint exc_stack=False, bint exc_args=False):
#c int log_level=0, bint result_id=False, bint exc_stack=False, bint exc_args=False,
#c int done_max=0):
def __init__(self, max_children=-9999, child_name_prefix="", init_callback=None, #p
init_args=None, finish_callback=None, done_callback=None, failed_callback=None, #p
log_level=None, result_id=False, exc_stack=False, exc_args=False): #p
log_level=None, result_id=False, exc_stack=False, exc_args=False, #p
done_max=0): #p
self._job_cnt = Semaphore(0)
self._children = deque()
if max_children <= -9999:
Expand Down Expand Up @@ -192,6 +195,9 @@ def __init__(self, max_children=-9999, child_name_prefix="", init_callback=None,
self.init_args = tuple() if init_args is None else init_args
self.done_callback = False if done_callback is False else True
self.finish_callback = finish_callback
self._done_max = done_max # Maximum number of results in queue (0=no limit).
#c self._done_max_lock = pythread.PyThread_allocate_lock()
self._done_max_lock = Lock() #p
if callable(done_callback):
self._thr_done = Thread(target=self._done_thread, args=(done_callback, ),
name="ThreadPoolDone")
Expand Down Expand Up @@ -257,6 +263,15 @@ def _done_thread(self, done_callback):
break
self._done_cnt.acquire()

#c cdef void _done_cnt_inc(self):
def _done_cnt_inc(self): #p
self._done_cnt.release()
#c while (self._done_max > 0) and (self._done_cnt._value >= self._done_max):
#c with nogil:
#c pythread.PyThread_acquire_lock(self._done_max_lock, 1)
while (self._done_max > 0) and (self._done_cnt.value >= self._done_max): #p
self._done_max_lock.acquire() #p

def _failed_thread(self, failed_callback):
failed_popleft = self._failed.popleft
logger_exception = None if self.logger is None else self.logger.exception
Expand Down Expand Up @@ -347,13 +362,13 @@ def _child(self):
_done_append((jobid, result))
else:
_done_append(result)
self._done_cnt.release()
self._done_cnt_inc()
else:
if self.result_id:
_done_append((id(job), fn(*args, **kwargs)))
else:
_done_append(fn(*args, **kwargs))
self._done_cnt.release()
self._done_cnt_inc()
elif callable(done_callback):
if isgeneratorfunction(fn):
for result in fn(*args, **kwargs):
Expand Down Expand Up @@ -490,10 +505,19 @@ def schedule_done(self, interval, fn, done_callback, *args, **kwargs):
def scheduled(self):
return self._scheduled

def as_completed(self, wait=None):
#c def as_completed(self, wait=None, int cnt_max=0):
def as_completed(self, wait=None, cnt_max=0): #p
# wait:
# True=Wait until all jobs are done
# False=Read all results then return immediately
# int or float value=Read all results and wait for further results until timeout
# timeout=current time at function call + value
# cnt_max:
# Maximum number of results to read. 0=No limit.
#c cdef float to
#c cdef object pyto
#c cdef bint do_sleep
#c cdef int cnt
if self._thr_done is not None:
raise PoolCallback("Using done_callback!")
if self._thr_failed is not None:
Expand All @@ -507,13 +531,27 @@ def as_completed(self, wait=None):
failed = self._failed
done_popleft = done.popleft
failed_popleft = failed.popleft
cnt = 0
while self._busy_cnt or self._jobs or done or failed:
do_sleep = True
while done:
yield done_popleft()
if self._done_cnt.value > 0:
self._done_cnt.acquire(False)
if self._done_max > 0:
#c pythread.PyThread_release_lock(self._done_max_lock)
self._done_max_lock.release() #p
if cnt_max > 0:
cnt += 1
if cnt >= cnt_max:
return
do_sleep = False
while failed:
yield failed_popleft()
if cnt_max > 0:
cnt += 1
if cnt >= cnt_max:
return
do_sleep = False
if wait is False or ((to > 0.0) and (_time() > pyto)):
return
Expand Down Expand Up @@ -546,7 +584,7 @@ def _map_child(self, fn, itr, done_callback, unpack_args): #p
_done_append(fn(*args))
else:
_done_append(fn(args))
self._done_cnt.release()
self._done_cnt_inc()
except Exception as exc:
self._append_failed(0, exc, args, {})
elif callable(done_callback):
Expand Down Expand Up @@ -589,11 +627,11 @@ def _imap_child(self, fn, itr, done_callback, unpack_args): #p
if unpack_args:
for result in fn(*args):
_done_append(result)
self._done_cnt.release()
self._done_cnt_inc()
else:
for result in fn(args):
_done_append(result)
self._done_cnt.release()
self._done_cnt_inc()
except Exception as exc:
self._append_failed(0, exc, args, {})
elif callable(done_callback):
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pyorcy import extract_cython

PKGNAME = 'fastthreadpool'
PKGVERSION = '1.5.2'
PKGVERSION = '1.6.0'
BASEDIR = os.path.dirname(__file__)
PKGDIR = PKGNAME if not BASEDIR else os.path.join(BASEDIR, PKGNAME)

Expand Down Expand Up @@ -54,7 +54,6 @@ def build_extensions(self):
build_ext.build_extensions(self)



for filename in os.listdir(PKGDIR):
for ext in (".cpp", ".c", ".pyx", ".html"):
if filename.endswith(ext):
Expand Down

0 comments on commit dab986a

Please sign in to comment.