1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_reader).
9
10%% Transitional step until we can require Erlang/OTP 21 and
11%% use the now recommended try/catch syntax for obtaining the stack trace.
12-compile(nowarn_deprecated_function).
13
14%% This is an AMQP 0-9-1 connection implementation. If AMQP 1.0 plugin is enabled,
15%% this module passes control of incoming AMQP 1.0 connections to it.
16%%
17%% Every connection (as in, a process using this module)
18%% is a controlling process for a server socket.
19%%
20%% Connections have a number of responsibilities:
21%%
22%%  * Performing protocol handshake
23%%  * Parsing incoming data and dispatching protocol methods
24%%  * Authenticating clients (with the help of authentication backends)
25%%  * Enforcing TCP backpressure (throttling clients)
26%%  * Enforcing connection limits, e.g. channel_max
27%%  * Channel management
28%%  * Setting up heartbeater and alarm notifications
29%%  * Emitting connection and network activity metric events
30%%  * Gracefully handling client disconnects, channel termination, etc
31%%
32%% and a few more.
33%%
34%% Every connection has
35%%
36%%  * a queue collector which is responsible for keeping
37%%    track of exclusive queues on the connection and their cleanup.
38%%  * a heartbeater that's responsible for sending heartbeat frames to clients,
39%%    keeping track of the incoming ones and notifying connection about
40%%    heartbeat timeouts
41%%  * Stats timer, a timer that is used to periodically emit metric events
42%%
43%% Some dependencies are started under a separate supervisor to avoid deadlocks
44%% during system shutdown. See rabbit_channel_sup:start_link/0 for details.
45%%
46%% Reader processes are special processes (in the OTP sense).
47
48-include_lib("rabbit_common/include/rabbit_framing.hrl").
49-include_lib("rabbit_common/include/rabbit.hrl").
50
51-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2,
52         shutdown/2]).
53
54-export([system_continue/3, system_terminate/4, system_code_change/4]).
55
56-export([init/3, mainloop/4, recvloop/4]).
57
58-export([conserve_resources/3, server_properties/1]).
59
%% NOTE(review): ?NORMAL_TIMEOUT is not referenced in this part of the file;
%% presumably seconds, like the timeouts below -- confirm at its use site.
-define(NORMAL_TIMEOUT, 3).
%% upper bound, in seconds, on the delay before a closed connection is sent
%% terminate_connection (see close_connection/1)
-define(CLOSING_TIMEOUT, 30).
%% seconds granted per open channel while waiting for channels to shut down
%% (see terminate_channels/1)
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%% we wait for this many seconds before closing TCP connection
%% with a client that failed to log in. Provides some relief
%% from connection storms and DoS.
-define(SILENT_CLOSE_DELAY, 3).
%% NOTE(review): presumably the lowest channel number a client may use;
%% not referenced in this part of the file -- confirm at its use site.
-define(CHANNEL_MIN, 1).
68
69%%--------------------------------------------------------------------------
70
%% Reader process state, threaded through recvloop/4 and mainloop/4.
-record(v1, {
          %% parent process
          parent,
          %% socket
          sock,
          %% connection state, see connection record
          connection,
          %% first argument given to handle_input/3 for the next chunk of
          %% input; changed together with recv_len by switch_callback/3
          callback,
          %% number of buffered bytes recvloop/4 requires before it
          %% invokes handle_input/3
          recv_len,
          %% true once {active, once} has been set on the socket and the
          %% resulting data has not arrived yet
          pending_recv,
          %% pre_init | securing | running | blocking | blocked | closing | closed | {become, F}
          connection_state,
          %% see comment in rabbit_connection_sup:start_link/0
          helper_sup,
          %% takes care of cleaning up exclusive queues,
          %% see rabbit_queue_collector
          queue_collector,
          %% sends and receives heartbeat frames,
          %% see rabbit_heartbeat
          heartbeater,
          %% timer used to emit statistics
          stats_timer,
          %% channel supervisor
          channel_sup_sup_pid,
          %% how many channels this connection has
          channel_count,
          %% throttling state, for both
          %% credit- and resource-driven flow control
          throttle,
          %% result of rabbit_net:maybe_get_proxy_socket/1 for the accepted
          %% socket (set in start_connection/4)
          proxy_socket}).
101
%% Flow-control bookkeeping kept in the throttle field of #v1{}.
-record(throttle, {
  %% never | timestamp()
  last_blocked_at,
  %% a set of the reasons why we are
  %% blocked: {resource, memory}, {resource, disk}.
  %% More reasons can be added in the future.
  blocked_by,
  %% true if received any publishes, false otherwise
  %% note that this will also be true when connection is
  %% already blocked
  should_block,
  %% true if we have sent a connection.blocked,
  %% false otherwise
  connection_blocked_message_sent
}).
117
%% Info items that make up the periodically emitted statistics.
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
                          send_pend, state, channels, reductions,
                          garbage_collection]).

%% ?SIMPLE_METRICS and ?OTHER_METRICS (minus the shared pid key) together
%% list the same items as ?STATISTICS_KEYS.
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
                        garbage_collection]).

%% Info items reported with the connection_created event
%% (see the force_event_refresh clause of handle_other/2).
-define(CREATION_EVENT_KEYS,
        [pid, name, port, peer_port, host,
        peer_host, ssl, peer_cert_subject, peer_cert_issuer,
        peer_cert_validity, auth_mechanism, ssl_protocol,
        ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
        timeout, frame_max, channel_max, client_properties, connected_at,
        node, user_who_performed_action]).

%% '++' and '--' are right-associative, so [pid] is removed from
%% ?STATISTICS_KEYS only; the pid entry of ?CREATION_EVENT_KEYS is kept and
%% therefore appears exactly once in the result.
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).

%% NOTE(review): not referenced in this part of the file; presumably the
%% info items attached to authentication result notifications -- confirm.
-define(AUTH_NOTIFICATION_INFO_KEYS,
        [host, name, peer_host, peer_port, protocol, auth_mechanism,
         ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
         peer_cert_validity]).

