1 /*-------------------------------------------------------------------------
2  * slon.h
3  *
4  *	Global definitions for the main replication engine.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 #ifndef SLON_H_INCLUDED
14 #define SLON_H_INCLUDED
15 #ifdef MSVC
16 #include "config_msvc.h"
17 #else
18 #include "config.h"
19 #endif
20 #include "types.h"
21 #include "libpq-fe.h"
22 #include "misc.h"
23 #include "conf-file.h"
24 #include "confoptions.h"
25 #include <pg_config.h>
26 #ifdef WIN32
27 #include <winsock2.h>
28 #else
29 #include <sys/time.h>
30 #endif
31 
/* Fallback int64 printf format, used only if no earlier header defined one. */
#ifndef INT64_FORMAT
#define INT64_FORMAT "%" INT64_MODIFIER "d"
#endif

#define SLON_MEMDEBUG	1
/* Provide false/true only if no earlier header has already defined them. */
#ifndef false
#define   false 0
#endif
#ifndef true
#define   true 1
#endif

/*
 * Compile-time switch.  SLON_CHECK_CMDTUPLES is unconditionally #undef'd
 * here, so the #else branch below is the one in effect.  To select the
 * first set of values, change this #undef into a #define.
 */
#undef	SLON_CHECK_CMDTUPLES

#ifdef	SLON_CHECK_CMDTUPLES
#define SLON_COMMANDS_PER_LINE		1
#define SLON_DATA_FETCH_SIZE		100
#define SLON_WORKLINES_PER_HELPER	(SLON_DATA_FETCH_SIZE * 4)
#else
#define SLON_COMMANDS_PER_LINE		10
#define SLON_DATA_FETCH_SIZE		50
#define SLON_WORKLINES_PER_HELPER	(SLON_DATA_FETCH_SIZE * 5)
#endif

#define SLON_MAX_PATH 1024
/* Maximum path length - set to 1024, consistent with PostgreSQL. */
/* See: http://archives.postgresql.org/pgsql-hackers/1999-10/msg00754.php */
/* Also see src/include/pg_config.h.win32 and src/include/pg_config_manual.h */

 /* FIXME: must determine and use an OS-specific maximum path length */
 /* cbb: Not strictly necessary; note that MAXPGPATH is also 1024 */

 /* Cleanup calls */
#define SLON_VACUUM_FREQUENCY		3	/* vacuum every 3rd cleanup */
66 
67 
/*
 * Lifecycle state of a slon thread.  Stored in the listen_status and
 * worker_status fields of SlonNode.
 */
typedef enum
{
	SLON_TSTAT_NONE,
	SLON_TSTAT_RUNNING,
	SLON_TSTAT_SHUTDOWN,
	SLON_TSTAT_RESTART,
	SLON_TSTAT_DONE
}	SlonThreadStatus;


/* NOTE(review): defined elsewhere; presumably toggles pid output in log lines -- confirm. */
extern bool logpid;

/* ----------
 * In memory structures for cluster configuration
 *
 * Forward typedefs, so that the structures below can hold pointers to
 * each other (e.g. SlonNode points to SlonListen and SlonWorkMsg).
 * ----------
 */
typedef struct SlonNode_s SlonNode;
typedef struct SlonListen_s SlonListen;
typedef struct SlonSet_s SlonSet;
typedef struct SlonConn_s SlonConn;
typedef struct SlonState_s SlonState;

/* SlonWorkMsg_s is not defined in this header. */
typedef struct SlonWorkMsg_s SlonWorkMsg;
91 
/* ----------
 * SlonState
 *
 * Field names mirror the parameters of monitor_state() (actor, node,
 * conn_pid, activity, event, event_type).  NOTE(review): presumably one
 * record per reporting thread, maintained by monitor_thread.c -- confirm.
 * ----------
 */
