1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/ThreadLocal.h> 20 #include <folly/futures/SharedPromise.h> 21 #include <folly/io/async/DelayedDestruction.h> 22 #include <wangle/bootstrap/BaseClientBootstrap.h> 23 #include <wangle/bootstrap/ClientBootstrap.h> 24 #include <wangle/channel/Pipeline.h> 25 #include <wangle/channel/broadcast/BroadcastHandler.h> 26 27 namespace wangle { 28 29 template <typename R, typename P = DefaultPipeline> 30 class ServerPool { 31 public: ~ServerPool()32 virtual ~ServerPool() {} 33 34 /** 35 * Kick off an upstream connect request given the BaseClientBootstrap 36 * when a broadcast is not available locally. 37 */ 38 virtual folly::Future<P*> connect( 39 BaseClientBootstrap<P>* client, 40 const R& routingData) noexcept = 0; 41 }; 42 43 /** 44 * A pool of upstream broadcast pipelines. There is atmost one broadcast 45 * for any unique routing data. Creates and maintains upstream connections 46 * and broadcast pipeliens as necessary. 47 * 48 * Meant to be used as a thread-local instance. 49 */ 50 template <typename T, typename R, typename P = DefaultPipeline> 51 class BroadcastPool { 52 public: 53 class BroadcastManager : public PipelineManager, 54 public folly::DelayedDestruction { 55 public: 56 using UniquePtr = folly::DelayedDestructionUniquePtr<BroadcastManager>; 57 BroadcastManager(BroadcastPool<T,R,P> * broadcastPool,const R & routingData)58 BroadcastManager( 59 BroadcastPool<T, R, P>* broadcastPool, 60 const R& routingData) 61 : broadcastPool_(broadcastPool), 62 routingData_(routingData), 63 client_(broadcastPool_->clientBootstrapFactory_->newClient()) { 64 client_->pipelineFactory(broadcastPool_->broadcastPipelineFactory_); 65 } 66 ~BroadcastManager()67 ~BroadcastManager() override { 68 if (client_->getPipeline()) { 69 client_->getPipeline()->setPipelineManager(nullptr); 70 } 71 } 72 73 folly::Future<BroadcastHandler<T, R>*> getHandler(); 74 75 // PipelineManager implementation 76 void deletePipeline(PipelineBase* pipeline) override; 77 78 private: 79 void handleConnectError(const std::exception& ex) noexcept; 80 81 BroadcastPool<T, R, P>* broadcastPool_{nullptr}; 82 R routingData_; 83 84 std::unique_ptr<BaseClientBootstrap<P>> client_; 85 86 bool connectStarted_{false}; 87 bool deletingBroadcast_{false}; 88 folly::SharedPromise<BroadcastHandler<T, R>*> sharedPromise_; 89 }; 90 91 BroadcastPool( 92 std::shared_ptr<ServerPool<R, P>> serverPool, 93 std::shared_ptr<BroadcastPipelineFactory<T, R>> pipelineFactory, 94 std::shared_ptr<BaseClientBootstrapFactory<>> clientFactory = 95 std::make_shared<ClientBootstrapFactory>()) serverPool_(serverPool)96 : serverPool_(serverPool), 97 broadcastPipelineFactory_(pipelineFactory), 98 clientBootstrapFactory_(clientFactory) {} 99 ~BroadcastPool()100 virtual ~BroadcastPool() {} 101 102 // Non-copyable 103 BroadcastPool(const BroadcastPool&) = delete; 104 BroadcastPool& operator=(const BroadcastPool&) = delete; 105 106 // Movable 107 BroadcastPool(BroadcastPool&&) = default; 108 BroadcastPool& operator=(BroadcastPool&&) = default; 109 110 /** 111 * Gets the BroadcastHandler, or creates one if it doesn't exist already, 112 * for the given routingData. 113 * 114 * If a broadcast is already available for the given routingData, 115 * returns the BroadcastHandler from the pipeline. If not, an upstream 116 * connection is created and stored along with a new broadcast pipeline 117 * for this routingData, and its BroadcastHandler is returned. 118 * 119 * Caller should immediately subscribe to the returned BroadcastHandler 120 * to prevent it from being garbage collected. 121 * Note that to ensure that this works correctly, the returned future 122 * completes on an InlineExecutor such that .then will be called inline with 123 * satisfaction of the underlying promise. 124 */ 125 virtual folly::Future<BroadcastHandler<T, R>*> getHandler( 126 const R& routingData); 127 128 /** 129 * Checks if a broadcast is available locally for the given routingData. 130 */ isBroadcasting(const R & routingData)131 bool isBroadcasting(const R& routingData) { 132 return (broadcasts_.find(routingData) != broadcasts_.end()); 133 } 134 deleteBroadcast(const R & routingData)135 virtual void deleteBroadcast(const R& routingData) { 136 broadcasts_.erase(routingData); 137 } 138 139 private: 140 std::shared_ptr<ServerPool<R, P>> serverPool_; 141 std::shared_ptr<BroadcastPipelineFactory<T, R>> broadcastPipelineFactory_; 142 std::shared_ptr<BaseClientBootstrapFactory<>> clientBootstrapFactory_; 143 std::map<R, typename BroadcastManager::UniquePtr> broadcasts_; 144 }; 145 146 } // namespace wangle 147 148 #include <wangle/channel/broadcast/BroadcastPool-inl.h> 149