1 /*
2 * Copyright (C) 2002-2003 Fhg Fokus
3 *
4 * This file is part of SEMS, a free SIP media server.
5 *
6 * SEMS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * For a license to use the sems software under conditions
12 * other than those described here, or to purchase support for this
13 * software, please contact iptel.org by e-mail at the following addresses:
14 * info@iptel.org
15 *
16 * SEMS is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public License
22 * along with this program; if not, write to the Free Software
23 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 */
25
26 #include "AmMultiPartyMixer.h"
27 #include "AmRtpStream.h"
28 #include "log.h"
30 #include <assert.h>
31 #include <math.h>
32
// PCM16 sample range: [-32768:32767]
// Clipping threshold used by AmMultiPartyMixer::scale().
// NOTE(review): 32737 looks like a typo for 32767 -- confirm before changing,
// because it alters the level at which the mixed signal is clipped.
#define MAX_LINEAR_SAMPLE 32737

// the internal delay of the mixer (between put and get), in milliseconds
#define MIXER_DELAY_MS 20

// cap on the number of buffer states created on the read path
// (see findBufferStateForReading)
#define MAX_BUFFER_STATES 50 // 1 sec max @ 20ms
40
41 void DEBUG_MIXER_BUFFER_STATE(const MixerBufferState& mbs, const string& context)
42 {
43 DBG("XXDebugMixerXX: dump of MixerBufferState %s", context.c_str());
44 DBG("XXDebugMixerXX: sample_rate = %u", mbs.sample_rate);
45 DBG("XXDebugMixerXX: last_ts = %u", mbs.last_ts);
46 for (MixerBufferState::ChannelMap::const_iterator it = mbs.channels.begin(); it != mbs.channels.end(); it++) {
47 DBG("XXDebugMixerXX: channel #%d present", it->first);
48 }
49 DBG("XXDebugMixerXX: end of MixerBufferState dump");
50 }
51
// Construct an empty mixer: no channels, no sample rates, no buffer states.
// scaling_factor starts at 16; scale() raises it by one per call up to 64,
// and since scale() shifts its product right by 6, 64 corresponds to a gain of 1.
AmMultiPartyMixer::AmMultiPartyMixer()
  : sampleratemap(), samplerates(),
    channelids(), buffer_state(),
    audio_mut(), scaling_factor(16)
{
}
58
59 AmMultiPartyMixer::~AmMultiPartyMixer()
60 {
61 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin();
62 it != buffer_state.end(); it++) {
63 it->free_channels();
64 }
65 }
66
67 unsigned int AmMultiPartyMixer::addChannel(unsigned int external_sample_rate)
68 {
69 unsigned int cur_channel_id = 0;
70
71 audio_mut.lock();
72 ChannelIdSet::reverse_iterator rit = channelids.rbegin();
73 if (rit != channelids.rend()) {
74 cur_channel_id = *rit + 1;
75 }
76
77 channelids.insert(cur_channel_id);
78
79 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
80 //DBG("XXDebugMixerXX: AmMultiPartyMixer::addChannel(): processing buffer state with sample rate %d", it->sample_rate);
81 if (it->sample_rate >= external_sample_rate) {
82 it->add_channel(cur_channel_id);
83 break;
84 }
85 }
86
87 //DBG("XXDebugMixerXX: added channel: #%i\n",cur_channel_id);
88
89 sampleratemap.insert(std::make_pair(cur_channel_id,external_sample_rate));
90 samplerates.insert(external_sample_rate);
91
92 audio_mut.unlock();
93 return cur_channel_id;
94 }
95
96 void AmMultiPartyMixer::removeChannel(unsigned int channel_id)
97 {
98 audio_mut.lock();
99 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
100 it->remove_channel(channel_id);
101 }
102
103 channelids.erase(channel_id);
104
105 SampleRateMap::iterator sit = sampleratemap.find(channel_id);
106 if (sit != sampleratemap.end()) {
107 SampleRateSet::iterator it = samplerates.find(sit->second);
108 samplerates.erase(it);
109 sampleratemap.erase(channel_id);
110 }
111 //DBG("XXDebugMixerXX: removed channel: #%i\n",channel_id);
112 audio_mut.unlock();
113 }
114
115 void AmMultiPartyMixer::PutChannelPacket(unsigned int channel_id,
116 unsigned long long system_ts,
117 unsigned char* buffer,
118 unsigned int size)
119 {
120 if(!size)
121 return;
122 assert(size <= AUDIO_BUFFER_SIZE);
123
124 std::deque<MixerBufferState>::iterator bstate = findOrCreateBufferState(GetCurrentSampleRate());
125
126 SampleArrayShort* channel = 0;
127 if((channel = bstate->get_channel(channel_id)) != 0) {
128
129 unsigned samples = PCM16_B2S(size);
130 unsigned long long put_ts = system_ts + (MIXER_DELAY_MS * WALLCLOCK_RATE / 1000);
131 unsigned long long user_put_ts = put_ts * (GetCurrentSampleRate()/100) / (WALLCLOCK_RATE/100);
132
getCTStr()133 channel->put(user_put_ts,(short*)buffer,samples);
134 bstate->mixed_channel->get(user_put_ts,tmp_buffer,samples);
135
getCTHdr()136 mix_add(tmp_buffer,tmp_buffer,(short*)buffer,samples);
137 bstate->mixed_channel->put(user_put_ts,tmp_buffer,samples);
138 bstate->last_ts = put_ts + (samples * (WALLCLOCK_RATE/100) / (GetCurrentSampleRate()/100));
getParts()139 } else {
140 /*
141 ERROR("XXDebugMixerXX: MultiPartyMixer::PutChannelPacket: "
142 "channel #%i doesn't exist\n",channel_id);
143 DBG("XXDebugMixer:: PutChannelPacket failed ts=%u", ts);
144 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
145 DEBUG_MIXER_BUFFER_STATE(*it, "on PutChannelPacket failure");
146 }*/
147 }
getPayload()148 }
149
150 void AmMultiPartyMixer::GetChannelPacket(unsigned int channel_id,
151 unsigned long long system_ts,
152 unsigned char* buffer,
153 unsigned int& size,
154 unsigned int& output_sample_rate)
155 {
156 if (!size)
157 return;
158 assert(size <= AUDIO_BUFFER_SIZE);
159
160 unsigned int last_ts = system_ts + (PCM16_B2S(size) * (WALLCLOCK_RATE/100) / (GetCurrentSampleRate()/100));
161 std::deque<MixerBufferState>::iterator bstate = findBufferStateForReading(GetCurrentSampleRate(), last_ts);
162
163 SampleArrayShort* channel = 0;
164 if(bstate != buffer_state.end() && (channel = bstate->get_channel(channel_id)) != 0) {
165
166 unsigned int samples = PCM16_B2S(size) * (bstate->sample_rate/100) / (GetCurrentSampleRate()/100);
167 assert(samples <= PCM16_B2S(AUDIO_BUFFER_SIZE));
168
169 unsigned long long cur_ts = system_ts * (bstate->sample_rate/100) / (WALLCLOCK_RATE/100);
170 bstate->mixed_channel->get(cur_ts,tmp_buffer,samples);
171 channel->get(cur_ts,(short*)buffer,samples);
172
173 mix_sub(tmp_buffer,tmp_buffer,(short*)buffer,samples);
174 scale((short*)buffer,tmp_buffer,samples);
175 size = PCM16_S2B(samples);
176 output_sample_rate = bstate->sample_rate;
177 } else if (bstate != buffer_state.end()) {
178 memset(buffer,0,size);
179 output_sample_rate = GetCurrentSampleRate();
180 //DBG("XXDebugMixerXX: GetChannelPacket returned zeroes, ts=%u, last_ts=%u, output_sample_rate=%u", ts, last_ts, output_sample_rate);
181 } else {
182 /*
183 ERROR("XXDebugMixerXX: MultiPartyMixer::GetChannelPacket: "
184 "channel #%i doesn't exist\n",channel_id);
185 DBG("XXDebugMixerXX: GetChannelPacket failed, ts=%u", ts);
186 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
187 DEBUG_MIXER_BUFFER_STATE(*it, "on GetChannelPacket failure");
188 }*/
189 }
190
191 cleanupBufferStates(last_ts);
192 }
193
194 int AmMultiPartyMixer::GetCurrentSampleRate()
195 {
196 SampleRateSet::reverse_iterator sit = samplerates.rbegin();
197 if (sit != samplerates.rend()) {
198 return *sit;
199 } else {
200 return -1;
201 }
202 }
203
204 // int dest[size/2]
205 // int src1[size/2]
206 // short src2[size/2]
207 //
208 void AmMultiPartyMixer::mix_add(int* dest,int* src1,short* src2,unsigned int size)
209 {
210 int* end_dest = dest + size;
211
212 while(dest != end_dest)
213 *(dest++) = *(src1++) + int(*(src2++));
214 }
215
216 void AmMultiPartyMixer::mix_sub(int* dest,int* src1,short* src2,unsigned int size)
217 {
218 int* end_dest = dest + size;
219
220 while(dest != end_dest)
221 *(dest++) = *(src1++) - int(*(src2++));
222 }
223
224 void AmMultiPartyMixer::scale(short* buffer,int* tmp_buf,unsigned int size)
225 {
226 short* end_dest = buffer + size;
227
228 if(scaling_factor<64)
229 scaling_factor++;
230
231 while(buffer != end_dest){
232
233 int s = (*tmp_buf * scaling_factor) >> 6;
234 if(abs(s) > MAX_LINEAR_SAMPLE){
235 scaling_factor = abs( (MAX_LINEAR_SAMPLE<<6) / (*tmp_buf) );
236 if(s < 0)
237 s = -MAX_LINEAR_SAMPLE;
238 else
239 s = MAX_LINEAR_SAMPLE;
240 }
241 *(buffer++) = short(s);
242 tmp_buf++;
243 }
244 }
245
246 std::deque<MixerBufferState>::iterator AmMultiPartyMixer::findOrCreateBufferState(unsigned int sample_rate)
247 {
248 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin(); it != buffer_state.end(); it++) {
249 if (it->sample_rate == sample_rate) {
250 it->fix_channels(channelids);
251 //DEBUG_MIXER_BUFFER_STATE(*it, "returned to PutChannelPacket");
252 return it;
253 }
254 }
255
256 //DBG("XXDebugMixerXX: Creating buffer state (from PutChannelPacket)");
257 buffer_state.push_back(MixerBufferState(sample_rate, channelids));
258 std::deque<MixerBufferState>::reverse_iterator rit = buffer_state.rbegin();
259 //DEBUG_MIXER_BUFFER_STATE(*((rit + 1).base()), "returned to PutChannelPacket");
260 return (rit + 1).base();
261 }
262
263 std::deque<MixerBufferState>::iterator
264 AmMultiPartyMixer::findBufferStateForReading(unsigned int sample_rate,
265 unsigned long long last_ts)
266 {
267 for (std::deque<MixerBufferState>::iterator it = buffer_state.begin();
268 it != buffer_state.end(); it++) {
269
270 if (sys_ts_less()(last_ts,it->last_ts)
271 || (last_ts == it->last_ts)) {
272 it->fix_channels(channelids);
273 //DEBUG_MIXER_BUFFER_STATE(*it, "returned to PutChannelPacket");
274 return it;
275 }
276 }
277
278 if (buffer_state.size() < MAX_BUFFER_STATES) {
279 // DBG("XXDebugMixerXX: Creating buffer state (from GetChannelPacket)\n");
280 buffer_state.push_back(MixerBufferState(sample_rate, channelids));
281 } // else just reuse the last buffer - conference without a speaker
282 std::deque<MixerBufferState>::reverse_iterator rit = buffer_state.rbegin();
283 //DEBUG_MIXER_BUFFER_STATE(*((rit + 1).base()), "returned to PutChannelPacket");
284 return (rit + 1).base();
285 }
286
287 void AmMultiPartyMixer::cleanupBufferStates(unsigned int last_ts)
288 {
289 while (!buffer_state.empty()
290 && (buffer_state.front().last_ts != 0 && buffer_state.front().last_ts < last_ts)
291 && (unsigned int)GetCurrentSampleRate() != buffer_state.front().sample_rate) {
292
293 //DEBUG_MIXER_BUFFER_STATE(buffer_state.front(), "freed in cleanupBufferStates");
294 buffer_state.front().free_channels();
295 buffer_state.pop_front();
296 }
297 }
298
// Acquire audio_mut, the mutex that addChannel() and removeChannel() take
// internally. PutChannelPacket() and GetChannelPacket() do not lock by
// themselves -- presumably callers are expected to wrap them in
// lock()/unlock(); confirm against the callers.
void AmMultiPartyMixer::lock()
{
  audio_mut.lock();
}
303
// Release audio_mut; counterpart of lock().
void AmMultiPartyMixer::unlock()
{
  audio_mut.unlock();
}
308
309 MixerBufferState::MixerBufferState(unsigned int sample_rate, std::set<int>& channelids)
310 : sample_rate(sample_rate), last_ts(0), channels(), mixed_channel(NULL)
311 {
312 for (std::set<int>::iterator it = channelids.begin(); it != channelids.end(); it++) {
313 channels.insert(std::make_pair(*it,new SampleArrayShort()));
314 }
315
316 mixed_channel = new SampleArrayInt();
317 }
318
// Shallow copy: the channel map and mixed_channel are copied as pointers,
// so the copy refers to the same sample buffers as `other`.
// No buffer is duplicated here; free_channels() must be called on only one
// of the objects sharing them.
MixerBufferState::MixerBufferState(const MixerBufferState& other)
  : sample_rate(other.sample_rate), last_ts(other.last_ts),
    channels(other.channels), mixed_channel(other.mixed_channel)
{
}
324
// Releases nothing: the sample buffers are freed explicitly through
// free_channels(). Copies share their buffers (see the copy constructor),
// so deleting them here would free the same memory more than once.
MixerBufferState::~MixerBufferState()
{
}
328
329 void MixerBufferState::add_channel(unsigned int channel_id)
330 {
331 if (channels.find(channel_id) == channels.end())
332 channels.insert(std::make_pair(channel_id,new SampleArrayShort()));
333 }
334
335 void MixerBufferState::remove_channel(unsigned int channel_id)
336 {
337 ChannelMap::iterator channel_it = channels.find(channel_id);
338 if (channel_it != channels.end()) {
339 delete channel_it->second;
340 channels.erase(channel_it);
341 }
342 }
343
344 SampleArrayShort* MixerBufferState::get_channel(unsigned int channel_id)
345 {
346 ChannelMap::iterator channel_it = channels.find(channel_id);
347 if(channel_it == channels.end()){
348 ERROR("XXMixerDebugXX: channel #%i does not exist\n",channel_id);
349 return NULL;
350 }
351
352 return channel_it->second;
353 }
354
355 void MixerBufferState::fix_channels(std::set<int>& curchannelids)
356 {
357 for (std::set<int>::iterator it = curchannelids.begin(); it != curchannelids.end(); it++) {
358 if (channels.find(*it) == channels.end()) {
359 DBG("XXMixerDebugXX: fixing channel #%d", *it);
360 channels.insert(std::make_pair(*it,new SampleArrayShort()));
361 }
362 }
363 }
364
365 void MixerBufferState::free_channels()
366 {
367 for (ChannelMap::iterator it = channels.begin(); it != channels.end(); it++) {
368 if (it->second != NULL)
369 delete it->second;
370 }
371
372 delete mixed_channel;
373 }
374