%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_stomp_processor).

-compile({no_auto_import, [error/3]}).

-export([initial_state/2, process_frame/2, flush_and_die/1]).
-export([flush_pending_receipts/3,
         handle_exit/3,
         cancel_consumer/2,
         send_delivery/5]).

-export([adapter_name/1]).
-export([info/2]).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
-include("rabbit_stomp_frame.hrl").
-include("rabbit_stomp.hrl").
-include("rabbit_stomp_headers.hrl").

-record(proc_state, {session_id, channel, connection, subscriptions,
                     version, start_heartbeat_fun, pending_receipts,
                     config, route_state, reply_queues, frame_transformer,
                     adapter_info, send_fun, ssl_login_name, peer_addr,
                     %% see rabbitmq/rabbitmq-stomp#39
                     trailing_lf, auth_mechanism, auth_login,
                     default_topic_exchange, default_nack_requeue}).

-record(subscription, {dest_hdr, ack_mode, multi_ack, description}).

-define(FLUSH_TIMEOUT, 60000).

%% Returns the `name' field of the adapter info stored in the processor
%% state.
adapter_name(State) ->
    #proc_state{adapter_info = #amqp_adapter_info{name = AdapterName}} = State,
    AdapterName.

%%----------------------------------------------------------------------------

-spec initial_state(
        #stomp_configuration{},
        {SendFun, AdapterInfo, SSLLoginName, PeerAddr})
    -> #proc_state{}
    when SendFun      :: fun((atom(), binary()) -> term()),
         AdapterInfo  :: #amqp_adapter_info{},
         SSLLoginName :: atom() | binary(),
         PeerAddr     :: inet:ip_address().

-type process_frame_result() ::
        {ok, term(), #proc_state{}} |
        {stop, term(), #proc_state{}}.

-spec process_frame(#stomp_frame{}, #proc_state{}) ->
    process_frame_result().

-spec flush_and_die(#proc_state{}) -> #proc_state{}.

-spec command({Command, Frame}, State) -> process_frame_result()
    when Command :: string(),
         Frame   :: #stomp_frame{},
         State   :: #proc_state{}.

-type process_fun() :: fun((#proc_state{}) ->
                               {ok, #stomp_frame{}, #proc_state{}} |
                               {error, string(), string(), #proc_state{}} |
                               {stop, term(), #proc_state{}}).
-spec process_request(process_fun(),
                      fun((#proc_state{}) -> #proc_state{}),
                      #proc_state{}) ->
    process_frame_result().

-spec flush_pending_receipts(DeliveryTag, IsMulti, State) -> State
    when State       :: #proc_state{},
         DeliveryTag :: term(),
         IsMulti     :: boolean().

-spec handle_exit(From, Reason, State) -> unknown_exit | {stop, Reason, State}
    when State  :: #proc_state{},
         From   :: pid(),
         Reason :: term().

-spec cancel_consumer(binary(), #proc_state{}) -> process_frame_result().

-spec send_delivery(#'basic.deliver'{}, term(), term(), term(),
                    #proc_state{}) -> #proc_state{}.

%%----------------------------------------------------------------------------


%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------

%% Entry point for a parsed STOMP frame: dispatches on the frame's command.
process_frame(#stomp_frame{command = Cmd} = Frame, State) ->
    command({Cmd, Frame}, State).

%% Closes the underlying AMQP connection (if any) and returns the
%% resulting state.
flush_and_die(State) ->
    close_connection(State).

%% Returns a single informational item about the STOMP session.
%% Items are read from the processor state, from the adapter info record,
%% or delegated to additional_info/2.
info(session_id, #proc_state{session_id = SessionId}) ->
    SessionId;
info(channel, #proc_state{channel = Channel}) ->
    Channel;
info(version, #proc_state{version = Version}) ->
    Version;
info(implicit_connect,
     #proc_state{config = #stomp_configuration{implicit_connect = Implicit}}) ->
    Implicit;
info(auth_login, #proc_state{auth_login = Login}) ->
    Login;
info(auth_mechanism, #proc_state{auth_mechanism = Mechanism}) ->
    Mechanism;
info(peer_addr, #proc_state{peer_addr = PeerAddr}) ->
    PeerAddr;
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Host}}) ->
    Host;
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Port}}) ->
    Port;
info(peer_host,
     #proc_state{adapter_info = #amqp_adapter_info{peer_host = PeerHost}}) ->
    PeerHost;
info(peer_port,
     #proc_state{adapter_info = #amqp_adapter_info{peer_port = PeerPort}}) ->
    PeerPort;
info(protocol,
     #proc_state{adapter_info = #amqp_adapter_info{protocol = Protocol}}) ->
    %% A {Name, Version} pair has its version coerced to a binary; any
    %% other value is returned untouched.
    case Protocol of
        {Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
        Other            -> Other
    end;
info(Item, PState)
  when Item =:= channels;
       Item =:= channel_max;
       Item =:= frame_max;
       Item =:= client_properties;
       Item =:= ssl;
       Item =:= ssl_protocol;
       Item =:= ssl_key_exchange;
       Item =:= ssl_cipher;
       Item =:= ssl_hash ->
    additional_info(Item, PState).

%% Builds the processor state for a freshly accepted STOMP connection.
%% No AMQP connection or channel exists yet; those fields start as `none'.
initial_state(Configuration,
              {SendFun,
               AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
               SSLLoginName, PeerAddr}) ->
    %% STOMP connections use exactly one channel. The frame max is not
    %% applicable and there is no way to know what client is used.
    StompInfo = [{channels, 1},
                 {channel_max, 1},
                 {frame_max, 0},
                 %% TODO: can we use a header to make it possible for clients
                 %%       to override this value?
                 {client_properties,
                  [{<<"product">>, longstr, <<"STOMP client">>}]}],
    AdapterInfo =
        AdapterInfo0#amqp_adapter_info{additional_info = StompInfo ++ Extra},
    RouteState = rabbit_routing_util:init_state(),
    TrailingLF = application:get_env(rabbitmq_stomp, trailing_lf, true),
    DefaultTopicExchange = application:get_env(rabbitmq_stomp,
                                               default_topic_exchange,
                                               <<"amq.topic">>),
    DefaultNackRequeue = application:get_env(rabbitmq_stomp,
                                             default_nack_requeue, true),
    #proc_state{send_fun               = SendFun,
                adapter_info           = AdapterInfo,
                ssl_login_name         = SSLLoginName,
                peer_addr              = PeerAddr,
                session_id             = none,
                channel                = none,
                connection             = none,
                subscriptions          = #{},
                version                = none,
                pending_receipts       = undefined,
                config                 = Configuration,
                route_state            = RouteState,
                reply_queues           = #{},
                frame_transformer      = undefined,
                trailing_lf            = TrailingLF,
                default_topic_exchange = DefaultTopicExchange,
                default_nack_requeue   = DefaultNackRequeue}.


%% Dispatches a {Command, Frame} pair.
%%
%% STOMP/CONNECT always go through process_connect/3. Any other command
%% received while no channel is open either triggers an implicit connect
%% (when enabled in the configuration) or is rejected. Once a channel is
%% open the frame is transformed, validated and handed to handle_frame/3.
command({"STOMP", Frame}, State) ->
    process_connect(no_implicit, Frame, State);

command({"CONNECT", Frame}, State) ->
    process_connect(no_implicit, Frame, State);

