1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 1998-2018. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 ** Implementation of unordered ETS tables.
23 ** The tables are implemented as linear dynamic hash tables.
24 ** https://en.wikipedia.org/wiki/Linear_hashing
25 */
26
27 /* SMP:
28 ** The hash table supports two different locking "modes",
29 ** coarse grained and fine grained locking.
30 **
31 ** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
32 ** the right kind of lock on the entire table depending on operation (reading
33 ** or writing). No further locking is then done by the table itself.
34 **
35 ** Fine grained locking is supported by this code to allow concurrent updates
36 ** (and reading) to different parts of the table. This works by keeping one
37 ** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
38 ** rehashing buckets can be done without exclusive table lock.
39 **
40 ** A table will support fine grained locking if it is created with flag
41 ** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
42 ** if operations need to obtain fine grained locks or not. Some operations
43 ** will for example always use exclusive table lock to guarantee
44 ** a higher level of atomicity.
45 */
46
47 /* FIXATION:
48 ** Fixating the table, by ets:safe_fixtable or as done by select-operations,
49 ** guarantees two things in current implementation.
** (1) Keys will not *totally* disappear from the table. A key can thus be used
51 ** as an iterator to find the next key in iteration sequence. Note however
52 ** that this does not mean that (pointers to) table objects are guaranteed
53 ** to be maintained while the table is fixated. A BAG or DBAG may actually
54 ** remove objects as long as there is at least one object left in the table
55 ** with the same key (alive or pseudo-deleted).
56 ** (2) Objects will not be moved between buckets due to table grow/shrink.
57 ** This will guarantee that iterations do not miss keys or get double-hits.
58 **
59 ** With fine grained locking, a concurrent thread can fixate the table at any
60 ** time. A "dangerous" operation (delete or move) therefore needs to check
61 ** if the table is fixated while write-locking the bucket.
62 */
63
64 /*
65 #ifdef DEBUG
66 #define HARDDEBUG 1
67 #endif
68 */
69
70 #ifdef HAVE_CONFIG_H
71 # include "config.h"
72 #endif
73
74 #include "sys.h"
75 #include "erl_vm.h"
76 #include "global.h"
77 #include "erl_process.h"
78 #include "error.h"
79 #define ERTS_WANT_DB_INTERNAL__
80 #include "erl_db.h"
81 #include "bif.h"
82 #include "big.h"
83 #include "export.h"
84 #include "erl_binary.h"
85
86 #include "erl_db_hash.h"
87
88 /*
89 * The following symbols can be manipulated to "tune" the linear hash array
90 */
91 #define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
92 #define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
93
94 /*
95 ** We want the first mandatory segment to be small (to reduce minimal footprint)
96 ** and larger extra segments (to reduce number of alloc/free calls).
97 */
98
99 /* Number of slots in first segment */
100 #define FIRST_SEGSZ_EXP 8
101 #define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP)
102 #define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
103
104 /* Number of slots per extra segment */
105 #define EXT_SEGSZ_EXP 11
106 #define EXT_SEGSZ (1 << EXT_SEGSZ_EXP)
107 #define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
108
109 #define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
110 #define NSEG_2 256 /* Size of second segment table */
111 #define NSEG_INC 128 /* Number of segments to grow after that */
112
113 # define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)
114
115 #ifdef ETHR_ORDERED_READ_DEPEND
116 #define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))
117 #else
118 #define SEGTAB(tb) \
119 (DB_USING_FINE_LOCKING(tb) \
120 ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab)) \
121 : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)))
122 #endif
123 #define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive))
124 #define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
125
126 #define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
127
128 #define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
129
130 /*
131 * When deleting a table, the number of records to delete.
132 * Approximate number, because we must delete entire buckets.
133 */
134 #define DELETE_RECORD_LIMIT 10000
135
/* Calculate slot index from hash value.
** RLOCK_HASH or WLOCK_HASH must be done before.
*/
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
    /* 'szm' is the size mask (2^k - 1). With fine grained locking it is
     * read with acquire barrier so that the mask published by a grower
     * is observed together with its other updates. */
    Uint mask = (DB_USING_FINE_LOCKING(tb)
                 ? erts_atomic_read_acqb(&tb->szm)
                 : erts_atomic_read_nob(&tb->szm));
    Uint ix = hval & mask;
    if (ix >= erts_atomic_read_nob(&tb->nactive)) {
        /* Linear hashing: this slot has not been activated (split) yet.
         * Drop the top bit and use the "parent" slot instead. */
        ix &= mask>>1;
        ASSERT(ix < erts_atomic_read_nob(&tb->nactive));
    }
    return ix;
}
151
152
alloc_fixdel(DbTableHash * tb)153 static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb)
154 {
155 FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
156 (DbTable *) tb,
157 sizeof(FixedDeletion));
158 ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
159 return fixd;
160 }
161
free_fixdel(DbTableHash * tb,FixedDeletion * fixd)162 static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd)
163 {
164 erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
165 fixd, sizeof(FixedDeletion));
166 ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
167 }
168
link_fixdel(DbTableHash * tb,FixedDeletion * fixd,erts_aint_t fixated_by_me)169 static ERTS_INLINE int link_fixdel(DbTableHash* tb,
170 FixedDeletion* fixd,
171 erts_aint_t fixated_by_me)
172 {
173 erts_aint_t was_next;
174 erts_aint_t exp_next;
175
176 was_next = erts_atomic_read_acqb(&tb->fixdel);
177 do { /* Lockless atomic insertion in linked list: */
178 if (NFIXED(tb) <= fixated_by_me) {
179 free_fixdel(tb, fixd);
180 return 0; /* raced by unfixer */
181 }
182 exp_next = was_next;
183 fixd->next = (FixedDeletion*) exp_next;
184 was_next = erts_atomic_cmpxchg_mb(&tb->fixdel,
185 (erts_aint_t) fixd,
186 exp_next);
187 }while (was_next != exp_next);
188 return 1;
189 }
190
191 /* Remember a slot containing a pseudo-deleted item
192 * Return false if we got raced by unfixing thread
193 * and the object should be deleted for real.
194 */
add_fixed_deletion(DbTableHash * tb,int ix,erts_aint_t fixated_by_me)195 static int add_fixed_deletion(DbTableHash* tb, int ix,
196 erts_aint_t fixated_by_me)
197 {
198 FixedDeletion* fixd = alloc_fixdel(tb);
199 fixd->slot = ix;
200 fixd->all = 0;
201 return link_fixdel(tb, fixd, fixated_by_me);
202 }
203
204
is_pseudo_deleted(HashDbTerm * p)205 static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p)
206 {
207 return p->pseudo_deleted;
208 }
209
210
211 /* optimised version of make_hash (normal case? atomic key) */
212 #define MAKE_HASH(term) \
213 ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
214 make_internal_hash(term, 0)) & MAX_HASH_MASK)
215
216 # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
217 # define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
218 # define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
219
220 /* Fine grained read lock */
RLOCK_HASH(DbTableHash * tb,HashValue hval)221 static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
222 {
223 if (tb->common.is_thread_safe) {
224 return NULL;
225 } else {
226 erts_rwmtx_t* lck = GET_LOCK(tb,hval);
227 ASSERT(tb->common.type & DB_FINE_LOCKED);
228 erts_rwmtx_rlock(lck);
229 return lck;
230 }
231 }
232 /* Fine grained write lock */
WLOCK_HASH(DbTableHash * tb,HashValue hval)233 static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
234 {
235 if (tb->common.is_thread_safe) {
236 return NULL;
237 } else {
238 erts_rwmtx_t* lck = GET_LOCK(tb,hval);
239 ASSERT(tb->common.type & DB_FINE_LOCKED);
240 erts_rwmtx_rwlock(lck);
241 return lck;
242 }
243 }
244
RUNLOCK_HASH(erts_rwmtx_t * lck)245 static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck)
246 {
247 if (lck != NULL) {
248 erts_rwmtx_runlock(lck);
249 }
250 }
251
WUNLOCK_HASH(erts_rwmtx_t * lck)252 static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck)
253 {
254 if (lck != NULL) {
255 erts_rwmtx_rwunlock(lck);
256 }
257 }
258
259
260 #ifdef ERTS_ENABLE_LOCK_CHECK
261 # define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
262 # define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
263 # define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck))
264 # define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
265 #else
266 # define IS_HASH_RLOCKED(tb,hval) (1)
267 # define IS_HASH_WLOCKED(tb,hval) (1)
268 # define IS_TAB_WLOCKED(tb) (1)
269 #endif
270
271
/* Iteration helper
** Returns "next" slot index or 0 if EOT reached.
** Slot READ locks updated accordingly, unlocked if EOT.
**
** Traversal order visits all slots covered by one lock (stride of
** DB_HASH_LOCK_CNT) before moving on to the next lock, minimizing the
** number of lock/unlock operations for a full table scan.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
                                  erts_rwmtx_t** lck_ptr)
{
    ix += DB_HASH_LOCK_CNT;              /* next slot under the same lock */
    if (ix < NACTIVE(tb)) return ix;
    RUNLOCK_HASH(*lck_ptr);              /* finished with this lock */
    ix = (ix + 1) & DB_HASH_LOCK_MASK;   /* first slot of next lock; 0 == EOT */
    if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix);
    return ix;
}
/* Same as next_slot but with WRITE locking */
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
                                    erts_rwmtx_t** lck_ptr)
{
    ix += DB_HASH_LOCK_CNT;              /* next slot under the same lock */
    if (ix < NACTIVE(tb)) return ix;
    WUNLOCK_HASH(*lck_ptr);              /* finished with this lock */
    ix = (ix + 1) & DB_HASH_LOCK_MASK;   /* first slot of next lock; 0 == EOT */
    if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix);
    return ix;
}
297
298
299
free_term(DbTableHash * tb,HashDbTerm * p)300 static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
301 {
302 db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm));
303 }
304
free_term_list(DbTableHash * tb,HashDbTerm * p)305 static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p)
306 {
307 while (p) {
308 HashDbTerm* next = p->next;
309 free_term(tb, p);
310 p = next;
311 }
312 }
313
314
315 /*
316 * Local types
317 */
318 struct mp_prefound {
319 HashDbTerm** bucket;
320 int ix;
321 };
322
323 struct mp_info {
324 int something_can_match; /* The match_spec is not "impossible" */
325 int key_given;
326 struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */
327 struct mp_prefound* lists; /* Buckets to search if keys are given,
328 * = dlists initially */
329 unsigned num_lists; /* Number of elements in "lists",
330 * = 0 initially */
331 Binary *mp; /* The compiled match program */
332 };
333
334 /* A table segment */
335 struct segment {
336 HashDbTerm* buckets[1];
337 };
338 #define SIZEOF_SEGMENT(N) \
339 (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
340
341 /* An extended segment table */
342 struct ext_segtab {
343 ErtsThrPrgrLaterOp lop;
344 struct segment** prev_segtab; /* Used when table is shrinking */
345 int prev_nsegs; /* Size of prev_segtab */
346 int nsegs; /* Size of this segtab */
347 struct segment* segtab[1]; /* The segment table */
348 };
349 #define SIZEOF_EXT_SEGTAB(NSEGS) \
350 (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
351
352
SET_SEGTAB(DbTableHash * tb,struct segment ** segtab)353 static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
354 struct segment** segtab)
355 {
356 if (DB_USING_FINE_LOCKING(tb))
357 erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
358 else
359 erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
360 }
361
362 /* Used by select_replace on analyze_pattern */
363 typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
364
365 /*
366 ** Forward decl's (static functions)
367 */
368 static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
369 static void alloc_seg(DbTableHash *tb);
370 static int free_seg(DbTableHash *tb, int free_records);
371 static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
372 HashDbTerm *list);
373 static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
374 HashValue hval, HashDbTerm *list);
375 static void shrink(DbTableHash* tb, int nitems);
376 static void grow(DbTableHash* tb, int nitems);
377 static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
378 Uint sz, DbTableHash*);
379 static int analyze_pattern(DbTableHash *tb, Eterm pattern,
380 extra_match_validator_t extra_validator, /* Optional callback */
381 struct mp_info *mpi);
382
383 /*
384 * Method interface functions
385 */
386 static int db_first_hash(Process *p,
387 DbTable *tbl,
388 Eterm *ret);
389
390 static int db_next_hash(Process *p,
391 DbTable *tbl,
392 Eterm key,
393 Eterm *ret);
394
395 static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);
396
397 static int db_get_element_hash(Process *p, DbTable *tbl,
398 Eterm key, int ndex, Eterm *ret);
399
400 static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
401
402 static int db_slot_hash(Process *p, DbTable *tbl,
403 Eterm slot_term, Eterm *ret);
404
405 static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
406 Eterm pattern, Sint chunk_size,
407 int reverse, Eterm *ret);
408 static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
409 Eterm pattern, int reverse, Eterm *ret);
410 static int db_select_continue_hash(Process *p, DbTable *tbl,
411 Eterm continuation, Eterm *ret);
412
413 static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
414 Eterm pattern, Eterm *ret);
415 static int db_select_count_continue_hash(Process *p, DbTable *tbl,
416 Eterm continuation, Eterm *ret);
417
418 static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
419 Eterm pattern, Eterm *ret);
420 static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
421 Eterm continuation, Eterm *ret);
422
423 static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
424 Eterm pattern, Eterm *ret);
425 static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
426 Eterm continuation, Eterm *ret);
427
428 static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
429 static void db_print_hash(fmtfn_t to,
430 void *to_arg,
431 int show,
432 DbTable *tbl);
433 static int db_free_empty_table_hash(DbTable *tbl);
434
435 static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
436
437
438 static void db_foreach_offheap_hash(DbTable *,
439 void (*)(ErlOffHeap *, void *),
440 void *);
441
442 static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
443 #ifdef HARDDEBUG
444 static void db_check_table_hash(DbTableHash *tb);
445 #endif
446 static int
447 db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
448 DbUpdateHandle* handle);
449 static void
450 db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
451
try_shrink(DbTableHash * tb)452 static ERTS_INLINE void try_shrink(DbTableHash* tb)
453 {
454 int nactive = NACTIVE(tb);
455 int nitems = NITEMS(tb);
456 if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
457 && !IS_FIXED(tb)) {
458 shrink(tb, nitems);
459 }
460 }
461
462 /* Is this a live object (not pseodo-deleted) with the specified key?
463 */
has_live_key(DbTableHash * tb,HashDbTerm * b,Eterm key,HashValue hval)464 static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
465 Eterm key, HashValue hval)
466 {
467 if (b->hvalue != hval || is_pseudo_deleted(b))
468 return 0;
469 else {
470 Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
471 ASSERT(!is_header(itemKey));
472 return EQ(key, itemKey);
473 }
474 }
475
476 /* Has this object the specified key? Can be pseudo-deleted.
477 */
has_key(DbTableHash * tb,HashDbTerm * b,Eterm key,HashValue hval)478 static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
479 Eterm key, HashValue hval)
480 {
481 if (b->hvalue != hval)
482 return 0;
483 else {
484 Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
485 ASSERT(!is_header(itemKey));
486 return EQ(key, itemKey);
487 }
488 }
489
new_dbterm(DbTableHash * tb,Eterm obj)490 static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
491 {
492 HashDbTerm* p;
493 if (tb->common.compress) {
494 p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
495 }
496 else {
497 p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
498 }
499 return p;
500 }
501
replace_dbterm(DbTableHash * tb,HashDbTerm * old,Eterm obj)502 static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
503 Eterm obj)
504 {
505 HashDbTerm* ret;
506 ASSERT(old != NULL);
507 if (tb->common.compress) {
508 ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
509 }
510 else {
511 ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
512 }
513 return ret;
514 }
515
516
517
518 /*
519 ** External interface
520 */
521 DbTableMethod db_hash =
522 {
523 db_create_hash,
524 db_first_hash,
525 db_next_hash,
526 db_first_hash, /* last == first */
527 db_next_hash, /* prev == next */
528 db_put_hash,
529 db_get_hash,
530 db_get_element_hash,
531 db_member_hash,
532 db_erase_hash,
533 db_erase_object_hash,
534 db_slot_hash,
535 db_select_chunk_hash,
536 db_select_hash,
537 db_select_delete_hash,
538 db_select_continue_hash, /* hmm continue_hash? */
539 db_select_delete_continue_hash,
540 db_select_count_hash,
541 db_select_count_continue_hash,
542 db_select_replace_hash,
543 db_select_replace_continue_hash,
544 db_take_hash,
545 db_delete_all_objects_hash,
546 db_free_empty_table_hash,
547 db_free_table_continue_hash,
548 db_print_hash,
549 db_foreach_offheap_hash,
550 db_lookup_dbterm_hash,
551 db_finalize_dbterm_hash
552 };
553
#ifdef DEBUG
/* Wait a while to provoke race and get code coverage */
static void DEBUG_WAIT(void)
{
    /* Busy-spin; the loop count is arbitrary, just "long enough" to
     * widen race windows in debug builds. */
    unsigned long spin = 1UL << 20;
    while (--spin);
}
#else
# define DEBUG_WAIT()
#endif
564
/* Rare case of restoring the rest of the fixdel list
   when "unfixer" gets interrupted by "fixer" */
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
    /*int tries = 0;*/
    DEBUG_WAIT();
    /* Fast path: list head is still NULL, put our list back as-is. */
    if (erts_atomic_cmpxchg_relb(&tb->fixdel,
                                 (erts_aint_t) fixdel,
                                 (erts_aint_t) NULL) != (erts_aint_t) NULL) {
        /* Oboy, must join lists */
        FixedDeletion* last = fixdel;
        erts_aint_t was_tail;
        erts_aint_t exp_tail;

        /* Find the end of our list, then splice the current list head
         * after it and publish our list as the new head (CAS retry loop). */
        while (last->next != NULL) last = last->next;
        was_tail = erts_atomic_read_acqb(&tb->fixdel);
        do { /* Lockless atomic list insertion */
            exp_tail = was_tail;
            last->next = (FixedDeletion*) exp_tail;
            /*++tries;*/
            DEBUG_WAIT();
            was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel,
                                                (erts_aint_t) fixdel,
                                                exp_tail);
        }while (was_tail != exp_tail);
    }
    /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
