diff --git a/src/greptimedb_stream.erl b/src/greptimedb_stream.erl index 19fa7ec..3a06990 100644 --- a/src/greptimedb_stream.erl +++ b/src/greptimedb_stream.erl @@ -43,12 +43,12 @@ write_request(Stream, Request) -> end. %% @doc Finish the gRPC stream and wait the result. --spec finish(Stream :: map()) -> {ok, term()} | {error, term()}. +-spec finish(Stream :: map()) -> {ok, term()} | {error, term(), term()} | timeout | stream_finished. finish(Stream) -> finish(Stream, 10_000). %% @doc Finish the gRPC stream and wait the result with timeout in milliseconds. --spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}. +-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term(), term()} | timeout | stream_finished. finish(Stream, Timeout) -> try ok = grpcbox_client:close_send(Stream), diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index faecf98..1f49856 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -203,7 +203,15 @@ shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, Re %% Write the last request and finish stream case greptimedb_stream:write_request(Stream, Req) of ok -> - Result = greptimedb_stream:finish(Stream), + Result = case greptimedb_stream:finish(Stream) of + {ok, Resp} -> + {ok, Resp}; + {error, {?GRPC_STATUS_UNAUTHENTICATED, Msg}, Other} -> + {error, {unauth, Msg, Other}}; + Err -> + {error, Err} + end, + lists:foreach(fun(ReplyTo) -> reply(ReplyTo, Result) end, ReplyToList); diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 331ec27..0b209ba 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -163,7 +163,22 @@ t_auth_error(_) -> {pool_type, random}, {auth, {basic, #{username => <<"greptime_user">>, password => <<"wrong_pwd">>}}}], {ok, Client} = greptimedb:start_client(Options), + + %% sync write {error, {unauth, _, _}} = greptimedb:write(Client, Metric, Points), + %% async write + Ref = make_ref(), + TestPid = self(), + ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []}, + + ok = greptimedb:async_write(Client, Metric, Points, ResultCallback), + receive + {{Ref, reply}, {error, {unauth, _, _}}} -> + ok; + {{Ref, reply}, _Other} -> + ?assert(false) + end, + greptimedb:stop_client(Client), ok.