1 /*-------------------------------------------------------------------------
2 *
3 * inv_api.c
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
6 *
7 *
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 *
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
20 *
21 *
22 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
24 *
25 *
26 * IDENTIFICATION
27 * src/backend/storage/large_object/inv_api.c
28 *
29 *-------------------------------------------------------------------------
30 */
31 #include "postgres.h"
32
33 #include <limits.h>
34
35 #include "access/genam.h"
36 #include "access/heapam.h"
37 #include "access/sysattr.h"
38 #include "access/tuptoaster.h"
39 #include "access/xact.h"
40 #include "catalog/dependency.h"
41 #include "catalog/indexing.h"
42 #include "catalog/objectaccess.h"
43 #include "catalog/pg_largeobject.h"
44 #include "catalog/pg_largeobject_metadata.h"
45 #include "libpq/libpq-fs.h"
46 #include "miscadmin.h"
47 #include "storage/large_object.h"
48 #include "utils/fmgroids.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51 #include "utils/tqual.h"
52
53
54 /*
 * GUC: backwards-compatibility flag to suppress LO permission checks.
 *
 * When true, inv_open() skips its pg_largeobject_aclcheck_snapshot() calls
 * for both the read (ACL_SELECT) and the write (ACL_UPDATE) case.
 */
57 bool lo_compat_privileges;
58
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference each, so that we only need to open pg_largeobject (and its index)
 * once per transaction.  To avoid problems when the first such reference
 * occurs inside a subtransaction, we execute a slightly klugy maneuver to
 * assign ownership of the Relation references to TopTransactionResourceOwner.
 */