%% Guard-safe predicate: the connection is in the running or blocked state.
-define(IS_RUNNING(State),
        (State#v1.connection_state =:= running orelse
         State#v1.connection_state =:= blocked)).

%% Guard-safe predicate: the connection is in the closing or closed state.
-define(IS_STOPPING(State),
        (State#v1.connection_state =:= closing orelse
         State#v1.connection_state =:= closed)).
148
149%%--------------------------------------------------------------------------
150
%% Alarm notification payload accepted by conserve_resources/3. Only the
%% second element is forwarded to the reader process there.
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
                           IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
                           NodeForWhichAlarmWasSetOrCleared :: node()}.
154
155%%--------------------------------------------------------------------------
156
%% Spawns a linked reader process via proc_lib. The new process begins in
%% init/3 and receives the caller's pid as its parent.
-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).

start_link(HelperSup, Ref) ->
    InitArgs = [self(), HelperSup, Ref],
    {ok, proc_lib:spawn_link(?MODULE, init, InitArgs)}.
163
%% Synchronously asks the reader Pid to shut the connection down, passing
%% Explanation along. Waits for the reply without a timeout.
-spec shutdown(pid(), string()) -> 'ok'.

shutdown(Pid, Explanation) ->
    Request = {shutdown, Explanation},
    gen_server:call(Pid, Request, infinity).
168
%% Entry point of the spawned reader process: completes the listener
%% handshake for Ref (honouring the proxy_protocol setting, default false)
%% and hands the resulting socket to start_connection/4.
-spec init(pid(), pid(), any()) -> no_return().

init(Parent, HelperSup, Ref) ->
    ?LG_PROCESS_TYPE(reader),
    UseProxyProtocol = application:get_env(rabbit, proxy_protocol, false),
    {ok, Sock} = rabbit_networking:handshake(Ref, UseProxyProtocol),
    start_connection(Parent, HelperSup, sys:debug_options([]), Sock).
177
%% sys callback: resumes the main loop after a system message was handled,
%% restoring the buffer saved by mainloop/4 and recording the parent pid.
-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().

system_continue(Parent, Deb, {Buf, BufLen, State}) ->
    Resumed = State#v1{parent = Parent},
    mainloop(Deb, Buf, BufLen, Resumed).
182
%% sys callback: terminates the reader process with the given reason.
-spec system_terminate(_,_,_,_) -> no_return().

system_terminate(Reason, _Parent, _Deb, _Misc) ->
    erlang:exit(Reason).
187
%% sys callback: code upgrades need no state conversion, so the saved
%% state is handed back untouched.
-spec system_code_change(_,_,_,_) -> {'ok',_}.

system_code_change(Misc, _Mod, _FromVsn, _Extra) ->
    {ok, Misc}.
192
%% Lists every info item a reader can report (see ?INFO_KEYS).
-spec info_keys() -> rabbit_types:info_keys().

info_keys() ->
    ?INFO_KEYS.
196
%% Synchronously fetches all info items from the reader Pid (no timeout).
-spec info(pid()) -> rabbit_types:infos().

info(Pid) ->
    Request = info,
    gen_server:call(Pid, Request, infinity).
201
%% Synchronously fetches the requested info Items from the reader Pid.
%% Throws the error term if the reader answers {error, Error}.
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().

info(Pid, Items) ->
    Reply = gen_server:call(Pid, {info, Items}, infinity),
    case Reply of
        {error, Reason} -> throw(Reason);
        {ok, Infos}     -> Infos
    end.
209
-spec force_event_refresh(pid(), reference()) -> 'ok'.

% Note: https://www.pivotaltracker.com/story/show/166962656
% Asynchronously asks the reader to re-emit its connection_created event.
% Needed so that the stats timer is initialised with the correct values
% once the management agent has started.
force_event_refresh(Pid, Ref) ->
    Request = {force_event_refresh, Ref},
    gen_server:cast(Pid, Request).
217
%% Alarm callback: forwards the alarm Source and the second element of the
%% alert tuple to the reader Pid as a plain message. Always returns ok.
-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.

conserve_resources(Pid, Source, {_WasSet, Conserve, _Node}) ->
    erlang:send(Pid, {conserve_resources, Source, Conserve}),
    ok.
223
%% Builds the server properties table advertised to clients for Protocol.
%%
%% Properties come from three places: the capabilities table for the
%% protocol, the 'server_properties' application environment entry, and a
%% set of generated built-ins (product, version, cluster name, ...).
%% Entries given as {AtomKey, Value} are normalised to
%% {BinaryKey, longstr, BinaryValue}; 3-tuples are passed through as is.
-spec server_properties(rabbit_types:protocol()) ->
          rabbit_framing:amqp_table().

server_properties(Protocol) ->
    {ok, Product} = application:get_key(rabbit, description),
    {ok, Version} = application:get_key(rabbit, vsn),

    %% Properties supplied through configuration
    {ok, Configured} = application:get_env(rabbit, server_properties),

    %% Properties generated by the broker itself
    BuiltIn = [{product,      Product},
               {version,      Version},
               {cluster_name, rabbit_nodes:cluster_name()},
               {platform,     rabbit_misc:platform_and_version()},
               {copyright,    ?COPYRIGHT_MESSAGE},
               {information,  ?INFORMATION_MESSAGE}],

    %% Bring the simplified (2-tuple) form to the full (3-tuple) form
    Normalize =
        fun (Prop) ->
                case Prop of
                    {KeyAtom, Value} ->
                        {list_to_binary(atom_to_list(KeyAtom)),
                         longstr,
                         maybe_list_to_binary(Value)};
                    {BinKey, Type, Value} ->
                        {BinKey, Type, Value}
                end
        end,
    Capabilities = {<<"capabilities">>, table, server_capabilities(Protocol)},
    AllProps = [Capabilities | lists:map(Normalize, Configured ++ BuiltIn)],

    %% Sort by key and drop entries with a duplicated key; configured
    %% values are listed before the built-ins they may override
    ByKey = fun ({K1, _, _}, {K2, _, _}) -> K1 =< K2 end,
    lists:usort(ByKey, AllProps).
255
%% Returns the value as a binary: lists are converted, binaries pass through.
maybe_list_to_binary(Value) when is_list(Value) ->
    erlang:list_to_binary(Value);
maybe_list_to_binary(Value) when is_binary(Value) ->
    Value.
258
%% Capabilities table advertised for AMQP 0-9-1: every listed capability is
%% a boolean set to true. Any other protocol module gets an empty table.
server_capabilities(rabbit_framing_amqp_0_9_1) ->
    Names = [<<"publisher_confirms">>,
             <<"exchange_exchange_bindings">>,
             <<"basic.nack">>,
             <<"consumer_cancel_notify">>,
             <<"connection.blocked">>,
             <<"consumer_priorities">>,
             <<"authentication_failure_close">>,
             <<"per_consumer_qos">>,
             <<"direct_reply_to">>],
    [{Name, bool, true} || Name <- Names];
server_capabilities(_OtherProtocol) ->
    [].
271
272%%--------------------------------------------------------------------------
273
%% Logs a socket-level error for the current connection process.
%% Atom reasons are rendered through rabbit_misc:format_inet_error/1.
socket_error(Reason) when is_atom(Reason) ->
    Description = rabbit_misc:format_inet_error(Reason),
    rabbit_log_connection:error("Error on AMQP connection ~p: ~s",
                                [self(), Description]);
%% The socket was closed while upgrading to SSL.
%% This is presumably a TCP healthcheck, so it is only logged at debug level.
socket_error({ssl_upgrade_error, closed} = Reason) ->
    rabbit_log_connection:debug("Error on AMQP connection ~p:~n~p",
                                [self(), Reason]);
socket_error(Reason) ->
    rabbit_log_connection:error("Error on AMQP connection ~p:~n~p",
                                [self(), Reason]).
289
%% Runs Thunk through rabbit_misc:throw_on_error/2 with the inet_error tag.
inet_op(Thunk) ->
    rabbit_misc:throw_on_error(inet_error, Thunk).
291
%% Applies Fun to Sock and unwraps an {ok, Result} answer. On
%% {error, Reason} the error is logged, the underlying socket is closed
%% and the process exits with reason 'normal'.
socket_op(Sock, Fun) ->
    RealSocket = rabbit_net:unwrap_socket(Sock),
    Outcome = Fun(Sock),
    case Outcome of
        {error, Reason} ->
            socket_error(Reason),
            rabbit_net:fast_close(RealSocket),
            exit(normal);
        {ok, Result} ->
            Result
    end.
300
%% Runs one client connection from an accepted socket until it terminates.
%%
%% The process starts trapping exits, derives the connection name from the
%% socket (exiting with reason 'normal' when that fails), arms the handshake
%% timer and builds the initial #v1{} state. It then enters recvloop/4
%% through run/1 with the 'handshake' callback and a receive length of 8.
%%
%% A state returned from the loop is logged as a clean close; a thrown term
%% is logged by log_connection_exception/2. In every case the 'after'
%% section closes the socket, unregisters the connection, and emits a
%% connection_closed event.
-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
          no_return().

start_connection(Parent, HelperSup, Deb, Sock) ->
    process_flag(trap_exit, true),
    RealSocket = rabbit_net:unwrap_socket(Sock),
    Name = case rabbit_net:connection_string(Sock, inbound) of
               {ok, Str}         -> list_to_binary(Str);
               %% the peer is already gone: not worth logging
               {error, enotconn} -> rabbit_net:fast_close(RealSocket),
                                    exit(normal);
               {error, Reason}   -> socket_error(Reason),
                                    rabbit_net:fast_close(RealSocket),
                                    exit(normal)
           end,
    {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
    InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE),
    %% the timer is never cancelled; handle_other/2 ignores the message once
    %% the connection is running or stopping
    erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
    {PeerHost, PeerPort, Host, Port} =
        socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
    ?store_proc_name(Name),
    State = #v1{parent              = Parent,
                sock                = RealSocket,
                connection          = #connection{
                  name               = Name,
                  log_name           = Name,
                  host               = Host,
                  peer_host          = PeerHost,
                  port               = Port,
                  peer_port          = PeerPort,
                  protocol           = none,
                  user               = none,
                  %% '/' always yields a float, so this is a float here
                  timeout_sec        = (HandshakeTimeout / 1000),
                  frame_max          = InitialFrameMax,
                  vhost              = none,
                  client_properties  = none,
                  capabilities       = [],
                  auth_mechanism     = none,
                  auth_state         = none,
                  %% 'millisecond' replaces the deprecated
                  %% 'milli_seconds' time unit; same value
                  connected_at       = os:system_time(millisecond)},
                callback            = uninitialized_callback,
                recv_len            = 0,
                pending_recv        = false,
                connection_state    = pre_init,
                queue_collector     = undefined,  %% started on tune-ok
                helper_sup          = HelperSup,
                heartbeater         = none,
                channel_sup_sup_pid = none,
                channel_count       = 0,
                throttle            = #throttle{
                                         last_blocked_at = never,
                                         should_block = false,
                                         blocked_by = sets:new(),
                                         connection_blocked_message_sent = false
                                         },
                proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
    try
        case run({?MODULE, recvloop,
                  [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
                                                 State, #v1.stats_timer),
                                               handshake, 8)]}) of
            %% connection was closed cleanly by the client
            #v1{connection = #connection{user  = #user{username = Username},
                                         vhost = VHost}} ->
                rabbit_log_connection:info("closing AMQP connection ~p (~s, vhost: '~s', user: '~s')",
                    [self(), dynamic_connection_name(Name), VHost, Username]);
            %% just to be more defensive
            _ ->
                rabbit_log_connection:info("closing AMQP connection ~p (~s)",
                    [self(), dynamic_connection_name(Name)])
            end
    catch
        %% only throws are handled here; errors and exits propagate
        Ex ->
          log_connection_exception(dynamic_connection_name(Name), Ex)
    after
        %% We don't call gen_tcp:close/1 here since it waits for
        %% pending output to be sent, which results in unnecessary
        %% delays. We could just terminate - the reader is the
        %% controlling process and hence its termination will close
        %% the socket. However, to keep the file_handle_cache
        %% accounting as accurate as possible we ought to close the
        %% socket w/o delay before termination.
        rabbit_net:fast_close(RealSocket),
        rabbit_networking:unregister_connection(self()),
        rabbit_core_metrics:connection_closed(self()),
        %% client properties and the user-provided name are read back from
        %% the process dictionary, if they were stored there
        ClientProperties = case get(client_properties) of
            undefined ->
                [];
            Properties ->
                Properties
        end,
        EventProperties = [{name, Name},
                           {pid, self()},
                           {node, node()},
                           {client_properties, ClientProperties}],
        EventProperties1 = case get(connection_user_provided_name) of
           undefined ->
               EventProperties;
           ConnectionUserProvidedName ->
               [{user_provided_name, ConnectionUserProvidedName} | EventProperties]
        end,
        rabbit_event:notify(connection_closed, EventProperties1)
    end,
    done.
405
%% Picks a log level for the thrown term Ex and logs it: a close with no
%% data received is debug, an abrupt close is a warning, the rest are errors.
log_connection_exception(Name, Ex) ->
    Level = case Ex of
                connection_closed_with_no_data_received -> debug;
                connection_closed_abruptly              -> warning;
                {connection_closed_abruptly, _State}    -> warning;
                _Other                                  -> error
            end,
    log_connection_exception(Level, Name, Ex).
414
%% Formats a connection-terminating exception and logs it at Severity.
%% Each clause recognises one exception shape; the last one prints the
%% raw term.

%% the client stopped sending heartbeats
log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
    Fmt = "closing AMQP connection ~p (~s):~nmissed heartbeats from client, timeout: ~ps",
    log_connection_exception_with_severity(Severity, Fmt,
                                           [self(), Name, TimeoutSec]);
