1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *	  Support routines for external and compressed storage of
5  *	  variable size attributes.
6  *
7  * Copyright (c) 2000-2016, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *		toast_insert_or_update -
16  *			Try to make a given tuple fit into one page by compressing
17  *			or moving off attributes
18  *
19  *		toast_delete -
20  *			Reclaim toast storage when a tuple is deleted
21  *
22  *		heap_tuple_untoast_attr -
23  *			Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27 
28 #include "postgres.h"
29 
30 #include <unistd.h>
31 #include <fcntl.h>
32 
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/pg_lzcompress.h"
39 #include "miscadmin.h"
40 #include "utils/expandeddatum.h"
41 #include "utils/fmgroids.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/typcache.h"
45 #include "utils/tqual.h"
46 
47 
48 #undef TOAST_DEBUG
49 
50 /*
51  *	The information at the start of the compressed toast data.
52  */
53 typedef struct toast_compress_header
54 {
55 	int32		vl_len_;		/* varlena header (do not touch directly!) */
56 	int32		rawsize;
57 } toast_compress_header;
58 
/*
 * Accessors for the header of a compressed toast entry.  "ptr" is a pointer
 * to the start of the entry, i.e. to its toast_compress_header.
 *
 *	TOAST_COMPRESS_HDRSZ		size of the header, as an int32
 *	TOAST_COMPRESS_RAWSIZE		the rawsize field (usable as an lvalue)
 *	TOAST_COMPRESS_RAWDATA		address of the first byte after the header
 *	TOAST_COMPRESS_SET_RAWSIZE	store "len" into the rawsize field
 */
#define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
#define TOAST_COMPRESS_RAWSIZE(ptr) \
	(((toast_compress_header *) (ptr))->rawsize)
#define TOAST_COMPRESS_RAWDATA(ptr) \
	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
#define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
	(TOAST_COMPRESS_RAWSIZE(ptr) = (len))
