1 /*-------------------------------------------------------------------------
2 * ipcutil.c
3 *
4 * Semaphore and message passing support for the slony_logshipper.
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 *-------------------------------------------------------------------------
11 */
12
13
14 #ifndef WIN32
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stdarg.h>
18 #include <unistd.h>
19 #include <sys/types.h>
20 #include <sys/ipc.h>
21 #include <sys/sem.h>
22 #include <sys/msg.h>
23 #include <sys/wait.h>
24 #include <errno.h>
25 #include <signal.h>
26 #include <string.h>
27 #endif
28 #include "../slonik/types.h"
29 #include "libpq-fe.h"
30
31 #include "slony_logshipper.h"
32
33
/*
 * One node of the daemonized logshipper's list of archive files that
 * still need processing.  The list is singly linked and kept sorted by
 * archive_path (see ipc_add_path()).
 */
struct queue_elem_s
{
	char	   *archive_path;		/* heap-allocated copy of the file name */
	struct queue_elem_s *next;		/* following node, NULL for the last one */
};

typedef struct queue_elem_s queue_elem;
43
44
45 /*
46 * Static data
47 */
48 static char *ipc_archive_dir = NULL;
49 static key_t semkey;
50 static key_t msgkey;
51 static int semid;
52 static int msgid;
53 static int ipc_creator;
54 static queue_elem *archive_queue_head = NULL;
55 static queue_elem *archive_queue_tail = NULL;
56
57
/*
 * Forward declarations of the helpers private to this file
 */
