Skip to content

Commit

Permalink
Redo "Fix concurrency bug"
Browse files Browse the repository at this point in the history
This improves upon commit ed54907,
reducing the number of queries significantly.

The prvevious attempt required a lot of queries, particularly to
rename keys when moving tasks to pending queues. This commit
reverts all that, and solves the original problem using Lua scripting.
  • Loading branch information
akadusei committed Nov 30, 2023
1 parent 2684881 commit cc71be0
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 136 deletions.
2 changes: 1 addition & 1 deletion spec/mel/task_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ describe Mel::Task do
Mel::CronTask.find_pending(-1).should be_nil
Mel::Task.find_pending(-1).should be_nil

Mel::InstantTask.find(2, delete: nil).try(&.size).should eq(2)
Mel::Task.find(2, delete: nil).try(&.size).should eq(2)
Mel::InstantTask.find(-1).try(&.size).should eq(1)

Mel::InstantTask.find_pending(-1).try(&.size).should eq(2)
Expand Down
72 changes: 62 additions & 10 deletions src/mel/task.cr
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,28 @@ abstract class Mel::Task
end

macro inherited
def self.find_lt(time : Time, count = -1, *, delete = false) : Array(self)?
def self.find_lt(
time : Time,
count : Int = -1,
*,
delete : Nil
) : Array(self)?
\{% raise <<-ERROR %}
no overload matches '#{@type.name}.#{@def.name}' with types \
Time, Int, delete: Nil
Overloads are:
- #{@type.name}.#{@def.name}(time : Time, count : Int = -1, *, \
delete : Bool = false)
ERROR
end

def self.find_lt(
time : Time,
count : Int = -1,
*,
delete : Bool = false
) : Array(self)?
return if count.zero?

Mel::Task.find_lt(time, -1, delete: false).try do |tasks|
Expand All @@ -71,7 +92,28 @@ abstract class Mel::Task
end
end

def self.find_lte(time : Time, count = -1, *, delete = false) : Array(self)?
def self.find_lte(
time : Time,
count : Int = -1,
*,
delete : Nil
) : Array(self)?
\{% raise <<-ERROR %}
no overload matches '#{@type.name}.#{@def.name}' with types \
Time, Int, delete: Nil
Overloads are:
- #{@type.name}.#{@def.name}(time : Time, count : Int = -1, *, \
delete : Bool = false)
ERROR
end

def self.find_lte(
time : Time,
count : Int = -1,
*,
delete : Bool = false
) : Array(self)?
return if count.zero?

Mel::Task.find_lte(time, -1, delete: false).try do |tasks|
Expand All @@ -81,7 +123,17 @@ abstract class Mel::Task
end
end

def self.find(count : Int, *, delete = false) : Array(self)?
def self.find(count : Int, *, delete : Nil)
\{% raise <<-ERROR %}
no overload matches '#{@type.name}.#{@def.name}' with types \
Int, delete: Nil
Overloads are:
- #{@type.name}.#{@def.name}(count : Int, *, delete : Bool = false)
ERROR
end

def self.find(count : Int, *, delete : Bool = false) : Array(self)?
return if count.zero?

Mel::Task.find(-1, delete: false).try do |tasks|
Expand All @@ -91,14 +143,14 @@ abstract class Mel::Task
end
end

def self.find(id : String, *, delete = false) : self?
def self.find(id : String, *, delete : Bool = false) : self?
Mel::Task.find(id, delete: false).try do |task|
return unless task.is_a?(self)
delete(task, delete).try &.as(self)
end
end

def self.find(ids : Indexable, *, delete = false) : Array(self)?
def self.find(ids : Indexable, *, delete : Bool = false) : Array(self)?
Mel::Task.find(ids, delete: false).try do |tasks|
tasks = tasks.select(self)
return if tasks.empty?
Expand Down Expand Up @@ -132,27 +184,27 @@ abstract class Mel::Task
end
end

def self.find_lt(time : Time, count = -1, *, delete = false)
def self.find_lt(time : Time, count : Int = -1, *, delete : Bool? = false)
Query.find_lt(time, count, delete: delete).try do |values|
from_json(values)
end
end

def self.find_lte(time : Time, count = -1, *, delete = false)
def self.find_lte(time : Time, count : Int = -1, *, delete : Bool? = false)
Query.find_lte(time, count, delete: delete).try do |values|
from_json(values)
end
end

def self.find(count : Int, *, delete = false)
def self.find(count : Int, *, delete : Bool? = false)
Query.find(count, delete: delete).try { |values| from_json(values) }
end

def self.find(id : String, *, delete = false)
def self.find(id : String, *, delete : Bool = false)
Query.find(id, delete: delete).try { |value| from_json(value) }
end

def self.find(ids : Indexable, *, delete = false)
def self.find(ids : Indexable, *, delete : Bool = false)
Query.find(ids, delete: delete).try { |values| from_json(values) }
end

