1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <memory>
24 #include <stdexcept>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
28 #endif
29 #ifdef HAVE_SYS_UN_H
30 #include <sys/un.h>
31 #endif
32 #ifdef HAVE_SYS_POLL_H
33 #include <sys/poll.h>
34 #endif
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #endif
39 #ifdef HAVE_NETDB_H
40 #include <netdb.h>
41 #endif
42 #include <fcntl.h>
43 #ifdef HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49 
50 #include <thrift/transport/PlatformSocket.h>
51 #include <thrift/transport/TServerSocket.h>
52 #include <thrift/transport/TSocket.h>
53 #include <thrift/transport/TSocketUtils.h>
54 #include <thrift/transport/SocketCommon.h>
55 
56 #ifndef AF_LOCAL
57 #define AF_LOCAL AF_UNIX
58 #endif
59 
60 #ifndef SOCKOPT_CAST_T
61 #ifndef _WIN32
62 #define SOCKOPT_CAST_T void
63 #else
64 #define SOCKOPT_CAST_T char
65 #endif // _WIN32
66 #endif
67 
68 #ifdef _WIN32
69 // Including Windows.h can conflict with Winsock2 usage, and also
70 // adds problematic macros like min() and max(). Try to work around:
71 #define NOMINMAX
72 #define WIN32_LEAN_AND_MEAN
73 #include <Windows.h>
74 #undef NOMINMAX
75 #undef WIN32_LEAN_AND_MEAN
76 #endif
77 
78 template <class T>
const_cast_sockopt(const T * v)79 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
80   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
81 }
82 
83 template <class T>
cast_sockopt(T * v)84 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
85   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
86 }
87 
destroyer_of_fine_sockets(THRIFT_SOCKET * ssock)88 void destroyer_of_fine_sockets(THRIFT_SOCKET* ssock) {
89   ::THRIFT_CLOSESOCKET(*ssock);
90   delete ssock;
91 }
92 
93 using std::string;
94 
95 namespace apache {
96 namespace thrift {
97 namespace transport {
98 
99 using std::shared_ptr;
100 
TServerSocket(int port)101 TServerSocket::TServerSocket(int port)
102   : interruptableChildren_(true),
103     port_(port),
104     serverSocket_(THRIFT_INVALID_SOCKET),
105     acceptBacklog_(DEFAULT_BACKLOG),
106     sendTimeout_(0),
107     recvTimeout_(0),
108     accTimeout_(-1),
109     retryLimit_(0),
110     retryDelay_(0),
111     tcpSendBuffer_(0),
112     tcpRecvBuffer_(0),
113     keepAlive_(false),
114     listening_(false),
115     interruptSockWriter_(THRIFT_INVALID_SOCKET),
116     interruptSockReader_(THRIFT_INVALID_SOCKET),
117     childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
118 }
119 
TServerSocket(int port,int sendTimeout,int recvTimeout)120 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
121   : interruptableChildren_(true),
122     port_(port),
123     serverSocket_(THRIFT_INVALID_SOCKET),
124     acceptBacklog_(DEFAULT_BACKLOG),
125     sendTimeout_(sendTimeout),
126     recvTimeout_(recvTimeout),
127     accTimeout_(-1),
128     retryLimit_(0),
129     retryDelay_(0),
130     tcpSendBuffer_(0),
131     tcpRecvBuffer_(0),
132     keepAlive_(false),
133     listening_(false),
134     interruptSockWriter_(THRIFT_INVALID_SOCKET),
135     interruptSockReader_(THRIFT_INVALID_SOCKET),
136     childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
137 }
138 
TServerSocket(const string & address,int port)139 TServerSocket::TServerSocket(const string& address, int port)
140   : interruptableChildren_(true),
141     port_(port),
142     address_(address),
143     serverSocket_(THRIFT_INVALID_SOCKET),
144     acceptBacklog_(DEFAULT_BACKLOG),
145     sendTimeout_(0),
146     recvTimeout_(0),
147     accTimeout_(-1),
148     retryLimit_(0),
149     retryDelay_(0),
150     tcpSendBuffer_(0),
151     tcpRecvBuffer_(0),
152     keepAlive_(false),
153     listening_(false),
154     interruptSockWriter_(THRIFT_INVALID_SOCKET),
155     interruptSockReader_(THRIFT_INVALID_SOCKET),
156     childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
157 }
158 
TServerSocket(const string & path)159 TServerSocket::TServerSocket(const string& path)
160   : interruptableChildren_(true),
161     port_(0),
162     path_(path),
163     serverSocket_(THRIFT_INVALID_SOCKET),
164     acceptBacklog_(DEFAULT_BACKLOG),
165     sendTimeout_(0),
166     recvTimeout_(0),
167     accTimeout_(-1),
168     retryLimit_(0),
169     retryDelay_(0),
170     tcpSendBuffer_(0),
171     tcpRecvBuffer_(0),
172     keepAlive_(false),
173     listening_(false),
174     interruptSockWriter_(THRIFT_INVALID_SOCKET),
175     interruptSockReader_(THRIFT_INVALID_SOCKET),
176     childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
177 }
178 
~TServerSocket()179 TServerSocket::~TServerSocket() {
180   close();
181 }
182 
isOpen() const183 bool TServerSocket::isOpen() const {
184   if (serverSocket_ == THRIFT_INVALID_SOCKET)
185     return false;
186 
187   if (!listening_)
188     return false;
189 
190   if (!path_.empty() && (path_[0] != '\0')) {
191     // On some platforms the domain socket file may not be instantly
192     // available yet, i.e. the Windows file system can be slow. Therefore
193     // we should check that the domain socket file actually exists.
194 #ifdef _MSC_VER
195     // Currently there is a bug in ClangCl on Windows so the stat() call
196     // does not work. Workaround is a Windows-specific call if file exists:
197     DWORD const f_attrib = GetFileAttributesA(path_.c_str());
198     if (f_attrib == INVALID_FILE_ATTRIBUTES) {
199 #else
200     struct THRIFT_STAT path_info;
201     if (::THRIFT_STAT(path_.c_str(), &path_info) < 0) {
202 #endif
203       const std::string vError = "TServerSocket::isOpen(): The domain socket path '" + path_ + "' does not exist (yet).";
204       GlobalOutput.perror(vError.c_str(), THRIFT_GET_SOCKET_ERROR);
205       return false;
206     }
207   }
208 
209   return true;
210 }
211 
212 void TServerSocket::setSendTimeout(int sendTimeout) {
213   sendTimeout_ = sendTimeout;
214 }
215 
216 void TServerSocket::setRecvTimeout(int recvTimeout) {
217   recvTimeout_ = recvTimeout;
218 }
219 
220 void TServerSocket::setAcceptTimeout(int accTimeout) {
221   accTimeout_ = accTimeout;
222 }
223 
224 void TServerSocket::setAcceptBacklog(int accBacklog) {
225   acceptBacklog_ = accBacklog;
226 }
227 
228 void TServerSocket::setRetryLimit(int retryLimit) {
229   retryLimit_ = retryLimit;
230 }
231 
232 void TServerSocket::setRetryDelay(int retryDelay) {
233   retryDelay_ = retryDelay;
234 }
235 
236 void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
237   tcpSendBuffer_ = tcpSendBuffer;
238 }
239 
240 void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
241   tcpRecvBuffer_ = tcpRecvBuffer;
242 }
243 
244 void TServerSocket::setInterruptableChildren(bool enable) {
245   if (listening_) {
246     throw std::logic_error("setInterruptableChildren cannot be called after listen()");
247   }
248   interruptableChildren_ = enable;
249 }
250 
251 void TServerSocket::_setup_sockopts() {
252 
253   // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
254   int one = 1;
255   if (-1 == setsockopt(serverSocket_,
256                        SOL_SOCKET,
257                        THRIFT_NO_SOCKET_CACHING,
258                        cast_sockopt(&one),
259                        sizeof(one))) {
260 // ignore errors coming out of this setsockopt on Windows.  This is because
261 // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
262 // want to force servers to be an admin.
263 #ifndef _WIN32
264     int errno_copy = THRIFT_GET_SOCKET_ERROR;
265     GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
266                         errno_copy);
267     close();
268     throw TTransportException(TTransportException::NOT_OPEN,
269                               "Could not set THRIFT_NO_SOCKET_CACHING",
270                               errno_copy);
271 #endif
272   }
273 
274   // Set TCP buffer sizes
275   if (tcpSendBuffer_ > 0) {
276     if (-1 == setsockopt(serverSocket_,
277                          SOL_SOCKET,
278                          SO_SNDBUF,
279                          cast_sockopt(&tcpSendBuffer_),
280                          sizeof(tcpSendBuffer_))) {
281       int errno_copy = THRIFT_GET_SOCKET_ERROR;
282       GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
283       close();
284       throw TTransportException(TTransportException::NOT_OPEN,
285                                 "Could not set SO_SNDBUF",
286                                 errno_copy);
287     }
288   }
289 
290   if (tcpRecvBuffer_ > 0) {
291     if (-1 == setsockopt(serverSocket_,
292                          SOL_SOCKET,
293                          SO_RCVBUF,
294                          cast_sockopt(&tcpRecvBuffer_),
295                          sizeof(tcpRecvBuffer_))) {
296       int errno_copy = THRIFT_GET_SOCKET_ERROR;
297       GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
298       close();
299       throw TTransportException(TTransportException::NOT_OPEN,
300                                 "Could not set SO_RCVBUF",
301                                 errno_copy);
302     }
303   }
304 
305   // Turn linger off, don't want to block on calls to close
306   struct linger ling = {0, 0};
307   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
308     int errno_copy = THRIFT_GET_SOCKET_ERROR;
309     GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
310     close();
311     throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
312   }
313 
314   // Set NONBLOCK on the accept socket
315   int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
316   if (flags == -1) {
317     int errno_copy = THRIFT_GET_SOCKET_ERROR;
318     GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
319     close();
320     throw TTransportException(TTransportException::NOT_OPEN,
321                               "THRIFT_FCNTL() THRIFT_F_GETFL failed",
322                               errno_copy);
323   }
324   if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
325     int errno_copy = THRIFT_GET_SOCKET_ERROR;
326     GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
327     close();
328     throw TTransportException(TTransportException::NOT_OPEN,
329                               "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
330                               errno_copy);
331   }
332 }
333 
334 void TServerSocket::_setup_unixdomain_sockopts() {
335 }
336 
337 void TServerSocket::_setup_tcp_sockopts() {
338   int one = 1;
339 
340   // Defer accept
341 #ifdef TCP_DEFER_ACCEPT
342   if (path_.empty()) {
343     if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) {
344       int errno_copy = THRIFT_GET_SOCKET_ERROR;
345       GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
346       close();
347       throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT",
348                                 errno_copy);
349     }
350   }
351 #endif // #ifdef TCP_DEFER_ACCEPT
352 
353   // TCP Nodelay, speed over bandwidth
354   if (-1
355       == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
356     int errno_copy = THRIFT_GET_SOCKET_ERROR;
357     GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
358     close();
359     throw TTransportException(TTransportException::NOT_OPEN,
360                               "Could not set TCP_NODELAY",
361                               errno_copy);
362   }
363 }
364 
/**
 * Creates the interrupt socket pairs, creates and configures the server
 * socket (Unix domain when a path is set, TCP otherwise), binds it with the
 * configured retry policy, and starts listening.
 *
 * When port_ is 0 on the TCP path, the port actually bound is read back with
 * getsockname() and stored in port_.
 *
 * @throws TTransportException BAD_ARGS when port_ is outside [0, 0xFFFF];
 *         NOT_OPEN when address resolution, socket creation, option setup,
 *         bind or listen fails. On the NOT_OPEN paths close() has been called
 *         before the exception is thrown.
 */