%% abrupt close by a client whose user and vhost are known
log_connection_exception(Severity, Name,
                         {connection_closed_abruptly,
                          #v1{connection = #connection{
                                vhost = VHost,
                                user  = #user{username = Username}}}}) ->
    Fmt = "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~n"
          "client unexpectedly closed TCP connection",
    log_connection_exception_with_severity(Severity, Fmt,
                                           [self(), Name, VHost, Username]);
%% when client abruptly closes connection before connection.open/authentication/authorization
%% succeeded, don't log username and vhost as 'none'
log_connection_exception(Severity, Name, {connection_closed_abruptly, _State}) ->
    Fmt = "closing AMQP connection ~p (~s):~n"
          "client unexpectedly closed TCP connection",
    log_connection_exception_with_severity(Severity, Fmt, [self(), Name]);
%% failed connection.tune negotiations
log_connection_exception(Severity, Name,
                         {handshake_error, tuning, _Channel,
                          {exit, #amqp_error{explanation = Explanation},
                           _Method, _Stacktrace}}) ->
    Fmt = "closing AMQP connection ~p (~s):~n"
          "failed to negotiate connection parameters: ~s",
    log_connection_exception_with_severity(Severity, Fmt,
                                           [self(), Name, Explanation]);
%% old exception structure
log_connection_exception(Severity, Name, connection_closed_abruptly) ->
    Fmt = "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection",
    log_connection_exception_with_severity(Severity, Fmt, [self(), Name]);
%% anything else
log_connection_exception(Severity, Name, Ex) ->
    Fmt = "closing AMQP connection ~p (~s):~n~p",
    log_connection_exception_with_severity(Severity, Fmt, [self(), Name, Ex]).
450
%% Routes the message to the rabbit_log_connection function that matches
%% the level (debug, warning or error).
log_connection_exception_with_severity(Level, Format, FormatArgs) ->
    case Level of
        error   -> rabbit_log_connection:error(Format, FormatArgs);
        warning -> rabbit_log_connection:warning(Format, FormatArgs);
        debug   -> rabbit_log_connection:debug(Format, FormatArgs)
    end.
457
%% Applies the given {Module, Function, Args}. If the call throws
%% {become, NextMFA}, control is handed over by running NextMFA in turn.
run({Mod, Fun, Args}) ->
    try
        erlang:apply(Mod, Fun, Args)
    catch
        throw:{become, NextMFA} -> run(NextMFA)
    end.
462
%% Consumes buffered input and decides when more must be read.
%%
%% Buf is a list of received binaries, most recently received first
%% (mainloop/4 conses new data onto it); BufLen is their total byte size.
%% Clause order matters:
%%   1. data has already been requested from the socket: wait in mainloop/4
%%   2. connection is blocked: do not read from the socket, but keep
%%      handling other messages in mainloop/4
%%   3. state is {become, F}: throw {become, F(...)} for run/1 to pick up
%%   4. fewer than recv_len bytes buffered: ask for more with {active, once}
%%   5. a single buffered binary: hand it to handle_input/3 directly and
%%      keep whatever it returns as the new buffer
%%   6. several binaries: cut exactly recv_len bytes off the oldest end,
%%      which handle_input/3 must consume completely
recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
    throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
  when BufLen < RecvLen ->
    case rabbit_net:setopts(Sock, [{active, once}]) of
        ok              -> mainloop(Deb, Buf, BufLen,
                                    State#v1{pending_recv = true});
        {error, Reason} -> stop(Reason, State)
    end;
recvloop(Deb, [B], _BufLen, State) ->
    {Rest, State1} = handle_input(State#v1.callback, B, State),
    %% byte_size/1 rather than the polymorphic size/1: Rest is a binary
    recvloop(Deb, [Rest], byte_size(Rest), State1);
recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
    {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
    Data = list_to_binary(lists:reverse(DataLRev)),
    {<<>>, State1} = handle_input(State#v1.callback, Data, State),
    recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
484
%% Moves the first Len bytes of the binary list L onto the accumulator Acc.
%%
%% Whole binaries are moved from the front of L onto Acc until Len bytes
%% have been taken. If the last binary moved overshoots, it is split: its
%% first -Len bytes go back onto the front of the remaining list and only
%% its tail stays on Acc.
%%
%% Returns {Remaining, Taken}; Taken holds the moved binaries in reverse
%% order of L. recvloop/4 calls this with the buffer (newest data first) and
%% the number of surplus bytes, so Remaining is the data to process now.
binlist_split(0, L, Acc) ->
    {L, Acc};
binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
    {H, T} = split_binary(Acc0, -Len),
    {[H|L], [T|Acc]};
binlist_split(Len, [H|T], Acc) ->
    %% byte_size/1 rather than the polymorphic size/1: H is a binary
    binlist_split(Len - byte_size(H), T, [H|Acc]).
492
%% Waits for the next event (socket data, socket close/error, or any other
%% message) via rabbit_net:recv/1 and acts on it.
%%
%% Socket data is prepended to Buf and handed back to recvloop/4. System
%% messages go to sys:handle_system_msg/6. All other messages are dispatched
%% to handle_other/2. Returns the state when the connection ends normally;
%% abnormal endings leave through stop/2, which throws.
-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any().

mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
                                       connection_state = CS,
                                       connection = #connection{
                                         name = ConnName}}) ->
    Recv = rabbit_net:recv(Sock),
    case CS of
        pre_init when Buf =:= [] ->
            %% We only log incoming connections when either the
            %% first byte was received or there was an error (eg. a
            %% timeout).
            %%
            %% The goal is to not log TCP healthchecks (a connection
            %% with no data received) unless specified otherwise.
            Fmt = "accepting AMQP connection ~p (~s)",
            Args = [self(), ConnName],
            case Recv of
                closed -> rabbit_log_connection:debug(Fmt, Args);
                _      -> rabbit_log_connection:info(Fmt, Args)
            end;
        _ ->
            ok
    end,
    case Recv of
        {data, Data} ->
            %% byte_size/1 rather than the polymorphic size/1: Data is a
            %% binary (see the [binary()] buffer type in the spec)
            recvloop(Deb, [Data | Buf], BufLen + byte_size(Data),
                     State#v1{pending_recv = false});
        %% the socket closed after we had already closed the connection
        closed when CS =:= closed ->
            State;
        closed when CS =:= pre_init andalso Buf =:= [] ->
            stop(tcp_healthcheck, State);
        closed ->
            stop(closed, State);
        {other, {heartbeat_send_error, Reason}} ->
            %% The only portable way to detect disconnect on blocked
            %% connection is to wait for heartbeat send failure.
            stop(Reason, State);
        {error, Reason} ->
            stop(Reason, State);
        {other, {system, From, Request}} ->
            sys:handle_system_msg(Request, From, State#v1.parent,
                                  ?MODULE, Deb, {Buf, BufLen, State});
        {other, Other}  ->
            case handle_other(Other, State) of
                stop     -> State;
                NewState -> recvloop(Deb, Buf, BufLen, NewState)
            end
    end.
542
%% Emits pending stats and leaves the receive loop by throwing a term that
%% describes why the connection ended:
%%   tcp_healthcheck -> connection_closed_with_no_data_received
%%                      (closed before any packet was received, probably a
%%                      load-balancer healthcheck: not considered a failure)
%%   closed          -> {connection_closed_abruptly, State}
%%   anything else   -> {inet_error, Reason}
-spec stop(_, #v1{}) -> no_return().
stop(Reason, State) ->
    maybe_emit_stats(State),
    throw(case Reason of
              tcp_healthcheck -> connection_closed_with_no_data_received;
              closed          -> {connection_closed_abruptly, State};
              _               -> {inet_error, Reason}
          end).
556
%% Handles every message that is neither socket data nor a system message.
%% Returns the next state, or the atom 'stop' when the connection should end
%% normally; several clauses leave by throwing or exiting instead.

%% a resource alarm was set or cleared for Source
handle_other({conserve_resources, Source, Conserve},
             State = #v1{throttle = Throttle0 = #throttle{blocked_by = BlockedBy0}}) ->
    Blocker = {resource, Source},
    BlockedBy1 = case Conserve of
                     true  -> sets:add_element(Blocker, BlockedBy0);
                     false -> sets:del_element(Blocker, BlockedBy0)
                 end,
    Throttle1 = Throttle0#throttle{blocked_by = BlockedBy1},
    control_throttle(State#v1{throttle = Throttle1});
%% a channel announces that it is closing
handle_other({channel_closing, ChannelPid}, State0) ->
    ok = rabbit_channel:ready_for_close(ChannelPid),
    {_Channel, State1} = channel_cleanup(ChannelPid, State0),
    State2 = control_throttle(State1),
    maybe_close(State2);
handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
    %% rabbitmq/rabbitmq-server#544
    %% The connection port process has exited due to the TCP socket being closed.
    %% Handle this case in the same manner as receiving {error, closed}
    stop(closed, State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
    Explanation = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
    terminate(Explanation, State),
    %% this is what we are expected to do according to
    %% https://www.erlang.org/doc/man/sys.html
    %%
    %% If we wanted to be *really* nice we should wait for a while for
    %% clients to close the socket at their end, just as we do in the
    %% ordinary error case. However, since this termination is
    %% initiated by our parent it is probably more important to exit
    %% quickly.
    maybe_emit_stats(State),
    exit(Reason);
%% a channel's writer failed to send: rethrow that error
handle_other({channel_exit, _Channel, Error = {writer, send_failed, _Detail}},
             State) ->
    maybe_emit_stats(State),
    throw(Error);
handle_other({channel_exit, Channel, Reason}, State) ->
    handle_exception(State, Channel, Reason);
%% a monitored process went down
handle_other({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
    handle_dependent_exit(DownPid, Reason, State);
handle_other(terminate_connection, State) ->
    maybe_emit_stats(State),
    stop;
%% the handshake timer fired after the handshake had completed (or while
%% the connection is already going away): nothing to do
handle_other(handshake_timeout, State)
  when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
    State;
handle_other(handshake_timeout, State) ->
    maybe_emit_stats(State),
    throw({handshake_timeout, State#v1.callback});
%% heartbeat timeouts no longer matter once the connection is closed
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
    State;
handle_other(heartbeat_timeout,
             State = #v1{connection = #connection{timeout_sec = TimeoutSec}}) ->
    maybe_emit_stats(State),
    throw({heartbeat_timeout, TimeoutSec});
%% shutdown/2
handle_other({'$gen_call', From, {shutdown, Explanation}}, State0) ->
    {Kind, State1} = terminate(Explanation, State0),
    gen_server:reply(From, ok),
    case Kind of
        normal -> State1;
        force  -> stop
    end;
%% info/1
handle_other({'$gen_call', From, info}, State) ->
    Infos = infos(?INFO_KEYS, State),
    gen_server:reply(From, Infos),
    State;
%% info/2: a thrown term is reported to the caller as {error, Term}
handle_other({'$gen_call', From, {info, Items}}, State) ->
    Reply = try
                {ok, infos(Items, State)}
            catch
                Error -> {error, Error}
            end,
    gen_server:reply(From, Reply),
    State;
%% force_event_refresh/2
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
  when ?IS_RUNNING(State) ->
    CreationInfos = infos(?CREATION_EVENT_KEYS, State),
    EventProps = augment_infos_with_user_provided_connection_name(
                   [{type, network} | CreationInfos], State),
    rabbit_event:notify(connection_created, EventProps, Ref),
    rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
    %% Ignore, we will emit a created event once we start running.
    State;
handle_other(ensure_stats, State) ->
    ensure_stats_timer(State);
handle_other(emit_stats, State) ->
    emit_stats(State);
handle_other({bump_credit, CreditMsg}, State) ->
    %% Here we are receiving credit by some channel process.
    credit_flow:handle_bump_msg(CreditMsg),
    control_throttle(State);
handle_other(Unexpected, State) ->
    %% internal error -> something worth dying for
    maybe_emit_stats(State),
    exit({unexpected_message, Unexpected}).
647
%% Sets the callback used for the next input and the number of bytes that
%% must be buffered before it is invoked.
switch_callback(State, Callback, Length) ->
    State#v1{recv_len = Length,
             callback = Callback}.
650
%% Starts closing the connection on behalf of the broker.
%% A running (or blocked) connection goes through handle_exception/3 with a
%% connection_forced error on channel 0 and yields {normal, NewState}. In
%% any other state the result is {force, State}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
    ForcedError = rabbit_misc:amqp_error(connection_forced, "~s",
                                         [Explanation], none),
    {normal, handle_exception(State, 0, ForcedError)};
terminate(_Explanation, State) ->
    {force, State}.
657
%% Sends connection.blocked with Reason on channel 0, but only to clients
%% whose capabilities table has "connection.blocked" set to true.
send_blocked(#v1{sock       = Sock,
                 connection = #connection{capabilities = Capabilities,
                                          protocol     = Protocol}},
             Reason) ->
    Supported = rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>),
    case Supported of
        {bool, true} ->
            Method = #'connection.blocked'{reason = Reason},
            ok = send_on_channel0(Sock, Method, Protocol);
        _ ->
            ok
    end.
669
%% Sends connection.unblocked on channel 0, but only to clients whose
%% capabilities table has "connection.blocked" set to true.
send_unblocked(#v1{sock       = Sock,
                   connection = #connection{capabilities = Capabilities,
                                            protocol     = Protocol}}) ->
    Supported = rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>),
    case Supported of
        {bool, true} ->
            Method = #'connection.unblocked'{},
            ok = send_on_channel0(Sock, Method, Protocol);
        _ ->
            ok
    end.
679
680%%--------------------------------------------------------------------------
681%% error handling / termination
682
%% Marks the connection as closed: deletes its exclusive queues and
%% schedules a terminate_connection message to this process.
%% Returns the state with connection_state set to 'closed'.
close_connection(State = #v1{queue_collector = Collector,
                             connection = #connection{
                               timeout_sec = TimeoutSec}}) ->
    %% The spec says "Exclusive queues may only be accessed by the
    %% current connection, and are deleted when that connection
    %% closes."  This does not strictly imply synchrony, but in
    %% practice it seems to be what people assume.
    clean_up_exclusive_queues(Collector),
    %% We terminate the connection after the specified interval, but
    %% no later than ?CLOSING_TIMEOUT seconds.
    DelaySec = if TimeoutSec > 0 andalso
                  TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
                  true                          -> ?CLOSING_TIMEOUT
               end,
    %% timeout_sec is initialised as HandshakeTimeout / 1000 in
    %% start_connection/4, which is a float, while erlang:send_after/3
    %% only accepts an integer number of milliseconds. Rounding leaves
    %% integer timeouts unchanged.
    erlang:send_after(round(DelaySec * 1000), self(), terminate_connection),
    State#v1{connection_state = closed}.
698
%% Deletes all exclusive queues tracked by the given queue collector.
%% The collector is undefined when connection tuning was never performed
%% or did not finish; there is nothing to clean up in that case.
clean_up_exclusive_queues(Collector) ->
    case Collector of
        undefined -> ok;
        _         -> rabbit_queue_collector:delete_all(Collector)
    end.
707
%% Reacts to the death of a monitored process.
%% The pid is removed from the channel bookkeeping first. What happens next
%% depends on whether it was a known channel and whether the exit reason
%% counts as controlled (see termination_kind/1):
%%   unknown pid, controlled   -> nothing else to do
%%   known channel, controlled -> re-evaluate throttling, maybe close
%%   unknown pid, uncontrolled -> exit with abnormal_dependent_exit
%%   known channel, uncontrolled -> treat as a channel exception
handle_dependent_exit(ChPid, Reason, State) ->
    {Channel, State1} = channel_cleanup(ChPid, State),
    case {Channel, termination_kind(Reason)} of
        {undefined, controlled} ->
            State1;
        {_, controlled} ->
            maybe_close(control_throttle(State1));
        {undefined, uncontrolled} ->
            handle_uncontrolled_channel_close(ChPid),
            exit({abnormal_dependent_exit, ChPid, Reason});
        {_, uncontrolled} ->
            handle_uncontrolled_channel_close(ChPid),
            State2 = handle_exception(State1, Channel, Reason),
            maybe_close(control_throttle(State2))
    end.
721
%% Shuts down every channel of this connection and waits for them to go
%% down. The wait is bounded by ?CHANNEL_TERMINATION_TIMEOUT seconds per
%% channel, enforced with a `cancel_wait' message sent to this process.
terminate_channels(State = #v1{channel_count = 0}) ->
    %% No channels are open, so there is nothing to wait for.
    State;
terminate_channels(State = #v1{channel_count = Count}) ->
    lists:foreach(fun (ChPid) -> rabbit_channel:shutdown(ChPid) end,
                  all_channels()),
    WaitMs = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * Count,
    Timer  = erlang:send_after(WaitMs, self(), cancel_wait),
    wait_for_channel_termination(Count, Timer, State).
729
%% Blocks until N more channel processes have gone down, then cancels the
%% `cancel_wait' timer referenced by TimerRef and returns the updated state.
%%
%% Exits the reader with:
%%  * `channel_termination_timeout' when `cancel_wait' arrives while
%%    channels are still outstanding;
%%  * `{abnormal_dependent_exit, ChPid, Reason}' when a 'DOWN' message
%%    arrives for a pid that is not a tracked channel;
%%  * `normal' when an 'EXIT' from the socket is received (after dropping
%%    the bookkeeping of all remaining channels).
wait_for_channel_termination(0, TimerRef, State) ->
    case erlang:cancel_timer(TimerRef) of
        %% `false' means the timer was no longer active, so consume the
        %% `cancel_wait' message it produced before returning.
        false -> receive
                     cancel_wait -> State
                 end;
        _     -> State
    end;
wait_for_channel_termination(N, TimerRef,
                             State = #v1{connection_state = CS,
                                         connection = #connection{
                                                         log_name  = ConnName,
                                                         user      = User,
                                                         vhost     = VHost},
                                         sock = Sock}) ->
    receive
        {'DOWN', _MRef, process, ChPid, Reason} ->
            {Channel, State1} = channel_cleanup(ChPid, State),
            case {Channel, termination_kind(Reason)} of
                {undefined,    _} ->
                    exit({abnormal_dependent_exit, ChPid, Reason});
                {_,   controlled} ->
                    wait_for_channel_termination(N-1, TimerRef, State1);
                {_, uncontrolled} ->
                    %% The three literals below are concatenated into one
                    %% format string; the trailing space after "channel ~p:"
                    %% keeps the message from reading "channel 1:error ...".
                    rabbit_log_connection:error(
                        "Error on AMQP connection ~p (~s, vhost: '~s',"
                        " user: '~s', state: ~p), channel ~p: "
                        "error while terminating:~n~p",
                        [self(), ConnName, VHost, User#user.username,
                         CS, Channel, Reason]),
                    handle_uncontrolled_channel_close(ChPid),
                    wait_for_channel_termination(N-1, TimerRef, State1)
            end;
        {'EXIT', Sock, _Reason} ->
            clean_up_all_channels(State),
            exit(normal);
        cancel_wait ->
            exit(channel_termination_timeout)
    end.
768
%% Finishes a client-initiated close once the last channel is gone: when the
%% connection is `closing' and has no channels left, it is closed and
%% connection.close_ok is sent on channel 0. In every other situation the
%% state is returned untouched.
maybe_close(State0 = #v1{connection_state = closing,
                         channel_count    = 0,
                         sock             = Sock,
                         connection       = #connection{protocol = Protocol}}) ->
    State1 = close_connection(State0),
    ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
    State1;
maybe_close(State0) ->
    State0.
778
%% Classifies an exit reason: only `normal' counts as a controlled
%% termination, everything else is uncontrolled.
termination_kind(Reason) ->
    case Reason of
        normal -> controlled;
        _      -> uncontrolled
    end.
781
%% Renders an error reason as printable text for log_hard_error/3.
%%
%% #amqp_error{} records are described by method, error name and
%% explanation. Other reasons that already are (deep) character lists are
%% returned as they are; anything else is formatted with ~p.
format_hard_error(#amqp_error{name        = Name,
                              explanation = Explanation,
                              method      = Method}) ->
    io_lib:format("operation ~s caused a connection exception ~s: ~p",
                  [Method, Name, Explanation]);
