1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22 
23 #include <thrift/Thrift.h>
24 #include <thrift/stdcxx.h>
25 #include <thrift/server/TServer.h>
26 #include <thrift/transport/PlatformSocket.h>
27 #include <thrift/transport/TBufferTransports.h>
28 #include <thrift/transport/TSocket.h>
29 #include <thrift/transport/TNonblockingServerTransport.h>
30 #include <thrift/concurrency/ThreadManager.h>
31 #include <climits>
32 #include <thrift/concurrency/Thread.h>
33 #include <thrift/concurrency/PlatformThreadFactory.h>
34 #include <thrift/concurrency/Mutex.h>
35 #include <stack>
36 #include <vector>
37 #include <string>
38 #include <cstdlib>
39 #ifdef HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42 #include <event.h>
43 #include <event2/event_compat.h>
44 #include <event2/event_struct.h>
45 
46 namespace apache {
47 namespace thrift {
48 namespace server {
49 
50 using apache::thrift::transport::TMemoryBuffer;
51 using apache::thrift::transport::TSocket;
52 using apache::thrift::transport::TNonblockingServerTransport;
53 using apache::thrift::protocol::TProtocol;
54 using apache::thrift::concurrency::Runnable;
55 using apache::thrift::concurrency::ThreadManager;
56 using apache::thrift::concurrency::PlatformThreadFactory;
57 using apache::thrift::concurrency::ThreadFactory;
58 using apache::thrift::concurrency::Thread;
59 using apache::thrift::concurrency::Mutex;
60 using apache::thrift::concurrency::Guard;
61 
// libevent packs its version as (major << 24) | (minor << 16) | (rel << 8).
// Expose the individual components, deriving them from the packed number when
// the libevent headers provide it.
#if defined(LIBEVENT_VERSION_NUMBER)
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// The headers did not define a packed version number: assume the latest
// release of the version 1 series and synthesize the packed number from it.
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER                                                                    \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif
74 
75 #if LIBEVENT_VERSION_NUMBER < 0x02000000
76 typedef THRIFT_SOCKET evutil_socket_t;
77 #endif
78 
// Pointee type used when casting option buffers in the sockopt helpers below:
// void everywhere except on Windows, where it is char.
// A definition supplied before this header is included takes precedence.
#ifndef SOCKOPT_CAST_T
#ifndef _WIN32
#define SOCKOPT_CAST_T void
#else
#define SOCKOPT_CAST_T char
#endif // _WIN32
#endif // SOCKOPT_CAST_T
86 
87 template <class T>
const_cast_sockopt(const T * v)88 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
89   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
90 }
91 
92 template <class T>
cast_sockopt(T * v)93 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
94   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
95 }
96 
/**
 * @class TNonblockingServer
 *
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 */
103 
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,       ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
};
110 
111 class TNonblockingIOThread;
112 
113 class TNonblockingServer : public TServer {
114 private:
115   class TConnection;
116 
117   friend class TNonblockingIOThread;
118 
119 private:
120   /// Listen backlog
121   static const int LISTEN_BACKLOG = 1024;
122 
123   /// Default limit on size of idle connection pool
124   static const size_t CONNECTION_STACK_LIMIT = 1024;
125 
126   /// Default limit on frame size
127   static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
128 
129   /// Default limit on total number of connected sockets
130   static const int MAX_CONNECTIONS = INT_MAX;
131 
132   /// Default limit on connections in handler/task processing
133   static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
134 
135   /// Default size of write buffer
136   static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
137 
138   /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
139   static const int IDLE_READ_BUFFER_LIMIT = 1024;
140 
141   /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
142   static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
143 
144   /// # of calls before resizing oversized buffers (0 = check only on close)
145   static const int RESIZE_BUFFER_EVERY_N = 512;
146 
147   /// # of IO threads to use by default
148   static const int DEFAULT_IO_THREADS = 1;
149 
150   /// # of IO threads this server will use
151   size_t numIOThreads_;
152 
153   /// Whether to set high scheduling priority for IO threads
154   bool useHighPriorityIOThreads_;
155 
156   /// Server socket file descriptor
157   THRIFT_SOCKET serverSocket_;
158 
159   /// The optional user-provided event-base (for single-thread servers)
160   event_base* userEventBase_;
161 
162   /// For processing via thread pool, may be NULL
163   stdcxx::shared_ptr<ThreadManager> threadManager_;
164 
165   /// Is thread pool processing?
166   bool threadPoolProcessing_;
167 
168   // Factory to create the IO threads
169   stdcxx::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
170 
171   // Vector of IOThread objects that will handle our IO
172   std::vector<stdcxx::shared_ptr<TNonblockingIOThread> > ioThreads_;
173 
174   // Index of next IO Thread to be used (for round-robin)
175   uint32_t nextIOThread_;
176 
177   // Synchronizes access to connection stack and similar data
178   Mutex connMutex_;
179 
180   /// Number of TConnection object we've created
181   size_t numTConnections_;
182 
183   /// Number of Connections processing or waiting to process
184   size_t numActiveProcessors_;
185 
186   /// Limit for how many TConnection objects to cache
187   size_t connectionStackLimit_;
188 
189   /// Limit for number of connections processing or waiting to process
190   size_t maxActiveProcessors_;
191 
192   /// Limit for number of open connections
193   size_t maxConnections_;
194 
195   /// Limit for frame size
196   size_t maxFrameSize_;
197 
198   /// Time in milliseconds before an unperformed task expires (0 == infinite).
199   int64_t taskExpireTime_;
200 
201   /**
202    * Hysteresis for overload state.  This is the fraction of the overload
203    * value that needs to be reached before the overload state is cleared;
204    * must be <= 1.0.
205    */
206   double overloadHysteresis_;
207 
208   /// Action to take when we're overloaded.
209   TOverloadAction overloadAction_;
210 
211   /**
212    * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
213    * and found to be exceeded, reinitialized) to this size.
214    */
215   size_t writeBufferDefaultSize_;
216 
217   /**
218    * Max read buffer size for an idle TConnection.  When we place an idle
219    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
220    * we will free the buffer (such that it will be reinitialized by the next
221    * received frame) if it has exceeded this limit.  0 disables this check.
222    */
223   size_t idleReadBufferLimit_;
224 
225   /**
226    * Max write buffer size for an idle connection.  When we place an idle
227    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
228    * we insure that its write buffer is <= to this size; otherwise we
229    * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
230    * idle connections don't hog memory. 0 disables this check.
231    */
232   size_t idleWriteBufferLimit_;
233 
234   /**
235    * Every N calls we check the buffer size limits on a connected TConnection.
236    * 0 disables (i.e. the checks are only done when a connection closes).
237    */
238   int32_t resizeBufferEveryN_;
239 
240   /// Set if we are currently in an overloaded state.
241   bool overloaded_;
242 
243   /// Count of connections dropped since overload started
244   uint32_t nConnectionsDropped_;
245 
246   /// Count of connections dropped on overload since server started
247   uint64_t nTotalConnectionsDropped_;
248 
249   /**
250    * This is a stack of all the objects that have been created but that
251    * are NOT currently in use. When we close a connection, we place it on this
252    * stack so that the object can be reused later, rather than freeing the
253    * memory and reallocating a new object later.
254    */
255   std::stack<TConnection*> connectionStack_;
256 
257   /**
258    * This container holds pointers to all active connections. This container
259    * allows the server to clean up unlcosed connection objects at destruction,
260    * which in turn allows their transports, protocols, processors and handlers
261    * to deallocate and clean up correctly.
262    */
263   std::vector<TConnection*> activeConnections_;
264 
265   /*
266   */
267   stdcxx::shared_ptr<TNonblockingServerTransport> serverTransport_;
268 
269   /**
270    * Called when server socket had something happen.  We accept all waiting
271    * client connections on listen socket fd and assign TConnection objects
272    * to handle those requests.
273    *
274    * @param which the event flag that triggered the handler.
275    */
276   void handleEvent(THRIFT_SOCKET fd, short which);
277 
init()278   void init() {
279     serverSocket_ = THRIFT_INVALID_SOCKET;
280     numIOThreads_ = DEFAULT_IO_THREADS;
281     nextIOThread_ = 0;
282     useHighPriorityIOThreads_ = false;
283     userEventBase_ = NULL;
284     threadPoolProcessing_ = false;
285     numTConnections_ = 0;
286     numActiveProcessors_ = 0;
287     connectionStackLimit_ = CONNECTION_STACK_LIMIT;
288     maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
289     maxConnections_ = MAX_CONNECTIONS;
290     maxFrameSize_ = MAX_FRAME_SIZE;
291     taskExpireTime_ = 0;
292     overloadHysteresis_ = 0.8;
293     overloadAction_ = T_OVERLOAD_NO_ACTION;
294     writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
295     idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
296     idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
297     resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
298     overloaded_ = false;
299     nConnectionsDropped_ = 0;
300     nTotalConnectionsDropped_ = 0;
301   }
302 
303 public:
TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory> & processorFactory,const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport> & serverTransport)304   TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
305                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
306     : TServer(processorFactory), serverTransport_(serverTransport) {
307     init();
308   }
309 
TNonblockingServer(const stdcxx::shared_ptr<TProcessor> & processor,const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport> & serverTransport)310   TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
311                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
312     : TServer(processor), serverTransport_(serverTransport) {
313     init();
314   }
315 
316 
317   TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
318                      const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory,
319                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
320                      const stdcxx::shared_ptr<ThreadManager>& threadManager
321                      = stdcxx::shared_ptr<ThreadManager>())
TServer(processorFactory)322     : TServer(processorFactory), serverTransport_(serverTransport) {
323     init();
324 
325     setInputProtocolFactory(protocolFactory);
326     setOutputProtocolFactory(protocolFactory);
327     setThreadManager(threadManager);
328   }
329 
330   TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
331                      const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory,
332                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
333                      const stdcxx::shared_ptr<ThreadManager>& threadManager
334                      = stdcxx::shared_ptr<ThreadManager>())
TServer(processor)335     : TServer(processor), serverTransport_(serverTransport) {
336     init();
337 
338     setInputProtocolFactory(protocolFactory);
339     setOutputProtocolFactory(protocolFactory);
340     setThreadManager(threadManager);
341   }
342 
343   TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
344                      const stdcxx::shared_ptr<TTransportFactory>& inputTransportFactory,
345                      const stdcxx::shared_ptr<TTransportFactory>& outputTransportFactory,
346                      const stdcxx::shared_ptr<TProtocolFactory>& inputProtocolFactory,
347                      const stdcxx::shared_ptr<TProtocolFactory>& outputProtocolFactory,
348                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
349                      const stdcxx::shared_ptr<ThreadManager>& threadManager
350                      = stdcxx::shared_ptr<ThreadManager>())
TServer(processorFactory)351     : TServer(processorFactory), serverTransport_(serverTransport) {
352     init();
353 
354     setInputTransportFactory(inputTransportFactory);
355     setOutputTransportFactory(outputTransportFactory);
356     setInputProtocolFactory(inputProtocolFactory);
357     setOutputProtocolFactory(outputProtocolFactory);
358     setThreadManager(threadManager);
359   }
360 
361   TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
362                      const stdcxx::shared_ptr<TTransportFactory>& inputTransportFactory,
363                      const stdcxx::shared_ptr<TTransportFactory>& outputTransportFactory,
364                      const stdcxx::shared_ptr<TProtocolFactory>& inputProtocolFactory,
365                      const stdcxx::shared_ptr<TProtocolFactory>& outputProtocolFactory,
366                      const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
367                      const stdcxx::shared_ptr<ThreadManager>& threadManager
368                      = stdcxx::shared_ptr<ThreadManager>())
TServer(processor)369     : TServer(processor), serverTransport_(serverTransport) {
370     init();
371 
372     setInputTransportFactory(inputTransportFactory);
373     setOutputTransportFactory(outputTransportFactory);
374     setInputProtocolFactory(inputProtocolFactory);
375     setOutputProtocolFactory(outputProtocolFactory);
376     setThreadManager(threadManager);
377   }
378 
379   ~TNonblockingServer();
380 
381   void setThreadManager(stdcxx::shared_ptr<ThreadManager> threadManager);
382 
getListenPort()383   int getListenPort() { return serverTransport_->getListenPort(); }
384 
getThreadManager()385   stdcxx::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
386 
387   /**
388    * Sets the number of IO threads used by this server. Can only be used before
389    * the call to serve() and has no effect afterwards.  We always use a
390    * PosixThreadFactory for the IO worker threads, because they must joinable
391    * for clean shutdown.
392    */
setNumIOThreads(size_t numThreads)393   void setNumIOThreads(size_t numThreads) {
394     numIOThreads_ = numThreads;
395     // User-provided event-base doesn't works for multi-threaded servers
396     assert(numIOThreads_ <= 1 || !userEventBase_);
397   }
398 
399   /** Return whether the IO threads will get high scheduling priority */
useHighPriorityIOThreads()400   bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
401 
402   /** Set whether the IO threads will get high scheduling priority. */
setUseHighPriorityIOThreads(bool val)403   void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }
404 
405   /** Return the number of IO threads used by this server. */
getNumIOThreads()406   size_t getNumIOThreads() const { return numIOThreads_; }
407 
408   /**
409    * Get the maximum number of unused TConnection we will hold in reserve.
410    *
411    * @return the current limit on TConnection pool size.
412    */
getConnectionStackLimit()413   size_t getConnectionStackLimit() const { return connectionStackLimit_; }
414 
415   /**
416    * Set the maximum number of unused TConnection we will hold in reserve.
417    *
418    * @param sz the new limit for TConnection pool size.
419    */
setConnectionStackLimit(size_t sz)420   void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }
421 
isThreadPoolProcessing()422   bool isThreadPoolProcessing() const { return threadPoolProcessing_; }
423 
addTask(stdcxx::shared_ptr<Runnable> task)424   void addTask(stdcxx::shared_ptr<Runnable> task) {
425     threadManager_->add(task, 0LL, taskExpireTime_);
426   }
427 
428   /**
429    * Return the count of sockets currently connected to.
430    *
431    * @return count of connected sockets.
432    */
getNumConnections()433   size_t getNumConnections() const { return numTConnections_; }
434 
435   /**
436    * Return the count of sockets currently connected to.
437    *
438    * @return count of connected sockets.
439    */
getNumActiveConnections()440   size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }
441 
442   /**
443    * Return the count of connection objects allocated but not in use.
444    *
445    * @return count of idle connection objects.
446    */
getNumIdleConnections()447   size_t getNumIdleConnections() const { return connectionStack_.size(); }
448 
449   /**
450    * Return count of number of connections which are currently processing.
451    * This is defined as a connection where all data has been received and
452    * either assigned a task (when threading) or passed to a handler (when
453    * not threading), and where the handler has not yet returned.
454    *
455    * @return # of connections currently processing.
456    */
getNumActiveProcessors()457   size_t getNumActiveProcessors() const { return numActiveProcessors_; }
458 
459   /// Increment the count of connections currently processing.
incrementActiveProcessors()460   void incrementActiveProcessors() {
461     Guard g(connMutex_);
462     ++numActiveProcessors_;
463   }
464 
465   /// Decrement the count of connections currently processing.
decrementActiveProcessors()466   void decrementActiveProcessors() {
467     Guard g(connMutex_);
468     if (numActiveProcessors_ > 0) {
469       --numActiveProcessors_;
470     }
471   }
472 
473   /**
474    * Get the maximum # of connections allowed before overload.
475    *
476    * @return current setting.
477    */
getMaxConnections()478   size_t getMaxConnections() const { return maxConnections_; }
479 
480   /**
481    * Set the maximum # of connections allowed before overload.
482    *
483    * @param maxConnections new setting for maximum # of connections.
484    */
setMaxConnections(size_t maxConnections)485   void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }
486 
487   /**
488    * Get the maximum # of connections waiting in handler/task before overload.
489    *
490    * @return current setting.
491    */
getMaxActiveProcessors()492   size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }
493 
494   /**
495    * Set the maximum # of connections waiting in handler/task before overload.
496    *
497    * @param maxActiveProcessors new setting for maximum # of active processes.
498    */
setMaxActiveProcessors(size_t maxActiveProcessors)499   void setMaxActiveProcessors(size_t maxActiveProcessors) {
500     maxActiveProcessors_ = maxActiveProcessors;
501   }
502 
503   /**
504    * Get the maximum allowed frame size.
505    *
506    * If a client tries to send a message larger than this limit,
507    * its connection will be closed.
508    *
509    * @return Maxium frame size, in bytes.
510    */
getMaxFrameSize()511   size_t getMaxFrameSize() const { return maxFrameSize_; }
512 
513   /**
514    * Set the maximum allowed frame size.
515    *
516    * @param maxFrameSize The new maximum frame size.
517    */
setMaxFrameSize(size_t maxFrameSize)518   void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
519 
520   /**
521    * Get fraction of maximum limits before an overload condition is cleared.
522    *
523    * @return hysteresis fraction
524    */
getOverloadHysteresis()525   double getOverloadHysteresis() const { return overloadHysteresis_; }
526 
527   /**
528    * Set fraction of maximum limits before an overload condition is cleared.
529    * A good value would probably be between 0.5 and 0.9.
530    *
531    * @param hysteresisFraction fraction <= 1.0.
532    */
setOverloadHysteresis(double hysteresisFraction)533   void setOverloadHysteresis(double hysteresisFraction) {
534     if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
535       overloadHysteresis_ = hysteresisFraction;
536     }
537   }
538 
539   /**
540    * Get the action the server will take on overload.
541    *
542    * @return a TOverloadAction enum value for the currently set action.
543    */
getOverloadAction()544   TOverloadAction getOverloadAction() const { return overloadAction_; }
545 
546   /**
547    * Set the action the server is to take on overload.
548    *
549    * @param overloadAction a TOverloadAction enum value for the action.
550    */
setOverloadAction(TOverloadAction overloadAction)551   void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
552 
553   /**
554    * Get the time in milliseconds after which a task expires (0 == infinite).
555    *
556    * @return a 64-bit time in milliseconds.
557    */
getTaskExpireTime()558   int64_t getTaskExpireTime() const { return taskExpireTime_; }
559 
560   /**
561    * Set the time in milliseconds after which a task expires (0 == infinite).
562    *
563    * @param taskExpireTime a 64-bit time in milliseconds.
564    */
setTaskExpireTime(int64_t taskExpireTime)565   void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }
566 
567   /**
568    * Determine if the server is currently overloaded.
569    * This function checks the maximums for open connections and connections
570    * currently in processing, and sets an overload condition if they are
571    * exceeded.  The overload will persist until both values are below the
572    * current hysteresis fraction of their maximums.
573    *
574    * @return true if an overload condition exists, false if not.
575    */
576   bool serverOverloaded();
577 
578   /** Pop and discard next task on threadpool wait queue.
579    *
580    * @return true if a task was discarded, false if the wait queue was empty.
581    */
582   bool drainPendingTask();
583 
584   /**
585    * Get the starting size of a TConnection object's write buffer.
586    *
587    * @return # bytes we initialize a TConnection object's write buffer to.
588    */
getWriteBufferDefaultSize()589   size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }
590 
591   /**
592    * Set the starting size of a TConnection object's write buffer.
593    *
594    * @param size # bytes we initialize a TConnection object's write buffer to.
595    */
setWriteBufferDefaultSize(size_t size)596   void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }
597 
598   /**
599    * Get the maximum size of read buffer allocated to idle TConnection objects.
600    *
601    * @return # bytes beyond which we will dealloc idle buffer.
602    */
getIdleReadBufferLimit()603   size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }
604 
605   /**
606    * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
607    * Get the maximum size of read buffer allocated to idle TConnection objects.
608    *
609    * @return # bytes beyond which we will dealloc idle buffer.
610    */
getIdleBufferMemLimit()611   size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }
612 
613   /**
614    * Set the maximum size read buffer allocated to idle TConnection objects.
615    * If a TConnection object is found (either on connection close or between
616    * calls when resizeBufferEveryN_ is set) with more than this much memory
617    * allocated to its read buffer, we free it and allow it to be reinitialized
618    * on the next received frame.
619    *
620    * @param limit of bytes beyond which we will shrink buffers when checked.
621    */
setIdleReadBufferLimit(size_t limit)622   void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }
623 
624   /**
625    * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
626    * Set the maximum size read buffer allocated to idle TConnection objects.
627    * If a TConnection object is found (either on connection close or between
628    * calls when resizeBufferEveryN_ is set) with more than this much memory
629    * allocated to its read buffer, we free it and allow it to be reinitialized
630    * on the next received frame.
631    *
632    * @param limit of bytes beyond which we will shrink buffers when checked.
633    */
setIdleBufferMemLimit(size_t limit)634   void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }
635 
636   /**
637    * Get the maximum size of write buffer allocated to idle TConnection objects.
638    *
639    * @return # bytes beyond which we will reallocate buffers when checked.
640    */
getIdleWriteBufferLimit()641   size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }
642 
643   /**
644    * Set the maximum size write buffer allocated to idle TConnection objects.
645    * If a TConnection object is found (either on connection close or between
646    * calls when resizeBufferEveryN_ is set) with more than this much memory
647    * allocated to its write buffer, we destroy and construct that buffer with
648    * writeBufferDefaultSize_ bytes.
649    *
650    * @param limit of bytes beyond which we will shrink buffers when idle.
651    */
setIdleWriteBufferLimit(size_t limit)652   void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }
653 
654   /**
655    * Get # of calls made between buffer size checks.  0 means disabled.
656    *
657    * @return # of calls between buffer size checks.
658    */
getResizeBufferEveryN()659   int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }
660 
661   /**
662    * Check buffer sizes every "count" calls.  This allows buffer limits
663    * to be enforced for persistent connections with a controllable degree
664    * of overhead. 0 disables checks except at connection close.
665    *
666    * @param count the number of calls between checks, or 0 to disable
667    */
setResizeBufferEveryN(int32_t count)668   void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
669 
670   /**
671    * Main workhorse function, starts up the server listening on a port and
672    * loops over the libevent handler.
673    */
674   void serve();
675 
676   /**
677    * Causes the server to terminate gracefully (can be called from any thread).
678    */
679   void stop();
680 
681   /// Creates a socket to listen on and binds it to the local port.
682   void createAndListenOnSocket();
683 
684   /**
685    * Register the optional user-provided event-base (for single-thread servers)
686    *
687    * This method should be used when the server is running in a single-thread
688    * mode, and the event base is provided by the user (i.e., the caller).
689    *
690    * @param user_event_base the user-provided event-base. The user is
691    * responsible for freeing the event base memory.
692    */
693   void registerEvents(event_base* user_event_base);
694 
695   /**
696    * Returns the optional user-provided event-base (for single-thread servers).
697    */
getUserEventBase()698   event_base* getUserEventBase() const { return userEventBase_; }
699 
700   /** Some transports, like THeaderTransport, require passing through
701    * the framing size instead of stripping it.
702    */
703   bool getHeaderTransport();
704 
705 private:
706   /**
707    * Callback function that the threadmanager calls when a task reaches
708    * its expiration time.  It is needed to clean up the expired connection.
709    *
710    * @param task the runnable associated with the expired task.
711    */
712   void expireClose(stdcxx::shared_ptr<Runnable> task);
713 
714   /**
715    * Return an initialized connection object.  Creates or recovers from
716    * pool a TConnection and initializes it with the provided socket FD
717    * and flags.
718    *
719    * @param socket FD of socket associated with this connection.
720    * @param addr the sockaddr of the client
721    * @param addrLen the length of addr
722    * @return pointer to initialized TConnection object.
723    */
724   TConnection* createConnection(stdcxx::shared_ptr<TSocket> socket);
725 
726   /**
727    * Returns a connection to pool or deletion.  If the connection pool
728    * (a stack) isn't full, place the connection object on it, otherwise
729    * just delete it.
730    *
731    * @param connection the TConection being returned.
732    */
733   void returnConnection(TConnection* connection);
734 };
735 
736 class TNonblockingIOThread : public Runnable {
737 public:
738   // Creates an IO thread and sets up the event base.  The listenSocket should
739   // be a valid FD on which listen() has already been called.  If the
740   // listenSocket is < 0, accepting will not be done.
741   TNonblockingIOThread(TNonblockingServer* server,
742                        int number,
743                        THRIFT_SOCKET listenSocket,
744                        bool useHighPriority);
745 
746   ~TNonblockingIOThread();
747 
748   // Returns the event-base for this thread.
getEventBase()749   event_base* getEventBase() const { return eventBase_; }
750 
751   // Returns the server for this thread.
getServer()752   TNonblockingServer* getServer() const { return server_; }
753 
754   // Returns the number of this IO thread.
getThreadNumber()755   int getThreadNumber() const { return number_; }
756 
757   // Returns the thread id associated with this object.  This should
758   // only be called after the thread has been started.
getThreadId()759   Thread::id_t getThreadId() const { return threadId_; }
760 
761   // Returns the send-fd for task complete notifications.
getNotificationSendFD()762   evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
763 
764   // Returns the read-fd for task complete notifications.
getNotificationRecvFD()765   evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
766 
767   // Returns the actual thread object associated with this IO thread.
getThread()768   stdcxx::shared_ptr<Thread> getThread() const { return thread_; }
769 
770   // Sets the actual thread object associated with this IO thread.
setThread(const stdcxx::shared_ptr<Thread> & t)771   void setThread(const stdcxx::shared_ptr<Thread>& t) { thread_ = t; }
772 
773   // Used by TConnection objects to indicate processing has finished.
774   bool notify(TNonblockingServer::TConnection* conn);
775 
776   // Enters the event loop and does not return until a call to stop().
777   virtual void run();
778 
779   // Exits the event loop as soon as possible.
780   void stop();
781 
782   // Ensures that the event-loop thread is fully finished and shut down.
783   void join();
784 
785   /// Registers the events for the notification & listen sockets
786   void registerEvents();
787 
788 private:
789   /**
790    * C-callable event handler for signaling task completion.  Provides a
791    * callback that libevent can understand that will read a connection
792    * object's address from a pipe and call connection->transition() for
793    * that object.
794    *
795    * @param fd the descriptor the event occurred on.
796    */
797   static void notifyHandler(evutil_socket_t fd, short which, void* v);
798 
799   /**
800    * C-callable event handler for listener events.  Provides a callback
801    * that libevent can understand which invokes server->handleEvent().
802    *
803    * @param fd the descriptor the event occurred on.
804    * @param which the flags associated with the event.
805    * @param v void* callback arg where we placed TNonblockingServer's "this".
806    */
listenHandler(evutil_socket_t fd,short which,void * v)807   static void listenHandler(evutil_socket_t fd, short which, void* v) {
808     ((TNonblockingServer*)v)->handleEvent(fd, which);
809   }
810 
811   /// Exits the loop ASAP in case of shutdown or error.
812   void breakLoop(bool error);
813 
814   /// Create the pipe used to notify I/O process of task completion.
815   void createNotificationPipe();
816 
817   /// Unregisters our events for notification and listen sockets.
818   void cleanupEvents();
819 
820   /// Sets (or clears) high priority scheduling status for the current thread.
821   void setCurrentThreadHighPriority(bool value);
822 
823 private:
824   /// associated server
825   TNonblockingServer* server_;
826 
827   /// thread number (for debugging).
828   const int number_;
829 
830   /// The actual physical thread id.
831   Thread::id_t threadId_;
832 
833   /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
834   THRIFT_SOCKET listenSocket_;
835 
836   /// Sets a high scheduling priority when running
837   bool useHighPriority_;
838 
839   /// pointer to eventbase to be used for looping
840   event_base* eventBase_;
841 
842   /// Set to true if this class is responsible for freeing the event base
843   /// memory.
844   bool ownEventBase_;
845 
846   /// Used with eventBase_ for connection events (only in listener thread)
847   struct event serverEvent_;
848 
849   /// Used with eventBase_ for task completion notification
850   struct event notificationEvent_;
851 
852   /// File descriptors for pipe used for task completion notification.
853   evutil_socket_t notificationPipeFDs_[2];
854 
855   /// Actual IO Thread
856   stdcxx::shared_ptr<Thread> thread_;
857 };
858 }
859 }
860 } // apache::thrift::server
861 
862 #endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
863