1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_IOCTL_H
25 #include <sys/ioctl.h>
26 #endif
27 #ifdef HAVE_SYS_SOCKET_H
28 #include <sys/socket.h>
29 #endif
30 #ifdef HAVE_SYS_UN_H
31 #include <sys/un.h>
32 #endif
33 #ifdef HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #endif
36 #include <sys/types.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #endif
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 #include <fcntl.h>
45 
46 #include <thrift/concurrency/Monitor.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TTransportException.h>
49 #include <thrift/transport/PlatformSocket.h>
50 #include <thrift/transport/SocketCommon.h>
51 
52 #ifndef SOCKOPT_CAST_T
53 #ifndef _WIN32
54 #define SOCKOPT_CAST_T void
55 #else
56 #define SOCKOPT_CAST_T char
57 #endif // _WIN32
58 #endif
59 
60 template <class T>
const_cast_sockopt(const T * v)61 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
62   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
63 }
64 
65 template <class T>
cast_sockopt(T * v)66 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
67   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
68 }
69 
70 using std::string;
71 
72 namespace apache {
73 namespace thrift {
74 namespace transport {
75 
76 /**
77  * TSocket implementation.
78  *
79  */
80 
TSocket(const string & host,int port,std::shared_ptr<TConfiguration> config)81 TSocket::TSocket(const string& host, int port, std::shared_ptr<TConfiguration> config)
82   : TVirtualTransport(config),
83     host_(host),
84     port_(port),
85     socket_(THRIFT_INVALID_SOCKET),
86     peerPort_(0),
87     connTimeout_(0),
88     sendTimeout_(0),
89     recvTimeout_(0),
90     keepAlive_(false),
91     lingerOn_(1),
92     lingerVal_(0),
93     noDelay_(1),
94     maxRecvRetries_(5) {
95 }
96 
TSocket(const string & path,std::shared_ptr<TConfiguration> config)97 TSocket::TSocket(const string& path, std::shared_ptr<TConfiguration> config)
98   : TVirtualTransport(config),
99     port_(0),
100     path_(path),
101     socket_(THRIFT_INVALID_SOCKET),
102     peerPort_(0),
103     connTimeout_(0),
104     sendTimeout_(0),
105     recvTimeout_(0),
106     keepAlive_(false),
107     lingerOn_(1),
108     lingerVal_(0),
109     noDelay_(1),
110     maxRecvRetries_(5) {
111   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
112 }
113 
TSocket(std::shared_ptr<TConfiguration> config)114 TSocket::TSocket(std::shared_ptr<TConfiguration> config)
115   : TVirtualTransport(config),
116     port_(0),
117     socket_(THRIFT_INVALID_SOCKET),
118     peerPort_(0),
119     connTimeout_(0),
120     sendTimeout_(0),
121     recvTimeout_(0),
122     keepAlive_(false),
123     lingerOn_(1),
124     lingerVal_(0),
125     noDelay_(1),
126     maxRecvRetries_(5) {
127   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
128 }
129 
TSocket(THRIFT_SOCKET socket,std::shared_ptr<TConfiguration> config)130 TSocket::TSocket(THRIFT_SOCKET socket, std::shared_ptr<TConfiguration> config)
131   : TVirtualTransport(config),
132     port_(0),
133     socket_(socket),
134     peerPort_(0),
135     connTimeout_(0),
136     sendTimeout_(0),
137     recvTimeout_(0),
138     keepAlive_(false),
139     lingerOn_(1),
140     lingerVal_(0),
141     noDelay_(1),
142     maxRecvRetries_(5) {
143   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
144 #ifdef SO_NOSIGPIPE
145   {
146     int one = 1;
147     setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
148   }
149 #endif
150 }
151 
TSocket(THRIFT_SOCKET socket,std::shared_ptr<THRIFT_SOCKET> interruptListener,std::shared_ptr<TConfiguration> config)152 TSocket::TSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interruptListener,
153                 std::shared_ptr<TConfiguration> config)
154   : TVirtualTransport(config),
155     port_(0),
156     socket_(socket),
157     peerPort_(0),
158     interruptListener_(interruptListener),
159     connTimeout_(0),
160     sendTimeout_(0),
161     recvTimeout_(0),
162     keepAlive_(false),
163     lingerOn_(1),
164     lingerVal_(0),
165     noDelay_(1),
166     maxRecvRetries_(5) {
167   cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
168 #ifdef SO_NOSIGPIPE
169   {
170     int one = 1;
171     setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
172   }
173 #endif
174 }
175 
~TSocket()176 TSocket::~TSocket() {
177   close();
178 }
179 
hasPendingDataToRead()180 bool TSocket::hasPendingDataToRead() {
181   if (!isOpen()) {
182     return false;
183   }
184 
185   int32_t retries = 0;
186   THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
187 try_again:
188   int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
189   if (r == -1) {
190     int errno_copy = THRIFT_GET_SOCKET_ERROR;
191     if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
192       goto try_again;
193     }
194     GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
195     throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
196   }
197   return numBytesAvailable > 0;
198 }
199 
isOpen() const200 bool TSocket::isOpen() const {
201   return (socket_ != THRIFT_INVALID_SOCKET);
202 }
203 
peek()204 bool TSocket::peek() {
205   if (!isOpen()) {
206     return false;
207   }
208   if (interruptListener_) {
209     for (int retries = 0;;) {
210       struct THRIFT_POLLFD fds[2];
211       std::memset(fds, 0, sizeof(fds));
212       fds[0].fd = socket_;
213       fds[0].events = THRIFT_POLLIN;
214       fds[1].fd = *(interruptListener_.get());
215       fds[1].events = THRIFT_POLLIN;
216       int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
217       int errno_copy = THRIFT_GET_SOCKET_ERROR;
218       if (ret < 0) {
219         // error cases
220         if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
221           continue;
222         }
223         GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
224         throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
225       } else if (ret > 0) {
226         // Check the interruptListener
227         if (fds[1].revents & THRIFT_POLLIN) {
228           return false;
229         }
230         // There must be data or a disconnection, fall through to the PEEK
231         break;
232       } else {
233         // timeout
234         return false;
235       }
236     }
237   }
238 
239   // Check to see if data is available or if the remote side closed
240   uint8_t buf;
241   int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
242   if (r == -1) {
243     int errno_copy = THRIFT_GET_SOCKET_ERROR;
244 #if defined __FreeBSD__ || defined __MACH__
245     /* shigin:
246      * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
247      * the other side
248      */
249     if (errno_copy == THRIFT_ECONNRESET) {
250       return false;
251     }
252 #endif
253     GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
254     throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
255   }
256   return (r > 0);
257 }
258 
openConnection(struct addrinfo * res)259 void TSocket::openConnection(struct addrinfo* res) {
260 
261   if (isOpen()) {
262     return;
263   }
264 
265   if (!path_.empty()) {
266     socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
267   } else {
268     socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
269   }
270 
271   if (socket_ == THRIFT_INVALID_SOCKET) {
272     int errno_copy = THRIFT_GET_SOCKET_ERROR;
273     GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
274     throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
275   }
276 
277   // Send timeout
278   if (sendTimeout_ > 0) {
279     setSendTimeout(sendTimeout_);
280   }
281 
282   // Recv timeout
283   if (recvTimeout_ > 0) {
284     setRecvTimeout(recvTimeout_);
285   }
286 
287   if (keepAlive_) {
288     setKeepAlive(keepAlive_);
289   }
290 
291   // Linger
292   setLinger(lingerOn_, lingerVal_);
293 
294   // No delay
295   setNoDelay(noDelay_);
296 
297 #ifdef SO_NOSIGPIPE
298   {
299     int one = 1;
300     setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
301   }
302 #endif
303 
304 // Uses a low min RTO if asked to.
305 #ifdef TCP_LOW_MIN_RTO
306   if (getUseLowMinRto()) {
307     int one = 1;
308     setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
309   }
310 #endif
311 
312   // Set the socket to be non blocking for connect if a timeout exists
313   int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
314   if (connTimeout_ > 0) {
315     if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
316       int errno_copy = THRIFT_GET_SOCKET_ERROR;
317       GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
318       throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
319     }
320   } else {
321     if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
322       int errno_copy = THRIFT_GET_SOCKET_ERROR;
323       GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
324       throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
325     }
326   }
327 
328   // Connect the socket
329   int ret;
330   if (!path_.empty()) {
331 
332 /*
333  * TODO: seems that windows now support unix sockets,
334  *       see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
335  */
336 #ifndef _WIN32
337 
338     struct sockaddr_un address;
339     socklen_t structlen = fillUnixSocketAddr(address, path_);
340 
341     ret = connect(socket_, (struct sockaddr*)&address, structlen);
342 #else
343     GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
344     throw TTransportException(TTransportException::NOT_OPEN,
345                               " Unix Domain socket path not supported");
346 #endif
347 
348   } else {
349     ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
350   }
351 
352   // success case
353   if (ret == 0) {
354     goto done;
355   }
356 
357   if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS)
358       && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) {
359     int errno_copy = THRIFT_GET_SOCKET_ERROR;
360     GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
361     throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
362   }
363 
364   struct THRIFT_POLLFD fds[1];
365   std::memset(fds, 0, sizeof(fds));
366   fds[0].fd = socket_;
367   fds[0].events = THRIFT_POLLOUT;
368   ret = THRIFT_POLL(fds, 1, connTimeout_);
369 
370   if (ret > 0) {
371     // Ensure the socket is connected and that there are no errors set
372     int val;
373     socklen_t lon;
374     lon = sizeof(int);
375     int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
376     if (ret2 == -1) {
377       int errno_copy = THRIFT_GET_SOCKET_ERROR;
378       GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
379       throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
380     }
381     // no errors on socket, go to town
382     if (val == 0) {
383       goto done;
384     }
385     GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(),
386                         val);
387     throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
388   } else if (ret == 0) {
389     // socket timed out
390     string errStr = "TSocket::open() timed out " + getSocketInfo();
391     GlobalOutput(errStr.c_str());
392     throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
393   } else {
394     // error on THRIFT_POLL()
395     int errno_copy = THRIFT_GET_SOCKET_ERROR;
396     GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
397     throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
398   }
399 
400 done:
401   // Set socket back to normal mode (blocking)
402   if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags)) {
403     int errno_copy = THRIFT_GET_SOCKET_ERROR;
404     GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
405     throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
406   }
407 
408   if (path_.empty()) {
409     setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
410   }
411 }
412 
open()413 void TSocket::open() {
414   if (isOpen()) {
415     return;
416   }
417   if (!path_.empty()) {
418     unix_open();
419   } else {
420     local_open();
421   }
422 }
423 
unix_open()424 void TSocket::unix_open() {
425   if (!path_.empty()) {
426     // Unix Domain Socket does not need addrinfo struct, so we pass NULL
427     openConnection(nullptr);
428   }
429 }
430 
local_open()431 void TSocket::local_open() {
432 
433 #ifdef _WIN32
434   TWinsockSingleton::create();
435 #endif // _WIN32
436 
437   if (isOpen()) {
438     return;
439   }
440 
441   // Validate port number
442   if (port_ < 0 || port_ > 0xFFFF) {
443     throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
444   }
445 
446   struct addrinfo hints, *res, *res0;
447   res = nullptr;
448   res0 = nullptr;
449   int error;
450   char port[sizeof("65535")];
451   std::memset(&hints, 0, sizeof(hints));
452   hints.ai_family = PF_UNSPEC;
453   hints.ai_socktype = SOCK_STREAM;
454   hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
455   sprintf(port, "%d", port_);
456 
457   error = getaddrinfo(host_.c_str(), port, &hints, &res0);
458 
459   if (
460 #ifdef _WIN32
461       error == WSANO_DATA
462 #else
463       error == EAI_NODATA
464 #endif
465     ) {
466     hints.ai_flags &= ~AI_ADDRCONFIG;
467     error = getaddrinfo(host_.c_str(), port, &hints, &res0);
468   }
469 
470   if (error) {
471     string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo()
472                     + string(THRIFT_GAI_STRERROR(error));
473     GlobalOutput(errStr.c_str());
474     close();
475     throw TTransportException(TTransportException::NOT_OPEN,
476                               "Could not resolve host for client socket.");
477   }
478 
479   // Cycle through all the returned addresses until one
480   // connects or push the exception up.
481   for (res = res0; res; res = res->ai_next) {
482     try {
483       openConnection(res);
484       break;
485     } catch (TTransportException&) {
486       if (res->ai_next) {
487         close();
488       } else {
489         close();
490         freeaddrinfo(res0); // cleanup on failure
491         throw;
492       }
493     }
494   }
495 
496   // Free address structure memory
497   freeaddrinfo(res0);
498 }
499 
close()500 void TSocket::close() {
501   if (socket_ != THRIFT_INVALID_SOCKET) {
502     shutdown(socket_, THRIFT_SHUT_RDWR);
503     ::THRIFT_CLOSESOCKET(socket_);
504   }
505   socket_ = THRIFT_INVALID_SOCKET;
506 }
507 
setSocketFD(THRIFT_SOCKET socket)508 void TSocket::setSocketFD(THRIFT_SOCKET socket) {
509   if (socket_ != THRIFT_INVALID_SOCKET) {
510     close();
511   }
512   socket_ = socket;
513 }
514 
read(uint8_t * buf,uint32_t len)515 uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
516   checkReadBytesAvailable(len);
517   if (socket_ == THRIFT_INVALID_SOCKET) {
518     throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
519   }
520 
521   int32_t retries = 0;
522 
523   // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
524   // the system is out of resources (an awesome undocumented feature).
525   // The following is an approximation of the time interval under which
526   // THRIFT_EAGAIN is taken to indicate an out of resources error.
527   uint32_t eagainThresholdMicros = 0;
528   if (recvTimeout_) {
529     // if a readTimeout is specified along with a max number of recv retries, then
530     // the threshold will ensure that the read timeout is not exceeded even in the
531     // case of resource errors
532     eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
533   }
534 
535 try_again:
536   // Read from the socket
537   struct timeval begin;
538   if (recvTimeout_ > 0) {
539     THRIFT_GETTIMEOFDAY(&begin, nullptr);
540   } else {
541     // if there is no read timeout we don't need the TOD to determine whether
542     // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
543     begin.tv_sec = begin.tv_usec = 0;
544   }
545 
546   int got = 0;
547 
548   if (interruptListener_) {
549     struct THRIFT_POLLFD fds[2];
550     std::memset(fds, 0, sizeof(fds));
551     fds[0].fd = socket_;
552     fds[0].events = THRIFT_POLLIN;
553     fds[1].fd = *(interruptListener_.get());
554     fds[1].events = THRIFT_POLLIN;
555 
556     int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
557     int errno_copy = THRIFT_GET_SOCKET_ERROR;
558     if (ret < 0) {
559       // error cases
560       if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
561         goto try_again;
562       }
563       GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
564       throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
565     } else if (ret > 0) {
566       // Check the interruptListener
567       if (fds[1].revents & THRIFT_POLLIN) {
568         throw TTransportException(TTransportException::INTERRUPTED, "Interrupted");
569       }
570     } else /* ret == 0 */ {
571       throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
572     }
573 
574     // falling through means there is something to recv and it cannot block
575   }
576 
577   got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
578   // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
579   int errno_copy = THRIFT_GET_SOCKET_ERROR;
580 
581   // Check for error on read
582   if (got < 0) {
583     if (errno_copy == THRIFT_EAGAIN) {
584       // if no timeout we can assume that resource exhaustion has occurred.
585       if (recvTimeout_ == 0) {
586         throw TTransportException(TTransportException::TIMED_OUT,
587                                   "THRIFT_EAGAIN (unavailable resources)");
588       }
589       // check if this is the lack of resources or timeout case
590       struct timeval end;
591       THRIFT_GETTIMEOFDAY(&end, nullptr);
592       auto readElapsedMicros = static_cast<uint32_t>(((end.tv_sec - begin.tv_sec) * 1000 * 1000)
593                                                          + (end.tv_usec - begin.tv_usec));
594 
595       if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
596         if (retries++ < maxRecvRetries_) {
597           THRIFT_SLEEP_USEC(50);
598           goto try_again;
599         } else {
600           throw TTransportException(TTransportException::TIMED_OUT,
601                                     "THRIFT_EAGAIN (unavailable resources)");
602         }
603       } else {
604         // infer that timeout has been hit
605         throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
606       }
607     }
608 
609     // If interrupted, try again
610     if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) {
611       goto try_again;
612     }
613 
614     if (errno_copy == THRIFT_ECONNRESET) {
615       return 0;
616     }
617 
618     // This ish isn't open
619     if (errno_copy == THRIFT_ENOTCONN) {
620       throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
621     }
622 
623     // Timed out!
624     if (errno_copy == THRIFT_ETIMEDOUT) {
625       throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
626     }
627 
628     // Now it's not a try again case, but a real probblez
629     GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
630 
631     // Some other error, whatevz
632     throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
633   }
634 
635   return got;
636 }
637 
write(const uint8_t * buf,uint32_t len)638 void TSocket::write(const uint8_t* buf, uint32_t len) {
639   uint32_t sent = 0;
640 
641   while (sent < len) {
642     uint32_t b = write_partial(buf + sent, len - sent);
643     if (b == 0) {
644       // This should only happen if the timeout set with SO_SNDTIMEO expired.
645       // Raise an exception.
646       throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
647     }
648     sent += b;
649   }
650 }
651 
write_partial(const uint8_t * buf,uint32_t len)652 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
653   if (socket_ == THRIFT_INVALID_SOCKET) {
654     throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
655   }
656 
657   uint32_t sent = 0;
658 
659   int flags = 0;
660 #ifdef MSG_NOSIGNAL
661   // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
662   // check for the THRIFT_EPIPE return condition and close the socket in that case
663   flags |= MSG_NOSIGNAL;
664 #endif // ifdef MSG_NOSIGNAL
665 
666   int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
667 
668   if (b < 0) {
669     if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
670       return 0;
671     }
672     // Fail on a send error
673     int errno_copy = THRIFT_GET_SOCKET_ERROR;
674     GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
675 
676     if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET
677         || errno_copy == THRIFT_ENOTCONN) {
678       throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
679     }
680 
681     throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
682   }
683 
684   // Fail on blocked send
685   if (b == 0) {
686     throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
687   }
688   return b;
689 }
690 
getHost()691 std::string TSocket::getHost() {
692   return host_;
693 }
694 
getPort()695 int TSocket::getPort() {
696   return port_;
697 }
698 
getPath()699 std::string TSocket::getPath() {
700     return path_;
701 }
702 
setHost(string host)703 void TSocket::setHost(string host) {
704   host_ = host;
705 }
706 
setPort(int port)707 void TSocket::setPort(int port) {
708   port_ = port;
709 }
710 
setPath(std::string path)711 void TSocket::setPath(std::string path) {
712     path_ = path;
713 }
714 
setLinger(bool on,int linger)715 void TSocket::setLinger(bool on, int linger) {
716   lingerOn_ = on;
717   lingerVal_ = linger;
718   if (socket_ == THRIFT_INVALID_SOCKET) {
719     return;
720   }
721 
722 #ifndef _WIN32
723   struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
724 #else
725   struct linger l = {static_cast<u_short>(lingerOn_ ? 1 : 0), static_cast<u_short>(lingerVal_)};
726 #endif
727 
728   int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
729   if (ret == -1) {
730     int errno_copy
731         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
732     GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
733   }
734 }
735 
setNoDelay(bool noDelay)736 void TSocket::setNoDelay(bool noDelay) {
737   noDelay_ = noDelay;
738   if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) {
739     return;
740   }
741 
742   // Set socket to NODELAY
743   int v = noDelay_ ? 1 : 0;
744   int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
745   if (ret == -1) {
746     int errno_copy
747         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
748     GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
749   }
750 }
751 
setConnTimeout(int ms)752 void TSocket::setConnTimeout(int ms) {
753   connTimeout_ = ms;
754 }
755 
setGenericTimeout(THRIFT_SOCKET s,int timeout_ms,int optname)756 void setGenericTimeout(THRIFT_SOCKET s, int timeout_ms, int optname) {
757   if (timeout_ms < 0) {
758     char errBuf[512];
759     sprintf(errBuf, "TSocket::setGenericTimeout with negative input: %d\n", timeout_ms);
760     GlobalOutput(errBuf);
761     return;
762   }
763 
764   if (s == THRIFT_INVALID_SOCKET) {
765     return;
766   }
767 
768 #ifdef _WIN32
769   DWORD platform_time = static_cast<DWORD>(timeout_ms);
770 #else
771   struct timeval platform_time = {(int)(timeout_ms / 1000), (int)((timeout_ms % 1000) * 1000)};
772 #endif
773 
774   int ret = setsockopt(s, SOL_SOCKET, optname, cast_sockopt(&platform_time), sizeof(platform_time));
775   if (ret == -1) {
776     int errno_copy
777         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
778     GlobalOutput.perror("TSocket::setGenericTimeout() setsockopt() ", errno_copy);
779   }
780 }
781 
setRecvTimeout(int ms)782 void TSocket::setRecvTimeout(int ms) {
783   setGenericTimeout(socket_, ms, SO_RCVTIMEO);
784   recvTimeout_ = ms;
785 }
786 
setSendTimeout(int ms)787 void TSocket::setSendTimeout(int ms) {
788   setGenericTimeout(socket_, ms, SO_SNDTIMEO);
789   sendTimeout_ = ms;
790 }
791 
setKeepAlive(bool keepAlive)792 void TSocket::setKeepAlive(bool keepAlive) {
793   keepAlive_ = keepAlive;
794 
795   if (socket_ == THRIFT_INVALID_SOCKET) {
796     return;
797   }
798 
799   int value = keepAlive_;
800   int ret
801       = setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&value), sizeof(value));
802 
803   if (ret == -1) {
804     int errno_copy
805         = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
806     GlobalOutput.perror("TSocket::setKeepAlive() setsockopt() " + getSocketInfo(), errno_copy);
807   }
808 }
809 
setMaxRecvRetries(int maxRecvRetries)810 void TSocket::setMaxRecvRetries(int maxRecvRetries) {
811   maxRecvRetries_ = maxRecvRetries;
812 }
813 
getSocketInfo() const814 string TSocket::getSocketInfo() const {
815   std::ostringstream oss;
816   if (path_.empty()) {
817     if (host_.empty() || port_ == 0) {
818       oss << "<Host: " << getPeerAddress();
819       oss << " Port: " << getPeerPort() << ">";
820     } else {
821       oss << "<Host: " << host_ << " Port: " << port_ << ">";
822     }
823   } else {
824     std::string fmt_path_ = path_;
825     // Handle printing abstract sockets (first character is a '\0' char):
826     if (!fmt_path_.empty() && fmt_path_[0] == '\0')
827       fmt_path_[0] = '@';
828     oss << "<Path: " << fmt_path_ << ">";
829   }
830   return oss.str();
831 }
832 
getPeerHost() const833 std::string TSocket::getPeerHost() const {
834   if (peerHost_.empty() && path_.empty()) {
835     struct sockaddr_storage addr;
836     struct sockaddr* addrPtr;
837     socklen_t addrLen;
838 
839     if (socket_ == THRIFT_INVALID_SOCKET) {
840       return host_;
841     }
842 
843     addrPtr = getCachedAddress(&addrLen);
844 
845     if (addrPtr == nullptr) {
846       addrLen = sizeof(addr);
847       if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
848         return peerHost_;
849       }
850       addrPtr = (sockaddr*)&addr;
851 
852       const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
853     }
854 
855     char clienthost[NI_MAXHOST];
856     char clientservice[NI_MAXSERV];
857 
858     getnameinfo((sockaddr*)addrPtr,
859                 addrLen,
860                 clienthost,
861                 sizeof(clienthost),
862                 clientservice,
863                 sizeof(clientservice),
864                 0);
865 
866     peerHost_ = clienthost;
867   }
868   return peerHost_;
869 }
870 
getPeerAddress() const871 std::string TSocket::getPeerAddress() const {
872   if (peerAddress_.empty() && path_.empty()) {
873     struct sockaddr_storage addr;
874     struct sockaddr* addrPtr;
875     socklen_t addrLen;
876 
877     if (socket_ == THRIFT_INVALID_SOCKET) {
878       return peerAddress_;
879     }
880 
881     addrPtr = getCachedAddress(&addrLen);
882 
883     if (addrPtr == nullptr) {
884       addrLen = sizeof(addr);
885       if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
886         return peerAddress_;
887       }
888       addrPtr = (sockaddr*)&addr;
889 
890       const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
891     }
892 
893     char clienthost[NI_MAXHOST];
894     char clientservice[NI_MAXSERV];
895 
896     getnameinfo(addrPtr,
897                 addrLen,
898                 clienthost,
899                 sizeof(clienthost),
900                 clientservice,
901                 sizeof(clientservice),
902                 NI_NUMERICHOST | NI_NUMERICSERV);
903 
904     peerAddress_ = clienthost;
905     peerPort_ = std::atoi(clientservice);
906   }
907   return peerAddress_;
908 }
909 
getPeerPort() const910 int TSocket::getPeerPort() const {
911   getPeerAddress();
912   return peerPort_;
913 }
914 
setCachedAddress(const sockaddr * addr,socklen_t len)915 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
916   if (!path_.empty()) {
917     return;
918   }
919 
920   switch (addr->sa_family) {
921   case AF_INET:
922     if (len == sizeof(sockaddr_in)) {
923       memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
924     }
925     break;
926 
927   case AF_INET6:
928     if (len == sizeof(sockaddr_in6)) {
929       memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
930     }
931     break;
932   }
933   peerAddress_.clear();
934   peerHost_.clear();
935 }
936 
getCachedAddress(socklen_t * len) const937 sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
938   switch (cachedPeerAddr_.ipv4.sin_family) {
939   case AF_INET:
940     *len = sizeof(sockaddr_in);
941     return (sockaddr*)&cachedPeerAddr_.ipv4;
942 
943   case AF_INET6:
944     *len = sizeof(sockaddr_in6);
945     return (sockaddr*)&cachedPeerAddr_.ipv6;
946 
947   default:
948     return nullptr;
949   }
950 }
951 
952 bool TSocket::useLowMinRto_ = false;
setUseLowMinRto(bool useLowMinRto)953 void TSocket::setUseLowMinRto(bool useLowMinRto) {
954   useLowMinRto_ = useLowMinRto;
955 }
getUseLowMinRto()956 bool TSocket::getUseLowMinRto() {
957   return useLowMinRto_;
958 }
959 
getOrigin() const960 const std::string TSocket::getOrigin() const {
961   std::ostringstream oss;
962   oss << getPeerHost() << ":" << getPeerPort();
963   return oss.str();
964 }
965 }
966 }
967 } // apache::thrift::transport
968