/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <thrift/Thrift.h>
#include <thrift/stdcxx.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerTransport.h>
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
#include <string>
#include <cstdlib>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>

namespace apache {
namespace thrift {
namespace server {

using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TNonblockingServerTransport;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::PlatformThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Guard;

#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER                                                                    \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef THRIFT_SOCKET evutil_socket_t;
#endif

#ifndef SOCKOPT_CAST_T
#ifndef _WIN32
#define SOCKOPT_CAST_T void
#else
#define SOCKOPT_CAST_T char
#endif // _WIN32
#endif

template <class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

template <class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
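
// Illustrative usage of the cast helpers above (a sketch): they smooth over
// the void* (POSIX) vs. char* (Winsock) option-value parameter type of
// setsockopt()/getsockopt(), e.g.:
//
//   int flag = 1;
//   setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
//              const_cast_sockopt(&flag), sizeof(flag));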

/**
 * This is a high-performance, non-blocking server in C++ that operates a
 * set of IO threads (by default only one). It assumes that all incoming
 * requests are framed with a 4-byte length indicator and writes out
 * responses using the same framing.
 */
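
/**
 * Example usage (a minimal sketch; the handler, its generated
 * "MyServiceProcessor" class, and the port are placeholders, and clients
 * must wrap their transports in TFramedTransport to match the framing):
 *
 *   stdcxx::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *   stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
 *   stdcxx::shared_ptr<transport::TNonblockingServerSocket> serverSocket(
 *       new transport::TNonblockingServerSocket(9090));
 *   TNonblockingServer server(processor, protocolFactory, serverSocket);
 *   server.serve();
 */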

/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,       ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
};

class TNonblockingIOThread;

class TNonblockingServer : public TServer {
private:
  class TConnection;

  friend class TNonblockingIOThread;

private:
  /// Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  /// Default limit on size of idle connection pool
  static const size_t CONNECTION_STACK_LIMIT = 1024;

  /// Default limit on frame size
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// Default limit on total number of connected sockets
  static const int MAX_CONNECTIONS = INT_MAX;

  /// Default limit on connections in handler/task processing
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

  /// Default size of write buffer
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_READ_BUFFER_LIMIT = 1024;

  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;

  /// # of calls before resizing oversized buffers (0 = check only on close)
  static const int RESIZE_BUFFER_EVERY_N = 512;

  /// # of IO threads to use by default
  static const int DEFAULT_IO_THREADS = 1;

  /// # of IO threads this server will use
  size_t numIOThreads_;

  /// Whether to set high scheduling priority for IO threads
  bool useHighPriorityIOThreads_;

  /// Server socket file descriptor
  THRIFT_SOCKET serverSocket_;

  /// The optional user-provided event-base (for single-thread servers)
  event_base* userEventBase_;

  /// For processing via thread pool, may be NULL
  stdcxx::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  // Factory to create the IO threads
  stdcxx::shared_ptr<PlatformThreadFactory> ioThreadFactory_;

  // Vector of IOThread objects that will handle our IO
  std::vector<stdcxx::shared_ptr<TNonblockingIOThread> > ioThreads_;

  // Index of next IO Thread to be used (for round-robin)
  uint32_t nextIOThread_;

  // Synchronizes access to connection stack and similar data
  Mutex connMutex_;

  /// Number of TConnection objects we've created
  size_t numTConnections_;

  /// Number of connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Limit for how many TConnection objects to cache
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /**
   * Hysteresis for overload state. This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared;
   * must be <= 1.0.
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection. When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit. 0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection. When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is <= this size; otherwise we replace it
   * with a new one of writeBufferDefaultSize_ bytes to ensure that idle
   * connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unclosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
  std::vector<TConnection*> activeConnections_;

  /// The transport we use to listen for and accept incoming connections.
  stdcxx::shared_ptr<TNonblockingServerTransport> serverTransport_;

  /**
   * Called when server socket had something happen. We accept all waiting
   * client connections on listen socket fd and assign TConnection objects
   * to handle those requests.
   *
   * @param fd the listen socket the event occurred on.
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(THRIFT_SOCKET fd, short which);

  void init() {
    serverSocket_ = THRIFT_INVALID_SOCKET;
    numIOThreads_ = DEFAULT_IO_THREADS;
    nextIOThread_ = 0;
    useHighPriorityIOThreads_ = false;
    userEventBase_ = NULL;
    threadPoolProcessing_ = false;
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxConnections_ = MAX_CONNECTIONS;
    maxFrameSize_ = MAX_FRAME_SIZE;
    taskExpireTime_ = 0;
    overloadHysteresis_ = 0.8;
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
    overloaded_ = false;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
  }

public:
  TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();
  }

  TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processor), serverTransport_(serverTransport) {
    init();
  }

  TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
                     const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const stdcxx::shared_ptr<ThreadManager>& threadManager
                     = stdcxx::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
                     const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const stdcxx::shared_ptr<ThreadManager>& threadManager
                     = stdcxx::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const stdcxx::shared_ptr<TProcessorFactory>& processorFactory,
                     const stdcxx::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const stdcxx::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const stdcxx::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const stdcxx::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const stdcxx::shared_ptr<ThreadManager>& threadManager
                     = stdcxx::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const stdcxx::shared_ptr<TProcessor>& processor,
                     const stdcxx::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const stdcxx::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const stdcxx::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const stdcxx::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const stdcxx::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const stdcxx::shared_ptr<ThreadManager>& threadManager
                     = stdcxx::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer();

  void setThreadManager(stdcxx::shared_ptr<ThreadManager> threadManager);

  int getListenPort() { return serverTransport_->getListenPort(); }

  stdcxx::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
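
  /**
   * Example (a sketch; the thread count is arbitrary): attach a worker pool
   * so handlers run on ThreadManager threads instead of the IO threads:
   *
   *   stdcxx::shared_ptr<ThreadManager> threadManager =
   *       ThreadManager::newSimpleThreadManager(8);
   *   threadManager->threadFactory(
   *       stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
   *   threadManager->start();
   *   server.setThreadManager(threadManager);
   */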

  /**
   * Sets the number of IO threads used by this server. Can only be used
   * before the call to serve() and has no effect afterwards. We always use
   * a PlatformThreadFactory for the IO worker threads, because they must be
   * joinable for clean shutdown.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
    // User-provided event-base doesn't work for multi-threaded servers
    assert(numIOThreads_ <= 1 || !userEventBase_);
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const { return numIOThreads_; }

  /**
   * Get the maximum number of unused TConnection objects we will hold in
   * reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const { return connectionStackLimit_; }

  /**
   * Set the maximum number of unused TConnection objects we will hold in
   * reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }

  bool isThreadPoolProcessing() const { return threadPoolProcessing_; }

  void addTask(stdcxx::shared_ptr<Runnable> task) {
    threadManager_->add(task, 0LL, taskExpireTime_);
  }

  /**
   * Return the count of sockets currently connected to.
   *
   * @return count of connected sockets.
   */
  size_t getNumConnections() const { return numTConnections_; }

  /**
   * Return the count of connections currently in use (i.e., not sitting
   * idle in the connection pool).
   *
   * @return count of active connections.
   */
  size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const { return connectionStack_.size(); }

  /**
   * Return count of number of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const { return numActiveProcessors_; }

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    Guard g(connMutex_);
    ++numActiveProcessors_;
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    Guard g(connMutex_);
    if (numActiveProcessors_ > 0) {
      --numActiveProcessors_;
    }
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const { return maxConnections_; }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processes.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maximum frame size, in bytes.
   */
  size_t getMaxFrameSize() const { return maxFrameSize_; }

  /**
   * Set the maximum allowed frame size.
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const { return overloadHysteresis_; }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * @param hysteresisFraction fraction <= 1.0.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
      overloadHysteresis_ = hysteresisFraction;
    }
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const { return overloadAction_; }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
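
  /**
   * Example (a sketch; the limits are illustrative): cap load and shed new
   * connections while overloaded, clearing the overload condition once
   * usage drops below 75% of the limits:
   *
   *   server.setMaxConnections(10000);
   *   server.setMaxActiveProcessors(500);
   *   server.setOverloadHysteresis(0.75);
   *   server.setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
   */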

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const { return taskExpireTime_; }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded. The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /** Pop and discard next task on threadpool wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }

  /**
   * Set the maximum size of read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size of read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }

  /**
   * Set the maximum size of write buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }

  /**
   * Get # of calls made between buffer size checks. 0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }

  /**
   * Check buffer sizes every "count" calls. This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
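
  /**
   * Example (a sketch; the sizes are illustrative): bound per-connection
   * memory on long-lived connections by shrinking oversized buffers every
   * 64 calls, and reject frames larger than 16 MB:
   *
   *   server.setWriteBufferDefaultSize(4096);
   *   server.setIdleReadBufferLimit(64 * 1024);
   *   server.setIdleWriteBufferLimit(64 * 1024);
   *   server.setResizeBufferEveryN(64);
   *   server.setMaxFrameSize(16 * 1024 * 1024);
   */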

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve();

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop();

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Register the optional user-provided event-base (for single-thread servers)
   *
   * This method should be used when the server is running in a single-thread
   * mode, and the event base is provided by the user (i.e., the caller).
   *
   * @param user_event_base the user-provided event-base. The user is
   * responsible for freeing the event base memory.
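   *
   * Example (a sketch; assumes a single IO thread, with the caller owning
   * and running the event loop):
   *
   *   event_base* base = event_base_new();
   *   server.registerEvents(base);
   *   event_base_loop(base, 0);  // server events are dispatched here
   *   event_base_free(base);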
   */
  void registerEvents(event_base* user_event_base);

  /**
   * Returns the optional user-provided event-base (for single-thread servers).
   */
  event_base* getUserEventBase() const { return userEventBase_; }

  /** Some transports, like THeaderTransport, require passing through
   * the framing size instead of stripping it.
   */
  bool getHeaderTransport();

private:
  /**
   * Callback function that the threadmanager calls when a task reaches
   * its expiration time. It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(stdcxx::shared_ptr<Runnable> task);

  /**
   * Return an initialized connection object. Creates or recovers from
   * pool a TConnection and initializes it with the provided socket and
   * flags.
   *
   * @param socket the TSocket associated with this connection.
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(stdcxx::shared_ptr<TSocket> socket);

  /**
   * Returns a connection to the pool or deletes it. If the connection pool
   * (a stack) isn't full, place the connection object on it, otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};

class TNonblockingIOThread : public Runnable {
public:
  // Creates an IO thread and sets up the event base. The listenSocket should
  // be a valid FD on which listen() has already been called. If the
  // listenSocket is < 0, accepting will not be done.
  TNonblockingIOThread(TNonblockingServer* server,
                       int number,
                       THRIFT_SOCKET listenSocket,
                       bool useHighPriority);

  ~TNonblockingIOThread();

  // Returns the event-base for this thread.
  event_base* getEventBase() const { return eventBase_; }

  // Returns the server for this thread.
  TNonblockingServer* getServer() const { return server_; }

  // Returns the number of this IO thread.
  int getThreadNumber() const { return number_; }

  // Returns the thread id associated with this object. This should
  // only be called after the thread has been started.
  Thread::id_t getThreadId() const { return threadId_; }

  // Returns the send-fd for task complete notifications.
  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }

  // Returns the read-fd for task complete notifications.
  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }

  // Returns the actual thread object associated with this IO thread.
  stdcxx::shared_ptr<Thread> getThread() const { return thread_; }

  // Sets the actual thread object associated with this IO thread.
  void setThread(const stdcxx::shared_ptr<Thread>& t) { thread_ = t; }

  // Used by TConnection objects to indicate processing has finished.
  bool notify(TNonblockingServer::TConnection* conn);

  // Enters the event loop and does not return until a call to stop().
  virtual void run();

  // Exits the event loop as soon as possible.
  void stop();

  // Ensures that the event-loop thread is fully finished and shut down.
  void join();

  /// Registers the events for the notification & listen sockets
  void registerEvents();

private:
  /**
   * C-callable event handler for signaling task completion. Provides a
   * callback that libevent can understand; it reads a connection object's
   * address from a pipe and calls connection->transition() for that object.
   *
   * @param fd the descriptor the event occurred on.
   */
  static void notifyHandler(evutil_socket_t fd, short which, void* v);

  /**
   * C-callable event handler for listener events. Provides a callback
   * that libevent can understand which invokes server->handleEvent().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingServer's "this".
   */
  static void listenHandler(evutil_socket_t fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

  /// Exits the loop ASAP in case of shutdown or error.
  void breakLoop(bool error);

  /// Create the pipe used to notify I/O process of task completion.
  void createNotificationPipe();

  /// Unregisters our events for notification and listen sockets.
  void cleanupEvents();

  /// Sets (or clears) high priority scheduling status for the current thread.
  void setCurrentThreadHighPriority(bool value);

private:
  /// associated server
  TNonblockingServer* server_;

  /// thread number (for debugging).
  const int number_;

  /// The actual physical thread id.
  Thread::id_t threadId_;

  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
  THRIFT_SOCKET listenSocket_;

  /// Sets a high scheduling priority when running
  bool useHighPriority_;

  /// pointer to eventbase to be used for looping
  event_base* eventBase_;

  /// Set to true if this class is responsible for freeing the event base
  /// memory.
  bool ownEventBase_;

  /// Used with eventBase_ for connection events (only in listener thread)
  struct event serverEvent_;

  /// Used with eventBase_ for task completion notification
  struct event notificationEvent_;

  /// File descriptors for pipe used for task completion notification.
  evutil_socket_t notificationPipeFDs_[2];

  /// Actual IO Thread
  stdcxx::shared_ptr<Thread> thread_;
};
}
}
} // apache::thrift::server

#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_