593 /*
594 ** Table interface routines ie what's called by the bif's
595 */
596
db_unfix_table_hash(DbTableHash * tb)597 SWord db_unfix_table_hash(DbTableHash *tb)
598 {
599 FixedDeletion* fixdel;
600 SWord work = 0;
601
602 ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
603 || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock)
604 && !tb->common.is_thread_safe));
605 restart:
606 fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel,
607 (erts_aint_t) NULL);
608 while (fixdel) {
609 FixedDeletion *free_me;
610
611 do {
612 HashDbTerm **bp;
613 HashDbTerm *b;
614 HashDbTerm *free_us = NULL;
615 erts_rwmtx_t* lck;
616
617 lck = WLOCK_HASH(tb, fixdel->slot);
618
619 if (IS_FIXED(tb)) { /* interrupted by fixer */
620 WUNLOCK_HASH(lck);
621 restore_fixdel(tb,fixdel);
622 if (!IS_FIXED(tb)) {
623 goto restart; /* unfixed again! */
624 }
625 return work;
626 }
627 if (fixdel->slot < NACTIVE(tb)) {
628 bp = &BUCKET(tb, fixdel->slot);
629 b = *bp;
630
631 while (b != NULL) {
632 if (is_pseudo_deleted(b)) {
633 HashDbTerm* nxt = b->next;
634 b->next = free_us;
635 free_us = b;
636 work++;
637 b = *bp = nxt;
638 } else {
639 bp = &b->next;
640 b = b->next;
641 }
642 }
643 }
644 /* else slot has been joined and purged by shrink() */
645 WUNLOCK_HASH(lck);
646 free_term_list(tb, free_us);
647
648 }while (fixdel->all && fixdel->slot-- > 0);
649
650 free_me = fixdel;
651 fixdel = fixdel->next;
652 free_fixdel(tb, free_me);
653 work++;
654 }
655
656 /* ToDo: Maybe try grow/shrink the table as well */
657
658 return work;
659 }
660
db_create_hash(Process * p,DbTable * tbl)661 int db_create_hash(Process *p, DbTable *tbl)
662 {
663 DbTableHash *tb = &tbl->hash;
664
665 erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
666 erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
667 erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
668 erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
669 SET_SEGTAB(tb, tb->first_segtab);
670 tb->nsegs = NSEG_1;
671 tb->nslots = FIRST_SEGSZ;
672 tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
673 (DbTable *) tb,
674 SIZEOF_SEGMENT(FIRST_SEGSZ));
675 sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));
676
677 erts_atomic_init_nob(&tb->is_resizing, 0);
678 if (tb->common.type & DB_FINE_LOCKED) {
679 erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
680 int i;
681 if (tb->common.type & DB_FREQ_READ)
682 rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
683 if (erts_ets_rwmtx_spin_count >= 0)
684 rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
685 tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
686 (DbTable *) tb,
687 sizeof(DbTableHashFineLocks));
688 for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
689 erts_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
690 "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
691 }
692 /* This important property is needed to guarantee the two buckets
693 * involved in a grow/shrink operation it protected by the same lock:
694 */
695 ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
696 }
697 else { /* coarse locking */
698 tb->locks = NULL;
699 }
700 ERTS_THR_MEMORY_BARRIER;
701 return DB_ERROR_NONE;
702 }
703
db_first_hash(Process * p,DbTable * tbl,Eterm * ret)704 static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
705 {
706 DbTableHash *tb = &tbl->hash;
707 Uint ix = 0;
708 erts_rwmtx_t* lck = RLOCK_HASH(tb,ix);
709 HashDbTerm* list;
710
711 list = BUCKET(tb,ix);
712 list = next_live(tb, &ix, &lck, list);
713
714 if (list != NULL) {
715 *ret = db_copy_key(p, tbl, &list->dbterm);
716 RUNLOCK_HASH(lck);
717 }
718 else {
719 *ret = am_EOT;
720 }
721 return DB_ERROR_NONE;
722 }
723
724
db_next_hash(Process * p,DbTable * tbl,Eterm key,Eterm * ret)725 static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
726 {
727 DbTableHash *tb = &tbl->hash;
728 HashValue hval;
729 Uint ix;
730 HashDbTerm* b;
731 erts_rwmtx_t* lck;
732
733 hval = MAKE_HASH(key);
734 lck = RLOCK_HASH(tb,hval);
735 ix = hash_to_ix(tb, hval);
736 b = BUCKET(tb, ix);
737
738 for (;;) {
739 if (b == NULL) {
740 RUNLOCK_HASH(lck);
741 return DB_ERROR_BADKEY;
742 }
743 if (has_key(tb, b, key, hval)) {
744 break;
745 }
746 b = b->next;
747 }
748 /* Key found */
749
750 b = next_live(tb, &ix, &lck, b->next);
751 if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
752 while (b != 0) {
753 if (!has_live_key(tb, b, key, hval)) {
754 break;
755 }
756 b = next_live(tb, &ix, &lck, b->next);
757 }
758 }
759 if (b == NULL) {
760 *ret = am_EOT;
761 }
762 else {
763 *ret = db_copy_key(p, tbl, &b->dbterm);
764 RUNLOCK_HASH(lck);
765 }
766 return DB_ERROR_NONE;
767 }
768
db_put_hash(DbTable * tbl,Eterm obj,int key_clash_fail)769 int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
770 {
771 DbTableHash *tb = &tbl->hash;
772 HashValue hval;
773 int ix;
774 Eterm key;
775 HashDbTerm** bp;
776 HashDbTerm* b;
777 HashDbTerm* q;
778 erts_rwmtx_t* lck;
779 int nitems;
780 int ret = DB_ERROR_NONE;
781
782 key = GETKEY(tb, tuple_val(obj));
783 hval = MAKE_HASH(key);
784 lck = WLOCK_HASH(tb, hval);
785 ix = hash_to_ix(tb, hval);
786 bp = &BUCKET(tb, ix);
787 b = *bp;
788
789 for (;;) {
790 if (b == NULL) {
791 goto Lnew;
792 }
793 if (has_key(tb,b,key,hval)) {
794 break;
795 }
796 bp = &b->next;
797 b = b->next;
798 }
799 /* Key found
800 */
801 if (tb->common.status & DB_SET) {
802 HashDbTerm* bnext = b->next;
803 if (is_pseudo_deleted(b)) {
804 erts_atomic_inc_nob(&tb->common.nitems);
805 b->pseudo_deleted = 0;
806 }
807 else if (key_clash_fail) {
808 ret = DB_ERROR_BADKEY;
809 goto Ldone;
810 }
811 q = replace_dbterm(tb, b, obj);
812 q->next = bnext;
813 ASSERT(q->hvalue == hval);
814 *bp = q;
815 goto Ldone;
816 }
817 else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
818 q = b;
819 do {
820 if (!is_pseudo_deleted(q)) {
821 ret = DB_ERROR_BADKEY;
822 goto Ldone;
823 }
824 q = q->next;
825 }while (q != NULL && has_key(tb,q,key,hval));
826 }
827 else if (tb->common.status & DB_BAG) {
828 HashDbTerm** qp = bp;
829 q = b;
830 do {
831 if (db_eq(&tb->common,obj,&q->dbterm)) {
832 if (is_pseudo_deleted(q)) {
833 erts_atomic_inc_nob(&tb->common.nitems);
834 q->pseudo_deleted = 0;
835 ASSERT(q->hvalue == hval);
836 if (q != b) { /* must move to preserve key insertion order */
837 *qp = q->next;
838 q->next = b;
839 *bp = q;
840 }
841 }
842 goto Ldone;
843 }
844 qp = &q->next;
845 q = *qp;
846 }while (q != NULL && has_key(tb,q,key,hval));
847 }
848 /*else DB_DUPLICATE_BAG */
849
850 Lnew:
851 q = new_dbterm(tb, obj);
852 q->hvalue = hval;
853 q->pseudo_deleted = 0;
854 q->next = b;
855 *bp = q;
856 nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
857 WUNLOCK_HASH(lck);
858 {
859 int nactive = NACTIVE(tb);
860 if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
861 grow(tb, nitems);
862 }
863 }
864 return DB_ERROR_NONE;
865
866 Ldone:
867 WUNLOCK_HASH(lck);
868 return ret;
869 }
870
/* Build a list (on p's heap) of all live objects starting at 'b1' with
 * the same key (one object for sets, possibly many for bags).
 * If 'bend' is non-NULL, *bend is set to the first object past the key
 * group. Caller must hold the bucket lock.
 */
