
Commit

Merge pull request #495 from jvllmr/rq-2.0.0
compatibility with RQ 2.0.0
cjlapao authored Oct 29, 2024
2 parents: 24e7191 + 2b9641a · commit 182bcb1
Showing 3 changed files with 34 additions and 38 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -12,8 +12,8 @@ itsdangerous==2.1.2 # via flask
 jinja2==3.1.2 # via flask
 markupsafe==2.1.3 # via jinja2
 python-dateutil==2.8.2 # via arrow
-redis==4.6.0
-rq==1.15.1
+redis==5.2.0
+rq==2.0.0
 six==1.16.0
 werkzeug==2.3.7
 Redis-Sentinel-Url==1.0.1
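The pins above bump redis to 5.2.0 alongside the move to rq 2.0.0. A quick sanity check (a sketch, not part of this commit) for confirming a deployment is actually running the pair this change targets:

```python
# Sketch only: verify the installed packages match the rq 2.x / redis 5.x pair
# that the updated requirements.txt expects.
from importlib.metadata import version

assert version("rq").startswith("2."), f"unexpected rq version: {version('rq')}"
assert version("redis").startswith("5."), f"unexpected redis version: {version('redis')}"
```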
59 changes: 28 additions & 31 deletions rq_dashboard/web.py
@@ -38,8 +38,6 @@
 VERSION as rq_version,
 Queue,
 Worker,
-pop_connection,
-push_connection,
 requeue_job,
 )
 from rq.exceptions import NoSuchJobError
@@ -96,13 +94,11 @@ def push_rq_connection():
 raise LookupError("Index exceeds RQ list. Not Permitted.")
 else:
 new_instance = current_app.redis_conn
-push_connection(new_instance)
+
+current_app.redis_conn = new_instance


-@blueprint.teardown_request
-def pop_rq_connection(exception=None):
-pop_connection()


 def jsonify(f):
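RQ 2.0 drops the global connection stack, so push_connection/pop_connection are gone; the hunk above simply keeps the selected Redis client on the Flask app and relies on every later call receiving it explicitly. A minimal sketch of that pattern (select_instance and default_queue are illustrative names; only current_app.redis_conn comes from rq-dashboard itself):

```python
# Sketch of the explicit-connection style RQ 2.0 expects; not code from this PR.
from flask import current_app
from redis import Redis
from rq import Queue


def select_instance(redis_url: str) -> None:
    # Replaces push_connection(): remember the client on the app object instead
    # of pushing it onto a (now removed) global stack.
    current_app.redis_conn = Redis.from_url(redis_url)


def default_queue() -> Queue:
    # Consumers pass the stored connection explicitly to every RQ object.
    return Queue(connection=current_app.redis_conn)
```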
@@ -137,7 +133,7 @@ def serialize_queues(instance_number, queues):
 order="asc",
 page="1",
 ),
-failed_job_registry_count=FailedJobRegistry(q.name).count,
+failed_job_registry_count=FailedJobRegistry(q.name, connection=q.connection).count,
 failed_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
@@ -147,7 +143,7 @@
 order="asc",
 page="1",
 ),
-started_job_registry_count=StartedJobRegistry(q.name).count,
+started_job_registry_count=StartedJobRegistry(q.name, connection=q.connection).count,
 started_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
@@ -157,7 +153,7 @@
 order="asc",
 page="1",
 ),
-deferred_job_registry_count=DeferredJobRegistry(q.name).count,
+deferred_job_registry_count=DeferredJobRegistry(q.name, connection=q.connection).count,
 deferred_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
@@ -167,7 +163,7 @@
 order="asc",
 page="1",
 ),
-finished_job_registry_count=FinishedJobRegistry(q.name).count,
+finished_job_registry_count=FinishedJobRegistry(q.name, connection=q.connection).count,
 finished_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
@@ -177,7 +173,7 @@
 order="asc",
 page="1",
 ),
-canceled_job_registry_count=CanceledJobRegistry(q.name).count,
+canceled_job_registry_count=CanceledJobRegistry(q.name, connection=q.connection).count,
 canceled_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
@@ -187,7 +183,7 @@
 order="asc",
 page="1",
 ),
-scheduled_job_registry_count=ScheduledJobRegistry(q.name).count,
+scheduled_job_registry_count=ScheduledJobRegistry(q.name, connection=q.connection).count,
 scheduled_url=url_for(
 ".jobs_overview",
 instance_number=instance_number,
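Every registry constructor above now receives connection=q.connection, because in RQ 2.0 the registries no longer fall back to an implicit connection. Condensed into a sketch (registry_counts is an illustrative helper, not part of the module):

```python
# Sketch: build per-queue registry counts with explicit connections, mirroring
# the serialize_queues() changes above.
from rq import Queue
from rq.registry import (
    DeferredJobRegistry,
    FailedJobRegistry,
    FinishedJobRegistry,
    StartedJobRegistry,
)


def registry_counts(q: Queue) -> dict:
    conn = q.connection  # the queue's own client, passed to every registry
    return {
        "failed": FailedJobRegistry(q.name, connection=conn).count,
        "started": StartedJobRegistry(q.name, connection=conn).count,
        "deferred": DeferredJobRegistry(q.name, connection=conn).count,
        "finished": FinishedJobRegistry(q.name, connection=conn).count,
    }
```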
@@ -208,12 +204,13 @@ def serialize_date(dt):
 return arrow.get(dt).to("UTC").datetime.isoformat()


-def serialize_job(job):
+def serialize_job(job: Job):
+latest_result = job.latest_result()
 return dict(
 id=job.id,
 created_at=serialize_date(job.created_at),
 ended_at=serialize_date(job.ended_at),
-exc_info=str(job.exc_info) if job.exc_info else None,
+exc_info=str(latest_result.exc_string) if latest_result else None,
 description=job.description,
 )
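The serialize_job change reflects RQ's Result objects: the dashboard now reads the traceback from job.latest_result() rather than job.exc_info. A sketch of the new lookup (exc_info_for is an illustrative name):

```python
# Sketch: read the traceback via latest_result(), which returns None for jobs
# that have not produced a result yet, so guard before touching exc_string.
from typing import Optional

from rq.job import Job


def exc_info_for(job: Job) -> Optional[str]:
    result = job.latest_result()
    return str(result.exc_string) if result and result.exc_string else None
```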

@@ -255,24 +252,24 @@ def favicon():
 )


-def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order):
-queue = Queue(queue_name, serializer=config.serializer)
+def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order, connection):
+queue = Queue(queue_name, serializer=config.serializer, connection=connection)
 if registry_name != "queued":
 if per_page >= 0:
 per_page = offset + (per_page - 1)

 if registry_name == "failed":
-current_queue = FailedJobRegistry(queue_name)
+current_queue = FailedJobRegistry(queue_name, connection=connection)
 elif registry_name == "deferred":
-current_queue = DeferredJobRegistry(queue_name)
+current_queue = DeferredJobRegistry(queue_name, connection=connection)
 elif registry_name == "started":
-current_queue = StartedJobRegistry(queue_name)
+current_queue = StartedJobRegistry(queue_name, connection=connection)
 elif registry_name == "finished":
-current_queue = FinishedJobRegistry(queue_name)
+current_queue = FinishedJobRegistry(queue_name, connection=connection)
 elif registry_name == "scheduled":
-current_queue = ScheduledJobRegistry(queue_name)
+current_queue = ScheduledJobRegistry(queue_name, connection=connection)
 elif registry_name == "canceled":
-current_queue = CanceledJobRegistry(queue_name)
+current_queue = CanceledJobRegistry(queue_name, connection=connection)
 else:
 current_queue = queue
 total_items = current_queue.count
@@ -314,7 +311,7 @@ def queues_overview(instance_number):
 "rq_dashboard/queues.html",
 current_instance=instance_number,
 instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")),
-queues=Queue.all(),
+queues=Queue.all(connection=current_app.redis_conn),
 rq_url_prefix=url_for(".queues_overview"),
 rq_dashboard_version=rq_dashboard_version,
 rq_version=rq_version,
@@ -368,15 +365,15 @@ def workers_overview(instance_number):
 )
 def jobs_overview(instance_number, queue_name, registry_name, per_page, order, page):
 if queue_name is None:
-queue = Queue(serializer=config.serializer)
+queue = Queue(serializer=config.serializer, connection=current_app.redis_conn)
 else:
-queue = Queue(queue_name, serializer=config.serializer)
+queue = Queue(queue_name, serializer=config.serializer, connection=current_app.redis_conn)
 r = make_response(
 render_template(
 "rq_dashboard/jobs.html",
 current_instance=instance_number,
 instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")),
-queues=Queue.all(),
+queues=Queue.all(connection=current_app.redis_conn),
 queue=queue,
 per_page=per_page,
 order=order,
@@ -398,7 +395,7 @@ def jobs_overview(instance_number, queue_name, registry_name, per_page, order, page):

 @blueprint.route("/<int:instance_number>/view/job/<job_id>")
 def job_view(instance_number, job_id):
-job = Job.fetch(job_id)
+job = Job.fetch(job_id, connection=current_app.redis_conn)
 r = make_response(
 render_template(
 "rq_dashboard/job.html",
@@ -423,7 +420,7 @@ def job_view(instance_number, job_id):
 @jsonify
 def delete_job_view(job_id, registry=None):
 try:
-job = Job.fetch(job_id)
+job = Job.fetch(job_id, connection=current_app.redis_conn)
 job.delete()
 except NoSuchJobError:
 if registry:
@@ -497,7 +494,7 @@ def compact_queue(queue_name):
 @blueprint.route("/<int:instance_number>/data/queues.json")
 @jsonify
 def list_queues(instance_number):
-queues = serialize_queues(instance_number, sorted(Queue.all()))
+queues = serialize_queues(instance_number, sorted(Queue.all(connection=current_app.redis_conn)))
 return dict(queues=queues)

@@ -511,7 +508,7 @@ def list_jobs(instance_number, queue_name, registry_name, per_page, order, page):
 offset = (current_page - 1) * per_page

 total_items, jobs = get_queue_registry_jobs_count(
-queue_name, registry_name, offset, per_page, order
+queue_name, registry_name, offset, per_page, order, current_app.redis_conn
 )

 pages_numbers_in_window = pagination_window(total_items, current_page, per_page)
@@ -652,7 +649,7 @@ def serialize_queue_names(worker):
 version=getattr(worker, "version", ""),
 python_version=getattr(worker, "python_version", ""),
 )
-for worker in Worker.all()
+for worker in Worker.all(connection=current_app.redis_conn)
 ),
 key=lambda w: (w["state"], w["queues"], w["name"]),
 )
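The same rule drives the Queue.all(), Job.fetch() and Worker.all() changes throughout web.py: in RQ 2.0 these class-level lookups take the connection explicitly instead of reading a global one. A condensed sketch using the dashboard's per-request client (overview and load_job are illustrative names):

```python
# Sketch: the explicit-connection form of the lookups used by the views above.
from flask import current_app
from rq import Queue, Worker
from rq.job import Job


def overview():
    conn = current_app.redis_conn
    return Queue.all(connection=conn), Worker.all(connection=conn)


def load_job(job_id: str) -> Job:
    return Job.fetch(job_id, connection=current_app.redis_conn)
```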
9 changes: 4 additions & 5 deletions tests/test_basic.py
@@ -3,7 +3,7 @@
 import unittest

 import redis
-from rq import Queue, Worker, pop_connection, push_connection
+from rq import Queue, Worker

 from rq_dashboard.cli import make_flask_app
 from rq_dashboard.web import escape_format_instance_list
@@ -24,13 +24,12 @@ def setUp(self):
 self.app.testing = True
 self.app.config['RQ_DASHBOARD_REDIS_URL'] = ['redis://127.0.0.1']
 self.app.redis_conn = self.get_redis_client()
-push_connection(self.get_redis_client())
 self.client = self.app.test_client()

 def tearDown(self):
 q = Queue(connection=self.app.redis_conn)
 q.empty()
-pop_connection()
+

 def test_dashboard_ok(self):
 response = self.client.get('/')
@@ -89,7 +88,7 @@ def test_registry_jobs_list(self):
 self.assertIn('jobs', data)

 def test_worker_python_version_field(self):
-w = Worker(['q'])
+w = Worker(['q'], connection=self.app.redis_conn)
 w.register_birth()
 response = self.client.get('/0/data/workers.json')
 data = json.loads(response.data.decode('utf8'))
@@ -100,7 +99,7 @@ def test_worker_python_version_field(self):
 w.register_death()

 def test_worker_version_field(self):
-w = Worker(['q'])
+w = Worker(['q'], connection=self.app.redis_conn)
 w.register_birth()
 response = self.client.get('/0/data/workers.json')
 data = json.loads(response.data.decode('utf8'))
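With push_connection/pop_connection gone, the tests set up nothing global: every Queue, Worker and Job gets its connection handed to it. An end-to-end sketch of that pattern outside the test class (assumes a local Redis at 127.0.0.1, as tests/test_basic.py does):

```python
# Sketch: the RQ 2.0 pattern with no global connection stack; not code from this PR.
import redis
from rq import Queue, Worker
from rq.job import Job

conn = redis.Redis(host="127.0.0.1")
q = Queue("q", connection=conn)
job = q.enqueue(print, "hello")     # enqueue a trivial job against this connection

w = Worker(["q"], connection=conn)  # the worker takes the connection explicitly too
w.work(burst=True)                  # drain the queue once and exit

print(Job.fetch(job.id, connection=conn).get_status())
```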
