/*-------------------------------------------------------------------------
 *
 * tuptoaster.c
 *	  Support routines for external and compressed storage of
 *	  variable size attributes.
 *
 * Copyright (c) 2000-2017, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/tuptoaster.c
 *
 *
 * INTERFACE ROUTINES
 *		toast_insert_or_update -
 *			Try to make a given tuple fit into one page by compressing
 *			or moving off attributes
 *
 *		toast_delete -
 *			Reclaim toast storage when a tuple is deleted
 *
 *		heap_tuple_untoast_attr -
 *			Fetch back a given value from the "secondary" relation
 *
 *-------------------------------------------------------------------------
 */
27 
28 #include "postgres.h"
29 
30 #include <unistd.h>
31 #include <fcntl.h>
32 
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/pg_lzcompress.h"
39 #include "miscadmin.h"
40 #include "utils/expandeddatum.h"
41 #include "utils/fmgroids.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/typcache.h"
45 #include "utils/tqual.h"
46 
47 
48 #undef TOAST_DEBUG
49 
50 /*
51  *	The information at the start of the compressed toast data.
52  */
53 typedef struct toast_compress_header
54 {
55 	int32		vl_len_;		/* varlena header (do not touch directly!) */
56 	int32		rawsize;
57 } toast_compress_header;
58 
/*
 * Utilities for manipulation of header information for compressed
 * toast entries.
 *
 * Each macro taking "ptr" expects a pointer to the start of the datum, ie,
 * to its toast_compress_header.
 */

/* size in bytes of the header, as an int32 */
#define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
/* read the rawsize field of the header */
#define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize)
/* address of the first byte following the header */
#define TOAST_COMPRESS_RAWDATA(ptr) \
	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
/* store "len" into the rawsize field of the header */
#define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
	(((toast_compress_header *) (ptr))->rawsize = (len))
