1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #ifdef HAVE_SYS_UN_H
28 #include <sys/un.h>
29 #endif
30 #ifdef HAVE_SYS_POLL_H
31 #include <sys/poll.h>
32 #endif
33 #include <sys/types.h>
34 #ifdef HAVE_NETINET_IN_H
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #endif
38 #ifdef HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41 #include <fcntl.h>
42 
43 #include <thrift/concurrency/Monitor.h>
44 #include <thrift/transport/TSocket.h>
45 #include <thrift/transport/TTransportException.h>
46 #include <thrift/transport/PlatformSocket.h>
47 
48 #ifndef SOCKOPT_CAST_T
49 #   ifndef _WIN32
50 #       define SOCKOPT_CAST_T void
51 #   else
52 #       define SOCKOPT_CAST_T char
53 #   endif // _WIN32
54 #endif
55 
56 template<class T>
const_cast_sockopt(const T * v)57 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v)
58 {
59     return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
60 }
61 
62 template<class T>
cast_sockopt(T * v)63 inline SOCKOPT_CAST_T* cast_sockopt(T* v)
64 {
65     return reinterpret_cast<SOCKOPT_CAST_T*>(v);
66 }
67 
68 namespace apache
69 {
70 namespace thrift
71 {
72 namespace transport
73 {
74 
75 using namespace std;
76 
// Global counter of socket system calls issued through TSocket. In this file
// it is incremented once after each recv() in TSocket::read() and once after
// each send() in TSocket::write_partial(). It is a plain uint32_t and this
// file applies no synchronization to it.
uint32_t g_socket_syscalls = 0;
79 
80 /**
81  * TSocket implementation.
82  *
83  */
84 
TSocket(string host,int port)85 TSocket::TSocket(string host, int port) :
86     host_(host),
87     port_(port),
88     path_(""),
89     socket_(THRIFT_INVALID_SOCKET),
90     connTimeout_(0),
91     sendTimeout_(0),
92     recvTimeout_(0),
93     lingerOn_(1),
94     lingerVal_(0),
95     noDelay_(1),
96     maxRecvRetries_(5)
97 {
98     recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
99     recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
100 }
101 
TSocket(string path)102 TSocket::TSocket(string path) :
103     host_(""),
104     port_(0),
105     path_(path),
106     socket_(THRIFT_INVALID_SOCKET),
107     connTimeout_(0),
108     sendTimeout_(0),
109     recvTimeout_(0),
110     lingerOn_(1),
111     lingerVal_(0),
112     noDelay_(1),
113     maxRecvRetries_(5)
114 {
115     recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
116     recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
117     cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
118 }
119 
TSocket()120 TSocket::TSocket() :
121     host_(""),
122     port_(0),
123     path_(""),
124     socket_(THRIFT_INVALID_SOCKET),
125     connTimeout_(0),
126     sendTimeout_(0),
127     recvTimeout_(0),
128     lingerOn_(1),
129     lingerVal_(0),
130     noDelay_(1),
131     maxRecvRetries_(5)
132 {
133     recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
134     recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
135     cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
136 }
137 
TSocket(THRIFT_SOCKET socket)138 TSocket::TSocket(THRIFT_SOCKET socket) :
139     host_(""),
140     port_(0),
141     path_(""),
142     socket_(socket),
143     connTimeout_(0),
144     sendTimeout_(0),
145     recvTimeout_(0),
146     lingerOn_(1),
147     lingerVal_(0),
148     noDelay_(1),
149     maxRecvRetries_(5)
150 {
151     recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
152     recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
153     cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
154 }
155 
~TSocket()156 TSocket::~TSocket()
157 {
158     close();
159 }
160 
isOpen()161 bool TSocket::isOpen()
162 {
163     return (socket_ != THRIFT_INVALID_SOCKET);
164 }
165 
peek()166 bool TSocket::peek()
167 {
168     if (!isOpen())
169     {
170         return false;
171     }
172 
173     uint8_t buf;
174     int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
175 
176     if (r == -1)
177     {
178         int errno_copy = THRIFT_GET_SOCKET_ERROR;
179 #if defined __FreeBSD__ || defined __MACH__
180 
181         /* shigin:
182          * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
183          * the other side
184          */
185         if (errno_copy == THRIFT_ECONNRESET)
186         {
187             close();
188             return false;
189         }
190 
191 #endif
192         GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
193         throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
194     }
195 
196     return (r > 0);
197 }
198 
openConnection(struct addrinfo * res)199 void TSocket::openConnection(struct addrinfo* res)
200 {
201 
202     if (isOpen())
203     {
204         return;
205     }
206 
207     if (! path_.empty())
208     {
209         socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
210     }
211     else
212     {
213         socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
214     }
215 
216     if (socket_ == THRIFT_INVALID_SOCKET)
217     {
218         int errno_copy = THRIFT_GET_SOCKET_ERROR;
219         GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
220         throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
221     }
222 
223     // Send timeout
224     if (sendTimeout_ > 0)
225     {
226         setSendTimeout(sendTimeout_);
227     }
228 
229     // Recv timeout
230     if (recvTimeout_ > 0)
231     {
232         setRecvTimeout(recvTimeout_);
233     }
234 
235     // Linger
236     setLinger(lingerOn_, lingerVal_);
237 
238     // No delay
239     setNoDelay(noDelay_);
240 
241     // Uses a low min RTO if asked to.
242 #ifdef TCP_LOW_MIN_RTO
243 
244     if (getUseLowMinRto())
245     {
246         int one = 1;
247         setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
248     }
249 
250 #endif
251 
252 
253     // Set the socket to be non blocking for connect if a timeout exists
254     int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
255 
256     if (connTimeout_ > 0)
257     {
258         if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK))
259         {
260             int errno_copy = THRIFT_GET_SOCKET_ERROR;
261             GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
262             throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
263         }
264     }
265     else
266     {
267         if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK))
268         {
269             int errno_copy = THRIFT_GET_SOCKET_ERROR;
270             GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
271             throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
272         }
273     }
274 
275     // Connect the socket
276     int ret;
277 
278     if (! path_.empty())
279     {
280 
281 #ifndef _WIN32
282 
283         struct sockaddr_un address;
284         socklen_t len;
285 
286         if (path_.length() > sizeof(address.sun_path))
287         {
288             int errno_copy = THRIFT_GET_SOCKET_ERROR;
289             GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
290             throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
291         }
292 
293         address.sun_family = AF_UNIX;
294         THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
295         len = sizeof(address);
296         ret = connect(socket_, (struct sockaddr*) &address, len);
297 
298 #else
299         GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
300         throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported");
301 #endif
302 
303     }
304     else
305     {
306         ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
307     }
308 
309     // success case
310     if (ret == 0)
311     {
312         goto done;
313     }
314 
315     if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS) && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK))
316     {
317         int errno_copy = THRIFT_GET_SOCKET_ERROR;
318         GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
319         throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
320     }
321 
322 
323     struct THRIFT_POLLFD fds[1];
324 
325     std::memset(fds, 0, sizeof(fds));
326 
327     fds[0].fd = socket_;
328 
329     fds[0].events = THRIFT_POLLOUT;
330 
331     ret = THRIFT_POLL(fds, 1, connTimeout_);
332 
333     if (ret > 0)
334     {
335         // Ensure the socket is connected and that there are no errors set
336         int val;
337         socklen_t lon;
338         lon = sizeof(int);
339         int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
340 
341         if (ret2 == -1)
342         {
343             int errno_copy = THRIFT_GET_SOCKET_ERROR;
344             GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
345             throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
346         }
347 
348         // no errors on socket, go to town
349         if (val == 0)
350         {
351             goto done;
352         }
353 
354         GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(), val);
355         throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
356     }
357     else if (ret == 0)
358     {
359         // socket timed out
360         string errStr = "TSocket::open() timed out " + getSocketInfo();
361         GlobalOutput(errStr.c_str());
362         throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
363     }
364     else
365     {
366         // error on THRIFT_POLL()
367         int errno_copy = THRIFT_GET_SOCKET_ERROR;
368         GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
369         throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
370     }
371 
372 done:
373     // Set socket back to normal mode (blocking)
374     THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags);
375 
376     if (path_.empty())
377     {
378         setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
379     }
380 }
381 
open()382 void TSocket::open()
383 {
384     if (isOpen())
385     {
386         return;
387     }
388 
389     if (! path_.empty())
390     {
391         unix_open();
392     }
393     else
394     {
395         local_open();
396     }
397 }
398 
unix_open()399 void TSocket::unix_open()
400 {
401     if (! path_.empty())
402     {
403         // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
404         openConnection(NULL);
405     }
406 }
407 
/**
 * Resolves host_/port_ with getaddrinfo() and tries each returned address in
 * turn until one connects. Returns immediately if the socket is already open.
 *
 * @throws TTransportException (NOT_OPEN) if the port is outside 0..0xFFFF or
 *         the host cannot be resolved; if every address fails, the exception
 *         raised for the last address is rethrown.
 */
