1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_IOCTL_H
25 #include <sys/ioctl.h>
26 #endif
27 #ifdef HAVE_SYS_SOCKET_H
28 #include <sys/socket.h>
29 #endif
30 #ifdef HAVE_SYS_UN_H
31 #include <sys/un.h>
32 #endif
33 #ifdef HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #endif
36 #include <sys/types.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #endif
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 #include <fcntl.h>
45
46 #include <thrift/concurrency/Monitor.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TTransportException.h>
49 #include <thrift/transport/PlatformSocket.h>
50 #include <thrift/transport/SocketCommon.h>
51
52 #ifndef SOCKOPT_CAST_T
53 #ifndef _WIN32
54 #define SOCKOPT_CAST_T void
55 #else
56 #define SOCKOPT_CAST_T char
57 #endif // _WIN32
58 #endif
59
60 template <class T>
const_cast_sockopt(const T * v)61 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
62 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
63 }
64
65 template <class T>
cast_sockopt(T * v)66 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
67 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
68 }
69
70 using std::string;
71
72 namespace apache {
73 namespace thrift {
74 namespace transport {
75
76 /**
77 * TSocket implementation.
78 *
79 */
80
TSocket(const string & host,int port,std::shared_ptr<TConfiguration> config)81 TSocket::TSocket(const string& host, int port, std::shared_ptr<TConfiguration> config)
82 : TVirtualTransport(config),
83 host_(host),
84 port_(port),
85 socket_(THRIFT_INVALID_SOCKET),
86 peerPort_(0),
87 connTimeout_(0),
88 sendTimeout_(0),
89 recvTimeout_(0),
90 keepAlive_(false),
91 lingerOn_(1),
92 lingerVal_(0),
93 noDelay_(1),
94 maxRecvRetries_(5) {
95 }
96
TSocket(const string & path,std::shared_ptr<TConfiguration> config)97 TSocket::TSocket(const string& path, std::shared_ptr<TConfiguration> config)
98 : TVirtualTransport(config),
99 port_(0),
100 path_(path),
101 socket_(THRIFT_INVALID_SOCKET),
102 peerPort_(0),
103 connTimeout_(0),
104 sendTimeout_(0),
105 recvTimeout_(0),
106 keepAlive_(false),
107 lingerOn_(1),
108 lingerVal_(0),
109 noDelay_(1),
110 maxRecvRetries_(5) {
111 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
112 }
113
TSocket(std::shared_ptr<TConfiguration> config)114 TSocket::TSocket(std::shared_ptr<TConfiguration> config)
115 : TVirtualTransport(config),
116 port_(0),
117 socket_(THRIFT_INVALID_SOCKET),
118 peerPort_(0),
119 connTimeout_(0),
120 sendTimeout_(0),
121 recvTimeout_(0),
122 keepAlive_(false),
123 lingerOn_(1),
124 lingerVal_(0),
125 noDelay_(1),
126 maxRecvRetries_(5) {
127 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
128 }
129
TSocket(THRIFT_SOCKET socket,std::shared_ptr<TConfiguration> config)130 TSocket::TSocket(THRIFT_SOCKET socket, std::shared_ptr<TConfiguration> config)
131 : TVirtualTransport(config),
132 port_(0),
133 socket_(socket),
134 peerPort_(0),
135 connTimeout_(0),
136 sendTimeout_(0),
137 recvTimeout_(0),
138 keepAlive_(false),
139 lingerOn_(1),
140 lingerVal_(0),
141 noDelay_(1),
142 maxRecvRetries_(5) {
143 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
144 #ifdef SO_NOSIGPIPE
145 {
146 int one = 1;
147 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
148 }
149 #endif
150 }
151
TSocket(THRIFT_SOCKET socket,std::shared_ptr<THRIFT_SOCKET> interruptListener,std::shared_ptr<TConfiguration> config)152 TSocket::TSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interruptListener,
153 std::shared_ptr<TConfiguration> config)
154 : TVirtualTransport(config),
155 port_(0),
156 socket_(socket),
157 peerPort_(0),
158 interruptListener_(interruptListener),
159 connTimeout_(0),
160 sendTimeout_(0),
161 recvTimeout_(0),
162 keepAlive_(false),
163 lingerOn_(1),
164 lingerVal_(0),
165 noDelay_(1),
166 maxRecvRetries_(5) {
167 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
168 #ifdef SO_NOSIGPIPE
169 {
170 int one = 1;
171 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
172 }
173 #endif
174 }
175
~TSocket()176 TSocket::~TSocket() {
177 close();
178 }
179
hasPendingDataToRead()180 bool TSocket::hasPendingDataToRead() {
181 if (!isOpen()) {
182 return false;
183 }
184
185 int32_t retries = 0;
186 THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
187 try_again:
188 int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
189 if (r == -1) {
190 int errno_copy = THRIFT_GET_SOCKET_ERROR;
191 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
192 goto try_again;
193 }
194 GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
195 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
196 }
197 return numBytesAvailable > 0;
198 }
199
isOpen() const200 bool TSocket::isOpen() const {
201 return (socket_ != THRIFT_INVALID_SOCKET);
202 }
203
peek()204 bool TSocket::peek() {
205 if (!isOpen()) {
206 return false;
207 }
208 if (interruptListener_) {
209 for (int retries = 0;;) {
210 struct THRIFT_POLLFD fds[2];
211 std::memset(fds, 0, sizeof(fds));
212 fds[0].fd = socket_;
213 fds[0].events = THRIFT_POLLIN;
214 fds[1].fd = *(interruptListener_.get());
215 fds[1].events = THRIFT_POLLIN;
216 int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
217 int errno_copy = THRIFT_GET_SOCKET_ERROR;
218 if (ret < 0) {
219 // error cases
220 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
221 continue;
222 }
223 GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
224 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
225 } else if (ret > 0) {
226 // Check the interruptListener
227 if (fds[1].revents & THRIFT_POLLIN) {
228 return false;
229 }
230 // There must be data or a disconnection, fall through to the PEEK
231 break;
232 } else {
233 // timeout
234 return false;
235 }
236 }
237 }
238
239 // Check to see if data is available or if the remote side closed
240 uint8_t buf;
241 int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
242 if (r == -1) {
243 int errno_copy = THRIFT_GET_SOCKET_ERROR;
244 #if defined __FreeBSD__ || defined __MACH__
245 /* shigin:
246 * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
247 * the other side
248 */
249 if (errno_copy == THRIFT_ECONNRESET) {
250 return false;
251 }
252 #endif
253 GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
254 throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
255 }
256 return (r > 0);
257 }
258
openConnection(struct addrinfo * res)259 void TSocket::openConnection(struct addrinfo* res) {
260
261 if (isOpen()) {
262 return;
263 }
264
265 if (!path_.empty()) {
266 socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
267 } else {
268 socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
269 }
270
271 if (socket_ == THRIFT_INVALID_SOCKET) {
272 int errno_copy = THRIFT_GET_SOCKET_ERROR;
273 GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
274 throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
275 }
276
277 // Send timeout
278 if (sendTimeout_ > 0) {
279 setSendTimeout(sendTimeout_);
280 }
281
282 // Recv timeout
283 if (recvTimeout_ > 0) {
284 setRecvTimeout(recvTimeout_);
285 }
286
287 if (keepAlive_) {
288 setKeepAlive(keepAlive_);
289 }
290
291 // Linger
292 setLinger(lingerOn_, lingerVal_);
293
294 // No delay
295 setNoDelay(noDelay_);
296
297 #ifdef SO_NOSIGPIPE
298 {
299 int one = 1;
300 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
301 }
302 #endif
303
304 // Uses a low min RTO if asked to.
305 #ifdef TCP_LOW_MIN_RTO
306 if (getUseLowMinRto()) {
307 int one = 1;
308 setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
309 }
310 #endif
311
312 // Set the socket to be non blocking for connect if a timeout exists
313 int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
314 if (connTimeout_ > 0) {
315 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
316 int errno_copy = THRIFT_GET_SOCKET_ERROR;
317 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
318 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
319 }
320 } else {
321 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
322 int errno_copy = THRIFT_GET_SOCKET_ERROR;
323 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
324 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
325 }
326 }
327
328 // Connect the socket
329 int ret;
330 if (!path_.empty()) {
331
332 /*
333 * TODO: seems that windows now support unix sockets,
334 * see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
335 */
336 #ifndef _WIN32
337
338 struct sockaddr_un address;
339 socklen_t structlen = fillUnixSocketAddr(address, path_);
340
341 ret = connect(socket_, (struct sockaddr*)&address, structlen);
342 #else
343 GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
344 throw TTransportException(TTransportException::NOT_OPEN,
345 " Unix Domain socket path not supported");
346 #endif
347
348 } else {
349 ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
350 }
351
352 // success case
353 if (ret == 0) {
354 goto done;
355 }
356
357 if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS)
358 && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) {
359 int errno_copy = THRIFT_GET_SOCKET_ERROR;
360 GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
361 throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
362 }
363
364 struct THRIFT_POLLFD fds[1];
365 std::memset(fds, 0, sizeof(fds));
366 fds[0].fd = socket_;
367 fds[0].events = THRIFT_POLLOUT;
368 ret = THRIFT_POLL(fds, 1, connTimeout_);
369
370 if (ret > 0) {
371 // Ensure the socket is connected and that there are no errors set
372 int val;
373 socklen_t lon;
374 lon = sizeof(int);
375 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
376 if (ret2 == -1) {
377 int errno_copy = THRIFT_GET_SOCKET_ERROR;
378 GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
379 throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
380 }
381 // no errors on socket, go to town
382 if (val == 0) {
383 goto done;
384 }
385 GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(),
386 val);
387 throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
388 } else if (ret == 0) {
389 // socket timed out
390 string errStr = "TSocket::open() timed out " + getSocketInfo();
391 GlobalOutput(errStr.c_str());
392 throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
393 } else {
394 // error on THRIFT_POLL()
395 int errno_copy = THRIFT_GET_SOCKET_ERROR;
396 GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
397 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
398 }
399
400 done:
401 // Set socket back to normal mode (blocking)
402 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags)) {
403 int errno_copy = THRIFT_GET_SOCKET_ERROR;
404 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
405 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
406 }
407
408 if (path_.empty()) {
409 setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
410 }
411 }
412
open()413 void TSocket::open() {
414 if (isOpen()) {
415 return;
416 }
417 if (!path_.empty()) {
418 unix_open();
419 } else {
420 local_open();
421 }
422 }
423
unix_open()424 void TSocket::unix_open() {
425 if (!path_.empty()) {
426 // Unix Domain Socket does not need addrinfo struct, so we pass NULL
427 openConnection(nullptr);
428 }
429 }
430
local_open()431 void TSocket::local_open() {
432
433 #ifdef _WIN32
434 TWinsockSingleton::create();
435 #endif // _WIN32
436
437 if (isOpen()) {
438 return;
439 }
440
441 // Validate port number
442 if (port_ < 0 || port_ > 0xFFFF) {
443 throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
444 }
445
446 struct addrinfo hints, *res, *res0;
447 res = nullptr;
448 res0 = nullptr;
449 int error;
450 char port[sizeof("65535")];
451 std::memset(&hints, 0, sizeof(hints));
452 hints.ai_family = PF_UNSPEC;
453 hints.ai_socktype = SOCK_STREAM;
454 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
455 sprintf(port, "%d", port_);
456
457 error = getaddrinfo(host_.c_str(), port, &hints, &res0);
458
459 if (
460 #ifdef _WIN32
461 error == WSANO_DATA
462 #else
463 error == EAI_NODATA
464 #endif
465 ) {
466 hints.ai_flags &= ~AI_ADDRCONFIG;
467 error = getaddrinfo(host_.c_str(), port, &hints, &res0);
468 }
469
470 if (error) {
471 string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo()
472 + string(THRIFT_GAI_STRERROR(error));
473 GlobalOutput(errStr.c_str());
474 close();
475 throw TTransportException(TTransportException::NOT_OPEN,
476 "Could not resolve host for client socket.");
477 }
478
479 // Cycle through all the returned addresses until one
480 // connects or push the exception up.
481 for (res = res0; res; res = res->ai_next) {
482 try {
483 openConnection(res);
484 break;
485 } catch (TTransportException&) {
486 if (res->ai_next) {
487 close();
488 } else {
489 close();
490 freeaddrinfo(res0); // cleanup on failure
491 throw;
492 }
493 }
494 }
495
496 // Free address structure memory
497 freeaddrinfo(res0);
498 }
499
close()500 void TSocket::close() {
501 if (socket_ != THRIFT_INVALID_SOCKET) {
502 shutdown(socket_, THRIFT_SHUT_RDWR);
503 ::THRIFT_CLOSESOCKET(socket_);
504 }
505 socket_ = THRIFT_INVALID_SOCKET;
506 }
507
setSocketFD(THRIFT_SOCKET socket)508 void TSocket::setSocketFD(THRIFT_SOCKET socket) {
509 if (socket_ != THRIFT_INVALID_SOCKET) {
510 close();
511 }
512 socket_ = socket;
513 }
514
read(uint8_t * buf,uint32_t len)515 uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
516 checkReadBytesAvailable(len);
517 if (socket_ == THRIFT_INVALID_SOCKET) {
518 throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
519 }
520
521 int32_t retries = 0;
522
523 // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
524 // the system is out of resources (an awesome undocumented feature).
525 // The following is an approximation of the time interval under which
526 // THRIFT_EAGAIN is taken to indicate an out of resources error.
527 uint32_t eagainThresholdMicros = 0;
528 if (recvTimeout_) {
529 // if a readTimeout is specified along with a max number of recv retries, then
530 // the threshold will ensure that the read timeout is not exceeded even in the
531 // case of resource errors
532 eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
533 }
534
535 try_again:
536 // Read from the socket
537 struct timeval begin;
538 if (recvTimeout_ > 0) {
539 THRIFT_GETTIMEOFDAY(&begin, nullptr);
540 } else {
541 // if there is no read timeout we don't need the TOD to determine whether
542 // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
543 begin.tv_sec = begin.tv_usec = 0;
544 }
545
546 int got = 0;
547
548 if (interruptListener_) {
549 struct THRIFT_POLLFD fds[2];
550 std::memset(fds, 0, sizeof(fds));
551 fds[0].fd = socket_;
552 fds[0].events = THRIFT_POLLIN;
553 fds[1].fd = *(interruptListener_.get());
554 fds[1].events = THRIFT_POLLIN;
555
556 int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
557 int errno_copy = THRIFT_GET_SOCKET_ERROR;
558 if (ret < 0) {
559 // error cases
560 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
561 goto try_again;
562 }
563 GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
564 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
565 } else if (ret > 0) {
566 // Check the interruptListener
567 if (fds[1].revents & THRIFT_POLLIN) {
568 throw TTransportException(TTransportException::INTERRUPTED, "Interrupted");
569 }
570 } else /* ret == 0 */ {
571 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
572 }
573
574 // falling through means there is something to recv and it cannot block
575 }
576
577 got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
578 // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
579 int errno_copy = THRIFT_GET_SOCKET_ERROR;
580
581 // Check for error on read
582 if (got < 0) {
583 if (errno_copy == THRIFT_EAGAIN) {
584 // if no timeout we can assume that resource exhaustion has occurred.
585 if (recvTimeout_ == 0) {
586 throw TTransportException(TTransportException::TIMED_OUT,
587 "THRIFT_EAGAIN (unavailable resources)");
588 }
589 // check if this is the lack of resources or timeout case
590 struct timeval end;
591 THRIFT_GETTIMEOFDAY(&end, nullptr);
592 auto readElapsedMicros = static_cast<uint32_t>(((end.tv_sec - begin.tv_sec) * 1000 * 1000)
593 + (end.tv_usec - begin.tv_usec));
594
595 if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
596 if (retries++ < maxRecvRetries_) {
597 THRIFT_SLEEP_USEC(50);
598 goto try_again;
599 } else {
600 throw TTransportException(TTransportException::TIMED_OUT,
601 "THRIFT_EAGAIN (unavailable resources)");
602 }
603 } else {
604 // infer that timeout has been hit
605 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
606 }
607 }
608
609 // If interrupted, try again
610 if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) {
611 goto try_again;
612 }
613
614 if (errno_copy == THRIFT_ECONNRESET) {
615 return 0;
616 }
617
618 // This ish isn't open
619 if (errno_copy == THRIFT_ENOTCONN) {
620 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
621 }
622
623 // Timed out!
624 if (errno_copy == THRIFT_ETIMEDOUT) {
625 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
626 }
627
628 // Now it's not a try again case, but a real probblez
629 GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
630
631 // Some other error, whatevz
632 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
633 }
634
635 return got;
636 }
637
write(const uint8_t * buf,uint32_t len)638 void TSocket::write(const uint8_t* buf, uint32_t len) {
639 uint32_t sent = 0;
640
641 while (sent < len) {
642 uint32_t b = write_partial(buf + sent, len - sent);
643 if (b == 0) {
644 // This should only happen if the timeout set with SO_SNDTIMEO expired.
645 // Raise an exception.
646 throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
647 }
648 sent += b;
649 }
650 }
651
write_partial(const uint8_t * buf,uint32_t len)652 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
653 if (socket_ == THRIFT_INVALID_SOCKET) {
654 throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
655 }
656
657 uint32_t sent = 0;
658
659 int flags = 0;
660 #ifdef MSG_NOSIGNAL
661 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
662 // check for the THRIFT_EPIPE return condition and close the socket in that case
663 flags |= MSG_NOSIGNAL;
664 #endif // ifdef MSG_NOSIGNAL
665
666 int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
667
668 if (b < 0) {
669 if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
670 return 0;
671 }
672 // Fail on a send error
673 int errno_copy = THRIFT_GET_SOCKET_ERROR;
674 GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
675
676 if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET
677 || errno_copy == THRIFT_ENOTCONN) {
678 throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
679 }
680
681 throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
682 }
683
684 // Fail on blocked send
685 if (b == 0) {
686 throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
687 }
688 return b;
689 }
690
getHost()691 std::string TSocket::getHost() {
692 return host_;
693 }
694
getPort()695 int TSocket::getPort() {
696 return port_;
697 }
698
getPath()699 std::string TSocket::getPath() {
700 return path_;
701 }
702
setHost(string host)703 void TSocket::setHost(string host) {
704 host_ = host;
705 }
706
setPort(int port)707 void TSocket::setPort(int port) {
708 port_ = port;
709 }
710
setPath(std::string path)711 void TSocket::setPath(std::string path) {
712 path_ = path;
713 }
714
setLinger(bool on,int linger)715 void TSocket::setLinger(bool on, int linger) {
716 lingerOn_ = on;
717 lingerVal_ = linger;
718 if (socket_ == THRIFT_INVALID_SOCKET) {
719 return;
720 }
721
722 #ifndef _WIN32
723 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
724 #else
725 struct linger l = {static_cast<u_short>(lingerOn_ ? 1 : 0), static_cast<u_short>(lingerVal_)};
726 #endif
727
728 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
729 if (ret == -1) {
730 int errno_copy
731 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
732 GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
733 }
734 }
735
setNoDelay(bool noDelay)736 void TSocket::setNoDelay(bool noDelay) {
737 noDelay_ = noDelay;
738 if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) {
739 return;
740 }
741
742 // Set socket to NODELAY
743 int v = noDelay_ ? 1 : 0;
744 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
745 if (ret == -1) {
746 int errno_copy
747 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
748 GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
749 }
750 }
751
setConnTimeout(int ms)752 void TSocket::setConnTimeout(int ms) {
753 connTimeout_ = ms;
754 }
755
setGenericTimeout(THRIFT_SOCKET s,int timeout_ms,int optname)756 void setGenericTimeout(THRIFT_SOCKET s, int timeout_ms, int optname) {
757 if (timeout_ms < 0) {
758 char errBuf[512];
759 sprintf(errBuf, "TSocket::setGenericTimeout with negative input: %d\n", timeout_ms);
760 GlobalOutput(errBuf);
761 return;
762 }
763
764 if (s == THRIFT_INVALID_SOCKET) {
765 return;
766 }
767
768 #ifdef _WIN32
769 DWORD platform_time = static_cast<DWORD>(timeout_ms);
770 #else
771 struct timeval platform_time = {(int)(timeout_ms / 1000), (int)((timeout_ms % 1000) * 1000)};
772 #endif
773
774 int ret = setsockopt(s, SOL_SOCKET, optname, cast_sockopt(&platform_time), sizeof(platform_time));
775 if (ret == -1) {
776 int errno_copy
777 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
778 GlobalOutput.perror("TSocket::setGenericTimeout() setsockopt() ", errno_copy);
779 }
780 }
781
setRecvTimeout(int ms)782 void TSocket::setRecvTimeout(int ms) {
783 setGenericTimeout(socket_, ms, SO_RCVTIMEO);
784 recvTimeout_ = ms;
785 }
786
setSendTimeout(int ms)787 void TSocket::setSendTimeout(int ms) {
788 setGenericTimeout(socket_, ms, SO_SNDTIMEO);
789 sendTimeout_ = ms;
790 }
791
setKeepAlive(bool keepAlive)792 void TSocket::setKeepAlive(bool keepAlive) {
793 keepAlive_ = keepAlive;
794
795 if (socket_ == THRIFT_INVALID_SOCKET) {
796 return;
797 }
798
799 int value = keepAlive_;
800 int ret
801 = setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&value), sizeof(value));
802
803 if (ret == -1) {
804 int errno_copy
805 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
806 GlobalOutput.perror("TSocket::setKeepAlive() setsockopt() " + getSocketInfo(), errno_copy);
807 }
808 }
809
setMaxRecvRetries(int maxRecvRetries)810 void TSocket::setMaxRecvRetries(int maxRecvRetries) {
811 maxRecvRetries_ = maxRecvRetries;
812 }
813
getSocketInfo() const814 string TSocket::getSocketInfo() const {
815 std::ostringstream oss;
816 if (path_.empty()) {
817 if (host_.empty() || port_ == 0) {
818 oss << "<Host: " << getPeerAddress();
819 oss << " Port: " << getPeerPort() << ">";
820 } else {
821 oss << "<Host: " << host_ << " Port: " << port_ << ">";
822 }
823 } else {
824 std::string fmt_path_ = path_;
825 // Handle printing abstract sockets (first character is a '\0' char):
826 if (!fmt_path_.empty() && fmt_path_[0] == '\0')
827 fmt_path_[0] = '@';
828 oss << "<Path: " << fmt_path_ << ">";
829 }
830 return oss.str();
831 }
832
getPeerHost() const833 std::string TSocket::getPeerHost() const {
834 if (peerHost_.empty() && path_.empty()) {
835 struct sockaddr_storage addr;
836 struct sockaddr* addrPtr;
837 socklen_t addrLen;
838
839 if (socket_ == THRIFT_INVALID_SOCKET) {
840 return host_;
841 }
842
843 addrPtr = getCachedAddress(&addrLen);
844
845 if (addrPtr == nullptr) {
846 addrLen = sizeof(addr);
847 if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
848 return peerHost_;
849 }
850 addrPtr = (sockaddr*)&addr;
851
852 const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
853 }
854
855 char clienthost[NI_MAXHOST];
856 char clientservice[NI_MAXSERV];
857
858 getnameinfo((sockaddr*)addrPtr,
859 addrLen,
860 clienthost,
861 sizeof(clienthost),
862 clientservice,
863 sizeof(clientservice),
864 0);
865
866 peerHost_ = clienthost;
867 }
868 return peerHost_;
869 }
870
getPeerAddress() const871 std::string TSocket::getPeerAddress() const {
872 if (peerAddress_.empty() && path_.empty()) {
873 struct sockaddr_storage addr;
874 struct sockaddr* addrPtr;
875 socklen_t addrLen;
876
877 if (socket_ == THRIFT_INVALID_SOCKET) {
878 return peerAddress_;
879 }
880
881 addrPtr = getCachedAddress(&addrLen);
882
883 if (addrPtr == nullptr) {
884 addrLen = sizeof(addr);
885 if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
886 return peerAddress_;
887 }
888 addrPtr = (sockaddr*)&addr;
889
890 const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
891 }
892
893 char clienthost[NI_MAXHOST];
894 char clientservice[NI_MAXSERV];
895
896 getnameinfo(addrPtr,
897 addrLen,
898 clienthost,
899 sizeof(clienthost),
900 clientservice,
901 sizeof(clientservice),
902 NI_NUMERICHOST | NI_NUMERICSERV);
903
904 peerAddress_ = clienthost;
905 peerPort_ = std::atoi(clientservice);
906 }
907 return peerAddress_;
908 }
909
getPeerPort() const910 int TSocket::getPeerPort() const {
911 getPeerAddress();
912 return peerPort_;
913 }
914
setCachedAddress(const sockaddr * addr,socklen_t len)915 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
916 if (!path_.empty()) {
917 return;
918 }
919
920 switch (addr->sa_family) {
921 case AF_INET:
922 if (len == sizeof(sockaddr_in)) {
923 memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
924 }
925 break;
926
927 case AF_INET6:
928 if (len == sizeof(sockaddr_in6)) {
929 memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
930 }
931 break;
932 }
933 peerAddress_.clear();
934 peerHost_.clear();
935 }
936
getCachedAddress(socklen_t * len) const937 sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
938 switch (cachedPeerAddr_.ipv4.sin_family) {
939 case AF_INET:
940 *len = sizeof(sockaddr_in);
941 return (sockaddr*)&cachedPeerAddr_.ipv4;
942
943 case AF_INET6:
944 *len = sizeof(sockaddr_in6);
945 return (sockaddr*)&cachedPeerAddr_.ipv6;
946
947 default:
948 return nullptr;
949 }
950 }
951
952 bool TSocket::useLowMinRto_ = false;
setUseLowMinRto(bool useLowMinRto)953 void TSocket::setUseLowMinRto(bool useLowMinRto) {
954 useLowMinRto_ = useLowMinRto;
955 }
getUseLowMinRto()956 bool TSocket::getUseLowMinRto() {
957 return useLowMinRto_;
958 }
959
getOrigin() const960 const std::string TSocket::getOrigin() const {
961 std::ostringstream oss;
962 oss << getPeerHost() << ":" << getPeerPort();
963 return oss.str();
964 }
965 }
966 }
967 } // apache::thrift::transport
968