1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6%%
7
8-module(rabbit_global_counters).
9
10-export([
11         boot_step/0,
12         init/1,
13         init/2,
14         overview/0,
15         prometheus_format/0,
16         increase_protocol_counter/3,
17         messages_received/2,
18         messages_received_confirm/2,
19         messages_routed/2,
20         messages_unroutable_dropped/2,
21         messages_unroutable_returned/2,
22         messages_confirmed/2,
23         messages_delivered/3,
24         messages_delivered_consume_manual_ack/3,
25         messages_delivered_consume_auto_ack/3,
26         messages_delivered_get_manual_ack/3,
27         messages_delivered_get_auto_ack/3,
28         messages_get_empty/3,
29         messages_redelivered/3,
30         messages_acknowledged/3,
31         publisher_created/1,
32         publisher_deleted/1,
33         consumer_created/1,
34         consumer_deleted/1
35       ]).
36
37%% PROTOCOL COUNTERS:
38-define(MESSAGES_RECEIVED, 1).
39-define(MESSAGES_RECEIVED_CONFIRM, 2).
40-define(MESSAGES_ROUTED, 3).
41-define(MESSAGES_UNROUTABLE_DROPPED, 4).
42-define(MESSAGES_UNROUTABLE_RETURNED, 5).
43-define(MESSAGES_CONFIRMED, 6).
44-define(PUBLISHERS, 7).
45-define(CONSUMERS, 8).
46%% Note: ?NUM_PROTOCOL_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl
47-define(PROTOCOL_COUNTERS,
48            [
49                {
50                    messages_received_total, ?MESSAGES_RECEIVED, counter,
51                    "Total number of messages received from publishers"
52                },
53                {
54                    messages_received_confirm_total, ?MESSAGES_RECEIVED_CONFIRM, counter,
55                    "Total number of messages received from publishers expecting confirmations"
56                },
57                {
58                    messages_routed_total, ?MESSAGES_ROUTED, counter,
59                    "Total number of messages routed to queues or streams"
60                },
61                {
62                    messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
63                    "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
64                },
65                {
66                    messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
67                   "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
68                },
69                {
70                    messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
71                    "Total number of messages confirmed to publishers"
72                },
73                {
74                    publishers, ?PUBLISHERS, gauge,
75                    "Current number of publishers"
76                },
77                {
78                    consumers, ?CONSUMERS, gauge,
79                    "Current number of consumers"
80                }
81            ]).
82
83%% Protocol & QueueType counters:
84-define(MESSAGES_DELIVERED, 1).
85-define(MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, 2).
86-define(MESSAGES_DELIVERED_CONSUME_AUTO_ACK, 3).
87-define(MESSAGES_DELIVERED_GET_MANUAL_ACK, 4).
88-define(MESSAGES_DELIVERED_GET_AUTO_ACK, 5).
89-define(MESSAGES_GET_EMPTY, 6).
90-define(MESSAGES_REDELIVERED, 7).
91-define(MESSAGES_ACKNOWLEDGED, 8).
92%% Note: ?NUM_PROTOCOL_QUEUE_TYPE_COUNTERS needs to be up-to-date. See include/rabbit_global_counters.hrl
93-define(PROTOCOL_QUEUE_TYPE_COUNTERS,
94            [
95                {
96                    messages_delivered_total, ?MESSAGES_DELIVERED, counter,
97                    "Total number of messages delivered to consumers"
98                },
99                {
100                    messages_delivered_consume_manual_ack_total, ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, counter,
101                    "Total number of messages delivered to consumers using basic.consume with manual acknowledgment"
102                },
103                {
104                    messages_delivered_consume_auto_ack_total, ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, counter,
105                    "Total number of messages delivered to consumers using basic.consume with automatic acknowledgment"
106                },
107                {
108                    messages_delivered_get_manual_ack_total, ?MESSAGES_DELIVERED_GET_MANUAL_ACK, counter,
109                    "Total number of messages delivered to consumers using basic.get with manual acknowledgment"
110                },
111                {
112                    messages_delivered_get_auto_ack_total, ?MESSAGES_DELIVERED_GET_AUTO_ACK, counter,
113                    "Total number of messages delivered to consumers using basic.get with automatic acknowledgment"
114                },
115                {
116                    messages_get_empty_total, ?MESSAGES_GET_EMPTY, counter,
117                    "Total number of times basic.get operations fetched no message"
118                },
119                {
120                    messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
121                    "Total number of messages redelivered to consumers"
122                },
123                {
124                    messages_acknowledged_total, ?MESSAGES_ACKNOWLEDGED, counter,
125                    "Total number of messages acknowledged by consumers"
126                }
127            ]).
128
129boot_step() ->
130    init([{protocol, amqp091}]),
131    init([{protocol, amqp091}, {queue_type, rabbit_classic_queue}]),
132    init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]),
133    init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]).
134
135init(Labels) ->
136    init(Labels, []).
137
138init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
139    _ = seshat_counters:new_group(?MODULE),
140    Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra),
141    persistent_term:put({?MODULE, Protocol, QueueType}, Counters),
142    ok;
143init(Labels = [{protocol, Protocol}], Extra) ->
144    _ = seshat_counters:new_group(?MODULE),
145    Counters = seshat_counters:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
146    persistent_term:put({?MODULE, Protocol}, Counters),
147    ok.
148
149overview() ->
150    seshat_counters:overview(?MODULE).
151
152prometheus_format() ->
153    seshat_counters:prometheus_format(?MODULE).
154
155increase_protocol_counter(Protocol, Counter, Num) ->
156    counters:add(fetch(Protocol), Counter, Num).
157
158messages_received(Protocol, Num) ->
159    counters:add(fetch(Protocol), ?MESSAGES_RECEIVED, Num).
160
161messages_received_confirm(Protocol, Num) ->
162    counters:add(fetch(Protocol), ?MESSAGES_RECEIVED_CONFIRM, Num).
163
164messages_routed(Protocol, Num) ->
165    counters:add(fetch(Protocol), ?MESSAGES_ROUTED, Num).
166
167messages_unroutable_dropped(Protocol, Num) ->
168    counters:add(fetch(Protocol), ?MESSAGES_UNROUTABLE_DROPPED, Num).
169
170messages_unroutable_returned(Protocol, Num) ->
171    counters:add(fetch(Protocol), ?MESSAGES_UNROUTABLE_RETURNED, Num).
172
173messages_confirmed(Protocol, Num) ->
174    counters:add(fetch(Protocol), ?MESSAGES_CONFIRMED, Num).
175
176messages_delivered(Protocol, QueueType, Num) ->
177    counters:add(fetch(Protocol, QueueType), ?MESSAGES_DELIVERED, Num).
178
179messages_delivered_consume_manual_ack(Protocol, QueueType, Num) ->
180    counters:add(fetch(Protocol, QueueType), ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, Num).
181
182messages_delivered_consume_auto_ack(Protocol, QueueType, Num) ->
183    counters:add(fetch(Protocol, QueueType), ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, Num).
184
185messages_delivered_get_manual_ack(Protocol, QueueType, Num) ->
186    counters:add(fetch(Protocol, QueueType), ?MESSAGES_DELIVERED_GET_MANUAL_ACK, Num).
187
188messages_delivered_get_auto_ack(Protocol, QueueType, Num) ->
189    counters:add(fetch(Protocol, QueueType), ?MESSAGES_DELIVERED_GET_AUTO_ACK, Num).
190
191messages_get_empty(Protocol, QueueType, Num) ->
192    counters:add(fetch(Protocol, QueueType), ?MESSAGES_GET_EMPTY, Num).
193
194messages_redelivered(Protocol, QueueType, Num) ->
195    counters:add(fetch(Protocol, QueueType), ?MESSAGES_REDELIVERED, Num).
196
197messages_acknowledged(Protocol, QueueType, Num) ->
198    counters:add(fetch(Protocol, QueueType), ?MESSAGES_ACKNOWLEDGED, Num).
199
200publisher_created(Protocol) ->
201    counters:add(fetch(Protocol), ?PUBLISHERS, 1).
202
203publisher_deleted(Protocol) ->
204    counters:add(fetch(Protocol), ?PUBLISHERS, -1).
205
206consumer_created(Protocol) ->
207    counters:add(fetch(Protocol), ?CONSUMERS, 1).
208
209consumer_deleted(Protocol) ->
210    counters:add(fetch(Protocol), ?CONSUMERS, -1).
211
212fetch(Protocol) ->
213    persistent_term:get({?MODULE, Protocol}).
214
215fetch(Protocol, QueueType) ->
216    persistent_term:get({?MODULE, Protocol, QueueType}).
217