1 /*-------------------------------------------------------------------------
2 *
3 * inv_api.c
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
6 *
7 *
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 *
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
20 *
21 *
22 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
24 *
25 *
26 * IDENTIFICATION
27 * src/backend/storage/large_object/inv_api.c
28 *
29 *-------------------------------------------------------------------------
30 */
31 #include "postgres.h"
32
33 #include <limits.h>
34
35 #include "access/detoast.h"
36 #include "access/genam.h"
37 #include "access/htup_details.h"
38 #include "access/sysattr.h"
39 #include "access/table.h"
40 #include "access/xact.h"
41 #include "catalog/dependency.h"
42 #include "catalog/indexing.h"
43 #include "catalog/objectaccess.h"
44 #include "catalog/pg_largeobject.h"
45 #include "catalog/pg_largeobject_metadata.h"
46 #include "libpq/libpq-fs.h"
47 #include "miscadmin.h"
48 #include "storage/large_object.h"
49 #include "utils/acl.h"
50 #include "utils/fmgroids.h"
51 #include "utils/rel.h"
52 #include "utils/snapmgr.h"
53
54
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks
 */
bool		lo_compat_privileges;

/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference each, so that we only need to open pg_largeobject (and its index)
 * once per transaction.  To avoid problems when the first such reference
 * occurs inside a subtransaction, we execute a slightly klugy maneuver to
 * assign ownership of the Relation references to
 * TopTransactionResourceOwner.
 *
 * Both pointers are NULL whenever the relations are not currently open.
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;
69
70
71 /*
72 * Open pg_largeobject and its index, if not already done in current xact
73 */
74 static void
open_lo_relation(void)75 open_lo_relation(void)
76 {
77 ResourceOwner currentOwner;
78
79 if (lo_heap_r && lo_index_r)
80 return; /* already open in current xact */
81
82 /* Arrange for the top xact to own these relation references */
83 currentOwner = CurrentResourceOwner;
84 CurrentResourceOwner = TopTransactionResourceOwner;
85
86 /* Use RowExclusiveLock since we might either read or write */
87 if (lo_heap_r == NULL)
88 lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
89 if (lo_index_r == NULL)
90 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
91
92 CurrentResourceOwner = currentOwner;
93 }
94
95 /*
96 * Clean up at main transaction end
97 */
98 void
close_lo_relation(bool isCommit)99 close_lo_relation(bool isCommit)
100 {
101 if (lo_heap_r || lo_index_r)
102 {
103 /*
104 * Only bother to close if committing; else abort cleanup will handle
105 * it
106 */
107 if (isCommit)
108 {
109 ResourceOwner currentOwner;
110
111 currentOwner = CurrentResourceOwner;
112 CurrentResourceOwner = TopTransactionResourceOwner;
113
114 if (lo_index_r)
115 index_close(lo_index_r, NoLock);
116 if (lo_heap_r)
117 table_close(lo_heap_r, NoLock);
118
119 CurrentResourceOwner = currentOwner;
120 }
121 lo_heap_r = NULL;
122 lo_index_r = NULL;
123 }
124 }
125
126
127 /*
128 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
129 * read with can be specified.
130 */
131 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)132 myLargeObjectExists(Oid loid, Snapshot snapshot)
133 {
134 Relation pg_lo_meta;
135 ScanKeyData skey[1];
136 SysScanDesc sd;
137 HeapTuple tuple;
138 bool retval = false;
139
140 ScanKeyInit(&skey[0],
141 Anum_pg_largeobject_metadata_oid,
142 BTEqualStrategyNumber, F_OIDEQ,
143 ObjectIdGetDatum(loid));
144
145 pg_lo_meta = table_open(LargeObjectMetadataRelationId,
146 AccessShareLock);
147
148 sd = systable_beginscan(pg_lo_meta,
149 LargeObjectMetadataOidIndexId, true,
150 snapshot, 1, skey);
151
152 tuple = systable_getnext(sd);
153 if (HeapTupleIsValid(tuple))
154 retval = true;
155
156 systable_endscan(sd);
157
158 table_close(pg_lo_meta, AccessShareLock);
159
160 return retval;
161 }
162
163
164 /*
165 * Extract data field from a pg_largeobject tuple, detoasting if needed
166 * and verifying that the length is sane. Returns data pointer (a bytea *),
167 * data length, and an indication of whether to pfree the data pointer.
168 */
169 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)170 getdatafield(Form_pg_largeobject tuple,
171 bytea **pdatafield,
172 int *plen,
173 bool *pfreeit)
174 {
175 bytea *datafield;
176 int len;
177 bool freeit;
178
179 datafield = &(tuple->data); /* see note at top of file */
180 freeit = false;
181 if (VARATT_IS_EXTENDED(datafield))
182 {
183 datafield = (bytea *)
184 detoast_attr((struct varlena *) datafield);
185 freeit = true;
186 }
187 len = VARSIZE(datafield) - VARHDRSZ;
188 if (len < 0 || len > LOBLKSIZE)
189 ereport(ERROR,
190 (errcode(ERRCODE_DATA_CORRUPTED),
191 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
192 tuple->loid, tuple->pageno, len)));
193 *pdatafield = datafield;
194 *plen = len;
195 *pfreeit = freeit;
196 }
197
198
199 /*
200 * inv_create -- create a new large object
201 *
202 * Arguments:
203 * lobjId - OID to use for new large object, or InvalidOid to pick one
204 *
205 * Returns:
206 * OID of new object
207 *
208 * If lobjId is not InvalidOid, then an error occurs if the OID is already
209 * in use.
210 */
211 Oid
inv_create(Oid lobjId)212 inv_create(Oid lobjId)
213 {
214 Oid lobjId_new;
215
216 /*
217 * Create a new largeobject with empty data pages
218 */
219 lobjId_new = LargeObjectCreate(lobjId);
220
221 /*
222 * dependency on the owner of largeobject
223 *
224 * The reason why we use LargeObjectRelationId instead of
225 * LargeObjectMetadataRelationId here is to provide backward compatibility
226 * to the applications which utilize a knowledge about internal layout of
227 * system catalogs. OID of pg_largeobject_metadata and loid of
228 * pg_largeobject are same value, so there are no actual differences here.
229 */
230 recordDependencyOnOwner(LargeObjectRelationId,
231 lobjId_new, GetUserId());
232
233 /* Post creation hook for new large object */
234 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
235
236 /*
237 * Advance command counter to make new tuple visible to later operations.
238 */
239 CommandCounterIncrement();
240
241 return lobjId_new;
242 }
243
244 /*
245 * inv_open -- access an existing large object.
246 *
247 * Returns a large object descriptor, appropriately filled in.
248 * The descriptor and subsidiary data are allocated in the specified
249 * memory context, which must be suitably long-lived for the caller's
250 * purposes. If the returned descriptor has a snapshot associated
251 * with it, the caller must ensure that it also lives long enough,
252 * e.g. by calling RegisterSnapshotOnOwner
253 */
254 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)255 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
256 {
257 LargeObjectDesc *retval;
258 Snapshot snapshot = NULL;
259 int descflags = 0;
260
261 /*
262 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
263 * | INV_READ), the caller being allowed to read the large object
264 * descriptor in either case.
265 */
266 if (flags & INV_WRITE)
267 descflags |= IFS_WRLOCK | IFS_RDLOCK;
268 if (flags & INV_READ)
269 descflags |= IFS_RDLOCK;
270
271 if (descflags == 0)
272 ereport(ERROR,
273 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
274 errmsg("invalid flags for opening a large object: %d",
275 flags)));
276
277 /* Get snapshot. If write is requested, use an instantaneous snapshot. */
278 if (descflags & IFS_WRLOCK)
279 snapshot = NULL;
280 else
281 snapshot = GetActiveSnapshot();
282
283 /* Can't use LargeObjectExists here because we need to specify snapshot */
284 if (!myLargeObjectExists(lobjId, snapshot))
285 ereport(ERROR,
286 (errcode(ERRCODE_UNDEFINED_OBJECT),
287 errmsg("large object %u does not exist", lobjId)));
288
289 /* Apply permission checks, again specifying snapshot */
290 if ((descflags & IFS_RDLOCK) != 0)
291 {
292 if (!lo_compat_privileges &&
293 pg_largeobject_aclcheck_snapshot(lobjId,
294 GetUserId(),
295 ACL_SELECT,
296 snapshot) != ACLCHECK_OK)
297 ereport(ERROR,
298 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
299 errmsg("permission denied for large object %u",
300 lobjId)));
301 }
302 if ((descflags & IFS_WRLOCK) != 0)
303 {
304 if (!lo_compat_privileges &&
305 pg_largeobject_aclcheck_snapshot(lobjId,
306 GetUserId(),
307 ACL_UPDATE,
308 snapshot) != ACLCHECK_OK)
309 ereport(ERROR,
310 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311 errmsg("permission denied for large object %u",
312 lobjId)));
313 }
314
315 /* OK to create a descriptor */
316 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
317 sizeof(LargeObjectDesc));
318 retval->id = lobjId;
319 retval->offset = 0;
320 retval->flags = descflags;
321
322 /* caller sets if needed, not used by the functions in this file */
323 retval->subid = InvalidSubTransactionId;
324
325 /*
326 * The snapshot (if any) is just the currently active snapshot. The
327 * caller will replace it with a longer-lived copy if needed.
328 */
329 retval->snapshot = snapshot;
330
331 return retval;
332 }
333
334 /*
335 * Closes a large object descriptor previously made by inv_open(), and
336 * releases the long-term memory used by it.
337 */
338 void
inv_close(LargeObjectDesc * obj_desc)339 inv_close(LargeObjectDesc *obj_desc)
340 {
341 Assert(PointerIsValid(obj_desc));
342 pfree(obj_desc);
343 }
344
345 /*
346 * Destroys an existing large object (not to be confused with a descriptor!)
347 *
348 * Note we expect caller to have done any required permissions check.
349 */
350 int
inv_drop(Oid lobjId)351 inv_drop(Oid lobjId)
352 {
353 ObjectAddress object;
354
355 /*
356 * Delete any comments and dependencies on the large object
357 */
358 object.classId = LargeObjectRelationId;
359 object.objectId = lobjId;
360 object.objectSubId = 0;
361 performDeletion(&object, DROP_CASCADE, 0);
362
363 /*
364 * Advance command counter so that tuple removal will be seen by later
365 * large-object operations in this transaction.
366 */
367 CommandCounterIncrement();
368
369 /* For historical reasons, we always return 1 on success. */
370 return 1;
371 }
372
373 /*
374 * Determine size of a large object
375 *
376 * NOTE: LOs can contain gaps, just like Unix files. We actually return
377 * the offset of the last byte + 1.
378 */
379 static uint64
inv_getsize(LargeObjectDesc * obj_desc)380 inv_getsize(LargeObjectDesc *obj_desc)
381 {
382 uint64 lastbyte = 0;
383 ScanKeyData skey[1];
384 SysScanDesc sd;
385 HeapTuple tuple;
386
387 Assert(PointerIsValid(obj_desc));
388
389 open_lo_relation();
390
391 ScanKeyInit(&skey[0],
392 Anum_pg_largeobject_loid,
393 BTEqualStrategyNumber, F_OIDEQ,
394 ObjectIdGetDatum(obj_desc->id));
395
396 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
397 obj_desc->snapshot, 1, skey);
398
399 /*
400 * Because the pg_largeobject index is on both loid and pageno, but we
401 * constrain only loid, a backwards scan should visit all pages of the
402 * large object in reverse pageno order. So, it's sufficient to examine
403 * the first valid tuple (== last valid page).
404 */
405 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
406 if (HeapTupleIsValid(tuple))
407 {
408 Form_pg_largeobject data;
409 bytea *datafield;
410 int len;
411 bool pfreeit;
412
413 if (HeapTupleHasNulls(tuple)) /* paranoia */
414 elog(ERROR, "null field found in pg_largeobject");
415 data = (Form_pg_largeobject) GETSTRUCT(tuple);
416 getdatafield(data, &datafield, &len, &pfreeit);
417 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
418 if (pfreeit)
419 pfree(datafield);
420 }
421
422 systable_endscan_ordered(sd);
423
424 return lastbyte;
425 }
426
427 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)428 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
429 {
430 int64 newoffset;
431
432 Assert(PointerIsValid(obj_desc));
433
434 /*
435 * We allow seek/tell if you have either read or write permission, so no
436 * need for a permission check here.
437 */
438
439 /*
440 * Note: overflow in the additions is possible, but since we will reject
441 * negative results, we don't need any extra test for that.
442 */
443 switch (whence)
444 {
445 case SEEK_SET:
446 newoffset = offset;
447 break;
448 case SEEK_CUR:
449 newoffset = obj_desc->offset + offset;
450 break;
451 case SEEK_END:
452 newoffset = inv_getsize(obj_desc) + offset;
453 break;
454 default:
455 ereport(ERROR,
456 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
457 errmsg("invalid whence setting: %d", whence)));
458 newoffset = 0; /* keep compiler quiet */
459 break;
460 }
461
462 /*
463 * use errmsg_internal here because we don't want to expose INT64_FORMAT
464 * in translatable strings; doing better is not worth the trouble
465 */
466 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
467 ereport(ERROR,
468 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
470 newoffset)));
471
472 obj_desc->offset = newoffset;
473 return newoffset;
474 }
475
476 int64
inv_tell(LargeObjectDesc * obj_desc)477 inv_tell(LargeObjectDesc *obj_desc)
478 {
479 Assert(PointerIsValid(obj_desc));
480
481 /*
482 * We allow seek/tell if you have either read or write permission, so no
483 * need for a permission check here.
484 */
485
486 return obj_desc->offset;
487 }
488
489 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)490 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
491 {
492 int nread = 0;
493 int64 n;
494 int64 off;
495 int len;
496 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
497 uint64 pageoff;
498 ScanKeyData skey[2];
499 SysScanDesc sd;
500 HeapTuple tuple;
501
502 Assert(PointerIsValid(obj_desc));
503 Assert(buf != NULL);
504
505 if ((obj_desc->flags & IFS_RDLOCK) == 0)
506 ereport(ERROR,
507 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
508 errmsg("permission denied for large object %u",
509 obj_desc->id)));
510
511 if (nbytes <= 0)
512 return 0;
513
514 open_lo_relation();
515
516 ScanKeyInit(&skey[0],
517 Anum_pg_largeobject_loid,
518 BTEqualStrategyNumber, F_OIDEQ,
519 ObjectIdGetDatum(obj_desc->id));
520
521 ScanKeyInit(&skey[1],
522 Anum_pg_largeobject_pageno,
523 BTGreaterEqualStrategyNumber, F_INT4GE,
524 Int32GetDatum(pageno));
525
526 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
527 obj_desc->snapshot, 2, skey);
528
529 while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
530 {
531 Form_pg_largeobject data;
532 bytea *datafield;
533 bool pfreeit;
534
535 if (HeapTupleHasNulls(tuple)) /* paranoia */
536 elog(ERROR, "null field found in pg_largeobject");
537 data = (Form_pg_largeobject) GETSTRUCT(tuple);
538
539 /*
540 * We expect the indexscan will deliver pages in order. However,
541 * there may be missing pages if the LO contains unwritten "holes". We
542 * want missing sections to read out as zeroes.
543 */
544 pageoff = ((uint64) data->pageno) * LOBLKSIZE;
545 if (pageoff > obj_desc->offset)
546 {
547 n = pageoff - obj_desc->offset;
548 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
549 MemSet(buf + nread, 0, n);
550 nread += n;
551 obj_desc->offset += n;
552 }
553
554 if (nread < nbytes)
555 {
556 Assert(obj_desc->offset >= pageoff);
557 off = (int) (obj_desc->offset - pageoff);
558 Assert(off >= 0 && off < LOBLKSIZE);
559
560 getdatafield(data, &datafield, &len, &pfreeit);
561 if (len > off)
562 {
563 n = len - off;
564 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
565 memcpy(buf + nread, VARDATA(datafield) + off, n);
566 nread += n;
567 obj_desc->offset += n;
568 }
569 if (pfreeit)
570 pfree(datafield);
571 }
572
573 if (nread >= nbytes)
574 break;
575 }
576
577 systable_endscan_ordered(sd);
578
579 return nread;
580 }
581
582 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)583 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
584 {
585 int nwritten = 0;
586 int n;
587 int off;
588 int len;
589 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
590 ScanKeyData skey[2];
591 SysScanDesc sd;
592 HeapTuple oldtuple;
593 Form_pg_largeobject olddata;
594 bool neednextpage;
595 bytea *datafield;
596 bool pfreeit;
597 union
598 {
599 bytea hdr;
600 /* this is to make the union big enough for a LO data chunk: */
601 char data[LOBLKSIZE + VARHDRSZ];
602 /* ensure union is aligned well enough: */
603 int32 align_it;
604 } workbuf;
605 char *workb = VARDATA(&workbuf.hdr);
606 HeapTuple newtup;
607 Datum values[Natts_pg_largeobject];
608 bool nulls[Natts_pg_largeobject];
609 bool replace[Natts_pg_largeobject];
610 CatalogIndexState indstate;
611
612 Assert(PointerIsValid(obj_desc));
613 Assert(buf != NULL);
614
615 /* enforce writability because snapshot is probably wrong otherwise */
616 if ((obj_desc->flags & IFS_WRLOCK) == 0)
617 ereport(ERROR,
618 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
619 errmsg("permission denied for large object %u",
620 obj_desc->id)));
621
622 if (nbytes <= 0)
623 return 0;
624
625 /* this addition can't overflow because nbytes is only int32 */
626 if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
627 ereport(ERROR,
628 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 errmsg("invalid large object write request size: %d",
630 nbytes)));
631
632 open_lo_relation();
633
634 indstate = CatalogOpenIndexes(lo_heap_r);
635
636 ScanKeyInit(&skey[0],
637 Anum_pg_largeobject_loid,
638 BTEqualStrategyNumber, F_OIDEQ,
639 ObjectIdGetDatum(obj_desc->id));
640
641 ScanKeyInit(&skey[1],
642 Anum_pg_largeobject_pageno,
643 BTGreaterEqualStrategyNumber, F_INT4GE,
644 Int32GetDatum(pageno));
645
646 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
647 obj_desc->snapshot, 2, skey);
648
649 oldtuple = NULL;
650 olddata = NULL;
651 neednextpage = true;
652
653 while (nwritten < nbytes)
654 {
655 /*
656 * If possible, get next pre-existing page of the LO. We expect the
657 * indexscan will deliver these in order --- but there may be holes.
658 */
659 if (neednextpage)
660 {
661 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
662 {
663 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
664 elog(ERROR, "null field found in pg_largeobject");
665 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
666 Assert(olddata->pageno >= pageno);
667 }
668 neednextpage = false;
669 }
670
671 /*
672 * If we have a pre-existing page, see if it is the page we want to
673 * write, or a later one.
674 */
675 if (olddata != NULL && olddata->pageno == pageno)
676 {
677 /*
678 * Update an existing page with fresh data.
679 *
680 * First, load old data into workbuf
681 */
682 getdatafield(olddata, &datafield, &len, &pfreeit);
683 memcpy(workb, VARDATA(datafield), len);
684 if (pfreeit)
685 pfree(datafield);
686
687 /*
688 * Fill any hole
689 */
690 off = (int) (obj_desc->offset % LOBLKSIZE);
691 if (off > len)
692 MemSet(workb + len, 0, off - len);
693
694 /*
695 * Insert appropriate portion of new data
696 */
697 n = LOBLKSIZE - off;
698 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
699 memcpy(workb + off, buf + nwritten, n);
700 nwritten += n;
701 obj_desc->offset += n;
702 off += n;
703 /* compute valid length of new page */
704 len = (len >= off) ? len : off;
705 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
706
707 /*
708 * Form and insert updated tuple
709 */
710 memset(values, 0, sizeof(values));
711 memset(nulls, false, sizeof(nulls));
712 memset(replace, false, sizeof(replace));
713 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
714 replace[Anum_pg_largeobject_data - 1] = true;
715 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
716 values, nulls, replace);
717 CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
718 indstate);
719 heap_freetuple(newtup);
720
721 /*
722 * We're done with this old page.
723 */
724 oldtuple = NULL;
725 olddata = NULL;
726 neednextpage = true;
727 }
728 else
729 {
730 /*
731 * Write a brand new page.
732 *
733 * First, fill any hole
734 */
735 off = (int) (obj_desc->offset % LOBLKSIZE);
736 if (off > 0)
737 MemSet(workb, 0, off);
738
739 /*
740 * Insert appropriate portion of new data
741 */
742 n = LOBLKSIZE - off;
743 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
744 memcpy(workb + off, buf + nwritten, n);
745 nwritten += n;
746 obj_desc->offset += n;
747 /* compute valid length of new page */
748 len = off + n;
749 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
750
751 /*
752 * Form and insert updated tuple
753 */
754 memset(values, 0, sizeof(values));
755 memset(nulls, false, sizeof(nulls));
756 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
757 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
758 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
759 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
760 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
761 heap_freetuple(newtup);
762 }
763 pageno++;
764 }
765
766 systable_endscan_ordered(sd);
767
768 CatalogCloseIndexes(indstate);
769
770 /*
771 * Advance command counter so that my tuple updates will be seen by later
772 * large-object operations in this transaction.
773 */
774 CommandCounterIncrement();
775
776 return nwritten;
777 }
778
/*
 * inv_truncate -- set the length of a large object to exactly len bytes.
 *
 * The page containing the truncation point is rewritten, or inserted if it
 * does not exist, so that it holds len % LOBLKSIZE bytes (possibly zero);
 * any part of it that previously had no data is zero-filled.  All pages
 * after it are deleted.  The descriptor's seek position is not changed by
 * this function.  The command counter is incremented at the end so the
 * changes are visible to later large-object operations in this transaction.
 *
 * Raises ERROR if the descriptor was not opened with write access
 * (IFS_WRLOCK), if len is negative or exceeds MAX_LARGE_OBJECT_SIZE, or if
 * a pg_largeobject row is found to contain nulls or an invalid data length.
 */