void TSocket::local_open()
{

#ifdef _WIN32
    TWinsockSingleton::create();
#endif // _WIN32

    if (isOpen())
    {
        return;
    }

    // Validate port number
    const bool portInRange = (port_ >= 0 && port_ <= 0xFFFF);

    if (!portInRange)
    {
        throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");
    }

    // Lookup hints: any address family, stream sockets.
    struct addrinfo hints;
    std::memset(&hints, 0, sizeof(hints));
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

    // The port was validated above, so its decimal form fits the buffer.
    char portText[sizeof("65535")];
    sprintf(portText, "%d", port_);

    struct addrinfo* addrList = NULL;
    const int gaiError = getaddrinfo(host_.c_str(), portText, &hints, &addrList);

    if (gaiError)
    {
        string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(THRIFT_GAI_STRERROR(gaiError));
        GlobalOutput(errStr.c_str());
        close();
        throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket.");
    }

    // Walk the resolved addresses; stop at the first that connects, and
    // propagate the failure only once the list is exhausted.
    for (struct addrinfo* candidate = addrList; candidate; candidate = candidate->ai_next)
    {
        try
        {
            openConnection(candidate);
            break;
        }
        catch (TTransportException&)
        {
            // Drop the half-opened descriptor before retrying or giving up.
            close();

            if (!candidate->ai_next)
            {
                freeaddrinfo(addrList); // cleanup on failure
                throw;
            }
        }
    }

    // Free address structure memory
    freeaddrinfo(addrList);
}
483 
close()484 void TSocket::close()
485 {
486     if (socket_ != THRIFT_INVALID_SOCKET)
487     {
488         shutdown(socket_, THRIFT_SHUT_RDWR);
489         ::THRIFT_CLOSESOCKET(socket_);
490     }
491 
492     socket_ = THRIFT_INVALID_SOCKET;
493 }
494 
setSocketFD(THRIFT_SOCKET socket)495 void TSocket::setSocketFD(THRIFT_SOCKET socket)
496 {
497     if (socket_ != THRIFT_INVALID_SOCKET)
498     {
499         close();
500     }
501 
502     socket_ = socket;
503 }
504 
/**
 * Receives up to len bytes from the socket into buf with a single recv()
 * call, retrying on THRIFT_EINTR and on THRIFT_EAGAIN that does not look like
 * a timeout.
 *
 * @param buf  destination buffer
 * @param len  maximum number of bytes to receive
 * @return number of bytes received; 0 when recv() returned 0 (remote host
 *         closed the socket), and also 0 on THRIFT_ECONNRESET (FreeBSD/Mach
 *         builds) or WSAECONNRESET (Windows builds)
 * @throws TTransportException NOT_OPEN if the socket is not open or recv()
 *         failed with THRIFT_ECONNRESET / THRIFT_ENOTCONN; TIMED_OUT for
 *         THRIFT_EAGAIN that is not retried and for THRIFT_ETIMEDOUT;
 *         UNKNOWN for any other recv() error
 */