static int	ipc_send_code(char *archive_dir, int code);	/* send a control code */
static int	ipc_add_path(char *path);		/* sorted insert into the queue */
static int	ipc_generate_keys(char *archive_dir);	/* derive semkey/msgkey */
static void ipc_sighandler(int sig);		/* signal -> shutdown message */
65
66
67 /*
68 * ipc_init() -
69 *
70 * Called to setup or connect to the semaphore set and message queue.
71 */
72 int
ipc_init(char * archive_dir)73 ipc_init(char *archive_dir)
74 {
75 struct sembuf sops[2];
76
77 if (ipc_generate_keys(archive_dir) < 0)
78 return -1;
79
80 /*
81 * We eventually have to start over again in case the existing daemon
82 * destroys the semaphore set after we attached and before we can lock it.
83 */
84 while (true)
85 {
86 /*
87 * Create or attach to the semaphore set
88 */
89 semid = semget(semkey, 3, 0700 | IPC_CREAT);
90 if (semid < 0)
91 {
92 fprintf(stderr, "cannot create or attache to semaphore set\n"
93 "semget(): %s\n", strerror(errno));
94 return -1;
95 }
96
97 /*
98 * We now do two initial operations with NOWAIT: wait for #1 =0 inc
99 * sem #1 +1 We never again touch semaphore #1, so this either
100 * succeeds, meaning that we created the set and hold the current
101 * lock. Or it fails with EAGAIN, meaning we attached to an existing
102 * set. Or it fails with EIDRM, meaning the set was destroyed.
103 */
104 sops[0].sem_num = 1;
105 sops[0].sem_op = 0;
106 sops[0].sem_flg = IPC_NOWAIT;
107 sops[1].sem_num = 1;
108 sops[1].sem_op = 1;
109 sops[1].sem_flg = 0;
110
111 if (semop(semid, sops, 2) < 0)
112 {
113 if (errno == EIDRM)
114 continue;
115
116 if (errno != EAGAIN)
117 {
118 fprintf(stderr, "semop failed in ipc_init(): %s",
119 strerror(errno));
120 return -1;
121 }
122
123 /*
124 * Grab the lock on semaphore #0
125 */
126 if (ipc_lock() < 0)
127 {
128 /*
129 * Since theres a gap between attaching and locking, the set
130 * could have been destroyed. In that case, start over.
131 */
132 if (errno == EIDRM)
133 continue;
134
135 fprintf(stderr, "semop() failed in ipc_init(): %s\n",
136 strerror(errno));
137 return -1;
138 }
139
140 ipc_creator = 0;
141 }
142 else
143 {
144 /*
145 * Initial semop succeeded - we are the creator of this set.
146 */
147 ipc_creator = 1;
148 signal(SIGINT, ipc_sighandler);
149 signal(SIGTERM, ipc_sighandler);
150 signal(SIGPIPE, ipc_sighandler);
151 }
152
153 break;
154 }
155
156 /*
157 * At this point we have the semaphore set and it is locked.
158 */
159 msgid = msgget(msgkey, 0700 | IPC_CREAT);
160 if (msgid < 0)
161 {
162 fprintf(stderr, "msgget() failed in ipc_init(): %s\n", strerror(errno));
163 if (ipc_creator)
164 {
165 if (semctl(semid, 0, IPC_RMID) < 0)
166 fprintf(stderr, "semctl() failed in ipc_init(): %s\n",
167 strerror(errno));
168 }
169 return -1;
170 }
171
172 return ipc_creator;
173 }
174
175
176 /*
177 * ipc_finish() -
178 *
179 * Locks and destroys the semaphore set and message queue if called
180 * in the daemonized queue runner. If force isn't given, it will
181 * only do so if the archive queue is empty after locking the set
182 * and draining the message queue.
183 */
184 int
ipc_finish(bool force)185 ipc_finish(bool force)
186 {
187 if (ipc_creator)
188 {
189 if (!force)
190 {
191 /*
192 * We are the creator of the semaphore set, so if this isn't a
193 * force operation, we lock it first, poll the message queue and
194 * check that we have an empty queue.
195 */
196 if (ipc_lock() < 0)
197 {
198 fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
199 strerror(errno));
200 return -1;
201 }
202
203 if (ipc_poll(false) < 0)
204 return -1;
205
206 if (archive_queue_head != NULL)
207 {
208 if (ipc_unlock() < 0)
209 {
210 fprintf(stderr, "semop() failed in ipc_finish(): %s\n",
211 strerror(errno));
212 return -1;
213 }
214 return 1;
215 }
216 }
217
218 /*
219 * At this point, we are either forced to stop or we have a lock and
220 * the queue is empty.
221 */
222 if (msgctl(msgid, IPC_RMID, NULL) < 0)
223 {
224 fprintf(stderr, "msgctl() failed in ipc_finish(): %s\n",
225 strerror(errno));
226 semctl(semid, 0, IPC_RMID);
227 return -1;
228 }
229
230 if (semctl(semid, 0, IPC_RMID) < 0)
231 {
232 fprintf(stderr, "semctl() failed in ipc_finish(): %s\n",
233 strerror(errno));
234 return -1;
235 }
236 }
237
238 return 0;
239 }
240
241
242 /*
243 * ipc_poll() -
244 *
245 * Check for incoming messages
246 */
247 int
ipc_poll(bool blocking)248 ipc_poll(bool blocking)
249 {
250 int rc;
251 struct
252 {
253 long mtype;
254 char mtext[MSGMAX];
255 } msg;
256
257 while (true)
258 {
259 rc = msgrcv(msgid, &msg, sizeof(msg), 0,
260 (blocking) ? 0 : IPC_NOWAIT);
261 if (rc < 0)
262 {
263 if (errno == ENOMSG)
264 return 0;
265
266 fprintf(stderr, "msgrcv() failed in ipc_poll(): %s\n",
267 strerror(errno));
268 return -1;
269 }
270
271 if (msg.mtype == 2)
272 shutdown_smart_requested = true;
273 else if (msg.mtype == 3)
274 shutdown_immed_requested = true;
275 else if (msg.mtype == 4)
276 wait_for_resume = false;
277 else if (msg.mtype == 5)
278 logfile_switch_requested = true;
279 else if (ipc_add_path(msg.mtext) < 0)
280 return -1;
281
282 if (blocking)
283 break;
284 }
285
286 return 0;
287 }
288
289
290 /*
291 * ipc_add_path() -
292 *
293 * Add an archive path to the sorted queue
294 */
295 static int
ipc_add_path(char * path)296 ipc_add_path(char *path)
297 {
298 queue_elem **elemp;
299 queue_elem *elem;
300
301 if ((elem = (queue_elem *) malloc(sizeof(queue_elem))) == NULL)
302 {
303 fprintf(stderr, "out of memory in ipc_add_path()\n");
304 return -1;
305 }
306 if ((elem->archive_path = strdup(path)) == NULL)
307 {
308 fprintf(stderr, "out of memory in ipc_add_path()\n");
309 return -1;
310 }
311 elem->next = NULL;
312
313 /*
314 * See if we have to insert it in front of something else
315 */
316 for (elemp = &archive_queue_head; *elemp != NULL; elemp = &((*elemp)->next))
317 {
318 if (strcmp(elem->archive_path, (*elemp)->archive_path) < 0)
319 {
320 elem->next = *elemp;
321 *elemp = elem;
322 return 0;
323 }
324 }
325
326
327 if (archive_queue_head == NULL)
328 {
329 archive_queue_head = elem;
330 archive_queue_tail = elem;
331 }
332 else
333 {
334 archive_queue_tail->next = elem;
335 archive_queue_tail = elem;
336 }
337
338 return 0;
339 }
340
341
342 /*
343 * ipc_send_path() -
344 *
345 * Add an archive path to the daemons archive queue. Done by calling
346 * ipc_add_path() or sending the path to the daemon.
347 */
348 int
ipc_send_path(char * logfname)349 ipc_send_path(char *logfname)
350 {
351 struct
352 {
353 long mtype;
354 char mtext[MSGMAX];
355 } msg;
356
357 if (strlen(logfname) > (MSGMAX - 1))
358 {
359 fprintf(stderr, "path exceeds MSGMAX: %s\n", logfname);
360 return -1;
361 }
362
363 /*
364 * As the creator, we are also the consumer, so we simply add the file to
365 * the queue.
366 */
367 if (ipc_creator)
368 return ipc_add_path(logfname);
369
370 msg.mtype = 1;
371 strcpy(msg.mtext, logfname);
372 if (msgsnd(msgid, &msg, strlen(logfname) + 1, 0) < 0)
373 {
374 fprintf(stderr, "msgsnd() in ipc_send_path() failed: %s\n",
375 strerror(errno));
376 return -1;
377 }
378
379 return 0;
380 }
381
382
383 /*
384 * ipc_recv_path()
385 *
386 * Get the next archive file to process from the queue.
387 */
388 int
ipc_recv_path(char * buf)389 ipc_recv_path(char *buf)
390 {
391 queue_elem *elem;
392 int rc;
393 struct
394 {
395 long mtype;
396 char mtext[MSGMAX];
397 } msg;
398
399 while (true)
400 {
401 if (ipc_poll(false) < 0)
402 {
403 ipc_finish(true);
404 return -1;
405 }
406
407 /*
408 * If something requested an immediate shutdown, don't report any more
409 * logfiles back.
410 */
411 if (shutdown_immed_requested)
412 {
413 ipc_finish(true);
414 return 0;
415 }
416
417 /*
418 * If a smart shutdown was requested, try to close the queue but don't
419 * force it.
420 */
421 if (shutdown_smart_requested)
422 {
423 if ((rc = ipc_finish(false)) == 0)
424 {
425 return 0;
426 }
427 if (rc < 0)
428 {
429 ipc_finish(true);
430 return -1;
431 }
432 }
433
434 /*
435 * If we have something in the queue, return that.
436 */
437 if (archive_queue_head != NULL)
438 {
439 elem = archive_queue_head;
440 archive_queue_head = archive_queue_head->next;
441 if (archive_queue_head == NULL)
442 archive_queue_tail = NULL;
443
444 strcpy(buf, elem->archive_path);
445 free(elem->archive_path);
446 free(elem);
447 return 1;
448 }
449
450 /*
451 * Receive one single message blocking for it.
452 */
453 rc = msgrcv(msgid, &msg, sizeof(msg), 0, IPC_NOWAIT);
454 if (rc < 0)
455 {
456 if (errno == ENOMSG)
457 return -2;
458 if (errno == EINTR)
459 continue;
460
461 fprintf(stderr, "msgrcv() failed in ipc_recv_path(): %s\n",
462 strerror(errno));
463 ipc_finish(true);
464 return -1;
465 }
466
467 if (msg.mtype == 2)
468 shutdown_smart_requested = true;
469 else if (msg.mtype == 3)
470 shutdown_immed_requested = true;
471 else if (msg.mtype == 4)
472 wait_for_resume = false;
473 else if (msg.mtype == 5)
474 logfile_switch_requested = true;
475 else if (ipc_add_path(msg.mtext) < 0)
476 {
477 ipc_finish(true);
478 return -1;
479 }
480 }
481 }
482
483
/*
 * ipc_send_term() -
 *
 *	Ask the daemon to terminate and wait until it is gone.  Message
 *	code 3 is sent for an immediate shutdown, code 2 otherwise.
 *
 *	A nonzero result of ipc_send_code() is passed through unchanged;
 *	otherwise the result of ipc_wait_for_destroy() is returned.
 */