command(Request,
        State = #proc_state{channel = none,
                            config  = #stomp_configuration{
                                         implicit_connect = true}}) ->
    {ok, Connected = #proc_state{channel = Channel}, _} =
        process_connect(implicit, #stomp_frame{headers = []}, State),
    case Channel of
        %% the implicit login did not produce a channel: give up
        none -> {stop, normal, Connected};
        _    -> command(Request, Connected)
    end;

command(_Request,
        State = #proc_state{channel = none,
                            config  = #stomp_configuration{
                                         implicit_connect = false}}) ->
    ErrorState = send_error("Illegal command",
                            "You must log in using CONNECT first",
                            State),
    {ok, ErrorState, none};

command({Command, Frame}, State = #proc_state{frame_transformer = Transform}) ->
    Transformed = Transform(Frame),
    Handle =
        fun(StateN) ->
                case validate_frame(Command, Transformed, StateN) of
                    Invalid = {error, _, _, _} ->
                        Invalid;
                    _ ->
                        handle_frame(Command, Transformed, StateN)
                end
        end,
    OnSuccess = fun(StateM) -> ensure_receipt(Transformed, StateM) end,
    process_request(Handle, OnSuccess, State).

%% Handles a consumer cancellation for the given consumer tag by running
%% server_cancel_consumer/2 through process_request/2.
cancel_consumer(Ctag, State) ->
    Cancel = fun(StateN) -> server_cancel_consumer(Ctag, StateN) end,
    process_request(Cancel, State).

%% Reacts to the exit of a linked process.
%%
%% From   - pid of the process that exited
%% Reason - its exit reason
%% State  - current processor state
%%
%% Exits of the AMQP connection or channel recorded in State are handled:
%% a server-initiated close is passed to amqp_death/3, any other reason
%% produces an ERROR frame and a `stop' result. Exits from any other
%% process yield `unknown_exit'.
%%
%% Clause order matters: the specific `server_initiated_close' patterns
%% must precede the generic per-process clauses. The original code had a
%% second channel `server_initiated_close' clause placed after the generic
%% channel clause; it could never match and has been removed.
handle_exit(Conn, {shutdown, {server_initiated_close, Code, Explanation}},
            State = #proc_state{connection = Conn}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Conn, {shutdown, {connection_closing,
                              {server_initiated_close, Code, Explanation}}},
            State = #proc_state{connection = Conn}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Conn, Reason, State = #proc_state{connection = Conn}) ->
    _ = send_error("AMQP connection died", "Reason: ~p", [Reason], State),
    {stop, {conn_died, Reason}, State};

handle_exit(Ch, {shutdown, {server_initiated_close, Code, Explanation}},
            State = #proc_state{channel = Ch}) ->
    amqp_death(Code, Explanation, State);
handle_exit(Ch, Reason, State = #proc_state{channel = Ch}) ->
    _ = send_error("AMQP channel died", "Reason: ~p", [Reason], State),
    {stop, {channel_died, Reason}, State};

handle_exit(_, _, _) -> unknown_exit.


%% Same as process_request/3 with a success callback that leaves the
%% state unchanged.
process_request(ProcessFun, State) ->
    process_request(ProcessFun, fun (StateM) -> StateM end, State).


%% Runs ProcessFun against State and converts its outcome into a
%% process_frame_result().
%%
%% Exits raised by ProcessFun are trapped with `catch': server-initiated
%% closes and access_refused errors are passed to amqp_death/3, anything
%% else becomes a "Processing error". SuccessFun is applied to the new
%% state on `ok' results and on normal stops only.
process_request(ProcessFun, SuccessFun, State) ->
    Outcome =
        case catch ProcessFun(State) of
            {'EXIT',
             {{shutdown,
               {server_initiated_close, ReplyCode, Explanation}}, _}} ->
                amqp_death(ReplyCode, Explanation, State);
            {'EXIT', {amqp_error, access_refused, Msg, _}} ->
                amqp_death(access_refused, Msg, State);
            {'EXIT', Reason} ->
                priv_error("Processing error", "Processing error",
                           Reason, State);
            Result ->
                Result
        end,
    case Outcome of
        {ok, Frame, NewState = #proc_state{connection = Conn}} ->
            _ = case Frame of
                    none -> ok;
                    _    -> send_frame(Frame, NewState)
                end,
            {ok, SuccessFun(NewState), Conn};
        {error, Message, Detail, NewState = #proc_state{connection = Conn}} ->
            {ok, send_error(Message, Detail, NewState), Conn};
        {stop, normal, NewState} ->
            {stop, normal, SuccessFun(NewState)};
        {stop, StopReason, NewState} ->
            {stop, StopReason, NewState}
    end.

%% Handles CONNECT/STOMP (or an implicit connect) while no channel is open:
%% negotiates the protocol version, resolves credentials and logs in.
%% For an implicit connect a successful login result is replaced by
%% ok(State), dropping the frame that do_login/7 produced.
process_connect(Implicit, Frame,
                State = #proc_state{channel        = none,
                                    config         = Config,
                                    ssl_login_name = SSLLoginName,
                                    adapter_info   = AdapterInfo}) ->
    Connect =
        fun(StateN) ->
                case negotiate_version(Frame) of
                    {ok, Version} ->
                        FT = frame_transformer(Version),
                        Frame1 = FT(Frame),
                        {Auth, {Username, Passwd}} =
                            creds(Frame1, SSLLoginName, Config),
                        {ok, DefaultVHost} =
                            application:get_env(rabbitmq_stomp, default_vhost),
                        {ProtoName, _} =
                            AdapterInfo#amqp_adapter_info.protocol,
                        VHost = login_header(Frame1, ?HEADER_HOST,
                                             DefaultVHost),
                        Heartbeat = login_header(Frame1, ?HEADER_HEART_BEAT,
                                                 "0,0"),
                        LoginInfo = AdapterInfo#amqp_adapter_info{
                                      protocol = {ProtoName, Version}},
                        LoginState = StateN#proc_state{
                                       frame_transformer = FT,
                                       auth_mechanism    = Auth,
                                       auth_login        = Username},
                        Res = do_login(Username, Passwd, VHost, Heartbeat,
                                       LoginInfo, Version, LoginState),
                        case {Res, Implicit} of
                            {{ok, _, StateN1}, implicit} -> ok(StateN1);
                            _                            -> Res
                        end;
                    {error, no_common_version} ->
                        error("Version mismatch",
                              "Supported versions are ~s~n",
                              [string:join(?SUPPORTED_VERSIONS, ",")],
                              StateN)
                end
        end,
    process_request(Connect, State).

%% Picks the credentials to log in with and reports where they came from:
%%   config        - configured defaults (forced, or nothing else supplied)
%%   ssl           - the SSL login name, when the frame has no login header
%%   stomp_headers - login/passcode headers of the frame
creds(_, _, #stomp_configuration{default_login       = DefLogin,
                                 default_passcode    = DefPasscode,
                                 force_default_creds = true}) ->
    Login = iolist_to_binary(DefLogin),
    Passcode = iolist_to_binary(DefPasscode),
    {config, {Login, Passcode}};
creds(Frame, SSLLoginName,
      #stomp_configuration{default_login    = DefLogin,
                           default_passcode = DefPasscode}) ->
    Login = login_header(Frame, ?HEADER_LOGIN, DefLogin),
    Passcode = login_header(Frame, ?HEADER_PASSCODE, DefPasscode),
    HeaderCreds = {Login, Passcode},
    case {rabbit_stomp_frame:header(Frame, ?HEADER_LOGIN), SSLLoginName} of
        {not_found, none}    -> {config, HeaderCreds};
        {not_found, SSLName} -> {ssl, {SSLName, none}};
        _                    -> {stomp_headers, HeaderCreds}
    end.

