1 /*-------------------------------------------------------------------------
2 *
3 * columnar_reader.c
4 *
5 * This file contains function definitions for reading columnar tables. This
6 * includes the logic for reading file level metadata, reading row stripes,
7 * and skipping unrelated row chunks and columns.
8 *
9 * Copyright (c) 2016, Citus Data, Inc.
10 *
11 * $Id$
12 *
13 *-------------------------------------------------------------------------
14 */
15
16
17 #include "postgres.h"
18
19 #include "safe_lib.h"
20
21 #include "access/nbtree.h"
22 #include "access/xact.h"
23 #include "catalog/pg_am.h"
24 #include "commands/defrem.h"
25 #include "distributed/listutils.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/optimizer.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/restrictinfo.h"
31 #include "storage/fd.h"
32 #include "utils/guc.h"
33 #include "utils/memutils.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36
37 #include "columnar/columnar.h"
38 #include "columnar/columnar_storage.h"
39 #include "columnar/columnar_tableam.h"
40 #include "columnar/columnar_version_compat.h"
41
/*
 * Error message raised when the reader encounters a stripe whose write state
 * is not STRIPE_WRITE_FLUSHED in a situation where it does not expect one.
 * Format arguments: relation name (%s), stripe id (UINT64_FORMAT).
 */
#define UNEXPECTED_STRIPE_READ_ERR_MSG \
	"attempted to read an unexpected stripe while reading " \
	"columnar table %s, stripe with id=" UINT64_FORMAT " is not flushed"