format_hard_error(Reason) ->
    case io_lib:deep_char_list(Reason) of
        false -> rabbit_misc:format("~p", [Reason]);
        true  -> Reason
    end.
789
%% Writes an error-level log entry for a failure on Channel, identifying the
%% connection by pid, log name, vhost, user name and connection state.
log_hard_error(#v1{connection_state = ConnState,
                   connection       = #connection{log_name = ConnName,
                                                  user     = User,
                                                  vhost    = VHost}},
               Channel, Reason) ->
    Username = User#user.username,
    Details  = format_hard_error(Reason),
    rabbit_log_connection:error(
        "Error on AMQP connection ~p (~s, vhost: '~s',"
        " user: '~s', state: ~p), channel ~p:~n ~s",
        [self(), ConnName, VHost, Username, ConnState, Channel, Details]).
799
%% Reacts to an error raised while serving Channel. What happens depends on
%% the connection state and, during the handshake, on the kind of error.

%% Already closed: the error is only logged.
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
    log_hard_error(State, Channel, Reason),
    State;
%% Running (as defined by ?IS_RUNNING) or closing: respond and close.
handle_exception(State = #v1{connection_state = ConnState,
                             connection       = #connection{
                                                   protocol = Protocol}},
                 Channel, Reason)
  when ?IS_RUNNING(State) orelse ConnState =:= closing ->
    respond_and_close(State, Channel, Protocol, Reason, Reason);
