#ifndef TCPDAEMON__HPP
#define TCPDAEMON__HPP

/*  $Id: tcp_daemon.hpp 629837 2021-04-22 12:47:49Z ivanov $
 * ===========================================================================
 *
 *                            PUBLIC DOMAIN NOTICE
 *               National Center for Biotechnology Information
 *
 *  This software/database is a "United States Government Work" under the
 *  terms of the United States Copyright Act.  It was written as part of
 *  the author's official duties as a United States Government employee and
 *  thus cannot be copyrighted.  This software/database is freely available
 *  to the public for use. The National Library of Medicine and the U.S.
 *  Government have not placed any restriction on its use or reproduction.
 *
 *  Although all reasonable efforts have been taken to ensure the accuracy
 *  and reliability of the software and data, the NLM and the U.S.
 *  Government do not and cannot warrant the performance or results that
 *  may be obtained by using this software or data. The NLM and the U.S.
 *  Government disclaim all warranties, express or implied, including
 *  warranties of performance, merchantability or fitness for any particular
 *  purpose.
 *
 *  Please cite the author in any work or product based on this material.
 *
 * ===========================================================================
 *
 * Authors: Dmitri Dmitrienko
 *
 * File Description:
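 *   A TCP daemon built on top of libuv: a listening socket is exported to a
 *   pool of worker threads, each running its own event loop and delegating
 *   request processing to the protocol class given as the P template parameter.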
 *
 */

#include <stdexcept>
#include <sstream>
#include <vector>
#include <string>
#include <memory>
#include <cassert>
#include <atomic>
#include <chrono>
#include <functional>
#include <list>
#include <tuple>

#include "uv.h"
#include "uv_extra.h"

#include "UvHelper.hpp"

#include "pubseq_gateway_exception.hpp"
#include "pubseq_gateway_logging.hpp"
USING_NCBI_SCOPE;

#include "shutdown_data.hpp"
extern SShutdownData       g_ShutdownData;


void CollectGarbage(void);

namespace TSL {

template<typename P, typename U, typename D>
class CTcpWorkersList;

template<typename P, typename U, typename D>
class CTcpDaemon;

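// Associates a libuv TCP handle with the protocol-specific per-connection
// data (the U template parameter).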
template<typename U>
struct connection_ctx_t
{
    uv_tcp_t    conn;
    U           u;
};


template<typename P, typename U, typename D>
struct CTcpWorker;

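// Owns the pool of worker threads: starts them on top of the exported
// listener, exposes aggregate counters, and joins the threads on shutdown.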
template<typename P, typename U, typename D>
class CTcpWorkersList
{
    friend class CTcpDaemon<P, U, D>;

private:
    std::vector<std::unique_ptr<CTcpWorker<P, U, D>>>   m_workers;
    CTcpDaemon<P, U, D> *                               m_daemon;
    std::function<void(CTcpDaemon<P, U, D>& daemon)>    m_on_watch_dog;

protected:
    static void s_WorkerExecute(void *  _worker)
    {
        CTcpWorker<P, U, D> *   worker =
                                    static_cast<CTcpWorker<P, U, D>*>(_worker);
        worker->Execute();
        PSG_INFO("worker " << worker->m_id << " finished");
    }

public:
    static uv_key_t         s_thread_worker_key;

    CTcpWorkersList(CTcpDaemon<P, U, D> *  daemon) :
        m_daemon(daemon)
    {}

    ~CTcpWorkersList()
    {
        PSG_INFO("CTcpWorkersList::~()>>");
        JoinWorkers();
        PSG_INFO("CTcpWorkersList::~()<<");
        m_daemon->m_workers = nullptr;
    }

    void Start(struct uv_export_t *  exp, unsigned short  nworkers, D &  d,
               std::function<void(CTcpDaemon<P, U, D>& daemon)> OnWatchDog = nullptr)
    {
        int         err_code;

        for (unsigned int  i = 0; i < nworkers; ++i) {
            m_workers.emplace_back(new CTcpWorker<P, U, D>(i + 1, exp,
                                                           m_daemon, this, d));
        }

        for (auto &  it: m_workers) {
            CTcpWorker<P, U, D> *worker = it.get();
            err_code = uv_thread_create(&worker->m_thread, s_WorkerExecute,
                                        static_cast<void*>(worker));
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvThreadCreateFailure,
                            "uv_thread_create failed", err_code);
        }
        m_on_watch_dog = OnWatchDog;
        m_daemon->m_workers = this;
    }

    bool AnyWorkerIsRunning(void)
    {
        for (auto & it : m_workers)
            if (!it->m_shutdown)
                return true;
        return false;
    }

    void KillAll(void)
    {
        for (auto & it : m_workers)
            it->Stop();
    }

    uint64_t NumOfRequests(void)
    {
        uint64_t        rv = 0;
        for (auto & it : m_workers)
            rv += it->m_request_count;
        return rv;
    }

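    // Watchdog tick (runs on the main loop): during a requested shutdown it
    // stops the loop once all active requests have drained or the deadline
    // has passed; otherwise it fires the user callback and collects garbage.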
    static void s_OnWatchDog(uv_timer_t *  handle)
    {
        if (g_ShutdownData.m_ShutdownRequested) {
            if (g_ShutdownData.m_ActiveRequestCount == 0) {
                uv_stop(handle->loop);
            } else {
                if (chrono::steady_clock::now() > g_ShutdownData.m_Expired) {
                    PSG_MESSAGE("Shutdown timeout has expired while there are "
                                "still unfinished requests. Exiting immediately.");
                    exit(0);
                }
            }
            return;
        }

        CTcpWorkersList<P, U, D> *      self =
                        static_cast<CTcpWorkersList<P, U, D>*>(handle->data);

        if (!self->AnyWorkerIsRunning()) {
            uv_stop(handle->loop);
        } else {
            if (self->m_on_watch_dog) {
                self->m_on_watch_dog(*self->m_daemon);
            }
            CollectGarbage();
        }
    }

    void JoinWorkers(void)
    {
        int         err_code;
        for (auto & it : m_workers) {
            CTcpWorker<P, U, D> *worker = it.get();
            if (!worker->m_joined) {
                worker->m_joined = true;
                while (1) {
                    err_code = uv_thread_join(&worker->m_thread);
                    if (!err_code) {
                        worker->m_thread = 0;
                        break;
                    } else if (-err_code != EAGAIN) {
                        PSG_ERROR("uv_thread_join failed: " << err_code);
                        break;
                    }
                }
            }
        }
    }
};


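// Per-worker libuv resources: the worker's own loop, the listener handle
// imported from the main thread, async stop/work handles and a periodic timer.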
struct CTcpWorkerInternal_t {
    CUvLoop         m_loop;
    uv_tcp_t        m_listener;
    uv_async_t      m_async_stop;
    uv_async_t      m_async_work;
    uv_timer_t      m_timer;

    CTcpWorkerInternal_t() :
        m_listener({0}),
        m_async_stop({0}),
        m_async_work({0}),
        m_timer({0})
    {}
};


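// A single worker thread: it imports the shared listener into its own loop,
// accepts connections, recycles per-connection state via a free list and
// delegates all request processing to the protocol object (P).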
template<typename P, typename U, typename D>
struct CTcpWorker
{
    unsigned int                            m_id;
    uv_thread_t                             m_thread;
    std::atomic_uint_fast64_t               m_request_count;
    std::atomic_uint_fast16_t               m_connection_count;
    std::atomic_bool                        m_started;
    std::atomic_bool                        m_shutdown;
    std::atomic_bool                        m_shuttingdown;
    bool                                    m_close_all_issued;
    bool                                    m_joined;
    int                                     m_error;
    std::list<std::tuple<uv_tcp_t, U>>      m_connected_list;
    std::list<std::tuple<uv_tcp_t, U>>      m_free_list;
    struct uv_export_t *                    m_exp;
    CTcpWorkersList<P, U, D> *              m_guard;
    CTcpDaemon<P, U, D> *                   m_daemon;
    std::string                             m_last_error;
    D &                                     m_d;
    P                                       m_protocol;
    std::unique_ptr<CTcpWorkerInternal_t>   m_internal;

    CTcpWorker(unsigned int  id, struct uv_export_t *  exp,
               CTcpDaemon<P, U, D> *  daemon,
               CTcpWorkersList<P, U, D> *  guard, D &  d) :
        m_id(id),
        m_thread(0),
        m_request_count(0),
        m_connection_count(0),
        m_started(false),
        m_shutdown(false),
        m_shuttingdown(false),
        m_close_all_issued(false),
        m_joined(false),
        m_error(0),
        m_exp(exp),
        m_guard(guard),
        m_daemon(daemon),
        m_d(d),
        m_protocol(m_d)
    {}

    void Stop(void)
    {
        if (m_started && !m_shutdown && !m_shuttingdown) {
            uv_async_send(&m_internal->m_async_stop);
        }
    }

    void Execute(void)
    {
        try {
            if (m_internal)
                NCBI_THROW(CPubseqGatewayException, eWorkerAlreadyStarted,
                           "Worker has already been started");

            m_internal.reset(new CTcpWorkerInternal_t);

            int         err_code;
            uv_key_set(&CTcpWorkersList<P, U, D>::s_thread_worker_key, this);

            m_protocol.BeforeStart();
            err_code = uv_import(m_internal->m_loop.Handle(),
                                 reinterpret_cast<uv_stream_t*>(&m_internal->m_listener),
                                 m_exp);
            // PSG_ERROR("worker " << worker->m_id << " uv_import: " << err_code);
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvImportFailure,
                            "uv_import failed", err_code);

            m_internal->m_listener.data = this;
            err_code = uv_listen(reinterpret_cast<uv_stream_t*>(&m_internal->m_listener),
                                 m_daemon->m_backlog, s_OnTcpConnection);
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvListenFailure,
                            "uv_listen failed", err_code);
            m_internal->m_listener.data = this;

            err_code = uv_async_init(m_internal->m_loop.Handle(),
                                     &m_internal->m_async_stop, s_OnAsyncStop);
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvAsyncInitFailure,
                            "uv_async_init failed", err_code);
            m_internal->m_async_stop.data = this;

            err_code = uv_async_init(m_internal->m_loop.Handle(),
                                     &m_internal->m_async_work, s_OnAsyncWork);
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvAsyncInitFailure,
                            "uv_async_init failed", err_code);
            m_internal->m_async_work.data = this;

            err_code = uv_timer_init(m_internal->m_loop.Handle(),
                                     &m_internal->m_timer);
            if (err_code != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvTimerInitFailure,
                            "uv_timer_init failed", err_code);
            m_internal->m_timer.data = this;

            uv_timer_start(&m_internal->m_timer, s_OnTimer, 1000, 1000);

            m_started = true;
            m_protocol.ThreadStart(m_internal->m_loop.Handle(), this);

            err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);
            PSG_INFO("uv_run (1) worker " << m_id <<
                     " returned " <<  err_code);
        } catch (const CPubseqGatewayUVException &  exc) {
            m_error = exc.GetUVLibraryErrorCode();
            m_last_error = exc.GetMsg();
        } catch (const CException &  exc) {
            m_error = exc.GetErrCode();
            m_last_error = exc.GetMsg();
        }

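        // The event loop has exited (or initialization failed): close the
        // remaining handles, let the loop drain pending close callbacks and
        // dispose of the worker's resources.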
        m_shuttingdown = true;
        PSG_INFO("worker " << m_id << " is closing");
        if (m_internal) {
            try {
                int         err_code;

                if (m_internal->m_listener.type != 0)
                    uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_listener),
                             NULL);

                CloseAll();

                while (m_connection_count > 0)
                    uv_run(m_internal->m_loop.Handle(), UV_RUN_NOWAIT);

                if (m_internal->m_async_stop.type != 0)
                    uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_stop),
                             NULL);
                if (m_internal->m_async_work.type != 0)
                    uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_work),
                             NULL);
                if (m_internal->m_timer.type != 0)
                    uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_timer),
                             NULL);

                m_protocol.ThreadStop();

                err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT);

                if (err_code != 0)
                    PSG_INFO("worker " << m_id <<
                             ", uv_run (2) returned " << err_code <<
                             ", st: " << m_started.load());
                // uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this);
                err_code = m_internal->m_loop.Close();
                if (err_code != 0) {
                    PSG_INFO("worker " << m_id <<
                             ", uv_loop_close returned " << err_code <<
                             ", st: " << m_started.load());
                    uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this);
                }
                m_internal.reset(nullptr);
            } catch(...) {
                PSG_ERROR("unexpected exception while shutting down worker " <<
                          m_id);
            }
        }
    }

    void CloseAll(void)
    {
        assert(m_shuttingdown);
        if (!m_close_all_issued) {
            m_close_all_issued = true;
            for (auto  it = m_connected_list.begin();
                 it != m_connected_list.end(); ++it) {
                uv_tcp_t *tcp = &std::get<0>(*it);
                uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnCliClosed);
            }
        }
    }

    void OnCliClosed(uv_handle_t *  handle)
    {
        m_daemon->ClientDisconnected();
        --m_connection_count;

        uv_tcp_t *tcp = reinterpret_cast<uv_tcp_t*>(handle);
        for (auto it = m_connected_list.begin();
             it != m_connected_list.end(); ++it) {
            if (tcp == &std::get<0>(*it)) {
                m_protocol.OnClosedConnection(reinterpret_cast<uv_stream_t*>(handle),
                                              &std::get<1>(*it));
                m_free_list.splice(m_free_list.begin(), m_connected_list, it);
                return;
            }
        }
        assert(false);
    }

    void WakeWorker(void)
    {
        if (m_internal)
            uv_async_send(&m_internal->m_async_work);
    }

    std::list<std::tuple<uv_tcp_t, U>> &  GetConnList(void)
    {
        return m_connected_list;
    }

private:
    void OnAsyncWork(void)
    {
        // If shutdown is in progress, close outstanding requests;
        // otherwise pick up their data and send it back to the client
        m_protocol.OnAsyncWork(m_shuttingdown || m_shutdown);
    }

    static void s_OnAsyncWork(uv_async_t *  handle)
    {
        PSG_INFO("Worker async work requested");
        CTcpWorker<P, U, D> *       worker =
            static_cast<CTcpWorker<P, U, D>*>(
                    uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key));
        worker->OnAsyncWork();
    }

    void OnTimer(void)
    {
        m_protocol.OnTimer();
    }

    static void s_OnTimer(uv_timer_t *  handle)
    {
        CTcpWorker<P, U, D> *           worker =
            static_cast<CTcpWorker<P, U, D>*>(
                    uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key));
        worker->OnTimer();
    }

    static void s_OnAsyncStop(uv_async_t *  handle)
    {
        PSG_INFO("Worker async stop requested");
        uv_stop(handle->loop);
    }

    static void s_OnTcpConnection(uv_stream_t *  listener, const int  status)
    {
        if (listener && status == 0) {
            CTcpWorker<P, U, D> *       worker =
                static_cast<CTcpWorker<P, U, D>*>(listener->data);
            worker->OnTcpConnection(listener);
        }
    }

    static void s_OnCliClosed(uv_handle_t *  handle)
    {
        CTcpWorker<P, U, D> *           worker =
            static_cast<CTcpWorker<P, U, D>*>(
                    uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key));
        worker->OnCliClosed(handle);
    }

    static void s_LoopWalk(uv_handle_t *  handle, void *  arg)
    {
        CTcpWorker<P, U, D> *           worker = arg ?
                                static_cast<CTcpWorker<P, U, D>*>(arg) : NULL;
        PSG_INFO("Handle " << handle <<
                 " (" << handle->type <<
                 ") @ worker " << (worker ? worker->m_id : -1) <<
                 " (" << worker << ")");
    }

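    // Accepts a new connection on this worker's loop.  Per-connection state
    // is recycled through m_free_list; accepted entries move to
    // m_connected_list and are handed over to the protocol object.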
    void OnTcpConnection(uv_stream_t *  listener)
    {
        if (m_free_list.empty())
            m_free_list.push_back(std::make_tuple(uv_tcp_t{0}, U()));

        auto        it = m_free_list.begin();
        uv_tcp_t *  tcp = &std::get<0>(*it);
        int         err_code = uv_tcp_init(m_internal->m_loop.Handle(), tcp);

        if (err_code != 0)
            return;

        uv_tcp_nodelay(tcp, 1);

        tcp->data = this;
        m_connected_list.splice(m_connected_list.begin(), m_free_list, it);

        err_code = uv_accept(listener, reinterpret_cast<uv_stream_t*>(tcp));
        ++m_connection_count;
        bool b = m_daemon->ClientConnected();

        if (err_code != 0 || !b || m_shuttingdown) {
            uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnCliClosed);
            return;
        }
        std::get<1>(*it).Reset();
        m_protocol.OnNewConnection(reinterpret_cast<uv_stream_t*>(tcp),
                                   &std::get<1>(*it), s_OnCliClosed);
    }
};


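// The daemon itself: binds the listening socket, exports it to the worker
// threads via uv_export, installs the signal handlers and drives the main
// loop together with the watchdog timer.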
template<typename P, typename U, typename D>
class CTcpDaemon
{
private:
    std::string                     m_address;
    unsigned short                  m_port;
    unsigned short                  m_num_workers;
    unsigned short                  m_backlog;
    unsigned short                  m_max_connections;
    CTcpWorkersList<P, U, D> *      m_workers;
    std::atomic_uint_fast16_t       m_connection_count;

    friend class CTcpWorkersList<P, U, D>;
    friend class CTcpWorker<P, U, D>;

private:
    static void s_OnMainSigInt(uv_signal_t *  /* req */, int  /* signum */)
    {
        PSG_MESSAGE("SIGINT received. Immediate shutdown performed.");
        exit(0);
        // The uv_stop() may hang if some synchronous long operation is in
        // progress, so it was decided to use exit(), which is not a big
        // problem for PSG because it is a stateless server.
        // uv_stop(req->loop);
    }

    static void s_OnMainSigTerm(uv_signal_t *  /* req */, int  /* signum */)
    {
        auto        now = chrono::steady_clock::now();
        auto        expiration = now + chrono::hours(24);

        if (g_ShutdownData.m_ShutdownRequested) {
            if (expiration >= g_ShutdownData.m_Expired) {
                PSG_MESSAGE("SIGTERM received. The previous shutdown "
                            "expiration is shorter than this one. Ignored.");
                return;
            }
        }

        PSG_MESSAGE("SIGTERM received. Graceful shutdown is initiated");
        g_ShutdownData.m_Expired = expiration;
        g_ShutdownData.m_ShutdownRequested = true;
    }

    static void s_OnMainSigHup(uv_signal_t *  /* req */, int  /* signum */)
    { PSG_MESSAGE("SIGHUP received. Ignoring."); }

    static void s_OnMainSigUsr1(uv_signal_t *  /* req */, int  /* signum */)
    { PSG_MESSAGE("SIGUSR1 received. Ignoring."); }

    static void s_OnMainSigUsr2(uv_signal_t *  /* req */, int  /* signum */)
    { PSG_MESSAGE("SIGUSR2 received. Ignoring."); }

    static void s_OnMainSigWinch(uv_signal_t *  /* req */, int  /* signum */)
    { PSG_MESSAGE("SIGWINCH received. Ignoring."); }

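    // Connection accounting used by the workers: once the configured maximum
    // is reached, ClientConnected() returns false and the freshly accepted
    // connection is closed again.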
    bool ClientConnected(void)
    {
        uint16_t n = ++m_connection_count;
        return n < m_max_connections;
    }

    bool ClientDisconnected(void)
    {
        uint16_t n = --m_connection_count;
        return n < m_max_connections;
    }


protected:
    static constexpr const char IPC_PIPE_NAME[] = "tcp_daemon_startup_rpc";

public:
    CTcpDaemon(const std::string &  Address, unsigned short  Port,
               unsigned short  NumWorkers, unsigned short  BackLog,
               unsigned short  MaxConnections) :
        m_address(Address),
        m_port(Port),
        m_num_workers(NumWorkers),
        m_backlog(BackLog),
        m_max_connections(MaxConnections),
        m_workers(nullptr),
        m_connection_count(0)
    {}

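    // Looks up the worker bound to the current thread.  Returns false (and a
    // null protocol pointer) when that worker is shutting down so the caller
    // can reject the request; otherwise counts the request and exposes the
    // worker's protocol object.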
    bool OnRequest(P **  p)
    {
        CTcpWorker<P, U, D> *   worker = static_cast<CTcpWorker<P, U, D>*>(
                    uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key));
        if (worker->m_shutdown) {
            worker->CloseAll();
            *p = nullptr;
            return false;
        }

        ++worker->m_request_count;
        *p = &worker->m_protocol;
        return true;
    }

    uint64_t NumOfRequests(void)
    {
        return m_workers ? m_workers->NumOfRequests() : 0;
    }

    uint16_t NumOfConnections(void) const
    {
        return m_connection_count;
    }

    void Run(D &  d,
             std::function<void(CTcpDaemon<P, U, D>& daemon)>
                                                    OnWatchDog = nullptr)
    {
        int         rc;

        if (m_address.empty())
            NCBI_THROW(CPubseqGatewayException, eAddressEmpty,
                       "Failed to start daemon: address is empty");
        if (m_port == 0)
            NCBI_THROW(CPubseqGatewayException, ePortNotSpecified,
                       "Failed to start daemon: port is not specified");

        signal(SIGPIPE, SIG_IGN);
        if (CTcpWorkersList<P, U, D>::s_thread_worker_key == 0) {
            rc = uv_key_create(&CTcpWorkersList<P, U, D>::s_thread_worker_key);
            if (rc != 0)
                NCBI_THROW2(CPubseqGatewayUVException, eUvKeyCreateFailure,
                            "uv_key_create failed", rc);
        }

        CTcpWorkersList<P, U, D>    workers(this);
        {{
            CUvLoop         loop;

            CUvSignal       sigint(loop.Handle());
            sigint.Start(SIGINT, s_OnMainSigInt);

            CUvSignal       sigterm(loop.Handle());
            sigterm.Start(SIGTERM, s_OnMainSigTerm);

            CUvSignal       sighup(loop.Handle());
            sighup.Start(SIGHUP, s_OnMainSigHup);

            CUvSignal       sigusr1(loop.Handle());
            sigusr1.Start(SIGUSR1, s_OnMainSigUsr1);

            CUvSignal       sigusr2(loop.Handle());
            sigusr2.Start(SIGUSR2, s_OnMainSigUsr2);

            CUvSignal       sigwinch(loop.Handle());
            sigwinch.Start(SIGWINCH, s_OnMainSigWinch);


            CUvTcp          listener(loop.Handle());
            listener.Bind(m_address.c_str(), m_port);

            struct uv_export_t *    exp = NULL;
            rc = uv_export_start(loop.Handle(),
                                 reinterpret_cast<uv_stream_t*>(listener.Handle()),
                                 IPC_PIPE_NAME, m_num_workers, &exp);
            if (rc)
                NCBI_THROW2(CPubseqGatewayUVException, eUvExportStartFailure,
                            "uv_export_start failed", rc);

            try {
                workers.Start(exp, m_num_workers, d, OnWatchDog);
            } catch (const exception &  exc) {
                uv_export_close(exp);
                throw;
            }

            rc = uv_export_finish(exp);
            if (rc)
                NCBI_THROW2(CPubseqGatewayUVException, eUvExportWaitFailure,
                            "uv_export_finish failed", rc);

            listener.Close(nullptr);

            P::DaemonStarted();

            uv_timer_t      watch_dog;
            uv_timer_init(loop.Handle(), &watch_dog);
            watch_dog.data = &workers;
            uv_timer_start(&watch_dog, workers.s_OnWatchDog, 1000, 1000);

            uv_run(loop.Handle(), UV_RUN_DEFAULT);

            uv_close(reinterpret_cast<uv_handle_t*>(&watch_dog), NULL);
            workers.KillAll();

            P::DaemonStopped();
        }}
    }
};


template<typename P, typename U, typename D>
uv_key_t CTcpWorkersList<P, U, D>::s_thread_worker_key;


template<typename P, typename U, typename D>
constexpr const char CTcpDaemon<P, U, D>::IPC_PIPE_NAME[];

}

#endif