1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_IOCTL_H
25 #include <sys/ioctl.h>
26 #endif
27 #ifdef HAVE_SYS_SOCKET_H
28 #include <sys/socket.h>
29 #endif
30 #ifdef HAVE_SYS_UN_H
31 #include <sys/un.h>
32 #endif
33 #ifdef HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #endif
36 #include <sys/types.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #endif
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 #include <fcntl.h>
45 
46 #include <thrift/concurrency/Monitor.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TTransportException.h>
49 #include <thrift/transport/PlatformSocket.h>
50 
51 #ifndef SOCKOPT_CAST_T
52 #ifndef _WIN32
53 #define SOCKOPT_CAST_T void
54 #else
55 #define SOCKOPT_CAST_T char
56 #endif // _WIN32
57 #endif
58 
59 template <class T>
const_cast_sockopt(const T * v)60 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
61   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
62 }
63 
64 template <class T>
cast_sockopt(T * v)65 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
66   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
67 }
68 
69 using std::string;
70 
71 namespace apache {
72 namespace thrift {
73 namespace transport {
74 
75 /**
76  * TSocket implementation.
77  *
78  */
79 
TSocket(const string & host,int port)80 TSocket::TSocket(const string& host, int port)
81   : host_(host),
82     port_(port),
83     socket_(THRIFT_INVALID_SOCKET),
84     peerPort_(0),
85     connTimeout_(0),
86     sendTimeout_(0),
87     recvTimeout_(0),
88     keepAlive_(false),
89     lingerOn_(1),
90     lingerVal_(0),
91     noDelay_(1),
92     maxRecvRetries_(5) {
93 }
94 
TSocket(const string & path)95 TSocket::TSocket(const string& path)
96   : port_(0),
97     path_(path),
98     socket_(THRIFT_INVALID_SOCKET),
99     peerPort_(0),
100     connTimeout_(0),
101     sendTimeout_(0),
102     recvTimeout_(0),
103     keepAlive_(false),
104     lingerOn_(1),
105     lingerVal_(0),
106     noDelay_(1),
107     maxRecvRetries_(5) {
108   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
109 }
110 
TSocket()111 TSocket::TSocket()
112   : port_(0),
113     socket_(THRIFT_INVALID_SOCKET),
114     peerPort_(0),
115     connTimeout_(0),
116     sendTimeout_(0),
117     recvTimeout_(0),
118     keepAlive_(false),
119     lingerOn_(1),
120     lingerVal_(0),
121     noDelay_(1),
122     maxRecvRetries_(5) {
123   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
124 }
125 
TSocket(THRIFT_SOCKET socket)126 TSocket::TSocket(THRIFT_SOCKET socket)
127   : port_(0),
128     socket_(socket),
129     peerPort_(0),
130     connTimeout_(0),
131     sendTimeout_(0),
132     recvTimeout_(0),
133     keepAlive_(false),
134     lingerOn_(1),
135     lingerVal_(0),
136     noDelay_(1),
137     maxRecvRetries_(5) {
138   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
139 #ifdef SO_NOSIGPIPE
140   {
141     int one = 1;
142     setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
143   }
144 #endif
145 }
146 
TSocket(THRIFT_SOCKET socket,stdcxx::shared_ptr<THRIFT_SOCKET> interruptListener)147 TSocket::TSocket(THRIFT_SOCKET socket, stdcxx::shared_ptr<THRIFT_SOCKET> interruptListener)
148   : port_(0),
149     socket_(socket),
150     peerPort_(0),
151     interruptListener_(interruptListener),
152     connTimeout_(0),
153     sendTimeout_(0),
154     recvTimeout_(0),
155     keepAlive_(false),
156     lingerOn_(1),
157     lingerVal_(0),
158     noDelay_(1),
159     maxRecvRetries_(5) {
160   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
161 #ifdef SO_NOSIGPIPE
162   {
163     int one = 1;
164     setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
165   }
166 #endif
167 }
168 
~TSocket()169 TSocket::~TSocket() {
170   close();
171 }
172 
hasPendingDataToRead()173 bool TSocket::hasPendingDataToRead() {
174   if (!isOpen()) {
175     return false;
176   }
177 
178   int32_t retries = 0;
179   THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
180 try_again:
181   int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
182   if (r == -1) {
183     int errno_copy = THRIFT_GET_SOCKET_ERROR;
184     if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
185       goto try_again;
186     }
187     GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
188     throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
189   }
190   return numBytesAvailable > 0;
191 }
192 
isOpen()193 bool TSocket::isOpen() {
194   return (socket_ != THRIFT_INVALID_SOCKET);
195 }
196 
peek()197 bool TSocket::peek() {
198   if (!isOpen()) {
199     return false;
200   }
201   if (interruptListener_) {
202     for (int retries = 0;;) {
203       struct THRIFT_POLLFD fds[2];
204       std::memset(fds, 0, sizeof(fds));
205       fds[0].fd = socket_;
206       fds[0].events = THRIFT_POLLIN;
207       fds[1].fd = *(interruptListener_.get());
208       fds[1].events = THRIFT_POLLIN;
209       int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
210       int errno_copy = THRIFT_GET_SOCKET_ERROR;
211       if (ret < 0) {
212         // error cases
213         if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
214           continue;
215         }
216         GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
217         throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
218       } else if (ret > 0) {
219         // Check the interruptListener
220         if (fds[1].revents & THRIFT_POLLIN) {
221           return false;
222         }
223         // There must be data or a disconnection, fall through to the PEEK
224         break;
225       } else {
226         // timeout
227         return false;
228       }
229     }
230   }
231 
232   // Check to see if data is available or if the remote side closed
233   uint8_t buf;
234   int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
235   if (r == -1) {
236     int errno_copy = THRIFT_GET_SOCKET_ERROR;
237 #if defined __FreeBSD__ || defined __MACH__
238     /* shigin:
239      * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
240      * the other side
241      */
242     if (errno_copy == THRIFT_ECONNRESET) {
243       return false;
244     }
245 #endif
246     GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
247     throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
248   }
249   return (r > 0);
250 }
251 
/**
 * Creates the socket, applies the configured options and connects it.
 *
 * Returns immediately when isOpen() is already true. When path_ is non-empty
 * a Unix domain socket is created and connected to path_; otherwise the
 * socket is created for, and connected to, the address described by res.
 *
 * This function does not close the socket on failure: when it throws after
 * socket() succeeded, socket_ still holds the descriptor.
 *
 * @param res address to connect to. It is not dereferenced when path_ is
 *            non-empty (unix_open() passes NULL in that case).
 * @throws TTransportException (NOT_OPEN) when socket creation, fcntl,
 *         connect, poll or the post-connect SO_ERROR check fails, when the
 *         connect times out, or when the Unix socket path is too long or
 *         unsupported on this platform.
 */
void TSocket::openConnection(struct addrinfo* res) {

  if (isOpen()) {
    return;
  }

  // Create the descriptor: Unix domain socket when a path is configured,
  // otherwise whatever family/type/protocol the resolved address asks for.
  if (!path_.empty()) {
    socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else {
    socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (socket_ == THRIFT_INVALID_SOCKET) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
  }

  // Re-apply the options stored in the members now that a descriptor exists.

  // Send timeout
  if (sendTimeout_ > 0) {
    setSendTimeout(sendTimeout_);
  }

  // Recv timeout
  if (recvTimeout_ > 0) {
    setRecvTimeout(recvTimeout_);
  }

  if (keepAlive_) {
    setKeepAlive(keepAlive_);
  }

  // Linger
  setLinger(lingerOn_, lingerVal_);

  // No delay
  setNoDelay(noDelay_);

#ifdef SO_NOSIGPIPE
  {
    // Result intentionally not checked.
    int one = 1;
    setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
  }
#endif

// Uses a low min RTO if asked to.
#ifdef TCP_LOW_MIN_RTO
  if (getUseLowMinRto()) {
    int one = 1;
    setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
  }
#endif

  // Set the socket to be non blocking for connect if a timeout exists.
  // The original flags are remembered so they can be restored at `done`.
  // NOTE(review): the result of the F_GETFL call itself is not checked.
  int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
  if (connTimeout_ > 0) {
    if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
      throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
    }
  } else {
    // No connect timeout: make sure the connect below blocks.
    if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
      throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
    }
  }

  // Connect the socket
  int ret;
  if (!path_.empty()) {

#ifndef _WIN32
    // len counts the path bytes plus one terminating NUL.
    size_t len = path_.size() + 1;
    if (len > sizeof(((sockaddr_un*)NULL)->sun_path)) {
      // NOTE(review): no system call failed here, so the error code logged
      // below is whatever the last socket error happened to be.
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
    }

    struct sockaddr_un address;
    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);

    socklen_t structlen = static_cast<socklen_t>(sizeof(address));

    // A path whose first byte is NUL denotes an abstract namespace socket.
    if (!address.sun_path[0]) { // abstract namespace socket
#ifdef __linux__
      // sun_path is not null-terminated in this case and structlen determines its length
      structlen -= sizeof(address.sun_path) - len;
#else
      GlobalOutput.perror("TSocket::open() Abstract Namespace Domain sockets only supported on linux: ", -99);
      throw TTransportException(TTransportException::NOT_OPEN,
                                " Abstract Namespace Domain socket path not supported");
#endif
    }

    ret = connect(socket_, (struct sockaddr*)&address, structlen);
#else
    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif

  } else {
    ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
  }

  // success case
  if (ret == 0) {
    goto done;
  }

  // Anything other than "connect still in progress" is a hard failure.
  if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS)
      && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
  }

  // The connect is in progress: wait for the socket to become writable,
  // passing connTimeout_ to THRIFT_POLL as the timeout.
  struct THRIFT_POLLFD fds[1];
  std::memset(fds, 0, sizeof(fds));
  fds[0].fd = socket_;
  fds[0].events = THRIFT_POLLOUT;
  ret = THRIFT_POLL(fds, 1, connTimeout_);

  if (ret > 0) {
    // Ensure the socket is connected and that there are no errors set
    int val;
    socklen_t lon;
    lon = sizeof(int);
    int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
    if (ret2 == -1) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
      throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
    }
    // No pending error on the socket: the connection is established.
    if (val == 0) {
      goto done;
    }
    // SO_ERROR carried the failure reason of the asynchronous connect.
    GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(),
                        val);
    throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
  } else if (ret == 0) {
    // socket timed out
    string errStr = "TSocket::open() timed out " + getSocketInfo();
    GlobalOutput(errStr.c_str());
    throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
  } else {
    // error on THRIFT_POLL()
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
  }