%% Looks up header Key in Frame, falling back to Default, and returns the
%% value as a binary. `undefined' is passed through unchanged.
login_header(Frame, Key, Default) when is_binary(Default) ->
    login_header(Frame, Key, binary_to_list(Default));
login_header(Frame, Key, Default) ->
    case rabbit_stomp_frame:header(Frame, Key, Default) of
        undefined -> undefined;
        Value     -> list_to_binary(Value)
    end.

%%----------------------------------------------------------------------------
%% Frame Transformation
%%----------------------------------------------------------------------------

%% Version "1.0" frames go through rabbit_stomp_util:trim_headers/1; all
%% other versions use the identity function.
frame_transformer("1.0") -> fun rabbit_stomp_util:trim_headers/1;
frame_transformer(_) -> fun(Frame) -> Frame end.

%%----------------------------------------------------------------------------
%% Frame Validation
%%----------------------------------------------------------------------------

%% Builds the error result used when a durable subscription lacks an id.
report_missing_id_header(State) ->
    error("Missing Header",
          "Header 'id' is required for durable subscriptions", State).

%% Validates a frame before it is handled. SUBSCRIBE and UNSUBSCRIBE
%% frames flagged durable or persistent ("true") must carry an id header;
%% every other frame is accepted as is.
validate_frame(Command, Frame, State)
  when Command =:= "SUBSCRIBE"; Command =:= "UNSUBSCRIBE" ->
    Durable = rabbit_stomp_frame:header(Frame, ?HEADER_DURABLE),
    Persistent = rabbit_stomp_frame:header(Frame, ?HEADER_PERSISTENT),
    Id = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
    case {Durable, Persistent, Id} of
        {{ok, "true"}, _, not_found} ->
            report_missing_id_header(State);
        {_, {ok, "true"}, not_found} ->
            report_missing_id_header(State);
        _ ->
            ok(State)
    end;
validate_frame(_Command, _Frame, State) ->
    ok(State).

%%----------------------------------------------------------------------------
%% Frame handlers
%%----------------------------------------------------------------------------

%% One clause per supported STOMP command; unknown commands produce a
%% "Bad command" error.
handle_frame("DISCONNECT", _Frame, State) ->
    {stop, normal, close_connection(State)};

handle_frame("SUBSCRIBE", Frame, State) ->
    with_destination("SUBSCRIBE", Frame, State, fun do_subscribe/4);

handle_frame("UNSUBSCRIBE", Frame, State) ->
    TagResult = rabbit_stomp_util:consumer_tag(Frame),
    cancel_subscription(TagResult, Frame, State);

handle_frame("SEND", Frame, State) ->
    Send = fun (_Command, Frame1, State1) ->
                   with_destination("SEND", Frame1, State1, fun do_send/4)
           end,
    without_headers(?HEADERS_NOT_ON_SEND, "SEND", Frame, State, Send);

handle_frame("ACK", Frame, State) ->
    ack_action("ACK", Frame, State, fun create_ack_method/3);

handle_frame("NACK", Frame, State) ->
    ack_action("NACK", Frame, State, fun create_nack_method/3);

handle_frame("BEGIN", Frame, State) ->
    transactional_action(Frame, "BEGIN", fun begin_transaction/2, State);

handle_frame("COMMIT", Frame, State) ->
    transactional_action(Frame, "COMMIT", fun commit_transaction/2, State);

handle_frame("ABORT", Frame, State) ->
    transactional_action(Frame, "ABORT", fun abort_transaction/2, State);

handle_frame(Command, _Frame, State) ->
    error("Bad command",
          "Could not interpret command ~p~n",
          [Command],
          State).

%%----------------------------------------------------------------------------
%% Internal helpers for processing frames callbacks
%%----------------------------------------------------------------------------

%% Shared implementation of ACK and NACK.
%%
%% Reads the version-specific ack header, parses the message id into a
%% consumer tag and delivery tag, and looks up the subscription. The AMQP
%% method built by MethodFun is either queued on the frame's transaction
%% or sent on the channel right away.
ack_action(Command, Frame,
           State = #proc_state{subscriptions        = Subs,
                               channel              = Channel,
                               version              = Version,
                               default_nack_requeue = DefaultNackRequeue},
           MethodFun) ->
    AckHeader = rabbit_stomp_util:ack_header_name(Version),
    case rabbit_stomp_frame:header(Frame, AckHeader) of
        not_found ->
            error("Missing header",
                  "~p must include the ~p header~n",
                  [Command, AckHeader],
                  State);
        {ok, AckValue} ->
            case rabbit_stomp_util:parse_message_id(AckValue) of
                {ok, {ConsumerTag, _SessionId, DeliveryTag}} ->
                    case maps:find(ConsumerTag, Subs) of
                        error ->
                            error("Subscription not found",
                                  "Message with id ~p has no subscription",
                                  [AckValue],
                                  State);
                        {ok, Sub} ->
                            Requeue = rabbit_stomp_frame:boolean_header(
                                        Frame, "requeue", DefaultNackRequeue),
                            Method = MethodFun(DeliveryTag, Sub, Requeue),
                            case transactional(Frame) of
                                no ->
                                    amqp_channel:call(Channel, Method),
                                    ok(State);
                                {yes, Transaction} ->
                                    extend_transaction(
                                      Transaction, {Method}, State)
                            end
                    end;
                _ ->
                    error("Invalid header",
                          "~p must include a valid ~p header~n",
                          [Command, AckHeader],
                          State)
            end
    end.

%%----------------------------------------------------------------------------
%% Internal helpers for processing frames callbacks
%%----------------------------------------------------------------------------

%% Handles a cancellation that did not come from the client: sends an
%% ERROR frame naming the subscription and removes it from the state.
%% An unknown consumer tag yields an error result instead.
server_cancel_consumer(ConsumerTag, State = #proc_state{subscriptions = Subs}) ->
    case maps:find(ConsumerTag, Subs) of
        {ok, Subscription = #subscription{description = Description}} ->
            SubscriptionId =
                case rabbit_stomp_util:tag_to_id(ConsumerTag) of
                    {ok,    {_, Known}}   -> Known;
                    {error, {_, Unknown}} -> "Unknown[" ++ Unknown ++ "]"
                end,
            _ = send_error_frame("Server cancelled subscription",
                                 [{?HEADER_SUBSCRIPTION, SubscriptionId}],
                                 "The server has canceled a subscription.~n"
                                 "No more messages will be delivered for ~p.~n",
                                 [Description],
                                 State),
            %% `undefined' in place of a frame marks the cancellation as
            %% server-initiated
            tidy_canceled_subscription(ConsumerTag, Subscription,
                                       undefined, State);
        error ->
            error("Server cancelled unknown subscription",
                  "Consumer tag ~p is not associated with a subscription.~n",
                  [ConsumerTag],
                  State)
    end.

%% Handles UNSUBSCRIBE. The first argument is the result of
%% rabbit_stomp_util:consumer_tag/1 for the frame: either an error or
%% {ok, ConsumerTag, Description}.
cancel_subscription({error, invalid_prefix}, _Frame, State) ->
    error("Invalid id",
          "UNSUBSCRIBE 'id' may not start with ~s~n",
          [?TEMP_QUEUE_ID_PREFIX],
          State);

cancel_subscription({error, _}, _Frame, State) ->
    error("Missing destination or id",
          "UNSUBSCRIBE must include a 'destination' or 'id' header",
          State);