struct SlonState_s
{
	char	   *actor;			/* who reported this state */
	pid_t		pid;
	int			node;			/* node ID the activity relates to */
	pid_t		conn_pid;		/* backend PID of the related connection */
	char	   *activity;		/* free-text description of the activity */
	time_t		start_time;
	int64		event;			/* event sequence number */
	char	   *event_type;
};
107 
108 /* ----------
109  * SlonNode
110  * ----------
111  */
112 struct SlonNode_s
113 {
114 	int			no_id;			/* node ID */
115 	int			no_active;		/* it's active state */
116 	char	   *no_comment;		/* comment field */
117 #if 0
118 	pthread_mutex_t node_lock;	/* mutex for node */
119 #endif
120 
121 	char	   *pa_conninfo;	/* path to the node */
122 	int			pa_connretry;	/* connection retry interval */
123 
124 	int64		last_event;		/* last event we have received */
125 	char	   *last_snapshot;	/* snapshot of last sync event */
126 
127 	SlonThreadStatus listen_status;		/* status of the listen thread */
128 	pthread_t	listen_thread;	/* thread id of listen thread */
129 	SlonListen *listen_head;	/* list of origins we listen for */
130 	SlonListen *listen_tail;
131 
132 	SlonThreadStatus worker_status;		/* status of the worker thread */
133 	pthread_t	worker_thread;	/* thread id of worker thread */
134 	pthread_mutex_t message_lock;		/* mutex for the message queue */
135 	pthread_cond_t message_cond;	/* condition variable for queue */
136 	SlonWorkMsg *message_head;
137 	SlonWorkMsg *message_tail;
138 
139 	char	   *archive_name;
140 	char	   *archive_temp;
141 	char	   *archive_counter;
142 	char	   *archive_timestamp;
143 	FILE	   *archive_fp;
144 
145 	SlonNode   *prev;
146 	SlonNode   *next;
147 };
148 
/* ----------
 * SlonListen
 *
 * One entry in a node's "list of origins we listen for"
 * (SlonNode.listen_head / listen_tail), chained through prev/next.
 * ----------
 */
struct SlonListen_s
{
	int			li_origin;		/* origin of events */

	SlonListen *prev;
	SlonListen *next;
};
160 
/* ----------
 * SlonSet
 *
 * In-memory record for one set and this node's subscription to it.
 * Chained through prev/next.  NOTE(review): presumably that is the list
 * whose ends are rtcfg_set_list_head/tail -- confirm.
 * ----------
 */
struct SlonSet_s
{
	int			set_id;			/* set ID */
	int			set_origin;		/* set origin */
	char	   *set_comment;	/* set comment */

	int			sub_provider;	/* node from which this node receives */
	/* data (if subscribed) */
	int			sub_forward;	/* if we need to forward data */
	int			sub_active;		/* if the subscription is active */

	SlonSet    *prev;
	SlonSet    *next;
};
179 
/* ----------
 * SlonConn
 *
 * A libpq connection (dbconn) together with its lock, its condition
 * variable and its wait state.  sched_wait_conn() and sched_wait_time()
 * take a SlonConn.  Chained through prev/next.
 * ----------
 */
struct SlonConn_s
{
	char	   *symname;		/* Symbolic name of connection */
	struct SlonNode_s *node;	/* remote node this belongs to */
	PGconn	   *dbconn;			/* database connection */
	pthread_mutex_t conn_lock;	/* mutex for conn */
	pthread_cond_t conn_cond;	/* condition variable for conn */

	int			condition;		/* what are we waiting for? (presumably
								 * SCHED_WAIT_* flags -- confirm) */
	struct timeval timeout;		/* time of day at which the wait times out */
	int			pg_version;		/* PostgreSQL version */
	int			conn_pid;		/* PID of connection.  NOTE(review): int
								 * here, but pid_t in SlonState */

