Skip to content

Commit

Permalink
#1 GeneratorExit
Browse files Browse the repository at this point in the history
  • Loading branch information
observerss committed Mar 5, 2015
1 parent 10b5362 commit 04e0642
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 9 deletions.
7 changes: 7 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
## History

### 2015.03.05

0.3.8 release

- when GeneratorExit cancel all pending tasks
- yielder related status cleanups

### 2015.03.02

0.3.4-0.3.7 release
Expand Down
2 changes: 1 addition & 1 deletion aioutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

__all__ = ['Pool', 'Group', 'Bag', 'OrderedBag',
'Yielder', 'OrderedYielder', 'yielding', 'ordered_yielding']
__version__ = '0.3.7'
__version__ = '0.3.8'
30 changes: 22 additions & 8 deletions aioutils/yielder.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ def __init__(self, pool_size=None):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.sem = asyncio.Semaphore(pool_size) if pool_size else None
self._prepare()

def _prepare(self):
self.counter = 0
self.done = collections.deque()
self.getters = collections.deque()
self.tasks = []

def spawn(self, coro):
task = self._async_task(coro)
task.add_done_callback(self._on_completion)
self.counter += 1
self.tasks.append(task)
return task

def _async_task(self, coro):
Expand Down Expand Up @@ -83,15 +88,27 @@ def _yielding(self):
self.loop.run_forever()

def yielding(self):
for x in self._yielding():
if isinstance(x, asyncio.Future):
continue
yield x
try:
for x in self._yielding():
if isinstance(x, asyncio.Future):
continue
yield x
except GeneratorExit:
for task in self.tasks:
if not task.done():
task.cancel()
self.counter -= 1

self._prepare()


class OrderedYielder(Yielder):
def __init__(self, pool_size=None):
super(OrderedYielder, self).__init__(pool_size)
self._prepare()

def _prepare(self):
super(OrderedYielder, self)._prepare()
self.done = []
self.order = 0
self.yield_counter = 0
Expand All @@ -102,6 +119,7 @@ def spawn(self, coro):
task.add_done_callback(
functools.partial(self._on_completion, order=self.order))
self.counter += 1
self.tasks.append(task)
return task

def _on_completion(self, f, order):
Expand Down Expand Up @@ -141,10 +159,6 @@ def _yielding(self, heappop=heapq.heappop):
if not self.loop.is_running():
self.loop.run_forever()

self.done = []
self.order = 0
self.yield_counter = 0


class YieldingContext(object):
def __init__(self, pool_size=None, ordered=False):
Expand Down
21 changes: 21 additions & 0 deletions tests/test_yielder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def f(c):
yield from asyncio.sleep(random.random()*.1)
return c


def test_yielder():
def gen_func():
y = Yielder()
Expand Down Expand Up @@ -103,6 +104,7 @@ def gen_func():
# I don't know how to assert correctness, >.<
assert set(gen_func()) == set(chars)


def test_empty_yielder():
def gen_func():
with yielding() as y:
Expand All @@ -129,10 +131,29 @@ def g(i):
assert ''.join(list(gen_func())) == 'abcabcd'


def test_break_from_yielding():
@asyncio.coroutine
def g(c):
yield from asyncio.sleep(random.random()*.1)
return c
y = Yielder()

def gen_func():
for c in 'abcdefg':
y.spawn(g(c))
yield from y.yielding()

for x in gen_func():
break

assert y.counter == 0


if __name__ == '__main__':
test_yielder()
test_ordered_yielder()
test_yielding()
test_yielder_with_pool_size()
test_empty_yielder()
test_two_level_ordered_yielding()
test_break_from_yielding()

0 comments on commit 04e0642

Please sign in to comment.