1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stdint.h> 25 26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h" 27 #include "src/core/lib/gpr/useful.h" 28 #include "src/core/lib/gprpp/manual_constructor.h" 29 #include "src/core/lib/transport/bdp_estimator.h" 30 #include "src/core/lib/transport/pid_controller.h" 31 32 struct grpc_chttp2_transport; 33 struct grpc_chttp2_stream; 34 35 extern grpc_core::TraceFlag grpc_flowctl_trace; 36 37 namespace grpc { 38 namespace testing { 39 class TrickledCHTTP2; // to make this a friend 40 } // namespace testing 41 } // namespace grpc 42 43 namespace grpc_core { 44 namespace chttp2 { 45 46 static constexpr uint32_t kDefaultWindow = 65535; 47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1); 48 // TODO(ncteisen): Tune this 49 static constexpr uint32_t kFrameSize = 1024 * 1024; 50 static constexpr const uint32_t kMinInitialWindowSize = 128; 51 static constexpr const uint32_t kMaxInitialWindowSize = (1u << 30); 52 // The maximum per-stream flow control window delta to advertise. 53 static constexpr const uint32_t kMaxWindowDelta = (1u << 20); 54 55 class TransportFlowControl; 56 class StreamFlowControl; 57 58 extern bool g_test_only_transport_flow_control_window_check; 59 60 // Encapsulates a collections of actions the transport needs to take with 61 // regard to flow control. Each action comes with urgencies that tell the 62 // transport how quickly the action must take place. 63 class FlowControlAction { 64 public: 65 enum class Urgency : uint8_t { 66 // Nothing to be done. 67 NO_ACTION_NEEDED = 0, 68 // Initiate a write to update the initial window immediately. 69 UPDATE_IMMEDIATELY, 70 // Push the flow control update into a send buffer, to be sent 71 // out the next time a write is initiated. 72 QUEUE_UPDATE, 73 }; 74 send_stream_update()75 Urgency send_stream_update() const { return send_stream_update_; } send_transport_update()76 Urgency send_transport_update() const { return send_transport_update_; } send_initial_window_update()77 Urgency send_initial_window_update() const { 78 return send_initial_window_update_; 79 } send_max_frame_size_update()80 Urgency send_max_frame_size_update() const { 81 return send_max_frame_size_update_; 82 } initial_window_size()83 uint32_t initial_window_size() const { return initial_window_size_; } max_frame_size()84 uint32_t max_frame_size() const { return max_frame_size_; } 85 set_send_stream_update(Urgency u)86 FlowControlAction& set_send_stream_update(Urgency u) { 87 send_stream_update_ = u; 88 return *this; 89 } set_send_transport_update(Urgency u)90 FlowControlAction& set_send_transport_update(Urgency u) { 91 send_transport_update_ = u; 92 return *this; 93 } set_send_initial_window_update(Urgency u,uint32_t update)94 FlowControlAction& set_send_initial_window_update(Urgency u, 95 uint32_t update) { 96 send_initial_window_update_ = u; 97 initial_window_size_ = update; 98 return *this; 99 } set_send_max_frame_size_update(Urgency u,uint32_t update)100 FlowControlAction& set_send_max_frame_size_update(Urgency u, 101 uint32_t update) { 102 send_max_frame_size_update_ = u; 103 max_frame_size_ = update; 104 return *this; 105 } 106 107 static const char* UrgencyString(Urgency u); 108 void Trace(grpc_chttp2_transport* t) const; 109 110 private: 111 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; 112 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; 113 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; 114 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 115 uint32_t initial_window_size_ = 0; 116 uint32_t max_frame_size_ = 0; 117 }; 118 119 class FlowControlTrace { 120 public: FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)121 FlowControlTrace(const char* reason, TransportFlowControl* tfc, 122 StreamFlowControl* sfc) { 123 if (enabled_) Init(reason, tfc, sfc); 124 } 125 ~FlowControlTrace()126 ~FlowControlTrace() { 127 if (enabled_) Finish(); 128 } 129 130 private: 131 void Init(const char* reason, TransportFlowControl* tfc, 132 StreamFlowControl* sfc); 133 void Finish(); 134 135 const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace); 136 137 TransportFlowControl* tfc_; 138 StreamFlowControl* sfc_; 139 const char* reason_; 140 int64_t remote_window_; 141 int64_t target_window_; 142 int64_t announced_window_; 143 int64_t remote_window_delta_; 144 int64_t local_window_delta_; 145 int64_t announced_window_delta_; 146 }; 147 148 // Fat interface with all methods a flow control implementation needs to 149 // support. 150 class TransportFlowControlBase { 151 public: TransportFlowControlBase()152 TransportFlowControlBase() {} ~TransportFlowControlBase()153 virtual ~TransportFlowControlBase() {} 154 155 // Is flow control enabled? This is needed in other codepaths like the checks 156 // in parsing and in writing. 157 virtual bool flow_control_enabled() const = 0; 158 159 // Called to check if the transport needs to send a WINDOW_UPDATE frame 160 virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0; 161 162 // Using the protected members, returns and Action to be taken by the 163 // tranport. 164 virtual FlowControlAction MakeAction() = 0; 165 166 // Using the protected members, returns and Action to be taken by the 167 // tranport. Also checks for updates to our BDP estimate and acts 168 // accordingly. 169 virtual FlowControlAction PeriodicUpdate() = 0; 170 171 // Called to do bookkeeping when a stream owned by this transport sends 172 // data on the wire 173 virtual void StreamSentData(int64_t /* size */) = 0; 174 175 // Called to do bookkeeping when a stream owned by this transport receives 176 // data from the wire. Also does error checking for frame size. 177 virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0; 178 179 // Called to do bookkeeping when we receive a WINDOW_UPDATE frame. 180 virtual void RecvUpdate(uint32_t /* size */) = 0; 181 182 // Returns the BdpEstimator held by this object. Caller is responsible for 183 // checking for nullptr. TODO(ncteisen): consider fully encapsulating all 184 // bdp estimator actions inside TransportFlowControl bdp_estimator()185 virtual BdpEstimator* bdp_estimator() { return nullptr; } 186 187 // Getters remote_window()188 int64_t remote_window() const { return remote_window_; } target_window()189 virtual int64_t target_window() const { return target_initial_window_size_; } announced_window()190 int64_t announced_window() const { return announced_window_; } 191 192 // Used in certain benchmarks in which we don't want FlowControl to be a 193 // factor TestOnlyForceHugeWindow()194 virtual void TestOnlyForceHugeWindow() {} 195 196 protected: 197 friend class ::grpc::testing::TrickledCHTTP2; 198 int64_t remote_window_ = kDefaultWindow; 199 int64_t target_initial_window_size_ = kDefaultWindow; 200 int64_t announced_window_ = kDefaultWindow; 201 }; 202 203 // Implementation of flow control that does NOTHING. Always returns maximum 204 // values, never initiates writes, and assumes that the remote peer is doing 205 // the same. To be used to narrow down on flow control as the cause of negative 206 // performance. 207 class TransportFlowControlDisabled final : public TransportFlowControlBase { 208 public: 209 // Maxes out all values 210 explicit TransportFlowControlDisabled(grpc_chttp2_transport* t); 211 flow_control_enabled()212 bool flow_control_enabled() const override { return false; } 213 214 // Never do anything. MaybeSendUpdate(bool)215 uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; } MakeAction()216 FlowControlAction MakeAction() override { return FlowControlAction(); } PeriodicUpdate()217 FlowControlAction PeriodicUpdate() override { return FlowControlAction(); } StreamSentData(int64_t)218 void StreamSentData(int64_t /* size */) override {} RecvData(int64_t)219 grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override { 220 return GRPC_ERROR_NONE; 221 } RecvUpdate(uint32_t)222 void RecvUpdate(uint32_t /* size */) override {} 223 }; 224 225 // Implementation of flow control that abides to HTTP/2 spec and attempts 226 // to be as performant as possible. 227 class TransportFlowControl final : public TransportFlowControlBase { 228 public: 229 TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe); ~TransportFlowControl()230 ~TransportFlowControl() override {} 231 flow_control_enabled()232 bool flow_control_enabled() const override { return true; } 233 bdp_probe()234 bool bdp_probe() const { return enable_bdp_probe_; } 235 236 // returns an announce if we should send a transport update to our peer, 237 // else returns zero; writing_anyway indicates if a write would happen 238 // regardless of the send - if it is false and this function returns non-zero, 239 // this announce will cause a write to occur 240 uint32_t MaybeSendUpdate(bool writing_anyway) override; 241 242 // Reads the flow control data and returns and actionable struct that will 243 // tell chttp2 exactly what it needs to do MakeAction()244 FlowControlAction MakeAction() override { 245 return UpdateAction(FlowControlAction()); 246 } 247 248 // Call periodically (at a low-ish rate, 100ms - 10s makes sense) 249 // to perform more complex flow control calculations and return an action 250 // to let chttp2 change its parameters 251 FlowControlAction PeriodicUpdate() override; 252 StreamSentData(int64_t size)253 void StreamSentData(int64_t size) override { remote_window_ -= size; } 254 255 grpc_error_handle ValidateRecvData(int64_t incoming_frame_size); CommitRecvData(int64_t incoming_frame_size)256 void CommitRecvData(int64_t incoming_frame_size) { 257 announced_window_ -= incoming_frame_size; 258 } 259 RecvData(int64_t incoming_frame_size)260 grpc_error_handle RecvData(int64_t incoming_frame_size) override { 261 FlowControlTrace trace(" data recv", this, nullptr); 262 grpc_error_handle error = ValidateRecvData(incoming_frame_size); 263 if (error != GRPC_ERROR_NONE) return error; 264 CommitRecvData(incoming_frame_size); 265 return GRPC_ERROR_NONE; 266 } 267 268 // we have received a WINDOW_UPDATE frame for a transport RecvUpdate(uint32_t size)269 void RecvUpdate(uint32_t size) override { 270 FlowControlTrace trace("t updt recv", this, nullptr); 271 remote_window_ += size; 272 } 273 274 // See comment above announced_stream_total_over_incoming_window_ for the 275 // logic behind this decision. target_window()276 int64_t target_window() const override { 277 return static_cast<uint32_t>( 278 std::min(static_cast<int64_t>((1u << 31) - 1), 279 announced_stream_total_over_incoming_window_ + 280 target_initial_window_size_)); 281 } 282 transport()283 const grpc_chttp2_transport* transport() const { return t_; } 284 PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)285 void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 286 if (delta > 0) { 287 announced_stream_total_over_incoming_window_ -= delta; 288 } 289 } 290 PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)291 void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 292 if (delta > 0) { 293 announced_stream_total_over_incoming_window_ += delta; 294 } 295 } 296 bdp_estimator()297 BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } 298 TestOnlyForceHugeWindow()299 void TestOnlyForceHugeWindow() override { 300 announced_window_ = 1024 * 1024 * 1024; 301 remote_window_ = 1024 * 1024 * 1024; 302 } 303 304 private: 305 double TargetLogBdp(); 306 double SmoothLogBdp(double value); 307 FlowControlAction::Urgency DeltaUrgency(int64_t value, 308 grpc_chttp2_setting_id setting_id); 309 UpdateAction(FlowControlAction action)310 FlowControlAction UpdateAction(FlowControlAction action) { 311 if (announced_window_ < target_window() / 2) { 312 action.set_send_transport_update( 313 FlowControlAction::Urgency::UPDATE_IMMEDIATELY); 314 } 315 return action; 316 } 317 318 const grpc_chttp2_transport* const t_; 319 320 /** calculating what we should give for local window: 321 we track the total amount of flow control over initial window size 322 across all streams: this is data that we want to receive right now (it 323 has an outstanding read) 324 and the total amount of flow control under initial window size across all 325 streams: this is data we've read early 326 we want to adjust incoming_window such that: 327 incoming_window = total_over - max(bdp - total_under, 0) */ 328 int64_t announced_stream_total_over_incoming_window_ = 0; 329 330 /** should we probe bdp? */ 331 const bool enable_bdp_probe_; 332 333 /* bdp estimation */ 334 grpc_core::BdpEstimator bdp_estimator_; 335 336 /* pid controller */ 337 grpc_core::PidController pid_controller_; 338 grpc_millis last_pid_update_ = 0; 339 }; 340 341 // Fat interface with all methods a stream flow control implementation needs 342 // to support. 343 class StreamFlowControlBase { 344 public: StreamFlowControlBase()345 StreamFlowControlBase() {} ~StreamFlowControlBase()346 virtual ~StreamFlowControlBase() {} 347 348 // Updates an action using the protected members. UpdateAction(FlowControlAction)349 virtual FlowControlAction UpdateAction(FlowControlAction /* action */) { 350 abort(); 351 } 352 353 // Using the protected members, returns an Action for this stream to be 354 // taken by the tranport. 355 virtual FlowControlAction MakeAction() = 0; 356 357 // Bookkeeping for when data is sent on this stream. 358 virtual void SentData(int64_t /* outgoing_frame_size */) = 0; 359 360 // Bookkeeping and error checking for when data is received by this stream. 361 virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0; 362 363 // Called to check if this stream needs to send a WINDOW_UPDATE frame. 364 virtual uint32_t MaybeSendUpdate() = 0; 365 366 // Bookkeeping for receiving a WINDOW_UPDATE from for this stream. 367 virtual void RecvUpdate(uint32_t /* size */) = 0; 368 369 // Bookkeeping for when a call pulls bytes out of the transport. At this 370 // point we consider the data 'used' and can thus let out peer know we are 371 // ready for more data. IncomingByteStreamUpdate(size_t,size_t)372 virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */, 373 size_t /* have_already */) { 374 abort(); 375 } 376 377 // Used in certain benchmarks in which we don't want FlowControl to be a 378 // factor TestOnlyForceHugeWindow()379 virtual void TestOnlyForceHugeWindow() {} 380 381 // Getters remote_window_delta()382 int64_t remote_window_delta() const { return remote_window_delta_; } local_window_delta()383 int64_t local_window_delta() const { return local_window_delta_; } announced_window_delta()384 int64_t announced_window_delta() const { return announced_window_delta_; } 385 386 protected: 387 friend class ::grpc::testing::TrickledCHTTP2; 388 int64_t remote_window_delta_ = 0; 389 int64_t local_window_delta_ = 0; 390 int64_t announced_window_delta_ = 0; 391 }; 392 393 // Implementation of flow control that does NOTHING. Always returns maximum 394 // values, never initiates writes, and assumes that the remote peer is doing 395 // the same. To be used to narrow down on flow control as the cause of negative 396 // performance. 397 class StreamFlowControlDisabled : public StreamFlowControlBase { 398 public: UpdateAction(FlowControlAction action)399 FlowControlAction UpdateAction(FlowControlAction action) override { 400 return action; 401 } MakeAction()402 FlowControlAction MakeAction() override { return FlowControlAction(); } SentData(int64_t)403 void SentData(int64_t /* outgoing_frame_size */) override {} RecvData(int64_t)404 grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override { 405 return GRPC_ERROR_NONE; 406 } MaybeSendUpdate()407 uint32_t MaybeSendUpdate() override { return 0; } RecvUpdate(uint32_t)408 void RecvUpdate(uint32_t /* size */) override {} IncomingByteStreamUpdate(size_t,size_t)409 void IncomingByteStreamUpdate(size_t /* max_size_hint */, 410 size_t /* have_already */) override {} 411 }; 412 413 // Implementation of flow control that abides to HTTP/2 spec and attempts 414 // to be as performant as possible. 415 class StreamFlowControl final : public StreamFlowControlBase { 416 public: 417 StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); ~StreamFlowControl()418 ~StreamFlowControl() override { 419 tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 420 } 421 422 FlowControlAction UpdateAction(FlowControlAction action) override; MakeAction()423 FlowControlAction MakeAction() override { 424 return UpdateAction(tfc_->MakeAction()); 425 } 426 427 // we have sent data on the wire, we must track this in our bookkeeping for 428 // the remote peer's flow control. SentData(int64_t outgoing_frame_size)429 void SentData(int64_t outgoing_frame_size) override { 430 FlowControlTrace tracer(" data sent", tfc_, this); 431 tfc_->StreamSentData(outgoing_frame_size); 432 remote_window_delta_ -= outgoing_frame_size; 433 } 434 435 // we have received data from the wire 436 grpc_error_handle RecvData(int64_t incoming_frame_size) override; 437 438 // returns an announce if we should send a stream update to our peer, else 439 // returns zero 440 uint32_t MaybeSendUpdate() override; 441 442 // we have received a WINDOW_UPDATE frame for a stream RecvUpdate(uint32_t size)443 void RecvUpdate(uint32_t size) override { 444 FlowControlTrace trace("s updt recv", tfc_, this); 445 remote_window_delta_ += size; 446 } 447 448 // the application is asking for a certain amount of bytes 449 void IncomingByteStreamUpdate(size_t max_size_hint, 450 size_t have_already) override; 451 remote_window_delta()452 int64_t remote_window_delta() const { return remote_window_delta_; } local_window_delta()453 int64_t local_window_delta() const { return local_window_delta_; } announced_window_delta()454 int64_t announced_window_delta() const { return announced_window_delta_; } 455 stream()456 const grpc_chttp2_stream* stream() const { return s_; } 457 TestOnlyForceHugeWindow()458 void TestOnlyForceHugeWindow() override { 459 announced_window_delta_ = 1024 * 1024 * 1024; 460 local_window_delta_ = 1024 * 1024 * 1024; 461 remote_window_delta_ = 1024 * 1024 * 1024; 462 } 463 464 private: 465 TransportFlowControl* const tfc_; 466 const grpc_chttp2_stream* const s_; 467 UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)468 void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) { 469 tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 470 announced_window_delta_ += change; 471 tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 472 } 473 }; 474 475 class TestOnlyTransportTargetWindowEstimatesMocker { 476 public: ~TestOnlyTransportTargetWindowEstimatesMocker()477 virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {} 478 virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( 479 double current_target) = 0; 480 }; 481 482 extern TestOnlyTransportTargetWindowEstimatesMocker* 483 g_test_only_transport_target_window_estimates_mocker; 484 485 } // namespace chttp2 486 } // namespace grpc_core 487 488 #endif // GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 489