1 /*-------------------------------------------------------------------------
2 * runtime_config.c
3 *
4 * Functions maintaining the in-memory configuration information
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 *-------------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #ifndef WIN32
20 #include <unistd.h>
21 #include <sys/time.h>
22 #endif
23 #include <string.h>
24 #include <errno.h>
25 #include <signal.h>
26 #include <sys/types.h>
27
28 #include "slon.h"
29
30
31 /* ----------
32 * Global data
33 * ----------
34 */
35 pid_t slon_pid;
36
37 #ifndef WIN32
38 pthread_mutex_t slon_watchdog_lock;
39 pid_t slon_watchdog_pid;
40 pid_t slon_worker_pid;
41 #endif
42 char *rtcfg_cluster_name = NULL;
43 char *rtcfg_namespace = NULL;
44 char *rtcfg_conninfo = NULL;
45 int rtcfg_nodeid = -1;
46 int rtcfg_nodeactive = 0;
47 char *rtcfg_nodecomment = NULL;
48 char rtcfg_lastevent[64];
49
50 SlonSet *rtcfg_set_list_head = NULL;
51 SlonSet *rtcfg_set_list_tail = NULL;
52 SlonNode *rtcfg_node_list_head = NULL;
53 SlonNode *rtcfg_node_list_tail = NULL;
54
55
56 /* ----------
57 * Local data
58 * ----------
59 */
60 static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_mutex_t cfgseq_lock = PTHREAD_MUTEX_INITIALIZER;
62 static int64 cfgseq = 0;
63
64 struct to_activate
65 {
66 int no_id;
67
68 struct to_activate *prev;
69 struct to_activate *next;
70 };
71 static struct to_activate *to_activate_head = NULL;
72 static struct to_activate *to_activate_tail = NULL;
73
74
75 /* ----------
76 * Local functions
77 * ----------
78 */
79 static void rtcfg_startStopNodeThread(SlonNode * node);
80
81
82 /* ----------
83 * rtcfg_lock
84 * ----------
85 */
86 void
rtcfg_lock(void)87 rtcfg_lock(void)
88 {
89 pthread_mutex_lock(&config_lock);
90 }
91
92
93 /* ----------
94 * rtcfg_unlock
95 * ----------
96 */
97 void
rtcfg_unlock(void)98 rtcfg_unlock(void)
99 {
100 pthread_mutex_unlock(&config_lock);
101 }
102
103
104 /* ----------
105 * rtcfg_storeNode
106 * ----------
107 */
108 void
rtcfg_storeNode(int no_id,char * no_comment)109 rtcfg_storeNode(int no_id, char *no_comment)
110 {
111 SlonNode *node;
112
113 rtcfg_lock();
114
115 /*
116 * If we have that node already, just change the comment field.
117 */
118 node = rtcfg_findNode(no_id);
119 if (node)
120 {
121 slon_log(SLON_CONFIG,
122 "storeNode: no_id=%d no_comment='%s' - update node\n",
123 no_id, no_comment);
124
125 free(node->no_comment);
126 node->no_comment = strdup(no_comment);
127
128 rtcfg_unlock();
129 return;
130 }
131
132 /*
133 * Add the new node to our in-memory configuration.
134 */
135 slon_log(SLON_CONFIG,
136 "storeNode: no_id=%d no_comment='%s'\n",
137 no_id, no_comment);
138
139 node = (SlonNode *) malloc(sizeof(SlonNode));
140 if (node == NULL)
141 {
142 perror("rtcfg_storeNode: malloc()");
143 slon_retry();
144 }
145 memset(node, 0, sizeof(SlonNode));
146
147 node->no_id = no_id;
148 node->no_active = false;
149 node->no_comment = strdup(no_comment);
150 node->last_snapshot = strdup("1:1:");
151 pthread_mutex_init(&(node->message_lock), NULL);
152 pthread_cond_init(&(node->message_cond), NULL);
153
154 DLLIST_ADD_TAIL(rtcfg_node_list_head, rtcfg_node_list_tail, node);
155
156 rtcfg_unlock();
157 rtcfg_seq_bump();
158 }
159
160
161 /* ----------
162 * rtcfg_setNodeLastEvent()
163 *
164 * Set the last_event field in the node runtime structure.
165 *
166 * Returns: 0 if the event_seq is <= the known value -1 if the node is
167 * not known event_seq otherwise
168 * ----------
169 */
170 int64
rtcfg_setNodeLastEvent(int no_id,int64 event_seq)171 rtcfg_setNodeLastEvent(int no_id, int64 event_seq)
172 {
173 SlonNode *node;
174 int64 retval;
175
176 rtcfg_lock();
177 if ((node = rtcfg_findNode(no_id)) != NULL)
178 {
179 if (node->last_event < event_seq)
180 {
181 node->last_event = event_seq;
182 retval = event_seq;
183 }
184 else
185 retval = 0;
186 }
187 else
188 retval = -1;
189
190 rtcfg_unlock();
191
192 slon_log(SLON_DEBUG2,
193 "setNodeLastEvent: no_id=%d event_seq=" INT64_FORMAT "\n",
194 no_id, retval);
195
196 return retval;
197 }
198
199
200 /* ----------
201 * rtcfg_getNodeLastEvent
202 *
203 * Read the nodes last_event field
204 * ----------
205 */
206 int64
rtcfg_getNodeLastEvent(int no_id)207 rtcfg_getNodeLastEvent(int no_id)
208 {
209 SlonNode *node;
210 int64 retval;
211
212 rtcfg_lock();
213 if ((node = rtcfg_findNode(no_id)) != NULL)
214 {
215 retval = node->last_event;
216 }
217 else
218 retval = -1;
219
220 rtcfg_unlock();
221
222 return retval;
223 }
224
225
226 /* ----------
227 * rtcfg_setNodeLastSnapshot()
228 *
229 * Set the last_snapshot field in the node runtime structure.
230 * ----------
231 */
232 void
rtcfg_setNodeLastSnapshot(int no_id,char * snapshot)233 rtcfg_setNodeLastSnapshot(int no_id, char *snapshot)
234 {
235 SlonNode *node;
236
237 if (snapshot == NULL || strcmp(snapshot, "") == 0)
238 snapshot = "1:1:";
239
240 rtcfg_lock();
241 if ((node = rtcfg_findNode(no_id)) != NULL)
242 {
243 if (node->last_snapshot != NULL)
244 free(node->last_snapshot);
245
246 node->last_snapshot = strdup(snapshot);
247 }
248
249 rtcfg_unlock();
250
251 slon_log(SLON_DEBUG2,
252 "setNodeLastSnapshot: no_id=%d snapshot='%s'\n",
253 no_id, snapshot);
254 }
255
256
257 /* ----------
258 * rtcfg_getNodeLastSnapshot
259 *
260 * Read the nodes last_snapshot field
261 * ----------
262 */
263 char *
rtcfg_getNodeLastSnapshot(int no_id)264 rtcfg_getNodeLastSnapshot(int no_id)
265 {
266 SlonNode *node;
267 char *retval;
268
269 rtcfg_lock();
270 if ((node = rtcfg_findNode(no_id)) != NULL)
271 {
272 retval = node->last_snapshot;
273 }
274 else
275 retval = NULL;
276
277 rtcfg_unlock();
278
279 return retval;
280 }
281
282
283 /* ----------
284 * rtcfg_enableNode
285 * ----------
286 */
287 void
rtcfg_enableNode(int no_id)288 rtcfg_enableNode(int no_id)
289 {
290 SlonNode *node;
291
292 rtcfg_lock();
293
294 node = rtcfg_findNode(no_id);
295 if (!node)
296 {
297 rtcfg_unlock();
298
299 slon_log(SLON_FATAL,
300 "enableNode: unknown node ID %d\n", no_id);
301 slon_retry();
302 return;
303 }
304
305 /*
306 * Activate the node
307 */
308 slon_log(SLON_CONFIG,
309 "enableNode: no_id=%d\n", no_id);
310 node->no_active = true;
311
312 rtcfg_unlock();
313 rtcfg_seq_bump();
314
315 rtcfg_startStopNodeThread(node);
316 }
317
318
319 /* ----------
320 * slon_disableNode
321 * ----------
322 */
323 void
rtcfg_disableNode(int no_id)324 rtcfg_disableNode(int no_id)
325 {
326 SlonNode *node;
327
328 rtcfg_lock();
329
330 node = rtcfg_findNode(no_id);
331 if (!node)
332 {
333 rtcfg_unlock();
334
335 slon_log(SLON_FATAL,
336 "enableNode: unknown node ID %d\n", no_id);
337 slon_retry();
338 return;
339 }
340
341 /*
342 * Deactivate the node
343 */
344 slon_log(SLON_CONFIG,
345 "disableNode: no_id=%d\n", no_id);
346 node->no_active = false;
347
348 rtcfg_unlock();
349 rtcfg_seq_bump();
350
351 /*
352 * rtcfg_startStopNodeThread(node);
353 */
354 }
355
356
357 /* ----------
358 * rtcfg_findNode
359 * ----------
360 */
361 SlonNode *
rtcfg_findNode(int no_id)362 rtcfg_findNode(int no_id)
363 {
364 SlonNode *node;
365
366 for (node = rtcfg_node_list_head; node; node = node->next)
367 {
368 if (node->no_id == no_id)
369 return node;
370 }
371
372 return NULL;
373 }
374
375
376 /* ----------
377 * rtcfg_storePath
378 * ----------
379 */
380 void
rtcfg_storePath(int pa_server,char * pa_conninfo,int pa_connretry)381 rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
382 {
383 SlonNode *node;
384
385 rtcfg_lock();
386
387 node = rtcfg_findNode(pa_server);
388 if (!node)
389 {
390
391 rtcfg_unlock();
392
393 slon_log(SLON_WARN,
394 "storePath: unknown node ID %d - event pending\n", pa_server);
395 rtcfg_storeNode(pa_server, "<event pending>");
396
397 rtcfg_lock();
398 node = rtcfg_findNode(pa_server);
399 }
400
401 /*
402 * Store the (new) conninfo to the node
403 */
404 slon_log(SLON_CONFIG,
405 "storePath: pa_server=%d pa_client=%d "
406 "pa_conninfo=\"%s\" pa_connretry=%d\n",
407 pa_server, rtcfg_nodeid, pa_conninfo, pa_connretry);
408 if (node->pa_conninfo != NULL)
409 free(node->pa_conninfo);
410 node->pa_conninfo = strdup(pa_conninfo);
411 node->pa_connretry = pa_connretry;
412
413 rtcfg_unlock();
414 rtcfg_seq_bump();
415
416 /*
417 * Eventually start communicating with that node
418 */
419 rtcfg_startStopNodeThread(node);
420 }
421
422
423 /* ----------
424 * rtcfg_dropPath
425 * ----------
426 */
427 void
rtcfg_dropPath(int pa_server)428 rtcfg_dropPath(int pa_server)
429 {
430 SlonNode *node;
431 SlonListen *listen;
432
433 rtcfg_lock();
434
435 node = rtcfg_findNode(pa_server);
436 if (!node)
437 {
438
439 rtcfg_unlock();
440
441 slon_log(SLON_WARN,
442 "dropPath: unknown node ID %d\n", pa_server);
443
444 return;
445 }
446
447 /*
448 * Drop all listen information as well at this provider. Without a path we
449 * cannot listen.
450 */
451 while (node->listen_head != NULL)
452 {
453 listen = node->listen_head;
454
455 DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
456 free(listen);
457 }
458
459 /*
460 * Remove the conninfo.
461 */
462 slon_log(SLON_CONFIG,
463 "dropPath: pa_server=%d pa_client=%d\n",
464 pa_server, rtcfg_nodeid);
465 if (node->pa_conninfo != NULL)
466 free(node->pa_conninfo);
467 node->pa_conninfo = NULL;
468 node->pa_connretry = 0;
469
470 rtcfg_unlock();
471 rtcfg_seq_bump();
472
473 /*
474 * Eventually start communicating with that node
475 */
476 rtcfg_startStopNodeThread(node);
477 }
478
479
480 /* ----------
481 * rtcfg_storeListen
482 * ----------
483 */
484 void
rtcfg_reloadListen(PGconn * db)485 rtcfg_reloadListen(PGconn *db)
486 {
487 SlonDString query;
488 PGresult *res;
489 int i,
490 n;
491 SlonNode *node;
492 SlonListen *listen;
493
494 /*
495 * Clear out the entire Listen configuration
496 */
497 rtcfg_lock();
498 for (node = rtcfg_node_list_head; node; node = node->next)
499 {
500 while ((listen = node->listen_head) != NULL)
501 {
502 DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
503 free(listen);
504 }
505 }
506 rtcfg_unlock();
507 rtcfg_seq_bump();
508
509 /*
510 * Read configuration table sl_listen - the interesting pieces
511 */
512 dstring_init(&query);
513
514 slon_mkquery(&query,
515 "select li_origin, li_provider "
516 "from %s.sl_listen where li_receiver = %d",
517 rtcfg_namespace, rtcfg_nodeid);
518 res = PQexec(db, dstring_data(&query));
519 if (PQresultStatus(res) != PGRES_TUPLES_OK)
520 {
521 slon_log(SLON_FATAL, "Cannot get listen config - %s",
522 PQresultErrorMessage(res));
523 PQclear(res);
524 dstring_free(&query);
525 slon_retry();
526 }
527 for (i = 0, n = PQntuples(res); i < n; i++)
528 {
529 int li_origin = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
530 int li_provider = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
531
532 rtcfg_storeListen(li_origin, li_provider);
533 }
534 PQclear(res);
535
536 dstring_free(&query);
537
538 for (node = rtcfg_node_list_head; node; node = node->next)
539 {
540 rtcfg_startStopNodeThread(node);
541 }
542 }
543
544
545 /* ----------
546 * rtcfg_storeListen
547 * ----------
548 */
549 void
rtcfg_storeListen(int li_origin,int li_provider)550 rtcfg_storeListen(int li_origin, int li_provider)
551 {
552 SlonNode *node;
553 SlonListen *listen;
554
555 rtcfg_lock();
556
557 node = rtcfg_findNode(li_provider);
558 if (!node)
559 {
560 slon_log(SLON_FATAL,
561 "storeListen: unknown node ID %d\n", li_provider);
562 slon_retry();
563 return;
564 }
565
566 /*
567 * Check if we already listen for events from that origin at this
568 * provider.
569 */
570 for (listen = node->listen_head; listen; listen = listen->next)
571 {
572 if (listen->li_origin == li_origin)
573 {
574 slon_log(SLON_DEBUG2,
575 "storeListen: li_origin=%d li_receiver=%d "
576 "li_provider=%d - already listening\n",
577 li_origin, rtcfg_nodeid, li_provider);
578 rtcfg_unlock();
579 return;
580 }
581 }
582
583 /*
584 * Add the new event origin to the provider (this node)
585 */
586 slon_log(SLON_CONFIG,
587 "storeListen: li_origin=%d li_receiver=%d li_provider=%d\n",
588 li_origin, rtcfg_nodeid, li_provider);
589
590 listen = (SlonListen *) malloc(sizeof(SlonListen));
591 if (listen == NULL)
592 {
593 perror("rtcfg_storeListen: malloc()");
594 slon_retry();
595 }
596 memset(listen, 0, sizeof(SlonListen));
597
598 listen->li_origin = li_origin;
599 DLLIST_ADD_TAIL(node->listen_head, node->listen_tail, listen);
600
601 rtcfg_unlock();
602 rtcfg_seq_bump();
603
604 /*
605 * Eventually start communicating with that node
606 */
607 rtcfg_startStopNodeThread(node);
608 }
609
610
611 /* ----------
612 * rtcfg_dropListen
613 * ----------
614 */
615 void
rtcfg_dropListen(int li_origin,int li_provider)616 rtcfg_dropListen(int li_origin, int li_provider)
617 {
618 SlonNode *node;
619 SlonListen *listen;
620
621 rtcfg_lock();
622
623 node = rtcfg_findNode(li_provider);
624 if (!node)
625 {
626 slon_log(SLON_FATAL,
627 "dropListen: unknown node ID %d\n", li_provider);
628 slon_retry();
629 return;
630 }
631
632 /*
633 * Find that listen entry at this provider.
634 */
635 for (listen = node->listen_head; listen; listen = listen->next)
636 {
637 if (listen->li_origin == li_origin)
638 {
639 slon_log(SLON_CONFIG,
640 "dropListen: li_origin=%d li_receiver=%d "
641 "li_provider=%d\n",
642 li_origin, rtcfg_nodeid, li_provider);
643
644 DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
645 free(listen);
646
647 rtcfg_unlock();
648 rtcfg_seq_bump();
649
650 /*
651 * Eventually stop communicating with that node
652 */
653 rtcfg_startStopNodeThread(node);
654 return;
655 }
656 }
657
658 rtcfg_unlock();
659
660 /*
661 * Add the new event origin to the provider (this node)
662 */
663 slon_log(SLON_DEBUG1,
664 "storeListen: li_origin=%d li_receiver=%d li_provider=%d "
665 "- not listening\n",
666 li_origin, rtcfg_nodeid, li_provider);
667 }
668
669
670 /* ----------
671 * rtcfg_storeSet
672 * ----------
673 */
674 void
rtcfg_storeSet(int set_id,int set_origin,char * set_comment)675 rtcfg_storeSet(int set_id, int set_origin, char *set_comment)
676 {
677 SlonSet *set;
678
679 rtcfg_lock();
680
681 /*
682 * Try to update an existing set configuration
683 */
684 for (set = rtcfg_set_list_head; set; set = set->next)
685 {
686 if (set->set_id == set_id)
687 {
688 int old_origin = set->set_origin;
689
690 slon_log(SLON_CONFIG,
691 "storeSet: set_id=%d set_origin=%d "
692 "set_comment='%s' - update set\n",
693 set_id, set_origin,
694 (set_comment == NULL) ? "<unchanged>" : set_comment);
695 set->set_origin = set_origin;
696 if (set_comment != NULL)
697 {
698 free(set->set_comment);
699 set->set_comment = strdup(set_comment);
700 }
701 rtcfg_unlock();
702 rtcfg_seq_bump();
703 if (old_origin != set_origin)
704 sched_wakeup_node(old_origin);
705 sched_wakeup_node(set_origin);
706 return;
707 }
708 }
709
710 /*
711 * Add a new set to the configuration
712 */
713 slon_log(SLON_CONFIG,
714 "storeSet: set_id=%d set_origin=%d set_comment='%s'\n",
715 set_id, set_origin, set_comment);
716 set = (SlonSet *) malloc(sizeof(SlonSet));
717 if (set == NULL)
718 {
719 perror("rtcfg_storeSet: malloc()");
720 slon_retry();
721 }
722 memset(set, 0, sizeof(SlonSet));
723
724 set->set_id = set_id;
725 set->set_origin = set_origin;
726 set->set_comment = strdup((set_comment == NULL) ? "<unknown>" : set_comment);
727
728 set->sub_provider = -1;
729
730 DLLIST_ADD_TAIL(rtcfg_set_list_head, rtcfg_set_list_tail, set);
731 rtcfg_unlock();
732 rtcfg_seq_bump();
733 sched_wakeup_node(set_origin);
734 }
735
736
737 /* ----------
738 * rtcfg_dropSet
739 * ----------
740 */
741 void
rtcfg_dropSet(int set_id)742 rtcfg_dropSet(int set_id)
743 {
744 SlonSet *set;
745
746 rtcfg_lock();
747
748 /*
749 * Find the set and remove it from the config
750 */
751 for (set = rtcfg_set_list_head; set; set = set->next)
752 {
753 if (set->set_id == set_id)
754 {
755 int old_origin = set->set_origin;
756
757 slon_log(SLON_CONFIG,
758 "dropSet: set_id=%d\n", set_id);
759 DLLIST_REMOVE(rtcfg_set_list_head, rtcfg_set_list_tail, set);
760 free(set->set_comment);
761 free(set);
762
763 rtcfg_unlock();
764 rtcfg_seq_bump();
765 sched_wakeup_node(old_origin);
766 return;
767 }
768 }
769
770 slon_log(SLON_CONFIG,
771 "dropSet: set_id=%d - set not found\n", set_id);
772 rtcfg_unlock();
773 }
774
775 /* ------
776 * rtcfg_reloadSets
777 */
rtcfg_reloadSets(PGconn * db)778 void rtcfg_reloadSets(PGconn * db)
779 {
780 SlonDString query;
781 PGresult *res;
782 int i,
783 n;
784 SlonSet *set;
785
786 rtcfg_lock();
787 dstring_init(&query);
788 /*
789 * Read configuration table sl_set
790 */
791 slon_mkquery(&query,
792 "select set_id, set_origin, set_comment "
793 "from %s.sl_set",
794 rtcfg_namespace);
795 res = PQexec(db, dstring_data(&query));
796 if (PQresultStatus(res) != PGRES_TUPLES_OK)
797 {
798 slon_log(SLON_FATAL, "main: Cannot get set config - %s\n",
799 PQresultErrorMessage(res));
800 PQclear(res);
801 dstring_free(&query);
802 slon_retry();
803 }
804 for (i = 0, n = PQntuples(res); i < n; i++)
805 {
806 int set_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
807 int set_origin = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
808 for (set = rtcfg_set_list_head; set; set = set->next)
809 {
810 if (set->set_id == set_id)
811 {
812 set->set_origin=set_origin;
813 }
814 }/*for set in array*/
815 }/*for tuple*/
816 PQclear(res);
817 rtcfg_unlock();
818 dstring_free(&query);
819 }
820
821
822
823
824 /* ----------
825 * rtcfg_moveSet
826 * ----------
827 */
828 void
rtcfg_moveSet(int set_id,int old_origin,int new_origin,int sub_provider)829 rtcfg_moveSet(int set_id, int old_origin, int new_origin, int sub_provider)
830 {
831 SlonSet *set;
832
833 rtcfg_lock();
834
835 /*
836 * find the set
837 */
838 for (set = rtcfg_set_list_head; set; set = set->next)
839 {
840 if (set->set_id == set_id)
841 {
842 slon_log(SLON_CONFIG,
843 "moveSet: set_id=%d old_origin=%d "
844 "new_origin=%d\n",
845 set_id, old_origin, new_origin);
846
847 set->set_origin = new_origin;
848 set->sub_provider = sub_provider;
849 if (rtcfg_nodeid == old_origin)
850 {
851 set->sub_active = true;
852 set->sub_forward = true;
853 }
854 if (sub_provider < 0)
855 {
856 set->sub_active = false;
857 set->sub_forward = false;
858 }
859 rtcfg_unlock();
860 rtcfg_seq_bump();
861 sched_wakeup_node(old_origin);
862 sched_wakeup_node(new_origin);
863 return;
864 }
865 }
866
867 /*
868 * This cannot happen.
869 */
870 rtcfg_unlock();
871 slon_log(SLON_FATAL, "rtcfg_moveSet(): set %d not found\n", set_id);
872 slon_retry();
873 }
874
875
876 /* ----------
877 * rtcfg_storeSubscribe
878 * ----------
879 */
880 void
rtcfg_storeSubscribe(int sub_set,int sub_provider,char * sub_forward)881 rtcfg_storeSubscribe(int sub_set, int sub_provider, char *sub_forward)
882 {
883 SlonSet *set;
884 int old_provider = -1;
885
886 rtcfg_lock();
887
888 /*
889 * Find the set and store subscription information
890 */
891 for (set = rtcfg_set_list_head; set; set = set->next)
892 {
893 if (set->set_id == sub_set)
894 {
895 slon_log(SLON_CONFIG,
896 "storeSubscribe: sub_set=%d sub_provider=%d "
897 "sub_forward='%s'\n",
898 sub_set, sub_provider, sub_forward);
899 old_provider = set->sub_provider;
900 if (set->sub_provider < 0)
901 set->sub_active = 0;
902 set->sub_provider = sub_provider;
903 set->sub_forward = (*sub_forward == 't');
904 rtcfg_unlock();
905 rtcfg_seq_bump();
906
907 /*
908 * Wakeup the worker threads for the old and new provider
909 */
910 if (old_provider >= 0 && old_provider != sub_provider)
911 sched_wakeup_node(old_provider);
912 if (sub_provider >= 0)
913 sched_wakeup_node(sub_provider);
914 return;
915 }
916 }
917
918 slon_log(SLON_FATAL,
919 "storeSubscribe: set %d not found\n", sub_set);
920 rtcfg_unlock();
921 slon_retry();
922 }
923
924
925 /* ----------
926 * rtcfg_enableSubscription
927 * ----------
928 */
929 void
rtcfg_enableSubscription(int sub_set,int sub_provider,char * sub_forward)930 rtcfg_enableSubscription(int sub_set, int sub_provider, char *sub_forward)
931 {
932 SlonSet *set;
933 int old_provider = -1;
934
935 rtcfg_lock();
936
937 /*
938 * Find the set and enable its subscription
939 */
940 for (set = rtcfg_set_list_head; set; set = set->next)
941 {
942 if (set->set_id == sub_set)
943 {
944 slon_log(SLON_CONFIG,
945 "enableSubscription: sub_set=%d\n",
946 sub_set);
947 old_provider = set->sub_provider;
948 set->sub_provider = sub_provider;
949 set->sub_forward = (*sub_forward == 't');
950 if (set->sub_provider >= 0)
951 set->sub_active = 1;
952 else
953 set->sub_active = 0;
954 rtcfg_unlock();
955 rtcfg_seq_bump();
956 if (old_provider > 0 && old_provider != sub_provider)
957 sched_wakeup_node(old_provider);
958 if (sub_provider > 0)
959 sched_wakeup_node(sub_provider);
960 return;
961 }
962 }
963
964 slon_log(SLON_FATAL,
965 "enableSubscription: set %d not found\n", sub_set);
966 rtcfg_unlock();
967 slon_retry();
968 }
969
970
971 /* ----------
972 * rtcfg_unsubscribeSet
973 * ----------
974 */
975 void
rtcfg_unsubscribeSet(int sub_set)976 rtcfg_unsubscribeSet(int sub_set)
977 {
978 SlonSet *set;
979 int old_provider = -1;
980
981 rtcfg_lock();
982
983 /*
984 * Find the set and store subscription information
985 */
986 for (set = rtcfg_set_list_head; set; set = set->next)
987 {
988 if (set->set_id == sub_set)
989 {
990 slon_log(SLON_CONFIG,
991 "unsubscribeSet: sub_set=%d\n",
992 sub_set);
993 old_provider = set->sub_provider;
994 set->sub_provider = -1;
995 set->sub_active = false;
996 set->sub_forward = false;
997 rtcfg_unlock();
998 rtcfg_seq_bump();
999
1000 /*
1001 * Wakeup the worker threads for the old and new provider
1002 */
1003 if (old_provider >= 0)
1004 sched_wakeup_node(old_provider);
1005 return;
1006 }
1007 }
1008
1009 slon_log(SLON_FATAL,
1010 "unsubscribeSet: set %d not found\n", sub_set);
1011 rtcfg_unlock();
1012 slon_retry();
1013 }
1014
1015
1016 /* ----------
1017 * rtcfg_startStopNodeThread
1018 * ----------
1019 */
1020 static void
rtcfg_startStopNodeThread(SlonNode * node)1021 rtcfg_startStopNodeThread(SlonNode * node)
1022 {
1023 int need_listen = false;
1024 int need_wakeup = false;
1025
1026 rtcfg_lock();
1027
1028 if (sched_get_status() == SCHED_STATUS_OK && node->no_active)
1029 {
1030 /*
1031 * Make sure the node worker exists
1032 */
1033 switch (node->worker_status)
1034 {
1035 case SLON_TSTAT_NONE:
1036 if (pthread_create(&(node->worker_thread), NULL,
1037 remoteWorkerThread_main, (void *) node) < 0)
1038 {
1039 slon_log(SLON_FATAL,
1040 "startStopNodeThread: cannot create "
1041 "remoteWorkerThread - %s\n",
1042 strerror(errno));
1043 rtcfg_unlock();
1044 slon_retry();
1045 }
1046 node->worker_status = SLON_TSTAT_RUNNING;
1047 break;
1048
1049 case SLON_TSTAT_RUNNING:
1050 break;
1051
1052 default:
1053 printf("TODO: ********** rtcfg_startStopNodeThread: restart node worker\n");
1054 }
1055 }
1056 else
1057 {
1058 /*
1059 * Make sure there is no node worker
1060 */
1061 switch (node->worker_status)
1062 {
1063 case SLON_TSTAT_NONE:
1064 break;
1065 default:
1066 break;
1067 }
1068 }
1069
1070 /*
1071 * Determine if we need a listener thread
1072 */
1073 if (node->listen_head != NULL)
1074 need_listen = true;
1075 if (!(node->no_active))
1076 need_listen = false;
1077
1078 /*
1079 * Start or stop the remoteListenThread
1080 */
1081 if (need_listen)
1082 {
1083 /*
1084 * Node specific listen thread is required
1085 */
1086 switch (node->listen_status)
1087 {
1088 case SLON_TSTAT_NONE:
1089 node->listen_status = SLON_TSTAT_RUNNING;
1090 if (pthread_create(&(node->listen_thread), NULL,
1091 remoteListenThread_main, (void *) node) < 0)
1092 {
1093 slon_log(SLON_FATAL,
1094 "startStopNodeThread: cannot create "
1095 "remoteListenThread - %s\n",
1096 strerror(errno));
1097 rtcfg_unlock();
1098 slon_retry();
1099 }
1100 break;
1101
1102 case SLON_TSTAT_RUNNING:
1103 need_wakeup = true;
1104 break;
1105
1106 case SLON_TSTAT_SHUTDOWN:
1107 node->listen_status = SLON_TSTAT_RESTART;
1108 need_wakeup = true;
1109 break;
1110
1111 case SLON_TSTAT_RESTART:
1112 need_wakeup = true;
1113 break;
1114
1115 case SLON_TSTAT_DONE:
1116 pthread_join(node->listen_thread, NULL);
1117 node->listen_status = SLON_TSTAT_NONE;
1118 break;
1119 }
1120 }
1121 else
1122 {
1123 /*
1124 * Node specific listen thread not required
1125 */
1126 switch (node->listen_status)
1127 {
1128 case SLON_TSTAT_NONE:
1129 break;
1130
1131 case SLON_TSTAT_RUNNING:
1132 node->listen_status = SLON_TSTAT_SHUTDOWN;
1133 need_wakeup = true;
1134 break;
1135
1136 case SLON_TSTAT_SHUTDOWN:
1137 need_wakeup = true;
1138 break;
1139
1140 case SLON_TSTAT_RESTART:
1141 node->listen_status = SLON_TSTAT_SHUTDOWN;
1142 need_wakeup = true;
1143 break;
1144
1145 case SLON_TSTAT_DONE:
1146 pthread_join(node->listen_thread, NULL);
1147 node->listen_status = SLON_TSTAT_NONE;
1148 break;
1149 }
1150 }
1151
1152 rtcfg_unlock();
1153
1154 /*
1155 * If need be, wakeup all node threads to reread the configuration.
1156 */
1157 if (need_wakeup)
1158 sched_wakeup_node(node->no_id);
1159 }
1160
1161
1162 /* ----------
1163 * rtcfg_needActivate
1164 * ----------
1165 */
1166 void
rtcfg_needActivate(int no_id)1167 rtcfg_needActivate(int no_id)
1168 {
1169 struct to_activate *anode;
1170
1171 anode = (struct to_activate *) malloc(sizeof(struct to_activate));
1172 if (anode == NULL)
1173 {
1174 perror("rtcfg_needActivate: malloc()");
1175 slon_retry();
1176 }
1177 anode->no_id = no_id;
1178 DLLIST_ADD_TAIL(to_activate_head, to_activate_tail, anode);
1179 }
1180
1181
1182 /* ----------
1183 * rtcfg_doActivate
1184 * ----------
1185 */
1186 void
rtcfg_doActivate(void)1187 rtcfg_doActivate(void)
1188 {
1189 while (to_activate_head != NULL)
1190 {
1191 struct to_activate *anode = to_activate_head;
1192
1193 rtcfg_enableNode(anode->no_id);
1194 DLLIST_REMOVE(to_activate_head, to_activate_tail, anode);
1195 free(anode);
1196 }
1197 }
1198
1199
1200 /* ----------
1201 * rtcfg_joinAllRemoteThreads
1202 * ----------
1203 */
1204 void
rtcfg_joinAllRemoteThreads(void)1205 rtcfg_joinAllRemoteThreads(void)
1206 {
1207 SlonNode *node;
1208
1209 rtcfg_lock();
1210
1211 for (node = rtcfg_node_list_head; node; node = node->next)
1212 {
1213 switch (node->listen_status)
1214 {
1215 case SLON_TSTAT_NONE:
1216 break;
1217
1218 case SLON_TSTAT_RUNNING:
1219 case SLON_TSTAT_SHUTDOWN:
1220 case SLON_TSTAT_RESTART:
1221 node->listen_status = SLON_TSTAT_SHUTDOWN;
1222 /* fall through */
1223
1224 case SLON_TSTAT_DONE:
1225 rtcfg_unlock();
1226 sched_wakeup_node(node->no_id);
1227 pthread_join(node->listen_thread, NULL);
1228 rtcfg_lock();
1229 node->listen_status = SLON_TSTAT_NONE;
1230 break;
1231 }
1232
1233 switch (node->worker_status)
1234 {
1235 case SLON_TSTAT_NONE:
1236 break;
1237
1238 case SLON_TSTAT_RUNNING:
1239 case SLON_TSTAT_SHUTDOWN:
1240 case SLON_TSTAT_RESTART:
1241 node->worker_status = SLON_TSTAT_SHUTDOWN;
1242 /* fall through */
1243
1244 case SLON_TSTAT_DONE:
1245 rtcfg_unlock();
1246 remoteWorker_wakeup(node->no_id);
1247 pthread_join(node->worker_thread, NULL);
1248 rtcfg_lock();
1249 node->worker_status = SLON_TSTAT_NONE;
1250 break;
1251 }
1252
1253 }
1254
1255 rtcfg_unlock();
1256 }
1257
1258
1259 /* ----------
1260 * rtcfg_seq_bump
1261 * ----------
1262 */
1263 void
rtcfg_seq_bump(void)1264 rtcfg_seq_bump(void)
1265 {
1266 pthread_mutex_lock(&cfgseq_lock);
1267 cfgseq++;
1268 pthread_mutex_unlock(&cfgseq_lock);
1269 }
1270
1271
1272 /* ----------
1273 * rtcfg_seq_get
1274 * ----------
1275 */
1276 int64
rtcfg_seq_get(void)1277 rtcfg_seq_get(void)
1278 {
1279 int64 retval;
1280
1281 pthread_mutex_lock(&cfgseq_lock);
1282 retval = cfgseq;
1283 pthread_mutex_unlock(&cfgseq_lock);
1284
1285 return retval;
1286 }
1287
1288 /*
1289 * Local Variables:
1290 * tab-width: 4
1291 * c-indent-level: 4
1292 * c-basic-offset: 4
1293 * End:
1294 */
1295