1 // PHZ
2 // 2018-9-30
3 
4 #include "MediaSession.h"
5 #include "RtpConnection.h"
6 #include <cstring>
7 #include <ctime>
8 #include <map>
9 #include <forward_list>
10 #include "net/Logger.h"
11 #include "net/SocketUtil.h"
12 
13 using namespace xop;
14 using namespace std;
15 
16 std::atomic_uint MediaSession::last_session_id_(1);
17 
MediaSession(std::string url_suffxx)18 MediaSession::MediaSession(std::string url_suffxx)
19 	: suffix_(url_suffxx)
20 	, media_sources_(MAX_MEDIA_CHANNEL)
21 	, buffer_(MAX_MEDIA_CHANNEL)
22 {
23 	has_new_client_ = false;
24 	session_id_ = ++last_session_id_;
25 
26 	for(int n=0; n<MAX_MEDIA_CHANNEL; n++) {
27 		multicast_port_[n] = 0;
28 	}
29 }
30 
CreateNew(std::string url_suffix)31 MediaSession* MediaSession::CreateNew(std::string url_suffix)
32 {
33 	return new MediaSession(std::move(url_suffix));
34 }
35 
~MediaSession()36 MediaSession::~MediaSession()
37 {
38 	if (multicast_ip_ != "") {
39 		MulticastAddr::instance().Release(multicast_ip_);
40 	}
41 }
42 
AddNotifyConnectedCallback(const NotifyConnectedCallback & callback)43 void MediaSession::AddNotifyConnectedCallback(const NotifyConnectedCallback& callback)
44 {
45 	notify_connected_callbacks_.push_back(callback);
46 }
47 
AddNotifyDisconnectedCallback(const NotifyDisconnectedCallback & callback)48 void MediaSession::AddNotifyDisconnectedCallback(const NotifyDisconnectedCallback& callback)
49 {
50 	notify_disconnected_callbacks_.push_back(callback);
51 }
52 
AddSource(MediaChannelId channel_id,MediaSource * source)53 bool MediaSession::AddSource(MediaChannelId channel_id, MediaSource* source)
54 {
55 	source->SetSendFrameCallback([this](MediaChannelId channel_id, RtpPacket pkt) {
56 		std::forward_list<std::shared_ptr<RtpConnection>> clients;
57 		std::map<int, RtpPacket> packets;
58 		{
59 			std::lock_guard<std::mutex> lock(map_mutex_);
60 			for (auto iter = clients_.begin(); iter != clients_.end();) {
61 				auto conn = iter->second.lock();
62 				if (conn == nullptr) {
63 					clients_.erase(iter++);
64 				}
65 				else  {
66 					int id = conn->GetId();
67 					if (id >= 0) {
68 						if (packets.find(id) == packets.end()) {
69 							RtpPacket tmp_pkt;
70 							memcpy(tmp_pkt.data.get(), pkt.data.get(), pkt.size);
71 							tmp_pkt.size = pkt.size;
72 							tmp_pkt.last = pkt.last;
73 							tmp_pkt.timestamp = pkt.timestamp;
74 							tmp_pkt.type = pkt.type;
75 							packets.emplace(id, tmp_pkt);
76 						}
77 						clients.emplace_front(conn);
78 					}
79 					iter++;
80 				}
81 			}
82 		}
83 
84 		int count = 0;
85 		for(auto iter : clients) {
86 			int ret = 0;
87 			int id = iter->GetId();
88 			if (id >= 0) {
89 				auto iter2 = packets.find(id);
90 				if (iter2 != packets.end()) {
91 					count++;
92 					ret = iter->SendRtpPacket(channel_id, iter2->second);
93 					if (is_multicast_ && ret == 0) {
94 						break;
95 					}
96 				}
97 			}
98 		}
99 		return true;
100 		});
101 
102 	media_sources_[channel_id].reset(source);
103 	return true;
104 }
105 
RemoveSource(MediaChannelId channel_id)106 bool MediaSession::RemoveSource(MediaChannelId channel_id)
107 {
108 	media_sources_[channel_id] = nullptr;
109 	return true;
110 }
111 
StartMulticast()112 bool MediaSession::StartMulticast()
113 {
114 	if (is_multicast_) {
115 		return true;
116 	}
117 
118 	multicast_ip_ = MulticastAddr::instance().GetAddr();
119 	if (multicast_ip_ == "") {
120 		return false;
121 	}
122 
123 	std::random_device rd;
124 	multicast_port_[channel_0] = htons(rd() & 0xfffe);
125 	multicast_port_[channel_1] = htons(rd() & 0xfffe);
126 
127 	is_multicast_ = true;
128 	return true;
129 }
130 
GetSdpMessage(std::string ip,std::string session_name)131 std::string MediaSession::GetSdpMessage(std::string ip, std::string session_name)
132 {
133 	if (sdp_ != "") {
134 		return sdp_;
135 	}
136 
137 	if (media_sources_.empty()) {
138 		return "";
139 	}
140 
141 	char buf[2048] = {0};
142 
143 	snprintf(buf, sizeof(buf),
144 			"v=0\r\n"
145 			"o=- 9%ld 1 IN IP4 %s\r\n"
146 			"t=0 0\r\n"
147 			"a=control:*\r\n" ,
148 			(long)std::time(NULL), ip.c_str());
149 
150 	if(session_name != "") {
151 		snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
152 				"s=%s\r\n",
153 				session_name.c_str());
154 	}
155 
156 	if(is_multicast_) {
157 		snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
158 				"a=type:broadcast\r\n"
159 				"a=rtcp-unicast: reflection\r\n");
160 	}
161 
162 	for (uint32_t chn=0; chn<media_sources_.size(); chn++) {
163 		if(media_sources_[chn]) {
164 			if(is_multicast_) {
165 				snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
166 						"%s\r\n",
167 						media_sources_[chn]->GetMediaDescription(multicast_port_[chn]).c_str());
168 
169 				snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
170 						"c=IN IP4 %s/255\r\n",
171 						multicast_ip_.c_str());
172 			}
173 			else {
174 				snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
175 						"%s\r\n",
176 						media_sources_[chn]->GetMediaDescription(0).c_str());
177 			}
178 
179 			snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
180 					"%s\r\n",
181 					media_sources_[chn]->GetAttribute().c_str());
182 
183 			snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
184 					"a=control:track%d\r\n", chn);
185 		}
186 	}
187 
188 	sdp_ = buf;
189 	return sdp_;
190 }
191 
GetMediaSource(MediaChannelId channel_id)192 MediaSource* MediaSession::GetMediaSource(MediaChannelId channel_id)
193 {
194 	if (media_sources_[channel_id]) {
195 		return media_sources_[channel_id].get();
196 	}
197 
198 	return nullptr;
199 }
200 
HandleFrame(MediaChannelId channel_id,AVFrame frame)201 bool MediaSession::HandleFrame(MediaChannelId channel_id, AVFrame frame)
202 {
203 	std::lock_guard<std::mutex> lock(mutex_);
204 
205 	if(media_sources_[channel_id]) {
206 		media_sources_[channel_id]->HandleFrame(channel_id, frame);
207 	}
208 	else {
209 		return false;
210 	}
211 
212 	return true;
213 }
214 
AddClient(SOCKET rtspfd,std::shared_ptr<RtpConnection> rtp_conn)215 bool MediaSession::AddClient(SOCKET rtspfd, std::shared_ptr<RtpConnection> rtp_conn)
216 {
217 	std::lock_guard<std::mutex> lock(map_mutex_);
218 
219 	auto iter = clients_.find (rtspfd);
220 	if(iter == clients_.end()) {
221 		std::weak_ptr<RtpConnection> rtp_conn_weak_ptr = rtp_conn;
222 		clients_.emplace(rtspfd, rtp_conn_weak_ptr);
223 		for (auto& callback : notify_connected_callbacks_) {
224 			callback(session_id_, rtp_conn->GetIp(), rtp_conn->GetPort());
225 		}
226 
227 		has_new_client_ = true;
228 		return true;
229 	}
230 
231 	return false;
232 }
233 
RemoveClient(SOCKET rtspfd)234 void MediaSession::RemoveClient(SOCKET rtspfd)
235 {
236 	std::lock_guard<std::mutex> lock(map_mutex_);
237 
238 	auto iter = clients_.find(rtspfd);
239 	if (iter != clients_.end()) {
240 		auto conn = iter->second.lock();
241 		if (conn) {
242 			for (auto& callback : notify_disconnected_callbacks_) {
243 				callback(session_id_, conn->GetIp(), conn->GetPort());
244 			}
245 		}
246 		clients_.erase(iter);
247 	}
248 }
249 
250