1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/receiver/frame_receiver.h"
6
7 #include <algorithm>
8 #include <string>
9 #include <utility>
10
11 #include "base/big_endian.h"
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/numerics/safe_conversions.h"
15 #include "media/cast/cast_config.h"
16 #include "media/cast/cast_environment.h"
17 #include "media/cast/constants.h"
18 #include "media/cast/net/rtcp/rtcp_utility.h"
19
20 namespace {
21
22 const int kMinSchedulingDelayMs = 1;
23
CreateRtcpTimeData(base::TimeTicks now)24 media::cast::RtcpTimeData CreateRtcpTimeData(base::TimeTicks now) {
25 media::cast::RtcpTimeData ret;
26 ret.timestamp = now;
27 media::cast::ConvertTimeTicksToNtp(now, &ret.ntp_seconds, &ret.ntp_fraction);
28 return ret;
29 }
30
31 } // namespace
32
33 namespace media {
34 namespace cast {
35
FrameReceiver(const scoped_refptr<CastEnvironment> & cast_environment,const FrameReceiverConfig & config,EventMediaType event_media_type,CastTransport * const transport)36 FrameReceiver::FrameReceiver(
37 const scoped_refptr<CastEnvironment>& cast_environment,
38 const FrameReceiverConfig& config,
39 EventMediaType event_media_type,
40 CastTransport* const transport)
41 : cast_environment_(cast_environment),
42 transport_(transport),
43 packet_parser_(
44 config.sender_ssrc,
45 config.rtp_payload_type <= RtpPayloadType::AUDIO_LAST ? 127 : 96),
46 stats_(cast_environment->Clock()),
47 event_media_type_(event_media_type),
48 event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
49 rtp_timebase_(config.rtp_timebase),
50 target_playout_delay_(
51 base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
52 expected_frame_duration_(
53 base::TimeDelta::FromSecondsD(1.0 / config.target_frame_rate)),
54 reports_are_scheduled_(false),
55 framer_(cast_environment->Clock(),
56 this,
57 config.sender_ssrc,
58 true,
59 static_cast<int>(config.rtp_max_delay_ms *
60 config.target_frame_rate / 1000)),
61 rtcp_(cast_environment_->Clock(),
62 config.receiver_ssrc,
63 config.sender_ssrc),
64 is_waiting_for_consecutive_frame_(false),
65 lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()) {
66 transport_->AddValidRtpReceiver(config.sender_ssrc, config.receiver_ssrc);
67 DCHECK_GT(config.rtp_max_delay_ms, 0);
68 DCHECK_GT(config.target_frame_rate, 0);
69 decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
70 cast_environment_->logger()->Subscribe(&event_subscriber_);
71 }
72
~FrameReceiver()73 FrameReceiver::~FrameReceiver() {
74 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
75 cast_environment_->logger()->Unsubscribe(&event_subscriber_);
76 }
77
RequestEncodedFrame(ReceiveEncodedFrameCallback callback)78 void FrameReceiver::RequestEncodedFrame(ReceiveEncodedFrameCallback callback) {
79 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
80 frame_request_queue_.push_back(std::move(callback));
81 EmitAvailableEncodedFrames();
82 }
83
ProcessPacket(std::unique_ptr<Packet> packet)84 bool FrameReceiver::ProcessPacket(std::unique_ptr<Packet> packet) {
85 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
86
87 if (IsRtcpPacket(&packet->front(), packet->size())) {
88 rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
89 } else {
90 RtpCastHeader rtp_header;
91 const uint8_t* payload_data;
92 size_t payload_size;
93 if (!packet_parser_.ParsePacket(&packet->front(),
94 packet->size(),
95 &rtp_header,
96 &payload_data,
97 &payload_size)) {
98 return false;
99 }
100
101 ProcessParsedPacket(rtp_header, payload_data, payload_size);
102 stats_.UpdateStatistics(rtp_header, rtp_timebase_);
103 }
104
105 if (!reports_are_scheduled_) {
106 ScheduleNextRtcpReport();
107 ScheduleNextCastMessage();
108 reports_are_scheduled_ = true;
109 }
110
111 return true;
112 }
113
AsWeakPtr()114 base::WeakPtr<FrameReceiver> FrameReceiver::AsWeakPtr() {
115 return weak_factory_.GetWeakPtr();
116 }
117
ProcessParsedPacket(const RtpCastHeader & rtp_header,const uint8_t * payload_data,size_t payload_size)118 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
119 const uint8_t* payload_data,
120 size_t payload_size) {
121 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
122
123 const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
124
125 frame_id_to_rtp_timestamp_[rtp_header.frame_id.lower_8_bits()] =
126 rtp_header.rtp_timestamp;
127
128 std::unique_ptr<PacketEvent> receive_event(new PacketEvent());
129 receive_event->timestamp = now;
130 receive_event->type = PACKET_RECEIVED;
131 receive_event->media_type = event_media_type_;
132 receive_event->rtp_timestamp = rtp_header.rtp_timestamp;
133 receive_event->frame_id = rtp_header.frame_id;
134 receive_event->packet_id = rtp_header.packet_id;
135 receive_event->max_packet_id = rtp_header.max_packet_id;
136 receive_event->size = base::checked_cast<uint32_t>(payload_size);
137 cast_environment_->logger()->DispatchPacketEvent(std::move(receive_event));
138
139 bool duplicate = false;
140 const bool complete =
141 framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
142
143 // Duplicate packets are ignored.
144 if (duplicate)
145 return;
146
147 // Update lip-sync values upon receiving the first packet of each frame, or if
148 // they have never been set yet.
149 if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
150 RtpTimeTicks fresh_sync_rtp;
151 base::TimeTicks fresh_sync_reference;
152 if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
153 // HACK: The sender should have provided Sender Reports before the first
154 // frame was sent. However, the spec does not currently require this.
155 // Therefore, when the data is missing, the local clock is used to
156 // generate reference timestamps.
157 VLOG(2) << "Lip sync info missing. Falling-back to local clock.";
158 fresh_sync_rtp = rtp_header.rtp_timestamp;
159 fresh_sync_reference = now;
160 }
161 // |lip_sync_reference_time_| is always incremented according to the time
162 // delta computed from the difference in RTP timestamps. Then,
163 // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
164 // sudden/discontinuous shifts in the series of reference time values.
165 if (lip_sync_reference_time_.is_null()) {
166 lip_sync_reference_time_ = fresh_sync_reference;
167 } else {
168 // Note: It's okay for the conversion ToTimeDelta() to be approximate
169 // because |lip_sync_drift_| will account for accumulated errors.
170 lip_sync_reference_time_ +=
171 (fresh_sync_rtp - lip_sync_rtp_timestamp_).ToTimeDelta(rtp_timebase_);
172 }
173 lip_sync_rtp_timestamp_ = fresh_sync_rtp;
174 lip_sync_drift_.Update(
175 now, fresh_sync_reference - lip_sync_reference_time_);
176 }
177
178 // Another frame is complete from a non-duplicate packet. Attempt to emit
179 // more frames to satisfy enqueued requests.
180 if (complete)
181 EmitAvailableEncodedFrames();
182 }
183
CastFeedback(const RtcpCastMessage & cast_message)184 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
185 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
186
187 base::TimeTicks now = cast_environment_->Clock()->NowTicks();
188 RtpTimeTicks rtp_timestamp =
189 frame_id_to_rtp_timestamp_[cast_message.ack_frame_id.lower_8_bits()];
190
191 std::unique_ptr<FrameEvent> ack_sent_event(new FrameEvent());
192 ack_sent_event->timestamp = now;
193 ack_sent_event->type = FRAME_ACK_SENT;
194 ack_sent_event->media_type = event_media_type_;
195 ack_sent_event->rtp_timestamp = rtp_timestamp;
196 ack_sent_event->frame_id = cast_message.ack_frame_id;
197 cast_environment_->logger()->DispatchFrameEvent(std::move(ack_sent_event));
198
199 ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events;
200 event_subscriber_.GetRtcpEventsWithRedundancy(&rtcp_events);
201 SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
202 CreateRtcpTimeData(now), &cast_message, nullptr,
203 target_playout_delay_, &rtcp_events, nullptr);
204 }
205
EmitAvailableEncodedFrames()206 void FrameReceiver::EmitAvailableEncodedFrames() {
207 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
208
209 while (!frame_request_queue_.empty()) {
210 // Attempt to peek at the next completed frame from the |framer_|.
211 // TODO(miu): We should only be peeking at the metadata, and not copying the
212 // payload yet! Or, at least, peek using a StringPiece instead of a copy.
213 std::unique_ptr<EncodedFrame> encoded_frame(new EncodedFrame());
214 bool is_consecutively_next_frame = false;
215 bool have_multiple_complete_frames = false;
216 if (!framer_.GetEncodedFrame(encoded_frame.get(),
217 &is_consecutively_next_frame,
218 &have_multiple_complete_frames)) {
219 VLOG(1) << "Wait for more packets to produce a completed frame.";
220 return; // ProcessParsedPacket() will invoke this method in the future.
221 }
222
223 const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
224 const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame);
225
226 // If we have multiple decodable frames, and the current frame is
227 // too old, then skip it and decode the next frame instead.
228 if (have_multiple_complete_frames && now > playout_time) {
229 framer_.ReleaseFrame(encoded_frame->frame_id);
230 continue;
231 }
232
233 // If |framer_| has a frame ready that is out of sequence, examine the
234 // playout time to determine whether it's acceptable to continue, thereby
235 // skipping one or more frames. Skip if the missing frame wouldn't complete
236 // playing before the start of playback of the available frame.
237 if (!is_consecutively_next_frame) {
238 // This assumes that decoding takes as long as playing, which might
239 // not be true.
240 const base::TimeTicks earliest_possible_end_time_of_missing_frame =
241 now + expected_frame_duration_ * 2;
242 if (earliest_possible_end_time_of_missing_frame < playout_time) {
243 VLOG(1) << "Wait for next consecutive frame instead of skipping.";
244 if (!is_waiting_for_consecutive_frame_) {
245 is_waiting_for_consecutive_frame_ = true;
246 cast_environment_->PostDelayedTask(
247 CastEnvironment::MAIN, FROM_HERE,
248 base::BindOnce(
249 &FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
250 AsWeakPtr()),
251 playout_time - now);
252 }
253 return;
254 }
255 }
256
257 // At this point, we have the complete next frame, or a decodable
258 // frame from somewhere later in the stream, AND we have given up
259 // on waiting for any frames in between, so now we can ACK the frame.
260 framer_.AckFrame(encoded_frame->frame_id);
261
262 // Decrypt the payload data in the frame, if crypto is being used.
263 if (decryptor_.is_activated()) {
264 std::string decrypted_data;
265 if (!decryptor_.Decrypt(encoded_frame->frame_id,
266 encoded_frame->data,
267 &decrypted_data)) {
268 // Decryption failed. Give up on this frame.
269 framer_.ReleaseFrame(encoded_frame->frame_id);
270 continue;
271 }
272 encoded_frame->data.swap(decrypted_data);
273 }
274
275 // At this point, we have a decrypted EncodedFrame ready to be emitted.
276 encoded_frame->reference_time = playout_time;
277 framer_.ReleaseFrame(encoded_frame->frame_id);
278 if (encoded_frame->new_playout_delay_ms) {
279 target_playout_delay_ = base::TimeDelta::FromMilliseconds(
280 encoded_frame->new_playout_delay_ms);
281 }
282 cast_environment_->PostTask(
283 CastEnvironment::MAIN, FROM_HERE,
284 base::BindOnce(&FrameReceiver::EmitOneFrame, AsWeakPtr(),
285 std::move(*frame_request_queue_.begin()),
286 std::move(encoded_frame)));
287 frame_request_queue_.pop_front();
288 }
289 }
290
EmitAvailableEncodedFramesAfterWaiting()291 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
292 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
293 DCHECK(is_waiting_for_consecutive_frame_);
294 is_waiting_for_consecutive_frame_ = false;
295 EmitAvailableEncodedFrames();
296 }
297
EmitOneFrame(ReceiveEncodedFrameCallback callback,std::unique_ptr<EncodedFrame> encoded_frame) const298 void FrameReceiver::EmitOneFrame(
299 ReceiveEncodedFrameCallback callback,
300 std::unique_ptr<EncodedFrame> encoded_frame) const {
301 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
302 if (!callback.is_null())
303 std::move(callback).Run(std::move(encoded_frame));
304 }
305
GetPlayoutTime(const EncodedFrame & frame) const306 base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const {
307 base::TimeDelta target_playout_delay = target_playout_delay_;
308 if (frame.new_playout_delay_ms) {
309 target_playout_delay = base::TimeDelta::FromMilliseconds(
310 frame.new_playout_delay_ms);
311 }
312 return lip_sync_reference_time_ + lip_sync_drift_.Current() +
313 (frame.rtp_timestamp - lip_sync_rtp_timestamp_)
314 .ToTimeDelta(rtp_timebase_) +
315 target_playout_delay;
316 }
317
ScheduleNextCastMessage()318 void FrameReceiver::ScheduleNextCastMessage() {
319 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
320 base::TimeTicks send_time;
321 framer_.TimeToSendNextCastMessage(&send_time);
322 base::TimeDelta time_to_send =
323 send_time - cast_environment_->Clock()->NowTicks();
324 time_to_send = std::max(
325 time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
326 cast_environment_->PostDelayedTask(
327 CastEnvironment::MAIN, FROM_HERE,
328 base::BindOnce(&FrameReceiver::SendNextCastMessage, AsWeakPtr()),
329 time_to_send);
330 }
331
SendNextCastMessage()332 void FrameReceiver::SendNextCastMessage() {
333 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
334 framer_.SendCastMessage(); // Will only send a message if it is time.
335 ScheduleNextCastMessage();
336 }
337
ScheduleNextRtcpReport()338 void FrameReceiver::ScheduleNextRtcpReport() {
339 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
340
341 cast_environment_->PostDelayedTask(
342 CastEnvironment::MAIN, FROM_HERE,
343 base::BindOnce(&FrameReceiver::SendNextRtcpReport, AsWeakPtr()),
344 base::TimeDelta::FromMilliseconds(kRtcpReportIntervalMs));
345 }
346
SendNextRtcpReport()347 void FrameReceiver::SendNextRtcpReport() {
348 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
349 const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
350 RtpReceiverStatistics stats = stats_.GetStatistics();
351 SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
352 CreateRtcpTimeData(now), nullptr, nullptr, base::TimeDelta(),
353 nullptr, &stats);
354 ScheduleNextRtcpReport();
355 }
356
SendRtcpReport(uint32_t rtp_receiver_ssrc,uint32_t rtp_sender_ssrc,const RtcpTimeData & time_data,const RtcpCastMessage * cast_message,const RtcpPliMessage * pli_message,base::TimeDelta target_delay,const ReceiverRtcpEventSubscriber::RtcpEvents * rtcp_events,const RtpReceiverStatistics * rtp_receiver_statistics)357 void FrameReceiver::SendRtcpReport(
358 uint32_t rtp_receiver_ssrc,
359 uint32_t rtp_sender_ssrc,
360 const RtcpTimeData& time_data,
361 const RtcpCastMessage* cast_message,
362 const RtcpPliMessage* pli_message,
363 base::TimeDelta target_delay,
364 const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events,
365 const RtpReceiverStatistics* rtp_receiver_statistics) {
366 transport_->InitializeRtpReceiverRtcpBuilder(rtp_receiver_ssrc, time_data);
367 RtcpReportBlock report_block;
368 if (rtp_receiver_statistics) {
369 report_block.remote_ssrc = 0; // Not needed to set send side.
370 report_block.media_ssrc =
371 rtp_sender_ssrc; // SSRC of the RTP packet sender.
372 report_block.fraction_lost = rtp_receiver_statistics->fraction_lost;
373 report_block.cumulative_lost = rtp_receiver_statistics->cumulative_lost;
374 report_block.extended_high_sequence_number =
375 rtp_receiver_statistics->extended_high_sequence_number;
376 report_block.jitter = rtp_receiver_statistics->jitter;
377 report_block.last_sr = rtcp_.last_report_truncated_ntp();
378 base::TimeTicks last_report_received_time =
379 rtcp_.time_last_report_received();
380 if (!last_report_received_time.is_null()) {
381 uint32_t delay_seconds = 0;
382 uint32_t delay_fraction = 0;
383 base::TimeDelta delta = time_data.timestamp - last_report_received_time;
384 ConvertTimeToFractions(delta.InMicroseconds(), &delay_seconds,
385 &delay_fraction);
386 report_block.delay_since_last_sr =
387 ConvertToNtpDiff(delay_seconds, delay_fraction);
388 } else {
389 report_block.delay_since_last_sr = 0;
390 }
391 transport_->AddRtpReceiverReport(report_block);
392 }
393 if (cast_message)
394 transport_->AddCastFeedback(*cast_message, target_delay);
395 if (pli_message)
396 transport_->AddPli(*pli_message);
397 if (rtcp_events)
398 transport_->AddRtcpEvents(*rtcp_events);
399 transport_->SendRtcpFromRtpReceiver();
400 }
401
402 } // namespace cast
403 } // namespace media
404