1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Try.h>
20 
21 #include <thrift/lib/cpp2/async/StreamCallbacks.h>
22 #include <thrift/lib/cpp2/async/TwoWayBridge.h>
23 
24 namespace apache {
25 namespace thrift {
26 
27 struct BufferOptions {
28   int32_t chunkSize{100};
29   size_t memSize{0};
30   // Used only with memory buffer sized based throttling to cap the
31   // number of credits to the give maxChunks value even if there is
32   // enough buffer. Useful when we expect sudden spikes of large
33   // message payloads.
34   int32_t maxChunkSize{std::numeric_limits<int32_t>::max()};
35 };
36 
37 namespace detail {
38 
39 class ClientStreamConsumer {
40  public:
41   virtual ~ClientStreamConsumer() = default;
42   virtual void consume() = 0;
43   virtual void canceled() = 0;
44 };
45 
46 class ClientStreamBridge;
47 
48 // This template explicitly instantiated in ClientStreamBridge.cpp
49 extern template class TwoWayBridge<
50     ClientStreamConsumer,
51     folly::Try<StreamPayload>,
52     ClientStreamBridge,
53     int64_t,
54     ClientStreamBridge>;
55 
56 class ClientStreamBridge : public TwoWayBridge<
57                                ClientStreamConsumer,
58                                folly::Try<StreamPayload>,
59                                ClientStreamBridge,
60                                int64_t,
61                                ClientStreamBridge>,
62                            private StreamClientCallback {
63  public:
64   ~ClientStreamBridge() override;
65 
66   struct ClientDeleter : Deleter {
67     void operator()(ClientStreamBridge* ptr);
68   };
69   using ClientPtr = std::unique_ptr<ClientStreamBridge, ClientDeleter>;
70 
71   using FirstResponseCallback = FirstResponseClientCallback<ClientPtr>;
72 
73   static StreamClientCallback* create(FirstResponseCallback* callback);
74 
75   bool wait(ClientStreamConsumer* consumer);
76 
77   ClientQueue getMessages();
78 
79   void requestN(int64_t credits);
80 
81   void cancel();
82 
83   bool isCanceled();
84 
85   void consume();
86 
87   void canceled();
88 
89  private:
90   explicit ClientStreamBridge(FirstResponseCallback* callback);
91 
92   bool onFirstResponse(
93       FirstResponsePayload&& payload,
94       folly::EventBase* evb,
95       StreamServerCallback* streamServerCallback) override;
96 
97   void onFirstResponseError(folly::exception_wrapper ew) override;
98 
99   bool onStreamNext(StreamPayload&& payload) override;
100 
101   void onStreamError(folly::exception_wrapper ew) override;
102 
103   void onStreamComplete() override;
104 
105   bool onStreamHeaders(HeadersPayload&& payload) override;
106 
107   void resetServerCallback(StreamServerCallback& serverCallback) override;
108 
109   void processCredits();
110 
111   void serverCleanup();
112 
113   union {
114     FirstResponseCallback* firstResponseCallback_;
115     StreamServerCallback* streamServerCallback_;
116   };
117   folly::Executor::KeepAlive<> serverExecutor_;
118 };
119 } // namespace detail
120 } // namespace thrift
121 } // namespace apache
122