1 /*-------------------------------------------------------------------------
2  * ipcutil.c
3  *
4  *	Semaphore and message passing support for the slony_logshipper.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 
14 #ifndef WIN32
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stdarg.h>
18 #include <unistd.h>
19 #include <sys/types.h>
20 #include <sys/ipc.h>
21 #include <sys/sem.h>
22 #include <sys/msg.h>
23 #include <sys/wait.h>
24 #include <errno.h>
25 #include <signal.h>
26 #include <string.h>
27 #endif
28 #include "../slonik/types.h"
29 #include "libpq-fe.h"
30 
31 #include "slony_logshipper.h"
32 
33 
/*
 * One entry of the daemon's queue of archive files that still need
 * processing. The entries form a singly linked list that ipc_add_path()
 * keeps sorted by archive_path (strcmp() order).
 */
typedef struct queue_elem_s queue_elem;

struct queue_elem_s
{
	char	   *archive_path;	/* strdup()'ed path of the archive file */
	queue_elem *next;			/* following entry, NULL for the last one */
};
43 
44 
45 /*
46  * Static data
47  */
48 static char *ipc_archive_dir = NULL;
49 static key_t semkey;
50 static key_t msgkey;
51 static int	semid;
52 static int	msgid;
53 static int	ipc_creator;
54 static queue_elem *archive_queue_head = NULL;
55 static queue_elem *archive_queue_tail = NULL;
56 
57 
/*
 * Prototypes of the file-local helpers
 */