	SlonConn   *prev;
	SlonConn   *next;
};
200 
/* ----------
 * SlonDString
 *
 * A growable, heap-allocated byte buffer.
 *
 * data holds n_alloc bytes, of which the first n_used are valid.  The
 * contents are not NUL-terminated until dstring_terminate() is called.
 * The growth macros always leave at least one spare byte
 * (n_used < n_alloc), so dstring_terminate() never writes past the
 * allocation.
 *
 * On allocation failure, every macro logs at SLON_FATAL and calls
 * slon_abort(), so it never returns with a NULL data pointer.
 *
 * After dstring_free(), n_alloc is 0.  dstring_nappend(), dstring_append()
 * and dstring_addchar() then allocate a fresh buffer.  dstring_reset() and
 * dstring_terminate() still require an allocated buffer.
 *
 * The dstring argument (ds_) is evaluated several times and must be free
 * of side effects.  The size passed to dstring_nappend() and the string
 * passed to dstring_append() are each evaluated exactly once.
 * ----------
 */
#define		SLON_DSTRING_SIZE_INIT	256
#define		SLON_DSTRING_SIZE_INC	2

typedef struct
{
	size_t		n_alloc;		/* bytes allocated for data (0 after free) */
	size_t		n_used;			/* bytes of data currently in use */
	char	   *data;			/* buffer; not necessarily NUL-terminated */
}	SlonDString;

/* Allocate the initial buffer and mark the dstring empty. */
#define		dstring_init(ds_) \
do { \
	(ds_)->n_alloc = SLON_DSTRING_SIZE_INIT; \
	(ds_)->n_used = 0; \
	(ds_)->data = malloc(SLON_DSTRING_SIZE_INIT); \
	if ((ds_)->data == NULL) { \
		slon_log(SLON_FATAL, "dstring_init: malloc() - %s", \
				strerror(errno)); \
		slon_abort(); \
	} \
} while (0)

/* Empty the dstring and NUL-terminate it, keeping the allocated buffer. */
#define		dstring_reset(ds_) \
do { \
	(ds_)->n_used = 0; \
	(ds_)->data[0] = '\0'; \
} while (0)

/*
 * Release the buffer.  n_alloc is zeroed as well, so that a later append
 * reallocates instead of copying into the NULL pointer.
 */
#define		dstring_free(ds_) \
do { \
	free((ds_)->data); \
	(ds_)->n_alloc = 0; \
	(ds_)->n_used = 0; \
	(ds_)->data = NULL; \
} while (0)

/*
 * Append n_ bytes from s_.  Grow by doubling, starting from
 * SLON_DSTRING_SIZE_INIT when nothing is allocated, until one spare byte
 * remains for a terminator.
 */
#define		dstring_nappend(ds_,s_,n_) \
do { \
	size_t		dstring_nappend_n_ = (n_); \
	if ((ds_)->n_used + dstring_nappend_n_ >= (ds_)->n_alloc) \
	{ \
		if ((ds_)->n_alloc == 0) \
			(ds_)->n_alloc = SLON_DSTRING_SIZE_INIT; \
		while ((ds_)->n_used + dstring_nappend_n_ >= (ds_)->n_alloc) \
			(ds_)->n_alloc *= SLON_DSTRING_SIZE_INC; \
		(ds_)->data = realloc((ds_)->data, (ds_)->n_alloc); \
		if ((ds_)->data == NULL) \
		{ \
			slon_log(SLON_FATAL, "dstring_nappend: realloc() - %s", \
					strerror(errno)); \
			slon_abort(); \
		} \
	} \
	memcpy(&((ds_)->data[(ds_)->n_used]), (s_), dstring_nappend_n_); \
	(ds_)->n_used += dstring_nappend_n_; \
} while (0)

/*
 * Append the NUL-terminated string s_ (without its terminator).  The length
 * is kept as size_t; an int would truncate very long strings.
 */
#define		dstring_append(ds_,s_) \
do { \
	const char *dstring_append_s_ = (s_); \
	dstring_nappend((ds_), dstring_append_s_, strlen(dstring_append_s_)); \
} while (0)

