1 /*-------------------------------------------------------------------------
2 * slon.c
3 *
4 * The control framework for the node daemon.
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 *-------------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <signal.h>
22 #ifndef WIN32
23 #include <sys/time.h>
24 #include <unistd.h>
25 #include <sys/wait.h>
26 #endif
27 #include <sys/types.h>
28
29
30
#ifdef WIN32
#include <winsock2.h>
#include "port/win32service.h"

/*
 * POSIX sleep() takes seconds, Win32 Sleep() takes milliseconds.  The
 * argument is parenthesized so that an expression such as sleep(a + b)
 * is scaled as a whole.
 */
#define sleep(x) Sleep((x) * 1000)

/**
 * Some versions of MSVC seem to need this symbol defined to finish linking
 * (presumably older CRT objects reference it to locate stdin/stdout/stderr).
 *
 * The returned array must outlive the call, so it has static storage.
 * Returning the address of an automatic array would hand the caller a
 * dangling pointer.  Its elements are refreshed on every call because
 * *stdin etc. are not constant expressions and cannot be used in a static
 * initializer.
 */
FILE *
__iob_func(void)
{
	static FILE result[3];

	result[0] = *stdin;
	result[1] = *stdout;
	result[2] = *stderr;
	return result;
}

#endif
46
47 #include "libpq-fe.h"
48
49 #include "slon.h"
50
51
52 #include "confoptions.h"
53
54
55
/* ----------
 * Global data
 * ----------
 */
#ifndef WIN32
/*
 * States of the watchdog: NORMAL while a worker runs, RESTART after SIGHUP,
 * RETRY after SIGUSR1 or a worker exit, SHUTDOWN after SIGINT/SIGTERM.
 */
#define SLON_WATCHDOG_NORMAL 0
#define SLON_WATCHDOG_RESTART 1
#define SLON_WATCHDOG_RETRY 2
#define SLON_WATCHDOG_SHUTDOWN 3

/*
 * Written asynchronously by sighandler() and read by SlonWatchdog()'s main
 * loop (including a re-check right after a sleep).  Objects shared with a
 * signal handler must be volatile sig_atomic_t (C11 7.14.1.1); otherwise the
 * compiler may legally cache the value across the sleep.
 */
static volatile sig_atomic_t watchdog_status = SLON_WATCHDOG_NORMAL;
#endif

/* Pipe used to kick the worker's scheduler thread (created in main()). */
int			sched_wakeuppipe[2];

/*
 * SlonMain() waits on this mutex/condition pair until the local listen
 * thread reports, via slon_listen_started, whether it started properly.
 */
pthread_mutex_t slon_wait_listen_lock;
pthread_cond_t slon_wait_listen_cond;
int			slon_listen_started = 0;

/* When true, SlonMain() also starts the monitor thread. */
bool		monitor_threads;

/* Configuration value; not referenced elsewhere in this file. */
int			apply_cache_size;

/* ----------
 * Local data
 * ----------
 */
static void slon_exit(int code);

/* Handles of the worker's local threads, created in SlonMain(). */
static pthread_t local_event_thread;
static pthread_t local_cleanup_thread;
static pthread_t local_sync_thread;
static pthread_t local_monitor_thread;

/* Set by SlonMain() just before it joins the worker threads. */
static pthread_t main_thread;

/* The argv main() was started with (after any Win32 adjustments). */
static char *const *main_argv;

static void SlonMain(void);

#ifndef WIN32
static void SlonWatchdog(void);
static void sighandler(int signo);
void		slon_terminate_worker(void);
#endif
typedef void (*sighandler_t) (int);
static sighandler_t install_signal_handler(int signum, sighandler_t handler);

int			slon_log_level;
char	   *pid_file;
char	   *archive_dir = NULL;

/* Status word filled in by wait() in the watchdog. */
static int	child_status;

/**
 * A variable to indicate that the
 * worker has been restarted by the watchdog.
 */
int			worker_restarted = 0;
109
/* ----------
 * Usage
 *
 *	Write the command-line synopsis and option summary to stderr, then
 *	terminate the process with exit status 1.  Does not return.
 * ----------
 */