%% Authentication failure while `starting'.
handle_exception(State = #v1{connection_state = starting,
                             connection       = #connection{
                                                   protocol     = Protocol,
                                                   log_name     = ConnName,
                                                   capabilities = Capabilities}},
                 Channel,
                 Reason = #amqp_error{name        = access_refused,
                                      explanation = ErrMsg}) ->
    rabbit_log_connection:error(
        "Error on AMQP connection ~p (~s, state: ~p):~n~s",
        [self(), ConnName, starting, ErrMsg]),
    %% The error is sent to the client only when it advertised the
    %% authentication_failure_close capability.
    AuthFailureClose = rabbit_misc:table_lookup(
                         Capabilities, <<"authentication_failure_close">>),
    case AuthFailureClose of
        {bool, true} ->
            send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
        _ ->
            close_connection(terminate_channels(State))
    end;
%% `not_allowed' while `opening': a loopback-only user connecting from a
%% non-local host, or a user trying to access a vhost it has no permissions
%% for.
handle_exception(State = #v1{connection_state = opening,
                             connection       = #connection{
                                                   protocol = Protocol,
                                                   log_name = ConnName,
                                                   user     = User}},
                 Channel,
                 Reason = #amqp_error{name        = not_allowed,
                                      explanation = ErrMsg}) ->
    rabbit_log_connection:error(
        "Error on AMQP connection ~p (~s, user: '~s', state: ~p):~n~s",
        [self(), ConnName, User#user.username, opening, ErrMsg]),
    send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
%% Any other AMQP error while `opening'.
handle_exception(State = #v1{connection_state = ConnState = opening,
                             connection       = #connection{
                                                   protocol = Protocol}},
                 Channel, Reason = #amqp_error{}) ->
    respond_and_close(State, Channel, Protocol, Reason,
                      {handshake_error, ConnState, Reason});
%% Failed negotiation while `tuning', e.g. a channel_max higher than the
%% maximum allowed limit.
handle_exception(State = #v1{connection_state = tuning,
                             connection       = #connection{
                                                   protocol = Protocol,
                                                   log_name = ConnName,
                                                   user     = User}},
                 Channel,
                 Reason = #amqp_error{name        = not_allowed,
                                      explanation = ErrMsg}) ->
    rabbit_log_connection:error(
        "Error on AMQP connection ~p (~s,"
        " user: '~s', state: ~p):~n~s",
        [self(), ConnName, User#user.username, tuning, ErrMsg]),
    send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
%% Everything else.
handle_exception(State, Channel, Reason) ->
    %% The client is not trusted at this point: make it wait for a bit so
    %% it cannot DOS us with repeated failed logins etc.
    timer:sleep(?SILENT_CLOSE_DELAY * 1000),
    throw({handshake_error, State#v1.connection_state, Channel, Reason}).
861
%% Reports a frame error after which the byte stream can no longer be
%% trusted: we have "lost sync" with the client and must not accept any
%% more input, so this function never returns.
-spec fatal_frame_error(_, _, _, _, _) -> no_return().
fatal_frame_error(Error, Type, Channel, Payload, State) ->
    frame_error(Error, Type, Channel, Payload, State),
    %% Grace period that gives the error a chance to be transmitted.
    GraceMs = ?SILENT_CLOSE_DELAY * 1000,
    timer:sleep(GraceMs),
    throw(fatal_frame_error).
870
%% Builds a `frame_error' AMQP error that describes the offending frame
%% (type, a snippet of its payload and the underlying error) and hands it
%% to handle_exception/3.
frame_error(Error, Type, Channel, Payload, State) ->
    {Extent, Octets} = payload_snippet(Payload),
    AmqpError = rabbit_misc:amqp_error(frame_error,
                                       "type ~p, ~s octets = ~p: ~p",
                                       [Type, Extent, Octets, Error], none),
    handle_exception(State, Channel, AmqpError).
877
%% Builds an `unexpected_frame' AMQP error that describes the frame (type
%% and a snippet of its payload) and hands it to handle_exception/3.
unexpected_frame(Type, Channel, Payload, State) ->
    {Extent, Octets} = payload_snippet(Payload),
    AmqpError = rabbit_misc:amqp_error(unexpected_frame,
                                       "type ~p, ~s octets = ~p",
                                       [Type, Extent, Octets], none),
    handle_exception(State, Channel, AmqpError).
884
%% Returns at most the first 16 bytes of a frame payload for use in error
%% messages, together with a label saying how much of the payload that is:
%% {"all", Payload} for payloads of up to 16 bytes, {"first 16", Prefix}
%% for longer ones.
%%
%% byte_size/1 is used rather than the generic size/1 so the guard only
%% ever succeeds for binaries.
payload_snippet(Payload) when byte_size(Payload) =< 16 ->
    {"all", Payload};
payload_snippet(<<Snippet:16/binary, _/binary>>) ->
    {"first 16", Snippet}.
889
890%%--------------------------------------------------------------------------
891
%% Starts a channel process for channel number Channel and registers it in
%% the process dictionary under {ch_pid, ChPid} and {channel, Channel}.
%%
%% Returns {ok, {ChPid, AssemblerState}, NewState} on success, or
%% {error, AmqpError} when the negotiated channel_max or the per-user
%% channel limit has been reached.
create_channel(_Channel,
               #v1{channel_count = Opened,
                   connection    = #connection{channel_max = ChannelMax}})
  when ChannelMax /= 0 andalso Opened >= ChannelMax ->
    %% A channel_max of 0 never triggers this clause.
    {error, rabbit_misc:amqp_error(
              not_allowed, "number of channels opened (~w) has reached the "
              "negotiated channel_max (~w)",
              [Opened, ChannelMax], 'none')};
create_channel(Channel,
               State = #v1{sock                = Sock,
                           queue_collector     = Collector,
                           channel_sup_sup_pid = ChanSupSup,
                           channel_count       = Opened,
                           connection          =
                               #connection{name         = Name,
                                           protocol     = Protocol,
                                           frame_max    = FrameMax,
                                           vhost        = VHost,
                                           capabilities = Capabilities,
                                           user         = User = #user{
                                                            username = Username}}}) ->
    case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
        {true, Limit} ->
            {error, rabbit_misc:amqp_error(not_allowed,
                        "number of channels opened for user '~s' has reached "
                        "the maximum allowed user limit of (~w)",
                        [Username, Limit], 'none')};
        false ->
            StartArgs = {tcp, Sock, Channel, FrameMax, self(), Name,
                         Protocol, User, VHost, Capabilities, Collector},
            {ok, _ChSupPid, {ChPid, AState}} =
                rabbit_channel_sup_sup:start_channel(ChanSupSup, StartArgs),
            MRef = erlang:monitor(process, ChPid),
            %% Two-way mapping between the channel's pid and its number.
            put({ch_pid, ChPid}, {Channel, MRef}),
            put({channel, Channel}, {ChPid, AState}),
            {ok, {ChPid, AState}, State#v1{channel_count = Opened + 1}}
    end.
930
%% Forgets a channel process: notifies credit_flow, erases both process
%% dictionary entries, drops the monitor (flushing a pending 'DOWN') and
%% decrements the channel count.
%%
%% Returns {Channel, NewState}, or {undefined, State} when ChPid is not a
%% tracked channel.
channel_cleanup(ChPid, State = #v1{channel_count = Count}) ->
    PidKey = {ch_pid, ChPid},
    case get(PidKey) of
        {Channel, MRef} ->
            credit_flow:peer_down(ChPid),
            erase({channel, Channel}),
            erase(PidKey),
            erlang:demonitor(MRef, [flush]),
            {Channel, State#v1{channel_count = Count - 1}};
        undefined ->
            {undefined, State}
    end.
940
%% Lists the pids of all channels registered in the process dictionary,
%% i.e. the pid part of every {ch_pid, Pid} key.
all_channels() ->
    lists:filtermap(fun ({{ch_pid, ChPid}, _ChannelMRef}) -> {true, ChPid};
                        (_Other)                          -> false
                    end, get()).
942
%% Runs channel_cleanup/2 for every registered channel. The states returned
%% by the individual cleanups are discarded; the result is always `ok'.
clean_up_all_channels(State) ->
    _ = [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
    ok.
948
949%%--------------------------------------------------------------------------
950
%% Dispatches one complete frame according to its channel number and the
%% connection state.

%% Channel 0 while stopping: only methods are still processed, every other
%% frame is silently dropped.
handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_STOPPING(State) ->
    Analyzed = rabbit_command_assembler:analyze_frame(Type, Payload, Protocol),
    case Analyzed of
        {method, MethodName, FieldsBin} ->
            handle_method0(MethodName, FieldsBin, State);
        _Other ->
            State
    end;
%% Channel 0 otherwise: heartbeats and methods are accepted.
handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}}) ->
    Analyzed = rabbit_command_assembler:analyze_frame(Type, Payload, Protocol),
    case Analyzed of
        {method, MethodName, FieldsBin} ->
            handle_method0(MethodName, FieldsBin, State);
        heartbeat ->
            State;
        error ->
            frame_error(unknown_frame, Type, 0, Payload, State);
        _Other ->
            unexpected_frame(Type, 0, Payload, State)
    end;
%% Any other channel while running: heartbeats are not allowed here, the
%% remaining frames go to the channel.
handle_frame(Type, Channel, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_RUNNING(State) ->
    Analyzed = rabbit_command_assembler:analyze_frame(Type, Payload, Protocol),
    case Analyzed of
        error     -> frame_error(unknown_frame, Type, Channel, Payload, State);
        heartbeat -> unexpected_frame(Type, Channel, Payload, State);
        Frame     -> process_frame(Frame, Channel, State)
    end;
%% Any other channel while stopping: the frame is ignored.
handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
    State;
%% Neither running nor stopping: channel traffic is not expected yet.
handle_frame(Type, Channel, Payload, State) ->
    unexpected_frame(Type, Channel, Payload, State).
980
%% Feeds a frame into the command assembler of the given channel, creating
%% the channel first when it is not registered yet. Completed methods are
%% forwarded to the channel process; errors from channel creation or from
%% the assembler go to handle_exception/3.
process_frame(Frame, Channel, State) ->
    ChKey = {channel, Channel},
    Lookup = case get(ChKey) of
                 undefined -> create_channel(Channel, State);
                 Existing  -> {ok, Existing, State}
             end,
    case Lookup of
        {ok, {ChPid, AState}, State1} ->
            case rabbit_command_assembler:process(Frame, AState) of
                {ok, AState1} ->
                    %% Nothing complete yet; just remember the new
                    %% assembler state.
                    put(ChKey, {ChPid, AState1}),
                    post_process_frame(Frame, ChPid, State1);
                {ok, Method, AState1} ->
                    rabbit_channel:do(ChPid, Method),
                    put(ChKey, {ChPid, AState1}),
                    post_process_frame(Frame, ChPid, State1);
                {ok, Method, Content, AState1} ->
                    rabbit_channel:do_flow(ChPid, Method, Content),
                    put(ChKey, {ChPid, AState1}),
                    Throttled = control_throttle(State1),
                    post_process_frame(Frame, ChPid, Throttled);
                {error, Reason} ->
                    handle_exception(State1, Channel, Reason)
            end;
        {error, Error} ->
            handle_exception(State, Channel, Error)
    end.
1006
%% Bookkeeping performed after a frame has been handed to its channel.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State0) ->
    {_Channel, State1} = channel_cleanup(ChPid, State0),
    %% Re-evaluating the throttle is not strictly necessary, but more
    %% obviously correct. maybe_close/1 is not needed here because we
    %% cannot possibly be in the 'closing' state.
    control_throttle(State1);
post_process_frame({content_header, _, _, _, _}, _ChPid, State0) ->
    publish_received(State0);
post_process_frame({content_body, _}, _ChPid, State0) ->
    publish_received(State0);
post_process_frame(_Frame, _ChPid, State0) ->
    State0.
1019
1020%%--------------------------------------------------------------------------
1021
%% Clients may exceed the frame size a little bit, since quite a few get it
%% wrong: being off by 1 or by 8 (the size of an empty frame) is typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).

%% Consumes the data requested by the current callback and returns
%% {RemainingData, NewState}.

%% Frame header announcing a payload beyond frame_max plus the fudge
%% (a frame_max of 0 disables the check).
handle_input(frame_header, <<Type:8, Channel:16, PayloadSize:32, _/binary>>,
             State = #v1{connection = #connection{frame_max = FrameMax}})
  when FrameMax /= 0 andalso
       PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
    MaxPayload = FrameMax - ?EMPTY_FRAME_SIZE,
    fatal_frame_error({frame_too_large, PayloadSize, MaxPayload},
                      Type, Channel, <<>>, State);
%% Header, payload and end marker are all available: handle the frame now.
handle_input(frame_header, <<Type:8, Channel:16, PayloadSize:32,
                             Payload:PayloadSize/binary, ?FRAME_END,
                             Rest/binary>>,
             State) ->
    State1 = handle_frame(Type, Channel, Payload, State),
    {Rest, ensure_stats_timer(State1)};
%% Only the header is usable: ask for the payload plus one more byte for
%% the end marker.
handle_input(frame_header, <<Type:8, Channel:16, PayloadSize:32, Rest/binary>>,
             State) ->
    State1 = switch_callback(State,
                             {frame_payload, Type, Channel, PayloadSize},
                             PayloadSize + 1),
    {Rest, ensure_stats_timer(State1)};
%% Payload of a frame whose header was handled earlier.
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
    <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
    case EndMarker of
        ?FRAME_END ->
            State1 = handle_frame(Type, Channel, Payload, State),
            %% The next thing to read is a 7 byte frame header.
            {Rest, switch_callback(State1, frame_header, 7)};
        _ ->
            fatal_frame_error({invalid_frame_end_marker, EndMarker},
                              Type, Channel, Payload, State)
    end;
%% Protocol header starting with "AMQP".
handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
    {Rest, handshake({A, B, C, D}, State)};
%% Any other 8 bytes during the handshake.
handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
    refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
    throw({bad_input, Callback, Data}).
1058
%% Version negotiation follows two rules:
%%
%% * A server that cannot support the protocol named in the protocol
%%   header MUST respond with a valid protocol header and then close the
%%   socket connection.
%%
%% * The protocol version provided by the server MUST be lower than or
%%   equal to the one the client requested in the protocol header.

%% AMQP 0-9-1.
handshake({0, 0, 9, 1}, S) ->
    start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, S);

%% Protocol header for 0-9, which can safely be treated as though it
%% were 0-9-1.
handshake({1, 1, 0, 9}, S) ->
    start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, S);

%% What most clients send for 0-8. Confusingly, the 0-8 spec defines the
%% version as 8-0.
handshake({1, 1, 8, 0}, S) ->
    start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, S);

%% The 0-8 spec as published on the AMQP web site actually has this as
%% the protocol header; some libraries, e.g. py-amqplib, send it when
%% they want 0-8.
handshake({1, 1, 9, 1}, S) ->
    start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, S);

%% ... and finally, the 1.0 spec is crystal clear!
handshake({ProtocolId, 1, 0, 0}, S) ->
    become_1_0(ProtocolId, S);

%% Anything else is refused.
handshake(Version, #v1{sock = Sock}) ->
    refuse_connection(Sock, {bad_version, Version}).
1092
%% Offers a protocol version to the client by sending connection.start and
%% moves the connection to the `starting' state.
%%
%% connection.start carries only a major and a minor version number.
%% Luckily, 0-9 and 0-9-1 are similar enough that clients will be happy
%% with either.
start_connection({Major, Minor, _Revision}, Protocol,
                 State = #v1{sock = Sock, connection = Conn0}) ->
    rabbit_networking:register_connection(self()),
    StartMethod = #'connection.start'{
                     version_major     = Major,
                     version_minor     = Minor,
                     server_properties = server_properties(Protocol),
                     mechanisms        = auth_mechanisms_binary(Sock),
                     locales           = <<"en_US">>},
    ok = send_on_channel0(Sock, StartMethod, Protocol),
    Conn1  = Conn0#connection{timeout_sec = ?NORMAL_TIMEOUT,
                              protocol    = Protocol},
    State1 = State#v1{connection       = Conn1,
                      connection_state = starting},
    %% The next thing to read is a 7 byte frame header.
    switch_callback(State1, frame_header, 7).
1112
%% Sends the protocol header "AMQP" followed by the four given version
%% bytes to the client and then throws Exception. Never returns.
-spec refuse_connection(_, _, _) -> no_return().
refuse_connection(Sock, Exception, {A, B, C, D}) ->
    SendHeader = fun () -> rabbit_net:send(Sock, <<"AMQP", A, B, C, D>>) end,
    ok = inet_op(SendHeader),
    throw(Exception).
1117
%% Refuses the connection by answering with the header bytes {0, 0, 9, 1}
%% and throwing Exception. Never returns.
-spec refuse_connection(rabbit_net:socket(), any()) -> no_return().

refuse_connection(Sock, Exception) ->
    SupportedHeader = {0, 0, 9, 1},
    refuse_connection(Sock, Exception, SupportedHeader).
1122
%% Ensures the stats timer (which triggers `emit_stats') is set up, but only
%% for connections in the `running' state; in any other state the argument
%% is returned unchanged.
ensure_stats_timer(Running = #v1{connection_state = running}) ->
    rabbit_event:ensure_stats_timer(Running, #v1.stats_timer, emit_stats);
ensure_stats_timer(Other) ->
    Other.
1127
1128%%--------------------------------------------------------------------------
1129
%% Decodes the fields of a channel 0 method and dispatches it to
%% handle_method0/2, translating failures:
%%
%%  * an inet_error throw with reason `closed' or `enotconn' emits stats
%%    and is rethrown as {connection_closed_abruptly, State};
%%  * an #amqp_error{} exit without a method gets MethodName filled in and
%%    goes to handle_exception/3;
%%  * any other exception goes to handle_exception/3 along with its class,
%%    the method name and the stack trace.
handle_method0(MethodName, FieldsBin,
               State = #v1{connection = #connection{protocol = Protocol}}) ->
    try
        handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
                       State)
    catch
        throw:{inet_error, InetReason} when InetReason =:= closed;
                                            InetReason =:= enotconn ->
            maybe_emit_stats(State),
            throw({connection_closed_abruptly, State});
        exit:#amqp_error{method = none} = AmqpError ->
            handle_exception(State, 0,
                             AmqpError#amqp_error{method = MethodName});
        Class:Error:Stacktrace ->
            handle_exception(State, 0,
                             {Class, Error, MethodName, Stacktrace})
    end.
1143
%% Handlers for decoded connection-level (channel 0) methods, selected by
%% the method record together with the current connection state.

%% connection.start_ok while `starting': records the client's properties
%% and capabilities, resolves the requested auth mechanism, moves to
%% `securing' and runs the first authentication round.
handle_method0(#'connection.start_ok'{mechanism         = MechanismName,
                                      response          = Response,
                                      client_properties = ClientProps},
               State0 = #v1{connection_state = starting,
                            connection       = Conn0,
                            sock             = Sock}) ->
    MechanismMod = auth_mechanism_to_module(MechanismName, Sock),
    Capabilities =
        case rabbit_misc:table_lookup(ClientProps, <<"capabilities">>) of
            {table, Table} -> Table;
            _              -> []
        end,
    AuthState = MechanismMod:init(Sock),
    Conn1 = augment_connection_log_name(
              Conn0#connection{
                client_properties = ClientProps,
                capabilities      = Capabilities,
                auth_mechanism    = {MechanismName, MechanismMod},
                auth_state        = AuthState}),
    %% The client properties go into the process dictionary so that they
    %% can be sent later in the connection_closed event.
    put(client_properties, ClientProps),
    case user_provided_connection_name(Conn1) of
        undefined    -> undefined;
        ProvidedName -> put(connection_user_provided_name, ProvidedName)
    end,
    auth_phase(Response, State0#v1{connection_state = securing,
                                   connection       = Conn1});

%% connection.secure_ok while `securing': next authentication round.
handle_method0(#'connection.secure_ok'{response = Response},
               State = #v1{connection_state = securing}) ->
    auth_phase(Response, State);

%% connection.tune_ok while `tuning': validates the negotiated limits,
%% starts the queue collector and the heartbeater, and moves to `opening'.
handle_method0(#'connection.tune_ok'{frame_max   = FrameMax,
                                     channel_max = ChannelMax,
                                     heartbeat   = ClientHeartbeat},
               State = #v1{connection_state = tuning,
                           connection       = Conn,
                           helper_sup       = SupPid,
                           sock             = Sock}) ->
    ok = validate_negotiated_integer_value(
           frame_max, ?FRAME_MIN_SIZE, FrameMax),
    ok = validate_negotiated_integer_value(
           channel_max, ?CHANNEL_MIN, ChannelMax),
    ConnName = Conn#connection.name,
    {ok, Collector} =
        rabbit_connection_helper_sup:start_queue_collector(SupPid, ConnName),
    HeartbeatFrame = rabbit_binary_generator:build_heartbeat_frame(),
    Reader = self(),
    %% Sends one heartbeat frame; anything but `ok' is reported back to
    %% the reader as a heartbeat_send_error message.
    SendFun =
        fun () ->
                case catch rabbit_net:send(Sock, HeartbeatFrame) of
                    ok ->
                        ok;
                    {error, Reason} ->
                        Reader ! {heartbeat_send_error, Reason};
                    Unexpected ->
                        Reader ! {heartbeat_send_error, Unexpected}
                end,
                ok
        end,
    ReceiveFun = fun () -> Reader ! heartbeat_timeout end,
    Heartbeater = rabbit_heartbeat:start(
                    SupPid, Sock, ConnName,
                    ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
    TunedConn = Conn#connection{frame_max   = FrameMax,
                                channel_max = ChannelMax,
                                timeout_sec = ClientHeartbeat},
    State#v1{connection_state = opening,
             connection       = TunedConn,
             queue_collector  = Collector,
             heartbeater      = Heartbeater};

%% connection.open while `opening': enforces connection limits and vhost
%% access, replies with connection.open_ok, registers for resource alarms,
%% starts the channel supervisor and announces the new connection.
handle_method0(#'connection.open'{virtual_host = VHost},
               State = #v1{connection_state = opening,
                           connection       = Conn = #connection{
                                                 log_name = ConnName,
                                                 user     = User = #user{
                                                              username = Username},
                                                 protocol = Protocol},
                           helper_sup       = SupPid,
                           sock             = Sock,
                           throttle         = Throttle}) ->
    ok = is_over_vhost_connection_limit(VHost, User),
    ok = is_over_user_connection_limit(User),
    ok = rabbit_access_control:check_vhost_access(User, VHost,
                                                  {socket, Sock}, #{}),
    ok = is_vhost_alive(VHost, User),
    ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),

    %% Alarms returned by the registration become the initial set of
    %% reasons this connection is blocked by.
    ActiveAlarms = rabbit_alarm:register(self(),
                                         {?MODULE, conserve_resources, []}),
    BlockedBy = sets:from_list([{resource, A} || A <- ActiveAlarms]),

    {ok, ChannelSupSupPid} =
        rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
    State1 = control_throttle(
               State#v1{connection_state    = running,
                        connection          = Conn#connection{vhost = VHost},
                        channel_sup_sup_pid = ChannelSupSupPid,
                        throttle            = Throttle#throttle{
                                                blocked_by = BlockedBy}}),
    CreationInfos = [{type, network} | infos(?CREATION_EVENT_KEYS, State1)],
    Infos = augment_infos_with_user_provided_connection_name(CreationInfos,
                                                             State1),
    rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos),
                                           Infos),
    rabbit_event:notify(connection_created, Infos),
    maybe_emit_stats(State1),
    rabbit_log_connection:info(
        "connection ~p (~s): "
        "user '~s' authenticated and granted access to vhost '~s'",
        [self(), dynamic_connection_name(ConnName), Username, VHost]),
    State1;

%% connection.close while running: asks every channel to shut down and
%% enters `closing'.
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
    lists:foreach(fun (ChPid) -> rabbit_channel:shutdown(ChPid) end,
                  all_channels()),
    maybe_close(State#v1{connection_state = closing});

%% connection.close while stopping: we are already closed or closing, so
%% nothing needs cleaning up; just acknowledge.
handle_method0(#'connection.close'{},
               State = #v1{connection = #connection{protocol = Protocol},
                           sock       = Sock})
  when ?IS_STOPPING(State) ->
    ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
    State;

%% connection.close_ok while `closed': terminate right away.
handle_method0(#'connection.close_ok'{},
               State = #v1{connection_state = closed}) ->
    self() ! terminate_connection,
    State;

%% connection.update_secret while running: refreshes the user's auth
%% backend state and pushes the updated user to every channel.
handle_method0(#'connection.update_secret'{new_secret = NewSecret,
                                           reason     = Reason},
               State = #v1{connection =
                               #connection{protocol = Protocol,
                                           user     = User = #user{
                                                        username = Username},
                                           log_name = ConnName} = Conn,
                           sock       = Sock}) when ?IS_RUNNING(State) ->
    rabbit_log_connection:debug(
        "connection ~p (~s) of user '~s': "
        "asked to update secret, reason: ~s",
        [self(), dynamic_connection_name(ConnName), Username, Reason]),
    case rabbit_access_control:update_state(User, NewSecret) of
        {ok, UpdatedUser} ->
            %% The user/auth backend state has been updated, so it can now
            %% be propagated to the channels asynchronously. All a channel
            %% has to do is update its own state.
            %%
            %% Secret update errors coming from the authz backend are
            %% handled in the other branches, hence the optimistic lack of
            %% error handling here.
            PropagateFun =
                fun (Ch) ->
                        rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]),
                        _ = rabbit_channel:update_user_state(Ch, UpdatedUser)
                end,
            lists:foreach(PropagateFun, all_channels()),
            ok = send_on_channel0(Sock, #'connection.update_secret_ok'{},
                                  Protocol),
            rabbit_log_connection:info(
                "connection ~p (~s): "
                "user '~s' updated secret, reason: ~s",
                [self(), dynamic_connection_name(ConnName), Username, Reason]),
            State#v1{connection = Conn#connection{user = UpdatedUser}};
        {refused, Message} ->
            rabbit_log_connection:error(
                "Secret update was refused for user '~s': ~p",
                [Username, Message]),
            rabbit_misc:protocol_error(
              not_allowed,
              "New secret was refused by one of the backends", []);
        {error, Message} ->
            rabbit_log_connection:error(
                "Secret update for user '~s' failed: ~p",
                [Username, Message]),
            rabbit_misc:protocol_error(
              not_allowed, "Secret update failed", [])
    end;

%% Any other method while stopping is ignored.
handle_method0(_Method, State) when ?IS_STOPPING(State) ->
    State;

%% Any other method is a protocol error in the current state.
handle_method0(_Method, #v1{connection_state = ConnState}) ->
    rabbit_misc:protocol_error(
      channel_error, "unexpected method in connection state ~w", [ConnState]).
1317
%% Returns `ok' when the vhost is reported alive by rabbit_vhost_sup_sup;
%% otherwise raises an internal_error protocol error.
is_vhost_alive(VHostPath, User) ->
    case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of
        false ->
            rabbit_misc:protocol_error(internal_error,
                            "access to vhost '~s' refused for user '~s': "
                            "vhost '~s' is down",
                            [VHostPath, User#user.username, VHostPath]);
        true ->
            ok
    end.
1327
%% Returns `ok' while the vhost is below its connection limit. Raises a
%% not_allowed protocol error when the limit has been reached, or when the
%% limit check reports that the vhost does not exist.
is_over_vhost_connection_limit(VHostPath, User) ->
    try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
        {true, Limit} ->
            rabbit_misc:protocol_error(not_allowed,
                            "access to vhost '~s' refused for user '~s': "
                            "connection limit (~p) is reached",
                            [VHostPath, User#user.username, Limit]);
        false ->
            ok
    catch
        throw:{error, {no_such_vhost, VHostPath}} ->
            rabbit_misc:protocol_error(not_allowed,
                                       "vhost ~s not found", [VHostPath])
    end.
1339
%% Returns `ok' while the user is below the per-user connection limit;
%% raises a not_allowed protocol error once the limit has been reached.
is_over_user_connection_limit(#user{username = Username}) ->
    case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
        {true, Limit} ->
            rabbit_misc:protocol_error(not_allowed,
                            "Connection refused for user '~s': "
                            "user connection limit (~p) is reached",
                            [Username, Limit]);
        false ->
            ok
    end.
1348
%% Validates a value proposed by the client for a negotiable integer setting.
%% The server-side value is read from the `rabbit' application environment
%% under the key Field. A value of 0 is exempt from the minimum check, but
%% is rejected by the maximum check whenever the server value is non-zero.
%% Returns `ok' or fails through fail_negotiation/4.
validate_negotiated_integer_value(Field, Min, ClientValue) ->
    ServerValue = get_env(Field),
    BelowMin = ClientValue /= 0 andalso ClientValue < Min,
    AboveMax = ServerValue /= 0 andalso
               (ClientValue =:= 0 orelse ClientValue > ServerValue),
    case {BelowMin, AboveMax} of
        {true, _} ->
            fail_negotiation(Field, min, Min, ClientValue);
        {false, true} ->
            fail_negotiation(Field, max, ServerValue, ClientValue);
        {false, false} ->
            ok
    end.
1359
%% Reports a `not_allowed' protocol error for 'connection.tune' explaining
%% that the negotiated value of Field is lower than the minimum or higher
%% than the maximum allowed value.
%% The explicit no_return() spec is there to keep dialyzer happy.
-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
          no_return().
fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
    {Relation, Bound} =
        case MinOrMax of
            max -> {higher, maximum};
            min -> {lower,  minimum}
        end,
    Detail = get_client_value_detail(Field, ClientValue),
    Format = "negotiated ~w = ~w~s is ~w than the ~w allowed value (~w)",
    Args   = [Field, ClientValue, Detail, Relation, Bound, ServerValue],
    rabbit_misc:protocol_error(not_allowed, Format, Args, 'connection.tune').
1372
%% Reads Key from the `rabbit' application environment and returns its value.
%% Fails with a badmatch when the key is not set.
get_env(Key) ->
    {ok, Configured} = application:get_env(rabbit, Key),
    Configured.
1376
%% Sends Method on channel 0 of the given socket through
%% rabbit_writer:internal_send_command/4 and asserts that it returned `ok'.
send_on_channel0(Sock, Method, Protocol) ->
    Channel = 0,
    ok = rabbit_writer:internal_send_command(Sock, Channel, Method, Protocol).
1379
%% Maps the mechanism name sent by the client (a binary) to the module
%% registered for it. Reports a `command_invalid' protocol error when the
%% name is unknown to the registry, when the mechanism is not among those
%% returned by auth_mechanisms/1 for this socket, or when no module is
%% registered for it.
auth_mechanism_to_module(TypeBin, Sock) ->
    case rabbit_registry:binary_to_type(TypeBin) of
        {error, not_found} ->
            rabbit_misc:protocol_error(
              command_invalid, "unknown authentication mechanism '~s'",
              [TypeBin]);
        T ->
            Offered = lists:member(T, auth_mechanisms(Sock)),
            Lookup  = rabbit_registry:lookup_module(auth_mechanism, T),
            case {Offered, Lookup} of
                {true, {ok, Module}} ->
                    Module;
                _ ->
                    rabbit_misc:protocol_error(
                      command_invalid,
                      "invalid authentication mechanism '~s'", [T])
            end
    end.
1397
%% Returns the names of the authentication mechanisms that can be used on
%% this socket: registered mechanisms whose module agrees to be offered for
%% Sock (Module:should_offer/1) and that are listed under the
%% `auth_mechanisms' key of the `rabbit' application environment.
%%
%% The application is named explicitly, as in get_env/1, so the lookup does
%% not depend on which application the calling process belongs to.
auth_mechanisms(Sock) ->
    {ok, Configured} = application:get_env(rabbit, auth_mechanisms),
    [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
             Module:should_offer(Sock), lists:member(Name, Configured)].
1402
%% Returns the mechanisms from auth_mechanisms/1 as a single binary, with
%% the mechanism names separated by one space.
auth_mechanisms_binary(Sock) ->
    Names = [atom_to_list(Mechanism) || Mechanism <- auth_mechanisms(Sock)],
    list_to_binary(lists:join(" ", Names)).
1406
%% Passes one client authentication response to the negotiated mechanism
%% module and acts on its answer:
%%
%%  * {ok, User}: checks the loopback restriction, sends connection.tune
%%    and returns the state switched to `tuning' with the user stored
%%  * {challenge, ...}: sends connection.secure and returns the state with
%%    the updated mechanism state
%%  * {refused, ...}: records a failed attempt and fails via auth_fail/5
%%  * {protocol_error, ...}: records a failed attempt, emits a failure
%%    event and reports a `syntax_error' protocol error
auth_phase(Response,
           State = #v1{connection = Connection =
                           #connection{protocol       = Protocol,
                                       auth_mechanism = {Name, AuthMechanism},
                                       auth_state     = AuthState},
                       sock = Sock}) ->
    Host = Connection#connection.host,
    rabbit_log:debug("Raw client connection hostname during authN phase: ~p", [Host]),
    RemoteAddress =
        if
            %% the hostname was already resolved, e.g. by reverse DNS lookups
            is_binary(Host) -> Host;
            %% the hostname is an IP address
            is_tuple(Host)  -> rabbit_data_coercion:to_binary(inet:ntoa(Host));
            true            -> rabbit_data_coercion:to_binary(Host)
        end,
    rabbit_log:debug("Resolved client hostname during authN phase: ~s", [RemoteAddress]),
    case AuthMechanism:handle_response(Response, AuthState) of
        {ok, User = #user{username = Username}} ->
            case rabbit_access_control:check_user_loopback(Username, Sock) of
                not_allowed ->
                    rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
                    auth_fail(Username, "user '~s' can only connect via "
                              "localhost", [Username], Name, State);
                ok ->
                    rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, amqp091),
                    notify_auth_result(Username, user_authentication_success,
                                       [], State)
            end,
            FrameMax   = get_env(frame_max),
            ChannelMax = get_env(channel_max),
            Heartbeat  = get_env(heartbeat),
            Tune = #'connection.tune'{frame_max   = FrameMax,
                                      channel_max = ChannelMax,
                                      heartbeat   = Heartbeat},
            ok = send_on_channel0(Sock, Tune, Protocol),
            Authenticated = Connection#connection{user       = User,
                                                  auth_state = none},
            State#v1{connection_state = tuning,
                     connection       = Authenticated};
        {challenge, Challenge, AuthState1} ->
            rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091),
            Secure = #'connection.secure'{challenge = Challenge},
            ok = send_on_channel0(Sock, Secure, Protocol),
            Challenged = Connection#connection{auth_state = AuthState1},
            State#v1{connection = Challenged};
        {refused, Username, Msg, Args} ->
            rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
            auth_fail(Username, Msg, Args, Name, State);
        {protocol_error, Msg, Args} ->
            rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, amqp091),
            Explanation = rabbit_misc:format(Msg, Args),
            notify_auth_result(none, user_authentication_failure,
                               [{error, Explanation}], State),
            rabbit_misc:protocol_error(syntax_error, Msg, Args)
    end.
