Skip to content

Commit

Permalink
Switch to reqless-core and new reqless-core APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
tdg5 committed Jul 4, 2024
1 parent 5bc8127 commit 9088f13
Show file tree
Hide file tree
Showing 14 changed files with 1,480 additions and 1,297 deletions.
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "reqless/qless-core"]
path = reqless/qless-core
url = https://github.com/tdg5/qless-core.git
[submodule "reqless/reqless-core"]
path = reqless/reqless-core
url = https://github.com/tdg5/reqless-core.git
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ repos:
hooks:
- id: trailing-whitespace
args: [--markdown-linebreak-ext=md]
exclude: reqless/lua|reqless/qless-core
exclude: reqless/lua|reqless/reqless-core
- id: end-of-file-fixer
exclude: README.md|reqless/lua|reqless/qless-core
exclude: README.md|reqless/lua|reqless/reqless-core
- id: check-docstring-first
- id: debug-statements
- id: name-tests-test
Expand Down
14 changes: 7 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ clean:
# And lastly, .coverage files
find . -name .coverage -delete

.PHONY: qless-core
qless-core:
# Ensure qless-core is built
make -C reqless/qless-core/
cp reqless/qless-core/qless.lua reqless/lua/
cp reqless/qless-core/qless-lib.lua reqless/lua/
.PHONY: reqless-core
reqless-core:
# Ensure reqless-core is built
make -C reqless/reqless-core/
cp reqless/reqless-core/reqless.lua reqless/lua/
cp reqless/reqless-core/reqless-lib.lua reqless/lua/

