%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_global_counters).

%% Set-up and reporting.
-export([boot_step/0,
         init/1,
         init/2,
         overview/0,
         prometheus_format/0]).

%% Counters kept per protocol.
-export([increase_protocol_counter/3,
         messages_received/2,
         messages_received_confirm/2,
         messages_routed/2,
         messages_unroutable_dropped/2,
         messages_unroutable_returned/2,
         messages_confirmed/2]).

%% Counters kept per protocol and queue type.
-export([messages_delivered/3,
         messages_delivered_consume_manual_ack/3,
         messages_delivered_consume_auto_ack/3,
         messages_delivered_get_manual_ack/3,
         messages_delivered_get_auto_ack/3,
         messages_get_empty/3,
         messages_redelivered/3,
         messages_acknowledged/3]).

%% Gauges kept per protocol.
-export([publisher_created/1,
         publisher_deleted/1,
         consumer_created/1,
         consumer_deleted/1]).

%% PROTOCOL COUNTERS:
%% Index of each per-protocol field; these are the values handed to
%% counters:add/3 by the functions of this module.
-define(MESSAGES_RECEIVED, 1).
-define(MESSAGES_RECEIVED_CONFIRM, 2).
-define(MESSAGES_ROUTED, 3).
-define(MESSAGES_UNROUTABLE_DROPPED, 4).
-define(MESSAGES_UNROUTABLE_RETURNED, 5).
-define(MESSAGES_CONFIRMED, 6).
-define(PUBLISHERS, 7).
-define(CONSUMERS, 8).
%% Note: ?NUM_PROTOCOL_COUNTERS needs to be up-to-date.
%% See include/rabbit_global_counters.hrl
%%
%% Field list for the per-protocol counters. Every entry is a 4-tuple of
%% field name, index macro, kind (counter | gauge) and a description string.
%% init/2 appends any extra fields to this list and passes the result to
%% seshat_counters:new/3.
-define(PROTOCOL_COUNTERS,
        [
         {messages_received_total, ?MESSAGES_RECEIVED, counter,
          "Total number of messages received from publishers"},
         {messages_received_confirm_total, ?MESSAGES_RECEIVED_CONFIRM, counter,
          "Total number of messages received from publishers expecting confirmations"},
         {messages_routed_total, ?MESSAGES_ROUTED, counter,
          "Total number of messages routed to queues or streams"},
         {messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
          "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"},
         {messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
          "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
         {messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
          "Total number of messages confirmed to publishers"},
         {publishers, ?PUBLISHERS, gauge,
          "Current number of publishers"},
         {consumers, ?CONSUMERS, gauge,
          "Current number of consumers"}
        ]).

%% Protocol & QueueType counters:
%% Index of each field kept per protocol and queue type; these are the
%% values handed to counters:add/3 by the functions of this module.
-define(MESSAGES_DELIVERED, 1).
-define(MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, 2).
-define(MESSAGES_DELIVERED_CONSUME_AUTO_ACK, 3).
-define(MESSAGES_DELIVERED_GET_MANUAL_ACK, 4).
-define(MESSAGES_DELIVERED_GET_AUTO_ACK, 5).
-define(MESSAGES_GET_EMPTY, 6).
-define(MESSAGES_REDELIVERED, 7).
-define(MESSAGES_ACKNOWLEDGED, 8).
%% Note: ?NUM_PROTOCOL_QUEUE_TYPE_COUNTERS needs to be up-to-date.
%% See include/rabbit_global_counters.hrl
%%
%% Field list for the counters kept per protocol and queue type. Every entry
%% is a 4-tuple of field name, index macro, kind and a description string.
%% init/2 appends any extra fields to this list and passes the result to
%% seshat_counters:new/3.
-define(PROTOCOL_QUEUE_TYPE_COUNTERS,
        [
         {messages_delivered_total, ?MESSAGES_DELIVERED, counter,
          "Total number of messages delivered to consumers"},
         {messages_delivered_consume_manual_ack_total, ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, counter,
          "Total number of messages delivered to consumers using basic.consume with manual acknowledgment"},
         {messages_delivered_consume_auto_ack_total, ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, counter,
          "Total number of messages delivered to consumers using basic.consume with automatic acknowledgment"},
         {messages_delivered_get_manual_ack_total, ?MESSAGES_DELIVERED_GET_MANUAL_ACK, counter,
          "Total number of messages delivered to consumers using basic.get with manual acknowledgment"},
         {messages_delivered_get_auto_ack_total, ?MESSAGES_DELIVERED_GET_AUTO_ACK, counter,
          "Total number of messages delivered to consumers using basic.get with automatic acknowledgment"},
         {messages_get_empty_total, ?MESSAGES_GET_EMPTY, counter,
          "Total number of times basic.get operations fetched no message"},
         {messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
          "Total number of messages redelivered to consumers"},
         {messages_acknowledged_total, ?MESSAGES_ACKNOWLEDGED, counter,
          "Total number of messages acknowledged by consumers"}
        ]).

%% Registers the amqp091 counters: first the per-protocol set, then one set
%% for each of the classic, quorum and stream queue types. Returns ok.
boot_step() ->
    ProtocolLabel = {protocol, amqp091},
    QueueTypes = [rabbit_classic_queue,
                  rabbit_quorum_queue,
                  rabbit_stream_queue],
    init([ProtocolLabel]),
    lists:foreach(
      fun(QueueType) ->
              init([ProtocolLabel, {queue_type, QueueType}])
      end,
      QueueTypes).

%% Same as init/2 with no extra fields.
init(Labels) ->
    NoExtraFields = [],
    init(Labels, NoExtraFields).
%% Creates the counters for a label set and stores the value returned by
%% seshat_counters:new/3 in persistent_term so it can be looked up later.
%%
%% Labels must be exactly [{protocol, P}] or [{protocol, P}, {queue_type, Q}];
%% any other shape fails with function_clause. Extra is appended to the
%% built-in field list of the matching kind. Returns ok.
init([{protocol, Protocol}] = Labels, Extra) ->
    _ = seshat_counters:new_group(?MODULE),
    Fields = ?PROTOCOL_COUNTERS ++ Extra,
    Ref = seshat_counters:new(?MODULE, Labels, Fields),
    persistent_term:put({?MODULE, Protocol}, Ref),
    ok;
init([{protocol, Protocol}, {queue_type, QueueType}] = Labels, Extra) ->
    _ = seshat_counters:new_group(?MODULE),
    Fields = ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra,
    Ref = seshat_counters:new(?MODULE, Labels, Fields),
    persistent_term:put({?MODULE, Protocol, QueueType}, Ref),
    ok.

%% Returns seshat_counters:overview/1 for this module's group.
overview() ->
    Group = ?MODULE,
    seshat_counters:overview(Group).

%% Returns seshat_counters:prometheus_format/1 for this module's group.
prometheus_format() ->
    Group = ?MODULE,
    seshat_counters:prometheus_format(Group).

%% Adds Num to the field at index Counter of the per-protocol counters.
%% The index is caller-supplied, so this also reaches fields registered
%% through the Extra argument of init/2.
increase_protocol_counter(Protocol, Counter, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, Counter, Num).

%% Adds Num to messages_received_total for Protocol.
messages_received(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_RECEIVED, Num).

%% Adds Num to messages_received_confirm_total for Protocol.
messages_received_confirm(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_RECEIVED_CONFIRM, Num).

%% Adds Num to messages_routed_total for Protocol.
messages_routed(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_ROUTED, Num).

%% Adds Num to messages_unroutable_dropped_total for Protocol.
messages_unroutable_dropped(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_UNROUTABLE_DROPPED, Num).

%% Adds Num to messages_unroutable_returned_total for Protocol.
messages_unroutable_returned(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_UNROUTABLE_RETURNED, Num).

%% Adds Num to messages_confirmed_total for Protocol.
messages_confirmed(Protocol, Num) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?MESSAGES_CONFIRMED, Num).

