1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16
17 #include "activity_statistics_monitor.h"
18 #include "incoming_message.h"
19 #include "mutex_lock.h"
20 #include "parameters.h"
21
22 #include <ctime>
23 #include <map>
24 #include <set>
25 #include <vector>
26
27 namespace yami
28 {
29 namespace details
30 {
31
// Kind of connection tracked by the statistics monitor.
enum connection_type
{
    incoming,
    outgoing
};

// Traffic counters kept for a single connection.
struct connection_stats
{
    // Creates an entry of the given kind with all counters at zero.
    // The default argument makes the type default-constructible.
    connection_stats(connection_type t = incoming)
        : type(t)
    {
        reset();
    }

    // Zeroes all four counters; the connection kind is left untouched.
    void reset()
    {
        messages_sent = messages_received = 0;
        bytes_sent = bytes_received = 0;
    }

    connection_type type;

    // number of messages and bytes, counted separately per direction
    std::size_t messages_sent;
    std::size_t messages_received;
    std::size_t bytes_sent;
    std::size_t bytes_received;
};
56
57 class activity_statistics_monitor_impl
58 {
59 public:
60
activity_statistics_monitor_impl()61 activity_statistics_monitor_impl()
62 {
63 mtx_.init();
64 }
65
~activity_statistics_monitor_impl()66 ~activity_statistics_monitor_impl()
67 {
68 mtx_.clean();
69 }
70
agent_created()71 void agent_created()
72 {
73 started_ = std::time(NULL);
74 }
75
agent_closed()76 void agent_closed()
77 {
78 }
79
listener_added(const char * target)80 void listener_added(const char * target)
81 {
82 mutex_lock lck(mtx_);
83
84 listeners_.insert(target);
85 }
86
listener_removed(const char * target)87 void listener_removed(const char * target)
88 {
89 mutex_lock lck(mtx_);
90
91 listeners_.erase(target);
92 }
93
incoming_connection_open(const char * target)94 void incoming_connection_open(const char * target)
95 {
96 mutex_lock lck(mtx_);
97
98 connections_[target] = connection_stats(incoming);
99 }
100
outgoing_connection_open(const char * target)101 void outgoing_connection_open(const char * target)
102 {
103 mutex_lock lck(mtx_);
104
105 connections_[target] = connection_stats(outgoing);
106 }
107
connection_closed(const char * target)108 void connection_closed(const char * target)
109 {
110 mutex_lock lck(mtx_);
111
112 connections_.erase(target);
113 }
114
connection_error(const char * target)115 void connection_error(const char * target)
116 {
117 mutex_lock lck(mtx_);
118
119 ++(connection_errors_[target]);
120 }
121
object_registered(const char * name)122 void object_registered(const char * name)
123 {
124 mutex_lock lck(mtx_);
125
126 objects_.insert(name);
127 }
128
object_unregistered(const char * name)129 void object_unregistered(const char * name)
130 {
131 mutex_lock lck(mtx_);
132
133 objects_.erase(name);
134 }
135
message_sent(const char * target,std::size_t size)136 void message_sent(const char * target, std::size_t size)
137 {
138 mutex_lock lck(mtx_);
139
140 connection_stats & stats = connections_[target];
141
142 ++stats.messages_sent;
143 stats.bytes_sent += size;
144 }
145
message_received(const char * target,std::size_t size)146 void message_received(const char * target, std::size_t size)
147 {
148 mutex_lock lck(mtx_);
149
150 connection_stats & stats = connections_[target];
151
152 ++stats.messages_received;
153 stats.bytes_received += size;
154 }
155
get(parameters & params,bool reset_counters)156 void get(parameters & params, bool reset_counters)
157 {
158 mutex_lock lck(mtx_);
159
160 // uptime information
161
162 double uptime = std::difftime(std::time(NULL), started_);
163 params.set_integer("uptime", static_cast<int>(uptime));
164
165 // list of active listeners
166
167 const char * listeners_field = "listeners";
168 params.create_string_array(listeners_field, listeners_.size());
169 std::set<std::string>::iterator lst_it = listeners_.begin();
170 std::set<std::string>::iterator lst_end = listeners_.end();
171 std::size_t index = 0;
172 for ( ; lst_it != lst_end; ++lst_it, ++index)
173 {
174 params.set_string_in_array(listeners_field, index, *lst_it);
175 }
176
177 // list of registered objects
178
179 const char * objects_field = "objects";
180 params.create_string_array(objects_field, objects_.size());
181 std::set<std::string>::iterator obj_it = objects_.begin();
182 std::set<std::string>::iterator obj_end = objects_.end();
183 index = 0;
184 for ( ; obj_it != obj_end; ++obj_it, ++index)
185 {
186 params.set_string_in_array(objects_field, index, *obj_it);
187 }
188
189 // connection statistics
190
191 std::vector<long long> conn_messages_sent(connections_.size());
192 std::vector<long long> conn_messages_received(connections_.size());
193 std::vector<long long> conn_bytes_sent(connections_.size());
194 std::vector<long long> conn_bytes_received(connections_.size());
195
196 const char * conn_names_field = "connection_names";
197 params.create_string_array(conn_names_field, connections_.size());
198 connection_map::iterator conn_it = connections_.begin();
199 connection_map::iterator conn_end = connections_.end();
200 index = 0;
201 for ( ; conn_it != conn_end; ++conn_it, ++index)
202 {
203 params.set_string_in_array(
204 conn_names_field, index, conn_it->first);
205
206 conn_messages_sent[index] = conn_it->second.messages_sent;
207 conn_messages_received[index] = conn_it->second.messages_received;
208 conn_bytes_sent[index] = conn_it->second.bytes_sent;
209 conn_bytes_received[index] = conn_it->second.bytes_received;
210 }
211
212 params.set_long_long_array("messages_sent",
213 &conn_messages_sent[0], conn_messages_sent.size());
214 params.set_long_long_array("messages_received",
215 &conn_messages_received[0], conn_messages_received.size());
216 params.set_long_long_array("bytes_sent",
217 &conn_bytes_sent[0], conn_bytes_sent.size());
218 params.set_long_long_array("bytes_received",
219 &conn_bytes_received[0], conn_bytes_received.size());
220
221 // connection errors
222
223 std::vector<long long> connection_errors(connection_errors_.size());
224
225 const char * conn_error_names_field = "error_names";
226 params.create_string_array(conn_error_names_field,
227 connection_errors_.size());
228 connection_error_map::iterator err_it = connection_errors_.begin();
229 connection_error_map::iterator err_end = connection_errors_.end();
230 index = 0;
231 for ( ; err_it != err_end; ++err_it, ++index)
232 {
233 params.set_string_in_array(
234 conn_error_names_field, index, err_it->first);
235
236 connection_errors[index] = err_it->second;
237 }
238
239 params.set_long_long_array("errors",
240 &connection_errors[0], connection_errors.size());
241
242 // atomic reset, if requested
243
244 if (reset_counters)
245 {
246 connection_errors_.clear();
247
248 connection_map::iterator it = connections_.begin();
249 connection_map::iterator end = connections_.end();
250 for ( ; it != end; ++it)
251 {
252 it->second.reset();
253 }
254 }
255 }
256
257 private:
258
259 typedef std::map<std::string, connection_stats> connection_map;
260 typedef std::map<std::string, std::size_t> connection_error_map;
261
262 details::mutex mtx_;
263
264 std::time_t started_;
265
266 std::set<std::string> listeners_;
267 std::set<std::string> objects_;
268
269 connection_map connections_;
270
271 // counter of errors per target
272 // this structure can contain targets
273 // that are not active in the main structure
274 connection_error_map connection_errors_;
275 };
276
277 } // namespace details
278
279 } // namespace yami
280
281 using namespace yami;
282
activity_statistics_monitor()283 activity_statistics_monitor::activity_statistics_monitor()
284 {
285 pimpl_ = new details::activity_statistics_monitor_impl();
286 }
287
~activity_statistics_monitor()288 activity_statistics_monitor::~activity_statistics_monitor()
289 {
290 delete pimpl_;
291 }
292
get(parameters & params,bool reset_counters)293 void activity_statistics_monitor::get(
294 parameters & params, bool reset_counters)
295 {
296 pimpl_->get(params, reset_counters);
297 }
298
operator ()(incoming_message & msg)299 void activity_statistics_monitor::operator()(incoming_message & msg)
300 {
301 if (msg.get_message_name() != "get")
302 {
303 msg.reject("Unknown message name.");
304 return;
305 }
306
307 const parameters & msg_params = msg.get_parameters();
308
309 bool reset_counters = false;
310 parameter_entry e;
311 const char * reset_field = "reset";
312 if (msg_params.find(reset_field, e) && e.type() == boolean)
313 {
314 reset_counters = msg_params.get_boolean(reset_field);
315 }
316
317 parameters reply_params;
318 get(reply_params, reset_counters);
319
320 msg.reply(reply_params);
321 }
322
agent_created()323 void activity_statistics_monitor::agent_created()
324 {
325 pimpl_->agent_created();
326 }
327
agent_closed()328 void activity_statistics_monitor::agent_closed()
329 {
330 pimpl_->agent_closed();
331 }
332
listener_added(const char * target)333 void activity_statistics_monitor::listener_added(const char * target)
334 {
335 pimpl_->listener_added(target);
336 }
337
listener_removed(const char * target)338 void activity_statistics_monitor::listener_removed(const char * target)
339 {
340 pimpl_->listener_removed(target);
341 }
342
incoming_connection_open(const char * target)343 void activity_statistics_monitor::incoming_connection_open(
344 const char * target)
345 {
346 pimpl_->incoming_connection_open(target);
347 }
348
outgoing_connection_open(const char * target)349 void activity_statistics_monitor::outgoing_connection_open(
350 const char * target)
351 {
352 pimpl_->outgoing_connection_open(target);
353 }
354
connection_closed(const char * target)355 void activity_statistics_monitor::connection_closed(const char * target)
356 {
357 pimpl_->connection_closed(target);
358 }
359
connection_error(const char * target)360 void activity_statistics_monitor::connection_error(const char * target)
361 {
362 pimpl_->connection_error(target);
363 }
364
object_registered(const char * name)365 void activity_statistics_monitor::object_registered(const char * name)
366 {
367 pimpl_->object_registered(name);
368 }
369
object_unregistered(const char * name)370 void activity_statistics_monitor::object_unregistered(const char * name)
371 {
372 pimpl_->object_unregistered(name);
373 }
374
message_sent(const char * target,std::size_t size)375 void activity_statistics_monitor::message_sent(
376 const char * target, std::size_t size)
377 {
378 pimpl_->message_sent(target, size);
379 }
380
message_received(const char * target,std::size_t size)381 void activity_statistics_monitor::message_received(
382 const char * target, std::size_t size)
383 {
384 pimpl_->message_received(target, size);
385 }
386