Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
  • Loading branch information
deadtrickster committed Sep 2, 2024
1 parent 75392bb commit f0c8b4a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 39 deletions.
64 changes: 26 additions & 38 deletions deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -340,46 +340,49 @@ process_connect(Implicit, Frame,
{?HEADER_HEART_BEAT,
io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
{?HEADER_VERSION, Version}],
ok('CONNECTED',
case application:get_env(rabbitmq_stomp, hide_server_info, false) of
true -> Headers;
false -> [{?HEADER_SERVER, server_header()} | Headers]
end,
"",
StateN1#state{cfg = #cfg{

Res = ok("CONNECTED",
case application:get_env(rabbitmq_stomp, hide_server_info, false) of
true -> Headers;
false -> [{?HEADER_SERVER, server_header()} | Headers]
end,
"",
StateN1#state{cfg = #cfg{
session_id = SessionId,
version = Version
},
user = User,
authz_ctx = AuthzCtx})
version = Version
},
user = User,
authz_ctx = AuthzCtx}),
self() ! connection_created,
Res
else
{error, no_common_version} ->
error("Version mismatch",
"Supported versions are ~ts~n",
[string:join(?SUPPORTED_VERSIONS, ",")],
StateN);
{error, not_allowed} ->
{error, not_allowed, EUsername, EVHost} ->
rabbit_log:warning("STOMP login failed for user '~ts': "
"virtual host access not allowed", [Username]),
"virtual host access not allowed", [EUsername]),
error("Bad CONNECT", "Virtual host '" ++
binary_to_list(VHost) ++
binary_to_list(EVHost) ++
"' access denied", State);
{refused, Username1, _Msg, _Args} ->
rabbit_log:warning("STOMP login failed for user '~ts': authentication failed", [Username1]),
error("Bad CONNECT", "Access refused for user '" ++
binary_to_list(Username1) ++ "'", [], State);
{error, not_loopback} ->
{error, not_loopback, EUsername} ->
rabbit_log:warning("STOMP login failed for user '~ts': "
"this user's access is restricted to localhost", [Username]),
"this user's access is restricted to localhost", [EUsername]),
error("Bad CONNECT", "non-loopback access denied", State)
end,
end
case {Res, Implicit} of
{{ok, _, StateN2}, implicit} ->
self() ! connection_created, ok(StateN2);
_ ->
self() ! connection_created, Res
end
end,

end,
State).

creds(_, _, #cfg{default_login = DefLogin,
Expand Down Expand Up @@ -903,16 +906,6 @@ do_send(Destination, _DestHdr,

io:format("Message: ~p~n", [Message]),

%% {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content),

%% Delivery = #delivery{
%% mandatory = false,
%% confirm = DoConfirm,
%% sender = self(),
%% message = BasicMessage,
%% msg_seq_no = MsgSeqNo,
%% flow = Flow
%% },
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
io:format("QNames ~p~n", [QNames]),

Expand Down Expand Up @@ -1309,8 +1302,11 @@ ensure_reply_queue(TempQueueId, State = #state{reply_queues = RQS,
#resource{name = QNameBin} = QName = amqqueue:get_name(Queue),

ConsumerTag = rabbit_stomp_util:consumer_tag_reply_to(TempQueueId),


{ok, {_Global, DefaultPrefetch}} = application:get_env(rabbit, default_consumer_prefetch),
Spec = #{no_ack => true,
prefetch_count => application:get_env(rabbit, default_consumer_prefetch),
prefetch_count => DefaultPrefetch,
consumer_tag => ConsumerTag,
exclusive_consume => false,
args => []},
Expand Down Expand Up @@ -1709,7 +1705,6 @@ check_resource_access(User, Resource, Perm, Context) ->

handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
State0 = #state{queue_states = QStates0} = State) ->
credit_flow:peer_down(QPid),
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
{ok, QStates1, Actions} ->
State1 = State0#state{queue_states = QStates1},
Expand Down Expand Up @@ -1773,13 +1768,6 @@ handle_queue_actions(Actions, #state{} = State0) ->
record_rejects(Rej, S);
({queue_down, QRef}, S0) ->
handle_consuming_queue_down_or_eol(QRef, S0);
%% TODO: I have no idea about the scope of credit_flow
({block, QName}, S0) ->
credit_flow:block(QName),
S0;
({unblock, QName}, S0) ->
credit_flow:unblock(QName),
S0;
%% TODO: in rabbit_channel there code for handling
%% send_drained and send_credit_reply
%% I'm doing catch all here to not crash?
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

-include("rabbit_stomp.hrl").
-include("rabbit_stomp_frame.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-record(reader_state, {
socket,
Expand Down Expand Up @@ -404,6 +403,20 @@ processor_args(Configuration, Sock) ->
ssl_login_name(RealSocket, Configuration), PeerAddr}.

adapter_info(Sock) ->
case rabbit_net:socket_ends(Socket, inbound) of
{ok, {PeerIp, PeerPort, Ip, Port}} ->
#amqp_adapter_info{protocol = {'STOMP', 0},
name = Name,
host = Host,
port = Port,
peer_host = PeerHost,
peer_port = PeerPort,
additional_info = maybe_ssl_info(Sock)}
process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds);
{error, Reason} ->
{error, {socket_ends, Reason}}
end.

amqp_connection:socket_adapter_info(Sock, {'STOMP', 0}).

ssl_login_name(_Sock, #stomp_configuration{ssl_cert_login = false}) ->
Expand Down

0 comments on commit f0c8b4a

Please sign in to comment.