cancel_subscription({ok, ConsumerTag, Description}, Frame,
                    State = #proc_state{subscriptions = Subs,
                                        channel       = Channel}) ->
    case maps:find(ConsumerTag, Subs) of
        {ok, Subscription = #subscription{description = Descr}} ->
            Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
            case amqp_channel:call(Channel, Cancel) of
                %% the reply must echo the tag we asked to cancel
                #'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
                    tidy_canceled_subscription(ConsumerTag, Subscription,
                                               Frame, State);
                _ ->
                    error("Failed to cancel subscription",
                          "UNSUBSCRIBE to ~p failed.~n",
                          [Descr],
                          State)
            end;
        error ->
            error("No subscription found",
                  "UNSUBSCRIBE must refer to an existing subscription.~n"
                  "Subscription to ~p not found.~n",
                  [Description],
                  State)
    end.

%% Server-initiated cancelations will pass an undefined instead of a
%% STOMP frame. In this case we know that the queue was deleted and
%% thus we don't have to clean it up.
tidy_canceled_subscription(ConsumerTag, _Subscription,
                           undefined,
                           State = #proc_state{subscriptions = Subs}) ->
    Remaining = maps:remove(ConsumerTag, Subs),
    ok(State#proc_state{subscriptions = Remaining});

%% Client-initiated cancelations will pass an actual frame
tidy_canceled_subscription(ConsumerTag, #subscription{dest_hdr = DestHdr},
                           Frame,
                           State = #proc_state{subscriptions = Subs}) ->
    Remaining = maps:remove(ConsumerTag, Subs),
    {ok, Dest} = rabbit_routing_util:parse_endpoint(DestHdr),
    NewState = State#proc_state{subscriptions = Remaining},
    maybe_delete_durable_sub(Dest, Frame, NewState).

%% After a client-initiated UNSUBSCRIBE on a topic destination, deletes
%% the subscription queue when the frame carries the durable header.
%% Other destination kinds need no cleanup here.
maybe_delete_durable_sub({topic, Name}, Frame,
                         State = #proc_state{channel = Channel}) ->
    case rabbit_stomp_util:has_durable_header(Frame) of
        false ->
            ok;
        true ->
            {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
            QName = rabbit_stomp_util:subscription_queue_name(Name, Id, Frame),
            amqp_channel:call(Channel,
                              #'queue.delete'{queue  = list_to_binary(QName),
                                              nowait = false})
    end,
    ok(State);
maybe_delete_durable_sub(_Destination, _Frame, State) ->
    ok(State).

%% Parses the frame's destination header and applies
%% Fun(Destination, DestHdr, Frame, State). Missing, malformed or unknown
%% destinations, and `invalid_endpoint' / `invalid_destination' errors
%% returned by Fun, become error results; any other {error, Reason}
%% returned by Fun is thrown.
with_destination(Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of
        not_found ->
            error("Missing destination",
                  "~p must include a 'destination' header~n",
                  [Command],
                  State);
        {ok, DestHdr} ->
            case rabbit_routing_util:parse_endpoint(DestHdr) of
                {ok, Destination} ->
                    case Fun(Destination, DestHdr, Frame, State) of
                        {error, invalid_endpoint} ->
                            error("Invalid destination",
                                  "'~s' is not a valid destination for '~s'~n",
                                  [DestHdr, Command],
                                  State);
                        {error, {invalid_destination, Msg}} ->
                            error("Invalid destination",
                                  "~s",
                                  [Msg],
                                  State);
                        {error, Reason} ->
                            throw(Reason);
                        Result ->
                            Result
                    end;
                {error, {invalid_destination, Type, Content}} ->
                    error("Invalid destination",
                          "'~s' is not a valid ~p destination~n",
                          [Content, Type],
                          State);
                {error, {unknown_destination, Content}} ->
                    Prefixes = string:join(
                                 rabbit_routing_util:all_dest_prefixes(),
                                 ", "),
                    error("Unknown destination",
                          "'~s' is not a valid destination.~n"
                          "Valid destination types are: ~s.~n",
                          [Content, Prefixes],
                          State)
            end
    end.

%% Applies Fun(Command, Frame, State) only if none of the listed headers
%% is present on the frame; the first offending header produces an
%% "Invalid header" error.
without_headers([Hdr | Hdrs], Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, Hdr) of
        {ok, _} ->
            error("Invalid header",
                  "'~s' is not allowed on '~s'.~n",
                  [Hdr, Command],
                  State);
        not_found ->
            without_headers(Hdrs, Command, Frame, State, Fun)
    end;
without_headers([], Command, Frame, State, Fun) ->
    Fun(Command, Frame, State).

%% Opens the AMQP connection and channel for a STOMP login and builds the
%% CONNECTED frame.
%%
%% Username / Passwd - credentials (an undefined username is rejected)
%% VirtualHost       - vhost to connect to
%% Heartbeat         - heart-beat header value, passed to ensure_heartbeats/1
%% AdapterInfo       - adapter info attached to the direct connection
%% Version           - negotiated STOMP version
%%
%% Returns the CONNECTED result on success, or a "Bad CONNECT" error
%% result for each failure reason reported by start_connection/3.
do_login(undefined, _, _, _, _, _, State) ->
    error("Bad CONNECT", "Missing login or passcode header(s)", State);
do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version,
         State = #proc_state{peer_addr = Addr}) ->
    case start_connection(
           #amqp_params_direct{username     = Username,
                               password     = Passwd,
                               virtual_host = VirtualHost,
                               adapter_info = AdapterInfo}, Username, Addr) of
        {ok, Connection} ->
            link(Connection),
            {ok, Channel} = amqp_connection:open_channel(Connection),
            link(Channel),
            amqp_channel:enable_delivery_flow_control(Channel),
            SessionId = rabbit_guid:string(rabbit_guid:gen_secure(), "session"),
            {SendTimeout, ReceiveTimeout} = ensure_heartbeats(Heartbeat),

            Headers = [{?HEADER_SESSION, SessionId},
                       {?HEADER_HEART_BEAT,
                        io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
                       {?HEADER_VERSION, Version}],
            ok("CONNECTED",
               case application:get_env(rabbitmq_stomp, hide_server_info, false) of
                 true  -> Headers;
                 false -> [{?HEADER_SERVER, server_header()} | Headers]
               end,
               "",
               State#proc_state{session_id = SessionId,
                                channel    = Channel,
                                connection = Connection,
                                version    = Version});
        {error, {auth_failure, _}} ->
            rabbit_log:warning("STOMP login failed for user '~s': authentication failed", [Username]),
            %% The username comes from the client, so it must be passed as
            %% a format argument: concatenating it into the format string
            %% makes formatting fail for logins that contain `~'.
            error("Bad CONNECT", "Access refused for user '~s'~n",
                  [binary_to_list(Username)], State);
        {error, not_allowed} ->
            rabbit_log:warning("STOMP login failed for user '~s': "
                               "virtual host access not allowed", [Username]),
            error("Bad CONNECT", "Virtual host '" ++
                                  binary_to_list(VirtualHost) ++
                                  "' access denied", State);
        {error, access_refused} ->
            rabbit_log:warning("STOMP login failed for user '~s': "
                               "virtual host access not allowed", [Username]),
            error("Bad CONNECT", "Virtual host '" ++
                                  binary_to_list(VirtualHost) ++
                                  "' access denied", State);
        {error, not_loopback} ->
            rabbit_log:warning("STOMP login failed for user '~s': "
                               "this user's access is restricted to localhost", [Username]),
            error("Bad CONNECT", "non-loopback access denied", State)
    end.

%% Starts the AMQP connection, then checks whether the user may connect
%% from Addr. A connection that fails the loopback check is closed again
%% and reported as {error, not_loopback}.
start_connection(Params, Username, Addr) ->
    case amqp_connection:start(Params) of
        {ok, Conn} -> case rabbit_access_control:check_user_loopback(
                             Username, Addr) of
                          ok          -> {ok, Conn};
                          not_allowed -> amqp_connection:close(Conn),
                                         {error, not_loopback}
                      end;
        {error, E} -> {error, E}
    end.

%% Value for the `server' header: "<description>/<vsn>" of the rabbit
%% application.
server_header() ->
    {ok, Product} = application:get_key(rabbit, description),
    {ok, Version} = application:get_key(rabbit, vsn),
    rabbit_misc:format("~s/~s", [Product, Version]).

%% Handles SUBSCRIBE for an already parsed destination: checks topic
%% access, ensures the source endpoint exists, applies the optional
%% prefetch count, starts the consumer and binds the queue. A duplicate
%% subscription id is reported to the client and closes the connection.
do_subscribe(Destination, DestHdr, Frame,
             State = #proc_state{subscriptions          = Subs,
                                 route_state            = RouteState,
                                 channel                = Channel,
                                 default_topic_exchange = DfltTopicEx}) ->
    check_subscription_access(Destination, State),
    Prefetch = rabbit_stomp_frame:integer_header(Frame,
                                                 ?HEADER_PREFETCH_COUNT,
                                                 undefined),
    {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
    case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of
        {error, _} = Error ->
            Error;
        {ok, Queue, RouteState1} ->
            {ok, ConsumerTag, Description} =
                rabbit_stomp_util:consumer_tag(Frame),
            case Prefetch of
                undefined ->
                    ok;
                _ ->
                    amqp_channel:call(
                      Channel, #'basic.qos'{prefetch_count = Prefetch})
            end,
            case maps:find(ConsumerTag, Subs) of
                {ok, _} ->
                    Message = "Duplicated subscription identifier",
                    Detail = "A subscription identified by '~s' already exists.",
                    _ = error(Message, Detail, [ConsumerTag], State),
                    _ = send_error(Message, Detail, [ConsumerTag], State),
                    {stop, normal, close_connection(State)};
                error ->
                    ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
                    StreamOffset =
                        rabbit_stomp_frame:stream_offset_header(Frame,
                                                                undefined),
                    Arguments =
                        case StreamOffset of
                            undefined ->
                                [];
                            {Type, Value} ->
                                [{<<"x-stream-offset">>, Type, Value}]
                        end,
                    Consume = #'basic.consume'{queue        = Queue,
                                               consumer_tag = ConsumerTag,
                                               no_local     = false,
                                               no_ack       = (AckMode == auto),
                                               exclusive    = false,
                                               arguments    = Arguments},
                    try
                        amqp_channel:subscribe(Channel, Consume, self()),
                        ok = rabbit_routing_util:ensure_binding(
                               Queue, ExchangeAndKey, Channel)
                    catch exit:ExitReason ->
                            %% it's safe to delete this queue, it
                            %% was server-named and declared by us
                            case Destination of
                                {exchange, _} ->
                                    ok = maybe_clean_up_queue(Queue, State);
                                {topic, _} ->
                                    ok = maybe_clean_up_queue(Queue, State);
                                _ ->
                                    ok
                            end,
                            exit(ExitReason)
                    end,
                    Subscription = #subscription{dest_hdr    = DestHdr,
                                                 ack_mode    = AckMode,
                                                 multi_ack   = IsMulti,
                                                 description = Description},
                    ok(State#proc_state{
                         subscriptions = maps:put(ConsumerTag, Subscription,
                                                  Subs),
                         route_state   = RouteState1})
            end
    end.

%% For topic destinations, asks rabbit_access_control whether the
%% connection's internal user may read from the resolved exchange and
%% routing key. All other destination kinds are reported as `authorized'.
check_subscription_access(Destination = {topic, _Topic},
                          #proc_state{auth_login             = _User,
                                      connection             = Connection,
                                      default_topic_exchange = DfltTopicEx}) ->
    [{amqp_params, AmqpParams},
     {internal_user, InternalUser = #user{username = Username}}] =
        amqp_connection:info(Connection, [amqp_params, internal_user]),
    #amqp_params_direct{virtual_host = VHost} = AmqpParams,
    {Exchange, RoutingKey} = parse_routing(Destination, DfltTopicEx),
    Resource = #resource{virtual_host = VHost,
                         kind         = topic,
                         name         = rabbit_data_coercion:to_binary(Exchange)},
    VariableMap = #{<<"vhost">> => VHost, <<"username">> => Username},
    Context = #{routing_key  => rabbit_data_coercion:to_binary(RoutingKey),
                variable_map => VariableMap},
    rabbit_access_control:check_topic_access(InternalUser, Resource,
                                             read, Context);
check_subscription_access(_, _) ->
    authorized.

%% Best-effort queue deletion on a fresh channel of the same connection;
%% failures of the delete and of the channel close are ignored.
maybe_clean_up_queue(Queue, #proc_state{connection = Connection}) ->
    {ok, CleanupChannel} = amqp_connection:open_channel(Connection),
    catch amqp_channel:call(CleanupChannel, #'queue.delete'{queue = Queue}),
    catch amqp_channel:close(CleanupChannel),
    ok.

%% Handles SEND for an already parsed destination: ensures the
%% destination endpoint, resolves a reply-to header if present and
%% publishes the body. Inside a transaction the publish is queued
%% together with a callback that records the receipt.
do_send(Destination, _DestHdr,
        Frame = #stomp_frame{body_iolist = BodyFragments},
        State = #proc_state{channel                = Channel,
                            route_state            = RouteState,
                            default_topic_exchange = DfltTopicEx}) ->
    case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of
        {error, _} = Error ->
            Error;
        {ok, _Q, RouteState1} ->
            {Frame1, State1} =
                ensure_reply_to(Frame,
                                State#proc_state{route_state = RouteState1}),
            Props = rabbit_stomp_util:message_properties(Frame1),
            {Exchange, RoutingKey} = parse_routing(Destination, DfltTopicEx),
            Publish = #'basic.publish'{
                         exchange    = list_to_binary(Exchange),
                         routing_key = list_to_binary(RoutingKey),
                         mandatory   = false,
                         immediate   = false},
            case transactional(Frame1) of
                no ->
                    Recorded = maybe_record_receipt(Frame1, State1),
                    ok(send_method(Publish, Props, BodyFragments, Recorded));
                {yes, Transaction} ->
                    RecordReceipt =
                        fun(StateN) ->
                                maybe_record_receipt(Frame1, StateN)
                        end,
                    extend_transaction(Transaction,
                                       RecordReceipt,
                                       {Publish, Props, BodyFragments},
                                       State1)
            end
    end.

%% Builds a basic.ack for the delivery tag; the requeue flag is unused.
create_ack_method(DeliveryTag, #subscription{multi_ack = Multiple}, _) ->
    #'basic.ack'{delivery_tag = DeliveryTag,
                 multiple     = Multiple}.

%% Builds a basic.nack for the delivery tag with the given requeue flag.
create_nack_method(DeliveryTag, #subscription{multi_ack = Multiple}, Requeue) ->
    #'basic.nack'{delivery_tag = DeliveryTag,
                  multiple     = Multiple,
                  requeue      = Requeue}.

%% Splits the accept-version header (default "1.0") on commas and
%% negotiates against ?SUPPORTED_VERSIONS.
negotiate_version(Frame) ->
    Accepted = rabbit_stomp_frame:header(Frame, ?HEADER_ACCEPT_VERSION, "1.0"),
    ClientVers = re:split(Accepted, ",", [{return, list}]),
    rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS).


%% Forwards an AMQP delivery to the client as a MESSAGE frame, or sends
%% an ERROR frame when the consumer tag has no subscription. The delivery
%% context is acknowledged via notify_received/1 in both cases.
send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag},
              Properties, Body, DeliveryCtx,
              State = #proc_state{session_id    = SessionId,
                                  subscriptions = Subs,
                                  version       = Version}) ->
    Delivered =
        case maps:find(ConsumerTag, Subs) of
            error ->
                send_error("Subscription not found",
                           "There is no current subscription with tag '~s'.",
                           [ConsumerTag],
                           State);
            {ok, #subscription{ack_mode = AckMode}} ->
                Headers = rabbit_stomp_util:headers(SessionId, Delivery,
                                                    Properties, AckMode,
                                                    Version),
                send_frame("MESSAGE", Headers, Body, State)
        end,
    notify_received(DeliveryCtx),
    Delivered.

notify_received(undefined) ->
  %% no notification for quorum queues and streams
  ok;
notify_received(DeliveryCtx) ->
  %% notification for flow control
  amqp_channel:notify_received(DeliveryCtx).

%% send_method/3: synchronous call of Method on an explicit channel.
send_method(Method, Channel, State) ->
    amqp_channel:call(Channel, Method),
    State.

%% send_method/2: as send_method/3, using the channel stored in State.
send_method(Method, #proc_state{channel = Channel} = State) ->
    send_method(Method, Channel, State).

%% send_method/4: publishes with content on the channel stored in State.
send_method(Method, Properties, BodyFragments,
            #proc_state{channel = Channel} = State) ->
    send_method(Method, Channel, Properties, BodyFragments, State).

%% send_method/5: casts a basic.publish with its properties and payload
%% through amqp_channel:cast_flow/3.
send_method(#'basic.publish'{} = Method, Channel, Properties, BodyFragments,
            State) ->
    Message = #amqp_msg{props   = Properties,
                        payload = list_to_binary(BodyFragments)},
    amqp_channel:cast_flow(Channel, Method, Message),
    State.

%% Closes the AMQP connection, if there is one, and clears the channel,
%% connection and subscriptions fields of the state.
%%
%% NOTE(review): subscriptions is reset to the atom `none' here, while
%% initial_state/2 starts it as an empty map and the lookups in this
%% module use maps:find/2 -- confirm nothing reads subscriptions after a
%% close.
close_connection(State = #proc_state{connection = none}) ->
    State;
%% Closing the connection will close the channel and subchannels
close_connection(State = #proc_state{connection = Connection}) ->
    %% ignore noproc or other exceptions to avoid debris
    catch amqp_connection:close(Connection),
    State#proc_state{channel       = none,
                     connection    = none,
                     subscriptions = none};
close_connection(undefined) ->
    rabbit_log:debug("~s:close_connection: undefined state", [?MODULE]),
    #proc_state{channel       = none,
                connection    = none,
                subscriptions = none}.

%%----------------------------------------------------------------------------
%% Reply-To
%%----------------------------------------------------------------------------

%% If the frame has a reply-to header that names a temp queue, makes sure
%% the matching reply queue exists and rewrites the header to the real
%% queue name. Frames without the header, or with any other kind of
%% reply-to destination, are returned untouched.
ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_REPLY_TO) of
        {ok, ReplyTo} ->
            {ok, Destination} = rabbit_routing_util:parse_endpoint(ReplyTo),
            case rabbit_routing_util:dest_temp_queue(Destination) of
                none ->
                    {Frame, State};
                TempQueueId ->
                    {ReplyQueue, State1} =
                        ensure_reply_queue(TempQueueId, State),
                    Rewritten = lists:keyreplace(
                                  ?HEADER_REPLY_TO, 1, Headers,
                                  {?HEADER_REPLY_TO, ReplyQueue}),
                    {Frame#stomp_frame{headers = Rewritten}, State1}
            end;
        not_found ->
            {Frame, State}
    end.

%% Returns the name of the reply queue for TempQueueId as a string,
%% declaring an exclusive auto-delete queue and consuming from it the
%% first time the id is seen.
ensure_reply_queue(TempQueueId, State = #proc_state{channel       = Channel,
                                                    reply_queues  = RQS,
                                                    subscriptions = Subs}) ->
    case maps:find(TempQueueId, RQS) of
        {ok, Known} ->
            {binary_to_list(Known), State};
        error ->
            #'queue.declare_ok'{queue = Queue} =
                amqp_channel:call(Channel,
                                  #'queue.declare'{auto_delete = true,
                                                   exclusive   = true}),

            ConsumerTag = rabbit_stomp_util:consumer_tag_reply_to(TempQueueId),
            Consume = #'basic.consume'{queue        = Queue,
                                       consumer_tag = ConsumerTag,
                                       no_ack       = true,
                                       nowait       = false},
            #'basic.consume_ok'{} =
                amqp_channel:subscribe(Channel, Consume, self()),

            Destination = binary_to_list(Queue),

            %% synthesise a subscription to the reply queue destination
            ReplySub = #subscription{dest_hdr  = Destination,
                                     multi_ack = false},
            NewState = State#proc_state{
                         reply_queues  = maps:put(TempQueueId, Queue, RQS),
                         subscriptions = maps:put(ConsumerTag, ReplySub, Subs)},
            {Destination, NewState}
    end.

