1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
#include <grpc/support/port_platform.h>

#include <stdint.h>
#include <stdlib.h>

#include <algorithm>

#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
31 
32 struct grpc_chttp2_transport;
33 struct grpc_chttp2_stream;
34 
35 extern grpc_core::TraceFlag grpc_flowctl_trace;
36 
37 namespace grpc {
38 namespace testing {
39 class TrickledCHTTP2;  // to make this a friend
40 }  // namespace testing
41 }  // namespace grpc
42 
43 namespace grpc_core {
44 namespace chttp2 {
45 
// Default flow control window size in bytes: 65535 == 2^16 - 1 (the HTTP/2
// initial window size, RFC 7540 section 6.9.2).
static constexpr uint32_t kDefaultWindow = (1u << 16) - 1;
// Largest flow control window: 2^31 - 1 (RFC 7540 section 6.9.1).
static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
// Frame size: 1 MiB.
// TODO(ncteisen): Tune this
static constexpr uint32_t kFrameSize = 1u << 20;
// Lower and upper bounds for an initial window size (128 bytes .. 1 GiB).
static constexpr uint32_t kMinInitialWindowSize = 128;
static constexpr uint32_t kMaxInitialWindowSize = 1u << 30;
// The maximum per-stream flow control window delta to advertise (1 MiB).
static constexpr uint32_t kMaxWindowDelta = 1u << 20;

// Forward declarations; both classes are defined later in this header.
class TransportFlowControl;
class StreamFlowControl;

// Test-only global flag, declared here and defined out of line.
extern bool g_test_only_transport_flow_control_window_check;
59 
60 // Encapsulates a collections of actions the transport needs to take with
61 // regard to flow control. Each action comes with urgencies that tell the
62 // transport how quickly the action must take place.
63 class FlowControlAction {
64  public:
65   enum class Urgency : uint8_t {
66     // Nothing to be done.
67     NO_ACTION_NEEDED = 0,
68     // Initiate a write to update the initial window immediately.
69     UPDATE_IMMEDIATELY,
70     // Push the flow control update into a send buffer, to be sent
71     // out the next time a write is initiated.
72     QUEUE_UPDATE,
73   };
74 
send_stream_update()75   Urgency send_stream_update() const { return send_stream_update_; }
send_transport_update()76   Urgency send_transport_update() const { return send_transport_update_; }
send_initial_window_update()77   Urgency send_initial_window_update() const {
78     return send_initial_window_update_;
79   }
send_max_frame_size_update()80   Urgency send_max_frame_size_update() const {
81     return send_max_frame_size_update_;
82   }
initial_window_size()83   uint32_t initial_window_size() const { return initial_window_size_; }
max_frame_size()84   uint32_t max_frame_size() const { return max_frame_size_; }
85 
set_send_stream_update(Urgency u)86   FlowControlAction& set_send_stream_update(Urgency u) {
87     send_stream_update_ = u;
88     return *this;
89   }
set_send_transport_update(Urgency u)90   FlowControlAction& set_send_transport_update(Urgency u) {
91     send_transport_update_ = u;
92     return *this;
93   }
set_send_initial_window_update(Urgency u,uint32_t update)94   FlowControlAction& set_send_initial_window_update(Urgency u,
95                                                     uint32_t update) {
96     send_initial_window_update_ = u;
97     initial_window_size_ = update;
98     return *this;
99   }
set_send_max_frame_size_update(Urgency u,uint32_t update)100   FlowControlAction& set_send_max_frame_size_update(Urgency u,
101                                                     uint32_t update) {
102     send_max_frame_size_update_ = u;
103     max_frame_size_ = update;
104     return *this;
105   }
106 
107   static const char* UrgencyString(Urgency u);
108   void Trace(grpc_chttp2_transport* t) const;
109 
110  private:
111   Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
112   Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
113   Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
114   Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
115   uint32_t initial_window_size_ = 0;
116   uint32_t max_frame_size_ = 0;
117 };
118 
119 class FlowControlTrace {
120  public:
FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)121   FlowControlTrace(const char* reason, TransportFlowControl* tfc,
122                    StreamFlowControl* sfc) {
123     if (enabled_) Init(reason, tfc, sfc);
124   }
125 
~FlowControlTrace()126   ~FlowControlTrace() {
127     if (enabled_) Finish();
128   }
129 
130  private:
131   void Init(const char* reason, TransportFlowControl* tfc,
132             StreamFlowControl* sfc);
133   void Finish();
134 
135   const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
136 
137   TransportFlowControl* tfc_;
138   StreamFlowControl* sfc_;
139   const char* reason_;
140   int64_t remote_window_;
141   int64_t target_window_;
142   int64_t announced_window_;
143   int64_t remote_window_delta_;
144   int64_t local_window_delta_;
145   int64_t announced_window_delta_;
146 };
147 
148 // Fat interface with all methods a flow control implementation needs to
149 // support.
150 class TransportFlowControlBase {
151  public:
TransportFlowControlBase()152   TransportFlowControlBase() {}
~TransportFlowControlBase()153   virtual ~TransportFlowControlBase() {}
154 
155   // Is flow control enabled? This is needed in other codepaths like the checks
156   // in parsing and in writing.
157   virtual bool flow_control_enabled() const = 0;
158 
159   // Called to check if the transport needs to send a WINDOW_UPDATE frame
160   virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0;
161 
162   // Using the protected members, returns and Action to be taken by the
163   // tranport.
164   virtual FlowControlAction MakeAction() = 0;
165 
166   // Using the protected members, returns and Action to be taken by the
167   // tranport. Also checks for updates to our BDP estimate and acts
168   // accordingly.
169   virtual FlowControlAction PeriodicUpdate() = 0;
170 
171   // Called to do bookkeeping when a stream owned by this transport sends
172   // data on the wire
173   virtual void StreamSentData(int64_t /* size */) = 0;
174 
175   // Called to do bookkeeping when a stream owned by this transport receives
176   // data from the wire. Also does error checking for frame size.
177   virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0;
178 
179   // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
180   virtual void RecvUpdate(uint32_t /* size */) = 0;
181 
182   // Returns the BdpEstimator held by this object. Caller is responsible for
183   // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
184   // bdp estimator actions inside TransportFlowControl
bdp_estimator()185   virtual BdpEstimator* bdp_estimator() { return nullptr; }
186 
187   // Getters
remote_window()188   int64_t remote_window() const { return remote_window_; }
target_window()189   virtual int64_t target_window() const { return target_initial_window_size_; }
announced_window()190   int64_t announced_window() const { return announced_window_; }
191 
192   // Used in certain benchmarks in which we don't want FlowControl to be a
193   // factor
TestOnlyForceHugeWindow()194   virtual void TestOnlyForceHugeWindow() {}
195 
196  protected:
197   friend class ::grpc::testing::TrickledCHTTP2;
198   int64_t remote_window_ = kDefaultWindow;
199   int64_t target_initial_window_size_ = kDefaultWindow;
200   int64_t announced_window_ = kDefaultWindow;
201 };
202 
203 // Implementation of flow control that does NOTHING. Always returns maximum
204 // values, never initiates writes, and assumes that the remote peer is doing
205 // the same. To be used to narrow down on flow control as the cause of negative
206 // performance.
207 class TransportFlowControlDisabled final : public TransportFlowControlBase {
208  public:
209   // Maxes out all values
210   explicit TransportFlowControlDisabled(grpc_chttp2_transport* t);
211 
flow_control_enabled()212   bool flow_control_enabled() const override { return false; }
213 
214   // Never do anything.
MaybeSendUpdate(bool)215   uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
MakeAction()216   FlowControlAction MakeAction() override { return FlowControlAction(); }
PeriodicUpdate()217   FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
StreamSentData(int64_t)218   void StreamSentData(int64_t /* size */) override {}
RecvData(int64_t)219   grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override {
220     return GRPC_ERROR_NONE;
221   }
RecvUpdate(uint32_t)222   void RecvUpdate(uint32_t /* size */) override {}
223 };
224 
225 // Implementation of flow control that abides to HTTP/2 spec and attempts
226 // to be as performant as possible.
227 class TransportFlowControl final : public TransportFlowControlBase {
228  public:
229   TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
~TransportFlowControl()230   ~TransportFlowControl() override {}
231 
flow_control_enabled()232   bool flow_control_enabled() const override { return true; }
233 
bdp_probe()234   bool bdp_probe() const { return enable_bdp_probe_; }
235 
236   // returns an announce if we should send a transport update to our peer,
237   // else returns zero; writing_anyway indicates if a write would happen
238   // regardless of the send - if it is false and this function returns non-zero,
239   // this announce will cause a write to occur
240   uint32_t MaybeSendUpdate(bool writing_anyway) override;
241 
242   // Reads the flow control data and returns and actionable struct that will
243   // tell chttp2 exactly what it needs to do
MakeAction()244   FlowControlAction MakeAction() override {
245     return UpdateAction(FlowControlAction());
246   }
247 
248   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
249   // to perform more complex flow control calculations and return an action
250   // to let chttp2 change its parameters
251   FlowControlAction PeriodicUpdate() override;
252 
StreamSentData(int64_t size)253   void StreamSentData(int64_t size) override { remote_window_ -= size; }
254 
255   grpc_error_handle ValidateRecvData(int64_t incoming_frame_size);
CommitRecvData(int64_t incoming_frame_size)256   void CommitRecvData(int64_t incoming_frame_size) {
257     announced_window_ -= incoming_frame_size;
258   }
259 
RecvData(int64_t incoming_frame_size)260   grpc_error_handle RecvData(int64_t incoming_frame_size) override {
261     FlowControlTrace trace("  data recv", this, nullptr);
262     grpc_error_handle error = ValidateRecvData(incoming_frame_size);
263     if (error != GRPC_ERROR_NONE) return error;
264     CommitRecvData(incoming_frame_size);
265     return GRPC_ERROR_NONE;
266   }
267 
268   // we have received a WINDOW_UPDATE frame for a transport
RecvUpdate(uint32_t size)269   void RecvUpdate(uint32_t size) override {
270     FlowControlTrace trace("t updt recv", this, nullptr);
271     remote_window_ += size;
272   }
273 
274   // See comment above announced_stream_total_over_incoming_window_ for the
275   // logic behind this decision.
target_window()276   int64_t target_window() const override {
277     return static_cast<uint32_t>(
278         std::min(static_cast<int64_t>((1u << 31) - 1),
279                  announced_stream_total_over_incoming_window_ +
280                      target_initial_window_size_));
281   }
282 
transport()283   const grpc_chttp2_transport* transport() const { return t_; }
284 
PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)285   void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
286     if (delta > 0) {
287       announced_stream_total_over_incoming_window_ -= delta;
288     }
289   }
290 
PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)291   void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
292     if (delta > 0) {
293       announced_stream_total_over_incoming_window_ += delta;
294     }
295   }
296 
bdp_estimator()297   BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
298 
TestOnlyForceHugeWindow()299   void TestOnlyForceHugeWindow() override {
300     announced_window_ = 1024 * 1024 * 1024;
301     remote_window_ = 1024 * 1024 * 1024;
302   }
303 
304  private:
305   double TargetLogBdp();
306   double SmoothLogBdp(double value);
307   FlowControlAction::Urgency DeltaUrgency(int64_t value,
308                                           grpc_chttp2_setting_id setting_id);
309 
UpdateAction(FlowControlAction action)310   FlowControlAction UpdateAction(FlowControlAction action) {
311     if (announced_window_ < target_window() / 2) {
312       action.set_send_transport_update(
313           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
314     }
315     return action;
316   }
317 
318   const grpc_chttp2_transport* const t_;
319 
320   /** calculating what we should give for local window:
321       we track the total amount of flow control over initial window size
322       across all streams: this is data that we want to receive right now (it
323       has an outstanding read)
324       and the total amount of flow control under initial window size across all
325       streams: this is data we've read early
326       we want to adjust incoming_window such that:
327       incoming_window = total_over - max(bdp - total_under, 0) */
328   int64_t announced_stream_total_over_incoming_window_ = 0;
329 
330   /** should we probe bdp? */
331   const bool enable_bdp_probe_;
332 
333   /* bdp estimation */
334   grpc_core::BdpEstimator bdp_estimator_;
335 
336   /* pid controller */
337   grpc_core::PidController pid_controller_;
338   grpc_millis last_pid_update_ = 0;
339 };
340 
341 // Fat interface with all methods a stream flow control implementation needs
342 // to support.
343 class StreamFlowControlBase {
344  public:
StreamFlowControlBase()345   StreamFlowControlBase() {}
~StreamFlowControlBase()346   virtual ~StreamFlowControlBase() {}
347 
348   // Updates an action using the protected members.
UpdateAction(FlowControlAction)349   virtual FlowControlAction UpdateAction(FlowControlAction /* action */) {
350     abort();
351   }
352 
353   // Using the protected members, returns an Action for this stream to be
354   // taken by the tranport.
355   virtual FlowControlAction MakeAction() = 0;
356 
357   // Bookkeeping for when data is sent on this stream.
358   virtual void SentData(int64_t /* outgoing_frame_size */) = 0;
359 
360   // Bookkeeping and error checking for when data is received by this stream.
361   virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0;
362 
363   // Called to check if this stream needs to send a WINDOW_UPDATE frame.
364   virtual uint32_t MaybeSendUpdate() = 0;
365 
366   // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
367   virtual void RecvUpdate(uint32_t /* size */) = 0;
368 
369   // Bookkeeping for when a call pulls bytes out of the transport. At this
370   // point we consider the data 'used' and can thus let out peer know we are
371   // ready for more data.
IncomingByteStreamUpdate(size_t,size_t)372   virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
373                                         size_t /* have_already */) {
374     abort();
375   }
376 
377   // Used in certain benchmarks in which we don't want FlowControl to be a
378   // factor
TestOnlyForceHugeWindow()379   virtual void TestOnlyForceHugeWindow() {}
380 
381   // Getters
remote_window_delta()382   int64_t remote_window_delta() const { return remote_window_delta_; }
local_window_delta()383   int64_t local_window_delta() const { return local_window_delta_; }
announced_window_delta()384   int64_t announced_window_delta() const { return announced_window_delta_; }
385 
386  protected:
387   friend class ::grpc::testing::TrickledCHTTP2;
388   int64_t remote_window_delta_ = 0;
389   int64_t local_window_delta_ = 0;
390   int64_t announced_window_delta_ = 0;
391 };
392 
393 // Implementation of flow control that does NOTHING. Always returns maximum
394 // values, never initiates writes, and assumes that the remote peer is doing
395 // the same. To be used to narrow down on flow control as the cause of negative
396 // performance.
397 class StreamFlowControlDisabled : public StreamFlowControlBase {
398  public:
UpdateAction(FlowControlAction action)399   FlowControlAction UpdateAction(FlowControlAction action) override {
400     return action;
401   }
MakeAction()402   FlowControlAction MakeAction() override { return FlowControlAction(); }
SentData(int64_t)403   void SentData(int64_t /* outgoing_frame_size */) override {}
RecvData(int64_t)404   grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override {
405     return GRPC_ERROR_NONE;
406   }
MaybeSendUpdate()407   uint32_t MaybeSendUpdate() override { return 0; }
RecvUpdate(uint32_t)408   void RecvUpdate(uint32_t /* size */) override {}
IncomingByteStreamUpdate(size_t,size_t)409   void IncomingByteStreamUpdate(size_t /* max_size_hint */,
410                                 size_t /* have_already */) override {}
411 };
412 
413 // Implementation of flow control that abides to HTTP/2 spec and attempts
414 // to be as performant as possible.
415 class StreamFlowControl final : public StreamFlowControlBase {
416  public:
417   StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl()418   ~StreamFlowControl() override {
419     tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
420   }
421 
422   FlowControlAction UpdateAction(FlowControlAction action) override;
MakeAction()423   FlowControlAction MakeAction() override {
424     return UpdateAction(tfc_->MakeAction());
425   }
426 
427   // we have sent data on the wire, we must track this in our bookkeeping for
428   // the remote peer's flow control.
SentData(int64_t outgoing_frame_size)429   void SentData(int64_t outgoing_frame_size) override {
430     FlowControlTrace tracer("  data sent", tfc_, this);
431     tfc_->StreamSentData(outgoing_frame_size);
432     remote_window_delta_ -= outgoing_frame_size;
433   }
434 
435   // we have received data from the wire
436   grpc_error_handle RecvData(int64_t incoming_frame_size) override;
437 
438   // returns an announce if we should send a stream update to our peer, else
439   // returns zero
440   uint32_t MaybeSendUpdate() override;
441 
442   // we have received a WINDOW_UPDATE frame for a stream
RecvUpdate(uint32_t size)443   void RecvUpdate(uint32_t size) override {
444     FlowControlTrace trace("s updt recv", tfc_, this);
445     remote_window_delta_ += size;
446   }
447 
448   // the application is asking for a certain amount of bytes
449   void IncomingByteStreamUpdate(size_t max_size_hint,
450                                 size_t have_already) override;
451 
remote_window_delta()452   int64_t remote_window_delta() const { return remote_window_delta_; }
local_window_delta()453   int64_t local_window_delta() const { return local_window_delta_; }
announced_window_delta()454   int64_t announced_window_delta() const { return announced_window_delta_; }
455 
stream()456   const grpc_chttp2_stream* stream() const { return s_; }
457 
TestOnlyForceHugeWindow()458   void TestOnlyForceHugeWindow() override {
459     announced_window_delta_ = 1024 * 1024 * 1024;
460     local_window_delta_ = 1024 * 1024 * 1024;
461     remote_window_delta_ = 1024 * 1024 * 1024;
462   }
463 
464  private:
465   TransportFlowControl* const tfc_;
466   const grpc_chttp2_stream* const s_;
467 
UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)468   void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
469     tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
470     announced_window_delta_ += change;
471     tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
472   }
473 };
474 
// Test-only hook for overriding the transport's target initial window
// estimate. Per the method name, it is presumably consulted from the
// transport's periodic update; confirm against the .cc file.
class TestOnlyTransportTargetWindowEstimatesMocker {
 public:
  virtual ~TestOnlyTransportTargetWindowEstimatesMocker() = default;

  // Maps the current target window size to the next one to use.
  virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
      double current_target) = 0;
};

// The installed mocker, if any (presumably nullptr when none is installed;
// confirm). Declared here, defined out of line.
extern TestOnlyTransportTargetWindowEstimatesMocker*
    g_test_only_transport_target_window_estimates_mocker;
484 
485 }  // namespace chttp2
486 }  // namespace grpc_core
487 
488 #endif  // GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
489