1458
%% Handles a refused login: emits a user_authentication_failure event,
%% then reports an `access_refused' protocol error. If the client
%% capabilities table has authentication_failure_close set to true, a close
%% method is first sent on channel 0 with a generic explanation that refers
%% the client to the broker log file instead of the detailed reason.
-spec auth_fail
        (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) ->
            no_return().

auth_fail(Username, Msg, Args, AuthName,
          State = #v1{connection = #connection{protocol     = Protocol,
                                               capabilities = Capabilities}}) ->
    Explanation = rabbit_misc:format(Msg, Args),
    notify_auth_result(Username, user_authentication_failure,
                       [{error, Explanation}], State),
    Details   = io_lib:format(Msg, Args),
    AmqpError = rabbit_misc:amqp_error(access_refused,
                                       "~s login refused: ~s",
                                       [AuthName, Details], none),
    Capability = rabbit_misc:table_lookup(Capabilities,
                                          <<"authentication_failure_close">>),
    case Capability =:= {bool, true} of
        true ->
            SafeMsg = io_lib:format(
                        "Login was refused using authentication "
                        "mechanism ~s. For details see the broker "
                        "logfile.", [AuthName]),
            SafeError = AmqpError#amqp_error{explanation = SafeMsg},
            {0, CloseMethod} =
                rabbit_binary_generator:map_exception(0, SafeError, Protocol),
            ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
        false ->
            ok
    end,
    rabbit_misc:protocol_error(AmqpError).
1485
%% Emits an authentication result event (AuthResult) through rabbit_event.
%% The properties are the connection type, the user name, the info items
%% listed in ?AUTH_NOTIFICATION_INFO_KEYS (with `name' reported as
%% `connection_name') and ExtraProps. Two-element properties whose value is
%% the empty atom '' are left out.
notify_auth_result(Username, AuthResult, ExtraProps, State) ->
    NameProp = case Username of
                   none -> {name, ''};
                   _    -> {name, Username}
               end,
    InfoProps = [case Item of
                     name -> {connection_name, i(name, State)};
                     _    -> {Item, i(Item, State)}
                 end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS],
    EventProps = [{connection_type, network}, NameProp] ++
                 InfoProps ++
                 ExtraProps,
    Present = [Prop || {_, Value} = Prop <- EventProps, Value =/= ''],
    rabbit_event:notify(AuthResult, Present).
1495
1496%%--------------------------------------------------------------------------
1497
%% Returns a list of {Key, Value} pairs, one per requested info key, with
%% each value obtained from i/2.
infos(Items, State) ->
    [{Key, i(Key, State)} || Key <- Items].
1499
%% Returns the value of one connection info item. Items that are not
%% handled by a clause here are looked up on the #connection record by ic/2.
i(pid, #v1{}) ->
    self();
i(node, #v1{}) ->
    node();
i(SockStat, State) when SockStat =:= recv_oct;
                        SockStat =:= recv_cnt;
                        SockStat =:= send_oct;
                        SockStat =:= send_cnt;
                        SockStat =:= send_pend ->
    GetStat = fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
    Pick    = fun ([{_, Value}]) -> Value end,
    socket_info(GetStat, Pick, State);
i(ssl, #v1{sock = Sock, proxy_socket = ProxySock}) ->
    rabbit_net:proxy_ssl_info(Sock, ProxySock) =/= nossl;
i(ssl_protocol, State) ->
    ssl_info(fun ({Protocol, _}) -> Protocol end, State);
i(ssl_key_exchange, State) ->
    ssl_info(fun ({_, {KeyExchange, _, _}}) -> KeyExchange end, State);
i(ssl_cipher, State) ->
    ssl_info(fun ({_, {_, Cipher, _}}) -> Cipher end, State);
i(ssl_hash, State) ->
    ssl_info(fun ({_, {_, _, Hash}}) -> Hash end, State);
i(peer_cert_issuer, State) ->
    cert_info(fun rabbit_ssl:peer_cert_issuer/1, State);
i(peer_cert_subject, State) ->
    cert_info(fun rabbit_ssl:peer_cert_subject/1, State);
i(peer_cert_validity, State) ->
    cert_info(fun rabbit_ssl:peer_cert_validity/1, State);
i(channels, #v1{channel_count = ChannelCount}) ->
    ChannelCount;
i(state, #v1{connection_state = ConnectionState,
             throttle         = #throttle{blocked_by      = Reasons,
                                          last_blocked_at = T} = Throttle}) ->
    %% not throttled by resource or other longer-term reasons
    %% TODO: come up with a sensible function name
    OnlyFlow = sets:size(sets:del_element(flow, Reasons)) =:= 0,
    InFlow =
        OnlyFlow andalso
        (credit_flow:blocked()        %% throttled by flow now
         orelse                       %% throttled by flow recently
         (is_blocked_by_flow(Throttle) andalso T =/= never andalso
          erlang:convert_time_unit(erlang:monotonic_time() - T,
                                   native,
                                   micro_seconds) < 5000000)),
    case InFlow of
        true ->
            flow;
        false ->
            case {has_reasons_to_block(Throttle), ConnectionState} of
                %% blocked
                {_, blocked}    -> blocked;
                %% not yet blocked (there were no publishes)
                {true, running} -> blocking;
                %% not blocked, or any other combination
                _               -> ConnectionState
            end
    end;
i(garbage_collection, _State) ->
    rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
    {reductions, Count} = erlang:process_info(self(), reductions),
    Count;
i(Item, #v1{connection = Conn}) ->
    ic(Item, Conn).
1550
%% Returns the value of one info item stored on the #connection record.
%% Throws {bad_argument, Item} for an item no clause handles.
ic(name, #connection{name = Name}) ->
    Name;
ic(host, #connection{host = Host}) ->
    Host;
ic(peer_host, #connection{peer_host = PeerHost}) ->
    PeerHost;
ic(port, #connection{port = Port}) ->
    Port;
ic(peer_port, #connection{peer_port = PeerPort}) ->
    PeerPort;
ic(protocol, #connection{protocol = none}) ->
    none;
ic(protocol, #connection{protocol = Protocol}) ->
    Protocol:version();
ic(user, #connection{user = none}) ->
    '';
ic(user, #connection{user = User}) ->
    User#user.username;
ic(user_who_performed_action, Conn) ->
    ic(user, Conn);
ic(vhost, #connection{vhost = VHost}) ->
    VHost;
ic(timeout, #connection{timeout_sec = Timeout}) ->
    Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) ->
    FrameMax;
ic(channel_max, #connection{channel_max = ChannelMax}) ->
    ChannelMax;
ic(client_properties, #connection{client_properties = ClientProperties}) ->
    ClientProperties;
ic(auth_mechanism, #connection{auth_mechanism = none}) ->
    none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) ->
    Name;
ic(connected_at, #connection{connected_at = ConnectedAt}) ->
    ConnectedAt;
ic(Item, #connection{}) ->
    throw({bad_argument, Item}).
1570
%% Applies Get to the connection socket and, on {ok, T}, applies Select to T.
%% Returns the selected value when it is a number; returns 0 when it is not
%% a number or when Get returned an error.
socket_info(Get, Select, #v1{sock = Sock}) ->
    case Get(Sock) of
        {error, _} ->
            0;
        {ok, T} ->
            Selected = Select(T),
            if
                is_number(Selected) -> Selected;
                true                -> 0
            end
    end.
1579
%% Fetches TLS information for the connection and applies F to
%% {Protocol, {KeyExchange, Cipher, Mac}}. Returns the empty atom '' when
%% rabbit_net:proxy_ssl_info/2 returns `nossl' or an error.
ssl_info(F, #v1{sock = Sock, proxy_socket = ProxySock}) ->
    case rabbit_net:proxy_ssl_info(Sock, ProxySock) of
        {ok, Items} ->
            Protocol = proplists:get_value(protocol, Items),
            Suite    = proplists:get_value(selected_cipher_suite, Items),
            #{cipher       := Cipher,
              key_exchange := KeyExchange,
              mac          := Mac} = Suite,
            F({Protocol, {KeyExchange, Cipher, Mac}});
        {error, _} ->
            '';
        nossl ->
            ''
    end.
