1 /*
2  * Copyright (C) 2009 IPTEGO GmbH
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "Monitoring.h"
29 
30 #include "AmConfigReader.h"
31 #include "AmEventDispatcher.h"
32 
33 #include "log.h"
34 
35 #include <sys/types.h>
36 #include <regex.h>
37 #include <unistd.h>
38 
39 //EXPORT_PLUGIN_CLASS_FACTORY(Monitor, MOD_NAME);
plugin_class_create()40 extern "C" void* plugin_class_create()
41 {
42     Monitor* m_inst = Monitor::instance();
43     assert(dynamic_cast<AmDynInvokeFactory*>(m_inst));
44 
45     return m_inst;
46 }
47 
48 Monitor* Monitor::_instance=0;
49 unsigned int Monitor::gcInterval = 10;
50 unsigned int Monitor::retain_samples_s = 10;
51 
instance()52 Monitor* Monitor::instance()
53 {
54   if(_instance == NULL)
55     _instance = new Monitor(MOD_NAME);
56   return _instance;
57 }
58 
Monitor(const string & name)59 Monitor::Monitor(const string& name)
60   : AmDynInvokeFactory(MOD_NAME), gc_thread(nullptr) {
61 }
62 
~Monitor()63 Monitor::~Monitor() {
64 }
65 
onLoad()66 int Monitor::onLoad() {
67   // todo: if GC configured, start thread
68   AmConfigReader cfg;
69   if(cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf"))) {
70     DBG("monitoring not starting garbage collector\n");
71     return 0;
72   }
73 
74   if (cfg.getParameter("run_garbage_collector","no") == "yes") {
75     gcInterval = cfg.getParameterInt("garbage_collector_interval", 10);
76     DBG("Running garbage collection for monitoring every %u seconds\n",
77 	gcInterval);
78     gc_thread.reset(new MonitorGarbageCollector());
79     gc_thread->start();
80     AmEventDispatcher::instance()->addEventQueue("monitoring_gc", gc_thread.get());
81 //     // add garbage collector to garbage collector...
82 //     AmThreadWatcher::instance()->add(gc_thread);
83   }
84 
85   retain_samples_s = cfg.getParameterInt("retain_samples_s", 10);
86 
87   return 0;
88 }
89 
invoke(const string & method,const AmArg & args,AmArg & ret)90 void Monitor::invoke(const string& method,
91 		     const AmArg& args, AmArg& ret) {
92   if((method == "log") || (method == "set")) {
93     log(args,ret);
94   } else if((method == "logAdd") || (method == "add")) {
95     logAdd(args,ret);
96   } else if(method == "markFinished"){
97     markFinished(args,ret);
98   } else if(method == "setExpiration"){
99     setExpiration(args,ret);
100   } else if(method == "get"){
101     get(args,ret);
102   } else if(method == "getSingle"){
103     getSingle(args,ret);
104   } else if(method == "inc"){
105     inc(args,ret);
106   } else if(method == "dec"){
107     dec(args,ret);
108   } else if(method == "addCount"){
109     addCount(args,ret);
110   } else if(method == "addSample"){
111     addSample(args,ret);
112   } else if(method == "getCount"){
113     getCount(args,ret);
114   } else if(method == "getAllCounts"){
115       getAllCounts(args,ret);
116   } else if(method == "getAttribute"){
117     getAttribute(args,ret);
118   } else if(method == "getAttributeFinished"){
119     getAttributeFinished(args,ret);
120   } else if(method == "getAttributeActive"){
121     getAttributeActive(args,ret);
122   } else if(method == "list"){
123     listAll(args,ret);
124   } else if(method == "listByFilter"){
125     listByFilter(args,ret, false);
126   } else if(method == "listByRegex"){
127     listByRegex(args,ret);
128   } else if(method == "listFinished"){
129     listFinished(args,ret);
130   } else if(method == "listActive"){
131     listActive(args,ret);
132   } else if(method == "clear"){
133     clear(args,ret);
134   } else if(method == "clearFinished"){
135     clearFinished(args,ret);
136   } else if(method == "erase"){
137     clear(args,ret);
138   } else if(method == "eraseByFilter"){
139     listByFilter(args,ret, true);
140   } else if(method == "_list"){
141     ret.push(AmArg("log"));
142     ret.push(AmArg("set"));
143     ret.push(AmArg("logAdd"));
144     ret.push(AmArg("add"));
145     ret.push(AmArg("inc"));
146     ret.push(AmArg("dec"));
147     ret.push(AmArg("addSample"));
148     ret.push(AmArg("markFinished"));
149     ret.push(AmArg("setExpiration"));
150     ret.push(AmArg("erase"));
151     ret.push(AmArg("eraseByFilter"));
152     ret.push(AmArg("clear"));
153     ret.push(AmArg("clearFinished"));
154     ret.push(AmArg("get"));
155     ret.push(AmArg("getAttribute"));
156     ret.push(AmArg("getAttributeActive"));
157     ret.push(AmArg("getAttributeFinished"));
158     ret.push(AmArg("getCount"));
159     ret.push(AmArg("list"));
160     ret.push(AmArg("listByFilter"));
161     ret.push(AmArg("listByRegex"));
162     ret.push(AmArg("listFinished"));
163     ret.push(AmArg("listActive"));
164   } else
165     throw AmDynInvoke::NotImplemented(method);
166 }
167 
log(const AmArg & args,AmArg & ret)168 void Monitor::log(const AmArg& args, AmArg& ret) {
169   assertArgCStr(args[0]);
170 
171   LogBucket& bucket = getLogBucket(args[0].asCStr());
172   bucket.log_lock.lock();
173   try {
174     for (size_t i=1;i<args.size();i+=2)
175       bucket.log[args[0].asCStr()].info[args[i].asCStr()]=AmArg(args[i+1]);
176   } catch (...) {
177     bucket.log_lock.unlock();
178     ret.push(-1);
179     ret.push("ERROR while converting value");
180     throw;
181   }
182   bucket.log_lock.unlock();
183   ret.push(0);
184   ret.push("OK");
185 }
186 
add(const AmArg & args,AmArg & ret,int a)187 void Monitor::add(const AmArg& args, AmArg& ret, int a) {
188   assertArgCStr(args[0]);
189 
190   LogBucket& bucket = getLogBucket(args[0].asCStr());
191   bucket.log_lock.lock();
192   try {
193     //for (size_t i=1;i<args.size();i++) {
194     int val = 0;
195     AmArg& v = bucket.log[args[0].asCStr()].info[args[1].asCStr()];
196     if (isArgInt(v))
197       val = v.asInt();
198     val+=a;
199     v = val;
200     //}
201   } catch (...) {
202     bucket.log_lock.unlock();
203     ret.push(-1);
204     ret.push("ERROR while converting value");
205     throw;
206   }
207   bucket.log_lock.unlock();
208   ret.push(0);
209   ret.push("OK");
210 }
211 
inc(const AmArg & args,AmArg & ret)212 void Monitor::inc(const AmArg& args, AmArg& ret) {
213   add(args, ret, 1);
214 }
215 
dec(const AmArg & args,AmArg & ret)216 void Monitor::dec(const AmArg& args, AmArg& ret) {
217   add(args, ret, -1);
218 }
219 
addCount(const AmArg & args,AmArg & ret)220 void Monitor::addCount(const AmArg& args, AmArg& ret) {
221   assertArgInt(args[2]);
222   add(args, ret, args[2].asInt());
223 }
224 
logAdd(const AmArg & args,AmArg & ret)225 void Monitor::logAdd(const AmArg& args, AmArg& ret) {
226   assertArgCStr(args[0]);
227   assertArgCStr(args[1]);
228 
229   LogBucket& bucket = getLogBucket(args[0].asCStr());
230   bucket.log_lock.lock();
231   try {
232     AmArg& val = bucket.log[args[0].asCStr()].info[args[1].asCStr()];
233     if (!isArgArray(val) && !isArgUndef(val)) {
234       AmArg v1 = val;
235       val = AmArg();
236       val.push(v1);
237     }
238     val.push(AmArg(args[2]));
239   } catch (...) {
240     bucket.log_lock.unlock();
241     throw;
242   }
243   ret.push(0);
244   ret.push("OK");
245   bucket.log_lock.unlock();
246 }
247 
248 
249 // Expected args:
250 // name, key, [counter=1], [timestamp=now]
addSample(const AmArg & args,AmArg & ret)251 void Monitor::addSample(const AmArg& args, AmArg& ret) {
252   assertArgCStr(args[0]);
253   assertArgCStr(args[1]);
254 
255   struct timeval now;
256   int cnt = 1;
257 
258   if (args.size() > 2 && isArgInt(args[2])) {
259     cnt = args[2].asInt();
260 
261     if (args.size() > 3 && isArgBlob(args[3])) {
262       now = *((struct timeval*) args[3].asBlob()->data);
263     }
264     else {
265       gettimeofday(&now, NULL);
266     }
267   }
268   else if (args.size() > 2 && isArgBlob(args[2])) {
269     now = *((struct timeval*)args[2].asBlob()->data);
270   } else {
271     gettimeofday(&now, NULL);
272   }
273 
274   LogBucket& bucket = getLogBucket(args[0].asCStr());
275   bucket.log_lock.lock();
276   list<SampleInfo::time_cnt>& sample_list
277     = bucket.samples[args[0].asCStr()].sample[args[1].asCStr()];
278   if ((!sample_list.empty()) && timercmp(&sample_list.front().time, &now, >=)) {
279     // sample list time stamps needs to be monotonically increasing - clear if resyncing
280     // WARN("clock drift backwards - clearing %zd items\n", sample_list.size());
281     sample_list.clear();
282   }
283   sample_list.push_front(SampleInfo::time_cnt(now, cnt));
284   bucket.log_lock.unlock();
285 
286   ret.push(0);
287   ret.push("OK");
288 }
289 
truncate_samples(list<SampleInfo::time_cnt> & v,struct timeval now)290 void Monitor::truncate_samples(
291             list<SampleInfo::time_cnt>& v, struct timeval now) {
292   struct timeval cliff = now;
293   cliff.tv_sec -= retain_samples_s;
294   while ((!v.empty()) && timercmp(&cliff, &(v.back().time), >=))
295     v.pop_back();
296 }
297 
298 
299 // Expected args:
300 // name, key, [now [from [to]]]   (blob type)
301 //  or:
302 // name, key, interval in sec     (int type)
getCount(const AmArg & args,AmArg & ret)303 void Monitor::getCount(const AmArg& args, AmArg& ret) {
304   assertArgCStr(args[0]);
305   assertArgCStr(args[1]);
306 
307   struct timeval now;
308   if (args.size()>2 && isArgBlob(args[2])) {
309     now = *(struct timeval*)args[2].asBlob()->data;
310   } else {
311     gettimeofday(&now, NULL);
312   }
313 
314   struct timeval from;
315   struct timeval to;
316   if (args.size()>3 && isArgBlob(args[3])) {
317     from = *(struct timeval*)args[3].asBlob()->data;
318 
319     if (args.size()>4 && isArgBlob(args[4]))
320       to = *(struct timeval*)args[4].asBlob()->data;
321     else
322       to = now;
323 
324   } else {
325     from = to = now;
326     if (args.size()>2 && isArgInt(args[2])) {
327       from.tv_sec -= args[2].asInt();
328     } else {
329       from.tv_sec -=1; // default: last second
330     }
331   }
332 
333   if (!now.tv_sec) {
334     gettimeofday(&to, NULL);
335   }
336 
337   unsigned int res = 0;
338 
339   LogBucket& bucket = getLogBucket(args[0].asCStr());
340   bucket.log_lock.lock();
341 
342   map<string, SampleInfo>::iterator it =
343         bucket.samples.find(args[0].asCStr());
344   if (it != bucket.samples.end()) {
345 
346     map<string, list<SampleInfo::time_cnt> >::iterator s_it =
347           it->second.sample.find(args[1].asCStr());
348     if (s_it != it->second.sample.end()) {
349 
350       list<SampleInfo::time_cnt>& v = s_it->second;
351       truncate_samples(v, now);
352       // todo (?): erase empty sample list
353       // if (v.empty()) {
354       // 	// sample vector is empty
355       // 	it->second.sample.erase(s_it);
356       // } else {
357       list<SampleInfo::time_cnt>::iterator v_it = v.begin();
358 
359       while (v_it != v.end() && timercmp(&(v_it->time), &to, >))
360 	v_it++;
361       if (v_it != v.end()) {
362 	while (timercmp(&(v_it->time), &from, >=) && v_it != v.end()) {
363 	  res += v_it->counter;
364 	  v_it++;
365 	}
366       }
367 
368     }
369   }
370   bucket.log_lock.unlock();
371 
372   ret.push((int)res);
373 }
374 
375 // Expected args:
376 // name, [now [from [to]]]   (blob type)
377 //  or:
378 // name, interval in sec [now]  (int type)
getAllCounts(const AmArg & args,AmArg & ret)379 void Monitor::getAllCounts(const AmArg& args, AmArg& ret) {
380   assertArgCStr(args[0]);
381 	ret.assertStruct();
382 
383   struct timeval now;
384   if (args.size()>1 && isArgBlob(args[1])) {
385     now = *(struct timeval*)args[1].asBlob()->data;
386   } else if (args.size()>2 && isArgInt(args[1]) && isArgBlob(args[2])) {
387     now = *(struct timeval*)args[2].asBlob()->data;
388 	} else {
389     gettimeofday(&now, NULL);
390   }
391 
392   struct timeval from;
393   struct timeval to;
394   if (args.size()>2 && isArgBlob(args[1]) && isArgBlob(args[2])) {
395     from = *(struct timeval*)args[2].asBlob()->data;
396 
397     if (args.size()>3 && isArgBlob(args[3]))
398       to = *(struct timeval*)args[3].asBlob()->data;
399     else
400       to = now;
401 
402   } else {
403     from = to = now;
404     if (args.size()>1 && isArgInt(args[1])) {
405       from.tv_sec -= args[1].asInt();
406     } else {
407       from.tv_sec -=1; // default: last second
408     }
409   }
410 
411   if (!now.tv_sec) {
412     gettimeofday(&to, NULL);
413   }
414 
415   LogBucket& bucket = getLogBucket(args[0].asCStr());
416   bucket.log_lock.lock();
417 
418   map<string, SampleInfo>::iterator it =
419         bucket.samples.find(args[0].asCStr());
420   if (it != bucket.samples.end()) {
421 
422     for (map<string, list<SampleInfo::time_cnt> >::iterator s_it =
423           it->second.sample.begin(); s_it != it->second.sample.end() ; s_it++) {
424 
425       list<SampleInfo::time_cnt>& v = s_it->second;
426       truncate_samples(v, now);
427       list<SampleInfo::time_cnt>::iterator v_it = v.begin();
428 
429       unsigned int res = 0;
430 
431       while (timercmp(&(v_it->time), &to, >) && v_it != v.end())
432         v_it++;
433       if (v_it != v.end()) {
434         while (timercmp(&(v_it->time), &from, >=) && v_it != v.end()) {
435           res += v_it->counter;
436           v_it++;
437         }
438       }
439 
440       ret[s_it->first] = (int)res;
441     }
442 
443   }
444   bucket.log_lock.unlock();
445 }
446 
447 
markFinished(const AmArg & args,AmArg & ret)448 void Monitor::markFinished(const AmArg& args, AmArg& ret) {
449   assertArgCStr(args[0]);
450 
451   LogBucket& bucket = getLogBucket(args[0].asCStr());
452   bucket.log_lock.lock();
453   if (!bucket.log[args[0].asCStr()].finished)
454     bucket.log[args[0].asCStr()].finished = time(0);
455   bucket.log_lock.unlock();
456   ret.push(0);
457   ret.push("OK");
458 }
459 
setExpiration(const AmArg & args,AmArg & ret)460 void Monitor::setExpiration(const AmArg& args, AmArg& ret) {
461   assertArgCStr(args[0]);
462   assertArgInt(args[1]);
463 
464   LogBucket& bucket = getLogBucket(args[0].asCStr());
465   bucket.log_lock.lock();
466   bucket.log[args[0].asCStr()].finished = args[1].asInt();
467   bucket.log_lock.unlock();
468   ret.push(0);
469   ret.push("OK");
470 }
471 
erase(const AmArg & args,AmArg & ret)472 void Monitor::erase(const AmArg& args, AmArg& ret) {
473   assertArgCStr(args[0]);
474   LogBucket& bucket = getLogBucket(args[0].asCStr());
475   bucket.log_lock.lock();
476   bucket.log.erase(args[0].asCStr());
477   bucket.samples.erase(args[0].asCStr());
478   bucket.log_lock.unlock();
479   ret.push(0);
480   ret.push("OK");
481 }
482 
clear(const AmArg & args,AmArg & ret)483 void Monitor::clear(const AmArg& args, AmArg& ret) {
484   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
485     logs[i].log_lock.lock();
486     logs[i].log.clear();
487     logs[i].samples.clear();
488     logs[i].log_lock.unlock();
489   }
490   ret.push(0);
491   ret.push("OK");
492 }
493 
clearFinished(const AmArg & args,AmArg & ret)494 void Monitor::clearFinished(const AmArg& args, AmArg& ret) {
495   clearFinished();
496 
497   ret.push(0);
498   ret.push("OK");
499 }
500 
clearFinished()501 void Monitor::clearFinished() {
502   time_t now = time(0);
503   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
504     logs[i].log_lock.lock();
505     std::map<string, LogInfo>::iterator it=
506       logs[i].log.begin();
507     while (it != logs[i].log.end()) {
508       if (it->second.finished &&
509 	  it->second.finished <= now) {
510 	std::map<string, LogInfo>::iterator d_it = it;
511 	it++;
512 	logs[i].samples.erase(d_it->first);
513 	logs[i].log.erase(d_it);
514       } else {
515 	it++;
516       }
517     }
518     logs[i].log_lock.unlock();
519   }
520 }
521 
get(const AmArg & args,AmArg & ret)522 void Monitor::get(const AmArg& args, AmArg& ret) {
523   assertArgCStr(args[0]);
524   ret.assertArray();
525   LogBucket& bucket = getLogBucket(args[0].asCStr());
526   bucket.log_lock.lock();
527   std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
528   if (it!=bucket.log.end())
529     ret.push(it->second.info);
530   bucket.log_lock.unlock();
531 }
532 
getSingle(const AmArg & args,AmArg & ret)533 void Monitor::getSingle(const AmArg& args, AmArg& ret) {
534   assertArgCStr(args[0]);
535   assertArgCStr(args[1]);
536   ret.assertArray();
537 
538   DBG("getSingle(%s,%s)",
539       args[0].asCStr(),
540       args[1].asCStr());
541 
542   LogBucket& bucket = getLogBucket(args[0].asCStr());
543   bucket.log_lock.lock();
544   std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
545   if (it!=bucket.log.end()){
546     AmArg& _v = it->second.info;
547     DBG("found log: %s",AmArg::print(_v).c_str());
548     if(isArgStruct(_v) && _v.hasMember(args[1].asCStr())) {
549       ret.push(_v[args[1].asCStr()]);
550     }
551   }
552   bucket.log_lock.unlock();
553   DBG("ret = %s",AmArg::print(ret).c_str());
554 }
555 
getAttribute(const AmArg & args,AmArg & ret)556 void Monitor::getAttribute(const AmArg& args, AmArg& ret) {
557   assertArgCStr(args[0]);
558   string attr_name = args[0].asCStr();
559   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
560     logs[i].log_lock.lock();
561     for (std::map<string, LogInfo>::iterator it=
562 	   logs[i].log.begin();it != logs[i].log.end();it++) {
563       ret.push(AmArg());
564       AmArg& val = ret.get(ret.size()-1);
565       val.push(AmArg(it->first.c_str()));
566       val.push(it->second.info[attr_name]);
567     }
568     logs[i].log_lock.unlock();
569   }
570 }
571 
572 
573 #define DEF_GET_ATTRIB_FUNC(func_name, cond)				\
574   void Monitor::func_name(const AmArg& args, AmArg& ret) {		\
575     assertArgCStr(args[0]);						\
576     ret.assertArray();							\
577     string attr_name = args[0].asCStr();				\
578     time_t now = time(0);						\
579     for (int i=0;i<NUM_LOG_BUCKETS;i++) {				\
580       logs[i].log_lock.lock();						\
581       for (std::map<string, LogInfo>::iterator it=			\
582 	     logs[i].log.begin();it != logs[i].log.end();it++) {	\
583 	if (cond) {							\
584 	  ret.push(AmArg());						\
585 	  AmArg& val = ret.get(ret.size()-1);				\
586 	  val.push(AmArg(it->first.c_str()));				\
587 	  val.push(it->second.info[attr_name]);				\
588 	}								\
589       }									\
590       logs[i].log_lock.unlock();					\
591     }									\
592   }
593 
594 DEF_GET_ATTRIB_FUNC(getAttributeActive,  (!(it->second.finished &&
595 					    it->second.finished <= now)))
596 DEF_GET_ATTRIB_FUNC(getAttributeFinished,(it->second.finished &&
597 					  it->second.finished <= now))
598 #undef DEF_GET_ATTRIB_FUNC
599 
listAll(const AmArg & args,AmArg & ret)600 void Monitor::listAll(const AmArg& args, AmArg& ret) {
601   ret.assertArray();
602   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
603     logs[i].log_lock.lock();
604     for (std::map<string, LogInfo>::iterator it=
605 	   logs[i].log.begin(); it != logs[i].log.end(); it++) {
606       ret.push(AmArg(it->first.c_str()));
607     }
608     logs[i].log_lock.unlock();
609   }
610 }
611 
listByFilter(const AmArg & args,AmArg & ret,bool erase)612 void Monitor::listByFilter(const AmArg& args, AmArg& ret, bool erase) {
613   ret.assertArray();
614   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
615     logs[i].log_lock.lock();
616     try {
617       std::map<string, LogInfo>::iterator it=logs[i].log.begin();
618 
619       while (it != logs[i].log.end()) {
620 	bool match = true;
621 	for (size_t a_i=0;a_i<args.size();a_i++) {
622 	  AmArg& p = args.get(a_i);
623 	  if (!(it->second.info[p.get(0).asCStr()]==p.get(1))) {
624 	    match = false;
625 	    break;
626 	  }
627 	}
628 
629 	if (match) {
630 	  ret.push(AmArg(it->first.c_str()));
631 	  if (erase) {
632 	    std::map<string, LogInfo>::iterator d_it=it;
633 	    it++;
634 	    logs[i].log.erase(d_it);
635 	    continue;
636 	  }
637 	}
638 	it++;
639       }
640     } catch(...) {
641       logs[i].log_lock.unlock();
642       throw;
643     }
644     logs[i].log_lock.unlock();
645   }
646 }
647 
listByRegex(const AmArg & args,AmArg & ret)648 void Monitor::listByRegex(const AmArg& args, AmArg& ret) {
649   assertArgCStr(args[0]);
650   assertArgCStr(args[1]);
651 
652   ret.assertArray();
653   regex_t attr_reg;
654   if(regcomp(&attr_reg,args[1].asCStr(),REG_NOSUB)){
655     ERROR("could not compile regex '%s'\n", args[1].asCStr());
656     return;
657   }
658 
659   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
660     logs[i].log_lock.lock();
661     try {
662       for (std::map<string, LogInfo>::iterator it=
663 	     logs[i].log.begin(); it != logs[i].log.end(); it++) {
664 	if (!it->second.info.hasMember(args[0].asCStr())  ||
665 	    !isArgCStr(it->second.info[args[0].asCStr()]) ||
666 	    regexec(&attr_reg,it->second.info[args[0].asCStr()].asCStr(),0,0,0))
667 	  continue;
668 
669 	ret.push(AmArg(it->first.c_str()));
670       }
671     } catch(...) {
672       logs[i].log_lock.unlock();
673       throw;
674     }
675     logs[i].log_lock.unlock();
676   }
677 
678   regfree(&attr_reg);
679 }
680 
listFinished(const AmArg & args,AmArg & ret)681 void Monitor::listFinished(const AmArg& args, AmArg& ret) {
682   time_t now = time(0);
683   ret.assertArray();
684   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
685     logs[i].log_lock.lock();
686     for (std::map<string, LogInfo>::iterator it=
687 	   logs[i].log.begin(); it != logs[i].log.end(); it++) {
688       if (it->second.finished &&
689 	  it->second.finished <= now)
690 	ret.push(AmArg(it->first.c_str()));
691     }
692     logs[i].log_lock.unlock();
693   }
694 }
695 
696 
listActive(const AmArg & args,AmArg & ret)697 void Monitor::listActive(const AmArg& args, AmArg& ret) {
698   time_t now = time(0);
699   ret.assertArray();
700   for (int i=0;i<NUM_LOG_BUCKETS;i++) {
701     logs[i].log_lock.lock();
702     for (std::map<string, LogInfo>::iterator it=
703 	   logs[i].log.begin(); it != logs[i].log.end(); it++) {
704       if (!(it->second.finished &&
705 	    it->second.finished <= now))
706 	ret.push(AmArg(it->first.c_str()));
707     }
708     logs[i].log_lock.unlock();
709   }
710 }
711 
getLogBucket(const string & call_id)712 LogBucket& Monitor::getLogBucket(const string& call_id) {
713   if (call_id.empty())
714     return logs[0];
715   char c = '\0'; // some distribution...bad luck if all callid start with 00000...
716   for (size_t i=0;i<5 && i<call_id.length();i++)
717     c = c ^ call_id[i];
718 
719   return logs[c % NUM_LOG_BUCKETS];
720 }
721 
run()722 void MonitorGarbageCollector::run() {
723   DBG("running MonitorGarbageCollector thread\n");
724   running.set(true);
725   while (running.get()) {
726     sleep(Monitor::gcInterval);
727     Monitor::instance()->clearFinished();
728   }
729   DBG("MonitorGarbageCollector thread ends\n");
730   AmEventDispatcher::instance()->delEventQueue("monitoring_gc");
731 }
732 
postEvent(AmEvent * e)733 void MonitorGarbageCollector::postEvent(AmEvent* e) {
734   AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(e);
735   if (sys_ev &&
736       sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
737     DBG("stopping MonitorGarbageCollector thread\n");
738     running.set(false);
739     return;
740   }
741 
742   WARN("received unknown event\n");
743 }
744 
on_stop()745 void MonitorGarbageCollector::on_stop() {
746 }
747