void
Usage(char *const argv[])
{
	static const char option_summary[] =
		"\n"
		"Options:\n"
		" -h print usage message and exit\n"
		" -v print version and exit\n"
		" -d <debuglevel> verbosity of logging (1..4)\n"
		" -s <milliseconds> SYNC check interval (default 2000)\n"
		" -t <milliseconds> SYNC interval timeout (default 10000)\n"
		" -o <milliseconds> desired subscriber SYNC processing time\n"
		" -g <num> maximum SYNC group size (default 20)\n"
		" -c <num> how often to vacuum in cleanup cycles\n"
		" -p <filename> slon pid file\n"
		" -f <filename> slon configuration file\n"
		" -a <directory> directory to store SYNC archive files\n"
		" -x <command> program to run after writing archive file\n"
		" -q <num> Terminate when this node reaches # of SYNCs\n"
		" -r <num> # of syncs for -q option\n"
		" -l <interval> this node should lag providers by this interval\n";

	/* Only the synopsis line needs formatting (the program name). */
	fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]);
	fputs(option_summary, stderr);

#ifdef WIN32
	fputs("\nWindows service registration:\n"
		  " slon -regservice [servicename]\n"
		  " slon -unregservice [servicename]\n"
		  " slon -listengines [servicename]\n"
		  " slon -addengine [servicename] <configfile>\n"
		  " slon -delengine [servicename] <configfile>\n",
		  stderr);