%%----------------------------------------------------------------------------
%% Receipt Handling
%%----------------------------------------------------------------------------

%% Sends a receipt for the frame if it carries a receipt header.
ensure_receipt(Frame = #stomp_frame{command = Command}, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found -> State;
        {ok, Id}  -> do_receipt(Command, Id, State)
    end.

do_receipt("SEND", _, State) ->
    %% SEND frame receipts are handled when messages are confirmed
    State;
do_receipt(_Frame, ReceiptId, State) ->
    send_frame("RECEIPT", [{"receipt-id", ReceiptId}], "", State).
%% Remembers the receipt id of Frame (if it has one) under the channel's
%% next publish sequence number, so that it can be flushed later by
%% flush_pending_receipts/3.
%%
%% The first time a receipt is recorded (pending_receipts is `undefined`)
%% this process is registered as confirm handler and the channel is put
%% into confirm mode before an empty tree is created.
maybe_record_receipt(Frame, State = #proc_state{channel          = Channel,
                                                pending_receipts = Pending}) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found ->
            State;
        {ok, ReceiptId} ->
            Tree =
                case Pending of
                    undefined ->
                        amqp_channel:register_confirm_handler(Channel, self()),
                        #'confirm.select_ok'{} =
                            amqp_channel:call(Channel, #'confirm.select'{}),
                        gb_trees:empty();
                    _ ->
                        Pending
                end,
            SeqNo = amqp_channel:next_publish_seqno(Channel),
            Recorded = gb_trees:insert(SeqNo, ReceiptId, Tree),
            State#proc_state{pending_receipts = Recorded}
    end.

%% Issues a receipt (via do_receipt/3) for every pending receipt selected
%% by DeliveryTag / IsMulti and drops those entries from pending_receipts.
flush_pending_receipts(DeliveryTag, IsMulti,
                       State = #proc_state{pending_receipts = Pending}) ->
    {ReceiptIds, Remaining} = accumulate_receipts(DeliveryTag, IsMulti, Pending),
    SendReceipt = fun (ReceiptId, StateAcc) ->
                          do_receipt(none, ReceiptId, StateAcc)
                  end,
    Flushed = lists:foldl(SendReceipt, State, ReceiptIds),
    Flushed#proc_state{pending_receipts = Remaining}.

%% Returns {ReceiptIds, RemainingTree}.
%%   IsMulti = false: only the entry stored exactly under DeliveryTag;
%%   IsMulti = true:  every entry whose key is =< DeliveryTag, in
%%                    ascending key order.
accumulate_receipts(DeliveryTag, false, PR) ->
    case gb_trees:lookup(DeliveryTag, PR) of
        none ->
            {[], PR};
        {value, ReceiptId} ->
            {[ReceiptId], gb_trees:delete(DeliveryTag, PR)}
    end;

accumulate_receipts(DeliveryTag, true, PR) ->
    case gb_trees:is_empty(PR) of
        false ->
            accumulate_receipts1(DeliveryTag, gb_trees:take_smallest(PR), []);
        true ->
            {[], PR}
    end.

%% Walks the tree smallest-key-first. The second argument is the result of
%% gb_trees:take_smallest/1, i.e. {Key, Value, TreeWithoutKey}.
accumulate_receipts1(DeliveryTag, {Key, ReceiptId, Rest}, Acc)
  when Key =< DeliveryTag ->
    Collected = [ReceiptId | Acc],
    case gb_trees:is_empty(Rest) of
        false ->
            accumulate_receipts1(DeliveryTag,
                                 gb_trees:take_smallest(Rest), Collected);
        true ->
            {lists:reverse(Collected), Rest}
    end;
accumulate_receipts1(_DeliveryTag, {Key, ReceiptId, Rest}, Acc) ->
    %% The entry just taken lies beyond DeliveryTag: put it back and stop.
    {lists:reverse(Acc), gb_trees:insert(Key, ReceiptId, Rest)}.
%%----------------------------------------------------------------------------
%% Transaction Support
%%----------------------------------------------------------------------------

%% Pending actions of a transaction are kept in the process dictionary
%% under the key {transaction, Id}, newest action first.

%% Returns {yes, Id} when Frame carries a transaction header, `no` otherwise.
transactional(Frame) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_TRANSACTION) of
        not_found  -> no;
        {ok, TxId} -> {yes, TxId}
    end.

%% Applies Fun(TransactionId, State) when Frame names a transaction;
%% otherwise produces a "Missing transaction" error mentioning Name.
transactional_action(Frame, Name, Fun, State) ->
    case transactional(Frame) of
        no ->
            error("Missing transaction",
                  "~p must include a 'transaction' header~n",
                  [Name],
                  State);
        {yes, TxId} ->
            Fun(TxId, State)
    end.

%% Applies Fun(Actions, State) to the actions stored for Transaction, or
%% produces a "Bad transaction" error when nothing is stored for it.
with_transaction(Transaction, State, Fun) ->
    TxKey = {transaction, Transaction},
    case get(TxKey) of
        undefined ->
            error("Bad transaction",
                  "Invalid transaction identifier: ~p~n",
                  [Transaction],
                  State);
        Actions ->
            Fun(Actions, State)
    end.

%% Starts Transaction with an empty action list.
begin_transaction(Transaction, State) ->
    TxKey = {transaction, Transaction},
    put(TxKey, []),
    ok(State).

%% Queues Action, wrapped so that Callback is applied to the state right
%% before Action is performed.
extend_transaction(Transaction, Callback, Action, State) ->
    Wrapped = {callback, Callback, Action},
    extend_transaction(Transaction, Wrapped, State).

%% Prepends Action to the list stored for Transaction.
extend_transaction(Transaction, Action, State0) ->
    Append =
        fun (Actions, State) ->
                put({transaction, Transaction}, [Action | Actions]),
                ok(State)
        end,
    with_transaction(Transaction, State0, Append).

%% Performs every queued action and forgets the transaction.
commit_transaction(Transaction, State0) ->
    Commit =
        fun (Actions, State) ->
                %% Actions are stored newest-first, so folding from the
                %% right performs them in the order they were queued.
                FinalState = lists:foldr(fun perform_transaction_action/2,
                                         State,
                                         Actions),
                erase({transaction, Transaction}),
                ok(FinalState)
        end,
    with_transaction(Transaction, State0, Commit).

%% Forgets the transaction without performing any queued action.
abort_transaction(Transaction, State0) ->
    Abort =
        fun (_Actions, State) ->
                erase({transaction, Transaction}),
                ok(State)
        end,
    with_transaction(Transaction, State0, Abort).
%% Performs one queued transaction action and returns the new state.
%%   {Method}                      -> send_method/2
%%   {callback, Callback, Action}  -> apply Callback to the state, then
%%                                    perform Action on the result
%%   {Method, Props, BodyFragments} -> send_method/4
%% The callback clause must stay ahead of the generic 3-tuple clause.
perform_transaction_action({Method}, State) ->
    send_method(Method, State);
perform_transaction_action({callback, Callback, Action}, State) ->
    StateAfterCallback = Callback(State),
    perform_transaction_action(Action, StateAfterCallback);
perform_transaction_action({Method, Props, BodyFragments}, State) ->
    send_method(Method, Props, BodyFragments, State).

%%--------------------------------------------------------------------
%% Heartbeat Management
%%--------------------------------------------------------------------

%% Parses a heartbeat header value of the form "CX,CY" (two integers, in
%% milliseconds), starts heartbeats on the reader and returns
%% {SendTimeoutMs, ReceiveTimeoutMs}.
%%
%% The send timeout is derived from the second value and the receive
%% timeout from the first. Both are converted to whole seconds with
%% millis_to_seconds/1 before being used, so the returned millisecond
%% values are multiples of 1000. Anything other than exactly two integers
%% crashes (badarg / badmatch).
ensure_heartbeats(Heartbeats) ->

    Fields = re:split(Heartbeats, ",", [{return, list}]),
    [CX, CY] = lists:map(fun erlang:list_to_integer/1, Fields),

    SendTimeout    = millis_to_seconds(CY),
    ReceiveTimeout = millis_to_seconds(CX),

    _ = rabbit_stomp_reader:start_heartbeats(self(),
                                             {SendTimeout, ReceiveTimeout}),
    {SendTimeout * 1000, ReceiveTimeout * 1000}.

%% Converts milliseconds to whole seconds: values of 1000 or more are
%% truncated, positive values below 1000 become 1, everything else is 0.
millis_to_seconds(M) when M >= 1000 -> M div 1000;
millis_to_seconds(M) when M > 0     -> 1;
millis_to_seconds(_M)               -> 0.
%%----------------------------------------------------------------------------
%% Queue Setup
%%----------------------------------------------------------------------------

%% Prepares the endpoint (source or destination) that a frame refers to by
%% delegating to rabbit_routing_util:ensure_endpoint/5.
%%
%%   Direction - `source` or any other direction understood by
%%               rabbit_routing_util:ensure_endpoint/5
%%   EndPoint  - parsed endpoint; a queue endpoint with an empty name is
%%               rejected with {error, {invalid_destination, _}}
%%   Frame     - the #stomp_frame{} being processed; its headers drive the
%%               parameters and arguments handed to the routing utility
%%   Channel   - AMQP channel passed through to the routing utility
%%   State     - routing state passed through to the routing utility
%%
%% The frame is matched with a record pattern rather than a positional
%% tuple so that the clauses keep matching should #stomp_frame{} ever gain
%% or reorder fields.
ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
    {error, {invalid_destination, "Destination cannot be blank"}};

ensure_endpoint(source, EndPoint, #stomp_frame{headers = Headers} = Frame,
                Channel, State) ->
    %% For sources, additionally supply a generator that names the
    %% subscription queue from the routing key and the subscription id.
    NameGen =
        fun () ->
                Id = build_subscription_id(Frame),
                %% Note: we discard the exchange here so there's no need
                %% to use the default_topic_exchange configuration key
                {_, Name} = rabbit_routing_util:parse_routing(EndPoint),
                list_to_binary(
                  rabbit_stomp_util:subscription_queue_name(Name, Id, Frame))
        end,
    Params = [{subscription_queue_name_gen, NameGen}] ++
        rabbit_stomp_util:build_params(EndPoint, Headers),
    Arguments = rabbit_stomp_util:build_arguments(Headers),
    rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
                                        [Arguments | Params], State);

