<?php

/**
 * Copyright (c) 2006- Facebook
 * Distributed under the Thrift Software License
 *
 * See accompanying file LICENSE or visit the Thrift site at:
 * http://developers.facebook.com/thrift/
 *
 * @package thrift.transport
 * @author Mark Slee <mcslee@facebook.com>
 */

/** Inherits from Socket */
include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';

/**
 * This library makes use of APC cache to mark hosts as down in a web
 * environment. If you are running from the CLI or on a system without APC
 * installed, then these null functions will step in and act like cache
 * misses.
 *
 * Each shim is guarded on its own so that an environment providing only one
 * of the two functions neither triggers a redeclaration error nor is left
 * with the other one undefined.
 */
if (!function_exists('apc_fetch')) {
  function apc_fetch($key) { return FALSE; }
}
if (!function_exists('apc_store')) {
  function apc_store($key, $var, $ttl=0) { return FALSE; }
}

/**
 * Sockets implementation of the TTransport interface that allows connection
 * to a pool of servers.
 *
 * Host health is tracked through two cache entries per host:port pair:
 *   - 'thrift_failtime:<host>:<port>~'    time() at which the host was last
 *                                         marked down (0 once it recovers)
 *   - 'thrift_consecfails:<host>:<port>~' number of failed open() calls since
 *                                         the last success or mark-down
 *
 * @package thrift.transport
 * @author Mark Slee <mcslee@facebook.com>
 */
class TSocketPool extends TSocket {

  /**
   * Remote servers. Array of associative arrays with 'host' and 'port' keys
   */
  private $servers_ = array();

  /**
   * How many times to retry each host in connect
   *
   * @var int
   */
  private $numRetries_ = 1;

  /**
   * Retry interval in seconds, how long to not try a host if it has been
   * marked as down.
   *
   * @var int
   */
  private $retryInterval_ = 60;

  /**
   * Max consecutive failures before marking a host down.
   *
   * @var int
   */
  private $maxConsecutiveFailures_ = 1;

  /**
   * Try hosts in order? or Randomized?
   *
   * @var bool
   */
  private $randomize_ = TRUE;

  /**
   * Always try last host, even if marked down?
   *
   * @var bool
   */
  private $alwaysTryLast_ = TRUE;

  /**
   * Socket pool constructor
   *
   * When $ports is an array it is looked up with the same keys as $hosts, so
   * both arrays are expected to be keyed identically.
   *
   * @param array $hosts        List of remote hostnames
   * @param mixed $ports        Array of remote ports, or a single common port
   * @param bool  $persist      Whether to use a persistent socket
   * @param mixed $debugHandler Function for error logging
   */
  public function __construct($hosts=array('localhost'),
                              $ports=array(9090),
                              $persist=FALSE,
                              $debugHandler=null) {
    // The real host/port are filled in by open() once a server is chosen
    parent::__construct(null, 0, $persist, $debugHandler);

    // A single scalar port applies to every host
    if (!is_array($ports)) {
      $port = $ports;
      $ports = array();
      foreach ($hosts as $key => $val) {
        $ports[$key] = $port;
      }
    }

    foreach ($hosts as $key => $host) {
      $this->servers_ []= array('host' => $host,
                                'port' => $ports[$key]);
    }
  }

  /**
   * Add a server to the pool
   *
   * This function does not prevent you from adding a duplicate server entry.
   *
   * @param string $host hostname or IP
   * @param int    $port port
   */
  public function addServer($host, $port) {
    $this->servers_[] = array('host' => $host, 'port' => $port);
  }

  /**
   * Sets how many times to keep retrying a host in the connect function.
   *
   * @param int $numRetries
   */
  public function setNumRetries($numRetries) {
    $this->numRetries_ = $numRetries;
  }

  /**
   * Sets how long to wait until retrying a host if it was marked down
   *
   * @param int $retryInterval Interval in seconds
   */
  public function setRetryInterval($retryInterval) {
    $this->retryInterval_ = $retryInterval;
  }

