Skip to content

Commit

Permalink
feat: opentelemetry for gen_lb (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
chsukivra authored Aug 27, 2024
1 parent 76cbe5b commit d545973
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 11 deletions.
4 changes: 4 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@
]}.

{xref_checks, [undefined_function_calls]}.

{deps, [
{opentelemetry_api, "~> 1.0"}
]}.
14 changes: 13 additions & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
@@ -1 +1,13 @@
[].
{"1.2.0",
[{<<"opentelemetry_api">>,{pkg,<<"opentelemetry_api">>,<<"1.3.0">>},0},
{<<"opentelemetry_semantic_conventions">>,
{pkg,<<"opentelemetry_semantic_conventions">>,<<"0.2.0">>},
1}]}.
[
{pkg_hash,[
{<<"opentelemetry_api">>, <<"03E2177F28DD8D11AAA88E8522C81C2F6A788170FE52F7A65262340961E663F9">>},
{<<"opentelemetry_semantic_conventions">>, <<"B67FE459C2938FCAB341CB0951C44860C62347C005ACE1B50F8402576F241435">>}]},
{pkg_hash_ext,[
{<<"opentelemetry_api">>, <<"B9E5FF775FD064FA098DBA3C398490B77649A352B40B0B730A6B7DC0BDD68858">>},
{<<"opentelemetry_semantic_conventions">>, <<"D61FA1F5639EE8668D74B527E6806E0503EFC55A42DB7B5F39939D84C07D6895">>}]}
].
59 changes: 49 additions & 10 deletions src/gen_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

%%%_* Includes =========================================================
-include_lib("kernel/include/logger.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").
-include_lib("opentelemetry_api/include/otel_tracer.hrl").

%%%_* Behaviour ========================================================
-callback exec(any(), list()) -> {ok, _} | {error, _}.
Expand All @@ -54,6 +56,7 @@
, args = throw(args)
, from = throw(from)
, node = throw(node)
, parent_ctx = throw(parent_ctx)
}).

-record(s, { cluster_up = throw(cluster_up)
Expand All @@ -77,8 +80,25 @@ stop(Ref) ->
call(Ref, Args) ->
call(Ref, Args, []).

call(Ref, Args, Options) ->
gen_server:call(Ref, {call, Args, Options}, infinity).
call(Ref, Args, Options0) ->
SpanName = case Ref of
_ when is_atom(Ref) ->
iolist_to_binary([<<"gen_lb call ">>, atom_to_binary(Ref)]);
_ ->
<<"gen_lb call">>
end,
StartOpts = #{ attributes => #{},
links => [],
is_recording => true,
start_time => opentelemetry:timestamp(),
kind => ?SPAN_KIND_INTERNAL
},
?with_span(SpanName, StartOpts,
fun (_SpanCtx) ->
Ctx = otel_ctx:get_current(),
Options = [{otel_ctx, Ctx}] ++ Options0,
gen_server:call(Ref, {call, Args, Options}, infinity)
end).

block(Ref, Node) ->
gen_server:call(Ref, {block, Node}, infinity).
Expand Down Expand Up @@ -110,10 +130,12 @@ terminate(_Rsn, S) ->
handle_call({call, _Args, _Options}, _From, #s{cluster_up=[]} = S) ->
{reply, {error, cluster_down}, S};
handle_call({call, Args, Options}, From, #s{cluster_up=[{Node,Info}|Up]} = S) ->
Pid = do_call(S#s.cb_mod, Args, From, Node),
ParentCtx = proplists:get_value(otel_ctx, Options, undefined),
Pid = do_call(S#s.cb_mod, Args, From, Node, ParentCtx),
Req = #r{ args = Args
, from = From
, node = Node
, parent_ctx = ParentCtx
, attempts = assoc(Options, attempts, ?ATTEMPTS)
},
{noreply, S#s{ cluster_up = Up ++ [{Node,Info}] %round robin lb
Expand Down Expand Up @@ -175,7 +197,7 @@ handle_info({'EXIT', Pid, {Type, Rsn}}, S)
when N1 =:= Req0#r.node -> [{N2,I2},{N1,I1}|Ns];
Up1 -> Up1
end,
NewPid = do_call(S#s.cb_mod, Req0#r.args, Req0#r.from, Node),
NewPid = do_call(S#s.cb_mod, Req0#r.args, Req0#r.from, Node, Req0#r.parent_ctx),
Req = Req0#r{ attempt = Req0#r.attempt + 1
, node = Node
},
Expand Down Expand Up @@ -205,19 +227,36 @@ code_change(_OldVsn, S, _Extra) ->
{ok, S}.

%%%_ * Internals -------------------------------------------------------
do_call(CbMod, Args, From, Node) ->
do_call(CbMod, Args, From, Node, ParentCtx) ->
erlang:spawn_link(
fun() ->
Daddy = erlang:self(),
Ref = erlang:make_ref(),
{Pid, MRef} =
erlang:spawn_monitor(
fun() ->
case CbMod:exec(Node, Args) of
ok -> Daddy ! {Ref, ok};
{ok, Res} -> Daddy ! {Ref, {ok, Res}};
{error, Rsn} -> Daddy ! {Ref, {error, Rsn}}
end
_ = otel_ctx:attach(ParentCtx),
SpanName = iolist_to_binary([<<"gen_lb exec ">>, atom_to_binary(CbMod)]),
StartOpts = #{ attributes => #{},
links => [],
is_recording => true,
start_time => opentelemetry:timestamp(),
kind => ?SPAN_KIND_INTERNAL
},
?with_span(SpanName, StartOpts,
fun(_SpanCtx) ->
case CbMod:exec(Node, Args) of
ok ->
?set_status(?OTEL_STATUS_OK, <<>>),
Daddy ! {Ref, ok};
{ok, Res} ->
?set_status(?OTEL_STATUS_OK, <<>>),
Daddy ! {Ref, {ok, Res}};
{error, Rsn} ->
?set_status(?OTEL_STATUS_ERROR, <<>>),
Daddy ! {Ref, {error, Rsn}}
end
end)
end),
receive
{Ref, ok} ->
Expand Down
1 change: 1 addition & 0 deletions src/yamq.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
, {registered, []}
, {applications, [ kernel
, stdlib
, opentelemetry_api
]}
, {env, []}
, {modules, []}
Expand Down

0 comments on commit d545973

Please sign in to comment.