1 /*-------------------------------------------------------------------------
2 *
3 * columnar_metadata.c
4 *
5 * Copyright (c) Citus Data, Inc.
6 *
7 * Manages metadata for columnar relations in separate, shared metadata tables
8 * in the "columnar" schema.
9 *
10 * * holds basic stripe information including data size and row counts
11 * * holds basic chunk and chunk group information like data offsets and
12 * min/max values (used for Chunk Group Filtering)
13 * * useful for fast VACUUM operations (e.g. reporting with VACUUM VERBOSE)
14 * * useful for stats/costing
15 * * maps logical row numbers to stripe IDs
16 * * TODO: visibility information
17 *
18 *-------------------------------------------------------------------------
19 */
20
21
22 #include "postgres.h"
23
24 #include "safe_lib.h"
25
26 #include "citus_version.h"
27 #include "columnar/columnar.h"
28 #include "columnar/columnar_storage.h"
29 #include "columnar/columnar_version_compat.h"
30 #include "distributed/listutils.h"
31
32 #include <sys/stat.h>
33 #include "access/heapam.h"
34 #include "access/htup_details.h"
35 #include "access/nbtree.h"
36 #include "access/xact.h"
37 #include "catalog/indexing.h"
38 #include "catalog/pg_namespace.h"
39 #include "catalog/pg_collation.h"
40 #include "catalog/pg_type.h"
41 #include "catalog/namespace.h"
42 #include "commands/defrem.h"
43 #include "commands/sequence.h"
44 #include "commands/trigger.h"
45 #include "distributed/metadata_cache.h"
46 #include "executor/executor.h"
47 #include "executor/spi.h"
48 #include "miscadmin.h"
49 #include "nodes/execnodes.h"
50 #include "lib/stringinfo.h"
51 #include "port.h"
52 #include "storage/fd.h"
53 #include "storage/lmgr.h"
54 #include "storage/procarray.h"
55 #include "storage/smgr.h"
56 #include "utils/builtins.h"
57 #include "utils/fmgroids.h"
58 #include "utils/memutils.h"
59 #include "utils/lsyscache.h"
60 #include "utils/rel.h"
61 #include "utils/relfilenodemap.h"
62
63
/*
 * ModifyState bundles the state created by StartModifyRelation() and then
 * passed to InsertTupleAndEnforceConstraints(),
 * DeleteTupleAndEnforceConstraints() and FinishModifyRelation() when
 * modifying a columnar metadata table: the target relation, an executor
 * state, and the ResultRelInfo for that relation.
 */
typedef struct
{
	Relation rel;
	EState *estate;
	ResultRelInfo *resultRelInfo;
} ModifyState;
70
71 /* RowNumberLookupMode to be used in StripeMetadataLookupRowNumber */
/*
 * RowNumberLookupMode to be used in StripeMetadataLookupRowNumber; selects
 * which side of the given row number the looked-up stripe's firstRowNumber
 * must lie on.
 */
typedef enum RowNumberLookupMode
{
	/*
	 * Find the stripe whose firstRowNumber is less than or equal to given
	 * input rowNumber.
	 */
	FIND_LESS_OR_EQUAL,

	/*
	 * Find the stripe whose firstRowNumber is greater than input rowNumber.
	 */
	FIND_GREATER
} RowNumberLookupMode;
85
/*
 * Forward declarations of the file-local (static) helpers used by the
 * metadata functions in this file.
 */
static void InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId,
										 uint32 columnCount, uint32 chunkGroupRowCount,
										 uint64 firstRowNumber);
static void GetHighestUsedAddressAndId(uint64 storageId,
									   uint64 *highestUsedAddress,
									   uint64 *highestUsedId);
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
												bool *update, Datum *newValues);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
											HeapTuple heapTuple);
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
										chunkGroupCount, Snapshot snapshot);
static Oid ColumnarStorageIdSequenceRelationId(void);
static Oid ColumnarStripeRelationId(void);
static Oid ColumnarStripePKeyIndexRelationId(void);
static Oid ColumnarStripeFirstRowNumberIndexRelationId(void);
static Oid ColumnarOptionsRelationId(void);
static Oid ColumnarOptionsIndexRegclass(void);
static Oid ColumnarChunkRelationId(void);
static Oid ColumnarChunkGroupRelationId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarChunkGroupIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static uint64 LookupStorageId(RelFileNode relfilenode);
static uint64 GetHighestUsedRowNumber(uint64 storageId);
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
												   AttrNumber storageIdAtrrNumber,
												   Oid storageIdIndexId,
												   uint64 storageId);
static ModifyState * StartModifyRelation(Relation rel);
static void InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values,
											 bool *nulls);
static void DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple);
static void FinishModifyRelation(ModifyState *state);
static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
static StripeMetadata * StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber,
													  Snapshot snapshot,
													  RowNumberLookupMode lookupMode);
static void CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata);

/*
 * Declares the version-1 calling convention for the SQL-callable function
 * columnar_relation_storageid (its definition is not in this view;
 * presumably further down in the file).
 */
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
131
/*
 * constants for columnar.options: number of columns and the 1-based
 * attribute number of each column (subtract 1 to index Datum/null arrays)
 */
#define Natts_columnar_options 5
#define Anum_columnar_options_regclass 1
#define Anum_columnar_options_chunk_group_row_limit 2
#define Anum_columnar_options_stripe_row_limit 3
#define Anum_columnar_options_compression_level 4
#define Anum_columnar_options_compression 5

/* ----------------
 *		columnar.options definition.
 *
 *		C view of a columnar.options heap tuple, as read through GETSTRUCT().
 *		Field order has to agree with the Anum_columnar_options_* numbers
 *		above for that mapping to be valid.
 * ----------------
 */
typedef struct FormData_columnar_options
{
	Oid regclass;
	int32 chunk_group_row_limit;
	int32 stripe_row_limit;
	int32 compressionLevel;
	NameData compression;

#ifdef CATALOG_VARLEN           /* variable-length fields start here */
#endif
} FormData_columnar_options;
typedef FormData_columnar_options *Form_columnar_options;
156
157
/*
 * constants for columnar.stripe: number of columns and 1-based attribute
 * numbers (subtract 1 to index the Datum/null arrays of a deformed tuple)
 */
#define Natts_columnar_stripe 9
#define Anum_columnar_stripe_storageid 1
#define Anum_columnar_stripe_stripe 2
#define Anum_columnar_stripe_file_offset 3
#define Anum_columnar_stripe_data_length 4
#define Anum_columnar_stripe_column_count 5
#define Anum_columnar_stripe_chunk_row_count 6
#define Anum_columnar_stripe_row_count 7
#define Anum_columnar_stripe_chunk_count 8
#define Anum_columnar_stripe_first_row_number 9

/* constants for columnar.chunk_group: one row per chunk group of a stripe */
#define Natts_columnar_chunkgroup 4
#define Anum_columnar_chunkgroup_storageid 1
#define Anum_columnar_chunkgroup_stripe 2
#define Anum_columnar_chunkgroup_chunk 3
#define Anum_columnar_chunkgroup_row_count 4

