1 /*-------------------------------------------------------------------------
2  *
3  * hashinsert.c
4  *	  Item insertion in hash tables for Postgres.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/hash/hashinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/hash.h"
19 #include "access/hash_xlog.h"
20 #include "miscadmin.h"
21 #include "storage/buf_internals.h"
22 #include "storage/lwlock.h"
23 #include "storage/predicate.h"
24 #include "utils/rel.h"
25 
26 static void _hash_vacuum_one_page(Relation rel, Relation hrel,
27 								  Buffer metabuf, Buffer buf);
28 
29 /*
30  *	_hash_doinsert() -- Handle insertion of a single index tuple.
31  *
32  *		This routine is called by the public interface routines, hashbuild
33  *		and hashinsert.  By here, itup is completely filled in.
34  */
35 void
_hash_doinsert(Relation rel,IndexTuple itup,Relation heapRel)36 _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
37 {
38 	Buffer		buf = InvalidBuffer;
39 	Buffer		bucket_buf;
40 	Buffer		metabuf;
41 	HashMetaPage metap;
42 	HashMetaPage usedmetap = NULL;
43 	Page		metapage;
44 	Page		page;
45 	HashPageOpaque pageopaque;
46 	Size		itemsz;
47 	bool		do_expand;
48 	uint32		hashkey;
49 	Bucket		bucket;
50 	OffsetNumber itup_off;
51 
52 	/*
53 	 * Get the hash key for the item (it's stored in the index tuple itself).
54 	 */
55 	hashkey = _hash_get_indextuple_hashkey(itup);
56 
57 	/* compute item size too */
58 	itemsz = IndexTupleSize(itup);
59 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
60 								 * need to be consistent */
61 
62 restart_insert:
63 
64 	/*
65 	 * Read the metapage.  We don't lock it yet; HashMaxItemSize() will
66 	 * examine pd_pagesize_version, but that can't change so we can examine it
67 	 * without a lock.
68 	 */
69 	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
70 	metapage = BufferGetPage(metabuf);
71 
72 	/*
73 	 * Check whether the item can fit on a hash page at all. (Eventually, we
74 	 * ought to try to apply TOAST methods if not.)  Note that at this point,
75 	 * itemsz doesn't include the ItemId.
76 	 *
77 	 * XXX this is useless code if we are only storing hash keys.
78 	 */
79 	if (itemsz > HashMaxItemSize(metapage))
80 		ereport(ERROR,
81 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
82 				 errmsg("index row size %zu exceeds hash maximum %zu",
83 						itemsz, HashMaxItemSize(metapage)),
84 				 errhint("Values larger than a buffer page cannot be indexed.")));
85 
86 	/* Lock the primary bucket page for the target bucket. */
87 	buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
88 										  &usedmetap);
89 	Assert(usedmetap != NULL);
90 
91 	CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(buf));
92 
93 	/* remember the primary bucket buffer to release the pin on it at end. */
94 	bucket_buf = buf;
95 
96 	page = BufferGetPage(buf);
97 	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
98 	bucket = pageopaque->hasho_bucket;
99 
100 	/*
101 	 * If this bucket is in the process of being split, try to finish the
102 	 * split before inserting, because that might create room for the
103 	 * insertion to proceed without allocating an additional overflow page.
104 	 * It's only interesting to finish the split if we're trying to insert
105 	 * into the bucket from which we're removing tuples (the "old" bucket),
106 	 * not if we're trying to insert into the bucket into which tuples are
107 	 * being moved (the "new" bucket).
108 	 */
109 	if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
110 	{
111 		/* release the lock on bucket buffer, before completing the split. */
112 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
113 
114 		_hash_finish_split(rel, metabuf, buf, bucket,
115 						   usedmetap->hashm_maxbucket,
116 						   usedmetap->hashm_highmask,
117 						   usedmetap->hashm_lowmask);
118 
119 		/* release the pin on old and meta buffer.  retry for insert. */
120 		_hash_dropbuf(rel, buf);
121 		_hash_dropbuf(rel, metabuf);
122 		goto restart_insert;
123 	}
124 
125 	/* Do the insertion */
126 	while (PageGetFreeSpace(page) < itemsz)
127 	{
128 		BlockNumber nextblkno;
129 
130 		/*
131 		 * Check if current page has any DEAD tuples. If yes, delete these
132 		 * tuples and see if we can get a space for the new item to be
133 		 * inserted before moving to the next page in the bucket chain.
134 		 */
135 		if (H_HAS_DEAD_TUPLES(pageopaque))
136 		{
137 
138 			if (IsBufferCleanupOK(buf))
139 			{
140 				_hash_vacuum_one_page(rel, heapRel, metabuf, buf);
141 
142 				if (PageGetFreeSpace(page) >= itemsz)
143 					break;		/* OK, now we have enough space */
144 			}
145 		}
146 
147 		/*
148 		 * no space on this page; check for an overflow page
149 		 */
150 		nextblkno = pageopaque->hasho_nextblkno;
151 
152 		if (BlockNumberIsValid(nextblkno))
153 		{
154 			/*
155 			 * ovfl page exists; go get it.  if it doesn't have room, we'll
156 			 * find out next pass through the loop test above.  we always
157 			 * release both the lock and pin if this is an overflow page, but
158 			 * only the lock if this is the primary bucket page, since the pin
159 			 * on the primary bucket must be retained throughout the scan.
160 			 */
161 			if (buf != bucket_buf)
162 				_hash_relbuf(rel, buf);
163 			else
164 				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
165 			buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
166 			page = BufferGetPage(buf);
167 		}
168 		else
169 		{
170 			/*
171 			 * we're at the end of the bucket chain and we haven't found a
172 			 * page with enough room.  allocate a new overflow page.
173 			 */
174 
175 			/* release our write lock without modifying buffer */
176 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
177 
178 			/* chain to a new overflow page */
179 			buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf) ? true : false);
180 			page = BufferGetPage(buf);
181 
182 			/* should fit now, given test above */
183 			Assert(PageGetFreeSpace(page) >= itemsz);
184 		}
185 		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
186 		Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
187 		Assert(pageopaque->hasho_bucket == bucket);
188 	}
189 
190 	/*
191 	 * Write-lock the metapage so we can increment the tuple count. After
192 	 * incrementing it, check to see if it's time for a split.
193 	 */
194 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
195 
196 	/* Do the update.  No ereport(ERROR) until changes are logged */
197 	START_CRIT_SECTION();
198 
199 	/* found page with enough space, so add the item here */
200 	itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
201 	MarkBufferDirty(buf);
202 
203 	/* metapage operations */
204 	metap = HashPageGetMeta(metapage);
205 	metap->hashm_ntuples += 1;
206 
207 	/* Make sure this stays in sync with _hash_expandtable() */
208 	do_expand = metap->hashm_ntuples >
209 		(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
210 
211 	MarkBufferDirty(metabuf);
212 
213 	/* XLOG stuff */
214 	if (RelationNeedsWAL(rel))
215 	{
216 		xl_hash_insert xlrec;
217 		XLogRecPtr	recptr;
218 
219 		xlrec.offnum = itup_off;
220 
221 		XLogBeginInsert();
222 		XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
223 
224 		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
225 
226 		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
227 		XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
228 
229 		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
230 
231 		PageSetLSN(BufferGetPage(buf), recptr);
232 		PageSetLSN(BufferGetPage(metabuf), recptr);
233 	}
234 
235 	END_CRIT_SECTION();
236 
237 	/* drop lock on metapage, but keep pin */
238 	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
239 
240 	/*
241 	 * Release the modified page and ensure to release the pin on primary
242 	 * page.
243 	 */
244 	_hash_relbuf(rel, buf);
245 	if (buf != bucket_buf)
246 		_hash_dropbuf(rel, bucket_buf);
247 
248 	/* Attempt to split if a split is needed */
249 	if (do_expand)
250 		_hash_expandtable(rel, metabuf);
251 
252 	/* Finally drop our pin on the metapage */
253 	_hash_dropbuf(rel, metabuf);
254 }
255 
256 /*
257  *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
258  *
259  * This routine adds the tuple to the page as requested; it does not write out
260  * the page.  It is an error to call this function without pin and write lock
261  * on the target buffer.
262  *
263  * Returns the offset number at which the tuple was inserted.  This function
264  * is responsible for preserving the condition that tuples in a hash index
265  * page are sorted by hashkey value.
266  */
267 OffsetNumber
_hash_pgaddtup(Relation rel,Buffer buf,Size itemsize,IndexTuple itup)268 _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
269 {
270 	OffsetNumber itup_off;
271 	Page		page;
272 	uint32		hashkey;
273 
274 	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
275 	page = BufferGetPage(buf);
276 
277 	/* Find where to insert the tuple (preserving page's hashkey ordering) */
278 	hashkey = _hash_get_indextuple_hashkey(itup);
279 	itup_off = _hash_binsearch(page, hashkey);
280 
281 	if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
282 		== InvalidOffsetNumber)
283 		elog(ERROR, "failed to add index item to \"%s\"",
284 			 RelationGetRelationName(rel));
285 
286 	return itup_off;
287 }
288 
289 /*
290  *	_hash_pgaddmultitup() -- add a tuple vector to a particular page in the
291  *							 index.
292  *
293  * This routine has same requirements for locking and tuple ordering as
294  * _hash_pgaddtup().
295  *
296  * Returns the offset number array at which the tuples were inserted.
297  */
298 void
_hash_pgaddmultitup(Relation rel,Buffer buf,IndexTuple * itups,OffsetNumber * itup_offsets,uint16 nitups)299 _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
300 					OffsetNumber *itup_offsets, uint16 nitups)
301 {
302 	OffsetNumber itup_off;
303 	Page		page;
304 	uint32		hashkey;
305 	int			i;
306 
307 	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
308 	page = BufferGetPage(buf);
309 
310 	for (i = 0; i < nitups; i++)
311 	{
312 		Size		itemsize;
313 
314 		itemsize = IndexTupleSize(itups[i]);
315 		itemsize = MAXALIGN(itemsize);
316 
317 		/* Find where to insert the tuple (preserving page's hashkey ordering) */
318 		hashkey = _hash_get_indextuple_hashkey(itups[i]);
319 		itup_off = _hash_binsearch(page, hashkey);
320 
321 		itup_offsets[i] = itup_off;
322 
323 		if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
324 			== InvalidOffsetNumber)
325 			elog(ERROR, "failed to add index item to \"%s\"",
326 				 RelationGetRelationName(rel));
327 	}
328 }
329 
330 /*
331  * _hash_vacuum_one_page - vacuum just one index page.
332  *
333  * Try to remove LP_DEAD items from the given page. We must acquire cleanup
334  * lock on the page being modified before calling this function.
335  */
336 
337 static void
_hash_vacuum_one_page(Relation rel,Relation hrel,Buffer metabuf,Buffer buf)338 _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
339 {
340 	OffsetNumber deletable[MaxOffsetNumber];
341 	int			ndeletable = 0;
342 	OffsetNumber offnum,
343 				maxoff;
344 	Page		page = BufferGetPage(buf);
345 	HashPageOpaque pageopaque;
346 	HashMetaPage metap;
347 
348 	/* Scan each tuple in page to see if it is marked as LP_DEAD */
349 	maxoff = PageGetMaxOffsetNumber(page);
350 	for (offnum = FirstOffsetNumber;
351 		 offnum <= maxoff;
352 		 offnum = OffsetNumberNext(offnum))
353 	{
354 		ItemId		itemId = PageGetItemId(page, offnum);
355 
356 		if (ItemIdIsDead(itemId))
357 			deletable[ndeletable++] = offnum;
358 	}
359 
360 	if (ndeletable > 0)
361 	{
362 		TransactionId latestRemovedXid;
363 
364 		latestRemovedXid =
365 			index_compute_xid_horizon_for_tuples(rel, hrel, buf,
366 												 deletable, ndeletable);
367 
368 		/*
369 		 * Write-lock the meta page so that we can decrement tuple count.
370 		 */
371 		LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
372 
373 		/* No ereport(ERROR) until changes are logged */
374 		START_CRIT_SECTION();
375 
376 		PageIndexMultiDelete(page, deletable, ndeletable);
377 
378 		/*
379 		 * Mark the page as not containing any LP_DEAD items. This is not
380 		 * certainly true (there might be some that have recently been marked,
381 		 * but weren't included in our target-item list), but it will almost
382 		 * always be true and it doesn't seem worth an additional page scan to
383 		 * check it. Remember that LH_PAGE_HAS_DEAD_TUPLES is only a hint
384 		 * anyway.
385 		 */
386 		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
387 		pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
388 
389 		metap = HashPageGetMeta(BufferGetPage(metabuf));
390 		metap->hashm_ntuples -= ndeletable;
391 
392 		MarkBufferDirty(buf);
393 		MarkBufferDirty(metabuf);
394 
395 		/* XLOG stuff */
396 		if (RelationNeedsWAL(rel))
397 		{
398 			xl_hash_vacuum_one_page xlrec;
399 			XLogRecPtr	recptr;
400 
401 			xlrec.latestRemovedXid = latestRemovedXid;
402 			xlrec.ntuples = ndeletable;
403 
404 			XLogBeginInsert();
405 			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
406 			XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
407 
408 			/*
409 			 * We need the target-offsets array whether or not we store the
410 			 * whole buffer, to allow us to find the latestRemovedXid on a
411 			 * standby server.
412 			 */
413 			XLogRegisterData((char *) deletable,
414 							 ndeletable * sizeof(OffsetNumber));
415 
416 			XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
417 
418 			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
419 
420 			PageSetLSN(BufferGetPage(buf), recptr);
421 			PageSetLSN(BufferGetPage(metabuf), recptr);
422 		}
423 
424 		END_CRIT_SECTION();
425 
426 		/*
427 		 * Releasing write lock on meta page as we have updated the tuple
428 		 * count.
429 		 */
430 		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
431 	}
432 }
433