1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <memory>
24 #include <stdexcept>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
28 #endif
29 #ifdef HAVE_SYS_UN_H
30 #include <sys/un.h>
31 #endif
32 #ifdef HAVE_SYS_POLL_H
33 #include <sys/poll.h>
34 #endif
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #endif
39 #ifdef HAVE_NETDB_H
40 #include <netdb.h>
41 #endif
42 #include <fcntl.h>
43 #ifdef HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46 
47 #include <thrift/transport/PlatformSocket.h>
48 #include <thrift/transport/TNonblockingServerSocket.h>
49 #include <thrift/transport/TSocket.h>
50 #include <thrift/transport/TSocketUtils.h>
51 #include <thrift/transport/SocketCommon.h>
52 
53 #ifndef AF_LOCAL
54 #define AF_LOCAL AF_UNIX
55 #endif
56 
57 #ifndef SOCKOPT_CAST_T
58 #ifndef _WIN32
59 #define SOCKOPT_CAST_T void
60 #else
61 #define SOCKOPT_CAST_T char
62 #endif // _WIN32
63 #endif
64 
65 template <class T>
const_cast_sockopt(const T * v)66 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
67   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
68 }
69 
70 template <class T>
cast_sockopt(T * v)71 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
72   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
73 }
74 
75 namespace apache {
76 namespace thrift {
77 namespace transport {
78 
79 using std::shared_ptr;
80 using std::string;
81 
TNonblockingServerSocket(int port)82 TNonblockingServerSocket::TNonblockingServerSocket(int port)
83   : port_(port),
84     listenPort_(port),
85     serverSocket_(THRIFT_INVALID_SOCKET),
86     acceptBacklog_(DEFAULT_BACKLOG),
87     sendTimeout_(0),
88     recvTimeout_(0),
89     retryLimit_(0),
90     retryDelay_(0),
91     tcpSendBuffer_(0),
92     tcpRecvBuffer_(0),
93     keepAlive_(false),
94     listening_(false) {
95 }
96 
TNonblockingServerSocket(int port,int sendTimeout,int recvTimeout)97 TNonblockingServerSocket::TNonblockingServerSocket(int port, int sendTimeout, int recvTimeout)
98   : port_(port),
99     listenPort_(port),
100     serverSocket_(THRIFT_INVALID_SOCKET),
101     acceptBacklog_(DEFAULT_BACKLOG),
102     sendTimeout_(sendTimeout),
103     recvTimeout_(recvTimeout),
104     retryLimit_(0),
105     retryDelay_(0),
106     tcpSendBuffer_(0),
107     tcpRecvBuffer_(0),
108     keepAlive_(false),
109     listening_(false) {
110 }
111 
TNonblockingServerSocket(const string & address,int port)112 TNonblockingServerSocket::TNonblockingServerSocket(const string& address, int port)
113   : port_(port),
114     listenPort_(port),
115     address_(address),
116     serverSocket_(THRIFT_INVALID_SOCKET),
117     acceptBacklog_(DEFAULT_BACKLOG),
118     sendTimeout_(0),
119     recvTimeout_(0),
120     retryLimit_(0),
121     retryDelay_(0),
122     tcpSendBuffer_(0),
123     tcpRecvBuffer_(0),
124     keepAlive_(false),
125     listening_(false) {
126 }
127 
TNonblockingServerSocket(const string & path)128 TNonblockingServerSocket::TNonblockingServerSocket(const string& path)
129   : port_(0),
130     listenPort_(0),
131     path_(path),
132     serverSocket_(THRIFT_INVALID_SOCKET),
133     acceptBacklog_(DEFAULT_BACKLOG),
134     sendTimeout_(0),
135     recvTimeout_(0),
136     retryLimit_(0),
137     retryDelay_(0),
138     tcpSendBuffer_(0),
139     tcpRecvBuffer_(0),
140     keepAlive_(false),
141     listening_(false) {
142 }
143 
~TNonblockingServerSocket()144 TNonblockingServerSocket::~TNonblockingServerSocket() {
145   close();
146 }
147 
setSendTimeout(int sendTimeout)148 void TNonblockingServerSocket::setSendTimeout(int sendTimeout) {
149   sendTimeout_ = sendTimeout;
150 }
151 
setRecvTimeout(int recvTimeout)152 void TNonblockingServerSocket::setRecvTimeout(int recvTimeout) {
153   recvTimeout_ = recvTimeout;
154 }
155 
setAcceptBacklog(int accBacklog)156 void TNonblockingServerSocket::setAcceptBacklog(int accBacklog) {
157   acceptBacklog_ = accBacklog;
158 }
159 
setRetryLimit(int retryLimit)160 void TNonblockingServerSocket::setRetryLimit(int retryLimit) {
161   retryLimit_ = retryLimit;
162 }
163 
setRetryDelay(int retryDelay)164 void TNonblockingServerSocket::setRetryDelay(int retryDelay) {
165   retryDelay_ = retryDelay;
166 }
167 
setTcpSendBuffer(int tcpSendBuffer)168 void TNonblockingServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
169   tcpSendBuffer_ = tcpSendBuffer;
170 }
171 
setTcpRecvBuffer(int tcpRecvBuffer)172 void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
173   tcpRecvBuffer_ = tcpRecvBuffer;
174 }
175 
_setup_sockopts()176 void TNonblockingServerSocket::_setup_sockopts() {
177 
178   // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
179   int one = 1;
180   if (-1 == setsockopt(serverSocket_,
181                        SOL_SOCKET,
182                        THRIFT_NO_SOCKET_CACHING,
183                        cast_sockopt(&one),
184                        sizeof(one))) {
185 // ignore errors coming out of this setsockopt on Windows.  This is because
186 // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
187 // want to force servers to be an admin.
188 #ifndef _WIN32
189     int errno_copy = THRIFT_GET_SOCKET_ERROR;
190     GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
191                         errno_copy);
192     close();
193     throw TTransportException(TTransportException::NOT_OPEN,
194                               "Could not set THRIFT_NO_SOCKET_CACHING",
195                               errno_copy);
196 #endif
197   }
198 
199   // Set TCP buffer sizes
200   if (tcpSendBuffer_ > 0) {
201     if (-1 == setsockopt(serverSocket_,
202                          SOL_SOCKET,
203                          SO_SNDBUF,
204                          cast_sockopt(&tcpSendBuffer_),
205                          sizeof(tcpSendBuffer_))) {
206       int errno_copy = THRIFT_GET_SOCKET_ERROR;
207       GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
208       close();
209       throw TTransportException(TTransportException::NOT_OPEN,
210                                 "Could not set SO_SNDBUF",
211                                 errno_copy);
212     }
213   }
214 
215   if (tcpRecvBuffer_ > 0) {
216     if (-1 == setsockopt(serverSocket_,
217                          SOL_SOCKET,
218                          SO_RCVBUF,
219                          cast_sockopt(&tcpRecvBuffer_),
220                          sizeof(tcpRecvBuffer_))) {
221       int errno_copy = THRIFT_GET_SOCKET_ERROR;
222       GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
223       close();
224       throw TTransportException(TTransportException::NOT_OPEN,
225                                 "Could not set SO_RCVBUF",
226                                 errno_copy);
227     }
228   }
229 
230   // Turn linger off, don't want to block on calls to close
231   struct linger ling = {0, 0};
232   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
233     int errno_copy = THRIFT_GET_SOCKET_ERROR;
234     GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
235     close();
236     throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
237   }
238 
239   // Keepalive to ensure full result flushing
240   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one))) {
241     int errno_copy = THRIFT_GET_SOCKET_ERROR;
242     GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_KEEPALIVE ", errno_copy);
243     close();
244     throw TTransportException(TTransportException::NOT_OPEN,
245       "Could not set TCP_NODELAY",
246       errno_copy);
247   }
248 
249   // Set NONBLOCK on the accept socket
250   int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
251   if (flags == -1) {
252     int errno_copy = THRIFT_GET_SOCKET_ERROR;
253     GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
254     close();
255     throw TTransportException(TTransportException::NOT_OPEN,
256                               "THRIFT_FCNTL() THRIFT_F_GETFL failed",
257                               errno_copy);
258   }
259 
260   if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
261     int errno_copy = THRIFT_GET_SOCKET_ERROR;
262     GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
263     close();
264     throw TTransportException(TTransportException::NOT_OPEN,
265                               "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
266                               errno_copy);
267   }
268 
269 } // _setup_sockopts()
270 
_setup_tcp_sockopts()271 void TNonblockingServerSocket::_setup_tcp_sockopts() {
272   int one = 1;
273 
274   // Set TCP nodelay if available, MAC OS X Hack
275   // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
276 #ifndef TCP_NOPUSH
277   // TCP Nodelay, speed over bandwidth
278   if (-1
279       == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
280     int errno_copy = THRIFT_GET_SOCKET_ERROR;
281     GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
282     close();
283     throw TTransportException(TTransportException::NOT_OPEN,
284                               "Could not set TCP_NODELAY",
285                               errno_copy);
286   }
287 #endif
288 
289 #ifdef TCP_LOW_MIN_RTO
290   if (TSocket::getUseLowMinRto()) {
291     if (-1 == setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one))) {
292       int errno_copy = THRIFT_GET_SOCKET_ERROR;
293       GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_LOW_MIN_RTO ", errno_copy);
294       close();
295       throw TTransportException(TTransportException::NOT_OPEN,
296         "Could not set TCP_NODELAY",
297         errno_copy);
298     }
299   }
300 #endif
301 
302 } // _setup_tcp_sockopts()
303 
// Creates, configures, binds and starts listening on the server socket.
//
// A non-empty path_ selects a Unix-domain socket; otherwise address_/port_
// are resolved and each resolved address is tried in turn. A failed bind is
// retried up to retryLimit_ more times, sleeping retryDelay_ between
// attempts. When port_ is 0 and a TCP bind succeeds, listenPort_ is updated
// from getsockname(). listenCallback_, if set, runs after bind and before
// ::listen(). listening_ becomes true only once ::listen() has succeeded.
//
// Throws TTransportException:
//   - BAD_ARGS when port_ is outside [0, 0xFFFF]
//   - NOT_OPEN when resolution, socket creation, option setup, bind or
//     ::listen() fails (the socket is closed before throwing)
void TNonblockingServerSocket::listen() {
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  // tcp == false means Unix Domain socket
  const bool tcp = path_.empty();

  // Validate port number
  if (port_ < 0 || port_ > 0xFFFF) {
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  // Resolve host:port strings into an iterable of struct addrinfo*
  AddressResolutionHelper resolved_addresses;
  if (tcp) {
    try {
      resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
                                 AI_PASSIVE | AI_V4MAPPED);
    } catch (const std::system_error& e) {
      GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what());
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not resolve host for server socket.");
    }
  }

  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;
  // Explicit success flag. The retry counter alone cannot tell "bind
  // succeeded" apart from "the retry sleep was interrupted", and it gives the
  // wrong answer for a negative retryLimit_.
  bool bound = false;

  if (!tcp) {
    // -- Unix Domain Socket -- //

    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);

    if (serverSocket_ == THRIFT_INVALID_SOCKET) {
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not create server socket.",
                                errno_copy);
    }

    _setup_sockopts();
    //_setup_unixdomain_sockopts();

