1 /*
2 * Copyright (C) 2002-2003 Fhg Fokus
3 * Copyright (C) 2006 iptego GmbH
4 *
5 * This file is part of SEMS, a free SIP media server.
6 *
7 * SEMS is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version. This program is released under
11 * the GPL with the additional exemption that compiling, linking,
12 * and/or using OpenSSL is allowed.
13 *
14 * For a license to use the SEMS software under conditions
15 * other than those described here, or to purchase support for this
16 * software, please contact iptel.org by e-mail at the following addresses:
17 * info@iptel.org
18 *
19 * SEMS is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with this program; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 */
28
29 #include "AmSessionContainer.h"
30 #include "AmPlugIn.h"
31 #include "AmApi.h"
32 #include "AmConfig.h"
33 #include "AmUtils.h"
34 #include "AmEventDispatcher.h"
35
36 #include <assert.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39
40 #include "sems.h"
41
42 AmSessionContainer* AmSessionContainer::_instance=NULL;
43
44 _MONITORING_DECLARE_INTERFACE(AmSessionContainer);
45
AmSessionContainer()46 AmSessionContainer::AmSessionContainer()
47 : _container_closed(false), _run_cond(false), enable_unclean_shutdown(false),
48 max_cps(0), CPSLimit(0), CPSHardLimit(0)
49 {
50 }
51
instance()52 AmSessionContainer* AmSessionContainer::instance()
53 {
54 if(!_instance)
55 _instance = new AmSessionContainer();
56
57 return _instance;
58 }
59
dispose()60 void AmSessionContainer::dispose()
61 {
62 if(_instance != NULL) {
63 if(!_instance->is_stopped()) {
64 _instance->stop();
65
66 while (!_instance->is_stopped())
67 usleep(10000);
68 }
69 // todo: add locking here
70 delete _instance;
71 _instance = NULL;
72 }
73 }
74
clean_sessions()75 bool AmSessionContainer::clean_sessions() {
76 ds_mut.lock();
77 DBG("Session cleaner starting its work\n");
78
79 try {
80 SessionQueue n_sessions;
81
82 while(!d_sessions.empty()){
83
84 AmSession* cur_session = d_sessions.front();
85 d_sessions.pop();
86
87 ds_mut.unlock();
88
89 if(cur_session->is_stopped() && !cur_session->isProcessingMedia()){
90
91 MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str());
92
93 DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid);
94 delete cur_session;
95 }
96 else {
97 DBG("session [%p] still running\n",(void*)cur_session->_pid);
98 n_sessions.push(cur_session);
99 }
100
101 ds_mut.lock();
102 }
103
104 swap(d_sessions,n_sessions);
105
106 }catch(std::exception& e){
107 ERROR("exception caught in session cleaner: %s\n", e.what());
108 throw; /* throw again as this is fatal (because unlocking the mutex fails!! */
109 }catch(...){
110 ERROR("unknown exception caught in session cleaner!\n");
111 throw; /* throw again as this is fatal (because unlocking the mutex fails!! */
112 }
113 bool more = !d_sessions.empty();
114 ds_mut.unlock();
115 return more;
116 }
117
initMonitoring()118 void AmSessionContainer::initMonitoring() {
119 _MONITORING_INIT;
120 }
121
run()122 void AmSessionContainer::run()
123 {
124 while(!_container_closed.get()){
125
126 _run_cond.wait_for();
127
128 if(_container_closed.get())
129 break;
130
131 // Give the Sessions some time to stop by themselves
132 sleep(5);
133
134 bool more = clean_sessions();
135
136 DBG("Session cleaner finished\n");
137 if(!more && (!_container_closed.get()))
138 _run_cond.set(false);
139 }
140 DBG("Session cleaner terminating\n");
141 }
142
broadcastShutdown()143 void AmSessionContainer::broadcastShutdown() {
144 DBG("brodcasting ServerShutdown system event to %u sessions...\n",
145 AmSession::getSessionNum());
146 AmEventDispatcher::instance()->
147 broadcast(new AmSystemEvent(AmSystemEvent::ServerShutdown));
148 }
149
on_stop()150 void AmSessionContainer::on_stop()
151 {
152 _container_closed.set(true);
153
154 if (enable_unclean_shutdown) {
155 INFO("unclean shutdown requested - not broadcasting shutdown\n");
156 } else {
157 broadcastShutdown();
158
159 DBG("waiting for active event queues to stop...\n");
160
161 for (unsigned int i=0;
162 (!AmEventDispatcher::instance()->empty() &&
163 (!AmConfig::MaxShutdownTime ||
164 i < AmConfig::MaxShutdownTime * 1000 / 10));i++)
165 usleep(10000);
166
167 if (!AmEventDispatcher::instance()->empty()) {
168 WARN("Not all calls cleanly ended!\n");
169 }
170
171 DBG("cleaning sessions...\n");
172 while (clean_sessions())
173 usleep(10000);
174 }
175
176 _run_cond.set(true); // so that thread stops
177 }
178
stopAndQueue(AmSession * s)179 void AmSessionContainer::stopAndQueue(AmSession* s)
180 {
181
182 if (AmConfig::LogSessions) {
183 INFO("session cleaner about to stop %s\n",
184 s->getLocalTag().c_str());
185 }
186
187 s->stop();
188
189 ds_mut.lock();
190 d_sessions.push(s);
191 _run_cond.set(true);
192 ds_mut.unlock();
193 }
194
destroySession(AmSession * s)195 void AmSessionContainer::destroySession(AmSession* s)
196 {
197 AmEventQueueInterface* q = AmEventDispatcher::instance()->
198 delEventQueue(s->getLocalTag());
199
200 if(q) {
201 stopAndQueue(s);
202 }
203 else {
204 WARN("could not remove session: id not found or wrong type\n");
205 }
206 }
207
startSessionUAC(const AmSipRequest & req,string & app_name,AmArg * session_params)208 string AmSessionContainer::startSessionUAC(const AmSipRequest& req, string& app_name, AmArg* session_params) {
209
210 unique_ptr<AmSession> session;
211 try {
212 session.reset(createSession(req, app_name, session_params));
213 if(session.get() != 0) {
214 session->dlg->initFromLocalRequest(req);
215 session->setCallgroup(req.from_tag);
216
217 switch(addSession(req.from_tag,session.get())) {
218
219 case AmSessionContainer::Inserted:
220 // successful case
221 break;
222
223 case AmSessionContainer::ShutDown:
224 throw AmSession::Exception(AmConfig::ShutdownModeErrCode,
225 AmConfig::ShutdownModeErrReason);
226
227 case AmSessionContainer::AlreadyExist:
228 throw AmSession::Exception(482,
229 SIP_REPLY_LOOP_DETECTED);
230
231 default:
232 ERROR("adding session to session container\n");
233 throw string(SIP_REPLY_SERVER_INTERNAL_ERROR);
234 }
235
236 MONITORING_LOG5(req.from_tag,
237 "app", app_name.c_str(),
238 "dir", "out",
239 "from", req.from.c_str(),
240 "to", req.to.c_str(),
241 "ruri", req.r_uri.c_str());
242
243 if (int err = session->sendInvite(req.hdrs)) {
244 ERROR("INVITE could not be sent: error code = %d.\n", err);
245 AmEventDispatcher::instance()->delEventQueue(req.from_tag);
246 MONITORING_MARK_FINISHED(req.from_tag.c_str());
247 return "";
248 }
249
250 if (AmConfig::LogSessions) {
251 INFO("Starting UAC session %s app %s\n",
252 req.from_tag.c_str(), app_name.c_str());
253 }
254 try {
255 session->start();
256 } catch (...) {
257 AmEventDispatcher::instance()->delEventQueue(req.from_tag);
258 throw;
259 }
260 }
261 }
262 catch(const AmSession::Exception& e){
263 ERROR("%i %s\n",e.code,e.reason.c_str());
264 return "";
265 }
266 catch(const string& err){
267 ERROR("startSession: %s\n",err.c_str());
268 return "";
269 }
270 catch(...){
271 ERROR("unexpected exception\n");
272 return "";
273 }
274
275 session.release();
276 return req.from_tag;
277 }
278
startSessionUAS(AmSipRequest & req)279 void AmSessionContainer::startSessionUAS(AmSipRequest& req)
280 {
281 try {
282 // Call-ID and From-Tag are unknown: it's a new session
283 unique_ptr<AmSession> session;
284 string app_name;
285
286 session.reset(createSession(req,app_name));
287 if(session.get() != 0){
288
289 // update session's local tag (ID) if not already set
290 session->setLocalTag();
291 const string& local_tag = session->getLocalTag();
292 // by default each session is in its own callgroup
293 session->setCallgroup(local_tag);
294
295 if (AmConfig::LogSessions) {
296 INFO("Starting UAS session %s\n",
297 local_tag.c_str());
298 }
299
300 switch(addSession(req.callid,req.from_tag,local_tag,
301 req.via_branch,session.get())) {
302
303 case AmSessionContainer::Inserted:
304 // successful case
305 break;
306
307 case AmSessionContainer::ShutDown:
308 throw AmSession::Exception(AmConfig::ShutdownModeErrCode,
309 AmConfig::ShutdownModeErrReason);
310
311 case AmSessionContainer::AlreadyExist:
312 throw AmSession::Exception(482,
313 SIP_REPLY_LOOP_DETECTED);
314
315 default:
316 ERROR("adding session to session container\n");
317 throw string(SIP_REPLY_SERVER_INTERNAL_ERROR);
318 }
319
320 MONITORING_LOG4(local_tag.c_str(),
321 "dir", "in",
322 "from", req.from.c_str(),
323 "to", req.to.c_str(),
324 "ruri", req.r_uri.c_str());
325
326 try {
327 session->start();
328 } catch (...) {
329 AmEventDispatcher::instance()->
330 delEventQueue(local_tag);
331 throw;
332 }
333
334 session->postEvent(new AmSipRequestEvent(req));
335 session.release();
336 }
337 }
338 catch(const AmSession::Exception& e){
339 ERROR("%i %s %s\n",e.code,e.reason.c_str(), e.hdrs.c_str());
340 AmSipDialog::reply_error(req,e.code,e.reason, e.hdrs);
341 }
342 catch(const string& err){
343 ERROR("startSession: %s\n",err.c_str());
344 AmSipDialog::reply_error(req,500,err);
345 }
346 catch(...){
347 ERROR("unexpected exception\n");
348 AmSipDialog::reply_error(req,500,"unexpected exception");
349 }
350 }
351
352
postEvent(const string & callid,const string & remote_tag,const string & via_branch,AmEvent * event)353 bool AmSessionContainer::postEvent(const string& callid,
354 const string& remote_tag,
355 const string& via_branch,
356 AmEvent* event)
357 {
358 bool posted =
359 AmEventDispatcher::instance()->
360 post(callid,remote_tag,via_branch,event);
361
362 if(!posted)
363 delete event;
364
365 return posted;
366 }
367
postEvent(const string & local_tag,AmEvent * event)368 bool AmSessionContainer::postEvent(const string& local_tag,
369 AmEvent* event)
370 {
371 bool posted =
372 AmEventDispatcher::instance()->
373 post(local_tag,event);
374
375 if(!posted)
376 delete event;
377
378 return posted;
379
380 }
381
setCPSLimit(unsigned int limit)382 void AmSessionContainer::setCPSLimit(unsigned int limit)
383 {
384 AmLock lock(cps_mut);
385 CPSLimit = CPSHardLimit = limit;
386 }
387
setCPSSoftLimit(unsigned int percent)388 void AmSessionContainer::setCPSSoftLimit(unsigned int percent)
389 {
390 if(!percent) {
391 CPSLimit = CPSHardLimit;
392 return;
393 }
394
395 struct timeval tv, res;
396 gettimeofday(&tv,0);
397
398 AmLock lock(cps_mut);
399
400 while (cps_queue.size()) {
401 timersub(&tv, &cps_queue.front(), &res);
402 if (res.tv_sec >= CPS_SAMPLERATE) {
403 cps_queue.pop();
404 }
405 else {
406 break;
407 }
408 }
409 CPSLimit = ((float)percent / 100) * ((float)cps_queue.size() / CPS_SAMPLERATE);
410 if(0 == CPSLimit) CPSLimit = 1;
411 }
412
getCPSLimit()413 pair<unsigned int, unsigned int> AmSessionContainer::getCPSLimit()
414 {
415 AmLock lock(cps_mut);
416 return pair<unsigned int, unsigned int>(CPSHardLimit, CPSLimit);
417 }
418
getAvgCPS()419 unsigned int AmSessionContainer::getAvgCPS()
420 {
421 struct timeval tv, res;
422 gettimeofday(&tv,0);
423
424 AmLock lock(cps_mut);
425
426 while (cps_queue.size()) {
427 timersub(&tv, &cps_queue.front(), &res);
428 if (res.tv_sec >= CPS_SAMPLERATE) {
429 cps_queue.pop();
430 }
431 else {
432 break;
433 }
434 }
435
436 return (float)cps_queue.size() / CPS_SAMPLERATE;
437 }
438
getMaxCPS()439 unsigned int AmSessionContainer::getMaxCPS()
440 {
441 AmLock lock(cps_mut);
442 unsigned int res = max_cps;
443 max_cps = 0;
444 return res;
445 }
446
check_and_add_cps()447 bool AmSessionContainer::check_and_add_cps()
448 {
449 struct timeval tv, res;
450 gettimeofday(&tv,0);
451
452 AmLock lock(cps_mut);
453
454 while (cps_queue.size()) {
455 timersub(&tv, &cps_queue.front(), &res);
456 if (res.tv_sec >= CPS_SAMPLERATE) {
457 cps_queue.pop();
458 }
459 else {
460 break;
461 }
462 }
463
464 unsigned int cps = (float)cps_queue.size() / CPS_SAMPLERATE;
465 if (cps > max_cps) {
466 max_cps = cps;
467 }
468
469 if( CPSLimit && cps > CPSLimit ){
470 DBG("cps_limit %d reached. Not creating session.\n", CPSLimit);
471 return true;
472 }
473 else {
474 cps_queue.push(tv);
475 return false;
476 }
477 }
478
createSession(const AmSipRequest & req,string & app_name,AmArg * session_params)479 AmSession* AmSessionContainer::createSession(const AmSipRequest& req,
480 string& app_name,
481 AmArg* session_params)
482 {
483 if (AmConfig::ShutdownMode) {
484 _run_cond.set(true); // so that thread stops
485 DBG("Shutdown mode. Not creating session.\n");
486
487 AmSipDialog::reply_error(req,AmConfig::ShutdownModeErrCode,
488 AmConfig::ShutdownModeErrReason);
489 return NULL;
490 }
491
492 if (AmConfig::SessionLimit &&
493 AmConfig::SessionLimit <= AmSession::session_num) {
494
495 DBG("session_limit %d reached. Not creating session.\n",
496 AmConfig::SessionLimit);
497
498 AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode,
499 AmConfig::SessionLimitErrReason);
500 return NULL;
501 }
502
503 if (check_and_add_cps()) {
504 AmSipDialog::reply_error(req,AmConfig::CPSLimitErrCode,
505 AmConfig::CPSLimitErrReason);
506 return NULL;
507 }
508
509 AmSessionFactory* session_factory = NULL;
510 if(!app_name.empty())
511 session_factory = AmPlugIn::instance()->getFactory4App(app_name);
512 else
513 session_factory = AmPlugIn::instance()->findSessionFactory(req,app_name);
514
515 if(!session_factory) {
516
517 ERROR("No session factory for application\n");
518 AmSipDialog::reply_error(req,500,SIP_REPLY_SERVER_INTERNAL_ERROR);
519
520 return NULL;
521 }
522
523 map<string,string> app_params;
524 parse_app_params(req.hdrs,app_params);
525
526 AmSession* session = NULL;
527 if (req.method == "INVITE") {
528 if (NULL != session_params) {
529 session = session_factory->onInvite(req, app_name, *session_params);
530 }
531 else {
532 session = session_factory->onInvite(req, app_name, app_params);
533 }
534 } else if (req.method == "REFER") {
535 if (NULL != session_params)
536 session = session_factory->onRefer(req, app_name, *session_params);
537 else
538 session = session_factory->onRefer(req, app_name, app_params);
539 }
540
541 if(!session) {
542 // Session creation failed:
543 // application denied session creation
544 // or there was an error.
545 //
546 // let's hope the createState function has replied...
547 // ... and do nothing !
548
549 DBG("onInvite/onRefer returned NULL\n");
550 }
551 else {
552 // save session parameters
553 session->app_params = app_params;
554 }
555
556 return session;
557 }
558
559 AmSessionContainer::AddSessionStatus
addSession(const string & callid,const string & remote_tag,const string & local_tag,const string & via_branch,AmSession * session)560 AmSessionContainer::addSession(const string& callid,
561 const string& remote_tag,
562 const string& local_tag,
563 const string& via_branch,
564 AmSession* session)
565 {
566 if(_container_closed.get())
567 return ShutDown;
568
569 if(AmEventDispatcher::instance()->
570 addEventQueue(local_tag,(AmEventQueue*)session,
571 callid,remote_tag,via_branch)) {
572 return Inserted;
573 }
574
575 return AlreadyExist;
576 }
577
578 AmSessionContainer::AddSessionStatus
addSession(const string & local_tag,AmSession * session)579 AmSessionContainer::addSession(const string& local_tag,
580 AmSession* session)
581 {
582 if(_container_closed.get())
583 return ShutDown;
584
585 if(AmEventDispatcher::instance()->
586 addEventQueue(local_tag,(AmEventQueue*)session)){
587 return Inserted;
588 }
589
590 return AlreadyExist;
591 }
592
enableUncleanShutdown()593 void AmSessionContainer::enableUncleanShutdown() {
594 enable_unclean_shutdown = true;
595 }
596