1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_stomp_processor).
9
10-compile({no_auto_import, [error/3]}).
11
12-export([initial_state/2, process_frame/2, flush_and_die/1]).
13-export([flush_pending_receipts/3,
14         handle_exit/3,
15         cancel_consumer/2,
16         send_delivery/5]).
17
18-export([adapter_name/1]).
19-export([info/2]).
20
21-include_lib("amqp_client/include/amqp_client.hrl").
22-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
23-include("rabbit_stomp_frame.hrl").
24-include("rabbit_stomp.hrl").
25-include("rabbit_stomp_headers.hrl").
26
%% Per-connection processor state.
%%
%% Fields, as used in the visible part of this file:
%%   session_id, channel, connection
%%       `none' until do_login/7 succeeds; then the generated session id,
%%       the AMQP channel and the AMQP connection.
%%   subscriptions
%%       map from consumer tag to #subscription{}.
%%   version
%%       `none' until do_login/7 stores the negotiated STOMP version.
%%   pending_receipts
%%       starts as `undefined'.
%%   config
%%       the #stomp_configuration{} handed to initial_state/2.
%%   route_state
%%       starts as rabbit_routing_util:init_state().
%%   reply_queues
%%       starts as an empty map.
%%   frame_transformer
%%       `undefined' until CONNECT; afterwards a fun that command/2 applies
%%       to each frame before validating and handling it.
%%   adapter_info, send_fun, ssl_login_name, peer_addr
%%       supplied by the caller of initial_state/2.
%%   auth_mechanism, auth_login
%%       set by process_connect/3 from the result of creds/3.
%%   trailing_lf, default_topic_exchange, default_nack_requeue
%%       read from the rabbitmq_stomp application environment.
%%   start_heartbeat_fun
%%       not initialised by initial_state/2.
-record(proc_state, {session_id, channel, connection, subscriptions,
                version, start_heartbeat_fun, pending_receipts,
                config, route_state, reply_queues, frame_transformer,
                adapter_info, send_fun, ssl_login_name, peer_addr,
                %% see rabbitmq/rabbitmq-stomp#39
                trailing_lf, auth_mechanism, auth_login,
                default_topic_exchange, default_nack_requeue}).