/* Append a single character, growing the buffer if needed. */
#define		dstring_addchar(ds_,c_) \
do { \
	if ((ds_)->n_used + 1 >= (ds_)->n_alloc) \
	{ \
		(ds_)->n_alloc = ((ds_)->n_alloc == 0) ? SLON_DSTRING_SIZE_INIT : \
			(ds_)->n_alloc * SLON_DSTRING_SIZE_INC; \
		(ds_)->data = realloc((ds_)->data, (ds_)->n_alloc); \
		if ((ds_)->data == NULL) \
		{ \
			slon_log(SLON_FATAL, "dstring_addchar: realloc() - %s", \
					strerror(errno)); \
			slon_abort(); \
		} \
	} \
	(ds_)->data[(ds_)->n_used++] = (c_); \
} while (0)

/* NUL-terminate the contents.  There is always room for the terminator. */
#define		dstring_terminate(ds_) \
do { \
	(ds_)->data[(ds_)->n_used] = '\0'; \
} while (0)

/* The underlying buffer (only a C string after dstring_terminate()). */
#define		dstring_data(ds_)	((ds_)->data)
279 
280 
/* ----------
 * Doubly linked list helpers.
 *
 * head_ and tail_ are lvalues holding the first and last element.  obj_ is
 * a pointer to an element with prev/next members.  All arguments are
 * evaluated more than once.
 * ----------
 */

/* Append obj_ after the current tail; an empty list gets it as its only element. */
#define DLLIST_ADD_TAIL(head_,tail_,obj_) \
do { \
	(obj_)->next = NULL; \
	(obj_)->prev = (tail_); \
	if ((tail_) != NULL) { \
		(tail_)->next = (obj_); \
	} else { \
		(head_) = (obj_); \
	} \
	(tail_) = (obj_); \
} while (0)

/* Prepend obj_ before the current head; an empty list gets it as its only element. */
#define DLLIST_ADD_HEAD(head_,tail_,obj_) \
do { \
	(obj_)->prev = NULL; \
	(obj_)->next = (head_); \
	if ((head_) != NULL) { \
		(head_)->prev = (obj_); \
	} else { \
		(tail_) = (obj_); \
	} \
	(head_) = (obj_); \
} while (0)

/* Unlink obj_, fixing head_/tail_ when it was at either end, and clear its links. */
#define DLLIST_REMOVE(head_,tail_,obj_) \
do { \
	if ((obj_)->prev != NULL) { \
		(obj_)->prev->next = (obj_)->next; \
	} else { \
		(head_) = (obj_)->next; \
	} \
	if ((obj_)->next != NULL) { \
		(obj_)->next->prev = (obj_)->prev; \
	} else { \
		(tail_) = (obj_)->prev; \
	} \
	(obj_)->next = NULL; \
	(obj_)->prev = NULL; \
} while (0)
325 
326 
/* ----------
 * TIMEVAL_DIFF(start, end)
 *
 * Elapsed time from *start to *end, in seconds, as a double.  Both
 * arguments are pointers to a struct with tv_sec/tv_usec members, such as
 * struct timeval.
 *
 * When end's microseconds are smaller than start's, one second is borrowed
 * from the seconds difference, which keeps the microsecond term
 * non-negative.  Arguments are evaluated several times.
 * ----------
 */
#define TIMEVAL_DIFF(start_,end_) \
	(((start_)->tv_usec > (end_)->tv_usec) \
		? (double)((end_)->tv_sec - (start_)->tv_sec - 1) + \
		  (double)((end_)->tv_usec + 1000000 - (start_)->tv_usec) / 1000000.0 \
		: (double)((end_)->tv_sec - (start_)->tv_sec) + \
		  (double)((end_)->tv_usec - (start_)->tv_usec) / 1000000.0)
338 
339 
/* ----------
 * Scheduler runmodes
 *
 * NOTE(review): sched_get_status() is declared to return int; presumably
 * it returns one of these values -- confirm in scheduler.c.
 * ----------
 */
