/*
 * Copyright 2012-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#define CK_HT_IM
#include <ck_ht.h>

/*
 * This implementation borrows several techniques from Josh Dybnis's
 * nbds library, which can be found at http://code.google.com/p/nbds
 */
#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdint.h>
#include <ck_stdbool.h>
#include <ck_string.h>

#include "ck_ht_hash.h"
#include "ck_internal.h"

#ifndef CK_HT_BUCKET_LENGTH

#ifdef CK_HT_PP
#define CK_HT_BUCKET_SHIFT 2ULL
#else
#define CK_HT_BUCKET_SHIFT 1ULL
#endif

#define CK_HT_BUCKET_LENGTH (1U << CK_HT_BUCKET_SHIFT)
#define CK_HT_BUCKET_MASK (CK_HT_BUCKET_LENGTH - 1)
#endif

#ifndef CK_HT_PROBE_DEFAULT
#define CK_HT_PROBE_DEFAULT 64ULL
#endif

#if defined(CK_F_PR_LOAD_8) && defined(CK_F_PR_STORE_8)
#define CK_HT_WORD	    uint8_t
#define CK_HT_WORD_MAX	    UINT8_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_8(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_8(x)
#elif defined(CK_F_PR_LOAD_16) && defined(CK_F_PR_STORE_16)
#define CK_HT_WORD	    uint16_t
#define CK_HT_WORD_MAX	    UINT16_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_16(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_16(x)
#elif defined(CK_F_PR_LOAD_32) && defined(CK_F_PR_STORE_32)
#define CK_HT_WORD	    uint32_t
#define CK_HT_WORD_MAX	    UINT32_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_32(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_32(x)
#else
#error "ck_ht is not supported on your platform."
#endif
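
/*
 * CK_HT_WORD is the narrowest type for which atomic load/store primitives
 * are available on the target platform. It backs the optional probe-bound
 * array (one word per initial slot), which is why bounds saturate at
 * CK_HT_WORD_MAX below.
 */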

struct ck_ht_map {
	unsigned int mode;
	CK_HT_TYPE deletions;
	CK_HT_TYPE probe_maximum;
	CK_HT_TYPE probe_length;
	CK_HT_TYPE probe_limit;
	CK_HT_TYPE size;
	CK_HT_TYPE n_entries;
	CK_HT_TYPE mask;
	CK_HT_TYPE capacity;
	CK_HT_TYPE step;
	CK_HT_WORD *probe_bound;
	struct ck_ht_entry *entries;
};

void
ck_ht_stat(struct ck_ht *table,
    struct ck_ht_stat *st)
{
	struct ck_ht_map *map = table->map;

	st->n_entries = map->n_entries;
	st->probe_maximum = map->probe_maximum;
	return;
}

void
ck_ht_hash(struct ck_ht_hash *h,
    struct ck_ht *table,
    const void *key,
    uint16_t key_length)
{

	table->h(h, key, key_length, table->seed);
	return;
}

void
ck_ht_hash_direct(struct ck_ht_hash *h,
    struct ck_ht *table,
    uintptr_t key)
{

	ck_ht_hash(h, table, &key, sizeof(key));
	return;
}

static void
ck_ht_hash_wrapper(struct ck_ht_hash *h,
    const void *key,
    size_t length,
    uint64_t seed)
{

	h->value = (unsigned long)MurmurHash64A(key, length, seed);
	return;
}
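
/*
 * Note: this wrapper is the default hash, used when ck_ht_init receives a
 * NULL hash callback. The cast above truncates the 64-bit MurmurHash64A
 * result to the width of unsigned long on ILP32 platforms.
 */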

static struct ck_ht_map *
ck_ht_map_create(struct ck_ht *table, CK_HT_TYPE entries)
{
	struct ck_ht_map *map;
	CK_HT_TYPE size;
	uintptr_t prefix;
	uint32_t n_entries;

	n_entries = ck_internal_power_2(entries);
	if (n_entries < CK_HT_BUCKET_LENGTH)
		n_entries = CK_HT_BUCKET_LENGTH;

	size = sizeof(struct ck_ht_map) +
		   (sizeof(struct ck_ht_entry) * n_entries + CK_MD_CACHELINE - 1);

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		prefix = sizeof(CK_HT_WORD) * n_entries;
		size += prefix;
	} else {
		prefix = 0;
	}

	map = table->m->malloc(size);
	if (map == NULL)
		return NULL;

	map->mode = table->mode;
	map->size = size;
	map->probe_limit = ck_internal_max_64(n_entries >>
	    (CK_HT_BUCKET_SHIFT + 2), CK_HT_PROBE_DEFAULT);

	map->deletions = 0;
	map->probe_maximum = 0;
	map->capacity = n_entries;
	map->step = ck_cc_ffsll(map->capacity);
	map->mask = map->capacity - 1;
	map->n_entries = 0;
	map->entries = (struct ck_ht_entry *)(((uintptr_t)&map[1] + prefix +
	    CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		map->probe_bound = (CK_HT_WORD *)&map[1];
		memset(map->probe_bound, 0, prefix);
	} else {
		map->probe_bound = NULL;
	}

	memset(map->entries, 0, sizeof(struct ck_ht_entry) * n_entries);
	ck_pr_fence_store();
	return map;
}
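
/*
 * The single allocation above is laid out as: the map header, followed by
 * the optional probe-bound array (CK_HT_WORKLOAD_DELETE only), followed
 * by the entry array rounded up to a cache-line boundary.
 */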

static inline void
ck_ht_map_bound_set(struct ck_ht_map *m,
    struct ck_ht_hash h,
    CK_HT_TYPE n_probes)
{
	CK_HT_TYPE offset = h.value & m->mask;

	if (n_probes > m->probe_maximum)
		CK_HT_TYPE_STORE(&m->probe_maximum, n_probes);

	if (m->probe_bound != NULL && m->probe_bound[offset] < n_probes) {
		if (n_probes >= CK_HT_WORD_MAX)
			n_probes = CK_HT_WORD_MAX;

		CK_HT_STORE(&m->probe_bound[offset], n_probes);
		ck_pr_fence_store();
	}

	return;
}

static inline CK_HT_TYPE
ck_ht_map_bound_get(struct ck_ht_map *m, struct ck_ht_hash h)
{
	CK_HT_TYPE offset = h.value & m->mask;
	CK_HT_TYPE r = CK_HT_WORD_MAX;

	if (m->probe_bound != NULL) {
		r = CK_HT_LOAD(&m->probe_bound[offset]);
		if (r == CK_HT_WORD_MAX)
			r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	} else {
		r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	}

	return r;
}

static void
ck_ht_map_destroy(struct ck_malloc *m, struct ck_ht_map *map, bool defer)
{

	m->free(map, map->size, defer);
	return;
}

static inline size_t
ck_ht_map_probe_next(struct ck_ht_map *map, size_t offset, ck_ht_hash_t h, size_t probes)
{
	ck_ht_hash_t r;
	size_t stride;
	unsigned long level = (unsigned long)probes >> CK_HT_BUCKET_SHIFT;

	r.value = (h.value >> map->step) >> level;
	stride = (r.value & ~CK_HT_BUCKET_MASK) << 1
		     | (r.value & CK_HT_BUCKET_MASK);

	return (offset + level +
	    (stride | CK_HT_BUCKET_LENGTH)) & map->mask;
}
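
/*
 * The probe sequence advances a full bucket (cache line) at a time. The
 * stride is derived from hash bits above the initial index; its
 * construction leaves the bucket-length bit clear, so OR-ing in
 * CK_HT_BUCKET_LENGTH guarantees forward progress of at least one bucket
 * per row.
 */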

bool
ck_ht_init(struct ck_ht *table,
    unsigned int mode,
    ck_ht_hash_cb_t *h,
    struct ck_malloc *m,
    CK_HT_TYPE entries,
    uint64_t seed)
{

	if (m == NULL || m->malloc == NULL || m->free == NULL)
		return false;

	table->m = m;
	table->mode = mode;
	table->seed = seed;

	if (h == NULL) {
		table->h = ck_ht_hash_wrapper;
	} else {
		table->h = h;
	}

	table->map = ck_ht_map_create(table, entries);
	return table->map != NULL;
}
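
/*
 * A minimal initialization sketch (not part of this file); ht_malloc and
 * ht_free are hypothetical caller-supplied wrappers. The realloc callback
 * may be omitted, as ck_ht_init only checks malloc and free:
 *
 *	static void *ht_malloc(size_t r) { return malloc(r); }
 *	static void ht_free(void *p, size_t b, bool r) { (void)b; (void)r; free(p); }
 *	static struct ck_malloc my_allocator = { .malloc = ht_malloc, .free = ht_free };
 *
 *	ck_ht_t ht;
 *	if (ck_ht_init(&ht, CK_HT_MODE_BYTESTRING, NULL, &my_allocator, 8, 0x9ece66d7) == false)
 *		abort();
 */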

static struct ck_ht_entry *
ck_ht_map_probe_wr(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    ck_ht_entry_t **available,
    const void *key,
    uint16_t key_length,
    CK_HT_TYPE *probe_limit,
    CK_HT_TYPE *probe_wr)
{
	struct ck_ht_entry *bucket, *cursor;
	struct ck_ht_entry *first = NULL;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE limit;

	if (probe_limit == NULL) {
		limit = ck_ht_map_bound_get(map, h);
	} else {
		limit = CK_HT_TYPE_MAX;
	}

	offset = h.value & map->mask;
	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > limit)
				break;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (cursor->key == CK_HT_KEY_TOMBSTONE) {
				if (first == NULL) {
					first = cursor;
					*probe_wr = probes;
				}

				continue;
			}

			if (cursor->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (cursor->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(cursor);
				if (k != key_length)
					continue;

#ifdef CK_HT_PP
				if ((cursor->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (cursor->hash != h.value)
					continue;
#endif

				pointer = ck_ht_entry_key(cursor);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	cursor = NULL;

leave:
	if (probe_limit != NULL) {
		*probe_limit = probes;
	} else if (first == NULL) {
		*probe_wr = probes;
	}

	*available = first;

	if (cursor != NULL) {
		*snapshot = *cursor;
	}

	return cursor;
}
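
/*
 * Contract: ck_ht_map_probe_wr returns the slot holding a matching key or
 * the first empty slot encountered, and NULL if the probe limit was
 * exhausted. *available receives the first tombstone seen (if any) so
 * that writers may re-use it.
 */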

bool
ck_ht_gc(struct ck_ht *ht, unsigned long cycles, unsigned long seed)
{
	CK_HT_WORD *bounds = NULL;
	struct ck_ht_map *map = ht->map;
	CK_HT_TYPE maximum, i;
	CK_HT_TYPE size = 0;

	if (map->n_entries == 0) {
		CK_HT_TYPE_STORE(&map->probe_maximum, 0);
		if (map->probe_bound != NULL)
			memset(map->probe_bound, 0, sizeof(CK_HT_WORD) * map->capacity);

		return true;
	}

	if (cycles == 0) {
		maximum = 0;

		if (map->probe_bound != NULL) {
			size = sizeof(CK_HT_WORD) * map->capacity;
			bounds = ht->m->malloc(size);
			if (bounds == NULL)
				return false;

			memset(bounds, 0, size);
		}
	} else {
		maximum = map->probe_maximum;
	}

	for (i = 0; i < map->capacity; i++) {
		struct ck_ht_entry *entry, *priority, snapshot;
		struct ck_ht_hash h;
		CK_HT_TYPE probes_wr;
		CK_HT_TYPE offset;

		entry = &map->entries[(i + seed) & map->mask];
		if (entry->key == CK_HT_KEY_EMPTY ||
		    entry->key == CK_HT_KEY_TOMBSTONE) {
			continue;
		}

		if (ht->mode & CK_HT_MODE_BYTESTRING) {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry),
			    ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    NULL, &probes_wr);
		} else {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, &entry->key, sizeof(entry->key), ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    NULL, &probes_wr);
		}

		offset = h.value & map->mask;

		if (priority != NULL) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
#ifndef CK_HT_PP
			CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
			CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif
			ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
			ck_pr_fence_store();
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&entry->key, (void *)CK_HT_KEY_TOMBSTONE);
			ck_pr_fence_store();
		}

		if (cycles == 0) {
			if (probes_wr > maximum)
				maximum = probes_wr;

			if (probes_wr >= CK_HT_WORD_MAX)
				probes_wr = CK_HT_WORD_MAX;

			if (bounds != NULL && probes_wr > bounds[offset])
				bounds[offset] = probes_wr;
		} else if (--cycles == 0)
			break;
	}

	if (maximum != map->probe_maximum)
		CK_HT_TYPE_STORE(&map->probe_maximum, maximum);

	if (bounds != NULL) {
		for (i = 0; i < map->capacity; i++)
			CK_HT_STORE(&map->probe_bound[i], bounds[i]);

		ht->m->free(bounds, size, false);
	}

	return true;
}
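
/*
 * ck_ht_gc compacts entries toward earlier positions in their probe
 * sequence by re-using tombstones. With cycles == 0 it performs a full
 * pass and recomputes probe_maximum (and the per-slot bounds, if
 * present); a non-zero cycles value bounds the number of entries
 * processed.
 */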

static struct ck_ht_entry *
ck_ht_map_probe_rd(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    const void *key,
    uint16_t key_length)
{
	struct ck_ht_entry *bucket, *cursor;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE probe_maximum;

#ifndef CK_HT_PP
	CK_HT_TYPE d = 0;
	CK_HT_TYPE d_prime = 0;
retry:
#endif

	probe_maximum = ck_ht_map_bound_get(map, h);
	offset = h.value & map->mask;

	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > probe_maximum)
				return NULL;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

#ifdef CK_HT_PP
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#else
			d = CK_HT_TYPE_LOAD(&map->deletions);
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->key_length = CK_HT_TYPE_LOAD(&cursor->key_length);
			snapshot->hash = CK_HT_TYPE_LOAD(&cursor->hash);
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#endif

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (snapshot->key == CK_HT_KEY_TOMBSTONE)
				continue;

			if (snapshot->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (snapshot->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(snapshot);
				if (k != key_length)
					continue;
#ifdef CK_HT_PP
				if ((snapshot->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (snapshot->hash != h.value)
					continue;

				d_prime = CK_HT_TYPE_LOAD(&map->deletions);

				/*
				 * It is possible that the slot was
				 * replaced; initiate a re-probe.
				 */
				if (d != d_prime)
					goto retry;
#endif

				pointer = ck_ht_entry_key(snapshot);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	return NULL;

leave:
	return cursor;
}
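
/*
 * Contract: ck_ht_map_probe_rd returns the slot whose snapshot matched
 * (or the empty slot that terminated the probe), with *snapshot holding a
 * consistent copy of it; NULL is returned once the probe bound is
 * exceeded.
 */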

CK_HT_TYPE
ck_ht_count(struct ck_ht *table)
{
	struct ck_ht_map *map = ck_pr_load_ptr(&table->map);

	return CK_HT_TYPE_LOAD(&map->n_entries);
}

bool
ck_ht_next(struct ck_ht *table,
    struct ck_ht_iterator *i,
    struct ck_ht_entry **entry)
{
	struct ck_ht_map *map = table->map;
	uintptr_t key;

	if (i->offset >= map->capacity)
		return false;

	do {
		key = map->entries[i->offset].key;
		if (key != CK_HT_KEY_EMPTY && key != CK_HT_KEY_TOMBSTONE)
			break;
	} while (++i->offset < map->capacity);

	if (i->offset >= map->capacity)
		return false;

	*entry = map->entries + i->offset++;
	return true;
}
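
/*
 * A minimal iteration sketch (single-writer context assumed; do_something
 * is a hypothetical callback):
 *
 *	ck_ht_iterator_t it = CK_HT_ITERATOR_INITIALIZER;
 *	ck_ht_entry_t *cursor;
 *
 *	while (ck_ht_next(&ht, &it, &cursor) == true)
 *		do_something(ck_ht_entry_key(cursor), ck_ht_entry_value(cursor));
 */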

bool
ck_ht_reset_size_spmc(struct ck_ht *table, CK_HT_TYPE size)
{
	struct ck_ht_map *map, *update;

	map = table->map;
	update = ck_ht_map_create(table, size);
	if (update == NULL)
		return false;

	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}

bool
ck_ht_reset_spmc(struct ck_ht *table)
{
	struct ck_ht_map *map = table->map;

	return ck_ht_reset_size_spmc(table, map->capacity);
}

bool
ck_ht_grow_spmc(struct ck_ht *table, CK_HT_TYPE capacity)
{
	struct ck_ht_map *map, *update;
	struct ck_ht_entry *bucket, *previous;
	struct ck_ht_hash h;
	size_t k, i, j, offset;
	CK_HT_TYPE probes;

restart:
	map = table->map;

	if (map->capacity >= capacity)
		return false;

	update = ck_ht_map_create(table, capacity);
	if (update == NULL)
		return false;

	for (k = 0; k < map->capacity; k++) {
		previous = &map->entries[k];

		if (previous->key == CK_HT_KEY_EMPTY || previous->key == CK_HT_KEY_TOMBSTONE)
			continue;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
#ifdef CK_HT_PP
			void *key;
			uint16_t key_length;

			key = ck_ht_entry_key(previous);
			key_length = ck_ht_entry_key_length(previous);
#endif

#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, key, key_length, table->seed);
#endif
		} else {
#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, &previous->key, sizeof(previous->key), table->seed);
#endif
		}

		offset = h.value & update->mask;
		probes = 0;

		for (i = 0; i < update->probe_limit; i++) {
			bucket = (void *)((uintptr_t)(update->entries + offset) & ~(CK_MD_CACHELINE - 1));

			for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
				struct ck_ht_entry *cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

				probes++;
				if (CK_CC_LIKELY(cursor->key == CK_HT_KEY_EMPTY)) {
					*cursor = *previous;
					update->n_entries++;
					ck_ht_map_bound_set(update, h, probes);
					break;
				}
			}

			if (j < CK_HT_BUCKET_LENGTH)
				break;

			offset = ck_ht_map_probe_next(update, offset, h, probes);
		}

		if (i == update->probe_limit) {
			/*
			 * We have hit the probe limit; the map needs to be even
			 * larger.
			 */
			ck_ht_map_destroy(table->m, update, false);
			capacity <<= 1;
			goto restart;
		}
	}

	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}
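
/*
 * Growth doubles the requested capacity until every existing entry can be
 * placed within the new map's probe limit. The old map is freed with the
 * defer flag set, so a safe-memory-reclamation-aware allocator can let
 * concurrent readers drain first.
 */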

bool
ck_ht_remove_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_map *map;
	struct ck_ht_entry *candidate, snapshot;

	map = table->map;

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry),
		    ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key,
		    sizeof(entry->key));
	}

	/* No matching entry was found. */
	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;

	ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
	ck_pr_fence_store();
	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries - 1);
	return true;
}

bool
ck_ht_get_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry *candidate, snapshot;
	struct ck_ht_map *map;
	CK_HT_TYPE d, d_prime;

restart:
	map = ck_pr_load_ptr(&table->map);

	/*
	 * Platforms that cannot read key and key_length atomically must reprobe
	 * on the scan of any single entry.
	 */
	d = CK_HT_TYPE_LOAD(&map->deletions);

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry), ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key, sizeof(entry->key));
	}

	d_prime = CK_HT_TYPE_LOAD(&map->deletions);
	if (d != d_prime) {
		/*
		 * It is possible we have read (K, V'). The only valid states
		 * are (K, V), (K', V') and (T, V). Restart the load operation
		 * in the face of concurrent deletions or replacements.
		 */
		goto restart;
	}

	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;
	return true;
}
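
/*
 * A minimal lookup sketch for a CK_HT_MODE_BYTESTRING table (ht and value
 * are assumed to exist in the caller):
 *
 *	ck_ht_entry_t entry;
 *	ck_ht_hash_t hv;
 *
 *	ck_ht_hash(&hv, &ht, "key", sizeof("key") - 1);
 *	ck_ht_entry_key_set(&entry, "key", sizeof("key") - 1);
 *	if (ck_ht_get_spmc(&ht, hv, &entry) == true)
 *		value = ck_ht_entry_value(&entry);
 */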

bool
ck_ht_set_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;
	bool empty = false;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (priority != NULL) {
			probes = probes_wr;
			break;
		}

		if (candidate != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (candidate == NULL) {
		candidate = priority;
		empty = true;
	}

	if (candidate->key != CK_HT_KEY_EMPTY &&
	    priority != NULL && candidate != priority) {
		/*
		 * The entry is being moved to another position in the probe
		 * sequence. We avoid a state of (K, B) (where [K, B] -> [K', B])
		 * by guaranteeing a forced reprobe before transitioning from K
		 * to T. (K, B) implies (K, B, D') so we will reprobe
		 * successfully from this transient state.
		 */
		probes = probes_wr;

#ifndef CK_HT_PP
		CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif

		/*
		 * Readers must observe the version counter change before they
		 * observe re-use. If they observe re-use, it is at most
		 * a tombstone.
		 */
		if (priority->value == CK_HT_KEY_TOMBSTONE) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
		}

		ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
		ck_pr_fence_store();

		/*
		 * Make sure that readers who observe the tombstone would
		 * also observe the counter change.
		 */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
		ck_pr_fence_store();
	} else {
		/*
		 * In this case we are inserting a new entry or replacing an
		 * existing one. Yes, this could be folded into the branch
		 * above, but it isn't because you are looking at dying code
		 * (ck_ht is effectively deprecated and is being replaced soon).
		 */
		bool replace = candidate->key != CK_HT_KEY_EMPTY &&
		    candidate->key != CK_HT_KEY_TOMBSTONE;

		if (priority != NULL) {
			if (priority->key == CK_HT_KEY_TOMBSTONE) {
				CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
				ck_pr_fence_store();
			}

			candidate = priority;
			probes = probes_wr;
		}

#ifdef CK_HT_PP
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
		CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

		/*
		 * If we are inserting a new entry, increment the number
		 * of entries associated with the map.
		 */
		if (replace == false)
			CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);
	}

	ck_ht_map_bound_set(map, h, probes);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	if (empty == true) {
		entry->key = CK_HT_KEY_EMPTY;
	} else {
		*entry = snapshot;
	}

	return true;
}
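
/*
 * On success, set behaves like an upsert: if a previous entry with the
 * same key existed, *entry receives its snapshot; otherwise entry->key is
 * set to CK_HT_KEY_EMPTY so the caller can distinguish an insert from a
 * replacement.
 */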

bool
ck_ht_put_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (candidate != NULL || priority != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (priority != NULL) {
		/* Version counter is updated before re-use. */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		/* Re-use tombstone if one was found. */
		candidate = priority;
		probes = probes_wr;
	} else if (candidate->key != CK_HT_KEY_EMPTY &&
	    candidate->key != CK_HT_KEY_TOMBSTONE) {
		/*
		 * If the candidate key is neither empty nor a tombstone, then
		 * an entry with an identical key already exists. As put does
		 * not implement replacement, we fail.
		 */
		return false;
	}

	ck_ht_map_bound_set(map, h, probes);

#ifdef CK_HT_PP
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
	CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
	CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	return true;
}

void
ck_ht_destroy(struct ck_ht *table)
{

	ck_ht_map_destroy(table->m, table->map, false);
	return;
}
1034