1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Portability.h>
20 #include <folly/Try.h>
21 #include <folly/executors/GlobalExecutor.h>
22 #include <folly/experimental/coro/AsyncGenerator.h>
23 #include <thrift/lib/cpp2/async/ClientBufferedStream.h>
24 #include <thrift/lib/cpp2/async/RpcTypes.h>
25 #include <thrift/lib/cpp2/async/ServerGeneratorStream.h>
26 #include <thrift/lib/cpp2/async/ServerPublisherStream.h>
27 #include <thrift/lib/cpp2/async/StreamCallbacks.h>
28 
29 namespace yarpl {
30 namespace flowable {
31 class ThriftStreamShim;
32 } // namespace flowable
33 } // namespace yarpl
34 
35 namespace apache {
36 namespace thrift {
37 
38 template <typename T, bool WithHeader>
39 class ServerStreamMultiPublisher;
40 
41 template <typename T>
42 class ServerStream {
43  public:
44 #if FOLLY_HAS_COROUTINES
ServerStream(folly::coro::AsyncGenerator<T &&> && gen)45   /* implicit */ ServerStream(folly::coro::AsyncGenerator<T&&>&& gen)
46       : fn_(apache::thrift::detail::ServerGeneratorStream::
47                 fromAsyncGenerator<false, T>(std::move(gen))) {}
48 
49   using PayloadAndHeader = apache::thrift::detail::PayloadAndHeader<T>;
ServerStream(folly::coro::AsyncGenerator<PayloadAndHeader &&> && gen)50   /* implicit */ ServerStream(
51       folly::coro::AsyncGenerator<PayloadAndHeader&&>&& gen)
52       : fn_(apache::thrift::detail::ServerGeneratorStream::
53                 fromAsyncGenerator<true, T>(std::move(gen))) {}
54 
55   using promise_type = folly::coro::detail::AsyncGeneratorPromise<T&&, T>;
56 #endif
57 
58   // Completion callback is optional
59   // It may destroy the ServerStreamPublisher object inline
60   // It must not call complete() on the publisher object inline
createPublisher(folly::Function<void ()> onStreamCompleteOrCancel)61   static std::pair<ServerStream<T>, ServerStreamPublisher<T>> createPublisher(
62       folly::Function<void()> onStreamCompleteOrCancel) {
63     return createPublisherImpl<false>(std::move(onStreamCompleteOrCancel));
64   }
65   static std::pair<ServerStream<T>, ServerStreamPublisher<T, true>>
createPublisherWithHeader(folly::Function<void ()> onStreamCompleteOrCancel)66   createPublisherWithHeader(folly::Function<void()> onStreamCompleteOrCancel) {
67     return createPublisherImpl<true>(std::move(onStreamCompleteOrCancel));
68   }
69   static std::pair<ServerStream<T>, ServerStreamPublisher<T>>
createPublisher()70   createPublisher() {
71     return createPublisher([] {});
72   }
73 
createEmpty()74   static ServerStream<T> createEmpty() {
75     auto pair = createPublisher();
76     std::move(pair.second).complete();
77     return std::move(pair.first);
78   }
79 
80   [[deprecated(
81       "Use ScopedServerInterfaceThread instead of invoking handler methods "
82       "directly. This approach changes the threading model and can hide race "
83       "conditions in production code.")]] //
84   ClientBufferedStream<T>
85   toClientStreamUnsafeDoNotUse(
86       folly::EventBase* evb = folly::getUnsafeMutableGlobalEventBase(),
87       int32_t bufferSize = 100) &&;
88 
operator()89   apache::thrift::detail::ServerStreamFactory operator()(
90       folly::Executor::KeepAlive<> serverExecutor,
91       apache::thrift::detail::StreamElementEncoder<T>* encode) {
92     return fn_(std::move(serverExecutor), encode);
93   }
94 
95  private:
ServerStream(apache::thrift::detail::ServerStreamFn<T> fn)96   explicit ServerStream(apache::thrift::detail::ServerStreamFn<T> fn)
97       : fn_(std::move(fn)) {}
98 
99   template <bool WithHeader>
100   static std::pair<ServerStream<T>, ServerStreamPublisher<T, WithHeader>>
createPublisherImpl(folly::Function<void ()> onStreamCompleteOrCancel)101   createPublisherImpl(folly::Function<void()> onStreamCompleteOrCancel) {
102     auto pair =
103         apache::thrift::detail::ServerPublisherStream<T, WithHeader>::create(
104             std::move(onStreamCompleteOrCancel));
105     return std::
106         make_pair<ServerStream<T>, ServerStreamPublisher<T, WithHeader>>(
107             ServerStream<T>(std::move(pair.first)), std::move(pair.second));
108   }
109 
110   apache::thrift::detail::ServerStreamFn<T> fn_;
111 
112   friend class yarpl::flowable::ThriftStreamShim;
113   friend class ServerStreamMultiPublisher<T, false>;
114   friend class ServerStreamMultiPublisher<T, true>;
115 };
116 
117 template <typename Response, typename StreamElement>
118 struct ResponseAndServerStream {
119   using ResponseType = Response;
120   using StreamElementType = StreamElement;
121 
122   Response response;
123   ServerStream<StreamElement> stream;
124 };
125 struct ResponseAndServerStreamFactory {
126   apache::thrift::SerializedResponse response;
127   apache::thrift::detail::ServerStreamFactory stream;
128 };
129 
130 } // namespace thrift
131 } // namespace apache
132 #include <thrift/lib/cpp2/async/ServerStream-inl.h>
133