1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 namespace wangle {
20 
21 template <typename T, typename R, typename P>
22 folly::Future<BroadcastHandler<T, R>*>
getHandler()23 BroadcastPool<T, R, P>::BroadcastManager::getHandler() {
24   // getFuture() returns a completed future if we are already connected
25   // Set the executor to the InlineExecutor because subsequent code depends
26   // on the future callback being called inline to ensure that the handler
27   // is not garbage collected before use.
28   auto future =
29       sharedPromise_.getFuture().via(&folly::InlineExecutor::instance());
30 
31   if (connectStarted_) {
32     // Either already connected, in which case the future has the handler,
33     // or there's an outstanding connect request and the promise will be
34     // fulfilled when the connect request completes.
35     return future;
36   }
37 
38   // Kickoff connect request and fulfill all pending promises on completion
39   connectStarted_ = true;
40 
41   broadcastPool_->serverPool_->connect(client_.get(), routingData_)
42       .thenValue([this](DefaultPipeline* pipeline) {
43         DestructorGuard dg(this);
44         pipeline->setPipelineManager(this);
45 
46         auto pipelineFactory = broadcastPool_->broadcastPipelineFactory_;
47         try {
48           pipelineFactory->setRoutingData(pipeline, routingData_);
49         } catch (const std::exception& ex) {
50           handleConnectError(ex);
51           return;
52         }
53 
54         if (deletingBroadcast_) {
55           // setRoutingData() could result in an error that would cause the
56           // BroadcastPipeline to get deleted.
57           handleConnectError(std::runtime_error(
58               "Broadcast deleted due to upstream connection error"));
59           return;
60         }
61 
62         auto handler = pipelineFactory->getBroadcastHandler(pipeline);
63         CHECK(handler);
64         sharedPromise_.setValue(handler);
65 
66         // If all the observers go away before connect returns, then the
67         // BroadcastHandler will be idle without any subscribers. Close
68         // the pipeline and remove the broadcast from the pool so that
69         // connections are not leaked.
70         handler->closeIfIdle();
71       })
72       .thenError(
73           folly::tag_t<std::exception>{},
74           [this](const std::exception& ex) { handleConnectError(ex); });
75 
76   return future;
77 }
78 
79 template <typename T, typename R, typename P>
deletePipeline(PipelineBase * pipeline)80 void BroadcastPool<T, R, P>::BroadcastManager::deletePipeline(
81     PipelineBase* pipeline) {
82   CHECK(client_->getPipeline() == pipeline);
83   deletingBroadcast_ = true;
84   broadcastPool_->deleteBroadcast(routingData_);
85 }
86 
87 template <typename T, typename R, typename P>
handleConnectError(const std::exception & ex)88 void BroadcastPool<T, R, P>::BroadcastManager::handleConnectError(
89     const std::exception& ex) noexcept {
90   LOG(ERROR) << "Error connecting to upstream: " << ex.what();
91 
92   auto sharedPromise = std::move(sharedPromise_);
93   broadcastPool_->deleteBroadcast(routingData_);
94   sharedPromise.setException(folly::make_exception_wrapper<std::exception>(ex));
95 }
96 
97 template <typename T, typename R, typename P>
getHandler(const R & routingData)98 folly::Future<BroadcastHandler<T, R>*> BroadcastPool<T, R, P>::getHandler(
99     const R& routingData) {
100   const auto& iter = broadcasts_.find(routingData);
101   if (iter != broadcasts_.end()) {
102     return iter->second->getHandler();
103   }
104 
105   typename BroadcastManager::UniquePtr broadcast(
106       new BroadcastManager(this, routingData));
107 
108   auto broadcastPtr = broadcast.get();
109   broadcasts_.insert(std::make_pair(routingData, std::move(broadcast)));
110 
111   // The executor on this future is set to be an InlineExecutor to ensure that
112   // the continuation can be run inline and satisfy the lifetime requirement
113   // on the return value of this function.
114   return broadcastPtr->getHandler();
115 }
116 
117 } // namespace wangle
118