1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/nqe/observation_buffer.h"
6 
7 #include <float.h>
8 
9 #include <algorithm>
10 #include <utility>
11 
12 #include "base/macros.h"
13 #include "base/numerics/ranges.h"
14 #include "base/stl_util.h"
15 #include "base/time/default_tick_clock.h"
16 #include "base/time/time.h"
17 #include "net/nqe/network_quality_estimator_params.h"
18 #include "net/nqe/weighted_observation.h"
19 
20 namespace net {
21 
22 namespace nqe {
23 
24 namespace internal {
25 CanonicalStats::CanonicalStats() = default;
26 
CanonicalStats(std::map<int32_t,int32_t> & canonical_pcts,int32_t most_recent_val,size_t observation_count)27 CanonicalStats::CanonicalStats(std::map<int32_t, int32_t>& canonical_pcts,
28                                int32_t most_recent_val,
29                                size_t observation_count)
30     : canonical_pcts(canonical_pcts),
31       most_recent_val(most_recent_val),
32       observation_count(observation_count) {}
33 
CanonicalStats(const CanonicalStats & other)34 CanonicalStats::CanonicalStats(const CanonicalStats& other)
35     : canonical_pcts(other.canonical_pcts),
36       most_recent_val(other.most_recent_val),
37       observation_count(other.observation_count) {}
38 
39 CanonicalStats::~CanonicalStats() = default;
40 
41 CanonicalStats& CanonicalStats::operator=(const CanonicalStats& other) =
42     default;
43 
ObservationBuffer(const NetworkQualityEstimatorParams * params,const base::TickClock * tick_clock,double weight_multiplier_per_second,double weight_multiplier_per_signal_level)44 ObservationBuffer::ObservationBuffer(
45     const NetworkQualityEstimatorParams* params,
46     const base::TickClock* tick_clock,
47     double weight_multiplier_per_second,
48     double weight_multiplier_per_signal_level)
49     : params_(params),
50       weight_multiplier_per_second_(weight_multiplier_per_second),
51       weight_multiplier_per_signal_level_(weight_multiplier_per_signal_level),
52       tick_clock_(tick_clock) {
53   DCHECK_LT(0u, params_->observation_buffer_size());
54   DCHECK_LE(0.0, weight_multiplier_per_second_);
55   DCHECK_GE(1.0, weight_multiplier_per_second_);
56   DCHECK_LE(0.0, weight_multiplier_per_signal_level_);
57   DCHECK_GE(1.0, weight_multiplier_per_signal_level_);
58   DCHECK(params_);
59   DCHECK(tick_clock_);
60 }
61 
ObservationBuffer(const ObservationBuffer & other)62 ObservationBuffer::ObservationBuffer(const ObservationBuffer& other)
63     : params_(other.params_),
64       weight_multiplier_per_second_(other.weight_multiplier_per_second_),
65       weight_multiplier_per_signal_level_(
66           other.weight_multiplier_per_signal_level_),
67       tick_clock_(other.tick_clock_) {
68   DCHECK(other.observations_.empty());
69 }
70 
71 ObservationBuffer::~ObservationBuffer() = default;
72 
AddObservation(const Observation & observation)73 void ObservationBuffer::AddObservation(const Observation& observation) {
74   DCHECK_LE(observations_.size(), params_->observation_buffer_size());
75 
76   // Observations must be in the non-decreasing order of the timestamps.
77   DCHECK(observations_.empty() ||
78          observation.timestamp() >= observations_.back().timestamp());
79 
80   DCHECK(observation.signal_strength() == INT32_MIN ||
81          (observation.signal_strength() >= 0 &&
82           observation.signal_strength() <= 4));
83 
84   // Evict the oldest element if the buffer is already full.
85   if (observations_.size() == params_->observation_buffer_size())
86     observations_.pop_front();
87 
88   observations_.push_back(observation);
89   DCHECK_LE(observations_.size(), params_->observation_buffer_size());
90 }
91 
GetPercentile(base::TimeTicks begin_timestamp,int32_t current_signal_strength,int percentile,size_t * observations_count) const92 base::Optional<int32_t> ObservationBuffer::GetPercentile(
93     base::TimeTicks begin_timestamp,
94     int32_t current_signal_strength,
95     int percentile,
96     size_t* observations_count) const {
97   DCHECK(current_signal_strength == INT32_MIN ||
98          (current_signal_strength >= 0 && current_signal_strength <= 4));
99 
100   // Stores weighted observations in increasing order by value.
101   std::vector<WeightedObservation> weighted_observations;
102 
103   // Total weight of all observations in |weighted_observations|.
104   double total_weight = 0.0;
105 
106   ComputeWeightedObservations(begin_timestamp, current_signal_strength,
107                               &weighted_observations, &total_weight);
108 
109   if (observations_count) {
110     // |observations_count| may be null.
111     *observations_count = weighted_observations.size();
112   }
113 
114   if (weighted_observations.empty())
115     return base::nullopt;
116 
117   double desired_weight = percentile / 100.0 * total_weight;
118 
119   double cumulative_weight_seen_so_far = 0.0;
120   for (const auto& weighted_observation : weighted_observations) {
121     cumulative_weight_seen_so_far += weighted_observation.weight;
122     if (cumulative_weight_seen_so_far >= desired_weight)
123       return weighted_observation.value;
124   }
125 
126   // Computation may reach here due to floating point errors. This may happen
127   // if |percentile| was 100 (or close to 100), and |desired_weight| was
128   // slightly larger than |total_weight| (due to floating point errors).
129   // In this case, we return the highest |value| among all observations.
130   // This is same as value of the last observation in the sorted vector.
131   return weighted_observations.at(weighted_observations.size() - 1).value;
132 }
133 
134 std::map<IPHash, CanonicalStats>
GetCanonicalStatsKeyedByHosts(const base::TimeTicks & begin_timestamp,const std::set<IPHash> & target_hosts) const135 ObservationBuffer::GetCanonicalStatsKeyedByHosts(
136     const base::TimeTicks& begin_timestamp,
137     const std::set<IPHash>& target_hosts) const {
138   DCHECK_GE(Capacity(), Size());
139 
140   // Computes for all hosts if |target_hosts| is empty. Otherwise, only
141   // updates map entries for hosts in |target_hosts| and ignores observations
142   // from other hosts.
143   bool filter_on_target_hosts = !(target_hosts.empty());
144 
145   // Split observations into several subgroups keyed by their corresponding
146   // hosts. Skip observations without a host tag. Filter observations based
147   // on begin_timestamp. If |target_hosts| is not empty, filter obesrvations
148   // that do not belong to any host in the set.
149   std::map<IPHash, std::vector<int32_t>> host_keyed_observations;
150   for (const auto& observation : observations_) {
151     if (!observation.host())
152       continue;
153     if (observation.timestamp() < begin_timestamp)
154       continue;
155     // Skip zero values. Transport RTTs can have zero values in the beginning
156     // of a connection. It happens because the implementation of TCP's
157     // Exponentially Weighted Moving Average (EWMA) starts from zero.
158     if (observation.value() < 1)
159       continue;
160 
161     IPHash host = observation.host().value();
162     if (filter_on_target_hosts && target_hosts.find(host) == target_hosts.end())
163       continue;
164 
165     // Create the map entry if it did not already exist.
166     host_keyed_observations.emplace(host, std::vector<int32_t>());
167     host_keyed_observations[host].push_back(observation.value());
168   }
169 
170   std::map<IPHash, CanonicalStats> host_keyed_stats;
171   if (host_keyed_observations.empty())
172     return host_keyed_stats;
173 
174   // Calculate the canonical percentile values for each host.
175   for (auto& host_observations : host_keyed_observations) {
176     const IPHash& host = host_observations.first;
177     auto& observations = host_observations.second;
178     host_keyed_stats.emplace(host, CanonicalStats());
179     size_t count = observations.size();
180 
181     std::sort(observations.begin(), observations.end());
182     for (size_t i = 0; i < base::size(kCanonicalPercentiles); ++i) {
183       int pct_index = (count - 1) * kCanonicalPercentiles[i] / 100;
184       host_keyed_stats[host].canonical_pcts[kCanonicalPercentiles[i]] =
185           observations[pct_index];
186     }
187     host_keyed_stats[host].most_recent_val = observations.back();
188     host_keyed_stats[host].observation_count = count;
189   }
190   return host_keyed_stats;
191 }
192 
RemoveObservationsWithSource(bool deleted_observation_sources[NETWORK_QUALITY_OBSERVATION_SOURCE_MAX])193 void ObservationBuffer::RemoveObservationsWithSource(
194     bool deleted_observation_sources[NETWORK_QUALITY_OBSERVATION_SOURCE_MAX]) {
195   base::EraseIf(observations_,
196                 [deleted_observation_sources](const Observation& observation) {
197                   return deleted_observation_sources[static_cast<size_t>(
198                       observation.source())];
199                 });
200 }
201 
ComputeWeightedObservations(const base::TimeTicks & begin_timestamp,int32_t current_signal_strength,std::vector<WeightedObservation> * weighted_observations,double * total_weight) const202 void ObservationBuffer::ComputeWeightedObservations(
203     const base::TimeTicks& begin_timestamp,
204     int32_t current_signal_strength,
205     std::vector<WeightedObservation>* weighted_observations,
206     double* total_weight) const {
207   DCHECK_GE(Capacity(), Size());
208 
209   weighted_observations->clear();
210   double total_weight_observations = 0.0;
211   base::TimeTicks now = tick_clock_->NowTicks();
212 
213   for (const auto& observation : observations_) {
214     if (observation.timestamp() < begin_timestamp)
215       continue;
216 
217     base::TimeDelta time_since_sample_taken = now - observation.timestamp();
218     double time_weight =
219         pow(weight_multiplier_per_second_, time_since_sample_taken.InSeconds());
220 
221     double signal_strength_weight = 1.0;
222     if (current_signal_strength >= 0 && observation.signal_strength() >= 0) {
223       int32_t signal_strength_weight_diff =
224           std::abs(current_signal_strength - observation.signal_strength());
225       signal_strength_weight =
226           pow(weight_multiplier_per_signal_level_, signal_strength_weight_diff);
227     }
228 
229     double weight = time_weight * signal_strength_weight;
230     weight = base::ClampToRange(weight, DBL_MIN, 1.0);
231 
232     weighted_observations->push_back(
233         WeightedObservation(observation.value(), weight));
234     total_weight_observations += weight;
235   }
236 
237   // Sort the samples by value in ascending order.
238   std::sort(weighted_observations->begin(), weighted_observations->end());
239   *total_weight = total_weight_observations;
240 
241   DCHECK_LE(0.0, *total_weight);
242   DCHECK(weighted_observations->empty() || 0.0 < *total_weight);
243 
244   // |weighted_observations| may have a smaller size than |observations_|
245   // since the former contains only the observations later than
246   // |begin_timestamp|.
247   DCHECK_GE(observations_.size(), weighted_observations->size());
248 }
249 
Capacity() const250 size_t ObservationBuffer::Capacity() const {
251   return params_->observation_buffer_size();
252 }
253 
254 }  // namespace internal
255 
256 }  // namespace nqe
257 
258 }  // namespace net
259