1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <folly/container/F14Set.h>
20 #include <folly/executors/SequencedExecutor.h>
21 #include <folly/experimental/channels/Channel.h>
22 #include <folly/experimental/channels/FanoutSender.h>
23 #include <folly/experimental/channels/detail/Utility.h>
24
25 namespace folly {
26 namespace channels {
27
28 template <typename ValueType, typename ContextType>
FanoutChannel(TProcessor * processor)29 FanoutChannel<ValueType, ContextType>::FanoutChannel(TProcessor* processor)
30 : processor_(processor) {}
31
32 template <typename ValueType, typename ContextType>
FanoutChannel(FanoutChannel && other)33 FanoutChannel<ValueType, ContextType>::FanoutChannel(
34 FanoutChannel&& other) noexcept
35 : processor_(std::exchange(other.processor_, nullptr)) {}
36
37 template <typename ValueType, typename ContextType>
38 FanoutChannel<ValueType, ContextType>&
39 FanoutChannel<ValueType, ContextType>::operator=(
40 FanoutChannel&& other) noexcept {
41 if (&other == this) {
42 return *this;
43 }
44 if (processor_) {
45 std::move(*this).close();
46 }
47 processor_ = std::exchange(other.processor_, nullptr);
48 return *this;
49 }
50
51 template <typename ValueType, typename ContextType>
~FanoutChannel()52 FanoutChannel<ValueType, ContextType>::~FanoutChannel() {
53 if (processor_ != nullptr) {
54 std::move(*this).close(folly::exception_wrapper());
55 }
56 }
57
58 template <typename ValueType, typename ContextType>
59 FanoutChannel<ValueType, ContextType>::operator bool() const {
60 return processor_ != nullptr;
61 }
62
63 template <typename ValueType, typename ContextType>
subscribe(folly::Function<std::vector<ValueType> (const ContextType &)> getInitialValues)64 Receiver<ValueType> FanoutChannel<ValueType, ContextType>::subscribe(
65 folly::Function<std::vector<ValueType>(const ContextType&)>
66 getInitialValues) {
67 return processor_->subscribe(std::move(getInitialValues));
68 }
69
70 template <typename ValueType, typename ContextType>
anySubscribers()71 bool FanoutChannel<ValueType, ContextType>::anySubscribers() const {
72 return processor_->anySubscribers();
73 }
74
75 template <typename ValueType, typename ContextType>
closeSubscribers(folly::exception_wrapper ex)76 void FanoutChannel<ValueType, ContextType>::closeSubscribers(
77 folly::exception_wrapper ex) {
78 processor_->closeSubscribers(
79 ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult());
80 }
81
82 template <typename ValueType, typename ContextType>
close(folly::exception_wrapper ex)83 void FanoutChannel<ValueType, ContextType>::close(
84 folly::exception_wrapper ex) && {
85 processor_->destroyHandle(
86 ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult());
87 processor_ = nullptr;
88 }
89
90 namespace detail {
91
92 template <typename ValueType, typename ContextType>
93 class IFanoutChannelProcessor : public IChannelCallback {
94 public:
95 virtual Receiver<ValueType> subscribe(
96 folly::Function<std::vector<ValueType>(const ContextType&)>
97 getInitialValues) = 0;
98
99 virtual bool anySubscribers() = 0;
100
101 virtual void closeSubscribers(CloseResult closeResult) = 0;
102
103 virtual void destroyHandle(CloseResult closeResult) = 0;
104 };
105
106 /**
107 * This object fans out values from the input receiver to all output receivers.
108 * The lifetime of this object is described by the following state machine.
109 *
110 * The input receiver can be in one of three conceptual states: Active,
111 * CancellationTriggered, or CancellationProcessed (removed). When the input
112 * receiver reaches the CancellationProcessed state AND the user's FanoutChannel
113 * object is deleted, this object is deleted.
114 *
115 * When an input receiver receives a value indicating that the channel has
116 * been closed, the state of the input receiver transitions from Active directly
117 * to CancellationProcessed (and this object will be deleted once the user
118 * destroys their FanoutChannel object).
119 *
120 * When the user destroys their FanoutChannel object, the state of the input
121 * receiver transitions from Active to CancellationTriggered. This object will
122 * then be deleted once the input receiver transitions to the
123 * CancellationProcessed state.
124 */
125 template <typename ValueType, typename ContextType>
126 class FanoutChannelProcessor
127 : public IFanoutChannelProcessor<ValueType, ContextType> {
128 private:
129 struct State {
StateState130 State(ContextType _context) : context(std::move(_context)) {}
131
getReceiverStateState132 ChannelState getReceiverState() {
133 return detail::getReceiverState(receiver.get());
134 }
135
136 ChannelBridgePtr<ValueType> receiver;
137 FanoutSender<ValueType> fanoutSender;
138 ContextType context;
139 bool handleDeleted{false};
140 };
141
142 using WLockedStatePtr = typename folly::Synchronized<State>::WLockedPtr;
143
144 public:
FanoutChannelProcessor(folly::Executor::KeepAlive<folly::SequencedExecutor> executor,ContextType context)145 explicit FanoutChannelProcessor(
146 folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
147 ContextType context)
148 : executor_(std::move(executor)), state_(std::move(context)) {}
149
150 /**
151 * Starts fanning out values from the input receiver to all output receivers.
152 *
153 * @param inputReceiver: The input receiver to fan out values from.
154 */
start(Receiver<ValueType> inputReceiver)155 void start(Receiver<ValueType> inputReceiver) {
156 auto state = state_.wlock();
157 auto [unbufferedInputReceiver, buffer] =
158 detail::receiverUnbuffer(std::move(inputReceiver));
159 state->receiver = std::move(unbufferedInputReceiver);
160
161 // Start processing new values that come in from the input receiver.
162 processAllAvailableValues(state, std::move(buffer));
163 }
164
165 /**
166 * Returns a new output receiver that will receive all values from the input
167 * receiver. If a getInitialValues parameter is provided, it will be executed
168 * to determine the set of initial values that will (only) go to the new input
169 * receiver.
170 */
subscribe(folly::Function<std::vector<ValueType> (const ContextType &)> getInitialValues)171 Receiver<ValueType> subscribe(
172 folly::Function<std::vector<ValueType>(const ContextType&)>
173 getInitialValues) override {
174 auto state = state_.wlock();
175 auto initialValues = getInitialValues ? getInitialValues(state->context)
176 : std::vector<ValueType>();
177 if (!state->receiver) {
178 auto [receiver, sender] = Channel<ValueType>::create();
179 for (auto&& value : initialValues) {
180 sender.write(std::move(value));
181 }
182 std::move(sender).close();
183 return std::move(receiver);
184 }
185 return state->fanoutSender.subscribe(std::move(initialValues));
186 }
187
188 /**
189 * Closes all subscribers without closing the fanout channel.
190 */
closeSubscribers(CloseResult closeResult)191 void closeSubscribers(CloseResult closeResult) {
192 auto state = state_.wlock();
193 std::move(state->fanoutSender)
194 .close(
195 closeResult.exception.has_value() ? closeResult.exception.value()
196 : folly::exception_wrapper());
197 }
198
199 /**
200 * This is called when the user's FanoutChannel object has been destroyed.
201 */
destroyHandle(CloseResult closeResult)202 void destroyHandle(CloseResult closeResult) {
203 auto state = state_.wlock();
204 processHandleDestroyed(state, std::move(closeResult));
205 }
206
207 /**
208 * Returns whether this fanout channel has any output receivers.
209 */
anySubscribers()210 bool anySubscribers() override {
211 return state_.wlock()->fanoutSender.anySubscribers();
212 }
213
214 private:
215 /**
216 * Called when one of the channels we are listening to has an update (either
217 * a value from the input receiver or a cancellation from an output receiver).
218 */
consume(ChannelBridgeBase *)219 void consume(ChannelBridgeBase*) override {
220 executor_->add([=]() {
221 // One or more values are now available from the input receiver.
222 auto state = state_.wlock();
223 CHECK_NE(state->getReceiverState(), ChannelState::CancellationProcessed);
224 processAllAvailableValues(state);
225 });
226 }
227
canceled(ChannelBridgeBase *)228 void canceled(ChannelBridgeBase*) override {
229 executor_->add([=]() {
230 // We previously cancelled this input receiver, due to the destruction of
231 // the handle. Process the cancellation for this input receiver.
232 auto state = state_.wlock();
233 processReceiverCancelled(state, CloseResult());
234 });
235 }
236
237 /**
238 * Processes all available values from the input receiver (starting from the
239 * provided buffer, if present).
240 *
241 * If an value was received indicating that the input channel has been closed
242 * (or if the transform function indicated that channel should be closed), we
243 * will process cancellation for the input receiver.
244 */
245 void processAllAvailableValues(
246 WLockedStatePtr& state,
247 std::optional<ReceiverQueue<ValueType>> buffer = std::nullopt) {
248 auto closeResult = state->receiver->isReceiverCancelled()
249 ? CloseResult()
250 : (buffer.has_value() ? processValues(state, std::move(buffer.value()))
251 : std::nullopt);
252 while (!closeResult.has_value()) {
253 if (state->receiver->receiverWait(this)) {
254 // There are no more values available right now. We will stop processing
255 // until the channel fires the consume() callback (indicating that more
256 // values are available).
257 break;
258 }
259 auto values = state->receiver->receiverGetValues();
260 CHECK(!values.empty());
261 closeResult = processValues(state, std::move(values));
262 }
263 if (closeResult.has_value()) {
264 // The receiver received a value indicating channel closure.
265 state->receiver->receiverCancel();
266 processReceiverCancelled(state, std::move(closeResult.value()));
267 }
268 }
269
270 /**
271 * Processes the given set of values for the input receiver. Returns a
272 * CloseResult if channel was closed, so the caller can stop attempting to
273 * process values from it.
274 */
processValues(WLockedStatePtr & state,ReceiverQueue<ValueType> values)275 std::optional<CloseResult> processValues(
276 WLockedStatePtr& state, ReceiverQueue<ValueType> values) {
277 while (!values.empty()) {
278 auto inputResult = std::move(values.front());
279 values.pop();
280 if (inputResult.hasValue()) {
281 // We have received a normal value from the input receiver. Write it to
282 // all output senders.
283 state->context.update(inputResult.value());
284 state->fanoutSender.write(std::move(inputResult.value()));
285 } else {
286 // The input receiver was closed.
287 return inputResult.hasException()
288 ? CloseResult(std::move(inputResult.exception()))
289 : CloseResult();
290 }
291 }
292 return std::nullopt;
293 }
294
295 /**
296 * Processes the cancellation of the input receiver. We will close all senders
297 * with the exception received from the input receiver (if any).
298 */
processReceiverCancelled(WLockedStatePtr & state,CloseResult closeResult)299 void processReceiverCancelled(
300 WLockedStatePtr& state, CloseResult closeResult) {
301 CHECK_EQ(state->getReceiverState(), ChannelState::CancellationTriggered);
302 state->receiver = nullptr;
303 std::move(state->fanoutSender)
304 .close(
305 closeResult.exception.has_value() ? closeResult.exception.value()
306 : folly::exception_wrapper());
307 maybeDelete(state);
308 }
309
310 /**
311 * Processes the destruction of the user's FanoutChannel object. We will
312 * cancel the receiver and trigger cancellation for all senders not already
313 * cancelled.
314 */
processHandleDestroyed(WLockedStatePtr & state,CloseResult closeResult)315 void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) {
316 state->handleDeleted = true;
317 if (state->getReceiverState() == ChannelState::Active) {
318 state->receiver->receiverCancel();
319 }
320 std::move(state->fanoutSender)
321 .close(
322 closeResult.exception.has_value() ? closeResult.exception.value()
323 : folly::exception_wrapper());
324 maybeDelete(state);
325 }
326
327 /**
328 * Deletes this object if we have already processed cancellation for the
329 * receiver and all senders, and if the user's FanoutChannel object was
330 * destroyed.
331 */
maybeDelete(WLockedStatePtr & state)332 void maybeDelete(WLockedStatePtr& state) {
333 if (state->getReceiverState() == ChannelState::CancellationProcessed &&
334 state->handleDeleted) {
335 state.unlock();
336 delete this;
337 }
338 }
339
340 folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
341 folly::Synchronized<State> state_;
342 };
343 } // namespace detail
344
345 template <typename TReceiver, typename ValueType, typename ContextType>
createFanoutChannel(TReceiver inputReceiver,folly::Executor::KeepAlive<folly::SequencedExecutor> executor,ContextType context)346 FanoutChannel<ValueType, ContextType> createFanoutChannel(
347 TReceiver inputReceiver,
348 folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
349 ContextType context) {
350 auto* processor = new detail::FanoutChannelProcessor<ValueType, ContextType>(
351 std::move(executor), std::move(context));
352 processor->start(std::move(inputReceiver));
353 return FanoutChannel<ValueType, ContextType>(processor);
354 }
355 } // namespace channels
356 } // namespace folly
357