static int	ipc_generate_keys(char *archive_dir);
static int	ipc_send_code(char *archive_dir, int code);
static int	ipc_add_path(char *path);
static void ipc_sighandler(int sig);
65 
66 
67 /*
68  * ipc_init() -
69  *
70  *	Called to setup or connect to the semaphore set and message queue.
71  */
72 int
ipc_init(char * archive_dir)73 ipc_init(char *archive_dir)
74 {
75 	struct sembuf sops[2];
76 
77 	if (ipc_generate_keys(archive_dir) < 0)
78 		return -1;
79 
80 	/*
81 	 * We eventually have to start over again in case the existing daemon
82 	 * destroys the semaphore set after we attached and before we can lock it.
83 	 */
84 	while (true)
85 	{
86 		/*
87 		 * Create or attach to the semaphore set
88 		 */
89 		semid = semget(semkey, 3, 0700 | IPC_CREAT);
90 		if (semid < 0)
91 		{
92 			fprintf(stderr, "cannot create or attache to semaphore set\n"
93 					"semget(): %s\n", strerror(errno));
94 			return -1;
95 		}
96 
97 		/*
98 		 * We now do two initial operations with NOWAIT: wait for #1 =0 inc
99 		 * sem	#1 +1 We never again touch semaphore #1, so this either
100 		 * succeeds, meaning that we created the set and hold the current
101 		 * lock. Or it fails with EAGAIN, meaning we attached to an existing
102 		 * set. Or it fails with EIDRM, meaning the set was destroyed.
103 		 */
104 		sops[0].sem_num = 1;
105 		sops[0].sem_op = 0;
106 		sops[0].sem_flg = IPC_NOWAIT;
107 		sops[1].sem_num = 1;
108 		sops[1].sem_op = 1;
109 		sops[1].sem_flg = 0;
110 
111 		if (semop(semid, sops, 2) < 0)
112 		{
113 			if (errno == EIDRM)
114 				continue;
115 
116 			if (errno != EAGAIN)
117 			{
118 				fprintf(stderr, "semop failed in ipc_init(): %s",
119 						strerror(errno));
120 				return -1;
121 			}
122 
123 			/*
124 			 * Grab the lock on semaphore #0
125 			 */
126 			if (ipc_lock() < 0)
127 			{
128 				/*
129 				 * Since theres a gap between attaching and locking, the set
130 				 * could have been destroyed. In that case, start over.
131 				 */
132 				if (errno == EIDRM)
133 					continue;
134 
135 				fprintf(stderr, "semop() failed in ipc_init(): %s\n",
136 						strerror(errno));
137 				return -1;
138 			}
139 
140 			ipc_creator = 0;
141 		}
142 		else
143 		{
144 			/*
145 			 * Initial semop succeeded - we are the creator of this set.
146 			 */
147 			ipc_creator = 1;
148 			signal(SIGINT, ipc_sighandler);
149 			signal(SIGTERM, ipc_sighandler);
150 			signal(SIGPIPE, ipc_sighandler);
151 		}
152 
153 		break;
154 	}
155 
156 	/*
157 	 * At this point we have the semaphore set and it is locked.
158 	 */
159 	msgid = msgget(msgkey, 0700 | IPC_CREAT);
160 	if (msgid < 0)
161 	{
162 		fprintf(stderr, "msgget() failed in ipc_init(): %s\n", strerror(errno));
163 		if (ipc_creator)
164 		{
165 			if (semctl(semid, 0, IPC_RMID) < 0)
166 				fprintf(stderr, "semctl() failed in ipc_init(): %s\n",
167 						strerror(errno));
168 		}
169 		return -1;
170 	}
171 
172 	return ipc_creator;
173 }
174 
175 
176 /*
177  * ipc_finish() -
178  *
179  *	Locks and destroys the semaphore set and message queue if called
180  *	in the daemonized queue runner. If force isn't given, it will
181  *	only do so if the archive queue is empty after locking the set
182  *	and draining the message queue.
183  */
184 int
ipc_finish(bool force)185 ipc_finish(bool force)
186 {
187 	if (ipc_creator)
188 	{
189 		if (!force)
190 		{
191 			/*
192 			 * We are the creator of the semaphore set, so if this isn't a
193 			 * force operation, we lock it first, poll the message queue and
194 			 * check that we have an empty queue.
195 			 */
196 			if (ipc_lock() < 0)
197 			{
198 				fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
199 						strerror(errno));
200 				return -1;
201 			}
202 
203 			if (ipc_poll(false) < 0)
204 				return -1;
205 
206 			if (archive_queue_head != NULL)
207 			{
208 				if (ipc_unlock() < 0)
209 				{
210 					fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
211 							strerror(errno));
212 					return -1;
213 				}
214 				return 1;
215 			}
216 		}
217 
218 		/*
219 		 * At this point, we are either forced to stop or we have a lock and
220 		 * the queue is empty.
221 		 */
222 		if (msgctl(msgid, IPC_RMID, NULL) < 0)
223 		{
224 			fprintf(stderr, "msgctl() failed in ipc_finish(): %s\n",
225 					strerror(errno));
226 			semctl(semid, 0, IPC_RMID);
227 			return -1;
228 		}
229 
230 		if (semctl(semid, 0, IPC_RMID) < 0)
231 		{
232 			fprintf(stderr, "semctl() failed in ipc_finish(): %s\n",
233 					strerror(errno));
234 			return -1;
235 		}
236 	}
237 
238 	return 0;
239 }
240 
241 
242 /*
243  * ipc_poll() -
244  *
245  *	Check for incoming messages
246  */
247 int
ipc_poll(bool blocking)248 ipc_poll(bool blocking)
249 {
250 	int			rc;
251 	struct
252 	{
253 		long		mtype;
254 		char		mtext[MSGMAX];
255 	}			msg;
256 
257 	while (true)
258 	{
259 		rc = msgrcv(msgid, &msg, sizeof(msg), 0,
260 					(blocking) ? 0 : IPC_NOWAIT);
261 		if (rc < 0)
262 		{
263 			if (errno == ENOMSG)
264 				return 0;
265 
266 			fprintf(stderr, "msgrcv() failed in ipc_poll(): %s\n",
267 					strerror(errno));
268 			return -1;
269 		}
270 
271 		if (msg.mtype == 2)
272 			shutdown_smart_requested = true;
273 		else if (msg.mtype == 3)
274 			shutdown_immed_requested = true;
275 		else if (msg.mtype == 4)
276 			wait_for_resume = false;
277 		else if (msg.mtype == 5)
278 			logfile_switch_requested = true;
279 		else if (ipc_add_path(msg.mtext) < 0)
280 			return -1;
281 
282 		if (blocking)
283 			break;
284 	}
285 
286 	return 0;
287 }
288 
289 
290 /*
291  * ipc_add_path() -
292  *
293  *	Add an archive path to the sorted queue
294  */
295 static int
ipc_add_path(char * path)296 ipc_add_path(char *path)
297 {
298 	queue_elem **elemp;
299 	queue_elem *elem;
300 
301 	if ((elem = (queue_elem *) malloc(sizeof(queue_elem))) == NULL)
302 	{
303 		fprintf(stderr, "out of memory in ipc_add_path()\n");
304 		return -1;
305 	}
306 	if ((elem->archive_path = strdup(path)) == NULL)
307 	{
308 		fprintf(stderr, "out of memory in ipc_add_path()\n");
309 		return -1;
310 	}
311 	elem->next = NULL;
312 
313 	/*
314 	 * See if we have to insert it in front of something else
315 	 */
316 	for (elemp = &archive_queue_head; *elemp != NULL; elemp = &((*elemp)->next))
317 	{
318 		if (strcmp(elem->archive_path, (*elemp)->archive_path) < 0)
319 		{
320 			elem->next = *elemp;
321 			*elemp = elem;
322 			return 0;
323 		}
324 	}
325 
326 
327 	if (archive_queue_head == NULL)
328 	{
329 		archive_queue_head = elem;
330 		archive_queue_tail = elem;
331 	}
332 	else
333 	{
334 		archive_queue_tail->next = elem;
335 		archive_queue_tail = elem;
336 	}
337 
338 	return 0;
339 }
340 
341 
342 /*
343  * ipc_send_path() -
344  *
345  *	Add an archive path to the daemons archive queue. Done by calling
346  *	ipc_add_path() or sending the path to the daemon.
347  */
348 int
ipc_send_path(char * logfname)349 ipc_send_path(char *logfname)
350 {
351 	struct
352 	{
353 		long		mtype;
354 		char		mtext[MSGMAX];
355 	}			msg;
356 
357 	if (strlen(logfname) > (MSGMAX - 1))
358 	{
359 		fprintf(stderr, "path exceeds MSGMAX: %s\n", logfname);
360 		return -1;
361 	}
362 
363 	/*
364 	 * As the creator, we are also the consumer, so we simply add the file to
365 	 * the queue.
366 	 */
367 	if (ipc_creator)
368 		return ipc_add_path(logfname);
369 
370 	msg.mtype = 1;
371 	strcpy(msg.mtext, logfname);
372 	if (msgsnd(msgid, &msg, strlen(logfname) + 1, 0) < 0)
373 	{
374 		fprintf(stderr, "msgsnd() in ipc_send_path() failed: %s\n",
375 				strerror(errno));
376 		return -1;
377 	}
378 
379 	return 0;
380 }
381 
382 
383 /*
384  * ipc_recv_path()
385  *
386  *	Get the next archive file to process from the queue.
387  */
388 int
ipc_recv_path(char * buf)389 ipc_recv_path(char *buf)
390 {
391 	queue_elem *elem;
392 	int			rc;
393 	struct
394 	{
395 		long		mtype;
396 		char		mtext[MSGMAX];
397 	}			msg;
398 
399 	while (true)
400 	{
401 		if (ipc_poll(false) < 0)
402 		{
403 			ipc_finish(true);
404 			return -1;
405 		}
406 
407 		/*
408 		 * If something requested an immediate shutdown, don't report any more
409 		 * logfiles back.
410 		 */
411 		if (shutdown_immed_requested)
412 		{
413 			ipc_finish(true);
414 			return 0;
415 		}
416 
417 		/*
418 		 * If a smart shutdown was requested, try to close the queue but don't
419 		 * force it.
420 		 */
421 		if (shutdown_smart_requested)
422 		{
423 			if ((rc = ipc_finish(false)) == 0)
424 			{
425 				return 0;
426 			}
427 			if (rc < 0)
428 			{
429 				ipc_finish(true);
430 				return -1;
431 			}
432 		}
433 
434 		/*
435 		 * If we have something in the queue, return that.
436 		 */
437 		if (archive_queue_head != NULL)
438 		{
439 			elem = archive_queue_head;
440 			archive_queue_head = archive_queue_head->next;
441 			if (archive_queue_head == NULL)
442 				archive_queue_tail = NULL;
443 
444 			strcpy(buf, elem->archive_path);
445 			free(elem->archive_path);
446 			free(elem);
447 			return 1;
448 		}
449 
450 		/*
451 		 * Receive one single message blocking for it.
452 		 */
453 		rc = msgrcv(msgid, &msg, sizeof(msg), 0, IPC_NOWAIT);
454 		if (rc < 0)
455 		{
456 			if (errno == ENOMSG)
457 				return -2;
458 			if (errno == EINTR)
459 				continue;
460 
461 			fprintf(stderr, "msgrcv() failed in ipc_recv_path(): %s\n",
462 					strerror(errno));
463 			ipc_finish(true);
464 			return -1;
465 		}
466 
467 		if (msg.mtype == 2)
468 			shutdown_smart_requested = true;
469 		else if (msg.mtype == 3)
470 			shutdown_immed_requested = true;
471 		else if (msg.mtype == 4)
472 			wait_for_resume = false;
473 		else if (msg.mtype == 5)
474 			logfile_switch_requested = true;
475 		else if (ipc_add_path(msg.mtext) < 0)
476 		{
477 			ipc_finish(true);
478 			return -1;
479 		}
480 	}
481 }
482 
483 
/*
 * ipc_send_term() -
 *
 *	Ask the daemon to terminate and wait until it has destroyed the
 *	semaphore set. Message code 3 requests an immediate shutdown,
 *	code 2 a smart one.
 *
 *	A nonzero result of ipc_send_code() is passed through unchanged,
 *	otherwise the result of ipc_wait_for_destroy() is returned.
 */
