1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <memory>
24 #include <stdexcept>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
28 #endif
29 #ifdef HAVE_SYS_UN_H
30 #include <sys/un.h>
31 #endif
32 #ifdef HAVE_SYS_POLL_H
33 #include <sys/poll.h>
34 #endif
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #endif
39 #ifdef HAVE_NETDB_H
40 #include <netdb.h>
41 #endif
42 #include <fcntl.h>
43 #ifdef HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49
50 #include <thrift/transport/PlatformSocket.h>
51 #include <thrift/transport/TServerSocket.h>
52 #include <thrift/transport/TSocket.h>
53 #include <thrift/transport/TSocketUtils.h>
54 #include <thrift/transport/SocketCommon.h>
55
56 #ifndef AF_LOCAL
57 #define AF_LOCAL AF_UNIX
58 #endif
59
60 #ifndef SOCKOPT_CAST_T
61 #ifndef _WIN32
62 #define SOCKOPT_CAST_T void
63 #else
64 #define SOCKOPT_CAST_T char
65 #endif // _WIN32
66 #endif
67
68 #ifdef _WIN32
69 // Including Windows.h can conflict with Winsock2 usage, and also
70 // adds problematic macros like min() and max(). Try to work around:
71 #define NOMINMAX
72 #define WIN32_LEAN_AND_MEAN
73 #include <Windows.h>
74 #undef NOMINMAX
75 #undef WIN32_LEAN_AND_MEAN
76 #endif
77
78 template <class T>
const_cast_sockopt(const T * v)79 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
80 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
81 }
82
83 template <class T>
cast_sockopt(T * v)84 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
85 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
86 }
87
destroyer_of_fine_sockets(THRIFT_SOCKET * ssock)88 void destroyer_of_fine_sockets(THRIFT_SOCKET* ssock) {
89 ::THRIFT_CLOSESOCKET(*ssock);
90 delete ssock;
91 }
92
93 using std::string;
94
95 namespace apache {
96 namespace thrift {
97 namespace transport {
98
99 using std::shared_ptr;
100
TServerSocket(int port)101 TServerSocket::TServerSocket(int port)
102 : interruptableChildren_(true),
103 port_(port),
104 serverSocket_(THRIFT_INVALID_SOCKET),
105 acceptBacklog_(DEFAULT_BACKLOG),
106 sendTimeout_(0),
107 recvTimeout_(0),
108 accTimeout_(-1),
109 retryLimit_(0),
110 retryDelay_(0),
111 tcpSendBuffer_(0),
112 tcpRecvBuffer_(0),
113 keepAlive_(false),
114 listening_(false),
115 interruptSockWriter_(THRIFT_INVALID_SOCKET),
116 interruptSockReader_(THRIFT_INVALID_SOCKET),
117 childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
118 }
119
TServerSocket(int port,int sendTimeout,int recvTimeout)120 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
121 : interruptableChildren_(true),
122 port_(port),
123 serverSocket_(THRIFT_INVALID_SOCKET),
124 acceptBacklog_(DEFAULT_BACKLOG),
125 sendTimeout_(sendTimeout),
126 recvTimeout_(recvTimeout),
127 accTimeout_(-1),
128 retryLimit_(0),
129 retryDelay_(0),
130 tcpSendBuffer_(0),
131 tcpRecvBuffer_(0),
132 keepAlive_(false),
133 listening_(false),
134 interruptSockWriter_(THRIFT_INVALID_SOCKET),
135 interruptSockReader_(THRIFT_INVALID_SOCKET),
136 childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
137 }
138
TServerSocket(const string & address,int port)139 TServerSocket::TServerSocket(const string& address, int port)
140 : interruptableChildren_(true),
141 port_(port),
142 address_(address),
143 serverSocket_(THRIFT_INVALID_SOCKET),
144 acceptBacklog_(DEFAULT_BACKLOG),
145 sendTimeout_(0),
146 recvTimeout_(0),
147 accTimeout_(-1),
148 retryLimit_(0),
149 retryDelay_(0),
150 tcpSendBuffer_(0),
151 tcpRecvBuffer_(0),
152 keepAlive_(false),
153 listening_(false),
154 interruptSockWriter_(THRIFT_INVALID_SOCKET),
155 interruptSockReader_(THRIFT_INVALID_SOCKET),
156 childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
157 }
158
TServerSocket(const string & path)159 TServerSocket::TServerSocket(const string& path)
160 : interruptableChildren_(true),
161 port_(0),
162 path_(path),
163 serverSocket_(THRIFT_INVALID_SOCKET),
164 acceptBacklog_(DEFAULT_BACKLOG),
165 sendTimeout_(0),
166 recvTimeout_(0),
167 accTimeout_(-1),
168 retryLimit_(0),
169 retryDelay_(0),
170 tcpSendBuffer_(0),
171 tcpRecvBuffer_(0),
172 keepAlive_(false),
173 listening_(false),
174 interruptSockWriter_(THRIFT_INVALID_SOCKET),
175 interruptSockReader_(THRIFT_INVALID_SOCKET),
176 childInterruptSockWriter_(THRIFT_INVALID_SOCKET) {
177 }
178
~TServerSocket()179 TServerSocket::~TServerSocket() {
180 close();
181 }
182
isOpen() const183 bool TServerSocket::isOpen() const {
184 if (serverSocket_ == THRIFT_INVALID_SOCKET)
185 return false;
186
187 if (!listening_)
188 return false;
189
190 if (!path_.empty() && (path_[0] != '\0')) {
191 // On some platforms the domain socket file may not be instantly
192 // available yet, i.e. the Windows file system can be slow. Therefore
193 // we should check that the domain socket file actually exists.
194 #ifdef _MSC_VER
195 // Currently there is a bug in ClangCl on Windows so the stat() call
196 // does not work. Workaround is a Windows-specific call if file exists:
197 DWORD const f_attrib = GetFileAttributesA(path_.c_str());
198 if (f_attrib == INVALID_FILE_ATTRIBUTES) {
199 #else
200 struct THRIFT_STAT path_info;
201 if (::THRIFT_STAT(path_.c_str(), &path_info) < 0) {
202 #endif
203 const std::string vError = "TServerSocket::isOpen(): The domain socket path '" + path_ + "' does not exist (yet).";
204 GlobalOutput.perror(vError.c_str(), THRIFT_GET_SOCKET_ERROR);
205 return false;
206 }
207 }
208
209 return true;
210 }
211
212 void TServerSocket::setSendTimeout(int sendTimeout) {
213 sendTimeout_ = sendTimeout;
214 }
215
216 void TServerSocket::setRecvTimeout(int recvTimeout) {
217 recvTimeout_ = recvTimeout;
218 }
219
220 void TServerSocket::setAcceptTimeout(int accTimeout) {
221 accTimeout_ = accTimeout;
222 }
223
224 void TServerSocket::setAcceptBacklog(int accBacklog) {
225 acceptBacklog_ = accBacklog;
226 }
227
228 void TServerSocket::setRetryLimit(int retryLimit) {
229 retryLimit_ = retryLimit;
230 }
231
232 void TServerSocket::setRetryDelay(int retryDelay) {
233 retryDelay_ = retryDelay;
234 }
235
236 void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
237 tcpSendBuffer_ = tcpSendBuffer;
238 }
239
240 void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
241 tcpRecvBuffer_ = tcpRecvBuffer;
242 }
243
244 void TServerSocket::setInterruptableChildren(bool enable) {
245 if (listening_) {
246 throw std::logic_error("setInterruptableChildren cannot be called after listen()");
247 }
248 interruptableChildren_ = enable;
249 }
250
251 void TServerSocket::_setup_sockopts() {
252
253 // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
254 int one = 1;
255 if (-1 == setsockopt(serverSocket_,
256 SOL_SOCKET,
257 THRIFT_NO_SOCKET_CACHING,
258 cast_sockopt(&one),
259 sizeof(one))) {
260 // ignore errors coming out of this setsockopt on Windows. This is because
261 // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
262 // want to force servers to be an admin.
263 #ifndef _WIN32
264 int errno_copy = THRIFT_GET_SOCKET_ERROR;
265 GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
266 errno_copy);
267 close();
268 throw TTransportException(TTransportException::NOT_OPEN,
269 "Could not set THRIFT_NO_SOCKET_CACHING",
270 errno_copy);
271 #endif
272 }
273
274 // Set TCP buffer sizes
275 if (tcpSendBuffer_ > 0) {
276 if (-1 == setsockopt(serverSocket_,
277 SOL_SOCKET,
278 SO_SNDBUF,
279 cast_sockopt(&tcpSendBuffer_),
280 sizeof(tcpSendBuffer_))) {
281 int errno_copy = THRIFT_GET_SOCKET_ERROR;
282 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
283 close();
284 throw TTransportException(TTransportException::NOT_OPEN,
285 "Could not set SO_SNDBUF",
286 errno_copy);
287 }
288 }
289
290 if (tcpRecvBuffer_ > 0) {
291 if (-1 == setsockopt(serverSocket_,
292 SOL_SOCKET,
293 SO_RCVBUF,
294 cast_sockopt(&tcpRecvBuffer_),
295 sizeof(tcpRecvBuffer_))) {
296 int errno_copy = THRIFT_GET_SOCKET_ERROR;
297 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
298 close();
299 throw TTransportException(TTransportException::NOT_OPEN,
300 "Could not set SO_RCVBUF",
301 errno_copy);
302 }
303 }
304
305 // Turn linger off, don't want to block on calls to close
306 struct linger ling = {0, 0};
307 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
308 int errno_copy = THRIFT_GET_SOCKET_ERROR;
309 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
310 close();
311 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
312 }
313
314 // Set NONBLOCK on the accept socket
315 int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
316 if (flags == -1) {
317 int errno_copy = THRIFT_GET_SOCKET_ERROR;
318 GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
319 close();
320 throw TTransportException(TTransportException::NOT_OPEN,
321 "THRIFT_FCNTL() THRIFT_F_GETFL failed",
322 errno_copy);
323 }
324 if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
325 int errno_copy = THRIFT_GET_SOCKET_ERROR;
326 GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
327 close();
328 throw TTransportException(TTransportException::NOT_OPEN,
329 "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
330 errno_copy);
331 }
332 }
333
334 void TServerSocket::_setup_unixdomain_sockopts() {
335 }
336
337 void TServerSocket::_setup_tcp_sockopts() {
338 int one = 1;
339
340 // Defer accept
341 #ifdef TCP_DEFER_ACCEPT
342 if (path_.empty()) {
343 if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) {
344 int errno_copy = THRIFT_GET_SOCKET_ERROR;
345 GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
346 close();
347 throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT",
348 errno_copy);
349 }
350 }
351 #endif // #ifdef TCP_DEFER_ACCEPT
352
353 // TCP Nodelay, speed over bandwidth
354 if (-1
355 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
356 int errno_copy = THRIFT_GET_SOCKET_ERROR;
357 GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
358 close();
359 throw TTransportException(TTransportException::NOT_OPEN,
360 "Could not set TCP_NODELAY",
361 errno_copy);
362 }
363 }
364
365 void TServerSocket::listen() {
366 #ifdef _WIN32
367 TWinsockSingleton::create();
368 #endif // _WIN32
369
370 THRIFT_SOCKET sv[2];
371 // Create the socket pair used to interrupt
372 if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
373 GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
374 THRIFT_GET_SOCKET_ERROR);
375 interruptSockWriter_ = THRIFT_INVALID_SOCKET;
376 interruptSockReader_ = THRIFT_INVALID_SOCKET;
377 } else {
378 interruptSockWriter_ = sv[1];
379 interruptSockReader_ = sv[0];
380 }
381
382 // Create the socket pair used to interrupt all clients
383 if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
384 GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
385 THRIFT_GET_SOCKET_ERROR);
386 childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
387 pChildInterruptSockReader_.reset();
388 } else {
389 childInterruptSockWriter_ = sv[1];
390 pChildInterruptSockReader_
391 = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
392 }
393
394 // tcp == false means Unix Domain socket
395 bool tcp = (path_.empty());
396
397 // Validate port number
398 if (port_ < 0 || port_ > 0xFFFF) {
399 throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
400 }
401
402 // Resolve host:port strings into an iterable of struct addrinfo*
403 AddressResolutionHelper resolved_addresses;
404 if (tcp) {
405 try {
406 resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
407 AI_PASSIVE | AI_V4MAPPED);
408 } catch (const std::system_error& e) {
409 GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what());
410 close();
411 throw TTransportException(TTransportException::NOT_OPEN,
412 "Could not resolve host for server socket.");
413 }
414 }
415
416 // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
417 // always seem to work. The client can configure the retry variables.
418 int retries = 0;
419 int errno_copy = 0;
420
421 if (!tcp) {
422 // -- Unix Domain Socket -- //
423
424 serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
425
426 if (serverSocket_ == THRIFT_INVALID_SOCKET) {
427 int errno_copy = THRIFT_GET_SOCKET_ERROR;
428 GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
429 close();
430 throw TTransportException(TTransportException::NOT_OPEN,
431 "Could not create server socket.",
432 errno_copy);
433 }
434
435 _setup_sockopts();
436 _setup_unixdomain_sockopts();
437
438 /*
439 * TODO: seems that windows now support unix sockets,
440 * see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
441 */
442 #ifndef _WIN32
443
444 struct sockaddr_un address;
445 socklen_t structlen = fillUnixSocketAddr(address, path_);
446
447 do {
448 if (0 == ::bind(serverSocket_, (struct sockaddr*)&address, structlen)) {
449 break;
450 }
451 errno_copy = THRIFT_GET_SOCKET_ERROR;
452 // use short circuit evaluation here to only sleep if we need to
453 } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
454 #else
455 GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
456 throw TTransportException(TTransportException::NOT_OPEN,
457 " Unix Domain socket path not supported");
458 #endif
459 } else {
460
461 // -- TCP socket -- //
462
463 auto addr_iter = AddressResolutionHelper::Iter{};
464
465 // Via DNS or somehow else, single hostname can resolve into many addresses.
466 // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate
467 // over what system gave us, picking the first address that works.
468 do {
469 if (!addr_iter) {
470 // init + recycle over many retries
471 addr_iter = resolved_addresses.iterate();
472 }
473 auto trybind = *addr_iter++;
474
475 serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
476 if (serverSocket_ == -1) {
477 errno_copy = THRIFT_GET_SOCKET_ERROR;
478 continue;
479 }
480
481 _setup_sockopts();
482 _setup_tcp_sockopts();
483
484 #ifdef IPV6_V6ONLY
485 if (trybind->ai_family == AF_INET6) {
486 int zero = 0;
487 if (-1 == setsockopt(serverSocket_,
488 IPPROTO_IPV6,
489 IPV6_V6ONLY,
490 cast_sockopt(&zero),
491 sizeof(zero))) {
492 GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
493 }
494 }
495 #endif // #ifdef IPV6_V6ONLY
496
497 if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
498 break;
499 }
500 errno_copy = THRIFT_GET_SOCKET_ERROR;
501
502 // use short circuit evaluation here to only sleep if we need to
503 } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
504
505 // retrieve bind info
506 if (port_ == 0 && retries <= retryLimit_) {
507 struct sockaddr_storage sa;
508 socklen_t len = sizeof(sa);
509 std::memset(&sa, 0, len);
510 if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
511 errno_copy = THRIFT_GET_SOCKET_ERROR;
512 GlobalOutput.perror("TServerSocket::getPort() getsockname() ", errno_copy);
513 } else {
514 if (sa.ss_family == AF_INET6) {
515 const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
516 port_ = ntohs(sin->sin6_port);
517 } else {
518 const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
519 port_ = ntohs(sin->sin_port);
520 }
521 }
522 }
523 } // TCP socket //
524
525 // throw error if socket still wasn't created successfully
526 if (serverSocket_ == THRIFT_INVALID_SOCKET) {
527 GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
528 close();
529 throw TTransportException(TTransportException::NOT_OPEN,
530 "Could not create server socket.",
531 errno_copy);
532 }
533
534 // throw an error if we failed to bind properly
535 if (retries > retryLimit_) {
536 char errbuf[1024];
537 if (!tcp) {
538 THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() PATH %s", path_.c_str());
539 } else {
540 THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() BIND %d", port_);
541 }
542 GlobalOutput(errbuf);
543 close();
544 throw TTransportException(TTransportException::NOT_OPEN,
545 "Could not bind",
546 errno_copy);
547 }
548
549 if (listenCallback_)
550 listenCallback_(serverSocket_);
551
552 // Call listen
553 if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
554 errno_copy = THRIFT_GET_SOCKET_ERROR;
555 GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
556 close();
557 throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
558 }
559
560 // The socket is now listening!
561 listening_ = true;
562 }
563
564 int TServerSocket::getPort() {
565 return port_;
566 }
567
568 shared_ptr<TTransport> TServerSocket::acceptImpl() {
569 if (serverSocket_ == THRIFT_INVALID_SOCKET) {
570 throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
571 }
572
573 struct THRIFT_POLLFD fds[2];
574
575 int maxEintrs = 5;
576 int numEintrs = 0;
577
578 while (true) {
579 std::memset(fds, 0, sizeof(fds));
580 fds[0].fd = serverSocket_;
581 fds[0].events = THRIFT_POLLIN;
582 if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
583 fds[1].fd = interruptSockReader_;
584 fds[1].events = THRIFT_POLLIN;
585 }
586 /*
587 TODO: if THRIFT_EINTR is received, we'll restart the timeout.
588 To be accurate, we need to fix this in the future.
589 */
590 int ret = THRIFT_POLL(fds, 2, accTimeout_);
591
592 if (ret < 0) {
593 // error cases
594 if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs)) {
595 // THRIFT_EINTR needs to be handled manually and we can tolerate
596 // a certain number
597 continue;
598 }
599 int errno_copy = THRIFT_GET_SOCKET_ERROR;
600 GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy);
601 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
602 } else if (ret > 0) {
603 // Check for an interrupt signal
604 if (interruptSockReader_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
605 int8_t buf;
606 if (-1 == recv(interruptSockReader_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
607 GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ",
608 THRIFT_GET_SOCKET_ERROR);
609 }
610 throw TTransportException(TTransportException::INTERRUPTED);
611 }
612
613 // Check for the actual server socket being ready
614 if (fds[0].revents & THRIFT_POLLIN) {
615 break;
616 }
617 } else {
618 GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0");
619 throw TTransportException(TTransportException::UNKNOWN);
620 }
621 }
622
623 struct sockaddr_storage clientAddress;
624 int size = sizeof(clientAddress);
625 THRIFT_SOCKET clientSocket
626 = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
627
628 if (clientSocket == THRIFT_INVALID_SOCKET) {
629 int errno_copy = THRIFT_GET_SOCKET_ERROR;
630 GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
631 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
632 }
633
634 // Make sure client socket is blocking
635 int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
636 if (flags == -1) {
637 int errno_copy = THRIFT_GET_SOCKET_ERROR;
638 ::THRIFT_CLOSESOCKET(clientSocket);
639 GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
640 throw TTransportException(TTransportException::UNKNOWN,
641 "THRIFT_FCNTL(THRIFT_F_GETFL)",
642 errno_copy);
643 }
644
645 if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
646 int errno_copy = THRIFT_GET_SOCKET_ERROR;
647 ::THRIFT_CLOSESOCKET(clientSocket);
648 GlobalOutput
649 .perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
650 errno_copy);
651 throw TTransportException(TTransportException::UNKNOWN,
652 "THRIFT_FCNTL(THRIFT_F_SETFL)",
653 errno_copy);
654 }
655
656 shared_ptr<TSocket> client = createSocket(clientSocket);
657 if (sendTimeout_ > 0) {
658 client->setSendTimeout(sendTimeout_);
659 }
660 if (recvTimeout_ > 0) {
661 client->setRecvTimeout(recvTimeout_);
662 }
663 if (keepAlive_) {
664 client->setKeepAlive(keepAlive_);
665 }
666 client->setCachedAddress((sockaddr*)&clientAddress, size);
667
668 if (acceptCallback_)
669 acceptCallback_(clientSocket);
670
671 return client;
672 }
673
674 shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
675 if (interruptableChildren_) {
676 return std::make_shared<TSocket>(clientSocket, pChildInterruptSockReader_);
677 } else {
678 return std::make_shared<TSocket>(clientSocket);
679 }
680 }
681
682 void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
683 if (notifySocket != THRIFT_INVALID_SOCKET) {
684 int8_t byte = 0;
685 if (-1 == send(notifySocket, cast_sockopt(&byte), sizeof(int8_t), 0)) {
686 GlobalOutput.perror("TServerSocket::notify() send() ", THRIFT_GET_SOCKET_ERROR);
687 }
688 }
689 }
690
691 void TServerSocket::interrupt() {
692 concurrency::Guard g(rwMutex_);
693 if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
694 notify(interruptSockWriter_);
695 }
696 }
697
698 void TServerSocket::interruptChildren() {
699 concurrency::Guard g(rwMutex_);
700 if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
701 notify(childInterruptSockWriter_);
702 }
703 }
704
705 void TServerSocket::close() {
706 concurrency::Guard g(rwMutex_);
707 if (serverSocket_ != THRIFT_INVALID_SOCKET) {
708 shutdown(serverSocket_, THRIFT_SHUT_RDWR);
709 ::THRIFT_CLOSESOCKET(serverSocket_);
710 }
711 if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
712 ::THRIFT_CLOSESOCKET(interruptSockWriter_);
713 }
714 if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
715 ::THRIFT_CLOSESOCKET(interruptSockReader_);
716 }
717 if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
718 ::THRIFT_CLOSESOCKET(childInterruptSockWriter_);
719 }
720 serverSocket_ = THRIFT_INVALID_SOCKET;
721 interruptSockWriter_ = THRIFT_INVALID_SOCKET;
722 interruptSockReader_ = THRIFT_INVALID_SOCKET;
723 childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
724 pChildInterruptSockReader_.reset();
725 listening_ = false;
726 }
727 } // namespace transport
728 } // namespace thrift
729 } // namespace apache
730