ensure_endpoint(Direction, EndPoint, #stomp_frame{headers = Headers},
                Channel, State) ->
    Params = rabbit_stomp_util:build_params(EndPoint, Headers),
    Arguments = rabbit_stomp_util:build_arguments(Headers),
    rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
                                        [Arguments | Params], State).

%% Returns the id used to name a subscription queue: the frame's id header
%% when the frame has the durable header (the id header must then be
%% present, otherwise this crashes with badmatch), or a freshly generated
%% GUID otherwise.
build_subscription_id(Frame) ->
    case rabbit_stomp_util:has_durable_header(Frame) of
        true ->
            {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
            Id;
        false ->
            rabbit_guid:gen_secure()
    end.

%%----------------------------------------------------------------------------
%% Success/error handling
%%----------------------------------------------------------------------------

%% Success result with no reply frame.
ok(State) ->
    {ok, none, State}.

%% Success result carrying a reply frame built from Command, Headers and
%% BodyFragments.
ok(Command, Headers, BodyFragments, State) ->
    {ok, #stomp_frame{command     = Command,
                      headers     = Headers,
                      body_iolist = BodyFragments}, State}.
%% Handles a fatal AMQP error: logs it, sends an ERROR frame named after
%% the error, closes the connection and returns {stop, normal, State1}.
%% The first argument is either the atom `access_refused` or a reply code
%% that amqp_connection:error_atom/1 turns into an error name.
amqp_death(access_refused = ErrorName, Explanation, State) ->
    ErrorDesc = rabbit_misc:format("~s", [Explanation]),
    log_error(ErrorName, ErrorDesc, none),
    Notified = send_error(atom_to_list(ErrorName), ErrorDesc, State),
    {stop, normal, close_connection(Notified)};