static Eterm
get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
              HashDbTerm *b1, HashDbTerm **bend)
{
    HashDbTerm* b2 = b1->next;
    Eterm copy;
    Uint sz = b1->dbterm.size + 2;    /* +2 heap words per cons cell */

    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
        /* Pre-compute total heap need over the whole key group. */
        while (b2 && has_key(tb, b2, key, hval)) {
            if (!is_pseudo_deleted(b2))
                sz += b2->dbterm.size + 2;

            b2 = b2->next;
        }
    }
    copy = build_term_list(p, b1, b2, sz, tb);
    if (bend) {
        *bend = b2;
    }
    return copy;
}
893
db_get_hash(Process * p,DbTable * tbl,Eterm key,Eterm * ret)894 int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
895 {
896 DbTableHash *tb = &tbl->hash;
897 HashValue hval;
898 int ix;
899 HashDbTerm* b;
900 erts_rwmtx_t* lck;
901
902 hval = MAKE_HASH(key);
903 lck = RLOCK_HASH(tb,hval);
904 ix = hash_to_ix(tb, hval);
905 b = BUCKET(tb, ix);
906
907 while(b != 0) {
908 if (has_live_key(tb, b, key, hval)) {
909 *ret = get_term_list(p, tb, key, hval, b, NULL);
910 goto done;
911 }
912 b = b->next;
913 }
914 *ret = NIL;
915 done:
916 RUNLOCK_HASH(lck);
917 return DB_ERROR_NONE;
918 }
919
db_member_hash(DbTable * tbl,Eterm key,Eterm * ret)920 static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
921 {
922 DbTableHash *tb = &tbl->hash;
923 HashValue hval;
924 int ix;
925 HashDbTerm* b1;
926 erts_rwmtx_t* lck;
927
928 hval = MAKE_HASH(key);
929 ix = hash_to_ix(tb, hval);
930 lck = RLOCK_HASH(tb, hval);
931 b1 = BUCKET(tb, ix);
932
933 while(b1 != 0) {
934 if (has_live_key(tb,b1,key,hval)) {
935 *ret = am_true;
936 goto done;
937 }
938 b1 = b1->next;
939 }
940 *ret = am_false;
941 done:
942 RUNLOCK_HASH(lck);
943 return DB_ERROR_NONE;
944 }
945
db_get_element_hash(Process * p,DbTable * tbl,Eterm key,int ndex,Eterm * ret)946 static int db_get_element_hash(Process *p, DbTable *tbl,
947 Eterm key,
948 int ndex,
949 Eterm *ret)
950 {
951 DbTableHash *tb = &tbl->hash;
952 HashValue hval;
953 int ix;
954 HashDbTerm* b1;
955 erts_rwmtx_t* lck;
956 int retval;
957
958 hval = MAKE_HASH(key);
959 lck = RLOCK_HASH(tb, hval);
960 ix = hash_to_ix(tb, hval);
961 b1 = BUCKET(tb, ix);
962
963
964 while(b1 != 0) {
965 if (has_live_key(tb,b1,key,hval)) {
966 if (ndex > arityval(b1->dbterm.tpl[0])) {
967 retval = DB_ERROR_BADITEM;
968 goto done;
969 }
970 if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
971 HashDbTerm* b;
972 HashDbTerm* b2 = b1->next;
973 Eterm elem_list = NIL;
974
975 while(b2 != NULL && has_key(tb,b2,key,hval)) {
976 if (ndex > arityval(b2->dbterm.tpl[0])
977 && !is_pseudo_deleted(b2)) {
978 retval = DB_ERROR_BADITEM;
979 goto done;
980 }
981 b2 = b2->next;
982 }
983 b = b1;
984 while(b != b2) {
985 if (!is_pseudo_deleted(b)) {
986 Eterm *hp;
987 Eterm copy = db_copy_element_from_ets(&tb->common, p,
988 &b->dbterm, ndex, &hp, 2);
989 elem_list = CONS(hp, copy, elem_list);
990 }
991 b = b->next;
992 }
993 *ret = elem_list;
994 }
995 else {
996 Eterm* hp;
997 *ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
998 }
999 retval = DB_ERROR_NONE;
1000 goto done;
1001 }
1002 b1 = b1->next;
1003 }
1004 retval = DB_ERROR_BADKEY;
1005 done:
1006 RUNLOCK_HASH(lck);
1007 return retval;
1008 }
1009
1010 /*
1011 ** NB, this is for the db_erase/2 bif.
1012 */
db_erase_hash(DbTable * tbl,Eterm key,Eterm * ret)1013 int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
1014 {
1015 DbTableHash *tb = &tbl->hash;
1016 HashValue hval;
1017 int ix;
1018 HashDbTerm** bp;
1019 HashDbTerm* b;
1020 HashDbTerm* free_us = NULL;
1021 erts_rwmtx_t* lck;
1022 int nitems_diff = 0;
1023
1024 hval = MAKE_HASH(key);
1025 lck = WLOCK_HASH(tb,hval);
1026 ix = hash_to_ix(tb, hval);
1027 bp = &BUCKET(tb, ix);
1028 b = *bp;
1029
1030 while(b != 0) {
1031 if (has_live_key(tb,b,key,hval)) {
1032 --nitems_diff;
1033 if (nitems_diff == -1 && IS_FIXED(tb)
1034 && add_fixed_deletion(tb, ix, 0)) {
1035 /* Pseudo remove (no need to keep several of same key) */
1036 b->pseudo_deleted = 1;
1037 } else {
1038 HashDbTerm* next = b->next;
1039 b->next = free_us;
1040 free_us = b;
1041 b = *bp = next;
1042 continue;
1043 }
1044 }
1045 else {
1046 if (nitems_diff && !is_pseudo_deleted(b))
1047 break;
1048 }
1049 bp = &b->next;
1050 b = b->next;
1051 }
1052 WUNLOCK_HASH(lck);
1053 if (nitems_diff) {
1054 erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
1055 try_shrink(tb);
1056 }
1057 free_term_list(tb, free_us);
1058 *ret = am_true;
1059 return DB_ERROR_NONE;
1060 }
1061
1062 /*
1063 ** This is for the ets:delete_object BIF
1064 */
db_erase_object_hash(DbTable * tbl,Eterm object,Eterm * ret)1065 static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
1066 {
1067 DbTableHash *tb = &tbl->hash;
1068 HashValue hval;
1069 int ix;
1070 HashDbTerm** bp;
1071 HashDbTerm* b;
1072 HashDbTerm* free_us = NULL;
1073 erts_rwmtx_t* lck;
1074 int nitems_diff = 0;
1075 int nkeys = 0;
1076 Eterm key;
1077
1078 key = GETKEY(tb, tuple_val(object));
1079 hval = MAKE_HASH(key);
1080 lck = WLOCK_HASH(tb,hval);
1081 ix = hash_to_ix(tb, hval);
1082 bp = &BUCKET(tb, ix);
1083 b = *bp;
1084
1085 while(b != 0) {
1086 if (has_live_key(tb,b,key,hval)) {
1087 ++nkeys;
1088 if (db_eq(&tb->common,object, &b->dbterm)) {
1089 --nitems_diff;
1090 if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
1091 b->pseudo_deleted = 1;
1092 bp = &b->next;
1093 b = b->next;
1094 } else {
1095 HashDbTerm* next = b->next;
1096 b->next = free_us;
1097 free_us = b;
1098 b = *bp = next;
1099 }
1100 if (tb->common.status & (DB_DUPLICATE_BAG)) {
1101 continue;
1102 } else {
1103 break;
1104 }
1105 }
1106 }
1107 else if (nitems_diff && !is_pseudo_deleted(b)) {
1108 break;
1109 }
1110 bp = &b->next;
1111 b = b->next;
1112 }
1113 WUNLOCK_HASH(lck);
1114 if (nitems_diff) {
1115 erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
1116 try_shrink(tb);
1117 }
1118 free_term_list(tb, free_us);
1119 *ret = am_true;
1120 return DB_ERROR_NONE;
1121 }
1122
1123
/* ets:slot/2 for hash tables: return the list of objects stored in the
 * given bucket, am_EOT for the slot one past the last active bucket, or
 * DB_ERROR_BADPARAM for anything else. */
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    erts_rwmtx_t* lck;
    Sint slot;
    int retval = DB_ERROR_BADPARAM;
    int nactive;

    /* The slot argument must be a non-negative small integer. */
    if (is_not_small(slot_term))
        return DB_ERROR_BADPARAM;
    slot = signed_val(slot_term);
    if (slot < 0)
        return DB_ERROR_BADPARAM;

    lck = RLOCK_HASH(tb, slot);
    nactive = NACTIVE(tb);
    if (slot < nactive) {
        /* Build a list of all objects in this bucket. */
        *ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb);
        retval = DB_ERROR_NONE;
    }
    else if (slot == nactive) {
        /* One past the last active slot signals end-of-table. */
        *ret = am_EOT;
        retval = DB_ERROR_NONE;
    }
    RUNLOCK_HASH(lck);
    return retval;
}
1151
1152
1153 /*
1154 * Match traversal callbacks
1155 */
1156
/* Callback interface driving the generic match traversal
 * (match_traverse / match_traverse_continue). Each select-family
 * operation embeds this struct first in its own context struct and
 * downcasts the 'ctx' argument back to that context. */
typedef struct match_callbacks_t_ match_callbacks_t;
struct match_callbacks_t_
{
    /* Called when no match is possible.
     * ctx: Pointer to context
     * ret: Pointer to traversal function term return.
     *
     * Both the direct return value and 'ret' are used as the traversal function return values.
     */
    int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret);

    /* Called for each match result.
     * ctx: Pointer to context
     * slot_ix: Current slot index
     * current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
     *                  can be (carefully) used to adjust iteration when deleting or replacing elements.
     * match_res: The result of running the match program against the current term.
     *
     * Should return 1 for successful match, 0 otherwise.
     */
    int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix,
                        HashDbTerm*** current_ptr_ptr, Eterm match_res);

    /* Called when either we've matched enough elements in this cycle or EOT was reached.
     * ctx: Pointer to context
     * slot_ix: Current slot index
     * got: How many elements have been matched so far
     * iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
     * mpp: Double pointer to the compiled match program
     * ret: Pointer to traversal function term return.
     *
     * Both the direct return value and 'ret' are used as the traversal function return values.
     * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
     */
    int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got,
                         Sint iterations_left, Binary** mpp, Eterm* ret);

    /* Called when it's time to trap
     * ctx: Pointer to context
     * slot_ix: Current slot index
     * got: How many elements have been matched so far
     * mpp: Double pointer to the compiled match program
     * ret: Pointer to traversal function term return.
     *
     * Both the direct return value and 'ret' are used as the traversal function return values.
     * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
     */
    int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp,
                   Eterm* ret);

};
1208
1209
1210 /*
1211 * Begin hash table match traversal
1212 */
/* Begin a hash table match traversal: compile/analyze 'pattern', then walk
 * the table (or only the pre-found buckets when the key is bound) running
 * the match program against each live term, dispatching results through
 * the 'ctx' callbacks. Traps via ctx->on_trap when iterations_left runs
 * out. Returns a DB_ERROR_* code; the Eterm result goes out via 'ret'. */
