1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12 
13 #include <math.h>
14 
15 #include <algorithm>
16 
17 #include "webrtc/base/checks.h"
18 #include "webrtc/base/constructormagic.h"
19 #include "webrtc/base/logging.h"
20 #include "webrtc/base/thread_annotations.h"
21 #include "webrtc/modules/pacing/paced_sender.h"
22 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
23 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
24 #include "webrtc/system_wrappers/include/metrics.h"
25 #include "webrtc/typedefs.h"
26 
27 namespace webrtc {
28 
// Tuning constants used by the abs-send-time based estimator in this file.
enum {
  // Group length, in ms, handed to InterArrival (after conversion to
  // timestamp units).
  kTimestampGroupLengthMs = 5,
  // Number of fractional bits in the 24-bit absolute send time.
  kAbsSendTimeFraction = 18,
  // Left shift that expands the 24-bit send time to a full 32-bit timestamp.
  kAbsSendTimeInterArrivalUpshift = 8,
  // Total number of fractional bits in the 32-bit inter-arrival timestamp.
  kInterArrivalShift = kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift,
  // Window after the first packet during which probes are still considered
  // even when a valid estimate exists.
  kInitialProbingIntervalMs = 2000,
  // Minimum number of deltas a cluster needs before it is reported.
  kMinClusterSize = 4,
  // Probe count limit: bounds the probe list while no cluster has formed, and
  // the number of probe packets that are logged.
  kMaxProbePackets = 15,
  // Once this many clusters have been found, the stored probes are dropped.
  kExpectedNumberOfProbes = 3
};

// Multiplier converting a 32-bit inter-arrival timestamp to milliseconds.
static const double kTimestampToMs = 1000.0 / (1 << kInterArrivalShift);
42 
// Returns a vector holding every key of |map|, in the map's iteration order.
template <typename K, typename V>
std::vector<K> Keys(const std::map<K, V>& map) {
  std::vector<K> result;
  result.reserve(map.size());
  for (const auto& entry : map)
    result.push_back(entry.first);
  return result;
}
53 
ConvertMsTo24Bits(int64_t time_ms)54 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
55   uint32_t time_24_bits =
56       static_cast<uint32_t>(
57           ((static_cast<uint64_t>(time_ms) << kAbsSendTimeFraction) + 500) /
58           1000) &
59       0x00FFFFFF;
60   return time_24_bits;
61 }
62 
IsWithinClusterBounds(int send_delta_ms,const Cluster & cluster_aggregate)63 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
64     int send_delta_ms,
65     const Cluster& cluster_aggregate) {
66     if (cluster_aggregate.count == 0)
67       return true;
68     float cluster_mean = cluster_aggregate.send_mean_ms /
69                          static_cast<float>(cluster_aggregate.count);
70     return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
71   }
72 
AddCluster(std::list<Cluster> * clusters,Cluster * cluster)73   void RemoteBitrateEstimatorAbsSendTime::AddCluster(
74       std::list<Cluster>* clusters,
75       Cluster* cluster) {
76     cluster->send_mean_ms /= static_cast<float>(cluster->count);
77     cluster->recv_mean_ms /= static_cast<float>(cluster->count);
78     cluster->mean_size /= cluster->count;
79     clusters->push_back(*cluster);
80   }
81 
RemoteBitrateEstimatorAbsSendTime(RemoteBitrateObserver * observer,Clock * clock)82   RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
83       RemoteBitrateObserver* observer,
84       Clock* clock)
85       : clock_(clock),
86         observer_(observer),
87         inter_arrival_(),
88         estimator_(),
89         detector_(),
90         incoming_bitrate_(kBitrateWindowMs, 8000),
91         incoming_bitrate_initialized_(false),
92         total_probes_received_(0),
93         first_packet_time_ms_(-1),
94         last_update_ms_(-1),
95         uma_recorded_(false) {
96     RTC_DCHECK(observer_);
97     LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
98     network_thread_.DetachFromThread();
99 }
100 
ComputeClusters(std::list<Cluster> * clusters) const101 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
102     std::list<Cluster>* clusters) const {
103   Cluster current;
104   int64_t prev_send_time = -1;
105   int64_t prev_recv_time = -1;
106   for (std::list<Probe>::const_iterator it = probes_.begin();
107        it != probes_.end();
108        ++it) {
109     if (prev_send_time >= 0) {
110       int send_delta_ms = it->send_time_ms - prev_send_time;
111       int recv_delta_ms = it->recv_time_ms - prev_recv_time;
112       if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
113         ++current.num_above_min_delta;
114       }
115       if (!IsWithinClusterBounds(send_delta_ms, current)) {
116         if (current.count >= kMinClusterSize)
117           AddCluster(clusters, &current);
118         current = Cluster();
119       }
120       current.send_mean_ms += send_delta_ms;
121       current.recv_mean_ms += recv_delta_ms;
122       current.mean_size += it->payload_size;
123       ++current.count;
124     }
125     prev_send_time = it->send_time_ms;
126     prev_recv_time = it->recv_time_ms;
127   }
128   if (current.count >= kMinClusterSize)
129     AddCluster(clusters, &current);
130 }
131 
132 std::list<Cluster>::const_iterator
FindBestProbe(const std::list<Cluster> & clusters) const133 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
134     const std::list<Cluster>& clusters) const {
135   int highest_probe_bitrate_bps = 0;
136   std::list<Cluster>::const_iterator best_it = clusters.end();
137   for (std::list<Cluster>::const_iterator it = clusters.begin();
138        it != clusters.end();
139        ++it) {
140     if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
141       continue;
142     if (it->num_above_min_delta > it->count / 2 &&
143         (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
144          it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
145       int probe_bitrate_bps =
146           std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
147       if (probe_bitrate_bps > highest_probe_bitrate_bps) {
148         highest_probe_bitrate_bps = probe_bitrate_bps;
149         best_it = it;
150       }
151     } else {
152       int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
153       int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
154       LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
155                    << " bps, received at " << recv_bitrate_bps
156                    << " bps. Mean send delta: " << it->send_mean_ms
157                    << " ms, mean recv delta: " << it->recv_mean_ms
158                    << " ms, num probes: " << it->count;
159       break;
160     }
161   }
162   return best_it;
163 }
164 
165 RemoteBitrateEstimatorAbsSendTime::ProbeResult
ProcessClusters(int64_t now_ms)166 RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
167   std::list<Cluster> clusters;
168   ComputeClusters(&clusters);
169   if (clusters.empty()) {
170     // If we reach the max number of probe packets and still have no clusters,
171     // we will remove the oldest one.
172     if (probes_.size() >= kMaxProbePackets)
173       probes_.pop_front();
174     return ProbeResult::kNoUpdate;
175   }
176 
177   std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
178   if (best_it != clusters.end()) {
179     int probe_bitrate_bps =
180         std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
181     // Make sure that a probe sent on a lower bitrate than our estimate can't
182     // reduce the estimate.
183     if (IsBitrateImproving(probe_bitrate_bps)) {
184       LOG(LS_INFO) << "Probe successful, sent at "
185                    << best_it->GetSendBitrateBps() << " bps, received at "
186                    << best_it->GetRecvBitrateBps()
187                    << " bps. Mean send delta: " << best_it->send_mean_ms
188                    << " ms, mean recv delta: " << best_it->recv_mean_ms
189                    << " ms, num probes: " << best_it->count;
190       remote_rate_.SetEstimate(probe_bitrate_bps, now_ms);
191       return ProbeResult::kBitrateUpdated;
192     }
193   }
194 
195   // Not probing and received non-probe packet, or finished with current set
196   // of probes.
197   if (clusters.size() >= kExpectedNumberOfProbes)
198     probes_.clear();
199   return ProbeResult::kNoUpdate;
200 }
201 
IsBitrateImproving(int new_bitrate_bps) const202 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
203     int new_bitrate_bps) const {
204   bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
205   bool bitrate_above_estimate =
206       remote_rate_.ValidEstimate() &&
207       new_bitrate_bps > static_cast<int>(remote_rate_.LatestEstimate());
208   return initial_probe || bitrate_above_estimate;
209 }
210 
IncomingPacketFeedbackVector(const std::vector<PacketInfo> & packet_feedback_vector)211 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector(
212     const std::vector<PacketInfo>& packet_feedback_vector) {
213   RTC_DCHECK(network_thread_.CalledOnValidThread());
214   for (const auto& packet_info : packet_feedback_vector) {
215     IncomingPacketInfo(packet_info.arrival_time_ms,
216                        ConvertMsTo24Bits(packet_info.send_time_ms),
217                        packet_info.payload_size, 0);
218   }
219 }
220 
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header)221 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(
222     int64_t arrival_time_ms,
223     size_t payload_size,
224     const RTPHeader& header) {
225   RTC_DCHECK(network_thread_.CalledOnValidThread());
226   if (!header.extension.hasAbsoluteSendTime) {
227     LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
228                        "is missing absolute send time extension!";
229     return;
230   }
231   IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
232                      payload_size, header.ssrc);
233 }
234 
IncomingPacketInfo(int64_t arrival_time_ms,uint32_t send_time_24bits,size_t payload_size,uint32_t ssrc)235 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
236     int64_t arrival_time_ms,
237     uint32_t send_time_24bits,
238     size_t payload_size,
239     uint32_t ssrc) {
240   RTC_CHECK(send_time_24bits < (1ul << 24));
241   if (!uma_recorded_) {
242     RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram, BweNames::kReceiverAbsSendTime,
243                               BweNames::kBweNamesMax);
244     uma_recorded_ = true;
245   }
246   // Shift up send time to use the full 32 bits that inter_arrival works with,
247   // so wrapping works properly.
248   uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
249   int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;
250 
251   int64_t now_ms = clock_->TimeInMilliseconds();
252   // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
253   // here.
254 
255   // Check if incoming bitrate estimate is valid, and if it needs to be reset.
256   rtc::Optional<uint32_t> incoming_bitrate =
257       incoming_bitrate_.Rate(arrival_time_ms);
258   if (incoming_bitrate) {
259     incoming_bitrate_initialized_ = true;
260   } else if (incoming_bitrate_initialized_) {
261     // Incoming bitrate had a previous valid value, but now not enough data
262     // point are left within the current window. Reset incoming bitrate
263     // estimator so that the window size will only contain new data points.
264     incoming_bitrate_.Reset();
265     incoming_bitrate_initialized_ = false;
266   }
267   incoming_bitrate_.Update(payload_size, arrival_time_ms);
268 
269   if (first_packet_time_ms_ == -1)
270     first_packet_time_ms_ = now_ms;
271 
272   uint32_t ts_delta = 0;
273   int64_t t_delta = 0;
274   int size_delta = 0;
275   bool update_estimate = false;
276   uint32_t target_bitrate_bps = 0;
277   std::vector<uint32_t> ssrcs;
278   {
279     rtc::CritScope lock(&crit_);
280 
281     TimeoutStreams(now_ms);
282     RTC_DCHECK(inter_arrival_.get());
283     RTC_DCHECK(estimator_.get());
284     ssrcs_[ssrc] = now_ms;
285 
286     // For now only try to detect probes while we don't have a valid estimate.
287     // We currently assume that only packets larger than 200 bytes are paced by
288     // the sender.
289     if (payload_size > PacedSender::kMinProbePacketSize &&
290         (!remote_rate_.ValidEstimate() ||
291          now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
292       // TODO(holmer): Use a map instead to get correct order?
293       if (total_probes_received_ < kMaxProbePackets) {
294         int send_delta_ms = -1;
295         int recv_delta_ms = -1;
296         if (!probes_.empty()) {
297           send_delta_ms = send_time_ms - probes_.back().send_time_ms;
298           recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
299         }
300         LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
301                      << " ms, recv time=" << arrival_time_ms
302                      << " ms, send delta=" << send_delta_ms
303                      << " ms, recv delta=" << recv_delta_ms << " ms.";
304       }
305       probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
306       ++total_probes_received_;
307       // Make sure that a probe which updated the bitrate immediately has an
308       // effect by calling the OnReceiveBitrateChanged callback.
309       if (ProcessClusters(now_ms) == ProbeResult::kBitrateUpdated)
310         update_estimate = true;
311     }
312     if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, now_ms,
313                                       payload_size, &ts_delta, &t_delta,
314                                       &size_delta)) {
315       double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
316       estimator_->Update(t_delta, ts_delta_ms, size_delta, detector_.State(),
317                          arrival_time_ms);
318       detector_.Detect(estimator_->offset(), ts_delta_ms,
319                        estimator_->num_of_deltas(), arrival_time_ms);
320     }
321 
322     if (!update_estimate) {
323       // Check if it's time for a periodic update or if we should update because
324       // of an over-use.
325       if (last_update_ms_ == -1 ||
326           now_ms - last_update_ms_ > remote_rate_.GetFeedbackInterval()) {
327         update_estimate = true;
328       } else if (detector_.State() == kBwOverusing) {
329         rtc::Optional<uint32_t> incoming_rate =
330             incoming_bitrate_.Rate(arrival_time_ms);
331         if (incoming_rate &&
332             remote_rate_.TimeToReduceFurther(now_ms, *incoming_rate)) {
333           update_estimate = true;
334         }
335       }
336     }
337 
338     if (update_estimate) {
339       // The first overuse should immediately trigger a new estimate.
340       // We also have to update the estimate immediately if we are overusing
341       // and the target bitrate is too high compared to what we are receiving.
342       const RateControlInput input(detector_.State(),
343                                    incoming_bitrate_.Rate(arrival_time_ms),
344                                    estimator_->var_noise());
345       remote_rate_.Update(&input, now_ms);
346       target_bitrate_bps = remote_rate_.UpdateBandwidthEstimate(now_ms);
347       update_estimate = remote_rate_.ValidEstimate();
348       ssrcs = Keys(ssrcs_);
349     }
350   }
351   if (update_estimate) {
352     last_update_ms_ = now_ms;
353     observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate_bps);
354   }
355 }
356 
Process()357 void RemoteBitrateEstimatorAbsSendTime::Process() {}
358 
TimeUntilNextProcess()359 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
360   const int64_t kDisabledModuleTime = 1000;
361   return kDisabledModuleTime;
362 }
363 
TimeoutStreams(int64_t now_ms)364 void RemoteBitrateEstimatorAbsSendTime::TimeoutStreams(int64_t now_ms) {
365   for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
366     if ((now_ms - it->second) > kStreamTimeOutMs) {
367       ssrcs_.erase(it++);
368     } else {
369       ++it;
370     }
371   }
372   if (ssrcs_.empty()) {
373     // We can't update the estimate if we don't have any active streams.
374     inter_arrival_.reset(
375         new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
376                          kTimestampToMs, true));
377     estimator_.reset(new OveruseEstimator(OverUseDetectorOptions()));
378     // We deliberately don't reset the first_packet_time_ms_ here for now since
379     // we only probe for bandwidth in the beginning of a call right now.
380   }
381 }
382 
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)383 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
384                                                     int64_t max_rtt_ms) {
385   rtc::CritScope lock(&crit_);
386   remote_rate_.SetRtt(avg_rtt_ms);
387 }
388 
RemoveStream(uint32_t ssrc)389 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(uint32_t ssrc) {
390   rtc::CritScope lock(&crit_);
391   ssrcs_.erase(ssrc);
392 }
393 
LatestEstimate(std::vector<uint32_t> * ssrcs,uint32_t * bitrate_bps) const394 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
395     std::vector<uint32_t>* ssrcs,
396     uint32_t* bitrate_bps) const {
397   // Currently accessed from both the process thread (see
398   // ModuleRtpRtcpImpl::Process()) and the configuration thread (see
399   // Call::GetStats()). Should in the future only be accessed from a single
400   // thread.
401   RTC_DCHECK(ssrcs);
402   RTC_DCHECK(bitrate_bps);
403   rtc::CritScope lock(&crit_);
404   if (!remote_rate_.ValidEstimate()) {
405     return false;
406   }
407   *ssrcs = Keys(ssrcs_);
408   if (ssrcs_.empty()) {
409     *bitrate_bps = 0;
410   } else {
411     *bitrate_bps = remote_rate_.LatestEstimate();
412   }
413   return true;
414 }
415 
SetMinBitrate(int min_bitrate_bps)416 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
417   // Called from both the configuration thread and the network thread. Shouldn't
418   // be called from the network thread in the future.
419   rtc::CritScope lock(&crit_);
420   remote_rate_.SetMinBitrate(min_bitrate_bps);
421 }
422 }  // namespace webrtc
423