<?php
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 * @package thrift.transport
 */

namespace Thrift\Transport;

use Thrift\Exception\TException;

/**
 * This library makes use of the APCu cache to mark hosts as down in a web
 * environment. If you are running from the CLI or on a system without APCu
 * installed, then these null functions will step in and act like cache
 * misses.
 *
 * The stubs are declared inside the current namespace, so the unqualified
 * apcu_fetch()/apcu_store() calls in this file resolve to them whenever the
 * global APCu functions are not available.
 */
if (!function_exists('apcu_fetch')) {
    /**
     * Fallback for apcu_fetch(): always reports a cache miss.
     *
     * @param string $key Ignored
     * @return bool Always false
     */
    function apcu_fetch($key)
    {
        return false;
    }

    /**
     * Fallback for apcu_store(): stores nothing.
     *
     * @param string $key Ignored
     * @param mixed  $var Ignored
     * @param int    $ttl Ignored
     * @return bool Always false
     */
    function apcu_store($key, $var, $ttl = 0)
    {
        return false;
    }
}

/**
 * Sockets implementation of the TTransport interface that allows connection
 * to a pool of servers.
 *
 * Host health is tracked through two cache entries per "host:port" pair:
 *  - 'thrift_failtime:<host>:<port>~'    time() at which the host was marked
 *                                        down, or 0 when it is considered up
 *  - 'thrift_consecfails:<host>:<port>~' number of failed open() passes
 *                                        recorded since the last reset
 *
 * @package thrift.transport
 */
class TSocketPool extends TSocket
{
    /**
     * Remote servers. Array of associative arrays with 'host' and 'port' keys
     */
    private $servers_ = array();

    /**
     * How many times to retry each host in connect
     *
     * @var int
     */
    private $numRetries_ = 1;

    /**
     * Retry interval in seconds, how long to not try a host if it has been
     * marked as down.
     *
     * @var int
     */
    private $retryInterval_ = 60;

    /**
     * Max consecutive failures before marking a host down.
     *
     * @var int
     */
    private $maxConsecutiveFailures_ = 1;

    /**
     * Try hosts in order? or Randomized?
     *
     * @var bool
     */
    private $randomize_ = true;

    /**
     * Always try last host, even if marked down?
     *
     * @var bool
     */
    private $alwaysTryLast_ = true;

    /**
     * Socket pool constructor
     *
     * When $ports is an array it is looked up with the same keys as $hosts,
     * so it must provide an entry for every key present in $hosts. When it
     * is not an array, the single value is used as the port of every host.
     *
     * @param array $hosts        List of remote hostnames
     * @param mixed $ports        Array of remote ports, or a single common port
     * @param bool  $persist      Whether to use a persistent socket
     * @param mixed $debugHandler Function for error logging
     */
    public function __construct(
        $hosts = array('localhost'),
        $ports = array(9090),
        $persist = false,
        $debugHandler = null
    ) {
        // The real host/port are filled in by open() once a server is chosen.
        parent::__construct(null, 0, $persist, $debugHandler);

        // Expand a single common port into a per-host map.
        if (!is_array($ports)) {
            $port = $ports;
            $ports = array();
            foreach ($hosts as $key => $val) {
                $ports[$key] = $port;
            }
        }

        foreach ($hosts as $key => $host) {
            $this->servers_[] = array(
                'host' => $host,
                'port' => $ports[$key],
            );
        }
    }

    /**
     * Add a server to the pool
     *
     * This function does not prevent you from adding a duplicate server entry.
     *
     * @param string $host hostname or IP
     * @param int    $port port
     */
    public function addServer($host, $port)
    {
        $this->servers_[] = array('host' => $host, 'port' => $port);
    }

    /**
     * Sets how many times to try connecting to each host in open().
     *
     * With a value of 0 or less open() makes no connection attempt for a
     * host and records the pass as a failure.
     *
     * @param int $numRetries
     */
    public function setNumRetries($numRetries)
    {
        $this->numRetries_ = $numRetries;
    }

    /**
     * Sets how long to wait until retrying a host if it was marked down
     *
     * @param int $retryInterval Interval in seconds
     */
    public function setRetryInterval($retryInterval)
    {
        $this->retryInterval_ = $retryInterval;
    }

    /**
     * Sets how many consecutive failures are tolerated before a host is
     * marked as down.
     *
     * @param int $maxConsecutiveFailures
     */
    public function setMaxConsecutiveFailures($maxConsecutiveFailures)
    {
        $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
    }

    /**
     * Turns randomization in connect order on or off.
     *
     * @param bool $randomize
     */
    public function setRandomize($randomize)
    {
        $this->randomize_ = $randomize;
    }

