1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12 
13 #include <cmath>
14 #include <cstdlib>
15 #include <memory>
16 #include <vector>
17 
18 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
20 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
21 #include "modules/rtp_rtcp/source/time_util.h"
22 #include "rtc_base/logging.h"
23 #include "system_wrappers/include/clock.h"
24 
25 namespace webrtc {
26 
27 const int64_t kStatisticsTimeoutMs = 8000;
28 const int64_t kStatisticsProcessIntervalMs = 1000;
29 
~StreamStatistician()30 StreamStatistician::~StreamStatistician() {}
31 
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)32 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc,
33                                                Clock* clock,
34                                                int max_reordering_threshold)
35     : ssrc_(ssrc),
36       clock_(clock),
37       incoming_bitrate_(kStatisticsProcessIntervalMs,
38                         RateStatistics::kBpsScale),
39       max_reordering_threshold_(max_reordering_threshold),
40       enable_retransmit_detection_(false),
41       jitter_q4_(0),
42       cumulative_loss_(0),
43       cumulative_loss_rtcp_offset_(0),
44       last_receive_time_ms_(0),
45       last_received_timestamp_(0),
46       received_seq_first_(-1),
47       received_seq_max_(-1),
48       last_report_cumulative_loss_(0),
49       last_report_seq_max_(-1) {}
50 
51 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
52 
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)53 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
54                                               int64_t sequence_number,
55                                               int64_t now_ms) {
56   // Check if |packet| is second packet of a stream restart.
57   if (received_seq_out_of_order_) {
58     // Count the previous packet as a received; it was postponed below.
59     --cumulative_loss_;
60 
61     uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
62     received_seq_out_of_order_ = absl::nullopt;
63     if (packet.SequenceNumber() == expected_sequence_number) {
64       // Ignore sequence number gap caused by stream restart for packet loss
65       // calculation, by setting received_seq_max_ to the sequence number just
66       // before the out-of-order seqno. This gives a net zero change of
67       // |cumulative_loss_|, for the two packets interpreted as a stream reset.
68       //
69       // Fraction loss for the next report may get a bit off, since we don't
70       // update last_report_seq_max_ and last_report_cumulative_loss_ in a
71       // consistent way.
72       last_report_seq_max_ = sequence_number - 2;
73       received_seq_max_ = sequence_number - 2;
74       return false;
75     }
76   }
77 
78   if (std::abs(sequence_number - received_seq_max_) >
79       max_reordering_threshold_) {
80     // Sequence number gap looks too large, wait until next packet to check
81     // for a stream restart.
82     received_seq_out_of_order_ = packet.SequenceNumber();
83     // Postpone counting this as a received packet until we know how to update
84     // |received_seq_max_|, otherwise we temporarily decrement
85     // |cumulative_loss_|. The
86     // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
87     // |cumulative_loss_| to be unchanged by the reception of the first packet
88     // after stream reset.
89     ++cumulative_loss_;
90     return true;
91   }
92 
93   if (sequence_number > received_seq_max_)
94     return false;
95 
96   // Old out of order packet, may be retransmit.
97   if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
98     receive_counters_.retransmitted.AddPacket(packet);
99   return true;
100 }
101 
UpdateCounters(const RtpPacketReceived & packet)102 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
103   MutexLock lock(&stream_lock_);
104   RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
105   int64_t now_ms = clock_->TimeInMilliseconds();
106 
107   incoming_bitrate_.Update(packet.size(), now_ms);
108   receive_counters_.last_packet_received_timestamp_ms = now_ms;
109   receive_counters_.transmitted.AddPacket(packet);
110   --cumulative_loss_;
111 
112   int64_t sequence_number =
113       seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
114 
115   if (!ReceivedRtpPacket()) {
116     received_seq_first_ = sequence_number;
117     last_report_seq_max_ = sequence_number - 1;
118     received_seq_max_ = sequence_number - 1;
119     receive_counters_.first_packet_time_ms = now_ms;
120   } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
121     return;
122   }
123   // In order packet.
124   cumulative_loss_ += sequence_number - received_seq_max_;
125   received_seq_max_ = sequence_number;
126   seq_unwrapper_.UpdateLast(sequence_number);
127 
128   // If new time stamp and more than one in-order packet received, calculate
129   // new jitter statistics.
130   if (packet.Timestamp() != last_received_timestamp_ &&
131       (receive_counters_.transmitted.packets -
132        receive_counters_.retransmitted.packets) > 1) {
133     UpdateJitter(packet, now_ms);
134   }
135   last_received_timestamp_ = packet.Timestamp();
136   last_receive_time_ms_ = now_ms;
137 }
138 
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)139 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
140                                           int64_t receive_time_ms) {
141   int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
142   RTC_DCHECK_GE(receive_diff_ms, 0);
143   uint32_t receive_diff_rtp = static_cast<uint32_t>(
144       (receive_diff_ms * packet.payload_type_frequency()) / 1000);
145   int32_t time_diff_samples =
146       receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
147 
148   time_diff_samples = std::abs(time_diff_samples);
149 
150   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
151   // If this happens, don't update jitter value. Use 5 secs video frequency
152   // as the threshold.
153   if (time_diff_samples < 450000) {
154     // Note we calculate in Q4 to avoid using float.
155     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
156     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
157   }
158 }
159 
SetMaxReorderingThreshold(int max_reordering_threshold)160 void StreamStatisticianImpl::SetMaxReorderingThreshold(
161     int max_reordering_threshold) {
162   MutexLock lock(&stream_lock_);
163   max_reordering_threshold_ = max_reordering_threshold;
164 }
165 
EnableRetransmitDetection(bool enable)166 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
167   MutexLock lock(&stream_lock_);
168   enable_retransmit_detection_ = enable;
169 }
170 
GetStats() const171 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
172   MutexLock lock(&stream_lock_);
173   RtpReceiveStats stats;
174   stats.packets_lost = cumulative_loss_;
175   // TODO(nisse): Can we return a float instead?
176   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
177   stats.jitter = jitter_q4_ >> 4;
178   stats.last_packet_received_timestamp_ms =
179       receive_counters_.last_packet_received_timestamp_ms;
180   stats.packet_counter = receive_counters_.transmitted;
181   return stats;
182 }
183 
GetActiveStatisticsAndReset(RtcpStatistics * statistics)184 bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
185     RtcpStatistics* statistics) {
186   MutexLock lock(&stream_lock_);
187   if (clock_->TimeInMilliseconds() - last_receive_time_ms_ >=
188       kStatisticsTimeoutMs) {
189     // Not active.
190     return false;
191   }
192   if (!ReceivedRtpPacket()) {
193     return false;
194   }
195 
196   *statistics = CalculateRtcpStatistics();
197 
198   return true;
199 }
200 
CalculateRtcpStatistics()201 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
202   RtcpStatistics stats;
203   // Calculate fraction lost.
204   int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
205   RTC_DCHECK_GE(exp_since_last, 0);
206 
207   int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
208   if (exp_since_last > 0 && lost_since_last > 0) {
209     // Scale 0 to 255, where 255 is 100% loss.
210     stats.fraction_lost =
211         static_cast<uint8_t>(255 * lost_since_last / exp_since_last);
212   } else {
213     stats.fraction_lost = 0;
214   }
215 
216   // TODO(danilchap): Ensure |stats.packets_lost| is clamped to fit in a signed
217   // 24-bit value.
218   stats.packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
219   if (stats.packets_lost < 0) {
220     // Clamp to zero. Work around to accomodate for senders that misbehave with
221     // negative cumulative loss.
222     stats.packets_lost = 0;
223     cumulative_loss_rtcp_offset_ = -cumulative_loss_;
224   }
225   stats.extended_highest_sequence_number =
226       static_cast<uint32_t>(received_seq_max_);
227   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
228   stats.jitter = jitter_q4_ >> 4;
229 
230   // Only for report blocks in RTCP SR and RR.
231   last_report_cumulative_loss_ = cumulative_loss_;
232   last_report_seq_max_ = received_seq_max_;
233   BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts",
234                                   clock_->TimeInMilliseconds(),
235                                   cumulative_loss_, ssrc_);
236   BWE_TEST_LOGGING_PLOT_WITH_SSRC(
237       1, "received_seq_max_pkts", clock_->TimeInMilliseconds(),
238       (received_seq_max_ - received_seq_first_), ssrc_);
239 
240   return stats;
241 }
242 
GetFractionLostInPercent() const243 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
244   MutexLock lock(&stream_lock_);
245   if (!ReceivedRtpPacket()) {
246     return absl::nullopt;
247   }
248   int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
249   if (expected_packets <= 0) {
250     return absl::nullopt;
251   }
252   if (cumulative_loss_ <= 0) {
253     return 0;
254   }
255   return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
256 }
257 
GetReceiveStreamDataCounters() const258 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
259     const {
260   MutexLock lock(&stream_lock_);
261   return receive_counters_;
262 }
263 
BitrateReceived() const264 uint32_t StreamStatisticianImpl::BitrateReceived() const {
265   MutexLock lock(&stream_lock_);
266   return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
267 }
268 
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const269 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
270     const RtpPacketReceived& packet,
271     int64_t now_ms) const {
272   uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
273   RTC_DCHECK_GT(frequency_khz, 0);
274 
275   int64_t time_diff_ms = now_ms - last_receive_time_ms_;
276 
277   // Diff in time stamp since last received in order.
278   uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
279   uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
280 
281   int64_t max_delay_ms = 0;
282 
283   // Jitter standard deviation in samples.
284   float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
285 
286   // 2 times the standard deviation => 95% confidence.
287   // And transform to milliseconds by dividing by the frequency in kHz.
288   max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
289 
290   // Min max_delay_ms is 1.
291   if (max_delay_ms == 0) {
292     max_delay_ms = 1;
293   }
294   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
295 }
296 
Create(Clock * clock)297 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
298   return std::make_unique<ReceiveStatisticsImpl>(clock);
299 }
300 
ReceiveStatisticsImpl(Clock * clock)301 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
302     : clock_(clock),
303       last_returned_ssrc_(0),
304       max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
305 
~ReceiveStatisticsImpl()306 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
307   while (!statisticians_.empty()) {
308     delete statisticians_.begin()->second;
309     statisticians_.erase(statisticians_.begin());
310   }
311 }
312 
OnRtpPacket(const RtpPacketReceived & packet)313 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
314   // StreamStatisticianImpl instance is created once and only destroyed when
315   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
316   // it's own locking so don't hold receive_statistics_lock_ (potential
317   // deadlock).
318   GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
319 }
320 
GetStatistician(uint32_t ssrc) const321 StreamStatisticianImpl* ReceiveStatisticsImpl::GetStatistician(
322     uint32_t ssrc) const {
323   MutexLock lock(&receive_statistics_lock_);
324   const auto& it = statisticians_.find(ssrc);
325   if (it == statisticians_.end())
326     return NULL;
327   return it->second;
328 }
329 
GetOrCreateStatistician(uint32_t ssrc)330 StreamStatisticianImpl* ReceiveStatisticsImpl::GetOrCreateStatistician(
331     uint32_t ssrc) {
332   MutexLock lock(&receive_statistics_lock_);
333   StreamStatisticianImpl*& impl = statisticians_[ssrc];
334   if (impl == nullptr) {  // new element
335     impl = new StreamStatisticianImpl(ssrc, clock_, max_reordering_threshold_);
336   }
337   return impl;
338 }
339 
SetMaxReorderingThreshold(int max_reordering_threshold)340 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
341     int max_reordering_threshold) {
342   std::map<uint32_t, StreamStatisticianImpl*> statisticians;
343   {
344     MutexLock lock(&receive_statistics_lock_);
345     max_reordering_threshold_ = max_reordering_threshold;
346     statisticians = statisticians_;
347   }
348   for (auto& statistician : statisticians) {
349     statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
350   }
351 }
352 
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)353 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
354     uint32_t ssrc,
355     int max_reordering_threshold) {
356   GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
357       max_reordering_threshold);
358 }
359 
EnableRetransmitDetection(uint32_t ssrc,bool enable)360 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
361                                                       bool enable) {
362   GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
363 }
364 
RtcpReportBlocks(size_t max_blocks)365 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
366     size_t max_blocks) {
367   std::map<uint32_t, StreamStatisticianImpl*> statisticians;
368   {
369     MutexLock lock(&receive_statistics_lock_);
370     statisticians = statisticians_;
371   }
372   std::vector<rtcp::ReportBlock> result;
373   result.reserve(std::min(max_blocks, statisticians.size()));
374   auto add_report_block = [&result](uint32_t media_ssrc,
375                                     StreamStatisticianImpl* statistician) {
376     // Do we have receive statistics to send?
377     RtcpStatistics stats;
378     if (!statistician->GetActiveStatisticsAndReset(&stats))
379       return;
380     result.emplace_back();
381     rtcp::ReportBlock& block = result.back();
382     block.SetMediaSsrc(media_ssrc);
383     block.SetFractionLost(stats.fraction_lost);
384     if (!block.SetCumulativeLost(stats.packets_lost)) {
385       RTC_LOG(LS_WARNING) << "Cumulative lost is oversized.";
386       result.pop_back();
387       return;
388     }
389     block.SetExtHighestSeqNum(stats.extended_highest_sequence_number);
390     block.SetJitter(stats.jitter);
391   };
392 
393   const auto start_it = statisticians.upper_bound(last_returned_ssrc_);
394   for (auto it = start_it;
395        result.size() < max_blocks && it != statisticians.end(); ++it)
396     add_report_block(it->first, it->second);
397   for (auto it = statisticians.begin();
398        result.size() < max_blocks && it != start_it; ++it)
399     add_report_block(it->first, it->second);
400 
401   if (!result.empty())
402     last_returned_ssrc_ = result.back().source_ssrc();
403   return result;
404 }
405 
406 }  // namespace webrtc
407