diff --git a/Dockerfile b/Dockerfile
index 07e999d..8000089 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,8 +1,16 @@
 FROM python:3.7

-# Installing ffmpeg is needed for working with timelapses - this can be ommitted otherwise
+# Installing ffmpeg is needed for working with timelapses - can be omitted otherwise
 RUN apt-get update && apt-get -y install --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/*

+# IPFS installation for LAN filesharing
+RUN wget https://dist.ipfs.tech/kubo/v0.15.0/kubo_v0.15.0_linux-amd64.tar.gz \
+    && tar -xvzf kubo_v0.15.0_linux-amd64.tar.gz \
+    && cd kubo \
+    && bash -c ". ./install.sh" \
+    && ipfs --version
+
+
 RUN adduser oprint
 USER oprint
diff --git a/continuousprint/__init__.py b/continuousprint/__init__.py
index 073a225..03b63f8 100644
--- a/continuousprint/__init__.py
+++ b/continuousprint/__init__.py
@@ -8,6 +8,7 @@
     PRINTER_PROFILES,
     GCODE_SCRIPTS,
     Keys,
+    CustomEvents,
     ASSETS,
     TEMPLATES,
     update_info,
@@ -58,6 +59,7 @@ def on_startup(self, host=None, port=None):
             self._logger,
             self._identifier,
             self._basefolder,
+            self._event_bus.fire,
         )

     def on_after_startup(self):
@@ -73,6 +75,9 @@ def on_after_startup(self):

     # ------------------------ Begin EventHandlerPlugin --------------------

+    def register_custom_events(*args, **kwargs):
+        return list(CustomEvents.__members__.values())
+
     def on_event(self, event, payload):
         if not hasattr(self, "_plugin"):
             return
diff --git a/continuousprint/api.py b/continuousprint/api.py
index f045cf4..055d284 100644
--- a/continuousprint/api.py
+++ b/continuousprint/api.py
@@ -2,6 +2,7 @@
 from enum import Enum
 from octoprint.access.permissions import Permissions, ADMIN_GROUP
 from octoprint.server.util.flask import restricted_access
+from .queues.lan import ValidationError
 import flask
 import json
 from .storage import queries
@@ -174,50 +175,35 @@ def add_job(self):
             self._get_queue(DEFAULT_QUEUE).add_job(data.get("name")).as_dict()
         )

-    # PRIVATE API METHOD - may change without warning.
-    @octoprint.plugin.BlueprintPlugin.route("/set/mv", methods=["POST"])
-    @restricted_access
-    @cpq_permission(Permission.EDITJOB)
-    def mv_set(self):
-        self._get_queue(DEFAULT_QUEUE).mv_set(
-            int(flask.request.form["id"]),
-            int(
-                flask.request.form["after_id"]
-            ),  # Move to after this set (-1 for beginning of job)
-            int(
-                flask.request.form["dest_job"]
-            ),  # Move to this job (null for new job at end)
-        )
-        return json.dumps("ok")
-
     # PRIVATE API METHOD - may change without warning.
     @octoprint.plugin.BlueprintPlugin.route("/job/mv", methods=["POST"])
     @restricted_access
     @cpq_permission(Permission.EDITJOB)
     def mv_job(self):
-        self._get_queue(DEFAULT_QUEUE).mv_job(
-            int(flask.request.form["id"]),
-            int(
-                flask.request.form["after_id"]
-            ),  # Move to after this job (-1 for beginning of queue)
-        )
-        return json.dumps("ok")
+        src_id = flask.request.form["id"]
+        after_id = flask.request.form["after_id"]
+        if after_id == "":  # Treat empty string as 'none', i.e. front of queue
+            after_id = None
+        sq = self._get_queue(flask.request.form["src_queue"])
+        dq = self._get_queue(flask.request.form.get("dest_queue"))
+
+        # Transfer into dest queue first
+        if dq != sq:
+            try:
+                new_id = dq.import_job_from_view(sq.get_job_view(src_id))
+            except ValidationError as e:
+                return json.dumps(dict(error=str(e)))

-    # PRIVATE API METHOD - may change without warning.
-    @octoprint.plugin.BlueprintPlugin.route("/job/submit", methods=["POST"])
-    @restricted_access
-    @cpq_permission(Permission.ADDJOB)
-    def submit_job(self):
-        j = queries.getJob(int(flask.request.form["id"]))
-        # Submit to the queue and remove from its origin
-        err = self._get_queue(flask.request.form["queue"]).submit_job(j)
-        if err is None:
-            self._logger.debug(
-                self._get_queue(DEFAULT_QUEUE).remove_jobs(job_ids=[j.id])
-            )
-            return self._state_json()
-        else:
-            return json.dumps(dict(error=str(err)))
+            self._logger.debug("Imported job from view")
+            sq.remove_jobs([src_id])
+            src_id = new_id
+
+        # Finally, move the job
+        dq.mv_job(src_id, after_id)
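+        # Illustrative example (hypothetical values, field names as above):
+        # POSTing id=5, after_id="" (i.e. front), src_queue="LAN",
+        # dest_queue="local" imports LAN job 5 into the local queue, removes
+        # it from the LAN queue, then moves it to the front of "local".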
- @octoprint.plugin.BlueprintPlugin.route("/job/submit", methods=["POST"]) - @restricted_access - @cpq_permission(Permission.ADDJOB) - def submit_job(self): - j = queries.getJob(int(flask.request.form["id"])) - # Submit to the queue and remove from its origin - err = self._get_queue(flask.request.form["queue"]).submit_job(j) - if err is None: - self._logger.debug( - self._get_queue(DEFAULT_QUEUE).remove_jobs(job_ids=[j.id]) - ) - return self._state_json() - else: - return json.dumps(dict(error=str(err))) + print("Imported job from view") + sq.remove_jobs([src_id]) + src_id = new_id + + # Finally, move the job + dq.mv_job(src_id, after_id) + return json.dumps("OK") # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/edit", methods=["POST"]) @@ -225,7 +208,8 @@ def submit_job(self): @cpq_permission(Permission.EDITJOB) def edit_job(self): data = json.loads(flask.request.form.get("json")) - return json.dumps(self._get_queue(DEFAULT_QUEUE).edit_job(data["id"], data)) + q = self._get_queue(data["queue"]) + return json.dumps(q.edit_job(data["id"], data)) # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/import", methods=["POST"]) @@ -270,17 +254,6 @@ def rm_job(self): ) ) - # PRIVATE API METHOD - may change without warning. - @octoprint.plugin.BlueprintPlugin.route("/set/rm", methods=["POST"]) - @restricted_access - @cpq_permission(Permission.EDITJOB) - def rm_set(self): - return json.dumps( - self._get_queue(DEFAULT_QUEUE).rm_multi( - set_ids=flask.request.form.getlist("set_ids[]") - ) - ) - # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/reset", methods=["POST"]) @restricted_access diff --git a/continuousprint/data/__init__.py b/continuousprint/data/__init__.py index 639d7b8..b468e11 100644 --- a/continuousprint/data/__init__.py +++ b/continuousprint/data/__init__.py @@ -12,6 +12,14 @@ GCODE_SCRIPTS = dict((d["name"], d) for d in yaml.safe_load(f.read())["GScript"]) +class CustomEvents(Enum): + START_PRINT = "continuousprint_start_print" + COOLDOWN = "continuousprint_cooldown" + CLEAR_BED = "continuousprint_clear_bed" + FINISH = "continuousprint_finish" + CANCEL = "continuousprint_cancel" + + class Keys(Enum): # TODO migrate old setting names to enum names QUEUE = ("cp_queue", None) diff --git a/continuousprint/driver.py b/continuousprint/driver.py index 9bec01b..fa0a0a8 100644 --- a/continuousprint/driver.py +++ b/continuousprint/driver.py @@ -52,7 +52,9 @@ def __init__( self._set_status("Initializing") self.q = queue self.state = self._state_unknown - self.idle_start_ts = None + self.last_printer_state = None + self.printer_state_ts = 0 + self.printer_state_logs_suppressed = False self.retries = 0 self.retry_on_pause = False self.max_retries = 0 @@ -78,14 +80,21 @@ def action( # Given that some calls to action() come from a watchdog timer, we hold a mutex when performing the action # so the state is updated in a thread safe way. 
with self.mutex: - self._logger.debug( - f"{a.name}, {p.name}, path={path}, materials={materials}, bed_temp={bed_temp}" - ) + now = time.time() + if self.printer_state_ts + 15 > now or a != Action.TICK: + self._logger.debug( + f"{a.name}, {p.name}, path={path}, materials={materials}, bed_temp={bed_temp}" + ) + elif a == Action.TICK and not self.printer_state_logs_suppressed: + self.printer_state_logs_suppressed = True + self._logger.debug( + f"suppressing further debug logs for action=TICK, printer state={p.name}" + ) - if p == Printer.IDLE and self.idle_start_ts is None: - self.idle_start_ts = time.time() - elif p != Printer.IDLE and self.idle_start_ts is not None: - self.idle_start_ts = None + if p != self.last_printer_state: + self.printer_state_ts = now + self.last_printer_state = p + self.printer_state_logs_suppressed = False if path is not None: self._cur_path = path @@ -127,7 +136,9 @@ def _state_inactive(self, a: Action, p: Printer): if p == Printer.IDLE: self._set_status("Inactive (click Start Managing)") else: - self._set_status("Inactive (active print continues unmanaged)") + self._set_status( + "Inactive (active print continues unmanaged)", StatusType.NEEDS_ACTION + ) def _state_idle(self, a: Action, p: Printer): self.q.release() @@ -149,6 +160,15 @@ def _enter_start_print(self, a: Action, p: Printer): nxt = self._state_start_print(a, p) return nxt if nxt is not None else self._state_start_print + def _fmt_material_key(self, mk): + try: + s = mk.split("_") + return f"{s[0]} ({s[1]})" + except IndexError: + return mk + except AttributeError: + return mk + def _state_start_print(self, a: Action, p: Printer): if p != Printer.IDLE: self._set_status("Waiting for printer to be ready") @@ -166,7 +186,7 @@ def _state_start_print(self, a: Action, p: Printer): cur = self._cur_materials[i] if i < len(self._cur_materials) else None if im != cur: self._set_status( - f"Waiting for spool {im} in tool {i} (currently: {cur})", + f"Need {self._fmt_material_key(im)} in tool {i}, but {self._fmt_material_key(cur)} is loaded", StatusType.NEEDS_ACTION, ) return @@ -186,6 +206,14 @@ def _state_start_print(self, a: Action, p: Printer): StatusType.ERROR, ) + def _long_idle(self, p): + # We wait until we're in idle state for a long-ish period before acting, as + # IDLE can be returned as a state before another event-based action (e.g. SUCCESS) + return ( + p == Printer.IDLE + and time.time() - self.printer_state_ts > self.PRINTING_IDLE_BREAKOUT_SEC + ) + def _state_printing(self, a: Action, p: Printer, elapsed=None): if a == Action.FAILURE: return self._state_failure @@ -200,10 +228,7 @@ def _state_printing(self, a: Action, p: Printer, elapsed=None): StatusType.NEEDS_ACTION, ) return self._state_paused - elif a == Action.SUCCESS or ( - p == Printer.IDLE - and time.time() - self.idle_start_ts > self.PRINTING_IDLE_BREAKOUT_SEC - ): + elif a == Action.SUCCESS or self._long_idle(p): # If idle state without event, assume we somehow missed the SUCCESS action. # We wait for a period of idleness to prevent idle-before-success events # from double-completing prints. @@ -231,7 +256,7 @@ def _state_printing(self, a: Action, p: Printer, elapsed=None): def _state_paused(self, a: Action, p: Printer): self._set_status("Paused", StatusType.NEEDS_ACTION) - if p == Printer.IDLE: + if self._long_idle(p): # Here, IDLE implies the user cancelled the print. 
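+            # (_long_idle() debounces this check, so a momentary IDLE report
+            # while pause handling settles is not treated as a cancellation.)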
# Go inactive to prevent stomping on manual changes return self._state_inactive @@ -314,7 +339,7 @@ def _state_clearing(self, a: Action, p: Printer): self._set_status("Error when clearing bed - aborting", StatusType.ERROR) return self._state_inactive # Skip past failure state to inactive - if p == Printer.IDLE: # Idle state without event; assume success + if self._long_idle(p): # Idle state without event; assume success return self._enter_start_print(a, p) else: self._set_status("Clearing bed") @@ -332,7 +357,7 @@ def _state_finishing(self, a: Action, p: Printer): return self._state_inactive # Idle state without event -> assume success and go idle - if a == Action.SUCCESS or p == Printer.IDLE: + if a == Action.SUCCESS or self._long_idle(p): return self._state_idle self._set_status("Finishing up") diff --git a/continuousprint/driver_test.py b/continuousprint/driver_test.py index 4385b50..e40aa9c 100644 --- a/continuousprint/driver_test.py +++ b/continuousprint/driver_test.py @@ -78,7 +78,7 @@ def test_idle_while_printing(self): self.assertEqual(self.d.state.__name__, self.d._state_printing.__name__) # Continued idleness triggers bed clearing and such - self.d.idle_start_ts = time.time() - (Driver.PRINTING_IDLE_BREAKOUT_SEC + 1) + self.d.printer_state_ts = time.time() - (Driver.PRINTING_IDLE_BREAKOUT_SEC + 1) self.d.action(DA.TICK, DP.IDLE) self.assertEqual(self.d.state.__name__, self.d._state_start_clearing.__name__) @@ -153,6 +153,7 @@ def test_completed_last_print(self): ) # -> success self.d.q.get_set_or_acquire.return_value = None # Nothing more in the queue self.d.action(DA.TICK, DP.IDLE) # -> start_finishing + self.d.printer_state_ts = time.time() - (Driver.PRINTING_IDLE_BREAKOUT_SEC + 1) self.d.action(DA.TICK, DP.IDLE) # -> finishing self.d._runner.run_finish_script.assert_called() self.assertEqual(self.d.state.__name__, self.d._state_finishing.__name__) diff --git a/continuousprint/integration_test.py b/continuousprint/integration_test.py index 7d71327..f9f9613 100644 --- a/continuousprint/integration_test.py +++ b/continuousprint/integration_test.py @@ -15,6 +15,7 @@ from peewee import SqliteDatabase from collections import defaultdict from peerprint.lan_queue import LANPrintQueueBase +from peerprint.sync_objects_test import TestReplDict # logging.basicConfig(level=logging.DEBUG) @@ -61,7 +62,7 @@ def assert_from_printing_state(self, want_path, finishing=False): self.d.action(DA.TICK, DP.IDLE) # -> clearing self.d._runner.clear_bed.assert_called() self.d._runner.clear_bed.reset_mock() - self.d.action(DA.TICK, DP.IDLE) # -> start_print + self.d.action(DA.SUCCESS, DP.IDLE) # -> start_print else: # Finishing self.d.action(DA.TICK, DP.IDLE) # -> start_finishing self.assertEqual( @@ -70,7 +71,7 @@ def assert_from_printing_state(self, want_path, finishing=False): self.d.action(DA.TICK, DP.IDLE) # -> finishing self.d._runner.run_finish_script.assert_called() self.d._runner.run_finish_script.reset_mock() - self.d.action(DA.TICK, DP.IDLE) # -> inactive + self.d.action(DA.SUCCESS, DP.IDLE) # -> inactive self.assertEqual(self.d.state.__name__, self.d._state_idle.__name__) except AssertionError as e: raise AssertionError( @@ -157,6 +158,7 @@ def test_completes_job_in_order(self): queries.updateJob(1, dict(draft=False)) self.d.action(DA.ACTIVATE, DP.IDLE) # -> start_print -> printing + self.assertEqual(self.d.state, self.d._state_printing) self.assert_from_printing_state("a.gcode") self.assert_from_printing_state("a.gcode") self.assert_from_printing_state("b.gcode", finishing=True) @@ -183,11 
+185,6 @@ def release(self, k): self.locks.pop(k, None) -class LocalJobDict(dict): - def set(self, k, v, **kwargs): - self[k] = v - - class TestLANQueue(IntegrationTest): """A simple in-memory integration test between DB storage layer, queuing layer, and driver.""" @@ -214,13 +211,14 @@ def setUp(self): self.lq.ns, self.lq.addr, MagicMock(), logging.getLogger("lantestbase") ) self.lq.lan.q.locks = LocalLockManager(dict(), "lq") - self.lq.lan.q.jobs = LocalJobDict() + self.lq.lan.q.jobs = TestReplDict(lambda a, b: None) self.lq.lan.q.peers = dict() def test_completes_job_in_order(self): self.lq.lan.q.setJob( - "bsdf", + "uuid1", dict( + id="uuid1", name="j1", created=0, sets=[ @@ -238,8 +236,9 @@ def test_completes_job_in_order(self): def test_multi_job(self): for name in ("j1", "j2"): self.lq.lan.q.setJob( - f"{name}_hash", + f"{name}_id", dict( + id=f"{name}_id", name=name, created=0, sets=[dict(path=f"{name}.gcode", count=1, remaining=1)], @@ -288,7 +287,7 @@ def onupdate(): lq.ns, lq.addr, MagicMock(), logging.getLogger("lantestbase") ) lq.lan.q.locks = LocalLockManager(self.locks, f"peer{i}") - lq.lan.q.jobs = LocalJobDict() + lq.lan.q.jobs = TestReplDict(lambda a, b: None) lq.lan.q.peers = self.peers if i > 0: lq.lan.q.peers = self.peers[0][2].lan.q.peers @@ -304,6 +303,7 @@ def test_ordered_acquisition(self): lq1.lan.q.setJob( f"{name}_hash", dict( + id=f"{name}_hash", name=name, created=0, sets=[ @@ -334,7 +334,7 @@ def test_ordered_acquisition(self): d1.action(DA.SUCCESS, DP.IDLE, path="j1.gcode") # -> success d1.action(DA.TICK, DP.IDLE) # -> start_clearing d1.action(DA.TICK, DP.IDLE) # -> clearing - d1.action(DA.TICK, DP.IDLE) # -> start_print + d1.action(DA.SUCCESS, DP.IDLE) # -> start_print self.assertEqual(d1._runner.start_print.call_args[0][0].path, "j3.gcode") self.assertEqual(self.locks["j3_hash"], "peer0") d1._runner.start_print.reset_mock() @@ -344,7 +344,7 @@ def test_ordered_acquisition(self): d2.action(DA.SUCCESS, DP.IDLE, path="j2.gcode") # -> success d2.action(DA.TICK, DP.IDLE) # -> start_finishing d2.action(DA.TICK, DP.IDLE) # -> finishing - d2.action(DA.TICK, DP.IDLE) # -> idle + d2.action(DA.SUCCESS, DP.IDLE) # -> idle self.assertEqual(d2.state.__name__, d2._state_idle.__name__) diff --git a/continuousprint/plugin.py b/continuousprint/plugin.py index 1ad734b..bcb194a 100644 --- a/continuousprint/plugin.py +++ b/continuousprint/plugin.py @@ -4,6 +4,7 @@ import socket import json import time +import shutil import traceback import random from pathlib import Path @@ -61,6 +62,7 @@ def __init__( logger, identifier, basefolder, + fire_event, ): self._basefolder = basefolder self._printer = printer @@ -74,6 +76,7 @@ def __init__( self._set_add_awaiting_metadata = dict() self._reconnect_attempts = 0 self._next_reconnect = 0 + self._fire_event = fire_event def start(self): self._setup_thirdparty_plugin_integration() @@ -239,10 +242,13 @@ def _setup_thirdparty_plugin_integration(self): ) def _init_fileshare(self, fs_cls=Fileshare): - fileshare_dir = self._path_on_disk(f"{PRINT_FILE_DIR}/fileshare/", sd=False) + # Note: fileshare_dir referenced when cleaning up old files + self.fileshare_dir = self._path_on_disk( + f"{PRINT_FILE_DIR}/fileshare/", sd=False + ) fileshare_addr = f"{self.get_local_ip()}:0" self._logger.info(f"Starting fileshare with address {fileshare_addr}") - self._fileshare = fs_cls(fileshare_addr, fileshare_dir, self._logger) + self._fileshare = fs_cls(fileshare_addr, self.fileshare_dir, self._logger) self._fileshare.connect() def _init_db(self): @@ -313,6 
+319,7 @@ def _init_driver(self, srcls=ScriptRunner, dcls=Driver): self._logger, self._printer, self._sync_state, + self._fire_event, ) self.d = dcls( queue=self.q, @@ -402,6 +409,8 @@ def _enqueue_analysis_backlog(self): self._logger.info(f"Enqueued {counter} files for CPQ analysis") def _enqueue(self, path, high_priority=False): + if path in TEMP_FILES.values(): + return False # Exclude temp files from analysis queue_entry = QueueEntry( name=path.split("/")[-1], path=path, @@ -423,6 +432,36 @@ def _on_analysis_finished(self, entry, result): ) self.on_event(self.CPQ_ANALYSIS_FINISHED, dict(path=entry.path, result=result)) + def _cleanup_fileshare(self): + # This cleans up all non-useful fileshare files across all network queues, so they aren't just taking up space. + # First we collect all non-local queue items hosted by us - these are excluded from cleanup as someone may need to fetch them. + keep_hashes = set() + for name, q in self.q.queues.items(): + if name == ARCHIVE_QUEUE or name == DEFAULT_QUEUE: + continue + for j in q.as_dict()["jobs"]: + if j["peer_"] == q.addr or j["acquired_by_"] == q.addr: + keep_hashes.add(j["hash"]) + + # Loop through all .gjob and .gcode files in base directory and delete them if they aren't referenced or acquired by us + n = 0 + for d in os.listdir(self.fileshare_dir): + name, suffix = os.path.splitext(d) + if suffix not in ("", ".gjob", ".gcode", ".gco"): + self._logger.debug( + f"Fileshare cleanup ignoring non-printable file: {os.path.join(self.fileshare_dir, name)}" + ) + continue + if name not in keep_hashes: + p = Path(self.fileshare_dir) / d + if p.is_dir(): + # Have to use shutil instead of p.rmdir() as the directory will likely not be empty + shutil.rmtree(p, ignore_errors=True) + else: + p.unlink() + n += 1 + return n + def tick(self): # Catch/pass all exceptions to prevent errors from stopping the repeated timer. 
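+        # (An exception escaping here would cancel the underlying repeated
+        # timer and halt queue management entirely - hence log-and-continue.)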
try: @@ -526,6 +565,9 @@ def on_event(self, event, payload): elif event == Events.PRINT_DONE: self._update(DA.SUCCESS) + n = self._cleanup_fileshare() + if n > 0: + self._logger.info(f"Deleted {n} unreferenced fileshare files/dirs") elif event == Events.PRINT_FAILED: # Note that cancelled events are already handled directly with Events.PRINT_CANCELLED self._update(DA.FAILURE) @@ -612,7 +654,7 @@ def _state_json(self): qs = [ dict(q.as_dict(), rank=db_qs[name]) for name, q in self.q.queues.items() - if name != "archive" + if name != ARCHIVE_QUEUE ] qs.sort(key=lambda q: q["rank"]) diff --git a/continuousprint/plugin_test.py b/continuousprint/plugin_test.py index db7d1a7..01fc8b6 100644 --- a/continuousprint/plugin_test.py +++ b/continuousprint/plugin_test.py @@ -1,4 +1,5 @@ import unittest +from pathlib import Path from collections import namedtuple from .analysis import CPQProfileAnalysisQueue from .storage.queries import getJobsAndSets @@ -36,6 +37,7 @@ def mockplugin(): settings=MockSettings(), file_manager=MagicMock(), plugin_manager=MagicMock(), + fire_event=MagicMock(), queries=MagicMock(), data_folder=None, logger=logging.getLogger(), @@ -246,6 +248,7 @@ def testQueueRunMovieDone(self): self.p._queries.annotateLastRun.assert_called_with("a.gcode", "a.mp4", ANY) def testPrintDone(self): + self.p._cleanup_fileshare = lambda: 0 self.p.on_event(Events.PRINT_DONE, dict()) self.p.d.action.assert_called_with(DA.SUCCESS, ANY, ANY, ANY, ANY) @@ -512,3 +515,39 @@ def testAnalysisCompleted(self): self.p._file_manager.set_additional_metadata.assert_called_with( ANY, "a.gcode", ANY, ANY, overwrite=True ) + + +class TestCleanupFileshare(unittest.TestCase): + def setUp(self): + self.td = tempfile.TemporaryDirectory() + q = MagicMock() + q.as_dict.return_value = dict( + jobs=[ + {"hash": "a", "peer_": q.addr, "acquired_by_": None}, + {"hash": "b", "peer_": "peer2", "acquired_by_": q.addr}, + {"hash": "c", "peer_": "peer2", "acquired_by_": None}, + {"hash": "d", "peer_": "peer2", "acquired_by_": None}, + ] + ) + self.p = mockplugin() + self.p.fileshare_dir = self.td.name + self.p.q = MagicMock() + self.p.q.queues.items.return_value = [("q", q)] + + def tearDown(self): + self.td.cleanup() + + def testCleanupNoFiles(self): + self.assertEqual(self.p._cleanup_fileshare(), 0) + + def testCleanupWithFiles(self): + p = Path(self.p.fileshare_dir) + (p / "d").mkdir() + for n in ("a", "b", "c"): + (p / f"{n}.gcode").touch() + self.assertEqual(self.p._cleanup_fileshare(), 2) + + for n in ("a", "b"): + self.assertTrue((p / f"{n}.gcode").exists()) + self.assertFalse((p / "c.gcode").exists()) + self.assertFalse((p / "d").exists()) diff --git a/continuousprint/queues/abstract.py b/continuousprint/queues/abstract.py index 56a4462..dae1e6f 100644 --- a/continuousprint/queues/abstract.py +++ b/continuousprint/queues/abstract.py @@ -67,40 +67,36 @@ def reset_jobs(self, job_ids) -> dict: pass -class AbstractJobQueue(AbstractQueue): - """LAN queues (potentially others in the future) act on whole jobs and do not allow - edits to inner data""" - - @abstractmethod - def submit_job(self, j: JobView) -> bool: - pass - - class AbstractEditableQueue(AbstractQueue): - """Some queues (e.g. 
local to a single printer) are directly editable.""" + """Use for queues that are directly editable.""" @abstractmethod - def add_job(self, name="") -> JobView: + def mv_job(self, job_id, after_id): pass @abstractmethod - def add_set(self, job_id, data) -> SetView: + def edit_job(self, job_id, data): pass @abstractmethod - def mv_set(self, set_id, after_id, dest_job) -> SetView: + def get_job_view(self, job_id): pass @abstractmethod - def mv_job(self, job_id, after_id): + def import_job_from_view(self, job_view): + """Imports a JobView into storage. Returns ID of the imported job""" pass + +class AbstractFactoryQueue(AbstractEditableQueue): + """Use for queues where you can construct new jobs/sets""" + @abstractmethod - def edit_job(self, job_id, data): + def add_job(self, name="") -> JobView: pass @abstractmethod - def rm_multi(self, job_ids, set_ids) -> dict: + def add_set(self, job_id, data) -> SetView: pass @abstractmethod diff --git a/continuousprint/queues/abstract_test.py b/continuousprint/queues/abstract_test.py new file mode 100644 index 0000000..ab287cf --- /dev/null +++ b/continuousprint/queues/abstract_test.py @@ -0,0 +1,157 @@ +from ..storage.database import JobView, SetView + + +class DummyQueue: + name = "foo" + + +def testJob(inst): + s = SetView() + s.id = inst + s.path = f"set{inst}.gcode" + s.count = 2 + s.remaining = 2 + s.rank = 0 + s.sd = False + s.material_keys = "" + s.profile_keys = "profile" + s.completed = 0 + s.save = lambda: True + j = JobView() + s.job = j + j.id = inst + j.acquired = False + j.name = f"job{inst}" + j.count = 2 + j.remaining = 2 + j.sets = [s] + j.draft = False + j.rank = 0 + j.queue = DummyQueue() + j.created = 5 + j.save = lambda: True + return j + + +class JobEqualityTests: + def _strip(self, d, ks): + for k in ks: + del d[k] + + def assertJobsEqual(self, v1, v2, ignore=[]): + d1 = v1.as_dict() + d2 = v2.as_dict() + for d in (d1, d2): + self._strip(d, [*ignore, "id", "queue"]) + for s in d1["sets"]: + self._strip(s, ("id", "rank")) + for s in d2["sets"]: + self._strip(s, ("id", "rank")) + self.assertEqual(d1, d2) + + def assertSetsEqual(self, s1, s2): + d1 = s1.as_dict() + d2 = s2.as_dict() + for d in (d1, d2): + self._strip(d, ("id", "rank")) + self.assertEqual(d1, d2) + + +class AbstractQueueTests(JobEqualityTests): + def setUp(self): + raise NotImplementedError("Must create queue as self.q with testJob() inserted") + + def test_acquire_get_release(self): + j = testJob(0) + self.assertEqual(self.q.acquire(), True) + self.assertEqual(self.q.get_job().acquired, True) + self.assertJobsEqual(self.q.get_job(), j, ignore=["acquired"]) + self.assertSetsEqual(self.q.get_set(), j.sets[0]) + self.q.release() + self.assertEqual(self.q.get_job(), None) + self.assertEqual(self.q.get_set(), None) + + def test_decrement_and_reset(self): + self.assertEqual(self.q.acquire(), True) + self.assertEqual(self.q.decrement(), True) # Work remains + got = self.q.get_set() + self.assertEqual(got.remaining, 1) + self.assertEqual(got.completed, 1) + self.q.reset_jobs([self.jid]) + got = self.q.as_dict()["jobs"][0]["sets"][0] + self.assertEqual(got["remaining"], 2) + self.assertEqual(got["completed"], 0) + + def test_remove_jobs(self): + self.assertEqual(self.q.remove_jobs([self.jid])["jobs_deleted"], 1) + self.assertEqual(len(self.q.as_dict()["jobs"]), 0) + + def test_as_dict(self): + d = self.q.as_dict() + self.assertNotEqual(d.get("name"), None) + self.assertNotEqual(d.get("jobs"), None) + self.assertNotEqual(d.get("strategy"), None) + + +class 
EditableQueueTests(JobEqualityTests):
+    NUM_TEST_JOBS = 4
+
+    def setUp(self):
+        raise NotImplementedError(
+            "Must create queue as self.q with testJob() inserted (inst=0..3)"
+        )
+
+    def test_mv_job_exchange(self):
+        self.q.mv_job(self.jids[1], self.jids[2])
+        jids = [j["id"] for j in self.q.as_dict()["jobs"]]
+        self.assertEqual(jids, [self.jids[i] for i in (0, 2, 1, 3)])
+
+    def test_mv_to_front(self):
+        self.q.mv_job(self.jids[2], None)
+        jids = [j["id"] for j in self.q.as_dict()["jobs"]]
+        self.assertEqual(jids, [self.jids[i] for i in (2, 0, 1, 3)])
+
+    def test_mv_to_back(self):
+        self.q.mv_job(self.jids[2], self.jids[3])
+        jids = [j["id"] for j in self.q.as_dict()["jobs"]]
+        self.assertEqual(jids, [self.jids[i] for i in (0, 1, 3, 2)])
+
+    def test_edit_job(self):
+        result = self.q.edit_job(self.jids[0], dict(draft=True))
+        self.assertEqual(result, self.q.as_dict()["jobs"][0])
+        self.assertEqual(self.q.as_dict()["jobs"][0]["draft"], True)
+
+    def test_edit_job_then_decrement_persists_changes(self):
+        self.assertEqual(self.q.acquire(), True)
+        self.assertEqual(self.q.as_dict()["jobs"][0]["acquired"], True)
+        self.assertEqual(len(self.q.as_dict()["jobs"][0]["sets"]), 1)
+
+        # Edit the acquired job, adding a new set
+        newsets = [testJob(0).sets[0].as_dict()]  # Same as existing
+        newsets.append(testJob(100).sets[0].as_dict())  # New set
+        self.q.edit_job(self.jids[0], dict(sets=newsets))
+
+        # Value after decrement should be consistent, i.e. not regress to prior acquired-job value
+        self.q.decrement()
+        self.assertEqual(len(self.q.as_dict()["jobs"][0]["sets"]), 2)
+
+    def test_get_job_view(self):
+        self.assertJobsEqual(self.q.get_job_view(self.jids[0]), testJob(0))
+
+    def test_import_job_from_view(self):
+        j = testJob(10)
+        jid = self.q.import_job_from_view(j)
+        self.assertJobsEqual(self.q.get_job_view(jid), j)
+
+    def test_import_job_from_view_persists_completion_and_remaining(self):
+        j = testJob(10)
+        j.sets[0].completed = 3
+        j.sets[0].remaining = 5
+        jid = self.q.import_job_from_view(j)
+        got = self.q.get_job_view(jid).sets[0]
+        self.assertEqual(got.completed, j.sets[0].completed)
+        self.assertEqual(got.remaining, j.sets[0].remaining)
+
+
+class AbstractFactoryQueueTests(JobEqualityTests):
+    pass  # TODO
diff --git a/continuousprint/queues/lan.py b/continuousprint/queues/lan.py
index fb752c6..d1ca046 100644
--- a/continuousprint/queues/lan.py
+++ b/continuousprint/queues/lan.py
@@ -1,12 +1,20 @@
-from peerprint.lan_queue import LANPrintQueue
-from ..storage.lan import LANJobView
-from ..storage.database import JobView
+import time
+import uuid
+from typing import Optional
+from bisect import bisect_left
+from peerprint.lan_queue import LANPrintQueue, ChangeType
+from ..storage.lan import LANJobView, LANSetView
+from ..storage.database import JobView, SetView
 from pathlib import Path
-from .abstract import AbstractJobQueue, QueueData, Strategy
+from .abstract import AbstractEditableQueue, QueueData, Strategy
 import dataclasses


-class LANQueue(AbstractJobQueue):
+class ValidationError(Exception):
+    pass
+
+
+class LANQueue(AbstractEditableQueue):
     def __init__(
         self,
         ns,
@@ -25,6 +33,8 @@ def __init__(
         self.ns = ns
         self.addr = addr
         self.lan = None
+        self.job_id = None
+        self.set_id = None
         self.update_cb = update_cb
         self._fileshare = fileshare
         self._path_on_disk = path_on_disk_fn
@@ -38,7 +48,26 @@ def is_ready(self) -> bool:
     def connect(self):
         self.lan.connect()

-    def _on_update(self):
+    def _compare_peer(self, prev, nxt):
+        if prev is None and nxt is not None:
+            return True
+        if prev is not None and nxt is None:
+            return True
+        if prev is None and nxt is None:
+            return False
+        for k in ("status", "run"):
+            if prev.get(k) != nxt.get(k):
+                return True
+        return False
+
+    def _compare_job(self, prev, nxt):
+        return True  # Always trigger callback - TODO make this more sophisticated
+
+    def _on_update(self, changetype, prev, nxt):
+        if changetype == ChangeType.PEER and not self._compare_peer(prev, nxt):
+            return
+        elif changetype == ChangeType.JOB and not self._compare_job(prev, nxt):
+            return
         self.update_cb(self)

     def destroy(self):
@@ -57,85 +86,81 @@ def update_peer_state(self, name, status, run, profile):
             )
         )

-    def set_job(self, hash_: str, manifest: dict):
-        return self.lan.q.setJob(hash_, manifest)
+    def set_job(self, jid: str, manifest: dict):
+        # Preserve peer address of job if present in the manifest
+        return self.lan.q.setJob(jid, manifest, addr=manifest.get("peer_", None))

-    def resolve_set(self, peer, hash_, path) -> str:
+    def get_gjob_dirpath(self, peer, hash_):
         # Get fileshare address from the peer
-        peerstate = self.lan.q.getPeers().get(peer)
+        peerstate = self._get_peers().get(peer)
         if peerstate is None:
-            raise Exception(
-                "Cannot resolve set {path} within job hash {hash_}; peer state is None"
+            raise ValidationError(
+                f"Cannot resolve job hash {hash_}; peer state is None"
             )

         # fetch unpacked job from fileshare (may be cached) and return the real path
-        gjob_dirpath = self._fileshare.fetch(peerstate["fs_addr"], hash_, unpack=True)
-        return str(Path(gjob_dirpath) / path)
+        return self._fileshare.fetch(peerstate["fs_addr"], hash_, unpack=True)

-    # --------- AbstractJobQueue implementation ------
-
-    def _validate_job(self, j: JobView) -> str:
-        peer_profiles = set(
-            [
-                p.get("profile", dict()).get("name", "UNKNOWN")
-                for p in self.lan.q.getPeers().values()
-            ]
-        )
+    # -------- Wrappers around LANQueue to add/remove metadata ------

-        for s in j.sets:
-            sprof = set(s.profiles())
-            # All sets in the job *must* have an assigned profile
-            if len(sprof) == 0:
-                return f"validation for job {j.name} failed - set {s.path} has no assigned profile"
+    def _annotate_job(self, peer_and_manifest, acquired_by):
+        (peer, manifest) = peer_and_manifest
+        m = dict(**manifest)
+        m["peer_"] = peer
+        m["acquired"] = True if acquired_by is not None else False
+        m["acquired_by_"] = acquired_by
+        return m

-            # At least one printer in the queue must have a compatible proile
-            if len(peer_profiles.intersection(sprof)) == 0:
-                return f"validation for job {j.name} failed - no match for set {s.path} with profiles {sprof} (connected printer profiles: {peer_profiles})"
+    def _normalize_job(self, data):
+        m = dict(**data)
+        del m["peer_"]
+        del m["acquired_by_"]
+        return m
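+
+    # Illustrative round trip of these wrappers: _annotate_job(("peerA", m), "peerA")
+    # yields a copy of m with peer_="peerA", acquired=True, acquired_by_="peerA";
+    # _normalize_job strips the peer_/acquired_by_ keys again before writeback.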
-            # All set paths must resolve to actual files
-            fullpath = self._path_on_disk(s.path, s.sd)
-            if fullpath is None or not Path(fullpath).exists():
-                return f"validation for job {j.name} failed - file not found at {s.path} (is it stored on disk and not SD?)"
+    def _get_jobs(self) -> list:
+        joblocks = self.lan.q.getLocks()
+        jobs = []
+        for (jid, v) in self.lan.q.getJobs():
+            jobs.append(self._annotate_job(v, joblocks.get(jid)))
+        return jobs

-    def submit_job(self, j: JobView) -> bool:
-        err = self._validate_job(j)
-        if err is not None:
-            self._logger.warning(err)
-            return Exception(err)
-        filepaths = dict([(s.path, self._path_on_disk(s.path, s.sd)) for s in j.sets])
-        manifest = j.as_dict()
-        if manifest.get("created") is None:
-            manifest["created"] = int(time.time())
-        # Note: postJob strips fields from manifest in-place
-        hash_ = self._fileshare.post(manifest, filepaths)
-        self.lan.q.setJob(hash_, manifest)
+    def _get_job(self, jid) -> dict:
+        j = self.lan.q.getJob(jid)
+        if j is not None:
+            joblocks = self.lan.q.getLocks()
+            return self._annotate_job(j, joblocks.get(jid))

-    def reset_jobs(self, job_ids) -> dict:
-        for jid in job_ids:
-            j = self.lan.q.jobs.get(jid)
-            if j is None:
-                continue
-            (addr, manifest) = j
+    def _get_peers(self) -> dict:
+        result = {}
+        # Locks are given by job:peer, so reverse this
+        peerlocks = dict([(v, k) for k, v in self.lan.q.getLocks().items()])
+        for k, v in self.lan.q.getPeers().items():
+            result[k] = dict(**v, acquired=peerlocks.get(k, []))
+        return result

-        manifest["remaining"] = manifest["count"]
-        for s in manifest.get("sets", []):
-            s["remaining"] = s["count"]
-        self.lan.q.setJob(jid, manifest, addr=addr)
+    # --------- begin AbstractQueue --------

-    def remove_jobs(self, job_ids) -> dict:
-        for jid in job_ids:
-            self.lan.q.removeJob(jid)
+    def get_job(self) -> Optional[JobView]:
+        # Override to ensure the latest data is received
+        return self.get_job_view(self.job_id)

-    # --------- AbstractQueue implementation --------
+    def get_set(self) -> Optional[SetView]:
+        if self.job_id is not None and self.set_id is not None:
+            # Linear search through sets isn't efficient, but it works.
+            j = self.get_job_view(self.job_id)
+            for s in j.sets:
+                if s.id == self.set_id:
+                    return s

     def _peek(self):
         if self.lan is None or self.lan.q is None:
             return (None, None)
-        jobs = self.lan.q.getJobs()
-        jobs.sort(
-            key=lambda j: j["created"]
-        )  # Always creation order - there is no reordering in lan queue
-        for data in jobs:
+        for data in self._get_jobs():
             acq = data.get("acquired_by_")
             if acq is not None and acq != self.addr:
                 continue  # Acquired by somebody else, so don't consider for scheduling
@@ -152,8 +177,8 @@ def acquire(self) -> bool:
         if job is not None and s is not None:
             if self.lan.q.acquireJob(job.id):
                 self._logger.debug(f"acquire() candidate:\n{job}\n{s}")
-                self.job = job
-                self.set = s
+                self.job_id = job.id
+                self.set_id = s.id
                 self._logger.debug("acquire() success")
                 return True
             else:
@@ -163,22 +188,24 @@ def acquire(self) -> bool:
         return False

     def release(self) -> None:
-        if self.job is not None:
-            self.lan.q.releaseJob(self.job.id)
-            self.job = None
-            self.set = None
+        if self.job_id is not None:
+            self.lan.q.releaseJob(self.job_id)
+            self.job_id = None
+            self.set_id = None

     def decrement(self) -> None:
-        if self.job is not None:
-            next_set = self.set.decrement(self._profile)
+        if self.job_id is not None:
+            next_set = self.get_set().decrement(self._profile)
             if next_set:
                 self._logger.debug("Still has work, going for next set")
-                self.set = next_set
+                self.set_id = next_set.id
                 return True
             else:
                 self._logger.debug("No more work; releasing")
                 self.release()
                 return False
+        else:
+            raise Exception("Cannot decrement; no job acquired")

     def _active_set(self):
         assigned = self.get_set()
@@ -190,11 +217,10 @@ def as_dict(self) -> dict:
         jobs = []
         peers = {}
         if self.lan.q is not None:
-            jobs = self.lan.q.getJobs()
-            jobs.sort(
-                key=lambda j: j["created"]
-            )  # Always creation order - there is no reordering in lan queue
-            peers = self.lan.q.getPeers()
+            jobs = self._get_jobs()
+            peers = self._get_peers()
+            for j in jobs:
+                j["queue"] = self.ns

         return dataclasses.asdict(
             QueueData(
@@ -206,3 +232,105 @@
                 active_set=self._active_set(),
             )
         )
+
+    def reset_jobs(self, job_ids) -> dict:
+        for jid in job_ids:
+            j = self._get_job(jid)
+            if j is None:
+                continue
+
+            j["remaining"] = j["count"]
+            for s in j.get("sets", []):
+                s["remaining"] = s["count"]
s["completed"] = 0 + self.lan.q.setJob(jid, j, addr=j["peer_"]) + + def remove_jobs(self, job_ids) -> dict: + n = 0 + for jid in job_ids: + if self.lan.q.removeJob(jid) is not None: + n += 1 + return dict(jobs_deleted=n) + + # --------- end AbstractQueue ------ + + # --------- AbstractEditableQueue implementation ------ + + def get_job_view(self, job_id): + j = self._get_job(job_id) + if j is not None: + return LANJobView(j, self) + + def import_job_from_view(self, j, jid=None): + err = self._validate_job(j) + if err is not None: + raise ValidationError(err) + filepaths = dict([(s.path, self._path_on_disk(s.path, s.sd)) for s in j.sets]) + manifest = j.as_dict() + if manifest.get("created") is None: + manifest["created"] = int(time.time()) + # Note: post mutates manifest by stripping fields + manifest["hash"] = self._fileshare.post(manifest, filepaths) + manifest["id"] = jid if jid is not None else self._gen_uuid() + + # Propagate peer if importing from a LANJobView + # But don't fail with AttributeError if it's just a JobView + self.lan.q.setJob(manifest["id"], manifest, addr=getattr(j, "peer", None)) + return manifest["id"] + + def mv_job(self, job_id, after_id): + self.lan.q.jobs.mv(job_id, after_id) + + def _path_exists(self, fullpath): + return Path(fullpath).exists() + + def _validate_job(self, j: JobView) -> str: + peer_profiles = set( + [ + p.get("profile", dict()).get("name", "UNKNOWN") + for p in self._get_peers().values() + ] + ) + + for s in j.sets: + sprof = set(s.profiles()) + # All sets in the job *must* have an assigned profile + if len(sprof) == 0: + return f"validation for job {j.name} failed - set {s.path} has no assigned profile" + + # At least one printer in the queue must have a compatible proile + if len(peer_profiles.intersection(sprof)) == 0: + return f"validation for job {j.name} failed - no match for set {s.path} with profiles {sprof} (connected printer profiles: {peer_profiles})" + + # All set paths must resolve to actual files + fullpath = self._path_on_disk(s.path, s.sd) + if fullpath is None or not self._path_exists(fullpath): + return f"validation for job {j.name} failed - file not found at {s.path} (is it stored on disk and not SD?)" + + def _gen_uuid(self) -> str: + for i in range(100): + result = uuid.uuid4() + if not self.lan.q.hasJob(result): + return str(result) + raise Exception("UUID generation failed - too many ID collisions") + + def edit_job(self, job_id, data) -> bool: + # For lan queues, "editing" a job is basically resubmission of the whole thing. + # This is because the backing .gjob format is a single file containing the full manifest. 
+ j = self.get_job_view(job_id) + for (k, v) in data.items(): + if k in ("id", "peer_", "queue"): + continue + if k == "sets": + j.updateSets( + v + ) # Set data must be translated into views, done by updateSets() + else: + setattr(j, k, v) + + # Exchange the old job for the new job (reuse job ID) + jid = self.import_job_from_view(j, j.id) + return self._get_job(jid) diff --git a/continuousprint/queues/lan_test.py b/continuousprint/queues/lan_test.py index 87623b9..d5cafc3 100644 --- a/continuousprint/queues/lan_test.py +++ b/continuousprint/queues/lan_test.py @@ -4,14 +4,26 @@ from datetime import datetime from unittest.mock import MagicMock from .abstract import Strategy -from .lan import LANQueue +from .abstract_test import ( + AbstractQueueTests, + EditableQueueTests, + testJob as makeAbstractTestJob, +) +from .lan import LANQueue, ValidationError from ..storage.database import JobView, SetView +from peerprint.lan_queue_test import LANQueueLocalTest as PeerPrintLANTest # logging.basicConfig(level=logging.DEBUG) -class LANQueueTest(unittest.TestCase): +class LANQueueTest(unittest.TestCase, PeerPrintLANTest): def setUp(self): + PeerPrintLANTest.setUp(self) # Generate peerprint LANQueue as self.q + self.q.q.syncPeer( + dict(profile=dict(name="profile")), addr=self.q.q.addr + ) # Helps pass validation + ppq = self.q # Rename to make way for CPQ LANQueue + self.ucb = MagicMock() self.fs = MagicMock() self.q = LANQueue( @@ -24,6 +36,23 @@ def setUp(self): dict(name="profile"), lambda path, sd: path, ) + self.q.lan = ppq + self.q._path_exists = lambda p: True # Override path check for validation + + +class TestAbstractImpl(AbstractQueueTests, LANQueueTest): + def setUp(self): + LANQueueTest.setUp(self) + self.jid = self.q.import_job_from_view(makeAbstractTestJob(0)) + + +class TestEditableImpl(EditableQueueTests, LANQueueTest): + def setUp(self): + LANQueueTest.setUp(self) + self.jids = [ + self.q.import_job_from_view(makeAbstractTestJob(i)) + for i in range(EditableQueueTests.NUM_TEST_JOBS) + ] class TestLANQueueNoConnection(LANQueueTest): @@ -31,34 +60,41 @@ def test_update_peer_state(self): self.q.update_peer_state("HI", {}, {}, {}) # No explosions? 
Good +class DummyQueue: + name = "lantest" + + class TestLANQueueConnected(LANQueueTest): def setUp(self): super().setUp() self.q.lan = MagicMock() self.q.lan.q = MagicMock() + self.q.lan.q.hasJob.return_value = False # For UUID generation self.q.lan.q.getPeers.return_value = { "a": dict(fs_addr="123", profile=dict(name="abc")), } - def test_resolve_set_failed_bad_peer(self): + def test_get_gjob_dirpath_failed_bad_peer(self): with self.assertRaises(Exception): - self.q.resolve_set("b", "hash", "path") + self.q.get_gjob_dirpath("b", "hash") - def test_resolve_set(self): + def test_get_gjob_dirpath(self): self.fs.fetch.return_value = "/dir/" - self.assertEqual(self.q.resolve_set("a", "hash", "path"), "/dir/path") + self.assertEqual(self.q.get_gjob_dirpath("a", "hash"), "/dir/") self.fs.fetch.assert_called_with("123", "hash", unpack=True) def _jbase(self, path="a.gcode"): j = JobView() j.id = 1 j.name = "j1" + j.queue = DummyQueue() s = SetView() s.path = path s.id = 2 s.sd = False s.count = 1 s.remaining = 1 + s.completed = 0 s.profile_keys = "" s.rank = 1 s.material_keys = "" @@ -70,57 +106,26 @@ def _jbase(self, path="a.gcode"): j.acquired = False return j - def test_submit_job_file_missing(self): + def test_validation_file_missing(self): j = self._jbase() j.sets[0].profile_keys = "def,abc" - result = self.q.submit_job(j) - self.assertRegex(str(result), "file not found") + self.q._path_exists = lambda p: False # Override path check for validation + with self.assertRaisesRegex(ValidationError, "file not found"): + self.q.import_job_from_view(j) self.fs.post.assert_not_called() - def test_submit_job_no_profile(self): - result = self.q.submit_job(self._jbase()) - self.assertRegex(str(result), "no assigned profile") + def test_validation_no_profile(self): + with self.assertRaisesRegex(ValidationError, "no assigned profile"): + self.q.import_job_from_view(self._jbase()) self.fs.post.assert_not_called() - def test_submit_job_no_match(self): + def test_validation_no_match(self): j = self._jbase() j.sets[0].profile_keys = "def" - result = self.q.submit_job(j) - self.assertRegex(str(result), "no match for set") + with self.assertRaisesRegex(ValidationError, "no match for set"): + self.q.import_job_from_view(j) self.fs.post.assert_not_called() - def test_submit_job(self): - with tempfile.NamedTemporaryFile(suffix=".gcode") as f: - self.fs.post.return_value = "hash" - j = self._jbase(f.name) - j.sets[0].profile_keys = "def,abc" - self.q.submit_job(j) - self.fs.post.assert_called() - self.q.lan.q.setJob.assert_called_with( - "hash", - { - "name": "j1", - "count": 1, - "draft": False, - "sets": [ - { - "path": f.name, - "count": 1, - "materials": [], - "profiles": ["def", "abc"], - "id": 2, - "rank": 1, - "sd": False, - "remaining": 1, - } - ], - "created": 100, - "id": 1, - "remaining": 1, - "acquired": False, - }, - ) - class TestLANQueueWithJob(LANQueueTest): def setUp(self): diff --git a/continuousprint/queues/local.py b/continuousprint/queues/local.py index 33d0d12..4d7f38b 100644 --- a/continuousprint/queues/local.py +++ b/continuousprint/queues/local.py @@ -1,5 +1,6 @@ -from .abstract import Strategy, QueueData, AbstractEditableQueue +from .abstract import Strategy, QueueData, AbstractFactoryQueue import tempfile +import shutil import os from ..storage.database import JobView, SetView from peerprint.filesharing import pack_job, unpack_job, packed_name @@ -7,7 +8,7 @@ import dataclasses -class LocalQueue(AbstractEditableQueue): +class LocalQueue(AbstractFactoryQueue): def __init__( self, queries, 
@@ -52,7 +53,7 @@ def acquire(self) -> bool:
             self.ns, self._profile, self._set_path_exists
         )
         if p is not None and self.queries.acquireJob(p):
-            self.job = p
+            self.job = self.queries.getJob(p.id)  # Refetch job to get acquired state
             self.set = p.next_set(self._profile, self._set_path_exists)
             return True
         return False
@@ -98,7 +99,7 @@ def as_dict(self) -> dict:
         )

     def remove_jobs(self, job_ids):
-        return self.rm_multi(job_ids=job_ids)
+        return self.queries.remove(job_ids=job_ids)

     def reset_jobs(self, job_ids):
         return self.queries.resetJobs(job_ids)
@@ -107,14 +108,31 @@ def reset_jobs(self, job_ids):

     # -------------- begin AbstractEditableQueue -----------

-    def add_job(self, name="") -> JobView:
-        return self.queries.newEmptyJob(self.ns, name)
-
-    def add_set(self, job_id, data) -> SetView:
-        return self.queries.appendSet(self.ns, job_id, data)
-
-    def mv_set(self, set_id, after_id, dest_job):
-        return self.queries.moveSet(set_id, after_id, dest_job)
+    def get_job_view(self, job_id):
+        return self.queries.getJob(job_id)
+
+    def import_job_from_view(self, v, copy_fn=shutil.copytree):
+        manifest = v.as_dict()
+
+        # If importing from a non-local queue, we must also fetch/import the files so they're available locally.
+        if hasattr(v, "get_base_dir"):
+            dest_dir = f'ContinuousPrint/imports/{manifest["name"]}_{manifest["id"]}'
+            gjob_dir = v.get_base_dir()
+            copy_fn(gjob_dir, self._path_on_disk(dest_dir, False))
+            for s in manifest["sets"]:
+                s["path"] = os.path.join(dest_dir, s["path"])
+
+        # TODO make transaction, move to storage/queries.py
+        j = self.add_job()
+        for (k, val) in manifest.items():
+            if k in ("peer_", "sets", "id", "acquired", "queue"):
+                continue
+            setattr(j, k, val)
+        j.save()
+        for s in manifest["sets"]:
+            del s["id"]
+            self.add_set(j.id, s)
+        return j.id

     def mv_job(self, job_id, after_id):
         return self.queries.moveJob(job_id, after_id)
@@ -122,14 +140,22 @@ def mv_job(self, job_id, after_id):
     def edit_job(self, job_id, data):
         return self.queries.updateJob(job_id, data)

-    def rm_multi(self, job_ids=[], set_ids=[]) -> dict:
-        return self.queries.remove(job_ids=job_ids, set_ids=set_ids)
+    # ------------------- end AbstractEditableQueue ---------------
+
+    # ------------ begin AbstractFactoryQueue ------
+
+    def add_job(self, name="") -> JobView:
+        return self.queries.newEmptyJob(self.ns, name)
+
+    def add_set(self, job_id, data) -> SetView:
+        return self.queries.appendSet(self.ns, job_id, data)

     def import_job(self, gjob_path: str, draft=True) -> dict:
         out_dir = str(Path(gjob_path).stem)
         self._mkdir(out_dir)
         manifest, filepaths = unpack_job(
-            self._path_on_disk(gjob_path), self._path_on_disk(out_dir)
+            self._path_on_disk(gjob_path, sd=False),
+            self._path_on_disk(out_dir, sd=False),
         )
         return self.queries.importJob(self.ns, manifest, out_dir, draft)
@@ -150,4 +176,4 @@ def export_job(self, job_id: int, dest_dir: str) -> str:
             os.rename(tf.name, path)
         return path

-    # ------------------- end AbstractEditableQueue ---------------
+    # ------------------- end AbstractFactoryQueue ---------------
diff --git a/continuousprint/queues/local_test.py b/continuousprint/queues/local_test.py
index 7e27306..386aa88 100644
--- a/continuousprint/queues/local_test.py
+++ b/continuousprint/queues/local_test.py
@@ -1,11 +1,94 @@
 import unittest
 import logging
+from ..storage.database_test import DBTest
+from ..storage import queries
+from ..storage.lan import LANJobView
+from ..storage.database import JobView
 from unittest.mock import MagicMock
 from .abstract import Strategy, QueueData
+from .abstract_test
import ( + AbstractQueueTests, + EditableQueueTests, + testJob as makeAbstractTestJob, +) from .local import LocalQueue from dataclasses import dataclass, asdict -# logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.DEBUG) + + +class TestAbstractImpl(AbstractQueueTests, DBTest): + # See abstract_test.py for actual test cases + def setUp(self): + DBTest.setUp(self) + self.q = LocalQueue( + queries, + "local", + Strategy.IN_ORDER, + dict(name="profile"), + MagicMock(), + MagicMock(), + ) + self.jid = self.q.import_job_from_view(makeAbstractTestJob(0)) + self.q._set_path_exists = lambda p: True + + +class TestEditableImpl(EditableQueueTests, DBTest): + # See abstract_test.py for actual test cases + def setUp(self): + DBTest.setUp(self) + self.q = LocalQueue( + queries, + "local", + Strategy.IN_ORDER, + dict(name="profile"), + MagicMock(), + MagicMock(), + ) + self.jids = [ + self.q.import_job_from_view(makeAbstractTestJob(i)) + for i in range(EditableQueueTests.NUM_TEST_JOBS) + ] + self.q._set_path_exists = lambda p: True + + +class TestLocalQueueImportFromLAN(unittest.TestCase): + def setUp(self): + queries = MagicMock() + queries.getAcquiredJob.return_value = None + self.q = LocalQueue( + queries, + "testQueue", + Strategy.IN_ORDER, + dict(name="profile"), + lambda p, sd: p, + MagicMock(), + ) + + def testImportJobFromLANView(self): + # Jobs imported from LAN into local queue must have their + # gcode files copied from the remote peer, in a way which + # doesn't get auto-cleaned (as in the fileshare/ directory) + lq = MagicMock() + j = JobView() + j.id = "567" + j.save = MagicMock() + self.q.queries.newEmptyJob.return_value = j + manifest = dict( + name="test_job", + id="123", + sets=[dict(path="a.gcode", count=1, remaining=1)], + hash="foo", + peer_="addr", + ) + cp = MagicMock() + lq.get_gjob_dirpath.return_value = "gjob_dirpath" + self.q.import_job_from_view(LANJobView(manifest, lq), cp) + + wantdir = "ContinuousPrint/imports/test_job_123" + cp.assert_called_with("gjob_dirpath", wantdir) + _, args, _ = self.q.queries.appendSet.mock_calls[-1] + self.assertEqual(args[2]["path"], wantdir + "/a.gcode") class TestLocalQueueInOrderNoInitialJob(unittest.TestCase): @@ -21,16 +104,6 @@ def setUp(self): MagicMock(), ) - def test_acquire_success(self): - j = MagicMock() - s = MagicMock() - j.next_set.return_value = s - self.q.queries.getNextJobInQueue.return_value = j - self.q.queries.acquireJob.return_value = True - self.assertEqual(self.q.acquire(), True) - self.assertEqual(self.q.get_job(), j) - self.assertEqual(self.q.get_set(), s) - def test_acquire_failed(self): self.q.queries.getNextJobInQueue.return_value = "doesntmatter" self.q.queries.acquireJob.return_value = False @@ -41,18 +114,8 @@ def test_acquire_failed_no_jobs(self): self.q.queries.getNextJobInQueue.return_value = None self.assertEqual(self.q.acquire(), False) - def test_as_dict(self): - self.assertEqual( - self.q.as_dict(), - dict( - name="testQueue", - strategy="IN_ORDER", - jobs=[], - active_set=None, - addr=None, - peers=[], - ), - ) + def test_import_job(self): + pass # TODO class TestLocalQueueInOrderInitial(unittest.TestCase): @@ -76,6 +139,9 @@ def test_init_already_acquired(self): self.assertEqual(self.q.get_job(), self.j) self.assertEqual(self.q.get_set(), self.s) + def test_mv(self): + pass # TODO + def test_acquire_2x(self): # Second acquire should do nothing, return True self.q.queries.getNextJobInQueue.return_value = None diff --git a/continuousprint/script_runner.py 
b/continuousprint/script_runner.py index ca2b629..78eef1c 100644 --- a/continuousprint/script_runner.py +++ b/continuousprint/script_runner.py @@ -5,7 +5,7 @@ from octoprint.filemanager.destinations import FileDestinations from octoprint.printer import InvalidFileLocation, InvalidFileType from .storage.lan import ResolveError -from .data import Keys, TEMP_FILES +from .data import Keys, TEMP_FILES, CustomEvents class ScriptRunner: @@ -17,6 +17,7 @@ def __init__( logger, printer, refresh_ui_state, + fire_event, ): self._msg = msg self._get_key = get_key @@ -24,6 +25,7 @@ def __init__( self._logger = logger self._printer = printer self._refresh_ui_state = refresh_ui_state + self._fire_event = fire_event def _wrap_stream(self, name, gcode): return StreamWrapper(name, BytesIO(gcode.encode("utf-8"))) @@ -44,20 +46,25 @@ def _execute_gcode(self, key): def run_finish_script(self): self._msg("Print Queue Complete", type="complete") - return self._execute_gcode(Keys.FINISHED_SCRIPT) + result = self._execute_gcode(Keys.FINISHED_SCRIPT) + self._fire_event(CustomEvents.FINISH) + return result def cancel_print(self): self._msg("Print cancelled", type="error") self._printer.cancel_print() + self._fire_event(CustomEvents.CANCEL) def start_cooldown(self): self._msg("Running bed cooldown script") self._execute_gcode(Keys.BED_COOLDOWN_SCRIPT) self._printer.set_temperature("bed", 0) # turn bed off + self._fire_event(CustomEvents.COOLDOWN) def clear_bed(self): self._msg("Clearing bed") self._execute_gcode(Keys.CLEARING_SCRIPT) + self._fire_event(CustomEvents.CLEAR_BED) def start_print(self, item): self._msg(f"{item.job.name}: printing {item.path}") @@ -73,13 +80,14 @@ def start_print(self, item): path = item.resolve() except ResolveError as e: self._logger.error(e) - self._msg(f"Could not resolve LAN print path {path}", type="error") + self._msg(f"Could not resolve LAN print path for {path}", type="error") return False self._logger.info(f"Resolved LAN print path to {path}") try: self._logger.info(f"Attempting to print {path} (sd={item.sd})") self._printer.select_file(path, sd=item.sd, printAfterSelect=True) + self._fire_event(CustomEvents.START_PRINT) except InvalidFileLocation as e: self._logger.error(e) self._msg("File not found: " + path, type="error") diff --git a/continuousprint/script_runner_test.py b/continuousprint/script_runner_test.py index 416567b..163b446 100644 --- a/continuousprint/script_runner_test.py +++ b/continuousprint/script_runner_test.py @@ -3,6 +3,7 @@ from collections import namedtuple from unittest.mock import MagicMock from .script_runner import ScriptRunner +from .data import CustomEvents import logging logging.basicConfig(level=logging.DEBUG) @@ -20,6 +21,7 @@ def setUp(self): logger=logging.getLogger(), printer=MagicMock(), refresh_ui_state=MagicMock(), + fire_event=MagicMock(), ) self.s._wrap_stream = MagicMock(return_value=None) @@ -31,10 +33,12 @@ def test_run_finish_script(self): sd=False, printAfterSelect=True, ) + self.s._fire_event.assert_called_with(CustomEvents.FINISH) def test_cancel_print(self): self.s.cancel_print() self.s._printer.cancel_print.assert_called() + self.s._fire_event.assert_called_with(CustomEvents.CANCEL) def test_clear_bed(self): self.s.clear_bed() @@ -43,18 +47,21 @@ def test_clear_bed(self): sd=False, printAfterSelect=True, ) + self.s._fire_event.assert_called_with(CustomEvents.CLEAR_BED) def test_start_print_local(self): self.assertEqual(self.s.start_print(LI(False, "a.gcode", LJ("job1"))), True) self.s._printer.select_file.assert_called_with( 
"a.gcode", sd=False, printAfterSelect=True ) + self.s._fire_event.assert_called_with(CustomEvents.START_PRINT) def test_start_print_sd(self): self.assertEqual(self.s.start_print(LI(True, "a.gcode", LJ("job1"))), True) self.s._printer.select_file.assert_called_with( "a.gcode", sd=True, printAfterSelect=True ) + self.s._fire_event.assert_called_with(CustomEvents.START_PRINT) def test_start_print_lan(self): class NetItem: @@ -69,11 +76,14 @@ def resolve(self): self.s._printer.select_file.assert_called_with( "net/a.gcode", sd=False, printAfterSelect=True ) + self.s._fire_event.assert_called_with(CustomEvents.START_PRINT) def test_start_print_invalid_location(self): self.s._printer.select_file.side_effect = InvalidFileLocation() self.assertEqual(self.s.start_print(LI(True, "a.gcode", LJ("job1"))), False) + self.s._fire_event.assert_not_called() def test_start_print_invalid_filetype(self): self.s._printer.select_file.side_effect = InvalidFileType() self.assertEqual(self.s.start_print(LI(True, "a.gcode", LJ("job1"))), False) + self.s._fire_event.assert_not_called() diff --git a/continuousprint/static/css/continuousprint.css b/continuousprint/static/css/continuousprint.css index 517065d..2f38e72 100644 --- a/continuousprint/static/css/continuousprint.css +++ b/continuousprint/static/css/continuousprint.css @@ -4,6 +4,11 @@ --cpq-pad2: 20px; } +@font-face { + font-family: Cabin Regular; + src: url(../ttf/Cabin/Cabin-Regular.ttf); +} + #files .cpq-gjob div[title="Load"] { display: none !important; } @@ -140,6 +145,12 @@ padding-right: var(--cpq-pad); padding-top: var(--cpq-pad); } +#tab_plugin_continuousprint .job.draft .job-gutter .text-error { + width: 100%; + line-height: 1; + text-align: right; + padding-right: var(--cpq-pad2); +} #tab_plugin_continuousprint .job .floatedit { position: absolute; top: var(--cpq-pad2); @@ -165,17 +176,59 @@ margin-bottom: 0; line-height: 1.6; } -#tab_plugin_continuousprint .queue-row-container .total { - width: 40px; +#tab_plugin_continuousprint .total, +#tab_plugin_continuousprint .completed, +#tab_plugin_continuousprint .remaining, +#tab_plugin_continuousprint .count { + width: 67px; text-align: center; + margin-left: var(--cpq-pad); + padding: 2px; + border: 1px transparent solid; } -#tab_plugin_continuousprint .loading { +#tab_plugin_continuousprint .cp-queue .loading { opacity: 0.3; cursor: default !important; } +#tab_plugin_continuousprint .edit-header { + display: flex; + justify-content: space-between; + font-weight: bold; + opacity: 0.5; +} +#tab_plugin_continuousprint .job-stats { + display: flex; + justify-content: space-between; + font-weight: bold; + border-top: 2px #ccc solid; +} +#tab_plugin_continuousprint .has_title { + cursor: help; +} +#tab_plugin_continuousprint .has_title > sup { + opacity: 0.5; +} +#tab_plugin_continuousprint .progresstext { + width: 100%; + position: absolute; + text-align:center; +} +#tab_plugin_continuousprint .set_warning { + flex: 1; + text-align: center; + margin-right: 17px; +} +#tab_plugin_continuousprint .draft .set_warning { + flex: 0; +} +#tab_plugin_continuousprint .sets .progress { + margin-left: 17px; +} #tab_plugin_continuousprint .progress { - width: 150px; + position: relative; + flex: 1; display: flex; /* allow for stacking progresses in jobs */ + margin-right: var(--cpq-pad2); } #tab_plugin_continuousprint .accordion-heading-button { padding: var(--cpq-pad) var(--cpq-pad); @@ -195,17 +248,27 @@ #tab_plugin_continuousprint .accordion-body.collapse.in { max-height: 500px; } +.cpq_logo { + height: 40px; + 
float: left; +} +#settings_plugin_continuousprint .cpq_title { + font-family: "Cabin Regular"; + font-size: 2em; + display: flex; + align-items: center; +} +#settings_plugin_continuousprint .cpq_title > img { + width: 71px; +} #tab_plugin_continuousprint .queue-header, #settings_continuousprint_queues .queue-header { display: flex; justify-content: space-between; font-weight: bold; - padding: var(--cpq-pad) 8px 0 28px; opacity: 0.5; margin-left: 55px; -} -#tab_plugin_continuousprint .repeats { - width: 85px; - text-align: center; + margin-right: var(--cpq-pad2); + align-items: flex-end; } #tab_plugin_continuousprint .queue-header > div, #settings_continuousprint_queues .queue-header > div { text-align: center; @@ -267,12 +330,11 @@ } #tab_plugin_continuousprint .count-box{ - text-align: left; min-width:30px; - max-width: 30px; max-height:15px; - margin-bottom: 3px; -moz-appearance: textfield; + text-align: center; + border: 1px #ccc solid !important; } #tab_plugin_continuousprint .count-box::-webkit-outer-spin-button, #tab_plugin_continuousprint .count-box::-webkit-inner-spin-button { @@ -402,5 +464,3 @@ margin-left: 5px; margin-right: 5px; } - -/* ======== */ diff --git a/continuousprint/static/img/CPQ.gif b/continuousprint/static/img/CPQ.gif new file mode 100644 index 0000000..0655f63 Binary files /dev/null and b/continuousprint/static/img/CPQ.gif differ diff --git a/continuousprint/static/img/CPQ.jpg b/continuousprint/static/img/CPQ.jpg new file mode 100644 index 0000000..9830f9b Binary files /dev/null and b/continuousprint/static/img/CPQ.jpg differ diff --git a/continuousprint/static/img/CPQ.png b/continuousprint/static/img/CPQ.png new file mode 100644 index 0000000..86f4222 Binary files /dev/null and b/continuousprint/static/img/CPQ.png differ diff --git a/continuousprint/static/img/CPQ.svg b/continuousprint/static/img/CPQ.svg new file mode 100644 index 0000000..3d0785a --- /dev/null +++ b/continuousprint/static/img/CPQ.svg @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + diff --git a/continuousprint/static/img/CPQ.tif b/continuousprint/static/img/CPQ.tif new file mode 100644 index 0000000..033b36f Binary files /dev/null and b/continuousprint/static/img/CPQ.tif differ diff --git a/continuousprint/static/js/continuousprint_api.js b/continuousprint/static/js/continuousprint_api.js index 1d151b7..1346fde 100644 --- a/continuousprint/static/js/continuousprint_api.js +++ b/continuousprint/static/js/continuousprint_api.js @@ -35,14 +35,14 @@ class CPAPI { } }, error: (xhr, textstatus, errThrown) => { + if (blocking) { + self.loading(false); + } if (err_cb) { err_cb(xhr.status, errThrown); } else { self.default_err_cb(xhr.status, errThrown); } - if (blocking) { - self.loading(false); - } } }); } diff --git a/continuousprint/static/js/continuousprint_job.js b/continuousprint/static/js/continuousprint_job.js index 1a10c29..207acab 100644 --- a/continuousprint/static/js/continuousprint_job.js +++ b/continuousprint/static/js/continuousprint_job.js @@ -20,22 +20,22 @@ function CPJob(obj, peers, api, profile) { } var self = this; - obj = {...{sets: [], name: "", draft: false, count: 1, queue: "default", id: -1}, ...obj}; + obj = {...{sets: [], name: "", draft: false, count: 1, id: -1}, ...obj}; if (obj.remaining === undefined) { obj.remaining = obj.count; } self.id = ko.observable(obj.id); self._name = ko.observable(obj.name || ""); - if (obj.acquired) { - self.acquiredBy = ko.observable('local'); - } else if (obj.acquired_by_) { + if (obj.acquired_by_) { let peer = peers[obj.acquired_by_]; if (peer 
!== undefined) { - self.acquiredBy = ko.observable(peer.name); + self.acquiredBy = ko.observable(`${peer.name} (${peer.profile.name})`); } else { self.acquiredBy = ko.observable(obj.acquired_by_) } + } else if (obj.acquired) { + self.acquiredBy = ko.observable('local'); } else { self.acquiredBy = ko.observable(); } @@ -53,9 +53,9 @@ function CPJob(obj, peers, api, profile) { self._update = function(result) { self.draft(result.draft); - self.count(result.count); - self.remaining(result.remaining); // Adjusted when count is mutated - self.completed(result.count - result.remaining); // Adjusted when count is mutated + self.count(result.count); // Adjusted when remaining is mutated + self.remaining(result.remaining); + self.completed(result.count - result.remaining); // Adjusted when remaining is mutated self.id(result.id); // May change if no id to start with self._name(result.name); let cpss = []; @@ -82,7 +82,7 @@ function CPJob(obj, peers, api, profile) { } self.editStart = function() { - api.edit(api.JOB, {id: self.id(), draft: true}, () => { + api.edit(api.JOB, {queue: obj.queue, id: self.id(), draft: true}, () => { self.draft(true); }); } @@ -96,26 +96,38 @@ function CPJob(obj, peers, api, profile) { self.sets.push(newqs); } self.editCancel = function() { - api.edit(api.JOB, {id: self.id(), draft: false}, self._update); + api.edit(api.JOB, {queue: obj.queue, id: self.id(), draft: false}, self._update); + } + self.onBlur = function(vm, e) { + let cl = e.target.classList; + let v = parseInt(e.target.value, 10); + if (isNaN(v)) { + return; + } + vm.count(vm.completed() + v); } self.editEnd = function() { let data = self.as_object(); data.draft = false; + data.queue = obj.queue; api.edit(api.JOB, data, self._update); } - self.length = ko.computed(function() { - let l = 0; - let c = self.count(); - for (let qs of self.sets()) { - l += qs.count()*c; + self._safeParse = function(v) { + v = parseInt(v, 10); + if (isNaN(v)) { + return 0; } - return l; - }); - self.length_completed = ko.computed(function() { - let r = 0; + return v; + } + + self.totals = ko.computed(function() { + let r = {count: 0, completed: 0, remaining: 0, total: 0}; for (let qs of self.sets()) { - r += qs.length_completed(); + r.remaining += self._safeParse(qs.remaining()); + r.total += self._safeParse(qs.length_remaining()); + r.count += self._safeParse(qs.count()); + r.completed += self._safeParse(qs.completed()); } return r; }); diff --git a/continuousprint/static/js/continuousprint_job.test.js b/continuousprint/static/js/continuousprint_job.test.js index 5be3c32..25d5039 100644 --- a/continuousprint/static/js/continuousprint_job.test.js +++ b/continuousprint/static/js/continuousprint_job.test.js @@ -41,12 +41,16 @@ test('onSetModified existing', () => { expect(j.sets()[1].path()).toBe('asdf'); }); -test('length and length_completed', () => { - let j = new Job({count: 3, remaining: 1, sets: sets()}, [], api()); +test('totals', () => { + let j = new Job({count: 3, completed: 1, remaining: 1, sets: sets()}, [], api()); // 2 jobs done, each with 2 sets of 2 --> 8 // plus an extra 1 each in current run --> 10 - expect(j.length_completed()).toBe(10); - expect(j.length()).toBe(12); + expect(j.totals()).toStrictEqual({ + completed: 0, + count: 4, + remaining: 2, + total: 2, + }); }); test('checkFraction', () => { diff --git a/continuousprint/static/js/continuousprint_queue.js b/continuousprint/static/js/continuousprint_queue.js index 34ca41f..1b94631 100644 --- a/continuousprint/static/js/continuousprint_queue.js +++ 
b/continuousprint/static/js/continuousprint_queue.js @@ -31,6 +31,7 @@ function CPQueue(data, api, files, profile) { self.shiftsel = ko.observable(-1); self.details = ko.observable(""); self.fullDetails = ko.observable(""); + self.ready = ko.observable(data.name === 'local' || Object.keys(data.peers).length > 0); if (self.addr !== null && data.peers !== undefined) { let pkeys = Object.keys(data.peers); if (pkeys.length === 0) { @@ -94,18 +95,18 @@ function CPQueue(data, api, files, profile) { break; case "Unstarted Jobs": for (let j of self.jobs()) { - j.onChecked(j.sets().length !== 0 && j.length_completed() === 0); + j.onChecked(j.sets().length !== 0 && j.totals().completed === 0); } break; case "Incomplete Jobs": for (let j of self.jobs()) { - let lc = j.length_completed(); - j.onChecked(lc > 0 && lc < j.length()); + let t = j.totals(); + j.onChecked(t.remaining > 0 && t.remaining < t.count); } break; case "Completed Jobs": for (let j of self.jobs()) { - j.onChecked(j.sets().length !== 0 && j.length_completed() >= j.length()); + j.onChecked(j.sets().length !== 0 && j.totals().remaining == 0); } break; default: @@ -252,6 +253,15 @@ function CPQueue(data, api, files, profile) { }); } + self.hasDraftJobs = function() { + for (let j of self.jobs()) { + if (j.draft()) { + return true; + } + } + return false; + } + self.addFile = function(data, infer_profile=false) { if (data.path.endsWith('.gjob')) { // .gjob import has a different API path @@ -291,7 +301,7 @@ function CPQueue(data, api, files, profile) { set_data['job'] = null; // Invoking API causes a new job to be created self.api.add(self.api.SET, set_data, (response) => { - return self._pushJob({id: response.job_id, name: set_data['jobName'], draft: true, count: 1, sets: [response.set_]}); + return self._pushJob({queue: self.name, id: response.job_id, name: set_data['jobName'], draft: true, count: 1, sets: [response.set_]}); }); }; diff --git a/continuousprint/static/js/continuousprint_queue.test.js b/continuousprint/static/js/continuousprint_queue.test.js index e6889f8..a3ec702 100644 --- a/continuousprint/static/js/continuousprint_queue.test.js +++ b/continuousprint/static/js/continuousprint_queue.test.js @@ -33,7 +33,9 @@ function items(njobs = 1, nsets = 2) { } function init(njobs = 1) { - return new VM({name:"test", jobs:items(njobs)}, mocks()); + return new VM({name:"test", jobs:items(njobs), peers:[ + {name: "localhost", profile: {name: "profile"}, status: "IDLE"} + ]}, mocks()); } test('newEmptyJob', () => { @@ -76,9 +78,13 @@ describe('batchSelect', () => { // job 2 is unstarted; no action needed v.jobs()[2].sets()[0].count(3); // Job 3 is incomplete, set 5 is incomplete v.jobs()[2].sets()[0].completed(1); + v.jobs()[2].sets()[0].remaining(2); v.jobs()[3].remaining(0); // job 4 is complete + v.jobs()[3].completed(1); v.jobs()[3].sets()[0].completed(1); // job 4 is complete + v.jobs()[3].sets()[0].remaining(0); // job 4 is complete v.jobs()[3].sets()[1].completed(1); // job 4 is complete + v.jobs()[3].sets()[1].remaining(0); // job 4 is complete let cases = [ // mode, jobids ['None', []], diff --git a/continuousprint/static/js/continuousprint_set.js b/continuousprint/static/js/continuousprint_set.js index 2076ca8..51e230b 100644 --- a/continuousprint/static/js/continuousprint_set.js +++ b/continuousprint/static/js/continuousprint_set.js @@ -23,7 +23,7 @@ function CPSet(data, job, api, profile) { self.count = ko.observable(data.count); self.missing_file = ko.observable(data.missing_file); self.remaining = 
ko.observable((data.remaining !== undefined) ? data.remaining : data.count); - self.completed = ko.observable(data.count - self.remaining()); // Not computed to allow for edits without changing + self.completed = ko.observable((data.completed !== undefined) ? data.completed : 0); self.expanded = ko.observable(data.expanded); self.mats = ko.observable(data.materials || []); self.profiles = ko.observableArray(data.profiles || []); @@ -57,17 +57,23 @@ function CPSet(data, job, api, profile) { profiles: self.profiles(), }; } - self.length = ko.computed(function() { - return job.count() * self.count(); - }); - self.length_completed = ko.computed(function() { - let job_completed = job.completed(); - if (job_completed === job.count()) { - job_completed -= 1; // Prevent double-counting the end of the job + self.length_remaining = ko.computed(function() { + let c = parseInt(self.count(), 10); + let r = parseInt(self.remaining(), 10); + let jr = parseInt(job.remaining(), 10); + if (isNaN(c) || isNaN(r) || isNaN(jr)) { + return ' '; } - return self.completed() + self.count()*job_completed; - return result; + return Math.max(0, (jr-1) * c + r); }); + self.onCountFocus = function() { + self._tmp_count = self.count(); + }; + self.onCountBlur = function() { + if (self.count() !== self._tmp_count) { + self.remaining(self.count()); + }; + }; self.remove = function() { // Just remove from UI - actual removal happens when saving the job job.sets.remove(self); @@ -115,14 +121,20 @@ function CPSet(data, job, api, profile) { return result; }); self.pct_complete = ko.computed(function() { - return Math.max(0, Math.round(100 * self.completed()/self.count())) + '%'; + let cplt = self.completed(); + let total = cplt + self.remaining(); + return Math.max(0, Math.round(100 * cplt/total)) + '%'; }); self.pct_active = ko.computed(function() { - return Math.max(0, Math.round(100 / self.count())) + '%'; + return Math.max(0, Math.round(100 / (self.completed() + self.remaining()))) + '%'; + }); + self.progress_text = ko.computed(function() { + let cplt = self.completed(); + let total = cplt + self.remaining(); + return `${cplt}/${total}`; }); // ==== Mutation methods ==== - self.set_material = function(t, v) { if (v === "Any") { v = ''; diff --git a/continuousprint/static/js/continuousprint_set.test.js b/continuousprint/static/js/continuousprint_set.test.js index bfce752..9c3ced6 100644 --- a/continuousprint/static/js/continuousprint_set.test.js +++ b/continuousprint/static/js/continuousprint_set.test.js @@ -22,9 +22,8 @@ function api() { } test('aggregate observables', () => { let i = new CPSet(data(), job, api()); - expect(i.length()).toBe(6); expect(i.path()).toBe("item.gcode"); - expect(i.length_completed()).toBe(5); + expect(i.length_remaining()).toBe(1); }); test('materials', () => { @@ -40,29 +39,31 @@ test('pct_complete', () => { let i = new CPSet(data(), job, api()); i.count(5); i.remaining(3); + i.completed(2); expect(i.pct_complete()).toBe("40%"); }); test('pct_active', () => { let i = new CPSet(data(), job, api()); i.count(5); + i.remaining(3); + i.completed(2); expect(i.pct_active()).toBe("20%"); }); -test('length_completed', () => { - let i = new CPSet(data(count=1), {count: () => 1, remaining: () => 1, completed: () => 0}); - expect(i.length_completed()).toBe(0); +test('shortName', () => { + let i = new CPSet({path: "/foo/bar.gcode"}, job, api()); + expect(i.shortName()).toEqual("bar.gcode"); +}); - // Completing a job should increase the length - i.remaining(0); - i.completed(1); - 
expect(i.length_completed()).toBe(1); +test('length_remaining', () => { + let i = new CPSet(data(count=1), {count: () => 1, remaining: () => 1, completed: () => 0}); + expect(i.length_remaining()).toBe(1); - // Job completion shouldn't double-count - i = new CPSet(data(count=1), {count: () => 1, remaining: () => 0, completed: () => 1}); + // Completing a job should decrease the length i.remaining(0); i.completed(1); - expect(i.length_completed()).toBe(1); + expect(i.length_remaining()).toBe(0); }); test('set_material', () => { diff --git a/continuousprint/static/js/continuousprint_viewmodel.js b/continuousprint/static/js/continuousprint_viewmodel.js index 1c554bb..44232e0 100644 --- a/continuousprint/static/js/continuousprint_viewmodel.js +++ b/continuousprint/static/js/continuousprint_viewmodel.js @@ -67,7 +67,16 @@ function CPViewModel(parameters) { // Patch the files panel to allow for adding to queue self.files.add = function(data) { - self.defaultQueue.addFile(data, self.settings.settings.plugins.continuousprint.cp_infer_profile() || false); + // We first look for any queues with draft jobs - add the file here if so + // Otherwise it goes into the default queue. + let fq = self.defaultQueue; + for (let q of self.queues()) { + if (q.hasDraftJobs()) { + fq = q; + break; + } + } + fq.addFile(data, self.settings.settings.plugins.continuousprint.cp_infer_profile() || false); }; // Also patch file deletion, to show a modal if the file is in the queue let oldRemove = self.files.removeFile; @@ -167,11 +176,11 @@ function CPViewModel(parameters) { } let cpq = new CPQueue(q, self.api, self.files, self.profile); - // Replace draft entries + // Replace draft entries that are still in draft let cpqj = cpq.jobs(); for (let i = 0; i < q.jobs.length; i++) { let draft = drafts[cpqj[i].id()]; - if (draft !== undefined) { + if (draft !== undefined && cpqj[i].draft()) { cpq.jobs.splice(i, 1, draft); } } @@ -220,42 +229,43 @@ function CPViewModel(parameters) { self.draggingJob(vm.constructor.name === "CPJob"); }; - self.sortEnd = function(evt, vm, src) { + self._getElemIdx = function(elem, pcls) { + while (!elem.classList.contains(pcls)) { + elem = elem.parentElement; + } + let siblings = elem.parentElement.children; + for (let i=0; i < siblings.length; i++) { + if (siblings[i] === elem) { + return i; + } + } + return null; + }; + + self.sortEnd = function(evt, vm, src, idx=null) { // Re-enable default drag and drop behavior self.files.onServerConnect(); self.draggingSet(false); self.draggingJob(false); - // If we're dragging a job out of the local queue and into a network queue, - // we must warn the user about the irreversable action before making the change. - // This fully replaces the default sort action - if (evt.from.classList.contains("local") && !evt.to.classList.contains("local")) { - let targetQueue = ko.dataFor(evt.to); - // Undo the move done by CPSortable and trigger updates - // This is inefficient (i.e. could instead prevent the transfer) but that - // would require substantial edits to the CPSortable library. 
- targetQueue.jobs.splice(evt.newIndex, 1); - src.jobs.splice(evt.oldIndex, 0, vm); - src.jobs.valueHasMutated(); - targetQueue.jobs.valueHasMutated(); - - return self.showSubmitJobDialog(vm, targetQueue); - } - // Sadly there's no "destination job" information, so we have to // infer the index of the job based on the rendered HTML given by evt.to if (vm.constructor.name === "CPJob") { - let jobs = self.defaultQueue.jobs(); - let dest_idx = jobs.indexOf(vm); - - let ids = [] - for (let j of jobs) { - ids.push(j.id()); - } + let destq = self.queues()[self._getElemIdx(evt.to, "cp-queue")]; + let dest_idx = evt.newIndex; self.api.mv(self.api.JOB, { + src_queue: src.name, + dest_queue: destq.name, id: vm.id(), - after_id: (dest_idx > 0) ? jobs[dest_idx-1].id() : -1 - }, (result) => {}); + after_id: (dest_idx > 0) ? destq.jobs()[dest_idx-1].id() : null + }, (result) => { + if (result.error) { + self.onDataUpdaterPluginMessage("continuousprint", {type: "error", msg: result.error}); + } + // Refresh in case of error or if the move modified job internals (e.g. on job submission) + // Do this as a timeout to allow for UI rendering / RPC calls to complete + setTimeout(self._loadState, 0); + }); } }; @@ -268,6 +278,10 @@ function CPViewModel(parameters) { if (evt.from.id === "queue_sets" && !evt.to.classList.contains("draft")) { return false; } + // No dragging items in non-ready queues + if (evt.to.classList.contains("loading")) { + return false; + } return true; }; @@ -342,35 +356,6 @@ function CPViewModel(parameters) { self.hasSpoolManager(statusCode !== 404); }); - - self.dialog = $("#cpq_submitDialog"); - self.jobSendDetails = ko.observable(); - self.jobSendTitle = ko.computed(function() { - let details = self.jobSendDetails(); - if (details === undefined) { - return ""; - } - return `Send ${details[0]._name()} to ${details[1].name}?`; - }); - self.submitJob = function() { - let details = self.jobSendDetails(); - self.api.submit(self.api.JOB, {id: details[0].id(), queue: details[1].name}, (result) => { - if (result.error) { - self.onDataUpdaterPluginMessage("continuousprint", {type: "error", msg: result.error}); - } else { - self._setState(result); - } - }); - self.dialog.modal('hide'); - } - self.showSubmitJobDialog = function(job, queue) { - self.jobSendDetails([job, queue]); - self.dialog.modal({}).css({ - width: 'auto', - 'margin-left': function() { return -($(this).width() /2); } - }); - } - self.humanize = function(num) { // Humanizes numbers by condensing and adding units if (num < 1000) { @@ -384,6 +369,17 @@ function CPViewModel(parameters) { } }; + self.hasDraftJob = ko.computed(function() { + for (let q of self.queues()) { + for (let j of q.jobs()) { + if (j.draft()) { + return true; + } + } + } + return false; + }); + /* ===== History Tab ===== */ self.history = ko.observableArray(); self.isDivider = function(data) { diff --git a/continuousprint/static/js/continuousprint_viewmodel.test.js b/continuousprint/static/js/continuousprint_viewmodel.test.js index 36fcac8..5c75add 100644 --- a/continuousprint/static/js/continuousprint_viewmodel.test.js +++ b/continuousprint/static/js/continuousprint_viewmodel.test.js @@ -148,23 +148,25 @@ describe('sortMove', () => { test('sortEnd job to start', () => { let v = init(); + v._getElemIdx = () => 0; // To get queue let ccont = {classList: {contains: () => true}}; - let evt = {from: ccont, to: ccont}; + let evt = {from: ccont, to: ccont, newIndex: 0}; let j = v.defaultQueue.jobs()[0]; - v.sortEnd(evt, j, null); + v.sortEnd(evt, j, v.defaultQueue, 
dataFor=function(elem) {return v.defaultQueue}); expect(v.files.onServerConnect).toHaveBeenCalled(); expect(v.api.mv).toHaveBeenCalled(); let data = v.api.mv.mock.calls[0][1]; expect(data.id).toEqual(j.id()); - expect(data.after_id).toEqual(-1); + expect(data.after_id).toEqual(null); }); test('sortEnd job to end', () => { let v = init(njobs=2); + v._getElemIdx = () => 0; // To get queue let ccont = {classList: {contains: () => true}}; - let evt = {from: ccont, to: ccont}; + let evt = {from: ccont, to: ccont, newIndex: 1}; let j = v.defaultQueue.jobs()[1]; - v.sortEnd(evt, j, null); + v.sortEnd(evt, j, v.defaultQueue, dataFor=function(elem) {return v.defaultQueue}); expect(v.files.onServerConnect).toHaveBeenCalled(); expect(v.api.mv).toHaveBeenCalled(); let data = v.api.mv.mock.calls[0][1]; diff --git a/continuousprint/static/ttf/Cabin/Cabin-Bold.ttf b/continuousprint/static/ttf/Cabin/Cabin-Bold.ttf new file mode 100644 index 0000000..4f5a57f Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-Bold.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-BoldItalic.ttf b/continuousprint/static/ttf/Cabin/Cabin-BoldItalic.ttf new file mode 100644 index 0000000..1118400 Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-BoldItalic.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-Italic.ttf b/continuousprint/static/ttf/Cabin/Cabin-Italic.ttf new file mode 100644 index 0000000..a49b9fb Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-Italic.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-Medium.ttf b/continuousprint/static/ttf/Cabin/Cabin-Medium.ttf new file mode 100644 index 0000000..069b842 Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-Medium.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-MediumItalic.ttf b/continuousprint/static/ttf/Cabin/Cabin-MediumItalic.ttf new file mode 100644 index 0000000..0113ddb Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-MediumItalic.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-Regular.ttf b/continuousprint/static/ttf/Cabin/Cabin-Regular.ttf new file mode 100644 index 0000000..4b78301 Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-Regular.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-SemiBold.ttf b/continuousprint/static/ttf/Cabin/Cabin-SemiBold.ttf new file mode 100644 index 0000000..bf78eb9 Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-SemiBold.ttf differ diff --git a/continuousprint/static/ttf/Cabin/Cabin-SemiBoldItalic.ttf b/continuousprint/static/ttf/Cabin/Cabin-SemiBoldItalic.ttf new file mode 100644 index 0000000..3546fa8 Binary files /dev/null and b/continuousprint/static/ttf/Cabin/Cabin-SemiBoldItalic.ttf differ diff --git a/continuousprint/storage/database.py b/continuousprint/storage/database.py index b8b44d7..98ffe94 100644 --- a/continuousprint/storage/database.py +++ b/continuousprint/storage/database.py @@ -113,6 +113,7 @@ def as_dict(self): sets.sort(key=lambda s: s.rank) sets = [s.as_dict() for s in sets] d = dict( + queue=self.queue.name, name=self.name, count=self.count, draft=self.draft, @@ -151,7 +152,7 @@ def from_dict(self, data: dict): return j def refresh_sets(self): - Set.update(remaining=Set.count).where(Set.job == self).execute() + Set.update(remaining=Set.count, completed=0).where(Set.job == self).execute() class SetView: @@ -176,6 +177,7 @@ def is_printable(self, profile): def decrement(self, profile): 
self.remaining = max(0, self.remaining - 1) + self.completed += 1 self.save() # Save must occur before job is observed return self.job.next_set(profile) @@ -193,6 +195,7 @@ def as_dict(self): rank=self.rank, sd=self.sd, remaining=self.remaining, + completed=self.completed, ) @@ -203,7 +206,17 @@ class Set(Model, SetView): rank = FloatField() count = IntegerField(default=1, constraints=[Check("count >= 0")]) remaining = IntegerField( - default=1, constraints=[Check("remaining >= 0"), Check("remaining <= count")] + # Unlike Job, Sets can have remaining > count if the user wants to print + # additional sets as a one-off correction (e.g. a print fails) + default=1, + constraints=[Check("remaining >= 0")], + ) + completed = IntegerField( + # Due to one-off corrections to "remaining", it's important to track + # completions separately as completed + remaining != count + # This is different from jobs, where this equation holds. + default=0, + constraints=[Check("completed >= 0")], ) # This is a CSV of material key strings referencing SpoolManager entities @@ -276,7 +289,7 @@ def file_exists(path: str) -> bool: def populate(): DB.queues.create_tables(MODELS) - StorageDetails.create(schemaVersion="0.0.2") + StorageDetails.create(schemaVersion="0.0.3") Queue.create(name=LAN_QUEUE, addr="auto", strategy="LINEAR", rank=1) Queue.create(name=DEFAULT_QUEUE, strategy="LINEAR", rank=0) Queue.create(name=ARCHIVE_QUEUE, strategy="LINEAR", rank=-1) @@ -308,7 +321,43 @@ def init(db_path="queues.sqlite3", logger=None): details.schemaVersion = "0.0.2" details.save() - if details.schemaVersion != "0.0.2": + if details.schemaVersion == "0.0.2": + # Constraint removal isn't allowed in sqlite, so we have + # to recreate the table and move the entries over. + # We also added a new `completed` field, so some calculation is needed. 
+ class TempSet(Set): + pass + + if logger is not None: + logger.warning( + f"Beginning migration to v0.0.3 for decoupled completions - {Set.select().count()} sets to migrate" + ) + with db.atomic(): + TempSet.create_table(safe=True) + for s in Set.select( + Set.path, + Set.sd, + Set.job, + Set.rank, + Set.count, + Set.remaining, + Set.material_keys, + Set.profile_keys, + ).execute(): + attrs = {} + for f in Set._meta.sorted_field_names: + attrs[f] = getattr(s, f) + attrs["completed"] = max(0, attrs["count"] - attrs["remaining"]) + TempSet.create(**attrs) + if logger is not None: + logger.warning(f"Migrating set {s.path} to schema v0.0.3") + + Set.drop_table(safe=True) + db.execute_sql('ALTER TABLE "tempset" RENAME TO "set";') + details.schemaVersion = "0.0.3" + details.save() + + if details.schemaVersion != "0.0.3": raise Exception("Unknown DB schema version: " + details.schemaVersion) if logger is not None: @@ -316,6 +365,8 @@ def init(db_path="queues.sqlite3", logger=None): except Exception: raise Exception("Failed to fetch storage schema details!") + return db + def migrateFromSettings(data: list): # Prior to v2.0.0, all state for the plugin was stored in a json-serialized list @@ -360,6 +411,7 @@ def migrateFromSettings(data: list): rank=len(j.sets), count=1, remaining=0, + completed=0, material_keys=mkeys, ) else: diff --git a/continuousprint/storage/database_test.py b/continuousprint/storage/database_test.py index b121309..e8c9f33 100644 --- a/continuousprint/storage/database_test.py +++ b/continuousprint/storage/database_test.py @@ -8,6 +8,7 @@ Job, Set, Run, + StorageDetails, DEFAULT_QUEUE, ) import tempfile @@ -18,7 +19,7 @@ class DBTest(unittest.TestCase): def setUp(self): self.tmp = tempfile.NamedTemporaryFile(delete=True) - init_db(self.tmp.name, logger=logging.getLogger()) + self.db = init_db(self.tmp.name, logger=logging.getLogger()) self.q = Queue.get(name=DEFAULT_QUEUE) def tearDown(self): @@ -92,6 +93,30 @@ def testMigrationMidSet(self): self.assertEqual(s.count, 2) self.assertEqual(s.remaining, 1) + def testMigrationSchemav2tov3(self): + details = StorageDetails.select().limit(1).execute()[0] + details.schemaVersion = "0.0.2" + details.save() + q = Queue.get(name=DEFAULT_QUEUE) + j = Job.create(name="j", queue_id=q.id, rank=0) + s = Set.create( + path="foo.gcode", + remaining=3, + count=5, + completed=0, + sd=False, + job_id=j.id, + rank=1, + ) + + self.db = init_db(self.tmp.name, logger=logging.getLogger()) + + # Destination set both exists and has computed `completed` field. + # We don't actually check whether the constraints were properly applied, just assume that + # new table creation takes care of it. 
+ s2 = Set.get(s.id) + self.assertEqual(s2.completed, s.count - s.remaining) + class TestEmptyJob(DBTest): def setUp(self): @@ -157,10 +182,12 @@ def testDecrementUnstartedSet(self): def testDecrementCompletedSet(self): self.s.remaining = 0 + self.s.completed = 5 self.s.save() self.j.decrement() self.assertEqual(self.j.remaining, 4) self.assertEqual(self.j.sets[0].remaining, 5) + self.assertEqual(self.j.sets[0].completed, 0) def testDecrementPartialSet(self): self.s.remaining = 3 diff --git a/continuousprint/storage/lan.py b/continuousprint/storage/lan.py index 08c3360..a9b3c05 100644 --- a/continuousprint/storage/lan.py +++ b/continuousprint/storage/lan.py @@ -1,4 +1,6 @@ from .database import JobView, SetView +from pathlib import Path +from .queries import getint from requests.exceptions import HTTPError @@ -10,23 +12,38 @@ def __init__(self, lq): class LANJobView(JobView): def __init__(self, manifest, lq): - for attr in ("name", "count", "created"): - setattr(self, attr, manifest[attr]) - self.remaining = manifest.get("remaining", self.count) + # === Fields present in JobView === + self.name = manifest.get("name", "") + self.created = getint(manifest, "created") + self.count = getint(manifest, "count") + self.remaining = getint(manifest, "remaining", default=self.count) self.queue = LANQueueView(lq) self.id = manifest["id"] + self.updateSets(manifest["sets"]) + self.draft = manifest.get("draft", False) + self.acquired = manifest.get("acquired", False) + + # === LANJobView specific fields === self.peer = manifest["peer_"] - self.sets = [] - self.draft = False - self.acquired = None - self.sets = [LANSetView(s, self, i) for i, s in enumerate(manifest["sets"])] + self.hash = manifest.get("hash") + + def get_base_dir(self): + return self.queue.lq.get_gjob_dirpath(self.peer, self.hash) + + def updateSets(self, sets_list): + self.sets = [LANSetView(s, self, i) for i, s in enumerate(sets_list)] def save(self): - self.queue.lq.set_job(self.id, self.as_dict()) + # as_dict implemented in JobView doesn't handle LANJobView specific fields, so we must inject them here. 
+ d = self.as_dict() + d["peer_"] = self.peer + d["hash"] = self.hash + self.queue.lq.set_job(self.id, d) def refresh_sets(self): for s in self.sets: s.remaining = s.count + s.completed = 0 self.save() @@ -38,11 +55,12 @@ class LANSetView(SetView): def __init__(self, data, job, rank): self.job = job self.sd = False - self.rank = rank + self.rank = int(rank) self.id = f"{job.id}_{rank}" for attr in ("path", "count"): setattr(self, attr, data[attr]) - self.remaining = data.get("remaining", self.count) + self.remaining = getint(data, "remaining", default=self.count) + self.completed = getint(data, "completed") self.material_keys = ",".join(data.get("materials", [])) self.profile_keys = ",".join(data.get("profiles", [])) self._resolved = None @@ -50,9 +68,7 @@ def __init__(self, data, job, rank): def resolve(self) -> str: if self._resolved is None: try: - self._resolved = self.job.queue.lq.resolve_set( - self.job.peer, self.job.id, self.path - ) + self._resolved = str(Path(self.job.get_base_dir()) / self.path) except HTTPError as e: raise ResolveError(f"Failed to resolve {self.path}") from e return self._resolved diff --git a/continuousprint/storage/lan_test.py b/continuousprint/storage/lan_test.py index bd85495..2489b7d 100644 --- a/continuousprint/storage/lan_test.py +++ b/continuousprint/storage/lan_test.py @@ -22,18 +22,28 @@ def setUp(self): self.s = self.j.sets[0] def test_resolve_file(self): - self.lq.resolve_set.return_value = "/path/to/set.gcode" - self.assertEqual(self.s.resolve(), "/path/to/set.gcode") + self.lq.get_gjob_dirpath.return_value = "/path/to/" + self.assertEqual(self.s.resolve(), "/path/to/a.gcode") def test_resolve_http_error(self): - self.lq.resolve_set.side_effect = HTTPError + self.lq.get_gjob_dirpath.side_effect = HTTPError with self.assertRaises(ResolveError): self.s.resolve() def test_decrement_refreshes_sets_and_saves(self): self.s.remaining = 0 + self.s.completed = 5 self.j.decrement() self.lq.set_job.assert_called() self.assertEqual( self.lq.set_job.call_args[0][1]["sets"][0]["remaining"], self.s.count ) + self.assertEqual(self.lq.set_job.call_args[0][1]["sets"][0]["completed"], 0) + + def test_save_persists_peer_and_hash(self): + self.j.peer = "foo" + self.j.hash = "bar" + self.j.save() + data = self.lq.set_job.call_args[0][1] + self.assertEqual(data["peer_"], "foo") + self.assertEqual(data["hash"], "bar") diff --git a/continuousprint/storage/queries.py b/continuousprint/storage/queries.py index d40ffcb..e803c2f 100644 --- a/continuousprint/storage/queries.py +++ b/continuousprint/storage/queries.py @@ -11,6 +11,13 @@ MAX_COUNT = 999999 +def getint(d, k, default=0): + v = d.get(k, default) + if type(v) == str: + v = int(v) + return v + + def clearOldState(): # On init, scrub the local DB for any state that may have been left around # due to an improper shutdown @@ -152,12 +159,18 @@ def _upsertSet(set_id, data, job): for k, v in data.items(): if k in ( "id", - "count", - "remaining", "materials", "profiles", ): # ignored or handled below continue + + # parse and limit integer values + if k in ( + "count", + "remaining", + ): + v = min(int(v), MAX_COUNT) + setattr(s, k, v) s.job = job @@ -167,18 +180,6 @@ def _upsertSet(set_id, data, job): s.profile_keys = ",".join( ["" if p is None else p for p in data.get("profiles", [])] ) - - if data.get("count") is not None: - newCount = min(int(data["count"]), MAX_COUNT) - inc = newCount - s.count - s.count = newCount - s.remaining = min(newCount, s.remaining + max(inc, 0)) - # Boost job remaining if we would cause it to 
be incomplete - job_remaining = s.job.remaining - if inc > 0 and job_remaining == 0: - job_remaining = 1 - s.job.remaining = 1 - s.job.save() s.save() @@ -189,32 +190,19 @@ def updateJob(job_id, data, queue=DEFAULT_QUEUE): except Job.DoesNotExist: q = Queue.get(name=queue) j = newEmptyJob(q) + for k, v in data.items(): if k in ( "id", - "count", - "remaining", "sets", + "queue", ): # ignored or handled separately continue - setattr(j, k, v) - clear_sets = False - if data.get("count") is not None: - newCount = min(int(data["count"]), MAX_COUNT) - inc = newCount - j.count - j.remaining = min(newCount, j.remaining + max(inc, 0)) - - # Edge case: clear sets if we are adding 1 to a completed job, - # as this is otherwise suppressed by job-end accounting. - clear_sets = newCount > j.count - if clear_sets: - for s in j.sets: - if s.remaining != 0: - clear_sets = False - break - j.count = newCount - j.save() + # Parse and bound integer values + if k in ("count", "remaining"): + v = min(int(v), MAX_COUNT) + setattr(j, k, v) if data.get("sets") is not None: # Remove any missing sets @@ -227,9 +215,7 @@ def updateJob(job_id, data, queue=DEFAULT_QUEUE): s["rank"] = float(i) _upsertSet(s["id"], s, j) - if clear_sets: - j.refresh_sets() - + j.save() return Job.get(id=job_id).as_dict() @@ -256,18 +242,24 @@ def _rankEnd(): return time.time() -def _moveImpl(cls, src, dest_id: int, retried=False): - if dest_id == -1: +def _moveImpl(src, dest_id, retried=False): + if dest_id is None: destRank = 0 else: - destRank = cls.get(id=dest_id).rank + dest_id = int(dest_id) + destRank = Job.get(id=dest_id).rank # Get the next object having a rank beyond the destination rank, # so we can then split the difference # Note the unary '&' operator and the expressions wrapped in parens (a limitation of peewee) postRank = ( - cls.select(cls.rank) - .where((cls.rank > destRank) & (cls.id != src.id)) + Job.select(Job.rank) + .where( + (Job.rank > destRank) + & (Job.id != src.id) + & (Job.queue.name != ARCHIVE_QUEUE) + ) + .order_by(Job.rank) .limit(1) .execute() ) @@ -277,13 +269,16 @@ def _moveImpl(cls, src, dest_id: int, retried=False): postRank = MAX_RANK # Pick the target value as the midpoint between the two ranks candidate = abs(postRank - destRank) / 2 + min(postRank, destRank) + # print( + # f"_moveImpl abs({postRank} - {destRank})/2 + min({postRank}, {destRank}) = {candidate}" + # ) # We may end up with an invalid candidate if we hit a singularity - in this case, rebalance all the # rows and try again if candidate <= destRank or candidate >= postRank: if not retried: - _rankBalance(cls) - _moveImpl(cls, src, dest_id, retried=True) + _rankBalance(Job) + _moveImpl(src, dest_id, retried=True) else: raise Exception("Could not rebalance job rank to move job") else: @@ -293,18 +288,7 @@ def _moveImpl(cls, src, dest_id: int, retried=False): def moveJob(src_id: int, dest_id: int): j = Job.get(id=src_id) - return _moveImpl(Job, j, dest_id) - - -def moveSet(src_id: int, dest_id: int, dest_job: int): - s = Set.get(id=src_id) - if dest_job == -1: - j = newEmptyJob(s.job.queue) - else: - j = Job.get(id=dest_job) - s.job = j - s.save() - _moveImpl(Set, s, dest_id) + return _moveImpl(j, dest_id) def newEmptyJob(q, name="", rank=_rankEnd): @@ -337,7 +321,7 @@ def appendSet(queue: str, jid, data: dict, rank=_rankEnd): if j.is_dirty(): j.save() - count = int(data["count"]) + count = getint(data, "count") sd = data.get("sd", "false") s = Set.create( path=data["path"], @@ -346,7 +330,8 @@ def appendSet(queue: str, jid, data: dict, 
rank=_rankEnd): material_keys=",".join(data.get("materials", "")), profile_keys=",".join(data.get("profiles", "")), count=count, - remaining=count, + remaining=getint(data, "remaining", count), + completed=getint(data, "completed"), job=j, ) @@ -386,7 +371,11 @@ def resetJobs(job_ids: list): updated += ( Job.update(remaining=Job.count).where(Job.id.in_(job_ids)).execute() ) - updated += Set.update(remaining=Set.count).where(Set.job.in_(job_ids)).execute() + updated += ( + Set.update(remaining=Set.count, completed=0) + .where(Set.job.in_(job_ids)) + .execute() + ) return dict(num_updated=updated) diff --git a/continuousprint/storage/queries_test.py b/continuousprint/storage/queries_test.py index a5ff409..8f53b64 100644 --- a/continuousprint/storage/queries_test.py +++ b/continuousprint/storage/queries_test.py @@ -126,16 +126,6 @@ def testUpdateJobSetProfiles(self): q.updateJob(1, dict(sets=[dict(id=1, profiles=["a", "b"])])) self.assertEqual(Set.get(id=1).profiles(), ["a", "b"]) - def testUpdateJobIncrementCompleted(self): - # Incrementing the count of a completed job should refresh all sets within the job - j = Job.get(id=1) - s = Set.get(id=1) - s.remaining = 0 - s.save() - - q.updateJob(1, dict(count=j.count + 1)) - self.assertEqual(Set.get(id=1).remaining, 1) - def testRemoveJob(self): q.remove(job_ids=[1]) self.assertEqual(len(q.getJobsAndSets(DEFAULT_QUEUE)), 0) # No jobs or sets @@ -229,59 +219,47 @@ def testResetJob(self): j = Job.get(id=1) s = j.sets[0] s.remaining = 0 + s.completed = 3 j.remaining = 0 s.save() j.save() q.resetJobs([j.id]) # Replenishing the job replenishes all sets self.assertEqual(Set.get(id=s.id).remaining, 1) + self.assertEqual(Set.get(id=s.id).completed, 0) self.assertEqual(Job.get(id=j.id).remaining, 1) def testUpdateJobCount(self): + q.updateJob(1, dict(count=5, remaining=5)) j = Job.get(id=1) - for before, after in [ - ((1, 1), (2, 2)), - ((2, 2), (1, 1)), - ((2, 1), (1, 1)), - ((2, 1), (3, 2)), - ((5, 3), (2, 2)), - ]: - with self.subTest(f"(count,remaining) {before} -> {after}"): - j.count = before[0] - j.remaining = before[1] - j.save() - - q.updateJob(j.id, dict(count=after[0])) - j2 = Job.get(id=j.id) - self.assertEqual(j2.count, after[0]) - self.assertEqual(j2.remaining, after[1]) + self.assertEqual(j.count, 5) + self.assertEqual(j.remaining, 5) - def testUpdateSetCount(self): + def testUpdateJobCountZeros(self): + q.updateJob(1, dict(count=0, remaining=0)) j = Job.get(id=1) - s = Set.get(id=1) - for before, after in [ - # (set count, set remaining, job remaining) - ((1, 1, 1), (2, 2, 1)), - ((2, 2, 1), (1, 1, 1)), - ((4, 2, 1), (3, 2, 1)), - ((1, 1, 0), (2, 2, 1)), # Set now runnable, so refresh job - ((2, 0, 0), (1, 0, 0)), # Set no remaining, so don't refresh job - ]: - with self.subTest( - f"(set.count,set.remaining,job.remaining) {before} -> {after}" - ): - s.count = before[0] - s.remaining = before[1] - s.save() - j.remaining = before[2] - j.count = max(before[2], 1) - j.save() + self.assertEqual(j.count, 0) + self.assertEqual(j.remaining, 0) - q.updateJob(j.id, dict(sets=[dict(id=s.id, count=after[0])])) - s2 = Set.get(id=s.id) - self.assertEqual(s2.count, after[0]) - self.assertEqual(s2.remaining, after[1]) - self.assertEqual(s2.job.remaining, after[2]) + def testUpdateJobInvalid(self): + # Can't have remaining > count for jobs (sets are fine though) + with self.assertRaises(Exception): + q.updateJob(1, dict(count=1, remaining=2)) + + # Negative count not allowed + with self.assertRaises(Exception): + q.updateJob(1, dict(count=-5)) + + # 
Negative remaining not allowed + with self.assertRaises(Exception): + q.updateJob(1, dict(remaining=-5)) + + def testUpdateSetCountAndRemaining(self): + # Note that remaining can exceed count + q.updateJob(1, dict(sets=[dict(id=1, count=10, remaining=15)])) + s2 = Set.get(id=1) + self.assertEqual(s2.count, 10) + self.assertEqual(s2.remaining, 15) class TestMultiItemQueue(DBTest): @@ -319,27 +297,11 @@ def rank(): ) def testMoveJob(self): - for (moveArgs, want) in [((1, 2), [2, 1]), ((2, -1), [2, 1])]: + for (moveArgs, want) in [((1, 2), [2, 1]), ((2, None), [2, 1])]: with self.subTest(f"moveJob({moveArgs}) -> want {want}"): q.moveJob(*moveArgs) self.assertEqual([j.id for j in q.getJobsAndSets(DEFAULT_QUEUE)], want) - def testMoveSet(self): - for (desc, moveArgs, want) in [ - ("FirstToLast", (1, 2, 1), [2, 1, 3, 4]), - ("LastToFirst", (2, -1, 1), [2, 1, 3, 4]), - ("DiffJob", (1, 3, 2), [2, 3, 1, 4]), - ("NewJob", (1, -1, -1), [2, 3, 4, 1]), - ]: - with self.subTest(f"{desc}: moveSet({moveArgs})"): - q.moveSet(*moveArgs) - set_order = [ - (s["id"], s["rank"]) - for j in q.getJobsAndSets(DEFAULT_QUEUE) - for s in j.as_dict()["sets"] - ] - self.assertEqual(set_order, [(w, ANY) for w in want]) - def testGetNextJobAfterDecrement(self): j = q.getNextJobInQueue(DEFAULT_QUEUE, PROFILE) s = j.sets[0] diff --git a/continuousprint/templates/continuousprint_settings.jinja2 b/continuousprint/templates/continuousprint_settings.jinja2 index 9962ba2..909b046 100644 --- a/continuousprint/templates/continuousprint_settings.jinja2 +++ b/continuousprint/templates/continuousprint_settings.jinja2 @@ -1,4 +1,7 @@ -

<h3>Continuous Print</h3>
+<div class="cpq_title">
+  <img src="{{ url_for('plugin.continuousprint.static', filename='img/CPQ.gif') }}" alt="Continuous Print">
+  Continuous Print Queue
+</div>
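
The ScriptRunner changes above route queue lifecycle notifications through a fire_event callback, firing a CustomEvents member when the finish script runs, when a print starts or is cancelled, when the bed is cleared, and when bed cooldown begins. A minimal sketch of how a separate OctoPrint plugin might react to these events - assuming the name seen on the event bus equals CustomEvents.<NAME>.value; OctoPrint may additionally prefix registered custom event names with the plugin identifier:

    # Hypothetical listener plugin - not part of this diff.
    import octoprint.plugin

    from continuousprint.data import CustomEvents


    class QueueWatcher(octoprint.plugin.EventHandlerPlugin):
        def on_event(self, event, payload):
            # Assumes the bus name equals the enum value; adjust if OctoPrint
            # prefixes custom events on registration.
            if event == CustomEvents.FINISH.value:
                self._logger.info("Continuous print queue completed")
            elif event == CustomEvents.CANCEL.value:
                self._logger.info("Queue print was cancelled")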
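
On the frontend, length_remaining in continuousprint_set.js folds the job- and set-level counters into a single number: (job remaining - 1) full future passes of `count` prints each, plus `remaining` prints left in the current pass, clamped at zero. The same arithmetic as a worked Python example (values are illustrative):

    def length_remaining(count, set_remaining, job_remaining):
        # (full passes still queued) * (prints per pass) + prints left this pass
        return max(0, (job_remaining - 1) * count + set_remaining)

    assert length_remaining(2, 1, 3) == 5  # two full passes of 2, plus 1 in flight
    assert length_remaining(1, 1, 1) == 1  # mirrors the length_remaining unit test
    assert length_remaining(1, 0, 1) == 0  # finishing the set empties the total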
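
The new `completed` column on Set exists because `remaining` may now legitimately exceed `count` (a one-off correction, e.g. queueing reprints after failures), so `count - remaining` no longer recovers the number of finished prints. A small worked example of why the field is stored rather than derived, with illustrative values:

    count, remaining, completed = 5, 3, 2  # 2 printed so far, 3 queued

    remaining += 2                         # one-off correction: reprint 2 failures
    assert count - remaining == 0          # the derived tally is now wrong...
    assert completed == 2                  # ...but the stored field stays accurate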
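
_moveImpl orders jobs with sparse floating-point ranks: the moved row is assigned the midpoint between the destination's rank and the next rank after it, and when float resolution runs out (the candidate collides with a neighbor) all ranks are rebalanced once and the move is retried. A standalone sketch of the midpoint rule under assumed values (MAX_RANK here stands in for "nothing after the destination"; the real constant lives in queries.py):

    MAX_RANK = 2**32  # assumed sentinel value

    def midpoint(dest_rank, post_rank):
        # Halfway between the destination row and the row that follows it.
        return abs(post_rank - dest_rank) / 2 + min(post_rank, dest_rank)

    assert midpoint(1.0, 2.0) == 1.5    # lands strictly between the neighbors
    assert midpoint(0.0, MAX_RANK) > 0  # appending when the tail is empty
    # Halving the same gap repeatedly eventually returns a candidate equal to a
    # neighbor - the "singularity" that triggers the rank rebalance above.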