/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>

#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using std::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
/**
 * Six states for the nonblocking server:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for task completion (if using a thread pool)
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  std::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  std::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needs to be read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  std::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  std::shared_ptr<TMemoryBuffer> outputTransport_;

  /// Extra transport generated by transport factory (e.g. BufferedRouterTransport)
  std::shared_ptr<TTransport> factoryInputTransport_;
  std::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  std::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  std::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  std::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(std::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and the write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// Set socket for connection
  void setSocket(std::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /**
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// Return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// Get state of connection.
  TAppState getState() const { return appState_; }

  /// Return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// Return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// Return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(std::shared_ptr<TProcessor> processor,
       std::shared_ptr<TProtocol> input,
       std::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() override {
    try {
      for (;;) {
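        // Process requests until the processor reports failure or the input
        // transport is drained (more than one request may sit in the buffer).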
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
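      // Out of memory: there is no reasonable way to keep serving, so
      // terminate the process.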
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocol> input_;
  std::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  std::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = nullptr;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // Get input/output transports from the factories
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = nullptr;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
  tSocket_ = socket;
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
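    // The frame length arrives as a big-endian uint32_t. The union below lets
    // us assemble it byte-by-byte across multiple read events, since a single
    // read may deliver fewer than four bytes.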
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets, some operations may need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution should be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();

        return;
      }
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();

    // If the socket has more data than the frame header, continue to work on it. This is not
    // strictly necessary for regular sockets, because if there is more data, libevent will fire
    // the event handler registered for read readiness, which will in turn call workSocket().
    // However, some socket types (such as TSSLSocket) may have the data sitting in their internal
    // buffers and from libevent's perspective, there is no further data available. In that case,
    // not having this workSocket() call here would result in a hang as we will never get to work
    // the socket, despite having more data.
    if (tSocket_->hasPendingDataToRead()) {
      workSocket();
    }

    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    if (!(readBufferPos_ < readWant_)) {
      GlobalOutput.printf("TNonblockingServer: frame size too short");
      close();
      return;
    }

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets, some operations may need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution should be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
      }

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == nullptr;
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
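      // The header transport parses the frame header itself, so hand it the
      // buffer including the 4-byte length prefix we rebuilt earlier.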
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // fallthrough

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // fallthrough

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = nullptr;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    return;

  case APP_READ_FRAME_SIZE:
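    // readWant_ currently holds only the frame payload length; add the four
    // header bytes so the buffer can hold the complete frame (prefix + payload).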
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == nullptr) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

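    // Re-serialize the length prefix (network byte order) at the start of the
    // buffer, so the header-transport path above sees the complete frame.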
    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ && event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
    return;
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, nullptr) == -1) {
    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  setIdle();

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = nullptr;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    std::free(readBuffer_);
    readBuffer_ = nullptr;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(std::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = nullptr;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->setSocket(socket);
    result->init(ioThread);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  std::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == nullptr) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}

void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        std::bind(&TNonblockingServer::expireClose,
                  this,
                  std::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

bool TNonblockingServer::serverOverloaded() {
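  // Hysteresis: once overloaded, the condition is only cleared after load
  // falls back below overloadHysteresis_ (a fraction of the limits), so the
  // overload flag does not flap on and off right at the boundary.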
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}

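/**
 * Discard one task that is still queued (not yet running) in the thread
 * manager, force-closing its connection. Returns true if a task was found.
 */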
bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

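/**
 * Expire callback registered with the ThreadManager: invoked when a queued
 * task times out before running, so we force-close its connection.
 */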
void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
  TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (auto& ioThread : ioThreads_) {
    ioThread->stop();
  }
}

void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;

  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // A user-provided event base doesn't work for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      static_cast<int>(ioThreads_.size()));

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new ThreadFactory(
        false // detached
        ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Register the events for the primary (listener) IO thread
  ioThreads_[0]->registerEvents();
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {

  if (ioThreads_.empty())
    registerEvents(nullptr);

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}

TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
  : server_(server),
    number_(number),
    threadId_{},
    listenSocket_(listenSocket),
    useHighPriority_(useHighPriority),
    eventBase_(nullptr),
    ownEventBase_(false),
    serverEvent_{},
    notificationEvent_{} {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}

TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_ && ownEventBase_) {
    event_base_free(eventBase_);
    ownEventBase_ = false;
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
    }
    listenSocket_ = THRIFT_INVALID_SOCKET;
  }

  for (auto notificationPipeFD : notificationPipeFDs_) {
    if (notificationPipeFD >= 0) {
      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            THRIFT_GET_SOCKET_ERROR);
      }
      notificationPipeFD = THRIFT_INVALID_SOCKET;
    }
  }
}

void TNonblockingIOThread::createNotificationPipe() {
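  // evutil_socketpair() yields a connected pair of local sockets; on
  // platforms without socketpair(2) (e.g. Windows) libevent emulates it with
  // loopback sockets, so the notification "pipe" works everywhere.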
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}

/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

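  // The notification message is simply the raw TConnection pointer, written
  // as sizeof(conn) bytes to the socketpair. notifyHandler() on the IO thread
  // reads it back and calls transition(); a null pointer means "stop".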
  int ret = -1;
  long kSize = sizeof(conn);
  const char* pos = (const char*)const_cast_sockopt(&conn);

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (kSize > 0) {
    pfd.revents = 0;
    ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#else
  fd_set wfds, efds;

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  auto* ioThread = (TNonblockingIOThread*)v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = nullptr;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == nullptr) {
        // this is the command to stop our thread, exit the handler!
        ioThread->breakLoop(false);
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          static_cast<int>(nBytes), kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      ioThread->breakLoop(false);
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
          && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(nullptr);
  } else {
    // cause the loop to stop ASAP - even if it has things to do in it
    event_base_loopbreak(eventBase_);
  }
}

void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*)&sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}

void TNonblockingIOThread::run() {
  if (eventBase_ == nullptr) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (eventBase_ != nullptr) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run the libevent engine; this returns only once the loop is broken
    // (e.g. by breakLoop()), invoking eventHandler callbacks until then
    event_base_loop(eventBase_, 0);

    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // cleans up our registered events
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

void TNonblockingIOThread::cleanupEvents() {
  // stop the listen socket, if any
  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}

void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe both to join() ourselves twice and to join the
      // current thread, as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}
}
}
} // apache::thrift::server
1523