amqp_death(ReplyCode, Explanation, State) ->
    ErrorName = amqp_connection:error_atom(ReplyCode),
    ErrorDesc = rabbit_misc:format("~s", [Explanation]),
    log_error(ErrorName, ErrorDesc, none),
    Notified = send_error(atom_to_list(ErrorName), ErrorDesc, State),
    {stop, normal, close_connection(Notified)}.

%% Logs the error (with no server-private detail) and returns
%% {error, Message, Detail, State}.
error(Message, Detail, State) ->
    priv_error(Message, Detail, none, State).

%% Same as error/3, with the detail built from Format and Args.
error(Message, Format, Args, State) ->
    error(Message, rabbit_misc:format(Format, Args), State).

%% Logs the error together with ServerPrivateDetail, which only goes to
%% the log and is not part of the returned tuple.
priv_error(Message, Detail, ServerPrivateDetail, State) ->
    log_error(Message, Detail, ServerPrivateDetail),
    {error, Message, Detail, State}.

%% Same as priv_error/4, with the detail built from Format and Args.
priv_error(Message, Format, Args, ServerPrivateDetail, State) ->
    Detail = rabbit_misc:format(Format, Args),
    priv_error(Message, Detail, ServerPrivateDetail, State).

%% Writes the three parts of an error to the error log.
log_error(Message, Detail, ServerPrivateDetail) ->
    LogArgs = [Message, Detail, ServerPrivateDetail],
    rabbit_log:error("STOMP error frame sent:~n"
                     "Message: ~p~n"
                     "Detail: ~p~n"
                     "Server private detail: ~p",
                     LogArgs).