45
46 typedef struct ChunkGroupReadState
47 {
48 int64 currentRow;
49 int64 rowCount;
50 int columnCount;
51 List *projectedColumnList; /* borrowed reference */
52 ChunkData *chunkGroupData;
53 } ChunkGroupReadState;
54
55 typedef struct StripeReadState
56 {
57 int columnCount;
58 int64 rowCount;
59 int64 currentRow;
60 TupleDesc tupleDescriptor;
61 Relation relation;
62 int chunkGroupIndex;
63 int64 chunkGroupsFiltered;
64 MemoryContext stripeReadContext;
65 StripeBuffers *stripeBuffers; /* allocated in stripeReadContext */
66 List *projectedColumnList; /* borrowed reference */
67 ChunkGroupReadState *chunkGroupReadState; /* owned */
68 } StripeReadState;
69
70 struct ColumnarReadState
71 {
72 TupleDesc tupleDescriptor;
73 Relation relation;
74
75 StripeMetadata *currentStripeMetadata;
76 StripeReadState *stripeReadState;
77
78 /*
79 * Integer list of attribute numbers (1-indexed) for columns needed by the
80 * query.
81 */
82 List *projectedColumnList;
83
84 List *whereClauseList;
85 List *whereClauseVars;
86
87 MemoryContext stripeReadContext;
88 int64 chunkGroupsFiltered;
89
90 /*
91 * Memory context guaranteed to be not freed during scan so we can
92 * safely use for any memory allocations regarding ColumnarReadState
93 * itself.
94 */
95 MemoryContext scanContext;
96
97 Snapshot snapshot;
98 bool snapshotRegisteredByUs;
99 };
100
101 /* static function declarations */
102 static MemoryContext CreateStripeReadMemoryContext(void);
103 static bool ColumnarReadIsCurrentStripe(ColumnarReadState *readState,
104 uint64 rowNumber);
105 static StripeMetadata * ColumnarReadGetCurrentStripe(ColumnarReadState *readState);
106 static void ReadStripeRowByRowNumber(ColumnarReadState *readState,
107 uint64 rowNumber, Datum *columnValues,
108 bool *columnNulls);
109 static bool StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState,
110 int chunkGroupIndex);
111 static void ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
112 StripeMetadata *stripeMetadata,
113 uint64 stripeRowOffset, Datum *columnValues,
114 bool *columnNulls);
115 static bool StripeReadInProgress(ColumnarReadState *readState);
116 static bool HasUnreadStripe(ColumnarReadState *readState);
117 static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
118 TupleDesc tupleDesc, List *projectedColumnList,
119 List *whereClauseList, List *whereClauseVars,
120 MemoryContext stripeReadContext,
121 Snapshot snapshot);
122 static void AdvanceStripeRead(ColumnarReadState *readState);
123 static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot);
124 static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
125 bool *columnNulls);
126 static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
127 chunkIndex,
128 TupleDesc tupleDesc,
129 List *projectedColumnList,
130 MemoryContext cxt);
131 static void EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState);
132 static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState,
133 Datum *columnValues,
134 bool *columnNulls);
135 static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
136 StripeMetadata *stripeMetadata,
137 TupleDesc tupleDescriptor,
138 List *projectedColumnList,
139 List *whereClauseList,
140 List *whereClauseVars,
141 int64 *chunkGroupsFiltered,
142 Snapshot snapshot);
143 static ColumnBuffers * LoadColumnBuffers(Relation relation,
144 ColumnChunkSkipNode *chunkSkipNodeArray,
145 uint32 chunkCount, uint64 stripeOffset,
146 Form_pg_attribute attributeForm);
147 static bool * SelectedChunkMask(StripeSkipList *stripeSkipList,
148 List *whereClauseList, List *whereClauseVars,
149 int64 *chunkGroupsFiltered);
150 static Node * BuildBaseConstraint(Var *variable);
151 static List * GetClauseVars(List *clauses, int natts);
152 static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
153 static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
154 static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
155 static StripeSkipList * SelectedChunkSkipList(StripeSkipList *stripeSkipList,
156 bool *projectedColumnMask,
157 bool *selectedChunkMask);
158 static uint32 StripeSkipListRowCount(StripeSkipList *stripeSkipList);
159 static bool * ProjectedColumnMask(uint32 columnCount, List *projectedColumnList);
160 static void DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
161 uint32 boolArrayLength);
162 static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
163 uint32 datumCount, bool datumTypeByValue,
164 int datumTypeLength, char datumTypeAlign,
165 Datum *datumArray);
166 static ChunkData * DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
167 uint32 rowCount, TupleDesc tupleDescriptor,
168 List *projectedColumnList);
169 static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
170 Form_pg_attribute attributeForm);
171
172 /*
173 * ColumnarBeginRead initializes a columnar read operation. This function returns a
174 * read handle that's used during reading rows and finishing the read operation.
175 *
176 * projectedColumnList is an integer list of attribute numbers (1-indexed).
177 */
178 ColumnarReadState *
ColumnarBeginRead(Relation relation,TupleDesc tupleDescriptor,List * projectedColumnList,List * whereClauseList,MemoryContext scanContext,Snapshot snapshot,bool randomAccess)179 ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
180 List *projectedColumnList, List *whereClauseList,
181 MemoryContext scanContext, Snapshot snapshot,
182 bool randomAccess)
183 {
184 /*
185 * We allocate all stripe specific data in the stripeReadContext, and reset
186 * this memory context before loading a new stripe. This is to avoid memory
187 * leaks.
188 */
189 MemoryContext stripeReadContext = CreateStripeReadMemoryContext();
190
191 ColumnarReadState *readState = palloc0(sizeof(ColumnarReadState));
192 readState->relation = relation;
193 readState->projectedColumnList = projectedColumnList;
194 readState->whereClauseList = whereClauseList;
195 readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
196 readState->chunkGroupsFiltered = 0;
197 readState->tupleDescriptor = tupleDescriptor;
198 readState->stripeReadContext = stripeReadContext;
199 readState->stripeReadState = NULL;
200 readState->scanContext = scanContext;
201
202 /*
203 * Note that ColumnarReadFlushPendingWrites might update those two by
204 * registering a new snapshot.
205 */
206 readState->snapshot = snapshot;
207 readState->snapshotRegisteredByUs = false;
208
209 if (!randomAccess)
210 {
211 /*
212 * When doing random access (i.e.: index scan), we don't need to flush
213 * pending writes until we need to read them.
214 * columnar_index_fetch_tuple would do so when needed.
215 */
216 ColumnarReadFlushPendingWrites(readState);
217
218 /*
219 * AdvanceStripeRead sets currentStripeMetadata for the first stripe
220 * to read if not doing random access. Otherwise, reader (i.e.:
221 * ColumnarReadRowByRowNumber) would already decide the stripe to read
222 * on-the-fly.
223 *
224 * Moreover, Since we don't flush pending writes for random access,
225 * AdvanceStripeRead might encounter with stripe metadata entries due
226 * to current transaction's pending writes even when using an MVCC
227 * snapshot, but AdvanceStripeRead would throw an error for that.
228 * Note that this is not the case with for plain table scan methods
229 * (i.e.: SeqScan and Columnar CustomScan).
230 *
231 * For those reasons, we don't call AdvanceStripeRead if we will do
232 * random access.
233 */
234 AdvanceStripeRead(readState);
235 }
236
237 return readState;
238 }
239
240
241 /*
242 * ColumnarReadFlushPendingWrites flushes pending writes for read operation
243 * and sets a new (registered) snapshot if necessary.
244 *
245 * If it sets a new snapshot, then sets snapshotRegisteredByUs to true to
246 * indicate that caller should unregister the snapshot after finishing read
247 * operation.
248 *
249 * Note that this function assumes that readState's relation and snapshot
250 * fields are already set.
251 */
252 void
ColumnarReadFlushPendingWrites(ColumnarReadState * readState)253 ColumnarReadFlushPendingWrites(ColumnarReadState *readState)
254 {
255 Assert(!readState->snapshotRegisteredByUs);
256
257 Oid relfilenode = readState->relation->rd_node.relNode;
258 FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
259
260 if (readState->snapshot == InvalidSnapshot || !IsMVCCSnapshot(readState->snapshot))
261 {
262 return;
263 }
264
265 /*
266 * If we flushed any pending writes, then we should guarantee that
267 * those writes are visible to us too. For this reason, if given
268 * snapshot is an MVCC snapshot, then we set its curcid to current
269 * command id.
270 *
271 * For simplicity, we do that even if we didn't flush any writes
272 * since we don't see any problem with that.
273 *
274 * XXX: We should either not update cid if we are executing a FETCH
275 * (from cursor) command, or we should have a better way to deal with
276 * pending writes, see the discussion in
277 * https://github.com/citusdata/citus/issues/5231.
278 */
279 PushCopiedSnapshot(readState->snapshot);
280
281 /* now our snapshot is the active one */
282 UpdateActiveSnapshotCommandId();
283 Snapshot newSnapshot = GetActiveSnapshot();
284 RegisterSnapshot(newSnapshot);
285
286 /*
287 * To be able to use UpdateActiveSnapshotCommandId, we pushed the
288 * copied snapshot to the stack. However, we don't need to keep it
289 * there since we will anyway rely on ColumnarReadState->snapshot
290 * during read operation.
291 *
292 * Note that since we registered the snapshot already, we guarantee
293 * that PopActiveSnapshot won't free it.
294 */
295 PopActiveSnapshot();
296
297 readState->snapshot = newSnapshot;
298
299 /* not forget to unregister it when finishing read operation */
300 readState->snapshotRegisteredByUs = true;
301 }
302
303
304 /*
305 * CreateStripeReadMemoryContext creates a memory context to be used when
306 * reading a stripe.
307 */
308 static MemoryContext
CreateStripeReadMemoryContext()309 CreateStripeReadMemoryContext()
310 {
311 return AllocSetContextCreate(CurrentMemoryContext, "Stripe Read Memory Context",
312 ALLOCSET_DEFAULT_SIZES);
313 }
314
315
316 /*
317 * ColumnarReadNextRow tries to read a row from the columnar table. On success, it sets
318 * column values, column nulls and rowNumber (if passed to be non-NULL), and returns true.
319 * If there are no more rows to read, the function returns false.
320 */
321 bool
ColumnarReadNextRow(ColumnarReadState * readState,Datum * columnValues,bool * columnNulls,uint64 * rowNumber)322 ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *columnNulls,
323 uint64 *rowNumber)
324 {
325 while (true)
326 {
327 if (!StripeReadInProgress(readState))
328 {
329 if (!HasUnreadStripe(readState))
330 {
331 return false;
332 }
333
334 readState->stripeReadState = BeginStripeRead(readState->currentStripeMetadata,
335 readState->relation,
336 readState->tupleDescriptor,
337 readState->projectedColumnList,
338 readState->whereClauseList,
339 readState->whereClauseVars,
340 readState->stripeReadContext,
341 readState->snapshot);
342 }
343
344 if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
345 {
346 AdvanceStripeRead(readState);
347 continue;
348 }
349
350 if (rowNumber)
351 {
352 *rowNumber = readState->currentStripeMetadata->firstRowNumber +
353 readState->stripeReadState->currentRow - 1;
354 }
355
356 return true;
357 }
358
359 return false;
360 }
361
362
363 /*
364 * ColumnarReadRowByRowNumberOrError is a wrapper around
365 * ColumnarReadRowByRowNumber that throws an error if tuple
366 * with rowNumber does not exist.
367 */
368 void
ColumnarReadRowByRowNumberOrError(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)369 ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
370 uint64 rowNumber, Datum *columnValues,
371 bool *columnNulls)
372 {
373 if (!ColumnarReadRowByRowNumber(readState, rowNumber,
374 columnValues, columnNulls))
375 {
376 ereport(ERROR, (errmsg("cannot read from columnar table %s, tuple with "
377 "row number " UINT64_FORMAT " does not exist",
378 RelationGetRelationName(readState->relation),
379 rowNumber)));
380 }
381 }
382
383
384 /*
385 * ColumnarReadRowByRowNumber reads row with rowNumber from given relation
386 * into columnValues and columnNulls, and returns true. If no such row
387 * exists, then returns false.
388 */
389 bool
ColumnarReadRowByRowNumber(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)390 ColumnarReadRowByRowNumber(ColumnarReadState *readState,
391 uint64 rowNumber, Datum *columnValues,
392 bool *columnNulls)
393 {
394 if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
395 {
396 Relation columnarRelation = readState->relation;
397 Snapshot snapshot = readState->snapshot;
398 StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
399 rowNumber, snapshot);
400 if (stripeMetadata == NULL)
401 {
402 /* no such row exists */
403 return false;
404 }
405
406 if (StripeWriteState(stripeMetadata) != STRIPE_WRITE_FLUSHED)
407 {
408 /*
409 * Callers are expected to skip stripes that are not flushed to
410 * disk yet or should wait for the writer xact to commit or abort,
411 * but let's be on the safe side.
412 */
413 ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
414 RelationGetRelationName(columnarRelation),
415 stripeMetadata->id)));
416 }
417
418 /* do the cleanup before reading a new stripe */
419 ColumnarResetRead(readState);
420
421 TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation);
422 List *whereClauseList = NIL;
423 List *whereClauseVars = NIL;
424 MemoryContext stripeReadContext = readState->stripeReadContext;
425 readState->stripeReadState = BeginStripeRead(stripeMetadata,
426 columnarRelation,
427 relationTupleDesc,
428 readState->projectedColumnList,
429 whereClauseList,
430 whereClauseVars,
431 stripeReadContext,
432 snapshot);
433
434 readState->currentStripeMetadata = stripeMetadata;
435 }
436
437 ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
438
439 return true;
440 }
441
442
443 /*
444 * ColumnarReadIsCurrentStripe returns true if stripe being read contains
445 * row with given rowNumber.
446 */
447 static bool
ColumnarReadIsCurrentStripe(ColumnarReadState * readState,uint64 rowNumber)448 ColumnarReadIsCurrentStripe(ColumnarReadState *readState, uint64 rowNumber)
449 {
450 if (!StripeReadInProgress(readState))
451 {
452 return false;
453 }
454
455 StripeMetadata *currentStripeMetadata = readState->currentStripeMetadata;
456 if (rowNumber >= currentStripeMetadata->firstRowNumber &&
457 rowNumber <= StripeGetHighestRowNumber(currentStripeMetadata))
458 {
459 return true;
460 }
461
462 return false;
463 }
464
465
466 /*
467 * ColumnarReadGetCurrentStripe returns StripeMetadata for the stripe that is
468 * being read.
469 */
470 static StripeMetadata *
ColumnarReadGetCurrentStripe(ColumnarReadState * readState)471 ColumnarReadGetCurrentStripe(ColumnarReadState *readState)
472 {
473 return readState->currentStripeMetadata;
474 }
475
476
477 /*
478 * ReadStripeRowByRowNumber reads row with rowNumber from given
479 * stripeReadState into columnValues and columnNulls.
480 * Errors out if no such row exists in the stripe being read.
481 */
482 static void
ReadStripeRowByRowNumber(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)483 ReadStripeRowByRowNumber(ColumnarReadState *readState,
484 uint64 rowNumber, Datum *columnValues,
485 bool *columnNulls)
486 {
487 StripeMetadata *stripeMetadata = ColumnarReadGetCurrentStripe(readState);
488 StripeReadState *stripeReadState = readState->stripeReadState;
489
490 if (rowNumber < stripeMetadata->firstRowNumber)
491 {
492 /* not expected but be on the safe side */
493 ereport(ERROR, (errmsg("row offset cannot be negative")));
494 }
495
496 /* find the exact chunk group to be read */
497 uint64 stripeRowOffset = rowNumber - stripeMetadata->firstRowNumber;
498 int chunkGroupIndex = stripeRowOffset / stripeMetadata->chunkGroupRowCount;
499 if (!StripeReadIsCurrentChunkGroup(stripeReadState, chunkGroupIndex))
500 {
501 if (stripeReadState->chunkGroupReadState)
502 {
503 EndChunkGroupRead(stripeReadState->chunkGroupReadState);
504 }
505
506 stripeReadState->chunkGroupIndex = chunkGroupIndex;
507 stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
508 stripeReadState->stripeBuffers,
509 stripeReadState->chunkGroupIndex,
510 stripeReadState->tupleDescriptor,
511 stripeReadState->projectedColumnList,
512 stripeReadState->stripeReadContext);
513 }
514
515 ReadChunkGroupRowByRowOffset(stripeReadState->chunkGroupReadState,
516 stripeMetadata, stripeRowOffset,
517 columnValues, columnNulls);
518 }
519
520
521 /*
522 * StripeReadIsCurrentChunkGroup returns true if chunk group being read is
523 * the has given chunkGroupIndex in its stripe.
524 */
525 static bool
StripeReadIsCurrentChunkGroup(StripeReadState * stripeReadState,int chunkGroupIndex)526 StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState, int chunkGroupIndex)
527 {
528 if (!stripeReadState->chunkGroupReadState)
529 {
530 return false;
531 }
532
533 return (stripeReadState->chunkGroupIndex == chunkGroupIndex);
534 }
535
536
537 /*
538 * ReadChunkGroupRowByRowOffset reads row with stripeRowOffset from given
539 * chunkGroupReadState into columnValues and columnNulls.
540 * Errors out if no such row exists in the chunk group being read.
541 */
542 static void
ReadChunkGroupRowByRowOffset(ChunkGroupReadState * chunkGroupReadState,StripeMetadata * stripeMetadata,uint64 stripeRowOffset,Datum * columnValues,bool * columnNulls)543 ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
544 StripeMetadata *stripeMetadata,
545 uint64 stripeRowOffset, Datum *columnValues,
546 bool *columnNulls)
547 {
548 /* set the exact row number to be read from given chunk roup */
549 chunkGroupReadState->currentRow = stripeRowOffset %
550 stripeMetadata->chunkGroupRowCount;
551 if (!ReadChunkGroupNextRow(chunkGroupReadState, columnValues, columnNulls))
552 {
553 /* not expected but be on the safe side */
554 ereport(ERROR, (errmsg("could not find the row in stripe")));
555 }
556 }
557
558
559 /*
560 * StripeReadInProgress returns true if we already started reading a stripe.
561 */
562 static bool
StripeReadInProgress(ColumnarReadState * readState)563 StripeReadInProgress(ColumnarReadState *readState)
564 {
565 return readState->stripeReadState != NULL;
566 }
567
568
569 /*
570 * HasUnreadStripe returns true if we still have stripes to read during current
571 * read operation.
572 */
573 static bool
HasUnreadStripe(ColumnarReadState * readState)574 HasUnreadStripe(ColumnarReadState *readState)
575 {
576 return readState->currentStripeMetadata != NULL;
577 }
578
579
580 /*
581 * ColumnarRescan clears the position where we were scanning so that the next read starts at
582 * the beginning again
583 */
584 void
ColumnarRescan(ColumnarReadState * readState,List * scanQual)585 ColumnarRescan(ColumnarReadState *readState, List *scanQual)
586 {
587 MemoryContext oldContext = MemoryContextSwitchTo(readState->scanContext);
588
589 ColumnarResetRead(readState);
590
591 /* set currentStripeMetadata for the first stripe to read */
592 AdvanceStripeRead(readState);
593
594 readState->chunkGroupsFiltered = 0;
595
596 readState->whereClauseList = copyObject(scanQual);
597 MemoryContextSwitchTo(oldContext);
598 }
599
600
601 /*
602 * Finishes a columnar read operation.
603 */
604 void
ColumnarEndRead(ColumnarReadState * readState)605 ColumnarEndRead(ColumnarReadState *readState)
606 {
607 if (readState->snapshotRegisteredByUs)
608 {
609 /*
610 * init_columnar_read_state created a new snapshot and registered it,
611 * so now forget it.
612 */
613 UnregisterSnapshot(readState->snapshot);
614 }
615
616 MemoryContextDelete(readState->stripeReadContext);
617 if (readState->currentStripeMetadata)
618 {
619 pfree(readState->currentStripeMetadata);
620 }
621
622 pfree(readState);
623 }
624
625
626 /*
627 * ColumnarResetRead resets the stripe and the chunk group that is
628 * being read currently (if any).
629 */
630 void
ColumnarResetRead(ColumnarReadState * readState)631 ColumnarResetRead(ColumnarReadState *readState)
632 {
633 if (StripeReadInProgress(readState))
634 {
635 pfree(readState->currentStripeMetadata);
636 readState->currentStripeMetadata = NULL;
637
638 readState->stripeReadState = NULL;
639 MemoryContextReset(readState->stripeReadContext);
640 }
641 }
642
643
644 /*
645 * BeginStripeRead allocates state for reading a stripe.
646 */
647 static StripeReadState *
BeginStripeRead(StripeMetadata * stripeMetadata,Relation rel,TupleDesc tupleDesc,List * projectedColumnList,List * whereClauseList,List * whereClauseVars,MemoryContext stripeReadContext,Snapshot snapshot)648 BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
649 List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
650 MemoryContext stripeReadContext, Snapshot snapshot)
651 {
652 MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
653
654 StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState));
655
656 stripeReadState->relation = rel;
657 stripeReadState->tupleDescriptor = tupleDesc;
658 stripeReadState->columnCount = tupleDesc->natts;
659 stripeReadState->chunkGroupReadState = NULL;
660 stripeReadState->projectedColumnList = projectedColumnList;
661 stripeReadState->stripeReadContext = stripeReadContext;
662
663 stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
664 stripeMetadata,
665 tupleDesc,
666 projectedColumnList,
667 whereClauseList,
668 whereClauseVars,
669 &stripeReadState->
670 chunkGroupsFiltered,
671 snapshot);
672
673 stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
674
675 MemoryContextSwitchTo(oldContext);
676
677
678 return stripeReadState;
679 }
680
681
682 /*
683 * AdvanceStripeRead updates chunkGroupsFiltered and sets
684 * currentStripeMetadata for next stripe read.
685 */
686 static void
AdvanceStripeRead(ColumnarReadState * readState)687 AdvanceStripeRead(ColumnarReadState *readState)
688 {
689 MemoryContext oldContext = MemoryContextSwitchTo(readState->scanContext);
690
691 /* if not read any stripes yet, start from the first one .. */
692 uint64 lastReadRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
693 if (StripeReadInProgress(readState))
694 {
695 /* .. otherwise, continue with the next stripe */
696 lastReadRowNumber = StripeGetHighestRowNumber(readState->currentStripeMetadata);
697
698 readState->chunkGroupsFiltered +=
699 readState->stripeReadState->chunkGroupsFiltered;
700 }
701
702 readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
703 lastReadRowNumber,
704 readState->snapshot);
705
706 if (readState->currentStripeMetadata &&
707 StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED &&
708 !SnapshotMightSeeUnflushedStripes(readState->snapshot))
709 {
710 /*
711 * To be on the safe side, error out if we don't expect to encounter
712 * with an un-flushed stripe. Otherwise, we will skip such stripes
713 * until finding a flushed one.
714 */
715 ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
716 RelationGetRelationName(readState->relation),
717 readState->currentStripeMetadata->id)));
718 }
719
720 while (readState->currentStripeMetadata &&
721 StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED)
722 {
723 readState->currentStripeMetadata =
724 FindNextStripeByRowNumber(readState->relation,
725 readState->currentStripeMetadata->firstRowNumber,
726 readState->snapshot);
727 }
728
729 readState->stripeReadState = NULL;
730 MemoryContextReset(readState->stripeReadContext);
731
732 MemoryContextSwitchTo(oldContext);
733 }
734
735
736 /*
737 * SnapshotMightSeeUnflushedStripes returns true if given snapshot is
738 * expected to see un-flushed stripes either because of other backends'
739 * pending writes or aborted transactions.
740 */
741 static bool
SnapshotMightSeeUnflushedStripes(Snapshot snapshot)742 SnapshotMightSeeUnflushedStripes(Snapshot snapshot)
743 {
744 if (snapshot == InvalidSnapshot)
745 {
746 return false;
747 }
748
749 switch (snapshot->snapshot_type)
750 {
751 case SNAPSHOT_ANY:
752 case SNAPSHOT_DIRTY:
753 case SNAPSHOT_NON_VACUUMABLE:
754 {
755 return true;
756 }
757
758 default:
759 return false;
760 }
761 }
762
763
764 /*
765 * ReadStripeNextRow: If more rows can be read from the current stripe, fill
766 * in non-NULL columnValues and return true. Otherwise, return false.
767 *
768 * On entry, all entries in columnNulls should be true; this function only
769 * sets non-NULL entries.
770 *
771 */
772 static bool
ReadStripeNextRow(StripeReadState * stripeReadState,Datum * columnValues,bool * columnNulls)773 ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
774 bool *columnNulls)
775 {
776 if (stripeReadState->currentRow >= stripeReadState->rowCount)
777 {
778 Assert(stripeReadState->currentRow == stripeReadState->rowCount);
779 return false;
780 }
781
782 while (true)
783 {
784 if (stripeReadState->chunkGroupReadState == NULL)
785 {
786 stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
787 stripeReadState->stripeBuffers,
788 stripeReadState->
789 chunkGroupIndex,
790 stripeReadState->
791 tupleDescriptor,
792 stripeReadState->
793 projectedColumnList,
794 stripeReadState->
795 stripeReadContext);
796 }
797
798 if (!ReadChunkGroupNextRow(stripeReadState->chunkGroupReadState, columnValues,
799 columnNulls))
800 {
801 /* if this chunk group is exhausted, fetch the next one and loop */
802 EndChunkGroupRead(stripeReadState->chunkGroupReadState);
803 stripeReadState->chunkGroupReadState = NULL;
804 stripeReadState->chunkGroupIndex++;
805 continue;
806 }
807
808 stripeReadState->currentRow++;
809 return true;
810 }
811
812 Assert(stripeReadState->currentRow == stripeReadState->rowCount);
813 return false;
814 }
815
816
817 /*
818 * BeginChunkGroupRead allocates state for reading a chunk.
819 */
820 static ChunkGroupReadState *
BeginChunkGroupRead(StripeBuffers * stripeBuffers,int chunkIndex,TupleDesc tupleDesc,List * projectedColumnList,MemoryContext cxt)821 BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
822 List *projectedColumnList, MemoryContext cxt)
823 {
824 uint32 chunkGroupRowCount =
825 stripeBuffers->selectedChunkGroupRowCounts[chunkIndex];
826
827 MemoryContext oldContext = MemoryContextSwitchTo(cxt);
828
829 ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState));
830
831 chunkGroupReadState->currentRow = 0;
832 chunkGroupReadState->rowCount = chunkGroupRowCount;
833 chunkGroupReadState->columnCount = tupleDesc->natts;
834 chunkGroupReadState->projectedColumnList = projectedColumnList;
835
836 chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex,
837 chunkGroupRowCount,
838 tupleDesc,
839 projectedColumnList);
840 MemoryContextSwitchTo(oldContext);
841
842 return chunkGroupReadState;
843 }
844
845
846 /*
847 * EndChunkRead finishes a chunk read.
848 */
849 static void
EndChunkGroupRead(ChunkGroupReadState * chunkGroupReadState)850 EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState)
851 {
852 FreeChunkData(chunkGroupReadState->chunkGroupData);
853 pfree(chunkGroupReadState);
854 }
855
856
857 /*
858 * ReadChunkGroupNextRow: if more rows can be read from the current chunk
859 * group, fill in non-NULL columnValues and return true. Otherwise, return
860 * false.
861 *
862 * On entry, all entries in columnNulls should be true; this function only
863 * sets non-NULL entries.
864 */
865 static bool
ReadChunkGroupNextRow(ChunkGroupReadState * chunkGroupReadState,Datum * columnValues,bool * columnNulls)866 ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, Datum *columnValues,
867 bool *columnNulls)
868 {
869 if (chunkGroupReadState->currentRow >= chunkGroupReadState->rowCount)
870 {
871 Assert(chunkGroupReadState->currentRow == chunkGroupReadState->rowCount);
872 return false;
873 }
874
875 /*
876 * Initialize to all-NULL. Only non-NULL projected attributes will be set.
877 */
878 memset(columnNulls, true, sizeof(bool) * chunkGroupReadState->columnCount);
879
880 int attno;
881 foreach_int(attno, chunkGroupReadState->projectedColumnList)
882 {
883 const ChunkData *chunkGroupData = chunkGroupReadState->chunkGroupData;
884 const int rowIndex = chunkGroupReadState->currentRow;
885
886 /* attno is 1-indexed; existsArray is 0-indexed */
887 const uint32 columnIndex = attno - 1;
888
889 if (chunkGroupData->existsArray[columnIndex][rowIndex])
890 {
891 columnValues[columnIndex] = chunkGroupData->valueArray[columnIndex][rowIndex];
892 columnNulls[columnIndex] = false;
893 }
894 }
895
896 chunkGroupReadState->currentRow++;
897 return true;
898 }
899
900
901 /*
902 * ColumnarReadChunkGroupsFiltered
903 *
904 * Return the number of chunk groups filtered during this read operation.
905 */
906 int64
ColumnarReadChunkGroupsFiltered(ColumnarReadState * state)907 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state)
908 {
909 return state->chunkGroupsFiltered;
910 }
911
912
913 /*
914 * CreateEmptyChunkDataArray creates data buffers to keep deserialized exist and
915 * value arrays for requested columns in columnMask.
916 */
917 ChunkData *
CreateEmptyChunkData(uint32 columnCount,bool * columnMask,uint32 chunkGroupRowCount)918 CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowCount)
919 {
920 uint32 columnIndex = 0;
921
922 ChunkData *chunkData = palloc0(sizeof(ChunkData));
923 chunkData->existsArray = palloc0(columnCount * sizeof(bool *));
924 chunkData->valueArray = palloc0(columnCount * sizeof(Datum *));
925 chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo));
926 chunkData->columnCount = columnCount;
927 chunkData->rowCount = chunkGroupRowCount;
928
929 /* allocate chunk memory for deserialized data */
930 for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
931 {
932 if (columnMask[columnIndex])
933 {
934 chunkData->existsArray[columnIndex] = palloc0(chunkGroupRowCount *
935 sizeof(bool));
936 chunkData->valueArray[columnIndex] = palloc0(chunkGroupRowCount *
937 sizeof(Datum));
938 chunkData->valueBufferArray[columnIndex] = NULL;
939 }
940 }
941
942 return chunkData;
943 }
944
945
946 /*
947 * FreeChunkData deallocates data buffers to keep deserialized exist and
948 * value arrays for requested columns in columnMask.
949 * ColumnChunkData->serializedValueBuffer lives in memory read/write context
950 * so it is deallocated automatically when the context is deleted.
951 */
952 void
FreeChunkData(ChunkData * chunkData)953 FreeChunkData(ChunkData *chunkData)
954 {
955 uint32 columnIndex = 0;
956
957 if (chunkData == NULL)
958 {
959 return;
960 }
961
962 for (columnIndex = 0; columnIndex < chunkData->columnCount; columnIndex++)
963 {
964 if (chunkData->existsArray[columnIndex] != NULL)
965 {
966 pfree(chunkData->existsArray[columnIndex]);
967 }
968
969 if (chunkData->valueArray[columnIndex] != NULL)
970 {
971 pfree(chunkData->valueArray[columnIndex]);
972 }
973 }
974
975 pfree(chunkData->existsArray);
976 pfree(chunkData->valueArray);
977 pfree(chunkData);
978 }
979
980
981 /* ColumnarTableRowCount returns the exact row count of a table using skiplists */
982 uint64
ColumnarTableRowCount(Relation relation)983 ColumnarTableRowCount(Relation relation)
984 {
985 ListCell *stripeMetadataCell = NULL;
986 uint64 totalRowCount = 0;
987 List *stripeList = StripesForRelfilenode(relation->rd_node);
988
989 foreach(stripeMetadataCell, stripeList)
990 {
991 StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
992 totalRowCount += stripeMetadata->rowCount;
993 }
994
995 return totalRowCount;
996 }
997
998
999 /*
1000 * LoadFilteredStripeBuffers reads serialized stripe data from the given file.
1001 * The function skips over chunks whose rows are refuted by restriction qualifiers,
1002 * and only loads columns that are projected in the query.
1003 */
1004 static StripeBuffers *
LoadFilteredStripeBuffers(Relation relation,StripeMetadata * stripeMetadata,TupleDesc tupleDescriptor,List * projectedColumnList,List * whereClauseList,List * whereClauseVars,int64 * chunkGroupsFiltered,Snapshot snapshot)1005 LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
1006 TupleDesc tupleDescriptor, List *projectedColumnList,
1007 List *whereClauseList, List *whereClauseVars,
1008 int64 *chunkGroupsFiltered, Snapshot snapshot)
1009 {
1010 uint32 columnIndex = 0;
1011 uint32 columnCount = tupleDescriptor->natts;
1012
1013 bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
1014
1015 StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
1016 stripeMetadata->id,
1017 tupleDescriptor,
1018 stripeMetadata->chunkCount,
1019 snapshot);
1020
1021 bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
1022 whereClauseVars, chunkGroupsFiltered);
1023
1024 StripeSkipList *selectedChunkSkipList =
1025 SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
1026 selectedChunkMask);
1027
1028 /* load column data for projected columns */
1029 ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
1030
1031 for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++)
1032 {
1033 if (projectedColumnMask[columnIndex])
1034 {
1035 ColumnChunkSkipNode *chunkSkipNode =
1036 selectedChunkSkipList->chunkSkipNodeArray[columnIndex];
1037 Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
1038 uint32 chunkCount = selectedChunkSkipList->chunkCount;
1039
1040 ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, chunkSkipNode,
1041 chunkCount,
1042 stripeMetadata->fileOffset,
1043 attributeForm);
1044
1045 columnBuffersArray[columnIndex] = columnBuffers;
1046 }
1047 }
1048
1049 StripeBuffers *stripeBuffers = palloc0(sizeof(StripeBuffers));
1050 stripeBuffers->columnCount = columnCount;
1051 stripeBuffers->rowCount = StripeSkipListRowCount(selectedChunkSkipList);
1052 stripeBuffers->columnBuffersArray = columnBuffersArray;
1053 stripeBuffers->selectedChunkGroupRowCounts =
1054 selectedChunkSkipList->chunkGroupRowCounts;
1055
1056 return stripeBuffers;
1057 }
1058
1059
1060 /*
1061 * LoadColumnBuffers reads serialized column data from the given file. These
1062 * column data are laid out as sequential chunks in the file; and chunk positions
1063 * and lengths are retrieved from the column chunk skip node array.
1064 */
1065 static ColumnBuffers *
LoadColumnBuffers(Relation relation,ColumnChunkSkipNode * chunkSkipNodeArray,uint32 chunkCount,uint64 stripeOffset,Form_pg_attribute attributeForm)1066 LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
1067 uint32 chunkCount, uint64 stripeOffset,
1068 Form_pg_attribute attributeForm)
1069 {
1070 uint32 chunkIndex = 0;
1071 ColumnChunkBuffers **chunkBuffersArray =
1072 palloc0(chunkCount * sizeof(ColumnChunkBuffers *));
1073
1074 for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1075 {
1076 chunkBuffersArray[chunkIndex] = palloc0(sizeof(ColumnChunkBuffers));
1077 }
1078
1079 /*
1080 * We first read the "exists" chunks. We don't read "values" array here,
1081 * because "exists" chunks are stored sequentially on disk, and we want to
1082 * minimize disk seeks.
1083 */
1084 for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1085 {
1086 ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1087 uint64 existsOffset = stripeOffset + chunkSkipNode->existsChunkOffset;
1088 StringInfo rawExistsBuffer = makeStringInfo();
1089
1090 enlargeStringInfo(rawExistsBuffer, chunkSkipNode->existsLength);
1091 rawExistsBuffer->len = chunkSkipNode->existsLength;
1092 ColumnarStorageRead(relation, existsOffset, rawExistsBuffer->data,
1093 chunkSkipNode->existsLength);
1094
1095 chunkBuffersArray[chunkIndex]->existsBuffer = rawExistsBuffer;
1096 }
1097
1098 /* then read "values" chunks, which are also stored sequentially on disk */
1099 for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1100 {
1101 ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1102 CompressionType compressionType = chunkSkipNode->valueCompressionType;
1103 uint64 valueOffset = stripeOffset + chunkSkipNode->valueChunkOffset;
1104 StringInfo rawValueBuffer = makeStringInfo();
1105
1106 enlargeStringInfo(rawValueBuffer, chunkSkipNode->valueLength);
1107 rawValueBuffer->len = chunkSkipNode->valueLength;
1108 ColumnarStorageRead(relation, valueOffset, rawValueBuffer->data,
1109 chunkSkipNode->valueLength);
1110
1111 chunkBuffersArray[chunkIndex]->valueBuffer = rawValueBuffer;
1112 chunkBuffersArray[chunkIndex]->valueCompressionType = compressionType;
1113 chunkBuffersArray[chunkIndex]->decompressedValueSize =
1114 chunkSkipNode->decompressedValueSize;
1115 }
1116
1117 ColumnBuffers *columnBuffers = palloc0(sizeof(ColumnBuffers));
1118 columnBuffers->chunkBuffersArray = chunkBuffersArray;
1119
1120 return columnBuffers;
1121 }
1122
1123
1124 /*
1125 * SelectedChunkMask walks over each column's chunks and checks if a chunk can
1126 * be filtered without reading its data. The filtering happens when all rows in
1127 * the chunk can be refuted by the given qualifier conditions.
1128 */
1129 static bool *
SelectedChunkMask(StripeSkipList * stripeSkipList,List * whereClauseList,List * whereClauseVars,int64 * chunkGroupsFiltered)1130 SelectedChunkMask(StripeSkipList *stripeSkipList, List *whereClauseList,
1131 List *whereClauseVars, int64 *chunkGroupsFiltered)
1132 {
1133 ListCell *columnCell = NULL;
1134 uint32 chunkIndex = 0;
1135
1136 bool *selectedChunkMask = palloc0(stripeSkipList->chunkCount * sizeof(bool));
1137 memset(selectedChunkMask, true, stripeSkipList->chunkCount * sizeof(bool));
1138
1139 foreach(columnCell, whereClauseVars)
1140 {
1141 Var *column = lfirst(columnCell);
1142 uint32 columnIndex = column->varattno - 1;
1143
1144 /* if this column's data type doesn't have a comparator, skip it */
1145 FmgrInfo *comparisonFunction = GetFunctionInfoOrNull(column->vartype,
1146 BTREE_AM_OID,
1147 BTORDER_PROC);
1148 if (comparisonFunction == NULL)
1149 {
1150 continue;
1151 }
1152
1153 Node *baseConstraint = BuildBaseConstraint(column);
1154 for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1155 {
1156 ColumnChunkSkipNode *chunkSkipNodeArray =
1157 stripeSkipList->chunkSkipNodeArray[columnIndex];
1158 ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1159
1160 /*
1161 * A column chunk with comparable data type can miss min/max values
1162 * if all values in the chunk are NULL.
1163 */
1164 if (!chunkSkipNode->hasMinMax)
1165 {
1166 continue;
1167 }
1168
1169 UpdateConstraint(baseConstraint, chunkSkipNode->minimumValue,
1170 chunkSkipNode->maximumValue);
1171
1172 List *constraintList = list_make1(baseConstraint);
1173 bool predicateRefuted =
1174 predicate_refuted_by(constraintList, whereClauseList, false);
1175 if (predicateRefuted && selectedChunkMask[chunkIndex])
1176 {
1177 selectedChunkMask[chunkIndex] = false;
1178 *chunkGroupsFiltered += 1;
1179 }
1180 }
1181 }
1182
1183 return selectedChunkMask;
1184 }
1185
1186
1187 /*
1188 * GetFunctionInfoOrNull first resolves the operator for the given data type,
1189 * access method, and support procedure. The function then uses the resolved
1190 * operator's identifier to fill in a function manager object, and returns
1191 * this object. This function is based on a similar function from CitusDB's code.
1192 */
1193 FmgrInfo *
GetFunctionInfoOrNull(Oid typeId,Oid accessMethodId,int16 procedureId)1194 GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId)
1195 {
1196 FmgrInfo *functionInfo = NULL;
1197
1198 /* get default operator class from pg_opclass for datum type */
1199 Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
1200 if (operatorClassId == InvalidOid)
1201 {
1202 return NULL;
1203 }
1204
1205 Oid operatorFamilyId = get_opclass_family(operatorClassId);
1206 if (operatorFamilyId == InvalidOid)
1207 {
1208 return NULL;
1209 }
1210
1211 Oid operatorId = get_opfamily_proc(operatorFamilyId, typeId, typeId, procedureId);
1212 if (operatorId != InvalidOid)
1213 {
1214 functionInfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
1215
1216 /* fill in the FmgrInfo struct using the operatorId */
1217 fmgr_info(operatorId, functionInfo);
1218 }
1219
1220 return functionInfo;
1221 }
1222
1223
1224 /*
1225 * BuildBaseConstraint builds and returns a base constraint. This constraint
1226 * implements an expression in the form of (var <= max && var >= min), where
1227 * min and max values represent a chunk's min and max values. These chunk
1228 * values are filled in after the constraint is built. This function is based
1229 * on a similar function from CitusDB's shard pruning logic.
1230 */
1231 static Node *
BuildBaseConstraint(Var * variable)1232 BuildBaseConstraint(Var *variable)
1233 {
1234 OpExpr *lessThanExpr = MakeOpExpression(variable, BTLessEqualStrategyNumber);
1235 OpExpr *greaterThanExpr = MakeOpExpression(variable, BTGreaterEqualStrategyNumber);
1236
1237 Node *baseConstraint = make_and_qual((Node *) lessThanExpr, (Node *) greaterThanExpr);
1238
1239 return baseConstraint;
1240 }
1241
1242
1243 /*
1244 * GetClauseVars extracts the Vars from the given clauses for the purpose of
1245 * building constraints that can be refuted by predicate_refuted_by(). It also
1246 * deduplicates and sorts them.
1247 */
1248 static List *
GetClauseVars(List * whereClauseList,int natts)1249 GetClauseVars(List *whereClauseList, int natts)
1250 {
1251 /*
1252 * We don't recurse into or include aggregates, window functions, or
1253 * PHVs. We don't expect any PHVs during execution; and Vars found inside
1254 * an aggregate or window function aren't going to be useful in forming
1255 * constraints that can be refuted.
1256 */
1257 int flags = 0;
1258 List *vars = pull_var_clause((Node *) whereClauseList, flags);
1259 Var **deduplicate = palloc0(sizeof(Var *) * natts);
1260
1261 ListCell *lc;
1262 foreach(lc, vars)
1263 {
1264 Node *node = lfirst(lc);
1265 Assert(IsA(node, Var));
1266
1267 Var *var = (Var *) node;
1268 int idx = var->varattno - 1;
1269
1270 if (deduplicate[idx] != NULL)
1271 {
1272 /* if they have the same varattno, the rest should be identical */
1273 Assert(equal(var, deduplicate[idx]));
1274 }
1275
1276 deduplicate[idx] = var;
1277 }
1278
1279 List *whereClauseVars = NIL;
1280 for (int i = 0; i < natts; i++)
1281 {
1282 Var *var = deduplicate[i];
1283 if (var != NULL)
1284 {
1285 whereClauseVars = lappend(whereClauseVars, var);
1286 }
1287 }
1288
1289 pfree(deduplicate);
1290
1291 return whereClauseVars;
1292 }
1293
1294
1295 /*
1296 * MakeOpExpression builds an operator expression node. This operator expression
1297 * implements the operator clause as defined by the variable and the strategy
1298 * number. The function is copied from CitusDB's shard pruning logic.
1299 */
1300 static OpExpr *
MakeOpExpression(Var * variable,int16 strategyNumber)1301 MakeOpExpression(Var *variable, int16 strategyNumber)
1302 {
1303 Oid typeId = variable->vartype;
1304 Oid typeModId = variable->vartypmod;
1305 Oid collationId = variable->varcollid;
1306
1307 Oid accessMethodId = BTREE_AM_OID;
1308
1309 /* Load the operator from system catalogs */
1310 Oid operatorId = GetOperatorByType(typeId, accessMethodId, strategyNumber);
1311
1312 Const *constantValue = makeNullConst(typeId, typeModId, collationId);
1313
1314 /* Now make the expression with the given variable and a null constant */
1315 OpExpr *expression = (OpExpr *) make_opclause(operatorId,
1316 InvalidOid, /* no result type yet */
1317 false, /* no return set */
1318 (Expr *) variable,
1319 (Expr *) constantValue,
1320 InvalidOid, collationId);
1321
1322 /* Set implementing function id and result type */
1323 expression->opfuncid = get_opcode(operatorId);
1324 expression->opresulttype = get_func_rettype(expression->opfuncid);
1325
1326 return expression;
1327 }
1328
1329
1330 /*
1331 * GetOperatorByType returns operator Oid for the given type, access method,
1332 * and strategy number. Note that this function incorrectly errors out when
1333 * the given type doesn't have its own operator but can use another compatible
1334 * type's default operator. The function is copied from CitusDB's shard pruning
1335 * logic.
1336 */
1337 static Oid
GetOperatorByType(Oid typeId,Oid accessMethodId,int16 strategyNumber)1338 GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber)
1339 {
1340 /* Get default operator class from pg_opclass */
1341 Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
1342
1343 Oid operatorFamily = get_opclass_family(operatorClassId);
1344
1345 Oid operatorId = get_opfamily_member(operatorFamily, typeId, typeId, strategyNumber);
1346
1347 return operatorId;
1348 }
1349
1350
1351 /*
1352 * UpdateConstraint updates the base constraint with the given min/max values.
1353 * The function is copied from CitusDB's shard pruning logic.
1354 */
1355 static void
UpdateConstraint(Node * baseConstraint,Datum minValue,Datum maxValue)1356 UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue)
1357 {
1358 BoolExpr *andExpr = (BoolExpr *) baseConstraint;
1359 Node *lessThanExpr = (Node *) linitial(andExpr->args);
1360 Node *greaterThanExpr = (Node *) lsecond(andExpr->args);
1361
1362 Node *minNode = get_rightop((Expr *) greaterThanExpr);
1363 Node *maxNode = get_rightop((Expr *) lessThanExpr);
1364
1365 Assert(IsA(minNode, Const));
1366 Assert(IsA(maxNode, Const));
1367
1368 Const *minConstant = (Const *) minNode;
1369 Const *maxConstant = (Const *) maxNode;
1370
1371 minConstant->constvalue = minValue;
1372 maxConstant->constvalue = maxValue;
1373
1374 minConstant->constisnull = false;
1375 maxConstant->constisnull = false;
1376
1377 minConstant->constbyval = true;
1378 maxConstant->constbyval = true;
1379 }
1380
1381
1382 /*
1383 * SelectedChunkSkipList constructs a new StripeSkipList in which the
1384 * non-selected chunks are removed from the given stripeSkipList.
1385 */
1386 static StripeSkipList *
SelectedChunkSkipList(StripeSkipList * stripeSkipList,bool * projectedColumnMask,bool * selectedChunkMask)1387 SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
1388 bool *selectedChunkMask)
1389 {
1390 uint32 selectedChunkCount = 0;
1391 uint32 chunkIndex = 0;
1392 uint32 columnIndex = 0;
1393 uint32 columnCount = stripeSkipList->columnCount;
1394 uint32 selectedChunkIndex = 0;
1395
1396 for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1397 {
1398 if (selectedChunkMask[chunkIndex])
1399 {
1400 selectedChunkCount++;
1401 }
1402 }
1403
1404 ColumnChunkSkipNode **selectedChunkSkipNodeArray =
1405 palloc0(columnCount * sizeof(ColumnChunkSkipNode *));
1406
1407 for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
1408 {
1409 bool firstColumn = columnIndex == 0;
1410 selectedChunkIndex = 0;
1411
1412 /* first column's chunk skip node is always read */
1413 if (!projectedColumnMask[columnIndex] && !firstColumn)
1414 {
1415 selectedChunkSkipNodeArray[columnIndex] = NULL;
1416 continue;
1417 }
1418
1419 Assert(stripeSkipList->chunkSkipNodeArray[columnIndex] != NULL);
1420
1421 selectedChunkSkipNodeArray[columnIndex] = palloc0(selectedChunkCount *
1422 sizeof(ColumnChunkSkipNode));
1423
1424 for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1425 {
1426 if (selectedChunkMask[chunkIndex])
1427 {
1428 selectedChunkSkipNodeArray[columnIndex][selectedChunkIndex] =
1429 stripeSkipList->chunkSkipNodeArray[columnIndex][chunkIndex];
1430 selectedChunkIndex++;
1431 }
1432 }
1433 }
1434
1435 selectedChunkIndex = 0;
1436 uint32 *chunkGroupRowCounts = palloc0(selectedChunkCount * sizeof(uint32));
1437 for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1438 {
1439 if (selectedChunkMask[chunkIndex])
1440 {
1441 chunkGroupRowCounts[selectedChunkIndex++] =
1442 stripeSkipList->chunkGroupRowCounts[chunkIndex];
1443 }
1444 }
1445
1446 StripeSkipList *selectedChunkSkipList = palloc0(sizeof(StripeSkipList));
1447 selectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray;
1448 selectedChunkSkipList->chunkCount = selectedChunkCount;
1449 selectedChunkSkipList->columnCount = stripeSkipList->columnCount;
1450 selectedChunkSkipList->chunkGroupRowCounts = chunkGroupRowCounts;
1451
1452 return selectedChunkSkipList;
1453 }
1454
1455
1456 /*
1457 * StripeSkipListRowCount counts the number of rows in the given stripeSkipList.
1458 * To do this, the function finds the first column, and sums up row counts across
1459 * all chunks for that column.
1460 */
1461 static uint32
StripeSkipListRowCount(StripeSkipList * stripeSkipList)1462 StripeSkipListRowCount(StripeSkipList *stripeSkipList)
1463 {
1464 uint32 stripeSkipListRowCount = 0;
1465 uint32 chunkIndex = 0;
1466 uint32 *chunkGroupRowCounts = stripeSkipList->chunkGroupRowCounts;
1467
1468 for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1469 {
1470 uint32 chunkGroupRowCount = chunkGroupRowCounts[chunkIndex];
1471 stripeSkipListRowCount += chunkGroupRowCount;
1472 }
1473
1474 return stripeSkipListRowCount;
1475 }
1476
1477
1478 /*
1479 * ProjectedColumnMask returns a boolean array in which the projected columns
1480 * from the projected column list are marked as true.
1481 */
1482 static bool *
ProjectedColumnMask(uint32 columnCount,List * projectedColumnList)1483 ProjectedColumnMask(uint32 columnCount, List *projectedColumnList)
1484 {
1485 bool *projectedColumnMask = palloc0(columnCount * sizeof(bool));
1486 int attno;
1487
1488 foreach_int(attno, projectedColumnList)
1489 {
1490 /* attno is 1-indexed; projectedColumnMask is 0-indexed */
1491 int columnIndex = attno - 1;
1492 projectedColumnMask[columnIndex] = true;
1493 }
1494
1495 return projectedColumnMask;
1496 }
1497
1498
1499 /*
1500 * DeserializeBoolArray reads an array of bits from the given buffer and stores
1501 * it in provided bool array.
1502 */
1503 static void
DeserializeBoolArray(StringInfo boolArrayBuffer,bool * boolArray,uint32 boolArrayLength)1504 DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
1505 uint32 boolArrayLength)
1506 {
1507 uint32 boolArrayIndex = 0;
1508
1509 uint32 maximumBoolCount = boolArrayBuffer->len * 8;
1510 if (boolArrayLength > maximumBoolCount)
1511 {
1512 ereport(ERROR, (errmsg("insufficient data for reading boolean array")));
1513 }
1514
1515 for (boolArrayIndex = 0; boolArrayIndex < boolArrayLength; boolArrayIndex++)
1516 {
1517 uint32 byteIndex = boolArrayIndex / 8;
1518 uint32 bitIndex = boolArrayIndex % 8;
1519 uint8 bitmask = (1 << bitIndex);
1520
1521 uint8 shiftedBit = (boolArrayBuffer->data[byteIndex] & bitmask);
1522 if (shiftedBit == 0)
1523 {
1524 boolArray[boolArrayIndex] = false;
1525 }
1526 else
1527 {
1528 boolArray[boolArrayIndex] = true;
1529 }
1530 }
1531 }
1532
1533
1534 /*
1535 * DeserializeDatumArray reads an array of datums from the given buffer and stores
1536 * them in provided datumArray. If a value is marked as false in the exists array,
1537 * the function assumes that the datum isn't in the buffer, and simply skips it.
1538 */
1539 static void
DeserializeDatumArray(StringInfo datumBuffer,bool * existsArray,uint32 datumCount,bool datumTypeByValue,int datumTypeLength,char datumTypeAlign,Datum * datumArray)1540 DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCount,
1541 bool datumTypeByValue, int datumTypeLength,
1542 char datumTypeAlign, Datum *datumArray)
1543 {
1544 uint32 datumIndex = 0;
1545 uint32 currentDatumDataOffset = 0;
1546
1547 for (datumIndex = 0; datumIndex < datumCount; datumIndex++)
1548 {
1549 if (!existsArray[datumIndex])
1550 {
1551 continue;
1552 }
1553
1554 char *currentDatumDataPointer = datumBuffer->data + currentDatumDataOffset;
1555
1556 datumArray[datumIndex] = fetch_att(currentDatumDataPointer, datumTypeByValue,
1557 datumTypeLength);
1558 currentDatumDataOffset = att_addlength_datum(currentDatumDataOffset,
1559 datumTypeLength,
1560 currentDatumDataPointer);
1561 currentDatumDataOffset = att_align_nominal(currentDatumDataOffset,
1562 datumTypeAlign);
1563
1564 if (currentDatumDataOffset > datumBuffer->len)
1565 {
1566 ereport(ERROR, (errmsg("insufficient data left in datum buffer")));
1567 }
1568 }
1569 }
1570
1571
1572 /*
1573 * DeserializeChunkGroupData deserializes requested data chunk for all columns and
1574 * stores in chunkDataArray. It uncompresses serialized data if necessary. The
1575 * function also deallocates data buffers used for previous chunk, and compressed
1576 * data buffers for the current chunk which will not be needed again. If a column
1577 * data is not present serialized buffer, then default value (or null) is used
1578 * to fill value array.
1579 */
1580 static ChunkData *
DeserializeChunkData(StripeBuffers * stripeBuffers,uint64 chunkIndex,uint32 rowCount,TupleDesc tupleDescriptor,List * projectedColumnList)1581 DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
1582 uint32 rowCount, TupleDesc tupleDescriptor,
1583 List *projectedColumnList)
1584 {
1585 int columnIndex = 0;
1586 bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList);
1587 ChunkData *chunkData = CreateEmptyChunkData(tupleDescriptor->natts, columnMask,
1588 rowCount);
1589
1590 for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++)
1591 {
1592 Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
1593 ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
1594 bool columnAdded = false;
1595
1596 if (columnBuffers == NULL && columnMask[columnIndex])
1597 {
1598 columnAdded = true;
1599 }
1600
1601 if (columnBuffers != NULL)
1602 {
1603 ColumnChunkBuffers *chunkBuffers =
1604 columnBuffers->chunkBuffersArray[chunkIndex];
1605
1606 /* decompress and deserialize current chunk's data */
1607 StringInfo valueBuffer =
1608 DecompressBuffer(chunkBuffers->valueBuffer,
1609 chunkBuffers->valueCompressionType,
1610 chunkBuffers->decompressedValueSize);
1611
1612 DeserializeBoolArray(chunkBuffers->existsBuffer,
1613 chunkData->existsArray[columnIndex],
1614 rowCount);
1615 DeserializeDatumArray(valueBuffer, chunkData->existsArray[columnIndex],
1616 rowCount, attributeForm->attbyval,
1617 attributeForm->attlen, attributeForm->attalign,
1618 chunkData->valueArray[columnIndex]);
1619
1620 /* store current chunk's data buffer to be freed at next chunk read */
1621 chunkData->valueBufferArray[columnIndex] = valueBuffer;
1622 }
1623 else if (columnAdded)
1624 {
1625 /*
1626 * This is a column that was added after creation of this stripe.
1627 * So we use either the default value or NULL.
1628 */
1629 if (attributeForm->atthasdef)
1630 {
1631 int rowIndex = 0;
1632
1633 Datum defaultValue = ColumnDefaultValue(tupleDescriptor->constr,
1634 attributeForm);
1635
1636 for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
1637 {
1638 chunkData->existsArray[columnIndex][rowIndex] = true;
1639 chunkData->valueArray[columnIndex][rowIndex] = defaultValue;
1640 }
1641 }
1642 else
1643 {
1644 memset(chunkData->existsArray[columnIndex], false,
1645 rowCount * sizeof(bool));
1646 }
1647 }
1648 }
1649
1650 return chunkData;
1651 }
1652
1653
1654 /*
1655 * ColumnDefaultValue returns default value for given column. Only const values
1656 * are supported. The function errors on any other default value expressions.
1657 */
1658 static Datum
ColumnDefaultValue(TupleConstr * tupleConstraints,Form_pg_attribute attributeForm)1659 ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm)
1660 {
1661 Node *defaultValueNode = NULL;
1662 int defValIndex = 0;
1663
1664 for (defValIndex = 0; defValIndex < tupleConstraints->num_defval; defValIndex++)
1665 {
1666 AttrDefault attrDefault = tupleConstraints->defval[defValIndex];
1667 if (attrDefault.adnum == attributeForm->attnum)
1668 {
1669 defaultValueNode = stringToNode(attrDefault.adbin);
1670 break;
1671 }
1672 }
1673
1674 Assert(defaultValueNode != NULL);
1675
1676 /* try reducing the default value node to a const node */
1677 defaultValueNode = eval_const_expressions(NULL, defaultValueNode);
1678 if (IsA(defaultValueNode, Const))
1679 {
1680 Const *constNode = (Const *) defaultValueNode;
1681 return constNode->constvalue;
1682 }
1683 else
1684 {
1685 const char *columnName = NameStr(attributeForm->attname);
1686 ereport(ERROR, (errmsg("unsupported default value for column \"%s\"", columnName),
1687 errhint("Expression is either mutable or "
1688 "does not evaluate to constant value")));
1689 }
1690 }
1691