1 /*-------------------------------------------------------------------------
2  *
3  * heaptoast.c
4  *	  Heap-specific definitions for external and compressed storage
5  *	  of variable size attributes.
6  *
7  * Copyright (c) 2000-2020, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/heap/heaptoast.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *		heap_toast_insert_or_update -
16  *			Try to make a given tuple fit into one page by compressing
17  *			or moving off attributes
18  *
19  *		heap_toast_delete -
20  *			Reclaim toast storage when a tuple is deleted
21  *
22  *-------------------------------------------------------------------------
23  */
24 
25 #include "postgres.h"
26 
27 #include "access/detoast.h"
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "access/heaptoast.h"
31 #include "access/toast_helper.h"
32 #include "access/toast_internals.h"
33 #include "utils/fmgroids.h"
34 
35 
36 /* ----------
37  * heap_toast_delete -
38  *
39  *	Cascaded delete toast-entries on DELETE
40  * ----------
41  */
42 void
43 heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
44 {
45 	TupleDesc	tupleDesc;
46 	Datum		toast_values[MaxHeapAttributeNumber];
47 	bool		toast_isnull[MaxHeapAttributeNumber];
48 
49 	/*
50 	 * We should only ever be called for tuples of plain relations or
51 	 * materialized views --- recursing on a toast rel is bad news.
52 	 */
53 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
54 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
55 
56 	/*
57 	 * Get the tuple descriptor and break down the tuple into fields.
58 	 *
59 	 * NOTE: it's debatable whether to use heap_deform_tuple() here or just
60 	 * heap_getattr() only the varlena columns.  The latter could win if there
61 	 * are few varlena columns and many non-varlena ones. However,
62 	 * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
63 	 * O(N^2) if there are many varlena columns, so it seems better to err on
64 	 * the side of linear cost.  (We won't even be here unless there's at
65 	 * least one varlena column, by the way.)
66 	 */
67 	tupleDesc = rel->rd_att;
68 
69 	Assert(tupleDesc->natts <= MaxHeapAttributeNumber);
70 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
71 
72 	/* Do the real work. */
73 	toast_delete_external(rel, toast_values, toast_isnull, is_speculative);
74 }
75 
76 
77 /* ----------
78  * heap_toast_insert_or_update -
79  *
80  *	Delete no-longer-used toast-entries and create new ones to
81  *	make the new tuple fit on INSERT or UPDATE
82  *
83  * Inputs:
84  *	newtup: the candidate new tuple to be inserted
85  *	oldtup: the old row version for UPDATE, or NULL for INSERT
86  *	options: options to be passed to heap_insert() for toast rows
87  * Result:
88  *	either newtup if no toasting is needed, or a palloc'd modified tuple
89  *	that is what should actually get stored
90  *
91  * NOTE: neither newtup nor oldtup will be modified.  This is a change
92  * from the pre-8.1 API of this routine.
93  * ----------
94  */
95 HeapTuple
96 heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
97 							int options)
98 {
99 	HeapTuple	result_tuple;
100 	TupleDesc	tupleDesc;
101 	int			numAttrs;
102 
103 	Size		maxDataLen;
104 	Size		hoff;
105 
106 	bool		toast_isnull[MaxHeapAttributeNumber];
107 	bool		toast_oldisnull[MaxHeapAttributeNumber];
108 	Datum		toast_values[MaxHeapAttributeNumber];
109 	Datum		toast_oldvalues[MaxHeapAttributeNumber];
110 	ToastAttrInfo toast_attr[MaxHeapAttributeNumber];
111 	ToastTupleContext ttc;
112 
113 	/*
114 	 * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
115 	 * deletions just normally insert/delete the toast values. It seems
116 	 * easiest to deal with that here, instead on, potentially, multiple
117 	 * callers.
118 	 */
119 	options &= ~HEAP_INSERT_SPECULATIVE;
120 
121 	/*
122 	 * We should only ever be called for tuples of plain relations or
123 	 * materialized views --- recursing on a toast rel is bad news.
124 	 */
125 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
126 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
127 
128 	/*
129 	 * Get the tuple descriptor and break down the tuple(s) into fields.
130 	 */
131 	tupleDesc = rel->rd_att;
132 	numAttrs = tupleDesc->natts;
133 
134 	Assert(numAttrs <= MaxHeapAttributeNumber);
135 	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
136 	if (oldtup != NULL)
137 		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
138 
139 	/* ----------
140 	 * Prepare for toasting
141 	 * ----------
142 	 */
143 	ttc.ttc_rel = rel;
144 	ttc.ttc_values = toast_values;
145 	ttc.ttc_isnull = toast_isnull;
146 	if (oldtup == NULL)
147 	{
148 		ttc.ttc_oldvalues = NULL;
149 		ttc.ttc_oldisnull = NULL;
150 	}
151 	else
152 	{
153 		ttc.ttc_oldvalues = toast_oldvalues;
154 		ttc.ttc_oldisnull = toast_oldisnull;
155 	}
156 	ttc.ttc_attr = toast_attr;
157 	toast_tuple_init(&ttc);
158 
159 	/* ----------
160 	 * Compress and/or save external until data fits into target length
161 	 *
162 	 *	1: Inline compress attributes with attstorage EXTENDED, and store very
163 	 *	   large attributes with attstorage EXTENDED or EXTERNAL external
164 	 *	   immediately
165 	 *	2: Store attributes with attstorage EXTENDED or EXTERNAL external
166 	 *	3: Inline compress attributes with attstorage MAIN
167 	 *	4: Store attributes with attstorage MAIN external
168 	 * ----------
169 	 */
170 
171 	/* compute header overhead --- this should match heap_form_tuple() */
172 	hoff = SizeofHeapTupleHeader;
173 	if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
174 		hoff += BITMAPLEN(numAttrs);
175 	hoff = MAXALIGN(hoff);
176 	/* now convert to a limit on the tuple data size */
177 	maxDataLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET) - hoff;
178 
179 	/*
180 	 * Look for attributes with attstorage EXTENDED to compress.  Also find
181 	 * large attributes with attstorage EXTENDED or EXTERNAL, and store them
182 	 * external.
183 	 */
184 	while (heap_compute_data_size(tupleDesc,
185 								  toast_values, toast_isnull) > maxDataLen)
186 	{
187 		int			biggest_attno;
188 
189 		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, false);
190 		if (biggest_attno < 0)
191 			break;
192 
193 		/*
194 		 * Attempt to compress it inline, if it has attstorage EXTENDED
195 		 */
196 		if (TupleDescAttr(tupleDesc, biggest_attno)->attstorage == TYPSTORAGE_EXTENDED)
197 			toast_tuple_try_compression(&ttc, biggest_attno);
198 		else
199 		{
200 			/*
201 			 * has attstorage EXTERNAL, ignore on subsequent compression
202 			 * passes
203 			 */
204 			toast_attr[biggest_attno].tai_colflags |= TOASTCOL_INCOMPRESSIBLE;
205 		}
206 
207 		/*
208 		 * If this value is by itself more than maxDataLen (after compression
209 		 * if any), push it out to the toast table immediately, if possible.
210 		 * This avoids uselessly compressing other fields in the common case
211 		 * where we have one long field and several short ones.
212 		 *
213 		 * XXX maybe the threshold should be less than maxDataLen?
214 		 */
215 		if (toast_attr[biggest_attno].tai_size > maxDataLen &&
216 			rel->rd_rel->reltoastrelid != InvalidOid)
217 			toast_tuple_externalize(&ttc, biggest_attno, options);
218 	}
219 
220 	/*
221 	 * Second we look for attributes of attstorage EXTENDED or EXTERNAL that
222 	 * are still inline, and make them external.  But skip this if there's no
223 	 * toast table to push them to.
224 	 */
225 	while (heap_compute_data_size(tupleDesc,
226 								  toast_values, toast_isnull) > maxDataLen &&
227 		   rel->rd_rel->reltoastrelid != InvalidOid)
228 	{
229 		int			biggest_attno;
230 
231 		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, false, false);
232 		if (biggest_attno < 0)
233 			break;
234 		toast_tuple_externalize(&ttc, biggest_attno, options);
235 	}
236 
237 	/*
238 	 * Round 3 - this time we take attributes with storage MAIN into
239 	 * compression
240 	 */
241 	while (heap_compute_data_size(tupleDesc,
242 								  toast_values, toast_isnull) > maxDataLen)
243 	{
244 		int			biggest_attno;
245 
246 		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, true);
247 		if (biggest_attno < 0)
248 			break;
249 
250 		toast_tuple_try_compression(&ttc, biggest_attno);
251 	}
252 
253 	/*
254 	 * Finally we store attributes of type MAIN externally.  At this point we
255 	 * increase the target tuple size, so that MAIN attributes aren't stored
256 	 * externally unless really necessary.
257 	 */
258 	maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
259 
260 	while (heap_compute_data_size(tupleDesc,
261 								  toast_values, toast_isnull) > maxDataLen &&
262 		   rel->rd_rel->reltoastrelid != InvalidOid)
263 	{
264 		int			biggest_attno;
265 
266 		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, false, true);
267 		if (biggest_attno < 0)
268 			break;
269 
270 		toast_tuple_externalize(&ttc, biggest_attno, options);
271 	}
272 
273 	/*
274 	 * In the case we toasted any values, we need to build a new heap tuple
275 	 * with the changed values.
276 	 */
277 	if ((ttc.ttc_flags & TOAST_NEEDS_CHANGE) != 0)
278 	{
279 		HeapTupleHeader olddata = newtup->t_data;
280 		HeapTupleHeader new_data;
281 		int32		new_header_len;
282 		int32		new_data_len;
283 		int32		new_tuple_len;
284 
285 		/*
286 		 * Calculate the new size of the tuple.
287 		 *
288 		 * Note: we used to assume here that the old tuple's t_hoff must equal
289 		 * the new_header_len value, but that was incorrect.  The old tuple
290 		 * might have a smaller-than-current natts, if there's been an ALTER
291 		 * TABLE ADD COLUMN since it was stored; and that would lead to a
292 		 * different conclusion about the size of the null bitmap, or even
293 		 * whether there needs to be one at all.
294 		 */
295 		new_header_len = SizeofHeapTupleHeader;
296 		if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
297 			new_header_len += BITMAPLEN(numAttrs);
298 		new_header_len = MAXALIGN(new_header_len);
299 		new_data_len = heap_compute_data_size(tupleDesc,
300 											  toast_values, toast_isnull);
301 		new_tuple_len = new_header_len + new_data_len;
302 
303 		/*
304 		 * Allocate and zero the space needed, and fill HeapTupleData fields.
305 		 */
306 		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
307 		result_tuple->t_len = new_tuple_len;
308 		result_tuple->t_self = newtup->t_self;
309 		result_tuple->t_tableOid = newtup->t_tableOid;
310 		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
311 		result_tuple->t_data = new_data;
312 
313 		/*
314 		 * Copy the existing tuple header, but adjust natts and t_hoff.
315 		 */
316 		memcpy(new_data, olddata, SizeofHeapTupleHeader);
317 		HeapTupleHeaderSetNatts(new_data, numAttrs);
318 		new_data->t_hoff = new_header_len;
319 
320 		/* Copy over the data, and fill the null bitmap if needed */
321 		heap_fill_tuple(tupleDesc,
322 						toast_values,
323 						toast_isnull,
324 						(char *) new_data + new_header_len,
325 						new_data_len,
326 						&(new_data->t_infomask),
327 						((ttc.ttc_flags & TOAST_HAS_NULLS) != 0) ?
328 						new_data->t_bits : NULL);
329 	}
330 	else
331 		result_tuple = newtup;
332 
333 	toast_tuple_cleanup(&ttc);
334 
335 	return result_tuple;
336 }
337 
338 
339 /* ----------
340  * toast_flatten_tuple -
341  *
342  *	"Flatten" a tuple to contain no out-of-line toasted fields.
343  *	(This does not eliminate compressed or short-header datums.)
344  *
345  *	Note: we expect the caller already checked HeapTupleHasExternal(tup),
346  *	so there is no need for a short-circuit path.
347  * ----------
348  */
349 HeapTuple
350 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
351 {
352 	HeapTuple	new_tuple;
353 	int			numAttrs = tupleDesc->natts;
354 	int			i;
355 	Datum		toast_values[MaxTupleAttributeNumber];
356 	bool		toast_isnull[MaxTupleAttributeNumber];
357 	bool		toast_free[MaxTupleAttributeNumber];
358 
359 	/*
360 	 * Break down the tuple into fields.
361 	 */
362 	Assert(numAttrs <= MaxTupleAttributeNumber);
363 	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
364 
365 	memset(toast_free, 0, numAttrs * sizeof(bool));
366 
367 	for (i = 0; i < numAttrs; i++)
368 	{
369 		/*
370 		 * Look at non-null varlena attributes
371 		 */
372 		if (!toast_isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
373 		{
374 			struct varlena *new_value;
375 
376 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
377 			if (VARATT_IS_EXTERNAL(new_value))
378 			{
379 				new_value = detoast_external_attr(new_value);
380 				toast_values[i] = PointerGetDatum(new_value);
381 				toast_free[i] = true;
382 			}
383 		}
384 	}
385 
386 	/*
387 	 * Form the reconfigured tuple.
388 	 */
389 	new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
390 
391 	/*
392 	 * Be sure to copy the tuple's identity fields.  We also make a point of
393 	 * copying visibility info, just in case anybody looks at those fields in
394 	 * a syscache entry.
395 	 */
396 	new_tuple->t_self = tup->t_self;
397 	new_tuple->t_tableOid = tup->t_tableOid;
398 
399 	new_tuple->t_data->t_choice = tup->t_data->t_choice;
400 	new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
401 	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
402 	new_tuple->t_data->t_infomask |=
403 		tup->t_data->t_infomask & HEAP_XACT_MASK;
404 	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
405 	new_tuple->t_data->t_infomask2 |=
406 		tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
407 
408 	/*
409 	 * Free allocated temp values
410 	 */
411 	for (i = 0; i < numAttrs; i++)
412 		if (toast_free[i])
413 			pfree(DatumGetPointer(toast_values[i]));
414 
415 	return new_tuple;
416 }
417 
418 
419 /* ----------
420  * toast_flatten_tuple_to_datum -
421  *
422  *	"Flatten" a tuple containing out-of-line toasted fields into a Datum.
423  *	The result is always palloc'd in the current memory context.
424  *
425  *	We have a general rule that Datums of container types (rows, arrays,
426  *	ranges, etc) must not contain any external TOAST pointers.  Without
427  *	this rule, we'd have to look inside each Datum when preparing a tuple
428  *	for storage, which would be expensive and would fail to extend cleanly
429  *	to new sorts of container types.
430  *
431  *	However, we don't want to say that tuples represented as HeapTuples
432  *	can't contain toasted fields, so instead this routine should be called
433  *	when such a HeapTuple is being converted into a Datum.
434  *
435  *	While we're at it, we decompress any compressed fields too.  This is not
436  *	necessary for correctness, but reflects an expectation that compression
437  *	will be more effective if applied to the whole tuple not individual
438  *	fields.  We are not so concerned about that that we want to deconstruct
439  *	and reconstruct tuples just to get rid of compressed fields, however.
440  *	So callers typically won't call this unless they see that the tuple has
441  *	at least one external field.
442  *
443  *	On the other hand, in-line short-header varlena fields are left alone.
444  *	If we "untoasted" them here, they'd just get changed back to short-header
445  *	format anyway within heap_fill_tuple.
446  * ----------
447  */
448 Datum
449 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
450 							 uint32 tup_len,
451 							 TupleDesc tupleDesc)
452 {
453 	HeapTupleHeader new_data;
454 	int32		new_header_len;
455 	int32		new_data_len;
456 	int32		new_tuple_len;
457 	HeapTupleData tmptup;
458 	int			numAttrs = tupleDesc->natts;
459 	int			i;
460 	bool		has_nulls = false;
461 	Datum		toast_values[MaxTupleAttributeNumber];
462 	bool		toast_isnull[MaxTupleAttributeNumber];
463 	bool		toast_free[MaxTupleAttributeNumber];
464 
465 	/* Build a temporary HeapTuple control structure */
466 	tmptup.t_len = tup_len;
467 	ItemPointerSetInvalid(&(tmptup.t_self));
468 	tmptup.t_tableOid = InvalidOid;
469 	tmptup.t_data = tup;
470 
471 	/*
472 	 * Break down the tuple into fields.
473 	 */
474 	Assert(numAttrs <= MaxTupleAttributeNumber);
475 	heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
476 
477 	memset(toast_free, 0, numAttrs * sizeof(bool));
478 
479 	for (i = 0; i < numAttrs; i++)
480 	{
481 		/*
482 		 * Look at non-null varlena attributes
483 		 */
484 		if (toast_isnull[i])
485 			has_nulls = true;
486 		else if (TupleDescAttr(tupleDesc, i)->attlen == -1)
487 		{
488 			struct varlena *new_value;
489 
490 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
491 			if (VARATT_IS_EXTERNAL(new_value) ||
492 				VARATT_IS_COMPRESSED(new_value))
493 			{
494 				new_value = detoast_attr(new_value);
495 				toast_values[i] = PointerGetDatum(new_value);
496 				toast_free[i] = true;
497 			}
498 		}
499 	}
500 
501 	/*
502 	 * Calculate the new size of the tuple.
503 	 *
504 	 * This should match the reconstruction code in
505 	 * heap_toast_insert_or_update.
506 	 */
507 	new_header_len = SizeofHeapTupleHeader;
508 	if (has_nulls)
509 		new_header_len += BITMAPLEN(numAttrs);
510 	new_header_len = MAXALIGN(new_header_len);
511 	new_data_len = heap_compute_data_size(tupleDesc,
512 										  toast_values, toast_isnull);
513 	new_tuple_len = new_header_len + new_data_len;
514 
515 	new_data = (HeapTupleHeader) palloc0(new_tuple_len);
516 
517 	/*
518 	 * Copy the existing tuple header, but adjust natts and t_hoff.
519 	 */
520 	memcpy(new_data, tup, SizeofHeapTupleHeader);
521 	HeapTupleHeaderSetNatts(new_data, numAttrs);
522 	new_data->t_hoff = new_header_len;
523 
524 	/* Set the composite-Datum header fields correctly */
525 	HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
526 	HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
527 	HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
528 
529 	/* Copy over the data, and fill the null bitmap if needed */
530 	heap_fill_tuple(tupleDesc,
531 					toast_values,
532 					toast_isnull,
533 					(char *) new_data + new_header_len,
534 					new_data_len,
535 					&(new_data->t_infomask),
536 					has_nulls ? new_data->t_bits : NULL);
537 
538 	/*
539 	 * Free allocated temp values
540 	 */
541 	for (i = 0; i < numAttrs; i++)
542 		if (toast_free[i])
543 			pfree(DatumGetPointer(toast_values[i]));
544 
545 	return PointerGetDatum(new_data);
546 }
547 
548 
549 /* ----------
550  * toast_build_flattened_tuple -
551  *
552  *	Build a tuple containing no out-of-line toasted fields.
553  *	(This does not eliminate compressed or short-header datums.)
554  *
555  *	This is essentially just like heap_form_tuple, except that it will
556  *	expand any external-data pointers beforehand.
557  *
558  *	It's not very clear whether it would be preferable to decompress
559  *	in-line compressed datums while at it.  For now, we don't.
560  * ----------
561  */
562 HeapTuple
563 toast_build_flattened_tuple(TupleDesc tupleDesc,
564 							Datum *values,
565 							bool *isnull)
566 {
567 	HeapTuple	new_tuple;
568 	int			numAttrs = tupleDesc->natts;
569 	int			num_to_free;
570 	int			i;
571 	Datum		new_values[MaxTupleAttributeNumber];
572 	Pointer		freeable_values[MaxTupleAttributeNumber];
573 
574 	/*
575 	 * We can pass the caller's isnull array directly to heap_form_tuple, but
576 	 * we potentially need to modify the values array.
577 	 */
578 	Assert(numAttrs <= MaxTupleAttributeNumber);
579 	memcpy(new_values, values, numAttrs * sizeof(Datum));
580 
581 	num_to_free = 0;
582 	for (i = 0; i < numAttrs; i++)
583 	{
584 		/*
585 		 * Look at non-null varlena attributes
586 		 */
587 		if (!isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
588 		{
589 			struct varlena *new_value;
590 
591 			new_value = (struct varlena *) DatumGetPointer(new_values[i]);
592 			if (VARATT_IS_EXTERNAL(new_value))
593 			{
594 				new_value = detoast_external_attr(new_value);
595 				new_values[i] = PointerGetDatum(new_value);
596 				freeable_values[num_to_free++] = (Pointer) new_value;
597 			}
598 		}
599 	}
600 
601 	/*
602 	 * Form the reconfigured tuple.
603 	 */
604 	new_tuple = heap_form_tuple(tupleDesc, new_values, isnull);
605 
606 	/*
607 	 * Free allocated temp values
608 	 */
609 	for (i = 0; i < num_to_free; i++)
610 		pfree(freeable_values[i]);
611 
612 	return new_tuple;
613 }
614 
615 /*
616  * Fetch a TOAST slice from a heap table.
617  *
618  * toastrel is the relation from which chunks are to be fetched.
619  * valueid identifies the TOAST value from which chunks are being fetched.
620  * attrsize is the total size of the TOAST value.
621  * sliceoffset is the byte offset within the TOAST value from which to fetch.
622  * slicelength is the number of bytes to be fetched from the TOAST value.
623  * result is the varlena into which the results should be written.
624  */
625 void
626 heap_fetch_toast_slice(Relation toastrel, Oid valueid, int32 attrsize,
627 					   int32 sliceoffset, int32 slicelength,
628 					   struct varlena *result)
629 {
630 	Relation   *toastidxs;
631 	ScanKeyData toastkey[3];
632 	TupleDesc	toasttupDesc = toastrel->rd_att;
633 	int			nscankeys;
634 	SysScanDesc toastscan;
635 	HeapTuple	ttup;
636 	int32		expectedchunk;
637 	int32		totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
638 	int			startchunk;
639 	int			endchunk;
640 	int			num_indexes;
641 	int			validIndex;
642 	SnapshotData SnapshotToast;
643 
644 	/* Look for the valid index of toast relation */
645 	validIndex = toast_open_indexes(toastrel,
646 									AccessShareLock,
647 									&toastidxs,
648 									&num_indexes);
649 
650 	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
651 	endchunk = (sliceoffset + slicelength - 1) / TOAST_MAX_CHUNK_SIZE;
652 	Assert(endchunk <= totalchunks);
653 
654 	/* Set up a scan key to fetch from the index. */
655 	ScanKeyInit(&toastkey[0],
656 				(AttrNumber) 1,
657 				BTEqualStrategyNumber, F_OIDEQ,
658 				ObjectIdGetDatum(valueid));
659 
660 	/*
661 	 * No additional condition if fetching all chunks. Otherwise, use an
662 	 * equality condition for one chunk, and a range condition otherwise.
663 	 */
664 	if (startchunk == 0 && endchunk == totalchunks - 1)
665 		nscankeys = 1;
666 	else if (startchunk == endchunk)
667 	{
668 		ScanKeyInit(&toastkey[1],
669 					(AttrNumber) 2,
670 					BTEqualStrategyNumber, F_INT4EQ,
671 					Int32GetDatum(startchunk));
672 		nscankeys = 2;
673 	}
674 	else
675 	{
676 		ScanKeyInit(&toastkey[1],
677 					(AttrNumber) 2,
678 					BTGreaterEqualStrategyNumber, F_INT4GE,
679 					Int32GetDatum(startchunk));
680 		ScanKeyInit(&toastkey[2],
681 					(AttrNumber) 2,
682 					BTLessEqualStrategyNumber, F_INT4LE,
683 					Int32GetDatum(endchunk));
684 		nscankeys = 3;
685 	}
686 
687 	/* Prepare for scan */
688 	init_toast_snapshot(&SnapshotToast);
689 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
690 										   &SnapshotToast, nscankeys, toastkey);
691 
692 	/*
693 	 * Read the chunks by index
694 	 *
695 	 * The index is on (valueid, chunkidx) so they will come in order
696 	 */
697 	expectedchunk = startchunk;
698 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
699 	{
700 		int32		curchunk;
701 		Pointer		chunk;
702 		bool		isnull;
703 		char	   *chunkdata;
704 		int32		chunksize;
705 		int32		expected_size;
706 		int32		chcpystrt;
707 		int32		chcpyend;
708 
709 		/*
710 		 * Have a chunk, extract the sequence number and the data
711 		 */
712 		curchunk = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
713 		Assert(!isnull);
714 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
715 		Assert(!isnull);
716 		if (!VARATT_IS_EXTENDED(chunk))
717 		{
718 			chunksize = VARSIZE(chunk) - VARHDRSZ;
719 			chunkdata = VARDATA(chunk);
720 		}
721 		else if (VARATT_IS_SHORT(chunk))
722 		{
723 			/* could happen due to heap_form_tuple doing its thing */
724 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
725 			chunkdata = VARDATA_SHORT(chunk);
726 		}
727 		else
728 		{
729 			/* should never happen */
730 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
731 				 valueid, RelationGetRelationName(toastrel));
732 			chunksize = 0;		/* keep compiler quiet */
733 			chunkdata = NULL;
734 		}
735 
736 		/*
737 		 * Some checks on the data we've found
738 		 */
739 		if (curchunk != expectedchunk)
740 			ereport(ERROR,
741 					(errcode(ERRCODE_DATA_CORRUPTED),
742 					 errmsg_internal("unexpected chunk number %d (expected %d) for toast value %u in %s",
743 									 curchunk, expectedchunk, valueid,
744 									 RelationGetRelationName(toastrel))));
745 		if (curchunk > endchunk)
746 			ereport(ERROR,
747 					(errcode(ERRCODE_DATA_CORRUPTED),
748 					 errmsg_internal("unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
749 									 curchunk,
750 									 startchunk, endchunk, valueid,
751 									 RelationGetRelationName(toastrel))));
752 		expected_size = curchunk < totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
753 			: attrsize - ((totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
754 		if (chunksize != expected_size)
755 			ereport(ERROR,
756 					(errcode(ERRCODE_DATA_CORRUPTED),
757 					 errmsg_internal("unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
758 									 chunksize, expected_size,
759 									 curchunk, totalchunks, valueid,
760 									 RelationGetRelationName(toastrel))));
761 
762 		/*
763 		 * Copy the data into proper place in our result
764 		 */
765 		chcpystrt = 0;
766 		chcpyend = chunksize - 1;
767 		if (curchunk == startchunk)
768 			chcpystrt = sliceoffset % TOAST_MAX_CHUNK_SIZE;
769 		if (curchunk == endchunk)
770 			chcpyend = (sliceoffset + slicelength - 1) % TOAST_MAX_CHUNK_SIZE;
771 
772 		memcpy(VARDATA(result) +
773 			   (curchunk * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
774 			   chunkdata + chcpystrt,
775 			   (chcpyend - chcpystrt) + 1);
776 
777 		expectedchunk++;
778 	}
779 
780 	/*
781 	 * Final checks that we successfully fetched the datum
782 	 */
783 	if (expectedchunk != (endchunk + 1))
784 		ereport(ERROR,
785 				(errcode(ERRCODE_DATA_CORRUPTED),
786 				 errmsg_internal("missing chunk number %d for toast value %u in %s",
787 								 expectedchunk, valueid,
788 								 RelationGetRelationName(toastrel))));
789 
790 	/* End scan and close indexes. */
791 	systable_endscan_ordered(toastscan);
792 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
793 }
794