1 /*-------------------------------------------------------------------------
2  * slon.c
3  *
4  *	The control framework for the node daemon.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <signal.h>
22 #ifndef WIN32
23 #include <sys/time.h>
24 #include <unistd.h>
25 #include <sys/wait.h>
26 #endif
27 #include <sys/types.h>
28 
29 
30 
31 #ifdef WIN32
32 #include <winsock2.h>
33 #include "port/win32service.h"
34 #define sleep(x) Sleep(x*1000)
/**
 * Some versions of MSVC seem to need this symbol defined to finish linking.
 *
 * Returns a pointer to a three-element array holding copies of the stdin,
 * stdout and stderr FILE objects, in that order.  The copies are refreshed
 * on every call.
 *
 * The array must have static storage duration: the pointer is handed back
 * to the caller, so an automatic array would already be dead by the time
 * the caller looks at it.
 */
FILE *
__iob_func(void)
{
	static FILE streams[3];

	streams[0] = *stdin;
	streams[1] = *stdout;
	streams[2] = *stderr;
	return streams;
}
44 
45 #endif
46 
47 #include "libpq-fe.h"
48 
49 #include "slon.h"
50 
51 
52 #include "confoptions.h"
53 
54 
55 
/* ----------
 * Global data
 * ----------
 */
#ifndef WIN32
/*
 * Watchdog states.  sighandler() selects RESTART on SIGHUP, RETRY on
 * SIGUSR1 and SHUTDOWN on SIGINT/SIGTERM; SlonWatchdog() consults the
 * state once the worker process has exited.
 */
#define		SLON_WATCHDOG_NORMAL		0
#define		SLON_WATCHDOG_RESTART		1
#define		SLON_WATCHDOG_RETRY			2
#define		SLON_WATCHDOG_SHUTDOWN		3

/*
 * Written from the signal handler and read by the watchdog loop, so it has
 * to be a volatile sig_atomic_t: a plain int gives no guarantee that the
 * loop ever observes the handler's store.
 */
static volatile sig_atomic_t watchdog_status = SLON_WATCHDOG_NORMAL;
#endif

/* Pipe created in main(); used to kick the worker's scheduler thread. */
int			sched_wakeuppipe[2];

/*
 * Lock/condition pair initialized in SlonMain().  SlonMain() waits on the
 * condition after creating the local listen thread and then inspects
 * slon_listen_started to learn whether that thread came up.
 */
pthread_mutex_t slon_wait_listen_lock;
pthread_cond_t slon_wait_listen_cond;
int			slon_listen_started = 0;

/* When true, SlonMain() creates the monitor thread. */
bool		monitor_threads;

/* Not referenced in this file; defined here for use elsewhere. */
int			apply_cache_size;
75 
/* ----------
 * Local data
 * ----------
 */

/* Signature shared by all signal handlers installed from this file. */
typedef void (*sighandler_t) (int);

/* Forward declarations of the routines defined further down. */
static void slon_exit(int code);
static void SlonMain(void);
static sighandler_t install_signal_handler(int signum, sighandler_t handler);

#ifndef WIN32
/* The watchdog and its signal plumbing only exist on non-Windows builds. */
static void SlonWatchdog(void);
static void sighandler(int signo);
void		slon_terminate_worker(void);
#endif

/* Handles of the threads SlonMain() creates and later joins. */
static pthread_t local_event_thread;
static pthread_t local_cleanup_thread;
static pthread_t local_sync_thread;
static pthread_t local_monitor_thread;

/* Set by SlonMain() to pthread_self() once the scheduler loop returned. */
static pthread_t main_thread;

/* The argument vector main() was started with. */
static char *const * main_argv;

int			slon_log_level;

/* Path of the pid file; main() writes it, slon_exit() removes it. */
char	   *pid_file;
char	   *archive_dir = NULL;

/* Status word filled in by wait() inside the watchdog loop. */
static int	child_status;

/**
 * A variable to indicate that the
 * worker has been restarted by the watchdog.
 */
int			worker_restarted = 0;
109 
/* ----------
 * Usage
 *
 * Writes the command line synopsis to stderr and terminates the process
 * with exit status 1.  argv[0] is used as the program name.  This function
 * does not return.
 * ----------
 */
