1<?php
2/*
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 *   http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 * @package thrift.transport
21 */
22
23namespace Thrift\Transport;
24
25use Thrift\Exception\TException;
26
/**
 * This library makes use of the APCu cache to mark hosts as down in a web
 * environment. If you are running from the CLI or on a system without APCu
 * installed, then these null functions will step in and act like cache
 * misses.
 */
if (function_exists('apcu_fetch') === false) {
    /**
     * No-op replacement for apcu_store(), declared only when no
     * apcu_fetch() function is already available.
     *
     * Nothing is persisted; the call always reports failure.
     *
     * @param string $key Cache key (ignored)
     * @param mixed  $var Value to store (ignored)
     * @param int    $ttl Time to live in seconds (ignored)
     *
     * @return bool Always false
     */
    function apcu_store($key, $var, $ttl = 0)
    {
        return false;
    }

    /**
     * No-op replacement for apcu_fetch(), declared only when no
     * apcu_fetch() function is already available.
     *
     * @param string $key Cache key (ignored)
     *
     * @return bool Always false, i.e. every lookup is a cache miss
     */
    function apcu_fetch($key)
    {
        return false;
    }
}
44
45/**
46 * Sockets implementation of the TTransport interface that allows connection
47 * to a pool of servers.
48 *
49 * @package thrift.transport
50 */
class TSocketPool extends TSocket
{
    /**
     * Remote servers. Array of associative arrays with 'host' and 'port' keys
     *
     * @var array
     */
    private $servers_ = array();

    /**
     * How many times to retry each host in connect
     *
     * @var int
     */
    private $numRetries_ = 1;

    /**
     * Retry interval in seconds, how long to not try a host if it has been
     * marked as down.
     *
     * @var int
     */
    private $retryInterval_ = 60;

    /**
     * Max consecutive failures before marking a host down.
     *
     * @var int
     */
    private $maxConsecutiveFailures_ = 1;

    /**
     * Try hosts in order? or Randomized?
     *
     * @var bool
     */
    private $randomize_ = true;

    /**
     * Always try last host, even if marked down?
     *
     * @var bool
     */
    private $alwaysTryLast_ = true;

    /**
     * Socket pool constructor
     *
     * Hosts and ports are paired by array key. When a single (non-array)
     * port is given, it is used for every host.
     *
     * @param array $hosts List of remote hostnames
     * @param mixed $ports Array of remote ports, or a single common port
     * @param bool $persist Whether to use a persistent socket
     * @param mixed $debugHandler Function for error logging
     */
    public function __construct(
        $hosts = array('localhost'),
        $ports = array(9090),
        $persist = false,
        $debugHandler = null
    ) {
        // The real host/port are filled in by open() once a server is chosen
        parent::__construct(null, 0, $persist, $debugHandler);

        // Expand a single common port into one entry per host key
        if (!is_array($ports)) {
            $port = $ports;
            $ports = array();
            foreach ($hosts as $key => $val) {
                $ports[$key] = $port;
            }
        }

        foreach ($hosts as $key => $host) {
            $this->servers_[] = array(
                'host' => $host,
                'port' => $ports[$key],
            );
        }
    }

    /**
     * Add a server to the pool
     *
     * This function does not prevent you from adding a duplicate server entry.
     *
     * @param string $host hostname or IP
     * @param int $port port
     */
    public function addServer($host, $port)
    {
        $this->servers_[] = array('host' => $host, 'port' => $port);
    }

    /**
     * Sets how many time to keep retrying a host in the connect function.
     *
     * @param int $numRetries
     */
    public function setNumRetries($numRetries)
    {
        $this->numRetries_ = $numRetries;
    }

    /**
     * Sets how long to wait until retrying a host if it was marked down
     *
     * @param int $retryInterval Interval in seconds
     */
    public function setRetryInterval($retryInterval)
    {
        $this->retryInterval_ = $retryInterval;
    }

    /**
     * Sets how many time to keep retrying a host before marking it as down.
     *
     * @param int $maxConsecutiveFailures
     */
    public function setMaxConsecutiveFailures($maxConsecutiveFailures)
    {
        $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
    }

    /**
     * Turns randomization in connect order on or off.
     *
     * @param bool $randomize
     */
    public function setRandomize($randomize)
    {
        $this->randomize_ = $randomize;
    }

