1<?php
2
3/**
4 * Copyright (c) 2006- Facebook
5 * Distributed under the Thrift Software License
6 *
7 * See accompanying file LICENSE or visit the Thrift site at:
8 * http://developers.facebook.com/thrift/
9 *
10 * @package thrift.transport
11 * @author Mark Slee <mcslee@facebook.com>
12 */
13
14/** Inherits from Socket */
15include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
16
17/**
 * This library makes use of the APC cache to mark hosts as down in a web
19 * environment. If you are running from the CLI or on a system without APC
20 * installed, then these null functions will step in and act like cache
21 * misses.
22 */
// Each stand-in is guarded on its own: checking only apc_fetch would either
// redeclare an existing apc_store (fatal error) or leave apc_store undefined
// when just one of the two functions is available.
if (!function_exists('apc_fetch')) {
  /**
   * Stand-in for apc_fetch() that always reports a cache miss.
   *
   * @param  string $key Cache key (ignored)
   * @return bool   Always FALSE
   */
  function apc_fetch($key) { return FALSE; }
}
if (!function_exists('apc_store')) {
  /**
   * Stand-in for apc_store() that stores nothing.
   *
   * @param  string $key Cache key (ignored)
   * @param  mixed  $var Value to store (ignored)
   * @param  int    $ttl Time to live in seconds (ignored)
   * @return bool   Always FALSE, i.e. nothing was stored
   */
  function apc_store($key, $var, $ttl=0) { return FALSE; }
}
27
28/**
29 * Sockets implementation of the TTransport interface that allows connection
30 * to a pool of servers.
31 *
32 * @package thrift.transport
33 * @author Mark Slee <mcslee@facebook.com>
34 */
class TSocketPool extends TSocket {

  /**
   * Remote servers. Array of associative arrays with 'host' and 'port' keys
   *
   * @var array
   */
  private $servers_ = array();

  /**
   * How many connection attempts to make per host in open(). Values below 1
   * are treated as 1.
   *
   * @var int
   */
  private $numRetries_ = 1;

  /**
   * Retry interval in seconds, how long to not try a host if it has been
   * marked as down.
   *
   * @var int
   */
  private $retryInterval_ = 60;

  /**
   * Max consecutive failures before marking a host down.
   *
   * @var int
   */
  private $maxConsecutiveFailures_ = 1;

  /**
   * Try hosts in order? or Randomized?
   *
   * @var bool
   */
  private $randomize_ = TRUE;

  /**
   * Always try last host, even if marked down?
   *
   * @var bool
   */
  private $alwaysTryLast_ = TRUE;

  /**
   * Socket pool constructor
   *
   * Hosts and ports are paired by array key. When $ports is not an array it
   * is used as the port for every host.
   *
   * @param array  $hosts        List of remote hostnames
   * @param mixed  $ports        Array of remote ports, or a single common port
   * @param bool   $persist      Whether to use a persistent socket
   * @param mixed  $debugHandler Function for error logging
   */
  public function __construct($hosts=array('localhost'),
                              $ports=array(9090),
                              $persist=FALSE,
                              $debugHandler=null) {
    // Host and port of the underlying socket are filled in by open()
    parent::__construct(null, 0, $persist, $debugHandler);

    // A non-array $ports is one common port shared by all hosts
    $commonPort = !is_array($ports);

    foreach ($hosts as $key => $host) {
      $this->servers_[] = array(
        'host' => $host,
        'port' => $commonPort ? $ports : $ports[$key],
      );
    }
  }

  /**
   * Add a server to the pool
   *
   * This function does not prevent you from adding a duplicate server entry.
   *
   * @param string $host hostname or IP
   * @param int $port port
   */
  public function addServer($host, $port) {
    $this->servers_[] = array('host' => $host, 'port' => $port);
  }

  /**
   * Sets how many connection attempts are made per host in open().
   *
   * @param int $numRetries
   */
  public function setNumRetries($numRetries) {
    $this->numRetries_ = $numRetries;
  }

  /**
   * Sets how long to wait, in seconds, until retrying a host that was
   * marked down.
   *
   * @param int $retryInterval
   */
  public function setRetryInterval($retryInterval) {
    $this->retryInterval_ = $retryInterval;
  }

