Skip to content

Commit

Permalink
Merge branch 'allow_standalone_client'
Browse files Browse the repository at this point in the history
  • Loading branch information
cjab committed Apr 27, 2021
2 parents 39d6834 + 8aabe75 commit c166c25
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 84 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Weddell is an Elixir client for [Google Pub/Sub](https://cloud.google.com/pubsub
```elixir
def deps do
[
{:weddell, "~> 0.3"},
{:weddell, "~> 0.4"},
{:goth, "~> 0.11"},
]
end
Expand All @@ -29,6 +29,10 @@ config :goth,
json: {:system, "GCP_CREDENTIALS_JSON"}
```

By default Weddell will start a client and connect on application start.
This can be disabled by setting `:no_connect_on_start` in the application config.
Clients can then be started with `Weddell.Client.start_link/3`.

## Getting Started

### Creating a topic and subscription
Expand Down
114 changes: 62 additions & 52 deletions lib/weddell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,33 @@ defmodule Weddell do
use Application

alias GRPC.RPCError
alias Weddell.{Message,
Client,
Client.Publisher,
SubscriptionDetails}
alias Weddell.{Message, Client, Client.Publisher, SubscriptionDetails}

@typedoc "An RPC error"
@type error :: {:error, RPCError.t}
@type error :: {:error, RPCError.t()}

@doc """
Start Weddell and connect to the Pub/Sub server.
"""
def start(_type, _args) do
import Supervisor.Spec
children = [worker(Client, [])]

children =
case Application.get_env(:weddell, :no_connect_on_start, false) do
true -> []
false -> [worker(Client, [])]
end

opts = [strategy: :one_for_one, name: __MODULE__]
Supervisor.start_link(children, opts)
end

@doc """
Return the client currently connected to Pub/Sub.
"""
@spec client(timeout :: integer()) :: Client.t
@spec client(timeout :: integer()) :: Client.t()
def client(timeout \\ 5000) do
GenServer.call(Weddell.Client, {:client}, timeout)
Weddell.Client.client(Weddell.Client, timeout)
end

@doc """
Expand All @@ -38,9 +41,9 @@ defmodule Weddell do
Weddell.create_topic("foo")
#=> :ok
"""
@spec create_topic(topic_name :: String.t, timeout :: integer()) :: :ok | error
@spec create_topic(topic_name :: String.t(), timeout :: integer()) :: :ok | error
def create_topic(name, timeout \\ 5000) do
GenServer.call(Weddell.Client, {:create_topic, name}, timeout)
Weddell.Client.create_topic(Weddell.Client, name, timeout)
end

@doc """
Expand All @@ -50,9 +53,9 @@ defmodule Weddell do
Weddell.delete_topic("foo")
#=> :ok
"""
@spec delete_topic(topic_name :: String.t, timeout :: integer()) :: :ok | error
@spec delete_topic(topic_name :: String.t(), timeout :: integer()) :: :ok | error
def delete_topic(name, timeout \\ 5000) do
GenServer.call(Weddell.Client, {:delete_topic, name}, timeout)
Weddell.Client.delete_topic(Weddell.Client, name, timeout)
end

@doc """
Expand All @@ -76,12 +79,12 @@ defmodule Weddell do
* `:cursor` - List topics starting at a cursor returned by an earlier call.
_(default: nil)_
"""
@spec topics(opts :: Client.list_options, timeout :: integer()) ::
{:ok, topic_names :: [String.t]} |
{:ok, topic_names :: [String.t], Client.cursor} |
error
@spec topics(opts :: Client.list_options(), timeout :: integer()) ::
{:ok, topic_names :: [String.t()]}
| {:ok, topic_names :: [String.t()], Client.cursor()}
| error
def topics(opts \\ [], timeout \\ 5000) do
GenServer.call(Weddell.Client, {:topics, opts}, timeout)
Weddell.Client.topics(Weddell.Client, opts, timeout)
end

@doc """
Expand All @@ -104,13 +107,15 @@ defmodule Weddell do
to the specified URL. For example, a Webhook endpoint might
use "https://example.com/push". _(default: nil)_
"""
@spec create_subscription(subscription_name :: String.t,
topic_name :: String.t,
Client.subscription_options,
timeout :: integer()) ::
:ok | error
@spec create_subscription(
subscription_name :: String.t(),
topic_name :: String.t(),
Client.subscription_options(),
timeout :: integer()
) ::
:ok | error
def create_subscription(name, topic, opts \\ [], timeout \\ 5000) do
GenServer.call(Weddell.Client, {:create_subscription, name, topic, opts}, timeout)
Weddell.Client.create_subscription(Weddell.Client, name, topic, opts, timeout)
end

@doc """
Expand All @@ -120,10 +125,10 @@ defmodule Weddell do
Weddell.delete_subscription("foo")
#=> :ok
"""
@spec delete_subscription(subscription_name :: String.t, timeout :: integer()) ::
:ok | error
@spec delete_subscription(subscription_name :: String.t(), timeout :: integer()) ::
:ok | error
def delete_subscription(name, timeout \\ 5000) do
GenServer.call(Weddell.Client, {:delete_subscription, name}, timeout)
Weddell.Client.delete_subscription(Weddell.Client, name, timeout)
end

@doc """
Expand All @@ -147,12 +152,12 @@ defmodule Weddell do
* `:cursor` - List subscriptions starting at a cursor returned by an earlier call.
_(default: nil)_
"""
@spec subscriptions(opts :: Client.list_options, timeout :: integer()) ::
{:ok, subscriptions :: [SubscriptionDetails.t]} |
{:ok, subscriptions :: [SubscriptionDetails.t], Client.cursor} |
error
@spec subscriptions(opts :: Client.list_options(), timeout :: integer()) ::
{:ok, subscriptions :: [SubscriptionDetails.t()]}
| {:ok, subscriptions :: [SubscriptionDetails.t()], Client.cursor()}
| error
def subscriptions(opts \\ [], timeout \\ 5000) do
GenServer.call(Weddell.Client, {:subscriptions, opts}, timeout)
Weddell.Client.subscriptions(Weddell.Client, opts, timeout)
end

@doc """
Expand All @@ -176,14 +181,16 @@ defmodule Weddell do
* `:cursor` - List subscriptions starting at a cursor returned by an earlier call.
_(default: nil)_
"""
@spec topic_subscriptions(topic :: String.t,
opts :: Client.list_options,
timeout :: integer()) ::
{:ok, subscriptions :: [String.t]} |
{:ok, subscriptions :: [String.t], Client.cursor} |
error
@spec topic_subscriptions(
topic :: String.t(),
opts :: Client.list_options(),
timeout :: integer()
) ::
{:ok, subscriptions :: [String.t()]}
| {:ok, subscriptions :: [String.t()], Client.cursor()}
| error
def topic_subscriptions(topic, opts \\ [], timeout \\ 5000) do
GenServer.call(Weddell.Client, {:topic_subscriptions, topic, opts}, timeout)
Weddell.Client.topic_subscriptions(Weddell.Client, topic, opts, timeout)
end

@doc """
Expand Down Expand Up @@ -216,12 +223,14 @@ defmodule Weddell do
|> Weddell.publish("foo-topic")
"""
@spec publish(Publisher.new_message | [Publisher.new_message],
topic_name :: String.t,
timeout :: integer()) ::
:ok | error
@spec publish(
Publisher.new_message() | [Publisher.new_message()],
topic_name :: String.t(),
timeout :: integer()
) ::
:ok | error
def publish(messages, topic, timeout \\ 5000) do
GenServer.call(Weddell.Client, {:publish, messages, topic}, timeout)
Weddell.Client.publish(Weddell.Client, messages, topic, timeout)
end

@doc """
Expand All @@ -241,11 +250,10 @@ defmodule Weddell do
* `:max_messages` - The maximum number of messages to be returned,
it may be fewer. _(default: 10)_
"""
@spec pull(subscription_name :: String.t, Client.pull_options,
timeout :: integer()) ::
{:ok, messages :: [Message.t]} | error
@spec pull(subscription_name :: String.t(), Client.pull_options(), timeout :: integer()) ::
{:ok, messages :: [Message.t()]} | error
def pull(subscription, opts \\ [], timeout \\ 5000) do
GenServer.call(Weddell.Client, {:pull, subscription, opts}, timeout)
Weddell.Client.pull(Weddell.Client, subscription, opts, timeout)
end

@doc """
Expand All @@ -257,11 +265,13 @@ defmodule Weddell do
Weddell.acknowledge(messages, "foo-subscription")
#=> :ok
"""
@spec acknowledge(messages :: [Message.t] | Message.t,
subscription_name :: String.t,
timeout :: integer()) ::
:ok | error
@spec acknowledge(
messages :: [Message.t()] | Message.t(),
subscription_name :: String.t(),
timeout :: integer()
) ::
:ok | error
def acknowledge(messages, subscription, timeout \\ 5000) do
GenServer.call(Weddell.Client, {:acknowledge, messages, subscription}, timeout)
Weddell.Client.acknowledge(Weddell.Client, messages, subscription, timeout)
end
end
Loading

0 comments on commit c166c25

Please sign in to comment.