diff --git a/doc/entry-filters.md b/doc/entry-filters.md
index eee7b2d1f..846002bc8 100644
--- a/doc/entry-filters.md
+++ b/doc/entry-filters.md
@@ -20,5 +20,4 @@
 | `remove_empty_dup_alt_words` | Yes | | Remove empty and duplicate alternate words |
 | `prevent_duplicate_words` | No | | Prevent duplicate words |
 | `strip_full_html` | No | | Replace a full HTML document with it's body |
-| `progressbar` | No | | Progress Bar |
 | `max_memory_usage` | No | | Show Max Memory Usage |
diff --git a/pyglossary/entry_filters.py b/pyglossary/entry_filters.py
index dbf149a46..886ea2852 100644
--- a/pyglossary/entry_filters.py
+++ b/pyglossary/entry_filters.py
@@ -22,7 +22,6 @@
 	"PreventDuplicateWords",
 	"RemoveHtmlTagsAll",
 	"ShowMaxMemoryUsage",
-	"ShowProgressBar",
 	"StripFullHtml",
 	"entryFiltersRules",
 ]
@@ -459,43 +458,6 @@ def run(self, entry: EntryType) -> EntryType | None:
 		return entry
 
 
-class ShowProgressBar(EntryFilter):
-	name = "progressbar"
-	desc = "Progress Bar"
-
-	def __init__(self, glos: _GlossaryType) -> None:
-		EntryFilter.__init__(self, glos)
-		self._wordCount = -1
-		self._wordCountThreshold = 0
-		self._lastPos = 0
-		self._index = 0
-
-	def run(self, entry: EntryType) -> EntryType | None:
-		index = self._index
-		self._index = index + 1
-
-		if entry is not None and (bp := entry.byteProgress()):
-			if bp[0] > self._lastPos + 100_000:
-				self.glos.progress(bp[0], bp[1], unit="bytes")
-				self._lastPos = bp[0]
-			return entry
-
-		if self._wordCount == -1:
-			self._wordCount = len(self.glos)
-			self._wordCountThreshold = max(
-				1,
-				min(
-					500,
-					self._wordCount // 200,
-				),
-			)
-
-		if self._wordCount > 1 and index % self._wordCountThreshold == 0:
-			self.glos.progress(index, self._wordCount)
-
-		return entry
-
-
 class ShowMaxMemoryUsage(EntryFilter):
 	name = "max_memory_usage"
 	desc = "Show Max Memory Usage"
@@ -548,6 +510,5 @@ def run(self, entry: EntryType) -> EntryType | None:
 	(None, False, StripFullHtml),
 	# -------------------------------------
 	# filters are added conditionally (other than with config or glossary methods):
-	(None, False, ShowProgressBar),
 	(None, False, ShowMaxMemoryUsage),
 ]
diff --git a/pyglossary/glossary_progress.py b/pyglossary/glossary_progress.py
index b5513c056..dc748bcb4 100644
--- a/pyglossary/glossary_progress.py
+++ b/pyglossary/glossary_progress.py
@@ -5,8 +5,19 @@
 from .core import log
 
 if TYPE_CHECKING:
+	from collections.abc import Iterable, Iterator
+	from typing import Protocol
+
+	from pyglossary.glossary_types import EntryType
+
 	from .ui_type import UIType
 
+	class ReaderType(Protocol):
+		def __iter__(self) -> Iterator[EntryType]: ...
+
+		def __len__(self) -> int: ...
+
+
 __all__ = ["GlossaryProgress"]
 
 
@@ -50,3 +61,42 @@ def progress(self, pos: int, total: int, unit: str = "entries") -> None:
 	def progressEnd(self) -> None:
 		if self._ui and self._progressbar:
 			self._ui.progressEnd()
+
+	def _byteProgressIter(
+		self,
+		iterable: Iterable[EntryType],
+	) -> Iterator[EntryType]:
+		lastPos = 0
+		for entry in iterable:
+			if entry is None:
+				continue
+			yield entry
+			if (bp := entry.byteProgress()) and bp[0] > lastPos + 100_000:
+				self.progress(bp[0], bp[1], unit="bytes")
+				lastPos = bp[0]
+
+	def _wordCountProgressIter(
+		self,
+		iterable: Iterable[EntryType],
+		wordCount: int,
+	) -> Iterator[EntryType]:
+		wordCountThreshold = max(
+			1,
+			min(
+				500,
+				wordCount // 200,
+			),
+		)
+		for index, entry in enumerate(iterable):
+			yield entry
+			if index % wordCountThreshold == 0:
+				self.progress(index, wordCount)
+
+	def _progressIter(self, reader: ReaderType) -> Iterable[EntryType]:
+		if not self.progressbar:
+			return reader
+		if getattr(reader, "useByteProgress", False):
+			return self._byteProgressIter(reader)
+		if (wordCount := len(reader)) > 0:
+			return self._wordCountProgressIter(reader, wordCount)
+		return self._byteProgressIter(reader)
diff --git a/pyglossary/glossary_v2.py b/pyglossary/glossary_v2.py
index 090d88098..f526e538e 100644
--- a/pyglossary/glossary_v2.py
+++ b/pyglossary/glossary_v2.py
@@ -34,6 +34,8 @@
 from typing import TYPE_CHECKING, cast
 from uuid import uuid1
 
