1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 1998-2018. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22 ** Implementation of unordered ETS tables.
23 ** The tables are implemented as linear dynamic hash tables.
24 ** https://en.wikipedia.org/wiki/Linear_hashing
25 */
26 
27 /* SMP:
28 ** The hash table supports two different locking "modes",
29 ** coarse grained and fine grained locking.
30 **
31 ** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
32 ** the right kind of lock on the entire table depending on operation (reading
33 ** or writing). No further locking is then done by the table itself.
34 **
35 ** Fine grained locking is supported by this code to allow concurrent updates
36 ** (and reading) to different parts of the table. This works by keeping one
37 ** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
38 ** rehashing buckets can be done without exclusive table lock.
39 **
40 ** A table will support fine grained locking if it is created with flag
41 ** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
42 ** if operations need to obtain fine grained locks or not. Some operations
43 ** will for example always use exclusive table lock to guarantee
44 ** a higher level of atomicity.
45 */
46 
47 /* FIXATION:
48 ** Fixating the table, by ets:safe_fixtable or as done by select-operations,
49 ** guarantees two things in current implementation.
** (1) Keys will not *totally* disappear from the table. A key can thus be used
51 **     as an iterator to find the next key in iteration sequence. Note however
52 **     that this does not mean that (pointers to) table objects are guaranteed
53 **     to be maintained while the table is fixated. A BAG or DBAG may actually
54 **     remove objects as long as there is at least one object left in the table
55 **     with the same key (alive or pseudo-deleted).
56 ** (2) Objects will not be moved between buckets due to table grow/shrink.
57 **     This will guarantee that iterations do not miss keys or get double-hits.
58 **
59 ** With fine grained locking, a concurrent thread can fixate the table at any
60 ** time. A "dangerous" operation (delete or move) therefore needs to check
61 ** if the table is fixated while write-locking the bucket.
62 */
63 
64 /*
65 #ifdef DEBUG
66 #define HARDDEBUG 1
67 #endif
68 */
69 
70 #ifdef HAVE_CONFIG_H
71 #  include "config.h"
72 #endif
73 
74 #include "sys.h"
75 #include "erl_vm.h"
76 #include "global.h"
77 #include "erl_process.h"
78 #include "error.h"
79 #define ERTS_WANT_DB_INTERNAL__
80 #include "erl_db.h"
81 #include "bif.h"
82 #include "big.h"
83 #include "export.h"
84 #include "erl_binary.h"
85 
86 #include "erl_db_hash.h"
87 
88 /*
89  * The following symbols can be manipulated to "tune" the linear hash array
90  */
91 #define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
92 #define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
93 
94 /*
95 ** We want the first mandatory segment to be small (to reduce minimal footprint)
96 ** and larger extra segments (to reduce number of alloc/free calls).
97 */
98 
99 /* Number of slots in first segment */
100 #define FIRST_SEGSZ_EXP  8
101 #define FIRST_SEGSZ      (1 << FIRST_SEGSZ_EXP)
102 #define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
103 
104 /* Number of slots per extra segment */
105 #define EXT_SEGSZ_EXP  11
106 #define EXT_SEGSZ   (1 << EXT_SEGSZ_EXP)
107 #define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
108 
109 #define NSEG_1   (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
110 #define NSEG_2   256 /* Size of second segment table */
111 #define NSEG_INC 128 /* Number of segments to grow after that */
112 
113 #  define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)
114 
115 #ifdef ETHR_ORDERED_READ_DEPEND
116 #define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))
117 #else
118 #define SEGTAB(tb)							\
119     (DB_USING_FINE_LOCKING(tb)						\
120      ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab))	\
121      : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)))
122 #endif
123 #define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive))
124 #define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
125 
126 #define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
127 
128 #define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
129 
130 /*
131  * When deleting a table, the number of records to delete.
132  * Approximate number, because we must delete entire buckets.
133  */
134 #define DELETE_RECORD_LIMIT 10000
135 
136 /* Calculate slot index from hash value.
137 ** RLOCK_HASH or WLOCK_HASH must be done before.
138 */
/* Map a hash value to a slot index in [0, nactive).
 *
 * Precondition: RLOCK_HASH or WLOCK_HASH (or the exclusive table lock)
 * must be held for 'hval' before calling (see comment above).
 * For fine-locked tables 'szm' is read with an acquire barrier, which
 * presumably pairs with the releasing update done by grow/shrink
 * (NOTE(review): confirm against grow()/shrink(), not visible here).
 */
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
    Uint mask = (DB_USING_FINE_LOCKING(tb)
		 ? erts_atomic_read_acqb(&tb->szm)
		 : erts_atomic_read_nob(&tb->szm));
    Uint ix = hval & mask;
    if (ix >= erts_atomic_read_nob(&tb->nactive)) {
        /* Slot not yet activated by grow; drop the top mask bit to fall
         * back to the "parent" slot (classic linear hashing). */
	ix &= mask>>1;
	ASSERT(ix < erts_atomic_read_nob(&tb->nactive));
    }
    return ix;
}
151 
152 
/* Allocate one FixedDeletion node and account for it in the
 * ETS misc memory statistics. */
static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb)
{
    FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
                                                         (DbTable *) tb,
                                                         sizeof(FixedDeletion));
    ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
    return fixd;
}
161 
/* Free a FixedDeletion node and undo its misc memory accounting. */
static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd)
{
    erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
                 fixd, sizeof(FixedDeletion));
    ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
}
168 
/* Push 'fixd' onto the lock free tb->fixdel stack.
 * Returns 1 on success, or 0 if a concurrent unfixer already dropped the
 * fixation count ('fixated_by_me' is presumably the number of fixations
 * owned by the caller itself -- TODO confirm against callers); in the
 * 0-case 'fixd' is freed and the caller must delete its object for real.
 */
static ERTS_INLINE int link_fixdel(DbTableHash* tb,
                                   FixedDeletion* fixd,
                                   erts_aint_t fixated_by_me)
{
    erts_aint_t was_next;
    erts_aint_t exp_next;

    was_next = erts_atomic_read_acqb(&tb->fixdel);
    do { /* Lockless atomic insertion in linked list: */
        /* Re-check fixation inside the retry loop: an unfixer may run
         * at any time between our read and the cmpxchg below. */
        if (NFIXED(tb) <= fixated_by_me) {
            free_fixdel(tb, fixd);
            return 0; /* raced by unfixer */
        }
        exp_next = was_next;
	fixd->next = (FixedDeletion*) exp_next;
	was_next = erts_atomic_cmpxchg_mb(&tb->fixdel,
                                              (erts_aint_t) fixd,
                                              exp_next);
    }while (was_next != exp_next);
    return 1;
}
190 
191 /* Remember a slot containing a pseudo-deleted item
192  * Return false if we got raced by unfixing thread
193  * and the object should be deleted for real.
194  */
static int add_fixed_deletion(DbTableHash* tb, int ix,
                              erts_aint_t fixated_by_me)
{
    FixedDeletion* fixd = alloc_fixdel(tb);
    fixd->slot = ix;
    fixd->all = 0;   /* only this slot; 'all' drives a full downward sweep
                      * in db_unfix_table_hash() */
    return link_fixdel(tb, fixd, fixated_by_me);
}
203 
204 
/* True if this object is logically deleted but kept in the chain because
 * the table is fixed (see FIXATION comment at the top of the file). */
static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p)
{
    return p->pseudo_deleted;
}
209 
210 
211 /* optimised version of make_hash (normal case? atomic key) */
212 #define MAKE_HASH(term) \
213     ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
214       make_internal_hash(term, 0)) & MAX_HASH_MASK)
215 
216 #  define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
217 #  define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
218 #  define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
219 
220 /* Fine grained read lock */
RLOCK_HASH(DbTableHash * tb,HashValue hval)221 static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
222 {
223     if (tb->common.is_thread_safe) {
224 	return NULL;
225     } else {
226 	erts_rwmtx_t* lck = GET_LOCK(tb,hval);
227 	ASSERT(tb->common.type & DB_FINE_LOCKED);
228 	erts_rwmtx_rlock(lck);
229 	return lck;
230     }
231 }
232 /* Fine grained write lock */
WLOCK_HASH(DbTableHash * tb,HashValue hval)233 static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
234 {
235     if (tb->common.is_thread_safe) {
236 	return NULL;
237     } else {
238 	erts_rwmtx_t* lck = GET_LOCK(tb,hval);
239 	ASSERT(tb->common.type & DB_FINE_LOCKED);
240 	erts_rwmtx_rwlock(lck);
241 	return lck;
242     }
243 }
244 
RUNLOCK_HASH(erts_rwmtx_t * lck)245 static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck)
246 {
247     if (lck != NULL) {
248 	erts_rwmtx_runlock(lck);
249     }
250 }
251 
WUNLOCK_HASH(erts_rwmtx_t * lck)252 static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck)
253 {
254     if (lck != NULL) {
255 	erts_rwmtx_rwunlock(lck);
256     }
257 }
258 
259 
260 #ifdef ERTS_ENABLE_LOCK_CHECK
261 #  define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
262 #  define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
263 #  define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck))
264 #  define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
265 #else
266 #  define IS_HASH_RLOCKED(tb,hval) (1)
267 #  define IS_HASH_WLOCKED(tb,hval) (1)
268 #  define IS_TAB_WLOCKED(tb) (1)
269 #endif
270 
271 
272 /* Iteration helper
273 ** Returns "next" slot index or 0 if EOT reached.
274 ** Slot READ locks updated accordingly, unlocked if EOT.
275 */
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
				  erts_rwmtx_t** lck_ptr)
{
    /* Slots i, i+N, i+2N, ... (N = DB_HASH_LOCK_CNT) all map to the same
     * rwlock (see GET_LOCK), so first walk the current lock's stripe
     * without relocking. */
    ix += DB_HASH_LOCK_CNT;
    if (ix < NACTIVE(tb)) return ix;
    RUNLOCK_HASH(*lck_ptr);
    /* Stripe exhausted: advance to the next stripe's first slot.
     * Wraps to 0 when all stripes are done == end of table. */
    ix = (ix + 1) & DB_HASH_LOCK_MASK;
    if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix);
    return ix;
}
286 /* Same as next_slot but with WRITE locking */
/* Same as next_slot but with WRITE locking */
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
				    erts_rwmtx_t** lck_ptr)
{
    /* See next_slot() for the lock-stripe iteration scheme. */
    ix += DB_HASH_LOCK_CNT;
    if (ix < NACTIVE(tb)) return ix;
    WUNLOCK_HASH(*lck_ptr);
    ix = (ix + 1) & DB_HASH_LOCK_MASK;
    if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix);
    return ix;
}
297 
298 
299 
/* Free one table object; the offset tells db_free_term where the embedded
 * DbTerm lives inside the HashDbTerm wrapper. */
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
    db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm));
}
304 
free_term_list(DbTableHash * tb,HashDbTerm * p)305 static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p)
306 {
307     while (p) {
308         HashDbTerm* next = p->next;
309         free_term(tb, p);
310         p = next;
311     }
312 }
313 
314 
315 /*
316  * Local types
317  */
/* A bucket located in advance by analyze_pattern() when the match spec
 * binds the key (see mp_info.key_given). */
struct mp_prefound {
    HashDbTerm** bucket;            /* Head of the bucket chain to search */
    int ix;                         /* Slot index of that bucket */
};

/* Output of analyze_pattern(): drives the select/count/delete/replace
 * traversals. */
struct mp_info {
    int something_can_match;	/* The match_spec is not "impossible" */
    int key_given;
    struct mp_prefound dlists[10];  /* Default list of "pre-found" buckets */
    struct mp_prefound* lists;   /* Buckets to search if keys are given,
				  * = dlists initially */
    unsigned num_lists;         /* Number of elements in "lists",
				 * = 0 initially */
    Binary *mp;                 /* The compiled match program */
};
333 
334 /* A table segment */
/* A table segment. Declared with one bucket; the real number of buckets
 * is chosen at allocation time via SIZEOF_SEGMENT(N) below. */
