Skip to content

Commit

Permalink
Merge pull request #13 from edgurgel/add-general-purpose-redis-process
Browse files Browse the repository at this point in the history
Add general purpose redis process
  • Loading branch information
alissonsales committed Jan 21, 2016
2 parents 38078a5 + 96a633c commit 260bcca
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 53 deletions.
28 changes: 8 additions & 20 deletions lib/verk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,19 @@ defmodule Verk do
* a module to perform (`class`)
* a valid `jid`
"""
@spec enqueue(pid, %Job{}) :: { :ok, binary } | { :error, term }
def enqueue(_, job = %Job{ queue: nil }), do: { :error, { :missing_queue, job } }
def enqueue(_, job = %Job{ class: nil }), do: { :error, { :missing_module, job } }
def enqueue(_, job = %Job{ args: args }) when not is_list(args), do: { :error, { :missing_args, job } }
def enqueue(redis, job = %Job{ jid: nil }) do
@spec enqueue(%Job{}) :: { :ok, binary } | { :error, term }
def enqueue(job = %Job{ queue: nil }), do: { :error, { :missing_queue, job } }
def enqueue(job = %Job{ class: nil }), do: { :error, { :missing_module, job } }
def enqueue(job = %Job{ args: args }) when not is_list(args), do: { :error, { :missing_args, job } }
def enqueue(job = %Job{ jid: nil }) do
<<part1::32, part2::32>> = :crypto.rand_bytes(8)
jid = "#{part1}.#{part2}"
enqueue(redis, %Job{ job | jid: jid })
enqueue(%Job{ job | jid: jid })
end
def enqueue(redis, %Job{ jid: jid, queue: queue } = job) do
case Redix.command(redis, ["LPUSH", "queue:#{queue}", Poison.encode!(job)]) do
def enqueue(%Job{ jid: jid, queue: queue } = job) do
case Redix.command(Verk.Redis, ["LPUSH", "queue:#{queue}", Poison.encode!(job)]) do
{ :ok, _ } -> { :ok, jid }
{ :error, reason } -> { :error, reason }
end
end

@doc """
Similar to enqueue/2
"""
@spec enqueue(%Job{}) :: { :ok, binary } | { :error, term }
def enqueue(job) do
{ :ok, redis_url } = Application.fetch_env(:verk, :redis_url)
{ :ok, redis } = Redix.start_link(redis_url)
result = enqueue(redis, job)
:ok = Redix.stop(redis)
result
end
end
16 changes: 8 additions & 8 deletions lib/verk/retry_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ defmodule Verk.RetrySet do
@retry_key "retry"

@doc """
Count how many jobs are inside the retry set
Counts how many jobs are inside the retry set
"""
@spec count(pid) :: integer
def count(redis) do
Redix.command!(redis, ["ZCARD", @retry_key])
@spec count :: integer
def count do
Redix.command!(Verk.Redis, ["ZCARD", @retry_key])
end

@doc """
List jobs from `start` to `stop`
Lists jobs from `start` to `stop`
"""
@spec range(pid, integer, integer) :: [Verk.Job.T]
def range(redis, start \\ 0, stop \\ -1) do
for job <- Redix.command!(redis, ["ZRANGE", @retry_key, start, stop]) do
@spec range(integer, integer) :: [Verk.Job.T]
def range(start \\ 0, stop \\ -1) do
for job <- Redix.command!(Verk.Redis, ["ZRANGE", @retry_key, start, stop]) do
Poison.decode!(job, as: Verk.Job)
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/verk/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ defmodule Verk.Supervisor do
queues = Application.get_env(:verk, :queues, [])
children = for { queue, size } <- queues, do: queue_child(queue, size)

{ :ok, redis_url } = Application.fetch_env(:verk, :redis_url)

schedule_manager = worker(Verk.ScheduleManager, [], id: :schedule_manager)
verk_event_manager = worker(GenEvent, [[name: Verk.EventManager]])
queue_stats_watcher = worker(Verk.QueueStatsWatcher, [])
redis = worker(Redix, [redis_url, [name: Verk.Redis]])

children = [verk_event_manager | [queue_stats_watcher | [schedule_manager | children]]]
children = [redis, verk_event_manager, queue_stats_watcher, schedule_manager] ++ children
supervise(children, strategy: :one_for_one)
end

Expand Down
32 changes: 18 additions & 14 deletions test/retry_set_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,35 @@ defmodule Verk.RetryTest do
import Verk.RetrySet

setup do
{ :ok, redis } = Application.fetch_env(:verk, :redis_url)
{ :ok, pid } = Application.fetch_env(:verk, :redis_url)
|> elem(1)
|> Redix.start_link
Redix.command!(redis, ~w(DEL retry))
{ :ok, %{ redis: redis } }
|> Redix.start_link([name: Verk.Redis])
Redix.command!(pid, ~w(DEL retry))
on_exit fn ->
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, _, _, _}
end
:ok
end

test "count", %{ redis: redis } do
Redix.command!(redis, ~w(ZADD retry 123 abc))
test "count" do
Redix.command!(Verk.Redis, ~w(ZADD retry 123 abc))

assert count(redis) == 1
assert count == 1
end

test "count with no items", %{ redis: redis } do
assert count(redis) == 0
test "count with no items" do
assert count == 0
end

test "range", %{ redis: redis } do
test "range" do
job = %Verk.Job{class: "Class", args: []}
Redix.command!(redis, ~w(ZADD retry 123 #{Poison.encode!(job)}))
Redix.command!(Verk.Redis, ~w(ZADD retry 123 #{Poison.encode!(job)}))

assert range(redis) == [job]
assert range == [job]
end

test "range with no items", %{ redis: redis } do
assert range(redis) == []
test "range with no items" do
assert range == []
end
end
19 changes: 9 additions & 10 deletions test/verk_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ defmodule VerkTest do
job = %Verk.Job{ queue: "test_queue", jid: "job_id", class: "TestWorker", args: [] }
encoded_job = "encoded_job"
expect(Poison, :encode!, [job], encoded_job)
expect(Redix, :command, [:redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })
expect(Redix, :command, [Verk.Redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })

assert enqueue(:redis, job) == { :ok, "job_id" }
assert enqueue(job) == { :ok, "job_id" }

assert validate [Poison, Redix]
end
Expand All @@ -66,10 +66,9 @@ defmodule VerkTest do
job = %Verk.Job{ queue: "test_queue", jid: "job_id", class: "TestWorker", args: [] }
encoded_job = "encoded_job"
expect(Poison, :encode!, [job], encoded_job)
expect(Redix, :start_link, 1, { :ok, :redis })
expect(Redix, :command, [:redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })
expect(Redix, :command, [Verk.Redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })

assert enqueue(:redis, job) == { :ok, "job_id" }
assert enqueue(job) == { :ok, "job_id" }

assert validate [Poison, Redix]
end
Expand All @@ -79,28 +78,28 @@ defmodule VerkTest do
encoded_job = "encoded_job"

expect(Poison, :encode!, 1, encoded_job)
expect(Redix, :command, [:redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })
expect(Redix, :command, [Verk.Redis, ["LPUSH", "queue:test_queue", encoded_job]], { :ok, :_ })

{ :ok, jid } = enqueue(:redis, job)
{ :ok, jid } = enqueue(job)

assert is_binary(jid)
end

test "enqueue a job without a queue" do
job = %Verk.Job{ queue: nil, jid: "job_id" }

assert enqueue(:redis, job) == { :error, { :missing_queue, job } }
assert enqueue(job) == { :error, { :missing_queue, job } }
end

test "enqueue a job with non-list args" do
job = %Verk.Job{ queue: "queue", jid: "job_id", class: "TestWorker", args: 123 }

assert enqueue(:redis, job) == { :error, { :missing_args, job } }
assert enqueue(job) == { :error, { :missing_args, job } }
end

test "enqueue a job with no module to perform" do
job = %Verk.Job{ queue: "queue", jid: "job_id", args: [123], class: nil }

assert enqueue(:redis, job) == { :error, { :missing_module, job } }
assert enqueue(job) == { :error, { :missing_module, job } }
end
end

0 comments on commit 260bcca

Please sign in to comment.