1 #ifndef CONNECT___SERVER__HPP
2 #define CONNECT___SERVER__HPP
3 
4 /* $Id: server.hpp 575325 2018-11-27 18:22:00Z ucko $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors:  Aaron Ucko, Victor Joukov, Denis Vakatov
30  *
31  */
32 
33 /// @file server.hpp
34 /// Framework to create multithreaded network servers with thread-per-request
35 /// scheduling.
36 
37 #include <connect/impl/thread_pool_for_server.hpp>
38 #include <connect/ncbi_conn_stream.hpp>
39 #include <connect/ncbi_conn_exception.hpp>
40 #include <connect/ncbi_socket.hpp>
41 
42 
43 /** @addtogroup ThreadedServer
44  *
45  * @{
46  */
47 
48 
49 BEGIN_NCBI_SCOPE
50 
51 
// Forward declarations of types that this header names before (or without)
// providing their full definitions.
class IServer_ConnectionFactory;
class IServer_ConnectionBase;
class CServer_ConnectionPool;
class CServer_Connection;
struct SServer_Parameters;
57 
58 
/// Server-side extension of EIO_Event which additionally makes it possible
/// to tell a connection closed by the client apart from one closed by the
/// server itself.  An EIO_Event is converted to this type with a plain cast
/// (see IOEventToServIOEvent), so the numeric values must not be changed.
enum EServIO_Event {
    eServIO_Open        = 0,
    eServIO_Read        = 1 << 0,
    eServIO_Write       = 1 << 1,
    eServIO_ReadWrite   = eServIO_Read | eServIO_Write, ///< eIO_Read | eIO_Write
    eServIO_ClientClose = 1 << 2,  ///< Closing came from the client
    eServIO_OurClose    = 1 << 3,  ///< Closing was initiated by the server
    eServIO_Inactivity  = 1 << 4,
    eServIO_Delete      = 1 << 5,
    eServIO_Alarm       = 1 << 6
};
72 
73 
74 /// Transform EIO_Event type to EServIO_Event
75 inline EServIO_Event
IOEventToServIOEvent(EIO_Event event)76 IOEventToServIOEvent(EIO_Event event)
77 {
78     return (EServIO_Event) event;
79 }
80 
81 
82 class CNetCacheServer;
83 
84 /////////////////////////////////////////////////////////////////////////////
85 ///
86 ///  CServer::
87 ///
88 /// Thread-pool based server. On every event it allocates one of threads
89 /// from pool to serve the event, thereby making possible to serve large
90 /// number of concurrent connections efficiently. You need to subclass it
91 /// only if you want to provide gentle shutdown ability (override
92 /// ShutdownRequested) or process data in main thread on timeout (override
93 /// ProcessTimeout and set parameter accept_timeout to non-zero value).
94 ///
95 
96 class NCBI_XCONNECT_EXPORT CServer : protected CConnIniter
97 {
98 public:
99     // 'ctors
100     CServer(void);
101     virtual ~CServer();
102 
103     /// Register a listener
104     void AddListener(IServer_ConnectionFactory* factory,
105                      unsigned short             port);
106 
107     /// Removes a listener
108     /// @param port
109     ///  the listener on the port will be removed
110     /// @return
111     ///  true if the listener has been removed, false if the server does not
112     ///  listen on the port.
113     bool RemoveListener(unsigned short  port);
114 
115     ///
116     void SetParameters(const SServer_Parameters& new_params);
117 
118     ///
119     void GetParameters(SServer_Parameters* params);
120 
121     /// Start listening before the main loop. If called, tries
122     /// to listen on all requested ports for all listeners, correcting
123     /// errors by calling listeners' OnFailure
124     void StartListening(void);
125 
126     /// Enter the main loop
127     void Run(void);
128 
129     /// Submit request to be executed by the server thread pool
130     void SubmitRequest(const CRef<CStdRequest>& request);
131 
132     /// Mark connection as deferred for processing, i.e. do not poll on it
133     /// and wait when IsReadyToProcess() will return true.
134     void DeferConnectionProcessing(IServer_ConnectionBase* conn);
135     void DeferConnectionProcessing(CSocket* sock);
136 
137     /// Close connection. Method should be called only when closing is
138     /// initiated by server itself, because it will generate then event
139     /// eServIO_OurClose.
140     ///
141     /// @sa EServIO_Event
142     void CloseConnection(CSocket* sock);
143 
144     /// Add externally created connection to the connection pool which server
145     /// polls on. Throws exception if pool is full.
146     /// NOTE: events to this connection can come theoretically even
147     /// NOTE: if connection gets some error or its peer closes it then conn
148     /// object will be deleted after processing OnClose event. If you don't
149     /// want that you have to call RemoveConnectionFromPool while handling
150     /// OnClose.
151     void AddConnectionToPool(CServer_Connection* conn);
152     /// Remove externally created connection from pool.
153     void RemoveConnectionFromPool(CServer_Connection* conn);
154     /// Force poll cycle to make another iteration.
155     /// Should be called if IsReadyToProcess() for some connection handler
156     /// became true.
157     void WakeUpPollCycle(void);
158     /// Set custom suffix to use on all threads in the server's pool.
159     /// Value can be set only before call to Run(), any change of the value
160     /// after call to Run() will be ignored.
SetCustomThreadSuffix(const string & suffix)161     void SetCustomThreadSuffix(const string& suffix)
162     { m_ThreadSuffix = suffix; }
163 
164     /// Provides a list of ports on which the server is listening
165     /// @return
166     ///  currently listened ports
167     vector<unsigned short>  GetListenerPorts(void);
168 
169 protected:
170     /// Initialize the server
171     ///
172     /// Called by Run method before poll cycle.
Init()173     virtual void Init() {}
174 
175     /// Cleanup the server
176     ///
177     /// Called by Run method after poll cycle when all processing threads
178     /// are stopped, but before releasing listening ports. Here you're still
179     /// guaranteed that another instance running on the same set of ports will
180     /// fail at StartListening point.
Exit()181     virtual void Exit() {}
182 
183     /// Runs synchronously when no socket activity has occurred in a
184     /// while (as determined by m_AcceptTimeout).
185     /// @sa m_Parameters->accept_timeout
ProcessTimeout(void)186     virtual void ProcessTimeout(void) {}
187 
188     /// Runs synchronously between iterations.
189     /// @return
190     ///  whether to shut down service and return from Run.
ShutdownRequested(void)191     virtual bool ShutdownRequested(void) { return false; }
192 
193 private:
194     void x_DoRun(void);
195 
196     friend class CNetCacheServer;
GetThreadPool(void)197     CPoolOfThreads_ForServer* GetThreadPool(void) { return m_ThreadPool; }
198 
199     SServer_Parameters*         m_Parameters;
200     CServer_ConnectionPool*     m_ConnectionPool;
201     CPoolOfThreads_ForServer*   m_ThreadPool;
202     string m_ThreadSuffix;
203 };
204 
205 
/////////////////////////////////////////////////////////////////////////////
///
///  Reason codes passed to the OnOverflow method of
///  IServer_ConnectionHandler
enum EOverflowReason {
    eOR_Unknown            = 0,
    eOR_ConnectionPoolFull = 1,
    eOR_RequestQueueFull   = 2,
    eOR_UnpollableSocket   = 3
};
216 
217 
218 /////////////////////////////////////////////////////////////////////////////
219 ///
220 ///  IServer_ConnectionHandler::
221 ///
222 /// Implement this interface to provide server functionality.
223 ///
224 
225 class NCBI_XCONNECT_EXPORT IServer_ConnectionHandler
226 {
227 public:
~IServer_ConnectionHandler()228     virtual ~IServer_ConnectionHandler() { }
229 
230     /// Following three methods are guaranteed to be called NOT
231     /// at the same time as On*, so if you implement them
232     /// you should not guard the variables which they can use with
233     /// mutexes.
234     /// @param alarm_time
235     ///   Set this parameter to a pointer to a CTime object to recieve
236     ///   an OnTimer event at the moment in time specified by this object.
237     /// @return
238     ///   Returns the set of events for which Poll should check.
GetEventsToPollFor(const CTime **) const239     virtual EIO_Event GetEventsToPollFor(const CTime** /*alarm_time*/) const
240         { return eIO_Read; }
241     /// Returns the timeout for this connection
GetTimeout(void)242     virtual const STimeout* GetTimeout(void)
243         { return kDefaultTimeout; }
244     /// Returns connection handler's perception of whether we open or not.
245     /// It is unsafe to just close underlying socket because of the race,
246     /// emerging due to the fact that the socket can linger for a while.
IsOpen(void)247     virtual bool IsOpen(void)
248         { return true; }
249     /// Returns the handler's readiness to process input data or to write
250     /// some output data. OnRead() and OnWrite() are not called unless this
251     /// method return true.
IsReadyToProcess(void) const252     virtual bool IsReadyToProcess(void) const
253         { return true; }
254 
255 
256     /// Runs in response to an external event [asynchronous].
257     /// You can get socket by calling GetSocket(), if you close the socket
258     /// this object will be destroyed.
259     /// Individual events are:
260     /// A client has just established this connection.
261     virtual void OnOpen(void) = 0;
262     /// The client has just sent data.
263     virtual void OnRead(void) = 0;
264     /// The client is ready to receive data.
265     virtual void OnWrite(void) = 0;
266 
267     /// Type of connection closing
268     enum EClosePeer {
269         eOurClose,     ///< Connection closed by ourselves
270         eClientClose   ///< Connection closed by other peer
271     };
272 
273     /// The connection has closed (with information on type of closing)
OnClose(EClosePeer)274     virtual void OnClose(EClosePeer /*peer*/) { }
275 
276     /// Runs when a client has been idle for too long, prior to
277     /// closing the connection [synchronous].
OnTimeout(void)278     virtual void OnTimeout(void) { }
279 
280     /// This method is called at the moment in time specified earlier by the
281     /// alarm_time parameter of the GetEventsToPollFor method [synchronous].
OnTimer(void)282     virtual void OnTimer(void) { }
283 
284     /// Runs when there are insufficient resources to queue a
285     /// connection, prior to closing it. Provides a reason why the
286     /// connection is being close, which can be reported back to the client.
287     // See comment for CAcceptRequest::Process and CServer::CreateRequest
OnOverflow(EOverflowReason)288     virtual void OnOverflow(EOverflowReason) { }
289 
290     /// Runs when a socket error is detected
OnError(const string &)291     virtual void OnError(const string &  /*err_message*/) { }
292 
293     /// Get underlying socket
GetSocket(void)294     CSocket& GetSocket(void) { return *m_Socket; }
295 
296 public: // TODO: make it protected. Public is for DEBUG purposes only
297     friend class CServer_Connection;
SetSocket(CSocket * socket)298     void SetSocket(CSocket *socket) { m_Socket = socket; }
299 
300 private:
301     CSocket* m_Socket;
302 };
303 
304 
305 
306 /////////////////////////////////////////////////////////////////////////////
307 ///
308 /// IServer_MessageHandler::
309 ///
310 /// TODO:
311 class NCBI_XCONNECT_EXPORT IServer_MessageHandler :
312     public IServer_ConnectionHandler
313 {
314 public:
IServer_MessageHandler()315     IServer_MessageHandler() :
316         m_Buffer(0)
317     { }
~IServer_MessageHandler()318     virtual ~IServer_MessageHandler() { BUF_Destroy(m_Buffer); }
319     virtual void OnRead(void);
320     // You should implement this look-ahead function to decide, did you get
321     // a message in the series of read events. If not, you should return -1.
322     // If yes, return number of chars, beyond well formed message. E.g., if
323     // your message spans all the buffer, return 0. If you returned non-zero
324     // value, this piece of data will be used in the next CheckMessage to
325     // simplify client state management.
326     // You also need to copy bytes, comprising the message from data to buffer.
327     virtual int CheckMessage(BUF* buffer, const void *data, size_t size) = 0;
328     // Process incoming message in the buffer, by using
329     // BUF_Read(buffer, your_data_buffer, BUF_Size(buffer)).
330     virtual void OnMessage(BUF buffer) = 0;
331 private:
332     BUF m_Buffer;
333 };
334 
335 
336 int NCBI_XCONNECT_EXPORT
337 Server_CheckLineMessage(BUF* buffer, const void *data, size_t size,
338                         bool& seen_CR);
339 
340 /////////////////////////////////////////////////////////////////////////////
341 ///
342 /// IServer_LineMessageHandler::
343 ///
344 /// TODO:
345 class NCBI_XCONNECT_EXPORT IServer_LineMessageHandler :
346     public IServer_MessageHandler
347 {
348 public:
IServer_LineMessageHandler()349     IServer_LineMessageHandler() :
350         IServer_MessageHandler(), m_SeenCR(false)
351     { }
CheckMessage(BUF * buffer,const void * data,size_t size)352     virtual int CheckMessage(BUF* buffer, const void *data, size_t size) {
353         return Server_CheckLineMessage(buffer, data, size, m_SeenCR);
354     }
355 private:
356     bool m_SeenCR;
357 };
358 
359 
360 
361 /////////////////////////////////////////////////////////////////////////////
362 ///
363 /// IServer_StreamHandler::
364 ///
365 /// TODO:
366 class NCBI_XCONNECT_EXPORT IServer_StreamHandler :
367     public IServer_ConnectionHandler
368 {
369 public:
370     CNcbiIostream &GetStream();
371 private:
372     CConn_SocketStream m_Stream;
373 };
374 
375 
376 
377 /////////////////////////////////////////////////////////////////////////////
378 ///
379 ///  IServer_ConnectionFactory::
380 ///
381 /// Factory to be registered with CServer instance. You usually do not
382 /// need to implement it, default template CServer_ConnectionFactory will
383 /// suffice. You NEED to implement it manually to pass server-wide parameters
384 /// to ConnectionHandler instances, e.g. for implementation of gentle shutdown.
385 ///
386 
387 class NCBI_XCONNECT_EXPORT IServer_ConnectionFactory
388 {
389 public:
390     /// What to do if the port is busy
391     enum EListenAction {
392         eLAFail   = 0,   // Can not live without this port, default
393         eLAIgnore = 1,   // Do nothing, throw away this listener
394         eLARetry  = 2    // Listener should provide another port to try
395     };
~IServer_ConnectionFactory()396     virtual ~IServer_ConnectionFactory() { }
397 
398     /// @return
399     ///  a new instance of handler for connection
400     virtual IServer_ConnectionHandler* Create(void) = 0;
401     /// Return desired action if the port, mentioned in AddListener is busy.
402     /// If the action is eLARetry, provide new port. The
OnFailure(unsigned short *)403     virtual EListenAction OnFailure(unsigned short* /* port */)
404         { return eLAFail; }
405 };
406 
407 
408 
409 /////////////////////////////////////////////////////////////////////////////
410 ///
411 ///  CServer_ConnectionFactory::
412 ///
413 /// Reasonable default implementation for IServer_ConnectionFactory
414 ///
415 
416 template <class TServer_ConnectionHandler>
417 class CServer_ConnectionFactory : public IServer_ConnectionFactory
418 {
419 public:
Create()420     virtual IServer_ConnectionHandler* Create()
421         { return new TServer_ConnectionHandler(); }
422 };
423 
424 
425 
426 /////////////////////////////////////////////////////////////////////////////
427 ///
428 ///  SServer_Parameters::
429 ///
430 /// Settings for CServer
431 ///
432 
433 struct NCBI_XCONNECT_EXPORT SServer_Parameters
434 {
435     /// Maximum # of open connections
436     unsigned int    max_connections;
437     /// Temporarily close listener when queue fills?
438     bool            temporarily_stop_listening;
439     /// Maximum t between exit checks
440     const STimeout* accept_timeout;
441     /// For how long to keep inactive non-listening sockets open
442     /// (default:  10 minutes)
443     const STimeout* idle_timeout;
444 
445     // (settings for the thread pool)
446     unsigned int    init_threads;    ///< Number of initial threads
447     unsigned int    max_threads;     ///< Maximum simultaneous threads
448     unsigned int    spawn_threshold; ///< Controls when to spawn more threads
449 
450     /// Create structure with the default set of parameters
451     SServer_Parameters();
452 };
453 
454 
455 
456 /////////////////////////////////////////////////////////////////////////////
457 ///
458 ///  CServer_Exception::
459 ///
460 /// Exceptions thrown by CServer::Run()
461 ///
462 
463 class NCBI_XCONNECT_EXPORT CServer_Exception
464     : EXCEPTION_VIRTUAL_BASE public CConnException
465 {
466 public:
467     enum EErrCode {
468         eBadParameters, ///< Out-of-range parameters given
469         eCouldntListen, ///< Unable to bind listening port
470         ePoolOverflow   ///< Connection pool overflowed
471     };
472     virtual const char* GetErrCodeString(void) const override;
473     NCBI_EXCEPTION_DEFAULT(CServer_Exception, CConnException);
474 };
475 
476 
477 
478 END_NCBI_SCOPE
479 
480 
481 /* @} */
482 
483 #endif  /* CONNECT___SERVER__HPP */
484