1 /*-------------------------------------------------------------------------
2  *
3  * hashinsert.c
4  *	  Item insertion in hash tables for Postgres.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/hash/hashinsert.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/hash.h"
19 #include "utils/rel.h"
20 
21 
22 /*
23  *	_hash_doinsert() -- Handle insertion of a single index tuple.
24  *
25  *		This routine is called by the public interface routines, hashbuild
26  *		and hashinsert.  By here, itup is completely filled in.
27  */
void
_hash_doinsert(Relation rel, IndexTuple itup)
{
	Buffer		buf;			/* current bucket/overflow page being probed */
	Buffer		metabuf;		/* metapage buffer; pinned for whole routine */
	HashMetaPage metap;
	BlockNumber blkno;			/* block number of target primary bucket page */
	BlockNumber oldblkno = InvalidBlockNumber;	/* bucket locked on prior loop pass */
	bool		retry = false;	/* true once we've been through the lock loop */
	Page		metapage;
	Page		page;
	HashPageOpaque pageopaque;
	Size		itemsz;			/* MAXALIGN'd size of the new index tuple */
	bool		do_expand;		/* whether to attempt a bucket split afterwards */
	uint32		hashkey;		/* hash value stored in the new tuple */
	Bucket		bucket;			/* target bucket number for hashkey */

	/*
	 * Get the hash key for the item (it's stored in the index tuple itself).
	 */
	hashkey = _hash_get_indextuple_hashkey(itup);

	/* compute item size too */
	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/* Read the metapage */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
	metapage = BufferGetPage(metabuf);
	metap = HashPageGetMeta(metapage);

	/*
	 * Check whether the item can fit on a hash page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.)  Note that at this point,
	 * itemsz doesn't include the ItemId.
	 *
	 * XXX this is useless code if we are only storing hash keys.
	 */
	if (itemsz > HashMaxItemSize(metapage))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds hash maximum %zu",
						itemsz, HashMaxItemSize(metapage)),
			errhint("Values larger than a buffer page cannot be indexed.")));

	/*
	 * Loop until we get a lock on the correct target bucket.
	 *
	 * We must drop the metapage content lock before taking the heavyweight
	 * bucket lock (to avoid deadlock against splitters), but a concurrent
	 * split may then change which bucket our hashkey maps to; hence the
	 * recompute-and-retry loop below.
	 */
	for (;;)
	{
		/*
		 * Compute the target bucket number, and convert to block number.
		 */
		bucket = _hash_hashkey2bucket(hashkey,
									  metap->hashm_maxbucket,
									  metap->hashm_highmask,
									  metap->hashm_lowmask);

		blkno = BUCKET_TO_BLKNO(metap, bucket);

		/* Release metapage lock, but keep pin. */
		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);

		/*
		 * If the previous iteration of this loop locked what is still the
		 * correct target bucket, we are done.  Otherwise, drop any old lock
		 * and lock what now appears to be the correct bucket.
		 */
		if (retry)
		{
			if (oldblkno == blkno)
				break;
			_hash_droplock(rel, oldblkno, HASH_SHARE);
		}
		_hash_getlock(rel, blkno, HASH_SHARE);

		/*
		 * Reacquire metapage lock and check that no bucket split has taken
		 * place while we were awaiting the bucket lock.
		 */
		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
		oldblkno = blkno;
		retry = true;
	}

	/* Fetch the primary bucket page for the bucket */
	buf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BUCKET_PAGE);
	page = BufferGetPage(buf);
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
	Assert(pageopaque->hasho_bucket == bucket);

	/*
	 * Do the insertion: walk the bucket's overflow chain until we find a
	 * page with room, adding a fresh overflow page at chain end if needed.
	 */
	while (PageGetFreeSpace(page) < itemsz)
	{
		/*
		 * no space on this page; check for an overflow page
		 */
		BlockNumber nextblkno = pageopaque->hasho_nextblkno;

		if (BlockNumberIsValid(nextblkno))
		{
			/*
			 * ovfl page exists; go get it.  if it doesn't have room, we'll
			 * find out next pass through the loop test above.
			 */
			_hash_relbuf(rel, buf);
			buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
			page = BufferGetPage(buf);
		}
		else
		{
			/*
			 * we're at the end of the bucket chain and we haven't found a
			 * page with enough room.  allocate a new overflow page.
			 */

			/* release our write lock without modifying buffer */
			_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);

			/* chain to a new overflow page */
			buf = _hash_addovflpage(rel, metabuf, buf);
			page = BufferGetPage(buf);

			/* should fit now, given test above */
			Assert(PageGetFreeSpace(page) >= itemsz);
		}
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
		Assert(pageopaque->hasho_bucket == bucket);
	}

	/* found page with enough space, so add the item here */
	(void) _hash_pgaddtup(rel, buf, itemsz, itup);

	/* write and release the modified page */
	_hash_wrtbuf(rel, buf);

	/* We can drop the bucket lock now */
	_hash_droplock(rel, blkno, HASH_SHARE);

	/*
	 * Write-lock the metapage so we can increment the tuple count. After
	 * incrementing it, check to see if it's time for a split.
	 */
	_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

	metap->hashm_ntuples += 1;

	/* Make sure this stays in sync with _hash_expandtable() */
	do_expand = metap->hashm_ntuples >
		(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);

	/* Write out the metapage and drop lock, but keep pin */
	_hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);

	/* Attempt to split if a split is needed */
	if (do_expand)
		_hash_expandtable(rel, metabuf);

	/* Finally drop our pin on the metapage */
	_hash_dropbuf(rel, metabuf);
}
191 
192 /*
193  *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
194  *
195  * This routine adds the tuple to the page as requested; it does not write out
196  * the page.  It is an error to call pgaddtup() without pin and write lock on
197  * the target buffer.
198  *
199  * Returns the offset number at which the tuple was inserted.  This function
200  * is responsible for preserving the condition that tuples in a hash index
201  * page are sorted by hashkey value.
202  */
203 OffsetNumber
_hash_pgaddtup(Relation rel,Buffer buf,Size itemsize,IndexTuple itup)204 _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
205 {
206 	OffsetNumber itup_off;
207 	Page		page;
208 	uint32		hashkey;
209 
210 	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
211 	page = BufferGetPage(buf);
212 
213 	/* Find where to insert the tuple (preserving page's hashkey ordering) */
214 	hashkey = _hash_get_indextuple_hashkey(itup);
215 	itup_off = _hash_binsearch(page, hashkey);
216 
217 	if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
218 		== InvalidOffsetNumber)
219 		elog(ERROR, "failed to add index item to \"%s\"",
220 			 RelationGetRelationName(rel));
221 
222 	return itup_off;
223 }
224