/*
 * TODO: seems that windows now support unix sockets,
 *       see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
 */
#ifndef _WIN32

    struct sockaddr_un address;
    socklen_t structlen = fillUnixSocketAddr(address, path_);

    do {
      if (0 == ::bind(serverSocket_, reinterpret_cast<struct sockaddr*>(&address), structlen)) {
        bound = true;
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror(
        "TNonblockingServerSocket::listen() Unix Domain socket path not supported on windows",
        -99);
    // Do not leave the just-created socket open on the way out.
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {

    // -- TCP socket -- //

    auto addr_iter = AddressResolutionHelper::Iter{};

    // Via DNS or somehow else, single hostname can resolve into many addresses.
    // Results may contain perhaps a mix of IPv4 and IPv6.  Here, we iterate
    // over what system gave us, picking the first address that works.
    do {
      if (!addr_iter) {
        // init + recycle over many retries
        addr_iter = resolved_addresses.iterate();
      }
      auto trybind = *addr_iter++;

      // A socket left over from an earlier attempt would be overwritten, and
      // therefore leaked, by the socket() call below. Release it first.
      if (serverSocket_ != THRIFT_INVALID_SOCKET) {
        ::THRIFT_CLOSESOCKET(serverSocket_);
        serverSocket_ = THRIFT_INVALID_SOCKET;
      }

      serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
      if (serverSocket_ == THRIFT_INVALID_SOCKET) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        continue; // jumps to the retry condition below
      }

      _setup_sockopts();
      _setup_tcp_sockopts();

#ifdef IPV6_V6ONLY
      if (trybind->ai_family == AF_INET6) {
        // Best effort: a failure here is logged but does not abort listen().
        int zero = 0;
        if (-1 == setsockopt(serverSocket_,
                             IPPROTO_IPV6,
                             IPV6_V6ONLY,
                             cast_sockopt(&zero),
                             sizeof(zero))) {
          GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ",
                              THRIFT_GET_SOCKET_ERROR);
        }
      }
#endif // #ifdef IPV6_V6ONLY

      if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
        bound = true;
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;

      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info: with port 0 the system picks the port, so ask the
    // socket which one it got
    if (port_ == 0 && bound) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TNonblockingServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          listenPort_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          listenPort_ = ntohs(sin->sin_port);
        }
      }
    }
  } // TCP socket //

  // throw error if socket still wasn't created successfully
  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
    GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not create server socket.",
                              errno_copy);
  }

  // throw an error if we failed to bind properly
  if (!bound) {
    char errbuf[1024];
    if (!tcp) {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
    } else {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
    }
    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not bind",
                              errno_copy);
  }

  if (listenCallback_)
    listenCallback_(serverSocket_);

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
    errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
  listening_ = true;
}
478 
getPort()479 int TNonblockingServerSocket::getPort() {
480   return port_;
481 }
482 
getListenPort()483 int TNonblockingServerSocket::getListenPort() {
484   return listenPort_;
485 }
486 
acceptImpl()487 shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
488   if (serverSocket_ == THRIFT_INVALID_SOCKET) {
489     throw TTransportException(TTransportException::NOT_OPEN,
490                               "TNonblockingServerSocket not listening");
491   }
492 
493   struct sockaddr_storage clientAddress;
494   int size = sizeof(clientAddress);
495   THRIFT_SOCKET clientSocket
496       = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
497 
498   if (clientSocket == THRIFT_INVALID_SOCKET) {
499     int errno_copy = THRIFT_GET_SOCKET_ERROR;
500     GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() ::accept() ", errno_copy);
501     throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
502   }
503 
504   // Explicitly set this socket to NONBLOCK mode
505   int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
506   if (flags == -1) {
507     int errno_copy = THRIFT_GET_SOCKET_ERROR;
508     ::THRIFT_CLOSESOCKET(clientSocket);
509     GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
510     throw TTransportException(TTransportException::UNKNOWN,
511                               "THRIFT_FCNTL(THRIFT_F_GETFL)",
512                               errno_copy);
513   }
514 
515   if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
516     int errno_copy = THRIFT_GET_SOCKET_ERROR;
517     ::THRIFT_CLOSESOCKET(clientSocket);
518     GlobalOutput
519         .perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
520                 errno_copy);
521     throw TTransportException(TTransportException::UNKNOWN,
522                               "THRIFT_FCNTL(THRIFT_F_SETFL)",
523                               errno_copy);
524   }
525 
526   shared_ptr<TSocket> client = createSocket(clientSocket);
527   if (sendTimeout_ > 0) {
528     client->setSendTimeout(sendTimeout_);
529   }
530   if (recvTimeout_ > 0) {
531     client->setRecvTimeout(recvTimeout_);
532   }
533   if (keepAlive_) {
534     client->setKeepAlive(keepAlive_);
535   }
536   client->setCachedAddress((sockaddr*)&clientAddress, size);
537 
538   if (acceptCallback_)
539     acceptCallback_(clientSocket);
540 
541   return client;
542 }
543 
createSocket(THRIFT_SOCKET clientSocket)544 shared_ptr<TSocket> TNonblockingServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
545   return std::make_shared<TSocket>(clientSocket);
546 }
547 
close()548 void TNonblockingServerSocket::close() {
549   if (serverSocket_ != THRIFT_INVALID_SOCKET) {
550     shutdown(serverSocket_, THRIFT_SHUT_RDWR);
551     ::THRIFT_CLOSESOCKET(serverSocket_);
552   }
553   serverSocket_ = THRIFT_INVALID_SOCKET;
554   listening_ = false;
555 }
556 } // namespace transport
557 } // namespace thrift
558 } // namespace apache
559