/*-------------------------------------------------------------------------
 *
 * hashinsert.c
 *	  Item insertion in hash tables for Postgres.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hashinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/heapam.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "storage/lwlock.h"
#include "storage/buf_internals.h"

static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
					  RelFileNode hnode);

/*
 *	_hash_doinsert() -- Handle insertion of a single index tuple.
 *
 *		This routine is called by the public interface routines, hashbuild
 *		and hashinsert.  By here, itup is completely filled in.
 */
void
_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
{
	Buffer		buf = InvalidBuffer;
	Buffer		bucket_buf;
	Buffer		metabuf;
	HashMetaPage metap;
	HashMetaPage usedmetap = NULL;
	Page		metapage;
	Page		page;
	HashPageOpaque pageopaque;
	Size		itemsz;
	bool		do_expand;
	uint32		hashkey;
	Bucket		bucket;
	OffsetNumber itup_off;

	/*
	 * Get the hash key for the item (it's stored in the index tuple itself).
	 */
	hashkey = _hash_get_indextuple_hashkey(itup);

	/* compute item size too */
	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */
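
	/*
	 * For illustration: MAXALIGN rounds the size up to the platform's
	 * maximum alignment boundary (typically 8 bytes on 64-bit builds), so a
	 * 20-byte index tuple is counted as 24 bytes here, matching the padding
	 * PageAddItem will apply when the tuple is actually stored.
	 */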

restart_insert:

	/*
	 * Read the metapage.  We don't lock it yet; HashMaxItemSize() will
	 * examine pd_pagesize_version, but that can't change so we can examine it
	 * without a lock.
	 */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
	metapage = BufferGetPage(metabuf);

	/*
	 * Check whether the item can fit on a hash page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.)  Note that at this point,
	 * itemsz doesn't include the ItemId.
	 *
	 * XXX this is useless code if we are only storing hash keys.
	 */
	if (itemsz > HashMaxItemSize(metapage))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds hash maximum %zu",
						itemsz, HashMaxItemSize(metapage)),
				 errhint("Values larger than a buffer page cannot be indexed.")));

	/* Lock the primary bucket page for the target bucket. */
	buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
										  &usedmetap);
	Assert(usedmetap != NULL);

	/* remember the primary bucket buffer to release the pin on it at end. */
	bucket_buf = buf;

	page = BufferGetPage(buf);
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
	bucket = pageopaque->hasho_bucket;

	/*
	 * If this bucket is in the process of being split, try to finish the
	 * split before inserting, because that might create room for the
	 * insertion to proceed without allocating an additional overflow page.
	 * It's only interesting to finish the split if we're trying to insert
	 * into the bucket from which we're removing tuples (the "old" bucket),
	 * not if we're trying to insert into the bucket into which tuples are
	 * being moved (the "new" bucket).
	 */
	if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
	{
		/* release the lock on bucket buffer, before completing the split. */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);

		_hash_finish_split(rel, metabuf, buf, bucket,
						   usedmetap->hashm_maxbucket,
						   usedmetap->hashm_highmask,
						   usedmetap->hashm_lowmask);

		/* release the pin on old and meta buffer.  retry for insert. */
		_hash_dropbuf(rel, buf);
		_hash_dropbuf(rel, metabuf);
		goto restart_insert;
	}

	/* Do the insertion */
	while (PageGetFreeSpace(page) < itemsz)
	{
		BlockNumber nextblkno;

		/*
		 * Check if the current page has any DEAD tuples.  If so, delete
		 * those tuples and see whether that frees enough space for the new
		 * item before moving on to the next page in the bucket chain.
		 */
		if (H_HAS_DEAD_TUPLES(pageopaque))
		{
			if (IsBufferCleanupOK(buf))
			{
				_hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);

				if (PageGetFreeSpace(page) >= itemsz)
					break;		/* OK, now we have enough space */
			}
		}

		/*
		 * no space on this page; check for an overflow page
		 */
		nextblkno = pageopaque->hasho_nextblkno;

		if (BlockNumberIsValid(nextblkno))
		{
			/*
			 * ovfl page exists; go get it.  if it doesn't have room, we'll
			 * find out next pass through the loop test above.  we always
			 * release both the lock and pin if this is an overflow page, but
			 * only the lock if this is the primary bucket page, since the pin
			 * on the primary bucket must be retained throughout the scan.
			 */
			if (buf != bucket_buf)
				_hash_relbuf(rel, buf);
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
			page = BufferGetPage(buf);
		}
		else
		{
			/*
			 * we're at the end of the bucket chain and we haven't found a
			 * page with enough room.  allocate a new overflow page.
			 */

			/* release our write lock without modifying buffer */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/* chain to a new overflow page */
			buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf));
			page = BufferGetPage(buf);

			/* should fit now, given test above */
			Assert(PageGetFreeSpace(page) >= itemsz);
		}
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
		Assert(pageopaque->hasho_bucket == bucket);
	}

	/*
	 * Write-lock the metapage so we can increment the tuple count. After
	 * incrementing it, check to see if it's time for a split.
	 */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* Do the update.  No ereport(ERROR) until changes are logged */
	START_CRIT_SECTION();

	/* found page with enough space, so add the item here */
	itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
	MarkBufferDirty(buf);

	/* metapage operations */
	metap = HashPageGetMeta(metapage);
	metap->hashm_ntuples += 1;

	/* Make sure this stays in sync with _hash_expandtable() */
	do_expand = metap->hashm_ntuples >
		(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
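
	/*
	 * For illustration (numbers assumed): hashm_ffactor is the target number
	 * of tuples per bucket, and hashm_maxbucket + 1 is the current number of
	 * buckets.  With hashm_ffactor = 10 and hashm_maxbucket = 3 (i.e. four
	 * buckets), a split would be requested once hashm_ntuples exceeds
	 * 10 * 4 = 40.
	 */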

	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_insert xlrec;
		XLogRecPtr	recptr;

		xlrec.offnum = itup_off;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashInsert);

		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
		XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);

		PageSetLSN(BufferGetPage(buf), recptr);
		PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	/* drop lock on metapage, but keep pin */
	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

	/*
	 * Release the modified page; if it is not the primary bucket page, also
	 * drop the pin we still hold on that page.
	 */
	_hash_relbuf(rel, buf);
	if (buf != bucket_buf)
		_hash_dropbuf(rel, bucket_buf);

	/* Attempt to split if a split is needed */
	if (do_expand)
		_hash_expandtable(rel, metabuf);

	/* Finally drop our pin on the metapage */
	_hash_dropbuf(rel, metabuf);
}

/*
 *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
 *
 * This routine adds the tuple to the page as requested; it does not write out
 * the page.  It is an error to call pgaddtup() without pin and write lock on
 * the target buffer.
 *
 * Returns the offset number at which the tuple was inserted.  This function
 * is responsible for preserving the condition that tuples in a hash index
 * page are sorted by hashkey value.
 */
OffsetNumber
_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
{
	OffsetNumber itup_off;
	Page		page;
	uint32		hashkey;

	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	page = BufferGetPage(buf);

	/* Find where to insert the tuple (preserving page's hashkey ordering) */
	hashkey = _hash_get_indextuple_hashkey(itup);
	itup_off = _hash_binsearch(page, hashkey);
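
	/*
	 * _hash_binsearch returns the first offset whose existing hash key is
	 * >= hashkey (or the page's max offset plus one if all keys are
	 * smaller), so inserting at itup_off keeps the page sorted in ascending
	 * hashkey order.
	 */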

	if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
		== InvalidOffsetNumber)
		elog(ERROR, "failed to add index item to \"%s\"",
			 RelationGetRelationName(rel));

	return itup_off;
}

/*
 *	_hash_pgaddmultitup() -- add a tuple vector to a particular page in the
 *							 index.
 *
 * This routine has the same requirements for locking and tuple ordering as
 * _hash_pgaddtup().
 *
 * The offset numbers at which the tuples were inserted are returned in the
 * caller-supplied itup_offsets array.
 */
void
_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
					OffsetNumber *itup_offsets, uint16 nitups)
{
	OffsetNumber itup_off;
	Page		page;
	uint32		hashkey;
	int			i;

	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	page = BufferGetPage(buf);

	for (i = 0; i < nitups; i++)
	{
		Size		itemsize;

		itemsize = IndexTupleDSize(*itups[i]);
		itemsize = MAXALIGN(itemsize);

		/* Find where to insert the tuple (preserving page's hashkey ordering) */
		hashkey = _hash_get_indextuple_hashkey(itups[i]);
		itup_off = _hash_binsearch(page, hashkey);

		itup_offsets[i] = itup_off;

		if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
			== InvalidOffsetNumber)
			elog(ERROR, "failed to add index item to \"%s\"",
				 RelationGetRelationName(rel));
	}
}

/*
 * _hash_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The caller must hold a
 * cleanup lock on the buffer containing the page before calling this
 * function.
 */

static void
_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
					  RelFileNode hnode)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				maxoff;
	Page		page = BufferGetPage(buf);
	HashPageOpaque pageopaque;
	HashMetaPage metap;

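	/*
	 * The LP_DEAD bits we look for here are hints set by earlier index scans
	 * that found the corresponding heap tuples to be dead; reclaiming such
	 * items can make room for the pending insert without allocating a new
	 * overflow page.
	 */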
	/* Scan each tuple in page to see if it is marked as LP_DEAD */
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = FirstOffsetNumber;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
	{
		/*
		 * Write-lock the meta page so that we can decrement tuple count.
		 */
		LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

		/* No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		PageIndexMultiDelete(page, deletable, ndeletable);

		/*
		 * Mark the page as not containing any LP_DEAD items.  This is not
		 * necessarily true (there might be some that have recently been
		 * marked, but weren't included in our target-item list), but it will
		 * almost always be true and it doesn't seem worth an additional page
		 * scan to check it.  Remember that LH_PAGE_HAS_DEAD_TUPLES is only a
		 * hint anyway.
		 */
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

		metap = HashPageGetMeta(BufferGetPage(metabuf));
		metap->hashm_ntuples -= ndeletable;

		MarkBufferDirty(buf);
		MarkBufferDirty(metabuf);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			xl_hash_vacuum_one_page xlrec;
			XLogRecPtr	recptr;

			xlrec.hnode = hnode;
			xlrec.ntuples = ndeletable;

			XLogBeginInsert();
			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
			XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);

			/*
			 * We need the target-offsets array whether or not we store the
			 * whole buffer, to allow us to find the latestRemovedXid on a
			 * standby server.
			 */
			XLogRegisterData((char *) deletable,
							 ndeletable * sizeof(OffsetNumber));

			XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);

			PageSetLSN(BufferGetPage(buf), recptr);
			PageSetLSN(BufferGetPage(metabuf), recptr);
		}

		END_CRIT_SECTION();

		/*
		 * Release the write lock on the meta page now that the tuple count
		 * has been updated.
		 */
		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
	}
}