int
ipc_send_term(char *archive_dir, bool immediate)
{
	int			code = 2;
	int			rc;

	if (immediate)
		code = 3;

	rc = ipc_send_code(archive_dir, code);
	if (rc == 0)
		rc = ipc_wait_for_destroy();

	return rc;
}
500 
501 
/*
 * ipc_send_logswitch() -
 *
 *	Ask the daemon to switch its logfile by sending message code 5.
 *	Returns whatever ipc_send_code() returns.
 */
int
ipc_send_logswitch(char *archive_dir)
{
	const int	logswitch_code = 5;

	return ipc_send_code(archive_dir, logswitch_code);
}
512 
513 
/*
 * ipc_send_resume() -
 *
 *	Tell the daemon to resume (after an error) by sending message
 *	code 4. Returns whatever ipc_send_code() returns.
 */
int
ipc_send_resume(char *archive_dir)
{
	const int	resume_code = 4;

	return ipc_send_code(archive_dir, resume_code);
}
524 
525 
526 /*
527  * ipc_send_code() -
528  *
529  *	Support function for ipc_send_term() and ipc_send_resume().
530  */
531 static int
ipc_send_code(char * archive_dir,int code)532 ipc_send_code(char *archive_dir, int code)
533 {
534 	struct
535 	{
536 		long		mtype;
537 		char		mtext[1];
538 	}			msg;
539 
540 	if (ipc_generate_keys(archive_dir) < 0)
541 		return -1;
542 
543 	if ((semid = semget(semkey, 0, 0)) < 0)
544 	{
545 		if (!opt_quiet)
546 			fprintf(stderr, "no logshipper daemon running\n");
547 		return 2;
548 	}
549 
550 	if (ipc_lock() < 0)
551 		return -1;
552 
553 	if ((msgid = msgget(msgkey, 0)) < 0)
554 	{
555 		fprintf(stderr, "msgget() failed in ipc_send_code(): %s\n",
556 				strerror(errno));
557 		ipc_unlock();
558 		return -1;
559 	}
560 
561 	msg.mtype = (long) code;
562 	if (msgsnd(msgid, &msg, 0, 0) < 0)
563 	{
564 		fprintf(stderr, "msgsnd() failed in ipc_send_code(): %s\n",
565 				strerror(errno));
566 		ipc_unlock();
567 		return -1;
568 	}
569 
570 	return ipc_unlock();
571 }
572 
573 
574 /*
575  * ipc_lock()
576  *
577  *	Lock the semaphore.
578  */
579 int
ipc_lock(void)580 ipc_lock(void)
581 {
582 	struct sembuf sops[1] = {{0, -1, 0}};
583 
584 	if (semop(semid, sops, 1) < 0)
585 	{
586 		fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
587 				strerror(errno));
588 		return -1;
589 	}
590 	return 0;
591 }
592 
593 
594 /*
595  * ipc_unlock()
596  *
597  *	Unlock the semaphore.
598  */
599 int
ipc_unlock(void)600 ipc_unlock(void)
601 {
602 	struct sembuf sops[1] = {{0, 1, 0}};
603 
604 	if (semop(semid, sops, 1) < 0)
605 	{
606 		fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
607 				strerror(errno));
608 		return -1;
609 	}
610 	return 0;
611 }
612 
613 
614 /*
615  * ipc_wait_for_destroy()
616  *
617  *	Wait until the semaphore set is destroyed.
618  */
619 int
ipc_wait_for_destroy(void)620 ipc_wait_for_destroy(void)
621 {
622 	struct sembuf sops[1] = {{2, -1, 0}};
623 
624 	if (semop(semid, sops, 1) < 0)
625 	{
626 		if (errno != EIDRM && errno != EINVAL)
627 		{
628 			fprintf(stderr, "semop() failed in ipc_wait_for_destroy(): %s\n",
629 					strerror(errno));
630 			return -1;
631 		}
632 	}
633 	return 0;
634 }
635 
636 
637 /*
638  * ipc_cleanup() -
639  *
640  *	Destroy the semaphore set and message queue. Use with caution, this
641  *	code does not check if there is a daemon running.
642  */
643 int
ipc_cleanup(char * archive_dir)644 ipc_cleanup(char *archive_dir)
645 {
646 	int			rc = 0;
647 
648 	if (ipc_generate_keys(archive_dir) < 0)
649 		return -1;
650 
651 	if ((semid = semget(semkey, 0, 0)) >= 0)
652 	{
653 		if (semctl(semid, 0, IPC_RMID) < 0)
654 		{
655 			fprintf(stderr, "semctl() failed in ipc_cleanup(): %s\n",
656 					strerror(errno));
657 			rc = -1;
658 		}
659 		else if (!opt_quiet)
660 			fprintf(stderr, "semaphore set removed\n");
661 		rc = 1;
662 	}
663 	else if (!opt_quiet)
664 		fprintf(stderr, "no semaphore set found\n");
665 
666 	if ((msgid = msgget(msgkey, 0)) >= 0)
667 	{
668 		if (msgctl(msgid, IPC_RMID, NULL) < 0)
669 		{
670 			fprintf(stderr, "msgctl() failed in ipc_cleanup(): %s\n",
671 					strerror(errno));
672 			rc = -1;
673 		}
674 		else if (!opt_quiet)
675 			fprintf(stderr, "message queue removed\n");
676 		if (rc >= 0)
677 			rc |= 2;
678 	}
679 	else if (!opt_quiet)
680 		fprintf(stderr, "no message queue found\n");
681 
682 	return rc;
683 }
684 
685 
686 /*
687  * ipc_set_shutdown_smart() -
688  *
689  *	Put the queue into smart shutdown mode. This will cause
690  *	ipc_recv_path() to return 0 once the queue is empty.
691  */
692 void
ipc_set_shutdown_smart(void)693 ipc_set_shutdown_smart(void)
694 {
695 	shutdown_smart_requested = true;
696 }
697 
698 
699 /*
700  * ipc_set_shutdown_immed() -
701  *
702  *	Put the queue into immediate shutdown mode. This will cause
703  *	ipc_recv_path() to return 0 at the next call.
704  */
705 void
ipc_set_shutdown_immed(void)706 ipc_set_shutdown_immed(void)
707 {
708 	shutdown_immed_requested = true;
709 }
710 
711 
712 /*
713  * ipc_generate_keys() -
714  *
715  *	Generate the semkey and msgkey used for the Sys-V IPC objects.
716  */
717 static int
ipc_generate_keys(char * archive_dir)718 ipc_generate_keys(char *archive_dir)
719 {
720 	if (ipc_archive_dir != NULL)
721 	{
722 		free(ipc_archive_dir);
723 		ipc_archive_dir = NULL;
724 	}
725 
726 	/* ----
727 	 * Compute the two IPC keys used for the semaphore set and the
728 	 * message queue.
729 	 * ----
730 	 */
731 	semkey = ftok(archive_dir, 64);
732 	if (semkey < 0)
733 	{
734 		fprintf(stderr, "ftok(%s, 64): %s\n", archive_dir, strerror(errno));
735 		return -1;
736 	}
737 	msgkey = ftok(archive_dir, 65);
738 	if (msgkey < 0)
739 	{
740 		fprintf(stderr, "ftok(%s, 65): %s\n", archive_dir, strerror(errno));
741 		return -1;
742 	}
743 
744 	ipc_archive_dir = strdup(archive_dir);
745 	if (ipc_archive_dir == NULL)
746 	{
747 		fprintf(stderr, "out of memory in ipc_generate_keys()\n");
748 		return -1;
749 	}
750 
751 	return 0;
752 }
753 
754 
755 /*
756  * ipc_sighandler()
757  *
758  *	Called on SIGINT and SIGTERM. Puts the daemon into immediate
759  *	shutdown mode by sending ourself a -T message.
760  */
761 static void
ipc_sighandler(int sig)762 ipc_sighandler(int sig)
763 {
764 	struct
765 	{
766 		long		mtype;
767 		char		mtext[1];
768 	}			msg;
769 
770 	msg.mtype = 3;
771 	msgsnd(msgid, &msg, 0, 0);
772 }
773