1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <string>
20 #include <string_view>
21 #include <type_traits>
22 #include <utility>
23 
24 #include <folly/Portability.h>
25 
26 #include <fmt/core.h>
27 #include <folly/Traits.h>
28 #include <folly/Utility.h>
29 #include <folly/experimental/coro/FutureUtil.h>
30 #include <folly/futures/Future.h>
31 #include <folly/io/Cursor.h>
32 #include <thrift/lib/cpp/protocol/TBase64Utils.h>
33 #include <thrift/lib/cpp2/PluggableFunction.h>
34 #include <thrift/lib/cpp2/SerializationSwitch.h>
35 #include <thrift/lib/cpp2/Thrift.h>
36 #include <thrift/lib/cpp2/async/AsyncProcessor.h>
37 #include <thrift/lib/cpp2/async/AsyncProcessorHelper.h>
38 #include <thrift/lib/cpp2/async/ClientBufferedStream.h>
39 #include <thrift/lib/cpp2/async/ClientSinkBridge.h>
40 #include <thrift/lib/cpp2/async/RequestChannel.h>
41 #include <thrift/lib/cpp2/async/Sink.h>
42 #include <thrift/lib/cpp2/async/StreamCallbacks.h>
43 #include <thrift/lib/cpp2/detail/meta.h>
44 #include <thrift/lib/cpp2/frozen/Frozen.h>
45 #include <thrift/lib/cpp2/protocol/Cpp2Ops.h>
46 #include <thrift/lib/cpp2/protocol/Traits.h>
47 #include <thrift/lib/cpp2/transport/core/RpcMetadataUtil.h>
48 #include <thrift/lib/cpp2/util/Frozen2ViewHelpers.h>
49 #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h>
50 
51 namespace apache {
52 namespace thrift {
53 
54 class BinaryProtocolReader;
55 class CompactProtocolReader;
56 
57 namespace detail {
58 
59 THRIFT_PLUGGABLE_FUNC_DECLARE(
60     bool, includeInRecentRequestsCount, const std::string_view /*methodName*/);
61 
62 template <int N, int Size, class F, class Tuple>
63 struct ForEachImpl {
forEachForEachImpl64   static uint32_t forEach(Tuple&& tuple, F&& f) {
65     uint32_t res = f(std::get<N>(tuple), N);
66     res += ForEachImpl<N + 1, Size, F, Tuple>::forEach(
67         std::forward<Tuple>(tuple), std::forward<F>(f));
68     return res;
69   }
70 };
71 template <int Size, class F, class Tuple>
72 struct ForEachImpl<Size, Size, F, Tuple> {
73   static uint32_t forEach(Tuple&& /*tuple*/, F&& /*f*/) { return 0; }
74 };
75 
76 template <int N = 0, class F, class Tuple>
77 uint32_t forEach(Tuple&& tuple, F&& f) {
78   return ForEachImpl<
79       N,
80       std::tuple_size<typename std::remove_reference<Tuple>::type>::value,
81       F,
82       Tuple>::forEach(std::forward<Tuple>(tuple), std::forward<F>(f));
83 }
84 
85 template <int N, int Size, class F, class Tuple>
86 struct ForEachVoidImpl {
87   static void forEach(Tuple&& tuple, F&& f) {
88     f(std::get<N>(tuple), N);
89     ForEachVoidImpl<N + 1, Size, F, Tuple>::forEach(
90         std::forward<Tuple>(tuple), std::forward<F>(f));
91   }
92 };
93 template <int Size, class F, class Tuple>
94 struct ForEachVoidImpl<Size, Size, F, Tuple> {
95   static void forEach(Tuple&& /*tuple*/, F&& /*f*/) {}
96 };
97 
98 template <int N = 0, class F, class Tuple>
99 void forEachVoid(Tuple&& tuple, F&& f) {
100   ForEachVoidImpl<
101       N,
102       std::tuple_size<typename std::remove_reference<Tuple>::type>::value,
103       F,
104       Tuple>::forEach(std::forward<Tuple>(tuple), std::forward<F>(f));
105 }
106 
107 template <typename Protocol, typename IsSet>
108 struct Writer {
109   Writer(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {}
110   template <typename FieldData>
111   uint32_t operator()(const FieldData& fieldData, int index) {
112     using Ops = Cpp2Ops<typename FieldData::ref_type>;
113 
114     if (!isset_.getIsSet(index)) {
115       return 0;
116     }
117 
118     int16_t fid = FieldData::fid;
119     const auto& ex = fieldData.ref();
120 
121     uint32_t xfer = 0;
122     xfer += prot_->writeFieldBegin("", Ops::thriftType(), fid);
123     xfer += Ops::write(prot_, &ex);
124     xfer += prot_->writeFieldEnd();
125     return xfer;
126   }
127 
128  private:
129   Protocol* prot_;
130   const IsSet& isset_;
131 };
132 
133 template <typename Protocol, typename IsSet>
134 struct Sizer {
135   Sizer(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {}
136   template <typename FieldData>
137   uint32_t operator()(const FieldData& fieldData, int index) {
138     using Ops = Cpp2Ops<typename FieldData::ref_type>;
139 
140     if (!isset_.getIsSet(index)) {
141       return 0;
142     }
143 
144     int16_t fid = FieldData::fid;
145     const auto& ex = fieldData.ref();
146 
147     uint32_t xfer = 0;
148     xfer += prot_->serializedFieldSize("", Ops::thriftType(), fid);
149     xfer += Ops::serializedSize(prot_, &ex);
150     return xfer;
151   }
152 
153  private:
154   Protocol* prot_;
155   const IsSet& isset_;
156 };
157 
158 template <typename Protocol, typename IsSet>
159 struct SizerZC {
160   SizerZC(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {}
161   template <typename FieldData>
162   uint32_t operator()(const FieldData& fieldData, int index) {
163     using Ops = Cpp2Ops<typename FieldData::ref_type>;
164 
165     if (!isset_.getIsSet(index)) {
166       return 0;
167     }
168 
169     int16_t fid = FieldData::fid;
170     const auto& ex = fieldData.ref();
171 
172     uint32_t xfer = 0;
173     xfer += prot_->serializedFieldSize("", Ops::thriftType(), fid);
174     xfer += Ops::serializedSizeZC(prot_, &ex);
175     return xfer;
176   }
177 
178  private:
179   Protocol* prot_;
180   const IsSet& isset_;
181 };
182 
183 template <typename Protocol, typename IsSet>
184 struct Reader {
185   Reader(
186       Protocol* prot,
187       IsSet& isset,
188       int16_t fid,
189       protocol::TType ftype,
190       bool& success)
191       : prot_(prot),
192         isset_(isset),
193         fid_(fid),
194         ftype_(ftype),
195         success_(success) {}
196   template <typename FieldData>
197   void operator()(FieldData& fieldData, int index) {
198     using Ops = Cpp2Ops<typename FieldData::ref_type>;
199 
200     if (ftype_ != Ops::thriftType()) {
201       return;
202     }
203 
204     int16_t myfid = FieldData::fid;
205     auto& ex = fieldData.ref();
206     if (myfid != fid_) {
207       return;
208     }
209 
210     success_ = true;
211     isset_.setIsSet(index);
212     Ops::read(prot_, &ex);
213   }
214 
215  private:
216   Protocol* prot_;
217   IsSet& isset_;
218   int16_t fid_;
219   protocol::TType ftype_;
220   bool& success_;
221 };
222 
223 template <typename T>
224 T& maybe_remove_pointer(T& x) {
225   return x;
226 }
227 
228 template <typename T>
229 T& maybe_remove_pointer(T* x) {
230   return *x;
231 }
232 
233 template <bool hasIsSet, size_t count>
234 struct IsSetHelper {
235   void setIsSet(size_t /*index*/, bool /*value*/ = true) {}
236   bool getIsSet(size_t /*index*/) const { return true; }
237 };
238 
239 template <size_t count>
240 struct IsSetHelper<true, count> {
241   void setIsSet(size_t index, bool value = true) { isset_[index] = value; }
242   bool getIsSet(size_t index) const { return isset_[index]; }
243 
244  private:
245   std::array<bool, count> isset_ = {};
246 };
247 
248 } // namespace detail
249 
250 template <int16_t Fid, typename TC, typename T>
251 struct FieldData {
252   static const constexpr int16_t fid = Fid;
253   static const constexpr protocol::TType ttype = protocol_type_v<TC, T>;
254   typedef TC type_class;
255   typedef T type;
256   typedef typename std::remove_pointer<T>::type ref_type;
257   T value;
258   ref_type& ref() {
259     return apache::thrift::detail::maybe_remove_pointer(value);
260   }
261   const ref_type& ref() const {
262     return apache::thrift::detail::maybe_remove_pointer(value);
263   }
264 };
265 
266 template <bool hasIsSet, typename... Field>
267 class ThriftPresult
268     : private std::tuple<Field...>,
269       public apache::thrift::detail::IsSetHelper<hasIsSet, sizeof...(Field)> {
270   // The fields tuple and IsSetHelper are base classes (rather than members)
271   // to employ the empty base class optimization when they are empty
272   typedef std::tuple<Field...> Fields;
273   typedef apache::thrift::detail::IsSetHelper<hasIsSet, sizeof...(Field)>
274       CurIsSetHelper;
275 
276  public:
277   using size = std::tuple_size<Fields>;
278 
279   CurIsSetHelper& isSet() { return *this; }
280   const CurIsSetHelper& isSet() const { return *this; }
281   Fields& fields() { return *this; }
282   const Fields& fields() const { return *this; }
283 
284   // returns lvalue ref to the appropriate FieldData
285   template <size_t index>
286   auto get() -> decltype(std::get<index>(this->fields())) {
287     return std::get<index>(this->fields());
288   }
289 
290   template <size_t index>
291   auto get() const -> decltype(std::get<index>(this->fields())) {
292     return std::get<index>(this->fields());
293   }
294 
295   template <class Protocol>
296   uint32_t read(Protocol* prot) {
297     auto xfer = prot->getCursorPosition();
298     std::string fname;
299     apache::thrift::protocol::TType ftype;
300     int16_t fid;
301 
302     prot->readStructBegin(fname);
303 
304     while (true) {
305       prot->readFieldBegin(fname, ftype, fid);
306       if (ftype == apache::thrift::protocol::T_STOP) {
307         break;
308       }
309       bool readSomething = false;
310       apache::thrift::detail::forEachVoid(
311           fields(),
312           apache::thrift::detail::Reader<Protocol, CurIsSetHelper>(
313               prot, isSet(), fid, ftype, readSomething));
314       if (!readSomething) {
315         prot->skip(ftype);
316       }
317       prot->readFieldEnd();
318     }
319     prot->readStructEnd();
320 
321     return folly::to_narrow(prot->getCursorPosition() - xfer);
322   }
323 
324   template <class Protocol>
325   uint32_t serializedSize(Protocol* prot) const {
326     uint32_t xfer = 0;
327     xfer += prot->serializedStructSize("");
328     xfer += apache::thrift::detail::forEach(
329         fields(),
330         apache::thrift::detail::Sizer<Protocol, CurIsSetHelper>(prot, isSet()));
331     xfer += prot->serializedSizeStop();
332     return xfer;
333   }
334 
335   template <class Protocol>
336   uint32_t serializedSizeZC(Protocol* prot) const {
337     uint32_t xfer = 0;
338     xfer += prot->serializedStructSize("");
339     xfer += apache::thrift::detail::forEach(
340         fields(),
341         apache::thrift::detail::SizerZC<Protocol, CurIsSetHelper>(
342             prot, isSet()));
343     xfer += prot->serializedSizeStop();
344     return xfer;
345   }
346 
347   template <class Protocol>
348   uint32_t write(Protocol* prot) const {
349     uint32_t xfer = 0;
350     xfer += prot->writeStructBegin("");
351     xfer += apache::thrift::detail::forEach(
352         fields(),
353         apache::thrift::detail::Writer<Protocol, CurIsSetHelper>(
354             prot, isSet()));
355     xfer += prot->writeFieldStop();
356     xfer += prot->writeStructEnd();
357     return xfer;
358   }
359 };
360 
361 template <typename PResults, typename StreamPresult>
362 struct ThriftPResultStream {
363   using StreamPResultType = StreamPresult;
364   using FieldsType = PResults;
365 
366   PResults fields;
367   StreamPresult stream;
368 };
369 
370 template <
371     typename PResults,
372     typename SinkPresult,
373     typename FinalResponsePresult>
374 struct ThriftPResultSink {
375   using SinkPResultType = SinkPresult;
376   using FieldsType = PResults;
377   using FinalResponsePResultType = FinalResponsePresult;
378 
379   PResults fields;
380   SinkPresult stream;
381   FinalResponsePresult finalResponse;
382 };
383 
384 template <bool hasIsSet, class... Args>
385 class Cpp2Ops<ThriftPresult<hasIsSet, Args...>> {
386  public:
387   typedef ThriftPresult<hasIsSet, Args...> Presult;
388   static constexpr protocol::TType thriftType() { return protocol::T_STRUCT; }
389   template <class Protocol>
390   static uint32_t write(Protocol* prot, const Presult* value) {
391     return value->write(prot);
392   }
393   template <class Protocol>
394   static uint32_t read(Protocol* prot, Presult* value) {
395     return value->read(prot);
396   }
397   template <class Protocol>
398   static uint32_t serializedSize(Protocol* prot, const Presult* value) {
399     return value->serializedSize(prot);
400   }
401   template <class Protocol>
402   static uint32_t serializedSizeZC(Protocol* prot, const Presult* value) {
403     return value->serializedSizeZC(prot);
404   }
405 };
406 
407 // Forward declaration
408 namespace detail {
409 namespace ap {
410 
411 template <
412     ErrorBlame Blame,
413     typename Protocol,
414     typename PResult,
415     typename T,
416     typename ErrorMapFunc>
417 class StreamElementEncoderImpl;
418 
419 template <typename Protocol, typename PResult, typename T>
420 folly::Try<T> decode_stream_element(
421     folly::Try<apache::thrift::StreamPayload>&& payload);
422 
423 template <typename Protocol, typename PResult, typename T>
424 apache::thrift::ClientBufferedStream<T> decode_client_buffered_stream(
425     apache::thrift::detail::ClientStreamBridge::ClientPtr streamBridge,
426     const BufferOptions& bufferOptions);
427 
428 template <typename Protocol, typename PResult, typename T>
429 std::unique_ptr<folly::IOBuf> encode_stream_payload(T&& _item);
430 
431 template <typename Protocol, typename PResult>
432 std::unique_ptr<folly::IOBuf> encode_stream_payload(folly::IOBuf&& _item);
433 
434 template <typename Protocol, typename PResult, typename ErrorMapFunc>
435 EncodedStreamError encode_stream_exception(folly::exception_wrapper ew);
436 
437 template <typename Protocol, typename PResult, typename T>
438 T decode_stream_payload_impl(folly::IOBuf& payload, folly::tag_t<T>);
439 
440 template <typename Protocol, typename PResult, typename T>
441 folly::IOBuf decode_stream_payload_impl(
442     folly::IOBuf& payload, folly::tag_t<folly::IOBuf>);
443 
444 template <typename Protocol, typename PResult, typename T>
445 T decode_stream_payload(folly::IOBuf& payload);
446 
447 template <typename Protocol, typename PResult, typename T>
448 folly::exception_wrapper decode_stream_exception(folly::exception_wrapper ew);
449 
450 struct EmptyExMapType {
451   template <typename PResult>
452   bool operator()(PResult&, folly::exception_wrapper) {
453     return false;
454   }
455 };
456 
457 } // namespace ap
458 } // namespace detail
459 
460 //  AsyncClient helpers
461 namespace detail {
462 namespace ac {
463 
464 template <bool HasReturnType, typename PResult>
465 folly::exception_wrapper extract_exn(PResult& result) {
466   using base = std::integral_constant<std::size_t, HasReturnType ? 1 : 0>;
467   auto ew = folly::exception_wrapper();
468   if (HasReturnType && result.getIsSet(0)) {
469     return ew;
470   }
471   foreach_index<PResult::size::value - base::value>([&](auto index) {
472     if (!ew && result.getIsSet(index.value + base::value)) {
473       auto& fdata = result.template get<index.value + base::value>();
474       ew = folly::exception_wrapper(std::move(fdata.ref()));
475     }
476   });
477   if (!ew && HasReturnType) {
478     ew = folly::make_exception_wrapper<TApplicationException>(
479         TApplicationException::TApplicationExceptionType::MISSING_RESULT,
480         "failed: unknown result");
481   }
482   return ew;
483 }
484 
485 template <typename Protocol, typename PResult>
486 folly::exception_wrapper recv_wrapped_helper(
487     Protocol* prot, ClientReceiveState& state, PResult& result) {
488   ContextStack* ctx = state.ctx();
489   MessageType mtype = state.messageType();
490   if (ctx) {
491     ctx->preRead();
492   }
493   try {
494     const folly::IOBuf& buffer = *state.serializedResponse().buffer;
495     // TODO: re-enable checksumming after we properly adjust checksum on the
496     // server to exclude the envelope.
497     // if (state.header() && state.header()->getCrc32c().has_value() &&
498     //     checksum::crc32c(buffer) != *state.header()->getCrc32c()) {
499     //   return folly::make_exception_wrapper<TApplicationException>(
500     //       TApplicationException::TApplicationExceptionType::CHECKSUM_MISMATCH,
501     //       "corrupted response");
502     // }
503     if (mtype == MessageType::T_EXCEPTION) {
504       TApplicationException x;
505       apache::thrift::detail::deserializeExceptionBody(prot, &x);
506       return folly::exception_wrapper(std::move(x));
507     }
508     if (mtype != MessageType::T_REPLY) {
509       prot->skip(protocol::T_STRUCT);
510       return folly::make_exception_wrapper<TApplicationException>(
511           TApplicationException::TApplicationExceptionType::
512               INVALID_MESSAGE_TYPE);
513     }
514     SerializedMessage smsg;
515     smsg.protocolType = prot->protocolType();
516     smsg.buffer = &buffer;
517     if (ctx) {
518       ctx->onReadData(smsg);
519     }
520     apache::thrift::detail::deserializeRequestBody(prot, &result);
521     if (ctx) {
522       ctx->postRead(
523           state.header(), folly::to_narrow(buffer.computeChainDataLength()));
524     }
525     return folly::exception_wrapper();
526   } catch (std::exception const& e) {
527     return folly::exception_wrapper(std::current_exception(), e);
528   } catch (...) {
529     return folly::exception_wrapper(std::current_exception());
530   }
531 }
532 
533 template <typename PResult, typename Protocol, typename... ReturnTs>
534 folly::exception_wrapper recv_wrapped(
535     Protocol* prot, ClientReceiveState& state, ReturnTs&... _returns) {
536   prot->setInput(state.serializedResponse().buffer.get());
537   auto guard = folly::makeGuard([&] { prot->setInput(nullptr); });
538   apache::thrift::ContextStack* ctx = state.ctx();
539   PResult result;
540   foreach(
541       [&](auto index, auto& obj) {
542         result.template get<index.value>().value = &obj;
543       },
544       _returns...);
545   auto ew = recv_wrapped_helper(prot, state, result);
546   if (!ew) {
547     constexpr auto const kHasReturnType = sizeof...(_returns) != 0;
548     ew = apache::thrift::detail::ac::extract_exn<kHasReturnType>(result);
549   }
550   if (ctx && ew) {
551     ctx->handlerErrorWrapped(ew);
552   }
553   return ew;
554 }
555 
556 template <typename PResult, typename Protocol, typename Response, typename Item>
557 folly::exception_wrapper recv_wrapped(
558     Protocol* prot,
559     ClientReceiveState& state,
560     apache::thrift::ResponseAndClientBufferedStream<Response, Item>& _return) {
561   prot->setInput(state.serializedResponse().buffer.get());
562   auto guard = folly::makeGuard([&] { prot->setInput(nullptr); });
563   apache::thrift::ContextStack* ctx = state.ctx();
564 
565   typename PResult::FieldsType result;
566   result.template get<0>().value = &_return.response;
567 
568   auto ew = recv_wrapped_helper(prot, state, result);
569   if (!ew) {
570     ew = apache::thrift::detail::ac::extract_exn<true>(result);
571   }
572   if (ctx && ew) {
573     ctx->handlerErrorWrapped(ew);
574   }
575 
576   if (!ew) {
577     _return.stream = apache::thrift::detail::ap::decode_client_buffered_stream<
578         Protocol,
579         typename PResult::StreamPResultType,
580         Item>(state.extractStreamBridge(), state.bufferOptions());
581   }
582   return ew;
583 }
584 
585 template <typename PResult, typename Protocol, typename Item>
586 folly::exception_wrapper recv_wrapped(
587     Protocol* prot,
588     ClientReceiveState& state,
589     apache::thrift::ClientBufferedStream<Item>& _return) {
590   prot->setInput(state.serializedResponse().buffer.get());
591   auto guard = folly::makeGuard([&] { prot->setInput(nullptr); });
592   apache::thrift::ContextStack* ctx = state.ctx();
593 
594   typename PResult::FieldsType result;
595 
596   auto ew = recv_wrapped_helper(prot, state, result);
597   if (!ew) {
598     ew = apache::thrift::detail::ac::extract_exn<false>(result);
599   }
600   if (ctx && ew) {
601     ctx->handlerErrorWrapped(ew);
602   }
603 
604   if (!ew) {
605     _return = apache::thrift::detail::ap::decode_client_buffered_stream<
606         Protocol,
607         typename PResult::StreamPResultType,
608         Item>(state.extractStreamBridge(), state.bufferOptions());
609   }
610   return ew;
611 }
612 
613 #if FOLLY_HAS_COROUTINES
614 
615 template <
616     typename ProtocolReader,
617     typename ProtocolWriter,
618     typename SinkPResult,
619     typename SinkType,
620     typename FinalResponsePResult,
621     typename FinalResponseType,
622     typename ErrorMapFunc>
623 ClientSink<SinkType, FinalResponseType> createSink(
624     apache::thrift::detail::ClientSinkBridge::Ptr impl) {
625   static apache::thrift::detail::ap::StreamElementEncoderImpl<
626       ErrorBlame::CLIENT,
627       ProtocolWriter,
628       SinkPResult,
629       SinkType,
630       std::decay_t<ErrorMapFunc>>
631       encode;
632   return ClientSink<SinkType, FinalResponseType>(
633       std::move(impl),
634       &encode,
635       apache::thrift::detail::ap::decode_stream_element<
636           ProtocolReader,
637           FinalResponsePResult,
638           FinalResponseType>);
639 }
640 #endif
641 
642 template <
643     typename PResult,
644     typename ErrorMapFunc,
645     typename ProtocolWriter,
646     typename ProtocolReader,
647     typename Response,
648     typename Item,
649     typename FinalResponse>
650 folly::exception_wrapper recv_wrapped(
651     ProtocolReader* prot,
652     ClientReceiveState& state,
653     apache::thrift::detail::ClientSinkBridge::Ptr impl,
654     apache::thrift::ResponseAndClientSink<Response, Item, FinalResponse>&
655         _return) {
656 #if FOLLY_HAS_COROUTINES
657   prot->setInput(state.serializedResponse().buffer.get());
658   auto guard = folly::makeGuard([&] { prot->setInput(nullptr); });
659   apache::thrift::ContextStack* ctx = state.ctx();
660 
661   typename PResult::FieldsType result;
662   result.template get<0>().value = &_return.response;
663 
664   auto ew = recv_wrapped_helper(prot, state, result);
665   if (!ew) {
666     ew = apache::thrift::detail::ac::extract_exn<true>(result);
667   }
668   if (ctx && ew) {
669     ctx->handlerErrorWrapped(ew);
670   }
671 
672   if (!ew) {
673     _return.sink = createSink<
674         ProtocolReader,
675         ProtocolWriter,
676         typename PResult::SinkPResultType,
677         Item,
678         typename PResult::FinalResponsePResultType,
679         FinalResponse,
680         std::decay_t<ErrorMapFunc>>(std::move(impl));
681   }
682   return ew;
683 #else
684   (void)prot;
685   (void)state;
686   (void)impl;
687   (void)_return;
688   std::terminate();
689 #endif
690 }
691 
692 template <
693     typename PResult,
694     typename ErrorMapFunc,
695     typename ProtocolWriter,
696     typename ProtocolReader,
697     typename Item,
698     typename FinalResponse>
699 folly::exception_wrapper recv_wrapped(
700     ProtocolReader* prot,
701     ClientReceiveState& state,
702     apache::thrift::detail::ClientSinkBridge::Ptr impl,
703     apache::thrift::ClientSink<Item, FinalResponse>& _return) {
704 #if FOLLY_HAS_COROUTINES
705   prot->setInput(state.serializedResponse().buffer.get());
706   auto guard = folly::makeGuard([&] { prot->setInput(nullptr); });
707   apache::thrift::ContextStack* ctx = state.ctx();
708 
709   typename PResult::FieldsType result;
710 
711   auto ew = recv_wrapped_helper(prot, state, result);
712   if (!ew) {
713     ew = apache::thrift::detail::ac::extract_exn<false>(result);
714   }
715   if (ctx && ew) {
716     ctx->handlerErrorWrapped(ew);
717   }
718 
719   if (!ew) {
720     _return = createSink<
721         ProtocolReader,
722         ProtocolWriter,
723         typename PResult::SinkPResultType,
724         Item,
725         typename PResult::FinalResponsePResultType,
726         FinalResponse,
727         std::decay_t<ErrorMapFunc>>(std::move(impl));
728   }
729   return ew;
730 #else
731   (void)prot;
732   (void)state;
733   (void)impl;
734   (void)_return;
735   std::terminate();
736 #endif
737 }
738 
739 [[noreturn]] void throw_app_exn(char const* msg);
740 } // namespace ac
741 } // namespace detail
742 
743 //  AsyncProcessor helpers
744 namespace detail {
745 namespace ap {
746 
747 //  Everything templated on only protocol goes here. The corresponding .cpp file
748 //  explicitly instantiates this struct for each supported protocol.
749 template <typename ProtocolReader, typename ProtocolWriter>
750 struct helper {
751   static std::unique_ptr<folly::IOBuf> write_exn(
752       bool includeEnvelope,
753       const char* method,
754       ProtocolWriter* prot,
755       int32_t protoSeqId,
756       ContextStack* ctx,
757       const TApplicationException& x);
758 
759   // Temporary for backwards compatibility
760   static std::unique_ptr<folly::IOBuf> write_exn(
761       const char* method,
762       ProtocolWriter* prot,
763       int32_t protoSeqId,
764       ContextStack* ctx,
765       const TApplicationException& x) {
766     return write_exn(true, method, prot, protoSeqId, ctx, x);
767   }
768 
769   static void process_exn(
770       const char* func,
771       const TApplicationException::TApplicationExceptionType type,
772       const std::string& msg,
773       ResponseChannelRequest::UniquePtr req,
774       Cpp2RequestContext* ctx,
775       folly::EventBase* eb,
776       int32_t protoSeqId);
777 };
778 
779 template <typename ProtocolReader>
780 using writer_of = typename ProtocolReader::ProtocolWriter;
781 template <typename ProtocolWriter>
782 using reader_of = typename ProtocolWriter::ProtocolReader;
783 
784 template <typename ProtocolReader>
785 using helper_r = helper<ProtocolReader, writer_of<ProtocolReader>>;
786 template <typename ProtocolWriter>
787 using helper_w = helper<reader_of<ProtocolWriter>, ProtocolWriter>;
788 
789 template <typename T>
790 inline constexpr bool is_root_async_processor =
791     std::is_void_v<typename T::BaseAsyncProcessor>;
792 
793 template <typename Derived>
794 GeneratedAsyncProcessor::ProcessFunc<Derived> getProcessFuncFromProtocol(
795     folly::tag_t<CompactProtocolReader> /* unused */,
796     const GeneratedAsyncProcessor::ProcessFuncs<Derived>& funcs) {
797   return funcs.compact;
798 }
799 template <typename Derived>
800 GeneratedAsyncProcessor::ProcessFunc<Derived> getProcessFuncFromProtocol(
801     folly::tag_t<BinaryProtocolReader> /* unused */,
802     const GeneratedAsyncProcessor::ProcessFuncs<Derived>& funcs) {
803   return funcs.binary;
804 }
805 
806 inline void nonRecursiveProcessMissing(
807     const std::string& methodName,
808     ResponseChannelRequest::UniquePtr req,
809     folly::EventBase* eb) {
810   if (req) {
811     eb->runInEventBaseThread([request = std::move(req),
812                               name = std::string{methodName}]() mutable {
813       AsyncProcessorHelper::sendUnknownMethodError(std::move(request), name);
814     });
815   }
816 }
817 
818 template <class ProtocolReader, class Processor>
819 void recursiveProcessMissing(
820     Processor* processor,
821     const std::string& fname,
822     ResponseChannelRequest::UniquePtr req,
823     apache::thrift::SerializedCompressedRequest&& serializedRequest,
824     Cpp2RequestContext* ctx,
825     folly::EventBase* eb,
826     concurrency::ThreadManager* tm);
827 
828 template <class ProtocolReader, class Processor>
829 void recursiveProcessPmap(
830     Processor* proc,
831     const typename Processor::ProcessMap& pmap,
832     ResponseChannelRequest::UniquePtr req,
833     apache::thrift::SerializedCompressedRequest&& serializedRequest,
834     Cpp2RequestContext* ctx,
835     folly::EventBase* eb,
836     concurrency::ThreadManager* tm) {
837   const auto& fname = ctx->getMethodName();
838   auto processFuncs = pmap.find(fname);
839   if (processFuncs == pmap.end()) {
840     recursiveProcessMissing<ProtocolReader>(
841         proc, fname, std::move(req), std::move(serializedRequest), ctx, eb, tm);
842     return;
843   }
844 
845   auto pfn = getProcessFuncFromProtocol(
846       folly::tag<ProtocolReader>, processFuncs->second);
847   (proc->*pfn)(std::move(req), std::move(serializedRequest), ctx, eb, tm);
848 }
849 
850 template <class ProtocolReader, class Processor>
851 void recursiveProcessMissing(
852     Processor* processor,
853     const std::string& fname,
854     ResponseChannelRequest::UniquePtr req,
855     apache::thrift::SerializedCompressedRequest&& serializedRequest,
856     Cpp2RequestContext* ctx,
857     folly::EventBase* eb,
858     concurrency::ThreadManager* tm) {
859   if constexpr (is_root_async_processor<Processor>) {
860     nonRecursiveProcessMissing(fname, std::move(req), eb);
861   } else {
862     using BaseAsyncProcessor = typename Processor::BaseAsyncProcessor;
863     recursiveProcessPmap<ProtocolReader, BaseAsyncProcessor>(
864         processor,
865         BaseAsyncProcessor::getOwnProcessMap(),
866         std::move(req),
867         std::move(serializedRequest),
868         ctx,
869         eb,
870         tm);
871   }
872 }
873 
874 /**
875  * Recursive implementation of method resolution based on
876  * Processor::getOwnProcessMap(). This is the fallback/legacy implementation for
877  * generated AsyncProcessor::processSerializedCompressedRequest, which is called
878  * in the absence of MethodMetadata support.
879  *
880  * TODO(praihan): Remove this implementation once all services support
881  * MethodMetadata.
882  */
883 template <class ProtocolReader, class Processor>
884 void recursiveProcess(
885     Processor* processor,
886     ResponseChannelRequest::UniquePtr req,
887     apache::thrift::SerializedCompressedRequest&& serializedRequest,
888     Cpp2RequestContext* ctx,
889     folly::EventBase* eb,
890     concurrency::ThreadManager* tm) {
891   return recursiveProcessPmap<ProtocolReader>(
892       processor,
893       Processor::getOwnProcessMap(),
894       std::move(req),
895       std::move(serializedRequest),
896       ctx,
897       eb,
898       tm);
899 }
900 
901 /**
902  * Non-recursive implementation of method resolution based on
903  * the passed-in MethodMetadata.
904  * See ServerInterface::GeneratedMethodMetadata.
905  */
906 template <class ProtocolReader, class Processor>
907 void nonRecursiveProcess(
908     Processor* processor,
909     ResponseChannelRequest::UniquePtr req,
910     apache::thrift::SerializedCompressedRequest&& serializedRequest,
911     const apache::thrift::AsyncProcessor::MethodMetadata& untypedMethodMetadata,
912     Cpp2RequestContext* ctx,
913     folly::EventBase* eb,
914     concurrency::ThreadManager* tm) {
915   using Metadata = ServerInterface::GeneratedMethodMetadata<Processor>;
916   static_assert(std::is_final_v<Metadata>);
917   const auto& methodMetadata =
918       AsyncProcessorHelper::expectMetadataOfType<Metadata>(
919           untypedMethodMetadata);
920   auto pfn = getProcessFuncFromProtocol(
921       folly::tag<ProtocolReader>, methodMetadata.processFuncs);
922   (processor->*pfn)(std::move(req), std::move(serializedRequest), ctx, eb, tm);
923 }
924 
925 // Generated AsyncProcessor::processSerializedCompressedRequestWithMetadata just
926 // calls this
927 template <class Processor>
928 void process(
929     Processor* processor,
930     ResponseChannelRequest::UniquePtr req,
931     apache::thrift::SerializedCompressedRequest&& serializedRequest,
932     const apache::thrift::AsyncProcessor::MethodMetadata& methodMetadata,
933     protocol::PROTOCOL_TYPES protType,
934     Cpp2RequestContext* ctx,
935     folly::EventBase* eb,
936     concurrency::ThreadManager* tm) {
937   switch (protType) {
938     case protocol::T_BINARY_PROTOCOL: {
939       return nonRecursiveProcess<BinaryProtocolReader>(
940           processor,
941           std::move(req),
942           std::move(serializedRequest),
943           methodMetadata,
944           ctx,
945           eb,
946           tm);
947     }
948     case protocol::T_COMPACT_PROTOCOL: {
949       return nonRecursiveProcess<CompactProtocolReader>(
950           processor,
951           std::move(req),
952           std::move(serializedRequest),
953           methodMetadata,
954           ctx,
955           eb,
956           tm);
957     }
958     default:
959       LOG(ERROR) << "invalid protType: " << folly::to_underlying(protType);
960       return;
961   }
962 }
963 
964 // Generated AsyncProcessor::processSerializedCompressedRequest just calls this
965 template <class Processor>
966 void process(
967     Processor* processor,
968     ResponseChannelRequest::UniquePtr req,
969     apache::thrift::SerializedCompressedRequest&& serializedRequest,
970     protocol::PROTOCOL_TYPES protType,
971     Cpp2RequestContext* ctx,
972     folly::EventBase* eb,
973     concurrency::ThreadManager* tm) {
974   switch (protType) {
975     case protocol::T_BINARY_PROTOCOL: {
976       return recursiveProcess<BinaryProtocolReader>(
977           processor, std::move(req), std::move(serializedRequest), ctx, eb, tm);
978     }
979     case protocol::T_COMPACT_PROTOCOL: {
980       return recursiveProcess<CompactProtocolReader>(
981           processor, std::move(req), std::move(serializedRequest), ctx, eb, tm);
982     }
983     default:
984       LOG(ERROR) << "invalid protType: " << folly::to_underlying(protType);
985       return;
986   }
987 }
988 
989 struct MessageBegin : folly::MoveOnly {
990   std::string methodName;
991   struct Metadata {
992     std::string errMessage;
993     size_t size{0};
994     int32_t seqId{0};
995     MessageType msgType{};
996     bool isValid{true};
997   } metadata;
998 };
999 
1000 bool setupRequestContextWithMessageBegin(
1001     const MessageBegin::Metadata& msgBegin,
1002     protocol::PROTOCOL_TYPES protType,
1003     ResponseChannelRequest::UniquePtr& req,
1004     Cpp2RequestContext* ctx,
1005     folly::EventBase* eb);
1006 
1007 MessageBegin deserializeMessageBegin(
1008     const folly::IOBuf& buf, protocol::PROTOCOL_TYPES protType);
1009 
1010 /**
1011  * The function pointers themselves are contravariant on the Processor type but
1012  * ProcessFuncs is not because templates are invariant. This function performs
1013  * the conversion manually.
1014  */
1015 template <class DerivedProcessor, class BaseProcessor>
1016 std::enable_if_t<
1017     std::is_base_of_v<BaseProcessor, DerivedProcessor>,
1018     GeneratedAsyncProcessor::ProcessFuncs<DerivedProcessor>>
1019 downcastProcessFuncs(
1020     const GeneratedAsyncProcessor::ProcessFuncs<BaseProcessor>& processFuncs) {
1021   return GeneratedAsyncProcessor::ProcessFuncs<DerivedProcessor>{
1022       processFuncs.compact, processFuncs.binary};
1023 }
1024 
1025 template <
1026     class MostDerivedProcessor,
1027     class CurrentProcessor = MostDerivedProcessor>
1028 void populateMethodMetadataMap(AsyncProcessorFactory::MethodMetadataMap& map) {
1029   for (const auto& [methodName, processFuncs] :
1030        CurrentProcessor::getOwnProcessMap()) {
1031     map.emplace(
1032         methodName,
1033         // Always create GeneratatedMethodMetadata<MostDerivedProcessor> so that
1034         // all entries in the map are of the same type.
1035         std::make_shared<
1036             ServerInterface::GeneratedMethodMetadata<MostDerivedProcessor>>(
1037             downcastProcessFuncs<MostDerivedProcessor>(processFuncs)));
1038   }
1039   if constexpr (!is_root_async_processor<CurrentProcessor>) {
1040     populateMethodMetadataMap<
1041         MostDerivedProcessor,
1042         typename CurrentProcessor::BaseAsyncProcessor>(map);
1043   }
1044 }
1045 
1046 template <class Processor>
1047 AsyncProcessorFactory::MethodMetadataMap createMethodMetadataMap() {
1048   AsyncProcessorFactory::MethodMetadataMap result;
1049   populateMethodMetadataMap<Processor>(result);
1050   return result;
1051 }
1052 
1053 template <typename Protocol, typename PResult, typename T>
1054 std::unique_ptr<folly::IOBuf> encode_stream_payload(T&& _item) {
1055   PResult res;
1056   res.template get<0>().value = const_cast<T*>(&_item);
1057   res.setIsSet(0);
1058 
1059   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
1060   Protocol prot;
1061   prot.setOutput(&queue, res.serializedSizeZC(&prot));
1062 
1063   res.write(&prot);
1064   return std::move(queue).move();
1065 }
1066 
1067 template <typename Protocol, typename PResult>
1068 std::unique_ptr<folly::IOBuf> encode_stream_payload(folly::IOBuf&& _item) {
1069   return std::make_unique<folly::IOBuf>(std::move(_item));
1070 }
1071 
1072 template <
1073     ErrorBlame Blame,
1074     typename Protocol,
1075     typename PResult,
1076     typename ErrorMapFunc>
1077 EncodedStreamError encode_stream_exception(folly::exception_wrapper ew) {
1078   ErrorMapFunc mapException;
1079   Protocol prot;
1080   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
1081   PResult res;
1082 
1083   PayloadExceptionMetadata exceptionMetadata;
1084   PayloadExceptionMetadataBase exceptionMetadataBase;
1085   if (mapException(res, ew)) {
1086     prot.setOutput(&queue, res.serializedSizeZC(&prot));
1087     res.write(&prot);
1088     exceptionMetadata.declaredException_ref() =
1089         PayloadDeclaredExceptionMetadata();
1090   } else {
1091     constexpr size_t kQueueAppenderGrowth = 4096;
1092     prot.setOutput(&queue, kQueueAppenderGrowth);
1093     TApplicationException ex(ew.what().toStdString());
1094     exceptionMetadataBase.what_utf8_ref() = ex.what();
1095     apache::thrift::detail::serializeExceptionBody(&prot, &ex);
1096     PayloadAppUnknownExceptionMetdata aue;
1097     aue.errorClassification_ref().ensure().blame_ref() = Blame;
1098     exceptionMetadata.appUnknownException_ref() = std::move(aue);
1099   }
1100 
1101   exceptionMetadataBase.metadata_ref() = std::move(exceptionMetadata);
1102   StreamPayloadMetadata streamPayloadMetadata;
1103   PayloadMetadata payloadMetadata;
1104   payloadMetadata.exceptionMetadata_ref() = std::move(exceptionMetadataBase);
1105   streamPayloadMetadata.payloadMetadata_ref() = std::move(payloadMetadata);
1106   return EncodedStreamError(
1107       StreamPayload(std::move(queue).move(), std::move(streamPayloadMetadata)));
1108 }
1109 
1110 template <
1111     ErrorBlame Blame,
1112     typename Protocol,
1113     typename PResult,
1114     typename T,
1115     typename ErrorMapFunc>
1116 class StreamElementEncoderImpl final
1117     : public apache::thrift::detail::StreamElementEncoder<T> {
1118   folly::Try<StreamPayload> operator()(T&& val) override {
1119     StreamPayloadMetadata streamPayloadMetadata;
1120     PayloadMetadata payloadMetadata;
1121     payloadMetadata.responseMetadata_ref().ensure();
1122     streamPayloadMetadata.payloadMetadata_ref() = std::move(payloadMetadata);
1123     return folly::Try<StreamPayload>(
1124         {encode_stream_payload<Protocol, PResult>(std::move(val)),
1125          std::move(streamPayloadMetadata)});
1126   }
1127 
1128   folly::Try<StreamPayload> operator()(folly::exception_wrapper&& e) override {
1129     return folly::Try<StreamPayload>(folly::exception_wrapper(
1130         encode_stream_exception<Blame, Protocol, PResult, ErrorMapFunc>(e)));
1131   }
1132 };
1133 
1134 template <typename Protocol, typename PResult, typename T>
1135 T decode_stream_payload_impl(folly::IOBuf& payload, folly::tag_t<T>) {
1136   PResult args;
1137   T res{};
1138   args.template get<0>().value = &res;
1139 
1140   Protocol prot;
1141   prot.setInput(&payload);
1142   args.read(&prot);
1143   return res;
1144 }
1145 
1146 template <typename Protocol, typename PResult, typename T>
1147 folly::IOBuf decode_stream_payload_impl(
1148     folly::IOBuf& payload, folly::tag_t<folly::IOBuf>) {
1149   return std::move(payload);
1150 }
1151 
1152 template <typename Protocol, typename PResult, typename T>
1153 T decode_stream_payload(folly::IOBuf& payload) {
1154   return decode_stream_payload_impl<Protocol, PResult, T>(
1155       payload, folly::tag_t<T>{});
1156 }
1157 
1158 template <typename Protocol, typename PResult, typename T>
1159 folly::exception_wrapper decode_stream_exception(folly::exception_wrapper ew) {
1160   folly::exception_wrapper hijacked;
1161   ew.handle(
1162       [&hijacked](apache::thrift::detail::EncodedError& err) {
1163         PResult result;
1164         T res{};
1165         result.template get<0>().value = &res;
1166         Protocol prot;
1167         prot.setInput(err.encoded.get());
1168         result.read(&prot);
1169 
1170         CHECK(!result.getIsSet(0));
1171 
1172         foreach_index<PResult::size::value - 1>([&](auto index) {
1173           if (!hijacked && result.getIsSet(index.value + 1)) {
1174             auto& fdata = result.template get<index.value + 1>();
1175             hijacked = folly::exception_wrapper(std::move(fdata.ref()));
1176           }
1177         });
1178 
1179         if (!hijacked) {
1180           // Could not decode the error. It may be a TApplicationException
1181           TApplicationException x;
1182           prot.setInput(err.encoded.get());
1183           apache::thrift::detail::deserializeExceptionBody(&prot, &x);
1184           hijacked = folly::exception_wrapper(std::move(x));
1185         }
1186       },
1187       [&hijacked](apache::thrift::detail::EncodedStreamError& err) {
1188         auto& payload = err.encoded;
1189         DCHECK_EQ(payload.metadata.payloadMetadata_ref().has_value(), true);
1190         DCHECK_EQ(
1191             payload.metadata.payloadMetadata_ref()->getType(),
1192             PayloadMetadata::exceptionMetadata);
1193         auto& exceptionMetadataBase =
1194             payload.metadata.payloadMetadata_ref()->get_exceptionMetadata();
1195         if (auto exceptionMetadataRef = exceptionMetadataBase.metadata_ref()) {
1196           if (exceptionMetadataRef->getType() ==
1197               PayloadExceptionMetadata::declaredException) {
1198             PResult result;
1199             T res{};
1200             Protocol prot;
1201             result.template get<0>().value = &res;
1202             prot.setInput(payload.payload.get());
1203             result.read(&prot);
1204             CHECK(!result.getIsSet(0));
1205             foreach_index<PResult::size::value - 1>([&](auto index) {
1206               if (!hijacked && result.getIsSet(index.value + 1)) {
1207                 auto& fdata = result.template get<index.value + 1>();
1208                 hijacked = folly::exception_wrapper(std::move(fdata.ref()));
1209               }
1210             });
1211 
1212             if (!hijacked) {
1213               hijacked =
1214                   TApplicationException("Failed to parse declared exception");
1215             }
1216           } else {
1217             hijacked = TApplicationException(
1218                 exceptionMetadataBase.what_utf8_ref().value_or(""));
1219           }
1220         } else {
1221           hijacked =
1222               TApplicationException("Missing payload exception metadata");
1223         }
1224       },
1225       [&hijacked](apache::thrift::detail::EncodedStreamRpcError& err) {
1226         StreamRpcError streamRpcError;
1227         CompactProtocolReader reader;
1228         reader.setInput(err.encoded.get());
1229         streamRpcError.read(&reader);
1230         TApplicationException::TApplicationExceptionType exType{
1231             TApplicationException::UNKNOWN};
1232         auto code = streamRpcError.code_ref();
1233         if (code &&
1234             (code.value() == StreamRpcErrorCode::CREDIT_TIMEOUT ||
1235              code.value() == StreamRpcErrorCode::CHUNK_TIMEOUT)) {
1236           exType = TApplicationException::TIMEOUT;
1237         }
1238         hijacked = TApplicationException(
1239             exType, streamRpcError.what_utf8_ref().value_or(""));
1240       },
1241       [](...) {});
1242 
1243   if (hijacked) {
1244     return hijacked;
1245   }
1246   return ew;
1247 }
1248 
1249 template <
1250     typename Protocol,
1251     typename PResult,
1252     typename ErrorMapFunc,
1253     typename T>
1254 ServerStreamFactory encode_server_stream(
1255     apache::thrift::ServerStream<T>&& stream,
1256     folly::Executor::KeepAlive<> serverExecutor) {
1257   static StreamElementEncoderImpl<
1258       ErrorBlame::SERVER,
1259       Protocol,
1260       PResult,
1261       T,
1262       ErrorMapFunc>
1263       encode;
1264   return stream(std::move(serverExecutor), &encode);
1265 }
1266 
1267 template <typename Protocol, typename PResult, typename T>
1268 folly::Try<T> decode_stream_element(
1269     folly::Try<apache::thrift::StreamPayload>&& payload) {
1270   if (payload.hasValue()) {
1271     return folly::Try<T>(
1272         decode_stream_payload<Protocol, PResult, T>(*payload->payload));
1273   } else if (payload.hasException()) {
1274     return folly::Try<T>(decode_stream_exception<Protocol, PResult, T>(
1275         std::move(payload).exception()));
1276   } else {
1277     return folly::Try<T>();
1278   }
1279 }
1280 
1281 template <typename Protocol, typename PResult, typename T>
1282 apache::thrift::ClientBufferedStream<T> decode_client_buffered_stream(
1283     apache::thrift::detail::ClientStreamBridge::ClientPtr streamBridge,
1284     const BufferOptions& bufferOptions) {
1285   return apache::thrift::ClientBufferedStream<T>(
1286       std::move(streamBridge),
1287       decode_stream_element<Protocol, PResult, T>,
1288       bufferOptions);
1289 }
1290 
1291 template <
1292     typename ProtocolReader,
1293     typename ProtocolWriter,
1294     typename SinkPResult,
1295     typename FinalResponsePResult,
1296     typename ErrorMapFunc,
1297     typename SinkType,
1298     typename FinalResponseType>
1299 apache::thrift::detail::SinkConsumerImpl toSinkConsumerImpl(
1300     FOLLY_MAYBE_UNUSED SinkConsumer<SinkType, FinalResponseType>&& sinkConsumer,
1301     FOLLY_MAYBE_UNUSED folly::Executor::KeepAlive<> executor) {
1302 #if FOLLY_HAS_COROUTINES
1303   auto consumer =
1304       [innerConsumer = std::move(sinkConsumer.consumer)](
1305           folly::coro::AsyncGenerator<folly::Try<StreamPayload>&&> gen) mutable
1306       -> folly::coro::Task<folly::Try<StreamPayload>> {
1307     folly::exception_wrapper ew;
1308     try {
1309       FinalResponseType finalResponse = co_await innerConsumer(
1310           [](folly::coro::AsyncGenerator<folly::Try<StreamPayload>&&> gen_)
1311               -> folly::coro::AsyncGenerator<SinkType&&> {
1312             while (auto item = co_await gen_.next()) {
1313               auto payload = std::move(*item);
1314               co_yield folly::coro::co_result(ap::decode_stream_element<
1315                                               ProtocolReader,
1316                                               SinkPResult,
1317                                               SinkType>(std::move(payload)));
1318             }
1319           }(std::move(gen)));
1320       co_return folly::Try<StreamPayload>(StreamPayload(
1321           ap::encode_stream_payload<ProtocolWriter, FinalResponsePResult>(
1322               std::move(finalResponse)),
1323           {}));
1324 // This causes clang internal error on Windows.
1325 #if !(defined(_WIN32) && defined(__clang__))
1326     } catch (std::exception& e) {
1327       ew = folly::exception_wrapper(std::current_exception(), e);
1328 #endif
1329     } catch (...) {
1330       ew = folly::exception_wrapper(std::current_exception());
1331     }
1332     co_return folly::Try<StreamPayload>(ap::encode_stream_exception<
1333                                         ErrorBlame::SERVER,
1334                                         ProtocolWriter,
1335                                         FinalResponsePResult,
1336                                         ErrorMapFunc>(std::move(ew)));
1337   };
1338   return apache::thrift::detail::SinkConsumerImpl{
1339       std::move(consumer),
1340       sinkConsumer.bufferSize,
1341       sinkConsumer.sinkOptions.chunkTimeout,
1342       std::move(executor)};
1343 #else
1344   std::terminate();
1345 #endif
1346 }
1347 
1348 } // namespace ap
1349 } // namespace detail
1350 
1351 //  ServerInterface helpers
1352 namespace detail {
1353 namespace si {
1354 template <typename T>
1355 folly::Future<T> future(
1356     folly::SemiFuture<T>&& future, folly::Executor::KeepAlive<> keepAlive) {
1357   if (future.isReady()) {
1358     return std::move(future).toUnsafeFuture();
1359   }
1360   return std::move(future).via(keepAlive);
1361 }
1362 
1363 using CallbackBase = HandlerCallbackBase;
1364 using CallbackBasePtr = std::unique_ptr<CallbackBase>;
1365 template <typename T>
1366 using Callback = HandlerCallback<T>;
1367 template <typename T>
1368 using CallbackPtr = std::unique_ptr<Callback<T>>;
1369 
1370 class AsyncTmPrep {
1371   ServerInterface* si_;
1372 
1373  public:
1374   AsyncTmPrep(ServerInterface* si, CallbackBase* callback) : si_{si} {
1375     si->setEventBase(callback->getEventBase());
1376     si->setThreadManager(callback->getThreadManager());
1377     si->setRequestContext(callback->getRequestContext());
1378   }
1379 
1380   ~AsyncTmPrep() { si_->clearRequestParams(); }
1381 };
1382 
1383 inline void async_tm_future_oneway(
1384     CallbackBasePtr callback, folly::Future<folly::Unit>&& fut) {
1385   if (!fut.isReady()) {
1386     auto ka = callback->getInternalKeepAlive();
1387     std::move(fut)
1388         .via(std::move(ka))
1389         .thenValueInline([cb = std::move(callback)](auto&&) {});
1390   }
1391 }
1392 
1393 template <typename T>
1394 void async_tm_future(
1395     CallbackPtr<T> callback, folly::Future<folly::lift_unit_t<T>>&& fut) {
1396   if (!fut.isReady()) {
1397     auto ka = callback->getInternalKeepAlive();
1398     std::move(fut)
1399         .via(std::move(ka))
1400         .thenTryInline([cb = std::move(callback)](
1401                            folly::Try<folly::lift_unit_t<T>>&& ret) {
1402           cb->complete(std::move(ret));
1403         });
1404   } else {
1405     callback->complete(std::move(fut).result());
1406   }
1407 }
1408 
1409 inline void async_tm_semifuture_oneway(
1410     CallbackBasePtr callback, folly::SemiFuture<folly::Unit>&& fut) {
1411   if (!fut.isReady()) {
1412     auto ka = callback->getInternalKeepAlive();
1413     std::move(fut)
1414         .via(std::move(ka))
1415         .thenValueInline([cb = std::move(callback)](auto&&) {});
1416   }
1417 }
1418 
1419 template <typename T>
1420 void async_tm_semifuture(
1421     CallbackPtr<T> callback, folly::SemiFuture<folly::lift_unit_t<T>>&& fut) {
1422   if (!fut.isReady()) {
1423     auto ka = callback->getInternalKeepAlive();
1424     std::move(fut)
1425         .via(std::move(ka))
1426         .thenTryInline([cb = std::move(callback)](
1427                            folly::Try<folly::lift_unit_t<T>>&& ret) {
1428           cb->complete(std::move(ret));
1429         });
1430   } else {
1431     callback->complete(std::move(fut).result());
1432   }
1433 }
1434 
1435 #if FOLLY_HAS_COROUTINES
1436 inline void async_tm_coro_oneway(
1437     CallbackBasePtr callback, folly::coro::Task<void>&& task) {
1438   auto ka = callback->getInternalKeepAlive();
1439   std::move(task)
1440       .scheduleOn(std::move(ka))
1441       .startInlineUnsafe([callback = std::move(callback)](auto&&) {});
1442 }
1443 
1444 template <typename T>
1445 void async_tm_coro(CallbackPtr<T> callback, folly::coro::Task<T>&& task) {
1446   auto ka = callback->getInternalKeepAlive();
1447   std::move(task)
1448       .scheduleOn(std::move(ka))
1449       .startInlineUnsafe([callback = std::move(callback)](
1450                              folly::Try<folly::lift_unit_t<T>>&& tryResult) {
1451         callback->complete(std::move(tryResult));
1452       });
1453 }
1454 #endif
1455 
1456 std::string formatUnimplementedMethodException(std::string_view methodName);
1457 TApplicationException create_app_exn_unimplemented(const char* name);
1458 [[noreturn]] void throw_app_exn_unimplemented(char const* name);
1459 
1460 } // namespace si
1461 } // namespace detail
1462 
1463 namespace util {
1464 
1465 namespace detail {
1466 
1467 constexpr ErrorKind fromExceptionKind(ExceptionKind kind) {
1468   switch (kind) {
1469     case ExceptionKind::TRANSIENT:
1470       return ErrorKind::TRANSIENT;
1471 
1472     case ExceptionKind::STATEFUL:
1473       return ErrorKind::STATEFUL;
1474 
1475     case ExceptionKind::PERMANENT:
1476       return ErrorKind::PERMANENT;
1477 
1478     default:
1479       return ErrorKind::UNSPECIFIED;
1480   }
1481 }
1482 
1483 constexpr ErrorBlame fromExceptionBlame(ExceptionBlame blame) {
1484   switch (blame) {
1485     case ExceptionBlame::SERVER:
1486       return ErrorBlame::SERVER;
1487 
1488     case ExceptionBlame::CLIENT:
1489       return ErrorBlame::CLIENT;
1490 
1491     default:
1492       return ErrorBlame::UNSPECIFIED;
1493   }
1494 }
1495 
1496 constexpr ErrorSafety fromExceptionSafety(ExceptionSafety safety) {
1497   switch (safety) {
1498     case ExceptionSafety::SAFE:
1499       return ErrorSafety::SAFE;
1500 
1501     default:
1502       return ErrorSafety::UNSPECIFIED;
1503   }
1504 }
1505 
1506 template <typename T>
1507 std::string serializeExceptionMeta(const folly::exception_wrapper& ew) {
1508   ErrorClassification errorClassification;
1509 
1510   constexpr auto errorKind = apache::thrift::detail::st::struct_private_access::
1511       __fbthrift_cpp2_gen_exception_kind<T>();
1512   errorClassification.kind_ref() = fromExceptionKind(errorKind);
1513   constexpr auto errorBlame = apache::thrift::detail::st::
1514       struct_private_access::__fbthrift_cpp2_gen_exception_blame<T>();
1515   errorClassification.blame_ref() = fromExceptionBlame(errorBlame);
1516   constexpr auto errorSafety = apache::thrift::detail::st::
1517       struct_private_access::__fbthrift_cpp2_gen_exception_safety<T>();
1518   errorClassification.safety_ref() = fromExceptionSafety(errorSafety);
1519 
1520   ew.with_exception([&errorClassification](
1521                         const ExceptionMetadataOverrideBase& ex) {
1522     if (ex.errorKind() != ExceptionKind::UNSPECIFIED) {
1523       errorClassification.kind_ref() = fromExceptionKind(ex.errorKind());
1524     }
1525     if (ex.errorBlame() != ExceptionBlame::UNSPECIFIED) {
1526       errorClassification.blame_ref() = fromExceptionBlame(ex.errorBlame());
1527     }
1528     if (ex.errorSafety() != ExceptionSafety::UNSPECIFIED) {
1529       errorClassification.safety_ref() = fromExceptionSafety(ex.errorSafety());
1530     }
1531   });
1532 
1533   return apache::thrift::detail::serializeErrorClassification(
1534       errorClassification);
1535 }
1536 
1537 } // namespace detail
1538 
1539 void appendExceptionToHeader(
1540     const folly::exception_wrapper& ew, Cpp2RequestContext& ctx);
1541 
1542 template <typename T>
1543 void appendErrorClassificationToHeader(
1544     const folly::exception_wrapper& ew, Cpp2RequestContext& ctx) {
1545   auto header = ctx.getHeader();
1546   if (!header) {
1547     return;
1548   }
1549   auto exMeta = detail::serializeExceptionMeta<T>(ew);
1550   header->setHeader(
1551       std::string(apache::thrift::detail::kHeaderExMeta), std::move(exMeta));
1552 }
1553 
1554 TApplicationException toTApplicationException(
1555     const folly::exception_wrapper& ew);
1556 
1557 bool includeInRecentRequestsCount(const std::string_view methodName);
1558 
1559 } // namespace util
1560 
1561 } // namespace thrift
1562 } // namespace apache
1563