struct segment {
    HashDbTerm* buckets[1];
};
338 #define SIZEOF_SEGMENT(N) \
339     (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
340 
341 /* An extended segment table */
/* An extended segment table */
struct ext_segtab {
    ErtsThrPrgrLaterOp lop;        /* presumably for deferred deallocation
                                    * via thread progress -- confirm against
                                    * alloc_ext_segtab()/free code */
    struct segment** prev_segtab;  /* Used when table is shrinking */
    int prev_nsegs;                /* Size of prev_segtab */
    int nsegs;                     /* Size of this segtab */
    struct segment* segtab[1];     /* The segment table (real size = nsegs) */
};
349 #define SIZEOF_EXT_SEGTAB(NSEGS) \
350     (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
351 
352 
/* Publish a new segment table pointer. For fine-locked tables the write
 * barrier pairs with the _ddrb read in the SEGTAB() macro so readers see
 * a fully initialized table. */
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
				   struct segment** segtab)
{
    if (DB_USING_FINE_LOCKING(tb))
	erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
    else
	erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
}
361 
362 /* Used by select_replace on analyze_pattern */
363 typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
364 
365 /*
366 ** Forward decl's (static functions)
367 */
368 static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
369 static void alloc_seg(DbTableHash *tb);
370 static int free_seg(DbTableHash *tb, int free_records);
371 static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
372 			     HashDbTerm *list);
373 static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
374 			       HashValue hval, HashDbTerm *list);
375 static void shrink(DbTableHash* tb, int nitems);
376 static void grow(DbTableHash* tb, int nitems);
377 static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
378 			   Uint sz, DbTableHash*);
379 static int analyze_pattern(DbTableHash *tb, Eterm pattern,
380                            extra_match_validator_t extra_validator, /* Optional callback */
381                            struct mp_info *mpi);
382 
383 /*
384  *  Method interface functions
385  */
386 static int db_first_hash(Process *p,
387 			 DbTable *tbl,
388 			 Eterm *ret);
389 
390 static int db_next_hash(Process *p,
391 			DbTable *tbl,
392 			Eterm key,
393 			Eterm *ret);
394 
395 static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);
396 
397 static int db_get_element_hash(Process *p, DbTable *tbl,
398 			       Eterm key, int ndex, Eterm *ret);
399 
400 static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
401 
402 static int db_slot_hash(Process *p, DbTable *tbl,
403 			Eterm slot_term, Eterm *ret);
404 
405 static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
406 				Eterm pattern, Sint chunk_size,
407 				int reverse, Eterm *ret);
408 static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
409 			  Eterm pattern, int reverse, Eterm *ret);
410 static int db_select_continue_hash(Process *p, DbTable *tbl,
411 				   Eterm continuation, Eterm *ret);
412 
413 static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
414 				Eterm pattern, Eterm *ret);
415 static int db_select_count_continue_hash(Process *p, DbTable *tbl,
416 					 Eterm continuation, Eterm *ret);
417 
418 static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
419 				 Eterm pattern, Eterm *ret);
420 static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
421 					  Eterm continuation, Eterm *ret);
422 
423 static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
424                                   Eterm pattern, Eterm *ret);
425 static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
426                                            Eterm continuation, Eterm *ret);
427 
428 static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
429 static void db_print_hash(fmtfn_t to,
430 			  void *to_arg,
431 			  int show,
432 			  DbTable *tbl);
433 static int db_free_empty_table_hash(DbTable *tbl);
434 
435 static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
436 
437 
438 static void db_foreach_offheap_hash(DbTable *,
439 				    void (*)(ErlOffHeap *, void *),
440 				    void *);
441 
442 static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
443 #ifdef HARDDEBUG
444 static void db_check_table_hash(DbTableHash *tb);
445 #endif
446 static int
447 db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
448                       DbUpdateHandle* handle);
449 static void
450 db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
451 
try_shrink(DbTableHash * tb)452 static ERTS_INLINE void try_shrink(DbTableHash* tb)
453 {
454     int nactive = NACTIVE(tb);
455     int nitems = NITEMS(tb);
456     if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
457 	&& !IS_FIXED(tb)) {
458 	shrink(tb, nitems);
459     }
460 }
461 
462 /* Is this a live object (not pseudo-deleted) with the specified key?
463 */
has_live_key(DbTableHash * tb,HashDbTerm * b,Eterm key,HashValue hval)464 static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
465 				    Eterm key, HashValue hval)
466 {
467     if (b->hvalue != hval || is_pseudo_deleted(b))
468         return 0;
469     else {
470 	Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
471 	ASSERT(!is_header(itemKey));
472 	return EQ(key, itemKey);
473     }
474 }
475 
476 /* Has this object the specified key? Can be pseudo-deleted.
477 */
has_key(DbTableHash * tb,HashDbTerm * b,Eterm key,HashValue hval)478 static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
479 			       Eterm key, HashValue hval)
480 {
481     if (b->hvalue != hval)
482         return 0;
483     else {
484 	Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
485 	ASSERT(!is_header(itemKey));
486 	return EQ(key, itemKey);
487     }
488 }
489 
new_dbterm(DbTableHash * tb,Eterm obj)490 static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
491 {
492     HashDbTerm* p;
493     if (tb->common.compress) {
494 	p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
495     }
496     else {
497 	p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
498     }
499     return p;
500 }
501 
replace_dbterm(DbTableHash * tb,HashDbTerm * old,Eterm obj)502 static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
503 					      Eterm obj)
504 {
505     HashDbTerm* ret;
506     ASSERT(old != NULL);
507     if (tb->common.compress) {
508 	ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
509     }
510     else {
511 	ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
512     }
513     return ret;
514 }
515 
516 
517 
518 /*
519 ** External interface
520 */
/* Method table binding the generic DbTableMethod interface to the hash
 * implementations in this file. Since the table is unordered,
 * last==first and prev==next. */
DbTableMethod db_hash =
{
    db_create_hash,
    db_first_hash,
    db_next_hash,
    db_first_hash,   /* last == first  */
    db_next_hash,    /* prev == next   */
    db_put_hash,
    db_get_hash,
    db_get_element_hash,
    db_member_hash,
    db_erase_hash,
    db_erase_object_hash,
    db_slot_hash,
    db_select_chunk_hash,
    db_select_hash,
    db_select_delete_hash,
    db_select_continue_hash, /* hmm continue_hash? */
    db_select_delete_continue_hash,
    db_select_count_hash,
    db_select_count_continue_hash,
    db_select_replace_hash,
    db_select_replace_continue_hash,
    db_take_hash,
    db_delete_all_objects_hash,
    db_free_empty_table_hash,
    db_free_table_continue_hash,
    db_print_hash,
    db_foreach_offheap_hash,
    db_lookup_dbterm_hash,
    db_finalize_dbterm_hash
};
553 
554 #ifdef DEBUG
555 /* Wait a while to provoke race and get code coverage */
static void DEBUG_WAIT(void)
{
    unsigned long spin = 1UL << 20;  /* ~1M iterations of pure spinning */
    while (--spin);
}
561 #else
562 #  define DEBUG_WAIT()
563 #endif
564 
565 /* Rare case of restoring the rest of the fixdel list
566    when "unfixer" gets interrupted by "fixer" */
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
    /*int tries = 0;*/
    DEBUG_WAIT();
    /* Fast path: tb->fixdel is still NULL (we emptied it in
     * db_unfix_table_hash), so just put our remainder back. */
    if (erts_atomic_cmpxchg_relb(&tb->fixdel,
				     (erts_aint_t) fixdel,
				     (erts_aint_t) NULL) != (erts_aint_t) NULL) {
	/* Oboy, must join lists */
	FixedDeletion* last = fixdel;
	erts_aint_t was_tail;
	erts_aint_t exp_tail;

	/* Append the current fixdel list after our tail, then try to
	 * install our head as the new list head. Retry if yet another
	 * thread pushed in between. */
	while (last->next != NULL) last = last->next;
	was_tail = erts_atomic_read_acqb(&tb->fixdel);
	do { /* Lockless atomic list insertion */
	    exp_tail = was_tail;
	    last->next = (FixedDeletion*) exp_tail;
	    /*++tries;*/
	    DEBUG_WAIT();
	    was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel,
						    (erts_aint_t) fixdel,
						    exp_tail);
	}while (was_tail != exp_tail);
    }
    /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
593 /*
594 ** Table interface routines ie what's called by the bif's
595 */
596 
/* Delete for real all objects that were pseudo-deleted while the table
 * was fixed, as recorded on the tb->fixdel list.
 * Returns the amount of work done (for reduction accounting).
 * Caller must hold the table rwlock; read-locked is enough for
 * fine-grained tables (see the LC assertion).
 */
SWord db_unfix_table_hash(DbTableHash *tb)
{
    FixedDeletion* fixdel;
    SWord work = 0;

    ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
                   || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock)
                       && !tb->common.is_thread_safe));
restart:
    /* Atomically steal the whole fixdel list. */
    fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel,
                                                  (erts_aint_t) NULL);
    while (fixdel) {
        FixedDeletion *free_me;

        /* A node with 'all' set represents a sweep over every slot from
         * 'slot' down to 0; otherwise it names a single slot. */
        do {
            HashDbTerm **bp;
            HashDbTerm *b;
            HashDbTerm *free_us = NULL;
            erts_rwmtx_t* lck;

            lck = WLOCK_HASH(tb, fixdel->slot);

            if (IS_FIXED(tb)) { /* interrupted by fixer */
                WUNLOCK_HASH(lck);
                /* Give back the part of the list we have not yet processed
                 * so the new fixation keeps its guarantees. */
                restore_fixdel(tb,fixdel);
                if (!IS_FIXED(tb)) {
                    goto restart; /* unfixed again! */
                }
                return work;
            }
            if (fixdel->slot < NACTIVE(tb)) {
                bp = &BUCKET(tb, fixdel->slot);
                b = *bp;

                /* Unlink all pseudo-deleted terms in this bucket; collect
                 * them on free_us and free after dropping the lock. */
                while (b != NULL) {
                    if (is_pseudo_deleted(b)) {
                        HashDbTerm* nxt = b->next;
                        b->next = free_us;
                        free_us = b;
                        work++;
                        b = *bp = nxt;
                    } else {
                        bp = &b->next;
                        b = b->next;
                    }
                }
            }
            /* else slot has been joined and purged by shrink() */
            WUNLOCK_HASH(lck);
            free_term_list(tb, free_us);

        }while (fixdel->all && fixdel->slot-- > 0);

        free_me = fixdel;
        fixdel = fixdel->next;
        free_fixdel(tb, free_me);
        work++;
    }

    /* ToDo: Maybe try grow/shrink the table as well */

    return work;
}
660 
/* Create a hash table: initialize counters, the first (small) segment,
 * and -- for fine-locked tables -- the lock vector.
 * Always returns DB_ERROR_NONE.
 */
int db_create_hash(Process *p, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;

    erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
    erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
    erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
    erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
    SET_SEGTAB(tb, tb->first_segtab);
    tb->nsegs = NSEG_1;
    tb->nslots = FIRST_SEGSZ;
    /* All buckets start empty (NULL chains). */
    tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                                          (DbTable *) tb,
                                                          SIZEOF_SEGMENT(FIRST_SEGSZ));
    sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));

    erts_atomic_init_nob(&tb->is_resizing, 0);
    if (tb->common.type & DB_FINE_LOCKED) {
	erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
	int i;
	if (tb->common.type & DB_FREQ_READ)
	    rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
	if (erts_ets_rwmtx_spin_count >= 0)
	    rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
	tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
                                                          (DbTable *) tb,
                                                          sizeof(DbTableHashFineLocks));
	for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
            erts_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
                "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
	}
	/* This important property is needed to guarantee the two buckets
    	 * involved in a grow/shrink operation is protected by the same lock:
	 */
	ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
    }
    else { /* coarse locking */
	tb->locks = NULL;
    }
    ERTS_THR_MEMORY_BARRIER;
    return DB_ERROR_NONE;
}
703 
/* ets:first/1 - copy the first live key to the caller's heap,
 * or return '$end_of_table' if the table is empty.
 */
static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Uint ix = 0;
    erts_rwmtx_t* lck = RLOCK_HASH(tb,ix);
    HashDbTerm* list;

    /* next_live() scans forward from slot 0, updating ix/lck, and
     * unlocks when end of table is reached. */
    list = BUCKET(tb,ix);
    list = next_live(tb, &ix, &lck, list);

    if (list != NULL) {
	*ret = db_copy_key(p, tbl, &list->dbterm);
	RUNLOCK_HASH(lck);
    }
    else {
	*ret = am_EOT;
    }
    return DB_ERROR_NONE;
}
723 
724 
/* ets:next/2 - find the key following 'key' in iteration order.
 * Uses has_key() (not has_live_key) so a pseudo-deleted key still works
 * as an iterator, per FIXATION guarantee (1) at the top of the file.
 * Returns DB_ERROR_BADKEY if 'key' is not in the table at all.
 */
static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    Uint ix;
    HashDbTerm* b;
    erts_rwmtx_t* lck;

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    b = BUCKET(tb, ix);

    for (;;) {
	if (b == NULL) {
	    RUNLOCK_HASH(lck);
	    return DB_ERROR_BADKEY;
	}
	if (has_key(tb, b, key, hval)) {
	    break;
	}
	b = b->next;
    }
    /* Key found */

    b = next_live(tb, &ix, &lck, b->next);
    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
        /* Bags chain equal keys together; skip past all remaining
         * objects with the same key. */
	while (b != 0) {
	    if (!has_live_key(tb, b, key, hval)) {
		break;
	    }
	    b = next_live(tb, &ix, &lck, b->next);
	}
    }
    if (b == NULL) {
	*ret = am_EOT;
    }
    else {
	*ret = db_copy_key(p, tbl, &b->dbterm);
	RUNLOCK_HASH(lck);
    }
    return DB_ERROR_NONE;
}
768 
/* ets:insert/2 (key_clash_fail == 0) and ets:insert_new/2 (!= 0).
 * SET: replace the existing object in place.
 * BAG: keep at most one copy of an equal object (reviving a pseudo-deleted
 *      copy if present), otherwise prepend.
 * DUPLICATE_BAG: always prepend a new copy.
 * May grow the table after the bucket lock is released.
 */
int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    Eterm key;
    HashDbTerm** bp;
    HashDbTerm* b;
    HashDbTerm* q;
    erts_rwmtx_t* lck;
    int nitems;
    int ret = DB_ERROR_NONE;

    key = GETKEY(tb, tuple_val(obj));
    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    /* Find first (possibly pseudo-deleted) object with the same key. */
    for (;;) {
	if (b == NULL) {
	    goto Lnew;
	}
	if (has_key(tb,b,key,hval)) {
	    break;
	}
	bp = &b->next;
	b = b->next;
    }
    /* Key found
    */
    if (tb->common.status & DB_SET) {
	HashDbTerm* bnext = b->next;
	if (is_pseudo_deleted(b)) {
            /* Reviving a pseudo-deleted object counts as an insertion. */
	    erts_atomic_inc_nob(&tb->common.nitems);
            b->pseudo_deleted = 0;
	}
	else if (key_clash_fail) {
	    ret = DB_ERROR_BADKEY;
	    goto Ldone;
	}
	q = replace_dbterm(tb, b, obj);
	q->next = bnext;
	ASSERT(q->hvalue == hval);
	*bp = q;
	goto Ldone;
    }
    else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
        /* insert_new: fail if any live object has this key. */
	q = b;
	do {
	    if (!is_pseudo_deleted(q)) {
		ret = DB_ERROR_BADKEY;
		goto Ldone;
	    }
	    q = q->next;
	}while (q != NULL && has_key(tb,q,key,hval));
    }
    else if (tb->common.status & DB_BAG) {
        /* Look for an equal object among the key's chain. */
	HashDbTerm** qp = bp;
	q = b;
	do {
	    if (db_eq(&tb->common,obj,&q->dbterm)) {
		if (is_pseudo_deleted(q)) {
		    erts_atomic_inc_nob(&tb->common.nitems);
                    q->pseudo_deleted = 0;
		    ASSERT(q->hvalue == hval);
		    if (q != b) { /* must move to preserve key insertion order */
			*qp = q->next;
			q->next = b;
			*bp = q;
		    }
		}
		goto Ldone;
	    }
	    qp = &q->next;
	    q = *qp;
	}while (q != NULL && has_key(tb,q,key,hval));
    }
    /*else DB_DUPLICATE_BAG */

