1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *	  Support routines for external and compressed storage of
5  *	  variable size attributes.
6  *
7  * Copyright (c) 2000-2018, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *		toast_insert_or_update -
16  *			Try to make a given tuple fit into one page by compressing
17  *			or moving off attributes
18  *
19  *		toast_delete -
20  *			Reclaim toast storage when a tuple is deleted
21  *
22  *		heap_tuple_untoast_attr -
23  *			Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27 
28 #include "postgres.h"
29 
30 #include <unistd.h>
31 #include <fcntl.h>
32 
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/int.h"
39 #include "common/pg_lzcompress.h"
40 #include "miscadmin.h"
41 #include "utils/expandeddatum.h"
42 #include "utils/fmgroids.h"
43 #include "utils/rel.h"
44 #include "utils/snapmgr.h"
45 #include "utils/typcache.h"
46 #include "utils/tqual.h"
47 
48 
49 #undef TOAST_DEBUG
50 
51 /*
52  *	The information at the start of the compressed toast data.
53  */
54 typedef struct toast_compress_header
55 {
56 	int32		vl_len_;		/* varlena header (do not touch directly!) */
57 	int32		rawsize;
58 } toast_compress_header;
59 
60 /*
61  * Utilities for manipulation of header information for compressed
62  * toast entries.
63  */
64 #define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
65 #define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize)
66 #define TOAST_COMPRESS_RAWDATA(ptr) \
67 	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
68 #define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
69 	(((toast_compress_header *) (ptr))->rawsize = (len))
70 
71 static void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
72 static Datum toast_save_datum(Relation rel, Datum value,
73 				 struct varlena *oldexternal, int options);
74 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
75 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
76 static struct varlena *toast_fetch_datum(struct varlena *attr);
77 static struct varlena *toast_fetch_datum_slice(struct varlena *attr,
78 						int32 sliceoffset, int32 length);
79 static struct varlena *toast_decompress_datum(struct varlena *attr);
80 static int toast_open_indexes(Relation toastrel,
81 				   LOCKMODE lock,
82 				   Relation **toastidxs,
83 				   int *num_indexes);
84 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
85 					LOCKMODE lock);
86 static void init_toast_snapshot(Snapshot toast_snapshot);
87 
88 
89 /* ----------
90  * heap_tuple_fetch_attr -
91  *
92  *	Public entry point to get back a toasted value from
93  *	external source (possibly still in compressed format).
94  *
95  * This will return a datum that contains all the data internally, ie, not
96  * relying on external storage or memory, but it can still be compressed or
97  * have a short header.  Note some callers assume that if the input is an
98  * EXTERNAL datum, the result will be a pfree'able chunk.
99  * ----------
100  */
101 struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)102 heap_tuple_fetch_attr(struct varlena *attr)
103 {
104 	struct varlena *result;
105 
106 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
107 	{
108 		/*
109 		 * This is an external stored plain value
110 		 */
111 		result = toast_fetch_datum(attr);
112 	}
113 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
114 	{
115 		/*
116 		 * This is an indirect pointer --- dereference it
117 		 */
118 		struct varatt_indirect redirect;
119 
120 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
121 		attr = (struct varlena *) redirect.pointer;
122 
123 		/* nested indirect Datums aren't allowed */
124 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
125 
126 		/* recurse if value is still external in some other way */
127 		if (VARATT_IS_EXTERNAL(attr))
128 			return heap_tuple_fetch_attr(attr);
129 
130 		/*
131 		 * Copy into the caller's memory context, in case caller tries to
132 		 * pfree the result.
133 		 */
134 		result = (struct varlena *) palloc(VARSIZE_ANY(attr));
135 		memcpy(result, attr, VARSIZE_ANY(attr));
136 	}
137 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
138 	{
139 		/*
140 		 * This is an expanded-object pointer --- get flat format
141 		 */
142 		ExpandedObjectHeader *eoh;
143 		Size		resultsize;
144 
145 		eoh = DatumGetEOHP(PointerGetDatum(attr));
146 		resultsize = EOH_get_flat_size(eoh);
147 		result = (struct varlena *) palloc(resultsize);
148 		EOH_flatten_into(eoh, (void *) result, resultsize);
149 	}
150 	else
151 	{
152 		/*
153 		 * This is a plain value inside of the main tuple - why am I called?
154 		 */
155 		result = attr;
156 	}
157 
158 	return result;
159 }
160 
161 
162 /* ----------
163  * heap_tuple_untoast_attr -
164  *
165  *	Public entry point to get back a toasted value from compression
166  *	or external storage.  The result is always non-extended varlena form.
167  *
168  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
169  * datum, the result will be a pfree'able chunk.
170  * ----------
171  */
172 struct varlena *
heap_tuple_untoast_attr(struct varlena * attr)173 heap_tuple_untoast_attr(struct varlena *attr)
174 {
175 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
176 	{
177 		/*
178 		 * This is an externally stored datum --- fetch it back from there
179 		 */
180 		attr = toast_fetch_datum(attr);
181 		/* If it's compressed, decompress it */
182 		if (VARATT_IS_COMPRESSED(attr))
183 		{
184 			struct varlena *tmp = attr;
185 
186 			attr = toast_decompress_datum(tmp);
187 			pfree(tmp);
188 		}
189 	}
190 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
191 	{
192 		/*
193 		 * This is an indirect pointer --- dereference it
194 		 */
195 		struct varatt_indirect redirect;
196 
197 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
198 		attr = (struct varlena *) redirect.pointer;
199 
200 		/* nested indirect Datums aren't allowed */
201 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
202 
203 		/* recurse in case value is still extended in some other way */
204 		attr = heap_tuple_untoast_attr(attr);
205 
206 		/* if it isn't, we'd better copy it */
207 		if (attr == (struct varlena *) redirect.pointer)
208 		{
209 			struct varlena *result;
210 
211 			result = (struct varlena *) palloc(VARSIZE_ANY(attr));
212 			memcpy(result, attr, VARSIZE_ANY(attr));
213 			attr = result;
214 		}
215 	}
216 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
217 	{
218 		/*
219 		 * This is an expanded-object pointer --- get flat format
220 		 */
221 		attr = heap_tuple_fetch_attr(attr);
222 		/* flatteners are not allowed to produce compressed/short output */
223 		Assert(!VARATT_IS_EXTENDED(attr));
224 	}
225 	else if (VARATT_IS_COMPRESSED(attr))
226 	{
227 		/*
228 		 * This is a compressed value inside of the main tuple
229 		 */
230 		attr = toast_decompress_datum(attr);
231 	}
232 	else if (VARATT_IS_SHORT(attr))
233 	{
234 		/*
235 		 * This is a short-header varlena --- convert to 4-byte header format
236 		 */
237 		Size		data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
238 		Size		new_size = data_size + VARHDRSZ;
239 		struct varlena *new_attr;
240 
241 		new_attr = (struct varlena *) palloc(new_size);
242 		SET_VARSIZE(new_attr, new_size);
243 		memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
244 		attr = new_attr;
245 	}
246 
247 	return attr;
248 }
249 
250 
251 /* ----------
252  * heap_tuple_untoast_attr_slice -
253  *
254  *		Public entry point to get back part of a toasted value
255  *		from compression or external storage.
256  *
257  * sliceoffset is where to start (zero or more)
258  * If slicelength < 0, return everything beyond sliceoffset
259  * ----------
260  */
261 struct varlena *
heap_tuple_untoast_attr_slice(struct varlena * attr,int32 sliceoffset,int32 slicelength)262 heap_tuple_untoast_attr_slice(struct varlena *attr,
263 							  int32 sliceoffset, int32 slicelength)
264 {
265 	struct varlena *preslice;
266 	struct varlena *result;
267 	char	   *attrdata;
268 	int32		slicelimit;
269 	int32		attrsize;
270 
271 	if (sliceoffset < 0)
272 		elog(ERROR, "invalid sliceoffset: %d", sliceoffset);
273 
274 	/*
275 	 * Compute slicelimit = offset + length, or -1 if we must fetch all of the
276 	 * value.  In case of integer overflow, we must fetch all.
277 	 */
278 	if (slicelength < 0)
279 		slicelimit = -1;
280 	else if (pg_add_s32_overflow(sliceoffset, slicelength, &slicelimit))
281 		slicelength = slicelimit = -1;
282 
283 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
284 	{
285 		struct varatt_external toast_pointer;
286 
287 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
288 
289 		/* fast path for non-compressed external datums */
290 		if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
291 			return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
292 
293 		/* fetch it back (compressed marker will get set automatically) */
294 		preslice = toast_fetch_datum(attr);
295 	}
296 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
297 	{
298 		struct varatt_indirect redirect;
299 
300 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
301 
302 		/* nested indirect Datums aren't allowed */
303 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
304 
305 		return heap_tuple_untoast_attr_slice(redirect.pointer,
306 											 sliceoffset, slicelength);
307 	}
308 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
309 	{
310 		/* pass it off to heap_tuple_fetch_attr to flatten */
311 		preslice = heap_tuple_fetch_attr(attr);
312 	}
313 	else
314 		preslice = attr;
315 
316 	Assert(!VARATT_IS_EXTERNAL(preslice));
317 
318 	if (VARATT_IS_COMPRESSED(preslice))
319 	{
320 		struct varlena *tmp = preslice;
321 
322 		preslice = toast_decompress_datum(tmp);
323 
324 		if (tmp != attr)
325 			pfree(tmp);
326 	}
327 
328 	if (VARATT_IS_SHORT(preslice))
329 	{
330 		attrdata = VARDATA_SHORT(preslice);
331 		attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
332 	}
333 	else
334 	{
335 		attrdata = VARDATA(preslice);
336 		attrsize = VARSIZE(preslice) - VARHDRSZ;
337 	}
338 
339 	/* slicing of datum for compressed cases and plain value */
340 
341 	if (sliceoffset >= attrsize)
342 	{
343 		sliceoffset = 0;
344 		slicelength = 0;
345 	}
346 	else if (slicelength < 0 || slicelimit > attrsize)
347 		slicelength = attrsize - sliceoffset;
348 
349 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
350 	SET_VARSIZE(result, slicelength + VARHDRSZ);
351 
352 	memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
353 
354 	if (preslice != attr)
355 		pfree(preslice);
356 
357 	return result;
358 }
359 
360 
361 /* ----------
362  * toast_raw_datum_size -
363  *
364  *	Return the raw (detoasted) size of a varlena datum
365  *	(including the VARHDRSZ header)
366  * ----------
367  */
368 Size
toast_raw_datum_size(Datum value)369 toast_raw_datum_size(Datum value)
370 {
371 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
372 	Size		result;
373 
374 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
375 	{
376 		/* va_rawsize is the size of the original datum -- including header */
377 		struct varatt_external toast_pointer;
378 
379 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
380 		result = toast_pointer.va_rawsize;
381 	}
382 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
383 	{
384 		struct varatt_indirect toast_pointer;
385 
386 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
387 
388 		/* nested indirect Datums aren't allowed */
389 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
390 
391 		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
392 	}
393 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
394 	{
395 		result = EOH_get_flat_size(DatumGetEOHP(value));
396 	}
397 	else if (VARATT_IS_COMPRESSED(attr))
398 	{
399 		/* here, va_rawsize is just the payload size */
400 		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
401 	}
402 	else if (VARATT_IS_SHORT(attr))
403 	{
404 		/*
405 		 * we have to normalize the header length to VARHDRSZ or else the
406 		 * callers of this function will be confused.
407 		 */
408 		result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
409 	}
410 	else
411 	{
412 		/* plain untoasted datum */
413 		result = VARSIZE(attr);
414 	}
415 	return result;
416 }
417 
418 /* ----------
419  * toast_datum_size
420  *
421  *	Return the physical storage size (possibly compressed) of a varlena datum
422  * ----------
423  */
424 Size
toast_datum_size(Datum value)425 toast_datum_size(Datum value)
426 {
427 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
428 	Size		result;
429 
430 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
431 	{
432 		/*
433 		 * Attribute is stored externally - return the extsize whether
434 		 * compressed or not.  We do not count the size of the toast pointer
435 		 * ... should we?
436 		 */
437 		struct varatt_external toast_pointer;
438 
439 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
440 		result = toast_pointer.va_extsize;
441 	}
442 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
443 	{
444 		struct varatt_indirect toast_pointer;
445 
446 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
447 
448 		/* nested indirect Datums aren't allowed */
449 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
450 
451 		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
452 	}
453 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
454 	{
455 		result = EOH_get_flat_size(DatumGetEOHP(value));
456 	}
457 	else if (VARATT_IS_SHORT(attr))
458 	{
459 		result = VARSIZE_SHORT(attr);
460 	}
461 	else
462 	{
463 		/*
464 		 * Attribute is stored inline either compressed or not, just calculate
465 		 * the size of the datum in either case.
466 		 */
467 		result = VARSIZE(attr);
468 	}
469 	return result;
470 }
471 
472 
473 /* ----------
474  * toast_delete -
475  *
476  *	Cascaded delete toast-entries on DELETE
477  * ----------
478  */
479 void
toast_delete(Relation rel,HeapTuple oldtup,bool is_speculative)480 toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
481 {
482 	TupleDesc	tupleDesc;
483 	int			numAttrs;
484 	int			i;
485 	Datum		toast_values[MaxHeapAttributeNumber];
486 	bool		toast_isnull[MaxHeapAttributeNumber];
487 
488 	/*
489 	 * We should only ever be called for tuples of plain relations or
490 	 * materialized views --- recursing on a toast rel is bad news.
491 	 */
492 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
493 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
494 
495 	/*
496 	 * Get the tuple descriptor and break down the tuple into fields.
497 	 *
498 	 * NOTE: it's debatable whether to use heap_deform_tuple() here or just
499 	 * heap_getattr() only the varlena columns.  The latter could win if there
500 	 * are few varlena columns and many non-varlena ones. However,
501 	 * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
502 	 * O(N^2) if there are many varlena columns, so it seems better to err on
503 	 * the side of linear cost.  (We won't even be here unless there's at
504 	 * least one varlena column, by the way.)
505 	 */
506 	tupleDesc = rel->rd_att;
507 	numAttrs = tupleDesc->natts;
508 
509 	Assert(numAttrs <= MaxHeapAttributeNumber);
510 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
511 
512 	/*
513 	 * Check for external stored attributes and delete them from the secondary
514 	 * relation.
515 	 */
516 	for (i = 0; i < numAttrs; i++)
517 	{
518 		if (TupleDescAttr(tupleDesc, i)->attlen == -1)
519 		{
520 			Datum		value = toast_values[i];
521 
522 			if (toast_isnull[i])
523 				continue;
524 			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
525 				toast_delete_datum(rel, value, is_speculative);
526 		}
527 	}
528 }
529 
530 
531 /* ----------
532  * toast_insert_or_update -
533  *
534  *	Delete no-longer-used toast-entries and create new ones to
535  *	make the new tuple fit on INSERT or UPDATE
536  *
537  * Inputs:
538  *	newtup: the candidate new tuple to be inserted
539  *	oldtup: the old row version for UPDATE, or NULL for INSERT
540  *	options: options to be passed to heap_insert() for toast rows
541  * Result:
542  *	either newtup if no toasting is needed, or a palloc'd modified tuple
543  *	that is what should actually get stored
544  *
545  * NOTE: neither newtup nor oldtup will be modified.  This is a change
546  * from the pre-8.1 API of this routine.
547  * ----------
548  */
549 HeapTuple
toast_insert_or_update(Relation rel,HeapTuple newtup,HeapTuple oldtup,int options)550 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
551 					   int options)
552 {
553 	HeapTuple	result_tuple;
554 	TupleDesc	tupleDesc;
555 	int			numAttrs;
556 	int			i;
557 
558 	bool		need_change = false;
559 	bool		need_free = false;
560 	bool		need_delold = false;
561 	bool		has_nulls = false;
562 
563 	Size		maxDataLen;
564 	Size		hoff;
565 
566 	char		toast_action[MaxHeapAttributeNumber];
567 	bool		toast_isnull[MaxHeapAttributeNumber];
568 	bool		toast_oldisnull[MaxHeapAttributeNumber];
569 	Datum		toast_values[MaxHeapAttributeNumber];
570 	Datum		toast_oldvalues[MaxHeapAttributeNumber];
571 	struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
572 	int32		toast_sizes[MaxHeapAttributeNumber];
573 	bool		toast_free[MaxHeapAttributeNumber];
574 	bool		toast_delold[MaxHeapAttributeNumber];
575 
576 	/*
577 	 * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
578 	 * deletions just normally insert/delete the toast values. It seems
579 	 * easiest to deal with that here, instead on, potentially, multiple
580 	 * callers.
581 	 */
582 	options &= ~HEAP_INSERT_SPECULATIVE;
583 
584 	/*
585 	 * We should only ever be called for tuples of plain relations or
586 	 * materialized views --- recursing on a toast rel is bad news.
587 	 */
588 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
589 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
590 
591 	/*
592 	 * Get the tuple descriptor and break down the tuple(s) into fields.
593 	 */
594 	tupleDesc = rel->rd_att;
595 	numAttrs = tupleDesc->natts;
596 
597 	Assert(numAttrs <= MaxHeapAttributeNumber);
598 	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
599 	if (oldtup != NULL)
600 		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
601 
602 	/* ----------
603 	 * Then collect information about the values given
604 	 *
605 	 * NOTE: toast_action[i] can have these values:
606 	 *		' '		default handling
607 	 *		'p'		already processed --- don't touch it
608 	 *		'x'		incompressible, but OK to move off
609 	 *
610 	 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
611 	 *		toast_action[i] different from 'p'.
612 	 * ----------
613 	 */
614 	memset(toast_action, ' ', numAttrs * sizeof(char));
615 	memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
616 	memset(toast_free, 0, numAttrs * sizeof(bool));
617 	memset(toast_delold, 0, numAttrs * sizeof(bool));
618 
619 	for (i = 0; i < numAttrs; i++)
620 	{
621 		Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
622 		struct varlena *old_value;
623 		struct varlena *new_value;
624 
625 		if (oldtup != NULL)
626 		{
627 			/*
628 			 * For UPDATE get the old and new values of this attribute
629 			 */
630 			old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
631 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
632 
633 			/*
634 			 * If the old value is stored on disk, check if it has changed so
635 			 * we have to delete it later.
636 			 */
637 			if (att->attlen == -1 && !toast_oldisnull[i] &&
638 				VARATT_IS_EXTERNAL_ONDISK(old_value))
639 			{
640 				if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
641 					memcmp((char *) old_value, (char *) new_value,
642 						   VARSIZE_EXTERNAL(old_value)) != 0)
643 				{
644 					/*
645 					 * The old external stored value isn't needed any more
646 					 * after the update
647 					 */
648 					toast_delold[i] = true;
649 					need_delold = true;
650 				}
651 				else
652 				{
653 					/*
654 					 * This attribute isn't changed by this update so we reuse
655 					 * the original reference to the old value in the new
656 					 * tuple.
657 					 */
658 					toast_action[i] = 'p';
659 					continue;
660 				}
661 			}
662 		}
663 		else
664 		{
665 			/*
666 			 * For INSERT simply get the new value
667 			 */
668 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
669 		}
670 
671 		/*
672 		 * Handle NULL attributes
673 		 */
674 		if (toast_isnull[i])
675 		{
676 			toast_action[i] = 'p';
677 			has_nulls = true;
678 			continue;
679 		}
680 
681 		/*
682 		 * Now look at varlena attributes
683 		 */
684 		if (att->attlen == -1)
685 		{
686 			/*
687 			 * If the table's attribute says PLAIN always, force it so.
688 			 */
689 			if (att->attstorage == 'p')
690 				toast_action[i] = 'p';
691 
692 			/*
693 			 * We took care of UPDATE above, so any external value we find
694 			 * still in the tuple must be someone else's that we cannot reuse
695 			 * (this includes the case of an out-of-line in-memory datum).
696 			 * Fetch it back (without decompression, unless we are forcing
697 			 * PLAIN storage).  If necessary, we'll push it out as a new
698 			 * external value below.
699 			 */
700 			if (VARATT_IS_EXTERNAL(new_value))
701 			{
702 				toast_oldexternal[i] = new_value;
703 				if (att->attstorage == 'p')
704 					new_value = heap_tuple_untoast_attr(new_value);
705 				else
706 					new_value = heap_tuple_fetch_attr(new_value);
707 				toast_values[i] = PointerGetDatum(new_value);
708 				toast_free[i] = true;
709 				need_change = true;
710 				need_free = true;
711 			}
712 
713 			/*
714 			 * Remember the size of this attribute
715 			 */
716 			toast_sizes[i] = VARSIZE_ANY(new_value);
717 		}
718 		else
719 		{
720 			/*
721 			 * Not a varlena attribute, plain storage always
722 			 */
723 			toast_action[i] = 'p';
724 		}
725 	}
726 
727 	/* ----------
728 	 * Compress and/or save external until data fits into target length
729 	 *
730 	 *	1: Inline compress attributes with attstorage 'x', and store very
731 	 *	   large attributes with attstorage 'x' or 'e' external immediately
732 	 *	2: Store attributes with attstorage 'x' or 'e' external
733 	 *	3: Inline compress attributes with attstorage 'm'
734 	 *	4: Store attributes with attstorage 'm' external
735 	 * ----------
736 	 */
737 
738 	/* compute header overhead --- this should match heap_form_tuple() */
739 	hoff = SizeofHeapTupleHeader;
740 	if (has_nulls)
741 		hoff += BITMAPLEN(numAttrs);
742 	if (newtup->t_data->t_infomask & HEAP_HASOID)
743 		hoff += sizeof(Oid);
744 	hoff = MAXALIGN(hoff);
745 	/* now convert to a limit on the tuple data size */
746 	maxDataLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET) - hoff;
747 
748 	/*
749 	 * Look for attributes with attstorage 'x' to compress.  Also find large
750 	 * attributes with attstorage 'x' or 'e', and store them external.
751 	 */
752 	while (heap_compute_data_size(tupleDesc,
753 								  toast_values, toast_isnull) > maxDataLen)
754 	{
755 		int			biggest_attno = -1;
756 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
757 		Datum		old_value;
758 		Datum		new_value;
759 
760 		/*
761 		 * Search for the biggest yet unprocessed internal attribute
762 		 */
763 		for (i = 0; i < numAttrs; i++)
764 		{
765 			Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
766 
767 			if (toast_action[i] != ' ')
768 				continue;
769 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
770 				continue;		/* can't happen, toast_action would be 'p' */
771 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
772 				continue;
773 			if (att->attstorage != 'x' && att->attstorage != 'e')
774 				continue;
775 			if (toast_sizes[i] > biggest_size)
776 			{
777 				biggest_attno = i;
778 				biggest_size = toast_sizes[i];
779 			}
780 		}
781 
782 		if (biggest_attno < 0)
783 			break;
784 
785 		/*
786 		 * Attempt to compress it inline, if it has attstorage 'x'
787 		 */
788 		i = biggest_attno;
789 		if (TupleDescAttr(tupleDesc, i)->attstorage == 'x')
790 		{
791 			old_value = toast_values[i];
792 			new_value = toast_compress_datum(old_value);
793 
794 			if (DatumGetPointer(new_value) != NULL)
795 			{
796 				/* successful compression */
797 				if (toast_free[i])
798 					pfree(DatumGetPointer(old_value));
799 				toast_values[i] = new_value;
800 				toast_free[i] = true;
801 				toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
802 				need_change = true;
803 				need_free = true;
804 			}
805 			else
806 			{
807 				/* incompressible, ignore on subsequent compression passes */
808 				toast_action[i] = 'x';
809 			}
810 		}
811 		else
812 		{
813 			/* has attstorage 'e', ignore on subsequent compression passes */
814 			toast_action[i] = 'x';
815 		}
816 
817 		/*
818 		 * If this value is by itself more than maxDataLen (after compression
819 		 * if any), push it out to the toast table immediately, if possible.
820 		 * This avoids uselessly compressing other fields in the common case
821 		 * where we have one long field and several short ones.
822 		 *
823 		 * XXX maybe the threshold should be less than maxDataLen?
824 		 */
825 		if (toast_sizes[i] > maxDataLen &&
826 			rel->rd_rel->reltoastrelid != InvalidOid)
827 		{
828 			old_value = toast_values[i];
829 			toast_action[i] = 'p';
830 			toast_values[i] = toast_save_datum(rel, toast_values[i],
831 											   toast_oldexternal[i], options);
832 			if (toast_free[i])
833 				pfree(DatumGetPointer(old_value));
834 			toast_free[i] = true;
835 			need_change = true;
836 			need_free = true;
837 		}
838 	}
839 
840 	/*
841 	 * Second we look for attributes of attstorage 'x' or 'e' that are still
842 	 * inline.  But skip this if there's no toast table to push them to.
843 	 */
844 	while (heap_compute_data_size(tupleDesc,
845 								  toast_values, toast_isnull) > maxDataLen &&
846 		   rel->rd_rel->reltoastrelid != InvalidOid)
847 	{
848 		int			biggest_attno = -1;
849 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
850 		Datum		old_value;
851 
852 		/*------
853 		 * Search for the biggest yet inlined attribute with
854 		 * attstorage equals 'x' or 'e'
855 		 *------
856 		 */
857 		for (i = 0; i < numAttrs; i++)
858 		{
859 			Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
860 
861 			if (toast_action[i] == 'p')
862 				continue;
863 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
864 				continue;		/* can't happen, toast_action would be 'p' */
865 			if (att->attstorage != 'x' && att->attstorage != 'e')
866 				continue;
867 			if (toast_sizes[i] > biggest_size)
868 			{
869 				biggest_attno = i;
870 				biggest_size = toast_sizes[i];
871 			}
872 		}
873 
874 		if (biggest_attno < 0)
875 			break;
876 
877 		/*
878 		 * Store this external
879 		 */
880 		i = biggest_attno;
881 		old_value = toast_values[i];
882 		toast_action[i] = 'p';
883 		toast_values[i] = toast_save_datum(rel, toast_values[i],
884 										   toast_oldexternal[i], options);
885 		if (toast_free[i])
886 			pfree(DatumGetPointer(old_value));
887 		toast_free[i] = true;
888 
889 		need_change = true;
890 		need_free = true;
891 	}
892 
893 	/*
894 	 * Round 3 - this time we take attributes with storage 'm' into
895 	 * compression
896 	 */
897 	while (heap_compute_data_size(tupleDesc,
898 								  toast_values, toast_isnull) > maxDataLen)
899 	{
900 		int			biggest_attno = -1;
901 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
902 		Datum		old_value;
903 		Datum		new_value;
904 
905 		/*
906 		 * Search for the biggest yet uncompressed internal attribute
907 		 */
908 		for (i = 0; i < numAttrs; i++)
909 		{
910 			if (toast_action[i] != ' ')
911 				continue;
912 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
913 				continue;		/* can't happen, toast_action would be 'p' */
914 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
915 				continue;
916 			if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
917 				continue;
918 			if (toast_sizes[i] > biggest_size)
919 			{
920 				biggest_attno = i;
921 				biggest_size = toast_sizes[i];
922 			}
923 		}
924 
925 		if (biggest_attno < 0)
926 			break;
927 
928 		/*
929 		 * Attempt to compress it inline
930 		 */
931 		i = biggest_attno;
932 		old_value = toast_values[i];
933 		new_value = toast_compress_datum(old_value);
934 
935 		if (DatumGetPointer(new_value) != NULL)
936 		{
937 			/* successful compression */
938 			if (toast_free[i])
939 				pfree(DatumGetPointer(old_value));
940 			toast_values[i] = new_value;
941 			toast_free[i] = true;
942 			toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
943 			need_change = true;
944 			need_free = true;
945 		}
946 		else
947 		{
948 			/* incompressible, ignore on subsequent compression passes */
949 			toast_action[i] = 'x';
950 		}
951 	}
952 
953 	/*
954 	 * Finally we store attributes of type 'm' externally.  At this point we
955 	 * increase the target tuple size, so that 'm' attributes aren't stored
956 	 * externally unless really necessary.
957 	 */
958 	maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
959 
960 	while (heap_compute_data_size(tupleDesc,
961 								  toast_values, toast_isnull) > maxDataLen &&
962 		   rel->rd_rel->reltoastrelid != InvalidOid)
963 	{
964 		int			biggest_attno = -1;
965 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
966 		Datum		old_value;
967 
968 		/*--------
969 		 * Search for the biggest yet inlined attribute with
970 		 * attstorage = 'm'
971 		 *--------
972 		 */
973 		for (i = 0; i < numAttrs; i++)
974 		{
975 			if (toast_action[i] == 'p')
976 				continue;
977 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
978 				continue;		/* can't happen, toast_action would be 'p' */
979 			if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
980 				continue;
981 			if (toast_sizes[i] > biggest_size)
982 			{
983 				biggest_attno = i;
984 				biggest_size = toast_sizes[i];
985 			}
986 		}
987 
988 		if (biggest_attno < 0)
989 			break;
990 
991 		/*
992 		 * Store this external
993 		 */
994 		i = biggest_attno;
995 		old_value = toast_values[i];
996 		toast_action[i] = 'p';
997 		toast_values[i] = toast_save_datum(rel, toast_values[i],
998 										   toast_oldexternal[i], options);
999 		if (toast_free[i])
1000 			pfree(DatumGetPointer(old_value));
1001 		toast_free[i] = true;
1002 
1003 		need_change = true;
1004 		need_free = true;
1005 	}
1006 
1007 	/*
1008 	 * In the case we toasted any values, we need to build a new heap tuple
1009 	 * with the changed values.
1010 	 */
1011 	if (need_change)
1012 	{
1013 		HeapTupleHeader olddata = newtup->t_data;
1014 		HeapTupleHeader new_data;
1015 		int32		new_header_len;
1016 		int32		new_data_len;
1017 		int32		new_tuple_len;
1018 
1019 		/*
1020 		 * Calculate the new size of the tuple.
1021 		 *
1022 		 * Note: we used to assume here that the old tuple's t_hoff must equal
1023 		 * the new_header_len value, but that was incorrect.  The old tuple
1024 		 * might have a smaller-than-current natts, if there's been an ALTER
1025 		 * TABLE ADD COLUMN since it was stored; and that would lead to a
1026 		 * different conclusion about the size of the null bitmap, or even
1027 		 * whether there needs to be one at all.
1028 		 */
1029 		new_header_len = SizeofHeapTupleHeader;
1030 		if (has_nulls)
1031 			new_header_len += BITMAPLEN(numAttrs);
1032 		if (olddata->t_infomask & HEAP_HASOID)
1033 			new_header_len += sizeof(Oid);
1034 		new_header_len = MAXALIGN(new_header_len);
1035 		new_data_len = heap_compute_data_size(tupleDesc,
1036 											  toast_values, toast_isnull);
1037 		new_tuple_len = new_header_len + new_data_len;
1038 
1039 		/*
1040 		 * Allocate and zero the space needed, and fill HeapTupleData fields.
1041 		 */
1042 		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
1043 		result_tuple->t_len = new_tuple_len;
1044 		result_tuple->t_self = newtup->t_self;
1045 		result_tuple->t_tableOid = newtup->t_tableOid;
1046 		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
1047 		result_tuple->t_data = new_data;
1048 
1049 		/*
1050 		 * Copy the existing tuple header, but adjust natts and t_hoff.
1051 		 */
1052 		memcpy(new_data, olddata, SizeofHeapTupleHeader);
1053 		HeapTupleHeaderSetNatts(new_data, numAttrs);
1054 		new_data->t_hoff = new_header_len;
1055 		if (olddata->t_infomask & HEAP_HASOID)
1056 			HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1057 
1058 		/* Copy over the data, and fill the null bitmap if needed */
1059 		heap_fill_tuple(tupleDesc,
1060 						toast_values,
1061 						toast_isnull,
1062 						(char *) new_data + new_header_len,
1063 						new_data_len,
1064 						&(new_data->t_infomask),
1065 						has_nulls ? new_data->t_bits : NULL);
1066 	}
1067 	else
1068 		result_tuple = newtup;
1069 
1070 	/*
1071 	 * Free allocated temp values
1072 	 */
1073 	if (need_free)
1074 		for (i = 0; i < numAttrs; i++)
1075 			if (toast_free[i])
1076 				pfree(DatumGetPointer(toast_values[i]));
1077 
1078 	/*
1079 	 * Delete external values from the old tuple
1080 	 */
1081 	if (need_delold)
1082 		for (i = 0; i < numAttrs; i++)
1083 			if (toast_delold[i])
1084 				toast_delete_datum(rel, toast_oldvalues[i], false);
1085 
1086 	return result_tuple;
1087 }
1088 
1089 
1090 /* ----------
1091  * toast_flatten_tuple -
1092  *
1093  *	"Flatten" a tuple to contain no out-of-line toasted fields.
1094  *	(This does not eliminate compressed or short-header datums.)
1095  *
1096  *	Note: we expect the caller already checked HeapTupleHasExternal(tup),
1097  *	so there is no need for a short-circuit path.
1098  * ----------
1099  */
1100 HeapTuple
toast_flatten_tuple(HeapTuple tup,TupleDesc tupleDesc)1101 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1102 {
1103 	HeapTuple	new_tuple;
1104 	int			numAttrs = tupleDesc->natts;
1105 	int			i;
1106 	Datum		toast_values[MaxTupleAttributeNumber];
1107 	bool		toast_isnull[MaxTupleAttributeNumber];
1108 	bool		toast_free[MaxTupleAttributeNumber];
1109 
1110 	/*
1111 	 * Break down the tuple into fields.
1112 	 */
1113 	Assert(numAttrs <= MaxTupleAttributeNumber);
1114 	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1115 
1116 	memset(toast_free, 0, numAttrs * sizeof(bool));
1117 
1118 	for (i = 0; i < numAttrs; i++)
1119 	{
1120 		/*
1121 		 * Look at non-null varlena attributes
1122 		 */
1123 		if (!toast_isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
1124 		{
1125 			struct varlena *new_value;
1126 
1127 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1128 			if (VARATT_IS_EXTERNAL(new_value))
1129 			{
1130 				new_value = heap_tuple_fetch_attr(new_value);
1131 				toast_values[i] = PointerGetDatum(new_value);
1132 				toast_free[i] = true;
1133 			}
1134 		}
1135 	}
1136 
1137 	/*
1138 	 * Form the reconfigured tuple.
1139 	 */
1140 	new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1141 
1142 	/*
1143 	 * Be sure to copy the tuple's OID and identity fields.  We also make a
1144 	 * point of copying visibility info, just in case anybody looks at those
1145 	 * fields in a syscache entry.
1146 	 */
1147 	if (tupleDesc->tdhasoid)
1148 		HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1149 
1150 	new_tuple->t_self = tup->t_self;
1151 	new_tuple->t_tableOid = tup->t_tableOid;
1152 
1153 	new_tuple->t_data->t_choice = tup->t_data->t_choice;
1154 	new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1155 	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1156 	new_tuple->t_data->t_infomask |=
1157 		tup->t_data->t_infomask & HEAP_XACT_MASK;
1158 	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1159 	new_tuple->t_data->t_infomask2 |=
1160 		tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1161 
1162 	/*
1163 	 * Free allocated temp values
1164 	 */
1165 	for (i = 0; i < numAttrs; i++)
1166 		if (toast_free[i])
1167 			pfree(DatumGetPointer(toast_values[i]));
1168 
1169 	return new_tuple;
1170 }
1171 
1172 
1173 /* ----------
1174  * toast_flatten_tuple_to_datum -
1175  *
1176  *	"Flatten" a tuple containing out-of-line toasted fields into a Datum.
1177  *	The result is always palloc'd in the current memory context.
1178  *
1179  *	We have a general rule that Datums of container types (rows, arrays,
1180  *	ranges, etc) must not contain any external TOAST pointers.  Without
1181  *	this rule, we'd have to look inside each Datum when preparing a tuple
1182  *	for storage, which would be expensive and would fail to extend cleanly
1183  *	to new sorts of container types.
1184  *
1185  *	However, we don't want to say that tuples represented as HeapTuples
1186  *	can't contain toasted fields, so instead this routine should be called
1187  *	when such a HeapTuple is being converted into a Datum.
1188  *
1189  *	While we're at it, we decompress any compressed fields too.  This is not
1190  *	necessary for correctness, but reflects an expectation that compression
1191  *	will be more effective if applied to the whole tuple not individual
1192  *	fields.  We are not so concerned about that that we want to deconstruct
1193  *	and reconstruct tuples just to get rid of compressed fields, however.
1194  *	So callers typically won't call this unless they see that the tuple has
1195  *	at least one external field.
1196  *
1197  *	On the other hand, in-line short-header varlena fields are left alone.
1198  *	If we "untoasted" them here, they'd just get changed back to short-header
1199  *	format anyway within heap_fill_tuple.
1200  * ----------
1201  */
1202 Datum
toast_flatten_tuple_to_datum(HeapTupleHeader tup,uint32 tup_len,TupleDesc tupleDesc)1203 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1204 							 uint32 tup_len,
1205 							 TupleDesc tupleDesc)
1206 {
1207 	HeapTupleHeader new_data;
1208 	int32		new_header_len;
1209 	int32		new_data_len;
1210 	int32		new_tuple_len;
1211 	HeapTupleData tmptup;
1212 	int			numAttrs = tupleDesc->natts;
1213 	int			i;
1214 	bool		has_nulls = false;
1215 	Datum		toast_values[MaxTupleAttributeNumber];
1216 	bool		toast_isnull[MaxTupleAttributeNumber];
1217 	bool		toast_free[MaxTupleAttributeNumber];
1218 
1219 	/* Build a temporary HeapTuple control structure */
1220 	tmptup.t_len = tup_len;
1221 	ItemPointerSetInvalid(&(tmptup.t_self));
1222 	tmptup.t_tableOid = InvalidOid;
1223 	tmptup.t_data = tup;
1224 
1225 	/*
1226 	 * Break down the tuple into fields.
1227 	 */
1228 	Assert(numAttrs <= MaxTupleAttributeNumber);
1229 	heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1230 
1231 	memset(toast_free, 0, numAttrs * sizeof(bool));
1232 
1233 	for (i = 0; i < numAttrs; i++)
1234 	{
1235 		/*
1236 		 * Look at non-null varlena attributes
1237 		 */
1238 		if (toast_isnull[i])
1239 			has_nulls = true;
1240 		else if (TupleDescAttr(tupleDesc, i)->attlen == -1)
1241 		{
1242 			struct varlena *new_value;
1243 
1244 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1245 			if (VARATT_IS_EXTERNAL(new_value) ||
1246 				VARATT_IS_COMPRESSED(new_value))
1247 			{
1248 				new_value = heap_tuple_untoast_attr(new_value);
1249 				toast_values[i] = PointerGetDatum(new_value);
1250 				toast_free[i] = true;
1251 			}
1252 		}
1253 	}
1254 
1255 	/*
1256 	 * Calculate the new size of the tuple.
1257 	 *
1258 	 * This should match the reconstruction code in toast_insert_or_update.
1259 	 */
1260 	new_header_len = SizeofHeapTupleHeader;
1261 	if (has_nulls)
1262 		new_header_len += BITMAPLEN(numAttrs);
1263 	if (tup->t_infomask & HEAP_HASOID)
1264 		new_header_len += sizeof(Oid);
1265 	new_header_len = MAXALIGN(new_header_len);
1266 	new_data_len = heap_compute_data_size(tupleDesc,
1267 										  toast_values, toast_isnull);
1268 	new_tuple_len = new_header_len + new_data_len;
1269 
1270 	new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1271 
1272 	/*
1273 	 * Copy the existing tuple header, but adjust natts and t_hoff.
1274 	 */
1275 	memcpy(new_data, tup, SizeofHeapTupleHeader);
1276 	HeapTupleHeaderSetNatts(new_data, numAttrs);
1277 	new_data->t_hoff = new_header_len;
1278 	if (tup->t_infomask & HEAP_HASOID)
1279 		HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1280 
1281 	/* Set the composite-Datum header fields correctly */
1282 	HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1283 	HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1284 	HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1285 
1286 	/* Copy over the data, and fill the null bitmap if needed */
1287 	heap_fill_tuple(tupleDesc,
1288 					toast_values,
1289 					toast_isnull,
1290 					(char *) new_data + new_header_len,
1291 					new_data_len,
1292 					&(new_data->t_infomask),
1293 					has_nulls ? new_data->t_bits : NULL);
1294 
1295 	/*
1296 	 * Free allocated temp values
1297 	 */
1298 	for (i = 0; i < numAttrs; i++)
1299 		if (toast_free[i])
1300 			pfree(DatumGetPointer(toast_values[i]));
1301 
1302 	return PointerGetDatum(new_data);
1303 }
1304 
1305 
1306 /* ----------
1307  * toast_build_flattened_tuple -
1308  *
1309  *	Build a tuple containing no out-of-line toasted fields.
1310  *	(This does not eliminate compressed or short-header datums.)
1311  *
1312  *	This is essentially just like heap_form_tuple, except that it will
1313  *	expand any external-data pointers beforehand.
1314  *
1315  *	It's not very clear whether it would be preferable to decompress
1316  *	in-line compressed datums while at it.  For now, we don't.
1317  * ----------
1318  */
1319 HeapTuple
toast_build_flattened_tuple(TupleDesc tupleDesc,Datum * values,bool * isnull)1320 toast_build_flattened_tuple(TupleDesc tupleDesc,
1321 							Datum *values,
1322 							bool *isnull)
1323 {
1324 	HeapTuple	new_tuple;
1325 	int			numAttrs = tupleDesc->natts;
1326 	int			num_to_free;
1327 	int			i;
1328 	Datum		new_values[MaxTupleAttributeNumber];
1329 	Pointer		freeable_values[MaxTupleAttributeNumber];
1330 
1331 	/*
1332 	 * We can pass the caller's isnull array directly to heap_form_tuple, but
1333 	 * we potentially need to modify the values array.
1334 	 */
1335 	Assert(numAttrs <= MaxTupleAttributeNumber);
1336 	memcpy(new_values, values, numAttrs * sizeof(Datum));
1337 
1338 	num_to_free = 0;
1339 	for (i = 0; i < numAttrs; i++)
1340 	{
1341 		/*
1342 		 * Look at non-null varlena attributes
1343 		 */
1344 		if (!isnull[i] && TupleDescAttr(tupleDesc, i)->attlen == -1)
1345 		{
1346 			struct varlena *new_value;
1347 
1348 			new_value = (struct varlena *) DatumGetPointer(new_values[i]);
1349 			if (VARATT_IS_EXTERNAL(new_value))
1350 			{
1351 				new_value = heap_tuple_fetch_attr(new_value);
1352 				new_values[i] = PointerGetDatum(new_value);
1353 				freeable_values[num_to_free++] = (Pointer) new_value;
1354 			}
1355 		}
1356 	}
1357 
1358 	/*
1359 	 * Form the reconfigured tuple.
1360 	 */
1361 	new_tuple = heap_form_tuple(tupleDesc, new_values, isnull);
1362 
1363 	/*
1364 	 * Free allocated temp values
1365 	 */
1366 	for (i = 0; i < num_to_free; i++)
1367 		pfree(freeable_values[i]);
1368 
1369 	return new_tuple;
1370 }
1371 
1372 
1373 /* ----------
1374  * toast_compress_datum -
1375  *
1376  *	Create a compressed version of a varlena datum
1377  *
1378  *	If we fail (ie, compressed result is actually bigger than original)
1379  *	then return NULL.  We must not use compressed data if it'd expand
1380  *	the tuple!
1381  *
1382  *	We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1383  *	copying them.  But we can't handle external or compressed datums.
1384  * ----------
1385  */
1386 Datum
toast_compress_datum(Datum value)1387 toast_compress_datum(Datum value)
1388 {
1389 	struct varlena *tmp;
1390 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1391 	int32		len;
1392 
1393 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1394 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1395 
1396 	/*
1397 	 * No point in wasting a palloc cycle if value size is out of the allowed
1398 	 * range for compression
1399 	 */
1400 	if (valsize < PGLZ_strategy_default->min_input_size ||
1401 		valsize > PGLZ_strategy_default->max_input_size)
1402 		return PointerGetDatum(NULL);
1403 
1404 	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1405 									TOAST_COMPRESS_HDRSZ);
1406 
1407 	/*
1408 	 * We recheck the actual size even if pglz_compress() reports success,
1409 	 * because it might be satisfied with having saved as little as one byte
1410 	 * in the compressed data --- which could turn into a net loss once you
1411 	 * consider header and alignment padding.  Worst case, the compressed
1412 	 * format might require three padding bytes (plus header, which is
1413 	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
1414 	 * only one header byte and no padding if the value is short enough.  So
1415 	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
1416 	 */
1417 	len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1418 						valsize,
1419 						TOAST_COMPRESS_RAWDATA(tmp),
1420 						PGLZ_strategy_default);
1421 	if (len >= 0 &&
1422 		len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1423 	{
1424 		TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1425 		SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1426 		/* successful compression */
1427 		return PointerGetDatum(tmp);
1428 	}
1429 	else
1430 	{
1431 		/* incompressible data */
1432 		pfree(tmp);
1433 		return PointerGetDatum(NULL);
1434 	}
1435 }
1436 
1437 
1438 /* ----------
1439  * toast_get_valid_index
1440  *
1441  *	Get OID of valid index associated to given toast relation. A toast
1442  *	relation can have only one valid index at the same time.
1443  */
1444 Oid
toast_get_valid_index(Oid toastoid,LOCKMODE lock)1445 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1446 {
1447 	int			num_indexes;
1448 	int			validIndex;
1449 	Oid			validIndexOid;
1450 	Relation   *toastidxs;
1451 	Relation	toastrel;
1452 
1453 	/* Open the toast relation */
1454 	toastrel = heap_open(toastoid, lock);
1455 
1456 	/* Look for the valid index of the toast relation */
1457 	validIndex = toast_open_indexes(toastrel,
1458 									lock,
1459 									&toastidxs,
1460 									&num_indexes);
1461 	validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1462 
1463 	/* Close the toast relation and all its indexes */
1464 	toast_close_indexes(toastidxs, num_indexes, NoLock);
1465 	heap_close(toastrel, NoLock);
1466 
1467 	return validIndexOid;
1468 }
1469 
1470 
1471 /* ----------
1472  * toast_save_datum -
1473  *
1474  *	Save one single datum into the secondary relation and return
1475  *	a Datum reference for it.
1476  *
1477  * rel: the main relation we're working with (not the toast rel!)
1478  * value: datum to be pushed to toast storage
1479  * oldexternal: if not NULL, toast pointer previously representing the datum
1480  * options: options to be passed to heap_insert() for toast rows
1481  * ----------
1482  */
1483 static Datum
toast_save_datum(Relation rel,Datum value,struct varlena * oldexternal,int options)1484 toast_save_datum(Relation rel, Datum value,
1485 				 struct varlena *oldexternal, int options)
1486 {
1487 	Relation	toastrel;
1488 	Relation   *toastidxs;
1489 	HeapTuple	toasttup;
1490 	TupleDesc	toasttupDesc;
1491 	Datum		t_values[3];
1492 	bool		t_isnull[3];
1493 	CommandId	mycid = GetCurrentCommandId(true);
1494 	struct varlena *result;
1495 	struct varatt_external toast_pointer;
1496 	union
1497 	{
1498 		struct varlena hdr;
1499 		/* this is to make the union big enough for a chunk: */
1500 		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1501 		/* ensure union is aligned well enough: */
1502 		int32		align_it;
1503 	}			chunk_data;
1504 	int32		chunk_size;
1505 	int32		chunk_seq = 0;
1506 	char	   *data_p;
1507 	int32		data_todo;
1508 	Pointer		dval = DatumGetPointer(value);
1509 	int			num_indexes;
1510 	int			validIndex;
1511 
1512 	Assert(!VARATT_IS_EXTERNAL(value));
1513 
1514 	/*
1515 	 * Open the toast relation and its indexes.  We can use the index to check
1516 	 * uniqueness of the OID we assign to the toasted item, even though it has
1517 	 * additional columns besides OID.
1518 	 */
1519 	toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1520 	toasttupDesc = toastrel->rd_att;
1521 
1522 	/* Open all the toast indexes and look for the valid one */
1523 	validIndex = toast_open_indexes(toastrel,
1524 									RowExclusiveLock,
1525 									&toastidxs,
1526 									&num_indexes);
1527 
1528 	/*
1529 	 * Get the data pointer and length, and compute va_rawsize and va_extsize.
1530 	 *
1531 	 * va_rawsize is the size of the equivalent fully uncompressed datum, so
1532 	 * we have to adjust for short headers.
1533 	 *
1534 	 * va_extsize is the actual size of the data payload in the toast records.
1535 	 */
1536 	if (VARATT_IS_SHORT(dval))
1537 	{
1538 		data_p = VARDATA_SHORT(dval);
1539 		data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1540 		toast_pointer.va_rawsize = data_todo + VARHDRSZ;	/* as if not short */
1541 		toast_pointer.va_extsize = data_todo;
1542 	}
1543 	else if (VARATT_IS_COMPRESSED(dval))
1544 	{
1545 		data_p = VARDATA(dval);
1546 		data_todo = VARSIZE(dval) - VARHDRSZ;
1547 		/* rawsize in a compressed datum is just the size of the payload */
1548 		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1549 		toast_pointer.va_extsize = data_todo;
1550 		/* Assert that the numbers look like it's compressed */
1551 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1552 	}
1553 	else
1554 	{
1555 		data_p = VARDATA(dval);
1556 		data_todo = VARSIZE(dval) - VARHDRSZ;
1557 		toast_pointer.va_rawsize = VARSIZE(dval);
1558 		toast_pointer.va_extsize = data_todo;
1559 	}
1560 
1561 	/*
1562 	 * Insert the correct table OID into the result TOAST pointer.
1563 	 *
1564 	 * Normally this is the actual OID of the target toast table, but during
1565 	 * table-rewriting operations such as CLUSTER, we have to insert the OID
1566 	 * of the table's real permanent toast table instead.  rd_toastoid is set
1567 	 * if we have to substitute such an OID.
1568 	 */
1569 	if (OidIsValid(rel->rd_toastoid))
1570 		toast_pointer.va_toastrelid = rel->rd_toastoid;
1571 	else
1572 		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1573 
1574 	/*
1575 	 * Choose an OID to use as the value ID for this toast value.
1576 	 *
1577 	 * Normally we just choose an unused OID within the toast table.  But
1578 	 * during table-rewriting operations where we are preserving an existing
1579 	 * toast table OID, we want to preserve toast value OIDs too.  So, if
1580 	 * rd_toastoid is set and we had a prior external value from that same
1581 	 * toast table, re-use its value ID.  If we didn't have a prior external
1582 	 * value (which is a corner case, but possible if the table's attstorage
1583 	 * options have been changed), we have to pick a value ID that doesn't
1584 	 * conflict with either new or existing toast value OIDs.
1585 	 */
1586 	if (!OidIsValid(rel->rd_toastoid))
1587 	{
1588 		/* normal case: just choose an unused OID */
1589 		toast_pointer.va_valueid =
1590 			GetNewOidWithIndex(toastrel,
1591 							   RelationGetRelid(toastidxs[validIndex]),
1592 							   (AttrNumber) 1);
1593 	}
1594 	else
1595 	{
1596 		/* rewrite case: check to see if value was in old toast table */
1597 		toast_pointer.va_valueid = InvalidOid;
1598 		if (oldexternal != NULL)
1599 		{
1600 			struct varatt_external old_toast_pointer;
1601 
1602 			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1603 			/* Must copy to access aligned fields */
1604 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1605 			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1606 			{
1607 				/* This value came from the old toast table; reuse its OID */
1608 				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1609 
1610 				/*
1611 				 * There is a corner case here: the table rewrite might have
1612 				 * to copy both live and recently-dead versions of a row, and
1613 				 * those versions could easily reference the same toast value.
1614 				 * When we copy the second or later version of such a row,
1615 				 * reusing the OID will mean we select an OID that's already
1616 				 * in the new toast table.  Check for that, and if so, just
1617 				 * fall through without writing the data again.
1618 				 *
1619 				 * While annoying and ugly-looking, this is a good thing
1620 				 * because it ensures that we wind up with only one copy of
1621 				 * the toast value when there is only one copy in the old
1622 				 * toast table.  Before we detected this case, we'd have made
1623 				 * multiple copies, wasting space; and what's worse, the
1624 				 * copies belonging to already-deleted heap tuples would not
1625 				 * be reclaimed by VACUUM.
1626 				 */
1627 				if (toastrel_valueid_exists(toastrel,
1628 											toast_pointer.va_valueid))
1629 				{
1630 					/* Match, so short-circuit the data storage loop below */
1631 					data_todo = 0;
1632 				}
1633 			}
1634 		}
1635 		if (toast_pointer.va_valueid == InvalidOid)
1636 		{
1637 			/*
1638 			 * new value; must choose an OID that doesn't conflict in either
1639 			 * old or new toast table
1640 			 */
1641 			do
1642 			{
1643 				toast_pointer.va_valueid =
1644 					GetNewOidWithIndex(toastrel,
1645 									   RelationGetRelid(toastidxs[validIndex]),
1646 									   (AttrNumber) 1);
1647 			} while (toastid_valueid_exists(rel->rd_toastoid,
1648 											toast_pointer.va_valueid));
1649 		}
1650 	}
1651 
1652 	/*
1653 	 * Initialize constant parts of the tuple data
1654 	 */
1655 	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1656 	t_values[2] = PointerGetDatum(&chunk_data);
1657 	t_isnull[0] = false;
1658 	t_isnull[1] = false;
1659 	t_isnull[2] = false;
1660 
1661 	/*
1662 	 * Split up the item into chunks
1663 	 */
1664 	while (data_todo > 0)
1665 	{
1666 		int			i;
1667 
1668 		CHECK_FOR_INTERRUPTS();
1669 
1670 		/*
1671 		 * Calculate the size of this chunk
1672 		 */
1673 		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1674 
1675 		/*
1676 		 * Build a tuple and store it
1677 		 */
1678 		t_values[1] = Int32GetDatum(chunk_seq++);
1679 		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1680 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1681 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1682 
1683 		heap_insert(toastrel, toasttup, mycid, options, NULL);
1684 
1685 		/*
1686 		 * Create the index entry.  We cheat a little here by not using
1687 		 * FormIndexDatum: this relies on the knowledge that the index columns
1688 		 * are the same as the initial columns of the table for all the
1689 		 * indexes.  We also cheat by not providing an IndexInfo: this is okay
1690 		 * for now because btree doesn't need one, but we might have to be
1691 		 * more honest someday.
1692 		 *
1693 		 * Note also that there had better not be any user-created index on
1694 		 * the TOAST table, since we don't bother to update anything else.
1695 		 */
1696 		for (i = 0; i < num_indexes; i++)
1697 		{
1698 			/* Only index relations marked as ready can be updated */
1699 			if (IndexIsReady(toastidxs[i]->rd_index))
1700 				index_insert(toastidxs[i], t_values, t_isnull,
1701 							 &(toasttup->t_self),
1702 							 toastrel,
1703 							 toastidxs[i]->rd_index->indisunique ?
1704 							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
1705 							 NULL);
1706 		}
1707 
1708 		/*
1709 		 * Free memory
1710 		 */
1711 		heap_freetuple(toasttup);
1712 
1713 		/*
1714 		 * Move on to next chunk
1715 		 */
1716 		data_todo -= chunk_size;
1717 		data_p += chunk_size;
1718 	}
1719 
1720 	/*
1721 	 * Done - close toast relation and its indexes
1722 	 */
1723 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1724 	heap_close(toastrel, RowExclusiveLock);
1725 
1726 	/*
1727 	 * Create the TOAST pointer value that we'll return
1728 	 */
1729 	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1730 	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1731 	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1732 
1733 	return PointerGetDatum(result);
1734 }
1735 
1736 
1737 /* ----------
1738  * toast_delete_datum -
1739  *
1740  *	Delete a single external stored value.
1741  * ----------
1742  */
1743 static void
toast_delete_datum(Relation rel,Datum value,bool is_speculative)1744 toast_delete_datum(Relation rel, Datum value, bool is_speculative)
1745 {
1746 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1747 	struct varatt_external toast_pointer;
1748 	Relation	toastrel;
1749 	Relation   *toastidxs;
1750 	ScanKeyData toastkey;
1751 	SysScanDesc toastscan;
1752 	HeapTuple	toasttup;
1753 	int			num_indexes;
1754 	int			validIndex;
1755 	SnapshotData SnapshotToast;
1756 
1757 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1758 		return;
1759 
1760 	/* Must copy to access aligned fields */
1761 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1762 
1763 	/*
1764 	 * Open the toast relation and its indexes
1765 	 */
1766 	toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1767 
1768 	/* Fetch valid relation used for process */
1769 	validIndex = toast_open_indexes(toastrel,
1770 									RowExclusiveLock,
1771 									&toastidxs,
1772 									&num_indexes);
1773 
1774 	/*
1775 	 * Setup a scan key to find chunks with matching va_valueid
1776 	 */
1777 	ScanKeyInit(&toastkey,
1778 				(AttrNumber) 1,
1779 				BTEqualStrategyNumber, F_OIDEQ,
1780 				ObjectIdGetDatum(toast_pointer.va_valueid));
1781 
1782 	/*
1783 	 * Find all the chunks.  (We don't actually care whether we see them in
1784 	 * sequence or not, but since we've already locked the index we might as
1785 	 * well use systable_beginscan_ordered.)
1786 	 */
1787 	init_toast_snapshot(&SnapshotToast);
1788 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1789 										   &SnapshotToast, 1, &toastkey);
1790 	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1791 	{
1792 		/*
1793 		 * Have a chunk, delete it
1794 		 */
1795 		if (is_speculative)
1796 			heap_abort_speculative(toastrel, toasttup);
1797 		else
1798 			simple_heap_delete(toastrel, &toasttup->t_self);
1799 	}
1800 
1801 	/*
1802 	 * End scan and close relations
1803 	 */
1804 	systable_endscan_ordered(toastscan);
1805 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1806 	heap_close(toastrel, RowExclusiveLock);
1807 }
1808 
1809 
1810 /* ----------
1811  * toastrel_valueid_exists -
1812  *
1813  *	Test whether a toast value with the given ID exists in the toast relation.
1814  *	For safety, we consider a value to exist if there are either live or dead
1815  *	toast rows with that ID; see notes for GetNewOid().
1816  * ----------
1817  */
1818 static bool
toastrel_valueid_exists(Relation toastrel,Oid valueid)1819 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1820 {
1821 	bool		result = false;
1822 	ScanKeyData toastkey;
1823 	SysScanDesc toastscan;
1824 	int			num_indexes;
1825 	int			validIndex;
1826 	Relation   *toastidxs;
1827 
1828 	/* Fetch a valid index relation */
1829 	validIndex = toast_open_indexes(toastrel,
1830 									RowExclusiveLock,
1831 									&toastidxs,
1832 									&num_indexes);
1833 
1834 	/*
1835 	 * Setup a scan key to find chunks with matching va_valueid
1836 	 */
1837 	ScanKeyInit(&toastkey,
1838 				(AttrNumber) 1,
1839 				BTEqualStrategyNumber, F_OIDEQ,
1840 				ObjectIdGetDatum(valueid));
1841 
1842 	/*
1843 	 * Is there any such chunk?
1844 	 */
1845 	toastscan = systable_beginscan(toastrel,
1846 								   RelationGetRelid(toastidxs[validIndex]),
1847 								   true, SnapshotAny, 1, &toastkey);
1848 
1849 	if (systable_getnext(toastscan) != NULL)
1850 		result = true;
1851 
1852 	systable_endscan(toastscan);
1853 
1854 	/* Clean up */
1855 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1856 
1857 	return result;
1858 }
1859 
1860 /* ----------
1861  * toastid_valueid_exists -
1862  *
1863  *	As above, but work from toast rel's OID not an open relation
1864  * ----------
1865  */
1866 static bool
toastid_valueid_exists(Oid toastrelid,Oid valueid)1867 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1868 {
1869 	bool		result;
1870 	Relation	toastrel;
1871 
1872 	toastrel = heap_open(toastrelid, AccessShareLock);
1873 
1874 	result = toastrel_valueid_exists(toastrel, valueid);
1875 
1876 	heap_close(toastrel, AccessShareLock);
1877 
1878 	return result;
1879 }
1880 
1881 
1882 /* ----------
1883  * toast_fetch_datum -
1884  *
1885  *	Reconstruct an in memory Datum from the chunks saved
1886  *	in the toast relation
1887  * ----------
1888  */
1889 static struct varlena *
toast_fetch_datum(struct varlena * attr)1890 toast_fetch_datum(struct varlena *attr)
1891 {
1892 	Relation	toastrel;
1893 	Relation   *toastidxs;
1894 	ScanKeyData toastkey;
1895 	SysScanDesc toastscan;
1896 	HeapTuple	ttup;
1897 	TupleDesc	toasttupDesc;
1898 	struct varlena *result;
1899 	struct varatt_external toast_pointer;
1900 	int32		ressize;
1901 	int32		residx,
1902 				nextidx;
1903 	int32		numchunks;
1904 	Pointer		chunk;
1905 	bool		isnull;
1906 	char	   *chunkdata;
1907 	int32		chunksize;
1908 	int			num_indexes;
1909 	int			validIndex;
1910 	SnapshotData SnapshotToast;
1911 
1912 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1913 		elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1914 
1915 	/* Must copy to access aligned fields */
1916 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1917 
1918 	ressize = toast_pointer.va_extsize;
1919 	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1920 
1921 	result = (struct varlena *) palloc(ressize + VARHDRSZ);
1922 
1923 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1924 		SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1925 	else
1926 		SET_VARSIZE(result, ressize + VARHDRSZ);
1927 
1928 	/*
1929 	 * Open the toast relation and its indexes
1930 	 */
1931 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1932 	toasttupDesc = toastrel->rd_att;
1933 
1934 	/* Look for the valid index of the toast relation */
1935 	validIndex = toast_open_indexes(toastrel,
1936 									AccessShareLock,
1937 									&toastidxs,
1938 									&num_indexes);
1939 
1940 	/*
1941 	 * Setup a scan key to fetch from the index by va_valueid
1942 	 */
1943 	ScanKeyInit(&toastkey,
1944 				(AttrNumber) 1,
1945 				BTEqualStrategyNumber, F_OIDEQ,
1946 				ObjectIdGetDatum(toast_pointer.va_valueid));
1947 
1948 	/*
1949 	 * Read the chunks by index
1950 	 *
1951 	 * Note that because the index is actually on (valueid, chunkidx) we will
1952 	 * see the chunks in chunkidx order, even though we didn't explicitly ask
1953 	 * for it.
1954 	 */
1955 	nextidx = 0;
1956 
1957 	init_toast_snapshot(&SnapshotToast);
1958 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1959 										   &SnapshotToast, 1, &toastkey);
1960 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1961 	{
1962 		/*
1963 		 * Have a chunk, extract the sequence number and the data
1964 		 */
1965 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1966 		Assert(!isnull);
1967 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1968 		Assert(!isnull);
1969 		if (!VARATT_IS_EXTENDED(chunk))
1970 		{
1971 			chunksize = VARSIZE(chunk) - VARHDRSZ;
1972 			chunkdata = VARDATA(chunk);
1973 		}
1974 		else if (VARATT_IS_SHORT(chunk))
1975 		{
1976 			/* could happen due to heap_form_tuple doing its thing */
1977 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1978 			chunkdata = VARDATA_SHORT(chunk);
1979 		}
1980 		else
1981 		{
1982 			/* should never happen */
1983 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1984 				 toast_pointer.va_valueid,
1985 				 RelationGetRelationName(toastrel));
1986 			chunksize = 0;		/* keep compiler quiet */
1987 			chunkdata = NULL;
1988 		}
1989 
1990 		/*
1991 		 * Some checks on the data we've found
1992 		 */
1993 		if (residx != nextidx)
1994 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1995 				 residx, nextidx,
1996 				 toast_pointer.va_valueid,
1997 				 RelationGetRelationName(toastrel));
1998 		if (residx < numchunks - 1)
1999 		{
2000 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
2001 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
2002 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2003 					 residx, numchunks,
2004 					 toast_pointer.va_valueid,
2005 					 RelationGetRelationName(toastrel));
2006 		}
2007 		else if (residx == numchunks - 1)
2008 		{
2009 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
2010 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
2011 					 chunksize,
2012 					 (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
2013 					 residx,
2014 					 toast_pointer.va_valueid,
2015 					 RelationGetRelationName(toastrel));
2016 		}
2017 		else
2018 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2019 				 residx,
2020 				 0, numchunks - 1,
2021 				 toast_pointer.va_valueid,
2022 				 RelationGetRelationName(toastrel));
2023 
2024 		/*
2025 		 * Copy the data into proper place in our result
2026 		 */
2027 		memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
2028 			   chunkdata,
2029 			   chunksize);
2030 
2031 		nextidx++;
2032 	}
2033 
2034 	/*
2035 	 * Final checks that we successfully fetched the datum
2036 	 */
2037 	if (nextidx != numchunks)
2038 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2039 			 nextidx,
2040 			 toast_pointer.va_valueid,
2041 			 RelationGetRelationName(toastrel));
2042 
2043 	/*
2044 	 * End scan and close relations
2045 	 */
2046 	systable_endscan_ordered(toastscan);
2047 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2048 	heap_close(toastrel, AccessShareLock);
2049 
2050 	return result;
2051 }
2052 
2053 /* ----------
2054  * toast_fetch_datum_slice -
2055  *
2056  *	Reconstruct a segment of a Datum from the chunks saved
2057  *	in the toast relation
2058  * ----------
2059  */
2060 static struct varlena *
toast_fetch_datum_slice(struct varlena * attr,int32 sliceoffset,int32 length)2061 toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
2062 {
2063 	Relation	toastrel;
2064 	Relation   *toastidxs;
2065 	ScanKeyData toastkey[3];
2066 	int			nscankeys;
2067 	SysScanDesc toastscan;
2068 	HeapTuple	ttup;
2069 	TupleDesc	toasttupDesc;
2070 	struct varlena *result;
2071 	struct varatt_external toast_pointer;
2072 	int32		attrsize;
2073 	int32		residx;
2074 	int32		nextidx;
2075 	int			numchunks;
2076 	int			startchunk;
2077 	int			endchunk;
2078 	int32		startoffset;
2079 	int32		endoffset;
2080 	int			totalchunks;
2081 	Pointer		chunk;
2082 	bool		isnull;
2083 	char	   *chunkdata;
2084 	int32		chunksize;
2085 	int32		chcpystrt;
2086 	int32		chcpyend;
2087 	int			num_indexes;
2088 	int			validIndex;
2089 	SnapshotData SnapshotToast;
2090 
2091 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
2092 		elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
2093 
2094 	/* Must copy to access aligned fields */
2095 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
2096 
2097 	/*
2098 	 * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2099 	 * we can't return a compressed datum which is meaningful to toast later
2100 	 */
2101 	Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2102 
2103 	attrsize = toast_pointer.va_extsize;
2104 	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2105 
2106 	if (sliceoffset >= attrsize)
2107 	{
2108 		sliceoffset = 0;
2109 		length = 0;
2110 	}
2111 
2112 	/*
2113 	 * Adjust length request if needed.  (Note: our sole caller,
2114 	 * heap_tuple_untoast_attr_slice, protects us against sliceoffset + length
2115 	 * overflowing.)
2116 	 */
2117 	else if (((sliceoffset + length) > attrsize) || length < 0)
2118 		length = attrsize - sliceoffset;
2119 
2120 	result = (struct varlena *) palloc(length + VARHDRSZ);
2121 
2122 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2123 		SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
2124 	else
2125 		SET_VARSIZE(result, length + VARHDRSZ);
2126 
2127 	if (length == 0)
2128 		return result;			/* Can save a lot of work at this point! */
2129 
2130 	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2131 	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2132 	numchunks = (endchunk - startchunk) + 1;
2133 
2134 	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2135 	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2136 
2137 	/*
2138 	 * Open the toast relation and its indexes
2139 	 */
2140 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
2141 	toasttupDesc = toastrel->rd_att;
2142 
2143 	/* Look for the valid index of toast relation */
2144 	validIndex = toast_open_indexes(toastrel,
2145 									AccessShareLock,
2146 									&toastidxs,
2147 									&num_indexes);
2148 
2149 	/*
2150 	 * Setup a scan key to fetch from the index. This is either two keys or
2151 	 * three depending on the number of chunks.
2152 	 */
2153 	ScanKeyInit(&toastkey[0],
2154 				(AttrNumber) 1,
2155 				BTEqualStrategyNumber, F_OIDEQ,
2156 				ObjectIdGetDatum(toast_pointer.va_valueid));
2157 
2158 	/*
2159 	 * Use equality condition for one chunk, a range condition otherwise:
2160 	 */
2161 	if (numchunks == 1)
2162 	{
2163 		ScanKeyInit(&toastkey[1],
2164 					(AttrNumber) 2,
2165 					BTEqualStrategyNumber, F_INT4EQ,
2166 					Int32GetDatum(startchunk));
2167 		nscankeys = 2;
2168 	}
2169 	else
2170 	{
2171 		ScanKeyInit(&toastkey[1],
2172 					(AttrNumber) 2,
2173 					BTGreaterEqualStrategyNumber, F_INT4GE,
2174 					Int32GetDatum(startchunk));
2175 		ScanKeyInit(&toastkey[2],
2176 					(AttrNumber) 2,
2177 					BTLessEqualStrategyNumber, F_INT4LE,
2178 					Int32GetDatum(endchunk));
2179 		nscankeys = 3;
2180 	}
2181 
2182 	/*
2183 	 * Read the chunks by index
2184 	 *
2185 	 * The index is on (valueid, chunkidx) so they will come in order
2186 	 */
2187 	init_toast_snapshot(&SnapshotToast);
2188 	nextidx = startchunk;
2189 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2190 										   &SnapshotToast, nscankeys, toastkey);
2191 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2192 	{
2193 		/*
2194 		 * Have a chunk, extract the sequence number and the data
2195 		 */
2196 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2197 		Assert(!isnull);
2198 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2199 		Assert(!isnull);
2200 		if (!VARATT_IS_EXTENDED(chunk))
2201 		{
2202 			chunksize = VARSIZE(chunk) - VARHDRSZ;
2203 			chunkdata = VARDATA(chunk);
2204 		}
2205 		else if (VARATT_IS_SHORT(chunk))
2206 		{
2207 			/* could happen due to heap_form_tuple doing its thing */
2208 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2209 			chunkdata = VARDATA_SHORT(chunk);
2210 		}
2211 		else
2212 		{
2213 			/* should never happen */
2214 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2215 				 toast_pointer.va_valueid,
2216 				 RelationGetRelationName(toastrel));
2217 			chunksize = 0;		/* keep compiler quiet */
2218 			chunkdata = NULL;
2219 		}
2220 
2221 		/*
2222 		 * Some checks on the data we've found
2223 		 */
2224 		if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2225 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2226 				 residx, nextidx,
2227 				 toast_pointer.va_valueid,
2228 				 RelationGetRelationName(toastrel));
2229 		if (residx < totalchunks - 1)
2230 		{
2231 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
2232 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2233 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2234 					 residx, totalchunks,
2235 					 toast_pointer.va_valueid,
2236 					 RelationGetRelationName(toastrel));
2237 		}
2238 		else if (residx == totalchunks - 1)
2239 		{
2240 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2241 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2242 					 chunksize,
2243 					 (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2244 					 residx,
2245 					 toast_pointer.va_valueid,
2246 					 RelationGetRelationName(toastrel));
2247 		}
2248 		else
2249 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2250 				 residx,
2251 				 0, totalchunks - 1,
2252 				 toast_pointer.va_valueid,
2253 				 RelationGetRelationName(toastrel));
2254 
2255 		/*
2256 		 * Copy the data into proper place in our result
2257 		 */
2258 		chcpystrt = 0;
2259 		chcpyend = chunksize - 1;
2260 		if (residx == startchunk)
2261 			chcpystrt = startoffset;
2262 		if (residx == endchunk)
2263 			chcpyend = endoffset;
2264 
2265 		memcpy(VARDATA(result) +
2266 			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2267 			   chunkdata + chcpystrt,
2268 			   (chcpyend - chcpystrt) + 1);
2269 
2270 		nextidx++;
2271 	}
2272 
2273 	/*
2274 	 * Final checks that we successfully fetched the datum
2275 	 */
2276 	if (nextidx != (endchunk + 1))
2277 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2278 			 nextidx,
2279 			 toast_pointer.va_valueid,
2280 			 RelationGetRelationName(toastrel));
2281 
2282 	/*
2283 	 * End scan and close relations
2284 	 */
2285 	systable_endscan_ordered(toastscan);
2286 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2287 	heap_close(toastrel, AccessShareLock);
2288 
2289 	return result;
2290 }
2291 
2292 /* ----------
2293  * toast_decompress_datum -
2294  *
2295  * Decompress a compressed version of a varlena datum
2296  */
2297 static struct varlena *
toast_decompress_datum(struct varlena * attr)2298 toast_decompress_datum(struct varlena *attr)
2299 {
2300 	struct varlena *result;
2301 
2302 	Assert(VARATT_IS_COMPRESSED(attr));
2303 
2304 	result = (struct varlena *)
2305 		palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2306 	SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2307 
2308 	if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2309 						VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2310 						VARDATA(result),
2311 						TOAST_COMPRESS_RAWSIZE(attr)) < 0)
2312 		elog(ERROR, "compressed data is corrupted");
2313 
2314 	return result;
2315 }
2316 
2317 
2318 /* ----------
2319  * toast_open_indexes
2320  *
2321  *	Get an array of the indexes associated to the given toast relation
2322  *	and return as well the position of the valid index used by the toast
2323  *	relation in this array. It is the responsibility of the caller of this
2324  *	function to close the indexes as well as free them.
2325  */
2326 static int
toast_open_indexes(Relation toastrel,LOCKMODE lock,Relation ** toastidxs,int * num_indexes)2327 toast_open_indexes(Relation toastrel,
2328 				   LOCKMODE lock,
2329 				   Relation **toastidxs,
2330 				   int *num_indexes)
2331 {
2332 	int			i = 0;
2333 	int			res = 0;
2334 	bool		found = false;
2335 	List	   *indexlist;
2336 	ListCell   *lc;
2337 
2338 	/* Get index list of the toast relation */
2339 	indexlist = RelationGetIndexList(toastrel);
2340 	Assert(indexlist != NIL);
2341 
2342 	*num_indexes = list_length(indexlist);
2343 
2344 	/* Open all the index relations */
2345 	*toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2346 	foreach(lc, indexlist)
2347 		(*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2348 
2349 	/* Fetch the first valid index in list */
2350 	for (i = 0; i < *num_indexes; i++)
2351 	{
2352 		Relation	toastidx = (*toastidxs)[i];
2353 
2354 		if (toastidx->rd_index->indisvalid)
2355 		{
2356 			res = i;
2357 			found = true;
2358 			break;
2359 		}
2360 	}
2361 
2362 	/*
2363 	 * Free index list, not necessary anymore as relations are opened and a
2364 	 * valid index has been found.
2365 	 */
2366 	list_free(indexlist);
2367 
2368 	/*
2369 	 * The toast relation should have one valid index, so something is going
2370 	 * wrong if there is nothing.
2371 	 */
2372 	if (!found)
2373 		elog(ERROR, "no valid index found for toast relation with Oid %u",
2374 			 RelationGetRelid(toastrel));
2375 
2376 	return res;
2377 }
2378 
2379 /* ----------
2380  * toast_close_indexes
2381  *
2382  *	Close an array of indexes for a toast relation and free it. This should
2383  *	be called for a set of indexes opened previously with toast_open_indexes.
2384  */
2385 static void
toast_close_indexes(Relation * toastidxs,int num_indexes,LOCKMODE lock)2386 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2387 {
2388 	int			i;
2389 
2390 	/* Close relations and clean up things */
2391 	for (i = 0; i < num_indexes; i++)
2392 		index_close(toastidxs[i], lock);
2393 	pfree(toastidxs);
2394 }
2395 
2396 /* ----------
2397  * init_toast_snapshot
2398  *
2399  *	Initialize an appropriate TOAST snapshot.  We must use an MVCC snapshot
2400  *	to initialize the TOAST snapshot; since we don't know which one to use,
2401  *	just use the oldest one.  This is safe: at worst, we will get a "snapshot
2402  *	too old" error that might have been avoided otherwise.
2403  */
2404 static void
init_toast_snapshot(Snapshot toast_snapshot)2405 init_toast_snapshot(Snapshot toast_snapshot)
2406 {
2407 	Snapshot	snapshot = GetOldestSnapshot();
2408 
2409 	/*
2410 	 * GetOldestSnapshot returns NULL if the session has no active snapshots.
2411 	 * We can get that if, for example, a procedure fetches a toasted value
2412 	 * into a local variable, commits, and then tries to detoast the value.
2413 	 * Such coding is unsafe, because once we commit there is nothing to
2414 	 * prevent the toast data from being deleted.  Detoasting *must* happen in
2415 	 * the same transaction that originally fetched the toast pointer.  Hence,
2416 	 * rather than trying to band-aid over the problem, throw an error.  (This
2417 	 * is not very much protection, because in many scenarios the procedure
2418 	 * would have already created a new transaction snapshot, preventing us
2419 	 * from detecting the problem.  But it's better than nothing, and for sure
2420 	 * we shouldn't expend code on masking the problem more.)
2421 	 */
2422 	if (snapshot == NULL)
2423 		elog(ERROR, "cannot fetch toast data without an active snapshot");
2424 
2425 	InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken);
2426 }
2427