1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/container/F14Set.h>
20 #include <folly/executors/SequencedExecutor.h>
21 #include <folly/experimental/channels/Channel.h>
22 #include <folly/experimental/channels/FanoutSender.h>
23 #include <folly/experimental/channels/detail/Utility.h>
24 
25 namespace folly {
26 namespace channels {
27 
28 template <typename ValueType, typename ContextType>
FanoutChannel(TProcessor * processor)29 FanoutChannel<ValueType, ContextType>::FanoutChannel(TProcessor* processor)
30     : processor_(processor) {}
31 
32 template <typename ValueType, typename ContextType>
FanoutChannel(FanoutChannel && other)33 FanoutChannel<ValueType, ContextType>::FanoutChannel(
34     FanoutChannel&& other) noexcept
35     : processor_(std::exchange(other.processor_, nullptr)) {}
36 
37 template <typename ValueType, typename ContextType>
38 FanoutChannel<ValueType, ContextType>&
39 FanoutChannel<ValueType, ContextType>::operator=(
40     FanoutChannel&& other) noexcept {
41   if (&other == this) {
42     return *this;
43   }
44   if (processor_) {
45     std::move(*this).close();
46   }
47   processor_ = std::exchange(other.processor_, nullptr);
48   return *this;
49 }
50 
51 template <typename ValueType, typename ContextType>
~FanoutChannel()52 FanoutChannel<ValueType, ContextType>::~FanoutChannel() {
53   if (processor_ != nullptr) {
54     std::move(*this).close(folly::exception_wrapper());
55   }
56 }
57 
58 template <typename ValueType, typename ContextType>
59 FanoutChannel<ValueType, ContextType>::operator bool() const {
60   return processor_ != nullptr;
61 }
62 
63 template <typename ValueType, typename ContextType>
subscribe(folly::Function<std::vector<ValueType> (const ContextType &)> getInitialValues)64 Receiver<ValueType> FanoutChannel<ValueType, ContextType>::subscribe(
65     folly::Function<std::vector<ValueType>(const ContextType&)>
66         getInitialValues) {
67   return processor_->subscribe(std::move(getInitialValues));
68 }
69 
70 template <typename ValueType, typename ContextType>
anySubscribers()71 bool FanoutChannel<ValueType, ContextType>::anySubscribers() const {
72   return processor_->anySubscribers();
73 }
74 
75 template <typename ValueType, typename ContextType>
closeSubscribers(folly::exception_wrapper ex)76 void FanoutChannel<ValueType, ContextType>::closeSubscribers(
77     folly::exception_wrapper ex) {
78   processor_->closeSubscribers(
79       ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult());
80 }
81 
82 template <typename ValueType, typename ContextType>
close(folly::exception_wrapper ex)83 void FanoutChannel<ValueType, ContextType>::close(
84     folly::exception_wrapper ex) && {
85   processor_->destroyHandle(
86       ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult());
87   processor_ = nullptr;
88 }
89 
90 namespace detail {
91 
/**
 * Interface through which a FanoutChannel handle drives its processor. It
 * extends IChannelCallback with the operations the handle forwards to.
 */
template <typename ValueType, typename ContextType>
class IFanoutChannelProcessor : public IChannelCallback {
 public:
  /**
   * Returns a receiver for a new subscriber. getInitialValues receives the
   * current context and returns the initial values for that subscriber.
   */
  virtual Receiver<ValueType> subscribe(
      folly::Function<std::vector<ValueType>(const ContextType&)>
          getInitialValues) = 0;

  /**
   * Returns whether there are currently any subscribers.
   */
  virtual bool anySubscribers() = 0;

  /**
   * Closes the current subscribers with the given close result.
   */
  virtual void closeSubscribers(CloseResult closeResult) = 0;

  /**
   * Called by the FanoutChannel handle when it is closed or destroyed.
   */
  virtual void destroyHandle(CloseResult closeResult) = 0;
};
105 
106 /**
107  * This object fans out values from the input receiver to all output receivers.
108  * The lifetime of this object is described by the following state machine.
109  *
110  * The input receiver can be in one of three conceptual states: Active,
111  * CancellationTriggered, or CancellationProcessed (removed). When the input
112  * receiver reaches the CancellationProcessed state AND the user's FanoutChannel
113  * object is deleted, this object is deleted.
114  *
115  * When an input receiver receives a value indicating that the channel has
116  * been closed, the state of the input receiver transitions from Active directly
117  * to CancellationProcessed (and this object will be deleted once the user
118  * destroys their FanoutChannel object).
119  *
120  * When the user destroys their FanoutChannel object, the state of the input
121  * receiver transitions from Active to CancellationTriggered. This object will
122  * then be deleted once the input receiver transitions to the
123  * CancellationProcessed state.
124  */
125 template <typename ValueType, typename ContextType>
126 class FanoutChannelProcessor
127     : public IFanoutChannelProcessor<ValueType, ContextType> {
128  private:
129   struct State {
StateState130     State(ContextType _context) : context(std::move(_context)) {}
131 
getReceiverStateState132     ChannelState getReceiverState() {
133       return detail::getReceiverState(receiver.get());
134     }
135 
136     ChannelBridgePtr<ValueType> receiver;
137     FanoutSender<ValueType> fanoutSender;
138     ContextType context;
139     bool handleDeleted{false};
140   };
141 
142   using WLockedStatePtr = typename folly::Synchronized<State>::WLockedPtr;
143 
144  public:
FanoutChannelProcessor(folly::Executor::KeepAlive<folly::SequencedExecutor> executor,ContextType context)145   explicit FanoutChannelProcessor(
146       folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
147       ContextType context)
148       : executor_(std::move(executor)), state_(std::move(context)) {}
149 
150   /**
151    * Starts fanning out values from the input receiver to all output receivers.
152    *
153    * @param inputReceiver: The input receiver to fan out values from.
154    */
start(Receiver<ValueType> inputReceiver)155   void start(Receiver<ValueType> inputReceiver) {
156     auto state = state_.wlock();
157     auto [unbufferedInputReceiver, buffer] =
158         detail::receiverUnbuffer(std::move(inputReceiver));
159     state->receiver = std::move(unbufferedInputReceiver);
160 
161     // Start processing new values that come in from the input receiver.
162     processAllAvailableValues(state, std::move(buffer));
163   }
164 
165   /**
166    * Returns a new output receiver that will receive all values from the input
167    * receiver. If a getInitialValues parameter is provided, it will be executed
168    * to determine the set of initial values that will (only) go to the new input
169    * receiver.
170    */
subscribe(folly::Function<std::vector<ValueType> (const ContextType &)> getInitialValues)171   Receiver<ValueType> subscribe(
172       folly::Function<std::vector<ValueType>(const ContextType&)>
173           getInitialValues) override {
174     auto state = state_.wlock();
175     auto initialValues = getInitialValues ? getInitialValues(state->context)
176                                           : std::vector<ValueType>();
177     if (!state->receiver) {
178       auto [receiver, sender] = Channel<ValueType>::create();
179       for (auto&& value : initialValues) {
180         sender.write(std::move(value));
181       }
182       std::move(sender).close();
183       return std::move(receiver);
184     }
185     return state->fanoutSender.subscribe(std::move(initialValues));
186   }
187 
188   /**
189    * Closes all subscribers without closing the fanout channel.
190    */
closeSubscribers(CloseResult closeResult)191   void closeSubscribers(CloseResult closeResult) {
192     auto state = state_.wlock();
193     std::move(state->fanoutSender)
194         .close(
195             closeResult.exception.has_value() ? closeResult.exception.value()
196                                               : folly::exception_wrapper());
197   }
198 
199   /**
200    * This is called when the user's FanoutChannel object has been destroyed.
201    */
destroyHandle(CloseResult closeResult)202   void destroyHandle(CloseResult closeResult) {
203     auto state = state_.wlock();
204     processHandleDestroyed(state, std::move(closeResult));
205   }
206 
207   /**
208    * Returns whether this fanout channel has any output receivers.
209    */
anySubscribers()210   bool anySubscribers() override {
211     return state_.wlock()->fanoutSender.anySubscribers();
212   }
213 
214  private:
215   /**
216    * Called when one of the channels we are listening to has an update (either
217    * a value from the input receiver or a cancellation from an output receiver).
218    */
consume(ChannelBridgeBase *)219   void consume(ChannelBridgeBase*) override {
220     executor_->add([=]() {
221       // One or more values are now available from the input receiver.
222       auto state = state_.wlock();
223       CHECK_NE(state->getReceiverState(), ChannelState::CancellationProcessed);
224       processAllAvailableValues(state);
225     });
226   }
227 
canceled(ChannelBridgeBase *)228   void canceled(ChannelBridgeBase*) override {
229     executor_->add([=]() {
230       // We previously cancelled this input receiver, due to the destruction of
231       // the handle. Process the cancellation for this input receiver.
232       auto state = state_.wlock();
233       processReceiverCancelled(state, CloseResult());
234     });
235   }
236 
237   /**
238    * Processes all available values from the input receiver (starting from the
239    * provided buffer, if present).
240    *
241    * If an value was received indicating that the input channel has been closed
242    * (or if the transform function indicated that channel should be closed), we
243    * will process cancellation for the input receiver.
244    */
245   void processAllAvailableValues(
246       WLockedStatePtr& state,
247       std::optional<ReceiverQueue<ValueType>> buffer = std::nullopt) {
248     auto closeResult = state->receiver->isReceiverCancelled()
249         ? CloseResult()
250         : (buffer.has_value() ? processValues(state, std::move(buffer.value()))
251                               : std::nullopt);
252     while (!closeResult.has_value()) {
253       if (state->receiver->receiverWait(this)) {
254         // There are no more values available right now. We will stop processing
255         // until the channel fires the consume() callback (indicating that more
256         // values are available).
257         break;
258       }
259       auto values = state->receiver->receiverGetValues();
260       CHECK(!values.empty());
261       closeResult = processValues(state, std::move(values));
262     }
263     if (closeResult.has_value()) {
264       // The receiver received a value indicating channel closure.
265       state->receiver->receiverCancel();
266       processReceiverCancelled(state, std::move(closeResult.value()));
267     }
268   }
269 
270   /**
271    * Processes the given set of values for the input receiver. Returns a
272    * CloseResult if channel was closed, so the caller can stop attempting to
273    * process values from it.
274    */
processValues(WLockedStatePtr & state,ReceiverQueue<ValueType> values)275   std::optional<CloseResult> processValues(
276       WLockedStatePtr& state, ReceiverQueue<ValueType> values) {
277     while (!values.empty()) {
278       auto inputResult = std::move(values.front());
279       values.pop();
280       if (inputResult.hasValue()) {
281         // We have received a normal value from the input receiver. Write it to
282         // all output senders.
283         state->context.update(inputResult.value());
284         state->fanoutSender.write(std::move(inputResult.value()));
285       } else {
286         // The input receiver was closed.
287         return inputResult.hasException()
288             ? CloseResult(std::move(inputResult.exception()))
289             : CloseResult();
290       }
291     }
292     return std::nullopt;
293   }
294 
295   /**
296    * Processes the cancellation of the input receiver. We will close all senders
297    * with the exception received from the input receiver (if any).
298    */
processReceiverCancelled(WLockedStatePtr & state,CloseResult closeResult)299   void processReceiverCancelled(
300       WLockedStatePtr& state, CloseResult closeResult) {
301     CHECK_EQ(state->getReceiverState(), ChannelState::CancellationTriggered);
302     state->receiver = nullptr;
303     std::move(state->fanoutSender)
304         .close(
305             closeResult.exception.has_value() ? closeResult.exception.value()
306                                               : folly::exception_wrapper());
307     maybeDelete(state);
308   }
309 
310   /**
311    * Processes the destruction of the user's FanoutChannel object.  We will
312    * cancel the receiver and trigger cancellation for all senders not already
313    * cancelled.
314    */
processHandleDestroyed(WLockedStatePtr & state,CloseResult closeResult)315   void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) {
316     state->handleDeleted = true;
317     if (state->getReceiverState() == ChannelState::Active) {
318       state->receiver->receiverCancel();
319     }
320     std::move(state->fanoutSender)
321         .close(
322             closeResult.exception.has_value() ? closeResult.exception.value()
323                                               : folly::exception_wrapper());
324     maybeDelete(state);
325   }
326 
327   /**
328    * Deletes this object if we have already processed cancellation for the
329    * receiver and all senders, and if the user's FanoutChannel object was
330    * destroyed.
331    */
maybeDelete(WLockedStatePtr & state)332   void maybeDelete(WLockedStatePtr& state) {
333     if (state->getReceiverState() == ChannelState::CancellationProcessed &&
334         state->handleDeleted) {
335       state.unlock();
336       delete this;
337     }
338   }
339 
340   folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
341   folly::Synchronized<State> state_;
342 };
343 } // namespace detail
344 
345 template <typename TReceiver, typename ValueType, typename ContextType>
createFanoutChannel(TReceiver inputReceiver,folly::Executor::KeepAlive<folly::SequencedExecutor> executor,ContextType context)346 FanoutChannel<ValueType, ContextType> createFanoutChannel(
347     TReceiver inputReceiver,
348     folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
349     ContextType context) {
350   auto* processor = new detail::FanoutChannelProcessor<ValueType, ContextType>(
351       std::move(executor), std::move(context));
352   processor->start(std::move(inputReceiver));
353   return FanoutChannel<ValueType, ContextType>(processor);
354 }
355 } // namespace channels
356 } // namespace folly
357