1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  * Copyright (C) 2006 iptego GmbH
4  *
5  * This file is part of SEMS, a free SIP media server.
6  *
7  * SEMS is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version. This program is released under
11  * the GPL with the additional exemption that compiling, linking,
12  * and/or using OpenSSL is allowed.
13  *
14  * For a license to use the SEMS software under conditions
15  * other than those described here, or to purchase support for this
16  * software, please contact iptel.org by e-mail at the following addresses:
17  *    info@iptel.org
18  *
19  * SEMS is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  * GNU General Public License for more details.
23  *
24  * You should have received a copy of the GNU General Public License
25  * along with this program; if not, write to the Free Software
26  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27  */
28 
29 #include "AmSessionContainer.h"
30 #include "AmPlugIn.h"
31 #include "AmApi.h"
32 #include "AmConfig.h"
33 #include "AmUtils.h"
34 #include "AmEventDispatcher.h"
35 
36 #include <assert.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39 
40 #include "sems.h"
41 
// Singleton pointer: allocated on first call to instance() and deleted
// (then reset to NULL) by dispose().
AmSessionContainer* AmSessionContainer::_instance=NULL;

// Monitoring interface declaration for this class; the macro is defined
// outside this file.
_MONITORING_DECLARE_INTERFACE(AmSessionContainer);
45 
AmSessionContainer()46 AmSessionContainer::AmSessionContainer()
47   : _container_closed(false), _run_cond(false), enable_unclean_shutdown(false),
48     max_cps(0), CPSLimit(0), CPSHardLimit(0)
49 {
50 }
51 
instance()52 AmSessionContainer* AmSessionContainer::instance()
53 {
54   if(!_instance)
55     _instance = new AmSessionContainer();
56 
57   return _instance;
58 }
59 
dispose()60 void AmSessionContainer::dispose()
61 {
62   if(_instance != NULL) {
63     if(!_instance->is_stopped()) {
64       _instance->stop();
65 
66       while (!_instance->is_stopped())
67 	usleep(10000);
68     }
69     // todo: add locking here
70     delete _instance;
71     _instance = NULL;
72   }
73 }
74 
/**
 * Makes one pass over the d_sessions queue.
 *
 * Every queued session that is stopped and no longer processing media is
 * deleted; all others are put back so a later pass can retry them.
 * ds_mut protects the queue itself and is NOT held while a session is
 * inspected and deleted.
 *
 * @return true if sessions are still queued after this pass
 * @throws rethrows any exception caught during the pass (after logging it)
 */
