1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 2013 Oracle and/or its affiliates. All rights reserved.
5 */
6 /*
7 * Copyright (c) 1990, 1993, 1994
8 * The Regents of the University of California. All rights reserved.
9 *
10 * This code is derived from software contributed to Berkeley by
11 * Margo Seltzer.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions
15 * are met:
16 * 1. Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 * 2. Redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution.
21 * 3. Neither the name of the University nor the names of its contributors
22 * may be used to endorse or promote products derived from this software
23 * without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 *
37 * $Id$
38 */
39
40 /*
41 * PACKAGE: hashing
42 *
43 * DESCRIPTION:
44 * Manipulation of duplicates for the hash package.
45 */
46
47 #include "db_config.h"
48
49 #include "db_int.h"
50 #include "dbinc/db_page.h"
51 #include "dbinc/hash.h"
52 #include "dbinc/btree.h"
53 #include "dbinc/mp.h"
54
55 static int __hamc_chgpg __P((DBC *,
56 db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
57 static int __ham_check_move __P((DBC *, u_int32_t));
58 static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
59 static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
60 static int __hamc_chgpg_func
61 __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
62
63 /*
64 * Called from hash_access to add a duplicate key. nval is the new
65 * value that we want to add. The flags correspond to the flag values
66 * to cursor_put indicating where to add the new element.
67 * There are 4 cases.
68 * Case 1: The existing duplicate set already resides on a separate page.
69 * We return and let the common code handle this.
70 * Case 2: The element is small enough to just be added to the existing set.
71 * Case 3: The element is large enough to be a big item, so we're going to
72 * have to push the set onto a new page.
73 * Case 4: The element is large enough to push the duplicate set onto a
74 * separate page.
75 *
76 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
77 */
78 int
__ham_add_dup(dbc,nval,flags,pgnop)79 __ham_add_dup(dbc, nval, flags, pgnop)
80 DBC *dbc;
81 DBT *nval;
82 u_int32_t flags;
83 db_pgno_t *pgnop;
84 {
85 DB *dbp;
86 DBT pval, tmp_val;
87 DB_MPOOLFILE *mpf;
88 ENV *env;
89 HASH_CURSOR *hcp;
90 u_int32_t add_bytes, new_size;
91 int cmp, ret;
92 u_int8_t *hk;
93
94 dbp = dbc->dbp;
95 env = dbp->env;
96 mpf = dbp->mpf;
97 hcp = (HASH_CURSOR *)dbc->internal;
98
99 DB_ASSERT(env, flags != DB_CURRENT);
100
101 add_bytes = nval->size +
102 (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
103 add_bytes = DUP_SIZE(add_bytes);
104
105 if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
106 return (ret);
107
108 /*
109 * Check if resulting duplicate set is going to need to go
110 * onto a separate duplicate page. If so, convert the
111 * duplicate set and add the new one. After conversion,
112 * hcp->dndx is the first free ndx or the index of the
113 * current pointer into the duplicate set.
114 */
115 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
116 /* Add the len bytes to the current singleton. */
117 if (HPAGE_PTYPE(hk) != H_DUPLICATE)
118 add_bytes += DUP_SIZE(0);
119 new_size =
120 LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
121 add_bytes;
122
123 /*
124 * We convert to off-page duplicates if the item is a big item,
125 * the addition of the new item will make the set large, or
126 * if there isn't enough room on this page to add the next item.
127 */
128 if (HPAGE_PTYPE(hk) != H_OFFDUP &&
129 (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
130 add_bytes > P_FREESPACE(dbp, hcp->page))) {
131
132 if ((ret = __ham_dup_convert(dbc)) != 0)
133 return (ret);
134 return (hcp->opd->am_put(hcp->opd,
135 NULL, nval, flags, NULL));
136 }
137
138 /* There are two separate cases here: on page and off page. */
139 if (HPAGE_PTYPE(hk) != H_OFFDUP) {
140 if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
141 pval.flags = 0;
142 pval.data = HKEYDATA_DATA(hk);
143 pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
144 hcp->indx);
145 if ((ret = __ham_make_dup(env,
146 &pval, &tmp_val, &dbc->my_rdata.data,
147 &dbc->my_rdata.ulen)) != 0 || (ret =
148 __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
149 return (ret);
150 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
151 HPAGE_PTYPE(hk) = H_DUPLICATE;
152
153 /*
154 * Update the cursor position since we now are in
155 * duplicates.
156 */
157 F_SET(hcp, H_ISDUP);
158 hcp->dup_off = 0;
159 hcp->dup_len = pval.size;
160 hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
161 }
162
163 /* Now make the new entry a duplicate. */
164 if ((ret = __ham_make_dup(env, nval,
165 &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
166 return (ret);
167
168 tmp_val.dlen = 0;
169 switch (flags) { /* On page. */
170 case DB_KEYFIRST:
171 case DB_KEYLAST:
172 case DB_NODUPDATA:
173 case DB_OVERWRITE_DUP:
174 if (dbp->dup_compare != NULL) {
175 __ham_dsearch(dbc,
176 nval, &tmp_val.doff, &cmp, flags);
177
178 /*
179 * Duplicate duplicates are not supported w/
180 * sorted dups. We can either overwrite or
181 * return DB_KEYEXIST.
182 */
183 if (cmp == 0) {
184 if (flags == DB_OVERWRITE_DUP)
185 return (__ham_overwrite(dbc,
186 nval, flags));
187 return (__db_duperr(dbp, flags));
188 }
189 } else {
190 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
191 dbp->pgsize, hcp->indx);
192 hcp->dup_len = nval->size;
193 F_SET(hcp, H_ISDUP);
194 if (flags == DB_KEYFIRST)
195 hcp->dup_off = tmp_val.doff = 0;
196 else
197 hcp->dup_off =
198 tmp_val.doff = hcp->dup_tlen;
199 }
200 break;
201 case DB_BEFORE:
202 tmp_val.doff = hcp->dup_off;
203 break;
204 case DB_AFTER:
205 tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
206 break;
207 default:
208 return (__db_unknown_path(env, "__ham_add_dup"));
209 }
210
211 /* Add the duplicate. */
212 if ((ret = __memp_dirty(mpf, &hcp->page,
213 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
214 (ret = __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
215 return (ret);
216
217 /* Now, update the cursor if necessary. */
218 switch (flags) {
219 case DB_AFTER:
220 hcp->dup_off += DUP_SIZE(hcp->dup_len);
221 hcp->dup_len = nval->size;
222 hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
223 break;
224 case DB_BEFORE:
225 case DB_KEYFIRST:
226 case DB_KEYLAST:
227 case DB_NODUPDATA:
228 case DB_OVERWRITE_DUP:
229 hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
230 hcp->dup_len = nval->size;
231 break;
232 default:
233 return (__db_unknown_path(env, "__ham_add_dup"));
234 }
235 ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1);
236 return (ret);
237 }
238
239 /*
240 * If we get here, then we're on duplicate pages; set pgnop and
241 * return so the common code can handle it.
242 */
243 memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
244 sizeof(db_pgno_t));
245
246 return (ret);
247 }
248
249 /*
250 * Convert an on-page set of duplicates to an offpage set of duplicates.
251 *
252 * PUBLIC: int __ham_dup_convert __P((DBC *));
253 */
254 int
__ham_dup_convert(dbc)255 __ham_dup_convert(dbc)
256 DBC *dbc;
257 {
258 BOVERFLOW bo;
259 DB *dbp;
260 DBC **hcs;
261 DBT dbt;
262 DB_LSN lsn;
263 DB_MPOOLFILE *mpf;
264 ENV *env;
265 HASH_CURSOR *hcp;
266 HOFFPAGE ho;
267 PAGE *dp;
268 db_indx_t i, len, off;
269 int c, ret, t_ret;
270 u_int8_t *p, *pend;
271
272 dbp = dbc->dbp;
273 env = dbp->env;
274 mpf = dbp->mpf;
275 hcp = (HASH_CURSOR *)dbc->internal;
276
277 /*
278 * Create a new page for the duplicates.
279 */
280 if ((ret = __db_new(dbc,
281 dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, NULL, &dp)) != 0)
282 return (ret);
283 P_INIT(dp, dbp->pgsize,
284 dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
285
286 /*
287 * Get the list of cursors that may need to be updated.
288 */
289 if ((ret = __ham_get_clist(dbp,
290 PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
291 goto err;
292
293 /*
294 * Now put the duplicates onto the new page.
295 */
296 dbt.flags = 0;
297 switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
298 case H_KEYDATA:
299 /* Simple case, one key on page; move it to dup page. */
300 dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
301 dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
302 ret = __db_pitem(dbc,
303 dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
304 goto finish;
305 case H_OFFPAGE:
306 /* Simple case, one key on page; move it to dup page. */
307 memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
308 HOFFPAGE_SIZE);
309 UMRW_SET(bo.unused1);
310 B_TSET(bo.type, ho.type);
311 UMRW_SET(bo.unused2);
312 bo.pgno = ho.pgno;
313 bo.tlen = ho.tlen;
314 dbt.size = BOVERFLOW_SIZE;
315 dbt.data = &bo;
316
317 ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
318 finish: if (ret == 0) {
319 /* Update any other cursors. */
320 if (hcs != NULL && DBC_LOGGING(dbc) &&
321 IS_SUBTRANSACTION(dbc->txn)) {
322 if ((ret = __ham_chgpg_log(dbp, dbc->txn,
323 &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
324 PGNO(dp), hcp->indx, 0)) != 0)
325 break;
326 }
327 for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
328 if ((ret = __ham_dcursor(hcs[c],
329 PGNO(dp), 0)) != 0)
330 break;
331 }
332 break;
333 case H_DUPLICATE:
334 p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
335 pend = p +
336 LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
337
338 /*
339 * We need to maintain the duplicate cursor position.
340 * Keep track of where we are in the duplicate set via
341 * the offset, and when it matches the one in the cursor,
342 * set the off-page duplicate cursor index to the current
343 * index.
344 */
345 for (off = 0, i = 0; p < pend; i++) {
346 memcpy(&len, p, sizeof(db_indx_t));
347 dbt.size = len;
348 p += sizeof(db_indx_t);
349 dbt.data = p;
350 p += len + sizeof(db_indx_t);
351 if ((ret = __db_pitem(dbc, dp,
352 i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
353 break;
354
355 /* Update any other cursors */
356 if (hcs != NULL && DBC_LOGGING(dbc) &&
357 IS_SUBTRANSACTION(dbc->txn)) {
358 if ((ret = __ham_chgpg_log(dbp, dbc->txn,
359 &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
360 PGNO(dp), hcp->indx, i)) != 0)
361 break;
362 }
363 for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
364 if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
365 == off && (ret = __ham_dcursor(hcs[c],
366 PGNO(dp), i)) != 0)
367 goto err;
368 off += len + 2 * sizeof(db_indx_t);
369 }
370 break;
371 default:
372 ret = __db_pgfmt(env, hcp->pgno);
373 break;
374 }
375
376 /*
377 * Now attach this to the source page in place of the old duplicate
378 * item.
379 */
380 if (ret == 0)
381 ret = __memp_dirty(mpf,
382 &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0);
383
384 if (ret == 0)
385 ret = __ham_move_offpage(dbc, hcp->page,
386 (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));
387
388 err: if ((t_ret = __memp_fput(mpf,
389 dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
390 ret = t_ret;
391
392 if (ret == 0)
393 hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;
394
395 if (hcs != NULL)
396 __os_free(env, hcs);
397
398 return (ret);
399 }
400
401 /*
402 * __ham_make_dup
403 *
404 * Take a regular dbt and make it into a duplicate item with all the partial
405 * information set appropriately. If the incoming dbt is a partial, assume
406 * we are creating a new entry and make sure that we do any initial padding.
407 *
408 * PUBLIC: int __ham_make_dup __P((ENV *,
409 * PUBLIC: const DBT *, DBT *d, void **, u_int32_t *));
410 */
411 int
__ham_make_dup(env,notdup,duplicate,bufp,sizep)412 __ham_make_dup(env, notdup, duplicate, bufp, sizep)
413 ENV *env;
414 const DBT *notdup;
415 DBT *duplicate;
416 void **bufp;
417 u_int32_t *sizep;
418 {
419 db_indx_t tsize, item_size;
420 int ret;
421 u_int8_t *p;
422
423 item_size = (db_indx_t)notdup->size;
424 if (F_ISSET(notdup, DB_DBT_PARTIAL))
425 item_size += notdup->doff;
426
427 tsize = DUP_SIZE(item_size);
428 if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0)
429 return (ret);
430
431 duplicate->dlen = 0;
432 duplicate->flags = notdup->flags;
433 F_SET(duplicate, DB_DBT_PARTIAL);
434
435 p = duplicate->data;
436 memcpy(p, &item_size, sizeof(db_indx_t));
437 p += sizeof(db_indx_t);
438 if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
439 memset(p, 0, notdup->doff);
440 p += notdup->doff;
441 }
442 memcpy(p, notdup->data, notdup->size);
443 p += notdup->size;
444 memcpy(p, &item_size, sizeof(db_indx_t));
445
446 duplicate->doff = 0;
447 duplicate->dlen = notdup->size;
448
449 return (0);
450 }
451
452 /*
453 * __ham_check_move --
454 *
455 * Check if we can do whatever we need to on this page. If not,
456 * then we'll have to move the current element to a new page.
457 */
458 static int
__ham_check_move(dbc,add_len)459 __ham_check_move(dbc, add_len)
460 DBC *dbc;
461 u_int32_t add_len;
462 {
463 DB *dbp;
464 DBT k, d;
465 DB_LSN new_lsn;
466 DB_MPOOLFILE *mpf;
467 HASH_CURSOR *hcp;
468 PAGE *new_pagep, *next_pagep;
469 db_pgno_t next_pgno;
470 u_int32_t data_type, key_type, new_datalen, old_len;
471 db_indx_t new_indx;
472 u_int8_t *hk;
473 int found, match, ret;
474
475 dbp = dbc->dbp;
476 mpf = dbp->mpf;
477 hcp = (HASH_CURSOR *)dbc->internal;
478
479 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
480 found = 0;
481
482 /*
483 * If the item is already off page duplicates or an offpage item,
484 * then we know we can do whatever we need to do in-place
485 */
486 if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
487 return (0);
488
489 old_len =
490 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
491 new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
492 if (HPAGE_PTYPE(hk) != H_DUPLICATE)
493 new_datalen += DUP_SIZE(0);
494
495 /*
496 * We need to add a new page under two conditions:
497 * 1. The addition makes the total data length cross the BIG
498 * threshold and the OFFDUP structure won't fit on this page.
499 * 2. The addition does not make the total data cross the
500 * threshold, but the new data won't fit on the page.
501 * If neither of these is true, then we can return.
502 */
503 if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
504 HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
505 return (0);
506
507 if (!ISBIG(hcp, new_datalen) &&
508 (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
509 return (0);
510
511 /*
512 * If we get here, then we need to move the item to a new page.
513 * Check if there are more pages in the chain. We now need to
514 * update new_datalen to include the size of both the key and
515 * the data that we need to move.
516 */
517
518 new_datalen = ISBIG(hcp, new_datalen) ?
519 HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
520 new_datalen +=
521 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
522
523 new_pagep = NULL;
524 next_pagep = hcp->page;
525 for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
526 next_pgno = NEXT_PGNO(next_pagep)) {
527 if (next_pagep != hcp->page && (ret = __memp_fput(mpf,
528 dbc->thread_info, next_pagep, dbc->priority)) != 0)
529 return (ret);
530
531 if ((ret = __memp_fget(mpf,
532 &next_pgno, dbc->thread_info, dbc->txn,
533 DB_MPOOL_CREATE, &next_pagep)) != 0)
534 return (ret);
535
536 if (P_FREESPACE(dbp, next_pagep) >= new_datalen) {
537 found = 1;
538 break;
539 }
540 }
541
542 if (found != 0) {
543 /* Found a page with space, dirty it and the original. */
544 new_pagep = next_pagep;
545 if ((ret = __memp_dirty(mpf, &hcp->page,
546 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
547 goto err;
548 if ((ret = __memp_dirty(mpf, &new_pagep,
549 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
550 goto err;
551 } else {
552 if ((ret = __memp_dirty(mpf, &next_pagep,
553 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
554 goto err;
555
556 /* Add new page at the end of the chain. */
557 new_pagep = next_pagep;
558 if ((ret = __ham_add_ovflpage(dbc, &new_pagep)) != 0)
559 goto err;
560
561 if (next_pagep != hcp->page) {
562 if ((ret = __memp_fput(mpf,
563 dbc->thread_info, next_pagep, dbc->priority)) != 0)
564 goto err;
565 next_pagep = NULL;
566 /* Dirty the original page to update it. */
567 if ((ret = __memp_dirty(mpf, &hcp->page,
568 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
569 goto err;
570 }
571 }
572
573 /* Copy the item to the new page. */
574 if (DBC_LOGGING(dbc)) {
575 memset(&k, 0, sizeof(DBT));
576 d.flags = 0;
577 if (HPAGE_PTYPE(
578 H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
579 k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
580 k.size = HOFFPAGE_SIZE;
581 key_type = H_OFFPAGE;
582 } else {
583 k.data =
584 HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
585 k.size =
586 LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
587 key_type = H_KEYDATA;
588 }
589
590 /* Resolve the insert index so it can be written to the log. */
591 if ((ret = __ham_getindex(dbc, new_pagep, &k,
592 key_type, &match, &new_indx)) != 0)
593 return (ret);
594
595 if ((data_type = HPAGE_PTYPE(hk)) == H_OFFPAGE) {
596 d.data = hk;
597 d.size = HOFFPAGE_SIZE;
598 } else if (data_type == H_OFFDUP) {
599 d.data = hk;
600 d.size = HOFFDUP_SIZE;
601 } else {
602 d.data = HKEYDATA_DATA(hk);
603 d.size = LEN_HDATA(dbp,
604 hcp->page, dbp->pgsize, hcp->indx);
605 }
606
607 if ((ret = __ham_insdel_log(dbp, dbc->txn, &new_lsn,
608 0, PUTPAIR, PGNO(new_pagep), (u_int32_t)new_indx,
609 &LSN(new_pagep), OP_SET(key_type, new_pagep), &k,
610 OP_SET(data_type, new_pagep), &d)) != 0) {
611 (void)__memp_fput(mpf,
612 dbc->thread_info, new_pagep, dbc->priority);
613 return (ret);
614 }
615 } else {
616 LSN_NOT_LOGGED(new_lsn);
617 /*
618 * Ensure that an invalid index is passed to __ham_copypair, so
619 * it knows to resolve the index. Resolving the insert index
620 * here would require creating a temporary DBT with the key,
621 * and calling __ham_getindex. Let __ham_copypair do the
622 * resolution using the final key DBT.
623 */
624 new_indx = NDX_INVALID;
625 }
626
627 /* Move lsn onto page. */
628 LSN(new_pagep) = new_lsn; /* Structure assignment. */
629
630 if ((ret = __ham_copypair(dbc, hcp->page,
631 H_KEYINDEX(hcp->indx), new_pagep, &new_indx, 0)) != 0)
632 goto err;
633
634 /* Update all cursors that used to point to this item. */
635 if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
636 PGNO(new_pagep), new_indx)) != 0)
637 goto err;
638
639 /* Now delete the pair from the current page. */
640 if ((ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM, NULL)) != 0)
641 goto err;
642
643 /*
644 * __ham_del_pair decremented nelem. This is incorrect; we
645 * manually copied the element elsewhere, so the total number
646 * of elements hasn't changed. Increment it again.
647 *
648 * !!!
649 * Note that we still have the metadata page pinned, and
650 * __ham_del_pair dirtied it, so we don't need to set the dirty
651 * flag again.
652 */
653 if (!STD_LOCKING(dbc))
654 hcp->hdr->nelem++;
655
656 ret = __memp_fput(mpf, dbc->thread_info, hcp->page, dbc->priority);
657 hcp->page = new_pagep;
658 hcp->pgno = PGNO(hcp->page);
659 hcp->indx = new_indx;
660 F_SET(hcp, H_EXPAND);
661 F_CLR(hcp, H_DELETED);
662
663 return (ret);
664
665 err: if (new_pagep != NULL)
666 (void)__memp_fput(mpf,
667 dbc->thread_info, new_pagep, dbc->priority);
668 if (next_pagep != NULL &&
669 next_pagep != hcp->page && next_pagep != new_pagep)
670 (void)__memp_fput(mpf,
671 dbc->thread_info, next_pagep, dbc->priority);
672 return (ret);
673
674 }
675
676 /*
677 * __ham_move_offpage --
678 * Replace an onpage set of duplicates with the OFFDUP structure
679 * that references the duplicate page.
680 *
681 * XXX
682 * This is really just a special case of __onpage_replace; we should
683 * probably combine them.
684 *
685 */
686 static int
__ham_move_offpage(dbc,pagep,ndx,pgno)687 __ham_move_offpage(dbc, pagep, ndx, pgno)
688 DBC *dbc;
689 PAGE *pagep;
690 u_int32_t ndx;
691 db_pgno_t pgno;
692 {
693 DB *dbp;
694 DBT new_dbt;
695 DBT old_dbt;
696 HOFFDUP od;
697 db_indx_t i, *inp;
698 int32_t difflen;
699 u_int8_t *src;
700 int ret;
701
702 dbp = dbc->dbp;
703 od.type = H_OFFDUP;
704 UMRW_SET(od.unused[0]);
705 UMRW_SET(od.unused[1]);
706 UMRW_SET(od.unused[2]);
707 od.pgno = pgno;
708 ret = 0;
709
710 if (DBC_LOGGING(dbc)) {
711 HKEYDATA *hk;
712 new_dbt.data = &od;
713 new_dbt.size = HOFFDUP_SIZE;
714 hk = (HKEYDATA *)P_ENTRY(dbp, pagep, ndx);
715 if (hk->type == H_KEYDATA || hk->type == H_DUPLICATE) {
716 old_dbt.data = hk->data;
717 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
718 SSZA(HKEYDATA, data);
719 } else {
720 old_dbt.data = hk;
721 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
722 }
723 if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
724 PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
725 OP_SET(hk->type, pagep), &old_dbt,
726 OP_SET(H_OFFDUP, pagep), &new_dbt)) != 0)
727 return (ret);
728 } else
729 LSN_NOT_LOGGED(LSN(pagep));
730
731 /*
732 * difflen is the difference in the lengths, and so may be negative.
733 * We know that the difference between two unsigned lengths from a
734 * database page will fit into an int32_t.
735 */
736 difflen =
737 (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
738 (int32_t)HOFFDUP_SIZE;
739 if (difflen != 0) {
740 /* Copy data. */
741 inp = P_INP(dbp, pagep);
742 src = (u_int8_t *)(pagep) + HOFFSET(pagep);
743 memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
744 HOFFSET(pagep) += difflen;
745
746 /* Update index table. */
747 for (i = ndx; i < NUM_ENT(pagep); i++)
748 inp[i] += difflen;
749 }
750
751 /* Now copy the offdup entry onto the page. */
752 memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
753 return (ret);
754 }
755
756 /*
757 * __ham_dsearch:
758 * Locate a particular duplicate in a duplicate set. Make sure that
759 * we exit with the cursor set appropriately.
760 *
761 * PUBLIC: void __ham_dsearch
762 * PUBLIC: __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
763 */
764 void
__ham_dsearch(dbc,dbt,offp,cmpp,flags)765 __ham_dsearch(dbc, dbt, offp, cmpp, flags)
766 DBC *dbc;
767 DBT *dbt;
768 u_int32_t *offp, flags;
769 int *cmpp;
770 {
771 DB *dbp;
772 DBT cur;
773 HASH_CURSOR *hcp;
774 db_indx_t i, len;
775 int (*func) __P((DB *, const DBT *, const DBT *));
776 u_int8_t *data;
777
778 dbp = dbc->dbp;
779 hcp = (HASH_CURSOR *)dbc->internal;
780 func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare;
781
782 i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
783 data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
784 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
785 len = hcp->dup_len;
786 while (i < hcp->dup_tlen) {
787 memcpy(&len, data, sizeof(db_indx_t));
788 data += sizeof(db_indx_t);
789 DB_SET_DBT(cur, data, len);
790
791 /*
792 * If we find an exact match, we're done. If in a sorted
793 * duplicate set and the item is larger than our test item,
794 * we're done. In the latter case, if permitting partial
795 * matches, it's not a failure.
796 */
797 *cmpp = func(dbp, dbt, &cur);
798 if (*cmpp == 0)
799 break;
800 if (*cmpp < 0 && dbp->dup_compare != NULL) {
801 if (flags == DB_GET_BOTH_RANGE)
802 *cmpp = 0;
803 break;
804 }
805
806 i += len + 2 * sizeof(db_indx_t);
807 data += len + sizeof(db_indx_t);
808 }
809
810 *offp = i;
811 hcp->dup_off = i;
812 hcp->dup_len = len;
813 F_SET(hcp, H_ISDUP);
814 }
815
816 /*
817 * __ham_dcursor --
818 *
819 * Create an off page duplicate cursor for this cursor.
820 */
821 static int
__ham_dcursor(dbc,pgno,indx)822 __ham_dcursor(dbc, pgno, indx)
823 DBC *dbc;
824 db_pgno_t pgno;
825 u_int32_t indx;
826 {
827 BTREE_CURSOR *dcp;
828 DB *dbp;
829 HASH_CURSOR *hcp;
830 int ret;
831
832 dbp = dbc->dbp;
833 hcp = (HASH_CURSOR *)dbc->internal;
834
835 if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
836 return (ret);
837
838 dcp = (BTREE_CURSOR *)hcp->opd->internal;
839 dcp->pgno = pgno;
840 dcp->indx = indx;
841
842 if (dbp->dup_compare == NULL) {
843 /*
844 * Converting to off-page Recno trees is tricky. The
845 * record number for the cursor is the index + 1 (to
846 * convert to 1-based record numbers).
847 */
848 dcp->recno = indx + 1;
849 }
850
851 /*
852 * Transfer the deleted flag from the top-level cursor to the
853 * created one.
854 */
855 if (F_ISSET(hcp, H_DELETED)) {
856 F_SET(dcp, C_DELETED);
857 F_CLR(hcp, H_DELETED);
858 }
859
860 return (0);
861 }
862
863 struct __hamc_chgpg_args {
864 db_pgno_t new_pgno;
865 db_indx_t new_index;
866 DB_TXN *my_txn;
867 };
868
869 static int
__hamc_chgpg_func(cp,my_dbc,foundp,old_pgno,old_index,vargs)870 __hamc_chgpg_func(cp, my_dbc, foundp, old_pgno, old_index, vargs)
871 DBC *cp, *my_dbc;
872 u_int32_t *foundp;
873 db_pgno_t old_pgno;
874 u_int32_t old_index;
875 void *vargs;
876 {
877 HASH_CURSOR *hcp;
878 struct __hamc_chgpg_args *args;
879
880 if (cp == my_dbc || cp->dbtype != DB_HASH)
881 return (0);
882
883 hcp = (HASH_CURSOR *)cp->internal;
884
885 /*
886 * If a cursor is deleted, it doesn't refer to this
887 * item--it just happens to have the same indx, but
888 * it points to a former neighbor. Don't move it.
889 */
890 if (F_ISSET(hcp, H_DELETED))
891 return (0);
892
893 args = vargs;
894
895 if (hcp->pgno == old_pgno &&
896 hcp->indx == old_index &&
897 !MVCC_SKIP_CURADJ(cp, old_pgno)) {
898 hcp->pgno = args->new_pgno;
899 hcp->indx = args->new_index;
900 if (args->my_txn != NULL && cp->txn != args->my_txn)
901 *foundp = 1;
902 }
903 return (0);
904 }
905
906 /*
907 * __hamc_chgpg --
908 * Adjust the cursors after moving an item to a new page. We only
909 * move cursors that are pointing at this one item and are not
910 * deleted; since we only touch non-deleted cursors, and since
911 * (by definition) no item existed at the pgno/indx we're moving the
912 * item to, we're guaranteed that all the cursors we affect here or
913 * on abort really do refer to this one item.
914 */
915 static int
__hamc_chgpg(dbc,old_pgno,old_index,new_pgno,new_index)916 __hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
917 DBC *dbc;
918 db_pgno_t old_pgno, new_pgno;
919 u_int32_t old_index, new_index;
920 {
921 DB *dbp;
922 DB_LSN lsn;
923 int ret;
924 u_int32_t found;
925 struct __hamc_chgpg_args args;
926
927 dbp = dbc->dbp;
928
929 args.my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
930 args.new_pgno = new_pgno;
931 args.new_index = new_index;
932
933 if ((ret = __db_walk_cursors(dbp, dbc,
934 __hamc_chgpg_func, &found, old_pgno, old_index, &args)) != 0)
935 return (ret);
936 if (found != 0 && DBC_LOGGING(dbc)) {
937 if ((ret = __ham_chgpg_log(dbp,
938 args.my_txn, &lsn, 0, DB_HAM_CHGPG,
939 old_pgno, new_pgno, old_index, new_index)) != 0)
940 return (ret);
941 }
942 return (0);
943 }
944