static int match_traverse(Process* p, DbTableHash* tb,
                          Eterm pattern,
                          extra_match_validator_t extra_match_validator, /* Optional */
                          Sint chunk_size,      /* If 0, no chunking */
                          Sint iterations_left, /* Nr. of iterations left */
                          Eterm** hpp,          /* Heap */
                          int lock_for_write,   /* Set to 1 if we're going to delete or
                                                   modify existing terms */
                          match_callbacks_t* ctx,
                          Eterm* ret)
{
    Sint slot_ix;                  /* Slot index */
    HashDbTerm** current_ptr;      /* Refers to either the bucket pointer or
                                    * the 'next' pointer in the previous term
                                    */
    HashDbTerm* saved_current;     /* Helper to avoid double skip on match */
    struct mp_info mpi;
    unsigned current_list_pos = 0; /* Prefound buckets list index */
    Eterm match_res;
    Sint got = 0;                  /* Matched terms counter */
    erts_rwmtx_t* lck;             /* Slot lock */
    int ret_value;
    /* Select read or write flavors of locking/iteration helpers up front,
     * so the traversal loop itself is flavor-agnostic. */
    erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
        = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
    void (*unlock_hash_function)(erts_rwmtx_t*)
        = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
    Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**)
        = (lock_for_write ? next_slot_w : next_slot);

    if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
            != DB_ERROR_NONE)
    {
        *ret = NIL;
        goto done;
    }

    if (!mpi.something_can_match) {
        /* Can't possibly match anything */
        ret_value = ctx->on_nothing_can_match(ctx, ret);
        goto done;
    }

    /*
     * Look for initial slot / bucket
     */
    if (!mpi.key_given) {
        /* Run this code if pattern is variable or GETKEY(pattern)  */
        /* is a variable                                            */
        slot_ix = 0;
        lck = lock_hash_function(tb,slot_ix);
        for (;;) {
            ASSERT(slot_ix < NACTIVE(tb));
            if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
                break;
            }
            slot_ix = next_slot_function(tb,slot_ix,&lck);
            if (slot_ix == 0) {
                /* Wrapped around: the whole table is empty */
                ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left,
                                               &mpi.mp, ret);
                goto done;
            }
        }
    } else {
        /* We have at least one */
        slot_ix = mpi.lists[current_list_pos].ix;
        lck = lock_hash_function(tb, slot_ix);
        current_ptr = mpi.lists[current_list_pos].bucket;
        ASSERT(*current_ptr == BUCKET(tb,slot_ix));
        ++current_list_pos;
    }

    /*
     * Execute traversal cycle
     */
    for(;;) {
        if (*current_ptr != NULL) {
            if (!is_pseudo_deleted(*current_ptr)) {
                DbTerm* obj = &(*current_ptr)->dbterm;
                if (tb->common.compress)
                    obj = db_alloc_tmp_uncompressed(&tb->common, obj);
                match_res = db_match_dbterm_uncompressed(&tb->common, p, mpi.mp,
                                                         obj, hpp, 2);
                saved_current = *current_ptr;
                if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
                    ++got;
                }
                if (tb->common.compress)
                    db_free_tmp_uncompressed(obj);

                --iterations_left;
                if (*current_ptr != saved_current) {
                    /* Don't advance to next, the callback did it already */
                    continue;
                }
            }
            current_ptr = &((*current_ptr)->next);
        }
        else if (mpi.key_given) {  /* Key is bound */
            unlock_hash_function(lck);
            if (current_list_pos == mpi.num_lists) {
                ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret);
                goto done;
            } else {
                slot_ix = mpi.lists[current_list_pos].ix;
                lck = lock_hash_function(tb, slot_ix);
                current_ptr = mpi.lists[current_list_pos].bucket;
                ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
                ++current_list_pos;
            }
        }
        else { /* Key is variable */
            if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
                slot_ix = -1; /* EOT */
                break;
            }
            if (chunk_size && got >= chunk_size) {
                unlock_hash_function(lck);
                break;
            }
            if (iterations_left <= 0) {
                unlock_hash_function(lck);
                ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret);
                goto done;
            }
            current_ptr = &BUCKET(tb,slot_ix);
        }
    }

    ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret);

done:
    /* We should only jump directly to this label if
     * we've already called ctx->nothing_can_match / loop_ended / trap
     */
    if (mpi.mp != NULL) {
        /* NULLed by callbacks that transfer ownership (e.g. on trap) */
        erts_bin_free(mpi.mp);
    }
    if (mpi.lists != mpi.dlists) {
        erts_free(ERTS_ALC_T_DB_SEL_LIST,
                  (void *) mpi.lists);
    }
    return ret_value;

}
1357
1358 /*
1359 * Continue hash table match traversal
1360 */
/* Continue a previously trapped hash table match traversal from 'slot_ix'
 * with an already-compiled match program. Same callback protocol as
 * match_traverse, except on_nothing_can_match is never invoked here
 * (pattern analysis happened in the initial call). */
static int match_traverse_continue(Process* p, DbTableHash* tb,
                                   Sint chunk_size,      /* If 0, no chunking */
                                   Sint iterations_left, /* Nr. of iterations left */
                                   Eterm** hpp,          /* Heap */
                                   Sint slot_ix,         /* Slot index to resume traversal from */
                                   Sint got,             /* Matched terms counter */
                                   Binary** mpp,         /* Existing match program */
                                   int lock_for_write,   /* Set to 1 if we're going to delete or
                                                            modify existing terms */
                                   match_callbacks_t* ctx,
                                   Eterm* ret)
{
    HashDbTerm** current_ptr;  /* Refers to either the bucket pointer or
                                * the 'next' pointer in the previous term
                                */
    HashDbTerm* saved_current; /* Helper to avoid double skip on match */
    Eterm match_res;
    erts_rwmtx_t* lck;
    int ret_value;
    /* Select read or write flavors of locking/iteration helpers up front */
    erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
        = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
    void (*unlock_hash_function)(erts_rwmtx_t*)
        = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
    Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr)
        = (lock_for_write ? next_slot_w : next_slot);

    if (got < 0) {
        /* Malformed continuation */
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    if (slot_ix < 0 /* EOT */
        || (chunk_size && got >= chunk_size))
    {
        /* Already got all or enough in the match_list */
        ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
        goto done;
    }

    lck = lock_hash_function(tb, slot_ix);
    if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
        unlock_hash_function(lck);
        *ret = NIL;
        ret_value = DB_ERROR_BADPARAM;
        goto done;
    }

    /*
     * Resume traversal cycle from where we left
     */
    current_ptr = &BUCKET(tb,slot_ix);
    for(;;) {
        if (*current_ptr != NULL) {
            if (!is_pseudo_deleted(*current_ptr)) {
                DbTerm* obj = &(*current_ptr)->dbterm;
                if (tb->common.compress)
                    obj = db_alloc_tmp_uncompressed(&tb->common, obj);
                match_res = db_match_dbterm_uncompressed(&tb->common, p, *mpp,
                                                         obj, hpp, 2);
                saved_current = *current_ptr;
                if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
                    ++got;
                }
                if (tb->common.compress)
                    db_free_tmp_uncompressed(obj);

                --iterations_left;
                if (*current_ptr != saved_current) {
                    /* Don't advance to next, the callback did it already */
                    continue;
                }
            }
            current_ptr = &((*current_ptr)->next);
        }
        else {
            if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
                slot_ix = -1; /* EOT */
                break;
            }
            if (chunk_size && got >= chunk_size) {
                unlock_hash_function(lck);
                break;
            }
            if (iterations_left <= 0) {
                unlock_hash_function(lck);
                ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret);
                goto done;
            }
            current_ptr = &BUCKET(tb,slot_ix);
        }
    }

    ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);

done:
    /* We should only jump directly to this label if
     * we've already called on_loop_ended / on_trap
     */
    return ret_value;

}
1462
1463
1464 /*
1465 * Common traversal trapping/continuation code;
1466 * used by select_count, select_delete and select_replace,
1467 * as well as their continuation-handling counterparts.
1468 */
1469
/* Pack state into a {Tid, SlotIx, MatchProgRef, Got} continuation tuple and
 * prepare a trap to 'trap_function'. Shared by select_count, select_delete
 * and select_replace. On the first trap (prev_continuation_tptr == NULL)
 * the match program is wrapped in a magic ref and *mpp is NULLed so the
 * caller won't free it; on later traps the existing ref is reused. */
static ERTS_INLINE int on_simple_trap(Export* trap_function,
                                      Process* p,
                                      DbTableHash* tb,
                                      Eterm tid,
                                      Eterm* prev_continuation_tptr,
                                      Sint slot_ix,
                                      Sint got,
                                      Binary** mpp,
                                      Eterm* ret)
{
    Eterm* hp;
    Eterm egot;
    Eterm mpb;
    Eterm continuation;
    int is_first_trap = (prev_continuation_tptr == NULL);
    /* Extra heap for the magic ref is only needed the first time */
    size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);

    BUMP_ALL_REDS(p);
    if (IS_USMALL(0, got)) {
	hp = HAllocX(p,  base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE);
	egot = make_small(got);
    }
    else {
	/* 'got' does not fit a small; build a bignum on the heap */
	hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5,
                     ERTS_MAGIC_REF_THING_SIZE);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }

    if (is_first_trap) {
        if (is_atom(tid))
            tid = erts_db_make_tid(p, &tb->common);
        mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
        *mpp = NULL; /* otherwise the caller will destroy it */
    }
    else {
        ASSERT(!is_atom(tid));
        mpb = prev_continuation_tptr[3];
    }

    continuation = TUPLE4(
            hp,
            tid,
            make_small(slot_ix),
            mpb,
            egot);
    ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation);
    return DB_ERROR_NONE;
}
1519
unpack_simple_continuation(Eterm continuation,Eterm ** tptr_ptr,Eterm * tid_ptr,Sint * slot_ix_p,Binary ** mpp,Sint * got_p)1520 static ERTS_INLINE int unpack_simple_continuation(Eterm continuation,
1521 Eterm** tptr_ptr,
1522 Eterm* tid_ptr,
1523 Sint* slot_ix_p,
1524 Binary** mpp,
1525 Sint* got_p)
1526 {
1527 Eterm* tptr;
1528 ASSERT(is_tuple(continuation));
1529 tptr = tuple_val(continuation);
1530 if (arityval(*tptr) != 4)
1531 return 1;
1532
1533 if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
1534 return 1;
1535 }
1536
1537 *tptr_ptr = tptr;
1538 *tid_ptr = tptr[1];
1539 *slot_ix_p = unsigned_val(tptr[2]);
1540 *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
1541 if (is_big(tptr[4])) {
1542 *got_p = big_to_uint32(tptr[4]);
1543 }
1544 else {
1545 *got_p = unsigned_val(tptr[4]);
1546 }
1547 return 0;
1548 }
1549
1550
1551 /*
1552 *
1553 * select / select_chunk match traversal
1554 *
1555 */
1556
1557 #define MAX_SELECT_CHUNK_ITERATIONS 1000
1558
/* Context for the select/select_chunk traversal callbacks. */
typedef struct {
    match_callbacks_t base;         /* must be first: callbacks downcast from it */
    Process* p;                     /* calling process */
    DbTableHash* tb;
    Eterm tid;                      /* table identifier term */
    Eterm* hp;                      /* heap pointer used to CONS up results */
    Sint chunk_size;                /* 0 means plain (unchunked) select */
    Eterm match_list;               /* accumulated list of match results */
    Eterm* prev_continuation_tptr;  /* NULL on first call, else prior continuation tuple */
} select_chunk_context_t;
1569
/* Empty result when the pattern cannot possibly match: a chunked select
 * reports end-of-table, a plain select returns the empty list. */
static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret)
{
    select_chunk_context_t* c = (select_chunk_context_t*) ctx_base;

    if (c->chunk_size > 0)
        *ret = am_EOT;
    else
        *ret = NIL;
    return DB_ERROR_NONE;
}
1576
/* Collect one match result: cons it onto the accumulated match list.
 * Returns 1 when the term matched (THE_NON_VALUE means no match). */
static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    select_chunk_context_t* c = (select_chunk_context_t*) ctx_base;

    if (!is_value(match_res))
        return 0;
    c->match_list = CONS(c->hp, match_res, c->match_list);
    return 1;
}
1588
/* Finish (or partially finish) a select/select_chunk cycle. For chunked
 * selects the accumulated match_list may hold more than chunk_size results;
 * the surplus is split off into 'rest' and carried in the continuation so
 * the next call can return it without re-traversing. */
static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm mpb;

    if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
        /* We didn't get to iterate a single time, which means EOT */
        ASSERT(ctx->match_list == NIL);
        *ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
        return DB_ERROR_NONE;
    }
    else {
        ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
        BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
        if (ctx->chunk_size) {
            Eterm continuation;
            Eterm rest = NIL;
            Sint rest_size = 0;

            if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */
                Eterm tmp = ctx->match_list;
                rest = ctx->match_list;
                while (got-- > ctx->chunk_size + 1) {
                    tmp = CDR(list_val(tmp));
                    ++rest_size;
                }
                ++rest_size;
                ctx->match_list = CDR(list_val(tmp));
                CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
                                             been in 'user space' */
            }
            if (rest != NIL || slot_ix >= 0) { /* Need more calls */
                Eterm tid = ctx->tid;
                /* 3 (2-tuple) + 7 (6-tuple) + room for the magic ref */
                ctx->hp = HAllocX(ctx->p,
                                  3 + 7 + ERTS_MAGIC_REF_THING_SIZE,
                                  ERTS_MAGIC_REF_THING_SIZE);
                mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp);
                if (is_atom(tid))
                    tid = erts_db_make_tid(ctx->p,
                                           &ctx->tb->common);
                continuation = TUPLE6(
                        ctx->hp,
                        tid,
                        make_small(slot_ix),
                        make_small(ctx->chunk_size),
                        mpb, rest,
                        make_small(rest_size));
                *mpp = NULL; /* Otherwise the caller will destroy it */
                ctx->hp += 7;
                *ret = TUPLE2(ctx->hp, ctx->match_list, continuation);
                return DB_ERROR_NONE;
            } else { /* All data is exhausted */
                if (ctx->match_list != NIL) { /* No more data to search but still a
                                                 result to return to the caller */
                    ctx->hp = HAlloc(ctx->p, 3);
                    *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT);
                    return DB_ERROR_NONE;
                } else { /* Reached the end of the table with no data to return */
                    *ret = am_EOT;
                    return DB_ERROR_NONE;
                }
            }
        }
        /* Unchunked select: just return the whole match list */
        *ret = ctx->match_list;
        return DB_ERROR_NONE;
    }
}
1659
/* Trap handler for select/select_chunk: pack the traversal state into a
 * 6-tuple {Tid, SlotIx, ChunkSize, MatchProgRef, MatchList, Got} and trap
 * to ets_select_continue_exp. On the first trap, ownership of the match
 * program moves into a magic ref (*mpp is NULLed); on later traps the
 * terms from the previous continuation are reused. */