69 
70 static void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
71 static Datum toast_save_datum(Relation rel, Datum value,
72 				 struct varlena * oldexternal, int options);
73 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
74 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
75 static struct varlena *toast_fetch_datum(struct varlena * attr);
76 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
77 						int32 sliceoffset, int32 length);
78 static struct varlena *toast_decompress_datum(struct varlena * attr);
79 static int toast_open_indexes(Relation toastrel,
80 				   LOCKMODE lock,
81 				   Relation **toastidxs,
82 				   int *num_indexes);
83 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
84 					LOCKMODE lock);
85 static void init_toast_snapshot(Snapshot toast_snapshot);
86 
87 
88 /* ----------
89  * heap_tuple_fetch_attr -
90  *
91  *	Public entry point to get back a toasted value from
92  *	external source (possibly still in compressed format).
93  *
94  * This will return a datum that contains all the data internally, ie, not
95  * relying on external storage or memory, but it can still be compressed or
96  * have a short header.  Note some callers assume that if the input is an
97  * EXTERNAL datum, the result will be a pfree'able chunk.
98  * ----------
99  */
100 struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)101 heap_tuple_fetch_attr(struct varlena * attr)
102 {
103 	struct varlena *result;
104 
105 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
106 	{
107 		/*
108 		 * This is an external stored plain value
109 		 */
110 		result = toast_fetch_datum(attr);
111 	}
112 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
113 	{
114 		/*
115 		 * This is an indirect pointer --- dereference it
116 		 */
117 		struct varatt_indirect redirect;
118 
119 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
120 		attr = (struct varlena *) redirect.pointer;
121 
122 		/* nested indirect Datums aren't allowed */
123 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
124 
125 		/* recurse if value is still external in some other way */
126 		if (VARATT_IS_EXTERNAL(attr))
127 			return heap_tuple_fetch_attr(attr);
128 
129 		/*
130 		 * Copy into the caller's memory context, in case caller tries to
131 		 * pfree the result.
132 		 */
133 		result = (struct varlena *) palloc(VARSIZE_ANY(attr));
134 		memcpy(result, attr, VARSIZE_ANY(attr));
135 	}
136 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
137 	{
138 		/*
139 		 * This is an expanded-object pointer --- get flat format
140 		 */
141 		ExpandedObjectHeader *eoh;
142 		Size		resultsize;
143 
144 		eoh = DatumGetEOHP(PointerGetDatum(attr));
145 		resultsize = EOH_get_flat_size(eoh);
146 		result = (struct varlena *) palloc(resultsize);
147 		EOH_flatten_into(eoh, (void *) result, resultsize);
148 	}
149 	else
150 	{
151 		/*
152 		 * This is a plain value inside of the main tuple - why am I called?
153 		 */
154 		result = attr;
155 	}
156 
157 	return result;
158 }
159 
160 
161 /* ----------
162  * heap_tuple_untoast_attr -
163  *
164  *	Public entry point to get back a toasted value from compression
165  *	or external storage.  The result is always non-extended varlena form.
166  *
167  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
168  * datum, the result will be a pfree'able chunk.
169  * ----------
170  */
171 struct varlena *
heap_tuple_untoast_attr(struct varlena * attr)172 heap_tuple_untoast_attr(struct varlena * attr)
173 {
174 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
175 	{
176 		/*
177 		 * This is an externally stored datum --- fetch it back from there
178 		 */
179 		attr = toast_fetch_datum(attr);
180 		/* If it's compressed, decompress it */
181 		if (VARATT_IS_COMPRESSED(attr))
182 		{
183 			struct varlena *tmp = attr;
184 
185 			attr = toast_decompress_datum(tmp);
186 			pfree(tmp);
187 		}
188 	}
189 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
190 	{
191 		/*
192 		 * This is an indirect pointer --- dereference it
193 		 */
194 		struct varatt_indirect redirect;
195 
196 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
197 		attr = (struct varlena *) redirect.pointer;
198 
199 		/* nested indirect Datums aren't allowed */
200 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
201 
202 		/* recurse in case value is still extended in some other way */
203 		attr = heap_tuple_untoast_attr(attr);
204 
205 		/* if it isn't, we'd better copy it */
206 		if (attr == (struct varlena *) redirect.pointer)
207 		{
208 			struct varlena *result;
209 
210 			result = (struct varlena *) palloc(VARSIZE_ANY(attr));
211 			memcpy(result, attr, VARSIZE_ANY(attr));
212 			attr = result;
213 		}
214 	}
215 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
216 	{
217 		/*
218 		 * This is an expanded-object pointer --- get flat format
219 		 */
220 		attr = heap_tuple_fetch_attr(attr);
221 		/* flatteners are not allowed to produce compressed/short output */
222 		Assert(!VARATT_IS_EXTENDED(attr));
223 	}
224 	else if (VARATT_IS_COMPRESSED(attr))
225 	{
226 		/*
227 		 * This is a compressed value inside of the main tuple
228 		 */
229 		attr = toast_decompress_datum(attr);
230 	}
231 	else if (VARATT_IS_SHORT(attr))
232 	{
233 		/*
234 		 * This is a short-header varlena --- convert to 4-byte header format
235 		 */
236 		Size		data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
237 		Size		new_size = data_size + VARHDRSZ;
238 		struct varlena *new_attr;
239 
240 		new_attr = (struct varlena *) palloc(new_size);
241 		SET_VARSIZE(new_attr, new_size);
242 		memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
243 		attr = new_attr;
244 	}
245 
246 	return attr;
247 }
248 
249 
250 /* ----------
251  * heap_tuple_untoast_attr_slice -
252  *
253  *		Public entry point to get back part of a toasted value
254  *		from compression or external storage.
255  * ----------
256  */
257 struct varlena *
heap_tuple_untoast_attr_slice(struct varlena * attr,int32 sliceoffset,int32 slicelength)258 heap_tuple_untoast_attr_slice(struct varlena * attr,
259 							  int32 sliceoffset, int32 slicelength)
260 {
261 	struct varlena *preslice;
262 	struct varlena *result;
263 	char	   *attrdata;
264 	int32		attrsize;
265 
266 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
267 	{
268 		struct varatt_external toast_pointer;
269 
270 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
271 
272 		/* fast path for non-compressed external datums */
273 		if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
274 			return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
275 
276 		/* fetch it back (compressed marker will get set automatically) */
277 		preslice = toast_fetch_datum(attr);
278 	}
279 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
280 	{
281 		struct varatt_indirect redirect;
282 
283 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
284 
285 		/* nested indirect Datums aren't allowed */
286 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
287 
288 		return heap_tuple_untoast_attr_slice(redirect.pointer,
289 											 sliceoffset, slicelength);
290 	}
291 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
292 	{
293 		/* pass it off to heap_tuple_fetch_attr to flatten */
294 		preslice = heap_tuple_fetch_attr(attr);
295 	}
296 	else
297 		preslice = attr;
298 
299 	Assert(!VARATT_IS_EXTERNAL(preslice));
300 
301 	if (VARATT_IS_COMPRESSED(preslice))
302 	{
303 		struct varlena *tmp = preslice;
304 
305 		preslice = toast_decompress_datum(tmp);
306 
307 		if (tmp != attr)
308 			pfree(tmp);
309 	}
310 
311 	if (VARATT_IS_SHORT(preslice))
312 	{
313 		attrdata = VARDATA_SHORT(preslice);
314 		attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
315 	}
316 	else
317 	{
318 		attrdata = VARDATA(preslice);
319 		attrsize = VARSIZE(preslice) - VARHDRSZ;
320 	}
321 
322 	/* slicing of datum for compressed cases and plain value */
323 
324 	if (sliceoffset >= attrsize)
325 	{
326 		sliceoffset = 0;
327 		slicelength = 0;
328 	}
329 
330 	if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
331 		slicelength = attrsize - sliceoffset;
332 
333 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
334 	SET_VARSIZE(result, slicelength + VARHDRSZ);
335 
336 	memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
337 
338 	if (preslice != attr)
339 		pfree(preslice);
340 
341 	return result;
342 }
343 
344 
345 /* ----------
346  * toast_raw_datum_size -
347  *
348  *	Return the raw (detoasted) size of a varlena datum
349  *	(including the VARHDRSZ header)
350  * ----------
351  */
352 Size
toast_raw_datum_size(Datum value)353 toast_raw_datum_size(Datum value)
354 {
355 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
356 	Size		result;
357 
358 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
359 	{
360 		/* va_rawsize is the size of the original datum -- including header */
361 		struct varatt_external toast_pointer;
362 
363 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
364 		result = toast_pointer.va_rawsize;
365 	}
366 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
367 	{
368 		struct varatt_indirect toast_pointer;
369 
370 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
371 
372 		/* nested indirect Datums aren't allowed */
373 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
374 
375 		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
376 	}
377 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
378 	{
379 		result = EOH_get_flat_size(DatumGetEOHP(value));
380 	}
381 	else if (VARATT_IS_COMPRESSED(attr))
382 	{
383 		/* here, va_rawsize is just the payload size */
384 		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
385 	}
386 	else if (VARATT_IS_SHORT(attr))
387 	{
388 		/*
389 		 * we have to normalize the header length to VARHDRSZ or else the
390 		 * callers of this function will be confused.
391 		 */
392 		result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
393 	}
394 	else
395 	{
396 		/* plain untoasted datum */
397 		result = VARSIZE(attr);
398 	}
399 	return result;
400 }
401 
402 /* ----------
403  * toast_datum_size
404  *
405  *	Return the physical storage size (possibly compressed) of a varlena datum
406  * ----------
407  */
408 Size
toast_datum_size(Datum value)409 toast_datum_size(Datum value)
410 {
411 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
412 	Size		result;
413 
414 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
415 	{
416 		/*
417 		 * Attribute is stored externally - return the extsize whether
418 		 * compressed or not.  We do not count the size of the toast pointer
419 		 * ... should we?
420 		 */
421 		struct varatt_external toast_pointer;
422 
423 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
424 		result = toast_pointer.va_extsize;
425 	}
426 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
427 	{
428 		struct varatt_indirect toast_pointer;
429 
430 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
431 
432 		/* nested indirect Datums aren't allowed */
433 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
434 
435 		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
436 	}
437 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
438 	{
439 		result = EOH_get_flat_size(DatumGetEOHP(value));
440 	}
441 	else if (VARATT_IS_SHORT(attr))
442 	{
443 		result = VARSIZE_SHORT(attr);
444 	}
445 	else
446 	{
447 		/*
448 		 * Attribute is stored inline either compressed or not, just calculate
449 		 * the size of the datum in either case.
450 		 */
451 		result = VARSIZE(attr);
452 	}
453 	return result;
454 }
455 
456 
457 /* ----------
458  * toast_delete -
459  *
460  *	Cascaded delete toast-entries on DELETE
461  * ----------
462  */
463 void
toast_delete(Relation rel,HeapTuple oldtup,bool is_speculative)464 toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
465 {
466 	TupleDesc	tupleDesc;
467 	Form_pg_attribute *att;
468 	int			numAttrs;
469 	int			i;
470 	Datum		toast_values[MaxHeapAttributeNumber];
471 	bool		toast_isnull[MaxHeapAttributeNumber];
472 
473 	/*
474 	 * We should only ever be called for tuples of plain relations or
475 	 * materialized views --- recursing on a toast rel is bad news.
476 	 */
477 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
478 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
479 
480 	/*
481 	 * Get the tuple descriptor and break down the tuple into fields.
482 	 *
483 	 * NOTE: it's debatable whether to use heap_deform_tuple() here or just
484 	 * heap_getattr() only the varlena columns.  The latter could win if there
485 	 * are few varlena columns and many non-varlena ones. However,
486 	 * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
487 	 * O(N^2) if there are many varlena columns, so it seems better to err on
488 	 * the side of linear cost.  (We won't even be here unless there's at
489 	 * least one varlena column, by the way.)
490 	 */
491 	tupleDesc = rel->rd_att;
492 	att = tupleDesc->attrs;
493 	numAttrs = tupleDesc->natts;
494 
495 	Assert(numAttrs <= MaxHeapAttributeNumber);
496 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
497 
498 	/*
499 	 * Check for external stored attributes and delete them from the secondary
500 	 * relation.
501 	 */
502 	for (i = 0; i < numAttrs; i++)
503 	{
504 		if (att[i]->attlen == -1)
505 		{
506 			Datum		value = toast_values[i];
507 
508 			if (toast_isnull[i])
509 				continue;
510 			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
511 				toast_delete_datum(rel, value, is_speculative);
512 		}
513 	}
514 }
515 
516 
517 /* ----------
518  * toast_insert_or_update -
519  *
520  *	Delete no-longer-used toast-entries and create new ones to
521  *	make the new tuple fit on INSERT or UPDATE
522  *
523  * Inputs:
524  *	newtup: the candidate new tuple to be inserted
525  *	oldtup: the old row version for UPDATE, or NULL for INSERT
526  *	options: options to be passed to heap_insert() for toast rows
527  * Result:
528  *	either newtup if no toasting is needed, or a palloc'd modified tuple
529  *	that is what should actually get stored
530  *
531  * NOTE: neither newtup nor oldtup will be modified.  This is a change
532  * from the pre-8.1 API of this routine.
533  * ----------
534  */
535 HeapTuple
toast_insert_or_update(Relation rel,HeapTuple newtup,HeapTuple oldtup,int options)536 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
537 					   int options)
538 {
539 	HeapTuple	result_tuple;
540 	TupleDesc	tupleDesc;
541 	Form_pg_attribute *att;
542 	int			numAttrs;
543 	int			i;
544 
545 	bool		need_change = false;
546 	bool		need_free = false;
547 	bool		need_delold = false;
548 	bool		has_nulls = false;
549 
550 	Size		maxDataLen;
551 	Size		hoff;
552 
553 	char		toast_action[MaxHeapAttributeNumber];
554 	bool		toast_isnull[MaxHeapAttributeNumber];
555 	bool		toast_oldisnull[MaxHeapAttributeNumber];
556 	Datum		toast_values[MaxHeapAttributeNumber];
557 	Datum		toast_oldvalues[MaxHeapAttributeNumber];
558 	struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
559 	int32		toast_sizes[MaxHeapAttributeNumber];
560 	bool		toast_free[MaxHeapAttributeNumber];
561 	bool		toast_delold[MaxHeapAttributeNumber];
562 
563 	/*
564 	 * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
565 	 * deletions just normally insert/delete the toast values. It seems
566 	 * easiest to deal with that here, instead on, potentially, multiple
567 	 * callers.
568 	 */
569 	options &= ~HEAP_INSERT_SPECULATIVE;
570 
571 	/*
572 	 * We should only ever be called for tuples of plain relations or
573 	 * materialized views --- recursing on a toast rel is bad news.
574 	 */
575 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
576 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
577 
578 	/*
579 	 * Get the tuple descriptor and break down the tuple(s) into fields.
580 	 */
581 	tupleDesc = rel->rd_att;
582 	att = tupleDesc->attrs;
583 	numAttrs = tupleDesc->natts;
584 
585 	Assert(numAttrs <= MaxHeapAttributeNumber);
586 	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
587 	if (oldtup != NULL)
588 		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
589 
590 	/* ----------
591 	 * Then collect information about the values given
592 	 *
593 	 * NOTE: toast_action[i] can have these values:
594 	 *		' '		default handling
595 	 *		'p'		already processed --- don't touch it
596 	 *		'x'		incompressible, but OK to move off
597 	 *
598 	 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
599 	 *		toast_action[i] different from 'p'.
600 	 * ----------
601 	 */
602 	memset(toast_action, ' ', numAttrs * sizeof(char));
603 	memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
604 	memset(toast_free, 0, numAttrs * sizeof(bool));
605 	memset(toast_delold, 0, numAttrs * sizeof(bool));
606 
607 	for (i = 0; i < numAttrs; i++)
608 	{
609 		struct varlena *old_value;
610 		struct varlena *new_value;
611 
612 		if (oldtup != NULL)
613 		{
614 			/*
615 			 * For UPDATE get the old and new values of this attribute
616 			 */
617 			old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
618 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
619 
620 			/*
621 			 * If the old value is stored on disk, check if it has changed so
622 			 * we have to delete it later.
623 			 */
624 			if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
625 				VARATT_IS_EXTERNAL_ONDISK(old_value))
626 			{
627 				if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
628 					memcmp((char *) old_value, (char *) new_value,
629 						   VARSIZE_EXTERNAL(old_value)) != 0)
630 				{
631 					/*
632 					 * The old external stored value isn't needed any more
633 					 * after the update
634 					 */
635 					toast_delold[i] = true;
636 					need_delold = true;
637 				}
638 				else
639 				{
640 					/*
641 					 * This attribute isn't changed by this update so we reuse
642 					 * the original reference to the old value in the new
643 					 * tuple.
644 					 */
645 					toast_action[i] = 'p';
646 					continue;
647 				}
648 			}
649 		}
650 		else
651 		{
652 			/*
653 			 * For INSERT simply get the new value
654 			 */
655 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
656 		}
657 
658 		/*
659 		 * Handle NULL attributes
660 		 */
661 		if (toast_isnull[i])
662 		{
663 			toast_action[i] = 'p';
664 			has_nulls = true;
665 			continue;
666 		}
667 
668 		/*
669 		 * Now look at varlena attributes
670 		 */
671 		if (att[i]->attlen == -1)
672 		{
673 			/*
674 			 * If the table's attribute says PLAIN always, force it so.
675 			 */
676 			if (att[i]->attstorage == 'p')
677 				toast_action[i] = 'p';
678 
679 			/*
680 			 * We took care of UPDATE above, so any external value we find
681 			 * still in the tuple must be someone else's that we cannot reuse
682 			 * (this includes the case of an out-of-line in-memory datum).
683 			 * Fetch it back (without decompression, unless we are forcing
684 			 * PLAIN storage).  If necessary, we'll push it out as a new
685 			 * external value below.
686 			 */
687 			if (VARATT_IS_EXTERNAL(new_value))
688 			{
689 				toast_oldexternal[i] = new_value;
690 				if (att[i]->attstorage == 'p')
691 					new_value = heap_tuple_untoast_attr(new_value);
692 				else
693 					new_value = heap_tuple_fetch_attr(new_value);
694 				toast_values[i] = PointerGetDatum(new_value);
695 				toast_free[i] = true;
696 				need_change = true;
697 				need_free = true;
698 			}
699 
700 			/*
701 			 * Remember the size of this attribute
702 			 */
703 			toast_sizes[i] = VARSIZE_ANY(new_value);
704 		}
705 		else
706 		{
707 			/*
708 			 * Not a varlena attribute, plain storage always
709 			 */
710 			toast_action[i] = 'p';
711 		}
712 	}
713 
714 	/* ----------
715 	 * Compress and/or save external until data fits into target length
716 	 *
717 	 *	1: Inline compress attributes with attstorage 'x', and store very
718 	 *	   large attributes with attstorage 'x' or 'e' external immediately
719 	 *	2: Store attributes with attstorage 'x' or 'e' external
720 	 *	3: Inline compress attributes with attstorage 'm'
721 	 *	4: Store attributes with attstorage 'm' external
722 	 * ----------
723 	 */
724 
725 	/* compute header overhead --- this should match heap_form_tuple() */
726 	hoff = SizeofHeapTupleHeader;
727 	if (has_nulls)
728 		hoff += BITMAPLEN(numAttrs);
729 	if (newtup->t_data->t_infomask & HEAP_HASOID)
730 		hoff += sizeof(Oid);
731 	hoff = MAXALIGN(hoff);
732 	/* now convert to a limit on the tuple data size */
733 	maxDataLen = TOAST_TUPLE_TARGET - hoff;
734 
735 	/*
736 	 * Look for attributes with attstorage 'x' to compress.  Also find large
737 	 * attributes with attstorage 'x' or 'e', and store them external.
738 	 */
739 	while (heap_compute_data_size(tupleDesc,
740 								  toast_values, toast_isnull) > maxDataLen)
741 	{
742 		int			biggest_attno = -1;
743 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
744 		Datum		old_value;
745 		Datum		new_value;
746 
747 		/*
748 		 * Search for the biggest yet unprocessed internal attribute
749 		 */
750 		for (i = 0; i < numAttrs; i++)
751 		{
752 			if (toast_action[i] != ' ')
753 				continue;
754 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
755 				continue;		/* can't happen, toast_action would be 'p' */
756 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
757 				continue;
758 			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
759 				continue;
760 			if (toast_sizes[i] > biggest_size)
761 			{
762 				biggest_attno = i;
763 				biggest_size = toast_sizes[i];
764 			}
765 		}
766 
767 		if (biggest_attno < 0)
768 			break;
769 
770 		/*
771 		 * Attempt to compress it inline, if it has attstorage 'x'
772 		 */
773 		i = biggest_attno;
774 		if (att[i]->attstorage == 'x')
775 		{
776 			old_value = toast_values[i];
777 			new_value = toast_compress_datum(old_value);
778 
779 			if (DatumGetPointer(new_value) != NULL)
780 			{
781 				/* successful compression */
782 				if (toast_free[i])
783 					pfree(DatumGetPointer(old_value));
784 				toast_values[i] = new_value;
785 				toast_free[i] = true;
786 				toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
787 				need_change = true;
788 				need_free = true;
789 			}
790 			else
791 			{
792 				/* incompressible, ignore on subsequent compression passes */
793 				toast_action[i] = 'x';
794 			}
795 		}
796 		else
797 		{
798 			/* has attstorage 'e', ignore on subsequent compression passes */
799 			toast_action[i] = 'x';
800 		}
801 
802 		/*
803 		 * If this value is by itself more than maxDataLen (after compression
804 		 * if any), push it out to the toast table immediately, if possible.
805 		 * This avoids uselessly compressing other fields in the common case
806 		 * where we have one long field and several short ones.
807 		 *
808 		 * XXX maybe the threshold should be less than maxDataLen?
809 		 */
810 		if (toast_sizes[i] > maxDataLen &&
811 			rel->rd_rel->reltoastrelid != InvalidOid)
812 		{
813 			old_value = toast_values[i];
814 			toast_action[i] = 'p';
815 			toast_values[i] = toast_save_datum(rel, toast_values[i],
816 											   toast_oldexternal[i], options);
817 			if (toast_free[i])
818 				pfree(DatumGetPointer(old_value));
819 			toast_free[i] = true;
820 			need_change = true;
821 			need_free = true;
822 		}
823 	}
824 
825 	/*
826 	 * Second we look for attributes of attstorage 'x' or 'e' that are still
827 	 * inline.  But skip this if there's no toast table to push them to.
828 	 */
829 	while (heap_compute_data_size(tupleDesc,
830 								  toast_values, toast_isnull) > maxDataLen &&
831 		   rel->rd_rel->reltoastrelid != InvalidOid)
832 	{
833 		int			biggest_attno = -1;
834 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
835 		Datum		old_value;
836 
837 		/*------
838 		 * Search for the biggest yet inlined attribute with
839 		 * attstorage equals 'x' or 'e'
840 		 *------
841 		 */
842 		for (i = 0; i < numAttrs; i++)
843 		{
844 			if (toast_action[i] == 'p')
845 				continue;
846 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
847 				continue;		/* can't happen, toast_action would be 'p' */
848 			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
849 				continue;
850 			if (toast_sizes[i] > biggest_size)
851 			{
852 				biggest_attno = i;
853 				biggest_size = toast_sizes[i];
854 			}
855 		}
856 
857 		if (biggest_attno < 0)
858 			break;
859 
860 		/*
861 		 * Store this external
862 		 */
863 		i = biggest_attno;
864 		old_value = toast_values[i];
865 		toast_action[i] = 'p';
866 		toast_values[i] = toast_save_datum(rel, toast_values[i],
867 										   toast_oldexternal[i], options);
868 		if (toast_free[i])
869 			pfree(DatumGetPointer(old_value));
870 		toast_free[i] = true;
871 
872 		need_change = true;
873 		need_free = true;
874 	}
875 
876 	/*
877 	 * Round 3 - this time we take attributes with storage 'm' into
878 	 * compression
879 	 */
880 	while (heap_compute_data_size(tupleDesc,
881 								  toast_values, toast_isnull) > maxDataLen)
882 	{
883 		int			biggest_attno = -1;
884 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
885 		Datum		old_value;
886 		Datum		new_value;
887 
888 		/*
889 		 * Search for the biggest yet uncompressed internal attribute
890 		 */
891 		for (i = 0; i < numAttrs; i++)
892 		{
893 			if (toast_action[i] != ' ')
894 				continue;
895 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
896 				continue;		/* can't happen, toast_action would be 'p' */
897 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
898 				continue;
899 			if (att[i]->attstorage != 'm')
900 				continue;
901 			if (toast_sizes[i] > biggest_size)
902 			{
903 				biggest_attno = i;
904 				biggest_size = toast_sizes[i];
905 			}
906 		}
907 
908 		if (biggest_attno < 0)
909 			break;
910 
911 		/*
912 		 * Attempt to compress it inline
913 		 */
914 		i = biggest_attno;
915 		old_value = toast_values[i];
916 		new_value = toast_compress_datum(old_value);
917 
918 		if (DatumGetPointer(new_value) != NULL)
919 		{
920 			/* successful compression */
921 			if (toast_free[i])
922 				pfree(DatumGetPointer(old_value));
923 			toast_values[i] = new_value;
924 			toast_free[i] = true;
925 			toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
926 			need_change = true;
927 			need_free = true;
928 		}
929 		else
930 		{
931 			/* incompressible, ignore on subsequent compression passes */
932 			toast_action[i] = 'x';
933 		}
934 	}
935 
936 	/*
937 	 * Finally we store attributes of type 'm' externally.  At this point we
938 	 * increase the target tuple size, so that 'm' attributes aren't stored
939 	 * externally unless really necessary.
940 	 */
941 	maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
942 
943 	while (heap_compute_data_size(tupleDesc,
944 								  toast_values, toast_isnull) > maxDataLen &&
945 		   rel->rd_rel->reltoastrelid != InvalidOid)
946 	{
947 		int			biggest_attno = -1;
948 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
949 		Datum		old_value;
950 
951 		/*--------
952 		 * Search for the biggest yet inlined attribute with
953 		 * attstorage = 'm'
954 		 *--------
955 		 */
956 		for (i = 0; i < numAttrs; i++)
957 		{
958 			if (toast_action[i] == 'p')
959 				continue;
960 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
961 				continue;		/* can't happen, toast_action would be 'p' */
962 			if (att[i]->attstorage != 'm')
963 				continue;
964 			if (toast_sizes[i] > biggest_size)
965 			{
966 				biggest_attno = i;
967 				biggest_size = toast_sizes[i];
968 			}
969 		}
970 
971 		if (biggest_attno < 0)
972 			break;
973 
974 		/*
975 		 * Store this external
976 		 */
977 		i = biggest_attno;
978 		old_value = toast_values[i];
979 		toast_action[i] = 'p';
980 		toast_values[i] = toast_save_datum(rel, toast_values[i],
981 										   toast_oldexternal[i], options);
982 		if (toast_free[i])
983 			pfree(DatumGetPointer(old_value));
984 		toast_free[i] = true;
985 
986 		need_change = true;
987 		need_free = true;
988 	}
989 
990 	/*
991 	 * In the case we toasted any values, we need to build a new heap tuple
992 	 * with the changed values.
993 	 */
994 	if (need_change)
995 	{
996 		HeapTupleHeader olddata = newtup->t_data;
997 		HeapTupleHeader new_data;
998 		int32		new_header_len;
999 		int32		new_data_len;
1000 		int32		new_tuple_len;
1001 
1002 		/*
1003 		 * Calculate the new size of the tuple.
1004 		 *
1005 		 * Note: we used to assume here that the old tuple's t_hoff must equal
1006 		 * the new_header_len value, but that was incorrect.  The old tuple
1007 		 * might have a smaller-than-current natts, if there's been an ALTER
1008 		 * TABLE ADD COLUMN since it was stored; and that would lead to a
1009 		 * different conclusion about the size of the null bitmap, or even
1010 		 * whether there needs to be one at all.
1011 		 */
1012 		new_header_len = SizeofHeapTupleHeader;
1013 		if (has_nulls)
1014 			new_header_len += BITMAPLEN(numAttrs);
1015 		if (olddata->t_infomask & HEAP_HASOID)
1016 			new_header_len += sizeof(Oid);
1017 		new_header_len = MAXALIGN(new_header_len);
1018 		new_data_len = heap_compute_data_size(tupleDesc,
1019 											  toast_values, toast_isnull);
1020 		new_tuple_len = new_header_len + new_data_len;
1021 
1022 		/*
1023 		 * Allocate and zero the space needed, and fill HeapTupleData fields.
1024 		 */
1025 		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
1026 		result_tuple->t_len = new_tuple_len;
1027 		result_tuple->t_self = newtup->t_self;
1028 		result_tuple->t_tableOid = newtup->t_tableOid;
1029 		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
1030 		result_tuple->t_data = new_data;
1031 
1032 		/*
1033 		 * Copy the existing tuple header, but adjust natts and t_hoff.
1034 		 */
1035 		memcpy(new_data, olddata, SizeofHeapTupleHeader);
1036 		HeapTupleHeaderSetNatts(new_data, numAttrs);
1037 		new_data->t_hoff = new_header_len;
1038 		if (olddata->t_infomask & HEAP_HASOID)
1039 			HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1040 
1041 		/* Copy over the data, and fill the null bitmap if needed */
1042 		heap_fill_tuple(tupleDesc,
1043 						toast_values,
1044 						toast_isnull,
1045 						(char *) new_data + new_header_len,
1046 						new_data_len,
1047 						&(new_data->t_infomask),
1048 						has_nulls ? new_data->t_bits : NULL);
1049 	}
1050 	else
1051 		result_tuple = newtup;
1052 
1053 	/*
1054 	 * Free allocated temp values
1055 	 */
1056 	if (need_free)
1057 		for (i = 0; i < numAttrs; i++)
1058 			if (toast_free[i])
1059 				pfree(DatumGetPointer(toast_values[i]));
1060 
1061 	/*
1062 	 * Delete external values from the old tuple
1063 	 */
1064 	if (need_delold)
1065 		for (i = 0; i < numAttrs; i++)
1066 			if (toast_delold[i])
1067 				toast_delete_datum(rel, toast_oldvalues[i], false);
1068 
1069 	return result_tuple;
1070 }
1071 
1072 
1073 /* ----------
1074  * toast_flatten_tuple -
1075  *
1076  *	"Flatten" a tuple to contain no out-of-line toasted fields.
1077  *	(This does not eliminate compressed or short-header datums.)
1078  *
1079  *	Note: we expect the caller already checked HeapTupleHasExternal(tup),
1080  *	so there is no need for a short-circuit path.
1081  * ----------
1082  */
1083 HeapTuple
toast_flatten_tuple(HeapTuple tup,TupleDesc tupleDesc)1084 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1085 {
1086 	HeapTuple	new_tuple;
1087 	Form_pg_attribute *att = tupleDesc->attrs;
1088 	int			numAttrs = tupleDesc->natts;
1089 	int			i;
1090 	Datum		toast_values[MaxTupleAttributeNumber];
1091 	bool		toast_isnull[MaxTupleAttributeNumber];
1092 	bool		toast_free[MaxTupleAttributeNumber];
1093 
1094 	/*
1095 	 * Break down the tuple into fields.
1096 	 */
1097 	Assert(numAttrs <= MaxTupleAttributeNumber);
1098 	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1099 
1100 	memset(toast_free, 0, numAttrs * sizeof(bool));
1101 
1102 	for (i = 0; i < numAttrs; i++)
1103 	{
1104 		/*
1105 		 * Look at non-null varlena attributes
1106 		 */
1107 		if (!toast_isnull[i] && att[i]->attlen == -1)
1108 		{
1109 			struct varlena *new_value;
1110 
1111 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1112 			if (VARATT_IS_EXTERNAL(new_value))
1113 			{
1114 				new_value = heap_tuple_fetch_attr(new_value);
1115 				toast_values[i] = PointerGetDatum(new_value);
1116 				toast_free[i] = true;
1117 			}
1118 		}
1119 	}
1120 
1121 	/*
1122 	 * Form the reconfigured tuple.
1123 	 */
1124 	new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1125 
1126 	/*
1127 	 * Be sure to copy the tuple's OID and identity fields.  We also make a
1128 	 * point of copying visibility info, just in case anybody looks at those
1129 	 * fields in a syscache entry.
1130 	 */
1131 	if (tupleDesc->tdhasoid)
1132 		HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1133 
1134 	new_tuple->t_self = tup->t_self;
1135 	new_tuple->t_tableOid = tup->t_tableOid;
1136 
1137 	new_tuple->t_data->t_choice = tup->t_data->t_choice;
1138 	new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1139 	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1140 	new_tuple->t_data->t_infomask |=
1141 		tup->t_data->t_infomask & HEAP_XACT_MASK;
1142 	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1143 	new_tuple->t_data->t_infomask2 |=
1144 		tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1145 
1146 	/*
1147 	 * Free allocated temp values
1148 	 */
1149 	for (i = 0; i < numAttrs; i++)
1150 		if (toast_free[i])
1151 			pfree(DatumGetPointer(toast_values[i]));
1152 
1153 	return new_tuple;
1154 }
1155 
1156 
1157 /* ----------
1158  * toast_flatten_tuple_to_datum -
1159  *
1160  *	"Flatten" a tuple containing out-of-line toasted fields into a Datum.
1161  *	The result is always palloc'd in the current memory context.
1162  *
1163  *	We have a general rule that Datums of container types (rows, arrays,
1164  *	ranges, etc) must not contain any external TOAST pointers.  Without
1165  *	this rule, we'd have to look inside each Datum when preparing a tuple
1166  *	for storage, which would be expensive and would fail to extend cleanly
1167  *	to new sorts of container types.
1168  *
1169  *	However, we don't want to say that tuples represented as HeapTuples
1170  *	can't contain toasted fields, so instead this routine should be called
1171  *	when such a HeapTuple is being converted into a Datum.
1172  *
1173  *	While we're at it, we decompress any compressed fields too.  This is not
1174  *	necessary for correctness, but reflects an expectation that compression
1175  *	will be more effective if applied to the whole tuple not individual
1176  *	fields.  We are not so concerned about that that we want to deconstruct
1177  *	and reconstruct tuples just to get rid of compressed fields, however.
1178  *	So callers typically won't call this unless they see that the tuple has
1179  *	at least one external field.
1180  *
1181  *	On the other hand, in-line short-header varlena fields are left alone.
1182  *	If we "untoasted" them here, they'd just get changed back to short-header
1183  *	format anyway within heap_fill_tuple.
1184  * ----------
1185  */
1186 Datum
toast_flatten_tuple_to_datum(HeapTupleHeader tup,uint32 tup_len,TupleDesc tupleDesc)1187 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1188 							 uint32 tup_len,
1189 							 TupleDesc tupleDesc)
1190 {
1191 	HeapTupleHeader new_data;
1192 	int32		new_header_len;
1193 	int32		new_data_len;
1194 	int32		new_tuple_len;
1195 	HeapTupleData tmptup;
1196 	Form_pg_attribute *att = tupleDesc->attrs;
1197 	int			numAttrs = tupleDesc->natts;
1198 	int			i;
1199 	bool		has_nulls = false;
1200 	Datum		toast_values[MaxTupleAttributeNumber];
1201 	bool		toast_isnull[MaxTupleAttributeNumber];
1202 	bool		toast_free[MaxTupleAttributeNumber];
1203 
1204 	/* Build a temporary HeapTuple control structure */
1205 	tmptup.t_len = tup_len;
1206 	ItemPointerSetInvalid(&(tmptup.t_self));
1207 	tmptup.t_tableOid = InvalidOid;
1208 	tmptup.t_data = tup;
1209 
1210 	/*
1211 	 * Break down the tuple into fields.
1212 	 */
1213 	Assert(numAttrs <= MaxTupleAttributeNumber);
1214 	heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1215 
1216 	memset(toast_free, 0, numAttrs * sizeof(bool));
1217 
1218 	for (i = 0; i < numAttrs; i++)
1219 	{
1220 		/*
1221 		 * Look at non-null varlena attributes
1222 		 */
1223 		if (toast_isnull[i])
1224 			has_nulls = true;
1225 		else if (att[i]->attlen == -1)
1226 		{
1227 			struct varlena *new_value;
1228 
1229 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1230 			if (VARATT_IS_EXTERNAL(new_value) ||
1231 				VARATT_IS_COMPRESSED(new_value))
1232 			{
1233 				new_value = heap_tuple_untoast_attr(new_value);
1234 				toast_values[i] = PointerGetDatum(new_value);
1235 				toast_free[i] = true;
1236 			}
1237 		}
1238 	}
1239 
1240 	/*
1241 	 * Calculate the new size of the tuple.
1242 	 *
1243 	 * This should match the reconstruction code in toast_insert_or_update.
1244 	 */
1245 	new_header_len = SizeofHeapTupleHeader;
1246 	if (has_nulls)
1247 		new_header_len += BITMAPLEN(numAttrs);
1248 	if (tup->t_infomask & HEAP_HASOID)
1249 		new_header_len += sizeof(Oid);
1250 	new_header_len = MAXALIGN(new_header_len);
1251 	new_data_len = heap_compute_data_size(tupleDesc,
1252 										  toast_values, toast_isnull);
1253 	new_tuple_len = new_header_len + new_data_len;
1254 
1255 	new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1256 
1257 	/*
1258 	 * Copy the existing tuple header, but adjust natts and t_hoff.
1259 	 */
1260 	memcpy(new_data, tup, SizeofHeapTupleHeader);
1261 	HeapTupleHeaderSetNatts(new_data, numAttrs);
1262 	new_data->t_hoff = new_header_len;
1263 	if (tup->t_infomask & HEAP_HASOID)
1264 		HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1265 
1266 	/* Set the composite-Datum header fields correctly */
1267 	HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1268 	HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1269 	HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1270 
1271 	/* Copy over the data, and fill the null bitmap if needed */
1272 	heap_fill_tuple(tupleDesc,
1273 					toast_values,
1274 					toast_isnull,
1275 					(char *) new_data + new_header_len,
1276 					new_data_len,
1277 					&(new_data->t_infomask),
1278 					has_nulls ? new_data->t_bits : NULL);
1279 
1280 	/*
1281 	 * Free allocated temp values
1282 	 */
1283 	for (i = 0; i < numAttrs; i++)
1284 		if (toast_free[i])
1285 			pfree(DatumGetPointer(toast_values[i]));
1286 
1287 	return PointerGetDatum(new_data);
1288 }
1289 
1290 
1291 /* ----------
1292  * toast_compress_datum -
1293  *
1294  *	Create a compressed version of a varlena datum
1295  *
1296  *	If we fail (ie, compressed result is actually bigger than original)
1297  *	then return NULL.  We must not use compressed data if it'd expand
1298  *	the tuple!
1299  *
1300  *	We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1301  *	copying them.  But we can't handle external or compressed datums.
1302  * ----------
1303  */
1304 Datum
toast_compress_datum(Datum value)1305 toast_compress_datum(Datum value)
1306 {
1307 	struct varlena *tmp;
1308 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1309 	int32		len;
1310 
1311 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1312 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1313 
1314 	/*
1315 	 * No point in wasting a palloc cycle if value size is out of the allowed
1316 	 * range for compression
1317 	 */
1318 	if (valsize < PGLZ_strategy_default->min_input_size ||
1319 		valsize > PGLZ_strategy_default->max_input_size)
1320 		return PointerGetDatum(NULL);
1321 
1322 	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1323 									TOAST_COMPRESS_HDRSZ);
1324 
1325 	/*
1326 	 * We recheck the actual size even if pglz_compress() reports success,
1327 	 * because it might be satisfied with having saved as little as one byte
1328 	 * in the compressed data --- which could turn into a net loss once you
1329 	 * consider header and alignment padding.  Worst case, the compressed
1330 	 * format might require three padding bytes (plus header, which is
1331 	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
1332 	 * only one header byte and no padding if the value is short enough.  So
1333 	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
1334 	 */
1335 	len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1336 						valsize,
1337 						TOAST_COMPRESS_RAWDATA(tmp),
1338 						PGLZ_strategy_default);
1339 	if (len >= 0 &&
1340 		len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1341 	{
1342 		TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1343 		SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1344 		/* successful compression */
1345 		return PointerGetDatum(tmp);
1346 	}
1347 	else
1348 	{
1349 		/* incompressible data */
1350 		pfree(tmp);
1351 		return PointerGetDatum(NULL);
1352 	}
1353 }
1354 
1355 
1356 /* ----------
1357  * toast_get_valid_index
1358  *
1359  *	Get OID of valid index associated to given toast relation. A toast
1360  *	relation can have only one valid index at the same time.
1361  */
1362 Oid
toast_get_valid_index(Oid toastoid,LOCKMODE lock)1363 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1364 {
1365 	int			num_indexes;
1366 	int			validIndex;
1367 	Oid			validIndexOid;
1368 	Relation   *toastidxs;
1369 	Relation	toastrel;
1370 
1371 	/* Open the toast relation */
1372 	toastrel = heap_open(toastoid, lock);
1373 
1374 	/* Look for the valid index of the toast relation */
1375 	validIndex = toast_open_indexes(toastrel,
1376 									lock,
1377 									&toastidxs,
1378 									&num_indexes);
1379 	validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1380 
1381 	/* Close the toast relation and all its indexes */
1382 	toast_close_indexes(toastidxs, num_indexes, NoLock);
1383 	heap_close(toastrel, NoLock);
1384 
1385 	return validIndexOid;
1386 }
1387 
1388 
1389 /* ----------
1390  * toast_save_datum -
1391  *
1392  *	Save one single datum into the secondary relation and return
1393  *	a Datum reference for it.
1394  *
1395  * rel: the main relation we're working with (not the toast rel!)
1396  * value: datum to be pushed to toast storage
1397  * oldexternal: if not NULL, toast pointer previously representing the datum
1398  * options: options to be passed to heap_insert() for toast rows
1399  * ----------
1400  */
1401 static Datum
toast_save_datum(Relation rel,Datum value,struct varlena * oldexternal,int options)1402 toast_save_datum(Relation rel, Datum value,
1403 				 struct varlena * oldexternal, int options)
1404 {
1405 	Relation	toastrel;
1406 	Relation   *toastidxs;
1407 	HeapTuple	toasttup;
1408 	TupleDesc	toasttupDesc;
1409 	Datum		t_values[3];
1410 	bool		t_isnull[3];
1411 	CommandId	mycid = GetCurrentCommandId(true);
1412 	struct varlena *result;
1413 	struct varatt_external toast_pointer;
1414 	union
1415 	{
1416 		struct varlena hdr;
1417 		/* this is to make the union big enough for a chunk: */
1418 		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1419 		/* ensure union is aligned well enough: */
1420 		int32		align_it;
1421 	}			chunk_data;
1422 	int32		chunk_size;
1423 	int32		chunk_seq = 0;
1424 	char	   *data_p;
1425 	int32		data_todo;
1426 	Pointer		dval = DatumGetPointer(value);
1427 	int			num_indexes;
1428 	int			validIndex;
1429 
1430 	Assert(!VARATT_IS_EXTERNAL(value));
1431 
1432 	/*
1433 	 * Open the toast relation and its indexes.  We can use the index to check
1434 	 * uniqueness of the OID we assign to the toasted item, even though it has
1435 	 * additional columns besides OID.
1436 	 */
1437 	toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1438 	toasttupDesc = toastrel->rd_att;
1439 
1440 	/* Open all the toast indexes and look for the valid one */
1441 	validIndex = toast_open_indexes(toastrel,
1442 									RowExclusiveLock,
1443 									&toastidxs,
1444 									&num_indexes);
1445 
1446 	/*
1447 	 * Get the data pointer and length, and compute va_rawsize and va_extsize.
1448 	 *
1449 	 * va_rawsize is the size of the equivalent fully uncompressed datum, so
1450 	 * we have to adjust for short headers.
1451 	 *
1452 	 * va_extsize is the actual size of the data payload in the toast records.
1453 	 */
1454 	if (VARATT_IS_SHORT(dval))
1455 	{
1456 		data_p = VARDATA_SHORT(dval);
1457 		data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1458 		toast_pointer.va_rawsize = data_todo + VARHDRSZ;		/* as if not short */
1459 		toast_pointer.va_extsize = data_todo;
1460 	}
1461 	else if (VARATT_IS_COMPRESSED(dval))
1462 	{
1463 		data_p = VARDATA(dval);
1464 		data_todo = VARSIZE(dval) - VARHDRSZ;
1465 		/* rawsize in a compressed datum is just the size of the payload */
1466 		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1467 		toast_pointer.va_extsize = data_todo;
1468 		/* Assert that the numbers look like it's compressed */
1469 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1470 	}
1471 	else
1472 	{
1473 		data_p = VARDATA(dval);
1474 		data_todo = VARSIZE(dval) - VARHDRSZ;
1475 		toast_pointer.va_rawsize = VARSIZE(dval);
1476 		toast_pointer.va_extsize = data_todo;
1477 	}
1478 
1479 	/*
1480 	 * Insert the correct table OID into the result TOAST pointer.
1481 	 *
1482 	 * Normally this is the actual OID of the target toast table, but during
1483 	 * table-rewriting operations such as CLUSTER, we have to insert the OID
1484 	 * of the table's real permanent toast table instead.  rd_toastoid is set
1485 	 * if we have to substitute such an OID.
1486 	 */
1487 	if (OidIsValid(rel->rd_toastoid))
1488 		toast_pointer.va_toastrelid = rel->rd_toastoid;
1489 	else
1490 		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1491 
1492 	/*
1493 	 * Choose an OID to use as the value ID for this toast value.
1494 	 *
1495 	 * Normally we just choose an unused OID within the toast table.  But
1496 	 * during table-rewriting operations where we are preserving an existing
1497 	 * toast table OID, we want to preserve toast value OIDs too.  So, if
1498 	 * rd_toastoid is set and we had a prior external value from that same
1499 	 * toast table, re-use its value ID.  If we didn't have a prior external
1500 	 * value (which is a corner case, but possible if the table's attstorage
1501 	 * options have been changed), we have to pick a value ID that doesn't
1502 	 * conflict with either new or existing toast value OIDs.
1503 	 */
1504 	if (!OidIsValid(rel->rd_toastoid))
1505 	{
1506 		/* normal case: just choose an unused OID */
1507 		toast_pointer.va_valueid =
1508 			GetNewOidWithIndex(toastrel,
1509 							   RelationGetRelid(toastidxs[validIndex]),
1510 							   (AttrNumber) 1);
1511 	}
1512 	else
1513 	{
1514 		/* rewrite case: check to see if value was in old toast table */
1515 		toast_pointer.va_valueid = InvalidOid;
1516 		if (oldexternal != NULL)
1517 		{
1518 			struct varatt_external old_toast_pointer;
1519 
1520 			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1521 			/* Must copy to access aligned fields */
1522 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1523 			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1524 			{
1525 				/* This value came from the old toast table; reuse its OID */
1526 				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1527 
1528 				/*
1529 				 * There is a corner case here: the table rewrite might have
1530 				 * to copy both live and recently-dead versions of a row, and
1531 				 * those versions could easily reference the same toast value.
1532 				 * When we copy the second or later version of such a row,
1533 				 * reusing the OID will mean we select an OID that's already
1534 				 * in the new toast table.  Check for that, and if so, just
1535 				 * fall through without writing the data again.
1536 				 *
1537 				 * While annoying and ugly-looking, this is a good thing
1538 				 * because it ensures that we wind up with only one copy of
1539 				 * the toast value when there is only one copy in the old
1540 				 * toast table.  Before we detected this case, we'd have made
1541 				 * multiple copies, wasting space; and what's worse, the
1542 				 * copies belonging to already-deleted heap tuples would not
1543 				 * be reclaimed by VACUUM.
1544 				 */
1545 				if (toastrel_valueid_exists(toastrel,
1546 											toast_pointer.va_valueid))
1547 				{
1548 					/* Match, so short-circuit the data storage loop below */
1549 					data_todo = 0;
1550 				}
1551 			}
1552 		}
1553 		if (toast_pointer.va_valueid == InvalidOid)
1554 		{
1555 			/*
1556 			 * new value; must choose an OID that doesn't conflict in either
1557 			 * old or new toast table
1558 			 */
1559 			do
1560 			{
1561 				toast_pointer.va_valueid =
1562 					GetNewOidWithIndex(toastrel,
1563 									 RelationGetRelid(toastidxs[validIndex]),
1564 									   (AttrNumber) 1);
1565 			} while (toastid_valueid_exists(rel->rd_toastoid,
1566 											toast_pointer.va_valueid));
1567 		}
1568 	}
1569 
1570 	/*
1571 	 * Initialize constant parts of the tuple data
1572 	 */
1573 	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1574 	t_values[2] = PointerGetDatum(&chunk_data);
1575 	t_isnull[0] = false;
1576 	t_isnull[1] = false;
1577 	t_isnull[2] = false;
1578 
1579 	/*
1580 	 * Split up the item into chunks
1581 	 */
1582 	while (data_todo > 0)
1583 	{
1584 		int			i;
1585 
1586 		CHECK_FOR_INTERRUPTS();
1587 
1588 		/*
1589 		 * Calculate the size of this chunk
1590 		 */
1591 		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1592 
1593 		/*
1594 		 * Build a tuple and store it
1595 		 */
1596 		t_values[1] = Int32GetDatum(chunk_seq++);
1597 		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1598 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1599 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1600 
1601 		heap_insert(toastrel, toasttup, mycid, options, NULL);
1602 
1603 		/*
1604 		 * Create the index entry.  We cheat a little here by not using
1605 		 * FormIndexDatum: this relies on the knowledge that the index columns
1606 		 * are the same as the initial columns of the table for all the
1607 		 * indexes.
1608 		 *
1609 		 * Note also that there had better not be any user-created index on
1610 		 * the TOAST table, since we don't bother to update anything else.
1611 		 */
1612 		for (i = 0; i < num_indexes; i++)
1613 		{
1614 			/* Only index relations marked as ready can be updated */
1615 			if (IndexIsReady(toastidxs[i]->rd_index))
1616 				index_insert(toastidxs[i], t_values, t_isnull,
1617 							 &(toasttup->t_self),
1618 							 toastrel,
1619 							 toastidxs[i]->rd_index->indisunique ?
1620 							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1621 		}
1622 
1623 		/*
1624 		 * Free memory
1625 		 */
1626 		heap_freetuple(toasttup);
1627 
1628 		/*
1629 		 * Move on to next chunk
1630 		 */
1631 		data_todo -= chunk_size;
1632 		data_p += chunk_size;
1633 	}
1634 
1635 	/*
1636 	 * Done - close toast relation and its indexes
1637 	 */
1638 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1639 	heap_close(toastrel, RowExclusiveLock);
1640 
1641 	/*
1642 	 * Create the TOAST pointer value that we'll return
1643 	 */
1644 	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1645 	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1646 	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1647 
1648 	return PointerGetDatum(result);
1649 }
1650 
1651 
1652 /* ----------
1653  * toast_delete_datum -
1654  *
1655  *	Delete a single external stored value.
1656  * ----------
1657  */
1658 static void
toast_delete_datum(Relation rel,Datum value,bool is_speculative)1659 toast_delete_datum(Relation rel, Datum value, bool is_speculative)
1660 {
1661 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1662 	struct varatt_external toast_pointer;
1663 	Relation	toastrel;
1664 	Relation   *toastidxs;
1665 	ScanKeyData toastkey;
1666 	SysScanDesc toastscan;
1667 	HeapTuple	toasttup;
1668 	int			num_indexes;
1669 	int			validIndex;
1670 	SnapshotData SnapshotToast;
1671 
1672 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1673 		return;
1674 
1675 	/* Must copy to access aligned fields */
1676 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1677 
1678 	/*
1679 	 * Open the toast relation and its indexes
1680 	 */
1681 	toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1682 
1683 	/* Fetch valid relation used for process */
1684 	validIndex = toast_open_indexes(toastrel,
1685 									RowExclusiveLock,
1686 									&toastidxs,
1687 									&num_indexes);
1688 
1689 	/*
1690 	 * Setup a scan key to find chunks with matching va_valueid
1691 	 */
1692 	ScanKeyInit(&toastkey,
1693 				(AttrNumber) 1,
1694 				BTEqualStrategyNumber, F_OIDEQ,
1695 				ObjectIdGetDatum(toast_pointer.va_valueid));
1696 
1697 	/*
1698 	 * Find all the chunks.  (We don't actually care whether we see them in
1699 	 * sequence or not, but since we've already locked the index we might as
1700 	 * well use systable_beginscan_ordered.)
1701 	 */
1702 	init_toast_snapshot(&SnapshotToast);
1703 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1704 										   &SnapshotToast, 1, &toastkey);
1705 	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1706 	{
1707 		/*
1708 		 * Have a chunk, delete it
1709 		 */
1710 		if (is_speculative)
1711 			heap_abort_speculative(toastrel, toasttup);
1712 		else
1713 			simple_heap_delete(toastrel, &toasttup->t_self);
1714 	}
1715 
1716 	/*
1717 	 * End scan and close relations
1718 	 */
1719 	systable_endscan_ordered(toastscan);
1720 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1721 	heap_close(toastrel, RowExclusiveLock);
1722 }
1723 
1724 
1725 /* ----------
1726  * toastrel_valueid_exists -
1727  *
1728  *	Test whether a toast value with the given ID exists in the toast relation.
1729  *	For safety, we consider a value to exist if there are either live or dead
1730  *	toast rows with that ID; see notes for GetNewOid().
1731  * ----------
1732  */
1733 static bool
toastrel_valueid_exists(Relation toastrel,Oid valueid)1734 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1735 {
1736 	bool		result = false;
1737 	ScanKeyData toastkey;
1738 	SysScanDesc toastscan;
1739 	int			num_indexes;
1740 	int			validIndex;
1741 	Relation   *toastidxs;
1742 
1743 	/* Fetch a valid index relation */
1744 	validIndex = toast_open_indexes(toastrel,
1745 									RowExclusiveLock,
1746 									&toastidxs,
1747 									&num_indexes);
1748 
1749 	/*
1750 	 * Setup a scan key to find chunks with matching va_valueid
1751 	 */
1752 	ScanKeyInit(&toastkey,
1753 				(AttrNumber) 1,
1754 				BTEqualStrategyNumber, F_OIDEQ,
1755 				ObjectIdGetDatum(valueid));
1756 
1757 	/*
1758 	 * Is there any such chunk?
1759 	 */
1760 	toastscan = systable_beginscan(toastrel,
1761 								   RelationGetRelid(toastidxs[validIndex]),
1762 								   true, SnapshotAny, 1, &toastkey);
1763 
1764 	if (systable_getnext(toastscan) != NULL)
1765 		result = true;
1766 
1767 	systable_endscan(toastscan);
1768 
1769 	/* Clean up */
1770 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1771 
1772 	return result;
1773 }
1774 
1775 /* ----------
1776  * toastid_valueid_exists -
1777  *
1778  *	As above, but work from toast rel's OID not an open relation
1779  * ----------
1780  */
1781 static bool
toastid_valueid_exists(Oid toastrelid,Oid valueid)1782 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1783 {
1784 	bool		result;
1785 	Relation	toastrel;
1786 
1787 	toastrel = heap_open(toastrelid, AccessShareLock);
1788 
1789 	result = toastrel_valueid_exists(toastrel, valueid);
1790 
1791 	heap_close(toastrel, AccessShareLock);
1792 
1793 	return result;
1794 }
1795 
1796 
1797 /* ----------
1798  * toast_fetch_datum -
1799  *
1800  *	Reconstruct an in memory Datum from the chunks saved
1801  *	in the toast relation
1802  * ----------
1803  */
1804 static struct varlena *
toast_fetch_datum(struct varlena * attr)1805 toast_fetch_datum(struct varlena * attr)
1806 {
1807 	Relation	toastrel;
1808 	Relation   *toastidxs;
1809 	ScanKeyData toastkey;
1810 	SysScanDesc toastscan;
1811 	HeapTuple	ttup;
1812 	TupleDesc	toasttupDesc;
1813 	struct varlena *result;
1814 	struct varatt_external toast_pointer;
1815 	int32		ressize;
1816 	int32		residx,
1817 				nextidx;
1818 	int32		numchunks;
1819 	Pointer		chunk;
1820 	bool		isnull;
1821 	char	   *chunkdata;
1822 	int32		chunksize;
1823 	int			num_indexes;
1824 	int			validIndex;
1825 	SnapshotData SnapshotToast;
1826 
1827 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1828 		elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1829 
1830 	/* Must copy to access aligned fields */
1831 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1832 
1833 	ressize = toast_pointer.va_extsize;
1834 	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1835 
1836 	result = (struct varlena *) palloc(ressize + VARHDRSZ);
1837 
1838 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1839 		SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1840 	else
1841 		SET_VARSIZE(result, ressize + VARHDRSZ);
1842 
1843 	/*
1844 	 * Open the toast relation and its indexes
1845 	 */
1846 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1847 	toasttupDesc = toastrel->rd_att;
1848 
1849 	/* Look for the valid index of the toast relation */
1850 	validIndex = toast_open_indexes(toastrel,
1851 									AccessShareLock,
1852 									&toastidxs,
1853 									&num_indexes);
1854 
1855 	/*
1856 	 * Setup a scan key to fetch from the index by va_valueid
1857 	 */
1858 	ScanKeyInit(&toastkey,
1859 				(AttrNumber) 1,
1860 				BTEqualStrategyNumber, F_OIDEQ,
1861 				ObjectIdGetDatum(toast_pointer.va_valueid));
1862 
1863 	/*
1864 	 * Read the chunks by index
1865 	 *
1866 	 * Note that because the index is actually on (valueid, chunkidx) we will
1867 	 * see the chunks in chunkidx order, even though we didn't explicitly ask
1868 	 * for it.
1869 	 */
1870 	nextidx = 0;
1871 
1872 	init_toast_snapshot(&SnapshotToast);
1873 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1874 										   &SnapshotToast, 1, &toastkey);
1875 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1876 	{
1877 		/*
1878 		 * Have a chunk, extract the sequence number and the data
1879 		 */
1880 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1881 		Assert(!isnull);
1882 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1883 		Assert(!isnull);
1884 		if (!VARATT_IS_EXTENDED(chunk))
1885 		{
1886 			chunksize = VARSIZE(chunk) - VARHDRSZ;
1887 			chunkdata = VARDATA(chunk);
1888 		}
1889 		else if (VARATT_IS_SHORT(chunk))
1890 		{
1891 			/* could happen due to heap_form_tuple doing its thing */
1892 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1893 			chunkdata = VARDATA_SHORT(chunk);
1894 		}
1895 		else
1896 		{
1897 			/* should never happen */
1898 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1899 				 toast_pointer.va_valueid,
1900 				 RelationGetRelationName(toastrel));
1901 			chunksize = 0;		/* keep compiler quiet */
1902 			chunkdata = NULL;
1903 		}
1904 
1905 		/*
1906 		 * Some checks on the data we've found
1907 		 */
1908 		if (residx != nextidx)
1909 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1910 				 residx, nextidx,
1911 				 toast_pointer.va_valueid,
1912 				 RelationGetRelationName(toastrel));
1913 		if (residx < numchunks - 1)
1914 		{
1915 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
1916 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1917 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1918 					 residx, numchunks,
1919 					 toast_pointer.va_valueid,
1920 					 RelationGetRelationName(toastrel));
1921 		}
1922 		else if (residx == numchunks - 1)
1923 		{
1924 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1925 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1926 					 chunksize,
1927 					 (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1928 					 residx,
1929 					 toast_pointer.va_valueid,
1930 					 RelationGetRelationName(toastrel));
1931 		}
1932 		else
1933 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1934 				 residx,
1935 				 0, numchunks - 1,
1936 				 toast_pointer.va_valueid,
1937 				 RelationGetRelationName(toastrel));
1938 
1939 		/*
1940 		 * Copy the data into proper place in our result
1941 		 */
1942 		memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1943 			   chunkdata,
1944 			   chunksize);
1945 
1946 		nextidx++;
1947 	}
1948 
1949 	/*
1950 	 * Final checks that we successfully fetched the datum
1951 	 */
1952 	if (nextidx != numchunks)
1953 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
1954 			 nextidx,
1955 			 toast_pointer.va_valueid,
1956 			 RelationGetRelationName(toastrel));
1957 
1958 	/*
1959 	 * End scan and close relations
1960 	 */
1961 	systable_endscan_ordered(toastscan);
1962 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
1963 	heap_close(toastrel, AccessShareLock);
1964 
1965 	return result;
1966 }
1967 
1968 /* ----------
1969  * toast_fetch_datum_slice -
1970  *
1971  *	Reconstruct a segment of a Datum from the chunks saved
1972  *	in the toast relation
1973  * ----------
1974  */
1975 static struct varlena *
toast_fetch_datum_slice(struct varlena * attr,int32 sliceoffset,int32 length)1976 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1977 {
1978 	Relation	toastrel;
1979 	Relation   *toastidxs;
1980 	ScanKeyData toastkey[3];
1981 	int			nscankeys;
1982 	SysScanDesc toastscan;
1983 	HeapTuple	ttup;
1984 	TupleDesc	toasttupDesc;
1985 	struct varlena *result;
1986 	struct varatt_external toast_pointer;
1987 	int32		attrsize;
1988 	int32		residx;
1989 	int32		nextidx;
1990 	int			numchunks;
1991 	int			startchunk;
1992 	int			endchunk;
1993 	int32		startoffset;
1994 	int32		endoffset;
1995 	int			totalchunks;
1996 	Pointer		chunk;
1997 	bool		isnull;
1998 	char	   *chunkdata;
1999 	int32		chunksize;
2000 	int32		chcpystrt;
2001 	int32		chcpyend;
2002 	int			num_indexes;
2003 	int			validIndex;
2004 	SnapshotData SnapshotToast;
2005 
2006 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
2007 		elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
2008 
2009 	/* Must copy to access aligned fields */
2010 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
2011 
2012 	/*
2013 	 * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2014 	 * we can't return a compressed datum which is meaningful to toast later
2015 	 */
2016 	Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2017 
2018 	attrsize = toast_pointer.va_extsize;
2019 	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2020 
2021 	if (sliceoffset >= attrsize)
2022 	{
2023 		sliceoffset = 0;
2024 		length = 0;
2025 	}
2026 
2027 	if (((sliceoffset + length) > attrsize) || length < 0)
2028 		length = attrsize - sliceoffset;
2029 
2030 	result = (struct varlena *) palloc(length + VARHDRSZ);
2031 
2032 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2033 		SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
2034 	else
2035 		SET_VARSIZE(result, length + VARHDRSZ);
2036 
2037 	if (length == 0)
2038 		return result;			/* Can save a lot of work at this point! */
2039 
2040 	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2041 	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2042 	numchunks = (endchunk - startchunk) + 1;
2043 
2044 	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2045 	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2046 
2047 	/*
2048 	 * Open the toast relation and its indexes
2049 	 */
2050 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
2051 	toasttupDesc = toastrel->rd_att;
2052 
2053 	/* Look for the valid index of toast relation */
2054 	validIndex = toast_open_indexes(toastrel,
2055 									AccessShareLock,
2056 									&toastidxs,
2057 									&num_indexes);
2058 
2059 	/*
2060 	 * Setup a scan key to fetch from the index. This is either two keys or
2061 	 * three depending on the number of chunks.
2062 	 */
2063 	ScanKeyInit(&toastkey[0],
2064 				(AttrNumber) 1,
2065 				BTEqualStrategyNumber, F_OIDEQ,
2066 				ObjectIdGetDatum(toast_pointer.va_valueid));
2067 
2068 	/*
2069 	 * Use equality condition for one chunk, a range condition otherwise:
2070 	 */
2071 	if (numchunks == 1)
2072 	{
2073 		ScanKeyInit(&toastkey[1],
2074 					(AttrNumber) 2,
2075 					BTEqualStrategyNumber, F_INT4EQ,
2076 					Int32GetDatum(startchunk));
2077 		nscankeys = 2;
2078 	}
2079 	else
2080 	{
2081 		ScanKeyInit(&toastkey[1],
2082 					(AttrNumber) 2,
2083 					BTGreaterEqualStrategyNumber, F_INT4GE,
2084 					Int32GetDatum(startchunk));
2085 		ScanKeyInit(&toastkey[2],
2086 					(AttrNumber) 2,
2087 					BTLessEqualStrategyNumber, F_INT4LE,
2088 					Int32GetDatum(endchunk));
2089 		nscankeys = 3;
2090 	}
2091 
2092 	/*
2093 	 * Read the chunks by index
2094 	 *
2095 	 * The index is on (valueid, chunkidx) so they will come in order
2096 	 */
2097 	init_toast_snapshot(&SnapshotToast);
2098 	nextidx = startchunk;
2099 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2100 										&SnapshotToast, nscankeys, toastkey);
2101 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2102 	{
2103 		/*
2104 		 * Have a chunk, extract the sequence number and the data
2105 		 */
2106 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2107 		Assert(!isnull);
2108 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2109 		Assert(!isnull);
2110 		if (!VARATT_IS_EXTENDED(chunk))
2111 		{
2112 			chunksize = VARSIZE(chunk) - VARHDRSZ;
2113 			chunkdata = VARDATA(chunk);
2114 		}
2115 		else if (VARATT_IS_SHORT(chunk))
2116 		{
2117 			/* could happen due to heap_form_tuple doing its thing */
2118 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2119 			chunkdata = VARDATA_SHORT(chunk);
2120 		}
2121 		else
2122 		{
2123 			/* should never happen */
2124 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2125 				 toast_pointer.va_valueid,
2126 				 RelationGetRelationName(toastrel));
2127 			chunksize = 0;		/* keep compiler quiet */
2128 			chunkdata = NULL;
2129 		}
2130 
2131 		/*
2132 		 * Some checks on the data we've found
2133 		 */
2134 		if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2135 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2136 				 residx, nextidx,
2137 				 toast_pointer.va_valueid,
2138 				 RelationGetRelationName(toastrel));
2139 		if (residx < totalchunks - 1)
2140 		{
2141 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
2142 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2143 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2144 					 residx, totalchunks,
2145 					 toast_pointer.va_valueid,
2146 					 RelationGetRelationName(toastrel));
2147 		}
2148 		else if (residx == totalchunks - 1)
2149 		{
2150 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2151 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2152 					 chunksize,
2153 					 (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2154 					 residx,
2155 					 toast_pointer.va_valueid,
2156 					 RelationGetRelationName(toastrel));
2157 		}
2158 		else
2159 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2160 				 residx,
2161 				 0, totalchunks - 1,
2162 				 toast_pointer.va_valueid,
2163 				 RelationGetRelationName(toastrel));
2164 
2165 		/*
2166 		 * Copy the data into proper place in our result
2167 		 */
2168 		chcpystrt = 0;
2169 		chcpyend = chunksize - 1;
2170 		if (residx == startchunk)
2171 			chcpystrt = startoffset;
2172 		if (residx == endchunk)
2173 			chcpyend = endoffset;
2174 
2175 		memcpy(VARDATA(result) +
2176 			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2177 			   chunkdata + chcpystrt,
2178 			   (chcpyend - chcpystrt) + 1);
2179 
2180 		nextidx++;
2181 	}
2182 
2183 	/*
2184 	 * Final checks that we successfully fetched the datum
2185 	 */
2186 	if (nextidx != (endchunk + 1))
2187 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2188 			 nextidx,
2189 			 toast_pointer.va_valueid,
2190 			 RelationGetRelationName(toastrel));
2191 
2192 	/*
2193 	 * End scan and close relations
2194 	 */
2195 	systable_endscan_ordered(toastscan);
2196 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2197 	heap_close(toastrel, AccessShareLock);
2198 
2199 	return result;
2200 }
2201 
2202 /* ----------
2203  * toast_decompress_datum -
2204  *
2205  * Decompress a compressed version of a varlena datum
2206  */
2207 static struct varlena *
toast_decompress_datum(struct varlena * attr)2208 toast_decompress_datum(struct varlena * attr)
2209 {
2210 	struct varlena *result;
2211 
2212 	Assert(VARATT_IS_COMPRESSED(attr));
2213 
2214 	result = (struct varlena *)
2215 		palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2216 	SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2217 
2218 	if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2219 						VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2220 						VARDATA(result),
2221 						TOAST_COMPRESS_RAWSIZE(attr)) < 0)
2222 		elog(ERROR, "compressed data is corrupted");
2223 
2224 	return result;
2225 }
2226 
2227 
2228 /* ----------
2229  * toast_open_indexes
2230  *
2231  *	Get an array of the indexes associated to the given toast relation
2232  *	and return as well the position of the valid index used by the toast
2233  *	relation in this array. It is the responsibility of the caller of this
2234  *	function to close the indexes as well as free them.
2235  */
2236 static int
toast_open_indexes(Relation toastrel,LOCKMODE lock,Relation ** toastidxs,int * num_indexes)2237 toast_open_indexes(Relation toastrel,
2238 				   LOCKMODE lock,
2239 				   Relation **toastidxs,
2240 				   int *num_indexes)
2241 {
2242 	int			i = 0;
2243 	int			res = 0;
2244 	bool		found = false;
2245 	List	   *indexlist;
2246 	ListCell   *lc;
2247 
2248 	/* Get index list of the toast relation */
2249 	indexlist = RelationGetIndexList(toastrel);
2250 	Assert(indexlist != NIL);
2251 
2252 	*num_indexes = list_length(indexlist);
2253 
2254 	/* Open all the index relations */
2255 	*toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2256 	foreach(lc, indexlist)
2257 		(*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2258 
2259 	/* Fetch the first valid index in list */
2260 	for (i = 0; i < *num_indexes; i++)
2261 	{
2262 		Relation	toastidx = (*toastidxs)[i];
2263 
2264 		if (toastidx->rd_index->indisvalid)
2265 		{
2266 			res = i;
2267 			found = true;
2268 			break;
2269 		}
2270 	}
2271 
2272 	/*
2273 	 * Free index list, not necessary anymore as relations are opened and a
2274 	 * valid index has been found.
2275 	 */
2276 	list_free(indexlist);
2277 
2278 	/*
2279 	 * The toast relation should have one valid index, so something is going
2280 	 * wrong if there is nothing.
2281 	 */
2282 	if (!found)
2283 		elog(ERROR, "no valid index found for toast relation with Oid %u",
2284 			 RelationGetRelid(toastrel));
2285 
2286 	return res;
2287 }
2288 
2289 /* ----------
2290  * toast_close_indexes
2291  *
2292  *	Close an array of indexes for a toast relation and free it. This should
2293  *	be called for a set of indexes opened previously with toast_open_indexes.
2294  */
2295 static void
toast_close_indexes(Relation * toastidxs,int num_indexes,LOCKMODE lock)2296 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2297 {
2298 	int			i;
2299 
2300 	/* Close relations and clean up things */
2301 	for (i = 0; i < num_indexes; i++)
2302 		index_close(toastidxs[i], lock);
2303 	pfree(toastidxs);
2304 }
2305 
2306 /* ----------
2307  * init_toast_snapshot
2308  *
2309  *	Initialize an appropriate TOAST snapshot.  We must use an MVCC snapshot
2310  *	to initialize the TOAST snapshot; since we don't know which one to use,
2311  *	just use the oldest one.  This is safe: at worst, we will get a "snapshot
2312  *	too old" error that might have been avoided otherwise.
2313  */
2314 static void
init_toast_snapshot(Snapshot toast_snapshot)2315 init_toast_snapshot(Snapshot toast_snapshot)
2316 {
2317 	Snapshot	snapshot = GetOldestSnapshot();
2318 
2319 	if (snapshot == NULL)
2320 		elog(ERROR, "no known snapshots");
2321 
2322 	InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken);
2323 }
2324