/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *	  routines for manipulating inversion fs large objects. This file
 *	  contains the user-level large object application interface routines.
 *
 *
 * Note: we access pg_largeobject.data using its C struct declaration.
 * This is safe because it immediately follows pageno which is an int4 field,
 * and therefore the data field will always be 4-byte aligned, even if it
 * is in the short 1-byte-header format.  We have to detoast it since it's
 * quite likely to be in compressed or short format.  We also need to check
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 *
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 * does most of the backend code.  We expect that CurrentMemoryContext will
 * be a short-lived context.  Data that must persist across function calls
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 * memory context given to inv_open (for LargeObjectDesc structs).
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/large_object/inv_api.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <limits.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"


/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference, so that we only need to open pg_largeobject once per transaction.
 * To avoid problems when the first such reference occurs inside a
 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
 * the Relation reference to TopTransactionResourceOwner.
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;


/*
 * Open pg_largeobject and its index, if not already done in current xact
 */
static void
open_lo_relation(void)
{
	ResourceOwner currentOwner;

	if (lo_heap_r && lo_index_r)
		return;					/* already open in current xact */

	/* Arrange for the top xact to own these relation references */
	currentOwner = CurrentResourceOwner;
	PG_TRY();
	{
		CurrentResourceOwner = TopTransactionResourceOwner;

		/* Use RowExclusiveLock since we might either read or write */
		if (lo_heap_r == NULL)
			lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
		if (lo_index_r == NULL)
			lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
	}
	PG_CATCH();
	{
		/* Ensure CurrentResourceOwner is restored on error */
		CurrentResourceOwner = currentOwner;
		PG_RE_THROW();
	}
	PG_END_TRY();
	CurrentResourceOwner = currentOwner;
}

/*
 * Clean up at main transaction end
 */
void
close_lo_relation(bool isCommit)
{
	if (lo_heap_r || lo_index_r)
	{
		/*
		 * Only bother to close if committing; else abort cleanup will handle
		 * it
		 */
		if (isCommit)
		{
			ResourceOwner currentOwner;

			currentOwner = CurrentResourceOwner;
			PG_TRY();
			{
				CurrentResourceOwner = TopTransactionResourceOwner;

				if (lo_index_r)
					index_close(lo_index_r, NoLock);
				if (lo_heap_r)
					heap_close(lo_heap_r, NoLock);
			}
			PG_CATCH();
			{
				/* Ensure CurrentResourceOwner is restored on error */
				CurrentResourceOwner = currentOwner;
				PG_RE_THROW();
			}
			PG_END_TRY();
			CurrentResourceOwner = currentOwner;
		}
		lo_heap_r = NULL;
		lo_index_r = NULL;
	}
}


/*
 * Same as pg_largeobject.c's LargeObjectExists(), except that the snapshot
 * to read with can be specified.
 */
static bool
myLargeObjectExists(Oid loid, Snapshot snapshot)
{
	Relation	pg_lo_meta;
	ScanKeyData skey[1];
	SysScanDesc sd;
	HeapTuple	tuple;
	bool		retval = false;

	ScanKeyInit(&skey[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(loid));

	pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
						   AccessShareLock);

	sd = systable_beginscan(pg_lo_meta,
							LargeObjectMetadataOidIndexId, true,
							snapshot, 1, skey);

	tuple = systable_getnext(sd);
	if (HeapTupleIsValid(tuple))
		retval = true;

	systable_endscan(sd);

	heap_close(pg_lo_meta, AccessShareLock);

	return retval;
}


/*
 * Extract data field from a pg_largeobject tuple, detoasting if needed
 * and verifying that the length is sane.  Returns data pointer (a bytea *),
 * data length, and an indication of whether to pfree the data pointer.
 */
static void
getdatafield(Form_pg_largeobject tuple,
			 bytea **pdatafield,
			 int *plen,
			 bool *pfreeit)
{
	bytea	   *datafield;
	int			len;
	bool		freeit;

	datafield = &(tuple->data); /* see note at top of file */
	freeit = false;
	if (VARATT_IS_EXTENDED(datafield))
	{
		datafield = (bytea *)
			heap_tuple_untoast_attr((struct varlena *) datafield);
		freeit = true;
	}
	len = VARSIZE(datafield) - VARHDRSZ;
	if (len < 0 || len > LOBLKSIZE)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
						tuple->loid, tuple->pageno, len)));
	*pdatafield = datafield;
	*plen = len;
	*pfreeit = freeit;
}

/*
 *	inv_create -- create a new large object
 *
 *	Arguments:
 *	  lobjId - OID to use for new large object, or InvalidOid to pick one
 *
 *	Returns:
 *	  OID of new object
 *
 * If lobjId is not InvalidOid, then an error occurs if the OID is already
 * in use.
 */
Oid
inv_create(Oid lobjId)
{
	Oid			lobjId_new;

	/*
	 * Create a new largeobject with empty data pages
	 */
	lobjId_new = LargeObjectCreate(lobjId);

	/*
	 * Record a dependency on the owner of the large object.
	 *
	 * We use LargeObjectRelationId instead of LargeObjectMetadataRelationId
	 * here to preserve backward compatibility for applications that rely on
	 * knowledge of the internal layout of the system catalogs.  The OID of
	 * pg_largeobject_metadata and the loid of pg_largeobject are the same
	 * value, so there is no actual difference here.
	 */
	recordDependencyOnOwner(LargeObjectRelationId,
							lobjId_new, GetUserId());

	/* Post creation hook for new large object */
	InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);

	/*
	 * Advance command counter to make new tuple visible to later operations.
	 */
	CommandCounterIncrement();

	return lobjId_new;
}

/*
 *	inv_open -- access an existing large object.
 *
 * Returns a large object descriptor, appropriately filled in.
 * The descriptor and subsidiary data are allocated in the specified
 * memory context, which must be suitably long-lived for the caller's
 * purposes.  If the returned descriptor has a snapshot associated
 * with it, the caller must ensure that it also lives long enough,
 * e.g. by calling RegisterSnapshotOnOwner.
 */
LargeObjectDesc *
inv_open(Oid lobjId, int flags, MemoryContext mcxt)
{
	LargeObjectDesc *retval;
	Snapshot	snapshot = NULL;
	int			descflags = 0;

	if (flags & INV_WRITE)
	{
		snapshot = NULL;		/* instantaneous MVCC snapshot */
		descflags = IFS_WRLOCK | IFS_RDLOCK;
	}
	else if (flags & INV_READ)
	{
		snapshot = GetActiveSnapshot();
		descflags = IFS_RDLOCK;
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid flags for opening a large object: %d",
						flags)));

	/* Can't use LargeObjectExists here because we need to specify snapshot */
	if (!myLargeObjectExists(lobjId, snapshot))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("large object %u does not exist", lobjId)));

	/* OK to create a descriptor */
	retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
													sizeof(LargeObjectDesc));
	retval->id = lobjId;
	retval->offset = 0;
	retval->flags = descflags;

	/* caller sets if needed, not used by the functions in this file */
	retval->subid = InvalidSubTransactionId;

	/*
	 * The snapshot (if any) is just the currently active snapshot.  The
	 * caller will replace it with a longer-lived copy if needed.
	 */
	retval->snapshot = snapshot;

	return retval;
}

/*
 * Closes a large object descriptor previously made by inv_open(), and
 * releases the long-term memory used by it.
 */
void
inv_close(LargeObjectDesc *obj_desc)
{
	Assert(PointerIsValid(obj_desc));
	pfree(obj_desc);
}

/*
 * Destroys an existing large object (not to be confused with a descriptor!)
 *
 * returns -1 if failed
 */
int
inv_drop(Oid lobjId)
{
	ObjectAddress object;

	/*
	 * Delete any comments and dependencies on the large object
	 */
	object.classId = LargeObjectRelationId;
	object.objectId = lobjId;
	object.objectSubId = 0;
	performDeletion(&object, DROP_CASCADE, 0);

	/*
	 * Advance command counter so that tuple removal will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return 1;
}

/*
 * Determine size of a large object
 *
 * NOTE: LOs can contain gaps, just like Unix files.  We actually return
 * the offset of the last byte + 1.
 */
static uint64
inv_getsize(LargeObjectDesc *obj_desc)
{
	uint64		lastbyte = 0;
	ScanKeyData skey[1];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));

	open_lo_relation();

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 1, skey);

	/*
	 * Because the pg_largeobject index is on both loid and pageno, but we
	 * constrain only loid, a backwards scan should visit all pages of the
	 * large object in reverse pageno order.  So, it's sufficient to examine
	 * the first valid tuple (== last valid page).
	 */
	tuple = systable_getnext_ordered(sd, BackwardScanDirection);
	if (HeapTupleIsValid(tuple))
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		int			len;
		bool		pfreeit;

		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);
		getdatafield(data, &datafield, &len, &pfreeit);
		lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
		if (pfreeit)
			pfree(datafield);
	}

	systable_endscan_ordered(sd);

	return lastbyte;
}

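/*
 * inv_seek - change the current read/write position within a large object
 *
 * whence is interpreted as in lseek(): SEEK_SET, SEEK_CUR, or SEEK_END
 * (relative to the current end of data as computed by inv_getsize).
 * Returns the new offset; errors out if the result would be negative or
 * larger than MAX_LARGE_OBJECT_SIZE.
 */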
int64
inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
{
	int64		newoffset;

	Assert(PointerIsValid(obj_desc));

	/*
	 * Note: overflow in the additions is possible, but since we will reject
	 * negative results, we don't need any extra test for that.
	 */
	switch (whence)
	{
		case SEEK_SET:
			newoffset = offset;
			break;
		case SEEK_CUR:
			newoffset = obj_desc->offset + offset;
			break;
		case SEEK_END:
			newoffset = inv_getsize(obj_desc) + offset;
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("invalid whence setting: %d", whence)));
			newoffset = 0;		/* keep compiler quiet */
			break;
	}

	/*
	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
	 * in translatable strings; doing better is not worth the trouble
	 */
	if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
								 newoffset)));

	obj_desc->offset = newoffset;
	return newoffset;
}

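/*
 * inv_tell - report the current read/write position within a large object
 */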
int64
inv_tell(LargeObjectDesc *obj_desc)
{
	Assert(PointerIsValid(obj_desc));

	return obj_desc->offset;
}

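/*
 * inv_read - read up to nbytes bytes from the large object into buf,
 * starting at the descriptor's current offset.  Unwritten "holes" read
 * back as zeroes.  Returns the number of bytes actually read, which may
 * be less than nbytes if the end of the object is reached.
 */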
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
	int			nread = 0;
	int64		n;
	int64		off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	uint64		pageoff;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	if (nbytes <= 0)
		return 0;

	open_lo_relation();

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		bool		pfreeit;

		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);

		/*
		 * We expect the indexscan will deliver pages in order.  However,
		 * there may be missing pages if the LO contains unwritten "holes". We
		 * want missing sections to read out as zeroes.
		 */
		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
		if (pageoff > obj_desc->offset)
		{
			n = pageoff - obj_desc->offset;
			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
			MemSet(buf + nread, 0, n);
			nread += n;
			obj_desc->offset += n;
		}

		if (nread < nbytes)
		{
			Assert(obj_desc->offset >= pageoff);
			off = (int) (obj_desc->offset - pageoff);
			Assert(off >= 0 && off < LOBLKSIZE);

			getdatafield(data, &datafield, &len, &pfreeit);
			if (len > off)
			{
				n = len - off;
				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
				memcpy(buf + nread, VARDATA(datafield) + off, n);
				nread += n;
				obj_desc->offset += n;
			}
			if (pfreeit)
				pfree(datafield);
		}

		if (nread >= nbytes)
			break;
	}

	systable_endscan_ordered(sd);

	return nread;
}

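/*
 * inv_write - write nbytes bytes from buf into the large object, starting
 * at the descriptor's current offset.  Existing pages are updated in place
 * and new pages are inserted as needed; any hole before the write position
 * within a page is zero-filled.  The descriptor must have been opened for
 * writing.  Returns the number of bytes written.
 */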
int
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
{
	int			nwritten = 0;
	int			n;
	int			off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	bool		neednextpage;
	bytea	   *datafield;
	bool		pfreeit;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	/* enforce writability because snapshot is probably wrong otherwise */
	Assert(obj_desc->flags & IFS_WRLOCK);

	if (nbytes <= 0)
		return 0;

	/* this addition can't overflow because nbytes is only int32 */
	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid large object write request size: %d",
						nbytes)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	oldtuple = NULL;
	olddata = NULL;
	neednextpage = true;

	while (nwritten < nbytes)
	{
		/*
		 * If possible, get next pre-existing page of the LO.  We expect the
		 * indexscan will deliver these in order --- but there may be holes.
		 */
		if (neednextpage)
		{
			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
			{
				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
					elog(ERROR, "null field found in pg_largeobject");
				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
				Assert(olddata->pageno >= pageno);
			}
			neednextpage = false;
		}

		/*
		 * If we have a pre-existing page, see if it is the page we want to
		 * write, or a later one.
		 */
		if (olddata != NULL && olddata->pageno == pageno)
		{
			/*
			 * Update an existing page with fresh data.
			 *
			 * First, load old data into workbuf
			 */
			getdatafield(olddata, &datafield, &len, &pfreeit);
			memcpy(workb, VARDATA(datafield), len);
			if (pfreeit)
				pfree(datafield);

			/*
			 * Fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > len)
				MemSet(workb + len, 0, off - len);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			off += n;
			/* compute valid length of new page */
			len = (len >= off) ? len : off;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			memset(replace, false, sizeof(replace));
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			replace[Anum_pg_largeobject_data - 1] = true;
			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
									   values, nulls, replace);
			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
									   indstate);
			heap_freetuple(newtup);

			/*
			 * We're done with this old page.
			 */
			oldtuple = NULL;
			olddata = NULL;
			neednextpage = true;
		}
		else
		{
			/*
			 * Write a brand new page.
			 *
			 * First, fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > 0)
				MemSet(workb, 0, off);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			/* compute valid length of new page */
			len = off + n;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
			heap_freetuple(newtup);
		}
		pageno++;
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that my tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return nwritten;
}

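/*
 * inv_truncate - truncate a large object to the given length
 *
 * Pages beyond the truncation point are deleted.  If the truncation point
 * falls within an existing page, that page is shortened; if it falls in a
 * hole (or beyond the current end), a zero-filled page is written to mark
 * the new end of data.  The descriptor must have been opened for writing.
 */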
void
inv_truncate(LargeObjectDesc *obj_desc, int64 len)
{
	int32		pageno = (int32) (len / LOBLKSIZE);
	int32		off;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));

	/* enforce writability because snapshot is probably wrong otherwise */
	Assert(obj_desc->flags & IFS_WRLOCK);

	/*
	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
	 * in translatable strings; doing better is not worth the trouble
	 */
	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
								 len)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/*
	 * Set up to find all pages with desired loid and pageno >= target
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	/*
	 * If possible, get the page the truncation point is in. The truncation
	 * point may be beyond the end of the LO or in a hole.
	 */
	olddata = NULL;
	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
		Assert(olddata->pageno >= pageno);
	}

	/*
	 * If we found the page of the truncation point we need to truncate the
	 * data in it.  Otherwise if we're in a hole, we need to create a page to
	 * mark the end of data.
	 */
	if (olddata != NULL && olddata->pageno == pageno)
	{
		/* First, load old data into workbuf */
		bytea	   *datafield;
		int			pagelen;
		bool		pfreeit;

		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
		memcpy(workb, VARDATA(datafield), pagelen);
		if (pfreeit)
			pfree(datafield);

		/*
		 * Fill any hole
		 */
		off = len % LOBLKSIZE;
		if (off > pagelen)
			MemSet(workb + pagelen, 0, off - pagelen);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert updated tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replace, false, sizeof(replace));
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		replace[Anum_pg_largeobject_data - 1] = true;
		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
								   values, nulls, replace);
		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
								   indstate);
		heap_freetuple(newtup);
	}
	else
	{
		/*
		 * If the first page we found was after the truncation point, we're in
		 * a hole that we'll fill, but we need to delete the later page
		 * because the loop below won't visit it again.
		 */
		if (olddata != NULL)
		{
			Assert(olddata->pageno > pageno);
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}

		/*
		 * Write a brand new page.
		 *
		 * Fill the hole up to the truncation point
		 */
		off = len % LOBLKSIZE;
		if (off > 0)
			MemSet(workb, 0, off);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert new tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
		heap_freetuple(newtup);
	}

	/*
	 * Delete any pages after the truncation point.  If the initial search
	 * didn't find a page, then of course there's nothing more to do.
	 */
	if (olddata != NULL)
	{
		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
		{
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();
}