/*
 * Copyright (C) 2002-2003 Fhg Fokus
 *
 * This file is part of SEMS, a free SIP media server.
 *
 * SEMS is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version. This program is released under
 * the GPL with the additional exemption that compiling, linking,
 * and/or using OpenSSL is allowed.
 *
 * For a license to use the SEMS software under conditions
 * other than those described here, or to purchase support for this
 * software, please contact iptel.org by e-mail at the following addresses:
 *    info@iptel.org
 *
 * SEMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/** @file AmMediaProcessor.h */
#ifndef _AmMediaProcessor_h_
#define _AmMediaProcessor_h_

#include "AmEventQueue.h"
#include "amci/amci.h" // AUDIO_BUFFER_SIZE

#include <map>
#include <set>
#include <string>

using std::set;

struct SchedRequest;
39 
40 /** Interface for basic media session processing.
41  *
42  * Media processor stores set of objects implementing this interface and
43  * periodically triggers media processing on each of them.
44  *
45  * First it calls readStreams() method on all managed media sessions to read
46  * from all streams first and then calls writeStreams() to send data out in all
47  * these sessions.
48  *
49  * Once audio processing of all media sessions is done, media processor walks
50  * through them once more and calls processDtmfEvents() on them to handle DTMF
51  * events detected when reading data from media streams.
52  */
53 
54 class AmMediaSession
55 {
56   private:
57     AmCondition<bool> processing_media;
58 
59   public:
AmMediaSession()60     AmMediaSession(): processing_media(false) { }
~AmMediaSession()61     virtual ~AmMediaSession() { }
62 
63     /** Read from all media streams.
64      *
65      * To preserve current media processing scheme it is needed to read from all
66      * streams first and then write to them. This can be important for example
67      * in case of conferences where we need to have media from all streams ready
68      * for mixing them.
69      *
70      * So the AmMediaProcessorThread first calls readStreams on all sessions and
71      * then writeStreams on all sessions.
72      *
73      * \param ts timestamp for which the processing is currently running
74      *
75      * \param buffer multi-purpose space given from outside (AmMediaProcessorThread)
76      *
77      * Buffer given as parametr is usable for anything, originally was intended for data
78      * read from one stream before putting to another stream.
79      *
80      * The reason for having this buffer as parameter is that buffer size for
81      * audio processing is quite large (2K here) and thus allocating it on stack
82      * on some architectures may be problematic.
83      *
84      * On other hand having the buffer dynamically allocated for each media
85      * session would significantly increase memory consumption per call.
86      *
87      * So for now it seems to be the simplest way just to give the buffer as
88      * parameter from AmMediaProcessorThread and reuse it in all sessions handled
89      * by this thread (processing is done sequentially one session after another). */
90     virtual int readStreams(unsigned long long ts, unsigned char *buffer) = 0;
91 
92     /** Write to all media streams.
93      *
94      * For the meaning of parameters see description of readStreams() method. */
95     virtual int writeStreams(unsigned long long ts, unsigned char *buffer) = 0;
96 
97     /** Handle events in DTMF event queue.
98      *
99      * DTMF events should be detected from RTP stream when reading data (see
100      * readStreams()) and put into an event queue for later processing to avoid
101      * blocking of audio processing for too long time.
102      *
103      * This DTMF event queue should be processed then, within this method, which
104      * is triggered by AmMediaProcessorThread once reading/writing from media
105      * streams is finished. */
106     virtual void processDtmfEvents() = 0;
107 
108     /** Reset all media processing elements.
109      *
110      * Called as part of cleanup when removing session from media processor upon
111      * an processing error (either readStreams() or writeStreams() returning
112      * error). */
113     virtual void clearAudio() = 0;
114 
115     /** Reset timeouts of all RTP streams related to this media session.
116      *
117      * Called during initialization when session starts to be processed by media
118      * processor. */
119     virtual void clearRTPTimeout() = 0;
120 
121     /** Callback function called when a session is added to media processor.
122      *
123      * Default implementation sets internal variable usable for detection if the
124      * object is in use by AmMediaProcessorThread. */
onMediaProcessingStarted()125     virtual void onMediaProcessingStarted() { processing_media.set(true); }
126 
127     /* Callback function called when a session is removed from media processor.
128      *
129      * Default implementation sets internal variable usable for detection if the
130      * object is in use by AmMediaProcessorThread. */
onMediaProcessingTerminated()131     virtual void onMediaProcessingTerminated() { processing_media.set(false); }
132 
133     /** Indicates if the object is used by media processor.
134      *
135      * Returns value of internal variable for distinguishing if the object is
136      * already added into media processor. It should be avoided to insert one
137      * session into media processor multiple times.
138      *
139      * Note that using default implementation of onMediaProcessingStarted and
140      * onMediaProcessingTerminated is required for proper function. */
isProcessingMedia()141     virtual bool isProcessingMedia() { return processing_media.get(); }
142 
143     /** Indicates if the object is used by media processor.
144      *
145      * Seems to be duplicate to isProcessingMedia(). It was kept to reduce
146      * number of changes in existing code. */
isDetached()147     virtual bool isDetached() { return !isProcessingMedia(); }
148 };
149 
150 /**
151  * \brief Media processing thread
152  *
153  * This class implements a media processing thread.
154  * It processes the media and triggers the sending of RTP
155  * of all sessions added to it.
156  */
157 class AmMediaProcessorThread :
158   public AmThread,
159   public AmEventHandler
160 {
161   AmEventQueue    events;
162   unsigned char   buffer[AUDIO_BUFFER_SIZE];
163   set<AmMediaSession*> sessions;
164 
165   void processAudio(unsigned long long ts);
166   /**
167    * Process pending DTMF events
168    */
169   void processDtmfEvents();
170 
171   // AmThread interface
172   void run();
173   void on_stop();
174   AmSharedVar<bool> stop_requested;
175 
176   // AmEventHandler interface
177   void process(AmEvent* e);
178 public:
179   AmMediaProcessorThread();
180   ~AmMediaProcessorThread();
181 
182   inline void postRequest(SchedRequest* sr);
183 
184   unsigned int getLoad();
185 };
186 
187 /**
188  * \brief Media processing thread manager
189  *
190  * This class implements the manager that assigns and removes
191  * the Sessions to the various \ref MediaProcessorThreads,
192  * according to their call group. This class contains the API
193  * for the MediaProcessor.
194  */
195 class AmMediaProcessor
196 {
197   static AmMediaProcessor* _instance;
198 
199   unsigned int num_threads;
200   AmMediaProcessorThread**  threads;
201 
202   std::map<string, unsigned int> callgroup2thread;
203   std::multimap<string, AmMediaSession*> callgroupmembers;
204   std::map<AmMediaSession*, string> session2callgroup;
205   AmMutex group_mut;
206 
207   AmMediaProcessor();
208   ~AmMediaProcessor();
209 
210   void removeFromProcessor(AmMediaSession* s, unsigned int r_type);
211 public:
212   /**
213    * InsertSession     : inserts the session to the processor
214    * RemoveSession     : remove the session from the processor
215    * SoftRemoveSession : remove the session from the processor but leave it attached
216    * ClearSession      : remove the session from processor and clear audio
217    */
218   enum { InsertSession, RemoveSession, SoftRemoveSession, ClearSession };
219 
220   static AmMediaProcessor* instance();
221 
222   void init();
223   /** Add session s to processor */
224   void addSession(AmMediaSession* s, const string& callgroup);
225   /** Remove session s from processor */
226   void removeSession(AmMediaSession* s);
227   /** Remove session s from processor and clear its audio */
228   void clearSession(AmMediaSession* s);
229   /** Remove session s from processor but don't signal that to the session */
230   void softRemoveSession(AmMediaSession* s);
231   /** Change the callgroup of a session (use with caution) */
232   void changeCallgroup(AmMediaSession* s,
233 		       const string& new_callgroup);
234 
235   void stop();
236   static void dispose();
237 };
238 
239 
#endif // _AmMediaProcessor_h_