1 /*------------------------------------------------------------------------- 2 * slon.h 3 * 4 * Global definitions for the main replication engine. 5 * 6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group 7 * Author: Jan Wieck, Afilias USA INC. 8 * 9 * 10 *------------------------------------------------------------------------- 11 */ 12 13 #ifndef SLON_H_INCLUDED 14 #define SLON_H_INCLUDED 15 #ifdef MSVC 16 #include "config_msvc.h" 17 #else 18 #include "config.h" 19 #endif 20 #include "types.h" 21 #include "libpq-fe.h" 22 #include "misc.h" 23 #include "conf-file.h" 24 #include "confoptions.h" 25 #include <pg_config.h> 26 #ifdef WIN32 27 #include <winsock2.h> 28 #else 29 #include <sys/time.h> 30 #endif 31 32 #ifndef INT64_FORMAT 33 #define INT64_FORMAT "%" INT64_MODIFIER "d" 34 #endif 35 36 #define SLON_MEMDEBUG 1 37 #ifndef false 38 #define false 0 39 #endif 40 #ifndef true 41 #define true 1 42 #endif 43 44 #undef SLON_CHECK_CMDTUPLES 45 46 #ifdef SLON_CHECK_CMDTUPLES 47 #define SLON_COMMANDS_PER_LINE 1 48 #define SLON_DATA_FETCH_SIZE 100 49 #define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 4) 50 #else 51 #define SLON_COMMANDS_PER_LINE 10 52 #define SLON_DATA_FETCH_SIZE 50 53 #define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 5) 54 #endif 55 56 #define SLON_MAX_PATH 1024 57 /* Maximum path length - set to 1024, consistent with PostgreSQL */ 58 /* See: http://archives.postgresql.org/pgsql-hackers/1999-10/msg00754.php */ 59 /* Also view src/include/pg_config.h.win32 src/include/pg_config_manual.h */ 60 61 /* FIXME: must determine and use OS specific max path length */ 62 /* cbb: Not forcibly necessary; note that MAXPGPATH is 1024 */ 63 64 /* cleanup calls */ 65 #define SLON_VACUUM_FREQUENCY 3 /* vacuum every 3rd cleanup */ 66 67 68 typedef enum 69 { 70 SLON_TSTAT_NONE, 71 SLON_TSTAT_RUNNING, 72 SLON_TSTAT_SHUTDOWN, 73 SLON_TSTAT_RESTART, 74 SLON_TSTAT_DONE 75 } SlonThreadStatus; 76 77 78 extern bool logpid; 79 80 /* ---------- 81 * In memory structures for cluster configuration 82 * ---------- 83 */ 84 typedef struct SlonNode_s SlonNode; 85 typedef struct SlonListen_s SlonListen; 86 typedef struct SlonSet_s SlonSet; 87 typedef struct SlonConn_s SlonConn; 88 typedef struct SlonState_s SlonState; 89 90 typedef struct SlonWorkMsg_s SlonWorkMsg; 91 92 /* ---------- 93 * SlonState 94 * ---------- 95 */ 96 struct SlonState_s 97 { 98 char *actor; 99 pid_t pid; 100 int node; 101 pid_t conn_pid; 102 char *activity; 103 time_t start_time; 104 int64 event; 105 char *event_type; 106 }; 107 108 /* ---------- 109 * SlonNode 110 * ---------- 111 */ 112 struct SlonNode_s 113 { 114 int no_id; /* node ID */ 115 int no_active; /* it's active state */ 116 char *no_comment; /* comment field */ 117 #if 0 118 pthread_mutex_t node_lock; /* mutex for node */ 119 #endif 120 121 char *pa_conninfo; /* path to the node */ 122 int pa_connretry; /* connection retry interval */ 123 124 int64 last_event; /* last event we have received */ 125 char *last_snapshot; /* snapshot of last sync event */ 126 127 SlonThreadStatus listen_status; /* status of the listen thread */ 128 pthread_t listen_thread; /* thread id of listen thread */ 129 SlonListen *listen_head; /* list of origins we listen for */ 130 SlonListen *listen_tail; 131 132 SlonThreadStatus worker_status; /* status of the worker thread */ 133 pthread_t worker_thread; /* thread id of worker thread */ 134 pthread_mutex_t message_lock; /* mutex for the message queue */ 135 pthread_cond_t message_cond; /* condition variable for queue */ 136 SlonWorkMsg *message_head; 137 SlonWorkMsg *message_tail; 138 139 char *archive_name; 140 char *archive_temp; 141 char *archive_counter; 142 char *archive_timestamp; 143 FILE *archive_fp; 144 145 SlonNode *prev; 146 SlonNode *next; 147 }; 148 149 /* ---------- 150 * SlonListen 151 * ---------- 152 */ 153 struct SlonListen_s 154 { 155 int li_origin; /* origin of events */ 156 157 SlonListen *prev; 158 SlonListen *next; 159 }; 160 161 /* ---------- 162 * SlonSet 163 * ---------- 164 */ 165 struct SlonSet_s 166 { 167 int set_id; /* set ID */ 168 int set_origin; /* set origin */ 169 char *set_comment; /* set comment */ 170 171 int sub_provider; /* from where this node receives */ 172 /* data (if subscribed) */ 173 int sub_forward; /* if we need to forward data */ 174 int sub_active; /* if the subscription is active */ 175 176 SlonSet *prev; 177 SlonSet *next; 178 }; 179 180 /* ---------- 181 * SlonConn 182 * ---------- 183 */ 184 struct SlonConn_s 185 { 186 char *symname; /* Symbolic name of connection */ 187 struct SlonNode_s *node; /* remote node this belongs to */ 188 PGconn *dbconn; /* database connection */ 189 pthread_mutex_t conn_lock; /* mutex for conn */ 190 pthread_cond_t conn_cond; /* condition variable for conn */ 191 192 int condition; /* what are we waiting for? */ 193 struct timeval timeout; /* timeofday for timeout */ 194 int pg_version; /* PostgreSQL version */ 195 int conn_pid; /* PID of connection */ 196 197 SlonConn *prev; 198 SlonConn *next; 199 }; 200 201 /* ---------- 202 * SlonDString 203 * ---------- 204 */ 205 #define SLON_DSTRING_SIZE_INIT 256 206 #define SLON_DSTRING_SIZE_INC 2 207 208 typedef struct 209 { 210 size_t n_alloc; 211 size_t n_used; 212 char *data; 213 } SlonDString; 214 215 #define dstring_init(__ds) \ 216 do { \ 217 (__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \ 218 (__ds)->n_used = 0; \ 219 (__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \ 220 if ((__ds)->data == NULL) { \ 221 slon_log(SLON_FATAL, "dstring_init: malloc() - %s", \ 222 strerror(errno)); \ 223 slon_abort(); \ 224 } \ 225 } while (0) 226 #define dstring_reset(__ds) \ 227 do { \ 228 (__ds)->n_used = 0; \ 229 (__ds)->data[0] = '\0'; \ 230 } while (0) 231 #define dstring_free(__ds) \ 232 do { \ 233 free((__ds)->data); \ 234 (__ds)->n_used = 0; \ 235 (__ds)->data = NULL; \ 236 } while (0) 237 #define dstring_nappend(__ds,__s,__n) \ 238 do { \ 239 if ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ 240 { \ 241 while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ 242 (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ 243 (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ 244 if ((__ds)->data == NULL) \ 245 { \ 246 slon_log(SLON_FATAL, "dstring_nappend: realloc() - %s", \ 247 strerror(errno)); \ 248 slon_abort(); \ 249 } \ 250 } \ 251 memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \ 252 (__ds)->n_used += (__n); \ 253 } while (0) 254 #define dstring_append(___ds,___s) \ 255 do { \ 256 register int ___n = strlen((___s)); \ 257 dstring_nappend((___ds),(___s),___n); \ 258 } while (0) 259 #define dstring_addchar(__ds,__c) \ 260 do { \ 261 if ((__ds)->n_used + 1 >= (__ds)->n_alloc) \ 262 { \ 263 (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ 264 (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ 265 if ((__ds)->data == NULL) \ 266 { \ 267 slon_log(SLON_FATAL, "dstring_addchar: realloc() - %s", \ 268 strerror(errno)); \ 269 slon_abort(); \ 270 } \ 271 } \ 272 (__ds)->data[(__ds)->n_used++] = (__c); \ 273 } while (0) 274 #define dstring_terminate(__ds) \ 275 do { \ 276 (__ds)->data[(__ds)->n_used] = '\0'; \ 277 } while (0) 278 #define dstring_data(__ds) ((__ds)->data) 279 280 281 /* ---------- 282 * Macros to add and remove entries from double linked lists 283 * ---------- 284 */ 285 #define DLLIST_ADD_TAIL(_pf,_pl,_obj) \ 286 do { \ 287 if ((_pl) == NULL) { \ 288 (_obj)->prev = (_obj)->next = NULL; \ 289 (_pf) = (_pl) = (_obj); \ 290 } else { \ 291 (_obj)->prev = (_pl); \ 292 (_obj)->next = NULL; \ 293 (_pl)->next = (_obj); \ 294 (_pl) = (_obj); \ 295 } \ 296 } while (0) 297 298 #define DLLIST_ADD_HEAD(_pf,_pl,_obj) \ 299 do { \ 300 if ((_pf) == NULL) { \ 301 (_obj)->prev = (_obj)->next = NULL; \ 302 (_pf) = (_pl) = (_obj); \ 303 } else { \ 304 (_obj)->prev = NULL; \ 305 (_obj)->next = (_pf); \ 306 (_pf)->prev = (_obj); \ 307 (_pf) = (_obj); \ 308 } \ 309 } while (0) 310 311 #define DLLIST_REMOVE(_pf,_pl,_obj) \ 312 do { \ 313 if ((_obj)->prev == NULL) { \ 314 (_pf) = (_obj)->next; \ 315 } else { \ 316 (_obj)->prev->next = (_obj)->next; \ 317 } \ 318 if ((_obj)->next == NULL) { \ 319 (_pl) = (_obj)->prev; \ 320 } else { \ 321 (_obj)->next->prev = (_obj)->prev; \ 322 } \ 323 (_obj)->prev = (_obj)->next = NULL; \ 324 } while (0) 325 326 327 /* ---------- 328 * Macro to compute the difference between two timeval structs 329 * as a double precision floating point. 330 * t1 = start time 331 * t2 = end time 332 * ---------- 333 */ 334 #define TIMEVAL_DIFF(_t1,_t2) \ 335 (((_t1)->tv_usec <= (_t2)->tv_usec) ? \ 336 (double)((_t2)->tv_sec - (_t1)->tv_sec) + (double)((_t2)->tv_usec - (_t1)->tv_usec) / 1000000.0 : \ 337 (double)((_t2)->tv_sec - (_t1)->tv_sec - 1) + (double)((_t2)->tv_usec + 1000000 - (_t1)->tv_usec) / 1000000.0) 338 339 340 /* ---------- 341 * Scheduler runmodes 342 * ---------- 343 */ 344 typedef enum 345 { 346 SCHED_STATUS_OK, 347 SCHED_STATUS_SHUTDOWN, 348 SCHED_STATUS_DONE, 349 SCHED_STATUS_CANCEL, 350 SCHED_STATUS_ERROR 351 } ScheduleStatus; 352 353 /* ---------- 354 * Scheduler wait conditions 355 * ---------- 356 */ 357 #define SCHED_WAIT_SOCK_READ 1 358 #define SCHED_WAIT_SOCK_WRITE 2 359 #define SCHED_WAIT_TIMEOUT 4 360 #define SCHED_WAIT_CANCEL 8 361 362 363 /* ---------- 364 * Globals in runtime_config.c 365 * ---------- 366 */ 367 extern pid_t slon_pid; 368 369 #ifndef WIN32 370 extern pthread_mutex_t slon_watchdog_lock; 371 extern pid_t slon_watchdog_pid; 372 extern pid_t slon_worker_pid; 373 #endif 374 extern char *rtcfg_cluster_name; 375 extern char *rtcfg_namespace; 376 extern char *rtcfg_conninfo; 377 extern int rtcfg_nodeid; 378 extern int rtcfg_nodeactive; 379 extern char *rtcfg_nodecomment; 380 extern char rtcfg_lastevent[]; 381 382 extern SlonNode *rtcfg_node_list_head; 383 extern SlonNode *rtcfg_node_list_tail; 384 extern SlonSet *rtcfg_set_list_head; 385 extern SlonSet *rtcfg_set_list_tail; 386 387 388 /* ---------- 389 * Functions in slon.c 390 * ---------- 391 */ 392 #ifndef WIN32 393 #define slon_abort() \ 394 do { \ 395 pthread_mutex_lock(&slon_watchdog_lock); \ 396 if (slon_watchdog_pid >= 0) { \ 397 slon_log(SLON_DEBUG2, "slon_abort() from pid=%d\n", slon_pid); \ 398 (void) kill(slon_watchdog_pid, SIGTERM); \ 399 slon_watchdog_pid = -1; \ 400 } \ 401 pthread_mutex_unlock(&slon_watchdog_lock); \ 402 pthread_exit(NULL); \ 403 } while (0) 404 #define slon_restart() \ 405 do { \ 406 pthread_mutex_lock(&slon_watchdog_lock); \ 407 if (slon_watchdog_pid >= 0) { \ 408 slon_log(SLON_DEBUG2, "slon_restart() from pid=%d\n", slon_pid); \ 409 (void) kill(slon_watchdog_pid, SIGHUP); \ 410 slon_watchdog_pid = -1; \ 411 } \ 412 pthread_mutex_unlock(&slon_watchdog_lock); \ 413 pthread_exit(NULL); \ 414 } while (0) 415 #define slon_retry() \ 416 do { \ 417 pthread_mutex_lock(&slon_watchdog_lock); \ 418 if (slon_watchdog_pid >= 0) { \ 419 slon_log(SLON_DEBUG2, "slon_retry() from pid=%d\n", slon_pid); \ 420 (void) kill(slon_watchdog_pid, SIGUSR1); \ 421 slon_watchdog_pid = -1; \ 422 } \ 423 pthread_mutex_unlock(&slon_watchdog_lock); \ 424 pthread_exit(NULL); \ 425 } while (0) 426 #else /* WIN32 */ 427 /* On win32, we currently just bail out and let the service control manager 428 * deal with possible restarts */ 429 #define slon_abort() \ 430 do { \ 431 WSACleanup(); \ 432 exit(1); \ 433 } while (0) 434 #define slon_restart() \ 435 do { \ 436 WSACleanup(); \ 437 exit(1); \ 438 } while (0) 439 #define slon_retry() \ 440 do { \ 441 WSACleanup(); \ 442 exit(1); \ 443 } while (0) 444 #endif 445 446 extern void Usage(char *const argv[]); 447 448 extern int sched_wakeuppipe[]; 449 extern pthread_mutex_t slon_wait_listen_lock; 450 extern pthread_cond_t slon_wait_listen_cond; 451 extern int slon_listen_started; 452 453 /* ---------- 454 * Functions in runtime_config.c 455 * ---------- 456 */ 457 extern void rtcfg_lock(void); 458 extern void rtcfg_unlock(void); 459 460 extern void rtcfg_storeNode(int no_id, char *no_comment); 461 extern void rtcfg_enableNode(int no_id); 462 extern void rtcfg_disableNode(int no_id); 463 extern SlonNode *rtcfg_findNode(int no_id); 464 extern int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq); 465 extern int64 rtcfg_getNodeLastEvent(int no_id); 466 extern void rtcfg_setNodeLastSnapshot(int no_id, char *snapshot); 467 extern char *rtcfg_getNodeLastSnapshot(int no_id); 468 469 extern void rtcfg_storePath(int pa_server, char *pa_conninfo, 470 int pa_connretry); 471 extern void rtcfg_dropPath(int pa_server); 472 473 extern void rtcfg_reloadListen(PGconn *db); 474 extern void rtcfg_storeListen(int li_origin, int li_provider); 475 extern void rtcfg_dropListen(int li_origin, int li_provider); 476 477 extern void rtcfg_storeSet(int set_id, int set_origin, char *set_comment); 478 extern void rtcfg_dropSet(int set_id); 479 extern void rtcfg_moveSet(int set_id, int old_origin, int new_origin, 480 int sub_provider); 481 extern void rtcfg_reloadSets(PGconn *db); 482 483 extern void rtcfg_storeSubscribe(int sub_set, int sub_provider, 484 char *sub_forward); 485 extern void rtcfg_enableSubscription(int sub_set, int sub_provider, 486 char *sub_forward); 487 extern void rtcfg_unsubscribeSet(int sub_set); 488 489 extern void rtcfg_needActivate(int no_id); 490 extern void rtcfg_doActivate(void); 491 extern void rtcfg_joinAllRemoteThreads(void); 492 493 extern void rtcfg_seq_bump(void); 494 extern int64 rtcfg_seq_get(void); 495 496 497 /* ---------- 498 * Functions in local_node.c 499 * ---------- 500 */ 501 extern void *slon_localEventThread(void *dummy); 502 503 /* 504 * ---------- 505 * Global variables in cleanup_thread.c 506 * ---------- 507 */ 508 509 extern int vac_frequency; 510 extern char *cleanup_interval; 511 512 /* ---------- 513 * Functions in cleanup_thread.c 514 * ---------- 515 */ 516 extern void *cleanupThread_main(void *dummy); 517 518 /* ---------- 519 * Global variables in sync_thread.c 520 * ---------- 521 */ 522 extern int sync_interval; 523 extern int sync_interval_timeout; 524 525 526 /* ---------- 527 * Functions in sync_thread.c 528 * ---------- 529 */ 530 extern void *syncThread_main(void *dummy); 531 532 /* ---------- 533 * Functions in monitor_thread.c 534 * ---------- 535 */ 536 extern void *monitorThread_main(void *dummy); 537 extern void monitor_state(const char *actor, int node, pid_t conn_pid, const char *activity, int64 event, const char *event_type); 538 539 /* ---------- 540 * Globals in monitor_thread.c 541 * ---------- 542 */ 543 extern int monitor_interval; 544 extern bool monitor_threads; 545 546 547 /* ---------- 548 * Functions in local_listen.c 549 * ---------- 550 */ 551 extern void *localListenThread_main(void *dummy); 552 553 554 /* ---------- 555 * Functions in remote_listen.c 556 * ---------- 557 */ 558 extern void *remoteListenThread_main(void *cdata); 559 560 561 /* ---------- 562 * Globals in remote_worker.c 563 * ---------- 564 */ 565 extern int sync_group_maxsize; 566 extern int explain_interval; 567 568 569 /* ---------- 570 * Functions in remote_worker.c 571 * ---------- 572 */ 573 extern void *remoteWorkerThread_main(void *cdata); 574 extern void remoteWorker_event(int event_provider, 575 int ev_origin, int64 ev_seqno, 576 char *ev_timestamp, 577 char *ev_snapshot, char *ev_mintxid, char *ev_maxtxid, 578 char *ev_type, 579 char *ev_data1, char *ev_data2, 580 char *ev_data3, char *ev_data4, 581 char *ev_data5, char *ev_data6, 582 char *ev_data7, char *ev_data8); 583 extern void remoteWorker_wakeup(int no_id); 584 extern void remoteWorker_confirm(int no_id, 585 char *con_origin_c, char *con_received_c, 586 char *con_seqno_c, char *con_timestamp_c); 587 588 589 /* ---------- 590 * Functions in scheduler.c 591 * ---------- 592 */ 593 extern int sched_start_mainloop(void); 594 extern int sched_wait_mainloop(void); 595 extern int sched_wait_conn(SlonConn * conn, int condition); 596 extern int sched_wait_time(SlonConn * conn, int condition, int msec); 597 extern int sched_msleep(SlonNode * node, int msec); 598 extern int sched_get_status(void); 599 extern int sched_wakeup_node(int no_id); 600 601 602 /* ---------- 603 * Functions in dbutils.c 604 * ---------- 605 */ 606 extern SlonConn *slon_connectdb(char *conninfo, char *symname); 607 extern void slon_disconnectdb(SlonConn * conn); 608 extern SlonConn *slon_make_dummyconn(char *symname); 609 extern void slon_free_dummyconn(SlonConn * conn); 610 611 extern int db_getLocalNodeId(PGconn *conn); 612 extern int db_checkSchemaVersion(PGconn *conn); 613 614 extern void slon_mkquery(SlonDString * ds, char *fmt,...); 615 extern void slon_appendquery(SlonDString * ds, char *fmt,...); 616 extern char *sql_on_connection; 617 618 /* ---------- 619 * Globals in misc.c 620 * ---------- 621 */ 622 extern int slon_log_level; 623 624 #if !defined(pgpipe) && !defined(WIN32) 625 /* ----------------------------------- 626 * pgpipe is not defined in PG pre-8.0 627 * ----------------------------------- 628 */ 629 #define pgpipe(a) pipe(a) 630 #define piperead(a,b,c) read(a,b,c) 631 #define pipewrite(a,b,c) write(a,b,c) 632 #endif 633 634 #if defined(WIN32) 635 #define snprintf pg_snprintf 636 #endif 637 #endif /* SLON_H_INCLUDED */ 638 639 640 /* 641 * Local Variables: 642 * tab-width: 4 643 * c-indent-level: 4 644 * c-basic-offset: 4 645 * End: 646 */ 647