/* constants for columnar.chunk: one row per (stripe, column, chunk) */
#define Natts_columnar_chunk 14
#define Anum_columnar_chunk_storageid 1
#define Anum_columnar_chunk_stripe 2
#define Anum_columnar_chunk_attr 3
#define Anum_columnar_chunk_chunk 4
#define Anum_columnar_chunk_minimum_value 5
#define Anum_columnar_chunk_maximum_value 6
#define Anum_columnar_chunk_value_stream_offset 7
#define Anum_columnar_chunk_value_stream_length 8
#define Anum_columnar_chunk_exists_stream_offset 9
#define Anum_columnar_chunk_exists_stream_length 10
#define Anum_columnar_chunk_value_compression_type 11
#define Anum_columnar_chunk_value_compression_level 12
#define Anum_columnar_chunk_value_decompressed_size 13
#define Anum_columnar_chunk_value_count 14
193
194
195 /*
196 * InitColumnarOptions initialized the columnar table options. Meaning it writes the
197 * default options to the options table if not already existing.
198 */
199 void
InitColumnarOptions(Oid regclass)200 InitColumnarOptions(Oid regclass)
201 {
202 /*
203 * When upgrading we retain options for all columnar tables by upgrading
204 * "columnar.options" catalog table, so we shouldn't do anything here.
205 */
206 if (IsBinaryUpgrade)
207 {
208 return;
209 }
210
211 ColumnarOptions defaultOptions = {
212 .chunkRowCount = columnar_chunk_group_row_limit,
213 .stripeRowCount = columnar_stripe_row_limit,
214 .compressionType = columnar_compression,
215 .compressionLevel = columnar_compression_level
216 };
217
218 WriteColumnarOptions(regclass, &defaultOptions, false);
219 }
220
221
222 /*
223 * SetColumnarOptions writes the passed table options as the authoritive options to the
224 * table irregardless of the optiones already existing or not. This can be used to put a
225 * table in a certain state.
226 */
227 void
SetColumnarOptions(Oid regclass,ColumnarOptions * options)228 SetColumnarOptions(Oid regclass, ColumnarOptions *options)
229 {
230 WriteColumnarOptions(regclass, options, true);
231 }
232
233
234 /*
235 * WriteColumnarOptions writes the options to the catalog table for a given regclass.
236 * - If overwrite is false it will only write the values if there is not already a record
237 * found.
238 * - If overwrite is true it will always write the settings
239 *
240 * The return value indicates if the record has been written.
241 */
242 static bool
WriteColumnarOptions(Oid regclass,ColumnarOptions * options,bool overwrite)243 WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
244 {
245 /*
246 * When upgrading we should retain the options from the previous
247 * cluster and don't write new options.
248 */
249 Assert(!IsBinaryUpgrade);
250
251 bool written = false;
252
253 bool nulls[Natts_columnar_options] = { 0 };
254 Datum values[Natts_columnar_options] = {
255 ObjectIdGetDatum(regclass),
256 Int32GetDatum(options->chunkRowCount),
257 Int32GetDatum(options->stripeRowCount),
258 Int32GetDatum(options->compressionLevel),
259 0, /* to be filled below */
260 };
261
262 NameData compressionName = { 0 };
263 namestrcpy(&compressionName, CompressionTypeStr(options->compressionType));
264 values[Anum_columnar_options_compression - 1] = NameGetDatum(&compressionName);
265
266 /* create heap tuple and insert into catalog table */
267 Relation columnarOptions = relation_open(ColumnarOptionsRelationId(),
268 RowExclusiveLock);
269 TupleDesc tupleDescriptor = RelationGetDescr(columnarOptions);
270
271 /* find existing item to perform update if exist */
272 ScanKeyData scanKey[1] = { 0 };
273 ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
274 F_OIDEQ,
275 ObjectIdGetDatum(regclass));
276
277 Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
278 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
279 1, scanKey);
280
281 HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
282 if (HeapTupleIsValid(heapTuple))
283 {
284 if (overwrite)
285 {
286 /* TODO check if the options are actually different, skip if not changed */
287 /* update existing record */
288 bool update[Natts_columnar_options] = { 0 };
289 update[Anum_columnar_options_chunk_group_row_limit - 1] = true;
290 update[Anum_columnar_options_stripe_row_limit - 1] = true;
291 update[Anum_columnar_options_compression_level - 1] = true;
292 update[Anum_columnar_options_compression - 1] = true;
293
294 HeapTuple tuple = heap_modify_tuple(heapTuple, tupleDescriptor,
295 values, nulls, update);
296 CatalogTupleUpdate(columnarOptions, &tuple->t_self, tuple);
297 written = true;
298 }
299 }
300 else
301 {
302 /* inserting new record */
303 HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
304 CatalogTupleInsert(columnarOptions, newTuple);
305 written = true;
306 }
307
308 if (written)
309 {
310 CommandCounterIncrement();
311 }
312
313 systable_endscan_ordered(scanDescriptor);
314 index_close(index, AccessShareLock);
315 relation_close(columnarOptions, RowExclusiveLock);
316
317 return written;
318 }
319
320
321 /*
322 * DeleteColumnarTableOptions removes the columnar table options for a regclass. When
323 * missingOk is false it will throw an error when no table options can be found.
324 *
325 * Returns whether a record has been removed.
326 */
327 bool
DeleteColumnarTableOptions(Oid regclass,bool missingOk)328 DeleteColumnarTableOptions(Oid regclass, bool missingOk)
329 {
330 bool result = false;
331
332 /*
333 * When upgrading we shouldn't delete or modify table options and
334 * retain options from the previous cluster.
335 */
336 Assert(!IsBinaryUpgrade);
337
338 Relation columnarOptions = try_relation_open(ColumnarOptionsRelationId(),
339 RowExclusiveLock);
340 if (columnarOptions == NULL)
341 {
342 /* extension has been dropped */
343 return false;
344 }
345
346 /* find existing item to remove */
347 ScanKeyData scanKey[1] = { 0 };
348 ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
349 F_OIDEQ,
350 ObjectIdGetDatum(regclass));
351
352 Relation index = index_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
353 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
354 1, scanKey);
355
356 HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
357 if (HeapTupleIsValid(heapTuple))
358 {
359 CatalogTupleDelete(columnarOptions, &heapTuple->t_self);
360 CommandCounterIncrement();
361
362 result = true;
363 }
364 else if (!missingOk)
365 {
366 ereport(ERROR, (errmsg("missing options for regclass: %d", regclass)));
367 }
368
369 systable_endscan_ordered(scanDescriptor);
370 index_close(index, AccessShareLock);
371 relation_close(columnarOptions, RowExclusiveLock);
372
373 return result;
374 }
375
376
377 bool
ReadColumnarOptions(Oid regclass,ColumnarOptions * options)378 ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
379 {
380 ScanKeyData scanKey[1];
381
382 ScanKeyInit(&scanKey[0], Anum_columnar_options_regclass, BTEqualStrategyNumber,
383 F_OIDEQ,
384 ObjectIdGetDatum(regclass));
385
386 Oid columnarOptionsOid = ColumnarOptionsRelationId();
387 Relation columnarOptions = try_relation_open(columnarOptionsOid, AccessShareLock);
388 if (columnarOptions == NULL)
389 {
390 /*
391 * Extension has been dropped. This can be called while
392 * dropping extension or database via ObjectAccess().
393 */
394 return false;
395 }
396
397 Relation index = try_relation_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
398 if (index == NULL)
399 {
400 table_close(columnarOptions, AccessShareLock);
401
402 /* extension has been dropped */
403 return false;
404 }
405
406 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarOptions, index, NULL,
407 1, scanKey);
408
409 HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
410 if (HeapTupleIsValid(heapTuple))
411 {
412 Form_columnar_options tupOptions = (Form_columnar_options) GETSTRUCT(heapTuple);
413
414 options->chunkRowCount = tupOptions->chunk_group_row_limit;
415 options->stripeRowCount = tupOptions->stripe_row_limit;
416 options->compressionLevel = tupOptions->compressionLevel;
417 options->compressionType = ParseCompressionType(NameStr(tupOptions->compression));
418 }
419 else
420 {
421 /* populate options with system defaults */
422 options->compressionType = columnar_compression;
423 options->stripeRowCount = columnar_stripe_row_limit;
424 options->chunkRowCount = columnar_chunk_group_row_limit;
425 options->compressionLevel = columnar_compression_level;
426 }
427
428 systable_endscan_ordered(scanDescriptor);
429 index_close(index, AccessShareLock);
430 relation_close(columnarOptions, AccessShareLock);
431
432 return true;
433 }
434
435
436 /*
437 * SaveStripeSkipList saves chunkList for a given stripe as rows
438 * of columnar.chunk.
439 */
440 void
SaveStripeSkipList(RelFileNode relfilenode,uint64 stripe,StripeSkipList * chunkList,TupleDesc tupleDescriptor)441 SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunkList,
442 TupleDesc tupleDescriptor)
443 {
444 uint32 columnIndex = 0;
445 uint32 chunkIndex = 0;
446 uint32 columnCount = chunkList->columnCount;
447
448 uint64 storageId = LookupStorageId(relfilenode);
449 Oid columnarChunkOid = ColumnarChunkRelationId();
450 Relation columnarChunk = table_open(columnarChunkOid, RowExclusiveLock);
451 ModifyState *modifyState = StartModifyRelation(columnarChunk);
452
453 for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
454 {
455 for (chunkIndex = 0; chunkIndex < chunkList->chunkCount; chunkIndex++)
456 {
457 ColumnChunkSkipNode *chunk =
458 &chunkList->chunkSkipNodeArray[columnIndex][chunkIndex];
459
460 Datum values[Natts_columnar_chunk] = {
461 UInt64GetDatum(storageId),
462 Int64GetDatum(stripe),
463 Int32GetDatum(columnIndex + 1),
464 Int32GetDatum(chunkIndex),
465 0, /* to be filled below */
466 0, /* to be filled below */
467 Int64GetDatum(chunk->valueChunkOffset),
468 Int64GetDatum(chunk->valueLength),
469 Int64GetDatum(chunk->existsChunkOffset),
470 Int64GetDatum(chunk->existsLength),
471 Int32GetDatum(chunk->valueCompressionType),
472 Int32GetDatum(chunk->valueCompressionLevel),
473 Int64GetDatum(chunk->decompressedValueSize),
474 Int64GetDatum(chunk->rowCount)
475 };
476
477 bool nulls[Natts_columnar_chunk] = { false };
478
479 if (chunk->hasMinMax)
480 {
481 values[Anum_columnar_chunk_minimum_value - 1] =
482 PointerGetDatum(DatumToBytea(chunk->minimumValue,
483 &tupleDescriptor->attrs[columnIndex]));
484 values[Anum_columnar_chunk_maximum_value - 1] =
485 PointerGetDatum(DatumToBytea(chunk->maximumValue,
486 &tupleDescriptor->attrs[columnIndex]));
487 }
488 else
489 {
490 nulls[Anum_columnar_chunk_minimum_value - 1] = true;
491 nulls[Anum_columnar_chunk_maximum_value - 1] = true;
492 }
493
494 InsertTupleAndEnforceConstraints(modifyState, values, nulls);
495 }
496 }
497
498 FinishModifyRelation(modifyState);
499 table_close(columnarChunk, RowExclusiveLock);
500 }
501
502
503 /*
504 * SaveChunkGroups saves the metadata for given chunk groups in columnar.chunk_group.
505 */
506 void
SaveChunkGroups(RelFileNode relfilenode,uint64 stripe,List * chunkGroupRowCounts)507 SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
508 List *chunkGroupRowCounts)
509 {
510 uint64 storageId = LookupStorageId(relfilenode);
511 Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
512 Relation columnarChunkGroup = table_open(columnarChunkGroupOid, RowExclusiveLock);
513 ModifyState *modifyState = StartModifyRelation(columnarChunkGroup);
514
515 ListCell *lc = NULL;
516 int chunkId = 0;
517
518 foreach(lc, chunkGroupRowCounts)
519 {
520 int64 rowCount = lfirst_int(lc);
521 Datum values[Natts_columnar_chunkgroup] = {
522 UInt64GetDatum(storageId),
523 Int64GetDatum(stripe),
524 Int32GetDatum(chunkId),
525 Int64GetDatum(rowCount)
526 };
527
528 bool nulls[Natts_columnar_chunkgroup] = { false };
529
530 InsertTupleAndEnforceConstraints(modifyState, values, nulls);
531 chunkId++;
532 }
533
534 FinishModifyRelation(modifyState);
535 table_close(columnarChunkGroup, NoLock);
536 }
537
538
539 /*
540 * ReadStripeSkipList fetches chunk metadata for a given stripe.
541 */
542 StripeSkipList *
ReadStripeSkipList(RelFileNode relfilenode,uint64 stripe,TupleDesc tupleDescriptor,uint32 chunkCount,Snapshot snapshot)543 ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
544 uint32 chunkCount, Snapshot snapshot)
545 {
546 int32 columnIndex = 0;
547 HeapTuple heapTuple = NULL;
548 uint32 columnCount = tupleDescriptor->natts;
549 ScanKeyData scanKey[2];
550
551 uint64 storageId = LookupStorageId(relfilenode);
552
553 Oid columnarChunkOid = ColumnarChunkRelationId();
554 Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock);
555 Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);
556
557 ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid,
558 BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
559 ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
560 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
561
562 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
563 snapshot, 2, scanKey);
564
565 StripeSkipList *chunkList = palloc0(sizeof(StripeSkipList));
566 chunkList->chunkCount = chunkCount;
567 chunkList->columnCount = columnCount;
568 chunkList->chunkSkipNodeArray = palloc0(columnCount * sizeof(ColumnChunkSkipNode *));
569 for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
570 {
571 chunkList->chunkSkipNodeArray[columnIndex] =
572 palloc0(chunkCount * sizeof(ColumnChunkSkipNode));
573 }
574
575 while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
576 ForwardScanDirection)))
577 {
578 Datum datumArray[Natts_columnar_chunk];
579 bool isNullArray[Natts_columnar_chunk];
580
581 heap_deform_tuple(heapTuple, RelationGetDescr(columnarChunk), datumArray,
582 isNullArray);
583
584 int32 attr = DatumGetInt32(datumArray[Anum_columnar_chunk_attr - 1]);
585 int32 chunkIndex = DatumGetInt32(datumArray[Anum_columnar_chunk_chunk - 1]);
586
587 if (attr <= 0 || attr > columnCount)
588 {
589 ereport(ERROR, (errmsg("invalid columnar chunk entry"),
590 errdetail("Attribute number out of range: %d", attr)));
591 }
592
593 if (chunkIndex < 0 || chunkIndex >= chunkCount)
594 {
595 ereport(ERROR, (errmsg("invalid columnar chunk entry"),
596 errdetail("Chunk number out of range: %d", chunkIndex)));
597 }
598
599 columnIndex = attr - 1;
600
601 ColumnChunkSkipNode *chunk =
602 &chunkList->chunkSkipNodeArray[columnIndex][chunkIndex];
603 chunk->rowCount = DatumGetInt64(datumArray[Anum_columnar_chunk_value_count -
604 1]);
605 chunk->valueChunkOffset =
606 DatumGetInt64(datumArray[Anum_columnar_chunk_value_stream_offset - 1]);
607 chunk->valueLength =
608 DatumGetInt64(datumArray[Anum_columnar_chunk_value_stream_length - 1]);
609 chunk->existsChunkOffset =
610 DatumGetInt64(datumArray[Anum_columnar_chunk_exists_stream_offset - 1]);
611 chunk->existsLength =
612 DatumGetInt64(datumArray[Anum_columnar_chunk_exists_stream_length - 1]);
613 chunk->valueCompressionType =
614 DatumGetInt32(datumArray[Anum_columnar_chunk_value_compression_type - 1]);
615 chunk->valueCompressionLevel =
616 DatumGetInt32(datumArray[Anum_columnar_chunk_value_compression_level - 1]);
617 chunk->decompressedValueSize =
618 DatumGetInt64(datumArray[Anum_columnar_chunk_value_decompressed_size - 1]);
619
620 if (isNullArray[Anum_columnar_chunk_minimum_value - 1] ||
621 isNullArray[Anum_columnar_chunk_maximum_value - 1])
622 {
623 chunk->hasMinMax = false;
624 }
625 else
626 {
627 bytea *minValue = DatumGetByteaP(
628 datumArray[Anum_columnar_chunk_minimum_value - 1]);
629 bytea *maxValue = DatumGetByteaP(
630 datumArray[Anum_columnar_chunk_maximum_value - 1]);
631
632 chunk->minimumValue =
633 ByteaToDatum(minValue, &tupleDescriptor->attrs[columnIndex]);
634 chunk->maximumValue =
635 ByteaToDatum(maxValue, &tupleDescriptor->attrs[columnIndex]);
636
637 chunk->hasMinMax = true;
638 }
639 }
640
641 systable_endscan_ordered(scanDescriptor);
642 index_close(index, AccessShareLock);
643 table_close(columnarChunk, AccessShareLock);
644
645 chunkList->chunkGroupRowCounts =
646 ReadChunkGroupRowCounts(storageId, stripe, chunkCount, snapshot);
647
648 return chunkList;
649 }
650
651
652 /*
653 * FindStripeByRowNumber returns StripeMetadata for the stripe whose
654 * firstRowNumber is greater than given rowNumber. If no such stripe
655 * exists, then returns NULL.
656 */
657 StripeMetadata *
FindNextStripeByRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)658 FindNextStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
659 {
660 return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot, FIND_GREATER);
661 }
662
663
664 /*
665 * FindStripeByRowNumber returns StripeMetadata for the stripe that contains
666 * the row with rowNumber. If no such stripe exists, then returns NULL.
667 */
668 StripeMetadata *
FindStripeByRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)669 FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
670 {
671 StripeMetadata *stripeMetadata =
672 FindStripeWithMatchingFirstRowNumber(relation, rowNumber, snapshot);
673 if (!stripeMetadata)
674 {
675 return NULL;
676 }
677
678 if (rowNumber > StripeGetHighestRowNumber(stripeMetadata))
679 {
680 return NULL;
681 }
682
683 return stripeMetadata;
684 }
685
686
687 /*
688 * FindStripeWithMatchingFirstRowNumber returns a StripeMetadata object for
689 * the stripe that has the greatest firstRowNumber among the stripes whose
690 * firstRowNumber is smaller than or equal to given rowNumber. If no such
691 * stripe exists, then returns NULL.
692 *
693 * Note that this doesn't mean that found stripe certainly contains the tuple
694 * with given rowNumber. This is because, it also needs to be verified if
695 * highest row number that found stripe contains is greater than or equal to
696 * given rowNumber. For this reason, unless that additional check is done,
697 * this function is mostly useful for checking against "possible" constraint
698 * violations due to concurrent writes that are not flushed by other backends
699 * yet.
700 */
701 StripeMetadata *
FindStripeWithMatchingFirstRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot)702 FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber,
703 Snapshot snapshot)
704 {
705 return StripeMetadataLookupRowNumber(relation, rowNumber, snapshot,
706 FIND_LESS_OR_EQUAL);
707 }
708
709
710 /*
711 * StripeWriteState returns write state of given stripe.
712 */
713 StripeWriteStateEnum
StripeWriteState(StripeMetadata * stripeMetadata)714 StripeWriteState(StripeMetadata *stripeMetadata)
715 {
716 if (stripeMetadata->aborted)
717 {
718 return STRIPE_WRITE_ABORTED;
719 }
720 else if (stripeMetadata->rowCount > 0)
721 {
722 return STRIPE_WRITE_FLUSHED;
723 }
724 else
725 {
726 return STRIPE_WRITE_IN_PROGRESS;
727 }
728 }
729
730
731 /*
732 * StripeGetHighestRowNumber returns rowNumber of the row with highest
733 * rowNumber in given stripe.
734 */
735 uint64
StripeGetHighestRowNumber(StripeMetadata * stripeMetadata)736 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata)
737 {
738 return stripeMetadata->firstRowNumber + stripeMetadata->rowCount - 1;
739 }
740
741
742 /*
743 * StripeMetadataLookupRowNumber returns StripeMetadata for the stripe whose
744 * firstRowNumber is less than or equal to (FIND_LESS_OR_EQUAL), or is
745 * greater than (FIND_GREATER) given rowNumber by doing backward index
746 * scan on stripe_first_row_number_idx.
747 * If no such stripe exists, then returns NULL.
748 */
749 static StripeMetadata *
StripeMetadataLookupRowNumber(Relation relation,uint64 rowNumber,Snapshot snapshot,RowNumberLookupMode lookupMode)750 StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot,
751 RowNumberLookupMode lookupMode)
752 {
753 Assert(lookupMode == FIND_LESS_OR_EQUAL || lookupMode == FIND_GREATER);
754
755 StripeMetadata *foundStripeMetadata = NULL;
756
757 uint64 storageId = ColumnarStorageGetStorageId(relation, false);
758 ScanKeyData scanKey[2];
759 ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
760 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
761
762 StrategyNumber strategyNumber = InvalidStrategy;
763 RegProcedure procedure = InvalidOid;
764 if (lookupMode == FIND_LESS_OR_EQUAL)
765 {
766 strategyNumber = BTLessEqualStrategyNumber;
767 procedure = F_INT8LE;
768 }
769 else if (lookupMode == FIND_GREATER)
770 {
771 strategyNumber = BTGreaterStrategyNumber;
772 procedure = F_INT8GT;
773 }
774 ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
775 strategyNumber, procedure, UInt64GetDatum(rowNumber));
776
777
778 Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
779 Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
780 AccessShareLock);
781 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
782 snapshot, 2,
783 scanKey);
784
785 ScanDirection scanDirection = NoMovementScanDirection;
786 if (lookupMode == FIND_LESS_OR_EQUAL)
787 {
788 scanDirection = BackwardScanDirection;
789 }
790 else if (lookupMode == FIND_GREATER)
791 {
792 scanDirection = ForwardScanDirection;
793 }
794 HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, scanDirection);
795 if (HeapTupleIsValid(heapTuple))
796 {
797 foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
798 }
799
800 systable_endscan_ordered(scanDescriptor);
801 index_close(index, AccessShareLock);
802 table_close(columnarStripes, AccessShareLock);
803
804 return foundStripeMetadata;
805 }
806
807
808 /*
809 * CheckStripeMetadataConsistency first decides if stripe write operation for
810 * given stripe is "flushed", "aborted" or "in-progress", then errors out if
811 * its metadata entry contradicts with this fact.
812 *
813 * Checks performed here are just to catch bugs, so it is encouraged to call
814 * this function whenever a StripeMetadata object is built from an heap tuple
815 * of columnar.stripe. Currently, BuildStripeMetadata is the only function
816 * that does this.
817 */
818 static void
CheckStripeMetadataConsistency(StripeMetadata * stripeMetadata)819 CheckStripeMetadataConsistency(StripeMetadata *stripeMetadata)
820 {
821 bool stripeLooksInProgress =
822 stripeMetadata->rowCount == 0 && stripeMetadata->chunkCount == 0 &&
823 stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
824 stripeMetadata->dataLength == 0;
825
826 /*
827 * Even if stripe is flushed, fileOffset and dataLength might be equal
828 * to 0 for zero column tables, but those two should still be consistent
829 * with respect to each other.
830 */
831 bool stripeLooksFlushed =
832 stripeMetadata->rowCount > 0 && stripeMetadata->chunkCount > 0 &&
833 ((stripeMetadata->fileOffset != ColumnarInvalidLogicalOffset &&
834 stripeMetadata->dataLength > 0) ||
835 (stripeMetadata->fileOffset == ColumnarInvalidLogicalOffset &&
836 stripeMetadata->dataLength == 0));
837
838 StripeWriteStateEnum stripeWriteState = StripeWriteState(stripeMetadata);
839 if (stripeWriteState == STRIPE_WRITE_FLUSHED && stripeLooksFlushed)
840 {
841 /*
842 * If stripe was flushed to disk, then we expect stripe to store
843 * at least one tuple.
844 */
845 return;
846 }
847 else if (stripeWriteState == STRIPE_WRITE_IN_PROGRESS && stripeLooksInProgress)
848 {
849 /*
850 * If stripe was not flushed to disk, then values of given four
851 * fields should match the columns inserted by
852 * InsertEmptyStripeMetadataRow.
853 */
854 return;
855 }
856 else if (stripeWriteState == STRIPE_WRITE_ABORTED && (stripeLooksInProgress ||
857 stripeLooksFlushed))
858 {
859 /*
860 * Stripe metadata entry for an aborted write can be complete or
861 * incomplete. We might have aborted the transaction before or after
862 * inserting into stripe metadata.
863 */
864 return;
865 }
866
867 ereport(ERROR, (errmsg("unexpected stripe state, stripe metadata "
868 "entry for stripe with id=" UINT64_FORMAT
869 " is not consistent", stripeMetadata->id)));
870 }
871
872
873 /*
874 * FindStripeWithHighestRowNumber returns StripeMetadata for the stripe that
875 * has the row with highest rowNumber by doing backward index scan on
876 * stripe_first_row_number_idx. If given relation is empty, then returns NULL.
877 */
878 StripeMetadata *
FindStripeWithHighestRowNumber(Relation relation,Snapshot snapshot)879 FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot)
880 {
881 StripeMetadata *stripeWithHighestRowNumber = NULL;
882
883 uint64 storageId = ColumnarStorageGetStorageId(relation, false);
884 ScanKeyData scanKey[1];
885 ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
886 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
887
888 Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
889 Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
890 AccessShareLock);
891 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
892 snapshot, 1, scanKey);
893
894 HeapTuple heapTuple = systable_getnext_ordered(scanDescriptor, BackwardScanDirection);
895 if (HeapTupleIsValid(heapTuple))
896 {
897 stripeWithHighestRowNumber = BuildStripeMetadata(columnarStripes, heapTuple);
898 }
899
900 systable_endscan_ordered(scanDescriptor);
901 index_close(index, AccessShareLock);
902 table_close(columnarStripes, AccessShareLock);
903
904 return stripeWithHighestRowNumber;
905 }
906
907
908 /*
909 * ReadChunkGroupRowCounts returns an array of row counts of chunk groups for the
910 * given stripe.
911 */
912 static uint32 *
ReadChunkGroupRowCounts(uint64 storageId,uint64 stripe,uint32 chunkGroupCount,Snapshot snapshot)913 ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
914 Snapshot snapshot)
915 {
916 Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
917 Relation columnarChunkGroup = table_open(columnarChunkGroupOid, AccessShareLock);
918 Relation index = index_open(ColumnarChunkGroupIndexRelationId(), AccessShareLock);
919
920 ScanKeyData scanKey[2];
921 ScanKeyInit(&scanKey[0], Anum_columnar_chunkgroup_storageid,
922 BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
923 ScanKeyInit(&scanKey[1], Anum_columnar_chunkgroup_stripe,
924 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
925
926 SysScanDesc scanDescriptor =
927 systable_beginscan_ordered(columnarChunkGroup, index, snapshot, 2, scanKey);
928
929 uint32 chunkGroupIndex = 0;
930 HeapTuple heapTuple = NULL;
931 uint32 *chunkGroupRowCounts = palloc0(chunkGroupCount * sizeof(uint32));
932
933 while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
934 ForwardScanDirection)))
935 {
936 Datum datumArray[Natts_columnar_chunkgroup];
937 bool isNullArray[Natts_columnar_chunkgroup];
938
939 heap_deform_tuple(heapTuple,
940 RelationGetDescr(columnarChunkGroup),
941 datumArray, isNullArray);
942
943 uint32 tupleChunkGroupIndex =
944 DatumGetUInt32(datumArray[Anum_columnar_chunkgroup_chunk - 1]);
945 if (chunkGroupIndex >= chunkGroupCount ||
946 tupleChunkGroupIndex != chunkGroupIndex)
947 {
948 elog(ERROR, "unexpected chunk group");
949 }
950
951 chunkGroupRowCounts[chunkGroupIndex] =
952 (uint32) DatumGetUInt64(datumArray[Anum_columnar_chunkgroup_row_count - 1]);
953 chunkGroupIndex++;
954 }
955
956 if (chunkGroupIndex != chunkGroupCount)
957 {
958 elog(ERROR, "unexpected chunk group count");
959 }
960
961 systable_endscan_ordered(scanDescriptor);
962 index_close(index, AccessShareLock);
963 table_close(columnarChunkGroup, AccessShareLock);
964
965 return chunkGroupRowCounts;
966 }
967
968
969 /*
970 * InsertEmptyStripeMetadataRow adds a row to columnar.stripe for the empty
971 * stripe reservation made for stripeId.
972 */
973 static void
InsertEmptyStripeMetadataRow(uint64 storageId,uint64 stripeId,uint32 columnCount,uint32 chunkGroupRowCount,uint64 firstRowNumber)974 InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount,
975 uint32 chunkGroupRowCount, uint64 firstRowNumber)
976 {
977 bool nulls[Natts_columnar_stripe] = { false };
978
979 Datum values[Natts_columnar_stripe] = { 0 };
980 values[Anum_columnar_stripe_storageid - 1] =
981 UInt64GetDatum(storageId);
982 values[Anum_columnar_stripe_stripe - 1] =
983 UInt64GetDatum(stripeId);
984 values[Anum_columnar_stripe_column_count - 1] =
985 UInt32GetDatum(columnCount);
986 values[Anum_columnar_stripe_chunk_row_count - 1] =
987 UInt32GetDatum(chunkGroupRowCount);
988 values[Anum_columnar_stripe_first_row_number - 1] =
989 UInt64GetDatum(firstRowNumber);
990
991 /* stripe has no rows yet, so initialize rest of the columns accordingly */
992 values[Anum_columnar_stripe_row_count - 1] =
993 UInt64GetDatum(0);
994 values[Anum_columnar_stripe_file_offset - 1] =
995 UInt64GetDatum(ColumnarInvalidLogicalOffset);
996 values[Anum_columnar_stripe_data_length - 1] =
997 UInt64GetDatum(0);
998 values[Anum_columnar_stripe_chunk_count - 1] =
999 UInt32GetDatum(0);
1000
1001 Oid columnarStripesOid = ColumnarStripeRelationId();
1002 Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock);
1003
1004 ModifyState *modifyState = StartModifyRelation(columnarStripes);
1005
1006 InsertTupleAndEnforceConstraints(modifyState, values, nulls);
1007
1008 FinishModifyRelation(modifyState);
1009
1010 table_close(columnarStripes, RowExclusiveLock);
1011 }
1012
1013
1014 /*
1015 * StripesForRelfilenode returns a list of StripeMetadata for stripes
1016 * of the given relfilenode.
1017 */
1018 List *
StripesForRelfilenode(RelFileNode relfilenode)1019 StripesForRelfilenode(RelFileNode relfilenode)
1020 {
1021 uint64 storageId = LookupStorageId(relfilenode);
1022
1023 return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
1024 }
1025
1026
1027 /*
1028 * GetHighestUsedAddress returns the highest used address for the given
1029 * relfilenode across all active and inactive transactions.
1030 *
1031 * This is used by truncate stage of VACUUM, and VACUUM can be called
1032 * for empty tables. So this doesn't throw errors for empty tables and
1033 * returns 0.
1034 */
1035 uint64
GetHighestUsedAddress(RelFileNode relfilenode)1036 GetHighestUsedAddress(RelFileNode relfilenode)
1037 {
1038 uint64 storageId = LookupStorageId(relfilenode);
1039
1040 uint64 highestUsedAddress = 0;
1041 uint64 highestUsedId = 0;
1042 GetHighestUsedAddressAndId(storageId, &highestUsedAddress, &highestUsedId);
1043
1044 return highestUsedAddress;
1045 }
1046
1047
1048 /*
1049 * GetHighestUsedAddressAndId returns the highest used address and id for
1050 * the given relfilenode across all active and inactive transactions.
1051 */
1052 static void
GetHighestUsedAddressAndId(uint64 storageId,uint64 * highestUsedAddress,uint64 * highestUsedId)1053 GetHighestUsedAddressAndId(uint64 storageId,
1054 uint64 *highestUsedAddress,
1055 uint64 *highestUsedId)
1056 {
1057 ListCell *stripeMetadataCell = NULL;
1058
1059 SnapshotData SnapshotDirty;
1060 InitDirtySnapshot(SnapshotDirty);
1061
1062 List *stripeMetadataList = ReadDataFileStripeList(storageId, &SnapshotDirty);
1063
1064 *highestUsedId = 0;
1065
1066 /* file starts with metapage */
1067 *highestUsedAddress = COLUMNAR_BYTES_PER_PAGE;
1068
1069 foreach(stripeMetadataCell, stripeMetadataList)
1070 {
1071 StripeMetadata *stripe = lfirst(stripeMetadataCell);
1072 uint64 lastByte = stripe->fileOffset + stripe->dataLength - 1;
1073 *highestUsedAddress = Max(*highestUsedAddress, lastByte);
1074 *highestUsedId = Max(*highestUsedId, stripe->id);
1075 }
1076 }
1077
1078
1079 /*
1080 * ReserveEmptyStripe reserves an empty stripe for given relation
1081 * and inserts it into columnar.stripe. It is guaranteed that concurrent
1082 * writes won't overwrite the returned stripe.
1083 */
1084 EmptyStripeReservation *
ReserveEmptyStripe(Relation rel,uint64 columnCount,uint64 chunkGroupRowCount,uint64 stripeRowCount)1085 ReserveEmptyStripe(Relation rel, uint64 columnCount, uint64 chunkGroupRowCount,
1086 uint64 stripeRowCount)
1087 {
1088 EmptyStripeReservation *stripeReservation = palloc0(sizeof(EmptyStripeReservation));
1089
1090 uint64 storageId = ColumnarStorageGetStorageId(rel, false);
1091
1092 stripeReservation->stripeId = ColumnarStorageReserveStripeId(rel);
1093 stripeReservation->stripeFirstRowNumber =
1094 ColumnarStorageReserveRowNumber(rel, stripeRowCount);
1095
1096 /*
1097 * XXX: Instead of inserting a dummy entry to columnar.stripe and
1098 * updating it when flushing the stripe, we could have a hash table
1099 * in shared memory for the bookkeeping of ongoing writes.
1100 */
1101 InsertEmptyStripeMetadataRow(storageId, stripeReservation->stripeId,
1102 columnCount, chunkGroupRowCount,
1103 stripeReservation->stripeFirstRowNumber);
1104
1105 return stripeReservation;
1106 }
1107
1108
1109 /*
1110 * CompleteStripeReservation completes reservation of the stripe with
1111 * stripeId for given size and in-place updates related stripe metadata tuple
1112 * to complete reservation.
1113 */
1114 StripeMetadata *
CompleteStripeReservation(Relation rel,uint64 stripeId,uint64 sizeBytes,uint64 rowCount,uint64 chunkCount)1115 CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes,
1116 uint64 rowCount, uint64 chunkCount)
1117 {
1118 uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
1119 uint64 storageId = ColumnarStorageGetStorageId(rel, false);
1120
1121 bool update[Natts_columnar_stripe] = { false };
1122 update[Anum_columnar_stripe_file_offset - 1] = true;
1123 update[Anum_columnar_stripe_data_length - 1] = true;
1124 update[Anum_columnar_stripe_row_count - 1] = true;
1125 update[Anum_columnar_stripe_chunk_count - 1] = true;
1126
1127 Datum newValues[Natts_columnar_stripe] = { 0 };
1128 newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(resLogicalStart);
1129 newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(sizeBytes);
1130 newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
1131 newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
1132
1133 return UpdateStripeMetadataRow(storageId, stripeId, update, newValues);
1134 }
1135
1136
1137 /*
1138 * UpdateStripeMetadataRow updates stripe metadata tuple for the stripe with
1139 * stripeId according to given newValues and update arrays.
1140 * Note that this function shouldn't be used for the cases where any indexes
1141 * of stripe metadata should be updated according to modifications done.
1142 */
1143 static StripeMetadata *
UpdateStripeMetadataRow(uint64 storageId,uint64 stripeId,bool * update,Datum * newValues)1144 UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
1145 Datum *newValues)
1146 {
1147 SnapshotData dirtySnapshot;
1148 InitDirtySnapshot(dirtySnapshot);
1149
1150 ScanKeyData scanKey[2];
1151 ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
1152 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
1153 ScanKeyInit(&scanKey[1], Anum_columnar_stripe_stripe,
1154 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripeId));
1155
1156 Oid columnarStripesOid = ColumnarStripeRelationId();
1157
1158 Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
1159 Relation columnarStripePkeyIndex = index_open(ColumnarStripePKeyIndexRelationId(),
1160 AccessShareLock);
1161
1162 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes,
1163 columnarStripePkeyIndex,
1164 &dirtySnapshot, 2, scanKey);
1165
1166 HeapTuple oldTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
1167 if (!HeapTupleIsValid(oldTuple))
1168 {
1169 ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
1170 "columnar storage with id=" UINT64_FORMAT
1171 " does not have stripe with id=" UINT64_FORMAT,
1172 storageId, stripeId)));
1173 }
1174
1175 /*
1176 * heap_inplace_update already doesn't allow changing size of the original
1177 * tuple, so we don't allow setting any Datum's to NULL values.
1178 */
1179 bool newNulls[Natts_columnar_stripe] = { false };
1180 TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
1181 HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, tupleDescriptor,
1182 newValues, newNulls, update);
1183
1184 heap_inplace_update(columnarStripes, modifiedTuple);
1185
1186 /*
1187 * Existing tuple now contains modifications, because we used
1188 * heap_inplace_update().
1189 */
1190 HeapTuple newTuple = oldTuple;
1191
1192 /*
1193 * Must not pass modifiedTuple, because BuildStripeMetadata expects a real
1194 * heap tuple with MVCC fields.
1195 */
1196 StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
1197 newTuple);
1198
1199 CommandCounterIncrement();
1200
1201 systable_endscan_ordered(scanDescriptor);
1202 index_close(columnarStripePkeyIndex, AccessShareLock);
1203 table_close(columnarStripes, AccessShareLock);
1204
1205 /* return StripeMetadata object built from modified tuple */
1206 return modifiedStripeMetadata;
1207 }
1208
1209
1210 /*
1211 * ReadDataFileStripeList reads the stripe list for a given storageId
1212 * in the given snapshot.
1213 */
1214 static List *
ReadDataFileStripeList(uint64 storageId,Snapshot snapshot)1215 ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
1216 {
1217 List *stripeMetadataList = NIL;
1218 ScanKeyData scanKey[1];
1219 HeapTuple heapTuple;
1220
1221 ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
1222 BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
1223
1224 Oid columnarStripesOid = ColumnarStripeRelationId();
1225
1226 Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
1227 Relation index = index_open(ColumnarStripeFirstRowNumberIndexRelationId(),
1228 AccessShareLock);
1229
1230 SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarStripes, index,
1231 snapshot, 1,
1232 scanKey);
1233
1234 while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
1235 ForwardScanDirection)))
1236 {
1237 StripeMetadata *stripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple);
1238 stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
1239 }
1240
1241 systable_endscan_ordered(scanDescriptor);
1242 index_close(index, AccessShareLock);
1243 table_close(columnarStripes, AccessShareLock);
1244
1245 return stripeMetadataList;
1246 }
1247
1248
1249 /*
1250 * BuildStripeMetadata builds a StripeMetadata object from given heap tuple.
1251 *
1252 * NB: heapTuple must be a proper heap tuple with MVCC fields.
1253 */
1254 static StripeMetadata *
BuildStripeMetadata(Relation columnarStripes,HeapTuple heapTuple)1255 BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
1256 {
1257 Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId());
1258
1259 Datum datumArray[Natts_columnar_stripe];
1260 bool isNullArray[Natts_columnar_stripe];
1261 heap_deform_tuple(heapTuple, RelationGetDescr(columnarStripes),
1262 datumArray, isNullArray);
1263
1264 StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata));
1265 stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]);
1266 stripeMetadata->fileOffset = DatumGetInt64(
1267 datumArray[Anum_columnar_stripe_file_offset - 1]);
1268 stripeMetadata->dataLength = DatumGetInt64(
1269 datumArray[Anum_columnar_stripe_data_length - 1]);
1270 stripeMetadata->columnCount = DatumGetInt32(
1271 datumArray[Anum_columnar_stripe_column_count - 1]);
1272 stripeMetadata->chunkCount = DatumGetInt32(
1273 datumArray[Anum_columnar_stripe_chunk_count - 1]);
1274 stripeMetadata->chunkGroupRowCount = DatumGetInt32(
1275 datumArray[Anum_columnar_stripe_chunk_row_count - 1]);
1276 stripeMetadata->rowCount = DatumGetInt64(
1277 datumArray[Anum_columnar_stripe_row_count - 1]);
1278 stripeMetadata->firstRowNumber = DatumGetUInt64(
1279 datumArray[Anum_columnar_stripe_first_row_number - 1]);
1280
1281 /*
1282 * If there is unflushed data in a parent transaction, then we would
1283 * have already thrown an error before starting to scan the table.. If
1284 * the data is from an earlier subxact that committed, then it would
1285 * have been flushed already. For this reason, we don't care about
1286 * subtransaction id here.
1287 */
1288 TransactionId entryXmin = HeapTupleHeaderGetXmin(heapTuple->t_data);
1289 stripeMetadata->aborted = !TransactionIdIsInProgress(entryXmin) &&
1290 TransactionIdDidAbort(entryXmin);
1291 stripeMetadata->insertedByCurrentXact =
1292 TransactionIdIsCurrentTransactionId(entryXmin);
1293
1294 CheckStripeMetadataConsistency(stripeMetadata);
1295
1296 return stripeMetadata;
1297 }
1298
1299
1300 /*
1301 * DeleteMetadataRows removes the rows with given relfilenode from columnar
1302 * metadata tables.
1303 */
1304 void
DeleteMetadataRows(RelFileNode relfilenode)1305 DeleteMetadataRows(RelFileNode relfilenode)
1306 {
1307 /*
1308 * During a restore for binary upgrade, metadata tables and indexes may or
1309 * may not exist.
1310 */
1311 if (IsBinaryUpgrade)
1312 {
1313 return;
1314 }
1315
1316 uint64 storageId = LookupStorageId(relfilenode);
1317
1318 DeleteStorageFromColumnarMetadataTable(ColumnarStripeRelationId(),
1319 Anum_columnar_stripe_storageid,
1320 ColumnarStripePKeyIndexRelationId(),
1321 storageId);
1322 DeleteStorageFromColumnarMetadataTable(ColumnarChunkGroupRelationId(),
1323 Anum_columnar_chunkgroup_storageid,
1324 ColumnarChunkGroupIndexRelationId(),
1325 storageId);
1326 DeleteStorageFromColumnarMetadataTable(ColumnarChunkRelationId(),
1327 Anum_columnar_chunk_storageid,
1328 ColumnarChunkIndexRelationId(),
1329 storageId);
1330 }
1331
1332
1333 /*
1334 * DeleteStorageFromColumnarMetadataTable removes the rows with given
1335 * storageId from given columnar metadata table.
1336 */
1337 static void
DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,AttrNumber storageIdAtrrNumber,Oid storageIdIndexId,uint64 storageId)1338 DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
1339 AttrNumber storageIdAtrrNumber,
1340 Oid storageIdIndexId, uint64 storageId)
1341 {
1342 ScanKeyData scanKey[1];
1343 ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
1344 F_INT8EQ, UInt64GetDatum(storageId));
1345
1346 Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
1347 if (metadataTable == NULL)
1348 {
1349 /* extension has been dropped */
1350 return;
1351 }
1352
1353 Relation index = index_open(storageIdIndexId, AccessShareLock);
1354
1355 SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
1356 1, scanKey);
1357
1358 ModifyState *modifyState = StartModifyRelation(metadataTable);
1359
1360 HeapTuple heapTuple;
1361 while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
1362 ForwardScanDirection)))
1363 {
1364 DeleteTupleAndEnforceConstraints(modifyState, heapTuple);
1365 }
1366
1367 systable_endscan_ordered(scanDescriptor);
1368
1369 FinishModifyRelation(modifyState);
1370
1371 index_close(index, AccessShareLock);
1372 table_close(metadataTable, AccessShareLock);
1373 }
1374
1375
1376 /*
1377 * StartModifyRelation allocates resources for modifications.
1378 */
1379 static ModifyState *
StartModifyRelation(Relation rel)1380 StartModifyRelation(Relation rel)
1381 {
1382 EState *estate = create_estate_for_relation(rel);
1383
1384 #if PG_VERSION_NUM >= PG_VERSION_14
1385 ResultRelInfo *resultRelInfo = makeNode(ResultRelInfo);
1386 InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
1387 #else
1388 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
1389 #endif
1390
1391 /* ExecSimpleRelationInsert, ... require caller to open indexes */
1392 ExecOpenIndices(resultRelInfo, false);
1393
1394 ModifyState *modifyState = palloc(sizeof(ModifyState));
1395 modifyState->rel = rel;
1396 modifyState->estate = estate;
1397 modifyState->resultRelInfo = resultRelInfo;
1398
1399 return modifyState;
1400 }
1401
1402
1403 /*
1404 * InsertTupleAndEnforceConstraints inserts a tuple into a relation and makes
1405 * sure constraints are enforced and indexes are updated.
1406 */
1407 static void
InsertTupleAndEnforceConstraints(ModifyState * state,Datum * values,bool * nulls)1408 InsertTupleAndEnforceConstraints(ModifyState *state, Datum *values, bool *nulls)
1409 {
1410 TupleDesc tupleDescriptor = RelationGetDescr(state->rel);
1411 HeapTuple tuple = heap_form_tuple(tupleDescriptor, values, nulls);
1412
1413 TupleTableSlot *slot = ExecInitExtraTupleSlot(state->estate, tupleDescriptor,
1414 &TTSOpsHeapTuple);
1415
1416 ExecStoreHeapTuple(tuple, slot, false);
1417
1418 /* use ExecSimpleRelationInsert to enforce constraints */
1419 ExecSimpleRelationInsert_compat(state->resultRelInfo, state->estate, slot);
1420 }
1421
1422
1423 /*
1424 * DeleteTupleAndEnforceConstraints deletes a tuple from a relation and
1425 * makes sure constraints (e.g. FK constraints) are enforced.
1426 */
1427 static void
DeleteTupleAndEnforceConstraints(ModifyState * state,HeapTuple heapTuple)1428 DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
1429 {
1430 EState *estate = state->estate;
1431 ResultRelInfo *resultRelInfo = state->resultRelInfo;
1432
1433 ItemPointer tid = &(heapTuple->t_self);
1434 simple_heap_delete(state->rel, tid);
1435
1436 /* execute AFTER ROW DELETE Triggers to enforce constraints */
1437 ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL);
1438 }
1439
1440
1441 /*
1442 * FinishModifyRelation cleans up resources after modifications are done.
1443 */
1444 static void
FinishModifyRelation(ModifyState * state)1445 FinishModifyRelation(ModifyState *state)
1446 {
1447 ExecCloseIndices(state->resultRelInfo);
1448
1449 AfterTriggerEndQuery(state->estate);
1450 #if PG_VERSION_NUM >= PG_VERSION_14
1451 ExecCloseResultRelations(state->estate);
1452 ExecCloseRangeTableRelations(state->estate);
1453 #else
1454 ExecCleanUpTriggerState(state->estate);
1455 #endif
1456 ExecResetTupleTable(state->estate->es_tupleTable, false);
1457 FreeExecutorState(state->estate);
1458
1459 CommandCounterIncrement();
1460 }
1461
1462
1463 /*
1464 * Based on a similar function from
1465 * postgres/src/backend/replication/logical/worker.c.
1466 *
1467 * Executor state preparation for evaluation of constraint expressions,
1468 * indexes and triggers.
1469 *
1470 * This is based on similar code in copy.c
1471 */
1472 static EState *
create_estate_for_relation(Relation rel)1473 create_estate_for_relation(Relation rel)
1474 {
1475 EState *estate = CreateExecutorState();
1476
1477 RangeTblEntry *rte = makeNode(RangeTblEntry);
1478 rte->rtekind = RTE_RELATION;
1479 rte->relid = RelationGetRelid(rel);
1480 rte->relkind = rel->rd_rel->relkind;
1481 rte->rellockmode = AccessShareLock;
1482 ExecInitRangeTable(estate, list_make1(rte));
1483
1484 #if PG_VERSION_NUM < PG_VERSION_14
1485 ResultRelInfo *resultRelInfo = makeNode(ResultRelInfo);
1486 InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
1487
1488 estate->es_result_relations = resultRelInfo;
1489 estate->es_num_result_relations = 1;
1490 estate->es_result_relation_info = resultRelInfo;
1491 #endif
1492
1493 estate->es_output_cid = GetCurrentCommandId(true);
1494
1495 /* Prepare to catch AFTER triggers. */
1496 AfterTriggerBeginQuery();
1497
1498 return estate;
1499 }
1500
1501
1502 /*
1503 * DatumToBytea serializes a datum into a bytea value.
1504 */
1505 static bytea *
DatumToBytea(Datum value,Form_pg_attribute attrForm)1506 DatumToBytea(Datum value, Form_pg_attribute attrForm)
1507 {
1508 int datumLength = att_addlength_datum(0, attrForm->attlen, value);
1509 bytea *result = palloc0(datumLength + VARHDRSZ);
1510
1511 SET_VARSIZE(result, datumLength + VARHDRSZ);
1512
1513 if (attrForm->attlen > 0)
1514 {
1515 if (attrForm->attbyval)
1516 {
1517 Datum tmp;
1518 store_att_byval(&tmp, value, attrForm->attlen);
1519
1520 memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1521 &tmp, attrForm->attlen);
1522 }
1523 else
1524 {
1525 memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1526 DatumGetPointer(value), attrForm->attlen);
1527 }
1528 }
1529 else
1530 {
1531 memcpy_s(VARDATA(result), datumLength + VARHDRSZ,
1532 DatumGetPointer(value), datumLength);
1533 }
1534
1535 return result;
1536 }
1537
1538
1539 /*
1540 * ByteaToDatum deserializes a value which was previously serialized using
1541 * DatumToBytea.
1542 */
1543 static Datum
ByteaToDatum(bytea * bytes,Form_pg_attribute attrForm)1544 ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm)
1545 {
1546 /*
1547 * We copy the data so the result of this function lives even
1548 * after the byteaDatum is freed.
1549 */
1550 char *binaryDataCopy = palloc0(VARSIZE_ANY_EXHDR(bytes));
1551 memcpy_s(binaryDataCopy, VARSIZE_ANY_EXHDR(bytes),
1552 VARDATA_ANY(bytes), VARSIZE_ANY_EXHDR(bytes));
1553
1554 return fetch_att(binaryDataCopy, attrForm->attbyval, attrForm->attlen);
1555 }
1556
1557
1558 /*
1559 * ColumnarStorageIdSequenceRelationId returns relation id of columnar.stripe.
1560 * TODO: should we cache this similar to citus?
1561 */
1562 static Oid
ColumnarStorageIdSequenceRelationId(void)1563 ColumnarStorageIdSequenceRelationId(void)
1564 {
1565 return get_relname_relid("storageid_seq", ColumnarNamespaceId());
1566 }
1567
1568
1569 /*
1570 * ColumnarStripeRelationId returns relation id of columnar.stripe.
1571 * TODO: should we cache this similar to citus?
1572 */
1573 static Oid
ColumnarStripeRelationId(void)1574 ColumnarStripeRelationId(void)
1575 {
1576 return get_relname_relid("stripe", ColumnarNamespaceId());
1577 }
1578
1579
1580 /*
1581 * ColumnarStripePKeyIndexRelationId returns relation id of columnar.stripe_pkey.
1582 * TODO: should we cache this similar to citus?
1583 */
1584 static Oid
ColumnarStripePKeyIndexRelationId(void)1585 ColumnarStripePKeyIndexRelationId(void)
1586 {
1587 return get_relname_relid("stripe_pkey", ColumnarNamespaceId());
1588 }
1589
1590
1591 /*
1592 * ColumnarStripeFirstRowNumberIndexRelationId returns relation id of
1593 * columnar.stripe_first_row_number_idx.
1594 * TODO: should we cache this similar to citus?
1595 */
1596 static Oid
ColumnarStripeFirstRowNumberIndexRelationId(void)1597 ColumnarStripeFirstRowNumberIndexRelationId(void)
1598 {
1599 return get_relname_relid("stripe_first_row_number_idx", ColumnarNamespaceId());
1600 }
1601
1602
1603 /*
1604 * ColumnarOptionsRelationId returns relation id of columnar.options.
1605 */
1606 static Oid
ColumnarOptionsRelationId(void)1607 ColumnarOptionsRelationId(void)
1608 {
1609 return get_relname_relid("options", ColumnarNamespaceId());
1610 }
1611
1612
1613 /*
1614 * ColumnarOptionsIndexRegclass returns relation id of columnar.options_pkey.
1615 */
1616 static Oid
ColumnarOptionsIndexRegclass(void)1617 ColumnarOptionsIndexRegclass(void)
1618 {
1619 return get_relname_relid("options_pkey", ColumnarNamespaceId());
1620 }
1621
1622
1623 /*
1624 * ColumnarChunkRelationId returns relation id of columnar.chunk.
1625 * TODO: should we cache this similar to citus?
1626 */
1627 static Oid
ColumnarChunkRelationId(void)1628 ColumnarChunkRelationId(void)
1629 {
1630 return get_relname_relid("chunk", ColumnarNamespaceId());
1631 }
1632
1633
1634 /*
1635 * ColumnarChunkGroupRelationId returns relation id of columnar.chunk_group.
1636 * TODO: should we cache this similar to citus?
1637 */
1638 static Oid
ColumnarChunkGroupRelationId(void)1639 ColumnarChunkGroupRelationId(void)
1640 {
1641 return get_relname_relid("chunk_group", ColumnarNamespaceId());
1642 }
1643
1644
1645 /*
1646 * ColumnarChunkIndexRelationId returns relation id of columnar.chunk_pkey.
1647 * TODO: should we cache this similar to citus?
1648 */
1649 static Oid
ColumnarChunkIndexRelationId(void)1650 ColumnarChunkIndexRelationId(void)
1651 {
1652 return get_relname_relid("chunk_pkey", ColumnarNamespaceId());
1653 }
1654
1655
1656 /*
1657 * ColumnarChunkGroupIndexRelationId returns relation id of columnar.chunk_group_pkey.
1658 * TODO: should we cache this similar to citus?
1659 */
1660 static Oid
ColumnarChunkGroupIndexRelationId(void)1661 ColumnarChunkGroupIndexRelationId(void)
1662 {
1663 return get_relname_relid("chunk_group_pkey", ColumnarNamespaceId());
1664 }
1665
1666
1667 /*
1668 * ColumnarNamespaceId returns namespace id of the schema we store columnar
1669 * related tables.
1670 */
1671 static Oid
ColumnarNamespaceId(void)1672 ColumnarNamespaceId(void)
1673 {
1674 return get_namespace_oid("columnar", false);
1675 }
1676
1677
1678 /*
1679 * LookupStorageId reads storage metapage to find the storage ID for the given relfilenode. It returns
1680 * false if the relation doesn't have a meta page yet.
1681 */
1682 static uint64
LookupStorageId(RelFileNode relfilenode)1683 LookupStorageId(RelFileNode relfilenode)
1684 {
1685 Oid relationId = RelidByRelfilenode(relfilenode.spcNode,
1686 relfilenode.relNode);
1687
1688 Relation relation = relation_open(relationId, AccessShareLock);
1689 uint64 storageId = ColumnarStorageGetStorageId(relation, false);
1690 table_close(relation, AccessShareLock);
1691
1692 return storageId;
1693 }
1694
1695
1696 /*
1697 * ColumnarMetadataNewStorageId - create a new, unique storage id and return
1698 * it.
1699 */
1700 uint64
ColumnarMetadataNewStorageId()1701 ColumnarMetadataNewStorageId()
1702 {
1703 return nextval_internal(ColumnarStorageIdSequenceRelationId(), false);
1704 }
1705
1706
1707 /*
1708 * columnar_relation_storageid returns storage id associated with the
1709 * given relation id, or -1 if there is no associated storage id yet.
1710 */
1711 Datum
columnar_relation_storageid(PG_FUNCTION_ARGS)1712 columnar_relation_storageid(PG_FUNCTION_ARGS)
1713 {
1714 Oid relationId = PG_GETARG_OID(0);
1715 Relation relation = relation_open(relationId, AccessShareLock);
1716 if (!IsColumnarTableAmTable(relationId))
1717 {
1718 elog(ERROR, "relation \"%s\" is not a columnar table",
1719 RelationGetRelationName(relation));
1720 }
1721
1722 uint64 storageId = ColumnarStorageGetStorageId(relation, false);
1723
1724 relation_close(relation, AccessShareLock);
1725
1726 PG_RETURN_INT64(storageId);
1727 }
1728
1729
1730 /*
1731 * ColumnarStorageUpdateIfNeeded - upgrade columnar storage to the current version by
1732 * using information from the metadata tables.
1733 */
1734 void
ColumnarStorageUpdateIfNeeded(Relation rel,bool isUpgrade)1735 ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
1736 {
1737 if (ColumnarStorageIsCurrent(rel))
1738 {
1739 return;
1740 }
1741
1742 RelationOpenSmgr(rel);
1743 BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
1744 if (nblocks < 2)
1745 {
1746 ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId());
1747 return;
1748 }
1749
1750 uint64 storageId = ColumnarStorageGetStorageId(rel, true);
1751
1752 uint64 highestId;
1753 uint64 highestOffset;
1754 GetHighestUsedAddressAndId(storageId, &highestOffset, &highestId);
1755
1756 uint64 reservedStripeId = highestId + 1;
1757 uint64 reservedOffset = highestOffset + 1;
1758 uint64 reservedRowNumber = GetHighestUsedRowNumber(storageId) + 1;
1759 ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId,
1760 reservedRowNumber, reservedOffset);
1761 }
1762
1763
1764 /*
1765 * GetHighestUsedRowNumber returns the highest used rowNumber for given
1766 * storageId. Returns COLUMNAR_INVALID_ROW_NUMBER if storage with
1767 * storageId has no stripes.
1768 * Note that normally we would use ColumnarStorageGetReservedRowNumber
1769 * to decide that. However, this function is designed to be used when
1770 * building the metapage itself during upgrades.
1771 */
1772 static uint64
GetHighestUsedRowNumber(uint64 storageId)1773 GetHighestUsedRowNumber(uint64 storageId)
1774 {
1775 uint64 highestRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
1776
1777 List *stripeMetadataList = ReadDataFileStripeList(storageId,
1778 GetTransactionSnapshot());
1779 StripeMetadata *stripeMetadata = NULL;
1780 foreach_ptr(stripeMetadata, stripeMetadataList)
1781 {
1782 highestRowNumber = Max(highestRowNumber,
1783 StripeGetHighestRowNumber(stripeMetadata));
1784 }
1785
1786 return highestRowNumber;
1787 }
1788