Lnew:
    /* Prepend a fresh copy of the object to the chain position bp. */
    q = new_dbterm(tb, obj);
    q->hvalue = hval;
    q->pseudo_deleted = 0;
    q->next = b;
    *bp = q;
    nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
    WUNLOCK_HASH(lck);
    {
        /* Grow outside the bucket lock; forbidden while fixed (objects
         * may not move between buckets, see FIXATION comment). */
	int nactive = NACTIVE(tb);
	if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
	    grow(tb, nitems);
	}
    }
    return DB_ERROR_NONE;

Ldone:
    WUNLOCK_HASH(lck);
    return ret;
}
870 
871 static Eterm
/* Build a list (on p's heap) of all live objects with 'key', starting at
 * chain position b1 (which the caller has verified is live).
 * 'sz' pre-computes the heap need: dbterm.size words per live object
 * plus 2 for each cons cell; build_term_list presumably skips the
 * pseudo-deleted terms in the [b1,b2) range -- TODO confirm.
 * If 'bend' is non-NULL it receives the first term after the key's chain.
 */
static Eterm
get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
              HashDbTerm *b1, HashDbTerm **bend)
{
    HashDbTerm* b2 = b1->next;
    Eterm copy;
    Uint sz = b1->dbterm.size + 2;

    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
        /* Bags: equal keys are adjacent in the chain. */
        while (b2 && has_key(tb, b2, key, hval)) {
	    if (!is_pseudo_deleted(b2))
		sz += b2->dbterm.size + 2;

            b2 = b2->next;
        }
    }
    copy = build_term_list(p, b1, b2, sz, tb);
    if (bend) {
        *bend = b2;
    }
    return copy;
}
893 
/* ets:lookup/2 - copy all live objects with 'key' to the caller's heap
 * as a list; NIL when the key is absent. Never fails.
 */
int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b;
    erts_rwmtx_t* lck;

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    b = BUCKET(tb, ix);

    while(b != 0) {
        if (has_live_key(tb, b, key, hval)) {
            *ret = get_term_list(p, tb, key, hval, b, NULL);
	    goto done;
	}
        b = b->next;
    }
    *ret = NIL;
done:
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}
919 
/* ets:member/2 - is there a live object with 'key'?
 *
 * Fix: take the bucket read lock BEFORE mapping the hash value to a slot
 * index. hash_to_ix() requires "RLOCK_HASH or WLOCK_HASH must be done
 * before" (see its comment); computing 'ix' outside the lock can race
 * with a concurrent grow/shrink so that the locked bucket no longer
 * matches 'ix'. The other lookups in this file (db_get_hash,
 * db_next_hash, db_get_element_hash) already lock first.
 */
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_rwmtx_t* lck;

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);   /* lock first ...               */
    ix = hash_to_ix(tb, hval);    /* ... then map hash to a slot  */
    b1 = BUCKET(tb, ix);

    /* Scan the collision chain for a live (not pseudo-deleted) match. */
    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    *ret = am_true;
	    goto done;
	}
	b1 = b1->next;
    }
    *ret = am_false;
done:
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}
945 
/*
** Implements ets:lookup_element/3. Sets *ret to the 'ndex':th element of
** the tuple stored under 'key'; for bag/duplicate_bag a list of elements
** from every live term with the key is built. Returns DB_ERROR_BADITEM if
** a matching live term has fewer than 'ndex' elements, DB_ERROR_BADKEY if
** the key is absent, DB_ERROR_NONE on success.
*/
static int db_get_element_hash(Process *p, DbTable *tbl,
			       Eterm key,
			       int ndex,
			       Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_rwmtx_t* lck;
    int retval;

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    b1 = BUCKET(tb, ix);


    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    if (ndex > arityval(b1->dbterm.tpl[0])) {
		retval = DB_ERROR_BADITEM;
		goto done;
	    }
	    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
		HashDbTerm* b;
		HashDbTerm* b2 = b1->next;
		Eterm elem_list = NIL;

		/* First pass: verify arity of every live same-key term
		 * before copying anything, so we never build a partial
		 * result and then fail. */
		while(b2 != NULL && has_key(tb,b2,key,hval)) {
		    if (ndex > arityval(b2->dbterm.tpl[0])
			&& !is_pseudo_deleted(b2)) {
			retval = DB_ERROR_BADITEM;
			goto done;
		    }
		    b2 = b2->next;
		}
		/* Second pass: copy the requested element from each live
		 * term in the run [b1, b2). */
		b = b1;
		while(b != b2) {
		    if (!is_pseudo_deleted(b)) {
			Eterm *hp;
			Eterm copy = db_copy_element_from_ets(&tb->common, p,
							      &b->dbterm, ndex, &hp, 2);
			elem_list = CONS(hp, copy, elem_list);
		    }
		    b = b->next;
		}
		*ret = elem_list;
	    }
	    else {
		Eterm* hp;
		*ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
	    }
	    retval = DB_ERROR_NONE;
	    goto done;
	}
	b1 = b1->next;
    }
    retval = DB_ERROR_BADKEY;
done:
    RUNLOCK_HASH(lck);
    return retval;
}
1009 
1010 /*
1011 ** NB, this is for the db_erase/2 bif.
1012 */
db_erase_hash(DbTable * tbl,Eterm key,Eterm * ret)1013 int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
1014 {
1015     DbTableHash *tb = &tbl->hash;
1016     HashValue hval;
1017     int ix;
1018     HashDbTerm** bp;
1019     HashDbTerm* b;
1020     HashDbTerm* free_us = NULL;
1021     erts_rwmtx_t* lck;
1022     int nitems_diff = 0;
1023 
1024     hval = MAKE_HASH(key);
1025     lck = WLOCK_HASH(tb,hval);
1026     ix = hash_to_ix(tb, hval);
1027     bp = &BUCKET(tb, ix);
1028     b = *bp;
1029 
1030     while(b != 0) {
1031 	if (has_live_key(tb,b,key,hval)) {
1032 	    --nitems_diff;
1033 	    if (nitems_diff == -1 && IS_FIXED(tb)
1034                 && add_fixed_deletion(tb, ix, 0)) {
1035 		/* Pseudo remove (no need to keep several of same key) */
1036 		b->pseudo_deleted = 1;
1037 	    } else {
1038 		HashDbTerm* next = b->next;
1039                 b->next = free_us;
1040 		free_us = b;
1041 		b = *bp = next;
1042 		continue;
1043 	    }
1044 	}
1045 	else {
1046 	    if (nitems_diff && !is_pseudo_deleted(b))
1047 		break;
1048 	}
1049 	bp = &b->next;
1050 	b = b->next;
1051     }
1052     WUNLOCK_HASH(lck);
1053     if (nitems_diff) {
1054 	erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
1055 	try_shrink(tb);
1056     }
1057     free_term_list(tb, free_us);
1058     *ret = am_true;
1059     return DB_ERROR_NONE;
1060 }
1061 
1062 /*
1063 ** This is for the ets:delete_object BIF
1064 */
db_erase_object_hash(DbTable * tbl,Eterm object,Eterm * ret)1065 static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
1066 {
1067     DbTableHash *tb = &tbl->hash;
1068     HashValue hval;
1069     int ix;
1070     HashDbTerm** bp;
1071     HashDbTerm* b;
1072     HashDbTerm* free_us = NULL;
1073     erts_rwmtx_t* lck;
1074     int nitems_diff = 0;
1075     int nkeys = 0;
1076     Eterm key;
1077 
1078     key = GETKEY(tb, tuple_val(object));
1079     hval = MAKE_HASH(key);
1080     lck = WLOCK_HASH(tb,hval);
1081     ix = hash_to_ix(tb, hval);
1082     bp = &BUCKET(tb, ix);
1083     b = *bp;
1084 
1085     while(b != 0) {
1086 	if (has_live_key(tb,b,key,hval)) {
1087 	    ++nkeys;
1088 	    if (db_eq(&tb->common,object, &b->dbterm)) {
1089 		--nitems_diff;
1090 		if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
1091 		    b->pseudo_deleted = 1;
1092 		    bp = &b->next;
1093 		    b = b->next;
1094 		} else {
1095                     HashDbTerm* next = b->next;
1096 		    b->next = free_us;
1097                     free_us = b;
1098 		    b = *bp = next;
1099 		}
1100 		if (tb->common.status & (DB_DUPLICATE_BAG)) {
1101 		    continue;
1102 		} else {
1103 		    break;
1104 		}
1105 	    }
1106 	}
1107 	else if (nitems_diff && !is_pseudo_deleted(b)) {
1108 	    break;
1109 	}
1110 	bp = &b->next;
1111 	b = b->next;
1112     }
1113     WUNLOCK_HASH(lck);
1114     if (nitems_diff) {
1115 	erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
1116 	try_shrink(tb);
1117     }
1118     free_term_list(tb, free_us);
1119     *ret = am_true;
1120     return DB_ERROR_NONE;
1121 }
1122 
1123 
/*
** Implements ets:slot/2. Sets *ret to the list of terms in bucket
** 'slot_term', or to '$end_of_table' when the slot equals the number of
** active buckets. Returns DB_ERROR_BADPARAM for a non-integer, negative
** or out-of-range slot.
*/
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    erts_rwmtx_t* lck;
    Sint slot;
    int nactive;
    int retval = DB_ERROR_BADPARAM;

    if (is_not_small(slot_term))
        return DB_ERROR_BADPARAM;
    slot = signed_val(slot_term);
    if (slot < 0)
        return DB_ERROR_BADPARAM;

    lck = RLOCK_HASH(tb, slot);
    nactive = NACTIVE(tb);
    if (slot == nactive) {
        /* One past the last active bucket signals end of table. */
        *ret = am_EOT;
        retval = DB_ERROR_NONE;
    }
    else if (slot < nactive) {
        *ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb);
        retval = DB_ERROR_NONE;
    }
    RUNLOCK_HASH(lck);
    return retval;
}
1151 
1152 
1153 /*
1154  * Match traversal callbacks
1155  */
1156 
typedef struct match_callbacks_t_ match_callbacks_t;
struct match_callbacks_t_
{
/* Called when no match is possible.
 *      ctx: Pointer to context (the struct embedding this callback table)
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 */
    int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret);

/* Called for each match result.
 *      ctx: Pointer to context
 *      slot_ix: Current slot index
 *      current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
 *                       can be (carefully) used to adjust iteration when deleting or replacing elements.
 *      match_res: The result of running the match program against the current term.
 *
 * Should return 1 for successful match, 0 otherwise.
 */
    int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix,
                        HashDbTerm*** current_ptr_ptr, Eterm match_res);

/* Called when either we've matched enough elements in this cycle or EOT was reached.
 *      ctx: Pointer to context
 *      slot_ix: Current slot index
 *      got: How many elements have been matched so far
 *      iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
 *      mpp: Double pointer to the compiled match program
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
 */
    int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got,
                         Sint iterations_left, Binary** mpp, Eterm* ret);

