%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_logger_exchange_h).

-include_lib("kernel/include/logger.hrl").

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/logging.hrl").

%% logger callbacks
-export([log/2, adding_handler/1, removing_handler/1, changing_config/3,
         filter_config/1]).

-define(DECL_EXCHANGE_INTERVAL_SECS, 5).
-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>).
-define(DEFAULT_FORMATTER, logger_formatter).
-define(DEFAULT_FORMATTER_CONFIG, #{}).

%% -------------------------------------------------------------------
%% Logger handler callbacks.
%% -------------------------------------------------------------------

%% Handler installation: kicks off the exchange setup process and hands
%% back the configuration as extended by start_setup_proc/1.
adding_handler(Config) ->
    {ok, start_setup_proc(Config)}.

%% Configuration changes are not applied: whatever is requested, the
%% configuration already in place is returned unchanged.
changing_config(_Mode, CurrentConfig, _RequestedConfig) ->
    {ok, CurrentConfig}.

%% The configuration is returned as is; nothing is filtered out.
filter_config(Config) ->
    Config.

%% Entry point for log events.
%%
%% Events whose `mfa' metadata points at this module are dropped
%% (presumably so that messages emitted while publishing are not fed
%% back into the exchange -- confirm). Every other event is published
%% only when rabbit_boot_state:get/0 returns `ready'; otherwise it is
%% silently discarded.
log(#{meta := #{mfa := {?MODULE, _, _}}}, _Config) ->
    ok;
log(Event, HandlerConfig) ->
    log_if_ready(rabbit_boot_state:get(), Event, HandlerConfig).

%% Dispatches on the boot state: publish when `ready', ignore otherwise.
log_if_ready(ready, Event, HandlerConfig) ->
    do_log(Event, HandlerConfig);
log_if_ready(_OtherState, _Event, _HandlerConfig) ->
    ok.

%% Publishes one log event to the exchange stored in the handler's
%% internal configuration. The level-derived routing key, the message
%% properties and the formatted body are computed by the helpers below.
%% A missing exchange (`{error, not_found}') is treated as success.
do_log(Event, #{config := #{exchange := XName}} = HandlerConfig) ->
    Key = make_routing_key(Event, HandlerConfig),
    Props = log_event_to_amqp_msg(Event, HandlerConfig),
    Payload = try_format_body(Event, HandlerConfig),
    case rabbit_basic:publish(XName, Key, Props, Payload) of
        {error, not_found} -> ok;
        ok                 -> ok
    end.

%% Handler removal: tears down the exchange configuration; the result of
%% unconfigure_exchange/1 is ignored.
removing_handler(Config) ->
    unconfigure_exchange(Config),
    ok.

%% -------------------------------------------------------------------
%% Internal functions.
%% -------------------------------------------------------------------

%% Builds the `P_basic' properties attached to a published log event:
%% content type (derived from the configured formatter), timestamp and
%% headers.
log_event_to_amqp_msg(LogEvent, Config) ->
    ContentType = guess_content_type(Config),
    Timestamp = make_timestamp(LogEvent, Config),
    Headers = make_headers(LogEvent, Config),
    #'P_basic'{
       content_type = ContentType,
       timestamp = Timestamp,
       headers = Headers
      }.

%% The routing key is the log level converted to a binary.
make_routing_key(#{level := Level}, _) ->
    rabbit_data_coercion:to_binary(Level).

%% `application/json' when the handler uses rabbit_logger_json_fmt,
%% `text/plain' for any other formatter.
guess_content_type(#{formatter := {rabbit_logger_json_fmt, _}}) ->
    <<"application/json">>;
guess_content_type(_) ->
    <<"text/plain">>.

%% Uses the event's `time' metadata, converted from microseconds to
%% seconds, when present; falls back to the current OS time in seconds.
make_timestamp(#{meta := #{time := Timestamp}}, _) ->
    erlang:convert_time_unit(Timestamp, microsecond, second);
make_timestamp(_, _) ->
    os:system_time(second).

%% A single `node' header carrying the local node name.
make_headers(_, _) ->
    Node = rabbit_data_coercion:to_binary(node()),
    [{<<"node">>, longstr, Node}].

%% Formats a log event with the handler's configured formatter and
%% returns the message body as a UTF-8 binary.
try_format_body(LogEvent, #{formatter := {Formatter, FormatterConfig}}) ->
    try_format_body(LogEvent, Formatter, FormatterConfig).

%% Runs `Formatter:format/2' and converts its output to a binary.
%%
%% Formatters return unicode chardata, which may hold code points above
%% 255; erlang:iolist_to_binary/1 raises badarg on those, so the
%% conversion uses unicode:characters_to_binary/1 instead. The conversion
%% happens inside the `try' so that invalid or incomplete output is
%% handled like a formatter crash rather than escaping the handler.
%%
%% On any failure the event is re-formatted with the default formatter
%% and a message describing the crash. If the default formatter itself
%% is the one that failed, a fixed marker is returned.
try_format_body(LogEvent, Formatter, FormatterConfig) ->
    try
        Formatted = Formatter:format(LogEvent, FormatterConfig),
        case unicode:characters_to_binary(Formatted) of
            Binary when is_binary(Binary) ->
                Binary;
            InvalidOrIncomplete ->
                %% {error, _, _} or {incomplete, _, _}: caught just below.
                throw(InvalidOrIncomplete)
        end
    catch
        C:R:S ->
            case {?DEFAULT_FORMATTER, ?DEFAULT_FORMATTER_CONFIG} of
                {Formatter, FormatterConfig} ->
                    %% Formatter and FormatterConfig are already bound:
                    %% this clause matches only when the default
                    %% formatter is the one that just failed.
                    <<"DEFAULT FORMATTER CRASHED\n">>;
                {DefaultFormatter, DefaultFormatterConfig} ->
                    Msg = {"FORMATTER CRASH: ~tp -- ~p:~p:~p",
                           [maps:get(msg, LogEvent), C, R, S]},
                    LogEvent1 = LogEvent#{msg => Msg},
                    try_format_body(
                      LogEvent1,
                      DefaultFormatter,
                      DefaultFormatterConfig)
            end
    end.

%% Computes the log exchange resource (in the default vhost), stores it
%% in the handler's internal configuration and spawns the process that
%% declares the exchange. The returned configuration additionally holds
%% the pid of that process under `setup_proc'.
%%
%% The process is started with plain spawn/1: it is neither linked to
%% nor monitored by the caller.
start_setup_proc(#{config := InternalConfig} = Config) ->
    {ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
    Exchange = rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
    InternalConfig1 = InternalConfig#{exchange => Exchange},

    Pid = spawn(fun() -> setup_proc(Config#{config => InternalConfig1}) end),
    InternalConfig2 = InternalConfig1#{setup_proc => Pid},
    Config#{config => InternalConfig2}.

%% Body of the setup process: tries to declare the exchange and returns
%% once it succeeds. After a failed attempt it waits
%% ?DECL_EXCHANGE_INTERVAL_SECS seconds and tries again, unless a `stop'
%% message arrives in the meantime, in which case it gives up.
%%
%% Names are printed with `~ts' because they are binaries that may hold
%% non-ASCII UTF-8 text, which `~s' would render byte by byte.
setup_proc(
  #{config := #{exchange := #resource{name = Name,
                                      virtual_host = VHost}}} = Config) ->
    case declare_exchange(Config) of
        ok ->
            ?LOG_INFO(
               "Logging to exchange '~ts' in vhost '~ts' ready", [Name, VHost],
               #{domain => ?RMQLOG_DOMAIN_GLOBAL});
        error ->
            ?LOG_DEBUG(
               "Logging to exchange '~ts' in vhost '~ts' not ready, "
               "trying again in ~b second(s)",
               [Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS],
               #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
            receive
                stop -> ok
            after ?DECL_EXCHANGE_INTERVAL_SECS * 1000 ->
                      setup_proc(Config)
            end
    end.

%% Declares the log exchange as a topic exchange on behalf of the
%% internal user. Returns `ok' on success; any exception raised by the
%% declaration is logged at debug level and reported as `error'.
declare_exchange(
  #{config := #{exchange := #resource{name = Name,
                                      virtual_host = VHost} = Exchange}}) ->
    try
        %% Durable.
        #exchange{} = rabbit_exchange:declare(
                        Exchange, topic, true, false, true, [],
                        ?INTERNAL_USER),
        ?LOG_DEBUG(
           "Declared exchange '~ts' in vhost '~ts'",
           [Name, VHost],
           #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
        ok
    catch
        Class:Reason ->
            ?LOG_DEBUG(
               "Could not declare exchange '~ts' in vhost '~ts', "
               "reason: ~0p:~0p",
               [Name, VHost, Class, Reason],
               #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
            error
    end.

%% Stops the setup process and deletes the log exchange. The result of
%% the deletion is ignored.
unconfigure_exchange(
  #{config := #{exchange := #resource{name = Name,
                                      virtual_host = VHost} = Exchange,
                setup_proc := Pid}}) ->
    %% The setup process exits on its own after a successful declaration;
    %% sending to a pid that no longer exists is harmless.
    Pid ! stop,
    rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
    ?LOG_INFO(
       "Logging to exchange '~ts' in vhost '~ts' disabled",
       [Name, VHost],
       #{domain => ?RMQLOG_DOMAIN_GLOBAL}).