static int select_chunk_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm mpb;
    Eterm continuation;
    Eterm* hp;

    BUMP_ALL_REDS(ctx->p);

    if (ctx->prev_continuation_tptr == NULL) {
        Eterm tid = ctx->tid;
        /* First time we're trapping */
        hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE,
                     ERTS_MAGIC_REF_THING_SIZE);
        if (is_atom(tid))
            tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
        mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp);
        continuation = TUPLE6(
                hp,
                tid,
                make_small(slot_ix),
                make_small(ctx->chunk_size),
                mpb,
                ctx->match_list,
                make_small(got));
        *mpp = NULL; /* otherwise the caller will destroy it */
    }
    else {
        /* Not the first time we're trapping; reuse continuation terms */
        hp = HAlloc(ctx->p, 7);
        continuation = TUPLE6(
                hp,
                ctx->prev_continuation_tptr[1],
                make_small(slot_ix),
                ctx->prev_continuation_tptr[3],
                ctx->prev_continuation_tptr[4],
                ctx->match_list,
                make_small(got));
    }
    ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p,
                        continuation);
    return DB_ERROR_NONE;
}
1705
/* ets:select/2 for hash tables: an unchunked select_chunk (chunk_size 0).
 * 'reverse' is passed along for interface compatibility; hash tables are
 * unordered so it has no observable effect here. */
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern,
                          int reverse, Eterm *ret)
{
    return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
}
1711
/* Start a select/select_chunk traversal (read-locked, no term mutation).
 * chunk_size == 0 means no chunking; 'reverse' is unused for unordered
 * hash tables. Returns a DB_ERROR_* code, result term via 'ret'. */
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Sint chunk_size,
                                int reverse, Eterm *ret)
{
    select_chunk_context_t ctx;

    ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match;
    ctx.base.on_match_res = select_chunk_on_match_res;
    ctx.base.on_loop_ended = select_chunk_on_loop_ended;
    /* Fixed: this assignment previously ended with ',' (comma operator)
     * instead of ';' - harmless by accident, but a typo. */
    ctx.base.on_trap = select_chunk_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = NIL;
    ctx.prev_continuation_tptr = NULL;

    return match_traverse(
            ctx.p, ctx.tb,
            pattern, NULL,
            ctx.chunk_size,
            MAX_SELECT_CHUNK_ITERATIONS,
            &ctx.hp, 0,
            &ctx.base, ret);
}
1738
1739 /*
1740 *
1741 * select_continue match traversal
1742 *
1743 */
1744
/* Loop-end handler for a *continued* select traversal. Differs from
 * select_chunk_on_loop_ended in that the surplus beyond chunk_size must be
 * copied (not destructively split): the accumulated list may contain cells
 * from a previous continuation that the user process can already see. */
static
int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base,
                                        Sint slot_ix, Sint got,
                                        Sint iterations_left, Binary** mpp,
                                        Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm continuation;
    Eterm rest = NIL;
    Eterm* hp;

    ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
    if (ctx->chunk_size) {
        Sint rest_size = 0;
        if (got > ctx->chunk_size) {
            /* Cannot write destructively here,
               the list may have
               been in user space */
            hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2);
            while (got-- > ctx->chunk_size) {
                rest = CONS(hp, CAR(list_val(ctx->match_list)), rest);
                hp += 2;
                ctx->match_list = CDR(list_val(ctx->match_list));
                ++rest_size;
            }
        }
        if (rest != NIL || slot_ix >= 0) {
            /* More to do: hand back {MatchList, Continuation} */
            hp = HAlloc(ctx->p, 3 + 7);
            continuation = TUPLE6(
                    hp,
                    ctx->prev_continuation_tptr[1],
                    make_small(slot_ix),
                    ctx->prev_continuation_tptr[3],
                    ctx->prev_continuation_tptr[4],
                    rest,
                    make_small(rest_size));
            hp += 7;
            *ret = TUPLE2(hp, ctx->match_list, continuation);
            return DB_ERROR_NONE;
        } else {
            if (ctx->match_list != NIL) {
                /* Last batch: {MatchList, '$end_of_table'} */
                hp = HAlloc(ctx->p, 3);
                *ret = TUPLE2(hp, ctx->match_list, am_EOT);
                return DB_ERROR_NONE;
            } else {
                *ret = am_EOT;
                return DB_ERROR_NONE;
            }
        }
    }
    /* Unchunked select: return everything gathered so far */
    *ret = ctx->match_list;
    return DB_ERROR_NONE;
}
1799
1800 /*
1801 * This is called when select traps
1802 */
/* Resume a trapped ets:select - validate and unpack the 6-tuple
 * continuation {Tid, SlotIx, ChunkSize, MatchProgRef, MatchList, Got}
 * built by select_chunk_on_trap, then continue the traversal.
 * Note: base.on_nothing_can_match is deliberately left unset; it is never
 * called by match_traverse_continue. */
static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation,
                                   Eterm* ret)
{
    select_chunk_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;
    Sint chunk_size;
    Eterm match_list;
    Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;

    /* Decode continuation. We know it's a tuple but not the arity or anything else */
    ASSERT(is_tuple(continuation));
    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 6)
        goto badparam;

    if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
        !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
        goto badparam;
    if ((chunk_size = signed_val(tptr[3])) < 0)
        goto badparam;

    mp = erts_db_get_match_prog_binary(tptr[4]);
    if (mp == NULL)
        goto badparam;

    if ((got = signed_val(tptr[6])) < 0)
        goto badparam;

    tid = tptr[1];
    slot_ix = signed_val(tptr[2]);
    match_list = tptr[5];

    /* Proceed */
    ctx.base.on_match_res = select_chunk_on_match_res;
    ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended;
    ctx.base.on_trap = select_chunk_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = match_list;
    ctx.prev_continuation_tptr = tptr;

    return match_traverse_continue(
            ctx.p, ctx.tb, ctx.chunk_size,
            iterations_left, &ctx.hp, slot_ix, got, &mp, 0,
            &ctx.base, ret);

badparam:
    *ret = NIL;
    return DB_ERROR_BADPARAM;
}
1861
1862 #undef MAX_SELECT_CHUNK_ITERATIONS
1863
1864
1865 /*
1866 *
1867 * select_count match traversal
1868 *
1869 */
1870
1871 #define MAX_SELECT_COUNT_ITERATIONS 1000
1872
/* Context for the select_count traversal callbacks. */
typedef struct {
    match_callbacks_t base;         /* must be first: callbacks downcast from it */
    Process* p;                     /* calling process */
    DbTableHash* tb;
    Eterm tid;                      /* table identifier term */
    Eterm* prev_continuation_tptr;  /* NULL on first call, else prior continuation tuple */
} select_count_context_t;
1880
/* A pattern that cannot match anything counts zero objects. */
static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base,
                                             Eterm* ret)
{
    (void) ctx_base; /* context not needed for a constant result */
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
1887
/* Count a term iff the match program evaluated to 'true'. */
static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    return match_res == am_true ? 1 : 0;
}
1894
/* End of a select_count cycle: charge reductions for the work done and
 * return the accumulated count as an Erlang integer. */
static int select_count_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_count_context_t* c = (select_count_context_t*) ctx_base;
    Sint iterations_used = MAX_SELECT_COUNT_ITERATIONS - iterations_left;

    ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
    BUMP_REDS(c->p, iterations_used);
    *ret = erts_make_integer(got, c->p);
    return DB_ERROR_NONE;
}
1906
/* Trap handler for select_count: delegate to the shared continuation
 * packer with select_count's own continuation export. */
static int select_count_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_count_context_t* c = (select_count_context_t*) ctx_base;

    return on_simple_trap(&ets_select_count_continue_exp,
                          c->p, c->tb, c->tid,
                          c->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}
1920
/* ets:select_count/2 - start a counting traversal over the whole table.
 * Read-only (lock_for_write = 0), never chunked, no result heap needed. */
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Eterm *ret)
{
    select_count_context_t ctx;

    ctx.base.on_nothing_can_match = select_count_on_nothing_can_match;
    ctx.base.on_match_res = select_count_on_match_res;
    ctx.base.on_loop_ended = select_count_on_loop_ended;
    ctx.base.on_trap = select_count_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;

    return match_traverse(p, ctx.tb,
                          pattern, NULL,
                          0,    /* chunk_size: no chunking */
                          MAX_SELECT_COUNT_ITERATIONS,
                          NULL, /* counting needs no heap */
                          0,    /* read lock is sufficient */
                          &ctx.base, ret);
}
1943
1944 /*
1945 * This is called when select_count traps
1946 */
/* Resume a trapped ets:select_count: unpack the simple 4-tuple
 * continuation and continue counting from the saved slot. */
static int db_select_count_continue_hash(Process* p, DbTable* tbl,
                                         Eterm continuation, Eterm* ret)
{
    select_count_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;
    Sint chunk_size = 0;
    *ret = NIL; /* NOTE(review): also set in the error branch below; this
                 * unconditional assignment looks redundant but is harmless */

    if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    ctx.base.on_match_res = select_count_on_match_res;
    ctx.base.on_loop_ended = select_count_on_loop_ended;
    ctx.base.on_trap = select_count_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;

    return match_traverse_continue(
            ctx.p, ctx.tb, chunk_size,
            MAX_SELECT_COUNT_ITERATIONS,
            NULL, slot_ix, got, &mp, 0,
            &ctx.base, ret);
}
1978
1979 #undef MAX_SELECT_COUNT_ITERATIONS
1980
1981
1982 /*
1983 *
1984 * select_delete match traversal
1985 *
1986 */
1987
1988 #define MAX_SELECT_DELETE_ITERATIONS 1000
1989
/* Context for the select_delete traversal callbacks. */
typedef struct {
    match_callbacks_t base;         /* must be first: callbacks downcast from it */
    Process* p;                     /* calling process */
    DbTableHash* tb;
    Eterm tid;                      /* table identifier term */
    Eterm* prev_continuation_tptr;  /* NULL on first call, else prior continuation tuple */
    erts_aint_t fixated_by_me;      /* 1 if this traversal holds its own fixation */
    Uint last_pseudo_delete;        /* last slot given a fixed-deletion marker */
    HashDbTerm* free_us;            /* unlinked terms awaiting free_term_list */
} select_delete_context_t;
2000
/* Nothing can match, so nothing gets deleted: report a count of zero. */
static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base,
                                              Eterm* ret)
{
    (void) ctx_base; /* context not needed for a constant result */
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
2007
/* Delete the current term when the match program returned 'true'.
 * If other processes have the table fixed, the term is pseudo-deleted
 * (marked, kept in place) and at most one fixed-deletion record is added
 * per slot; otherwise it is unlinked and queued on free_us for deferred
 * freeing. Returns 1 when a deletion happened. */
static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                      HashDbTerm*** current_ptr_ptr,
                                      Eterm match_res)
{
    HashDbTerm** current_ptr = *current_ptr_ptr;
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
    HashDbTerm* del;
    if (match_res != am_true)
        return 0;

    if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */
        if (slot_ix != ctx->last_pseudo_delete) {
            if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me))
                goto do_erase;
            ctx->last_pseudo_delete = slot_ix;
        }
        (*current_ptr)->pseudo_deleted = 1;
    }
    else {
    do_erase:
        del = *current_ptr;
        *current_ptr = (*current_ptr)->next; /* replace pointer to term using next */
        del->next = ctx->free_us;
        ctx->free_us = del;
    }
    /* Either way the object is logically gone from the table */
    erts_atomic_dec_nob(&ctx->tb->common.nitems);

    return 1;
}
2037
/* End of a select_delete cycle: free the queued (unlinked) terms, charge
 * reductions, possibly shrink the table, and return the delete count. */
static int select_delete_on_loop_ended(match_callbacks_t* ctx_base,
                                       Sint slot_ix, Sint got,
                                       Sint iterations_left, Binary** mpp,
                                       Eterm* ret)
{
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
    free_term_list(ctx->tb, ctx->free_us);
    ctx->free_us = NULL;
    ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);
    if (got) {
	try_shrink(ctx->tb);
    }
    *ret = erts_make_integer(got, ctx->p);
    return DB_ERROR_NONE;
}
2054
/* Trap handler for select_delete: release the queued terms first (the
 * bucket lock is already dropped), then pack the standard continuation. */
