1 /*-------------------------------------------------------------------------
2 *
3 * inv_api.c
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
6 *
7 *
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 *
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
20 *
21 *
22 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
24 *
25 *
26 * IDENTIFICATION
27 * src/backend/storage/large_object/inv_api.c
28 *
29 *-------------------------------------------------------------------------
30 */
31 #include "postgres.h"
32
33 #include <limits.h>
34
35 #include "access/genam.h"
36 #include "access/heapam.h"
37 #include "access/sysattr.h"
38 #include "access/tuptoaster.h"
39 #include "access/xact.h"
40 #include "catalog/dependency.h"
41 #include "catalog/indexing.h"
42 #include "catalog/objectaccess.h"
43 #include "catalog/pg_largeobject.h"
44 #include "catalog/pg_largeobject_metadata.h"
45 #include "libpq/libpq-fs.h"
46 #include "miscadmin.h"
47 #include "storage/large_object.h"
48 #include "utils/fmgroids.h"
49 #include "utils/rel.h"
50 #include "utils/snapmgr.h"
51 #include "utils/tqual.h"
52
53
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference apiece, so that we need to open each of them only once per
 * transaction.  To avoid problems when the first such reference occurs inside
 * a subtransaction, we execute a slightly klugy maneuver to assign ownership
 * of the Relation references to TopTransactionResourceOwner.
 */
61 static Relation lo_heap_r = NULL;
62 static Relation lo_index_r = NULL;
63
64
/*
 * Open pg_largeobject and its index, if not already done in current xact.
 *
 * The Relation pointers are cached in the file-level statics lo_heap_r and
 * lo_index_r; close_lo_relation() resets them at main transaction end.
 */
static void
open_lo_relation(void)
{
	ResourceOwner currentOwner;

	if (lo_heap_r && lo_index_r)
		return;					/* already open in current xact */

	/*
	 * Arrange for the top xact to own these relation references, so that
	 * they remain valid across subtransaction boundaries and are released
	 * only at main transaction end.
	 */
	currentOwner = CurrentResourceOwner;
	PG_TRY();
	{
		CurrentResourceOwner = TopTransactionResourceOwner;

		/* Use RowExclusiveLock since we might either read or write */
		if (lo_heap_r == NULL)
			lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
		if (lo_index_r == NULL)
			lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
	}
	PG_CATCH();
	{
		/* Ensure CurrentResourceOwner is restored on error */
		CurrentResourceOwner = currentOwner;
		PG_RE_THROW();
	}
	PG_END_TRY();
	CurrentResourceOwner = currentOwner;
}
97
/*
 * Clean up at main transaction end.
 *
 * isCommit: true when the transaction is committing.  At commit we must
 * close the relation references ourselves (they are owned by the top
 * transaction's resource owner); at abort, resource-owner cleanup will
 * release them, so we only reset the cached pointers.
 */