%% Adds Num to messages_delivered_total for Protocol and QueueType.
messages_delivered(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_DELIVERED, Num).

%% Adds Num to messages_delivered_consume_manual_ack_total for Protocol and
%% QueueType.
messages_delivered_consume_manual_ack(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, Num).

%% Adds Num to messages_delivered_consume_auto_ack_total for Protocol and
%% QueueType.
messages_delivered_consume_auto_ack(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, Num).
%% Adds Num to messages_delivered_get_manual_ack_total for Protocol and
%% QueueType.
messages_delivered_get_manual_ack(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_DELIVERED_GET_MANUAL_ACK, Num).

%% Adds Num to messages_delivered_get_auto_ack_total for Protocol and
%% QueueType.
messages_delivered_get_auto_ack(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_DELIVERED_GET_AUTO_ACK, Num).

%% Adds Num to messages_get_empty_total for Protocol and QueueType.
messages_get_empty(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_GET_EMPTY, Num).

%% Adds Num to messages_redelivered_total for Protocol and QueueType.
messages_redelivered(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_REDELIVERED, Num).

%% Adds Num to messages_acknowledged_total for Protocol and QueueType.
messages_acknowledged(Protocol, QueueType, Num) ->
    Ref = fetch(Protocol, QueueType),
    counters:add(Ref, ?MESSAGES_ACKNOWLEDGED, Num).

%% Raises the publishers gauge of Protocol by one.
publisher_created(Protocol) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?PUBLISHERS, 1).

%% Lowers the publishers gauge of Protocol by one.
publisher_deleted(Protocol) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?PUBLISHERS, -1).

%% Raises the consumers gauge of Protocol by one.
consumer_created(Protocol) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?CONSUMERS, 1).

%% Lowers the consumers gauge of Protocol by one.
consumer_deleted(Protocol) ->
    Ref = fetch(Protocol),
    counters:add(Ref, ?CONSUMERS, -1).

%% Looks up the per-protocol counters stored by init/2.
%% persistent_term:get/1 raises badarg when nothing was stored for Protocol.
fetch(Protocol) ->
    Key = {?MODULE, Protocol},
    persistent_term:get(Key).

%% Looks up the counters stored by init/2 for Protocol and QueueType.
%% persistent_term:get/1 raises badarg when nothing was stored for the pair.
fetch(Protocol, QueueType) ->
    Key = {?MODULE, Protocol, QueueType},
    persistent_term:get(Key).