Skip to content

Commit

Permalink
glossary_v2.py: refactor progress bar handling logic
Browse files Browse the repository at this point in the history
allow enabling queued/async iterator with PYGLOSSARY_ASYNC_ITER_SIZE env
  • Loading branch information
ilius committed Jan 6, 2025
1 parent aef894a commit 5901c7b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 7 deletions.
52 changes: 52 additions & 0 deletions pyglossary/glossary_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,21 @@
from .core import log

if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Protocol

from pyglossary.glossary_types import EntryType

from .ui_type import UIType

class ReaderType(Protocol):
def __iter__(self) -> Iterator[EntryType]: ...

def __next__(self) -> EntryType: ...

def __len__(self) -> int: ...


__all__ = ["GlossaryProgress"]


Expand Down Expand Up @@ -50,3 +63,42 @@ def progress(self, pos: int, total: int, unit: str = "entries") -> None:
def progressEnd(self) -> None:
if self._ui and self._progressbar:
self._ui.progressEnd()

def _byteProgressIter(
self,
iterator: Iterator[EntryType],
) -> Iterator[EntryType]:
lastPos = 0
for entry in iterator:
if entry is None:
continue
yield entry
if (bp := entry.byteProgress()) and bp[0] > lastPos + 100_000:
self.progress(bp[0], bp[1], unit="bytes")
lastPos = bp[0]

def _wordCountProgressIter(
self,
iterator: Iterator[EntryType],
wordCount: int,
) -> Iterator[EntryType]:
wordCountThreshold = max(
1,
min(
500,
wordCount // 200,
),
)
for index, entry in enumerate(iterator):
yield entry
if index % wordCountThreshold == 0:
self.progress(index, wordCount)

def _progressIter(self, reader: ReaderType) -> Iterator[EntryType]:
if not self.progressbar:
return reader
if getattr(reader, "useByteProgress", False):
return self._byteProgressIter(reader)
if (wordCount := len(reader)) > 0:
return self._wordCountProgressIter(reader, wordCount)
return self._byteProgressIter(reader)
26 changes: 19 additions & 7 deletions pyglossary/glossary_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from typing import TYPE_CHECKING, cast
from uuid import uuid1

from pyglossary.queued_iter import QueuedIterator

from . import core
from .core import (
cacheDir,
Expand Down Expand Up @@ -304,9 +306,6 @@ def updateEntryFilters(self) -> None:
args = [value]
entryFilters.append(filterClass(glosArg, *tuple(args)))

if self.progressbar:
entryFilters.append(ShowProgressBar(glosArg))

if log.level <= core.TRACE:
try:
import psutil # noqa: F401
Expand Down Expand Up @@ -432,8 +431,18 @@ def _loadedEntryGen(self) -> Iterator[EntryType]:
def _readersEntryGen(self) -> Iterator[EntryType]:
for reader in self._readers:
self.progressInit("Converting")

iterator = self._progressIter(reader)

iterator = self._applyEntryFiltersGen(iterator)

# turn iterator into background-queued, like buffered channel in Go
queueSize = os.getenv("PYGLOSSARY_ASYNC_ITER_SIZE")
if queueSize:
iterator = QueuedIterator(iterator, int(queueSize))

try:
yield from self._applyEntryFiltersGen(reader)
yield from iterator
finally:
reader.close()
self.progressEnd()
Expand All @@ -444,10 +453,11 @@ def _readersEntryGen(self) -> Iterator[EntryType]:
# no point of returning None entries anymore.
def _applyEntryFiltersGen(
self,
gen: Iterator[EntryType],
iterator: Iterator[EntryType],
) -> Iterator[EntryType]:
entry: EntryType | None
for entry in gen:

for entry in iterator:
if entry is None:
continue
for entryFilter in self._entryFilters:
Expand Down Expand Up @@ -795,8 +805,10 @@ def loadReader(self, reader: Any) -> None: # noqa: ANN401
showMemoryUsage()

self.progressInit("Reading")
iterator = self._progressIter(reader)
iterator = self._applyEntryFiltersGen(iterator)
try:
for entry in self._applyEntryFiltersGen(reader):
for entry in iterator:
self.addEntry(entry)
finally:
reader.close()
Expand Down
34 changes: 34 additions & 0 deletions pyglossary/queued_iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import queue
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Iterator


class QueuedIterator:
def __init__(
self,
iterator: Iterator,
max_size: int,
) -> None:
self.iterator = iterator
self.queue = queue.Queue(max_size)
self.thread = threading.Thread(target=self._background_job)
self.thread.start()

def _background_job(self) -> None:
for item in self.iterator:
self.queue.put(item)
self.queue.put(StopIteration)

def __iter__(self) -> Iterator:
return self

def __next__(self) -> Any:
item = self.queue.get()
if item is StopIteration:
raise StopIteration
return item

0 comments on commit 5901c7b

Please sign in to comment.