Skip to content

Commit

Permalink
allow enabling queued/async iterator with PYGLOSSARY_ASYNC_ITER_SIZE env
Browse files Browse the repository at this point in the history
  • Loading branch information
ilius committed Jan 6, 2025
1 parent d8e9b3a commit 02e8a82
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pyglossary/glossary_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from typing import TYPE_CHECKING, cast
from uuid import uuid1

from pyglossary.queued_iter import QueuedIterator

from . import core
from .core import (
cacheDir,
Expand Down Expand Up @@ -440,6 +442,11 @@ def _readersEntryGen(self) -> Iterator[EntryType]:

iterator = self._applyEntryFiltersGen(iterator)

# turn iterator into background-queued, like buffered channel in Go
queueSize = os.getenv("PYGLOSSARY_ASYNC_ITER_SIZE")
if queueSize:
iterator = QueuedIterator(iterator, int(queueSize))

try:
yield from iterator
finally:
Expand Down
34 changes: 34 additions & 0 deletions pyglossary/queued_iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import queue
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Iterator


class QueuedIterator:
def __init__(
self,
iterator: Iterator,
max_size: int,
) -> None:
self.iterator = iterator
self.queue = queue.Queue(max_size)
self.thread = threading.Thread(target=self._background_job)
self.thread.start()

def _background_job(self) -> None:
for item in self.iterator:
self.queue.put(item)
self.queue.put(StopIteration)

def __iter__(self) -> Iterator:
return self

def __next__(self) -> Any:
item = self.queue.get()
if item is StopIteration:
raise StopIteration
return item

0 comments on commit 02e8a82

Please sign in to comment.