1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/Portability.h> 20 #include <folly/Try.h> 21 #include <folly/executors/GlobalExecutor.h> 22 #include <folly/experimental/coro/AsyncGenerator.h> 23 #include <thrift/lib/cpp2/async/ClientBufferedStream.h> 24 #include <thrift/lib/cpp2/async/RpcTypes.h> 25 #include <thrift/lib/cpp2/async/ServerGeneratorStream.h> 26 #include <thrift/lib/cpp2/async/ServerPublisherStream.h> 27 #include <thrift/lib/cpp2/async/StreamCallbacks.h> 28 29 namespace yarpl { 30 namespace flowable { 31 class ThriftStreamShim; 32 } // namespace flowable 33 } // namespace yarpl 34 35 namespace apache { 36 namespace thrift { 37 38 template <typename T, bool WithHeader> 39 class ServerStreamMultiPublisher; 40 41 template <typename T> 42 class ServerStream { 43 public: 44 #if FOLLY_HAS_COROUTINES ServerStream(folly::coro::AsyncGenerator<T &&> && gen)45 /* implicit */ ServerStream(folly::coro::AsyncGenerator<T&&>&& gen) 46 : fn_(apache::thrift::detail::ServerGeneratorStream:: 47 fromAsyncGenerator<false, T>(std::move(gen))) {} 48 49 using PayloadAndHeader = apache::thrift::detail::PayloadAndHeader<T>; ServerStream(folly::coro::AsyncGenerator<PayloadAndHeader &&> && gen)50 /* implicit */ ServerStream( 51 folly::coro::AsyncGenerator<PayloadAndHeader&&>&& gen) 52 : fn_(apache::thrift::detail::ServerGeneratorStream:: 53 fromAsyncGenerator<true, T>(std::move(gen))) {} 54 55 using promise_type = folly::coro::detail::AsyncGeneratorPromise<T&&, T>; 56 #endif 57 58 // Completion callback is optional 59 // It may destroy the ServerStreamPublisher object inline 60 // It must not call complete() on the publisher object inline createPublisher(folly::Function<void ()> onStreamCompleteOrCancel)61 static std::pair<ServerStream<T>, ServerStreamPublisher<T>> createPublisher( 62 folly::Function<void()> onStreamCompleteOrCancel) { 63 return createPublisherImpl<false>(std::move(onStreamCompleteOrCancel)); 64 } 65 static std::pair<ServerStream<T>, ServerStreamPublisher<T, true>> createPublisherWithHeader(folly::Function<void ()> onStreamCompleteOrCancel)66 createPublisherWithHeader(folly::Function<void()> onStreamCompleteOrCancel) { 67 return createPublisherImpl<true>(std::move(onStreamCompleteOrCancel)); 68 } 69 static std::pair<ServerStream<T>, ServerStreamPublisher<T>> createPublisher()70 createPublisher() { 71 return createPublisher([] {}); 72 } 73 createEmpty()74 static ServerStream<T> createEmpty() { 75 auto pair = createPublisher(); 76 std::move(pair.second).complete(); 77 return std::move(pair.first); 78 } 79 80 [[deprecated( 81 "Use ScopedServerInterfaceThread instead of invoking handler methods " 82 "directly. This approach changes the threading model and can hide race " 83 "conditions in production code.")]] // 84 ClientBufferedStream<T> 85 toClientStreamUnsafeDoNotUse( 86 folly::EventBase* evb = folly::getUnsafeMutableGlobalEventBase(), 87 int32_t bufferSize = 100) &&; 88 operator()89 apache::thrift::detail::ServerStreamFactory operator()( 90 folly::Executor::KeepAlive<> serverExecutor, 91 apache::thrift::detail::StreamElementEncoder<T>* encode) { 92 return fn_(std::move(serverExecutor), encode); 93 } 94 95 private: ServerStream(apache::thrift::detail::ServerStreamFn<T> fn)96 explicit ServerStream(apache::thrift::detail::ServerStreamFn<T> fn) 97 : fn_(std::move(fn)) {} 98 99 template <bool WithHeader> 100 static std::pair<ServerStream<T>, ServerStreamPublisher<T, WithHeader>> createPublisherImpl(folly::Function<void ()> onStreamCompleteOrCancel)101 createPublisherImpl(folly::Function<void()> onStreamCompleteOrCancel) { 102 auto pair = 103 apache::thrift::detail::ServerPublisherStream<T, WithHeader>::create( 104 std::move(onStreamCompleteOrCancel)); 105 return std:: 106 make_pair<ServerStream<T>, ServerStreamPublisher<T, WithHeader>>( 107 ServerStream<T>(std::move(pair.first)), std::move(pair.second)); 108 } 109 110 apache::thrift::detail::ServerStreamFn<T> fn_; 111 112 friend class yarpl::flowable::ThriftStreamShim; 113 friend class ServerStreamMultiPublisher<T, false>; 114 friend class ServerStreamMultiPublisher<T, true>; 115 }; 116 117 template <typename Response, typename StreamElement> 118 struct ResponseAndServerStream { 119 using ResponseType = Response; 120 using StreamElementType = StreamElement; 121 122 Response response; 123 ServerStream<StreamElement> stream; 124 }; 125 struct ResponseAndServerStreamFactory { 126 apache::thrift::SerializedResponse response; 127 apache::thrift::detail::ServerStreamFactory stream; 128 }; 129 130 } // namespace thrift 131 } // namespace apache 132 #include <thrift/lib/cpp2/async/ServerStream-inl.h> 133