typedef enum
{
	SCHED_STATUS_OK,
	SCHED_STATUS_SHUTDOWN,
	SCHED_STATUS_DONE,
	SCHED_STATUS_CANCEL,
	SCHED_STATUS_ERROR
}	ScheduleStatus;

/* ----------
 * Scheduler wait conditions
 *
 * Distinct powers of two, so they can be OR'ed together.  Presumably they
 * are passed as the "condition" argument of sched_wait_conn() and
 * sched_wait_time(), and stored in SlonConn.condition -- confirm in
 * scheduler.c.
 * ----------
 */
#define SCHED_WAIT_SOCK_READ	1
#define SCHED_WAIT_SOCK_WRITE	2
#define SCHED_WAIT_TIMEOUT		4
#define SCHED_WAIT_CANCEL		8
361 
362 
/* ----------
 * Globals in runtime_config.c
 * ----------
 */
extern pid_t slon_pid;

#ifndef WIN32
/*
 * Watchdog bookkeeping.  slon_abort(), slon_restart() and slon_retry() read
 * and reset slon_watchdog_pid while holding slon_watchdog_lock.
 */
extern pthread_mutex_t slon_watchdog_lock;
extern pid_t slon_watchdog_pid;
extern pid_t slon_worker_pid;
#endif
/* Cluster identity and (presumably) the local node's own settings -- confirm. */
extern char *rtcfg_cluster_name;
extern char *rtcfg_namespace;
extern char *rtcfg_conninfo;
extern int	rtcfg_nodeid;
extern int	rtcfg_nodeactive;
extern char *rtcfg_nodecomment;
extern char rtcfg_lastevent[];	/* array size is given by the definition */

/* Ends of the in-memory node and set lists (linked via prev/next). */
extern SlonNode *rtcfg_node_list_head;
extern SlonNode *rtcfg_node_list_tail;
extern SlonSet *rtcfg_set_list_head;
extern SlonSet *rtcfg_set_list_tail;
386 
387 
/* ----------
 * Functions in slon.c
 * ----------
 */
#ifndef WIN32
/*
 * Shared body of slon_abort(), slon_restart() and slon_retry().
 *
 * While holding slon_watchdog_lock, and only if a watchdog pid is still
 * recorded (>= 0), it:
 *   - logs the request at SLON_DEBUG2;
 *   - sends sig_ to the watchdog;
 *   - sets the recorded pid to -1.
 * It then ends the calling thread with pthread_exit(), so it never returns.
 *
 * fname_ must be a string literal; it is pasted in front of the log format.
 */
#define slon_signal_watchdog_and_exit(fname_, sig_) \
do { \
	pthread_mutex_lock(&slon_watchdog_lock); \
	if (slon_watchdog_pid >= 0) \
	{ \
		slon_log(SLON_DEBUG2, fname_ "() from pid=%d\n", slon_pid); \
		(void) kill(slon_watchdog_pid, (sig_)); \
		slon_watchdog_pid = -1; \
	} \
	pthread_mutex_unlock(&slon_watchdog_lock); \
	pthread_exit(NULL); \
} while (0)

#define slon_abort()	slon_signal_watchdog_and_exit("slon_abort", SIGTERM)
#define slon_restart()	slon_signal_watchdog_and_exit("slon_restart", SIGHUP)
#define slon_retry()	slon_signal_watchdog_and_exit("slon_retry", SIGUSR1)
#else							/* WIN32 */
/*
 * On win32, we currently just bail out and let the service control manager
 * deal with possible restarts.  All three requests are the same hard exit.
 */
#define slon_abort() \
do { \
	WSACleanup(); \
	exit(1); \
} while (0)
#define slon_restart()	slon_abort()
#define slon_retry()	slon_abort()
#endif

extern void Usage(char *const argv[]);