static int select_delete_on_trap(match_callbacks_t* ctx_base,
                                 Sint slot_ix, Sint got,
                                 Binary** mpp, Eterm* ret)
{
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
    free_term_list(ctx->tb, ctx->free_us);
    ctx->free_us = NULL;
    return on_simple_trap(
            &ets_select_delete_continue_exp,
            ctx->p,
            ctx->tb,
            ctx->tid,
            ctx->prev_continuation_tptr,
            slot_ix, got, mpp, ret);
}
2070
/* ets:select_delete/2 - start a deleting traversal. Takes write locks
 * (lock_for_write = 1) since terms are unlinked or pseudo-deleted. */
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
                                 Eterm pattern, Eterm *ret)
{
    select_delete_context_t ctx;
    Sint chunk_size = 0;

    ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match;
    ctx.base.on_match_res = select_delete_on_match_res;
    ctx.base.on_loop_ended = select_delete_on_loop_ended;
    ctx.base.on_trap = select_delete_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;
    /* Count our own fixation so NFIXED comparisons see only *other* fixers */
    ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;

    return match_traverse(
            ctx.p, ctx.tb,
            pattern, NULL,
            chunk_size,
            MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
            &ctx.base, ret);
}
2096
2097 /*
2098 * This is called when select_delete traps
2099 */
/* Resumes a trapped select_delete traversal.
** Unpacks the continuation tuple produced by select_delete_on_trap()
** and re-enters the generic traversal at the saved slot.
*/
static int db_select_delete_continue_hash(Process* p, DbTable* tbl,
                                          Eterm continuation, Eterm* ret)
{
    select_delete_context_t ctx;
    const Sint chunk_size = 0;  /* delete traversals are not chunked */
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;

    if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix,
                                   &mp, &got)) {
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    ctx.base.on_match_res  = select_delete_on_match_res;
    ctx.base.on_loop_ended = select_delete_on_loop_ended;
    ctx.base.on_trap       = select_delete_on_trap;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;
    ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;

    return match_traverse_continue(ctx.p, ctx.tb, chunk_size,
                                   MAX_SELECT_DELETE_ITERATIONS,
                                   NULL, slot_ix, got, &mp, 1,
                                   &ctx.base, ret);
}
2133
2134 #undef MAX_SELECT_DELETE_ITERATIONS
2135
2136
2137 /*
2138 *
2139 * select_replace match traversal
2140 *
2141 */
2142
2143 #define MAX_SELECT_REPLACE_ITERATIONS 1000
2144
/* Traversal context for select_replace; passed to the generic match
** traversal as the embedded 'base' and downcast in each callback. */
typedef struct {
    match_callbacks_t base;      /* generic traversal callbacks (must be first) */
    Process* p;                  /* calling process */
    DbTableHash* tb;             /* table being traversed */
    Eterm tid;                   /* table identifier, needed when trapping */
    Eterm* prev_continuation_tptr; /* continuation tuple from a previous trap, or NULL */
} select_replace_context_t;
2152
/* Traversal callback: the analyzed pattern can match no object at all.
** select_replace then trivially replaced zero objects. */
static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base,
                                               Eterm* ret)
{
    *ret = make_small(0);
    return DB_ERROR_NONE;
}
2159
/* Traversal callback: the match program produced 'match_res' for the
** object at **current_ptr_ptr.
** If it is a value, the object is replaced in-place in the bucket chain
** by a new term built from match_res (same key, as asserted in DEBUG),
** and the cursor advances. Returns 1 if a replacement was made, 0 if not.
*/
static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                       HashDbTerm*** current_ptr_ptr,
                                       Eterm match_res)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
    DbTableHash* tb = ctx->tb;
    HashDbTerm* new;
    HashDbTerm* next;
    HashValue hval;

    if (is_value(match_res)) {
#ifdef DEBUG
        /* select_replace must keep the key unchanged (enforced earlier
         * by db_match_keeps_key); verify it here in debug builds. */
        Eterm key = db_getkey(tb->common.keypos, match_res);
        ASSERT(is_value(key));
        ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
#endif
        /* Preserve chain position and hash value of the old object. */
        next = (**current_ptr_ptr)->next;
        hval = (**current_ptr_ptr)->hvalue;
        new = new_dbterm(tb, match_res);
        new->next = next;
        new->hvalue = hval;
        new->pseudo_deleted = 0;
        free_term(tb, **current_ptr_ptr);
        **current_ptr_ptr = new; /* replace 'next' pointer in previous object */
        *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
        return 1;
    }
    return 0;
}
2189
/* Traversal callback: a select_replace pass finished without trapping.
** Charges reductions for both the traversal and each replacement made
** (clamped to twice the iteration budget) and returns the replace count.
*/
static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
                                        Sint got, Sint iterations_left,
                                        Binary** mpp, Eterm* ret)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
    Sint reds;

    ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
    /* the more objects we've replaced, the more reductions we've consumed */
    reds = (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got;
    if (reds > MAX_SELECT_REPLACE_ITERATIONS * 2) {
        reds = MAX_SELECT_REPLACE_ITERATIONS * 2;
    }
    BUMP_REDS(ctx->p, reds);

    *ret = erts_make_integer(got, ctx->p);
    return DB_ERROR_NONE;
}
2203
/* Traversal callback: a select_replace pass ran out of iterations.
** Builds the trap continuation via the common helper so the traversal
** can be resumed by db_select_replace_continue_hash().
*/
static int select_replace_on_trap(match_callbacks_t* ctx_base,
                                  Sint slot_ix, Sint got,
                                  Binary** mpp, Eterm* ret)
{
    select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;

    return on_simple_trap(&ets_select_replace_continue_exp, ctx->p,
                          ctx->tb, ctx->tid, ctx->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}
2217
/* Entry point for ets:select_replace/2 on a hash table.
** Sets up a select_replace traversal context and runs the generic match
** traversal with db_match_keeps_key() as extra clause validator, so a
** replacement can never change an object's key.
*/
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
{
    select_replace_context_t ctx;
    const Sint chunk_size = 0;  /* replace traversals are not chunked */

    /* Bag implementation presented both semantic consistency and performance issues,
     * unsupported for now
     */
    ASSERT(!(tbl->hash.common.status & DB_BAG));

    ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match;
    ctx.base.on_match_res         = select_replace_on_match_res;
    ctx.base.on_loop_ended        = select_replace_on_loop_ended;
    ctx.base.on_trap              = select_replace_on_trap;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL;

    return match_traverse(ctx.p, ctx.tb,
                          pattern, db_match_keeps_key,
                          chunk_size,
                          MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
                          &ctx.base, ret);
}
2244
2245 /*
2246 * This is called when select_replace traps
2247 */
/* Resumes a trapped select_replace traversal.
** Unpacks the continuation tuple produced by select_replace_on_trap()
** and re-enters the generic traversal at the saved slot.
**
** Fix: the error branch previously re-assigned *ret = NIL although it is
** already set unconditionally at function entry; the duplicate has been
** removed (matches the structure of db_select_delete_continue_hash()).
*/
static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
{
    select_replace_context_t ctx;
    Eterm* tptr;
    Eterm tid;
    Binary* mp;
    Sint got;
    Sint slot_ix;
    Sint chunk_size = 0;

    *ret = NIL;

    if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
        return DB_ERROR_BADPARAM;
    }

    /* Proceed */
    ctx.base.on_match_res = select_replace_on_match_res;
    ctx.base.on_loop_ended = select_replace_on_loop_ended;
    ctx.base.on_trap = select_replace_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = tptr;

    return match_traverse_continue(
            ctx.p, ctx.tb, chunk_size,
            MAX_SELECT_REPLACE_ITERATIONS,
            NULL, slot_ix, got, &mp, 1,
            &ctx.base, ret);
}
2279
2280
/* Implements ets:take/2 on a hash table: return all live objects with
** 'key' as a list in *ret and remove them from the table.
** *ret is NIL when the key is absent; always returns DB_ERROR_NONE.
*/
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp, *b;
    HashDbTerm *free_us = NULL;   /* terms unlinked here, freed after unlock */
    HashValue hval = MAKE_HASH(key);
    erts_rwmtx_t *lck = WLOCK_HASH(tb, hval);
    int ix = hash_to_ix(tb, hval);
    int nitems_diff = 0;          /* negative count of removed objects */

    *ret = NIL;
    for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) {
        if (has_live_key(tb, b, key, hval)) {
            HashDbTerm *bend;

            /* Build the result list from the consecutive run of terms
             * with this key; 'bend' is the first term after the run. */
            *ret = get_term_list(p, tb, key, hval, b, &bend);
            while (b != bend) {
                --nitems_diff;
                if (nitems_diff == -1 && IS_FIXED(tb)
                    && add_fixed_deletion(tb, ix, 0)) {
                    /* Pseudo remove (no need to keep several of same key) */
                    bp = &b->next;
                    b->pseudo_deleted = 1;
                    b = b->next;
                } else {
                    /* Unlink now, defer the actual free until after the
                     * bucket lock is released. */
                    HashDbTerm* next = b->next;
                    b->next = free_us;
                    free_us = b;
                    b = *bp = next;
                }
            }
            break;
        }
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
        erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
        try_shrink(tb);
    }
    free_term_list(tb, free_us);
    return DB_ERROR_NONE;
}
2323
2324
2325 /*
2326 ** Other interface routines (not directly coupled to one bif)
2327 */
2328
/* One-time module initialization hook for the hash table implementation.
** Currently nothing needs global initialization. */
void db_initialize_hash(void)
{
}
2332
2333
/* Marks every object in a fixed table as pseudo-deleted (used by
** ets:delete_all_objects on a fixated table, where physical removal must
** wait until the table is unfixed).
** Yieldable: returns -1 when out of reductions (state saved in the fixdel
** element with trap flag set), otherwise remaining reductions.
** Caller must hold exclusive table lock.
*/
static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
    const int LOOPS_PER_REDUCTION = 8;
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel;
    SWord loops = reds * LOOPS_PER_REDUCTION;
    int i;

    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb));

    fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel);
    if (fixdel && fixdel->trap) {
        /* Continue after trap */
        ASSERT(fixdel->all);
        ASSERT(fixdel->slot < NACTIVE(tb));
        i = fixdel->slot;
    }
    else {
        /* First call */
        int ok;
        /* One fixdel element with 'all' set represents the deletion of
         * every slot, instead of one element per slot. */
        fixdel = alloc_fixdel(tb);
        ok = link_fixdel(tb, fixdel, 0);
        ASSERT(ok); (void)ok;
        i = 0;
    }

    do {
        HashDbTerm* b;
        for (b = BUCKET(tb,i); b; b = b->next)
            b->pseudo_deleted = 1;
    } while (++i < NACTIVE(tb) && --loops > 0);

    if (i < NACTIVE(tb)) {
        /* Yield: remember how far we got so the next call resumes here. */
        fixdel->slot = i;
        fixdel->all = 0;
        fixdel->trap = 1;
        return -1;
    }

    /* Done: record that the fixdel element covers all slots. */
    fixdel->slot = NACTIVE(tb) - 1;
    fixdel->all = 1;
    fixdel->trap = 0;
    erts_atomic_set_nob(&tb->common.nitems, 0);
    return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
}
2380
2381
2382 /* Display hash table contents (for dump) */
/* Display hash table contents (for dump) */
static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    DbHashStats stats;
    int i;

    erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));

    i = tbl->common.is_thread_safe;
    /* If crash dumping we set table to thread safe in order to
       avoid taking any locks */
    if (ERTS_IS_CRASH_DUMPING)
        tbl->common.is_thread_safe = 1;

    db_calc_stats_hash(&tbl->hash, &stats);

    /* Restore the saved thread-safe flag. */
    tbl->common.is_thread_safe = i;

    erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
    erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
    erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len);
    erts_print(to, to_arg, "Chain Length Std Dev: %f\n",
               stats.std_dev_chain_len);
    erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n",
               stats.std_dev_expected);

    if (IS_FIXED(tb))
        erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items);
    else
        erts_print(to, to_arg, "Fixed: false\n");

    if (show) {
        /* Dump every bucket chain; '*' marks pseudo-deleted entries. */
        for (i = 0; i < NACTIVE(tb); i++) {
            HashDbTerm* list = BUCKET(tb,i);
            if (list == NULL)
                continue;
            erts_print(to, to_arg, "%d: [", i);
            while(list != 0) {
                if (is_pseudo_deleted(list))
                    erts_print(to, to_arg, "*");
                if (tb->common.compress) {
                    /* Compressed terms cannot be printed directly;
                     * show only the (uncompressed) key. */
                    Eterm key = GETKEY(tb, list->dbterm.tpl);
                    erts_print(to, to_arg, "key=%T", key);
                }
                else {
                    Eterm obj = make_tuple(list->dbterm.tpl);
                    erts_print(to, to_arg, "%T", obj);
                }
                if (list->next != 0)
                    erts_print(to, to_arg, ",");
                list = list->next;
            }
            erts_print(to, to_arg, "]\n");
        }
    }
}
2439
/* Free a table that is known to contain no objects by driving the
** incremental destructor to completion in one call. */
static int db_free_empty_table_hash(DbTable *tbl)
{
    ASSERT(NITEMS(tbl) == 0);
    while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0)
        continue;
    return 0;
}
2447
/* Incrementally frees all table memory: the fixed-deletion list first,
** then every segment (including any records in them), and finally the
** fine-grained lock vector.
** Returns a negative value if out of reductions (call again to continue),
** otherwise the remaining reductions when done.
*/
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel);
    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));

    while (fixdel != NULL) {
        FixedDeletion *fx = fixdel;

        fixdel = fx->next;
        free_fixdel(tb, fx);
        if (--reds < 0) {
            /* Publish how far we got so a resumed call continues here. */
            erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
            return reds; /* Not done */
        }
    }
    erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);

    while(tb->nslots != 0) {
        /* free_seg() returns the number of records freed; charge for
         * both the segment bookkeeping and the freed records. */
        reds -= EXT_SEGSZ/64 + free_seg(tb, 1);

        /*
         * If we have done enough work, get out here.
         */
        if (reds < 0) {
            return reds; /* Not done */
        }
    }
    if (tb->locks != NULL) {
        int i;
        for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
            erts_rwmtx_destroy(GET_LOCK(tb,i));
        }
        erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
                     (void*)tb->locks, sizeof(DbTableHashFineLocks));
        tb->locks = NULL;
    }
    ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
    return reds; /* Done */
}
2488
2489
2490
2491 /*
2492 ** Utility routines. (static)
2493 */
2494 /*
2495 ** For the select functions, analyzes the pattern and determines which
2496 ** slots should be searched. Also compiles the match program
2497 */
/*
** For the select functions, analyzes the pattern and determines which
** slots should be searched. Also compiles the match program.
**
** On success, 'mpi' holds the compiled match program, whether anything
** can match at all, and — if every clause head has a bound key — the list
** of candidate buckets to search. Returns DB_ERROR_BADPARAM for a
** malformed match spec or when 'extra_validator' rejects a clause.
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi)
{
    Eterm *ptpl;
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];             /* on-stack buffer for small match specs */
    Eterm *buff = sbuff;
    Eterm key = NIL;
    HashValue hval = NIL;
    int num_heads = 0;
    int i;

    mpi->lists = mpi->dlists;    /* start with the small inline array */
    mpi->num_lists = 0;
    mpi->key_given = 1;          /* until proven otherwise below */
    mpi->something_can_match = 0;
    mpi->mp = NULL;

    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
        ++num_heads;

    if (lst != NIL) {/* proper list... */
        return DB_ERROR_BADPARAM;
    }

    if (num_heads > 10) {
        /* Too many clauses for the stack buffers; allocate on the heap. */
        buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
        mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
                                sizeof(*(mpi->lists)) * num_heads);
    }

    /* 'buff' is split into three parallel arrays of num_heads entries. */
    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
        Eterm match;
        Eterm guard;
        Eterm body;

        ttpl = CAR(list_val(lst));
        if (!is_tuple(ttpl)) {
            if (buff != sbuff) {
                erts_free(ERTS_ALC_T_DB_TMP, buff);
            }
            return DB_ERROR_BADPARAM;
        }
        ptpl = tuple_val(ttpl);
        /* Each clause must be a 3-tuple {Match, Guard, Body}. */
        if (ptpl[0] != make_arityval(3U)) {
            if (buff != sbuff) {
                erts_free(ERTS_ALC_T_DB_TMP, buff);
            }
            return DB_ERROR_BADPARAM;
        }
        matches[i] = match = tpl = ptpl[1];
        guards[i] = guard = ptpl[2];
        bodies[i] = body = ptpl[3];

        if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
            if (buff != sbuff) {
                erts_free(ERTS_ALC_T_DB_TMP, buff);
            }
            return DB_ERROR_BADPARAM;
        }

        /* NOTE(review): this 'if' has an empty body — it looks like dead
         * code left over from a removed feature (the condition detects a
         * body other than ['$_']). Candidate for removal; verify against
         * upstream history before deleting. */
        if (!is_list(body) || CDR(list_val(body)) != NIL ||
            CAR(list_val(body)) != am_DollarUnderscore) {
        }
        ++i;
        if (!(mpi->key_given)) {
            continue;
        }
        if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
            /* A clause head that is a variable matches any key. */
            (mpi->key_given) = 0;
            (mpi->something_can_match) = 1;
        } else {
            key = db_getkey(tb->common.keypos, tpl);
            if (is_value(key)) {
                if (!db_has_variable(key)) { /* Bound key */
                    int ix, search_slot;
                    HashDbTerm** bp;
                    erts_rwmtx_t* lck;
                    hval = MAKE_HASH(key);
                    lck = RLOCK_HASH(tb,hval);
                    ix = hash_to_ix(tb, hval);
                    bp = &BUCKET(tb,ix);
                    if (lck == NULL) {
                        /* Coarse locking: safe to check the key now. */
                        search_slot = search_list(tb,key,hval,*bp) != NULL;
                    } else {
                        /* No point to verify if key exist now as there may be
                           concurrent inserters/deleters anyway */
                        RUNLOCK_HASH(lck);
                        search_slot = 1;
                    }
                    if (search_slot) {
                        /* Add bucket to the candidate list unless it is
                         * already there (different keys may hash to the
                         * same bucket). */
                        int j;
                        for (j=0; ; ++j) {
                            if (j == mpi->num_lists) {
                                mpi->lists[mpi->num_lists].bucket = bp;
                                mpi->lists[mpi->num_lists].ix = ix;
                                ++mpi->num_lists;
                                break;
                            }
                            if (mpi->lists[j].bucket == bp) {
                                ASSERT(mpi->lists[j].ix == ix);
                                break;
                            }
                            ASSERT(mpi->lists[j].ix != ix);
                        }
                        mpi->something_can_match = 1;
                    }
                } else {
                    /* Key contains variables: cannot restrict buckets. */
                    mpi->key_given = 0;
                    mpi->something_can_match = 1;
                }
            }
        }
    }

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
                                    num_heads, DCOMP_TABLE, NULL))
        == NULL) {
        if (buff != sbuff) {
            erts_free(ERTS_ALC_T_DB_TMP, buff);
        }
        return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) {
        erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}
