Skip to content

Commit

Permalink
Merge pull request #302 from eigr/feat/embedded-grpc
Browse files Browse the repository at this point in the history
Use a Contract Fist approach to define Actors
  • Loading branch information
sleipnir authored Jun 11, 2024
2 parents ddcc11e + 8895171 commit d8f58d1
Show file tree
Hide file tree
Showing 88 changed files with 12,562 additions and 165 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Set up Elixir
uses: erlef/setup-beam@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ run-proxy-local2:
ERL_ZFLAGS='-proto_dist inet_tls -ssl_dist_optfile rel/overlays/local-mtls.ssl.conf' cd spawn_proxy/proxy && mix deps.get && PROXY_DATABASE_TYPE=$(database) PROXY_HTTP_PORT=9003 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= iex --name spawn_a2@test.default.svc -S mix

run-proxy-local-3:
cd spawn_proxy/proxy && mix deps.get && SPAWN_PROXY_LOGGER_LEVEL=info PROXY_CLUSTER_STRATEGY=epmd SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_DATABASE_PORT=3307 PROXY_DATABASE_TYPE=mariadb PROXY_DATABASE_POOL_SIZE=30 PROXY_HTTP_PORT=9003 USER_FUNCTION_PORT=8091 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= iex --name spawn_a3@127.0.0.1 -S mix
cd spawn_proxy/proxy && mix deps.get && SPAWN_PROXY_LOGGER_LEVEL=info PROXY_CLUSTER_STRATEGY=epmd SPAWN_USE_INTERNAL_NATS=false SPAWN_PUBSUB_ADAPTER=nats PROXY_DATABASE_PORT=3307 PROXY_DATABASE_TYPE=mariadb PROXY_DATABASE_POOL_SIZE=30 PROXY_HTTP_PORT=9003 USER_FUNCTION_PORT=8091 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= iex --name spawn_a3@127.0.0.1 -S mix

run-proxy-local-nodejs-test:
ERL_ZFLAGS='-proto_dist inet_tls -ssl_dist_optfile rel/overlays/local-mtls.ssl.conf' cd spawn_proxy/proxy && mix deps.get && PROXY_DATABASE_TYPE=$(database) PROXY_HTTP_PORT=9001 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= PROXY_ACTOR_SYSTEM_NAME=SpawnSysTest SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=crdt iex --name spawn_a1@test.default.svc -S mix
Expand Down
52 changes: 47 additions & 5 deletions compile-pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,51 @@ set -o nounset
set -o errexit
set -o pipefail

protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/any.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/actor.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/protocol.proto
protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/state.proto
protoc --elixir_out=gen_descriptors=true,plugins=grpc:./lib/spawn/grpc --proto_path=priv/protos/grpc/ priv/protos/grpc/reflection/v1alpha/reflection.proto

#protoc --elixir_out=gen_descriptors=true:./lib/spawn/cloudevents --proto_path=priv/protos/io/cloudevents/v1 priv/protos/io/cloudevents/v1/spec.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/any.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/empty.proto
# protoc --elixir_out=./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/descriptor.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/duration.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/timestamp.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/struct.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/source_context.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/field_mask.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/google/protobuf --proto_path=priv/protos/google/protobuf priv/protos/google/protobuf/wrappers.proto

# protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/extensions.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/actor.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/protocol.proto
# protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/state.proto


#protoc --elixir_out=gen_descriptors=true:./lib/spawn/actors --proto_path=priv/protos priv/protos/eigr/functions/protocol/actors/healthcheck.proto

#protoc --elixir_out=gen_descriptors=true:./lib/spawn/cloudevents --proto_path=priv/protos/io/cloudevents/v1 priv/protos/io/cloudevents/v1/spec.proto