extern int	sched_wakeuppipe[];
extern pthread_mutex_t slon_wait_listen_lock;
extern pthread_cond_t slon_wait_listen_cond;
extern int	slon_listen_started;
452 
/* ----------
 * Functions in runtime_config.c
 *
 * Operations on the in-memory cluster configuration (the rtcfg_* globals
 * and the node/set lists).  NOTE(review): presumably callers wrap
 * multi-step updates in rtcfg_lock()/rtcfg_unlock() -- confirm in
 * runtime_config.c.
 * ----------
 */
extern void rtcfg_lock(void);
extern void rtcfg_unlock(void);

/* Nodes (parameter names match SlonNode's no_* fields). */
extern void rtcfg_storeNode(int no_id, char *no_comment);
extern void rtcfg_enableNode(int no_id);
extern void rtcfg_disableNode(int no_id);
extern SlonNode *rtcfg_findNode(int no_id);
extern int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq);
extern int64 rtcfg_getNodeLastEvent(int no_id);
extern void rtcfg_setNodeLastSnapshot(int no_id, char *snapshot);
extern char *rtcfg_getNodeLastSnapshot(int no_id);

/* Paths (parameter names match SlonNode's pa_* fields). */
extern void rtcfg_storePath(int pa_server, char *pa_conninfo,
				int pa_connretry);
extern void rtcfg_dropPath(int pa_server);

/* Listen entries (li_origin matches SlonListen.li_origin). */
extern void rtcfg_reloadListen(PGconn *db);
extern void rtcfg_storeListen(int li_origin, int li_provider);
extern void rtcfg_dropListen(int li_origin, int li_provider);

/* Sets (parameter names match SlonSet's set_* fields). */
extern void rtcfg_storeSet(int set_id, int set_origin, char *set_comment);
extern void rtcfg_dropSet(int set_id);
extern void rtcfg_moveSet(int set_id, int old_origin, int new_origin,
			  int sub_provider);
extern void rtcfg_reloadSets(PGconn *db);

/* Subscriptions.  NOTE(review): sub_forward is a string here but an int in SlonSet. */
extern void rtcfg_storeSubscribe(int sub_set, int sub_provider,
					 char *sub_forward);
extern void rtcfg_enableSubscription(int sub_set, int sub_provider,
						 char *sub_forward);
extern void rtcfg_unsubscribeSet(int sub_set);

/* Per-node thread activation and joining. */
extern void rtcfg_needActivate(int no_id);
extern void rtcfg_doActivate(void);
extern void rtcfg_joinAllRemoteThreads(void);

/* NOTE(review): presumably a configuration-change counter -- confirm. */
extern void rtcfg_seq_bump(void);
extern int64 rtcfg_seq_get(void);
495 
496 
/* ----------
 * Functions in local_node.c
 *
 * The void *fn(void *) functions declared in this and the following
 * sections have the start-routine signature expected by pthread_create().
 * ----------
 */
extern void *slon_localEventThread(void *dummy);

/*
 * ----------
 * Global variables in cleanup_thread.c
 * ----------
 */

extern int	vac_frequency;		/* NOTE(review): compare SLON_VACUUM_FREQUENCY */
extern char *cleanup_interval;	/* kept as a string, not a number */

/* ----------
 * Functions in cleanup_thread.c
 * ----------
 */
extern void *cleanupThread_main(void *dummy);

/* ----------
 * Global variables in sync_thread.c
 *
 * NOTE(review): units are not visible here (presumably milliseconds) --
 * confirm in sync_thread.c.
 * ----------
 */
extern int	sync_interval;
extern int	sync_interval_timeout;


/* ----------
 * Functions in sync_thread.c
 * ----------
 */
extern void *syncThread_main(void *dummy);

/* ----------
 * Functions in monitor_thread.c
 *
 * monitor_state()'s parameters mirror the fields of struct SlonState_s.
 * ----------
 */
