Skip to content

Commit

Permalink
Make clean return non-zero exit status when a filter fails
Browse files Browse the repository at this point in the history
Fixes #106.
  • Loading branch information
jelmervdl committed Aug 21, 2023
1 parent e59216f commit ab02f0c
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 6 deletions.
21 changes: 20 additions & 1 deletion opuscleaner/_util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
from typing import TypeVar, Optional

from threading import Thread

T = TypeVar("T")

def none_throws(optional: Optional[T], message: str = "Unexpected `None`") -> T:
if optional is None:
raise AssertionError(message)
return optional


class RaisingThread(Thread):
"""Thread that will raise any uncaught exceptions in the thread in the
parent once it joins again."""

exception: Optional[Exception]

def run(self):
self.exception = None
try:
super().run()
except Exception as exc:
self.exception = exc

def join(self, timeout:float=None):
super().join(timeout=timeout)
if self.exception is not None:
raise self.exception
10 changes: 5 additions & 5 deletions opuscleaner/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from opuscleaner.config import COL_PY, FILTER_PATH
from opuscleaner.filters import list_filters, set_global_filters, filter_format_command, Filter, FilterStep, FilterPipeline
from opuscleaner._util import none_throws
from opuscleaner._util import none_throws, RaisingThread


# Queue for printing lines to stdout or stderr. None means end of input.
Expand Down Expand Up @@ -399,7 +399,7 @@ def run_parallel(pipeline:Pipeline, stdin:BinaryIO, stdout:BinaryIO, *, parallel
# Read `batch_queue` for batch filenames, and process them. Put output files
# on `merge_queue`.
runners = [
Thread(target=run_pipeline, args=[print_queue, batch_queue, merge_queue, pipeline, f'{n}/'])
RaisingThread(target=run_pipeline, args=[print_queue, batch_queue, merge_queue, pipeline, f'{n}/'])
for n in range(parallel)
]

Expand All @@ -416,13 +416,13 @@ def run_parallel(pipeline:Pipeline, stdin:BinaryIO, stdout:BinaryIO, *, parallel
# using a blocking size-limited Queue() to control the splitter's progress
# but a SimpleQueue() that sends messages.

print_queue.put(f'[run.py] Waiting for splitter to finish\n'.encode())
splitter.join()

print_queue.put(f'[run.py] Waiting for pipelines to finish\n'.encode())
for runner in runners:
runner.join()

print_queue.put(f'[run.py] Waiting for splitter to finish\n'.encode())
splitter.join()

print_queue.put(f'[run.py] Waiting for merger to finish\n'.encode())
merger.join()

Expand Down
Binary file not shown.
Binary file not shown.
11 changes: 11 additions & 0 deletions test/deeper/filters/fail.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"description": "Crashes",
"type": "monolingual",
"command": "python3 -c 'import sys\nsys.exit(int(sys.argv[1]))' $EXITCODE",
"parameters": {
"EXITCODE": {
"type": "int",
"default": 1
}
}
}
79 changes: 79 additions & 0 deletions test/test_clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import sys
import os
import unittest
import subprocess
import json
from typing import List
from pathlib import Path
from tempfile import NamedTemporaryFile


TEST_CWD = Path(os.path.join(os.path.dirname(__file__), 'deeper'))

FILES = [
"bible-uedin-v1.de-en.de.gz",
"bible-uedin-v1.de-en.en.gz"
]


class TestClean(unittest.TestCase):
def _run(self, args:List[str]):
proc = subprocess.Popen(
args=[sys.executable, '-m', 'opuscleaner.clean'] + args,
cwd=TEST_CWD, # so it can find filters
env={
'PYTHONPATH': os.path.join(os.path.dirname(__file__), '..') # so it can find opuscleaner code
},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

out, err = proc.communicate()
proc.wait()
return out, err, proc.returncode

def test_simple(self):
"""Test that clean runs"""
config = {
"version": 1,
"files": FILES,
"filters": [
{
"filter": "deescape_tsv",
"parameters": {},
"language": None
}
]
}
with NamedTemporaryFile(mode='w', dir=TEST_CWD / 'data/train-parts') as fh:
json.dump(config, fh)
fh.flush()
for mode in [[], ['--parallel', '1']]:
with self.subTest(mode=mode):
out, err, retval = self._run([*mode, fh.name])
self.assertEqual(out.count(b'\n'), 62195)
self.assertEqual(retval, 0)

def test_filter_fail(self):
"""Test that clean returns a non-zero exit code if a filter fails"""
config = {
"version": 1,
"files": FILES,
"filters": [
{
"filter": "fail",
"parameters": {
"EXITCODE": "42"
},
"language": "de"
}
]
}
with NamedTemporaryFile(mode='w', dir=TEST_CWD / 'data/train-parts') as fh:
json.dump(config, fh)
fh.flush()

for mode in [[], ['--parallel', '1']]:
with self.subTest(mode=mode):
out, err, retval = self._run([*mode, fh.name])
self.assertEqual(out.count(b'\n'), 0)
self.assertNotEqual(retval, 0)

0 comments on commit ab02f0c

Please sign in to comment.