void
close_lo_relation(bool isCommit)
{
	if (lo_heap_r || lo_index_r)
	{
		/*
		 * Only bother to close if committing; else abort cleanup will handle
		 * it
		 */
		if (isCommit)
		{
			ResourceOwner currentOwner;

			currentOwner = CurrentResourceOwner;
			PG_TRY();
			{
				CurrentResourceOwner = TopTransactionResourceOwner;

				/* NoLock: locks are held until transaction end anyway */
				if (lo_index_r)
					index_close(lo_index_r, NoLock);
				if (lo_heap_r)
					heap_close(lo_heap_r, NoLock);
			}
			PG_CATCH();
			{
				/* Ensure CurrentResourceOwner is restored on error */
				CurrentResourceOwner = currentOwner;
				PG_RE_THROW();
			}
			PG_END_TRY();
			CurrentResourceOwner = currentOwner;
		}
		lo_heap_r = NULL;
		lo_index_r = NULL;
	}
}
137
138
139 /*
140 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
141 * read with can be specified.
142 */
143 static bool
myLargeObjectExists(Oid loid,Snapshot snapshot)144 myLargeObjectExists(Oid loid, Snapshot snapshot)
145 {
146 Relation pg_lo_meta;
147 ScanKeyData skey[1];
148 SysScanDesc sd;
149 HeapTuple tuple;
150 bool retval = false;
151
152 ScanKeyInit(&skey[0],
153 ObjectIdAttributeNumber,
154 BTEqualStrategyNumber, F_OIDEQ,
155 ObjectIdGetDatum(loid));
156
157 pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
158 AccessShareLock);
159
160 sd = systable_beginscan(pg_lo_meta,
161 LargeObjectMetadataOidIndexId, true,
162 snapshot, 1, skey);
163
164 tuple = systable_getnext(sd);
165 if (HeapTupleIsValid(tuple))
166 retval = true;
167
168 systable_endscan(sd);
169
170 heap_close(pg_lo_meta, AccessShareLock);
171
172 return retval;
173 }
174
175
176 /*
177 * Extract data field from a pg_largeobject tuple, detoasting if needed
178 * and verifying that the length is sane. Returns data pointer (a bytea *),
179 * data length, and an indication of whether to pfree the data pointer.
180 */
181 static void
getdatafield(Form_pg_largeobject tuple,bytea ** pdatafield,int * plen,bool * pfreeit)182 getdatafield(Form_pg_largeobject tuple,
183 bytea **pdatafield,
184 int *plen,
185 bool *pfreeit)
186 {
187 bytea *datafield;
188 int len;
189 bool freeit;
190
191 datafield = &(tuple->data); /* see note at top of file */
192 freeit = false;
193 if (VARATT_IS_EXTENDED(datafield))
194 {
195 datafield = (bytea *)
196 heap_tuple_untoast_attr((struct varlena *) datafield);
197 freeit = true;
198 }
199 len = VARSIZE(datafield) - VARHDRSZ;
200 if (len < 0 || len > LOBLKSIZE)
201 ereport(ERROR,
202 (errcode(ERRCODE_DATA_CORRUPTED),
203 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
204 tuple->loid, tuple->pageno, len)));
205 *pdatafield = datafield;
206 *plen = len;
207 *pfreeit = freeit;
208 }
209
210
211 /*
212 * inv_create -- create a new large object
213 *
214 * Arguments:
215 * lobjId - OID to use for new large object, or InvalidOid to pick one
216 *
217 * Returns:
218 * OID of new object
219 *
220 * If lobjId is not InvalidOid, then an error occurs if the OID is already
221 * in use.
222 */
223 Oid
inv_create(Oid lobjId)224 inv_create(Oid lobjId)
225 {
226 Oid lobjId_new;
227
228 /*
229 * Create a new largeobject with empty data pages
230 */
231 lobjId_new = LargeObjectCreate(lobjId);
232
233 /*
234 * dependency on the owner of largeobject
235 *
236 * The reason why we use LargeObjectRelationId instead of
237 * LargeObjectMetadataRelationId here is to provide backward compatibility
238 * to the applications which utilize a knowledge about internal layout of
239 * system catalogs. OID of pg_largeobject_metadata and loid of
240 * pg_largeobject are same value, so there are no actual differences here.
241 */
242 recordDependencyOnOwner(LargeObjectRelationId,
243 lobjId_new, GetUserId());
244
245 /* Post creation hook for new large object */
246 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
247
248 /*
249 * Advance command counter to make new tuple visible to later operations.
250 */
251 CommandCounterIncrement();
252
253 return lobjId_new;
254 }
255
256 /*
257 * inv_open -- access an existing large object.
258 *
259 * Returns a large object descriptor, appropriately filled in.
260 * The descriptor and subsidiary data are allocated in the specified
261 * memory context, which must be suitably long-lived for the caller's
262 * purposes. If the returned descriptor has a snapshot associated
263 * with it, the caller must ensure that it also lives long enough,
264 * e.g. by calling RegisterSnapshotOnOwner
265 */
266 LargeObjectDesc *
inv_open(Oid lobjId,int flags,MemoryContext mcxt)267 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
268 {
269 LargeObjectDesc *retval;
270 Snapshot snapshot = NULL;
271 int descflags = 0;
272
273 if (flags & INV_WRITE)
274 {
275 snapshot = NULL; /* instantaneous MVCC snapshot */
276 descflags = IFS_WRLOCK | IFS_RDLOCK;
277 }
278 else if (flags & INV_READ)
279 {
280 snapshot = GetActiveSnapshot();
281 descflags = IFS_RDLOCK;
282 }
283 else
284 ereport(ERROR,
285 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
286 errmsg("invalid flags for opening a large object: %d",
287 flags)));
288
289 /* Can't use LargeObjectExists here because we need to specify snapshot */
290 if (!myLargeObjectExists(lobjId, snapshot))
291 ereport(ERROR,
292 (errcode(ERRCODE_UNDEFINED_OBJECT),
293 errmsg("large object %u does not exist", lobjId)));
294
295 /* OK to create a descriptor */
296 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
297 sizeof(LargeObjectDesc));
298 retval->id = lobjId;
299 retval->offset = 0;
300 retval->flags = descflags;
301
302 /* caller sets if needed, not used by the functions in this file */
303 retval->subid = InvalidSubTransactionId;
304
305 /*
306 * The snapshot (if any) is just the currently active snapshot. The
307 * caller will replace it with a longer-lived copy if needed.
308 */
309 retval->snapshot = snapshot;
310 retval->flags = descflags;
311
312 return retval;
313 }
314
315 /*
316 * Closes a large object descriptor previously made by inv_open(), and
317 * releases the long-term memory used by it.
318 */
319 void
inv_close(LargeObjectDesc * obj_desc)320 inv_close(LargeObjectDesc *obj_desc)
321 {
322 Assert(PointerIsValid(obj_desc));
323 pfree(obj_desc);
324 }
325
326 /*
327 * Destroys an existing large object (not to be confused with a descriptor!)
328 *
329 * returns -1 if failed
330 */
331 int
inv_drop(Oid lobjId)332 inv_drop(Oid lobjId)
333 {
334 ObjectAddress object;
335
336 /*
337 * Delete any comments and dependencies on the large object
338 */
339 object.classId = LargeObjectRelationId;
340 object.objectId = lobjId;
341 object.objectSubId = 0;
342 performDeletion(&object, DROP_CASCADE, 0);
343
344 /*
345 * Advance command counter so that tuple removal will be seen by later
346 * large-object operations in this transaction.
347 */
348 CommandCounterIncrement();
349
350 return 1;
351 }
352
353 /*
354 * Determine size of a large object
355 *
356 * NOTE: LOs can contain gaps, just like Unix files. We actually return
357 * the offset of the last byte + 1.
358 */
359 static uint64
inv_getsize(LargeObjectDesc * obj_desc)360 inv_getsize(LargeObjectDesc *obj_desc)
361 {
362 uint64 lastbyte = 0;
363 ScanKeyData skey[1];
364 SysScanDesc sd;
365 HeapTuple tuple;
366
367 Assert(PointerIsValid(obj_desc));
368
369 open_lo_relation();
370
371 ScanKeyInit(&skey[0],
372 Anum_pg_largeobject_loid,
373 BTEqualStrategyNumber, F_OIDEQ,
374 ObjectIdGetDatum(obj_desc->id));
375
376 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
377 obj_desc->snapshot, 1, skey);
378
379 /*
380 * Because the pg_largeobject index is on both loid and pageno, but we
381 * constrain only loid, a backwards scan should visit all pages of the
382 * large object in reverse pageno order. So, it's sufficient to examine
383 * the first valid tuple (== last valid page).
384 */
385 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
386 if (HeapTupleIsValid(tuple))
387 {
388 Form_pg_largeobject data;
389 bytea *datafield;
390 int len;
391 bool pfreeit;
392
393 if (HeapTupleHasNulls(tuple)) /* paranoia */
394 elog(ERROR, "null field found in pg_largeobject");
395 data = (Form_pg_largeobject) GETSTRUCT(tuple);
396 getdatafield(data, &datafield, &len, &pfreeit);
397 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
398 if (pfreeit)
399 pfree(datafield);
400 }
401
402 systable_endscan_ordered(sd);
403
404 return lastbyte;
405 }
406
407 int64
inv_seek(LargeObjectDesc * obj_desc,int64 offset,int whence)408 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
409 {
410 int64 newoffset;
411
412 Assert(PointerIsValid(obj_desc));
413
414 /*
415 * Note: overflow in the additions is possible, but since we will reject
416 * negative results, we don't need any extra test for that.
417 */
418 switch (whence)
419 {
420 case SEEK_SET:
421 newoffset = offset;
422 break;
423 case SEEK_CUR:
424 newoffset = obj_desc->offset + offset;
425 break;
426 case SEEK_END:
427 newoffset = inv_getsize(obj_desc) + offset;
428 break;
429 default:
430 ereport(ERROR,
431 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 errmsg("invalid whence setting: %d", whence)));
433 newoffset = 0; /* keep compiler quiet */
434 break;
435 }
436
437 /*
438 * use errmsg_internal here because we don't want to expose INT64_FORMAT
439 * in translatable strings; doing better is not worth the trouble
440 */
441 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
442 ereport(ERROR,
443 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
444 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
445 newoffset)));
446
447 obj_desc->offset = newoffset;
448 return newoffset;
449 }
450
451 int64
inv_tell(LargeObjectDesc * obj_desc)452 inv_tell(LargeObjectDesc *obj_desc)
453 {
454 Assert(PointerIsValid(obj_desc));
455
456 return obj_desc->offset;
457 }
458
/*
 * inv_read -- read up to nbytes bytes into buf, starting at the
 * descriptor's current offset.  Missing pages ("holes") read back as
 * zeroes.  Advances obj_desc->offset by the amount read and returns the
 * number of bytes actually delivered (less than nbytes at end of object).
 */
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
	int			nread = 0;		/* bytes placed into buf so far */
	int64		n;				/* bytes to take in the current step */
	int64		off;			/* read position within current page */
	int			len;			/* valid data length of current page */
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	uint64		pageoff;		/* byte offset of current page's start */
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	if (nbytes <= 0)
		return 0;

	open_lo_relation();

	/* Scan this LO's pages with pageno >= the page containing the offset */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		bool		pfreeit;

		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);

		/*
		 * We expect the indexscan will deliver pages in order.  However,
		 * there may be missing pages if the LO contains unwritten "holes". We
		 * want missing sections to read out as zeroes.
		 */
		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
		if (pageoff > obj_desc->offset)
		{
			/* zero-fill the gap, but never beyond the requested nbytes */
			n = pageoff - obj_desc->offset;
			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
			MemSet(buf + nread, 0, n);
			nread += n;
			obj_desc->offset += n;
		}

		if (nread < nbytes)
		{
			/* copy the portion of this page starting at the current offset */
			Assert(obj_desc->offset >= pageoff);
			off = (int) (obj_desc->offset - pageoff);
			Assert(off >= 0 && off < LOBLKSIZE);

			getdatafield(data, &datafield, &len, &pfreeit);
			if (len > off)
			{
				n = len - off;
				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
				memcpy(buf + nread, VARDATA(datafield) + off, n);
				nread += n;
				obj_desc->offset += n;
			}
			if (pfreeit)
				pfree(datafield);
		}

		if (nread >= nbytes)
			break;
	}

	systable_endscan_ordered(sd);

	return nread;
}
545
/*
 * inv_write -- write nbytes bytes from buf into the large object at the
 * descriptor's current offset, updating existing pages in place and
 * inserting new pages where none exist.  Advances obj_desc->offset and
 * returns the number of bytes written (nbytes, unless an error is raised).
 */
