1 /*-------------------------------------------------------------------------
2  *
3  * pglogical.c
4  * 		pglogical initialization and common functionality
5  *
6  * Copyright (c) 2015, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  pglogical.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "miscadmin.h"
16 
17 #include "access/hash.h"
18 #include "access/htup_details.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 
22 #include "catalog/pg_extension.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_type.h"
27 
28 #include "commands/extension.h"
29 #include "commands/trigger.h"
30 
31 #include "executor/executor.h"
32 
33 #include "mb/pg_wchar.h"
34 
35 #include "nodes/nodeFuncs.h"
36 
37 #include "optimizer/planner.h"
38 
39 #include "parser/parse_coerce.h"
40 
41 #include "replication/reorderbuffer.h"
42 
43 #include "storage/ipc.h"
44 #include "storage/proc.h"
45 
46 #include "utils/builtins.h"
47 #include "utils/fmgroids.h"
48 #include "utils/lsyscache.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51 
52 #include "pgstat.h"
53 
54 #include "pglogical_executor.h"
55 #include "pglogical_node.h"
56 #include "pglogical_conflict.h"
57 #include "pglogical_worker.h"
58 #include "pglogical.h"
59 
PG_MODULE_MAGIC;

/*
 * Accepted values for the pglogical.conflict_resolution GUC, as
 * {name, value, hidden} entries terminated by a NULL name.
 *
 * When built with XCP defined only "error" is offered; the other
 * resolution strategies are compiled out.
 */
static const struct config_enum_entry PGLogicalConflictResolvers[] = {
	{"error", PGLOGICAL_RESOLVE_ERROR, false},
#ifndef XCP
	{"apply_remote", PGLOGICAL_RESOLVE_APPLY_REMOTE, false},
	{"keep_local", PGLOGICAL_RESOLVE_KEEP_LOCAL, false},
	{"last_update_wins", PGLOGICAL_RESOLVE_LAST_UPDATE_WINS, false},
	{"first_update_wins", PGLOGICAL_RESOLVE_FIRST_UPDATE_WINS, false},
#endif
	{NULL, 0, false}
};
72 
/*
 * Accepted values for the pglogical.conflict_log_level GUC; copied from
 * guc.c.  "debug" is a hidden alias for DEBUG2 (third field = hidden).
 */
static const struct config_enum_entry server_message_level_options[] = {
	{"debug", DEBUG2, true},
	{"debug5", DEBUG5, false},
	{"debug4", DEBUG4, false},
	{"debug3", DEBUG3, false},
	{"debug2", DEBUG2, false},
	{"debug1", DEBUG1, false},
	{"info", INFO, false},
	{"notice", NOTICE, false},
	{"warning", WARNING, false},
	{"error", ERROR, false},
	{"log", LOG, false},
	{"fatal", FATAL, false},
	{"panic", PANIC, false},
	{NULL, 0, false}
};
90 
/* Backing variables for GUCs defined in _PG_init(). */
bool	pglogical_synchronous_commit = false;
/*
 * Effective directory for local dump files.  Starts out empty and is
 * replaced by pglogical_temp_directory_assing_hook() with either the
 * configured value or a platform temporary directory.
 */
char   *pglogical_temp_directory = "";
bool	pglogical_use_spi = false;
bool	pglogical_batch_inserts = true;
/* Raw pglogical.temp_directory setting as supplied by the user. */
static char *pglogical_temp_directory_config;

void _PG_init(void);
void pglogical_supervisor_main(Datum main_arg);
/* Value of the pglogical.extra_connection_options GUC. */
char *pglogical_extra_connection_options;

static PGconn * pglogical_connect_base(const char *connstr,
									   const char *appname,
									   const char *suffix,
									   bool replication);
105 
106 
/*
 * Ensure string is not longer than maxlen.
 *
 * If str already fits within maxlen characters, a palloc'd copy of it is
 * returned unchanged.  Otherwise the result is the first (maxlen - 8)
 * characters of str followed by the 8-digit lowercase hex form of
 * hash_any() computed over the whole string; together these are exactly
 * maxlen characters long.
 *
 * Maxlen can't be less than 8 because hash produces uint32 which in hex form
 * can have up to 8 characters.
 *
 * The result is always palloc'd in the current memory context.  Callers
 * such as pglogical_connect_base() use it to build identifiers, so changing
 * the output format changes those identifiers.
 */
