1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "AmSession.h"
29 #include "AmSdp.h"
30 #include "AmConfig.h"
31 #include "AmUtils.h"
32 #include "AmPlugIn.h"
33 #include "AmApi.h"
34 #include "AmSessionContainer.h"
35 #include "AmSessionProcessor.h"
36 #include "AmMediaProcessor.h"
37 #include "AmDtmfDetector.h"
38 #include "AmPlayoutBuffer.h"
39 #include "AmAppTimer.h"
40 
41 #ifdef WITH_ZRTP
42 #include "AmZRTP.h"
43 #endif
44 
45 #include "log.h"
46 
47 #include <algorithm>
48 
49 #include <unistd.h>
50 #include <assert.h>
51 #include <sys/time.h>
52 
53 volatile unsigned int AmSession::session_num = 0;
54 AmMutex AmSession::session_num_mut;
55 volatile unsigned int AmSession::session_count = 0;
56 volatile unsigned int AmSession::max_session_num = 0;
57 volatile unsigned long long AmSession::avg_session_num = 0;
58 
/** Returns the current time as reported by gettimeofday(). */
struct timeval get_now() {
  struct timeval now;
  gettimeofday(&now, NULL);
  return now;
}
// Start and last-update timestamps of the current averaging interval;
// both begin at the moment this translation unit is initialized.
struct timeval avg_last_timestamp = get_now();
struct timeval avg_first_timestamp = avg_last_timestamp;
66 
67 // AmSession methods
68 
AmSession(AmSipDialog * p_dlg)69 AmSession::AmSession(AmSipDialog* p_dlg)
70   : AmEventQueue(this), m_dtmfDetector(this),
71     m_dtmfEventQueue(&m_dtmfDetector),
72     m_dtmfDetectionEnabled(true),
73     processing_status(SESSION_PROCESSING_EVENTS),
74     sess_stopped(false),
75     accept_early_session(false),
76     rtp_interface(-1),
77     input(NULL), output(NULL),
78     refresh_method(REFRESH_UPDATE_FB_REINV),
79     remote_rtp_mux_port(0),
80 #ifdef WITH_ZRTP
81     enable_zrtp(AmConfig::enable_zrtp),
82 #endif
83     dlg(p_dlg)
84 
85 #ifdef SESSION_THREADPOOL
86   , _pid(this)
87 #endif
88 {
89   DBG("dlg = %p",dlg);
90   if(!dlg) dlg = new AmSipDialog(this);
91   else dlg->setEventhandler(this);
92 }
93 
~AmSession()94 AmSession::~AmSession()
95 {
96   for(vector<AmSessionEventHandler*>::iterator evh = ev_handlers.begin();
97       evh != ev_handlers.end(); evh++) {
98 
99     if((*evh)->destroy)
100       delete *evh;
101   }
102 
103   delete dlg;
104 
105   DBG("AmSession destructor finished\n");
106 }
107 
createSipDialog()108 AmSipDialog* AmSession::createSipDialog()
109 {
110   return new AmSipDialog(this);
111 }
112 
setCallgroup(const string & cg)113 void AmSession::setCallgroup(const string& cg) {
114   callgroup = cg;
115 }
116 
getCallgroup()117 string AmSession::getCallgroup() {
118   return callgroup;
119 }
120 
changeCallgroup(const string & cg)121 void AmSession::changeCallgroup(const string& cg) {
122   callgroup = cg;
123   AmMediaProcessor::instance()->changeCallgroup(this, cg);
124 }
125 
startMediaProcessing()126 void AmSession::startMediaProcessing()
127 {
128   if(getStopped() || isProcessingMedia())
129     return;
130 
131   if(isAudioSet()) {
132     AmMediaProcessor::instance()->addSession(this, callgroup);
133   }
134   else {
135     DBG("no audio input and output set. "
136 	"Session will not be attached to MediaProcessor.\n");
137   }
138 }
139 
stopMediaProcessing()140 void AmSession::stopMediaProcessing()
141 {
142   if(!isProcessingMedia())
143     return;
144 
145   AmMediaProcessor::instance()->removeSession(this);
146 }
147 
addHandler(AmSessionEventHandler * sess_evh)148 void AmSession::addHandler(AmSessionEventHandler* sess_evh)
149 {
150   if (sess_evh != NULL)
151     ev_handlers.push_back(sess_evh);
152 }
153 
setInput(AmAudio * in)154 void AmSession::setInput(AmAudio* in)
155 {
156   lockAudio();
157   input = in;
158   unlockAudio();
159 }
160 
setOutput(AmAudio * out)161 void AmSession::setOutput(AmAudio* out)
162 {
163   lockAudio();
164   output = out;
165   unlockAudio();
166 }
167 
setInOut(AmAudio * in,AmAudio * out)168 void AmSession::setInOut(AmAudio* in,AmAudio* out)
169 {
170   lockAudio();
171   input = in;
172   output = out;
173   unlockAudio();
174 }
175 
isAudioSet()176 bool AmSession::isAudioSet()
177 {
178   lockAudio();
179   bool set = input || output;
180   unlockAudio();
181   return set;
182 }
183 
lockAudio()184 void AmSession::lockAudio()
185 {
186   audio_mut.lock();
187 }
188 
unlockAudio()189 void AmSession::unlockAudio()
190 {
191   audio_mut.unlock();
192 }
193 
getCallID() const194 const string& AmSession::getCallID() const
195 {
196   return dlg->getCallid();
197 }
198 
getRemoteTag() const199 const string& AmSession::getRemoteTag() const
200 {
201   return dlg->getRemoteTag();
202 }
203 
getLocalTag() const204 const string& AmSession::getLocalTag() const
205 {
206   return dlg->getLocalTag();
207 }
208 
getFirstBranch() const209 const string& AmSession::getFirstBranch() const
210 {
211   return dlg->get1stBranch();
212 }
213 
setUri(const string & uri)214 void AmSession::setUri(const string& uri)
215 {
216   DBG("AmSession::setUri(%s)\n",uri.c_str());
217   /* TODO: sdp.uri = uri;*/
218 }
219 
setLocalTag()220 void AmSession::setLocalTag()
221 {
222   if (dlg->getLocalTag().empty()) {
223     string new_id = getNewId();
224     dlg->setLocalTag(new_id);
225     DBG("AmSession::setLocalTag() - session id set to %s\n", new_id.c_str());
226   }
227 }
228 
setLocalTag(const string & tag)229 void AmSession::setLocalTag(const string& tag)
230 {
231   DBG("AmSession::setLocalTag(%s)\n",tag.c_str());
232   dlg->setLocalTag(tag);
233 }
234 
getPayloads()235 const vector<SdpPayload*>& AmSession::getPayloads()
236 {
237   return m_payloads;
238 }
239 
getRPort()240 int AmSession::getRPort()
241 {
242   return RTPStream()->getRPort();
243 }
244 
245 #ifdef SESSION_THREADPOOL
start()246 void AmSession::start() {
247   AmSessionProcessorThread* processor_thread =
248     AmSessionProcessor::getProcessorThread();
249   if (NULL == processor_thread)
250     throw string("no processing thread available");
251 
252   // have the thread register and start us
253   processor_thread->startSession(this);
254 }
255 
is_stopped()256 bool AmSession::is_stopped() {
257   return processing_status == SESSION_ENDED_DISCONNECTED;
258 }
259 #else
260 // in this case every session has its own thread
261 // - this is the main processing loop
run()262 void AmSession::run() {
263   DBG("startup session\n");
264   if (!startup())
265     return;
266 
267   DBG("running session event loop\n");
268   while (true) {
269     waitForEvent();
270     if (!processingCycle())
271       break;
272   }
273 
274   DBG("session event loop ended, finalizing session\n");
275   finalize();
276 }
277 #endif
278 
startup()279 bool AmSession::startup() {
280   session_started();
281 
282   try {
283     try {
284 
285       onStart();
286 
287 #ifdef WITH_ZRTP
288       if (enable_zrtp) {
289 	if (zrtp_session_state.initSession(this)) {
290 	  ERROR("initializing ZRTP session\n");
291 	  throw AmSession::Exception(500, SIP_REPLY_SERVER_INTERNAL_ERROR);
292 	}
293 	DBG("initialized ZRTP session context OK\n");
294       }
295 #endif
296 
297     }
298     catch(const AmSession::Exception& e){ throw e; }
299     catch(const string& str){
300       ERROR("%s\n",str.c_str());
301       throw AmSession::Exception(500,"unexpected exception.");
302     }
303     catch(...){
304       throw AmSession::Exception(500,"unexpected exception.");
305     }
306 
307   } catch(const AmSession::Exception& e){
308     ERROR("%i %s\n",e.code,e.reason.c_str());
309     onBeforeDestroy();
310     destroy();
311 
312     session_stopped();
313 
314     return false;
315   }
316 
317   return true;
318 }
319 
processEventsCatchExceptions()320 bool AmSession::processEventsCatchExceptions() {
321   try {
322     try {
323       processEvents();
324     }
325     catch(const AmSession::Exception& e){ throw e; }
326     catch(const string& str){
327       ERROR("%s\n",str.c_str());
328       throw AmSession::Exception(500,"unexpected exception.");
329     }
330     catch(...){
331       throw AmSession::Exception(500,"unexpected exception.");
332     }
333   } catch(const AmSession::Exception& e){
334     ERROR("%i %s\n",e.code,e.reason.c_str());
335     return false;
336   }
337   return true;
338 }
339 
340 /** one cycle of the event processing loop.
341     this should be called until it returns false. */
processingCycle()342 bool AmSession::processingCycle() {
343 
344   DBG("vv S [%s|%s] %s, %s, %i UACTransPending, %i usages vv\n",
345       dlg->getCallid().c_str(),getLocalTag().c_str(),
346       dlg->getStatusStr(),
347       sess_stopped.get()?"stopped":"running",
348       dlg->getUACTransPending(),
349       dlg->getUsages());
350 
351   switch (processing_status) {
352   case SESSION_PROCESSING_EVENTS:
353     {
354       if (!processEventsCatchExceptions()) {
355 	// exception occured, stop processing
356 	processing_status = SESSION_ENDED_DISCONNECTED;
357 	return false;
358       }
359 
360       AmSipDialog::Status dlg_status = dlg->getStatus();
361       bool s_stopped = sess_stopped.get();
362 
363       DBG("^^ S [%s|%s] %s, %s, %i UACTransPending, %i usages ^^\n",
364 	  dlg->getCallid().c_str(),getLocalTag().c_str(),
365 	  AmBasicSipDialog::getStatusStr(dlg_status),
366 	  s_stopped?"stopped":"running",
367 	  dlg->getUACTransPending(),
368 	  dlg->getUsages());
369 
370       // session running?
371       if (!s_stopped || (dlg_status == AmSipDialog::Disconnecting)
372 	  || dlg->getUsages())
373 	return true;
374 
375       // session stopped?
376       if (s_stopped &&
377 	  (dlg_status == AmSipDialog::Disconnected)) {
378 	processing_status = SESSION_ENDED_DISCONNECTED;
379 	return false;
380       }
381 
382       // wait for session's status to be disconnected
383       // todo: set some timer to tear down the session anyway,
384       //       or react properly on negative reply to BYE (e.g. timeout)
385       processing_status = SESSION_WAITING_DISCONNECTED;
386 
387       if ((dlg_status != AmSipDialog::Disconnected) &&
388 	  (dlg_status != AmSipDialog::Cancelling)) {
389 	DBG("app did not send BYE - do that for the app\n");
390 	if (dlg->bye() != 0) {
391 	  processing_status = SESSION_ENDED_DISCONNECTED;
392 	  // BYE sending failed - don't wait for dlg status to go disconnected
393 	  return false;
394 	}
395       }
396 
397       return true;
398 
399     } break;
400 
401   case SESSION_WAITING_DISCONNECTED: {
402     // processing events until dialog status is Disconnected
403 
404     if (!processEventsCatchExceptions()) {
405       processing_status = SESSION_ENDED_DISCONNECTED;
406       return false; // exception occured, stop processing
407     }
408 
409     bool res = dlg->getStatus() != AmSipDialog::Disconnected;
410     if (!res)
411       processing_status = SESSION_ENDED_DISCONNECTED;
412 
413     DBG("^^ S [%s|%s] %s, %s, %i UACTransPending, %i usages ^^\n",
414 	dlg->getCallid().c_str(),getLocalTag().c_str(),
415 	dlg->getStatusStr(),
416 	sess_stopped.get()?"stopped":"running",
417 	dlg->getUACTransPending(),
418 	dlg->getUsages());
419 
420     return res;
421   }; break;
422 
423   default: {
424     ERROR("unknown session processing state\n");
425     return false; // stop processing
426   }
427   }
428 }
429 
finalize()430 void AmSession::finalize()
431 {
432   DBG("running finalize sequence...\n");
433   dlg->finalize();
434 
435 #ifdef WITH_ZRTP
436   if (enable_zrtp) {
437     zrtp_session_state.freeSession();
438   }
439 #endif
440 
441   onBeforeDestroy();
442   destroy();
443 
444   session_stopped();
445 
446   DBG("session is stopped.\n");
447 }
448 #ifndef SESSION_THREADPOOL
on_stop()449   void AmSession::on_stop()
450 #else
451   void AmSession::stop()
452 #endif
453 {
454   DBG("AmSession::stop()\n");
455 
456   if (!isDetached())
457     AmMediaProcessor::instance()->clearSession(this);
458   else
459     clearAudio();
460 }
461 
setStopped(bool wakeup)462 void AmSession::setStopped(bool wakeup) {
463   if (!sess_stopped.get()) {
464     sess_stopped.set(true);
465     onStop();
466   }
467   if (wakeup)
468     AmSessionContainer::instance()->postEvent(getLocalTag(),
469 					      new AmEvent(0));
470 }
471 
/**
 * Looks up an application parameter by name.
 *
 * @return the stored value, or an empty string if the name is unknown.
 */