int
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
{
	int			nwritten = 0;	/* bytes consumed from buf so far */
	int			n;				/* bytes going into the current page */
	int			off;			/* write position within current page */
	int			len;			/* valid data length of current page */
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	bool		neednextpage;
	bytea	   *datafield;
	bool		pfreeit;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	/* enforce writability because snapshot is probably wrong otherwise */
	Assert(obj_desc->flags & IFS_WRLOCK);

	if (nbytes <= 0)
		return 0;

	/* this addition can't overflow because nbytes is only int32 */
	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid large object write request size: %d",
						nbytes)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/* Scan this LO's pages with pageno >= the starting page, in order */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	oldtuple = NULL;
	olddata = NULL;
	neednextpage = true;

	while (nwritten < nbytes)
	{
		/*
		 * If possible, get next pre-existing page of the LO.  We expect the
		 * indexscan will deliver these in order --- but there may be holes.
		 */
		if (neednextpage)
		{
			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
			{
				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
					elog(ERROR, "null field found in pg_largeobject");
				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
				Assert(olddata->pageno >= pageno);
			}
			neednextpage = false;
		}

		/*
		 * If we have a pre-existing page, see if it is the page we want to
		 * write, or a later one.
		 */
		if (olddata != NULL && olddata->pageno == pageno)
		{
			/*
			 * Update an existing page with fresh data.
			 *
			 * First, load old data into workbuf
			 */
			getdatafield(olddata, &datafield, &len, &pfreeit);
			memcpy(workb, VARDATA(datafield), len);
			if (pfreeit)
				pfree(datafield);

			/*
			 * Fill any hole (the page's valid data may end before the
			 * write position within the page)
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > len)
				MemSet(workb + len, 0, off - len);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			off += n;
			/* compute valid length of new page */
			len = (len >= off) ? len : off;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple (only the data column changes)
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			memset(replace, false, sizeof(replace));
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			replace[Anum_pg_largeobject_data - 1] = true;
			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
									   values, nulls, replace);
			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
									   indstate);
			heap_freetuple(newtup);

			/*
			 * We're done with this old page.
			 */
			oldtuple = NULL;
			olddata = NULL;
			neednextpage = true;
		}
		else
		{
			/*
			 * Write a brand new page (either we are in a hole, or past the
			 * last existing page).
			 *
			 * First, fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > 0)
				MemSet(workb, 0, off);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			/* compute valid length of new page */
			len = off + n;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
			heap_freetuple(newtup);
		}
		pageno++;
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that my tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return nwritten;
}
738
/*
 * inv_truncate -- truncate the large object to exactly len bytes.
 *
 * The page containing the truncation point is shortened in place (or
 * created, if the point lies in a hole or beyond current EOF), and all
 * later pages are deleted.
 */
