diff --git a/reqless/__init__.py b/reqless/__init__.py index 76bac16..507fc60 100644 --- a/reqless/__init__.py +++ b/reqless/__init__.py @@ -52,7 +52,8 @@ def __init__(self, client: AbstractClient): def complete(self, offset: int = 0, count: int = 25) -> List[str]: """Return the paginated jids of complete jobs""" - response: List[str] = self.client("jobs.completed", offset, count) + response_json = self.client("jobs.completed", offset, count) + response: List[str] = json.loads(response_json) return response def tracked(self) -> Dict[str, List[Any]]: @@ -114,12 +115,12 @@ def __init__(self, client: AbstractClient): @property def counts(self) -> Dict[str, Any]: - counts: Dict[str, Any] = json.loads(self.client("workers.list")) + counts: Dict[str, Any] = json.loads(self.client("workers.counts")) return counts def __getitem__(self, worker_name: str) -> Dict[str, Any]: """Which jobs does a particular worker have running""" - result: Dict[str, Any] = json.loads(self.client("worker.counts", worker_name)) + result: Dict[str, Any] = json.loads(self.client("worker.jobs", worker_name)) result["jobs"] = result["jobs"] or [] result["stalled"] = result["stalled"] or [] return result @@ -133,7 +134,7 @@ def __init__(self, client: AbstractClient): @property def counts(self) -> Dict: - counts: Dict = json.loads(self.client("queues.list")) + counts: Dict = json.loads(self.client("queues.counts")) return counts def __getitem__(self, queue_name: str) -> AbstractQueue: diff --git a/reqless/config.py b/reqless/config.py index d8aef7e..f65ff05 100644 --- a/reqless/config.py +++ b/reqless/config.py @@ -15,7 +15,7 @@ def __init__(self, client: AbstractClient): @property def all(self) -> Dict[str, Any]: - response: Dict[str, Any] = json.loads(self._client("config.get")) + response: Dict[str, Any] = json.loads(self._client("config.getAll")) return response def __len__(self) -> int: diff --git a/reqless/job.py b/reqless/job.py index 4d72cea..9987ef7 100644 --- a/reqless/job.py +++ b/reqless/job.py @@ -126,12 +126,14 @@ def cancel(self) -> List[str]: def tag(self, *tags: str) -> List[str]: """Tag a job with additional tags""" - response: List[str] = self.client("job.addTag", self.jid, *tags) + response_json: str = self.client("job.addTag", self.jid, *tags) + response: List[str] = json.loads(response_json) return response def untag(self, *tags: str) -> List[str]: """Remove tags from a job""" - response: List[str] = self.client("job.removeTag", self.jid, *tags) + response_json: str = self.client("job.removeTag", self.jid, *tags) + response: List[str] = json.loads(response_json) return response @@ -515,7 +517,7 @@ def move(self, queue: str) -> bool: def cancel(self) -> List[str]: """Cancel all future recurring jobs""" - self.client("recurringJob.unrecur", self.jid) + self.client("recurringJob.cancel", self.jid) return [self.jid] def tag(self, *tags: str) -> List[str]: diff --git a/reqless/lua/reqless-lib.lua b/reqless/lua/reqless-lib.lua index ae63c9e..5afc70c 100644 --- a/reqless/lua/reqless-lib.lua +++ b/reqless/lua/reqless-lib.lua @@ -1,5 +1,13 @@ --- Current SHA: be21dd39fba640234ed989fe40aaaefc31653dcc +-- Current SHA: 8b6600adb988e7f4922f606798b6ad64c06a245d -- This is a generated file +-- cjson can't tell an empty array from an empty object, so empty arrays end up +-- encoded as objects. This function makes empty arrays look like empty arrays. +local function cjsonArrayDegenerationWorkaround(array) + if #array == 0 then + return "[]" + end + return cjson.encode(array) +end ------------------------------------------------------------------------------- -- Forward declarations to make everything happy ------------------------------------------------------------------------------- @@ -170,10 +178,10 @@ function Reqless.failed(group, start, limit) return response end --- Jobs(now, 'complete', [offset, [count]]) +-- Jobs(now, 'complete', [offset, [limit]]) -- Jobs(now, ( -- 'stalled' | 'running' | 'scheduled' | 'depends', 'recurring' --- ), queue, [offset, [count]]) +-- ), queue, [offset, [limit]]) ------------------------------------------------------------------------------- -- Return all the job ids currently considered to be in the provided state -- in a particular queue. The response is a list of job ids: @@ -188,32 +196,32 @@ function Reqless.jobs(now, state, ...) if state == 'complete' then local offset = assert(tonumber(arg[1] or 0), 'Jobs(): Arg "offset" not a number: ' .. tostring(arg[1])) - local count = assert(tonumber(arg[2] or 25), - 'Jobs(): Arg "count" not a number: ' .. tostring(arg[2])) + local limit = assert(tonumber(arg[2] or 25), + 'Jobs(): Arg "limit" not a number: ' .. tostring(arg[2])) return redis.call('zrevrange', 'ql:completed', offset, - offset + count - 1) + offset + limit - 1) end local queue_name = assert(arg[1], 'Jobs(): Arg "queue" missing') local offset = assert(tonumber(arg[2] or 0), 'Jobs(): Arg "offset" not a number: ' .. tostring(arg[2])) - local count = assert(tonumber(arg[3] or 25), - 'Jobs(): Arg "count" not a number: ' .. tostring(arg[3])) + local limit = assert(tonumber(arg[3] or 25), + 'Jobs(): Arg "limit" not a number: ' .. tostring(arg[3])) local queue = Reqless.queue(queue_name) if state == 'running' then - return queue.locks.peek(now, offset, count) + return queue.locks.peek(now, offset, limit) elseif state == 'stalled' then - return queue.locks.expired(now, offset, count) + return queue.locks.expired(now, offset, limit) elseif state == 'throttled' then - return queue.throttled.peek(now, offset, count) + return queue.throttled.peek(now, offset, limit) elseif state == 'scheduled' then queue:check_scheduled(now, queue.scheduled.length()) - return queue.scheduled.peek(now, offset, count) + return queue.scheduled.peek(now, offset, limit) elseif state == 'depends' then - return queue.depends.peek(now, offset, count) + return queue.depends.peek(now, offset, limit) elseif state == 'recurring' then - return queue.recurring.peek(math.huge, offset, count) + return queue.recurring.peek(math.huge, offset, limit) end error('Jobs(): Unknown type "' .. state .. '"') @@ -276,13 +284,13 @@ function Reqless.track(now, command, jid) end -- tag(now, ('add' | 'remove'), jid, tag, [tag, ...]) --- tag(now, 'get', tag, [offset, [count]]) --- tag(now, 'top', [offset, [count]]) +-- tag(now, 'get', tag, [offset, [limit]]) +-- tag(now, 'top', [offset, [limit]]) -- ----------------------------------------------------------------------------- -- Accepts a jid, 'add' or 'remove', and then a list of tags -- to either add or remove from the job. Alternatively, 'get', -- a tag to get jobs associated with that tag, and offset and --- count +-- limit -- -- If 'add' or 'remove', the response is a list of the jobs -- current tags, or False if the job doesn't exist. If 'get', @@ -306,16 +314,16 @@ function Reqless.tag(now, command, ...) local tag = assert(arg[1], 'Tag(): Arg "tag" missing') local offset = assert(tonumber(arg[2] or 0), 'Tag(): Arg "offset" not a number: ' .. tostring(arg[2])) - local count = assert(tonumber(arg[3] or 25), - 'Tag(): Arg "count" not a number: ' .. tostring(arg[3])) + local limit = assert(tonumber(arg[3] or 25), + 'Tag(): Arg "limit" not a number: ' .. tostring(arg[3])) return { total = redis.call('zcard', 'ql:t:' .. tag), - jobs = redis.call('zrange', 'ql:t:' .. tag, offset, offset + count - 1) + jobs = redis.call('zrange', 'ql:t:' .. tag, offset, offset + limit - 1) } elseif command == 'top' then local offset = assert(tonumber(arg[1] or 0) , 'Tag(): Arg "offset" not a number: ' .. tostring(arg[1])) - local count = assert(tonumber(arg[2] or 25), 'Tag(): Arg "count" not a number: ' .. tostring(arg[2])) - return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, count) + local limit = assert(tonumber(arg[2] or 25), 'Tag(): Arg "limit" not a number: ' .. tostring(arg[2])) + return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, limit) elseif command ~= 'add' and command ~= 'remove' then error('Tag(): First argument must be "add", "remove", "get", or "top"') end @@ -462,16 +470,17 @@ end -- Configuration interactions ------------------------------------------------------------------------------- --- This represents our default configuration settings +-- This represents our default configuration settings. Redis hash values are +-- strings, so use strings for the defaults for more consistent typing. Reqless.config.defaults = { ['application'] = 'reqless', - ['grace-period'] = 10, - ['heartbeat'] = 60, - ['jobs-history'] = 604800, - ['jobs-history-count'] = 50000, - ['max-job-history'] = 100, - ['max-pop-retry'] = 1, - ['max-worker-age'] = 86400, + ['grace-period'] = '10', + ['heartbeat'] = '60', + ['jobs-history'] = '604800', + ['jobs-history-count'] = '50000', + ['max-job-history'] = '100', + ['max-pop-retry'] = '1', + ['max-worker-age'] = '86400', } -- Get one or more of the keys @@ -1140,7 +1149,7 @@ function ReqlessJob:heartbeat(now, worker, data) 'expires', expires, 'worker', worker) end - -- Update hwen this job was last updated on that worker + -- Update when this job was last updated on that worker -- Add this job to the list of jobs handled by this worker redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid) @@ -1404,11 +1413,11 @@ function Reqless.queue(name) -- Access to our work queue.work = { - peek = function(offset, count) - if count <= 0 then + peek = function(offset, limit) + if limit <= 0 then return {} end - return redis.call('zrevrange', queue:prefix('work'), offset, offset + count - 1) + return redis.call('zrevrange', queue:prefix('work'), offset, offset + limit - 1) end, remove = function(...) if #arg > 0 then return redis.call('zrem', queue:prefix('work'), unpack(arg)) @@ -1425,12 +1434,12 @@ function Reqless.queue(name) -- Access to our locks queue.locks = { - expired = function(now, offset, count) + expired = function(now, offset, limit) return redis.call('zrangebyscore', - queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, count) - end, peek = function(now, offset, count) + queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, limit) + end, peek = function(now, offset, limit) return redis.call('zrangebyscore', queue:prefix('locks'), - now, math.huge, 'LIMIT', offset, count) + now, math.huge, 'LIMIT', offset, limit) end, add = function(expires, jid) redis.call('zadd', queue:prefix('locks'), expires, jid) end, remove = function(...) @@ -1452,9 +1461,9 @@ function Reqless.queue(name) -- Access to our dependent jobs queue.depends = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrange', - queue:prefix('depends'), offset, offset + count - 1) + queue:prefix('depends'), offset, offset + limit - 1) end, add = function(now, jid) redis.call('zadd', queue:prefix('depends'), now, jid) end, remove = function(...) @@ -1471,8 +1480,8 @@ function Reqless.queue(name) queue.throttled = { length = function() return (redis.call('zcard', queue:prefix('throttled')) or 0) - end, peek = function(now, offset, count) - return redis.call('zrange', queue:prefix('throttled'), offset, offset + count - 1) + end, peek = function(now, offset, limit) + return redis.call('zrange', queue:prefix('throttled'), offset, offset + limit - 1) end, add = function(...) if #arg > 0 then redis.call('zadd', queue:prefix('throttled'), unpack(arg)) @@ -1488,12 +1497,12 @@ function Reqless.queue(name) -- Access to our scheduled jobs queue.scheduled = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrange', - queue:prefix('scheduled'), offset, offset + count - 1) - end, ready = function(now, offset, count) + queue:prefix('scheduled'), offset, offset + limit - 1) + end, ready = function(now, offset, limit) return redis.call('zrangebyscore', - queue:prefix('scheduled'), 0, now, 'LIMIT', offset, count) + queue:prefix('scheduled'), 0, now, 'LIMIT', offset, limit) end, add = function(when, jid) redis.call('zadd', queue:prefix('scheduled'), when, jid) end, remove = function(...) @@ -1507,10 +1516,10 @@ function Reqless.queue(name) -- Access to our recurring jobs queue.recurring = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrangebyscore', queue:prefix('recur'), - 0, now, 'LIMIT', offset, count) - end, ready = function(now, offset, count) + 0, now, 'LIMIT', offset, limit) + end, ready = function(now, offset, limit) end, add = function(when, jid) redis.call('zadd', queue:prefix('recur'), when, jid) end, remove = function(...) @@ -1631,26 +1640,26 @@ end ------- -- Examine the next jobs that would be popped from the queue without actually -- popping them. -function ReqlessQueue:peek(now, offset, count) +function ReqlessQueue:peek(now, offset, limit) offset = assert(tonumber(offset), 'Peek(): Arg "offset" missing or not a number: ' .. tostring(offset)) - count = assert(tonumber(count), - 'Peek(): Arg "count" missing or not a number: ' .. tostring(count)) + limit = assert(tonumber(limit), + 'Peek(): Arg "limit" missing or not a number: ' .. tostring(limit)) - if count <= 0 then + if limit <= 0 then return {} end - local count_with_offset = offset + count + local offset_with_limit = offset + limit -- These are the ids that we're going to return. We'll begin with any jobs -- that have lost their locks - local jids = self.locks.expired(now, 0, count_with_offset) + local jids = self.locks.expired(now, 0, offset_with_limit) -- Since we can't just peek the range we want, we have to consider all offset - -- + count jobs before we can take the relevant range. - local remaining_capacity = count_with_offset - #jids + -- + limit jobs before we can take the relevant range. + local remaining_capacity = offset_with_limit - #jids -- If we still need jobs in order to meet demand, then we should -- look for all the recurring jobs that need jobs run @@ -1667,7 +1676,7 @@ function ReqlessQueue:peek(now, offset, count) if offset > #jids then -- Offset takes us past the expired jids, so just return straight from the -- work queue - return self.work.peek(offset - #jids, count) + return self.work.peek(offset - #jids, limit) end -- Return a mix of expired jids and prioritized items from the work queue @@ -1677,7 +1686,7 @@ function ReqlessQueue:peek(now, offset, count) return {} end - return {unpack(jids, offset + 1, count_with_offset)} + return {unpack(jids, offset + 1, offset_with_limit)} end -- Return true if this queue is paused @@ -1703,10 +1712,10 @@ end -- Checks for expired locks, scheduled and recurring jobs, returning any -- jobs that are ready to be processes -function ReqlessQueue:pop(now, worker, count) +function ReqlessQueue:pop(now, worker, limit) assert(worker, 'Pop(): Arg "worker" missing') - count = assert(tonumber(count), - 'Pop(): Arg "count" missing or not a number: ' .. tostring(count)) + limit = assert(tonumber(limit), + 'Pop(): Arg "limit" missing or not a number: ' .. tostring(limit)) -- If this queue is paused, then return no jobs if self:paused() then @@ -1716,7 +1725,7 @@ function ReqlessQueue:pop(now, worker, count) -- Make sure we this worker to the list of seen workers redis.call('zadd', 'ql:workers', now, worker) - local dead_jids = self:invalidate_locks(now, count) or {} + local dead_jids = self:invalidate_locks(now, limit) or {} local popped = {} for _, jid in ipairs(dead_jids) do @@ -1737,13 +1746,13 @@ function ReqlessQueue:pop(now, worker, count) -- If we still need jobs in order to meet demand, then we should -- look for all the recurring jobs that need jobs run - self:check_recurring(now, count - #dead_jids) + self:check_recurring(now, limit - #dead_jids) -- If we still need values in order to meet the demand, then we -- should check if any scheduled items, and if so, we should -- insert them to ensure correctness when pulling off the next -- unit of work. - self:check_scheduled(now, count - #dead_jids) + self:check_scheduled(now, limit - #dead_jids) -- With these in place, we can expand this list of jids based on the work -- queue itself and the priorities therein @@ -1756,10 +1765,10 @@ function ReqlessQueue:pop(now, worker, count) ) -- Keep trying to fulfill fulfill jobs from the work queue until we reach - -- the desired count or exhaust our retry limit - while #popped < count and pop_retry_limit > 0 do + -- the desired limit or exhaust our retry limit + while #popped < limit and pop_retry_limit > 0 do - local jids = self.work.peek(0, count - #popped) or {} + local jids = self.work.peek(0, limit - #popped) or {} -- If there is nothing in the work queue, then no need to keep looping if #jids == 0 then @@ -2100,6 +2109,7 @@ function ReqlessQueue:unfail(now, group, count) assert(group, 'Unfail(): Arg "group" missing') count = assert(tonumber(count or 25), 'Unfail(): Arg "count" not a number: ' .. tostring(count)) + assert(count > 0, 'Unfail(): Arg "count" must be greater than zero') -- Get up to that many jobs, and we'll put them in the appropriate queue local jids = redis.call('lrange', 'ql:f:' .. group, -count, -1) @@ -2515,6 +2525,95 @@ function ReqlessQueue.counts(now, name) end return response end +local ReqlessQueuePatterns = { + default_identifiers_default_pattern = '["*"]', + ns = Reqless.ns .. "qp:", +} +ReqlessQueuePatterns.__index = ReqlessQueuePatterns + +ReqlessQueuePatterns['getIdentifierPatterns'] = function(now) + local reply = redis.call('hgetall', ReqlessQueuePatterns.ns .. 'identifiers') + + if #reply == 0 then + -- Check legacy key + reply = redis.call('hgetall', 'qmore:dynamic') + end + + -- Include default pattern in case identifier patterns have never been set. + local identifierPatterns = { + ['default'] = ReqlessQueuePatterns.default_identifiers_default_pattern, + } + for i = 1, #reply, 2 do + identifierPatterns[reply[i]] = reply[i + 1] + end + + return identifierPatterns +end + +-- Each key is a string and each value is string containing a JSON list of +-- patterns. +ReqlessQueuePatterns['setIdentifierPatterns'] = function(now, ...) + if #arg % 2 == 1 then + error('Odd number of identifier patterns: ' .. tostring(arg)) + end + local key = ReqlessQueuePatterns.ns .. 'identifiers' + + local goodDefault = false; + local identifierPatterns = {} + for i = 1, #arg, 2 do + local key = arg[i] + local serializedValues = arg[i + 1] + + -- Ensure that the value is valid JSON. + local values = cjson.decode(serializedValues) + + -- Only write the value if there are items in the list. + if #values > 0 then + if key == 'default' then + goodDefault = true + end + table.insert(identifierPatterns, key) + table.insert(identifierPatterns, serializedValues) + end + end + + -- Ensure some kind of default value is persisted. + if not goodDefault then + table.insert(identifierPatterns, "default") + table.insert( + identifierPatterns, + ReqlessQueuePatterns.default_identifiers_default_pattern + ) + end + + -- Clear out the legacy key too + redis.call('del', key, 'qmore:dynamic') + redis.call('hset', key, unpack(identifierPatterns)) +end + +ReqlessQueuePatterns['getPriorityPatterns'] = function(now) + local reply = redis.call('lrange', ReqlessQueuePatterns.ns .. 'priorities', 0, -1) + + if #reply == 0 then + -- Check legacy key + reply = redis.call('lrange', 'qmore:priority', 0, -1) + end + + return reply +end + +-- Each key is a string and each value is a string containing a JSON object +-- where the JSON object has a shape like: +-- {"fairly": true, "pattern": ["string", "string", "string"]} +ReqlessQueuePatterns['setPriorityPatterns'] = function(now, ...) + local key = ReqlessQueuePatterns.ns .. 'priorities' + redis.call('del', key) + -- Clear out the legacy key + redis.call('del', 'qmore:priority') + if #arg > 0 then + redis.call('rpush', key, unpack(arg)) + end +end -- Get all the attributes of this particular job function ReqlessRecurringJob:data() local job = redis.call( @@ -2552,116 +2651,138 @@ end function ReqlessRecurringJob:update(now, ...) local options = {} -- Make sure that the job exists - if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then - for i = 1, #arg, 2 do - local key = arg[i] - local value = arg[i+1] - assert(value, 'No value provided for ' .. tostring(key)) - if key == 'priority' or key == 'interval' or key == 'retries' then - value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value)) - -- If the command is 'interval', then we need to update the - -- time when it should next be scheduled - if key == 'interval' then - local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval')) - Reqless.queue(queue).recurring.update( - value - tonumber(interval), self.jid) - end - redis.call('hset', 'ql:r:' .. self.jid, key, value) - elseif key == 'data' then - assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'data', value) - elseif key == 'klass' then - redis.call('hset', 'ql:r:' .. self.jid, 'klass', value) - elseif key == 'queue' then - local old_queue_name = redis.call('hget', 'ql:r:' .. self.jid, 'queue') - local queue_obj = Reqless.queue(old_queue_name) - local score = queue_obj.recurring.score(self.jid) - - -- Detach from the old queue - queue_obj.recurring.remove(self.jid) - local throttles = cjson.decode(redis.call('hget', 'ql:r:' .. self.jid, 'throttles') or '{}') - for index, throttle_name in ipairs(throttles) do - if throttle_name == ReqlessQueue.ns .. old_queue_name then - table.remove(throttles, index) - end + if redis.call('exists', 'ql:r:' .. self.jid) == 0 then + error('Recur(): No recurring job ' .. self.jid) + end + + for i = 1, #arg, 2 do + local key = arg[i] + local value = arg[i+1] + assert(value, 'No value provided for ' .. tostring(key)) + if key == 'priority' or key == 'interval' or key == 'retries' then + value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value)) + -- If the command is 'interval', then we need to update the + -- time when it should next be scheduled + if key == 'interval' then + local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval')) + Reqless.queue(queue).recurring.update( + value - tonumber(interval), self.jid) + end + redis.call('hset', 'ql:r:' .. self.jid, key, value) + elseif key == 'data' then + assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'data', value) + elseif key == 'klass' then + redis.call('hset', 'ql:r:' .. self.jid, 'klass', value) + elseif key == 'queue' then + local old_queue_name = redis.call('hget', 'ql:r:' .. self.jid, 'queue') + local queue_obj = Reqless.queue(old_queue_name) + local score = queue_obj.recurring.score(self.jid) + + -- Detach from the old queue + queue_obj.recurring.remove(self.jid) + local throttles = cjson.decode(redis.call('hget', 'ql:r:' .. self.jid, 'throttles') or '{}') + for index, throttle_name in ipairs(throttles) do + if throttle_name == ReqlessQueue.ns .. old_queue_name then + table.remove(throttles, index) end + end - -- Attach to the new queue - table.insert(throttles, ReqlessQueue.ns .. value) - redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) + -- Attach to the new queue + table.insert(throttles, ReqlessQueue.ns .. value) + redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) - Reqless.queue(value).recurring.add(score, self.jid) - redis.call('hset', 'ql:r:' .. self.jid, 'queue', value) - -- If we don't already know about the queue, learn about it - if redis.call('zscore', 'ql:queues', value) == false then - redis.call('zadd', 'ql:queues', now, value) - end - elseif key == 'backlog' then - value = assert(tonumber(value), - 'Recur(): Arg "backlog" not a number: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value) - elseif key == 'throttles' then - local throttles = assert(cjson.decode(value), 'Recur(): Arg "throttles" is not JSON-encoded: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) - else - error('Recur(): Unrecognized option "' .. key .. '"') + Reqless.queue(value).recurring.add(score, self.jid) + redis.call('hset', 'ql:r:' .. self.jid, 'queue', value) + -- If we don't already know about the queue, learn about it + if redis.call('zscore', 'ql:queues', value) == false then + redis.call('zadd', 'ql:queues', now, value) end + elseif key == 'backlog' then + value = assert(tonumber(value), + 'Recur(): Arg "backlog" not a number: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value) + elseif key == 'throttles' then + local throttles = assert(cjson.decode(value), 'Recur(): Arg "throttles" is not JSON-encoded: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) + else + error('Recur(): Unrecognized option "' .. key .. '"') end - return true end - error('Recur(): No recurring job ' .. self.jid) + return true end -- Tags this recurring job with the provided tags function ReqlessRecurringJob:tag(...) local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags') - -- If the job has been canceled / deleted, then return false - if tags then - -- Decode the json blob, convert to dictionary - tags = cjson.decode(tags) - local _tags = {} - for _, v in ipairs(tags) do _tags[v] = true end + -- If the job has been canceled / deleted, then throw an error. + if not tags then + error('Tag(): Job ' .. self.jid .. ' does not exist') + end - -- Otherwise, add the job to the sorted set with that tags - for i=1, #arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end + -- Decode the json blob, convert to dictionary + tags = cjson.decode(tags) + local _tags = {} + for _, v in ipairs(tags) do + _tags[v] = true + end - tags = cjson.encode(tags) - redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) - return tags + -- Otherwise, add the job to the sorted set with that tags + for i = 1, #arg do + if _tags[arg[i]] == nil then + table.insert(tags, arg[i]) + end end - error('Tag(): Job ' .. self.jid .. ' does not exist') + tags = cjsonArrayDegenerationWorkaround(tags) + redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) + + return tags end -- Removes a tag from the recurring job function ReqlessRecurringJob:untag(...) -- Get the existing tags local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags') + -- If the job has been canceled / deleted, then return false - if tags then - -- Decode the json blob, convert to dictionary - tags = cjson.decode(tags) - local _tags = {} - -- Make a hash - for _, v in ipairs(tags) do _tags[v] = true end - -- Delete these from the hash - for i = 1, #arg do _tags[arg[i]] = nil end - -- Back into a list - local results = {} - for _, tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end - -- json encode them, set, and return - tags = cjson.encode(results) - redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) - return tags + if not tags then + error('Untag(): Job ' .. self.jid .. ' does not exist') + end + + -- Decode the json blob, convert to dictionary + tags = cjson.decode(tags) + + local _tags = {} + -- Make a dictionary + for _, v in ipairs(tags) do + _tags[v] = true + end + + -- Delete these from the hash + for i = 1, #arg do + _tags[arg[i]] = nil end - error('Untag(): Job ' .. self.jid .. ' does not exist') + -- Back into a list + local results = {} + for _, tag in ipairs(tags) do + if _tags[tag] then + table.insert(results, tag) + end + end + + -- json encode them, set, and return + tags = cjson.encode(results) + redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) + + return tags end -- Stop further occurrences of this job -function ReqlessRecurringJob:unrecur() +function ReqlessRecurringJob:cancel() -- First, find out what queue it was attached to local queue = redis.call('hget', 'ql:r:' .. self.jid, 'queue') if queue then @@ -2669,7 +2790,6 @@ function ReqlessRecurringJob:unrecur() -- thing itself Reqless.queue(queue).recurring.remove(self.jid) redis.call('del', 'ql:r:' .. self.jid) - return true end return true diff --git a/reqless/lua/reqless.lua b/reqless/lua/reqless.lua index e1672fe..d8e2f03 100644 --- a/reqless/lua/reqless.lua +++ b/reqless/lua/reqless.lua @@ -1,5 +1,11 @@ --- Current SHA: 1b5bb93e846bbf76be594efe6fc19268dd3a20b2 +-- Current SHA: 8b6600adb988e7f4922f606798b6ad64c06a245d -- This is a generated file +local function cjsonArrayDegenerationWorkaround(array) + if #array == 0 then + return "[]" + end + return cjson.encode(array) +end local Reqless = { ns = 'ql:' } @@ -127,32 +133,32 @@ function Reqless.jobs(now, state, ...) if state == 'complete' then local offset = assert(tonumber(arg[1] or 0), 'Jobs(): Arg "offset" not a number: ' .. tostring(arg[1])) - local count = assert(tonumber(arg[2] or 25), - 'Jobs(): Arg "count" not a number: ' .. tostring(arg[2])) + local limit = assert(tonumber(arg[2] or 25), + 'Jobs(): Arg "limit" not a number: ' .. tostring(arg[2])) return redis.call('zrevrange', 'ql:completed', offset, - offset + count - 1) + offset + limit - 1) end local queue_name = assert(arg[1], 'Jobs(): Arg "queue" missing') local offset = assert(tonumber(arg[2] or 0), 'Jobs(): Arg "offset" not a number: ' .. tostring(arg[2])) - local count = assert(tonumber(arg[3] or 25), - 'Jobs(): Arg "count" not a number: ' .. tostring(arg[3])) + local limit = assert(tonumber(arg[3] or 25), + 'Jobs(): Arg "limit" not a number: ' .. tostring(arg[3])) local queue = Reqless.queue(queue_name) if state == 'running' then - return queue.locks.peek(now, offset, count) + return queue.locks.peek(now, offset, limit) elseif state == 'stalled' then - return queue.locks.expired(now, offset, count) + return queue.locks.expired(now, offset, limit) elseif state == 'throttled' then - return queue.throttled.peek(now, offset, count) + return queue.throttled.peek(now, offset, limit) elseif state == 'scheduled' then queue:check_scheduled(now, queue.scheduled.length()) - return queue.scheduled.peek(now, offset, count) + return queue.scheduled.peek(now, offset, limit) elseif state == 'depends' then - return queue.depends.peek(now, offset, count) + return queue.depends.peek(now, offset, limit) elseif state == 'recurring' then - return queue.recurring.peek(math.huge, offset, count) + return queue.recurring.peek(math.huge, offset, limit) end error('Jobs(): Unknown type "' .. state .. '"') @@ -196,16 +202,16 @@ function Reqless.tag(now, command, ...) local tag = assert(arg[1], 'Tag(): Arg "tag" missing') local offset = assert(tonumber(arg[2] or 0), 'Tag(): Arg "offset" not a number: ' .. tostring(arg[2])) - local count = assert(tonumber(arg[3] or 25), - 'Tag(): Arg "count" not a number: ' .. tostring(arg[3])) + local limit = assert(tonumber(arg[3] or 25), + 'Tag(): Arg "limit" not a number: ' .. tostring(arg[3])) return { total = redis.call('zcard', 'ql:t:' .. tag), - jobs = redis.call('zrange', 'ql:t:' .. tag, offset, offset + count - 1) + jobs = redis.call('zrange', 'ql:t:' .. tag, offset, offset + limit - 1) } elseif command == 'top' then local offset = assert(tonumber(arg[1] or 0) , 'Tag(): Arg "offset" not a number: ' .. tostring(arg[1])) - local count = assert(tonumber(arg[2] or 25), 'Tag(): Arg "count" not a number: ' .. tostring(arg[2])) - return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, count) + local limit = assert(tonumber(arg[2] or 25), 'Tag(): Arg "limit" not a number: ' .. tostring(arg[2])) + return redis.call('zrevrangebyscore', 'ql:tags', '+inf', 2, 'limit', offset, limit) elseif command ~= 'add' and command ~= 'remove' then error('Tag(): First argument must be "add", "remove", "get", or "top"') end @@ -325,13 +331,13 @@ end Reqless.config.defaults = { ['application'] = 'reqless', - ['grace-period'] = 10, - ['heartbeat'] = 60, - ['jobs-history'] = 604800, - ['jobs-history-count'] = 50000, - ['max-job-history'] = 100, - ['max-pop-retry'] = 1, - ['max-worker-age'] = 86400, + ['grace-period'] = '10', + ['heartbeat'] = '60', + ['jobs-history'] = '604800', + ['jobs-history-count'] = '50000', + ['max-job-history'] = '100', + ['max-pop-retry'] = '1', + ['max-worker-age'] = '86400', } Reqless.config.get = function(key, default) @@ -1039,11 +1045,11 @@ function Reqless.queue(name) queue.name = name queue.work = { - peek = function(offset, count) - if count <= 0 then + peek = function(offset, limit) + if limit <= 0 then return {} end - return redis.call('zrevrange', queue:prefix('work'), offset, offset + count - 1) + return redis.call('zrevrange', queue:prefix('work'), offset, offset + limit - 1) end, remove = function(...) if #arg > 0 then return redis.call('zrem', queue:prefix('work'), unpack(arg)) @@ -1059,12 +1065,12 @@ function Reqless.queue(name) } queue.locks = { - expired = function(now, offset, count) + expired = function(now, offset, limit) return redis.call('zrangebyscore', - queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, count) - end, peek = function(now, offset, count) + queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, limit) + end, peek = function(now, offset, limit) return redis.call('zrangebyscore', queue:prefix('locks'), - now, math.huge, 'LIMIT', offset, count) + now, math.huge, 'LIMIT', offset, limit) end, add = function(expires, jid) redis.call('zadd', queue:prefix('locks'), expires, jid) end, remove = function(...) @@ -1083,9 +1089,9 @@ function Reqless.queue(name) } queue.depends = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrange', - queue:prefix('depends'), offset, offset + count - 1) + queue:prefix('depends'), offset, offset + limit - 1) end, add = function(now, jid) redis.call('zadd', queue:prefix('depends'), now, jid) end, remove = function(...) @@ -1101,8 +1107,8 @@ function Reqless.queue(name) queue.throttled = { length = function() return (redis.call('zcard', queue:prefix('throttled')) or 0) - end, peek = function(now, offset, count) - return redis.call('zrange', queue:prefix('throttled'), offset, offset + count - 1) + end, peek = function(now, offset, limit) + return redis.call('zrange', queue:prefix('throttled'), offset, offset + limit - 1) end, add = function(...) if #arg > 0 then redis.call('zadd', queue:prefix('throttled'), unpack(arg)) @@ -1117,12 +1123,12 @@ function Reqless.queue(name) } queue.scheduled = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrange', - queue:prefix('scheduled'), offset, offset + count - 1) - end, ready = function(now, offset, count) + queue:prefix('scheduled'), offset, offset + limit - 1) + end, ready = function(now, offset, limit) return redis.call('zrangebyscore', - queue:prefix('scheduled'), 0, now, 'LIMIT', offset, count) + queue:prefix('scheduled'), 0, now, 'LIMIT', offset, limit) end, add = function(when, jid) redis.call('zadd', queue:prefix('scheduled'), when, jid) end, remove = function(...) @@ -1135,10 +1141,10 @@ function Reqless.queue(name) } queue.recurring = { - peek = function(now, offset, count) + peek = function(now, offset, limit) return redis.call('zrangebyscore', queue:prefix('recur'), - 0, now, 'LIMIT', offset, count) - end, ready = function(now, offset, count) + 0, now, 'LIMIT', offset, limit) + end, ready = function(now, offset, limit) end, add = function(when, jid) redis.call('zadd', queue:prefix('recur'), when, jid) end, remove = function(...) @@ -1218,29 +1224,29 @@ function ReqlessQueue:stats(now, date) } end -function ReqlessQueue:peek(now, offset, count) +function ReqlessQueue:peek(now, offset, limit) offset = assert(tonumber(offset), 'Peek(): Arg "offset" missing or not a number: ' .. tostring(offset)) - count = assert(tonumber(count), - 'Peek(): Arg "count" missing or not a number: ' .. tostring(count)) + limit = assert(tonumber(limit), + 'Peek(): Arg "limit" missing or not a number: ' .. tostring(limit)) - if count <= 0 then + if limit <= 0 then return {} end - local count_with_offset = offset + count + local offset_with_limit = offset + limit - local jids = self.locks.expired(now, 0, count_with_offset) + local jids = self.locks.expired(now, 0, offset_with_limit) - local remaining_capacity = count_with_offset - #jids + local remaining_capacity = offset_with_limit - #jids self:check_recurring(now, remaining_capacity) self:check_scheduled(now, remaining_capacity) if offset > #jids then - return self.work.peek(offset - #jids, count) + return self.work.peek(offset - #jids, limit) end table_extend(jids, self.work.peek(0, remaining_capacity)) @@ -1249,7 +1255,7 @@ function ReqlessQueue:peek(now, offset, count) return {} end - return {unpack(jids, offset + 1, count_with_offset)} + return {unpack(jids, offset + 1, offset_with_limit)} end function ReqlessQueue:paused() @@ -1264,10 +1270,10 @@ function ReqlessQueue.unpause(...) redis.call('srem', 'ql:paused_queues', unpack(arg)) end -function ReqlessQueue:pop(now, worker, count) +function ReqlessQueue:pop(now, worker, limit) assert(worker, 'Pop(): Arg "worker" missing') - count = assert(tonumber(count), - 'Pop(): Arg "count" missing or not a number: ' .. tostring(count)) + limit = assert(tonumber(limit), + 'Pop(): Arg "limit" missing or not a number: ' .. tostring(limit)) if self:paused() then return {} @@ -1275,7 +1281,7 @@ function ReqlessQueue:pop(now, worker, count) redis.call('zadd', 'ql:workers', now, worker) - local dead_jids = self:invalidate_locks(now, count) or {} + local dead_jids = self:invalidate_locks(now, limit) or {} local popped = {} for _, jid in ipairs(dead_jids) do @@ -1290,9 +1296,9 @@ function ReqlessQueue:pop(now, worker, count) end - self:check_recurring(now, count - #dead_jids) + self:check_recurring(now, limit - #dead_jids) - self:check_scheduled(now, count - #dead_jids) + self:check_scheduled(now, limit - #dead_jids) local pop_retry_limit = tonumber( @@ -1300,9 +1306,9 @@ function ReqlessQueue:pop(now, worker, count) Reqless.config.get('max-pop-retry', 1) ) - while #popped < count and pop_retry_limit > 0 do + while #popped < limit and pop_retry_limit > 0 do - local jids = self.work.peek(0, count - #popped) or {} + local jids = self.work.peek(0, limit - #popped) or {} if #jids == 0 then break @@ -1571,6 +1577,7 @@ function ReqlessQueue:unfail(now, group, count) assert(group, 'Unfail(): Arg "group" missing') count = assert(tonumber(count or 25), 'Unfail(): Arg "count" not a number: ' .. tostring(count)) + assert(count > 0, 'Unfail(): Arg "count" must be greater than zero') local jids = redis.call('lrange', 'ql:f:' .. group, -count, -1) @@ -1889,6 +1896,82 @@ function ReqlessQueue.counts(now, name) end return response end +local ReqlessQueuePatterns = { + default_identifiers_default_pattern = '["*"]', + ns = Reqless.ns .. "qp:", +} +ReqlessQueuePatterns.__index = ReqlessQueuePatterns + +ReqlessQueuePatterns['getIdentifierPatterns'] = function(now) + local reply = redis.call('hgetall', ReqlessQueuePatterns.ns .. 'identifiers') + + if #reply == 0 then + reply = redis.call('hgetall', 'qmore:dynamic') + end + + local identifierPatterns = { + ['default'] = ReqlessQueuePatterns.default_identifiers_default_pattern, + } + for i = 1, #reply, 2 do + identifierPatterns[reply[i]] = reply[i + 1] + end + + return identifierPatterns +end + +ReqlessQueuePatterns['setIdentifierPatterns'] = function(now, ...) + if #arg % 2 == 1 then + error('Odd number of identifier patterns: ' .. tostring(arg)) + end + local key = ReqlessQueuePatterns.ns .. 'identifiers' + + local goodDefault = false; + local identifierPatterns = {} + for i = 1, #arg, 2 do + local key = arg[i] + local serializedValues = arg[i + 1] + + local values = cjson.decode(serializedValues) + + if #values > 0 then + if key == 'default' then + goodDefault = true + end + table.insert(identifierPatterns, key) + table.insert(identifierPatterns, serializedValues) + end + end + + if not goodDefault then + table.insert(identifierPatterns, "default") + table.insert( + identifierPatterns, + ReqlessQueuePatterns.default_identifiers_default_pattern + ) + end + + redis.call('del', key, 'qmore:dynamic') + redis.call('hset', key, unpack(identifierPatterns)) +end + +ReqlessQueuePatterns['getPriorityPatterns'] = function(now) + local reply = redis.call('lrange', ReqlessQueuePatterns.ns .. 'priorities', 0, -1) + + if #reply == 0 then + reply = redis.call('lrange', 'qmore:priority', 0, -1) + end + + return reply +end + +ReqlessQueuePatterns['setPriorityPatterns'] = function(now, ...) + local key = ReqlessQueuePatterns.ns .. 'priorities' + redis.call('del', key) + redis.call('del', 'qmore:priority') + if #arg > 0 then + redis.call('rpush', key, unpack(arg)) + end +end function ReqlessRecurringJob:data() local job = redis.call( 'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue', @@ -1916,103 +1999,124 @@ end function ReqlessRecurringJob:update(now, ...) local options = {} - if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then - for i = 1, #arg, 2 do - local key = arg[i] - local value = arg[i+1] - assert(value, 'No value provided for ' .. tostring(key)) - if key == 'priority' or key == 'interval' or key == 'retries' then - value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value)) - if key == 'interval' then - local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval')) - Reqless.queue(queue).recurring.update( - value - tonumber(interval), self.jid) - end - redis.call('hset', 'ql:r:' .. self.jid, key, value) - elseif key == 'data' then - assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'data', value) - elseif key == 'klass' then - redis.call('hset', 'ql:r:' .. self.jid, 'klass', value) - elseif key == 'queue' then - local old_queue_name = redis.call('hget', 'ql:r:' .. self.jid, 'queue') - local queue_obj = Reqless.queue(old_queue_name) - local score = queue_obj.recurring.score(self.jid) - - queue_obj.recurring.remove(self.jid) - local throttles = cjson.decode(redis.call('hget', 'ql:r:' .. self.jid, 'throttles') or '{}') - for index, throttle_name in ipairs(throttles) do - if throttle_name == ReqlessQueue.ns .. old_queue_name then - table.remove(throttles, index) - end + if redis.call('exists', 'ql:r:' .. self.jid) == 0 then + error('Recur(): No recurring job ' .. self.jid) + end + + for i = 1, #arg, 2 do + local key = arg[i] + local value = arg[i+1] + assert(value, 'No value provided for ' .. tostring(key)) + if key == 'priority' or key == 'interval' or key == 'retries' then + value = assert(tonumber(value), 'Recur(): Arg "' .. key .. '" must be a number: ' .. tostring(value)) + if key == 'interval' then + local queue, interval = unpack(redis.call('hmget', 'ql:r:' .. self.jid, 'queue', 'interval')) + Reqless.queue(queue).recurring.update( + value - tonumber(interval), self.jid) + end + redis.call('hset', 'ql:r:' .. self.jid, key, value) + elseif key == 'data' then + assert(cjson.decode(value), 'Recur(): Arg "data" is not JSON-encoded: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'data', value) + elseif key == 'klass' then + redis.call('hset', 'ql:r:' .. self.jid, 'klass', value) + elseif key == 'queue' then + local old_queue_name = redis.call('hget', 'ql:r:' .. self.jid, 'queue') + local queue_obj = Reqless.queue(old_queue_name) + local score = queue_obj.recurring.score(self.jid) + + queue_obj.recurring.remove(self.jid) + local throttles = cjson.decode(redis.call('hget', 'ql:r:' .. self.jid, 'throttles') or '{}') + for index, throttle_name in ipairs(throttles) do + if throttle_name == ReqlessQueue.ns .. old_queue_name then + table.remove(throttles, index) end + end - table.insert(throttles, ReqlessQueue.ns .. value) - redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) + table.insert(throttles, ReqlessQueue.ns .. value) + redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) - Reqless.queue(value).recurring.add(score, self.jid) - redis.call('hset', 'ql:r:' .. self.jid, 'queue', value) - if redis.call('zscore', 'ql:queues', value) == false then - redis.call('zadd', 'ql:queues', now, value) - end - elseif key == 'backlog' then - value = assert(tonumber(value), - 'Recur(): Arg "backlog" not a number: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value) - elseif key == 'throttles' then - local throttles = assert(cjson.decode(value), 'Recur(): Arg "throttles" is not JSON-encoded: ' .. tostring(value)) - redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) - else - error('Recur(): Unrecognized option "' .. key .. '"') + Reqless.queue(value).recurring.add(score, self.jid) + redis.call('hset', 'ql:r:' .. self.jid, 'queue', value) + if redis.call('zscore', 'ql:queues', value) == false then + redis.call('zadd', 'ql:queues', now, value) end + elseif key == 'backlog' then + value = assert(tonumber(value), + 'Recur(): Arg "backlog" not a number: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'backlog', value) + elseif key == 'throttles' then + local throttles = assert(cjson.decode(value), 'Recur(): Arg "throttles" is not JSON-encoded: ' .. tostring(value)) + redis.call('hset', 'ql:r:' .. self.jid, 'throttles', cjson.encode(throttles)) + else + error('Recur(): Unrecognized option "' .. key .. '"') end - return true end - error('Recur(): No recurring job ' .. self.jid) + return true end function ReqlessRecurringJob:tag(...) local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags') - if tags then - tags = cjson.decode(tags) - local _tags = {} - for _, v in ipairs(tags) do _tags[v] = true end + if not tags then + error('Tag(): Job ' .. self.jid .. ' does not exist') + end - for i=1, #arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end + tags = cjson.decode(tags) + local _tags = {} + for _, v in ipairs(tags) do + _tags[v] = true + end - tags = cjson.encode(tags) - redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) - return tags + for i = 1, #arg do + if _tags[arg[i]] == nil then + table.insert(tags, arg[i]) + end end - error('Tag(): Job ' .. self.jid .. ' does not exist') + tags = cjsonArrayDegenerationWorkaround(tags) + redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) + + return tags end function ReqlessRecurringJob:untag(...) local tags = redis.call('hget', 'ql:r:' .. self.jid, 'tags') - if tags then - tags = cjson.decode(tags) - local _tags = {} - for _, v in ipairs(tags) do _tags[v] = true end - for i = 1, #arg do _tags[arg[i]] = nil end - local results = {} - for _, tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end - tags = cjson.encode(results) - redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) - return tags + + if not tags then + error('Untag(): Job ' .. self.jid .. ' does not exist') end - error('Untag(): Job ' .. self.jid .. ' does not exist') + tags = cjson.decode(tags) + + local _tags = {} + for _, v in ipairs(tags) do + _tags[v] = true + end + + for i = 1, #arg do + _tags[arg[i]] = nil + end + + local results = {} + for _, tag in ipairs(tags) do + if _tags[tag] then + table.insert(results, tag) + end + end + + tags = cjson.encode(results) + redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) + + return tags end -function ReqlessRecurringJob:unrecur() +function ReqlessRecurringJob:cancel() local queue = redis.call('hget', 'ql:r:' .. self.jid, 'queue') if queue then Reqless.queue(queue).recurring.remove(self.jid) redis.call('del', 'ql:r:' .. self.jid) - return true end return true @@ -2134,10 +2238,8 @@ end local ReqlessAPI = {} ReqlessAPI['config.get'] = function(now, key) - if key then - return Reqless.config.get(key) - end - return ReqlessAPI['config.getAll'](now) + assert(key, "config.get(): Argument 'key' missing") + return Reqless.config.get(key) end ReqlessAPI['config.getAll'] = function(now) @@ -2149,7 +2251,7 @@ ReqlessAPI['config.set'] = function(now, key, value) end ReqlessAPI['config.unset'] = function(now, key) - return Reqless.config.unset(key) + Reqless.config.unset(key) end ReqlessAPI['failureGroups.counts'] = function(now, start, limit) @@ -2161,7 +2263,8 @@ ReqlessAPI['job.addDependency'] = function(now, jid, ...) end ReqlessAPI['job.addTag'] = function(now, jid, ...) - return cjson.encode(Reqless.tag(now, 'add', jid, unpack(arg))) + local result = Reqless.tag(now, 'add', jid, unpack(arg)) + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['job.cancel'] = function(now, ...) @@ -2192,7 +2295,7 @@ ReqlessAPI['job.getMulti'] = function(now, ...) for _, jid in ipairs(arg) do table.insert(results, Reqless.job(jid):data()) end - return cjson.encode(results) + return cjsonArrayDegenerationWorkaround(results) end ReqlessAPI['job.heartbeat'] = function(now, jid, worker, data) @@ -2217,7 +2320,8 @@ ReqlessAPI['job.removeDependency'] = function(now, jid, ...) end ReqlessAPI['job.removeTag'] = function(now, jid, ...) - return cjson.encode(Reqless.tag(now, 'remove', jid, unpack(arg))) + local result = Reqless.tag(now, 'remove', jid, unpack(arg)) + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['job.requeue'] = function(now, worker, queue, jid, klass, data, delay, ...) @@ -2249,7 +2353,8 @@ ReqlessAPI['job.untrack'] = function(now, jid) end ReqlessAPI["jobs.completed"] = function(now, offset, limit) - return Reqless.jobs(now, 'complete', offset, limit) + local result = Reqless.jobs(now, 'complete', offset, limit) + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['jobs.failedByGroup'] = function(now, group, start, limit) @@ -2273,7 +2378,8 @@ ReqlessAPI['queue.forget'] = function(now, ...) end ReqlessAPI["queue.jobsByState"] = function(now, state, ...) - return Reqless.jobs(now, state, unpack(arg)) + local result = Reqless.jobs(now, state, unpack(arg)) + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['queue.length'] = function(now, queue) @@ -2284,22 +2390,22 @@ ReqlessAPI['queue.pause'] = function(now, ...) ReqlessQueue.pause(now, unpack(arg)) end -ReqlessAPI['queue.peek'] = function(now, queue, offset, count) - local jids = Reqless.queue(queue):peek(now, offset, count) +ReqlessAPI['queue.peek'] = function(now, queue, offset, limit) + local jids = Reqless.queue(queue):peek(now, offset, limit) local response = {} for _, jid in ipairs(jids) do table.insert(response, Reqless.job(jid):data()) end - return cjson.encode(response) + return cjsonArrayDegenerationWorkaround(response) end -ReqlessAPI['queue.pop'] = function(now, queue, worker, count) - local jids = Reqless.queue(queue):pop(now, worker, count) +ReqlessAPI['queue.pop'] = function(now, queue, worker, limit) + local jids = Reqless.queue(queue):pop(now, worker, limit) local response = {} for _, jid in ipairs(jids) do table.insert(response, Reqless.job(jid):data()) end - return cjson.encode(response) + return cjsonArrayDegenerationWorkaround(response) end ReqlessAPI['queue.put'] = function(now, worker, queue, jid, klass, data, delay, ...) @@ -2322,16 +2428,37 @@ ReqlessAPI['queue.throttle.set'] = function(now, queue, max) Reqless.throttle(ReqlessQueue.ns .. queue):set({maximum = max}, 0) end -ReqlessAPI['queue.unfail'] = function(now, queue, group, count) - return Reqless.queue(queue):unfail(now, group, count) +ReqlessAPI['queue.unfail'] = function(now, queue, group, limit) + assert(queue, 'queue.unfail(): Arg "queue" missing') + return Reqless.queue(queue):unfail(now, group, limit) end ReqlessAPI['queue.unpause'] = function(now, ...) ReqlessQueue.unpause(unpack(arg)) end -ReqlessAPI['queues.list'] = function(now) - return cjson.encode(ReqlessQueue.counts(now, nil)) +ReqlessAPI['queueIdentifierPatterns.getAll'] = function(now) + return cjson.encode(ReqlessQueuePatterns.getIdentifierPatterns(now)) +end + +ReqlessAPI['queueIdentifierPatterns.setAll'] = function(now, ...) + ReqlessQueuePatterns.setIdentifierPatterns(now, unpack(arg)) +end + +ReqlessAPI['queuePriorityPatterns.getAll'] = function(now) + return cjsonArrayDegenerationWorkaround(ReqlessQueuePatterns.getPriorityPatterns(now)) +end + +ReqlessAPI['queuePriorityPatterns.setAll'] = function(now, ...) + ReqlessQueuePatterns.setPriorityPatterns(now, unpack(arg)) +end + +ReqlessAPI['queues.counts'] = function(now) + return cjsonArrayDegenerationWorkaround(ReqlessQueue.counts(now, nil)) +end + +ReqlessAPI['recurringJob.cancel'] = function(now, jid) + return Reqless.recurring(jid):cancel() end ReqlessAPI['recurringJob.get'] = function(now, jid) @@ -2349,20 +2476,17 @@ ReqlessAPI['recurringJob.removeTag'] = function(now, jid, ...) return Reqless.recurring(jid):untag(unpack(arg)) end -ReqlessAPI['recurringJob.unrecur'] = function(now, jid) - return Reqless.recurring(jid):unrecur() -end - ReqlessAPI['recurringJob.update'] = function(now, jid, ...) return Reqless.recurring(jid):update(now, unpack(arg)) end -ReqlessAPI['tags.top'] = function(now, ...) - return cjson.encode(Reqless.tag(now, 'top', unpack(arg))) +ReqlessAPI['tags.top'] = function(now, offset, limit) + local result = Reqless.tag(now, 'top', offset, limit) + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['throttle.delete'] = function(now, tid) - return Reqless.throttle(tid):unset() + Reqless.throttle(tid):unset() end ReqlessAPI['throttle.get'] = function(now, tid) @@ -2370,11 +2494,13 @@ ReqlessAPI['throttle.get'] = function(now, tid) end ReqlessAPI['throttle.locks'] = function(now, tid) - return Reqless.throttle(tid).locks.members() + local result = Reqless.throttle(tid).locks.members() + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['throttle.pending'] = function(now, tid) - return Reqless.throttle(tid).pending.members() + local result = Reqless.throttle(tid).pending.members() + return cjsonArrayDegenerationWorkaround(result) end ReqlessAPI['throttle.release'] = function(now, tid, ...) @@ -2393,194 +2519,16 @@ ReqlessAPI['throttle.set'] = function(now, tid, max, ...) Reqless.throttle(tid):set(data, tonumber(expiration or 0)) end -ReqlessAPI['worker.counts'] = function(now, worker) - return cjson.encode(ReqlessWorker.counts(now, worker)) -end - ReqlessAPI['worker.forget'] = function(now, ...) - return ReqlessWorker.deregister(unpack(arg)) + ReqlessWorker.deregister(unpack(arg)) end -ReqlessAPI['workers.list'] = function(now) - return cjson.encode(ReqlessWorker.counts(now, nil)) -end - - -ReqlessAPI['cancel'] = function(now, ...) - return ReqlessAPI['job.cancel'](now, unpack(arg)) -end - -ReqlessAPI['complete'] = function(now, jid, worker, queue, data, ...) - return Reqless.job(jid):complete(now, worker, queue, data, unpack(arg)) -end - -ReqlessAPI['depends'] = function(now, jid, command, ...) - if command == "on" then - return ReqlessAPI['job.addDependency'](now, jid, unpack(arg)) - elseif command == "off" then - return ReqlessAPI['job.removeDependency'](now, jid, unpack(arg)) - end - error('Depends(): Argument "command" must be "on" or "off"') -end - -ReqlessAPI['fail'] = function(now, jid, worker, group, message, data) - return ReqlessAPI['job.fail'](now, jid, worker, group, message, data) -end - -ReqlessAPI['failed'] = function(now, group, start, limit) - if group then - return ReqlessAPI['jobs.failedByGroup'](now, group, start, limit) - end - return ReqlessAPI['failureGroups.counts'](now, start, limit) -end - -ReqlessAPI['get'] = function(now, jid) - return ReqlessAPI['job.get'](now, jid) -end - -ReqlessAPI.heartbeat = function(now, jid, worker, data) - return ReqlessAPI['job.heartbeat'](now, jid, worker, data) -end - -ReqlessAPI['jobs'] = function(now, state, ...) - if state == 'complete' then - return ReqlessAPI['jobs.completed'](now, unpack(arg)) - end - return ReqlessAPI['queue.jobsByState'](now, state, unpack(arg)) -end - -ReqlessAPI['length'] = function(now, queue) - return ReqlessAPI['queue.length'](now, queue) -end - -ReqlessAPI['log'] = function(now, jid, message, data) - return ReqlessAPI['job.log'](now, jid, message, data) -end - -ReqlessAPI['multiget'] = function(now, ...) - return ReqlessAPI['job.getMulti'](now, unpack(arg)) -end - -ReqlessAPI['pause'] = function(now, ...) - return ReqlessAPI['queue.pause'](now, unpack(arg)) -end - -ReqlessAPI['peek'] = function(now, queue, offset, count) - return ReqlessAPI['queue.peek'](now, queue, offset, count) -end - -ReqlessAPI['pop'] = function(now, queue, worker, count) - return ReqlessAPI['queue.pop'](now, queue, worker, count) -end - -ReqlessAPI['priority'] = function(now, jid, priority) - return ReqlessAPI['job.setPriority'](now, jid, priority) -end - -ReqlessAPI['put'] = function(now, worker, queue, jid, klass, data, delay, ...) - return ReqlessAPI['queue.put'](now, worker, queue, jid, klass, data, delay, unpack(arg)) -end - -ReqlessAPI['queues'] = function(now, queue) - if queue then - return ReqlessAPI['queue.counts'](now, queue) - end - return ReqlessAPI['queues.list'](now) -end - -ReqlessAPI['recur'] = function(now, queue, jid, klass, data, spec, ...) - if spec == 'interval' then - return Reqless.queue(queue):recurAtInterval(now, jid, klass, data, unpack(arg)) - end - - error('Recur(): schedule type "' .. tostring(spec) .. '" unknown') -end - -ReqlessAPI['recur.get'] = function(now, jid) - return ReqlessAPI['recurringJob.get'](now, jid) -end - -ReqlessAPI['recur.tag'] = function(now, jid, ...) - return ReqlessAPI['recurringJob.addTag'](now, jid, unpack(arg)) -end - -ReqlessAPI['recur.untag'] = function(now, jid, ...) - return ReqlessAPI['recurringJob.removeTag'](now, jid, unpack(arg)) -end - -ReqlessAPI['recur.update'] = function(now, jid, ...) - return ReqlessAPI['recurringJob.update'](now, jid, unpack(arg)) -end - -ReqlessAPI['requeue'] = function(now, worker, queue, jid, ...) - return ReqlessAPI['job.requeue'](now, worker, queue, jid, unpack(arg)) -end - -ReqlessAPI['retry'] = function(now, jid, queue, worker, delay, group, message) - return ReqlessAPI['job.retry'](now, jid, queue, worker, delay, group, message) -end - -ReqlessAPI['stats'] = function(now, queue, date) - return ReqlessAPI['queue.stats'](now, queue, date) -end - -ReqlessAPI['tag'] = function(now, command, ...) - if command == 'add' then - return ReqlessAPI['job.addTag'](now, unpack(arg)) - end - if command == 'remove' then - return ReqlessAPI['job.removeTag'](now, unpack(arg)) - end - if command == 'get' then - return ReqlessAPI['jobs.tagged'](now, unpack(arg)) - end - if command == 'top' then - return ReqlessAPI['tags.top'](now, unpack(arg)) - end - error('Tag(): Unknown command ' .. command) -end - -ReqlessAPI['throttle.ttl'] = function(now, tid) - return Reqless.throttle(tid):ttl() -end - -ReqlessAPI['timeout'] = function(now, ...) - return ReqlessAPI['job.timeout'](now, unpack(arg)) -end - -ReqlessAPI['track'] = function(now, command, jid) - if command == 'track' then - return ReqlessAPI['job.track'](now, jid) - end - - if command == 'untrack' then - return ReqlessAPI['job.untrack'](now, jid) - end - - return ReqlessAPI['jobs.tracked'](now) -end - -ReqlessAPI['unfail'] = function(now, queue, group, count) - return ReqlessAPI['queue.unfail'](now, queue, group, count) -end - -ReqlessAPI['unpause'] = function(now, ...) - return ReqlessAPI['queue.unpause'](now, unpack(arg)) -end - -ReqlessAPI['unrecur'] = function(now, jid) - return ReqlessAPI['recurringJob.unrecur'](now, jid) -end - -ReqlessAPI['worker.deregister'] = function(now, ...) - return ReqlessAPI['worker.forget'](now, unpack(arg)) +ReqlessAPI['worker.jobs'] = function(now, worker) + return cjson.encode(ReqlessWorker.counts(now, worker)) end -ReqlessAPI['workers'] = function(now, worker) - if worker then - return ReqlessAPI['worker.counts'](now, worker) - end - return ReqlessAPI['workers.list'](now) +ReqlessAPI['workers.counts'] = function(now) + return cjsonArrayDegenerationWorkaround(ReqlessWorker.counts(now, nil)) end diff --git a/reqless/queue.py b/reqless/queue.py index 1f7d3aa..d298f00 100644 --- a/reqless/queue.py +++ b/reqless/queue.py @@ -28,37 +28,42 @@ def name(self) -> str: def depends(self, offset: int = 0, count: int = 25) -> List[str]: """Return all the currently dependent jobs""" - response: List[str] = self.client( + response_json: str = self.client( "queue.jobsByState", "depends", self.name, offset, count ) + response: List[str] = json.loads(response_json) return response def recurring(self, offset: int = 0, count: int = 25) -> List[str]: """Return all the recurring jobs""" - response: List[str] = self.client( + response_json: str = self.client( "queue.jobsByState", "recurring", self.name, offset, count ) + response: List[str] = json.loads(response_json) return response def running(self, offset: int = 0, count: int = 25) -> List[str]: """Return all the currently-running jobs""" - response: List[str] = self.client( + response_json: str = self.client( "queue.jobsByState", "running", self.name, offset, count ) + response: List[str] = json.loads(response_json) return response def scheduled(self, offset: int = 0, count: int = 25) -> List[str]: """Return all the currently-scheduled jobs""" - response: List[str] = self.client( + response_json: str = self.client( "queue.jobsByState", "scheduled", self.name, offset, count ) + response: List[str] = json.loads(response_json) return response def stalled(self, offset: int = 0, count: int = 25) -> List[str]: """Return all the currently-stalled jobs""" - response: List[str] = self.client( + response_json: str = self.client( "queue.jobsByState", "stalled", self.name, offset, count ) + response: List[str] = json.loads(response_json) return response diff --git a/reqless/reqless-core b/reqless/reqless-core index 1b5bb93..8b6600a 160000 --- a/reqless/reqless-core +++ b/reqless/reqless-core @@ -1 +1 @@ -Subproject commit 1b5bb93e846bbf76be594efe6fc19268dd3a20b2 +Subproject commit 8b6600adb988e7f4922f606798b6ad64c06a245d diff --git a/reqless/throttle.py b/reqless/throttle.py index ca25092..58a339d 100644 --- a/reqless/throttle.py +++ b/reqless/throttle.py @@ -17,7 +17,8 @@ def name(self) -> str: return self._name def locks(self) -> List[str]: - response: List[str] = self.client("throttle.locks", self.name) + response_json: str = self.client("throttle.locks", self.name) + response: List[str] = json.loads(response_json) return response def maximum(self) -> int: @@ -35,7 +36,8 @@ def set_maximum( self.client("throttle.set", self.name, _maximum, expiration or 0) def pending(self) -> List[str]: - response: List[str] = self.client("throttle.pending", self.name) + response_json: str = self.client("throttle.pending", self.name) + response: List[str] = json.loads(response_json) return response def ttl(self) -> int: diff --git a/reqless_test/test_client.py b/reqless_test/test_client.py index 0eee8a7..ef79576 100644 --- a/reqless_test/test_client.py +++ b/reqless_test/test_client.py @@ -36,7 +36,7 @@ def test_attribute_error(self) -> None: def test_tags(self) -> None: """Provides access to top tags""" - self.assertEqual(self.client.tags(), {}) + self.assertEqual(self.client.tags(), []) for _ in range(10): self.client.queues["foo"].put( "reqless_test.common.NoopJob", "{}", tags=["foo"] @@ -125,7 +125,7 @@ def test_basic(self) -> None: def test_counts(self) -> None: """Gives us access to counts""" - self.assertEqual(self.client.queues.counts, {}) + self.assertEqual(self.client.queues.counts, []) self.client.queues["foo"].put("reqless_test.common.NoopJob", "{}") self.assertEqual( self.client.queues.counts, @@ -210,7 +210,7 @@ def test_individual(self) -> None: def test_counts(self) -> None: """Gives us access to worker counts""" self.client.queues["foo"].put("reqless_test.common.NoopJob", "{}", jid="jid") - self.assertEqual(self.client.workers.counts, {}) + self.assertEqual(self.client.workers.counts, []) job = next(self.worker.jobs()) assert job is not None self.assertEqual( diff --git a/reqless_test/test_config.py b/reqless_test/test_config.py index c90a876..f46bf73 100644 --- a/reqless_test/test_config.py +++ b/reqless_test/test_config.py @@ -20,13 +20,13 @@ def test_get_all(self) -> None: self.client.config.all, { "application": "reqless", - "grace-period": 10, - "heartbeat": 60, - "jobs-history": 604800, - "jobs-history-count": 50000, - "max-job-history": 100, - "max-pop-retry": 1, - "max-worker-age": 86400, + "grace-period": "10", + "heartbeat": "60", + "jobs-history": "604800", + "jobs-history-count": "50000", + "max-job-history": "100", + "max-pop-retry": "1", + "max-worker-age": "86400", }, ) diff --git a/reqless_test/test_job.py b/reqless_test/test_job.py index a32e2dc..339c9b5 100644 --- a/reqless_test/test_job.py +++ b/reqless_test/test_job.py @@ -132,12 +132,14 @@ def test_tag_untag(self) -> None: """Exposes a way to tag and untag a job""" self.client.queues["foo"].put("reqless_test.test_job.Foo", "{}", jid="jid") job = self.get_job("jid") - job.tag("foo") + result = job.tag("foo", "bar") + self.assertEqual(["foo", "bar"], result) job = self.get_job("jid") - self.assertEqual(job.tags, ["foo"]) - job.untag("foo") + self.assertEqual(job.tags, ["foo", "bar"]) + result = job.untag("foo") + self.assertEqual(["bar"], result) job = self.get_job("jid") - self.assertEqual(job.tags, []) + self.assertEqual(job.tags, ["bar"]) def test_move(self) -> None: """Able to move jobs through the move method"""