1 /*-------------------------------------------------------------------------
2  *
3  * columnar_metadata.c
4  *
5  * Copyright (c) Citus Data, Inc.
6  *
7  * Manages metadata for columnar relations in separate, shared metadata tables
8  * in the "columnar" schema.
9  *
10  *   * holds basic stripe information including data size and row counts
11  *   * holds basic chunk and chunk group information like data offsets and
12  *     min/max values (used for Chunk Group Filtering)
13  *   * useful for fast VACUUM operations (e.g. reporting with VACUUM VERBOSE)
14  *   * useful for stats/costing
15  *   * maps logical row numbers to stripe IDs
16  *   * TODO: visibility information
17  *
18  *-------------------------------------------------------------------------
19  */
20 
21 
22 #include "postgres.h"
23 
24 #include "safe_lib.h"
25 
26 #include "citus_version.h"
27 #include "columnar/columnar.h"
28 #include "columnar/columnar_storage.h"
29 #include "columnar/columnar_version_compat.h"
30 #include "distributed/listutils.h"
31 
32 #include <sys/stat.h>
33 #include "access/heapam.h"
34 #include "access/htup_details.h"
35 #include "access/nbtree.h"
36 #include "access/xact.h"
37 #include "catalog/indexing.h"
38 #include "catalog/pg_namespace.h"
39 #include "catalog/pg_collation.h"
40 #include "catalog/pg_type.h"
41 #include "catalog/namespace.h"
42 #include "commands/defrem.h"
43 #include "commands/sequence.h"
44 #include "commands/trigger.h"
45 #include "distributed/metadata_cache.h"
46 #include "executor/executor.h"
47 #include "executor/spi.h"
48 #include "miscadmin.h"
49 #include "nodes/execnodes.h"
50 #include "lib/stringinfo.h"
51 #include "port.h"
52 #include "storage/fd.h"
53 #include "storage/lmgr.h"
54 #include "storage/procarray.h"
55 #include "storage/smgr.h"
56 #include "utils/builtins.h"
57 #include "utils/fmgroids.h"
58 #include "utils/memutils.h"
59 #include "utils/lsyscache.h"
60 #include "utils/rel.h"
61 #include "utils/relfilenodemap.h"
62 
63 
64 typedef struct
65 {
66 	Relation rel;
67 	EState *estate;
68 	ResultRelInfo *resultRelInfo;
69 } ModifyState;
70 
/*
 * RowNumberLookupMode selects which stripe StripeMetadataLookupRowNumber
 * searches for, relative to the row number given to it.
 */
typedef enum RowNumberLookupMode
{
	/* look for a stripe with firstRowNumber <= input rowNumber */
	FIND_LESS_OR_EQUAL,

	/* look for a stripe with firstRowNumber > input rowNumber */
	FIND_GREATER
} RowNumberLookupMode;
85 
86 static void InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId,
87 										 uint32 columnCount, uint32 chunkGroupRowCount,
88 										 uint64 firstRowNumber);
89 static void GetHighestUsedAddressAndId(uint64 storageId,
90 									   uint64 *highestUsedAddress,
91 									   uint64 *highestUsedId);
92 static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
93 												bool *update, Datum *newValues);
94 static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
95 static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
96 											HeapTuple heapTuple);
97 static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
98 										chunkGroupCount, Snapshot snapshot);
99 static Oid ColumnarStorageIdSequenceRelationId(void);
100 static Oid ColumnarStripeRelationId(void);
101 static Oid ColumnarStripePKeyIndexRelationId(void);
102 static Oid ColumnarStripeFirstRowNumberIndexRelationId(void);
103 static Oid ColumnarOptionsRelationId(void);
104 static Oid ColumnarOptionsIndexRegclass(void);
105 static Oid ColumnarChunkRelationId(void);
106 static Oid ColumnarChunkGroupRelationId(void);
107 static Oid ColumnarChunkIndexRelationId(void);
108 static Oid ColumnarChunkGroupIndexRelationId(void);
109 static Oid ColumnarNamespaceId(void);
110 static uint64 LookupStorageId(RelFileNode relfilenode);
111 static uint64 GetHighestUsedRowNumber(uint64 storageId);
112 static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
113 												   AttrNumber storageIdAtrrNumber,
114 												   Oid storageIdIndexId,
115 												   uint64 storageId);
116 static ModifyState * StartModifyRelation(Relation rel);
117 static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
118 											 bool *nulls);
119 static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
120 static void FinishModifyRelation(ModifyState *state);
121 static EState * create_estate_for_relation(Relation rel);
122 static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
123 static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
124 static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
125 static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber,
126 													  Snapshot snapshot,
127 													  RowNumberLookupMode lookupMode);
128 static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata);
129 
130 PG_FUNCTION_INFO_V1(columnar_relation_storageid);
131 
/*
 * Constants for columnar.options: number of attributes followed by the
 * attribute number of each column. Attribute numbers are 1-based, so they
 * are used as "Anum_... - 1" when indexing values/nulls arrays.
 */
#define Natts_columnar_options                        5
#define Anum_columnar_options_regclass                1
#define Anum_columnar_options_chunk_group_row_limit   2
#define Anum_columnar_options_stripe_row_limit        3
#define Anum_columnar_options_compression_level       4
#define Anum_columnar_options_compression             5
139 
140 /* ----------------
141  *		columnar.options definition.
142  * ----------------
143  */
144 typedef struct FormData_columnar_options
145 {
146 	Oid regclass;
147 	int32 chunk_group_row_limit;
148 	int32 stripe_row_limit;
149 	int32 compressionLevel;
150 	NameData compression;
151 
152 #ifdef CATALOG_VARLEN           /* variable-length fields start here */
153 #endif
154 } FormData_columnar_options;
155 typedef FormData_columnar_options *Form_columnar_options;
156 
157 
/*
 * Constants for columnar.stripe: number of attributes followed by the
 * 1-based attribute number of each column.
 */
#define Natts_columnar_stripe                         9
#define Anum_columnar_stripe_storageid                1
#define Anum_columnar_stripe_stripe                   2
#define Anum_columnar_stripe_file_offset              3
#define Anum_columnar_stripe_data_length              4
#define Anum_columnar_stripe_column_count             5
#define Anum_columnar_stripe_chunk_row_count          6
#define Anum_columnar_stripe_row_count                7
#define Anum_columnar_stripe_chunk_count              8
#define Anum_columnar_stripe_first_row_number         9

/*
 * Constants for columnar.chunk_group: number of attributes followed by the
 * 1-based attribute number of each column.
 */
#define Natts_columnar_chunkgroup                     4
#define Anum_columnar_chunkgroup_storageid            1
#define Anum_columnar_chunkgroup_stripe               2
#define Anum_columnar_chunkgroup_chunk                3
#define Anum_columnar_chunkgroup_row_count            4

/*
 * Constants for columnar.chunk: number of attributes followed by the
 * 1-based attribute number of each column.
 */