69 
70 static void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
71 static Datum toast_save_datum(Relation rel, Datum value,
72 				 struct varlena *oldexternal, int options);
73 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
74 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
75 static struct varlena *toast_fetch_datum(struct varlena *attr);
76 static struct varlena *toast_fetch_datum_slice(struct varlena *attr,
77 						int32 sliceoffset, int32 length);
78 static struct varlena *toast_decompress_datum(struct varlena *attr);
79 static int toast_open_indexes(Relation toastrel,
80 				   LOCKMODE lock,
81 				   Relation **toastidxs,
82 				   int *num_indexes);
83 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
84 					LOCKMODE lock);
85 static void init_toast_snapshot(Snapshot toast_snapshot);
86 
87 
88 /* ----------
89  * heap_tuple_fetch_attr -
90  *
91  *	Public entry point to get back a toasted value from
92  *	external source (possibly still in compressed format).
93  *
94  * This will return a datum that contains all the data internally, ie, not
95  * relying on external storage or memory, but it can still be compressed or
96  * have a short header.  Note some callers assume that if the input is an
97  * EXTERNAL datum, the result will be a pfree'able chunk.
98  * ----------
99  */
100 struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)101 heap_tuple_fetch_attr(struct varlena *attr)
102 {
103 	struct varlena *result;
104 
105 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
106 	{
107 		/*
108 		 * This is an external stored plain value
109 		 */
110 		result = toast_fetch_datum(attr);
111 	}
112 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
113 	{
114 		/*
115 		 * This is an indirect pointer --- dereference it
116 		 */
117 		struct varatt_indirect redirect;
118 
119 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
120 		attr = (struct varlena *) redirect.pointer;
121 
122 		/* nested indirect Datums aren't allowed */
123 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
124 
125 		/* recurse if value is still external in some other way */
126 		if (VARATT_IS_EXTERNAL(attr))
127 			return heap_tuple_fetch_attr(attr);
128 
129 		/*
130 		 * Copy into the caller's memory context, in case caller tries to
131 		 * pfree the result.
132 		 */
133 		result = (struct varlena *) palloc(VARSIZE_ANY(attr));
134 		memcpy(result, attr, VARSIZE_ANY(attr));
135 	}
136 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
137 	{
138 		/*
139 		 * This is an expanded-object pointer --- get flat format
140 		 */
141 		ExpandedObjectHeader *eoh;
142 		Size		resultsize;
143 
144 		eoh = DatumGetEOHP(PointerGetDatum(attr));
145 		resultsize = EOH_get_flat_size(eoh);
146 		result = (struct varlena *) palloc(resultsize);
147 		EOH_flatten_into(eoh, (void *) result, resultsize);
148 	}
149 	else
150 	{
151 		/*
152 		 * This is a plain value inside of the main tuple - why am I called?
153 		 */
154 		result = attr;
155 	}
156 
157 	return result;
158 }
159 
160 
161 /* ----------
162  * heap_tuple_untoast_attr -
163  *
164  *	Public entry point to get back a toasted value from compression
165  *	or external storage.  The result is always non-extended varlena form.
166  *
167  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
168  * datum, the result will be a pfree'able chunk.
169  * ----------
170  */
171 struct varlena *
heap_tuple_untoast_attr(struct varlena * attr)172 heap_tuple_untoast_attr(struct varlena *attr)
173 {
174 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
175 	{
176 		/*
177 		 * This is an externally stored datum --- fetch it back from there
178 		 */
179 		attr = toast_fetch_datum(attr);
180 		/* If it's compressed, decompress it */
181 		if (VARATT_IS_COMPRESSED(attr))
182 		{
183 			struct varlena *tmp = attr;
184 
185 			attr = toast_decompress_datum(tmp);
186 			pfree(tmp);
187 		}
188 	}
189 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
190 	{
191 		/*
192 		 * This is an indirect pointer --- dereference it
193 		 */
194 		struct varatt_indirect redirect;
195 
196 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
197 		attr = (struct varlena *) redirect.pointer;
198 
199 		/* nested indirect Datums aren't allowed */
200 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
201 
202 		/* recurse in case value is still extended in some other way */
203 		attr = heap_tuple_untoast_attr(attr);
204 
205 		/* if it isn't, we'd better copy it */
206 		if (attr == (struct varlena *) redirect.pointer)
207 		{
208 			struct varlena *result;
209 
210 			result = (struct varlena *) palloc(VARSIZE_ANY(attr));
211 			memcpy(result, attr, VARSIZE_ANY(attr));
212 			attr = result;
213 		}
214 	}
215 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
216 	{
217 		/*
218 		 * This is an expanded-object pointer --- get flat format
219 		 */
220 		attr = heap_tuple_fetch_attr(attr);
221 		/* flatteners are not allowed to produce compressed/short output */
222 		Assert(!VARATT_IS_EXTENDED(attr));
223 	}
224 	else if (VARATT_IS_COMPRESSED(attr))
225 	{
226 		/*
227 		 * This is a compressed value inside of the main tuple
228 		 */
229 		attr = toast_decompress_datum(attr);
230 	}
231 	else if (VARATT_IS_SHORT(attr))
232 	{
233 		/*
234 		 * This is a short-header varlena --- convert to 4-byte header format
235 		 */
236 		Size		data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
237 		Size		new_size = data_size + VARHDRSZ;
238 		struct varlena *new_attr;
239 
240 		new_attr = (struct varlena *) palloc(new_size);
241 		SET_VARSIZE(new_attr, new_size);
242 		memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
243 		attr = new_attr;
244 	}
245 
246 	return attr;
247 }
248 
249 
250 /* ----------
251  * heap_tuple_untoast_attr_slice -
252  *
253  *		Public entry point to get back part of a toasted value
254  *		from compression or external storage.
255  * ----------
256  */
257 struct varlena *
heap_tuple_untoast_attr_slice(struct varlena * attr,int32 sliceoffset,int32 slicelength)258 heap_tuple_untoast_attr_slice(struct varlena *attr,
259 							  int32 sliceoffset, int32 slicelength)
260 {
261 	struct varlena *preslice;
262 	struct varlena *result;
263 	char	   *attrdata;
264 	int32		attrsize;
265 
266 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
267 	{
268 		struct varatt_external toast_pointer;
269 
270 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
271 
272 		/* fast path for non-compressed external datums */
273 		if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
274 			return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
275 
276 		/* fetch it back (compressed marker will get set automatically) */
277 		preslice = toast_fetch_datum(attr);
278 	}
279 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
280 	{
281 		struct varatt_indirect redirect;
282 
283 		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
284 
285 		/* nested indirect Datums aren't allowed */
286 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
287 
288 		return heap_tuple_untoast_attr_slice(redirect.pointer,
289 											 sliceoffset, slicelength);
290 	}
291 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
292 	{
293 		/* pass it off to heap_tuple_fetch_attr to flatten */
294 		preslice = heap_tuple_fetch_attr(attr);
295 	}
296 	else
297 		preslice = attr;
298 
299 	Assert(!VARATT_IS_EXTERNAL(preslice));
300 
301 	if (VARATT_IS_COMPRESSED(preslice))
302 	{
303 		struct varlena *tmp = preslice;
304 
305 		preslice = toast_decompress_datum(tmp);
306 
307 		if (tmp != attr)
308 			pfree(tmp);
309 	}
310 
311 	if (VARATT_IS_SHORT(preslice))
312 	{
313 		attrdata = VARDATA_SHORT(preslice);
314 		attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
315 	}
316 	else
317 	{
318 		attrdata = VARDATA(preslice);
319 		attrsize = VARSIZE(preslice) - VARHDRSZ;
320 	}
321 
322 	/* slicing of datum for compressed cases and plain value */
323 
324 	if (sliceoffset >= attrsize)
325 	{
326 		sliceoffset = 0;
327 		slicelength = 0;
328 	}
329 
330 	if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
331 		slicelength = attrsize - sliceoffset;
332 
333 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
334 	SET_VARSIZE(result, slicelength + VARHDRSZ);
335 
336 	memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
337 
338 	if (preslice != attr)
339 		pfree(preslice);
340 
341 	return result;
342 }
343 
344 
345 /* ----------
346  * toast_raw_datum_size -
347  *
348  *	Return the raw (detoasted) size of a varlena datum
349  *	(including the VARHDRSZ header)
350  * ----------
351  */
352 Size
toast_raw_datum_size(Datum value)353 toast_raw_datum_size(Datum value)
354 {
355 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
356 	Size		result;
357 
358 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
359 	{
360 		/* va_rawsize is the size of the original datum -- including header */
361 		struct varatt_external toast_pointer;
362 
363 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
364 		result = toast_pointer.va_rawsize;
365 	}
366 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
367 	{
368 		struct varatt_indirect toast_pointer;
369 
370 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
371 
372 		/* nested indirect Datums aren't allowed */
373 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
374 
375 		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
376 	}
377 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
378 	{
379 		result = EOH_get_flat_size(DatumGetEOHP(value));
380 	}
381 	else if (VARATT_IS_COMPRESSED(attr))
382 	{
383 		/* here, va_rawsize is just the payload size */
384 		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
385 	}
386 	else if (VARATT_IS_SHORT(attr))
387 	{
388 		/*
389 		 * we have to normalize the header length to VARHDRSZ or else the
390 		 * callers of this function will be confused.
391 		 */
392 		result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
393 	}
394 	else
395 	{
396 		/* plain untoasted datum */
397 		result = VARSIZE(attr);
398 	}
399 	return result;
400 }
401 
402 /* ----------
403  * toast_datum_size
404  *
405  *	Return the physical storage size (possibly compressed) of a varlena datum
406  * ----------
407  */
408 Size
toast_datum_size(Datum value)409 toast_datum_size(Datum value)
410 {
411 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
412 	Size		result;
413 
414 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
415 	{
416 		/*
417 		 * Attribute is stored externally - return the extsize whether
418 		 * compressed or not.  We do not count the size of the toast pointer
419 		 * ... should we?
420 		 */
421 		struct varatt_external toast_pointer;
422 
423 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
424 		result = toast_pointer.va_extsize;
425 	}
426 	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
427 	{
428 		struct varatt_indirect toast_pointer;
429 
430 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
431 
432 		/* nested indirect Datums aren't allowed */
433 		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
434 
435 		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
436 	}
437 	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
438 	{
439 		result = EOH_get_flat_size(DatumGetEOHP(value));
440 	}
441 	else if (VARATT_IS_SHORT(attr))
442 	{
443 		result = VARSIZE_SHORT(attr);
444 	}
445 	else
446 	{
447 		/*
448 		 * Attribute is stored inline either compressed or not, just calculate
449 		 * the size of the datum in either case.
450 		 */
451 		result = VARSIZE(attr);
452 	}
453 	return result;
454 }
455 
456 
457 /* ----------
458  * toast_delete -
459  *
460  *	Cascaded delete toast-entries on DELETE
461  * ----------
462  */
463 void
toast_delete(Relation rel,HeapTuple oldtup,bool is_speculative)464 toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
465 {
466 	TupleDesc	tupleDesc;
467 	Form_pg_attribute *att;
468 	int			numAttrs;
469 	int			i;
470 	Datum		toast_values[MaxHeapAttributeNumber];
471 	bool		toast_isnull[MaxHeapAttributeNumber];
472 
473 	/*
474 	 * We should only ever be called for tuples of plain relations or
475 	 * materialized views --- recursing on a toast rel is bad news.
476 	 */
477 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
478 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
479 
480 	/*
481 	 * Get the tuple descriptor and break down the tuple into fields.
482 	 *
483 	 * NOTE: it's debatable whether to use heap_deform_tuple() here or just
484 	 * heap_getattr() only the varlena columns.  The latter could win if there
485 	 * are few varlena columns and many non-varlena ones. However,
486 	 * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
487 	 * O(N^2) if there are many varlena columns, so it seems better to err on
488 	 * the side of linear cost.  (We won't even be here unless there's at
489 	 * least one varlena column, by the way.)
490 	 */
491 	tupleDesc = rel->rd_att;
492 	att = tupleDesc->attrs;
493 	numAttrs = tupleDesc->natts;
494 
495 	Assert(numAttrs <= MaxHeapAttributeNumber);
496 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
497 
498 	/*
499 	 * Check for external stored attributes and delete them from the secondary
500 	 * relation.
501 	 */
502 	for (i = 0; i < numAttrs; i++)
503 	{
504 		if (att[i]->attlen == -1)
505 		{
506 			Datum		value = toast_values[i];
507 
508 			if (toast_isnull[i])
509 				continue;
510 			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
511 				toast_delete_datum(rel, value, is_speculative);
512 		}
513 	}
514 }
515 
516 
517 /* ----------
518  * toast_insert_or_update -
519  *
520  *	Delete no-longer-used toast-entries and create new ones to
521  *	make the new tuple fit on INSERT or UPDATE
522  *
523  * Inputs:
524  *	newtup: the candidate new tuple to be inserted
525  *	oldtup: the old row version for UPDATE, or NULL for INSERT
526  *	options: options to be passed to heap_insert() for toast rows
527  * Result:
528  *	either newtup if no toasting is needed, or a palloc'd modified tuple
529  *	that is what should actually get stored
530  *
531  * NOTE: neither newtup nor oldtup will be modified.  This is a change
532  * from the pre-8.1 API of this routine.
533  * ----------
534  */
535 HeapTuple
toast_insert_or_update(Relation rel,HeapTuple newtup,HeapTuple oldtup,int options)536 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
537 					   int options)
538 {
539 	HeapTuple	result_tuple;
540 	TupleDesc	tupleDesc;
541 	Form_pg_attribute *att;
542 	int			numAttrs;
543 	int			i;
544 
545 	bool		need_change = false;
546 	bool		need_free = false;
547 	bool		need_delold = false;
548 	bool		has_nulls = false;
549 
550 	Size		maxDataLen;
551 	Size		hoff;
552 
553 	char		toast_action[MaxHeapAttributeNumber];
554 	bool		toast_isnull[MaxHeapAttributeNumber];
555 	bool		toast_oldisnull[MaxHeapAttributeNumber];
556 	Datum		toast_values[MaxHeapAttributeNumber];
557 	Datum		toast_oldvalues[MaxHeapAttributeNumber];
558 	struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
559 	int32		toast_sizes[MaxHeapAttributeNumber];
560 	bool		toast_free[MaxHeapAttributeNumber];
561 	bool		toast_delold[MaxHeapAttributeNumber];
562 
563 	/*
564 	 * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
565 	 * deletions just normally insert/delete the toast values. It seems
566 	 * easiest to deal with that here, instead on, potentially, multiple
567 	 * callers.
568 	 */
569 	options &= ~HEAP_INSERT_SPECULATIVE;
570 
571 	/*
572 	 * We should only ever be called for tuples of plain relations or
573 	 * materialized views --- recursing on a toast rel is bad news.
574 	 */
575 	Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
576 		   rel->rd_rel->relkind == RELKIND_MATVIEW);
577 
578 	/*
579 	 * Get the tuple descriptor and break down the tuple(s) into fields.
580 	 */
581 	tupleDesc = rel->rd_att;
582 	att = tupleDesc->attrs;
583 	numAttrs = tupleDesc->natts;
584 
585 	Assert(numAttrs <= MaxHeapAttributeNumber);
586 	heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
587 	if (oldtup != NULL)
588 		heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
589 
590 	/* ----------
591 	 * Then collect information about the values given
592 	 *
593 	 * NOTE: toast_action[i] can have these values:
594 	 *		' '		default handling
595 	 *		'p'		already processed --- don't touch it
596 	 *		'x'		incompressible, but OK to move off
597 	 *
598 	 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
599 	 *		toast_action[i] different from 'p'.
600 	 * ----------
601 	 */
602 	memset(toast_action, ' ', numAttrs * sizeof(char));
603 	memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
604 	memset(toast_free, 0, numAttrs * sizeof(bool));
605 	memset(toast_delold, 0, numAttrs * sizeof(bool));
606 
607 	for (i = 0; i < numAttrs; i++)
608 	{
609 		struct varlena *old_value;
610 		struct varlena *new_value;
611 
612 		if (oldtup != NULL)
613 		{
614 			/*
615 			 * For UPDATE get the old and new values of this attribute
616 			 */
617 			old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
618 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
619 
620 			/*
621 			 * If the old value is stored on disk, check if it has changed so
622 			 * we have to delete it later.
623 			 */
624 			if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
625 				VARATT_IS_EXTERNAL_ONDISK(old_value))
626 			{
627 				if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
628 					memcmp((char *) old_value, (char *) new_value,
629 						   VARSIZE_EXTERNAL(old_value)) != 0)
630 				{
631 					/*
632 					 * The old external stored value isn't needed any more
633 					 * after the update
634 					 */
635 					toast_delold[i] = true;
636 					need_delold = true;
637 				}
638 				else
639 				{
640 					/*
641 					 * This attribute isn't changed by this update so we reuse
642 					 * the original reference to the old value in the new
643 					 * tuple.
644 					 */
645 					toast_action[i] = 'p';
646 					continue;
647 				}
648 			}
649 		}
650 		else
651 		{
652 			/*
653 			 * For INSERT simply get the new value
654 			 */
655 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
656 		}
657 
658 		/*
659 		 * Handle NULL attributes
660 		 */
661 		if (toast_isnull[i])
662 		{
663 			toast_action[i] = 'p';
664 			has_nulls = true;
665 			continue;
666 		}
667 
668 		/*
669 		 * Now look at varlena attributes
670 		 */
671 		if (att[i]->attlen == -1)
672 		{
673 			/*
674 			 * If the table's attribute says PLAIN always, force it so.
675 			 */
676 			if (att[i]->attstorage == 'p')
677 				toast_action[i] = 'p';
678 
679 			/*
680 			 * We took care of UPDATE above, so any external value we find
681 			 * still in the tuple must be someone else's that we cannot reuse
682 			 * (this includes the case of an out-of-line in-memory datum).
683 			 * Fetch it back (without decompression, unless we are forcing
684 			 * PLAIN storage).  If necessary, we'll push it out as a new
685 			 * external value below.
686 			 */
687 			if (VARATT_IS_EXTERNAL(new_value))
688 			{
689 				toast_oldexternal[i] = new_value;
690 				if (att[i]->attstorage == 'p')
691 					new_value = heap_tuple_untoast_attr(new_value);
692 				else
693 					new_value = heap_tuple_fetch_attr(new_value);
694 				toast_values[i] = PointerGetDatum(new_value);
695 				toast_free[i] = true;
696 				need_change = true;
697 				need_free = true;
698 			}
699 
700 			/*
701 			 * Remember the size of this attribute
702 			 */
703 			toast_sizes[i] = VARSIZE_ANY(new_value);
704 		}
705 		else
706 		{
707 			/*
708 			 * Not a varlena attribute, plain storage always
709 			 */
710 			toast_action[i] = 'p';
711 		}
712 	}
713 
714 	/* ----------
715 	 * Compress and/or save external until data fits into target length
716 	 *
717 	 *	1: Inline compress attributes with attstorage 'x', and store very
718 	 *	   large attributes with attstorage 'x' or 'e' external immediately
719 	 *	2: Store attributes with attstorage 'x' or 'e' external
720 	 *	3: Inline compress attributes with attstorage 'm'
721 	 *	4: Store attributes with attstorage 'm' external
722 	 * ----------
723 	 */
724 
725 	/* compute header overhead --- this should match heap_form_tuple() */
726 	hoff = SizeofHeapTupleHeader;
727 	if (has_nulls)
728 		hoff += BITMAPLEN(numAttrs);
729 	if (newtup->t_data->t_infomask & HEAP_HASOID)
730 		hoff += sizeof(Oid);
731 	hoff = MAXALIGN(hoff);
732 	/* now convert to a limit on the tuple data size */
733 	maxDataLen = TOAST_TUPLE_TARGET - hoff;
734 
735 	/*
736 	 * Look for attributes with attstorage 'x' to compress.  Also find large
737 	 * attributes with attstorage 'x' or 'e', and store them external.
738 	 */
739 	while (heap_compute_data_size(tupleDesc,
740 								  toast_values, toast_isnull) > maxDataLen)
741 	{
742 		int			biggest_attno = -1;
743 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
744 		Datum		old_value;
745 		Datum		new_value;
746 
747 		/*
748 		 * Search for the biggest yet unprocessed internal attribute
749 		 */
750 		for (i = 0; i < numAttrs; i++)
751 		{
752 			if (toast_action[i] != ' ')
753 				continue;
754 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
755 				continue;		/* can't happen, toast_action would be 'p' */
756 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
757 				continue;
758 			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
759 				continue;
760 			if (toast_sizes[i] > biggest_size)
761 			{
762 				biggest_attno = i;
763 				biggest_size = toast_sizes[i];
764 			}
765 		}
766 
767 		if (biggest_attno < 0)
768 			break;
769 
770 		/*
771 		 * Attempt to compress it inline, if it has attstorage 'x'
772 		 */
773 		i = biggest_attno;
774 		if (att[i]->attstorage == 'x')
775 		{
776 			old_value = toast_values[i];
777 			new_value = toast_compress_datum(old_value);
778 
779 			if (DatumGetPointer(new_value) != NULL)
780 			{
781 				/* successful compression */
782 				if (toast_free[i])
783 					pfree(DatumGetPointer(old_value));
784 				toast_values[i] = new_value;
785 				toast_free[i] = true;
786 				toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
787 				need_change = true;
788 				need_free = true;
789 			}
790 			else
791 			{
792 				/* incompressible, ignore on subsequent compression passes */
793 				toast_action[i] = 'x';
794 			}
795 		}
796 		else
797 		{
798 			/* has attstorage 'e', ignore on subsequent compression passes */
799 			toast_action[i] = 'x';
800 		}
801 
802 		/*
803 		 * If this value is by itself more than maxDataLen (after compression
804 		 * if any), push it out to the toast table immediately, if possible.
805 		 * This avoids uselessly compressing other fields in the common case
806 		 * where we have one long field and several short ones.
807 		 *
808 		 * XXX maybe the threshold should be less than maxDataLen?
809 		 */
810 		if (toast_sizes[i] > maxDataLen &&
811 			rel->rd_rel->reltoastrelid != InvalidOid)
812 		{
813 			old_value = toast_values[i];
814 			toast_action[i] = 'p';
815 			toast_values[i] = toast_save_datum(rel, toast_values[i],
816 											   toast_oldexternal[i], options);
817 			if (toast_free[i])
818 				pfree(DatumGetPointer(old_value));
819 			toast_free[i] = true;
820 			need_change = true;
821 			need_free = true;
822 		}
823 	}
824 
825 	/*
826 	 * Second we look for attributes of attstorage 'x' or 'e' that are still
827 	 * inline.  But skip this if there's no toast table to push them to.
828 	 */
829 	while (heap_compute_data_size(tupleDesc,
830 								  toast_values, toast_isnull) > maxDataLen &&
831 		   rel->rd_rel->reltoastrelid != InvalidOid)
832 	{
833 		int			biggest_attno = -1;
834 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
835 		Datum		old_value;
836 
837 		/*------
838 		 * Search for the biggest yet inlined attribute with
839 		 * attstorage equals 'x' or 'e'
840 		 *------
841 		 */
842 		for (i = 0; i < numAttrs; i++)
843 		{
844 			if (toast_action[i] == 'p')
845 				continue;
846 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
847 				continue;		/* can't happen, toast_action would be 'p' */
848 			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
849 				continue;
850 			if (toast_sizes[i] > biggest_size)
851 			{
852 				biggest_attno = i;
853 				biggest_size = toast_sizes[i];
854 			}
855 		}
856 
857 		if (biggest_attno < 0)
858 			break;
859 
860 		/*
861 		 * Store this external
862 		 */
863 		i = biggest_attno;
864 		old_value = toast_values[i];
865 		toast_action[i] = 'p';
866 		toast_values[i] = toast_save_datum(rel, toast_values[i],
867 										   toast_oldexternal[i], options);
868 		if (toast_free[i])
869 			pfree(DatumGetPointer(old_value));
870 		toast_free[i] = true;
871 
872 		need_change = true;
873 		need_free = true;
874 	}
875 
876 	/*
877 	 * Round 3 - this time we take attributes with storage 'm' into
878 	 * compression
879 	 */
880 	while (heap_compute_data_size(tupleDesc,
881 								  toast_values, toast_isnull) > maxDataLen)
882 	{
883 		int			biggest_attno = -1;
884 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
885 		Datum		old_value;
886 		Datum		new_value;
887 
888 		/*
889 		 * Search for the biggest yet uncompressed internal attribute
890 		 */
891 		for (i = 0; i < numAttrs; i++)
892 		{
893 			if (toast_action[i] != ' ')
894 				continue;
895 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
896 				continue;		/* can't happen, toast_action would be 'p' */
897 			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
898 				continue;
899 			if (att[i]->attstorage != 'm')
900 				continue;
901 			if (toast_sizes[i] > biggest_size)
902 			{
903 				biggest_attno = i;
904 				biggest_size = toast_sizes[i];
905 			}
906 		}
907 
908 		if (biggest_attno < 0)
909 			break;
910 
911 		/*
912 		 * Attempt to compress it inline
913 		 */
914 		i = biggest_attno;
915 		old_value = toast_values[i];
916 		new_value = toast_compress_datum(old_value);
917 
918 		if (DatumGetPointer(new_value) != NULL)
919 		{
920 			/* successful compression */
921 			if (toast_free[i])
922 				pfree(DatumGetPointer(old_value));
923 			toast_values[i] = new_value;
924 			toast_free[i] = true;
925 			toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
926 			need_change = true;
927 			need_free = true;
928 		}
929 		else
930 		{
931 			/* incompressible, ignore on subsequent compression passes */
932 			toast_action[i] = 'x';
933 		}
934 	}
935 
936 	/*
937 	 * Finally we store attributes of type 'm' externally.  At this point we
938 	 * increase the target tuple size, so that 'm' attributes aren't stored
939 	 * externally unless really necessary.
940 	 */
941 	maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
942 
943 	while (heap_compute_data_size(tupleDesc,
944 								  toast_values, toast_isnull) > maxDataLen &&
945 		   rel->rd_rel->reltoastrelid != InvalidOid)
946 	{
947 		int			biggest_attno = -1;
948 		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
949 		Datum		old_value;
950 
951 		/*--------
952 		 * Search for the biggest yet inlined attribute with
953 		 * attstorage = 'm'
954 		 *--------
955 		 */
956 		for (i = 0; i < numAttrs; i++)
957 		{
958 			if (toast_action[i] == 'p')
959 				continue;
960 			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
961 				continue;		/* can't happen, toast_action would be 'p' */
962 			if (att[i]->attstorage != 'm')
963 				continue;
964 			if (toast_sizes[i] > biggest_size)
965 			{
966 				biggest_attno = i;
967 				biggest_size = toast_sizes[i];
968 			}
969 		}
970 
971 		if (biggest_attno < 0)
972 			break;
973 
974 		/*
975 		 * Store this external
976 		 */
977 		i = biggest_attno;
978 		old_value = toast_values[i];
979 		toast_action[i] = 'p';
980 		toast_values[i] = toast_save_datum(rel, toast_values[i],
981 										   toast_oldexternal[i], options);
982 		if (toast_free[i])
983 			pfree(DatumGetPointer(old_value));
984 		toast_free[i] = true;
985 
986 		need_change = true;
987 		need_free = true;
988 	}
989 
990 	/*
991 	 * In the case we toasted any values, we need to build a new heap tuple
992 	 * with the changed values.
993 	 */
994 	if (need_change)
995 	{
996 		HeapTupleHeader olddata = newtup->t_data;
997 		HeapTupleHeader new_data;
998 		int32		new_header_len;
999 		int32		new_data_len;
1000 		int32		new_tuple_len;
1001 
1002 		/*
1003 		 * Calculate the new size of the tuple.
1004 		 *
1005 		 * Note: we used to assume here that the old tuple's t_hoff must equal
1006 		 * the new_header_len value, but that was incorrect.  The old tuple
1007 		 * might have a smaller-than-current natts, if there's been an ALTER
1008 		 * TABLE ADD COLUMN since it was stored; and that would lead to a
1009 		 * different conclusion about the size of the null bitmap, or even
1010 		 * whether there needs to be one at all.
1011 		 */
1012 		new_header_len = SizeofHeapTupleHeader;
1013 		if (has_nulls)
1014 			new_header_len += BITMAPLEN(numAttrs);
1015 		if (olddata->t_infomask & HEAP_HASOID)
1016 			new_header_len += sizeof(Oid);
1017 		new_header_len = MAXALIGN(new_header_len);
1018 		new_data_len = heap_compute_data_size(tupleDesc,
1019 											  toast_values, toast_isnull);
1020 		new_tuple_len = new_header_len + new_data_len;
1021 
1022 		/*
1023 		 * Allocate and zero the space needed, and fill HeapTupleData fields.
1024 		 */
1025 		result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
1026 		result_tuple->t_len = new_tuple_len;
1027 		result_tuple->t_self = newtup->t_self;
1028 		result_tuple->t_tableOid = newtup->t_tableOid;
1029 		new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
1030 		result_tuple->t_data = new_data;
1031 
1032 		/*
1033 		 * Copy the existing tuple header, but adjust natts and t_hoff.
1034 		 */
1035 		memcpy(new_data, olddata, SizeofHeapTupleHeader);
1036 		HeapTupleHeaderSetNatts(new_data, numAttrs);
1037 		new_data->t_hoff = new_header_len;
1038 		if (olddata->t_infomask & HEAP_HASOID)
1039 			HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1040 
1041 		/* Copy over the data, and fill the null bitmap if needed */
1042 		heap_fill_tuple(tupleDesc,
1043 						toast_values,
1044 						toast_isnull,
1045 						(char *) new_data + new_header_len,
1046 						new_data_len,
1047 						&(new_data->t_infomask),
1048 						has_nulls ? new_data->t_bits : NULL);
1049 	}
1050 	else
1051 		result_tuple = newtup;
1052 
1053 	/*
1054 	 * Free allocated temp values
1055 	 */
1056 	if (need_free)
1057 		for (i = 0; i < numAttrs; i++)
1058 			if (toast_free[i])
1059 				pfree(DatumGetPointer(toast_values[i]));
1060 
1061 	/*
1062 	 * Delete external values from the old tuple
1063 	 */
1064 	if (need_delold)
1065 		for (i = 0; i < numAttrs; i++)
1066 			if (toast_delold[i])
1067 				toast_delete_datum(rel, toast_oldvalues[i], false);
1068 
1069 	return result_tuple;
1070 }
1071 
1072 
1073 /* ----------
1074  * toast_flatten_tuple -
1075  *
1076  *	"Flatten" a tuple to contain no out-of-line toasted fields.
1077  *	(This does not eliminate compressed or short-header datums.)
1078  *
1079  *	Note: we expect the caller already checked HeapTupleHasExternal(tup),
1080  *	so there is no need for a short-circuit path.
1081  * ----------
1082  */
1083 HeapTuple
toast_flatten_tuple(HeapTuple tup,TupleDesc tupleDesc)1084 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1085 {
1086 	HeapTuple	new_tuple;
1087 	Form_pg_attribute *att = tupleDesc->attrs;
1088 	int			numAttrs = tupleDesc->natts;
1089 	int			i;
1090 	Datum		toast_values[MaxTupleAttributeNumber];
1091 	bool		toast_isnull[MaxTupleAttributeNumber];
1092 	bool		toast_free[MaxTupleAttributeNumber];
1093 
1094 	/*
1095 	 * Break down the tuple into fields.
1096 	 */
1097 	Assert(numAttrs <= MaxTupleAttributeNumber);
1098 	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1099 
1100 	memset(toast_free, 0, numAttrs * sizeof(bool));
1101 
1102 	for (i = 0; i < numAttrs; i++)
1103 	{
1104 		/*
1105 		 * Look at non-null varlena attributes
1106 		 */
1107 		if (!toast_isnull[i] && att[i]->attlen == -1)
1108 		{
1109 			struct varlena *new_value;
1110 
1111 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1112 			if (VARATT_IS_EXTERNAL(new_value))
1113 			{
1114 				new_value = heap_tuple_fetch_attr(new_value);
1115 				toast_values[i] = PointerGetDatum(new_value);
1116 				toast_free[i] = true;
1117 			}
1118 		}
1119 	}
1120 
1121 	/*
1122 	 * Form the reconfigured tuple.
1123 	 */
1124 	new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1125 
1126 	/*
1127 	 * Be sure to copy the tuple's OID and identity fields.  We also make a
1128 	 * point of copying visibility info, just in case anybody looks at those
1129 	 * fields in a syscache entry.
1130 	 */
1131 	if (tupleDesc->tdhasoid)
1132 		HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1133 
1134 	new_tuple->t_self = tup->t_self;
1135 	new_tuple->t_tableOid = tup->t_tableOid;
1136 
1137 	new_tuple->t_data->t_choice = tup->t_data->t_choice;
1138 	new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1139 	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1140 	new_tuple->t_data->t_infomask |=
1141 		tup->t_data->t_infomask & HEAP_XACT_MASK;
1142 	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1143 	new_tuple->t_data->t_infomask2 |=
1144 		tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1145 
1146 	/*
1147 	 * Free allocated temp values
1148 	 */
1149 	for (i = 0; i < numAttrs; i++)
1150 		if (toast_free[i])
1151 			pfree(DatumGetPointer(toast_values[i]));
1152 
1153 	return new_tuple;
1154 }
1155 
1156 
1157 /* ----------
1158  * toast_flatten_tuple_to_datum -
1159  *
1160  *	"Flatten" a tuple containing out-of-line toasted fields into a Datum.
1161  *	The result is always palloc'd in the current memory context.
1162  *
1163  *	We have a general rule that Datums of container types (rows, arrays,
1164  *	ranges, etc) must not contain any external TOAST pointers.  Without
1165  *	this rule, we'd have to look inside each Datum when preparing a tuple
1166  *	for storage, which would be expensive and would fail to extend cleanly
1167  *	to new sorts of container types.
1168  *
1169  *	However, we don't want to say that tuples represented as HeapTuples
1170  *	can't contain toasted fields, so instead this routine should be called
1171  *	when such a HeapTuple is being converted into a Datum.
1172  *
1173  *	While we're at it, we decompress any compressed fields too.  This is not
1174  *	necessary for correctness, but reflects an expectation that compression
1175  *	will be more effective if applied to the whole tuple not individual
1176  *	fields.  We are not so concerned about that that we want to deconstruct
1177  *	and reconstruct tuples just to get rid of compressed fields, however.
1178  *	So callers typically won't call this unless they see that the tuple has
1179  *	at least one external field.
1180  *
1181  *	On the other hand, in-line short-header varlena fields are left alone.
1182  *	If we "untoasted" them here, they'd just get changed back to short-header
1183  *	format anyway within heap_fill_tuple.
1184  * ----------
1185  */
1186 Datum
toast_flatten_tuple_to_datum(HeapTupleHeader tup,uint32 tup_len,TupleDesc tupleDesc)1187 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1188 							 uint32 tup_len,
1189 							 TupleDesc tupleDesc)
1190 {
1191 	HeapTupleHeader new_data;
1192 	int32		new_header_len;
1193 	int32		new_data_len;
1194 	int32		new_tuple_len;
1195 	HeapTupleData tmptup;
1196 	Form_pg_attribute *att = tupleDesc->attrs;
1197 	int			numAttrs = tupleDesc->natts;
1198 	int			i;
1199 	bool		has_nulls = false;
1200 	Datum		toast_values[MaxTupleAttributeNumber];
1201 	bool		toast_isnull[MaxTupleAttributeNumber];
1202 	bool		toast_free[MaxTupleAttributeNumber];
1203 
1204 	/* Build a temporary HeapTuple control structure */
1205 	tmptup.t_len = tup_len;
1206 	ItemPointerSetInvalid(&(tmptup.t_self));
1207 	tmptup.t_tableOid = InvalidOid;
1208 	tmptup.t_data = tup;
1209 
1210 	/*
1211 	 * Break down the tuple into fields.
1212 	 */
1213 	Assert(numAttrs <= MaxTupleAttributeNumber);
1214 	heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1215 
1216 	memset(toast_free, 0, numAttrs * sizeof(bool));
1217 
1218 	for (i = 0; i < numAttrs; i++)
1219 	{
1220 		/*
1221 		 * Look at non-null varlena attributes
1222 		 */
1223 		if (toast_isnull[i])
1224 			has_nulls = true;
1225 		else if (att[i]->attlen == -1)
1226 		{
1227 			struct varlena *new_value;
1228 
1229 			new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1230 			if (VARATT_IS_EXTERNAL(new_value) ||
1231 				VARATT_IS_COMPRESSED(new_value))
1232 			{
1233 				new_value = heap_tuple_untoast_attr(new_value);
1234 				toast_values[i] = PointerGetDatum(new_value);
1235 				toast_free[i] = true;
1236 			}
1237 		}
1238 	}
1239 
1240 	/*
1241 	 * Calculate the new size of the tuple.
1242 	 *
1243 	 * This should match the reconstruction code in toast_insert_or_update.
1244 	 */
1245 	new_header_len = SizeofHeapTupleHeader;
1246 	if (has_nulls)
1247 		new_header_len += BITMAPLEN(numAttrs);
1248 	if (tup->t_infomask & HEAP_HASOID)
1249 		new_header_len += sizeof(Oid);
1250 	new_header_len = MAXALIGN(new_header_len);
1251 	new_data_len = heap_compute_data_size(tupleDesc,
1252 										  toast_values, toast_isnull);
1253 	new_tuple_len = new_header_len + new_data_len;
1254 
1255 	new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1256 
1257 	/*
1258 	 * Copy the existing tuple header, but adjust natts and t_hoff.
1259 	 */
1260 	memcpy(new_data, tup, SizeofHeapTupleHeader);
1261 	HeapTupleHeaderSetNatts(new_data, numAttrs);
1262 	new_data->t_hoff = new_header_len;
1263 	if (tup->t_infomask & HEAP_HASOID)
1264 		HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1265 
1266 	/* Set the composite-Datum header fields correctly */
1267 	HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1268 	HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1269 	HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1270 
1271 	/* Copy over the data, and fill the null bitmap if needed */
1272 	heap_fill_tuple(tupleDesc,
1273 					toast_values,
1274 					toast_isnull,
1275 					(char *) new_data + new_header_len,
1276 					new_data_len,
1277 					&(new_data->t_infomask),
1278 					has_nulls ? new_data->t_bits : NULL);
1279 
1280 	/*
1281 	 * Free allocated temp values
1282 	 */
1283 	for (i = 0; i < numAttrs; i++)
1284 		if (toast_free[i])
1285 			pfree(DatumGetPointer(toast_values[i]));
1286 
1287 	return PointerGetDatum(new_data);
1288 }
1289 
1290 
1291 /* ----------
1292  * toast_build_flattened_tuple -
1293  *
1294  *	Build a tuple containing no out-of-line toasted fields.
1295  *	(This does not eliminate compressed or short-header datums.)
1296  *
1297  *	This is essentially just like heap_form_tuple, except that it will
1298  *	expand any external-data pointers beforehand.
1299  *
1300  *	It's not very clear whether it would be preferable to decompress
1301  *	in-line compressed datums while at it.  For now, we don't.
1302  * ----------
1303  */
1304 HeapTuple
toast_build_flattened_tuple(TupleDesc tupleDesc,Datum * values,bool * isnull)1305 toast_build_flattened_tuple(TupleDesc tupleDesc,
1306 							Datum *values,
1307 							bool *isnull)
1308 {
1309 	HeapTuple	new_tuple;
1310 	Form_pg_attribute *att = tupleDesc->attrs;
1311 	int			numAttrs = tupleDesc->natts;
1312 	int			num_to_free;
1313 	int			i;
1314 	Datum		new_values[MaxTupleAttributeNumber];
1315 	Pointer		freeable_values[MaxTupleAttributeNumber];
1316 
1317 	/*
1318 	 * We can pass the caller's isnull array directly to heap_form_tuple, but
1319 	 * we potentially need to modify the values array.
1320 	 */
1321 	Assert(numAttrs <= MaxTupleAttributeNumber);
1322 	memcpy(new_values, values, numAttrs * sizeof(Datum));
1323 
1324 	num_to_free = 0;
1325 	for (i = 0; i < numAttrs; i++)
1326 	{
1327 		/*
1328 		 * Look at non-null varlena attributes
1329 		 */
1330 		if (!isnull[i] && att[i]->attlen == -1)
1331 		{
1332 			struct varlena *new_value;
1333 
1334 			new_value = (struct varlena *) DatumGetPointer(new_values[i]);
1335 			if (VARATT_IS_EXTERNAL(new_value))
1336 			{
1337 				new_value = heap_tuple_fetch_attr(new_value);
1338 				new_values[i] = PointerGetDatum(new_value);
1339 				freeable_values[num_to_free++] = (Pointer) new_value;
1340 			}
1341 		}
1342 	}
1343 
1344 	/*
1345 	 * Form the reconfigured tuple.
1346 	 */
1347 	new_tuple = heap_form_tuple(tupleDesc, new_values, isnull);
1348 
1349 	/*
1350 	 * Free allocated temp values
1351 	 */
1352 	for (i = 0; i < num_to_free; i++)
1353 		pfree(freeable_values[i]);
1354 
1355 	return new_tuple;
1356 }
1357 
1358 
1359 /* ----------
1360  * toast_compress_datum -
1361  *
1362  *	Create a compressed version of a varlena datum
1363  *
1364  *	If we fail (ie, compressed result is actually bigger than original)
1365  *	then return NULL.  We must not use compressed data if it'd expand
1366  *	the tuple!
1367  *
1368  *	We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1369  *	copying them.  But we can't handle external or compressed datums.
1370  * ----------
1371  */
1372 Datum
toast_compress_datum(Datum value)1373 toast_compress_datum(Datum value)
1374 {
1375 	struct varlena *tmp;
1376 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1377 	int32		len;
1378 
1379 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1380 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1381 
1382 	/*
1383 	 * No point in wasting a palloc cycle if value size is out of the allowed
1384 	 * range for compression
1385 	 */
1386 	if (valsize < PGLZ_strategy_default->min_input_size ||
1387 		valsize > PGLZ_strategy_default->max_input_size)
1388 		return PointerGetDatum(NULL);
1389 
1390 	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1391 									TOAST_COMPRESS_HDRSZ);
1392 
1393 	/*
1394 	 * We recheck the actual size even if pglz_compress() reports success,
1395 	 * because it might be satisfied with having saved as little as one byte
1396 	 * in the compressed data --- which could turn into a net loss once you
1397 	 * consider header and alignment padding.  Worst case, the compressed
1398 	 * format might require three padding bytes (plus header, which is
1399 	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
1400 	 * only one header byte and no padding if the value is short enough.  So
1401 	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
1402 	 */
1403 	len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1404 						valsize,
1405 						TOAST_COMPRESS_RAWDATA(tmp),
1406 						PGLZ_strategy_default);
1407 	if (len >= 0 &&
1408 		len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1409 	{
1410 		TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1411 		SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1412 		/* successful compression */
1413 		return PointerGetDatum(tmp);
1414 	}
1415 	else
1416 	{
1417 		/* incompressible data */
1418 		pfree(tmp);
1419 		return PointerGetDatum(NULL);
1420 	}
1421 }
1422 
1423 
1424 /* ----------
1425  * toast_get_valid_index
1426  *
1427  *	Get OID of valid index associated to given toast relation. A toast
1428  *	relation can have only one valid index at the same time.
1429  */
1430 Oid
toast_get_valid_index(Oid toastoid,LOCKMODE lock)1431 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1432 {
1433 	int			num_indexes;
1434 	int			validIndex;
1435 	Oid			validIndexOid;
1436 	Relation   *toastidxs;
1437 	Relation	toastrel;
1438 
1439 	/* Open the toast relation */
1440 	toastrel = heap_open(toastoid, lock);
1441 
1442 	/* Look for the valid index of the toast relation */
1443 	validIndex = toast_open_indexes(toastrel,
1444 									lock,
1445 									&toastidxs,
1446 									&num_indexes);
1447 	validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1448 
1449 	/* Close the toast relation and all its indexes */
1450 	toast_close_indexes(toastidxs, num_indexes, NoLock);
1451 	heap_close(toastrel, NoLock);
1452 
1453 	return validIndexOid;
1454 }
1455 
1456 
1457 /* ----------
1458  * toast_save_datum -
1459  *
1460  *	Save one single datum into the secondary relation and return
1461  *	a Datum reference for it.
1462  *
1463  * rel: the main relation we're working with (not the toast rel!)
1464  * value: datum to be pushed to toast storage
1465  * oldexternal: if not NULL, toast pointer previously representing the datum
1466  * options: options to be passed to heap_insert() for toast rows
1467  * ----------
1468  */
1469 static Datum
toast_save_datum(Relation rel,Datum value,struct varlena * oldexternal,int options)1470 toast_save_datum(Relation rel, Datum value,
1471 				 struct varlena *oldexternal, int options)
1472 {
1473 	Relation	toastrel;
1474 	Relation   *toastidxs;
1475 	HeapTuple	toasttup;
1476 	TupleDesc	toasttupDesc;
1477 	Datum		t_values[3];
1478 	bool		t_isnull[3];
1479 	CommandId	mycid = GetCurrentCommandId(true);
1480 	struct varlena *result;
1481 	struct varatt_external toast_pointer;
1482 	union
1483 	{
1484 		struct varlena hdr;
1485 		/* this is to make the union big enough for a chunk: */
1486 		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1487 		/* ensure union is aligned well enough: */
1488 		int32		align_it;
1489 	}			chunk_data;
1490 	int32		chunk_size;
1491 	int32		chunk_seq = 0;
1492 	char	   *data_p;
1493 	int32		data_todo;
1494 	Pointer		dval = DatumGetPointer(value);
1495 	int			num_indexes;
1496 	int			validIndex;
1497 
1498 	Assert(!VARATT_IS_EXTERNAL(value));
1499 
1500 	/*
1501 	 * Open the toast relation and its indexes.  We can use the index to check
1502 	 * uniqueness of the OID we assign to the toasted item, even though it has
1503 	 * additional columns besides OID.
1504 	 */
1505 	toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1506 	toasttupDesc = toastrel->rd_att;
1507 
1508 	/* Open all the toast indexes and look for the valid one */
1509 	validIndex = toast_open_indexes(toastrel,
1510 									RowExclusiveLock,
1511 									&toastidxs,
1512 									&num_indexes);
1513 
1514 	/*
1515 	 * Get the data pointer and length, and compute va_rawsize and va_extsize.
1516 	 *
1517 	 * va_rawsize is the size of the equivalent fully uncompressed datum, so
1518 	 * we have to adjust for short headers.
1519 	 *
1520 	 * va_extsize is the actual size of the data payload in the toast records.
1521 	 */
1522 	if (VARATT_IS_SHORT(dval))
1523 	{
1524 		data_p = VARDATA_SHORT(dval);
1525 		data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1526 		toast_pointer.va_rawsize = data_todo + VARHDRSZ;	/* as if not short */
1527 		toast_pointer.va_extsize = data_todo;
1528 	}
1529 	else if (VARATT_IS_COMPRESSED(dval))
1530 	{
1531 		data_p = VARDATA(dval);
1532 		data_todo = VARSIZE(dval) - VARHDRSZ;
1533 		/* rawsize in a compressed datum is just the size of the payload */
1534 		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1535 		toast_pointer.va_extsize = data_todo;
1536 		/* Assert that the numbers look like it's compressed */
1537 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1538 	}
1539 	else
1540 	{
1541 		data_p = VARDATA(dval);
1542 		data_todo = VARSIZE(dval) - VARHDRSZ;
1543 		toast_pointer.va_rawsize = VARSIZE(dval);
1544 		toast_pointer.va_extsize = data_todo;
1545 	}
1546 
1547 	/*
1548 	 * Insert the correct table OID into the result TOAST pointer.
1549 	 *
1550 	 * Normally this is the actual OID of the target toast table, but during
1551 	 * table-rewriting operations such as CLUSTER, we have to insert the OID
1552 	 * of the table's real permanent toast table instead.  rd_toastoid is set
1553 	 * if we have to substitute such an OID.
1554 	 */
1555 	if (OidIsValid(rel->rd_toastoid))
1556 		toast_pointer.va_toastrelid = rel->rd_toastoid;
1557 	else
1558 		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1559 
1560 	/*
1561 	 * Choose an OID to use as the value ID for this toast value.
1562 	 *
1563 	 * Normally we just choose an unused OID within the toast table.  But
1564 	 * during table-rewriting operations where we are preserving an existing
1565 	 * toast table OID, we want to preserve toast value OIDs too.  So, if
1566 	 * rd_toastoid is set and we had a prior external value from that same
1567 	 * toast table, re-use its value ID.  If we didn't have a prior external
1568 	 * value (which is a corner case, but possible if the table's attstorage
1569 	 * options have been changed), we have to pick a value ID that doesn't
1570 	 * conflict with either new or existing toast value OIDs.
1571 	 */
1572 	if (!OidIsValid(rel->rd_toastoid))
1573 	{
1574 		/* normal case: just choose an unused OID */
1575 		toast_pointer.va_valueid =
1576 			GetNewOidWithIndex(toastrel,
1577 							   RelationGetRelid(toastidxs[validIndex]),
1578 							   (AttrNumber) 1);
1579 	}
1580 	else
1581 	{
1582 		/* rewrite case: check to see if value was in old toast table */
1583 		toast_pointer.va_valueid = InvalidOid;
1584 		if (oldexternal != NULL)
1585 		{
1586 			struct varatt_external old_toast_pointer;
1587 
1588 			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1589 			/* Must copy to access aligned fields */
1590 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1591 			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1592 			{
1593 				/* This value came from the old toast table; reuse its OID */
1594 				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1595 
1596 				/*
1597 				 * There is a corner case here: the table rewrite might have
1598 				 * to copy both live and recently-dead versions of a row, and
1599 				 * those versions could easily reference the same toast value.
1600 				 * When we copy the second or later version of such a row,
1601 				 * reusing the OID will mean we select an OID that's already
1602 				 * in the new toast table.  Check for that, and if so, just
1603 				 * fall through without writing the data again.
1604 				 *
1605 				 * While annoying and ugly-looking, this is a good thing
1606 				 * because it ensures that we wind up with only one copy of
1607 				 * the toast value when there is only one copy in the old
1608 				 * toast table.  Before we detected this case, we'd have made
1609 				 * multiple copies, wasting space; and what's worse, the
1610 				 * copies belonging to already-deleted heap tuples would not
1611 				 * be reclaimed by VACUUM.
1612 				 */
1613 				if (toastrel_valueid_exists(toastrel,
1614 											toast_pointer.va_valueid))
1615 				{
1616 					/* Match, so short-circuit the data storage loop below */
1617 					data_todo = 0;
1618 				}
1619 			}
1620 		}
1621 		if (toast_pointer.va_valueid == InvalidOid)
1622 		{
1623 			/*
1624 			 * new value; must choose an OID that doesn't conflict in either
1625 			 * old or new toast table
1626 			 */
1627 			do
1628 			{
1629 				toast_pointer.va_valueid =
1630 					GetNewOidWithIndex(toastrel,
1631 									   RelationGetRelid(toastidxs[validIndex]),
1632 									   (AttrNumber) 1);
1633 			} while (toastid_valueid_exists(rel->rd_toastoid,
1634 											toast_pointer.va_valueid));
1635 		}
1636 	}
1637 
1638 	/*
1639 	 * Initialize constant parts of the tuple data
1640 	 */
1641 	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1642 	t_values[2] = PointerGetDatum(&chunk_data);
1643 	t_isnull[0] = false;
1644 	t_isnull[1] = false;
1645 	t_isnull[2] = false;
1646 
1647 	/*
1648 	 * Split up the item into chunks
1649 	 */
1650 	while (data_todo > 0)
1651 	{
1652 		int			i;
1653 
1654 		CHECK_FOR_INTERRUPTS();
1655 
1656 		/*
1657 		 * Calculate the size of this chunk
1658 		 */
1659 		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1660 
1661 		/*
1662 		 * Build a tuple and store it
1663 		 */
1664 		t_values[1] = Int32GetDatum(chunk_seq++);
1665 		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1666 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1667 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1668 
1669 		heap_insert(toastrel, toasttup, mycid, options, NULL);
1670 
1671 		/*
1672 		 * Create the index entry.  We cheat a little here by not using
1673 		 * FormIndexDatum: this relies on the knowledge that the index columns
1674 		 * are the same as the initial columns of the table for all the
1675 		 * indexes.  We also cheat by not providing an IndexInfo: this is okay
1676 		 * for now because btree doesn't need one, but we might have to be
1677 		 * more honest someday.
1678 		 *
1679 		 * Note also that there had better not be any user-created index on
1680 		 * the TOAST table, since we don't bother to update anything else.
1681 		 */
1682 		for (i = 0; i < num_indexes; i++)
1683 		{
1684 			/* Only index relations marked as ready can be updated */
1685 			if (IndexIsReady(toastidxs[i]->rd_index))
1686 				index_insert(toastidxs[i], t_values, t_isnull,
1687 							 &(toasttup->t_self),
1688 							 toastrel,
1689 							 toastidxs[i]->rd_index->indisunique ?
1690 							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
1691 							 NULL);
1692 		}
1693 
1694 		/*
1695 		 * Free memory
1696 		 */
1697 		heap_freetuple(toasttup);
1698 
1699 		/*
1700 		 * Move on to next chunk
1701 		 */
1702 		data_todo -= chunk_size;
1703 		data_p += chunk_size;
1704 	}
1705 
1706 	/*
1707 	 * Done - close toast relation and its indexes
1708 	 */
1709 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1710 	heap_close(toastrel, RowExclusiveLock);
1711 
1712 	/*
1713 	 * Create the TOAST pointer value that we'll return
1714 	 */
1715 	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1716 	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1717 	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1718 
1719 	return PointerGetDatum(result);
1720 }
1721 
1722 
1723 /* ----------
1724  * toast_delete_datum -
1725  *
1726  *	Delete a single external stored value.
1727  * ----------
1728  */
1729 static void
toast_delete_datum(Relation rel,Datum value,bool is_speculative)1730 toast_delete_datum(Relation rel, Datum value, bool is_speculative)
1731 {
1732 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1733 	struct varatt_external toast_pointer;
1734 	Relation	toastrel;
1735 	Relation   *toastidxs;
1736 	ScanKeyData toastkey;
1737 	SysScanDesc toastscan;
1738 	HeapTuple	toasttup;
1739 	int			num_indexes;
1740 	int			validIndex;
1741 	SnapshotData SnapshotToast;
1742 
1743 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1744 		return;
1745 
1746 	/* Must copy to access aligned fields */
1747 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1748 
1749 	/*
1750 	 * Open the toast relation and its indexes
1751 	 */
1752 	toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1753 
1754 	/* Fetch valid relation used for process */
1755 	validIndex = toast_open_indexes(toastrel,
1756 									RowExclusiveLock,
1757 									&toastidxs,
1758 									&num_indexes);
1759 
1760 	/*
1761 	 * Setup a scan key to find chunks with matching va_valueid
1762 	 */
1763 	ScanKeyInit(&toastkey,
1764 				(AttrNumber) 1,
1765 				BTEqualStrategyNumber, F_OIDEQ,
1766 				ObjectIdGetDatum(toast_pointer.va_valueid));
1767 
1768 	/*
1769 	 * Find all the chunks.  (We don't actually care whether we see them in
1770 	 * sequence or not, but since we've already locked the index we might as
1771 	 * well use systable_beginscan_ordered.)
1772 	 */
1773 	init_toast_snapshot(&SnapshotToast);
1774 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1775 										   &SnapshotToast, 1, &toastkey);
1776 	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1777 	{
1778 		/*
1779 		 * Have a chunk, delete it
1780 		 */
1781 		if (is_speculative)
1782 			heap_abort_speculative(toastrel, toasttup);
1783 		else
1784 			simple_heap_delete(toastrel, &toasttup->t_self);
1785 	}
1786 
1787 	/*
1788 	 * End scan and close relations
1789 	 */
1790 	systable_endscan_ordered(toastscan);
1791 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1792 	heap_close(toastrel, RowExclusiveLock);
1793 }
1794 
1795 
1796 /* ----------
1797  * toastrel_valueid_exists -
1798  *
1799  *	Test whether a toast value with the given ID exists in the toast relation.
1800  *	For safety, we consider a value to exist if there are either live or dead
1801  *	toast rows with that ID; see notes for GetNewOid().
1802  * ----------
1803  */
1804 static bool
toastrel_valueid_exists(Relation toastrel,Oid valueid)1805 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1806 {
1807 	bool		result = false;
1808 	ScanKeyData toastkey;
1809 	SysScanDesc toastscan;
1810 	int			num_indexes;
1811 	int			validIndex;
1812 	Relation   *toastidxs;
1813 
1814 	/* Fetch a valid index relation */
1815 	validIndex = toast_open_indexes(toastrel,
1816 									RowExclusiveLock,
1817 									&toastidxs,
1818 									&num_indexes);
1819 
1820 	/*
1821 	 * Setup a scan key to find chunks with matching va_valueid
1822 	 */
1823 	ScanKeyInit(&toastkey,
1824 				(AttrNumber) 1,
1825 				BTEqualStrategyNumber, F_OIDEQ,
1826 				ObjectIdGetDatum(valueid));
1827 
1828 	/*
1829 	 * Is there any such chunk?
1830 	 */
1831 	toastscan = systable_beginscan(toastrel,
1832 								   RelationGetRelid(toastidxs[validIndex]),
1833 								   true, SnapshotAny, 1, &toastkey);
1834 
1835 	if (systable_getnext(toastscan) != NULL)
1836 		result = true;
1837 
1838 	systable_endscan(toastscan);
1839 
1840 	/* Clean up */
1841 	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1842 
1843 	return result;
1844 }
1845 
1846 /* ----------
1847  * toastid_valueid_exists -
1848  *
1849  *	As above, but work from toast rel's OID not an open relation
1850  * ----------
1851  */
1852 static bool
toastid_valueid_exists(Oid toastrelid,Oid valueid)1853 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1854 {
1855 	bool		result;
1856 	Relation	toastrel;
1857 
1858 	toastrel = heap_open(toastrelid, AccessShareLock);
1859 
1860 	result = toastrel_valueid_exists(toastrel, valueid);
1861 
1862 	heap_close(toastrel, AccessShareLock);
1863 
1864 	return result;
1865 }
1866 
1867 
1868 /* ----------
1869  * toast_fetch_datum -
1870  *
1871  *	Reconstruct an in memory Datum from the chunks saved
1872  *	in the toast relation
1873  * ----------
1874  */
1875 static struct varlena *
toast_fetch_datum(struct varlena * attr)1876 toast_fetch_datum(struct varlena *attr)
1877 {
1878 	Relation	toastrel;
1879 	Relation   *toastidxs;
1880 	ScanKeyData toastkey;
1881 	SysScanDesc toastscan;
1882 	HeapTuple	ttup;
1883 	TupleDesc	toasttupDesc;
1884 	struct varlena *result;
1885 	struct varatt_external toast_pointer;
1886 	int32		ressize;
1887 	int32		residx,
1888 				nextidx;
1889 	int32		numchunks;
1890 	Pointer		chunk;
1891 	bool		isnull;
1892 	char	   *chunkdata;
1893 	int32		chunksize;
1894 	int			num_indexes;
1895 	int			validIndex;
1896 	SnapshotData SnapshotToast;
1897 
1898 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1899 		elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1900 
1901 	/* Must copy to access aligned fields */
1902 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1903 
1904 	ressize = toast_pointer.va_extsize;
1905 	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1906 
1907 	result = (struct varlena *) palloc(ressize + VARHDRSZ);
1908 
1909 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1910 		SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1911 	else
1912 		SET_VARSIZE(result, ressize + VARHDRSZ);
1913 
1914 	/*
1915 	 * Open the toast relation and its indexes
1916 	 */
1917 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1918 	toasttupDesc = toastrel->rd_att;
1919 
1920 	/* Look for the valid index of the toast relation */
1921 	validIndex = toast_open_indexes(toastrel,
1922 									AccessShareLock,
1923 									&toastidxs,
1924 									&num_indexes);
1925 
1926 	/*
1927 	 * Setup a scan key to fetch from the index by va_valueid
1928 	 */
1929 	ScanKeyInit(&toastkey,
1930 				(AttrNumber) 1,
1931 				BTEqualStrategyNumber, F_OIDEQ,
1932 				ObjectIdGetDatum(toast_pointer.va_valueid));
1933 
1934 	/*
1935 	 * Read the chunks by index
1936 	 *
1937 	 * Note that because the index is actually on (valueid, chunkidx) we will
1938 	 * see the chunks in chunkidx order, even though we didn't explicitly ask
1939 	 * for it.
1940 	 */
1941 	nextidx = 0;
1942 
1943 	init_toast_snapshot(&SnapshotToast);
1944 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1945 										   &SnapshotToast, 1, &toastkey);
1946 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1947 	{
1948 		/*
1949 		 * Have a chunk, extract the sequence number and the data
1950 		 */
1951 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1952 		Assert(!isnull);
1953 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1954 		Assert(!isnull);
1955 		if (!VARATT_IS_EXTENDED(chunk))
1956 		{
1957 			chunksize = VARSIZE(chunk) - VARHDRSZ;
1958 			chunkdata = VARDATA(chunk);
1959 		}
1960 		else if (VARATT_IS_SHORT(chunk))
1961 		{
1962 			/* could happen due to heap_form_tuple doing its thing */
1963 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1964 			chunkdata = VARDATA_SHORT(chunk);
1965 		}
1966 		else
1967 		{
1968 			/* should never happen */
1969 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1970 				 toast_pointer.va_valueid,
1971 				 RelationGetRelationName(toastrel));
1972 			chunksize = 0;		/* keep compiler quiet */
1973 			chunkdata = NULL;
1974 		}
1975 
1976 		/*
1977 		 * Some checks on the data we've found
1978 		 */
1979 		if (residx != nextidx)
1980 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1981 				 residx, nextidx,
1982 				 toast_pointer.va_valueid,
1983 				 RelationGetRelationName(toastrel));
1984 		if (residx < numchunks - 1)
1985 		{
1986 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
1987 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1988 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1989 					 residx, numchunks,
1990 					 toast_pointer.va_valueid,
1991 					 RelationGetRelationName(toastrel));
1992 		}
1993 		else if (residx == numchunks - 1)
1994 		{
1995 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1996 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1997 					 chunksize,
1998 					 (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1999 					 residx,
2000 					 toast_pointer.va_valueid,
2001 					 RelationGetRelationName(toastrel));
2002 		}
2003 		else
2004 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2005 				 residx,
2006 				 0, numchunks - 1,
2007 				 toast_pointer.va_valueid,
2008 				 RelationGetRelationName(toastrel));
2009 
2010 		/*
2011 		 * Copy the data into proper place in our result
2012 		 */
2013 		memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
2014 			   chunkdata,
2015 			   chunksize);
2016 
2017 		nextidx++;
2018 	}
2019 
2020 	/*
2021 	 * Final checks that we successfully fetched the datum
2022 	 */
2023 	if (nextidx != numchunks)
2024 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2025 			 nextidx,
2026 			 toast_pointer.va_valueid,
2027 			 RelationGetRelationName(toastrel));
2028 
2029 	/*
2030 	 * End scan and close relations
2031 	 */
2032 	systable_endscan_ordered(toastscan);
2033 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2034 	heap_close(toastrel, AccessShareLock);
2035 
2036 	return result;
2037 }
2038 
2039 /* ----------
2040  * toast_fetch_datum_slice -
2041  *
2042  *	Reconstruct a segment of a Datum from the chunks saved
2043  *	in the toast relation
2044  * ----------
2045  */
2046 static struct varlena *
toast_fetch_datum_slice(struct varlena * attr,int32 sliceoffset,int32 length)2047 toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
2048 {
2049 	Relation	toastrel;
2050 	Relation   *toastidxs;
2051 	ScanKeyData toastkey[3];
2052 	int			nscankeys;
2053 	SysScanDesc toastscan;
2054 	HeapTuple	ttup;
2055 	TupleDesc	toasttupDesc;
2056 	struct varlena *result;
2057 	struct varatt_external toast_pointer;
2058 	int32		attrsize;
2059 	int32		residx;
2060 	int32		nextidx;
2061 	int			numchunks;
2062 	int			startchunk;
2063 	int			endchunk;
2064 	int32		startoffset;
2065 	int32		endoffset;
2066 	int			totalchunks;
2067 	Pointer		chunk;
2068 	bool		isnull;
2069 	char	   *chunkdata;
2070 	int32		chunksize;
2071 	int32		chcpystrt;
2072 	int32		chcpyend;
2073 	int			num_indexes;
2074 	int			validIndex;
2075 	SnapshotData SnapshotToast;
2076 
2077 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
2078 		elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
2079 
2080 	/* Must copy to access aligned fields */
2081 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
2082 
2083 	/*
2084 	 * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2085 	 * we can't return a compressed datum which is meaningful to toast later
2086 	 */
2087 	Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2088 
2089 	attrsize = toast_pointer.va_extsize;
2090 	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2091 
2092 	if (sliceoffset >= attrsize)
2093 	{
2094 		sliceoffset = 0;
2095 		length = 0;
2096 	}
2097 
2098 	if (((sliceoffset + length) > attrsize) || length < 0)
2099 		length = attrsize - sliceoffset;
2100 
2101 	result = (struct varlena *) palloc(length + VARHDRSZ);
2102 
2103 	if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2104 		SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
2105 	else
2106 		SET_VARSIZE(result, length + VARHDRSZ);
2107 
2108 	if (length == 0)
2109 		return result;			/* Can save a lot of work at this point! */
2110 
2111 	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2112 	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2113 	numchunks = (endchunk - startchunk) + 1;
2114 
2115 	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2116 	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2117 
2118 	/*
2119 	 * Open the toast relation and its indexes
2120 	 */
2121 	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
2122 	toasttupDesc = toastrel->rd_att;
2123 
2124 	/* Look for the valid index of toast relation */
2125 	validIndex = toast_open_indexes(toastrel,
2126 									AccessShareLock,
2127 									&toastidxs,
2128 									&num_indexes);
2129 
2130 	/*
2131 	 * Setup a scan key to fetch from the index. This is either two keys or
2132 	 * three depending on the number of chunks.
2133 	 */
2134 	ScanKeyInit(&toastkey[0],
2135 				(AttrNumber) 1,
2136 				BTEqualStrategyNumber, F_OIDEQ,
2137 				ObjectIdGetDatum(toast_pointer.va_valueid));
2138 
2139 	/*
2140 	 * Use equality condition for one chunk, a range condition otherwise:
2141 	 */
2142 	if (numchunks == 1)
2143 	{
2144 		ScanKeyInit(&toastkey[1],
2145 					(AttrNumber) 2,
2146 					BTEqualStrategyNumber, F_INT4EQ,
2147 					Int32GetDatum(startchunk));
2148 		nscankeys = 2;
2149 	}
2150 	else
2151 	{
2152 		ScanKeyInit(&toastkey[1],
2153 					(AttrNumber) 2,
2154 					BTGreaterEqualStrategyNumber, F_INT4GE,
2155 					Int32GetDatum(startchunk));
2156 		ScanKeyInit(&toastkey[2],
2157 					(AttrNumber) 2,
2158 					BTLessEqualStrategyNumber, F_INT4LE,
2159 					Int32GetDatum(endchunk));
2160 		nscankeys = 3;
2161 	}
2162 
2163 	/*
2164 	 * Read the chunks by index
2165 	 *
2166 	 * The index is on (valueid, chunkidx) so they will come in order
2167 	 */
2168 	init_toast_snapshot(&SnapshotToast);
2169 	nextidx = startchunk;
2170 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2171 										   &SnapshotToast, nscankeys, toastkey);
2172 	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2173 	{
2174 		/*
2175 		 * Have a chunk, extract the sequence number and the data
2176 		 */
2177 		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2178 		Assert(!isnull);
2179 		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2180 		Assert(!isnull);
2181 		if (!VARATT_IS_EXTENDED(chunk))
2182 		{
2183 			chunksize = VARSIZE(chunk) - VARHDRSZ;
2184 			chunkdata = VARDATA(chunk);
2185 		}
2186 		else if (VARATT_IS_SHORT(chunk))
2187 		{
2188 			/* could happen due to heap_form_tuple doing its thing */
2189 			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2190 			chunkdata = VARDATA_SHORT(chunk);
2191 		}
2192 		else
2193 		{
2194 			/* should never happen */
2195 			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2196 				 toast_pointer.va_valueid,
2197 				 RelationGetRelationName(toastrel));
2198 			chunksize = 0;		/* keep compiler quiet */
2199 			chunkdata = NULL;
2200 		}
2201 
2202 		/*
2203 		 * Some checks on the data we've found
2204 		 */
2205 		if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2206 			elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2207 				 residx, nextidx,
2208 				 toast_pointer.va_valueid,
2209 				 RelationGetRelationName(toastrel));
2210 		if (residx < totalchunks - 1)
2211 		{
2212 			if (chunksize != TOAST_MAX_CHUNK_SIZE)
2213 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2214 					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2215 					 residx, totalchunks,
2216 					 toast_pointer.va_valueid,
2217 					 RelationGetRelationName(toastrel));
2218 		}
2219 		else if (residx == totalchunks - 1)
2220 		{
2221 			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2222 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2223 					 chunksize,
2224 					 (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2225 					 residx,
2226 					 toast_pointer.va_valueid,
2227 					 RelationGetRelationName(toastrel));
2228 		}
2229 		else
2230 			elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2231 				 residx,
2232 				 0, totalchunks - 1,
2233 				 toast_pointer.va_valueid,
2234 				 RelationGetRelationName(toastrel));
2235 
2236 		/*
2237 		 * Copy the data into proper place in our result
2238 		 */
2239 		chcpystrt = 0;
2240 		chcpyend = chunksize - 1;
2241 		if (residx == startchunk)
2242 			chcpystrt = startoffset;
2243 		if (residx == endchunk)
2244 			chcpyend = endoffset;
2245 
2246 		memcpy(VARDATA(result) +
2247 			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2248 			   chunkdata + chcpystrt,
2249 			   (chcpyend - chcpystrt) + 1);
2250 
2251 		nextidx++;
2252 	}
2253 
2254 	/*
2255 	 * Final checks that we successfully fetched the datum
2256 	 */
2257 	if (nextidx != (endchunk + 1))
2258 		elog(ERROR, "missing chunk number %d for toast value %u in %s",
2259 			 nextidx,
2260 			 toast_pointer.va_valueid,
2261 			 RelationGetRelationName(toastrel));
2262 
2263 	/*
2264 	 * End scan and close relations
2265 	 */
2266 	systable_endscan_ordered(toastscan);
2267 	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2268 	heap_close(toastrel, AccessShareLock);
2269 
2270 	return result;
2271 }
2272 
2273 /* ----------
2274  * toast_decompress_datum -
2275  *
2276  * Decompress a compressed version of a varlena datum
2277  */
2278 static struct varlena *
toast_decompress_datum(struct varlena * attr)2279 toast_decompress_datum(struct varlena *attr)
2280 {
2281 	struct varlena *result;
2282 
2283 	Assert(VARATT_IS_COMPRESSED(attr));
2284 
2285 	result = (struct varlena *)
2286 		palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2287 	SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2288 
2289 	if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2290 						VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2291 						VARDATA(result),
2292 						TOAST_COMPRESS_RAWSIZE(attr)) < 0)
2293 		elog(ERROR, "compressed data is corrupted");
2294 
2295 	return result;
2296 }
2297 
2298 
2299 /* ----------
2300  * toast_open_indexes
2301  *
2302  *	Get an array of the indexes associated to the given toast relation
2303  *	and return as well the position of the valid index used by the toast
2304  *	relation in this array. It is the responsibility of the caller of this
2305  *	function to close the indexes as well as free them.
2306  */
2307 static int
toast_open_indexes(Relation toastrel,LOCKMODE lock,Relation ** toastidxs,int * num_indexes)2308 toast_open_indexes(Relation toastrel,
2309 				   LOCKMODE lock,
2310 				   Relation **toastidxs,
2311 				   int *num_indexes)
2312 {
2313 	int			i = 0;
2314 	int			res = 0;
2315 	bool		found = false;
2316 	List	   *indexlist;
2317 	ListCell   *lc;
2318 
2319 	/* Get index list of the toast relation */
2320 	indexlist = RelationGetIndexList(toastrel);
2321 	Assert(indexlist != NIL);
2322 
2323 	*num_indexes = list_length(indexlist);
2324 
2325 	/* Open all the index relations */
2326 	*toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2327 	foreach(lc, indexlist)
2328 		(*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2329 
2330 	/* Fetch the first valid index in list */
2331 	for (i = 0; i < *num_indexes; i++)
2332 	{
2333 		Relation	toastidx = (*toastidxs)[i];
2334 
2335 		if (toastidx->rd_index->indisvalid)
2336 		{
2337 			res = i;
2338 			found = true;
2339 			break;
2340 		}
2341 	}
2342 
2343 	/*
2344 	 * Free index list, not necessary anymore as relations are opened and a
2345 	 * valid index has been found.
2346 	 */
2347 	list_free(indexlist);
2348 
2349 	/*
2350 	 * The toast relation should have one valid index, so something is going
2351 	 * wrong if there is nothing.
2352 	 */
2353 	if (!found)
2354 		elog(ERROR, "no valid index found for toast relation with Oid %u",
2355 			 RelationGetRelid(toastrel));
2356 
2357 	return res;
2358 }
2359 
2360 /* ----------
2361  * toast_close_indexes
2362  *
2363  *	Close an array of indexes for a toast relation and free it. This should
2364  *	be called for a set of indexes opened previously with toast_open_indexes.
2365  */
2366 static void
toast_close_indexes(Relation * toastidxs,int num_indexes,LOCKMODE lock)2367 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2368 {
2369 	int			i;
2370 
2371 	/* Close relations and clean up things */
2372 	for (i = 0; i < num_indexes; i++)
2373 		index_close(toastidxs[i], lock);
2374 	pfree(toastidxs);
2375 }
2376 
2377 /* ----------
2378  * init_toast_snapshot
2379  *
2380  *	Initialize an appropriate TOAST snapshot.  We must use an MVCC snapshot
2381  *	to initialize the TOAST snapshot; since we don't know which one to use,
2382  *	just use the oldest one.  This is safe: at worst, we will get a "snapshot
2383  *	too old" error that might have been avoided otherwise.
2384  */
2385 static void
init_toast_snapshot(Snapshot toast_snapshot)2386 init_toast_snapshot(Snapshot toast_snapshot)
2387 {
2388 	Snapshot	snapshot = GetOldestSnapshot();
2389 
2390 	if (snapshot == NULL)
2391 		elog(ERROR, "no known snapshots");
2392 
2393 	InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken);
2394 }
2395