1 #ifndef TCPDAEMON__HPP 2 #define TCPDAEMON__HPP 3 4 /* $Id: tcp_daemon.hpp 629837 2021-04-22 12:47:49Z ivanov $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Authors: Dmitri Dmitrienko 30 * 31 * File Description: 32 * 33 */ 34 35 #include <stdexcept> 36 #include <sstream> 37 #include <vector> 38 #include <string> 39 #include <memory> 40 #include <cassert> 41 42 #include "uv.h" 43 #include "uv_extra.h" 44 45 #include "UvHelper.hpp" 46 47 #include "pubseq_gateway_exception.hpp" 48 #include "pubseq_gateway_logging.hpp" 49 USING_NCBI_SCOPE; 50 51 #include "shutdown_data.hpp" 52 extern SShutdownData g_ShutdownData; 53 54 55 void CollectGarbage(void); 56 57 namespace TSL { 58 59 template<typename P, typename U, typename D> 60 class CTcpWorkersList; 61 62 template<typename P, typename U, typename D> 63 class CTcpDaemon; 64 65 template<typename U> 66 struct connection_ctx_t 67 { 68 uv_tcp_t conn; 69 U u; 70 }; 71 72 73 template<typename P, typename U, typename D> 74 struct CTcpWorker; 75 76 77 template<typename P, typename U, typename D> 78 class CTcpWorkersList 79 { 80 friend class CTcpDaemon<P, U, D>; 81 82 private: 83 std::vector<std::unique_ptr<CTcpWorker<P, U, D>>> m_workers; 84 CTcpDaemon<P, U, D> * m_daemon; 85 std::function<void(CTcpDaemon<P, U, D>& daemon)> m_on_watch_dog; 86 87 protected: s_WorkerExecute(void * _worker)88 static void s_WorkerExecute(void * _worker) 89 { 90 CTcpWorker<P, U, D> * worker = 91 static_cast<CTcpWorker<P, U, D>*>(_worker); 92 worker->Execute(); 93 PSG_INFO("worker " << worker->m_id << " finished"); 94 } 95 96 public: 97 static uv_key_t s_thread_worker_key; 98 CTcpWorkersList(CTcpDaemon<P,U,D> * daemon)99 CTcpWorkersList(CTcpDaemon<P, U, D> * daemon) : 100 m_daemon(daemon) 101 {} 102 ~CTcpWorkersList()103 ~CTcpWorkersList() 104 { 105 PSG_INFO("CTcpWorkersList::~()>>"); 106 JoinWorkers(); 107 PSG_INFO("CTcpWorkersList::~()<<"); 108 m_daemon->m_workers = nullptr; 109 } 110 Start(struct uv_export_t * exp,unsigned short nworkers,D & d,std::function<void (CTcpDaemon<P,U,D> & daemon)> OnWatchDog=nullptr)111 void Start(struct uv_export_t * exp, unsigned short nworkers, D & d, 112 std::function<void(CTcpDaemon<P, U, D>& daemon)> OnWatchDog = nullptr) 113 { 114 int err_code; 115 116 for (unsigned int i = 0; i < nworkers; ++i) { 117 m_workers.emplace_back(new CTcpWorker<P, U, D>(i + 1, exp, 118 m_daemon, this, d)); 119 } 120 121 for (auto & it: m_workers) { 122 CTcpWorker<P, U, D> *worker = it.get(); 123 err_code = uv_thread_create(&worker->m_thread, s_WorkerExecute, 124 static_cast<void*>(worker)); 125 if (err_code != 0) 126 NCBI_THROW2(CPubseqGatewayUVException, eUvThreadCreateFailure, 127 "uv_thread_create failed", err_code); 128 } 129 m_on_watch_dog = OnWatchDog; 130 m_daemon->m_workers = this; 131 } 132 AnyWorkerIsRunning(void)133 bool AnyWorkerIsRunning(void) 134 { 135 for (auto & it : m_workers) 136 if (!it->m_shutdown) 137 return true; 138 return false; 139 } 140 KillAll(void)141 void KillAll(void) 142 { 143 for (auto & it : m_workers) 144 it->Stop(); 145 } 146 NumOfRequests(void)147 uint64_t NumOfRequests(void) 148 { 149 uint64_t rv = 0; 150 for (auto & it : m_workers) 151 rv += it->m_request_count; 152 return rv; 153 } 154 s_OnWatchDog(uv_timer_t * handle)155 static void s_OnWatchDog(uv_timer_t * handle) 156 { 157 if (g_ShutdownData.m_ShutdownRequested) { 158 if (g_ShutdownData.m_ActiveRequestCount == 0) { 159 uv_stop(handle->loop); 160 } else { 161 if (chrono::steady_clock::now() > g_ShutdownData.m_Expired) { 162 PSG_MESSAGE("Shutdown timeout is over when there are " 163 "unfinished requests. Exiting immediately."); 164 exit(0); 165 } 166 } 167 return; 168 } 169 170 CTcpWorkersList<P, U, D> * self = 171 static_cast<CTcpWorkersList<P, U, D>*>(handle->data); 172 173 if (!self->AnyWorkerIsRunning()) { 174 uv_stop(handle->loop); 175 } else { 176 if (self->m_on_watch_dog) { 177 self->m_on_watch_dog(*self->m_daemon); 178 } 179 CollectGarbage(); 180 } 181 } 182 JoinWorkers(void)183 void JoinWorkers(void) 184 { 185 int err_code; 186 for (auto & it : m_workers) { 187 CTcpWorker<P, U, D> *worker = it.get(); 188 if (!worker->m_joined) { 189 worker->m_joined = true; 190 while (1) { 191 err_code = uv_thread_join(&worker->m_thread); 192 if (!err_code) { 193 worker->m_thread = 0; 194 break; 195 } else if (-err_code != EAGAIN) { 196 PSG_ERROR("uv_thread_join failed: " << err_code); 197 break; 198 } 199 } 200 } 201 } 202 } 203 }; 204 205 206 struct CTcpWorkerInternal_t { 207 CUvLoop m_loop; 208 uv_tcp_t m_listener; 209 uv_async_t m_async_stop; 210 uv_async_t m_async_work; 211 uv_timer_t m_timer; 212 CTcpWorkerInternal_tTSL::CTcpWorkerInternal_t213 CTcpWorkerInternal_t() : 214 m_listener({0}), 215 m_async_stop({0}), 216 m_async_work({0}), 217 m_timer({0}) 218 {} 219 }; 220 221 222 template<typename P, typename U, typename D> 223 struct CTcpWorker 224 { 225 unsigned int m_id; 226 uv_thread_t m_thread; 227 std::atomic_uint_fast64_t m_request_count; 228 std::atomic_uint_fast16_t m_connection_count; 229 std::atomic_bool m_started; 230 std::atomic_bool m_shutdown; 231 std::atomic_bool m_shuttingdown; 232 bool m_close_all_issued; 233 bool m_joined; 234 int m_error; 235 std::list<std::tuple<uv_tcp_t, U>> m_connected_list; 236 std::list<std::tuple<uv_tcp_t, U>> m_free_list; 237 struct uv_export_t * m_exp; 238 CTcpWorkersList<P, U, D> * m_guard; 239 CTcpDaemon<P, U, D> * m_daemon; 240 std::string m_last_error; 241 D & m_d; 242 P m_protocol; 243 std::unique_ptr<CTcpWorkerInternal_t> m_internal; 244 CTcpWorkerTSL::CTcpWorker245 CTcpWorker(unsigned int id, struct uv_export_t * exp, 246 CTcpDaemon<P, U, D> * daemon, 247 CTcpWorkersList<P, U, D> * guard, D & d) : 248 m_id(id), 249 m_thread(0), 250 m_request_count(0), 251 m_connection_count(0), 252 m_started(false), 253 m_shutdown(false), 254 m_shuttingdown(false), 255 m_close_all_issued(false), 256 m_joined(false), 257 m_error(0), 258 m_exp(exp), 259 m_guard(guard), 260 m_daemon(daemon), 261 m_d(d), 262 m_protocol(m_d) 263 {} 264 StopTSL::CTcpWorker265 void Stop(void) 266 { 267 if (m_started && !m_shutdown && !m_shuttingdown) { 268 uv_async_send(&m_internal->m_async_stop); 269 } 270 } 271 ExecuteTSL::CTcpWorker272 void Execute(void) 273 { 274 try { 275 if (m_internal) 276 NCBI_THROW(CPubseqGatewayException, eWorkerAlreadyStarted, 277 "Worker has already been started"); 278 279 m_internal.reset(new CTcpWorkerInternal_t); 280 281 int err_code; 282 uv_key_set(&CTcpWorkersList<P, U, D>::s_thread_worker_key, this); 283 284 m_protocol.BeforeStart(); 285 err_code = uv_import(m_internal->m_loop.Handle(), 286 reinterpret_cast<uv_stream_t*>(&m_internal->m_listener), 287 m_exp); 288 // PSG_ERROR("worker " << worker->m_id << " uv_import: " << err_code); 289 if (err_code != 0) 290 NCBI_THROW2(CPubseqGatewayUVException, eUvImportFailure, 291 "uv_import failed", err_code); 292 293 m_internal->m_listener.data = this; 294 err_code = uv_listen(reinterpret_cast<uv_stream_t*>(&m_internal->m_listener), 295 m_daemon->m_backlog, s_OnTcpConnection); 296 if (err_code != 0) 297 NCBI_THROW2(CPubseqGatewayUVException, eUvListenFailure, 298 "uv_listen failed", err_code); 299 m_internal->m_listener.data = this; 300 301 err_code = uv_async_init(m_internal->m_loop.Handle(), 302 &m_internal->m_async_stop, s_OnAsyncStop); 303 if (err_code != 0) 304 NCBI_THROW2(CPubseqGatewayUVException, eUvAsyncInitFailure, 305 "uv_async_init failed", err_code); 306 m_internal->m_async_stop.data = this; 307 308 err_code = uv_async_init(m_internal->m_loop.Handle(), 309 &m_internal->m_async_work, s_OnAsyncWork); 310 if (err_code != 0) 311 NCBI_THROW2(CPubseqGatewayUVException, eUvAsyncInitFailure, 312 "uv_async_init failed", err_code); 313 m_internal->m_async_work.data = this; 314 315 err_code = uv_timer_init(m_internal->m_loop.Handle(), 316 &m_internal->m_timer); 317 if (err_code != 0) 318 NCBI_THROW2(CPubseqGatewayUVException, eUvTimerInitFailure, 319 "uv_timer_init failed", err_code); 320 m_internal->m_timer.data = this; 321 322 uv_timer_start(&m_internal->m_timer, s_OnTimer, 1000, 1000); 323 324 m_started = true; 325 m_protocol.ThreadStart(m_internal->m_loop.Handle(), this); 326 327 err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT); 328 PSG_INFO("uv_run (1) worker " << m_id << 329 " returned " << err_code); 330 } catch (const CPubseqGatewayUVException & exc) { 331 m_error = exc.GetUVLibraryErrorCode(); 332 m_last_error = exc.GetMsg(); 333 } catch (const CException & exc) { 334 m_error = exc.GetErrCode(); 335 m_last_error = exc.GetMsg(); 336 } 337 338 m_shuttingdown = true; 339 PSG_INFO("worker " << m_id << " is closing"); 340 if (m_internal) { 341 try { 342 int err_code; 343 344 if (m_internal->m_listener.type != 0) 345 uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_listener), 346 NULL); 347 348 CloseAll(); 349 350 while (m_connection_count > 0) 351 uv_run(m_internal->m_loop.Handle(), UV_RUN_NOWAIT); 352 353 if (m_internal->m_async_stop.type != 0) 354 uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_stop), 355 NULL); 356 if (m_internal->m_async_work.type != 0) 357 uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_async_work), 358 NULL); 359 if (m_internal->m_timer.type != 0) 360 uv_close(reinterpret_cast<uv_handle_t*>(&m_internal->m_timer), 361 NULL); 362 363 m_protocol.ThreadStop(); 364 365 err_code = uv_run(m_internal->m_loop.Handle(), UV_RUN_DEFAULT); 366 367 if (err_code != 0) 368 PSG_INFO("worker " << m_id << 369 ", uv_run (2) returned " << err_code << 370 ", st: " << m_started.load()); 371 // uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this); 372 err_code = m_internal->m_loop.Close(); 373 if (err_code != 0) { 374 PSG_INFO("worker " << m_id << 375 ", uv_loop_close returned " << err_code << 376 ", st: " << m_started.load()); 377 uv_walk(m_internal->m_loop.Handle(), s_LoopWalk, this); 378 } 379 m_internal.reset(nullptr); 380 } catch(...) { 381 PSG_ERROR("unexpected exception while shutting down worker " << 382 m_id); 383 } 384 } 385 } 386 CloseAllTSL::CTcpWorker387 void CloseAll(void) 388 { 389 assert(m_shuttingdown); 390 if (!m_close_all_issued) { 391 m_close_all_issued = true; 392 for (auto it = m_connected_list.begin(); 393 it != m_connected_list.end(); ++it) { 394 uv_tcp_t *tcp = &std::get<0>(*it); 395 uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnCliClosed); 396 } 397 } 398 } 399 OnCliClosedTSL::CTcpWorker400 void OnCliClosed(uv_handle_t * handle) 401 { 402 m_daemon->ClientDisconnected(); 403 --m_connection_count; 404 405 uv_tcp_t *tcp = reinterpret_cast<uv_tcp_t*>(handle); 406 for (auto it = m_connected_list.begin(); 407 it != m_connected_list.end(); ++it) { 408 if (tcp == &std::get<0>(*it)) { 409 m_protocol.OnClosedConnection(reinterpret_cast<uv_stream_t*>(handle), 410 &std::get<1>(*it)); 411 m_free_list.splice(m_free_list.begin(), m_connected_list, it); 412 return; 413 } 414 } 415 assert(false); 416 } 417 WakeWorkerTSL::CTcpWorker418 void WakeWorker(void) 419 { 420 if (m_internal) 421 uv_async_send(&m_internal->m_async_work); 422 } 423 GetConnListTSL::CTcpWorker424 std::list<std::tuple<uv_tcp_t, U>> & GetConnList(void) 425 { 426 return m_connected_list; 427 } 428 429 private: OnAsyncWorkTSL::CTcpWorker430 void OnAsyncWork(void) 431 { 432 // If shutdown is in progress, close outstanding requests 433 // otherwise pick data from them and send back to the client 434 m_protocol.OnAsyncWork(m_shuttingdown || m_shutdown); 435 } 436 s_OnAsyncWorkTSL::CTcpWorker437 static void s_OnAsyncWork(uv_async_t * handle) 438 { 439 PSG_INFO("Worker async work requested"); 440 CTcpWorker<P, U, D> * worker = 441 static_cast<CTcpWorker<P, U, D>*>( 442 uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key)); 443 worker->OnAsyncWork(); 444 } 445 OnTimerTSL::CTcpWorker446 void OnTimer(void) 447 { 448 m_protocol.OnTimer(); 449 } 450 s_OnTimerTSL::CTcpWorker451 static void s_OnTimer(uv_timer_t * handle) 452 { 453 CTcpWorker<P, U, D> * worker = 454 static_cast<CTcpWorker<P, U, D>*>( 455 uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key)); 456 worker->OnTimer(); 457 } 458 s_OnAsyncStopTSL::CTcpWorker459 static void s_OnAsyncStop(uv_async_t * handle) 460 { 461 PSG_INFO("Worker async stop requested"); 462 uv_stop(handle->loop); 463 } 464 s_OnTcpConnectionTSL::CTcpWorker465 static void s_OnTcpConnection(uv_stream_t * listener, const int status) 466 { 467 if (listener && status == 0) { 468 CTcpWorker<P, U, D> * worker = 469 static_cast<CTcpWorker<P, U, D>*>(listener->data); 470 worker->OnTcpConnection(listener); 471 } 472 } 473 s_OnCliClosedTSL::CTcpWorker474 static void s_OnCliClosed(uv_handle_t * handle) 475 { 476 CTcpWorker<P, U, D> * worker = 477 static_cast<CTcpWorker<P, U, D>*>( 478 uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key)); 479 worker->OnCliClosed(handle); 480 } 481 s_LoopWalkTSL::CTcpWorker482 static void s_LoopWalk(uv_handle_t * handle, void * arg) 483 { 484 CTcpWorker<P, U, D> * worker = arg ? 485 static_cast<CTcpWorker<P, U, D>*>(arg) : NULL; 486 PSG_INFO("Handle " << handle << 487 " (" << handle->type << 488 ") @ worker " << (worker ? worker->m_id : -1) << 489 " (" << worker << ")"); 490 } 491 OnTcpConnectionTSL::CTcpWorker492 void OnTcpConnection(uv_stream_t * listener) 493 { 494 if (m_free_list.empty()) 495 m_free_list.push_back(std::make_tuple(uv_tcp_t{0}, U())); 496 497 auto it = m_free_list.begin(); 498 uv_tcp_t * tcp = &std::get<0>(*it); 499 int err_code = uv_tcp_init(m_internal->m_loop.Handle(), tcp); 500 501 if (err_code != 0) 502 return; 503 504 uv_tcp_nodelay(tcp, 1); 505 506 tcp->data = this; 507 m_connected_list.splice(m_connected_list.begin(), m_free_list, it); 508 509 err_code = uv_accept(listener, reinterpret_cast<uv_stream_t*>(tcp)); 510 ++m_connection_count; 511 bool b = m_daemon->ClientConnected(); 512 513 if (err_code != 0 || !b || m_shuttingdown) { 514 uv_close(reinterpret_cast<uv_handle_t*>(tcp), s_OnCliClosed); 515 return; 516 } 517 std::get<1>(*it).Reset(); 518 m_protocol.OnNewConnection(reinterpret_cast<uv_stream_t*>(tcp), 519 &std::get<1>(*it), s_OnCliClosed); 520 } 521 }; 522 523 524 template<typename P, typename U, typename D> 525 class CTcpDaemon 526 { 527 private: 528 std::string m_address; 529 unsigned short m_port; 530 unsigned short m_num_workers; 531 unsigned short m_backlog; 532 unsigned short m_max_connections; 533 CTcpWorkersList<P, U, D> * m_workers; 534 std::atomic_uint_fast16_t m_connection_count; 535 536 friend class CTcpWorkersList<P, U, D>; 537 friend class CTcpWorker<P, U, D>; 538 539 private: s_OnMainSigInt(uv_signal_t *,int)540 static void s_OnMainSigInt(uv_signal_t * /* req */, int /* signum */) 541 { 542 PSG_MESSAGE("SIGINT received. Immediate shutdown performed."); 543 exit(0); 544 // The uv_stop() may hang if some syncronous long operation is in 545 // progress. So it was decided to use exit() which is not a big problem 546 // for PSG because it is a stateless server. 547 // uv_stop(req->loop); 548 } 549 s_OnMainSigTerm(uv_signal_t *,int)550 static void s_OnMainSigTerm(uv_signal_t * /* req */, int /* signum */) 551 { 552 auto now = chrono::steady_clock::now(); 553 auto expiration = now + chrono::hours(24); 554 555 if (g_ShutdownData.m_ShutdownRequested) { 556 if (expiration >= g_ShutdownData.m_Expired) { 557 PSG_MESSAGE("SIGTERM received. The previous shutdown " 558 "expiration is shorter than this one. Ignored."); 559 return; 560 } 561 } 562 563 PSG_MESSAGE("SIGTERM received. Graceful shutdown is initiated"); 564 g_ShutdownData.m_Expired = expiration; 565 g_ShutdownData.m_ShutdownRequested = true; 566 } 567 s_OnMainSigHup(uv_signal_t *,int)568 static void s_OnMainSigHup(uv_signal_t * /* req */, int /* signum */) 569 { PSG_MESSAGE("SIGHUP received. Ignoring."); } 570 s_OnMainSigUsr1(uv_signal_t *,int)571 static void s_OnMainSigUsr1(uv_signal_t * /* req */, int /* signum */) 572 { PSG_MESSAGE("SIGUSR1 received. Ignoring."); } 573 s_OnMainSigUsr2(uv_signal_t *,int)574 static void s_OnMainSigUsr2(uv_signal_t * /* req */, int /* signum */) 575 { PSG_MESSAGE("SIGUSR2 received. Ignoring."); } 576 s_OnMainSigWinch(uv_signal_t *,int)577 static void s_OnMainSigWinch(uv_signal_t * /* req */, int /* signum */) 578 { PSG_MESSAGE("SIGWINCH received. Ignoring."); } 579 ClientConnected(void)580 bool ClientConnected(void) 581 { 582 uint16_t n = ++m_connection_count; 583 return n < m_max_connections; 584 } 585 ClientDisconnected(void)586 bool ClientDisconnected(void) 587 { 588 uint16_t n = --m_connection_count; 589 return n < m_max_connections; 590 } 591 592 593 protected: 594 static constexpr const char IPC_PIPE_NAME[] = "tcp_daemon_startup_rpc"; 595 596 public: CTcpDaemon(const std::string & Address,unsigned short Port,unsigned short NumWorkers,unsigned short BackLog,unsigned short MaxConnections)597 CTcpDaemon(const std::string & Address, unsigned short Port, 598 unsigned short NumWorkers, unsigned short BackLog, 599 unsigned short MaxConnections) : 600 m_address(Address), 601 m_port(Port), 602 m_num_workers(NumWorkers), 603 m_backlog(BackLog), 604 m_max_connections(MaxConnections), 605 m_workers(nullptr), 606 m_connection_count(0) 607 {} 608 OnRequest(P ** p)609 bool OnRequest(P ** p) 610 { 611 CTcpWorker<P, U, D> * worker = static_cast<CTcpWorker<P, U, D>*>( 612 uv_key_get(&CTcpWorkersList<P, U, D>::s_thread_worker_key)); 613 if (worker->m_shutdown) { 614 worker->CloseAll(); 615 *p = nullptr; 616 return false; 617 } 618 619 ++worker->m_request_count; 620 *p = &worker->m_protocol; 621 return true; 622 } 623 NumOfRequests(void)624 uint64_t NumOfRequests(void) 625 { 626 return m_workers ? m_workers->NumOfRequests() : 0; 627 } 628 NumOfConnections(void) const629 uint16_t NumOfConnections(void) const 630 { 631 return m_connection_count; 632 } 633 Run(D & d,std::function<void (CTcpDaemon<P,U,D> & daemon)> OnWatchDog=nullptr)634 void Run(D & d, 635 std::function<void(CTcpDaemon<P, U, D>& daemon)> 636 OnWatchDog = nullptr) 637 { 638 int rc; 639 640 if (m_address.empty()) 641 NCBI_THROW(CPubseqGatewayException, eAddressEmpty, 642 "Failed to start daemon: address is empty"); 643 if (m_port == 0) 644 NCBI_THROW(CPubseqGatewayException, ePortNotSpecified, 645 "Failed to start daemon: port is not specified"); 646 647 signal(SIGPIPE, SIG_IGN); 648 if (CTcpWorkersList<P, U, D>::s_thread_worker_key == 0) { 649 rc = uv_key_create(&CTcpWorkersList<P, U, D>::s_thread_worker_key); 650 if (rc != 0) 651 NCBI_THROW2(CPubseqGatewayUVException, eUvKeyCreateFailure, 652 "uv_key_create failed", rc); 653 } 654 655 CTcpWorkersList<P, U, D> workers(this); 656 {{ 657 CUvLoop loop; 658 659 CUvSignal sigint(loop.Handle()); 660 sigint.Start(SIGINT, s_OnMainSigInt); 661 662 CUvSignal sigterm(loop.Handle()); 663 sigterm.Start(SIGTERM, s_OnMainSigTerm); 664 665 CUvSignal sighup(loop.Handle()); 666 sighup.Start(SIGHUP, s_OnMainSigHup); 667 668 CUvSignal sigusr1(loop.Handle()); 669 sigusr1.Start(SIGUSR1, s_OnMainSigUsr1); 670 671 CUvSignal sigusr2(loop.Handle()); 672 sigusr2.Start(SIGUSR2, s_OnMainSigUsr2); 673 674 CUvSignal sigwinch(loop.Handle()); 675 sigwinch.Start(SIGWINCH, s_OnMainSigWinch); 676 677 678 CUvTcp listener(loop.Handle()); 679 listener.Bind(m_address.c_str(), m_port); 680 681 struct uv_export_t * exp = NULL; 682 rc = uv_export_start(loop.Handle(), 683 reinterpret_cast<uv_stream_t*>(listener.Handle()), 684 IPC_PIPE_NAME, m_num_workers, &exp); 685 if (rc) 686 NCBI_THROW2(CPubseqGatewayUVException, eUvExportStartFailure, 687 "uv_export_start failed", rc); 688 689 try { 690 workers.Start(exp, m_num_workers, d, OnWatchDog); 691 } catch (const exception & exc) { 692 uv_export_close(exp); 693 throw; 694 } 695 696 rc = uv_export_finish(exp); 697 if (rc) 698 NCBI_THROW2(CPubseqGatewayUVException, eUvExportWaitFailure, 699 "uv_export_wait failed", rc); 700 701 listener.Close(nullptr); 702 703 P::DaemonStarted(); 704 705 uv_timer_t watch_dog; 706 uv_timer_init(loop.Handle(), &watch_dog); 707 watch_dog.data = &workers; 708 uv_timer_start(&watch_dog, workers.s_OnWatchDog, 1000, 1000); 709 710 uv_run(loop.Handle(), UV_RUN_DEFAULT); 711 712 uv_close(reinterpret_cast<uv_handle_t*>(&watch_dog), NULL); 713 workers.KillAll(); 714 715 P::DaemonStopped(); 716 }} 717 } 718 }; 719 720 721 template<typename P, typename U, typename D> 722 uv_key_t CTcpWorkersList<P, U, D>::s_thread_worker_key; 723 724 725 template<typename P, typename U, typename D> 726 constexpr const char CTcpDaemon<P, U, D>::IPC_PIPE_NAME[]; 727 728 } 729 730 #endif 731