uint32_t TSocket::read(uint8_t* buf, uint32_t len)
{
    if (socket_ == THRIFT_INVALID_SOCKET)
    {
        throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
    }

    // Shared retry budget for both the THRIFT_EAGAIN and THRIFT_EINTR paths.
    int32_t retries = 0;

    // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
    // the system is out of resources (an awesome undocumented feature).
    // The following is an approximation of the time interval under which
    // THRIFT_EAGAIN is taken to indicate an out of resources error.
    uint32_t eagainThresholdMicros = 0;

    if (recvTimeout_)
    {
        // if a readTimeout is specified along with a max number of recv retries, then
        // the threshold will ensure that the read timeout is not exceeded even in the
        // case of resource errors
        eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
    }

try_again:
    // Read from the socket
    struct timeval begin;

    if (recvTimeout_ > 0)
    {
        THRIFT_GETTIMEOFDAY(&begin, NULL);
    }
    else
    {
        // if there is no read timeout we don't need the TOD to determine whether
        // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
        begin.tv_sec = begin.tv_usec = 0;
    }

    int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
    int errno_copy = THRIFT_GET_SOCKET_ERROR; //THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
    ++g_socket_syscalls;

    // Check for error on read
    if (got < 0)
    {
        if (errno_copy == THRIFT_EAGAIN)
        {
            // if no timeout we can assume that resource exhaustion has occurred.
            if (recvTimeout_ == 0)
            {
                throw TTransportException(TTransportException::TIMED_OUT,
                                          "THRIFT_EAGAIN (unavailable resources)");
            }

            // check if this is the lack of resources or timeout case
            struct timeval end;
            THRIFT_GETTIMEOFDAY(&end, NULL);
            uint32_t readElapsedMicros =  static_cast<uint32_t>(
                                              ((end.tv_sec - begin.tv_sec) * 1000 * 1000)
                                              + (((uint64_t)(end.tv_usec - begin.tv_usec))));

            // recv() came back sooner than the threshold: treat it as a
            // resource shortage and retry (after a short sleep) while the
            // retry budget lasts.
            if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros))
            {
                if (retries++ < maxRecvRetries_)
                {
                    THRIFT_SLEEP_USEC(50);
                    goto try_again;
                }
                else
                {
                    throw TTransportException(TTransportException::TIMED_OUT,
                                              "THRIFT_EAGAIN (unavailable resources)");
                }
            }
            else
            {
                // infer that timeout has been hit
                throw TTransportException(TTransportException::TIMED_OUT,
                                          "THRIFT_EAGAIN (timed out)");
            }
        }

        // If interrupted, try again
        if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_)
        {
            goto try_again;
        }