1591
%% Applies F to the peer certificate of the connection socket and returns
%% the result as a binary. Returns the empty atom '' when
%% rabbit_net:peercert/1 returns `nossl' or an error.
cert_info(F, #v1{sock = Sock}) ->
    case rabbit_net:peercert(Sock) of
        {ok, Cert} -> list_to_binary(F(Cert));
        {error, _} -> '';
        nossl      -> ''
    end.
1598
%% Hands emit_stats/1, wrapped in a fun, to rabbit_event:if_enabled/3
%% together with the connection's stats timer field.
maybe_emit_stats(State) ->
    Emit = fun () -> emit_stats(State) end,
    rabbit_event:if_enabled(State, #v1.stats_timer, Emit).
1602
%% Collects the ?SIMPLE_METRICS and ?OTHER_METRICS info items, records them
%% in rabbit_core_metrics, emits a connection_stats event carrying both
%% sets, then resets and re-arms the stats timer. Returns the updated state.
emit_stats(State) ->
    SimpleInfos = infos(?SIMPLE_METRICS, State),
    [{_, Pid}, {_, RecvOct}, {_, SendOct}, {_, Reductions}] = SimpleInfos,
    OtherInfos = infos(?OTHER_METRICS, State),
    rabbit_core_metrics:connection_stats(Pid, OtherInfos),
    rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
    rabbit_event:notify(connection_stats, OtherInfos ++ SimpleInfos),
    ensure_stats_timer(
      rabbit_event:reset_stats_timer(State, #v1.stats_timer)).
1612
%% 1.0 stub
%%
%% Prepares the hand-over of this connection to rabbit_amqp1_0_reader.
%% The connection is refused when that module is not loaded or when the
%% protocol id is neither 0 (amqp mode) nor 3 (sasl mode). Otherwise the
%% connection state is set to {become, F}, where F builds the
%% {Module, Function, Args} triple for rabbit_amqp1_0_reader:init.
-spec become_1_0(non_neg_integer(), #v1{}) -> no_return().

become_1_0(Id, State = #v1{sock = Sock}) ->
    case code:is_loaded(rabbit_amqp1_0_reader) of
        false ->
            refuse_connection(Sock, amqp1_0_plugin_not_enabled);
        _Loaded ->
            Mode =
                case Id of
                    0 -> amqp;
                    3 -> sasl;
                    _ -> refuse_connection(
                           Sock, {unsupported_amqp1_0_protocol_id, Id},
                           {3, 1, 0, 0})
                end,
            Become =
                fun (_Deb, Buf, BufLen, S) ->
                        Packed = pack_for_1_0(Buf, BufLen, S),
                        {rabbit_amqp1_0_reader, init, [Mode, Packed]}
                end,
            State#v1{connection_state = {become, Become}}
    end.
1632
%% Builds the 8-tuple that is passed to rabbit_amqp1_0_reader:init:
%% {Parent, Sock, RecvLen, PendingRecv, HelperSup, Buf, BufLen, ProxySocket}.
pack_for_1_0(Buf, BufLen, State = #v1{}) ->
    {State#v1.parent,
     State#v1.sock,
     State#v1.recv_len,
     State#v1.pending_recv,
     State#v1.helper_sup,
     Buf,
     BufLen,
     State#v1.proxy_socket}.
1640
%% Logs LogErr for the given channel via log_hard_error/3, then sends the
%% error to the client and closes the connection via
%% send_error_on_channel0_and_close/4, whose result is returned.
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
    log_hard_error(State, Channel, LogErr),
    send_error_on_channel0_and_close(Channel, Protocol, Reason, State).
1644
%% Maps Reason to a close method (asserting that it is addressed to
%% channel 0), terminates the channels, closes the connection and then
%% sends the close method on channel 0. Returns the state produced by
%% close_connection/1.
send_error_on_channel0_and_close(Channel, Protocol, Reason, State) ->
    {0, CloseMethod} =
        rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
    Terminated = terminate_channels(State),
    Closed     = close_connection(Terminated),
    ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol),
    Closed.
1651
1652%%
1653%% Publisher throttling
1654%%
1655
%% Builds the "low on ..." binary that names the reasons the connection is
%% blocked, joined with " & ".
blocked_by_message(#throttle{blocked_by = Reasons}) ->
    %% internal flow control is entirely transient, so it is not reported
    %% as a reason here
    Reported = sets:to_list(sets:del_element(flow, Reasons)),
    Names    = lists:map(fun format_blocked_by/1, Reported),
    Joined   = string:join(Names, " & "),
    list_to_binary(rabbit_misc:format("low on ~s", [Joined])).
1662
%% Returns the display name of a resource blocking reason. Both the `disk'
%% and `disc' spellings map to "disk".
format_blocked_by({resource, memory}) ->
    "memory";
format_blocked_by({resource, Disk}) when Disk =:= disk; Disk =:= disc ->
    "disk".
1666
%% Stamps the throttle with the current monotonic time (native time unit)
%% as the moment the connection was last blocked.
update_last_blocked_at(Throttle) ->
    Now = erlang:monotonic_time(),
    Throttle#throttle{last_blocked_at = Now}.
1669
%% Returns the connection_blocked_message_sent field of the throttle.
connection_blocked_message_sent(Throttle = #throttle{}) ->
    Throttle#throttle.connection_blocked_message_sent.
1672
%% True when the throttle is flagged to block, there is at least one
%% blocking reason other than `flow', and the blocked message has not been
%% marked as sent yet.
should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) ->
    should_block(Throttle)
        andalso sets:size(sets:del_element(flow, Reasons)) > 0
        andalso not connection_blocked_message_sent(Throttle).
