1 /*-
2 * Copyright (c) 1996, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 */
6 /*
7 * Copyright (c) 1990, 1993, 1994
8 * The Regents of the University of California. All rights reserved.
9 *
10 * This code is derived from software contributed to Berkeley by
11 * Margo Seltzer.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions
15 * are met:
16 * 1. Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 * 2. Redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution.
21 * 3. Neither the name of the University nor the names of its contributors
22 * may be used to endorse or promote products derived from this software
23 * without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 *
37 * $Id$
38 */
39
40 /*
41 * PACKAGE: hashing
42 *
43 * DESCRIPTION:
44 * Manipulation of duplicates for the hash package.
45 */
46
47 #include "db_config.h"
48
49 #include "db_int.h"
50 #include "dbinc/db_page.h"
51 #include "dbinc/hash.h"
52 #include "dbinc/btree.h"
53 #include "dbinc/mp.h"
54
/*
 * Prototypes for the helper functions that are private to this file.
 */
static int __hamc_chgpg __P((DBC *,
    db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
static int __ham_check_move __P((DBC *, u_int32_t));
static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
static int __hamc_chgpg_func
    __P((DBC *, DBC *, u_int32_t *, db_pgno_t, u_int32_t, void *));
62
/*
 * __ham_add_dup --
 *	Called from hash_access to add a duplicate key. nval is the new
 *	value that we want to add.  The flags correspond to the flag values
 *	to cursor_put indicating where to add the new element.
 *
 * There are 4 cases.
 * Case 1: The existing duplicate set already resides on a separate page.
 *	   We return and let the common code handle this.
 * Case 2: The element is small enough to just be added to the existing set.
 * Case 3: The element is large enough to be a big item, so we're going to
 *	   have to push the set onto a new page.
 * Case 4: The element is large enough to push the duplicate set onto a
 *	   separate page.
 *
 * Parameters:
 *	dbc	cursor whose hash cursor (page/indx) identifies the pair
 *		that receives the duplicate.
 *	nval	the new data item; DB_DBT_PARTIAL items are sized as
 *		doff + size.
 *	flags	DB_KEYFIRST, DB_KEYLAST, DB_NODUPDATA, DB_OVERWRITE_DUP,
 *		DB_BEFORE or DB_AFTER.  DB_CURRENT is asserted against and
 *		any other value is reported through __db_unknown_path.
 *	pgnop	written only in case 1 (the data item is already H_OFFDUP),
 *		with the page number stored in the on-page HOFFDUP entry.
 *
 * Returns 0 on success, otherwise the error returned by the helper that
 * failed.
 *
 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
 */
int
__ham_add_dup(dbc, nval, flags, pgnop)
	DBC *dbc;
	DBT *nval;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DBT pval, tmp_val;
	DB_MPOOLFILE *mpf;
	ENV *env;
	HASH_CURSOR *hcp;
	u_int32_t add_bytes, new_size;
	int cmp, ret;
	u_int8_t *hk;

	dbp = dbc->dbp;
	env = dbp->env;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;

	DB_ASSERT(env, flags != DB_CURRENT);

	/*
	 * Size of the new element as it will be stored in the duplicate
	 * set: the data (plus the leading offset of a partial put) wrapped
	 * by DUP_SIZE.
	 */
	add_bytes = nval->size +
	    (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
	add_bytes = DUP_SIZE(add_bytes);

	/* This call may move the pair; hcp->page/indx are re-read below. */
	if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
		return (ret);

	/*
	 * Check if resulting duplicate set is going to need to go
	 * onto a separate duplicate page.  If so, convert the
	 * duplicate set and add the new one.  After conversion,
	 * hcp->dndx is the first free ndx or the index of the
	 * current pointer into the duplicate set.
	 */
	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
	/* Add the len bytes to the current singleton. */
	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
		add_bytes += DUP_SIZE(0);
	new_size =
	    LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
	    add_bytes;

	/*
	 * We convert to off-page duplicates if the item is a big item,
	 * the addition of the new item will make the set large, or
	 * if there isn't enough room on this page to add the next item.
	 */
	if (HPAGE_PTYPE(hk) != H_OFFDUP &&
	    (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
	    add_bytes > P_FREESPACE(dbp, hcp->page))) {

		if ((ret = __ham_dup_convert(dbc)) != 0)
			return (ret);
		/* The put is completed by the off-page duplicate cursor. */
		return (hcp->opd->am_put(hcp->opd,
		    NULL, nval, flags, NULL));
	}

	/* There are two separate cases here: on page and off page. */
	if (HPAGE_PTYPE(hk) != H_OFFDUP) {
		if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
			/*
			 * The existing data item is a singleton: rewrite it
			 * in duplicate format and retag the item.
			 */
			pval.flags = 0;
			pval.data = HKEYDATA_DATA(hk);
			pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
			    hcp->indx);
			if ((ret = __ham_make_dup(env,
			    &pval, &tmp_val, &dbc->my_rdata.data,
			    &dbc->my_rdata.ulen)) != 0 || (ret =
			    __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
				return (ret);
			/* Re-fetch the item pointer after the replace. */
			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
			HPAGE_PTYPE(hk) = H_DUPLICATE;

			/*
			 * Update the cursor position since we now are in
			 * duplicates.
			 */
			F_SET(hcp, H_ISDUP);
			hcp->dup_off = 0;
			hcp->dup_len = pval.size;
			hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
		}

		/* Now make the new entry a duplicate. */
		if ((ret = __ham_make_dup(env, nval,
		    &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
			return (ret);

		/*
		 * dlen of 0 makes the replace below a pure insertion;
		 * tmp_val.doff, chosen by the switch, is where it lands.
		 */
		tmp_val.dlen = 0;
		switch (flags) {			/* On page. */
		case DB_KEYFIRST:
		case DB_KEYLAST:
		case DB_NODUPDATA:
		case DB_OVERWRITE_DUP:
			if (dbp->dup_compare != NULL) {
				/* Sorted duplicates: search for the slot. */
				__ham_dsearch(dbc,
				    nval, &tmp_val.doff, &cmp, flags);

				/*
				 * Duplicate duplicates are not supported w/
				 * sorted dups.  We can either overwrite or
				 * return DB_KEYEXIST.
				 */
				if (cmp == 0) {
					if (flags == DB_OVERWRITE_DUP)
						return (__ham_overwrite(dbc,
						    nval, flags));
					return (__db_duperr(dbp, flags));
				}
			} else {
				/*
				 * Unsorted duplicates: insert at the front
				 * for DB_KEYFIRST, otherwise at the end of
				 * the set.
				 */
				hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
				    dbp->pgsize, hcp->indx);
				hcp->dup_len = nval->size;
				F_SET(hcp, H_ISDUP);
				if (flags == DB_KEYFIRST)
					hcp->dup_off = tmp_val.doff = 0;
				else
					hcp->dup_off =
					    tmp_val.doff = hcp->dup_tlen;
			}
			break;
		case DB_BEFORE:
			/* Insert at the cursor's current duplicate. */
			tmp_val.doff = hcp->dup_off;
			break;
		case DB_AFTER:
			/* Insert just past the cursor's current duplicate. */
			tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
			break;
		default:
			return (__db_unknown_path(env, "__ham_add_dup"));
		}

		/* Add the duplicate. */
		if ((ret = __memp_dirty(mpf, &hcp->page,
		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
		    (ret = __ham_replpair(dbc, &tmp_val, H_DUPLICATE)) != 0)
			return (ret);

		/* Now, update the cursor if necessary. */
		switch (flags) {
		case DB_AFTER:
			/* The cursor moves onto the newly added element. */
			hcp->dup_off += DUP_SIZE(hcp->dup_len);
			hcp->dup_len = nval->size;
			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
			break;
		case DB_BEFORE:
		case DB_KEYFIRST:
		case DB_KEYLAST:
		case DB_NODUPDATA:
		case DB_OVERWRITE_DUP:
			/* dup_off already refers to the new element. */
			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
			hcp->dup_len = nval->size;
			break;
		default:
			return (__db_unknown_path(env, "__ham_add_dup"));
		}
		/* Pass the size of the added element to __hamc_update. */
		ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1);
		return (ret);
	}

	/*
	 * If we get here, then we're on duplicate pages; set pgnop and
	 * return so the common code can handle it.
	 */
	memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
	    sizeof(db_pgno_t));

	/* ret still holds the 0 returned by __ham_check_move. */
	return (ret);
}
248
/*
 * __ham_dup_convert --
 *	Convert an on-page set of duplicates to an offpage set of duplicates.
 *
 * A new page is allocated with __db_new (P_LRECNO when the database has no
 * duplicate comparison function, P_LDUP otherwise), the data item(s) of the
 * cursor's current pair are copied onto it, the cursors returned by
 * __ham_get_clist are given off-page duplicate cursors, and finally the
 * on-page data item is replaced by an entry referencing the new page
 * (__ham_move_offpage).
 *
 * Returns 0 on success, otherwise the first error encountered.  On success
 * the cursor's dup_tlen, dup_off and dup_len are reset to 0.
 *
 * PUBLIC: int __ham_dup_convert __P((DBC *));
 */
int
__ham_dup_convert(dbc)
	DBC *dbc;
{
	BOVERFLOW bo;
	DB *dbp;
	DBC **hcs;
	DBT dbt;
	DB_LSN lsn;
	DB_MPOOLFILE *mpf;
	ENV *env;
	HASH_CURSOR *hcp;
	HOFFPAGE ho;
	PAGE *dp;
	db_indx_t i, len, off;
	int c, ret, t_ret;
	u_int8_t *p, *pend;

	dbp = dbc->dbp;
	env = dbp->env;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;

	/*
	 * Create a new page for the duplicates.
	 */
	if ((ret = __db_new(dbc,
	    dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, NULL, &dp)) != 0)
		return (ret);
	P_INIT(dp, dbp->pgsize,
	    dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));

	/*
	 * Get the list of cursors that may need to be updated.
	 */
	if ((ret = __ham_get_clist(dbp,
	    PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
		goto err;

	/*
	 * Now put the duplicates onto the new page.
	 */
	dbt.flags = 0;
	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
	case H_KEYDATA:
		/* Simple case, one key on page; move it to dup page. */
		dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
		dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
		ret = __db_pitem(dbc,
		    dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
		/* Share the cursor update code in the H_OFFPAGE case. */
		goto finish;
	case H_OFFPAGE:
		/* Simple case, one key on page; move it to dup page. */
		memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
		    HOFFPAGE_SIZE);
		/* Build a BOVERFLOW from the type, pgno and tlen of the item. */
		UMRW_SET(bo.unused1);
		B_TSET(bo.type, ho.type);
		UMRW_SET(bo.unused2);
		bo.pgno = ho.pgno;
		bo.tlen = ho.tlen;
		dbt.size = BOVERFLOW_SIZE;
		dbt.data = &bo;

		ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
finish:		if (ret == 0) {
			/* Update any other cursors. */
			if (hcs != NULL && DBC_LOGGING(dbc) &&
			    IS_SUBTRANSACTION(dbc->txn)) {
				/* This break leaves the switch with ret set. */
				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
				    PGNO(dp), hcp->indx, 0)) != 0)
					break;
			}
			/* The single item lives at index 0 of the new page. */
			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
				if ((ret = __ham_dcursor(hcs[c],
				    PGNO(dp), 0)) != 0)
					break;
		}
		break;
	case H_DUPLICATE:
		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
		pend = p +
		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);

		/*
		 * We need to maintain the duplicate cursor position.
		 * Keep track of where we are in the duplicate set via
		 * the offset, and when it matches the one in the cursor,
		 * set the off-page duplicate cursor index to the current
		 * index.
		 */
		for (off = 0, i = 0; p < pend; i++) {
			/*
			 * Each element is a db_indx_t length, the data, and
			 * a second db_indx_t; step over all three.
			 */
			memcpy(&len, p, sizeof(db_indx_t));
			dbt.size = len;
			p += sizeof(db_indx_t);
			dbt.data = p;
			p += len + sizeof(db_indx_t);
			if ((ret = __db_pitem(dbc, dp,
			    i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
				break;

			/* Update any other cursors */
			if (hcs != NULL && DBC_LOGGING(dbc) &&
			    IS_SUBTRANSACTION(dbc->txn)) {
				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
				    PGNO(dp), hcp->indx, i)) != 0)
					break;
			}
			/* Only cursors sitting on this element are moved. */
			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
				if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
				    == off && (ret = __ham_dcursor(hcs[c],
				    PGNO(dp), i)) != 0)
					goto err;
			off += len + 2 * sizeof(db_indx_t);
		}
		break;
	case H_BLOB:
	default:
		/* Any other item type is reported as a page format error. */
		ret = __db_pgfmt(env, hcp->pgno);
		break;
	}

	/*
	 * Now attach this to the source page in place of the old duplicate
	 * item.
	 */
	if (ret == 0)
		ret = __memp_dirty(mpf,
		    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0);

	if (ret == 0)
		ret = __ham_move_offpage(dbc, hcp->page,
		    (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));

	/*
	 * Success and failure share this exit: release the duplicate page,
	 * keeping the first error seen.
	 */
err:	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	if (ret == 0)
		hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;

	/* The cursor list is freed here on every path. */
	if (hcs != NULL)
		__os_free(env, hcs);

	return (ret);
}
401
402 /*
403 * __ham_make_dup
404 *
405 * Take a regular dbt and make it into a duplicate item with all the partial
406 * information set appropriately. If the incoming dbt is a partial, assume
407 * we are creating a new entry and make sure that we do any initial padding.
408 *
409 * PUBLIC: int __ham_make_dup __P((ENV *,
410 * PUBLIC: const DBT *, DBT *d, void **, u_int32_t *));
411 */
412 int
__ham_make_dup(env,notdup,duplicate,bufp,sizep)413 __ham_make_dup(env, notdup, duplicate, bufp, sizep)
414 ENV *env;
415 const DBT *notdup;
416 DBT *duplicate;
417 void **bufp;
418 u_int32_t *sizep;
419 {
420 db_indx_t tsize, item_size;
421 int ret;
422 u_int8_t *p;
423
424 item_size = (db_indx_t)notdup->size;
425 if (F_ISSET(notdup, DB_DBT_PARTIAL))
426 item_size += notdup->doff;
427
428 tsize = DUP_SIZE(item_size);
429 if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0)
430 return (ret);
431
432 duplicate->dlen = 0;
433 duplicate->flags = notdup->flags;
434 F_SET(duplicate, DB_DBT_PARTIAL);
435
436 p = duplicate->data;
437 memcpy(p, &item_size, sizeof(db_indx_t));
438 p += sizeof(db_indx_t);
439 if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
440 memset(p, 0, notdup->doff);
441 p += notdup->doff;
442 }
443 memcpy(p, notdup->data, notdup->size);
444 p += notdup->size;
445 memcpy(p, &item_size, sizeof(db_indx_t));
446
447 duplicate->doff = 0;
448 duplicate->dlen = notdup->size;
449
450 return (0);
451 }
452
453 /*
454 * __ham_check_move --
455 *
456 * Check if we can do whatever we need to on this page. If not,
457 * then we'll have to move the current element to a new page.
458 */
459 static int
__ham_check_move(dbc,add_len)460 __ham_check_move(dbc, add_len)
461 DBC *dbc;
462 u_int32_t add_len;
463 {
464 DB *dbp;
465 DBT k, d;
466 DB_LSN new_lsn;
467 DB_MPOOLFILE *mpf;
468 HASH_CURSOR *hcp;
469 PAGE *new_pagep, *next_pagep;
470 db_pgno_t next_pgno;
471 u_int32_t data_type, key_type, new_datalen, old_len;
472 db_indx_t new_indx;
473 u_int8_t *hk;
474 int found, match, ret;
475
476 dbp = dbc->dbp;
477 mpf = dbp->mpf;
478 hcp = (HASH_CURSOR *)dbc->internal;
479
480 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
481 found = 0;
482
483 /*
484 * If the item is already off page duplicates or an offpage item,
485 * then we know we can do whatever we need to do in-place
486 */
487 if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
488 return (0);
489
490 old_len =
491 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
492 new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
493 if (HPAGE_PTYPE(hk) != H_DUPLICATE)
494 new_datalen += DUP_SIZE(0);
495
496 /*
497 * We need to add a new page under two conditions:
498 * 1. The addition makes the total data length cross the BIG
499 * threshold and the OFFDUP structure won't fit on this page.
500 * 2. The addition does not make the total data cross the
501 * threshold, but the new data won't fit on the page.
502 * If neither of these is true, then we can return.
503 */
504 if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
505 HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
506 return (0);
507
508 if (!ISBIG(hcp, new_datalen) &&
509 (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
510 return (0);
511
512 /*
513 * If we get here, then we need to move the item to a new page.
514 * Check if there are more pages in the chain. We now need to
515 * update new_datalen to include the size of both the key and
516 * the data that we need to move.
517 */
518
519 new_datalen = ISBIG(hcp, new_datalen) ?
520 HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
521 new_datalen +=
522 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
523
524 new_pagep = NULL;
525 next_pagep = hcp->page;
526 for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
527 next_pgno = NEXT_PGNO(next_pagep)) {
528 if (next_pagep != hcp->page && (ret = __memp_fput(mpf,
529 dbc->thread_info, next_pagep, dbc->priority)) != 0)
530 return (ret);
531
532 if ((ret = __memp_fget(mpf,
533 &next_pgno, dbc->thread_info, dbc->txn,
534 DB_MPOOL_CREATE, &next_pagep)) != 0)
535 return (ret);
536
537 if (P_FREESPACE(dbp, next_pagep) >= new_datalen) {
538 found = 1;
539 break;
540 }
541 }
542
543 if (found != 0) {
544 /* Found a page with space, dirty it and the original. */
545 new_pagep = next_pagep;
546 if ((ret = __memp_dirty(mpf, &hcp->page,
547 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
548 goto err;
549 if ((ret = __memp_dirty(mpf, &new_pagep,
550 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
551 goto err;
552 } else {
553 if ((ret = __memp_dirty(mpf, &next_pagep,
554 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
555 goto err;
556
557 /* Add new page at the end of the chain. */
558 new_pagep = next_pagep;
559 if ((ret = __ham_add_ovflpage(dbc, &new_pagep)) != 0)
560 goto err;
561
562 if (next_pagep != hcp->page) {
563 if ((ret = __memp_fput(mpf,
564 dbc->thread_info, next_pagep, dbc->priority)) != 0)
565 goto err;
566 next_pagep = NULL;
567 /* Dirty the original page to update it. */
568 if ((ret = __memp_dirty(mpf, &hcp->page,
569 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
570 goto err;
571 }
572 }
573
574 /* Copy the item to the new page. */
575 if (DBC_LOGGING(dbc)) {
576 memset(&k, 0, sizeof(DBT));
577 d.flags = 0;
578 if (HPAGE_PTYPE(
579 H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
580 k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
581 k.size = HOFFPAGE_SIZE;
582 key_type = H_OFFPAGE;
583 } else {
584 k.data =
585 HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
586 k.size =
587 LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
588 key_type = H_KEYDATA;
589 }
590
591 /* Resolve the insert index so it can be written to the log. */
592 if ((ret = __ham_getindex(dbc, new_pagep, &k,
593 key_type, &match, &new_indx)) != 0)
594 return (ret);
595
596 if ((data_type = HPAGE_PTYPE(hk)) == H_OFFPAGE) {
597 d.data = hk;
598 d.size = HOFFPAGE_SIZE;
599 } else if (data_type == H_OFFDUP) {
600 d.data = hk;
601 d.size = HOFFDUP_SIZE;
602 } else {
603 d.data = HKEYDATA_DATA(hk);
604 d.size = LEN_HDATA(dbp,
605 hcp->page, dbp->pgsize, hcp->indx);
606 }
607
608 if ((ret = __ham_insdel_log(dbp, dbc->txn, &new_lsn,
609 0, PUTPAIR, PGNO(new_pagep), (u_int32_t)new_indx,
610 &LSN(new_pagep), OP_SET(key_type, new_pagep), &k,
611 OP_SET(data_type, new_pagep), &d)) != 0) {
612 (void)__memp_fput(mpf,
613 dbc->thread_info, new_pagep, dbc->priority);
614 return (ret);
615 }
616 } else {
617 LSN_NOT_LOGGED(new_lsn);
618 /*
619 * Ensure that an invalid index is passed to __ham_copypair, so
620 * it knows to resolve the index. Resolving the insert index
621 * here would require creating a temporary DBT with the key,
622 * and calling __ham_getindex. Let __ham_copypair do the
623 * resolution using the final key DBT.
624 */
625 new_indx = NDX_INVALID;
626 }
627
628 /* Move lsn onto page. */
629 LSN(new_pagep) = new_lsn; /* Structure assignment. */
630
631 if ((ret = __ham_copypair(dbc, hcp->page,
632 H_KEYINDEX(hcp->indx), new_pagep, &new_indx, 0)) != 0)
633 goto err;
634
635 /* Update all cursors that used to point to this item. */
636 if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
637 PGNO(new_pagep), new_indx)) != 0)
638 goto err;
639
640 /* Now delete the pair from the current page. */
641 if ((ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM, NULL)) != 0)
642 goto err;
643
644 /*
645 * __ham_del_pair decremented nelem. This is incorrect; we
646 * manually copied the element elsewhere, so the total number
647 * of elements hasn't changed. Increment it again.
648 *
649 * !!!
650 * Note that we still have the metadata page pinned, and
651 * __ham_del_pair dirtied it, so we don't need to set the dirty
652 * flag again.
653 */
654 if (!STD_LOCKING(dbc))
655 hcp->hdr->nelem++;
656
657 ret = __memp_fput(mpf, dbc->thread_info, hcp->page, dbc->priority);
658 hcp->page = new_pagep;
659 hcp->pgno = PGNO(hcp->page);
660 hcp->indx = new_indx;
661 F_SET(hcp, H_EXPAND);
662 F_CLR(hcp, H_DELETED);
663
664 return (ret);
665
666 err: if (new_pagep != NULL)
667 (void)__memp_fput(mpf,
668 dbc->thread_info, new_pagep, dbc->priority);
669 if (next_pagep != NULL &&
670 next_pagep != hcp->page && next_pagep != new_pagep)
671 (void)__memp_fput(mpf,
672 dbc->thread_info, next_pagep, dbc->priority);
673 return (ret);
674
675 }
676
677 /*
678 * __ham_move_offpage --
679 * Replace an onpage set of duplicates with the OFFDUP structure
680 * that references the duplicate page.
681 *
682 * This is really just a special case of __onpage_replace; we should
683 * probably combine them.
684 *
685 */
686 static int
__ham_move_offpage(dbc,pagep,ndx,pgno)687 __ham_move_offpage(dbc, pagep, ndx, pgno)
688 DBC *dbc;
689 PAGE *pagep;
690 u_int32_t ndx;
691 db_pgno_t pgno;
692 {
693 DB *dbp;
694 DBT new_dbt;
695 DBT old_dbt;
696 HOFFDUP od;
697 db_indx_t i, *inp;
698 int32_t difflen;
699 u_int8_t *src;
700 int ret;
701
702 dbp = dbc->dbp;
703 od.type = H_OFFDUP;
704 UMRW_SET(od.unused[0]);
705 UMRW_SET(od.unused[1]);
706 UMRW_SET(od.unused[2]);
707 od.pgno = pgno;
708 ret = 0;
709
710 if (DBC_LOGGING(dbc)) {
711 HKEYDATA *hk;
712 new_dbt.data = &od;
713 new_dbt.size = HOFFDUP_SIZE;
714 hk = (HKEYDATA *)P_ENTRY(dbp, pagep, ndx);
715 if (hk->type == H_KEYDATA || hk->type == H_DUPLICATE) {
716 old_dbt.data = hk->data;
717 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
718 SSZA(HKEYDATA, data);
719 } else {
720 old_dbt.data = hk;
721 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
722 }
723 if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
724 PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
725 OP_SET(hk->type, pagep), &old_dbt,
726 OP_SET(H_OFFDUP, pagep), &new_dbt)) != 0)
727 return (ret);
728 } else
729 LSN_NOT_LOGGED(LSN(pagep));
730
731 /*
732 * difflen is the difference in the lengths, and so may be negative.
733 * We know that the difference between two unsigned lengths from a
734 * database page will fit into an int32_t.
735 */
736 difflen =
737 (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
738 (int32_t)HOFFDUP_SIZE;
739 if (difflen != 0) {
740 /* Copy data. */
741 inp = P_INP(dbp, pagep);
742 src = (u_int8_t *)(pagep) + HOFFSET(pagep);
743 memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
744 HOFFSET(pagep) += difflen;
745
746 /* Update index table. */
747 for (i = ndx; i < NUM_ENT(pagep); i++)
748 inp[i] += difflen;
749 }
750
751 /* Now copy the offdup entry onto the page. */
752 memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
753 return (ret);
754 }
755
756 /*
757 * __ham_dsearch:
758 * Locate a particular duplicate in a duplicate set. Make sure that
759 * we exit with the cursor set appropriately.
760 *
761 * PUBLIC: void __ham_dsearch
762 * PUBLIC: __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
763 */
764 void
__ham_dsearch(dbc,dbt,offp,cmpp,flags)765 __ham_dsearch(dbc, dbt, offp, cmpp, flags)
766 DBC *dbc;
767 DBT *dbt;
768 u_int32_t *offp, flags;
769 int *cmpp;
770 {
771 DB *dbp;
772 DBT cur;
773 HASH_CURSOR *hcp;
774 db_indx_t i, len;
775 int (*func) __P((DB *, const DBT *, const DBT *, size_t *));
776 u_int8_t *data;
777
778 dbp = dbc->dbp;
779 hcp = (HASH_CURSOR *)dbc->internal;
780 func = dbp->dup_compare == NULL ? __dbt_defcmp : dbp->dup_compare;
781
782 i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
783 data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
784 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
785 len = hcp->dup_len;
786 while (i < hcp->dup_tlen) {
787 memcpy(&len, data, sizeof(db_indx_t));
788 data += sizeof(db_indx_t);
789 DB_SET_DBT(cur, data, len);
790
791 /*
792 * If we find an exact match, we're done. If in a sorted
793 * duplicate set and the item is larger than our test item,
794 * we're done. In the latter case, if permitting partial
795 * matches, it's not a failure.
796 */
797 *cmpp = func(dbp, dbt, &cur, NULL);
798 if (*cmpp == 0)
799 break;
800 if (*cmpp < 0 && dbp->dup_compare != NULL) {
801 if (flags == DB_GET_BOTH_RANGE)
802 *cmpp = 0;
803 break;
804 }
805
806 i += len + 2 * sizeof(db_indx_t);
807 data += len + sizeof(db_indx_t);
808 }
809
810 *offp = i;
811 hcp->dup_off = i;
812 hcp->dup_len = len;
813 F_SET(hcp, H_ISDUP);
814 }
815
816 /*
817 * __ham_dcursor --
818 *
819 * Create an off page duplicate cursor for this cursor.
820 */
821 static int
__ham_dcursor(dbc,pgno,indx)822 __ham_dcursor(dbc, pgno, indx)
823 DBC *dbc;
824 db_pgno_t pgno;
825 u_int32_t indx;
826 {
827 BTREE_CURSOR *dcp;
828 DB *dbp;
829 HASH_CURSOR *hcp;
830 int ret;
831
832 dbp = dbc->dbp;
833 hcp = (HASH_CURSOR *)dbc->internal;
834
835 if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
836 return (ret);
837
838 dcp = (BTREE_CURSOR *)hcp->opd->internal;
839 dcp->pgno = pgno;
840 dcp->indx = indx;
841
842 if (dbp->dup_compare == NULL) {
843 /*
844 * Converting to off-page Recno trees is tricky. The
845 * record number for the cursor is the index + 1 (to
846 * convert to 1-based record numbers).
847 */
848 dcp->recno = indx + 1;
849 }
850
851 /*
852 * Transfer the deleted flag from the top-level cursor to the
853 * created one.
854 */
855 if (F_ISSET(hcp, H_DELETED)) {
856 F_SET(dcp, C_DELETED);
857 F_CLR(hcp, H_DELETED);
858 }
859
860 return (0);
861 }
862
/*
 * Arguments handed to __hamc_chgpg_func through the opaque pointer of
 * __db_walk_cursors.
 */
struct __hamc_chgpg_args {
	db_pgno_t new_pgno;	/* Page number stored into matching cursors. */
	db_indx_t new_index;	/* Index stored into matching cursors. */
	DB_TXN *my_txn;		/* Compared with each cursor's txn; may be NULL. */
};
868
869 static int
__hamc_chgpg_func(cp,my_dbc,foundp,old_pgno,old_index,vargs)870 __hamc_chgpg_func(cp, my_dbc, foundp, old_pgno, old_index, vargs)
871 DBC *cp, *my_dbc;
872 u_int32_t *foundp;
873 db_pgno_t old_pgno;
874 u_int32_t old_index;
875 void *vargs;
876 {
877 HASH_CURSOR *hcp;
878 struct __hamc_chgpg_args *args;
879
880 if (cp == my_dbc || cp->dbtype != DB_HASH)
881 return (0);
882
883 hcp = (HASH_CURSOR *)cp->internal;
884
885 /*
886 * If a cursor is deleted, it doesn't refer to this
887 * item--it just happens to have the same indx, but
888 * it points to a former neighbor. Don't move it.
889 */
890 if (F_ISSET(hcp, H_DELETED))
891 return (0);
892
893 args = vargs;
894
895 if (hcp->pgno == old_pgno &&
896 hcp->indx == old_index &&
897 !MVCC_SKIP_CURADJ(cp, old_pgno)) {
898 hcp->pgno = args->new_pgno;
899 hcp->indx = args->new_index;
900 if (args->my_txn != NULL && cp->txn != args->my_txn)
901 *foundp = 1;
902 }
903 return (0);
904 }
905
906 /*
907 * __hamc_chgpg --
908 * Adjust the cursors after moving an item to a new page. We only
909 * move cursors that are pointing at this one item and are not
910 * deleted; since we only touch non-deleted cursors, and since
911 * (by definition) no item existed at the pgno/indx we're moving the
912 * item to, we're guaranteed that all the cursors we affect here or
913 * on abort really do refer to this one item.
914 */
915 static int
__hamc_chgpg(dbc,old_pgno,old_index,new_pgno,new_index)916 __hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
917 DBC *dbc;
918 db_pgno_t old_pgno, new_pgno;
919 u_int32_t old_index, new_index;
920 {
921 DB *dbp;
922 DB_LSN lsn;
923 int ret;
924 u_int32_t found;
925 struct __hamc_chgpg_args args;
926
927 dbp = dbc->dbp;
928
929 args.my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
930 args.new_pgno = new_pgno;
931 args.new_index = new_index;
932
933 if ((ret = __db_walk_cursors(dbp, dbc,
934 __hamc_chgpg_func, &found, old_pgno, old_index, &args)) != 0)
935 return (ret);
936 if (found != 0 && DBC_LOGGING(dbc)) {
937 if ((ret = __ham_chgpg_log(dbp,
938 args.my_txn, &lsn, 0, DB_HAM_CHGPG,
939 old_pgno, new_pgno, old_index, new_index)) != 0)
940 return (ret);
941 }
942 return (0);
943 }
944