1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_logger_exchange_h).
9
10-include_lib("kernel/include/logger.hrl").
11
12-include_lib("rabbit_common/include/rabbit.hrl").
13-include_lib("rabbit_common/include/rabbit_framing.hrl").
14-include_lib("rabbit_common/include/logging.hrl").
15
16%% logger callbacks
17-export([log/2, adding_handler/1, removing_handler/1, changing_config/3,
18         filter_config/1]).
19
20-define(DECL_EXCHANGE_INTERVAL_SECS, 5).
21-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>).
22-define(DEFAULT_FORMATTER, logger_formatter).
23-define(DEFAULT_FORMATTER_CONFIG, #{}).
24
25%% -------------------------------------------------------------------
26%% Logger handler callbacks.
27%% -------------------------------------------------------------------
28
29adding_handler(Config) ->
30    Config1 = start_setup_proc(Config),
31    {ok, Config1}.
32
33changing_config(_SetOrUpdate, OldConfig, _NewConfig) ->
34    {ok, OldConfig}.
35
36filter_config(Config) ->
37    Config.
38
39log(#{meta := #{mfa := {?MODULE, _, _}}}, _) ->
40    ok;
41log(LogEvent, Config) ->
42    case rabbit_boot_state:get() of
43        ready -> do_log(LogEvent, Config);
44        _     -> ok
45    end.
46
47do_log(LogEvent, #{config := #{exchange := Exchange}} = Config) ->
48    RoutingKey = make_routing_key(LogEvent, Config),
49    AmqpMsg = log_event_to_amqp_msg(LogEvent, Config),
50    Body = try_format_body(LogEvent, Config),
51    case rabbit_basic:publish(Exchange, RoutingKey, AmqpMsg, Body) of
52        ok                 -> ok;
53        {error, not_found} -> ok
54    end.
55
56removing_handler(Config) ->
57    unconfigure_exchange(Config),
58    ok.
59
60%% -------------------------------------------------------------------
61%% Internal functions.
62%% -------------------------------------------------------------------
63
64log_event_to_amqp_msg(LogEvent, Config) ->
65    ContentType = guess_content_type(Config),
66    Timestamp = make_timestamp(LogEvent, Config),
67    Headers = make_headers(LogEvent, Config),
68    #'P_basic'{
69       content_type = ContentType,
70       timestamp = Timestamp,
71       headers = Headers
72      }.
73
74make_routing_key(#{level := Level}, _) ->
75    rabbit_data_coercion:to_binary(Level).
76
77guess_content_type(#{formatter := {rabbit_logger_json_fmt, _}}) ->
78    <<"application/json">>;
79guess_content_type(_) ->
80    <<"text/plain">>.
81
82make_timestamp(#{meta := #{time := Timestamp}}, _) ->
83    erlang:convert_time_unit(Timestamp, microsecond, second);
84make_timestamp(_, _) ->
85     os:system_time(second).
86
87make_headers(_, _) ->
88    Node = rabbit_data_coercion:to_binary(node()),
89    [{<<"node">>, longstr, Node}].
90
91try_format_body(LogEvent, #{formatter := {Formatter, FormatterConfig}}) ->
92    Formatted = try_format_body(LogEvent, Formatter, FormatterConfig),
93    erlang:iolist_to_binary(Formatted).
94
95try_format_body(LogEvent, Formatter, FormatterConfig) ->
96    try
97        Formatter:format(LogEvent, FormatterConfig)
98    catch
99        C:R:S ->
100            case {?DEFAULT_FORMATTER, ?DEFAULT_FORMATTER_CONFIG} of
101                {Formatter, FormatterConfig} ->
102                    "DEFAULT FORMATTER CRASHED\n";
103                {DefaultFormatter, DefaultFormatterConfig} ->
104                    Msg = {"FORMATTER CRASH: ~tp -- ~p:~p:~p",
105                           [maps:get(msg, LogEvent), C, R, S]},
106                    LogEvent1 = LogEvent#{msg => Msg},
107                    try_format_body(
108                      LogEvent1,
109                      DefaultFormatter,
110                      DefaultFormatterConfig)
111            end
112    end.
113
114start_setup_proc(#{config := InternalConfig} = Config) ->
115    {ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
116    Exchange = rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
117    InternalConfig1 = InternalConfig#{exchange => Exchange},
118
119    Pid = spawn(fun() -> setup_proc(Config#{config => InternalConfig1}) end),
120    InternalConfig2 = InternalConfig1#{setup_proc => Pid},
121    Config#{config => InternalConfig2}.
122
123setup_proc(
124  #{config := #{exchange := #resource{name = Name,
125                                      virtual_host = VHost}}} = Config) ->
126    case declare_exchange(Config) of
127        ok ->
128            ?LOG_INFO(
129               "Logging to exchange '~s' in vhost '~s' ready", [Name, VHost],
130               #{domain => ?RMQLOG_DOMAIN_GLOBAL});
131        error ->
132            ?LOG_DEBUG(
133               "Logging to exchange '~s' in vhost '~s' not ready, "
134               "trying again in ~b second(s)",
135               [Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS],
136               #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
137            receive
138                stop -> ok
139            after ?DECL_EXCHANGE_INTERVAL_SECS * 1000 ->
140                      setup_proc(Config)
141            end
142    end.
143
144declare_exchange(
145  #{config := #{exchange := #resource{name = Name,
146                                      virtual_host = VHost} = Exchange}}) ->
147    try
148        %% Durable.
149        #exchange{} = rabbit_exchange:declare(
150                        Exchange, topic, true, false, true, [],
151                        ?INTERNAL_USER),
152        ?LOG_DEBUG(
153           "Declared exchange '~s' in vhost '~s'",
154           [Name, VHost],
155           #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
156        ok
157    catch
158        Class:Reason ->
159            ?LOG_DEBUG(
160               "Could not declare exchange '~s' in vhost '~s', "
161               "reason: ~0p:~0p",
162               [Name, VHost, Class, Reason],
163               #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
164           error
165    end.
166
167unconfigure_exchange(
168  #{config := #{exchange := #resource{name = Name,
169                                      virtual_host = VHost} = Exchange,
170                setup_proc := Pid}}) ->
171    Pid ! stop,
172    rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
173    ?LOG_INFO(
174       "Logging to exchange '~s' in vhost '~s' disabled",
175       [Name, VHost],
176       #{domain => ?RMQLOG_DOMAIN_GLOBAL}).
177