1<?hh // strict
2/*
3 * Copyright (c) Facebook, Inc. and its affiliates.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 * @package thrift.transport
18 */
19
20/**
21 * Sockets implementation of the TTransport interface.
22 *
23 * @package thrift.transport
24 */
class TSocket extends TTransport
  implements TTransportStatus, InstrumentedTTransport, IThriftRemoteConn {

  use InstrumentedTTransportTrait;

  /**
   * Handle to the underlying stream socket; null when no socket is attached.
   *
   * @var resource
   */
  private ?resource $handle_ = null;

  /**
   * Remote hostname
   *
   * @var string
   */
  protected string $host_ = 'localhost';

  /**
   * Remote port
   *
   * @var int
   */
  protected int $port_ = 9090;

  /**
   * Local port, parsed from the socket name after a successful open()
   *
   * @var int
   */
  protected int $lport_ = 0;

  /**
   * Send timeout in milliseconds (also used as the connect timeout in open())
   *
   * @var int
   */
  private int $sendTimeout_ = 100;

  /**
   * Recv timeout in milliseconds
   *
   * @var int
   */
  private int $recvTimeout_ = 750;

  /**
   * True while the stream timeout currently holds the send timeout; false
   * once a read has replaced it with the recv timeout.
   *
   * @var bool
   */
  private bool $sendTimeoutSet_ = false;

  /**
   * Persistent socket or plain?
   *
   * @var bool
   */
  private bool $persist_ = false;

  /**
   * When the current read is started
   *
   * @var int, null means no read is started
   */
  private ?int $readAttemptStart_ = null;

  /**
   * When the current write attempt started, as a microtime(true) timestamp
   * in (fractional) seconds.
   *
   * @var float, null means no write is started
   */
  private ?float $writeAttemptStart_ = null;

  /**
   * Debugging on?
   *
   * @var bool
   */
  protected bool $debug_ = false;

  /**
   * Debug handler, invoked with the error message when debugging is on
   *
   * @var mixed
   */
  protected (function(string): bool) $debugHandler_;

  /**
   * error string (in case of open failure)
   *
   * @var string or null
   */
  protected ?string $errstr_ = null;

  /**
   * error number (in case of open failure)
   *
   * @var int or null
   */
  protected ?int $errno_ = null;

  /**
   * Specifies the maximum number of bytes to read
   * at once from internal stream. null means no limit.
   */
  protected ?int $maxReadChunkSize_ = null;

  /**
   * Socket constructor
   *
   * @param string $host         Remote hostname
   * @param int    $port         Remote port
   * @param bool   $persist      Whether to use a persistent socket
   * @param string $debugHandler Function to call for error logging;
   *                             defaults to error_log
   */
  public function __construct(
    string $host = 'localhost',
    int $port = 9090,
    bool $persist = false,
    ?(function(string): bool) $debugHandler = null,
  ) {
    $this->host_ = $host;
    $this->port_ = $port;
    $this->persist_ = $persist;
    $this->debugHandler_ = $debugHandler ?: fun('error_log');
  }

  /**
   * Sets the internal max read chunk size.
   *
   * @param int $maxReadChunkSize Maximum bytes per underlying read, or null
   *                              for no limit (default).
   */
  public function setMaxReadChunkSize(?int $maxReadChunkSize): void {
    $this->maxReadChunkSize_ = $maxReadChunkSize;
  }

  /**
   * Sets the socket handle
   * @param resource $handle
   * @return $this
   */
  public function setHandle(resource $handle): this {
    $this->handle_ = $handle;
    return $this;
  }

  /**
   * Gets the meta_data for the current handle
   * (the result of stream_get_meta_data()).
   */
  public function getMetaData(): array<string, mixed> {
    return stream_get_meta_data($this->handle_);
  }

  /**
   * Gets the send timeout.
   *
   * @return int timeout in milliseconds
   */
  public function getSendTimeout(): int {
    return $this->sendTimeout_;
  }

  /**
   * Sets the send timeout.
   *
   * @param int $timeout  Timeout in milliseconds.
   */
  public function setSendTimeout(int $timeout): void {
    $this->sendTimeout_ = $timeout;
  }

  /**
   * Gets the receive timeout.
   *
   * @return int timeout in milliseconds
   */
  public function getRecvTimeout(): int {
    return $this->recvTimeout_;
  }

  /**
   * Sets the receive timeout.
   *
   * @param int $timeout  Timeout in milliseconds.
   */
  public function setRecvTimeout(int $timeout): void {
    $this->recvTimeout_ = $timeout;
  }

  /**
   * Sets debugging output on or off
   *
   * @param bool $debug
   */
  public function setDebug(bool $debug): void {
    $this->debug_ = $debug;
  }

  /**
   * Get the host that this socket is connected to
   *
   * @return string host
   */
  public function getHost(): string {
    return $this->host_;
  }

  /**
   * Get the remote port that this socket is connected to
   *
   * @return int port
   */
  public function getPort(): int {
    return $this->port_;
  }

  /**
   * Get the error string in case of open failure
   *
   * @return errstr_ or null
   */
  public function getErrStr(): ?string {
    return $this->errstr_;
  }

  /**
   * Get the error number in case of open failure
   *
   * @return errno_ or null
   */
  public function getErrNo(): ?int {
    return $this->errno_;
  }

  /**
   * Tests whether this is open
   *
   * @return bool true if the socket is open
   */
  public function isOpen(): bool {
    return is_resource($this->handle_);
  }

  /**
   * Connects the socket.
   *
   * Uses pfsockopen() for persistent sockets and fsockopen() otherwise, with
   * the send timeout as the connect timeout. On success the local port is
   * recorded and the stream timeout is set to the send timeout.
   *
   * @throws TTransportException ALREADY_OPEN if already connected, NOT_OPEN
   *         for a null host or non-positive port, COULD_NOT_CONNECT when the
   *         connection attempt fails.
   */
  public function open(): void {
    if ($this->isOpen()) {
      throw new TTransportException(
        'TSocket: socket already connected',
        TTransportException::ALREADY_OPEN,
      );
    }

    // host_ is declared as a non-nullable string; this guard is defensive.
    if ($this->host_ === null) {
      throw new TTransportException(
        'TSocket: cannot open null host',
        TTransportException::NOT_OPEN,
      );
    }

    if ($this->port_ <= 0) {
      throw new TTransportException(
        'TSocket: cannot open without port',
        TTransportException::NOT_OPEN,
      );
    }

    // The connect timeout is expressed in seconds.
    $connect_timeout = $this->sendTimeout_ / 1000.0;
    $handle = null;
    if ($this->persist_) {
      $handle = @pfsockopen(
        $this->host_,
        $this->port_,
        $this->errno_,
        $this->errstr_,
        $connect_timeout,
      );
    } else {
      $handle = @fsockopen(
        $this->host_,
        $this->port_,
        $this->errno_,
        $this->errstr_,
        $connect_timeout,
      );
    }

    // Connect failed?
    if (!$handle) {
      $error =
        'TSocket: could not connect to '.$this->host_.':'.$this->port_;
      $error .= ' ('.$this->errstr_.' ['.$this->errno_.'])';
      $this->emitDebug($error);
      throw new TTransportException(
        $error,
        TTransportException::COULD_NOT_CONNECT,
      );
    }

    $this->handle_ = $handle;

    // IPv6 is returned [2401:db00:20:702c:face:0:7:0]:port
    // or when stream_socket_get_name is buggy it is
    // 2401:db00:20:702c:face:0:7:0:port
    // Either way the port is whatever follows the last colon. end() cannot
    // be used on the explode() result directly (it takes its argument by
    // reference), and lport_ is an int, so slice and cast explicitly.
    $sock_name = stream_socket_get_name($this->handle_, false);
    if (is_string($sock_name)) {
      $colon = strrpos($sock_name, ':');
      $port_text =
        $colon === false ? $sock_name : substr($sock_name, $colon + 1);
      $this->lport_ = (int) $port_text;
    } else {
      // Socket name unavailable; do not keep a port from an earlier open().
      $this->lport_ = 0;
    }

    stream_set_timeout($this->handle_, 0, $this->sendTimeout_ * 1000);
    $this->sendTimeoutSet_ = true;
  }

  /**
   * Closes the socket.
   *
   * Persistent sockets are left untouched: neither closed nor detached.
   */
  public function close(): void {
    if ($this->persist_) {
      return;
    }
    if ($this->handle_ !== null) {
      @fclose($this->handle_);
    }
    $this->handle_ = null;
  }

  /**
   *  Test to see if the socket is ready for reading. This method returns
   *  immediately. If calling this method in a loop one should sleep or do
   *  other work else CPU cycles will be wasted.
   *
   *  @return bool  True if a non-blocking read of at least one character can
   *                be performed on the socket.
   */
  public function isReadable(): bool {
    return $this->isSocketActionable($this->handle_, /* check_read */ true);
  }

  /**
   *  Test to see if the socket is ready for writing. This method returns
   *  immediately. If calling this method in a loop one should sleep or do
   *  other work else CPU cycles will be wasted.
   *
   *  When the socket is not writable and a positive send timeout is set, the
   *  time of the first failed check is remembered; once more than the send
   *  timeout has elapsed since then, a TIMED_OUT exception is thrown. The
   *  remembered time is cleared by a successful write().
   *
   *  @return bool True if a non-blocking write can be performed on the socket.
   *  @throws TTransportException TIMED_OUT when the socket stayed unwritable
   *          for longer than the send timeout.
   */
  public function isWritable(): bool {
    $writable =
      $this->isSocketActionable($this->handle_, /* check_read */ false);
    if (!$writable && $this->sendTimeout_ > 0) {
      $now = microtime(true);
      $started = $this->writeAttemptStart_;
      if ($started === null) {
        $started = $now;
        $this->writeAttemptStart_ = $started;
      }
      // Compare in fractional seconds. Truncating the start time to whole
      // seconds would overstate the elapsed time by up to one second and
      // trigger sub-second timeouts prematurely.
      if ($now - $started > ($this->sendTimeout_ / 1000.0)) {
        throw new TTransportException(
          'TSocket: socket not writable after '.$this->sendTimeout_.'ms',
          TTransportException::TIMED_OUT,
        );
      }
    }
    return $writable;
  }

  /**
   * Polls the socket with a zero-timeout stream_select().
   *
   * @param resource $socket     Socket to poll, or null
   * @param bool     $check_read True to poll for readability, false for
   *                             writability
   * @return bool True if stream_select() reports the socket as ready, or if
   *              $socket is null
   * @throws TTransportException if stream_select() fails
   */
  private function isSocketActionable(
    ?resource $socket,
    bool $check_read,
  ): bool {
    // the socket is technically actionable, although any read or write will
    // fail since close() was already called.
    if ($socket === null) {
      return true;
    }

    $read = array();
    $write = array();
    if ($check_read) {
      $read = array($socket);
    } else {
      $write = array($socket);
    }

    $excpt = array();
    $ret = stream_select($read, $write, $excpt, 0, 0);
    if ($ret === false) {
      $error = 'TSocket: stream_select failed on socket.';
      $this->emitDebug($error);
      throw new TTransportException($error);
    }

    return $ret !== 0;
  }

  /**
   * Passes $message to the debug handler when debugging is enabled.
   */
  private function emitDebug(string $message): void {
    if ($this->debug_) {
      $handler = $this->debugHandler_;
      $handler($message);
    }
  }

  /**
   * Tells whether the stream metadata reports that the last operation timed
   * out. Returns false when there is no usable handle to inspect.
   */
  private function streamTimedOut(): bool {
    if (!is_resource($this->handle_)) {
      return false;
    }
    $md = stream_get_meta_data($this->handle_);
    return (bool) $md['timed_out'];
  }

  /**
   * Builds the detail suffix shared by the read/write error messages.
   *
   * @param int   $bytes   Number of bytes the operation was asked to transfer
   * @param float $elapsed Time spent in the operation, in seconds
   */
  private function formatIoDetail(int $bytes, float $elapsed): string {
    return sprintf(
      '%d bytes from %s:%d to localhost:%d. Spent %2.2f ms.',
      $bytes,
      $this->host_,
      $this->port_,
      $this->lport_,
      $elapsed * 1000,
    );
  }

  /**
   * Builds the exception for a read that returned no data: TIMED_OUT when
   * the stream reports a timeout, COULD_NOT_READ otherwise.
   *
   * @param int   $len     Number of bytes that were requested
   * @param float $elapsed Time spent in the read, in seconds
   */
  private function newReadFailure(
    int $len,
    float $elapsed,
  ): TTransportException {
    $detail = $this->formatIoDetail($len, $elapsed);
    if ($this->streamTimedOut()) {
      return new TTransportException(
        'TSocket: timeout while reading '.$detail,
        TTransportException::TIMED_OUT,
      );
    }
    return new TTransportException(
      'TSocket: could not read '.$detail,
      TTransportException::COULD_NOT_READ,
    );
  }

  /**
   * Reads maximum min($len, $maxReadChunkSize_) bytes
   * from the stream.
   *
   * This is the single place where bytes read are reported via onRead(), so
   * callers must not report the same bytes again.
   *
   * @return string The bytes read (possibly ''), or null when fread() did
   *                not return a string.
   */
  private function readChunk(int $len): ?string {
    if ($this->maxReadChunkSize_ !== null) {
      $len = min($len, $this->maxReadChunkSize_);
    }

    $res = @fread($this->handle_, $len);
    if (!is_string($res)) {
      // Normalise failure results (e.g. false) to null so callers'
      // null/'' checks catch them instead of treating them as a short read.
      $this->onRead(0);
      return null;
    }

    $this->onRead(strlen($res));
    return $res;
  }

  /**
   * Reads exactly $len bytes, looping over chunked fread() calls until the
   * full amount has arrived.
   *
   * stream_get_contents() is deliberately not used here because it does not
   * obey stream_set_timeout values.
   *
   * @param int $len How many bytes
   * @return string Binary data
   * @throws TTransportException TIMED_OUT if the stream reports a timeout,
   *         COULD_NOT_READ if a read returns no data for another reason.
   */
  public function readAll(int $len): string {
    if ($this->sendTimeoutSet_) {
      // Switch the stream from the send timeout to the recv timeout,
      // split into whole seconds and remaining milliseconds.
      $sec = 0;
      $msec = $this->recvTimeout_;
      if ($this->recvTimeout_ > 1000) {
        $msec = $this->recvTimeout_ % 1000;
        $sec = (int) (($this->recvTimeout_ - $msec) / 1000);
      }
      stream_set_timeout($this->handle_, $sec, $msec * 1000);
      $this->sendTimeoutSet_ = false;
    }

    $pre = '';
    while (true) {
      $t_start = microtime(true);
      $buf = $this->readChunk($len);
      $elapsed = microtime(true) - $t_start;

      if ($buf === null || $buf === '') {
        throw $this->newReadFailure($len, $elapsed);
      }

      $sz = strlen($buf);
      if ($sz >= $len) {
        // Everything requested has arrived.
        $this->readAttemptStart_ = null;
        return $pre.$buf;
      }

      // Short read: give up if the stream timed out, otherwise keep what we
      // have and ask for the remainder.
      if ($this->streamTimedOut()) {
        throw new TTransportException(
          'TSocket: timeout while reading '.
          $this->formatIoDetail($len, $elapsed),
          TTransportException::TIMED_OUT,
        );
      }
      $pre .= $buf;
      $len -= $sz;
    }

    // Defensive: the loop above only exits via return or throw.
    throw new TTransportException("TSocket: You shouldn't be here");
  }

  /**
   * Read from the socket. Performs a single chunked read, so fewer than $len
   * bytes may be returned.
   *
   * @param int $len How many bytes
   * @return string Binary data
   * @throws TTransportException TIMED_OUT if the stream reports a timeout,
   *         COULD_NOT_READ if the read returns no data for another reason.
   */
  public function read(int $len): string {
    if ($this->sendTimeoutSet_) {
      // Switch the stream from the send timeout to the recv timeout.
      stream_set_timeout($this->handle_, 0, $this->recvTimeout_ * 1000);
      $this->sendTimeoutSet_ = false;
    }

    $t_start = microtime(true);
    $data = $this->readChunk($len);
    $elapsed = microtime(true) - $t_start;

    if ($data === null || $data === '') {
      throw $this->newReadFailure($len, $elapsed);
    }

    $this->readAttemptStart_ = null;
    return $data;
  }

  /**
   * Perform a nonblocking read.
   *
   * If the stream is in blocking mode it is switched to non-blocking for the
   * read and switched back afterwards, including when the read itself fails.
   *
   * @param int $len Number of bytes to read
   * @return string Binary data or '' if no data is read
   * @throws TTransportException if the blocking mode cannot be changed or
   *         the read fails
   */
  public function nonBlockingRead(int $len): string {
    $md = stream_get_meta_data($this->handle_);
    $is_blocking = $md['blocked'];

    // If the stream is currently blocking, we will set to nonblocking
    // first
    if ($is_blocking && !stream_set_blocking($this->handle_, 0)) {
      throw new TTransportException(
        'TSocket: '.'cannot set stream to non-blocking',
      );
    }

    $data = $this->readChunk($len);

    // Switch back to blocking mode if necessary. This happens before the
    // read result is checked so a failed read does not leave the stream in
    // non-blocking mode.
    $restored = !$is_blocking || stream_set_blocking($this->handle_, 1);

    if ($data === null) {
      throw new TTransportException('TSocket: failed in non-blocking read');
    }

    if (!$restored) {
      throw new TTransportException(
        'TSocket: '.'cannot swtich stream back to blocking',
      );
    }
    return $data;
  }

  /**
   * Write to the socket, looping until the whole buffer has been written.
   *
   * @param string $buf The data to write
   * @throws TException if no handle is set
   * @throws TTransportException TIMED_OUT if the stream reports a timeout,
   *         COULD_NOT_WRITE if a write makes no progress for another reason.
   */
  public function write(string $buf): void {
    if ($this->handle_ === null) {
      throw new TException('TSocket: handle_ is null');
    }

    $this->onWrite(strlen($buf));

    if (!$this->sendTimeoutSet_) {
      // A previous read left the recv timeout on the stream; restore the
      // send timeout.
      stream_set_timeout($this->handle_, 0, $this->sendTimeout_ * 1000);
      $this->sendTimeoutSet_ = true;
    }

    while (strlen($buf) > 0) {
      $buflen = strlen($buf);
      $t_start = microtime(true);
      $got = @fwrite($this->handle_, $buf);
      $write_time = microtime(true) - $t_start;

      // Zero bytes written or a non-integer result both mean no progress.
      if ($got === 0 || !is_int($got)) {
        $write_err_detail = $this->formatIoDetail($buflen, $write_time);
        if ($this->streamTimedOut()) {
          throw new TTransportException(
            'TSocket: timeout while writing '.$write_err_detail,
            TTransportException::TIMED_OUT,
          );
        }
        throw new TTransportException(
          'TSocket: could not write '.$write_err_detail,
          TTransportException::COULD_NOT_WRITE,
        );
      }

      // Drop the bytes that were accepted and retry with the remainder.
      $buf = substr($buf, $got);
    }

    $this->writeAttemptStart_ = null;
  }

  /**
   * Flush output to the socket.
   *
   * @throws TTransportException if fflush() fails
   */
  public function flush(): void {
    $ret = fflush($this->handle_);

    if ($ret === false) {
      throw new TTransportException(
        'TSocket: could not flush '.$this->host_.':'.$this->port_,
      );
    }
  }
}
651