#if defined __FreeBSD__ || defined __MACH__

        if (errno_copy == THRIFT_ECONNRESET)
        {
            /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
             * THRIFT_ECONNRESET if peer performed shutdown
             * edhall: eliminated close() since we do that in the destructor.
             */
            return 0;
        }

#endif

#ifdef _WIN32

        if (errno_copy == WSAECONNRESET)
        {
            return 0; // EOF
        }

#endif

        // Not a retry case any more, but a real problem: log it, then map the
        // error code to an exception type.
        GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);

        // If we disconnect with no linger time
        if (errno_copy == THRIFT_ECONNRESET)
        {
            throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
        }

        // The socket is not connected
        if (errno_copy == THRIFT_ENOTCONN)
        {
            throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
        }

        // Timed out!
        if (errno_copy == THRIFT_ETIMEDOUT)
        {
            throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
        }

        // Some other error
        throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
    }

    // The remote host has closed the socket
    if (got == 0)
    {
        // edhall: we used to call close() here, but our caller may want to deal
        // with the socket fd and we'll close() in our destructor in any case.
        return 0;
    }

    // Report how many bytes were placed in buf
    return got;
}
651 
write(const uint8_t * buf,uint32_t len)652 void TSocket::write(const uint8_t* buf, uint32_t len)
653 {
654     uint32_t sent = 0;
655 
656     while (sent < len)
657     {
658         uint32_t b = write_partial(buf + sent, len - sent);
659 
660         if (b == 0)
661         {
662             // This should only happen if the timeout set with SO_SNDTIMEO expired.
663             // Raise an exception.
664             throw TTransportException(TTransportException::TIMED_OUT,
665                                       "send timeout expired");
666         }
667 
668         sent += b;
669     }
670 }
671 
write_partial(const uint8_t * buf,uint32_t len)672 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len)
673 {
674     if (socket_ == THRIFT_INVALID_SOCKET)
675     {
676         throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
677     }
678 
679     uint32_t sent = 0;
680 
681     int flags = 0;
682 #ifdef MSG_NOSIGNAL
683     // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
684     // check for the THRIFT_EPIPE return condition and close the socket in that case
685     flags |= MSG_NOSIGNAL;
686 #endif // ifdef MSG_NOSIGNAL
687 
688     int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
689     ++g_socket_syscalls;
690 
691     if (b < 0)
692     {
693         if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN)
694         {
695             return 0;
696         }
697 
698         // Fail on a send error
699         int errno_copy = THRIFT_GET_SOCKET_ERROR;
700         GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
701 
702         if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET || errno_copy == THRIFT_ENOTCONN)
703         {
704             close();
705             throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
706         }
707 
708         throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
709     }
710 
711     // Fail on blocked send
712     if (b == 0)
713     {
714         throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
715     }
716 
717     return b;
718 }
719 
getHost()720 std::string TSocket::getHost()
721 {
722     return host_;
723 }
724 
getPort()725 int TSocket::getPort()
726 {
727     return port_;
728 }
729 
setHost(string host)730 void TSocket::setHost(string host)
731 {
732     host_ = host;
733 }
734 
setPort(int port)735 void TSocket::setPort(int port)
736 {
737     port_ = port;
738 }
739 
setLinger(bool on,int linger)740 void TSocket::setLinger(bool on, int linger)
741 {
742     lingerOn_ = on;
743     lingerVal_ = linger;
744 
745     if (socket_ == THRIFT_INVALID_SOCKET)
746     {
747         return;
748     }
749 
750     struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
751 
752     int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
753 
754     if (ret == -1)
755     {
756         int errno_copy = THRIFT_GET_SOCKET_ERROR;  // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
757         GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
758     }
759 }
760 
setNoDelay(bool noDelay)761 void TSocket::setNoDelay(bool noDelay)
762 {
763     noDelay_ = noDelay;
764 
765     if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty())
766     {
767         return;
768     }
769 
770     // Set socket to NODELAY
771     int v = noDelay_ ? 1 : 0;
772     int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
773 
774     if (ret == -1)
775     {
776         int errno_copy = THRIFT_GET_SOCKET_ERROR;  // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
777         GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
778     }
779 }
780 
setConnTimeout(int ms)781 void TSocket::setConnTimeout(int ms)
782 {
783     connTimeout_ = ms;
784 }
785 
setRecvTimeout(int ms)786 void TSocket::setRecvTimeout(int ms)
787 {
788     if (ms < 0)
789     {
790         char errBuf[512];
791         sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms);
792         GlobalOutput(errBuf);
793         return;
794     }
795 
796     recvTimeout_ = ms;
797 
798     if (socket_ == THRIFT_INVALID_SOCKET)
799     {
800         return;
801     }
802 
803     recvTimeval_.tv_sec = (int)(recvTimeout_ / 1000);
804     recvTimeval_.tv_usec = (int)((recvTimeout_ % 1000) * 1000);
805 
806     // Copy because THRIFT_POLL may modify
807     struct timeval r = recvTimeval_;
808     int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, cast_sockopt(&r), sizeof(r));
809 
810     if (ret == -1)
811     {
812         int errno_copy = THRIFT_GET_SOCKET_ERROR;  // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
813         GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy);
814     }
815 }
816 
setSendTimeout(int ms)817 void TSocket::setSendTimeout(int ms)
818 {
819     if (ms < 0)
820     {
821         char errBuf[512];
822         sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms);
823         GlobalOutput(errBuf);
824         return;
825     }
826 
827     sendTimeout_ = ms;
828 
829     if (socket_ == THRIFT_INVALID_SOCKET)
830     {
831         return;
832     }
833 
834     struct timeval s = {(int)(sendTimeout_ / 1000),
835                (int)((sendTimeout_ % 1000) * 1000)
836     };
837 
838     int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, cast_sockopt(&s), sizeof(s));
839 
840     if (ret == -1)
841     {
842         int errno_copy = THRIFT_GET_SOCKET_ERROR;  // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
843         GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy);
844     }
845 }
846 
setMaxRecvRetries(int maxRecvRetries)847 void TSocket::setMaxRecvRetries(int maxRecvRetries)
848 {
849     maxRecvRetries_ = maxRecvRetries;
850 }
851 
getSocketInfo()852 string TSocket::getSocketInfo()
853 {
854     std::ostringstream oss;
855 
856     if (host_.empty() || port_ == 0)
857     {
858         oss << "<Host: " << getPeerAddress();
859         oss << " Port: " << getPeerPort() << ">";
860     }
861     else
862     {
863         oss << "<Host: " << host_ << " Port: " << port_ << ">";
864     }
865 
866     return oss.str();
867 }
868 
getPeerHost()869 std::string TSocket::getPeerHost()
870 {
871     if (peerHost_.empty() && path_.empty())
872     {
873         struct sockaddr_storage addr;
874         struct sockaddr* addrPtr;
875         socklen_t addrLen;
876 
877         if (socket_ == THRIFT_INVALID_SOCKET)
878         {
879             return host_;
880         }
881 
882         addrPtr = getCachedAddress(&addrLen);
883 
884         if (addrPtr == NULL)
885         {
886             addrLen = sizeof(addr);
887 
888             if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0)
889             {
890                 return peerHost_;
891             }
892 
893             addrPtr = (sockaddr*)&addr;
894 
895             setCachedAddress(addrPtr, addrLen);
896         }
897 
898         char clienthost[NI_MAXHOST];
899         char clientservice[NI_MAXSERV];
900 
901         getnameinfo((sockaddr*) addrPtr, addrLen,
902                     clienthost, sizeof(clienthost),
903                     clientservice, sizeof(clientservice), 0);
904 
905         peerHost_ = clienthost;
906     }
907 
908     return peerHost_;
909 }
910 
getPeerAddress()911 std::string TSocket::getPeerAddress()
912 {
913     if (peerAddress_.empty() && path_.empty())
914     {
915         struct sockaddr_storage addr;
916         struct sockaddr* addrPtr;
917         socklen_t addrLen;
918 
919         if (socket_ == THRIFT_INVALID_SOCKET)
920         {
921             return peerAddress_;
922         }
923 
924         addrPtr = getCachedAddress(&addrLen);
925 
926         if (addrPtr == NULL)
927         {
928             addrLen = sizeof(addr);
929 
930             if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0)
931             {
932                 return peerAddress_;
933             }
934 
935             addrPtr = (sockaddr*)&addr;
936 
937             setCachedAddress(addrPtr, addrLen);
938         }
939 
940         char clienthost[NI_MAXHOST];
941         char clientservice[NI_MAXSERV];
942 
943         getnameinfo(addrPtr, addrLen,
944                     clienthost, sizeof(clienthost),
945                     clientservice, sizeof(clientservice),
946                     NI_NUMERICHOST | NI_NUMERICSERV);
947 
948         peerAddress_ = clienthost;
949         peerPort_ = std::atoi(clientservice);
950     }
951 
952     return peerAddress_;
953 }
954 
getPeerPort()955 int TSocket::getPeerPort()
956 {
957     getPeerAddress();
958     return peerPort_;
959 }
960 
setCachedAddress(const sockaddr * addr,socklen_t len)961 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len)
962 {
963     if (!path_.empty())
964     {
965         return;
966     }
967 
968     switch (addr->sa_family)
969     {
970         case AF_INET:
971             if (len == sizeof(sockaddr_in))
972             {
973                 memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
974             }
975 
976             break;
977 
978         case AF_INET6:
979             if (len == sizeof(sockaddr_in6))
980             {
981                 memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
982             }
983 
984             break;
985     }
986 }
987 
getCachedAddress(socklen_t * len) const988 sockaddr* TSocket::getCachedAddress(socklen_t* len) const
989 {
990     switch (cachedPeerAddr_.ipv4.sin_family)
991     {
992         case AF_INET:
993             *len = sizeof(sockaddr_in);
994             return (sockaddr*) &cachedPeerAddr_.ipv4;
995 
996         case AF_INET6:
997             *len = sizeof(sockaddr_in6);
998             return (sockaddr*) &cachedPeerAddr_.ipv6;
999 
1000         default:
1001             return NULL;
1002     }
1003 }
1004 
// Class-wide flag, off by default. openConnection() reads it through
// getUseLowMinRto() in builds where TCP_LOW_MIN_RTO is defined.
bool TSocket::useLowMinRto_ = false;
setUseLowMinRto(bool useLowMinRto)1006 void TSocket::setUseLowMinRto(bool useLowMinRto)
1007 {
1008     useLowMinRto_ = useLowMinRto;
1009 }
getUseLowMinRto()1010 bool TSocket::getUseLowMinRto()
1011 {
1012     return useLowMinRto_;
1013 }
1014 
1015 }
1016 }
1017 } // apache::thrift::transport
1018