1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/experimental/coro/AsyncGenerator.h>
21 #include <folly/experimental/coro/Coroutine.h>
22 #include <folly/experimental/coro/Materialize.h>
23 
24 #include <tuple>
25 #include <variant>
26 
27 #if FOLLY_HAS_COROUTINES
28 
29 namespace folly {
30 namespace coro {
31 
32 template <typename Id, typename Value>
33 using Enumerated = std::tuple<Id, Value>;
34 
35 // Multiplex the results of multiple streams into a single stream that
36 // contains all of the events of the input stream.
37 //
38 // The output is a stream of std::tuple<Id, Event> where the first tuple
39 // element is the result of a call to selectId(innerStream). The default
40 // selectId returns a size_t set to the index of the stream that the event
41 // came from and where Event is the CallbackRecord result of
42 // materialize(innerStream).
43 //
44 // Example:
45 //   AsyncGenerator<AsyncGenerator<int>&&> streams();
46 //
47 //   Task<void> consumer() {
48 //     auto events = multiplex(streams());
49 //     while (auto item != co_await events.next()) {
50 //       auto&& [index, event] = *item;
51 //       if (event.index() == 0) {
52 //          // Value
53 //          int value = std::get<0>(event);
54 //          std::cout << index << " value " << value << "\n";
55 //       } else if (event.index() == 1) {
56 //         // End Of Stream
57 //         std::cout << index << " end\n";
58 //       } else {
59 //         // Exception
60 //         folly::exception_wrapper error = std::get<2>(event);
61 //         std::cout << index << " error " << error.what() << "\n";
62 //       }
63 //     }
64 //   }
65 template <
66     typename SelectIdFn,
67     typename Reference,
68     typename Value,
69     typename KeyType = std::decay_t<
70         invoke_result_t<SelectIdFn&, const AsyncGenerator<Reference, Value>&>>>
71 AsyncGenerator<
72     Enumerated<KeyType, CallbackRecord<Reference>>,
73     Enumerated<KeyType, CallbackRecord<Value>>>
74 multiplex(
75     folly::Executor::KeepAlive<> exec,
76     AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources,
77     SelectIdFn&& selectId);
78 
79 template <typename Reference, typename Value>
80 AsyncGenerator<
81     Enumerated<size_t, CallbackRecord<Reference>>,
82     Enumerated<size_t, CallbackRecord<Value>>>
83 multiplex(
84     folly::Executor::KeepAlive<> exec,
85     AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources);
86 
87 } // namespace coro
88 } // namespace folly
89 
90 #endif // FOLLY_HAS_COROUTINES
91 
92 #include <folly/experimental/coro/Multiplex-inl.h>
93