1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  *	  routines for manipulating inversion fs large objects. This file
5  *	  contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format.  We have to detoast it since it's
12  * quite likely to be in compressed or short format.  We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code.  We expect that CurrentMemoryContext will
17  * be a short-lived context.  Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  *	  src/backend/storage/large_object/inv_api.c
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32 
33 #include <limits.h>
34 
35 #include "access/genam.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tuptoaster.h"
39 #include "access/xact.h"
40 #include "catalog/dependency.h"
41 #include "catalog/indexing.h"
42 #include "catalog/objectaccess.h"
43 #include "catalog/pg_largeobject.h"
44 #include "catalog/pg_largeobject_metadata.h"
45 #include "libpq/libpq-fs.h"
46 #include "miscadmin.h"
47 #include "storage/large_object.h"
48 #include "utils/fmgroids.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51 
52 
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks.
 *
 * When this is true, inv_open() skips its ACL_SELECT / ACL_UPDATE checks
 * on the large object being opened.
 */
bool		lo_compat_privileges;
57 
/*
 * All accesses to pg_largeobject and its index go through the two cached
 * Relation references below, so that we only need to open pg_largeobject
 * (and its index) once per transaction.
 * To avoid problems when the first such reference occurs inside a
 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
 * the Relation references to TopTransactionResourceOwner.
 *
 * Both pointers are NULL while nothing is open.  open_lo_relation() fills
 * them in and close_lo_relation() resets them to NULL.
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;
67 
68 
69 /*
70  * Open pg_largeobject and its index, if not already done in current xact
71  */
72 static void
open_lo_relation(void)73 open_lo_relation(void)
74 {
75 	ResourceOwner currentOwner;
76 
77 	if (lo_heap_r && lo_index_r)
78 		return;					/* already open in current xact */
79 
80 	/* Arrange for the top xact to own these relation references */
81 	currentOwner = CurrentResourceOwner;
82 	CurrentResourceOwner = TopTransactionResourceOwner;
83 
84 	/* Use RowExclusiveLock since we might either read or write */
85 	if (lo_heap_r == NULL)
86 		lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87 	if (lo_index_r == NULL)
88 		lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89 
90 	CurrentResourceOwner = currentOwner;
91 }
92 
93 /*
94  * Clean up at main transaction end
95  */
96 void
close_lo_relation(bool isCommit)97 close_lo_relation(bool isCommit)
98 {
99 	if (lo_heap_r || lo_index_r)
100 	{
101 		/*
102 		 * Only bother to close if committing; else abort cleanup will handle
103 		 * it
104 		 */
105 		if (isCommit)
106 		{
107 			ResourceOwner currentOwner;
108 
109 			currentOwner = CurrentResourceOwner;
110 			CurrentResourceOwner = TopTransactionResourceOwner;
111 
112 			if (lo_index_r)
113 				index_close(lo_index_r, NoLock);
114 			if (lo_heap_r)
115 				table_close(lo_heap_r, NoLock);
116 
117 			CurrentResourceOwner = currentOwner;
118 		}
119 		lo_heap_r = NULL;
120 		lo_index_r = NULL;
121 	}
122 }
123 
124 
125 /*
126  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
127  * read with can be specified.
128  */
129 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)130 myLargeObjectExists(Oid loid, Snapshot snapshot)
131 {
132 	Relation	pg_lo_meta;
133 	ScanKeyData skey[1];
134 	SysScanDesc sd;
135 	HeapTuple	tuple;
136 	bool		retval = false;
137 
138 	ScanKeyInit(&skey[0],
139 				Anum_pg_largeobject_metadata_oid,
140 				BTEqualStrategyNumber, F_OIDEQ,
141 				ObjectIdGetDatum(loid));
142 
143 	pg_lo_meta = table_open(LargeObjectMetadataRelationId,
144 							AccessShareLock);
145 
146 	sd = systable_beginscan(pg_lo_meta,
147 							LargeObjectMetadataOidIndexId, true,
148 							snapshot, 1, skey);
149 
150 	tuple = systable_getnext(sd);
151 	if (HeapTupleIsValid(tuple))
152 		retval = true;
153 
154 	systable_endscan(sd);
155 
156 	table_close(pg_lo_meta, AccessShareLock);
157 
158 	return retval;
159 }
160 
161 
162 /*
163  * Extract data field from a pg_largeobject tuple, detoasting if needed
164  * and verifying that the length is sane.  Returns data pointer (a bytea *),
165  * data length, and an indication of whether to pfree the data pointer.
166  */
167 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)168 getdatafield(Form_pg_largeobject tuple,
169 			 bytea **pdatafield,
170 			 int *plen,
171 			 bool *pfreeit)
172 {
173 	bytea	   *datafield;
174 	int			len;
175 	bool		freeit;
176 
177 	datafield = &(tuple->data); /* see note at top of file */
178 	freeit = false;
179 	if (VARATT_IS_EXTENDED(datafield))
180 	{
181 		datafield = (bytea *)
182 			heap_tuple_untoast_attr((struct varlena *) datafield);
183 		freeit = true;
184 	}
185 	len = VARSIZE(datafield) - VARHDRSZ;
186 	if (len < 0 || len > LOBLKSIZE)
187 		ereport(ERROR,
188 				(errcode(ERRCODE_DATA_CORRUPTED),
189 				 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
190 						tuple->loid, tuple->pageno, len)));
191 	*pdatafield = datafield;
192 	*plen = len;
193 	*pfreeit = freeit;
194 }
195 
196 
197 /*
198  *	inv_create -- create a new large object
199  *
200  *	Arguments:
201  *	  lobjId - OID to use for new large object, or InvalidOid to pick one
202  *
203  *	Returns:
204  *	  OID of new object
205  *
206  * If lobjId is not InvalidOid, then an error occurs if the OID is already
207  * in use.
208  */
209 Oid
inv_create(Oid lobjId)210 inv_create(Oid lobjId)
211 {
212 	Oid			lobjId_new;
213 
214 	/*
215 	 * Create a new largeobject with empty data pages
216 	 */
217 	lobjId_new = LargeObjectCreate(lobjId);
218 
219 	/*
220 	 * dependency on the owner of largeobject
221 	 *
222 	 * The reason why we use LargeObjectRelationId instead of
223 	 * LargeObjectMetadataRelationId here is to provide backward compatibility
224 	 * to the applications which utilize a knowledge about internal layout of
225 	 * system catalogs. OID of pg_largeobject_metadata and loid of
226 	 * pg_largeobject are same value, so there are no actual differences here.
227 	 */
228 	recordDependencyOnOwner(LargeObjectRelationId,
229 							lobjId_new, GetUserId());
230 
231 	/* Post creation hook for new large object */
232 	InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233 
234 	/*
235 	 * Advance command counter to make new tuple visible to later operations.
236 	 */
237 	CommandCounterIncrement();
238 
239 	return lobjId_new;
240 }
241 
242 /*
243  *	inv_open -- access an existing large object.
244  *
245  * Returns a large object descriptor, appropriately filled in.
246  * The descriptor and subsidiary data are allocated in the specified
247  * memory context, which must be suitably long-lived for the caller's
248  * purposes.  If the returned descriptor has a snapshot associated
249  * with it, the caller must ensure that it also lives long enough,
250  * e.g. by calling RegisterSnapshotOnOwner
251  */
252 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)253 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
254 {
255 	LargeObjectDesc *retval;
256 	Snapshot	snapshot = NULL;
257 	int			descflags = 0;
258 
259 	/*
260 	 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
261 	 * | INV_READ), the caller being allowed to read the large object
262 	 * descriptor in either case.
263 	 */
264 	if (flags & INV_WRITE)
265 		descflags |= IFS_WRLOCK | IFS_RDLOCK;
266 	if (flags & INV_READ)
267 		descflags |= IFS_RDLOCK;
268 
269 	if (descflags == 0)
270 		ereport(ERROR,
271 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272 				 errmsg("invalid flags for opening a large object: %d",
273 						flags)));
274 
275 	/* Get snapshot.  If write is requested, use an instantaneous snapshot. */
276 	if (descflags & IFS_WRLOCK)
277 		snapshot = NULL;
278 	else
279 		snapshot = GetActiveSnapshot();
280 
281 	/* Can't use LargeObjectExists here because we need to specify snapshot */
282 	if (!myLargeObjectExists(lobjId, snapshot))
283 		ereport(ERROR,
284 				(errcode(ERRCODE_UNDEFINED_OBJECT),
285 				 errmsg("large object %u does not exist", lobjId)));
286 
287 	/* Apply permission checks, again specifying snapshot */
288 	if ((descflags & IFS_RDLOCK) != 0)
289 	{
290 		if (!lo_compat_privileges &&
291 			pg_largeobject_aclcheck_snapshot(lobjId,
292 											 GetUserId(),
293 											 ACL_SELECT,
294 											 snapshot) != ACLCHECK_OK)
295 			ereport(ERROR,
296 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297 					 errmsg("permission denied for large object %u",
298 							lobjId)));
299 	}
300 	if ((descflags & IFS_WRLOCK) != 0)
301 	{
302 		if (!lo_compat_privileges &&
303 			pg_largeobject_aclcheck_snapshot(lobjId,
304 											 GetUserId(),
305 											 ACL_UPDATE,
306 											 snapshot) != ACLCHECK_OK)
307 			ereport(ERROR,
308 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309 					 errmsg("permission denied for large object %u",
310 							lobjId)));
311 	}
312 
313 	/* OK to create a descriptor */
314 	retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
315 													sizeof(LargeObjectDesc));
316 	retval->id = lobjId;
317 	retval->offset = 0;
318 	retval->flags = descflags;
319 
320 	/* caller sets if needed, not used by the functions in this file */
321 	retval->subid = InvalidSubTransactionId;
322 
323 	/*
324 	 * The snapshot (if any) is just the currently active snapshot.  The
325 	 * caller will replace it with a longer-lived copy if needed.
326 	 */
327 	retval->snapshot = snapshot;
328 
329 	return retval;
330 }
331 
332 /*
333  * Closes a large object descriptor previously made by inv_open(), and
334  * releases the long-term memory used by it.
335  */
336 void
inv_close(LargeObjectDesc * obj_desc)337 inv_close(LargeObjectDesc *obj_desc)
338 {
339 	Assert(PointerIsValid(obj_desc));
340 	pfree(obj_desc);
341 }
342 
343 /*
344  * Destroys an existing large object (not to be confused with a descriptor!)
345  *
346  * Note we expect caller to have done any required permissions check.
347  */
348 int
inv_drop(Oid lobjId)349 inv_drop(Oid lobjId)
350 {
351 	ObjectAddress object;
352 
353 	/*
354 	 * Delete any comments and dependencies on the large object
355 	 */
356 	object.classId = LargeObjectRelationId;
357 	object.objectId = lobjId;
358 	object.objectSubId = 0;
359 	performDeletion(&object, DROP_CASCADE, 0);
360 
361 	/*
362 	 * Advance command counter so that tuple removal will be seen by later
363 	 * large-object operations in this transaction.
364 	 */
365 	CommandCounterIncrement();
366 
367 	/* For historical reasons, we always return 1 on success. */
368 	return 1;
369 }
370 
371 /*
372  * Determine size of a large object
373  *
374  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
375  * the offset of the last byte + 1.
376  */
377 static uint64
inv_getsize(LargeObjectDesc * obj_desc)378 inv_getsize(LargeObjectDesc *obj_desc)
379 {
380 	uint64		lastbyte = 0;
381 	ScanKeyData skey[1];
382 	SysScanDesc sd;
383 	HeapTuple	tuple;
384 
385 	Assert(PointerIsValid(obj_desc));
386 
387 	open_lo_relation();
388 
389 	ScanKeyInit(&skey[0],
390 				Anum_pg_largeobject_loid,
391 				BTEqualStrategyNumber, F_OIDEQ,
392 				ObjectIdGetDatum(obj_desc->id));
393 
394 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
395 									obj_desc->snapshot, 1, skey);
396 
397 	/*
398 	 * Because the pg_largeobject index is on both loid and pageno, but we
399 	 * constrain only loid, a backwards scan should visit all pages of the
400 	 * large object in reverse pageno order.  So, it's sufficient to examine
401 	 * the first valid tuple (== last valid page).
402 	 */
403 	tuple = systable_getnext_ordered(sd, BackwardScanDirection);
404 	if (HeapTupleIsValid(tuple))
405 	{
406 		Form_pg_largeobject data;
407 		bytea	   *datafield;
408 		int			len;
409 		bool		pfreeit;
410 
411 		if (HeapTupleHasNulls(tuple))	/* paranoia */
412 			elog(ERROR, "null field found in pg_largeobject");
413 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
414 		getdatafield(data, &datafield, &len, &pfreeit);
415 		lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
416 		if (pfreeit)
417 			pfree(datafield);
418 	}
419 
420 	systable_endscan_ordered(sd);
421 
422 	return lastbyte;
423 }
424 
425 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)426 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
427 {
428 	int64		newoffset;
429 
430 	Assert(PointerIsValid(obj_desc));
431 
432 	/*
433 	 * We allow seek/tell if you have either read or write permission, so no
434 	 * need for a permission check here.
435 	 */
436 
437 	/*
438 	 * Note: overflow in the additions is possible, but since we will reject
439 	 * negative results, we don't need any extra test for that.
440 	 */
441 	switch (whence)
442 	{
443 		case SEEK_SET:
444 			newoffset = offset;
445 			break;
446 		case SEEK_CUR:
447 			newoffset = obj_desc->offset + offset;
448 			break;
449 		case SEEK_END:
450 			newoffset = inv_getsize(obj_desc) + offset;
451 			break;
452 		default:
453 			ereport(ERROR,
454 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455 					 errmsg("invalid whence setting: %d", whence)));
456 			newoffset = 0;		/* keep compiler quiet */
457 			break;
458 	}
459 
460 	/*
461 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
462 	 * in translatable strings; doing better is not worth the trouble
463 	 */
464 	if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
465 		ereport(ERROR,
466 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
467 				 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
468 								 newoffset)));
469 
470 	obj_desc->offset = newoffset;
471 	return newoffset;
472 }
473 
474 int64
inv_tell(LargeObjectDesc * obj_desc)475 inv_tell(LargeObjectDesc *obj_desc)
476 {
477 	Assert(PointerIsValid(obj_desc));
478 
479 	/*
480 	 * We allow seek/tell if you have either read or write permission, so no
481 	 * need for a permission check here.
482 	 */
483 
484 	return obj_desc->offset;
485 }
486 
487 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)488 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
489 {
490 	int			nread = 0;
491 	int64		n;
492 	int64		off;
493 	int			len;
494 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
495 	uint64		pageoff;
496 	ScanKeyData skey[2];
497 	SysScanDesc sd;
498 	HeapTuple	tuple;
499 
500 	Assert(PointerIsValid(obj_desc));
501 	Assert(buf != NULL);
502 
503 	if ((obj_desc->flags & IFS_RDLOCK) == 0)
504 		ereport(ERROR,
505 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
506 				 errmsg("permission denied for large object %u",
507 						obj_desc->id)));
508 
509 	if (nbytes <= 0)
510 		return 0;
511 
512 	open_lo_relation();
513 
514 	ScanKeyInit(&skey[0],
515 				Anum_pg_largeobject_loid,
516 				BTEqualStrategyNumber, F_OIDEQ,
517 				ObjectIdGetDatum(obj_desc->id));
518 
519 	ScanKeyInit(&skey[1],
520 				Anum_pg_largeobject_pageno,
521 				BTGreaterEqualStrategyNumber, F_INT4GE,
522 				Int32GetDatum(pageno));
523 
524 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
525 									obj_desc->snapshot, 2, skey);
526 
527 	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
528 	{
529 		Form_pg_largeobject data;
530 		bytea	   *datafield;
531 		bool		pfreeit;
532 
533 		if (HeapTupleHasNulls(tuple))	/* paranoia */
534 			elog(ERROR, "null field found in pg_largeobject");
535 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
536 
537 		/*
538 		 * We expect the indexscan will deliver pages in order.  However,
539 		 * there may be missing pages if the LO contains unwritten "holes". We
540 		 * want missing sections to read out as zeroes.
541 		 */
542 		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
543 		if (pageoff > obj_desc->offset)
544 		{
545 			n = pageoff - obj_desc->offset;
546 			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
547 			MemSet(buf + nread, 0, n);
548 			nread += n;
549 			obj_desc->offset += n;
550 		}
551 
552 		if (nread < nbytes)
553 		{
554 			Assert(obj_desc->offset >= pageoff);
555 			off = (int) (obj_desc->offset - pageoff);
556 			Assert(off >= 0 && off < LOBLKSIZE);
557 
558 			getdatafield(data, &datafield, &len, &pfreeit);
559 			if (len > off)
560 			{
561 				n = len - off;
562 				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
563 				memcpy(buf + nread, VARDATA(datafield) + off, n);
564 				nread += n;
565 				obj_desc->offset += n;
566 			}
567 			if (pfreeit)
568 				pfree(datafield);
569 		}
570 
571 		if (nread >= nbytes)
572 			break;
573 	}
574 
575 	systable_endscan_ordered(sd);
576 
577 	return nread;
578 }
579 
580 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)581 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
582 {
583 	int			nwritten = 0;
584 	int			n;
585 	int			off;
586 	int			len;
587 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
588 	ScanKeyData skey[2];
589 	SysScanDesc sd;
590 	HeapTuple	oldtuple;
591 	Form_pg_largeobject olddata;
592 	bool		neednextpage;
593 	bytea	   *datafield;
594 	bool		pfreeit;
595 	union
596 	{
597 		bytea		hdr;
598 		/* this is to make the union big enough for a LO data chunk: */
599 		char		data[LOBLKSIZE + VARHDRSZ];
600 		/* ensure union is aligned well enough: */
601 		int32		align_it;
602 	}			workbuf;
603 	char	   *workb = VARDATA(&workbuf.hdr);
604 	HeapTuple	newtup;
605 	Datum		values[Natts_pg_largeobject];
606 	bool		nulls[Natts_pg_largeobject];
607 	bool		replace[Natts_pg_largeobject];
608 	CatalogIndexState indstate;
609 
610 	Assert(PointerIsValid(obj_desc));
611 	Assert(buf != NULL);
612 
613 	/* enforce writability because snapshot is probably wrong otherwise */
614 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
615 		ereport(ERROR,
616 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617 				 errmsg("permission denied for large object %u",
618 						obj_desc->id)));
619 
620 	if (nbytes <= 0)
621 		return 0;
622 
623 	/* this addition can't overflow because nbytes is only int32 */
624 	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
625 		ereport(ERROR,
626 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627 				 errmsg("invalid large object write request size: %d",
628 						nbytes)));
629 
630 	open_lo_relation();
631 
632 	indstate = CatalogOpenIndexes(lo_heap_r);
633 
634 	ScanKeyInit(&skey[0],
635 				Anum_pg_largeobject_loid,
636 				BTEqualStrategyNumber, F_OIDEQ,
637 				ObjectIdGetDatum(obj_desc->id));
638 
639 	ScanKeyInit(&skey[1],
640 				Anum_pg_largeobject_pageno,
641 				BTGreaterEqualStrategyNumber, F_INT4GE,
642 				Int32GetDatum(pageno));
643 
644 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
645 									obj_desc->snapshot, 2, skey);
646 
647 	oldtuple = NULL;
648 	olddata = NULL;
649 	neednextpage = true;
650 
651 	while (nwritten < nbytes)
652 	{
653 		/*
654 		 * If possible, get next pre-existing page of the LO.  We expect the
655 		 * indexscan will deliver these in order --- but there may be holes.
656 		 */
657 		if (neednextpage)
658 		{
659 			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
660 			{
661 				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
662 					elog(ERROR, "null field found in pg_largeobject");
663 				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
664 				Assert(olddata->pageno >= pageno);
665 			}
666 			neednextpage = false;
667 		}
668 
669 		/*
670 		 * If we have a pre-existing page, see if it is the page we want to
671 		 * write, or a later one.
672 		 */
673 		if (olddata != NULL && olddata->pageno == pageno)
674 		{
675 			/*
676 			 * Update an existing page with fresh data.
677 			 *
678 			 * First, load old data into workbuf
679 			 */
680 			getdatafield(olddata, &datafield, &len, &pfreeit);
681 			memcpy(workb, VARDATA(datafield), len);
682 			if (pfreeit)
683 				pfree(datafield);
684 
685 			/*
686 			 * Fill any hole
687 			 */
688 			off = (int) (obj_desc->offset % LOBLKSIZE);
689 			if (off > len)
690 				MemSet(workb + len, 0, off - len);
691 
692 			/*
693 			 * Insert appropriate portion of new data
694 			 */
695 			n = LOBLKSIZE - off;
696 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
697 			memcpy(workb + off, buf + nwritten, n);
698 			nwritten += n;
699 			obj_desc->offset += n;
700 			off += n;
701 			/* compute valid length of new page */
702 			len = (len >= off) ? len : off;
703 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
704 
705 			/*
706 			 * Form and insert updated tuple
707 			 */
708 			memset(values, 0, sizeof(values));
709 			memset(nulls, false, sizeof(nulls));
710 			memset(replace, false, sizeof(replace));
711 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
712 			replace[Anum_pg_largeobject_data - 1] = true;
713 			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
714 									   values, nulls, replace);
715 			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
716 									   indstate);
717 			heap_freetuple(newtup);
718 
719 			/*
720 			 * We're done with this old page.
721 			 */
722 			oldtuple = NULL;
723 			olddata = NULL;
724 			neednextpage = true;
725 		}
726 		else
727 		{
728 			/*
729 			 * Write a brand new page.
730 			 *
731 			 * First, fill any hole
732 			 */
733 			off = (int) (obj_desc->offset % LOBLKSIZE);
734 			if (off > 0)
735 				MemSet(workb, 0, off);
736 
737 			/*
738 			 * Insert appropriate portion of new data
739 			 */
740 			n = LOBLKSIZE - off;
741 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
742 			memcpy(workb + off, buf + nwritten, n);
743 			nwritten += n;
744 			obj_desc->offset += n;
745 			/* compute valid length of new page */
746 			len = off + n;
747 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
748 
749 			/*
750 			 * Form and insert updated tuple
751 			 */
752 			memset(values, 0, sizeof(values));
753 			memset(nulls, false, sizeof(nulls));
754 			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
755 			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
756 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
757 			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
758 			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
759 			heap_freetuple(newtup);
760 		}
761 		pageno++;
762 	}
763 
764 	systable_endscan_ordered(sd);
765 
766 	CatalogCloseIndexes(indstate);
767 
768 	/*
769 	 * Advance command counter so that my tuple updates will be seen by later
770 	 * large-object operations in this transaction.
771 	 */
772 	CommandCounterIncrement();
773 
774 	return nwritten;
775 }
776 
777 void
inv_truncate(LargeObjectDesc * obj_desc,int64 len)778 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
779 {
780 	int32		pageno = (int32) (len / LOBLKSIZE);
781 	int32		off;
782 	ScanKeyData skey[2];
783 	SysScanDesc sd;
784 	HeapTuple	oldtuple;
785 	Form_pg_largeobject olddata;
786 	union
787 	{
788 		bytea		hdr;
789 		/* this is to make the union big enough for a LO data chunk: */
790 		char		data[LOBLKSIZE + VARHDRSZ];
791 		/* ensure union is aligned well enough: */
792 		int32		align_it;
793 	}			workbuf;
794 	char	   *workb = VARDATA(&workbuf.hdr);
795 	HeapTuple	newtup;
796 	Datum		values[Natts_pg_largeobject];
797 	bool		nulls[Natts_pg_largeobject];
798 	bool		replace[Natts_pg_largeobject];
799 	CatalogIndexState indstate;
800 
801 	Assert(PointerIsValid(obj_desc));
802 
803 	/* enforce writability because snapshot is probably wrong otherwise */
804 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
805 		ereport(ERROR,
806 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807 				 errmsg("permission denied for large object %u",
808 						obj_desc->id)));
809 
810 	/*
811 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
812 	 * in translatable strings; doing better is not worth the trouble
813 	 */
814 	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
815 		ereport(ERROR,
816 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
817 				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
818 								 len)));
819 
820 	open_lo_relation();
821 
822 	indstate = CatalogOpenIndexes(lo_heap_r);
823 
824 	/*
825 	 * Set up to find all pages with desired loid and pageno >= target
826 	 */
827 	ScanKeyInit(&skey[0],
828 				Anum_pg_largeobject_loid,
829 				BTEqualStrategyNumber, F_OIDEQ,
830 				ObjectIdGetDatum(obj_desc->id));
831 
832 	ScanKeyInit(&skey[1],
833 				Anum_pg_largeobject_pageno,
834 				BTGreaterEqualStrategyNumber, F_INT4GE,
835 				Int32GetDatum(pageno));
836 
837 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
838 									obj_desc->snapshot, 2, skey);
839 
840 	/*
841 	 * If possible, get the page the truncation point is in. The truncation
842 	 * point may be beyond the end of the LO or in a hole.
843 	 */
844 	olddata = NULL;
845 	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
846 	{
847 		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
848 			elog(ERROR, "null field found in pg_largeobject");
849 		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
850 		Assert(olddata->pageno >= pageno);
851 	}
852 
853 	/*
854 	 * If we found the page of the truncation point we need to truncate the
855 	 * data in it.  Otherwise if we're in a hole, we need to create a page to
856 	 * mark the end of data.
857 	 */
858 	if (olddata != NULL && olddata->pageno == pageno)
859 	{
860 		/* First, load old data into workbuf */
861 		bytea	   *datafield;
862 		int			pagelen;
863 		bool		pfreeit;
864 
865 		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
866 		memcpy(workb, VARDATA(datafield), pagelen);
867 		if (pfreeit)
868 			pfree(datafield);
869 
870 		/*
871 		 * Fill any hole
872 		 */
873 		off = len % LOBLKSIZE;
874 		if (off > pagelen)
875 			MemSet(workb + pagelen, 0, off - pagelen);
876 
877 		/* compute length of new page */
878 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
879 
880 		/*
881 		 * Form and insert updated tuple
882 		 */
883 		memset(values, 0, sizeof(values));
884 		memset(nulls, false, sizeof(nulls));
885 		memset(replace, false, sizeof(replace));
886 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
887 		replace[Anum_pg_largeobject_data - 1] = true;
888 		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
889 								   values, nulls, replace);
890 		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
891 								   indstate);
892 		heap_freetuple(newtup);
893 	}
894 	else
895 	{
896 		/*
897 		 * If the first page we found was after the truncation point, we're in
898 		 * a hole that we'll fill, but we need to delete the later page
899 		 * because the loop below won't visit it again.
900 		 */
901 		if (olddata != NULL)
902 		{
903 			Assert(olddata->pageno > pageno);
904 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
905 		}
906 
907 		/*
908 		 * Write a brand new page.
909 		 *
910 		 * Fill the hole up to the truncation point
911 		 */
912 		off = len % LOBLKSIZE;
913 		if (off > 0)
914 			MemSet(workb, 0, off);
915 
916 		/* compute length of new page */
917 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
918 
919 		/*
920 		 * Form and insert new tuple
921 		 */
922 		memset(values, 0, sizeof(values));
923 		memset(nulls, false, sizeof(nulls));
924 		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
925 		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
926 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
927 		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
928 		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
929 		heap_freetuple(newtup);
930 	}
931 
932 	/*
933 	 * Delete any pages after the truncation point.  If the initial search
934 	 * didn't find a page, then of course there's nothing more to do.
935 	 */
936 	if (olddata != NULL)
937 	{
938 		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
939 		{
940 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
941 		}
942 	}
943 
944 	systable_endscan_ordered(sd);
945 
946 	CatalogCloseIndexes(indstate);
947 
948 	/*
949 	 * Advance command counter so that tuple updates will be seen by later
950 	 * large-object operations in this transaction.
951 	 */
952 	CommandCounterIncrement();
953 }
954