1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12
#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>
18
19 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
20 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
21 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
22 #include "modules/rtp_rtcp/source/time_util.h"
23 #include "rtc_base/logging.h"
24 #include "system_wrappers/include/clock.h"
25
26 namespace webrtc {
27 namespace {
// A stream whose last in-order packet is at least this old (in milliseconds)
// is reported as "not active" by GetActiveStatisticsAndReset().
constexpr int64_t kStatisticsTimeoutMs{8000};

// First constructor argument of the incoming bitrate RateStatistics
// (presumably its averaging window, in milliseconds).
constexpr int64_t kStatisticsProcessIntervalMs{1000};

// Number of milliseconds between 1900 January 1 00:00 GMT (the NTP epoch) and
// 1970 January 1 00:00 GMT (the Unix epoch), i.e. the 2'208'988'800 seconds
// given in https://tools.ietf.org/html/rfc868, expressed in milliseconds.
constexpr int64_t kNtpJan1970Millisecs{2'208'988'800'000};
34 } // namespace
35
~StreamStatistician()36 StreamStatistician::~StreamStatistician() {}
37
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)38 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc,
39 Clock* clock,
40 int max_reordering_threshold)
41 : ssrc_(ssrc),
42 clock_(clock),
43 delta_internal_unix_epoch_ms_(clock_->CurrentNtpInMilliseconds() -
44 clock_->TimeInMilliseconds() -
45 kNtpJan1970Millisecs),
46 incoming_bitrate_(kStatisticsProcessIntervalMs,
47 RateStatistics::kBpsScale),
48 max_reordering_threshold_(max_reordering_threshold),
49 enable_retransmit_detection_(false),
50 jitter_q4_(0),
51 cumulative_loss_(0),
52 cumulative_loss_rtcp_offset_(0),
53 last_receive_time_ms_(0),
54 last_received_timestamp_(0),
55 received_seq_first_(-1),
56 received_seq_max_(-1),
57 last_report_cumulative_loss_(0),
58 last_report_seq_max_(-1) {}
59
60 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
61
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)62 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
63 int64_t sequence_number,
64 int64_t now_ms) {
65 // Check if |packet| is second packet of a stream restart.
66 if (received_seq_out_of_order_) {
67 // Count the previous packet as a received; it was postponed below.
68 --cumulative_loss_;
69
70 uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
71 received_seq_out_of_order_ = absl::nullopt;
72 if (packet.SequenceNumber() == expected_sequence_number) {
73 // Ignore sequence number gap caused by stream restart for packet loss
74 // calculation, by setting received_seq_max_ to the sequence number just
75 // before the out-of-order seqno. This gives a net zero change of
76 // |cumulative_loss_|, for the two packets interpreted as a stream reset.
77 //
78 // Fraction loss for the next report may get a bit off, since we don't
79 // update last_report_seq_max_ and last_report_cumulative_loss_ in a
80 // consistent way.
81 last_report_seq_max_ = sequence_number - 2;
82 received_seq_max_ = sequence_number - 2;
83 return false;
84 }
85 }
86
87 if (std::abs(sequence_number - received_seq_max_) >
88 max_reordering_threshold_) {
89 // Sequence number gap looks too large, wait until next packet to check
90 // for a stream restart.
91 received_seq_out_of_order_ = packet.SequenceNumber();
92 // Postpone counting this as a received packet until we know how to update
93 // |received_seq_max_|, otherwise we temporarily decrement
94 // |cumulative_loss_|. The
95 // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
96 // |cumulative_loss_| to be unchanged by the reception of the first packet
97 // after stream reset.
98 ++cumulative_loss_;
99 return true;
100 }
101
102 if (sequence_number > received_seq_max_)
103 return false;
104
105 // Old out of order packet, may be retransmit.
106 if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
107 receive_counters_.retransmitted.AddPacket(packet);
108 return true;
109 }
110
UpdateCounters(const RtpPacketReceived & packet)111 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
112 RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
113 int64_t now_ms = clock_->TimeInMilliseconds();
114
115 incoming_bitrate_.Update(packet.size(), now_ms);
116 receive_counters_.last_packet_received_timestamp_ms = now_ms;
117 receive_counters_.transmitted.AddPacket(packet);
118 --cumulative_loss_;
119
120 int64_t sequence_number =
121 seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
122
123 if (!ReceivedRtpPacket()) {
124 received_seq_first_ = sequence_number;
125 last_report_seq_max_ = sequence_number - 1;
126 received_seq_max_ = sequence_number - 1;
127 receive_counters_.first_packet_time_ms = now_ms;
128 } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
129 return;
130 }
131 // In order packet.
132 cumulative_loss_ += sequence_number - received_seq_max_;
133 received_seq_max_ = sequence_number;
134 seq_unwrapper_.UpdateLast(sequence_number);
135
136 // If new time stamp and more than one in-order packet received, calculate
137 // new jitter statistics.
138 if (packet.Timestamp() != last_received_timestamp_ &&
139 (receive_counters_.transmitted.packets -
140 receive_counters_.retransmitted.packets) > 1) {
141 UpdateJitter(packet, now_ms);
142 }
143 last_received_timestamp_ = packet.Timestamp();
144 last_receive_time_ms_ = now_ms;
145 }
146
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)147 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
148 int64_t receive_time_ms) {
149 int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
150 RTC_DCHECK_GE(receive_diff_ms, 0);
151 uint32_t receive_diff_rtp = static_cast<uint32_t>(
152 (receive_diff_ms * packet.payload_type_frequency()) / 1000);
153 int32_t time_diff_samples =
154 receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
155
156 time_diff_samples = std::abs(time_diff_samples);
157
158 // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
159 // If this happens, don't update jitter value. Use 5 secs video frequency
160 // as the threshold.
161 if (time_diff_samples < 450000) {
162 // Note we calculate in Q4 to avoid using float.
163 int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
164 jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
165 }
166 }
167
SetMaxReorderingThreshold(int max_reordering_threshold)168 void StreamStatisticianImpl::SetMaxReorderingThreshold(
169 int max_reordering_threshold) {
170 max_reordering_threshold_ = max_reordering_threshold;
171 }
172
EnableRetransmitDetection(bool enable)173 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
174 enable_retransmit_detection_ = enable;
175 }
176
GetStats() const177 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
178 RtpReceiveStats stats;
179 stats.packets_lost = cumulative_loss_;
180 // TODO(nisse): Can we return a float instead?
181 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
182 stats.jitter = jitter_q4_ >> 4;
183 if (receive_counters_.last_packet_received_timestamp_ms.has_value()) {
184 stats.last_packet_received_timestamp_ms =
185 *receive_counters_.last_packet_received_timestamp_ms +
186 delta_internal_unix_epoch_ms_;
187 }
188 stats.packet_counter = receive_counters_.transmitted;
189 return stats;
190 }
191
GetActiveStatisticsAndReset(RtcpStatistics * statistics)192 bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
193 RtcpStatistics* statistics) {
194 if (clock_->TimeInMilliseconds() - last_receive_time_ms_ >=
195 kStatisticsTimeoutMs) {
196 // Not active.
197 return false;
198 }
199 if (!ReceivedRtpPacket()) {
200 return false;
201 }
202 *statistics = CalculateRtcpStatistics();
203 return true;
204 }
205
CalculateRtcpStatistics()206 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
207 RtcpStatistics stats;
208 // Calculate fraction lost.
209 int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
210 RTC_DCHECK_GE(exp_since_last, 0);
211
212 int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
213 if (exp_since_last > 0 && lost_since_last > 0) {
214 // Scale 0 to 255, where 255 is 100% loss.
215 stats.fraction_lost =
216 static_cast<uint8_t>(255 * lost_since_last / exp_since_last);
217 } else {
218 stats.fraction_lost = 0;
219 }
220
221 // TODO(danilchap): Ensure |stats.packets_lost| is clamped to fit in a signed
222 // 24-bit value.
223 stats.packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
224 if (stats.packets_lost < 0) {
225 // Clamp to zero. Work around to accomodate for senders that misbehave with
226 // negative cumulative loss.
227 stats.packets_lost = 0;
228 cumulative_loss_rtcp_offset_ = -cumulative_loss_;
229 }
230 stats.extended_highest_sequence_number =
231 static_cast<uint32_t>(received_seq_max_);
232 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
233 stats.jitter = jitter_q4_ >> 4;
234
235 // Only for report blocks in RTCP SR and RR.
236 last_report_cumulative_loss_ = cumulative_loss_;
237 last_report_seq_max_ = received_seq_max_;
238 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts",
239 clock_->TimeInMilliseconds(),
240 cumulative_loss_, ssrc_);
241 BWE_TEST_LOGGING_PLOT_WITH_SSRC(
242 1, "received_seq_max_pkts", clock_->TimeInMilliseconds(),
243 (received_seq_max_ - received_seq_first_), ssrc_);
244
245 return stats;
246 }
247
GetFractionLostInPercent() const248 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
249 if (!ReceivedRtpPacket()) {
250 return absl::nullopt;
251 }
252 int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
253 if (expected_packets <= 0) {
254 return absl::nullopt;
255 }
256 if (cumulative_loss_ <= 0) {
257 return 0;
258 }
259 return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
260 }
261
GetReceiveStreamDataCounters() const262 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
263 const {
264 return receive_counters_;
265 }
266
BitrateReceived() const267 uint32_t StreamStatisticianImpl::BitrateReceived() const {
268 return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
269 }
270
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const271 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
272 const RtpPacketReceived& packet,
273 int64_t now_ms) const {
274 uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
275 RTC_DCHECK_GT(frequency_khz, 0);
276
277 int64_t time_diff_ms = now_ms - last_receive_time_ms_;
278
279 // Diff in time stamp since last received in order.
280 uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
281 uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
282
283 int64_t max_delay_ms = 0;
284
285 // Jitter standard deviation in samples.
286 float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
287
288 // 2 times the standard deviation => 95% confidence.
289 // And transform to milliseconds by dividing by the frequency in kHz.
290 max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
291
292 // Min max_delay_ms is 1.
293 if (max_delay_ms == 0) {
294 max_delay_ms = 1;
295 }
296 return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
297 }
298
Create(Clock * clock)299 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
300 return std::make_unique<ReceiveStatisticsLocked>(
301 clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
302 return std::make_unique<StreamStatisticianLocked>(
303 ssrc, clock, max_reordering_threshold);
304 });
305 }
306
CreateThreadCompatible(Clock * clock)307 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible(
308 Clock* clock) {
309 return std::make_unique<ReceiveStatisticsImpl>(
310 clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
311 return std::make_unique<StreamStatisticianImpl>(
312 ssrc, clock, max_reordering_threshold);
313 });
314 }
315
ReceiveStatisticsImpl(Clock * clock,std::function<std::unique_ptr<StreamStatisticianImplInterface> (uint32_t ssrc,Clock * clock,int max_reordering_threshold)> stream_statistician_factory)316 ReceiveStatisticsImpl::ReceiveStatisticsImpl(
317 Clock* clock,
318 std::function<std::unique_ptr<StreamStatisticianImplInterface>(
319 uint32_t ssrc,
320 Clock* clock,
321 int max_reordering_threshold)> stream_statistician_factory)
322 : clock_(clock),
323 stream_statistician_factory_(std::move(stream_statistician_factory)),
324 last_returned_ssrc_(0),
325 max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
326
OnRtpPacket(const RtpPacketReceived & packet)327 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
328 // StreamStatisticianImpl instance is created once and only destroyed when
329 // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
330 // it's own locking so don't hold receive_statistics_lock_ (potential
331 // deadlock).
332 GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
333 }
334
GetStatistician(uint32_t ssrc) const335 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
336 uint32_t ssrc) const {
337 const auto& it = statisticians_.find(ssrc);
338 if (it == statisticians_.end())
339 return nullptr;
340 return it->second.get();
341 }
342
GetOrCreateStatistician(uint32_t ssrc)343 StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician(
344 uint32_t ssrc) {
345 std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc];
346 if (impl == nullptr) { // new element
347 impl =
348 stream_statistician_factory_(ssrc, clock_, max_reordering_threshold_);
349 }
350 return impl.get();
351 }
352
SetMaxReorderingThreshold(int max_reordering_threshold)353 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
354 int max_reordering_threshold) {
355 max_reordering_threshold_ = max_reordering_threshold;
356 for (auto& statistician : statisticians_) {
357 statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
358 }
359 }
360
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)361 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
362 uint32_t ssrc,
363 int max_reordering_threshold) {
364 GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
365 max_reordering_threshold);
366 }
367
EnableRetransmitDetection(uint32_t ssrc,bool enable)368 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
369 bool enable) {
370 GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
371 }
372
RtcpReportBlocks(size_t max_blocks)373 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
374 size_t max_blocks) {
375 std::vector<rtcp::ReportBlock> result;
376 result.reserve(std::min(max_blocks, statisticians_.size()));
377 auto add_report_block = [&result](
378 uint32_t media_ssrc,
379 StreamStatisticianImplInterface* statistician) {
380 // Do we have receive statistics to send?
381 RtcpStatistics stats;
382 if (!statistician->GetActiveStatisticsAndReset(&stats))
383 return;
384 result.emplace_back();
385 rtcp::ReportBlock& block = result.back();
386 block.SetMediaSsrc(media_ssrc);
387 block.SetFractionLost(stats.fraction_lost);
388 if (!block.SetCumulativeLost(stats.packets_lost)) {
389 RTC_LOG(LS_WARNING) << "Cumulative lost is oversized.";
390 result.pop_back();
391 return;
392 }
393 block.SetExtHighestSeqNum(stats.extended_highest_sequence_number);
394 block.SetJitter(stats.jitter);
395 };
396
397 const auto start_it = statisticians_.upper_bound(last_returned_ssrc_);
398 for (auto it = start_it;
399 result.size() < max_blocks && it != statisticians_.end(); ++it)
400 add_report_block(it->first, it->second.get());
401 for (auto it = statisticians_.begin();
402 result.size() < max_blocks && it != start_it; ++it)
403 add_report_block(it->first, it->second.get());
404
405 if (!result.empty())
406 last_returned_ssrc_ = result.back().source_ssrc();
407 return result;
408 }
409
410 } // namespace webrtc
411