1 /*-------------------------------------------------------------------------
2  *
3  * columnar_reader.c
4  *
5  * This file contains function definitions for reading columnar tables. This
6  * includes the logic for reading file level metadata, reading row stripes,
7  * and skipping unrelated row chunks and columns.
8  *
9  * Copyright (c) 2016, Citus Data, Inc.
10  *
11  * $Id$
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 
17 #include "postgres.h"
18 
19 #include "safe_lib.h"
20 
21 #include "access/nbtree.h"
22 #include "access/xact.h"
23 #include "catalog/pg_am.h"
24 #include "commands/defrem.h"
25 #include "distributed/listutils.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/optimizer.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/restrictinfo.h"
31 #include "storage/fd.h"
32 #include "utils/guc.h"
33 #include "utils/memutils.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 
37 #include "columnar/columnar.h"
38 #include "columnar/columnar_storage.h"
39 #include "columnar/columnar_tableam.h"
40 #include "columnar/columnar_version_compat.h"
41 
/*
 * Format string for the error raised when a reader reaches a stripe that is
 * not flushed. Takes the relation name (%s) followed by the stripe id
 * (UINT64_FORMAT).
 */
#define UNEXPECTED_STRIPE_READ_ERR_MSG \
	"attempted to read an unexpected stripe while reading columnar table %s, " \
	"stripe with id=" UINT64_FORMAT " is not flushed"