34
%% One active subscription, stored in #proc_state.subscriptions under its
%% consumer tag. dest_hdr holds the destination header value that
%% tidy_canceled_subscription/4 re-parses on UNSUBSCRIBE; description is the
%% human-readable text used in error frames about the subscription.
-record(subscription, {dest_hdr, ack_mode, multi_ack, description}).
36
%% Flush timeout constant. NOTE(review): presumably milliseconds; it is not
%% referenced in the part of the file visible here - confirm at its use site.
-define(FLUSH_TIMEOUT, 60000).
38
%% Returns the `name' stored in the processor's #amqp_adapter_info{}.
adapter_name(State) ->
    #proc_state{
       adapter_info = #amqp_adapter_info{name = AdapterName}
      } = State,
    AdapterName.
42
43%%----------------------------------------------------------------------------
44
-spec initial_state(
  #stomp_configuration{},
  {SendFun, AdapterInfo, SSLLoginName, PeerAddr})
    -> #proc_state{}
  when SendFun :: fun((atom(), binary()) -> term()),
       AdapterInfo :: #amqp_adapter_info{},
       SSLLoginName :: atom() | binary(),
       PeerAddr :: inet:ip_address().

%% Result of processing one frame. On success the tuple is
%% {ok, NewState, Connection}: process_request/3 returns the updated state
%% in second position and the AMQP connection (or `none') in third.
-type process_frame_result() ::
    {ok, #proc_state{}, term()} |
    {stop, term(), #proc_state{}}.

-spec process_frame(#stomp_frame{}, #proc_state{}) ->
    process_frame_result().

-spec flush_and_die(#proc_state{}) -> #proc_state{}.

-spec command({Command, Frame}, State) -> process_frame_result()
    when Command :: string(),
         Frame   :: #stomp_frame{},
         State   :: #proc_state{}.

%% A request body run by process_request/3. The frame slot may be `none',
%% in which case process_request/3 sends nothing back to the client.
-type process_fun() :: fun((#proc_state{}) ->
        {ok, none | #stomp_frame{}, #proc_state{}}  |
        {error, string(), string(), #proc_state{}} |
        {stop, term(), #proc_state{}}).
-spec process_request(process_fun(), fun((#proc_state{}) -> #proc_state{}), #proc_state{}) ->
    process_frame_result().

-spec flush_pending_receipts(DeliveryTag, IsMulti, State) -> State
    when State :: #proc_state{},
         DeliveryTag :: term(),
         IsMulti :: boolean().

-spec handle_exit(From, Reason, State) -> unknown_exit | {stop, Reason, State}
    when State  :: #proc_state{},
         From   :: pid(),
         Reason :: term().

-spec cancel_consumer(binary(), #proc_state{}) -> process_frame_result().

-spec send_delivery(#'basic.deliver'{}, term(), term(), term(),
                    #proc_state{}) -> #proc_state{}.
89
90%%----------------------------------------------------------------------------
91
92
93%%----------------------------------------------------------------------------
94%% Public API
95%%----------------------------------------------------------------------------
96
%% Entry point for a parsed STOMP frame: pairs the frame with its command
%% string and hands both to command/2.
process_frame(#stomp_frame{command = Cmd} = Frame, State) ->
    command({Cmd, Frame}, State).
99
%% Shuts the processor down by delegating to close_connection/1 and
%% returning whatever state it produces.
flush_and_die(ProcState) ->
    close_connection(ProcState).
102
%% Returns one informational item about the connection.
%%
%% Items kept in the adapter's additional_info list are looked up through
%% additional_info/2; everything else is read straight from the state or
%% from its #amqp_adapter_info{}. Unknown items crash with function_clause.
info(Item, PState)
  when Item =:= channels;
       Item =:= channel_max;
       Item =:= frame_max;
       Item =:= client_properties;
       Item =:= ssl;
       Item =:= ssl_protocol;
       Item =:= ssl_key_exchange;
       Item =:= ssl_cipher;
       Item =:= ssl_hash ->
    additional_info(Item, PState);
info(session_id, #proc_state{session_id = SessionId}) ->
    SessionId;
info(channel, #proc_state{channel = Channel}) ->
    Channel;
info(version, #proc_state{version = Version}) ->
    Version;
info(implicit_connect,
     #proc_state{config = #stomp_configuration{implicit_connect = Implicit}}) ->
    Implicit;
info(auth_login, #proc_state{auth_login = Login}) ->
    Login;
info(auth_mechanism, #proc_state{auth_mechanism = Mechanism}) ->
    Mechanism;
info(peer_addr, #proc_state{peer_addr = PeerAddr}) ->
    PeerAddr;
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Host}}) ->
    Host;
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Port}}) ->
    Port;
info(peer_host,
     #proc_state{adapter_info = #amqp_adapter_info{peer_host = PeerHost}}) ->
    PeerHost;
info(peer_port,
     #proc_state{adapter_info = #amqp_adapter_info{peer_port = PeerPort}}) ->
    PeerPort;
%% A {Proto, Version} protocol has its version coerced to a binary; any
%% other protocol value is returned untouched.
info(protocol,
     #proc_state{adapter_info =
                     #amqp_adapter_info{protocol = {Proto, Version}}}) ->
    {Proto, rabbit_data_coercion:to_binary(Version)};
info(protocol,
     #proc_state{adapter_info = #amqp_adapter_info{protocol = Other}}) ->
    Other.
129
%% Builds the processor state for a freshly accepted, not yet logged-in
%% connection. Nothing AMQP-related is opened here: session, channel,
%% connection and version all start as `none'.
initial_state(Configuration,
              {SendFun,
               AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
               SSLLoginName,
               PeerAddr}) ->
    %% STOMP connections use exactly one channel. The frame max is not
    %% applicable and there is no way to know what client is used.
    %% TODO: can we use a header to make it possible for clients
    %%       to override the client_properties value?
    StompInfo = [{channels, 1},
                 {channel_max, 1},
                 {frame_max, 0},
                 {client_properties,
                  [{<<"product">>, longstr, <<"STOMP client">>}]}],
    AdapterInfo =
        AdapterInfo0#amqp_adapter_info{additional_info = StompInfo ++ Extra},
    RouteState = rabbit_routing_util:init_state(),
    %% Tunables come from the rabbitmq_stomp application environment.
    TrailingLf = application:get_env(rabbitmq_stomp, trailing_lf, true),
    TopicExchange = application:get_env(rabbitmq_stomp,
                                        default_topic_exchange,
                                        <<"amq.topic">>),
    NackRequeue = application:get_env(rabbitmq_stomp,
                                      default_nack_requeue, true),
    #proc_state{send_fun               = SendFun,
                adapter_info           = AdapterInfo,
                ssl_login_name         = SSLLoginName,
                peer_addr              = PeerAddr,
                config                 = Configuration,
                session_id             = none,
                channel                = none,
                connection             = none,
                version                = none,
                subscriptions          = #{},
                reply_queues           = #{},
                pending_receipts       = undefined,
                frame_transformer      = undefined,
                route_state            = RouteState,
                trailing_lf            = TrailingLf,
                default_topic_exchange = TopicExchange,
                default_nack_requeue   = NackRequeue}.
161
162
%% Dispatches one {Command, Frame} pair.
%%
%% STOMP/CONNECT always go through an explicit login. Any other command on
%% a connection without a channel either triggers an implicit login (when
%% the configuration allows it) and is then retried, or is rejected with an
%% ERROR frame. Once logged in, the frame is transformed, validated and
%% handled inside process_request/3.
command({Login, Frame}, State)
  when Login =:= "STOMP"; Login =:= "CONNECT" ->
    process_connect(no_implicit, Frame, State);

command(Request,
        #proc_state{channel = none,
                    config  = #stomp_configuration{implicit_connect = true}}
        = State) ->
    {ok, #proc_state{channel = Channel} = Connected, _Conn} =
        process_connect(implicit, #stomp_frame{headers = []}, State),
    case Channel of
        %% The implicit login did not produce a channel: give up.
        none -> {stop, normal, Connected};
        _    -> command(Request, Connected)
    end;

command(_Request,
        #proc_state{channel = none,
                    config  = #stomp_configuration{implicit_connect = false}}
        = State) ->
    Rejected = send_error("Illegal command",
                          "You must log in using CONNECT first",
                          State),
    {ok, Rejected, none};

command({Command, Frame},
        #proc_state{frame_transformer = Transform} = State) ->
    Transformed = Transform(Frame),
    Handle =
        fun(Current) ->
                case validate_frame(Command, Transformed, Current) of
                    {error, _, _, _} = Invalid -> Invalid;
                    _Valid -> handle_frame(Command, Transformed, Current)
                end
        end,
    Receipt = fun(Current) -> ensure_receipt(Transformed, Current) end,
    process_request(Handle, Receipt, State).
197
%% Handles a server-side consumer cancellation for Ctag by running
%% server_cancel_consumer/2 through the common request pipeline.
cancel_consumer(Ctag, State) ->
    Cancel = fun(Current) -> server_cancel_consumer(Ctag, Current) end,
    process_request(Cancel, State).
202
%% Translates the exit of a linked process into a processor result.
%%
%% Only exits of the AMQP connection or channel recorded in the state are
%% handled; anything else yields `unknown_exit'. A server-initiated close
%% is routed to amqp_death/3 with its code and explanation. Any other
%% reason sends an ERROR frame and stops with {conn_died, Reason} or
%% {channel_died, Reason}.
%%
%% Clause order matters: for each pid the specific shutdown patterns must
%% precede the generic `Reason' clause. A second channel
%% server_initiated_close clause used to follow the generic channel clause;
%% it could never match and has been removed.
handle_exit(Conn, {shutdown, {server_initiated_close, Code, Explanation}},
            State = #proc_state{connection = Conn}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Conn, {shutdown, {connection_closing,
                    {server_initiated_close, Code, Explanation}}},
            State = #proc_state{connection = Conn}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Conn, Reason, State = #proc_state{connection = Conn}) ->
    _ = send_error("AMQP connection died", "Reason: ~p", [Reason], State),
    {stop, {conn_died, Reason}, State};

handle_exit(Ch, {shutdown, {server_initiated_close, Code, Explanation}},
            State = #proc_state{channel = Ch}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Ch, Reason, State = #proc_state{channel = Ch}) ->
    _ = send_error("AMQP channel died", "Reason: ~p", [Reason], State),
    {stop, {channel_died, Reason}, State};

handle_exit(_, _, _) -> unknown_exit.
225
226
%% Runs ProcessFun through process_request/3 with a success callback that
%% leaves the state untouched.
process_request(ProcessFun, State) ->
    Identity = fun(Unchanged) -> Unchanged end,
    process_request(ProcessFun, Identity, State).
229
230
%% Runs ProcessFun on State and turns its outcome into a frame result.
%%
%%   {ok, Frame, S}       - Frame (unless `none') is sent, SuccessFun is
%%                          applied, and {ok, S1, Connection} is returned.
%%   {error, Msg, Det, S} - an ERROR frame is sent; SuccessFun is NOT run.
%%   {stop, normal, S}    - SuccessFun is applied before stopping.
%%   {stop, Reason, S}    - passed through unchanged.
%%
%% Crashes inside ProcessFun are intercepted with an old-style `catch' and
%% mapped by interpret_process_result/2.
process_request(ProcessFun, SuccessFun, State) ->
    Outcome = interpret_process_result((catch ProcessFun(State)), State),
    case Outcome of
        {ok, none, #proc_state{connection = Conn} = Next} ->
            {ok, SuccessFun(Next), Conn};
        {ok, Frame, #proc_state{connection = Conn} = Next} ->
            _ = send_frame(Frame, Next),
            {ok, SuccessFun(Next), Conn};
        {error, Message, Detail, #proc_state{connection = Conn} = Next} ->
            {ok, send_error(Message, Detail, Next), Conn};
        {stop, normal, Next} ->
            {stop, normal, SuccessFun(Next)};
        {stop, Reason, Next} ->
            {stop, Reason, Next}
    end.

%% Maps what `catch ProcessFun(State)' produced onto a request outcome.
%% Server-initiated closes and access_refused errors go to amqp_death/3,
%% any other 'EXIT' becomes a "Processing error", and every non-'EXIT'
%% value is returned as is.
interpret_process_result(
  {'EXIT', {{shutdown, {server_initiated_close, ReplyCode, Explanation}}, _}},
  State) ->
    amqp_death(ReplyCode, Explanation, State);
interpret_process_result({'EXIT', {amqp_error, access_refused, Msg, _}},
                         State) ->
    amqp_death(access_refused, Msg, State);
interpret_process_result({'EXIT', Reason}, State) ->
    priv_error("Processing error", "Processing error", Reason, State);
interpret_process_result(Result, _State) ->
    Result.
259
%% Performs the login for a connection that has no channel yet.
%%
%% The STOMP version is negotiated from Frame; on success the credentials,
%% virtual host and heart-beat settings are extracted and do_login/7 is
%% called. For an implicit login a successful result is replaced by
%% ok(State), so the CONNECTED reply is not used. When no common version
%% exists a "Version mismatch" error is produced.
process_connect(Implicit, Frame,
                #proc_state{channel        = none,
                            config         = Config,
                            ssl_login_name = SSLLoginName,
                            adapter_info   = AdapterInfo} = State) ->
    Connect =
        fun(Current) ->
                case negotiate_version(Frame) of
                    {error, no_common_version} ->
                        error("Version mismatch",
                              "Supported versions are ~s~n",
                              [string:join(?SUPPORTED_VERSIONS, ",")],
                              Current);
                    {ok, Version} ->
                        Transform = frame_transformer(Version),
                        Frame1 = Transform(Frame),
                        {Auth, {Username, Passwd}} =
                            creds(Frame1, SSLLoginName, Config),
                        {ok, DefaultVHost} =
                            application:get_env(rabbitmq_stomp, default_vhost),
                        {ProtoName, _} = AdapterInfo#amqp_adapter_info.protocol,
                        VHost = login_header(Frame1, ?HEADER_HOST,
                                             DefaultVHost),
                        Heartbeat = login_header(Frame1, ?HEADER_HEART_BEAT,
                                                 "0,0"),
                        VersionedInfo = AdapterInfo#amqp_adapter_info{
                                          protocol = {ProtoName, Version}},
                        LoginState = Current#proc_state{
                                       frame_transformer = Transform,
                                       auth_mechanism    = Auth,
                                       auth_login        = Username},
                        Res = do_login(Username, Passwd, VHost, Heartbeat,
                                       VersionedInfo, Version, LoginState),
                        case Res of
                            {ok, _, LoggedIn} when Implicit =:= implicit ->
                                ok(LoggedIn);
                            _ ->
                                Res
                        end
                end
        end,
    process_request(Connect, State).
296
%% Chooses the credentials for a login and reports where they came from.
%%
%% Returns {Source, {Login, Passcode}} where Source is:
%%   config        - configured defaults (always, when force_default_creds
%%                   is true; otherwise when the frame has no login header
%%                   and there is no SSL login name),
%%   ssl           - the SSL login name, with passcode `none',
%%   stomp_headers - the frame carried a login header.
creds(_Frame, _SSLLoginName,
      #stomp_configuration{force_default_creds = true,
                           default_login       = DefLogin,
                           default_passcode    = DefPasscode}) ->
    Forced = {iolist_to_binary(DefLogin), iolist_to_binary(DefPasscode)},
    {config, Forced};
creds(Frame, SSLLoginName,
      #stomp_configuration{default_login    = DefLogin,
                           default_passcode = DefPasscode}) ->
    Login = login_header(Frame, ?HEADER_LOGIN, DefLogin),
    Passcode = login_header(Frame, ?HEADER_PASSCODE, DefPasscode),
    case rabbit_stomp_frame:header(Frame, ?HEADER_LOGIN) of
        not_found when SSLLoginName =:= none ->
            {config, {Login, Passcode}};
        not_found ->
            {ssl, {SSLLoginName, none}};
        _HasLoginHeader ->
            {stomp_headers, {Login, Passcode}}
    end.
311
%% Reads header Key from Frame, falling back to Default, and returns the
%% value as a binary. A binary Default is first converted to a list, and
%% `undefined' is passed through unchanged.
login_header(Frame, Key, Default0) ->
    Default = case is_binary(Default0) of
                  true  -> binary_to_list(Default0);
                  false -> Default0
              end,
    case rabbit_stomp_frame:header(Frame, Key, Default) of
        undefined -> undefined;
        Value     -> list_to_binary(Value)
    end.
319
320%%----------------------------------------------------------------------------
321%% Frame Transformation
322%%----------------------------------------------------------------------------
323
%% Returns the fun applied to incoming frames for the negotiated version:
%% header trimming for "1.0", the identity for every other version.
frame_transformer(Version) ->
    case Version of
        "1.0" -> fun rabbit_stomp_util:trim_headers/1;
        _     -> fun(Frame) -> Frame end
    end.
326
327%%----------------------------------------------------------------------------
328%% Frame Validation
329%%----------------------------------------------------------------------------
330
%% Builds the error returned when a durable subscription frame lacks the
%% 'id' header.
report_missing_id_header(State) ->
    Message = "Missing Header",
    Detail = "Header 'id' is required for durable subscriptions",
    error(Message, Detail, State).
334
%% Validates a frame before it is handled.
%%
%% SUBSCRIBE and UNSUBSCRIBE frames that ask for durability (a `durable'
%% or `persistent' header equal to "true") must also carry an `id' header;
%% otherwise an error is returned. Every other frame is accepted.
validate_frame(Command, Frame, State)
  when Command =:= "SUBSCRIBE"; Command =:= "UNSUBSCRIBE" ->
    Durable    = rabbit_stomp_frame:header(Frame, ?HEADER_DURABLE),
    Persistent = rabbit_stomp_frame:header(Frame, ?HEADER_PERSISTENT),
    Id         = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
    WantsDurable = (Durable =:= {ok, "true"})
        orelse (Persistent =:= {ok, "true"}),
    case WantsDurable andalso Id =:= not_found of
        true  -> report_missing_id_header(State);
        false -> ok(State)
    end;
validate_frame(_Command, _Frame, State) ->
    ok(State).
348
349%%----------------------------------------------------------------------------
350%% Frame handlers
351%%----------------------------------------------------------------------------
352
%% Executes an already validated frame according to its command.
%% Unknown commands produce a "Bad command" error.

%% DISCONNECT closes the connection and stops the processor normally.
handle_frame("DISCONNECT", _Frame, State) ->
    Closed = close_connection(State),
    {stop, normal, Closed};

%% SUBSCRIBE requires a destination and is carried out by do_subscribe/4.
handle_frame("SUBSCRIBE" = Command, Frame, State) ->
    with_destination(Command, Frame, State, fun do_subscribe/4);

%% UNSUBSCRIBE is resolved to a consumer tag, which is then cancelled.
handle_frame("UNSUBSCRIBE", Frame, State) ->
    cancel_subscription(rabbit_stomp_util:consumer_tag(Frame), Frame, State);

%% SEND rejects frames carrying any of ?HEADERS_NOT_ON_SEND before the
%% destination is resolved and do_send/4 is invoked.
handle_frame("SEND" = Command, Frame, State) ->
    Send = fun(_Command, Frame1, State1) ->
                   with_destination("SEND", Frame1, State1, fun do_send/4)
           end,
    without_headers(?HEADERS_NOT_ON_SEND, Command, Frame, State, Send);

%% ACK and NACK share ack_action/4 and differ only in the AMQP method built.
handle_frame("ACK" = Command, Frame, State) ->
    ack_action(Command, Frame, State, fun create_ack_method/3);
handle_frame("NACK" = Command, Frame, State) ->
    ack_action(Command, Frame, State, fun create_nack_method/3);

%% Transaction control frames all go through transactional_action/4.
handle_frame("BEGIN" = Command, Frame, State) ->
    transactional_action(Frame, Command, fun begin_transaction/2, State);
handle_frame("COMMIT" = Command, Frame, State) ->
    transactional_action(Frame, Command, fun commit_transaction/2, State);
handle_frame("ABORT" = Command, Frame, State) ->
    transactional_action(Frame, Command, fun abort_transaction/2, State);

handle_frame(Unknown, _Frame, State) ->
    error("Bad command",
          "Could not interpret command ~p~n",
          [Unknown],
          State).
389
390%%----------------------------------------------------------------------------
391%% Internal helpers for processing frames callbacks
392%%----------------------------------------------------------------------------
393
%% Handles an ACK or NACK frame.
%%
%% Command   - "ACK" or "NACK", used only in error messages.
%% Frame     - the incoming frame.
%% State     - the processor state.
%% MethodFun - fun(DeliveryTag, Subscription, Requeue) building the AMQP
%%             method to issue.
%%
%% The frame must carry the version-specific ack header, that header must
%% parse as a message id, and the id's consumer tag must belong to a known
%% subscription; each failure yields a distinct error. The work is split
%% into one helper per step to avoid deeply nested case expressions.
ack_action(Command, Frame, State = #proc_state{version = Version},
           MethodFun) ->
    AckHeader = rabbit_stomp_util:ack_header_name(Version),
    case rabbit_stomp_frame:header(Frame, AckHeader) of
        {ok, AckValue} ->
            ack_action_by_id(AckValue, AckHeader, Command, Frame, State,
                             MethodFun);
        not_found ->
            error("Missing header",
                  "~p must include the ~p header~n",
                  [Command, AckHeader],
                  State)
    end.

%% Parses the ack header value and looks up the subscription it refers to.
ack_action_by_id(AckValue, AckHeader, Command, Frame,
                 State = #proc_state{subscriptions = Subs}, MethodFun) ->
    case rabbit_stomp_util:parse_message_id(AckValue) of
        {ok, {ConsumerTag, _SessionId, DeliveryTag}} ->
            case maps:find(ConsumerTag, Subs) of
                {ok, Sub} ->
                    ack_action_apply(DeliveryTag, Sub, Frame, State,
                                     MethodFun);
                error ->
                    error("Subscription not found",
                          "Message with id ~p has no subscription",
                          [AckValue],
                          State)
            end;
        _ ->
            error("Invalid header",
                  "~p must include a valid ~p header~n",
                  [Command, AckHeader],
                  State)
    end.

%% Builds the AMQP method and either queues it on the frame's transaction
%% or issues it on the channel straight away.
ack_action_apply(DeliveryTag, Sub, Frame,
                 State = #proc_state{channel              = Channel,
                                     default_nack_requeue = DefaultNackRequeue},
                 MethodFun) ->
    Requeue = rabbit_stomp_frame:boolean_header(Frame, "requeue",
                                                DefaultNackRequeue),
    Method = MethodFun(DeliveryTag, Sub, Requeue),
    case transactional(Frame) of
        {yes, Transaction} ->
            extend_transaction(Transaction, {Method}, State);
        no ->
            amqp_channel:call(Channel, Method),
            ok(State)
    end.
434
%%----------------------------------------------------------------------------
%% Internal helpers for subscriptions, destinations and login
%%----------------------------------------------------------------------------
438
%% Reacts to the server cancelling the consumer ConsumerTag.
%%
%% For a known subscription an ERROR frame naming the subscription is sent
%% to the client and the subscription is dropped from the state. An unknown
%% consumer tag produces an error result instead.
server_cancel_consumer(ConsumerTag,
                       #proc_state{subscriptions = Subs} = State) ->
    case maps:find(ConsumerTag, Subs) of
        {ok, #subscription{description = Description} = Subscription} ->
            SubscriptionId =
                case rabbit_stomp_util:tag_to_id(ConsumerTag) of
                    {ok,    {_, KnownId}} -> KnownId;
                    {error, {_, RawId}}   -> "Unknown[" ++ RawId ++ "]"
                end,
            _ = send_error_frame("Server cancelled subscription",
                                 [{?HEADER_SUBSCRIPTION, SubscriptionId}],
                                 "The server has canceled a subscription.~n"
                                 "No more messages will be delivered for ~p.~n",
                                 [Description],
                                 State),
            %% `undefined' marks the cancellation as server-initiated.
            tidy_canceled_subscription(ConsumerTag, Subscription,
                                       undefined, State);
        error ->
            error("Server cancelled unknown subscription",
                  "Consumer tag ~p is not associated with a subscription.~n",
                  [ConsumerTag],
                  State)
    end.
460
%% Cancels the subscription named by an UNSUBSCRIBE frame.
%%
%% The first argument is the result of rabbit_stomp_util:consumer_tag/1.
%% Errors from that lookup are reported to the client. Otherwise the
%% consumer is cancelled on the channel and the subscription is tidied up;
%% an unknown tag or an unexpected cancel reply yields an error.
cancel_subscription({error, invalid_prefix}, _Frame, State) ->
    error("Invalid id",
          "UNSUBSCRIBE 'id' may not start with ~s~n",
          [?TEMP_QUEUE_ID_PREFIX],
          State);

cancel_subscription({error, _Other}, _Frame, State) ->
    error("Missing destination or id",
          "UNSUBSCRIBE must include a 'destination' or 'id' header",
          State);

cancel_subscription({ok, ConsumerTag, Description}, Frame,
                    #proc_state{subscriptions = Subs,
                                channel       = Channel} = State) ->
    case maps:find(ConsumerTag, Subs) of
        {ok, #subscription{description = Descr} = Subscription} ->
            Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
            case amqp_channel:call(Channel, Cancel) of
                #'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
                    tidy_canceled_subscription(ConsumerTag, Subscription,
                                               Frame, State);
                _Unexpected ->
                    error("Failed to cancel subscription",
                          "UNSUBSCRIBE to ~p failed.~n",
                          [Descr],
                          State)
            end;
        error ->
            error("No subscription found",
                  "UNSUBSCRIBE must refer to an existing subscription.~n"
                  "Subscription to ~p not found.~n",
                  [Description],
                  State)
    end.
496
%% Removes a cancelled subscription from the state.
%%
%% A server-initiated cancellation passes `undefined' where the STOMP frame
%% would be. In that case we know the queue was deleted, so there is
%% nothing further to clean up.
tidy_canceled_subscription(ConsumerTag, _Subscription, undefined,
                           #proc_state{subscriptions = Subs} = State) ->
    Remaining = maps:remove(ConsumerTag, Subs),
    ok(State#proc_state{subscriptions = Remaining});

%% A client-initiated cancellation passes the actual UNSUBSCRIBE frame; the
%% subscription's destination is re-parsed so that a durable subscription
%% queue can be deleted if the frame asks for it.
tidy_canceled_subscription(ConsumerTag, #subscription{dest_hdr = DestHdr},
                           Frame,
                           #proc_state{subscriptions = Subs} = State) ->
    Remaining = maps:remove(ConsumerTag, Subs),
    {ok, Dest} = rabbit_routing_util:parse_endpoint(DestHdr),
    Tidied = State#proc_state{subscriptions = Remaining},
    maybe_delete_durable_sub(Dest, Frame, Tidied).
511
%% Deletes the queue backing a durable topic subscription when the
%% UNSUBSCRIBE frame carries the durable header. For any other destination
%% type, or without that header, nothing is deleted. Always returns
%% ok(State).
maybe_delete_durable_sub({topic, Name}, Frame,
                         #proc_state{channel = Channel} = State) ->
    case rabbit_stomp_util:has_durable_header(Frame) of
        true ->
            {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
            QName = rabbit_stomp_util:subscription_queue_name(Name, Id, Frame),
            Delete = #'queue.delete'{queue  = list_to_binary(QName),
                                     nowait = false},
            amqp_channel:call(Channel, Delete);
        false ->
            ok
    end,
    ok(State);
maybe_delete_durable_sub(_Destination, _Frame, State) ->
    ok(State).
527
%% Resolves the frame's destination header and applies
%% Fun(Destination, DestHdr, Frame, State) to it.
%%
%% A missing header, an unparsable destination, or a destination rejected
%% by Fun is reported as an error naming Command. Any other {error, Reason}
%% from Fun is thrown; all remaining results are returned unchanged.
with_destination(Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of
        not_found ->
            error("Missing destination",
                  "~p must include a 'destination' header~n",
                  [Command],
                  State);
        {ok, DestHdr} ->
            case rabbit_routing_util:parse_endpoint(DestHdr) of
                {ok, Destination} ->
                    Outcome = Fun(Destination, DestHdr, Frame, State),
                    destination_outcome(Outcome, DestHdr, Command, State);
                {error, {invalid_destination, Type, Content}} ->
                    error("Invalid destination",
                          "'~s' is not a valid ~p destination~n",
                          [Content, Type],
                          State);
                {error, {unknown_destination, Content}} ->
                    Prefixes = string:join(
                                 rabbit_routing_util:all_dest_prefixes(),
                                 ", "),
                    error("Unknown destination",
                          "'~s' is not a valid destination.~n"
                          "Valid destination types are: ~s.~n",
                          [Content, Prefixes],
                          State)
            end
    end.

%% Maps the value returned by a destination callback onto a request result.
destination_outcome({error, invalid_endpoint}, DestHdr, Command, State) ->
    error("Invalid destination",
          "'~s' is not a valid destination for '~s'~n",
          [DestHdr, Command],
          State);
destination_outcome({error, {invalid_destination, Msg}}, _DestHdr, _Command,
                    State) ->
    error("Invalid destination",
          "~s",
          [Msg],
          State);
destination_outcome({error, Reason}, _DestHdr, _Command, _State) ->
    throw(Reason);
destination_outcome(Result, _DestHdr, _Command, _State) ->
    Result.
568
%% Calls Fun(Command, Frame, State) only if Frame carries none of the
%% listed headers; the first forbidden header found yields an
%% "Invalid header" error instead.
without_headers([], Command, Frame, State, Fun) ->
    Fun(Command, Frame, State);
without_headers([Forbidden | Rest], Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, Forbidden) of
        not_found ->
            without_headers(Rest, Command, Frame, State, Fun);
        {ok, _Present} ->
            error("Invalid header",
                  "'~s' is not allowed on '~s'.~n",
                  [Forbidden, Command],
                  State)
    end.
581
%% Logs the client in and opens the AMQP connection and channel.
%%
%% Username, Passwd - credentials; an `undefined' Username is rejected.
%% VirtualHost      - virtual host to connect to.
%% Heartbeat        - heart-beat header value, handed to ensure_heartbeats/1.
%% AdapterInfo      - adapter info passed on to the direct connection.
%% Version          - negotiated STOMP version, echoed in the reply.
%% State            - the processor state.
%%
%% On success the connection and channel are linked to this process and a
%% CONNECTED reply is built; the server header is included unless
%% hide_server_info is set. Each login failure is logged and turned into a
%% "Bad CONNECT" error.
do_login(undefined, _, _, _, _, _, State) ->
    error("Bad CONNECT", "Missing login or passcode header(s)", State);
do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version,
         State = #proc_state{peer_addr = Addr}) ->
    case start_connection(
           #amqp_params_direct{username     = Username,
                               password     = Passwd,
                               virtual_host = VirtualHost,
                               adapter_info = AdapterInfo}, Username, Addr) of
        {ok, Connection} ->
            link(Connection),
            {ok, Channel} = amqp_connection:open_channel(Connection),
            link(Channel),
            amqp_channel:enable_delivery_flow_control(Channel),
            SessionId = rabbit_guid:string(rabbit_guid:gen_secure(), "session"),
            {SendTimeout, ReceiveTimeout} = ensure_heartbeats(Heartbeat),
            Headers = [{?HEADER_SESSION, SessionId},
                       {?HEADER_HEART_BEAT,
                        io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
                       {?HEADER_VERSION, Version}],
            ReplyHeaders =
                case application:get_env(rabbitmq_stomp,
                                         hide_server_info, false) of
                    true  -> Headers;
                    false -> [{?HEADER_SERVER, server_header()} | Headers]
                end,
            ok("CONNECTED",
               ReplyHeaders,
               "",
               State#proc_state{session_id = SessionId,
                                channel    = Channel,
                                connection = Connection,
                                version    = Version});
        {error, {auth_failure, _}} ->
            rabbit_log:warning("STOMP login failed for user '~s': authentication failed", [Username]),
            %% The user name comes from the client, so it is passed as a
            %% format argument. Splicing it into the format string would
            %% make any `~' in the name be read as a control sequence.
            error("Bad CONNECT", "Access refused for user '~s'~n",
                  [binary_to_list(Username)], State);
        {error, Denied} when Denied =:= not_allowed;
                             Denied =:= access_refused ->
            %% Both reasons are reported as denied virtual host access.
            rabbit_log:warning("STOMP login failed for user '~s': "
                               "virtual host access not allowed", [Username]),
            error("Bad CONNECT", "Virtual host '" ++
                                 binary_to_list(VirtualHost) ++
                                 "' access denied", State);
        {error, not_loopback} ->
            rabbit_log:warning("STOMP login failed for user '~s': "
                               "this user's access is restricted to localhost", [Username]),
            error("Bad CONNECT", "non-loopback access denied", State)
    end.
634
%% Starts the AMQP connection described by Params and then applies the
%% loopback restriction check for Username at Addr. A connection that
%% fails the check is closed again and {error, not_loopback} is returned;
%% errors from starting the connection are returned unchanged.
start_connection(Params, Username, Addr) ->
    case amqp_connection:start(Params) of
        {error, _Reason} = Error ->
            Error;
        {ok, Conn} ->
            Loopback = rabbit_access_control:check_user_loopback(Username,
                                                                 Addr),
            case Loopback of
                ok ->
                    {ok, Conn};
                not_allowed ->
                    amqp_connection:close(Conn),
                    {error, not_loopback}
            end
    end.
645
%% Builds the "Product/Version" value of the server header from the
%% `rabbit' application's description and vsn keys.
server_header() ->
    {ok, Description} = application:get_key(rabbit, description),
    {ok, Vsn} = application:get_key(rabbit, vsn),
    rabbit_misc:format("~s/~s", [Description, Vsn]).
650
%% Handles a subscription request for `Destination'.
%%
%% Ensures the source endpoint exists, applies the optional prefetch count,
%% starts an AMQP consumer on the resulting queue, binds the queue and records
%% the subscription under its consumer tag.
%%
%% Returns:
%%   * the result of ok/1 carrying the updated state on success;
%%   * the `{error, _}' tuple from ensure_endpoint/5, unchanged;
%%   * `{stop, normal, State}' after sending an ERROR frame and closing the
%%     connection when the consumer tag is already present in the
%%     subscriptions map.
%% An exit raised while consuming/binding is re-raised after cleanup.
do_subscribe(Destination, DestHdr, Frame,
             #proc_state{subscriptions          = Subs,
                         route_state            = RouteState,
                         channel                = Channel,
                         default_topic_exchange = DfltTopicEx} = State) ->
    %% The return value is not inspected here.
    check_subscription_access(Destination, State),
    Prefetch = rabbit_stomp_frame:integer_header(
                 Frame, ?HEADER_PREFETCH_COUNT, undefined),
    {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
    case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of
        {error, _} = EndpointError ->
            EndpointError;
        {ok, Queue, RouteState1} ->
            {ok, ConsumerTag, Description} =
                rabbit_stomp_util:consumer_tag(Frame),
            %% Prefetch is applied before the duplicate check below.
            do_subscribe_apply_prefetch(Prefetch, Channel),
            case maps:is_key(ConsumerTag, Subs) of
                true ->
                    Message = "Duplicated subscription identifier",
                    Detail = "A subscription identified by '~s' already exists.",
                    _ = error(Message, Detail, [ConsumerTag], State),
                    _ = send_error(Message, Detail, [ConsumerTag], State),
                    {stop, normal, close_connection(State)};
                false ->
                    ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
                    Arguments = do_subscribe_consumer_arguments(Frame),
                    Consume = #'basic.consume'{
                                 queue        = Queue,
                                 consumer_tag = ConsumerTag,
                                 no_local     = false,
                                 no_ack       = (AckMode == auto),
                                 exclusive    = false,
                                 arguments    = Arguments},
                    try
                        amqp_channel:subscribe(Channel, Consume, self()),
                        ok = rabbit_routing_util:ensure_binding(
                               Queue, ExchangeAndKey, Channel)
                    catch exit:Reason ->
                            do_subscribe_clean_up(Destination, Queue, State),
                            exit(Reason)
                    end,
                    Subscription = #subscription{dest_hdr    = DestHdr,
                                                 ack_mode    = AckMode,
                                                 multi_ack   = IsMulti,
                                                 description = Description},
                    ok(State#proc_state{
                         subscriptions = Subs#{ConsumerTag => Subscription},
                         route_state   = RouteState1})
            end
    end.

%% Issues basic.qos on the channel when a prefetch count was supplied.
do_subscribe_apply_prefetch(undefined, _Channel) ->
    ok;
do_subscribe_apply_prefetch(Prefetch, Channel) ->
    amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch}).

%% Builds the basic.consume arguments from the frame's stream offset header.
do_subscribe_consumer_arguments(Frame) ->
    case rabbit_stomp_frame:stream_offset_header(Frame, undefined) of
        undefined     -> [];
        {Type, Value} -> [{<<"x-stream-offset">>, Type, Value}]
    end.

%% Removes the queue after a failed subscribe, but only for exchange and
%% topic destinations: it's safe to delete this queue, it was server-named
%% and declared by us.
do_subscribe_clean_up({Kind, _}, Queue, State)
  when Kind =:= exchange; Kind =:= topic ->
    ok = maybe_clean_up_queue(Queue, State);
do_subscribe_clean_up(_Destination, _Queue, _State) ->
    ok.
724
%% Checks read access for a subscription destination.
%%
%% For `{topic, _}' destinations the connection's AMQP parameters and internal
%% user are looked up and the decision is delegated to
%% rabbit_access_control:check_topic_access/4, whose result is returned.
%% Any other destination yields `authorized'.
check_subscription_access({topic, _Topic} = Destination,
                          #proc_state{connection             = Connection,
                                      default_topic_exchange = DfltTopicEx}) ->
    Info = amqp_connection:info(Connection, [amqp_params, internal_user]),
    [{amqp_params, AmqpParams},
     {internal_user, #user{username = Username} = InternalUser}] = Info,
    #amqp_params_direct{virtual_host = VHost} = AmqpParams,
    {Exchange, RoutingKey} = parse_routing(Destination, DfltTopicEx),
    TopicResource = #resource{virtual_host = VHost,
                              kind         = topic,
                              name         = rabbit_data_coercion:to_binary(Exchange)},
    VariableMap = #{<<"vhost">> => VHost, <<"username">> => Username},
    AuthzContext = #{routing_key  => rabbit_data_coercion:to_binary(RoutingKey),
                     variable_map => VariableMap},
    rabbit_access_control:check_topic_access(
      InternalUser, TopicResource, read, AuthzContext);
check_subscription_access(_Destination, _State) ->
    authorized.
742
%% Best-effort deletion of `Queue' over a temporary channel opened on the
%% state's connection. Always returns `ok'.
%%
%% Failures are deliberately ignored at every step, including a failure to
%% open the temporary channel. This function is called from an exception
%% handler, and a crash here would replace the error that triggered the
%% cleanup with an unrelated one.
maybe_clean_up_queue(Queue, #proc_state{connection = Connection}) ->
    try amqp_connection:open_channel(Connection) of
        {ok, Channel} ->
            catch amqp_channel:call(Channel, #'queue.delete'{queue = Queue}),
            catch amqp_channel:close(Channel),
            ok;
        _NoChannel ->
            %% no channel available, so there is nothing we can delete with
            ok
    catch
        _:_ ->
            %% best-effort: the connection may already be gone
            ok
    end.
748
%% Publishes the body of a frame to `Destination'.
%%
%% Ensures the destination endpoint exists, rewrites the reply-to header if
%% needed (ensure_reply_to/2), derives the message properties and builds a
%% basic.publish for the parsed exchange and routing key. When the frame
%% names a transaction the publish is queued on it via extend_transaction/4;
%% otherwise it is sent immediately. An `{error, _}' from ensure_endpoint/5
%% is returned unchanged.
do_send(Destination, _DestHdr,
        #stomp_frame{body_iolist = BodyFragments} = Frame,
        #proc_state{channel                = Channel,
                    route_state            = RouteState,
                    default_topic_exchange = DfltTopicEx} = State) ->
    case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of
        {error, _} = EndpointError ->
            EndpointError;
        {ok, _Queue, RouteState1} ->
            {Frame1, State1} =
                ensure_reply_to(Frame,
                                State#proc_state{route_state = RouteState1}),
            Props = rabbit_stomp_util:message_properties(Frame1),
            {Exchange, RoutingKey} = parse_routing(Destination, DfltTopicEx),
            Publish = #'basic.publish'{
                         exchange    = list_to_binary(Exchange),
                         routing_key = list_to_binary(RoutingKey),
                         mandatory   = false,
                         immediate   = false},
            case transactional(Frame1) of
                no ->
                    %% record the receipt (if any) before publishing
                    State2 = maybe_record_receipt(Frame1, State1),
                    State3 = send_method(Publish, Props, BodyFragments, State2),
                    ok(State3);
                {yes, Transaction} ->
                    %% the receipt is recorded when the queued action runs
                    RecordReceipt =
                        fun (StateN) ->
                                maybe_record_receipt(Frame1, StateN)
                        end,
                    extend_transaction(Transaction,
                                       RecordReceipt,
                                       {Publish, Props, BodyFragments},
                                       State1)
            end
    end.
789
%% Builds a basic.ack for `DeliveryTag'; `multiple' is taken from the
%% subscription's multi_ack flag. The third argument is ignored.
create_ack_method(DeliveryTag, #subscription{multi_ack = Multiple}, _Requeue) ->
    #'basic.ack'{multiple     = Multiple,
                 delivery_tag = DeliveryTag}.
793
%% Builds a basic.nack for `DeliveryTag'; `multiple' is taken from the
%% subscription's multi_ack flag and `requeue' from the third argument.
create_nack_method(DeliveryTag, #subscription{multi_ack = Multiple}, Requeue) ->
    #'basic.nack'{requeue      = Requeue,
                  multiple     = Multiple,
                  delivery_tag = DeliveryTag}.
798
%% Splits the frame's accept-version header (defaulting to "1.0") on commas
%% and hands the client versions, together with ?SUPPORTED_VERSIONS, to
%% rabbit_stomp_util:negotiate_version/2, returning its result.
negotiate_version(Frame) ->
    Accepted = rabbit_stomp_frame:header(Frame, ?HEADER_ACCEPT_VERSION, "1.0"),
    ClientVersions = re:split(Accepted, ",", [{return, list}]),
    rabbit_stomp_util:negotiate_version(ClientVersions, ?SUPPORTED_VERSIONS).
804
805
%% Forwards an AMQP delivery to the client as a MESSAGE frame.
%%
%% The subscription is looked up by consumer tag; when none is found an
%% ERROR frame is sent instead. notify_received/1 is called with
%% `DeliveryCtx' in either case, and the state returned by the frame-sending
%% call is the result.
send_delivery(#'basic.deliver'{consumer_tag = ConsumerTag} = Delivery,
              Properties, Body, DeliveryCtx,
              #proc_state{session_id    = SessionId,
                          subscriptions = Subs,
                          version       = Version} = State) ->
    NewState =
        case maps:find(ConsumerTag, Subs) of
            error ->
                send_error("Subscription not found",
                           "There is no current subscription with tag '~s'.",
                           [ConsumerTag],
                           State);
            {ok, #subscription{ack_mode = AckMode}} ->
                Headers = rabbit_stomp_util:headers(SessionId, Delivery,
                                                    Properties, AckMode,
                                                    Version),
                send_frame("MESSAGE", Headers, Body, State)
        end,
    notify_received(DeliveryCtx),
    NewState.
828
%% Acknowledges receipt of a delivery for flow control purposes.
%% An `undefined' context means there is nothing to notify (no notification
%% for quorum queues and streams) and yields `ok'.
notify_received(DeliveryCtx) ->
    case DeliveryCtx of
        undefined ->
            ok;
        _ ->
            %% notification for flow control
            amqp_channel:notify_received(DeliveryCtx)
    end.
835
%% Performs `Method' on `Channel' with amqp_channel:call/2, discarding the
%% reply, and returns `State' unchanged.
send_method(Method, Channel, State) ->
    _ = amqp_channel:call(Channel, Method),
    State.
839
%% Performs `Method' on the channel held in the state (see send_method/3).
send_method(Method, #proc_state{channel = Channel} = State) ->
    send_method(Method, Channel, State).
842
%% Sends a content-bearing method on the channel held in the state
%% (see send_method/5).
send_method(Method, Properties, BodyFragments,
            #proc_state{channel = Channel} = State) ->
    send_method(Method, Channel, Properties, BodyFragments, State).
846
%% Publishes a message with amqp_channel:cast_flow/3. The body fragments are
%% flattened into a single binary payload. Only basic.publish methods are
%% accepted. Returns `State' unchanged.
send_method(#'basic.publish'{} = Publish, Channel, Properties, BodyFragments,
            State) ->
    Payload = list_to_binary(BodyFragments),
    Message = #amqp_msg{props = Properties, payload = Payload},
    amqp_channel:cast_flow(Channel, Publish, Message),
    State.
854
%% Closes the AMQP connection held in the state, if any, and returns a state
%% whose channel, connection and subscriptions are all `none'.
%%   * `undefined' state: logs a debug message and returns a fresh record.
%%   * connection already `none': the state is returned untouched.
%% Closing the connection will close the channel and subchannels.
close_connection(undefined) ->
    rabbit_log:debug("~s:close_connection: undefined state", [?MODULE]),
    #proc_state{channel = none, connection = none, subscriptions = none};
close_connection(#proc_state{connection = none} = State) ->
    State;
close_connection(#proc_state{connection = Connection} = State) ->
    %% ignore noproc or other exceptions to avoid debris
    _ = (catch amqp_connection:close(Connection)),
    State#proc_state{subscriptions = none, connection = none, channel = none}.
865
866%%----------------------------------------------------------------------------
867%% Reply-To
868%%----------------------------------------------------------------------------
869
%% Rewrites the frame's reply-to header when it names a temporary queue.
%%
%% Returns `{Frame, State}' untouched when the header is absent or does not
%% resolve to a temp queue. Otherwise the reply queue is obtained from
%% ensure_reply_queue/2 and substituted into the header list.
ensure_reply_to(#stomp_frame{headers = Headers} = Frame, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_REPLY_TO) of
        {ok, ReplyTo} ->
            {ok, Endpoint} = rabbit_routing_util:parse_endpoint(ReplyTo),
            case rabbit_routing_util:dest_temp_queue(Endpoint) of
                none ->
                    {Frame, State};
                TempQueueId ->
                    {ReplyQueue, NewState} =
                        ensure_reply_queue(TempQueueId, State),
                    NewHeaders = lists:keyreplace(
                                   ?HEADER_REPLY_TO, 1, Headers,
                                   {?HEADER_REPLY_TO, ReplyQueue}),
                    {Frame#stomp_frame{headers = NewHeaders}, NewState}
            end;
        not_found ->
            {Frame, State}
    end.
889
%% Returns `{QueueNameAsList, State}' for the reply queue associated with
%% `TempQueueId'.
%%
%% A queue already recorded in reply_queues is reused. Otherwise an
%% exclusive, auto-delete queue is declared, a no-ack consumer is started on
%% it for this process, and both the queue and a synthetic subscription are
%% stored in the returned state.
ensure_reply_queue(TempQueueId, #proc_state{channel       = Channel,
                                            reply_queues  = RQS,
                                            subscriptions = Subs} = State) ->
    case maps:find(TempQueueId, RQS) of
        error ->
            Declare = #'queue.declare'{auto_delete = true,
                                       exclusive   = true},
            #'queue.declare_ok'{queue = Queue} =
                amqp_channel:call(Channel, Declare),

            ConsumerTag = rabbit_stomp_util:consumer_tag_reply_to(TempQueueId),
            Consume = #'basic.consume'{queue        = Queue,
                                       consumer_tag = ConsumerTag,
                                       no_ack       = true,
                                       nowait       = false},
            #'basic.consume_ok'{} =
                amqp_channel:subscribe(Channel, Consume, self()),

            Destination = binary_to_list(Queue),

            %% synthesise a subscription to the reply queue destination
            ReplySubscription = #subscription{dest_hdr  = Destination,
                                              multi_ack = false},
            NewState = State#proc_state{
                         reply_queues  = RQS#{TempQueueId => Queue},
                         subscriptions = Subs#{ConsumerTag => ReplySubscription}},
            {Destination, NewState};
        {ok, ExistingQueue} ->
            {binary_to_list(ExistingQueue), State}
    end.
924
925%%----------------------------------------------------------------------------
926%% Receipt Handling
927%%----------------------------------------------------------------------------
928
%% Handles the frame's receipt header, if present, by delegating to
%% do_receipt/3 with the frame's command; returns the resulting state, or
%% `State' unchanged when there is no receipt header.
ensure_receipt(#stomp_frame{command = Command} = Frame, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found     -> State;
        {ok, ReceiptId} -> do_receipt(Command, ReceiptId, State)
    end.
934
%% Sends a RECEIPT frame carrying `ReceiptId', except for the "SEND" command:
%% SEND frame receipts are handled when messages are confirmed, so the state
%% is returned as is.
do_receipt("SEND", _ReceiptId, State) ->
    State;
do_receipt(_Command, ReceiptId, State) ->
    ReceiptHeaders = [{"receipt-id", ReceiptId}],
    send_frame("RECEIPT", ReceiptHeaders, "", State).
940
%% Remembers the frame's receipt id against the channel's next publish
%% sequence number so that the receipt can be sent later (see
%% flush_pending_receipts/3). Frames without a receipt header leave the
%% state untouched.
maybe_record_receipt(Frame, #proc_state{channel          = Channel,
                                        pending_receipts = PR} = State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found ->
            State;
        {ok, Id} ->
            Receipts = maybe_record_receipt_tree(PR, Channel),
            SeqNo = amqp_channel:next_publish_seqno(Channel),
            State#proc_state{
              pending_receipts = gb_trees:insert(SeqNo, Id, Receipts)}
    end.

%% Returns the pending-receipts tree. On first use (`undefined') this
%% process is registered as the channel's confirm handler, confirm mode is
%% selected, and an empty tree is returned.
maybe_record_receipt_tree(undefined, Channel) ->
    amqp_channel:register_confirm_handler(Channel, self()),
    #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
    gb_trees:empty();
maybe_record_receipt_tree(PR, _Channel) ->
    PR.
960
%% Sends a receipt for every pending receipt selected by `DeliveryTag' and
%% `IsMulti' (see accumulate_receipts/3), then stores the remaining pending
%% receipts in the returned state.
flush_pending_receipts(DeliveryTag, IsMulti,
                       #proc_state{pending_receipts = PR} = State) ->
    {Receipts, PR1} = accumulate_receipts(DeliveryTag, IsMulti, PR),
    SendReceipt = fun (ReceiptId, StateN) ->
                          do_receipt(none, ReceiptId, StateN)
                  end,
    State1 = lists:foldl(SendReceipt, State, Receipts),
    State1#proc_state{pending_receipts = PR1}.
968
%% Selects receipts from the pending-receipts tree `PR'.
%%
%% With `false' only the entry keyed by `DeliveryTag' is taken (if present).
%% With `true' entries are taken in ascending key order, stopping at the
%% first key greater than `DeliveryTag' (see accumulate_receipts1/3).
%% Returns `{ReceiptIds, RemainingTree}'.
accumulate_receipts(DeliveryTag, false, PR) ->
    case gb_trees:lookup(DeliveryTag, PR) of
        none ->
            {[], PR};
        {value, ReceiptId} ->
            {[ReceiptId], gb_trees:delete(DeliveryTag, PR)}
    end;

accumulate_receipts(DeliveryTag, true, PR) ->
    case gb_trees:is_empty(PR) of
        false ->
            Smallest = gb_trees:take_smallest(PR),
            accumulate_receipts1(DeliveryTag, Smallest, []);
        true ->
            {[], PR}
    end.
981
%% Worker for accumulate_receipts/3. The second argument is the result of
%% gb_trees:take_smallest/1: `{Key, Value, RestOfTree}'.
%%
%% Values are collected while keys are not greater than `DeliveryTag'. The
%% first larger key is put back into the tree. Returns the collected values
%% in ascending key order together with the remaining tree.
accumulate_receipts1(DeliveryTag, {Key, ReceiptId, Remaining}, Collected) ->
    if
        Key > DeliveryTag ->
            %% went one too far: re-insert the entry we just took out
            {lists:reverse(Collected),
             gb_trees:insert(Key, ReceiptId, Remaining)};
        true ->
            Collected1 = [ReceiptId | Collected],
            case gb_trees:is_empty(Remaining) of
                false ->
                    accumulate_receipts1(DeliveryTag,
                                         gb_trees:take_smallest(Remaining),
                                         Collected1);
                true ->
                    {lists:reverse(Collected1), Remaining}
            end
    end.
992
993%%----------------------------------------------------------------------------
994%% Transaction Support
995%%----------------------------------------------------------------------------
996
%% Reports whether the frame carries a transaction header:
%% `{yes, Transaction}' when it does, `no' otherwise.
transactional(Frame) ->
    Lookup = rabbit_stomp_frame:header(Frame, ?HEADER_TRANSACTION),
    case Lookup of
        not_found         -> no;
        {ok, Transaction} -> {yes, Transaction}
    end.
1002
%% Runs `Fun(Transaction, State)' for a frame that must name a transaction.
%% When the frame has no transaction header an error result is produced
%% instead; `Name' is used in the error detail.
transactional_action(Frame, Name, Fun, State) ->
    case transactional(Frame) of
        no ->
            error("Missing transaction",
                  "~p must include a 'transaction' header~n",
                  [Name],
                  State);
        {yes, Transaction} ->
            Fun(Transaction, State)
    end.
1013
%% Looks up the action list stored in the process dictionary under
%% `{transaction, Transaction}' and calls `Fun(Actions, State)' with it.
%% An unknown transaction produces an error result instead.
with_transaction(Transaction, State, Fun) ->
    TransactionKey = {transaction, Transaction},
    case get(TransactionKey) of
        undefined ->
            error("Bad transaction",
                  "Invalid transaction identifier: ~p~n",
                  [Transaction],
                  State);
        Actions ->
            Fun(Actions, State)
    end.
1024
%% Starts (or resets) a transaction by storing an empty action list in the
%% process dictionary under `{transaction, Transaction}'.
begin_transaction(Transaction, State) ->
    _ = put({transaction, Transaction}, []),
    ok(State).
1028
%% Queues `Action' on the transaction together with a `Callback' that is
%% applied to the state before the action is performed
%% (see perform_transaction_action/2).
extend_transaction(Transaction, Callback, Action, State) ->
    Entry = {callback, Callback, Action},
    extend_transaction(Transaction, Entry, State).
1031
%% Prepends `Action' to the transaction's stored action list. An unknown
%% transaction yields the error result of with_transaction/3.
extend_transaction(Transaction, Action, State0) ->
    Prepend = fun (Actions, State) ->
                      put({transaction, Transaction}, [Action | Actions]),
                      ok(State)
              end,
    with_transaction(Transaction, State0, Prepend).
1039
%% Performs every action queued on the transaction and then forgets the
%% transaction. Actions are stored newest-first, so they are replayed from
%% the end of the list, i.e. in the order they were queued.
commit_transaction(Transaction, State0) ->
    Commit = fun (Actions, State) ->
                     InQueueOrder = lists:reverse(Actions),
                     FinalState = lists:foldl(fun perform_transaction_action/2,
                                              State,
                                              InQueueOrder),
                     erase({transaction, Transaction}),
                     ok(FinalState)
             end,
    with_transaction(Transaction, State0, Commit).
1050
%% Discards the transaction and its queued actions without performing them.
abort_transaction(Transaction, State0) ->
    Discard = fun (_Actions, State) ->
                      erase({transaction, Transaction}),
                      ok(State)
              end,
    with_transaction(Transaction, State0, Discard).
1058
%% Performs one queued transaction action and returns the resulting state.
%%   {Method}                       - a method without content
%%   {callback, Callback, Action}   - apply Callback to the state, then
%%                                    perform Action
%%   {Method, Props, BodyFragments} - a method with content
%% The callback clause must stay ahead of the generic three-element clause.
perform_transaction_action({Method}, State) ->
    send_method(Method, State);
perform_transaction_action({callback, Callback, Action}, State) ->
    State1 = Callback(State),
    perform_transaction_action(Action, State1);
perform_transaction_action({Method, Props, BodyFragments}, State) ->
    send_method(Method, Props, BodyFragments, State).
1065
1066%%--------------------------------------------------------------------
1067%% Heartbeat Management
1068%%--------------------------------------------------------------------
1069
%% Starts heartbeats for this process from a "CX,CY" string of exactly two
%% comma-separated integers (milliseconds).
%%
%% The send timeout is derived from CY and the receive timeout from CX, both
%% converted to whole seconds. Returns `{SendMillis, ReceiveMillis}', i.e.
%% the second-granularity timeouts converted back to milliseconds.
ensure_heartbeats(Heartbeats) ->
    Fields = re:split(Heartbeats, ",", [{return, list}]),
    [CX, CY] = [list_to_integer(Field) || Field <- Fields],

    SendTimeout = millis_to_seconds(CY),
    ReceiveTimeout = millis_to_seconds(CX),

    _ = rabbit_stomp_reader:start_heartbeats(self(),
                                             {SendTimeout, ReceiveTimeout}),
    {SendTimeout * 1000, ReceiveTimeout * 1000}.
1080
%% Converts milliseconds to whole seconds: non-positive values give 0,
%% anything below one second is rounded up to 1, larger values are truncated.
millis_to_seconds(M) ->
    if
        M =< 0   -> 0;
        M < 1000 -> 1;
        true     -> M div 1000
    end.
1084
1085%%----------------------------------------------------------------------------
1086%% Queue Setup
1087%%----------------------------------------------------------------------------
1088
%% Makes sure the endpoint for `EndPoint' exists by delegating to
%% rabbit_routing_util:ensure_endpoint/5 with parameters and arguments built
%% from the frame headers; its result is returned.
%%
%% A blank queue destination is rejected up front. For `source' endpoints a
%% generator fun for the subscription queue name is added to the parameters.
%% The frame is matched positionally as a four-element tuple whose third
%% element holds the headers.
ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
    {error, {invalid_destination, "Destination cannot be blank"}};

ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
    QueueNameGen =
        fun () ->
            SubscriptionId = build_subscription_id(Frame),
            % Note: we discard the exchange here so there's no need to use
            % the default_topic_exchange configuration key
            {_Exchange, Name} = rabbit_routing_util:parse_routing(EndPoint),
            QueueName = rabbit_stomp_util:subscription_queue_name(
                          Name, SubscriptionId, Frame),
            list_to_binary(QueueName)
        end,
    Params = [{subscription_queue_name_gen, QueueNameGen}
              | rabbit_stomp_util:build_params(EndPoint, Headers)],
    Arguments = rabbit_stomp_util:build_arguments(Headers),
    rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
                                        [Arguments | Params], State);

ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel, State) ->
    Params = rabbit_stomp_util:build_params(EndPoint, Headers),
    Arguments = rabbit_stomp_util:build_arguments(Headers),
    AllParams = [Arguments | Params],
    rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
                                        AllParams, State).
1112
%% Chooses the id used when naming a subscription queue: the frame's id
%% header when rabbit_stomp_util:has_durable_header/1 is true (the header
%% must then be present), otherwise a freshly generated secure guid.
build_subscription_id(Frame) ->
    case rabbit_stomp_util:has_durable_header(Frame) of
        false ->
            rabbit_guid:gen_secure();
        true ->
            {ok, SubscriptionId} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
            SubscriptionId
    end.
1121
1122%%----------------------------------------------------------------------------
1123%% Success/error handling
1124%%----------------------------------------------------------------------------
1125
%% Success result that carries no reply frame: `{ok, none, State}'.
ok(ProcState) ->
    {ok, none, ProcState}.
1128
%% Success result carrying a reply frame built from the given command,
%% headers and body fragments: `{ok, Frame, State}'.
ok(Command, Headers, BodyFragments, State) ->
    ReplyFrame = #stomp_frame{command     = Command,
                              headers     = Headers,
                              body_iolist = BodyFragments},
    {ok, ReplyFrame, State}.
1133
%% Reports a fatal AMQP error to the client and stops.
%%
%% The first argument is either the atom `access_refused' or a reply code
%% that is translated with amqp_connection:error_atom/1. The error is
%% logged, sent to the client as an ERROR frame, the connection is closed and
%% `{stop, normal, State}' is returned.
amqp_death(access_refused = ErrorName, Explanation, State) ->
    amqp_death_stop(ErrorName, Explanation, State);
amqp_death(ReplyCode, Explanation, State) ->
    ErrorName = amqp_connection:error_atom(ReplyCode),
    amqp_death_stop(ErrorName, Explanation, State).

%% Shared tail of amqp_death/3: log, notify the client, close, stop.
amqp_death_stop(ErrorName, Explanation, State) ->
    ErrorDesc = rabbit_misc:format("~s", [Explanation]),
    log_error(ErrorName, ErrorDesc, none),
    State1 = send_error(atom_to_list(ErrorName), ErrorDesc, State),
    {stop, normal, close_connection(State1)}.
1143
%% Logs the error (with no server-private detail) and returns
%% `{error, Message, Detail, State}'.
error(Message, Detail, State) ->
    log_error(Message, Detail, none),
    {error, Message, Detail, State}.
1146
%% Like error/3, with the detail produced by formatting `Format' with `Args'.
error(Message, Format, Args, State) ->
    Detail = rabbit_misc:format(Format, Args),
    priv_error(Message, Detail, none, State).
1149
%% Logs the error including `ServerPrivateDetail', which is only logged and
%% is not part of the returned `{error, Message, Detail, State}' tuple.
priv_error(Message, Detail, ServerPrivateDetail, State) ->
    _ = log_error(Message, Detail, ServerPrivateDetail),
    {error, Message, Detail, State}.
1153
%% Like priv_error/4, with the detail produced by formatting `Format' with
%% `Args'.
priv_error(Message, Format, Args, ServerPrivateDetail, State) ->
    Detail = rabbit_misc:format(Format, Args),
    priv_error(Message, Detail, ServerPrivateDetail, State).
1157
%% Writes an error-level log entry describing the message, detail and
%% server-private detail of a STOMP error.
log_error(Message, Detail, ServerPrivateDetail) ->
    Format = "STOMP error frame sent:~n"
             "Message: ~p~n"
             "Detail: ~p~n"
             "Server private detail: ~p",
    rabbit_log:error(Format, [Message, Detail, ServerPrivateDetail]).
1164
1165%%----------------------------------------------------------------------------
1166%% Frame sending utilities
1167%%----------------------------------------------------------------------------
1168
%% Builds a frame from the command, headers and body fragments and sends it
%% with send_frame/2. Returns the state.
send_frame(Command, Headers, BodyFragments, State) ->
    Frame = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    send_frame(Frame, State).
1174
%% Serializes `Frame' (honouring the state's trailing_lf setting) and passes
%% it to the state's send fun in `async' mode. Returns the state unchanged.
send_frame(Frame, #proc_state{send_fun    = SendFun,
                              trailing_lf = TrailingLF} = State) ->
    Serialized = rabbit_stomp_frame:serialize(Frame, TrailingLF),
    SendFun(async, Serialized),
    State.
1179
%% Like send_error_frame/4, with the detail produced by formatting `Format'
%% with `Args'.
send_error_frame(Message, ExtraHeaders, Format, Args, State) ->
    Detail = rabbit_misc:format(Format, Args),
    send_error_frame(Message, ExtraHeaders, Detail, State).
1183
%% Sends an ERROR frame whose body is `Detail'. The frame always carries
%% message, content-type and version headers (the latter listing
%% ?SUPPORTED_VERSIONS), followed by `ExtraHeaders'. Returns the state.
send_error_frame(Message, ExtraHeaders, Detail, State) ->
    Versions = string:join(?SUPPORTED_VERSIONS, ","),
    StandardHeaders = [{"message", Message},
                       {"content-type", "text/plain"},
                       {"version", Versions}],
    send_frame("ERROR", StandardHeaders ++ ExtraHeaders, Detail, State).
1190
%% Sends an ERROR frame with the given message and detail and no extra
%% headers. Returns the state.
send_error(Message, Detail, State) ->
    NoExtraHeaders = [],
    send_error_frame(Message, NoExtraHeaders, Detail, State).
1193
%% Like send_error/3, with the detail produced by formatting `Format' with
%% `Args'.
send_error(Message, Format, Args, State) ->
    send_error_frame(Message, [], Format, Args, State).
1196
%% Looks `Key' up in the additional_info property list of the state's
%% adapter info; returns `undefined' when the key is absent.
additional_info(Key,
                #proc_state{
                   adapter_info =
                       #amqp_adapter_info{additional_info = Properties}}) ->
    proplists:get_value(Key, Properties).
1201
%% Parses `Destination' into `{Exchange, RoutingKey}' and substitutes the
%% configured default topic exchange where applicable
%% (see maybe_apply_default_topic_exchange/2).
parse_routing(Destination, DefaultTopicExchange) ->
    {ParsedExchange, RoutingKey} =
        rabbit_routing_util:parse_routing(Destination),
    Exchange = maybe_apply_default_topic_exchange(ParsedExchange,
                                                  DefaultTopicExchange),
    {Exchange, RoutingKey}.
1206
%% Picks the exchange name (a string) to use for a destination.
%%
%% `Exchange' is a string, `DefaultTopicExchange' a binary. Only the exchange
%% "amq.topic" is ever replaced, and only when the configured default is
%% something other than <<"amq.topic">>; the default is then returned as a
%% string. Any other exchange must have been specified explicitly and is
%% returned untouched.
maybe_apply_default_topic_exchange(Exchange, DefaultTopicExchange) ->
    case Exchange of
        "amq.topic" when DefaultTopicExchange =/= <<"amq.topic">> ->
            %% the destination would have been amq.topic but a different
            %% default has been configured
            binary_to_list(DefaultTopicExchange);
        _ ->
            %% either the default is amq.topic itself, or the destination
            %% names a different exchange
            Exchange
    end.
1220