    /**
     * Whether to always try the last server.
     *
     * @param bool $alwaysTryLast
     */
    public function setAlwaysTryLast($alwaysTryLast)
    {
        $this->alwaysTryLast_ = $alwaysTryLast;
    }

    /**
     * Connects the socket by iterating through all the servers in the pool
     * and trying to find one that works.
     *
     * A server is attempted when it has no recorded failure time, when its
     * retry interval has elapsed, or when it is the last server in the
     * (possibly shuffled) list and "always try last" is enabled. Returns as
     * soon as one connection succeeds.
     *
     * NOTE(review): $this->debug_, $this->debugHandler_, $this->host_ and
     * $this->port_ are not declared in this class; presumably they are
     * inherited from TSocket - confirm against the parent class.
     *
     * @throws TException when no server in the pool could be connected to
     */
    public function open()
    {
        // Check if we want order randomization
        if ($this->randomize_) {
            shuffle($this->servers_);
        }

        // Count servers to identify the "last" one
        $numServers = count($this->servers_);

        for ($i = 0; $i < $numServers; ++$i) {
            // Read the entry explicitly instead of extract()ing it, so no
            // unexpected key can ever overwrite a local variable.
            $host = $this->servers_[$i]['host'];
            $port = $this->servers_[$i]['port'];

            // Cache keys tracking the health of this server
            $failtimeKey = 'thrift_failtime:' . $host . ':' . $port . '~';
            $consecfailsKey = 'thrift_consecfails:' . $host . ':' . $port . '~';

            // Cache miss? Assume it's OK
            $lastFailtime = apcu_fetch($failtimeKey);
            if ($lastFailtime === false) {
                $lastFailtime = 0;
            }

            $retryIntervalPassed = false;

            // Cache hit...make sure the retry interval has elapsed
            if ($lastFailtime > 0) {
                $elapsed = time() - $lastFailtime;
                if ($elapsed > $this->retryInterval_) {
                    $retryIntervalPassed = true;
                    if ($this->debug_) {
                        call_user_func(
                            $this->debugHandler_,
                            'TSocketPool: retryInterval ' .
                            '(' . $this->retryInterval_ . ') ' .
                            'has passed for host ' . $host . ':' . $port
                        );
                    }
                }
            }

            // Only connect if not in the middle of a fail interval, OR if this
            // is the LAST server we are trying, just hammer away on it
            $isLastServer = $this->alwaysTryLast_ && ($i == ($numServers - 1));

            // $retryIntervalPassed can only be true when $lastFailtime > 0.
            $shouldTry = ($lastFailtime === 0) || $isLastServer || $retryIntervalPassed;
            if (!$shouldTry) {
                continue;
            }

            // Set underlying TSocket params to this one
            $this->host_ = $host;
            $this->port_ = $port;

            // Try up to numRetries_ connections per server
            for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
                try {
                    // Use the underlying TSocket open function
                    parent::open();

                    // Only clear the failure time if required to do so
                    if ($lastFailtime > 0) {
                        apcu_store($failtimeKey, 0);
                    }

                    // A success ends any failure streak. Without this reset,
                    // failures separated by successful connections would still
                    // add up towards maxConsecutiveFailures_. Write only when
                    // there is something to clear.
                    if (apcu_fetch($consecfailsKey) > 0) {
                        apcu_store($consecfailsKey, 0);
                    }

                    // Successful connection, return now
                    return;
                } catch (TException $tx) {
                    // Connection failed; fall through to the next attempt
                }
            }

            // Every attempt failed: record the failure of this host in the
            // cache. Ignore cache misses.
            $consecfails = apcu_fetch($consecfailsKey);
            if ($consecfails === false) {
                $consecfails = 0;
            }

            // Increment by one
            $consecfails++;

            // Log and cache this failure
            if ($consecfails >= $this->maxConsecutiveFailures_) {
                if ($this->debug_) {
                    call_user_func(
                        $this->debugHandler_,
                        'TSocketPool: marking ' . $host . ':' . $port .
                        ' as down for ' . $this->retryInterval_ . ' secs ' .
                        'after ' . $consecfails . ' failed attempts.'
                    );
                }
                // Store the failure time
                apcu_store($failtimeKey, time());

                // Clear the count of consecutive failures
                apcu_store($consecfailsKey, 0);
            } else {
                apcu_store($consecfailsKey, $consecfails);
            }
        }

        // Oh no; we failed them all. The system is totally ill!
        $error = 'TSocketPool: All hosts in pool are down. ';
        $hosts = array();
        foreach ($this->servers_ as $server) {
            $hosts[] = $server['host'] . ':' . $server['port'];
        }
        $hostlist = implode(',', $hosts);
        $error .= '(' . $hostlist . ')';
        if ($this->debug_) {
            call_user_func($this->debugHandler_, $error);
        }
        throw new TException($error);
    }
}