/* Called when it's time to trap
 *      ctx: Pointer to context
 *      slot_ix: Current slot index
 *      got: How many elements have been matched so far
 *      mpp: Double pointer to the compiled match program
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
 */
    int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp,
                   Eterm* ret);

};
1208 
1209 
1210 /*
1211  * Begin hash table match traversal
1212  */
match_traverse(Process * p,DbTableHash * tb,Eterm pattern,extra_match_validator_t extra_match_validator,Sint chunk_size,Sint iterations_left,Eterm ** hpp,int lock_for_write,match_callbacks_t * ctx,Eterm * ret)1213 static int match_traverse(Process* p, DbTableHash* tb,
1214                           Eterm pattern,
1215                           extra_match_validator_t extra_match_validator, /* Optional */
1216                           Sint chunk_size,      /* If 0, no chunking */
1217                           Sint iterations_left, /* Nr. of iterations left */
1218                           Eterm** hpp,          /* Heap */
1219                           int lock_for_write,   /* Set to 1 if we're going to delete or
1220                                                    modify existing terms */
1221                           match_callbacks_t* ctx,
1222                           Eterm* ret)
1223 {
1224     Sint slot_ix;                  /* Slot index */
1225     HashDbTerm** current_ptr;      /* Refers to either the bucket pointer or
1226                                     * the 'next' pointer in the previous term
1227                                     */
1228     HashDbTerm* saved_current;     /* Helper to avoid double skip on match */
1229     struct mp_info mpi;
1230     unsigned current_list_pos = 0; /* Prefound buckets list index */
1231     Eterm match_res;
1232     Sint got = 0;                  /* Matched terms counter */
1233     erts_rwmtx_t* lck;         /* Slot lock */
1234     int ret_value;
1235     erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
1236         = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
1237     void (*unlock_hash_function)(erts_rwmtx_t*)
1238         = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
1239     Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**)
1240         = (lock_for_write ? next_slot_w : next_slot);
1241 
1242     if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
1243             != DB_ERROR_NONE)
1244     {
1245         *ret = NIL;
1246         goto done;
1247     }
1248 
1249     if (!mpi.something_can_match) {
1250         /* Can't possibly match anything */
1251         ret_value = ctx->on_nothing_can_match(ctx, ret);
1252         goto done;
1253     }
1254 
1255     /*
1256      * Look for initial slot / bucket
1257      */
1258     if (!mpi.key_given) {
1259         /* Run this code if pattern is variable or GETKEY(pattern)  */
1260         /* is a variable                                            */
1261         slot_ix = 0;
1262         lck = lock_hash_function(tb,slot_ix);
1263         for (;;) {
1264             ASSERT(slot_ix < NACTIVE(tb));
1265             if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
1266                 break;
1267             }
1268             slot_ix = next_slot_function(tb,slot_ix,&lck);
1269             if (slot_ix == 0) {
1270                 ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left,
1271                                                &mpi.mp, ret);
1272                 goto done;
1273             }
1274         }
1275     } else {
1276         /* We have at least one */
1277         slot_ix = mpi.lists[current_list_pos].ix;
1278         lck = lock_hash_function(tb, slot_ix);
1279         current_ptr = mpi.lists[current_list_pos].bucket;
1280         ASSERT(*current_ptr == BUCKET(tb,slot_ix));
1281         ++current_list_pos;
1282     }
1283 
1284     /*
1285      * Execute traversal cycle
1286      */
1287     for(;;) {
1288         if (*current_ptr != NULL) {
1289             if (!is_pseudo_deleted(*current_ptr)) {
1290                 DbTerm* obj = &(*current_ptr)->dbterm;
1291                 if (tb->common.compress)
1292                     obj = db_alloc_tmp_uncompressed(&tb->common, obj);
1293                 match_res = db_match_dbterm_uncompressed(&tb->common, p, mpi.mp,
1294                                                          obj, hpp, 2);
1295                 saved_current = *current_ptr;
1296                 if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
1297                     ++got;
1298                 }
1299                 if (tb->common.compress)
1300                     db_free_tmp_uncompressed(obj);
1301 
1302                 --iterations_left;
1303                 if (*current_ptr != saved_current) {
1304                     /* Don't advance to next, the callback did it already */
1305                     continue;
1306                 }
1307             }
1308             current_ptr = &((*current_ptr)->next);
1309         }
1310         else if (mpi.key_given) {  /* Key is bound */
1311             unlock_hash_function(lck);
1312             if (current_list_pos == mpi.num_lists) {
1313                 ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret);
1314                 goto done;
1315             } else {
1316                 slot_ix = mpi.lists[current_list_pos].ix;
1317                 lck = lock_hash_function(tb, slot_ix);
1318                 current_ptr = mpi.lists[current_list_pos].bucket;
1319                 ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
1320                 ++current_list_pos;
1321             }
1322         }
1323         else { /* Key is variable */
1324             if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
1325                 slot_ix = -1;
1326                 break;
1327             }
1328             if (chunk_size && got >= chunk_size) {
1329                 unlock_hash_function(lck);
1330                 break;
1331             }
1332             if (iterations_left <= 0) {
1333                 unlock_hash_function(lck);
1334                 ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret);
1335                 goto done;
1336             }
1337             current_ptr = &BUCKET(tb,slot_ix);
1338         }
1339     }
1340 
1341     ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret);
1342 
1343 done:
1344     /* We should only jump directly to this label if
1345      * we've already called ctx->nothing_can_match / loop_ended / trap
1346      */
1347     if (mpi.mp != NULL) {
1348         erts_bin_free(mpi.mp);
1349     }
1350     if (mpi.lists != mpi.dlists) {
1351         erts_free(ERTS_ALC_T_DB_SEL_LIST,
1352                 (void *) mpi.lists);
1353     }
1354     return ret_value;
1355 
1356 }
1357 
1358 /*
1359  * Continue hash table match traversal
1360  */
match_traverse_continue(Process * p,DbTableHash * tb,Sint chunk_size,Sint iterations_left,Eterm ** hpp,Sint slot_ix,Sint got,Binary ** mpp,int lock_for_write,match_callbacks_t * ctx,Eterm * ret)1361 static int match_traverse_continue(Process* p, DbTableHash* tb,
1362                                    Sint chunk_size,      /* If 0, no chunking */
1363                                    Sint iterations_left, /* Nr. of iterations left */
1364                                    Eterm** hpp,          /* Heap */
1365                                    Sint slot_ix,         /* Slot index to resume traversal from */
1366                                    Sint got,             /* Matched terms counter */
1367                                    Binary** mpp,         /* Existing match program */
1368                                    int lock_for_write,   /* Set to 1 if we're going to delete or
1369                                                             modify existing terms */
1370                                    match_callbacks_t* ctx,
1371                                    Eterm* ret)
1372 {
1373     HashDbTerm** current_ptr;  /* Refers to either the bucket pointer or
1374                                        * the 'next' pointer in the previous term
1375                                        */
1376     HashDbTerm* saved_current; /* Helper to avoid double skip on match */
1377     Eterm match_res;
1378     erts_rwmtx_t* lck;
1379     int ret_value;
1380     erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
1381         = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
1382     void (*unlock_hash_function)(erts_rwmtx_t*)
1383         = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
1384     Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr)
1385         = (lock_for_write ? next_slot_w : next_slot);
1386 
1387     if (got < 0) {
1388         *ret = NIL;
1389         return DB_ERROR_BADPARAM;
1390     }
1391 
1392     if (slot_ix < 0 /* EOT */
1393        || (chunk_size && got >= chunk_size))
1394     {
1395         /* Already got all or enough in the match_list */
1396         ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
1397         goto done;
1398     }
1399 
1400     lck = lock_hash_function(tb, slot_ix);
1401     if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
1402         unlock_hash_function(lck);
1403         *ret = NIL;
1404         ret_value = DB_ERROR_BADPARAM;
1405         goto done;
1406     }
1407 
1408     /*
1409      * Resume traversal cycle from where we left
1410      */
1411     current_ptr = &BUCKET(tb,slot_ix);
1412     for(;;) {
1413         if (*current_ptr != NULL) {
1414             if (!is_pseudo_deleted(*current_ptr)) {
1415                 DbTerm* obj = &(*current_ptr)->dbterm;
1416                 if (tb->common.compress)
1417                     obj = db_alloc_tmp_uncompressed(&tb->common, obj);
1418                 match_res = db_match_dbterm_uncompressed(&tb->common, p, *mpp,
1419                                                          obj, hpp, 2);
1420                 saved_current = *current_ptr;
1421                 if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
1422                     ++got;
1423                 }
1424                 if (tb->common.compress)
1425                     db_free_tmp_uncompressed(obj);
1426 
1427                 --iterations_left;
1428                 if (*current_ptr != saved_current) {
1429                     /* Don't advance to next, the callback did it already */
1430                     continue;
1431                 }
1432             }
1433             current_ptr = &((*current_ptr)->next);
1434         }
1435         else {
1436             if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
1437                 slot_ix = -1;
1438                 break;
1439             }
1440             if (chunk_size && got >= chunk_size) {
1441                 unlock_hash_function(lck);
1442                 break;
1443             }
1444             if (iterations_left <= 0) {
1445                 unlock_hash_function(lck);
1446                 ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret);
1447                 goto done;
1448             }
1449             current_ptr = &BUCKET(tb,slot_ix);
1450         }
1451     }
1452 
1453     ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
1454 
1455 done:
1456     /* We should only jump directly to this label if
1457      * we've already called on_loop_ended / on_trap
1458      */
1459     return ret_value;
1460 
1461 }
1462 
1463 
1464 /*
1465  * Common traversal trapping/continuation code;
1466  * used by select_count, select_delete and select_replace,
1467  * as well as their continuation-handling counterparts.
1468  */
1469 
/*
 * Pack the current traversal state into a continuation tuple
 * {Tid, SlotIx, MatchProgRef, Got} and prepare a trap to 'trap_function'.
 * On the first trap a magic ref for the match program is created and *mpp
 * is set to NULL so the caller won't free it; on subsequent traps the ref
 * is reused from the previous continuation. Always returns DB_ERROR_NONE.
 */
static ERTS_INLINE int on_simple_trap(Export* trap_function,
                                                 Process* p,
                                                 DbTableHash* tb,
                                                 Eterm tid,
                                                 Eterm* prev_continuation_tptr,
                                                 Sint slot_ix,
                                                 Sint got,
                                                 Binary** mpp,
                                                 Eterm* ret)
{
    Eterm* hp;
    Eterm egot;
    Eterm mpb;
    Eterm continuation;
    int is_first_trap = (prev_continuation_tptr == NULL);
    /* Extra heap only needed for the magic match-prog ref on first trap. */
    size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);

    BUMP_ALL_REDS(p);
    if (IS_USMALL(0, got)) {
	hp = HAllocX(p,  base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE);
	egot = make_small(got);
    }
    else {
        /* 'got' does not fit in a small: store it as a bignum. */
	hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5,
                     ERTS_MAGIC_REF_THING_SIZE);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }

    if (is_first_trap) {
        if (is_atom(tid))
            tid = erts_db_make_tid(p, &tb->common);
        mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
        *mpp = NULL; /* otherwise the caller will destroy it */
    }
    else {
        ASSERT(!is_atom(tid));
        mpb = prev_continuation_tptr[3];
    }

    continuation = TUPLE4(
            hp,
            tid,
            make_small(slot_ix),
            mpb,
            egot);
    ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation);
    return DB_ERROR_NONE;
}
1519 
unpack_simple_continuation(Eterm continuation,Eterm ** tptr_ptr,Eterm * tid_ptr,Sint * slot_ix_p,Binary ** mpp,Sint * got_p)1520 static ERTS_INLINE int unpack_simple_continuation(Eterm continuation,
1521                                                   Eterm** tptr_ptr,
1522                                                   Eterm* tid_ptr,
1523                                                   Sint* slot_ix_p,
1524                                                   Binary** mpp,
1525                                                   Sint* got_p)
1526 {
1527     Eterm* tptr;
1528     ASSERT(is_tuple(continuation));
1529     tptr = tuple_val(continuation);
1530     if (arityval(*tptr) != 4)
1531         return 1;
1532 
1533     if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
1534         return 1;
1535     }
1536 
1537     *tptr_ptr = tptr;
1538     *tid_ptr = tptr[1];
1539     *slot_ix_p = unsigned_val(tptr[2]);
1540     *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
1541     if (is_big(tptr[4])) {
1542         *got_p = big_to_uint32(tptr[4]);
1543     }
1544     else {
1545         *got_p = unsigned_val(tptr[4]);
1546     }
1547     return 0;
1548 }
1549 
1550 
1551 /*
1552  *
1553  * select / select_chunk match traversal
1554  *
1555  */
1556 
#define MAX_SELECT_CHUNK_ITERATIONS 1000

/* Traversal context for select / select_chunk callbacks. */
typedef struct {
    match_callbacks_t base;        /* Callback table; must be first member */
    Process* p;                    /* Calling process */
    DbTableHash* tb;
    Eterm tid;                     /* Table id; atom here means a tid term is
                                      created when trapping */
    Eterm* hp;                     /* Heap pointer for building results */
    Sint chunk_size;               /* 0 = plain select (no chunking) */
    Eterm match_list;              /* Accumulated match results */
    Eterm* prev_continuation_tptr; /* NULL on the first (non-continuation) call */
} select_chunk_context_t;
1569 
/*
 * Nothing can match: a chunked select yields '$end_of_table', a plain
 * select yields the empty list.
 */
static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;

    if (ctx->chunk_size > 0)
        *ret = am_EOT;
    else
        *ret = NIL;
    return DB_ERROR_NONE;
}
1576 
/*
 * Per-term callback: if the match program produced a value, prepend it to
 * the accumulated match_list and count it (return 1); otherwise return 0.
 */
static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;

    if (!is_value(match_res))
        return 0;
    ctx->match_list = CONS(ctx->hp, match_res, ctx->match_list);
    return 1;
}
1588 
/*
 * End-of-traversal callback for select / select_chunk. Converts the
 * accumulated match_list into the BIF result: the whole list for a plain
 * select, or {Results, Continuation} / {Results, '$end_of_table'} for a
 * chunked select (surplus results beyond chunk_size are split off into the
 * continuation).
 */
static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm mpb;

    if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
        /* We didn't get to iterate a single time, which means EOT */
        ASSERT(ctx->match_list == NIL);
        *ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
        return DB_ERROR_NONE;
    }
    else {
        ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
        BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
        if (ctx->chunk_size) {
            Eterm continuation;
            Eterm rest = NIL;
            Sint rest_size = 0;

            if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */
                Eterm tmp = ctx->match_list;
                rest = ctx->match_list;
                while (got-- > ctx->chunk_size + 1) {
                    tmp = CDR(list_val(tmp));
                    ++rest_size;
                }
                ++rest_size;
                ctx->match_list = CDR(list_val(tmp));
                CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
                                             been in 'user space' */
            }
            if (rest != NIL || slot_ix >= 0) { /* Need more calls */
                Eterm tid = ctx->tid;
                ctx->hp = HAllocX(ctx->p,
                                  3 + 7 + ERTS_MAGIC_REF_THING_SIZE,
                                  ERTS_MAGIC_REF_THING_SIZE);
                mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp);
                if (is_atom(tid))
                    tid = erts_db_make_tid(ctx->p,
                                           &ctx->tb->common);
                continuation = TUPLE6(
                        ctx->hp,
                        tid,
                        make_small(slot_ix),
                        make_small(ctx->chunk_size),
                        mpb, rest,
                        make_small(rest_size));
                *mpp = NULL; /* Otherwise the caller will destroy it */
                ctx->hp += 7;
                *ret = TUPLE2(ctx->hp, ctx->match_list, continuation);
                return DB_ERROR_NONE;
            } else { /* All data is exhausted */
                if (ctx->match_list != NIL) { /* No more data to search but still a
                                                            result to return to the caller */
                    ctx->hp = HAlloc(ctx->p, 3);
                    *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT);
                    return DB_ERROR_NONE;
                } else { /* Reached the end of the table with no data to return */
                    *ret = am_EOT;
                    return DB_ERROR_NONE;
                }
            }
        }
        *ret = ctx->match_list;
        return DB_ERROR_NONE;
    }
}
1659 
/*
 * Trap callback for select / select_chunk: packs the traversal state into
 * a continuation tuple {Tid, SlotIx, ChunkSize, MatchProgRef, MatchList,
 * Got} and prepares a trap to ets_select_continue_exp. On the first trap a
 * match-prog magic ref is created and *mpp nulled so the caller does not
 * free it; on later traps the terms from the previous continuation are
 * reused.
 */
