/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <thrift/Thrift.h>
#include <memory>
#include <thrift/server/TServer.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerTransport.h>
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
#include <string>
#include <cstdlib>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>

namespace apache {
namespace thrift {
namespace server {

using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TNonblockingServerTransport;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Guard;

#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER                                                                    \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef THRIFT_SOCKET evutil_socket_t;
#endif

#ifndef SOCKOPT_CAST_T
#ifndef _WIN32
#define SOCKOPT_CAST_T void
#else
#define SOCKOPT_CAST_T char
#endif // _WIN32
#endif

template <class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

template <class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
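
// Usage sketch (illustrative, not part of the API): these helpers paper over
// the difference in setsockopt()'s option-value parameter type between POSIX
// (const void*) and Winsock (const char*), e.g.:
//
//   int one = 1;
//   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one),
//              sizeof(one));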

/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 */
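
/*
 * Example (a minimal sketch, not part of this header): construct the server
 * with a generated processor and a TNonblockingServerSocket, then serve().
 * MyServiceHandler/MyServiceProcessor stand in for thrift-generated code.
 *
 *   #include <thrift/transport/TNonblockingServerSocket.h>
 *
 *   auto handler = std::make_shared<MyServiceHandler>();
 *   auto processor = std::make_shared<MyServiceProcessor>(handler);
 *   auto transport =
 *       std::make_shared<transport::TNonblockingServerSocket>(9090);
 *   TNonblockingServer server(processor, transport);
 *   server.serve(); // blocks; call stop() from another thread to shut down
 */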

/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,       ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
};

class TNonblockingIOThread;

class TNonblockingServer : public TServer {
private:
  class TConnection;

  friend class TNonblockingIOThread;

private:
  /// Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  /// Default limit on size of idle connection pool
  static const size_t CONNECTION_STACK_LIMIT = 1024;

  /// Default limit on frame size
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// Default limit on total number of connected sockets
  static const int MAX_CONNECTIONS = INT_MAX;

  /// Default limit on connections in handler/task processing
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

  /// Default size of write buffer
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_READ_BUFFER_LIMIT = 1024;

  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;

  /// # of calls before resizing oversized buffers (0 = check only on close)
  static const int RESIZE_BUFFER_EVERY_N = 512;

  /// # of IO threads to use by default
  static const int DEFAULT_IO_THREADS = 1;

  /// # of IO threads this server will use
  size_t numIOThreads_;

  /// Whether to set high scheduling priority for IO threads
  bool useHighPriorityIOThreads_;

  /// Server socket file descriptor
  THRIFT_SOCKET serverSocket_;

  /// The optional user-provided event-base (for single-thread servers)
  event_base* userEventBase_;

  /// For processing via thread pool, may be nullptr
  std::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  // Factory to create the IO threads
  std::shared_ptr<ThreadFactory> ioThreadFactory_;

  // Vector of IOThread objects that will handle our IO
  std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;

  // Index of next IO Thread to be used (for round-robin)
  uint32_t nextIOThread_;

  // Synchronizes access to connection stack and similar data
  Mutex connMutex_;

  /// Number of TConnection objects we've created
  size_t numTConnections_;

  /// Number of Connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Limit for how many TConnection objects to cache
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /**
   * Hysteresis for overload state.  This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared;
   * must be <= 1.0.
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit.  0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is <= this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure
   * that idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unclosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
  std::vector<TConnection*> activeConnections_;

  /// The server transport on which this server listens for connections.
  std::shared_ptr<TNonblockingServerTransport> serverTransport_;

  /**
   * Called when the server socket has something to accept.  We accept all
   * waiting client connections on the listen socket and assign TConnection
   * objects to handle those requests.
   *
   * @param fd the listen socket descriptor on which the event occurred.
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(THRIFT_SOCKET fd, short which);

  void init() {
    serverSocket_ = THRIFT_INVALID_SOCKET;
    numIOThreads_ = DEFAULT_IO_THREADS;
    nextIOThread_ = 0;
    useHighPriorityIOThreads_ = false;
    userEventBase_ = nullptr;
    threadPoolProcessing_ = false;
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxConnections_ = MAX_CONNECTIONS;
    maxFrameSize_ = MAX_FRAME_SIZE;
    taskExpireTime_ = 0;
    overloadHysteresis_ = 0.8;
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
    overloaded_ = false;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
  }

public:
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processor), serverTransport_(serverTransport) {
    init();
  }


  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer() override;

  void setThreadManager(std::shared_ptr<ThreadManager> threadManager);
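
  // Sketch: attaching a thread pool so calls are processed as tasks instead
  // of inline on the IO thread (assumes the ThreadManager factory helpers
  // available in this Thrift version):
  //
  //   auto tm = ThreadManager::newSimpleThreadManager(8);
  //   tm->threadFactory(std::make_shared<ThreadFactory>());
  //   tm->start();
  //   server.setThreadManager(tm);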

  int getListenPort() { return serverTransport_->getListenPort(); }

  std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }

  /**
   * Sets the number of IO threads used by this server. Can only be used before
   * the call to serve() and has no effect afterwards.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
    // A user-provided event-base doesn't work for multi-threaded servers
    assert(numIOThreads_ <= 1 || !userEventBase_);
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const { return numIOThreads_; }

  /**
   * Get the maximum number of unused TConnection objects we will hold in reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const { return connectionStackLimit_; }

  /**
   * Set the maximum number of unused TConnection objects we will hold in reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }

  bool isThreadPoolProcessing() const { return threadPoolProcessing_; }

  void addTask(std::shared_ptr<Runnable> task) {
    threadManager_->add(task, 0LL, taskExpireTime_);
  }

  /**
   * Return the count of sockets currently connected to.
   *
   * @return count of connected sockets.
   */
  size_t getNumConnections() const { return numTConnections_; }

  /**
   * Return the count of connection objects currently in use, i.e. the
   * connected sockets less those sitting in the idle connection pool.
   *
   * @return count of active connection objects.
   */
  size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const { return connectionStack_.size(); }

  /**
   * Return the count of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const { return numActiveProcessors_; }
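
  // Sketch: the counters above can feed periodic monitoring through the
  // logging hook declared in Thrift.h, e.g.:
  //
  //   GlobalOutput.printf("connections: %d total, %d idle, %d processing",
  //                       (int)server.getNumConnections(),
  //                       (int)server.getNumIdleConnections(),
  //                       (int)server.getNumActiveProcessors());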

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    Guard g(connMutex_);
    ++numActiveProcessors_;
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    Guard g(connMutex_);
    if (numActiveProcessors_ > 0) {
      --numActiveProcessors_;
    }
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const { return maxConnections_; }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processors.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maximum frame size, in bytes.
   */
  size_t getMaxFrameSize() const { return maxFrameSize_; }

  /**
   * Set the maximum allowed frame size.
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
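
  // Note: because of the 4-byte framing, clients must wrap their transport
  // in TFramedTransport; otherwise their first bytes are misread as a frame
  // length. A client-side sketch (protocol headers not included by this
  // file):
  //
  //   auto sock = std::make_shared<transport::TSocket>("localhost", 9090);
  //   auto framed = std::make_shared<transport::TFramedTransport>(sock);
  //   auto proto = std::make_shared<protocol::TBinaryProtocol>(framed);
  //   framed->open();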

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const { return overloadHysteresis_; }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * @param hysteresisFraction fraction <= 1.0.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
      overloadHysteresis_ = hysteresisFraction;
    }
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const { return overloadAction_; }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
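
  // Sketch (illustrative values): a typical overload policy caps open
  // connections and in-flight tasks, sheds new connections when saturated,
  // and clears the condition once load drops below 80% of the limits:
  //
  //   server.setMaxConnections(10000);
  //   server.setMaxActiveProcessors(500);
  //   server.setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
  //   server.setOverloadHysteresis(0.8);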

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const { return taskExpireTime_; }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded.  The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /** Pop and discard next task on threadpool wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }

  /**
   * Set the maximum size of read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size of read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }

  /**
   * Set the maximum size of write buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }

  /**
   * Get # of calls made between buffer size checks.  0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }

  /**
   * Check buffer sizes every "count" calls.  This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
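
  // Sketch (illustrative values): for long-lived connections, bound idle
  // buffer growth and amortize the size checks across calls:
  //
  //   server.setWriteBufferDefaultSize(4096);
  //   server.setIdleReadBufferLimit(64 * 1024);
  //   server.setIdleWriteBufferLimit(64 * 1024);
  //   server.setResizeBufferEveryN(128);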

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve() override;

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop() override;
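
  // Sketch: since stop() may be called from any thread, a common pattern is
  // to run serve() on the main thread and trigger shutdown from another:
  //
  //   std::thread shutdown([&server] {
  //     waitForShutdownSignal(); // hypothetical helper, not part of Thrift
  //     server.stop();
  //   });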

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Register the optional user-provided event-base (for single-thread servers)
   *
   * This method should be used when the server is running in a single-thread
   * mode, and the event base is provided by the user (i.e., the caller).
   *
   * @param user_event_base the user-provided event-base. The user is
   * responsible for freeing the event base memory.
   */
  void registerEvents(event_base* user_event_base);
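
  // Sketch: single-threaded operation driven by a caller-owned event_base,
  // using standard libevent 2 calls; the caller frees the base afterwards:
  //
  //   event_base* base = event_base_new();
  //   server.registerEvents(base);
  //   event_base_dispatch(base); // runs until the loop is broken
  //   event_base_free(base);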

  /**
   * Returns the optional user-provided event-base (for single-thread servers).
   */
  event_base* getUserEventBase() const { return userEventBase_; }

  /** Some transports, like THeaderTransport, require passing through
   * the framing size instead of stripping it.
   */
  bool getHeaderTransport();

private:
  /**
   * Callback function that the threadmanager calls when a task reaches
   * its expiration time.  It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(std::shared_ptr<Runnable> task);

  /**
   * Return an initialized connection object.  Creates or recovers a
   * TConnection from the pool and initializes it with the provided socket.
   *
   * @param socket the TSocket transport associated with this connection.
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(std::shared_ptr<TSocket> socket);

  /**
   * Returns a connection to the pool, or deletes it.  If the connection
   * pool (a stack) isn't full, place the connection object on it; otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};

class TNonblockingIOThread : public Runnable {
public:
  // Creates an IO thread and sets up the event base.  The listenSocket should
  // be a valid FD on which listen() has already been called.  If the
  // listenSocket is < 0, accepting will not be done.
  TNonblockingIOThread(TNonblockingServer* server,
                       int number,
                       THRIFT_SOCKET listenSocket,
                       bool useHighPriority);

  ~TNonblockingIOThread() override;

  // Returns the event-base for this thread.
  event_base* getEventBase() const { return eventBase_; }

  // Returns the server for this thread.
  TNonblockingServer* getServer() const { return server_; }

  // Returns the number of this IO thread.
  int getThreadNumber() const { return number_; }

  // Returns the thread id associated with this object.  This should
  // only be called after the thread has been started.
  Thread::id_t getThreadId() const { return threadId_; }

  // Returns the send-fd for task complete notifications.
  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }

  // Returns the read-fd for task complete notifications.
  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }

  // Returns the actual thread object associated with this IO thread.
  std::shared_ptr<Thread> getThread() const { return thread_; }

  // Sets the actual thread object associated with this IO thread.
  void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }

  // Used by TConnection objects to indicate processing has finished.
  bool notify(TNonblockingServer::TConnection* conn);

  // Enters the event loop and does not return until a call to stop().
  void run() override;

  // Exits the event loop as soon as possible.
  void stop();

  // Ensures that the event-loop thread is fully finished and shut down.
  void join();

  /// Registers the events for the notification & listen sockets
  void registerEvents();

private:
  /**
   * C-callable event handler for signaling task completion.  Provides a
   * callback that libevent can understand that will read a connection
   * object's address from a pipe and call connection->transition() for
   * that object.
   *
   * @param fd the descriptor the event occurred on.
   */
  static void notifyHandler(evutil_socket_t fd, short which, void* v);

  /**
   * C-callable event handler for listener events.  Provides a callback
   * that libevent can understand which invokes server->handleEvent().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingServer's "this".
   */
  static void listenHandler(evutil_socket_t fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

  /// Exits the loop ASAP in case of shutdown or error.
  void breakLoop(bool error);

  /// Create the pipe used to notify I/O process of task completion.
  void createNotificationPipe();

  /// Unregisters our events for notification and listen sockets.
  void cleanupEvents();

  /// Sets (or clears) high priority scheduling status for the current thread.
  void setCurrentThreadHighPriority(bool value);

private:
  /// associated server
  TNonblockingServer* server_;

  /// thread number (for debugging).
  const int number_;

  /// The actual physical thread id.
  Thread::id_t threadId_;

  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
  THRIFT_SOCKET listenSocket_;

  /// Sets a high scheduling priority when running
  bool useHighPriority_;

  /// pointer to eventbase to be used for looping
  event_base* eventBase_;

  /// Set to true if this class is responsible for freeing the event base
  /// memory.
  bool ownEventBase_;

  /// Used with eventBase_ for connection events (only in listener thread)
  struct event serverEvent_;

  /// Used with eventBase_ for task completion notification
  struct event notificationEvent_;

  /// File descriptors for pipe used for task completion notification.
  evutil_socket_t notificationPipeFDs_[2];

  /// Actual IO Thread
  std::shared_ptr<Thread> thread_;
};
}
}
} // apache::thrift::server

#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_