bool AmSessionContainer::clean_sessions() {
  ds_mut.lock();
  DBG("Session cleaner starting its work\n");

  try {
    // sessions that cannot be deleted yet are collected here
    SessionQueue n_sessions;

    while(!d_sessions.empty()){

      AmSession* cur_session = d_sessions.front();
      d_sessions.pop();

      // queue access is done for this iteration: drop the lock while the
      // session is examined and possibly destroyed
      ds_mut.unlock();

      if(cur_session->is_stopped() && !cur_session->isProcessingMedia()){

	MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str());

	DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid);
	delete cur_session;
      }
      else {
	DBG("session [%p] still running\n",(void*)cur_session->_pid);
	n_sessions.push(cur_session);
      }

      // re-acquire before touching d_sessions in the loop condition
      ds_mut.lock();
    }

    // d_sessions is empty here; it now receives the sessions kept for later
    swap(d_sessions,n_sessions);

  }catch(std::exception& e){
    ERROR("exception caught in session cleaner: %s\n", e.what());
    // NOTE(review): ds_mut may be either held or released at this point,
    // depending on where the exception was raised; nothing unlocks it here.
    throw; /* rethrown: considered fatal (original note: "unlocking the mutex fails") */
  }catch(...){
    ERROR("unknown exception caught in session cleaner!\n");
    throw; /* rethrown: considered fatal (original note: "unlocking the mutex fails") */
  }
  // still under ds_mut: report whether another pass is needed
  bool more = !d_sessions.empty();
  ds_mut.unlock();
  return more;
}
117 
initMonitoring()118 void AmSessionContainer::initMonitoring() {
119   _MONITORING_INIT;
120 }
121 
run()122 void AmSessionContainer::run()
123 {
124   while(!_container_closed.get()){
125 
126     _run_cond.wait_for();
127 
128     if(_container_closed.get())
129       break;
130 
131     // Give the Sessions some time to stop by themselves
132     sleep(5);
133 
134     bool more = clean_sessions();
135 
136     DBG("Session cleaner finished\n");
137     if(!more  && (!_container_closed.get()))
138       _run_cond.set(false);
139   }
140   DBG("Session cleaner terminating\n");
141 }
142 
broadcastShutdown()143 void AmSessionContainer::broadcastShutdown() {
144   DBG("brodcasting ServerShutdown system event to %u sessions...\n",
145       AmSession::getSessionNum());
146   AmEventDispatcher::instance()->
147     broadcast(new AmSystemEvent(AmSystemEvent::ServerShutdown));
148 }
149 
on_stop()150 void AmSessionContainer::on_stop()
151 {
152   _container_closed.set(true);
153 
154   if (enable_unclean_shutdown) {
155     INFO("unclean shutdown requested - not broadcasting shutdown\n");
156   } else {
157     broadcastShutdown();
158 
159     DBG("waiting for active event queues to stop...\n");
160 
161     for (unsigned int i=0;
162 	 (!AmEventDispatcher::instance()->empty() &&
163 	  (!AmConfig::MaxShutdownTime ||
164 	   i < AmConfig::MaxShutdownTime * 1000 / 10));i++)
165       usleep(10000);
166 
167     if (!AmEventDispatcher::instance()->empty()) {
168       WARN("Not all calls cleanly ended!\n");
169     }
170 
171     DBG("cleaning sessions...\n");
172     while (clean_sessions())
173       usleep(10000);
174   }
175 
176   _run_cond.set(true); // so that thread stops
177 }
178 
stopAndQueue(AmSession * s)179 void AmSessionContainer::stopAndQueue(AmSession* s)
180 {
181 
182   if (AmConfig::LogSessions) {
183     INFO("session cleaner about to stop %s\n",
184 	 s->getLocalTag().c_str());
185   }
186 
187   s->stop();
188 
189   ds_mut.lock();
190   d_sessions.push(s);
191   _run_cond.set(true);
192   ds_mut.unlock();
193 }
194 
destroySession(AmSession * s)195 void AmSessionContainer::destroySession(AmSession* s)
196 {
197     AmEventQueueInterface* q = AmEventDispatcher::instance()->
198       delEventQueue(s->getLocalTag());
199 
200     if(q) {
201 	stopAndQueue(s);
202     }
203     else {
204 	WARN("could not remove session: id not found or wrong type\n");
205     }
206 }
207 
startSessionUAC(const AmSipRequest & req,string & app_name,AmArg * session_params)208 string AmSessionContainer::startSessionUAC(const AmSipRequest& req, string& app_name, AmArg* session_params) {
209 
210   unique_ptr<AmSession> session;
211   try {
212     session.reset(createSession(req, app_name, session_params));
213     if(session.get() != 0) {
214       session->dlg->initFromLocalRequest(req);
215       session->setCallgroup(req.from_tag);
216 
217       switch(addSession(req.from_tag,session.get())) {
218 
219       case AmSessionContainer::Inserted:
220 	// successful case
221 	break;
222 
223       case AmSessionContainer::ShutDown:
224 	throw AmSession::Exception(AmConfig::ShutdownModeErrCode,
225 				   AmConfig::ShutdownModeErrReason);
226 
227       case AmSessionContainer::AlreadyExist:
228 	throw AmSession::Exception(482,
229 				   SIP_REPLY_LOOP_DETECTED);
230 
231       default:
232 	ERROR("adding session to session container\n");
233 	throw string(SIP_REPLY_SERVER_INTERNAL_ERROR);
234       }
235 
236       MONITORING_LOG5(req.from_tag,
237 		      "app", app_name.c_str(),
238 		      "dir", "out",
239 		      "from", req.from.c_str(),
240 		      "to", req.to.c_str(),
241 		      "ruri", req.r_uri.c_str());
242 
243       if (int err = session->sendInvite(req.hdrs)) {
244 	ERROR("INVITE could not be sent: error code = %d.\n", err);
245 	AmEventDispatcher::instance()->delEventQueue(req.from_tag);
246 	MONITORING_MARK_FINISHED(req.from_tag.c_str());
247 	return "";
248       }
249 
250       if (AmConfig::LogSessions) {
251 	INFO("Starting UAC session %s app %s\n",
252 	     req.from_tag.c_str(), app_name.c_str());
253       }
254       try {
255 	session->start();
256       } catch (...) {
257 	AmEventDispatcher::instance()->delEventQueue(req.from_tag);
258 	throw;
259       }
260     }
261   }
262   catch(const AmSession::Exception& e){
263     ERROR("%i %s\n",e.code,e.reason.c_str());
264     return "";
265   }
266   catch(const string& err){
267     ERROR("startSession: %s\n",err.c_str());
268     return "";
269   }
270   catch(...){
271     ERROR("unexpected exception\n");
272     return "";
273   }
274 
275   session.release();
276   return req.from_tag;
277 }
278 
startSessionUAS(AmSipRequest & req)279 void AmSessionContainer::startSessionUAS(AmSipRequest& req)
280 {
281   try {
282       // Call-ID and From-Tag are unknown: it's a new session
283       unique_ptr<AmSession> session;
284       string app_name;
285 
286       session.reset(createSession(req,app_name));
287       if(session.get() != 0){
288 
289 	// update session's local tag (ID) if not already set
290 	session->setLocalTag();
291 	const string& local_tag = session->getLocalTag();
292 	// by default each session is in its own callgroup
293 	session->setCallgroup(local_tag);
294 
295 	if (AmConfig::LogSessions) {
296 	  INFO("Starting UAS session %s\n",
297 	       local_tag.c_str());
298 	}
299 
300 	switch(addSession(req.callid,req.from_tag,local_tag,
301 			  req.via_branch,session.get())) {
302 
303 	case AmSessionContainer::Inserted:
304 	  // successful case
305 	  break;
306 
307 	case AmSessionContainer::ShutDown:
308 	  throw AmSession::Exception(AmConfig::ShutdownModeErrCode,
309 				     AmConfig::ShutdownModeErrReason);
310 
311 	case AmSessionContainer::AlreadyExist:
312 	  throw AmSession::Exception(482,
313 				     SIP_REPLY_LOOP_DETECTED);
314 
315 	default:
316 	  ERROR("adding session to session container\n");
317 	  throw string(SIP_REPLY_SERVER_INTERNAL_ERROR);
318 	}
319 
320 	MONITORING_LOG4(local_tag.c_str(),
321 			"dir", "in",
322 			"from", req.from.c_str(),
323 			"to", req.to.c_str(),
324 			"ruri", req.r_uri.c_str());
325 
326 	try {
327 	  session->start();
328 	} catch (...) {
329 	  AmEventDispatcher::instance()->
330 	    delEventQueue(local_tag);
331 	  throw;
332 	}
333 
334 	session->postEvent(new AmSipRequestEvent(req));
335 	session.release();
336       }
337   }
338   catch(const AmSession::Exception& e){
339     ERROR("%i %s %s\n",e.code,e.reason.c_str(), e.hdrs.c_str());
340     AmSipDialog::reply_error(req,e.code,e.reason, e.hdrs);
341   }
342   catch(const string& err){
343     ERROR("startSession: %s\n",err.c_str());
344     AmSipDialog::reply_error(req,500,err);
345   }
346   catch(...){
347     ERROR("unexpected exception\n");
348     AmSipDialog::reply_error(req,500,"unexpected exception");
349   }
350 }
351 
352 
postEvent(const string & callid,const string & remote_tag,const string & via_branch,AmEvent * event)353 bool AmSessionContainer::postEvent(const string& callid,
354 				   const string& remote_tag,
355 				   const string& via_branch,
356 				   AmEvent* event)
357 {
358     bool posted =
359       AmEventDispatcher::instance()->
360         post(callid,remote_tag,via_branch,event);
361 
362     if(!posted)
363 	delete event;
364 
365     return posted;
366 }
367 
postEvent(const string & local_tag,AmEvent * event)368 bool AmSessionContainer::postEvent(const string& local_tag,
369 				   AmEvent* event)
370 {
371     bool posted =
372 	AmEventDispatcher::instance()->
373 	post(local_tag,event);
374 
375     if(!posted)
376 	delete event;
377 
378     return posted;
379 
380 }
381 
setCPSLimit(unsigned int limit)382 void AmSessionContainer::setCPSLimit(unsigned int limit)
383 {
384   AmLock lock(cps_mut);
385   CPSLimit = CPSHardLimit = limit;
386 }
387 
setCPSSoftLimit(unsigned int percent)388 void AmSessionContainer::setCPSSoftLimit(unsigned int percent)
389 {
390   if(!percent) {
391     CPSLimit = CPSHardLimit;
392     return;
393   }
394 
395   struct timeval tv, res;
396   gettimeofday(&tv,0);
397 
398   AmLock lock(cps_mut);
399 
400   while (cps_queue.size()) {
401     timersub(&tv, &cps_queue.front(), &res);
402     if (res.tv_sec >= CPS_SAMPLERATE) {
403       cps_queue.pop();
404     }
405     else {
406       break;
407     }
408   }
409   CPSLimit = ((float)percent / 100) * ((float)cps_queue.size() / CPS_SAMPLERATE);
410   if(0 == CPSLimit) CPSLimit = 1;
411 }
412 
getCPSLimit()413 pair<unsigned int, unsigned int> AmSessionContainer::getCPSLimit()
414 {
415   AmLock lock(cps_mut);
416   return pair<unsigned int, unsigned int>(CPSHardLimit, CPSLimit);
417 }
418 
getAvgCPS()419 unsigned int AmSessionContainer::getAvgCPS()
420 {
421   struct timeval tv, res;
422   gettimeofday(&tv,0);
423 
424   AmLock lock(cps_mut);
425 
426   while (cps_queue.size()) {
427     timersub(&tv, &cps_queue.front(), &res);
428     if (res.tv_sec >= CPS_SAMPLERATE) {
429       cps_queue.pop();
430     }
431     else {
432       break;
433     }
434   }
435 
436   return (float)cps_queue.size() / CPS_SAMPLERATE;
437 }
438 
getMaxCPS()439 unsigned int AmSessionContainer::getMaxCPS()
440 {
441   AmLock lock(cps_mut);
442   unsigned int res = max_cps;
443   max_cps = 0;
444   return res;
445 }
446 
check_and_add_cps()447 bool AmSessionContainer::check_and_add_cps()
448 {
449   struct timeval tv, res;
450   gettimeofday(&tv,0);
451 
452   AmLock lock(cps_mut);
453 
454   while (cps_queue.size()) {
455     timersub(&tv, &cps_queue.front(), &res);
456     if (res.tv_sec >= CPS_SAMPLERATE) {
457       cps_queue.pop();
458     }
459     else {
460       break;
461     }
462   }
463 
464   unsigned int cps = (float)cps_queue.size() / CPS_SAMPLERATE;
465   if (cps > max_cps) {
466     max_cps = cps;
467   }
468 
469   if( CPSLimit && cps > CPSLimit ){
470     DBG("cps_limit %d reached. Not creating session.\n", CPSLimit);
471     return true;
472   }
473   else {
474     cps_queue.push(tv);
475     return false;
476   }
477 }
478 
createSession(const AmSipRequest & req,string & app_name,AmArg * session_params)479 AmSession* AmSessionContainer::createSession(const AmSipRequest& req,
480 					     string& app_name,
481 					     AmArg* session_params)
482 {
483   if (AmConfig::ShutdownMode) {
484     _run_cond.set(true); // so that thread stops
485     DBG("Shutdown mode. Not creating session.\n");
486 
487     AmSipDialog::reply_error(req,AmConfig::ShutdownModeErrCode,
488 			     AmConfig::ShutdownModeErrReason);
489     return NULL;
490   }
491 
492   if (AmConfig::SessionLimit &&
493       AmConfig::SessionLimit <= AmSession::session_num) {
494 
495       DBG("session_limit %d reached. Not creating session.\n",
496 	  AmConfig::SessionLimit);
497 
498       AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode,
499 			       AmConfig::SessionLimitErrReason);
500       return NULL;
501   }
502 
503   if (check_and_add_cps()) {
504       AmSipDialog::reply_error(req,AmConfig::CPSLimitErrCode,
505 			       AmConfig::CPSLimitErrReason);
506       return NULL;
507   }
508 
509   AmSessionFactory* session_factory = NULL;
510   if(!app_name.empty())
511       session_factory = AmPlugIn::instance()->getFactory4App(app_name);
512   else
513       session_factory = AmPlugIn::instance()->findSessionFactory(req,app_name);
514 
515   if(!session_factory) {
516 
517       ERROR("No session factory for application\n");
518       AmSipDialog::reply_error(req,500,SIP_REPLY_SERVER_INTERNAL_ERROR);
519 
520       return NULL;
521   }
522 
523   map<string,string> app_params;
524   parse_app_params(req.hdrs,app_params);
525 
526   AmSession* session = NULL;
527   if (req.method == "INVITE") {
528     if (NULL != session_params) {
529       session = session_factory->onInvite(req, app_name, *session_params);
530     }
531     else {
532       session = session_factory->onInvite(req, app_name, app_params);
533     }
534   } else if (req.method == "REFER") {
535     if (NULL != session_params)
536       session = session_factory->onRefer(req, app_name, *session_params);
537     else
538       session = session_factory->onRefer(req, app_name, app_params);
539   }
540 
541   if(!session) {
542     //  Session creation failed:
543     //   application denied session creation
544     //   or there was an error.
545     //
546     //  let's hope the createState function has replied...
547     //  ... and do nothing !
548 
549     DBG("onInvite/onRefer returned NULL\n");
550   }
551   else {
552     // save session parameters
553     session->app_params = app_params;
554   }
555 
556   return session;
557 }
558 
559 AmSessionContainer::AddSessionStatus
addSession(const string & callid,const string & remote_tag,const string & local_tag,const string & via_branch,AmSession * session)560 AmSessionContainer::addSession(const string& callid,
561 			       const string& remote_tag,
562 			       const string& local_tag,
563 			       const string& via_branch,
564 			       AmSession* session)
565 {
566   if(_container_closed.get())
567     return ShutDown;
568 
569   if(AmEventDispatcher::instance()->
570      addEventQueue(local_tag,(AmEventQueue*)session,
571 		   callid,remote_tag,via_branch)) {
572     return Inserted;
573   }
574 
575   return AlreadyExist;
576 }
577 
578 AmSessionContainer::AddSessionStatus
addSession(const string & local_tag,AmSession * session)579 AmSessionContainer::addSession(const string& local_tag,
580 			       AmSession* session)
581 {
582   if(_container_closed.get())
583     return ShutDown;
584 
585   if(AmEventDispatcher::instance()->
586      addEventQueue(local_tag,(AmEventQueue*)session)){
587     return Inserted;
588   }
589 
590   return AlreadyExist;
591 }
592 
enableUncleanShutdown()593 void AmSessionContainer::enableUncleanShutdown() {
594   enable_unclean_shutdown = true;
595 }
596