From 718cf344e743b4b33fcb2346c5efad1f84db4c6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 11:54:30 +0100 Subject: [PATCH 1/6] remove remnants of workers --- connector/queue/queue.py | 41 ---- connector/queue/worker.py | 352 --------------------------------- connector/tests/__init__.py | 2 - connector/tests/test_queue.py | 41 ---- connector/tests/test_worker.py | 12 -- 5 files changed, 448 deletions(-) delete mode 100644 connector/queue/queue.py delete mode 100644 connector/queue/worker.py delete mode 100644 connector/tests/test_queue.py delete mode 100644 connector/tests/test_worker.py diff --git a/connector/queue/queue.py b/connector/queue/queue.py deleted file mode 100644 index 83a8a1f85..000000000 --- a/connector/queue/queue.py +++ /dev/null @@ -1,41 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# Author: Guewen Baconnier -# Copyright 2013 Camptocamp SA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## -from __future__ import absolute_import -from Queue import PriorityQueue - - -class JobsQueue(object): - """ Holds the jobs planned for execution in memory. - - The Jobs are sorted, the higher the priority is, - the earlier the jobs are dequeued. - """ - - def __init__(self): - self._queue = PriorityQueue() - - def enqueue(self, job): - self._queue.put_nowait(job) - - def dequeue(self): - """ Take the first job according to its priority - and return it""" - return self._queue.get() diff --git a/connector/queue/worker.py b/connector/queue/worker.py deleted file mode 100644 index e9a5873f2..000000000 --- a/connector/queue/worker.py +++ /dev/null @@ -1,352 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# Author: Guewen Baconnier -# Copyright 2013 Camptocamp SA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -import logging -import os -import threading -import time -import traceback -import uuid -from datetime import datetime -from StringIO import StringIO - -from psycopg2 import OperationalError, ProgrammingError - -import openerp -from openerp.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY -from openerp.service import db -from openerp.tools import config -from .queue import JobsQueue -from ..session import ConnectorSessionHandler -from .job import (OpenERPJobStorage, - PENDING, - DONE) -from ..exception import (NoSuchJobError, - NotReadableJobError, - RetryableJobError, - FailedJobError, - NothingToDoJob) - -_logger = logging.getLogger(__name__) - -WAIT_CHECK_WORKER_ALIVE = 30 # seconds -WAIT_WHEN_ONLY_AFTER_JOBS = 10 # seconds -WORKER_TIMEOUT = 5 * 60 # seconds -PG_RETRY = 5 # seconds - - -class Worker(threading.Thread): - """ Post and retrieve jobs from the queue, execute them""" - - queue_class = JobsQueue - job_storage_class = OpenERPJobStorage - - def __init__(self, db_name, watcher_): - super(Worker, self).__init__() - self.queue = self.queue_class() - self.db_name = db_name - threading.current_thread().dbname = db_name - self.uuid = unicode(uuid.uuid4()) - self.watcher = watcher_ - - def run_job(self, job): - """ Execute a job """ - def retry_postpone(job, message, seconds=None): - with session_hdl.session() as session: - job.postpone(result=message, seconds=seconds) - job.set_enqueued(self) - self.job_storage_class(session).store(job) - self.queue.enqueue(job) - - session_hdl = ConnectorSessionHandler(self.db_name, - openerp.SUPERUSER_ID) - try: - with session_hdl.session() as session: - job = self._load_job(session, job.uuid) - if job is None: - return - - # if the job has been manually set to DONE or PENDING - # before its execution, stop - if job.state in (DONE, PENDING): - return - - # the job has been enqueued in this worker but has likely be - # modified in the database since its enqueue - if job.worker_uuid != self.uuid: - # put the job in pending so it can be requeued - _logger.error('Job %s was enqueued in worker %s but ' - 'was linked to worker %s. Reset to pending.', - job.uuid, self.uuid, job.worker_uuid) - with session_hdl.session() as session: - job.set_pending() - self.job_storage_class(session).store(job) - return - - if job.eta and job.eta > datetime.now(): - # The queue is sorted by 'eta' date first - # so if we dequeued a job expected to be run in - # the future, we have no jobs to do right now! - self.queue.enqueue(job) - # Wait some time just to avoid to loop over - # the same 'future' jobs - _logger.debug('Wait %s seconds because the delayed ' - 'jobs have been reached', - WAIT_WHEN_ONLY_AFTER_JOBS) - time.sleep(WAIT_WHEN_ONLY_AFTER_JOBS) - return - - with session_hdl.session() as session: - job.set_started() - self.job_storage_class(session).store(job) - - _logger.debug('%s started', job) - with session_hdl.session() as session: - job.perform(session) - job.set_done() - self.job_storage_class(session).store(job) - _logger.debug('%s done', job) - - except NothingToDoJob as err: - if unicode(err): - msg = unicode(err) - else: - msg = None - job.cancel(msg) - with session_hdl.session() as session: - self.job_storage_class(session).store(job) - - except RetryableJobError as err: - # delay the job later, requeue - retry_postpone(job, unicode(err), seconds=err.seconds) - _logger.debug('%s postponed', job) - - except OperationalError as err: - # Automatically retry the typical transaction serialization errors - if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: - raise - retry_postpone(job, unicode(err), seconds=PG_RETRY) - _logger.debug('%s OperationalError, postponed', job) - - except (FailedJobError, Exception): - buff = StringIO() - traceback.print_exc(file=buff) - _logger.error(buff.getvalue()) - - job.set_failed(exc_info=buff.getvalue()) - with session_hdl.session() as session: - self.job_storage_class(session).store(job) - raise - - def _load_job(self, session, job_uuid): - """ Reload a job from the backend """ - try: - job = self.job_storage_class(session).load(job_uuid) - except NoSuchJobError: - # just skip it - job = None - except NotReadableJobError: - _logger.exception('Could not read job: %s', job_uuid) - raise - return job - - def run(self): - """ Worker's main loop - - Check if it still exists in the ``watcher``. When it does no - longer exist, it break the loop so the thread stops properly. - - Wait for jobs and execute them sequentially. - """ - while 1: - # check if the worker has to exit (db destroyed, connector - # uninstalled) - if self.watcher.worker_lost(self): - break - job = self.queue.dequeue() - try: - self.run_job(job) - except: - continue - - def enqueue_job_uuid(self, job_uuid): - """ Enqueue a job: - - It will be executed by the worker as soon as possible (according - to the job's priority - """ - session_hdl = ConnectorSessionHandler(self.db_name, - openerp.SUPERUSER_ID) - with session_hdl.session() as session: - job = self._load_job(session, job_uuid) - if job is None: - # skip a deleted job - return - job.set_enqueued(self) - self.job_storage_class(session).store(job) - # the change of state should be commited before - # the enqueue otherwise we may have concurrent updates - # if the job is started directly - self.queue.enqueue(job) - _logger.debug('%s enqueued in %s', job, self) - - -class WorkerWatcher(threading.Thread): - """ Keep a sight on the workers and signal their aliveness. - - A `WorkerWatcher` is shared between databases, so only 1 instance is - necessary to check the aliveness of the workers for every database. - """ - - def __init__(self): - super(WorkerWatcher, self).__init__() - self._workers = {} - - def _new(self, db_name): - """ Create a new worker for the database """ - if db_name in self._workers: - raise Exception('Database %s already has a worker (%s)' % - (db_name, self._workers[db_name].uuid)) - worker = Worker(db_name, self) - self._workers[db_name] = worker - worker.daemon = True - worker.start() - - def _delete(self, db_name): - """ Delete a worker associated with a database """ - if db_name in self._workers: - # the worker will exit (it checks ``worker_lost()``) - del self._workers[db_name] - - def worker_for_db(self, db_name): - return self._workers.get(db_name) - - def worker_lost(self, worker): - """ Indicate if a worker is no longer referenced by the watcher. - - Used by the worker threads to know if they have to exit. - """ - return worker not in self._workers.itervalues() - - @staticmethod - def available_db_names(): - """ Returns the databases for the server having - the connector module installed. - - Available means that they can be used by a `Worker`. - - :return: database names - :rtype: list - """ - if config['db_name']: - db_names = config['db_name'].split(',') - else: - db_names = db.exp_list(True) - available_db_names = [] - for db_name in db_names: - session_hdl = ConnectorSessionHandler(db_name, - openerp.SUPERUSER_ID) - with session_hdl.session() as session: - cr = session.cr - try: - cr.execute("SELECT 1 FROM ir_module_module " - "WHERE name = %s " - "AND state = %s", ('connector', 'installed'), - log_exceptions=False) - except ProgrammingError as err: - no_db_error = 'relation "ir_module_module" does not exist' - if unicode(err).startswith(no_db_error): - _logger.debug('Database %s is not an OpenERP database,' - ' connector worker not started', db_name) - else: - raise - else: - if cr.fetchone(): - available_db_names.append(db_name) - return available_db_names - - def _update_workers(self): - """ Refresh the list of workers according to the available - databases and registries. - - A new database can be available, so we need to create a new - `Worker` or a database could have been dropped, so we have to - discard the Worker. - """ - db_names = self.available_db_names() - # deleted db or connector uninstalled: remove the workers - for db_name in set(self._workers) - set(db_names): - self._delete(db_name) - - for db_name in db_names: - if db_name not in self._workers: - self._new(db_name) - - def run(self): - """ `WorkerWatcher`'s main loop """ - while 1: - self._update_workers() - for db_name, worker in self._workers.items(): - self.check_alive(db_name, worker) - time.sleep(WAIT_CHECK_WORKER_ALIVE) - - def check_alive(self, db_name, worker): - """ Check if the the worker is still alive and notify - its aliveness. - Check if the other workers are still alive, if they are - dead, remove them from the worker's pool. - """ - session_hdl = ConnectorSessionHandler(db_name, - openerp.SUPERUSER_ID) - with session_hdl.session() as session: - if worker.is_alive(): - self._notify_alive(session, worker) - session.commit() - self._purge_dead_workers(session) - session.commit() - - def _notify_alive(self, session, worker): - _logger.debug('Worker %s is alive on process %s', - worker.uuid, os.getpid()) - session.env['queue.worker']._notify_alive(worker) - - def _purge_dead_workers(self, session): - session.env['queue.worker']._purge_dead_workers() - - -watcher = WorkerWatcher() - - -def start_service(): - """ Start the watcher """ - watcher.daemon = True - watcher.start() - -# We have to launch the Jobs Workers only if: -# 0. The alternative connector runner is not enabled -# 1. OpenERP is used in standalone mode (monoprocess) -# 2. Or it is used in multiprocess (with option ``--workers``) -# but the current process is a Connector Worker -# (launched with the ``openerp-connector-worker`` script). -if not os.environ.get('ODOO_CONNECTOR_CHANNELS'): - if (not getattr(openerp, 'multi_process', False) or - getattr(openerp, 'worker_connector', False)): - start_service() diff --git a/connector/tests/__init__.py b/connector/tests/__init__.py index a2736a025..eed661a13 100644 --- a/connector/tests/__init__.py +++ b/connector/tests/__init__.py @@ -22,8 +22,6 @@ from . import test_session from . import test_event from . import test_job -from . import test_queue -from . import test_worker from . import test_backend from . import test_producer from . import test_connector diff --git a/connector/tests/test_queue.py b/connector/tests/test_queue.py deleted file mode 100644 index c533bc9ed..000000000 --- a/connector/tests/test_queue.py +++ /dev/null @@ -1,41 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest -from datetime import timedelta - -from openerp.addons.connector.queue.queue import JobsQueue -from openerp.addons.connector.queue.job import Job - - -def dummy_task(session): - pass - - -class test_queue(unittest.TestCase): - """ Test Queue """ - - def setUp(self): - self.queue = JobsQueue() - - def test_sort(self): - """ Sort: the lowest priority number has the highest priority. - A job with a `eta` datetime is less priority in any case. - """ - job1 = Job(dummy_task, priority=10) - job2 = Job(dummy_task, priority=5) - job3 = Job(dummy_task, priority=15, - eta=timedelta(hours=2)) - job4 = Job(dummy_task, priority=15, - eta=timedelta(hours=1)) - job5 = Job(dummy_task, priority=1, - eta=timedelta(hours=2)) - self.queue.enqueue(job1) - self.queue.enqueue(job2) - self.queue.enqueue(job3) - self.queue.enqueue(job4) - self.queue.enqueue(job5) - self.assertEqual(self.queue.dequeue(), job2) - self.assertEqual(self.queue.dequeue(), job1) - self.assertEqual(self.queue.dequeue(), job4) - self.assertEqual(self.queue.dequeue(), job3) - self.assertEqual(self.queue.dequeue(), job5) diff --git a/connector/tests/test_worker.py b/connector/tests/test_worker.py deleted file mode 100644 index 499ee33ee..000000000 --- a/connector/tests/test_worker.py +++ /dev/null @@ -1,12 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest - -from openerp.addons.connector.queue.queue import JobsQueue - - -class test_worker(unittest.TestCase): - """ Test Worker """ - - def setUp(self): - self.queue = JobsQueue() From c35dea5b6d524bfec42ae7b864a9243f323e3bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 11:54:49 +0100 Subject: [PATCH 2/6] make connector installable fixes #177 --- connector/__openerp__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/__openerp__.py b/connector/__openerp__.py index 79ae69f22..fff173a81 100644 --- a/connector/__openerp__.py +++ b/connector/__openerp__.py @@ -40,6 +40,6 @@ 'setting_view.xml', 'res_partner_view.xml', ], - 'installable': False, + 'installable': True, 'application': True, } From 8f89ed0bdad447d31689caf085d3ae2a22598c63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 11:57:15 +0100 Subject: [PATCH 3/6] enable the jobrunner by default fixes #175 --- connector/jobrunner/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/connector/jobrunner/__init__.py b/connector/jobrunner/__init__.py index 1be109974..e8ad0c40b 100644 --- a/connector/jobrunner/__init__.py +++ b/connector/jobrunner/__init__.py @@ -46,9 +46,6 @@ # to configure the runner (channels mostly). -enable = os.environ.get('ODOO_CONNECTOR_CHANNELS') - - class ConnectorRunnerThread(Thread): def __init__(self): @@ -78,7 +75,7 @@ def stop(self): def prefork_start(server, *args, **kwargs): global runner_thread res = orig_prefork_start(server, *args, **kwargs) - if enable and not config['stop_after_init']: + if not config['stop_after_init']: _logger.info("starting jobrunner thread (in prefork server)") runner_thread = ConnectorRunnerThread() runner_thread.start() @@ -99,7 +96,7 @@ def prefork_stop(server, graceful=True): def threaded_start(server, *args, **kwargs): global runner_thread res = orig_threaded_start(server, *args, **kwargs) - if enable and not config['stop_after_init']: + if not config['stop_after_init']: _logger.info("starting jobrunner thread (in threaded server)") runner_thread = ConnectorRunnerThread() runner_thread.start() From e0fe1caacf803466df834ae42e704674ae12524e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 12:11:17 +0100 Subject: [PATCH 4/6] bump version --- connector/__openerp__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/__openerp__.py b/connector/__openerp__.py index fff173a81..2558e77d3 100644 --- a/connector/__openerp__.py +++ b/connector/__openerp__.py @@ -20,7 +20,7 @@ ############################################################################## {'name': 'Connector', - 'version': '9.0.1.0.0', + 'version': '9.0.1.0.1', 'author': 'Camptocamp,Openerp Connector Core Editors,' 'Odoo Community Association (OCA)', 'website': 'http://odoo-connector.com', From 742d71b0e46cc8e0a54d55140bc0200e3512b86b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 12:13:38 +0100 Subject: [PATCH 5/6] remove unused import --- connector/connector.py | 1 - 1 file changed, 1 deletion(-) diff --git a/connector/connector.py b/connector/connector.py index 3537a28b8..ede1c3866 100644 --- a/connector/connector.py +++ b/connector/connector.py @@ -23,7 +23,6 @@ import logging import struct -from contextlib import contextmanager from openerp import models, fields from .exception import RetryableJobError From 09425c585927e1f03ac11700ec992a0ae19a57b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Thu, 3 Mar 2016 12:37:51 +0100 Subject: [PATCH 6/6] update changelog --- connector/CHANGES.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/connector/CHANGES.rst b/connector/CHANGES.rst index 0f3b3a4a9..3732f4f82 100644 --- a/connector/CHANGES.rst +++ b/connector/CHANGES.rst @@ -7,6 +7,14 @@ Changelog .. * +9.0.1.0.1 (2016-03-03) +~~~~~~~~~~~~~~~~~~~~~~ + +* Enabled the JobRunner by default, with a default channels configuration of root:1 +* Removed the old workers +* Removed the broken dbfilter support (https://github.com/OCA/connector/issues/58) +* Cleaned the methods that have been deprecated in version 3.x + 8.0.3.3.0 (2016-02-29) ~~~~~~~~~~~~~~~~~~~~~~