1<?hh // strict 2/* 3 * Copyright (c) Facebook, Inc. and its affiliates. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 * @package thrift.transport 18 */ 19 20/** 21 * Sockets implementation of the TTransport interface. 22 * 23 * @package thrift.transport 24 */ 25class TSocket extends TTransport 26 implements TTransportStatus, InstrumentedTTransport, IThriftRemoteConn { 27 28 use InstrumentedTTransportTrait; 29 30 /** 31 * Handle to PHP socket 32 * 33 * @var resource 34 */ 35 private ?resource $handle_ = null; 36 37 /** 38 * Remote hostname 39 * 40 * @var string 41 */ 42 protected string $host_ = 'localhost'; 43 44 /** 45 * Remote port 46 * 47 * @var int 48 */ 49 protected int $port_ = 9090; 50 51 /** 52 * Local port 53 * 54 * @var int 55 */ 56 protected int $lport_ = 0; 57 58 /** 59 * Send timeout in milliseconds 60 * 61 * @var int 62 */ 63 private int $sendTimeout_ = 100; 64 65 /** 66 * Recv timeout in milliseconds 67 * 68 * @var int 69 */ 70 private int $recvTimeout_ = 750; 71 72 /** 73 * Is send timeout set? 74 * 75 * @var bool 76 */ 77 private bool $sendTimeoutSet_ = false; 78 79 /** 80 * Persistent socket or plain? 81 * 82 * @var bool 83 */ 84 private bool $persist_ = false; 85 86 /** 87 * When the current read is started 88 * 89 * @var int, null means no read is started 90 */ 91 private ?int $readAttemptStart_ = null; 92 93 /** 94 * When the current write is started 95 * 96 * @var int, null means no write is started 97 */ 98 private ?int $writeAttemptStart_ = null; 99 100 /** 101 * Debugging on? 102 * 103 * @var bool 104 */ 105 protected bool $debug_ = false; 106 107 /** 108 * Debug handler 109 * 110 * @var mixed 111 */ 112 protected (function(string): bool) $debugHandler_; 113 114 /** 115 * error string (in case of open failure) 116 * 117 * @var string or null 118 */ 119 protected ?string $errstr_ = null; 120 121 /** 122 * error number (in case of open failure) 123 * 124 * @var int or null 125 */ 126 protected ?int $errno_ = null; 127 128 /** 129 * Specifies the maximum number of bytes to read 130 * at once from internal stream. 131 */ 132 protected ?int $maxReadChunkSize_ = null; 133 134 /** 135 * Socket constructor 136 * 137 * @param string $host Remote hostname 138 * @param int $port Remote port 139 * @param bool $persist Whether to use a persistent socket 140 * @param string $debugHandler Function to call for error logging 141 */ 142 public function __construct( 143 string $host = 'localhost', 144 int $port = 9090, 145 bool $persist = false, 146 ?(function(string): bool) $debugHandler = null, 147 ) { 148 $this->host_ = $host; 149 $this->port_ = $port; 150 $this->persist_ = $persist; 151 $this->debugHandler_ = $debugHandler ?: fun('error_log'); 152 } 153 154 /** 155 * Sets the internal max read chunk size. 156 * null for no limit (default). 157 */ 158 public function setMaxReadChunkSize(int $maxReadChunkSize): void { 159 $this->maxReadChunkSize_ = $maxReadChunkSize; 160 } 161 162 /** 163 * Sets the socket handle 164 * @param resource $handle 165 * @return $this 166 */ 167 public function setHandle(resource $handle): this { 168 $this->handle_ = $handle; 169 return $this; 170 } 171 172 /** 173 * Gets the meta_data for the current handle 174 */ 175 public function getMetaData(): array<string, mixed> { 176 return stream_get_meta_data($this->handle_); 177 } 178 179 /** 180 * Gets the send timeout. 181 * 182 * @return int timeout 183 */ 184 public function getSendTimeout(): int { 185 return $this->sendTimeout_; 186 } 187 188 /** 189 * Sets the send timeout. 190 * 191 * @param int $timeout Timeout in milliseconds. 192 */ 193 public function setSendTimeout(int $timeout): void { 194 $this->sendTimeout_ = $timeout; 195 } 196 197 /** 198 * Gets the receive timeout. 199 * 200 * @return int timeout 201 */ 202 public function getRecvTimeout(): int { 203 return $this->recvTimeout_; 204 } 205 206 /** 207 * Sets the receive timeout. 208 * 209 * @param int $timeout Timeout in milliseconds. 210 */ 211 public function setRecvTimeout(int $timeout): void { 212 $this->recvTimeout_ = $timeout; 213 } 214 215 /** 216 * Sets debugging output on or off 217 * 218 * @param bool $debug 219 */ 220 public function setDebug(bool $debug): void { 221 $this->debug_ = $debug; 222 } 223 224 /** 225 * Get the host that this socket is connected to 226 * 227 * @return string host 228 */ 229 public function getHost(): string { 230 return $this->host_; 231 } 232 233 /** 234 * Get the remote port that this socket is connected to 235 * 236 * @return int port 237 */ 238 public function getPort(): int { 239 return $this->port_; 240 } 241 242 /** 243 * Get the error string in case of open failure 244 * 245 * @return errstr_ or null 246 */ 247 public function getErrStr(): ?string { 248 return $this->errstr_; 249 } 250 251 /** 252 * Get the error number in case of open failure 253 * 254 * @return errno_ or null 255 */ 256 public function getErrNo(): ?int { 257 return $this->errno_; 258 } 259 260 /** 261 * Tests whether this is open 262 * 263 * @return bool true if the socket is open 264 */ 265 public function isOpen(): bool { 266 return is_resource($this->handle_); 267 } 268 269 /** 270 * Connects the socket. 271 */ 272 public function open(): void { 273 if ($this->isOpen()) { 274 throw new TTransportException( 275 'TSocket: socket already connected', 276 TTransportException::ALREADY_OPEN, 277 ); 278 } 279 280 if ($this->host_ === null) { 281 throw new TTransportException( 282 'TSocket: cannot open null host', 283 TTransportException::NOT_OPEN, 284 ); 285 } 286 287 if ($this->port_ <= 0) { 288 throw new TTransportException( 289 'TSocket: cannot open without port', 290 TTransportException::NOT_OPEN, 291 ); 292 } 293 294 $handle = null; 295 if ($this->persist_) { 296 $handle = @pfsockopen( 297 $this->host_, 298 $this->port_, 299 $this->errno_, 300 $this->errstr_, 301 $this->sendTimeout_ / 1000.0, 302 ); 303 } else { 304 $handle = @fsockopen( 305 $this->host_, 306 $this->port_, 307 $this->errno_, 308 $this->errstr_, 309 $this->sendTimeout_ / 1000.0, 310 ); 311 } 312 313 // Connect failed? 314 if (!$handle) { 315 $error = 316 'TSocket: could not connect to '.$this->host_.':'.$this->port_; 317 $error .= ' ('.$this->errstr_.' ['.$this->errno_.'])'; 318 if ($this->debug_) { 319 call_user_func($this->debugHandler_, $error); 320 } 321 throw new TTransportException( 322 $error, 323 TTransportException::COULD_NOT_CONNECT, 324 ); 325 } 326 327 $this->handle_ = $handle; 328 329 $sock_name = stream_socket_get_name($this->handle_, false); 330 // IPv6 is returned [2401:db00:20:702c:face:0:7:0]:port 331 // or when stream_socket_get_name is buggy it is 332 // 2401:db00:20:702c:face:0:7:0:port 333 $this->lport_ = end(explode(":", $sock_name)); 334 335 stream_set_timeout($this->handle_, 0, $this->sendTimeout_ * 1000); 336 $this->sendTimeoutSet_ = true; 337 } 338 339 /** 340 * Closes the socket. 341 */ 342 public function close(): void { 343 if (!$this->persist_) { 344 @fclose($this->handle_); 345 $this->handle_ = null; 346 } 347 } 348 349 /** 350 * Test to see if the socket is ready for reading. This method returns 351 * immediately. If calling this method in a loop one should sleep or do 352 * other work else CPU cycles will be wasted. 353 * 354 * @return bool True if a non-blocking read of at least one character can 355 * be preformed on the socket. 356 */ 357 public function isReadable(): bool { 358 return $this->isSocketActionable($this->handle_, $check_read = true); 359 } 360 361 /** 362 * Test to see if the socket is ready for writing. This method returns 363 * immediately. If calling this method in a loop one should sleep or do 364 * other work else CPU cycles will be wasted. 365 * 366 * @return bool True if a non-blocking write can be preformed on the socket. 367 */ 368 public function isWritable(): bool { 369 $writable = 370 $this->isSocketActionable($this->handle_, $check_read = false); 371 if (!$writable && $this->sendTimeout_ > 0) { 372 if ($this->writeAttemptStart_ === null) { 373 $this->writeAttemptStart_ = microtime(true); 374 } 375 if (microtime(true) - 376 (int) $this->writeAttemptStart_ > ($this->sendTimeout_ / 1000.0)) { 377 throw new TTransportException( 378 'TSocket: socket not writable after '.$this->sendTimeout_.'ms', 379 TTransportException::TIMED_OUT, 380 ); 381 } 382 } 383 return $writable; 384 } 385 386 private function isSocketActionable( 387 ?resource $socket, 388 bool $check_read, 389 ): bool { 390 // the socket is technically actionable, although any read or write will 391 // fail since close() was already called. 392 if ($socket === null) { 393 return true; 394 } 395 396 $read = $write = array(); 397 if ($check_read) { 398 $read = array($socket); 399 } else { 400 $write = array($socket); 401 } 402 403 $excpt = array(); 404 $ret = stream_select($read, $write, $excpt, 0, 0); 405 if ($ret === false) { 406 $error = 'TSocket: stream_select failed on socket.'; 407 if ($this->debug_) { 408 call_user_func($this->debugHandler_, $error); 409 } 410 throw new TTransportException($error); 411 } 412 413 return $ret !== 0; 414 } 415 416 /** 417 * Reads maximum min($len, $maxReadChunkSize_) bytes 418 * from the stream. 419 */ 420 private function readChunk(int $len): ?string { 421 if ($this->maxReadChunkSize_ !== null) { 422 $len = min($len, $this->maxReadChunkSize_); 423 } 424 425 $res = @fread($this->handle_, $len); 426 $size = strlen($res); 427 428 $this->onRead($size); 429 return $res; 430 } 431 432 /** 433 * Uses stream get contents to do the reading 434 * 435 * @param int $len How many bytes 436 * @return string Binary data 437 */ 438 public function readAll(int $len): string { 439 if ($this->sendTimeoutSet_) { 440 $sec = 0; 441 if ($this->recvTimeout_ > 1000) { 442 $msec = $this->recvTimeout_ % 1000; 443 $sec = ($this->recvTimeout_ - $msec) / 1000; 444 } else { 445 $msec = $this->recvTimeout_; 446 } 447 stream_set_timeout($this->handle_, $sec, $msec * 1000); 448 $this->sendTimeoutSet_ = false; 449 } 450 // This call does not obey stream_set_timeout values! 451 // $buf = @stream_get_contents($this->handle_, $len); 452 $pre = ''; 453 while (true) { 454 $t_start = microtime(true); 455 $buf = $this->readChunk($len); 456 $t_stop = microtime(true); 457 if ($buf === null || $buf === '') { 458 $read_err_detail = sprintf( 459 '%d bytes from %s:%d to localhost:%d. Spent %2.2f ms.', 460 $len, 461 $this->host_, 462 $this->port_, 463 $this->lport_, 464 ($t_stop - $t_start) * 1000, 465 ); 466 $md = stream_get_meta_data($this->handle_); 467 if ($md['timed_out']) { 468 throw new TTransportException( 469 'TSocket: timeout while reading '.$read_err_detail, 470 TTransportException::TIMED_OUT, 471 ); 472 } else { 473 $md_str = str_replace("\n", " ", print_r($md, true)); 474 throw new TTransportException( 475 'TSocket: could not read '.$read_err_detail, 476 TTransportException::COULD_NOT_READ, 477 ); 478 } 479 } else if (($sz = strlen($buf)) < $len) { 480 $md = stream_get_meta_data($this->handle_); 481 if ($md['timed_out']) { 482 $read_err_detail = sprintf( 483 '%d bytes from %s:%d to localhost:%d. Spent %2.2f ms.', 484 $len, 485 $this->host_, 486 $this->port_, 487 $this->lport_, 488 ($t_stop - $t_start) * 1000, 489 ); 490 throw new TTransportException( 491 'TSocket: timeout while reading '.$read_err_detail, 492 TTransportException::TIMED_OUT, 493 ); 494 } else { 495 $pre .= $buf; 496 $len -= $sz; 497 } 498 } else { 499 $this->readAttemptStart_ = null; 500 $res = $pre.$buf; 501 $this->onRead(strlen($res)); 502 return $res; 503 } 504 } 505 506 throw new TTransportException("TSocket: You shouldn't be here"); 507 } 508 509 /** 510 * Read from the socket 511 * 512 * @param int $len How many bytes 513 * @return string Binary data 514 */ 515 public function read(int $len): string { 516 if ($this->sendTimeoutSet_) { 517 stream_set_timeout($this->handle_, 0, $this->recvTimeout_ * 1000); 518 $this->sendTimeoutSet_ = false; 519 } 520 $t_start = microtime(true); 521 $data = $this->readChunk($len); 522 $t_stop = microtime(true); 523 if ($data === null || $data === '') { 524 $read_err_detail = sprintf( 525 '%d bytes from %s:%d to localhost:%d. Spent %2.2f ms.', 526 $len, 527 $this->host_, 528 $this->port_, 529 $this->lport_, 530 ($t_stop - $t_start) * 1000, 531 ); 532 $md = stream_get_meta_data($this->handle_); 533 if ($md['timed_out']) { 534 throw new TTransportException( 535 'TSocket: timeout while reading '.$read_err_detail, 536 TTransportException::TIMED_OUT, 537 ); 538 } else { 539 $md_str = str_replace("\n", " ", print_r($md, true)); 540 throw new TTransportException( 541 'TSocket: could not read '.$read_err_detail, 542 TTransportException::COULD_NOT_READ, 543 ); 544 } 545 } else { 546 $this->readAttemptStart_ = null; 547 } 548 549 $this->onRead(strlen($data)); 550 return $data; 551 } 552 553 /** 554 * Perform a nonblocking read. 555 * @param int $len Number of bytes to read 556 * @return string Binary data or '' is no data is read 557 */ 558 public function nonBlockingRead(int $len): string { 559 $md = stream_get_meta_data($this->handle_); 560 $is_blocking = $md['blocked']; 561 562 // If the stream is currently blocking, we will set to nonblocking 563 // first 564 if ($is_blocking && !stream_set_blocking($this->handle_, 0)) { 565 throw new TTransportException( 566 'TSocket: '.'cannot set stream to non-blocking', 567 ); 568 } 569 570 $data = $this->readChunk($len); 571 572 if ($data === null) { 573 throw new TTransportException('TSocket: failed in non-blocking read'); 574 } 575 576 // Switch back to blocking mode is necessary 577 if ($is_blocking && !stream_set_blocking($this->handle_, 1)) { 578 throw new TTransportException( 579 'TSocket: '.'cannot swtich stream back to blocking', 580 ); 581 } 582 $this->onRead(strlen($data)); 583 return $data; 584 } 585 586 /** 587 * Write to the socket. 588 * 589 * @param string $buf The data to write 590 */ 591 public function write(string $buf): void { 592 if ($this->handle_ === null) { 593 throw new TException('TSocket: handle_ is null'); 594 } 595 596 $this->onWrite(strlen($buf)); 597 598 if (!$this->sendTimeoutSet_) { 599 stream_set_timeout($this->handle_, 0, $this->sendTimeout_ * 1000); 600 $this->sendTimeoutSet_ = true; 601 } 602 603 while (strlen($buf) > 0) { 604 $buflen = strlen($buf); 605 $t_start = microtime(true); 606 $got = @fwrite($this->handle_, $buf); 607 $write_time = microtime(true) - $t_start; 608 609 if ($got === 0 || !is_int($got)) { 610 $read_err_detail = sprintf( 611 '%d bytes from %s:%d to localhost:%d. Spent %2.2f ms.', 612 $buflen, 613 $this->host_, 614 $this->port_, 615 $this->lport_, 616 $write_time * 1000, 617 ); 618 $md = stream_get_meta_data($this->handle_); 619 if ($md['timed_out']) { 620 throw new TTransportException( 621 'TSocket: timeout while writing '.$read_err_detail, 622 TTransportException::TIMED_OUT, 623 ); 624 } else { 625 $md_str = str_replace("\n", " ", print_r($md, true)); 626 throw new TTransportException( 627 'TSocket: could not write '.$read_err_detail, 628 TTransportException::COULD_NOT_WRITE, 629 ); 630 } 631 } 632 $buf = substr($buf, $got); 633 } 634 635 $this->writeAttemptStart_ = null; 636 } 637 638 /** 639 * Flush output to the socket. 640 */ 641 public function flush(): void { 642 $ret = fflush($this->handle_); 643 644 if ($ret === false) { 645 throw new TTransportException( 646 'TSocket: could not flush '.$this->host_.':'.$this->port_, 647 ); 648 } 649 } 650} 651