1 /*-------------------------------------------------------------------------
2 *
3 * citus_ruleutils.c
4 * Version independent ruleutils wrapper
5 *
6 * Copyright (c) Citus Data, Inc.
7 *-------------------------------------------------------------------------
8 */
9
10 #include "postgres.h"
11 #include "miscadmin.h"
12
13 #include "distributed/pg_version_constants.h"
14
15 #include <stddef.h>
16
17 #include "access/attnum.h"
18 #include "access/genam.h"
19 #include "access/heapam.h"
20 #include "access/htup.h"
21 #include "access/htup_details.h"
22 #include "access/skey.h"
23 #include "access/stratnum.h"
24 #include "access/sysattr.h"
25 #if PG_VERSION_NUM >= PG_VERSION_14
26 #include "access/toast_compression.h"
27 #endif
28 #include "access/tupdesc.h"
29 #include "catalog/dependency.h"
30 #include "catalog/indexing.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_am.h"
33 #include "catalog/pg_attribute.h"
34 #include "catalog/pg_authid.h"
35 #include "catalog/pg_class.h"
36 #include "catalog/pg_collation.h"
37 #include "catalog/pg_depend.h"
38 #include "catalog/pg_extension.h"
39 #include "catalog/pg_foreign_data_wrapper.h"
40 #include "catalog/pg_index.h"
41 #include "catalog/pg_type.h"
42 #include "commands/defrem.h"
43 #include "commands/extension.h"
44 #include "distributed/citus_ruleutils.h"
45 #include "distributed/commands.h"
46 #include "distributed/listutils.h"
47 #include "distributed/multi_partitioning_utils.h"
48 #include "distributed/metadata_cache.h"
49 #include "distributed/metadata_sync.h"
50 #include "distributed/metadata_utility.h"
51 #include "distributed/namespace_utils.h"
52 #include "distributed/relay_utility.h"
53 #include "distributed/version_compat.h"
54 #include "distributed/worker_protocol.h"
55 #include "foreign/foreign.h"
56 #include "lib/stringinfo.h"
57 #include "nodes/nodes.h"
58 #include "nodes/nodeFuncs.h"
59 #include "nodes/parsenodes.h"
60 #include "nodes/pg_list.h"
61 #include "parser/parse_utilcmd.h"
62 #include "parser/parser.h"
63 #include "storage/lock.h"
64 #include "utils/acl.h"
65 #include "utils/array.h"
66 #include "utils/builtins.h"
67 #include "utils/elog.h"
68 #include "utils/errcodes.h"
69 #include "utils/fmgroids.h"
70 #include "utils/lsyscache.h"
71 #include "utils/palloc.h"
72 #include "utils/rel.h"
73 #include "utils/relcache.h"
74 #include "utils/ruleutils.h"
75 #include "utils/syscache.h"
76
77
78 static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
79 List *deparseContext);
80 static void AppendOptionListToString(StringInfo stringData, List *options);
81 static void AppendStorageParametersToString(StringInfo stringBuffer,
82 List *optionList);
83 static void simple_quote_literal(StringInfo buf, const char *val);
84 static char * flatten_reloptions(Oid relid);
85 static void AddVacuumParams(ReindexStmt *reindexStmt, StringInfo buffer);
86
87
88 /*
89 * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to
90 * the given foreign tableId, and checks if an extension owns this foreign data
91 * wrapper. If it does, the function returns the extension's definition. If not,
92 * the function returns null.
93 */
94 char *
pg_get_extensiondef_string(Oid tableRelationId)95 pg_get_extensiondef_string(Oid tableRelationId)
96 {
97 ForeignTable *foreignTable = GetForeignTable(tableRelationId);
98 ForeignServer *server = GetForeignServer(foreignTable->serverid);
99 ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid);
100 StringInfoData buffer = { NULL, 0, 0, 0 };
101
102 Oid classId = ForeignDataWrapperRelationId;
103 Oid objectId = server->fdwid;
104
105 Oid extensionId = getExtensionOfObject(classId, objectId);
106 if (OidIsValid(extensionId))
107 {
108 char *extensionName = get_extension_name(extensionId);
109 Oid extensionSchemaId = get_extension_schema(extensionId);
110 char *extensionSchema = get_namespace_name(extensionSchemaId);
111
112 initStringInfo(&buffer);
113 appendStringInfo(&buffer, "CREATE EXTENSION IF NOT EXISTS %s WITH SCHEMA %s",
114 quote_identifier(extensionName),
115 quote_identifier(extensionSchema));
116 }
117 else
118 {
119 ereport(NOTICE, (errmsg("foreign-data wrapper \"%s\" does not have an "
120 "extension defined", foreignDataWrapper->fdwname)));
121 }
122
123 return (buffer.data);
124 }
125
126
127 /*
128 * get_extension_schema - given an extension OID, fetch its extnamespace
129 *
130 * Returns InvalidOid if no such extension.
131 */
132 Oid
get_extension_schema(Oid ext_oid)133 get_extension_schema(Oid ext_oid)
134 {
135 /* *INDENT-OFF* */
136 Oid result;
137 Relation rel;
138 HeapTuple tuple;
139 ScanKeyData entry[1];
140
141 rel = table_open(ExtensionRelationId, AccessShareLock);
142
143 ScanKeyInit(&entry[0],
144 Anum_pg_extension_oid,
145 BTEqualStrategyNumber, F_OIDEQ,
146 ObjectIdGetDatum(ext_oid));
147
148 SysScanDesc scandesc = systable_beginscan(rel, ExtensionOidIndexId, true,
149 NULL, 1, entry);
150
151 tuple = systable_getnext(scandesc);
152
153 /* We assume that there can be at most one matching tuple */
154 if (HeapTupleIsValid(tuple))
155 result = ((Form_pg_extension) GETSTRUCT(tuple))->extnamespace;
156 else
157 result = InvalidOid;
158
159 systable_endscan(scandesc);
160
161 table_close(rel, AccessShareLock);
162
163 return result;
164 /* *INDENT-ON* */
165 }
166
167
168 /*
169 * pg_get_serverdef_string finds the foreign server that corresponds to the
170 * given foreign tableId, and returns this server's definition.
171 */
172 char *
pg_get_serverdef_string(Oid tableRelationId)173 pg_get_serverdef_string(Oid tableRelationId)
174 {
175 ForeignTable *foreignTable = GetForeignTable(tableRelationId);
176 ForeignServer *server = GetForeignServer(foreignTable->serverid);
177 ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid);
178
179 StringInfoData buffer = { NULL, 0, 0, 0 };
180 initStringInfo(&buffer);
181
182 appendStringInfo(&buffer, "CREATE SERVER IF NOT EXISTS %s",
183 quote_identifier(server->servername));
184 if (server->servertype != NULL)
185 {
186 appendStringInfo(&buffer, " TYPE %s",
187 quote_literal_cstr(server->servertype));
188 }
189 if (server->serverversion != NULL)
190 {
191 appendStringInfo(&buffer, " VERSION %s",
192 quote_literal_cstr(server->serverversion));
193 }
194
195 appendStringInfo(&buffer, " FOREIGN DATA WRAPPER %s",
196 quote_identifier(foreignDataWrapper->fdwname));
197
198 /* append server options, if any */
199 AppendOptionListToString(&buffer, server->options);
200
201 return (buffer.data);
202 }
203
204
205 /*
206 * pg_get_sequencedef_string returns the definition of a given sequence. This
207 * definition includes explicit values for all CREATE SEQUENCE options.
208 */
209 char *
pg_get_sequencedef_string(Oid sequenceRelationId)210 pg_get_sequencedef_string(Oid sequenceRelationId)
211 {
212 Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceRelationId);
213
214 /* build our DDL command */
215 char *qualifiedSequenceName = generate_qualified_relation_name(sequenceRelationId);
216 char *typeName = format_type_be(pgSequenceForm->seqtypid);
217
218 char *sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName,
219 typeName,
220 pgSequenceForm->seqincrement, pgSequenceForm->seqmin,
221 pgSequenceForm->seqmax, pgSequenceForm->seqstart,
222 pgSequenceForm->seqcache,
223 pgSequenceForm->seqcycle ? "" : "NO ");
224
225 return sequenceDef;
226 }
227
228
229 /*
230 * pg_get_sequencedef returns the Form_pg_sequence data about the sequence with the given
231 * object id.
232 */
233 Form_pg_sequence
pg_get_sequencedef(Oid sequenceRelationId)234 pg_get_sequencedef(Oid sequenceRelationId)
235 {
236 HeapTuple heapTuple = SearchSysCache1(SEQRELID, sequenceRelationId);
237 if (!HeapTupleIsValid(heapTuple))
238 {
239 elog(ERROR, "cache lookup failed for sequence %u", sequenceRelationId);
240 }
241
242 Form_pg_sequence pgSequenceForm = (Form_pg_sequence) GETSTRUCT(heapTuple);
243
244 ReleaseSysCache(heapTuple);
245
246 return pgSequenceForm;
247 }
248
249
250 /*
251 * pg_get_tableschemadef_string returns the definition of a given table. This
252 * definition includes table's schema, default column values, not null and check
253 * constraints. The definition does not include constraints that trigger index
254 * creations; specifically, unique and primary key constraints are excluded.
255 * When includeSequenceDefaults is NEXTVAL_SEQUENCE_DEFAULTS, the function also creates
256 * DEFAULT clauses for columns getting their default values from a sequence.
257 * When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT
258 * clause using worker_nextval('sequence') and not nextval('sequence')
259 */
260 char *
pg_get_tableschemadef_string(Oid tableRelationId,IncludeSequenceDefaults includeSequenceDefaults,char * accessMethod)261 pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
262 includeSequenceDefaults, char *accessMethod)
263 {
264 bool firstAttributePrinted = false;
265 AttrNumber defaultValueIndex = 0;
266 AttrNumber constraintIndex = 0;
267 AttrNumber constraintCount = 0;
268 StringInfoData buffer = { NULL, 0, 0, 0 };
269
270 /*
271 * Instead of retrieving values from system catalogs as other functions in
272 * ruleutils.c do, we follow an unusual approach here: we open the relation,
273 * and fetch the relation's tuple descriptor. We do this because the tuple
274 * descriptor already contains information harnessed from pg_attrdef,
275 * pg_attribute, pg_constraint, and pg_class; and therefore using the
276 * descriptor saves us from a lot of additional work.
277 */
278 Relation relation = relation_open(tableRelationId, AccessShareLock);
279 char *relationName = generate_relation_name(tableRelationId, NIL);
280
281 EnsureRelationKindSupported(tableRelationId);
282
283 initStringInfo(&buffer);
284
285 if (RegularTable(tableRelationId))
286 {
287 appendStringInfoString(&buffer, "CREATE ");
288
289 if (relation->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
290 {
291 appendStringInfoString(&buffer, "UNLOGGED ");
292 }
293
294 appendStringInfo(&buffer, "TABLE %s (", relationName);
295 }
296 else
297 {
298 appendStringInfo(&buffer, "CREATE FOREIGN TABLE %s (", relationName);
299 }
300
301 /*
302 * Iterate over the table's columns. If a particular column is not dropped
303 * and is not inherited from another table, print the column's name and its
304 * formatted type.
305 */
306 TupleDesc tupleDescriptor = RelationGetDescr(relation);
307 TupleConstr *tupleConstraints = tupleDescriptor->constr;
308
309 for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
310 attributeIndex++)
311 {
312 Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
313
314 /*
315 * We disregard the inherited attributes (i.e., attinhcount > 0) here. The
316 * reasoning behind this is that Citus implements declarative partitioning
317 * by creating the partitions first and then sending
318 * "ALTER TABLE parent_table ATTACH PARTITION .." command. This may not play
319 * well with regular inherited tables, which isn't a big concern from Citus'
320 * perspective.
321 */
322 if (!attributeForm->attisdropped)
323 {
324 if (firstAttributePrinted)
325 {
326 appendStringInfoString(&buffer, ", ");
327 }
328 firstAttributePrinted = true;
329
330 const char *attributeName = NameStr(attributeForm->attname);
331 appendStringInfo(&buffer, "%s ", quote_identifier(attributeName));
332
333 const char *attributeTypeName = format_type_with_typemod(
334 attributeForm->atttypid,
335 attributeForm->
336 atttypmod);
337 appendStringInfoString(&buffer, attributeTypeName);
338
339 /* if this column has a default value, append the default value */
340 if (attributeForm->atthasdef)
341 {
342 List *defaultContext = NULL;
343 char *defaultString = NULL;
344
345 Assert(tupleConstraints != NULL);
346
347 AttrDefault *defaultValueList = tupleConstraints->defval;
348 Assert(defaultValueList != NULL);
349
350 AttrDefault *defaultValue = &(defaultValueList[defaultValueIndex]);
351 defaultValueIndex++;
352
353 Assert(defaultValue->adnum == (attributeIndex + 1));
354 Assert(defaultValueIndex <= tupleConstraints->num_defval);
355
356 /* convert expression to node tree, and prepare deparse context */
357 Node *defaultNode = (Node *) stringToNode(defaultValue->adbin);
358
359 /*
360 * if column default value is explicitly requested, or it is
361 * not set from a sequence then we include DEFAULT clause for
362 * this column.
363 */
364 if (includeSequenceDefaults ||
365 !contain_nextval_expression_walker(defaultNode, NULL))
366 {
367 defaultContext = deparse_context_for(relationName, tableRelationId);
368
369 /* deparse default value string */
370 defaultString = deparse_expression(defaultNode, defaultContext,
371 false, false);
372
373 if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
374 {
375 appendStringInfo(&buffer, " GENERATED ALWAYS AS (%s) STORED",
376 defaultString);
377 }
378 else
379 {
380 Oid seqOid = GetSequenceOid(tableRelationId, defaultValue->adnum);
381 if (includeSequenceDefaults == WORKER_NEXTVAL_SEQUENCE_DEFAULTS &&
382 seqOid != InvalidOid &&
383 pg_get_sequencedef(seqOid)->seqtypid != INT8OID)
384 {
385 /*
386 * We use worker_nextval for int and smallint types.
387 * Check issue #5126 and PR #5254 for details.
388 * https://github.com/citusdata/citus/issues/5126
389 */
390 char *sequenceName = generate_qualified_relation_name(
391 seqOid);
392 appendStringInfo(&buffer,
393 " DEFAULT worker_nextval(%s::regclass)",
394 quote_literal_cstr(sequenceName));
395 }
396 else
397 {
398 appendStringInfo(&buffer, " DEFAULT %s", defaultString);
399 }
400 }
401 }
402 }
403
404 /* if this column has a not null constraint, append the constraint */
405 if (attributeForm->attnotnull)
406 {
407 appendStringInfoString(&buffer, " NOT NULL");
408 }
409
410 #if PG_VERSION_NUM >= PG_VERSION_14
411 if (CompressionMethodIsValid(attributeForm->attcompression))
412 {
413 appendStringInfo(&buffer, " COMPRESSION %s",
414 GetCompressionMethodName(attributeForm->attcompression));
415 }
416 #endif
417
418 if (attributeForm->attcollation != InvalidOid &&
419 attributeForm->attcollation != DEFAULT_COLLATION_OID)
420 {
421 appendStringInfo(&buffer, " COLLATE %s", generate_collation_name(
422 attributeForm->attcollation));
423 }
424 }
425 }
426
427 /*
428 * Now check if the table has any constraints. If it does, set the number of
429 * check constraints here. Then iterate over all check constraints and print
430 * them.
431 */
432 if (tupleConstraints != NULL)
433 {
434 constraintCount = tupleConstraints->num_check;
435 }
436
437 for (constraintIndex = 0; constraintIndex < constraintCount; constraintIndex++)
438 {
439 ConstrCheck *checkConstraintList = tupleConstraints->check;
440 ConstrCheck *checkConstraint = &(checkConstraintList[constraintIndex]);
441
442
443 /* if an attribute or constraint has been printed, format properly */
444 if (firstAttributePrinted || constraintIndex > 0)
445 {
446 appendStringInfoString(&buffer, ", ");
447 }
448
449 appendStringInfo(&buffer, "CONSTRAINT %s CHECK ",
450 quote_identifier(checkConstraint->ccname));
451
452 /* convert expression to node tree, and prepare deparse context */
453 Node *checkNode = (Node *) stringToNode(checkConstraint->ccbin);
454 List *checkContext = deparse_context_for(relationName, tableRelationId);
455
456 /* deparse check constraint string */
457 char *checkString = deparse_expression(checkNode, checkContext, false, false);
458
459 appendStringInfoString(&buffer, "(");
460 appendStringInfoString(&buffer, checkString);
461 appendStringInfoString(&buffer, ")");
462 }
463
464 /* close create table's outer parentheses */
465 appendStringInfoString(&buffer, ")");
466
467 /*
468 * If the relation is a foreign table, append the server name and options to
469 * the create table statement.
470 */
471 char relationKind = relation->rd_rel->relkind;
472 if (relationKind == RELKIND_FOREIGN_TABLE)
473 {
474 ForeignTable *foreignTable = GetForeignTable(tableRelationId);
475 ForeignServer *foreignServer = GetForeignServer(foreignTable->serverid);
476
477 char *serverName = foreignServer->servername;
478 appendStringInfo(&buffer, " SERVER %s", quote_identifier(serverName));
479 AppendOptionListToString(&buffer, foreignTable->options);
480 }
481 else if (relationKind == RELKIND_PARTITIONED_TABLE)
482 {
483 char *partitioningInformation = GeneratePartitioningInformation(tableRelationId);
484 appendStringInfo(&buffer, " PARTITION BY %s ", partitioningInformation);
485 }
486
487 /*
488 * Add table access methods for pg12 and higher when the table is configured with an
489 * access method
490 */
491 if (accessMethod)
492 {
493 appendStringInfo(&buffer, " USING %s", quote_identifier(accessMethod));
494 }
495 else if (OidIsValid(relation->rd_rel->relam))
496 {
497 HeapTuple amTup = SearchSysCache1(AMOID, ObjectIdGetDatum(
498 relation->rd_rel->relam));
499 if (!HeapTupleIsValid(amTup))
500 {
501 elog(ERROR, "cache lookup failed for access method %u",
502 relation->rd_rel->relam);
503 }
504 Form_pg_am amForm = (Form_pg_am) GETSTRUCT(amTup);
505 appendStringInfo(&buffer, " USING %s", quote_identifier(NameStr(amForm->amname)));
506 ReleaseSysCache(amTup);
507 }
508
509 /*
510 * Add any reloptions (storage parameters) defined on the table in a WITH
511 * clause.
512 */
513 {
514 char *reloptions = flatten_reloptions(tableRelationId);
515 if (reloptions)
516 {
517 appendStringInfo(&buffer, " WITH (%s)", reloptions);
518 pfree(reloptions);
519 }
520 }
521
522 relation_close(relation, AccessShareLock);
523
524 return (buffer.data);
525 }
526
527
528 /*
529 * EnsureRelationKindSupported errors out if the given relation is not supported
530 * as a distributed relation.
531 */
532 void
EnsureRelationKindSupported(Oid relationId)533 EnsureRelationKindSupported(Oid relationId)
534 {
535 char relationKind = get_rel_relkind(relationId);
536 if (!relationKind)
537 {
538 ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
539 errmsg("relation with OID %d does not exist",
540 relationId)));
541 }
542
543 bool supportedRelationKind = RegularTable(relationId) ||
544 relationKind == RELKIND_FOREIGN_TABLE;
545
546 /*
547 * Citus doesn't support bare inherited tables (i.e., not a partition or
548 * partitioned table)
549 */
550 supportedRelationKind = supportedRelationKind && !(IsChildTable(relationId) ||
551 IsParentTable(relationId));
552
553 if (!supportedRelationKind)
554 {
555 char *relationName = get_rel_name(relationId);
556
557 ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
558 errmsg("%s is not a regular, foreign or partitioned table",
559 relationName)));
560 }
561 }
562
563
564 /*
565 * pg_get_tablecolumnoptionsdef_string returns column storage type and column
566 * statistics definitions for given table, _if_ these definitions differ from
567 * their default values. The function returns null if all columns use default
568 * values for their storage types and statistics.
569 */
570 char *
pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)571 pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
572 {
573 List *columnOptionList = NIL;
574 ListCell *columnOptionCell = NULL;
575 bool firstOptionPrinted = false;
576 StringInfoData buffer = { NULL, 0, 0, 0 };
577
578 /*
579 * Instead of retrieving values from system catalogs, we open the relation,
580 * and use the relation's tuple descriptor to access attribute information.
581 * This is primarily to maintain symmetry with pg_get_tableschemadef.
582 */
583 Relation relation = relation_open(tableRelationId, AccessShareLock);
584
585 EnsureRelationKindSupported(tableRelationId);
586
587 /*
588 * Iterate over the table's columns. If a particular column is not dropped
589 * and is not inherited from another table, check if column storage or
590 * statistics statements need to be printed.
591 */
592 TupleDesc tupleDescriptor = RelationGetDescr(relation);
593
594 if (tupleDescriptor->natts > MaxAttrNumber)
595 {
596 ereport(ERROR, (errmsg("bad number of tuple descriptor attributes")));
597 }
598
599 AttrNumber natts = tupleDescriptor->natts;
600 for (AttrNumber attributeIndex = 0;
601 attributeIndex < natts;
602 attributeIndex++)
603 {
604 Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
605 char *attributeName = NameStr(attributeForm->attname);
606 char defaultStorageType = get_typstorage(attributeForm->atttypid);
607
608 if (!attributeForm->attisdropped && attributeForm->attinhcount == 0)
609 {
610 /*
611 * If the user changed the column's default storage type, create
612 * alter statement and add statement to a list for later processing.
613 */
614 if (attributeForm->attstorage != defaultStorageType)
615 {
616 char *storageName = 0;
617 StringInfoData statement = { NULL, 0, 0, 0 };
618 initStringInfo(&statement);
619
620 switch (attributeForm->attstorage)
621 {
622 case 'p':
623 {
624 storageName = "PLAIN";
625 break;
626 }
627
628 case 'e':
629 {
630 storageName = "EXTERNAL";
631 break;
632 }
633
634 case 'm':
635 {
636 storageName = "MAIN";
637 break;
638 }
639
640 case 'x':
641 {
642 storageName = "EXTENDED";
643 break;
644 }
645
646 default:
647 {
648 ereport(ERROR, (errmsg("unrecognized storage type: %c",
649 attributeForm->attstorage)));
650 break;
651 }
652 }
653
654 appendStringInfo(&statement, "ALTER COLUMN %s ",
655 quote_identifier(attributeName));
656 appendStringInfo(&statement, "SET STORAGE %s", storageName);
657
658 columnOptionList = lappend(columnOptionList, statement.data);
659 }
660
661 /*
662 * If the user changed the column's statistics target, create
663 * alter statement and add statement to a list for later processing.
664 */
665 if (attributeForm->attstattarget >= 0)
666 {
667 StringInfoData statement = { NULL, 0, 0, 0 };
668 initStringInfo(&statement);
669
670 appendStringInfo(&statement, "ALTER COLUMN %s ",
671 quote_identifier(attributeName));
672 appendStringInfo(&statement, "SET STATISTICS %d",
673 attributeForm->attstattarget);
674
675 columnOptionList = lappend(columnOptionList, statement.data);
676 }
677 }
678 }
679
680 /*
681 * Iterate over column storage and statistics statements that we created,
682 * and append them to a single alter table statement.
683 */
684 foreach(columnOptionCell, columnOptionList)
685 {
686 if (!firstOptionPrinted)
687 {
688 initStringInfo(&buffer);
689 appendStringInfo(&buffer, "ALTER TABLE ONLY %s ",
690 generate_relation_name(tableRelationId, NIL));
691 }
692 else
693 {
694 appendStringInfoString(&buffer, ", ");
695 }
696 firstOptionPrinted = true;
697
698 char *columnOptionStatement = (char *) lfirst(columnOptionCell);
699 appendStringInfoString(&buffer, columnOptionStatement);
700
701 pfree(columnOptionStatement);
702 }
703
704 list_free(columnOptionList);
705 relation_close(relation, AccessShareLock);
706
707 return (buffer.data);
708 }
709
710
711 /*
712 * deparse_shard_index_statement uses the provided CREATE INDEX node, dist.
713 * relation, and shard identifier to populate a provided buffer with a string
714 * representation of a shard-extended version of that command.
715 */
716 void
deparse_shard_index_statement(IndexStmt * origStmt,Oid distrelid,int64 shardid,StringInfo buffer)717 deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid,
718 StringInfo buffer)
719 {
720 IndexStmt *indexStmt = copyObject(origStmt); /* copy to avoid modifications */
721
722 /* extend relation and index name using shard identifier */
723 AppendShardIdToName(&(indexStmt->relation->relname), shardid);
724 AppendShardIdToName(&(indexStmt->idxname), shardid);
725
726 char *relationName = indexStmt->relation->relname;
727 char *indexName = indexStmt->idxname;
728
729 /* use extended shard name and transformed stmt for deparsing */
730 List *deparseContext = deparse_context_for(relationName, distrelid);
731 indexStmt = transformIndexStmt(distrelid, indexStmt, NULL);
732
733 appendStringInfo(buffer, "CREATE %s INDEX %s %s %s ON %s %s USING %s ",
734 (indexStmt->unique ? "UNIQUE" : ""),
735 (indexStmt->concurrent ? "CONCURRENTLY" : ""),
736 (indexStmt->if_not_exists ? "IF NOT EXISTS" : ""),
737 quote_identifier(indexName),
738 (indexStmt->relation->inh ? "" : "ONLY"),
739 quote_qualified_identifier(indexStmt->relation->schemaname,
740 relationName),
741 indexStmt->accessMethod);
742
743 /*
744 * Switch to empty search_path to deparse_index_columns to produce fully-
745 * qualified names in expressions.
746 */
747 PushOverrideEmptySearchPath(CurrentMemoryContext);
748
749 /* index column or expression list begins here */
750 appendStringInfoChar(buffer, '(');
751 deparse_index_columns(buffer, indexStmt->indexParams, deparseContext);
752 appendStringInfoString(buffer, ") ");
753
754 /* column/expressions for INCLUDE list */
755 if (indexStmt->indexIncludingParams != NIL)
756 {
757 appendStringInfoString(buffer, "INCLUDE (");
758 deparse_index_columns(buffer, indexStmt->indexIncludingParams, deparseContext);
759 appendStringInfoChar(buffer, ')');
760 }
761
762 AppendStorageParametersToString(buffer, indexStmt->options);
763
764 if (indexStmt->whereClause != NULL)
765 {
766 appendStringInfo(buffer, "WHERE %s", deparse_expression(indexStmt->whereClause,
767 deparseContext, false,
768 false));
769 }
770
771 /* revert back to original search_path */
772 PopOverrideSearchPath();
773 }
774
775
776 /*
777 * deparse_shard_reindex_statement uses the provided REINDEX node, dist.
778 * relation, and shard identifier to populate a provided buffer with a string
779 * representation of a shard-extended version of that command.
780 */
781 void
deparse_shard_reindex_statement(ReindexStmt * origStmt,Oid distrelid,int64 shardid,StringInfo buffer)782 deparse_shard_reindex_statement(ReindexStmt *origStmt, Oid distrelid, int64 shardid,
783 StringInfo buffer)
784 {
785 ReindexStmt *reindexStmt = copyObject(origStmt); /* copy to avoid modifications */
786 char *relationName = NULL;
787 const char *concurrentlyString =
788 IsReindexWithParam_compat(reindexStmt, "concurrently") ? "CONCURRENTLY " : "";
789
790
791 if (reindexStmt->kind == REINDEX_OBJECT_INDEX ||
792 reindexStmt->kind == REINDEX_OBJECT_TABLE)
793 {
794 /* extend relation and index name using shard identifier */
795 AppendShardIdToName(&(reindexStmt->relation->relname), shardid);
796
797 relationName = reindexStmt->relation->relname;
798 }
799
800 appendStringInfoString(buffer, "REINDEX ");
801 AddVacuumParams(reindexStmt, buffer);
802
803 switch (reindexStmt->kind)
804 {
805 case REINDEX_OBJECT_INDEX:
806 {
807 appendStringInfo(buffer, "INDEX %s%s", concurrentlyString,
808 quote_qualified_identifier(reindexStmt->relation->schemaname,
809 relationName));
810 break;
811 }
812
813 case REINDEX_OBJECT_TABLE:
814 {
815 appendStringInfo(buffer, "TABLE %s%s", concurrentlyString,
816 quote_qualified_identifier(reindexStmt->relation->schemaname,
817 relationName));
818 break;
819 }
820
821 case REINDEX_OBJECT_SCHEMA:
822 {
823 appendStringInfo(buffer, "SCHEMA %s%s", concurrentlyString,
824 quote_identifier(reindexStmt->name));
825 break;
826 }
827
828 case REINDEX_OBJECT_SYSTEM:
829 {
830 appendStringInfo(buffer, "SYSTEM %s%s", concurrentlyString,
831 quote_identifier(reindexStmt->name));
832 break;
833 }
834
835 case REINDEX_OBJECT_DATABASE:
836 {
837 appendStringInfo(buffer, "DATABASE %s%s", concurrentlyString,
838 quote_identifier(reindexStmt->name));
839 break;
840 }
841 }
842 }
843
844
845 /*
846 * IsReindexWithParam_compat returns true if the given parameter
847 * exists for the given reindexStmt.
848 */
849 bool
IsReindexWithParam_compat(ReindexStmt * reindexStmt,char * param)850 IsReindexWithParam_compat(ReindexStmt *reindexStmt, char *param)
851 {
852 #if PG_VERSION_NUM < PG_VERSION_14
853 if (strcmp(param, "concurrently") == 0)
854 {
855 return reindexStmt->concurrent;
856 }
857 else if (strcmp(param, "verbose") == 0)
858 {
859 return reindexStmt->options & REINDEXOPT_VERBOSE;
860 }
861 return false;
862 #else
863 DefElem *opt = NULL;
864 foreach_ptr(opt, reindexStmt->params)
865 {
866 if (strcmp(opt->defname, param) == 0)
867 {
868 return defGetBoolean(opt);
869 }
870 }
871 return false;
872 #endif
873 }
874
875
876 /*
877 * AddVacuumParams adds vacuum params to the given buffer.
878 */
879 static void
AddVacuumParams(ReindexStmt * reindexStmt,StringInfo buffer)880 AddVacuumParams(ReindexStmt *reindexStmt, StringInfo buffer)
881 {
882 StringInfo temp = makeStringInfo();
883 if (IsReindexWithParam_compat(reindexStmt, "verbose"))
884 {
885 appendStringInfoString(temp, "VERBOSE");
886 }
887 #if PG_VERSION_NUM >= PG_VERSION_14
888 char *tableSpaceName = NULL;
889 DefElem *opt = NULL;
890 foreach_ptr(opt, reindexStmt->params)
891 {
892 if (strcmp(opt->defname, "tablespace") == 0)
893 {
894 tableSpaceName = defGetString(opt);
895 break;
896 }
897 }
898
899 if (tableSpaceName)
900 {
901 if (temp->len > 0)
902 {
903 appendStringInfo(temp, ", TABLESPACE %s", tableSpaceName);
904 }
905 else
906 {
907 appendStringInfo(temp, "TABLESPACE %s", tableSpaceName);
908 }
909 }
910 #endif
911
912 if (temp->len > 0)
913 {
914 appendStringInfo(buffer, "(%s) ", temp->data);
915 }
916 }
917
918
919 /* deparse_index_columns appends index or include parameters to the provided buffer */
920 static void
deparse_index_columns(StringInfo buffer,List * indexParameterList,List * deparseContext)921 deparse_index_columns(StringInfo buffer, List *indexParameterList, List *deparseContext)
922 {
923 ListCell *indexParameterCell = NULL;
924 foreach(indexParameterCell, indexParameterList)
925 {
926 IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell);
927
928 /* use commas to separate subsequent elements */
929 if (indexParameterCell != list_head(indexParameterList))
930 {
931 appendStringInfoChar(buffer, ',');
932 }
933
934 if (indexElement->name)
935 {
936 appendStringInfo(buffer, "%s ", quote_identifier(indexElement->name));
937 }
938 else if (indexElement->expr)
939 {
940 appendStringInfo(buffer, "(%s)", deparse_expression(indexElement->expr,
941 deparseContext, false,
942 false));
943 }
944
945 if (indexElement->collation != NIL)
946 {
947 appendStringInfo(buffer, "COLLATE %s ",
948 NameListToQuotedString(indexElement->collation));
949 }
950
951 if (indexElement->opclass != NIL)
952 {
953 appendStringInfo(buffer, "%s ",
954 NameListToQuotedString(indexElement->opclass));
955 }
956 #if PG_VERSION_NUM >= PG_VERSION_13
957
958 /* Commit on postgres: 911e70207703799605f5a0e8aad9f06cff067c63*/
959 if (indexElement->opclassopts != NIL)
960 {
961 ereport(ERROR, errmsg(
962 "citus currently doesn't support operator class parameters in indexes"));
963 }
964 #endif
965
966 if (indexElement->ordering != SORTBY_DEFAULT)
967 {
968 bool sortAsc = (indexElement->ordering == SORTBY_ASC);
969 appendStringInfo(buffer, "%s ", (sortAsc ? "ASC" : "DESC"));
970 }
971
972 if (indexElement->nulls_ordering != SORTBY_NULLS_DEFAULT)
973 {
974 bool nullsFirst = (indexElement->nulls_ordering == SORTBY_NULLS_FIRST);
975 appendStringInfo(buffer, "NULLS %s ", (nullsFirst ? "FIRST" : "LAST"));
976 }
977 }
978 }
979
980
981 /*
982 * pg_get_indexclusterdef_string returns the definition of a cluster statement
983 * for given index. The function returns null if the table is not clustered on
984 * given index.
985 */
986 char *
pg_get_indexclusterdef_string(Oid indexRelationId)987 pg_get_indexclusterdef_string(Oid indexRelationId)
988 {
989 StringInfoData buffer = { NULL, 0, 0, 0 };
990
991 HeapTuple indexTuple = SearchSysCache(INDEXRELID, ObjectIdGetDatum(indexRelationId),
992 0, 0, 0);
993 if (!HeapTupleIsValid(indexTuple))
994 {
995 ereport(ERROR, (errmsg("cache lookup failed for index %u", indexRelationId)));
996 }
997
998 Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
999 Oid tableRelationId = indexForm->indrelid;
1000
1001 /* check if the table is clustered on this index */
1002 if (indexForm->indisclustered)
1003 {
1004 char *qualifiedRelationName =
1005 generate_qualified_relation_name(tableRelationId);
1006 char *indexName = get_rel_name(indexRelationId); /* needs to be quoted */
1007
1008 initStringInfo(&buffer);
1009 appendStringInfo(&buffer, "ALTER TABLE %s CLUSTER ON %s",
1010 qualifiedRelationName, quote_identifier(indexName));
1011 }
1012
1013 ReleaseSysCache(indexTuple);
1014
1015 return (buffer.data);
1016 }
1017
1018
1019 /*
1020 * generate_qualified_relation_name computes the schema-qualified name to display for a
1021 * relation specified by OID.
1022 */
1023 char *
generate_qualified_relation_name(Oid relid)1024 generate_qualified_relation_name(Oid relid)
1025 {
1026 HeapTuple tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1027 if (!HeapTupleIsValid(tp))
1028 {
1029 elog(ERROR, "cache lookup failed for relation %u", relid);
1030 }
1031 Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp);
1032 char *relname = NameStr(reltup->relname);
1033
1034 char *nspname = get_namespace_name(reltup->relnamespace);
1035 if (!nspname)
1036 {
1037 elog(ERROR, "cache lookup failed for namespace %u",
1038 reltup->relnamespace);
1039 }
1040
1041 char *result = quote_qualified_identifier(nspname, relname);
1042
1043 ReleaseSysCache(tp);
1044
1045 return result;
1046 }
1047
1048
1049 /*
1050 * AppendOptionListToString converts the option list to its textual format, and
1051 * appends this text to the given string buffer.
1052 */
1053 static void
AppendOptionListToString(StringInfo stringBuffer,List * optionList)1054 AppendOptionListToString(StringInfo stringBuffer, List *optionList)
1055 {
1056 if (optionList != NIL)
1057 {
1058 ListCell *optionCell = NULL;
1059 bool firstOptionPrinted = false;
1060
1061 appendStringInfo(stringBuffer, " OPTIONS (");
1062
1063 foreach(optionCell, optionList)
1064 {
1065 DefElem *option = (DefElem *) lfirst(optionCell);
1066 char *optionName = option->defname;
1067 char *optionValue = defGetString(option);
1068
1069 if (firstOptionPrinted)
1070 {
1071 appendStringInfo(stringBuffer, ", ");
1072 }
1073 firstOptionPrinted = true;
1074
1075 appendStringInfo(stringBuffer, "%s ", quote_identifier(optionName));
1076 appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue));
1077 }
1078
1079 appendStringInfo(stringBuffer, ")");
1080 }
1081 }
1082
1083
1084 /*
1085 * AppendStorageParametersToString converts the storage parameter list to its
1086 * textual format, and appends this text to the given string buffer.
1087 */
1088 static void
AppendStorageParametersToString(StringInfo stringBuffer,List * optionList)1089 AppendStorageParametersToString(StringInfo stringBuffer, List *optionList)
1090 {
1091 ListCell *optionCell = NULL;
1092 bool firstOptionPrinted = false;
1093
1094 if (optionList == NIL)
1095 {
1096 return;
1097 }
1098
1099 appendStringInfo(stringBuffer, " WITH (");
1100
1101 foreach(optionCell, optionList)
1102 {
1103 DefElem *option = (DefElem *) lfirst(optionCell);
1104 char *optionName = option->defname;
1105 char *optionValue = defGetString(option);
1106
1107 if (firstOptionPrinted)
1108 {
1109 appendStringInfo(stringBuffer, ", ");
1110 }
1111 firstOptionPrinted = true;
1112
1113 appendStringInfo(stringBuffer, "%s = %s ",
1114 quote_identifier(optionName),
1115 quote_literal_cstr(optionValue));
1116 }
1117
1118 appendStringInfo(stringBuffer, ")");
1119 }
1120
1121
1122 /*
1123 * contain_nextval_expression_walker walks over expression tree and returns
1124 * true if it contains call to 'nextval' function.
1125 */
1126 bool
contain_nextval_expression_walker(Node * node,void * context)1127 contain_nextval_expression_walker(Node *node, void *context)
1128 {
1129 if (node == NULL)
1130 {
1131 return false;
1132 }
1133
1134 if (IsA(node, FuncExpr))
1135 {
1136 FuncExpr *funcExpr = (FuncExpr *) node;
1137
1138 if (funcExpr->funcid == F_NEXTVAL)
1139 {
1140 return true;
1141 }
1142 }
1143 return expression_tree_walker(node, contain_nextval_expression_walker, context);
1144 }
1145
1146
1147 /*
1148 * pg_get_replica_identity_command function returns the required ALTER .. TABLE
1149 * command to define the replica identity.
1150 */
1151 char *
pg_get_replica_identity_command(Oid tableRelationId)1152 pg_get_replica_identity_command(Oid tableRelationId)
1153 {
1154 StringInfo buf = makeStringInfo();
1155
1156 Relation relation = table_open(tableRelationId, AccessShareLock);
1157
1158 char replicaIdentity = relation->rd_rel->relreplident;
1159
1160 char *relationName = generate_qualified_relation_name(tableRelationId);
1161
1162 if (replicaIdentity == REPLICA_IDENTITY_INDEX)
1163 {
1164 Oid indexId = RelationGetReplicaIndex(relation);
1165
1166 if (OidIsValid(indexId))
1167 {
1168 appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY USING INDEX %s ",
1169 relationName,
1170 quote_identifier(get_rel_name(indexId)));
1171 }
1172 }
1173 else if (replicaIdentity == REPLICA_IDENTITY_NOTHING)
1174 {
1175 appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY NOTHING",
1176 relationName);
1177 }
1178 else if (replicaIdentity == REPLICA_IDENTITY_FULL)
1179 {
1180 appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY FULL",
1181 relationName);
1182 }
1183
1184 table_close(relation, AccessShareLock);
1185
1186 return (buf->len > 0) ? buf->data : NULL;
1187 }
1188
1189
1190 /*
1191 * Generate a C string representing a relation's reloptions, or NULL if none.
1192 *
1193 * This function comes from PostgreSQL source code in
1194 * src/backend/utils/adt/ruleutils.c
1195 */
1196 static char *
flatten_reloptions(Oid relid)1197 flatten_reloptions(Oid relid)
1198 {
1199 char *result = NULL;
1200 bool isnull;
1201
1202 HeapTuple tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1203 if (!HeapTupleIsValid(tuple))
1204 {
1205 elog(ERROR, "cache lookup failed for relation %u", relid);
1206 }
1207
1208 Datum reloptions = SysCacheGetAttr(RELOID, tuple,
1209 Anum_pg_class_reloptions, &isnull);
1210 if (!isnull)
1211 {
1212 StringInfoData buf;
1213 Datum *options;
1214 int noptions;
1215 int i;
1216
1217 initStringInfo(&buf);
1218
1219 deconstruct_array(DatumGetArrayTypeP(reloptions),
1220 TEXTOID, -1, false, 'i',
1221 &options, NULL, &noptions);
1222
1223 for (i = 0; i < noptions; i++)
1224 {
1225 char *option = TextDatumGetCString(options[i]);
1226 char *value;
1227
1228 /*
1229 * Each array element should have the form name=value. If the "="
1230 * is missing for some reason, treat it like an empty value.
1231 */
1232 char *name = option;
1233 char *separator = strchr(option, '=');
1234 if (separator)
1235 {
1236 *separator = '\0';
1237 value = separator + 1;
1238 }
1239 else
1240 {
1241 value = "";
1242 }
1243
1244 if (i > 0)
1245 {
1246 appendStringInfoString(&buf, ", ");
1247 }
1248 appendStringInfo(&buf, "%s=", quote_identifier(name));
1249
1250 /*
1251 * In general we need to quote the value; but to avoid unnecessary
1252 * clutter, do not quote if it is an identifier that would not
1253 * need quoting. (We could also allow numbers, but that is a bit
1254 * trickier than it looks --- for example, are leading zeroes
1255 * significant? We don't want to assume very much here about what
1256 * custom reloptions might mean.)
1257 */
1258 if (quote_identifier(value) == value)
1259 {
1260 appendStringInfoString(&buf, value);
1261 }
1262 else
1263 {
1264 simple_quote_literal(&buf, value);
1265 }
1266
1267 pfree(option);
1268 }
1269
1270 result = buf.data;
1271 }
1272
1273 ReleaseSysCache(tuple);
1274
1275 return result;
1276 }
1277
1278
1279 /*
1280 * simple_quote_literal - Format a string as a SQL literal, append to buf
1281 *
1282 * This function comes from PostgreSQL source code in
1283 * src/backend/utils/adt/ruleutils.c
1284 */
1285 static void
simple_quote_literal(StringInfo buf,const char * val)1286 simple_quote_literal(StringInfo buf, const char *val)
1287 {
1288 /*
1289 * We form the string literal according to the prevailing setting of
1290 * standard_conforming_strings; we never use E''. User is responsible for
1291 * making sure result is used correctly.
1292 */
1293 appendStringInfoChar(buf, '\'');
1294 for (const char *valptr = val; *valptr; valptr++)
1295 {
1296 char ch = *valptr;
1297
1298 if (SQL_STR_DOUBLE(ch, !standard_conforming_strings))
1299 {
1300 appendStringInfoChar(buf, ch);
1301 }
1302 appendStringInfoChar(buf, ch);
1303 }
1304 appendStringInfoChar(buf, '\'');
1305 }
1306
1307
1308 /*
1309 * RoleSpecString resolves the role specification to its string form that is suitable for transport to a worker node.
1310 * This function resolves the following identifiers from the current context so they are safe to transfer.
1311 *
1312 * CURRENT_USER - resolved to the user name of the current role being used
1313 * SESSION_USER - resolved to the user name of the user that opened the session
1314 * CURRENT_ROLE - same as CURRENT_USER, resolved to the user name of the current role being used
1315 * Postgres treats CURRENT_ROLE is equivalent to CURRENT_USER, and we follow the same approach.
1316 *
1317 * withQuoteIdentifier is used, because if the results will be used in a query the quotes are needed but if not there
1318 * should not be extra quotes.
1319 */
1320 const char *
RoleSpecString(RoleSpec * spec,bool withQuoteIdentifier)1321 RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier)
1322 {
1323 switch (spec->roletype)
1324 {
1325 case ROLESPEC_CSTRING:
1326 {
1327 return withQuoteIdentifier ?
1328 quote_identifier(spec->rolename) :
1329 spec->rolename;
1330 }
1331
1332 #if PG_VERSION_NUM >= PG_VERSION_14
1333 case ROLESPEC_CURRENT_ROLE:
1334 #endif
1335 case ROLESPEC_CURRENT_USER:
1336 {
1337 return withQuoteIdentifier ?
1338 quote_identifier(GetUserNameFromId(GetUserId(), false)) :
1339 GetUserNameFromId(GetUserId(), false);
1340 }
1341
1342 case ROLESPEC_SESSION_USER:
1343 {
1344 return withQuoteIdentifier ?
1345 quote_identifier(GetUserNameFromId(GetSessionUserId(), false)) :
1346 GetUserNameFromId(GetSessionUserId(), false);
1347 }
1348
1349 case ROLESPEC_PUBLIC:
1350 {
1351 return "PUBLIC";
1352 }
1353
1354 default:
1355 {
1356 elog(ERROR, "unexpected role type %d", spec->roletype);
1357 }
1358 }
1359 }
1360