1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "media/cast/receiver/frame_receiver.h"
6 
7 #include <algorithm>
8 #include <string>
9 #include <utility>
10 
11 #include "base/big_endian.h"
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/numerics/safe_conversions.h"
15 #include "media/cast/cast_config.h"
16 #include "media/cast/cast_environment.h"
17 #include "media/cast/constants.h"
18 #include "media/cast/net/rtcp/rtcp_utility.h"
19 
20 namespace {
21 
22 const int kMinSchedulingDelayMs = 1;
23 
CreateRtcpTimeData(base::TimeTicks now)24 media::cast::RtcpTimeData CreateRtcpTimeData(base::TimeTicks now) {
25   media::cast::RtcpTimeData ret;
26   ret.timestamp = now;
27   media::cast::ConvertTimeTicksToNtp(now, &ret.ntp_seconds, &ret.ntp_fraction);
28   return ret;
29 }
30 
31 }  // namespace
32 
33 namespace media {
34 namespace cast {
35 
FrameReceiver(const scoped_refptr<CastEnvironment> & cast_environment,const FrameReceiverConfig & config,EventMediaType event_media_type,CastTransport * const transport)36 FrameReceiver::FrameReceiver(
37     const scoped_refptr<CastEnvironment>& cast_environment,
38     const FrameReceiverConfig& config,
39     EventMediaType event_media_type,
40     CastTransport* const transport)
41     : cast_environment_(cast_environment),
42       transport_(transport),
43       packet_parser_(
44           config.sender_ssrc,
45           config.rtp_payload_type <= RtpPayloadType::AUDIO_LAST ? 127 : 96),
46       stats_(cast_environment->Clock()),
47       event_media_type_(event_media_type),
48       event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
49       rtp_timebase_(config.rtp_timebase),
50       target_playout_delay_(
51           base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
52       expected_frame_duration_(
53           base::TimeDelta::FromSecondsD(1.0 / config.target_frame_rate)),
54       reports_are_scheduled_(false),
55       framer_(cast_environment->Clock(),
56               this,
57               config.sender_ssrc,
58               true,
59               static_cast<int>(config.rtp_max_delay_ms *
60                                config.target_frame_rate / 1000)),
61       rtcp_(cast_environment_->Clock(),
62             config.receiver_ssrc,
63             config.sender_ssrc),
64       is_waiting_for_consecutive_frame_(false),
65       lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()) {
66   transport_->AddValidRtpReceiver(config.sender_ssrc, config.receiver_ssrc);
67   DCHECK_GT(config.rtp_max_delay_ms, 0);
68   DCHECK_GT(config.target_frame_rate, 0);
69   decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
70   cast_environment_->logger()->Subscribe(&event_subscriber_);
71 }
72 
~FrameReceiver()73 FrameReceiver::~FrameReceiver() {
74   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
75   cast_environment_->logger()->Unsubscribe(&event_subscriber_);
76 }
77 
RequestEncodedFrame(ReceiveEncodedFrameCallback callback)78 void FrameReceiver::RequestEncodedFrame(ReceiveEncodedFrameCallback callback) {
79   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
80   frame_request_queue_.push_back(std::move(callback));
81   EmitAvailableEncodedFrames();
82 }
83 
ProcessPacket(std::unique_ptr<Packet> packet)84 bool FrameReceiver::ProcessPacket(std::unique_ptr<Packet> packet) {
85   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
86 
87   if (IsRtcpPacket(&packet->front(), packet->size())) {
88     rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
89   } else {
90     RtpCastHeader rtp_header;
91     const uint8_t* payload_data;
92     size_t payload_size;
93     if (!packet_parser_.ParsePacket(&packet->front(),
94                                     packet->size(),
95                                     &rtp_header,
96                                     &payload_data,
97                                     &payload_size)) {
98       return false;
99     }
100 
101     ProcessParsedPacket(rtp_header, payload_data, payload_size);
102     stats_.UpdateStatistics(rtp_header, rtp_timebase_);
103   }
104 
105   if (!reports_are_scheduled_) {
106     ScheduleNextRtcpReport();
107     ScheduleNextCastMessage();
108     reports_are_scheduled_ = true;
109   }
110 
111   return true;
112 }
113 
AsWeakPtr()114 base::WeakPtr<FrameReceiver> FrameReceiver::AsWeakPtr() {
115   return weak_factory_.GetWeakPtr();
116 }
117 
ProcessParsedPacket(const RtpCastHeader & rtp_header,const uint8_t * payload_data,size_t payload_size)118 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
119                                         const uint8_t* payload_data,
120                                         size_t payload_size) {
121   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
122 
123   const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
124 
125   frame_id_to_rtp_timestamp_[rtp_header.frame_id.lower_8_bits()] =
126       rtp_header.rtp_timestamp;
127 
128   std::unique_ptr<PacketEvent> receive_event(new PacketEvent());
129   receive_event->timestamp = now;
130   receive_event->type = PACKET_RECEIVED;
131   receive_event->media_type = event_media_type_;
132   receive_event->rtp_timestamp = rtp_header.rtp_timestamp;
133   receive_event->frame_id = rtp_header.frame_id;
134   receive_event->packet_id = rtp_header.packet_id;
135   receive_event->max_packet_id = rtp_header.max_packet_id;
136   receive_event->size = base::checked_cast<uint32_t>(payload_size);
137   cast_environment_->logger()->DispatchPacketEvent(std::move(receive_event));
138 
139   bool duplicate = false;
140   const bool complete =
141       framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
142 
143   // Duplicate packets are ignored.
144   if (duplicate)
145     return;
146 
147   // Update lip-sync values upon receiving the first packet of each frame, or if
148   // they have never been set yet.
149   if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
150     RtpTimeTicks fresh_sync_rtp;
151     base::TimeTicks fresh_sync_reference;
152     if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
153       // HACK: The sender should have provided Sender Reports before the first
154       // frame was sent.  However, the spec does not currently require this.
155       // Therefore, when the data is missing, the local clock is used to
156       // generate reference timestamps.
157       VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
158       fresh_sync_rtp = rtp_header.rtp_timestamp;
159       fresh_sync_reference = now;
160     }
161     // |lip_sync_reference_time_| is always incremented according to the time
162     // delta computed from the difference in RTP timestamps.  Then,
163     // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
164     // sudden/discontinuous shifts in the series of reference time values.
165     if (lip_sync_reference_time_.is_null()) {
166       lip_sync_reference_time_ = fresh_sync_reference;
167     } else {
168       // Note: It's okay for the conversion ToTimeDelta() to be approximate
169       // because |lip_sync_drift_| will account for accumulated errors.
170       lip_sync_reference_time_ +=
171           (fresh_sync_rtp - lip_sync_rtp_timestamp_).ToTimeDelta(rtp_timebase_);
172     }
173     lip_sync_rtp_timestamp_ = fresh_sync_rtp;
174     lip_sync_drift_.Update(
175         now, fresh_sync_reference - lip_sync_reference_time_);
176   }
177 
178   // Another frame is complete from a non-duplicate packet.  Attempt to emit
179   // more frames to satisfy enqueued requests.
180   if (complete)
181     EmitAvailableEncodedFrames();
182 }
183 
CastFeedback(const RtcpCastMessage & cast_message)184 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
185   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
186 
187   base::TimeTicks now = cast_environment_->Clock()->NowTicks();
188   RtpTimeTicks rtp_timestamp =
189       frame_id_to_rtp_timestamp_[cast_message.ack_frame_id.lower_8_bits()];
190 
191   std::unique_ptr<FrameEvent> ack_sent_event(new FrameEvent());
192   ack_sent_event->timestamp = now;
193   ack_sent_event->type = FRAME_ACK_SENT;
194   ack_sent_event->media_type = event_media_type_;
195   ack_sent_event->rtp_timestamp = rtp_timestamp;
196   ack_sent_event->frame_id = cast_message.ack_frame_id;
197   cast_environment_->logger()->DispatchFrameEvent(std::move(ack_sent_event));
198 
199   ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events;
200   event_subscriber_.GetRtcpEventsWithRedundancy(&rtcp_events);
201   SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
202                  CreateRtcpTimeData(now), &cast_message, nullptr,
203                  target_playout_delay_, &rtcp_events, nullptr);
204 }
205 
EmitAvailableEncodedFrames()206 void FrameReceiver::EmitAvailableEncodedFrames() {
207   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
208 
209   while (!frame_request_queue_.empty()) {
210     // Attempt to peek at the next completed frame from the |framer_|.
211     // TODO(miu): We should only be peeking at the metadata, and not copying the
212     // payload yet!  Or, at least, peek using a StringPiece instead of a copy.
213     std::unique_ptr<EncodedFrame> encoded_frame(new EncodedFrame());
214     bool is_consecutively_next_frame = false;
215     bool have_multiple_complete_frames = false;
216     if (!framer_.GetEncodedFrame(encoded_frame.get(),
217                                  &is_consecutively_next_frame,
218                                  &have_multiple_complete_frames)) {
219       VLOG(1) << "Wait for more packets to produce a completed frame.";
220       return;  // ProcessParsedPacket() will invoke this method in the future.
221     }
222 
223     const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
224     const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame);
225 
226     // If we have multiple decodable frames, and the current frame is
227     // too old, then skip it and decode the next frame instead.
228     if (have_multiple_complete_frames && now > playout_time) {
229       framer_.ReleaseFrame(encoded_frame->frame_id);
230       continue;
231     }
232 
233     // If |framer_| has a frame ready that is out of sequence, examine the
234     // playout time to determine whether it's acceptable to continue, thereby
235     // skipping one or more frames.  Skip if the missing frame wouldn't complete
236     // playing before the start of playback of the available frame.
237     if (!is_consecutively_next_frame) {
238       // This assumes that decoding takes as long as playing, which might
239       // not be true.
240       const base::TimeTicks earliest_possible_end_time_of_missing_frame =
241           now + expected_frame_duration_ * 2;
242       if (earliest_possible_end_time_of_missing_frame < playout_time) {
243         VLOG(1) << "Wait for next consecutive frame instead of skipping.";
244         if (!is_waiting_for_consecutive_frame_) {
245           is_waiting_for_consecutive_frame_ = true;
246           cast_environment_->PostDelayedTask(
247               CastEnvironment::MAIN, FROM_HERE,
248               base::BindOnce(
249                   &FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
250                   AsWeakPtr()),
251               playout_time - now);
252         }
253         return;
254       }
255     }
256 
257     // At this point, we have the complete next frame, or a decodable
258     // frame from somewhere later in the stream, AND we have given up
259     // on waiting for any frames in between, so now we can ACK the frame.
260     framer_.AckFrame(encoded_frame->frame_id);
261 
262     // Decrypt the payload data in the frame, if crypto is being used.
263     if (decryptor_.is_activated()) {
264       std::string decrypted_data;
265       if (!decryptor_.Decrypt(encoded_frame->frame_id,
266                               encoded_frame->data,
267                               &decrypted_data)) {
268         // Decryption failed.  Give up on this frame.
269         framer_.ReleaseFrame(encoded_frame->frame_id);
270         continue;
271       }
272       encoded_frame->data.swap(decrypted_data);
273     }
274 
275     // At this point, we have a decrypted EncodedFrame ready to be emitted.
276     encoded_frame->reference_time = playout_time;
277     framer_.ReleaseFrame(encoded_frame->frame_id);
278     if (encoded_frame->new_playout_delay_ms) {
279       target_playout_delay_ = base::TimeDelta::FromMilliseconds(
280           encoded_frame->new_playout_delay_ms);
281     }
282     cast_environment_->PostTask(
283         CastEnvironment::MAIN, FROM_HERE,
284         base::BindOnce(&FrameReceiver::EmitOneFrame, AsWeakPtr(),
285                        std::move(*frame_request_queue_.begin()),
286                        std::move(encoded_frame)));
287     frame_request_queue_.pop_front();
288   }
289 }
290 
EmitAvailableEncodedFramesAfterWaiting()291 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
292   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
293   DCHECK(is_waiting_for_consecutive_frame_);
294   is_waiting_for_consecutive_frame_ = false;
295   EmitAvailableEncodedFrames();
296 }
297 
EmitOneFrame(ReceiveEncodedFrameCallback callback,std::unique_ptr<EncodedFrame> encoded_frame) const298 void FrameReceiver::EmitOneFrame(
299     ReceiveEncodedFrameCallback callback,
300     std::unique_ptr<EncodedFrame> encoded_frame) const {
301   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
302   if (!callback.is_null())
303     std::move(callback).Run(std::move(encoded_frame));
304 }
305 
GetPlayoutTime(const EncodedFrame & frame) const306 base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const {
307   base::TimeDelta target_playout_delay = target_playout_delay_;
308   if (frame.new_playout_delay_ms) {
309     target_playout_delay = base::TimeDelta::FromMilliseconds(
310         frame.new_playout_delay_ms);
311   }
312   return lip_sync_reference_time_ + lip_sync_drift_.Current() +
313          (frame.rtp_timestamp - lip_sync_rtp_timestamp_)
314              .ToTimeDelta(rtp_timebase_) +
315          target_playout_delay;
316 }
317 
ScheduleNextCastMessage()318 void FrameReceiver::ScheduleNextCastMessage() {
319   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
320   base::TimeTicks send_time;
321   framer_.TimeToSendNextCastMessage(&send_time);
322   base::TimeDelta time_to_send =
323       send_time - cast_environment_->Clock()->NowTicks();
324   time_to_send = std::max(
325       time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
326   cast_environment_->PostDelayedTask(
327       CastEnvironment::MAIN, FROM_HERE,
328       base::BindOnce(&FrameReceiver::SendNextCastMessage, AsWeakPtr()),
329       time_to_send);
330 }
331 
SendNextCastMessage()332 void FrameReceiver::SendNextCastMessage() {
333   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
334   framer_.SendCastMessage();  // Will only send a message if it is time.
335   ScheduleNextCastMessage();
336 }
337 
ScheduleNextRtcpReport()338 void FrameReceiver::ScheduleNextRtcpReport() {
339   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
340 
341   cast_environment_->PostDelayedTask(
342       CastEnvironment::MAIN, FROM_HERE,
343       base::BindOnce(&FrameReceiver::SendNextRtcpReport, AsWeakPtr()),
344       base::TimeDelta::FromMilliseconds(kRtcpReportIntervalMs));
345 }
346 
SendNextRtcpReport()347 void FrameReceiver::SendNextRtcpReport() {
348   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
349   const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
350   RtpReceiverStatistics stats = stats_.GetStatistics();
351   SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
352                  CreateRtcpTimeData(now), nullptr, nullptr, base::TimeDelta(),
353                  nullptr, &stats);
354   ScheduleNextRtcpReport();
355 }
356 
SendRtcpReport(uint32_t rtp_receiver_ssrc,uint32_t rtp_sender_ssrc,const RtcpTimeData & time_data,const RtcpCastMessage * cast_message,const RtcpPliMessage * pli_message,base::TimeDelta target_delay,const ReceiverRtcpEventSubscriber::RtcpEvents * rtcp_events,const RtpReceiverStatistics * rtp_receiver_statistics)357 void FrameReceiver::SendRtcpReport(
358     uint32_t rtp_receiver_ssrc,
359     uint32_t rtp_sender_ssrc,
360     const RtcpTimeData& time_data,
361     const RtcpCastMessage* cast_message,
362     const RtcpPliMessage* pli_message,
363     base::TimeDelta target_delay,
364     const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events,
365     const RtpReceiverStatistics* rtp_receiver_statistics) {
366   transport_->InitializeRtpReceiverRtcpBuilder(rtp_receiver_ssrc, time_data);
367   RtcpReportBlock report_block;
368   if (rtp_receiver_statistics) {
369     report_block.remote_ssrc = 0;  // Not needed to set send side.
370     report_block.media_ssrc =
371         rtp_sender_ssrc;  // SSRC of the RTP packet sender.
372     report_block.fraction_lost = rtp_receiver_statistics->fraction_lost;
373     report_block.cumulative_lost = rtp_receiver_statistics->cumulative_lost;
374     report_block.extended_high_sequence_number =
375         rtp_receiver_statistics->extended_high_sequence_number;
376     report_block.jitter = rtp_receiver_statistics->jitter;
377     report_block.last_sr = rtcp_.last_report_truncated_ntp();
378     base::TimeTicks last_report_received_time =
379         rtcp_.time_last_report_received();
380     if (!last_report_received_time.is_null()) {
381       uint32_t delay_seconds = 0;
382       uint32_t delay_fraction = 0;
383       base::TimeDelta delta = time_data.timestamp - last_report_received_time;
384       ConvertTimeToFractions(delta.InMicroseconds(), &delay_seconds,
385                              &delay_fraction);
386       report_block.delay_since_last_sr =
387           ConvertToNtpDiff(delay_seconds, delay_fraction);
388     } else {
389       report_block.delay_since_last_sr = 0;
390     }
391     transport_->AddRtpReceiverReport(report_block);
392   }
393   if (cast_message)
394     transport_->AddCastFeedback(*cast_message, target_delay);
395   if (pli_message)
396     transport_->AddPli(*pli_message);
397   if (rtcp_events)
398     transport_->AddRtcpEvents(*rtcp_events);
399   transport_->SendRtcpFromRtpReceiver();
400 }
401 
402 }  // namespace cast
403 }  // namespace media
404