1 /*-------------------------------------------------------------------------
2  *
3  * subscriptioncmds.c
4  *		subscription catalog manipulation functions
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *		subscriptioncmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/xact.h"
22 
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
29 #include "catalog/pg_subscription.h"
30 #include "catalog/pg_subscription_rel.h"
31 
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/subscriptioncmds.h"
35 
36 #include "executor/executor.h"
37 
38 #include "nodes/makefuncs.h"
39 
40 #include "replication/logicallauncher.h"
41 #include "replication/origin.h"
42 #include "replication/slot.h"
43 #include "replication/walreceiver.h"
44 #include "replication/walsender.h"
45 #include "replication/worker_internal.h"
46 
47 #include "storage/lmgr.h"
48 
49 #include "utils/builtins.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/memutils.h"
53 #include "utils/syscache.h"
54 
55 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
56 
57 /*
58  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
59  *
60  * Since not all options can be specified in both commands, this function
61  * will report an error on options if the target output pointer is NULL to
62  * accommodate that.
63  */
64 static void
parse_subscription_options(List * options,bool * connect,bool * enabled_given,bool * enabled,bool * create_slot,bool * slot_name_given,char ** slot_name,bool * copy_data,char ** synchronous_commit,bool * refresh)65 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
66 						   bool *enabled, bool *create_slot,
67 						   bool *slot_name_given, char **slot_name,
68 						   bool *copy_data, char **synchronous_commit,
69 						   bool *refresh)
70 {
71 	ListCell   *lc;
72 	bool		connect_given = false;
73 	bool		create_slot_given = false;
74 	bool		copy_data_given = false;
75 	bool		refresh_given = false;
76 
77 	/* If connect is specified, the others also need to be. */
78 	Assert(!connect || (enabled && create_slot && copy_data));
79 
80 	if (connect)
81 		*connect = true;
82 	if (enabled)
83 	{
84 		*enabled_given = false;
85 		*enabled = true;
86 	}
87 	if (create_slot)
88 		*create_slot = true;
89 	if (slot_name)
90 	{
91 		*slot_name_given = false;
92 		*slot_name = NULL;
93 	}
94 	if (copy_data)
95 		*copy_data = true;
96 	if (synchronous_commit)
97 		*synchronous_commit = NULL;
98 	if (refresh)
99 		*refresh = true;
100 
101 	/* Parse options */
102 	foreach(lc, options)
103 	{
104 		DefElem    *defel = (DefElem *) lfirst(lc);
105 
106 		if (strcmp(defel->defname, "connect") == 0 && connect)
107 		{
108 			if (connect_given)
109 				ereport(ERROR,
110 						(errcode(ERRCODE_SYNTAX_ERROR),
111 						 errmsg("conflicting or redundant options")));
112 
113 			connect_given = true;
114 			*connect = defGetBoolean(defel);
115 		}
116 		else if (strcmp(defel->defname, "enabled") == 0 && enabled)
117 		{
118 			if (*enabled_given)
119 				ereport(ERROR,
120 						(errcode(ERRCODE_SYNTAX_ERROR),
121 						 errmsg("conflicting or redundant options")));
122 
123 			*enabled_given = true;
124 			*enabled = defGetBoolean(defel);
125 		}
126 		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
127 		{
128 			if (create_slot_given)
129 				ereport(ERROR,
130 						(errcode(ERRCODE_SYNTAX_ERROR),
131 						 errmsg("conflicting or redundant options")));
132 
133 			create_slot_given = true;
134 			*create_slot = defGetBoolean(defel);
135 		}
136 		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
137 		{
138 			if (*slot_name_given)
139 				ereport(ERROR,
140 						(errcode(ERRCODE_SYNTAX_ERROR),
141 						 errmsg("conflicting or redundant options")));
142 
143 			*slot_name_given = true;
144 			*slot_name = defGetString(defel);
145 
146 			/* Setting slot_name = NONE is treated as no slot name. */
147 			if (strcmp(*slot_name, "none") == 0)
148 				*slot_name = NULL;
149 			else
150 				ReplicationSlotValidateName(*slot_name, ERROR);
151 		}
152 		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
153 		{
154 			if (copy_data_given)
155 				ereport(ERROR,
156 						(errcode(ERRCODE_SYNTAX_ERROR),
157 						 errmsg("conflicting or redundant options")));
158 
159 			copy_data_given = true;
160 			*copy_data = defGetBoolean(defel);
161 		}
162 		else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
163 				 synchronous_commit)
164 		{
165 			if (*synchronous_commit)
166 				ereport(ERROR,
167 						(errcode(ERRCODE_SYNTAX_ERROR),
168 						 errmsg("conflicting or redundant options")));
169 
170 			*synchronous_commit = defGetString(defel);
171 
172 			/* Test if the given value is valid for synchronous_commit GUC. */
173 			(void) set_config_option("synchronous_commit", *synchronous_commit,
174 									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
175 									 false, 0, false);
176 		}
177 		else if (strcmp(defel->defname, "refresh") == 0 && refresh)
178 		{
179 			if (refresh_given)
180 				ereport(ERROR,
181 						(errcode(ERRCODE_SYNTAX_ERROR),
182 						 errmsg("conflicting or redundant options")));
183 
184 			refresh_given = true;
185 			*refresh = defGetBoolean(defel);
186 		}
187 		else
188 			ereport(ERROR,
189 					(errcode(ERRCODE_SYNTAX_ERROR),
190 					 errmsg("unrecognized subscription parameter: %s", defel->defname)));
191 	}
192 
193 	/*
194 	 * We've been explicitly asked to not connect, that requires some
195 	 * additional processing.
196 	 */
197 	if (connect && !*connect)
198 	{
199 		/* Check for incompatible options from the user. */
200 		if (enabled && *enabled_given && *enabled)
201 			ereport(ERROR,
202 					(errcode(ERRCODE_SYNTAX_ERROR),
203 					 errmsg("connect = false and enabled = true are mutually exclusive options")));
204 
205 		if (create_slot && create_slot_given && *create_slot)
206 			ereport(ERROR,
207 					(errcode(ERRCODE_SYNTAX_ERROR),
208 					 errmsg("connect = false and create_slot = true are mutually exclusive options")));
209 
210 		if (copy_data && copy_data_given && *copy_data)
211 			ereport(ERROR,
212 					(errcode(ERRCODE_SYNTAX_ERROR),
213 					 errmsg("connect = false and copy_data = true are mutually exclusive options")));
214 
215 		/* Change the defaults of other options. */
216 		*enabled = false;
217 		*create_slot = false;
218 		*copy_data = false;
219 	}
220 
221 	/*
222 	 * Do additional checking for disallowed combination when slot_name = NONE
223 	 * was used.
224 	 */
225 	if (slot_name && *slot_name_given && !*slot_name)
226 	{
227 		if (enabled && *enabled_given && *enabled)
228 			ereport(ERROR,
229 					(errcode(ERRCODE_SYNTAX_ERROR),
230 					 errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
231 
232 		if (create_slot && create_slot_given && *create_slot)
233 			ereport(ERROR,
234 					(errcode(ERRCODE_SYNTAX_ERROR),
235 					 errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
236 
237 		if (enabled && !*enabled_given && *enabled)
238 			ereport(ERROR,
239 					(errcode(ERRCODE_SYNTAX_ERROR),
240 					 errmsg("subscription with slot_name = NONE must also set enabled = false")));
241 
242 		if (create_slot && !create_slot_given && *create_slot)
243 			ereport(ERROR,
244 					(errcode(ERRCODE_SYNTAX_ERROR),
245 					 errmsg("subscription with slot_name = NONE must also set create_slot = false")));
246 	}
247 }
248 
249 /*
250  * Auxiliary function to return a text array out of a list of String nodes.
251  */
252 static Datum
publicationListToArray(List * publist)253 publicationListToArray(List *publist)
254 {
255 	ArrayType  *arr;
256 	Datum	   *datums;
257 	int			j = 0;
258 	ListCell   *cell;
259 	MemoryContext memcxt;
260 	MemoryContext oldcxt;
261 
262 	/* Create memory context for temporary allocations. */
263 	memcxt = AllocSetContextCreate(CurrentMemoryContext,
264 								   "publicationListToArray to array",
265 								   ALLOCSET_DEFAULT_MINSIZE,
266 								   ALLOCSET_DEFAULT_INITSIZE,
267 								   ALLOCSET_DEFAULT_MAXSIZE);
268 	oldcxt = MemoryContextSwitchTo(memcxt);
269 
270 	datums = palloc(sizeof(text *) * list_length(publist));
271 	foreach(cell, publist)
272 	{
273 		char	   *name = strVal(lfirst(cell));
274 		ListCell   *pcell;
275 
276 		/* Check for duplicates. */
277 		foreach(pcell, publist)
278 		{
279 			char	   *pname = strVal(lfirst(pcell));
280 
281 			if (name == pname)
282 				break;
283 
284 			if (strcmp(name, pname) == 0)
285 				ereport(ERROR,
286 						(errcode(ERRCODE_SYNTAX_ERROR),
287 						 errmsg("publication name \"%s\" used more than once",
288 								pname)));
289 		}
290 
291 		datums[j++] = CStringGetTextDatum(name);
292 	}
293 
294 	MemoryContextSwitchTo(oldcxt);
295 
296 	arr = construct_array(datums, list_length(publist),
297 						  TEXTOID, -1, false, 'i');
298 	MemoryContextDelete(memcxt);
299 
300 	return PointerGetDatum(arr);
301 }
302 
/*
 * Create new subscription.
 *
 * stmt supplies the subscription name, the connection string, the list of
 * publications and the WITH options.  isTopLevel is handed to
 * PreventTransactionChain() when a replication slot is going to be created.
 *
 * The function inserts the pg_subscription row, records the dependency on
 * the owner (the current user, who must be a superuser) and creates the
 * replication origin "pg_<subid>".  Unless connect = false was given it then
 * connects to the publisher, registers every published table in the
 * subscription's relation state and, when create_slot is set, creates the
 * replication slot there.
 *
 * Returns the ObjectAddress of the new subscription.
 */
ObjectAddress
CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
{
	Relation	rel;
	ObjectAddress myself;
	Oid			subid;
	bool		nulls[Natts_pg_subscription];
	Datum		values[Natts_pg_subscription];
	Oid			owner = GetUserId();
	HeapTuple	tup;
	bool		connect;
	bool		enabled_given;
	bool		enabled;
	bool		copy_data;
	char	   *synchronous_commit;
	char	   *conninfo;
	char	   *slotname;
	bool		slotname_given;
	char		originname[NAMEDATALEN];
	bool		create_slot;
	List	   *publications;

	/*
	 * Parse and check options.
	 *
	 * Connection and publication should not be specified here.
	 *
	 * "refresh" is passed as NULL, so it is not accepted by this command.
	 */
	parse_subscription_options(stmt->options, &connect, &enabled_given,
							   &enabled, &create_slot, &slotname_given,
							   &slotname, &copy_data, &synchronous_commit,
							   NULL);

	/*
	 * Since creating a replication slot is not transactional, rolling back
	 * the transaction leaves the created replication slot.  So we cannot run
	 * CREATE SUBSCRIPTION inside a transaction block if creating a
	 * replication slot.
	 */
	if (create_slot)
		PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to create subscriptions"))));

	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);

	/* Check if name is used */
	subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
							CStringGetDatum(stmt->subname));
	if (OidIsValid(subid))
	{
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("subscription \"%s\" already exists",
						stmt->subname)));
	}

	/*
	 * If no slot_name option was given at all, the slot is named after the
	 * subscription.  An explicit slot_name = NONE (given, but NULL) is left
	 * alone so the subscription ends up without a slot name.
	 */
	if (!slotname_given && slotname == NULL)
		slotname = stmt->subname;

	/* The default for synchronous_commit of subscriptions is off. */
	if (synchronous_commit == NULL)
		synchronous_commit = "off";

	conninfo = stmt->conninfo;
	publications = stmt->publication;

	/* Load the library providing us libpq calls. */
	load_file("libpqwalreceiver", false);

	/* Check the connection info string. */
	walrcv_check_conninfo(conninfo);

	/* Everything ok, form a new tuple. */
	memset(values, 0, sizeof(values));
	memset(nulls, false, sizeof(nulls));

	values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
	values[Anum_pg_subscription_subname - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
	values[Anum_pg_subscription_subconninfo - 1] =
		CStringGetTextDatum(conninfo);
	/* A subscription without a slot stores NULL in subslotname. */
	if (slotname)
		values[Anum_pg_subscription_subslotname - 1] =
			DirectFunctionCall1(namein, CStringGetDatum(slotname));
	else
		nulls[Anum_pg_subscription_subslotname - 1] = true;
	values[Anum_pg_subscription_subsynccommit - 1] =
		CStringGetTextDatum(synchronous_commit);
	/* This call also rejects duplicate publication names. */
	values[Anum_pg_subscription_subpublications - 1] =
		publicationListToArray(publications);

	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);

	/* Insert tuple into catalog. */
	subid = CatalogTupleInsert(rel, tup);
	heap_freetuple(tup);

	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);

	/* The replication origin is named after the subscription's OID. */
	snprintf(originname, sizeof(originname), "pg_%u", subid);
	replorigin_create(originname);

	/*
	 * Connect to remote side to execute requested commands and fetch table
	 * info.
	 */
	if (connect)
	{
		XLogRecPtr	lsn;
		char	   *err;
		WalReceiverConn *wrconn;
		List	   *tables;
		ListCell   *lc;
		char		table_state;

		/* Try to connect to the publisher. */
		wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
		if (!wrconn)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * Everything that talks to the publisher runs inside PG_TRY so that
		 * the connection is closed before an error propagates.
		 */
		PG_TRY();
		{
			/*
			 * Set sync state based on if we were asked to do data copy or
			 * not.
			 */
			table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;

			/*
			 * Get the table list from publisher and build local table status
			 * info.
			 */
			tables = fetch_table_list(wrconn, publications);
			foreach(lc, tables)
			{
				RangeVar   *rv = (RangeVar *) lfirst(lc);
				Oid			relid;

				/* missing_ok is false: the table has to exist locally. */
				relid = RangeVarGetRelid(rv, AccessShareLock, false);

				/* Check for supported relkind. */
				CheckSubscriptionRelkind(get_rel_relkind(relid),
										 rv->schemaname, rv->relname);

				SetSubscriptionRelState(subid, relid, table_state,
										InvalidXLogRecPtr, false);
			}

			/*
			 * If requested, create permanent slot for the subscription. We
			 * won't use the initial snapshot for anything, so no need to
			 * export it.
			 */
			if (create_slot)
			{
				Assert(slotname);

				walrcv_create_slot(wrconn, slotname, false,
								   CRS_NOEXPORT_SNAPSHOT, &lsn);
				ereport(NOTICE,
						(errmsg("created replication slot \"%s\" on publisher",
								slotname)));
			}
		}
		PG_CATCH();
		{
			/* Close the connection in case of failure. */
			walrcv_disconnect(wrconn);
			PG_RE_THROW();
		}
		PG_END_TRY();

		/* And we are done with the remote side. */
		walrcv_disconnect(wrconn);
	}
	else
		ereport(WARNING,
				(errmsg("tables were not subscribed, you will have to run "
						"ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
						"subscribe the tables")));

	heap_close(rel, RowExclusiveLock);

	/* An enabled subscription needs the launcher woken up at commit. */
	if (enabled)
		ApplyLauncherWakeupAtCommit();

	ObjectAddressSet(myself, SubscriptionRelationId, subid);

	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);

	return myself;
}
504 
505 static void
AlterSubscription_refresh(Subscription * sub,bool copy_data)506 AlterSubscription_refresh(Subscription *sub, bool copy_data)
507 {
508 	char	   *err;
509 	List	   *pubrel_names;
510 	List	   *subrel_states;
511 	Oid		   *subrel_local_oids;
512 	Oid		   *pubrel_local_oids;
513 	WalReceiverConn *wrconn;
514 	ListCell   *lc;
515 	int			off;
516 
517 	/* Load the library providing us libpq calls. */
518 	load_file("libpqwalreceiver", false);
519 
520 	/* Try to connect to the publisher. */
521 	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
522 	if (!wrconn)
523 		ereport(ERROR,
524 				(errmsg("could not connect to the publisher: %s", err)));
525 
526 	/* Get the table list from publisher. */
527 	pubrel_names = fetch_table_list(wrconn, sub->publications);
528 
529 	/* We are done with the remote side, close connection. */
530 	walrcv_disconnect(wrconn);
531 
532 	/* Get local table list. */
533 	subrel_states = GetSubscriptionRelations(sub->oid);
534 
535 	/*
536 	 * Build qsorted array of local table oids for faster lookup. This can
537 	 * potentially contain all tables in the database so speed of lookup is
538 	 * important.
539 	 */
540 	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
541 	off = 0;
542 	foreach(lc, subrel_states)
543 	{
544 		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
545 
546 		subrel_local_oids[off++] = relstate->relid;
547 	}
548 	qsort(subrel_local_oids, list_length(subrel_states),
549 		  sizeof(Oid), oid_cmp);
550 
551 	/*
552 	 * Walk over the remote tables and try to match them to locally known
553 	 * tables. If the table is not known locally create a new state for it.
554 	 *
555 	 * Also builds array of local oids of remote tables for the next step.
556 	 */
557 	off = 0;
558 	pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
559 
560 	foreach(lc, pubrel_names)
561 	{
562 		RangeVar   *rv = (RangeVar *) lfirst(lc);
563 		Oid			relid;
564 
565 		relid = RangeVarGetRelid(rv, AccessShareLock, false);
566 
567 		/* Check for supported relkind. */
568 		CheckSubscriptionRelkind(get_rel_relkind(relid),
569 								 rv->schemaname, rv->relname);
570 
571 		pubrel_local_oids[off++] = relid;
572 
573 		if (!bsearch(&relid, subrel_local_oids,
574 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
575 		{
576 			SetSubscriptionRelState(sub->oid, relid,
577 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
578 									InvalidXLogRecPtr, false);
579 			ereport(DEBUG1,
580 					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
581 							rv->schemaname, rv->relname, sub->name)));
582 		}
583 	}
584 
585 	/*
586 	 * Next remove state for tables we should not care about anymore using the
587 	 * data we collected above
588 	 */
589 	qsort(pubrel_local_oids, list_length(pubrel_names),
590 		  sizeof(Oid), oid_cmp);
591 
592 	for (off = 0; off < list_length(subrel_states); off++)
593 	{
594 		Oid			relid = subrel_local_oids[off];
595 
596 		if (!bsearch(&relid, pubrel_local_oids,
597 					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
598 		{
599 			RemoveSubscriptionRel(sub->oid, relid);
600 
601 			logicalrep_worker_stop_at_commit(sub->oid, relid);
602 
603 			ereport(DEBUG1,
604 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
605 							get_namespace_name(get_rel_namespace(relid)),
606 							get_rel_name(relid),
607 							sub->name)));
608 		}
609 	}
610 }
611 
612 /*
613  * Alter the existing subscription.
614  */
615 ObjectAddress
AlterSubscription(AlterSubscriptionStmt * stmt)616 AlterSubscription(AlterSubscriptionStmt *stmt)
617 {
618 	Relation	rel;
619 	ObjectAddress myself;
620 	bool		nulls[Natts_pg_subscription];
621 	bool		replaces[Natts_pg_subscription];
622 	Datum		values[Natts_pg_subscription];
623 	HeapTuple	tup;
624 	Oid			subid;
625 	bool		update_tuple = false;
626 	Subscription *sub;
627 
628 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
629 
630 	/* Fetch the existing tuple. */
631 	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
632 							  CStringGetDatum(stmt->subname));
633 
634 	if (!HeapTupleIsValid(tup))
635 		ereport(ERROR,
636 				(errcode(ERRCODE_UNDEFINED_OBJECT),
637 				 errmsg("subscription \"%s\" does not exist",
638 						stmt->subname)));
639 
640 	/* must be owner */
641 	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
642 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
643 					   stmt->subname);
644 
645 	subid = HeapTupleGetOid(tup);
646 	sub = GetSubscription(subid, false);
647 
648 	/* Lock the subscription so nobody else can do anything with it. */
649 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
650 
651 	/* Form a new tuple. */
652 	memset(values, 0, sizeof(values));
653 	memset(nulls, false, sizeof(nulls));
654 	memset(replaces, false, sizeof(replaces));
655 
656 	switch (stmt->kind)
657 	{
658 		case ALTER_SUBSCRIPTION_OPTIONS:
659 			{
660 				char	   *slotname;
661 				bool		slotname_given;
662 				char	   *synchronous_commit;
663 
664 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
665 										   NULL, &slotname_given, &slotname,
666 										   NULL, &synchronous_commit, NULL);
667 
668 				if (slotname_given)
669 				{
670 					if (sub->enabled && !slotname)
671 						ereport(ERROR,
672 								(errcode(ERRCODE_SYNTAX_ERROR),
673 								 errmsg("cannot set slot_name = NONE for enabled subscription")));
674 
675 					if (slotname)
676 						values[Anum_pg_subscription_subslotname - 1] =
677 							DirectFunctionCall1(namein, CStringGetDatum(slotname));
678 					else
679 						nulls[Anum_pg_subscription_subslotname - 1] = true;
680 					replaces[Anum_pg_subscription_subslotname - 1] = true;
681 				}
682 
683 				if (synchronous_commit)
684 				{
685 					values[Anum_pg_subscription_subsynccommit - 1] =
686 						CStringGetTextDatum(synchronous_commit);
687 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
688 				}
689 
690 				update_tuple = true;
691 				break;
692 			}
693 
694 		case ALTER_SUBSCRIPTION_ENABLED:
695 			{
696 				bool		enabled,
697 							enabled_given;
698 
699 				parse_subscription_options(stmt->options, NULL,
700 										   &enabled_given, &enabled, NULL,
701 										   NULL, NULL, NULL, NULL, NULL);
702 				Assert(enabled_given);
703 
704 				if (!sub->slotname && enabled)
705 					ereport(ERROR,
706 							(errcode(ERRCODE_SYNTAX_ERROR),
707 							 errmsg("cannot enable subscription that does not have a slot name")));
708 
709 				values[Anum_pg_subscription_subenabled - 1] =
710 					BoolGetDatum(enabled);
711 				replaces[Anum_pg_subscription_subenabled - 1] = true;
712 
713 				if (enabled)
714 					ApplyLauncherWakeupAtCommit();
715 
716 				update_tuple = true;
717 				break;
718 			}
719 
720 		case ALTER_SUBSCRIPTION_CONNECTION:
721 			/* Load the library providing us libpq calls. */
722 			load_file("libpqwalreceiver", false);
723 			/* Check the connection info string. */
724 			walrcv_check_conninfo(stmt->conninfo);
725 
726 			values[Anum_pg_subscription_subconninfo - 1] =
727 				CStringGetTextDatum(stmt->conninfo);
728 			replaces[Anum_pg_subscription_subconninfo - 1] = true;
729 			update_tuple = true;
730 			break;
731 
732 		case ALTER_SUBSCRIPTION_PUBLICATION:
733 			{
734 				bool		copy_data;
735 				bool		refresh;
736 
737 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
738 										   NULL, NULL, NULL, &copy_data,
739 										   NULL, &refresh);
740 
741 				values[Anum_pg_subscription_subpublications - 1] =
742 					publicationListToArray(stmt->publication);
743 				replaces[Anum_pg_subscription_subpublications - 1] = true;
744 
745 				update_tuple = true;
746 
747 				/* Refresh if user asked us to. */
748 				if (refresh)
749 				{
750 					if (!sub->enabled)
751 						ereport(ERROR,
752 								(errcode(ERRCODE_SYNTAX_ERROR),
753 								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
754 								 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
755 
756 					/* Make sure refresh sees the new list of publications. */
757 					sub->publications = stmt->publication;
758 
759 					AlterSubscription_refresh(sub, copy_data);
760 				}
761 
762 				break;
763 			}
764 
765 		case ALTER_SUBSCRIPTION_REFRESH:
766 			{
767 				bool		copy_data;
768 
769 				if (!sub->enabled)
770 					ereport(ERROR,
771 							(errcode(ERRCODE_SYNTAX_ERROR),
772 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
773 
774 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
775 										   NULL, NULL, NULL, &copy_data,
776 										   NULL, NULL);
777 
778 				AlterSubscription_refresh(sub, copy_data);
779 
780 				break;
781 			}
782 
783 		default:
784 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
785 				 stmt->kind);
786 	}
787 
788 	/* Update the catalog if needed. */
789 	if (update_tuple)
790 	{
791 		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
792 								replaces);
793 
794 		CatalogTupleUpdate(rel, &tup->t_self, tup);
795 
796 		heap_freetuple(tup);
797 	}
798 
799 	heap_close(rel, RowExclusiveLock);
800 
801 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
802 
803 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
804 
805 	return myself;
806 }
807 
/*
 * Drop a subscription
 *
 * stmt names the subscription; if it does not exist, stmt->missing_ok
 * decides between an error and a NOTICE.  isTopLevel is handed to
 * PreventTransactionChain() when the subscription has a replication slot.
 *
 * The caller must own the subscription.  The function removes the catalog
 * row, stops the subscription's workers, removes its shared dependency
 * records, relation states and replication origin, and finally, if the
 * subscription has a slot name, connects to the publisher to drop that slot.
 */
