1 #ifndef __B2BMEDIA_H
2 #define __B2BMEDIA_H
3 
4 #include "AmAudio.h"
5 #include "AmRtpStream.h"
6 #include "AmRtpAudio.h"
7 #include "AmMediaProcessor.h"
8 #include "AmDtmfDetector.h"
9 
10 #include <map>
11 
12 class AmB2BSession;
13 
14 class B2BMediaStatistics
15 {
16   private:
17     std::map<string, int> codec_write_usage;
18     std::map<string, int> codec_read_usage;
19     AmMutex mutex;
20 
21   public:
22     void reportCodecWriteUsage(string &dst);
23     void reportCodecReadUsage(string &dst);
24     void getReport(const AmArg &args, AmArg &ret);
25 
26     static B2BMediaStatistics *instance();
27     void incCodecWriteUsage(const string &codec_name);
28     void decCodecWriteUsage(const string &codec_name);
29     void incCodecReadUsage(const string &codec_name);
30     void decCodecReadUsage(const string &codec_name);
31 };
32 
33 /** \brief Class for computing mask of payloads to relay
34  *
35  * */
36 class RelayController {
37   public:
38     virtual void computeRelayMask(const SdpMedia &m, bool &enable, PayloadMask &mask) = 0;
~RelayController()39     virtual ~RelayController() { }
40 };
41 
/** \brief Storage for several data items required to be held with one RTP
 * stream for B2B media processing.
 *
 * Experience has shown that more items need to be remembered with one RTP
 * stream, so this class is an attempt to encapsulate them together and make
 * the code easier to manage (fewer duplicates - the same things were often
 * done for A leg stuff and then again for B leg stuff). */