void TServerSocket::listen() {
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  // Validate port number before any descriptor is created, so a rejected
  // port does not leave freshly created interrupt sockets behind.
  if (port_ < 0 || port_ > 0xFFFF) {
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  THRIFT_SOCKET sv[2];
  // Create the socket pair used to interrupt
  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
    // Not fatal: without the pair, interrupt() simply has nothing to signal.
    GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
                        THRIFT_GET_SOCKET_ERROR);
    interruptSockWriter_ = THRIFT_INVALID_SOCKET;
    interruptSockReader_ = THRIFT_INVALID_SOCKET;
  } else {
    interruptSockWriter_ = sv[1];
    interruptSockReader_ = sv[0];
  }

  // Create the socket pair used to interrupt all clients
  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
    GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
                        THRIFT_GET_SOCKET_ERROR);
    childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
    pChildInterruptSockReader_.reset();
  } else {
    childInterruptSockWriter_ = sv[1];
    // The reader end is shared with accepted sockets; the custom deleter
    // closes it once the last owner lets go.
    pChildInterruptSockReader_
        = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
  }

  // tcp == false means Unix Domain socket
  bool tcp = (path_.empty());

  // Resolve host:port strings into an iterable of struct addrinfo*
  AddressResolutionHelper resolved_addresses;
  if (tcp) {
    try {
      resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
                                 AI_PASSIVE | AI_V4MAPPED);
    } catch (const std::system_error& e) {
      GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what());
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not resolve host for server socket.");
    }
  }

  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;

  if (!tcp) {
    // -- Unix Domain Socket -- //

    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);

    if (serverSocket_ == THRIFT_INVALID_SOCKET) {
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not create server socket.",
                                errno_copy);
    }

    _setup_sockopts();
    _setup_unixdomain_sockopts();

