1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/experimental/coro/Materialize.h>
18 #include <folly/experimental/coro/Merge.h>
19 #include <folly/experimental/coro/Transform.h>
20 
21 #if FOLLY_HAS_COROUTINES
22 
23 namespace folly {
24 namespace coro {
25 
26 template <
27     typename SelectIdFn,
28     typename Reference,
29     typename Value,
30     typename KeyType>
31 AsyncGenerator<
32     Enumerated<KeyType, CallbackRecord<Reference>>,
33     Enumerated<KeyType, CallbackRecord<Value>>>
multiplex(folly::Executor::KeepAlive<> exec,AsyncGenerator<AsyncGenerator<Reference,Value>> && sources,SelectIdFn && selectId)34 multiplex(
35     folly::Executor::KeepAlive<> exec,
36     AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources,
37     SelectIdFn&& selectId) {
38   using EventType = CallbackRecord<Reference>;
39   using ReferenceType = Enumerated<KeyType, EventType>;
40 
41   return merge(
42       std::move(exec),
43       transform(
44           std::move(sources),
45           [selectId = std::forward<SelectIdFn>(selectId)](
46               AsyncGenerator<Reference, Value>&& item) mutable {
47             KeyType id = invoke(selectId, item);
48             return transform(
49                 materialize(std::move(item)),
50                 [id = std::move(id)](EventType&& event) {
51                   return ReferenceType{id, std::move(event)};
52                 });
53           }));
54 }
55 
// Id selector that ignores its argument and hands out consecutive integers:
// the first call returns 0, the next 1, and so on. Used by the two-argument
// multiplex() overload as its key source.
struct MultiplexIdcountFn {
  // Value that the next call to operator() will return.
  size_t n = 0;

  // Accepts any argument (unused) and returns the current counter value,
  // then advances the counter.
  template <typename Inner>
  size_t operator()(Inner&&) noexcept {
    return n++;
  }
};
63 
64 template <typename Reference, typename Value>
65 AsyncGenerator<
66     Enumerated<size_t, CallbackRecord<Reference>>,
67     Enumerated<size_t, CallbackRecord<Value>>>
multiplex(folly::Executor::KeepAlive<> exec,AsyncGenerator<AsyncGenerator<Reference,Value>> && sources)68 multiplex(
69     folly::Executor::KeepAlive<> exec,
70     AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources) {
71   return multiplex(std::move(exec), std::move(sources), MultiplexIdcountFn{});
72 }
73 
74 } // namespace coro
75 } // namespace folly
76 
77 #endif // FOLLY_HAS_COROUTINES
78