/*
 * Copyright (C) 2002-2003 Fhg Fokus
 *
 * This file is part of SEMS, a free SIP media server.
 *
 * SEMS is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version. This program is released under
 * the GPL with the additional exemption that compiling, linking,
 * and/or using OpenSSL is allowed.
 *
 * For a license to use the SEMS software under conditions
 * other than those described here, or to purchase support for this
 * software, please contact iptel.org by e-mail at the following addresses:
 *    info@iptel.org
 *
 * SEMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/** @file AmRtpReceiver.h */
28 #ifndef _AmRtpReceiver_h_
29 #define _AmRtpReceiver_h_
30 
31 #include "AmThread.h"
32 #include "atomic_types.h"
33 #include "singleton.h"
34 
35 #include <event2/event.h>
36 
37 #include <map>
38 using std::greater;
39 
// Forward declarations: this header only uses these types through
// pointers (and as the singleton template argument below), so their
// full definitions are not required here.
class AmRtpStream;
class AmRtpMuxStream;
class _AmRtpReceiver;
43 
44 /**
45  * \brief receiver for RTP for all streams.
46  *
47  * The RtpReceiver receives RTP packets for all streams
48  * that are registered to it. It places the received packets in
~_AmRtpReceiver()49  * the stream's buffer.
50  */
51 class AmRtpReceiverThread
52   : public AmThread
53 {
54   struct StreamInfo
55   {
56     AmRtpStream* stream;
57     struct event* ev_read;
58     AmRtpReceiverThread* thread;
59 
60     StreamInfo()
61       : stream(NULL),
62 	ev_read(NULL),
63 	thread(NULL)
64     {}
65   };
66 
67   struct RtpPacket
68   {
69     AmRtpStream* stream;
70     struct event* ev_read;
71     AmRtpReceiverThread* thread;
72     unsigned char* pkt;
73     size_t len;
74 
75     RtpPacket()
76       : stream(NULL),
77 	ev_read(NULL),
78 	thread(NULL),
79 	pkt(NULL),
80 	len(0)
81     {}
82     ~RtpPacket() {
83       if (pkt) {
84 	delete[] pkt;
85       }
86     }
87   };
88 
89   typedef std::map<int, StreamInfo> Streams;
90 
91   struct event_base* ev_base;
92   struct event*      ev_default;
93 
94   Streams  streams;
95   std::map<int, AmRtpStream*> streams_ports;
96   AmMutex  streams_mut;
97 
98   AmSharedVar<bool> stop_requested;
99 
100   static void _rtp_receiver_read_cb(evutil_socket_t sd, short what, void* arg);
101   static void _rtp_receiver_buf_cb(evutil_socket_t sd, short what, void* arg);
102 
103 public:
104   AmRtpReceiverThread();
105   ~AmRtpReceiverThread();
106 
107   void run();
108   void on_stop();
109 
110   void addStream(int sd, AmRtpStream* stream);
111   void removeStream(int sd, int local_port);
112   int recvdPacket(bool need_lock, int local_port, unsigned char* buf, size_t len);
113 
114   void stop_and_wait();
115 };
116 
117 class _AmRtpReceiver
118 {
119   AmRtpReceiverThread* receivers;
120   unsigned int         n_receivers;
121 
122   atomic_int next_index;
123 
124   AmRtpMuxStream* mux_stream;
125 
126 protected:
127   _AmRtpReceiver();
128   ~_AmRtpReceiver();
129 
130   void dispose();
_rtp_receiver_buf_cb(evutil_socket_t sd,short what,void * arg)131 
132 public:
133   void start();
134 
135   void addStream(int sd, AmRtpStream* stream);
136   void removeStream(int sd, int local_port);
137 
138   int recvdPacket(int recvd_port, int local_port, unsigned char* buf, size_t len);
139   void startRtpMuxReceiver();
140 };
141 
142 typedef singleton<_AmRtpReceiver> AmRtpReceiver;
143 
144 #endif
145 
// Local Variables:
// mode:C++
// End: