1 /*
2 * Copyright (C) 2009 IPTEGO GmbH
3 *
4 * This file is part of SEMS, a free SIP media server.
5 *
6 * SEMS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version. This program is released under
10 * the GPL with the additional exemption that compiling, linking,
11 * and/or using OpenSSL is allowed.
12 *
13 * For a license to use the SEMS software under conditions
14 * other than those described here, or to purchase support for this
15 * software, please contact iptel.org by e-mail at the following addresses:
16 * info@iptel.org
17 *
18 * SEMS is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with this program; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 */
27
28 #include "Monitoring.h"
29
30 #include "AmConfigReader.h"
31 #include "AmEventDispatcher.h"
32
33 #include "log.h"
34
35 #include <sys/types.h>
36 #include <regex.h>
37 #include <unistd.h>
38
39 //EXPORT_PLUGIN_CLASS_FACTORY(Monitor, MOD_NAME);
plugin_class_create()40 extern "C" void* plugin_class_create()
41 {
42 Monitor* m_inst = Monitor::instance();
43 assert(dynamic_cast<AmDynInvokeFactory*>(m_inst));
44
45 return m_inst;
46 }
47
48 Monitor* Monitor::_instance=0;
49 unsigned int Monitor::gcInterval = 10;
50 unsigned int Monitor::retain_samples_s = 10;
51
instance()52 Monitor* Monitor::instance()
53 {
54 if(_instance == NULL)
55 _instance = new Monitor(MOD_NAME);
56 return _instance;
57 }
58
Monitor(const string & name)59 Monitor::Monitor(const string& name)
60 : AmDynInvokeFactory(MOD_NAME), gc_thread(nullptr) {
61 }
62
~Monitor()63 Monitor::~Monitor() {
64 }
65
onLoad()66 int Monitor::onLoad() {
67 // todo: if GC configured, start thread
68 AmConfigReader cfg;
69 if(cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf"))) {
70 DBG("monitoring not starting garbage collector\n");
71 return 0;
72 }
73
74 if (cfg.getParameter("run_garbage_collector","no") == "yes") {
75 gcInterval = cfg.getParameterInt("garbage_collector_interval", 10);
76 DBG("Running garbage collection for monitoring every %u seconds\n",
77 gcInterval);
78 gc_thread.reset(new MonitorGarbageCollector());
79 gc_thread->start();
80 AmEventDispatcher::instance()->addEventQueue("monitoring_gc", gc_thread.get());
81 // // add garbage collector to garbage collector...
82 // AmThreadWatcher::instance()->add(gc_thread);
83 }
84
85 retain_samples_s = cfg.getParameterInt("retain_samples_s", 10);
86
87 return 0;
88 }
89
invoke(const string & method,const AmArg & args,AmArg & ret)90 void Monitor::invoke(const string& method,
91 const AmArg& args, AmArg& ret) {
92 if((method == "log") || (method == "set")) {
93 log(args,ret);
94 } else if((method == "logAdd") || (method == "add")) {
95 logAdd(args,ret);
96 } else if(method == "markFinished"){
97 markFinished(args,ret);
98 } else if(method == "setExpiration"){
99 setExpiration(args,ret);
100 } else if(method == "get"){
101 get(args,ret);
102 } else if(method == "getSingle"){
103 getSingle(args,ret);
104 } else if(method == "inc"){
105 inc(args,ret);
106 } else if(method == "dec"){
107 dec(args,ret);
108 } else if(method == "addCount"){
109 addCount(args,ret);
110 } else if(method == "addSample"){
111 addSample(args,ret);
112 } else if(method == "getCount"){
113 getCount(args,ret);
114 } else if(method == "getAllCounts"){
115 getAllCounts(args,ret);
116 } else if(method == "getAttribute"){
117 getAttribute(args,ret);
118 } else if(method == "getAttributeFinished"){
119 getAttributeFinished(args,ret);
120 } else if(method == "getAttributeActive"){
121 getAttributeActive(args,ret);
122 } else if(method == "list"){
123 listAll(args,ret);
124 } else if(method == "listByFilter"){
125 listByFilter(args,ret, false);
126 } else if(method == "listByRegex"){
127 listByRegex(args,ret);
128 } else if(method == "listFinished"){
129 listFinished(args,ret);
130 } else if(method == "listActive"){
131 listActive(args,ret);
132 } else if(method == "clear"){
133 clear(args,ret);
134 } else if(method == "clearFinished"){
135 clearFinished(args,ret);
136 } else if(method == "erase"){
137 clear(args,ret);
138 } else if(method == "eraseByFilter"){
139 listByFilter(args,ret, true);
140 } else if(method == "_list"){
141 ret.push(AmArg("log"));
142 ret.push(AmArg("set"));
143 ret.push(AmArg("logAdd"));
144 ret.push(AmArg("add"));
145 ret.push(AmArg("inc"));
146 ret.push(AmArg("dec"));
147 ret.push(AmArg("addSample"));
148 ret.push(AmArg("markFinished"));
149 ret.push(AmArg("setExpiration"));
150 ret.push(AmArg("erase"));
151 ret.push(AmArg("eraseByFilter"));
152 ret.push(AmArg("clear"));
153 ret.push(AmArg("clearFinished"));
154 ret.push(AmArg("get"));
155 ret.push(AmArg("getAttribute"));
156 ret.push(AmArg("getAttributeActive"));
157 ret.push(AmArg("getAttributeFinished"));
158 ret.push(AmArg("getCount"));
159 ret.push(AmArg("list"));
160 ret.push(AmArg("listByFilter"));
161 ret.push(AmArg("listByRegex"));
162 ret.push(AmArg("listFinished"));
163 ret.push(AmArg("listActive"));
164 } else
165 throw AmDynInvoke::NotImplemented(method);
166 }
167
log(const AmArg & args,AmArg & ret)168 void Monitor::log(const AmArg& args, AmArg& ret) {
169 assertArgCStr(args[0]);
170
171 LogBucket& bucket = getLogBucket(args[0].asCStr());
172 bucket.log_lock.lock();
173 try {
174 for (size_t i=1;i<args.size();i+=2)
175 bucket.log[args[0].asCStr()].info[args[i].asCStr()]=AmArg(args[i+1]);
176 } catch (...) {
177 bucket.log_lock.unlock();
178 ret.push(-1);
179 ret.push("ERROR while converting value");
180 throw;
181 }
182 bucket.log_lock.unlock();
183 ret.push(0);
184 ret.push("OK");
185 }
186
add(const AmArg & args,AmArg & ret,int a)187 void Monitor::add(const AmArg& args, AmArg& ret, int a) {
188 assertArgCStr(args[0]);
189
190 LogBucket& bucket = getLogBucket(args[0].asCStr());
191 bucket.log_lock.lock();
192 try {
193 //for (size_t i=1;i<args.size();i++) {
194 int val = 0;
195 AmArg& v = bucket.log[args[0].asCStr()].info[args[1].asCStr()];
196 if (isArgInt(v))
197 val = v.asInt();
198 val+=a;
199 v = val;
200 //}
201 } catch (...) {
202 bucket.log_lock.unlock();
203 ret.push(-1);
204 ret.push("ERROR while converting value");
205 throw;
206 }
207 bucket.log_lock.unlock();
208 ret.push(0);
209 ret.push("OK");
210 }
211
inc(const AmArg & args,AmArg & ret)212 void Monitor::inc(const AmArg& args, AmArg& ret) {
213 add(args, ret, 1);
214 }
215
dec(const AmArg & args,AmArg & ret)216 void Monitor::dec(const AmArg& args, AmArg& ret) {
217 add(args, ret, -1);
218 }
219
addCount(const AmArg & args,AmArg & ret)220 void Monitor::addCount(const AmArg& args, AmArg& ret) {
221 assertArgInt(args[2]);
222 add(args, ret, args[2].asInt());
223 }
224
logAdd(const AmArg & args,AmArg & ret)225 void Monitor::logAdd(const AmArg& args, AmArg& ret) {
226 assertArgCStr(args[0]);
227 assertArgCStr(args[1]);
228
229 LogBucket& bucket = getLogBucket(args[0].asCStr());
230 bucket.log_lock.lock();
231 try {
232 AmArg& val = bucket.log[args[0].asCStr()].info[args[1].asCStr()];
233 if (!isArgArray(val) && !isArgUndef(val)) {
234 AmArg v1 = val;
235 val = AmArg();
236 val.push(v1);
237 }
238 val.push(AmArg(args[2]));
239 } catch (...) {
240 bucket.log_lock.unlock();
241 throw;
242 }
243 ret.push(0);
244 ret.push("OK");
245 bucket.log_lock.unlock();
246 }
247
248
249 // Expected args:
250 // name, key, [counter=1], [timestamp=now]
addSample(const AmArg & args,AmArg & ret)251 void Monitor::addSample(const AmArg& args, AmArg& ret) {
252 assertArgCStr(args[0]);
253 assertArgCStr(args[1]);
254
255 struct timeval now;
256 int cnt = 1;
257
258 if (args.size() > 2 && isArgInt(args[2])) {
259 cnt = args[2].asInt();
260
261 if (args.size() > 3 && isArgBlob(args[3])) {
262 now = *((struct timeval*) args[3].asBlob()->data);
263 }
264 else {
265 gettimeofday(&now, NULL);
266 }
267 }
268 else if (args.size() > 2 && isArgBlob(args[2])) {
269 now = *((struct timeval*)args[2].asBlob()->data);
270 } else {
271 gettimeofday(&now, NULL);
272 }
273
274 LogBucket& bucket = getLogBucket(args[0].asCStr());
275 bucket.log_lock.lock();
276 list<SampleInfo::time_cnt>& sample_list
277 = bucket.samples[args[0].asCStr()].sample[args[1].asCStr()];
278 if ((!sample_list.empty()) && timercmp(&sample_list.front().time, &now, >=)) {
279 // sample list time stamps needs to be monotonically increasing - clear if resyncing
280 // WARN("clock drift backwards - clearing %zd items\n", sample_list.size());
281 sample_list.clear();
282 }
283 sample_list.push_front(SampleInfo::time_cnt(now, cnt));
284 bucket.log_lock.unlock();
285
286 ret.push(0);
287 ret.push("OK");
288 }
289
truncate_samples(list<SampleInfo::time_cnt> & v,struct timeval now)290 void Monitor::truncate_samples(
291 list<SampleInfo::time_cnt>& v, struct timeval now) {
292 struct timeval cliff = now;
293 cliff.tv_sec -= retain_samples_s;
294 while ((!v.empty()) && timercmp(&cliff, &(v.back().time), >=))
295 v.pop_back();
296 }
297
298
299 // Expected args:
300 // name, key, [now [from [to]]] (blob type)
301 // or:
302 // name, key, interval in sec (int type)
getCount(const AmArg & args,AmArg & ret)303 void Monitor::getCount(const AmArg& args, AmArg& ret) {
304 assertArgCStr(args[0]);
305 assertArgCStr(args[1]);
306
307 struct timeval now;
308 if (args.size()>2 && isArgBlob(args[2])) {
309 now = *(struct timeval*)args[2].asBlob()->data;
310 } else {
311 gettimeofday(&now, NULL);
312 }
313
314 struct timeval from;
315 struct timeval to;
316 if (args.size()>3 && isArgBlob(args[3])) {
317 from = *(struct timeval*)args[3].asBlob()->data;
318
319 if (args.size()>4 && isArgBlob(args[4]))
320 to = *(struct timeval*)args[4].asBlob()->data;
321 else
322 to = now;
323
324 } else {
325 from = to = now;
326 if (args.size()>2 && isArgInt(args[2])) {
327 from.tv_sec -= args[2].asInt();
328 } else {
329 from.tv_sec -=1; // default: last second
330 }
331 }
332
333 if (!now.tv_sec) {
334 gettimeofday(&to, NULL);
335 }
336
337 unsigned int res = 0;
338
339 LogBucket& bucket = getLogBucket(args[0].asCStr());
340 bucket.log_lock.lock();
341
342 map<string, SampleInfo>::iterator it =
343 bucket.samples.find(args[0].asCStr());
344 if (it != bucket.samples.end()) {
345
346 map<string, list<SampleInfo::time_cnt> >::iterator s_it =
347 it->second.sample.find(args[1].asCStr());
348 if (s_it != it->second.sample.end()) {
349
350 list<SampleInfo::time_cnt>& v = s_it->second;
351 truncate_samples(v, now);
352 // todo (?): erase empty sample list
353 // if (v.empty()) {
354 // // sample vector is empty
355 // it->second.sample.erase(s_it);
356 // } else {
357 list<SampleInfo::time_cnt>::iterator v_it = v.begin();
358
359 while (v_it != v.end() && timercmp(&(v_it->time), &to, >))
360 v_it++;
361 if (v_it != v.end()) {
362 while (timercmp(&(v_it->time), &from, >=) && v_it != v.end()) {
363 res += v_it->counter;
364 v_it++;
365 }
366 }
367
368 }
369 }
370 bucket.log_lock.unlock();
371
372 ret.push((int)res);
373 }
374
375 // Expected args:
376 // name, [now [from [to]]] (blob type)
377 // or:
378 // name, interval in sec [now] (int type)
getAllCounts(const AmArg & args,AmArg & ret)379 void Monitor::getAllCounts(const AmArg& args, AmArg& ret) {
380 assertArgCStr(args[0]);
381 ret.assertStruct();
382
383 struct timeval now;
384 if (args.size()>1 && isArgBlob(args[1])) {
385 now = *(struct timeval*)args[1].asBlob()->data;
386 } else if (args.size()>2 && isArgInt(args[1]) && isArgBlob(args[2])) {
387 now = *(struct timeval*)args[2].asBlob()->data;
388 } else {
389 gettimeofday(&now, NULL);
390 }
391
392 struct timeval from;
393 struct timeval to;
394 if (args.size()>2 && isArgBlob(args[1]) && isArgBlob(args[2])) {
395 from = *(struct timeval*)args[2].asBlob()->data;
396
397 if (args.size()>3 && isArgBlob(args[3]))
398 to = *(struct timeval*)args[3].asBlob()->data;
399 else
400 to = now;
401
402 } else {
403 from = to = now;
404 if (args.size()>1 && isArgInt(args[1])) {
405 from.tv_sec -= args[1].asInt();
406 } else {
407 from.tv_sec -=1; // default: last second
408 }
409 }
410
411 if (!now.tv_sec) {
412 gettimeofday(&to, NULL);
413 }
414
415 LogBucket& bucket = getLogBucket(args[0].asCStr());
416 bucket.log_lock.lock();
417
418 map<string, SampleInfo>::iterator it =
419 bucket.samples.find(args[0].asCStr());
420 if (it != bucket.samples.end()) {
421
422 for (map<string, list<SampleInfo::time_cnt> >::iterator s_it =
423 it->second.sample.begin(); s_it != it->second.sample.end() ; s_it++) {
424
425 list<SampleInfo::time_cnt>& v = s_it->second;
426 truncate_samples(v, now);
427 list<SampleInfo::time_cnt>::iterator v_it = v.begin();
428
429 unsigned int res = 0;
430
431 while (timercmp(&(v_it->time), &to, >) && v_it != v.end())
432 v_it++;
433 if (v_it != v.end()) {
434 while (timercmp(&(v_it->time), &from, >=) && v_it != v.end()) {
435 res += v_it->counter;
436 v_it++;
437 }
438 }
439
440 ret[s_it->first] = (int)res;
441 }
442
443 }
444 bucket.log_lock.unlock();
445 }
446
447
markFinished(const AmArg & args,AmArg & ret)448 void Monitor::markFinished(const AmArg& args, AmArg& ret) {
449 assertArgCStr(args[0]);
450
451 LogBucket& bucket = getLogBucket(args[0].asCStr());
452 bucket.log_lock.lock();
453 if (!bucket.log[args[0].asCStr()].finished)
454 bucket.log[args[0].asCStr()].finished = time(0);
455 bucket.log_lock.unlock();
456 ret.push(0);
457 ret.push("OK");
458 }
459
setExpiration(const AmArg & args,AmArg & ret)460 void Monitor::setExpiration(const AmArg& args, AmArg& ret) {
461 assertArgCStr(args[0]);
462 assertArgInt(args[1]);
463
464 LogBucket& bucket = getLogBucket(args[0].asCStr());
465 bucket.log_lock.lock();
466 bucket.log[args[0].asCStr()].finished = args[1].asInt();
467 bucket.log_lock.unlock();
468 ret.push(0);
469 ret.push("OK");
470 }
471
erase(const AmArg & args,AmArg & ret)472 void Monitor::erase(const AmArg& args, AmArg& ret) {
473 assertArgCStr(args[0]);
474 LogBucket& bucket = getLogBucket(args[0].asCStr());
475 bucket.log_lock.lock();
476 bucket.log.erase(args[0].asCStr());
477 bucket.samples.erase(args[0].asCStr());
478 bucket.log_lock.unlock();
479 ret.push(0);
480 ret.push("OK");
481 }
482
clear(const AmArg & args,AmArg & ret)483 void Monitor::clear(const AmArg& args, AmArg& ret) {
484 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
485 logs[i].log_lock.lock();
486 logs[i].log.clear();
487 logs[i].samples.clear();
488 logs[i].log_lock.unlock();
489 }
490 ret.push(0);
491 ret.push("OK");
492 }
493
clearFinished(const AmArg & args,AmArg & ret)494 void Monitor::clearFinished(const AmArg& args, AmArg& ret) {
495 clearFinished();
496
497 ret.push(0);
498 ret.push("OK");
499 }
500
clearFinished()501 void Monitor::clearFinished() {
502 time_t now = time(0);
503 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
504 logs[i].log_lock.lock();
505 std::map<string, LogInfo>::iterator it=
506 logs[i].log.begin();
507 while (it != logs[i].log.end()) {
508 if (it->second.finished &&
509 it->second.finished <= now) {
510 std::map<string, LogInfo>::iterator d_it = it;
511 it++;
512 logs[i].samples.erase(d_it->first);
513 logs[i].log.erase(d_it);
514 } else {
515 it++;
516 }
517 }
518 logs[i].log_lock.unlock();
519 }
520 }
521
get(const AmArg & args,AmArg & ret)522 void Monitor::get(const AmArg& args, AmArg& ret) {
523 assertArgCStr(args[0]);
524 ret.assertArray();
525 LogBucket& bucket = getLogBucket(args[0].asCStr());
526 bucket.log_lock.lock();
527 std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
528 if (it!=bucket.log.end())
529 ret.push(it->second.info);
530 bucket.log_lock.unlock();
531 }
532
getSingle(const AmArg & args,AmArg & ret)533 void Monitor::getSingle(const AmArg& args, AmArg& ret) {
534 assertArgCStr(args[0]);
535 assertArgCStr(args[1]);
536 ret.assertArray();
537
538 DBG("getSingle(%s,%s)",
539 args[0].asCStr(),
540 args[1].asCStr());
541
542 LogBucket& bucket = getLogBucket(args[0].asCStr());
543 bucket.log_lock.lock();
544 std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
545 if (it!=bucket.log.end()){
546 AmArg& _v = it->second.info;
547 DBG("found log: %s",AmArg::print(_v).c_str());
548 if(isArgStruct(_v) && _v.hasMember(args[1].asCStr())) {
549 ret.push(_v[args[1].asCStr()]);
550 }
551 }
552 bucket.log_lock.unlock();
553 DBG("ret = %s",AmArg::print(ret).c_str());
554 }
555
getAttribute(const AmArg & args,AmArg & ret)556 void Monitor::getAttribute(const AmArg& args, AmArg& ret) {
557 assertArgCStr(args[0]);
558 string attr_name = args[0].asCStr();
559 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
560 logs[i].log_lock.lock();
561 for (std::map<string, LogInfo>::iterator it=
562 logs[i].log.begin();it != logs[i].log.end();it++) {
563 ret.push(AmArg());
564 AmArg& val = ret.get(ret.size()-1);
565 val.push(AmArg(it->first.c_str()));
566 val.push(it->second.info[attr_name]);
567 }
568 logs[i].log_lock.unlock();
569 }
570 }
571
572
573 #define DEF_GET_ATTRIB_FUNC(func_name, cond) \
574 void Monitor::func_name(const AmArg& args, AmArg& ret) { \
575 assertArgCStr(args[0]); \
576 ret.assertArray(); \
577 string attr_name = args[0].asCStr(); \
578 time_t now = time(0); \
579 for (int i=0;i<NUM_LOG_BUCKETS;i++) { \
580 logs[i].log_lock.lock(); \
581 for (std::map<string, LogInfo>::iterator it= \
582 logs[i].log.begin();it != logs[i].log.end();it++) { \
583 if (cond) { \
584 ret.push(AmArg()); \
585 AmArg& val = ret.get(ret.size()-1); \
586 val.push(AmArg(it->first.c_str())); \
587 val.push(it->second.info[attr_name]); \
588 } \
589 } \
590 logs[i].log_lock.unlock(); \
591 } \
592 }
593
594 DEF_GET_ATTRIB_FUNC(getAttributeActive, (!(it->second.finished &&
595 it->second.finished <= now)))
596 DEF_GET_ATTRIB_FUNC(getAttributeFinished,(it->second.finished &&
597 it->second.finished <= now))
598 #undef DEF_GET_ATTRIB_FUNC
599
listAll(const AmArg & args,AmArg & ret)600 void Monitor::listAll(const AmArg& args, AmArg& ret) {
601 ret.assertArray();
602 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
603 logs[i].log_lock.lock();
604 for (std::map<string, LogInfo>::iterator it=
605 logs[i].log.begin(); it != logs[i].log.end(); it++) {
606 ret.push(AmArg(it->first.c_str()));
607 }
608 logs[i].log_lock.unlock();
609 }
610 }
611
listByFilter(const AmArg & args,AmArg & ret,bool erase)612 void Monitor::listByFilter(const AmArg& args, AmArg& ret, bool erase) {
613 ret.assertArray();
614 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
615 logs[i].log_lock.lock();
616 try {
617 std::map<string, LogInfo>::iterator it=logs[i].log.begin();
618
619 while (it != logs[i].log.end()) {
620 bool match = true;
621 for (size_t a_i=0;a_i<args.size();a_i++) {
622 AmArg& p = args.get(a_i);
623 if (!(it->second.info[p.get(0).asCStr()]==p.get(1))) {
624 match = false;
625 break;
626 }
627 }
628
629 if (match) {
630 ret.push(AmArg(it->first.c_str()));
631 if (erase) {
632 std::map<string, LogInfo>::iterator d_it=it;
633 it++;
634 logs[i].log.erase(d_it);
635 continue;
636 }
637 }
638 it++;
639 }
640 } catch(...) {
641 logs[i].log_lock.unlock();
642 throw;
643 }
644 logs[i].log_lock.unlock();
645 }
646 }
647
listByRegex(const AmArg & args,AmArg & ret)648 void Monitor::listByRegex(const AmArg& args, AmArg& ret) {
649 assertArgCStr(args[0]);
650 assertArgCStr(args[1]);
651
652 ret.assertArray();
653 regex_t attr_reg;
654 if(regcomp(&attr_reg,args[1].asCStr(),REG_NOSUB)){
655 ERROR("could not compile regex '%s'\n", args[1].asCStr());
656 return;
657 }
658
659 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
660 logs[i].log_lock.lock();
661 try {
662 for (std::map<string, LogInfo>::iterator it=
663 logs[i].log.begin(); it != logs[i].log.end(); it++) {
664 if (!it->second.info.hasMember(args[0].asCStr()) ||
665 !isArgCStr(it->second.info[args[0].asCStr()]) ||
666 regexec(&attr_reg,it->second.info[args[0].asCStr()].asCStr(),0,0,0))
667 continue;
668
669 ret.push(AmArg(it->first.c_str()));
670 }
671 } catch(...) {
672 logs[i].log_lock.unlock();
673 throw;
674 }
675 logs[i].log_lock.unlock();
676 }
677
678 regfree(&attr_reg);
679 }
680
listFinished(const AmArg & args,AmArg & ret)681 void Monitor::listFinished(const AmArg& args, AmArg& ret) {
682 time_t now = time(0);
683 ret.assertArray();
684 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
685 logs[i].log_lock.lock();
686 for (std::map<string, LogInfo>::iterator it=
687 logs[i].log.begin(); it != logs[i].log.end(); it++) {
688 if (it->second.finished &&
689 it->second.finished <= now)
690 ret.push(AmArg(it->first.c_str()));
691 }
692 logs[i].log_lock.unlock();
693 }
694 }
695
696
listActive(const AmArg & args,AmArg & ret)697 void Monitor::listActive(const AmArg& args, AmArg& ret) {
698 time_t now = time(0);
699 ret.assertArray();
700 for (int i=0;i<NUM_LOG_BUCKETS;i++) {
701 logs[i].log_lock.lock();
702 for (std::map<string, LogInfo>::iterator it=
703 logs[i].log.begin(); it != logs[i].log.end(); it++) {
704 if (!(it->second.finished &&
705 it->second.finished <= now))
706 ret.push(AmArg(it->first.c_str()));
707 }
708 logs[i].log_lock.unlock();
709 }
710 }
711
getLogBucket(const string & call_id)712 LogBucket& Monitor::getLogBucket(const string& call_id) {
713 if (call_id.empty())
714 return logs[0];
715 char c = '\0'; // some distribution...bad luck if all callid start with 00000...
716 for (size_t i=0;i<5 && i<call_id.length();i++)
717 c = c ^ call_id[i];
718
719 return logs[c % NUM_LOG_BUCKETS];
720 }
721
run()722 void MonitorGarbageCollector::run() {
723 DBG("running MonitorGarbageCollector thread\n");
724 running.set(true);
725 while (running.get()) {
726 sleep(Monitor::gcInterval);
727 Monitor::instance()->clearFinished();
728 }
729 DBG("MonitorGarbageCollector thread ends\n");
730 AmEventDispatcher::instance()->delEventQueue("monitoring_gc");
731 }
732
postEvent(AmEvent * e)733 void MonitorGarbageCollector::postEvent(AmEvent* e) {
734 AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(e);
735 if (sys_ev &&
736 sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
737 DBG("stopping MonitorGarbageCollector thread\n");
738 running.set(false);
739 return;
740 }
741
742 WARN("received unknown event\n");
743 }
744
on_stop()745 void MonitorGarbageCollector::on_stop() {
746 }
747