1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *	  Support routines for external and compressed storage of
5  *	  variable size attributes.
6  *
7  * Copyright (c) 2000-2019, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *		toast_insert_or_update -
16  *			Try to make a given tuple fit into one page by compressing
17  *			or moving off attributes
18  *
19  *		toast_delete -
20  *			Reclaim toast storage when a tuple is deleted
21  *
22  *		heap_tuple_untoast_attr -
23  *			Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27 
28 #include "postgres.h"
29 
30 #include <unistd.h>
31 #include <fcntl.h>
32 
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/int.h"
39 #include "common/pg_lzcompress.h"
40 #include "miscadmin.h"
41 #include "utils/expandeddatum.h"
42 #include "utils/fmgroids.h"
43 #include "utils/rel.h"
44 #include "utils/snapmgr.h"
45 #include "utils/typcache.h"
46 
47 
48 #undef TOAST_DEBUG
49 
/*
 *	The information at the start of the compressed toast data.
 *
 *	The TOAST_COMPRESS_* macros in this file reach these fields by casting
 *	a datum pointer to (toast_compress_header *).
 */
typedef struct toast_compress_header
{
	int32		vl_len_;		/* varlena header (do not touch directly!) */
	int32		rawsize;		/* read/written via TOAST_COMPRESS_RAWSIZE and
								 * TOAST_COMPRESS_SET_RAWSIZE; presumably the
								 * size of the data before compression --
								 * TODO confirm against the compressor */
} toast_compress_header;
58 
/*
 * Utilities for manipulation of header information for compressed
 * toast entries.
 *
 * The ptr argument is cast to (toast_compress_header *) or (char *) as
 * needed, so it is expected to address the start of such a header.
 */
/* size of the header, as an int32 */
#define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
/* the rawsize field of the header at ptr */
#define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize)
/* address of the first byte following the header at ptr */
#define TOAST_COMPRESS_RAWDATA(ptr) \
	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
/* store len into the rawsize field of the header at ptr */
#define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
	(((toast_compress_header *) (ptr))->rawsize = (len))
69 
/*
 * Forward declarations of the file-local (static) helpers that the public
 * entry points in this file call.
 */
static void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
static Datum toast_save_datum(Relation rel, Datum value,
							  struct varlena *oldexternal, int options);
static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
static struct varlena *toast_fetch_datum(struct varlena *attr);
static struct varlena *toast_fetch_datum_slice(struct varlena *attr,
											   int32 sliceoffset, int32 length);
static struct varlena *toast_decompress_datum(struct varlena *attr);
static struct varlena *toast_decompress_datum_slice(struct varlena *attr, int32 slicelength);
static int	toast_open_indexes(Relation toastrel,
							   LOCKMODE lock,
							   Relation **toastidxs,
							   int *num_indexes);
static void toast_close_indexes(Relation *toastidxs, int num_indexes,
								LOCKMODE lock);
