1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <memory>
25 #include <set>
26 #include <unordered_map>
27 
28 #include <grpc/support/log.h>
29 #include <grpcpp/impl/codegen/config.h>
30 
31 #include "src/cpp/server/load_reporter/constants.h"
32 
33 namespace grpc {
34 namespace load_reporter {
35 
// The load data storage is organized as a hierarchy. The LoadDataStore is the
// top-level data store. In LoadDataStore, for each host we keep a
// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
// PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
// to LoadRecordValue. The LoadRecordValue contains a map of customized call
// metrics, mapping from a call metric name to the CallMetricValue.
42 
// The value of a customized call metric: how many calls reported the metric
// and the sum of the metric values over those calls.
class CallMetricValue {
 public:
  explicit CallMetricValue(uint64_t num_calls = 0,
                           double total_metric_value = 0)
      : num_calls_(num_calls), total_metric_value_(total_metric_value) {}

  // Accumulates `other` into this value by adding its call count and its
  // total metric value. Takes a const reference, matching
  // LoadRecordValue::MergeFrom; existing callers are unaffected.
  void MergeFrom(const CallMetricValue& other) {
    num_calls_ += other.num_calls_;
    total_metric_value_ += other.total_metric_value_;
  }

  // Getters.
  uint64_t num_calls() const { return num_calls_; }
  double total_metric_value() const { return total_metric_value_; }

 private:
  // The number of calls that finished with this metric.
  uint64_t num_calls_ = 0;
  // The sum of metric values across all the calls that finished with this
  // metric.
  double total_metric_value_ = 0;
};
66 
67 // The key of a load record.
68 class LoadRecordKey {
69  public:
LoadRecordKey(grpc::string lb_id,grpc::string lb_tag,grpc::string user_id,grpc::string client_ip_hex)70   LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id,
71                 grpc::string client_ip_hex)
72       : lb_id_(std::move(lb_id)),
73         lb_tag_(std::move(lb_tag)),
74         user_id_(std::move(user_id)),
75         client_ip_hex_(std::move(client_ip_hex)) {}
76 
77   // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
78   LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id);
79 
ToString()80   grpc::string ToString() const {
81     return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
82            ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
83            "]";
84   }
85 
86   bool operator==(const LoadRecordKey& other) const {
87     return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ &&
88            user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
89   }
90 
91   // Gets the client IP bytes in network order (i.e., big-endian).
92   grpc::string GetClientIpBytes() const;
93 
94   // Getters.
lb_id()95   const grpc::string& lb_id() const { return lb_id_; }
lb_tag()96   const grpc::string& lb_tag() const { return lb_tag_; }
user_id()97   const grpc::string& user_id() const { return user_id_; }
client_ip_hex()98   const grpc::string& client_ip_hex() const { return client_ip_hex_; }
99 
100   struct Hasher {
hash_combineHasher101     void hash_combine(size_t* seed, const grpc::string& k) const {
102       *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) +
103                (*seed >> 2);
104     }
105 
operatorHasher106     size_t operator()(const LoadRecordKey& k) const {
107       size_t h = 0;
108       hash_combine(&h, k.lb_id_);
109       hash_combine(&h, k.lb_tag_);
110       hash_combine(&h, k.user_id_);
111       hash_combine(&h, k.client_ip_hex_);
112       return h;
113     }
114   };
115 
116  private:
117   grpc::string lb_id_;
118   grpc::string lb_tag_;
119   grpc::string user_id_;
120   grpc::string client_ip_hex_;
121 };
122 
123 // The value of a load record.
124 class LoadRecordValue {
125  public:
126   explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
127                            uint64_t error_count = 0, uint64_t bytes_sent = 0,
128                            uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
start_count_(start_count)129       : start_count_(start_count),
130         ok_count_(ok_count),
131         error_count_(error_count),
132         bytes_sent_(bytes_sent),
133         bytes_recv_(bytes_recv),
134         latency_ms_(latency_ms) {}
135 
136   LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
137                   double total_metric_value);
138 
MergeFrom(const LoadRecordValue & other)139   void MergeFrom(const LoadRecordValue& other) {
140     start_count_ += other.start_count_;
141     ok_count_ += other.ok_count_;
142     error_count_ += other.error_count_;
143     bytes_sent_ += other.bytes_sent_;
144     bytes_recv_ += other.bytes_recv_;
145     latency_ms_ += other.latency_ms_;
146     for (const auto& p : other.call_metrics_) {
147       const grpc::string& key = p.first;
148       const CallMetricValue& value = p.second;
149       call_metrics_[key].MergeFrom(value);
150     }
151   }
152 
GetNumCallsInProgressDelta()153   int64_t GetNumCallsInProgressDelta() const {
154     return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
155   }
156 
ToString()157   grpc::string ToString() const {
158     return "[start_count_=" + grpc::to_string(start_count_) +
159            ", ok_count_=" + grpc::to_string(ok_count_) +
160            ", error_count_=" + grpc::to_string(error_count_) +
161            ", bytes_sent_=" + grpc::to_string(bytes_sent_) +
162            ", bytes_recv_=" + grpc::to_string(bytes_recv_) +
163            ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
164            grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
165   }
166 
InsertCallMetric(const grpc::string & metric_name,const CallMetricValue & metric_value)167   bool InsertCallMetric(const grpc::string& metric_name,
168                         const CallMetricValue& metric_value) {
169     return call_metrics_.insert({metric_name, metric_value}).second;
170   }
171 
172   // Getters.
start_count()173   uint64_t start_count() const { return start_count_; }
ok_count()174   uint64_t ok_count() const { return ok_count_; }
error_count()175   uint64_t error_count() const { return error_count_; }
bytes_sent()176   uint64_t bytes_sent() const { return bytes_sent_; }
bytes_recv()177   uint64_t bytes_recv() const { return bytes_recv_; }
latency_ms()178   uint64_t latency_ms() const { return latency_ms_; }
call_metrics()179   const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
180       const {
181     return call_metrics_;
182   }
183 
184  private:
185   uint64_t start_count_ = 0;
186   uint64_t ok_count_ = 0;
187   uint64_t error_count_ = 0;
188   uint64_t bytes_sent_ = 0;
189   uint64_t bytes_recv_ = 0;
190   uint64_t latency_ms_ = 0;
191   std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
192 };
193 
194 // Stores the data associated with a particular LB ID.
195 class PerBalancerStore {
196  public:
197   using LoadRecordMap =
198       std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
199 
PerBalancerStore(grpc::string lb_id,grpc::string load_key)200   PerBalancerStore(grpc::string lb_id, grpc::string load_key)
201       : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {}
202 
203   // Merge a load record with the given key and value if the store is not
204   // suspended.
205   void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value);
206 
207   // Suspend this store, so that no detailed load data will be recorded.
208   void Suspend();
209   // Resume this store from suspension.
210   void Resume();
211   // Is this store suspended or not?
IsSuspended()212   bool IsSuspended() const { return suspended_; }
213 
IsNumCallsInProgressChangedSinceLastReport()214   bool IsNumCallsInProgressChangedSinceLastReport() const {
215     return num_calls_in_progress_ != last_reported_num_calls_in_progress_;
216   }
217 
218   uint64_t GetNumCallsInProgressForReport();
219 
ToString()220   grpc::string ToString() {
221     return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
222            "]";
223   }
224 
ClearLoadRecordMap()225   void ClearLoadRecordMap() { load_record_map_.clear(); }
226 
227   // Getters.
lb_id()228   const grpc::string& lb_id() const { return lb_id_; }
load_key()229   const grpc::string& load_key() const { return load_key_; }
load_record_map()230   const LoadRecordMap& load_record_map() const { return load_record_map_; }
231 
232  private:
233   grpc::string lb_id_;
234   // TODO(juanlishen): Use bytestring protobuf type?
235   grpc::string load_key_;
236   LoadRecordMap load_record_map_;
237   uint64_t num_calls_in_progress_ = 0;
238   uint64_t last_reported_num_calls_in_progress_ = 0;
239   bool suspended_ = false;
240 };
241 
// Stores the data associated with a particular host.
class PerHostStore {
 public:
  // When a report stream is created, a PerBalancerStore is created for the
  // LB ID (guaranteed unique) associated with that stream. If it is the only
  // active store, adopt all the orphaned stores. If it is the first created
  // store, adopt the store of kInvalidLbId.
  void ReportStreamCreated(const grpc::string& lb_id,
                           const grpc::string& load_key);