static int select_chunk_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm mpb;
    Eterm continuation;
    Eterm* hp;

    BUMP_ALL_REDS(ctx->p);

    if (ctx->prev_continuation_tptr == NULL) {
        Eterm tid = ctx->tid;
        /* First time we're trapping */
        hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE,
                     ERTS_MAGIC_REF_THING_SIZE);
        if (is_atom(tid))
            tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
        mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp);
        continuation = TUPLE6(
                hp,
                tid,
                make_small(slot_ix),
                make_small(ctx->chunk_size),
                mpb,
                ctx->match_list,
                make_small(got));
        *mpp = NULL; /* otherwise the caller will destroy it */
    }
    else {
        /* Not the first time we're trapping; reuse continuation terms */
        hp = HAlloc(ctx->p, 7);
        continuation = TUPLE6(
                hp,
                ctx->prev_continuation_tptr[1],
                make_small(slot_ix),
                ctx->prev_continuation_tptr[3],
                ctx->prev_continuation_tptr[4],
                ctx->match_list,
                make_small(got));
    }
    ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p,
                        continuation);
    return DB_ERROR_NONE;
}
1705 
/*
 * ets:select for hash tables: identical to a chunked select with chunking
 * disabled (chunk_size 0).
 */
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern,
                          int reverse, Eterm *ret)
{
    return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
}
1711 
/*
 * ets:select/3 (chunked) for hash tables. Sets up a select_chunk traversal
 * context and runs a read-locked match traversal with a fresh iteration
 * budget. 'reverse' is accepted for interface compatibility; hash tables
 * have no defined order.
 */
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Sint chunk_size,
                                int reverse, Eterm *ret)
{
    select_chunk_context_t ctx;

    ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match;
    ctx.base.on_match_res         = select_chunk_on_match_res;
    ctx.base.on_loop_ended        = select_chunk_on_loop_ended;
    /* Note: this was previously terminated by a comma (comma operator),
     * silently chaining into the next statement. Harmless, but a typo. */
    ctx.base.on_trap              = select_chunk_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = NIL;
    ctx.prev_continuation_tptr = NULL;

    return match_traverse(
            ctx.p, ctx.tb,
            pattern, NULL,
            ctx.chunk_size,
            MAX_SELECT_CHUNK_ITERATIONS,
            &ctx.hp, 0,
            &ctx.base, ret);
}
1738 
1739 /*
1740  *
1741  * select_continue match traversal
1742  *
1743  */
1744 
/*
 * End-of-traversal callback used when continuing a previously trapped
 * select (see db_select_continue_hash). Same result shape as
 * select_chunk_on_loop_ended, but the surplus beyond chunk_size must be
 * copied rather than split destructively, since the list may already have
 * been exposed to user space by an earlier chunk.
 */
static
int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base,
                                        Sint slot_ix, Sint got,
                                        Sint iterations_left, Binary** mpp,
                                        Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm continuation;
    Eterm rest = NIL;
    Eterm* hp;

    ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
    if (ctx->chunk_size) {
        Sint rest_size = 0;
        if (got > ctx->chunk_size) {
            /* Cannot write destructively here,
               the list may have
               been in user space */
            hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2);
            while (got-- > ctx->chunk_size) {
                rest = CONS(hp, CAR(list_val(ctx->match_list)), rest);
                hp += 2;
                ctx->match_list = CDR(list_val(ctx->match_list));
                ++rest_size;
            }
        }
        if (rest != NIL || slot_ix >= 0) {
            /* More to do: hand back a {Results, Continuation} pair, reusing
             * tid and match-prog ref from the previous continuation. */
            hp = HAlloc(ctx->p, 3 + 7);
            continuation = TUPLE6(
                    hp,
                    ctx->prev_continuation_tptr[1],
                    make_small(slot_ix),
                    ctx->prev_continuation_tptr[3],
                    ctx->prev_continuation_tptr[4],
                    rest,
                    make_small(rest_size));
            hp += 7;
            *ret = TUPLE2(hp, ctx->match_list, continuation);
            return DB_ERROR_NONE;
        } else {
            if (ctx->match_list != NIL) {
                hp = HAlloc(ctx->p, 3);
                *ret = TUPLE2(hp, ctx->match_list, am_EOT);
                return DB_ERROR_NONE;
            } else {
                *ret = am_EOT;
                return DB_ERROR_NONE;
            }
        }
    }
    *ret = ctx->match_list;
    return DB_ERROR_NONE;
}
1799 
1800 /*
1801  * This is called when select traps
1802  */
/*
 * This is called when select traps.
 * Resumes the traversal from a continuation tuple of shape
 *   {Tid, SlotIx, ChunkSize, MatchProgBin, MatchList, Got}.
 * Every field is validated before use, since the continuation term may
 * have been forged by the caller; bad input yields DB_ERROR_BADPARAM.
 */
static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation,
                                   Eterm* ret)
{
    select_chunk_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;
    Sint chunk_size;
    Eterm match_list;
    Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;

    /* Decode continuation. We know it's a tuple but not the arity or anything else */
    ASSERT(is_tuple(continuation));
    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 6)
        goto badparam;

    if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
            !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
        goto badparam;
    if ((chunk_size = signed_val(tptr[3])) < 0)
        goto badparam;

    mp = erts_db_get_match_prog_binary(tptr[4]);
    if (mp == NULL)
        goto badparam;

    if ((got = signed_val(tptr[6])) < 0)
        goto badparam;

    tid = tptr[1];
    slot_ix = signed_val(tptr[2]);
    match_list = tptr[5];

    /* Proceed */
    /* on_nothing_can_match is deliberately left unset here; the
     * continue-traversal path apparently never invokes it.
     * NOTE(review): confirm against match_traverse_continue. */
    ctx.base.on_match_res  = select_chunk_on_match_res;
    ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended;
    ctx.base.on_trap       = select_chunk_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = match_list;
    ctx.prev_continuation_tptr = tptr;

    return match_traverse_continue(
            ctx.p, ctx.tb, ctx.chunk_size,
            iterations_left, &ctx.hp, slot_ix, got, &mp, 0,
            &ctx.base, ret);

badparam:
    *ret = NIL;
    return DB_ERROR_BADPARAM;
}
1861 
1862 #undef MAX_SELECT_CHUNK_ITERATIONS
1863 
1864 
1865 /*
1866  *
1867  * select_count match traversal
1868  *
1869  */
1870 
1871 #define MAX_SELECT_COUNT_ITERATIONS 1000
1872 
/* Context passed to the select_count traversal callbacks. */
typedef struct {
    match_callbacks_t base;        /* generic callbacks; must be first member
                                      (callbacks cast base ptr back to this) */
    Process* p;                    /* calling process */
    DbTableHash* tb;               /* table being traversed */
    Eterm tid;                     /* table identifier term */
    Eterm* prev_continuation_tptr; /* previous continuation tuple, or NULL
                                      on the first (non-trapped) call */
} select_count_context_t;
1880 
/* No key in the pattern can exist in the table: a select_count over
 * nothing matches exactly zero objects. */
static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base,
                                             Eterm* ret)
{
    (void)ctx_base;  /* context not needed for a constant result */
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
1887 
/* Contribute 1 to the running count iff the match program said 'true'.
 * The table itself is never modified by select_count. */
static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    (void)ctx_base;
    (void)slot_ix;
    (void)current_ptr_ptr;
    return match_res == am_true ? 1 : 0;
}
1894 
/* Traversal finished: charge reductions for the work done and return the
 * accumulated count as an Erlang integer. */
static int select_count_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_count_context_t* ctx = (select_count_context_t*) ctx_base;
    Sint reds_used = MAX_SELECT_COUNT_ITERATIONS - iterations_left;

    ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
    BUMP_REDS(ctx->p, reds_used);
    *ret = erts_make_integer(got, ctx->p);
    return DB_ERROR_NONE;
}
1906 
/* Out of iterations: pack the traversal state into a continuation and
 * yield through the common trap helper. */
static int select_count_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_count_context_t* ctx = (select_count_context_t*) ctx_base;

    return on_simple_trap(&ets_select_count_continue_exp,
                          ctx->p, ctx->tb, ctx->tid,
                          ctx->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}
1920 
/* Matching interface function for ets:select_count/2.
 * Wires up the count callbacks and starts a fresh match traversal. */
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Eterm *ret)
{
    select_count_context_t ctx;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;
    ctx.base.on_nothing_can_match = select_count_on_nothing_can_match;
    ctx.base.on_match_res         = select_count_on_match_res;
    ctx.base.on_loop_ended        = select_count_on_loop_ended;
    ctx.base.on_trap              = select_count_on_trap;

    /* chunk_size 0: select_count never chunks its result */
    return match_traverse(ctx.p, ctx.tb, pattern, NULL,
                          0, MAX_SELECT_COUNT_ITERATIONS, NULL, 0,
                          &ctx.base, ret);
}
1943 
1944 /*
1945  * This is called when select_count traps
1946  */
/*
 * This is called when select_count traps.
 * Unpacks the simple continuation and resumes the traversal.
 */
static int db_select_count_continue_hash(Process* p, DbTable* tbl,
                                         Eterm continuation, Eterm* ret)
{
    select_count_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;

    *ret = NIL;  /* default until the traversal produces a result */
    if (unpack_simple_continuation(continuation, &tptr, &tid,
                                   &slot_ix, &mp, &got))
        return DB_ERROR_BADPARAM;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;
    ctx.base.on_match_res  = select_count_on_match_res;
    ctx.base.on_loop_ended = select_count_on_loop_ended;
    ctx.base.on_trap       = select_count_on_trap;

    /* chunk_size 0: select_count never chunks its result */
    return match_traverse_continue(ctx.p, ctx.tb, 0,
                                   MAX_SELECT_COUNT_ITERATIONS,
                                   NULL, slot_ix, got, &mp, 0,
                                   &ctx.base, ret);
}
1978 
1979 #undef MAX_SELECT_COUNT_ITERATIONS
1980 
1981 
1982 /*
1983  *
1984  * select_delete match traversal
1985  *
1986  */
1987 
1988 #define MAX_SELECT_DELETE_ITERATIONS 1000
1989 
/* Context passed to the select_delete traversal callbacks. */
typedef struct {
    match_callbacks_t base;        /* generic callbacks; must be first member
                                      (callbacks cast base ptr back to this) */
    Process* p;                    /* calling process */
    DbTableHash* tb;               /* table being traversed */
    Eterm tid;                     /* table identifier term */
    Eterm* prev_continuation_tptr; /* previous continuation tuple, or NULL */
    erts_aint_t fixated_by_me;     /* fixation count attributable to this
                                      operation itself (compared to NFIXED) */
    Uint last_pseudo_delete;       /* last slot for which a fixed-deletion
                                      marker was added (avoids duplicates) */
    HashDbTerm* free_us;           /* unlinked terms awaiting deallocation */
} select_delete_context_t;
2000 
/* No key in the pattern can exist in the table: nothing to delete,
 * so the deleted-count result is zero. */
static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base,
                                              Eterm* ret)
{
    (void)ctx_base;  /* context not needed for a constant result */
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
2007 
/*
 * Delete the current object if the match program returned 'true'.
 * If the table is fixated by someone other than this operation, the object
 * is only pseudo-deleted (marked but left linked), recording at most one
 * fixed-deletion marker per slot. Otherwise it is unlinked and queued on
 * ctx->free_us for deallocation at loop end / trap.
 * Returns 1 if the object was (pseudo-)deleted, 0 otherwise.
 */
static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                      HashDbTerm*** current_ptr_ptr,
                                      Eterm match_res)
{
    HashDbTerm** current_ptr = *current_ptr_ptr;
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
    HashDbTerm* del;
    if (match_res != am_true)
        return 0;

    if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */
        if (slot_ix != ctx->last_pseudo_delete) {
            /* If no marker could be recorded, fall back to a real erase. */
            if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me))
                goto do_erase;
            ctx->last_pseudo_delete = slot_ix;
        }
        (*current_ptr)->pseudo_deleted = 1;
    }
    else {
    do_erase:
        del = *current_ptr;
        /* Unlink by rewriting the predecessor's 'next' pointer. */
        *current_ptr = (*current_ptr)->next;
        del->next = ctx->free_us;
        ctx->free_us = del;
    }
    /* Item count drops even for pseudo-deletions. */
    erts_atomic_dec_nob(&ctx->tb->common.nitems);

    return 1;
}
2037 
/* Traversal finished: free the unlinked terms, charge reductions, try to
 * shrink the table if anything was deleted, and return the delete count. */