2638
/* Allocate a new, larger external segment table able to address segment
** index 'seg_ix'. The old table is copied in and remembered (prev_segtab/
** prev_nsegs) so free_seg() can restore it when the table shrinks back.
*/
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
    struct segment** old_segtab = SEGTAB(tb);
    struct ext_segtab* est;
    int nsegs;

    ASSERT(seg_ix >= NSEG_1);
    /* First growth jumps to NSEG_2 entries; after that grow linearly. */
    nsegs = (seg_ix == NSEG_1) ? NSEG_2 : (int)seg_ix + NSEG_INC;
    ASSERT(nsegs > tb->nsegs);

    est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                             (DbTable *) tb,
                                             SIZEOF_EXT_SEGTAB(nsegs));
    est->nsegs = nsegs;
    est->prev_segtab = old_segtab;
    est->prev_nsegs = tb->nsegs;
    sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
    /* Poison the not-yet-used tail so stray reads are caught. */
    sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
    return est;
}
2663
2664 /* Extend table with one new segment
2665 */
/* Extend the table with one new (zeroed) segment of EXT_SEGSZ buckets,
** growing the segment table first if it is full.
*/
static void alloc_seg(DbTableHash *tb)
{
    int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
    struct segment** segtab;
    struct segment* new_seg;

    ASSERT(seg_ix > 0);
    if (seg_ix == tb->nsegs) {
        /* Segment table is full; install a larger one. */
        struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
        SET_SEGTAB(tb, est->segtab);
        tb->nsegs = est->nsegs;
    }
    ASSERT(seg_ix < tb->nsegs);

    new_seg = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                              (DbTable *) tb,
                                              SIZEOF_SEGMENT(EXT_SEGSZ));
    sys_memset(new_seg, 0, SIZEOF_SEGMENT(EXT_SEGSZ));

    segtab = SEGTAB(tb);
    segtab[seg_ix] = new_seg;
    tb->nslots += EXT_SEGSZ;
}
2685
/* Thread-progress callback: actually free an extended segment table that
** was scheduled for deferred deallocation by free_seg(). */
static void dealloc_ext_segtab(void* lop_data)
{
    struct ext_segtab* est = (struct ext_segtab*) lop_data;

    erts_free(ERTS_ALC_T_DB_SEG, est);
}
2692
2693 /* Shrink table by freeing the top segment
2694 ** free_records: 1=free any records in segment, 0=assume segment is empty
2695 */
/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty
** Returns the number of records freed.
*/
static int free_seg(DbTableHash *tb, int free_records)
{
    const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
    struct segment** const segtab = SEGTAB(tb);
    struct segment* const segp = segtab[seg_ix];
    Uint seg_sz;
    int nrecords = 0;

    ASSERT(segp != NULL);
    /* In DEBUG builds the scan runs even when free_records==0, so the
     * "segment is empty" assumption can be asserted below. */
#ifndef DEBUG
    if (free_records)
#endif
    {
        int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
        while (i--) {
            HashDbTerm* p = segp->buckets[i];
            while(p != 0) {
                HashDbTerm* nxt = p->next;
                ASSERT(free_records); /* segment not empty as assumed? */
                free_term(tb, p);
                p = nxt;
                ++nrecords;
            }
        }
    }

    if (seg_ix >= NSEG_1) {
        struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);

        if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
            ASSERT(est->prev_segtab != NULL);
            /* Restore the previous (smaller) segment table. */
            SET_SEGTAB(tb, est->prev_segtab);
            tb->nsegs = est->prev_nsegs;

            if (!tb->common.is_thread_safe) {
                /*
                 * Table is doing a graceful shrink operation and we must avoid
                 * deallocating this segtab while it may still be read by other
                 * threads. Schedule deallocation with thread progress to make
                 * sure no lingering threads are still hanging in BUCKET macro
                 * with an old segtab pointer.
                 */
                Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
                ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
                ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
                erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
                                                        est,
                                                        &est->lop,
                                                        sz);
            }
            else
                erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
                             SIZEOF_EXT_SEGTAB(est->nsegs));
        }
    }
    seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
    erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));

#ifdef DEBUG
    if (seg_ix < tb->nsegs)
        SEGTAB(tb)[seg_ix] = NULL;
#endif
    tb->nslots -= seg_sz;
    ASSERT(tb->nslots >= 0);
    return nrecords;
}
2762
2763
2764 /*
2765 ** Copy terms from ptr1 until ptr2
2766 ** works for ptr1 == ptr2 == 0 => []
2767 ** or ptr2 == 0
2768 ** sz is either precalculated heap size or 0 if not known
2769 */
/*
** Copy terms from ptr1 until ptr2
** works for ptr1 == ptr2 == 0 => []
** or ptr2 == 0
** sz is either precalculated heap size or 0 if not known
** Note: the returned list is built by consing, so it ends up in the
** reverse order of the chain traversal.
*/
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
                             Uint sz, DbTableHash* tb)
{
    HashDbTerm* ptr;
    Eterm list = NIL;
    Eterm copy;
    Eterm *hp, *hend;

    if (!sz) {
        /* First pass: compute the heap size needed (term + cons cell). */
        ptr = ptr1;
        while(ptr != ptr2) {
            if (!is_pseudo_deleted(ptr))
                sz += ptr->dbterm.size + 2;
            ptr = ptr->next;
        }
    }

    hp = HAlloc(p, sz);
    hend = hp + sz;

    ptr = ptr1;
    while(ptr != ptr2) {
        if (!is_pseudo_deleted(ptr)) {
            copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p));
            list = CONS(hp, copy, list);
            hp += 2;
        }
        ptr = ptr->next;
    }
    /* Give back any heap we over-allocated (pseudo-deleted entries when
     * 'sz' was precalculated). */
    HRelease(p,hend,hp);

    return list;
}
2803
/* Try to claim the exclusive right to grow/shrink the table.
** Returns non-zero on success. With fine-grained locking this is a
** try-lock on the 'is_resizing' flag; with coarse locking the caller
** already holds the exclusive table lock, so it always succeeds. */
static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
    if (DB_USING_FINE_LOCKING(tb))
        return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
    else
        ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
    return 1;
}
2813
/* Release the resize claim taken by begin_resizing(). A no-op with
** coarse locking, where no flag was set. */
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
    if (DB_USING_FINE_LOCKING(tb))
        erts_atomic_set_relb(&tb->is_resizing, 0);
}
2820
2821 /* Grow table with one or more new buckets.
2822 ** Allocate new segment if needed.
2823 */
/* Grow table with one or more new buckets.
** Allocate new segment if needed.
** Linear hashing: each iteration activates one new bucket (to_ix) and
** splits the bucket it mirrors (from_ix). Aborts silently if another
** thread is already resizing or if the table got fixed (race).
*/
static void grow(DbTableHash* tb, int nitems)
{
    HashDbTerm** pnext;
    HashDbTerm** to_pnext;
    HashDbTerm* p;
    erts_rwmtx_t* lck;
    int nactive;
    int from_ix, to_ix;
    int szm;
    int loop_limit = 5;  /* bound the work done in one call */

    do {
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (nitems <= GROW_LIMIT(nactive)) {
            goto abort; /* already done (race) */
        }

        /* Ensure that the slot nactive exists */
        if (nactive == tb->nslots) {
            /* Time to get a new segment */
            ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
            alloc_seg(tb);
        }
        ASSERT(nactive < tb->nslots);

        szm = erts_atomic_read_nob(&tb->szm);
        if (nactive <= szm) {
            from_ix = nactive & (szm >> 1);
        } else {
            ASSERT(nactive == szm+1);
            from_ix = 0;
            szm = (szm<<1) | 1;  /* double the size mask */
        }
        to_ix = nactive;

        lck = WLOCK_HASH(tb, from_ix);
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
        /* Now a final double check (with the from_ix lock held)
         * that we did not get raced by a table fixer.
         */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }
        erts_atomic_set_nob(&tb->nactive, ++nactive);
        if (from_ix == 0) {
            /* The mask changed; publish it. With fine locking a release
             * barrier pairs with readers' acquire of szm. */
            if (DB_USING_FINE_LOCKING(tb))
                erts_atomic_set_relb(&tb->szm, szm);
            else
                erts_atomic_set_nob(&tb->szm, szm);
        }
        done_resizing(tb);

        /* Finally, let's split the bucket. We try to do it in a smart way
           to keep link order and avoid unnecessary updates of next-pointers */
        pnext = &BUCKET(tb, from_ix);
        p = *pnext;
        to_pnext = &BUCKET(tb, to_ix);
        while (p != NULL) {
            if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */
                *pnext = p->next;
                free_term(tb, p);
                p = *pnext;
            }
            else {
                int ix = p->hvalue & szm;
                if (ix != from_ix) {
                    ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
                    *to_pnext = p;
                    /* Swap "from" and "to": */
                    from_ix = ix;
                    to_pnext = pnext;
                }
                pnext = &p->next;
                p = *pnext;
            }
        }
        *to_pnext = NULL;
        WUNLOCK_HASH(lck);

    }while (--loop_limit && nitems > GROW_LIMIT(nactive));

    return;