49 
50 class AudioStreamData {
51   private:
52     /** The RTP stream itself.
53      *
54      * Audio only for now. */
55     AmRtpAudio *stream;
56 
57     /** Non-stream input (required for music on hold for example). */
58     AmAudio *in;
59 
60     /** Flag set when streams in A/B leg are correctly initialized (for
61      * transcoding purposes). */
62     bool initialized;
63 
64     /** remembered value of the option from AmB2BSession */
65     bool force_symmetric_rtp;
66 
67     /** Enables inband dtmf detection */
68     bool enable_dtmf_transcoding;
69 
70     /** Enables RTP DTMF (2833/4733) filtering */
71     bool enable_dtmf_rtp_filtering;
72 
73     /** Enables DTMF detection with RTP DTMF (2833/4733) */
74     bool enable_dtmf_rtp_detection;
75 
76     /** Low fidelity payloads for which inband DTMF transcoding should be used */
77     vector<SdpPayload> lowfi_payloads;
78 
79     /** DTMF detector used by dtmf_queue */
80     AmDtmfDetector *dtmf_detector;
81 
82     /** Queue for handling raw DTMF events.
83      *
84      * It is rather quick hack to make B2B media working with current code.
85      * Each stream can use different sampling rate and thus DTMF detection need
86      * to be done independently for each stream. */
87     AmDtmfEventQueue *dtmf_queue;
88 
89     PayloadMask relay_mask;
90     bool relay_enabled;
91     std::string relay_address;
92     int relay_port;
93 
94     /** RTP relay (temporarily) paused?
95      * relay stream may still be set up and updated */
96     bool relay_paused;
97 
98     bool muted;
99 
100     bool receiving;
101 
102     // for performance monitoring
103     int outgoing_payload;
104     int incoming_payload;
105     string outgoing_payload_name;
106     string incoming_payload_name;
107     void updateSendStats();
108     void updateRecvStats(AmRtpStream *s);
109     void resetStats();
110 
111     /** create the stream and take settings from the session */
112     void initialize(AmB2BSession *session);
113 
114     void clearDtmfSink();
115 
116   public:
117     /** Creates data based on associated signaling leg data. */
118     AudioStreamData(AmB2BSession *session);
119 
120     /** we want to preserve existing streams (relay streams already set, ports
121      * already used in outgoing SDP */
122     void changeSession(AmB2BSession *session);
123 
124     /** release old and store new DTMF sink */
125     void setDtmfSink(AmDtmfSink *dtmf_sink);
126 
127     /** Frees all allocated data.
128      *
129      * Stream and its peer (relay stream) must be removed from processing before
130      * calling this method! This method doesn't call stopStreamProcessing()
131      * itself because of the stream here is used as relay stream in the other
132      * leg. Before freeing current stream the other one has to be removed from
133      * processing as well.
134      *
135      * Please note that the "in" member is freed - this need not to be the right
136      * thing but this will show once it will be really used. */
137     void clear();
138 
139     /** Removes stream from processing by AmRtpReceiver. */
140     void stopStreamProcessing();
141 
142     /** Returns stream from processing by AmRtpReceiver if it was already there. */
143     void resumeStreamProcessing();
144 
145     /** Set relay stream and payload IDs to be relayed.
146      *
147      * Removes the stream from AmRtpReceiver before updating and returns it back
148      * once done. */
149     void setRelayStream(AmRtpStream *other);
150 
151     /** computes and stores payloads that can be relayed based on the
152      * corresponding 'peer session' remote media line (i.e. what accepts the
153      * other remote end directly) */
154     void setRelayPayloads(const SdpMedia &m, RelayController *ctrl);
155 
156     void setRelayDestination(const string& connection_address, int port);
157 
158     /** set relay temporarily to paused (stream relation may still be up) */
159     void setRelayPaused(bool paused);
160 
161     /** initialize given stream for transcoding & regular audio processing
162      *
163      * Returns false if the initialization failed (might happen for example if
164      * we are not able to handle the remote payloads by ourselves; anyway
165      * relaying could be still available in this case). */
166     bool initStream(PlayoutType playout_type, AmSdp &local_sdp, AmSdp &remote_sdp, int media_idx);
167 
168     /** Processes raw DTMF events in own queue. */
processDtmfEvents()169     void processDtmfEvents() { if (dtmf_queue) dtmf_queue->processEvents(); }
170 
171     /** Sends DTMF */
172     void sendDtmf(int event, unsigned int duration_ms);
173 
174     /** Writes data to won stream. Data are read either from local alternative
175      * input (in) or from stream given by src parameter.
176      *
177      * Buffer is just space used to read data before writing them,
178      * AmMediaProcessor buffer should be propagated here (see AmMediaSession) */
179     int writeStream(unsigned long long ts, unsigned char *buffer, AudioStreamData &src);
180 
181     // --- helper methods propagating our private member to outside world ---
182 
clearRTPTimeout()183     void clearRTPTimeout() {
184       if (stream) stream->clearRTPTimeout();
185     }
186 
getLocalPort()187     int getLocalPort() {
188       if (stream) return stream->getLocalPort();
189       else return 0;
190     }
191 
getLocalRtcpPort()192     int getLocalRtcpPort() {
193       if (stream) return stream->getLocalRtcpPort();
194       else return 0;
195     }
196 
setLocalIP(const string & ip)197     void setLocalIP(const string& ip) {
198       // set the address only if it is not used already
199       if (stream && !stream->hasLocalSocket()) stream->setLocalIP(ip);
200     }
201 
getStream()202     AmRtpAudio *getStream() {
203       return stream;
204     }
205 
isInitialized()206     bool isInitialized() { return initialized; }
getSdpOffer(int media_idx,SdpMedia & m)207     void getSdpOffer(int media_idx, SdpMedia &m) { if (stream) stream->getSdpOffer(media_idx, m); }
getSdpAnswer(int media_idx,const SdpMedia & offer,SdpMedia & answer)208     void getSdpAnswer(int media_idx, const SdpMedia &offer, SdpMedia &answer) { if (stream) stream->getSdpAnswer(media_idx, offer, answer); }
209     void mute(bool set_mute);
210     void setReceiving(bool r);
setInput(AmAudio * _in)211     void setInput(AmAudio *_in) { in = _in; }
getInput()212     AmAudio *getInput() { return in; }
213 
setLogger(msg_logger * logger)214     void setLogger(msg_logger *logger) { if (stream) stream->setLogger(logger); }
215 
216     void debug();
217 };
218 
219 /** \brief Class for control over media relaying and transcoding in a B2B session.
220  *
221  * This class manages RTP streams of both call legs, configures AmRtpStream
222  * relaying functionality and in case media needs to be transcoded its
223  * AmMediaSession interface implementation reads data from RTP streams in one
224  * leg and writes them to appropriate RTP streams of the other leg.
225  *
226  * From the signaling part of the session (AmB2BSession instance for caller and
227  * for callee) it needs to be informed about local and remote SDP in each leg
228  * via updateLocalSdp() and updateRemoteSdp() methods.
229  *
 * Signaling parts of the session (caller and callee) need to update outgoing
 * SDP bodies with the local address and ports of RTP streams using the
 * replaceConnectionAddress() method.
233  *
 * Because generating B2B SDP is no longer based on AmSession's offer/answer
 * mechanism, but we relay the remote's SDP with just slight changes (some
 * payloads filtered out, some payloads added before forwarding), we don't need
 * to remember the payload ID mapping (local to remote) any more. Payload IDs
 * should be generated correctly by the remote party and we don't need to
 * change them when relaying RTP packets.
240  *
241  * TODO:
242  *  - handle offer/answer correctly (refused new offer means old offer/answer is
243  *    still valid)
244  *  - handle "on hold" streams - probably should be controlled by signaling
245  *    (AmB2BSession) - either we should not send audio or we should send hold
246  *    music
247  *
 *    Currently problematic: setting AmRtpStream::active to false in
 *    AmRtpStream::init doesn't always help - if some RTP packets arrive later
 *    than the media session is updated, the stream remains 'active' (verified
 *    with SPA 942 and twinkle)
252  *
253  *  - non-audio streams - list of AmRtpStream pairs which can be just relayed
254  *
255  *  - reference counting using atomic variables instead of locking
256  *
257  *  - RTCP
258  *
259  *  - correct sampling periods when relaying/transcoding according to values
260  *    advertised in local SDP (i.e. the relayed one)
261  *
 *  - Is non-transparent SSRC & seq. no needed if some payloads can be transcoded and
 *    some relayed? Couldn't it be confusing to have transparent ones for relayed but our
 *    own SSRC & seq. no for transcoded payloads? [wireshark seems to be
 *    confused] => disable transparent SSRC/seq.no if there are payloads for transcoding?
266  *
 *    Note that forcing our own SSRC can break things if the incoming RTP stream
 *    comes from a source mixing audio from different sources - in that case we should
 *    prefer to propagate SSRC (i.e. use transparent SSRC)!
270  *
271  *  - we should use our seq. numbers if transcoding is possible but propagate
272  *    lost packets (i.e. remember the difference between received seq. numbers and
273  *    sent ones and for the transcoding purpose use seq. number = max. already
274  *    used number + 1)
275  *
 *  - configurable playout buffer type (from a test with transcoding PCMA -> PCMU
 *    between SPA 942 and 941 it seems that at a simulated 20% packet loss the
 *    audio quality is better with ADAPTIVE_PLAYOUT than with SIMPLE_PLAYOUT,
 *    but the difference doesn't seem to be really big)
280  *
281  *  - In-band DTMF detection within relayed payloads not supported yet. Do we
282  *    need it?
283  */
284 
285 class AmB2BMedia: public AmMediaSession
286 {
287   private:
288     /* remembered both legs of the B2B call
289      * currently required for DTMF processing and used for reading RTP relay
290      * parameters (rtp_relay_transparent_seqno, rtp_relay_transparent_ssrc,
291      * rtp_interface) */
292     AmB2BSession *a, *b;
293 
294     /** Pair of audio streams with the possibility to use given audio as input
295      * instead of the other stream. */
296     struct AudioStreamPair {
297       AudioStreamData a, b;
298       int media_idx;
AudioStreamPairAudioStreamPair299       AudioStreamPair(AmB2BSession *_a, AmB2BSession *_b, int _media_idx): a(_a), b(_b), media_idx(_media_idx) { }
setLoggerAudioStreamPair300       void setLogger(msg_logger *logger) { a.setLogger(logger); b.setLogger(logger); }
requiresProcessingAudioStreamPair301       bool requiresProcessing() { return a.getInput() || b.getInput(); }
302     };
303 
304     struct RelayStreamPair {
305       AmRtpStream a, b;
306       RelayStreamPair(AmB2BSession *_a, AmB2BSession *_b);
setLoggerRelayStreamPair307       void setLogger(msg_logger *logger) { a.setLogger(logger); b.setLogger(logger); }
308     };
309 
310     typedef std::vector<AudioStreamPair>::iterator AudioStreamIterator;
311     typedef std::vector<RelayStreamPair*>::iterator RelayStreamIterator;
312     typedef std::vector<SdpMedia>::iterator SdpMediaIterator;
313 
314     /** Callgroup reqired by AmMediaProcessor to distinguish
315      * AmMediaProcessorThread which should take care about media session.
316      *
317      * It might be handy to use own generated callgroup independent on caller's
318      * and callee's one. (FIXME: not sure if it is worth consumed additional
319      * resources). */
320     string callgroup;
321 
322     // needed for updating relayed payloads
323     AmSdp a_leg_local_sdp, a_leg_remote_sdp;
324     AmSdp b_leg_local_sdp, b_leg_remote_sdp;
325     bool have_a_leg_local_sdp, have_a_leg_remote_sdp;
326     bool have_b_leg_local_sdp, have_b_leg_remote_sdp;
327 
328     AmMutex mutex;
329     int ref_cnt;
330 
331     /** Playout type describes what kind of buffering will be used for audio
332      * streams. Please note that ADAPTIVE_PLAYOUT requires some kind of
333      * detection if there is really data to read from the buffer because the get
334      * function always return something regardless if something was written into
335      * or not.
336      */
337     PlayoutType playout_type;
338 
339     /** audio relay/processing streams */
340     std::vector<AudioStreamPair>  audio;
341     /** raw relay streams */
342     std::vector<RelayStreamPair*> relay_streams;
343 
344     bool a_leg_muted, b_leg_muted;
345     bool a_leg_receiving, b_leg_receiving;
346 
347     bool relay_paused;
348 
349     void createStreams(const AmSdp &sdp);
350     void updateStreamPair(AudioStreamPair &pair);
351     void updateAudioStreams();
352     void updateRelayStream(AmRtpStream *stream, AmB2BSession *session,
353 			   const string& connection_address,
354 			   const SdpMedia &m, AmRtpStream *relay_to);
355 
356     void setMuteFlag(bool a_leg, bool set);
357     void changeSessionUnsafe(bool a_leg, AmB2BSession *new_session);
358 
359     msg_logger* logger; // log RTP traffic
360 
361     virtual ~AmB2BMedia();
362 
363   public:
364     AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b);
365 
366     /**
367      * To add a AmB2BMedia session to the media processor, *this method
368      * MUST be used* as it increases the refcnt.
369      */
370     void addToMediaProcessor();
371     /**
372      * unsafe version (no locking of mutex)
373      *
374      * To add a AmB2BMedia session to the media processor, *this method
375      * MUST be used* as it increases the refcnt.
376      */
377     void addToMediaProcessorUnsafe();
378 
379     void changeSession(bool a_leg, AmB2BSession *new_session);
380 
381     //void updateRelayPayloads(bool a_leg, const AmSdp &local_sdp, const AmSdp &remote_sdp);
382 
383     /**
384      * Adds a reference.
385      *
386      * Both AmB2BSessions and AmMediaProcessor uses refcnt to this class; B2BSession
387      * in case of RTP relay, AmMediaProcessor in case of local media processing.
388      *
389      * Instance of this object is created with reference counter set to zero.
390      * Thus if somebody wants to hold a reference it must call addReference()
391      * explicitly after construction!
392      */
393     void addReference();
394 
395     /** Releases reference.
396      *
397      * Returns true if this was the last reference, in that case the pointer
398      * to that object is now *invalid*
399      * Must be last operation in member method!
400      */
401     bool releaseReference();
402 
403     // ----------------- SDP manipulation & updates -------------------
404 
405     static bool canRelay(const SdpMedia &m);
406 
407     /** Replace connection address and ports within SDP.
408      *
409      * Throws an exception (string) in case of error. (FIXME?) */
410     void replaceConnectionAddress(AmSdp &parser_sdp, bool a_leg,
411 				  const string& relay_address,
412 				  const string& relay_public_address);
413 
414     /** replace offer inside given SDP with locally generated one (media streams
415      * etc must be initialised like in case replaceConnectionAddress) */
416     bool replaceOffer(AmSdp &sdp, bool a_leg);
417 
418     /** Update media session with local & remote SDP. */
419     void updateStreams(bool a_leg, const AmSdp &local_sdp, const AmSdp &remote_sdp, RelayController *ctrl);
420 
421     /** Clear audio for given leg and stop processing if both legs stopped.
422      *
423      * Releases all RTP streams and removes itself from media processor if still
424      * there. */
425     void stop(bool a_leg);
426 
427     // ---- AmMediaSession interface for processing audio in a standard way ----
428 
429     /** Should read from all streams before writing to the other streams.
430      *
431      * Because processing is driven by destination stream (i.e. we don't read
432      * anything unless the destination stream is ready to send something - see
433      * sendIntReached()) all processing is done in writeStreams */
readStreams(unsigned long long ts,unsigned char * buffer)434     virtual int readStreams(unsigned long long ts, unsigned char *buffer) { return 0; }
435 
436     /** Read and write all RTP streams if data are to be written (see
437      * readStreams()). */
438     virtual int writeStreams(unsigned long long ts, unsigned char *buffer);
439 
440     /** Calls processDtmfEvent on both AmB2BSessions for which this AmB2BMedia
441      * instance manages media. */
442     virtual void processDtmfEvents();
443 
444     /** Sends DTMF using the given call leg */
445     void sendDtmf(bool a_leg, int event, unsigned int duration_ms);
446 
447     /** Release all RTP streams of both legs and both AmB2BSessions as well.
448      *
449      * Though readStreams(), writeStreams() or processDtmfEvents() can be called
450      * after call to clearAudio, they will do nothing because all relevant
451      * information will be rlready eleased. */
clearAudio()452     virtual void clearAudio() { clearAudio(true); clearAudio(false); }
453 
454     /** release RTP streams for one leg */
455     void clearAudio(bool a_leg);
456 
457     /** Clear RTP timeout of all streams in both call legs. */
458     virtual void clearRTPTimeout();
459 
460     /** Callback function called once media processor releases this instance
461      * from processing loop.
462      *
463      * Deletes itself if there are no other references! FIXME: might be
464      * returning something like "release me" and calling delete from media
465      * processor would be better? */
466     virtual void onMediaProcessingTerminated();
467 
mute(bool a_leg)468     void mute(bool a_leg) { setMuteFlag(a_leg, true); }
unmute(bool a_leg)469     void unmute(bool a_leg) { setMuteFlag(a_leg, false); }
isMuted(bool a_leg)470     bool isMuted(bool a_leg) { if (a_leg) return a_leg_muted; else return b_leg_muted; }
471 
472     void setFirstStreamInput(bool a_leg, AmAudio *in);
473     void createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer, bool use_zero_con);
474 
475     void setRtpLogger(msg_logger* _logger);
476 
477     /** enable or disable DTMF receiving on relay streams */
478     void setRelayDTMFReceiving(bool enabled);
479 
480     /** pause relaying on streams */
481     void pauseRelay();
482 
483     /** restart relaying on streams */
484     void restartRelay();
485 
486     /** set 'receving' property of RTP/relay streams (not receiving=drop incoming packets) */
487     void setReceiving(bool receiving_a, bool receiving_b);
488 
489     // print debug info
490     void debug();
491 };
492 
493 #endif
494