static int select_delete_on_loop_ended(match_callbacks_t* ctx_base,
                                       Sint slot_ix, Sint got,
                                       Sint iterations_left, Binary** mpp,
                                       Eterm* ret)
{
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;

    /* Release the terms unlinked during this traversal slice. */
    free_term_list(ctx->tb, ctx->free_us);
    ctx->free_us = NULL;

    ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);

    if (got) {
        try_shrink(ctx->tb);
    }
    *ret = erts_make_integer(got, ctx->p);
    return DB_ERROR_NONE;
}
2054 
/* Out of iterations: free what has been unlinked so far, then pack the
 * state into a continuation and yield via the common trap helper. */
static int select_delete_on_trap(match_callbacks_t* ctx_base,
                                 Sint slot_ix, Sint got,
                                 Binary** mpp, Eterm* ret)
{
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;

    free_term_list(ctx->tb, ctx->free_us);
    ctx->free_us = NULL;

    return on_simple_trap(&ets_select_delete_continue_exp,
                          ctx->p, ctx->tb, ctx->tid,
                          ctx->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}
2070 
/* Matching interface function for ets:select_delete/2.
 * Wires up the delete callbacks and starts a fresh match traversal. */
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
                                 Eterm pattern, Eterm *ret)
{
    select_delete_context_t ctx;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;
    ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;
    ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match;
    ctx.base.on_match_res         = select_delete_on_match_res;
    ctx.base.on_loop_ended        = select_delete_on_loop_ended;
    ctx.base.on_trap              = select_delete_on_trap;

    /* chunk_size 0 (no chunking); trailing flag 1 as select_delete
     * modifies the table during traversal */
    return match_traverse(ctx.p, ctx.tb, pattern, NULL,
                          0, MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
                          &ctx.base, ret);
}
2096 
2097 /*
2098  * This is called when select_delete traps
2099  */
/*
 * This is called when select_delete traps.
 * Unpacks the simple continuation and resumes the traversal.
 */
static int db_select_delete_continue_hash(Process* p, DbTable* tbl,
                                          Eterm continuation, Eterm* ret)
{
    select_delete_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;

    if (unpack_simple_continuation(continuation, &tptr, &tid,
                                   &slot_ix, &mp, &got)) {
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;
    ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;
    ctx.base.on_match_res  = select_delete_on_match_res;
    ctx.base.on_loop_ended = select_delete_on_loop_ended;
    ctx.base.on_trap       = select_delete_on_trap;

    /* chunk_size 0 (no chunking); trailing flag 1 as in the initial call */
    return match_traverse_continue(ctx.p, ctx.tb, 0,
                                   MAX_SELECT_DELETE_ITERATIONS,
                                   NULL, slot_ix, got, &mp, 1,
                                   &ctx.base, ret);
}
2133 
2134 #undef MAX_SELECT_DELETE_ITERATIONS
2135 
2136 
2137 /*
2138  *
2139  * select_replace match traversal
2140  *
2141  */
2142 
2143 #define MAX_SELECT_REPLACE_ITERATIONS 1000
2144 
/* Context passed to the select_replace traversal callbacks. */
typedef struct {
    match_callbacks_t base;        /* generic callbacks; must be first member
                                      (callbacks cast base ptr back to this) */
    Process* p;                    /* calling process */
    DbTableHash* tb;               /* table being traversed */
    Eterm tid;                     /* table identifier term */
    Eterm* prev_continuation_tptr; /* previous continuation tuple, or NULL */
} select_replace_context_t;
2152 
/* No key in the pattern can exist in the table: nothing to replace,
 * so the replaced-count result is zero. */
static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base,
                                               Eterm* ret)
{
    (void)ctx_base;  /* context not needed for a constant result */
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
2159 
/*
 * Replace the current object in-place with the term produced by the match
 * program. The match spec has been validated by db_match_keeps_key, so the
 * replacement has the same key (checked again under DEBUG). The chain link
 * and cached hash value of the old object are preserved in the new one.
 * Returns 1 on replacement, 0 if the object did not match.
 */
static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                       HashDbTerm*** current_ptr_ptr,
                                       Eterm match_res)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
    DbTableHash* tb = ctx->tb;
    HashDbTerm* new;
    HashDbTerm* next;
    HashValue hval;

    if (is_value(match_res)) {
#ifdef DEBUG
        Eterm key = db_getkey(tb->common.keypos, match_res);
        ASSERT(is_value(key));
        ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
#endif
        /* Carry over chain link and hash value from the old object. */
        next = (**current_ptr_ptr)->next;
        hval = (**current_ptr_ptr)->hvalue;
        new = new_dbterm(tb, match_res);
        new->next = next;
        new->hvalue = hval;
        new->pseudo_deleted = 0;
        free_term(tb, **current_ptr_ptr);
        **current_ptr_ptr = new; /* replace 'next' pointer in previous object */
        *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
        return 1;
    }
    return 0;
}
2189 
/* Traversal finished: charge reductions (replacements cost extra, capped
 * at twice the iteration budget) and return the replaced count. */
static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
                                        Sint got, Sint iterations_left,
                                        Binary** mpp, Eterm* ret)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
    ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
    /* the more objects we've replaced, the more reductions we've consumed.
     * Keep 'got' as Sint in the arithmetic; the previous (int) cast could
     * truncate on platforms where Sint is wider than int. The MIN cap
     * bounds the charged amount either way. */
    BUMP_REDS(ctx->p,
              MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
                  (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + got));
    *ret = erts_make_integer(got, ctx->p);
    return DB_ERROR_NONE;
}
2203 
/* Out of iterations: pack the traversal state into a continuation and
 * yield through the common trap helper. */
static int select_replace_on_trap(match_callbacks_t* ctx_base,
                                  Sint slot_ix, Sint got,
                                  Binary** mpp, Eterm* ret)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;

    return on_simple_trap(&ets_select_replace_continue_exp,
                          ctx->p, ctx->tb, ctx->tid,
                          ctx->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}
2217 
/* Matching interface function for ets:select_replace/2.
 * Wires up the replace callbacks and starts a fresh match traversal. */
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
{
    select_replace_context_t ctx;

    /* Bag implementation presented both semantic consistency and performance issues,
     * unsupported for now
     */
    ASSERT(!(tbl->hash.common.status & DB_BAG));

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;
    ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match;
    ctx.base.on_match_res         = select_replace_on_match_res;
    ctx.base.on_loop_ended        = select_replace_on_loop_ended;
    ctx.base.on_trap              = select_replace_on_trap;

    /* db_match_keeps_key validates every match function keeps the key;
     * chunk_size 0 (no chunking) */
    return match_traverse(ctx.p, ctx.tb, pattern, db_match_keeps_key,
                          0, MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
                          &ctx.base, ret);
}
2244 
2245 /*
2246  * This is called when select_replace traps
2247  */
/*
 * This is called when select_replace traps.
 * Unpacks the simple continuation and resumes the traversal.
 */
static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
{
    select_replace_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;

    *ret = NIL;  /* default until the traversal produces a result */
    if (unpack_simple_continuation(continuation, &tptr, &tid,
                                   &slot_ix, &mp, &got))
        return DB_ERROR_BADPARAM;

    /* Proceed */
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;
    ctx.base.on_match_res  = select_replace_on_match_res;
    ctx.base.on_loop_ended = select_replace_on_loop_ended;
    ctx.base.on_trap       = select_replace_on_trap;

    /* chunk_size 0 (no chunking); trailing flag 1 as in the initial call */
    return match_traverse_continue(ctx.p, ctx.tb, 0,
                                   MAX_SELECT_REPLACE_ITERATIONS,
                                   NULL, slot_ix, got, &mp, 1,
                                   &ctx.base, ret);
}
2279 
2280 
/*
 * Implements ets:take/2: return the list of all objects stored under
 * 'key' in *ret (NIL if none) and remove them from the table.
 * Deletion is done under the bucket's write lock; if the table is fixed,
 * the first object is pseudo-deleted instead and a fixed-deletion marker
 * is recorded. Unlinked terms are freed after the lock is released.
 */
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp, *b;
    HashDbTerm *free_us = NULL;
    HashValue hval = MAKE_HASH(key);
    erts_rwmtx_t *lck = WLOCK_HASH(tb, hval);
    int ix = hash_to_ix(tb, hval);
    int nitems_diff = 0;

    *ret = NIL;
    for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) {
        if (has_live_key(tb, b, key, hval)) {
            HashDbTerm *bend;

            /* Build the result list from the run of objects with this key;
             * 'bend' is set to the first term after that run. */
            *ret = get_term_list(p, tb, key, hval, b, &bend);
            while (b != bend) {
                --nitems_diff;
                if (nitems_diff == -1 && IS_FIXED(tb)
                    && add_fixed_deletion(tb, ix, 0)) {
                    /* Pseudo remove (no need to keep several of same key) */
                    bp = &b->next;
                    b->pseudo_deleted = 1;
                    b = b->next;
                } else {
                    /* Unlink and queue for deallocation outside the lock. */
                    HashDbTerm* next = b->next;
                    b->next = free_us;
                    free_us = b;
                    b = *bp = next;
                }
            }
            break;
        }
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
        erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
        try_shrink(tb);
    }
    /* Free unlinked terms only after releasing the bucket lock. */
    free_term_list(tb, free_us);
    return DB_ERROR_NONE;
}
2323 
2324 
2325 /*
2326 ** Other interface routines (not directly coupled to one bif)
2327 */
2328 
/* One-time module initialization hook; the hash table implementation
 * currently has nothing to set up. */
void db_initialize_hash(void)
{
}
2332 
2333 
/*
 * Mark every object in the table as pseudo-deleted, yielding as needed.
 * Progress is saved in the table's first FixedDeletion record ('slot',
 * 'all', 'trap'). Returns -1 if it trapped (call again to continue), or
 * the number of reductions left when done. On completion the table's item
 * count is reset to zero.
 */
static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
    const int LOOPS_PER_REDUCTION  = 8;  /* buckets processed per reduction */
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel;
    SWord loops = reds * LOOPS_PER_REDUCTION;
    int i;

    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb));

    fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel);
    if (fixdel && fixdel->trap) {
        /* Continue after trap */
        ASSERT(fixdel->all);
        ASSERT(fixdel->slot < NACTIVE(tb));
        i = fixdel->slot;
    }
    else {
        /* First call */
        int ok;
        fixdel = alloc_fixdel(tb);
        ok = link_fixdel(tb, fixdel, 0);
        ASSERT(ok); (void)ok;
        i = 0;
    }

    /* Mark whole buckets at a time until done or out of budget. */
    do {
        HashDbTerm* b;
	for (b = BUCKET(tb,i); b; b = b->next)
            b->pseudo_deleted = 1;
    } while (++i < NACTIVE(tb) && --loops > 0);

    if (i < NACTIVE(tb)) {
         /* Yield */
        fixdel->slot = i;
        fixdel->all = 0;
        fixdel->trap = 1;
        return -1;
    }

    /* Done: the fixed-deletion record now covers all active slots. */
    fixdel->slot = NACTIVE(tb) - 1;
    fixdel->all = 1;
    fixdel->trap = 0;
    erts_atomic_set_nob(&tb->common.nitems, 0);
    return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
}
2380 
2381 
2382 /* Display hash table contents (for dump) */
/* Display hash table contents (for dump).
 * Prints bucket count and chain-length statistics; with 'show' set, also
 * dumps every chain (pseudo-deleted entries prefixed with '*', and only
 * keys when the table stores compressed terms). */
static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    DbHashStats stats;
    int i;

    erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));

    /* Save the thread-safe flag so it can be restored below. */
    i = tbl->common.is_thread_safe;
    /* If crash dumping we set table to thread safe in order to
       avoid taking any locks */
    if (ERTS_IS_CRASH_DUMPING)
        tbl->common.is_thread_safe = 1;

    db_calc_stats_hash(&tbl->hash, &stats);

    tbl->common.is_thread_safe = i;

    erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
    erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
    erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len);
    erts_print(to, to_arg, "Chain Length Std Dev: %f\n",
               stats.std_dev_chain_len);
    erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n",
               stats.std_dev_expected);

    if (IS_FIXED(tb))
        erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items);
    else
        erts_print(to, to_arg, "Fixed: false\n");

    if (show) {
	for (i = 0; i < NACTIVE(tb); i++) {
	    HashDbTerm* list = BUCKET(tb,i);
	    if (list == NULL)
		continue;
	    erts_print(to, to_arg, "%d: [", i);
	    while(list != 0) {
		/* '*' marks a pseudo-deleted entry. */
		if (is_pseudo_deleted(list))
		    erts_print(to, to_arg, "*");
		if (tb->common.compress) {
		    /* Compressed terms: only print the key. */
		    Eterm key = GETKEY(tb, list->dbterm.tpl);
		    erts_print(to, to_arg, "key=%T", key);
		}
		else {
		    Eterm obj = make_tuple(list->dbterm.tpl);
		    erts_print(to, to_arg, "%T", obj);
		}
		if (list->next != 0)
		    erts_print(to, to_arg, ",");
		list = list->next;
	    }
	    erts_print(to, to_arg, "]\n");
	}
    }
}
2439 
/* Free an already-empty table, looping the yielding free routine with an
 * unlimited reduction budget until it reports completion. */