    /**
     * Whether to always try the last server.
     *
     * @param bool $alwaysTryLast
     */
    public function setAlwaysTryLast($alwaysTryLast)
    {
        $this->alwaysTryLast_ = $alwaysTryLast;
    }

    /**
     * Connects the socket by iterating through all the servers in the pool
     * and trying to find one that works.
     *
     * A server is skipped while it is marked down in the cache and the retry
     * interval has not elapsed, unless it is the last server in the list and
     * "always try last" is enabled. When randomization is enabled the server
     * list is shuffled in place before iterating.
     *
     * @throws TException if no server in the pool could be connected to
     */
    public function open()
    {
        // Check if we want order randomization
        if ($this->randomize_) {
            shuffle($this->servers_);
        }

        // Index of the final server, used to identify the "last" one
        $lastIndex = count($this->servers_) - 1;

        foreach ($this->servers_ as $i => $server) {
            $host = $server['host'];
            $port = $server['port'];

            // Cache keys recording the down-time and failure count of this server
            $failtimeKey = 'thrift_failtime:' . $host . ':' . $port . '~';
            $consecfailsKey = 'thrift_consecfails:' . $host . ':' . $port . '~';

            // Cache miss? Assume it's OK
            $lastFailtime = apcu_fetch($failtimeKey);
            if ($lastFailtime === false) {
                $lastFailtime = 0;
            }

            // Cache hit...make sure the retry interval has elapsed
            $retryIntervalPassed = false;
            if ($lastFailtime > 0) {
                $elapsed = time() - $lastFailtime;
                if ($elapsed > $this->retryInterval_) {
                    $retryIntervalPassed = true;
                    if ($this->debug_) {
                        call_user_func(
                            $this->debugHandler_,
                            'TSocketPool: retryInterval ' .
                            '(' . $this->retryInterval_ . ') ' .
                            'has passed for host ' . $host . ':' . $port
                        );
                    }
                }
            }

            // If this is the LAST server we are trying, just hammer away on it
            $isLastServer = $this->alwaysTryLast_ && ($i == $lastIndex);

            // Skip servers that are in the middle of a fail interval.
            // $retryIntervalPassed can only be true when $lastFailtime > 0.
            if ($lastFailtime !== 0 && !$isLastServer && !$retryIntervalPassed) {
                continue;
            }

            // Set underlying TSocket params to this one
            $this->host_ = $host;
            $this->port_ = $port;

            // Try up to numRetries_ connections per server
            for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
                try {
                    // Use the underlying TSocket open function
                    parent::open();
                } catch (TException $tx) {
                    // Connection failed, try again if attempts remain
                    continue;
                }

                // Only clear the failure time if required to do so
                if ($lastFailtime > 0) {
                    apcu_store($failtimeKey, 0);
                }

                // A success breaks any run of failures, so reset the counter;
                // otherwise failures separated by successes would still add up
                // to maxConsecutiveFailures_ and mark a healthy host as down.
                $pendingFails = apcu_fetch($consecfailsKey);
                if ($pendingFails !== false && $pendingFails > 0) {
                    apcu_store($consecfailsKey, 0);
                }

                // Successful connection, return now
                return;
            }

            // Mark failure of this host in the cache; ignore cache misses
            $consecfails = apcu_fetch($consecfailsKey);
            if ($consecfails === false) {
                $consecfails = 0;
            }

            // Increment by one
            $consecfails++;

            // Log and cache this failure
            if ($consecfails >= $this->maxConsecutiveFailures_) {
                if ($this->debug_) {
                    call_user_func(
                        $this->debugHandler_,
                        'TSocketPool: marking ' . $host . ':' . $port .
                        ' as down for ' . $this->retryInterval_ . ' secs ' .
                        'after ' . $consecfails . ' failed attempts.'
                    );
                }
                // Store the failure time
                apcu_store($failtimeKey, time());

                // Clear the count of consecutive failures
                apcu_store($consecfailsKey, 0);
            } else {
                apcu_store($consecfailsKey, $consecfails);
            }
        }

        // Oh no; we failed them all. The system is totally ill!
        $hosts = array();
        foreach ($this->servers_ as $server) {
            $hosts[] = $server['host'] . ':' . $server['port'];
        }
        $error = 'TSocketPool: All hosts in pool are down. ' .
            '(' . implode(',', $hosts) . ')';
        if ($this->debug_) {
            call_user_func($this->debugHandler_, $error);
        }
        throw new TException($error);
    }
}
311