int
ipc_send_term(char *archive_dir, bool immediate)
{
	const int	code = immediate ? 3 : 2;
	const int	rc = ipc_send_code(archive_dir, code);

	/* Only wait for the daemon if the request was actually delivered */
	return (rc == 0) ? ipc_wait_for_destroy() : rc;
}
500
501
/*
 * ipc_send_logswitch() -
 *
 *	Tell the daemon to switch its logfile (message code 5).  Returns
 *	whatever ipc_send_code() returns.
 */
int
ipc_send_logswitch(char *archive_dir)
{
	const int	logswitch_code = 5;

	return ipc_send_code(archive_dir, logswitch_code);
}
512
513
/*
 * ipc_send_resume() -
 *
 *	Tell the daemon to resume after an error (message code 4).  Returns
 *	whatever ipc_send_code() returns.
 */
int
ipc_send_resume(char *archive_dir)
{
	const int	resume_code = 4;

	return ipc_send_code(archive_dir, resume_code);
}
524
525
526 /*
527 * ipc_send_code() -
528 *
529 * Support function for ipc_send_term() and ipc_send_resume().
530 */
531 static int
ipc_send_code(char * archive_dir,int code)532 ipc_send_code(char *archive_dir, int code)
533 {
534 struct
535 {
536 long mtype;
537 char mtext[1];
538 } msg;
539
540 if (ipc_generate_keys(archive_dir) < 0)
541 return -1;
542
543 if ((semid = semget(semkey, 0, 0)) < 0)
544 {
545 if (!opt_quiet)
546 fprintf(stderr, "no logshipper daemon running\n");
547 return 2;
548 }
549
550 if (ipc_lock() < 0)
551 return -1;
552
553 if ((msgid = msgget(msgkey, 0)) < 0)
554 {
555 fprintf(stderr, "msgget() failed in ipc_send_code(): %s\n",
556 strerror(errno));
557 ipc_unlock();
558 return -1;
559 }
560
561 msg.mtype = (long) code;
562 if (msgsnd(msgid, &msg, 0, 0) < 0)
563 {
564 fprintf(stderr, "msgsnd() failed in ipc_send_code(): %s\n",
565 strerror(errno));
566 ipc_unlock();
567 return -1;
568 }
569
570 return ipc_unlock();
571 }
572
573
574 /*
575 * ipc_lock()
576 *
577 * Lock the semaphore.
578 */
579 int
ipc_lock(void)580 ipc_lock(void)
581 {
582 struct sembuf sops[1] = {{0, -1, 0}};
583
584 if (semop(semid, sops, 1) < 0)
585 {
586 fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
587 strerror(errno));
588 return -1;
589 }
590 return 0;
591 }
592
593
594 /*
595 * ipc_unlock()
596 *
597 * Unlock the semaphore.
598 */
599 int
ipc_unlock(void)600 ipc_unlock(void)
601 {
602 struct sembuf sops[1] = {{0, 1, 0}};
603
604 if (semop(semid, sops, 1) < 0)
605 {
606 fprintf(stderr, "semop() failed in ipc_lock(): %s\n",
607 strerror(errno));
608 return -1;
609 }
610 return 0;
611 }
612
613
614 /*
615 * ipc_wait_for_destroy()
616 *
617 * Wait until the semaphore set is destroyed.
618 */
619 int
ipc_wait_for_destroy(void)620 ipc_wait_for_destroy(void)
621 {
622 struct sembuf sops[1] = {{2, -1, 0}};
623
624 if (semop(semid, sops, 1) < 0)
625 {
626 if (errno != EIDRM && errno != EINVAL)
627 {
628 fprintf(stderr, "semop() failed in ipc_wait_for_destroy(): %s\n",
629 strerror(errno));
630 return -1;
631 }
632 }
633 return 0;
634 }
635
636
637 /*
638 * ipc_cleanup() -
639 *
640 * Destroy the semaphore set and message queue. Use with caution, this
641 * code does not check if there is a daemon running.
642 */
643 int
ipc_cleanup(char * archive_dir)644 ipc_cleanup(char *archive_dir)
645 {
646 int rc = 0;
647
648 if (ipc_generate_keys(archive_dir) < 0)
649 return -1;
650
651 if ((semid = semget(semkey, 0, 0)) >= 0)
652 {
653 if (semctl(semid, 0, IPC_RMID) < 0)
654 {
655 fprintf(stderr, "semctl() failed in ipc_cleanup(): %s\n",
656 strerror(errno));
657 rc = -1;
658 }
659 else if (!opt_quiet)
660 fprintf(stderr, "semaphore set removed\n");
661 rc = 1;
662 }
663 else if (!opt_quiet)
664 fprintf(stderr, "no semaphore set found\n");
665
666 if ((msgid = msgget(msgkey, 0)) >= 0)
667 {
668 if (msgctl(msgid, IPC_RMID, NULL) < 0)
669 {
670 fprintf(stderr, "msgctl() failed in ipc_cleanup(): %s\n",
671 strerror(errno));
672 rc = -1;
673 }
674 else if (!opt_quiet)
675 fprintf(stderr, "message queue removed\n");
676 if (rc >= 0)
677 rc |= 2;
678 }
679 else if (!opt_quiet)
680 fprintf(stderr, "no message queue found\n");
681
682 return rc;
683 }
684
685
686 /*
687 * ipc_set_shutdown_smart() -
688 *
689 * Put the queue into smart shutdown mode. This will cause
690 * ipc_recv_path() to return 0 once the queue is empty.
691 */
692 void
ipc_set_shutdown_smart(void)693 ipc_set_shutdown_smart(void)
694 {
695 shutdown_smart_requested = true;
696 }
697
698
699 /*
700 * ipc_set_shutdown_immed() -
701 *
702 * Put the queue into immediate shutdown mode. This will cause
703 * ipc_recv_path() to return 0 at the next call.
704 */
705 void
ipc_set_shutdown_immed(void)706 ipc_set_shutdown_immed(void)
707 {
708 shutdown_immed_requested = true;
709 }
710
711
712 /*
713 * ipc_generate_keys() -
714 *
715 * Generate the semkey and msgkey used for the Sys-V IPC objects.
716 */
717 static int
ipc_generate_keys(char * archive_dir)718 ipc_generate_keys(char *archive_dir)
719 {
720 if (ipc_archive_dir != NULL)
721 {
722 free(ipc_archive_dir);
723 ipc_archive_dir = NULL;
724 }
725
726 /* ----
727 * Compute the two IPC keys used for the semaphore set and the
728 * message queue.
729 * ----
730 */
731 semkey = ftok(archive_dir, 64);
732 if (semkey < 0)
733 {
734 fprintf(stderr, "ftok(%s, 64): %s\n", archive_dir, strerror(errno));
735 return -1;
736 }
737 msgkey = ftok(archive_dir, 65);
738 if (msgkey < 0)
739 {
740 fprintf(stderr, "ftok(%s, 65): %s\n", archive_dir, strerror(errno));
741 return -1;
742 }
743
744 ipc_archive_dir = strdup(archive_dir);
745 if (ipc_archive_dir == NULL)
746 {
747 fprintf(stderr, "out of memory in ipc_generate_keys()\n");
748 return -1;
749 }
750
751 return 0;
752 }
753
754
755 /*
756 * ipc_sighandler()
757 *
758 * Called on SIGINT and SIGTERM. Puts the daemon into immediate
759 * shutdown mode by sending ourself a -T message.
760 */
761 static void
ipc_sighandler(int sig)762 ipc_sighandler(int sig)
763 {
764 struct
765 {
766 long mtype;
767 char mtext[1];
768 } msg;
769
770 msg.mtype = 3;
771 msgsnd(msgid, &msg, 0, 0);
772 }
773