char *
shorten_hash(const char *str, int maxlen)
{
	char	   *ret;
	int			len = strlen(str);

	Assert(maxlen >= 8);

	if (len <= maxlen)
		return pstrdup(str);

	ret = (char *) palloc(maxlen + 1);

	/*
	 * snprintf's size argument counts the terminating NUL, so it must be
	 * maxlen + 1 for the full prefix plus all 8 hash digits to fit.  Passing
	 * just maxlen would silently drop the last hex digit.
	 */
	snprintf(ret, maxlen + 1, "%.*s%08x", maxlen - 8,
			 str, DatumGetUInt32(hash_any((unsigned char *) str, len)));
	ret[maxlen] = '\0';

	return ret;
}
134 
135 /*
136  * Convert text array to list of strings.
137  *
138  * Note: the resulting list points to the memory of the input array.
139  */
140 List *
textarray_to_list(ArrayType * textarray)141 textarray_to_list(ArrayType *textarray)
142 {
143 	Datum		   *elems;
144 	int				nelems, i;
145 	List		   *res = NIL;
146 
147 	deconstruct_array(textarray,
148 					  TEXTOID, -1, false, 'i',
149 					  &elems, NULL, &nelems);
150 
151 	if (nelems == 0)
152 		return NIL;
153 
154 	for (i = 0; i < nelems; i++)
155 		res = lappend(res, TextDatumGetCString(elems[i]));
156 
157 	return res;
158 }
159 
/*
 * Deconstruct the text representation of a 1-dimensional Postgres array
 * into individual items.
 *
 * On success, returns true and sets *itemarray and *nitems to describe
 * an array of individual strings.  On parse failure, returns false;
 * *itemarray may exist or be NULL.
 *
 * NOTE: free'ing itemarray is sufficient to deallocate the working storage.
 * The storage comes from malloc(), not palloc(), so release it with free().
 * This also applies when false is returned with *itemarray already set.
 *
 * Items are returned as raw text: an unquoted NULL comes back as the string
 * "NULL", and nested arrays such as "{{a},{b}}" make the parse fail.
 */