static void init_toast_snapshot(Snapshot toast_snapshot);
87 
88 
89 /* ----------
90  * heap_tuple_fetch_attr -
91  *
92  *	Public entry point to get back a toasted value from
93  *	external source (possibly still in compressed format).
94  *
95  * This will return a datum that contains all the data internally, ie, not
96  * relying on external storage or memory, but it can still be compressed or
97  * have a short header.  Note some callers assume that if the input is an
98  * EXTERNAL datum, the result will be a pfree'able chunk.
99  * ----------
100  */
101 struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)102 heap_tuple_fetch_attr(struct varlena *attr)
103 {
104 	struct varlena *result;
105 
106 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
107 	{
108 		/*
109 		 * This is an external stored plain value
110 		 */
111 		result = toast_fetch_datum(attr);
112 	}
113 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
114 	{
115 		/*
116 		 * This is an indirect pointer --- dereference it
117 		 */
118 		struct varatt_indirect redirect;
119 
120 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
121 		attr = (struct varlena *) redirect.pointer;
122 
123 		/* nested indirect Datums aren't allowed */
124 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
125 
126 		/* recurse if value is still external in some other way */
127 		if (VARATT_IS_EXTERNAL(attr))
128 			return heap_tuple_fetch_attr(attr);
129 
130 		/*
131 		 * Copy into the caller's memory context, in case caller tries to
132 		 * pfree the result.
133 		 */
134 		result = (struct varlena *) palloc(VARSIZE_ANY(attr));
135 		memcpy(result, attr, VARSIZE_ANY(attr));
136 	}
137 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
138 	{
139 		/*
140 		 * This is an expanded-object pointer --- get flat format
141 		 */
142 		ExpandedObjectHeader *eoh;
143 		Size		resultsize;
144 
145 		eoh = DatumGetEOHP(PointerGetDatum(attr));
146 		resultsize = EOH_get_flat_size(eoh);
147 		result = (struct varlena *) palloc(resultsize);
148 		EOH_flatten_into(eoh, (void *) result, resultsize);
149 	}
150 	else
151 	{
152 		/*
153 		 * This is a plain value inside of the main tuple - why am I called?
154 		 */
155 		result = attr;
156 	}
157 
158 	return result;
159 }
160 
161 
162 /* ----------
163  * heap_tuple_untoast_attr -
164  *
165  *	Public entry point to get back a toasted value from compression
166  *	or external storage.  The result is always non-extended varlena form.
167  *
168  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
169  * datum, the result will be a pfree'able chunk.
170  * ----------
171  */
172 struct varlena *
heap_tuple_untoast_attr(struct varlena * attr)173 heap_tuple_untoast_attr(struct varlena *attr)
174 {
175 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
176 	{
177 		/*
178 		 * This is an externally stored datum --- fetch it back from there
179 		 */
180 		attr = toast_fetch_datum(attr);
181 		/* If it's compressed, decompress it */
182 		if (VARATT_IS_COMPRESSED(attr))
183 		{
184 			struct varlena *tmp = attr;
185 
186 			attr = toast_decompress_datum(tmp);
187 			pfree(tmp);
188 		}
189 	}
190 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
191 	{
192 		/*
193 		 * This is an indirect pointer --- dereference it
194 		 */
195 		struct varatt_indirect redirect;
196 
197 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
198 		attr = (struct varlena *) redirect.pointer;
199 
200 		/* nested indirect Datums aren't allowed */
201 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
202 
203 		/* recurse in case value is still extended in some other way */
204 		attr = heap_tuple_untoast_attr(attr);
205 
206 		/* if it isn't, we'd better copy it */
207 		if (attr == (struct varlena *) redirect.pointer)
208 		{
209 			struct varlena *result;
210 
211 			result = (struct varlena *) palloc(VARSIZE_ANY(attr));
212 			memcpy(result, attr, VARSIZE_ANY(attr));
213 			attr = result;
214 		}
215 	}
216 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
217 	{
218 		/*
219 		 * This is an expanded-object pointer --- get flat format
220 		 */
221 		attr = heap_tuple_fetch_attr(attr);
222 		/* flatteners are not allowed to produce compressed/short output */
223 		Assert(!VARATT_IS_EXTENDED(attr));
224 	}
225 	else if (VARATT_IS_COMPRESSED(attr))
226 	{
227 		/*
228 		 * This is a compressed value inside of the main tuple
229 		 */
230 		attr = toast_decompress_datum(attr);
231 	}
232 	else if (VARATT_IS_SHORT(attr))
233 	{
234 		/*
235 		 * This is a short-header varlena --- convert to 4-byte header format
236 		 */
237 		Size		data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
238 		Size		new_size = data_size + VARHDRSZ;
239 		struct varlena *new_attr;
240 
241 		new_attr = (struct varlena *) palloc(new_size);
242 		SET_VARSIZE(new_attr, new_size);
243 		memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
244 		attr = new_attr;
245 	}
246 
247 	return attr;
248 }
249 
250 
251 /* ----------
252  * heap_tuple_untoast_attr_slice -
253  *
254  *		Public entry point to get back part of a toasted value
255  *		from compression or external storage.
256  *
257  * sliceoffset is where to start (zero or more)
258  * If slicelength < 0, return everything beyond sliceoffset
259  * ----------
260  */
261 struct varlena *
heap_tuple_untoast_attr_slice(struct varlena * attr,int32 sliceoffset,int32 slicelength)262 heap_tuple_untoast_attr_slice(struct varlena *attr,
263 							  int32 sliceoffset, int32 slicelength)
264 {
265 	struct varlena *preslice;
266 	struct varlena *result;
267 	char	   *attrdata;
268 	int32		slicelimit;
269 	int32		attrsize;
270 
271 	if (sliceoffset < 0)
272 		elog(ERROR, "invalid sliceoffset: %d", sliceoffset);
273 
274 	/*
275 	 * Compute slicelimit = offset + length, or -1 if we must fetch all of the
276 	 * value.  In case of integer overflow, we must fetch all.
277 	 */
278 	if (slicelength < 0)
279 		slicelimit = -1;
280 	else if (pg_add_s32_overflow(sliceoffset, slicelength, &slicelimit))
281 		slicelength = slicelimit = -1;
282 
283 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
284 	{
285 		struct varatt_external toast_pointer;
286 
287 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
288 
289 		/* fast path for non-compressed external datums */
290 		if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
291 			return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
292 
293 		/* fetch it back (compressed marker will get set automatically) */
294 		preslice = toast_fetch_datum(attr);
295 	}
296 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
297 	{
298 		struct varatt_indirect redirect;
299 
300 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
301 
302 		/* nested indirect Datums aren't allowed */
303 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
304 
305 		return heap_tuple_untoast_attr_slice(redirect.pointer,
306 											 sliceoffset, slicelength);
307 	}
308 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
309 	{
310 		/* pass it off to heap_tuple_fetch_attr to flatten */
311 		preslice = heap_tuple_fetch_attr(attr);
312 	}
313 	else
314 		preslice = attr;
315 
316 	Assert(!VARATT_IS_EXTERNAL(preslice));
317 
318 	if (VARATT_IS_COMPRESSED(preslice))
319 	{
320 		struct varlena *tmp = preslice;
321 
322 		/* Decompress enough to encompass the slice and the offset */
323 		if (slicelimit >= 0)
324 			preslice = toast_decompress_datum_slice(tmp, slicelimit);
325 		else
326 			preslice = toast_decompress_datum(tmp);
327 
328 		if (tmp != attr)
329 			pfree(tmp);
330 	}
331 
332 	if (VARATT_IS_SHORT(preslice))
333 	{
334 		attrdata = VARDATA_SHORT(preslice);
335 		attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
336 	}
337 	else
338 	{
339 		attrdata = VARDATA(preslice);
340 		attrsize = VARSIZE(preslice) - VARHDRSZ;
341 	}
342 
343 	/* slicing of datum for compressed cases and plain value */
344 
345 	if (sliceoffset >= attrsize)
346 	{
347 		sliceoffset = 0;
348 		slicelength = 0;
349 	}
350 	else if (slicelength < 0 || slicelimit > attrsize)
351 		slicelength = attrsize - sliceoffset;
352 
353 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
354 	SET_VARSIZE(result, slicelength + VARHDRSZ);
355 
356 	memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
357 
358 	if (preslice != attr)
359 		pfree(preslice);
360 
361 	return result;
362 }
363 
364 
365 /* ----------
366  * toast_raw_datum_size -
367  *
368  *	Return the raw (detoasted) size of a varlena datum
369  *	(including the VARHDRSZ header)
370  * ----------
371  */
372 Size
toast_raw_datum_size(Datum value)373 toast_raw_datum_size(Datum value)
374 {
375 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
376 	Size		result;
377 
378 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
379 	{
380 		/* va_rawsize is the size of the original datum -- including header */
381 		struct varatt_external toast_pointer;
382 
383 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
384 		result = toast_pointer.va_rawsize;
385 	}
386 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
387 	{
388 		struct varatt_indirect toast_pointer;
389 
390 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
391 
392 		/* nested indirect Datums aren't allowed */
393 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
394 
395 		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
396 	}
397 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
398 	{
399 		result = EOH_get_flat_size(DatumGetEOHP(value));
400 	}
401 	else if (VARATT_IS_COMPRESSED(attr))
402 	{
403 		/* here, va_rawsize is just the payload size */
404 		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
405 	}
406 	else if (VARATT_IS_SHORT(attr))
407 	{
408 		/*
409 		 * we have to normalize the header length to VARHDRSZ or else the
410 		 * callers of this function will be confused.
411 		 */
412 		result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
413 	}
414 	else
415 	{
416 		/* plain untoasted datum */
417 		result = VARSIZE(attr);
418 	}
419 	return result;
420 }
421 
422 /* ----------
423  * toast_datum_size
424  *
425  *	Return the physical storage size (possibly compressed) of a varlena datum
426  * ----------
427  */
428 Size
toast_datum_size(Datum value)429 toast_datum_size(Datum value)
430 {
431 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
432 	Size		result;
433 
434 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
435 	{
436 		/*
437 		 * Attribute is stored externally - return the extsize whether
438 		 * compressed or not.  We do not count the size of the toast pointer
439 		 * ... should we?
440 		 */
441 		struct varatt_external toast_pointer;
442 
443 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
444 		result = toast_pointer.va_extsize;
445 	}
446 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
447 	{
448 		struct varatt_indirect toast_pointer;
449 
450 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
451 
452 		/* nested indirect Datums aren't allowed */
453 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
454 
455 		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
456 	}
457 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
458 	{
459 		result = EOH_get_flat_size(DatumGetEOHP(value));
460 	}
461 	else if (VARATT_IS_SHORT(attr))
462 	{
463 		result = VARSIZE_SHORT(attr);
464 	}
465 	else
466 	{
467 		/*
468 		 * Attribute is stored inline either compressed or not, just calculate
469 		 * the size of the datum in either case.
470 		 */
471 		result = VARSIZE(attr);
472 	}
473 	return result;
474 }
475 
476 
477 /* ----------
478  * toast_delete -
479  *
480  *	Cascaded delete toast-entries on DELETE
481  * ----------
482  */
483 void
toast_delete(Relation rel,HeapTuple oldtup,bool is_speculative)484 toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
485 {
486 	TupleDesc	tupleDesc;
487 	int			numAttrs;
488 	int			i;
489 	Datum		toast_values[MaxHeapAttributeNumber];
490 	bool		toast_isnull[MaxHeapAttributeNumber];
491 
492 	/*
493 	 * We should only ever be called for tuples of plain relations or
494 	 * materialized views --- recursing on a toast rel is bad news.
495 	 */
496 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
497 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
498 
499 	/*
500 	 * Get the tuple descriptor and break down the tuple into fields.
501 	 *
502 	 * NOTE: it's debatable whether to use heap_deform_tuple() here or just
503 	 * heap_getattr() only the varlena columns.  The latter could win if there
504 	 * are few varlena columns and many non-varlena ones. However,
505 	 * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
506 	 * O(N^2) if there are many varlena columns, so it seems better to err on
507 	 * the side of linear cost.  (We won't even be here unless there's at
508 	 * least one varlena column, by the way.)
509 	 */
510 	tupleDesc = rel->rd_att;
511 	numAttrs = tupleDesc->natts;
512 
513 	Assert(numAttrs <= MaxHeapAttributeNumber);
514 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
515 
516 	/*
517 	 * Check for external stored attributes and delete them from the secondary
518 	 * relation.
519 	 */
520 	for (i = 0; i < numAttrs; i++)
521 	{
522 		if (TupleDescAttr(tupleDesc, i)->attlen == -1)
523 		{
524 			Datum		value = toast_values[i];
525 
526 			if (toast_isnull[i])
527 				continue;
528 			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
529 				toast_delete_datum(rel, value, is_speculative);
530 		}
531 	}
532 }
533 
534 
/* ----------
 * toast_insert_or_update -
 *
 *	Delete no-longer-used toast-entries and create new ones to
 *	make the new tuple fit on INSERT or UPDATE
 *
 * Inputs:
 *	newtup: the candidate new tuple to be inserted
 *	oldtup: the old row version for UPDATE, or NULL for INSERT
 *	options: options to be passed to heap_insert() for toast rows
 * Result:
 *	either newtup if no toasting is needed, or a palloc'd modified tuple
 *	that is what should actually get stored
 *
 * NOTE: neither newtup nor oldtup will be modified.  This is a change
 * from the pre-8.1 API of this routine.
 *
 * Outline of the body:
 *	- deform newtup (and oldtup, if given) into per-attribute arrays
 *	- classify each attribute, fetching back external values that cannot
 *	  be reused
 *	- run up to four shrinking passes until the data fits
 *	- if anything was changed, build a new tuple from the arrays
 *	- free temporaries and delete superseded external values of oldtup
 * ----------
 */