+from pyglossary.queued_iter import QueuedIterator
+
 from . import core
 from .core import (
 	cacheDir,
@@ -45,7 +47,6 @@
 	PreventDuplicateWords,
 	RemoveHtmlTagsAll,
 	ShowMaxMemoryUsage,
-	ShowProgressBar,
 	StripFullHtml,
 	entryFiltersRules,
 )
@@ -65,7 +66,7 @@
 from .sq_entry_list import SqEntryList
 
 if TYPE_CHECKING:
-	from collections.abc import Callable, Iterator
+	from collections.abc import Callable, Iterable, Iterator
 	from typing import (
 		Any,
 	)
@@ -304,9 +305,6 @@ def updateEntryFilters(self) -> None:
 				args = [value]
 			entryFilters.append(filterClass(glosArg, *tuple(args)))
 
-		if self.progressbar:
-			entryFilters.append(ShowProgressBar(glosArg))
-
 		if log.level <= core.TRACE:
 			try:
 				import psutil  # noqa: F401
@@ -416,12 +414,17 @@ def _loadedEntryGen(self) -> Iterator[EntryType]:
 			yield from self._data
 			return
 
+		iterable = self._progressIter(self._data)
+
 		filters = self._entryFiltersExtra
-		if self.progressbar:
-			filters.append(ShowProgressBar(self))  # pyright: ignore[reportArgumentType]
+		if not filters:
+			self.progressInit("Writing")
+			yield from iterable
+			self.progressEnd()
+			return
 
 		self.progressInit("Writing")
-		for _entry in self._data:
+		for _entry in iterable:
 			entry = _entry
 			for f in filters:
 				entry = f.run(entry)  # type: ignore # pyright: ignore[reportArgumentType]
@@ -432,8 +435,18 @@ def _loadedEntryGen(self) -> Iterator[EntryType]:
 	def _readersEntryGen(self) -> Iterator[EntryType]:
 		for reader in self._readers:
 			self.progressInit("Converting")
+
+			iterator = self._progressIter(reader)
+
+			iterator = self._applyEntryFiltersGen(iterator)
+
+			# turn iterator into background-queued, like buffered channel in Go
+			queueSize = os.getenv("PYGLOSSARY_ASYNC_ITER_SIZE")
+			if queueSize:
+				iterator = QueuedIterator(iterator, int(queueSize))
+
 			try:
-				yield from self._applyEntryFiltersGen(reader)
+				yield from iterator
 			finally:
 				reader.close()
 			self.progressEnd()
@@ -444,10 +457,11 @@ def _readersEntryGen(self) -> Iterator[EntryType]:
 	# no point of returning None entries anymore.
 	def _applyEntryFiltersGen(
 		self,
-		gen: Iterator[EntryType],
+		iterable: Iterable[EntryType],
 	) -> Iterator[EntryType]:
 		entry: EntryType | None
-		for entry in gen:
+
+		for entry in iterable:
 			if entry is None:
 				continue
 			for entryFilter in self._entryFilters:
@@ -795,8 +809,10 @@ def loadReader(self, reader: Any) -> None:  # noqa: ANN401
 		showMemoryUsage()
 
 		self.progressInit("Reading")
+		iterator = self._progressIter(reader)
+		iterator = self._applyEntryFiltersGen(iterator)
 		try:
-			for entry in self._applyEntryFiltersGen(reader):
+			for entry in iterator:
 				self.addEntry(entry)
 		finally:
 			reader.close()
diff --git a/pyglossary/queued_iter.py b/pyglossary/queued_iter.py
new file mode 100644
index 000000000..4fc641b03
--- /dev/null
+++ b/pyglossary/queued_iter.py
@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import queue
+import threading
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+	from collections.abc import Iterator
+
+# Private end-of-stream marker: unlike the StopIteration class, it can never
+# be confused with an item produced by the wrapped iterator.
+_END = object()
+
+
+class QueuedIterator:
+	"""
+	Consume `iterator` in a background thread through a bounded queue.
+
+	Items are produced ahead of the consumer (at most `max_size` of them,
+	unbounded if `max_size <= 0`), like a buffered channel in Go.
+	An exception raised by `iterator` is re-raised in the consumer thread.
+	"""
+
+	def __init__(
+		self,
+		iterator: Iterator,
+		max_size: int,
+	) -> None:
+		self.iterator = iterator
+		self.queue = queue.Queue(max_size)
+		self._error: BaseException | None = None
+		self._finished = False
+		# daemon: a consumer that stops early must not block interpreter exit
+		# while the producer waits on a full queue
+		self.thread = threading.Thread(
+			target=self._background_job,
+			daemon=True,
+		)
+		self.thread.start()
+
+	def _background_job(self) -> None:
+		try:
+			for item in self.iterator:
+				self.queue.put(item)
+		except BaseException as e:  # noqa: BLE001
+			# handed over to the consumer, which re-raises it in __next__
+			self._error = e
+		finally:
+			# always signal the end, otherwise the consumer blocks forever
+			self.queue.put(_END)
+
+	def __iter__(self) -> Iterator:
+		return self
+
+	def __next__(self) -> Any:
+		if self._finished:
+			raise StopIteration
+		item = self.queue.get()
+		if item is not _END:
+			return item
+		self._finished = True
+		if self._error is not None:
+			error, self._error = self._error, None
+			raise error
+		raise StopIteration