1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * For a license to use the sems software under conditions
12  * other than those described here, or to purchase support for this
13  * software, please contact iptel.org by e-mail at the following addresses:
14  *    info@iptel.org
15  *
16  * SEMS is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24  */
25 
26 #include "AmMultiPartyMixer.h"
27 #include "AmRtpStream.h"
28 #include "log.h"
ParamAmContentType::Param29 
30 #include <assert.h>
31 #include <math.h>
32 
33 // PCM16 range: [-32767:32768]
34 #define MAX_LINEAR_SAMPLE 32737
35 
36 // the internal delay of the mixer (between put and get)
37 #define MIXER_DELAY_MS 20
38 
39 #define MAX_BUFFER_STATES 50 // 1 sec max @ 20ms
40 
41 void DEBUG_MIXER_BUFFER_STATE(const MixerBufferState& mbs, const string& context)
42 {
43   DBG("XXDebugMixerXX: dump of MixerBufferState %s", context.c_str());
44   DBG("XXDebugMixerXX: sample_rate = %u", mbs.sample_rate);
45   DBG("XXDebugMixerXX: last_ts = %u", mbs.last_ts);
46   for (MixerBufferState::ChannelMap::const_iterator it = mbs.channels.begin(); it != mbs.channels.end(); it++) {
47     DBG("XXDebugMixerXX: channel #%d present", it->first);
48   }
49   DBG("XXDebugMixerXX: end of MixerBufferState dump");
50 }
51 
52 AmMultiPartyMixer::AmMultiPartyMixer()
53   : sampleratemap(), samplerates(),
54     channelids(), buffer_state(),
55     audio_mut(), scaling_factor(16)
56 {
57 }
58 
59 AmMultiPartyMixer::~AmMultiPartyMixer()
60 {
61   for (std::deque<MixerBufferState>::iterator it = buffer_state.begin();
62        it != buffer_state.end(); it++) {
63     it->free_channels();
64   }
65 }
66 
67 unsigned int AmMultiPartyMixer::addChannel(unsigned int external_sample_rate)
68 {
69   unsigned int cur_channel_id = 0;
70 
71   audio_mut.lock();
72   ChannelIdSet::reverse_iterator rit = channelids.rbegin();
73   if (rit != channelids.rend()) {
74     cur_channel_id = *rit + 1;
75   }
76 
77   channelids.insert(cur_channel_id);
78 
79   for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
80     //DBG("XXDebugMixerXX: AmMultiPartyMixer::addChannel(): processing buffer state with sample rate %d", it->sample_rate);
81     if (it->sample_rate >= external_sample_rate) {
82       it->add_channel(cur_channel_id);
83       break;
84     }
85   }
86 
87   //DBG("XXDebugMixerXX: added channel: #%i\n",cur_channel_id);
88 
89   sampleratemap.insert(std::make_pair(cur_channel_id,external_sample_rate));
90   samplerates.insert(external_sample_rate);
91 
92   audio_mut.unlock();
93   return cur_channel_id;
94 }
95 
96 void AmMultiPartyMixer::removeChannel(unsigned int channel_id)
97 {
98   audio_mut.lock();
99   for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
100     it->remove_channel(channel_id);
101   }
102 
103   channelids.erase(channel_id);
104 
105   SampleRateMap::iterator sit = sampleratemap.find(channel_id);
106   if (sit != sampleratemap.end()) {
107 	SampleRateSet::iterator it = samplerates.find(sit->second);
108 	samplerates.erase(it);
109 	sampleratemap.erase(channel_id);
110   }
111   //DBG("XXDebugMixerXX: removed channel: #%i\n",channel_id);
112   audio_mut.unlock();
113 }
114 
115 void AmMultiPartyMixer::PutChannelPacket(unsigned int   channel_id,
116 					 unsigned long long system_ts,
117 					 unsigned char* buffer,
118 					 unsigned int   size)
119 {
120   if(!size)
121     return;
122   assert(size <= AUDIO_BUFFER_SIZE);
123 
124   std::deque<MixerBufferState>::iterator bstate = findOrCreateBufferState(GetCurrentSampleRate());
125 
126   SampleArrayShort* channel = 0;
127   if((channel = bstate->get_channel(channel_id)) != 0) {
128 
129     unsigned samples = PCM16_B2S(size);
130     unsigned long long put_ts = system_ts + (MIXER_DELAY_MS * WALLCLOCK_RATE / 1000);
131     unsigned long long user_put_ts = put_ts * (GetCurrentSampleRate()/100) / (WALLCLOCK_RATE/100);
132 
getCTStr()133     channel->put(user_put_ts,(short*)buffer,samples);
134     bstate->mixed_channel->get(user_put_ts,tmp_buffer,samples);
135 
getCTHdr()136     mix_add(tmp_buffer,tmp_buffer,(short*)buffer,samples);
137     bstate->mixed_channel->put(user_put_ts,tmp_buffer,samples);
138     bstate->last_ts = put_ts + (samples * (WALLCLOCK_RATE/100) / (GetCurrentSampleRate()/100));
getParts()139   } else {
140     /*
141     ERROR("XXDebugMixerXX: MultiPartyMixer::PutChannelPacket: "
142 	  "channel #%i doesn't exist\n",channel_id);
143     DBG("XXDebugMixer:: PutChannelPacket failed ts=%u", ts);
144     for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
145       DEBUG_MIXER_BUFFER_STATE(*it, "on PutChannelPacket failure");
146       }*/
147   }
getPayload()148 }
149 
150 void AmMultiPartyMixer::GetChannelPacket(unsigned int   channel_id,
151 					 unsigned long long system_ts,
152 					 unsigned char* buffer,
153 					 unsigned int&  size,
154 					 unsigned int&  output_sample_rate)
155 {
156   if (!size)
157     return;
158   assert(size <= AUDIO_BUFFER_SIZE);
159 
160   unsigned int last_ts = system_ts + (PCM16_B2S(size) * (WALLCLOCK_RATE/100) / (GetCurrentSampleRate()/100));
161   std::deque<MixerBufferState>::iterator bstate = findBufferStateForReading(GetCurrentSampleRate(), last_ts);
162 
163   SampleArrayShort* channel = 0;
164   if(bstate != buffer_state.end() && (channel = bstate->get_channel(channel_id)) != 0) {
165 
166     unsigned int samples = PCM16_B2S(size) * (bstate->sample_rate/100) / (GetCurrentSampleRate()/100);
167     assert(samples <= PCM16_B2S(AUDIO_BUFFER_SIZE));
168 
169     unsigned long long cur_ts = system_ts * (bstate->sample_rate/100) / (WALLCLOCK_RATE/100);
170     bstate->mixed_channel->get(cur_ts,tmp_buffer,samples);
171     channel->get(cur_ts,(short*)buffer,samples);
172 
173     mix_sub(tmp_buffer,tmp_buffer,(short*)buffer,samples);
174     scale((short*)buffer,tmp_buffer,samples);
175     size = PCM16_S2B(samples);
176     output_sample_rate = bstate->sample_rate;
177   } else if (bstate != buffer_state.end()) {
178     memset(buffer,0,size);
179     output_sample_rate = GetCurrentSampleRate();
180     //DBG("XXDebugMixerXX: GetChannelPacket returned zeroes, ts=%u, last_ts=%u, output_sample_rate=%u", ts, last_ts, output_sample_rate);
181   } else {
182     /*
183     ERROR("XXDebugMixerXX: MultiPartyMixer::GetChannelPacket: "
184 	  "channel #%i doesn't exist\n",channel_id);
185     DBG("XXDebugMixerXX: GetChannelPacket failed, ts=%u", ts);
186     for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
187       DEBUG_MIXER_BUFFER_STATE(*it, "on GetChannelPacket failure");
188       }*/
189   }
190 
191   cleanupBufferStates(last_ts);
192 }
193 
194 int AmMultiPartyMixer::GetCurrentSampleRate()
195 {
196   SampleRateSet::reverse_iterator sit = samplerates.rbegin();
197   if (sit != samplerates.rend()) {
198 	return *sit;
199   } else {
200 	return -1;
201   }
202 }
203 
204 // int   dest[size/2]
205 // int   src1[size/2]
206 // short src2[size/2]
207 //
208 void AmMultiPartyMixer::mix_add(int* dest,int* src1,short* src2,unsigned int size)
209 {
210   int* end_dest = dest + size;
211 
212   while(dest != end_dest)
213     *(dest++) = *(src1++) + int(*(src2++));
214 }
215 
216 void AmMultiPartyMixer::mix_sub(int* dest,int* src1,short* src2,unsigned int size)
217 {
218   int* end_dest = dest + size;
219 
220   while(dest != end_dest)
221     *(dest++) = *(src1++) - int(*(src2++));
222 }
223 
224 void AmMultiPartyMixer::scale(short* buffer,int* tmp_buf,unsigned int size)
225 {
226   short* end_dest = buffer + size;
227 
228   if(scaling_factor<64)
229     scaling_factor++;
230 
231   while(buffer != end_dest){
232 
233     int s = (*tmp_buf * scaling_factor) >> 6;
234     if(abs(s) > MAX_LINEAR_SAMPLE){
235       scaling_factor = abs( (MAX_LINEAR_SAMPLE<<6) / (*tmp_buf) );
236       if(s < 0)
237 	s = -MAX_LINEAR_SAMPLE;
238       else
239 	s = MAX_LINEAR_SAMPLE;
240     }
241     *(buffer++) = short(s);
242     tmp_buf++;
243   }
244 }
245 
246 std::deque<MixerBufferState>::iterator AmMultiPartyMixer::findOrCreateBufferState(unsigned int sample_rate)
247 {
248   for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
249     if (it->sample_rate == sample_rate) {
250       it->fix_channels(channelids);
251       //DEBUG_MIXER_BUFFER_STATE(*it, "returned to PutChannelPacket");
252       return it;
253     }
254   }
255 
256   //DBG("XXDebugMixerXX: Creating buffer state (from PutChannelPacket)");
257   buffer_state.push_back(MixerBufferState(sample_rate, channelids));
258   std::deque<MixerBufferState>::reverse_iterator rit = buffer_state.rbegin();
259   //DEBUG_MIXER_BUFFER_STATE(*((rit + 1).base()), "returned to PutChannelPacket");
260   return (rit + 1).base();
261 }
262 
263 std::deque<MixerBufferState>::iterator
264 AmMultiPartyMixer::findBufferStateForReading(unsigned int sample_rate,
265 					     unsigned long long last_ts)
266 {
267   for (std::deque<MixerBufferState>::iterator it = buffer_state.begin();
268        it != buffer_state.end(); it++) {
269 
270     if (sys_ts_less()(last_ts,it->last_ts)
271 	|| (last_ts == it->last_ts)) {
272       it->fix_channels(channelids);
273       //DEBUG_MIXER_BUFFER_STATE(*it, "returned to PutChannelPacket");
274       return it;
275     }
276   }
277 
278   if (buffer_state.size() < MAX_BUFFER_STATES) {
279     // DBG("XXDebugMixerXX: Creating buffer state (from GetChannelPacket)\n");
280     buffer_state.push_back(MixerBufferState(sample_rate, channelids));
281   } // else just reuse the last buffer - conference without a speaker
282   std::deque<MixerBufferState>::reverse_iterator rit = buffer_state.rbegin();
283   //DEBUG_MIXER_BUFFER_STATE(*((rit + 1).base()), "returned to PutChannelPacket");
284   return (rit + 1).base();
285 }
286 
287 void AmMultiPartyMixer::cleanupBufferStates(unsigned int last_ts)
288 {
289   while (!buffer_state.empty()
290 	 && (buffer_state.front().last_ts != 0 && buffer_state.front().last_ts < last_ts)
291 	 && (unsigned int)GetCurrentSampleRate() != buffer_state.front().sample_rate) {
292 
293     //DEBUG_MIXER_BUFFER_STATE(buffer_state.front(), "freed in cleanupBufferStates");
294     buffer_state.front().free_channels();
295     buffer_state.pop_front();
296   }
297 }
298 
299 void AmMultiPartyMixer::lock()
300 {
301   audio_mut.lock();
302 }
303 
304 void AmMultiPartyMixer::unlock()
305 {
306   audio_mut.unlock();
307 }
308 
309 MixerBufferState::MixerBufferState(unsigned int sample_rate, std::set<int>& channelids)
310   : sample_rate(sample_rate), last_ts(0), channels(), mixed_channel(NULL)
311 {
312   for (std::set<int>::iterator it = channelids.begin(); it != channelids.end(); it++) {
313     channels.insert(std::make_pair(*it,new SampleArrayShort()));
314   }
315 
316   mixed_channel = new SampleArrayInt();
317 }
318 
319 MixerBufferState::MixerBufferState(const MixerBufferState& other)
320   : sample_rate(other.sample_rate), last_ts(other.last_ts),
321     channels(other.channels), mixed_channel(other.mixed_channel)
322 {
323 }
324 
325 MixerBufferState::~MixerBufferState()
326 {
327 }
328 
329 void MixerBufferState::add_channel(unsigned int channel_id)
330 {
331   if (channels.find(channel_id) == channels.end())
332     channels.insert(std::make_pair(channel_id,new SampleArrayShort()));
333 }
334 
335 void MixerBufferState::remove_channel(unsigned int channel_id)
336 {
337   ChannelMap::iterator channel_it = channels.find(channel_id);
338   if (channel_it != channels.end()) {
339     delete channel_it->second;
340     channels.erase(channel_it);
341   }
342 }
343 
344 SampleArrayShort* MixerBufferState::get_channel(unsigned int channel_id)
345 {
346   ChannelMap::iterator channel_it = channels.find(channel_id);
347   if(channel_it == channels.end()){
348     ERROR("XXMixerDebugXX: channel #%i does not exist\n",channel_id);
349     return NULL;
350   }
351 
352   return channel_it->second;
353 }
354 
355 void MixerBufferState::fix_channels(std::set<int>& curchannelids)
356 {
357   for (std::set<int>::iterator it = curchannelids.begin(); it != curchannelids.end(); it++) {
358     if (channels.find(*it) == channels.end()) {
359       DBG("XXMixerDebugXX: fixing channel #%d", *it);
360       channels.insert(std::make_pair(*it,new SampleArrayShort()));
361     }
362   }
363 }
364 
365 void MixerBufferState::free_channels()
366 {
367   for (ChannelMap::iterator it = channels.begin(); it != channels.end(); it++) {
368     if (it->second != NULL)
369       delete it->second;
370   }
371 
372   delete mixed_channel;
373 }
374