abort:
    done_resizing(tb);
}
2913
2914
2915 /* Shrink table by joining top bucket.
2916 ** Remove top segment if it gets empty.
2917 */
/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
** Mirror of grow(): deactivates the highest bucket (src_ix) and merges
** its chain into the bucket it mirrors (dst_ix). Aborts silently if
** another thread is already resizing or if the table got fixed (race).
*/
static void shrink(DbTableHash* tb, int nitems)
{
    HashDbTerm** src_bp;
    HashDbTerm** dst_bp;
    HashDbTerm** bp;
    erts_rwmtx_t* lck;
    int src_ix, dst_ix, low_szm;
    int nactive;
    int loop_limit = 5;  /* bound the work done in one call */

    do {
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
            goto abort; /* already done (race) */
        }
        src_ix = nactive - 1;
        low_szm = erts_atomic_read_nob(&tb->szm) >> 1;
        dst_ix = src_ix & low_szm;

        ASSERT(dst_ix < src_ix);
        ASSERT(nactive > FIRST_SEGSZ);
        lck = WLOCK_HASH(tb, dst_ix);
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
        /* Double check for racing table fixers */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }

        src_bp = &BUCKET(tb, src_ix);
        dst_bp = &BUCKET(tb, dst_ix);
        bp = src_bp;

        /*
         * We join lists by appending "dst" at the end of "src"
         * as we must step through "src" anyway to purge pseudo deleted.
         */
        while(*bp != NULL) {
            if (is_pseudo_deleted(*bp)) {
                HashDbTerm* deleted = *bp;
                *bp = deleted->next;
                free_term(tb, deleted);
            } else {
                bp = &(*bp)->next;
            }
        }
        *bp = *dst_bp;
        *dst_bp = *src_bp;
        *src_bp = NULL;

        nactive = src_ix;
        erts_atomic_set_nob(&tb->nactive, nactive);
        if (dst_ix == 0) {
            /* The mask shrank; publish with release ordering. */
            erts_atomic_set_relb(&tb->szm, low_szm);
        }
        WUNLOCK_HASH(lck);

        /* Drop the top segment once it is entirely inactive. */
        if (tb->nslots - src_ix >= EXT_SEGSZ) {
            free_seg(tb, 0);
        }
        done_resizing(tb);

    } while (--loop_limit
             && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
    return;

abort:
    done_resizing(tb);
}
2989
2990 /* Search a list of tuples for a matching key */
2991
/* Search a bucket chain for the first live term with a matching key.
** Returns the term, or NULL if none is found. */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
                               HashValue hval, HashDbTerm *list)
{
    HashDbTerm* p;

    for (p = list; p != NULL; p = p->next) {
        if (has_live_key(tb, p, key, hval))
            return p;
    }
    return NULL;
}
3002
3003
3004 /* This function is called by the next AND the select BIF */
3005 /* It return the next live object in a table, NULL if no more */
3006 /* In-bucket: RLOCKED */
3007 /* Out-bucket: RLOCKED unless NULL */
/* This function is called by the next AND the select BIF */
/* It return the next live object in a table, NULL if no more */
/* In-bucket: RLOCKED */
/* Out-bucket: RLOCKED unless NULL */
/* On success *iptr is updated to the slot of the returned object;
** next_slot() hands the read lock over between lock regions. */
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
                             HashDbTerm *list)
{
    int i;

    ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));

    /* First, the remainder of the current chain. */
    for ( ; list != NULL; list = list->next) {
        if (!is_pseudo_deleted(list))
            return list;
    }

    /* Then scan subsequent slots; next_slot() releases/acquires the
     * per-region lock as needed and returns 0 after the last slot. */
    i = *iptr;
    while ((i=next_slot(tb, i, lck_ptr)) != 0) {

        list = BUCKET(tb,i);
        while (list != NULL) {
            if (!is_pseudo_deleted(list)) {
                *iptr = i;
                return list;
            }
            list = list->next;
        }
    }
    /* *iptr = ??? */
    return NULL;
}
3035
/* Looks up (or, if 'obj' is given, inserts/revives) the term with 'key'
** and fills in an update handle for in-place modification, used by
** ets:update_counter and ets:update_element.
** Returns 1 with the bucket still write-locked (released later by
** db_finalize_dbterm_hash), or 0 (unlocked) when the key is absent and
** obj == THE_NON_VALUE.
*/
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                      DbUpdateHandle* handle)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    HashDbTerm **bp, *b;
    erts_rwmtx_t* lck;
    int flags = 0;

    ASSERT(tb->common.status & DB_SET);

    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb, hval);
    bp = &BUCKET(tb, hash_to_ix(tb, hval));
    b = *bp;

    /* Find the term with this key; a pseudo-deleted hit is treated as
     * "absent" but its node may be revived below. */
    for (;;) {
        if (b == NULL) {
            break;
        }
        if (has_key(tb, b, key, hval)) {
            if (!is_pseudo_deleted(b)) {
                goto Ldone;
            }
            break;
        }
        bp = &b->next;
        b = *bp;
    }

    if (obj == THE_NON_VALUE) {
        /* Pure lookup and the key is absent. */
        WUNLOCK_HASH(lck);
        return 0;
    }

    {
        /* Insert a default object built from 'obj' with its key slot
         * overwritten by 'key'. */
        Eterm *objp = tuple_val(obj);
        int arity = arityval(*objp);
        Eterm *htop, *hend;

        ASSERT(arity >= tb->common.keypos);
        htop = HAlloc(p, arity + 1);
        hend = htop + arity + 1;
        sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
        htop[tb->common.keypos] = key;
        obj = make_tuple(htop);

        if (b == NULL) {
            /* Key not present at all: link in a fresh term. */
            HashDbTerm *q = new_dbterm(tb, obj);

            q->hvalue = hval;
            q->pseudo_deleted = 0;
            q->next = NULL;
            *bp = b = q;
            /* nitems is bumped later, in db_finalize_dbterm_hash(). */
            flags |= DB_INC_TRY_GROW;
        } else {
            /* Revive the pseudo-deleted node in place. */
            HashDbTerm *q, *next = b->next;

            ASSERT(is_pseudo_deleted(b));
            q = replace_dbterm(tb, b, obj);
            q->next = next;
            ASSERT(q->hvalue == hval);
            q->pseudo_deleted = 0;
            *bp = b = q;
            erts_atomic_inc_nob(&tb->common.nitems);
        }

        HRelease(p, hend, htop);
        flags |= DB_NEW_OBJECT;
    }

Ldone:
    handle->tb = tbl;
    handle->bp = (void **)bp;
    handle->dbterm = &b->dbterm;
    handle->flags = flags;
    handle->new_size = b->dbterm.size;
    handle->lck = lck;       /* still held; released in finalize */
    return 1;
}
3117
3118 /* Must be called after call to db_lookup_dbterm
3119 */
/* Must be called after call to db_lookup_dbterm
** Commits or rolls back the update described by 'handle' and releases
** the bucket lock taken by db_lookup_dbterm_hash().
** cret: the operation's result; on failure a freshly inserted object
** (DB_NEW_OBJECT) is removed again.
*/
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
{
    DbTable* tbl = handle->tb;
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp = (HashDbTerm **) handle->bp;
    HashDbTerm *b = *bp;
    erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
    HashDbTerm* free_me = NULL;

    ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */

    ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));

    if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
        /* Roll back: the object inserted by the lookup must go away. */
        if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
                                               0)) {
            b->pseudo_deleted = 1;
        } else {
            *bp = b->next;
            free_me = b;
        }

        WUNLOCK_HASH(lck);
        /* DB_INC_TRY_GROW objects never had nitems incremented, so only
         * decrement for the revived-pseudo-deleted case. */
        if (!(handle->flags & DB_INC_TRY_GROW))
            erts_atomic_dec_nob(&tb->common.nitems);
        try_shrink(tb);
    } else {
        if (handle->flags & DB_MUST_RESIZE) {
            ASSERT(cret == DB_ERROR_NONE);
            db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
            free_me = b;
        }
        if (handle->flags & DB_INC_TRY_GROW) {
            /* Deferred nitems increment for the fresh insert; grow the
             * table if it is now over the limit. */
            int nactive;
            int nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
            ASSERT(cret == DB_ERROR_NONE);
            WUNLOCK_HASH(lck);
            nactive = NACTIVE(tb);

            if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
                grow(tb, nitems);
            }
        } else {
            WUNLOCK_HASH(lck);
        }
    }

    if (free_me)
        free_term(tb, free_me);

#ifdef DEBUG
    handle->dbterm = 0;
#endif
    return;
}
3176
/* Implements ets:delete_all_objects/1 on a hash table.
** A fixed table only gets its objects marked pseudo-deleted; otherwise
** the table is torn down and recreated empty. May yield: a negative
** return value means "call again to continue".
*/
static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
    if (IS_FIXED(tbl))
        return db_mark_all_deleted_hash(tbl, reds);

    reds = db_free_table_continue_hash(tbl, reds);
    if (reds < 0)
        return reds;  /* not done yet */

    db_create_hash(p, tbl);
    erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
    return reds;
}
3191
/* Applies 'func' to the off-heap list of every term in the table
** (e.g. for binary reference counting during GC-related scans).
*/
void db_foreach_offheap_hash(DbTable *tbl,
                             void (*func)(ErlOffHeap *, void *),
                             void * arg)
{
    DbTableHash *tb = &tbl->hash;
    int nactive = NACTIVE(tb);
    int i;

    for (i = 0; i < nactive; i++) {
        HashDbTerm* term;
        for (term = BUCKET(tb, i); term != NULL; term = term->next) {
            /* Wrap the term's off-heap chain in a temporary header,
             * let 'func' inspect/modify it, then write it back. */
            ErlOffHeap tmp_offheap;
            tmp_offheap.first = term->dbterm.first_oh;
            tmp_offheap.overhead = 0;
            (*func)(&tmp_offheap, arg);
            term->dbterm.first_oh = tmp_offheap.first;
        }
    }
}
3213
/* Gather bucket-chain statistics for table 'tb' into 'stats':
 * min/max/average chain length, the standard deviation of chain
 * lengths, the standard deviation expected from an ideal uniform
 * hash function, and the number of pseudo-deleted ("kept") items.
 *
 * Iterates all active buckets under read locks, hopping locks via
 * next_slot() as the fine-grained locking scheme requires.
 */
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
    erts_rwmtx_t* lck;
    HashDbTerm* p;
    int total = 0;       /* sum of all chain lengths           */
    int total_sq = 0;    /* sum of squared chain lengths       */
    int kept = 0;        /* pseudo-deleted items seen          */
    int chain_len;
    int ix = 0;

    stats->min_chain_len = INT_MAX;
    stats->max_chain_len = 0;
    lck = RLOCK_HASH(tb, ix);
    do {
	chain_len = 0;
	for (p = BUCKET(tb, ix); p != NULL; p = p->next) {
	    chain_len++;
	    if (is_pseudo_deleted(p))
		kept++;
	}
	total += chain_len;
	total_sq += chain_len * chain_len;
	if (chain_len < stats->min_chain_len)
	    stats->min_chain_len = chain_len;
	if (chain_len > stats->max_chain_len)
	    stats->max_chain_len = chain_len;
	/* next_slot releases/acquires bucket locks; returns 0 at wrap. */
	ix = next_slot(tb, ix, &lck);
    } while (ix);
    stats->avg_chain_len = (float)total / NACTIVE(tb);
    stats->std_dev_chain_len =
	sqrt((total_sq - stats->avg_chain_len * total) / NACTIVE(tb));
    /* Expected standard deviation from a good uniform hash function,
       i.e. binomial distribution (not taking linear hashing into account) */
    stats->std_dev_expected =
	sqrt(stats->avg_chain_len * (1 - 1.0 / NACTIVE(tb)));
    stats->kept_items = kept;
}
3248
3249 /* For testing only */
erts_ets_hash_sizeof_ext_segtab(void)3250 Eterm erts_ets_hash_sizeof_ext_segtab(void)
3251 {
3252 return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
3253 }
3254
#ifdef ERTS_ENABLE_LOCK_COUNT
/* Install or remove lock-count instrumentation on every per-bucket
 * rw-lock of table 'tb'. Called when lock counting is toggled at
 * runtime. A table without fine-grained locks has nothing to track. */
void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
    int ix;

    if (tb->locks == NULL) {
        /* Coarse-grained locking: no per-bucket locks exist. */
        return;
    }

    for (ix = 0; ix < DB_HASH_LOCK_CNT; ix++) {
        erts_lcnt_ref_t *ref = &tb->locks->lck_vec[ix].lck.lcnt;

        if (!enable) {
            erts_lcnt_uninstall(ref);
        } else {
            erts_lcnt_install_new_lock_info(ref, "db_hash_slot",
                                            tb->common.the_name,
                                            ERTS_LOCK_TYPE_RWMUTEX
                                            | ERTS_LOCK_FLAGS_CATEGORY_DB);
        }
    }
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
3275