HeapTuple
toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
					   int options)
{
	HeapTuple	result_tuple;
	TupleDesc	tupleDesc;
	int			numAttrs;
	int			i;

	/* set once any entry of toast_values[] has been replaced */
	bool		need_change = false;
	/* set once any toast_free[] entry is true */
	bool		need_free = false;
	/* set once any toast_delold[] entry is true */
	bool		need_delold = false;
	/* set if any attribute of newtup is NULL; affects the header size */
	bool		has_nulls = false;

	Size		maxDataLen;
	Size		hoff;

	/*
	 * Per-attribute working state, indexed by attribute position.
	 * toast_oldexternal[i] remembers the external pointer a new value had
	 * before it was fetched back; it is handed to toast_save_datum().
	 * toast_free[i] marks toast_values[i] as something this routine must
	 * pfree before returning; toast_delold[i] marks toast_oldvalues[i] as an
	 * external value to delete at the end.
	 */
	char		toast_action[MaxHeapAttributeNumber];
	bool		toast_isnull[MaxHeapAttributeNumber];
	bool		toast_oldisnull[MaxHeapAttributeNumber];
	Datum		toast_values[MaxHeapAttributeNumber];
	Datum		toast_oldvalues[MaxHeapAttributeNumber];
	struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
	int32		toast_sizes[MaxHeapAttributeNumber];
	bool		toast_free[MaxHeapAttributeNumber];
	bool		toast_delold[MaxHeapAttributeNumber];

	/*
	 * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
	 * deletions just normally insert/delete the toast values. It seems
	 * easiest to deal with that here, instead on, potentially, multiple
	 * callers.
	 */
	options &= ~HEAP_INSERT_SPECULATIVE;

	/*
	 * We should only ever be called for tuples of plain relations or
	 * materialized views --- recursing on a toast rel is bad news.
	 */
	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
		   rel->rd_rel->relkind == RELKIND_MATVIEW);

	/*
	 * Get the tuple descriptor and break down the tuple(s) into fields.
	 */
	tupleDesc = rel->rd_att;
	numAttrs = tupleDesc->natts;

	Assert(numAttrs <= MaxHeapAttributeNumber);
	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
	if (oldtup != NULL)
		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);

	/* ----------
	 * Then collect information about the values given
	 *
	 * NOTE: toast_action[i] can have these values:
	 *		' '		default handling
	 *		'p'		already processed --- don't touch it
	 *		'x'		incompressible, but OK to move off
	 *
	 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
	 *		toast_action[i] different from 'p'.
	 * ----------
	 */
	memset(toast_action, ' ', numAttrs * sizeof(char));
	memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
	memset(toast_free, 0, numAttrs * sizeof(bool));
	memset(toast_delold, 0, numAttrs * sizeof(bool));

	for (i = 0; i < numAttrs; i++)
	{
		Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
		struct varlena *old_value;
		struct varlena *new_value;

		if (oldtup != NULL)
		{
			/*
			 * For UPDATE get the old and new values of this attribute
			 */
			old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);

			/*
			 * If the old value is stored on disk, check if it has changed so
			 * we have to delete it later.
			 */
			if (att->attlen == -1 && !toast_oldisnull[i] &&
				VARATT_IS_EXTERNAL_ONDISK(old_value))
			{
				/*
				 * "Changed" means: the new value is NULL, is not an on-disk
				 * pointer, or its toast pointer bytes differ from the old
				 * one's.
				 */
				if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
					memcmp((char *) old_value, (char *) new_value,
						   VARSIZE_EXTERNAL(old_value)) != 0)
				{
					/*
					 * The old external stored value isn't needed any more
					 * after the update
					 */
					toast_delold[i] = true;
					need_delold = true;
				}
				else
				{
					/*
					 * This attribute isn't changed by this update so we reuse
					 * the original reference to the old value in the new
					 * tuple.
					 */
					toast_action[i] = 'p';
					continue;
				}
			}
		}
		else
		{
			/*
			 * For INSERT simply get the new value
			 */
			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
		}

		/*
		 * Handle NULL attributes
		 */
		if (toast_isnull[i])
		{
			toast_action[i] = 'p';
			has_nulls = true;
			continue;
		}

		/*
		 * Now look at varlena attributes
		 */
		if (att->attlen == -1)
		{
			/*
			 * If the table's attribute says PLAIN always, force it so.
			 */
			if (att->attstorage == 'p')
				toast_action[i] = 'p';

			/*
			 * We took care of UPDATE above, so any external value we find
			 * still in the tuple must be someone else's that we cannot reuse
			 * (this includes the case of an out-of-line in-memory datum).
			 * Fetch it back (without decompression, unless we are forcing
			 * PLAIN storage).  If necessary, we'll push it out as a new
			 * external value below.
			 */
			if (VARATT_IS_EXTERNAL(new_value))
			{
				toast_oldexternal[i] = new_value;
				if (att->attstorage == 'p')
					new_value = heap_tuple_untoast_attr(new_value);
				else
					new_value = heap_tuple_fetch_attr(new_value);
				toast_values[i] = PointerGetDatum(new_value);
				toast_free[i] = true;
				need_change = true;
				need_free = true;
			}

			/*
			 * Remember the size of this attribute
			 */
			toast_sizes[i] = VARSIZE_ANY(new_value);
		}
		else
		{
			/*
			 * Not a varlena attribute, plain storage always
			 */
			toast_action[i] = 'p';
		}
	}

	/* ----------
	 * Compress and/or save external until data fits into target length
	 *
	 *	1: Inline compress attributes with attstorage 'x', and store very
	 *	   large attributes with attstorage 'x' or 'e' external immediately
	 *	2: Store attributes with attstorage 'x' or 'e' external
	 *	3: Inline compress attributes with attstorage 'm'
	 *	4: Store attributes with attstorage 'm' external
	 * ----------
	 */

	/* compute header overhead --- this should match heap_form_tuple() */
	hoff = SizeofHeapTupleHeader;
	if (has_nulls)
		hoff += BITMAPLEN(numAttrs);
	hoff = MAXALIGN(hoff);
	/* now convert to a limit on the tuple data size */
	maxDataLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET) - hoff;

	/*
	 * Look for attributes with attstorage 'x' to compress.  Also find large
	 * attributes with attstorage 'x' or 'e', and store them external.
	 */
	while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen)
	{
		int			biggest_attno = -1;

		/*
		 * Starting the search at the aligned size of a toast pointer means
		 * only attributes strictly larger than that are ever selected.
		 */
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;
		Datum		new_value;

		/*
		 * Search for the biggest yet unprocessed internal attribute
		 */
		for (i = 0; i < numAttrs; i++)
		{
			Form_pg_attribute att = TupleDescAttr(tupleDesc, i);

			if (toast_action[i] != ' ')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
				continue;
			if (att->attstorage != 'x' && att->attstorage != 'e')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		/* no candidate left for this pass */
		if (biggest_attno < 0)
			break;

		/*
		 * Attempt to compress it inline, if it has attstorage 'x'
		 */
		i = biggest_attno;
		if (TupleDescAttr(tupleDesc, i)->attstorage == 'x')
		{
			old_value = toast_values[i];
			new_value = toast_compress_datum(old_value);

			if (DatumGetPointer(new_value) != NULL)
			{
				/* successful compression */
				if (toast_free[i])
					pfree(DatumGetPointer(old_value));
				toast_values[i] = new_value;
				toast_free[i] = true;
				toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
				need_change = true;
				need_free = true;
			}
			else
			{
				/* incompressible, ignore on subsequent compression passes */
				toast_action[i] = 'x';
			}
		}
		else
		{
			/* has attstorage 'e', ignore on subsequent compression passes */
			toast_action[i] = 'x';
		}

		/*
		 * If this value is by itself more than maxDataLen (after compression
		 * if any), push it out to the toast table immediately, if possible.
		 * This avoids uselessly compressing other fields in the common case
		 * where we have one long field and several short ones.
		 *
		 * XXX maybe the threshold should be less than maxDataLen?
		 */
		if (toast_sizes[i] > maxDataLen &&
			rel->rd_rel->reltoastrelid != InvalidOid)
		{
			old_value = toast_values[i];
			toast_action[i] = 'p';
			toast_values[i] = toast_save_datum(rel, toast_values[i],
											   toast_oldexternal[i], options);
			if (toast_free[i])
				pfree(DatumGetPointer(old_value));
			/* the datum returned by toast_save_datum is freed at the end */
			toast_free[i] = true;
			need_change = true;
			need_free = true;
		}
	}

	/*
	 * Second we look for attributes of attstorage 'x' or 'e' that are still
	 * inline.  But skip this if there's no toast table to push them to.
	 */
	while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen &&
		   rel->rd_rel->reltoastrelid != InvalidOid)
	{
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;

		/*------
		 * Search for the biggest yet inlined attribute with
		 * attstorage equals 'x' or 'e'
		 *------
		 */
		for (i = 0; i < numAttrs; i++)
		{
			Form_pg_attribute att = TupleDescAttr(tupleDesc, i);

			/* unlike the compression passes, 'x' entries are eligible here */
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (att->attstorage != 'x' && att->attstorage != 'e')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		if (biggest_attno < 0)
			break;

		/*
		 * Store this external
		 */
		i = biggest_attno;
		old_value = toast_values[i];
		toast_action[i] = 'p';
		toast_values[i] = toast_save_datum(rel, toast_values[i],
										   toast_oldexternal[i], options);
		if (toast_free[i])
			pfree(DatumGetPointer(old_value));
		toast_free[i] = true;

		need_change = true;
		need_free = true;
	}

	/*
	 * Round 3 - this time we take attributes with storage 'm' into
	 * compression
	 */
	while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen)
	{
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;
		Datum		new_value;

		/*
		 * Search for the biggest yet uncompressed internal attribute
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] != ' ')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
				continue;
			if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		if (biggest_attno < 0)
			break;

		/*
		 * Attempt to compress it inline
		 */
		i = biggest_attno;
		old_value = toast_values[i];
		new_value = toast_compress_datum(old_value);

		if (DatumGetPointer(new_value) != NULL)
		{
			/* successful compression */
			if (toast_free[i])
				pfree(DatumGetPointer(old_value));
			toast_values[i] = new_value;
			toast_free[i] = true;
			toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
			need_change = true;
			need_free = true;
		}
		else
		{
			/* incompressible, ignore on subsequent compression passes */
			toast_action[i] = 'x';
		}
	}

	/*
	 * Finally we store attributes of type 'm' externally.  At this point we
	 * increase the target tuple size, so that 'm' attributes aren't stored
	 * externally unless really necessary.
	 */
	maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;

	while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen &&
		   rel->rd_rel->reltoastrelid != InvalidOid)
	{
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;

		/*--------
		 * Search for the biggest yet inlined attribute with
		 * attstorage = 'm'
		 *--------
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		if (biggest_attno < 0)
			break;

		/*
		 * Store this external
		 */
		i = biggest_attno;
		old_value = toast_values[i];
		toast_action[i] = 'p';
		toast_values[i] = toast_save_datum(rel, toast_values[i],
										   toast_oldexternal[i], options);
		if (toast_free[i])
			pfree(DatumGetPointer(old_value));
		toast_free[i] = true;

		need_change = true;
		need_free = true;
	}

	/*
	 * In the case we toasted any values, we need to build a new heap tuple
	 * with the changed values.
	 */
	if (need_change)
	{
		HeapTupleHeader olddata = newtup->t_data;
		HeapTupleHeader new_data;
		int32		new_header_len;
		int32		new_data_len;
		int32		new_tuple_len;

		/*
		 * Calculate the new size of the tuple.
		 *
		 * Note: we used to assume here that the old tuple's t_hoff must equal
		 * the new_header_len value, but that was incorrect.  The old tuple
		 * might have a smaller-than-current natts, if there's been an ALTER
		 * TABLE ADD COLUMN since it was stored; and that would lead to a
		 * different conclusion about the size of the null bitmap, or even
		 * whether there needs to be one at all.
		 */
		new_header_len = SizeofHeapTupleHeader;
		if (has_nulls)
			new_header_len += BITMAPLEN(numAttrs);
		new_header_len = MAXALIGN(new_header_len);
		new_data_len = heap_compute_data_size(tupleDesc,
											  toast_values, toast_isnull);
		new_tuple_len = new_header_len + new_data_len;

		/*
		 * Allocate and zero the space needed, and fill HeapTupleData fields.
		 */
		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
		result_tuple->t_len = new_tuple_len;
		result_tuple->t_self = newtup->t_self;
		result_tuple->t_tableOid = newtup->t_tableOid;
		/* the tuple body lives in the same allocation, after HeapTupleData */
		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
		result_tuple->t_data = new_data;

		/*
		 * Copy the existing tuple header, but adjust natts and t_hoff.
		 */
		memcpy(new_data, olddata, SizeofHeapTupleHeader);
		HeapTupleHeaderSetNatts(new_data, numAttrs);
		new_data->t_hoff = new_header_len;

		/* Copy over the data, and fill the null bitmap if needed */
		heap_fill_tuple(tupleDesc,
						toast_values,
						toast_isnull,
						(char *) new_data + new_header_len,
						new_data_len,
						&(new_data->t_infomask),
						has_nulls ? new_data->t_bits : NULL);
	}
	else
		result_tuple = newtup;

	/*
	 * Free allocated temp values
	 */
	if (need_free)
		for (i = 0; i < numAttrs; i++)
			if (toast_free[i])
				pfree(DatumGetPointer(toast_values[i]));

	/*
	 * Delete external values from the old tuple
	 */
	if (need_delold)
		for (i = 0; i < numAttrs; i++)
			if (toast_delold[i])
				toast_delete_datum(rel, toast_oldvalues[i], false);

	return result_tuple;
}
1086 
1087 
1088 /* ----------
1089  * toast_flatten_tuple -
1090  *
1091  *	"Flatten" a tuple to contain no out-of-line toasted fields.
1092  *	(This does not eliminate compressed or short-header datums.)
1093  *
1094  *	Note: we expect the caller already checked HeapTupleHasExternal(tup),
1095  *	so there is no need for a short-circuit path.
1096  * ----------
1097  */
1098 HeapTuple
toast_flatten_tuple(HeapTuple tup,TupleDesc tupleDesc)1099 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1100 {
1101 	HeapTuple	new_tuple;
1102 	int			numAttrs = tupleDesc->natts;
1103 	int			i;
1104 	Datum		toast_values[MaxTupleAttributeNumber];
1105 	bool		toast_isnull[MaxTupleAttributeNumber];
1106 	bool		toast_free[MaxTupleAttributeNumber];
1107 
1108 	/*
1109 	 * Break down the tuple into fields.
1110 	 */
1111 	Assert(numAttrs <= MaxTupleAttributeNumber);
1112 	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1113 
1114 	memset(toast_free, 0, numAttrs * sizeof(bool));
1115 
1116 	for (i = 0; i < numAttrs; i++)
1117 	{
1118 		/*
1119 		 * Look at non-null varlena attributes
1120 		 */
1121 		if (!toast_isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
1122 		{
1123 			struct varlena *new_value;
1124 
1125 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1126 			if (VARATT_IS_EXTERNAL(new_value))
1127 			{
1128 				new_value = heap_tuple_fetch_attr(new_value);
1129 				toast_values[i] = PointerGetDatum(new_value);
1130 				toast_free[i] = true;
1131 			}
1132 		}
1133 	}
1134 
1135 	/*
1136 	 * Form the reconfigured tuple.
1137 	 */
1138 	new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1139 
1140 	/*
1141 	 * Be sure to copy the tuple's identity fields.  We also make a point of
1142 	 * copying visibility info, just in case anybody looks at those fields in
1143 	 * a syscache entry.
1144 	 */
1145 	new_tuple->t_self = tup->t_self;
1146 	new_tuple->t_tableOid = tup->t_tableOid;
1147 
1148 	new_tuple->t_data->t_choice = tup->t_data->t_choice;
1149 	new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1150 	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1151 	new_tuple->t_data->t_infomask |=
1152 		tup->t_data->t_infomask & HEAP_XACT_MASK;
1153 	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1154 	new_tuple->t_data->t_infomask2 |=
1155 		tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1156 
1157 	/*
1158 	 * Free allocated temp values
1159 	 */
1160 	for (i = 0; i < numAttrs; i++)
1161 		if (toast_free[i])
1162 			pfree(DatumGetPointer(toast_values[i]));
1163 
1164 	return new_tuple;
1165 }
1166 
1167 
1168 /* ----------
1169  * toast_flatten_tuple_to_datum -
1170  *
1171  *	"Flatten" a tuple containing out-of-line toasted fields into a Datum.
1172  *	The result is always palloc'd in the current memory context.
1173  *
1174  *	We have a general rule that Datums of container types (rows, arrays,
1175  *	ranges, etc) must not contain any external TOAST pointers.  Without
1176  *	this rule, we'd have to look inside each Datum when preparing a tuple
1177  *	for storage, which would be expensive and would fail to extend cleanly
1178  *	to new sorts of container types.
1179  *
1180  *	However, we don't want to say that tuples represented as HeapTuples
1181  *	can't contain toasted fields, so instead this routine should be called
1182  *	when such a HeapTuple is being converted into a Datum.
1183  *
1184  *	While we're at it, we decompress any compressed fields too.  This is not
1185  *	necessary for correctness, but reflects an expectation that compression
1186  *	will be more effective if applied to the whole tuple not individual
1187  *	fields.  We are not so concerned about that that we want to deconstruct
1188  *	and reconstruct tuples just to get rid of compressed fields, however.
1189  *	So callers typically won't call this unless they see that the tuple has
1190  *	at least one external field.
1191  *
1192  *	On the other hand, in-line short-header varlena fields are left alone.
1193  *	If we "untoasted" them here, they'd just get changed back to short-header
1194  *	format anyway within heap_fill_tuple.
1195  * ----------
1196  */
1197 Datum
toast_flatten_tuple_to_datum(HeapTupleHeader tup,uint32 tup_len,TupleDesc tupleDesc)1198 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1199 							 uint32 tup_len,
1200 							 TupleDesc tupleDesc)
1201 {
1202 	HeapTupleHeader new_data;
1203 	int32		new_header_len;
1204 	int32		new_data_len;
1205 	int32		new_tuple_len;
1206 	HeapTupleData tmptup;
1207 	int			numAttrs = tupleDesc->natts;
1208 	int			i;
1209 	bool		has_nulls = false;
1210 	Datum		toast_values[MaxTupleAttributeNumber];
1211 	bool		toast_isnull[MaxTupleAttributeNumber];
1212 	bool		toast_free[MaxTupleAttributeNumber];
1213 
1214 	/* Build a temporary HeapTuple control structure */
1215 	tmptup.t_len = tup_len;
1216 	ItemPointerSetInvalid(&(tmptup.t_self));
1217 	tmptup.t_tableOid = InvalidOid;
1218 	tmptup.t_data = tup;
1219 
1220 	/*
1221 	 * Break down the tuple into fields.
1222 	 */
1223 	Assert(numAttrs <= MaxTupleAttributeNumber);
1224 	heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1225 
1226 	memset(toast_free, 0, numAttrs * sizeof(bool));
1227 
1228 	for (i = 0; i < numAttrs; i++)
1229 	{
1230 		/*
1231 		 * Look at non-null varlena attributes
1232 		 */
1233 		if (toast_isnull[i])
1234 			has_nulls = true;
1235 		else if (TupleDescAttr(tupleDesc, i)->attlen == -1)
1236 		{
1237 			struct varlena *new_value;
1238 
1239 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1240 			if (VARATT_IS_EXTERNAL(new_value) ||
1241 				VARATT_IS_COMPRESSED(new_value))
1242 			{
1243 				new_value = heap_tuple_untoast_attr(new_value);
1244 				toast_values[i] = PointerGetDatum(new_value);
1245 				toast_free[i] = true;
1246 			}
1247 		}
1248 	}
1249 
1250 	/*
1251 	 * Calculate the new size of the tuple.
1252 	 *
1253 	 * This should match the reconstruction code in toast_insert_or_update.
1254 	 */
1255 	new_header_len = SizeofHeapTupleHeader;
1256 	if (has_nulls)
1257 		new_header_len += BITMAPLEN(numAttrs);
1258 	new_header_len = MAXALIGN(new_header_len);
1259 	new_data_len = heap_compute_data_size(tupleDesc,
1260 										  toast_values, toast_isnull);
1261 	new_tuple_len = new_header_len + new_data_len;
1262 
1263 	new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1264 
1265 	/*
1266 	 * Copy the existing tuple header, but adjust natts and t_hoff.
1267 	 */
1268 	memcpy(new_data, tup, SizeofHeapTupleHeader);
1269 	HeapTupleHeaderSetNatts(new_data, numAttrs);
1270 	new_data->t_hoff = new_header_len;
1271 
1272 	/* Set the composite-Datum header fields correctly */
1273 	HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1274 	HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1275 	HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1276 
1277 	/* Copy over the data, and fill the null bitmap if needed */
1278 	heap_fill_tuple(tupleDesc,
1279 					toast_values,
1280 					toast_isnull,
1281 					(char *) new_data + new_header_len,
1282 					new_data_len,
1283 					&(new_data->t_infomask),
1284 					has_nulls ? new_data->t_bits : NULL);
1285 
1286 	/*
1287 	 * Free allocated temp values
1288 	 */
1289 	for (i = 0; i < numAttrs; i++)
1290 		if (toast_free[i])
1291 			pfree(DatumGetPointer(toast_values[i]));
1292 
1293 	return PointerGetDatum(new_data);
1294 }
1295 
1296 
1297 /* ----------
1298  * toast_build_flattened_tuple -
1299  *
1300  *	Build a tuple containing no out-of-line toasted fields.
1301  *	(This does not eliminate compressed or short-header datums.)
1302  *
1303  *	This is essentially just like heap_form_tuple, except that it will
1304  *	expand any external-data pointers beforehand.
1305  *
1306  *	It's not very clear whether it would be preferable to decompress
1307  *	in-line compressed datums while at it.  For now, we don't.
1308  * ----------
1309  */
1310 HeapTuple
toast_build_flattened_tuple(TupleDesc tupleDesc,Datum * values,bool * isnull)1311 toast_build_flattened_tuple(TupleDesc tupleDesc,
1312 							Datum *values,
1313 							bool *isnull)
1314 {
1315 	HeapTuple	new_tuple;
1316 	int			numAttrs = tupleDesc->natts;
1317 	int			num_to_free;
1318 	int			i;
1319 	Datum		new_values[MaxTupleAttributeNumber];
1320 	Pointer		freeable_values[MaxTupleAttributeNumber];
1321 
1322 	/*
1323 	 * We can pass the caller's isnull array directly to heap_form_tuple, but
1324 	 * we potentially need to modify the values array.
1325 	 */
1326 	Assert(numAttrs <= MaxTupleAttributeNumber);
1327 	memcpy(new_values, values, numAttrs * sizeof(Datum));
1328 
1329 	num_to_free = 0;
1330 	for (i = 0; i < numAttrs; i++)
1331 	{
1332 		/*
1333 		 * Look at non-null varlena attributes
1334 		 */
1335 		if (!isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
1336 		{
1337 			struct varlena *new_value;
1338 
1339 			new_value = (struct varlena *) DatumGetPointer(new_values[i]);
1340 			if (VARATT_IS_EXTERNAL(new_value))
1341 			{
1342 				new_value = heap_tuple_fetch_attr(new_value);
1343 				new_values[i] = PointerGetDatum(new_value);
1344 				freeable_values[num_to_free++] = (Pointer) new_value;
1345 			}
1346 		}
1347 	}
1348 
1349 	/*
1350 	 * Form the reconfigured tuple.
1351 	 */
1352 	new_tuple = heap_form_tuple(tupleDesc, new_values, isnull);
1353 
1354 	/*
1355 	 * Free allocated temp values
1356 	 */
1357 	for (i = 0; i < num_to_free; i++)
1358 		pfree(freeable_values[i]);
1359 
1360 	return new_tuple;
1361 }
1362 
1363 
1364 /* ----------
1365  * toast_compress_datum -
1366  *
1367  *	Create a compressed version of a varlena datum
1368  *
1369  *	If we fail (ie, compressed result is actually bigger than original)
1370  *	then return NULL.  We must not use compressed data if it'd expand
1371  *	the tuple!
1372  *
1373  *	We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1374  *	copying them.  But we can't handle external or compressed datums.
1375  * ----------
1376  */
1377 Datum
toast_compress_datum(Datum value)1378 toast_compress_datum(Datum value)
1379 {
1380 	struct varlena *tmp;
1381 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1382 	int32		len;
1383 
1384 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1385 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1386 
1387 	/*
1388 	 * No point in wasting a palloc cycle if value size is out of the allowed
1389 	 * range for compression
1390 	 */
1391 	if (valsize < PGLZ_strategy_default->min_input_size ||
1392 		valsize > PGLZ_strategy_default->max_input_size)
1393 		return PointerGetDatum(NULL);
1394 
1395 	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1396 									TOAST_COMPRESS_HDRSZ);
1397 
1398 	/*
1399 	 * We recheck the actual size even if pglz_compress() reports success,
1400 	 * because it might be satisfied with having saved as little as one byte
1401 	 * in the compressed data --- which could turn into a net loss once you
1402 	 * consider header and alignment padding.  Worst case, the compressed
1403 	 * format might require three padding bytes (plus header, which is
1404 	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
1405 	 * only one header byte and no padding if the value is short enough.  So
1406 	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
1407 	 */
1408 	len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1409 						valsize,
1410 						TOAST_COMPRESS_RAWDATA(tmp),
1411 						PGLZ_strategy_default);
1412 	if (len >= 0 &&
1413 		len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1414 	{
1415 		TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1416 		SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1417 		/* successful compression */
1418 		return PointerGetDatum(tmp);
1419 	}
1420 	else
1421 	{
1422 		/* incompressible data */
1423 		pfree(tmp);
1424 		return PointerGetDatum(NULL);
1425 	}
1426 }
1427 
1428 
1429 /* ----------
1430  * toast_get_valid_index
1431  *
1432  *	Get OID of valid index associated to given toast relation. A toast
1433  *	relation can have only one valid index at the same time.
1434  */
1435 Oid
toast_get_valid_index(Oid toastoid,LOCKMODE lock)1436 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1437 {
1438 	int			num_indexes;
1439 	int			validIndex;
1440 	Oid			validIndexOid;
1441 	Relation   *toastidxs;
1442 	Relation	toastrel;
1443 
1444 	/* Open the toast relation */
1445 	toastrel = table_open(toastoid, lock);
1446 
1447 	/* Look for the valid index of the toast relation */
1448 	validIndex = toast_open_indexes(toastrel,
1449 									lock,
1450 									&toastidxs,
1451 									&num_indexes);
1452 	validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1453 
1454 	/* Close the toast relation and all its indexes */
1455 	toast_close_indexes(toastidxs, num_indexes, NoLock);
1456 	table_close(toastrel, NoLock);
1457 
1458 	return validIndexOid;
1459 }
1460 
1461 
1462 /* ----------
1463  * toast_save_datum -
1464  *
1465  *	Save one single datum into the secondary relation and return
1466  *	a Datum reference for it.
1467  *
1468  * rel: the main relation we're working with (not the toast rel!)
1469  * value: datum to be pushed to toast storage
1470  * oldexternal: if not NULL, toast pointer previously representing the datum
1471  * options: options to be passed to heap_insert() for toast rows
1472  * ----------
1473  */
1474 static Datum
toast_save_datum(Relation rel,Datum value,struct varlena * oldexternal,int options)1475 toast_save_datum(Relation rel, Datum value,
1476 				 struct varlena *oldexternal, int options)
1477 {
1478 	Relation	toastrel;
1479 	Relation   *toastidxs;
1480 	HeapTuple	toasttup;
1481 	TupleDesc	toasttupDesc;
1482 	Datum		t_values[3];
1483 	bool		t_isnull[3];
1484 	CommandId	mycid = GetCurrentCommandId(true);
1485 	struct varlena *result;
1486 	struct varatt_external toast_pointer;
1487 	union
1488 	{
1489 		struct varlena hdr;
1490 		/* this is to make the union big enough for a chunk: */
1491 		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1492 		/* ensure union is aligned well enough: */
1493 		int32		align_it;
1494 	}			chunk_data;
1495 	int32		chunk_size;
1496 	int32		chunk_seq = 0;
1497 	char	   *data_p;
1498 	int32		data_todo;
1499 	Pointer		dval = DatumGetPointer(value);
1500 	int			num_indexes;
1501 	int			validIndex;
1502 
1503 	Assert(!VARATT_IS_EXTERNAL(value));
1504 
1505 	/*
1506 	 * Open the toast relation and its indexes.  We can use the index to check
1507 	 * uniqueness of the OID we assign to the toasted item, even though it has
1508 	 * additional columns besides OID.
1509 	 */
1510 	toastrel = table_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1511 	toasttupDesc = toastrel->rd_att;
1512 
1513 	/* Open all the toast indexes and look for the valid one */
1514 	validIndex = toast_open_indexes(toastrel,
1515 									RowExclusiveLock,
1516 									&toastidxs,
1517 									&num_indexes);
1518 
1519 	/*
1520 	 * Get the data pointer and length, and compute va_rawsize and va_extsize.
1521 	 *
1522 	 * va_rawsize is the size of the equivalent fully uncompressed datum, so
1523 	 * we have to adjust for short headers.
1524 	 *
1525 	 * va_extsize is the actual size of the data payload in the toast records.
1526 	 */
1527 	if (VARATT_IS_SHORT(dval))
1528 	{
1529 		data_p = VARDATA_SHORT(dval);
1530 		data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1531 		toast_pointer.va_rawsize = data_todo + VARHDRSZ;	/* as if not short */
1532 		toast_pointer.va_extsize = data_todo;
1533 	}
1534 	else if (VARATT_IS_COMPRESSED(dval))
1535 	{
1536 		data_p = VARDATA(dval);
1537 		data_todo = VARSIZE(dval) - VARHDRSZ;
1538 		/* rawsize in a compressed datum is just the size of the payload */
1539 		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1540 		toast_pointer.va_extsize = data_todo;
1541 		/* Assert that the numbers look like it's compressed */
1542 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1543 	}
1544 	else
1545 	{
1546 		data_p = VARDATA(dval);
1547 		data_todo = VARSIZE(dval) - VARHDRSZ;
1548 		toast_pointer.va_rawsize = VARSIZE(dval);
1549 		toast_pointer.va_extsize = data_todo;
1550 	}
1551 
1552 	/*
1553 	 * Insert the correct table OID into the result TOAST pointer.
1554 	 *
1555 	 * Normally this is the actual OID of the target toast table, but during
1556 	 * table-rewriting operations such as CLUSTER, we have to insert the OID
1557 	 * of the table's real permanent toast table instead.  rd_toastoid is set
1558 	 * if we have to substitute such an OID.
1559 	 */
1560 	if (OidIsValid(rel->rd_toastoid))
1561 		toast_pointer.va_toastrelid = rel->rd_toastoid;
1562 	else
1563 		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1564 
1565 	/*
1566 	 * Choose an OID to use as the value ID for this toast value.
1567 	 *
1568 	 * Normally we just choose an unused OID within the toast table.  But
1569 	 * during table-rewriting operations where we are preserving an existing
1570 	 * toast table OID, we want to preserve toast value OIDs too.  So, if
1571 	 * rd_toastoid is set and we had a prior external value from that same
1572 	 * toast table, re-use its value ID.  If we didn't have a prior external
1573 	 * value (which is a corner case, but possible if the table's attstorage
1574 	 * options have been changed), we have to pick a value ID that doesn't
1575 	 * conflict with either new or existing toast value OIDs.
1576 	 */
1577 	if (!OidIsValid(rel->rd_toastoid))
1578 	{
1579 		/* normal case: just choose an unused OID */
1580 		toast_pointer.va_valueid =
1581 			GetNewOidWithIndex(toastrel,
1582 							   RelationGetRelid(toastidxs[validIndex]),
1583 							   (AttrNumber) 1);
1584 	}
1585 	else
1586 	{
1587 		/* rewrite case: check to see if value was in old toast table */
1588 		toast_pointer.va_valueid = InvalidOid;
1589 		if (oldexternal != NULL)
1590 		{
1591 			struct varatt_external old_toast_pointer;
1592 
1593 			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1594 			/* Must copy to access aligned fields */
1595 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1596 			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1597 			{
1598 				/* This value came from the old toast table; reuse its OID */
1599 				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1600 
1601 				/*
1602 				 * There is a corner case here: the table rewrite might have
1603 				 * to copy both live and recently-dead versions of a row, and
1604 				 * those versions could easily reference the same toast value.
1605 				 * When we copy the second or later version of such a row,
1606 				 * reusing the OID will mean we select an OID that's already
1607 				 * in the new toast table.  Check for that, and if so, just
1608 				 * fall through without writing the data again.
1609 				 *
1610 				 * While annoying and ugly-looking, this is a good thing
1611 				 * because it ensures that we wind up with only one copy of
1612 				 * the toast value when there is only one copy in the old
1613 				 * toast table.  Before we detected this case, we'd have made
1614 				 * multiple copies, wasting space; and what's worse, the
1615 				 * copies belonging to already-deleted heap tuples would not
1616 				 * be reclaimed by VACUUM.
1617 				 */
1618 				if (toastrel_valueid_exists(toastrel,
1619 											toast_pointer.va_valueid))
1620 				{
1621 					/* Match, so short-circuit the data storage loop below */
1622 					data_todo = 0;
1623 				}
1624 			}
1625 		}
1626 		if (toast_pointer.va_valueid == InvalidOid)
1627 		{
1628 			/*
1629 			 * new value; must choose an OID that doesn't conflict in either
1630 			 * old or new toast table
1631 			 */
1632 			do
1633 			{
1634 				toast_pointer.va_valueid =
1635 					GetNewOidWithIndex(toastrel,
1636 									   RelationGetRelid(toastidxs[validIndex]),
1637 									   (AttrNumber) 1);
1638 			} while (toastid_valueid_exists(rel->rd_toastoid,
1639 											toast_pointer.va_valueid));
1640 		}
1641 	}
1642 
1643 	/*
1644 	 * Initialize constant parts of the tuple data
1645 	 */
1646 	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1647 	t_values[2] = PointerGetDatum(&chunk_data);
1648 	t_isnull[0] = false;
1649 	t_isnull[1] = false;
1650 	t_isnull[2] = false;
1651 
1652 	/*
1653 	 * Split up the item into chunks
1654 	 */
1655 	while (data_todo > 0)
1656 	{
1657 		int			i;
1658 
1659 		CHECK_FOR_INTERRUPTS();
1660 
1661 		/*
1662 		 * Calculate the size of this chunk
1663 		 */
1664 		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1665 
1666 		/*
1667 		 * Build a tuple and store it
1668 		 */
1669 		t_values[1] = Int32GetDatum(chunk_seq++);
1670 		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1671 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1672 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1673 
1674 		heap_insert(toastrel, toasttup, mycid, options, NULL);
1675 
1676 		/*
1677 		 * Create the index entry.  We cheat a little here by not using
1678 		 * FormIndexDatum: this relies on the knowledge that the index columns
1679 		 * are the same as the initial columns of the table for all the
1680 		 * indexes.  We also cheat by not providing an IndexInfo: this is okay
1681 		 * for now because btree doesn't need one, but we might have to be
1682 		 * more honest someday.
1683 		 *
1684 		 * Note also that there had better not be any user-created index on
1685 		 * the TOAST table, since we don't bother to update anything else.
1686 		 */
1687 		for (i = 0; i < num_indexes; i++)
1688 		{
1689 			/* Only index relations marked as ready can be updated */
1690 			if (toastidxs[i]->rd_index->indisready)
1691 				index_insert(toastidxs[i], t_values, t_isnull,
1692 							 &(toasttup->t_self),
1693 							 toastrel,
1694 							 toastidxs[i]->rd_index->indisunique ?
1695 							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
1696 							 NULL);
1697 		}
1698 
1699 		/*
1700 		 * Free memory
1701 		 */
1702 		heap_freetuple(toasttup);
1703 
1704 		/*
1705 		 * Move on to next chunk
1706 		 */
1707 		data_todo -= chunk_size;
1708 		data_p += chunk_size;
1709 	}
1710 
1711 	/*
1712 	 * Done - close toast relation and its indexes
1713 	 */
1714 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1715 	table_close(toastrel, RowExclusiveLock);
1716 
1717 	/*
1718 	 * Create the TOAST pointer value that we'll return
1719 	 */
1720 	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1721 	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1722 	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1723 
1724 	return PointerGetDatum(result);
1725 }
1726 
1727 
1728 /* ----------
1729  * toast_delete_datum -
1730  *
1731  *	Delete a single external stored value.
1732  * ----------
1733  */
1734 static void
toast_delete_datum(Relation rel,Datum value,bool is_speculative)1735 toast_delete_datum(Relation rel, Datum value, bool is_speculative)
1736 {
1737 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1738 	struct varatt_external toast_pointer;
1739 	Relation	toastrel;
1740 	Relation   *toastidxs;
1741 	ScanKeyData toastkey;
1742 	SysScanDesc toastscan;
1743 	HeapTuple	toasttup;
1744 	int			num_indexes;
1745 	int			validIndex;
1746 	SnapshotData SnapshotToast;
1747 
1748 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1749 		return;
1750 
1751 	/* Must copy to access aligned fields */
1752 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1753 
1754 	/*
1755 	 * Open the toast relation and its indexes
1756 	 */
1757 	toastrel = table_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1758 
1759 	/* Fetch valid relation used for process */
1760 	validIndex = toast_open_indexes(toastrel,
1761 									RowExclusiveLock,
1762 									&toastidxs,
1763 									&num_indexes);
1764 
1765 	/*
1766 	 * Setup a scan key to find chunks with matching va_valueid
1767 	 */
1768 	ScanKeyInit(&toastkey,
1769 				(AttrNumber) 1,
1770 				BTEqualStrategyNumber, F_OIDEQ,
1771 				ObjectIdGetDatum(toast_pointer.va_valueid));
1772 
1773 	/*
1774 	 * Find all the chunks.  (We don't actually care whether we see them in
1775 	 * sequence or not, but since we've already locked the index we might as
1776 	 * well use systable_beginscan_ordered.)
1777 	 */
1778 	init_toast_snapshot(&SnapshotToast);
1779 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1780 										   &SnapshotToast, 1, &toastkey);
1781 	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1782 	{
1783 		/*
1784 		 * Have a chunk, delete it
1785 		 */
1786 		if (is_speculative)
1787 			heap_abort_speculative(toastrel, &toasttup->t_self);
1788 		else
1789 			simple_heap_delete(toastrel, &toasttup->t_self);
1790 	}
1791 
1792 	/*
1793 	 * End scan and close relations
1794 	 */
1795 	systable_endscan_ordered(toastscan);
1796 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1797 	table_close(toastrel, RowExclusiveLock);
1798 }
1799 
1800 
1801 /* ----------
1802  * toastrel_valueid_exists -
1803  *
1804  *	Test whether a toast value with the given ID exists in the toast relation.
1805  *	For safety, we consider a value to exist if there are either live or dead
1806  *	toast rows with that ID; see notes for GetNewOidWithIndex().
1807  * ----------
1808  */
1809 static bool
toastrel_valueid_exists(Relation toastrel,Oid valueid)1810 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1811 {
1812 	bool		result = false;
1813 	ScanKeyData toastkey;
1814 	SysScanDesc toastscan;
1815 	int			num_indexes;
1816 	int			validIndex;
1817 	Relation   *toastidxs;
1818 
1819 	/* Fetch a valid index relation */
1820 	validIndex = toast_open_indexes(toastrel,
1821 									RowExclusiveLock,
1822 									&toastidxs,
1823 									&num_indexes);
1824 
1825 	/*
1826 	 * Setup a scan key to find chunks with matching va_valueid
1827 	 */
1828 	ScanKeyInit(&toastkey,
1829 				(AttrNumber) 1,
1830 				BTEqualStrategyNumber, F_OIDEQ,
1831 				ObjectIdGetDatum(valueid));
1832 
1833 	/*
1834 	 * Is there any such chunk?
1835 	 */
1836 	toastscan = systable_beginscan(toastrel,
1837 								   RelationGetRelid(toastidxs[validIndex]),
1838 								   true, SnapshotAny, 1, &toastkey);
1839 
1840 	if (systable_getnext(toastscan) != NULL)
1841 		result = true;
1842 
1843 	systable_endscan(toastscan);
1844 
1845 	/* Clean up */
1846 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1847 
1848 	return result;
1849 }
1850 
1851 /* ----------
1852  * toastid_valueid_exists -
1853  *
1854  *	As above, but work from toast rel's OID not an open relation
1855  * ----------
1856  */
1857 static bool
toastid_valueid_exists(Oid toastrelid,Oid valueid)1858 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1859 {
1860 	bool		result;
1861 	Relation	toastrel;
1862 
1863 	toastrel = table_open(toastrelid, AccessShareLock);
1864 
1865 	result = toastrel_valueid_exists(toastrel, valueid);
1866 
1867 	table_close(toastrel, AccessShareLock);
1868 
1869 	return result;
1870 }
1871 
1872 
1873 /* ----------
1874  * toast_fetch_datum -
1875  *
1876  *	Reconstruct an in memory Datum from the chunks saved
1877  *	in the toast relation
1878  * ----------
1879  */
1880 static struct varlena *
toast_fetch_datum(struct varlena * attr)1881 toast_fetch_datum(struct varlena *attr)
1882 {
1883 	Relation	toastrel;
1884 	Relation   *toastidxs;
1885 	ScanKeyData toastkey;
1886 	SysScanDesc toastscan;
1887 	HeapTuple	ttup;
1888 	TupleDesc	toasttupDesc;
1889 	struct varlena *result;
1890 	struct varatt_external toast_pointer;
1891 	int32		ressize;
1892 	int32		residx,
1893 				nextidx;
1894 	int32		numchunks;
1895 	Pointer		chunk;
1896 	bool		isnull;
1897 	char	   *chunkdata;
1898 	int32		chunksize;
1899 	int			num_indexes;
1900 	int			validIndex;
1901 	SnapshotData SnapshotToast;
1902 
1903 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1904 		elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1905 
1906 	/* Must copy to access aligned fields */
1907 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1908 
1909 	ressize = toast_pointer.va_extsize;
1910 	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1911 
1912 	result = (struct varlena *) palloc(ressize + VARHDRSZ);
1913 
1914 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1915 		SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1916 	else
1917 		SET_VARSIZE(result, ressize + VARHDRSZ);
1918 
1919 	/*
1920 	 * Open the toast relation and its indexes
1921 	 */
1922 	toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
1923 	toasttupDesc = toastrel->rd_att;
1924 
1925 	/* Look for the valid index of the toast relation */
1926 	validIndex = toast_open_indexes(toastrel,
1927 									AccessShareLock,
1928 									&toastidxs,
1929 									&num_indexes);
1930 
1931 	/*
1932 	 * Setup a scan key to fetch from the index by va_valueid
1933 	 */
1934 	ScanKeyInit(&toastkey,
1935 				(AttrNumber) 1,
1936 				BTEqualStrategyNumber, F_OIDEQ,
1937 				ObjectIdGetDatum(toast_pointer.va_valueid));
1938 
1939 	/*
1940 	 * Read the chunks by index
1941 	 *
1942 	 * Note that because the index is actually on (valueid, chunkidx) we will
1943 	 * see the chunks in chunkidx order, even though we didn't explicitly ask
1944 	 * for it.
1945 	 */
1946 	nextidx = 0;
1947 
1948 	init_toast_snapshot(&SnapshotToast);
1949 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1950 										   &SnapshotToast, 1, &toastkey);
1951 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1952 	{
1953 		/*
1954 		 * Have a chunk, extract the sequence number and the data
1955 		 */
1956 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1957 		Assert(!isnull);
1958 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1959 		Assert(!isnull);
1960 		if (!VARATT_IS_EXTENDED(chunk))
1961 		{
1962 			chunksize = VARSIZE(chunk) - VARHDRSZ;
1963 			chunkdata = VARDATA(chunk);
1964 		}
1965 		else if (VARATT_IS_SHORT(chunk))
1966 		{
1967 			/* could happen due to heap_form_tuple doing its thing */
1968 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1969 			chunkdata = VARDATA_SHORT(chunk);
1970 		}
1971 		else
1972 		{
1973 			/* should never happen */
1974 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1975 				 toast_pointer.va_valueid,
1976 				 RelationGetRelationName(toastrel));
1977 			chunksize = 0;		/* keep compiler quiet */
1978 			chunkdata = NULL;
1979 		}
1980 
1981 		/*
1982 		 * Some checks on the data we've found
1983 		 */
1984 		if (residx != nextidx)
1985 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1986 				 residx, nextidx,
1987 				 toast_pointer.va_valueid,
1988 				 RelationGetRelationName(toastrel));
1989 		if (residx < numchunks - 1)
1990 		{
1991 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
1992 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1993 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1994 					 residx, numchunks,
1995 					 toast_pointer.va_valueid,
1996 					 RelationGetRelationName(toastrel));
1997 		}
1998 		else if (residx == numchunks - 1)
1999 		{
2000 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
2001 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
2002 					 chunksize,
2003 					 (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
2004 					 residx,
2005 					 toast_pointer.va_valueid,
2006 					 RelationGetRelationName(toastrel));
2007 		}
2008 		else
2009 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2010 				 residx,
2011 				 0, numchunks - 1,
2012 				 toast_pointer.va_valueid,
2013 				 RelationGetRelationName(toastrel));
2014 
2015 		/*
2016 		 * Copy the data into proper place in our result
2017 		 */
2018 		memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
2019 			   chunkdata,
2020 			   chunksize);
2021 
2022 		nextidx++;
2023 	}
2024 
2025 	/*
2026 	 * Final checks that we successfully fetched the datum
2027 	 */
2028 	if (nextidx != numchunks)
2029 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2030 			 nextidx,
2031 			 toast_pointer.va_valueid,
2032 			 RelationGetRelationName(toastrel));
2033 
2034 	/*
2035 	 * End scan and close relations
2036 	 */
2037 	systable_endscan_ordered(toastscan);
2038 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2039 	table_close(toastrel, AccessShareLock);
2040 
2041 	return result;
2042 }
2043 
2044 /* ----------
2045  * toast_fetch_datum_slice -
2046  *
2047  *	Reconstruct a segment of a Datum from the chunks saved
2048  *	in the toast relation
2049  *
2050  *	Note that this function only supports non-compressed external datums.
2051  * ----------
2052  */
2053 static struct varlena *
toast_fetch_datum_slice(struct varlena * attr,int32 sliceoffset,int32 length)2054 toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
2055 {
2056 	Relation	toastrel;
2057 	Relation   *toastidxs;
2058 	ScanKeyData toastkey[3];
2059 	int			nscankeys;
2060 	SysScanDesc toastscan;
2061 	HeapTuple	ttup;
2062 	TupleDesc	toasttupDesc;
2063 	struct varlena *result;
2064 	struct varatt_external toast_pointer;
2065 	int32		attrsize;
2066 	int32		residx;
2067 	int32		nextidx;
2068 	int			numchunks;
2069 	int			startchunk;
2070 	int			endchunk;
2071 	int32		startoffset;
2072 	int32		endoffset;
2073 	int			totalchunks;
2074 	Pointer		chunk;
2075 	bool		isnull;
2076 	char	   *chunkdata;
2077 	int32		chunksize;
2078 	int32		chcpystrt;
2079 	int32		chcpyend;
2080 	int			num_indexes;
2081 	int			validIndex;
2082 	SnapshotData SnapshotToast;
2083 
2084 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
2085 		elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
2086 
2087 	/* Must copy to access aligned fields */
2088 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
2089 
2090 	/*
2091 	 * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2092 	 * we can't return a compressed datum which is meaningful to toast later
2093 	 */
2094 	Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2095 
2096 	attrsize = toast_pointer.va_extsize;
2097 	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2098 
2099 	if (sliceoffset >= attrsize)
2100 	{
2101 		sliceoffset = 0;
2102 		length = 0;
2103 	}
2104 
2105 	/*
2106 	 * Adjust length request if needed.  (Note: our sole caller,
2107 	 * heap_tuple_untoast_attr_slice, protects us against sliceoffset + length
2108 	 * overflowing.)
2109 	 */
2110 	else if (((sliceoffset + length) > attrsize) || length < 0)
2111 		length = attrsize - sliceoffset;
2112 
2113 	result = (struct varlena *) palloc(length + VARHDRSZ);
2114 
2115 	SET_VARSIZE(result, length + VARHDRSZ);
2116 
2117 	if (length == 0)
2118 		return result;			/* Can save a lot of work at this point! */
2119 
2120 	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2121 	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2122 	numchunks = (endchunk - startchunk) + 1;
2123 
2124 	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2125 	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2126 
2127 	/*
2128 	 * Open the toast relation and its indexes
2129 	 */
2130 	toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
2131 	toasttupDesc = toastrel->rd_att;
2132 
2133 	/* Look for the valid index of toast relation */
2134 	validIndex = toast_open_indexes(toastrel,
2135 									AccessShareLock,
2136 									&toastidxs,
2137 									&num_indexes);
2138 
2139 	/*
2140 	 * Setup a scan key to fetch from the index. This is either two keys or
2141 	 * three depending on the number of chunks.
2142 	 */
2143 	ScanKeyInit(&toastkey[0],
2144 				(AttrNumber) 1,
2145 				BTEqualStrategyNumber, F_OIDEQ,
2146 				ObjectIdGetDatum(toast_pointer.va_valueid));
2147 
2148 	/*
2149 	 * Use equality condition for one chunk, a range condition otherwise:
2150 	 */
2151 	if (numchunks == 1)
2152 	{
2153 		ScanKeyInit(&toastkey[1],
2154 					(AttrNumber) 2,
2155 					BTEqualStrategyNumber, F_INT4EQ,
2156 					Int32GetDatum(startchunk));
2157 		nscankeys = 2;
2158 	}
2159 	else
2160 	{
2161 		ScanKeyInit(&toastkey[1],
2162 					(AttrNumber) 2,
2163 					BTGreaterEqualStrategyNumber, F_INT4GE,
2164 					Int32GetDatum(startchunk));
2165 		ScanKeyInit(&toastkey[2],
2166 					(AttrNumber) 2,
2167 					BTLessEqualStrategyNumber, F_INT4LE,
2168 					Int32GetDatum(endchunk));
2169 		nscankeys = 3;
2170 	}
2171 
2172 	/*
2173 	 * Read the chunks by index
2174 	 *
2175 	 * The index is on (valueid, chunkidx) so they will come in order
2176 	 */
2177 	init_toast_snapshot(&SnapshotToast);
2178 	nextidx = startchunk;
2179 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2180 										   &SnapshotToast, nscankeys, toastkey);
2181 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2182 	{
2183 		/*
2184 		 * Have a chunk, extract the sequence number and the data
2185 		 */
2186 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2187 		Assert(!isnull);
2188 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2189 		Assert(!isnull);
2190 		if (!VARATT_IS_EXTENDED(chunk))
2191 		{
2192 			chunksize = VARSIZE(chunk) - VARHDRSZ;
2193 			chunkdata = VARDATA(chunk);
2194 		}
2195 		else if (VARATT_IS_SHORT(chunk))
2196 		{
2197 			/* could happen due to heap_form_tuple doing its thing */
2198 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2199 			chunkdata = VARDATA_SHORT(chunk);
2200 		}
2201 		else
2202 		{
2203 			/* should never happen */
2204 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2205 				 toast_pointer.va_valueid,
2206 				 RelationGetRelationName(toastrel));
2207 			chunksize = 0;		/* keep compiler quiet */
2208 			chunkdata = NULL;
2209 		}
2210 
2211 		/*
2212 		 * Some checks on the data we've found
2213 		 */
2214 		if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2215 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2216 				 residx, nextidx,
2217 				 toast_pointer.va_valueid,
2218 				 RelationGetRelationName(toastrel));
2219 		if (residx < totalchunks - 1)
2220 		{
2221 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
2222 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2223 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2224 					 residx, totalchunks,
2225 					 toast_pointer.va_valueid,
2226 					 RelationGetRelationName(toastrel));
2227 		}
2228 		else if (residx == totalchunks - 1)
2229 		{
2230 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2231 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2232 					 chunksize,
2233 					 (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2234 					 residx,
2235 					 toast_pointer.va_valueid,
2236 					 RelationGetRelationName(toastrel));
2237 		}
2238 		else
2239 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2240 				 residx,
2241 				 0, totalchunks - 1,
2242 				 toast_pointer.va_valueid,
2243 				 RelationGetRelationName(toastrel));
2244 
2245 		/*
2246 		 * Copy the data into proper place in our result
2247 		 */
2248 		chcpystrt = 0;
2249 		chcpyend = chunksize - 1;
2250 		if (residx == startchunk)
2251 			chcpystrt = startoffset;
2252 		if (residx == endchunk)
2253 			chcpyend = endoffset;
2254 
2255 		memcpy(VARDATA(result) +
2256 			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2257 			   chunkdata + chcpystrt,
2258 			   (chcpyend - chcpystrt) + 1);
2259 
2260 		nextidx++;
2261 	}
2262 
2263 	/*
2264 	 * Final checks that we successfully fetched the datum
2265 	 */
2266 	if (nextidx != (endchunk + 1))
2267 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2268 			 nextidx,
2269 			 toast_pointer.va_valueid,
2270 			 RelationGetRelationName(toastrel));
2271 
2272 	/*
2273 	 * End scan and close relations
2274 	 */
2275 	systable_endscan_ordered(toastscan);
2276 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2277 	table_close(toastrel, AccessShareLock);
2278 
2279 	return result;
2280 }
2281 
2282 /* ----------
2283  * toast_decompress_datum -
2284  *
2285  * Decompress a compressed version of a varlena datum
2286  */
2287 static struct varlena *
toast_decompress_datum(struct varlena * attr)2288 toast_decompress_datum(struct varlena *attr)
2289 {
2290 	struct varlena *result;
2291 
2292 	Assert(VARATT_IS_COMPRESSED(attr));
2293 
2294 	result = (struct varlena *)
2295 		palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2296 	SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2297 
2298 	if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2299 						VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2300 						VARDATA(result),
2301 						TOAST_COMPRESS_RAWSIZE(attr), true) < 0)
2302 		elog(ERROR, "compressed data is corrupted");
2303 
2304 	return result;
2305 }
2306 
2307 
2308 /* ----------
2309  * toast_decompress_datum_slice -
2310  *
2311  * Decompress the front of a compressed version of a varlena datum.
2312  * offset handling happens in heap_tuple_untoast_attr_slice.
2313  * Here we just decompress a slice from the front.
2314  */
2315 static struct varlena *
toast_decompress_datum_slice(struct varlena * attr,int32 slicelength)2316 toast_decompress_datum_slice(struct varlena *attr, int32 slicelength)
2317 {
2318 	struct varlena *result;
2319 	int32		rawsize;
2320 
2321 	Assert(VARATT_IS_COMPRESSED(attr));
2322 
2323 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
2324 
2325 	rawsize = pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2326 							  VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2327 							  VARDATA(result),
2328 							  slicelength, false);
2329 	if (rawsize < 0)
2330 		elog(ERROR, "compressed data is corrupted");
2331 
2332 	SET_VARSIZE(result, rawsize + VARHDRSZ);
2333 	return result;
2334 }
2335 
2336 
2337 /* ----------
2338  * toast_open_indexes
2339  *
2340  *	Get an array of the indexes associated to the given toast relation
2341  *	and return as well the position of the valid index used by the toast
2342  *	relation in this array. It is the responsibility of the caller of this
2343  *	function to close the indexes as well as free them.
2344  */
2345 static int
toast_open_indexes(Relation toastrel,LOCKMODE lock,Relation ** toastidxs,int * num_indexes)2346 toast_open_indexes(Relation toastrel,
2347 				   LOCKMODE lock,
2348 				   Relation **toastidxs,
2349 				   int *num_indexes)
2350 {
2351 	int			i = 0;
2352 	int			res = 0;
2353 	bool		found = false;
2354 	List	   *indexlist;
2355 	ListCell   *lc;
2356 
2357 	/* Get index list of the toast relation */
2358 	indexlist = RelationGetIndexList(toastrel);
2359 	Assert(indexlist != NIL);
2360 
2361 	*num_indexes = list_length(indexlist);
2362 
2363 	/* Open all the index relations */
2364 	*toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2365 	foreach(lc, indexlist)
2366 		(*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2367 
2368 	/* Fetch the first valid index in list */
2369 	for (i = 0; i < *num_indexes; i++)
2370 	{
2371 		Relation	toastidx = (*toastidxs)[i];
2372 
2373 		if (toastidx->rd_index->indisvalid)
2374 		{
2375 			res = i;
2376 			found = true;
2377 			break;
2378 		}
2379 	}
2380 
2381 	/*
2382 	 * Free index list, not necessary anymore as relations are opened and a
2383 	 * valid index has been found.
2384 	 */
2385 	list_free(indexlist);
2386 
2387 	/*
2388 	 * The toast relation should have one valid index, so something is going
2389 	 * wrong if there is nothing.
2390 	 */
2391 	if (!found)
2392 		elog(ERROR, "no valid index found for toast relation with Oid %u",
2393 			 RelationGetRelid(toastrel));
2394 
2395 	return res;
2396 }
2397 
2398 /* ----------
2399  * toast_close_indexes
2400  *
2401  *	Close an array of indexes for a toast relation and free it. This should
2402  *	be called for a set of indexes opened previously with toast_open_indexes.
2403  */
2404 static void
toast_close_indexes(Relation * toastidxs,int num_indexes,LOCKMODE lock)2405 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2406 {
2407 	int			i;
2408 
2409 	/* Close relations and clean up things */
2410 	for (i = 0; i < num_indexes; i++)
2411 		index_close(toastidxs[i], lock);
2412 	pfree(toastidxs);
2413 }
2414 
2415 /* ----------
2416  * init_toast_snapshot
2417  *
2418  *	Initialize an appropriate TOAST snapshot.  We must use an MVCC snapshot
2419  *	to initialize the TOAST snapshot; since we don't know which one to use,
2420  *	just use the oldest one.  This is safe: at worst, we will get a "snapshot
2421  *	too old" error that might have been avoided otherwise.
2422  */
2423 static void
init_toast_snapshot(Snapshot toast_snapshot)2424 init_toast_snapshot(Snapshot toast_snapshot)
2425 {
2426 	Snapshot	snapshot = GetOldestSnapshot();
2427 
2428 	/*
2429 	 * GetOldestSnapshot returns NULL if the session has no active snapshots.
2430 	 * We can get that if, for example, a procedure fetches a toasted value
2431 	 * into a local variable, commits, and then tries to detoast the value.
2432 	 * Such coding is unsafe, because once we commit there is nothing to
2433 	 * prevent the toast data from being deleted.  Detoasting *must* happen in
2434 	 * the same transaction that originally fetched the toast pointer.  Hence,
2435 	 * rather than trying to band-aid over the problem, throw an error.  (This
2436 	 * is not very much protection, because in many scenarios the procedure
2437 	 * would have already created a new transaction snapshot, preventing us
2438 	 * from detecting the problem.  But it's better than nothing, and for sure
2439 	 * we shouldn't expend code on masking the problem more.)
2440 	 */
2441 	if (snapshot == NULL)
2442 		elog(ERROR, "cannot fetch toast data without an active snapshot");
2443 
2444 	InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken);
2445 }
2446