static int db_free_empty_table_hash(DbTable *tbl)
{
    SWord res;

    ASSERT(NITEMS(tbl) == 0);
    do {
        res = db_free_table_continue_hash(tbl, ERTS_SWORD_MAX);
    } while (res < 0);
    return 0;
}
2447 
/*
 * Free the table's storage, yielding as needed. First releases the
 * fixed-deletion records, then the segments, then the fine-grained lock
 * structure. Returns a negative value if out of reductions (call again
 * to continue), otherwise the reductions left when done.
 */
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel);
    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));

    while (fixdel != NULL) {
	FixedDeletion *fx = fixdel;

	fixdel = fx->next;
        free_fixdel(tb, fx);
	if (--reds < 0) {
            /* Save remaining fixdel chain so we can resume here. */
	    erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
	    return reds;		/* Not done */
	}
    }
    erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);

    /* Free one segment at a time; free_seg reports the work it did. */
    while(tb->nslots != 0) {
	reds -= EXT_SEGSZ/64 + free_seg(tb, 1);

	/*
	 * If we have done enough work, get out here.
	 */
	if (reds < 0) {
	    return reds;	/* Not done */
	}
    }
    /* Finally tear down the fine-grained lock array, if any. */
    if (tb->locks != NULL) {
	int i;
	for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
	    erts_rwmtx_destroy(GET_LOCK(tb,i));
	}
	erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
		     (void*)tb->locks, sizeof(DbTableHashFineLocks));
	tb->locks = NULL;
    }
    ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
    return reds;			/* Done */
}
2488 
2489 
2490 
2491 /*
2492 ** Utility routines. (static)
2493 */
2494 /*
2495 ** For the select functions, analyzes the pattern and determines which
2496 ** slots should be searched. Also compiles the match program
2497 */
/*
 * Analyze a match specification ([{MatchHead, Guard, Body}, ...]):
 * compile it into a match program (mpi->mp) and, when every match head
 * has a fully bound key, record in mpi->lists the distinct buckets that
 * need to be searched. mpi->key_given is cleared as soon as one head has
 * an unbound/variable key; mpi->something_can_match is set if any head
 * could possibly match. The optional extra_validator can veto individual
 * match functions (used by select_replace to require key preservation).
 * Returns DB_ERROR_NONE or DB_ERROR_BADPARAM.
 */
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi)
{
    Eterm *ptpl;
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];        /* on-stack scratch for up to 10 match functions */
    Eterm *buff = sbuff;
    Eterm key = NIL;
    HashValue hval = NIL;   /* "unset" until a bound key is hashed */
    int num_heads = 0;
    int i;

    mpi->lists = mpi->dlists;
    mpi->num_lists = 0;
    mpi->key_given = 1;
    mpi->something_can_match = 0;
    mpi->mp = NULL;

    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
	++num_heads;

    if (lst != NIL) {/* proper list... */
	return DB_ERROR_BADPARAM;
    }

    /* Too many heads for the stack buffers: switch to heap allocation. */
    if (num_heads > 10) {
	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
	mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
				sizeof(*(mpi->lists)) * num_heads);
    }

    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
        Eterm match;
        Eterm guard;
        Eterm body;

	ttpl = CAR(list_val(lst));
	if (!is_tuple(ttpl)) {
	    if (buff != sbuff) {
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	ptpl = tuple_val(ttpl);
	if (ptpl[0] != make_arityval(3U)) {
	    if (buff != sbuff) {
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	matches[i] = match = tpl = ptpl[1];
	guards[i] = guard = ptpl[2];
	bodies[i] = body = ptpl[3];

        if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
	    if (buff != sbuff) {
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
            return DB_ERROR_BADPARAM;
        }

	if (!is_list(body) || CDR(list_val(body)) != NIL ||
	    CAR(list_val(body)) != am_DollarUnderscore) {
	    /* NOTE(review): intentionally empty branch — the "body is
	       exactly ['$_']" test no longer drives anything here; the
	       condition is kept without effect. Candidate for removal. */
	}
	++i;
	if (!(mpi->key_given)) {
	    continue;
	}
	if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
	    /* Whole-head wildcard/variable: no key can be extracted. */
	    (mpi->key_given) = 0;
	    (mpi->something_can_match) = 1;
	} else {
	    key = db_getkey(tb->common.keypos, tpl);
	    if (is_value(key)) {
		if (!db_has_variable(key)) {   /* Bound key */
		    int ix, search_slot;
		    HashDbTerm** bp;
		    erts_rwmtx_t* lck;
		    hval = MAKE_HASH(key);
		    lck = RLOCK_HASH(tb,hval);
		    ix = hash_to_ix(tb, hval);
		    bp = &BUCKET(tb,ix);
		    if (lck == NULL) {
			search_slot = search_list(tb,key,hval,*bp) != NULL;
		    } else {
			/* No point to verify if key exist now as there may be
			   concurrent inserters/deleters anyway */
			RUNLOCK_HASH(lck);
			search_slot = 1;
		    }
		    if (search_slot) {
			/* Record the bucket, deduplicating against any
			   earlier head that hashed to the same bucket. */
			int j;
			for (j=0; ; ++j) {
			    if (j == mpi->num_lists) {
				mpi->lists[mpi->num_lists].bucket = bp;
				mpi->lists[mpi->num_lists].ix = ix;
				++mpi->num_lists;
				break;
			    }
			    if (mpi->lists[j].bucket == bp) {
				ASSERT(mpi->lists[j].ix == ix);
				break;
			    }
			    ASSERT(mpi->lists[j].ix != ix);
			}
			mpi->something_can_match = 1;
		    }
		} else {
		    /* Key contains variables: cannot restrict buckets. */
		    mpi->key_given = 0;
		    mpi->something_can_match = 1;
		}
	    }
	}
    }

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
				    num_heads, DCOMP_TABLE, NULL))
	== NULL) {
	if (buff != sbuff) {
	    erts_free(ERTS_ALC_T_DB_TMP, buff);
	}
	return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) {
	erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}
2638 
/* Allocate a new, larger extended segment table, copying in all segment
** pointers from the current one. The previous table is remembered in the
** new one so it can be restored (and freed) when the table shrinks back.
*/
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
    struct segment** prev_segtab = SEGTAB(tb);
    struct ext_segtab* est;
    int new_nsegs;

    ASSERT(seg_ix >= NSEG_1);
    /* First growth step goes NSEG_1 -> NSEG_2; after that grow linearly. */
    if (seg_ix == NSEG_1)
        new_nsegs = NSEG_2;
    else
        new_nsegs = (int)(seg_ix + NSEG_INC);
    ASSERT(new_nsegs > tb->nsegs);

    est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                             (DbTable *) tb,
                                             SIZEOF_EXT_SEGTAB(new_nsegs));
    est->nsegs = new_nsegs;
    est->prev_segtab = prev_segtab;
    est->prev_nsegs = tb->nsegs;
    /* Carry over all existing segment pointers. */
    sys_memcpy(est->segtab, prev_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
    /* Poison the unused tail so stray reads are caught in debug builds. */
    sys_memset(&est->segtab[seg_ix], 0,
               (new_nsegs-seg_ix)*sizeof(struct segment*));
#endif
    return est;
}
2663 
2664 /* Extend table with one new segment
2665 */
static void alloc_seg(DbTableHash *tb)
{
    int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
    struct segment** segtab;
    struct segment* new_seg;

    ASSERT(seg_ix > 0);
    if (seg_ix == tb->nsegs) {
        /* Segment table is full; install a larger one first. */
        struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
        SET_SEGTAB(tb, est->segtab);
        tb->nsegs = est->nsegs;
    }
    ASSERT(seg_ix < tb->nsegs);

    /* Allocate and zero the new segment, then account for its slots. */
    segtab = SEGTAB(tb);
    new_seg = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                              (DbTable *) tb,
                                              SIZEOF_SEGMENT(EXT_SEGSZ));
    segtab[seg_ix] = new_seg;
    sys_memset(new_seg, 0, SIZEOF_SEGMENT(EXT_SEGSZ));
    tb->nslots += EXT_SEGSZ;
}
2685 
dealloc_ext_segtab(void * lop_data)2686 static void dealloc_ext_segtab(void* lop_data)
2687 {
2688     struct ext_segtab* est = (struct ext_segtab*) lop_data;
2689 
2690     erts_free(ERTS_ALC_T_DB_SEG, est);
2691 }
2692 
2693 /* Shrink table by freeing the top segment
2694 ** free_records: 1=free any records in segment, 0=assume segment is empty
2695 */
static int free_seg(DbTableHash *tb, int free_records)
{
    /* Index of the top (last) segment and the segment itself. */
    const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
    struct segment** const segtab = SEGTAB(tb);
    struct segment* const segp = segtab[seg_ix];
    Uint seg_sz;
    int nrecords = 0;

    ASSERT(segp != NULL);
#ifndef DEBUG
    if (free_records)
#endif
    {
        /* In DEBUG builds we always walk the buckets so the ASSERT below
         * can catch a segment that was wrongly assumed to be empty. */
	int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
	while (i--) {
	    HashDbTerm* p = segp->buckets[i];
	    while(p != 0) {
		HashDbTerm* nxt = p->next;
		ASSERT(free_records); /* segment not empty as assumed? */
		free_term(tb, p);
		p = nxt;
		++nrecords;
	    }
	}
    }

    if (seg_ix >= NSEG_1) {
        /* segtab points into an ext_segtab; recover the enclosing struct. */
        struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);

        if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
            ASSERT(est->prev_segtab != NULL);
            SET_SEGTAB(tb, est->prev_segtab);
            tb->nsegs = est->prev_nsegs;

            if (!tb->common.is_thread_safe) {
                /*
                 * Table is doing a graceful shrink operation and we must avoid
                 * deallocating this segtab while it may still be read by other
                 * threads. Schedule deallocation with thread progress to make
                 * sure no lingering threads are still hanging in BUCKET macro
                 * with an old segtab pointer.
                 */
                Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
                ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
                /* Account the memory as released now; the actual free
                 * happens later in dealloc_ext_segtab(). */
                ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
                erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
                                                        est,
                                                        &est->lop,
                                                        sz);
            }
            else
                erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
                             SIZEOF_EXT_SEGTAB(est->nsegs));
        }
    }
    /* The first segment has a different (smaller) size than the rest. */
    seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
    erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));

#ifdef DEBUG
    if (seg_ix < tb->nsegs)
        SEGTAB(tb)[seg_ix] = NULL;
#endif
    tb->nslots -= seg_sz;
    ASSERT(tb->nslots >= 0);
    return nrecords;
}
2762 
2763 
2764 /*
2765 ** Copy terms from ptr1 until ptr2
2766 ** works for ptr1 == ptr2 == 0  => []
2767 ** or ptr2 == 0
2768 ** sz is either precalculated heap size or 0 if not known
2769 */
build_term_list(Process * p,HashDbTerm * ptr1,HashDbTerm * ptr2,Uint sz,DbTableHash * tb)2770 static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
2771 			     Uint sz, DbTableHash* tb)
2772 {
2773     HashDbTerm* ptr;
2774     Eterm list = NIL;
2775     Eterm copy;
2776     Eterm *hp, *hend;
2777 
2778     if (!sz) {
2779 	ptr = ptr1;
2780 	while(ptr != ptr2) {
2781 	    if (!is_pseudo_deleted(ptr))
2782 		sz += ptr->dbterm.size + 2;
2783 	    ptr = ptr->next;
2784 	}
2785     }
2786 
2787     hp = HAlloc(p, sz);
2788     hend = hp + sz;
2789 
2790     ptr = ptr1;
2791     while(ptr != ptr2) {
2792 	if (!is_pseudo_deleted(ptr)) {
2793 	    copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p));
2794 	    list = CONS(hp, copy, list);
2795 	    hp  += 2;
2796 	}
2797 	ptr = ptr->next;
2798     }
2799     HRelease(p,hend,hp);
2800 
2801     return list;
2802 }
2803 
2804 static ERTS_INLINE int
begin_resizing(DbTableHash * tb)2805 begin_resizing(DbTableHash* tb)
2806 {
2807     if (DB_USING_FINE_LOCKING(tb))
2808 	return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
2809     else
2810         ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
2811     return 1;
2812 }
2813 
/* Release the resize token taken by begin_resizing().
** Only needed with fine grained locking; with coarse locking the
** exclusive table lock serves the same purpose.
*/
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
    if (DB_USING_FINE_LOCKING(tb))
	erts_atomic_set_relb(&tb->is_resizing, 0);
}
2820 
2821 /* Grow table with one or more new buckets.
2822 ** Allocate new segment if needed.
2823 */
static void grow(DbTableHash* tb, int nitems)
{
    HashDbTerm** pnext;
    HashDbTerm** to_pnext;
    HashDbTerm* p;
    erts_rwmtx_t* lck;
    int nactive;
    int from_ix, to_ix;
    int szm;
    int loop_limit = 5;   /* bound the work done by one caller */

    do {
        /* Only one thread at a time may resize the table. */
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (nitems <= GROW_LIMIT(nactive)) {
            goto abort; /* already done (race) */
        }

        /* Ensure that the slot nactive exists */
        if (nactive == tb->nslots) {
            /* Time to get a new segment */
            ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
            alloc_seg(tb);
        }
        ASSERT(nactive < tb->nslots);

        /* Pick the bucket to split (linear hashing order). When all
         * buckets under the current size mask are active, double the
         * mask and restart the split sequence from bucket 0. */
        szm = erts_atomic_read_nob(&tb->szm);
        if (nactive <= szm) {
            from_ix = nactive & (szm >> 1);
        } else {
            ASSERT(nactive == szm+1);
            from_ix = 0;
            szm = (szm<<1) | 1;
        }
        to_ix = nactive;

        lck = WLOCK_HASH(tb, from_ix);
        /* Source and destination slots must share the same lock. */
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
        /* Now a final double check (with the from_ix lock held)
         * that we did not get raced by a table fixer.
         */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }
        erts_atomic_set_nob(&tb->nactive, ++nactive);
        if (from_ix == 0) {
            /* Publish the doubled size mask. Release barrier with fine
             * locking so readers see nactive before the new mask. */
            if (DB_USING_FINE_LOCKING(tb))
                erts_atomic_set_relb(&tb->szm, szm);
            else
                erts_atomic_set_nob(&tb->szm, szm);
        }
        /* Resize bookkeeping done; the actual bucket split below only
         * needs the bucket lock. */
        done_resizing(tb);

        /* Finally, let's split the bucket. We try to do it in a smart way
           to keep link order and avoid unnecessary updates of next-pointers */
        pnext = &BUCKET(tb, from_ix);
        p = *pnext;
        to_pnext = &BUCKET(tb, to_ix);
        while (p != NULL) {
            if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */
                *pnext = p->next;
                free_term(tb, p);
                p = *pnext;
            }
            else {
                /* Rehash under the new mask: term stays in from_ix or
                 * moves to its buddy bucket to_ix. */
                int ix = p->hvalue & szm;
                if (ix != from_ix) {
                    ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
                    *to_pnext = p;
                    /* Swap "from" and "to": */
                    from_ix = ix;
                    to_pnext = pnext;
                }
                pnext = &p->next;
                p = *pnext;
            }
        }
        *to_pnext = NULL;
        WUNLOCK_HASH(lck);

    }while (--loop_limit && nitems > GROW_LIMIT(nactive));

    return;