void
inv_truncate(LargeObjectDesc *obj_desc, int64 len)
{
	/* page that will become the last page of the object */
	int32		pageno = (int32) (len / LOBLKSIZE);
	int32		off;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));

	/* enforce writability because snapshot is probably wrong otherwise */
	if ((obj_desc->flags & IFS_WRLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for large object %u",
						obj_desc->id)));

	/*
	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
	 * in translatable strings; doing better is not worth the trouble
	 */
	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
								 len)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/*
	 * Set up to find all pages with desired loid and pageno >= target
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	/*
	 * If possible, get the page the truncation point is in. The truncation
	 * point may be beyond the end of the LO or in a hole.
	 */
	olddata = NULL;
	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
		Assert(olddata->pageno >= pageno);
	}

	/*
	 * If we found the page of the truncation point we need to truncate the
	 * data in it.  Otherwise if we're in a hole, we need to create a page to
	 * mark the end of data.
	 */
	if (olddata != NULL && olddata->pageno == pageno)
	{
		/* First, load old data into workbuf */
		bytea	   *datafield;
		int			pagelen;
		bool		pfreeit;

		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
		memcpy(workb, VARDATA(datafield), pagelen);
		if (pfreeit)
			pfree(datafield);

		/*
		 * Fill any hole: if the page held fewer bytes than the new length,
		 * the extension reads as zeroes.
		 */
		off = len % LOBLKSIZE;
		if (off > pagelen)
			MemSet(workb + pagelen, 0, off - pagelen);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert updated tuple; only the data column is replaced
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replace, false, sizeof(replace));
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		replace[Anum_pg_largeobject_data - 1] = true;
		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
								   values, nulls, replace);
		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
								   indstate);
		heap_freetuple(newtup);
	}
	else
	{
		/*
		 * If the first page we found was after the truncation point, we're in
		 * a hole that we'll fill, but we need to delete the later page
		 * because the loop below won't visit it again.
		 */
		if (olddata != NULL)
		{
			Assert(olddata->pageno > pageno);
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}

		/*
		 * Write a brand new page.
		 *
		 * Fill the hole up to the truncation point
		 */
		off = len % LOBLKSIZE;
		if (off > 0)
			MemSet(workb, 0, off);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert new tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
		heap_freetuple(newtup);
	}

	/*
	 * Delete any pages after the truncation point.  If the initial search
	 * didn't find a page, then of course there's nothing more to do.
	 */
	if (olddata != NULL)
	{
		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
		{
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();
}
956