  /**
   * Sets how many consecutive failed open() passes a host may accumulate
   * before it is marked as down.
   *
   * @param int $maxConsecutiveFailures
   */
  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  }

  /**
   * Turns randomization in connect order on or off.
   *
   * @param bool $randomize
   */
  public function setRandomize($randomize) {
    $this->randomize_ = $randomize;
  }

  /**
   * Whether to always try the last server.
   *
   * @param bool $alwaysTryLast
   */
  public function setAlwaysTryLast($alwaysTryLast) {
    $this->alwaysTryLast_ = $alwaysTryLast;
  }


  /**
   * Connects the socket by iterating through all the servers in the pool
   * and trying to find one that works.
   *
   * Hosts recorded as down in the APC cache are skipped until the retry
   * interval has elapsed, except for the last host in the (possibly
   * shuffled) list when alwaysTryLast is enabled. Returns as soon as one
   * connection succeeds.
   *
   * @throws TException if no server in the pool could be connected to
   */
  public function open() {
    // Check if we want order randomization
    if ($this->randomize_) {
      shuffle($this->servers_);
    }

    // Count servers to identify the "last" one
    $numServers = count($this->servers_);

    // Always make at least one attempt on an eligible host; otherwise a
    // retry setting below 1 would record failures for hosts never tried.
    $maxAttempts = max(1, $this->numRetries_);

    for ($i = 0; $i < $numServers; ++$i) {

      // Read the fields explicitly rather than extract()-ing them into scope
      $host = $this->servers_[$i]['host'];
      $port = $this->servers_[$i]['port'];

      // Cache keys recording this server's health
      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
      $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';

      // Cache miss? Assume it's OK
      $lastFailtime = apc_fetch($failtimeKey);
      if ($lastFailtime === FALSE) {
        $lastFailtime = 0;
      }

      $retryIntervalPassed = FALSE;

      // Cache hit...make sure the retry interval has elapsed
      if ($lastFailtime > 0) {
        $elapsed = time() - $lastFailtime;
        if ($elapsed > $this->retryInterval_) {
          $retryIntervalPassed = TRUE;
          if ($this->debug_) {
            call_user_func($this->debugHandler_,
                           'TSocketPool: retryInterval '.
                           '('.$this->retryInterval_.') '.
                           'has passed for host '.$host.':'.$port);
          }
        }
      }

      // Only connect if not in the middle of a fail interval, OR if this
      // is the LAST server we are trying, just hammer away on it
      $isLastServer = $this->alwaysTryLast_ && ($i == ($numServers - 1));

      // $retryIntervalPassed can only be TRUE when $lastFailtime > 0
      if (($lastFailtime !== 0) && !$isLastServer && !$retryIntervalPassed) {
        continue;
      }

      // Set underlying TSocket params to this one
      $this->host_ = $host;
      $this->port_ = $port;

      for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        try {
          // Use the underlying TSocket open function
          parent::open();

          // Clear the "down" marker only if one was recorded
          if ($lastFailtime > 0) {
            apc_store($failtimeKey, 0);
          }

          // A success ends any run of consecutive failures. With a
          // threshold of 1 or less this pool resets the counter on every
          // failure itself, so the extra cache lookup is skipped then.
          if ($this->maxConsecutiveFailures_ > 1 &&
              apc_fetch($consecfailsKey) > 0) {
            apc_store($consecfailsKey, 0);
          }

          // Successful connection, return now
          return;

        } catch (TException $tx) {
          // Connection failed; deliberately ignored so the next attempt
          // or the next host can be tried
        }
      }

      // Every attempt on this host failed
      $this->recordHostFailure_($host, $port, $failtimeKey, $consecfailsKey);
    }

    // No server in the pool could be reached
    $error = 'TSocketPool: All hosts in pool are down. ';
    $hosts = array();
    foreach ($this->servers_ as $server) {
      $hosts []= $server['host'].':'.$server['port'];
    }
    $hostlist = implode(',', $hosts);
    $error .= '('.$hostlist.')';
    if ($this->debug_) {
      call_user_func($this->debugHandler_, $error);
    }
    throw new TException($error);
  }

  /**
   * Records one failed open() pass for a host in the APC cache, marking the
   * host as down once the consecutive failure threshold is reached.
   *
   * @param string $host           hostname or IP, used for debug output
   * @param int    $port           port, used for debug output
   * @param string $failtimeKey    cache key holding the host's failure time
   * @param string $consecfailsKey cache key holding the host's failure count
   */
  private function recordHostFailure_($host, $port,
                                      $failtimeKey, $consecfailsKey) {
    // Ignore cache misses
    $consecfails = apc_fetch($consecfailsKey);
    if ($consecfails === FALSE) {
      $consecfails = 0;
    }

    // Increment by one
    $consecfails++;

    if ($consecfails < $this->maxConsecutiveFailures_) {
      // Below the threshold: just remember the running count
      apc_store($consecfailsKey, $consecfails);
      return;
    }

    if ($this->debug_) {
      call_user_func($this->debugHandler_,
                     'TSocketPool: marking '.$host.':'.$port.
                     ' as down for '.$this->retryInterval_.' secs '.
                     'after '.$consecfails.' failed attempts.');
    }

    // Store the failure time
    apc_store($failtimeKey, time());

    // Clear the count of consecutive failures
    apc_store($consecfailsKey, 0);
  }
}
286
287?>
288