done:
  // Restore the file status flags that were read before connecting.
  if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags)) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
  }

  // Remember the address we connected to (network sockets only) so later
  // peer lookups can reuse it.
  if (path_.empty()) {
    setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
  }
}
421 
open()422 void TSocket::open() {
423   if (isOpen()) {
424     return;
425   }
426   if (!path_.empty()) {
427     unix_open();
428   } else {
429     local_open();
430   }
431 }
432 
unix_open()433 void TSocket::unix_open() {
434   if (!path_.empty()) {
435     // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
436     openConnection(NULL);
437   }
438 }
439 
/**
 * Resolves host_/port_ and connects to the first resolved address that
 * accepts the connection.
 *
 * Returns immediately when the socket is already open.
 *
 * @throws TTransportException (BAD_ARGS) when port_ is outside 0..65535,
 *         (NOT_OPEN) when the host cannot be resolved, or the exception from
 *         the last address attempted when none could be connected
 */
void TSocket::local_open() {

#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  if (isOpen()) {
    return;
  }

  // Validate port number
  const bool portInRange = (port_ >= 0) && (port_ <= 0xFFFF);
  if (!portInRange) {
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  // getaddrinfo() takes the service as text; the buffer fits any valid port.
  char portText[sizeof("65535")];
  sprintf(portText, "%d", port_);

  struct addrinfo criteria;
  std::memset(&criteria, 0, sizeof(criteria));
  criteria.ai_family = PF_UNSPEC;
  criteria.ai_socktype = SOCK_STREAM;
  criteria.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  struct addrinfo* candidates = NULL;
  int gaiStatus = getaddrinfo(host_.c_str(), portText, &criteria, &candidates);

#ifdef _WIN32
  // Retry once without AI_ADDRCONFIG when Windows reports WSANO_DATA.
  if (gaiStatus == WSANO_DATA) {
    criteria.ai_flags &= ~AI_ADDRCONFIG;
    gaiStatus = getaddrinfo(host_.c_str(), portText, &criteria, &candidates);
  }
#endif

  if (gaiStatus) {
    string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo()
                    + string(THRIFT_GAI_STRERROR(gaiStatus));
    GlobalOutput(errStr.c_str());
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not resolve host for client socket.");
  }

  // Try each returned address in turn; only the failure of the last one is
  // propagated to the caller.
  struct addrinfo* candidate = candidates;
  while (candidate != NULL) {
    try {
      openConnection(candidate);
      break;
    } catch (TTransportException&) {
      // Drop whatever descriptor the failed attempt left behind.
      close();
      if (candidate->ai_next == NULL) {
        freeaddrinfo(candidates); // cleanup on failure
        throw;
      }
    }
    candidate = candidate->ai_next;
  }

  // Free address structure memory
  freeaddrinfo(candidates);
}
504 
close()505 void TSocket::close() {
506   if (socket_ != THRIFT_INVALID_SOCKET) {
507     shutdown(socket_, THRIFT_SHUT_RDWR);
508     ::THRIFT_CLOSESOCKET(socket_);
509   }
510   socket_ = THRIFT_INVALID_SOCKET;
511 }
512 
setSocketFD(THRIFT_SOCKET socket)513 void TSocket::setSocketFD(THRIFT_SOCKET socket) {
514   if (socket_ != THRIFT_INVALID_SOCKET) {
515     close();
516   }
517   socket_ = socket;
518 }
519 
/**
 * Reads up to len bytes from the socket into buf.
 *
 * When an interrupt listener is set, the socket and the listener are polled
 * first (using recvTimeout_, or waiting indefinitely when it is 0) so that the
 * read can be interrupted or timed out before recv() is called.
 *
 * A single retry counter is shared by all retry paths (EINTR from poll, EINTR
 * from recv, and EAGAIN attributed to resource exhaustion) and is bounded by
 * maxRecvRetries_.
 *
 * @param buf destination buffer
 * @param len maximum number of bytes to read
 * @return the number of bytes received; 0 when recv() returned 0 or failed
 *         with THRIFT_ECONNRESET
 * @throws TTransportException NOT_OPEN when the socket is not open or recv()
 *         reports THRIFT_ENOTCONN; INTERRUPTED when the interrupt listener
 *         fired; TIMED_OUT on a poll timeout, THRIFT_EAGAIN or
 *         THRIFT_ETIMEDOUT; UNKNOWN for any other poll/recv failure
 */
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
  if (socket_ == THRIFT_INVALID_SOCKET) {
    throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
  }

  int32_t retries = 0;

  // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
  // the system is out of resources (an undocumented behaviour).
  // The following is an approximation of the time interval under which
  // THRIFT_EAGAIN is taken to indicate an out of resources error.
  uint32_t eagainThresholdMicros = 0;
  if (recvTimeout_) {
    // if a readTimeout is specified along with a max number of recv retries, then
    // the threshold will ensure that the read timeout is not exceeded even in the
    // case of resource errors
    eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
  }

