1 // PHZ
2 // 2018-9-30
3
4 #include "MediaSession.h"
5 #include "RtpConnection.h"
6 #include <cstring>
7 #include <ctime>
8 #include <map>
9 #include <forward_list>
10 #include "net/Logger.h"
11 #include "net/SocketUtil.h"
12
13 using namespace xop;
14 using namespace std;
15
16 std::atomic_uint MediaSession::last_session_id_(1);
17
MediaSession(std::string url_suffxx)18 MediaSession::MediaSession(std::string url_suffxx)
19 : suffix_(url_suffxx)
20 , media_sources_(MAX_MEDIA_CHANNEL)
21 , buffer_(MAX_MEDIA_CHANNEL)
22 {
23 has_new_client_ = false;
24 session_id_ = ++last_session_id_;
25
26 for(int n=0; n<MAX_MEDIA_CHANNEL; n++) {
27 multicast_port_[n] = 0;
28 }
29 }
30
CreateNew(std::string url_suffix)31 MediaSession* MediaSession::CreateNew(std::string url_suffix)
32 {
33 return new MediaSession(std::move(url_suffix));
34 }
35
~MediaSession()36 MediaSession::~MediaSession()
37 {
38 if (multicast_ip_ != "") {
39 MulticastAddr::instance().Release(multicast_ip_);
40 }
41 }
42
AddNotifyConnectedCallback(const NotifyConnectedCallback & callback)43 void MediaSession::AddNotifyConnectedCallback(const NotifyConnectedCallback& callback)
44 {
45 notify_connected_callbacks_.push_back(callback);
46 }
47
AddNotifyDisconnectedCallback(const NotifyDisconnectedCallback & callback)48 void MediaSession::AddNotifyDisconnectedCallback(const NotifyDisconnectedCallback& callback)
49 {
50 notify_disconnected_callbacks_.push_back(callback);
51 }
52
AddSource(MediaChannelId channel_id,MediaSource * source)53 bool MediaSession::AddSource(MediaChannelId channel_id, MediaSource* source)
54 {
55 source->SetSendFrameCallback([this](MediaChannelId channel_id, RtpPacket pkt) {
56 std::forward_list<std::shared_ptr<RtpConnection>> clients;
57 std::map<int, RtpPacket> packets;
58 {
59 std::lock_guard<std::mutex> lock(map_mutex_);
60 for (auto iter = clients_.begin(); iter != clients_.end();) {
61 auto conn = iter->second.lock();
62 if (conn == nullptr) {
63 clients_.erase(iter++);
64 }
65 else {
66 int id = conn->GetId();
67 if (id >= 0) {
68 if (packets.find(id) == packets.end()) {
69 RtpPacket tmp_pkt;
70 memcpy(tmp_pkt.data.get(), pkt.data.get(), pkt.size);
71 tmp_pkt.size = pkt.size;
72 tmp_pkt.last = pkt.last;
73 tmp_pkt.timestamp = pkt.timestamp;
74 tmp_pkt.type = pkt.type;
75 packets.emplace(id, tmp_pkt);
76 }
77 clients.emplace_front(conn);
78 }
79 iter++;
80 }
81 }
82 }
83
84 int count = 0;
85 for(auto iter : clients) {
86 int ret = 0;
87 int id = iter->GetId();
88 if (id >= 0) {
89 auto iter2 = packets.find(id);
90 if (iter2 != packets.end()) {
91 count++;
92 ret = iter->SendRtpPacket(channel_id, iter2->second);
93 if (is_multicast_ && ret == 0) {
94 break;
95 }
96 }
97 }
98 }
99 return true;
100 });
101
102 media_sources_[channel_id].reset(source);
103 return true;
104 }
105
RemoveSource(MediaChannelId channel_id)106 bool MediaSession::RemoveSource(MediaChannelId channel_id)
107 {
108 media_sources_[channel_id] = nullptr;
109 return true;
110 }
111
StartMulticast()112 bool MediaSession::StartMulticast()
113 {
114 if (is_multicast_) {
115 return true;
116 }
117
118 multicast_ip_ = MulticastAddr::instance().GetAddr();
119 if (multicast_ip_ == "") {
120 return false;
121 }
122
123 std::random_device rd;
124 multicast_port_[channel_0] = htons(rd() & 0xfffe);
125 multicast_port_[channel_1] = htons(rd() & 0xfffe);
126
127 is_multicast_ = true;
128 return true;
129 }
130
GetSdpMessage(std::string ip,std::string session_name)131 std::string MediaSession::GetSdpMessage(std::string ip, std::string session_name)
132 {
133 if (sdp_ != "") {
134 return sdp_;
135 }
136
137 if (media_sources_.empty()) {
138 return "";
139 }
140
141 char buf[2048] = {0};
142
143 snprintf(buf, sizeof(buf),
144 "v=0\r\n"
145 "o=- 9%ld 1 IN IP4 %s\r\n"
146 "t=0 0\r\n"
147 "a=control:*\r\n" ,
148 (long)std::time(NULL), ip.c_str());
149
150 if(session_name != "") {
151 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
152 "s=%s\r\n",
153 session_name.c_str());
154 }
155
156 if(is_multicast_) {
157 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
158 "a=type:broadcast\r\n"
159 "a=rtcp-unicast: reflection\r\n");
160 }
161
162 for (uint32_t chn=0; chn<media_sources_.size(); chn++) {
163 if(media_sources_[chn]) {
164 if(is_multicast_) {
165 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
166 "%s\r\n",
167 media_sources_[chn]->GetMediaDescription(multicast_port_[chn]).c_str());
168
169 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
170 "c=IN IP4 %s/255\r\n",
171 multicast_ip_.c_str());
172 }
173 else {
174 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
175 "%s\r\n",
176 media_sources_[chn]->GetMediaDescription(0).c_str());
177 }
178
179 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
180 "%s\r\n",
181 media_sources_[chn]->GetAttribute().c_str());
182
183 snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
184 "a=control:track%d\r\n", chn);
185 }
186 }
187
188 sdp_ = buf;
189 return sdp_;
190 }
191
GetMediaSource(MediaChannelId channel_id)192 MediaSource* MediaSession::GetMediaSource(MediaChannelId channel_id)
193 {
194 if (media_sources_[channel_id]) {
195 return media_sources_[channel_id].get();
196 }
197
198 return nullptr;
199 }
200
HandleFrame(MediaChannelId channel_id,AVFrame frame)201 bool MediaSession::HandleFrame(MediaChannelId channel_id, AVFrame frame)
202 {
203 std::lock_guard<std::mutex> lock(mutex_);
204
205 if(media_sources_[channel_id]) {
206 media_sources_[channel_id]->HandleFrame(channel_id, frame);
207 }
208 else {
209 return false;
210 }
211
212 return true;
213 }
214
AddClient(SOCKET rtspfd,std::shared_ptr<RtpConnection> rtp_conn)215 bool MediaSession::AddClient(SOCKET rtspfd, std::shared_ptr<RtpConnection> rtp_conn)
216 {
217 std::lock_guard<std::mutex> lock(map_mutex_);
218
219 auto iter = clients_.find (rtspfd);
220 if(iter == clients_.end()) {
221 std::weak_ptr<RtpConnection> rtp_conn_weak_ptr = rtp_conn;
222 clients_.emplace(rtspfd, rtp_conn_weak_ptr);
223 for (auto& callback : notify_connected_callbacks_) {
224 callback(session_id_, rtp_conn->GetIp(), rtp_conn->GetPort());
225 }
226
227 has_new_client_ = true;
228 return true;
229 }
230
231 return false;
232 }
233
RemoveClient(SOCKET rtspfd)234 void MediaSession::RemoveClient(SOCKET rtspfd)
235 {
236 std::lock_guard<std::mutex> lock(map_mutex_);
237
238 auto iter = clients_.find(rtspfd);
239 if (iter != clients_.end()) {
240 auto conn = iter->second.lock();
241 if (conn) {
242 for (auto& callback : notify_disconnected_callbacks_) {
243 callback(session_id_, conn->GetIp(), conn->GetPort());
244 }
245 }
246 clients_.erase(iter);
247 }
248 }
249
250