1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <string> 20 #include <string_view> 21 #include <type_traits> 22 #include <utility> 23 24 #include <folly/Portability.h> 25 26 #include <fmt/core.h> 27 #include <folly/Traits.h> 28 #include <folly/Utility.h> 29 #include <folly/experimental/coro/FutureUtil.h> 30 #include <folly/futures/Future.h> 31 #include <folly/io/Cursor.h> 32 #include <thrift/lib/cpp/protocol/TBase64Utils.h> 33 #include <thrift/lib/cpp2/PluggableFunction.h> 34 #include <thrift/lib/cpp2/SerializationSwitch.h> 35 #include <thrift/lib/cpp2/Thrift.h> 36 #include <thrift/lib/cpp2/async/AsyncProcessor.h> 37 #include <thrift/lib/cpp2/async/AsyncProcessorHelper.h> 38 #include <thrift/lib/cpp2/async/ClientBufferedStream.h> 39 #include <thrift/lib/cpp2/async/ClientSinkBridge.h> 40 #include <thrift/lib/cpp2/async/RequestChannel.h> 41 #include <thrift/lib/cpp2/async/Sink.h> 42 #include <thrift/lib/cpp2/async/StreamCallbacks.h> 43 #include <thrift/lib/cpp2/detail/meta.h> 44 #include <thrift/lib/cpp2/frozen/Frozen.h> 45 #include <thrift/lib/cpp2/protocol/Cpp2Ops.h> 46 #include <thrift/lib/cpp2/protocol/Traits.h> 47 #include <thrift/lib/cpp2/transport/core/RpcMetadataUtil.h> 48 #include <thrift/lib/cpp2/util/Frozen2ViewHelpers.h> 49 #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h> 50 51 namespace apache { 52 namespace thrift { 53 54 class BinaryProtocolReader; 55 class CompactProtocolReader; 56 57 namespace detail { 58 59 THRIFT_PLUGGABLE_FUNC_DECLARE( 60 bool, includeInRecentRequestsCount, const std::string_view /*methodName*/); 61 62 template <int N, int Size, class F, class Tuple> 63 struct ForEachImpl { forEachForEachImpl64 static uint32_t forEach(Tuple&& tuple, F&& f) { 65 uint32_t res = f(std::get<N>(tuple), N); 66 res += ForEachImpl<N + 1, Size, F, Tuple>::forEach( 67 std::forward<Tuple>(tuple), std::forward<F>(f)); 68 return res; 69 } 70 }; 71 template <int Size, class F, class Tuple> 72 struct ForEachImpl<Size, Size, F, Tuple> { 73 static uint32_t forEach(Tuple&& /*tuple*/, F&& /*f*/) { return 0; } 74 }; 75 76 template <int N = 0, class F, class Tuple> 77 uint32_t forEach(Tuple&& tuple, F&& f) { 78 return ForEachImpl< 79 N, 80 std::tuple_size<typename std::remove_reference<Tuple>::type>::value, 81 F, 82 Tuple>::forEach(std::forward<Tuple>(tuple), std::forward<F>(f)); 83 } 84 85 template <int N, int Size, class F, class Tuple> 86 struct ForEachVoidImpl { 87 static void forEach(Tuple&& tuple, F&& f) { 88 f(std::get<N>(tuple), N); 89 ForEachVoidImpl<N + 1, Size, F, Tuple>::forEach( 90 std::forward<Tuple>(tuple), std::forward<F>(f)); 91 } 92 }; 93 template <int Size, class F, class Tuple> 94 struct ForEachVoidImpl<Size, Size, F, Tuple> { 95 static void forEach(Tuple&& /*tuple*/, F&& /*f*/) {} 96 }; 97 98 template <int N = 0, class F, class Tuple> 99 void forEachVoid(Tuple&& tuple, F&& f) { 100 ForEachVoidImpl< 101 N, 102 std::tuple_size<typename std::remove_reference<Tuple>::type>::value, 103 F, 104 Tuple>::forEach(std::forward<Tuple>(tuple), std::forward<F>(f)); 105 } 106 107 template <typename Protocol, typename IsSet> 108 struct Writer { 109 Writer(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {} 110 template <typename FieldData> 111 uint32_t operator()(const FieldData& fieldData, int index) { 112 using Ops = Cpp2Ops<typename FieldData::ref_type>; 113 114 if (!isset_.getIsSet(index)) { 115 return 0; 116 } 117 118 int16_t fid = FieldData::fid; 119 const auto& ex = fieldData.ref(); 120 121 uint32_t xfer = 0; 122 xfer += prot_->writeFieldBegin("", Ops::thriftType(), fid); 123 xfer += Ops::write(prot_, &ex); 124 xfer += prot_->writeFieldEnd(); 125 return xfer; 126 } 127 128 private: 129 Protocol* prot_; 130 const IsSet& isset_; 131 }; 132 133 template <typename Protocol, typename IsSet> 134 struct Sizer { 135 Sizer(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {} 136 template <typename FieldData> 137 uint32_t operator()(const FieldData& fieldData, int index) { 138 using Ops = Cpp2Ops<typename FieldData::ref_type>; 139 140 if (!isset_.getIsSet(index)) { 141 return 0; 142 } 143 144 int16_t fid = FieldData::fid; 145 const auto& ex = fieldData.ref(); 146 147 uint32_t xfer = 0; 148 xfer += prot_->serializedFieldSize("", Ops::thriftType(), fid); 149 xfer += Ops::serializedSize(prot_, &ex); 150 return xfer; 151 } 152 153 private: 154 Protocol* prot_; 155 const IsSet& isset_; 156 }; 157 158 template <typename Protocol, typename IsSet> 159 struct SizerZC { 160 SizerZC(Protocol* prot, const IsSet& isset) : prot_(prot), isset_(isset) {} 161 template <typename FieldData> 162 uint32_t operator()(const FieldData& fieldData, int index) { 163 using Ops = Cpp2Ops<typename FieldData::ref_type>; 164 165 if (!isset_.getIsSet(index)) { 166 return 0; 167 } 168 169 int16_t fid = FieldData::fid; 170 const auto& ex = fieldData.ref(); 171 172 uint32_t xfer = 0; 173 xfer += prot_->serializedFieldSize("", Ops::thriftType(), fid); 174 xfer += Ops::serializedSizeZC(prot_, &ex); 175 return xfer; 176 } 177 178 private: 179 Protocol* prot_; 180 const IsSet& isset_; 181 }; 182 183 template <typename Protocol, typename IsSet> 184 struct Reader { 185 Reader( 186 Protocol* prot, 187 IsSet& isset, 188 int16_t fid, 189 protocol::TType ftype, 190 bool& success) 191 : prot_(prot), 192 isset_(isset), 193 fid_(fid), 194 ftype_(ftype), 195 success_(success) {} 196 template <typename FieldData> 197 void operator()(FieldData& fieldData, int index) { 198 using Ops = Cpp2Ops<typename FieldData::ref_type>; 199 200 if (ftype_ != Ops::thriftType()) { 201 return; 202 } 203 204 int16_t myfid = FieldData::fid; 205 auto& ex = fieldData.ref(); 206 if (myfid != fid_) { 207 return; 208 } 209 210 success_ = true; 211 isset_.setIsSet(index); 212 Ops::read(prot_, &ex); 213 } 214 215 private: 216 Protocol* prot_; 217 IsSet& isset_; 218 int16_t fid_; 219 protocol::TType ftype_; 220 bool& success_; 221 }; 222 223 template <typename T> 224 T& maybe_remove_pointer(T& x) { 225 return x; 226 } 227 228 template <typename T> 229 T& maybe_remove_pointer(T* x) { 230 return *x; 231 } 232 233 template <bool hasIsSet, size_t count> 234 struct IsSetHelper { 235 void setIsSet(size_t /*index*/, bool /*value*/ = true) {} 236 bool getIsSet(size_t /*index*/) const { return true; } 237 }; 238 239 template <size_t count> 240 struct IsSetHelper<true, count> { 241 void setIsSet(size_t index, bool value = true) { isset_[index] = value; } 242 bool getIsSet(size_t index) const { return isset_[index]; } 243 244 private: 245 std::array<bool, count> isset_ = {}; 246 }; 247 248 } // namespace detail 249 250 template <int16_t Fid, typename TC, typename T> 251 struct FieldData { 252 static const constexpr int16_t fid = Fid; 253 static const constexpr protocol::TType ttype = protocol_type_v<TC, T>; 254 typedef TC type_class; 255 typedef T type; 256 typedef typename std::remove_pointer<T>::type ref_type; 257 T value; 258 ref_type& ref() { 259 return apache::thrift::detail::maybe_remove_pointer(value); 260 } 261 const ref_type& ref() const { 262 return apache::thrift::detail::maybe_remove_pointer(value); 263 } 264 }; 265 266 template <bool hasIsSet, typename... Field> 267 class ThriftPresult 268 : private std::tuple<Field...>, 269 public apache::thrift::detail::IsSetHelper<hasIsSet, sizeof...(Field)> { 270 // The fields tuple and IsSetHelper are base classes (rather than members) 271 // to employ the empty base class optimization when they are empty 272 typedef std::tuple<Field...> Fields; 273 typedef apache::thrift::detail::IsSetHelper<hasIsSet, sizeof...(Field)> 274 CurIsSetHelper; 275 276 public: 277 using size = std::tuple_size<Fields>; 278 279 CurIsSetHelper& isSet() { return *this; } 280 const CurIsSetHelper& isSet() const { return *this; } 281 Fields& fields() { return *this; } 282 const Fields& fields() const { return *this; } 283 284 // returns lvalue ref to the appropriate FieldData 285 template <size_t index> 286 auto get() -> decltype(std::get<index>(this->fields())) { 287 return std::get<index>(this->fields()); 288 } 289 290 template <size_t index> 291 auto get() const -> decltype(std::get<index>(this->fields())) { 292 return std::get<index>(this->fields()); 293 } 294 295 template <class Protocol> 296 uint32_t read(Protocol* prot) { 297 auto xfer = prot->getCursorPosition(); 298 std::string fname; 299 apache::thrift::protocol::TType ftype; 300 int16_t fid; 301 302 prot->readStructBegin(fname); 303 304 while (true) { 305 prot->readFieldBegin(fname, ftype, fid); 306 if (ftype == apache::thrift::protocol::T_STOP) { 307 break; 308 } 309 bool readSomething = false; 310 apache::thrift::detail::forEachVoid( 311 fields(), 312 apache::thrift::detail::Reader<Protocol, CurIsSetHelper>( 313 prot, isSet(), fid, ftype, readSomething)); 314 if (!readSomething) { 315 prot->skip(ftype); 316 } 317 prot->readFieldEnd(); 318 } 319 prot->readStructEnd(); 320 321 return folly::to_narrow(prot->getCursorPosition() - xfer); 322 } 323 324 template <class Protocol> 325 uint32_t serializedSize(Protocol* prot) const { 326 uint32_t xfer = 0; 327 xfer += prot->serializedStructSize(""); 328 xfer += apache::thrift::detail::forEach( 329 fields(), 330 apache::thrift::detail::Sizer<Protocol, CurIsSetHelper>(prot, isSet())); 331 xfer += prot->serializedSizeStop(); 332 return xfer; 333 } 334 335 template <class Protocol> 336 uint32_t serializedSizeZC(Protocol* prot) const { 337 uint32_t xfer = 0; 338 xfer += prot->serializedStructSize(""); 339 xfer += apache::thrift::detail::forEach( 340 fields(), 341 apache::thrift::detail::SizerZC<Protocol, CurIsSetHelper>( 342 prot, isSet())); 343 xfer += prot->serializedSizeStop(); 344 return xfer; 345 } 346 347 template <class Protocol> 348 uint32_t write(Protocol* prot) const { 349 uint32_t xfer = 0; 350 xfer += prot->writeStructBegin(""); 351 xfer += apache::thrift::detail::forEach( 352 fields(), 353 apache::thrift::detail::Writer<Protocol, CurIsSetHelper>( 354 prot, isSet())); 355 xfer += prot->writeFieldStop(); 356 xfer += prot->writeStructEnd(); 357 return xfer; 358 } 359 }; 360 361 template <typename PResults, typename StreamPresult> 362 struct ThriftPResultStream { 363 using StreamPResultType = StreamPresult; 364 using FieldsType = PResults; 365 366 PResults fields; 367 StreamPresult stream; 368 }; 369 370 template < 371 typename PResults, 372 typename SinkPresult, 373 typename FinalResponsePresult> 374 struct ThriftPResultSink { 375 using SinkPResultType = SinkPresult; 376 using FieldsType = PResults; 377 using FinalResponsePResultType = FinalResponsePresult; 378 379 PResults fields; 380 SinkPresult stream; 381 FinalResponsePresult finalResponse; 382 }; 383 384 template <bool hasIsSet, class... Args> 385 class Cpp2Ops<ThriftPresult<hasIsSet, Args...>> { 386 public: 387 typedef ThriftPresult<hasIsSet, Args...> Presult; 388 static constexpr protocol::TType thriftType() { return protocol::T_STRUCT; } 389 template <class Protocol> 390 static uint32_t write(Protocol* prot, const Presult* value) { 391 return value->write(prot); 392 } 393 template <class Protocol> 394 static uint32_t read(Protocol* prot, Presult* value) { 395 return value->read(prot); 396 } 397 template <class Protocol> 398 static uint32_t serializedSize(Protocol* prot, const Presult* value) { 399 return value->serializedSize(prot); 400 } 401 template <class Protocol> 402 static uint32_t serializedSizeZC(Protocol* prot, const Presult* value) { 403 return value->serializedSizeZC(prot); 404 } 405 }; 406 407 // Forward declaration 408 namespace detail { 409 namespace ap { 410 411 template < 412 ErrorBlame Blame, 413 typename Protocol, 414 typename PResult, 415 typename T, 416 typename ErrorMapFunc> 417 class StreamElementEncoderImpl; 418 419 template <typename Protocol, typename PResult, typename T> 420 folly::Try<T> decode_stream_element( 421 folly::Try<apache::thrift::StreamPayload>&& payload); 422 423 template <typename Protocol, typename PResult, typename T> 424 apache::thrift::ClientBufferedStream<T> decode_client_buffered_stream( 425 apache::thrift::detail::ClientStreamBridge::ClientPtr streamBridge, 426 const BufferOptions& bufferOptions); 427 428 template <typename Protocol, typename PResult, typename T> 429 std::unique_ptr<folly::IOBuf> encode_stream_payload(T&& _item); 430 431 template <typename Protocol, typename PResult> 432 std::unique_ptr<folly::IOBuf> encode_stream_payload(folly::IOBuf&& _item); 433 434 template <typename Protocol, typename PResult, typename ErrorMapFunc> 435 EncodedStreamError encode_stream_exception(folly::exception_wrapper ew); 436 437 template <typename Protocol, typename PResult, typename T> 438 T decode_stream_payload_impl(folly::IOBuf& payload, folly::tag_t<T>); 439 440 template <typename Protocol, typename PResult, typename T> 441 folly::IOBuf decode_stream_payload_impl( 442 folly::IOBuf& payload, folly::tag_t<folly::IOBuf>); 443 444 template <typename Protocol, typename PResult, typename T> 445 T decode_stream_payload(folly::IOBuf& payload); 446 447 template <typename Protocol, typename PResult, typename T> 448 folly::exception_wrapper decode_stream_exception(folly::exception_wrapper ew); 449 450 struct EmptyExMapType { 451 template <typename PResult> 452 bool operator()(PResult&, folly::exception_wrapper) { 453 return false; 454 } 455 }; 456 457 } // namespace ap 458 } // namespace detail 459 460 // AsyncClient helpers 461 namespace detail { 462 namespace ac { 463 464 template <bool HasReturnType, typename PResult> 465 folly::exception_wrapper extract_exn(PResult& result) { 466 using base = std::integral_constant<std::size_t, HasReturnType ? 1 : 0>; 467 auto ew = folly::exception_wrapper(); 468 if (HasReturnType && result.getIsSet(0)) { 469 return ew; 470 } 471 foreach_index<PResult::size::value - base::value>([&](auto index) { 472 if (!ew && result.getIsSet(index.value + base::value)) { 473 auto& fdata = result.template get<index.value + base::value>(); 474 ew = folly::exception_wrapper(std::move(fdata.ref())); 475 } 476 }); 477 if (!ew && HasReturnType) { 478 ew = folly::make_exception_wrapper<TApplicationException>( 479 TApplicationException::TApplicationExceptionType::MISSING_RESULT, 480 "failed: unknown result"); 481 } 482 return ew; 483 } 484 485 template <typename Protocol, typename PResult> 486 folly::exception_wrapper recv_wrapped_helper( 487 Protocol* prot, ClientReceiveState& state, PResult& result) { 488 ContextStack* ctx = state.ctx(); 489 MessageType mtype = state.messageType(); 490 if (ctx) { 491 ctx->preRead(); 492 } 493 try { 494 const folly::IOBuf& buffer = *state.serializedResponse().buffer; 495 // TODO: re-enable checksumming after we properly adjust checksum on the 496 // server to exclude the envelope. 497 // if (state.header() && state.header()->getCrc32c().has_value() && 498 // checksum::crc32c(buffer) != *state.header()->getCrc32c()) { 499 // return folly::make_exception_wrapper<TApplicationException>( 500 // TApplicationException::TApplicationExceptionType::CHECKSUM_MISMATCH, 501 // "corrupted response"); 502 // } 503 if (mtype == MessageType::T_EXCEPTION) { 504 TApplicationException x; 505 apache::thrift::detail::deserializeExceptionBody(prot, &x); 506 return folly::exception_wrapper(std::move(x)); 507 } 508 if (mtype != MessageType::T_REPLY) { 509 prot->skip(protocol::T_STRUCT); 510 return folly::make_exception_wrapper<TApplicationException>( 511 TApplicationException::TApplicationExceptionType:: 512 INVALID_MESSAGE_TYPE); 513 } 514 SerializedMessage smsg; 515 smsg.protocolType = prot->protocolType(); 516 smsg.buffer = &buffer; 517 if (ctx) { 518 ctx->onReadData(smsg); 519 } 520 apache::thrift::detail::deserializeRequestBody(prot, &result); 521 if (ctx) { 522 ctx->postRead( 523 state.header(), folly::to_narrow(buffer.computeChainDataLength())); 524 } 525 return folly::exception_wrapper(); 526 } catch (std::exception const& e) { 527 return folly::exception_wrapper(std::current_exception(), e); 528 } catch (...) { 529 return folly::exception_wrapper(std::current_exception()); 530 } 531 } 532 533 template <typename PResult, typename Protocol, typename... ReturnTs> 534 folly::exception_wrapper recv_wrapped( 535 Protocol* prot, ClientReceiveState& state, ReturnTs&... _returns) { 536 prot->setInput(state.serializedResponse().buffer.get()); 537 auto guard = folly::makeGuard([&] { prot->setInput(nullptr); }); 538 apache::thrift::ContextStack* ctx = state.ctx(); 539 PResult result; 540 foreach( 541 [&](auto index, auto& obj) { 542 result.template get<index.value>().value = &obj; 543 }, 544 _returns...); 545 auto ew = recv_wrapped_helper(prot, state, result); 546 if (!ew) { 547 constexpr auto const kHasReturnType = sizeof...(_returns) != 0; 548 ew = apache::thrift::detail::ac::extract_exn<kHasReturnType>(result); 549 } 550 if (ctx && ew) { 551 ctx->handlerErrorWrapped(ew); 552 } 553 return ew; 554 } 555 556 template <typename PResult, typename Protocol, typename Response, typename Item> 557 folly::exception_wrapper recv_wrapped( 558 Protocol* prot, 559 ClientReceiveState& state, 560 apache::thrift::ResponseAndClientBufferedStream<Response, Item>& _return) { 561 prot->setInput(state.serializedResponse().buffer.get()); 562 auto guard = folly::makeGuard([&] { prot->setInput(nullptr); }); 563 apache::thrift::ContextStack* ctx = state.ctx(); 564 565 typename PResult::FieldsType result; 566 result.template get<0>().value = &_return.response; 567 568 auto ew = recv_wrapped_helper(prot, state, result); 569 if (!ew) { 570 ew = apache::thrift::detail::ac::extract_exn<true>(result); 571 } 572 if (ctx && ew) { 573 ctx->handlerErrorWrapped(ew); 574 } 575 576 if (!ew) { 577 _return.stream = apache::thrift::detail::ap::decode_client_buffered_stream< 578 Protocol, 579 typename PResult::StreamPResultType, 580 Item>(state.extractStreamBridge(), state.bufferOptions()); 581 } 582 return ew; 583 } 584 585 template <typename PResult, typename Protocol, typename Item> 586 folly::exception_wrapper recv_wrapped( 587 Protocol* prot, 588 ClientReceiveState& state, 589 apache::thrift::ClientBufferedStream<Item>& _return) { 590 prot->setInput(state.serializedResponse().buffer.get()); 591 auto guard = folly::makeGuard([&] { prot->setInput(nullptr); }); 592 apache::thrift::ContextStack* ctx = state.ctx(); 593 594 typename PResult::FieldsType result; 595 596 auto ew = recv_wrapped_helper(prot, state, result); 597 if (!ew) { 598 ew = apache::thrift::detail::ac::extract_exn<false>(result); 599 } 600 if (ctx && ew) { 601 ctx->handlerErrorWrapped(ew); 602 } 603 604 if (!ew) { 605 _return = apache::thrift::detail::ap::decode_client_buffered_stream< 606 Protocol, 607 typename PResult::StreamPResultType, 608 Item>(state.extractStreamBridge(), state.bufferOptions()); 609 } 610 return ew; 611 } 612 613 #if FOLLY_HAS_COROUTINES 614 615 template < 616 typename ProtocolReader, 617 typename ProtocolWriter, 618 typename SinkPResult, 619 typename SinkType, 620 typename FinalResponsePResult, 621 typename FinalResponseType, 622 typename ErrorMapFunc> 623 ClientSink<SinkType, FinalResponseType> createSink( 624 apache::thrift::detail::ClientSinkBridge::Ptr impl) { 625 static apache::thrift::detail::ap::StreamElementEncoderImpl< 626 ErrorBlame::CLIENT, 627 ProtocolWriter, 628 SinkPResult, 629 SinkType, 630 std::decay_t<ErrorMapFunc>> 631 encode; 632 return ClientSink<SinkType, FinalResponseType>( 633 std::move(impl), 634 &encode, 635 apache::thrift::detail::ap::decode_stream_element< 636 ProtocolReader, 637 FinalResponsePResult, 638 FinalResponseType>); 639 } 640 #endif 641 642 template < 643 typename PResult, 644 typename ErrorMapFunc, 645 typename ProtocolWriter, 646 typename ProtocolReader, 647 typename Response, 648 typename Item, 649 typename FinalResponse> 650 folly::exception_wrapper recv_wrapped( 651 ProtocolReader* prot, 652 ClientReceiveState& state, 653 apache::thrift::detail::ClientSinkBridge::Ptr impl, 654 apache::thrift::ResponseAndClientSink<Response, Item, FinalResponse>& 655 _return) { 656 #if FOLLY_HAS_COROUTINES 657 prot->setInput(state.serializedResponse().buffer.get()); 658 auto guard = folly::makeGuard([&] { prot->setInput(nullptr); }); 659 apache::thrift::ContextStack* ctx = state.ctx(); 660 661 typename PResult::FieldsType result; 662 result.template get<0>().value = &_return.response; 663 664 auto ew = recv_wrapped_helper(prot, state, result); 665 if (!ew) { 666 ew = apache::thrift::detail::ac::extract_exn<true>(result); 667 } 668 if (ctx && ew) { 669 ctx->handlerErrorWrapped(ew); 670 } 671 672 if (!ew) { 673 _return.sink = createSink< 674 ProtocolReader, 675 ProtocolWriter, 676 typename PResult::SinkPResultType, 677 Item, 678 typename PResult::FinalResponsePResultType, 679 FinalResponse, 680 std::decay_t<ErrorMapFunc>>(std::move(impl)); 681 } 682 return ew; 683 #else 684 (void)prot; 685 (void)state; 686 (void)impl; 687 (void)_return; 688 std::terminate(); 689 #endif 690 } 691 692 template < 693 typename PResult, 694 typename ErrorMapFunc, 695 typename ProtocolWriter, 696 typename ProtocolReader, 697 typename Item, 698 typename FinalResponse> 699 folly::exception_wrapper recv_wrapped( 700 ProtocolReader* prot, 701 ClientReceiveState& state, 702 apache::thrift::detail::ClientSinkBridge::Ptr impl, 703 apache::thrift::ClientSink<Item, FinalResponse>& _return) { 704 #if FOLLY_HAS_COROUTINES 705 prot->setInput(state.serializedResponse().buffer.get()); 706 auto guard = folly::makeGuard([&] { prot->setInput(nullptr); }); 707 apache::thrift::ContextStack* ctx = state.ctx(); 708 709 typename PResult::FieldsType result; 710 711 auto ew = recv_wrapped_helper(prot, state, result); 712 if (!ew) { 713 ew = apache::thrift::detail::ac::extract_exn<false>(result); 714 } 715 if (ctx && ew) { 716 ctx->handlerErrorWrapped(ew); 717 } 718 719 if (!ew) { 720 _return = createSink< 721 ProtocolReader, 722 ProtocolWriter, 723 typename PResult::SinkPResultType, 724 Item, 725 typename PResult::FinalResponsePResultType, 726 FinalResponse, 727 std::decay_t<ErrorMapFunc>>(std::move(impl)); 728 } 729 return ew; 730 #else 731 (void)prot; 732 (void)state; 733 (void)impl; 734 (void)_return; 735 std::terminate(); 736 #endif 737 } 738 739 [[noreturn]] void throw_app_exn(char const* msg); 740 } // namespace ac 741 } // namespace detail 742 743 // AsyncProcessor helpers 744 namespace detail { 745 namespace ap { 746 747 // Everything templated on only protocol goes here. The corresponding .cpp file 748 // explicitly instantiates this struct for each supported protocol. 749 template <typename ProtocolReader, typename ProtocolWriter> 750 struct helper { 751 static std::unique_ptr<folly::IOBuf> write_exn( 752 bool includeEnvelope, 753 const char* method, 754 ProtocolWriter* prot, 755 int32_t protoSeqId, 756 ContextStack* ctx, 757 const TApplicationException& x); 758 759 // Temporary for backwards compatibility 760 static std::unique_ptr<folly::IOBuf> write_exn( 761 const char* method, 762 ProtocolWriter* prot, 763 int32_t protoSeqId, 764 ContextStack* ctx, 765 const TApplicationException& x) { 766 return write_exn(true, method, prot, protoSeqId, ctx, x); 767 } 768 769 static void process_exn( 770 const char* func, 771 const TApplicationException::TApplicationExceptionType type, 772 const std::string& msg, 773 ResponseChannelRequest::UniquePtr req, 774 Cpp2RequestContext* ctx, 775 folly::EventBase* eb, 776 int32_t protoSeqId); 777 }; 778 779 template <typename ProtocolReader> 780 using writer_of = typename ProtocolReader::ProtocolWriter; 781 template <typename ProtocolWriter> 782 using reader_of = typename ProtocolWriter::ProtocolReader; 783 784 template <typename ProtocolReader> 785 using helper_r = helper<ProtocolReader, writer_of<ProtocolReader>>; 786 template <typename ProtocolWriter> 787 using helper_w = helper<reader_of<ProtocolWriter>, ProtocolWriter>; 788 789 template <typename T> 790 inline constexpr bool is_root_async_processor = 791 std::is_void_v<typename T::BaseAsyncProcessor>; 792 793 template <typename Derived> 794 GeneratedAsyncProcessor::ProcessFunc<Derived> getProcessFuncFromProtocol( 795 folly::tag_t<CompactProtocolReader> /* unused */, 796 const GeneratedAsyncProcessor::ProcessFuncs<Derived>& funcs) { 797 return funcs.compact; 798 } 799 template <typename Derived> 800 GeneratedAsyncProcessor::ProcessFunc<Derived> getProcessFuncFromProtocol( 801 folly::tag_t<BinaryProtocolReader> /* unused */, 802 const GeneratedAsyncProcessor::ProcessFuncs<Derived>& funcs) { 803 return funcs.binary; 804 } 805 806 inline void nonRecursiveProcessMissing( 807 const std::string& methodName, 808 ResponseChannelRequest::UniquePtr req, 809 folly::EventBase* eb) { 810 if (req) { 811 eb->runInEventBaseThread([request = std::move(req), 812 name = std::string{methodName}]() mutable { 813 AsyncProcessorHelper::sendUnknownMethodError(std::move(request), name); 814 }); 815 } 816 } 817 818 template <class ProtocolReader, class Processor> 819 void recursiveProcessMissing( 820 Processor* processor, 821 const std::string& fname, 822 ResponseChannelRequest::UniquePtr req, 823 apache::thrift::SerializedCompressedRequest&& serializedRequest, 824 Cpp2RequestContext* ctx, 825 folly::EventBase* eb, 826 concurrency::ThreadManager* tm); 827 828 template <class ProtocolReader, class Processor> 829 void recursiveProcessPmap( 830 Processor* proc, 831 const typename Processor::ProcessMap& pmap, 832 ResponseChannelRequest::UniquePtr req, 833 apache::thrift::SerializedCompressedRequest&& serializedRequest, 834 Cpp2RequestContext* ctx, 835 folly::EventBase* eb, 836 concurrency::ThreadManager* tm) { 837 const auto& fname = ctx->getMethodName(); 838 auto processFuncs = pmap.find(fname); 839 if (processFuncs == pmap.end()) { 840 recursiveProcessMissing<ProtocolReader>( 841 proc, fname, std::move(req), std::move(serializedRequest), ctx, eb, tm); 842 return; 843 } 844 845 auto pfn = getProcessFuncFromProtocol( 846 folly::tag<ProtocolReader>, processFuncs->second); 847 (proc->*pfn)(std::move(req), std::move(serializedRequest), ctx, eb, tm); 848 } 849 850 template <class ProtocolReader, class Processor> 851 void recursiveProcessMissing( 852 Processor* processor, 853 const std::string& fname, 854 ResponseChannelRequest::UniquePtr req, 855 apache::thrift::SerializedCompressedRequest&& serializedRequest, 856 Cpp2RequestContext* ctx, 857 folly::EventBase* eb, 858 concurrency::ThreadManager* tm) { 859 if constexpr (is_root_async_processor<Processor>) { 860 nonRecursiveProcessMissing(fname, std::move(req), eb); 861 } else { 862 using BaseAsyncProcessor = typename Processor::BaseAsyncProcessor; 863 recursiveProcessPmap<ProtocolReader, BaseAsyncProcessor>( 864 processor, 865 BaseAsyncProcessor::getOwnProcessMap(), 866 std::move(req), 867 std::move(serializedRequest), 868 ctx, 869 eb, 870 tm); 871 } 872 } 873 874 /** 875 * Recursive implementation of method resolution based on 876 * Processor::getOwnProcessMap(). This is the fallback/legacy implementation for 877 * generated AsyncProcessor::processSerializedCompressedRequest, which is called 878 * in the absence of MethodMetadata support. 879 * 880 * TODO(praihan): Remove this implementation once all services support 881 * MethodMetadata. 882 */ 883 template <class ProtocolReader, class Processor> 884 void recursiveProcess( 885 Processor* processor, 886 ResponseChannelRequest::UniquePtr req, 887 apache::thrift::SerializedCompressedRequest&& serializedRequest, 888 Cpp2RequestContext* ctx, 889 folly::EventBase* eb, 890 concurrency::ThreadManager* tm) { 891 return recursiveProcessPmap<ProtocolReader>( 892 processor, 893 Processor::getOwnProcessMap(), 894 std::move(req), 895 std::move(serializedRequest), 896 ctx, 897 eb, 898 tm); 899 } 900 901 /** 902 * Non-recursive implementation of method resolution based on 903 * the passed-in MethodMetadata. 904 * See ServerInterface::GeneratedMethodMetadata. 905 */ 906 template <class ProtocolReader, class Processor> 907 void nonRecursiveProcess( 908 Processor* processor, 909 ResponseChannelRequest::UniquePtr req, 910 apache::thrift::SerializedCompressedRequest&& serializedRequest, 911 const apache::thrift::AsyncProcessor::MethodMetadata& untypedMethodMetadata, 912 Cpp2RequestContext* ctx, 913 folly::EventBase* eb, 914 concurrency::ThreadManager* tm) { 915 using Metadata = ServerInterface::GeneratedMethodMetadata<Processor>; 916 static_assert(std::is_final_v<Metadata>); 917 const auto& methodMetadata = 918 AsyncProcessorHelper::expectMetadataOfType<Metadata>( 919 untypedMethodMetadata); 920 auto pfn = getProcessFuncFromProtocol( 921 folly::tag<ProtocolReader>, methodMetadata.processFuncs); 922 (processor->*pfn)(std::move(req), std::move(serializedRequest), ctx, eb, tm); 923 } 924 925 // Generated AsyncProcessor::processSerializedCompressedRequestWithMetadata just 926 // calls this 927 template <class Processor> 928 void process( 929 Processor* processor, 930 ResponseChannelRequest::UniquePtr req, 931 apache::thrift::SerializedCompressedRequest&& serializedRequest, 932 const apache::thrift::AsyncProcessor::MethodMetadata& methodMetadata, 933 protocol::PROTOCOL_TYPES protType, 934 Cpp2RequestContext* ctx, 935 folly::EventBase* eb, 936 concurrency::ThreadManager* tm) { 937 switch (protType) { 938 case protocol::T_BINARY_PROTOCOL: { 939 return nonRecursiveProcess<BinaryProtocolReader>( 940 processor, 941 std::move(req), 942 std::move(serializedRequest), 943 methodMetadata, 944 ctx, 945 eb, 946 tm); 947 } 948 case protocol::T_COMPACT_PROTOCOL: { 949 return nonRecursiveProcess<CompactProtocolReader>( 950 processor, 951 std::move(req), 952 std::move(serializedRequest), 953 methodMetadata, 954 ctx, 955 eb, 956 tm); 957 } 958 default: 959 LOG(ERROR) << "invalid protType: " << folly::to_underlying(protType); 960 return; 961 } 962 } 963 964 // Generated AsyncProcessor::processSerializedCompressedRequest just calls this 965 template <class Processor> 966 void process( 967 Processor* processor, 968 ResponseChannelRequest::UniquePtr req, 969 apache::thrift::SerializedCompressedRequest&& serializedRequest, 970 protocol::PROTOCOL_TYPES protType, 971 Cpp2RequestContext* ctx, 972 folly::EventBase* eb, 973 concurrency::ThreadManager* tm) { 974 switch (protType) { 975 case protocol::T_BINARY_PROTOCOL: { 976 return recursiveProcess<BinaryProtocolReader>( 977 processor, std::move(req), std::move(serializedRequest), ctx, eb, tm); 978 } 979 case protocol::T_COMPACT_PROTOCOL: { 980 return recursiveProcess<CompactProtocolReader>( 981 processor, std::move(req), std::move(serializedRequest), ctx, eb, tm); 982 } 983 default: 984 LOG(ERROR) << "invalid protType: " << folly::to_underlying(protType); 985 return; 986 } 987 } 988 989 struct MessageBegin : folly::MoveOnly { 990 std::string methodName; 991 struct Metadata { 992 std::string errMessage; 993 size_t size{0}; 994 int32_t seqId{0}; 995 MessageType msgType{}; 996 bool isValid{true}; 997 } metadata; 998 }; 999 1000 bool setupRequestContextWithMessageBegin( 1001 const MessageBegin::Metadata& msgBegin, 1002 protocol::PROTOCOL_TYPES protType, 1003 ResponseChannelRequest::UniquePtr& req, 1004 Cpp2RequestContext* ctx, 1005 folly::EventBase* eb); 1006 1007 MessageBegin deserializeMessageBegin( 1008 const folly::IOBuf& buf, protocol::PROTOCOL_TYPES protType); 1009 1010 /** 1011 * The function pointers themselves are contravariant on the Processor type but 1012 * ProcessFuncs is not because templates are invariant. This function performs 1013 * the conversion manually. 1014 */ 1015 template <class DerivedProcessor, class BaseProcessor> 1016 std::enable_if_t< 1017 std::is_base_of_v<BaseProcessor, DerivedProcessor>, 1018 GeneratedAsyncProcessor::ProcessFuncs<DerivedProcessor>> 1019 downcastProcessFuncs( 1020 const GeneratedAsyncProcessor::ProcessFuncs<BaseProcessor>& processFuncs) { 1021 return GeneratedAsyncProcessor::ProcessFuncs<DerivedProcessor>{ 1022 processFuncs.compact, processFuncs.binary}; 1023 } 1024 1025 template < 1026 class MostDerivedProcessor, 1027 class CurrentProcessor = MostDerivedProcessor> 1028 void populateMethodMetadataMap(AsyncProcessorFactory::MethodMetadataMap& map) { 1029 for (const auto& [methodName, processFuncs] : 1030 CurrentProcessor::getOwnProcessMap()) { 1031 map.emplace( 1032 methodName, 1033 // Always create GeneratatedMethodMetadata<MostDerivedProcessor> so that 1034 // all entries in the map are of the same type. 1035 std::make_shared< 1036 ServerInterface::GeneratedMethodMetadata<MostDerivedProcessor>>( 1037 downcastProcessFuncs<MostDerivedProcessor>(processFuncs))); 1038 } 1039 if constexpr (!is_root_async_processor<CurrentProcessor>) { 1040 populateMethodMetadataMap< 1041 MostDerivedProcessor, 1042 typename CurrentProcessor::BaseAsyncProcessor>(map); 1043 } 1044 } 1045 1046 template <class Processor> 1047 AsyncProcessorFactory::MethodMetadataMap createMethodMetadataMap() { 1048 AsyncProcessorFactory::MethodMetadataMap result; 1049 populateMethodMetadataMap<Processor>(result); 1050 return result; 1051 } 1052 1053 template <typename Protocol, typename PResult, typename T> 1054 std::unique_ptr<folly::IOBuf> encode_stream_payload(T&& _item) { 1055 PResult res; 1056 res.template get<0>().value = const_cast<T*>(&_item); 1057 res.setIsSet(0); 1058 1059 folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength()); 1060 Protocol prot; 1061 prot.setOutput(&queue, res.serializedSizeZC(&prot)); 1062 1063 res.write(&prot); 1064 return std::move(queue).move(); 1065 } 1066 1067 template <typename Protocol, typename PResult> 1068 std::unique_ptr<folly::IOBuf> encode_stream_payload(folly::IOBuf&& _item) { 1069 return std::make_unique<folly::IOBuf>(std::move(_item)); 1070 } 1071 1072 template < 1073 ErrorBlame Blame, 1074 typename Protocol, 1075 typename PResult, 1076 typename ErrorMapFunc> 1077 EncodedStreamError encode_stream_exception(folly::exception_wrapper ew) { 1078 ErrorMapFunc mapException; 1079 Protocol prot; 1080 folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength()); 1081 PResult res; 1082 1083 PayloadExceptionMetadata exceptionMetadata; 1084 PayloadExceptionMetadataBase exceptionMetadataBase; 1085 if (mapException(res, ew)) { 1086 prot.setOutput(&queue, res.serializedSizeZC(&prot)); 1087 res.write(&prot); 1088 exceptionMetadata.declaredException_ref() = 1089 PayloadDeclaredExceptionMetadata(); 1090 } else { 1091 constexpr size_t kQueueAppenderGrowth = 4096; 1092 prot.setOutput(&queue, kQueueAppenderGrowth); 1093 TApplicationException ex(ew.what().toStdString()); 1094 exceptionMetadataBase.what_utf8_ref() = ex.what(); 1095 apache::thrift::detail::serializeExceptionBody(&prot, &ex); 1096 PayloadAppUnknownExceptionMetdata aue; 1097 aue.errorClassification_ref().ensure().blame_ref() = Blame; 1098 exceptionMetadata.appUnknownException_ref() = std::move(aue); 1099 } 1100 1101 exceptionMetadataBase.metadata_ref() = std::move(exceptionMetadata); 1102 StreamPayloadMetadata streamPayloadMetadata; 1103 PayloadMetadata payloadMetadata; 1104 payloadMetadata.exceptionMetadata_ref() = std::move(exceptionMetadataBase); 1105 streamPayloadMetadata.payloadMetadata_ref() = std::move(payloadMetadata); 1106 return EncodedStreamError( 1107 StreamPayload(std::move(queue).move(), std::move(streamPayloadMetadata))); 1108 } 1109 1110 template < 1111 ErrorBlame Blame, 1112 typename Protocol, 1113 typename PResult, 1114 typename T, 1115 typename ErrorMapFunc> 1116 class StreamElementEncoderImpl final 1117 : public apache::thrift::detail::StreamElementEncoder<T> { 1118 folly::Try<StreamPayload> operator()(T&& val) override { 1119 StreamPayloadMetadata streamPayloadMetadata; 1120 PayloadMetadata payloadMetadata; 1121 payloadMetadata.responseMetadata_ref().ensure(); 1122 streamPayloadMetadata.payloadMetadata_ref() = std::move(payloadMetadata); 1123 return folly::Try<StreamPayload>( 1124 {encode_stream_payload<Protocol, PResult>(std::move(val)), 1125 std::move(streamPayloadMetadata)}); 1126 } 1127 1128 folly::Try<StreamPayload> operator()(folly::exception_wrapper&& e) override { 1129 return folly::Try<StreamPayload>(folly::exception_wrapper( 1130 encode_stream_exception<Blame, Protocol, PResult, ErrorMapFunc>(e))); 1131 } 1132 }; 1133 1134 template <typename Protocol, typename PResult, typename T> 1135 T decode_stream_payload_impl(folly::IOBuf& payload, folly::tag_t<T>) { 1136 PResult args; 1137 T res{}; 1138 args.template get<0>().value = &res; 1139 1140 Protocol prot; 1141 prot.setInput(&payload); 1142 args.read(&prot); 1143 return res; 1144 } 1145 1146 template <typename Protocol, typename PResult, typename T> 1147 folly::IOBuf decode_stream_payload_impl( 1148 folly::IOBuf& payload, folly::tag_t<folly::IOBuf>) { 1149 return std::move(payload); 1150 } 1151 1152 template <typename Protocol, typename PResult, typename T> 1153 T decode_stream_payload(folly::IOBuf& payload) { 1154 return decode_stream_payload_impl<Protocol, PResult, T>( 1155 payload, folly::tag_t<T>{}); 1156 } 1157 1158 template <typename Protocol, typename PResult, typename T> 1159 folly::exception_wrapper decode_stream_exception(folly::exception_wrapper ew) { 1160 folly::exception_wrapper hijacked; 1161 ew.handle( 1162 [&hijacked](apache::thrift::detail::EncodedError& err) { 1163 PResult result; 1164 T res{}; 1165 result.template get<0>().value = &res; 1166 Protocol prot; 1167 prot.setInput(err.encoded.get()); 1168 result.read(&prot); 1169 1170 CHECK(!result.getIsSet(0)); 1171 1172 foreach_index<PResult::size::value - 1>([&](auto index) { 1173 if (!hijacked && result.getIsSet(index.value + 1)) { 1174 auto& fdata = result.template get<index.value + 1>(); 1175 hijacked = folly::exception_wrapper(std::move(fdata.ref())); 1176 } 1177 }); 1178 1179 if (!hijacked) { 1180 // Could not decode the error. It may be a TApplicationException 1181 TApplicationException x; 1182 prot.setInput(err.encoded.get()); 1183 apache::thrift::detail::deserializeExceptionBody(&prot, &x); 1184 hijacked = folly::exception_wrapper(std::move(x)); 1185 } 1186 }, 1187 [&hijacked](apache::thrift::detail::EncodedStreamError& err) { 1188 auto& payload = err.encoded; 1189 DCHECK_EQ(payload.metadata.payloadMetadata_ref().has_value(), true); 1190 DCHECK_EQ( 1191 payload.metadata.payloadMetadata_ref()->getType(), 1192 PayloadMetadata::exceptionMetadata); 1193 auto& exceptionMetadataBase = 1194 payload.metadata.payloadMetadata_ref()->get_exceptionMetadata(); 1195 if (auto exceptionMetadataRef = exceptionMetadataBase.metadata_ref()) { 1196 if (exceptionMetadataRef->getType() == 1197 PayloadExceptionMetadata::declaredException) { 1198 PResult result; 1199 T res{}; 1200 Protocol prot; 1201 result.template get<0>().value = &res; 1202 prot.setInput(payload.payload.get()); 1203 result.read(&prot); 1204 CHECK(!result.getIsSet(0)); 1205 foreach_index<PResult::size::value - 1>([&](auto index) { 1206 if (!hijacked && result.getIsSet(index.value + 1)) { 1207 auto& fdata = result.template get<index.value + 1>(); 1208 hijacked = folly::exception_wrapper(std::move(fdata.ref())); 1209 } 1210 }); 1211 1212 if (!hijacked) { 1213 hijacked = 1214 TApplicationException("Failed to parse declared exception"); 1215 } 1216 } else { 1217 hijacked = TApplicationException( 1218 exceptionMetadataBase.what_utf8_ref().value_or("")); 1219 } 1220 } else { 1221 hijacked = 1222 TApplicationException("Missing payload exception metadata"); 1223 } 1224 }, 1225 [&hijacked](apache::thrift::detail::EncodedStreamRpcError& err) { 1226 StreamRpcError streamRpcError; 1227 CompactProtocolReader reader; 1228 reader.setInput(err.encoded.get()); 1229 streamRpcError.read(&reader); 1230 TApplicationException::TApplicationExceptionType exType{ 1231 TApplicationException::UNKNOWN}; 1232 auto code = streamRpcError.code_ref(); 1233 if (code && 1234 (code.value() == StreamRpcErrorCode::CREDIT_TIMEOUT || 1235 code.value() == StreamRpcErrorCode::CHUNK_TIMEOUT)) { 1236 exType = TApplicationException::TIMEOUT; 1237 } 1238 hijacked = TApplicationException( 1239 exType, streamRpcError.what_utf8_ref().value_or("")); 1240 }, 1241 [](...) {}); 1242 1243 if (hijacked) { 1244 return hijacked; 1245 } 1246 return ew; 1247 } 1248 1249 template < 1250 typename Protocol, 1251 typename PResult, 1252 typename ErrorMapFunc, 1253 typename T> 1254 ServerStreamFactory encode_server_stream( 1255 apache::thrift::ServerStream<T>&& stream, 1256 folly::Executor::KeepAlive<> serverExecutor) { 1257 static StreamElementEncoderImpl< 1258 ErrorBlame::SERVER, 1259 Protocol, 1260 PResult, 1261 T, 1262 ErrorMapFunc> 1263 encode; 1264 return stream(std::move(serverExecutor), &encode); 1265 } 1266 1267 template <typename Protocol, typename PResult, typename T> 1268 folly::Try<T> decode_stream_element( 1269 folly::Try<apache::thrift::StreamPayload>&& payload) { 1270 if (payload.hasValue()) { 1271 return folly::Try<T>( 1272 decode_stream_payload<Protocol, PResult, T>(*payload->payload)); 1273 } else if (payload.hasException()) { 1274 return folly::Try<T>(decode_stream_exception<Protocol, PResult, T>( 1275 std::move(payload).exception())); 1276 } else { 1277 return folly::Try<T>(); 1278 } 1279 } 1280 1281 template <typename Protocol, typename PResult, typename T> 1282 apache::thrift::ClientBufferedStream<T> decode_client_buffered_stream( 1283 apache::thrift::detail::ClientStreamBridge::ClientPtr streamBridge, 1284 const BufferOptions& bufferOptions) { 1285 return apache::thrift::ClientBufferedStream<T>( 1286 std::move(streamBridge), 1287 decode_stream_element<Protocol, PResult, T>, 1288 bufferOptions); 1289 } 1290 1291 template < 1292 typename ProtocolReader, 1293 typename ProtocolWriter, 1294 typename SinkPResult, 1295 typename FinalResponsePResult, 1296 typename ErrorMapFunc, 1297 typename SinkType, 1298 typename FinalResponseType> 1299 apache::thrift::detail::SinkConsumerImpl toSinkConsumerImpl( 1300 FOLLY_MAYBE_UNUSED SinkConsumer<SinkType, FinalResponseType>&& sinkConsumer, 1301 FOLLY_MAYBE_UNUSED folly::Executor::KeepAlive<> executor) { 1302 #if FOLLY_HAS_COROUTINES 1303 auto consumer = 1304 [innerConsumer = std::move(sinkConsumer.consumer)]( 1305 folly::coro::AsyncGenerator<folly::Try<StreamPayload>&&> gen) mutable 1306 -> folly::coro::Task<folly::Try<StreamPayload>> { 1307 folly::exception_wrapper ew; 1308 try { 1309 FinalResponseType finalResponse = co_await innerConsumer( 1310 [](folly::coro::AsyncGenerator<folly::Try<StreamPayload>&&> gen_) 1311 -> folly::coro::AsyncGenerator<SinkType&&> { 1312 while (auto item = co_await gen_.next()) { 1313 auto payload = std::move(*item); 1314 co_yield folly::coro::co_result(ap::decode_stream_element< 1315 ProtocolReader, 1316 SinkPResult, 1317 SinkType>(std::move(payload))); 1318 } 1319 }(std::move(gen))); 1320 co_return folly::Try<StreamPayload>(StreamPayload( 1321 ap::encode_stream_payload<ProtocolWriter, FinalResponsePResult>( 1322 std::move(finalResponse)), 1323 {})); 1324 // This causes clang internal error on Windows. 1325 #if !(defined(_WIN32) && defined(__clang__)) 1326 } catch (std::exception& e) { 1327 ew = folly::exception_wrapper(std::current_exception(), e); 1328 #endif 1329 } catch (...) { 1330 ew = folly::exception_wrapper(std::current_exception()); 1331 } 1332 co_return folly::Try<StreamPayload>(ap::encode_stream_exception< 1333 ErrorBlame::SERVER, 1334 ProtocolWriter, 1335 FinalResponsePResult, 1336 ErrorMapFunc>(std::move(ew))); 1337 }; 1338 return apache::thrift::detail::SinkConsumerImpl{ 1339 std::move(consumer), 1340 sinkConsumer.bufferSize, 1341 sinkConsumer.sinkOptions.chunkTimeout, 1342 std::move(executor)}; 1343 #else 1344 std::terminate(); 1345 #endif 1346 } 1347 1348 } // namespace ap 1349 } // namespace detail 1350 1351 // ServerInterface helpers 1352 namespace detail { 1353 namespace si { 1354 template <typename T> 1355 folly::Future<T> future( 1356 folly::SemiFuture<T>&& future, folly::Executor::KeepAlive<> keepAlive) { 1357 if (future.isReady()) { 1358 return std::move(future).toUnsafeFuture(); 1359 } 1360 return std::move(future).via(keepAlive); 1361 } 1362 1363 using CallbackBase = HandlerCallbackBase; 1364 using CallbackBasePtr = std::unique_ptr<CallbackBase>; 1365 template <typename T> 1366 using Callback = HandlerCallback<T>; 1367 template <typename T> 1368 using CallbackPtr = std::unique_ptr<Callback<T>>; 1369 1370 class AsyncTmPrep { 1371 ServerInterface* si_; 1372 1373 public: 1374 AsyncTmPrep(ServerInterface* si, CallbackBase* callback) : si_{si} { 1375 si->setEventBase(callback->getEventBase()); 1376 si->setThreadManager(callback->getThreadManager()); 1377 si->setRequestContext(callback->getRequestContext()); 1378 } 1379 1380 ~AsyncTmPrep() { si_->clearRequestParams(); } 1381 }; 1382 1383 inline void async_tm_future_oneway( 1384 CallbackBasePtr callback, folly::Future<folly::Unit>&& fut) { 1385 if (!fut.isReady()) { 1386 auto ka = callback->getInternalKeepAlive(); 1387 std::move(fut) 1388 .via(std::move(ka)) 1389 .thenValueInline([cb = std::move(callback)](auto&&) {}); 1390 } 1391 } 1392 1393 template <typename T> 1394 void async_tm_future( 1395 CallbackPtr<T> callback, folly::Future<folly::lift_unit_t<T>>&& fut) { 1396 if (!fut.isReady()) { 1397 auto ka = callback->getInternalKeepAlive(); 1398 std::move(fut) 1399 .via(std::move(ka)) 1400 .thenTryInline([cb = std::move(callback)]( 1401 folly::Try<folly::lift_unit_t<T>>&& ret) { 1402 cb->complete(std::move(ret)); 1403 }); 1404 } else { 1405 callback->complete(std::move(fut).result()); 1406 } 1407 } 1408 1409 inline void async_tm_semifuture_oneway( 1410 CallbackBasePtr callback, folly::SemiFuture<folly::Unit>&& fut) { 1411 if (!fut.isReady()) { 1412 auto ka = callback->getInternalKeepAlive(); 1413 std::move(fut) 1414 .via(std::move(ka)) 1415 .thenValueInline([cb = std::move(callback)](auto&&) {}); 1416 } 1417 } 1418 1419 template <typename T> 1420 void async_tm_semifuture( 1421 CallbackPtr<T> callback, folly::SemiFuture<folly::lift_unit_t<T>>&& fut) { 1422 if (!fut.isReady()) { 1423 auto ka = callback->getInternalKeepAlive(); 1424 std::move(fut) 1425 .via(std::move(ka)) 1426 .thenTryInline([cb = std::move(callback)]( 1427 folly::Try<folly::lift_unit_t<T>>&& ret) { 1428 cb->complete(std::move(ret)); 1429 }); 1430 } else { 1431 callback->complete(std::move(fut).result()); 1432 } 1433 } 1434 1435 #if FOLLY_HAS_COROUTINES 1436 inline void async_tm_coro_oneway( 1437 CallbackBasePtr callback, folly::coro::Task<void>&& task) { 1438 auto ka = callback->getInternalKeepAlive(); 1439 std::move(task) 1440 .scheduleOn(std::move(ka)) 1441 .startInlineUnsafe([callback = std::move(callback)](auto&&) {}); 1442 } 1443 1444 template <typename T> 1445 void async_tm_coro(CallbackPtr<T> callback, folly::coro::Task<T>&& task) { 1446 auto ka = callback->getInternalKeepAlive(); 1447 std::move(task) 1448 .scheduleOn(std::move(ka)) 1449 .startInlineUnsafe([callback = std::move(callback)]( 1450 folly::Try<folly::lift_unit_t<T>>&& tryResult) { 1451 callback->complete(std::move(tryResult)); 1452 }); 1453 } 1454 #endif 1455 1456 std::string formatUnimplementedMethodException(std::string_view methodName); 1457 TApplicationException create_app_exn_unimplemented(const char* name); 1458 [[noreturn]] void throw_app_exn_unimplemented(char const* name); 1459 1460 } // namespace si 1461 } // namespace detail 1462 1463 namespace util { 1464 1465 namespace detail { 1466 1467 constexpr ErrorKind fromExceptionKind(ExceptionKind kind) { 1468 switch (kind) { 1469 case ExceptionKind::TRANSIENT: 1470 return ErrorKind::TRANSIENT; 1471 1472 case ExceptionKind::STATEFUL: 1473 return ErrorKind::STATEFUL; 1474 1475 case ExceptionKind::PERMANENT: 1476 return ErrorKind::PERMANENT; 1477 1478 default: 1479 return ErrorKind::UNSPECIFIED; 1480 } 1481 } 1482 1483 constexpr ErrorBlame fromExceptionBlame(ExceptionBlame blame) { 1484 switch (blame) { 1485 case ExceptionBlame::SERVER: 1486 return ErrorBlame::SERVER; 1487 1488 case ExceptionBlame::CLIENT: 1489 return ErrorBlame::CLIENT; 1490 1491 default: 1492 return ErrorBlame::UNSPECIFIED; 1493 } 1494 } 1495 1496 constexpr ErrorSafety fromExceptionSafety(ExceptionSafety safety) { 1497 switch (safety) { 1498 case ExceptionSafety::SAFE: 1499 return ErrorSafety::SAFE; 1500 1501 default: 1502 return ErrorSafety::UNSPECIFIED; 1503 } 1504 } 1505 1506 template <typename T> 1507 std::string serializeExceptionMeta(const folly::exception_wrapper& ew) { 1508 ErrorClassification errorClassification; 1509 1510 constexpr auto errorKind = apache::thrift::detail::st::struct_private_access:: 1511 __fbthrift_cpp2_gen_exception_kind<T>(); 1512 errorClassification.kind_ref() = fromExceptionKind(errorKind); 1513 constexpr auto errorBlame = apache::thrift::detail::st:: 1514 struct_private_access::__fbthrift_cpp2_gen_exception_blame<T>(); 1515 errorClassification.blame_ref() = fromExceptionBlame(errorBlame); 1516 constexpr auto errorSafety = apache::thrift::detail::st:: 1517 struct_private_access::__fbthrift_cpp2_gen_exception_safety<T>(); 1518 errorClassification.safety_ref() = fromExceptionSafety(errorSafety); 1519 1520 ew.with_exception([&errorClassification]( 1521 const ExceptionMetadataOverrideBase& ex) { 1522 if (ex.errorKind() != ExceptionKind::UNSPECIFIED) { 1523 errorClassification.kind_ref() = fromExceptionKind(ex.errorKind()); 1524 } 1525 if (ex.errorBlame() != ExceptionBlame::UNSPECIFIED) { 1526 errorClassification.blame_ref() = fromExceptionBlame(ex.errorBlame()); 1527 } 1528 if (ex.errorSafety() != ExceptionSafety::UNSPECIFIED) { 1529 errorClassification.safety_ref() = fromExceptionSafety(ex.errorSafety()); 1530 } 1531 }); 1532 1533 return apache::thrift::detail::serializeErrorClassification( 1534 errorClassification); 1535 } 1536 1537 } // namespace detail 1538 1539 void appendExceptionToHeader( 1540 const folly::exception_wrapper& ew, Cpp2RequestContext& ctx); 1541 1542 template <typename T> 1543 void appendErrorClassificationToHeader( 1544 const folly::exception_wrapper& ew, Cpp2RequestContext& ctx) { 1545 auto header = ctx.getHeader(); 1546 if (!header) { 1547 return; 1548 } 1549 auto exMeta = detail::serializeExceptionMeta<T>(ew); 1550 header->setHeader( 1551 std::string(apache::thrift::detail::kHeaderExMeta), std::move(exMeta)); 1552 } 1553 1554 TApplicationException toTApplicationException( 1555 const folly::exception_wrapper& ew); 1556 1557 bool includeInRecentRequestsCount(const std::string_view methodName); 1558 1559 } // namespace util 1560 1561 } // namespace thrift 1562 } // namespace apache 1563