  // When a report stream is closed, the PerBalancerStores assigned to the
  // associated LB ID need to be re-assigned to other active balancers,
  // ideally with the same load key. If there is no active balancer, we have
  // to suspend those stores and drop the incoming load data until they are
  // resumed.
  void ReportStreamClosed(const grpc::string& lb_id);

  // Returns null if not found. Caller doesn't own the returned store.
  PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const;

  // Returns null if lb_id is not found. The returned pointer points to the
  // underlying data structure, which is not owned by the caller.
  const std::set<PerBalancerStore*>* GetAssignedStores(
      const grpc::string& lb_id) const;

 private:
  // Creates a PerBalancerStore for the given LB ID, assigns the store to
  // itself, and records the LB ID to the load key.
  void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key);

  // NOTE(review): presumably hands `orphaned_store` over to the balancer
  // identified by `new_receiver` (an LB ID); defined out of line, confirm
  // against the implementation.
  void AssignOrphanedStore(PerBalancerStore* orphaned_store,
                           const grpc::string& new_receiver);

  // Key: load key. Value: a set of LB IDs. Presumably these are the LB IDs
  // currently receiving reports for that load key, used when re-assigning
  // stores "ideally with the same load key". TODO confirm against the .cc.
  std::unordered_map<grpc::string, std::set<grpc::string>>
      load_key_to_receiving_lb_ids_;

  // Key: LB ID. The key set includes all the LB IDs that have been
  // allocated for reporting streams so far.
  // Value: the unique pointer to the PerBalancerStore of the LB ID.
  std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>>
      per_balancer_stores_;

  // Key: LB ID. The key set includes the LB IDs of the balancers that are
  // currently receiving report.
  // Value: the set of raw pointers to the PerBalancerStores assigned to the LB
  // ID. Note that the sets in assigned_stores_ form a division of the value set
  // of per_balancer_stores_.
  std::unordered_map<grpc::string, std::set<PerBalancerStore*>>
      assigned_stores_;
};
292 
293 // Thread-unsafe two-level bookkeeper of all the load data.
294 // Note: We never remove any store objects from this class, as per the
295 // current spec. That's because premature removal of the store objects
296 // may lead to loss of critical information, e.g., mapping from lb_id to
297 // load_key, and the number of in-progress calls. Such loss will cause
298 // information inconsistency when the balancer is re-connected. Keeping
299 // all the stores should be fine for PerHostStore, since we assume there
300 // should only be a few hostnames. But it's a potential problem for
301 // PerBalancerStore.
302 class LoadDataStore {
303  public:
304   // Returns null if not found. Caller doesn't own the returned store.
305   PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname,
306                                          const grpc::string& lb_id) const;
307 
308   // Returns null if hostname or lb_id is not found. The returned pointer points
309   // to the underlying data structure, which is not owned by the caller.
310   const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname,
311                                                        const string& lb_id);
312 
313   // If a PerBalancerStore can be found by the hostname and LB ID in
314   // LoadRecordKey, the load data will be merged to that store. Otherwise,
315   // only track the number of the in-progress calls for this unknown LB ID.
316   void MergeRow(const grpc::string& hostname, const LoadRecordKey& key,
317                 const LoadRecordValue& value);
318 
319   // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
320   // with some received load data but unknown to this load data store)?
IsTrackedUnknownBalancerId(const grpc::string & lb_id)321   bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const {
322     return unknown_balancer_id_trackers_.find(lb_id) !=
323            unknown_balancer_id_trackers_.end();
324   }
325 
326   // Wrapper around PerHostStore::ReportStreamCreated.
327   void ReportStreamCreated(const grpc::string& hostname,
328                            const grpc::string& lb_id,
329                            const grpc::string& load_key);
330 
331   // Wrapper around PerHostStore::ReportStreamClosed.
332   void ReportStreamClosed(const grpc::string& hostname,
333                           const grpc::string& lb_id);
334 
335  private:
336   // Buffered data that was fetched from Census but hasn't been sent to
337   // balancer. We need to keep this data ourselves because Census will
338   // delete the data once it's returned.
339   std::unordered_map<grpc::string, PerHostStore> per_host_stores_;
340 
341   // Tracks the number of in-progress calls for each unknown LB ID.
342   std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_;
343 };
344 
345 }  // namespace load_reporter
346 }  // namespace grpc
347 
348 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
349