1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/Try.h> 20 21 #include <thrift/lib/cpp2/async/StreamCallbacks.h> 22 #include <thrift/lib/cpp2/async/TwoWayBridge.h> 23 24 namespace apache { 25 namespace thrift { 26 27 struct BufferOptions { 28 int32_t chunkSize{100}; 29 size_t memSize{0}; 30 // Used only with memory buffer sized based throttling to cap the 31 // number of credits to the give maxChunks value even if there is 32 // enough buffer. Useful when we expect sudden spikes of large 33 // message payloads. 34 int32_t maxChunkSize{std::numeric_limits<int32_t>::max()}; 35 }; 36 37 namespace detail { 38 39 class ClientStreamConsumer { 40 public: 41 virtual ~ClientStreamConsumer() = default; 42 virtual void consume() = 0; 43 virtual void canceled() = 0; 44 }; 45 46 class ClientStreamBridge; 47 48 // This template explicitly instantiated in ClientStreamBridge.cpp 49 extern template class TwoWayBridge< 50 ClientStreamConsumer, 51 folly::Try<StreamPayload>, 52 ClientStreamBridge, 53 int64_t, 54 ClientStreamBridge>; 55 56 class ClientStreamBridge : public TwoWayBridge< 57 ClientStreamConsumer, 58 folly::Try<StreamPayload>, 59 ClientStreamBridge, 60 int64_t, 61 ClientStreamBridge>, 62 private StreamClientCallback { 63 public: 64 ~ClientStreamBridge() override; 65 66 struct ClientDeleter : Deleter { 67 void operator()(ClientStreamBridge* ptr); 68 }; 69 using ClientPtr = std::unique_ptr<ClientStreamBridge, ClientDeleter>; 70 71 using FirstResponseCallback = FirstResponseClientCallback<ClientPtr>; 72 73 static StreamClientCallback* create(FirstResponseCallback* callback); 74 75 bool wait(ClientStreamConsumer* consumer); 76 77 ClientQueue getMessages(); 78 79 void requestN(int64_t credits); 80 81 void cancel(); 82 83 bool isCanceled(); 84 85 void consume(); 86 87 void canceled(); 88 89 private: 90 explicit ClientStreamBridge(FirstResponseCallback* callback); 91 92 bool onFirstResponse( 93 FirstResponsePayload&& payload, 94 folly::EventBase* evb, 95 StreamServerCallback* streamServerCallback) override; 96 97 void onFirstResponseError(folly::exception_wrapper ew) override; 98 99 bool onStreamNext(StreamPayload&& payload) override; 100 101 void onStreamError(folly::exception_wrapper ew) override; 102 103 void onStreamComplete() override; 104 105 bool onStreamHeaders(HeadersPayload&& payload) override; 106 107 void resetServerCallback(StreamServerCallback& serverCallback) override; 108 109 void processCredits(); 110 111 void serverCleanup(); 112 113 union { 114 FirstResponseCallback* firstResponseCallback_; 115 StreamServerCallback* streamServerCallback_; 116 }; 117 folly::Executor::KeepAlive<> serverExecutor_; 118 }; 119 } // namespace detail 120 } // namespace thrift 121 } // namespace apache 122