1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12 
13 #include <cmath>
14 #include <cstdlib>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18 
19 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
20 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
21 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
22 #include "modules/rtp_rtcp/source/time_util.h"
23 #include "rtc_base/logging.h"
24 #include "system_wrappers/include/clock.h"
25 
26 namespace webrtc {
27 namespace {
28 constexpr int64_t kStatisticsTimeoutMs = 8000;
29 constexpr int64_t kStatisticsProcessIntervalMs = 1000;
30 
31 // Number of seconds since 1900 January 1 00:00 GMT (see
32 // https://tools.ietf.org/html/rfc868).
33 constexpr int64_t kNtpJan1970Millisecs = 2'208'988'800'000;
34 }  // namespace
35 
~StreamStatistician()36 StreamStatistician::~StreamStatistician() {}
37 
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)38 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc,
39                                                Clock* clock,
40                                                int max_reordering_threshold)
41     : ssrc_(ssrc),
42       clock_(clock),
43       delta_internal_unix_epoch_ms_(clock_->CurrentNtpInMilliseconds() -
44                                     clock_->TimeInMilliseconds() -
45                                     kNtpJan1970Millisecs),
46       incoming_bitrate_(kStatisticsProcessIntervalMs,
47                         RateStatistics::kBpsScale),
48       max_reordering_threshold_(max_reordering_threshold),
49       enable_retransmit_detection_(false),
50       jitter_q4_(0),
51       cumulative_loss_(0),
52       cumulative_loss_rtcp_offset_(0),
53       last_receive_time_ms_(0),
54       last_received_timestamp_(0),
55       received_seq_first_(-1),
56       received_seq_max_(-1),
57       last_report_cumulative_loss_(0),
58       last_report_seq_max_(-1) {}
59 
60 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
61 
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)62 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
63                                               int64_t sequence_number,
64                                               int64_t now_ms) {
65   // Check if |packet| is second packet of a stream restart.
66   if (received_seq_out_of_order_) {
67     // Count the previous packet as a received; it was postponed below.
68     --cumulative_loss_;
69 
70     uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
71     received_seq_out_of_order_ = absl::nullopt;
72     if (packet.SequenceNumber() == expected_sequence_number) {
73       // Ignore sequence number gap caused by stream restart for packet loss
74       // calculation, by setting received_seq_max_ to the sequence number just
75       // before the out-of-order seqno. This gives a net zero change of
76       // |cumulative_loss_|, for the two packets interpreted as a stream reset.
77       //
78       // Fraction loss for the next report may get a bit off, since we don't
79       // update last_report_seq_max_ and last_report_cumulative_loss_ in a
80       // consistent way.
81       last_report_seq_max_ = sequence_number - 2;
82       received_seq_max_ = sequence_number - 2;
83       return false;
84     }
85   }
86 
87   if (std::abs(sequence_number - received_seq_max_) >
88       max_reordering_threshold_) {
89     // Sequence number gap looks too large, wait until next packet to check
90     // for a stream restart.
91     received_seq_out_of_order_ = packet.SequenceNumber();
92     // Postpone counting this as a received packet until we know how to update
93     // |received_seq_max_|, otherwise we temporarily decrement
94     // |cumulative_loss_|. The
95     // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
96     // |cumulative_loss_| to be unchanged by the reception of the first packet
97     // after stream reset.
98     ++cumulative_loss_;
99     return true;
100   }
101 
102   if (sequence_number > received_seq_max_)
103     return false;
104 
105   // Old out of order packet, may be retransmit.
106   if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
107     receive_counters_.retransmitted.AddPacket(packet);
108   return true;
109 }
110 
UpdateCounters(const RtpPacketReceived & packet)111 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
112   RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
113   int64_t now_ms = clock_->TimeInMilliseconds();
114 
115   incoming_bitrate_.Update(packet.size(), now_ms);
116   receive_counters_.last_packet_received_timestamp_ms = now_ms;
117   receive_counters_.transmitted.AddPacket(packet);
118   --cumulative_loss_;
119 
120   int64_t sequence_number =
121       seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
122 
123   if (!ReceivedRtpPacket()) {
124     received_seq_first_ = sequence_number;
125     last_report_seq_max_ = sequence_number - 1;
126     received_seq_max_ = sequence_number - 1;
127     receive_counters_.first_packet_time_ms = now_ms;
128   } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
129     return;
130   }
131   // In order packet.
132   cumulative_loss_ += sequence_number - received_seq_max_;
133   received_seq_max_ = sequence_number;
134   seq_unwrapper_.UpdateLast(sequence_number);
135 
136   // If new time stamp and more than one in-order packet received, calculate
137   // new jitter statistics.
138   if (packet.Timestamp() != last_received_timestamp_ &&
139       (receive_counters_.transmitted.packets -
140        receive_counters_.retransmitted.packets) > 1) {
141     UpdateJitter(packet, now_ms);
142   }
143   last_received_timestamp_ = packet.Timestamp();
144   last_receive_time_ms_ = now_ms;
145 }
146 
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)147 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
148                                           int64_t receive_time_ms) {
149   int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
150   RTC_DCHECK_GE(receive_diff_ms, 0);
151   uint32_t receive_diff_rtp = static_cast<uint32_t>(
152       (receive_diff_ms * packet.payload_type_frequency()) / 1000);
153   int32_t time_diff_samples =
154       receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
155 
156   time_diff_samples = std::abs(time_diff_samples);
157 
158   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
159   // If this happens, don't update jitter value. Use 5 secs video frequency
160   // as the threshold.
161   if (time_diff_samples < 450000) {
162     // Note we calculate in Q4 to avoid using float.
163     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
164     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
165   }
166 }
167 
SetMaxReorderingThreshold(int max_reordering_threshold)168 void StreamStatisticianImpl::SetMaxReorderingThreshold(
169     int max_reordering_threshold) {
170   max_reordering_threshold_ = max_reordering_threshold;
171 }
172 
EnableRetransmitDetection(bool enable)173 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
174   enable_retransmit_detection_ = enable;
175 }
176 
GetStats() const177 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
178   RtpReceiveStats stats;
179   stats.packets_lost = cumulative_loss_;
180   // TODO(nisse): Can we return a float instead?
181   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
182   stats.jitter = jitter_q4_ >> 4;
183   if (receive_counters_.last_packet_received_timestamp_ms.has_value()) {
184     stats.last_packet_received_timestamp_ms =
185         *receive_counters_.last_packet_received_timestamp_ms +
186         delta_internal_unix_epoch_ms_;
187   }
188   stats.packet_counter = receive_counters_.transmitted;
189   return stats;
190 }
191 
GetActiveStatisticsAndReset(RtcpStatistics * statistics)192 bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
193     RtcpStatistics* statistics) {
194   if (clock_->TimeInMilliseconds() - last_receive_time_ms_ >=
195       kStatisticsTimeoutMs) {
196     // Not active.
197     return false;
198   }
199   if (!ReceivedRtpPacket()) {
200     return false;
201   }
202   *statistics = CalculateRtcpStatistics();
203   return true;
204 }
205 
CalculateRtcpStatistics()206 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
207   RtcpStatistics stats;
208   // Calculate fraction lost.
209   int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
210   RTC_DCHECK_GE(exp_since_last, 0);
211 
212   int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
213   if (exp_since_last > 0 && lost_since_last > 0) {
214     // Scale 0 to 255, where 255 is 100% loss.
215     stats.fraction_lost =
216         static_cast<uint8_t>(255 * lost_since_last / exp_since_last);
217   } else {
218     stats.fraction_lost = 0;
219   }
220 
221   // TODO(danilchap): Ensure |stats.packets_lost| is clamped to fit in a signed
222   // 24-bit value.
223   stats.packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
224   if (stats.packets_lost < 0) {
225     // Clamp to zero. Work around to accomodate for senders that misbehave with
226     // negative cumulative loss.
227     stats.packets_lost = 0;
228     cumulative_loss_rtcp_offset_ = -cumulative_loss_;
229   }
230   stats.extended_highest_sequence_number =
231       static_cast<uint32_t>(received_seq_max_);
232   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
233   stats.jitter = jitter_q4_ >> 4;
234 
235   // Only for report blocks in RTCP SR and RR.
236   last_report_cumulative_loss_ = cumulative_loss_;
237   last_report_seq_max_ = received_seq_max_;
238   BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts",
239                                   clock_->TimeInMilliseconds(),
240                                   cumulative_loss_, ssrc_);
241   BWE_TEST_LOGGING_PLOT_WITH_SSRC(
242       1, "received_seq_max_pkts", clock_->TimeInMilliseconds(),
243       (received_seq_max_ - received_seq_first_), ssrc_);
244 
245   return stats;
246 }
247 
GetFractionLostInPercent() const248 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
249   if (!ReceivedRtpPacket()) {
250     return absl::nullopt;
251   }
252   int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
253   if (expected_packets <= 0) {
254     return absl::nullopt;
255   }
256   if (cumulative_loss_ <= 0) {
257     return 0;
258   }
259   return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
260 }
261 
GetReceiveStreamDataCounters() const262 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
263     const {
264   return receive_counters_;
265 }
266 
BitrateReceived() const267 uint32_t StreamStatisticianImpl::BitrateReceived() const {
268   return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
269 }
270 
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const271 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
272     const RtpPacketReceived& packet,
273     int64_t now_ms) const {
274   uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
275   RTC_DCHECK_GT(frequency_khz, 0);
276 
277   int64_t time_diff_ms = now_ms - last_receive_time_ms_;
278 
279   // Diff in time stamp since last received in order.
280   uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
281   uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
282 
283   int64_t max_delay_ms = 0;
284 
285   // Jitter standard deviation in samples.
286   float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
287 
288   // 2 times the standard deviation => 95% confidence.
289   // And transform to milliseconds by dividing by the frequency in kHz.
290   max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
291 
292   // Min max_delay_ms is 1.
293   if (max_delay_ms == 0) {
294     max_delay_ms = 1;
295   }
296   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
297 }
298 
Create(Clock * clock)299 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
300   return std::make_unique<ReceiveStatisticsLocked>(
301       clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
302         return std::make_unique<StreamStatisticianLocked>(
303             ssrc, clock, max_reordering_threshold);
304       });
305 }
306 
CreateThreadCompatible(Clock * clock)307 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible(
308     Clock* clock) {
309   return std::make_unique<ReceiveStatisticsImpl>(
310       clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
311         return std::make_unique<StreamStatisticianImpl>(
312             ssrc, clock, max_reordering_threshold);
313       });
314 }
315 
ReceiveStatisticsImpl(Clock * clock,std::function<std::unique_ptr<StreamStatisticianImplInterface> (uint32_t ssrc,Clock * clock,int max_reordering_threshold)> stream_statistician_factory)316 ReceiveStatisticsImpl::ReceiveStatisticsImpl(
317     Clock* clock,
318     std::function<std::unique_ptr<StreamStatisticianImplInterface>(
319         uint32_t ssrc,
320         Clock* clock,
321         int max_reordering_threshold)> stream_statistician_factory)
322     : clock_(clock),
323       stream_statistician_factory_(std::move(stream_statistician_factory)),
324       last_returned_ssrc_(0),
325       max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
326 
OnRtpPacket(const RtpPacketReceived & packet)327 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
328   // StreamStatisticianImpl instance is created once and only destroyed when
329   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
330   // it's own locking so don't hold receive_statistics_lock_ (potential
331   // deadlock).
332   GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
333 }
334 
GetStatistician(uint32_t ssrc) const335 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
336     uint32_t ssrc) const {
337   const auto& it = statisticians_.find(ssrc);
338   if (it == statisticians_.end())
339     return nullptr;
340   return it->second.get();
341 }
342 
GetOrCreateStatistician(uint32_t ssrc)343 StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician(
344     uint32_t ssrc) {
345   std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc];
346   if (impl == nullptr) {  // new element
347     impl =
348         stream_statistician_factory_(ssrc, clock_, max_reordering_threshold_);
349   }
350   return impl.get();
351 }
352 
SetMaxReorderingThreshold(int max_reordering_threshold)353 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
354     int max_reordering_threshold) {
355   max_reordering_threshold_ = max_reordering_threshold;
356   for (auto& statistician : statisticians_) {
357     statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
358   }
359 }
360 
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)361 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
362     uint32_t ssrc,
363     int max_reordering_threshold) {
364   GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
365       max_reordering_threshold);
366 }
367 
EnableRetransmitDetection(uint32_t ssrc,bool enable)368 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
369                                                       bool enable) {
370   GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
371 }
372 
RtcpReportBlocks(size_t max_blocks)373 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
374     size_t max_blocks) {
375   std::vector<rtcp::ReportBlock> result;
376   result.reserve(std::min(max_blocks, statisticians_.size()));
377   auto add_report_block = [&result](
378                               uint32_t media_ssrc,
379                               StreamStatisticianImplInterface* statistician) {
380     // Do we have receive statistics to send?
381     RtcpStatistics stats;
382     if (!statistician->GetActiveStatisticsAndReset(&stats))
383       return;
384     result.emplace_back();
385     rtcp::ReportBlock& block = result.back();
386     block.SetMediaSsrc(media_ssrc);
387     block.SetFractionLost(stats.fraction_lost);
388     if (!block.SetCumulativeLost(stats.packets_lost)) {
389       RTC_LOG(LS_WARNING) << "Cumulative lost is oversized.";
390       result.pop_back();
391       return;
392     }
393     block.SetExtHighestSeqNum(stats.extended_highest_sequence_number);
394     block.SetJitter(stats.jitter);
395   };
396 
397   const auto start_it = statisticians_.upper_bound(last_returned_ssrc_);
398   for (auto it = start_it;
399        result.size() < max_blocks && it != statisticians_.end(); ++it)
400     add_report_block(it->first, it->second.get());
401   for (auto it = statisticians_.begin();
402        result.size() < max_blocks && it != start_it; ++it)
403     add_report_block(it->first, it->second.get());
404 
405   if (!result.empty())
406     last_returned_ssrc_ = result.back().source_ssrc();
407   return result;
408 }
409 
410 }  // namespace webrtc
411