1679
%% True when the blocked message is marked as sent and no blocking reason
%% other than `flow' remains.
should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) ->
    connection_blocked_message_sent(Throttle)
        andalso sets:size(sets:del_element(flow, Reasons)) =:= 0.
1684
%% Returns true if the throttle holds at least one reason (of any kind,
%% including `flow') to block this connection.
has_reasons_to_block(Throttle = #throttle{}) ->
    sets:size(Throttle#throttle.blocked_by) =/= 0.
1689
%% Returns true if `flow' is among the blocking reasons of the throttle.
is_blocked_by_flow(Throttle = #throttle{}) ->
    sets:is_element(flow, Throttle#throttle.blocked_by).
1692
%% Returns the should_block field of the throttle.
should_block(Throttle = #throttle{}) ->
    Throttle#throttle.should_block.
1694
%% True when the throttle is flagged to block and holds at least one
%% blocking reason.
should_block_connection(Throttle) ->
    Flagged = should_block(Throttle),
    Flagged andalso has_reasons_to_block(Throttle).
1697
%% The negation of should_block_connection/1.
should_unblock_connection(Throttle) ->
    ShouldBlock = should_block_connection(Throttle),
    not ShouldBlock.
1700
%% Moves the connection to `blocked' when should_block_connection/1 says
%% so: stamps the throttle with the current time, pauses the heartbeat
%% monitor if the connection was `running', and lets
%% maybe_send_blocked_or_unblocked/1 produce the final state. Otherwise the
%% state is returned unchanged.
maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
    case should_block_connection(Throttle) of
        false ->
            State;
        true ->
            Stamped = update_last_blocked_at(Throttle),
            Blocked = State#v1{connection_state = blocked,
                               throttle         = Stamped},
            case CS of
                running ->
                    ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
                _ ->
                    ok
            end,
            maybe_send_blocked_or_unblocked(Blocked)
    end.
1714
%% Moves the connection back to `running' when should_unblock_connection/1
%% says so: resumes the heartbeat monitor, clears the should_block flag and
%% lets maybe_send_unblocked/1 produce the final state. Otherwise the state
%% is returned unchanged.
maybe_unblock(State = #v1{throttle = Throttle}) ->
    case should_unblock_connection(Throttle) of
        false ->
            State;
        true ->
            ok = rabbit_heartbeat:resume_monitor(State#v1.heartbeater),
            Cleared = Throttle#throttle{should_block = false},
            maybe_send_unblocked(State#v1{connection_state = running,
                                          throttle         = Cleared})
    end.
1724
%% Calls send_unblocked/1 when should_send_unblocked/1 holds and clears the
%% connection_blocked_message_sent flag; otherwise returns the state
%% unchanged.
maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
    case should_send_unblocked(Throttle) of
        false ->
            State;
        true ->
            ok = send_unblocked(State),
            Cleared =
                Throttle#throttle{connection_blocked_message_sent = false},
            State#v1{throttle = Cleared}
    end.
1733
%% Calls send_blocked/2 with the blocked_by_message/1 text when
%% should_send_blocked/1 holds and sets the connection_blocked_message_sent
%% flag; otherwise defers to maybe_send_unblocked/1.
maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
    case should_send_blocked(Throttle) of
        false ->
            maybe_send_unblocked(State);
        true ->
            ok = send_blocked(State, blocked_by_message(Throttle)),
            Marked =
                Throttle#throttle{connection_blocked_message_sent = true},
            State#v1{throttle = Marked}
    end.
1742
%% If the throttle holds any blocking reason, sets its should_block flag
%% and runs maybe_block/1; otherwise returns the state unchanged.
publish_received(State = #v1{throttle = Throttle}) ->
    case has_reasons_to_block(Throttle) of
        true ->
            Flagged = Throttle#throttle{should_block = true},
            maybe_block(State#v1{throttle = Flagged});
        false ->
            State
    end.
1750
%% Synchronises the `flow' blocking reason with credit_flow:blocked/0 and
%% then re-evaluates blocking: a `running' connection may become blocked, a
%% `blocked' one may be unblocked and blocked again; in any other
%% connection state only the throttle is updated.
control_throttle(State = #v1{connection_state = CS,
                             throttle = #throttle{blocked_by = Reasons} = Throttle}) ->
    Reasons1 = case credit_flow:blocked() of
                   true  -> sets:add_element(flow, Reasons);
                   false -> sets:del_element(flow, Reasons)
               end,
    State1 = State#v1{throttle = Throttle#throttle{blocked_by = Reasons1}},
    case CS of
        running ->
            maybe_block(State1);
        blocked ->
            %% unblock or re-enable blocking
            maybe_block(maybe_unblock(State1));
        _ ->
            State1
    end.
1766
%% If the client supplied a connection name, logs it, stores
%% "<Name> - <client-provided name>" as the process name via
%% ?store_proc_name and records it as the connection's log_name. Otherwise
%% the connection is returned unchanged.
augment_connection_log_name(#connection{name = Name} = Connection) ->
    case user_provided_connection_name(Connection) of
        UserSpecifiedName when UserSpecifiedName =/= undefined ->
            LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
            rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s", [self(), Name, UserSpecifiedName]),
            ?store_proc_name(LogName),
            Connection#connection{log_name = LogName};
        undefined ->
            Connection
    end.
1777
%% Prepends {user_provided_name, Name} to Infos when the client supplied a
%% connection name; otherwise returns Infos unchanged.
augment_infos_with_user_provided_connection_name(Infos, #v1{connection = Connection}) ->
    case user_provided_connection_name(Connection) of
        ProvidedName when ProvidedName =/= undefined ->
            [{user_provided_name, ProvidedName} | Infos];
        undefined ->
            Infos
    end.
1785
%% Returns the value of the <<"connection_name">> entry in the client
%% properties table when it is a longstr; `undefined' otherwise.
user_provided_connection_name(#connection{client_properties = ClientProperties}) ->
    Entry = rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>),
    case Entry of
        {longstr, ProvidedName} -> ProvidedName;
        _                       -> undefined
    end.
1793
%% Returns the name from rabbit_misc:get_proc_name/0 when it returns
%% {ok, Name}; Default otherwise.
dynamic_connection_name(Default) ->
    ProcName = rabbit_misc:get_proc_name(),
    case ProcName of
        {ok, StoredName} -> StoredName;
        _                -> Default
    end.
1801
%% Records the channel as closed in rabbit_core_metrics and emits a
%% channel_closed event carrying the channel pid.
handle_uncontrolled_channel_close(ChPid) ->
    rabbit_core_metrics:channel_closed(ChPid),
    EventProps = [{pid, ChPid}],
    rabbit_event:notify(channel_closed, EventProps).
1805
%% Returns extra text to print after a client-proposed value: " (no limit)"
%% for a channel_max of 0, and the empty string for everything else.
-spec get_client_value_detail(atom(), integer()) -> string().
get_client_value_detail(channel_max, ClientValue) when ClientValue =:= 0 ->
    " (no limit)";
get_client_value_detail(_Field, _ClientValue) ->
    "".
1811