bool
parsePGArray(const char *atext, char ***itemarray, int *nitems)
{
	int			inputlen;
	char	  **items;
	char	   *strings;
	int			curitem;

	/*
	 * We expect input in the form of "{item,item,item}" where any item is
	 * either raw data, or surrounded by double quotes (in which case embedded
	 * characters including backslashes and quotes are backslashed).
	 *
	 * We build the result as an array of pointers followed by the actual
	 * string data, all in one malloc block for convenience of deallocation.
	 * The worst-case storage need is not more than one pointer and one
	 * character for each input character (consider "{,,,,,,,,,,}").
	 */
	*itemarray = NULL;
	*nitems = 0;
	inputlen = strlen(atext);
	if (inputlen < 2 || atext[0] != '{' || atext[inputlen - 1] != '}')
		return false;			/* bad input */
	items = (char **) malloc(inputlen * (sizeof(char *) + sizeof(char)));
	if (items == NULL)
		return false;			/* out of memory */
	*itemarray = items;
	/* String bytes are stored right after the inputlen pointer slots. */
	strings = (char *) (items + inputlen);

	atext++;					/* advance over initial '{' */
	curitem = 0;
	while (*atext != '}')
	{
		if (*atext == '\0')
			return false;		/* premature end of string */
		/* Each item starts at the next free byte of the string area. */
		items[curitem] = strings;
		while (*atext != '}' && *atext != ',')
		{
			if (*atext == '\0')
				return false;	/* premature end of string */
			if (*atext != '"')
				*strings++ = *atext++;	/* copy unquoted data */
			else
			{
				/* process quoted substring */
				atext++;
				while (*atext != '"')
				{
					if (*atext == '\0')
						return false;	/* premature end of string */
					/* A backslash escapes the following character. */
					if (*atext == '\\')
					{
						atext++;
						if (*atext == '\0')
							return false;		/* premature end of string */
					}
					*strings++ = *atext++;		/* copy quoted data */
				}
				atext++;		/* skip the closing quote */
			}
		}
		*strings++ = '\0';		/* terminate this item */
		if (*atext == ',')
			atext++;
		curitem++;
	}
	/* atext now points at a '}', which must be the last input character. */
	if (atext[1] != '\0')
		return false;			/* bogus syntax (embedded '}') */
	*nitems = curitem;
	return true;
}
241 
242 /*
243  * Get oid of our queue table.
244  */
245 inline Oid
get_pglogical_table_oid(const char * table)246 get_pglogical_table_oid(const char *table)
247 {
248 	Oid			nspoid;
249 	Oid			reloid;
250 
251 	nspoid = get_namespace_oid(EXTENSION_NAME, false);
252 
253 	reloid = get_relname_relid(table, nspoid);
254 
255 	if (reloid == InvalidOid)
256 		elog(ERROR, "cache lookup failed for relation %s.%s",
257 			 EXTENSION_NAME, table);
258 
259 	return reloid;
260 }
261 
262 #define CONN_PARAM_ARRAY_SIZE 9
263 
264 static PGconn *
pglogical_connect_base(const char * connstr,const char * appname,const char * suffix,bool replication)265 pglogical_connect_base(const char *connstr, const char *appname,
266 					   const char *suffix, bool replication)
267 {
268 	int				i=0;
269 	PGconn		   *conn;
270 	const char	   *keys[CONN_PARAM_ARRAY_SIZE];
271 	const char	   *vals[CONN_PARAM_ARRAY_SIZE];
272 	StringInfoData s;
273 
274 	initStringInfo(&s);
275 	appendStringInfoString(&s, pglogical_extra_connection_options);
276 	appendStringInfoChar(&s, ' ');
277 	appendStringInfoString(&s, connstr);
278 
279 	keys[i] = "dbname";
280 	vals[i] = connstr;
281 	i++;
282 	keys[i] = "application_name";
283 	if (suffix)
284 	{
285 		char	s[NAMEDATALEN];
286 		snprintf(s, NAMEDATALEN,
287 			 "%s_%s",
288 			 shorten_hash(appname, NAMEDATALEN - strlen(suffix) - 2),
289 			 suffix);
290 		vals[i] = s;
291 	}
292 	else
293 		vals[i] = appname;
294 	i++;
295 	keys[i] = "connect_timeout";
296 	vals[i] = "30";
297 	i++;
298 	keys[i] = "keepalives";
299 	vals[i] = "1";
300 	i++;
301 	keys[i] = "keepalives_idle";
302 	vals[i] = "20";
303 	i++;
304 	keys[i] = "keepalives_interval";
305 	vals[i] = "20";
306 	i++;
307 	keys[i] = "keepalives_count";
308 	vals[i] = "5";
309 	i++;
310 	keys[i] = "replication";
311 	vals[i] = replication ? "database" : NULL;
312 	i++;
313 	keys[i] = NULL;
314 	vals[i] = NULL;
315 
316 	Assert(i <= CONN_PARAM_ARRAY_SIZE);
317 
318 	/*
319 	 * We use the expand_dbname parameter to process the connection string
320 	 * (or URI), and pass some extra options.
321 	 */
322 	conn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
323 	if (PQstatus(conn) != CONNECTION_OK)
324 	{
325 		ereport(ERROR,
326 				(errmsg("could not connect to the postgresql server%s: %s",
327 						replication ? " in replication mode" : "",
328 						PQerrorMessage(conn)),
329 				 errdetail("dsn was: %s", s.data)));
330 	}
331 
332 	resetStringInfo(&s);
333 
334 	return conn;
335 }
336 
337 
338 /*
339  * Make standard postgres connection, ERROR on failure.
340  */
341 PGconn *
pglogical_connect(const char * connstring,const char * connname,const char * suffix)342 pglogical_connect(const char *connstring, const char *connname,
343 				  const char *suffix)
344 {
345 	return pglogical_connect_base(connstring, connname, suffix, false);
346 }
347 
348 /*
349  * Make replication connection, ERROR on failure.
350  */
351 PGconn *
pglogical_connect_replica(const char * connstring,const char * connname,const char * suffix)352 pglogical_connect_replica(const char *connstring, const char *connname,
353 						  const char *suffix)
354 {
355 	return pglogical_connect_base(connstring, connname, suffix, true);
356 }
357 
358 /*
359  * Make sure the extension is up to date.
360  *
361  * Called by db manager.
362  */
363 void
pglogical_manage_extension(void)364 pglogical_manage_extension(void)
365 {
366 	Relation	extrel;
367 	SysScanDesc scandesc;
368 	HeapTuple	tuple;
369 	ScanKeyData key[1];
370 
371 	if (RecoveryInProgress())
372 		return;
373 
374 	PushActiveSnapshot(GetTransactionSnapshot());
375 
376 	/* make sure we're operating without other pglogical workers interfering */
377 	extrel = table_open(ExtensionRelationId, ShareUpdateExclusiveLock);
378 
379 	ScanKeyInit(&key[0],
380 				Anum_pg_extension_extname,
381 				BTEqualStrategyNumber, F_NAMEEQ,
382 				CStringGetDatum(EXTENSION_NAME));
383 
384 	scandesc = systable_beginscan(extrel, ExtensionNameIndexId, true,
385 								  NULL, 1, key);
386 
387 	tuple = systable_getnext(scandesc);
388 
389 	/* No extension, nothing to update. */
390 	if (HeapTupleIsValid(tuple))
391 	{
392 		Datum		datum;
393 		bool		isnull;
394 		char	   *extversion;
395 
396 		/* Determine extension version. */
397 		datum = heap_getattr(tuple, Anum_pg_extension_extversion,
398 							 RelationGetDescr(extrel), &isnull);
399 		if (isnull)
400 			elog(ERROR, "extversion is null");
401 		extversion = text_to_cstring(DatumGetTextPP(datum));
402 
403 		/* Only run the alter if the versions don't match. */
404 		if (strcmp(extversion, PGLOGICAL_VERSION) != 0)
405 		{
406 			AlterExtensionStmt alter_stmt;
407 
408 			alter_stmt.options = NIL;
409 			alter_stmt.extname = EXTENSION_NAME;
410 			ExecAlterExtensionStmt(&alter_stmt);
411 		}
412 	}
413 
414 	systable_endscan(scandesc);
415 	table_close(extrel, NoLock);
416 
417 	PopActiveSnapshot();
418 }
419 
420 /*
421  * Call IDENTIFY_SYSTEM on the connection and report its results.
422  */
423 void
pglogical_identify_system(PGconn * streamConn,uint64 * sysid,TimeLineID * timeline,XLogRecPtr * xlogpos,Name * dbname)424 pglogical_identify_system(PGconn *streamConn, uint64* sysid,
425 							TimeLineID *timeline, XLogRecPtr *xlogpos,
426 							Name *dbname)
427 {
428 	PGresult	   *res;
429 
430 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
431 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
432 	{
433 		elog(ERROR, "could not send replication command \"%s\": %s",
434 			 "IDENTIFY_SYSTEM", PQerrorMessage(streamConn));
435 	}
436 	if (PQntuples(res) != 1 || PQnfields(res) < 4)
437 	{
438 		elog(ERROR, "could not identify system: got %d rows and %d fields, expected %d rows and at least %d fields\n",
439 			 PQntuples(res), PQnfields(res), 1, 4);
440 	}
441 
442 	if (PQnfields(res) > 4)
443 	{
444 		elog(DEBUG2, "ignoring extra fields in IDENTIFY_SYSTEM response; expected 4, got %d",
445 			 PQnfields(res));
446 	}
447 
448 	if (sysid != NULL)
449 	{
450 		const char *remote_sysid = PQgetvalue(res, 0, 0);
451 		if (sscanf(remote_sysid, UINT64_FORMAT, sysid) != 1)
452 			elog(ERROR, "could not parse remote sysid %s", remote_sysid);
453 	}
454 
455 	if (timeline != NULL)
456 	{
457 		const char *remote_tlid = PQgetvalue(res, 0, 1);
458 		if (sscanf(remote_tlid, "%u", timeline) != 1)
459 			elog(ERROR, "could not parse remote tlid %s", remote_tlid);
460 	}
461 
462 	if (xlogpos != NULL)
463 	{
464 		const char *remote_xlogpos = PQgetvalue(res, 0, 2);
465 		uint32 xlogpos_low, xlogpos_high;
466 		if (sscanf(remote_xlogpos, "%X/%X", &xlogpos_high, &xlogpos_low) != 2)
467 			elog(ERROR, "could not parse remote xlogpos %s", remote_xlogpos);
468 		*xlogpos = (((XLogRecPtr)xlogpos_high)<<32) + xlogpos_low;
469 	}
470 
471 	if (dbname != NULL)
472 	{
473 		char *remote_dbname = PQgetvalue(res, 0, 3);
474 		strncpy(NameStr(**dbname), remote_dbname, NAMEDATALEN);
475 		NameStr(**dbname)[NAMEDATALEN-1] = '\0';
476 	}
477 
478 	PQclear(res);
479 }
480 
481 void
pglogical_start_replication(PGconn * streamConn,const char * slot_name,XLogRecPtr start_pos,const char * forward_origins,const char * replication_sets,const char * replicate_only_table,bool force_text_transfer)482 pglogical_start_replication(PGconn *streamConn, const char *slot_name,
483 							XLogRecPtr start_pos, const char *forward_origins,
484 							const char *replication_sets,
485 							const char *replicate_only_table,
486 							bool force_text_transfer)
487 {
488 	StringInfoData	command;
489 	PGresult	   *res;
490 	char		   *sqlstate;
491 	const char	   *want_binary = (force_text_transfer ? "0" : "1");
492 
493 	initStringInfo(&command);
494 	appendStringInfo(&command, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X (",
495 					 slot_name,
496 					 (uint32) (start_pos >> 32),
497 					 (uint32) start_pos);
498 
499 	/* Basic protocol info. */
500 	appendStringInfo(&command, "expected_encoding '%s'",
501 					 GetDatabaseEncodingName());
502 	appendStringInfo(&command, ", min_proto_version '%d'", PGLOGICAL_MIN_PROTO_VERSION_NUM);
503 	appendStringInfo(&command, ", max_proto_version '%d'", PGLOGICAL_MAX_PROTO_VERSION_NUM);
504 	appendStringInfo(&command, ", startup_params_format '1'");
505 
506 	/* Binary protocol compatibility. */
507 	appendStringInfo(&command, ", \"binary.want_internal_basetypes\" '%s'", want_binary);
508 	appendStringInfo(&command, ", \"binary.want_binary_basetypes\" '%s'", want_binary);
509 	appendStringInfo(&command, ", \"binary.basetypes_major_version\" '%u'",
510 					 PG_VERSION_NUM/100);
511 	appendStringInfo(&command, ", \"binary.sizeof_datum\" '%zu'",
512 					 sizeof(Datum));
513 	appendStringInfo(&command, ", \"binary.sizeof_int\" '%zu'", sizeof(int));
514 	appendStringInfo(&command, ", \"binary.sizeof_long\" '%zu'", sizeof(long));
515 	appendStringInfo(&command, ", \"binary.bigendian\" '%d'",
516 #ifdef WORDS_BIGENDIAN
517 					 true
518 #else
519 					 false
520 #endif
521 					 );
522 	appendStringInfo(&command, ", \"binary.float4_byval\" '%d'",
523 #ifdef USE_FLOAT4_BYVAL
524 					 true
525 #else
526 					 false
527 #endif
528 					 );
529 	appendStringInfo(&command, ", \"binary.float8_byval\" '%d'",
530 #ifdef USE_FLOAT8_BYVAL
531 					 true
532 #else
533 					 false
534 #endif
535 					 );
536 	appendStringInfo(&command, ", \"binary.integer_datetimes\" '%d'",
537 #ifdef USE_INTEGER_DATETIMES
538 					 true
539 #else
540 					 false
541 #endif
542 					 );
543 
544 	/* We don't care about this anymore but pglogical 1.x expects this. */
545 	appendStringInfoString(&command,
546 						   ", \"hooks.setup_function\" 'pglogical.pglogical_hooks_setup'");
547 
548 	if (forward_origins)
549 		appendStringInfo(&command, ", \"pglogical.forward_origins\" %s",
550 					 quote_literal_cstr(forward_origins));
551 
552 	if (replicate_only_table)
553 	{
554 		/* Send the table name we want to the upstream */
555 		appendStringInfoString(&command, ", \"pglogical.replicate_only_table\" ");
556 		appendStringInfoString(&command, quote_literal_cstr(replicate_only_table));
557 	}
558 
559 	if (replication_sets)
560 	{
561 		/* Send the replication set names we want to the upstream */
562 		appendStringInfoString(&command, ", \"pglogical.replication_set_names\" ");
563 		appendStringInfoString(&command, quote_literal_cstr(replication_sets));
564 	}
565 
566 	/* Tell the upstream that we want unbounded metadata cache size */
567 	appendStringInfoString(&command, ", \"relmeta_cache_size\" '-1'");
568 
569 	/* general info about the downstream */
570 	appendStringInfo(&command, ", pg_version '%u'", PG_VERSION_NUM);
571 	appendStringInfo(&command, ", pglogical_version '%s'", PGLOGICAL_VERSION);
572 	appendStringInfo(&command, ", pglogical_version_num '%d'", PGLOGICAL_VERSION_NUM);
573 	appendStringInfo(&command, ", pglogical_apply_pid '%d'", MyProcPid);
574 
575 	appendStringInfoChar(&command, ')');
576 
577 	res = PQexec(streamConn, command.data);
578 	sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
579 	if (PQresultStatus(res) != PGRES_COPY_BOTH)
580 		elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s",
581 			 command.data, PQresultErrorMessage(res), sqlstate);
582 	PQclear(res);
583 }
584 
585 /*
586  * Start the manager workers for every db which has a pglogical node.
587  *
588  * Note that we start workers that are not necessary here. We do this because
589  * we need to check every individual database to check if there is pglogical
590  * node setup and it's not possible to switch connections to different
591  * databases within one background worker. The workers that won't find any
592  * pglogical node setup will exit immediately during startup.
593  * This behavior can cause issue where we consume all the allowed workers and
594  * eventually error out even though the max_worker_processes is set high enough
595  * to satisfy the actual needed worker count.
596  *
597  * Must be run inside a transaction.
598  */
599 static void
start_manager_workers(void)600 start_manager_workers(void)
601 {
602 	Relation	rel;
603 	TableScanDesc scan;
604 	HeapTuple	tup;
605 
606 	/* Run manager worker for every connectable database. */
607 	rel = table_open(DatabaseRelationId, AccessShareLock);
608 	scan = table_beginscan_catalog(rel, 0, NULL);
609 
610 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
611 	{
612 		Form_pg_database	pgdatabase = (Form_pg_database) GETSTRUCT(tup);
613 #if PG_VERSION_NUM < 120000
614 		Oid					dboid = HeapTupleGetOid(tup);
615 #else
616 		Oid					dboid = pgdatabase->oid;
617 #endif
618 		PGLogicalWorker		worker;
619 
620 		CHECK_FOR_INTERRUPTS();
621 
622 		/* Can't run workers on databases which don't allow connection. */
623 		if (!pgdatabase->datallowconn)
624 			continue;
625 
626 		/* Worker already attached, nothing to do. */
627 		LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE);
628 		if (pglogical_worker_running(pglogical_manager_find(dboid)))
629 		{
630 			LWLockRelease(PGLogicalCtx->lock);
631 			continue;
632 		}
633 		LWLockRelease(PGLogicalCtx->lock);
634 
635 		/* No record found, try running new worker. */
636 		elog(DEBUG1, "registering pglogical manager process for database %s",
637 			 NameStr(pgdatabase->datname));
638 
639 		memset(&worker, 0, sizeof(PGLogicalWorker));
640 		worker.worker_type = PGLOGICAL_WORKER_MANAGER;
641 		worker.dboid = dboid;
642 
643 		pglogical_worker_register(&worker);
644 	}
645 
646 	table_endscan(scan);
647 	table_close(rel, AccessShareLock);
648 }
649 
650 /*
651  * Static bgworker used for initialization and management (our main process).
652  */
653 void
pglogical_supervisor_main(Datum main_arg)654 pglogical_supervisor_main(Datum main_arg)
655 {
656 	/* Establish signal handlers. */
657 	pqsignal(SIGTERM, handle_sigterm);
658 	BackgroundWorkerUnblockSignals();
659 
660 	/*
661 	 * Initialize supervisor info in shared memory.  Strictly speaking we
662 	 * don't need a lock here, because no other process could possibly be
663 	 * looking at this shared struct since they're all started by the
664 	 * supervisor, but let's be safe.
665 	 */
666 	LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE);
667 	PGLogicalCtx->supervisor = MyProc;
668 	PGLogicalCtx->subscriptions_changed = true;
669 	LWLockRelease(PGLogicalCtx->lock);
670 
671 	/* Make it easy to identify our processes. */
672 	SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
673 					PGC_USERSET, PGC_S_SESSION);
674 
675 	elog(LOG, "starting pglogical supervisor");
676 
677 	VALGRIND_PRINTF("PGLOGICAL: supervisor\n");
678 
679 	/* Setup connection to pinned catalogs (we only ever read pg_database). */
680 #if PG_VERSION_NUM >= 110000
681 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
682 #elif PG_VERSION_NUM >= 90500
683 	BackgroundWorkerInitializeConnection(NULL, NULL);
684 #else
685 	BackgroundWorkerInitializeConnection("postgres", NULL);
686 #endif
687 
688 	/* Main wait loop. */
689 	while (!got_SIGTERM)
690     {
691 		int rc;
692 
693 		if (PGLogicalCtx->subscriptions_changed)
694 		{
695 			/*
696 			 * No need to lock here, since we'll take account of all sub
697 			 * changes up to this point, even if new ones were added between
698 			 * the test above and flag clear. We're just being woken up.
699 			 */
700 			PGLogicalCtx->subscriptions_changed = false;
701 			StartTransactionCommand();
702 			start_manager_workers();
703 			CommitTransactionCommand();
704 		}
705 
706 		rc = WaitLatch(&MyProc->procLatch,
707 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
708 					   180000L);
709 
710         ResetLatch(&MyProc->procLatch);
711 
712         /* emergency bailout if postmaster has died */
713         if (rc & WL_POSTMASTER_DEATH)
714 			proc_exit(1);
715 	}
716 
717 	VALGRIND_PRINTF("PGLOGICAL: supervisor exit\n");
718 	proc_exit(0);
719 }
720 
721 static void
pglogical_temp_directory_assing_hook(const char * newval,void * extra)722 pglogical_temp_directory_assing_hook(const char *newval, void *extra)
723 {
724 	if (strlen(newval))
725 	{
726 		pglogical_temp_directory = strdup(newval);
727 	}
728 	else
729 	{
730 #ifndef WIN32
731 		const char *tmpdir = getenv("TMPDIR");
732 
733 		if (!tmpdir)
734 			tmpdir = "/tmp";
735 #else
736 		char		tmpdir[MAXPGPATH];
737 		int			ret;
738 
739 		ret = GetTempPath(MAXPGPATH, tmpdir);
740 		if (ret == 0 || ret > MAXPGPATH)
741 		{
742 			ereport(ERROR,
743 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
744 					 errmsg("could not locate temporary directory: %s\n",
745 							!ret ? strerror(errno) : "")));
746 			return false;
747 		}
748 #endif
749 
750 		pglogical_temp_directory = strdup(tmpdir);
751 
752 	}
753 
754 	if (pglogical_temp_directory == NULL)
755 		ereport(ERROR,
756 				(errcode(ERRCODE_OUT_OF_MEMORY),
757 				 errmsg("out of memory")));
758 }
759 
760 
761 /*
762  * Entry point for this module.
763  */
764 void
_PG_init(void)765 _PG_init(void)
766 {
767 	BackgroundWorker bgw;
768 
769 	if (!process_shared_preload_libraries_in_progress)
770 		elog(ERROR, "pglogical is not in shared_preload_libraries");
771 
772 	DefineCustomEnumVariable("pglogical.conflict_resolution",
773 							 gettext_noop("Sets method used for conflict resolution for resolvable conflicts."),
774 							 NULL,
775 							 &pglogical_conflict_resolver,
776 #ifdef XCP
777 							 PGLOGICAL_RESOLVE_ERROR,
778 #else
779 							 PGLOGICAL_RESOLVE_APPLY_REMOTE,
780 #endif
781 							 PGLogicalConflictResolvers,
782 							 PGC_SUSET, 0,
783 							 pglogical_conflict_resolver_check_hook,
784 							 NULL, NULL);
785 
786 	DefineCustomEnumVariable("pglogical.conflict_log_level",
787 							 gettext_noop("Sets log level used for logging resolved conflicts."),
788 							 NULL,
789 							 &pglogical_conflict_log_level,
790 							 LOG,
791 							 server_message_level_options,
792 							 PGC_SUSET, 0,
793 							 NULL, NULL, NULL);
794 
795 	DefineCustomBoolVariable("pglogical.synchronous_commit",
796 							 "pglogical specific synchronous commit value",
797 							 NULL,
798 							 &pglogical_synchronous_commit,
799 							 false, PGC_POSTMASTER,
800 							 0,
801 							 NULL, NULL, NULL);
802 
803 	DefineCustomBoolVariable("pglogical.use_spi",
804 							 "Use SPI instead of low-level API for applying changes",
805 							 NULL,
806 							 &pglogical_use_spi,
807 #ifdef XCP
808 							 true,
809 #else
810 							 false,
811 #endif
812 							 PGC_POSTMASTER,
813 							 0,
814 							 NULL, NULL, NULL);
815 
816 	DefineCustomBoolVariable("pglogical.batch_inserts",
817 							 "Batch inserts if possible",
818 							 NULL,
819 							 &pglogical_batch_inserts,
820 							 true,
821 							 PGC_POSTMASTER,
822 							 0,
823 							 NULL, NULL, NULL);
824 
825 	/*
826 	 * We can't use the temp_tablespace safely for our dumps, because Pg's
827 	 * crash recovery is very careful to delete only particularly formatted
828 	 * files. Instead for now just allow user to specify dump storage.
829 	 */
830 	DefineCustomStringVariable("pglogical.temp_directory",
831 							   "Directory to store dumps for local restore",
832 							   NULL,
833 							   &pglogical_temp_directory_config,
834 							   "", PGC_SIGHUP,
835 							   0,
836 							   NULL,
837 							   pglogical_temp_directory_assing_hook,
838 							   NULL);
839 
840 	DefineCustomStringVariable("pglogical.extra_connection_options",
841 							   "connection options to add to all peer node connections",
842 							   NULL,
843 							   &pglogical_extra_connection_options,
844 							   "",
845 							   PGC_SIGHUP,
846 							   0,
847 							   NULL, NULL, NULL);
848 
849 	if (IsBinaryUpgrade)
850 		return;
851 
852 	/* Init workers. */
853 	pglogical_worker_shmem_init();
854 
855 	/* Init executor module */
856 	pglogical_executor_init();
857 
858 	/* Run the supervisor. */
859 	memset(&bgw, 0, sizeof(bgw));
860 	bgw.bgw_flags =	BGWORKER_SHMEM_ACCESS |
861 		BGWORKER_BACKEND_DATABASE_CONNECTION;
862 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
863 	snprintf(bgw.bgw_library_name, BGW_MAXLEN,
864 			 EXTENSION_NAME);
865 	snprintf(bgw.bgw_function_name, BGW_MAXLEN,
866 			 "pglogical_supervisor_main");
867 	snprintf(bgw.bgw_name, BGW_MAXLEN,
868 			 "pglogical supervisor");
869 	bgw.bgw_restart_time = 5;
870 
871 	RegisterBackgroundWorker(&bgw);
872 }
873