void
Usage(char *const argv[])
{
	/* Fixed help text, emitted verbatim after the synopsis line. */
	static const char *const help_lines[] = {
		"\n",
		"Options:\n",
		"    -h                    print usage message and exit\n",
		"    -v                    print version and exit\n",
		"    -d <debuglevel>       verbosity of logging (1..4)\n",
		"    -s <milliseconds>     SYNC check interval (default 2000)\n",
		"    -t <milliseconds>     SYNC interval timeout (default 10000)\n",
		"    -o <milliseconds>     desired subscriber SYNC processing time\n",
		"    -g <num>              maximum SYNC group size (default 20)\n",
		"    -c <num>              how often to vacuum in cleanup cycles\n",
		"    -p <filename>         slon pid file\n",
		"    -f <filename>         slon configuration file\n",
		"    -a <directory>        directory to store SYNC archive files\n",
		"    -x <command>          program to run after writing archive file\n",
		"    -q <num>              Terminate when this node reaches # of SYNCs\n",
		"    -r <num>              # of syncs for -q option\n",
		"    -l <interval>         this node should lag providers by this interval\n",
#ifdef WIN32
		"\nWindows service registration:\n",
		" slon -regservice [servicename]\n",
		" slon -unregservice [servicename]\n",
		" slon -listengines [servicename]\n",
		" slon -addengine [servicename] <configfile>\n",
		" slon -delengine [servicename] <configfile>\n",
#endif
	};
	const size_t line_count = sizeof(help_lines) / sizeof(help_lines[0]);
	size_t		idx;

	fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]);

	for (idx = 0; idx < line_count; idx++)
		fputs(help_lines[idx], stderr);

	exit(1);
}
146 
147 
148 /* ----------
149  * main
150  * ----------
151  */
152 int
main(int argc,char * const argv[])153 main(int argc, char *const argv[])
154 {
155 	char	   *cp1;
156 	char	   *cp2;
157 	int			c;
158 	int			errors = 0;
159 	extern int	optind;
160 	extern char *optarg;
161 
162 
163 #ifdef WIN32
164 	WSADATA		wsaData;
165 	int			err;
166 #endif
167 
168 
169 #ifdef WIN32
170 	if (argc >= 2 && !strcmp(argv[1], "-service"))
171 	{
172 		win32_servicestart();
173 		exit(0);
174 	}
175 	if (argc >= 2 && !strcmp(argv[1], "-subservice"))
176 	{
177 		win32_isservice = 1;
178 		argc--;
179 		argv++;
180 	}
181 	if (argc >= 2 && argc <= 4 && (
182 								   !strcmp(argv[1], "-regservice") ||
183 								   !strcmp(argv[1], "-unregservice") ||
184 								   !strcmp(argv[1], "-addengine") ||
185 								   !strcmp(argv[1], "-delengine") ||
186 								   !strcmp(argv[1], "-listengines")))
187 	{
188 		win32_serviceconfig(argc, argv);
189 	}
190 #endif
191 
192 	InitializeConfOptions();
193 
194 	while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:q:r:l:x:hv?")) != EOF)
195 	{
196 		switch (c)
197 		{
198 			case '?':
199 				Usage(argv);
200 			case 'q':
201 				set_config_option("quit_sync_provider", optarg);
202 				break;
203 
204 			case 'r':
205 				set_config_option("quit_sync_finalsync", optarg);
206 				break;
207 
208 			case 'f':
209 				ProcessConfigFile(optarg);
210 				break;
211 
212 			case 'a':
213 				set_config_option("archive_dir", optarg);
214 				break;
215 
216 			case 'd':
217 				set_config_option("log_level", optarg);
218 				break;
219 
220 			case 's':
221 				set_config_option("sync_interval", optarg);
222 				break;
223 
224 			case 't':
225 				set_config_option("sync_interval_timeout", optarg);
226 				break;
227 
228 			case 'g':
229 				set_config_option("sync_group_maxsize", optarg);
230 				break;
231 
232 			case 'c':
233 				set_config_option("vac_frequency", optarg);
234 				break;
235 
236 			case 'p':
237 				set_config_option("pid_file", optarg);
238 				break;
239 
240 			case 'o':
241 				set_config_option("desired_sync_time", optarg);
242 				break;
243 
244 			case 'l':
245 				set_config_option("lag_interval", optarg);
246 				break;
247 
248 			case 'h':
249 				errors++;
250 				break;
251 
252 			case 'v':
253 				printf("slon version %s\n", SLONY_I_VERSION_STRING);
254 				exit(0);
255 				break;
256 
257 			case 'x':
258 				set_config_option("command_on_logarchive", optarg);
259 				break;
260 
261 			default:
262 				fprintf(stderr, "unknown option '%c'\n", c);
263 				errors++;
264 				break;
265 		}
266 	}
267 
268 	/*
269 	 * Make sure the sync interval timeout isn't too small.
270 	 */
271 	if (sync_interval_timeout != 0 && sync_interval_timeout <= sync_interval)
272 		sync_interval_timeout = sync_interval * 2;
273 
274 	/*
275 	 * Remember the cluster name and build the properly quoted namespace
276 	 * identifier
277 	 */
278 	slon_pid = getpid();
279 #ifndef WIN32
280 	if (pthread_mutex_init(&slon_watchdog_lock, NULL) < 0)
281 	{
282 		slon_log(SLON_FATAL, "slon: pthread_mutex_init() - %s\n",
283 				 strerror(errno));
284 		exit(-1);
285 	}
286 	slon_watchdog_pid = slon_pid;
287 	slon_worker_pid = -1;
288 #endif
289 	main_argv = argv;
290 
291 	if ((char *) argv[optind])
292 	{
293 		set_config_option("cluster_name", (char *) argv[optind]);
294 		set_config_option("conn_info", (char *) argv[++optind]);
295 	}
296 
297 	if (rtcfg_cluster_name != NULL)
298 	{
299 		rtcfg_namespace = malloc(strlen(rtcfg_cluster_name) * 2 + 4);
300 		cp2 = rtcfg_namespace;
301 		*cp2++ = '"';
302 		*cp2++ = '_';
303 		for (cp1 = (char *) rtcfg_cluster_name; *cp1; cp1++)
304 		{
305 			if (*cp1 == '"')
306 				*cp2++ = '"';
307 			*cp2++ = *cp1;
308 		}
309 		*cp2++ = '"';
310 		*cp2 = '\0';
311 	}
312 	else
313 	{
314 		errors++;
315 	}
316 
317 	slon_log(SLON_CONFIG, "main: slon version %s starting up\n",
318 			 SLONY_I_VERSION_STRING);
319 
320 	/*
321 	 * Remember the connection information for the local node.
322 	 */
323 	if (rtcfg_conninfo == NULL)
324 	{
325 		errors++;
326 	}
327 
328 	if (errors != 0)
329 	{
330 		Usage(argv);
331 	}
332 
333 #ifdef WIN32
334 
335 	/*
336 	 * Startup the network subsystem, in case our libpq doesn't
337 	 */
338 	err = WSAStartup(MAKEWORD(1, 1), &wsaData);
339 	if (err != 0)
340 	{
341 		slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err);
342 		exit(-1);
343 	}
344 #endif
345 
346 	if (pid_file)
347 	{
348 		FILE	   *pidfile;
349 
350 		pidfile = fopen(pid_file, "w");
351 		if (pidfile)
352 		{
353 			fprintf(pidfile, "%d\n", slon_pid);
354 			fclose(pidfile);
355 		}
356 		else
357 		{
358 			slon_log(SLON_FATAL, "Cannot open pid_file \"%s\"\n", pid_file);
359 			exit(-1);
360 		}
361 	}
362 
363 	/*
364 	 * Create the pipe used to kick the workers scheduler thread
365 	 */
366 	if (pgpipe(sched_wakeuppipe) < 0)
367 	{
368 		slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno, strerror(errno));
369 		slon_exit(-1);
370 	}
371 
372 	if (!PQisthreadsafe())
373 	{
374 		slon_log(SLON_FATAL, "slon: libpq was not compiled with thread safety enabled (normally: --enable-thread-safety).  slon is a multithreaded application requiring thread-safe libpq\n");
375 		slon_exit(-1);
376 	}
377 
378 	if (!PQisthreadsafe())
379 	{
380 		slon_log(SLON_FATAL, "slon: libpq was not compiled with --enable-thread-safety. Slony-I requires a thread enabled libpq\n");
381 		slon_exit(-1);
382 	}
383 
384 	/*
385 	 * There is no watchdog process on win32. We delegate restarting and other
386 	 * such tasks to the Service Control Manager. And win32 doesn't support
387 	 * signals, so we don't need to catch them...
388 	 */
389 #ifndef WIN32
390 	SlonWatchdog();
391 #else
392 	SlonMain();
393 #endif
394 	exit(0);
395 }
396 
397 
398 /* ----------
399  * SlonMain
400  * ----------
401  */
402 static void
SlonMain(void)403 SlonMain(void)
404 {
405 	PGresult   *res;
406 	SlonDString query;
407 	int			i,
408 				n;
409 	PGconn	   *startup_conn;
410 
411 	slon_pid = getpid();
412 #ifndef WIN32
413 	slon_worker_pid = slon_pid;
414 #endif
415 
416 	if (pthread_mutex_init(&slon_wait_listen_lock, NULL) < 0)
417 	{
418 		slon_log(SLON_FATAL, "main: pthread_mutex_init() failed - %s\n",
419 				 strerror(errno));
420 		slon_abort();
421 	}
422 	if (pthread_cond_init(&slon_wait_listen_cond, NULL) < 0)
423 	{
424 		slon_log(SLON_FATAL, "main: pthread_cond_init() failed - %s\n",
425 				 strerror(errno));
426 		slon_abort();
427 	}
428 
429 
430 	/*
431 	 * Dump out current configuration - all elements of the various arrays...
432 	 */
433 	dump_configuration();
434 
435 	/*
436 	 * Connect to the local database to read the initial configuration
437 	 */
438 	startup_conn = PQconnectdb(rtcfg_conninfo);
439 	if (startup_conn == NULL)
440 	{
441 		slon_log(SLON_FATAL, "main: PQconnectdb() failed - sleep 10s\n");
442 		sleep(10);
443 		slon_retry();
444 		exit(-1);
445 	}
446 	if (PQstatus(startup_conn) != CONNECTION_OK)
447 	{
448 		slon_log(SLON_FATAL, "main: Cannot connect to local database - %s - sleep 10s\n",
449 				 PQerrorMessage(startup_conn));
450 		PQfinish(startup_conn);
451 		sleep(10);
452 		slon_retry();
453 		exit(-1);
454 	}
455 
456 	/*
457 	 * Get our local node ID
458 	 */
459 	rtcfg_nodeid = db_getLocalNodeId(startup_conn);
460 	if (rtcfg_nodeid < 0)
461 	{
462 		slon_log(SLON_FATAL, "main: Node is not initialized properly - sleep 10s\n");
463 		sleep(10);
464 		slon_retry();
465 		exit(-1);
466 	}
467 	if (db_checkSchemaVersion(startup_conn) < 0)
468 	{
469 		slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n");
470 		slon_abort();
471 	}
472 	slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid);
473 
474 	dstring_init(&query);
475 	slon_mkquery(&query, "select %s.slon_node_health_check();", rtcfg_namespace);
476 	res = PQexec(startup_conn, dstring_data(&query));
477 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
478 	{
479 		slon_log(SLON_FATAL, "could not call slon_node_health_check() - %",
480 				 PQresultErrorMessage(res));
481 		slon_abort();
482 	}
483 	else
484 	{
485 		if (PQntuples(res) != 1)
486 		{
487 			slon_log(SLON_FATAL,
488 					 "query '%s' returned %d rows (expected 1)\n",
489 					 query, PQntuples(res));
490 			slon_abort();
491 		}
492 		else
493 		{
494 			if (*(PQgetvalue(res, 0, 0)) == 'f')
495 			{
496 				slon_log(SLON_FATAL,
497 						 "slon_node_health_check() returned false - fatal health problem!\n%s\nREPAIR CONFIG may be helpful to rectify this problem\n",
498 						 PQresultErrorMessage(res));
499 				slon_abort();
500 			}
501 		}
502 	}
503 	PQclear(res);
504 	dstring_free(&query);
505 
506 #ifndef WIN32
507 	if (signal(SIGHUP, SIG_IGN) == SIG_ERR)
508 	{
509 		slon_log(SLON_FATAL, "main: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno));
510 		slon_abort();
511 	}
512 	if (signal(SIGINT, SIG_IGN) == SIG_ERR)
513 	{
514 		slon_log(SLON_FATAL, "main: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
515 		slon_abort();
516 	}
517 	if (signal(SIGTERM, SIG_IGN) == SIG_ERR)
518 	{
519 		slon_log(SLON_FATAL, "main: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
520 		slon_abort();
521 	}
522 	if (signal(SIGCHLD, SIG_IGN) == SIG_ERR)
523 	{
524 		slon_log(SLON_FATAL, "main: SIGCHLD signal handler setup failed -(%d) %s\n", errno, strerror(errno));
525 		slon_abort();
526 	}
527 	if (signal(SIGQUIT, SIG_IGN) == SIG_ERR)
528 	{
529 		slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
530 		slon_abort();
531 	}
532 #endif
533 
534 	slon_log(SLON_INFO, "main: main process started\n");
535 
536 	/*
537 	 * Start the event scheduling system
538 	 */
539 	slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n");
540 	if (sched_start_mainloop() < 0)
541 		slon_retry();
542 
543 	slon_log(SLON_CONFIG, "main: loading current cluster configuration\n");
544 
545 	/*
546 	 * Begin a transaction
547 	 */
548 	res = PQexec(startup_conn,
549 				 "start transaction; "
550 				 "set transaction isolation level serializable;");
551 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
552 	{
553 		slon_log(SLON_FATAL, "Cannot start transaction - %s - sleep 10s\n",
554 				 PQresultErrorMessage(res));
555 		sleep(10);
556 		PQclear(res);
557 		slon_retry();
558 	}
559 	PQclear(res);
560 
561 	/*
562 	 * Read configuration table sl_node
563 	 */
564 	dstring_init(&query);
565 	slon_mkquery(&query,
566 				 "select no_id, no_active, no_comment, "
567 				 "    (select coalesce(max(con_seqno),0) from %s.sl_confirm "
568 				 "        where con_origin = no_id and con_received = %d) "
569 				 "        as last_event, "
570 				 "    (select ev_snapshot from %s.sl_event "
571 				 "        where ev_origin = no_id "
572 				 "        and ev_seqno = (select max(ev_seqno) "
573 				 "                    from %s.sl_event "
574 				 "                    where ev_origin = no_id "
575 			   "                    and ev_type = 'SYNC')) as last_snapshot "
576 				 "from %s.sl_node "
577 				 "order by no_id; ",
578 				 rtcfg_namespace, rtcfg_nodeid,
579 				 rtcfg_namespace, rtcfg_namespace,
580 				 rtcfg_namespace);
581 	res = PQexec(startup_conn, dstring_data(&query));
582 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
583 	{
584 		slon_log(SLON_FATAL, "main: Cannot get node list - %s\n",
585 				 PQresultErrorMessage(res));
586 		PQclear(res);
587 		dstring_free(&query);
588 		slon_retry();
589 	}
590 	for (i = 0, n = PQntuples(res); i < n; i++)
591 	{
592 		int			no_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
593 		int			no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0;
594 		char	   *no_comment = PQgetvalue(res, i, 2);
595 		int64		last_event;
596 
597 		if (no_id == rtcfg_nodeid)
598 		{
599 			/*
600 			 * Complete our own local node entry
601 			 */
602 			rtcfg_nodeactive = no_active;
603 			rtcfg_nodecomment = strdup(no_comment);
604 		}
605 		else
606 		{
607 			/*
608 			 * Add a remote node
609 			 */
610 			slon_scanint64(PQgetvalue(res, i, 3), &last_event);
611 			rtcfg_storeNode(no_id, no_comment);
612 			rtcfg_setNodeLastEvent(no_id, last_event);
613 			rtcfg_setNodeLastSnapshot(no_id, PQgetvalue(res, i, 4));
614 
615 			/*
616 			 * If it is active, remember for activation just before we start
617 			 * processing events.
618 			 */
619 			if (no_active)
620 				rtcfg_needActivate(no_id);
621 		}
622 	}
623 	PQclear(res);
624 
625 	/*
626 	 * Read configuration table sl_path - the interesting pieces
627 	 */
628 	slon_mkquery(&query,
629 				 "select pa_server, pa_conninfo, pa_connretry "
630 				 "from %s.sl_path where pa_client = %d"
631 				 " and pa_conninfo<>'<event pending>'",
632 				 rtcfg_namespace, rtcfg_nodeid);
633 	res = PQexec(startup_conn, dstring_data(&query));
634 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
635 	{
636 		slon_log(SLON_FATAL, "main: Cannot get path config - %s\n",
637 				 PQresultErrorMessage(res));
638 		PQclear(res);
639 		dstring_free(&query);
640 		slon_retry();
641 	}
642 	for (i = 0, n = PQntuples(res); i < n; i++)
643 	{
644 		int			pa_server = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
645 		char	   *pa_conninfo = PQgetvalue(res, i, 1);
646 		int			pa_connretry = (int) strtol(PQgetvalue(res, i, 2), NULL, 10);
647 
648 		rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
649 	}
650 	PQclear(res);
651 
652 	/*
653 	 * Load the initial listen configuration
654 	 */
655 	rtcfg_reloadListen(startup_conn);
656 
657 	/*
658 	 * Read configuration table sl_set
659 	 */
660 	slon_mkquery(&query,
661 				 "select set_id, set_origin, set_comment "
662 				 "from %s.sl_set",
663 				 rtcfg_namespace);
664 	res = PQexec(startup_conn, dstring_data(&query));
665 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
666 	{
667 		slon_log(SLON_FATAL, "main: Cannot get set config - %s\n",
668 				 PQresultErrorMessage(res));
669 		PQclear(res);
670 		dstring_free(&query);
671 		slon_retry();
672 	}
673 	for (i = 0, n = PQntuples(res); i < n; i++)
674 	{
675 		int			set_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
676 		int			set_origin = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
677 		char	   *set_comment = PQgetvalue(res, i, 2);
678 
679 		rtcfg_storeSet(set_id, set_origin, set_comment);
680 	}
681 	PQclear(res);
682 
683 	/*
684 	 * Read configuration table sl_subscribe - only subscriptions for local
685 	 * node
686 	 */
687 	slon_mkquery(&query,
688 				 "select sub_set, sub_provider, sub_forward, sub_active "
689 				 "from %s.sl_subscribe "
690 				 "where sub_receiver = %d",
691 				 rtcfg_namespace, rtcfg_nodeid);
692 	res = PQexec(startup_conn, dstring_data(&query));
693 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
694 	{
695 		slon_log(SLON_FATAL, "main: Cannot get subscription config - %s\n",
696 				 PQresultErrorMessage(res));
697 		PQclear(res);
698 		dstring_free(&query);
699 		slon_retry();
700 	}
701 	for (i = 0, n = PQntuples(res); i < n; i++)
702 	{
703 		int			sub_set = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
704 		int			sub_provider = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
705 		char	   *sub_forward = PQgetvalue(res, i, 2);
706 		char	   *sub_active = PQgetvalue(res, i, 3);
707 
708 		rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward);
709 		if (*sub_active == 't')
710 			rtcfg_enableSubscription(sub_set, sub_provider, sub_forward);
711 	}
712 	PQclear(res);
713 
714 	/*
715 	 * Remember the last known local event sequence
716 	 */
717 	slon_mkquery(&query,
718 				 "select coalesce(max(ev_seqno), -1) from %s.sl_event "
719 				 "where ev_origin = '%d'",
720 				 rtcfg_namespace, rtcfg_nodeid);
721 	res = PQexec(startup_conn, dstring_data(&query));
722 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
723 	{
724 		slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s\n",
725 				 PQresultErrorMessage(res));
726 		PQclear(res);
727 		dstring_free(&query);
728 		slon_retry();
729 	}
730 	if (PQntuples(res) == 0)
731 		strcpy(rtcfg_lastevent, "-1");
732 	else if (PQgetisnull(res, 0, 0))
733 		strcpy(rtcfg_lastevent, "-1");
734 	else
735 		strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
736 	PQclear(res);
737 	dstring_free(&query);
738 	slon_log(SLON_CONFIG,
739 			 "main: last local event sequence = %s\n",
740 			 rtcfg_lastevent);
741 
742 	/*
743 	 * Rollback the transaction we used to get the config snapshot
744 	 */
745 	res = PQexec(startup_conn, "rollback transaction;");
746 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
747 	{
748 		slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n",
749 				 PQresultErrorMessage(res));
750 		PQclear(res);
751 		slon_retry();
752 	}
753 	PQclear(res);
754 
755 	/*
756 	 * Done with the startup, don't need the local connection any more.
757 	 */
758 	PQfinish(startup_conn);
759 
760 	slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
761 
762 	/*
763 	 * Create the local event thread that monitors the local node for
764 	 * administrative events to adjust the configuration at runtime. We wait
765 	 * here until the local listen thread has checked that there is no other
766 	 * slon daemon running.
767 	 */
768 	pthread_mutex_lock(&slon_wait_listen_lock);
769 	if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
770 	{
771 		slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n",
772 				 strerror(errno));
773 		slon_retry();
774 	}
775 	pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock);
776 	if (!slon_listen_started)
777 	{
778 		/**
779 		 * The local listen thread did not start up properly.
780 		 */
781 		slon_log(SLON_FATAL, "main: localListenThread did not start\n");
782 		slon_abort();
783 	}
784 	pthread_mutex_unlock(&slon_wait_listen_lock);
785 
786 	/*
787 	 * Enable all nodes that are active
788 	 */
789 	rtcfg_doActivate();
790 
791 	/*
792 	 * Create the local cleanup thread that will remove old events and log
793 	 * data.
794 	 */
795 	if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
796 	{
797 		slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n",
798 				 strerror(errno));
799 		slon_retry();
800 	}
801 
802 	/*
803 	 * Create the local sync thread that will generate SYNC events if we had
804 	 * local database updates.
805 	 */
806 	if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
807 	{
808 		slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n",
809 				 strerror(errno));
810 		slon_retry();
811 	}
812 
813 	/*
814 	 * Create the local monitor thread that will process monitoring requests
815 	 */
816 	if (monitor_threads)
817 	{
818 		if (pthread_create(&local_monitor_thread, NULL, monitorThread_main, NULL) < 0)
819 		{
820 			slon_log(SLON_FATAL, "main: cannot create monitorThread - %s\n",
821 					 strerror(errno));
822 			slon_retry();
823 		}
824 	}
825 
826 	/*
827 	 * Wait until the scheduler has shut down all remote connections
828 	 */
829 	slon_log(SLON_INFO, "main: running scheduler mainloop\n");
830 	if (sched_wait_mainloop() < 0)
831 	{
832 		slon_log(SLON_FATAL, "main: scheduler returned with error\n");
833 		slon_retry();
834 	}
835 	slon_log(SLON_INFO, "main: scheduler mainloop returned\n");
836 
837 	/*
838 	 * Wait for all remote threads to finish
839 	 */
840 	main_thread = pthread_self();
841 
842 	slon_log(SLON_CONFIG, "main: wait for remote threads\n");
843 	rtcfg_joinAllRemoteThreads();
844 
845 	/*
846 	 * Wait for the local threads to finish
847 	 */
848 	if (pthread_join(local_event_thread, NULL) < 0)
849 		slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n",
850 				 strerror(errno));
851 
852 	if (pthread_join(local_cleanup_thread, NULL) < 0)
853 		slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n",
854 				 strerror(errno));
855 
856 	if (pthread_join(local_sync_thread, NULL) < 0)
857 		slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n",
858 				 strerror(errno));
859 
860 	if (pthread_join(local_monitor_thread, NULL) < 0)
861 		slon_log(SLON_ERROR, "main: cannot join monitorThread - %s\n",
862 				 strerror(errno));
863 
864 	slon_log(SLON_CONFIG, "main: done\n");
865 
866 	exit(0);
867 }
868 
869 #ifndef WIN32
870 /* ----------
871  * SlonWatchdog
872  * ----------
873  */
874 static void
SlonWatchdog(void)875 SlonWatchdog(void)
876 {
877 	pid_t		pid;
878 	int			shutdown = 0;
879 	int			return_code = -99;
880 	char	   *termination_reason = "unknown";
881 
882 	slon_log(SLON_INFO, "slon: watchdog process started\n");
883 
884 
885 
886 	slon_log(SLON_CONFIG, "slon: watchdog ready - pid = %d\n", slon_watchdog_pid);
887 
888 	slon_worker_pid = fork();
889 	if (slon_worker_pid == 0)
890 	{
891 		SlonMain();
892 		exit(-1);
893 	}
894 	else if (slon_worker_pid < 0)
895 	{
896 		slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
897 				 errno, strerror(errno));
898 		slon_exit(-1);
899 
900 	}
901 
902 	/*
903 	 * Install signal handlers
904 	 */
905 
906 	if (install_signal_handler(SIGHUP, sighandler) == SIG_ERR)
907 	{
908 		slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno));
909 		slon_exit(-1);
910 	}
911 
912 	if (install_signal_handler(SIGUSR1, sighandler) == SIG_ERR)
913 	{
914 		slon_log(SLON_FATAL, "slon: SIGUSR1 signal handler setup failed -(%d) %s\n", errno, strerror(errno));
915 		slon_exit(-1);
916 	}
917 	if (install_signal_handler(SIGALRM, sighandler) == SIG_ERR)
918 	{
919 		slon_log(SLON_FATAL, "slon: SIGALRM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
920 		slon_exit(-1);
921 	}
922 	if (install_signal_handler(SIGINT, sighandler) == SIG_ERR)
923 	{
924 		slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
925 		slon_exit(-1);
926 	}
927 	if (install_signal_handler(SIGTERM, sighandler) == SIG_ERR)
928 	{
929 		slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno));
930 		slon_exit(-1);
931 	}
932 
933 
934 	if (install_signal_handler(SIGQUIT, sighandler) == SIG_ERR)
935 	{
936 		slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno));
937 		slon_exit(-1);
938 	}
939 
940 	slon_log(SLON_CONFIG, "slon: worker process created - pid = %d\n",
941 			 slon_worker_pid);
942 	while (!shutdown)
943 	{
944 		while ((pid = wait(&child_status)) != slon_worker_pid)
945 		{
946 			if (pid < 0 && errno == EINTR)
947 				continue;
948 
949 			slon_log(SLON_CONFIG, "slon: child terminated status: %d; pid: %d, current worker pid: %d errno: %d\n", child_status, pid, slon_worker_pid, errno);
950 
951 			if (pid < 0)
952 			{
953 				/**
954 				 * if errno is not EINTR and pid<0 we have
955 				 * a problem.
956 				 * looping on wait() isn't a good idea.
957 				 */
958 				slon_log(SLON_FATAL, "slon: wait returned an error pid:%d errno:%d\n",
959 						 pid, errno);
960 				exit(-1);
961 			}
962 		}
963 		if (WIFSIGNALED(child_status))
964 		{
965 			return_code = WTERMSIG(child_status);
966 			termination_reason = "signal";
967 		}
968 		else if (WIFEXITED(child_status))
969 		{
970 			return_code = WEXITSTATUS(child_status);
971 			termination_reason = "exit code";
972 		}
973 		slon_log(SLON_CONFIG, "slon: child terminated %s: %d; pid: %d, current worker pid: %d\n", termination_reason, return_code, pid, slon_worker_pid);
974 
975 
976 		switch (watchdog_status)
977 		{
978 			case SLON_WATCHDOG_RESTART:
979 				slon_log(SLON_CONFIG, "slon: restart of worker in 20 seconds\n");
980 				sleep(20);
981 				slon_worker_pid = fork();
982 				if (slon_worker_pid == 0)
983 				{
984 					worker_restarted = 1;
985 					SlonMain();
986 					exit(-1);
987 				}
988 				else if (slon_worker_pid < 0)
989 				{
990 					slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
991 							 errno, strerror(errno));
992 					slon_exit(-1);
993 
994 				}
995 				watchdog_status = SLON_WATCHDOG_NORMAL;
996 				continue;
997 
998 			case SLON_WATCHDOG_NORMAL:
999 			case SLON_WATCHDOG_RETRY:
1000 				watchdog_status = SLON_WATCHDOG_RETRY;
1001 				if (child_status != 0)
1002 				{
1003 					slon_log(SLON_CONFIG, "slon: restart of worker in 10 seconds\n");
1004 					(void) sleep(10);
1005 				}
1006 				else
1007 				{
1008 					slon_log(SLON_CONFIG, "slon: restart of worker\n");
1009 				}
1010 				if (watchdog_status == SLON_WATCHDOG_RETRY)
1011 				{
1012 					slon_worker_pid = fork();
1013 					if (slon_worker_pid == 0)
1014 					{
1015 						worker_restarted = 1;
1016 						SlonMain();
1017 						exit(-1);
1018 					}
1019 					else if (slon_worker_pid < 0)
1020 					{
1021 						slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n",
1022 								 errno, strerror(errno));
1023 						slon_exit(-1);
1024 
1025 					}
1026 					watchdog_status = SLON_WATCHDOG_NORMAL;
1027 					continue;
1028 				}
1029 				break;
1030 
1031 			default:
1032 				shutdown = 1;
1033 				break;
1034 		}						/* switch */
1035 	}							/* while */
1036 
1037 	slon_log(SLON_INFO, "slon: done\n");
1038 
1039 	/*
1040 	 * That's it.
1041 	 */
1042 	slon_exit(0);
1043 }
1044 
1045 
1046 /* ----------
1047  * sighandler
1048  * ----------
1049  */
1050 static void
sighandler(int signo)1051 sighandler(int signo)
1052 {
1053 	switch (signo)
1054 	{
1055 		case SIGALRM:
1056 			kill(slon_worker_pid, SIGKILL);
1057 			break;
1058 
1059 		case SIGCHLD:
1060 			break;
1061 
1062 		case SIGHUP:
1063 			watchdog_status = SLON_WATCHDOG_RESTART;
1064 			slon_terminate_worker();
1065 			break;
1066 
1067 		case SIGUSR1:
1068 			watchdog_status = SLON_WATCHDOG_RETRY;
1069 			slon_terminate_worker();
1070 			break;
1071 
1072 		case SIGINT:
1073 		case SIGTERM:
1074 			watchdog_status = SLON_WATCHDOG_SHUTDOWN;
1075 			slon_terminate_worker();
1076 			break;
1077 
1078 		case SIGQUIT:
1079 			kill(slon_worker_pid, SIGKILL);
1080 			slon_exit(-1);
1081 			break;
1082 	}
1083 }
1084 
1085 
1086 /* ----------
1087  * slon_terminate_worker
1088  * ----------
1089  */
1090 void
slon_terminate_worker()1091 slon_terminate_worker()
1092 {
1093 	(void) kill(slon_worker_pid, SIGKILL);
1094 
1095 }
1096 #endif
1097 /* ----------
1098  * slon_exit
1099  * ----------
1100  */
1101 static void
slon_exit(int code)1102 slon_exit(int code)
1103 {
1104 #ifdef WIN32
1105 	/* Cleanup winsock */
1106 	WSACleanup();
1107 #endif
1108 
1109 	if (pid_file)
1110 	{
1111 		slon_log(SLON_INFO, "slon: remove pid file\n");
1112 		(void) unlink(pid_file);
1113 	}
1114 
1115 	slon_log(SLON_INFO, "slon: exit(%d)\n", code);
1116 
1117 	exit(code);
1118 }
1119 
1120 static sighandler_t
install_signal_handler(int signo,sighandler_t handler)1121 install_signal_handler(int signo, sighandler_t handler)
1122 {
1123 
1124 
1125 #ifndef WIN32
1126 	struct sigaction act;
1127 
1128 	act.sa_handler = handler;
1129 	(void) sigemptyset(&act.sa_mask);
1130 	act.sa_flags = SA_NODEFER;
1131 
1132 
1133 
1134 	if (sigaction(signo, &act, NULL) < 0)
1135 	{
1136 		return SIG_ERR;
1137 	}
1138 	return handler;
1139 #else
1140 	return signal(signo, handler);
1141 #endif
1142 }
1143 
1144 /*
1145  * Local Variables:
1146  *	tab-width: 4
1147  *	c-indent-level: 4
1148  *	c-basic-offset: 4
1149  * End:
1150  */
1151