1 /*-------------------------------------------------------------------------
2 *
3 * inv_api.c
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
6 *
7 *
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 *
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
20 *
21 *
22 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
24 *
25 *
26 * IDENTIFICATION
27 * src/backend/storage/large_object/inv_api.c
28 *
29 *-------------------------------------------------------------------------
30 */
31 #include "postgres.h"
32
33 #include <limits.h>
34
35 #include "access/genam.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tuptoaster.h"
39 #include "access/xact.h"
40 #include "catalog/dependency.h"
41 #include "catalog/indexing.h"
42 #include "catalog/objectaccess.h"
43 #include "catalog/pg_largeobject.h"
44 #include "catalog/pg_largeobject_metadata.h"
45 #include "libpq/libpq-fs.h"
46 #include "miscadmin.h"
47 #include "storage/large_object.h"
48 #include "utils/fmgroids.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51
52
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks
 *
 * When true, inv_open() skips its ACL_SELECT / ACL_UPDATE privilege checks
 * on the large object being opened.
 */
bool		lo_compat_privileges;
57
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference, so that we only need to open pg_largeobject once per
 * transaction.  To avoid problems when the first such reference occurs
 * inside a subtransaction, we execute a slightly klugy maneuver to assign
 * ownership of the Relation reference to TopTransactionResourceOwner.
 *
 * Both pointers are NULL whenever the relations are not open; they are set
 * by open_lo_relation() and reset by close_lo_relation().
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;
67
68
69 /*
70 * Open pg_largeobject and its index, if not already done in current xact
71 */
72 static void
open_lo_relation(void)73 open_lo_relation(void)
74 {
75 ResourceOwner currentOwner;
76
77 if (lo_heap_r && lo_index_r)
78 return; /* already open in current xact */
79
80 /* Arrange for the top xact to own these relation references */
81 currentOwner = CurrentResourceOwner;
82 CurrentResourceOwner = TopTransactionResourceOwner;
83
84 /* Use RowExclusiveLock since we might either read or write */
85 if (lo_heap_r == NULL)
86 lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87 if (lo_index_r == NULL)
88 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89
90 CurrentResourceOwner = currentOwner;
91 }
92
93 /*
94 * Clean up at main transaction end
95 */
96 void
close_lo_relation(bool isCommit)97 close_lo_relation(bool isCommit)
98 {
99 if (lo_heap_r || lo_index_r)
100 {
101 /*
102 * Only bother to close if committing; else abort cleanup will handle
103 * it
104 */
105 if (isCommit)
106 {
107 ResourceOwner currentOwner;
108
109 currentOwner = CurrentResourceOwner;
110 CurrentResourceOwner = TopTransactionResourceOwner;
111
112 if (lo_index_r)
113 index_close(lo_index_r, NoLock);
114 if (lo_heap_r)
115 table_close(lo_heap_r, NoLock);
116
117 CurrentResourceOwner = currentOwner;
118 }
119 lo_heap_r = NULL;
120 lo_index_r = NULL;
121 }
122 }
123
124
125 /*
126 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
127 * read with can be specified.
128 */
129 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)130 myLargeObjectExists(Oid loid, Snapshot snapshot)
131 {
132 Relation pg_lo_meta;
133 ScanKeyData skey[1];
134 SysScanDesc sd;
135 HeapTuple tuple;
136 bool retval = false;
137
138 ScanKeyInit(&skey[0],
139 Anum_pg_largeobject_metadata_oid,
140 BTEqualStrategyNumber, F_OIDEQ,
141 ObjectIdGetDatum(loid));
142
143 pg_lo_meta = table_open(LargeObjectMetadataRelationId,
144 AccessShareLock);
145
146 sd = systable_beginscan(pg_lo_meta,
147 LargeObjectMetadataOidIndexId, true,
148 snapshot, 1, skey);
149
150 tuple = systable_getnext(sd);
151 if (HeapTupleIsValid(tuple))
152 retval = true;
153
154 systable_endscan(sd);
155
156 table_close(pg_lo_meta, AccessShareLock);
157
158 return retval;
159 }
160
161
162 /*
163 * Extract data field from a pg_largeobject tuple, detoasting if needed
164 * and verifying that the length is sane. Returns data pointer (a bytea *),
165 * data length, and an indication of whether to pfree the data pointer.
166 */
167 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)168 getdatafield(Form_pg_largeobject tuple,
169 bytea **pdatafield,
170 int *plen,
171 bool *pfreeit)
172 {
173 bytea *datafield;
174 int len;
175 bool freeit;
176
177 datafield = &(tuple->data); /* see note at top of file */
178 freeit = false;
179 if (VARATT_IS_EXTENDED(datafield))
180 {
181 datafield = (bytea *)
182 heap_tuple_untoast_attr((struct varlena *) datafield);
183 freeit = true;
184 }
185 len = VARSIZE(datafield) - VARHDRSZ;
186 if (len < 0 || len > LOBLKSIZE)
187 ereport(ERROR,
188 (errcode(ERRCODE_DATA_CORRUPTED),
189 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
190 tuple->loid, tuple->pageno, len)));
191 *pdatafield = datafield;
192 *plen = len;
193 *pfreeit = freeit;
194 }
195
196
197 /*
198 * inv_create -- create a new large object
199 *
200 * Arguments:
201 * lobjId - OID to use for new large object, or InvalidOid to pick one
202 *
203 * Returns:
204 * OID of new object
205 *
206 * If lobjId is not InvalidOid, then an error occurs if the OID is already
207 * in use.
208 */
209 Oid
inv_create(Oid lobjId)210 inv_create(Oid lobjId)
211 {
212 Oid lobjId_new;
213
214 /*
215 * Create a new largeobject with empty data pages
216 */
217 lobjId_new = LargeObjectCreate(lobjId);
218
219 /*
220 * dependency on the owner of largeobject
221 *
222 * The reason why we use LargeObjectRelationId instead of
223 * LargeObjectMetadataRelationId here is to provide backward compatibility
224 * to the applications which utilize a knowledge about internal layout of
225 * system catalogs. OID of pg_largeobject_metadata and loid of
226 * pg_largeobject are same value, so there are no actual differences here.
227 */
228 recordDependencyOnOwner(LargeObjectRelationId,
229 lobjId_new, GetUserId());
230
231 /* Post creation hook for new large object */
232 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233
234 /*
235 * Advance command counter to make new tuple visible to later operations.
236 */
237 CommandCounterIncrement();
238
239 return lobjId_new;
240 }
241
242 /*
243 * inv_open -- access an existing large object.
244 *
245 * Returns a large object descriptor, appropriately filled in.
246 * The descriptor and subsidiary data are allocated in the specified
247 * memory context, which must be suitably long-lived for the caller's
248 * purposes. If the returned descriptor has a snapshot associated
249 * with it, the caller must ensure that it also lives long enough,
250 * e.g. by calling RegisterSnapshotOnOwner
251 */
252 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)253 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
254 {
255 LargeObjectDesc *retval;
256 Snapshot snapshot = NULL;
257 int descflags = 0;
258
259 /*
260 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
261 * | INV_READ), the caller being allowed to read the large object
262 * descriptor in either case.
263 */
264 if (flags & INV_WRITE)
265 descflags |= IFS_WRLOCK | IFS_RDLOCK;
266 if (flags & INV_READ)
267 descflags |= IFS_RDLOCK;
268
269 if (descflags == 0)
270 ereport(ERROR,
271 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272 errmsg("invalid flags for opening a large object: %d",
273 flags)));
274
275 /* Get snapshot. If write is requested, use an instantaneous snapshot. */
276 if (descflags & IFS_WRLOCK)
277 snapshot = NULL;
278 else
279 snapshot = GetActiveSnapshot();
280
281 /* Can't use LargeObjectExists here because we need to specify snapshot */
282 if (!myLargeObjectExists(lobjId, snapshot))
283 ereport(ERROR,
284 (errcode(ERRCODE_UNDEFINED_OBJECT),
285 errmsg("large object %u does not exist", lobjId)));
286
287 /* Apply permission checks, again specifying snapshot */
288 if ((descflags & IFS_RDLOCK) != 0)
289 {
290 if (!lo_compat_privileges &&
291 pg_largeobject_aclcheck_snapshot(lobjId,
292 GetUserId(),
293 ACL_SELECT,
294 snapshot) != ACLCHECK_OK)
295 ereport(ERROR,
296 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297 errmsg("permission denied for large object %u",
298 lobjId)));
299 }
300 if ((descflags & IFS_WRLOCK) != 0)
301 {
302 if (!lo_compat_privileges &&
303 pg_largeobject_aclcheck_snapshot(lobjId,
304 GetUserId(),
305 ACL_UPDATE,
306 snapshot) != ACLCHECK_OK)
307 ereport(ERROR,
308 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309 errmsg("permission denied for large object %u",
310 lobjId)));
311 }
312
313 /* OK to create a descriptor */
314 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
315 sizeof(LargeObjectDesc));
316 retval->id = lobjId;
317 retval->offset = 0;
318 retval->flags = descflags;
319
320 /* caller sets if needed, not used by the functions in this file */
321 retval->subid = InvalidSubTransactionId;
322
323 /*
324 * The snapshot (if any) is just the currently active snapshot. The
325 * caller will replace it with a longer-lived copy if needed.
326 */
327 retval->snapshot = snapshot;
328
329 return retval;
330 }
331
332 /*
333 * Closes a large object descriptor previously made by inv_open(), and
334 * releases the long-term memory used by it.
335 */
336 void
inv_close(LargeObjectDesc * obj_desc)337 inv_close(LargeObjectDesc *obj_desc)
338 {
339 Assert(PointerIsValid(obj_desc));
340 pfree(obj_desc);
341 }
342
343 /*
344 * Destroys an existing large object (not to be confused with a descriptor!)
345 *
346 * Note we expect caller to have done any required permissions check.
347 */
348 int
inv_drop(Oid lobjId)349 inv_drop(Oid lobjId)
350 {
351 ObjectAddress object;
352
353 /*
354 * Delete any comments and dependencies on the large object
355 */
356 object.classId = LargeObjectRelationId;
357 object.objectId = lobjId;
358 object.objectSubId = 0;
359 performDeletion(&object, DROP_CASCADE, 0);
360
361 /*
362 * Advance command counter so that tuple removal will be seen by later
363 * large-object operations in this transaction.
364 */
365 CommandCounterIncrement();
366
367 /* For historical reasons, we always return 1 on success. */
368 return 1;
369 }
370
371 /*
372 * Determine size of a large object
373 *
374 * NOTE: LOs can contain gaps, just like Unix files. We actually return
375 * the offset of the last byte + 1.
376 */
377 static uint64
inv_getsize(LargeObjectDesc * obj_desc)378 inv_getsize(LargeObjectDesc *obj_desc)
379 {
380 uint64 lastbyte = 0;
381 ScanKeyData skey[1];
382 SysScanDesc sd;
383 HeapTuple tuple;
384
385 Assert(PointerIsValid(obj_desc));
386
387 open_lo_relation();
388
389 ScanKeyInit(&skey[0],
390 Anum_pg_largeobject_loid,
391 BTEqualStrategyNumber, F_OIDEQ,
392 ObjectIdGetDatum(obj_desc->id));
393
394 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
395 obj_desc->snapshot, 1, skey);
396
397 /*
398 * Because the pg_largeobject index is on both loid and pageno, but we
399 * constrain only loid, a backwards scan should visit all pages of the
400 * large object in reverse pageno order. So, it's sufficient to examine
401 * the first valid tuple (== last valid page).
402 */
403 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
404 if (HeapTupleIsValid(tuple))
405 {
406 Form_pg_largeobject data;
407 bytea *datafield;
408 int len;
409 bool pfreeit;
410
411 if (HeapTupleHasNulls(tuple)) /* paranoia */
412 elog(ERROR, "null field found in pg_largeobject");
413 data = (Form_pg_largeobject) GETSTRUCT(tuple);
414 getdatafield(data, &datafield, &len, &pfreeit);
415 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
416 if (pfreeit)
417 pfree(datafield);
418 }
419
420 systable_endscan_ordered(sd);
421
422 return lastbyte;
423 }
424
425 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)426 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
427 {
428 int64 newoffset;
429
430 Assert(PointerIsValid(obj_desc));
431
432 /*
433 * We allow seek/tell if you have either read or write permission, so no
434 * need for a permission check here.
435 */
436
437 /*
438 * Note: overflow in the additions is possible, but since we will reject
439 * negative results, we don't need any extra test for that.
440 */
441 switch (whence)
442 {
443 case SEEK_SET:
444 newoffset = offset;
445 break;
446 case SEEK_CUR:
447 newoffset = obj_desc->offset + offset;
448 break;
449 case SEEK_END:
450 newoffset = inv_getsize(obj_desc) + offset;
451 break;
452 default:
453 ereport(ERROR,
454 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455 errmsg("invalid whence setting: %d", whence)));
456 newoffset = 0; /* keep compiler quiet */
457 break;
458 }
459
460 /*
461 * use errmsg_internal here because we don't want to expose INT64_FORMAT
462 * in translatable strings; doing better is not worth the trouble
463 */
464 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
465 ereport(ERROR,
466 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
467 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
468 newoffset)));
469
470 obj_desc->offset = newoffset;
471 return newoffset;
472 }
473
474 int64
inv_tell(LargeObjectDesc * obj_desc)475 inv_tell(LargeObjectDesc *obj_desc)
476 {
477 Assert(PointerIsValid(obj_desc));
478
479 /*
480 * We allow seek/tell if you have either read or write permission, so no
481 * need for a permission check here.
482 */
483
484 return obj_desc->offset;
485 }
486
487 int
inv_read(LargeObjectDesc * obj_desc,char * buf,int nbytes)488 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
489 {
490 int nread = 0;
491 int64 n;
492 int64 off;
493 int len;
494 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
495 uint64 pageoff;
496 ScanKeyData skey[2];
497 SysScanDesc sd;
498 HeapTuple tuple;
499
500 Assert(PointerIsValid(obj_desc));
501 Assert(buf != NULL);
502
503 if ((obj_desc->flags & IFS_RDLOCK) == 0)
504 ereport(ERROR,
505 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
506 errmsg("permission denied for large object %u",
507 obj_desc->id)));
508
509 if (nbytes <= 0)
510 return 0;
511
512 open_lo_relation();
513
514 ScanKeyInit(&skey[0],
515 Anum_pg_largeobject_loid,
516 BTEqualStrategyNumber, F_OIDEQ,
517 ObjectIdGetDatum(obj_desc->id));
518
519 ScanKeyInit(&skey[1],
520 Anum_pg_largeobject_pageno,
521 BTGreaterEqualStrategyNumber, F_INT4GE,
522 Int32GetDatum(pageno));
523
524 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
525 obj_desc->snapshot, 2, skey);
526
527 while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
528 {
529 Form_pg_largeobject data;
530 bytea *datafield;
531 bool pfreeit;
532
533 if (HeapTupleHasNulls(tuple)) /* paranoia */
534 elog(ERROR, "null field found in pg_largeobject");
535 data = (Form_pg_largeobject) GETSTRUCT(tuple);
536
537 /*
538 * We expect the indexscan will deliver pages in order. However,
539 * there may be missing pages if the LO contains unwritten "holes". We
540 * want missing sections to read out as zeroes.
541 */
542 pageoff = ((uint64) data->pageno) * LOBLKSIZE;
543 if (pageoff > obj_desc->offset)
544 {
545 n = pageoff - obj_desc->offset;
546 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
547 MemSet(buf + nread, 0, n);
548 nread += n;
549 obj_desc->offset += n;
550 }
551
552 if (nread < nbytes)
553 {
554 Assert(obj_desc->offset >= pageoff);
555 off = (int) (obj_desc->offset - pageoff);
556 Assert(off >= 0 && off < LOBLKSIZE);
557
558 getdatafield(data, &datafield, &len, &pfreeit);
559 if (len > off)
560 {
561 n = len - off;
562 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
563 memcpy(buf + nread, VARDATA(datafield) + off, n);
564 nread += n;
565 obj_desc->offset += n;
566 }
567 if (pfreeit)
568 pfree(datafield);
569 }
570
571 if (nread >= nbytes)
572 break;
573 }
574
575 systable_endscan_ordered(sd);
576
577 return nread;
578 }
579
580 int
inv_write(LargeObjectDesc * obj_desc,const char * buf,int nbytes)581 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
582 {
583 int nwritten = 0;
584 int n;
585 int off;
586 int len;
587 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
588 ScanKeyData skey[2];
589 SysScanDesc sd;
590 HeapTuple oldtuple;
591 Form_pg_largeobject olddata;
592 bool neednextpage;
593 bytea *datafield;
594 bool pfreeit;
595 union
596 {
597 bytea hdr;
598 /* this is to make the union big enough for a LO data chunk: */
599 char data[LOBLKSIZE + VARHDRSZ];
600 /* ensure union is aligned well enough: */
601 int32 align_it;
602 } workbuf;
603 char *workb = VARDATA(&workbuf.hdr);
604 HeapTuple newtup;
605 Datum values[Natts_pg_largeobject];
606 bool nulls[Natts_pg_largeobject];
607 bool replace[Natts_pg_largeobject];
608 CatalogIndexState indstate;
609
610 Assert(PointerIsValid(obj_desc));
611 Assert(buf != NULL);
612
613 /* enforce writability because snapshot is probably wrong otherwise */
614 if ((obj_desc->flags & IFS_WRLOCK) == 0)
615 ereport(ERROR,
616 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617 errmsg("permission denied for large object %u",
618 obj_desc->id)));
619
620 if (nbytes <= 0)
621 return 0;
622
623 /* this addition can't overflow because nbytes is only int32 */
624 if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
625 ereport(ERROR,
626 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627 errmsg("invalid large object write request size: %d",
628 nbytes)));
629
630 open_lo_relation();
631
632 indstate = CatalogOpenIndexes(lo_heap_r);
633
634 ScanKeyInit(&skey[0],
635 Anum_pg_largeobject_loid,
636 BTEqualStrategyNumber, F_OIDEQ,
637 ObjectIdGetDatum(obj_desc->id));
638
639 ScanKeyInit(&skey[1],
640 Anum_pg_largeobject_pageno,
641 BTGreaterEqualStrategyNumber, F_INT4GE,
642 Int32GetDatum(pageno));
643
644 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
645 obj_desc->snapshot, 2, skey);
646
647 oldtuple = NULL;
648 olddata = NULL;
649 neednextpage = true;
650
651 while (nwritten < nbytes)
652 {
653 /*
654 * If possible, get next pre-existing page of the LO. We expect the
655 * indexscan will deliver these in order --- but there may be holes.
656 */
657 if (neednextpage)
658 {
659 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
660 {
661 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
662 elog(ERROR, "null field found in pg_largeobject");
663 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
664 Assert(olddata->pageno >= pageno);
665 }
666 neednextpage = false;
667 }
668
669 /*
670 * If we have a pre-existing page, see if it is the page we want to
671 * write, or a later one.
672 */
673 if (olddata != NULL && olddata->pageno == pageno)
674 {
675 /*
676 * Update an existing page with fresh data.
677 *
678 * First, load old data into workbuf
679 */
680 getdatafield(olddata, &datafield, &len, &pfreeit);
681 memcpy(workb, VARDATA(datafield), len);
682 if (pfreeit)
683 pfree(datafield);
684
685 /*
686 * Fill any hole
687 */
688 off = (int) (obj_desc->offset % LOBLKSIZE);
689 if (off > len)
690 MemSet(workb + len, 0, off - len);
691
692 /*
693 * Insert appropriate portion of new data
694 */
695 n = LOBLKSIZE - off;
696 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
697 memcpy(workb + off, buf + nwritten, n);
698 nwritten += n;
699 obj_desc->offset += n;
700 off += n;
701 /* compute valid length of new page */
702 len = (len >= off) ? len : off;
703 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
704
705 /*
706 * Form and insert updated tuple
707 */
708 memset(values, 0, sizeof(values));
709 memset(nulls, false, sizeof(nulls));
710 memset(replace, false, sizeof(replace));
711 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
712 replace[Anum_pg_largeobject_data - 1] = true;
713 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
714 values, nulls, replace);
715 CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
716 indstate);
717 heap_freetuple(newtup);
718
719 /*
720 * We're done with this old page.
721 */
722 oldtuple = NULL;
723 olddata = NULL;
724 neednextpage = true;
725 }
726 else
727 {
728 /*
729 * Write a brand new page.
730 *
731 * First, fill any hole
732 */
733 off = (int) (obj_desc->offset % LOBLKSIZE);
734 if (off > 0)
735 MemSet(workb, 0, off);
736
737 /*
738 * Insert appropriate portion of new data
739 */
740 n = LOBLKSIZE - off;
741 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
742 memcpy(workb + off, buf + nwritten, n);
743 nwritten += n;
744 obj_desc->offset += n;
745 /* compute valid length of new page */
746 len = off + n;
747 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
748
749 /*
750 * Form and insert updated tuple
751 */
752 memset(values, 0, sizeof(values));
753 memset(nulls, false, sizeof(nulls));
754 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
755 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
756 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
757 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
758 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
759 heap_freetuple(newtup);
760 }
761 pageno++;
762 }
763
764 systable_endscan_ordered(sd);
765
766 CatalogCloseIndexes(indstate);
767
768 /*
769 * Advance command counter so that my tuple updates will be seen by later
770 * large-object operations in this transaction.
771 */
772 CommandCounterIncrement();
773
774 return nwritten;
775 }
776
777 void
inv_truncate(LargeObjectDesc * obj_desc,int64 len)778 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
779 {
780 int32 pageno = (int32) (len / LOBLKSIZE);
781 int32 off;
782 ScanKeyData skey[2];
783 SysScanDesc sd;
784 HeapTuple oldtuple;
785 Form_pg_largeobject olddata;
786 union
787 {
788 bytea hdr;
789 /* this is to make the union big enough for a LO data chunk: */
790 char data[LOBLKSIZE + VARHDRSZ];
791 /* ensure union is aligned well enough: */
792 int32 align_it;
793 } workbuf;
794 char *workb = VARDATA(&workbuf.hdr);
795 HeapTuple newtup;
796 Datum values[Natts_pg_largeobject];
797 bool nulls[Natts_pg_largeobject];
798 bool replace[Natts_pg_largeobject];
799 CatalogIndexState indstate;
800
801 Assert(PointerIsValid(obj_desc));
802
803 /* enforce writability because snapshot is probably wrong otherwise */
804 if ((obj_desc->flags & IFS_WRLOCK) == 0)
805 ereport(ERROR,
806 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807 errmsg("permission denied for large object %u",
808 obj_desc->id)));
809
810 /*
811 * use errmsg_internal here because we don't want to expose INT64_FORMAT
812 * in translatable strings; doing better is not worth the trouble
813 */
814 if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
815 ereport(ERROR,
816 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
817 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
818 len)));
819
820 open_lo_relation();
821
822 indstate = CatalogOpenIndexes(lo_heap_r);
823
824 /*
825 * Set up to find all pages with desired loid and pageno >= target
826 */
827 ScanKeyInit(&skey[0],
828 Anum_pg_largeobject_loid,
829 BTEqualStrategyNumber, F_OIDEQ,
830 ObjectIdGetDatum(obj_desc->id));
831
832 ScanKeyInit(&skey[1],
833 Anum_pg_largeobject_pageno,
834 BTGreaterEqualStrategyNumber, F_INT4GE,
835 Int32GetDatum(pageno));
836
837 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
838 obj_desc->snapshot, 2, skey);
839
840 /*
841 * If possible, get the page the truncation point is in. The truncation
842 * point may be beyond the end of the LO or in a hole.
843 */
844 olddata = NULL;
845 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
846 {
847 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
848 elog(ERROR, "null field found in pg_largeobject");
849 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
850 Assert(olddata->pageno >= pageno);
851 }
852
853 /*
854 * If we found the page of the truncation point we need to truncate the
855 * data in it. Otherwise if we're in a hole, we need to create a page to
856 * mark the end of data.
857 */
858 if (olddata != NULL && olddata->pageno == pageno)
859 {
860 /* First, load old data into workbuf */
861 bytea *datafield;
862 int pagelen;
863 bool pfreeit;
864
865 getdatafield(olddata, &datafield, &pagelen, &pfreeit);
866 memcpy(workb, VARDATA(datafield), pagelen);
867 if (pfreeit)
868 pfree(datafield);
869
870 /*
871 * Fill any hole
872 */
873 off = len % LOBLKSIZE;
874 if (off > pagelen)
875 MemSet(workb + pagelen, 0, off - pagelen);
876
877 /* compute length of new page */
878 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
879
880 /*
881 * Form and insert updated tuple
882 */
883 memset(values, 0, sizeof(values));
884 memset(nulls, false, sizeof(nulls));
885 memset(replace, false, sizeof(replace));
886 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
887 replace[Anum_pg_largeobject_data - 1] = true;
888 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
889 values, nulls, replace);
890 CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
891 indstate);
892 heap_freetuple(newtup);
893 }
894 else
895 {
896 /*
897 * If the first page we found was after the truncation point, we're in
898 * a hole that we'll fill, but we need to delete the later page
899 * because the loop below won't visit it again.
900 */
901 if (olddata != NULL)
902 {
903 Assert(olddata->pageno > pageno);
904 CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
905 }
906
907 /*
908 * Write a brand new page.
909 *
910 * Fill the hole up to the truncation point
911 */
912 off = len % LOBLKSIZE;
913 if (off > 0)
914 MemSet(workb, 0, off);
915
916 /* compute length of new page */
917 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
918
919 /*
920 * Form and insert new tuple
921 */
922 memset(values, 0, sizeof(values));
923 memset(nulls, false, sizeof(nulls));
924 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
925 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
926 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
927 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
928 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
929 heap_freetuple(newtup);
930 }
931
932 /*
933 * Delete any pages after the truncation point. If the initial search
934 * didn't find a page, then of course there's nothing more to do.
935 */
936 if (olddata != NULL)
937 {
938 while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
939 {
940 CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
941 }
942 }
943
944 systable_endscan_ordered(sd);
945
946 CatalogCloseIndexes(indstate);
947
948 /*
949 * Advance command counter so that tuple updates will be seen by later
950 * large-object operations in this transaction.
951 */
952 CommandCounterIncrement();
953 }
954