string AmSession::getAppParam(const string& param_name) const
{
  const map<string,string>::const_iterator found = app_params.find(param_name);

  if(found == app_params.end())
    return "";

  return found->second;
}
481 
destroy()482 void AmSession::destroy() {
483   DBG("AmSession::destroy()\n");
484   AmSessionContainer::instance()->destroySession(this);
485 }
486 
getNewId()487 string AmSession::getNewId() {
488   struct timeval t;
489   gettimeofday(&t,NULL);
490 
491   string id = "";
492 
493   id += int2hex(get_random()) + "-";
494   id += int2hex(t.tv_sec) + int2hex(t.tv_usec) + "-";
495   id += int2hex((unsigned int)((unsigned long)pthread_self()));
496 
497   return id;
498 }
499 /* bookkeeping functions - TODO: move to monitoring */
session_started()500 void AmSession::session_started() {
501   struct timeval now, delta;
502 
503   session_num_mut.lock();
504   //avg session number
505   gettimeofday(&now, NULL);
506   timersub(&now, &avg_last_timestamp, &delta);
507   avg_session_num += session_num * (delta.tv_sec * 1000000ULL + delta.tv_usec);
508   avg_last_timestamp = now;
509 
510   //current session number
511   session_num++;
512 
513   //cumulative session count
514   session_count++;
515 
516   //maximum session number
517   if(session_num > max_session_num) max_session_num = session_num;
518 
519   session_num_mut.unlock();
520 }
521 
session_stopped()522 void AmSession::session_stopped() {
523   struct timeval now, delta;
524   session_num_mut.lock();
525   //avg session number
526   gettimeofday(&now, NULL);
527   timersub(&now, &avg_last_timestamp, &delta);
528   avg_session_num += session_num * (delta.tv_sec * 1000000ULL + delta.tv_usec);
529   avg_last_timestamp = now;
530   //current session number
531   session_num--;
532   session_num_mut.unlock();
533 }
534 
getSessionNum()535 unsigned int AmSession::getSessionNum() {
536   unsigned int res = 0;
537   session_num_mut.lock();
538   res = session_num;
539   session_num_mut.unlock();
540   return res;
541 }
542 
getSessionCount()543 unsigned int AmSession::getSessionCount() {
544   unsigned int res = 0;
545   session_num_mut.lock();
546   res = session_count;
547   session_num_mut.unlock();
548   return res;
549 }
550 
getMaxSessionNum()551 unsigned int AmSession::getMaxSessionNum() {
552   unsigned int res = 0;
553   session_num_mut.lock();
554   res = max_session_num;
555   max_session_num = session_num;
556   session_num_mut.unlock();
557   return res;
558 }
559 
getAvgSessionNum()560 unsigned int AmSession::getAvgSessionNum() {
561   unsigned int res = 0;
562   struct timeval now, delta;
563   session_num_mut.lock();
564   gettimeofday(&now, NULL);
565   timersub(&now, &avg_last_timestamp, &delta);
566   avg_session_num += session_num * (delta.tv_sec * 1000000ULL + delta.tv_usec);
567   timersub(&now, &avg_first_timestamp, &delta);
568   unsigned long long d_usec = delta.tv_sec * 1000000ULL + delta.tv_usec;
569   if (!d_usec) {
570     res = 0;
571     WARN("zero delta!\n");
572   } else {
573     //Round up
574     res = (unsigned int)((avg_session_num + d_usec - 1) / d_usec);
575   }
576   avg_session_num = 0;
577   avg_last_timestamp = now;
578   avg_first_timestamp = now;
579   session_num_mut.unlock();
580   return res;
581 }
582 
setInbandDetector(Dtmf::InbandDetectorType t)583 void AmSession::setInbandDetector(Dtmf::InbandDetectorType t)
584 {
585   m_dtmfDetector.setInbandDetector(t, RTPStream()->getSampleRate());
586 }
587 
postDtmfEvent(AmDtmfEvent * evt)588 void AmSession::postDtmfEvent(AmDtmfEvent *evt)
589 {
590   if (m_dtmfDetectionEnabled)
591     {
592       if (dynamic_cast<AmSipDtmfEvent *>(evt) ||
593 	  dynamic_cast<AmRtpDtmfEvent *>(evt))
594         {
595 	  // this is a raw event from sip info or rtp
596 	  m_dtmfEventQueue.postEvent(evt);
597         }
598       else
599         {
600 	  // this is an aggregated event,
601 	  // post it into our event queue
602 	  postEvent(evt);
603         }
604     }
605 }
606 
processDtmfEvents()607 void AmSession::processDtmfEvents()
608 {
609   if (m_dtmfDetectionEnabled)
610     {
611       m_dtmfEventQueue.processEvents();
612     }
613 }
614 
putDtmfAudio(const unsigned char * buf,int size,unsigned long long system_ts)615 void AmSession::putDtmfAudio(const unsigned char *buf, int size, unsigned long long system_ts)
616 {
617   m_dtmfEventQueue.putDtmfAudio(buf, size, system_ts);
618 }
619 
sendDtmf(int event,unsigned int duration_ms)620 void AmSession::sendDtmf(int event, unsigned int duration_ms) {
621   RTPStream()->sendDtmf(event, duration_ms);
622 }
623 
624 
onDtmf(int event,int duration_msec)625 void AmSession::onDtmf(int event, int duration_msec)
626 {
627   DBG("AmSession::onDtmf(%i,%i)\n",event,duration_msec);
628 }
629 
clearAudio()630 void AmSession::clearAudio()
631 {
632   lockAudio();
633 
634   if (input) {
635     input->close();
636     input = NULL;
637   }
638   if (output) {
639     output->close();
640     output = NULL;
641   }
642 
643   unlockAudio();
644   DBG("Audio cleared !!!\n");
645   postEvent(new AmAudioEvent(AmAudioEvent::cleared));
646 }
647 
process(AmEvent * ev)648 void AmSession::process(AmEvent* ev)
649 {
650   CALL_EVENT_H(process,ev);
651 
652   DBG("AmSession processing event\n");
653 
654   if (ev->event_id == E_SYSTEM) {
655     AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
656     if(sys_ev){
657       DBG("Session received system Event\n");
658       onSystemEvent(sys_ev);
659       return;
660     }
661   }
662 
663   AmSipEvent* sip_ev = dynamic_cast<AmSipEvent*>(ev);
664   if(sip_ev){
665     (*sip_ev)(dlg);
666     return;
667   }
668 
669   AmAudioEvent* audio_ev = dynamic_cast<AmAudioEvent*>(ev);
670   if(audio_ev){
671     onAudioEvent(audio_ev);
672     return;
673   }
674 
675   AmDtmfEvent* dtmf_ev = dynamic_cast<AmDtmfEvent*>(ev);
676   if (dtmf_ev) {
677     DBG("Session received DTMF, event = %d, duration = %d\n",
678 	dtmf_ev->event(), dtmf_ev->duration());
679     onDtmf(dtmf_ev->event(), dtmf_ev->duration());
680     return;
681   }
682 
683   AmRtpTimeoutEvent* timeout_ev = dynamic_cast<AmRtpTimeoutEvent*>(ev);
684   if(timeout_ev){
685     onRtpTimeout();
686     return;
687   }
688 
689 #ifdef WITH_ZRTP
690   AmZRTPProtocolEvent* zrtp_p_ev = dynamic_cast<AmZRTPProtocolEvent*>(ev);
691   if(zrtp_p_ev){
692     onZRTPProtocolEvent((zrtp_protocol_event_t)zrtp_p_ev->event_id, zrtp_p_ev->stream_ctx);
693     return;
694   }
695 
696   AmZRTPSecurityEvent* zrtp_s_ev = dynamic_cast<AmZRTPSecurityEvent*>(ev);
697   if(zrtp_s_ev){
698     onZRTPSecurityEvent((zrtp_security_event_t)zrtp_s_ev->event_id, zrtp_s_ev->stream_ctx);
699     return;
700   }
701 #endif
702 }
703 
onSipRequest(const AmSipRequest & req)704 void AmSession::onSipRequest(const AmSipRequest& req)
705 {
706   CALL_EVENT_H(onSipRequest,req);
707 
708   DBG("onSipRequest: method = %s\n",req.method.c_str());
709 
710   updateRefreshMethod(req.hdrs);
711 
712   if(req.method == SIP_METH_INVITE){
713 
714     try {
715       onInvite(req);
716     }
717     catch(const string& s) {
718       ERROR("%s\n",s.c_str());
719       setStopped();
720       dlg->reply(req, 500, SIP_REPLY_SERVER_INTERNAL_ERROR);
721     }
722     catch(const AmSession::Exception& e) {
723       ERROR("%i %s\n",e.code,e.reason.c_str());
724       setStopped();
725       dlg->reply(req, e.code, e.reason, NULL, e.hdrs);
726     }
727   }
728   else if(req.method == SIP_METH_ACK){
729     return;
730   }
731   else if( req.method == SIP_METH_BYE ){
732     dlg->reply(req,200,"OK");
733     onBye(req);
734   }
735   else if( req.method == SIP_METH_CANCEL ){
736     onCancel(req);
737   }
738   else if( req.method == SIP_METH_INFO ){
739 
740     const AmMimeBody* dtmf_body =
741       req.body.hasContentType("application/dtmf-relay");
742 
743     if (dtmf_body) {
744       string dtmf_body_str((const char*)dtmf_body->getPayload(),
745 			   dtmf_body->getLen());
746       postDtmfEvent(new AmSipDtmfEvent(dtmf_body_str));
747       dlg->reply(req, 200, "OK");
748     } else {
749       dlg->reply(req, 415, "Unsupported Media Type");
750     }
751   } else if (req.method == SIP_METH_PRACK) {
752     // TODO: SDP
753     dlg->reply(req, 200, "OK");
754     // TODO: WARN: only include latest SDP if req.rseq == dlg->rseq (latest 1xx)
755   }
756   else if ((req.method == SIP_METH_UPDATE) && req.body.empty()) {
757     dlg->reply(req, 200, "OK");
758   }
759   else {
760     dlg->reply(req, 501, "Not implemented");
761   }
762 }
763 
onSipReply(const AmSipRequest & req,const AmSipReply & reply,AmBasicSipDialog::Status old_dlg_status)764 void AmSession::onSipReply(const AmSipRequest& req, const AmSipReply& reply,
765 			   AmBasicSipDialog::Status old_dlg_status)
766 {
767   CALL_EVENT_H(onSipReply, req, reply, old_dlg_status);
768 
769   updateRefreshMethod(reply.hdrs);
770 
771   if (dlg->getStatus() < AmSipDialog::Connected &&
772       reply.code == 180) {
773     onRinging(reply);
774   }
775 
776   if (old_dlg_status != dlg->getStatus()) {
777     DBG("Dialog status changed %s -> %s (stopped=%s) \n",
778 	AmBasicSipDialog::getStatusStr(old_dlg_status),
779 	dlg->getStatusStr(),
780 	sess_stopped.get() ? "true" : "false");
781   } else {
782     DBG("Dialog status stays %s (stopped=%s)\n",
783 	AmBasicSipDialog::getStatusStr(old_dlg_status),
784 	sess_stopped.get() ? "true" : "false");
785   }
786 }
787 
788 
789 
onInvite2xx(const AmSipReply & reply)790 void AmSession::onInvite2xx(const AmSipReply& reply)
791 {
792   dlg->send_200_ack(reply.cseq);
793 }
794 
onRemoteDisappeared(const AmSipReply &)795 void AmSession::onRemoteDisappeared(const AmSipReply&) {
796   // see 3261 - 12.2.1.2: should end dialog on 408/481
797   DBG("Remote end unreachable - ending session\n");
798   dlg->bye();
799   setStopped();
800 }
801 
onNoAck(unsigned int cseq)802 void AmSession::onNoAck(unsigned int cseq)
803 {
804   if (dlg->getStatus() == AmSipDialog::Connected)
805     dlg->bye();
806   setStopped();
807 }
808 
onNoPrack(const AmSipRequest & req,const AmSipReply & rpl)809 void AmSession::onNoPrack(const AmSipRequest &req, const AmSipReply &rpl)
810 {
811   dlg->reply(req, 504, "Server Time-out");
812   // TODO: handle forking case (when more PRACKs are sent, out of which some
813   // might time-out/fail).
814   if (dlg->getStatus() < AmSipDialog::Connected)
815     setStopped();
816 }
817 
onAudioEvent(AmAudioEvent * audio_ev)818 void AmSession::onAudioEvent(AmAudioEvent* audio_ev)
819 {
820   if (audio_ev->event_id == AmAudioEvent::cleared)
821     setStopped();
822 }
823 
onInvite(const AmSipRequest & req)824 void AmSession::onInvite(const AmSipRequest& req)
825 {
826   dlg->reply(req,200,"OK");
827 }
828 
onBye(const AmSipRequest & req)829 void AmSession::onBye(const AmSipRequest& req)
830 {
831   setStopped();
832 }
833 
onCancel(const AmSipRequest & cancel)834 void AmSession::onCancel(const AmSipRequest& cancel)
835 {
836   dlg->bye();
837   setStopped();
838 }
839 
onSystemEvent(AmSystemEvent * ev)840 void AmSession::onSystemEvent(AmSystemEvent* ev) {
841   if (ev->sys_event == AmSystemEvent::ServerShutdown) {
842     setStopped();
843     return;
844   }
845 }
846 
onSendRequest(AmSipRequest & req,int & flags)847 void AmSession::onSendRequest(AmSipRequest& req, int& flags)
848 {
849   CALL_EVENT_H(onSendRequest,req,flags);
850 }
851 
onSendReply(const AmSipRequest & req,AmSipReply & reply,int & flags)852 void AmSession::onSendReply(const AmSipRequest& req, AmSipReply& reply, int& flags)
853 {
854   CALL_EVENT_H(onSendReply,req,reply,flags);
855 }
856 
857 /** Hook called when an SDP offer is required */
getSdpOffer(AmSdp & offer)858 bool AmSession::getSdpOffer(AmSdp& offer)
859 {
860   DBG("AmSession::getSdpOffer(...) ...\n");
861 
862   offer.version = 0;
863   offer.origin.user = "sems";
864   //offer.origin.sessId = 1;
865   //offer.origin.sessV = 1;
866   offer.sessionName = "sems";
867   offer.conn.network = NT_IN;
868   offer.conn.addrType = AT_V4;
869   offer.conn.address = advertisedIP();
870 
871   // TODO: support mutiple media types (needs multiples RTP streams)
872   // TODO: support update instead of clearing everything
873 
874   if(RTPStream()->getSdpMediaIndex() < 0)
875     offer.media.clear();
876 
877   unsigned int media_idx = 0;
878   if(!offer.media.size()) {
879     offer.media.push_back(SdpMedia());
880     offer.media.back().type=MT_AUDIO;
881   }
882   else {
883     media_idx = RTPStream()->getSdpMediaIndex();
884   }
885 
886   RTPStream()->setLocalIP(localMediaIP());
887   RTPStream()->getSdpOffer(media_idx,offer.media.back());
888 
889   return true;
890 }
891 
892 struct codec_priority_cmp
893 {
894 public:
codec_priority_cmpcodec_priority_cmp895   codec_priority_cmp() {}
896 
operator ()codec_priority_cmp897   bool operator()(const SdpPayload& left, const SdpPayload& right)
898   {
899     for (vector<string>::iterator it = AmConfig::CodecOrder.begin(); it != AmConfig::CodecOrder.end(); it++) {
900       if (strcasecmp(left.encoding_name.c_str(),it->c_str())==0 && strcasecmp(right.encoding_name.c_str(), it->c_str())!=0)
901 	return true;
902       if (strcasecmp(right.encoding_name.c_str(),it->c_str())==0)
903 	return false;
904     }
905 
906     return false;
907   }
908 };
909 
910 /** Hook called when an SDP answer is required */
getSdpAnswer(const AmSdp & offer,AmSdp & answer)911 bool AmSession::getSdpAnswer(const AmSdp& offer, AmSdp& answer)
912 {
913   DBG("AmSession::getSdpAnswer(...) ...\n");
914 
915   answer.version = 0;
916   answer.origin.user = "sems";
917   //answer.origin.sessId = 1;
918   //answer.origin.sessV = 1;
919   answer.sessionName = "sems";
920   answer.conn.network = NT_IN;
921   if (offer.conn.address.empty()) answer.conn.addrType = AT_V4; // or use first stream connection?
922   else answer.conn.addrType = offer.conn.addrType;
923   answer.conn.address = advertisedIP(answer.conn.addrType);
924   answer.media.clear();
925 
926   bool audio_1st_stream = true;
927   unsigned int media_index = 0;
928   for(vector<SdpMedia>::const_iterator m_it = offer.media.begin();
929       m_it != offer.media.end(); ++m_it) {
930 
931     answer.media.push_back(SdpMedia());
932     SdpMedia& answer_media = answer.media.back();
933 
934     if( m_it->type == MT_AUDIO
935 	&& m_it->transport == TP_RTPAVP
936         && audio_1st_stream
937         && (m_it->port != 0) ) {
938 
939       RTPStream()->setLocalIP(localMediaIP(answer.conn.addrType));
940       RTPStream()->getSdpAnswer(media_index,*m_it,answer_media);
941       if(answer_media.payloads.empty() ||
942 	 ((answer_media.payloads.size() == 1) &&
943 	  (answer_media.payloads[0].encoding_name == "telephone-event"))
944 	 ){
945 	// no compatible media found
946 	throw Exception(488, SIP_REPLY_NOT_ACCEPTABLE_HERE);
947       }
948       audio_1st_stream = false;
949     }
950     else {
951 
952       answer_media.type = m_it->type;
953       answer_media.port = 0;
954       answer_media.nports = 0;
955       answer_media.transport = m_it->transport;
956       answer_media.send = false;
957       answer_media.recv = false;
958       answer_media.payloads.clear();
959       if(!m_it->payloads.empty()) {
960 	SdpPayload dummy_pl = m_it->payloads.front();
961 	dummy_pl.encoding_name.clear();
962 	dummy_pl.sdp_format_parameters.clear();
963 	answer_media.payloads.push_back(dummy_pl);
964       }
965       answer_media.attributes.clear();
966     }
967 
968     // sort payload type in the answer according to the priority given in the codec_order configuration key
969     std::stable_sort(answer_media.payloads.begin(),answer_media.payloads.end(),codec_priority_cmp());
970 
971     media_index++;
972   }
973 
974   if (audio_1st_stream)
975     throw Exception(488, SIP_REPLY_NOT_ACCEPTABLE_HERE);
976 
977   return true;
978 }
979 
onSdpCompleted(const AmSdp & local_sdp,const AmSdp & remote_sdp)980 int AmSession::onSdpCompleted(const AmSdp& local_sdp, const AmSdp& remote_sdp)
981 {
982   DBG("AmSession::onSdpCompleted(...) ...\n");
983 
984   if(local_sdp.media.empty() || remote_sdp.media.empty()) {
985 
986     ERROR("Invalid SDP");
987 
988     string debug_str;
989     local_sdp.print(debug_str);
990     ERROR("Local SDP:\n%s",
991 	  debug_str.empty() ? "<empty>"
992 	  : debug_str.c_str());
993 
994     remote_sdp.print(debug_str);
995     ERROR("Remote SDP:\n%s",
996 	  debug_str.empty() ? "<empty>"
997 	  : debug_str.c_str());
998 
999     return -1;
1000   }
1001 
1002   lockAudio();
1003 
1004   // TODO:
1005   //   - get the right media ID
1006   //   - check if the stream coresponding to the media ID
1007   //     should be created or updated
1008   //
1009   int ret = 0;
1010 
1011   try {
1012     ret = RTPStream()->init(local_sdp, remote_sdp, AmConfig::ForceSymmetricRtp);
1013   } catch (const string& s) {
1014     ERROR("Error while initializing RTP stream: '%s'\n", s.c_str());
1015     ret = -1;
1016   } catch (...) {
1017     ERROR("Error while initializing RTP stream (unknown exception in AmRTPStream::init)\n");
1018     ret = -1;
1019   }
1020   unlockAudio();
1021 
1022   if (!isProcessingMedia()) {
1023     setInbandDetector(AmConfig::DefaultDTMFDetector);
1024   }
1025 
1026   return ret;
1027 }
1028 
onEarlySessionStart()1029 void AmSession::onEarlySessionStart()
1030 {
1031   startMediaProcessing();
1032 }
1033 
onSessionStart()1034 void AmSession::onSessionStart()
1035 {
1036   startMediaProcessing();
1037 }
1038 
onRtpTimeout()1039 void AmSession::onRtpTimeout()
1040 {
1041   DBG("RTP timeout, stopping Session\n");
1042   dlg->bye();
1043   setStopped();
1044 }
1045 
onSessionTimeout()1046 void AmSession::onSessionTimeout() {
1047   DBG("Session Timer: Timeout, ending session.\n");
1048   dlg->bye();
1049   setStopped();
1050 }
1051 
updateRefreshMethod(const string & headers)1052 void AmSession::updateRefreshMethod(const string& headers) {
1053   if (refresh_method == REFRESH_UPDATE_FB_REINV) {
1054     if (key_in_list(getHeader(headers, SIP_HDR_ALLOW),
1055 		    SIP_METH_UPDATE)) {
1056       DBG("remote allows UPDATE, using UPDATE for session refresh.\n");
1057       refresh_method = REFRESH_UPDATE;
1058     }
1059   }
1060 }
1061 
refresh(int flags)1062 bool AmSession::refresh(int flags) {
1063   // no session refresh if not connected
1064   if (dlg->getStatus() != AmSipDialog::Connected)
1065     return false;
1066 
1067   if (refresh_method == REFRESH_UPDATE) {
1068     DBG("Refreshing session with UPDATE\n");
1069     return sendUpdate( NULL, "") == 0;
1070   } else {
1071 
1072     if (dlg->getUACInvTransPending()) {
1073       DBG("INVITE transaction pending - not refreshing now\n");
1074       return false;
1075     }
1076 
1077     DBG("Refreshing session with re-INVITE\n");
1078     return sendReinvite(true, "", flags) == 0;
1079   }
1080 }
1081 
sendUpdate(const AmMimeBody * body,const string & hdrs)1082 int AmSession::sendUpdate(const AmMimeBody* body,
1083 			  const string &hdrs)
1084 {
1085   return dlg->update(body, hdrs);
1086 }
1087 
onInvite1xxRel(const AmSipReply & reply)1088 void AmSession::onInvite1xxRel(const AmSipReply &reply)
1089 {
1090   // TODO: SDP
1091   if (dlg->prack(reply, NULL, /*headers*/"") < 0)
1092     ERROR("failed to send PRACK request in session '%s'.\n",sid4dbg().c_str());
1093 }
1094 
// Presumably invoked for a 2xx reply to a PRACK request (per its name).
// Currently a no-op: 'reply' is not inspected; SDP handling is still to do.
void AmSession::onPrack2xx(const AmSipReply &reply)
{
  /* TODO: SDP */
}
1099 
sid4dbg()1100 string AmSession::sid4dbg()
1101 {
1102   string dbg;
1103   dbg = dlg->getCallid() + "/" + dlg->getLocalTag() + "/"
1104     + dlg->getRemoteTag() + "/"
1105     + int2str(RTPStream()->getLocalPort()) + "/"
1106     + RTPStream()->getRHost() + ":" + int2str(RTPStream()->getRPort());
1107   return dbg;
1108 }
1109 
sendReinvite(bool updateSDP,const string & headers,int flags)1110 int AmSession::sendReinvite(bool updateSDP, const string& headers, int flags)
1111 {
1112   if(updateSDP){
1113     // Forces SDP offer/answer
1114     AmMimeBody sdp;
1115     sdp.addPart(SIP_APPLICATION_SDP);
1116     return dlg->reinvite(headers, &sdp, flags);
1117   }
1118   else {
1119     return dlg->reinvite(headers, NULL, flags);
1120   }
1121 }
1122 
sendInvite(const string & headers)1123 int AmSession::sendInvite(const string& headers)
1124 {
1125   onOutgoingInvite(headers);
1126 
1127   // Forces SDP offer/answer
1128   AmMimeBody sdp;
1129   sdp.addPart(SIP_APPLICATION_SDP);
1130   return dlg->invite(headers, &sdp);
1131 }
1132 
setOnHold(bool hold)1133 void AmSession::setOnHold(bool hold)
1134 {
1135   lockAudio();
1136   bool old_hold = RTPStream()->getOnHold();
1137   RTPStream()->setOnHold(hold);
1138   if (hold != old_hold)
1139     sendReinvite();
1140   unlockAudio();
1141 }
1142 
setRemoteHold(bool remote_hold)1143 void AmSession::setRemoteHold(bool remote_hold)
1144 {
1145   lockAudio();
1146   bool old_hold = RTPStream()->getRemoteHold();
1147   RTPStream()->setRemoteHold(remote_hold);
1148   if (remote_hold != old_hold)
1149     sendReinvite();
1150   unlockAudio();
1151 }
1152 
// Failure handler. The body currently consists only of commented-out code,
// so calling it has no effect.
//
// The disabled code below sketches handling of the FAIL_REL100_421 and
// FAIL_REL100_420 causes (cancel the dialog or reply 421/420 and stop the
// session if not yet connected). It refers to 'cause', 'rpl' and 'req',
// none of which are parameters of this function.
void AmSession::onFailure()
{
  // switch (cause) {
  //   case FAIL_REL100_421:
  //   case FAIL_REL100_420:
  //     if (rpl) {
  //       dlg.cancel();
  //       if (dlg.getStatus() < AmSipDialog::Connected)
  //         setStopped();
  //     } else if (req) {
  //       if (cause == FAIL_REL100_421) {
  //         dlg.reply(*req, 421, SIP_REPLY_EXTENSION_REQUIRED, NULL,
  //             SIP_HDR_COLSP(SIP_HDR_REQUIRE) SIP_EXT_100REL CRLF);
  //       } else {
  //         dlg.reply(*req, 420, SIP_REPLY_BAD_EXTENSION, NULL,
  //             SIP_HDR_COLSP(SIP_HDR_UNSUPPORTED) SIP_EXT_100REL CRLF);
  //       }
  //       /* finally, stop session if running */
  //       if (dlg.getStatus() < AmSipDialog::Connected)
  //         setStopped();
  //     }
  //     break;
  //   default:
  //     break;
  // }
}
1179 
1180 
getRtpInterface()1181 int AmSession::getRtpInterface()
1182 {
1183   if(rtp_interface < 0){
1184     // TODO: get default media interface for signaling IF instead
1185     rtp_interface = AmConfig::SIP_Ifs[dlg->getOutboundIf()].RtpInterface;
1186     if(rtp_interface < 0) {
1187       DBG("No media interface for signaling interface:\n");
1188       DBG("Using default media interface instead.\n");
1189       rtp_interface = 0;
1190     }
1191   }
1192   return rtp_interface;
1193 }
1194 
setRtpInterface(int _rtp_interface)1195 void AmSession::setRtpInterface(int _rtp_interface) {
1196   DBG("setting media interface to %d\n", _rtp_interface);
1197   rtp_interface = _rtp_interface;
1198 }
1199 
localMediaIP(int addrType)1200 string AmSession::localMediaIP(int addrType)
1201 {
1202   // sets rtp_interface if not initialized
1203   getRtpInterface();
1204 
1205   assert(rtp_interface >= 0);
1206   assert((unsigned int)rtp_interface < AmConfig::RTP_Ifs.size());
1207 
1208   string set_ip = "";
1209   for (size_t i = rtp_interface; i < AmConfig::RTP_Ifs.size(); i++) {
1210     set_ip = AmConfig::RTP_Ifs[i].LocalIP; // "media_ip" parameter.
1211     if ((addrType == AT_NONE) ||
1212 	((addrType == AT_V4) && (set_ip.find(".") != std::string::npos)) ||
1213 	((addrType == AT_V6) && (set_ip.find(":") != std::string::npos)))
1214       return set_ip;
1215   }
1216   return set_ip;
1217 }
1218 
1219 // Utility for basic NAT handling: allow the config file to specify the IP
1220 // address to use in SDP bodies
advertisedIP(int addrType)1221 string AmSession::advertisedIP(int addrType)
1222 {
1223   // sets rtp_interface if not initialized
1224   getRtpInterface();
1225 
1226   assert(rtp_interface >= 0);
1227   assert((unsigned int)rtp_interface < AmConfig::RTP_Ifs.size());
1228 
1229   string set_ip = "";
1230   for (size_t i = rtp_interface; i < AmConfig::RTP_Ifs.size(); i++) {
1231     set_ip = AmConfig::RTP_Ifs[i].getIP(); // "media_ip" parameter.
1232     if ((addrType == AT_NONE) ||
1233 	((addrType == AT_V4) && (set_ip.find(".") != std::string::npos)) ||
1234 	((addrType == AT_V6) && (set_ip.find(":") != std::string::npos)))
1235       return set_ip;
1236   }
1237   return set_ip;
1238 }
1239 
timersSupported()1240 bool AmSession::timersSupported() {
1241   WARN("this function is deprecated; application timers are always supported\n");
1242   return true;
1243 }
1244 
setTimer(int timer_id,double timeout)1245 bool AmSession::setTimer(int timer_id, double timeout) {
1246   if (timeout <= 0.005) {
1247     DBG("setting timer %d with immediate timeout - posting Event\n", timer_id);
1248     AmTimeoutEvent* ev = new AmTimeoutEvent(timer_id);
1249     postEvent(ev);
1250     return true;
1251   }
1252 
1253   DBG("setting timer %d with timeout %f\n", timer_id, timeout);
1254   AmAppTimer::instance()->setTimer(getLocalTag(), timer_id, timeout);
1255 
1256   return true;
1257 }
1258 
removeTimer(int timer_id)1259 bool AmSession::removeTimer(int timer_id) {
1260 
1261   DBG("removing timer %d\n", timer_id);
1262   AmAppTimer::instance()->removeTimer(getLocalTag(), timer_id);
1263 
1264   return true;
1265 }
1266 
removeTimers()1267 bool AmSession::removeTimers() {
1268 
1269   DBG("removing timers\n");
1270   AmAppTimer::instance()->removeTimers(getLocalTag());
1271 
1272   return true;
1273 }
1274 
1275 
1276 #ifdef WITH_ZRTP
1277 
onZRTPProtocolEvent(zrtp_protocol_event_t event,zrtp_stream_t * stream_ctx)1278 void AmSession::onZRTPProtocolEvent(zrtp_protocol_event_t event, zrtp_stream_t *stream_ctx) {
1279   DBG("AmSession::onZRTPProtocolEvent: %s\n", zrtp_protocol_event_desc(event));
1280 
1281   if (event==ZRTP_EVENT_IS_SECURE) {
1282       INFO("ZRTP_EVENT_IS_SECURE \n");
1283       //         info->is_verified  = ctx->_session_ctx->secrets.verifieds & ZRTP_BIT_RS0;
1284 
1285       // zrtp_session_t *session = stream_ctx->_session_ctx;
1286 
1287       // if (ZRTP_SAS_BASE32 == session->sas_values.rendering) {
1288       // 	DBG("Got SAS value <<<%.4s>>>\n", session->sas_values.str1.buffer);
1289       // } else {
1290       // 	DBG("Got SAS values SAS1 '%s' and SAS2 '%s'\n",
1291       // 	    session->sas_values.str1.buffer,
1292       // 	    session->sas_values.str2.buffer);
1293       // }
1294   }
1295 
1296     // case ZRTP_EVENT_IS_PENDINGCLEAR:
1297     //   INFO("ZRTP_EVENT_IS_PENDINGCLEAR\n");
1298     //   INFO("other side requested goClear. Going clear.\n\n");
1299     //   //      zrtp_clear_stream(zrtp_audio);
1300     //   break;
1301 
1302 }
1303 
onZRTPSecurityEvent(zrtp_security_event_t event,zrtp_stream_t * stream_ctx)1304 void AmSession::onZRTPSecurityEvent(zrtp_security_event_t event, zrtp_stream_t *stream_ctx) {
1305   DBG("AmSession::onZRTPSecurityEvent: %s\n", zrtp_security_event_desc(event));
1306 }
1307 
1308 #endif
1309 
readStreams(unsigned long long ts,unsigned char * buffer)1310 int AmSession::readStreams(unsigned long long ts, unsigned char *buffer)
1311 {
1312   int res = 0;
1313   lockAudio();
1314 
1315   AmRtpAudio *stream = RTPStream();
1316   unsigned int f_size = stream->getFrameSize();
1317   if (stream->checkInterval(ts)) {
1318     int got = stream->get(ts, buffer, stream->getSampleRate(), f_size);
1319     if (got < 0) res = -1;
1320     if (got > 0) {
1321       if (isDtmfDetectionEnabled())
1322         putDtmfAudio(buffer, got, ts);
1323 
1324       if (input) res = input->put(ts, buffer, stream->getSampleRate(), got);
1325     }
1326   }
1327 
1328   unlockAudio();
1329   return res;
1330 }
1331 
writeStreams(unsigned long long ts,unsigned char * buffer)1332 int AmSession::writeStreams(unsigned long long ts, unsigned char *buffer)
1333 {
1334   int res = 0;
1335   lockAudio();
1336 
1337   AmRtpAudio *stream = RTPStream();
1338   if (stream->sendIntReached()) { // FIXME: shouldn't depend on checkInterval call before!
1339     unsigned int f_size = stream->getFrameSize();
1340     int got = 0;
1341     if (output) got = output->get(ts, buffer, stream->getSampleRate(), f_size);
1342     if (got < 0) res = -1;
1343     if (got > 0) res = stream->put(ts, buffer, stream->getSampleRate(), got);
1344   }
1345 
1346   unlockAudio();
1347   return res;
1348 
1349 }
1350 
1351 /** EMACS **
1352  * Local variables:
1353  * mode: c++
1354  * c-basic-offset: 2
1355  * End:
1356  */
1357