PROTOS=("
priv/protos/eigr/functions/protocol/actors/extensions.proto
priv/protos/eigr/functions/protocol/actors/actor.proto
priv/protos/eigr/functions/protocol/actors/protocol.proto
priv/protos/eigr/functions/protocol/actors/state.proto
priv/protos/eigr/functions/protocol/actors/healthcheck.proto
")

BASE_PATH=`pwd`

echo "Base protobuf path is: $BASE_PATH/priv/protos"

for file in $PROTOS; do
echo "Compiling file $BASE_PATH/$file..."

mix protobuf.generate \
--output-path=./lib/spawn/actors \
--include-docs=true \
--generate-descriptors=true \
--include-path=$BASE_PATH/priv/protos/ \
--include-path=./priv/protos/google/protobuf \
--include-path=./priv/protos/google/api \
--plugins=ProtobufGenerate.Plugins.GRPCWithOptions \
--one-file-per-module \
$BASE_PATH/$file
done
5 changes: 2 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ config :do_it, DoIt.Commfig,

config :logger,
backends: [:console],
truncate: 65536

# level: :info
truncate: 65536,
level: :debug

# compile_time_purge_matching: [
# [level_lower_than: :info]
Expand Down
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import Config

if config_env() == :prod do
config :logger,
level: String.to_atom(System.get_env("SPAWN_PROXY_LOGGER_LEVEL", "debug"))
level: String.to_atom(System.get_env("SPAWN_PROXY_LOGGER_LEVEL", "info"))
end
151 changes: 151 additions & 0 deletions lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ defmodule Actors.Actor.CallerConsumer do
reply_to_producer(from, get_state(event))
end

defp dispatch_to_actor({from, {:readiness, event, _opts}} = _producer_event) do
reply_to_producer(from, readiness(event))
end

defp dispatch_to_actor({from, {:liveness, event, _opts}} = _producer_event) do
reply_to_producer(from, liveness(event))
end

defp dispatch_to_actor({from, {:spawn_actor, event, opts}} = _producer_event) do
reply_to_producer(from, spawn_actor(event, opts))
end
Expand Down Expand Up @@ -143,6 +151,7 @@ defmodule Actors.Actor.CallerConsumer do
opts
) do
if Sidecar.GracefulShutdown.running?() do
# actors ++ [%Actor{id: %ActorId{} = id, settings: %ActorSettings{} = settings}]
actors
|> Map.values()
|> Enum.map(fn actor -> ActorPool.create_actor_host_pool(actor, opts) end)
Expand Down Expand Up @@ -235,6 +244,148 @@ defmodule Actors.Actor.CallerConsumer do
end
end