Expand Down
38 changes: 21 additions & 17 deletions src/mel/task/query.cr
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,29 @@ abstract class Mel::Task
find(ids, delete: true)
end

def find_lt(time : Time, count = -1, *, delete = false) : Array(String)?
def find_lt(
time : Time,
count : Int = -1,
*,
delete : Bool = false
) : Array(String)?
return if count.zero?

connect do
ids = Mel.redis.zrangebyscore(
key,
"0",
"(#{time.to_unix}",
{"0", count.to_s}
).as(Array)
ids = Mel.redis
.zrangebyscore(key, "0", "(#{time.to_unix}", {"0", count.to_s})
.as(Array)

find(ids, delete: delete)
end
end

def find_lte(time : Time, count = -1, *, delete = false) : Array(String)?
def find_lte(
time : Time,
count : Int = -1,
*,
delete : Bool = false
) : Array(String)?
return if count.zero?

connect do
Expand All @@ -83,26 +90,23 @@ abstract class Mel::Task
end
end

def find(count : Int, *, delete = false) : Array(String)?
def find(count : Int, *, delete : Bool = false) : Array(String)?
return if count.zero?

connect do
ids = Mel.redis.zrangebyscore(
key,
"0",
"+inf",
{"0", count.to_s}
).as(Array)
ids = Mel.redis
.zrangebyscore(key, "0", "+inf", {"0", count.to_s})
.as(Array)

find(ids, delete: delete)
end
end

def find(id : String, *, delete = false) : String?
def find(id : String, *, delete : Bool = false) : String?
find({id}, delete: delete).try(&.first?)
end

def find(ids : Indexable, *, delete = false) : Array(String)?
def find(ids : Indexable, *, delete : Bool = false) : Array(String)?
return if ids.empty?

connect do
Expand Down
1 change: 1 addition & 0 deletions src/worker/instant_task.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "./task"
module Mel
class InstantTask < Task
private def schedule_next
dequeue
end
end
end
2 changes: 1 addition & 1 deletion src/worker/recurring_task.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Mel
end

private def schedule_next
return if till.try(&.< next_time)
return dequeue if till.try(&.< next_time)
log_scheduling

task = clone
Expand Down
54 changes: 3 additions & 51 deletions src/worker/task.cr
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
require "./task/**"

abstract class Mel::Task
def dequeue_pending
do_before_dequeue
log_dequeueing

Query.delete_pending(id).tap do
log_dequeued
do_after_dequeue(true)
end
rescue error
handle_error(error)
do_after_dequeue(false)
end

def run(*, force = false) : Fiber?
return log_not_due unless force || due?

Expand All @@ -24,11 +11,9 @@ abstract class Mel::Task
log_running
job.run
rescue error
dequeue_pending
retry_failed_task(error)
log_errored(error)
else
dequeue_pending
schedule_next
log_ran
do_after_run(true)
Expand Down Expand Up @@ -64,51 +49,18 @@ abstract class Mel::Task
end

macro inherited
def self.find_pending(count : Int, *, delete = false) : Array(self)?
def self.find_pending(count : Int, *, delete : Bool = false) : Array(self)?
return if count.zero?

Mel::Task.find_pending(-1, delete: false).try do |tasks|
tasks = resize(tasks.select(self), count)
return if tasks.empty?
delete_pending(tasks, delete).try &.map(&.as self)
end
end

def self.find_pending(id : String, *, delete = false) : self?
Mel::Task.find_pending(id, delete: false).try do |task|
return unless task.is_a?(self)
delete_pending(task, delete).try &.as(self)
end
end

def self.find_pending(ids : Indexable, *, delete = false) : Array(self)?
Mel::Task.find_pending(ids, delete: false).try do |tasks|
tasks = tasks.select(self)
return if tasks.empty?
delete_pending(tasks, delete).try &.map(&.as self)
delete(tasks, delete).try &.map(&.as self)
end
end

private def self.delete_pending(tasks : Indexable, delete)
false == delete ?
tasks :
Mel::Task.find_pending(tasks.map(&.id), delete: delete)
end

private def self.delete_pending(task, delete)
false == delete ? task : Mel::Task.find_pending(task.id, delete: delete)
end
end

def self.find_pending(count : Int, *, delete = false)
def self.find_pending(count : Int, *, delete : Bool = false)
Query.find_pending(count, delete: delete).try { |values| from_json(values) }
end

def self.find_pending(id : String, *, delete = false)
Query.find_pending(id, delete: delete).try { |value| from_json(value) }
end

def self.find_pending(ids : Indexable, *, delete = false)
Query.find_pending(ids, delete: delete).try { |values| from_json(values) }
end
end
Loading

0 comments on commit cc71be0

Please sign in to comment.