/*
 * TODO: it seems that Windows now supports unix sockets,
 *       see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
 */
#ifndef _WIN32

    struct sockaddr_un address;
    socklen_t structlen = fillUnixSocketAddr(address, path_);

    // The same socket is re-bound on every attempt.
    do {
      if (0 == ::bind(serverSocket_, (struct sockaddr*)&address, structlen)) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
    // Release the sockets created above, like every other failure path here.
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {

    // -- TCP socket -- //

    auto addr_iter = AddressResolutionHelper::Iter{};

    // Via DNS or somehow else, single hostname can resolve into many addresses.
    // Results may contain perhaps a mix of IPv4 and IPv6.  Here, we iterate
    // over what system gave us, picking the first address that works.
    do {
      // Each attempt creates a new socket. Close the one left over from a
      // previous attempt whose bind() failed so that it is not leaked.
      if (serverSocket_ != THRIFT_INVALID_SOCKET) {
        ::THRIFT_CLOSESOCKET(serverSocket_);
        serverSocket_ = THRIFT_INVALID_SOCKET;
      }

      if (!addr_iter) {
        // init + recycle over many retries
        addr_iter = resolved_addresses.iterate();
      }
      auto trybind = *addr_iter++;

      serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
      if (serverSocket_ == THRIFT_INVALID_SOCKET) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        // `continue` in a do-while jumps to the retry condition below.
        continue;
      }

      _setup_sockopts();
      _setup_tcp_sockopts();

#ifdef IPV6_V6ONLY
      if (trybind->ai_family == AF_INET6) {
        // Clear IPV6_V6ONLY; a failure here is only logged.
        int zero = 0;
        if (-1 == setsockopt(serverSocket_,
                             IPPROTO_IPV6,
                             IPV6_V6ONLY,
                             cast_sockopt(&zero),
                             sizeof(zero))) {
          GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
        }
      }
#endif // #ifdef IPV6_V6ONLY

      if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;

      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info
    if (port_ == 0 && retries <= retryLimit_) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          port_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          port_ = ntohs(sin->sin_port);
        }
      }
    }
  } // TCP socket //

  // throw error if socket still wasn't created successfully
  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
    GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not create server socket.",
                              errno_copy);
  }

  // throw an error if we failed to bind properly
  if (retries > retryLimit_) {
    char errbuf[1024];
    if (!tcp) {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() PATH %s", path_.c_str());
    } else {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() BIND %d", port_);
    }
    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not bind",
                              errno_copy);
  }

  // Give the user-supplied callback, if any, access to the bound socket
  // before it starts listening.
  if (listenCallback_)
    listenCallback_(serverSocket_);

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
    errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
  listening_ = true;
}
563 
564 int TServerSocket::getPort() {
565   return port_;
566 }
567 
568 shared_ptr<TTransport> TServerSocket::acceptImpl() {
569   if (serverSocket_ == THRIFT_INVALID_SOCKET) {
570     throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
571   }
572 
573   struct THRIFT_POLLFD fds[2];
574 
575   int maxEintrs = 5;
576   int numEintrs = 0;
577 
578   while (true) {
579     std::memset(fds, 0, sizeof(fds));
580     fds[0].fd = serverSocket_;
581     fds[0].events = THRIFT_POLLIN;
582     if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
583       fds[1].fd = interruptSockReader_;
584       fds[1].events = THRIFT_POLLIN;
585     }
586     /*
587       TODO: if THRIFT_EINTR is received, we'll restart the timeout.
588       To be accurate, we need to fix this in the future.
589      */
590     int ret = THRIFT_POLL(fds, 2, accTimeout_);
591 
592     if (ret < 0) {
593       // error cases
594       if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs)) {
595         // THRIFT_EINTR needs to be handled manually and we can tolerate
596         // a certain number
597         continue;
598       }
599       int errno_copy = THRIFT_GET_SOCKET_ERROR;
600       GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy);
601       throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
602     } else if (ret > 0) {
603       // Check for an interrupt signal
604       if (interruptSockReader_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
605         int8_t buf;
606         if (-1 == recv(interruptSockReader_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
607           GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ",
608                               THRIFT_GET_SOCKET_ERROR);
609         }
610         throw TTransportException(TTransportException::INTERRUPTED);
611       }
612 
613       // Check for the actual server socket being ready
614       if (fds[0].revents & THRIFT_POLLIN) {
615         break;
616       }
617     } else {
618       GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0");
619       throw TTransportException(TTransportException::UNKNOWN);
620     }
621   }
622 
623   struct sockaddr_storage clientAddress;
624   int size = sizeof(clientAddress);
625   THRIFT_SOCKET clientSocket
626       = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
627 
628   if (clientSocket == THRIFT_INVALID_SOCKET) {
629     int errno_copy = THRIFT_GET_SOCKET_ERROR;
630     GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
631     throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
632   }
633 
634   // Make sure client socket is blocking
635   int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
636   if (flags == -1) {
637     int errno_copy = THRIFT_GET_SOCKET_ERROR;
638     ::THRIFT_CLOSESOCKET(clientSocket);
639     GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
640     throw TTransportException(TTransportException::UNKNOWN,
641                               "THRIFT_FCNTL(THRIFT_F_GETFL)",
642                               errno_copy);
643   }
644 
645   if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
646     int errno_copy = THRIFT_GET_SOCKET_ERROR;
647     ::THRIFT_CLOSESOCKET(clientSocket);
648     GlobalOutput
649         .perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
650                 errno_copy);
651     throw TTransportException(TTransportException::UNKNOWN,
652                               "THRIFT_FCNTL(THRIFT_F_SETFL)",
653                               errno_copy);
654   }
655 
656   shared_ptr<TSocket> client = createSocket(clientSocket);
657   if (sendTimeout_ > 0) {
658     client->setSendTimeout(sendTimeout_);
659   }
660   if (recvTimeout_ > 0) {
661     client->setRecvTimeout(recvTimeout_);
662   }
663   if (keepAlive_) {
664     client->setKeepAlive(keepAlive_);
665   }
666   client->setCachedAddress((sockaddr*)&clientAddress, size);
667 
668   if (acceptCallback_)
669     acceptCallback_(clientSocket);
670 
671   return client;
672 }
673 
674 shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
675   if (interruptableChildren_) {
676     return std::make_shared<TSocket>(clientSocket, pChildInterruptSockReader_);
677   } else {
678     return std::make_shared<TSocket>(clientSocket);
679   }
680 }
681 
682 void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
683   if (notifySocket != THRIFT_INVALID_SOCKET) {
684     int8_t byte = 0;
685     if (-1 == send(notifySocket, cast_sockopt(&byte), sizeof(int8_t), 0)) {
686       GlobalOutput.perror("TServerSocket::notify() send() ", THRIFT_GET_SOCKET_ERROR);
687     }
688   }
689 }
690 
691 void TServerSocket::interrupt() {
692   concurrency::Guard g(rwMutex_);
693   if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
694     notify(interruptSockWriter_);
695   }
696 }
697 
698 void TServerSocket::interruptChildren() {
699   concurrency::Guard g(rwMutex_);
700   if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
701     notify(childInterruptSockWriter_);
702   }
703 }
704 
705 void TServerSocket::close() {
706   concurrency::Guard g(rwMutex_);
707   if (serverSocket_ != THRIFT_INVALID_SOCKET) {
708     shutdown(serverSocket_, THRIFT_SHUT_RDWR);
709     ::THRIFT_CLOSESOCKET(serverSocket_);
710   }
711   if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
712     ::THRIFT_CLOSESOCKET(interruptSockWriter_);
713   }
714   if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
715     ::THRIFT_CLOSESOCKET(interruptSockReader_);
716   }
717   if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
718     ::THRIFT_CLOSESOCKET(childInterruptSockWriter_);
719   }
720   serverSocket_ = THRIFT_INVALID_SOCKET;
721   interruptSockWriter_ = THRIFT_INVALID_SOCKET;
722   interruptSockReader_ = THRIFT_INVALID_SOCKET;
723   childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
724   pChildInterruptSockReader_.reset();
725   listening_ = false;
726 }
727 } // namespace transport
728 } // namespace thrift
729 } // namespace apache
730