Skip to content

Commit

Permalink
Merge pull request #5 from edgurgel/fix-redis-restart-scripts
Browse files Browse the repository at this point in the history
Fix script loading when Redix fail to evaluate
  • Loading branch information
alissonsales committed Dec 22, 2015
2 parents ab1b1c6 + bb68bba commit 1626bbf
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 12 deletions.
8 changes: 1 addition & 7 deletions lib/verk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@ defmodule Verk do
alias Verk.Job

@doc false
def start(_type, _args) do
redis_url = Application.get_env(:verk, :redis_url, "redis://127.0.0.1:6379")
{ :ok, redis } = Redix.start_link(redis_url)
Verk.Scripts.load(redis)
:ok = Redix.stop(redis)
Verk.Supervisor.start_link
end
def start(_type, _args), do: Verk.Supervisor.start_link

@doc """
Add a new `queue` with a pool of size `size` of workers
Expand Down
12 changes: 9 additions & 3 deletions lib/verk/queue_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ defmodule Verk.QueueManager do
node_id = Application.get_env(:verk, :node_id, "1")
redis_url = Application.get_env(:verk, :redis_url, "redis://127.0.0.1:6379")
{ :ok, redis } = Redix.start_link(redis_url)
Verk.Scripts.load(redis)

state = %State{ queue_name: queue_name, redis: redis, node_id: node_id }

Logger.info "Queue Manager started for queue #{queue_name}"
Expand All @@ -84,9 +86,10 @@ defmodule Verk.QueueManager do
def handle_call(:enqueue_inprogress, _from, state) do
case Redix.command(state.redis, ["EVALSHA", @lpop_rpush_src_dest_script_sha, 2, inprogress(state.queue_name, state.node_id), "queue:#{state.queue_name}"]) do
{ :ok, n } -> Logger.info("#{n} jobs readded to the queue #{state.queue_name} from inprogress list")
error -> Logger.error("Failed to add jobs back to queue #{state.queue_name} from inprogress list. Error: #{inspect error}")
{ :reply, :ok, state }
{ :error, reason } -> Logger.error("Failed to add jobs back to queue #{state.queue_name} from inprogress list. Error: #{inspect reason}")
{ :stop, :redis_failed, state }
end
{ :reply, :ok, state }
end

def handle_call({ :dequeue, n }, _from, state) do
Expand All @@ -96,7 +99,10 @@ defmodule Verk.QueueManager do
{ :ok, jobs } ->
jobs = for job <- jobs, do: Verk.Job.decode!(job)
{ :reply, jobs, state }
_ ->
{ :error, %Redix.Error{message: message} } ->
Logger.error("Failed to fetch jobs: #{message}")
{ :stop, :redis_failed, :redis_failed, state }
{ :error, _ } ->
{ :reply, :redis_failed, state }
end
end
Expand Down
9 changes: 8 additions & 1 deletion lib/verk/schedule_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ defmodule Verk.ScheduleManager do
def init(_) do
redis_url = Application.get_env(:verk, :redis_url, "redis://127.0.0.1:6379")
{ :ok, redis } = Redix.start_link(redis_url)
Verk.Scripts.load(redis)

state = %State{ redis: redis }

Logger.info "Schedule Manager started"
Expand All @@ -43,13 +45,18 @@ defmodule Verk.ScheduleManager do
case Redix.command(state.redis, ["EVALSHA", @enqueue_retriable_script_sha, 1, @retry_key, Time.now(:secs)]) do
{ :ok, nil } ->
schedule_fetch_retryable!
{ :noreply, state }
{ :ok, _job } ->
schedule_fetch_retryable!(0)
{ :noreply, state }
{ :error, %Redix.Error{message: message} } ->
Logger.error("Failed to fetch retry set. Error: #{message}")
{ :stop, :redis_failed, state }
error ->
Logger.error("Failed to fetch retry set. Error: #{inspect error}")
schedule_fetch_retryable!
{ :noreply, state }
end
{ :noreply, state }
end

defp schedule_fetch_retryable! do
Expand Down
24 changes: 23 additions & 1 deletion test/queue_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ defmodule Verk.QueueManagerTest do
Application.put_env(:verk, :node_id, "test_node")

expect(Redix, :start_link, ["redis_url"], {:ok, :redis })
expect(Verk.Scripts, :load, [:redis], :ok)

assert init(["queue_name"]) == { :ok, %State{ node_id: "test_node", queue_name: "queue_name", redis: :redis } }

assert validate Redix
assert validate [Redix, Verk.Scripts]
end

test "call enqueue_inprogress" do
Expand All @@ -37,6 +38,17 @@ defmodule Verk.QueueManagerTest do
assert validate Redix
end

test "call enqueue_inprogress and redis failed" do
script = Verk.Scripts.sha("lpop_rpush_src_dest")
expect(Redix, :command, [:redis, ["EVALSHA", script, 2, "inprogress:test_queue:test_node", "queue:test_queue"]], { :error, :reason })

state = %State{ queue_name: "test_queue", redis: :redis, node_id: "test_node" }

assert handle_call(:enqueue_inprogress, :from, state) == { :stop, :redis_failed, state }

assert validate Redix
end

test "call dequeue with an empty queue" do
script = Verk.Scripts.sha("mrpop_lpush_src_dest")
expect(Redix, :command, [:redis, ["EVALSHA", script, 2, "queue:test_queue", "inprogress:test_queue:test_node", 3]], { :ok, [] })
Expand Down Expand Up @@ -69,6 +81,16 @@ defmodule Verk.QueueManagerTest do
assert validate Redix
end

test "call dequeue and redis failed to evalue the script" do
expect(Redix, :command, 2, { :error, %Redix.Error{message: "a message" } })

state = %State{ queue_name: "test_queue", redis: :redis }

assert handle_call({ :dequeue, 3 }, :from, state) == { :stop, :redis_failed, :redis_failed, state }

assert validate Redix
end

test "call dequeue and timeout" do
assert dequeue(self, 1) == :timeout
end
Expand Down
26 changes: 26 additions & 0 deletions test/schedule_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ defmodule Verk.ScheduleManagerTest do
{ :ok, %{ script: script } }
end

test "init load scripts and schedule fetch" do
state = %State{ redis: :redis }

Application.put_env(:verk, :redis_url, "redis_url")
Application.put_env(:verk, :node_id, "test_node")

expect(Redix, :start_link, ["redis_url"], {:ok, :redis })
expect(Verk.Scripts, :load, [:redis], :ok)
expect(Process, :send_after, [self, :fetch_retryable, 4200], make_ref)

assert init(:args) == { :ok, state }

assert validate [Redix, Verk.Scripts, Process]
end

test "handle_info :fetch_retryable without jobs to retry", %{ script: script } do
state = %State{ redis: :redis }
now = :now
Expand All @@ -35,4 +50,15 @@ defmodule Verk.ScheduleManagerTest do

assert validate [Timex.Time, Redix]
end

test "handle_info :fetch_retryable with jobs to retry and redis failed to apply the script", %{ script: script } do
state = %State{ redis: :redis }
now = :now
expect(Timex.Time, :now, [:secs], now)
expect(Redix, :command, [:redis, ["EVALSHA", script, 1, "retry", now]], {:error, %Redix.Error{message: "a message"}})

assert handle_info(:fetch_retryable, state) == { :stop, :redis_failed, state }

assert validate [Timex.Time, Redix]
end
end

0 comments on commit 1626bbf

Please sign in to comment.