1 /* 2 * Copyright (C) 2002-2003 Fhg Fokus 3 * 4 * This file is part of SEMS, a free SIP media server. 5 * 6 * SEMS is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation; either version 2 of the License, or 9 * (at your option) any later version. This program is released under 10 * the GPL with the additional exemption that compiling, linking, 11 * and/or using OpenSSL is allowed. 12 * 13 * For a license to use the SEMS software under conditions 14 * other than those described here, or to purchase support for this 15 * software, please contact iptel.org by e-mail at the following addresses: 16 * info@iptel.org 17 * 18 * SEMS is distributed in the hope that it will be useful, 19 * but WITHOUT ANY WARRANTY; without even the implied warranty of 20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 21 * GNU General Public License for more details. 22 * 23 * You should have received a copy of the GNU General Public License 24 * along with this program; if not, write to the Free Software 25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 26 */ 27 28 #include "AmRtpReceiver.h" 29 #include "AmRtpStream.h" 30 #include "AmRtpMuxStream.h" 31 #include "AmRtpPacket.h" 32 #include "log.h" 33 #include "AmConfig.h" 34 35 #include <errno.h> 36 37 // Not on Solaris! 38 #if !defined (__SVR4) && !defined (__sun) 39 #include <strings.h> 40 #endif 41 42 _AmRtpReceiver::_AmRtpReceiver() 43 : mux_stream(NULL) 44 { 45 n_receivers = AmConfig::RTPReceiverThreads; 46 receivers = new AmRtpReceiverThread[n_receivers]; 47 } 48 49 _AmRtpReceiver::~_AmRtpReceiver() 50 { 51 if (mux_stream) { 52 receivers[0].removeStream(mux_stream->getLocalSocket(), mux_stream->getLocalPort()); 53 } 54 delete [] receivers; 55 } 56 57 AmRtpReceiverThread::AmRtpReceiverThread() 58 : stop_requested(false) 59 { 60 // libevent event base 61 ev_base = event_base_new(); 62 } 63 64 AmRtpReceiverThread::~AmRtpReceiverThread() 65 { 66 event_base_free(ev_base); 67 INFO("RTP receiver has been recycled.\n"); 68 } 69 70 void AmRtpReceiverThread::on_stop() 71 { 72 INFO("requesting RTP receiver to stop.\n"); 73 event_base_loopbreak(ev_base); 74 } 75 76 void AmRtpReceiverThread::stop_and_wait() 77 { 78 if(!is_stopped()) { 79 stop(); 80 81 while(!is_stopped()) 82 usleep(10000); getDataSize()83 } 84 } 85 86 void _AmRtpReceiver::dispose() 87 { 88 for(unsigned int i=0; i<n_receivers; i++){ 89 receivers[i].stop_and_wait(); 90 } 91 } 92 93 void AmRtpReceiverThread::run() 94 { 95 // fake event to prevent the event loop from exiting 96 int fake_fds[2]; 97 if (pipe(fake_fds)<0) { 98 DBG("error creating bogus pipe\n"); 99 } 100 struct event* ev_default = 101 event_new(ev_base,fake_fds[0], 102 EV_READ|EV_PERSIST, 103 NULL,NULL); 104 event_add(ev_default,NULL); 105 106 // run the event loop 107 event_base_loop(ev_base,0); 108 109 // clean-up fake fds/event 110 event_free(ev_default); 111 close(fake_fds[0]); 112 close(fake_fds[1]); 113 } 114 115 void AmRtpReceiverThread::_rtp_receiver_read_cb(evutil_socket_t sd, 116 short what, void* arg) 117 { 118 AmRtpReceiverThread::StreamInfo* p_si = 119 static_cast<AmRtpReceiverThread::StreamInfo*>(arg); 120 121 p_si->thread->streams_mut.lock(); 122 if(!p_si->stream) { 123 // we are about to get removed... 124 p_si->thread->streams_mut.unlock(); 125 return; 126 } 127 p_si->stream->recvPacket(sd); 128 p_si->thread->streams_mut.unlock(); 129 } 130 131 void AmRtpReceiverThread::_rtp_receiver_buf_cb(evutil_socket_t sd, 132 short what, void* arg) 133 { 134 AmRtpReceiverThread::RtpPacket* r_pkt = 135 static_cast<AmRtpReceiverThread::RtpPacket*>(arg); 136 137 if (NULL != r_pkt->stream) { 138 r_pkt->thread->streams_mut.lock(); 139 if (NULL != r_pkt->stream) { 140 r_pkt->stream->recvPacket(-1, r_pkt->pkt, r_pkt->len); 141 } 142 r_pkt->thread->streams_mut.unlock(); 143 } 144 delete r_pkt; 145 } 146 147 void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream) 148 { 149 streams_mut.lock(); 150 if(streams.find(sd) != streams.end()) { 151 ERROR("trying to insert existing stream [%p] with sd=%i\n", 152 stream,sd); 153 streams_mut.unlock(); 154 return; 155 } 156 157 StreamInfo& si = streams[sd]; 158 si.stream = stream; 159 event* ev_read = event_new(ev_base,sd,EV_READ|EV_PERSIST, 160 AmRtpReceiverThread::_rtp_receiver_read_cb,&si); 161 si.ev_read = ev_read; 162 si.thread = this; 163 164 streams_ports[stream->getLocalPort()] = stream; 165 streams_mut.unlock(); 166 167 // This must be done when 168 // streams_mut is NOT locked 169 event_add(ev_read,NULL); 170 } 171 172 void AmRtpReceiverThread::removeStream(int sd, int local_port) 173 { 174 streams_mut.lock(); 175 Streams::iterator sit = streams.find(sd); 176 if(sit == streams.end()) { 177 streams_mut.unlock(); 178 return; 179 } 180 181 StreamInfo& si = sit->second; 182 if(!si.stream || !si.ev_read){ 183 streams_mut.unlock(); 184 return; 185 } 186 187 si.stream = NULL; 188 event* ev_read = si.ev_read; 189 si.ev_read = NULL; 190 191 streams_mut.unlock(); 192 193 // This must be done while 194 // streams_mut is NOT locked 195 event_free(ev_read); 196 197 streams_mut.lock(); 198 // this must be done AFTER event_free() 199 // so that the StreamInfo does not get 200 // deleted while in recvPaket() 201 // (see recv callback) 202 streams.erase(sd); 203 streams_ports.erase(local_port); 204 streams_mut.unlock(); 205 } 206 207 int AmRtpReceiverThread::recvdPacket(bool need_lock, int local_port, unsigned char* buf, size_t len) { 208 // pass packet to correct recevier thread 209 AmRtpReceiverThread::RtpPacket* r_pkt = new AmRtpReceiverThread::RtpPacket(); 210 r_pkt->len = len; 211 r_pkt->pkt = new unsigned char[len]; 212 memcpy(r_pkt->pkt, buf, len); 213 214 event* ev_read = event_new(ev_base, -1 /* no fd */, 0 /* no events */, 215 AmRtpReceiverThread::_rtp_receiver_buf_cb, r_pkt); 216 r_pkt->ev_read = ev_read; 217 r_pkt->thread = this; 218 219 if (need_lock) 220 streams_mut.lock(); 221 222 std::map<int, AmRtpStream*>::iterator it = streams_ports.find(local_port); 223 if (it != streams_ports.end()) 224 r_pkt->stream = it->second; 225 else { 226 ERROR("could not find stream for local port %i\n", local_port); 227 } 228 if (need_lock) 229 streams_mut.unlock(); 230 // possibly call event_add here 231 event_active(ev_read, EV_READ, 0); 232 233 return 0; 234 } 235 236 void _AmRtpReceiver::start() 237 { 238 for(unsigned int i=0; i<n_receivers; i++) 239 receivers[i].start(); 240 } 241 242 void _AmRtpReceiver::startRtpMuxReceiver() 243 { 244 if (AmConfig::RtpMuxPort) { 245 DBG("Starting RTP MUX listener on port %d\n", AmConfig::RtpMuxPort); 246 mux_stream = new AmRtpMuxStream(); 247 mux_stream->setLocalIP(AmConfig::RtpMuxIP); 248 mux_stream->setLocalPort(AmConfig::RtpMuxPort); 249 250 receivers[0].addStream(mux_stream->getLocalSocket(), mux_stream); 251 DBG("added mux_stream [%p] to RTP receiver\n", mux_stream); 252 } else { 253 DBG("Not starting RTP MUX listener\n"); 254 } 255 } 256 257 void _AmRtpReceiver::addStream(int sd, AmRtpStream* stream) 258 { 259 unsigned int i = stream->getLocalPort() % n_receivers; 260 receivers[i].addStream(sd,stream); 261 } 262 263 void _AmRtpReceiver::removeStream(int sd, int local_port) 264 { 265 unsigned int i = local_port % n_receivers; 266 receivers[i].removeStream(sd, local_port); 267 } 268 269 int _AmRtpReceiver::recvdPacket(int recvd_port, int local_port, unsigned char* buf, size_t len) { 270 unsigned int i = local_port % n_receivers; 271 // need to lock if received on different receiver than the stream is handled by 272 unsigned int c = recvd_port % n_receivers; 273 274 return receivers[i].recvdPacket(i != c, local_port, buf, len); 275 } 276