1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  *	  routines for manipulating inversion fs large objects. This file
5  *	  contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format.  We have to detoast it since it's
12  * quite likely to be in compressed or short format.  We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code.  We expect that CurrentMemoryContext will
17  * be a short-lived context.  Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  *	  src/backend/storage/large_object/inv_api.c
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32 
33 #include <limits.h>
34 
35 #include "access/genam.h"
36 #include "access/heapam.h"
37 #include "access/sysattr.h"
38 #include "access/tuptoaster.h"
39 #include "access/xact.h"
40 #include "catalog/dependency.h"
41 #include "catalog/indexing.h"
42 #include "catalog/objectaccess.h"
43 #include "catalog/pg_largeobject.h"
44 #include "catalog/pg_largeobject_metadata.h"
45 #include "libpq/libpq-fs.h"
46 #include "miscadmin.h"
47 #include "storage/large_object.h"
48 #include "utils/fmgroids.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51 #include "utils/tqual.h"
52 
53 
54 /*
55  * GUC: backwards-compatibility flag to suppress LO permission checks
56  */
57 bool		lo_compat_privileges;
58 
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference each, so that we only need to open pg_largeobject (and its index)
 * once per transaction.  To avoid problems when the first such reference
 * occurs inside a subtransaction, we execute a slightly klugy maneuver to
 * assign ownership of the Relation references to TopTransactionResourceOwner.
 */
