1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  *	  routines for manipulating inversion fs large objects. This file
5  *	  contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format.  We have to detoast it since it's
12  * quite likely to be in compressed or short format.  We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code.  We expect that CurrentMemoryContext will
17  * be a short-lived context.  Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  *	  src/backend/storage/large_object/inv_api.c
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32 
33 #include <limits.h>
34 
35 #include "access/detoast.h"
36 #include "access/genam.h"
37 #include "access/htup_details.h"
38 #include "access/sysattr.h"
39 #include "access/table.h"
40 #include "access/xact.h"
41 #include "catalog/dependency.h"
42 #include "catalog/indexing.h"
43 #include "catalog/objectaccess.h"
44 #include "catalog/pg_largeobject.h"
45 #include "catalog/pg_largeobject_metadata.h"
46 #include "libpq/libpq-fs.h"
47 #include "miscadmin.h"
48 #include "storage/large_object.h"
49 #include "utils/acl.h"
50 #include "utils/fmgroids.h"
51 #include "utils/rel.h"
52 #include "utils/snapmgr.h"
53 
54 
55 /*
56  * GUC: backwards-compatibility flag; when true, inv_open() skips its ACL checks
57  */
58 bool		lo_compat_privileges;
59 
60 /*
61  * All accesses to pg_largeobject and its index go through the two cached
62  * Relation references below, so each is opened only once per transaction.
63  * To avoid problems when the first such reference occurs inside a
64  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
65  * the Relation references to TopTransactionResourceOwner.
66  */
67 static Relation lo_heap_r = NULL;
68 static Relation lo_index_r = NULL;
69 
70 
71 /*
72  * Open pg_largeobject and its index, if not already done in current xact
73  */
74 static void
open_lo_relation(void)75 open_lo_relation(void)
76 {
77 	ResourceOwner currentOwner;
78 
79 	if (lo_heap_r && lo_index_r)
80 		return;					/* already open in current xact */
81 
82 	/* Arrange for the top xact to own these relation references */
83 	currentOwner = CurrentResourceOwner;
84 	CurrentResourceOwner = TopTransactionResourceOwner;
85 
86 	/* Use RowExclusiveLock since we might either read or write */
87 	if (lo_heap_r == NULL)
88 		lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
89 	if (lo_index_r == NULL)
90 		lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
91 
92 	CurrentResourceOwner = currentOwner;
93 }
94 
95 /*
96  * Clean up at main transaction end
97  */
98 void
close_lo_relation(bool isCommit)99 close_lo_relation(bool isCommit)
100 {
101 	if (lo_heap_r || lo_index_r)
102 	{
103 		/*
104 		 * Only bother to close if committing; else abort cleanup will handle
105 		 * it
106 		 */
107 		if (isCommit)
108 		{
109 			ResourceOwner currentOwner;
110 
111 			currentOwner = CurrentResourceOwner;
112 			CurrentResourceOwner = TopTransactionResourceOwner;
113 
114 			if (lo_index_r)
115 				index_close(lo_index_r, NoLock);
116 			if (lo_heap_r)
117 				table_close(lo_heap_r, NoLock);
118 
119 			CurrentResourceOwner = currentOwner;
120 		}
121 		lo_heap_r = NULL;
122 		lo_index_r = NULL;
123 	}
124 }
125 
126 
127 /*
128  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
129  * read with can be specified.
130  */
131 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)132 myLargeObjectExists(Oid loid, Snapshot snapshot)
133 {
134 	Relation	pg_lo_meta;
135 	ScanKeyData skey[1];
136 	SysScanDesc sd;
137 	HeapTuple	tuple;
138 	bool		retval = false;
139 
140 	ScanKeyInit(&skey[0],
141 				Anum_pg_largeobject_metadata_oid,
142 				BTEqualStrategyNumber, F_OIDEQ,
143 				ObjectIdGetDatum(loid));
144 
145 	pg_lo_meta = table_open(LargeObjectMetadataRelationId,
146 							AccessShareLock);
147 
148 	sd = systable_beginscan(pg_lo_meta,
149 							LargeObjectMetadataOidIndexId, true,
150 							snapshot, 1, skey);
151 
152 	tuple = systable_getnext(sd);
153 	if (HeapTupleIsValid(tuple))
154 		retval = true;
155 
156 	systable_endscan(sd);
157 
158 	table_close(pg_lo_meta, AccessShareLock);
159 
160 	return retval;
161 }
162 
163 
164 /*
165  * Extract data field from a pg_largeobject tuple, detoasting if needed
166  * and verifying that the length is sane.  Returns data pointer (a bytea *),
167  * data length, and an indication of whether to pfree the data pointer.
168  */
169 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)170 getdatafield(Form_pg_largeobject tuple,
171 			 bytea **pdatafield,
172 			 int *plen,
173 			 bool *pfreeit)
174 {
175 	bytea	   *datafield;
176 	int			len;
177 	bool		freeit;
178 
179 	datafield = &(tuple->data); /* see note at top of file */
180 	freeit = false;
181 	if (VARATT_IS_EXTENDED(datafield))
182 	{
183 		datafield = (bytea *)
184 			detoast_attr((struct varlena *) datafield);
185 		freeit = true;
186 	}
187 	len = VARSIZE(datafield) - VARHDRSZ;
188 	if (len < 0 || len > LOBLKSIZE)
189 		ereport(ERROR,
190 				(errcode(ERRCODE_DATA_CORRUPTED),
191 				 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
192 						tuple->loid, tuple->pageno, len)));
193 	*pdatafield = datafield;
194 	*plen = len;
195 	*pfreeit = freeit;
196 }
197 
198 
199 /*
200  *	inv_create -- create a new large object
201  *
202  *	Arguments:
203  *	  lobjId - OID to use for new large object, or InvalidOid to pick one
204  *
205  *	Returns:
206  *	  OID of new object
207  *
208  * If lobjId is not InvalidOid, then an error occurs if the OID is already
209  * in use.
210  */
211 Oid
inv_create(Oid lobjId)212 inv_create(Oid lobjId)
213 {
214 	Oid			lobjId_new;
215 
216 	/*
217 	 * Create a new largeobject with empty data pages
218 	 */
219 	lobjId_new = LargeObjectCreate(lobjId);
220 
221 	/*
222 	 * dependency on the owner of largeobject
223 	 *
224 	 * The reason why we use LargeObjectRelationId instead of
225 	 * LargeObjectMetadataRelationId here is to provide backward compatibility
226 	 * to the applications which utilize a knowledge about internal layout of
227 	 * system catalogs. OID of pg_largeobject_metadata and loid of
228 	 * pg_largeobject are same value, so there are no actual differences here.
229 	 */
230 	recordDependencyOnOwner(LargeObjectRelationId,
231 							lobjId_new, GetUserId());
232 
233 	/* Post creation hook for new large object */
234 	InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
235 
236 	/*
237 	 * Advance command counter to make new tuple visible to later operations.
238 	 */
239 	CommandCounterIncrement();
240 
241 	return lobjId_new;
242 }
243 
244 /*
245  *	inv_open -- access an existing large object.
246  *
247  * Returns a large object descriptor, appropriately filled in.
248  * The descriptor and subsidiary data are allocated in the specified
249  * memory context, which must be suitably long-lived for the caller's
250  * purposes.  If the returned descriptor has a snapshot associated
251  * with it, the caller must ensure that it also lives long enough,
252  * e.g. by calling RegisterSnapshotOnOwner
253  */
254 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)255 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
256 {
257 	LargeObjectDesc *retval;
258 	Snapshot	snapshot = NULL;
259 	int			descflags = 0;
260 
261 	/*
262 	 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
263 	 * | INV_READ), the caller being allowed to read the large object
264 	 * descriptor in either case.
265 	 */
266 	if (flags & INV_WRITE)
267 		descflags |= IFS_WRLOCK | IFS_RDLOCK;
268 	if (flags & INV_READ)
269 		descflags |= IFS_RDLOCK;
270 
271 	if (descflags == 0)
272 		ereport(ERROR,
273 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
274 				 errmsg("invalid flags for opening a large object: %d",
275 						flags)));
276 
277 	/* Get snapshot.  If write is requested, use an instantaneous snapshot. */
278 	if (descflags & IFS_WRLOCK)
279 		snapshot = NULL;
280 	else
281 		snapshot = GetActiveSnapshot();
282 
283 	/* Can't use LargeObjectExists here because we need to specify snapshot */
284 	if (!myLargeObjectExists(lobjId, snapshot))
285 		ereport(ERROR,
286 				(errcode(ERRCODE_UNDEFINED_OBJECT),
287 				 errmsg("large object %u does not exist", lobjId)));
288 
289 	/* Apply permission checks, again specifying snapshot */
290 	if ((descflags & IFS_RDLOCK) != 0)
291 	{
292 		if (!lo_compat_privileges &&
293 			pg_largeobject_aclcheck_snapshot(lobjId,
294 											 GetUserId(),
295 											 ACL_SELECT,
296 											 snapshot) != ACLCHECK_OK)
297 			ereport(ERROR,
298 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
299 					 errmsg("permission denied for large object %u",
300 							lobjId)));
301 	}
302 	if ((descflags & IFS_WRLOCK) != 0)
303 	{
304 		if (!lo_compat_privileges &&
305 			pg_largeobject_aclcheck_snapshot(lobjId,
306 											 GetUserId(),
307 											 ACL_UPDATE,
308 											 snapshot) != ACLCHECK_OK)
309 			ereport(ERROR,
310 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311 					 errmsg("permission denied for large object %u",
312 							lobjId)));
313 	}
314 
315 	/* OK to create a descriptor */
316 	retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
317 													sizeof(LargeObjectDesc));
318 	retval->id = lobjId;
319 	retval->offset = 0;
320 	retval->flags = descflags;
321 
322 	/* caller sets if needed, not used by the functions in this file */
323 	retval->subid = InvalidSubTransactionId;
324 
325 	/*
326 	 * The snapshot (if any) is just the currently active snapshot.  The
327 	 * caller will replace it with a longer-lived copy if needed.
328 	 */
329 	retval->snapshot = snapshot;
330 
331 	return retval;
332 }
333 
334 /*
335  * Closes a large object descriptor previously made by inv_open(), and
336  * releases the long-term memory used by it.
337  */
338 void
inv_close(LargeObjectDesc * obj_desc)339 inv_close(LargeObjectDesc *obj_desc)
340 {
341 	Assert(PointerIsValid(obj_desc));
342 	pfree(obj_desc);
343 }
344 
345 /*
346  * Destroys an existing large object (not to be confused with a descriptor!)
347  *
348  * Note we expect caller to have done any required permissions check.
349  */
350 int
inv_drop(Oid lobjId)351 inv_drop(Oid lobjId)
352 {
353 	ObjectAddress object;
354 
355 	/*
356 	 * Delete any comments and dependencies on the large object
357 	 */
358 	object.classId = LargeObjectRelationId;
359 	object.objectId = lobjId;
360 	object.objectSubId = 0;
361 	performDeletion(&object, DROP_CASCADE, 0);
362 
363 	/*
364 	 * Advance command counter so that tuple removal will be seen by later
365 	 * large-object operations in this transaction.
366 	 */
367 	CommandCounterIncrement();
368 
369 	/* For historical reasons, we always return 1 on success. */
370 	return 1;
371 }
372 
373 /*
374  * Determine size of a large object
375  *
376  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
377  * the offset of the last byte + 1.
378  */
379 static uint64
inv_getsize(LargeObjectDesc * obj_desc)380 inv_getsize(LargeObjectDesc *obj_desc)
381 {
382 	uint64		lastbyte = 0;
383 	ScanKeyData skey[1];
384 	SysScanDesc sd;
385 	HeapTuple	tuple;
386 
387 	Assert(PointerIsValid(obj_desc));
388 
389 	open_lo_relation();
390 
391 	ScanKeyInit(&skey[0],
392 				Anum_pg_largeobject_loid,
393 				BTEqualStrategyNumber, F_OIDEQ,
394 				ObjectIdGetDatum(obj_desc->id));
395 
396 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
397 									obj_desc->snapshot, 1, skey);
398 
399 	/*
400 	 * Because the pg_largeobject index is on both loid and pageno, but we
401 	 * constrain only loid, a backwards scan should visit all pages of the
402 	 * large object in reverse pageno order.  So, it's sufficient to examine
403 	 * the first valid tuple (== last valid page).
404 	 */
405 	tuple = systable_getnext_ordered(sd, BackwardScanDirection);
406 	if (HeapTupleIsValid(tuple))
407 	{
408 		Form_pg_largeobject data;
409 		bytea	   *datafield;
410 		int			len;
411 		bool		pfreeit;
412 
413 		if (HeapTupleHasNulls(tuple))	/* paranoia */
414 			elog(ERROR, "null field found in pg_largeobject");
415 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
416 		getdatafield(data, &datafield, &len, &pfreeit);
417 		lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
418 		if (pfreeit)
419 			pfree(datafield);
420 	}
421 
422 	systable_endscan_ordered(sd);
423 
424 	return lastbyte;
425 }
426 
427 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)428 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
429 {
430 	int64		newoffset;
431 
432 	Assert(PointerIsValid(obj_desc));
433 
434 	/*
435 	 * We allow seek/tell if you have either read or write permission, so no
436 	 * need for a permission check here.
437 	 */
438 
439 	/*
440 	 * Note: overflow in the additions is possible, but since we will reject
441 	 * negative results, we don't need any extra test for that.
442 	 */
443 	switch (whence)
444 	{
445 		case SEEK_SET:
446 			newoffset = offset;
447 			break;
448 		case SEEK_CUR:
449 			newoffset = obj_desc->offset + offset;
450 			break;
451 		case SEEK_END:
452 			newoffset = inv_getsize(obj_desc) + offset;
453 			break;
454 		default:
455 			ereport(ERROR,
456 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
457 					 errmsg("invalid whence setting: %d", whence)));
458 			newoffset = 0;		/* keep compiler quiet */
459 			break;
460 	}
461 
462 	/*
463 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
464 	 * in translatable strings; doing better is not worth the trouble
465 	 */
466 	if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
467 		ereport(ERROR,
468 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 				 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
470 								 newoffset)));
471 
472 	obj_desc->offset = newoffset;
473 	return newoffset;
474 }
475 
476 int64
inv_tell(LargeObjectDesc * obj_desc)477 inv_tell(LargeObjectDesc *obj_desc)
478 {
479 	Assert(PointerIsValid(obj_desc));
480 
481 	/*
482 	 * We allow seek/tell if you have either read or write permission, so no
483 	 * need for a permission check here.
484 	 */
485 
486 	return obj_desc->offset;
487 }
488 
489 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)490 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
491 {
492 	int			nread = 0;
493 	int64		n;
494 	int64		off;
495 	int			len;
496 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
497 	uint64		pageoff;
498 	ScanKeyData skey[2];
499 	SysScanDesc sd;
500 	HeapTuple	tuple;
501 
502 	Assert(PointerIsValid(obj_desc));
503 	Assert(buf != NULL);
504 
505 	if ((obj_desc->flags & IFS_RDLOCK) == 0)
506 		ereport(ERROR,
507 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
508 				 errmsg("permission denied for large object %u",
509 						obj_desc->id)));
510 
511 	if (nbytes <= 0)
512 		return 0;
513 
514 	open_lo_relation();
515 
516 	ScanKeyInit(&skey[0],
517 				Anum_pg_largeobject_loid,
518 				BTEqualStrategyNumber, F_OIDEQ,
519 				ObjectIdGetDatum(obj_desc->id));
520 
521 	ScanKeyInit(&skey[1],
522 				Anum_pg_largeobject_pageno,
523 				BTGreaterEqualStrategyNumber, F_INT4GE,
524 				Int32GetDatum(pageno));
525 
526 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
527 									obj_desc->snapshot, 2, skey);
528 
529 	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
530 	{
531 		Form_pg_largeobject data;
532 		bytea	   *datafield;
533 		bool		pfreeit;
534 
535 		if (HeapTupleHasNulls(tuple))	/* paranoia */
536 			elog(ERROR, "null field found in pg_largeobject");
537 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
538 
539 		/*
540 		 * We expect the indexscan will deliver pages in order.  However,
541 		 * there may be missing pages if the LO contains unwritten "holes". We
542 		 * want missing sections to read out as zeroes.
543 		 */
544 		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
545 		if (pageoff > obj_desc->offset)
546 		{
547 			n = pageoff - obj_desc->offset;
548 			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
549 			MemSet(buf + nread, 0, n);
550 			nread += n;
551 			obj_desc->offset += n;
552 		}
553 
554 		if (nread < nbytes)
555 		{
556 			Assert(obj_desc->offset >= pageoff);
557 			off = (int) (obj_desc->offset - pageoff);
558 			Assert(off >= 0 && off < LOBLKSIZE);
559 
560 			getdatafield(data, &datafield, &len, &pfreeit);
561 			if (len > off)
562 			{
563 				n = len - off;
564 				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
565 				memcpy(buf + nread, VARDATA(datafield) + off, n);
566 				nread += n;
567 				obj_desc->offset += n;
568 			}
569 			if (pfreeit)
570 				pfree(datafield);
571 		}
572 
573 		if (nread >= nbytes)
574 			break;
575 	}
576 
577 	systable_endscan_ordered(sd);
578 
579 	return nread;
580 }
581 
582 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)583 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
584 {
585 	int			nwritten = 0;
586 	int			n;
587 	int			off;
588 	int			len;
589 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
590 	ScanKeyData skey[2];
591 	SysScanDesc sd;
592 	HeapTuple	oldtuple;
593 	Form_pg_largeobject olddata;
594 	bool		neednextpage;
595 	bytea	   *datafield;
596 	bool		pfreeit;
597 	union
598 	{
599 		bytea		hdr;
600 		/* this is to make the union big enough for a LO data chunk: */
601 		char		data[LOBLKSIZE + VARHDRSZ];
602 		/* ensure union is aligned well enough: */
603 		int32		align_it;
604 	}			workbuf;
605 	char	   *workb = VARDATA(&workbuf.hdr);
606 	HeapTuple	newtup;
607 	Datum		values[Natts_pg_largeobject];
608 	bool		nulls[Natts_pg_largeobject];
609 	bool		replace[Natts_pg_largeobject];
610 	CatalogIndexState indstate;
611 
612 	Assert(PointerIsValid(obj_desc));
613 	Assert(buf != NULL);
614 
615 	/* enforce writability because snapshot is probably wrong otherwise */
616 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
617 		ereport(ERROR,
618 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
619 				 errmsg("permission denied for large object %u",
620 						obj_desc->id)));
621 
622 	if (nbytes <= 0)
623 		return 0;
624 
625 	/* this addition can't overflow because nbytes is only int32 */
626 	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
627 		ereport(ERROR,
628 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 				 errmsg("invalid large object write request size: %d",
630 						nbytes)));
631 
632 	open_lo_relation();
633 
634 	indstate = CatalogOpenIndexes(lo_heap_r);
635 
636 	ScanKeyInit(&skey[0],
637 				Anum_pg_largeobject_loid,
638 				BTEqualStrategyNumber, F_OIDEQ,
639 				ObjectIdGetDatum(obj_desc->id));
640 
641 	ScanKeyInit(&skey[1],
642 				Anum_pg_largeobject_pageno,
643 				BTGreaterEqualStrategyNumber, F_INT4GE,
644 				Int32GetDatum(pageno));
645 
646 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
647 									obj_desc->snapshot, 2, skey);
648 
649 	oldtuple = NULL;
650 	olddata = NULL;
651 	neednextpage = true;
652 
653 	while (nwritten < nbytes)
654 	{
655 		/*
656 		 * If possible, get next pre-existing page of the LO.  We expect the
657 		 * indexscan will deliver these in order --- but there may be holes.
658 		 */
659 		if (neednextpage)
660 		{
661 			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
662 			{
663 				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
664 					elog(ERROR, "null field found in pg_largeobject");
665 				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
666 				Assert(olddata->pageno >= pageno);
667 			}
668 			neednextpage = false;
669 		}
670 
671 		/*
672 		 * If we have a pre-existing page, see if it is the page we want to
673 		 * write, or a later one.
674 		 */
675 		if (olddata != NULL && olddata->pageno == pageno)
676 		{
677 			/*
678 			 * Update an existing page with fresh data.
679 			 *
680 			 * First, load old data into workbuf
681 			 */
682 			getdatafield(olddata, &datafield, &len, &pfreeit);
683 			memcpy(workb, VARDATA(datafield), len);
684 			if (pfreeit)
685 				pfree(datafield);
686 
687 			/*
688 			 * Fill any hole
689 			 */
690 			off = (int) (obj_desc->offset % LOBLKSIZE);
691 			if (off > len)
692 				MemSet(workb + len, 0, off - len);
693 
694 			/*
695 			 * Insert appropriate portion of new data
696 			 */
697 			n = LOBLKSIZE - off;
698 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
699 			memcpy(workb + off, buf + nwritten, n);
700 			nwritten += n;
701 			obj_desc->offset += n;
702 			off += n;
703 			/* compute valid length of new page */
704 			len = (len >= off) ? len : off;
705 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
706 
707 			/*
708 			 * Form and insert updated tuple
709 			 */
710 			memset(values, 0, sizeof(values));
711 			memset(nulls, false, sizeof(nulls));
712 			memset(replace, false, sizeof(replace));
713 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
714 			replace[Anum_pg_largeobject_data - 1] = true;
715 			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
716 									   values, nulls, replace);
717 			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
718 									   indstate);
719 			heap_freetuple(newtup);
720 
721 			/*
722 			 * We're done with this old page.
723 			 */
724 			oldtuple = NULL;
725 			olddata = NULL;
726 			neednextpage = true;
727 		}
728 		else
729 		{
730 			/*
731 			 * Write a brand new page.
732 			 *
733 			 * First, fill any hole
734 			 */
735 			off = (int) (obj_desc->offset % LOBLKSIZE);
736 			if (off > 0)
737 				MemSet(workb, 0, off);
738 
739 			/*
740 			 * Insert appropriate portion of new data
741 			 */
742 			n = LOBLKSIZE - off;
743 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
744 			memcpy(workb + off, buf + nwritten, n);
745 			nwritten += n;
746 			obj_desc->offset += n;
747 			/* compute valid length of new page */
748 			len = off + n;
749 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
750 
751 			/*
752 			 * Form and insert updated tuple
753 			 */
754 			memset(values, 0, sizeof(values));
755 			memset(nulls, false, sizeof(nulls));
756 			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
757 			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
758 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
759 			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
760 			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
761 			heap_freetuple(newtup);
762 		}
763 		pageno++;
764 	}
765 
766 	systable_endscan_ordered(sd);
767 
768 	CatalogCloseIndexes(indstate);
769 
770 	/*
771 	 * Advance command counter so that my tuple updates will be seen by later
772 	 * large-object operations in this transaction.
773 	 */
774 	CommandCounterIncrement();
775 
776 	return nwritten;
777 }
778 
779 void
inv_truncate(LargeObjectDesc * obj_desc,int64 len)780 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
781 {
782 	int32		pageno = (int32) (len / LOBLKSIZE);
783 	int32		off;
784 	ScanKeyData skey[2];
785 	SysScanDesc sd;
786 	HeapTuple	oldtuple;
787 	Form_pg_largeobject olddata;
788 	union
789 	{
790 		bytea		hdr;
791 		/* this is to make the union big enough for a LO data chunk: */
792 		char		data[LOBLKSIZE + VARHDRSZ];
793 		/* ensure union is aligned well enough: */
794 		int32		align_it;
795 	}			workbuf;
796 	char	   *workb = VARDATA(&workbuf.hdr);
797 	HeapTuple	newtup;
798 	Datum		values[Natts_pg_largeobject];
799 	bool		nulls[Natts_pg_largeobject];
800 	bool		replace[Natts_pg_largeobject];
801 	CatalogIndexState indstate;
802 
803 	Assert(PointerIsValid(obj_desc));
804 
805 	/* enforce writability because snapshot is probably wrong otherwise */
806 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
807 		ereport(ERROR,
808 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
809 				 errmsg("permission denied for large object %u",
810 						obj_desc->id)));
811 
812 	/*
813 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
814 	 * in translatable strings; doing better is not worth the trouble
815 	 */
816 	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
817 		ereport(ERROR,
818 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
820 								 len)));
821 
822 	open_lo_relation();
823 
824 	indstate = CatalogOpenIndexes(lo_heap_r);
825 
826 	/*
827 	 * Set up to find all pages with desired loid and pageno >= target
828 	 */
829 	ScanKeyInit(&skey[0],
830 				Anum_pg_largeobject_loid,
831 				BTEqualStrategyNumber, F_OIDEQ,
832 				ObjectIdGetDatum(obj_desc->id));
833 
834 	ScanKeyInit(&skey[1],
835 				Anum_pg_largeobject_pageno,
836 				BTGreaterEqualStrategyNumber, F_INT4GE,
837 				Int32GetDatum(pageno));
838 
839 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
840 									obj_desc->snapshot, 2, skey);
841 
842 	/*
843 	 * If possible, get the page the truncation point is in. The truncation
844 	 * point may be beyond the end of the LO or in a hole.
845 	 */
846 	olddata = NULL;
847 	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
848 	{
849 		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
850 			elog(ERROR, "null field found in pg_largeobject");
851 		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
852 		Assert(olddata->pageno >= pageno);
853 	}
854 
855 	/*
856 	 * If we found the page of the truncation point we need to truncate the
857 	 * data in it.  Otherwise if we're in a hole, we need to create a page to
858 	 * mark the end of data.
859 	 */
860 	if (olddata != NULL && olddata->pageno == pageno)
861 	{
862 		/* First, load old data into workbuf */
863 		bytea	   *datafield;
864 		int			pagelen;
865 		bool		pfreeit;
866 
867 		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
868 		memcpy(workb, VARDATA(datafield), pagelen);
869 		if (pfreeit)
870 			pfree(datafield);
871 
872 		/*
873 		 * Fill any hole
874 		 */
875 		off = len % LOBLKSIZE;
876 		if (off > pagelen)
877 			MemSet(workb + pagelen, 0, off - pagelen);
878 
879 		/* compute length of new page */
880 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
881 
882 		/*
883 		 * Form and insert updated tuple
884 		 */
885 		memset(values, 0, sizeof(values));
886 		memset(nulls, false, sizeof(nulls));
887 		memset(replace, false, sizeof(replace));
888 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
889 		replace[Anum_pg_largeobject_data - 1] = true;
890 		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
891 								   values, nulls, replace);
892 		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
893 								   indstate);
894 		heap_freetuple(newtup);
895 	}
896 	else
897 	{
898 		/*
899 		 * If the first page we found was after the truncation point, we're in
900 		 * a hole that we'll fill, but we need to delete the later page
901 		 * because the loop below won't visit it again.
902 		 */
903 		if (olddata != NULL)
904 		{
905 			Assert(olddata->pageno > pageno);
906 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
907 		}
908 
909 		/*
910 		 * Write a brand new page.
911 		 *
912 		 * Fill the hole up to the truncation point
913 		 */
914 		off = len % LOBLKSIZE;
915 		if (off > 0)
916 			MemSet(workb, 0, off);
917 
918 		/* compute length of new page */
919 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
920 
921 		/*
922 		 * Form and insert new tuple
923 		 */
924 		memset(values, 0, sizeof(values));
925 		memset(nulls, false, sizeof(nulls));
926 		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
927 		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
928 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
929 		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
930 		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
931 		heap_freetuple(newtup);
932 	}
933 
934 	/*
935 	 * Delete any pages after the truncation point.  If the initial search
936 	 * didn't find a page, then of course there's nothing more to do.
937 	 */
938 	if (olddata != NULL)
939 	{
940 		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
941 		{
942 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
943 		}
944 	}
945 
946 	systable_endscan_ordered(sd);
947 
948 	CatalogCloseIndexes(indstate);
949 
950 	/*
951 	 * Advance command counter so that tuple updates will be seen by later
952 	 * large-object operations in this transaction.
953 	 */
954 	CommandCounterIncrement();
955 }
956