1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #ifdef HAVE_SYS_UN_H
28 #include <sys/un.h>
29 #endif
30 #ifdef HAVE_SYS_POLL_H
31 #include <sys/poll.h>
32 #endif
33 #include <sys/types.h>
34 #ifdef HAVE_NETINET_IN_H
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #endif
38 #ifdef HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41 #include <fcntl.h>
42
43 #include <thrift/concurrency/Monitor.h>
44 #include <thrift/transport/TSocket.h>
45 #include <thrift/transport/TTransportException.h>
46 #include <thrift/transport/PlatformSocket.h>
47
48 #ifndef SOCKOPT_CAST_T
49 # ifndef _WIN32
50 # define SOCKOPT_CAST_T void
51 # else
52 # define SOCKOPT_CAST_T char
53 # endif // _WIN32
54 #endif
55
56 template<class T>
const_cast_sockopt(const T * v)57 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v)
58 {
59 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
60 }
61
62 template<class T>
cast_sockopt(T * v)63 inline SOCKOPT_CAST_T* cast_sockopt(T* v)
64 {
65 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
66 }
67
68 namespace apache
69 {
70 namespace thrift
71 {
72 namespace transport
73 {
74
75 using namespace std;
76
77 // Global var to track total socket sys calls
78 uint32_t g_socket_syscalls = 0;
79
80 /**
81 * TSocket implementation.
82 *
83 */
84
TSocket(string host,int port)85 TSocket::TSocket(string host, int port) :
86 host_(host),
87 port_(port),
88 path_(""),
89 socket_(THRIFT_INVALID_SOCKET),
90 connTimeout_(0),
91 sendTimeout_(0),
92 recvTimeout_(0),
93 lingerOn_(1),
94 lingerVal_(0),
95 noDelay_(1),
96 maxRecvRetries_(5)
97 {
98 recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
99 recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
100 }
101
TSocket(string path)102 TSocket::TSocket(string path) :
103 host_(""),
104 port_(0),
105 path_(path),
106 socket_(THRIFT_INVALID_SOCKET),
107 connTimeout_(0),
108 sendTimeout_(0),
109 recvTimeout_(0),
110 lingerOn_(1),
111 lingerVal_(0),
112 noDelay_(1),
113 maxRecvRetries_(5)
114 {
115 recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
116 recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
117 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
118 }
119
TSocket()120 TSocket::TSocket() :
121 host_(""),
122 port_(0),
123 path_(""),
124 socket_(THRIFT_INVALID_SOCKET),
125 connTimeout_(0),
126 sendTimeout_(0),
127 recvTimeout_(0),
128 lingerOn_(1),
129 lingerVal_(0),
130 noDelay_(1),
131 maxRecvRetries_(5)
132 {
133 recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
134 recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
135 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
136 }
137
TSocket(THRIFT_SOCKET socket)138 TSocket::TSocket(THRIFT_SOCKET socket) :
139 host_(""),
140 port_(0),
141 path_(""),
142 socket_(socket),
143 connTimeout_(0),
144 sendTimeout_(0),
145 recvTimeout_(0),
146 lingerOn_(1),
147 lingerVal_(0),
148 noDelay_(1),
149 maxRecvRetries_(5)
150 {
151 recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
152 recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
153 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
154 }
155
~TSocket()156 TSocket::~TSocket()
157 {
158 close();
159 }
160
isOpen()161 bool TSocket::isOpen()
162 {
163 return (socket_ != THRIFT_INVALID_SOCKET);
164 }
165
peek()166 bool TSocket::peek()
167 {
168 if (!isOpen())
169 {
170 return false;
171 }
172
173 uint8_t buf;
174 int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
175
176 if (r == -1)
177 {
178 int errno_copy = THRIFT_GET_SOCKET_ERROR;
179 #if defined __FreeBSD__ || defined __MACH__
180
181 /* shigin:
182 * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
183 * the other side
184 */
185 if (errno_copy == THRIFT_ECONNRESET)
186 {
187 close();
188 return false;
189 }
190
191 #endif
192 GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
193 throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
194 }
195
196 return (r > 0);
197 }
198
openConnection(struct addrinfo * res)199 void TSocket::openConnection(struct addrinfo* res)
200 {
201
202 if (isOpen())
203 {
204 return;
205 }
206
207 if (! path_.empty())
208 {
209 socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
210 }
211 else
212 {
213 socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
214 }
215
216 if (socket_ == THRIFT_INVALID_SOCKET)
217 {
218 int errno_copy = THRIFT_GET_SOCKET_ERROR;
219 GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
220 throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
221 }
222
223 // Send timeout
224 if (sendTimeout_ > 0)
225 {
226 setSendTimeout(sendTimeout_);
227 }
228
229 // Recv timeout
230 if (recvTimeout_ > 0)
231 {
232 setRecvTimeout(recvTimeout_);
233 }
234
235 // Linger
236 setLinger(lingerOn_, lingerVal_);
237
238 // No delay
239 setNoDelay(noDelay_);
240
241 // Uses a low min RTO if asked to.
242 #ifdef TCP_LOW_MIN_RTO
243
244 if (getUseLowMinRto())
245 {
246 int one = 1;
247 setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
248 }
249
250 #endif
251
252
253 // Set the socket to be non blocking for connect if a timeout exists
254 int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
255
256 if (connTimeout_ > 0)
257 {
258 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK))
259 {
260 int errno_copy = THRIFT_GET_SOCKET_ERROR;
261 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
262 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
263 }
264 }
265 else
266 {
267 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK))
268 {
269 int errno_copy = THRIFT_GET_SOCKET_ERROR;
270 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
271 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
272 }
273 }
274
275 // Connect the socket
276 int ret;
277
278 if (! path_.empty())
279 {
280
281 #ifndef _WIN32
282
283 struct sockaddr_un address;
284 socklen_t len;
285
286 if (path_.length() > sizeof(address.sun_path))
287 {
288 int errno_copy = THRIFT_GET_SOCKET_ERROR;
289 GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
290 throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
291 }
292
293 address.sun_family = AF_UNIX;
294 THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
295 len = sizeof(address);
296 ret = connect(socket_, (struct sockaddr*) &address, len);
297
298 #else
299 GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
300 throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported");
301 #endif
302
303 }
304 else
305 {
306 ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
307 }
308
309 // success case
310 if (ret == 0)
311 {
312 goto done;
313 }
314
315 if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS) && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK))
316 {
317 int errno_copy = THRIFT_GET_SOCKET_ERROR;
318 GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
319 throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
320 }
321
322
323 struct THRIFT_POLLFD fds[1];
324
325 std::memset(fds, 0, sizeof(fds));
326
327 fds[0].fd = socket_;
328
329 fds[0].events = THRIFT_POLLOUT;
330
331 ret = THRIFT_POLL(fds, 1, connTimeout_);
332
333 if (ret > 0)
334 {
335 // Ensure the socket is connected and that there are no errors set
336 int val;
337 socklen_t lon;
338 lon = sizeof(int);
339 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
340
341 if (ret2 == -1)
342 {
343 int errno_copy = THRIFT_GET_SOCKET_ERROR;
344 GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
345 throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
346 }
347
348 // no errors on socket, go to town
349 if (val == 0)
350 {
351 goto done;
352 }
353
354 GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(), val);
355 throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
356 }
357 else if (ret == 0)
358 {
359 // socket timed out
360 string errStr = "TSocket::open() timed out " + getSocketInfo();
361 GlobalOutput(errStr.c_str());
362 throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
363 }
364 else
365 {
366 // error on THRIFT_POLL()
367 int errno_copy = THRIFT_GET_SOCKET_ERROR;
368 GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
369 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
370 }
371
372 done:
373 // Set socket back to normal mode (blocking)
374 THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags);
375
376 if (path_.empty())
377 {
378 setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
379 }
380 }
381
open()382 void TSocket::open()
383 {
384 if (isOpen())
385 {
386 return;
387 }
388
389 if (! path_.empty())
390 {
391 unix_open();
392 }
393 else
394 {
395 local_open();
396 }
397 }
398
unix_open()399 void TSocket::unix_open()
400 {
401 if (! path_.empty())
402 {
403 // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
404 openConnection(NULL);
405 }
406 }
407
local_open()408 void TSocket::local_open()
409 {
410
411 #ifdef _WIN32
412 TWinsockSingleton::create();
413 #endif // _WIN32
414
415 if (isOpen())
416 {
417 return;
418 }
419
420 // Validate port number
421 if (port_ < 0 || port_ > 0xFFFF)
422 {
423 throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");
424 }
425
426 struct addrinfo hints, *res, *res0;
427
428 res = NULL;
429
430 res0 = NULL;
431
432 int error;
433
434 char port[sizeof("65535")];
435
436 std::memset(&hints, 0, sizeof(hints));
437
438 hints.ai_family = PF_UNSPEC;
439
440 hints.ai_socktype = SOCK_STREAM;
441
442 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
443
444 sprintf(port, "%d", port_);
445
446 error = getaddrinfo(host_.c_str(), port, &hints, &res0);
447
448 if (error)
449 {
450 string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(THRIFT_GAI_STRERROR(error));
451 GlobalOutput(errStr.c_str());
452 close();
453 throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket.");
454 }
455
456 // Cycle through all the returned addresses until one
457 // connects or push the exception up.
458 for (res = res0; res; res = res->ai_next)
459 {
460 try
461 {
462 openConnection(res);
463 break;
464 }
465 catch (TTransportException&)
466 {
467 if (res->ai_next)
468 {
469 close();
470 }
471 else
472 {
473 close();
474 freeaddrinfo(res0); // cleanup on failure
475 throw;
476 }
477 }
478 }
479
480 // Free address structure memory
481 freeaddrinfo(res0);
482 }
483
close()484 void TSocket::close()
485 {
486 if (socket_ != THRIFT_INVALID_SOCKET)
487 {
488 shutdown(socket_, THRIFT_SHUT_RDWR);
489 ::THRIFT_CLOSESOCKET(socket_);
490 }
491
492 socket_ = THRIFT_INVALID_SOCKET;
493 }
494
setSocketFD(THRIFT_SOCKET socket)495 void TSocket::setSocketFD(THRIFT_SOCKET socket)
496 {
497 if (socket_ != THRIFT_INVALID_SOCKET)
498 {
499 close();
500 }
501
502 socket_ = socket;
503 }
504
read(uint8_t * buf,uint32_t len)505 uint32_t TSocket::read(uint8_t* buf, uint32_t len)
506 {
507 if (socket_ == THRIFT_INVALID_SOCKET)
508 {
509 throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
510 }
511
512 int32_t retries = 0;
513
514 // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
515 // the system is out of resources (an awesome undocumented feature).
516 // The following is an approximation of the time interval under which
517 // THRIFT_EAGAIN is taken to indicate an out of resources error.
518 uint32_t eagainThresholdMicros = 0;
519
520 if (recvTimeout_)
521 {
522 // if a readTimeout is specified along with a max number of recv retries, then
523 // the threshold will ensure that the read timeout is not exceeded even in the
524 // case of resource errors
525 eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
526 }
527
528 try_again:
529 // Read from the socket
530 struct timeval begin;
531
532 if (recvTimeout_ > 0)
533 {
534 THRIFT_GETTIMEOFDAY(&begin, NULL);
535 }
536 else
537 {
538 // if there is no read timeout we don't need the TOD to determine whether
539 // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
540 begin.tv_sec = begin.tv_usec = 0;
541 }
542
543 int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
544 int errno_copy = THRIFT_GET_SOCKET_ERROR; //THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
545 ++g_socket_syscalls;
546
547 // Check for error on read
548 if (got < 0)
549 {
550 if (errno_copy == THRIFT_EAGAIN)
551 {
552 // if no timeout we can assume that resource exhaustion has occurred.
553 if (recvTimeout_ == 0)
554 {
555 throw TTransportException(TTransportException::TIMED_OUT,
556 "THRIFT_EAGAIN (unavailable resources)");
557 }
558
559 // check if this is the lack of resources or timeout case
560 struct timeval end;
561 THRIFT_GETTIMEOFDAY(&end, NULL);
562 uint32_t readElapsedMicros = static_cast<uint32_t>(
563 ((end.tv_sec - begin.tv_sec) * 1000 * 1000)
564 + (((uint64_t)(end.tv_usec - begin.tv_usec))));
565
566 if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros))
567 {
568 if (retries++ < maxRecvRetries_)
569 {
570 THRIFT_SLEEP_USEC(50);
571 goto try_again;
572 }
573 else
574 {
575 throw TTransportException(TTransportException::TIMED_OUT,
576 "THRIFT_EAGAIN (unavailable resources)");
577 }
578 }
579 else
580 {
581 // infer that timeout has been hit
582 throw TTransportException(TTransportException::TIMED_OUT,
583 "THRIFT_EAGAIN (timed out)");
584 }
585 }
586
587 // If interrupted, try again
588 if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_)
589 {
590 goto try_again;
591 }
592
593 #if defined __FreeBSD__ || defined __MACH__
594
595 if (errno_copy == THRIFT_ECONNRESET)
596 {
597 /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
598 * THRIFT_ECONNRESET if peer performed shutdown
599 * edhall: eliminated close() since we do that in the destructor.
600 */
601 return 0;
602 }
603
604 #endif
605
606 #ifdef _WIN32
607
608 if (errno_copy == WSAECONNRESET)
609 {
610 return 0; // EOF
611 }
612
613 #endif
614
615 // Now it's not a try again case, but a real probblez
616 GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
617
618 // If we disconnect with no linger time
619 if (errno_copy == THRIFT_ECONNRESET)
620 {
621 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
622 }
623
624 // This ish isn't open
625 if (errno_copy == THRIFT_ENOTCONN)
626 {
627 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
628 }
629
630 // Timed out!
631 if (errno_copy == THRIFT_ETIMEDOUT)
632 {
633 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
634 }
635
636 // Some other error, whatevz
637 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
638 }
639
640 // The remote host has closed the socket
641 if (got == 0)
642 {
643 // edhall: we used to call close() here, but our caller may want to deal
644 // with the socket fd and we'll close() in our destructor in any case.
645 return 0;
646 }
647
648 // Pack data into string
649 return got;
650 }
651
write(const uint8_t * buf,uint32_t len)652 void TSocket::write(const uint8_t* buf, uint32_t len)
653 {
654 uint32_t sent = 0;
655
656 while (sent < len)
657 {
658 uint32_t b = write_partial(buf + sent, len - sent);
659
660 if (b == 0)
661 {
662 // This should only happen if the timeout set with SO_SNDTIMEO expired.
663 // Raise an exception.
664 throw TTransportException(TTransportException::TIMED_OUT,
665 "send timeout expired");
666 }
667
668 sent += b;
669 }
670 }
671
write_partial(const uint8_t * buf,uint32_t len)672 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len)
673 {
674 if (socket_ == THRIFT_INVALID_SOCKET)
675 {
676 throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
677 }
678
679 uint32_t sent = 0;
680
681 int flags = 0;
682 #ifdef MSG_NOSIGNAL
683 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
684 // check for the THRIFT_EPIPE return condition and close the socket in that case
685 flags |= MSG_NOSIGNAL;
686 #endif // ifdef MSG_NOSIGNAL
687
688 int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
689 ++g_socket_syscalls;
690
691 if (b < 0)
692 {
693 if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN)
694 {
695 return 0;
696 }
697
698 // Fail on a send error
699 int errno_copy = THRIFT_GET_SOCKET_ERROR;
700 GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
701
702 if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET || errno_copy == THRIFT_ENOTCONN)
703 {
704 close();
705 throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
706 }
707
708 throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
709 }
710
711 // Fail on blocked send
712 if (b == 0)
713 {
714 throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
715 }
716
717 return b;
718 }
719
getHost()720 std::string TSocket::getHost()
721 {
722 return host_;
723 }
724
getPort()725 int TSocket::getPort()
726 {
727 return port_;
728 }
729
setHost(string host)730 void TSocket::setHost(string host)
731 {
732 host_ = host;
733 }
734
setPort(int port)735 void TSocket::setPort(int port)
736 {
737 port_ = port;
738 }
739
setLinger(bool on,int linger)740 void TSocket::setLinger(bool on, int linger)
741 {
742 lingerOn_ = on;
743 lingerVal_ = linger;
744
745 if (socket_ == THRIFT_INVALID_SOCKET)
746 {
747 return;
748 }
749
750 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
751
752 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
753
754 if (ret == -1)
755 {
756 int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
757 GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
758 }
759 }
760
setNoDelay(bool noDelay)761 void TSocket::setNoDelay(bool noDelay)
762 {
763 noDelay_ = noDelay;
764
765 if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty())
766 {
767 return;
768 }
769
770 // Set socket to NODELAY
771 int v = noDelay_ ? 1 : 0;
772 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
773
774 if (ret == -1)
775 {
776 int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
777 GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
778 }
779 }
780
setConnTimeout(int ms)781 void TSocket::setConnTimeout(int ms)
782 {
783 connTimeout_ = ms;
784 }
785
setRecvTimeout(int ms)786 void TSocket::setRecvTimeout(int ms)
787 {
788 if (ms < 0)
789 {
790 char errBuf[512];
791 sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms);
792 GlobalOutput(errBuf);
793 return;
794 }
795
796 recvTimeout_ = ms;
797
798 if (socket_ == THRIFT_INVALID_SOCKET)
799 {
800 return;
801 }
802
803 recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
804 recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
805
806 // Copy because THRIFT_POLL may modify
807 struct timeval r = recvTimeval_;
808 int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, cast_sockopt(&r), sizeof(r));
809
810 if (ret == -1)
811 {
812 int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
813 GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy);
814 }
815 }
816
setSendTimeout(int ms)817 void TSocket::setSendTimeout(int ms)
818 {
819 if (ms < 0)
820 {
821 char errBuf[512];
822 sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms);
823 GlobalOutput(errBuf);
824 return;
825 }
826
827 sendTimeout_ = ms;
828
829 if (socket_ == THRIFT_INVALID_SOCKET)
830 {
831 return;
832 }
833
834 struct timeval s = {(int)(sendTimeout_ / 1000),
835 (int)((sendTimeout_ % 1000) * 1000)
836 };
837
838 int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, cast_sockopt(&s), sizeof(s));
839
840 if (ret == -1)
841 {
842 int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
843 GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy);
844 }
845 }
846
setMaxRecvRetries(int maxRecvRetries)847 void TSocket::setMaxRecvRetries(int maxRecvRetries)
848 {
849 maxRecvRetries_ = maxRecvRetries;
850 }
851
getSocketInfo()852 string TSocket::getSocketInfo()
853 {
854 std::ostringstream oss;
855
856 if (host_.empty() || port_ == 0)
857 {
858 oss << "<Host: " << getPeerAddress();
859 oss << " Port: " << getPeerPort() << ">";
860 }
861 else
862 {
863 oss << "<Host: " << host_ << " Port: " << port_ << ">";
864 }
865
866 return oss.str();
867 }
868
getPeerHost()869 std::string TSocket::getPeerHost()
870 {
871 if (peerHost_.empty() && path_.empty())
872 {
873 struct sockaddr_storage addr;
874 struct sockaddr* addrPtr;
875 socklen_t addrLen;
876
877 if (socket_ == THRIFT_INVALID_SOCKET)
878 {
879 return host_;
880 }
881
882 addrPtr = getCachedAddress(&addrLen);
883
884 if (addrPtr == NULL)
885 {
886 addrLen = sizeof(addr);
887
888 if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0)
889 {
890 return peerHost_;
891 }
892
893 addrPtr = (sockaddr*)&addr;
894
895 setCachedAddress(addrPtr, addrLen);
896 }
897
898 char clienthost[NI_MAXHOST];
899 char clientservice[NI_MAXSERV];
900
901 getnameinfo((sockaddr*) addrPtr, addrLen,
902 clienthost, sizeof(clienthost),
903 clientservice, sizeof(clientservice), 0);
904
905 peerHost_ = clienthost;
906 }
907
908 return peerHost_;
909 }
910
getPeerAddress()911 std::string TSocket::getPeerAddress()
912 {
913 if (peerAddress_.empty() && path_.empty())
914 {
915 struct sockaddr_storage addr;
916 struct sockaddr* addrPtr;
917 socklen_t addrLen;
918
919 if (socket_ == THRIFT_INVALID_SOCKET)
920 {
921 return peerAddress_;
922 }
923
924 addrPtr = getCachedAddress(&addrLen);
925
926 if (addrPtr == NULL)
927 {
928 addrLen = sizeof(addr);
929
930 if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0)
931 {
932 return peerAddress_;
933 }
934
935 addrPtr = (sockaddr*)&addr;
936
937 setCachedAddress(addrPtr, addrLen);
938 }
939
940 char clienthost[NI_MAXHOST];
941 char clientservice[NI_MAXSERV];
942
943 getnameinfo(addrPtr, addrLen,
944 clienthost, sizeof(clienthost),
945 clientservice, sizeof(clientservice),
946 NI_NUMERICHOST | NI_NUMERICSERV);
947
948 peerAddress_ = clienthost;
949 peerPort_ = std::atoi(clientservice);
950 }
951
952 return peerAddress_;
953 }
954
getPeerPort()955 int TSocket::getPeerPort()
956 {
957 getPeerAddress();
958 return peerPort_;
959 }
960
setCachedAddress(const sockaddr * addr,socklen_t len)961 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len)
962 {
963 if (!path_.empty())
964 {
965 return;
966 }
967
968 switch (addr->sa_family)
969 {
970 case AF_INET:
971 if (len == sizeof(sockaddr_in))
972 {
973 memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
974 }
975
976 break;
977
978 case AF_INET6:
979 if (len == sizeof(sockaddr_in6))
980 {
981 memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
982 }
983
984 break;
985 }
986 }
987
getCachedAddress(socklen_t * len) const988 sockaddr* TSocket::getCachedAddress(socklen_t* len) const
989 {
990 switch (cachedPeerAddr_.ipv4.sin_family)
991 {
992 case AF_INET:
993 *len = sizeof(sockaddr_in);
994 return (sockaddr*) &cachedPeerAddr_.ipv4;
995
996 case AF_INET6:
997 *len = sizeof(sockaddr_in6);
998 return (sockaddr*) &cachedPeerAddr_.ipv6;
999
1000 default:
1001 return NULL;
1002 }
1003 }
1004
1005 bool TSocket::useLowMinRto_ = false;
setUseLowMinRto(bool useLowMinRto)1006 void TSocket::setUseLowMinRto(bool useLowMinRto)
1007 {
1008 useLowMinRto_ = useLowMinRto;
1009 }
getUseLowMinRto()1010 bool TSocket::getUseLowMinRto()
1011 {
1012 return useLowMinRto_;
1013 }
1014
1015 }
1016 }
1017 } // apache::thrift::transport
1018