1 /*-------------------------------------------------------------------------
2  * runtime_config.c
3  *
4  *	Functions maintaining the in-memory configuration information
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #ifndef WIN32
20 #include <unistd.h>
21 #include <sys/time.h>
22 #endif
23 #include <string.h>
24 #include <errno.h>
25 #include <signal.h>
26 #include <sys/types.h>
27 
28 #include "slon.h"
29 
30 
31 /* ----------
32  * Global data
33  * ----------
34  */
35 pid_t		slon_pid;
36 
37 #ifndef WIN32
38 pthread_mutex_t slon_watchdog_lock;
39 pid_t		slon_watchdog_pid;
40 pid_t		slon_worker_pid;
41 #endif
42 char	   *rtcfg_cluster_name = NULL;
43 char	   *rtcfg_namespace = NULL;
44 char	   *rtcfg_conninfo = NULL;
45 int			rtcfg_nodeid = -1;
46 int			rtcfg_nodeactive = 0;
47 char	   *rtcfg_nodecomment = NULL;
48 char		rtcfg_lastevent[64];
49 
50 SlonSet    *rtcfg_set_list_head = NULL;
51 SlonSet    *rtcfg_set_list_tail = NULL;
52 SlonNode   *rtcfg_node_list_head = NULL;
53 SlonNode   *rtcfg_node_list_tail = NULL;
54 
55 
56 /* ----------
57  * Local data
58  * ----------
59  */
60 static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_mutex_t cfgseq_lock = PTHREAD_MUTEX_INITIALIZER;
62 static int64 cfgseq = 0;
63 
64 struct to_activate
65 {
66 	int			no_id;
67 
68 	struct to_activate *prev;
69 	struct to_activate *next;
70 };
71 static struct to_activate *to_activate_head = NULL;
72 static struct to_activate *to_activate_tail = NULL;
73 
74 
75 /* ----------
76  * Local functions
77  * ----------
78  */
79 static void rtcfg_startStopNodeThread(SlonNode * node);
80 
81 
82 /* ----------
83  * rtcfg_lock
84  * ----------
85  */
86 void
rtcfg_lock(void)87 rtcfg_lock(void)
88 {
89 	pthread_mutex_lock(&config_lock);
90 }
91 
92 
93 /* ----------
94  * rtcfg_unlock
95  * ----------
96  */
97 void
rtcfg_unlock(void)98 rtcfg_unlock(void)
99 {
100 	pthread_mutex_unlock(&config_lock);
101 }
102 
103 
104 /* ----------
105  * rtcfg_storeNode
106  * ----------
107  */
108 void
rtcfg_storeNode(int no_id,char * no_comment)109 rtcfg_storeNode(int no_id, char *no_comment)
110 {
111 	SlonNode   *node;
112 
113 	rtcfg_lock();
114 
115 	/*
116 	 * If we have that node already, just change the comment field.
117 	 */
118 	node = rtcfg_findNode(no_id);
119 	if (node)
120 	{
121 		slon_log(SLON_CONFIG,
122 				 "storeNode: no_id=%d no_comment='%s' - update node\n",
123 				 no_id, no_comment);
124 
125 		free(node->no_comment);
126 		node->no_comment = strdup(no_comment);
127 
128 		rtcfg_unlock();
129 		return;
130 	}
131 
132 	/*
133 	 * Add the new node to our in-memory configuration.
134 	 */
135 	slon_log(SLON_CONFIG,
136 			 "storeNode: no_id=%d no_comment='%s'\n",
137 			 no_id, no_comment);
138 
139 	node = (SlonNode *) malloc(sizeof(SlonNode));
140 	if (node == NULL)
141 	{
142 		perror("rtcfg_storeNode: malloc()");
143 		slon_retry();
144 	}
145 	memset(node, 0, sizeof(SlonNode));
146 
147 	node->no_id = no_id;
148 	node->no_active = false;
149 	node->no_comment = strdup(no_comment);
150 	node->last_snapshot = strdup("1:1:");
151 	pthread_mutex_init(&(node->message_lock), NULL);
152 	pthread_cond_init(&(node->message_cond), NULL);
153 
154 	DLLIST_ADD_TAIL(rtcfg_node_list_head, rtcfg_node_list_tail, node);
155 
156 	rtcfg_unlock();
157 	rtcfg_seq_bump();
158 }
159 
160 
161 /* ----------
162  * rtcfg_setNodeLastEvent()
163  *
164  * Set the last_event field in the node runtime structure.
165  *
166  * Returns: 0 if the event_seq is <= the known value -1 if the node is
167  * not known event_seq otherwise
168  * ----------
169  */
170 int64
rtcfg_setNodeLastEvent(int no_id,int64 event_seq)171 rtcfg_setNodeLastEvent(int no_id, int64 event_seq)
172 {
173 	SlonNode   *node;
174 	int64		retval;
175 
176 	rtcfg_lock();
177 	if ((node = rtcfg_findNode(no_id)) != NULL)
178 	{
179 		if (node->last_event < event_seq)
180 		{
181 			node->last_event = event_seq;
182 			retval = event_seq;
183 		}
184 		else
185 			retval = 0;
186 	}
187 	else
188 		retval = -1;
189 
190 	rtcfg_unlock();
191 
192 	slon_log(SLON_DEBUG2,
193 			 "setNodeLastEvent: no_id=%d event_seq=" INT64_FORMAT "\n",
194 			 no_id, retval);
195 
196 	return retval;
197 }
198 
199 
200 /* ----------
201  * rtcfg_getNodeLastEvent
202  *
203  * Read the nodes last_event field
204  * ----------
205  */
206 int64
rtcfg_getNodeLastEvent(int no_id)207 rtcfg_getNodeLastEvent(int no_id)
208 {
209 	SlonNode   *node;
210 	int64		retval;
211 
212 	rtcfg_lock();
213 	if ((node = rtcfg_findNode(no_id)) != NULL)
214 	{
215 		retval = node->last_event;
216 	}
217 	else
218 		retval = -1;
219 
220 	rtcfg_unlock();
221 
222 	return retval;
223 }
224 
225 
226 /* ----------
227  * rtcfg_setNodeLastSnapshot()
228  *
229  * Set the last_snapshot field in the node runtime structure.
230  * ----------
231  */
232 void
rtcfg_setNodeLastSnapshot(int no_id,char * snapshot)233 rtcfg_setNodeLastSnapshot(int no_id, char *snapshot)
234 {
235 	SlonNode   *node;
236 
237 	if (snapshot == NULL || strcmp(snapshot, "") == 0)
238 		snapshot = "1:1:";
239 
240 	rtcfg_lock();
241 	if ((node = rtcfg_findNode(no_id)) != NULL)
242 	{
243 		if (node->last_snapshot != NULL)
244 			free(node->last_snapshot);
245 
246 		node->last_snapshot = strdup(snapshot);
247 	}
248 
249 	rtcfg_unlock();
250 
251 	slon_log(SLON_DEBUG2,
252 			 "setNodeLastSnapshot: no_id=%d snapshot='%s'\n",
253 			 no_id, snapshot);
254 }
255 
256 
257 /* ----------
258  * rtcfg_getNodeLastSnapshot
259  *
260  * Read the nodes last_snapshot field
261  * ----------
262  */
263 char *
rtcfg_getNodeLastSnapshot(int no_id)264 rtcfg_getNodeLastSnapshot(int no_id)
265 {
266 	SlonNode   *node;
267 	char	   *retval;
268 
269 	rtcfg_lock();
270 	if ((node = rtcfg_findNode(no_id)) != NULL)
271 	{
272 		retval = node->last_snapshot;
273 	}
274 	else
275 		retval = NULL;
276 
277 	rtcfg_unlock();
278 
279 	return retval;
280 }
281 
282 
283 /* ----------
284  * rtcfg_enableNode
285  * ----------
286  */
287 void
rtcfg_enableNode(int no_id)288 rtcfg_enableNode(int no_id)
289 {
290 	SlonNode   *node;
291 
292 	rtcfg_lock();
293 
294 	node = rtcfg_findNode(no_id);
295 	if (!node)
296 	{
297 		rtcfg_unlock();
298 
299 		slon_log(SLON_FATAL,
300 				 "enableNode: unknown node ID %d\n", no_id);
301 		slon_retry();
302 		return;
303 	}
304 
305 	/*
306 	 * Activate the node
307 	 */
308 	slon_log(SLON_CONFIG,
309 			 "enableNode: no_id=%d\n", no_id);
310 	node->no_active = true;
311 
312 	rtcfg_unlock();
313 	rtcfg_seq_bump();
314 
315 	rtcfg_startStopNodeThread(node);
316 }
317 
318 
319 /* ----------
320  * slon_disableNode
321  * ----------
322  */
323 void
rtcfg_disableNode(int no_id)324 rtcfg_disableNode(int no_id)
325 {
326 	SlonNode   *node;
327 
328 	rtcfg_lock();
329 
330 	node = rtcfg_findNode(no_id);
331 	if (!node)
332 	{
333 		rtcfg_unlock();
334 
335 		slon_log(SLON_FATAL,
336 				 "enableNode: unknown node ID %d\n", no_id);
337 		slon_retry();
338 		return;
339 	}
340 
341 	/*
342 	 * Deactivate the node
343 	 */
344 	slon_log(SLON_CONFIG,
345 			 "disableNode: no_id=%d\n", no_id);
346 	node->no_active = false;
347 
348 	rtcfg_unlock();
349 	rtcfg_seq_bump();
350 
351 	/*
352 	 * rtcfg_startStopNodeThread(node);
353 	 */
354 }
355 
356 
357 /* ----------
358  * rtcfg_findNode
359  * ----------
360  */
361 SlonNode *
rtcfg_findNode(int no_id)362 rtcfg_findNode(int no_id)
363 {
364 	SlonNode   *node;
365 
366 	for (node = rtcfg_node_list_head; node; node = node->next)
367 	{
368 		if (node->no_id == no_id)
369 			return node;
370 	}
371 
372 	return NULL;
373 }
374 
375 
376 /* ----------
377  * rtcfg_storePath
378  * ----------
379  */
380 void
rtcfg_storePath(int pa_server,char * pa_conninfo,int pa_connretry)381 rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
382 {
383 	SlonNode   *node;
384 
385 	rtcfg_lock();
386 
387 	node = rtcfg_findNode(pa_server);
388 	if (!node)
389 	{
390 
391 		rtcfg_unlock();
392 
393 		slon_log(SLON_WARN,
394 			   "storePath: unknown node ID %d - event pending\n", pa_server);
395 		rtcfg_storeNode(pa_server, "<event pending>");
396 
397 		rtcfg_lock();
398 		node = rtcfg_findNode(pa_server);
399 	}
400 
401 	/*
402 	 * Store the (new) conninfo to the node
403 	 */
404 	slon_log(SLON_CONFIG,
405 			 "storePath: pa_server=%d pa_client=%d "
406 			 "pa_conninfo=\"%s\" pa_connretry=%d\n",
407 			 pa_server, rtcfg_nodeid, pa_conninfo, pa_connretry);
408 	if (node->pa_conninfo != NULL)
409 		free(node->pa_conninfo);
410 	node->pa_conninfo = strdup(pa_conninfo);
411 	node->pa_connretry = pa_connretry;
412 
413 	rtcfg_unlock();
414 	rtcfg_seq_bump();
415 
416 	/*
417 	 * Eventually start communicating with that node
418 	 */
419 	rtcfg_startStopNodeThread(node);
420 }
421 
422 
423 /* ----------
424  * rtcfg_dropPath
425  * ----------
426  */
427 void
rtcfg_dropPath(int pa_server)428 rtcfg_dropPath(int pa_server)
429 {
430 	SlonNode   *node;
431 	SlonListen *listen;
432 
433 	rtcfg_lock();
434 
435 	node = rtcfg_findNode(pa_server);
436 	if (!node)
437 	{
438 
439 		rtcfg_unlock();
440 
441 		slon_log(SLON_WARN,
442 				 "dropPath: unknown node ID %d\n", pa_server);
443 
444 		return;
445 	}
446 
447 	/*
448 	 * Drop all listen information as well at this provider. Without a path we
449 	 * cannot listen.
450 	 */
451 	while (node->listen_head != NULL)
452 	{
453 		listen = node->listen_head;
454 
455 		DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
456 		free(listen);
457 	}
458 
459 	/*
460 	 * Remove the conninfo.
461 	 */
462 	slon_log(SLON_CONFIG,
463 			 "dropPath: pa_server=%d pa_client=%d\n",
464 			 pa_server, rtcfg_nodeid);
465 	if (node->pa_conninfo != NULL)
466 		free(node->pa_conninfo);
467 	node->pa_conninfo = NULL;
468 	node->pa_connretry = 0;
469 
470 	rtcfg_unlock();
471 	rtcfg_seq_bump();
472 
473 	/*
474 	 * Eventually start communicating with that node
475 	 */
476 	rtcfg_startStopNodeThread(node);
477 }
478 
479 
480 /* ----------
481  * rtcfg_storeListen
482  * ----------
483  */
484 void
rtcfg_reloadListen(PGconn * db)485 rtcfg_reloadListen(PGconn *db)
486 {
487 	SlonDString query;
488 	PGresult   *res;
489 	int			i,
490 				n;
491 	SlonNode   *node;
492 	SlonListen *listen;
493 
494 	/*
495 	 * Clear out the entire Listen configuration
496 	 */
497 	rtcfg_lock();
498 	for (node = rtcfg_node_list_head; node; node = node->next)
499 	{
500 		while ((listen = node->listen_head) != NULL)
501 		{
502 			DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
503 			free(listen);
504 		}
505 	}
506 	rtcfg_unlock();
507 	rtcfg_seq_bump();
508 
509 	/*
510 	 * Read configuration table sl_listen - the interesting pieces
511 	 */
512 	dstring_init(&query);
513 
514 	slon_mkquery(&query,
515 				 "select li_origin, li_provider "
516 				 "from %s.sl_listen where li_receiver = %d",
517 				 rtcfg_namespace, rtcfg_nodeid);
518 	res = PQexec(db, dstring_data(&query));
519 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
520 	{
521 		slon_log(SLON_FATAL, "Cannot get listen config - %s",
522 				 PQresultErrorMessage(res));
523 		PQclear(res);
524 		dstring_free(&query);
525 		slon_retry();
526 	}
527 	for (i = 0, n = PQntuples(res); i < n; i++)
528 	{
529 		int			li_origin = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
530 		int			li_provider = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
531 
532 		rtcfg_storeListen(li_origin, li_provider);
533 	}
534 	PQclear(res);
535 
536 	dstring_free(&query);
537 
538 	for (node = rtcfg_node_list_head; node; node = node->next)
539 	{
540 		rtcfg_startStopNodeThread(node);
541 	}
542 }
543 
544 
545 /* ----------
546  * rtcfg_storeListen
547  * ----------
548  */
549 void
rtcfg_storeListen(int li_origin,int li_provider)550 rtcfg_storeListen(int li_origin, int li_provider)
551 {
552 	SlonNode   *node;
553 	SlonListen *listen;
554 
555 	rtcfg_lock();
556 
557 	node = rtcfg_findNode(li_provider);
558 	if (!node)
559 	{
560 		slon_log(SLON_FATAL,
561 				 "storeListen: unknown node ID %d\n", li_provider);
562 		slon_retry();
563 		return;
564 	}
565 
566 	/*
567 	 * Check if we already listen for events from that origin at this
568 	 * provider.
569 	 */
570 	for (listen = node->listen_head; listen; listen = listen->next)
571 	{
572 		if (listen->li_origin == li_origin)
573 		{
574 			slon_log(SLON_DEBUG2,
575 					 "storeListen: li_origin=%d li_receiver=%d "
576 					 "li_provider=%d - already listening\n",
577 					 li_origin, rtcfg_nodeid, li_provider);
578 			rtcfg_unlock();
579 			return;
580 		}
581 	}
582 
583 	/*
584 	 * Add the new event origin to the provider (this node)
585 	 */
586 	slon_log(SLON_CONFIG,
587 			 "storeListen: li_origin=%d li_receiver=%d li_provider=%d\n",
588 			 li_origin, rtcfg_nodeid, li_provider);
589 
590 	listen = (SlonListen *) malloc(sizeof(SlonListen));
591 	if (listen == NULL)
592 	{
593 		perror("rtcfg_storeListen: malloc()");
594 		slon_retry();
595 	}
596 	memset(listen, 0, sizeof(SlonListen));
597 
598 	listen->li_origin = li_origin;
599 	DLLIST_ADD_TAIL(node->listen_head, node->listen_tail, listen);
600 
601 	rtcfg_unlock();
602 	rtcfg_seq_bump();
603 
604 	/*
605 	 * Eventually start communicating with that node
606 	 */
607 	rtcfg_startStopNodeThread(node);
608 }
609 
610 
611 /* ----------
612  * rtcfg_dropListen
613  * ----------
614  */
615 void
rtcfg_dropListen(int li_origin,int li_provider)616 rtcfg_dropListen(int li_origin, int li_provider)
617 {
618 	SlonNode   *node;
619 	SlonListen *listen;
620 
621 	rtcfg_lock();
622 
623 	node = rtcfg_findNode(li_provider);
624 	if (!node)
625 	{
626 		slon_log(SLON_FATAL,
627 				 "dropListen: unknown node ID %d\n", li_provider);
628 		slon_retry();
629 		return;
630 	}
631 
632 	/*
633 	 * Find that listen entry at this provider.
634 	 */
635 	for (listen = node->listen_head; listen; listen = listen->next)
636 	{
637 		if (listen->li_origin == li_origin)
638 		{
639 			slon_log(SLON_CONFIG,
640 					 "dropListen: li_origin=%d li_receiver=%d "
641 					 "li_provider=%d\n",
642 					 li_origin, rtcfg_nodeid, li_provider);
643 
644 			DLLIST_REMOVE(node->listen_head, node->listen_tail, listen);
645 			free(listen);
646 
647 			rtcfg_unlock();
648 			rtcfg_seq_bump();
649 
650 			/*
651 			 * Eventually stop communicating with that node
652 			 */
653 			rtcfg_startStopNodeThread(node);
654 			return;
655 		}
656 	}
657 
658 	rtcfg_unlock();
659 
660 	/*
661 	 * Add the new event origin to the provider (this node)
662 	 */
663 	slon_log(SLON_DEBUG1,
664 			 "storeListen: li_origin=%d li_receiver=%d li_provider=%d "
665 			 "- not listening\n",
666 			 li_origin, rtcfg_nodeid, li_provider);
667 }
668 
669 
670 /* ----------
671  * rtcfg_storeSet
672  * ----------
673  */
674 void
rtcfg_storeSet(int set_id,int set_origin,char * set_comment)675 rtcfg_storeSet(int set_id, int set_origin, char *set_comment)
676 {
677 	SlonSet    *set;
678 
679 	rtcfg_lock();
680 
681 	/*
682 	 * Try to update an existing set configuration
683 	 */
684 	for (set = rtcfg_set_list_head; set; set = set->next)
685 	{
686 		if (set->set_id == set_id)
687 		{
688 			int			old_origin = set->set_origin;
689 
690 			slon_log(SLON_CONFIG,
691 					 "storeSet: set_id=%d set_origin=%d "
692 					 "set_comment='%s' - update set\n",
693 					 set_id, set_origin,
694 					 (set_comment == NULL) ? "<unchanged>" : set_comment);
695 			set->set_origin = set_origin;
696 			if (set_comment != NULL)
697 			{
698 				free(set->set_comment);
699 				set->set_comment = strdup(set_comment);
700 			}
701 			rtcfg_unlock();
702 			rtcfg_seq_bump();
703 			if (old_origin != set_origin)
704 				sched_wakeup_node(old_origin);
705 			sched_wakeup_node(set_origin);
706 			return;
707 		}
708 	}
709 
710 	/*
711 	 * Add a new set to the configuration
712 	 */
713 	slon_log(SLON_CONFIG,
714 			 "storeSet: set_id=%d set_origin=%d set_comment='%s'\n",
715 			 set_id, set_origin, set_comment);
716 	set = (SlonSet *) malloc(sizeof(SlonSet));
717 	if (set == NULL)
718 	{
719 		perror("rtcfg_storeSet: malloc()");
720 		slon_retry();
721 	}
722 	memset(set, 0, sizeof(SlonSet));
723 
724 	set->set_id = set_id;
725 	set->set_origin = set_origin;
726 	set->set_comment = strdup((set_comment == NULL) ? "<unknown>" : set_comment);
727 
728 	set->sub_provider = -1;
729 
730 	DLLIST_ADD_TAIL(rtcfg_set_list_head, rtcfg_set_list_tail, set);
731 	rtcfg_unlock();
732 	rtcfg_seq_bump();
733 	sched_wakeup_node(set_origin);
734 }
735 
736 
737 /* ----------
738  * rtcfg_dropSet
739  * ----------
740  */
741 void
rtcfg_dropSet(int set_id)742 rtcfg_dropSet(int set_id)
743 {
744 	SlonSet    *set;
745 
746 	rtcfg_lock();
747 
748 	/*
749 	 * Find the set and remove it from the config
750 	 */
751 	for (set = rtcfg_set_list_head; set; set = set->next)
752 	{
753 		if (set->set_id == set_id)
754 		{
755 			int			old_origin = set->set_origin;
756 
757 			slon_log(SLON_CONFIG,
758 					 "dropSet: set_id=%d\n", set_id);
759 			DLLIST_REMOVE(rtcfg_set_list_head, rtcfg_set_list_tail, set);
760 			free(set->set_comment);
761 			free(set);
762 
763 			rtcfg_unlock();
764 			rtcfg_seq_bump();
765 			sched_wakeup_node(old_origin);
766 			return;
767 		}
768 	}
769 
770 	slon_log(SLON_CONFIG,
771 			 "dropSet: set_id=%d - set not found\n", set_id);
772 	rtcfg_unlock();
773 }
774 
775 /* ------
776  * rtcfg_reloadSets
777  */
rtcfg_reloadSets(PGconn * db)778 void rtcfg_reloadSets(PGconn * db)
779 {
780 	SlonDString query;
781 	PGresult   *res;
782 	int			i,
783 				n;
784 	SlonSet    *set;
785 
786 	rtcfg_lock();
787 	dstring_init(&query);
788 	/*
789 	 * Read configuration table sl_set
790 	 */
791 	slon_mkquery(&query,
792 				 "select set_id, set_origin, set_comment "
793 				 "from %s.sl_set",
794 				 rtcfg_namespace);
795 	res = PQexec(db, dstring_data(&query));
796 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
797 	{
798 		slon_log(SLON_FATAL, "main: Cannot get set config - %s\n",
799 				 PQresultErrorMessage(res));
800 		PQclear(res);
801 		dstring_free(&query);
802 		slon_retry();
803 	}
804 	for (i = 0, n = PQntuples(res); i < n; i++)
805 	{
806 		int			set_id = (int) strtol(PQgetvalue(res, i, 0), NULL, 10);
807 		int			set_origin = (int) strtol(PQgetvalue(res, i, 1), NULL, 10);
808 		for (set = rtcfg_set_list_head; set; set = set->next)
809 		{
810 			if (set->set_id == set_id)
811 			{
812 				set->set_origin=set_origin;
813 			}
814 		}/*for set in array*/
815 	}/*for tuple*/
816 	PQclear(res);
817 	rtcfg_unlock();
818 	dstring_free(&query);
819 }
820 
821 
822 
823 
824 /* ----------
825  * rtcfg_moveSet
826  * ----------
827  */
828 void
rtcfg_moveSet(int set_id,int old_origin,int new_origin,int sub_provider)829 rtcfg_moveSet(int set_id, int old_origin, int new_origin, int sub_provider)
830 {
831 	SlonSet    *set;
832 
833 	rtcfg_lock();
834 
835 	/*
836 	 * find the set
837 	 */
838 	for (set = rtcfg_set_list_head; set; set = set->next)
839 	{
840 		if (set->set_id == set_id)
841 		{
842 			slon_log(SLON_CONFIG,
843 					 "moveSet: set_id=%d old_origin=%d "
844 					 "new_origin=%d\n",
845 					 set_id, old_origin, new_origin);
846 
847 			set->set_origin = new_origin;
848 			set->sub_provider = sub_provider;
849 			if (rtcfg_nodeid == old_origin)
850 			{
851 				set->sub_active = true;
852 				set->sub_forward = true;
853 			}
854 			if (sub_provider < 0)
855 			{
856 				set->sub_active = false;
857 				set->sub_forward = false;
858 			}
859 			rtcfg_unlock();
860 			rtcfg_seq_bump();
861 			sched_wakeup_node(old_origin);
862 			sched_wakeup_node(new_origin);
863 			return;
864 		}
865 	}
866 
867 	/*
868 	 * This cannot happen.
869 	 */
870 	rtcfg_unlock();
871 	slon_log(SLON_FATAL, "rtcfg_moveSet(): set %d not found\n", set_id);
872 	slon_retry();
873 }
874 
875 
876 /* ----------
877  * rtcfg_storeSubscribe
878  * ----------
879  */
880 void
rtcfg_storeSubscribe(int sub_set,int sub_provider,char * sub_forward)881 rtcfg_storeSubscribe(int sub_set, int sub_provider, char *sub_forward)
882 {
883 	SlonSet    *set;
884 	int			old_provider = -1;
885 
886 	rtcfg_lock();
887 
888 	/*
889 	 * Find the set and store subscription information
890 	 */
891 	for (set = rtcfg_set_list_head; set; set = set->next)
892 	{
893 		if (set->set_id == sub_set)
894 		{
895 			slon_log(SLON_CONFIG,
896 					 "storeSubscribe: sub_set=%d sub_provider=%d "
897 					 "sub_forward='%s'\n",
898 					 sub_set, sub_provider, sub_forward);
899 			old_provider = set->sub_provider;
900 			if (set->sub_provider < 0)
901 				set->sub_active = 0;
902 			set->sub_provider = sub_provider;
903 			set->sub_forward = (*sub_forward == 't');
904 			rtcfg_unlock();
905 			rtcfg_seq_bump();
906 
907 			/*
908 			 * Wakeup the worker threads for the old and new provider
909 			 */
910 			if (old_provider >= 0 && old_provider != sub_provider)
911 				sched_wakeup_node(old_provider);
912 			if (sub_provider >= 0)
913 				sched_wakeup_node(sub_provider);
914 			return;
915 		}
916 	}
917 
918 	slon_log(SLON_FATAL,
919 			 "storeSubscribe: set %d not found\n", sub_set);
920 	rtcfg_unlock();
921 	slon_retry();
922 }
923 
924 
925 /* ----------
926  * rtcfg_enableSubscription
927  * ----------
928  */
929 void
rtcfg_enableSubscription(int sub_set,int sub_provider,char * sub_forward)930 rtcfg_enableSubscription(int sub_set, int sub_provider, char *sub_forward)
931 {
932 	SlonSet    *set;
933 	int			old_provider = -1;
934 
935 	rtcfg_lock();
936 
937 	/*
938 	 * Find the set and enable its subscription
939 	 */
940 	for (set = rtcfg_set_list_head; set; set = set->next)
941 	{
942 		if (set->set_id == sub_set)
943 		{
944 			slon_log(SLON_CONFIG,
945 					 "enableSubscription: sub_set=%d\n",
946 					 sub_set);
947 			old_provider = set->sub_provider;
948 			set->sub_provider = sub_provider;
949 			set->sub_forward = (*sub_forward == 't');
950 			if (set->sub_provider >= 0)
951 				set->sub_active = 1;
952 			else
953 				set->sub_active = 0;
954 			rtcfg_unlock();
955 			rtcfg_seq_bump();
956 			if (old_provider > 0 && old_provider != sub_provider)
957 				sched_wakeup_node(old_provider);
958 			if (sub_provider > 0)
959 				sched_wakeup_node(sub_provider);
960 			return;
961 		}
962 	}
963 
964 	slon_log(SLON_FATAL,
965 			 "enableSubscription: set %d not found\n", sub_set);
966 	rtcfg_unlock();
967 	slon_retry();
968 }
969 
970 
971 /* ----------
972  * rtcfg_unsubscribeSet
973  * ----------
974  */
975 void
rtcfg_unsubscribeSet(int sub_set)976 rtcfg_unsubscribeSet(int sub_set)
977 {
978 	SlonSet    *set;
979 	int			old_provider = -1;
980 
981 	rtcfg_lock();
982 
983 	/*
984 	 * Find the set and store subscription information
985 	 */
986 	for (set = rtcfg_set_list_head; set; set = set->next)
987 	{
988 		if (set->set_id == sub_set)
989 		{
990 			slon_log(SLON_CONFIG,
991 					 "unsubscribeSet: sub_set=%d\n",
992 					 sub_set);
993 			old_provider = set->sub_provider;
994 			set->sub_provider = -1;
995 			set->sub_active = false;
996 			set->sub_forward = false;
997 			rtcfg_unlock();
998 			rtcfg_seq_bump();
999 
1000 			/*
1001 			 * Wakeup the worker threads for the old and new provider
1002 			 */
1003 			if (old_provider >= 0)
1004 				sched_wakeup_node(old_provider);
1005 			return;
1006 		}
1007 	}
1008 
1009 	slon_log(SLON_FATAL,
1010 			 "unsubscribeSet: set %d not found\n", sub_set);
1011 	rtcfg_unlock();
1012 	slon_retry();
1013 }
1014 
1015 
1016 /* ----------
1017  * rtcfg_startStopNodeThread
1018  * ----------
1019  */
1020 static void
rtcfg_startStopNodeThread(SlonNode * node)1021 rtcfg_startStopNodeThread(SlonNode * node)
1022 {
1023 	int			need_listen = false;
1024 	int			need_wakeup = false;
1025 
1026 	rtcfg_lock();
1027 
1028 	if (sched_get_status() == SCHED_STATUS_OK && node->no_active)
1029 	{
1030 		/*
1031 		 * Make sure the node worker exists
1032 		 */
1033 		switch (node->worker_status)
1034 		{
1035 			case SLON_TSTAT_NONE:
1036 				if (pthread_create(&(node->worker_thread), NULL,
1037 								 remoteWorkerThread_main, (void *) node) < 0)
1038 				{
1039 					slon_log(SLON_FATAL,
1040 							 "startStopNodeThread: cannot create "
1041 							 "remoteWorkerThread - %s\n",
1042 							 strerror(errno));
1043 					rtcfg_unlock();
1044 					slon_retry();
1045 				}
1046 				node->worker_status = SLON_TSTAT_RUNNING;
1047 				break;
1048 
1049 			case SLON_TSTAT_RUNNING:
1050 				break;
1051 
1052 			default:
1053 				printf("TODO: ********** rtcfg_startStopNodeThread: restart node worker\n");
1054 		}
1055 	}
1056 	else
1057 	{
1058 		/*
1059 		 * Make sure there is no node worker
1060 		 */
1061 		switch (node->worker_status)
1062 		{
1063 			case SLON_TSTAT_NONE:
1064 				break;
1065 			default:
1066 				break;
1067 		}
1068 	}
1069 
1070 	/*
1071 	 * Determine if we need a listener thread
1072 	 */
1073 	if (node->listen_head != NULL)
1074 		need_listen = true;
1075 	if (!(node->no_active))
1076 		need_listen = false;
1077 
1078 	/*
1079 	 * Start or stop the remoteListenThread
1080 	 */
1081 	if (need_listen)
1082 	{
1083 		/*
1084 		 * Node specific listen thread is required
1085 		 */
1086 		switch (node->listen_status)
1087 		{
1088 			case SLON_TSTAT_NONE:
1089 				node->listen_status = SLON_TSTAT_RUNNING;
1090 				if (pthread_create(&(node->listen_thread), NULL,
1091 								 remoteListenThread_main, (void *) node) < 0)
1092 				{
1093 					slon_log(SLON_FATAL,
1094 							 "startStopNodeThread: cannot create "
1095 							 "remoteListenThread - %s\n",
1096 							 strerror(errno));
1097 					rtcfg_unlock();
1098 					slon_retry();
1099 				}
1100 				break;
1101 
1102 			case SLON_TSTAT_RUNNING:
1103 				need_wakeup = true;
1104 				break;
1105 
1106 			case SLON_TSTAT_SHUTDOWN:
1107 				node->listen_status = SLON_TSTAT_RESTART;
1108 				need_wakeup = true;
1109 				break;
1110 
1111 			case SLON_TSTAT_RESTART:
1112 				need_wakeup = true;
1113 				break;
1114 
1115 			case SLON_TSTAT_DONE:
1116 				pthread_join(node->listen_thread, NULL);
1117 				node->listen_status = SLON_TSTAT_NONE;
1118 				break;
1119 		}
1120 	}
1121 	else
1122 	{
1123 		/*
1124 		 * Node specific listen thread not required
1125 		 */
1126 		switch (node->listen_status)
1127 		{
1128 			case SLON_TSTAT_NONE:
1129 				break;
1130 
1131 			case SLON_TSTAT_RUNNING:
1132 				node->listen_status = SLON_TSTAT_SHUTDOWN;
1133 				need_wakeup = true;
1134 				break;
1135 
1136 			case SLON_TSTAT_SHUTDOWN:
1137 				need_wakeup = true;
1138 				break;
1139 
1140 			case SLON_TSTAT_RESTART:
1141 				node->listen_status = SLON_TSTAT_SHUTDOWN;
1142 				need_wakeup = true;
1143 				break;
1144 
1145 			case SLON_TSTAT_DONE:
1146 				pthread_join(node->listen_thread, NULL);
1147 				node->listen_status = SLON_TSTAT_NONE;
1148 				break;
1149 		}
1150 	}
1151 
1152 	rtcfg_unlock();
1153 
1154 	/*
1155 	 * If need be, wakeup all node threads to reread the configuration.
1156 	 */
1157 	if (need_wakeup)
1158 		sched_wakeup_node(node->no_id);
1159 }
1160 
1161 
1162 /* ----------
1163  * rtcfg_needActivate
1164  * ----------
1165  */
1166 void
rtcfg_needActivate(int no_id)1167 rtcfg_needActivate(int no_id)
1168 {
1169 	struct to_activate *anode;
1170 
1171 	anode = (struct to_activate *) malloc(sizeof(struct to_activate));
1172 	if (anode == NULL)
1173 	{
1174 		perror("rtcfg_needActivate: malloc()");
1175 		slon_retry();
1176 	}
1177 	anode->no_id = no_id;
1178 	DLLIST_ADD_TAIL(to_activate_head, to_activate_tail, anode);
1179 }
1180 
1181 
1182 /* ----------
1183  * rtcfg_doActivate
1184  * ----------
1185  */
1186 void
rtcfg_doActivate(void)1187 rtcfg_doActivate(void)
1188 {
1189 	while (to_activate_head != NULL)
1190 	{
1191 		struct to_activate *anode = to_activate_head;
1192 
1193 		rtcfg_enableNode(anode->no_id);
1194 		DLLIST_REMOVE(to_activate_head, to_activate_tail, anode);
1195 		free(anode);
1196 	}
1197 }
1198 
1199 
1200 /* ----------
1201  * rtcfg_joinAllRemoteThreads
1202  * ----------
1203  */
1204 void
rtcfg_joinAllRemoteThreads(void)1205 rtcfg_joinAllRemoteThreads(void)
1206 {
1207 	SlonNode   *node;
1208 
1209 	rtcfg_lock();
1210 
1211 	for (node = rtcfg_node_list_head; node; node = node->next)
1212 	{
1213 		switch (node->listen_status)
1214 		{
1215 			case SLON_TSTAT_NONE:
1216 				break;
1217 
1218 			case SLON_TSTAT_RUNNING:
1219 			case SLON_TSTAT_SHUTDOWN:
1220 			case SLON_TSTAT_RESTART:
1221 				node->listen_status = SLON_TSTAT_SHUTDOWN;
1222 				/* fall through */
1223 
1224 			case SLON_TSTAT_DONE:
1225 				rtcfg_unlock();
1226 				sched_wakeup_node(node->no_id);
1227 				pthread_join(node->listen_thread, NULL);
1228 				rtcfg_lock();
1229 				node->listen_status = SLON_TSTAT_NONE;
1230 				break;
1231 		}
1232 
1233 		switch (node->worker_status)
1234 		{
1235 			case SLON_TSTAT_NONE:
1236 				break;
1237 
1238 			case SLON_TSTAT_RUNNING:
1239 			case SLON_TSTAT_SHUTDOWN:
1240 			case SLON_TSTAT_RESTART:
1241 				node->worker_status = SLON_TSTAT_SHUTDOWN;
1242 				/* fall through */
1243 
1244 			case SLON_TSTAT_DONE:
1245 				rtcfg_unlock();
1246 				remoteWorker_wakeup(node->no_id);
1247 				pthread_join(node->worker_thread, NULL);
1248 				rtcfg_lock();
1249 				node->worker_status = SLON_TSTAT_NONE;
1250 				break;
1251 		}
1252 
1253 	}
1254 
1255 	rtcfg_unlock();
1256 }
1257 
1258 
1259 /* ----------
1260  * rtcfg_seq_bump
1261  * ----------
1262  */
1263 void
rtcfg_seq_bump(void)1264 rtcfg_seq_bump(void)
1265 {
1266 	pthread_mutex_lock(&cfgseq_lock);
1267 	cfgseq++;
1268 	pthread_mutex_unlock(&cfgseq_lock);
1269 }
1270 
1271 
1272 /* ----------
1273  * rtcfg_seq_get
1274  * ----------
1275  */
1276 int64
rtcfg_seq_get(void)1277 rtcfg_seq_get(void)
1278 {
1279 	int64		retval;
1280 
1281 	pthread_mutex_lock(&cfgseq_lock);
1282 	retval = cfgseq;
1283 	pthread_mutex_unlock(&cfgseq_lock);
1284 
1285 	return retval;
1286 }
1287 
1288 /*
1289  * Local Variables:
1290  *	tab-width: 4
1291  *	c-indent-level: 4
1292  *	c-basic-offset: 4
1293  * End:
1294  */
1295