1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4.  If not, see <http://www.gnu.org/licenses/>.
16 
17 #include "activity_statistics_monitor.h"
18 #include "incoming_message.h"
19 #include "mutex_lock.h"
20 #include "parameters.h"
21 
22 #include <ctime>
23 #include <map>
24 #include <set>
25 #include <vector>
26 
27 namespace yami
28 {
29 namespace details
30 {
31 
32 enum connection_type { incoming, outgoing };
33 
34 struct connection_stats
35 {
connection_statsyami::details::connection_stats36     connection_stats(connection_type t = incoming)
37         : type(t), messages_sent(0), messages_received(0),
38           bytes_sent(0), bytes_received(0)
39     {
40     }
41 
resetyami::details::connection_stats42     void reset()
43     {
44         messages_sent = 0;
45         messages_received = 0;
46         bytes_sent = 0;
47         bytes_received = 0;
48     }
49 
50     connection_type type;
51     std::size_t messages_sent;
52     std::size_t messages_received;
53     std::size_t bytes_sent;
54     std::size_t bytes_received;
55 };
56 
57 class activity_statistics_monitor_impl
58 {
59 public:
60 
activity_statistics_monitor_impl()61     activity_statistics_monitor_impl()
62     {
63         mtx_.init();
64     }
65 
~activity_statistics_monitor_impl()66     ~activity_statistics_monitor_impl()
67     {
68         mtx_.clean();
69     }
70 
agent_created()71     void agent_created()
72     {
73         started_ = std::time(NULL);
74     }
75 
agent_closed()76     void agent_closed()
77     {
78     }
79 
listener_added(const char * target)80     void listener_added(const char * target)
81     {
82         mutex_lock lck(mtx_);
83 
84         listeners_.insert(target);
85     }
86 
listener_removed(const char * target)87     void listener_removed(const char * target)
88     {
89         mutex_lock lck(mtx_);
90 
91         listeners_.erase(target);
92     }
93 
incoming_connection_open(const char * target)94     void incoming_connection_open(const char * target)
95     {
96         mutex_lock lck(mtx_);
97 
98         connections_[target] = connection_stats(incoming);
99     }
100 
outgoing_connection_open(const char * target)101     void outgoing_connection_open(const char * target)
102     {
103         mutex_lock lck(mtx_);
104 
105         connections_[target] = connection_stats(outgoing);
106     }
107 
connection_closed(const char * target)108     void connection_closed(const char * target)
109     {
110         mutex_lock lck(mtx_);
111 
112         connections_.erase(target);
113     }
114 
connection_error(const char * target)115     void connection_error(const char * target)
116     {
117         mutex_lock lck(mtx_);
118 
119         ++(connection_errors_[target]);
120     }
121 
object_registered(const char * name)122     void object_registered(const char * name)
123     {
124         mutex_lock lck(mtx_);
125 
126         objects_.insert(name);
127     }
128 
object_unregistered(const char * name)129     void object_unregistered(const char * name)
130     {
131         mutex_lock lck(mtx_);
132 
133         objects_.erase(name);
134     }
135 
message_sent(const char * target,std::size_t size)136     void message_sent(const char * target, std::size_t size)
137     {
138         mutex_lock lck(mtx_);
139 
140         connection_stats & stats = connections_[target];
141 
142         ++stats.messages_sent;
143         stats.bytes_sent += size;
144     }
145 
message_received(const char * target,std::size_t size)146     void message_received(const char * target, std::size_t size)
147     {
148         mutex_lock lck(mtx_);
149 
150         connection_stats & stats = connections_[target];
151 
152         ++stats.messages_received;
153         stats.bytes_received += size;
154     }
155 
get(parameters & params,bool reset_counters)156     void get(parameters & params, bool reset_counters)
157     {
158         mutex_lock lck(mtx_);
159 
160         // uptime information
161 
162         double uptime = std::difftime(std::time(NULL), started_);
163         params.set_integer("uptime", static_cast<int>(uptime));
164 
165         // list of active listeners
166 
167         const char * listeners_field = "listeners";
168         params.create_string_array(listeners_field, listeners_.size());
169         std::set<std::string>::iterator lst_it = listeners_.begin();
170         std::set<std::string>::iterator lst_end = listeners_.end();
171         std::size_t index = 0;
172         for ( ; lst_it != lst_end; ++lst_it, ++index)
173         {
174             params.set_string_in_array(listeners_field, index, *lst_it);
175         }
176 
177         // list of registered objects
178 
179         const char * objects_field = "objects";
180         params.create_string_array(objects_field, objects_.size());
181         std::set<std::string>::iterator obj_it = objects_.begin();
182         std::set<std::string>::iterator obj_end = objects_.end();
183         index = 0;
184         for ( ; obj_it != obj_end; ++obj_it, ++index)
185         {
186             params.set_string_in_array(objects_field, index, *obj_it);
187         }
188 
189         // connection statistics
190 
191         std::vector<long long> conn_messages_sent(connections_.size());
192         std::vector<long long> conn_messages_received(connections_.size());
193         std::vector<long long> conn_bytes_sent(connections_.size());
194         std::vector<long long> conn_bytes_received(connections_.size());
195 
196         const char * conn_names_field = "connection_names";
197         params.create_string_array(conn_names_field, connections_.size());
198         connection_map::iterator conn_it = connections_.begin();
199         connection_map::iterator conn_end = connections_.end();
200         index = 0;
201         for ( ; conn_it != conn_end; ++conn_it, ++index)
202         {
203             params.set_string_in_array(
204                 conn_names_field, index, conn_it->first);
205 
206             conn_messages_sent[index] = conn_it->second.messages_sent;
207             conn_messages_received[index] = conn_it->second.messages_received;
208             conn_bytes_sent[index] = conn_it->second.bytes_sent;
209             conn_bytes_received[index] = conn_it->second.bytes_received;
210         }
211 
212         params.set_long_long_array("messages_sent",
213             &conn_messages_sent[0], conn_messages_sent.size());
214         params.set_long_long_array("messages_received",
215             &conn_messages_received[0], conn_messages_received.size());
216         params.set_long_long_array("bytes_sent",
217             &conn_bytes_sent[0], conn_bytes_sent.size());
218         params.set_long_long_array("bytes_received",
219             &conn_bytes_received[0], conn_bytes_received.size());
220 
221         // connection errors
222 
223         std::vector<long long> connection_errors(connection_errors_.size());
224 
225         const char * conn_error_names_field = "error_names";
226         params.create_string_array(conn_error_names_field,
227             connection_errors_.size());
228         connection_error_map::iterator err_it = connection_errors_.begin();
229         connection_error_map::iterator err_end = connection_errors_.end();
230         index = 0;
231         for ( ; err_it != err_end; ++err_it, ++index)
232         {
233             params.set_string_in_array(
234                 conn_error_names_field, index, err_it->first);
235 
236             connection_errors[index] = err_it->second;
237         }
238 
239         params.set_long_long_array("errors",
240             &connection_errors[0], connection_errors.size());
241 
242         // atomic reset, if requested
243 
244         if (reset_counters)
245         {
246             connection_errors_.clear();
247 
248             connection_map::iterator it = connections_.begin();
249             connection_map::iterator end = connections_.end();
250             for ( ; it != end; ++it)
251             {
252                 it->second.reset();
253             }
254         }
255     }
256 
257 private:
258 
259     typedef std::map<std::string, connection_stats> connection_map;
260     typedef std::map<std::string, std::size_t> connection_error_map;
261 
262     details::mutex mtx_;
263 
264     std::time_t started_;
265 
266     std::set<std::string> listeners_;
267     std::set<std::string> objects_;
268 
269     connection_map connections_;
270 
271     // counter of errors per target
272     // this structure can contain targets
273     // that are not active in the main structure
274     connection_error_map connection_errors_;
275 };
276 
277 } // namespace details
278 
279 } // namespace yami
280 
281 using namespace yami;
282 
activity_statistics_monitor()283 activity_statistics_monitor::activity_statistics_monitor()
284 {
285     pimpl_ = new details::activity_statistics_monitor_impl();
286 }
287 
~activity_statistics_monitor()288 activity_statistics_monitor::~activity_statistics_monitor()
289 {
290     delete pimpl_;
291 }
292 
get(parameters & params,bool reset_counters)293 void activity_statistics_monitor::get(
294     parameters & params, bool reset_counters)
295 {
296     pimpl_->get(params, reset_counters);
297 }
298 
operator ()(incoming_message & msg)299 void activity_statistics_monitor::operator()(incoming_message & msg)
300 {
301     if (msg.get_message_name() != "get")
302     {
303         msg.reject("Unknown message name.");
304         return;
305     }
306 
307     const parameters & msg_params = msg.get_parameters();
308 
309     bool reset_counters = false;
310     parameter_entry e;
311     const char * reset_field = "reset";
312     if (msg_params.find(reset_field, e) && e.type() == boolean)
313     {
314         reset_counters = msg_params.get_boolean(reset_field);
315     }
316 
317     parameters reply_params;
318     get(reply_params, reset_counters);
319 
320     msg.reply(reply_params);
321 }
322 
agent_created()323 void activity_statistics_monitor::agent_created()
324 {
325     pimpl_->agent_created();
326 }
327 
agent_closed()328 void activity_statistics_monitor::agent_closed()
329 {
330     pimpl_->agent_closed();
331 }
332 
listener_added(const char * target)333 void activity_statistics_monitor::listener_added(const char * target)
334 {
335     pimpl_->listener_added(target);
336 }
337 
listener_removed(const char * target)338 void activity_statistics_monitor::listener_removed(const char * target)
339 {
340     pimpl_->listener_removed(target);
341 }
342 
incoming_connection_open(const char * target)343 void activity_statistics_monitor::incoming_connection_open(
344     const char * target)
345 {
346     pimpl_->incoming_connection_open(target);
347 }
348 
outgoing_connection_open(const char * target)349 void activity_statistics_monitor::outgoing_connection_open(
350     const char * target)
351 {
352     pimpl_->outgoing_connection_open(target);
353 }
354 
connection_closed(const char * target)355 void activity_statistics_monitor::connection_closed(const char * target)
356 {
357     pimpl_->connection_closed(target);
358 }
359 
connection_error(const char * target)360 void activity_statistics_monitor::connection_error(const char * target)
361 {
362     pimpl_->connection_error(target);
363 }
364 
object_registered(const char * name)365 void activity_statistics_monitor::object_registered(const char * name)
366 {
367     pimpl_->object_registered(name);
368 }
369 
object_unregistered(const char * name)370 void activity_statistics_monitor::object_unregistered(const char * name)
371 {
372     pimpl_->object_unregistered(name);
373 }
374 
message_sent(const char * target,std::size_t size)375 void activity_statistics_monitor::message_sent(
376     const char * target, std::size_t size)
377 {
378     pimpl_->message_sent(target, size);
379 }
380 
message_received(const char * target,std::size_t size)381 void activity_statistics_monitor::message_received(
382     const char * target, std::size_t size)
383 {
384     pimpl_->message_received(target, size);
385 }
386