1 #ifndef CONNECT___SERVER__HPP
2 #define CONNECT___SERVER__HPP
3
4 /* $Id: server.hpp 575325 2018-11-27 18:22:00Z ucko $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Aaron Ucko, Victor Joukov, Denis Vakatov
30 *
31 */
32
33 /// @file server.hpp
34 /// Framework to create multithreaded network servers with thread-per-request
35 /// scheduling.
36
37 #include <connect/impl/thread_pool_for_server.hpp>
38 #include <connect/ncbi_conn_stream.hpp>
39 #include <connect/ncbi_conn_exception.hpp>
40 #include <connect/ncbi_socket.hpp>
41
42
43 /** @addtogroup ThreadedServer
44 *
45 * @{
46 */
47
48
49 BEGIN_NCBI_SCOPE
50
51
52 class IServer_ConnectionFactory;
53 class IServer_ConnectionBase;
54 struct SServer_Parameters;
55 class CServer_ConnectionPool;
56 class CServer_Connection;
57
58
/// Server-side I/O event codes.
///
/// An extended copy of EIO_Event that additionally makes it possible to
/// tell a connection close initiated by the client apart from a close
/// initiated by the server itself.
enum EServIO_Event {
    eServIO_Open        = 0,
    eServIO_Read        = 1 << 0,
    eServIO_Write       = 1 << 1,
    eServIO_ReadWrite   = eServIO_Read | eServIO_Write, ///< eIO_Read | eIO_Write
    eServIO_ClientClose = 1 << 2,   ///< Close initiated by the client
    eServIO_OurClose    = 1 << 3,   ///< Close initiated by ourselves
    eServIO_Inactivity  = 1 << 4,
    eServIO_Delete      = 1 << 5,
    eServIO_Alarm       = 1 << 6
};
72
73
74 /// Transform EIO_Event type to EServIO_Event
75 inline EServIO_Event
IOEventToServIOEvent(EIO_Event event)76 IOEventToServIOEvent(EIO_Event event)
77 {
78 return (EServIO_Event) event;
79 }
80
81
82 class CNetCacheServer;
83
84 /////////////////////////////////////////////////////////////////////////////
85 ///
86 /// CServer::
87 ///
88 /// Thread-pool based server. On every event it allocates one of threads
89 /// from pool to serve the event, thereby making possible to serve large
90 /// number of concurrent connections efficiently. You need to subclass it
91 /// only if you want to provide gentle shutdown ability (override
92 /// ShutdownRequested) or process data in main thread on timeout (override
93 /// ProcessTimeout and set parameter accept_timeout to non-zero value).
94 ///
95
96 class NCBI_XCONNECT_EXPORT CServer : protected CConnIniter
97 {
98 public:
99 // 'ctors
100 CServer(void);
101 virtual ~CServer();
102
103 /// Register a listener
104 void AddListener(IServer_ConnectionFactory* factory,
105 unsigned short port);
106
107 /// Removes a listener
108 /// @param port
109 /// the listener on the port will be removed
110 /// @return
111 /// true if the listener has been removed, false if the server does not
112 /// listen on the port.
113 bool RemoveListener(unsigned short port);
114
115 ///
116 void SetParameters(const SServer_Parameters& new_params);
117
118 ///
119 void GetParameters(SServer_Parameters* params);
120
121 /// Start listening before the main loop. If called, tries
122 /// to listen on all requested ports for all listeners, correcting
123 /// errors by calling listeners' OnFailure
124 void StartListening(void);
125
126 /// Enter the main loop
127 void Run(void);
128
129 /// Submit request to be executed by the server thread pool
130 void SubmitRequest(const CRef<CStdRequest>& request);
131
132 /// Mark connection as deferred for processing, i.e. do not poll on it
133 /// and wait when IsReadyToProcess() will return true.
134 void DeferConnectionProcessing(IServer_ConnectionBase* conn);
135 void DeferConnectionProcessing(CSocket* sock);
136
137 /// Close connection. Method should be called only when closing is
138 /// initiated by server itself, because it will generate then event
139 /// eServIO_OurClose.
140 ///
141 /// @sa EServIO_Event
142 void CloseConnection(CSocket* sock);
143
144 /// Add externally created connection to the connection pool which server
145 /// polls on. Throws exception if pool is full.
146 /// NOTE: events to this connection can come theoretically even
147 /// NOTE: if connection gets some error or its peer closes it then conn
148 /// object will be deleted after processing OnClose event. If you don't
149 /// want that you have to call RemoveConnectionFromPool while handling
150 /// OnClose.
151 void AddConnectionToPool(CServer_Connection* conn);
152 /// Remove externally created connection from pool.
153 void RemoveConnectionFromPool(CServer_Connection* conn);
154 /// Force poll cycle to make another iteration.
155 /// Should be called if IsReadyToProcess() for some connection handler
156 /// became true.
157 void WakeUpPollCycle(void);
158 /// Set custom suffix to use on all threads in the server's pool.
159 /// Value can be set only before call to Run(), any change of the value
160 /// after call to Run() will be ignored.
SetCustomThreadSuffix(const string & suffix)161 void SetCustomThreadSuffix(const string& suffix)
162 { m_ThreadSuffix = suffix; }
163
164 /// Provides a list of ports on which the server is listening
165 /// @return
166 /// currently listened ports
167 vector<unsigned short> GetListenerPorts(void);
168
169 protected:
170 /// Initialize the server
171 ///
172 /// Called by Run method before poll cycle.
Init()173 virtual void Init() {}
174
175 /// Cleanup the server
176 ///
177 /// Called by Run method after poll cycle when all processing threads
178 /// are stopped, but before releasing listening ports. Here you're still
179 /// guaranteed that another instance running on the same set of ports will
180 /// fail at StartListening point.
Exit()181 virtual void Exit() {}
182
183 /// Runs synchronously when no socket activity has occurred in a
184 /// while (as determined by m_AcceptTimeout).
185 /// @sa m_Parameters->accept_timeout
ProcessTimeout(void)186 virtual void ProcessTimeout(void) {}
187
188 /// Runs synchronously between iterations.
189 /// @return
190 /// whether to shut down service and return from Run.
ShutdownRequested(void)191 virtual bool ShutdownRequested(void) { return false; }
192
193 private:
194 void x_DoRun(void);
195
196 friend class CNetCacheServer;
GetThreadPool(void)197 CPoolOfThreads_ForServer* GetThreadPool(void) { return m_ThreadPool; }
198
199 SServer_Parameters* m_Parameters;
200 CServer_ConnectionPool* m_ConnectionPool;
201 CPoolOfThreads_ForServer* m_ThreadPool;
202 string m_ThreadSuffix;
203 };
204
205
/////////////////////////////////////////////////////////////////////////////
///
/// Reason codes passed to IServer_ConnectionHandler::OnOverflow
enum EOverflowReason
{
    eOR_Unknown            = 0,
    eOR_ConnectionPoolFull = 1,
    eOR_RequestQueueFull   = 2,
    eOR_UnpollableSocket   = 3
};
216
217
218 /////////////////////////////////////////////////////////////////////////////
219 ///
220 /// IServer_ConnectionHandler::
221 ///
222 /// Implement this interface to provide server functionality.
223 ///
224
225 class NCBI_XCONNECT_EXPORT IServer_ConnectionHandler
226 {
227 public:
~IServer_ConnectionHandler()228 virtual ~IServer_ConnectionHandler() { }
229
230 /// Following three methods are guaranteed to be called NOT
231 /// at the same time as On*, so if you implement them
232 /// you should not guard the variables which they can use with
233 /// mutexes.
234 /// @param alarm_time
235 /// Set this parameter to a pointer to a CTime object to recieve
236 /// an OnTimer event at the moment in time specified by this object.
237 /// @return
238 /// Returns the set of events for which Poll should check.
GetEventsToPollFor(const CTime **) const239 virtual EIO_Event GetEventsToPollFor(const CTime** /*alarm_time*/) const
240 { return eIO_Read; }
241 /// Returns the timeout for this connection
GetTimeout(void)242 virtual const STimeout* GetTimeout(void)
243 { return kDefaultTimeout; }
244 /// Returns connection handler's perception of whether we open or not.
245 /// It is unsafe to just close underlying socket because of the race,
246 /// emerging due to the fact that the socket can linger for a while.
IsOpen(void)247 virtual bool IsOpen(void)
248 { return true; }
249 /// Returns the handler's readiness to process input data or to write
250 /// some output data. OnRead() and OnWrite() are not called unless this
251 /// method return true.
IsReadyToProcess(void) const252 virtual bool IsReadyToProcess(void) const
253 { return true; }
254
255
256 /// Runs in response to an external event [asynchronous].
257 /// You can get socket by calling GetSocket(), if you close the socket
258 /// this object will be destroyed.
259 /// Individual events are:
260 /// A client has just established this connection.
261 virtual void OnOpen(void) = 0;
262 /// The client has just sent data.
263 virtual void OnRead(void) = 0;
264 /// The client is ready to receive data.
265 virtual void OnWrite(void) = 0;
266
267 /// Type of connection closing
268 enum EClosePeer {
269 eOurClose, ///< Connection closed by ourselves
270 eClientClose ///< Connection closed by other peer
271 };
272
273 /// The connection has closed (with information on type of closing)
OnClose(EClosePeer)274 virtual void OnClose(EClosePeer /*peer*/) { }
275
276 /// Runs when a client has been idle for too long, prior to
277 /// closing the connection [synchronous].
OnTimeout(void)278 virtual void OnTimeout(void) { }
279
280 /// This method is called at the moment in time specified earlier by the
281 /// alarm_time parameter of the GetEventsToPollFor method [synchronous].
OnTimer(void)282 virtual void OnTimer(void) { }
283
284 /// Runs when there are insufficient resources to queue a
285 /// connection, prior to closing it. Provides a reason why the
286 /// connection is being close, which can be reported back to the client.
287 // See comment for CAcceptRequest::Process and CServer::CreateRequest
OnOverflow(EOverflowReason)288 virtual void OnOverflow(EOverflowReason) { }
289
290 /// Runs when a socket error is detected
OnError(const string &)291 virtual void OnError(const string & /*err_message*/) { }
292
293 /// Get underlying socket
GetSocket(void)294 CSocket& GetSocket(void) { return *m_Socket; }
295
296 public: // TODO: make it protected. Public is for DEBUG purposes only
297 friend class CServer_Connection;
SetSocket(CSocket * socket)298 void SetSocket(CSocket *socket) { m_Socket = socket; }
299
300 private:
301 CSocket* m_Socket;
302 };
303
304
305
306 /////////////////////////////////////////////////////////////////////////////
307 ///
308 /// IServer_MessageHandler::
309 ///
310 /// TODO:
311 class NCBI_XCONNECT_EXPORT IServer_MessageHandler :
312 public IServer_ConnectionHandler
313 {
314 public:
IServer_MessageHandler()315 IServer_MessageHandler() :
316 m_Buffer(0)
317 { }
~IServer_MessageHandler()318 virtual ~IServer_MessageHandler() { BUF_Destroy(m_Buffer); }
319 virtual void OnRead(void);
320 // You should implement this look-ahead function to decide, did you get
321 // a message in the series of read events. If not, you should return -1.
322 // If yes, return number of chars, beyond well formed message. E.g., if
323 // your message spans all the buffer, return 0. If you returned non-zero
324 // value, this piece of data will be used in the next CheckMessage to
325 // simplify client state management.
326 // You also need to copy bytes, comprising the message from data to buffer.
327 virtual int CheckMessage(BUF* buffer, const void *data, size_t size) = 0;
328 // Process incoming message in the buffer, by using
329 // BUF_Read(buffer, your_data_buffer, BUF_Size(buffer)).
330 virtual void OnMessage(BUF buffer) = 0;
331 private:
332 BUF m_Buffer;
333 };
334
335
336 int NCBI_XCONNECT_EXPORT
337 Server_CheckLineMessage(BUF* buffer, const void *data, size_t size,
338 bool& seen_CR);
339
340 /////////////////////////////////////////////////////////////////////////////
341 ///
342 /// IServer_LineMessageHandler::
343 ///
344 /// TODO:
345 class NCBI_XCONNECT_EXPORT IServer_LineMessageHandler :
346 public IServer_MessageHandler
347 {
348 public:
IServer_LineMessageHandler()349 IServer_LineMessageHandler() :
350 IServer_MessageHandler(), m_SeenCR(false)
351 { }
CheckMessage(BUF * buffer,const void * data,size_t size)352 virtual int CheckMessage(BUF* buffer, const void *data, size_t size) {
353 return Server_CheckLineMessage(buffer, data, size, m_SeenCR);
354 }
355 private:
356 bool m_SeenCR;
357 };
358
359
360
361 /////////////////////////////////////////////////////////////////////////////
362 ///
363 /// IServer_StreamHandler::
364 ///
365 /// TODO:
366 class NCBI_XCONNECT_EXPORT IServer_StreamHandler :
367 public IServer_ConnectionHandler
368 {
369 public:
370 CNcbiIostream &GetStream();
371 private:
372 CConn_SocketStream m_Stream;
373 };
374
375
376
377 /////////////////////////////////////////////////////////////////////////////
378 ///
379 /// IServer_ConnectionFactory::
380 ///
381 /// Factory to be registered with CServer instance. You usually do not
382 /// need to implement it, default template CServer_ConnectionFactory will
383 /// suffice. You NEED to implement it manually to pass server-wide parameters
384 /// to ConnectionHandler instances, e.g. for implementation of gentle shutdown.
385 ///
386
387 class NCBI_XCONNECT_EXPORT IServer_ConnectionFactory
388 {
389 public:
390 /// What to do if the port is busy
391 enum EListenAction {
392 eLAFail = 0, // Can not live without this port, default
393 eLAIgnore = 1, // Do nothing, throw away this listener
394 eLARetry = 2 // Listener should provide another port to try
395 };
~IServer_ConnectionFactory()396 virtual ~IServer_ConnectionFactory() { }
397
398 /// @return
399 /// a new instance of handler for connection
400 virtual IServer_ConnectionHandler* Create(void) = 0;
401 /// Return desired action if the port, mentioned in AddListener is busy.
402 /// If the action is eLARetry, provide new port. The
OnFailure(unsigned short *)403 virtual EListenAction OnFailure(unsigned short* /* port */)
404 { return eLAFail; }
405 };
406
407
408
409 /////////////////////////////////////////////////////////////////////////////
410 ///
411 /// CServer_ConnectionFactory::
412 ///
413 /// Reasonable default implementation for IServer_ConnectionFactory
414 ///
415
416 template <class TServer_ConnectionHandler>
417 class CServer_ConnectionFactory : public IServer_ConnectionFactory
418 {
419 public:
Create()420 virtual IServer_ConnectionHandler* Create()
421 { return new TServer_ConnectionHandler(); }
422 };
423
424
425
426 /////////////////////////////////////////////////////////////////////////////
427 ///
428 /// SServer_Parameters::
429 ///
430 /// Settings for CServer
431 ///
432
433 struct NCBI_XCONNECT_EXPORT SServer_Parameters
434 {
435 /// Maximum # of open connections
436 unsigned int max_connections;
437 /// Temporarily close listener when queue fills?
438 bool temporarily_stop_listening;
439 /// Maximum t between exit checks
440 const STimeout* accept_timeout;
441 /// For how long to keep inactive non-listening sockets open
442 /// (default: 10 minutes)
443 const STimeout* idle_timeout;
444
445 // (settings for the thread pool)
446 unsigned int init_threads; ///< Number of initial threads
447 unsigned int max_threads; ///< Maximum simultaneous threads
448 unsigned int spawn_threshold; ///< Controls when to spawn more threads
449
450 /// Create structure with the default set of parameters
451 SServer_Parameters();
452 };
453
454
455
456 /////////////////////////////////////////////////////////////////////////////
457 ///
458 /// CServer_Exception::
459 ///
460 /// Exceptions thrown by CServer::Run()
461 ///
462
463 class NCBI_XCONNECT_EXPORT CServer_Exception
464 : EXCEPTION_VIRTUAL_BASE public CConnException
465 {
466 public:
467 enum EErrCode {
468 eBadParameters, ///< Out-of-range parameters given
469 eCouldntListen, ///< Unable to bind listening port
470 ePoolOverflow ///< Connection pool overflowed
471 };
472 virtual const char* GetErrCodeString(void) const override;
473 NCBI_EXCEPTION_DEFAULT(CServer_Exception, CConnException);
474 };
475
476
477
478 END_NCBI_SCOPE
479
480
481 /* @} */
482
483 #endif /* CONNECT___SERVER__HPP */
484