abort:
    done_resizing(tb);
}
2913 
2914 
2915 /* Shrink table by joining top bucket.
2916 ** Remove top segment if it gets empty.
2917 */
static void shrink(DbTableHash* tb, int nitems)
{
    HashDbTerm** src_bp;
    HashDbTerm** dst_bp;
    HashDbTerm** bp;
    erts_rwmtx_t* lck;
    int src_ix, dst_ix, low_szm;
    int nactive;
    int loop_limit = 5;   /* bound the work done by one caller */

    do {
        /* Only one thread at a time may resize the table. */
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
            goto abort; /* already done (race) */
        }
        /* Join the top bucket (src_ix) back into its "buddy" bucket
         * (dst_ix); this is the reverse of the split done by grow(). */
        src_ix = nactive - 1;
        low_szm = erts_atomic_read_nob(&tb->szm) >> 1;
        dst_ix = src_ix & low_szm;

        ASSERT(dst_ix < src_ix);
        ASSERT(nactive > FIRST_SEGSZ);
        lck = WLOCK_HASH(tb, dst_ix);
        /* Both slots must be guarded by the same lock. */
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
        /* Double check for racing table fixers */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }

        src_bp = &BUCKET(tb, src_ix);
        dst_bp = &BUCKET(tb, dst_ix);
        bp = src_bp;

        /*
         * We join lists by appending "dst" at the end of "src"
         * as we must step through "src" anyway to purge pseudo deleted.
         */
        while(*bp != NULL) {
            if (is_pseudo_deleted(*bp)) {
                HashDbTerm* deleted = *bp;
                *bp = deleted->next;
                free_term(tb, deleted);
            } else {
                bp = &(*bp)->next;
            }
        }
        /* bp now points at the NULL tail of "src": link "dst" after it,
         * move the joined list into dst and empty the src bucket. */
        *bp = *dst_bp;
        *dst_bp = *src_bp;
        *src_bp = NULL;

        /* Deactivate the top slot. Release barrier on szm publishes the
         * halved mask only after the buckets have been joined. */
        nactive = src_ix;
        erts_atomic_set_nob(&tb->nactive, nactive);
        if (dst_ix == 0) {
            erts_atomic_set_relb(&tb->szm, low_szm);
        }
        WUNLOCK_HASH(lck);

        /* Free the top segment if it is now entirely unused
         * (segment was verified empty above, hence free_records=0). */
        if (tb->nslots - src_ix >= EXT_SEGSZ) {
            free_seg(tb, 0);
        }
        done_resizing(tb);

    } while (--loop_limit
             && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
    return;

abort:
    done_resizing(tb);
}
2989 
2990 /* Search a list of tuples for a matching key */
2991 
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
			       HashValue hval, HashDbTerm *list)
{
    HashDbTerm* p;

    /* Walk the chain; return the first live term with a matching key. */
    for (p = list; p != NULL; p = p->next) {
	if (has_live_key(tb, p, key, hval))
	    return p;
    }
    return NULL;
}
3002 
3003 
3004 /* This function is called by the next AND the select BIF */
/* It returns the next live object in a table, NULL if no more */
3006 /* In-bucket: RLOCKED */
3007 /* Out-bucket: RLOCKED unless NULL */
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
			     HashDbTerm *list)
{
    int i;

    ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));

    /* First look for a live term in the remainder of the current chain. */
    for ( ; list != NULL; list = list->next) {
	if (!is_pseudo_deleted(list))
	    return list;
    }

    /* Advance slot by slot until a live term is found or the table
     * wraps around to slot 0. next_slot() takes lck_ptr and is
     * responsible for the bucket-lock hand-over between slots. */
    i = *iptr;
    while ((i=next_slot(tb, i, lck_ptr)) != 0) {

	list = BUCKET(tb,i);
	while (list != NULL) {
	    if (!is_pseudo_deleted(list)) {
		*iptr = i;    /* report the slot where the term was found */
		return list;
	    }
	    list = list->next;
	}
    }
    /* *iptr = ??? */
    return NULL;
}
3035 
/* Look up (and possibly create) the term with the given key, preparing an
** update handle for db_finalize_dbterm_hash. If the key is absent and
** 'obj' is not THE_NON_VALUE, a copy of 'obj' (with 'key' spliced into
** the key position) is inserted. Returns 1 and leaves the bucket
** write-locked on success; returns 0 (unlocked) on a plain miss.
*/
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                      DbUpdateHandle* handle)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    HashDbTerm **bp, *b;
    erts_rwmtx_t* lck;
    int flags = 0;

    ASSERT(tb->common.status & DB_SET);

    hval = MAKE_HASH(key);
    /* The write lock is kept across the return; released in finalize. */
    lck = WLOCK_HASH(tb, hval);
    bp = &BUCKET(tb, hash_to_ix(tb, hval));
    b = *bp;

    /* Search the bucket chain for a term with a matching key. */
    for (;;) {
        if (b == NULL) {
            break;
        }
        if (has_key(tb, b, key, hval)) {
            if (!is_pseudo_deleted(b)) {
                goto Ldone;   /* live match found */
            }
            break;            /* pseudo-deleted slot; may be reused below */
        }
        bp = &b->next;
        b = *bp;
    }

    /* Key not live. THE_NON_VALUE means the caller does not want a
     * default object inserted -> report a plain miss. */
    if (obj == THE_NON_VALUE) {
        WUNLOCK_HASH(lck);
        return 0;
    }

    {
        /* Build a copy of 'obj' with 'key' written into the key position,
         * then insert it (or resurrect the pseudo-deleted slot). */
        Eterm *objp = tuple_val(obj);
        int arity = arityval(*objp);
        Eterm *htop, *hend;

        ASSERT(arity >= tb->common.keypos);
        htop = HAlloc(p, arity + 1);
        hend = htop + arity + 1;
        sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
        htop[tb->common.keypos] = key;
        obj = make_tuple(htop);

        if (b == NULL) {
            /* End of chain: append a brand new term. */
            HashDbTerm *q = new_dbterm(tb, obj);

            q->hvalue = hval;
            q->pseudo_deleted = 0;
            q->next = NULL;
            *bp = b = q;
            /* nitems is NOT incremented here; the finalize step does it
             * (DB_INC_TRY_GROW) so a failed update can undo cheaply. */
            flags |= DB_INC_TRY_GROW;
        } else {
            /* Reuse the pseudo-deleted term found at *bp. */
            HashDbTerm *q, *next = b->next;

            ASSERT(is_pseudo_deleted(b));
            q = replace_dbterm(tb, b, obj);
            q->next = next;
            ASSERT(q->hvalue == hval);
            q->pseudo_deleted = 0;
            *bp = b = q;
            erts_atomic_inc_nob(&tb->common.nitems);
        }

        HRelease(p, hend, htop);
        flags |= DB_NEW_OBJECT;
    }

Ldone:
    handle->tb = tbl;
    handle->bp = (void **)bp;
    handle->dbterm = &b->dbterm;
    handle->flags = flags;
    handle->new_size = b->dbterm.size;
    handle->lck = lck;   /* still held; finalize unlocks */
    return 1;
}
3117 
3118 /* Must be called after call to db_lookup_dbterm
3119 */
/* Commit or roll back an update prepared by db_lookup_dbterm_hash.
** cret: DB_ERROR_NONE on success; anything else rolls back an object
** that the lookup created (DB_NEW_OBJECT). Releases the bucket lock.
*/
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
{
    DbTable* tbl = handle->tb;
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp = (HashDbTerm **) handle->bp;
    HashDbTerm *b = *bp;
    erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
    HashDbTerm* free_me = NULL;

    ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck));  /* locked by db_lookup_dbterm_hash */

    ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));

    if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
        /* Update failed and the object was created by the lookup:
         * undo the insertion (pseudo-delete if the table is fixed). */
        if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
                                               0)) {
            b->pseudo_deleted = 1;
        } else {
            *bp = b->next;
            free_me = b;
        }

        WUNLOCK_HASH(lck);
        /* nitems was only bumped at lookup for the "resurrect" case;
         * a DB_INC_TRY_GROW insert was never counted, so no decrement. */
        if (!(handle->flags & DB_INC_TRY_GROW))
            erts_atomic_dec_nob(&tb->common.nitems);
        try_shrink(tb);
    } else {
        if (handle->flags & DB_MUST_RESIZE) {
            ASSERT(cret == DB_ERROR_NONE);
            db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
            free_me = b;   /* old term replaced by the resized one */
        }
        if (handle->flags & DB_INC_TRY_GROW) {
            int nactive;
            /* Now account for the object inserted by the lookup. */
            int nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
            ASSERT(cret == DB_ERROR_NONE);
            WUNLOCK_HASH(lck);
            nactive = NACTIVE(tb);

            if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
                grow(tb, nitems);
            }
        } else {
            WUNLOCK_HASH(lck);
        }
    }

    /* Free outside the bucket lock. */
    if (free_me)
        free_term(tb, free_me);

#ifdef DEBUG
    handle->dbterm = 0;
#endif
    return;
}
3176 
/* Delete all objects in the table, consuming reductions.
** Returns remaining reductions, or a negative value if it must yield.
*/
static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
    if (IS_FIXED(tbl)) {
        /* Fixed table: terms must be kept as pseudo-deleted. */
        return db_mark_all_deleted_hash(tbl, reds);
    }

    reds = db_free_table_continue_hash(tbl, reds);
    if (reds < 0)
        return reds;   /* out of reductions; caller will continue later */

    /* Everything freed; re-initialize as an empty table. */
    db_create_hash(p, tbl);
    erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
    return reds;
}
3191 
/* Apply 'func' to the off-heap data of every term in the table. */
void db_foreach_offheap_hash(DbTable *tbl,
			     void (*func)(ErlOffHeap *, void *),
			     void * arg)
{
    DbTableHash *tb = &tbl->hash;
    int nactive = NACTIVE(tb);
    int ix;

    for (ix = 0; ix < nactive; ix++) {
	HashDbTerm* term;
	for (term = BUCKET(tb, ix); term != NULL; term = term->next) {
	    /* Wrap the term's off-heap chain in a temporary ErlOffHeap,
	     * let 'func' inspect/rewrite it, then store it back. */
	    ErlOffHeap tmp_offheap;
	    tmp_offheap.first = term->dbterm.first_oh;
	    tmp_offheap.overhead = 0;
	    (*func)(&tmp_offheap, arg);
	    term->dbterm.first_oh = tmp_offheap.first;
	}
    }
}
3213 
/* Collect bucket chain-length statistics for the table. */
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
    erts_rwmtx_t* lck;
    HashDbTerm* term;
    int sum = 0;      /* total number of terms */
    int sq_sum = 0;   /* sum of squared chain lengths */
    int kept = 0;     /* pseudo-deleted terms kept due to fixation */
    int ix = 0;
    int chain_len;

    stats->min_chain_len = INT_MAX;
    stats->max_chain_len = 0;
    lck = RLOCK_HASH(tb, ix);
    do {
	chain_len = 0;
	for (term = BUCKET(tb, ix); term != NULL; term = term->next) {
	    ++chain_len;
	    if (is_pseudo_deleted(term))
		++kept;
	}
	sum += chain_len;
	sq_sum += chain_len * chain_len;
	if (chain_len < stats->min_chain_len)
	    stats->min_chain_len = chain_len;
	if (chain_len > stats->max_chain_len)
	    stats->max_chain_len = chain_len;
	ix = next_slot(tb, ix, &lck);
    } while (ix);
    stats->avg_chain_len = (float)sum / NACTIVE(tb);
    stats->std_dev_chain_len = sqrt((sq_sum - stats->avg_chain_len*sum) / NACTIVE(tb));
    /* Expected standard deviation from a good uniform hash function,
       i.e. binomial distribution (not taking the linear hashing into account) */
    stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
    stats->kept_items = kept;
}
3248 
3249 /* For testing only */
erts_ets_hash_sizeof_ext_segtab(void)3250 Eterm erts_ets_hash_sizeof_ext_segtab(void)
3251 {
3252     return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
3253 }
3254 
3255 #ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
    int i;

    /* Nothing to do for tables without fine grained bucket locks. */
    if (tb->locks == NULL)
        return;

    /* Install or remove lock-count info on every bucket lock. */
    for (i = 0; i < DB_HASH_LOCK_CNT; i++) {
        erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt;

        if (!enable) {
            erts_lcnt_uninstall(ref);
        } else {
            erts_lcnt_install_new_lock_info(ref, "db_hash_slot",
                tb->common.the_name,
                ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
        }
    }
}
3274 #endif /* ERTS_ENABLE_LOCK_COUNT */
3275