  /**
   * Sets how many consecutive failures are tolerated before a host is
   * marked as down.
   *
   * @param int $maxConsecutiveFailures
   */
  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  }

  /**
   * Turns randomization in connect order on or off.
   *
   * @param bool $randomize
   */
  public function setRandomize($randomize) {
    $this->randomize_ = $randomize;
  }

  /**
   * Whether to always try the last server.
   *
   * @param bool $alwaysTryLast
   */
  public function setAlwaysTryLast($alwaysTryLast) {
    $this->alwaysTryLast_ = $alwaysTryLast;
  }


  /**
   * Connects the socket by iterating through all the servers in the pool
   * and trying to find one that works.
   *
   * Hosts currently marked down in the cache are skipped until their retry
   * interval has elapsed, except for the last host in the list when
   * alwaysTryLast_ is set. Returns as soon as one connection succeeds.
   *
   * @throws TException if no server in the pool could be connected to
   */
  public function open() {
    // Check if we want order randomization
    if ($this->randomize_) {
      shuffle($this->servers_);
    }

    // Count servers to identify the "last" one
    $numServers = count($this->servers_);

    for ($i = 0; $i < $numServers; ++$i) {

      // Read the fields explicitly rather than extract()ing them into scope
      $host = $this->servers_[$i]['host'];
      $port = $this->servers_[$i]['port'];

      // Cache keys tracking the health of this server
      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
      $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';

      // Cache miss? Assume it's OK
      $lastFailtime = apc_fetch($failtimeKey);
      if ($lastFailtime === FALSE) {
        $lastFailtime = 0;
      }

      $retryIntervalPassed = FALSE;

      // Cache hit...make sure the retry interval has elapsed
      if ($lastFailtime > 0) {
        $elapsed = time() - $lastFailtime;
        if ($elapsed > $this->retryInterval_) {
          $retryIntervalPassed = TRUE;
          if ($this->debug_) {
            call_user_func($this->debugHandler_,
                           'TSocketPool: retryInterval '.
                           '('.$this->retryInterval_.') '.
                           'has passed for host '.$host.':'.$port);
          }
        }
      }

      // Only connect if not in the middle of a fail interval, OR if this
      // is the LAST server we are trying, just hammer away on it
      $isLastServer = FALSE;
      if ($this->alwaysTryLast_) {
        $isLastServer = ($i == ($numServers - 1));
      }

      $shouldTry = ($lastFailtime === 0) ||
                   ($isLastServer) ||
                   ($lastFailtime > 0 && $retryIntervalPassed);
      if (!$shouldTry) {
        continue;
      }

      // Set underlying TSocket params to this one
      $this->host_ = $host;
      $this->port_ = $port;

      // Try up to numRetries_ connections per server
      for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
        try {
          // Use the underlying TSocket open function
          parent::open();

          // Only clear the failure time if the host had been marked down
          if ($lastFailtime > 0) {
            apc_store($failtimeKey, 0);
          }

          // A success ends any running streak of failures; without this
          // reset, failures separated by successful connects would still
          // add up towards maxConsecutiveFailures_. Only write when there
          // is something to clear.
          if (apc_fetch($consecfailsKey) > 0) {
            apc_store($consecfailsKey, 0);
          }

          // Successful connection, return now
          return;

        } catch (TException $tx) {
          // Connection failed; fall through to the next attempt
        }
      }

      // Every attempt failed: record one failure for this host.
      // Ignore cache misses
      $consecfails = apc_fetch($consecfailsKey);
      if ($consecfails === FALSE) {
        $consecfails = 0;
      }

      // Increment by one
      $consecfails++;

      // Log and cache this failure
      if ($consecfails >= $this->maxConsecutiveFailures_) {
        if ($this->debug_) {
          call_user_func($this->debugHandler_,
                         'TSocketPool: marking '.$host.':'.$port.
                         ' as down for '.$this->retryInterval_.' secs '.
                         'after '.$consecfails.' failed attempts.');
        }
        // Store the failure time
        apc_store($failtimeKey, time());

        // Clear the count of consecutive failures
        apc_store($consecfailsKey, 0);
      } else {
        apc_store($consecfailsKey, $consecfails);
      }
    }

    // No server in the pool accepted a connection; report all of them
    $error = 'TSocketPool: All hosts in pool are down. ';
    $hosts = array();
    foreach ($this->servers_ as $server) {
      $hosts []= $server['host'].':'.$server['port'];
    }
    $hostlist = implode(',', $hosts);
    $error .= '('.$hostlist.')';
    if ($this->debug_) {
      call_user_func($this->debugHandler_, $error);
    }
    throw new TException($error);
  }
}