1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_reader). 9 10%% Transitional step until we can require Erlang/OTP 21 and 11%% use the now recommended try/catch syntax for obtaining the stack trace. 12-compile(nowarn_deprecated_function). 13 14%% This is an AMQP 0-9-1 connection implementation. If AMQP 1.0 plugin is enabled, 15%% this module passes control of incoming AMQP 1.0 connections to it. 16%% 17%% Every connection (as in, a process using this module) 18%% is a controlling process for a server socket. 19%% 20%% Connections have a number of responsibilities: 21%% 22%% * Performing protocol handshake 23%% * Parsing incoming data and dispatching protocol methods 24%% * Authenticating clients (with the help of authentication backends) 25%% * Enforcing TCP backpressure (throttling clients) 26%% * Enforcing connection limits, e.g. channel_max 27%% * Channel management 28%% * Setting up heartbeater and alarm notifications 29%% * Emitting connection and network activity metric events 30%% * Gracefully handling client disconnects, channel termination, etc 31%% 32%% and a few more. 33%% 34%% Every connection has 35%% 36%% * a queue collector which is responsible for keeping 37%% track of exclusive queues on the connection and their cleanup. 38%% * a heartbeater that's responsible for sending heartbeat frames to clients, 39%% keeping track of the incoming ones and notifying connection about 40%% heartbeat timeouts 41%% * Stats timer, a timer that is used to periodically emit metric events 42%% 43%% Some dependencies are started under a separate supervisor to avoid deadlocks 44%% during system shutdown. See rabbit_channel_sup:start_link/0 for details. 45%% 46%% Reader processes are special processes (in the OTP sense). 47 48-include_lib("rabbit_common/include/rabbit_framing.hrl"). 49-include_lib("rabbit_common/include/rabbit.hrl"). 50 51-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2, 52 shutdown/2]). 53 54-export([system_continue/3, system_terminate/4, system_code_change/4]). 55 56-export([init/3, mainloop/4, recvloop/4]). 57 58-export([conserve_resources/3, server_properties/1]). 59 60-define(NORMAL_TIMEOUT, 3). 61-define(CLOSING_TIMEOUT, 30). 62-define(CHANNEL_TERMINATION_TIMEOUT, 3). 63%% we wait for this many seconds before closing TCP connection 64%% with a client that failed to log in. Provides some relief 65%% from connection storms and DoS. 66-define(SILENT_CLOSE_DELAY, 3). 67-define(CHANNEL_MIN, 1). 68 69%%-------------------------------------------------------------------------- 70 71-record(v1, { 72 %% parent process 73 parent, 74 %% socket 75 sock, 76 %% connection state, see connection record 77 connection, 78 callback, 79 recv_len, 80 pending_recv, 81 %% pre_init | securing | running | blocking | blocked | closing | closed | {become, F} 82 connection_state, 83 %% see comment in rabbit_connection_sup:start_link/0 84 helper_sup, 85 %% takes care of cleaning up exclusive queues, 86 %% see rabbit_queue_collector 87 queue_collector, 88 %% sends and receives heartbeat frames, 89 %% see rabbit_heartbeat 90 heartbeater, 91 %% timer used to emit statistics 92 stats_timer, 93 %% channel supervisor 94 channel_sup_sup_pid, 95 %% how many channels this connection has 96 channel_count, 97 %% throttling state, for both 98 %% credit- and resource-driven flow control 99 throttle, 100 proxy_socket}). 101 102-record(throttle, { 103 %% never | timestamp() 104 last_blocked_at, 105 %% a set of the reasons why we are 106 %% blocked: {resource, memory}, {resource, disk}. 107 %% More reasons can be added in the future. 108 blocked_by, 109 %% true if received any publishes, false otherwise 110 %% note that this will also be true when connection is 111 %% already blocked 112 should_block, 113 %% true if we had we sent a connection.blocked, 114 %% false otherwise 115 connection_blocked_message_sent 116}). 117 118-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, 119 send_pend, state, channels, reductions, 120 garbage_collection]). 121 122-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). 123-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, 124 garbage_collection]). 125 126-define(CREATION_EVENT_KEYS, 127 [pid, name, port, peer_port, host, 128 peer_host, ssl, peer_cert_subject, peer_cert_issuer, 129 peer_cert_validity, auth_mechanism, ssl_protocol, 130 ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, 131 timeout, frame_max, channel_max, client_properties, connected_at, 132 node, user_who_performed_action]). 133 134-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). 135 136-define(AUTH_NOTIFICATION_INFO_KEYS, 137 [host, name, peer_host, peer_port, protocol, auth_mechanism, 138 ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, 139 peer_cert_validity]). 140 141-define(IS_RUNNING(State), 142 (State#v1.connection_state =:= running orelse 143 State#v1.connection_state =:= blocked)). 144 145-define(IS_STOPPING(State), 146 (State#v1.connection_state =:= closing orelse 147 State#v1.connection_state =:= closed)). 148 149%%-------------------------------------------------------------------------- 150 151-type resource_alert() :: {WasAlarmSetForNode :: boolean(), 152 IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(), 153 NodeForWhichAlarmWasSetOrCleared :: node()}. 154 155%%-------------------------------------------------------------------------- 156 157-spec start_link(pid(), any()) -> rabbit_types:ok(pid()). 158 159start_link(HelperSup, Ref) -> 160 Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]), 161 162 {ok, Pid}. 163 164-spec shutdown(pid(), string()) -> 'ok'. 165 166shutdown(Pid, Explanation) -> 167 gen_server:call(Pid, {shutdown, Explanation}, infinity). 168 169-spec init(pid(), pid(), any()) -> no_return(). 170 171init(Parent, HelperSup, Ref) -> 172 ?LG_PROCESS_TYPE(reader), 173 {ok, Sock} = rabbit_networking:handshake(Ref, 174 application:get_env(rabbit, proxy_protocol, false)), 175 Deb = sys:debug_options([]), 176 start_connection(Parent, HelperSup, Deb, Sock). 177 178-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any(). 179 180system_continue(Parent, Deb, {Buf, BufLen, State}) -> 181 mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). 182 183-spec system_terminate(_,_,_,_) -> no_return(). 184 185system_terminate(Reason, _Parent, _Deb, _State) -> 186 exit(Reason). 187 188-spec system_code_change(_,_,_,_) -> {'ok',_}. 189 190system_code_change(Misc, _Module, _OldVsn, _Extra) -> 191 {ok, Misc}. 192 193-spec info_keys() -> rabbit_types:info_keys(). 194 195info_keys() -> ?INFO_KEYS. 196 197-spec info(pid()) -> rabbit_types:infos(). 198 199info(Pid) -> 200 gen_server:call(Pid, info, infinity). 201 202-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). 203 204info(Pid, Items) -> 205 case gen_server:call(Pid, {info, Items}, infinity) of 206 {ok, Res} -> Res; 207 {error, Error} -> throw(Error) 208 end. 209 210-spec force_event_refresh(pid(), reference()) -> 'ok'. 211 212% Note: https://www.pivotaltracker.com/story/show/166962656 213% This event is necessary for the stats timer to be initialized with 214% the correct values once the management agent has started 215force_event_refresh(Pid, Ref) -> 216 gen_server:cast(Pid, {force_event_refresh, Ref}). 217 218-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'. 219 220conserve_resources(Pid, Source, {_, Conserve, _}) -> 221 Pid ! {conserve_resources, Source, Conserve}, 222 ok. 223 224-spec server_properties(rabbit_types:protocol()) -> 225 rabbit_framing:amqp_table(). 226 227server_properties(Protocol) -> 228 {ok, Product} = application:get_key(rabbit, description), 229 {ok, Version} = application:get_key(rabbit, vsn), 230 231 %% Get any configuration-specified server properties 232 {ok, RawConfigServerProps} = application:get_env(rabbit, 233 server_properties), 234 235 %% Normalize the simplified (2-tuple) and unsimplified (3-tuple) forms 236 %% from the config and merge them with the generated built-in properties 237 NormalizedConfigServerProps = 238 [{<<"capabilities">>, table, server_capabilities(Protocol)} | 239 [case X of 240 {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), 241 longstr, 242 maybe_list_to_binary(Value)}; 243 {BinKey, Type, Value} -> {BinKey, Type, Value} 244 end || X <- RawConfigServerProps ++ 245 [{product, Product}, 246 {version, Version}, 247 {cluster_name, rabbit_nodes:cluster_name()}, 248 {platform, rabbit_misc:platform_and_version()}, 249 {copyright, ?COPYRIGHT_MESSAGE}, 250 {information, ?INFORMATION_MESSAGE}]]], 251 252 %% Filter duplicated properties in favour of config file provided values 253 lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, 254 NormalizedConfigServerProps). 255 256maybe_list_to_binary(V) when is_binary(V) -> V; 257maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). 258 259server_capabilities(rabbit_framing_amqp_0_9_1) -> 260 [{<<"publisher_confirms">>, bool, true}, 261 {<<"exchange_exchange_bindings">>, bool, true}, 262 {<<"basic.nack">>, bool, true}, 263 {<<"consumer_cancel_notify">>, bool, true}, 264 {<<"connection.blocked">>, bool, true}, 265 {<<"consumer_priorities">>, bool, true}, 266 {<<"authentication_failure_close">>, bool, true}, 267 {<<"per_consumer_qos">>, bool, true}, 268 {<<"direct_reply_to">>, bool, true}]; 269server_capabilities(_) -> 270 []. 271 272%%-------------------------------------------------------------------------- 273 274socket_error(Reason) when is_atom(Reason) -> 275 rabbit_log_connection:error("Error on AMQP connection ~p: ~s", 276 [self(), rabbit_misc:format_inet_error(Reason)]); 277socket_error(Reason) -> 278 Fmt = "Error on AMQP connection ~p:~n~p", 279 Args = [self(), Reason], 280 case Reason of 281 %% The socket was closed while upgrading to SSL. 282 %% This is presumably a TCP healthcheck, so don't log 283 %% it unless specified otherwise. 284 {ssl_upgrade_error, closed} -> 285 rabbit_log_connection:debug(Fmt, Args); 286 _ -> 287 rabbit_log_connection:error(Fmt, Args) 288 end. 289 290inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). 291 292socket_op(Sock, Fun) -> 293 RealSocket = rabbit_net:unwrap_socket(Sock), 294 case Fun(Sock) of 295 {ok, Res} -> Res; 296 {error, Reason} -> socket_error(Reason), 297 rabbit_net:fast_close(RealSocket), 298 exit(normal) 299 end. 300 301-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) -> 302 no_return(). 303 304start_connection(Parent, HelperSup, Deb, Sock) -> 305 process_flag(trap_exit, true), 306 RealSocket = rabbit_net:unwrap_socket(Sock), 307 Name = case rabbit_net:connection_string(Sock, inbound) of 308 {ok, Str} -> list_to_binary(Str); 309 {error, enotconn} -> rabbit_net:fast_close(RealSocket), 310 exit(normal); 311 {error, Reason} -> socket_error(Reason), 312 rabbit_net:fast_close(RealSocket), 313 exit(normal) 314 end, 315 {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), 316 InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE), 317 erlang:send_after(HandshakeTimeout, self(), handshake_timeout), 318 {PeerHost, PeerPort, Host, Port} = 319 socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), 320 ?store_proc_name(Name), 321 State = #v1{parent = Parent, 322 sock = RealSocket, 323 connection = #connection{ 324 name = Name, 325 log_name = Name, 326 host = Host, 327 peer_host = PeerHost, 328 port = Port, 329 peer_port = PeerPort, 330 protocol = none, 331 user = none, 332 timeout_sec = (HandshakeTimeout / 1000), 333 frame_max = InitialFrameMax, 334 vhost = none, 335 client_properties = none, 336 capabilities = [], 337 auth_mechanism = none, 338 auth_state = none, 339 connected_at = os:system_time( 340 milli_seconds)}, 341 callback = uninitialized_callback, 342 recv_len = 0, 343 pending_recv = false, 344 connection_state = pre_init, 345 queue_collector = undefined, %% started on tune-ok 346 helper_sup = HelperSup, 347 heartbeater = none, 348 channel_sup_sup_pid = none, 349 channel_count = 0, 350 throttle = #throttle{ 351 last_blocked_at = never, 352 should_block = false, 353 blocked_by = sets:new(), 354 connection_blocked_message_sent = false 355 }, 356 proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)}, 357 try 358 case run({?MODULE, recvloop, 359 [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( 360 State, #v1.stats_timer), 361 handshake, 8)]}) of 362 %% connection was closed cleanly by the client 363 #v1{connection = #connection{user = #user{username = Username}, 364 vhost = VHost}} -> 365 rabbit_log_connection:info("closing AMQP connection ~p (~s, vhost: '~s', user: '~s')", 366 [self(), dynamic_connection_name(Name), VHost, Username]); 367 %% just to be more defensive 368 _ -> 369 rabbit_log_connection:info("closing AMQP connection ~p (~s)", 370 [self(), dynamic_connection_name(Name)]) 371 end 372 catch 373 Ex -> 374 log_connection_exception(dynamic_connection_name(Name), Ex) 375 after 376 %% We don't call gen_tcp:close/1 here since it waits for 377 %% pending output to be sent, which results in unnecessary 378 %% delays. We could just terminate - the reader is the 379 %% controlling process and hence its termination will close 380 %% the socket. However, to keep the file_handle_cache 381 %% accounting as accurate as possible we ought to close the 382 %% socket w/o delay before termination. 383 rabbit_net:fast_close(RealSocket), 384 rabbit_networking:unregister_connection(self()), 385 rabbit_core_metrics:connection_closed(self()), 386 ClientProperties = case get(client_properties) of 387 undefined -> 388 []; 389 Properties -> 390 Properties 391 end, 392 EventProperties = [{name, Name}, 393 {pid, self()}, 394 {node, node()}, 395 {client_properties, ClientProperties}], 396 EventProperties1 = case get(connection_user_provided_name) of 397 undefined -> 398 EventProperties; 399 ConnectionUserProvidedName -> 400 [{user_provided_name, ConnectionUserProvidedName} | EventProperties] 401 end, 402 rabbit_event:notify(connection_closed, EventProperties1) 403 end, 404 done. 405 406log_connection_exception(Name, Ex) -> 407 Severity = case Ex of 408 connection_closed_with_no_data_received -> debug; 409 {connection_closed_abruptly, _} -> warning; 410 connection_closed_abruptly -> warning; 411 _ -> error 412 end, 413 log_connection_exception(Severity, Name, Ex). 414 415log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) -> 416 %% Long line to avoid extra spaces and line breaks in log 417 log_connection_exception_with_severity(Severity, 418 "closing AMQP connection ~p (~s):~n" 419 "missed heartbeats from client, timeout: ~ps", 420 [self(), Name, TimeoutSec]); 421log_connection_exception(Severity, Name, {connection_closed_abruptly, 422 #v1{connection = #connection{user = #user{username = Username}, 423 vhost = VHost}}}) -> 424 log_connection_exception_with_severity(Severity, 425 "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection", 426 [self(), Name, VHost, Username]); 427%% when client abruptly closes connection before connection.open/authentication/authorization 428%% succeeded, don't log username and vhost as 'none' 429log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) -> 430 log_connection_exception_with_severity(Severity, 431 "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection", 432 [self(), Name]); 433%% failed connection.tune negotiations 434log_connection_exception(Severity, Name, {handshake_error, tuning, _Channel, 435 {exit, #amqp_error{explanation = Explanation}, 436 _Method, _Stacktrace}}) -> 437 log_connection_exception_with_severity(Severity, 438 "closing AMQP connection ~p (~s):~nfailed to negotiate connection parameters: ~s", 439 [self(), Name, Explanation]); 440%% old exception structure 441log_connection_exception(Severity, Name, connection_closed_abruptly) -> 442 log_connection_exception_with_severity(Severity, 443 "closing AMQP connection ~p (~s):~n" 444 "client unexpectedly closed TCP connection", 445 [self(), Name]); 446log_connection_exception(Severity, Name, Ex) -> 447 log_connection_exception_with_severity(Severity, 448 "closing AMQP connection ~p (~s):~n~p", 449 [self(), Name, Ex]). 450 451log_connection_exception_with_severity(Severity, Fmt, Args) -> 452 case Severity of 453 debug -> rabbit_log_connection:debug(Fmt, Args); 454 warning -> rabbit_log_connection:warning(Fmt, Args); 455 error -> rabbit_log_connection:error(Fmt, Args) 456 end. 457 458run({M, F, A}) -> 459 try apply(M, F, A) 460 catch {become, MFA} -> run(MFA) 461 end. 462 463recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> 464 mainloop(Deb, Buf, BufLen, State); 465recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> 466 mainloop(Deb, Buf, BufLen, State); 467recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> 468 throw({become, F(Deb, Buf, BufLen, State)}); 469recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) 470 when BufLen < RecvLen -> 471 case rabbit_net:setopts(Sock, [{active, once}]) of 472 ok -> mainloop(Deb, Buf, BufLen, 473 State#v1{pending_recv = true}); 474 {error, Reason} -> stop(Reason, State) 475 end; 476recvloop(Deb, [B], _BufLen, State) -> 477 {Rest, State1} = handle_input(State#v1.callback, B, State), 478 recvloop(Deb, [Rest], size(Rest), State1); 479recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> 480 {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), 481 Data = list_to_binary(lists:reverse(DataLRev)), 482 {<<>>, State1} = handle_input(State#v1.callback, Data, State), 483 recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). 484 485binlist_split(0, L, Acc) -> 486 {L, Acc}; 487binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> 488 {H, T} = split_binary(Acc0, -Len), 489 {[H|L], [T|Acc]}; 490binlist_split(Len, [H|T], Acc) -> 491 binlist_split(Len - size(H), T, [H|Acc]). 492 493-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any(). 494 495mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, 496 connection_state = CS, 497 connection = #connection{ 498 name = ConnName}}) -> 499 Recv = rabbit_net:recv(Sock), 500 case CS of 501 pre_init when Buf =:= [] -> 502 %% We only log incoming connections when either the 503 %% first byte was received or there was an error (eg. a 504 %% timeout). 505 %% 506 %% The goal is to not log TCP healthchecks (a connection 507 %% with no data received) unless specified otherwise. 508 Fmt = "accepting AMQP connection ~p (~s)", 509 Args = [self(), ConnName], 510 case Recv of 511 closed -> rabbit_log_connection:debug(Fmt, Args); 512 _ -> rabbit_log_connection:info(Fmt, Args) 513 end; 514 _ -> 515 ok 516 end, 517 case Recv of 518 {data, Data} -> 519 recvloop(Deb, [Data | Buf], BufLen + size(Data), 520 State#v1{pending_recv = false}); 521 closed when State#v1.connection_state =:= closed -> 522 State; 523 closed when CS =:= pre_init andalso Buf =:= [] -> 524 stop(tcp_healthcheck, State); 525 closed -> 526 stop(closed, State); 527 {other, {heartbeat_send_error, Reason}} -> 528 %% The only portable way to detect disconnect on blocked 529 %% connection is to wait for heartbeat send failure. 530 stop(Reason, State); 531 {error, Reason} -> 532 stop(Reason, State); 533 {other, {system, From, Request}} -> 534 sys:handle_system_msg(Request, From, State#v1.parent, 535 ?MODULE, Deb, {Buf, BufLen, State}); 536 {other, Other} -> 537 case handle_other(Other, State) of 538 stop -> State; 539 NewState -> recvloop(Deb, Buf, BufLen, NewState) 540 end 541 end. 542 543-spec stop(_, #v1{}) -> no_return(). 544stop(tcp_healthcheck, State) -> 545 %% The connection was closed before any packet was received. It's 546 %% probably a load-balancer healthcheck: don't consider this a 547 %% failure. 548 maybe_emit_stats(State), 549 throw(connection_closed_with_no_data_received); 550stop(closed, State) -> 551 maybe_emit_stats(State), 552 throw({connection_closed_abruptly, State}); 553stop(Reason, State) -> 554 maybe_emit_stats(State), 555 throw({inet_error, Reason}). 556 557handle_other({conserve_resources, Source, Conserve}, 558 State = #v1{throttle = Throttle = #throttle{blocked_by = Blockers}}) -> 559 Resource = {resource, Source}, 560 Blockers1 = case Conserve of 561 true -> sets:add_element(Resource, Blockers); 562 false -> sets:del_element(Resource, Blockers) 563 end, 564 control_throttle(State#v1{throttle = Throttle#throttle{blocked_by = Blockers1}}); 565handle_other({channel_closing, ChPid}, State) -> 566 ok = rabbit_channel:ready_for_close(ChPid), 567 {_, State1} = channel_cleanup(ChPid, State), 568 maybe_close(control_throttle(State1)); 569handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) -> 570 %% rabbitmq/rabbitmq-server#544 571 %% The connection port process has exited due to the TCP socket being closed. 572 %% Handle this case in the same manner as receiving {error, closed} 573 stop(closed, State); 574handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> 575 Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]), 576 terminate(Msg, State), 577 %% this is what we are expected to do according to 578 %% https://www.erlang.org/doc/man/sys.html 579 %% 580 %% If we wanted to be *really* nice we should wait for a while for 581 %% clients to close the socket at their end, just as we do in the 582 %% ordinary error case. However, since this termination is 583 %% initiated by our parent it is probably more important to exit 584 %% quickly. 585 maybe_emit_stats(State), 586 exit(Reason); 587handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) -> 588 maybe_emit_stats(State), 589 throw(E); 590handle_other({channel_exit, Channel, Reason}, State) -> 591 handle_exception(State, Channel, Reason); 592handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> 593 handle_dependent_exit(ChPid, Reason, State); 594handle_other(terminate_connection, State) -> 595 maybe_emit_stats(State), 596 stop; 597handle_other(handshake_timeout, State) 598 when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> 599 State; 600handle_other(handshake_timeout, State) -> 601 maybe_emit_stats(State), 602 throw({handshake_timeout, State#v1.callback}); 603handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> 604 State; 605handle_other(heartbeat_timeout, 606 State = #v1{connection = #connection{timeout_sec = T}}) -> 607 maybe_emit_stats(State), 608 throw({heartbeat_timeout, T}); 609handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> 610 {ForceTermination, NewState} = terminate(Explanation, State), 611 gen_server:reply(From, ok), 612 case ForceTermination of 613 force -> stop; 614 normal -> NewState 615 end; 616handle_other({'$gen_call', From, info}, State) -> 617 gen_server:reply(From, infos(?INFO_KEYS, State)), 618 State; 619handle_other({'$gen_call', From, {info, Items}}, State) -> 620 gen_server:reply(From, try {ok, infos(Items, State)} 621 catch Error -> {error, Error} 622 end), 623 State; 624handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) 625 when ?IS_RUNNING(State) -> 626 rabbit_event:notify( 627 connection_created, 628 augment_infos_with_user_provided_connection_name( 629 [{type, network} | infos(?CREATION_EVENT_KEYS, State)], State), 630 Ref), 631 rabbit_event:init_stats_timer(State, #v1.stats_timer); 632handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> 633 %% Ignore, we will emit a created event once we start running. 634 State; 635handle_other(ensure_stats, State) -> 636 ensure_stats_timer(State); 637handle_other(emit_stats, State) -> 638 emit_stats(State); 639handle_other({bump_credit, Msg}, State) -> 640 %% Here we are receiving credit by some channel process. 641 credit_flow:handle_bump_msg(Msg), 642 control_throttle(State); 643handle_other(Other, State) -> 644 %% internal error -> something worth dying for 645 maybe_emit_stats(State), 646 exit({unexpected_message, Other}). 647 648switch_callback(State, Callback, Length) -> 649 State#v1{callback = Callback, recv_len = Length}. 650 651terminate(Explanation, State) when ?IS_RUNNING(State) -> 652 {normal, handle_exception(State, 0, 653 rabbit_misc:amqp_error( 654 connection_forced, "~s", [Explanation], none))}; 655terminate(_Explanation, State) -> 656 {force, State}. 657 658send_blocked(#v1{connection = #connection{protocol = Protocol, 659 capabilities = Capabilities}, 660 sock = Sock}, Reason) -> 661 case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of 662 {bool, true} -> 663 664 ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, 665 Protocol); 666 _ -> 667 ok 668 end. 669 670send_unblocked(#v1{connection = #connection{protocol = Protocol, 671 capabilities = Capabilities}, 672 sock = Sock}) -> 673 case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of 674 {bool, true} -> 675 ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); 676 _ -> 677 ok 678 end. 679 680%%-------------------------------------------------------------------------- 681%% error handling / termination 682 683close_connection(State = #v1{queue_collector = Collector, 684 connection = #connection{ 685 timeout_sec = TimeoutSec}}) -> 686 %% The spec says "Exclusive queues may only be accessed by the 687 %% current connection, and are deleted when that connection 688 %% closes." This does not strictly imply synchrony, but in 689 %% practice it seems to be what people assume. 690 clean_up_exclusive_queues(Collector), 691 %% We terminate the connection after the specified interval, but 692 %% no later than ?CLOSING_TIMEOUT seconds. 693 erlang:send_after((if TimeoutSec > 0 andalso 694 TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; 695 true -> ?CLOSING_TIMEOUT 696 end) * 1000, self(), terminate_connection), 697 State#v1{connection_state = closed}. 698 699%% queue collector will be undefined when connection 700%% tuning was never performed or didn't finish. In such cases 701%% there's also nothing to clean up. 702clean_up_exclusive_queues(undefined) -> 703 ok; 704 705clean_up_exclusive_queues(Collector) -> 706 rabbit_queue_collector:delete_all(Collector). 707 708handle_dependent_exit(ChPid, Reason, State) -> 709 {Channel, State1} = channel_cleanup(ChPid, State), 710 case {Channel, termination_kind(Reason)} of 711 {undefined, controlled} -> State1; 712 {undefined, uncontrolled} -> handle_uncontrolled_channel_close(ChPid), 713 exit({abnormal_dependent_exit, 714 ChPid, Reason}); 715 {_, controlled} -> maybe_close(control_throttle(State1)); 716 {_, uncontrolled} -> handle_uncontrolled_channel_close(ChPid), 717 State2 = handle_exception( 718 State1, Channel, Reason), 719 maybe_close(control_throttle(State2)) 720 end. 721 722terminate_channels(#v1{channel_count = 0} = State) -> 723 State; 724terminate_channels(#v1{channel_count = ChannelCount} = State) -> 725 lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), 726 Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount, 727 TimerRef = erlang:send_after(Timeout, self(), cancel_wait), 728 wait_for_channel_termination(ChannelCount, TimerRef, State). 729 730wait_for_channel_termination(0, TimerRef, State) -> 731 case erlang:cancel_timer(TimerRef) of 732 false -> receive 733 cancel_wait -> State 734 end; 735 _ -> State 736 end; 737wait_for_channel_termination(N, TimerRef, 738 State = #v1{connection_state = CS, 739 connection = #connection{ 740 log_name = ConnName, 741 user = User, 742 vhost = VHost}, 743 sock = Sock}) -> 744 receive 745 {'DOWN', _MRef, process, ChPid, Reason} -> 746 {Channel, State1} = channel_cleanup(ChPid, State), 747 case {Channel, termination_kind(Reason)} of 748 {undefined, _} -> 749 exit({abnormal_dependent_exit, ChPid, Reason}); 750 {_, controlled} -> 751 wait_for_channel_termination(N-1, TimerRef, State1); 752 {_, uncontrolled} -> 753 rabbit_log_connection:error( 754 "Error on AMQP connection ~p (~s, vhost: '~s'," 755 " user: '~s', state: ~p), channel ~p:" 756 "error while terminating:~n~p", 757 [self(), ConnName, VHost, User#user.username, 758 CS, Channel, Reason]), 759 handle_uncontrolled_channel_close(ChPid), 760 wait_for_channel_termination(N-1, TimerRef, State1) 761 end; 762 {'EXIT', Sock, _Reason} -> 763 clean_up_all_channels(State), 764 exit(normal); 765 cancel_wait -> 766 exit(channel_termination_timeout) 767 end. 768 769maybe_close(State = #v1{connection_state = closing, 770 channel_count = 0, 771 connection = #connection{protocol = Protocol}, 772 sock = Sock}) -> 773 NewState = close_connection(State), 774 ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), 775 NewState; 776maybe_close(State) -> 777 State. 778 779termination_kind(normal) -> controlled; 780termination_kind(_) -> uncontrolled. 781 782format_hard_error(#amqp_error{name = N, explanation = E, method = M}) -> 783 io_lib:format("operation ~s caused a connection exception ~s: ~p", [M, N, E]); 784format_hard_error(Reason) -> 785 case io_lib:deep_char_list(Reason) of 786 true -> Reason; 787 false -> rabbit_misc:format("~p", [Reason]) 788 end. 789 790log_hard_error(#v1{connection_state = CS, 791 connection = #connection{ 792 log_name = ConnName, 793 user = User, 794 vhost = VHost}}, Channel, Reason) -> 795 rabbit_log_connection:error( 796 "Error on AMQP connection ~p (~s, vhost: '~s'," 797 " user: '~s', state: ~p), channel ~p:~n ~s", 798 [self(), ConnName, VHost, User#user.username, CS, Channel, format_hard_error(Reason)]). 799 800handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> 801 log_hard_error(State, Channel, Reason), 802 State; 803handle_exception(State = #v1{connection = #connection{protocol = Protocol}, 804 connection_state = CS}, 805 Channel, Reason) 806 when ?IS_RUNNING(State) orelse CS =:= closing -> 807 respond_and_close(State, Channel, Protocol, Reason, Reason); 808%% authentication failure 809handle_exception(State = #v1{connection = #connection{protocol = Protocol, 810 log_name = ConnName, 811 capabilities = Capabilities}, 812 connection_state = starting}, 813 Channel, Reason = #amqp_error{name = access_refused, 814 explanation = ErrMsg}) -> 815 rabbit_log_connection:error( 816 "Error on AMQP connection ~p (~s, state: ~p):~n~s", 817 [self(), ConnName, starting, ErrMsg]), 818 %% respect authentication failure notification capability 819 case rabbit_misc:table_lookup(Capabilities, 820 <<"authentication_failure_close">>) of 821 {bool, true} -> 822 send_error_on_channel0_and_close(Channel, Protocol, Reason, State); 823 _ -> 824 close_connection(terminate_channels(State)) 825 end; 826%% when loopback-only user tries to connect from a non-local host 827%% when user tries to access a vhost it has no permissions for 828handle_exception(State = #v1{connection = #connection{protocol = Protocol, 829 log_name = ConnName, 830 user = User}, 831 connection_state = opening}, 832 Channel, Reason = #amqp_error{name = not_allowed, 833 explanation = ErrMsg}) -> 834 rabbit_log_connection:error( 835 "Error on AMQP connection ~p (~s, user: '~s', state: ~p):~n~s", 836 [self(), ConnName, User#user.username, opening, ErrMsg]), 837 send_error_on_channel0_and_close(Channel, Protocol, Reason, State); 838handle_exception(State = #v1{connection = #connection{protocol = Protocol}, 839 connection_state = CS = opening}, 840 Channel, Reason = #amqp_error{}) -> 841 respond_and_close(State, Channel, Protocol, Reason, 842 {handshake_error, CS, Reason}); 843%% when negotiation fails, e.g. due to channel_max being higher than the 844%% maximum allowed limit 845handle_exception(State = #v1{connection = #connection{protocol = Protocol, 846 log_name = ConnName, 847 user = User}, 848 connection_state = tuning}, 849 Channel, Reason = #amqp_error{name = not_allowed, 850 explanation = ErrMsg}) -> 851 rabbit_log_connection:error( 852 "Error on AMQP connection ~p (~s," 853 " user: '~s', state: ~p):~n~s", 854 [self(), ConnName, User#user.username, tuning, ErrMsg]), 855 send_error_on_channel0_and_close(Channel, Protocol, Reason, State); 856handle_exception(State, Channel, Reason) -> 857 %% We don't trust the client at this point - force them to wait 858 %% for a bit so they can't DOS us with repeated failed logins etc. 859 timer:sleep(?SILENT_CLOSE_DELAY * 1000), 860 throw({handshake_error, State#v1.connection_state, Channel, Reason}). 861 862%% we've "lost sync" with the client and hence must not accept any 863%% more input 864-spec fatal_frame_error(_, _, _, _, _) -> no_return(). 865fatal_frame_error(Error, Type, Channel, Payload, State) -> 866 frame_error(Error, Type, Channel, Payload, State), 867 %% grace period to allow transmission of error 868 timer:sleep(?SILENT_CLOSE_DELAY * 1000), 869 throw(fatal_frame_error). 870 871frame_error(Error, Type, Channel, Payload, State) -> 872 {Str, Bin} = payload_snippet(Payload), 873 handle_exception(State, Channel, 874 rabbit_misc:amqp_error(frame_error, 875 "type ~p, ~s octets = ~p: ~p", 876 [Type, Str, Bin, Error], none)). 877 878unexpected_frame(Type, Channel, Payload, State) -> 879 {Str, Bin} = payload_snippet(Payload), 880 handle_exception(State, Channel, 881 rabbit_misc:amqp_error(unexpected_frame, 882 "type ~p, ~s octets = ~p", 883 [Type, Str, Bin], none)). 884 885payload_snippet(Payload) when size(Payload) =< 16 -> 886 {"all", Payload}; 887payload_snippet(<<Snippet:16/binary, _/binary>>) -> 888 {"first 16", Snippet}. 889 890%%-------------------------------------------------------------------------- 891 892create_channel(_Channel, 893 #v1{channel_count = ChannelCount, 894 connection = #connection{channel_max = ChannelMax}}) 895 when ChannelMax /= 0 andalso ChannelCount >= ChannelMax -> 896 {error, rabbit_misc:amqp_error( 897 not_allowed, "number of channels opened (~w) has reached the " 898 "negotiated channel_max (~w)", 899 [ChannelCount, ChannelMax], 'none')}; 900create_channel(Channel, 901 #v1{sock = Sock, 902 queue_collector = Collector, 903 channel_sup_sup_pid = ChanSupSup, 904 channel_count = ChannelCount, 905 connection = 906 #connection{name = Name, 907 protocol = Protocol, 908 frame_max = FrameMax, 909 vhost = VHost, 910 capabilities = Capabilities, 911 user = #user{username = Username} = User} 912 } = State) -> 913 case rabbit_auth_backend_internal:is_over_channel_limit(Username) of 914 false -> 915 {ok, _ChSupPid, {ChPid, AState}} = 916 rabbit_channel_sup_sup:start_channel( 917 ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, 918 Protocol, User, VHost, Capabilities, 919 Collector}), 920 MRef = erlang:monitor(process, ChPid), 921 put({ch_pid, ChPid}, {Channel, MRef}), 922 put({channel, Channel}, {ChPid, AState}), 923 {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}; 924 {true, Limit} -> 925 {error, rabbit_misc:amqp_error(not_allowed, 926 "number of channels opened for user '~s' has reached " 927 "the maximum allowed user limit of (~w)", 928 [Username, Limit], 'none')} 929 end. 930 931channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> 932 case get({ch_pid, ChPid}) of 933 undefined -> {undefined, State}; 934 {Channel, MRef} -> credit_flow:peer_down(ChPid), 935 erase({channel, Channel}), 936 erase({ch_pid, ChPid}), 937 erlang:demonitor(MRef, [flush]), 938 {Channel, State#v1{channel_count = ChannelCount - 1}} 939 end. 940 941all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. 942 943clean_up_all_channels(State) -> 944 CleanupFun = fun(ChPid) -> 945 channel_cleanup(ChPid, State) 946 end, 947 lists:foreach(CleanupFun, all_channels()). 948 949%%-------------------------------------------------------------------------- 950 951handle_frame(Type, 0, Payload, 952 State = #v1{connection = #connection{protocol = Protocol}}) 953 when ?IS_STOPPING(State) -> 954 case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of 955 {method, MethodName, FieldsBin} -> 956 handle_method0(MethodName, FieldsBin, State); 957 _Other -> State 958 end; 959handle_frame(Type, 0, Payload, 960 State = #v1{connection = #connection{protocol = Protocol}}) -> 961 case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of 962 error -> frame_error(unknown_frame, Type, 0, Payload, State); 963 heartbeat -> State; 964 {method, MethodName, FieldsBin} -> 965 handle_method0(MethodName, FieldsBin, State); 966 _Other -> unexpected_frame(Type, 0, Payload, State) 967 end; 968handle_frame(Type, Channel, Payload, 969 State = #v1{connection = #connection{protocol = Protocol}}) 970 when ?IS_RUNNING(State) -> 971 case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of 972 error -> frame_error(unknown_frame, Type, Channel, Payload, State); 973 heartbeat -> unexpected_frame(Type, Channel, Payload, State); 974 Frame -> process_frame(Frame, Channel, State) 975 end; 976handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> 977 State; 978handle_frame(Type, Channel, Payload, State) -> 979 unexpected_frame(Type, Channel, Payload, State). 980 981process_frame(Frame, Channel, State) -> 982 ChKey = {channel, Channel}, 983 case (case get(ChKey) of 984 undefined -> create_channel(Channel, State); 985 Other -> {ok, Other, State} 986 end) of 987 {error, Error} -> 988 handle_exception(State, Channel, Error); 989 {ok, {ChPid, AState}, State1} -> 990 case rabbit_command_assembler:process(Frame, AState) of 991 {ok, NewAState} -> 992 put(ChKey, {ChPid, NewAState}), 993 post_process_frame(Frame, ChPid, State1); 994 {ok, Method, NewAState} -> 995 rabbit_channel:do(ChPid, Method), 996 put(ChKey, {ChPid, NewAState}), 997 post_process_frame(Frame, ChPid, State1); 998 {ok, Method, Content, NewAState} -> 999 rabbit_channel:do_flow(ChPid, Method, Content), 1000 put(ChKey, {ChPid, NewAState}), 1001 post_process_frame(Frame, ChPid, control_throttle(State1)); 1002 {error, Reason} -> 1003 handle_exception(State1, Channel, Reason) 1004 end 1005 end. 1006 1007post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> 1008 {_, State1} = channel_cleanup(ChPid, State), 1009 %% This is not strictly necessary, but more obviously 1010 %% correct. Also note that we do not need to call maybe_close/1 1011 %% since we cannot possibly be in the 'closing' state. 1012 control_throttle(State1); 1013post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> 1014 publish_received(State); 1015post_process_frame({content_body, _}, _ChPid, State) -> 1016 publish_received(State); 1017post_process_frame(_Frame, _ChPid, State) -> 1018 State. 1019 1020%%-------------------------------------------------------------------------- 1021 1022%% We allow clients to exceed the frame size a little bit since quite 1023%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. 1024-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). 1025 1026handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, 1027 State = #v1{connection = #connection{frame_max = FrameMax}}) 1028 when FrameMax /= 0 andalso 1029 PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> 1030 fatal_frame_error( 1031 {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, 1032 Type, Channel, <<>>, State); 1033handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, 1034 Payload:PayloadSize/binary, ?FRAME_END, 1035 Rest/binary>>, 1036 State) -> 1037 {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; 1038handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, 1039 State) -> 1040 {Rest, ensure_stats_timer( 1041 switch_callback(State, 1042 {frame_payload, Type, Channel, PayloadSize}, 1043 PayloadSize + 1))}; 1044handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> 1045 <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, 1046 case EndMarker of 1047 ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), 1048 {Rest, switch_callback(State1, frame_header, 7)}; 1049 _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, 1050 Type, Channel, Payload, State) 1051 end; 1052handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> 1053 {Rest, handshake({A, B, C, D}, State)}; 1054handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> 1055 refuse_connection(Sock, {bad_header, Other}); 1056handle_input(Callback, Data, _State) -> 1057 throw({bad_input, Callback, Data}). 1058 1059%% The two rules pertaining to version negotiation: 1060%% 1061%% * If the server cannot support the protocol specified in the 1062%% protocol header, it MUST respond with a valid protocol header and 1063%% then close the socket connection. 1064%% 1065%% * The server MUST provide a protocol version that is lower than or 1066%% equal to that requested by the client in the protocol header. 1067handshake({0, 0, 9, 1}, State) -> 1068 start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); 1069 1070%% This is the protocol header for 0-9, which we can safely treat as 1071%% though it were 0-9-1. 1072handshake({1, 1, 0, 9}, State) -> 1073 start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); 1074 1075%% This is what most clients send for 0-8. The 0-8 spec, confusingly, 1076%% defines the version as 8-0. 1077handshake({1, 1, 8, 0}, State) -> 1078 start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); 1079 1080%% The 0-8 spec as on the AMQP web site actually has this as the 1081%% protocol header; some libraries e.g., py-amqplib, send it when they 1082%% want 0-8. 1083handshake({1, 1, 9, 1}, State) -> 1084 start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); 1085 1086%% ... and finally, the 1.0 spec is crystal clear! 1087handshake({Id, 1, 0, 0}, State) -> 1088 become_1_0(Id, State); 1089 1090handshake(Vsn, #v1{sock = Sock}) -> 1091 refuse_connection(Sock, {bad_version, Vsn}). 1092 1093%% Offer a protocol version to the client. Connection.start only 1094%% includes a major and minor version number, Luckily 0-9 and 0-9-1 1095%% are similar enough that clients will be happy with either. 1096start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, 1097 Protocol, 1098 State = #v1{sock = Sock, connection = Connection}) -> 1099 rabbit_networking:register_connection(self()), 1100 Start = #'connection.start'{ 1101 version_major = ProtocolMajor, 1102 version_minor = ProtocolMinor, 1103 server_properties = server_properties(Protocol), 1104 mechanisms = auth_mechanisms_binary(Sock), 1105 locales = <<"en_US">> }, 1106 ok = send_on_channel0(Sock, Start, Protocol), 1107 switch_callback(State#v1{connection = Connection#connection{ 1108 timeout_sec = ?NORMAL_TIMEOUT, 1109 protocol = Protocol}, 1110 connection_state = starting}, 1111 frame_header, 7). 1112 1113-spec refuse_connection(_, _, _) -> no_return(). 1114refuse_connection(Sock, Exception, {A, B, C, D}) -> 1115 ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end), 1116 throw(Exception). 1117 1118-spec refuse_connection(rabbit_net:socket(), any()) -> no_return(). 1119 1120refuse_connection(Sock, Exception) -> 1121 refuse_connection(Sock, Exception, {0, 0, 9, 1}). 1122 1123ensure_stats_timer(State = #v1{connection_state = running}) -> 1124 rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); 1125ensure_stats_timer(State) -> 1126 State. 1127 1128%%-------------------------------------------------------------------------- 1129 1130handle_method0(MethodName, FieldsBin, 1131 State = #v1{connection = #connection{protocol = Protocol}}) -> 1132 try 1133 handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), 1134 State) 1135 catch throw:{inet_error, E} when E =:= closed; E =:= enotconn -> 1136 maybe_emit_stats(State), 1137 throw({connection_closed_abruptly, State}); 1138 exit:#amqp_error{method = none} = Reason -> 1139 handle_exception(State, 0, Reason#amqp_error{method = MethodName}); 1140 Type:Reason:Stacktrace -> 1141 handle_exception(State, 0, {Type, Reason, MethodName, Stacktrace}) 1142 end. 1143 1144handle_method0(#'connection.start_ok'{mechanism = Mechanism, 1145 response = Response, 1146 client_properties = ClientProperties}, 1147 State0 = #v1{connection_state = starting, 1148 connection = Connection0, 1149 sock = Sock}) -> 1150 AuthMechanism = auth_mechanism_to_module(Mechanism, Sock), 1151 Capabilities = 1152 case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of 1153 {table, Capabilities1} -> Capabilities1; 1154 _ -> [] 1155 end, 1156 Connection1 = Connection0#connection{ 1157 client_properties = ClientProperties, 1158 capabilities = Capabilities, 1159 auth_mechanism = {Mechanism, AuthMechanism}, 1160 auth_state = AuthMechanism:init(Sock)}, 1161 Connection2 = augment_connection_log_name(Connection1), 1162 State = State0#v1{connection_state = securing, 1163 connection = Connection2}, 1164 % adding client properties to process dictionary to send them later 1165 % in the connection_closed event 1166 put(client_properties, ClientProperties), 1167 case user_provided_connection_name(Connection2) of 1168 undefined -> 1169 undefined; 1170 UserProvidedConnectionName -> 1171 put(connection_user_provided_name, UserProvidedConnectionName) 1172 end, 1173 auth_phase(Response, State); 1174 1175handle_method0(#'connection.secure_ok'{response = Response}, 1176 State = #v1{connection_state = securing}) -> 1177 auth_phase(Response, State); 1178 1179handle_method0(#'connection.tune_ok'{frame_max = FrameMax, 1180 channel_max = ChannelMax, 1181 heartbeat = ClientHeartbeat}, 1182 State = #v1{connection_state = tuning, 1183 connection = Connection, 1184 helper_sup = SupPid, 1185 sock = Sock}) -> 1186 ok = validate_negotiated_integer_value( 1187 frame_max, ?FRAME_MIN_SIZE, FrameMax), 1188 ok = validate_negotiated_integer_value( 1189 channel_max, ?CHANNEL_MIN, ChannelMax), 1190 {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector( 1191 SupPid, Connection#connection.name), 1192 Frame = rabbit_binary_generator:build_heartbeat_frame(), 1193 Parent = self(), 1194 SendFun = 1195 fun() -> 1196 case catch rabbit_net:send(Sock, Frame) of 1197 ok -> 1198 ok; 1199 {error, Reason} -> 1200 Parent ! {heartbeat_send_error, Reason}; 1201 Unexpected -> 1202 Parent ! {heartbeat_send_error, Unexpected} 1203 end, 1204 ok 1205 end, 1206 ReceiveFun = fun() -> Parent ! heartbeat_timeout end, 1207 Heartbeater = rabbit_heartbeat:start( 1208 SupPid, Sock, Connection#connection.name, 1209 ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), 1210 State#v1{connection_state = opening, 1211 connection = Connection#connection{ 1212 frame_max = FrameMax, 1213 channel_max = ChannelMax, 1214 timeout_sec = ClientHeartbeat}, 1215 queue_collector = Collector, 1216 heartbeater = Heartbeater}; 1217 1218handle_method0(#'connection.open'{virtual_host = VHost}, 1219 State = #v1{connection_state = opening, 1220 connection = Connection = #connection{ 1221 log_name = ConnName, 1222 user = User = #user{username = Username}, 1223 protocol = Protocol}, 1224 helper_sup = SupPid, 1225 sock = Sock, 1226 throttle = Throttle}) -> 1227 1228 ok = is_over_vhost_connection_limit(VHost, User), 1229 ok = is_over_user_connection_limit(User), 1230 ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}), 1231 ok = is_vhost_alive(VHost, User), 1232 NewConnection = Connection#connection{vhost = VHost}, 1233 ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), 1234 1235 Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), 1236 BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]), 1237 Throttle1 = Throttle#throttle{blocked_by = BlockedBy}, 1238 1239 {ok, ChannelSupSupPid} = 1240 rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), 1241 State1 = control_throttle( 1242 State#v1{connection_state = running, 1243 connection = NewConnection, 1244 channel_sup_sup_pid = ChannelSupSupPid, 1245 throttle = Throttle1}), 1246 Infos = augment_infos_with_user_provided_connection_name( 1247 [{type, network} | infos(?CREATION_EVENT_KEYS, State1)], 1248 State1 1249 ), 1250 rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos), 1251 Infos), 1252 rabbit_event:notify(connection_created, Infos), 1253 maybe_emit_stats(State1), 1254 rabbit_log_connection:info( 1255 "connection ~p (~s): " 1256 "user '~s' authenticated and granted access to vhost '~s'", 1257 [self(), dynamic_connection_name(ConnName), Username, VHost]), 1258 State1; 1259handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> 1260 lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), 1261 maybe_close(State#v1{connection_state = closing}); 1262handle_method0(#'connection.close'{}, 1263 State = #v1{connection = #connection{protocol = Protocol}, 1264 sock = Sock}) 1265 when ?IS_STOPPING(State) -> 1266 %% We're already closed or closing, so we don't need to cleanup 1267 %% anything. 1268 ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), 1269 State; 1270handle_method0(#'connection.close_ok'{}, 1271 State = #v1{connection_state = closed}) -> 1272 self() ! terminate_connection, 1273 State; 1274handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reason}, 1275 State = #v1{connection = 1276 #connection{protocol = Protocol, 1277 user = User = #user{username = Username}, 1278 log_name = ConnName} = Conn, 1279 sock = Sock}) when ?IS_RUNNING(State) -> 1280 rabbit_log_connection:debug( 1281 "connection ~p (~s) of user '~s': " 1282 "asked to update secret, reason: ~s", 1283 [self(), dynamic_connection_name(ConnName), Username, Reason]), 1284 case rabbit_access_control:update_state(User, NewSecret) of 1285 {ok, User1} -> 1286 %% User/auth backend state has been updated. Now we can propagate it to channels 1287 %% asynchronously and return. All the channels have to do is to update their 1288 %% own state. 1289 %% 1290 %% Any secret update errors coming from the authz backend will be handled in the other branch. 1291 %% Therefore we optimistically do no error handling here. MK. 1292 lists:foreach(fun(Ch) -> 1293 rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]), 1294 _ = rabbit_channel:update_user_state(Ch, User1) 1295 end, all_channels()), 1296 ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol), 1297 rabbit_log_connection:info( 1298 "connection ~p (~s): " 1299 "user '~s' updated secret, reason: ~s", 1300 [self(), dynamic_connection_name(ConnName), Username, Reason]), 1301 State#v1{connection = Conn#connection{user = User1}}; 1302 {refused, Message} -> 1303 rabbit_log_connection:error("Secret update was refused for user '~s': ~p", 1304 [Username, Message]), 1305 rabbit_misc:protocol_error(not_allowed, "New secret was refused by one of the backends", []); 1306 {error, Message} -> 1307 rabbit_log_connection:error("Secret update for user '~s' failed: ~p", 1308 [Username, Message]), 1309 rabbit_misc:protocol_error(not_allowed, 1310 "Secret update failed", []) 1311 end; 1312handle_method0(_Method, State) when ?IS_STOPPING(State) -> 1313 State; 1314handle_method0(_Method, #v1{connection_state = S}) -> 1315 rabbit_misc:protocol_error( 1316 channel_error, "unexpected method in connection state ~w", [S]). 1317 1318is_vhost_alive(VHostPath, User) -> 1319 case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of 1320 true -> ok; 1321 false -> 1322 rabbit_misc:protocol_error(internal_error, 1323 "access to vhost '~s' refused for user '~s': " 1324 "vhost '~s' is down", 1325 [VHostPath, User#user.username, VHostPath]) 1326 end. 1327 1328is_over_vhost_connection_limit(VHostPath, User) -> 1329 try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of 1330 false -> ok; 1331 {true, Limit} -> rabbit_misc:protocol_error(not_allowed, 1332 "access to vhost '~s' refused for user '~s': " 1333 "connection limit (~p) is reached", 1334 [VHostPath, User#user.username, Limit]) 1335 catch 1336 throw:{error, {no_such_vhost, VHostPath}} -> 1337 rabbit_misc:protocol_error(not_allowed, "vhost ~s not found", [VHostPath]) 1338 end. 1339 1340is_over_user_connection_limit(#user{username = Username}) -> 1341 case rabbit_auth_backend_internal:is_over_connection_limit(Username) of 1342 false -> ok; 1343 {true, Limit} -> rabbit_misc:protocol_error(not_allowed, 1344 "Connection refused for user '~s': " 1345 "user connection limit (~p) is reached", 1346 [Username, Limit]) 1347 end. 1348 1349validate_negotiated_integer_value(Field, Min, ClientValue) -> 1350 ServerValue = get_env(Field), 1351 if ClientValue /= 0 andalso ClientValue < Min -> 1352 fail_negotiation(Field, min, Min, ClientValue); 1353 ServerValue /= 0 andalso (ClientValue =:= 0 orelse 1354 ClientValue > ServerValue) -> 1355 fail_negotiation(Field, max, ServerValue, ClientValue); 1356 true -> 1357 ok 1358 end. 1359 1360%% keep dialyzer happy 1361-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> 1362 no_return(). 1363fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> 1364 {S1, S2} = case MinOrMax of 1365 min -> {lower, minimum}; 1366 max -> {higher, maximum} 1367 end, 1368 ClientValueDetail = get_client_value_detail(Field, ClientValue), 1369 rabbit_misc:protocol_error( 1370 not_allowed, "negotiated ~w = ~w~s is ~w than the ~w allowed value (~w)", 1371 [Field, ClientValue, ClientValueDetail, S1, S2, ServerValue], 'connection.tune'). 1372 1373get_env(Key) -> 1374 {ok, Value} = application:get_env(rabbit, Key), 1375 Value. 1376 1377send_on_channel0(Sock, Method, Protocol) -> 1378 ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). 1379 1380auth_mechanism_to_module(TypeBin, Sock) -> 1381 case rabbit_registry:binary_to_type(TypeBin) of 1382 {error, not_found} -> 1383 rabbit_misc:protocol_error( 1384 command_invalid, "unknown authentication mechanism '~s'", 1385 [TypeBin]); 1386 T -> 1387 case {lists:member(T, auth_mechanisms(Sock)), 1388 rabbit_registry:lookup_module(auth_mechanism, T)} of 1389 {true, {ok, Module}} -> 1390 Module; 1391 _ -> 1392 rabbit_misc:protocol_error( 1393 command_invalid, 1394 "invalid authentication mechanism '~s'", [T]) 1395 end 1396 end. 1397 1398auth_mechanisms(Sock) -> 1399 {ok, Configured} = application:get_env(auth_mechanisms), 1400 [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), 1401 Module:should_offer(Sock), lists:member(Name, Configured)]. 1402 1403auth_mechanisms_binary(Sock) -> 1404 list_to_binary( 1405 string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). 1406 1407auth_phase(Response, 1408 State = #v1{connection = Connection = 1409 #connection{protocol = Protocol, 1410 auth_mechanism = {Name, AuthMechanism}, 1411 auth_state = AuthState}, 1412 sock = Sock}) -> 1413 rabbit_log:debug("Raw client connection hostname during authN phase: ~p", [Connection#connection.host]), 1414 RemoteAddress = case Connection#connection.host of 1415 %% the hostname was already resolved, e.g. by reverse DNS lookups 1416 Bin when is_binary(Bin) -> Bin; 1417 %% the hostname is an IP address 1418 Tuple when is_tuple(Tuple) -> 1419 rabbit_data_coercion:to_binary(inet:ntoa(Connection#connection.host)); 1420 Other -> rabbit_data_coercion:to_binary(Other) 1421 end, 1422 rabbit_log:debug("Resolved client hostname during authN phase: ~s", [RemoteAddress]), 1423 case AuthMechanism:handle_response(Response, AuthState) of 1424 {refused, Username, Msg, Args} -> 1425 rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091), 1426 auth_fail(Username, Msg, Args, Name, State); 1427 {protocol_error, Msg, Args} -> 1428 rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, amqp091), 1429 notify_auth_result(none, user_authentication_failure, 1430 [{error, rabbit_misc:format(Msg, Args)}], 1431 State), 1432 rabbit_misc:protocol_error(syntax_error, Msg, Args); 1433 {challenge, Challenge, AuthState1} -> 1434 rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091), 1435 Secure = #'connection.secure'{challenge = Challenge}, 1436 ok = send_on_channel0(Sock, Secure, Protocol), 1437 State#v1{connection = Connection#connection{ 1438 auth_state = AuthState1}}; 1439 {ok, User = #user{username = Username}} -> 1440 case rabbit_access_control:check_user_loopback(Username, Sock) of 1441 ok -> 1442 rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, amqp091), 1443 notify_auth_result(Username, user_authentication_success, 1444 [], State); 1445 not_allowed -> 1446 rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091), 1447 auth_fail(Username, "user '~s' can only connect via " 1448 "localhost", [Username], Name, State) 1449 end, 1450 Tune = #'connection.tune'{frame_max = get_env(frame_max), 1451 channel_max = get_env(channel_max), 1452 heartbeat = get_env(heartbeat)}, 1453 ok = send_on_channel0(Sock, Tune, Protocol), 1454 State#v1{connection_state = tuning, 1455 connection = Connection#connection{user = User, 1456 auth_state = none}} 1457 end. 1458 1459-spec auth_fail 1460 (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) -> 1461 no_return(). 1462 1463auth_fail(Username, Msg, Args, AuthName, 1464 State = #v1{connection = #connection{protocol = Protocol, 1465 capabilities = Capabilities}}) -> 1466 notify_auth_result(Username, user_authentication_failure, 1467 [{error, rabbit_misc:format(Msg, Args)}], State), 1468 AmqpError = rabbit_misc:amqp_error( 1469 access_refused, "~s login refused: ~s", 1470 [AuthName, io_lib:format(Msg, Args)], none), 1471 case rabbit_misc:table_lookup(Capabilities, 1472 <<"authentication_failure_close">>) of 1473 {bool, true} -> 1474 SafeMsg = io_lib:format( 1475 "Login was refused using authentication " 1476 "mechanism ~s. For details see the broker " 1477 "logfile.", [AuthName]), 1478 AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, 1479 {0, CloseMethod} = rabbit_binary_generator:map_exception( 1480 0, AmqpError1, Protocol), 1481 ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); 1482 _ -> ok 1483 end, 1484 rabbit_misc:protocol_error(AmqpError). 1485 1486notify_auth_result(Username, AuthResult, ExtraProps, State) -> 1487 EventProps = [{connection_type, network}, 1488 {name, case Username of none -> ''; _ -> Username end}] ++ 1489 [case Item of 1490 name -> {connection_name, i(name, State)}; 1491 _ -> {Item, i(Item, State)} 1492 end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ 1493 ExtraProps, 1494 rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). 1495 1496%%-------------------------------------------------------------------------- 1497 1498infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. 1499 1500i(pid, #v1{}) -> self(); 1501i(node, #v1{}) -> node(); 1502i(SockStat, S) when SockStat =:= recv_oct; 1503 SockStat =:= recv_cnt; 1504 SockStat =:= send_oct; 1505 SockStat =:= send_cnt; 1506 SockStat =:= send_pend -> 1507 socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end, 1508 fun ([{_, I}]) -> I end, S); 1509i(ssl, #v1{sock = Sock, proxy_socket = ProxySock}) -> 1510 rabbit_net:proxy_ssl_info(Sock, ProxySock) /= nossl; 1511i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S); 1512i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S); 1513i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S); 1514i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); 1515i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); 1516i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); 1517i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); 1518i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; 1519i(state, #v1{connection_state = ConnectionState, 1520 throttle = #throttle{blocked_by = Reasons, 1521 last_blocked_at = T} = Throttle}) -> 1522 %% not throttled by resource or other longer-term reasons 1523 %% TODO: come up with a sensible function name 1524 case sets:size(sets:del_element(flow, Reasons)) =:= 0 andalso 1525 (credit_flow:blocked() %% throttled by flow now 1526 orelse %% throttled by flow recently 1527 (is_blocked_by_flow(Throttle) andalso T =/= never andalso 1528 erlang:convert_time_unit(erlang:monotonic_time() - T, 1529 native, 1530 micro_seconds) < 5000000)) of 1531 true -> flow; 1532 false -> 1533 case {has_reasons_to_block(Throttle), ConnectionState} of 1534 %% blocked 1535 {_, blocked} -> blocked; 1536 %% not yet blocked (there were no publishes) 1537 {true, running} -> blocking; 1538 %% not blocked 1539 {false, _} -> ConnectionState; 1540 %% catch all to be defensive 1541 _ -> ConnectionState 1542 end 1543 end; 1544i(garbage_collection, _State) -> 1545 rabbit_misc:get_gc_info(self()); 1546i(reductions, _State) -> 1547 {reductions, Reductions} = erlang:process_info(self(), reductions), 1548 Reductions; 1549i(Item, #v1{connection = Conn}) -> ic(Item, Conn). 1550 1551ic(name, #connection{name = Name}) -> Name; 1552ic(host, #connection{host = Host}) -> Host; 1553ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost; 1554ic(port, #connection{port = Port}) -> Port; 1555ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort; 1556ic(protocol, #connection{protocol = none}) -> none; 1557ic(protocol, #connection{protocol = P}) -> P:version(); 1558ic(user, #connection{user = none}) -> ''; 1559ic(user, #connection{user = U}) -> U#user.username; 1560ic(user_who_performed_action, C) -> ic(user, C); 1561ic(vhost, #connection{vhost = VHost}) -> VHost; 1562ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; 1563ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; 1564ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; 1565ic(client_properties, #connection{client_properties = CP}) -> CP; 1566ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; 1567ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; 1568ic(connected_at, #connection{connected_at = T}) -> T; 1569ic(Item, #connection{}) -> throw({bad_argument, Item}). 1570 1571socket_info(Get, Select, #v1{sock = Sock}) -> 1572 case Get(Sock) of 1573 {ok, T} -> case Select(T) of 1574 N when is_number(N) -> N; 1575 _ -> 0 1576 end; 1577 {error, _} -> 0 1578 end. 1579 1580ssl_info(F, #v1{sock = Sock, proxy_socket = ProxySock}) -> 1581 case rabbit_net:proxy_ssl_info(Sock, ProxySock) of 1582 nossl -> ''; 1583 {error, _} -> ''; 1584 {ok, Items} -> 1585 P = proplists:get_value(protocol, Items), 1586 #{cipher := C, 1587 key_exchange := K, 1588 mac := H} = proplists:get_value(selected_cipher_suite, Items), 1589 F({P, {K, C, H}}) 1590 end. 1591 1592cert_info(F, #v1{sock = Sock}) -> 1593 case rabbit_net:peercert(Sock) of 1594 nossl -> ''; 1595 {error, _} -> ''; 1596 {ok, Cert} -> list_to_binary(F(Cert)) 1597 end. 1598 1599maybe_emit_stats(State) -> 1600 rabbit_event:if_enabled(State, #v1.stats_timer, 1601 fun() -> emit_stats(State) end). 1602 1603emit_stats(State) -> 1604 [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I 1605 = infos(?SIMPLE_METRICS, State), 1606 Infos = infos(?OTHER_METRICS, State), 1607 rabbit_core_metrics:connection_stats(Pid, Infos), 1608 rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), 1609 rabbit_event:notify(connection_stats, Infos ++ I), 1610 State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), 1611 ensure_stats_timer(State1). 1612 1613%% 1.0 stub 1614-spec become_1_0(non_neg_integer(), #v1{}) -> no_return(). 1615 1616become_1_0(Id, State = #v1{sock = Sock}) -> 1617 case code:is_loaded(rabbit_amqp1_0_reader) of 1618 false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled); 1619 _ -> Mode = case Id of 1620 0 -> amqp; 1621 3 -> sasl; 1622 _ -> refuse_connection( 1623 Sock, {unsupported_amqp1_0_protocol_id, Id}, 1624 {3, 1, 0, 0}) 1625 end, 1626 F = fun (_Deb, Buf, BufLen, S) -> 1627 {rabbit_amqp1_0_reader, init, 1628 [Mode, pack_for_1_0(Buf, BufLen, S)]} 1629 end, 1630 State#v1{connection_state = {become, F}} 1631 end. 1632 1633pack_for_1_0(Buf, BufLen, #v1{parent = Parent, 1634 sock = Sock, 1635 recv_len = RecvLen, 1636 pending_recv = PendingRecv, 1637 helper_sup = SupPid, 1638 proxy_socket = ProxySocket}) -> 1639 {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen, ProxySocket}. 1640 1641respond_and_close(State, Channel, Protocol, Reason, LogErr) -> 1642 log_hard_error(State, Channel, LogErr), 1643 send_error_on_channel0_and_close(Channel, Protocol, Reason, State). 1644 1645send_error_on_channel0_and_close(Channel, Protocol, Reason, State) -> 1646 {0, CloseMethod} = 1647 rabbit_binary_generator:map_exception(Channel, Reason, Protocol), 1648 State1 = close_connection(terminate_channels(State)), 1649 ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), 1650 State1. 1651 1652%% 1653%% Publisher throttling 1654%% 1655 1656blocked_by_message(#throttle{blocked_by = Reasons}) -> 1657 %% we don't want to report internal flow as a reason here since 1658 %% it is entirely transient 1659 Reasons1 = sets:del_element(flow, Reasons), 1660 RStr = string:join([format_blocked_by(R) || R <- sets:to_list(Reasons1)], " & "), 1661 list_to_binary(rabbit_misc:format("low on ~s", [RStr])). 1662 1663format_blocked_by({resource, memory}) -> "memory"; 1664format_blocked_by({resource, disk}) -> "disk"; 1665format_blocked_by({resource, disc}) -> "disk". 1666 1667update_last_blocked_at(Throttle) -> 1668 Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. 1669 1670connection_blocked_message_sent( 1671 #throttle{connection_blocked_message_sent = BS}) -> BS. 1672 1673should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> 1674 should_block(Throttle) 1675 andalso 1676 sets:size(sets:del_element(flow, Reasons)) =/= 0 1677 andalso 1678 not connection_blocked_message_sent(Throttle). 1679 1680should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> 1681 connection_blocked_message_sent(Throttle) 1682 andalso 1683 sets:size(sets:del_element(flow, Reasons)) == 0. 1684 1685%% Returns true if we have a reason to block 1686%% this connection. 1687has_reasons_to_block(#throttle{blocked_by = Reasons}) -> 1688 sets:size(Reasons) > 0. 1689 1690is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> 1691 sets:is_element(flow, Reasons). 1692 1693should_block(#throttle{should_block = Val}) -> Val. 1694 1695should_block_connection(Throttle) -> 1696 should_block(Throttle) andalso has_reasons_to_block(Throttle). 1697 1698should_unblock_connection(Throttle) -> 1699 not should_block_connection(Throttle). 1700 1701maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) -> 1702 case should_block_connection(Throttle) of 1703 true -> 1704 State1 = State#v1{connection_state = blocked, 1705 throttle = update_last_blocked_at(Throttle)}, 1706 case CS of 1707 running -> 1708 ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater); 1709 _ -> ok 1710 end, 1711 maybe_send_blocked_or_unblocked(State1); 1712 false -> State 1713 end. 1714 1715maybe_unblock(State = #v1{throttle = Throttle}) -> 1716 case should_unblock_connection(Throttle) of 1717 true -> 1718 ok = rabbit_heartbeat:resume_monitor(State#v1.heartbeater), 1719 State1 = State#v1{connection_state = running, 1720 throttle = Throttle#throttle{should_block = false}}, 1721 maybe_send_unblocked(State1); 1722 false -> State 1723 end. 1724 1725maybe_send_unblocked(State = #v1{throttle = Throttle}) -> 1726 case should_send_unblocked(Throttle) of 1727 true -> 1728 ok = send_unblocked(State), 1729 State#v1{throttle = 1730 Throttle#throttle{connection_blocked_message_sent = false}}; 1731 false -> State 1732 end. 1733 1734maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) -> 1735 case should_send_blocked(Throttle) of 1736 true -> 1737 ok = send_blocked(State, blocked_by_message(Throttle)), 1738 State#v1{throttle = 1739 Throttle#throttle{connection_blocked_message_sent = true}}; 1740 false -> maybe_send_unblocked(State) 1741 end. 1742 1743publish_received(State = #v1{throttle = Throttle}) -> 1744 case has_reasons_to_block(Throttle) of 1745 false -> State; 1746 true -> 1747 Throttle1 = Throttle#throttle{should_block = true}, 1748 maybe_block(State#v1{throttle = Throttle1}) 1749 end. 1750 1751control_throttle(State = #v1{connection_state = CS, 1752 throttle = #throttle{blocked_by = Reasons} = Throttle}) -> 1753 Throttle1 = case credit_flow:blocked() of 1754 true -> 1755 Throttle#throttle{blocked_by = sets:add_element(flow, Reasons)}; 1756 false -> 1757 Throttle#throttle{blocked_by = sets:del_element(flow, Reasons)} 1758 end, 1759 State1 = State#v1{throttle = Throttle1}, 1760 case CS of 1761 running -> maybe_block(State1); 1762 %% unblock or re-enable blocking 1763 blocked -> maybe_block(maybe_unblock(State1)); 1764 _ -> State1 1765 end. 1766 1767augment_connection_log_name(#connection{name = Name} = Connection) -> 1768 case user_provided_connection_name(Connection) of 1769 undefined -> 1770 Connection; 1771 UserSpecifiedName -> 1772 LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>, 1773 rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s", [self(), Name, UserSpecifiedName]), 1774 ?store_proc_name(LogName), 1775 Connection#connection{log_name = LogName} 1776 end. 1777 1778augment_infos_with_user_provided_connection_name(Infos, #v1{connection = Connection}) -> 1779 case user_provided_connection_name(Connection) of 1780 undefined -> 1781 Infos; 1782 UserProvidedConnectionName -> 1783 [{user_provided_name, UserProvidedConnectionName} | Infos] 1784 end. 1785 1786user_provided_connection_name(#connection{client_properties = ClientProperties}) -> 1787 case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of 1788 {longstr, UserSpecifiedName} -> 1789 UserSpecifiedName; 1790 _ -> 1791 undefined 1792 end. 1793 1794dynamic_connection_name(Default) -> 1795 case rabbit_misc:get_proc_name() of 1796 {ok, Name} -> 1797 Name; 1798 _ -> 1799 Default 1800 end. 1801 1802handle_uncontrolled_channel_close(ChPid) -> 1803 rabbit_core_metrics:channel_closed(ChPid), 1804 rabbit_event:notify(channel_closed, [{pid, ChPid}]). 1805 1806-spec get_client_value_detail(atom(), integer()) -> string(). 1807get_client_value_detail(channel_max, 0) -> 1808 " (no limit)"; 1809get_client_value_detail(_Field, _ClientValue) -> 1810 "". 1811