1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/ExceptionWrapper.h> 20 #include <folly/experimental/coro/AsyncGenerator.h> 21 #include <folly/experimental/coro/Coroutine.h> 22 #include <folly/experimental/coro/Materialize.h> 23 24 #include <tuple> 25 #include <variant> 26 27 #if FOLLY_HAS_COROUTINES 28 29 namespace folly { 30 namespace coro { 31 32 template <typename Id, typename Value> 33 using Enumerated = std::tuple<Id, Value>; 34 35 // Multiplex the results of multiple streams into a single stream that 36 // contains all of the events of the input stream. 37 // 38 // The output is a stream of std::tuple<Id, Event> where the first tuple 39 // element is the result of a call to selectId(innerStream). The default 40 // selectId returns a size_t set to the index of the stream that the event 41 // came from and where Event is the CallbackRecord result of 42 // materialize(innerStream). 43 // 44 // Example: 45 // AsyncGenerator<AsyncGenerator<int>&&> streams(); 46 // 47 // Task<void> consumer() { 48 // auto events = multiplex(streams()); 49 // while (auto item != co_await events.next()) { 50 // auto&& [index, event] = *item; 51 // if (event.index() == 0) { 52 // // Value 53 // int value = std::get<0>(event); 54 // std::cout << index << " value " << value << "\n"; 55 // } else if (event.index() == 1) { 56 // // End Of Stream 57 // std::cout << index << " end\n"; 58 // } else { 59 // // Exception 60 // folly::exception_wrapper error = std::get<2>(event); 61 // std::cout << index << " error " << error.what() << "\n"; 62 // } 63 // } 64 // } 65 template < 66 typename SelectIdFn, 67 typename Reference, 68 typename Value, 69 typename KeyType = std::decay_t< 70 invoke_result_t<SelectIdFn&, const AsyncGenerator<Reference, Value>&>>> 71 AsyncGenerator< 72 Enumerated<KeyType, CallbackRecord<Reference>>, 73 Enumerated<KeyType, CallbackRecord<Value>>> 74 multiplex( 75 folly::Executor::KeepAlive<> exec, 76 AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources, 77 SelectIdFn&& selectId); 78 79 template <typename Reference, typename Value> 80 AsyncGenerator< 81 Enumerated<size_t, CallbackRecord<Reference>>, 82 Enumerated<size_t, CallbackRecord<Value>>> 83 multiplex( 84 folly::Executor::KeepAlive<> exec, 85 AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources); 86 87 } // namespace coro 88 } // namespace folly 89 90 #endif // FOLLY_HAS_COROUTINES 91 92 #include <folly/experimental/coro/Multiplex-inl.h> 93