%%----------------------------------------------------------------------------
%% Frame sending utilities
%%----------------------------------------------------------------------------

%% Builds a frame from its parts and sends it; returns State.
send_frame(Command, Headers, BodyFragments, State) ->
    Frame = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    send_frame(Frame, State).

%% Serializes Frame (honouring the trailing_lf setting) and passes it to
%% the state's send_fun in `async` mode; returns State unchanged.
send_frame(Frame, State = #proc_state{send_fun    = SendFun,
                                      trailing_lf = TrailingLF}) ->
    Serialized = rabbit_stomp_frame:serialize(Frame, TrailingLF),
    SendFun(async, Serialized),
    State.
%% Sends an ERROR frame whose body is built from Format and Args.
send_error_frame(Message, ExtraHeaders, Format, Args, State) ->
    Detail = rabbit_misc:format(Format, Args),
    send_error_frame(Message, ExtraHeaders, Detail, State).

%% Sends an ERROR frame with Detail as body. The frame always carries the
%% message, a text/plain content type and the comma-separated list of
%% supported versions, followed by ExtraHeaders. Returns the state.
send_error_frame(Message, ExtraHeaders, Detail, State) ->
    Versions = string:join(?SUPPORTED_VERSIONS, ","),
    BaseHeaders = [{"message", Message},
                   {"content-type", "text/plain"},
                   {"version", Versions}],
    send_frame("ERROR", BaseHeaders ++ ExtraHeaders, Detail, State).

%% Sends an ERROR frame without extra headers.
send_error(Message, Detail, State) ->
    send_error_frame(Message, [], Detail, State).

%% Same as send_error/3, with the detail built from Format and Args.
send_error(Message, Format, Args, State) ->
    Detail = rabbit_misc:format(Format, Args),
    send_error(Message, Detail, State).

%% Looks Key up in the additional_info property list of the adapter info
%% held in the state.
additional_info(Key, #proc_state{adapter_info = AdapterInfo}) ->
    #amqp_adapter_info{additional_info = AddInfo} = AdapterInfo,
    proplists:get_value(Key, AddInfo).

%% Parses Destination into {Exchange, RoutingKey}, substituting the
%% configured default topic exchange where applicable.
parse_routing(Destination, DefaultTopicExchange) ->
    {ParsedExchange, RoutingKey} =
        rabbit_routing_util:parse_routing(Destination),
    {maybe_apply_default_topic_exchange(ParsedExchange, DefaultTopicExchange),
     RoutingKey}.

%% Chooses the exchange to use: Exchange is a string, DefaultTopicExchange
%% a binary.
maybe_apply_default_topic_exchange(Exchange, DefaultTopicExchange) ->
    case {Exchange, DefaultTopicExchange} of
        {"amq.topic", <<"amq.topic">>} ->
            %% The destination is amq.topic and so is the configured
            %% default: nothing to substitute.
            Exchange;
        {"amq.topic", _} ->
            %% The destination would have been amq.topic but a different
            %% default has been configured: use that one.
            binary_to_list(DefaultTopicExchange);
        _ ->
            %% Any other exchange must have been specified explicitly in
            %% the message headers: keep it.
            Exchange
    end.