try_again:
  // Read from the socket. The start time is recorded on every attempt so the
  // elapsed time computed below covers only the current attempt.
  struct timeval begin;
  if (recvTimeout_ > 0) {
    THRIFT_GETTIMEOFDAY(&begin, NULL);
  } else {
    // if there is no read timeout we don't need the TOD to determine whether
    // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
    begin.tv_sec = begin.tv_usec = 0;
  }

  int got = 0;

  if (interruptListener_) {
    // Wait for either the socket or the interrupt listener to become readable.
    struct THRIFT_POLLFD fds[2];
    std::memset(fds, 0, sizeof(fds));
    fds[0].fd = socket_;
    fds[0].events = THRIFT_POLLIN;
    fds[1].fd = *(interruptListener_.get());
    fds[1].events = THRIFT_POLLIN;

    int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    if (ret < 0) {
      // error cases
      if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
        goto try_again;
      }
      GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
      throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
    } else if (ret > 0) {
      // Check the interruptListener
      if (fds[1].revents & THRIFT_POLLIN) {
        throw TTransportException(TTransportException::INTERRUPTED, "Interrupted");
      }
    } else /* ret == 0 */ {
      throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
    }

    // falling through means there is something to recv and it cannot block
  }

  got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
  // Capture the error code right away: the THRIFT_GETTIMEOFDAY call below can
  // change THRIFT_GET_SOCKET_ERROR.
  int errno_copy = THRIFT_GET_SOCKET_ERROR;

  // Check for error on read
  if (got < 0) {
    if (errno_copy == THRIFT_EAGAIN) {
      // if no timeout we can assume that resource exhaustion has occurred.
      if (recvTimeout_ == 0) {
        throw TTransportException(TTransportException::TIMED_OUT,
                                  "THRIFT_EAGAIN (unavailable resources)");
      }
      // check if this is the lack of resources or timeout case
      struct timeval end;
      THRIFT_GETTIMEOFDAY(&end, NULL);
      uint32_t readElapsedMicros = static_cast<uint32_t>(((end.tv_sec - begin.tv_sec) * 1000 * 1000)
                                                         + (end.tv_usec - begin.tv_usec));

      // An EAGAIN that arrived faster than the threshold is attributed to
      // resource exhaustion and retried after a short sleep.
      if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
        if (retries++ < maxRecvRetries_) {
          THRIFT_SLEEP_USEC(50);
          goto try_again;
        } else {
          throw TTransportException(TTransportException::TIMED_OUT,
                                    "THRIFT_EAGAIN (unavailable resources)");
        }
      } else {
        // infer that timeout has been hit
        throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
      }
    }

    // If interrupted, try again
    if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) {
      goto try_again;
    }

    // A connection reset is reported to the caller as zero bytes read.
    if (errno_copy == THRIFT_ECONNRESET) {
      return 0;
    }

    // The socket is not connected
    if (errno_copy == THRIFT_ENOTCONN) {
      throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
    }

    // Timed out!
    if (errno_copy == THRIFT_ETIMEDOUT) {
      throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
    }

    // Not a retryable case: log the underlying error
    GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);

    // Any other error is reported as UNKNOWN
    throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
  }

  return got;
}
641 
write(const uint8_t * buf,uint32_t len)642 void TSocket::write(const uint8_t* buf, uint32_t len) {
643   uint32_t sent = 0;
644 
645   while (sent < len) {
646     uint32_t b = write_partial(buf + sent, len - sent);
647     if (b == 0) {
648       // This should only happen if the timeout set with SO_SNDTIMEO expired.
649       // Raise an exception.
650       throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
651     }
652     sent += b;
653   }
654 }
655 
write_partial(const uint8_t * buf,uint32_t len)656 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
657   if (socket_ == THRIFT_INVALID_SOCKET) {
658     throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
659   }
660 
661   uint32_t sent = 0;
662 
663   int flags = 0;
664 #ifdef MSG_NOSIGNAL
665   // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
666   // check for the THRIFT_EPIPE return condition and close the socket in that case
667   flags |= MSG_NOSIGNAL;
668 #endif // ifdef MSG_NOSIGNAL
669 
670   int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
671 
672   if (b < 0) {
673     if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
674       return 0;
675     }
676     // Fail on a send error
677     int errno_copy = THRIFT_GET_SOCKET_ERROR;
678     GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
679 
680     if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET
681         || errno_copy == THRIFT_ENOTCONN) {
682       throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
683     }
684 
685     throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
686   }
687 
688   // Fail on blocked send
689   if (b == 0) {
690     throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
691   }
692   return b;
693 }
694 
getHost()695 std::string TSocket::getHost() {
696   return host_;
697 }
698 
getPort()699 int TSocket::getPort() {
700   return port_;
701 }
702 
setHost(string host)703 void TSocket::setHost(string host) {
704   host_ = host;
705 }
706 
setPort(int port)707 void TSocket::setPort(int port) {
708   port_ = port;
709 }
710 
setLinger(bool on,int linger)711 void TSocket::setLinger(bool on, int linger) {
712   lingerOn_ = on;
713   lingerVal_ = linger;
714   if (socket_ == THRIFT_INVALID_SOCKET) {
715     return;
716   }
717 
718 #ifndef _WIN32
719   struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
720 #else
721   struct linger l = {static_cast<u_short>(lingerOn_ ? 1 : 0), static_cast<u_short>(lingerVal_)};
722 #endif
723 
724   int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
725   if (ret == -1) {
726     int errno_copy
727         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
728     GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
729   }
730 }
731 
setNoDelay(bool noDelay)732 void TSocket::setNoDelay(bool noDelay) {
733   noDelay_ = noDelay;
734   if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) {
735     return;
736   }
737 
738   // Set socket to NODELAY
739   int v = noDelay_ ? 1 : 0;
740   int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
741   if (ret == -1) {
742     int errno_copy
743         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
744     GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
745   }
746 }
747 
setConnTimeout(int ms)748 void TSocket::setConnTimeout(int ms) {
749   connTimeout_ = ms;
750 }
751 
setGenericTimeout(THRIFT_SOCKET s,int timeout_ms,int optname)752 void setGenericTimeout(THRIFT_SOCKET s, int timeout_ms, int optname) {
753   if (timeout_ms < 0) {
754     char errBuf[512];
755     sprintf(errBuf, "TSocket::setGenericTimeout with negative input: %d\n", timeout_ms);
756     GlobalOutput(errBuf);
757     return;
758   }
759 
760   if (s == THRIFT_INVALID_SOCKET) {
761     return;
762   }
763 
764 #ifdef _WIN32
765   DWORD platform_time = static_cast<DWORD>(timeout_ms);
766 #else
767   struct timeval platform_time = {(int)(timeout_ms / 1000), (int)((timeout_ms % 1000) * 1000)};
768 #endif
769 
770   int ret = setsockopt(s, SOL_SOCKET, optname, cast_sockopt(&platform_time), sizeof(platform_time));
771   if (ret == -1) {
772     int errno_copy
773         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
774     GlobalOutput.perror("TSocket::setGenericTimeout() setsockopt() ", errno_copy);
775   }
776 }
777 
setRecvTimeout(int ms)778 void TSocket::setRecvTimeout(int ms) {
779   setGenericTimeout(socket_, ms, SO_RCVTIMEO);
780   recvTimeout_ = ms;
781 }
782 
setSendTimeout(int ms)783 void TSocket::setSendTimeout(int ms) {
784   setGenericTimeout(socket_, ms, SO_SNDTIMEO);
785   sendTimeout_ = ms;
786 }
787 
setKeepAlive(bool keepAlive)788 void TSocket::setKeepAlive(bool keepAlive) {
789   keepAlive_ = keepAlive;
790 
791   if (socket_ == THRIFT_INVALID_SOCKET) {
792     return;
793   }
794 
795   int value = keepAlive_;
796   int ret
797       = setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&value), sizeof(value));
798 
799   if (ret == -1) {
800     int errno_copy
801         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
802     GlobalOutput.perror("TSocket::setKeepAlive() setsockopt() " + getSocketInfo(), errno_copy);
803   }
804 }
805 
setMaxRecvRetries(int maxRecvRetries)806 void TSocket::setMaxRecvRetries(int maxRecvRetries) {
807   maxRecvRetries_ = maxRecvRetries;
808 }
809 
getSocketInfo()810 string TSocket::getSocketInfo() {
811   std::ostringstream oss;
812   if (path_.empty()) {
813     if (host_.empty() || port_ == 0) {
814       oss << "<Host: " << getPeerAddress();
815       oss << " Port: " << getPeerPort() << ">";
816     } else {
817       oss << "<Host: " << host_ << " Port: " << port_ << ">";
818     }
819   } else {
820     oss << "<Path: " << path_ << ">";
821   }
822   return oss.str();
823 }
824 
getPeerHost()825 std::string TSocket::getPeerHost() {
826   if (peerHost_.empty() && path_.empty()) {
827     struct sockaddr_storage addr;
828     struct sockaddr* addrPtr;
829     socklen_t addrLen;
830 
831     if (socket_ == THRIFT_INVALID_SOCKET) {
832       return host_;
833     }
834 
835     addrPtr = getCachedAddress(&addrLen);
836 
837     if (addrPtr == NULL) {
838       addrLen = sizeof(addr);
839       if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
840         return peerHost_;
841       }
842       addrPtr = (sockaddr*)&addr;
843 
844       setCachedAddress(addrPtr, addrLen);
845     }
846 
847     char clienthost[NI_MAXHOST];
848     char clientservice[NI_MAXSERV];
849 
850     getnameinfo((sockaddr*)addrPtr,
851                 addrLen,
852                 clienthost,
853                 sizeof(clienthost),
854                 clientservice,
855                 sizeof(clientservice),
856                 0);
857 
858     peerHost_ = clienthost;
859   }
860   return peerHost_;
861 }
862 
getPeerAddress()863 std::string TSocket::getPeerAddress() {
864   if (peerAddress_.empty() && path_.empty()) {
865     struct sockaddr_storage addr;
866     struct sockaddr* addrPtr;
867     socklen_t addrLen;
868 
869     if (socket_ == THRIFT_INVALID_SOCKET) {
870       return peerAddress_;
871     }
872 
873     addrPtr = getCachedAddress(&addrLen);
874 
875     if (addrPtr == NULL) {
876       addrLen = sizeof(addr);
877       if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
878         return peerAddress_;
879       }
880       addrPtr = (sockaddr*)&addr;
881 
882       setCachedAddress(addrPtr, addrLen);
883     }
884 
885     char clienthost[NI_MAXHOST];
886     char clientservice[NI_MAXSERV];
887 
888     getnameinfo(addrPtr,
889                 addrLen,
890                 clienthost,
891                 sizeof(clienthost),
892                 clientservice,
893                 sizeof(clientservice),
894                 NI_NUMERICHOST | NI_NUMERICSERV);
895 
896     peerAddress_ = clienthost;
897     peerPort_ = std::atoi(clientservice);
898   }
899   return peerAddress_;
900 }
901 
getPeerPort()902 int TSocket::getPeerPort() {
903   getPeerAddress();
904   return peerPort_;
905 }
906 
setCachedAddress(const sockaddr * addr,socklen_t len)907 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
908   if (!path_.empty()) {
909     return;
910   }
911 
912   switch (addr->sa_family) {
913   case AF_INET:
914     if (len == sizeof(sockaddr_in)) {
915       memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
916     }
917     break;
918 
919   case AF_INET6:
920     if (len == sizeof(sockaddr_in6)) {
921       memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
922     }
923     break;
924   }
925   peerAddress_.clear();
926   peerHost_.clear();
927 }
928 
getCachedAddress(socklen_t * len) const929 sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
930   switch (cachedPeerAddr_.ipv4.sin_family) {
931   case AF_INET:
932     *len = sizeof(sockaddr_in);
933     return (sockaddr*)&cachedPeerAddr_.ipv4;
934 
935   case AF_INET6:
936     *len = sizeof(sockaddr_in6);
937     return (sockaddr*)&cachedPeerAddr_.ipv6;
938 
939   default:
940     return NULL;
941   }
942 }
943 
// Class-wide flag, off by default. openConnection() consults it (through
// getUseLowMinRto()) to decide whether to set TCP_LOW_MIN_RTO, on platforms
// that define that option.
bool TSocket::useLowMinRto_ = false;
setUseLowMinRto(bool useLowMinRto)945 void TSocket::setUseLowMinRto(bool useLowMinRto) {
946   useLowMinRto_ = useLowMinRto;
947 }
getUseLowMinRto()948 bool TSocket::getUseLowMinRto() {
949   return useLowMinRto_;
950 }
951 
getOrigin()952 const std::string TSocket::getOrigin() {
953   std::ostringstream oss;
954   oss << getPeerHost() << ":" << getPeerPort();
955   return oss.str();
956 }
957 }
958 }
959 } // apache::thrift::transport
960