extern void *monitorThread_main(void *dummy);
extern void monitor_state(const char *actor, int node, pid_t conn_pid, const char *activity, int64 event, const char *event_type);

/* ----------
 * Globals in monitor_thread.c
 * ----------
 */
extern int	monitor_interval;
extern bool monitor_threads;


/* ----------
 * Functions in local_listen.c
 * ----------
 */
extern void *localListenThread_main(void *dummy);


/* ----------
 * Functions in remote_listen.c
 * ----------
 */
extern void *remoteListenThread_main(void *cdata);
560 
/* ----------
 * Globals in remote_worker.c
 * ----------
 */
extern int	sync_group_maxsize;
extern int	explain_interval;


/* ----------
 * Functions in remote_worker.c
 * ----------
 */
extern void *remoteWorkerThread_main(void *cdata);
/*
 * All event fields arrive as strings, except ev_origin and ev_seqno.
 * NOTE(review): presumably these mirror the columns of an event row -- confirm.
 */
extern void remoteWorker_event(int event_provider,
				   int ev_origin, int64 ev_seqno,
				   char *ev_timestamp,
				   char *ev_snapshot, char *ev_mintxid, char *ev_maxtxid,
				   char *ev_type,
				   char *ev_data1, char *ev_data2,
				   char *ev_data3, char *ev_data4,
				   char *ev_data5, char *ev_data6,
				   char *ev_data7, char *ev_data8);
extern void remoteWorker_wakeup(int no_id);
/* Confirmation fields are passed as strings (the _c suffix). */
extern void remoteWorker_confirm(int no_id,
					 char *con_origin_c, char *con_received_c,
					 char *con_seqno_c, char *con_timestamp_c);


/* ----------
 * Functions in scheduler.c
 *
 * NOTE(review): "condition" presumably takes SCHED_WAIT_* flags, and the
 * int results presumably are ScheduleStatus values -- confirm in scheduler.c.
 * ----------
 */
extern int	sched_start_mainloop(void);
extern int	sched_wait_mainloop(void);
extern int	sched_wait_conn(SlonConn * conn, int condition);
extern int	sched_wait_time(SlonConn * conn, int condition, int msec);
extern int	sched_msleep(SlonNode * node, int msec);
extern int	sched_get_status(void);
extern int	sched_wakeup_node(int no_id);


/* ----------
 * Functions in dbutils.c
 * ----------
 */
extern SlonConn *slon_connectdb(char *conninfo, char *symname);
extern void slon_disconnectdb(SlonConn * conn);
extern SlonConn *slon_make_dummyconn(char *symname);
extern void slon_free_dummyconn(SlonConn * conn);

extern int	db_getLocalNodeId(PGconn *conn);
extern int	db_checkSchemaVersion(PGconn *conn);

/*
 * Build a query into ds from a format string.  The supported format
 * directives are implemented in dbutils.c and are not necessarily
 * printf-compatible; check there before adding new uses.
 */
extern void slon_mkquery(SlonDString * ds, char *fmt,...);
extern void slon_appendquery(SlonDString * ds, char *fmt,...);
extern char *sql_on_connection;
617 
/* ----------
 * Globals in misc.c
 * ----------
 */
extern int	slon_log_level;

#if !defined(pgpipe) && !defined(WIN32)
/* -----------------------------------
 * pgpipe is not defined in PG pre-8.0.  On non-Windows builds where it is
 * missing, map pgpipe()/piperead()/pipewrite() directly onto the POSIX
 * pipe()/read()/write() calls.
 * -----------------------------------
 */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)
#endif

/* On Windows, redirect every snprintf call in files including this header to pg_snprintf. */
#if defined(WIN32)
#define snprintf pg_snprintf
#endif
637 #endif   /* SLON_H_INCLUDED */
638 
639 
640 /*
641  * Local Variables:
642  *	tab-width: 4
643  *	c-indent-level: 4
644  *	c-basic-offset: 4
645  * End:
646  */
647