45 
46 typedef struct ChunkGroupReadState
47 {
48 	int64 currentRow;
49 	int64 rowCount;
50 	int columnCount;
51 	List *projectedColumnList;  /* borrowed reference */
52 	ChunkData *chunkGroupData;
53 } ChunkGroupReadState;
54 
55 typedef struct StripeReadState
56 {
57 	int columnCount;
58 	int64 rowCount;
59 	int64 currentRow;
60 	TupleDesc tupleDescriptor;
61 	Relation relation;
62 	int chunkGroupIndex;
63 	int64 chunkGroupsFiltered;
64 	MemoryContext stripeReadContext;
65 	StripeBuffers *stripeBuffers;   /* allocated in stripeReadContext */
66 	List *projectedColumnList;      /* borrowed reference */
67 	ChunkGroupReadState *chunkGroupReadState; /* owned */
68 } StripeReadState;
69 
70 struct ColumnarReadState
71 {
72 	TupleDesc tupleDescriptor;
73 	Relation relation;
74 
75 	StripeMetadata *currentStripeMetadata;
76 	StripeReadState *stripeReadState;
77 
78 	/*
79 	 * Integer list of attribute numbers (1-indexed) for columns needed by the
80 	 * query.
81 	 */
82 	List *projectedColumnList;
83 
84 	List *whereClauseList;
85 	List *whereClauseVars;
86 
87 	MemoryContext stripeReadContext;
88 	int64 chunkGroupsFiltered;
89 
90 	/*
91 	 * Memory context guaranteed to be not freed during scan so we can
92 	 * safely use for any memory allocations regarding ColumnarReadState
93 	 * itself.
94 	 */
95 	MemoryContext scanContext;
96 
97 	Snapshot snapshot;
98 	bool snapshotRegisteredByUs;
99 };
100 
101 /* static function declarations */
102 static MemoryContext CreateStripeReadMemoryContext(void);
103 static bool ColumnarReadIsCurrentStripe(ColumnarReadState *readState,
104 										uint64 rowNumber);
105 static StripeMetadata * ColumnarReadGetCurrentStripe(ColumnarReadState *readState);
106 static void ReadStripeRowByRowNumber(ColumnarReadState *readState,
107 									 uint64 rowNumber, Datum *columnValues,
108 									 bool *columnNulls);
109 static bool StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState,
110 										  int chunkGroupIndex);
111 static void ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
112 										 StripeMetadata *stripeMetadata,
113 										 uint64 stripeRowOffset, Datum *columnValues,
114 										 bool *columnNulls);
115 static bool StripeReadInProgress(ColumnarReadState *readState);
116 static bool HasUnreadStripe(ColumnarReadState *readState);
117 static StripeReadState * BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel,
118 										 TupleDesc tupleDesc, List *projectedColumnList,
119 										 List *whereClauseList, List *whereClauseVars,
120 										 MemoryContext stripeReadContext,
121 										 Snapshot snapshot);
122 static void AdvanceStripeRead(ColumnarReadState *readState);
123 static bool SnapshotMightSeeUnflushedStripes(Snapshot snapshot);
124 static bool ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
125 							  bool *columnNulls);
126 static ChunkGroupReadState * BeginChunkGroupRead(StripeBuffers *stripeBuffers, int
127 												 chunkIndex,
128 												 TupleDesc tupleDesc,
129 												 List *projectedColumnList,
130 												 MemoryContext cxt);
131 static void EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState);
132 static bool ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState,
133 								  Datum *columnValues,
134 								  bool *columnNulls);
135 static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
136 												 StripeMetadata *stripeMetadata,
137 												 TupleDesc tupleDescriptor,
138 												 List *projectedColumnList,
139 												 List *whereClauseList,
140 												 List *whereClauseVars,
141 												 int64 *chunkGroupsFiltered,
142 												 Snapshot snapshot);
143 static ColumnBuffers * LoadColumnBuffers(Relation relation,
144 										 ColumnChunkSkipNode *chunkSkipNodeArray,
145 										 uint32 chunkCount, uint64 stripeOffset,
146 										 Form_pg_attribute attributeForm);
147 static bool * SelectedChunkMask(StripeSkipList *stripeSkipList,
148 								List *whereClauseList, List *whereClauseVars,
149 								int64 *chunkGroupsFiltered);
150 static Node * BuildBaseConstraint(Var *variable);
151 static List * GetClauseVars(List *clauses, int natts);
152 static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
153 static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
154 static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
155 static StripeSkipList * SelectedChunkSkipList(StripeSkipList *stripeSkipList,
156 											  bool *projectedColumnMask,
157 											  bool *selectedChunkMask);
158 static uint32 StripeSkipListRowCount(StripeSkipList *stripeSkipList);
159 static bool * ProjectedColumnMask(uint32 columnCount, List *projectedColumnList);
160 static void DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
161 								 uint32 boolArrayLength);
162 static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
163 								  uint32 datumCount, bool datumTypeByValue,
164 								  int datumTypeLength, char datumTypeAlign,
165 								  Datum *datumArray);
166 static ChunkData * DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
167 										uint32 rowCount, TupleDesc tupleDescriptor,
168 										List *projectedColumnList);
169 static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
170 								Form_pg_attribute attributeForm);
171 
172 /*
173  * ColumnarBeginRead initializes a columnar read operation. This function returns a
174  * read handle that's used during reading rows and finishing the read operation.
175  *
176  * projectedColumnList is an integer list of attribute numbers (1-indexed).
177  */
178 ColumnarReadState *
ColumnarBeginRead(Relation relation,TupleDesc tupleDescriptor,List * projectedColumnList,List * whereClauseList,MemoryContext scanContext,Snapshot snapshot,bool randomAccess)179 ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor,
180 				  List *projectedColumnList, List *whereClauseList,
181 				  MemoryContext scanContext, Snapshot snapshot,
182 				  bool randomAccess)
183 {
184 	/*
185 	 * We allocate all stripe specific data in the stripeReadContext, and reset
186 	 * this memory context before loading a new stripe. This is to avoid memory
187 	 * leaks.
188 	 */
189 	MemoryContext stripeReadContext = CreateStripeReadMemoryContext();
190 
191 	ColumnarReadState *readState = palloc0(sizeof(ColumnarReadState));
192 	readState->relation = relation;
193 	readState->projectedColumnList = projectedColumnList;
194 	readState->whereClauseList = whereClauseList;
195 	readState->whereClauseVars = GetClauseVars(whereClauseList, tupleDescriptor->natts);
196 	readState->chunkGroupsFiltered = 0;
197 	readState->tupleDescriptor = tupleDescriptor;
198 	readState->stripeReadContext = stripeReadContext;
199 	readState->stripeReadState = NULL;
200 	readState->scanContext = scanContext;
201 
202 	/*
203 	 * Note that ColumnarReadFlushPendingWrites might update those two by
204 	 * registering a new snapshot.
205 	 */
206 	readState->snapshot = snapshot;
207 	readState->snapshotRegisteredByUs = false;
208 
209 	if (!randomAccess)
210 	{
211 		/*
212 		 * When doing random access (i.e.: index scan), we don't need to flush
213 		 * pending writes until we need to read them.
214 		 * columnar_index_fetch_tuple would do so when needed.
215 		 */
216 		ColumnarReadFlushPendingWrites(readState);
217 
218 		/*
219 		 * AdvanceStripeRead sets currentStripeMetadata for the first stripe
220 		 * to read if not doing random access. Otherwise, reader (i.e.:
221 		 * ColumnarReadRowByRowNumber) would already decide the stripe to read
222 		 * on-the-fly.
223 		 *
224 		 * Moreover, Since we don't flush pending writes for random access,
225 		 * AdvanceStripeRead might encounter with stripe metadata entries due
226 		 * to current transaction's pending writes even when using an MVCC
227 		 * snapshot, but AdvanceStripeRead would throw an error for that.
228 		 * Note that this is not the case with for plain table scan methods
229 		 * (i.e.: SeqScan and Columnar CustomScan).
230 		 *
231 		 * For those reasons, we don't call AdvanceStripeRead if we will do
232 		 * random access.
233 		 */
234 		AdvanceStripeRead(readState);
235 	}
236 
237 	return readState;
238 }
239 
240 
241 /*
242  * ColumnarReadFlushPendingWrites flushes pending writes for read operation
243  * and sets a new (registered) snapshot if necessary.
244  *
245  * If it sets a new snapshot, then sets snapshotRegisteredByUs to true to
246  * indicate that caller should unregister the snapshot after finishing read
247  * operation.
248  *
249  * Note that this function assumes that readState's relation and snapshot
250  * fields are already set.
251  */
252 void
ColumnarReadFlushPendingWrites(ColumnarReadState * readState)253 ColumnarReadFlushPendingWrites(ColumnarReadState *readState)
254 {
255 	Assert(!readState->snapshotRegisteredByUs);
256 
257 	Oid relfilenode = readState->relation->rd_node.relNode;
258 	FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
259 
260 	if (readState->snapshot == InvalidSnapshot || !IsMVCCSnapshot(readState->snapshot))
261 	{
262 		return;
263 	}
264 
265 	/*
266 	 * If we flushed any pending writes, then we should guarantee that
267 	 * those writes are visible to us too. For this reason, if given
268 	 * snapshot is an MVCC snapshot, then we set its curcid to current
269 	 * command id.
270 	 *
271 	 * For simplicity, we do that even if we didn't flush any writes
272 	 * since we don't see any problem with that.
273 	 *
274 	 * XXX: We should either not update cid if we are executing a FETCH
275 	 * (from cursor) command, or we should have a better way to deal with
276 	 * pending writes, see the discussion in
277 	 * https://github.com/citusdata/citus/issues/5231.
278 	 */
279 	PushCopiedSnapshot(readState->snapshot);
280 
281 	/* now our snapshot is the active one */
282 	UpdateActiveSnapshotCommandId();
283 	Snapshot newSnapshot = GetActiveSnapshot();
284 	RegisterSnapshot(newSnapshot);
285 
286 	/*
287 	 * To be able to use UpdateActiveSnapshotCommandId, we pushed the
288 	 * copied snapshot to the stack. However, we don't need to keep it
289 	 * there since we will anyway rely on ColumnarReadState->snapshot
290 	 * during read operation.
291 	 *
292 	 * Note that since we registered the snapshot already, we guarantee
293 	 * that PopActiveSnapshot won't free it.
294 	 */
295 	PopActiveSnapshot();
296 
297 	readState->snapshot = newSnapshot;
298 
299 	/* not forget to unregister it when finishing read operation */
300 	readState->snapshotRegisteredByUs = true;
301 }
302 
303 
304 /*
305  * CreateStripeReadMemoryContext creates a memory context to be used when
306  * reading a stripe.
307  */
308 static MemoryContext
CreateStripeReadMemoryContext()309 CreateStripeReadMemoryContext()
310 {
311 	return AllocSetContextCreate(CurrentMemoryContext, "Stripe Read Memory Context",
312 								 ALLOCSET_DEFAULT_SIZES);
313 }
314 
315 
316 /*
317  * ColumnarReadNextRow tries to read a row from the columnar table. On success, it sets
318  * column values, column nulls and rowNumber (if passed to be non-NULL), and returns true.
319  * If there are no more rows to read, the function returns false.
320  */
321 bool
ColumnarReadNextRow(ColumnarReadState * readState,Datum * columnValues,bool * columnNulls,uint64 * rowNumber)322 ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *columnNulls,
323 					uint64 *rowNumber)
324 {
325 	while (true)
326 	{
327 		if (!StripeReadInProgress(readState))
328 		{
329 			if (!HasUnreadStripe(readState))
330 			{
331 				return false;
332 			}
333 
334 			readState->stripeReadState = BeginStripeRead(readState->currentStripeMetadata,
335 														 readState->relation,
336 														 readState->tupleDescriptor,
337 														 readState->projectedColumnList,
338 														 readState->whereClauseList,
339 														 readState->whereClauseVars,
340 														 readState->stripeReadContext,
341 														 readState->snapshot);
342 		}
343 
344 		if (!ReadStripeNextRow(readState->stripeReadState, columnValues, columnNulls))
345 		{
346 			AdvanceStripeRead(readState);
347 			continue;
348 		}
349 
350 		if (rowNumber)
351 		{
352 			*rowNumber = readState->currentStripeMetadata->firstRowNumber +
353 						 readState->stripeReadState->currentRow - 1;
354 		}
355 
356 		return true;
357 	}
358 
359 	return false;
360 }
361 
362 
363 /*
364  * ColumnarReadRowByRowNumberOrError is a wrapper around
365  * ColumnarReadRowByRowNumber that throws an error if tuple
366  * with rowNumber does not exist.
367  */
368 void
ColumnarReadRowByRowNumberOrError(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)369 ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
370 								  uint64 rowNumber, Datum *columnValues,
371 								  bool *columnNulls)
372 {
373 	if (!ColumnarReadRowByRowNumber(readState, rowNumber,
374 									columnValues, columnNulls))
375 	{
376 		ereport(ERROR, (errmsg("cannot read from columnar table %s, tuple with "
377 							   "row number " UINT64_FORMAT " does not exist",
378 							   RelationGetRelationName(readState->relation),
379 							   rowNumber)));
380 	}
381 }
382 
383 
384 /*
385  * ColumnarReadRowByRowNumber reads row with rowNumber from given relation
386  * into columnValues and columnNulls, and returns true. If no such row
387  * exists, then returns false.
388  */
389 bool
ColumnarReadRowByRowNumber(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)390 ColumnarReadRowByRowNumber(ColumnarReadState *readState,
391 						   uint64 rowNumber, Datum *columnValues,
392 						   bool *columnNulls)
393 {
394 	if (!ColumnarReadIsCurrentStripe(readState, rowNumber))
395 	{
396 		Relation columnarRelation = readState->relation;
397 		Snapshot snapshot = readState->snapshot;
398 		StripeMetadata *stripeMetadata = FindStripeByRowNumber(columnarRelation,
399 															   rowNumber, snapshot);
400 		if (stripeMetadata == NULL)
401 		{
402 			/* no such row exists */
403 			return false;
404 		}
405 
406 		if (StripeWriteState(stripeMetadata) != STRIPE_WRITE_FLUSHED)
407 		{
408 			/*
409 			 * Callers are expected to skip stripes that are not flushed to
410 			 * disk yet or should wait for the writer xact to commit or abort,
411 			 * but let's be on the safe side.
412 			 */
413 			ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
414 								   RelationGetRelationName(columnarRelation),
415 								   stripeMetadata->id)));
416 		}
417 
418 		/* do the cleanup before reading a new stripe */
419 		ColumnarResetRead(readState);
420 
421 		TupleDesc relationTupleDesc = RelationGetDescr(columnarRelation);
422 		List *whereClauseList = NIL;
423 		List *whereClauseVars = NIL;
424 		MemoryContext stripeReadContext = readState->stripeReadContext;
425 		readState->stripeReadState = BeginStripeRead(stripeMetadata,
426 													 columnarRelation,
427 													 relationTupleDesc,
428 													 readState->projectedColumnList,
429 													 whereClauseList,
430 													 whereClauseVars,
431 													 stripeReadContext,
432 													 snapshot);
433 
434 		readState->currentStripeMetadata = stripeMetadata;
435 	}
436 
437 	ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
438 
439 	return true;
440 }
441 
442 
443 /*
444  * ColumnarReadIsCurrentStripe returns true if stripe being read contains
445  * row with given rowNumber.
446  */
447 static bool
ColumnarReadIsCurrentStripe(ColumnarReadState * readState,uint64 rowNumber)448 ColumnarReadIsCurrentStripe(ColumnarReadState *readState, uint64 rowNumber)
449 {
450 	if (!StripeReadInProgress(readState))
451 	{
452 		return false;
453 	}
454 
455 	StripeMetadata *currentStripeMetadata = readState->currentStripeMetadata;
456 	if (rowNumber >= currentStripeMetadata->firstRowNumber &&
457 		rowNumber <= StripeGetHighestRowNumber(currentStripeMetadata))
458 	{
459 		return true;
460 	}
461 
462 	return false;
463 }
464 
465 
466 /*
467  * ColumnarReadGetCurrentStripe returns StripeMetadata for the stripe that is
468  * being read.
469  */
470 static StripeMetadata *
ColumnarReadGetCurrentStripe(ColumnarReadState * readState)471 ColumnarReadGetCurrentStripe(ColumnarReadState *readState)
472 {
473 	return readState->currentStripeMetadata;
474 }
475 
476 
477 /*
478  * ReadStripeRowByRowNumber reads row with rowNumber from given
479  * stripeReadState into columnValues and columnNulls.
480  * Errors out if no such row exists in the stripe being read.
481  */
482 static void
ReadStripeRowByRowNumber(ColumnarReadState * readState,uint64 rowNumber,Datum * columnValues,bool * columnNulls)483 ReadStripeRowByRowNumber(ColumnarReadState *readState,
484 						 uint64 rowNumber, Datum *columnValues,
485 						 bool *columnNulls)
486 {
487 	StripeMetadata *stripeMetadata = ColumnarReadGetCurrentStripe(readState);
488 	StripeReadState *stripeReadState = readState->stripeReadState;
489 
490 	if (rowNumber < stripeMetadata->firstRowNumber)
491 	{
492 		/* not expected but be on the safe side */
493 		ereport(ERROR, (errmsg("row offset cannot be negative")));
494 	}
495 
496 	/* find the exact chunk group to be read */
497 	uint64 stripeRowOffset = rowNumber - stripeMetadata->firstRowNumber;
498 	int chunkGroupIndex = stripeRowOffset / stripeMetadata->chunkGroupRowCount;
499 	if (!StripeReadIsCurrentChunkGroup(stripeReadState, chunkGroupIndex))
500 	{
501 		if (stripeReadState->chunkGroupReadState)
502 		{
503 			EndChunkGroupRead(stripeReadState->chunkGroupReadState);
504 		}
505 
506 		stripeReadState->chunkGroupIndex = chunkGroupIndex;
507 		stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
508 			stripeReadState->stripeBuffers,
509 			stripeReadState->chunkGroupIndex,
510 			stripeReadState->tupleDescriptor,
511 			stripeReadState->projectedColumnList,
512 			stripeReadState->stripeReadContext);
513 	}
514 
515 	ReadChunkGroupRowByRowOffset(stripeReadState->chunkGroupReadState,
516 								 stripeMetadata, stripeRowOffset,
517 								 columnValues, columnNulls);
518 }
519 
520 
521 /*
522  * StripeReadIsCurrentChunkGroup returns true if chunk group being read is
523  * the has given chunkGroupIndex in its stripe.
524  */
525 static bool
StripeReadIsCurrentChunkGroup(StripeReadState * stripeReadState,int chunkGroupIndex)526 StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState, int chunkGroupIndex)
527 {
528 	if (!stripeReadState->chunkGroupReadState)
529 	{
530 		return false;
531 	}
532 
533 	return (stripeReadState->chunkGroupIndex == chunkGroupIndex);
534 }
535 
536 
537 /*
538  * ReadChunkGroupRowByRowOffset reads row with stripeRowOffset from given
539  * chunkGroupReadState into columnValues and columnNulls.
540  * Errors out if no such row exists in the chunk group being read.
541  */
542 static void
ReadChunkGroupRowByRowOffset(ChunkGroupReadState * chunkGroupReadState,StripeMetadata * stripeMetadata,uint64 stripeRowOffset,Datum * columnValues,bool * columnNulls)543 ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
544 							 StripeMetadata *stripeMetadata,
545 							 uint64 stripeRowOffset, Datum *columnValues,
546 							 bool *columnNulls)
547 {
548 	/* set the exact row number to be read from given chunk roup */
549 	chunkGroupReadState->currentRow = stripeRowOffset %
550 									  stripeMetadata->chunkGroupRowCount;
551 	if (!ReadChunkGroupNextRow(chunkGroupReadState, columnValues, columnNulls))
552 	{
553 		/* not expected but be on the safe side */
554 		ereport(ERROR, (errmsg("could not find the row in stripe")));
555 	}
556 }
557 
558 
559 /*
560  * StripeReadInProgress returns true if we already started reading a stripe.
561  */
562 static bool
StripeReadInProgress(ColumnarReadState * readState)563 StripeReadInProgress(ColumnarReadState *readState)
564 {
565 	return readState->stripeReadState != NULL;
566 }
567 
568 
569 /*
570  * HasUnreadStripe returns true if we still have stripes to read during current
571  * read operation.
572  */
573 static bool
HasUnreadStripe(ColumnarReadState * readState)574 HasUnreadStripe(ColumnarReadState *readState)
575 {
576 	return readState->currentStripeMetadata != NULL;
577 }
578 
579 
580 /*
581  * ColumnarRescan clears the position where we were scanning so that the next read starts at
582  * the beginning again
583  */
584 void
ColumnarRescan(ColumnarReadState * readState,List * scanQual)585 ColumnarRescan(ColumnarReadState *readState, List *scanQual)
586 {
587 	MemoryContext oldContext = MemoryContextSwitchTo(readState->scanContext);
588 
589 	ColumnarResetRead(readState);
590 
591 	/* set currentStripeMetadata for the first stripe to read */
592 	AdvanceStripeRead(readState);
593 
594 	readState->chunkGroupsFiltered = 0;
595 
596 	readState->whereClauseList = copyObject(scanQual);
597 	MemoryContextSwitchTo(oldContext);
598 }
599 
600 
601 /*
602  * Finishes a columnar read operation.
603  */
604 void
ColumnarEndRead(ColumnarReadState * readState)605 ColumnarEndRead(ColumnarReadState *readState)
606 {
607 	if (readState->snapshotRegisteredByUs)
608 	{
609 		/*
610 		 * init_columnar_read_state created a new snapshot and registered it,
611 		 * so now forget it.
612 		 */
613 		UnregisterSnapshot(readState->snapshot);
614 	}
615 
616 	MemoryContextDelete(readState->stripeReadContext);
617 	if (readState->currentStripeMetadata)
618 	{
619 		pfree(readState->currentStripeMetadata);
620 	}
621 
622 	pfree(readState);
623 }
624 
625 
626 /*
627  * ColumnarResetRead resets the stripe and the chunk group that is
628  * being read currently (if any).
629  */
630 void
ColumnarResetRead(ColumnarReadState * readState)631 ColumnarResetRead(ColumnarReadState *readState)
632 {
633 	if (StripeReadInProgress(readState))
634 	{
635 		pfree(readState->currentStripeMetadata);
636 		readState->currentStripeMetadata = NULL;
637 
638 		readState->stripeReadState = NULL;
639 		MemoryContextReset(readState->stripeReadContext);
640 	}
641 }
642 
643 
644 /*
645  * BeginStripeRead allocates state for reading a stripe.
646  */
647 static StripeReadState *
BeginStripeRead(StripeMetadata * stripeMetadata,Relation rel,TupleDesc tupleDesc,List * projectedColumnList,List * whereClauseList,List * whereClauseVars,MemoryContext stripeReadContext,Snapshot snapshot)648 BeginStripeRead(StripeMetadata *stripeMetadata, Relation rel, TupleDesc tupleDesc,
649 				List *projectedColumnList, List *whereClauseList, List *whereClauseVars,
650 				MemoryContext stripeReadContext, Snapshot snapshot)
651 {
652 	MemoryContext oldContext = MemoryContextSwitchTo(stripeReadContext);
653 
654 	StripeReadState *stripeReadState = palloc0(sizeof(StripeReadState));
655 
656 	stripeReadState->relation = rel;
657 	stripeReadState->tupleDescriptor = tupleDesc;
658 	stripeReadState->columnCount = tupleDesc->natts;
659 	stripeReadState->chunkGroupReadState = NULL;
660 	stripeReadState->projectedColumnList = projectedColumnList;
661 	stripeReadState->stripeReadContext = stripeReadContext;
662 
663 	stripeReadState->stripeBuffers = LoadFilteredStripeBuffers(rel,
664 															   stripeMetadata,
665 															   tupleDesc,
666 															   projectedColumnList,
667 															   whereClauseList,
668 															   whereClauseVars,
669 															   &stripeReadState->
670 															   chunkGroupsFiltered,
671 															   snapshot);
672 
673 	stripeReadState->rowCount = stripeReadState->stripeBuffers->rowCount;
674 
675 	MemoryContextSwitchTo(oldContext);
676 
677 
678 	return stripeReadState;
679 }
680 
681 
682 /*
683  * AdvanceStripeRead updates chunkGroupsFiltered and sets
684  * currentStripeMetadata for next stripe read.
685  */
686 static void
AdvanceStripeRead(ColumnarReadState * readState)687 AdvanceStripeRead(ColumnarReadState *readState)
688 {
689 	MemoryContext oldContext = MemoryContextSwitchTo(readState->scanContext);
690 
691 	/* if not read any stripes yet, start from the first one .. */
692 	uint64 lastReadRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
693 	if (StripeReadInProgress(readState))
694 	{
695 		/* .. otherwise, continue with the next stripe */
696 		lastReadRowNumber = StripeGetHighestRowNumber(readState->currentStripeMetadata);
697 
698 		readState->chunkGroupsFiltered +=
699 			readState->stripeReadState->chunkGroupsFiltered;
700 	}
701 
702 	readState->currentStripeMetadata = FindNextStripeByRowNumber(readState->relation,
703 																 lastReadRowNumber,
704 																 readState->snapshot);
705 
706 	if (readState->currentStripeMetadata &&
707 		StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED &&
708 		!SnapshotMightSeeUnflushedStripes(readState->snapshot))
709 	{
710 		/*
711 		 * To be on the safe side, error out if we don't expect to encounter
712 		 * with an un-flushed stripe. Otherwise, we will skip such stripes
713 		 * until finding a flushed one.
714 		 */
715 		ereport(ERROR, (errmsg(UNEXPECTED_STRIPE_READ_ERR_MSG,
716 							   RelationGetRelationName(readState->relation),
717 							   readState->currentStripeMetadata->id)));
718 	}
719 
720 	while (readState->currentStripeMetadata &&
721 		   StripeWriteState(readState->currentStripeMetadata) != STRIPE_WRITE_FLUSHED)
722 	{
723 		readState->currentStripeMetadata =
724 			FindNextStripeByRowNumber(readState->relation,
725 									  readState->currentStripeMetadata->firstRowNumber,
726 									  readState->snapshot);
727 	}
728 
729 	readState->stripeReadState = NULL;
730 	MemoryContextReset(readState->stripeReadContext);
731 
732 	MemoryContextSwitchTo(oldContext);
733 }
734 
735 
736 /*
737  * SnapshotMightSeeUnflushedStripes returns true if given snapshot is
738  * expected to see un-flushed stripes either because of other backends'
739  * pending writes or aborted transactions.
740  */
741 static bool
SnapshotMightSeeUnflushedStripes(Snapshot snapshot)742 SnapshotMightSeeUnflushedStripes(Snapshot snapshot)
743 {
744 	if (snapshot == InvalidSnapshot)
745 	{
746 		return false;
747 	}
748 
749 	switch (snapshot->snapshot_type)
750 	{
751 		case SNAPSHOT_ANY:
752 		case SNAPSHOT_DIRTY:
753 		case SNAPSHOT_NON_VACUUMABLE:
754 		{
755 			return true;
756 		}
757 
758 		default:
759 			return false;
760 	}
761 }
762 
763 
764 /*
765  * ReadStripeNextRow: If more rows can be read from the current stripe, fill
766  * in non-NULL columnValues and return true. Otherwise, return false.
767  *
768  * On entry, all entries in columnNulls should be true; this function only
769  * sets non-NULL entries.
770  *
771  */
772 static bool
ReadStripeNextRow(StripeReadState * stripeReadState,Datum * columnValues,bool * columnNulls)773 ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
774 				  bool *columnNulls)
775 {
776 	if (stripeReadState->currentRow >= stripeReadState->rowCount)
777 	{
778 		Assert(stripeReadState->currentRow == stripeReadState->rowCount);
779 		return false;
780 	}
781 
782 	while (true)
783 	{
784 		if (stripeReadState->chunkGroupReadState == NULL)
785 		{
786 			stripeReadState->chunkGroupReadState = BeginChunkGroupRead(
787 				stripeReadState->stripeBuffers,
788 				stripeReadState->
789 				chunkGroupIndex,
790 				stripeReadState->
791 				tupleDescriptor,
792 				stripeReadState->
793 				projectedColumnList,
794 				stripeReadState->
795 				stripeReadContext);
796 		}
797 
798 		if (!ReadChunkGroupNextRow(stripeReadState->chunkGroupReadState, columnValues,
799 								   columnNulls))
800 		{
801 			/* if this chunk group is exhausted, fetch the next one and loop */
802 			EndChunkGroupRead(stripeReadState->chunkGroupReadState);
803 			stripeReadState->chunkGroupReadState = NULL;
804 			stripeReadState->chunkGroupIndex++;
805 			continue;
806 		}
807 
808 		stripeReadState->currentRow++;
809 		return true;
810 	}
811 
812 	Assert(stripeReadState->currentRow == stripeReadState->rowCount);
813 	return false;
814 }
815 
816 
817 /*
818  * BeginChunkGroupRead allocates state for reading a chunk.
819  */
820 static ChunkGroupReadState *
BeginChunkGroupRead(StripeBuffers * stripeBuffers,int chunkIndex,TupleDesc tupleDesc,List * projectedColumnList,MemoryContext cxt)821 BeginChunkGroupRead(StripeBuffers *stripeBuffers, int chunkIndex, TupleDesc tupleDesc,
822 					List *projectedColumnList, MemoryContext cxt)
823 {
824 	uint32 chunkGroupRowCount =
825 		stripeBuffers->selectedChunkGroupRowCounts[chunkIndex];
826 
827 	MemoryContext oldContext = MemoryContextSwitchTo(cxt);
828 
829 	ChunkGroupReadState *chunkGroupReadState = palloc0(sizeof(ChunkGroupReadState));
830 
831 	chunkGroupReadState->currentRow = 0;
832 	chunkGroupReadState->rowCount = chunkGroupRowCount;
833 	chunkGroupReadState->columnCount = tupleDesc->natts;
834 	chunkGroupReadState->projectedColumnList = projectedColumnList;
835 
836 	chunkGroupReadState->chunkGroupData = DeserializeChunkData(stripeBuffers, chunkIndex,
837 															   chunkGroupRowCount,
838 															   tupleDesc,
839 															   projectedColumnList);
840 	MemoryContextSwitchTo(oldContext);
841 
842 	return chunkGroupReadState;
843 }
844 
845 
846 /*
847  * EndChunkRead finishes a chunk read.
848  */
849 static void
EndChunkGroupRead(ChunkGroupReadState * chunkGroupReadState)850 EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState)
851 {
852 	FreeChunkData(chunkGroupReadState->chunkGroupData);
853 	pfree(chunkGroupReadState);
854 }
855 
856 
857 /*
858  * ReadChunkGroupNextRow: if more rows can be read from the current chunk
859  * group, fill in non-NULL columnValues and return true. Otherwise, return
860  * false.
861  *
862  * On entry, all entries in columnNulls should be true; this function only
863  * sets non-NULL entries.
864  */
865 static bool
ReadChunkGroupNextRow(ChunkGroupReadState * chunkGroupReadState,Datum * columnValues,bool * columnNulls)866 ReadChunkGroupNextRow(ChunkGroupReadState *chunkGroupReadState, Datum *columnValues,
867 					  bool *columnNulls)
868 {
869 	if (chunkGroupReadState->currentRow >= chunkGroupReadState->rowCount)
870 	{
871 		Assert(chunkGroupReadState->currentRow == chunkGroupReadState->rowCount);
872 		return false;
873 	}
874 
875 	/*
876 	 * Initialize to all-NULL. Only non-NULL projected attributes will be set.
877 	 */
878 	memset(columnNulls, true, sizeof(bool) * chunkGroupReadState->columnCount);
879 
880 	int attno;
881 	foreach_int(attno, chunkGroupReadState->projectedColumnList)
882 	{
883 		const ChunkData *chunkGroupData = chunkGroupReadState->chunkGroupData;
884 		const int rowIndex = chunkGroupReadState->currentRow;
885 
886 		/* attno is 1-indexed; existsArray is 0-indexed */
887 		const uint32 columnIndex = attno - 1;
888 
889 		if (chunkGroupData->existsArray[columnIndex][rowIndex])
890 		{
891 			columnValues[columnIndex] = chunkGroupData->valueArray[columnIndex][rowIndex];
892 			columnNulls[columnIndex] = false;
893 		}
894 	}
895 
896 	chunkGroupReadState->currentRow++;
897 	return true;
898 }
899 
900 
901 /*
902  * ColumnarReadChunkGroupsFiltered
903  *
904  * Return the number of chunk groups filtered during this read operation.
905  */
906 int64
ColumnarReadChunkGroupsFiltered(ColumnarReadState * state)907 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state)
908 {
909 	return state->chunkGroupsFiltered;
910 }
911 
912 
913 /*
914  * CreateEmptyChunkDataArray creates data buffers to keep deserialized exist and
915  * value arrays for requested columns in columnMask.
916  */
917 ChunkData *
CreateEmptyChunkData(uint32 columnCount,bool * columnMask,uint32 chunkGroupRowCount)918 CreateEmptyChunkData(uint32 columnCount, bool *columnMask, uint32 chunkGroupRowCount)
919 {
920 	uint32 columnIndex = 0;
921 
922 	ChunkData *chunkData = palloc0(sizeof(ChunkData));
923 	chunkData->existsArray = palloc0(columnCount * sizeof(bool *));
924 	chunkData->valueArray = palloc0(columnCount * sizeof(Datum *));
925 	chunkData->valueBufferArray = palloc0(columnCount * sizeof(StringInfo));
926 	chunkData->columnCount = columnCount;
927 	chunkData->rowCount = chunkGroupRowCount;
928 
929 	/* allocate chunk memory for deserialized data */
930 	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
931 	{
932 		if (columnMask[columnIndex])
933 		{
934 			chunkData->existsArray[columnIndex] = palloc0(chunkGroupRowCount *
935 														  sizeof(bool));
936 			chunkData->valueArray[columnIndex] = palloc0(chunkGroupRowCount *
937 														 sizeof(Datum));
938 			chunkData->valueBufferArray[columnIndex] = NULL;
939 		}
940 	}
941 
942 	return chunkData;
943 }
944 
945 
946 /*
947  * FreeChunkData deallocates data buffers to keep deserialized exist and
948  * value arrays for requested columns in columnMask.
949  * ColumnChunkData->serializedValueBuffer lives in memory read/write context
950  * so it is deallocated automatically when the context is deleted.
951  */
952 void
FreeChunkData(ChunkData * chunkData)953 FreeChunkData(ChunkData *chunkData)
954 {
955 	uint32 columnIndex = 0;
956 
957 	if (chunkData == NULL)
958 	{
959 		return;
960 	}
961 
962 	for (columnIndex = 0; columnIndex < chunkData->columnCount; columnIndex++)
963 	{
964 		if (chunkData->existsArray[columnIndex] != NULL)
965 		{
966 			pfree(chunkData->existsArray[columnIndex]);
967 		}
968 
969 		if (chunkData->valueArray[columnIndex] != NULL)
970 		{
971 			pfree(chunkData->valueArray[columnIndex]);
972 		}
973 	}
974 
975 	pfree(chunkData->existsArray);
976 	pfree(chunkData->valueArray);
977 	pfree(chunkData);
978 }
979 
980 
981 /* ColumnarTableRowCount returns the exact row count of a table using skiplists */
982 uint64
ColumnarTableRowCount(Relation relation)983 ColumnarTableRowCount(Relation relation)
984 {
985 	ListCell *stripeMetadataCell = NULL;
986 	uint64 totalRowCount = 0;
987 	List *stripeList = StripesForRelfilenode(relation->rd_node);
988 
989 	foreach(stripeMetadataCell, stripeList)
990 	{
991 		StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
992 		totalRowCount += stripeMetadata->rowCount;
993 	}
994 
995 	return totalRowCount;
996 }
997 
998 
999 /*
1000  * LoadFilteredStripeBuffers reads serialized stripe data from the given file.
1001  * The function skips over chunks whose rows are refuted by restriction qualifiers,
1002  * and only loads columns that are projected in the query.
1003  */
1004 static StripeBuffers *
LoadFilteredStripeBuffers(Relation relation,StripeMetadata * stripeMetadata,TupleDesc tupleDescriptor,List * projectedColumnList,List * whereClauseList,List * whereClauseVars,int64 * chunkGroupsFiltered,Snapshot snapshot)1005 LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
1006 						  TupleDesc tupleDescriptor, List *projectedColumnList,
1007 						  List *whereClauseList, List *whereClauseVars,
1008 						  int64 *chunkGroupsFiltered, Snapshot snapshot)
1009 {
1010 	uint32 columnIndex = 0;
1011 	uint32 columnCount = tupleDescriptor->natts;
1012 
1013 	bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
1014 
1015 	StripeSkipList *stripeSkipList = ReadStripeSkipList(relation->rd_node,
1016 														stripeMetadata->id,
1017 														tupleDescriptor,
1018 														stripeMetadata->chunkCount,
1019 														snapshot);
1020 
1021 	bool *selectedChunkMask = SelectedChunkMask(stripeSkipList, whereClauseList,
1022 												whereClauseVars, chunkGroupsFiltered);
1023 
1024 	StripeSkipList *selectedChunkSkipList =
1025 		SelectedChunkSkipList(stripeSkipList, projectedColumnMask,
1026 							  selectedChunkMask);
1027 
1028 	/* load column data for projected columns */
1029 	ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
1030 
1031 	for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++)
1032 	{
1033 		if (projectedColumnMask[columnIndex])
1034 		{
1035 			ColumnChunkSkipNode *chunkSkipNode =
1036 				selectedChunkSkipList->chunkSkipNodeArray[columnIndex];
1037 			Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
1038 			uint32 chunkCount = selectedChunkSkipList->chunkCount;
1039 
1040 			ColumnBuffers *columnBuffers = LoadColumnBuffers(relation, chunkSkipNode,
1041 															 chunkCount,
1042 															 stripeMetadata->fileOffset,
1043 															 attributeForm);
1044 
1045 			columnBuffersArray[columnIndex] = columnBuffers;
1046 		}
1047 	}
1048 
1049 	StripeBuffers *stripeBuffers = palloc0(sizeof(StripeBuffers));
1050 	stripeBuffers->columnCount = columnCount;
1051 	stripeBuffers->rowCount = StripeSkipListRowCount(selectedChunkSkipList);
1052 	stripeBuffers->columnBuffersArray = columnBuffersArray;
1053 	stripeBuffers->selectedChunkGroupRowCounts =
1054 		selectedChunkSkipList->chunkGroupRowCounts;
1055 
1056 	return stripeBuffers;
1057 }
1058 
1059 
1060 /*
1061  * LoadColumnBuffers reads serialized column data from the given file. These
1062  * column data are laid out as sequential chunks in the file; and chunk positions
1063  * and lengths are retrieved from the column chunk skip node array.
1064  */
1065 static ColumnBuffers *
LoadColumnBuffers(Relation relation,ColumnChunkSkipNode * chunkSkipNodeArray,uint32 chunkCount,uint64 stripeOffset,Form_pg_attribute attributeForm)1066 LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
1067 				  uint32 chunkCount, uint64 stripeOffset,
1068 				  Form_pg_attribute attributeForm)
1069 {
1070 	uint32 chunkIndex = 0;
1071 	ColumnChunkBuffers **chunkBuffersArray =
1072 		palloc0(chunkCount * sizeof(ColumnChunkBuffers *));
1073 
1074 	for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1075 	{
1076 		chunkBuffersArray[chunkIndex] = palloc0(sizeof(ColumnChunkBuffers));
1077 	}
1078 
1079 	/*
1080 	 * We first read the "exists" chunks. We don't read "values" array here,
1081 	 * because "exists" chunks are stored sequentially on disk, and we want to
1082 	 * minimize disk seeks.
1083 	 */
1084 	for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1085 	{
1086 		ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1087 		uint64 existsOffset = stripeOffset + chunkSkipNode->existsChunkOffset;
1088 		StringInfo rawExistsBuffer = makeStringInfo();
1089 
1090 		enlargeStringInfo(rawExistsBuffer, chunkSkipNode->existsLength);
1091 		rawExistsBuffer->len = chunkSkipNode->existsLength;
1092 		ColumnarStorageRead(relation, existsOffset, rawExistsBuffer->data,
1093 							chunkSkipNode->existsLength);
1094 
1095 		chunkBuffersArray[chunkIndex]->existsBuffer = rawExistsBuffer;
1096 	}
1097 
1098 	/* then read "values" chunks, which are also stored sequentially on disk */
1099 	for (chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
1100 	{
1101 		ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1102 		CompressionType compressionType = chunkSkipNode->valueCompressionType;
1103 		uint64 valueOffset = stripeOffset + chunkSkipNode->valueChunkOffset;
1104 		StringInfo rawValueBuffer = makeStringInfo();
1105 
1106 		enlargeStringInfo(rawValueBuffer, chunkSkipNode->valueLength);
1107 		rawValueBuffer->len = chunkSkipNode->valueLength;
1108 		ColumnarStorageRead(relation, valueOffset, rawValueBuffer->data,
1109 							chunkSkipNode->valueLength);
1110 
1111 		chunkBuffersArray[chunkIndex]->valueBuffer = rawValueBuffer;
1112 		chunkBuffersArray[chunkIndex]->valueCompressionType = compressionType;
1113 		chunkBuffersArray[chunkIndex]->decompressedValueSize =
1114 			chunkSkipNode->decompressedValueSize;
1115 	}
1116 
1117 	ColumnBuffers *columnBuffers = palloc0(sizeof(ColumnBuffers));
1118 	columnBuffers->chunkBuffersArray = chunkBuffersArray;
1119 
1120 	return columnBuffers;
1121 }
1122 
1123 
1124 /*
1125  * SelectedChunkMask walks over each column's chunks and checks if a chunk can
1126  * be filtered without reading its data. The filtering happens when all rows in
1127  * the chunk can be refuted by the given qualifier conditions.
1128  */
1129 static bool *
SelectedChunkMask(StripeSkipList * stripeSkipList,List * whereClauseList,List * whereClauseVars,int64 * chunkGroupsFiltered)1130 SelectedChunkMask(StripeSkipList *stripeSkipList, List *whereClauseList,
1131 				  List *whereClauseVars, int64 *chunkGroupsFiltered)
1132 {
1133 	ListCell *columnCell = NULL;
1134 	uint32 chunkIndex = 0;
1135 
1136 	bool *selectedChunkMask = palloc0(stripeSkipList->chunkCount * sizeof(bool));
1137 	memset(selectedChunkMask, true, stripeSkipList->chunkCount * sizeof(bool));
1138 
1139 	foreach(columnCell, whereClauseVars)
1140 	{
1141 		Var *column = lfirst(columnCell);
1142 		uint32 columnIndex = column->varattno - 1;
1143 
1144 		/* if this column's data type doesn't have a comparator, skip it */
1145 		FmgrInfo *comparisonFunction = GetFunctionInfoOrNull(column->vartype,
1146 															 BTREE_AM_OID,
1147 															 BTORDER_PROC);
1148 		if (comparisonFunction == NULL)
1149 		{
1150 			continue;
1151 		}
1152 
1153 		Node *baseConstraint = BuildBaseConstraint(column);
1154 		for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1155 		{
1156 			ColumnChunkSkipNode *chunkSkipNodeArray =
1157 				stripeSkipList->chunkSkipNodeArray[columnIndex];
1158 			ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
1159 
1160 			/*
1161 			 * A column chunk with comparable data type can miss min/max values
1162 			 * if all values in the chunk are NULL.
1163 			 */
1164 			if (!chunkSkipNode->hasMinMax)
1165 			{
1166 				continue;
1167 			}
1168 
1169 			UpdateConstraint(baseConstraint, chunkSkipNode->minimumValue,
1170 							 chunkSkipNode->maximumValue);
1171 
1172 			List *constraintList = list_make1(baseConstraint);
1173 			bool predicateRefuted =
1174 				predicate_refuted_by(constraintList, whereClauseList, false);
1175 			if (predicateRefuted && selectedChunkMask[chunkIndex])
1176 			{
1177 				selectedChunkMask[chunkIndex] = false;
1178 				*chunkGroupsFiltered += 1;
1179 			}
1180 		}
1181 	}
1182 
1183 	return selectedChunkMask;
1184 }
1185 
1186 
1187 /*
1188  * GetFunctionInfoOrNull first resolves the operator for the given data type,
1189  * access method, and support procedure. The function then uses the resolved
1190  * operator's identifier to fill in a function manager object, and returns
1191  * this object. This function is based on a similar function from CitusDB's code.
1192  */
1193 FmgrInfo *
GetFunctionInfoOrNull(Oid typeId,Oid accessMethodId,int16 procedureId)1194 GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId)
1195 {
1196 	FmgrInfo *functionInfo = NULL;
1197 
1198 	/* get default operator class from pg_opclass for datum type */
1199 	Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
1200 	if (operatorClassId == InvalidOid)
1201 	{
1202 		return NULL;
1203 	}
1204 
1205 	Oid operatorFamilyId = get_opclass_family(operatorClassId);
1206 	if (operatorFamilyId == InvalidOid)
1207 	{
1208 		return NULL;
1209 	}
1210 
1211 	Oid operatorId = get_opfamily_proc(operatorFamilyId, typeId, typeId, procedureId);
1212 	if (operatorId != InvalidOid)
1213 	{
1214 		functionInfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
1215 
1216 		/* fill in the FmgrInfo struct using the operatorId */
1217 		fmgr_info(operatorId, functionInfo);
1218 	}
1219 
1220 	return functionInfo;
1221 }
1222 
1223 
1224 /*
1225  * BuildBaseConstraint builds and returns a base constraint. This constraint
1226  * implements an expression in the form of (var <= max && var >= min), where
1227  * min and max values represent a chunk's min and max values. These chunk
1228  * values are filled in after the constraint is built. This function is based
1229  * on a similar function from CitusDB's shard pruning logic.
1230  */
1231 static Node *
BuildBaseConstraint(Var * variable)1232 BuildBaseConstraint(Var *variable)
1233 {
1234 	OpExpr *lessThanExpr = MakeOpExpression(variable, BTLessEqualStrategyNumber);
1235 	OpExpr *greaterThanExpr = MakeOpExpression(variable, BTGreaterEqualStrategyNumber);
1236 
1237 	Node *baseConstraint = make_and_qual((Node *) lessThanExpr, (Node *) greaterThanExpr);
1238 
1239 	return baseConstraint;
1240 }
1241 
1242 
1243 /*
1244  * GetClauseVars extracts the Vars from the given clauses for the purpose of
1245  * building constraints that can be refuted by predicate_refuted_by(). It also
1246  * deduplicates and sorts them.
1247  */
1248 static List *
GetClauseVars(List * whereClauseList,int natts)1249 GetClauseVars(List *whereClauseList, int natts)
1250 {
1251 	/*
1252 	 * We don't recurse into or include aggregates, window functions, or
1253 	 * PHVs. We don't expect any PHVs during execution; and Vars found inside
1254 	 * an aggregate or window function aren't going to be useful in forming
1255 	 * constraints that can be refuted.
1256 	 */
1257 	int flags = 0;
1258 	List *vars = pull_var_clause((Node *) whereClauseList, flags);
1259 	Var **deduplicate = palloc0(sizeof(Var *) * natts);
1260 
1261 	ListCell *lc;
1262 	foreach(lc, vars)
1263 	{
1264 		Node *node = lfirst(lc);
1265 		Assert(IsA(node, Var));
1266 
1267 		Var *var = (Var *) node;
1268 		int idx = var->varattno - 1;
1269 
1270 		if (deduplicate[idx] != NULL)
1271 		{
1272 			/* if they have the same varattno, the rest should be identical */
1273 			Assert(equal(var, deduplicate[idx]));
1274 		}
1275 
1276 		deduplicate[idx] = var;
1277 	}
1278 
1279 	List *whereClauseVars = NIL;
1280 	for (int i = 0; i < natts; i++)
1281 	{
1282 		Var *var = deduplicate[i];
1283 		if (var != NULL)
1284 		{
1285 			whereClauseVars = lappend(whereClauseVars, var);
1286 		}
1287 	}
1288 
1289 	pfree(deduplicate);
1290 
1291 	return whereClauseVars;
1292 }
1293 
1294 
1295 /*
1296  * MakeOpExpression builds an operator expression node. This operator expression
1297  * implements the operator clause as defined by the variable and the strategy
1298  * number. The function is copied from CitusDB's shard pruning logic.
1299  */
1300 static OpExpr *
MakeOpExpression(Var * variable,int16 strategyNumber)1301 MakeOpExpression(Var *variable, int16 strategyNumber)
1302 {
1303 	Oid typeId = variable->vartype;
1304 	Oid typeModId = variable->vartypmod;
1305 	Oid collationId = variable->varcollid;
1306 
1307 	Oid accessMethodId = BTREE_AM_OID;
1308 
1309 	/* Load the operator from system catalogs */
1310 	Oid operatorId = GetOperatorByType(typeId, accessMethodId, strategyNumber);
1311 
1312 	Const *constantValue = makeNullConst(typeId, typeModId, collationId);
1313 
1314 	/* Now make the expression with the given variable and a null constant */
1315 	OpExpr *expression = (OpExpr *) make_opclause(operatorId,
1316 												  InvalidOid, /* no result type yet */
1317 												  false, /* no return set */
1318 												  (Expr *) variable,
1319 												  (Expr *) constantValue,
1320 												  InvalidOid, collationId);
1321 
1322 	/* Set implementing function id and result type */
1323 	expression->opfuncid = get_opcode(operatorId);
1324 	expression->opresulttype = get_func_rettype(expression->opfuncid);
1325 
1326 	return expression;
1327 }
1328 
1329 
1330 /*
1331  * GetOperatorByType returns operator Oid for the given type, access method,
1332  * and strategy number. Note that this function incorrectly errors out when
1333  * the given type doesn't have its own operator but can use another compatible
1334  * type's default operator. The function is copied from CitusDB's shard pruning
1335  * logic.
1336  */
1337 static Oid
GetOperatorByType(Oid typeId,Oid accessMethodId,int16 strategyNumber)1338 GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber)
1339 {
1340 	/* Get default operator class from pg_opclass */
1341 	Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
1342 
1343 	Oid operatorFamily = get_opclass_family(operatorClassId);
1344 
1345 	Oid operatorId = get_opfamily_member(operatorFamily, typeId, typeId, strategyNumber);
1346 
1347 	return operatorId;
1348 }
1349 
1350 
1351 /*
1352  * UpdateConstraint updates the base constraint with the given min/max values.
1353  * The function is copied from CitusDB's shard pruning logic.
1354  */
1355 static void
UpdateConstraint(Node * baseConstraint,Datum minValue,Datum maxValue)1356 UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue)
1357 {
1358 	BoolExpr *andExpr = (BoolExpr *) baseConstraint;
1359 	Node *lessThanExpr = (Node *) linitial(andExpr->args);
1360 	Node *greaterThanExpr = (Node *) lsecond(andExpr->args);
1361 
1362 	Node *minNode = get_rightop((Expr *) greaterThanExpr);
1363 	Node *maxNode = get_rightop((Expr *) lessThanExpr);
1364 
1365 	Assert(IsA(minNode, Const));
1366 	Assert(IsA(maxNode, Const));
1367 
1368 	Const *minConstant = (Const *) minNode;
1369 	Const *maxConstant = (Const *) maxNode;
1370 
1371 	minConstant->constvalue = minValue;
1372 	maxConstant->constvalue = maxValue;
1373 
1374 	minConstant->constisnull = false;
1375 	maxConstant->constisnull = false;
1376 
1377 	minConstant->constbyval = true;
1378 	maxConstant->constbyval = true;
1379 }
1380 
1381 
1382 /*
1383  * SelectedChunkSkipList constructs a new StripeSkipList in which the
1384  * non-selected chunks are removed from the given stripeSkipList.
1385  */
1386 static StripeSkipList *
SelectedChunkSkipList(StripeSkipList * stripeSkipList,bool * projectedColumnMask,bool * selectedChunkMask)1387 SelectedChunkSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
1388 					  bool *selectedChunkMask)
1389 {
1390 	uint32 selectedChunkCount = 0;
1391 	uint32 chunkIndex = 0;
1392 	uint32 columnIndex = 0;
1393 	uint32 columnCount = stripeSkipList->columnCount;
1394 	uint32 selectedChunkIndex = 0;
1395 
1396 	for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1397 	{
1398 		if (selectedChunkMask[chunkIndex])
1399 		{
1400 			selectedChunkCount++;
1401 		}
1402 	}
1403 
1404 	ColumnChunkSkipNode **selectedChunkSkipNodeArray =
1405 		palloc0(columnCount * sizeof(ColumnChunkSkipNode *));
1406 
1407 	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
1408 	{
1409 		bool firstColumn = columnIndex == 0;
1410 		selectedChunkIndex = 0;
1411 
1412 		/* first column's chunk skip node is always read */
1413 		if (!projectedColumnMask[columnIndex] && !firstColumn)
1414 		{
1415 			selectedChunkSkipNodeArray[columnIndex] = NULL;
1416 			continue;
1417 		}
1418 
1419 		Assert(stripeSkipList->chunkSkipNodeArray[columnIndex] != NULL);
1420 
1421 		selectedChunkSkipNodeArray[columnIndex] = palloc0(selectedChunkCount *
1422 														  sizeof(ColumnChunkSkipNode));
1423 
1424 		for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1425 		{
1426 			if (selectedChunkMask[chunkIndex])
1427 			{
1428 				selectedChunkSkipNodeArray[columnIndex][selectedChunkIndex] =
1429 					stripeSkipList->chunkSkipNodeArray[columnIndex][chunkIndex];
1430 				selectedChunkIndex++;
1431 			}
1432 		}
1433 	}
1434 
1435 	selectedChunkIndex = 0;
1436 	uint32 *chunkGroupRowCounts = palloc0(selectedChunkCount * sizeof(uint32));
1437 	for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1438 	{
1439 		if (selectedChunkMask[chunkIndex])
1440 		{
1441 			chunkGroupRowCounts[selectedChunkIndex++] =
1442 				stripeSkipList->chunkGroupRowCounts[chunkIndex];
1443 		}
1444 	}
1445 
1446 	StripeSkipList *selectedChunkSkipList = palloc0(sizeof(StripeSkipList));
1447 	selectedChunkSkipList->chunkSkipNodeArray = selectedChunkSkipNodeArray;
1448 	selectedChunkSkipList->chunkCount = selectedChunkCount;
1449 	selectedChunkSkipList->columnCount = stripeSkipList->columnCount;
1450 	selectedChunkSkipList->chunkGroupRowCounts = chunkGroupRowCounts;
1451 
1452 	return selectedChunkSkipList;
1453 }
1454 
1455 
1456 /*
1457  * StripeSkipListRowCount counts the number of rows in the given stripeSkipList.
1458  * To do this, the function finds the first column, and sums up row counts across
1459  * all chunks for that column.
1460  */
1461 static uint32
StripeSkipListRowCount(StripeSkipList * stripeSkipList)1462 StripeSkipListRowCount(StripeSkipList *stripeSkipList)
1463 {
1464 	uint32 stripeSkipListRowCount = 0;
1465 	uint32 chunkIndex = 0;
1466 	uint32 *chunkGroupRowCounts = stripeSkipList->chunkGroupRowCounts;
1467 
1468 	for (chunkIndex = 0; chunkIndex < stripeSkipList->chunkCount; chunkIndex++)
1469 	{
1470 		uint32 chunkGroupRowCount = chunkGroupRowCounts[chunkIndex];
1471 		stripeSkipListRowCount += chunkGroupRowCount;
1472 	}
1473 
1474 	return stripeSkipListRowCount;
1475 }
1476 
1477 
1478 /*
1479  * ProjectedColumnMask returns a boolean array in which the projected columns
1480  * from the projected column list are marked as true.
1481  */
1482 static bool *
ProjectedColumnMask(uint32 columnCount,List * projectedColumnList)1483 ProjectedColumnMask(uint32 columnCount, List *projectedColumnList)
1484 {
1485 	bool *projectedColumnMask = palloc0(columnCount * sizeof(bool));
1486 	int attno;
1487 
1488 	foreach_int(attno, projectedColumnList)
1489 	{
1490 		/* attno is 1-indexed; projectedColumnMask is 0-indexed */
1491 		int columnIndex = attno - 1;
1492 		projectedColumnMask[columnIndex] = true;
1493 	}
1494 
1495 	return projectedColumnMask;
1496 }
1497 
1498 
1499 /*
1500  * DeserializeBoolArray reads an array of bits from the given buffer and stores
1501  * it in provided bool array.
1502  */
1503 static void
DeserializeBoolArray(StringInfo boolArrayBuffer,bool * boolArray,uint32 boolArrayLength)1504 DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
1505 					 uint32 boolArrayLength)
1506 {
1507 	uint32 boolArrayIndex = 0;
1508 
1509 	uint32 maximumBoolCount = boolArrayBuffer->len * 8;
1510 	if (boolArrayLength > maximumBoolCount)
1511 	{
1512 		ereport(ERROR, (errmsg("insufficient data for reading boolean array")));
1513 	}
1514 
1515 	for (boolArrayIndex = 0; boolArrayIndex < boolArrayLength; boolArrayIndex++)
1516 	{
1517 		uint32 byteIndex = boolArrayIndex / 8;
1518 		uint32 bitIndex = boolArrayIndex % 8;
1519 		uint8 bitmask = (1 << bitIndex);
1520 
1521 		uint8 shiftedBit = (boolArrayBuffer->data[byteIndex] & bitmask);
1522 		if (shiftedBit == 0)
1523 		{
1524 			boolArray[boolArrayIndex] = false;
1525 		}
1526 		else
1527 		{
1528 			boolArray[boolArrayIndex] = true;
1529 		}
1530 	}
1531 }
1532 
1533 
1534 /*
1535  * DeserializeDatumArray reads an array of datums from the given buffer and stores
1536  * them in provided datumArray. If a value is marked as false in the exists array,
1537  * the function assumes that the datum isn't in the buffer, and simply skips it.
1538  */
1539 static void
DeserializeDatumArray(StringInfo datumBuffer,bool * existsArray,uint32 datumCount,bool datumTypeByValue,int datumTypeLength,char datumTypeAlign,Datum * datumArray)1540 DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCount,
1541 					  bool datumTypeByValue, int datumTypeLength,
1542 					  char datumTypeAlign, Datum *datumArray)
1543 {
1544 	uint32 datumIndex = 0;
1545 	uint32 currentDatumDataOffset = 0;
1546 
1547 	for (datumIndex = 0; datumIndex < datumCount; datumIndex++)
1548 	{
1549 		if (!existsArray[datumIndex])
1550 		{
1551 			continue;
1552 		}
1553 
1554 		char *currentDatumDataPointer = datumBuffer->data + currentDatumDataOffset;
1555 
1556 		datumArray[datumIndex] = fetch_att(currentDatumDataPointer, datumTypeByValue,
1557 										   datumTypeLength);
1558 		currentDatumDataOffset = att_addlength_datum(currentDatumDataOffset,
1559 													 datumTypeLength,
1560 													 currentDatumDataPointer);
1561 		currentDatumDataOffset = att_align_nominal(currentDatumDataOffset,
1562 												   datumTypeAlign);
1563 
1564 		if (currentDatumDataOffset > datumBuffer->len)
1565 		{
1566 			ereport(ERROR, (errmsg("insufficient data left in datum buffer")));
1567 		}
1568 	}
1569 }
1570 
1571 
1572 /*
1573  * DeserializeChunkGroupData deserializes requested data chunk for all columns and
1574  * stores in chunkDataArray. It uncompresses serialized data if necessary. The
1575  * function also deallocates data buffers used for previous chunk, and compressed
1576  * data buffers for the current chunk which will not be needed again. If a column
1577  * data is not present serialized buffer, then default value (or null) is used
1578  * to fill value array.
1579  */
1580 static ChunkData *
DeserializeChunkData(StripeBuffers * stripeBuffers,uint64 chunkIndex,uint32 rowCount,TupleDesc tupleDescriptor,List * projectedColumnList)1581 DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
1582 					 uint32 rowCount, TupleDesc tupleDescriptor,
1583 					 List *projectedColumnList)
1584 {
1585 	int columnIndex = 0;
1586 	bool *columnMask = ProjectedColumnMask(tupleDescriptor->natts, projectedColumnList);
1587 	ChunkData *chunkData = CreateEmptyChunkData(tupleDescriptor->natts, columnMask,
1588 												rowCount);
1589 
1590 	for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++)
1591 	{
1592 		Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, columnIndex);
1593 		ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
1594 		bool columnAdded = false;
1595 
1596 		if (columnBuffers == NULL && columnMask[columnIndex])
1597 		{
1598 			columnAdded = true;
1599 		}
1600 
1601 		if (columnBuffers != NULL)
1602 		{
1603 			ColumnChunkBuffers *chunkBuffers =
1604 				columnBuffers->chunkBuffersArray[chunkIndex];
1605 
1606 			/* decompress and deserialize current chunk's data */
1607 			StringInfo valueBuffer =
1608 				DecompressBuffer(chunkBuffers->valueBuffer,
1609 								 chunkBuffers->valueCompressionType,
1610 								 chunkBuffers->decompressedValueSize);
1611 
1612 			DeserializeBoolArray(chunkBuffers->existsBuffer,
1613 								 chunkData->existsArray[columnIndex],
1614 								 rowCount);
1615 			DeserializeDatumArray(valueBuffer, chunkData->existsArray[columnIndex],
1616 								  rowCount, attributeForm->attbyval,
1617 								  attributeForm->attlen, attributeForm->attalign,
1618 								  chunkData->valueArray[columnIndex]);
1619 
1620 			/* store current chunk's data buffer to be freed at next chunk read */
1621 			chunkData->valueBufferArray[columnIndex] = valueBuffer;
1622 		}
1623 		else if (columnAdded)
1624 		{
1625 			/*
1626 			 * This is a column that was added after creation of this stripe.
1627 			 * So we use either the default value or NULL.
1628 			 */
1629 			if (attributeForm->atthasdef)
1630 			{
1631 				int rowIndex = 0;
1632 
1633 				Datum defaultValue = ColumnDefaultValue(tupleDescriptor->constr,
1634 														attributeForm);
1635 
1636 				for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
1637 				{
1638 					chunkData->existsArray[columnIndex][rowIndex] = true;
1639 					chunkData->valueArray[columnIndex][rowIndex] = defaultValue;
1640 				}
1641 			}
1642 			else
1643 			{
1644 				memset(chunkData->existsArray[columnIndex], false,
1645 					   rowCount * sizeof(bool));
1646 			}
1647 		}
1648 	}
1649 
1650 	return chunkData;
1651 }
1652 
1653 
1654 /*
1655  * ColumnDefaultValue returns default value for given column. Only const values
1656  * are supported. The function errors on any other default value expressions.
1657  */
1658 static Datum
ColumnDefaultValue(TupleConstr * tupleConstraints,Form_pg_attribute attributeForm)1659 ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm)
1660 {
1661 	Node *defaultValueNode = NULL;
1662 	int defValIndex = 0;
1663 
1664 	for (defValIndex = 0; defValIndex < tupleConstraints->num_defval; defValIndex++)
1665 	{
1666 		AttrDefault attrDefault = tupleConstraints->defval[defValIndex];
1667 		if (attrDefault.adnum == attributeForm->attnum)
1668 		{
1669 			defaultValueNode = stringToNode(attrDefault.adbin);
1670 			break;
1671 		}
1672 	}
1673 
1674 	Assert(defaultValueNode != NULL);
1675 
1676 	/* try reducing the default value node to a const node */
1677 	defaultValueNode = eval_const_expressions(NULL, defaultValueNode);
1678 	if (IsA(defaultValueNode, Const))
1679 	{
1680 		Const *constNode = (Const *) defaultValueNode;
1681 		return constNode->constvalue;
1682 	}
1683 	else
1684 	{
1685 		const char *columnName = NameStr(attributeForm->attname);
1686 		ereport(ERROR, (errmsg("unsupported default value for column \"%s\"", columnName),
1687 						errhint("Expression is either mutable or "
1688 								"does not evaluate to constant value")));
1689 	}
1690 }
1691