.PHONY: test-with-coverage
test-with-coverage: qless-core
test-with-coverage: reqless-core
coverage run -m pytest
coverage report | tee .meta/coverage/report.txt
coverage-badge -f -o .meta/coverage/badge.svg
Expand Down
52 changes: 25 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
This is a fork of [seomoz/qless-py](https://github.com/seomoz/qless-py) that
includes significant type improvements and support for throttles.

`qless` is a powerful job queueing system based on remote dictionary servers
`reqless` is a powerful job queueing system based on remote dictionary servers
(like `redis` and `valkey`) inspired by
[resque](https://github.com/defunkt/resque#readme), but built on a collection
of Lua scripts, maintained in the
[qless-core](https://github.com/tdg5/qless-core) repo.
[reqless-core](https://github.com/tdg5/reqless-core) repo.

## Philosophy and Nomenclature

Expand All @@ -36,13 +36,13 @@ let the system reclaim it.
## Features

1. __Jobs don't get dropped on the floor__ -- Sometimes workers drop jobs.
`qless` automatically picks them back up and gives them to another worker
`reqless` automatically picks them back up and gives them to another worker
1. __Tagging / Tracking__ -- Some jobs are more interesting than others. Track
those jobs to get updates on their progress. Tag jobs with meaningful
identifiers to find them quickly in the UI.
1. __Job Dependencies__ -- One job might need to wait for another job to
complete
1. __Stats__ -- `qless` automatically keeps statistics about how long jobs wait
1. __Stats__ -- `reqless` automatically keeps statistics about how long jobs wait
to be processed and how long they take to be processed. Currently, we keep
track of the count, mean, standard deviation, and a histogram of these
times.
Expand Down Expand Up @@ -74,12 +74,12 @@ Install from pip:
pip install reqless

Alternatively, install reqless-py from source by checking it out from github,
and checking out the qless-core submodule:
and checking out the reqless-core submodule:

```bash
git clone git://github.com/tdg5/reqless-py.git
cd reqless-py
# qless-core is a submodule
# reqless-core is a submodule
git submodule init
git submodule update
pip install .
Expand Down Expand Up @@ -216,7 +216,7 @@ This script actually forks off several subprocesses that perform the work, and
the original process keeps tabs on them to ensure that they are all up and
running. In the future, the parent process might also perform other sanity
checks, but for the time being, it's just that the process is still alive. You
can specify the `host` and `port` you want to use for the qless server as well:
can specify the `host` and `port` you want to use for the reqless server as well:

```bash
reqless-py-worker --host foo.bar --port 1234 ...
Expand Down Expand Up @@ -291,12 +291,12 @@ output).
This is an __experimental__ feature, but you can start workers `--resume` flag
to have the worker begin its processing with the jobs it left off with. For
instance, during deployments, it's common to restart the worker processes, and
the `--resume` flag has the worker first perform a check with `qless` server to
the `--resume` flag has the worker first perform a check with `reqless` server to
see which jobs it had last been running (and still has locks for).

This flag should be used with some caution. In particular, if two workers are
running with the same worker name, then this should not be used. The reason is
that through the `qless` interface, it's impossible to differentiate the two,
that through the `reqless` interface, it's impossible to differentiate the two,
and currently-running jobs may be confused with jobs that were simply dropped
when the worker was stopped.

Expand All @@ -308,7 +308,7 @@ reimports it. We think of this as a feature.

With this in mind, when I start a new project and want to make use of
`reqless`, I first start up the web app locally (see
[`qless`](http://github.com/tdg5/qless) for more), take a first pass, and
[`reqless-ui`](http://github.com/tdg5/reqless-ui-docker) for more), take a first pass, and
enqueue a single job while the worker is running:

# Supposing that I have /my/awesome/project/awesomeproject.py
Expand Down Expand Up @@ -430,7 +430,7 @@ client.config["jobs-history-count"] = 500

### Tagging / Tracking

In `qless`, "tracking" means flagging a job as important. Tracked jobs have a
In `reqless`, "tracking" means flagging a job as important. Tracked jobs have a
tab reserved for them in the web interface, and they also emit events that can
be subscribed to as they make progress (more on that below). You can flag a job
from the web interface, or the corresponding code:
Expand Down Expand Up @@ -493,14 +493,11 @@ for evt in ["canceled", "completed", "failed", "popped", "put", "stalled", "trac
client.events.listen()
```

If you're interested in, say, getting growl or campfire notifications, you
should check out the `qless-growl` and `qless-campfire` ruby gems.

### Retries

Workers sometimes die. That's an unfortunate reality of life. We try to
mitigate the effects of this by insisting that workers heartbeat their jobs to
ensure that they do not get dropped. That said, `qless` server will
ensure that they do not get dropped. That said, `reqless` server will
automatically requeue jobs that do get "stalled" up to the provided number of
retries (default is 5). Since underpants profit can sometimes go awry, maybe
you want to retry a particular heist several times:
Expand Down Expand Up @@ -548,7 +545,7 @@ job.complete("anotherQueue")

### Stats

One of the selling points of `qless` is that it keeps stats for you about your
One of the selling points of `reqless` is that it keeps stats for you about your
underpants hijinks. It tracks the average wait time, number of jobs that have
waited in a queue, failures, retries, and average running time. It also keeps
histograms for the number of jobs that have waited _x_ time, and the number
Expand All @@ -558,21 +555,22 @@ Frankly, these are best viewed using the web app.

### Lua

`qless` is a set of client language bindings, but the majority of the work is
done in a collection of Lua scripts that comprise the
[core](https://github.com/tdg5/qless-core) functionality. These scripts run
on `redis` and `valkey` 7.0+ server atomically and allow for portability with
the same functionality guarantees. Consult the documentation for `qless-core`
to learn more about its internals.
`reqless` is a set of client language bindings, but the majority of the work is
done in a collection of Lua scripts that comprise
[reqless-core](https://github.com/tdg5/reqless-core) functionality. These
scripts run on `redis` and `valkey` 7.0+ server atomically and allow for
portability with the same functionality guarantees. Consult the documentation
for `reqless-core` to learn more about its internals.

### Web App

`qless` also comes with a web app for administrative tasks, like keeping tabs
`reqless` also comes with a web app for administrative tasks, like keeping tabs
on the progress of jobs, tracking specific jobs, retrying failed jobs, etc.
It's available in the [`qless`](https://github.com/tdg5/qless) library as a
mountable [`Sinatra`](http://www.sinatrarb.com/) app. The web app is language
agnostic and was one of the major desires out of this project, so you should
consider using it even if you're not planning on using the Ruby client.
It's available in the [`reqless-rb`](https://github.com/tdg5/reqless-rb)
library as a mountable [`Sinatra`](http://www.sinatrarb.com/) app. The web app
is language agnostic and was one of the major desires out of this project, so
you should consider using it even if you're not planning on using the Ruby
client.

The web app is also available as a Docker container,
[`tdg5/reqless-ui`](https://hub.docker.com/repository/docker/tdg5/reqless-ui/general),
Expand Down
11 changes: 5 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ description = "Queue Management built on remote data structure stores like redis
dynamic = ["version"]
keywords = [
"job",
"qless",
"qmore",
"redis",
"reqless",
Expand Down Expand Up @@ -69,7 +68,7 @@ Source = "https://github.com/tdg5/reqless-py"
Tracker = "https://github.com/tdg5/reqless-py/issues"

[tool.black]
exclude = "reqless/(lua|qless-core)/.*"
exclude = "reqless/(lua|reqless-core)/.*"
include = "(reqless(_test)?)/.*\\.py$"
line-length = 88
target-version = ["py39"]
Expand All @@ -87,7 +86,7 @@ source = [
fail_under = 98

[tool.flake8]
exclude = ["reqless/lua", "reqless/qless-core"]
exclude = ["reqless/lua", "reqless/reqless-core"]
ignore = ["E203", "E251", "E701", "N805", "W503", "DAR101", "DAR201", "DAR301", "DAR401"]
max-line-length = 88
min_python_version = "3.9.0"
Expand All @@ -100,14 +99,14 @@ lines_after_imports = 2
multi_line_output = 3
profile = "black"
sections = ["FUTURE", "STDLIB", "THIRDPARTY", "FIRSTPARTY", "LOCALFOLDER"]
skip_glob = ["reqless/lua", "reqless/qless-core"]
skip_glob = ["reqless/lua", "reqless/reqless-core"]
use_parentheses = true

[tool.mypy]
check_untyped_defs = true
disallow_untyped_calls = true
disallow_untyped_defs = true
exclude = "^(reqless/qless-core|reqless/lua|fixtures|.*?\\.js|.*?\\.json)"
exclude = "^(reqless/reqless-core|reqless/lua|fixtures|.*?\\.js|.*?\\.json)"
ignore_missing_imports = true
warn_redundant_casts = true
warn_return_any = true
Expand All @@ -117,7 +116,7 @@ warn_unused_ignores = true
markers = ["integration_test: marks tests as integration tests (deselect with '-m \"not integration_test\"')"]
norecursedirs = [
"reqless/lua",
"reqless/qless-core",
"reqless/reqless-core",
]
testpaths = ["reqless_test"]

Expand Down
32 changes: 16 additions & 16 deletions reqless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ def __init__(self, client: AbstractClient):

def complete(self, offset: int = 0, count: int = 25) -> List[str]:
"""Return the paginated jids of complete jobs"""
response: List[str] = self.client("jobs", "complete", offset, count)
response: List[str] = self.client("jobs.completed", offset, count)
return response

def tracked(self) -> Dict[str, List[Any]]:
"""Return an array of job objects that are being tracked"""
results: Dict[str, Any] = json.loads(self.client("track"))
results: Dict[str, Any] = json.loads(self.client("jobs.tracked"))
results["jobs"] = [Job(self.client, **job) for job in results["jobs"]]
return results

def tagged(self, tag: str, offset: int = 0, count: int = 25) -> Dict[str, Any]:
"""Return the paginated jids of jobs tagged with a tag"""
response: Dict[str, Any] = json.loads(
self.client("tag", "get", tag, offset, count)
self.client("jobs.tagged", tag, offset, count)
)
return response

Expand All @@ -79,9 +79,9 @@ def failed(
paginated job objects affected by that kind of failure."""
results: Dict[str, Any]
if not group:
results = json.loads(self.client("failed"))
results = json.loads(self.client("jobs.failed"))
else:
results = json.loads(self.client("failed", group, start, limit))
results = json.loads(self.client("jobs.failed", group, start, limit))
results["jobs"] = self.get(*results["jobs"])
return results

Expand All @@ -90,16 +90,16 @@ def get(self, *jids: str) -> List[AbstractJob]:
if jids:
return [
Job(self.client, **j)
for j in json.loads(self.client("multiget", *jids))
for j in json.loads(self.client("job.getMulti", *jids))
]
return []

def __getitem__(self, jid: str) -> Optional[Union[Job, RecurringJob]]:
"""Get a job object corresponding to that jid, or ``None`` if it
doesn't exist"""
results = self.client("get", jid)
results = self.client("job.get", jid)
if not results:
results = self.client("recur.get", jid)
results = self.client("recurringJob.get", jid)
if not results:
return None
return RecurringJob(self.client, **json.loads(results))
Expand All @@ -114,12 +114,12 @@ def __init__(self, client: AbstractClient):

@property
def counts(self) -> Dict[str, Any]:
counts: Dict[str, Any] = json.loads(self.client("workers"))
counts: Dict[str, Any] = json.loads(self.client("workers.list"))
return counts

def __getitem__(self, worker_name: str) -> Dict[str, Any]:
"""Which jobs does a particular worker have running"""
result: Dict[str, Any] = json.loads(self.client("workers", worker_name))
result: Dict[str, Any] = json.loads(self.client("worker.counts", worker_name))
result["jobs"] = result["jobs"] or []
result["stalled"] = result["stalled"] or []
return result
Expand All @@ -133,7 +133,7 @@ def __init__(self, client: AbstractClient):

@property
def counts(self) -> Dict:
counts: Dict = json.loads(self.client("queues"))
counts: Dict = json.loads(self.client("queues.list"))
return counts

def __getitem__(self, queue_name: str) -> AbstractQueue:
Expand Down Expand Up @@ -176,7 +176,7 @@ def __init__(
self._events: Optional[Events] = None

# We now have a single unified core script.
data = pkgutil.get_data("reqless", "lua/qless.lua")
data = pkgutil.get_data("reqless", "lua/reqless.lua")
if data is None:
raise RuntimeError("Failed to load reqless lua!")
self._lua: Script = self.database.register_script(data)
Expand Down Expand Up @@ -229,22 +229,22 @@ def __call__(self, command: str, *args: Any) -> Any:

def track(self, jid: str) -> bool:
"""Begin tracking this job"""
response: str = self("track", "track", jid)
response: str = self("job.track", jid)
return response == "1"

def untrack(self, jid: str) -> bool:
"""Stop tracking this job"""
response: str = self("track", "untrack", jid)
response: str = self("job.untrack", jid)
return response == "1"

def tags(self, offset: int = 0, count: int = 100) -> List[str]:
"""The most common tags among jobs"""
tags: List[str] = json.loads(self("tag", "top", offset, count))
tags: List[str] = json.loads(self("tags.top", offset, count))
return tags

def unfail(self, group: str, queue: str, count: int = 500) -> int:
"""Move jobs from the failed group to the provided queue"""
unfail_count = self("unfail", queue, group, count)
unfail_count = self("queue.unfail", queue, group, count)
return int(unfail_count)


Expand Down
Loading

0 comments on commit 9088f13

Please sign in to comment.