66 static Relation lo_heap_r = NULL;
67 static Relation lo_index_r = NULL;
68 
69 
70 /*
71  * Open pg_largeobject and its index, if not already done in current xact
72  */
73 static void
open_lo_relation(void)74 open_lo_relation(void)
75 {
76 	ResourceOwner currentOwner;
77 
78 	if (lo_heap_r && lo_index_r)
79 		return;					/* already open in current xact */
80 
81 	/* Arrange for the top xact to own these relation references */
82 	currentOwner = CurrentResourceOwner;
83 	CurrentResourceOwner = TopTransactionResourceOwner;
84 
85 	/* Use RowExclusiveLock since we might either read or write */
86 	if (lo_heap_r == NULL)
87 		lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
88 	if (lo_index_r == NULL)
89 		lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
90 
91 	CurrentResourceOwner = currentOwner;
92 }
93 
94 /*
95  * Clean up at main transaction end
96  */
97 void
close_lo_relation(bool isCommit)98 close_lo_relation(bool isCommit)
99 {
100 	if (lo_heap_r || lo_index_r)
101 	{
102 		/*
103 		 * Only bother to close if committing; else abort cleanup will handle
104 		 * it
105 		 */
106 		if (isCommit)
107 		{
108 			ResourceOwner currentOwner;
109 
110 			currentOwner = CurrentResourceOwner;
111 			CurrentResourceOwner = TopTransactionResourceOwner;
112 
113 			if (lo_index_r)
114 				index_close(lo_index_r, NoLock);
115 			if (lo_heap_r)
116 				heap_close(lo_heap_r, NoLock);
117 
118 			CurrentResourceOwner = currentOwner;
119 		}
120 		lo_heap_r = NULL;
121 		lo_index_r = NULL;
122 	}
123 }
124 
125 
126 /*
127  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
128  * read with can be specified.
129  */
130 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)131 myLargeObjectExists(Oid loid, Snapshot snapshot)
132 {
133 	Relation	pg_lo_meta;
134 	ScanKeyData skey[1];
135 	SysScanDesc sd;
136 	HeapTuple	tuple;
137 	bool		retval = false;
138 
139 	ScanKeyInit(&skey[0],
140 				ObjectIdAttributeNumber,
141 				BTEqualStrategyNumber, F_OIDEQ,
142 				ObjectIdGetDatum(loid));
143 
144 	pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
145 						   AccessShareLock);
146 
147 	sd = systable_beginscan(pg_lo_meta,
148 							LargeObjectMetadataOidIndexId, true,
149 							snapshot, 1, skey);
150 
151 	tuple = systable_getnext(sd);
152 	if (HeapTupleIsValid(tuple))
153 		retval = true;
154 
155 	systable_endscan(sd);
156 
157 	heap_close(pg_lo_meta, AccessShareLock);
158 
159 	return retval;
160 }
161 
162 
163 /*
164  * Extract data field from a pg_largeobject tuple, detoasting if needed
165  * and verifying that the length is sane.  Returns data pointer (a bytea *),
166  * data length, and an indication of whether to pfree the data pointer.
167  */
168 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)169 getdatafield(Form_pg_largeobject tuple,
170 			 bytea **pdatafield,
171 			 int *plen,
172 			 bool *pfreeit)
173 {
174 	bytea	   *datafield;
175 	int			len;
176 	bool		freeit;
177 
178 	datafield = &(tuple->data); /* see note at top of file */
179 	freeit = false;
180 	if (VARATT_IS_EXTENDED(datafield))
181 	{
182 		datafield = (bytea *)
183 			heap_tuple_untoast_attr((struct varlena *) datafield);
184 		freeit = true;
185 	}
186 	len = VARSIZE(datafield) - VARHDRSZ;
187 	if (len < 0 || len > LOBLKSIZE)
188 		ereport(ERROR,
189 				(errcode(ERRCODE_DATA_CORRUPTED),
190 				 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
191 						tuple->loid, tuple->pageno, len)));
192 	*pdatafield = datafield;
193 	*plen = len;
194 	*pfreeit = freeit;
195 }
196 
197 
198 /*
199  *	inv_create -- create a new large object
200  *
201  *	Arguments:
202  *	  lobjId - OID to use for new large object, or InvalidOid to pick one
203  *
204  *	Returns:
205  *	  OID of new object
206  *
207  * If lobjId is not InvalidOid, then an error occurs if the OID is already
208  * in use.
209  */
210 Oid
inv_create(Oid lobjId)211 inv_create(Oid lobjId)
212 {
213 	Oid			lobjId_new;
214 
215 	/*
216 	 * Create a new largeobject with empty data pages
217 	 */
218 	lobjId_new = LargeObjectCreate(lobjId);
219 
220 	/*
221 	 * dependency on the owner of largeobject
222 	 *
223 	 * The reason why we use LargeObjectRelationId instead of
224 	 * LargeObjectMetadataRelationId here is to provide backward compatibility
225 	 * to the applications which utilize a knowledge about internal layout of
226 	 * system catalogs. OID of pg_largeobject_metadata and loid of
227 	 * pg_largeobject are same value, so there are no actual differences here.
228 	 */
229 	recordDependencyOnOwner(LargeObjectRelationId,
230 							lobjId_new, GetUserId());
231 
232 	/* Post creation hook for new large object */
233 	InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
234 
235 	/*
236 	 * Advance command counter to make new tuple visible to later operations.
237 	 */
238 	CommandCounterIncrement();
239 
240 	return lobjId_new;
241 }
242 
243 /*
244  *	inv_open -- access an existing large object.
245  *
246  * Returns a large object descriptor, appropriately filled in.
247  * The descriptor and subsidiary data are allocated in the specified
248  * memory context, which must be suitably long-lived for the caller's
249  * purposes.  If the returned descriptor has a snapshot associated
250  * with it, the caller must ensure that it also lives long enough,
251  * e.g. by calling RegisterSnapshotOnOwner
252  */
253 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)254 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
255 {
256 	LargeObjectDesc *retval;
257 	Snapshot	snapshot = NULL;
258 	int			descflags = 0;
259 
260 	/*
261 	 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
262 	 * | INV_READ), the caller being allowed to read the large object
263 	 * descriptor in either case.
264 	 */
265 	if (flags & INV_WRITE)
266 		descflags |= IFS_WRLOCK | IFS_RDLOCK;
267 	if (flags & INV_READ)
268 		descflags |= IFS_RDLOCK;
269 
270 	if (descflags == 0)
271 		ereport(ERROR,
272 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
273 				 errmsg("invalid flags for opening a large object: %d",
274 						flags)));
275 
276 	/* Get snapshot.  If write is requested, use an instantaneous snapshot. */
277 	if (descflags & IFS_WRLOCK)
278 		snapshot = NULL;
279 	else
280 		snapshot = GetActiveSnapshot();
281 
282 	/* Can't use LargeObjectExists here because we need to specify snapshot */
283 	if (!myLargeObjectExists(lobjId, snapshot))
284 		ereport(ERROR,
285 				(errcode(ERRCODE_UNDEFINED_OBJECT),
286 				 errmsg("large object %u does not exist", lobjId)));
287 
288 	/* Apply permission checks, again specifying snapshot */
289 	if ((descflags & IFS_RDLOCK) != 0)
290 	{
291 		if (!lo_compat_privileges &&
292 			pg_largeobject_aclcheck_snapshot(lobjId,
293 											 GetUserId(),
294 											 ACL_SELECT,
295 											 snapshot) != ACLCHECK_OK)
296 			ereport(ERROR,
297 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
298 					 errmsg("permission denied for large object %u",
299 							lobjId)));
300 	}
301 	if ((descflags & IFS_WRLOCK) != 0)
302 	{
303 		if (!lo_compat_privileges &&
304 			pg_largeobject_aclcheck_snapshot(lobjId,
305 											 GetUserId(),
306 											 ACL_UPDATE,
307 											 snapshot) != ACLCHECK_OK)
308 			ereport(ERROR,
309 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
310 					 errmsg("permission denied for large object %u",
311 							lobjId)));
312 	}
313 
314 	/* OK to create a descriptor */
315 	retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
316 													sizeof(LargeObjectDesc));
317 	retval->id = lobjId;
318 	retval->offset = 0;
319 	retval->flags = descflags;
320 
321 	/* caller sets if needed, not used by the functions in this file */
322 	retval->subid = InvalidSubTransactionId;
323 
324 	/*
325 	 * The snapshot (if any) is just the currently active snapshot.  The
326 	 * caller will replace it with a longer-lived copy if needed.
327 	 */
328 	retval->snapshot = snapshot;
329 
330 	return retval;
331 }
332 
333 /*
334  * Closes a large object descriptor previously made by inv_open(), and
335  * releases the long-term memory used by it.
336  */
337 void
inv_close(LargeObjectDesc * obj_desc)338 inv_close(LargeObjectDesc *obj_desc)
339 {
340 	Assert(PointerIsValid(obj_desc));
341 	pfree(obj_desc);
342 }
343 
344 /*
345  * Destroys an existing large object (not to be confused with a descriptor!)
346  *
347  * Note we expect caller to have done any required permissions check.
348  */
349 int
inv_drop(Oid lobjId)350 inv_drop(Oid lobjId)
351 {
352 	ObjectAddress object;
353 
354 	/*
355 	 * Delete any comments and dependencies on the large object
356 	 */
357 	object.classId = LargeObjectRelationId;
358 	object.objectId = lobjId;
359 	object.objectSubId = 0;
360 	performDeletion(&object, DROP_CASCADE, 0);
361 
362 	/*
363 	 * Advance command counter so that tuple removal will be seen by later
364 	 * large-object operations in this transaction.
365 	 */
366 	CommandCounterIncrement();
367 
368 	/* For historical reasons, we always return 1 on success. */
369 	return 1;
370 }
371 
372 /*
373  * Determine size of a large object
374  *
375  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
376  * the offset of the last byte + 1.
377  */
378 static uint64
inv_getsize(LargeObjectDesc * obj_desc)379 inv_getsize(LargeObjectDesc *obj_desc)
380 {
381 	uint64		lastbyte = 0;
382 	ScanKeyData skey[1];
383 	SysScanDesc sd;
384 	HeapTuple	tuple;
385 
386 	Assert(PointerIsValid(obj_desc));
387 
388 	open_lo_relation();
389 
390 	ScanKeyInit(&skey[0],
391 				Anum_pg_largeobject_loid,
392 				BTEqualStrategyNumber, F_OIDEQ,
393 				ObjectIdGetDatum(obj_desc->id));
394 
395 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
396 									obj_desc->snapshot, 1, skey);
397 
398 	/*
399 	 * Because the pg_largeobject index is on both loid and pageno, but we
400 	 * constrain only loid, a backwards scan should visit all pages of the
401 	 * large object in reverse pageno order.  So, it's sufficient to examine
402 	 * the first valid tuple (== last valid page).
403 	 */
404 	tuple = systable_getnext_ordered(sd, BackwardScanDirection);
405 	if (HeapTupleIsValid(tuple))
406 	{
407 		Form_pg_largeobject data;
408 		bytea	   *datafield;
409 		int			len;
410 		bool		pfreeit;
411 
412 		if (HeapTupleHasNulls(tuple))	/* paranoia */
413 			elog(ERROR, "null field found in pg_largeobject");
414 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
415 		getdatafield(data, &datafield, &len, &pfreeit);
416 		lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
417 		if (pfreeit)
418 			pfree(datafield);
419 	}
420 
421 	systable_endscan_ordered(sd);
422 
423 	return lastbyte;
424 }
425 
426 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)427 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
428 {
429 	int64		newoffset;
430 
431 	Assert(PointerIsValid(obj_desc));
432 
433 	/*
434 	 * We allow seek/tell if you have either read or write permission, so no
435 	 * need for a permission check here.
436 	 */
437 
438 	/*
439 	 * Note: overflow in the additions is possible, but since we will reject
440 	 * negative results, we don't need any extra test for that.
441 	 */
442 	switch (whence)
443 	{
444 		case SEEK_SET:
445 			newoffset = offset;
446 			break;
447 		case SEEK_CUR:
448 			newoffset = obj_desc->offset + offset;
449 			break;
450 		case SEEK_END:
451 			newoffset = inv_getsize(obj_desc) + offset;
452 			break;
453 		default:
454 			ereport(ERROR,
455 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
456 					 errmsg("invalid whence setting: %d", whence)));
457 			newoffset = 0;		/* keep compiler quiet */
458 			break;
459 	}
460 
461 	/*
462 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
463 	 * in translatable strings; doing better is not worth the trouble
464 	 */
465 	if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
466 		ereport(ERROR,
467 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
468 				 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
469 								 newoffset)));
470 
471 	obj_desc->offset = newoffset;
472 	return newoffset;
473 }
474 
475 int64
inv_tell(LargeObjectDesc * obj_desc)476 inv_tell(LargeObjectDesc *obj_desc)
477 {
478 	Assert(PointerIsValid(obj_desc));
479 
480 	/*
481 	 * We allow seek/tell if you have either read or write permission, so no
482 	 * need for a permission check here.
483 	 */
484 
485 	return obj_desc->offset;
486 }
487 
488 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)489 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
490 {
491 	int			nread = 0;
492 	int64		n;
493 	int64		off;
494 	int			len;
495 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
496 	uint64		pageoff;
497 	ScanKeyData skey[2];
498 	SysScanDesc sd;
499 	HeapTuple	tuple;
500 
501 	Assert(PointerIsValid(obj_desc));
502 	Assert(buf != NULL);
503 
504 	if ((obj_desc->flags & IFS_RDLOCK) == 0)
505 		ereport(ERROR,
506 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
507 				 errmsg("permission denied for large object %u",
508 						obj_desc->id)));
509 
510 	if (nbytes <= 0)
511 		return 0;
512 
513 	open_lo_relation();
514 
515 	ScanKeyInit(&skey[0],
516 				Anum_pg_largeobject_loid,
517 				BTEqualStrategyNumber, F_OIDEQ,
518 				ObjectIdGetDatum(obj_desc->id));
519 
520 	ScanKeyInit(&skey[1],
521 				Anum_pg_largeobject_pageno,
522 				BTGreaterEqualStrategyNumber, F_INT4GE,
523 				Int32GetDatum(pageno));
524 
525 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
526 									obj_desc->snapshot, 2, skey);
527 
528 	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
529 	{
530 		Form_pg_largeobject data;
531 		bytea	   *datafield;
532 		bool		pfreeit;
533 
534 		if (HeapTupleHasNulls(tuple))	/* paranoia */
535 			elog(ERROR, "null field found in pg_largeobject");
536 		data = (Form_pg_largeobject) GETSTRUCT(tuple);
537 
538 		/*
539 		 * We expect the indexscan will deliver pages in order.  However,
540 		 * there may be missing pages if the LO contains unwritten "holes". We
541 		 * want missing sections to read out as zeroes.
542 		 */
543 		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
544 		if (pageoff > obj_desc->offset)
545 		{
546 			n = pageoff - obj_desc->offset;
547 			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
548 			MemSet(buf + nread, 0, n);
549 			nread += n;
550 			obj_desc->offset += n;
551 		}
552 
553 		if (nread < nbytes)
554 		{
555 			Assert(obj_desc->offset >= pageoff);
556 			off = (int) (obj_desc->offset - pageoff);
557 			Assert(off >= 0 && off < LOBLKSIZE);
558 
559 			getdatafield(data, &datafield, &len, &pfreeit);
560 			if (len > off)
561 			{
562 				n = len - off;
563 				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
564 				memcpy(buf + nread, VARDATA(datafield) + off, n);
565 				nread += n;
566 				obj_desc->offset += n;
567 			}
568 			if (pfreeit)
569 				pfree(datafield);
570 		}
571 
572 		if (nread >= nbytes)
573 			break;
574 	}
575 
576 	systable_endscan_ordered(sd);
577 
578 	return nread;
579 }
580 
581 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)582 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
583 {
584 	int			nwritten = 0;
585 	int			n;
586 	int			off;
587 	int			len;
588 	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
589 	ScanKeyData skey[2];
590 	SysScanDesc sd;
591 	HeapTuple	oldtuple;
592 	Form_pg_largeobject olddata;
593 	bool		neednextpage;
594 	bytea	   *datafield;
595 	bool		pfreeit;
596 	union
597 	{
598 		bytea		hdr;
599 		/* this is to make the union big enough for a LO data chunk: */
600 		char		data[LOBLKSIZE + VARHDRSZ];
601 		/* ensure union is aligned well enough: */
602 		int32		align_it;
603 	}			workbuf;
604 	char	   *workb = VARDATA(&workbuf.hdr);
605 	HeapTuple	newtup;
606 	Datum		values[Natts_pg_largeobject];
607 	bool		nulls[Natts_pg_largeobject];
608 	bool		replace[Natts_pg_largeobject];
609 	CatalogIndexState indstate;
610 
611 	Assert(PointerIsValid(obj_desc));
612 	Assert(buf != NULL);
613 
614 	/* enforce writability because snapshot is probably wrong otherwise */
615 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
616 		ereport(ERROR,
617 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
618 				 errmsg("permission denied for large object %u",
619 						obj_desc->id)));
620 
621 	if (nbytes <= 0)
622 		return 0;
623 
624 	/* this addition can't overflow because nbytes is only int32 */
625 	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
626 		ereport(ERROR,
627 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
628 				 errmsg("invalid large object write request size: %d",
629 						nbytes)));
630 
631 	open_lo_relation();
632 
633 	indstate = CatalogOpenIndexes(lo_heap_r);
634 
635 	ScanKeyInit(&skey[0],
636 				Anum_pg_largeobject_loid,
637 				BTEqualStrategyNumber, F_OIDEQ,
638 				ObjectIdGetDatum(obj_desc->id));
639 
640 	ScanKeyInit(&skey[1],
641 				Anum_pg_largeobject_pageno,
642 				BTGreaterEqualStrategyNumber, F_INT4GE,
643 				Int32GetDatum(pageno));
644 
645 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
646 									obj_desc->snapshot, 2, skey);
647 
648 	oldtuple = NULL;
649 	olddata = NULL;
650 	neednextpage = true;
651 
652 	while (nwritten < nbytes)
653 	{
654 		/*
655 		 * If possible, get next pre-existing page of the LO.  We expect the
656 		 * indexscan will deliver these in order --- but there may be holes.
657 		 */
658 		if (neednextpage)
659 		{
660 			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
661 			{
662 				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
663 					elog(ERROR, "null field found in pg_largeobject");
664 				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
665 				Assert(olddata->pageno >= pageno);
666 			}
667 			neednextpage = false;
668 		}
669 
670 		/*
671 		 * If we have a pre-existing page, see if it is the page we want to
672 		 * write, or a later one.
673 		 */
674 		if (olddata != NULL && olddata->pageno == pageno)
675 		{
676 			/*
677 			 * Update an existing page with fresh data.
678 			 *
679 			 * First, load old data into workbuf
680 			 */
681 			getdatafield(olddata, &datafield, &len, &pfreeit);
682 			memcpy(workb, VARDATA(datafield), len);
683 			if (pfreeit)
684 				pfree(datafield);
685 
686 			/*
687 			 * Fill any hole
688 			 */
689 			off = (int) (obj_desc->offset % LOBLKSIZE);
690 			if (off > len)
691 				MemSet(workb + len, 0, off - len);
692 
693 			/*
694 			 * Insert appropriate portion of new data
695 			 */
696 			n = LOBLKSIZE - off;
697 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
698 			memcpy(workb + off, buf + nwritten, n);
699 			nwritten += n;
700 			obj_desc->offset += n;
701 			off += n;
702 			/* compute valid length of new page */
703 			len = (len >= off) ? len : off;
704 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
705 
706 			/*
707 			 * Form and insert updated tuple
708 			 */
709 			memset(values, 0, sizeof(values));
710 			memset(nulls, false, sizeof(nulls));
711 			memset(replace, false, sizeof(replace));
712 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
713 			replace[Anum_pg_largeobject_data - 1] = true;
714 			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
715 									   values, nulls, replace);
716 			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
717 									   indstate);
718 			heap_freetuple(newtup);
719 
720 			/*
721 			 * We're done with this old page.
722 			 */
723 			oldtuple = NULL;
724 			olddata = NULL;
725 			neednextpage = true;
726 		}
727 		else
728 		{
729 			/*
730 			 * Write a brand new page.
731 			 *
732 			 * First, fill any hole
733 			 */
734 			off = (int) (obj_desc->offset % LOBLKSIZE);
735 			if (off > 0)
736 				MemSet(workb, 0, off);
737 
738 			/*
739 			 * Insert appropriate portion of new data
740 			 */
741 			n = LOBLKSIZE - off;
742 			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
743 			memcpy(workb + off, buf + nwritten, n);
744 			nwritten += n;
745 			obj_desc->offset += n;
746 			/* compute valid length of new page */
747 			len = off + n;
748 			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
749 
750 			/*
751 			 * Form and insert updated tuple
752 			 */
753 			memset(values, 0, sizeof(values));
754 			memset(nulls, false, sizeof(nulls));
755 			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
756 			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
757 			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
758 			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
759 			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
760 			heap_freetuple(newtup);
761 		}
762 		pageno++;
763 	}
764 
765 	systable_endscan_ordered(sd);
766 
767 	CatalogCloseIndexes(indstate);
768 
769 	/*
770 	 * Advance command counter so that my tuple updates will be seen by later
771 	 * large-object operations in this transaction.
772 	 */
773 	CommandCounterIncrement();
774 
775 	return nwritten;
776 }
777 
778 void
inv_truncate(LargeObjectDesc * obj_desc,int64 len)779 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
780 {
781 	int32		pageno = (int32) (len / LOBLKSIZE);
782 	int32		off;
783 	ScanKeyData skey[2];
784 	SysScanDesc sd;
785 	HeapTuple	oldtuple;
786 	Form_pg_largeobject olddata;
787 	union
788 	{
789 		bytea		hdr;
790 		/* this is to make the union big enough for a LO data chunk: */
791 		char		data[LOBLKSIZE + VARHDRSZ];
792 		/* ensure union is aligned well enough: */
793 		int32		align_it;
794 	}			workbuf;
795 	char	   *workb = VARDATA(&workbuf.hdr);
796 	HeapTuple	newtup;
797 	Datum		values[Natts_pg_largeobject];
798 	bool		nulls[Natts_pg_largeobject];
799 	bool		replace[Natts_pg_largeobject];
800 	CatalogIndexState indstate;
801 
802 	Assert(PointerIsValid(obj_desc));
803 
804 	/* enforce writability because snapshot is probably wrong otherwise */
805 	if ((obj_desc->flags & IFS_WRLOCK) == 0)
806 		ereport(ERROR,
807 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
808 				 errmsg("permission denied for large object %u",
809 						obj_desc->id)));
810 
811 	/*
812 	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
813 	 * in translatable strings; doing better is not worth the trouble
814 	 */
815 	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
816 		ereport(ERROR,
817 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
818 				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
819 								 len)));
820 
821 	open_lo_relation();
822 
823 	indstate = CatalogOpenIndexes(lo_heap_r);
824 
825 	/*
826 	 * Set up to find all pages with desired loid and pageno >= target
827 	 */
828 	ScanKeyInit(&skey[0],
829 				Anum_pg_largeobject_loid,
830 				BTEqualStrategyNumber, F_OIDEQ,
831 				ObjectIdGetDatum(obj_desc->id));
832 
833 	ScanKeyInit(&skey[1],
834 				Anum_pg_largeobject_pageno,
835 				BTGreaterEqualStrategyNumber, F_INT4GE,
836 				Int32GetDatum(pageno));
837 
838 	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
839 									obj_desc->snapshot, 2, skey);
840 
841 	/*
842 	 * If possible, get the page the truncation point is in. The truncation
843 	 * point may be beyond the end of the LO or in a hole.
844 	 */
845 	olddata = NULL;
846 	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
847 	{
848 		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
849 			elog(ERROR, "null field found in pg_largeobject");
850 		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
851 		Assert(olddata->pageno >= pageno);
852 	}
853 
854 	/*
855 	 * If we found the page of the truncation point we need to truncate the
856 	 * data in it.  Otherwise if we're in a hole, we need to create a page to
857 	 * mark the end of data.
858 	 */
859 	if (olddata != NULL && olddata->pageno == pageno)
860 	{
861 		/* First, load old data into workbuf */
862 		bytea	   *datafield;
863 		int			pagelen;
864 		bool		pfreeit;
865 
866 		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
867 		memcpy(workb, VARDATA(datafield), pagelen);
868 		if (pfreeit)
869 			pfree(datafield);
870 
871 		/*
872 		 * Fill any hole
873 		 */
874 		off = len % LOBLKSIZE;
875 		if (off > pagelen)
876 			MemSet(workb + pagelen, 0, off - pagelen);
877 
878 		/* compute length of new page */
879 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880 
881 		/*
882 		 * Form and insert updated tuple
883 		 */
884 		memset(values, 0, sizeof(values));
885 		memset(nulls, false, sizeof(nulls));
886 		memset(replace, false, sizeof(replace));
887 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
888 		replace[Anum_pg_largeobject_data - 1] = true;
889 		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
890 								   values, nulls, replace);
891 		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
892 								   indstate);
893 		heap_freetuple(newtup);
894 	}
895 	else
896 	{
897 		/*
898 		 * If the first page we found was after the truncation point, we're in
899 		 * a hole that we'll fill, but we need to delete the later page
900 		 * because the loop below won't visit it again.
901 		 */
902 		if (olddata != NULL)
903 		{
904 			Assert(olddata->pageno > pageno);
905 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
906 		}
907 
908 		/*
909 		 * Write a brand new page.
910 		 *
911 		 * Fill the hole up to the truncation point
912 		 */
913 		off = len % LOBLKSIZE;
914 		if (off > 0)
915 			MemSet(workb, 0, off);
916 
917 		/* compute length of new page */
918 		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
919 
920 		/*
921 		 * Form and insert new tuple
922 		 */
923 		memset(values, 0, sizeof(values));
924 		memset(nulls, false, sizeof(nulls));
925 		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
926 		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
927 		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
928 		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
929 		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
930 		heap_freetuple(newtup);
931 	}
932 
933 	/*
934 	 * Delete any pages after the truncation point.  If the initial search
935 	 * didn't find a page, then of course there's nothing more to do.
936 	 */
937 	if (olddata != NULL)
938 	{
939 		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
940 		{
941 			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
942 		}
943 	}
944 
945 	systable_endscan_ordered(sd);
946 
947 	CatalogCloseIndexes(indstate);
948 
949 	/*
950 	 * Advance command counter so that tuple updates will be seen by later
951 	 * large-object operations in this transaction.
952 	 */
953 	CommandCounterIncrement();
954 }
955