void
DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{
	Relation	rel;
	ObjectAddress myself;
	HeapTuple	tup;
	Oid			subid;
	Datum		datum;
	bool		isnull;
	char	   *subname;
	char	   *conninfo;
	char	   *slotname;
	List	   *subworkers;
	ListCell   *lc;
	char		originname[NAMEDATALEN];
	char	   *err = NULL;
	RepOriginId originid;
	WalReceiverConn *wrconn;
	StringInfoData cmd;

	/*
	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
	 * launcher doesn't restart new worker during dropping the subscription
	 */
	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);

	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
						  CStringGetDatum(stmt->subname));

	/* No such subscription: error out, or just say so under IF EXISTS. */
	if (!HeapTupleIsValid(tup))
	{
		heap_close(rel, NoLock);

		if (!stmt->missing_ok)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("subscription \"%s\" does not exist",
							stmt->subname)));
		else
			ereport(NOTICE,
					(errmsg("subscription \"%s\" does not exist, skipping",
							stmt->subname)));

		return;
	}

	subid = HeapTupleGetOid(tup);

	/* must be owner */
	if (!pg_subscription_ownercheck(subid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
					   stmt->subname);

	/* DROP hook for the subscription being removed */
	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);

	/*
	 * Lock the subscription so nobody else can do anything with it (including
	 * the replication workers).
	 */
	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);

	/*
	 * Copy out everything we need from the tuple now; it is deleted and
	 * released further down, before these values are used.
	 */

	/* Get subname */
	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
							Anum_pg_subscription_subname, &isnull);
	Assert(!isnull);
	subname = pstrdup(NameStr(*DatumGetName(datum)));

	/* Get conninfo */
	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
							Anum_pg_subscription_subconninfo, &isnull);
	Assert(!isnull);
	conninfo = TextDatumGetCString(datum);

	/* Get slotname; a NULL column means the subscription has no slot. */
	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
							Anum_pg_subscription_subslotname, &isnull);
	if (!isnull)
		slotname = pstrdup(NameStr(*DatumGetName(datum)));
	else
		slotname = NULL;

	/*
	 * Since dropping a replication slot is not transactional, the replication
	 * slot stays dropped even if the transaction rolls back.  So we cannot
	 * run DROP SUBSCRIPTION inside a transaction block if dropping the
	 * replication slot.
	 *
	 * XXX The command name should really be something like "DROP SUBSCRIPTION
	 * of a subscription that is associated with a replication slot", but we
	 * don't have the proper facilities for that.
	 */
	if (slotname)
		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");

	/* Report the subscription to event triggers as a dropped object. */
	ObjectAddressSet(myself, SubscriptionRelationId, subid);
	EventTriggerSQLDropAddObject(&myself, true, true);

	/* Remove the tuple from catalog. */
	CatalogTupleDelete(rel, &tup->t_self);

	ReleaseSysCache(tup);

	/*
	 * Stop all the subscription workers immediately.
	 *
	 * This is necessary if we are dropping the replication slot, so that the
	 * slot becomes accessible.
	 *
	 * It is also necessary if the subscription is disabled and was disabled
	 * in the same transaction.  Then the workers haven't seen the disabling
	 * yet and will still be running, leading to hangs later when we want to
	 * drop the replication origin.  If the subscription was disabled before
	 * this transaction, then there shouldn't be any workers left, so this
	 * won't make a difference.
	 *
	 * New workers won't be started because we hold an exclusive lock on the
	 * subscription till the end of the transaction.
	 */
	/* The worker lock is held only while collecting the list of workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	subworkers = logicalrep_workers_find(subid, false);
	LWLockRelease(LogicalRepWorkerLock);
	foreach(lc, subworkers)
	{
		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

		logicalrep_worker_stop(w->subid, w->relid);
	}
	list_free(subworkers);

	/* Clean up dependencies */
	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);

	/* Remove any associated relation synchronization states. */
	RemoveSubscriptionRel(subid, InvalidOid);

	/* Remove the origin tracking if exists. */
	snprintf(originname, sizeof(originname), "pg_%u", subid);
	originid = replorigin_by_name(originname, true);
	if (originid != InvalidRepOriginId)
		replorigin_drop(originid, false);

	/*
	 * If there is no slot associated with the subscription, we can finish
	 * here.
	 */
	if (!slotname)
	{
		heap_close(rel, NoLock);
		return;
	}

	/*
	 * Otherwise drop the replication slot at the publisher node using the
	 * replication connection.
	 */
	load_file("libpqwalreceiver", false);

	initStringInfo(&cmd);
	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));

	wrconn = walrcv_connect(conninfo, true, subname, &err);
	if (wrconn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to publisher when attempting to "
						"drop the replication slot \"%s\"", slotname),
				 errdetail("The error was: %s", err),
				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
						 "to disassociate the subscription from the slot.")));

	/*
	 * Run the command inside PG_TRY so that the connection is closed before
	 * an error propagates.
	 */
	PG_TRY();
	{
		WalRcvExecResult *res;

		res = walrcv_exec(wrconn, cmd.data, 0, NULL);

		if (res->status != WALRCV_OK_COMMAND)
			ereport(ERROR,
					(errmsg("could not drop the replication slot \"%s\" on publisher",
							slotname),
					 errdetail("The error was: %s", res->err)));
		else
			ereport(NOTICE,
					(errmsg("dropped replication slot \"%s\" on publisher",
							slotname)));

		walrcv_clear_result(res);
	}
	PG_CATCH();
	{
		/* Close the connection in case of failure */
		walrcv_disconnect(wrconn);
		PG_RE_THROW();
	}
	PG_END_TRY();

	walrcv_disconnect(wrconn);

	pfree(cmd.data);

	/* Close the catalog but keep its lock (NoLock releases nothing). */
	heap_close(rel, NoLock);
}
1013 
1014 /*
1015  * Internal workhorse for changing a subscription owner
1016  */
1017 static void
AlterSubscriptionOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)1018 AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1019 {
1020 	Form_pg_subscription form;
1021 
1022 	form = (Form_pg_subscription) GETSTRUCT(tup);
1023 
1024 	if (form->subowner == newOwnerId)
1025 		return;
1026 
1027 	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1028 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
1029 					   NameStr(form->subname));
1030 
1031 	/* New owner must be a superuser */
1032 	if (!superuser_arg(newOwnerId))
1033 		ereport(ERROR,
1034 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1035 				 errmsg("permission denied to change owner of subscription \"%s\"",
1036 						NameStr(form->subname)),
1037 				 errhint("The owner of a subscription must be a superuser.")));
1038 
1039 	form->subowner = newOwnerId;
1040 	CatalogTupleUpdate(rel, &tup->t_self, tup);
1041 
1042 	/* Update owner dependency reference */
1043 	changeDependencyOnOwner(SubscriptionRelationId,
1044 							HeapTupleGetOid(tup),
1045 							newOwnerId);
1046 
1047 	InvokeObjectPostAlterHook(SubscriptionRelationId,
1048 							  HeapTupleGetOid(tup), 0);
1049 }
1050 
1051 /*
1052  * Change subscription owner -- by name
1053  */
1054 ObjectAddress
AlterSubscriptionOwner(const char * name,Oid newOwnerId)1055 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1056 {
1057 	Oid			subid;
1058 	HeapTuple	tup;
1059 	Relation	rel;
1060 	ObjectAddress address;
1061 
1062 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1063 
1064 	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1065 							  CStringGetDatum(name));
1066 
1067 	if (!HeapTupleIsValid(tup))
1068 		ereport(ERROR,
1069 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1070 				 errmsg("subscription \"%s\" does not exist", name)));
1071 
1072 	subid = HeapTupleGetOid(tup);
1073 
1074 	AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1075 
1076 	ObjectAddressSet(address, SubscriptionRelationId, subid);
1077 
1078 	heap_freetuple(tup);
1079 
1080 	heap_close(rel, RowExclusiveLock);
1081 
1082 	return address;
1083 }
1084 
1085 /*
1086  * Change subscription owner -- by OID
1087  */
1088 void
AlterSubscriptionOwner_oid(Oid subid,Oid newOwnerId)1089 AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1090 {
1091 	HeapTuple	tup;
1092 	Relation	rel;
1093 
1094 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1095 
1096 	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1097 
1098 	if (!HeapTupleIsValid(tup))
1099 		ereport(ERROR,
1100 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1101 				 errmsg("subscription with OID %u does not exist", subid)));
1102 
1103 	AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1104 
1105 	heap_freetuple(tup);
1106 
1107 	heap_close(rel, RowExclusiveLock);
1108 }
1109 
1110 /*
1111  * Get the list of tables which belong to specified publications on the
1112  * publisher connection.
1113  */
1114 static List *
fetch_table_list(WalReceiverConn * wrconn,List * publications)1115 fetch_table_list(WalReceiverConn *wrconn, List *publications)
1116 {
1117 	WalRcvExecResult *res;
1118 	StringInfoData cmd;
1119 	TupleTableSlot *slot;
1120 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
1121 	ListCell   *lc;
1122 	bool		first;
1123 	List	   *tablelist = NIL;
1124 
1125 	Assert(list_length(publications) > 0);
1126 
1127 	initStringInfo(&cmd);
1128 	appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1129 					 "  FROM pg_catalog.pg_publication_tables t\n"
1130 					 " WHERE t.pubname IN (");
1131 	first = true;
1132 	foreach(lc, publications)
1133 	{
1134 		char	   *pubname = strVal(lfirst(lc));
1135 
1136 		if (first)
1137 			first = false;
1138 		else
1139 			appendStringInfoString(&cmd, ", ");
1140 
1141 		appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname));
1142 	}
1143 	appendStringInfoString(&cmd, ")");
1144 
1145 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1146 	pfree(cmd.data);
1147 
1148 	if (res->status != WALRCV_OK_TUPLES)
1149 		ereport(ERROR,
1150 				(errmsg("could not receive list of replicated tables from the publisher: %s",
1151 						res->err)));
1152 
1153 	/* Process tables. */
1154 	slot = MakeSingleTupleTableSlot(res->tupledesc);
1155 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1156 	{
1157 		char	   *nspname;
1158 		char	   *relname;
1159 		bool		isnull;
1160 		RangeVar   *rv;
1161 
1162 		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1163 		Assert(!isnull);
1164 		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1165 		Assert(!isnull);
1166 
1167 		rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1168 		tablelist = lappend(tablelist, rv);
1169 
1170 		ExecClearTuple(slot);
1171 	}
1172 	ExecDropSingleTupleTableSlot(slot);
1173 
1174 	walrcv_clear_result(res);
1175 
1176 	return tablelist;
1177 }
1178