@doc """
Performs a readiness check for a given actor identified by `%ActorId{}`.
This function uses a retry mechanism with exponential backoff, randomization, and a 30-second expiry to handle errors and failures gracefully.
It attempts to check the readiness of the specified actor, logging any errors encountered during the process.
## Parameters
- `id`: An `%ActorId{}` struct that contains:
- `name`: The name of the actor.
- `system`: The name of the system the actor belongs to.
## Returns
- `{:ok, %HealthCheckReply{}}` if the readiness check is successful. The `HealthCheckReply` struct contains:
- `status`: A `HealthcheckStatus` struct with:
- `status`: A string indicating the status, e.g., "OK".
- `details`: A string providing additional details, e.g., "I'm alive!".
- `updated_at`: A `Google.Protobuf.Timestamp` indicating the last update time.
- An error tuple (e.g., `{:error, :noproc}`) if the readiness check fails after all retry attempts.
## Examples
iex> readiness(%ActorId{name: "actor1", system: "system1"})
{:ok,
%HealthCheckReply{
status: %HealthcheckStatus{
status: "OK",
details: "I'm alive!",
updated_at: %Google.Protobuf.Timestamp{seconds: 1717606730}
}
}}
iex> readiness(%ActorId{name: "nonexistent_actor", system: "system1"})
{:error, :noproc}
## Notes
The retry mechanism handles the following cases: `:error`, `:exit`, `:noproc`, `:erpc`, `:noconnection`, and `:timeout`. It rescues only `ErlangError`.
The readiness check is performed by calling `ActorEntity.readiness/2` on the actor reference obtained through `do_lookup_action/4`.
Any errors during the readiness check are logged with a message indicating the actor's name and the error encountered.
"""
@spec readiness(ActorId.t()) :: {:ok, HealthCheckReply.t()} | {:error, any()}
def readiness(%ActorId{name: actor_name, system: system_name} = id) do
retry with: exponential_backoff() |> randomize |> expiry(30_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
rescue_only: [ErlangError] do
try do
do_lookup_action(
system_name,
{false, system_name, actor_name, id},
nil,
fn actor_ref, _actor_ref_id ->
ActorEntity.readiness(actor_ref)
end
)
rescue
e ->
Logger.error("Failure to make a call to actor #{inspect(actor_name)} #{inspect(e)}")

reraise e, __STACKTRACE__
end
after
result -> result
else
error -> error
end
end

@doc """
Performs a liveness check for a given actor identified by `%ActorId{}`.
This function uses a retry mechanism with exponential backoff, randomization, and a 30-second expiry to handle errors and failures gracefully.
It attempts to check the liveness of the specified actor, logging any errors encountered during the process.
## Parameters
- `id`: An `%ActorId{}` struct that contains:
- `name`: The name of the actor.
- `system`: The name of the system the actor belongs to.
## Returns
- `{:ok, %HealthCheckReply{}}` if the liveness check is successful. The `HealthCheckReply` struct contains:
- `status`: A `HealthcheckStatus` struct with:
- `status`: A string indicating the status, e.g., "OK".
- `details`: A string providing additional details, e.g., "I'm alive!".
- `updated_at`: A `Google.Protobuf.Timestamp` indicating the last update time.
- An error tuple (e.g., `{:error, :noproc}`) if the liveness check fails after all retry attempts.
## Examples
iex> liveness(%ActorId{name: "actor1", system: "system1"})
{:ok,
%HealthCheckReply{
status: %HealthcheckStatus{
status: "OK",
details: "I'm still alive!",
updated_at: %Google.Protobuf.Timestamp{seconds: 1717606837}
}
}}
iex> liveness(%ActorId{name: "nonexistent_actor", system: "system1"})
{:error, :noproc}
## Notes
The retry mechanism handles the following cases: `:error`, `:exit`, `:noproc`, `:erpc`, `:noconnection`, and `:timeout`. It rescues only `ErlangError`.
The liveness check is performed by calling `ActorEntity.liveness/2` on the actor reference obtained through `do_lookup_action/4`.
Any errors during the liveness check are logged with a message indicating the actor's name and the error encountered.
"""
@spec liveness(ActorId.t()) :: {:ok, HealthCheckReply.t()} | {:error, any()}
def liveness(%ActorId{name: actor_name, system: system_name} = id) do
retry with: exponential_backoff() |> randomize |> expiry(30_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
rescue_only: [ErlangError] do
try do
do_lookup_action(
system_name,
{false, system_name, actor_name, id},
nil,
fn actor_ref, _actor_ref_id ->
ActorEntity.liveness(actor_ref)
end
)
rescue
e ->
Logger.error("Failure to make a call to actor #{inspect(actor_name)} #{inspect(e)}")

reraise e, __STACKTRACE__
end
after
result -> result
else
error -> error
end
end

@doc """
Spawns an actor or a group of actors based on the provided `SpawnRequest`.
Expand Down
46 changes: 46 additions & 0 deletions lib/actors/actor/caller_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,52 @@ defmodule Actors.Actor.CallerProducer do
end
end

@doc """
Performs a readiness check for a given actor.
## Parameters
- `actor_id` (ActorId.t()): The ID of the actor.
- `opts` (any): Additional options.
## Returns
- `{:ok, response}`: If the response is successfully.
- `{:error, reason}`: If an error occurs during the operation.
"""
@spec readiness(ActorId.t()) :: {:ok, term()} | {:error, term()}
def readiness(actor_id, opts \\ []) do
if Config.get(:actors_global_backpressure_enabled) do
GenStage.call(__MODULE__, {:enqueue, {:readiness, actor_id, opts}}, :infinity)
else
CallerConsumer.readiness(actor_id)
end
end

@doc """
Performs a liveness check for a given actor.
## Parameters
- `actor_id` (ActorId.t()): The ID of the actor.
- `opts` (any): Additional options.
## Returns
- `{:ok, response}`: If the response is successfully.
- `{:error, reason}`: If an error occurs during the operation.
"""
@spec liveness(ActorId.t()) :: {:ok, term()} | {:error, term()}
def liveness(actor_id, opts \\ []) do
if Config.get(:actors_global_backpressure_enabled) do
GenStage.call(__MODULE__, {:enqueue, {:liveness, actor_id, opts}}, :infinity)
else
CallerConsumer.liveness(actor_id)
end
end

@doc """
Registers an actor with the specified registration request.
Expand Down
Loading

0 comments on commit d8f58d1

Please sign in to comment.