1 /*-------------------------------------------------------------------------
2 *
3 * pglogical.c
4 * pglogical initialization and common functionality
5 *
6 * Copyright (c) 2015, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * pglogical.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "miscadmin.h"
16
17 #include "access/hash.h"
18 #include "access/htup_details.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21
22 #include "catalog/pg_extension.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_type.h"
27
28 #include "commands/extension.h"
29 #include "commands/trigger.h"
30
31 #include "executor/executor.h"
32
33 #include "mb/pg_wchar.h"
34
35 #include "nodes/nodeFuncs.h"
36
37 #include "optimizer/planner.h"
38
39 #include "parser/parse_coerce.h"
40
41 #include "replication/reorderbuffer.h"
42
43 #include "storage/ipc.h"
44 #include "storage/proc.h"
45
46 #include "utils/builtins.h"
47 #include "utils/fmgroids.h"
48 #include "utils/lsyscache.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51
52 #include "pgstat.h"
53
54 #include "pglogical_executor.h"
55 #include "pglogical_node.h"
56 #include "pglogical_conflict.h"
57 #include "pglogical_worker.h"
58 #include "pglogical.h"
59
60 PG_MODULE_MAGIC;
61
62 static const struct config_enum_entry PGLogicalConflictResolvers[] = {
63 {"error", PGLOGICAL_RESOLVE_ERROR, false},
64 #ifndef XCP
65 {"apply_remote", PGLOGICAL_RESOLVE_APPLY_REMOTE, false},
66 {"keep_local", PGLOGICAL_RESOLVE_KEEP_LOCAL, false},
67 {"last_update_wins", PGLOGICAL_RESOLVE_LAST_UPDATE_WINS, false},
68 {"first_update_wins", PGLOGICAL_RESOLVE_FIRST_UPDATE_WINS, false},
69 #endif
70 {NULL, 0, false}
71 };
72
73 /* copied fom guc.c */
74 static const struct config_enum_entry server_message_level_options[] = {
75 {"debug", DEBUG2, true},
76 {"debug5", DEBUG5, false},
77 {"debug4", DEBUG4, false},
78 {"debug3", DEBUG3, false},
79 {"debug2", DEBUG2, false},
80 {"debug1", DEBUG1, false},
81 {"info", INFO, false},
82 {"notice", NOTICE, false},
83 {"warning", WARNING, false},
84 {"error", ERROR, false},
85 {"log", LOG, false},
86 {"fatal", FATAL, false},
87 {"panic", PANIC, false},
88 {NULL, 0, false}
89 };
90
91 bool pglogical_synchronous_commit = false;
92 char *pglogical_temp_directory = "";
93 bool pglogical_use_spi = false;
94 bool pglogical_batch_inserts = true;
95 static char *pglogical_temp_directory_config;
96
97 void _PG_init(void);
98 void pglogical_supervisor_main(Datum main_arg);
99 char *pglogical_extra_connection_options;
100
101 static PGconn * pglogical_connect_base(const char *connstr,
102 const char *appname,
103 const char *suffix,
104 bool replication);
105
106
107 /*
108 * Ensure string is not longer than maxlen.
109 *
110 * The way we do this is we if the string is longer we return prefix from that
111 * string and hash of the string which will together be exatly maxlen.
112 *
113 * Maxlen can't be less than 8 because hash produces uint32 which in hex form
114 * can have up to 8 characters.
115 */
116 char *
shorten_hash(const char * str,int maxlen)117 shorten_hash(const char *str, int maxlen)
118 {
119 char *ret;
120 int len = strlen(str);
121
122 Assert(maxlen >= 8);
123
124 if (len <= maxlen)
125 return pstrdup(str);
126
127 ret = (char *) palloc(maxlen + 1);
128 snprintf(ret, maxlen, "%.*s%08x", maxlen - 8,
129 str, DatumGetUInt32(hash_any((unsigned char *) str, len)));
130 ret[maxlen] = '\0';
131
132 return ret;
133 }
134
135 /*
136 * Convert text array to list of strings.
137 *
138 * Note: the resulting list points to the memory of the input array.
139 */
140 List *
textarray_to_list(ArrayType * textarray)141 textarray_to_list(ArrayType *textarray)
142 {
143 Datum *elems;
144 int nelems, i;
145 List *res = NIL;
146
147 deconstruct_array(textarray,
148 TEXTOID, -1, false, 'i',
149 &elems, NULL, &nelems);
150
151 if (nelems == 0)
152 return NIL;
153
154 for (i = 0; i < nelems; i++)
155 res = lappend(res, TextDatumGetCString(elems[i]));
156
157 return res;
158 }
159
160 /*
161 * Deconstruct the text representation of a 1-dimensional Postgres array
162 * into individual items.
163 *
164 * On success, returns true and sets *itemarray and *nitems to describe
165 * an array of individual strings. On parse failure, returns false;
166 * *itemarray may exist or be NULL.
167 *
168 * NOTE: free'ing itemarray is sufficient to deallocate the working storage.
169 */
170 bool
parsePGArray(const char * atext,char *** itemarray,int * nitems)171 parsePGArray(const char *atext, char ***itemarray, int *nitems)
172 {
173 int inputlen;
174 char **items;
175 char *strings;
176 int curitem;
177
178 /*
179 * We expect input in the form of "{item,item,item}" where any item is
180 * either raw data, or surrounded by double quotes (in which case embedded
181 * characters including backslashes and quotes are backslashed).
182 *
183 * We build the result as an array of pointers followed by the actual
184 * string data, all in one malloc block for convenience of deallocation.
185 * The worst-case storage need is not more than one pointer and one
186 * character for each input character (consider "{,,,,,,,,,,}").
187 */
188 *itemarray = NULL;
189 *nitems = 0;
190 inputlen = strlen(atext);
191 if (inputlen < 2 || atext[0] != '{' || atext[inputlen - 1] != '}')
192 return false; /* bad input */
193 items = (char **) malloc(inputlen * (sizeof(char *) + sizeof(char)));
194 if (items == NULL)
195 return false; /* out of memory */
196 *itemarray = items;
197 strings = (char *) (items + inputlen);
198
199 atext++; /* advance over initial '{' */
200 curitem = 0;
201 while (*atext != '}')
202 {
203 if (*atext == '\0')
204 return false; /* premature end of string */
205 items[curitem] = strings;
206 while (*atext != '}' && *atext != ',')
207 {
208 if (*atext == '\0')
209 return false; /* premature end of string */
210 if (*atext != '"')
211 *strings++ = *atext++; /* copy unquoted data */
212 else
213 {
214 /* process quoted substring */
215 atext++;
216 while (*atext != '"')
217 {
218 if (*atext == '\0')
219 return false; /* premature end of string */
220 if (*atext == '\\')
221 {
222 atext++;
223 if (*atext == '\0')
224 return false; /* premature end of string */
225 }
226 *strings++ = *atext++; /* copy quoted data */
227 }
228 atext++;
229 }
230 }
231 *strings++ = '\0';
232 if (*atext == ',')
233 atext++;
234 curitem++;
235 }
236 if (atext[1] != '\0')
237 return false; /* bogus syntax (embedded '}') */
238 *nitems = curitem;
239 return true;
240 }
241
242 /*
243 * Get oid of our queue table.
244 */
245 inline Oid
get_pglogical_table_oid(const char * table)246 get_pglogical_table_oid(const char *table)
247 {
248 Oid nspoid;
249 Oid reloid;
250
251 nspoid = get_namespace_oid(EXTENSION_NAME, false);
252
253 reloid = get_relname_relid(table, nspoid);
254
255 if (reloid == InvalidOid)
256 elog(ERROR, "cache lookup failed for relation %s.%s",
257 EXTENSION_NAME, table);
258
259 return reloid;
260 }
261
262 #define CONN_PARAM_ARRAY_SIZE 9
263
264 static PGconn *
pglogical_connect_base(const char * connstr,const char * appname,const char * suffix,bool replication)265 pglogical_connect_base(const char *connstr, const char *appname,
266 const char *suffix, bool replication)
267 {
268 int i=0;
269 PGconn *conn;
270 const char *keys[CONN_PARAM_ARRAY_SIZE];
271 const char *vals[CONN_PARAM_ARRAY_SIZE];
272 StringInfoData s;
273
274 initStringInfo(&s);
275 appendStringInfoString(&s, pglogical_extra_connection_options);
276 appendStringInfoChar(&s, ' ');
277 appendStringInfoString(&s, connstr);
278
279 keys[i] = "dbname";
280 vals[i] = connstr;
281 i++;
282 keys[i] = "application_name";
283 if (suffix)
284 {
285 char s[NAMEDATALEN];
286 snprintf(s, NAMEDATALEN,
287 "%s_%s",
288 shorten_hash(appname, NAMEDATALEN - strlen(suffix) - 2),
289 suffix);
290 vals[i] = s;
291 }
292 else
293 vals[i] = appname;
294 i++;
295 keys[i] = "connect_timeout";
296 vals[i] = "30";
297 i++;
298 keys[i] = "keepalives";
299 vals[i] = "1";
300 i++;
301 keys[i] = "keepalives_idle";
302 vals[i] = "20";
303 i++;
304 keys[i] = "keepalives_interval";
305 vals[i] = "20";
306 i++;
307 keys[i] = "keepalives_count";
308 vals[i] = "5";
309 i++;
310 keys[i] = "replication";
311 vals[i] = replication ? "database" : NULL;
312 i++;
313 keys[i] = NULL;
314 vals[i] = NULL;
315
316 Assert(i <= CONN_PARAM_ARRAY_SIZE);
317
318 /*
319 * We use the expand_dbname parameter to process the connection string
320 * (or URI), and pass some extra options.
321 */
322 conn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
323 if (PQstatus(conn) != CONNECTION_OK)
324 {
325 ereport(ERROR,
326 (errmsg("could not connect to the postgresql server%s: %s",
327 replication ? " in replication mode" : "",
328 PQerrorMessage(conn)),
329 errdetail("dsn was: %s", s.data)));
330 }
331
332 resetStringInfo(&s);
333
334 return conn;
335 }
336
337
338 /*
339 * Make standard postgres connection, ERROR on failure.
340 */
341 PGconn *
pglogical_connect(const char * connstring,const char * connname,const char * suffix)342 pglogical_connect(const char *connstring, const char *connname,
343 const char *suffix)
344 {
345 return pglogical_connect_base(connstring, connname, suffix, false);
346 }
347
348 /*
349 * Make replication connection, ERROR on failure.
350 */
351 PGconn *
pglogical_connect_replica(const char * connstring,const char * connname,const char * suffix)352 pglogical_connect_replica(const char *connstring, const char *connname,
353 const char *suffix)
354 {
355 return pglogical_connect_base(connstring, connname, suffix, true);
356 }
357
358 /*
359 * Make sure the extension is up to date.
360 *
361 * Called by db manager.
362 */
363 void
pglogical_manage_extension(void)364 pglogical_manage_extension(void)
365 {
366 Relation extrel;
367 SysScanDesc scandesc;
368 HeapTuple tuple;
369 ScanKeyData key[1];
370
371 if (RecoveryInProgress())
372 return;
373
374 PushActiveSnapshot(GetTransactionSnapshot());
375
376 /* make sure we're operating without other pglogical workers interfering */
377 extrel = table_open(ExtensionRelationId, ShareUpdateExclusiveLock);
378
379 ScanKeyInit(&key[0],
380 Anum_pg_extension_extname,
381 BTEqualStrategyNumber, F_NAMEEQ,
382 CStringGetDatum(EXTENSION_NAME));
383
384 scandesc = systable_beginscan(extrel, ExtensionNameIndexId, true,
385 NULL, 1, key);
386
387 tuple = systable_getnext(scandesc);
388
389 /* No extension, nothing to update. */
390 if (HeapTupleIsValid(tuple))
391 {
392 Datum datum;
393 bool isnull;
394 char *extversion;
395
396 /* Determine extension version. */
397 datum = heap_getattr(tuple, Anum_pg_extension_extversion,
398 RelationGetDescr(extrel), &isnull);
399 if (isnull)
400 elog(ERROR, "extversion is null");
401 extversion = text_to_cstring(DatumGetTextPP(datum));
402
403 /* Only run the alter if the versions don't match. */
404 if (strcmp(extversion, PGLOGICAL_VERSION) != 0)
405 {
406 AlterExtensionStmt alter_stmt;
407
408 alter_stmt.options = NIL;
409 alter_stmt.extname = EXTENSION_NAME;
410 ExecAlterExtensionStmt(&alter_stmt);
411 }
412 }
413
414 systable_endscan(scandesc);
415 table_close(extrel, NoLock);
416
417 PopActiveSnapshot();
418 }
419
420 /*
421 * Call IDENTIFY_SYSTEM on the connection and report its results.
422 */
423 void
pglogical_identify_system(PGconn * streamConn,uint64 * sysid,TimeLineID * timeline,XLogRecPtr * xlogpos,Name * dbname)424 pglogical_identify_system(PGconn *streamConn, uint64* sysid,
425 TimeLineID *timeline, XLogRecPtr *xlogpos,
426 Name *dbname)
427 {
428 PGresult *res;
429
430 res = PQexec(streamConn, "IDENTIFY_SYSTEM");
431 if (PQresultStatus(res) != PGRES_TUPLES_OK)
432 {
433 elog(ERROR, "could not send replication command \"%s\": %s",
434 "IDENTIFY_SYSTEM", PQerrorMessage(streamConn));
435 }
436 if (PQntuples(res) != 1 || PQnfields(res) < 4)
437 {
438 elog(ERROR, "could not identify system: got %d rows and %d fields, expected %d rows and at least %d fields\n",
439 PQntuples(res), PQnfields(res), 1, 4);
440 }
441
442 if (PQnfields(res) > 4)
443 {
444 elog(DEBUG2, "ignoring extra fields in IDENTIFY_SYSTEM response; expected 4, got %d",
445 PQnfields(res));
446 }
447
448 if (sysid != NULL)
449 {
450 const char *remote_sysid = PQgetvalue(res, 0, 0);
451 if (sscanf(remote_sysid, UINT64_FORMAT, sysid) != 1)
452 elog(ERROR, "could not parse remote sysid %s", remote_sysid);
453 }
454
455 if (timeline != NULL)
456 {
457 const char *remote_tlid = PQgetvalue(res, 0, 1);
458 if (sscanf(remote_tlid, "%u", timeline) != 1)
459 elog(ERROR, "could not parse remote tlid %s", remote_tlid);
460 }
461
462 if (xlogpos != NULL)
463 {
464 const char *remote_xlogpos = PQgetvalue(res, 0, 2);
465 uint32 xlogpos_low, xlogpos_high;
466 if (sscanf(remote_xlogpos, "%X/%X", &xlogpos_high, &xlogpos_low) != 2)
467 elog(ERROR, "could not parse remote xlogpos %s", remote_xlogpos);
468 *xlogpos = (((XLogRecPtr)xlogpos_high)<<32) + xlogpos_low;
469 }
470
471 if (dbname != NULL)
472 {
473 char *remote_dbname = PQgetvalue(res, 0, 3);
474 strncpy(NameStr(**dbname), remote_dbname, NAMEDATALEN);
475 NameStr(**dbname)[NAMEDATALEN-1] = '\0';
476 }
477
478 PQclear(res);
479 }
480
481 void
pglogical_start_replication(PGconn * streamConn,const char * slot_name,XLogRecPtr start_pos,const char * forward_origins,const char * replication_sets,const char * replicate_only_table,bool force_text_transfer)482 pglogical_start_replication(PGconn *streamConn, const char *slot_name,
483 XLogRecPtr start_pos, const char *forward_origins,
484 const char *replication_sets,
485 const char *replicate_only_table,
486 bool force_text_transfer)
487 {
488 StringInfoData command;
489 PGresult *res;
490 char *sqlstate;
491 const char *want_binary = (force_text_transfer ? "0" : "1");
492
493 initStringInfo(&command);
494 appendStringInfo(&command, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
495 slot_name,
496 (uint32) (start_pos >> 32),
497 (uint32) start_pos);
498
499 /* Basic protocol info. */
500 appendStringInfo(&command, "expected_encoding '%s'",
501 GetDatabaseEncodingName());
502 appendStringInfo(&command, ", min_proto_version '%d'", PGLOGICAL_MIN_PROTO_VERSION_NUM);
503 appendStringInfo(&command, ", max_proto_version '%d'", PGLOGICAL_MAX_PROTO_VERSION_NUM);
504 appendStringInfo(&command, ", startup_params_format '1'");
505
506 /* Binary protocol compatibility. */
507 appendStringInfo(&command, ", \"binary.want_internal_basetypes\" '%s'", want_binary);
508 appendStringInfo(&command, ", \"binary.want_binary_basetypes\" '%s'", want_binary);
509 appendStringInfo(&command, ", \"binary.basetypes_major_version\" '%u'",
510 PG_VERSION_NUM/100);
511 appendStringInfo(&command, ", \"binary.sizeof_datum\" '%zu'",
512 sizeof(Datum));
513 appendStringInfo(&command, ", \"binary.sizeof_int\" '%zu'", sizeof(int));
514 appendStringInfo(&command, ", \"binary.sizeof_long\" '%zu'", sizeof(long));
515 appendStringInfo(&command, ", \"binary.bigendian\" '%d'",
516 #ifdef WORDS_BIGENDIAN
517 true
518 #else
519 false
520 #endif
521 );
522 appendStringInfo(&command, ", \"binary.float4_byval\" '%d'",
523 #ifdef USE_FLOAT4_BYVAL
524 true
525 #else
526 false
527 #endif
528 );
529 appendStringInfo(&command, ", \"binary.float8_byval\" '%d'",
530 #ifdef USE_FLOAT8_BYVAL
531 true
532 #else
533 false
534 #endif
535 );
536 appendStringInfo(&command, ", \"binary.integer_datetimes\" '%d'",
537 #ifdef USE_INTEGER_DATETIMES
538 true
539 #else
540 false
541 #endif
542 );
543
544 /* We don't care about this anymore but pglogical 1.x expects this. */
545 appendStringInfoString(&command,
546 ", \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup'");
547
548 if (forward_origins)
549 appendStringInfo(&command, ", \"pglogical.forward_origins\" %s",
550 quote_literal_cstr(forward_origins));
551
552 if (replicate_only_table)
553 {
554 /* Send the table name we want to the upstream */
555 appendStringInfoString(&command, ", \"pglogical.replicate_only_table\" ");
556 appendStringInfoString(&command, quote_literal_cstr(replicate_only_table));
557 }
558
559 if (replication_sets)
560 {
561 /* Send the replication set names we want to the upstream */
562 appendStringInfoString(&command, ", \"pglogical.replication_set_names\" ");
563 appendStringInfoString(&command, quote_literal_cstr(replication_sets));
564 }
565
566 /* Tell the upstream that we want unbounded metadata cache size */
567 appendStringInfoString(&command, ", \"relmeta_cache_size\" '-1'");
568
569 /* general info about the downstream */
570 appendStringInfo(&command, ", pg_version '%u'", PG_VERSION_NUM);
571 appendStringInfo(&command, ", pglogical_version '%s'", PGLOGICAL_VERSION);
572 appendStringInfo(&command, ", pglogical_version_num '%d'", PGLOGICAL_VERSION_NUM);
573 appendStringInfo(&command, ", pglogical_apply_pid '%d'", MyProcPid);
574
575 appendStringInfoChar(&command, ')');
576
577 res = PQexec(streamConn, command.data);
578 sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
579 if (PQresultStatus(res) != PGRES_COPY_BOTH)
580 elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
581 command.data, PQresultErrorMessage(res), sqlstate);
582 PQclear(res);
583 }
584
585 /*
586 * Start the manager workers for every db which has a pglogical node.
587 *
588 * Note that we start workers that are not necessary here. We do this because
589 * we need to check every individual database to check if there is pglogical
590 * node setup and it's not possible to switch connections to different
591 * databases within one background worker. The workers that won't find any
592 * pglogical node setup will exit immediately during startup.
593 * This behavior can cause issue where we consume all the allowed workers and
594 * eventually error out even though the max_worker_processes is set high enough
595 * to satisfy the actual needed worker count.
596 *
597 * Must be run inside a transaction.
598 */
599 static void
start_manager_workers(void)600 start_manager_workers(void)
601 {
602 Relation rel;
603 TableScanDesc scan;
604 HeapTuple tup;
605
606 /* Run manager worker for every connectable database. */
607 rel = table_open(DatabaseRelationId, AccessShareLock);
608 scan = table_beginscan_catalog(rel, 0, NULL);
609
610 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
611 {
612 Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
613 #if PG_VERSION_NUM < 120000
614 Oid dboid = HeapTupleGetOid(tup);
615 #else
616 Oid dboid = pgdatabase->oid;
617 #endif
618 PGLogicalWorker worker;
619
620 CHECK_FOR_INTERRUPTS();
621
622 /* Can't run workers on databases which don't allow connection. */
623 if (!pgdatabase->datallowconn)
624 continue;
625
626 /* Worker already attached, nothing to do. */
627 LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE);
628 if (pglogical_worker_running(pglogical_manager_find(dboid)))
629 {
630 LWLockRelease(PGLogicalCtx->lock);
631 continue;
632 }
633 LWLockRelease(PGLogicalCtx->lock);
634
635 /* No record found, try running new worker. */
636 elog(DEBUG1, "registering pglogical manager process for database %s",
637 NameStr(pgdatabase->datname));
638
639 memset(&worker, 0, sizeof(PGLogicalWorker));
640 worker.worker_type = PGLOGICAL_WORKER_MANAGER;
641 worker.dboid = dboid;
642
643 pglogical_worker_register(&worker);
644 }
645
646 table_endscan(scan);
647 table_close(rel, AccessShareLock);
648 }
649
650 /*
651 * Static bgworker used for initialization and management (our main process).
652 */
653 void
pglogical_supervisor_main(Datum main_arg)654 pglogical_supervisor_main(Datum main_arg)
655 {
656 /* Establish signal handlers. */
657 pqsignal(SIGTERM, handle_sigterm);
658 BackgroundWorkerUnblockSignals();
659
660 /*
661 * Initialize supervisor info in shared memory. Strictly speaking we
662 * don't need a lock here, because no other process could possibly be
663 * looking at this shared struct since they're all started by the
664 * supervisor, but let's be safe.
665 */
666 LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE);
667 PGLogicalCtx->supervisor = MyProc;
668 PGLogicalCtx->subscriptions_changed = true;
669 LWLockRelease(PGLogicalCtx->lock);
670
671 /* Make it easy to identify our processes. */
672 SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
673 PGC_USERSET, PGC_S_SESSION);
674
675 elog(LOG, "starting pglogical supervisor");
676
677 VALGRIND_PRINTF("PGLOGICAL: supervisor\n");
678
679 /* Setup connection to pinned catalogs (we only ever read pg_database). */
680 #if PG_VERSION_NUM >= 110000
681 BackgroundWorkerInitializeConnection(NULL, NULL, 0);
682 #elif PG_VERSION_NUM >= 90500
683 BackgroundWorkerInitializeConnection(NULL, NULL);
684 #else
685 BackgroundWorkerInitializeConnection("postgres", NULL);
686 #endif
687
688 /* Main wait loop. */
689 while (!got_SIGTERM)
690 {
691 int rc;
692
693 if (PGLogicalCtx->subscriptions_changed)
694 {
695 /*
696 * No need to lock here, since we'll take account of all sub
697 * changes up to this point, even if new ones were added between
698 * the test above and flag clear. We're just being woken up.
699 */
700 PGLogicalCtx->subscriptions_changed = false;
701 StartTransactionCommand();
702 start_manager_workers();
703 CommitTransactionCommand();
704 }
705
706 rc = WaitLatch(&MyProc->procLatch,
707 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
708 180000L);
709
710 ResetLatch(&MyProc->procLatch);
711
712 /* emergency bailout if postmaster has died */
713 if (rc & WL_POSTMASTER_DEATH)
714 proc_exit(1);
715 }
716
717 VALGRIND_PRINTF("PGLOGICAL: supervisor exit\n");
718 proc_exit(0);
719 }
720
721 static void
pglogical_temp_directory_assing_hook(const char * newval,void * extra)722 pglogical_temp_directory_assing_hook(const char *newval, void *extra)
723 {
724 if (strlen(newval))
725 {
726 pglogical_temp_directory = strdup(newval);
727 }
728 else
729 {
730 #ifndef WIN32
731 const char *tmpdir = getenv("TMPDIR");
732
733 if (!tmpdir)
734 tmpdir = "/tmp";
735 #else
736 char tmpdir[MAXPGPATH];
737 int ret;
738
739 ret = GetTempPath(MAXPGPATH, tmpdir);
740 if (ret == 0 || ret > MAXPGPATH)
741 {
742 ereport(ERROR,
743 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
744 errmsg("could not locate temporary directory: %s\n",
745 !ret ? strerror(errno) : "")));
746 return false;
747 }
748 #endif
749
750 pglogical_temp_directory = strdup(tmpdir);
751
752 }
753
754 if (pglogical_temp_directory == NULL)
755 ereport(ERROR,
756 (errcode(ERRCODE_OUT_OF_MEMORY),
757 errmsg("out of memory")));
758 }
759
760
761 /*
762 * Entry point for this module.
763 */
764 void
_PG_init(void)765 _PG_init(void)
766 {
767 BackgroundWorker bgw;
768
769 if (!process_shared_preload_libraries_in_progress)
770 elog(ERROR, "pglogical is not in shared_preload_libraries");
771
772 DefineCustomEnumVariable("pglogical.conflict_resolution",
773 gettext_noop("Sets method used for conflict resolution for resolvable conflicts."),
774 NULL,
775 &pglogical_conflict_resolver,
776 #ifdef XCP
777 PGLOGICAL_RESOLVE_ERROR,
778 #else
779 PGLOGICAL_RESOLVE_APPLY_REMOTE,
780 #endif
781 PGLogicalConflictResolvers,
782 PGC_SUSET, 0,
783 pglogical_conflict_resolver_check_hook,
784 NULL, NULL);
785
786 DefineCustomEnumVariable("pglogical.conflict_log_level",
787 gettext_noop("Sets log level used for logging resolved conflicts."),
788 NULL,
789 &pglogical_conflict_log_level,
790 LOG,
791 server_message_level_options,
792 PGC_SUSET, 0,
793 NULL, NULL, NULL);
794
795 DefineCustomBoolVariable("pglogical.synchronous_commit",
796 "pglogical specific synchronous commit value",
797 NULL,
798 &pglogical_synchronous_commit,
799 false, PGC_POSTMASTER,
800 0,
801 NULL, NULL, NULL);
802
803 DefineCustomBoolVariable("pglogical.use_spi",
804 "Use SPI instead of low-level API for applying changes",
805 NULL,
806 &pglogical_use_spi,
807 #ifdef XCP
808 true,
809 #else
810 false,
811 #endif
812 PGC_POSTMASTER,
813 0,
814 NULL, NULL, NULL);
815
816 DefineCustomBoolVariable("pglogical.batch_inserts",
817 "Batch inserts if possible",
818 NULL,
819 &pglogical_batch_inserts,
820 true,
821 PGC_POSTMASTER,
822 0,
823 NULL, NULL, NULL);
824
825 /*
826 * We can't use the temp_tablespace safely for our dumps, because Pg's
827 * crash recovery is very careful to delete only particularly formatted
828 * files. Instead for now just allow user to specify dump storage.
829 */
830 DefineCustomStringVariable("pglogical.temp_directory",
831 "Directory to store dumps for local restore",
832 NULL,
833 &pglogical_temp_directory_config,
834 "", PGC_SIGHUP,
835 0,
836 NULL,
837 pglogical_temp_directory_assing_hook,
838 NULL);
839
840 DefineCustomStringVariable("pglogical.extra_connection_options",
841 "connection options to add to all peer node connections",
842 NULL,
843 &pglogical_extra_connection_options,
844 "",
845 PGC_SIGHUP,
846 0,
847 NULL, NULL, NULL);
848
849 if (IsBinaryUpgrade)
850 return;
851
852 /* Init workers. */
853 pglogical_worker_shmem_init();
854
855 /* Init executor module */
856 pglogical_executor_init();
857
858 /* Run the supervisor. */
859 memset(&bgw, 0, sizeof(bgw));
860 bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
861 BGWORKER_BACKEND_DATABASE_CONNECTION;
862 bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
863 snprintf(bgw.bgw_library_name, BGW_MAXLEN,
864 EXTENSION_NAME);
865 snprintf(bgw.bgw_function_name, BGW_MAXLEN,
866 "pglogical_supervisor_main");
867 snprintf(bgw.bgw_name, BGW_MAXLEN,
868 "pglogical supervisor");
869 bgw.bgw_restart_time = 5;
870
871 RegisterBackgroundWorker(&bgw);
872 }
873