1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "AmRtpReceiver.h"
29 #include "AmRtpStream.h"
30 #include "AmRtpMuxStream.h"
31 #include "AmRtpPacket.h"
32 #include "log.h"
33 #include "AmConfig.h"
34 
35 #include <errno.h>
36 
37 // Not on Solaris!
38 #if !defined (__SVR4) && !defined (__sun)
39 #include <strings.h>
40 #endif
41 
42 _AmRtpReceiver::_AmRtpReceiver()
43   : mux_stream(NULL)
44 {
45   n_receivers = AmConfig::RTPReceiverThreads;
46   receivers = new AmRtpReceiverThread[n_receivers];
47 }
48 
49 _AmRtpReceiver::~_AmRtpReceiver()
50 {
51   if (mux_stream) {
52     receivers[0].removeStream(mux_stream->getLocalSocket(), mux_stream->getLocalPort());
53   }
54   delete [] receivers;
55 }
56 
57 AmRtpReceiverThread::AmRtpReceiverThread()
58   : stop_requested(false)
59 {
60   // libevent event base
61   ev_base = event_base_new();
62 }
63 
64 AmRtpReceiverThread::~AmRtpReceiverThread()
65 {
66   event_base_free(ev_base);
67   INFO("RTP receiver has been recycled.\n");
68 }
69 
70 void AmRtpReceiverThread::on_stop()
71 {
72   INFO("requesting RTP receiver to stop.\n");
73   event_base_loopbreak(ev_base);
74 }
75 
76 void AmRtpReceiverThread::stop_and_wait()
77 {
78   if(!is_stopped()) {
79     stop();
80 
81     while(!is_stopped())
82       usleep(10000);
getDataSize()83   }
84 }
85 
86 void _AmRtpReceiver::dispose()
87 {
88   for(unsigned int i=0; i<n_receivers; i++){
89     receivers[i].stop_and_wait();
90   }
91 }
92 
93 void AmRtpReceiverThread::run()
94 {
95   // fake event to prevent the event loop from exiting
96   int fake_fds[2];
97   if (pipe(fake_fds)<0) {
98     DBG("error creating bogus pipe\n");
99   }
100   struct event* ev_default =
101     event_new(ev_base,fake_fds[0],
102 	      EV_READ|EV_PERSIST,
103 	      NULL,NULL);
104   event_add(ev_default,NULL);
105 
106   // run the event loop
107   event_base_loop(ev_base,0);
108 
109   // clean-up fake fds/event
110   event_free(ev_default);
111   close(fake_fds[0]);
112   close(fake_fds[1]);
113 }
114 
115 void AmRtpReceiverThread::_rtp_receiver_read_cb(evutil_socket_t sd,
116 						short what, void* arg)
117 {
118   AmRtpReceiverThread::StreamInfo* p_si =
119     static_cast<AmRtpReceiverThread::StreamInfo*>(arg);
120 
121   p_si->thread->streams_mut.lock();
122   if(!p_si->stream) {
123     // we are about to get removed...
124     p_si->thread->streams_mut.unlock();
125     return;
126   }
127   p_si->stream->recvPacket(sd);
128   p_si->thread->streams_mut.unlock();
129 }
130 
131 void AmRtpReceiverThread::_rtp_receiver_buf_cb(evutil_socket_t sd,
132 					       short what, void* arg)
133 {
134   AmRtpReceiverThread::RtpPacket* r_pkt =
135     static_cast<AmRtpReceiverThread::RtpPacket*>(arg);
136 
137   if (NULL != r_pkt->stream) {
138     r_pkt->thread->streams_mut.lock();
139     if (NULL != r_pkt->stream) {
140       r_pkt->stream->recvPacket(-1, r_pkt->pkt, r_pkt->len);
141     }
142     r_pkt->thread->streams_mut.unlock();
143   }
144   delete r_pkt;
145 }
146 
147 void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream)
148 {
149   streams_mut.lock();
150   if(streams.find(sd) != streams.end()) {
151     ERROR("trying to insert existing stream [%p] with sd=%i\n",
152 	  stream,sd);
153     streams_mut.unlock();
154     return;
155   }
156 
157   StreamInfo& si = streams[sd];
158   si.stream = stream;
159   event* ev_read = event_new(ev_base,sd,EV_READ|EV_PERSIST,
160 			     AmRtpReceiverThread::_rtp_receiver_read_cb,&si);
161   si.ev_read = ev_read;
162   si.thread = this;
163 
164   streams_ports[stream->getLocalPort()] = stream;
165   streams_mut.unlock();
166 
167   // This must be done when
168   // streams_mut is NOT locked
169   event_add(ev_read,NULL);
170 }
171 
172 void AmRtpReceiverThread::removeStream(int sd, int local_port)
173 {
174   streams_mut.lock();
175   Streams::iterator sit = streams.find(sd);
176   if(sit == streams.end()) {
177     streams_mut.unlock();
178     return;
179   }
180 
181   StreamInfo& si = sit->second;
182   if(!si.stream || !si.ev_read){
183     streams_mut.unlock();
184     return;
185   }
186 
187   si.stream = NULL;
188   event* ev_read = si.ev_read;
189   si.ev_read = NULL;
190 
191   streams_mut.unlock();
192 
193   // This must be done while
194   // streams_mut is NOT locked
195   event_free(ev_read);
196 
197   streams_mut.lock();
198   // this must be done AFTER event_free()
199   // so that the StreamInfo does not get
200   // deleted while in recvPaket()
201   // (see recv callback)
202   streams.erase(sd);
203   streams_ports.erase(local_port);
204   streams_mut.unlock();
205 }
206 
207 int AmRtpReceiverThread::recvdPacket(bool need_lock, int local_port, unsigned char* buf, size_t len) {
208   // pass packet to correct recevier thread
209   AmRtpReceiverThread::RtpPacket* r_pkt = new AmRtpReceiverThread::RtpPacket();
210   r_pkt->len = len;
211   r_pkt->pkt = new unsigned char[len];
212   memcpy(r_pkt->pkt, buf, len);
213 
214   event* ev_read = event_new(ev_base, -1 /* no fd */, 0 /* no events */,
215 			     AmRtpReceiverThread::_rtp_receiver_buf_cb, r_pkt);
216   r_pkt->ev_read = ev_read;
217   r_pkt->thread = this;
218 
219   if (need_lock)
220     streams_mut.lock();
221 
222   std::map<int, AmRtpStream*>::iterator it = streams_ports.find(local_port);
223   if (it != streams_ports.end())
224     r_pkt->stream = it->second;
225   else {
226     ERROR("could not find stream for local port %i\n", local_port);
227   }
228   if (need_lock)
229     streams_mut.unlock();
230   // possibly call event_add here
231   event_active(ev_read, EV_READ, 0);
232 
233   return 0;
234 }
235 
236 void _AmRtpReceiver::start()
237 {
238   for(unsigned int i=0; i<n_receivers; i++)
239     receivers[i].start();
240 }
241 
242 void _AmRtpReceiver::startRtpMuxReceiver()
243 {
244   if (AmConfig::RtpMuxPort) {
245     DBG("Starting RTP MUX listener on port %d\n", AmConfig::RtpMuxPort);
246     mux_stream = new AmRtpMuxStream();
247     mux_stream->setLocalIP(AmConfig::RtpMuxIP);
248     mux_stream->setLocalPort(AmConfig::RtpMuxPort);
249 
250     receivers[0].addStream(mux_stream->getLocalSocket(), mux_stream);
251     DBG("added mux_stream [%p] to RTP receiver\n", mux_stream);
252   } else {
253     DBG("Not starting RTP MUX listener\n");
254   }
255 }
256 
257 void _AmRtpReceiver::addStream(int sd, AmRtpStream* stream)
258 {
259   unsigned int i = stream->getLocalPort()  % n_receivers;
260   receivers[i].addStream(sd,stream);
261 }
262 
263 void _AmRtpReceiver::removeStream(int sd, int local_port)
264 {
265   unsigned int i = local_port % n_receivers;
266   receivers[i].removeStream(sd, local_port);
267 }
268 
269 int _AmRtpReceiver::recvdPacket(int recvd_port, int local_port, unsigned char* buf, size_t len) {
270   unsigned int i = local_port % n_receivers;
271   // need to lock if received on different receiver than the stream is handled by
272   unsigned int c = recvd_port % n_receivers;
273 
274   return receivers[i].recvdPacket(i != c, local_port, buf, len);
275 }
276