Skip to content

Commit

Permalink
glossary_v2.py: refactor progress bar handling logic, also:
Browse files Browse the repository at this point in the history
- remove progressbar entry filter
- allow enabling queued/async iteration with PYGLOSSARY_ASYNC_ITER_SIZE env var
  • Loading branch information
ilius committed Jan 6, 2025
1 parent aef894a commit 72357e0
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 52 deletions.
1 change: 0 additions & 1 deletion doc/entry-filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@
| `remove_empty_dup_alt_words` | Yes | | Remove empty and duplicate alternate words |
| `prevent_duplicate_words` | No | | Prevent duplicate words |
| `strip_full_html` | No | | Replace a full HTML document with its body |
| `progressbar` | No | | Progress Bar |
| `max_memory_usage` | No | | Show Max Memory Usage |
39 changes: 0 additions & 39 deletions pyglossary/entry_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"PreventDuplicateWords",
"RemoveHtmlTagsAll",
"ShowMaxMemoryUsage",
"ShowProgressBar",
"StripFullHtml",
"entryFiltersRules",
]
Expand Down Expand Up @@ -459,43 +458,6 @@ def run(self, entry: EntryType) -> EntryType | None:
return entry


class ShowProgressBar(EntryFilter):
    """
    Entry filter that forwards progress updates to the glossary.

    Entries that expose a byte position report progress in bytes;
    all other entries are counted against ``len(glos)``.
    The entry itself is always passed through unchanged.
    """

    name = "progressbar"
    desc = "Progress Bar"

    def __init__(self, glos: _GlossaryType) -> None:
        EntryFilter.__init__(self, glos)
        # number of entries seen so far
        self._index = 0
        # last byte position that was reported
        self._lastPos = 0
        # -1 means "total entry count not looked up yet"
        self._wordCount = -1
        self._wordCountThreshold = 0

    def run(self, entry: EntryType | None) -> EntryType | None:
        """Report progress for `entry` when due, then return it as-is."""
        index = self._index
        self._index += 1

        byteProgress = None
        if entry is not None:
            byteProgress = entry.byteProgress()

        if byteProgress:
            # byte-based progress: report only after another 100 kB
            pos = byteProgress[0]
            if pos > self._lastPos + 100_000:
                self.glos.progress(pos, byteProgress[1], unit="bytes")
                self._lastPos = pos
            return entry

        if self._wordCount == -1:
            # lazily fetch the total and derive the reporting interval:
            # every 0.5% of entries, clamped to the range 1..500
            total = len(self.glos)
            self._wordCount = total
            self._wordCountThreshold = max(1, min(500, total // 200))

        if self._wordCount > 1 and index % self._wordCountThreshold == 0:
            self.glos.progress(index, self._wordCount)

        return entry


class ShowMaxMemoryUsage(EntryFilter):
name = "max_memory_usage"
desc = "Show Max Memory Usage"
Expand Down Expand Up @@ -548,6 +510,5 @@ def run(self, entry: EntryType) -> EntryType | None:
(None, False, StripFullHtml),
# -------------------------------------
# filters are added conditionally (other than with config or glossary methods):
(None, False, ShowProgressBar),
(None, False, ShowMaxMemoryUsage),
]
50 changes: 50 additions & 0 deletions pyglossary/glossary_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,19 @@
from .core import log

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from typing import Protocol

from pyglossary.glossary_types import EntryType

from .ui_type import UIType

class ReaderType(Protocol):
    """
    Structural type for the `reader` argument of `_progressIter`:
    any object that can be iterated for entries and supports `len()`.
    """

    def __iter__(self) -> Iterator[EntryType]: ...

    def __len__(self) -> int: ...


__all__ = ["GlossaryProgress"]


Expand Down Expand Up @@ -50,3 +61,42 @@ def progress(self, pos: int, total: int, unit: str = "entries") -> None:
def progressEnd(self) -> None:
if self._ui and self._progressbar:
self._ui.progressEnd()

def _byteProgressIter(
    self,
    iterable: Iterable[EntryType],
) -> Iterator[EntryType]:
    """
    Yield the non-None entries of `iterable`, reporting byte progress.

    After an entry has been consumed, its `byteProgress()` is checked
    and `self.progress(..., unit="bytes")` is called once the position
    has moved more than 100 000 past the last reported position.
    """
    reportedPos = 0
    for item in iterable:
        if item is None:
            continue
        yield item
        progress = item.byteProgress()
        if not progress:
            # this entry carries no byte position
            continue
        pos = progress[0]
        if pos > reportedPos + 100_000:
            self.progress(pos, progress[1], unit="bytes")
            reportedPos = pos

def _wordCountProgressIter(
    self,
    iterable: Iterable[EntryType],
    wordCount: int,
) -> Iterator[EntryType]:
    """
    Yield every entry of `iterable`, reporting entry-count progress.

    `self.progress(index, wordCount)` is called after the entry at
    `index` has been consumed, for every index that is a multiple of
    the reporting interval (0.5% of `wordCount`, clamped to 1..500).
    """
    interval = wordCount // 200
    if interval > 500:
        interval = 500
    if interval < 1:
        interval = 1

    position = 0
    for item in iterable:
        yield item
        if position % interval == 0:
            self.progress(position, wordCount)
        position += 1

def _progressIter(self, reader: ReaderType) -> Iterable[EntryType]:
    """
    Wrap `reader` in the matching progress-reporting iterator.

    Returns `reader` untouched when the progress bar is disabled.
    Otherwise entry-count progress is used when the reader does not set
    `useByteProgress` and reports a positive length; byte progress is
    used in every other case.
    """
    if not self.progressbar:
        return reader

    if not getattr(reader, "useByteProgress", False):
        wordCount = len(reader)
        if wordCount > 0:
            return self._wordCountProgressIter(reader, wordCount)

    return self._byteProgressIter(reader)
40 changes: 28 additions & 12 deletions pyglossary/glossary_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from typing import TYPE_CHECKING, cast
from uuid import uuid1

from pyglossary.queued_iter import QueuedIterator

from . import core
from .core import (
cacheDir,
Expand All @@ -45,7 +47,6 @@
PreventDuplicateWords,
RemoveHtmlTagsAll,
ShowMaxMemoryUsage,
ShowProgressBar,
StripFullHtml,
entryFiltersRules,
)
Expand All @@ -65,7 +66,7 @@
from .sq_entry_list import SqEntryList

if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from collections.abc import Callable, Iterable, Iterator
from typing import (
Any,
)
Expand Down Expand Up @@ -304,9 +305,6 @@ def updateEntryFilters(self) -> None:
args = [value]
entryFilters.append(filterClass(glosArg, *tuple(args)))

if self.progressbar:
entryFilters.append(ShowProgressBar(glosArg))

if log.level <= core.TRACE:
try:
import psutil # noqa: F401
Expand Down Expand Up @@ -416,12 +414,17 @@ def _loadedEntryGen(self) -> Iterator[EntryType]:
yield from self._data
return

iterable = self._progressIter(self._data)

filters = self._entryFiltersExtra
if self.progressbar:
filters.append(ShowProgressBar(self)) # pyright: ignore[reportArgumentType]
if not filters:
self.progressInit("Writing")
yield from iterable
self.progressEnd()
return

self.progressInit("Writing")
for _entry in self._data:
for _entry in iterable:
entry = _entry
for f in filters:
entry = f.run(entry) # type: ignore # pyright: ignore[reportArgumentType]
Expand All @@ -432,8 +435,18 @@ def _loadedEntryGen(self) -> Iterator[EntryType]:
def _readersEntryGen(self) -> Iterator[EntryType]:
for reader in self._readers:
self.progressInit("Converting")

iterator = self._progressIter(reader)

iterator = self._applyEntryFiltersGen(iterator)

# turn iterator into background-queued, like buffered channel in Go
queueSize = os.getenv("PYGLOSSARY_ASYNC_ITER_SIZE")
if queueSize:
iterator = QueuedIterator(iterator, int(queueSize))

try:
yield from self._applyEntryFiltersGen(reader)
yield from iterator
finally:
reader.close()
self.progressEnd()
Expand All @@ -444,10 +457,11 @@ def _readersEntryGen(self) -> Iterator[EntryType]:
# no point of returning None entries anymore.
def _applyEntryFiltersGen(
self,
gen: Iterator[EntryType],
iterable: Iterable[EntryType],
) -> Iterator[EntryType]:
entry: EntryType | None
for entry in gen:

for entry in iterable:
if entry is None:
continue
for entryFilter in self._entryFilters:
Expand Down Expand Up @@ -795,8 +809,10 @@ def loadReader(self, reader: Any) -> None: # noqa: ANN401
showMemoryUsage()

self.progressInit("Reading")
iterator = self._progressIter(reader)
iterator = self._applyEntryFiltersGen(iterator)
try:
for entry in self._applyEntryFiltersGen(reader):
for entry in iterator:
self.addEntry(entry)
finally:
reader.close()
Expand Down
34 changes: 34 additions & 0 deletions pyglossary/queued_iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import queue
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Iterator


class QueuedIterator:
    """
    Iterate over `iterator` through a bounded queue filled by a thread.

    A background thread pulls items from `iterator` and puts them on a
    queue of at most `max_size` items (0 or less means unbounded, as with
    `queue.Queue`); the consumer takes them off with `next()`.

    An exception raised by `iterator` is re-raised in the consumer on the
    `next()` call that reaches it, after all earlier items were delivered.
    Once the iterator has finished (normally or with an error), every
    further `next()` call raises `StopIteration` without blocking.
    """

    class _Failure:
        """Queue marker that carries an exception raised by the producer."""

        __slots__ = ("error",)

        def __init__(self, error: BaseException) -> None:
            self.error = error

    # Private end-of-stream marker, so no value coming from the wrapped
    # iterator can be mistaken for the terminator.
    _DONE = object()

    def __init__(
        self,
        iterator: Iterator,
        max_size: int,
    ) -> None:
        self.iterator = iterator
        self.queue = queue.Queue(max_size)
        self._finished = False
        # daemon: if the consumer stops early, the producer stays blocked
        # on a full queue; it must not keep the interpreter from exiting.
        self.thread = threading.Thread(
            target=self._background_job,
            daemon=True,
        )
        self.thread.start()

    def _background_job(self) -> None:
        """Producer loop; always ends by queuing a terminal marker."""
        try:
            for item in self.iterator:
                self.queue.put(item)
        except Exception as error:  # noqa: BLE001 -- forwarded to consumer
            # Without this the consumer would wait on the queue forever.
            self.queue.put(self._Failure(error))
        else:
            self.queue.put(self._DONE)

    def __iter__(self) -> Iterator:
        return self

    def __next__(self) -> Any:
        if self._finished:
            # The terminal marker was already consumed; the queue will
            # never receive anything again, so do not block on it.
            raise StopIteration
        item = self.queue.get()
        if item is self._DONE:
            self._finished = True
            raise StopIteration
        if isinstance(item, self._Failure):
            self._finished = True
            raise item.error
        return item

0 comments on commit 72357e0

Please sign in to comment.