/* pg_largeobject heap; NULL until open_lo_relation() opens it, reset by close_lo_relation() */
66 static Relation lo_heap_r = NULL;
/* pg_largeobject (loid, pageno) index; same lifetime as lo_heap_r */
67 static Relation lo_index_r = NULL;
68
69
70 /*
71 * Open pg_largeobject and its index, if not already done in current xact
72 */
73 static void
open_lo_relation(void)74 open_lo_relation(void)
75 {
76 ResourceOwner currentOwner;
77
78 if (lo_heap_r && lo_index_r)
79 return; /* already open in current xact */
80
81 /* Arrange for the top xact to own these relation references */
82 currentOwner = CurrentResourceOwner;
83 CurrentResourceOwner = TopTransactionResourceOwner;
84
85 /* Use RowExclusiveLock since we might either read or write */
86 if (lo_heap_r == NULL)
87 lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
88 if (lo_index_r == NULL)
89 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
90
91 CurrentResourceOwner = currentOwner;
92 }
93
94 /*
95 * Clean up at main transaction end
96 */
97 void
close_lo_relation(bool isCommit)98 close_lo_relation(bool isCommit)
99 {
100 if (lo_heap_r || lo_index_r)
101 {
102 /*
103 * Only bother to close if committing; else abort cleanup will handle
104 * it
105 */
106 if (isCommit)
107 {
108 ResourceOwner currentOwner;
109
110 currentOwner = CurrentResourceOwner;
111 CurrentResourceOwner = TopTransactionResourceOwner;
112
113 if (lo_index_r)
114 index_close(lo_index_r, NoLock);
115 if (lo_heap_r)
116 heap_close(lo_heap_r, NoLock);
117
118 CurrentResourceOwner = currentOwner;
119 }
120 lo_heap_r = NULL;
121 lo_index_r = NULL;
122 }
123 }
124
125
126 /*
127 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
128 * read with can be specified.
129 */
130 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)131 myLargeObjectExists(Oid loid, Snapshot snapshot)
132 {
133 Relation pg_lo_meta;
134 ScanKeyData skey[1];
135 SysScanDesc sd;
136 HeapTuple tuple;
137 bool retval = false;
138
139 ScanKeyInit(&skey[0],
140 ObjectIdAttributeNumber,
141 BTEqualStrategyNumber, F_OIDEQ,
142 ObjectIdGetDatum(loid));
143
144 pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
145 AccessShareLock);
146
147 sd = systable_beginscan(pg_lo_meta,
148 LargeObjectMetadataOidIndexId, true,
149 snapshot, 1, skey);
150
151 tuple = systable_getnext(sd);
152 if (HeapTupleIsValid(tuple))
153 retval = true;
154
155 systable_endscan(sd);
156
157 heap_close(pg_lo_meta, AccessShareLock);
158
159 return retval;
160 }
161
162
163 /*
164 * Extract data field from a pg_largeobject tuple, detoasting if needed
165 * and verifying that the length is sane. Returns data pointer (a bytea *),
166 * data length, and an indication of whether to pfree the data pointer.
167 */
168 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)169 getdatafield(Form_pg_largeobject tuple,
170 bytea **pdatafield,
171 int *plen,
172 bool *pfreeit)
173 {
174 bytea *datafield;
175 int len;
176 bool freeit;
177
178 datafield = &(tuple->data); /* see note at top of file */
179 freeit = false;
180 if (VARATT_IS_EXTENDED(datafield))
181 {
182 datafield = (bytea *)
183 heap_tuple_untoast_attr((struct varlena *) datafield);
184 freeit = true;
185 }
186 len = VARSIZE(datafield) - VARHDRSZ;
187 if (len < 0 || len > LOBLKSIZE)
188 ereport(ERROR,
189 (errcode(ERRCODE_DATA_CORRUPTED),
190 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
191 tuple->loid, tuple->pageno, len)));
192 *pdatafield = datafield;
193 *plen = len;
194 *pfreeit = freeit;
195 }
196
197
198 /*
199 * inv_create -- create a new large object
200 *
201 * Arguments:
202 * lobjId - OID to use for new large object, or InvalidOid to pick one
203 *
204 * Returns:
205 * OID of new object
206 *
207 * If lobjId is not InvalidOid, then an error occurs if the OID is already
208 * in use.
209 */
210 Oid
inv_create(Oid lobjId)211 inv_create(Oid lobjId)
212 {
213 Oid lobjId_new;
214
215 /*
216 * Create a new largeobject with empty data pages
217 */
218 lobjId_new = LargeObjectCreate(lobjId);
219
220 /*
221 * dependency on the owner of largeobject
222 *
223 * The reason why we use LargeObjectRelationId instead of
224 * LargeObjectMetadataRelationId here is to provide backward compatibility
225 * to the applications which utilize a knowledge about internal layout of
226 * system catalogs. OID of pg_largeobject_metadata and loid of
227 * pg_largeobject are same value, so there are no actual differences here.
228 */
229 recordDependencyOnOwner(LargeObjectRelationId,
230 lobjId_new, GetUserId());
231
232 /* Post creation hook for new large object */
233 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
234
235 /*
236 * Advance command counter to make new tuple visible to later operations.
237 */
238 CommandCounterIncrement();
239
240 return lobjId_new;
241 }
242
243 /*
244 * inv_open -- access an existing large object.
245 *
246 * Returns a large object descriptor, appropriately filled in.
247 * The descriptor and subsidiary data are allocated in the specified
248 * memory context, which must be suitably long-lived for the caller's
249 * purposes. If the returned descriptor has a snapshot associated
250 * with it, the caller must ensure that it also lives long enough,
251 * e.g. by calling RegisterSnapshotOnOwner
252 */
253 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)254 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
255 {
256 LargeObjectDesc *retval;
257 Snapshot snapshot = NULL;
258 int descflags = 0;
259
260 /*
261 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
262 * | INV_READ), the caller being allowed to read the large object
263 * descriptor in either case.
264 */
265 if (flags & INV_WRITE)
266 descflags |= IFS_WRLOCK | IFS_RDLOCK;
267 if (flags & INV_READ)
268 descflags |= IFS_RDLOCK;
269
270 if (descflags == 0)
271 ereport(ERROR,
272 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
273 errmsg("invalid flags for opening a large object: %d",
274 flags)));
275
276 /* Get snapshot. If write is requested, use an instantaneous snapshot. */
277 if (descflags & IFS_WRLOCK)
278 snapshot = NULL;
279 else
280 snapshot = GetActiveSnapshot();
281
282 /* Can't use LargeObjectExists here because we need to specify snapshot */
283 if (!myLargeObjectExists(lobjId, snapshot))
284 ereport(ERROR,
285 (errcode(ERRCODE_UNDEFINED_OBJECT),
286 errmsg("large object %u does not exist", lobjId)));
287
288 /* Apply permission checks, again specifying snapshot */
289 if ((descflags & IFS_RDLOCK) != 0)
290 {
291 if (!lo_compat_privileges &&
292 pg_largeobject_aclcheck_snapshot(lobjId,
293 GetUserId(),
294 ACL_SELECT,
295 snapshot) != ACLCHECK_OK)
296 ereport(ERROR,
297 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
298 errmsg("permission denied for large object %u",
299 lobjId)));
300 }
301 if ((descflags & IFS_WRLOCK) != 0)
302 {
303 if (!lo_compat_privileges &&
304 pg_largeobject_aclcheck_snapshot(lobjId,
305 GetUserId(),
306 ACL_UPDATE,
307 snapshot) != ACLCHECK_OK)
308 ereport(ERROR,
309 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
310 errmsg("permission denied for large object %u",
311 lobjId)));
312 }
313
314 /* OK to create a descriptor */
315 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
316 sizeof(LargeObjectDesc));
317 retval->id = lobjId;
318 retval->offset = 0;
319 retval->flags = descflags;
320
321 /* caller sets if needed, not used by the functions in this file */
322 retval->subid = InvalidSubTransactionId;
323
324 /*
325 * The snapshot (if any) is just the currently active snapshot. The
326 * caller will replace it with a longer-lived copy if needed.
327 */
328 retval->snapshot = snapshot;
329
330 return retval;
331 }
332
333 /*
334 * Closes a large object descriptor previously made by inv_open(), and
335 * releases the long-term memory used by it.
336 */
337 void
inv_close(LargeObjectDesc * obj_desc)338 inv_close(LargeObjectDesc *obj_desc)
339 {
340 Assert(PointerIsValid(obj_desc));
341 pfree(obj_desc);
342 }
343
344 /*
345 * Destroys an existing large object (not to be confused with a descriptor!)
346 *
347 * Note we expect caller to have done any required permissions check.
348 */
349 int
inv_drop(Oid lobjId)350 inv_drop(Oid lobjId)
351 {
352 ObjectAddress object;
353
354 /*
355 * Delete any comments and dependencies on the large object
356 */
357 object.classId = LargeObjectRelationId;
358 object.objectId = lobjId;
359 object.objectSubId = 0;
360 performDeletion(&object, DROP_CASCADE, 0);
361
362 /*
363 * Advance command counter so that tuple removal will be seen by later
364 * large-object operations in this transaction.
365 */
366 CommandCounterIncrement();
367
368 /* For historical reasons, we always return 1 on success. */
369 return 1;
370 }
371
372 /*
373 * Determine size of a large object
374 *
375 * NOTE: LOs can contain gaps, just like Unix files. We actually return
376 * the offset of the last byte + 1.
377 */
378 static uint64
inv_getsize(LargeObjectDesc * obj_desc)379 inv_getsize(LargeObjectDesc *obj_desc)
380 {
381 uint64 lastbyte = 0;
382 ScanKeyData skey[1];
383 SysScanDesc sd;
384 HeapTuple tuple;
385
386 Assert(PointerIsValid(obj_desc));
387
388 open_lo_relation();
389
390 ScanKeyInit(&skey[0],
391 Anum_pg_largeobject_loid,
392 BTEqualStrategyNumber, F_OIDEQ,
393 ObjectIdGetDatum(obj_desc->id));
394
395 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
396 obj_desc->snapshot, 1, skey);
397
398 /*
399 * Because the pg_largeobject index is on both loid and pageno, but we
400 * constrain only loid, a backwards scan should visit all pages of the
401 * large object in reverse pageno order. So, it's sufficient to examine
402 * the first valid tuple (== last valid page).
403 */
404 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
405 if (HeapTupleIsValid(tuple))
406 {
407 Form_pg_largeobject data;
408 bytea *datafield;
409 int len;
410 bool pfreeit;
411
412 if (HeapTupleHasNulls(tuple)) /* paranoia */
413 elog(ERROR, "null field found in pg_largeobject");
414 data = (Form_pg_largeobject) GETSTRUCT(tuple);
415 getdatafield(data, &datafield, &len, &pfreeit);
416 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
417 if (pfreeit)
418 pfree(datafield);
419 }
420
421 systable_endscan_ordered(sd);
422
423 return lastbyte;
424 }
425
426 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)427 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
428 {
429 int64 newoffset;
430
431 Assert(PointerIsValid(obj_desc));
432
433 /*
434 * We allow seek/tell if you have either read or write permission, so no
435 * need for a permission check here.
436 */
437
438 /*
439 * Note: overflow in the additions is possible, but since we will reject
440 * negative results, we don't need any extra test for that.
441 */
442 switch (whence)
443 {
444 case SEEK_SET:
445 newoffset = offset;
446 break;
447 case SEEK_CUR:
448 newoffset = obj_desc->offset + offset;
449 break;
450 case SEEK_END:
451 newoffset = inv_getsize(obj_desc) + offset;
452 break;
453 default:
454 ereport(ERROR,
455 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
456 errmsg("invalid whence setting: %d", whence)));
457 newoffset = 0; /* keep compiler quiet */
458 break;
459 }
460
461 /*
462 * use errmsg_internal here because we don't want to expose INT64_FORMAT
463 * in translatable strings; doing better is not worth the trouble
464 */
465 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
466 ereport(ERROR,
467 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
468 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
469 newoffset)));
470
471 obj_desc->offset = newoffset;
472 return newoffset;
473 }
474
475 int64
inv_tell(LargeObjectDesc * obj_desc)476 inv_tell(LargeObjectDesc *obj_desc)
477 {
478 Assert(PointerIsValid(obj_desc));
479
480 /*
481 * We allow seek/tell if you have either read or write permission, so no
482 * need for a permission check here.
483 */
484
485 return obj_desc->offset;
486 }
487
488 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)489 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
490 {
491 int nread = 0;
492 int64 n;
493 int64 off;
494 int len;
495 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
496 uint64 pageoff;
497 ScanKeyData skey[2];
498 SysScanDesc sd;
499 HeapTuple tuple;
500
501 Assert(PointerIsValid(obj_desc));
502 Assert(buf != NULL);
503
504 if ((obj_desc->flags & IFS_RDLOCK) == 0)
505 ereport(ERROR,
506 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
507 errmsg("permission denied for large object %u",
508 obj_desc->id)));
509
510 if (nbytes <= 0)
511 return 0;
512
513 open_lo_relation();
514
515 ScanKeyInit(&skey[0],
516 Anum_pg_largeobject_loid,
517 BTEqualStrategyNumber, F_OIDEQ,
518 ObjectIdGetDatum(obj_desc->id));
519
520 ScanKeyInit(&skey[1],
521 Anum_pg_largeobject_pageno,
522 BTGreaterEqualStrategyNumber, F_INT4GE,
523 Int32GetDatum(pageno));
524
525 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
526 obj_desc->snapshot, 2, skey);
527
528 while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
529 {
530 Form_pg_largeobject data;
531 bytea *datafield;
532 bool pfreeit;
533
534 if (HeapTupleHasNulls(tuple)) /* paranoia */
535 elog(ERROR, "null field found in pg_largeobject");
536 data = (Form_pg_largeobject) GETSTRUCT(tuple);
537
538 /*
539 * We expect the indexscan will deliver pages in order. However,
540 * there may be missing pages if the LO contains unwritten "holes". We
541 * want missing sections to read out as zeroes.
542 */
543 pageoff = ((uint64) data->pageno) * LOBLKSIZE;
544 if (pageoff > obj_desc->offset)
545 {
546 n = pageoff - obj_desc->offset;
547 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
548 MemSet(buf + nread, 0, n);
549 nread += n;
550 obj_desc->offset += n;
551 }
552
553 if (nread < nbytes)
554 {
555 Assert(obj_desc->offset >= pageoff);
556 off = (int) (obj_desc->offset - pageoff);
557 Assert(off >= 0 && off < LOBLKSIZE);
558
559 getdatafield(data, &datafield, &len, &pfreeit);
560 if (len > off)
561 {
562 n = len - off;
563 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
564 memcpy(buf + nread, VARDATA(datafield) + off, n);
565 nread += n;
566 obj_desc->offset += n;
567 }
568 if (pfreeit)
569 pfree(datafield);
570 }
571
572 if (nread >= nbytes)
573 break;
574 }
575
576 systable_endscan_ordered(sd);
577
578 return nread;
579 }
580
581 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)582 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
583 {
584 int nwritten = 0;
585 int n;
586 int off;
587 int len;
588 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
589 ScanKeyData skey[2];
590 SysScanDesc sd;
591 HeapTuple oldtuple;
592 Form_pg_largeobject olddata;
593 bool neednextpage;
594 bytea *datafield;
595 bool pfreeit;
596 union
597 {
598 bytea hdr;
599 /* this is to make the union big enough for a LO data chunk: */
600 char data[LOBLKSIZE + VARHDRSZ];
601 /* ensure union is aligned well enough: */
602 int32 align_it;
603 } workbuf;
604 char *workb = VARDATA(&workbuf.hdr);
605 HeapTuple newtup;
606 Datum values[Natts_pg_largeobject];
607 bool nulls[Natts_pg_largeobject];
608 bool replace[Natts_pg_largeobject];
609 CatalogIndexState indstate;
610
611 Assert(PointerIsValid(obj_desc));
612 Assert(buf != NULL);
613
614 /* enforce writability because snapshot is probably wrong otherwise */
615 if ((obj_desc->flags & IFS_WRLOCK) == 0)
616 ereport(ERROR,
617 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
618 errmsg("permission denied for large object %u",
619 obj_desc->id)));
620
621 if (nbytes <= 0)
622 return 0;
623
624 /* this addition can't overflow because nbytes is only int32 */
625 if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
626 ereport(ERROR,
627 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
628 errmsg("invalid large object write request size: %d",
629 nbytes)));
630
631 open_lo_relation();
632
633 indstate = CatalogOpenIndexes(lo_heap_r);
634
635 ScanKeyInit(&skey[0],
636 Anum_pg_largeobject_loid,
637 BTEqualStrategyNumber, F_OIDEQ,
638 ObjectIdGetDatum(obj_desc->id));
639
640 ScanKeyInit(&skey[1],
641 Anum_pg_largeobject_pageno,
642 BTGreaterEqualStrategyNumber, F_INT4GE,
643 Int32GetDatum(pageno));
644
645 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
646 obj_desc->snapshot, 2, skey);
647
648 oldtuple = NULL;
649 olddata = NULL;
650 neednextpage = true;
651
652 while (nwritten < nbytes)
653 {
654 /*
655 * If possible, get next pre-existing page of the LO. We expect the
656 * indexscan will deliver these in order --- but there may be holes.
657 */
658 if (neednextpage)
659 {
660 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
661 {
662 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
663 elog(ERROR, "null field found in pg_largeobject");
664 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
665 Assert(olddata->pageno >= pageno);
666 }
667 neednextpage = false;
668 }
669
670 /*
671 * If we have a pre-existing page, see if it is the page we want to
672 * write, or a later one.
673 */
674 if (olddata != NULL && olddata->pageno == pageno)
675 {
676 /*
677 * Update an existing page with fresh data.
678 *
679 * First, load old data into workbuf
680 */
681 getdatafield(olddata, &datafield, &len, &pfreeit);
682 memcpy(workb, VARDATA(datafield), len);
683 if (pfreeit)
684 pfree(datafield);
685
686 /*
687 * Fill any hole
688 */
689 off = (int) (obj_desc->offset % LOBLKSIZE);
690 if (off > len)
691 MemSet(workb + len, 0, off - len);
692
693 /*
694 * Insert appropriate portion of new data
695 */
696 n = LOBLKSIZE - off;
697 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
698 memcpy(workb + off, buf + nwritten, n);
699 nwritten += n;
700 obj_desc->offset += n;
701 off += n;
702 /* compute valid length of new page */
703 len = (len >= off) ? len : off;
704 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
705
706 /*
707 * Form and insert updated tuple
708 */
709 memset(values, 0, sizeof(values));
710 memset(nulls, false, sizeof(nulls));
711 memset(replace, false, sizeof(replace));
712 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
713 replace[Anum_pg_largeobject_data - 1] = true;
714 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
715 values, nulls, replace);
716 CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
717 indstate);
718 heap_freetuple(newtup);
719
720 /*
721 * We're done with this old page.
722 */
723 oldtuple = NULL;
724 olddata = NULL;
725 neednextpage = true;
726 }
727 else
728 {
729 /*
730 * Write a brand new page.
731 *
732 * First, fill any hole
733 */
734 off = (int) (obj_desc->offset % LOBLKSIZE);
735 if (off > 0)
736 MemSet(workb, 0, off);
737
738 /*
739 * Insert appropriate portion of new data
740 */
741 n = LOBLKSIZE - off;
742 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
743 memcpy(workb + off, buf + nwritten, n);
744 nwritten += n;
745 obj_desc->offset += n;
746 /* compute valid length of new page */
747 len = off + n;
748 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
749
750 /*
751 * Form and insert updated tuple
752 */
753 memset(values, 0, sizeof(values));
754 memset(nulls, false, sizeof(nulls));
755 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
756 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
757 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
758 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
759 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
760 heap_freetuple(newtup);
761 }
762 pageno++;
763 }
764
765 systable_endscan_ordered(sd);
766
767 CatalogCloseIndexes(indstate);
768
769 /*
770 * Advance command counter so that my tuple updates will be seen by later
771 * large-object operations in this transaction.
772 */
773 CommandCounterIncrement();
774
775 return nwritten;
776 }
777
778 void
inv_truncate(LargeObjectDesc * obj_desc,int64 len)779 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
780 {
781 int32 pageno = (int32) (len / LOBLKSIZE);
782 int32 off;
783 ScanKeyData skey[2];
784 SysScanDesc sd;
785 HeapTuple oldtuple;
786 Form_pg_largeobject olddata;
787 union
788 {
789 bytea hdr;
790 /* this is to make the union big enough for a LO data chunk: */
791 char data[LOBLKSIZE + VARHDRSZ];
792 /* ensure union is aligned well enough: */
793 int32 align_it;
794 } workbuf;
795 char *workb = VARDATA(&workbuf.hdr);
796 HeapTuple newtup;
797 Datum values[Natts_pg_largeobject];
798 bool nulls[Natts_pg_largeobject];
799 bool replace[Natts_pg_largeobject];
800 CatalogIndexState indstate;
801
802 Assert(PointerIsValid(obj_desc));
803
804 /* enforce writability because snapshot is probably wrong otherwise */
805 if ((obj_desc->flags & IFS_WRLOCK) == 0)
806 ereport(ERROR,
807 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
808 errmsg("permission denied for large object %u",
809 obj_desc->id)));
810
811 /*
812 * use errmsg_internal here because we don't want to expose INT64_FORMAT
813 * in translatable strings; doing better is not worth the trouble
814 */
815 if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
816 ereport(ERROR,
817 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
818 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
819 len)));
820
821 open_lo_relation();
822
823 indstate = CatalogOpenIndexes(lo_heap_r);
824
825 /*
826 * Set up to find all pages with desired loid and pageno >= target
827 */
828 ScanKeyInit(&skey[0],
829 Anum_pg_largeobject_loid,
830 BTEqualStrategyNumber, F_OIDEQ,
831 ObjectIdGetDatum(obj_desc->id));
832
833 ScanKeyInit(&skey[1],
834 Anum_pg_largeobject_pageno,
835 BTGreaterEqualStrategyNumber, F_INT4GE,
836 Int32GetDatum(pageno));
837
838 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
839 obj_desc->snapshot, 2, skey);
840
841 /*
842 * If possible, get the page the truncation point is in. The truncation
843 * point may be beyond the end of the LO or in a hole.
844 */
845 olddata = NULL;
846 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
847 {
848 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
849 elog(ERROR, "null field found in pg_largeobject");
850 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
851 Assert(olddata->pageno >= pageno);
852 }
853
854 /*
855 * If we found the page of the truncation point we need to truncate the
856 * data in it. Otherwise if we're in a hole, we need to create a page to
857 * mark the end of data.
858 */
859 if (olddata != NULL && olddata->pageno == pageno)
860 {
861 /* First, load old data into workbuf */
862 bytea *datafield;
863 int pagelen;
864 bool pfreeit;
865
866 getdatafield(olddata, &datafield, &pagelen, &pfreeit);
867 memcpy(workb, VARDATA(datafield), pagelen);
868 if (pfreeit)
869 pfree(datafield);
870
871 /*
872 * Fill any hole
873 */
874 off = len % LOBLKSIZE;
875 if (off > pagelen)
876 MemSet(workb + pagelen, 0, off - pagelen);
877
878 /* compute length of new page */
879 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880
881 /*
882 * Form and insert updated tuple
883 */
884 memset(values, 0, sizeof(values));
885 memset(nulls, false, sizeof(nulls));
886 memset(replace, false, sizeof(replace));
887 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
888 replace[Anum_pg_largeobject_data - 1] = true;
889 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
890 values, nulls, replace);
891 CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
892 indstate);
893 heap_freetuple(newtup);
894 }
895 else
896 {
897 /*
898 * If the first page we found was after the truncation point, we're in
899 * a hole that we'll fill, but we need to delete the later page
900 * because the loop below won't visit it again.
901 */
902 if (olddata != NULL)
903 {
904 Assert(olddata->pageno > pageno);
905 CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
906 }
907
908 /*
909 * Write a brand new page.
910 *
911 * Fill the hole up to the truncation point
912 */
913 off = len % LOBLKSIZE;
914 if (off > 0)
915 MemSet(workb, 0, off);
916
917 /* compute length of new page */
918 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
919
920 /*
921 * Form and insert new tuple
922 */
923 memset(values, 0, sizeof(values));
924 memset(nulls, false, sizeof(nulls));
925 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
926 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
927 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
928 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
929 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
930 heap_freetuple(newtup);
931 }
932
933 /*
934 * Delete any pages after the truncation point. If the initial search
935 * didn't find a page, then of course there's nothing more to do.
936 */
937 if (olddata != NULL)
938 {
939 while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
940 {
941 CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
942 }
943 }
944
945 systable_endscan_ordered(sd);
946
947 CatalogCloseIndexes(indstate);
948
949 /*
950 * Advance command counter so that tuple updates will be seen by later
951 * large-object operations in this transaction.
952 */
953 CommandCounterIncrement();
954 }
955