1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12 
13 #include <math.h>
14 
15 #include <algorithm>
16 
17 #include "api/transport/field_trial_based_config.h"
18 #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
19 #include "rtc_base/checks.h"
20 #include "rtc_base/constructor_magic.h"
21 #include "rtc_base/logging.h"
22 #include "rtc_base/thread_annotations.h"
23 #include "system_wrappers/include/metrics.h"
24 
25 namespace webrtc {
26 namespace {
OptionalRateFromOptionalBps(absl::optional<int> bitrate_bps)27 absl::optional<DataRate> OptionalRateFromOptionalBps(
28     absl::optional<int> bitrate_bps) {
29   if (bitrate_bps) {
30     return DataRate::BitsPerSec(*bitrate_bps);
31   } else {
32     return absl::nullopt;
33   }
34 }
35 }  // namespace
36 
37 enum {
38   kTimestampGroupLengthMs = 5,
39   kAbsSendTimeInterArrivalUpshift = 8,
40   kInterArrivalShift = RTPHeaderExtension::kAbsSendTimeFraction +
41                        kAbsSendTimeInterArrivalUpshift,
42   kInitialProbingIntervalMs = 2000,
43   kMinClusterSize = 4,
44   kMaxProbePackets = 15,
45   kExpectedNumberOfProbes = 3
46 };
47 
48 static const double kTimestampToMs =
49     1000.0 / static_cast<double>(1 << kInterArrivalShift);
50 
// Returns a vector holding every key of `map`, in the map's iteration order
// (ascending for the default comparator). An empty map yields an empty vector.
template <typename K, typename V>
std::vector<K> Keys(const std::map<K, V>& map) {
  std::vector<K> keys;
  // Single allocation: the final size is known up front.
  keys.reserve(map.size());
  for (const auto& entry : map) {
    keys.push_back(entry.first);
  }
  return keys;
}
61 
ConvertMsTo24Bits(int64_t time_ms)62 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
63   uint32_t time_24_bits =
64       static_cast<uint32_t>(((static_cast<uint64_t>(time_ms)
65                               << RTPHeaderExtension::kAbsSendTimeFraction) +
66                              500) /
67                             1000) &
68       0x00FFFFFF;
69   return time_24_bits;
70 }
71 
72 RemoteBitrateEstimatorAbsSendTime::~RemoteBitrateEstimatorAbsSendTime() =
73     default;
74 
IsWithinClusterBounds(int send_delta_ms,const Cluster & cluster_aggregate)75 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
76     int send_delta_ms,
77     const Cluster& cluster_aggregate) {
78   if (cluster_aggregate.count == 0)
79     return true;
80   float cluster_mean = cluster_aggregate.send_mean_ms /
81                        static_cast<float>(cluster_aggregate.count);
82   return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
83 }
84 
AddCluster(std::list<Cluster> * clusters,Cluster * cluster)85 void RemoteBitrateEstimatorAbsSendTime::AddCluster(std::list<Cluster>* clusters,
86                                                    Cluster* cluster) {
87   cluster->send_mean_ms /= static_cast<float>(cluster->count);
88   cluster->recv_mean_ms /= static_cast<float>(cluster->count);
89   cluster->mean_size /= cluster->count;
90   clusters->push_back(*cluster);
91 }
92 
RemoteBitrateEstimatorAbsSendTime(RemoteBitrateObserver * observer,Clock * clock)93 RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
94     RemoteBitrateObserver* observer,
95     Clock* clock)
96     : clock_(clock),
97       observer_(observer),
98       inter_arrival_(),
99       estimator_(),
100       detector_(&field_trials_),
101       incoming_bitrate_(kBitrateWindowMs, 8000),
102       incoming_bitrate_initialized_(false),
103       total_probes_received_(0),
104       first_packet_time_ms_(-1),
105       last_update_ms_(-1),
106       uma_recorded_(false),
107       remote_rate_(&field_trials_) {
108   RTC_DCHECK(clock_);
109   RTC_DCHECK(observer_);
110   RTC_LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
111 }
112 
ComputeClusters(std::list<Cluster> * clusters) const113 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
114     std::list<Cluster>* clusters) const {
115   Cluster current;
116   int64_t prev_send_time = -1;
117   int64_t prev_recv_time = -1;
118   for (std::list<Probe>::const_iterator it = probes_.begin();
119        it != probes_.end(); ++it) {
120     if (prev_send_time >= 0) {
121       int send_delta_ms = it->send_time_ms - prev_send_time;
122       int recv_delta_ms = it->recv_time_ms - prev_recv_time;
123       if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
124         ++current.num_above_min_delta;
125       }
126       if (!IsWithinClusterBounds(send_delta_ms, current)) {
127         if (current.count >= kMinClusterSize && current.send_mean_ms > 0.0f &&
128             current.recv_mean_ms > 0.0f) {
129           AddCluster(clusters, &current);
130         }
131         current = Cluster();
132       }
133       current.send_mean_ms += send_delta_ms;
134       current.recv_mean_ms += recv_delta_ms;
135       current.mean_size += it->payload_size;
136       ++current.count;
137     }
138     prev_send_time = it->send_time_ms;
139     prev_recv_time = it->recv_time_ms;
140   }
141   if (current.count >= kMinClusterSize && current.send_mean_ms > 0.0f &&
142       current.recv_mean_ms > 0.0f) {
143     AddCluster(clusters, &current);
144   }
145 }
146 
147 std::list<Cluster>::const_iterator
FindBestProbe(const std::list<Cluster> & clusters) const148 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
149     const std::list<Cluster>& clusters) const {
150   int highest_probe_bitrate_bps = 0;
151   std::list<Cluster>::const_iterator best_it = clusters.end();
152   for (std::list<Cluster>::const_iterator it = clusters.begin();
153        it != clusters.end(); ++it) {
154     if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
155       continue;
156     if (it->num_above_min_delta > it->count / 2 &&
157         (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
158          it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
159       int probe_bitrate_bps =
160           std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
161       if (probe_bitrate_bps > highest_probe_bitrate_bps) {
162         highest_probe_bitrate_bps = probe_bitrate_bps;
163         best_it = it;
164       }
165     } else {
166       int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
167       int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
168       RTC_LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
169                        << " bps, received at " << recv_bitrate_bps
170                        << " bps. Mean send delta: " << it->send_mean_ms
171                        << " ms, mean recv delta: " << it->recv_mean_ms
172                        << " ms, num probes: " << it->count;
173       break;
174     }
175   }
176   return best_it;
177 }
178 
179 RemoteBitrateEstimatorAbsSendTime::ProbeResult
ProcessClusters(int64_t now_ms)180 RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
181   std::list<Cluster> clusters;
182   ComputeClusters(&clusters);
183   if (clusters.empty()) {
184     // If we reach the max number of probe packets and still have no clusters,
185     // we will remove the oldest one.
186     if (probes_.size() >= kMaxProbePackets)
187       probes_.pop_front();
188     return ProbeResult::kNoUpdate;
189   }
190 
191   std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
192   if (best_it != clusters.end()) {
193     int probe_bitrate_bps =
194         std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
195     // Make sure that a probe sent on a lower bitrate than our estimate can't
196     // reduce the estimate.
197     if (IsBitrateImproving(probe_bitrate_bps)) {
198       RTC_LOG(LS_INFO) << "Probe successful, sent at "
199                        << best_it->GetSendBitrateBps() << " bps, received at "
200                        << best_it->GetRecvBitrateBps()
201                        << " bps. Mean send delta: " << best_it->send_mean_ms
202                        << " ms, mean recv delta: " << best_it->recv_mean_ms
203                        << " ms, num probes: " << best_it->count;
204       remote_rate_.SetEstimate(DataRate::BitsPerSec(probe_bitrate_bps),
205                                Timestamp::Millis(now_ms));
206       return ProbeResult::kBitrateUpdated;
207     }
208   }
209 
210   // Not probing and received non-probe packet, or finished with current set
211   // of probes.
212   if (clusters.size() >= kExpectedNumberOfProbes)
213     probes_.clear();
214   return ProbeResult::kNoUpdate;
215 }
216 
IsBitrateImproving(int new_bitrate_bps) const217 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
218     int new_bitrate_bps) const {
219   bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
220   bool bitrate_above_estimate =
221       remote_rate_.ValidEstimate() &&
222       new_bitrate_bps > remote_rate_.LatestEstimate().bps<int>();
223   return initial_probe || bitrate_above_estimate;
224 }
225 
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header)226 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(
227     int64_t arrival_time_ms,
228     size_t payload_size,
229     const RTPHeader& header) {
230   RTC_DCHECK_RUNS_SERIALIZED(&network_race_);
231   if (!header.extension.hasAbsoluteSendTime) {
232     RTC_LOG(LS_WARNING)
233         << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
234            "is missing absolute send time extension!";
235     return;
236   }
237   IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
238                      payload_size, header.ssrc);
239 }
240 
IncomingPacketInfo(int64_t arrival_time_ms,uint32_t send_time_24bits,size_t payload_size,uint32_t ssrc)241 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
242     int64_t arrival_time_ms,
243     uint32_t send_time_24bits,
244     size_t payload_size,
245     uint32_t ssrc) {
246   RTC_CHECK(send_time_24bits < (1ul << 24));
247   if (!uma_recorded_) {
248     RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram, BweNames::kReceiverAbsSendTime,
249                               BweNames::kBweNamesMax);
250     uma_recorded_ = true;
251   }
252   // Shift up send time to use the full 32 bits that inter_arrival works with,
253   // so wrapping works properly.
254   uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
255   int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;
256 
257   int64_t now_ms = clock_->TimeInMilliseconds();
258   // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
259   // here.
260 
261   // Check if incoming bitrate estimate is valid, and if it needs to be reset.
262   absl::optional<uint32_t> incoming_bitrate =
263       incoming_bitrate_.Rate(arrival_time_ms);
264   if (incoming_bitrate) {
265     incoming_bitrate_initialized_ = true;
266   } else if (incoming_bitrate_initialized_) {
267     // Incoming bitrate had a previous valid value, but now not enough data
268     // point are left within the current window. Reset incoming bitrate
269     // estimator so that the window size will only contain new data points.
270     incoming_bitrate_.Reset();
271     incoming_bitrate_initialized_ = false;
272   }
273   incoming_bitrate_.Update(payload_size, arrival_time_ms);
274 
275   if (first_packet_time_ms_ == -1)
276     first_packet_time_ms_ = now_ms;
277 
278   uint32_t ts_delta = 0;
279   int64_t t_delta = 0;
280   int size_delta = 0;
281   bool update_estimate = false;
282   uint32_t target_bitrate_bps = 0;
283   std::vector<uint32_t> ssrcs;
284   {
285     MutexLock lock(&mutex_);
286 
287     TimeoutStreams(now_ms);
288     RTC_DCHECK(inter_arrival_.get());
289     RTC_DCHECK(estimator_.get());
290     ssrcs_[ssrc] = now_ms;
291 
292     // For now only try to detect probes while we don't have a valid estimate.
293     // We currently assume that only packets larger than 200 bytes are paced by
294     // the sender.
295     const size_t kMinProbePacketSize = 200;
296     if (payload_size > kMinProbePacketSize &&
297         (!remote_rate_.ValidEstimate() ||
298          now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
299       // TODO(holmer): Use a map instead to get correct order?
300       if (total_probes_received_ < kMaxProbePackets) {
301         int send_delta_ms = -1;
302         int recv_delta_ms = -1;
303         if (!probes_.empty()) {
304           send_delta_ms = send_time_ms - probes_.back().send_time_ms;
305           recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
306         }
307         RTC_LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
308                          << " ms, recv time=" << arrival_time_ms
309                          << " ms, send delta=" << send_delta_ms
310                          << " ms, recv delta=" << recv_delta_ms << " ms.";
311       }
312       probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
313       ++total_probes_received_;
314       // Make sure that a probe which updated the bitrate immediately has an
315       // effect by calling the OnReceiveBitrateChanged callback.
316       if (ProcessClusters(now_ms) == ProbeResult::kBitrateUpdated)
317         update_estimate = true;
318     }
319     if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, now_ms,
320                                       payload_size, &ts_delta, &t_delta,
321                                       &size_delta)) {
322       double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
323       estimator_->Update(t_delta, ts_delta_ms, size_delta, detector_.State(),
324                          arrival_time_ms);
325       detector_.Detect(estimator_->offset(), ts_delta_ms,
326                        estimator_->num_of_deltas(), arrival_time_ms);
327     }
328 
329     if (!update_estimate) {
330       // Check if it's time for a periodic update or if we should update because
331       // of an over-use.
332       if (last_update_ms_ == -1 ||
333           now_ms - last_update_ms_ > remote_rate_.GetFeedbackInterval().ms()) {
334         update_estimate = true;
335       } else if (detector_.State() == BandwidthUsage::kBwOverusing) {
336         absl::optional<uint32_t> incoming_rate =
337             incoming_bitrate_.Rate(arrival_time_ms);
338         if (incoming_rate && remote_rate_.TimeToReduceFurther(
339                                  Timestamp::Millis(now_ms),
340                                  DataRate::BitsPerSec(*incoming_rate))) {
341           update_estimate = true;
342         }
343       }
344     }
345 
346     if (update_estimate) {
347       // The first overuse should immediately trigger a new estimate.
348       // We also have to update the estimate immediately if we are overusing
349       // and the target bitrate is too high compared to what we are receiving.
350       const RateControlInput input(
351           detector_.State(),
352           OptionalRateFromOptionalBps(incoming_bitrate_.Rate(arrival_time_ms)));
353       target_bitrate_bps =
354           remote_rate_.Update(&input, Timestamp::Millis(now_ms))
355               .bps<uint32_t>();
356       update_estimate = remote_rate_.ValidEstimate();
357       ssrcs = Keys(ssrcs_);
358     }
359   }
360   if (update_estimate) {
361     last_update_ms_ = now_ms;
362     observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate_bps);
363   }
364 }
365 
Process()366 void RemoteBitrateEstimatorAbsSendTime::Process() {}
367 
TimeUntilNextProcess()368 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
369   const int64_t kDisabledModuleTime = 1000;
370   return kDisabledModuleTime;
371 }
372 
TimeoutStreams(int64_t now_ms)373 void RemoteBitrateEstimatorAbsSendTime::TimeoutStreams(int64_t now_ms) {
374   for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
375     if ((now_ms - it->second) > kStreamTimeOutMs) {
376       ssrcs_.erase(it++);
377     } else {
378       ++it;
379     }
380   }
381   if (ssrcs_.empty()) {
382     // We can't update the estimate if we don't have any active streams.
383     inter_arrival_.reset(
384         new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
385                          kTimestampToMs, true));
386     estimator_.reset(new OveruseEstimator(OverUseDetectorOptions()));
387     // We deliberately don't reset the first_packet_time_ms_ here for now since
388     // we only probe for bandwidth in the beginning of a call right now.
389   }
390 }
391 
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)392 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
393                                                     int64_t max_rtt_ms) {
394   MutexLock lock(&mutex_);
395   remote_rate_.SetRtt(TimeDelta::Millis(avg_rtt_ms));
396 }
397 
RemoveStream(uint32_t ssrc)398 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(uint32_t ssrc) {
399   MutexLock lock(&mutex_);
400   ssrcs_.erase(ssrc);
401 }
402 
LatestEstimate(std::vector<uint32_t> * ssrcs,uint32_t * bitrate_bps) const403 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
404     std::vector<uint32_t>* ssrcs,
405     uint32_t* bitrate_bps) const {
406   // Currently accessed from both the process thread (see
407   // ModuleRtpRtcpImpl::Process()) and the configuration thread (see
408   // Call::GetStats()). Should in the future only be accessed from a single
409   // thread.
410   RTC_DCHECK(ssrcs);
411   RTC_DCHECK(bitrate_bps);
412   MutexLock lock(&mutex_);
413   if (!remote_rate_.ValidEstimate()) {
414     return false;
415   }
416   *ssrcs = Keys(ssrcs_);
417   if (ssrcs_.empty()) {
418     *bitrate_bps = 0;
419   } else {
420     *bitrate_bps = remote_rate_.LatestEstimate().bps<uint32_t>();
421   }
422   return true;
423 }
424 
SetMinBitrate(int min_bitrate_bps)425 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
426   // Called from both the configuration thread and the network thread. Shouldn't
427   // be called from the network thread in the future.
428   MutexLock lock(&mutex_);
429   remote_rate_.SetMinBitrate(DataRate::BitsPerSec(min_bitrate_bps));
430 }
431 }  // namespace webrtc
432