1 /*-------------------------------------------------------------------------
2 *
3 * subscriptioncmds.c
4 * subscription catalog manipulation functions
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * subscriptioncmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "miscadmin.h"
18
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/xact.h"
22
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
29 #include "catalog/pg_subscription.h"
30 #include "catalog/pg_subscription_rel.h"
31
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
34 #include "commands/subscriptioncmds.h"
35
36 #include "executor/executor.h"
37
38 #include "nodes/makefuncs.h"
39
40 #include "replication/logicallauncher.h"
41 #include "replication/origin.h"
42 #include "replication/slot.h"
43 #include "replication/walreceiver.h"
44 #include "replication/walsender.h"
45 #include "replication/worker_internal.h"
46
47 #include "storage/lmgr.h"
48
49 #include "utils/builtins.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/memutils.h"
53 #include "utils/syscache.h"
54
55 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
56
57 /*
58 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
59 *
60 * Since not all options can be specified in both commands, this function
61 * will report an error on options if the target output pointer is NULL to
62 * accommodate that.
63 */
64 static void
parse_subscription_options(List * options,bool * connect,bool * enabled_given,bool * enabled,bool * create_slot,bool * slot_name_given,char ** slot_name,bool * copy_data,char ** synchronous_commit,bool * refresh)65 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
66 bool *enabled, bool *create_slot,
67 bool *slot_name_given, char **slot_name,
68 bool *copy_data, char **synchronous_commit,
69 bool *refresh)
70 {
71 ListCell *lc;
72 bool connect_given = false;
73 bool create_slot_given = false;
74 bool copy_data_given = false;
75 bool refresh_given = false;
76
77 /* If connect is specified, the others also need to be. */
78 Assert(!connect || (enabled && create_slot && copy_data));
79
80 if (connect)
81 *connect = true;
82 if (enabled)
83 {
84 *enabled_given = false;
85 *enabled = true;
86 }
87 if (create_slot)
88 *create_slot = true;
89 if (slot_name)
90 {
91 *slot_name_given = false;
92 *slot_name = NULL;
93 }
94 if (copy_data)
95 *copy_data = true;
96 if (synchronous_commit)
97 *synchronous_commit = NULL;
98 if (refresh)
99 *refresh = true;
100
101 /* Parse options */
102 foreach(lc, options)
103 {
104 DefElem *defel = (DefElem *) lfirst(lc);
105
106 if (strcmp(defel->defname, "connect") == 0 && connect)
107 {
108 if (connect_given)
109 ereport(ERROR,
110 (errcode(ERRCODE_SYNTAX_ERROR),
111 errmsg("conflicting or redundant options")));
112
113 connect_given = true;
114 *connect = defGetBoolean(defel);
115 }
116 else if (strcmp(defel->defname, "enabled") == 0 && enabled)
117 {
118 if (*enabled_given)
119 ereport(ERROR,
120 (errcode(ERRCODE_SYNTAX_ERROR),
121 errmsg("conflicting or redundant options")));
122
123 *enabled_given = true;
124 *enabled = defGetBoolean(defel);
125 }
126 else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
127 {
128 if (create_slot_given)
129 ereport(ERROR,
130 (errcode(ERRCODE_SYNTAX_ERROR),
131 errmsg("conflicting or redundant options")));
132
133 create_slot_given = true;
134 *create_slot = defGetBoolean(defel);
135 }
136 else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
137 {
138 if (*slot_name_given)
139 ereport(ERROR,
140 (errcode(ERRCODE_SYNTAX_ERROR),
141 errmsg("conflicting or redundant options")));
142
143 *slot_name_given = true;
144 *slot_name = defGetString(defel);
145
146 /* Setting slot_name = NONE is treated as no slot name. */
147 if (strcmp(*slot_name, "none") == 0)
148 *slot_name = NULL;
149 else
150 ReplicationSlotValidateName(*slot_name, ERROR);
151 }
152 else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
153 {
154 if (copy_data_given)
155 ereport(ERROR,
156 (errcode(ERRCODE_SYNTAX_ERROR),
157 errmsg("conflicting or redundant options")));
158
159 copy_data_given = true;
160 *copy_data = defGetBoolean(defel);
161 }
162 else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
163 synchronous_commit)
164 {
165 if (*synchronous_commit)
166 ereport(ERROR,
167 (errcode(ERRCODE_SYNTAX_ERROR),
168 errmsg("conflicting or redundant options")));
169
170 *synchronous_commit = defGetString(defel);
171
172 /* Test if the given value is valid for synchronous_commit GUC. */
173 (void) set_config_option("synchronous_commit", *synchronous_commit,
174 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
175 false, 0, false);
176 }
177 else if (strcmp(defel->defname, "refresh") == 0 && refresh)
178 {
179 if (refresh_given)
180 ereport(ERROR,
181 (errcode(ERRCODE_SYNTAX_ERROR),
182 errmsg("conflicting or redundant options")));
183
184 refresh_given = true;
185 *refresh = defGetBoolean(defel);
186 }
187 else
188 ereport(ERROR,
189 (errcode(ERRCODE_SYNTAX_ERROR),
190 errmsg("unrecognized subscription parameter: %s", defel->defname)));
191 }
192
193 /*
194 * We've been explicitly asked to not connect, that requires some
195 * additional processing.
196 */
197 if (connect && !*connect)
198 {
199 /* Check for incompatible options from the user. */
200 if (enabled && *enabled_given && *enabled)
201 ereport(ERROR,
202 (errcode(ERRCODE_SYNTAX_ERROR),
203 errmsg("connect = false and enabled = true are mutually exclusive options")));
204
205 if (create_slot && create_slot_given && *create_slot)
206 ereport(ERROR,
207 (errcode(ERRCODE_SYNTAX_ERROR),
208 errmsg("connect = false and create_slot = true are mutually exclusive options")));
209
210 if (copy_data && copy_data_given && *copy_data)
211 ereport(ERROR,
212 (errcode(ERRCODE_SYNTAX_ERROR),
213 errmsg("connect = false and copy_data = true are mutually exclusive options")));
214
215 /* Change the defaults of other options. */
216 *enabled = false;
217 *create_slot = false;
218 *copy_data = false;
219 }
220
221 /*
222 * Do additional checking for disallowed combination when slot_name = NONE
223 * was used.
224 */
225 if (slot_name && *slot_name_given && !*slot_name)
226 {
227 if (enabled && *enabled_given && *enabled)
228 ereport(ERROR,
229 (errcode(ERRCODE_SYNTAX_ERROR),
230 errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
231
232 if (create_slot && create_slot_given && *create_slot)
233 ereport(ERROR,
234 (errcode(ERRCODE_SYNTAX_ERROR),
235 errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
236
237 if (enabled && !*enabled_given && *enabled)
238 ereport(ERROR,
239 (errcode(ERRCODE_SYNTAX_ERROR),
240 errmsg("subscription with slot_name = NONE must also set enabled = false")));
241
242 if (create_slot && !create_slot_given && *create_slot)
243 ereport(ERROR,
244 (errcode(ERRCODE_SYNTAX_ERROR),
245 errmsg("subscription with slot_name = NONE must also set create_slot = false")));
246 }
247 }
248
249 /*
250 * Auxiliary function to return a text array out of a list of String nodes.
251 */
252 static Datum
publicationListToArray(List * publist)253 publicationListToArray(List *publist)
254 {
255 ArrayType *arr;
256 Datum *datums;
257 int j = 0;
258 ListCell *cell;
259 MemoryContext memcxt;
260 MemoryContext oldcxt;
261
262 /* Create memory context for temporary allocations. */
263 memcxt = AllocSetContextCreate(CurrentMemoryContext,
264 "publicationListToArray to array",
265 ALLOCSET_DEFAULT_MINSIZE,
266 ALLOCSET_DEFAULT_INITSIZE,
267 ALLOCSET_DEFAULT_MAXSIZE);
268 oldcxt = MemoryContextSwitchTo(memcxt);
269
270 datums = palloc(sizeof(text *) * list_length(publist));
271 foreach(cell, publist)
272 {
273 char *name = strVal(lfirst(cell));
274 ListCell *pcell;
275
276 /* Check for duplicates. */
277 foreach(pcell, publist)
278 {
279 char *pname = strVal(lfirst(pcell));
280
281 if (name == pname)
282 break;
283
284 if (strcmp(name, pname) == 0)
285 ereport(ERROR,
286 (errcode(ERRCODE_SYNTAX_ERROR),
287 errmsg("publication name \"%s\" used more than once",
288 pname)));
289 }
290
291 datums[j++] = CStringGetTextDatum(name);
292 }
293
294 MemoryContextSwitchTo(oldcxt);
295
296 arr = construct_array(datums, list_length(publist),
297 TEXTOID, -1, false, 'i');
298 MemoryContextDelete(memcxt);
299
300 return PointerGetDatum(arr);
301 }
302
303 /*
304 * Create new subscription.
305 */
306 ObjectAddress
CreateSubscription(CreateSubscriptionStmt * stmt,bool isTopLevel)307 CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
308 {
309 Relation rel;
310 ObjectAddress myself;
311 Oid subid;
312 bool nulls[Natts_pg_subscription];
313 Datum values[Natts_pg_subscription];
314 Oid owner = GetUserId();
315 HeapTuple tup;
316 bool connect;
317 bool enabled_given;
318 bool enabled;
319 bool copy_data;
320 char *synchronous_commit;
321 char *conninfo;
322 char *slotname;
323 bool slotname_given;
324 char originname[NAMEDATALEN];
325 bool create_slot;
326 List *publications;
327
328 /*
329 * Parse and check options.
330 *
331 * Connection and publication should not be specified here.
332 */
333 parse_subscription_options(stmt->options, &connect, &enabled_given,
334 &enabled, &create_slot, &slotname_given,
335 &slotname, ©_data, &synchronous_commit,
336 NULL);
337
338 /*
339 * Since creating a replication slot is not transactional, rolling back
340 * the transaction leaves the created replication slot. So we cannot run
341 * CREATE SUBSCRIPTION inside a transaction block if creating a
342 * replication slot.
343 */
344 if (create_slot)
345 PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
346
347 if (!superuser())
348 ereport(ERROR,
349 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
350 (errmsg("must be superuser to create subscriptions"))));
351
352 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
353
354 /* Check if name is used */
355 subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
356 CStringGetDatum(stmt->subname));
357 if (OidIsValid(subid))
358 {
359 ereport(ERROR,
360 (errcode(ERRCODE_DUPLICATE_OBJECT),
361 errmsg("subscription \"%s\" already exists",
362 stmt->subname)));
363 }
364
365 if (!slotname_given && slotname == NULL)
366 slotname = stmt->subname;
367
368 /* The default for synchronous_commit of subscriptions is off. */
369 if (synchronous_commit == NULL)
370 synchronous_commit = "off";
371
372 conninfo = stmt->conninfo;
373 publications = stmt->publication;
374
375 /* Load the library providing us libpq calls. */
376 load_file("libpqwalreceiver", false);
377
378 /* Check the connection info string. */
379 walrcv_check_conninfo(conninfo);
380
381 /* Everything ok, form a new tuple. */
382 memset(values, 0, sizeof(values));
383 memset(nulls, false, sizeof(nulls));
384
385 values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
386 values[Anum_pg_subscription_subname - 1] =
387 DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
388 values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
389 values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
390 values[Anum_pg_subscription_subconninfo - 1] =
391 CStringGetTextDatum(conninfo);
392 if (slotname)
393 values[Anum_pg_subscription_subslotname - 1] =
394 DirectFunctionCall1(namein, CStringGetDatum(slotname));
395 else
396 nulls[Anum_pg_subscription_subslotname - 1] = true;
397 values[Anum_pg_subscription_subsynccommit - 1] =
398 CStringGetTextDatum(synchronous_commit);
399 values[Anum_pg_subscription_subpublications - 1] =
400 publicationListToArray(publications);
401
402 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
403
404 /* Insert tuple into catalog. */
405 subid = CatalogTupleInsert(rel, tup);
406 heap_freetuple(tup);
407
408 recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
409
410 snprintf(originname, sizeof(originname), "pg_%u", subid);
411 replorigin_create(originname);
412
413 /*
414 * Connect to remote side to execute requested commands and fetch table
415 * info.
416 */
417 if (connect)
418 {
419 XLogRecPtr lsn;
420 char *err;
421 WalReceiverConn *wrconn;
422 List *tables;
423 ListCell *lc;
424 char table_state;
425
426 /* Try to connect to the publisher. */
427 wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
428 if (!wrconn)
429 ereport(ERROR,
430 (errmsg("could not connect to the publisher: %s", err)));
431
432 PG_TRY();
433 {
434 /*
435 * Set sync state based on if we were asked to do data copy or
436 * not.
437 */
438 table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
439
440 /*
441 * Get the table list from publisher and build local table status
442 * info.
443 */
444 tables = fetch_table_list(wrconn, publications);
445 foreach(lc, tables)
446 {
447 RangeVar *rv = (RangeVar *) lfirst(lc);
448 Oid relid;
449
450 relid = RangeVarGetRelid(rv, AccessShareLock, false);
451
452 /* Check for supported relkind. */
453 CheckSubscriptionRelkind(get_rel_relkind(relid),
454 rv->schemaname, rv->relname);
455
456 SetSubscriptionRelState(subid, relid, table_state,
457 InvalidXLogRecPtr, false);
458 }
459
460 /*
461 * If requested, create permanent slot for the subscription. We
462 * won't use the initial snapshot for anything, so no need to
463 * export it.
464 */
465 if (create_slot)
466 {
467 Assert(slotname);
468
469 walrcv_create_slot(wrconn, slotname, false,
470 CRS_NOEXPORT_SNAPSHOT, &lsn);
471 ereport(NOTICE,
472 (errmsg("created replication slot \"%s\" on publisher",
473 slotname)));
474 }
475 }
476 PG_CATCH();
477 {
478 /* Close the connection in case of failure. */
479 walrcv_disconnect(wrconn);
480 PG_RE_THROW();
481 }
482 PG_END_TRY();
483
484 /* And we are done with the remote side. */
485 walrcv_disconnect(wrconn);
486 }
487 else
488 ereport(WARNING,
489 (errmsg("tables were not subscribed, you will have to run "
490 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
491 "subscribe the tables")));
492
493 heap_close(rel, RowExclusiveLock);
494
495 if (enabled)
496 ApplyLauncherWakeupAtCommit();
497
498 ObjectAddressSet(myself, SubscriptionRelationId, subid);
499
500 InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
501
502 return myself;
503 }
504
505 static void
AlterSubscription_refresh(Subscription * sub,bool copy_data)506 AlterSubscription_refresh(Subscription *sub, bool copy_data)
507 {
508 char *err;
509 List *pubrel_names;
510 List *subrel_states;
511 Oid *subrel_local_oids;
512 Oid *pubrel_local_oids;
513 WalReceiverConn *wrconn;
514 ListCell *lc;
515 int off;
516
517 /* Load the library providing us libpq calls. */
518 load_file("libpqwalreceiver", false);
519
520 /* Try to connect to the publisher. */
521 wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
522 if (!wrconn)
523 ereport(ERROR,
524 (errmsg("could not connect to the publisher: %s", err)));
525
526 /* Get the table list from publisher. */
527 pubrel_names = fetch_table_list(wrconn, sub->publications);
528
529 /* We are done with the remote side, close connection. */
530 walrcv_disconnect(wrconn);
531
532 /* Get local table list. */
533 subrel_states = GetSubscriptionRelations(sub->oid);
534
535 /*
536 * Build qsorted array of local table oids for faster lookup. This can
537 * potentially contain all tables in the database so speed of lookup is
538 * important.
539 */
540 subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
541 off = 0;
542 foreach(lc, subrel_states)
543 {
544 SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
545
546 subrel_local_oids[off++] = relstate->relid;
547 }
548 qsort(subrel_local_oids, list_length(subrel_states),
549 sizeof(Oid), oid_cmp);
550
551 /*
552 * Walk over the remote tables and try to match them to locally known
553 * tables. If the table is not known locally create a new state for it.
554 *
555 * Also builds array of local oids of remote tables for the next step.
556 */
557 off = 0;
558 pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
559
560 foreach(lc, pubrel_names)
561 {
562 RangeVar *rv = (RangeVar *) lfirst(lc);
563 Oid relid;
564
565 relid = RangeVarGetRelid(rv, AccessShareLock, false);
566
567 /* Check for supported relkind. */
568 CheckSubscriptionRelkind(get_rel_relkind(relid),
569 rv->schemaname, rv->relname);
570
571 pubrel_local_oids[off++] = relid;
572
573 if (!bsearch(&relid, subrel_local_oids,
574 list_length(subrel_states), sizeof(Oid), oid_cmp))
575 {
576 SetSubscriptionRelState(sub->oid, relid,
577 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
578 InvalidXLogRecPtr, false);
579 ereport(DEBUG1,
580 (errmsg("table \"%s.%s\" added to subscription \"%s\"",
581 rv->schemaname, rv->relname, sub->name)));
582 }
583 }
584
585 /*
586 * Next remove state for tables we should not care about anymore using the
587 * data we collected above
588 */
589 qsort(pubrel_local_oids, list_length(pubrel_names),
590 sizeof(Oid), oid_cmp);
591
592 for (off = 0; off < list_length(subrel_states); off++)
593 {
594 Oid relid = subrel_local_oids[off];
595
596 if (!bsearch(&relid, pubrel_local_oids,
597 list_length(pubrel_names), sizeof(Oid), oid_cmp))
598 {
599 RemoveSubscriptionRel(sub->oid, relid);
600
601 logicalrep_worker_stop_at_commit(sub->oid, relid);
602
603 ereport(DEBUG1,
604 (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
605 get_namespace_name(get_rel_namespace(relid)),
606 get_rel_name(relid),
607 sub->name)));
608 }
609 }
610 }
611
612 /*
613 * Alter the existing subscription.
614 */
615 ObjectAddress
AlterSubscription(AlterSubscriptionStmt * stmt)616 AlterSubscription(AlterSubscriptionStmt *stmt)
617 {
618 Relation rel;
619 ObjectAddress myself;
620 bool nulls[Natts_pg_subscription];
621 bool replaces[Natts_pg_subscription];
622 Datum values[Natts_pg_subscription];
623 HeapTuple tup;
624 Oid subid;
625 bool update_tuple = false;
626 Subscription *sub;
627
628 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
629
630 /* Fetch the existing tuple. */
631 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
632 CStringGetDatum(stmt->subname));
633
634 if (!HeapTupleIsValid(tup))
635 ereport(ERROR,
636 (errcode(ERRCODE_UNDEFINED_OBJECT),
637 errmsg("subscription \"%s\" does not exist",
638 stmt->subname)));
639
640 /* must be owner */
641 if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
642 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
643 stmt->subname);
644
645 subid = HeapTupleGetOid(tup);
646 sub = GetSubscription(subid, false);
647
648 /* Lock the subscription so nobody else can do anything with it. */
649 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
650
651 /* Form a new tuple. */
652 memset(values, 0, sizeof(values));
653 memset(nulls, false, sizeof(nulls));
654 memset(replaces, false, sizeof(replaces));
655
656 switch (stmt->kind)
657 {
658 case ALTER_SUBSCRIPTION_OPTIONS:
659 {
660 char *slotname;
661 bool slotname_given;
662 char *synchronous_commit;
663
664 parse_subscription_options(stmt->options, NULL, NULL, NULL,
665 NULL, &slotname_given, &slotname,
666 NULL, &synchronous_commit, NULL);
667
668 if (slotname_given)
669 {
670 if (sub->enabled && !slotname)
671 ereport(ERROR,
672 (errcode(ERRCODE_SYNTAX_ERROR),
673 errmsg("cannot set slot_name = NONE for enabled subscription")));
674
675 if (slotname)
676 values[Anum_pg_subscription_subslotname - 1] =
677 DirectFunctionCall1(namein, CStringGetDatum(slotname));
678 else
679 nulls[Anum_pg_subscription_subslotname - 1] = true;
680 replaces[Anum_pg_subscription_subslotname - 1] = true;
681 }
682
683 if (synchronous_commit)
684 {
685 values[Anum_pg_subscription_subsynccommit - 1] =
686 CStringGetTextDatum(synchronous_commit);
687 replaces[Anum_pg_subscription_subsynccommit - 1] = true;
688 }
689
690 update_tuple = true;
691 break;
692 }
693
694 case ALTER_SUBSCRIPTION_ENABLED:
695 {
696 bool enabled,
697 enabled_given;
698
699 parse_subscription_options(stmt->options, NULL,
700 &enabled_given, &enabled, NULL,
701 NULL, NULL, NULL, NULL, NULL);
702 Assert(enabled_given);
703
704 if (!sub->slotname && enabled)
705 ereport(ERROR,
706 (errcode(ERRCODE_SYNTAX_ERROR),
707 errmsg("cannot enable subscription that does not have a slot name")));
708
709 values[Anum_pg_subscription_subenabled - 1] =
710 BoolGetDatum(enabled);
711 replaces[Anum_pg_subscription_subenabled - 1] = true;
712
713 if (enabled)
714 ApplyLauncherWakeupAtCommit();
715
716 update_tuple = true;
717 break;
718 }
719
720 case ALTER_SUBSCRIPTION_CONNECTION:
721 /* Load the library providing us libpq calls. */
722 load_file("libpqwalreceiver", false);
723 /* Check the connection info string. */
724 walrcv_check_conninfo(stmt->conninfo);
725
726 values[Anum_pg_subscription_subconninfo - 1] =
727 CStringGetTextDatum(stmt->conninfo);
728 replaces[Anum_pg_subscription_subconninfo - 1] = true;
729 update_tuple = true;
730 break;
731
732 case ALTER_SUBSCRIPTION_PUBLICATION:
733 {
734 bool copy_data;
735 bool refresh;
736
737 parse_subscription_options(stmt->options, NULL, NULL, NULL,
738 NULL, NULL, NULL, ©_data,
739 NULL, &refresh);
740
741 values[Anum_pg_subscription_subpublications - 1] =
742 publicationListToArray(stmt->publication);
743 replaces[Anum_pg_subscription_subpublications - 1] = true;
744
745 update_tuple = true;
746
747 /* Refresh if user asked us to. */
748 if (refresh)
749 {
750 if (!sub->enabled)
751 ereport(ERROR,
752 (errcode(ERRCODE_SYNTAX_ERROR),
753 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
754 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
755
756 /* Make sure refresh sees the new list of publications. */
757 sub->publications = stmt->publication;
758
759 AlterSubscription_refresh(sub, copy_data);
760 }
761
762 break;
763 }
764
765 case ALTER_SUBSCRIPTION_REFRESH:
766 {
767 bool copy_data;
768
769 if (!sub->enabled)
770 ereport(ERROR,
771 (errcode(ERRCODE_SYNTAX_ERROR),
772 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
773
774 parse_subscription_options(stmt->options, NULL, NULL, NULL,
775 NULL, NULL, NULL, ©_data,
776 NULL, NULL);
777
778 AlterSubscription_refresh(sub, copy_data);
779
780 break;
781 }
782
783 default:
784 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
785 stmt->kind);
786 }
787
788 /* Update the catalog if needed. */
789 if (update_tuple)
790 {
791 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
792 replaces);
793
794 CatalogTupleUpdate(rel, &tup->t_self, tup);
795
796 heap_freetuple(tup);
797 }
798
799 heap_close(rel, RowExclusiveLock);
800
801 ObjectAddressSet(myself, SubscriptionRelationId, subid);
802
803 InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
804
805 return myself;
806 }
807
808 /*
809 * Drop a subscription
810 */
811 void
DropSubscription(DropSubscriptionStmt * stmt,bool isTopLevel)812 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
813 {
814 Relation rel;
815 ObjectAddress myself;
816 HeapTuple tup;
817 Oid subid;
818 Datum datum;
819 bool isnull;
820 char *subname;
821 char *conninfo;
822 char *slotname;
823 List *subworkers;
824 ListCell *lc;
825 char originname[NAMEDATALEN];
826 char *err = NULL;
827 RepOriginId originid;
828 WalReceiverConn *wrconn;
829 StringInfoData cmd;
830
831 /*
832 * Lock pg_subscription with AccessExclusiveLock to ensure that the
833 * launcher doesn't restart new worker during dropping the subscription
834 */
835 rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
836
837 tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
838 CStringGetDatum(stmt->subname));
839
840 if (!HeapTupleIsValid(tup))
841 {
842 heap_close(rel, NoLock);
843
844 if (!stmt->missing_ok)
845 ereport(ERROR,
846 (errcode(ERRCODE_UNDEFINED_OBJECT),
847 errmsg("subscription \"%s\" does not exist",
848 stmt->subname)));
849 else
850 ereport(NOTICE,
851 (errmsg("subscription \"%s\" does not exist, skipping",
852 stmt->subname)));
853
854 return;
855 }
856
857 subid = HeapTupleGetOid(tup);
858
859 /* must be owner */
860 if (!pg_subscription_ownercheck(subid, GetUserId()))
861 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
862 stmt->subname);
863
864 /* DROP hook for the subscription being removed */
865 InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
866
867 /*
868 * Lock the subscription so nobody else can do anything with it (including
869 * the replication workers).
870 */
871 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
872
873 /* Get subname */
874 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
875 Anum_pg_subscription_subname, &isnull);
876 Assert(!isnull);
877 subname = pstrdup(NameStr(*DatumGetName(datum)));
878
879 /* Get conninfo */
880 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
881 Anum_pg_subscription_subconninfo, &isnull);
882 Assert(!isnull);
883 conninfo = TextDatumGetCString(datum);
884
885 /* Get slotname */
886 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
887 Anum_pg_subscription_subslotname, &isnull);
888 if (!isnull)
889 slotname = pstrdup(NameStr(*DatumGetName(datum)));
890 else
891 slotname = NULL;
892
893 /*
894 * Since dropping a replication slot is not transactional, the replication
895 * slot stays dropped even if the transaction rolls back. So we cannot
896 * run DROP SUBSCRIPTION inside a transaction block if dropping the
897 * replication slot.
898 *
899 * XXX The command name should really be something like "DROP SUBSCRIPTION
900 * of a subscription that is associated with a replication slot", but we
901 * don't have the proper facilities for that.
902 */
903 if (slotname)
904 PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
905
906 ObjectAddressSet(myself, SubscriptionRelationId, subid);
907 EventTriggerSQLDropAddObject(&myself, true, true);
908
909 /* Remove the tuple from catalog. */
910 CatalogTupleDelete(rel, &tup->t_self);
911
912 ReleaseSysCache(tup);
913
914 /*
915 * Stop all the subscription workers immediately.
916 *
917 * This is necessary if we are dropping the replication slot, so that the
918 * slot becomes accessible.
919 *
920 * It is also necessary if the subscription is disabled and was disabled
921 * in the same transaction. Then the workers haven't seen the disabling
922 * yet and will still be running, leading to hangs later when we want to
923 * drop the replication origin. If the subscription was disabled before
924 * this transaction, then there shouldn't be any workers left, so this
925 * won't make a difference.
926 *
927 * New workers won't be started because we hold an exclusive lock on the
928 * subscription till the end of the transaction.
929 */
930 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
931 subworkers = logicalrep_workers_find(subid, false);
932 LWLockRelease(LogicalRepWorkerLock);
933 foreach(lc, subworkers)
934 {
935 LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
936
937 logicalrep_worker_stop(w->subid, w->relid);
938 }
939 list_free(subworkers);
940
941 /* Clean up dependencies */
942 deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
943
944 /* Remove any associated relation synchronization states. */
945 RemoveSubscriptionRel(subid, InvalidOid);
946
947 /* Remove the origin tracking if exists. */
948 snprintf(originname, sizeof(originname), "pg_%u", subid);
949 originid = replorigin_by_name(originname, true);
950 if (originid != InvalidRepOriginId)
951 replorigin_drop(originid, false);
952
953 /*
954 * If there is no slot associated with the subscription, we can finish
955 * here.
956 */
957 if (!slotname)
958 {
959 heap_close(rel, NoLock);
960 return;
961 }
962
963 /*
964 * Otherwise drop the replication slot at the publisher node using the
965 * replication connection.
966 */
967 load_file("libpqwalreceiver", false);
968
969 initStringInfo(&cmd);
970 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
971
972 wrconn = walrcv_connect(conninfo, true, subname, &err);
973 if (wrconn == NULL)
974 ereport(ERROR,
975 (errmsg("could not connect to publisher when attempting to "
976 "drop the replication slot \"%s\"", slotname),
977 errdetail("The error was: %s", err),
978 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
979 "to disassociate the subscription from the slot.")));
980
981 PG_TRY();
982 {
983 WalRcvExecResult *res;
984
985 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
986
987 if (res->status != WALRCV_OK_COMMAND)
988 ereport(ERROR,
989 (errmsg("could not drop the replication slot \"%s\" on publisher",
990 slotname),
991 errdetail("The error was: %s", res->err)));
992 else
993 ereport(NOTICE,
994 (errmsg("dropped replication slot \"%s\" on publisher",
995 slotname)));
996
997 walrcv_clear_result(res);
998 }
999 PG_CATCH();
1000 {
1001 /* Close the connection in case of failure */
1002 walrcv_disconnect(wrconn);
1003 PG_RE_THROW();
1004 }
1005 PG_END_TRY();
1006
1007 walrcv_disconnect(wrconn);
1008
1009 pfree(cmd.data);
1010
1011 heap_close(rel, NoLock);
1012 }
1013
1014 /*
1015 * Internal workhorse for changing a subscription owner
1016 */
1017 static void
AlterSubscriptionOwner_internal(Relation rel,HeapTuple tup,Oid newOwnerId)1018 AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1019 {
1020 Form_pg_subscription form;
1021
1022 form = (Form_pg_subscription) GETSTRUCT(tup);
1023
1024 if (form->subowner == newOwnerId)
1025 return;
1026
1027 if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1028 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
1029 NameStr(form->subname));
1030
1031 /* New owner must be a superuser */
1032 if (!superuser_arg(newOwnerId))
1033 ereport(ERROR,
1034 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1035 errmsg("permission denied to change owner of subscription \"%s\"",
1036 NameStr(form->subname)),
1037 errhint("The owner of a subscription must be a superuser.")));
1038
1039 form->subowner = newOwnerId;
1040 CatalogTupleUpdate(rel, &tup->t_self, tup);
1041
1042 /* Update owner dependency reference */
1043 changeDependencyOnOwner(SubscriptionRelationId,
1044 HeapTupleGetOid(tup),
1045 newOwnerId);
1046
1047 InvokeObjectPostAlterHook(SubscriptionRelationId,
1048 HeapTupleGetOid(tup), 0);
1049 }
1050
1051 /*
1052 * Change subscription owner -- by name
1053 */
1054 ObjectAddress
AlterSubscriptionOwner(const char * name,Oid newOwnerId)1055 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1056 {
1057 Oid subid;
1058 HeapTuple tup;
1059 Relation rel;
1060 ObjectAddress address;
1061
1062 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1063
1064 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1065 CStringGetDatum(name));
1066
1067 if (!HeapTupleIsValid(tup))
1068 ereport(ERROR,
1069 (errcode(ERRCODE_UNDEFINED_OBJECT),
1070 errmsg("subscription \"%s\" does not exist", name)));
1071
1072 subid = HeapTupleGetOid(tup);
1073
1074 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1075
1076 ObjectAddressSet(address, SubscriptionRelationId, subid);
1077
1078 heap_freetuple(tup);
1079
1080 heap_close(rel, RowExclusiveLock);
1081
1082 return address;
1083 }
1084
1085 /*
1086 * Change subscription owner -- by OID
1087 */
1088 void
AlterSubscriptionOwner_oid(Oid subid,Oid newOwnerId)1089 AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1090 {
1091 HeapTuple tup;
1092 Relation rel;
1093
1094 rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1095
1096 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1097
1098 if (!HeapTupleIsValid(tup))
1099 ereport(ERROR,
1100 (errcode(ERRCODE_UNDEFINED_OBJECT),
1101 errmsg("subscription with OID %u does not exist", subid)));
1102
1103 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1104
1105 heap_freetuple(tup);
1106
1107 heap_close(rel, RowExclusiveLock);
1108 }
1109
1110 /*
1111 * Get the list of tables which belong to specified publications on the
1112 * publisher connection.
1113 */
1114 static List *
fetch_table_list(WalReceiverConn * wrconn,List * publications)1115 fetch_table_list(WalReceiverConn *wrconn, List *publications)
1116 {
1117 WalRcvExecResult *res;
1118 StringInfoData cmd;
1119 TupleTableSlot *slot;
1120 Oid tableRow[2] = {TEXTOID, TEXTOID};
1121 ListCell *lc;
1122 bool first;
1123 List *tablelist = NIL;
1124
1125 Assert(list_length(publications) > 0);
1126
1127 initStringInfo(&cmd);
1128 appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1129 " FROM pg_catalog.pg_publication_tables t\n"
1130 " WHERE t.pubname IN (");
1131 first = true;
1132 foreach(lc, publications)
1133 {
1134 char *pubname = strVal(lfirst(lc));
1135
1136 if (first)
1137 first = false;
1138 else
1139 appendStringInfoString(&cmd, ", ");
1140
1141 appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname));
1142 }
1143 appendStringInfoString(&cmd, ")");
1144
1145 res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1146 pfree(cmd.data);
1147
1148 if (res->status != WALRCV_OK_TUPLES)
1149 ereport(ERROR,
1150 (errmsg("could not receive list of replicated tables from the publisher: %s",
1151 res->err)));
1152
1153 /* Process tables. */
1154 slot = MakeSingleTupleTableSlot(res->tupledesc);
1155 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1156 {
1157 char *nspname;
1158 char *relname;
1159 bool isnull;
1160 RangeVar *rv;
1161
1162 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1163 Assert(!isnull);
1164 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1165 Assert(!isnull);
1166
1167 rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1168 tablelist = lappend(tablelist, rv);
1169
1170 ExecClearTuple(slot);
1171 }
1172 ExecDropSingleTupleTableSlot(slot);
1173
1174 walrcv_clear_result(res);
1175
1176 return tablelist;
1177 }
1178