#define Natts_columnar_chunk                          14
#define Anum_columnar_chunk_storageid                 1
#define Anum_columnar_chunk_stripe                    2
#define Anum_columnar_chunk_attr                      3
#define Anum_columnar_chunk_chunk                     4
#define Anum_columnar_chunk_minimum_value             5
#define Anum_columnar_chunk_maximum_value             6
#define Anum_columnar_chunk_value_stream_offset       7
#define Anum_columnar_chunk_value_stream_length       8
#define Anum_columnar_chunk_exists_stream_offset      9
#define Anum_columnar_chunk_exists_stream_length      10
#define Anum_columnar_chunk_value_compression_type    11
#define Anum_columnar_chunk_value_compression_level   12
#define Anum_columnar_chunk_value_decompressed_size   13
#define Anum_columnar_chunk_value_count               14
193 
194 
195 /*
196  * InitColumnarOptions initialized the columnar table options. Meaning it writes the
197  * default options to the options table if not already existing.
198  */
199 void
InitColumnarOptions(Oid regclass)200 InitColumnarOptions(Oid regclass)
201 {
202 	/*
203 	 * When upgrading we retain options for all columnar tables by upgrading
204 	 * "columnar.options" catalog table, so we shouldn't do anything here.
205 	 */
206 	if (IsBinaryUpgrade)
207 	{
208 		return;
209 	}
210 
211 	ColumnarOptions defaultOptions = {
212 		.chunkRowCount = columnar_chunk_group_row_limit,
213 		.stripeRowCount = columnar_stripe_row_limit,
214 		.compressionType = columnar_compression,
215 		.compressionLevel = columnar_compression_level
216 	};
217 
218 	WriteColumnarOptions(regclass, &defaultOptions, false);
219 }
220 
221 
222 /*
223  * SetColumnarOptions writes the passed table options as the authoritive options to the
224  * table irregardless of the optiones already existing or not. This can be used to put a
225  * table in a certain state.
226  */
227 void
SetColumnarOptions(Oid regclass,ColumnarOptions * options)228 SetColumnarOptions(Oid regclass, ColumnarOptions *options)
229 {
230 	WriteColumnarOptions(regclass, options, true);
231 }
232 
233 
234 /*
235  * WriteColumnarOptions writes the options to the catalog table for a given regclass.
236  *  - If overwrite is false it will only write the values if there is not already a record
237  *    found.
238  *  - If overwrite is true it will always write the settings
239  *
240  * The return value indicates if the record has been written.
241  */
242 static bool
WriteColumnarOptions(Oid regclass,ColumnarOptions * options,bool overwrite)243 WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
244 {
245 	/*
246 	 * When upgrading we should retain the options from the previous
247 	 * cluster and don't write new options.
248 	 */
249 	Assert(!IsBinaryUpgrade);
250 
251 	bool written = false;
252 
253 	bool nulls[Natts_columnar_options] = { 0 };
254 	Datum values[Natts_columnar_options] = {
255 		ObjectIdGetDatum(regclass),
256 		Int32GetDatum(options->chunkRowCount),
257 		Int32GetDatum(options->stripeRowCount),
258 		Int32GetDatum(options->compressionLevel),
259 		0, /* to be filled below */
260 	};
261 
262 	NameData compressionName = { 0 };
263 	namestrcpy(&compressionName, CompressionTypeStr(options->compressionType));
264 	values[Anum_columnar_options_compression - 1] = NameGetDatum(&compressionName);
265 
266 	/* create heap tuple and insert into catalog table */
267 	Relation columnarOptions = relation_open(ColumnarOptionsRelationId(),
268 											 RowExclusiveLock);
269 	TupleDesc tupleDescriptor = RelationGetDescr(columnarOptions);
270 
271 	/* find existing item to perform update if exist */
272 	ScanKeyData scanKey[1] = { 0 };
273 	ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
274 				F_OIDEQ,
275 				ObjectIdGetDatum(regclass));
276 
277 	Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
278 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
279 															1, scanKey);
280 
281 	HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
282 	if (HeapTupleIsValid(heapTuple))
283 	{
284 		if (overwrite)
285 		{
286 			/* TODO check if the options are actually different, skip if not changed */
287 			/* update existing record */
288 			bool update[Natts_columnar_options] = { 0 };
289 			update[Anum_columnar_options_chunk_group_row_limit - 1] = true;
290 			update[Anum_columnar_options_stripe_row_limit - 1] = true;
291 			update[Anum_columnar_options_compression_level - 1] = true;
292 			update[Anum_columnar_options_compression - 1] = true;
293 
294 			HeapTuple tuple = heap_modify_tuple(heapTuple, tupleDescriptor,
295 												values, nulls, update);
296 			CatalogTupleUpdate(columnarOptions, &tuple->t_self, tuple);
297 			written = true;
298 		}
299 	}
300 	else
301 	{
302 		/* inserting new record */
303 		HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
304 		CatalogTupleInsert(columnarOptions, newTuple);
305 		written = true;
306 	}
307 
308 	if (written)
309 	{
310 		CommandCounterIncrement();
311 	}
312 
313 	systable_endscan_ordered(scanDescriptor);
314 	index_close(index, AccessShareLock);
315 	relation_close(columnarOptions, RowExclusiveLock);
316 
317 	return written;
318 }
319 
320 
321 /*
322  * DeleteColumnarTableOptions removes the columnar table options for a regclass. When
323  * missingOk is false it will throw an error when no table options can be found.
324  *
325  * Returns whether a record has been removed.
326  */
327 bool
DeleteColumnarTableOptions(Oid regclass,bool missingOk)328 DeleteColumnarTableOptions(Oid regclass, bool missingOk)
329 {
330 	bool result = false;
331 
332 	/*
333 	 * When upgrading we shouldn't delete or modify table options and
334 	 * retain options from the previous cluster.
335 	 */
336 	Assert(!IsBinaryUpgrade);
337 
338 	Relation columnarOptions = try_relation_open(ColumnarOptionsRelationId(),
339 												 RowExclusiveLock);
340 	if (columnarOptions == NULL)
341 	{
342 		/* extension has been dropped */
343 		return false;
344 	}
345 
346 	/* find existing item to remove */
347 	ScanKeyData scanKey[1] = { 0 };
348 	ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
349 				F_OIDEQ,
350 				ObjectIdGetDatum(regclass));
351 
352 	Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
353 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
354 															1, scanKey);
355 
356 	HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
357 	if (HeapTupleIsValid(heapTuple))
358 	{
359 		CatalogTupleDelete(columnarOptions, &heapTuple->t_self);
360 		CommandCounterIncrement();
361 
362 		result = true;
363 	}
364 	else if (!missingOk)
365 	{
366 		ereport(ERROR, (errmsg("missing options for regclass: %d", regclass)));
367 	}
368 
369 	systable_endscan_ordered(scanDescriptor);
370 	index_close(index, AccessShareLock);
371 	relation_close(columnarOptions, RowExclusiveLock);
372 
373 	return result;
374 }
375 
376 
377 bool
ReadColumnarOptions(Oid regclass,ColumnarOptions * options)378 ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
379 {
380 	ScanKeyData scanKey[1];
381 
382 	ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
383 				F_OIDEQ,
384 				ObjectIdGetDatum(regclass));
385 
386 	Oid columnarOptionsOid = ColumnarOptionsRelationId();
387 	Relation columnarOptions = try_relation_open(columnarOptionsOid, AccessShareLock);
388 	if (columnarOptions == NULL)
389 	{
390 		/*
391 		 * Extension has been dropped. This can be called while
392 		 * dropping extension or database via ObjectAccess().
393 		 */
394 		return false;
395 	}
396 
397 	Relation index = try_relation_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
398 	if (index == NULL)
399 	{
400 		table_close(columnarOptions, AccessShareLock);
401 
402 		/* extension has been dropped */
403 		return false;
404 	}
405 
406 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
407 															1, scanKey);
408 
409 	HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
410 	if (HeapTupleIsValid(heapTuple))
411 	{
412 		Form_columnar_options tupOptions = (Form_columnar_options) GETSTRUCT(heapTuple);
413 
414 		options->chunkRowCount = tupOptions->chunk_group_row_limit;
415 		options->stripeRowCount = tupOptions->stripe_row_limit;
416 		options->compressionLevel = tupOptions->compressionLevel;
417 		options->compressionType = ParseCompressionType(NameStr(tupOptions->compression));
418 	}
419 	else
420 	{
421 		/* populate options with system defaults */
422 		options->compressionType = columnar_compression;
423 		options->stripeRowCount = columnar_stripe_row_limit;
424 		options->chunkRowCount = columnar_chunk_group_row_limit;
425 		options->compressionLevel = columnar_compression_level;
426 	}
427 
428 	systable_endscan_ordered(scanDescriptor);
429 	index_close(index, AccessShareLock);
430 	relation_close(columnarOptions, AccessShareLock);
431 
432 	return true;
433 }
434 
435 
436 /*
437  * SaveStripeSkipList saves chunkList for a given stripe as rows
438  * of columnar.chunk.
439  */
440 void
SaveStripeSkipList(RelFileNode relfilenode,uint64 stripe,StripeSkipList * chunkList,TupleDesc tupleDescriptor)441 SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunkList,
442 				   TupleDesc tupleDescriptor)
443 {
444 	uint32 columnIndex = 0;
445 	uint32 chunkIndex = 0;
446 	uint32 columnCount = chunkList->columnCount;
447 
448 	uint64 storageId = LookupStorageId(relfilenode);
449 	Oid columnarChunkOid = ColumnarChunkRelationId();
450 	Relation columnarChunk = table_open(columnarChunkOid, RowExclusiveLock);
451 	ModifyState *modifyState = StartModifyRelation(columnarChunk);
452 
453 	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
454 	{
455 		for (chunkIndex = 0; chunkIndex < chunkList->chunkCount; chunkIndex++)
456 		{
457 			ColumnChunkSkipNode *chunk =
458 				&chunkList->chunkSkipNodeArray[columnIndex][chunkIndex];
459 
460 			Datum values[Natts_columnar_chunk] = {
461 				UInt64GetDatum(storageId),
462 				Int64GetDatum(stripe),
463 				Int32GetDatum(columnIndex + 1),
464 				Int32GetDatum(chunkIndex),
465 				0, /* to be filled below */
466 				0, /* to be filled below */
467 				Int64GetDatum(chunk->valueChunkOffset),
468 				Int64GetDatum(chunk->valueLength),
469 				Int64GetDatum(chunk->existsChunkOffset),
470 				Int64GetDatum(chunk->existsLength),
471 				Int32GetDatum(chunk->valueCompressionType),
472 				Int32GetDatum(chunk->valueCompressionLevel),
473 				Int64GetDatum(chunk->decompressedValueSize),
474 				Int64GetDatum(chunk->rowCount)
475 			};
476 
477 			bool nulls[Natts_columnar_chunk] = { false };
478 
479 			if (chunk->hasMinMax)
480 			{
481 				values[Anum_columnar_chunk_minimum_value - 1] =
482 					PointerGetDatum(DatumToBytea(chunk->minimumValue,
483 												 &tupleDescriptor->attrs[columnIndex]));
484 				values[Anum_columnar_chunk_maximum_value - 1] =
485 					PointerGetDatum(DatumToBytea(chunk->maximumValue,
486 												 &tupleDescriptor->attrs[columnIndex]));
487 			}
488 			else
489 			{
490 				nulls[Anum_columnar_chunk_minimum_value - 1] = true;
491 				nulls[Anum_columnar_chunk_maximum_value - 1] = true;
492 			}
493 
494 			InsertTupleAndEnforceConstraints(modifyState, values, nulls);
495 		}
496 	}
497 
498 	FinishModifyRelation(modifyState);
499 	table_close(columnarChunk, RowExclusiveLock);
500 }
501 
502 
503 /*
504  * SaveChunkGroups saves the metadata for given chunk groups in columnar.chunk_group.
505  */
506 void
SaveChunkGroups(RelFileNode relfilenode,uint64 stripe,List * chunkGroupRowCounts)507 SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
508 				List *chunkGroupRowCounts)
509 {
510 	uint64 storageId = LookupStorageId(relfilenode);
511 	Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
512 	Relation columnarChunkGroup = table_open(columnarChunkGroupOid, RowExclusiveLock);
513 	ModifyState *modifyState = StartModifyRelation(columnarChunkGroup);
514 
515 	ListCell *lc = NULL;
516 	int chunkId = 0;
517 
518 	foreach(lc, chunkGroupRowCounts)
519 	{
520 		int64 rowCount = lfirst_int(lc);
521 		Datum values[Natts_columnar_chunkgroup] = {
522 			UInt64GetDatum(storageId),
523 			Int64GetDatum(stripe),
524 			Int32GetDatum(chunkId),
525 			Int64GetDatum(rowCount)
526 		};
527 
528 		bool nulls[Natts_columnar_chunkgroup] = { false };
529 
530 		InsertTupleAndEnforceConstraints(modifyState, values, nulls);
531 		chunkId++;
532 	}
533 
534 	FinishModifyRelation(modifyState);
535 	table_close(columnarChunkGroup, NoLock);
536 }
537 
538 
539 /*
540  * ReadStripeSkipList fetches chunk metadata for a given stripe.
541  */
542 StripeSkipList *
ReadStripeSkipList(RelFileNode relfilenode,uint64 stripe,TupleDesc tupleDescriptor,uint32 chunkCount,Snapshot snapshot)543 ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
544 				   uint32 chunkCount, Snapshot snapshot)
545 {
546 	int32 columnIndex = 0;
547 	HeapTuple heapTuple = NULL;
548 	uint32 columnCount = tupleDescriptor->natts;
549 	ScanKeyData scanKey[2];
550 
551 	uint64 storageId = LookupStorageId(relfilenode);
552 
553 	Oid columnarChunkOid = ColumnarChunkRelationId();
554 	Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock);
555 	Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);
556 
557 	ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid,
558 				BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
559 	ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
560 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
561 
562 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
563 															snapshot, 2, scanKey);
564 
565 	StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList));
566 	chunkList->chunkCount = chunkCount;
567 	chunkList->columnCount = columnCount;
568 	chunkList->chunkSkipNodeArray = palloc0(columnCount * sizeof(ColumnChunkSkipNode *));
569 	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
570 	{
571 		chunkList->chunkSkipNodeArray[columnIndex] =
572 			palloc0(chunkCount * sizeof(ColumnChunkSkipNode));
573 	}
574 
575 	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
576 																 ForwardScanDirection)))
577 	{
578 		Datum datumArray[Natts_columnar_chunk];
579 		bool isNullArray[Natts_columnar_chunk];
580 
581 		heap_deform_tuple(heapTuple, RelationGetDescr(columnarChunk), datumArray,
582 						  isNullArray);
583 
584 		int32 attr = DatumGetInt32(datumArray[Anum_columnar_chunk_attr - 1]);
585 		int32 chunkIndex = DatumGetInt32(datumArray[Anum_columnar_chunk_chunk - 1]);
586 
587 		if (attr <= 0 || attr > columnCount)
588 		{
589 			ereport(ERROR, (errmsg("invalid columnar chunk entry"),
590 							errdetail("Attribute number out of range: %d", attr)));
591 		}
592 
593 		if (chunkIndex < 0 || chunkIndex >= chunkCount)
594 		{
595 			ereport(ERROR, (errmsg("invalid columnar chunk entry"),
596 							errdetail("Chunk number out of range: %d", chunkIndex)));
597 		}
598 
599 		columnIndex = attr - 1;
600 
601 		ColumnChunkSkipNode *chunk =
602 			&chunkList->chunkSkipNodeArray[columnIndex][chunkIndex];
603 		chunk->rowCount = DatumGetInt64(datumArray[Anum_columnar_chunk_value_count -
604 												   1]);
605 		chunk->valueChunkOffset =
606 			DatumGetInt64(datumArray[Anum_columnar_chunk_value_stream_offset - 1]);
607 		chunk->valueLength =
608 			DatumGetInt64(datumArray[Anum_columnar_chunk_value_stream_length - 1]);
609 		chunk->existsChunkOffset =
610 			DatumGetInt64(datumArray[Anum_columnar_chunk_exists_stream_offset - 1]);
611 		chunk->existsLength =
612 			DatumGetInt64(datumArray[Anum_columnar_chunk_exists_stream_length - 1]);
613 		chunk->valueCompressionType =
614 			DatumGetInt32(datumArray[Anum_columnar_chunk_value_compression_type - 1]);
615 		chunk->valueCompressionLevel =
616 			DatumGetInt32(datumArray[Anum_columnar_chunk_value_compression_level - 1]);
617 		chunk->decompressedValueSize =
618 			DatumGetInt64(datumArray[Anum_columnar_chunk_value_decompressed_size - 1]);
619 
620 		if (isNullArray[Anum_columnar_chunk_minimum_value - 1] ||
621 			isNullArray[Anum_columnar_chunk_maximum_value - 1])
622 		{
623 			chunk->hasMinMax = false;
624 		}
625 		else
626 		{
627 			bytea *minValue = DatumGetByteaP(
628 				datumArray[Anum_columnar_chunk_minimum_value - 1]);
629 			bytea *maxValue = DatumGetByteaP(
630 				datumArray[Anum_columnar_chunk_maximum_value - 1]);
631 
632 			chunk->minimumValue =
633 				ByteaToDatum(minValue, &tupleDescriptor->attrs[columnIndex]);
634 			chunk->maximumValue =
635 				ByteaToDatum(maxValue, &tupleDescriptor->attrs[columnIndex]);
636 
637 			chunk->hasMinMax = true;
638 		}
639 	}
640 
641 	systable_endscan_ordered(scanDescriptor);
642 	index_close(index, AccessShareLock);
643 	table_close(columnarChunk, AccessShareLock);
644 
645 	chunkList->chunkGroupRowCounts =
646 		ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot);
647 
648 	return chunkList;
649 }
650 
651 
652 /*
653  * FindStripeByRowNumber returns StripeMetadata for the stripe whose
654  * firstRowNumber is greater than given rowNumber. If no such stripe
655  * exists, then returns NULL.
656  */
657 StripeMetadata *
FindNextStripeByRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)658 FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
659 {
660 	return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot, FIND_GREATER);
661 }
662 
663 
664 /*
665  * FindStripeByRowNumber returns StripeMetadata for the stripe that contains
666  * the row with rowNumber. If no such stripe exists, then returns NULL.
667  */
668 StripeMetadata *
FindStripeByRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)669 FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
670 {
671 	StripeMetadata *stripeMetadata =
672 		FindStripeWithMatchingFirstRowNumber(relation, rowNumber, snapshot);
673 	if (!stripeMetadata)
674 	{
675 		return NULL;
676 	}
677 
678 	if (rowNumber > StripeGetHighestRowNumber(stripeMetadata))
679 	{
680 		return NULL;
681 	}
682 
683 	return stripeMetadata;
684 }
685 
686 
687 /*
688  * FindStripeWithMatchingFirstRowNumber returns a StripeMetadata object for
689  * the stripe that has the greatest firstRowNumber among the stripes whose
690  * firstRowNumber is smaller than or equal to given rowNumber. If no such
691  * stripe exists, then returns NULL.
692  *
693  * Note that this doesn't mean that found stripe certainly contains the tuple
694  * with given rowNumber. This is because, it also needs to be verified if
695  * highest row number that found stripe contains is greater than or equal to
696  * given rowNumber. For this reason, unless that additional check is done,
697  * this function is mostly useful for checking against "possible" constraint
698  * violations due to concurrent writes that are not flushed by other backends
699  * yet.
700  */
701 StripeMetadata *
FindStripeWithMatchingFirstRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)702 FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
703 									 Snapshot snapshot)
704 {
705 	return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot,
706 										 FIND_LESS_OR_EQUAL);
707 }
708 
709 
710 /*
711  * StripeWriteState returns write state of given stripe.
712  */
713 StripeWriteStateEnum
StripeWriteState(StripeMetadata * stripeMetadata)714 StripeWriteState(StripeMetadata *stripeMetadata)
715 {
716 	if (stripeMetadata->aborted)
717 	{
718 		return STRIPE_WRITE_ABORTED;
719 	}
720 	else if (stripeMetadata->rowCount > 0)
721 	{
722 		return STRIPE_WRITE_FLUSHED;
723 	}
724 	else
725 	{
726 		return STRIPE_WRITE_IN_PROGRESS;
727 	}
728 }
729 
730 
731 /*
732  * StripeGetHighestRowNumber returns rowNumber of the row with highest
733  * rowNumber in given stripe.
734  */
735 uint64
StripeGetHighestRowNumber(StripeMetadata * stripeMetadata)736 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata)
737 {
738 	return stripeMetadata->firstRowNumber + stripeMetadata->rowCount - 1;
739 }
740 
741 
742 /*
743  * StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
744  * firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL), or is
745  * greater than (FIND_GREATER) given rowNumber by doing backward index
746  * scan on stripe_first_row_number_idx.
747  * If no such stripe exists, then returns NULL.
748  */
749 static StripeMetadata *
StripeMetadataLookupRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot,RowNumberLookupMode lookupMode)750 StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
751 							  RowNumberLookupMode lookupMode)
752 {
753 	Assert(lookupMode == FIND_LESS_OR_EQUAL || lookupMode == FIND_GREATER);
754 
755 	StripeMetadata *foundStripeMetadata = NULL;
756 
757 	uint64 storageId = ColumnarStorageGetStorageId(relation, false);
758 	ScanKeyData scanKey[2];
759 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
760 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
761 
762 	StrategyNumber strategyNumber = InvalidStrategy;
763 	RegProcedure procedure = InvalidOid;
764 	if (lookupMode == FIND_LESS_OR_EQUAL)
765 	{
766 		strategyNumber = BTLessEqualStrategyNumber;
767 		procedure = F_INT8LE;
768 	}
769 	else if (lookupMode == FIND_GREATER)
770 	{
771 		strategyNumber = BTGreaterStrategyNumber;
772 		procedure = F_INT8GT;
773 	}
774 	ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
775 				strategyNumber, procedure, UInt64GetDatum(rowNumber));
776 
777 
778 	Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
779 	Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
780 								AccessShareLock);
781 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
782 															snapshot, 2,
783 															scanKey);
784 
785 	ScanDirection scanDirection = NoMovementScanDirection;
786 	if (lookupMode == FIND_LESS_OR_EQUAL)
787 	{
788 		scanDirection = BackwardScanDirection;
789 	}
790 	else if (lookupMode == FIND_GREATER)
791 	{
792 		scanDirection = ForwardScanDirection;
793 	}
794 	HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
795 	if (HeapTupleIsValid(heapTuple))
796 	{
797 		foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
798 	}
799 
800 	systable_endscan_ordered(scanDescriptor);
801 	index_close(index, AccessShareLock);
802 	table_close(columnarStripes, AccessShareLock);
803 
804 	return foundStripeMetadata;
805 }
806 
807 
808 /*
809  * CheckStripeMetadataConsistency first decides if stripe write operation for
810  * given stripe is "flushed", "aborted" or "in-progress", then errors out if
811  * its metadata entry contradicts with this fact.
812  *
813  * Checks performed here are just to catch bugs, so it is encouraged to call
814  * this function whenever a StripeMetadata object is built from an heap tuple
815  * of columnar.stripe. Currently, BuildStripeMetadata is the only function
816  * that does this.
817  */
818 static void
CheckStripeMetadataConsistency(StripeMetadata * stripeMetadata)819 CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata)
820 {
821 	bool stripeLooksInProgress =
822 		stripeMetadata->rowCount == 0 && stripeMetadata->chunkCount == 0 &&
823 		stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
824 		stripeMetadata->dataLength == 0;
825 
826 	/*
827 	 * Even if stripe is flushed, fileOffset and dataLength might be equal
828 	 * to 0 for zero column tables, but those two should still be consistent
829 	 * with respect to each other.
830 	 */
831 	bool stripeLooksFlushed =
832 		stripeMetadata->rowCount > 0 && stripeMetadata->chunkCount > 0 &&
833 		((stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset &&
834 		  stripeMetadata->dataLength > 0) ||
835 		 (stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
836 		  stripeMetadata->dataLength == 0));
837 
838 	StripeWriteStateEnum stripeWriteState = StripeWriteState(stripeMetadata);
839 	if (stripeWriteState == STRIPE_WRITE_FLUSHED && stripeLooksFlushed)
840 	{
841 		/*
842 		 * If stripe was flushed to disk, then we expect stripe to store
843 		 * at least one tuple.
844 		 */
845 		return;
846 	}
847 	else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS && stripeLooksInProgress)
848 	{
849 		/*
850 		 * If stripe was not flushed to disk, then values of given four
851 		 * fields should match the columns inserted by
852 		 * InsertEmptyStripeMetadataRow.
853 		 */
854 		return;
855 	}
856 	else if (stripeWriteState == STRIPE_WRITE_ABORTED && (stripeLooksInProgress ||
857 														  stripeLooksFlushed))
858 	{
859 		/*
860 		 * Stripe metadata entry for an aborted write can be complete or
861 		 * incomplete. We might have aborted the transaction before or after
862 		 * inserting into stripe metadata.
863 		 */
864 		return;
865 	}
866 
867 	ereport(ERROR, (errmsg("unexpected stripe state, stripe metadata "
868 						   "entry for stripe with id=" UINT64_FORMAT
869 						   " is not consistent", stripeMetadata->id)));
870 }
871 
872 
873 /*
874  * FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that
875  * has the row with highest rowNumber by doing backward index scan on
876  * stripe_first_row_number_idx. If given relation is empty, then returns NULL.
877  */
878 StripeMetadata *
FindStripeWithHighestRowNumber(Relation relation,Snapshot snapshot)879 FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
880 {
881 	StripeMetadata *stripeWithHighestRowNumber = NULL;
882 
883 	uint64 storageId = ColumnarStorageGetStorageId(relation, false);
884 	ScanKeyData scanKey[1];
885 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
886 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
887 
888 	Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
889 	Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
890 								AccessShareLock);
891 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
892 															snapshot, 1, scanKey);
893 
894 	HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
895 	if (HeapTupleIsValid(heapTuple))
896 	{
897 		stripeWithHighestRowNumber = BuildStripeMetadata(columnarStripes, heapTuple);
898 	}
899 
900 	systable_endscan_ordered(scanDescriptor);
901 	index_close(index, AccessShareLock);
902 	table_close(columnarStripes, AccessShareLock);
903 
904 	return stripeWithHighestRowNumber;
905 }
906 
907 
908 /*
909  * ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
910  * given stripe.
911  */
912 static uint32 *
ReadChunkGroupRowCounts(uint64 storageId,uint64 stripe,uint32 chunkGroupCount,Snapshot snapshot)913 ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
914 						Snapshot snapshot)
915 {
916 	Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
917 	Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
918 	Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock);
919 
920 	ScanKeyData scanKey[2];
921 	ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid,
922 				BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
923 	ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe,
924 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
925 
926 	SysScanDesc scanDescriptor =
927 		systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey);
928 
929 	uint32 chunkGroupIndex = 0;
930 	HeapTuple heapTuple = NULL;
931 	uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));
932 
933 	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
934 																 ForwardScanDirection)))
935 	{
936 		Datum datumArray[Natts_columnar_chunkgroup];
937 		bool isNullArray[Natts_columnar_chunkgroup];
938 
939 		heap_deform_tuple(heapTuple,
940 						  RelationGetDescr(columnarChunkGroup),
941 						  datumArray, isNullArray);
942 
943 		uint32 tupleChunkGroupIndex =
944 			DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]);
945 		if (chunkGroupIndex >= chunkGroupCount ||
946 			tupleChunkGroupIndex != chunkGroupIndex)
947 		{
948 			elog(ERROR, "unexpected chunk group");
949 		}
950 
951 		chunkGroupRowCounts[chunkGroupIndex] =
952 			(uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]);
953 		chunkGroupIndex++;
954 	}
955 
956 	if (chunkGroupIndex != chunkGroupCount)
957 	{
958 		elog(ERROR, "unexpected chunk group count");
959 	}
960 
961 	systable_endscan_ordered(scanDescriptor);
962 	index_close(index, AccessShareLock);
963 	table_close(columnarChunkGroup, AccessShareLock);
964 
965 	return chunkGroupRowCounts;
966 }
967 
968 
969 /*
970  * InsertEmptyStripeMetadataRow adds a row to columnar.stripe for the empty
971  * stripe reservation made for stripeId.
972  */
973 static void
InsertEmptyStripeMetadataRow(uint64 storageId,uint64 stripeId,uint32 columnCount,uint32 chunkGroupRowCount,uint64 firstRowNumber)974 InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount,
975 							 uint32 chunkGroupRowCount, uint64 firstRowNumber)
976 {
977 	bool nulls[Natts_columnar_stripe] = { false };
978 
979 	Datum values[Natts_columnar_stripe] = { 0 };
980 	values[Anum_columnar_stripe_storageid - 1] =
981 		UInt64GetDatum(storageId);
982 	values[Anum_columnar_stripe_stripe - 1] =
983 		UInt64GetDatum(stripeId);
984 	values[Anum_columnar_stripe_column_count - 1] =
985 		UInt32GetDatum(columnCount);
986 	values[Anum_columnar_stripe_chunk_row_count - 1] =
987 		UInt32GetDatum(chunkGroupRowCount);
988 	values[Anum_columnar_stripe_first_row_number - 1] =
989 		UInt64GetDatum(firstRowNumber);
990 
991 	/* stripe has no rows yet, so initialize rest of the columns accordingly */
992 	values[Anum_columnar_stripe_row_count - 1] =
993 		UInt64GetDatum(0);
994 	values[Anum_columnar_stripe_file_offset - 1] =
995 		UInt64GetDatum(ColumnarInvalidLogicalOffset);
996 	values[Anum_columnar_stripe_data_length - 1] =
997 		UInt64GetDatum(0);
998 	values[Anum_columnar_stripe_chunk_count - 1] =
999 		UInt32GetDatum(0);
1000 
1001 	Oid columnarStripesOid = ColumnarStripeRelationId();
1002 	Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock);
1003 
1004 	ModifyState *modifyState = StartModifyRelation(columnarStripes);
1005 
1006 	InsertTupleAndEnforceConstraints(modifyState, values, nulls);
1007 
1008 	FinishModifyRelation(modifyState);
1009 
1010 	table_close(columnarStripes, RowExclusiveLock);
1011 }
1012 
1013 
1014 /*
1015  * StripesForRelfilenode returns a list of StripeMetadata for stripes
1016  * of the given relfilenode.
1017  */
1018 List *
StripesForRelfilenode(RelFileNode relfilenode)1019 StripesForRelfilenode(RelFileNode relfilenode)
1020 {
1021 	uint64 storageId = LookupStorageId(relfilenode);
1022 
1023 	return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
1024 }
1025 
1026 
1027 /*
1028  * GetHighestUsedAddress returns the highest used address for the given
1029  * relfilenode across all active and inactive transactions.
1030  *
1031  * This is used by truncate stage of VACUUM, and VACUUM can be called
1032  * for empty tables. So this doesn't throw errors for empty tables and
1033  * returns 0.
1034  */
1035 uint64
GetHighestUsedAddress(RelFileNode relfilenode)1036 GetHighestUsedAddress(RelFileNode relfilenode)
1037 {
1038 	uint64 storageId = LookupStorageId(relfilenode);
1039 
1040 	uint64 highestUsedAddress = 0;
1041 	uint64 highestUsedId = 0;
1042 	GetHighestUsedAddressAndId(storageId, &highestUsedAddress, &highestUsedId);
1043 
1044 	return highestUsedAddress;
1045 }
1046 
1047 
1048 /*
1049  * GetHighestUsedAddressAndId returns the highest used address and id for
1050  * the given relfilenode across all active and inactive transactions.
1051  */
1052 static void
GetHighestUsedAddressAndId(uint64 storageId,uint64 * highestUsedAddress,uint64 * highestUsedId)1053 GetHighestUsedAddressAndId(uint64 storageId,
1054 						   uint64 *highestUsedAddress,
1055 						   uint64 *highestUsedId)
1056 {
1057 	ListCell *stripeMetadataCell = NULL;
1058 
1059 	SnapshotData SnapshotDirty;
1060 	InitDirtySnapshot(SnapshotDirty);
1061 
1062 	List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty);
1063 
1064 	*highestUsedId = 0;
1065 
1066 	/* file starts with metapage */
1067 	*highestUsedAddress = COLUMNAR_BYTES_PER_PAGE;
1068 
1069 	foreach(stripeMetadataCell, stripeMetadataList)
1070 	{
1071 		StripeMetadata *stripe = lfirst(stripeMetadataCell);
1072 		uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
1073 		*highestUsedAddress = Max(*highestUsedAddress, lastByte);
1074 		*highestUsedId = Max(*highestUsedId, stripe->id);
1075 	}
1076 }
1077 
1078 
1079 /*
1080  * ReserveEmptyStripe reserves an empty stripe for given relation
1081  * and inserts it into columnar.stripe. It is guaranteed that concurrent
1082  * writes won't overwrite the returned stripe.
1083  */
1084 EmptyStripeReservation *
ReserveEmptyStripe(Relation rel,uint64 columnCount,uint64 chunkGroupRowCount,uint64 stripeRowCount)1085 ReserveEmptyStripe(Relation rel, uint64 columnCount, uint64 chunkGroupRowCount,
1086 				   uint64 stripeRowCount)
1087 {
1088 	EmptyStripeReservation *stripeReservation = palloc0(sizeof(EmptyStripeReservation));
1089 
1090 	uint64 storageId = ColumnarStorageGetStorageId(rel, false);
1091 
1092 	stripeReservation->stripeId = ColumnarStorageReserveStripeId(rel);
1093 	stripeReservation->stripeFirstRowNumber =
1094 		ColumnarStorageReserveRowNumber(rel, stripeRowCount);
1095 
1096 	/*
1097 	 * XXX: Instead of inserting a dummy entry to columnar.stripe and
1098 	 * updating it when flushing the stripe, we could have a hash table
1099 	 * in shared memory for the bookkeeping of ongoing writes.
1100 	 */
1101 	InsertEmptyStripeMetadataRow(storageId, stripeReservation->stripeId,
1102 								 columnCount, chunkGroupRowCount,
1103 								 stripeReservation->stripeFirstRowNumber);
1104 
1105 	return stripeReservation;
1106 }
1107 
1108 
1109 /*
1110  * CompleteStripeReservation completes reservation of the stripe with
1111  * stripeId for given size and in-place updates related stripe metadata tuple
1112  * to complete reservation.
1113  */
1114 StripeMetadata *
CompleteStripeReservation(Relation rel,uint64 stripeId,uint64 sizeBytes,uint64 rowCount,uint64 chunkCount)1115 CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes,
1116 						  uint64 rowCount, uint64 chunkCount)
1117 {
1118 	uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
1119 	uint64 storageId = ColumnarStorageGetStorageId(rel, false);
1120 
1121 	bool update[Natts_columnar_stripe] = { false };
1122 	update[Anum_columnar_stripe_file_offset - 1] = true;
1123 	update[Anum_columnar_stripe_data_length - 1] = true;
1124 	update[Anum_columnar_stripe_row_count - 1] = true;
1125 	update[Anum_columnar_stripe_chunk_count - 1] = true;
1126 
1127 	Datum newValues[Natts_columnar_stripe] = { 0 };
1128 	newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(resLogicalStart);
1129 	newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(sizeBytes);
1130 	newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
1131 	newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
1132 
1133 	return UpdateStripeMetadataRow(storageId, stripeId, update, newValues);
1134 }
1135 
1136 
1137 /*
1138  * UpdateStripeMetadataRow updates stripe metadata tuple for the stripe with
1139  * stripeId according to given newValues and update arrays.
1140  * Note that this function shouldn't be used for the cases where any indexes
1141  * of stripe metadata should be updated according to modifications done.
1142  */
1143 static StripeMetadata *
UpdateStripeMetadataRow(uint64 storageId,uint64 stripeId,bool * update,Datum * newValues)1144 UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
1145 						Datum *newValues)
1146 {
1147 	SnapshotData dirtySnapshot;
1148 	InitDirtySnapshot(dirtySnapshot);
1149 
1150 	ScanKeyData scanKey[2];
1151 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
1152 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
1153 	ScanKeyInit(&scanKey[1], Anum_columnar_stripe_stripe,
1154 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripeId));
1155 
1156 	Oid columnarStripesOid = ColumnarStripeRelationId();
1157 
1158 	Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
1159 	Relation columnarStripePkeyIndex = index_open(ColumnarStripePKeyIndexRelationId(),
1160 												  AccessShareLock);
1161 
1162 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes,
1163 															columnarStripePkeyIndex,
1164 															&dirtySnapshot, 2, scanKey);
1165 
1166 	HeapTuple oldTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
1167 	if (!HeapTupleIsValid(oldTuple))
1168 	{
1169 		ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
1170 							   "columnar storage with id=" UINT64_FORMAT
1171 							   " does not have stripe with id=" UINT64_FORMAT,
1172 							   storageId, stripeId)));
1173 	}
1174 
1175 	/*
1176 	 * heap_inplace_update already doesn't allow changing size of the original
1177 	 * tuple, so we don't allow setting any Datum's to NULL values.
1178 	 */
1179 	bool newNulls[Natts_columnar_stripe] = { false };
1180 	TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
1181 	HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, tupleDescriptor,
1182 												newValues, newNulls, update);
1183 
1184 	heap_inplace_update(columnarStripes, modifiedTuple);
1185 
1186 	/*
1187 	 * Existing tuple now contains modifications, because we used
1188 	 * heap_inplace_update().
1189 	 */
1190 	HeapTuple newTuple = oldTuple;
1191 
1192 	/*
1193 	 * Must not pass modifiedTuple, because BuildStripeMetadata expects a real
1194 	 * heap tuple with MVCC fields.
1195 	 */
1196 	StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
1197 																 newTuple);
1198 
1199 	CommandCounterIncrement();
1200 
1201 	systable_endscan_ordered(scanDescriptor);
1202 	index_close(columnarStripePkeyIndex, AccessShareLock);
1203 	table_close(columnarStripes, AccessShareLock);
1204 
1205 	/* return StripeMetadata object built from modified tuple */
1206 	return modifiedStripeMetadata;
1207 }
1208 
1209 
1210 /*
1211  * ReadDataFileStripeList reads the stripe list for a given storageId
1212  * in the given snapshot.
1213  */
1214 static List *
ReadDataFileStripeList(uint64 storageId,Snapshot snapshot)1215 ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
1216 {
1217 	List *stripeMetadataList = NIL;
1218 	ScanKeyData scanKey[1];
1219 	HeapTuple heapTuple;
1220 
1221 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
1222 				BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
1223 
1224 	Oid columnarStripesOid = ColumnarStripeRelationId();
1225 
1226 	Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
1227 	Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
1228 								AccessShareLock);
1229 
1230 	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
1231 															snapshot, 1,
1232 															scanKey);
1233 
1234 	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
1235 																 ForwardScanDirection)))
1236 	{
1237 		StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
1238 		stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
1239 	}
1240 
1241 	systable_endscan_ordered(scanDescriptor);
1242 	index_close(index, AccessShareLock);
1243 	table_close(columnarStripes, AccessShareLock);
1244 
1245 	return stripeMetadataList;
1246 }
1247 
1248 
1249 /*
1250  * BuildStripeMetadata builds a StripeMetadata object from given heap tuple.
1251  *
1252  * NB: heapTuple must be a proper heap tuple with MVCC fields.
1253  */
1254 static StripeMetadata *
BuildStripeMetadata(Relation columnarStripes,HeapTuple heapTuple)1255 BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
1256 {
1257 	Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId());
1258 
1259 	Datum datumArray[Natts_columnar_stripe];
1260 	bool isNullArray[Natts_columnar_stripe];
1261 	heap_deform_tuple(heapTuple, RelationGetDescr(columnarStripes),
1262 					  datumArray, isNullArray);
1263 
1264 	StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata));
1265 	stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]);
1266 	stripeMetadata->fileOffset = DatumGetInt64(
1267 		datumArray[Anum_columnar_stripe_file_offset - 1]);
1268 	stripeMetadata->dataLength = DatumGetInt64(
1269 		datumArray[Anum_columnar_stripe_data_length - 1]);
1270 	stripeMetadata->columnCount = DatumGetInt32(
1271 		datumArray[Anum_columnar_stripe_column_count - 1]);
1272 	stripeMetadata->chunkCount = DatumGetInt32(
1273 		datumArray[Anum_columnar_stripe_chunk_count - 1]);
1274 	stripeMetadata->chunkGroupRowCount = DatumGetInt32(
1275 		datumArray[Anum_columnar_stripe_chunk_row_count - 1]);
1276 	stripeMetadata->rowCount = DatumGetInt64(
1277 		datumArray[Anum_columnar_stripe_row_count - 1]);
1278 	stripeMetadata->firstRowNumber = DatumGetUInt64(
1279 		datumArray[Anum_columnar_stripe_first_row_number - 1]);
1280 
1281 	/*
1282 	 * If there is unflushed data in a parent transaction, then we would
1283 	 * have already thrown an error before starting to scan the table.. If
1284 	 * the data is from an earlier subxact that committed, then it would
1285 	 * have been flushed already. For this reason, we don't care about
1286 	 * subtransaction id here.
1287 	 */
1288 	TransactionId entryXmin = HeapTupleHeaderGetXmin(heapTuple->t_data);
1289 	stripeMetadata->aborted = !TransactionIdIsInProgress(entryXmin) &&
1290 							  TransactionIdDidAbort(entryXmin);
1291 	stripeMetadata->insertedByCurrentXact =
1292 		TransactionIdIsCurrentTransactionId(entryXmin);
1293 
1294 	CheckStripeMetadataConsistency(stripeMetadata);
1295 
1296 	return stripeMetadata;
1297 }
1298 
1299 
1300 /*
1301  * DeleteMetadataRows removes the rows with given relfilenode from columnar
1302  * metadata tables.
1303  */
1304 void
DeleteMetadataRows(RelFileNode relfilenode)1305 DeleteMetadataRows(RelFileNode relfilenode)
1306 {
1307 	/*
1308 	 * During a restore for binary upgrade, metadata tables and indexes may or
1309 	 * may not exist.
1310 	 */
1311 	if (IsBinaryUpgrade)
1312 	{
1313 		return;
1314 	}
1315 
1316 	uint64 storageId = LookupStorageId(relfilenode);
1317 
1318 	DeleteStorageFromColumnarMetadataTable(ColumnarStripeRelationId(),
1319 										   Anum_columnar_stripe_storageid,
1320 										   ColumnarStripePKeyIndexRelationId(),
1321 										   storageId);
1322 	DeleteStorageFromColumnarMetadataTable(ColumnarChunkGroupRelationId(),
1323 										   Anum_columnar_chunkgroup_storageid,
1324 										   ColumnarChunkGroupIndexRelationId(),
1325 										   storageId);
1326 	DeleteStorageFromColumnarMetadataTable(ColumnarChunkRelationId(),
1327 										   Anum_columnar_chunk_storageid,
1328 										   ColumnarChunkIndexRelationId(),
1329 										   storageId);
1330 }
1331 
1332 
1333 /*
1334  * DeleteStorageFromColumnarMetadataTable removes the rows with given
1335  * storageId from given columnar metadata table.
1336  */
1337 static void
DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,AttrNumber storageIdAtrrNumber,Oid storageIdIndexId,uint64 storageId)1338 DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
1339 									   AttrNumber storageIdAtrrNumber,
1340 									   Oid storageIdIndexId, uint64 storageId)
1341 {
1342 	ScanKeyData scanKey[1];
1343 	ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
1344 				F_INT8EQ, UInt64GetDatum(storageId));
1345 
1346 	Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
1347 	if (metadataTable == NULL)
1348 	{
1349 		/* extension has been dropped */
1350 		return;
1351 	}
1352 
1353 	Relation index = index_open(storageIdIndexId, AccessShareLock);
1354 
1355 	SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
1356 															1, scanKey);
1357 
1358 	ModifyState *modifyState = StartModifyRelation(metadataTable);
1359 
1360 	HeapTuple heapTuple;
1361 	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
1362 																 ForwardScanDirection)))
1363 	{
1364 		DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
1365 	}
1366 
1367 	systable_endscan_ordered(scanDescriptor);
1368 
1369 	FinishModifyRelation(modifyState);
1370 
1371 	index_close(index, AccessShareLock);
1372 	table_close(metadataTable, AccessShareLock);
1373 }
1374 
1375 
1376 /*
1377  * StartModifyRelation allocates resources for modifications.
1378  */
1379 static ModifyState *
StartModifyRelation(Relation rel)1380 StartModifyRelation(Relation rel)
1381 {
1382 	EState *estate = create_estate_for_relation(rel);
1383 
1384 #if PG_VERSION_NUM >= PG_VERSION_14
1385 	ResultRelInfo *resultRelInfo = makeNode(ResultRelInfo);
1386 	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
1387 #else
1388 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
1389 #endif
1390 
1391 	/* ExecSimpleRelationInsert, ... require caller to open indexes */
1392 	ExecOpenIndices(resultRelInfo, false);
1393 
1394 	ModifyState *modifyState = palloc(sizeof(ModifyState));
1395 	modifyState->rel = rel;
1396 	modifyState->estate = estate;
1397 	modifyState->resultRelInfo = resultRelInfo;
1398 
1399 	return modifyState;
1400 }
1401 
1402 
1403 /*
1404  * InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes
1405  * sure constraints are enforced and indexes are updated.
1406  */
1407 static void
InsertTupleAndEnforceConstraints(ModifyState * state,Datum * values,bool * nulls)1408 InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
1409 {
1410 	TupleDesc tupleDescriptor = RelationGetDescr(state->rel);
1411 	HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
1412 
1413 	TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
1414 												  &TTSOpsHeapTuple);
1415 
1416 	ExecStoreHeapTuple(tuple, slot, false);
1417 
1418 	/* use ExecSimpleRelationInsert to enforce constraints */
1419 	ExecSimpleRelationInsert_compat(state->resultRelInfo, state->estate, slot);
1420 }
1421 
1422 
1423 /*
1424  * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and
1425  * makes sure constraints (e.g. FK constraints) are enforced.
1426  */
1427 static void
DeleteTupleAndEnforceConstraints(ModifyState * state,HeapTuple heapTuple)1428 DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
1429 {
1430 	EState *estate = state->estate;
1431 	ResultRelInfo *resultRelInfo = state->resultRelInfo;
1432 
1433 	ItemPointer tid = &(heapTuple->t_self);
1434 	simple_heap_delete(state->rel, tid);
1435 
1436 	/* execute AFTER ROW DELETE Triggers to enforce constraints */
1437 	ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL);
1438 }
1439 
1440 
1441 /*
1442  * FinishModifyRelation cleans up resources after modifications are done.
1443  */
1444 static void
FinishModifyRelation(ModifyState * state)1445 FinishModifyRelation(ModifyState *state)
1446 {
1447 	ExecCloseIndices(state->resultRelInfo);
1448 
1449 	AfterTriggerEndQuery(state->estate);
1450 #if PG_VERSION_NUM >= PG_VERSION_14
1451 	ExecCloseResultRelations(state->estate);
1452 	ExecCloseRangeTableRelations(state->estate);
1453 #else
1454 	ExecCleanUpTriggerState(state->estate);
1455 #endif
1456 	ExecResetTupleTable(state->estate->es_tupleTable, false);
1457 	FreeExecutorState(state->estate);
1458 
1459 	CommandCounterIncrement();
1460 }
1461 
1462 
1463 /*
1464  * Based on a similar function from
1465  * postgres/src/backend/replication/logical/worker.c.
1466  *
1467  * Executor state preparation for evaluation of constraint expressions,
1468  * indexes and triggers.
1469  *
1470  * This is based on similar code in copy.c
1471  */
1472 static EState *
create_estate_for_relation(Relation rel)1473 create_estate_for_relation(Relation rel)
1474 {
1475 	EState *estate = CreateExecutorState();
1476 
1477 	RangeTblEntry *rte = makeNode(RangeTblEntry);
1478 	rte->rtekind = RTE_RELATION;
1479 	rte->relid = RelationGetRelid(rel);
1480 	rte->relkind = rel->rd_rel->relkind;
1481 	rte->rellockmode = AccessShareLock;
1482 	ExecInitRangeTable(estate, list_make1(rte));
1483 
1484 #if PG_VERSION_NUM < PG_VERSION_14
1485 	ResultRelInfo *resultRelInfo = makeNode(ResultRelInfo);
1486 	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
1487 
1488 	estate->es_result_relations = resultRelInfo;
1489 	estate->es_num_result_relations = 1;
1490 	estate->es_result_relation_info = resultRelInfo;
1491 #endif
1492 
1493 	estate->es_output_cid = GetCurrentCommandId(true);
1494 
1495 	/* Prepare to catch AFTER triggers. */
1496 	AfterTriggerBeginQuery();
1497 
1498 	return estate;
1499 }
1500 
1501 
1502 /*
1503  * DatumToBytea serializes a datum into a bytea value.
1504  */
1505 static bytea *
DatumToBytea(Datum value,Form_pg_attribute attrForm)1506 DatumToBytea(Datum value, Form_pg_attribute attrForm)
1507 {
1508 	int datumLength = att_addlength_datum(0, attrForm->attlen, value);
1509 	bytea *result = palloc0(datumLength + VARHDRSZ);
1510 
1511 	SET_VARSIZE(result, datumLength + VARHDRSZ);
1512 
1513 	if (attrForm->attlen > 0)
1514 	{
1515 		if (attrForm->attbyval)
1516 		{
1517 			Datum tmp;
1518 			store_att_byval(&tmp, value, attrForm->attlen);
1519 
1520 			memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1521 					 &tmp, attrForm->attlen);
1522 		}
1523 		else
1524 		{
1525 			memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1526 					 DatumGetPointer(value), attrForm->attlen);
1527 		}
1528 	}
1529 	else
1530 	{
1531 		memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1532 				 DatumGetPointer(value), datumLength);
1533 	}
1534 
1535 	return result;
1536 }
1537 
1538 
1539 /*
1540  * ByteaToDatum deserializes a value which was previously serialized using
1541  * DatumToBytea.
1542  */
1543 static Datum
ByteaToDatum(bytea * bytes,Form_pg_attribute attrForm)1544 ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm)
1545 {
1546 	/*
1547 	 * We copy the data so the result of this function lives even
1548 	 * after the byteaDatum is freed.
1549 	 */
1550 	char *binaryDataCopy = palloc0(VARSIZE_ANY_EXHDR(bytes));
1551 	memcpy_s(binaryDataCopy, VARSIZE_ANY_EXHDR(bytes),
1552 			 VARDATA_ANY(bytes), VARSIZE_ANY_EXHDR(bytes));
1553 
1554 	return fetch_att(binaryDataCopy, attrForm->attbyval, attrForm->attlen);
1555 }
1556 
1557 
1558 /*
1559  * ColumnarStorageIdSequenceRelationId returns relation id of columnar.stripe.
1560  * TODO: should we cache this similar to citus?
1561  */
1562 static Oid
ColumnarStorageIdSequenceRelationId(void)1563 ColumnarStorageIdSequenceRelationId(void)
1564 {
1565 	return get_relname_relid("storageid_seq", ColumnarNamespaceId());
1566 }
1567 
1568 
1569 /*
1570  * ColumnarStripeRelationId returns relation id of columnar.stripe.
1571  * TODO: should we cache this similar to citus?
1572  */
1573 static Oid
ColumnarStripeRelationId(void)1574 ColumnarStripeRelationId(void)
1575 {
1576 	return get_relname_relid("stripe", ColumnarNamespaceId());
1577 }
1578 
1579 
1580 /*
1581  * ColumnarStripePKeyIndexRelationId returns relation id of columnar.stripe_pkey.
1582  * TODO: should we cache this similar to citus?
1583  */
1584 static Oid
ColumnarStripePKeyIndexRelationId(void)1585 ColumnarStripePKeyIndexRelationId(void)
1586 {
1587 	return get_relname_relid("stripe_pkey", ColumnarNamespaceId());
1588 }
1589 
1590 
1591 /*
1592  * ColumnarStripeFirstRowNumberIndexRelationId returns relation id of
1593  * columnar.stripe_first_row_number_idx.
1594  * TODO: should we cache this similar to citus?
1595  */
1596 static Oid
ColumnarStripeFirstRowNumberIndexRelationId(void)1597 ColumnarStripeFirstRowNumberIndexRelationId(void)
1598 {
1599 	return get_relname_relid("stripe_first_row_number_idx", ColumnarNamespaceId());
1600 }
1601 
1602 
1603 /*
1604  * ColumnarOptionsRelationId returns relation id of columnar.options.
1605  */
1606 static Oid
ColumnarOptionsRelationId(void)1607 ColumnarOptionsRelationId(void)
1608 {
1609 	return get_relname_relid("options", ColumnarNamespaceId());
1610 }
1611 
1612 
1613 /*
1614  * ColumnarOptionsIndexRegclass returns relation id of columnar.options_pkey.
1615  */
1616 static Oid
ColumnarOptionsIndexRegclass(void)1617 ColumnarOptionsIndexRegclass(void)
1618 {
1619 	return get_relname_relid("options_pkey", ColumnarNamespaceId());
1620 }
1621 
1622 
1623 /*
1624  * ColumnarChunkRelationId returns relation id of columnar.chunk.
1625  * TODO: should we cache this similar to citus?
1626  */
1627 static Oid
ColumnarChunkRelationId(void)1628 ColumnarChunkRelationId(void)
1629 {
1630 	return get_relname_relid("chunk", ColumnarNamespaceId());
1631 }
1632 
1633 
1634 /*
1635  * ColumnarChunkGroupRelationId returns relation id of columnar.chunk_group.
1636  * TODO: should we cache this similar to citus?
1637  */
1638 static Oid
ColumnarChunkGroupRelationId(void)1639 ColumnarChunkGroupRelationId(void)
1640 {
1641 	return get_relname_relid("chunk_group", ColumnarNamespaceId());
1642 }
1643 
1644 
1645 /*
1646  * ColumnarChunkIndexRelationId returns relation id of columnar.chunk_pkey.
1647  * TODO: should we cache this similar to citus?
1648  */
1649 static Oid
ColumnarChunkIndexRelationId(void)1650 ColumnarChunkIndexRelationId(void)
1651 {
1652 	return get_relname_relid("chunk_pkey", ColumnarNamespaceId());
1653 }
1654 
1655 
1656 /*
1657  * ColumnarChunkGroupIndexRelationId returns relation id of columnar.chunk_group_pkey.
1658  * TODO: should we cache this similar to citus?
1659  */
1660 static Oid
ColumnarChunkGroupIndexRelationId(void)1661 ColumnarChunkGroupIndexRelationId(void)
1662 {
1663 	return get_relname_relid("chunk_group_pkey", ColumnarNamespaceId());
1664 }
1665 
1666 
1667 /*
1668  * ColumnarNamespaceId returns namespace id of the schema we store columnar
1669  * related tables.
1670  */
1671 static Oid
ColumnarNamespaceId(void)1672 ColumnarNamespaceId(void)
1673 {
1674 	return get_namespace_oid("columnar", false);
1675 }
1676 
1677 
1678 /*
1679  * LookupStorageId reads storage metapage to find the storage ID for the given relfilenode. It returns
1680  * false if the relation doesn't have a meta page yet.
1681  */
1682 static uint64
LookupStorageId(RelFileNode relfilenode)1683 LookupStorageId(RelFileNode relfilenode)
1684 {
1685 	Oid relationId = RelidByRelfilenode(relfilenode.spcNode,
1686 										relfilenode.relNode);
1687 
1688 	Relation relation = relation_open(relationId, AccessShareLock);
1689 	uint64 storageId = ColumnarStorageGetStorageId(relation, false);
1690 	table_close(relation, AccessShareLock);
1691 
1692 	return storageId;
1693 }
1694 
1695 
1696 /*
1697  * ColumnarMetadataNewStorageId - create a new, unique storage id and return
1698  * it.
1699  */
1700 uint64
ColumnarMetadataNewStorageId()1701 ColumnarMetadataNewStorageId()
1702 {
1703 	return nextval_internal(ColumnarStorageIdSequenceRelationId(), false);
1704 }
1705 
1706 
1707 /*
1708  * columnar_relation_storageid returns storage id associated with the
1709  * given relation id, or -1 if there is no associated storage id yet.
1710  */
1711 Datum
columnar_relation_storageid(PG_FUNCTION_ARGS)1712 columnar_relation_storageid(PG_FUNCTION_ARGS)
1713 {
1714 	Oid relationId = PG_GETARG_OID(0);
1715 	Relation relation = relation_open(relationId, AccessShareLock);
1716 	if (!IsColumnarTableAmTable(relationId))
1717 	{
1718 		elog(ERROR, "relation \"%s\" is not a columnar table",
1719 			 RelationGetRelationName(relation));
1720 	}
1721 
1722 	uint64 storageId = ColumnarStorageGetStorageId(relation, false);
1723 
1724 	relation_close(relation, AccessShareLock);
1725 
1726 	PG_RETURN_INT64(storageId);
1727 }
1728 
1729 
1730 /*
1731  * ColumnarStorageUpdateIfNeeded - upgrade columnar storage to the current version by
1732  * using information from the metadata tables.
1733  */
1734 void
ColumnarStorageUpdateIfNeeded(Relation rel,bool isUpgrade)1735 ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
1736 {
1737 	if (ColumnarStorageIsCurrent(rel))
1738 	{
1739 		return;
1740 	}
1741 
1742 	RelationOpenSmgr(rel);
1743 	BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
1744 	if (nblocks < 2)
1745 	{
1746 		ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId());
1747 		return;
1748 	}
1749 
1750 	uint64 storageId = ColumnarStorageGetStorageId(rel, true);
1751 
1752 	uint64 highestId;
1753 	uint64 highestOffset;
1754 	GetHighestUsedAddressAndId(storageId, &highestOffset, &highestId);
1755 
1756 	uint64 reservedStripeId = highestId + 1;
1757 	uint64 reservedOffset = highestOffset + 1;
1758 	uint64 reservedRowNumber = GetHighestUsedRowNumber(storageId) + 1;
1759 	ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId,
1760 								 reservedRowNumber, reservedOffset);
1761 }
1762 
1763 
1764 /*
1765  * GetHighestUsedRowNumber returns the highest used rowNumber for given
1766  * storageId. Returns COLUMNAR_INVALID_ROW_NUMBER if storage with
1767  * storageId has no stripes.
1768  * Note that normally we would use ColumnarStorageGetReservedRowNumber
1769  * to decide that. However, this function is designed to be used when
1770  * building the metapage itself during upgrades.
1771  */
1772 static uint64
GetHighestUsedRowNumber(uint64 storageId)1773 GetHighestUsedRowNumber(uint64 storageId)
1774 {
1775 	uint64 highestRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
1776 
1777 	List *stripeMetadataList = ReadDataFileStripeList(storageId,
1778 													  GetTransactionSnapshot());
1779 	StripeMetadata *stripeMetadata = NULL;
1780 	foreach_ptr(stripeMetadata, stripeMetadataList)
1781 	{
1782 		highestRowNumber = Max(highestRowNumber,
1783 							   StripeGetHighestRowNumber(stripeMetadata));
1784 	}
1785 
1786 	return highestRowNumber;
1787 }
1788