#endif

	exit(1);
}
146
147
148 /* ----------
149 * main
150 * ----------
151 */
152 int
main(int argc,char * const argv[])153 main(int argc, char *const argv[])
154 {
155 char *cp1;
156 char *cp2;
157 int c;
158 int errors = 0;
159 extern int optind;
160 extern char *optarg;
161
162
163 #ifdef WIN32
164 WSADATA wsaData;
165 int err;
166 #endif
167
168
169 #ifdef WIN32
170 if (argc >= 2 && !strcmp(argv[1], "-service"))
171 {
172 win32_servicestart();
173 exit(0);
174 }
175 if (argc >= 2 && !strcmp(argv[1], "-subservice"))
176 {
177 win32_isservice = 1;
178 argc--;
179 argv++;
180 }
181 if (argc >= 2 && argc <= 4 && (
182 !strcmp(argv[1], "-regservice") ||
183 !strcmp(argv[1], "-unregservice") ||
184 !strcmp(argv[1], "-addengine") ||
185 !strcmp(argv[1], "-delengine") ||
186 !strcmp(argv[1], "-listengines")))
187 {
188 win32_serviceconfig(argc, argv);
189 }
190 #endif
191
192 InitializeConfOptions();
193
194 while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:q:r:l:x:hv?")) != EOF)
195 {
196 switch (c)
197 {
198 case '?':
199 Usage(argv);
200 case 'q':
201 set_config_option("quit_sync_provider", optarg);
202 break;
203
204 case 'r':
205 set_config_option("quit_sync_finalsync", optarg);
206 break;
207
208 case 'f':
209 ProcessConfigFile(optarg);
210 break;
211
212 case 'a':
213 set_config_option("archive_dir", optarg);
214 break;
215
216 case 'd':
217 set_config_option("log_level", optarg);
218 break;
219
220 case 's':
221 set_config_option("sync_interval", optarg);
222 break;
223
224 case 't':
225 set_config_option("sync_interval_timeout", optarg);
226 break;
227
228 case 'g':
229 set_config_option("sync_group_maxsize", optarg);
230 break;
231
232 case 'c':
233 set_config_option("vac_frequency", optarg);
234 break;
235
236 case 'p':
237 set_config_option("pid_file", optarg);
238 break;
239
240 case 'o':
241 set_config_option("desired_sync_time", optarg);
242 break;
243
244 case 'l':
245 set_config_option("lag_interval", optarg);
246 break;
247
248 case 'h':
249 errors++;
250 break;
251
252 case 'v':
253 printf("slon version %s\n", SLONY_I_VERSION_STRING);
254 exit(0);
255 break;
256
257 case 'x':
258 set_config_option("command_on_logarchive", optarg);
259 break;
260
261 default:
262 fprintf(stderr, "unknown option '%c'\n", c);
263 errors++;
264 break;
265 }
266 }
267
268 /*
269 * Make sure the sync interval timeout isn't too small.
270 */
271 if (sync_interval_timeout != 0 && sync_interval_timeout <= sync_interval)
272 sync_interval_timeout = sync_interval * 2;
273
274 /*
275 * Remember the cluster name and build the properly quoted namespace
276 * identifier
277 */
278 slon_pid = getpid();
279 #ifndef WIN32
280 if (pthread_mutex_init(&slon_watchdog_lock, NULL) < 0)
281 {
282 slon_log(SLON_FATAL, "slon: pthread_mutex_init() - %s\n",
283 strerror(errno));
284 exit(-1);
285 }
286 slon_watchdog_pid = slon_pid;
287 slon_worker_pid = -1;
288 #endif
289 main_argv = argv;
290
291 if ((char *) argv[optind])
292 {
293 set_config_option("cluster_name", (char *) argv[optind]);
294 set_config_option("conn_info", (char *) argv[++optind]);
295 }
296
297 if (rtcfg_cluster_name != NULL)
298 {
299 rtcfg_namespace = malloc(strlen(rtcfg_cluster_name) * 2 + 4);
300 cp2 = rtcfg_namespace;
301 *cp2++ = '"';
302 *cp2++ = '_';
303 for (cp1 = (char *) rtcfg_cluster_name; *cp1; cp1++)
304 {
305 if (*cp1 == '"')
306 *cp2++ = '"';
307 *cp2++ = *cp1;
308 }
309 *cp2++ = '"';
310 *cp2 = '\0';
311 }
312 else
313 {
314 errors++;
315 }
316
317 slon_log(SLON_CONFIG, "main: slon version %s starting up\n",
318 SLONY_I_VERSION_STRING);
319
320 /*
321 * Remember the connection information for the local node.
322 */
323 if (rtcfg_conninfo == NULL)
324 {
325 errors++;
326 }
327
328 if (errors != 0)
329 {
330 Usage(argv);
331 }
332
333 #ifdef WIN32
334
335 /*
336 * Startup the network subsystem, in case our libpq doesn't
337 */
338 err = WSAStartup(MAKEWORD(1, 1), &wsaData);
339 if (err != 0)
340 {
341 slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err);
342 exit(-1);
343 }
344 #endif
345
346 if (pid_file)
347 {
348 FILE *pidfile;
349
350 pidfile = fopen(pid_file, "w");
351 if (pidfile)
352 {
353 fprintf(pidfile, "%d\n", slon_pid);
354 fclose(pidfile);
355 }
356 else
357 {
358 slon_log(SLON_FATAL, "Cannot open pid_file \"%s\"\n", pid_file);
359 exit(-1);
360 }
361 }
362
363 /*
364 * Create the pipe used to kick the workers scheduler thread
365 */
366 if (pgpipe(sched_wakeuppipe) < 0)
367 {
368 slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno, strerror(errno));
369 slon_exit(-1);
370 }
371
372 if (!PQisthreadsafe())
373 {
374 slon_log(SLON_FATAL, "slon: libpq was not compiled with thread safety enabled (normally: --enable-thread-safety). slon is a multithreaded application requiring thread-safe libpq\n");
375 slon_exit(-1);
376 }
377
378 if (!PQisthreadsafe())
379 {
380 slon_log(SLON_FATAL, "slon: libpq was not compiled with --enable-thread-safety. Slony-I requires a thread enabled libpq\n");
381 slon_exit(-1);
382 }
383
384 /*
385 * There is no watchdog process on win32. We delegate restarting and other
386 * such tasks to the Service Control Manager. And win32 doesn't support
387 * signals, so we don't need to catch them...
388 */
389 #ifndef WIN32
390 SlonWatchdog();
391 #else
392 SlonMain();
393 #endif
394 exit(0);
395 }
396
397
398 /* ----------
399 * SlonMain
400 * ----------
401 */
402 static void
SlonMain(void)403 SlonMain(void)
404 {
405 PGresult *res;
406 SlonDString query;
407 int i,
408 n;
409 PGconn *startup_conn;
410
411 slon_pid = getpid();
412 #ifndef WIN32
413 slon_worker_pid = slon_pid;
414 #endif
415
416 if (pthread_mutex_init(&slon_wait_listen_lock, NULL) < 0)
417 {
418 slon_log(SLON_FATAL, "main: pthread_mutex_init() failed - %s\n",
419 strerror(errno));
420 slon_abort();
421 }
422 if (pthread_cond_init(&slon_wait_listen_cond, NULL) < 0)
423 {
424 slon_log(SLON_FATAL, "main: pthread_cond_init() failed - %s\n",
425 strerror(errno));
426 slon_abort();
427 }
428
429
430 /*
431 * Dump out current configuration - all elements of the various arrays...
432 */
433 dump_configuration();
434
435 /*
436 * Connect to the local database to read the initial configuration
437 */
438 startup_conn = PQconnectdb(rtcfg_conninfo);
439 if (startup_conn == NULL)
440 {
441 slon_log(SLON_FATAL, "main: PQconnectdb() failed - sleep 10s\n");
442 sleep(10);
443 slon_retry();
444 exit(-1);
445 }
446 if (PQstatus(startup_conn) != CONNECTION_OK)
447 {
448 slon_log(SLON_FATAL, "main: Cannot connect to local database - %s - sleep 10s\n",
449 PQerrorMessage(startup_conn));
450 PQfinish(startup_conn);
451 sleep(10);
452 slon_retry();
453 exit(-1);
454 }
455
456 /*
457 * Get our local node ID
458 */
459 rtcfg_nodeid = db_getLocalNodeId(startup_conn);
460 if (rtcfg_nodeid < 0)
461 {
462 slon_log(SLON_FATAL, "main: Node is not initialized properly - sleep 10s\n");
463 sleep(10);
464 slon_retry();
465 exit(-1);
466 }
467 if (db_checkSchemaVersion(startup_conn) < 0)
468 {
469 slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n");
470 slon_abort();
471 }
472 slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
473
474 dstring_init(&query);
475 slon_mkquery(&query, "select %s.slon_node_health_check();", rtcfg_namespace);
476 res = PQexec(startup_conn, dstring_data(&query));
477 if (PQresultStatus(res) != PGRES_TUPLES_OK)
478 {
479 slon_log(SLON_FATAL, "could not call slon_node_health_check() - %",
480 PQresultErrorMessage(res));
481 slon_abort();
482 }
483 else
484 {
485 if (PQntuples(res) != 1)
486 {
487 slon_log(SLON_FATAL,
488 "query '%s' returned %d rows (expected 1)\n",
489 query, PQntuples(res));
490 slon_abort();
491 }
492 else
493 {
494 if (*(PQgetvalue(res, 0, 0)) == 'f')
495 {
496 slon_log(SLON_FATAL,
497 "slon_node_health_check() returned false - fatal health problem!\n%s\nREPAIR CONFIG may be helpful to rectify this problem\n",
498 PQresultErrorMessage(res));
499 slon_abort();
500 }
501 }
502 }
503 PQclear(res);
504 dstring_free(&query);
505
506 #ifndef WIN32
507 if (signal(SIGHUP, SIG_IGN) == SIG_ERR)
508 {
509 slon_log(SLON_FATAL, "main: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno));
510 slon_abort();
511 }
512 if (signal(SIGINT, SIG_IGN) == SIG_ERR)
513 {
514 slon_log(SLON_FATAL, "main: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
515 slon_abort();
516 }
517 if (signal(SIGTERM, SIG_IGN) == SIG_ERR)
518 {
519 slon_log(SLON_FATAL, "main: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
520 slon_abort();
521 }
522 if (signal(SIGCHLD, SIG_IGN) == SIG_ERR)
523 {
524 slon_log(SLON_FATAL, "main: SIGCHLD signal handler setup failed -(%d) %s\n", errno, strerror(errno));
525 slon_abort();
526 }
527 if (signal(SIGQUIT, SIG_IGN) == SIG_ERR)
528 {
529 slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
530 slon_abort();
531 }
532 #endif
533
534 slon_log(SLON_INFO, "main: main process started\n");
535
536 /*
537 * Start the event scheduling system
538 */
539 slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n");
540 if (sched_start_mainloop() < 0)
541 slon_retry();
542
543 slon_log(SLON_CONFIG, "main: loading current cluster configuration\n");
544
545 /*
546 * Begin a transaction
547 */
548 res = PQexec(startup_conn,
549 "start transaction; "
550 "set transaction isolation level serializable;");
551 if (PQresultStatus(res) != PGRES_COMMAND_OK)
552 {
553 slon_log(SLON_FATAL, "Cannot start transaction - %s - sleep 10s\n",
554 PQresultErrorMessage(res));
555 sleep(10);
556 PQclear(res);
557 slon_retry();
558 }
559 PQclear(res);
560
561 /*
562 * Read configuration table sl_node
563 */
564 dstring_init(&query);
565 slon_mkquery(&query,
566 "select no_id, no_active, no_comment, "
567 " (select coalesce(max(con_seqno),0) from %s.sl_confirm "
568 " where con_origin = no_id and con_received = %d) "
569 " as last_event, "
570 " (select ev_snapshot from %s.sl_event "
571 " where ev_origin = no_id "
572 " and ev_seqno = (select max(ev_seqno) "
573 " from %s.sl_event "
574 " where ev_origin = no_id "
575 " and ev_type = 'SYNC')) as last_snapshot "
576 "from %s.sl_node "
577 "order by no_id; ",
578 rtcfg_namespace, rtcfg_nodeid,
579 rtcfg_namespace, rtcfg_namespace,
580 rtcfg_namespace);
581 res = PQexec(startup_conn, dstring_data(&query));
582 if (PQresultStatus(res) != PGRES_TUPLES_OK)
583 {
584 slon_log(SLON_FATAL, "main: Cannot get node list - %s\n",
585 PQresultErrorMessage(res));
586 PQclear(res);
587 dstring_free(&query);
588 slon_retry();
589 }
590 for (i = 0, n = PQntuples(res); i < n; i++)
591 {
592 int no_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
593 int no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0;
594 char *no_comment = PQgetvalue(res, i, 2);
595 int64 last_event;
596
597 if (no_id == rtcfg_nodeid)
598 {
599 /*
600 * Complete our own local node entry
601 */
602 rtcfg_nodeactive = no_active;
603 rtcfg_nodecomment = strdup(no_comment);
604 }
605 else
606 {
607 /*
608 * Add a remote node
609 */
610 slon_scanint64(PQgetvalue(res, i, 3), &last_event);
611 rtcfg_storeNode(no_id, no_comment);
612 rtcfg_setNodeLastEvent(no_id, last_event);
613 rtcfg_setNodeLastSnapshot(no_id, PQgetvalue(res, i, 4));
614
615 /*
616 * If it is active, remember for activation just before we start
617 * processing events.
618 */
619 if (no_active)
620 rtcfg_needActivate(no_id);
621 }
622 }
623 PQclear(res);
624
625 /*
626 * Read configuration table sl_path - the interesting pieces
627 */
628 slon_mkquery(&query,
629 "select pa_server, pa_conninfo, pa_connretry "
630 "from %s.sl_path where pa_client = %d"
631 " and pa_conninfo<>'<event pending>'",
632 rtcfg_namespace, rtcfg_nodeid);
633 res = PQexec(startup_conn, dstring_data(&query));
634 if (PQresultStatus(res) != PGRES_TUPLES_OK)
635 {
636 slon_log(SLON_FATAL, "main: Cannot get path config - %s\n",
637 PQresultErrorMessage(res));
638 PQclear(res);
639 dstring_free(&query);
640 slon_retry();
641 }
642 for (i = 0, n = PQntuples(res); i < n; i++)
643 {
644 int pa_server = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
645 char *pa_conninfo = PQgetvalue(res, i, 1);
646 int pa_connretry = (int) strtol(PQgetvalue(res, i, 2), NULL, 10);
647
648 rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
649 }
650 PQclear(res);
651
652 /*
653 * Load the initial listen configuration
654 */
655 rtcfg_reloadListen(startup_conn);
656
657 /*
658 * Read configuration table sl_set
659 */
660 slon_mkquery(&query,
661 "select set_id, set_origin, set_comment "
662 "from %s.sl_set",
663 rtcfg_namespace);
664 res = PQexec(startup_conn, dstring_data(&query));
665 if (PQresultStatus(res) != PGRES_TUPLES_OK)
666 {
667 slon_log(SLON_FATAL, "main: Cannot get set config - %s\n",
668 PQresultErrorMessage(res));
669 PQclear(res);
670 dstring_free(&query);
671 slon_retry();
672 }
673 for (i = 0, n = PQntuples(res); i < n; i++)
674 {
675 int set_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
676 int set_origin = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
677 char *set_comment = PQgetvalue(res, i, 2);
678
679 rtcfg_storeSet(set_id, set_origin, set_comment);
680 }
681 PQclear(res);
682
683 /*
684 * Read configuration table sl_subscribe - only subscriptions for local
685 * node
686 */
687 slon_mkquery(&query,
688 "select sub_set, sub_provider, sub_forward, sub_active "
689 "from %s.sl_subscribe "
690 "where sub_receiver = %d",
691 rtcfg_namespace, rtcfg_nodeid);
692 res = PQexec(startup_conn, dstring_data(&query));
693 if (PQresultStatus(res) != PGRES_TUPLES_OK)
694 {
695 slon_log(SLON_FATAL, "main: Cannot get subscription config - %s\n",
696 PQresultErrorMessage(res));
697 PQclear(res);
698 dstring_free(&query);
699 slon_retry();
700 }
701 for (i = 0, n = PQntuples(res); i < n; i++)
702 {
703 int sub_set = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
704 int sub_provider = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
705 char *sub_forward = PQgetvalue(res, i, 2);
706 char *sub_active = PQgetvalue(res, i, 3);
707
708 rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward);
709 if (*sub_active == 't')
710 rtcfg_enableSubscription(sub_set, sub_provider, sub_forward);
711 }
712 PQclear(res);
713
714 /*
715 * Remember the last known local event sequence
716 */
717 slon_mkquery(&query,
718 "select coalesce(max(ev_seqno), -1) from %s.sl_event "
719 "where ev_origin = '%d'",
720 rtcfg_namespace, rtcfg_nodeid);
721 res = PQexec(startup_conn, dstring_data(&query));
722 if (PQresultStatus(res) != PGRES_TUPLES_OK)
723 {
724 slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s\n",
725 PQresultErrorMessage(res));
726 PQclear(res);
727 dstring_free(&query);
728 slon_retry();
729 }
730 if (PQntuples(res) == 0)
731 strcpy(rtcfg_lastevent, "-1");
732 else if (PQgetisnull(res, 0, 0))
733 strcpy(rtcfg_lastevent, "-1");
734 else
735 strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
736 PQclear(res);
737 dstring_free(&query);
738 slon_log(SLON_CONFIG,
739 "main: last local event sequence = %s\n",
740 rtcfg_lastevent);
741
742 /*
743 * Rollback the transaction we used to get the config snapshot
744 */
745 res = PQexec(startup_conn, "rollback transaction;");
746 if (PQresultStatus(res) != PGRES_COMMAND_OK)
747 {
748 slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n",
749 PQresultErrorMessage(res));
750 PQclear(res);
751 slon_retry();
752 }
753 PQclear(res);
754
755 /*
756 * Done with the startup, don't need the local connection any more.
757 */
758 PQfinish(startup_conn);
759
760 slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
761
762 /*
763 * Create the local event thread that monitors the local node for
764 * administrative events to adjust the configuration at runtime. We wait
765 * here until the local listen thread has checked that there is no other
766 * slon daemon running.
767 */
768 pthread_mutex_lock(&slon_wait_listen_lock);
769 if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
770 {
771 slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n",
772 strerror(errno));
773 slon_retry();
774 }
775 pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock);
776 if (!slon_listen_started)
777 {
778 /**
779 * The local listen thread did not start up properly.
780 */
781 slon_log(SLON_FATAL, "main: localListenThread did not start\n");
782 slon_abort();
783 }
784 pthread_mutex_unlock(&slon_wait_listen_lock);
785
786 /*
787 * Enable all nodes that are active
788 */
789 rtcfg_doActivate();
790
791 /*
792 * Create the local cleanup thread that will remove old events and log
793 * data.
794 */
795 if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
796 {
797 slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n",
798 strerror(errno));
799 slon_retry();
800 }
801
802 /*
803 * Create the local sync thread that will generate SYNC events if we had
804 * local database updates.
805 */
806 if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
807 {
808 slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n",
809 strerror(errno));
810 slon_retry();
811 }
812
813 /*
814 * Create the local monitor thread that will process monitoring requests
815 */
816 if (monitor_threads)
817 {
818 if (pthread_create(&local_monitor_thread, NULL, monitorThread_main, NULL) < 0)
819 {
820 slon_log(SLON_FATAL, "main: cannot create monitorThread - %s\n",
821 strerror(errno));
822 slon_retry();
823 }
824 }
825
826 /*
827 * Wait until the scheduler has shut down all remote connections
828 */
829 slon_log(SLON_INFO, "main: running scheduler mainloop\n");
830 if (sched_wait_mainloop() < 0)
831 {
832 slon_log(SLON_FATAL, "main: scheduler returned with error\n");
833 slon_retry();
834 }
835 slon_log(SLON_INFO, "main: scheduler mainloop returned\n");
836
837 /*
838 * Wait for all remote threads to finish
839 */
840 main_thread = pthread_self();
841
842 slon_log(SLON_CONFIG, "main: wait for remote threads\n");
843 rtcfg_joinAllRemoteThreads();
844
845 /*
846 * Wait for the local threads to finish
847 */
848 if (pthread_join(local_event_thread, NULL) < 0)
849 slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n",
850 strerror(errno));
851
852 if (pthread_join(local_cleanup_thread, NULL) < 0)
853 slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n",
854 strerror(errno));
855
856 if (pthread_join(local_sync_thread, NULL) < 0)
857 slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n",
858 strerror(errno));
859
860 if (pthread_join(local_monitor_thread, NULL) < 0)
861 slon_log(SLON_ERROR, "main: cannot join monitorThread - %s\n",
862 strerror(errno));
863
864 slon_log(SLON_CONFIG, "main: done\n");
865
866 exit(0);
867 }
868
869 #ifndef WIN32
870 /* ----------
871 * SlonWatchdog
872 * ----------
873 */
874 static void
SlonWatchdog(void)875 SlonWatchdog(void)
876 {
877 pid_t pid;
878 int shutdown = 0;
879 int return_code = -99;
880 char *termination_reason = "unknown";
881
882 slon_log(SLON_INFO, "slon: watchdog process started\n");
883
884
885
886 slon_log(SLON_CONFIG, "slon: watchdog ready - pid = %d\n", slon_watchdog_pid);
887
888 slon_worker_pid = fork();
889 if (slon_worker_pid == 0)
890 {
891 SlonMain();
892 exit(-1);
893 }
894 else if (slon_worker_pid < 0)
895 {
896 slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
897 errno, strerror(errno));
898 slon_exit(-1);
899
900 }
901
902 /*
903 * Install signal handlers
904 */
905
906 if (install_signal_handler(SIGHUP, sighandler) == SIG_ERR)
907 {
908 slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno));
909 slon_exit(-1);
910 }
911
912 if (install_signal_handler(SIGUSR1, sighandler) == SIG_ERR)
913 {
914 slon_log(SLON_FATAL, "slon: SIGUSR1 signal handler setup failed -(%d) %s\n", errno, strerror(errno));
915 slon_exit(-1);
916 }
917 if (install_signal_handler(SIGALRM, sighandler) == SIG_ERR)
918 {
919 slon_log(SLON_FATAL, "slon: SIGALRM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
920 slon_exit(-1);
921 }
922 if (install_signal_handler(SIGINT, sighandler) == SIG_ERR)
923 {
924 slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
925 slon_exit(-1);
926 }
927 if (install_signal_handler(SIGTERM, sighandler) == SIG_ERR)
928 {
929 slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
930 slon_exit(-1);
931 }
932
933
934 if (install_signal_handler(SIGQUIT, sighandler) == SIG_ERR)
935 {
936 slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
937 slon_exit(-1);
938 }
939
940 slon_log(SLON_CONFIG, "slon: worker process created - pid = %d\n",
941 slon_worker_pid);
942 while (!shutdown)
943 {
944 while ((pid = wait(&child_status)) != slon_worker_pid)
945 {
946 if (pid < 0 && errno == EINTR)
947 continue;
948
949 slon_log(SLON_CONFIG, "slon: child terminated status: %d; pid: %d, current worker pid: %d errno: %d\n", child_status, pid, slon_worker_pid, errno);
950
951 if (pid < 0)
952 {
953 /**
954 * if errno is not EINTR and pid<0 we have
955 * a problem.
956 * looping on wait() isn't a good idea.
957 */
958 slon_log(SLON_FATAL, "slon: wait returned an error pid:%d errno:%d\n",
959 pid, errno);
960 exit(-1);
961 }
962 }
963 if (WIFSIGNALED(child_status))
964 {
965 return_code = WTERMSIG(child_status);
966 termination_reason = "signal";
967 }
968 else if (WIFEXITED(child_status))
969 {
970 return_code = WEXITSTATUS(child_status);
971 termination_reason = "exit code";
972 }
973 slon_log(SLON_CONFIG, "slon: child terminated %s: %d; pid: %d, current worker pid: %d\n", termination_reason, return_code, pid, slon_worker_pid);
974
975
976 switch (watchdog_status)
977 {
978 case SLON_WATCHDOG_RESTART:
979 slon_log(SLON_CONFIG, "slon: restart of worker in 20 seconds\n");
980 sleep(20);
981 slon_worker_pid = fork();
982 if (slon_worker_pid == 0)
983 {
984 worker_restarted = 1;
985 SlonMain();
986 exit(-1);
987 }
988 else if (slon_worker_pid < 0)
989 {
990 slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
991 errno, strerror(errno));
992 slon_exit(-1);
993
994 }
995 watchdog_status = SLON_WATCHDOG_NORMAL;
996 continue;
997
998 case SLON_WATCHDOG_NORMAL:
999 case SLON_WATCHDOG_RETRY:
1000 watchdog_status = SLON_WATCHDOG_RETRY;
1001 if (child_status != 0)
1002 {
1003 slon_log(SLON_CONFIG, "slon: restart of worker in 10 seconds\n");
1004 (void) sleep(10);
1005 }
1006 else
1007 {
1008 slon_log(SLON_CONFIG, "slon: restart of worker\n");
1009 }
1010 if (watchdog_status == SLON_WATCHDOG_RETRY)
1011 {
1012 slon_worker_pid = fork();
1013 if (slon_worker_pid == 0)
1014 {
1015 worker_restarted = 1;
1016 SlonMain();
1017 exit(-1);
1018 }
1019 else if (slon_worker_pid < 0)
1020 {
1021 slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
1022 errno, strerror(errno));
1023 slon_exit(-1);
1024
1025 }
1026 watchdog_status = SLON_WATCHDOG_NORMAL;
1027 continue;
1028 }
1029 break;
1030
1031 default:
1032 shutdown = 1;
1033 break;
1034 } /* switch */
1035 } /* while */
1036
1037 slon_log(SLON_INFO, "slon: done\n");
1038
1039 /*
1040 * That's it.
1041 */
1042 slon_exit(0);
1043 }
1044
1045
1046 /* ----------
1047 * sighandler
1048 * ----------
1049 */
1050 static void
sighandler(int signo)1051 sighandler(int signo)
1052 {
1053 switch (signo)
1054 {
1055 case SIGALRM:
1056 kill(slon_worker_pid, SIGKILL);
1057 break;
1058
1059 case SIGCHLD:
1060 break;
1061
1062 case SIGHUP:
1063 watchdog_status = SLON_WATCHDOG_RESTART;
1064 slon_terminate_worker();
1065 break;
1066
1067 case SIGUSR1:
1068 watchdog_status = SLON_WATCHDOG_RETRY;
1069 slon_terminate_worker();
1070 break;
1071
1072 case SIGINT:
1073 case SIGTERM:
1074 watchdog_status = SLON_WATCHDOG_SHUTDOWN;
1075 slon_terminate_worker();
1076 break;
1077
1078 case SIGQUIT:
1079 kill(slon_worker_pid, SIGKILL);
1080 slon_exit(-1);
1081 break;
1082 }
1083 }
1084
1085
1086 /* ----------
1087 * slon_terminate_worker
1088 * ----------
1089 */
1090 void
slon_terminate_worker()1091 slon_terminate_worker()
1092 {
1093 (void) kill(slon_worker_pid, SIGKILL);
1094
1095 }
1096 #endif
1097 /* ----------
1098 * slon_exit
1099 * ----------
1100 */
1101 static void
slon_exit(int code)1102 slon_exit(int code)
1103 {
1104 #ifdef WIN32
1105 /* Cleanup winsock */
1106 WSACleanup();
1107 #endif
1108
1109 if (pid_file)
1110 {
1111 slon_log(SLON_INFO, "slon: remove pid file\n");
1112 (void) unlink(pid_file);
1113 }
1114
1115 slon_log(SLON_INFO, "slon: exit(%d)\n", code);
1116
1117 exit(code);
1118 }
1119
1120 static sighandler_t
install_signal_handler(int signo,sighandler_t handler)1121 install_signal_handler(int signo, sighandler_t handler)
1122 {
1123
1124
1125 #ifndef WIN32
1126 struct sigaction act;
1127
1128 act.sa_handler = handler;
1129 (void) sigemptyset(&act.sa_mask);
1130 act.sa_flags = SA_NODEFER;
1131
1132
1133
1134 if (sigaction(signo, &act, NULL) < 0)
1135 {
1136 return SIG_ERR;
1137 }
1138 return handler;
1139 #else
1140 return signal(signo, handler);
1141 #endif
1142 }
1143
1144 /*
1145 * Local Variables:
1146 * tab-width: 4
1147 * c-indent-level: 4
1148 * c-basic-offset: 4
1149 * End:
1150 */
1151