void
inv_truncate(LargeObjectDesc *obj_desc, int64 len)
{
	int32		pageno = (int32) (len / LOBLKSIZE);
	int32		off;			/* truncation point within its page */
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));

	/* enforce writability because snapshot is probably wrong otherwise */
	Assert(obj_desc->flags & IFS_WRLOCK);

	/*
	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
	 * in translatable strings; doing better is not worth the trouble
	 */
	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
								 len)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/*
	 * Set up to find all pages with desired loid and pageno >= target
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	/*
	 * If possible, get the page the truncation point is in. The truncation
	 * point may be beyond the end of the LO or in a hole.
	 */
	olddata = NULL;
	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
		Assert(olddata->pageno >= pageno);
	}

	/*
	 * If we found the page of the truncation point we need to truncate the
	 * data in it.  Otherwise if we're in a hole, we need to create a page to
	 * mark the end of data.
	 */
	if (olddata != NULL && olddata->pageno == pageno)
	{
		/* First, load old data into workbuf */
		bytea	   *datafield;
		int			pagelen;	/* valid data length of the old page */
		bool		pfreeit;

		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
		memcpy(workb, VARDATA(datafield), pagelen);
		if (pfreeit)
			pfree(datafield);

		/*
		 * Fill any hole (old data may end before the truncation point)
		 */
		off = len % LOBLKSIZE;
		if (off > pagelen)
			MemSet(workb + pagelen, 0, off - pagelen);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert updated tuple (only the data column changes)
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replace, false, sizeof(replace));
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		replace[Anum_pg_largeobject_data - 1] = true;
		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
								   values, nulls, replace);
		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
								   indstate);
		heap_freetuple(newtup);
	}
	else
	{
		/*
		 * If the first page we found was after the truncation point, we're in
		 * a hole that we'll fill, but we need to delete the later page
		 * because the loop below won't visit it again.
		 */
		if (olddata != NULL)
		{
			Assert(olddata->pageno > pageno);
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}

		/*
		 * Write a brand new page.
		 *
		 * Fill the hole up to the truncation point
		 */
		off = len % LOBLKSIZE;
		if (off > 0)
			MemSet(workb, 0, off);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert new tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
		heap_freetuple(newtup);
	}

	/*
	 * Delete any pages after the truncation point.  If the initial search
	 * didn't find a page, then of course there's nothing more to do.
	 */
	if (olddata != NULL)
	{
		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
		{
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();
}
912