1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * For a license to use the sems software under conditions
12  * other than those described here, or to purchase support for this
13  * software, please contact iptel.org by e-mail at the following addresses:
14  *    info@iptel.org
15  *
16  * SEMS is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24  */
25 
26 #include "AmMediaProcessor.h"
27 #include "AmSession.h"
28 #include "AmRtpStream.h"
29 
30 #include <assert.h>
31 #include <sys/time.h>
32 #include <signal.h>
33 
34 // Solaris seems to need this for nanosleep().
35 #if defined (__SVR4) && defined (__sun)
36 #include <time.h>
37 #endif
38 
39 /** \brief Request event to the MediaProcessor (remove,...) */
40 struct SchedRequest :
41   public AmEvent
42 {
43   AmMediaSession* s;
44 
SchedRequestSchedRequest45   SchedRequest(int id, AmMediaSession* s)
46     : AmEvent(id), s(s) {}
47 };
48 
49 /*         session scheduler              */
50 
51 AmMediaProcessor* AmMediaProcessor::_instance = NULL;
52 
AmMediaProcessor()53 AmMediaProcessor::AmMediaProcessor()
54   : num_threads(0), threads(NULL)
55 {
56 }
57 
~AmMediaProcessor()58 AmMediaProcessor::~AmMediaProcessor()
59 {
60   INFO("Media processor has been recycled.\n");
61 }
62 
init()63 void AmMediaProcessor::init() {
64   // start the threads
65   num_threads = AmConfig::MediaProcessorThreads;
66   assert(num_threads > 0);
67   DBG("Starting %u MediaProcessorThreads.\n", num_threads);
68   threads = new AmMediaProcessorThread*[num_threads];
69   for (unsigned int i=0;i<num_threads;i++) {
70     threads[i] = new AmMediaProcessorThread();
71     threads[i]->start();
72   }
73 }
74 
instance()75 AmMediaProcessor* AmMediaProcessor::instance()
76 {
77   if(!_instance)
78     _instance = new AmMediaProcessor();
79 
80   return _instance;
81 }
82 
addSession(AmMediaSession * s,const string & callgroup)83 void AmMediaProcessor::addSession(AmMediaSession* s,
84 				  const string& callgroup)
85 {
86   s->onMediaProcessingStarted();
87 
88   // evaluate correct scheduler
89   unsigned int sched_thread = 0;
90   group_mut.lock();
91 
92   // callgroup already in a thread?
93   std::map<std::string, unsigned int>::iterator it =
94     callgroup2thread.find(callgroup);
95   if (it != callgroup2thread.end()) {
96     // yes, use it
97     sched_thread = it->second;
98   } else {
99     // no, find the thread with lowest load
100     unsigned int lowest_load = threads[0]->getLoad();
101     for (unsigned int i=1;i<num_threads;i++) {
102       unsigned int lower = threads[i]->getLoad();
103       if (lower < lowest_load) {
104 	lowest_load = lower; sched_thread = i;
105       }
106     }
107     // create callgroup->thread mapping
108     callgroup2thread[callgroup] = sched_thread;
109   }
110 
111   // join the callgroup
112   callgroupmembers.insert(make_pair(callgroup, s));
113   session2callgroup[s]=callgroup;
114 
115   group_mut.unlock();
116 
117   // add the session to selected thread
118   threads[sched_thread]->
119     postRequest(new SchedRequest(InsertSession,s));
120 }
121 
clearSession(AmMediaSession * s)122 void AmMediaProcessor::clearSession(AmMediaSession* s) {
123   removeFromProcessor(s, ClearSession);
124 }
125 
removeSession(AmMediaSession * s)126 void AmMediaProcessor::removeSession(AmMediaSession* s) {
127   removeFromProcessor(s, RemoveSession);
128 }
129 
softRemoveSession(AmMediaSession * s)130 void AmMediaProcessor::softRemoveSession(AmMediaSession* s) {
131   removeFromProcessor(s, SoftRemoveSession);
132 }
133 
134 /* FIXME: implement Call Group ts offsets for soft changing of
135 	call groups
136 */
changeCallgroup(AmMediaSession * s,const string & new_callgroup)137 void AmMediaProcessor::changeCallgroup(AmMediaSession* s,
138 				       const string& new_callgroup) {
139   removeFromProcessor(s, SoftRemoveSession);
140   addSession(s, new_callgroup);
141 }
142 
removeFromProcessor(AmMediaSession * s,unsigned int r_type)143 void AmMediaProcessor::removeFromProcessor(AmMediaSession* s,
144 					   unsigned int r_type) {
145   DBG("AmMediaProcessor::removeSession\n");
146   group_mut.lock();
147   // get scheduler
148   string callgroup = session2callgroup[s];
149   unsigned int sched_thread = callgroup2thread[callgroup];
150   DBG("  callgroup is '%s', thread %u\n", callgroup.c_str(), sched_thread);
151   // erase callgroup membership entry
152   std::multimap<std::string, AmMediaSession*>::iterator it =
153     callgroupmembers.lower_bound(callgroup);
154   while ((it != callgroupmembers.end()) &&
155          (it != callgroupmembers.upper_bound(callgroup))) {
156     if (it->second == s) {
157       callgroupmembers.erase(it);
158       break;
159     }
160     it++;
161   }
162   // erase callgroup entry if empty
163   if (!callgroupmembers.count(callgroup)) {
164     callgroup2thread.erase(callgroup);
165     DBG("callgroup empty, erasing it.\n");
166   }
167   // erase session entry
168   session2callgroup.erase(s);
169   group_mut.unlock();
170 
171   threads[sched_thread]->postRequest(new SchedRequest(r_type,s));
172 }
173 
stop()174 void AmMediaProcessor::stop() {
175   assert(threads);
176   for (unsigned int i=0;i<num_threads;i++) {
177     if(threads[i] != NULL) {
178       threads[i]->stop();
179     }
180   }
181   bool threads_stopped = true;
182   do {
183     usleep(10000); // 10ms
184     threads_stopped = true;
185     for (unsigned int i=0;i<num_threads;i++) {
186       if((threads[i] != NULL) &&(!threads[i]->is_stopped())) {
187         threads_stopped = false;
188         break;
189       }
190     }
191   } while(!threads_stopped);
192 
193   for (unsigned int i=0;i<num_threads;i++) {
194     if(threads[i] != NULL) {
195       delete threads[i];
196       threads[i] = NULL;
197     }
198   }
199   delete []  threads;
200   threads = NULL;
201 }
202 
dispose()203 void AmMediaProcessor::dispose()
204 {
205   if(_instance != NULL) {
206     if(_instance->threads != NULL) {
207       _instance->stop();
208     }
209     delete _instance;
210     _instance = NULL;
211   }
212 }
213 
214 /* the actual media processing thread */
215 
AmMediaProcessorThread()216 AmMediaProcessorThread::AmMediaProcessorThread()
217   : events(this), stop_requested(false)
218 {
219 }
~AmMediaProcessorThread()220 AmMediaProcessorThread::~AmMediaProcessorThread()
221 {
222 }
223 
on_stop()224 void AmMediaProcessorThread::on_stop()
225 {
226   INFO("requesting media processor to stop.\n");
227   stop_requested.set(true);
228 }
229 
run()230 void AmMediaProcessorThread::run()
231 {
232   stop_requested = false;
233   struct timeval now,next_tick,diff,tick;
234 
235   // wallclock time
236   unsigned long long ts = 0;//4294417296;
237 
238   tick.tv_sec  = 0;
239   tick.tv_usec = 1000*WC_INC_MS;
240 
241   gettimeofday(&now,NULL);
242   timeradd(&tick,&now,&next_tick);
243 
244   while(!stop_requested.get()){
245 
246     gettimeofday(&now,NULL);
247 
248     if(timercmp(&now,&next_tick,<)){
249 
250       struct timespec sdiff,rem;
251       timersub(&next_tick,&now,&diff);
252 
253       sdiff.tv_sec  = diff.tv_sec;
254       sdiff.tv_nsec = diff.tv_usec * 1000;
255 
256       if(sdiff.tv_nsec > 2000000) // 2 ms
257 	nanosleep(&sdiff,&rem);
258     }
259 
260     processAudio(ts);
261     events.processEvents();
262     processDtmfEvents();
263 
264     ts = (ts + WC_INC) & WALLCLOCK_MASK;
265     timeradd(&tick,&next_tick,&next_tick);
266   }
267 }
268 
269 /**
270  * process pending DTMF events
271  */
processDtmfEvents()272 void AmMediaProcessorThread::processDtmfEvents()
273 {
274   for(set<AmMediaSession*>::iterator it = sessions.begin();
275       it != sessions.end(); it++)
276     {
277       AmMediaSession* s = (*it);
278       s->processDtmfEvents();
279     }
280 }
281 
processAudio(unsigned long long ts)282 void AmMediaProcessorThread::processAudio(unsigned long long ts)
283 {
284   // receiving
285   for(set<AmMediaSession*>::iterator it = sessions.begin();
286       it != sessions.end(); it++)
287   {
288     if ((*it)->readStreams(ts, buffer) < 0)
289       postRequest(new SchedRequest(AmMediaProcessor::ClearSession, *it));
290   }
291 
292   // sending
293   for(set<AmMediaSession*>::iterator it = sessions.begin();
294       it != sessions.end(); it++)
295   {
296     if ((*it)->writeStreams(ts, buffer) < 0)
297       postRequest(new SchedRequest(AmMediaProcessor::ClearSession, *it));
298   }
299 }
300 
process(AmEvent * e)301 void AmMediaProcessorThread::process(AmEvent* e)
302 {
303   SchedRequest* sr = dynamic_cast<SchedRequest*>(e);
304   if(!sr){
305     ERROR("AmMediaProcessorThread::process: wrong event type\n");
306     return;
307   }
308 
309   switch(sr->event_id){
310 
311   case AmMediaProcessor::InsertSession:
312     DBG("Session inserted to the scheduler\n");
313     sessions.insert(sr->s);
314     sr->s->clearRTPTimeout();
315     break;
316 
317   case AmMediaProcessor::RemoveSession:{
318     AmMediaSession* s = sr->s;
319     set<AmMediaSession*>::iterator s_it = sessions.find(s);
320     if(s_it != sessions.end()){
321       sessions.erase(s_it);
322       s->onMediaProcessingTerminated();
323       DBG("Session removed from the scheduler\n");
324     }
325   }
326     break;
327 
328   case AmMediaProcessor::ClearSession:{
329     AmMediaSession* s = sr->s;
330     set<AmMediaSession*>::iterator s_it = sessions.find(s);
331     if(s_it != sessions.end()){
332       sessions.erase(s_it);
333       s->clearAudio();
334       s->onMediaProcessingTerminated();
335       DBG("Session removed from the scheduler\n");
336     }
337   }
338     break;
339 
340 
341   case AmMediaProcessor::SoftRemoveSession:{
342     AmMediaSession* s = sr->s;
343     set<AmMediaSession*>::iterator s_it = sessions.find(s);
344     if(s_it != sessions.end()){
345       sessions.erase(s_it);
346       DBG("Session removed softly from the scheduler\n");
347     }
348   }
349     break;
350 
351   default:
352     ERROR("AmMediaProcessorThread::process: unknown event id.");
353     break;
354   }
355 }
356 
getLoad()357 unsigned int AmMediaProcessorThread::getLoad() {
358   // lock ?
359   return sessions.size();
360 }
361 
postRequest(SchedRequest * sr)362 inline void AmMediaProcessorThread::postRequest(SchedRequest* sr) {
363   events.postEvent(sr);
364 }
365