1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 1996-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * This file contains the 'ets' bif interface functions.
23 */
24
25 /*
26 #ifdef DEBUG
27 #define HARDDEBUG 1
28 #endif
29 */
30
31 #ifdef HAVE_CONFIG_H
32 # include "config.h"
33 #endif
34
35 #include "sys.h"
36 #include "erl_vm.h"
37 #include "global.h"
38 #include "erl_process.h"
39 #include "error.h"
40 #define ERTS_WANT_DB_INTERNAL__
41 #include "erl_db.h"
42 #include "bif.h"
43 #include "big.h"
44 #include "erl_binary.h"
45 #include "bif.h"
46
47 /*
48 * Extended error information for ETS functions.
49 */
50
51 #define EXI_TYPE am_type /* The type is wrong. */
52 #define EXI_ID am_id /* The table identifier is invalid. */
53 #define EXI_ACCESS am_access /* Insufficient access rights for ETS table. */
54 #define EXI_TAB_TYPE am_table_type /* Unsupported table type for this operation. */
55 #define EXI_BAD_KEY am_badkey /* No such key exists in the table. */
56 #define EXI_KEY_POS am_keypos /* The element to update is also the key. */
57 #define EXI_POSITION am_position /* The position is out of range. */
58 #define EXI_OWNER am_owner /* The receiving process is already the owner. */
59 #define EXI_NOT_OWNER am_not_owner /* The current process is not the owner. */
60
61 erts_atomic_t erts_ets_misc_mem_size;
62
63 /*
64 ** Utility macros
65 */
66
67 #if defined(DEBUG)
68 # define DBG_RANDOM_REDS(REDS, SEED) \
69 ((REDS) * 0.1 * erts_sched_local_random_float(SEED))
70 #else
71 # define DBG_RANDOM_REDS(REDS, SEED) (REDS)
72 #endif
73
74
75
/*
 * Resolve BIF_ARG_1 to a locked table, or fail out of the current BIF.
 */
#define DB_BIF_GET_TABLE(TB, WHAT, KIND, BIF_IX) \
    DB_GET_TABLE(TB, BIF_ARG_1, WHAT, KIND, BIF_IX, NULL, BIF_P)

/* As DB_BIF_GET_TABLE, but for trap continuations that carry an
 * explicit table id and trap export (bif index unused, passed as 0). */
#define DB_TRAP_GET_TABLE(TB, TID, WHAT, KIND, BIF_EXP) \
    DB_GET_TABLE(TB, TID, WHAT, KIND, 0, BIF_EXP, BIF_P)

/*
 * Common table lookup with access check and locking.
 * NOTE: on failure this macro executes 'return' from the enclosing
 * function, with the value prepared by db_bif_fail() (a trap or an
 * error with p->freason/p->fvalue set).
 */
#define DB_GET_TABLE(TB, TID, WHAT, KIND, BIF_IX, BIF_EXP, PROC)       \
do {                                                                   \
    Uint freason__;                                                    \
    if (!(TB = db_get_table(PROC, TID, WHAT, KIND, &freason__))) {     \
        return db_bif_fail(PROC, freason__, BIF_IX, BIF_EXP);          \
    }                                                                  \
}while(0)

/* Approximate reads of the table's decentralized (flxctr) counters;
 * cheap but not exact while concurrent updates are in flight. */
#define DB_GET_APPROX_NITEMS(DB)                                        \
    erts_flxctr_read_approx(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
#define DB_GET_APPROX_MEM_CONSUMED(DB)                                  \
    erts_flxctr_read_approx(&(DB)->common.counters, ERTS_DB_TABLE_MEM_COUNTER_ID)
94
/*
 * Common failure path for ETS BIFs.
 *
 * freason == TRAP: prepare the process to trap into a continuation
 * export (bif_exp if given, otherwise the trap export of the calling
 * BIF identified by bif_ix) so the operation is retried/resumed after
 * the process has yielded.
 * Any other freason becomes the exception reason in p->freason
 * (callers may have set p->fvalue with extended error info already).
 *
 * Always returns THE_NON_VALUE, which the BIF returns to the emulator.
 */
static BIF_RETTYPE db_bif_fail(Process* p, Uint freason,
                               Uint bif_ix, Export* bif_exp)
{
    if (freason == TRAP) {
        if (!bif_exp) {
            /* No explicit continuation; trap back into the BIF itself. */
            bif_exp = BIF_TRAP_EXPORT(bif_ix);
        }

        ERTS_BIF_PREP_TRAP(bif_exp, p, bif_exp->info.mfa.arity);
    }

    p->freason = freason;
    return THE_NON_VALUE;
}
109
110
/* Get a key from any table structure and a tagged object */
#define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj))

/*
 * Classify iteration safety of table Tab for process Proc:
 * tree/catree tables and sole-writer access are ITER_SAFE; a
 * fine-grained-locked (hash) table is ITER_UNSAFE; otherwise iteration
 * is safe only while the table lock is held (ITER_SAFE_LOCKED).
 */
# define ITERATION_SAFETY(Proc,Tab) \
    ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \
      || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \
     : (((Tab)->common.status & DB_FINE_LOCKED) ? ITER_UNSAFE : ITER_SAFE_LOCKED))

/* True when Ret is the non-value produced by process P trapping. */
#define DID_TRAP(P,Ret) (!is_value(Ret) && ((P)->freason == TRAP))
120
121 /*
122 * "fixed_tabs": list of all fixed tables for a process
123 */
124 #ifdef DEBUG
125 static int fixed_tabs_find(DbFixation* first, DbFixation* fix);
126 #endif
127
/*
 * Link 'fix' into the calling process's circular, doubly linked list
 * of table fixations (list head kept in process specific data).
 */
static void fixed_tabs_insert(Process* p, DbFixation* fix)
{
    DbFixation* head = erts_psd_get(p, ERTS_PSD_ETS_FIXED_TABLES);

    if (head == NULL) {
        /* Empty list: fix becomes its own neighbor in both directions. */
        fix->tabs.next = fix;
        fix->tabs.prev = fix;
        erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, fix);
        return;
    }

    ASSERT(!fixed_tabs_find(head, fix));

    /* Insert fix immediately before the current head. */
    fix->tabs.next = head;
    fix->tabs.prev = head->tabs.prev;
    head->tabs.prev->tabs.next = fix;
    head->tabs.prev = fix;
}
144
/*
 * Unlink 'fix' from the process's circular list of table fixations,
 * clearing or advancing the list head in PSD as needed.
 */
static void fixed_tabs_delete(Process *p, DbFixation* fix)
{
    if (fix->tabs.next == fix) {
        /* fix is the only element; empty the PSD slot. */
        DbFixation* old;
        ASSERT(fix->tabs.prev == fix);
        old = erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, NULL);
        ASSERT(old == fix); (void)old;
    }
    else {
        DbFixation *first = (DbFixation*) erts_psd_get(p, ERTS_PSD_ETS_FIXED_TABLES);

        ASSERT(fixed_tabs_find(first, fix));
        fix->tabs.prev->tabs.next = fix->tabs.next;
        fix->tabs.next->tabs.prev = fix->tabs.prev;

        /* If fix was the head, move the head to its successor. */
        if (fix == first)
            erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, fix->tabs.next);
    }
}
164
#ifdef DEBUG
/*
 * Debug-only membership test: return 1 if 'fix' is on the circular
 * fixation list starting at 'first', 0 otherwise. A NULL 'first'
 * fetches the list head from the process recorded in fix->procs.p.
 */
static int fixed_tabs_find(DbFixation* first, DbFixation* fix)
{
    DbFixation* p;

    if (!first) {
        first = (DbFixation*) erts_psd_get(fix->procs.p, ERTS_PSD_ETS_FIXED_TABLES);
    }
    p = first;
    do {
        if (p == fix)
            return 1;
        /* Sanity: same owning process and consistent back-links. */
        ASSERT(p->procs.p == fix->procs.p);
        ASSERT(p->tabs.next->tabs.prev == p);
        p = p->tabs.next;
    } while (p != first);
    return 0;
}
#endif
184
185
186 /*
187 * fixing_procs: tree of all processes fixating a table
188 */
189 #define ERTS_RBT_PREFIX fixing_procs
190 #define ERTS_RBT_T DbFixation
191 #define ERTS_RBT_KEY_T Process*
192 #define ERTS_RBT_FLAGS_T int
193 #define ERTS_RBT_INIT_EMPTY_TNODE(T) \
194 do { \
195 (T)->procs.parent = NULL; \
196 (T)->procs.right = NULL; \
197 (T)->procs.left = NULL; \
198 } while (0)
199 #define ERTS_RBT_IS_RED(T) ((T)->procs.is_red)
200 #define ERTS_RBT_SET_RED(T) ((T)->procs.is_red = 1)
201 #define ERTS_RBT_IS_BLACK(T) (!(T)->procs.is_red)
202 #define ERTS_RBT_SET_BLACK(T) ((T)->procs.is_red = 0)
203 #define ERTS_RBT_GET_FLAGS(T) ((T)->procs.is_red)
204 #define ERTS_RBT_SET_FLAGS(T, F) ((T)->procs.is_red = (F))
205 #define ERTS_RBT_GET_PARENT(T) ((T)->procs.parent)
206 #define ERTS_RBT_SET_PARENT(T, P) ((T)->procs.parent = (P))
207 #define ERTS_RBT_GET_RIGHT(T) ((T)->procs.right)
208 #define ERTS_RBT_SET_RIGHT(T, R) ((T)->procs.right = (R))
209 #define ERTS_RBT_GET_LEFT(T) ((T)->procs.left)
210 #define ERTS_RBT_SET_LEFT(T, L) ((T)->procs.left = (L))
211 #define ERTS_RBT_GET_KEY(T) ((T)->procs.p)
212 #define ERTS_RBT_IS_LT(KX, KY) ((KX) < (KY))
213 #define ERTS_RBT_IS_EQ(KX, KY) ((KX) == (KY))
214
215 #define ERTS_RBT_WANT_INSERT
216 #define ERTS_RBT_WANT_LOOKUP
217 #define ERTS_RBT_WANT_DELETE
218 #define ERTS_RBT_WANT_FOREACH
219 #define ERTS_RBT_WANT_FOREACH_DESTROY
220 #define ERTS_RBT_UNDEF
221
222 #include "erl_rbtree.h"
223
224 #ifdef HARDDEBUG
225 # error Do something useful with CHECK_TABLES maybe
226 #else
227 # define CHECK_TABLES()
228 #endif
229
230
231 static void
232 send_ets_transfer_message(Process *c_p, Process *proc,
233 ErtsProcLocks *locks,
234 DbTable *tb, Eterm heir_data);
235 static void schedule_free_dbtable(DbTable* tb);
236 static void delete_sched_table(Process *c_p, DbTable *tb);
237
/* Drop one reference on 'tb'; when the count hits zero, schedule the
 * table for deallocation. min_val asserts the lowest legal result. */
static void table_dec_refc(DbTable *tb, erts_aint_t min_val)
{
    erts_aint_t refc = erts_refc_dectest(&tb->common.refc, min_val);
    if (refc == 0)
        schedule_free_dbtable(tb);
}
243
244 static int
db_table_tid_destructor(Binary * unused)245 db_table_tid_destructor(Binary *unused)
246 {
247 return 1;
248 }
249
/*
 * Create the magic-indirection binary that serves as this table's
 * identifier and initialize the table's reference counts.
 */
static ERTS_INLINE void
make_btid(DbTable *tb)
{
    Binary *btid = erts_create_magic_indirection(db_table_tid_destructor);
    erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
    erts_atomic_init_nob(tbref, (erts_aint_t) tb);
    tb->common.btid = btid;
    /*
     * Table and magic indirection refer to each other,
     * and table is referred to once by being alive...
     */
    erts_refc_init(&tb->common.refc, 2);
    erts_refc_inc(&btid->intern.refc, 1);
}
264
btid2tab(Binary * btid)265 static ERTS_INLINE DbTable* btid2tab(Binary* btid)
266 {
267 erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
268 return (DbTable *) erts_atomic_read_nob(tbref);
269 }
270
/*
 * Resolve a table-identifier term (magic ref) to its table.
 * On failure returns NULL and stores extended error info in
 * *error_info_p: EXI_TYPE when 'tid' is not a table identifier at all,
 * EXI_ID when it identifies an already deleted table.
 */
static DbTable *
tid2tab(Eterm tid, Eterm *error_info_p)
{
    DbTable *tb;
    Binary *btid;
    erts_atomic_t *tbref;

    ASSERT(error_info_p);
    if (!is_internal_magic_ref(tid)) {
        *error_info_p = EXI_TYPE;
        return NULL;
    }

    btid = erts_magic_ref2bin(tid);
    /* A magic ref that is not a table id (wrong destructor) is a type
     * error, same as a non-ref term. */
    if (ERTS_MAGIC_BIN_DESTRUCTOR(btid) != db_table_tid_destructor) {
        *error_info_p = EXI_TYPE;
        return NULL;
    }

    tbref = erts_binary_to_magic_indirection(btid);
    tb = (DbTable *) erts_atomic_read_nob(tbref);

    ASSERT(!tb || tb->common.btid == btid);

    if (tb == NULL) {
        *error_info_p = EXI_ID;
    }

    return tb;
}
301
302 static ERTS_INLINE int
is_table_alive(DbTable * tb)303 is_table_alive(DbTable *tb)
304 {
305 erts_atomic_t *tbref;
306 DbTable *rtb;
307
308 tbref = erts_binary_to_magic_indirection(tb->common.btid);
309 rtb = (DbTable *) erts_atomic_read_nob(tbref);
310
311 ASSERT(!rtb || rtb == tb);
312
313 return !!rtb;
314 }
315
/* Nonzero if the table was created as a named table. */
static ERTS_INLINE int
is_table_named(DbTable *tb)
{
    return tb->common.type & DB_NAMED_TABLE;
}
321
322
/*
 * Disconnect a deleted table from its identifier: atomically clear the
 * magic indirection so btid2tab()/tid2tab() start returning NULL. If
 * this call did the clearing (rtb != NULL), drop the "alive" reference
 * and remove the table from its scheduler's table list; if the
 * indirection was already cleared, do nothing.
 */
static ERTS_INLINE void
tid_clear(Process *c_p, DbTable *tb)
{
    DbTable *rtb;
    Binary *btid = tb->common.btid;
    erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
    rtb = (DbTable *) erts_atomic_xchg_nob(tbref, (erts_aint_t) NULL);
    ASSERT(!rtb || tb == rtb);
    if (rtb) {
        table_dec_refc(tb, 1);
        delete_sched_table(c_p, tb);
    }
}
336
337 static ERTS_INLINE Eterm
make_tid(Process * c_p,DbTable * tb)338 make_tid(Process *c_p, DbTable *tb)
339 {
340 Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE);
341 return erts_mk_magic_ref(&hp, &c_p->off_heap, tb->common.btid);
342 }
343
/* Exported wrapper around make_tid() for callers outside this file. */
Eterm
erts_db_make_tid(Process *c_p, DbTableCommon *tb)
{
    return make_tid(c_p, (DbTable*)tb);
}
349
350
351
352 /*
353 ** The meta hash table of all NAMED ets tables
354 */
355 # define META_NAME_TAB_LOCK_CNT 256
356 union {
357 erts_rwmtx_t lck;
358 byte align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_rwmtx_t))];
359 }meta_name_tab_rwlocks[META_NAME_TAB_LOCK_CNT];
360 static struct meta_name_tab_entry {
361 union {
362 Eterm name_atom;
363 Eterm mcnt; /* Length of mvec in multiple tab entry */
364 }u;
365 union {
366 DbTable *tb;
367 struct meta_name_tab_entry* mvec;
368 }pu;
369 } *meta_name_tab;
370
371 static unsigned meta_name_tab_mask;
372
/*
 * Return the meta name table bucket for named table 'name', and
 * through *lockp the rwlock guarding it (striped over
 * META_NAME_TAB_LOCK_CNT locks). The caller performs the locking.
 */
static ERTS_INLINE
struct meta_name_tab_entry* meta_name_tab_bucket(Eterm name,
						 erts_rwmtx_t** lockp)
{
    unsigned bix = atom_val(name) & meta_name_tab_mask;
    struct meta_name_tab_entry* bucket = &meta_name_tab[bix];
    /* Only non-dirty schedulers are allowed to access the metatable
       The smp 1 optimizations for ETS depend on that */
    ASSERT(erts_get_scheduler_data() && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
    *lockp = &meta_name_tab_rwlocks[bix % META_NAME_TAB_LOCK_CNT].lck;
    return bucket;
}
385
386
387 typedef enum {
388 LCK_READ=1, /* read only access */
389 LCK_WRITE=2, /* exclusive table write access */
390 LCK_WRITE_REC=3, /* record write access */
391 NOLCK_ACCESS=4 /* Used to access the table structure
392 without acquiring the table lock */
393 } db_lock_kind_t;
394
395 extern DbTableMethod db_hash;
396 extern DbTableMethod db_tree;
397 extern DbTableMethod db_catree;
398
399 int user_requested_db_max_tabs;
400 int erts_ets_realloc_always_moves;
401 int erts_ets_always_compress;
402 static int db_max_tabs;
403
404 /*
405 ** Forward decls, static functions
406 */
407
408 static void fix_table_locked(Process* p, DbTable* tb);
409 static void unfix_table_locked(Process* p, DbTable* tb, db_lock_kind_t* kind);
410 static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data);
411 static void free_heir_data(DbTable*);
412 static SWord free_fixations_locked(Process* p, DbTable *tb);
413
414 static void delete_all_objects_continue(Process* p, DbTable* tb);
415 static SWord free_table_continue(Process *p, DbTable *tb, SWord reds);
416 static void print_table(fmtfn_t to, void *to_arg, int show, DbTable* tb);
417 static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1);
418 static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1);
419 static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1);
420 static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1);
421 static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1);
422 static Eterm table_info(Process* p, DbTable* tb, Eterm What);
423
424 static BIF_RETTYPE ets_select1(Process* p, int bif_ix, Eterm arg1);
425 static BIF_RETTYPE ets_select2(Process* p, DbTable*, Eterm tid, Eterm ms);
426 static BIF_RETTYPE ets_select3(Process* p, DbTable*, Eterm tid, Eterm ms, Sint chunk_size);
427
428
429 /*
430 * Exported global
431 */
432 Export ets_select_delete_continue_exp;
433 Export ets_select_count_continue_exp;
434 Export ets_select_replace_continue_exp;
435 Export ets_select_continue_exp;
436 /*
437 * Static traps
438 */
439 static Export ets_delete_continue_exp;
440
441 static Export *ets_info_binary_trap = NULL;
442
/*
 * Thread-progress cleanup callback (see schedule_free_dbtable()):
 * deallocate the table structure once no thread can reference it.
 */
static void
free_dbtable(void *vtb)
{
    DbTable *tb = (DbTable *) vtb;
    /* Remove the counter structure's own allocation from the memory
     * counter before the consistency check below. */
    erts_flxctr_add(&tb->common.counters,
                    ERTS_DB_TABLE_MEM_COUNTER_ID,
                    -((Sint)erts_flxctr_nr_of_allocated_bytes(&tb->common.counters)));
    /* All table-specific memory must already have been released; only
     * the DbTable struct itself should remain accounted for (unless a
     * counter snapshot is still in progress). */
    ASSERT(erts_flxctr_is_snapshot_ongoing(&tb->common.counters) ||
           sizeof(DbTable) == DB_GET_APPROX_MEM_CONSUMED(tb));

    /* Non-immediate heir data must have been freed by free_heir_data(). */
    ASSERT(is_immed(tb->common.heir_data));

    if (!DB_LOCK_FREE(tb)) {
        erts_rwmtx_destroy(&tb->common.rwlock);
        erts_mtx_destroy(&tb->common.fixlock);
    }

    if (tb->common.btid)
        erts_bin_release(tb->common.btid);

    erts_flxctr_destroy(&tb->common.counters, ERTS_ALC_T_ETS_CTRS);
    erts_free(ERTS_ALC_T_DB_TABLE, tb);
}
466
/*
 * Defer deallocation of 'tb' until thread progress guarantees no
 * thread can still reference it; free_dbtable() then runs. Must only
 * be called when both refc and fix_count have reached zero.
 */
static void schedule_free_dbtable(DbTable* tb)
{
    /*
     * NON-SMP case: Caller is *not* allowed to access the *tb
     *               structure after this function has returned!
     * SMP case:     Caller is allowed to access the *common* part of the *tb
     *  	     structure until the bif has returned (we typically
     *  	     need to unlock the table lock after this function
     *  	     has returned).
     *  	     Caller is *not* allowed to access the specialized part
     *  	     (hash or tree) of *tb after this function has returned.
     */
    ASSERT(erts_refc_read(&tb->common.refc, 0) == 0);
    ASSERT(erts_refc_read(&tb->common.fix_count, 0) == 0);
    erts_schedule_thr_prgr_later_cleanup_op(free_dbtable,
					    (void *) tb,
					    &tb->release.data,
					    sizeof(DbTable));
}
485
486 static ERTS_INLINE void
save_sched_table(Process * c_p,DbTable * tb)487 save_sched_table(Process *c_p, DbTable *tb)
488 {
489 ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
490 DbTable *first;
491
492 ASSERT(esdp);
493 erts_atomic_inc_nob(&esdp->ets_tables.count);
494 erts_refc_inc(&tb->common.refc, 1);
495
496 first = esdp->ets_tables.clist;
497 if (!first) {
498 tb->common.all.next = tb->common.all.prev = tb;
499 esdp->ets_tables.clist = tb;
500 }
501 else {
502 tb->common.all.prev = first->common.all.prev;
503 tb->common.all.next = first;
504 tb->common.all.prev->common.all.next = tb;
505 first->common.all.prev = tb;
506 }
507 }
508
/*
 * Unlink 'tb' from its scheduler's circular table list and drop the
 * reference taken by save_sched_table(). Must run on the scheduler
 * that owns the list (asserted via the thread id in the btid refn).
 * Also repairs any ongoing ets:all() iteration that is currently
 * positioned at this table.
 */
static ERTS_INLINE void
remove_sched_table(ErtsSchedulerData *esdp, DbTable *tb)
{
    ErtsEtsAllYieldData *eaydp;
    ASSERT(esdp);
    ASSERT(erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid))
           == (Uint32) esdp->no);

    ASSERT(erts_atomic_read_nob(&esdp->ets_tables.count) > 0);
    erts_atomic_dec_nob(&esdp->ets_tables.count);

    eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
    if (eaydp->ongoing) {
        /* ets:all() op process list from last to first... */
        if (eaydp->tab == tb) {
            if (eaydp->tab == esdp->ets_tables.clist)
                eaydp->tab = NULL;      /* iteration is done */
            else
                eaydp->tab = tb->common.all.prev;
        }
    }

    if (tb->common.all.next == tb) {
        /* Sole element: the list becomes empty. */
        ASSERT(tb->common.all.prev == tb);
        ASSERT(esdp->ets_tables.clist == tb);
        esdp->ets_tables.clist = NULL;
    }
    else {
#ifdef DEBUG
        /* Verify tb is actually a member of this scheduler's list. */
        DbTable *tmp = esdp->ets_tables.clist;
        do {
            if (tmp == tb) break;
            tmp = tmp->common.all.next;
        } while (tmp != esdp->ets_tables.clist);
        ASSERT(tmp == tb);
#endif
        tb->common.all.prev->common.all.next = tb->common.all.next;
        tb->common.all.next->common.all.prev = tb->common.all.prev;

        if (esdp->ets_tables.clist == tb)
            esdp->ets_tables.clist = tb->common.all.next;

    }

    table_dec_refc(tb, 0);
}
555
556 static void
scheduled_remove_sched_table(void * vtb)557 scheduled_remove_sched_table(void *vtb)
558 {
559 remove_sched_table(erts_get_scheduler_data(), (DbTable *) vtb);
560 }
561
/*
 * Remove 'tb' from the scheduler table list it was created on. The
 * owning scheduler is identified by the thread id embedded in the
 * btid's reference numbers; if that is not the current scheduler,
 * delegate the removal via misc aux work.
 */
static void
delete_sched_table(Process *c_p, DbTable *tb)
{
    ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
    Uint32 sid;

    ASSERT(esdp);

    ASSERT(tb->common.btid);
    sid = erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid));
    ASSERT(1 <= sid && sid <= erts_no_schedulers);
    if (sid == (Uint32) esdp->no)
        remove_sched_table(esdp, tb);
    else
        erts_schedule_misc_aux_work((int) sid, scheduled_remove_sched_table, tb);
}
578
/*
 * Link 'tb' into the owner process's circular list of owned tables
 * (kept in PSD, protected by the process STATUS lock) and take a
 * reference on the table.
 */
static ERTS_INLINE void
save_owned_table(Process *c_p, DbTable *tb)
{
    DbTable *first;

    erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);

    first = (DbTable*) erts_psd_get(c_p, ERTS_PSD_ETS_OWNED_TABLES);

    erts_refc_inc(&tb->common.refc, 1);

    if (!first) {
        /* First owned table: single-element circular list. */
        tb->common.owned.next = tb->common.owned.prev = tb;
        erts_psd_set(c_p, ERTS_PSD_ETS_OWNED_TABLES, tb);
    }
    else {
        /* Insert tb just before the current list head. */
        tb->common.owned.prev = first->common.owned.prev;
        tb->common.owned.next = first;
        tb->common.owned.prev->common.owned.next = tb;
        first->common.owned.prev = tb;
    }
    erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
}
602
/*
 * Unlink 'tb' from process p's circular list of owned tables (under
 * the STATUS lock) and drop the reference taken by save_owned_table().
 */
static ERTS_INLINE void
delete_owned_table(Process *p, DbTable *tb)
{
    erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
    if (tb->common.owned.next == tb) {
        /* Sole element: empty the PSD slot. */
        DbTable* old;
        ASSERT(tb->common.owned.prev == tb);
        old = erts_psd_set(p, ERTS_PSD_ETS_OWNED_TABLES, NULL);
        ASSERT(old == tb); (void)old;
    }
    else {
        DbTable *first = (DbTable*) erts_psd_get(p, ERTS_PSD_ETS_OWNED_TABLES);
#ifdef DEBUG
        /* Verify membership before unlinking. */
        DbTable *tmp = first;
        do {
            if (tmp == tb) break;
            tmp = tmp->common.owned.next;
        } while (tmp != first);
        ASSERT(tmp == tb);
#endif
        tb->common.owned.prev->common.owned.next = tb->common.owned.next;
        tb->common.owned.next->common.owned.prev = tb->common.owned.prev;

        if (tb == first)
            erts_psd_set(p, ERTS_PSD_ETS_OWNED_TABLES, tb->common.owned.next);
    }
    erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);

    table_dec_refc(tb, 1);
}
633
/*
 * Initialize the table's rwlock and fixation mutex (skipped entirely
 * for lock-free tables). is_thread_safe starts out true unless the
 * table uses fine-grained locking.
 */
static ERTS_INLINE void db_init_lock(DbTable* tb, int use_frequent_read_lock)
{
    erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
    if (use_frequent_read_lock)
        rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
    /* Honor the +zebwt emulator flag, if set (negative means default). */
    if (erts_ets_rwmtx_spin_count >= 0)
        rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
    if (!DB_LOCK_FREE(tb)) {
        erts_rwmtx_init_opt(&tb->common.rwlock, &rwmtx_opt, "db_tab",
                            tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
        erts_mtx_init(&tb->common.fixlock, "db_tab_fix",
                      tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
    }
    tb->common.is_thread_safe = !(tb->common.status & DB_FINE_LOCKED);
    ASSERT(!DB_LOCK_FREE(tb) || tb->common.is_thread_safe);
}
650
/*
 * Acquire the table-level lock according to 'kind'.
 *
 * Fine-grained (DB_FINE_LOCKED) tables: LCK_WRITE takes the rwlock
 * exclusively (whole-table op) and marks the table thread safe for its
 * duration; LCK_READ and LCK_WRITE_REC take it shared. Other tables:
 * both write kinds take it exclusively, LCK_READ shared.
 * NOLCK_ACCESS and lock-free tables take no lock at all.
 */
static ERTS_INLINE void db_lock(DbTable* tb, db_lock_kind_t kind)
{
    if (DB_LOCK_FREE(tb))
        return;
    if (tb->common.type & DB_FINE_LOCKED) {
        if (kind == LCK_WRITE) {
            erts_rwmtx_rwlock(&tb->common.rwlock);
            tb->common.is_thread_safe = 1;
        } else if (kind != NOLCK_ACCESS) {
            erts_rwmtx_rlock(&tb->common.rwlock);
            ASSERT(!tb->common.is_thread_safe);
        }
    }
    else
    {
        switch (kind) {
        case LCK_WRITE:
        case LCK_WRITE_REC:
            erts_rwmtx_rwlock(&tb->common.rwlock);
            break;
        case NOLCK_ACCESS:
            return;
        default:
            erts_rwmtx_rlock(&tb->common.rwlock);
        }
        ASSERT(tb->common.is_thread_safe);
    }
}
679
/*
 * Release the table-level lock taken by db_lock() with the same
 * 'kind'. Mirrors db_lock()'s mapping of kind to read/write mode,
 * clearing is_thread_safe when an exclusive fine-grained lock is
 * released.
 */
static ERTS_INLINE void db_unlock(DbTable* tb, db_lock_kind_t kind)
{
    if (DB_LOCK_FREE(tb) || kind == NOLCK_ACCESS)
        return;
    if (tb->common.type & DB_FINE_LOCKED) {
        if (kind == LCK_WRITE) {
            ASSERT(tb->common.is_thread_safe);
            tb->common.is_thread_safe = 0;
            erts_rwmtx_rwunlock(&tb->common.rwlock);
        }
        else {
            ASSERT(!tb->common.is_thread_safe);
            erts_rwmtx_runlock(&tb->common.rwlock);
        }
    }
    else {
        ASSERT(tb->common.is_thread_safe);
        switch (kind) {
        case LCK_WRITE:
        case LCK_WRITE_REC:
            erts_rwmtx_rwunlock(&tb->common.rwlock);
            break;
        default:
            erts_rwmtx_runlock(&tb->common.rwlock);
        }
    }
}
707
db_is_exclusive(DbTable * tb,db_lock_kind_t kind)708 static ERTS_INLINE int db_is_exclusive(DbTable* tb, db_lock_kind_t kind)
709 {
710 if (DB_LOCK_FREE(tb))
711 return 1;
712
713 return
714 kind != LCK_READ &&
715 kind != NOLCK_ACCESS &&
716 tb->common.is_thread_safe;
717 }
718
/*
 * Called when the table's status bits deny the requested access.
 *
 * DB_BUSY: a long-running operation on the table has yielded. Upgrade
 * to an exclusive lock, drive the stored continuation (or the pending
 * delete_all_objects) forward for the caller's remaining reductions,
 * then make the caller TRAP and retry.
 * Otherwise: the caller lacks rights on a protected/private table;
 * fail with BADARG plus extended error info EXI_ACCESS.
 *
 * Returns tb (still locked with 'kind') only when access is in fact
 * permitted; otherwise returns NULL with the lock released and
 * *freason_p set.
 */
static DbTable* handle_lacking_permission(Process* p, DbTable* tb,
                                          db_lock_kind_t kind,
                                          Uint* freason_p)
{
    if (tb->common.status & DB_BUSY) {
        void* continuation_state;
        if (!db_is_exclusive(tb, kind)) {
            /* Upgrade: drop our lock and retake it exclusively. */
            db_unlock(tb, kind);
            db_lock(tb, LCK_WRITE);
        }
        continuation_state = (void*)erts_atomic_read_nob(&tb->common.continuation_state);
        if (continuation_state != NULL) {
            const long iterations_per_red = 10;
            const long reds = iterations_per_red * ERTS_BIF_REDS_LEFT(p);
            /* DEBUG builds randomize the budget (seeded by the stack
             * address) to shake out continuation bugs. */
            long nr_of_reductions = DBG_RANDOM_REDS(reds, (Uint)freason_p);
            const long init_reds = nr_of_reductions;
            tb->common.continuation(&nr_of_reductions,
                                    &continuation_state,
                                    NULL);
            if (continuation_state == NULL) {
                /* Continuation finished; clear the stored state. */
                erts_atomic_set_relb(&tb->common.continuation_state, (Sint)NULL);
            }
            BUMP_REDS(p, (init_reds - nr_of_reductions) / iterations_per_red);
        } else {
            delete_all_objects_continue(p, tb);
        }
        db_unlock(tb, LCK_WRITE);
        tb = NULL;
        *freason_p = TRAP;
    }
    else if (p->common.id != tb->common.owner
             && !(p->flags & F_ETS_SUPER_USER)) {
        db_unlock(tb, kind);
        tb = NULL;
        p->fvalue = EXI_ACCESS;
        *freason_p = BADARG | EXF_HAS_EXT_INFO;
    }
    return tb;
}
758
/*
 * Resolve a table name (atom) or identifier (magic ref) to a locked
 * table with access checking.
 *
 * what: requested access bits matched against the table's status;
 *       DB_READ_TBL_STRUCT skips both locking (kind == NOLCK_ACCESS)
 *       and the status check.
 * kind: lock kind to take on the table (see db_lock()).
 * meta_already_locked: nonzero when the caller already holds the meta
 *       name table bucket lock, so it must not be taken again.
 *
 * On failure returns NULL with *freason_p set to TRAP or to an error
 * reason (p->fvalue then carries extended error info).
 */
static ERTS_INLINE
DbTable* db_get_table_aux(Process *p,
			  Eterm id,
			  int what,
			  db_lock_kind_t kind,
			  int meta_already_locked,
			  Uint* freason_p)
{
    DbTable *tb;

    /*
     * IMPORTANT: Only non-dirty scheduler threads are allowed
     *            to access tables. Memory management depend on it.
     */
    ASSERT(erts_get_scheduler_data() && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));

    ASSERT((what == DB_READ_TBL_STRUCT) == (kind == NOLCK_ACCESS));

    if (META_DB_LOCK_FREE())
        meta_already_locked = 1;

    if (is_not_atom(id)) {
        /* Table identifier (magic ref); tid2tab sets p->fvalue on error. */
        tb = tid2tab(id, &p->fvalue);
    } else {
        /* Named table: look the atom up in the meta name table. */
        erts_rwmtx_t *mtl;
        struct meta_name_tab_entry* bucket = meta_name_tab_bucket(id,&mtl);
        if (!meta_already_locked)
            erts_rwmtx_rlock(mtl);
        else {
            ERTS_LC_ASSERT(META_DB_LOCK_FREE()
                           || erts_lc_rwmtx_is_rlocked(mtl)
                           || erts_lc_rwmtx_is_rwlocked(mtl));
        }
        tb = NULL;
        if (bucket->pu.tb != NULL) {
            if (is_atom(bucket->u.name_atom)) { /* single */
                if (bucket->u.name_atom == id)
                    tb = bucket->pu.tb;
            }
            else { /* multi */
                Uint cnt = unsigned_val(bucket->u.mcnt);
                Uint i;
                for (i=0; i<cnt; i++) {
                    if (bucket->pu.mvec[i].u.name_atom == id) {
                        tb = bucket->pu.mvec[i].pu.tb;
                        break;
                    }
                }
            }
        }
        if (!meta_already_locked)
            erts_rwmtx_runlock(mtl);

        if (tb == NULL) {
            p->fvalue = EXI_ID;
        }
    }

    if (tb) {
        db_lock(tb, kind);
#ifdef ETS_DBG_FORCE_TRAP
        /*
         * The ets_SUITE uses this to verify that all table lookups calls
         * can handle a failed TRAP return correctly.
         */
        if (what != DB_READ_TBL_STRUCT && tb->common.dbg_force_trap) {
            if (!(p->flags & F_DBG_FORCED_TRAP)) {
                db_unlock(tb, kind);
                tb = NULL;
                *freason_p = TRAP;
                p->fvalue = EXI_TYPE;
                p->flags |= F_DBG_FORCED_TRAP;
                return tb;
            } else {
                /* back from forced trap */
                p->flags &= ~F_DBG_FORCED_TRAP;
            }
        }
#endif
        if (what != DB_READ_TBL_STRUCT
            /* IMPORTANT: the above check is necessary as the status field
               might be in an intermediate state when
               kind==NOLCK_ACCESS */
            && ERTS_UNLIKELY(!(tb->common.status & what))) {
            tb = handle_lacking_permission(p, tb, kind, freason_p);
        }
    }
    else {
        *freason_p = BADARG | EXF_HAS_EXT_INFO;
    }

    return tb;
}
852
/*
 * Resolve a table id or name to a locked table with access check.
 * Convenience wrapper for db_get_table_aux() without the meta lock
 * already held; on failure returns NULL with *freason_p set.
 */
static ERTS_INLINE
DbTable* db_get_table(Process *p,
                      Eterm id,
                      int what,
                      db_lock_kind_t kind,
                      Uint* freason_p)
{
    return db_get_table_aux(p, id, what, kind, 0, freason_p);
}
862
/*
 * Function-call form of DB_GET_TABLE for use where the macro's hidden
 * 'return' is wanted indirectly: on lookup failure the DB_GET_TABLE
 * macro returns the prepared failure value from THIS function; on
 * success *tb is set and THE_NON_VALUE is returned.
 */
static BIF_RETTYPE db_get_table_or_fail_return(DbTable **tb,       /* out */
                                               Eterm table_id,
                                               Uint32 what,
                                               db_lock_kind_t kind,
                                               Uint bif_ix,
                                               Process* p)
{
    DB_GET_TABLE(*tb, table_id, what, kind, bif_ix, NULL, p);
    return THE_NON_VALUE;
}
873
/*
 * Register 'tb' under 'name_atom' in the meta name table.
 * A bucket holds either one entry directly or a vector of entries for
 * hash-colliding names (bucket->u.mcnt is the vector length as a
 * small). Returns 1 on success, 0 if the name is already taken.
 */
static int insert_named_tab(Eterm name_atom, DbTable* tb, int have_lock)
{
    int ret = 0;
    erts_rwmtx_t* rwlock;
    struct meta_name_tab_entry* new_entry;
    struct meta_name_tab_entry* bucket = meta_name_tab_bucket(name_atom,
							      &rwlock);

    if (META_DB_LOCK_FREE())
        have_lock = 1;

    if (!have_lock)
        erts_rwmtx_rwlock(rwlock);

    if (bucket->pu.tb == NULL) { /* empty */
        new_entry = bucket;
    }
    else {
        struct meta_name_tab_entry* entries;
        Uint cnt;
        if (is_atom(bucket->u.name_atom)) { /* single */
            size_t size;
            if (bucket->u.name_atom == name_atom) {
                goto done;      /* name taken */
            }
            /* Grow single entry into a 2-element vector. */
            cnt = 2;
            size = sizeof(struct meta_name_tab_entry)*cnt;
            entries = erts_db_alloc_nt(ERTS_ALC_T_DB_NTAB_ENT, size);
            ERTS_ETS_MISC_MEM_ADD(size);
            new_entry = &entries[0];
            entries[1] = *bucket;
        }
        else { /* multi */
            size_t size, old_size;
            Uint i;
            cnt = unsigned_val(bucket->u.mcnt);
            for (i=0; i<cnt; i++) {
                if (bucket->pu.mvec[i].u.name_atom == name_atom) {
                    goto done;  /* name taken */
                }
            }
            /* Extend the vector by one slot. */
            old_size = sizeof(struct meta_name_tab_entry)*cnt;
            size = sizeof(struct meta_name_tab_entry)*(cnt+1);
            entries = erts_db_realloc_nt(ERTS_ALC_T_DB_NTAB_ENT,
                                         bucket->pu.mvec,
                                         old_size,
                                         size);
            ERTS_ETS_MISC_MEM_ADD(size-old_size);
            new_entry = &entries[cnt];
            cnt++;
        }
        bucket->pu.mvec = entries;
        bucket->u.mcnt = make_small(cnt);
    }
    new_entry->pu.tb = tb;
    new_entry->u.name_atom = name_atom;
    ret = 1; /* Ok */

done:
    if (!have_lock)
        erts_rwmtx_rwunlock(rwlock);
    return ret;
}
937
/*
 * Remove 'tb' from the meta name table (reverse of insert_named_tab).
 * Shrinks a collision vector back to a single entry when only one
 * remains. Returns 1 on success, 0 if the name was not found.
 *
 * NOTE: when the meta lock is contended, the table lock (held
 * LCK_WRITE by the caller) is temporarily released to keep the
 * meta-before-table lock order and avoid deadlock.
 */
static int remove_named_tab(DbTable *tb, int have_lock)
{
    int ret = 0;
    erts_rwmtx_t* rwlock;
    Eterm name_atom = tb->common.the_name;
    struct meta_name_tab_entry* bucket = meta_name_tab_bucket(name_atom,
							      &rwlock);
    ASSERT(is_table_named(tb));

    if (META_DB_LOCK_FREE())
        have_lock = 1;

    if (!have_lock && erts_rwmtx_tryrwlock(rwlock) == EBUSY) {
        /* Release the table lock while blocking on the meta lock. */
        db_unlock(tb, LCK_WRITE);
        erts_rwmtx_rwlock(rwlock);
        db_lock(tb, LCK_WRITE);
    }

    ERTS_LC_ASSERT(META_DB_LOCK_FREE() || erts_lc_rwmtx_is_rwlocked(rwlock));

    if (bucket->pu.tb == NULL) {
        goto done;
    }
    else if (is_atom(bucket->u.name_atom)) { /* single */
        if (bucket->u.name_atom != name_atom) {
            goto done;
        }
        bucket->pu.tb = NULL;
    }
    else { /* multi */
        Uint cnt = unsigned_val(bucket->u.mcnt);
        Uint i = 0;
        for (;;) {
            if (bucket->pu.mvec[i].u.name_atom == name_atom) {
                break;
            }
            if (++i >= cnt) {
                goto done;
            }
        }
        if (cnt == 2) { /* multi -> single */
            size_t size;
            struct meta_name_tab_entry* entries = bucket->pu.mvec;
            *bucket = entries[1-i];     /* keep the other entry inline */
            size = sizeof(struct meta_name_tab_entry)*cnt;
            erts_db_free_nt(ERTS_ALC_T_DB_NTAB_ENT, entries, size);
            ERTS_ETS_MISC_MEM_ADD(-size);
            ASSERT(is_atom(bucket->u.name_atom));
        }
        else {
            size_t size, old_size;
            ASSERT(cnt > 2);
            bucket->u.mcnt = make_small(--cnt);
            if (i != cnt) {
                /* reposition last one before realloc destroys it */
                bucket->pu.mvec[i] = bucket->pu.mvec[cnt];
            }
            old_size = sizeof(struct meta_name_tab_entry)*(cnt+1);
            size = sizeof(struct meta_name_tab_entry)*cnt;
            bucket->pu.mvec = erts_db_realloc_nt(ERTS_ALC_T_DB_NTAB_ENT,
                                                 bucket->pu.mvec,
                                                 old_size,
                                                 size);
            ERTS_ETS_MISC_MEM_ADD(size - old_size);

        }
    }
    ret = 1; /* Ok */

done:
    if (!have_lock)
        erts_rwmtx_rwunlock(rwlock);
    return ret;
}
1012
1013 /* Do a fast fixation of a hash table.
1014 ** Must be matched by a local unfix before releasing table lock.
1015 */
/* Take a local (fast) fixation on the table; see comment above. */
static ERTS_INLINE void local_fix_table(DbTable* tb)
{
    erts_refc_inc(&tb->common.fix_count, 1);
}
/* Release a local fixation; when the last fixation is dropped the
 * (hash) table performs its deferred unfix work. */
static ERTS_INLINE void local_unfix_table(DbTable* tb)
{
    if (erts_refc_dectest(&tb->common.fix_count, 0) == 0) {
        ASSERT(IS_HASH_TABLE(tb->common.status));
        db_unfix_table_hash(&(tb->hash));
    }
}
1027
1028
1029 /*
1030 * BIFs.
1031 */
1032
/*
 * ets:safe_fixtable(Tab, true | false)
 * Fix the table for the calling process (true) or release the calling
 * process's fixation (false). Any other second argument is badarg.
 */
BIF_RETTYPE ets_safe_fixtable_2(BIF_ALIST_2)
{
    DbTable *tb;
    db_lock_kind_t kind;
#ifdef HARDDEBUG
    erts_fprintf(stderr,
		"ets:safe_fixtable(%T,%T); Process: %T, initial: %T:%T/%bpu\n",
		BIF_ARG_1, BIF_ARG_2, BIF_P->common.id,
		BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
#endif
    /* Fixing takes only a read lock; unfixing takes a record-write
     * lock (unfix_table_locked may upgrade it via the kind pointer). */
    kind = (BIF_ARG_2 == am_true) ? LCK_READ : LCK_WRITE_REC;

    DB_BIF_GET_TABLE(tb, DB_READ, kind, BIF_ets_safe_fixtable_2);

    if (BIF_ARG_2 == am_true) {
	fix_table_locked(BIF_P, tb);
    }
    else if (BIF_ARG_2 == am_false) {
	if (IS_FIXED(tb)) {
	    unfix_table_locked(BIF_P, tb, &kind);
	}
    }
    else {
	db_unlock(tb, kind);
	BIF_ERROR(BIF_P, BADARG);
    }
    db_unlock(tb, kind);
    BIF_RET(am_true);
}
1062
1063
1064 /*
1065 ** Returns the first Key in a table
1066 */
ets_first_1(BIF_ALIST_1)1067 BIF_RETTYPE ets_first_1(BIF_ALIST_1)
1068 {
1069 DbTable* tb;
1070 int cret;
1071 Eterm ret;
1072
1073 CHECK_TABLES();
1074
1075 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_first_1);
1076
1077 cret = tb->common.meth->db_first(BIF_P, tb, &ret);
1078
1079 db_unlock(tb, LCK_READ);
1080
1081 if (cret != DB_ERROR_NONE) {
1082 BIF_ERROR(BIF_P, BADARG);
1083 }
1084 BIF_RET(ret);
1085 }
1086
1087 /*
1088 ** The next BIF, given a key, return the "next" key
1089 */
ets_next_2(BIF_ALIST_2)1090 BIF_RETTYPE ets_next_2(BIF_ALIST_2)
1091 {
1092 DbTable* tb;
1093 int cret;
1094 Eterm ret;
1095
1096 CHECK_TABLES();
1097
1098 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_next_2);
1099
1100 cret = tb->common.meth->db_next(BIF_P, tb, BIF_ARG_2, &ret);
1101
1102 db_unlock(tb, LCK_READ);
1103
1104 if (cret != DB_ERROR_NONE) {
1105 BIF_ERROR(BIF_P, BADARG);
1106 }
1107 BIF_RET(ret);
1108 }
1109
1110 /*
1111 ** Returns the last Key in a table
1112 */
ets_last_1(BIF_ALIST_1)1113 BIF_RETTYPE ets_last_1(BIF_ALIST_1)
1114 {
1115 DbTable* tb;
1116 int cret;
1117 Eterm ret;
1118
1119 CHECK_TABLES();
1120
1121 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_last_1);
1122
1123 cret = tb->common.meth->db_last(BIF_P, tb, &ret);
1124
1125 db_unlock(tb, LCK_READ);
1126
1127 if (cret != DB_ERROR_NONE) {
1128 BIF_ERROR(BIF_P, BADARG);
1129 }
1130 BIF_RET(ret);
1131 }
1132
1133 /*
1134 ** The prev BIF, given a key, return the "previous" key
1135 */
ets_prev_2(BIF_ALIST_2)1136 BIF_RETTYPE ets_prev_2(BIF_ALIST_2)
1137 {
1138 DbTable* tb;
1139 int cret;
1140 Eterm ret;
1141
1142 CHECK_TABLES();
1143
1144 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_prev_2);
1145
1146 cret = tb->common.meth->db_prev(BIF_P,tb,BIF_ARG_2,&ret);
1147
1148 db_unlock(tb, LCK_READ);
1149
1150 if (cret != DB_ERROR_NONE) {
1151 BIF_ERROR(BIF_P, BADARG);
1152 }
1153 BIF_RET(ret);
1154 }
1155
1156 /*
1157 ** take(Tab, Key)
1158 */
ets_take_2(BIF_ALIST_2)1159 BIF_RETTYPE ets_take_2(BIF_ALIST_2)
1160 {
1161 DbTable* tb;
1162 int cret;
1163 Eterm ret;
1164 CHECK_TABLES();
1165
1166 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2);
1167
1168 cret = tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret);
1169
1170 ASSERT(cret == DB_ERROR_NONE); (void)cret;
1171 db_unlock(tb, LCK_WRITE_REC);
1172 BIF_RET(ret);
1173 }
1174
1175 /*
1176 ** update_element(Tab, Key, {Pos, Value})
1177 ** update_element(Tab, Key, [{Pos, Value}])
1178 */
ets_update_element_3(BIF_ALIST_3)1179 BIF_RETTYPE ets_update_element_3(BIF_ALIST_3)
1180 {
1181 DbTable* tb;
1182 int cret = DB_ERROR_BADITEM;
1183 Eterm list;
1184 Eterm iter;
1185 DeclareTmpHeap(cell,2,BIF_P);
1186 DbUpdateHandle handle;
1187
1188 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3);
1189
1190 UseTmpHeap(2,BIF_P);
1191 if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
1192 BIF_P->fvalue = EXI_TAB_TYPE;
1193 cret = DB_ERROR_BADPARAM;
1194 goto bail_out;
1195 }
1196 if (is_tuple(BIF_ARG_3)) {
1197 list = CONS(cell, BIF_ARG_3, NIL);
1198 }
1199 else {
1200 list = BIF_ARG_3;
1201 }
1202
1203 if (!tb->common.meth->db_lookup_dbterm(BIF_P, tb, BIF_ARG_2, THE_NON_VALUE, &handle)) {
1204 cret = DB_ERROR_BADKEY;
1205 goto bail_out;
1206 }
1207
1208 /* First verify that list is ok to avoid nasty rollback scenarios
1209 */
1210 for (iter=list ; is_not_nil(iter); iter = CDR(list_val(iter))) {
1211 Eterm pv;
1212 Eterm* pvp;
1213 Sint position;
1214
1215 if (is_not_list(iter)) {
1216 goto finalize;
1217 }
1218 pv = CAR(list_val(iter)); /* {Pos,Value} */
1219 if (is_not_tuple(pv)) {
1220 goto finalize;
1221 }
1222 pvp = tuple_val(pv);
1223 if (arityval(*pvp) != 2 || !is_small(pvp[1])) {
1224 goto finalize;
1225 }
1226 position = signed_val(pvp[1]);
1227 if (position == tb->common.keypos) {
1228 BIF_P->fvalue = EXI_KEY_POS;
1229 cret = DB_ERROR_UNSPEC;
1230 goto finalize;
1231 }
1232 if (position < 1 || position == tb->common.keypos ||
1233 position > arityval(handle.dbterm->tpl[0])) {
1234 goto finalize;
1235 }
1236 }
1237 /* The point of no return, no failures from here on.
1238 */
1239 cret = DB_ERROR_NONE;
1240
1241 for (iter=list ; is_not_nil(iter); iter = CDR(list_val(iter))) {
1242 Eterm* pvp = tuple_val(CAR(list_val(iter))); /* {Pos,Value} */
1243 db_do_update_element(&handle, signed_val(pvp[1]), pvp[2]);
1244 }
1245
1246 finalize:
1247 tb->common.meth->db_finalize_dbterm(cret, &handle);
1248
1249 bail_out:
1250 UnUseTmpHeap(2,BIF_P);
1251 db_unlock(tb, LCK_WRITE_REC);
1252
1253 switch (cret) {
1254 case DB_ERROR_NONE:
1255 BIF_RET(am_true);
1256 case DB_ERROR_BADKEY:
1257 BIF_RET(am_false);
1258 case DB_ERROR_SYSRES:
1259 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
1260 case DB_ERROR_UNSPEC:
1261 BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
1262 default:
1263 BIF_ERROR(BIF_P, BADARG);
1264 break;
1265 }
1266 }
1267
/* Shared implementation of ets:update_counter/3 and /4.
 * arg2 = Key, arg3 = Incr | {Upop} | [{Upop}],
 * arg4 = Default object for /4, or THE_NON_VALUE for /3.
 * Returns the new counter value, or a list of them when arg3 is a list.
 * Caller holds the table locked with LCK_WRITE_REC; unlocked here. */
static BIF_RETTYPE
do_update_counter(Process *p, DbTable* tb,
                  Eterm arg2, Eterm arg3, Eterm arg4)
{
    int cret = DB_ERROR_BADITEM;
    Eterm upop_list;
    int list_size;
    Eterm ret;                     /* int or [int] */
    Eterm* ret_list_currp = NULL;
    Eterm* ret_list_prevp = NULL;  /* non-NULL <=> result is a list */
    Eterm iter;
    DeclareTmpHeap(cell, 5, p);    /* one cons (2 words) + one 2-tuple (3 words) */
    Eterm *tuple = cell+2;
    DbUpdateHandle handle;
    Uint halloc_size = 0;          /* overestimated heap usage */
    Eterm* htop;                   /* actual heap usage */
    Eterm* hstart;
    Eterm* hend;

    UseTmpHeap(5, p);
    /* Counters can only be updated in tables with set semantics. */
    if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
        p->fvalue = EXI_TAB_TYPE;
        cret = DB_ERROR_BADPARAM;
        goto bail_out;
    }
    /* Normalize arg3 into a list of update operations on the tmp heap. */
    if (is_integer(arg3)) { /* Incr */
        upop_list = CONS(cell,
                         TUPLE2(tuple, make_small(tb->common.keypos+1), arg3),
                         NIL);
    }
    else if (is_tuple(arg3)) { /* {Upop} */
        upop_list = CONS(cell, arg3, NIL);
    }
    else { /* [{Upop}] (probably) */
        upop_list = arg3;
        ret_list_prevp = &ret; /* list in => list out */
    }

    if (!tb->common.meth->db_lookup_dbterm(p, tb, arg2, arg4, &handle)) {
        p->fvalue = EXI_BAD_KEY;
        cret = DB_ERROR_BADPARAM;
        goto bail_out; /* key not found */
    }

    /* First verify that list is ok to avoid nasty rollback scenarios
    */
    list_size = 0;
    for (iter=upop_list ; is_not_nil(iter); iter = CDR(list_val(iter)),
             list_size += 2) {
        Eterm upop;
        Eterm* tpl;
        Sint position;
        Eterm incr, warp;
        Wterm oldcnt;

        if (is_not_list(iter)) {
            goto finalize;
        }
        upop = CAR(list_val(iter));
        if (is_not_tuple(upop)) {
            goto finalize;
        }
        tpl = tuple_val(upop);
        switch (arityval(*tpl)) {
        case 4: /* threshold specified: {Pos,Incr,Threshold,WarpTo} */
            if (is_not_integer(tpl[3])) {
                goto finalize;
            }
            warp = tpl[4];
            if (is_big(warp)) {
                halloc_size += BIG_NEED_SIZE(big_arity(warp));
            }
            else if (is_not_small(warp)) {
                goto finalize;
            }
            /* Fall through */
        case 2: /* {Pos,Incr} */
            if (!is_small(tpl[1])) {
                goto finalize;
            }
            incr = tpl[2];
            if (is_big(incr)) {
                halloc_size += BIG_NEED_SIZE(big_arity(incr));
            }
            else if (is_not_small(incr)) {
                goto finalize;
            }
            position = signed_val(tpl[1]);
            if (position == tb->common.keypos) {
                /* The key may not be used as a counter */
                p->fvalue = EXI_KEY_POS;
                cret = DB_ERROR_BADPARAM;
                goto finalize;
            }
            else if (position < 1 || position > arityval(handle.dbterm->tpl[0])) {
                p->fvalue = EXI_POSITION;
                cret = DB_ERROR_BADPARAM;
                goto finalize;
            }
            oldcnt = db_do_read_element(&handle, position);
            if (is_big(oldcnt)) {
                halloc_size += BIG_NEED_SIZE(big_arity(oldcnt));
            }
            else if (is_not_small(oldcnt)) {
                goto finalize;
            }
            break;
        default:
            goto finalize;
        }
        halloc_size += 2; /* worst growth case: small(0)+small(0)=big(2) */
    }

    /* The point of no return, no failures from here on.
    */
    cret = DB_ERROR_NONE;

    if (ret_list_prevp) { /* Prepare to return a list */
        ret = NIL;
        halloc_size += list_size; /* one cons cell per returned counter */
        hstart = HAlloc(p, halloc_size);
        ret_list_currp = hstart;
        htop = hstart + list_size;
        hend = hstart + halloc_size;
    }
    else {
#ifdef DEBUG
        ret = THE_NON_VALUE;
#endif
        hstart = htop = HAlloc(p, halloc_size);
    }
    /* NOTE(review): this assignment also happens inside the if-branch
     * above; the one there is redundant. */
    hend = hstart + halloc_size;

    for (iter=upop_list ; is_not_nil(iter); iter = CDR(list_val(iter))) {

        Eterm* tpl = tuple_val(CAR(list_val(iter)));
        Sint position = signed_val(tpl[1]);
        Eterm incr = tpl[2];
        Wterm oldcnt = db_do_read_element(&handle,position);
        Eterm newcnt = db_add_counter(&htop, oldcnt, incr);

        if (newcnt == NIL) {
            cret = DB_ERROR_SYSRES; /* Can only happen if BIG_ARITY_MAX */
            ret = NIL;              /* is reached, ie should not happen */
            htop = hstart;
            break;
        }
        ASSERT(is_integer(newcnt));

        if (arityval(*tpl) == 4) { /* Maybe warp it */
            Eterm threshold = tpl[3];
            if ((CMP(incr,make_small(0)) < 0) ? /* negative increment? */
                (CMP(newcnt,threshold) < 0) :   /* if negative, check if below */
                (CMP(newcnt,threshold) > 0)) {  /* else check if above threshold */

                newcnt = tpl[4];
            }
        }

        db_do_update_element(&handle,position,newcnt);

        if (ret_list_prevp) {
            *ret_list_prevp = CONS(ret_list_currp,newcnt,NIL);
            ret_list_prevp = &CDR(ret_list_currp);
            ret_list_currp += 2;
        }
        else {
            ret = newcnt; /* single Upop: done after the first one */
            break;
        }
    }

    ASSERT(is_integer(ret) || is_nil(ret) ||
           (is_list(ret) && (list_val(ret)+list_size)==ret_list_currp));
    ASSERT(htop <= hend);

    /* Give back the overestimated, unused part of the allocation */
    HRelease(p, hend, htop);

finalize:
    tb->common.meth->db_finalize_dbterm(cret, &handle);

bail_out:
    UnUseTmpHeap(5, p);
    db_unlock(tb, LCK_WRITE_REC);

    switch (cret) {
    case DB_ERROR_NONE:
        BIF_RET(ret);
    case DB_ERROR_SYSRES:
        BIF_ERROR(p, SYSTEM_LIMIT);
    case DB_ERROR_BADPARAM:
        /* fvalue was set above; include the extended error info */
        BIF_ERROR(p, BADARG | EXF_HAS_EXT_INFO);
    default:
        BIF_ERROR(p, BADARG);
        break;
    }
}
1464
1465 /*
1466 ** update_counter(Tab, Key, Incr)
1467 ** update_counter(Tab, Key, Upop)
1468 ** update_counter(Tab, Key, [{Upop}])
1469 ** Upop = {Pos,Incr} | {Pos,Incr,Threshold,WarpTo}
1470 ** Returns new value(s) (integer or [integer])
1471 */
BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3)
{
    DbTable* tb;

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_3);

    /* THE_NON_VALUE: no default object to insert when the key is missing */
    return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE);
}
1480
1481 /*
1482 ** update_counter(Tab, Key, Incr, Default)
1483 ** update_counter(Tab, Key, Upop, Default)
1484 ** update_counter(Tab, Key, [{Upop}], Default)
1485 ** Upop = {Pos,Incr} | {Pos,Incr,Threshold,WarpTo}
1486 ** Returns new value(s) (integer or [integer])
1487 */
ets_update_counter_4(BIF_ALIST_4)1488 BIF_RETTYPE ets_update_counter_4(BIF_ALIST_4)
1489 {
1490 DbTable* tb;
1491
1492 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_4);
1493
1494 if (is_not_tuple(BIF_ARG_4)) {
1495 db_unlock(tb, LCK_WRITE_REC);
1496 BIF_ERROR(BIF_P, BADARG);
1497 }
1498
1499 return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
1500 }
1501
/* State of a (possibly yielded) ets:insert/2 / ets:insert_new/2 list
 * operation; see ets_insert_2_list_driver(). */
typedef enum {
    ETS_INSERT_2_LIST_PROCESS_LOCAL,       /* runs entirely in the calling process */
    ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK,  /* trapped waiting for the table lock */
    ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY, /* table lookup failed for good;
                                                   * return destroy_return_value */
    ETS_INSERT_2_LIST_GLOBAL               /* published in the table struct so other
                                            * processes/threads can help */
} ets_insert_2_list_status;

typedef struct {
    ets_insert_2_list_status status;
    BIF_RETTYPE destroy_return_value; /* valid when status == ..._DESTROY */
    DbTable* tb;
    void* continuation_state;         /* YCF continuation; NULL when finished */
    Binary* continuation_res_bin;     /* shared insert_new result (int payload) */
} ets_insert_2_list_info;
1516
1517
1518 static ERTS_INLINE BIF_RETTYPE
ets_cret_to_return_value(Process * p,int cret)1519 ets_cret_to_return_value(Process* p, int cret)
1520 {
1521 ASSERT(p || cret == DB_ERROR_NONE_FALSE || cret == DB_ERROR_NONE);
1522 switch (cret) {
1523 case DB_ERROR_NONE_FALSE:
1524 BIF_RET(am_false);
1525 case DB_ERROR_NONE:
1526 BIF_RET(am_true);
1527 case DB_ERROR_SYSRES:
1528 BIF_ERROR(p, SYSTEM_LIMIT);
1529 default:
1530 BIF_ERROR(p, BADARG);
1531 }
1532 }
1533
1534 /*
1535 * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1536 * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1537 *
1538 * Start of code section that Yielding C Fun (YCF) transforms
1539 *
 * The functions within #ifdef YCF_FUNCTIONS below are not called directly.
1541 * YCF generates yieldable versions of these functions before "erl_db.c" is
1542 * compiled. These generated functions are placed in the file
1543 * "erl_db_insert_list.ycf.h" which is included below. The generation of
1544 * "erl_db_insert_list.ycf.h" is defined in
1545 * "$ERL_TOP/erts/emulator/Makefile.in". See
1546 * "$ERL_TOP/erts/emulator/internal_doc/AutomaticYieldingOfCCode.md"
1547 * for more information about YCF.
1548 *
1549 * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1550 * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1551 */
1552
1553 /*
1554 * The LOCAL_VARIABLE macro is a trick to create a local variable that does not
1555 * get renamed by YCF.
1556 * Such variables will not retain their values over yields. Beware!
1557 *
1558 * I use this as a workaround for a limitation/bug in YCF. It does not do
1559 * proper variable name substitution in expressions passed as argument to
1560 * YCF_CONSUME_REDS(Expr).
1561 */
1562 #define LOCAL_VARIABLE(TYPE, NAME) TYPE NAME
1563
1564 #ifdef YCF_FUNCTIONS
/* Validate that 'list' is a proper list whose elements are all tuples
 * of at least 'keypos' elements. Returns the list length, or -1 on an
 * improper list or unsuitable element.
 * NOTE: transformed by YCF; may yield during the traversal. */
static long ets_insert_2_list_check(int keypos, Eterm list)
{
    Eterm lst = THE_NON_VALUE;
    long i = 0;
    for (lst = list; is_list(lst); lst = CDR(list_val(lst))) {
        i++;
        if (is_not_tuple(CAR(list_val(lst))) ||
            (arityval(*tuple_val(CAR(list_val(lst)))) < keypos)) {
            return -1;
        }
    }
    if (lst != NIL) {
        return -1; /* improper list */
    }
    return i;
}
1581
/* Return nonzero iff the key of any tuple in 'list' (terms on the
 * process heap) is already present in 'tb'.
 * NOTE: transformed by YCF; may yield inside the loop. */
static int ets_insert_new_2_list_has_member(DbTable* tb, Eterm list)
{
    Eterm lst;
    Eterm lookup_ret;
    DbTableMethod* meth = tb->common.meth;
    for (lst = list; is_list(lst); lst = CDR(list_val(lst))) {
        meth->db_member(tb,
                        TERM_GETKEY(tb,CAR(list_val(lst))),
                        &lookup_ret);
        if (lookup_ret != am_false) {
            return 1;
        }
    }
    return 0;
}
1597
/* Insert every tuple in 'list' (terms still on the process heap) into
 * 'tb'. Stops at the first db_put failure and returns its error code;
 * DB_ERROR_NONE otherwise.
 * NOTE: transformed by YCF. LOCAL_VARIABLE keeps 'consumed_reds' out
 * of YCF's renaming (see the macro's comment); YCF_CONSUME_REDS may
 * cause a yield after each insert. */
static int ets_insert_2_list_from_p_heap(DbTable* tb, Eterm list)
{
    Eterm lst;
    DbTableMethod* meth = tb->common.meth;
    int cret = DB_ERROR_NONE;
    for (lst = list; is_list(lst); lst = CDR(list_val(lst))) {
        LOCAL_VARIABLE(SWord, consumed_reds);
        consumed_reds = 1;
        cret = meth->db_put(tb, CAR(list_val(lst)), 0, &consumed_reds);
        if (cret != DB_ERROR_NONE)
            return cret;
        YCF_CONSUME_REDS(consumed_reds);
    }
    return DB_ERROR_NONE;
}
1613 #endif /* YCF_FUNCTIONS */
1614
1615 /* This function is called both as is, and as YCF transformed. */
/* Free a list of off-heap dbterms (as built by
 * ets_insert_2_list_copy_term_list).
 * This function is called both as is, and as YCF transformed. */
static void ets_insert_2_list_destroy_copied_dbterms(DbTableMethod* meth,
                                                     int compressed,
                                                     void* db_term_list)
{
    void* lst = db_term_list;
    void* term = NULL;
    while (lst != NULL) {
        term = meth->db_dbterm_list_remove_first(&lst);
        meth->db_free_dbterm(compressed, term);
    }
}
1627
1628 #ifdef YCF_FUNCTIONS
/* Copy each tuple in 'list' from the process heap into an off-heap
 * dbterm list, so the insert operation can survive yields and be
 * helped by other processes. Terms are prepended, so the returned
 * list is in reverse order of 'list'.
 * NOTE: transformed by YCF. The YCF_SPECIAL_CODE block after the
 * return statement is not dead code: YCF arranges for it to run if
 * the calling process is killed while this function is suspended,
 * freeing the terms copied so far. */
static void* ets_insert_2_list_copy_term_list(DbTableMethod* meth,
                                              int compress,
                                              int keypos,
                                              Eterm list)
{
    void* db_term_list = NULL;
    void *term;
    Eterm lst;
    for (lst = list; is_list(lst); lst = CDR(list_val(lst))) {
        term = meth->db_eterm_to_dbterm(compress,
                                        keypos,
                                        CAR(list_val(lst)));
        if (db_term_list != NULL) {
            db_term_list =
                meth->db_dbterm_list_prepend(db_term_list,
                                             term);
        } else {
            db_term_list = term;
        }
    }

    return db_term_list;

    /* The following code will be executed if the calling process is
       killed in the middle of the for loop above*/
    YCF_SPECIAL_CODE_START(ON_DESTROY_STATE); {
        ets_insert_2_list_destroy_copied_dbterms(meth,
                                                 compress,
                                                 db_term_list);
    } YCF_SPECIAL_CODE_END();
}
1660
/* Return nonzero iff the key of any dbterm in 'db_term_list' already
 * exists in 'tb'. Walks the list via db_dbterm_list_remove_first on a
 * local pointer; the terms themselves are not freed here.
 * NOTE: transformed by YCF; may yield inside the loop. */
static int ets_insert_new_2_dbterm_list_has_member(DbTable* tb, void* db_term_list)
{
    Eterm lookup_ret;
    DbTableMethod* meth = tb->common.meth;
    void* lst = db_term_list;
    void* term = NULL;
    Eterm key;
    while (lst != NULL) {
        term = meth->db_dbterm_list_remove_first(&lst);
        key = meth->db_get_dbterm_key(tb, term);
        meth->db_member(tb, key, &lookup_ret);
        if (lookup_ret != am_false) {
            return 1;
        }
    }
    return 0;
}
1678
/* Insert every dbterm in 'list' into 'tb' (the terms are consumed by
 * db_put_dbterm). 'list' must be non-empty (do/while).
 * NOTE: transformed by YCF; YCF_CONSUME_REDS may cause a yield after
 * each insert; LOCAL_VARIABLE is the YCF renaming workaround. */
static void ets_insert_2_list_insert_db_term_list(DbTable* tb,
                                                  void* list)
{
    void* lst = list;
    void* term = NULL;
    DbTableMethod* meth = tb->common.meth;
    do {
        LOCAL_VARIABLE(SWord, consumed_reds);
        consumed_reds = 1;
        term = meth->db_dbterm_list_remove_first(&lst);
        meth->db_put_dbterm(tb, term, 0, &consumed_reds);
        YCF_CONSUME_REDS(consumed_reds);
    } while (lst != NULL);
    return;
}
1694
/* Write-lock table 'table_id', yielding and retrying until the lock
 * is acquired or the lookup fails for good. On success the context
 * status is set to 'on_success_status'; on permanent failure it is
 * set to ..._FAILED_TO_GET_LOCK_DESTROY and the error result is saved
 * in ctx->destroy_return_value for the driver to return.
 * NOTE: transformed by YCF; YCF_YIELD() suspends the operation. */
static void ets_insert_2_list_lock_tbl(Eterm table_id,
                                       Process* p,
                                       Uint bif_ix,
                                       ets_insert_2_list_status on_success_status)
{
    BIF_RETTYPE fail_ret;
    DbTable* tb;
    do {
        fail_ret = db_get_table_or_fail_return(&tb,
                                               table_id,
                                               DB_WRITE,
                                               LCK_WRITE,
                                               bif_ix,
                                               p);
        if (tb == NULL) {
            ets_insert_2_list_info *ctx = YCF_GET_EXTRA_CONTEXT();
            if (p->freason == TRAP) {
                /* Lock not available yet: yield and retry */
                ctx->status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK;
            } else {
                /* Permanent failure (e.g. table gone): give up */
                ctx->status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY;
                ctx->destroy_return_value = fail_ret;
            }
#ifdef DEBUG
            /*
             * Setting ctx to NULL to avoid that YCF crashes with a
             * pointer to stack error when running a debug
             * build. YCF_GET_EXTRA_CONTEXT() may change between
             * yields as we use stack allocated data for the context
             * before the first yield so it is important that the
             * context is obtained again with YCF_GET_EXTRA_CONTEXT()
             * if a yield might have happened.
             */
            ctx = NULL;
#endif
            YCF_YIELD();
        } else {
            ets_insert_2_list_info *ctx = YCF_GET_EXTRA_CONTEXT();
            ctx->status = on_success_status;
            ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
            ASSERT(!(tb->common.status & DB_DELETE));
        }
    } while (tb == NULL);
}
1738 #endif /* YCF_FUNCTIONS */
1739
can_insert_without_yield(Uint32 tb_type,long list_len,long reds_left)1740 static ERTS_INLINE int can_insert_without_yield(Uint32 tb_type,
1741 long list_len,
1742 long reds_left)
1743 {
1744 if (tb_type & DB_BAG) {
1745 /* Bag inserts can be really bad and we don't know how much searching
1746 * for duplicates we will do */
1747 return 0;
1748 }
1749 else {
1750 return list_len <= reds_left;
1751 }
1752 }
1753
1754 #ifdef YCF_FUNCTIONS
/* Core of ets:insert/2 and ets:insert_new/2 for multi-element lists.
 * Validates the list, then either inserts directly from the process
 * heap (when cheap enough to finish without yielding) or copies the
 * terms off-heap so the operation can yield and be helped by other
 * schedulers.
 * NOTE: transformed by YCF; nearly every callee may yield, and 'tb'
 * must not be dereferenced without the table lock (the table can be
 * deleted while this function is suspended). */
static BIF_RETTYPE ets_insert_2_list(Process* p,
                                     Eterm table_id,
                                     DbTable *tb,
                                     Eterm list,
                                     int is_insert_new)
{
    int cret = DB_ERROR_NONE;
    void* db_term_list = NULL; /* OBS: memory managements depends on that
                                  db_term_list is initialized to NULL */
    DbTableMethod* meth = tb->common.meth;
    int compressed = tb->common.compress;
    int keypos = tb->common.keypos;
    Uint32 tb_type = tb->common.type;
    Uint bif_ix = (is_insert_new ? BIF_ets_insert_new_2 : BIF_ets_insert_2);
    long list_len;
    /* tb should not be accessed after this point unless the table
       lock is held as the table can get deleted while the function is
       yielding */
    list_len = ets_insert_2_list_check(keypos, list);
    if (list_len < 0) {
        Eterm ret;
        /*
         * Check whether we have sufficient access rights for the
         * table. This is necessary to ensure that the correct reason
         * for the failure will be available in stack backtrace.
         */
        ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_PROCESS_LOCAL);
        db_unlock(tb, LCK_WRITE);
        ERTS_BIF_PREP_ERROR_TRAPPED2(ret, p, BADARG, BIF_TRAP_EXPORT(bif_ix), table_id, list);
        return ret;
    }
    if (can_insert_without_yield(tb_type, list_len, YCF_NR_OF_REDS_LEFT())) {
        long reds_boost;
        /* There is enough reductions left to do the inserts directly
           from the heap without yielding */
        ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_PROCESS_LOCAL);
        /* Ensure that we will not yield while inserting from heap */
        reds_boost = YCF_MAX_NR_OF_REDS - YCF_NR_OF_REDS_LEFT();
        YCF_SET_NR_OF_REDS_LEFT(YCF_MAX_NR_OF_REDS);
        if (is_insert_new) {
            if (ets_insert_new_2_list_has_member(tb, list)) {
                cret = DB_ERROR_NONE_FALSE;
            } else {
                cret = ets_insert_2_list_from_p_heap(tb, list);
            }
        } else {
            cret = ets_insert_2_list_from_p_heap(tb, list);
        }
        db_unlock(tb, LCK_WRITE);
        /* Pay back the reduction boost taken above */
        YCF_SET_NR_OF_REDS_LEFT(YCF_NR_OF_REDS_LEFT() - reds_boost);
        return ets_cret_to_return_value(p, cret);
    }
    /* Copy term list from heap so that other processes can help */
    db_term_list =
        ets_insert_2_list_copy_term_list(meth, compressed, keypos, list);
    /* Lock table */
    ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_GLOBAL);
    /* The operation must complete after this point */
    if (is_insert_new) {
        if (ets_insert_new_2_dbterm_list_has_member(tb, db_term_list)) {
            ets_insert_2_list_destroy_copied_dbterms(meth,
                                                     compressed,
                                                     db_term_list);
            cret = DB_ERROR_NONE_FALSE;
        } else {
            ets_insert_2_list_insert_db_term_list(tb, db_term_list);
        }
    } else {
        ets_insert_2_list_insert_db_term_list(tb, db_term_list);
    }
    if (tb->common.continuation != NULL) {
        /* Uninstall the continuation from the table struct */
        tb->common.continuation = NULL;
        if (is_insert_new) {
            /* Publish the result for helpers and drop our reference */
            int* result_ptr =
                ERTS_MAGIC_BIN_DATA(tb->common.continuation_res_bin);
            *result_ptr = cret;
            erts_bin_release(tb->common.continuation_res_bin);
        }
        /* Restore the access-rights bits masked off by the driver and
         * clear the busy flag */
        tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
        tb->common.status &= ~DB_BUSY;
        erts_atomic_set_relb(&tb->common.continuation_state, (Sint)NULL);
    }

    return ets_cret_to_return_value(NULL, cret);

    /* The following code will be executed if the initiating process
       is killed before an ets_insert_2_list_lock_tbl call has
       succeeded */
    YCF_SPECIAL_CODE_START(ON_DESTROY_STATE); {
        ets_insert_2_list_destroy_copied_dbterms(meth,
                                                 compressed,
                                                 db_term_list);
    } YCF_SPECIAL_CODE_END();
}
1850 #endif /* YCF_FUNCTIONS */
1851
1852 /*
1853 * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1854 * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1855 *
1856 * End of code section that Yielding C Fun (YCF) transforms
1857 *
1858 * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1859 * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1860 */
1861 #if defined(DEBUG) && defined(ARCH_64)
1862 #include "erl_db_insert_list.debug.ycf.h"
1863 #else
1864 #include "erl_db_insert_list.ycf.h"
1865 #endif
1866
/* Allocator callback handed to the YCF-generated code for saving
 * continuation state between traps. */
static void* ets_insert_2_yield_alloc(size_t size, void* ctx)
{
    (void)ctx;
    return erts_alloc(ERTS_ALC_T_ETS_I_LST_TRAP, size);
}
1872
/* Deallocator callback matching ets_insert_2_yield_alloc. */
static void ets_insert_2_yield_free(void* data, void* ctx)
{
    (void)ctx;
    erts_free(ERTS_ALC_T_ETS_I_LST_TRAP, data);
}
1878
/* Destructor for the magic binary that carries a trapped
 * ets:insert/2 list operation. Destroys the YCF continuation unless
 * the operation either already finished or went GLOBAL (in which case
 * the table/helpers own the completion). */
static int ets_insert_2_list_yield_dtor(Binary* bin)
{
    ets_insert_2_list_info* ctx = ERTS_MAGIC_BIN_DATA(bin);
    if (ctx->continuation_state != NULL &&
        ctx->status != ETS_INSERT_2_LIST_GLOBAL) {
        ets_insert_2_list_ycf_gen_destroy(ctx->continuation_state);
    }
    return 1;
}
1890
/* Continuation entry point installed in tb->common.continuation so
 * that other processes/threads can help drive a yielded insert; wraps
 * the YCF-generated continue function with the stack-start
 * bookkeeping required by 64-bit debug builds. */
static void ets_insert_2_list_continuation(long *reds_ptr,
                                           void** state,
                                           void* extra_context)
{
#if defined(DEBUG) && defined(ARCH_64)
    ycf_debug_set_stack_start(reds_ptr);
#endif
    ets_insert_2_list_ycf_gen_continue(reds_ptr, state, extra_context);
#if defined(DEBUG) && defined(ARCH_64)
    ycf_debug_reset_stack_start();
#endif
}
1903
/* Destructor for the magic binary holding the shared ets:insert_new/2
 * result; the payload is a plain int, so nothing extra to free. */
static int db_insert_new_2_res_bin_dtor(Binary *context_bin)
{
    (void)context_bin;
    return 1;
}
1909
1910 #define ITERATIONS_PER_RED 8
1911
/* Driver for ets:insert/2 and ets:insert_new/2 with a list argument.
 * Starts, resumes, and completes the YCF-generated yieldable
 * implementation of ets_insert_2_list. A trapped call is resumed by
 * passing a magic ref (in place of the list) that carries the saved
 * continuation state. */
static BIF_RETTYPE ets_insert_2_list_driver(Process* p,
                                            Eterm tid,
                                            Eterm list,
                                            int is_insert_new) {
    const long reds = ITERATIONS_PER_RED * ERTS_BIF_REDS_LEFT(p);
    long nr_of_reductions = DBG_RANDOM_REDS(reds, (Uint)&p);
    const long init_reds = nr_of_reductions;
    ets_insert_2_list_info* ctx = NULL;
    ets_insert_2_list_info ictx;
    BIF_RETTYPE ret = THE_NON_VALUE;
    Eterm state_mref = list;
    Uint bix = (is_insert_new ? BIF_ets_insert_new_2 : BIF_ets_insert_2);
    if (is_internal_magic_ref(state_mref)) {
        Binary* state_bin = erts_magic_ref2bin(state_mref);
        if (ERTS_MAGIC_BIN_DESTRUCTOR(state_bin) != ets_insert_2_list_yield_dtor) {
            BIF_ERROR(p, BADARG);
        }
        /* Continue a trapped call */
        erts_set_gc_state(p, 1);
        ctx = ERTS_MAGIC_BIN_DATA(state_bin);
        if (ctx->status == ETS_INSERT_2_LIST_GLOBAL) {
            /* An operation that can be helped by other operations is
               handled here */
            Uint freason;
            int cret = DB_ERROR_NONE;
            DbTable* tb;
            /* First check if another process has completed the
               operation without acquiring the lock */
            tb = db_get_table(p, tid, DB_READ_TBL_STRUCT, NOLCK_ACCESS, &freason);
            ASSERT(tb || freason != TRAP);
            if (tb != NULL &&
                (void*)erts_atomic_read_acqb(&tb->common.continuation_state) ==
                ctx->continuation_state) {
                /* The lock has to be taken to complete the operation */
                if (NULL == (tb = db_get_table(p, tid, DB_WRITE, LCK_WRITE, &freason))) {
                    if (freason == TRAP){
                        erts_set_gc_state(p, 0);
                        return db_bif_fail(p, freason, bix, NULL);
                    }
                }
                /* Must be done since the db_get_table call did not trap */
                if (tb != NULL) {
                    db_unlock(tb, LCK_WRITE);
                }
            }
            if (is_insert_new) {
                /* Fetch the result published by whoever completed it */
                int* res = ERTS_MAGIC_BIN_DATA(ctx->continuation_res_bin);
                cret = *res;
            }
            return ets_cret_to_return_value(NULL, cret);
        } else {
#if defined(DEBUG) && defined(ARCH_64)
            ycf_debug_set_stack_start(&nr_of_reductions);
#endif
            ret = ets_insert_2_list_ycf_gen_continue(&nr_of_reductions,
                                                     &ctx->continuation_state,
                                                     ctx);
#if defined(DEBUG) && defined(ARCH_64)
            ycf_debug_reset_stack_start();
#endif
        }
    } else {
        /* Start call */
        ictx.continuation_state = NULL;
        ictx.status = ETS_INSERT_2_LIST_PROCESS_LOCAL;
        ictx.tb = NULL;
        ctx = &ictx;
        DB_GET_TABLE(ctx->tb, tid, DB_READ_TBL_STRUCT, NOLCK_ACCESS, bix, NULL, p);
#if defined(DEBUG) && defined(ARCH_64)
        ycf_debug_set_stack_start(&nr_of_reductions);
#endif
        ret = ets_insert_2_list_ycf_gen_yielding(&nr_of_reductions,
                                                 &ctx->continuation_state,
                                                 ctx,
                                                 ets_insert_2_yield_alloc,
                                                 ets_insert_2_yield_free,
                                                 NULL,
                                                 0,
                                                 NULL,
                                                 p,
                                                 tid,
                                                 ctx->tb,
                                                 list,
                                                 is_insert_new);
#if defined(DEBUG) && defined(ARCH_64)
        ycf_debug_reset_stack_start();
#endif
        if (ctx->continuation_state != NULL) {
            /* The operation yielded: move the stack-allocated context
               into a magic binary that survives the trap */
            Binary* state_bin = erts_create_magic_binary(sizeof(ets_insert_2_list_info),
                                                         ets_insert_2_list_yield_dtor);
            Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
            ctx = ERTS_MAGIC_BIN_DATA(state_bin);
            *ctx = ictx;
        }
    }
    BUMP_REDS(p, (init_reds - nr_of_reductions) / ITERATIONS_PER_RED);
    if (ctx->status == ETS_INSERT_2_LIST_GLOBAL &&
        ctx->continuation_state != NULL &&
        ctx->tb->common.continuation == NULL) {
        /* Install the continuation in the table structure so other
           threads can help */
        if (is_insert_new) {
            /* Result binary shared between this process and the table;
               refc raised accordingly */
            Binary* bin =
                erts_create_magic_binary(sizeof(int),
                                         db_insert_new_2_res_bin_dtor);
            Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            erts_mk_magic_ref(&hp, &MSO(p), bin);
            erts_refc_inctest(&bin->intern.refc, 2);
            ctx->tb->common.continuation_res_bin = bin;
            ctx->continuation_res_bin = bin;
        }
        ctx->tb->common.continuation = ets_insert_2_list_continuation;
        /* Mask off access rights and mark busy while helpable; restored
           by ets_insert_2_list when the operation completes */
        ctx->tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
        ctx->tb->common.status |= DB_BUSY;
        erts_atomic_set_relb(&ctx->tb->common.continuation_state,
                             (Sint)ctx->continuation_state);
    }
    if (ctx->status == ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY) {
        return ctx->destroy_return_value;
    }
    if (ctx->status == ETS_INSERT_2_LIST_GLOBAL) {
        db_unlock(ctx->tb, LCK_WRITE);
    }
    if (ctx->continuation_state != NULL) {
        /* GC is disabled across the trap; re-enabled at the top of
           this function when the call is resumed */
        erts_set_gc_state(p, 0);
        BIF_TRAP2(BIF_TRAP_EXPORT(bix), p, tid, state_mref);
    }
    return ret;
}
2042
2043 /*
2044 ** The put BIF
2045 */
ets_insert_2(BIF_ALIST_2)2046 BIF_RETTYPE ets_insert_2(BIF_ALIST_2)
2047 {
2048 DbTable* tb;
2049 int cret = DB_ERROR_NONE;
2050 Eterm insert_term;
2051 DbTableMethod* meth;
2052 SWord consumed_reds = 0;
2053 CHECK_TABLES();
2054 if (BIF_ARG_2 == NIL) {
2055 /* Check that the table exists */
2056 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2057 db_unlock(tb, LCK_WRITE_REC);
2058 BIF_RET(am_true);
2059 } if ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL) ||
2060 is_internal_magic_ref(BIF_ARG_2)) {
2061 /* Handle list case */
2062 return ets_insert_2_list_driver(BIF_P,
2063 BIF_ARG_1,
2064 BIF_ARG_2,
2065 0);
2066 } else if (is_list(BIF_ARG_2)) {
2067 insert_term = CAR(list_val(BIF_ARG_2));
2068 } else {
2069 insert_term = BIF_ARG_2;
2070 }
2071
2072 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2073
2074 meth = tb->common.meth;
2075 if (is_not_tuple(insert_term) ||
2076 (arityval(*tuple_val(insert_term)) < tb->common.keypos)) {
2077 db_unlock(tb, LCK_WRITE_REC);
2078 BIF_ERROR(BIF_P, BADARG);
2079 }
2080 cret = meth->db_put(tb, insert_term, 0, &consumed_reds);
2081
2082 db_unlock(tb, LCK_WRITE_REC);
2083
2084 BUMP_REDS(BIF_P, consumed_reds / ITERATIONS_PER_RED);
2085 return ets_cret_to_return_value(BIF_P, cret);
2086 }
2087
2088
2089 /*
2090 ** The put-if-not-already-there BIF...
2091 */
ets_insert_new_2(BIF_ALIST_2)2092 BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2)
2093 {
2094 DbTable* tb;
2095 int cret = DB_ERROR_NONE;
2096 Eterm ret = am_true;
2097 Eterm obj;
2098 db_lock_kind_t kind;
2099 SWord consumed_reds = 0;
2100 CHECK_TABLES();
2101
2102 if (BIF_ARG_2 == NIL) {
2103 /* Check that the table exists */
2104 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2105 db_unlock(tb, LCK_WRITE_REC);
2106 BIF_RET(am_true);
2107 } if ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL) ||
2108 is_internal_magic_ref(BIF_ARG_2)) {
2109 /* Handle list case */
2110 return ets_insert_2_list_driver(BIF_P, BIF_ARG_1, BIF_ARG_2, 1);
2111 } else if (is_list(BIF_ARG_2)) {
2112 obj = CAR(list_val(BIF_ARG_2));
2113 } else {
2114 obj = BIF_ARG_2;
2115 }
2116
2117 /* Only one object */
2118 kind = LCK_WRITE_REC;
2119 DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2);
2120
2121 if (is_not_tuple(obj)
2122 || (arityval(*tuple_val(obj)) < tb->common.keypos)) {
2123 db_unlock(tb, kind);
2124 BIF_ERROR(BIF_P, BADARG);
2125 }
2126 cret = tb->common.meth->db_put(tb, obj,
2127 1, /* key_clash_fail */
2128 &consumed_reds);
2129
2130 db_unlock(tb, kind);
2131
2132 BUMP_REDS(BIF_P, consumed_reds / ITERATIONS_PER_RED);
2133 switch (cret) {
2134 case DB_ERROR_NONE:
2135 BIF_RET(ret);
2136 case DB_ERROR_BADKEY:
2137 BIF_RET(am_false);
2138 case DB_ERROR_SYSRES:
2139 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2140 default:
2141 BIF_ERROR(BIF_P, BADARG);
2142 }
2143 }
2144
2145 /*
2146 ** Rename a (possibly) named table
2147 */
2148
ets_rename_2(BIF_ALIST_2)2149 BIF_RETTYPE ets_rename_2(BIF_ALIST_2)
2150 {
2151 DbTable* tb;
2152 Eterm ret;
2153 Eterm old_name;
2154 erts_rwmtx_t *lck1, *lck2;
2155 Uint freason;
2156
2157 #ifdef HARDDEBUG
2158 erts_fprintf(stderr,
2159 "ets:rename(%T,%T); Process: %T, initial: %T:%T/%bpu\n",
2160 BIF_ARG_1, BIF_ARG_2, BIF_P->common.id,
2161 BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
2162 #endif
2163
2164
2165 if (is_not_atom(BIF_ARG_2)) {
2166 /* Do lookup to report bad table identifier or table name. */
2167 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_READ, BIF_ets_rename_2);
2168 db_unlock(tb, LCK_READ);
2169 BIF_ERROR(BIF_P, BADARG);
2170 }
2171
2172 (void) meta_name_tab_bucket(BIF_ARG_2, &lck1);
2173
2174 if (is_atom(BIF_ARG_1)) {
2175 old_name = BIF_ARG_1;
2176 named_tab:
2177 (void) meta_name_tab_bucket(old_name, &lck2);
2178 if (lck1 == lck2)
2179 lck2 = NULL;
2180 else if (lck1 > lck2) {
2181 erts_rwmtx_t *tmp = lck1;
2182 lck1 = lck2;
2183 lck2 = tmp;
2184 }
2185 }
2186 else {
2187 tb = tid2tab(BIF_ARG_1, &BIF_P->fvalue);
2188 if (!tb)
2189 BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
2190 else {
2191 if (is_table_named(tb)) {
2192 old_name = tb->common.the_name;
2193 goto named_tab;
2194 }
2195 lck2 = NULL;
2196 }
2197 }
2198
2199 erts_rwmtx_rwlock(lck1);
2200 if (lck2)
2201 erts_rwmtx_rwlock(lck2);
2202
2203 tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1, &freason);
2204 if (!tb)
2205 goto fail;
2206
2207 if (is_table_named(tb)) {
2208 if (!insert_named_tab(BIF_ARG_2, tb, 1))
2209 goto badarg;
2210
2211 if (!remove_named_tab(tb, 1))
2212 erts_exit(ERTS_ERROR_EXIT,"Could not find named tab %s", tb->common.the_name);
2213 ret = BIF_ARG_2;
2214 }
2215 else { /* Not a named table */
2216 ret = BIF_ARG_1;
2217 }
2218 tb->common.the_name = BIF_ARG_2;
2219
2220 db_unlock(tb, LCK_WRITE);
2221 erts_rwmtx_rwunlock(lck1);
2222 if (lck2)
2223 erts_rwmtx_rwunlock(lck2);
2224 BIF_RET(ret);
2225
2226 badarg:
2227 freason = BADARG;
2228
2229 fail:
2230 if (tb)
2231 db_unlock(tb, LCK_WRITE);
2232 erts_rwmtx_rwunlock(lck1);
2233 if (lck2)
2234 erts_rwmtx_rwunlock(lck2);
2235
2236 return db_bif_fail(BIF_P, freason, BIF_ets_rename_2, NULL);
2237 }
2238
2239
2240 /*
2241 ** The create table BIF
2242 ** Args: (Name, Properties)
2243 */
2244
ets_new_2(BIF_ALIST_2)2245 BIF_RETTYPE ets_new_2(BIF_ALIST_2)
2246 {
2247 DbTable* tb = NULL;
2248 Eterm list;
2249 Eterm val;
2250 Eterm ret;
2251 Eterm heir;
2252 UWord heir_data;
2253 Uint32 status;
2254 Sint keypos;
2255 int is_named, is_compressed;
2256 int is_fine_locked, frequent_read;
2257 int is_decentralized_counters;
2258 int is_decentralized_counters_option;
2259 int cret;
2260 DbTableMethod* meth;
2261
2262 if (is_not_atom(BIF_ARG_1)) {
2263 BIF_ERROR(BIF_P, BADARG);
2264 }
2265 if (is_not_nil(BIF_ARG_2) && is_not_list(BIF_ARG_2)) {
2266 BIF_ERROR(BIF_P, BADARG);
2267 }
2268
2269 status = DB_SET | DB_PROTECTED;
2270 keypos = 1;
2271 is_named = 0;
2272 is_fine_locked = 0;
2273 frequent_read = 0;
2274 is_decentralized_counters = 0;
2275 is_decentralized_counters_option = -1;
2276 heir = am_none;
2277 heir_data = (UWord) am_undefined;
2278 is_compressed = erts_ets_always_compress;
2279
2280 list = BIF_ARG_2;
2281 while(is_list(list)) {
2282 val = CAR(list_val(list));
2283 if (val == am_bag) {
2284 status |= DB_BAG;
2285 status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
2286 }
2287 else if (val == am_duplicate_bag) {
2288 status |= DB_DUPLICATE_BAG;
2289 status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
2290 }
2291 else if (val == am_ordered_set) {
2292 is_decentralized_counters = 1;
2293 status |= DB_ORDERED_SET;
2294 status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_CA_ORDERED_SET);
2295 }
2296 else if (is_tuple(val)) {
2297 Eterm *tp = tuple_val(val);
2298 if (arityval(tp[0]) == 2) {
2299 if (tp[1] == am_keypos
2300 && is_small(tp[2]) && (signed_val(tp[2]) > 0)) {
2301 keypos = signed_val(tp[2]);
2302 }
2303 else if (tp[1] == am_write_concurrency) {
2304 if (tp[2] == am_true) {
2305 is_fine_locked = 1;
2306 } else if (tp[2] == am_false) {
2307 is_fine_locked = 0;
2308 } else break;
2309 if (DB_LOCK_FREE(NULL))
2310 is_fine_locked = 0;
2311 }
2312 else if (tp[1] == am_read_concurrency) {
2313 if (tp[2] == am_true) {
2314 frequent_read = 1;
2315 } else if (tp[2] == am_false) {
2316 frequent_read = 0;
2317 } else break;
2318 }
2319 else if (tp[1] == am_heir && tp[2] == am_none) {
2320 heir = am_none;
2321 heir_data = am_undefined;
2322 }
2323 else if (tp[1] == am_decentralized_counters) {
2324 if (tp[2] == am_true) {
2325 is_decentralized_counters_option = 1;
2326 } else if (tp[2] == am_false) {
2327 is_decentralized_counters_option = 0;
2328 } else break;
2329 }
2330 else break;
2331 }
2332 else if (arityval(tp[0]) == 3 && tp[1] == am_heir
2333 && is_internal_pid(tp[2])) {
2334 heir = tp[2];
2335 heir_data = tp[3];
2336 }
2337 else break;
2338 }
2339 else if (val == am_public) {
2340 status |= DB_PUBLIC;
2341 status &= ~(DB_PROTECTED|DB_PRIVATE);
2342 }
2343 else if (val == am_private) {
2344 status |= DB_PRIVATE;
2345 status &= ~(DB_PROTECTED|DB_PUBLIC);
2346 }
2347 else if (val == am_named_table) {
2348 is_named = 1;
2349 status |= DB_NAMED_TABLE;
2350 }
2351 else if (val == am_compressed) {
2352 is_compressed = 1;
2353 }
2354 else if (val == am_set || val == am_protected)
2355 ;
2356 else break;
2357
2358 list = CDR(list_val(list));
2359 }
2360 if (is_not_nil(list)) { /* bad opt or not a well formed list */
2361 BIF_ERROR(BIF_P, BADARG);
2362 }
2363 if (-1 != is_decentralized_counters_option) {
2364 is_decentralized_counters = is_decentralized_counters_option;
2365 }
2366 if (IS_TREE_TABLE(status) && is_fine_locked && !(status & DB_PRIVATE)) {
2367 meth = &db_catree;
2368 status |= DB_CA_ORDERED_SET;
2369 status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_ORDERED_SET);
2370 status |= DB_FINE_LOCKED;
2371 }
2372 else if (IS_HASH_TABLE(status)) {
2373 meth = &db_hash;
2374 if (is_fine_locked && !(status & DB_PRIVATE)) {
2375 status |= DB_FINE_LOCKED;
2376 }
2377 }
2378 else if (IS_TREE_TABLE(status)) {
2379 meth = &db_tree;
2380 }
2381 else {
2382 BIF_ERROR(BIF_P, BADARG);
2383 }
2384
2385 if (frequent_read && !(status & DB_PRIVATE))
2386 status |= DB_FREQ_READ;
2387
2388 /* we create table outside any table lock
2389 * and take the unusal cost of destroy table if it
2390 * fails to find a slot
2391 */
2392 {
2393 DbTable init_tb;
2394 erts_flxctr_init(&init_tb.common.counters, 0, 2, ERTS_ALC_T_ETS_CTRS);
2395 tb = (DbTable*) erts_db_alloc(ERTS_ALC_T_DB_TABLE,
2396 &init_tb, sizeof(DbTable));
2397 erts_flxctr_init(&tb->common.counters,
2398 (status & DB_FINE_LOCKED) && is_decentralized_counters,
2399 2,
2400 ERTS_ALC_T_ETS_CTRS);
2401 erts_flxctr_add(&tb->common.counters,
2402 ERTS_DB_TABLE_MEM_COUNTER_ID,
2403 DB_GET_APPROX_MEM_CONSUMED(&init_tb) +
2404 erts_flxctr_nr_of_allocated_bytes(&tb->common.counters));
2405 }
2406
2407 tb->common.meth = meth;
2408 tb->common.the_name = BIF_ARG_1;
2409 tb->common.status = status;
2410 tb->common.type = status;
2411 /* Note, 'type' is *read only* from now on... */
2412 tb->common.continuation = NULL;
2413 erts_atomic_set_nob(&tb->common.continuation_state, (Sint)NULL);
2414 erts_refc_init(&tb->common.fix_count, 0);
2415 db_init_lock(tb, status & (DB_FINE_LOCKED|DB_FREQ_READ));
2416 tb->common.keypos = keypos;
2417 tb->common.owner = BIF_P->common.id;
2418 set_heir(BIF_P, tb, heir, heir_data);
2419
2420 tb->common.fixing_procs = NULL;
2421 tb->common.compress = is_compressed;
2422 #ifdef ETS_DBG_FORCE_TRAP
2423 tb->common.dbg_force_trap = erts_ets_dbg_force_trap;
2424 #endif
2425
2426 cret = meth->db_create(BIF_P, tb);
2427 ASSERT(cret == DB_ERROR_NONE); (void)cret;
2428
2429 make_btid(tb);
2430
2431 if (is_named)
2432 ret = BIF_ARG_1;
2433 else
2434 ret = make_tid(BIF_P, tb);
2435
2436 save_sched_table(BIF_P, tb);
2437 save_owned_table(BIF_P, tb);
2438
2439 if (is_named && !insert_named_tab(BIF_ARG_1, tb, 0)) {
2440 tid_clear(BIF_P, tb);
2441 delete_owned_table(BIF_P, tb);
2442
2443 db_lock(tb,LCK_WRITE);
2444 free_heir_data(tb);
2445 tb->common.meth->db_free_empty_table(tb);
2446 db_unlock(tb,LCK_WRITE);
2447 table_dec_refc(tb, 0);
2448 BIF_ERROR(BIF_P, BADARG);
2449 }
2450
2451 BIF_P->flags |= F_USING_DB; /* So we can remove tb if p dies */
2452
2453 #ifdef HARDDEBUG
2454 erts_fprintf(stderr,
2455 "ets:new(%T,%T)=%T; Process: %T, initial: %T:%T/%bpu\n",
2456 BIF_ARG_1, BIF_ARG_2, ret, BIF_P->common.id,
2457 BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
2458 #endif
2459
2460 BIF_RET(ret);
2461 }
2462
2463 /*
2464 ** Retrieves the tid() of a named ets table.
2465 */
ets_whereis_1(BIF_ALIST_1)2466 BIF_RETTYPE ets_whereis_1(BIF_ALIST_1)
2467 {
2468 DbTable* tb;
2469 Eterm res;
2470 Uint freason;
2471
2472 if (is_not_atom(BIF_ARG_1)) {
2473 BIF_ERROR(BIF_P, BADARG);
2474 }
2475
2476 if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
2477 if (BIF_P->fvalue == EXI_ID) {
2478 BIF_RET(am_undefined);
2479 } else {
2480 //ToDo: Could we avoid this
2481 return db_bif_fail(BIF_P, freason, BIF_ets_whereis_1, NULL);
2482 }
2483 }
2484
2485 res = make_tid(BIF_P, tb);
2486 db_unlock(tb, LCK_READ);
2487
2488 BIF_RET(res);
2489 }
2490
2491 /*
2492 ** The lookup BIF
2493 */
ets_lookup_2(BIF_ALIST_2)2494 BIF_RETTYPE ets_lookup_2(BIF_ALIST_2)
2495 {
2496 DbTable* tb;
2497 int cret;
2498 Eterm ret;
2499
2500 CHECK_TABLES();
2501
2502 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_2);
2503
2504 cret = tb->common.meth->db_get(BIF_P, tb, BIF_ARG_2, &ret);
2505
2506 db_unlock(tb, LCK_READ);
2507
2508 switch (cret) {
2509 case DB_ERROR_NONE:
2510 BIF_RET(ret);
2511 case DB_ERROR_SYSRES:
2512 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2513 default:
2514 BIF_ERROR(BIF_P, BADARG);
2515 }
2516
2517 }
2518
2519 /*
2520 ** The lookup BIF
2521 */
ets_member_2(BIF_ALIST_2)2522 BIF_RETTYPE ets_member_2(BIF_ALIST_2)
2523 {
2524 DbTable* tb;
2525 int cret;
2526 Eterm ret;
2527
2528 CHECK_TABLES();
2529
2530 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_member_2);
2531
2532 cret = tb->common.meth->db_member(tb, BIF_ARG_2, &ret);
2533
2534 db_unlock(tb, LCK_READ);
2535
2536 switch (cret) {
2537 case DB_ERROR_NONE:
2538 BIF_RET(ret);
2539 case DB_ERROR_SYSRES:
2540 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2541 default:
2542 BIF_ERROR(BIF_P, BADARG);
2543 }
2544
2545 }
2546
2547 /*
2548 ** Get an element from a term
2549 ** get_element_3(Tab, Key, Index)
2550 ** return the element or a list of elements if bag
2551 */
ets_lookup_element_3(BIF_ALIST_3)2552 BIF_RETTYPE ets_lookup_element_3(BIF_ALIST_3)
2553 {
2554 DbTable* tb;
2555 Sint index;
2556 int cret;
2557 Eterm ret;
2558
2559 CHECK_TABLES();
2560
2561 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_element_3);
2562
2563 if (is_not_small(BIF_ARG_3) || ((index = signed_val(BIF_ARG_3)) < 1)) {
2564 db_unlock(tb, LCK_READ);
2565 BIF_ERROR(BIF_P, BADARG);
2566 }
2567
2568 cret = tb->common.meth->db_get_element(BIF_P, tb,
2569 BIF_ARG_2, index, &ret);
2570 db_unlock(tb, LCK_READ);
2571 switch (cret) {
2572 case DB_ERROR_NONE:
2573 BIF_RET(ret);
2574 case DB_ERROR_SYSRES:
2575 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2576 case DB_ERROR_BADKEY:
2577 BIF_P->fvalue = EXI_BAD_KEY;
2578 BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
2579 default:
2580 BIF_ERROR(BIF_P, BADARG);
2581 }
2582 }
2583
2584 /*
2585 * BIF to erase a whole table and release all memory it holds
2586 */
ets_delete_1(BIF_ALIST_1)2587 BIF_RETTYPE ets_delete_1(BIF_ALIST_1)
2588 {
2589 SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
2590 SWord reds = initial_reds;
2591 DbTable* tb;
2592
2593 #ifdef HARDDEBUG
2594 erts_fprintf(stderr,
2595 "ets:delete(%T); Process: %T, initial: %T:%T/%bpu\n",
2596 BIF_ARG_1, BIF_P->common.id,
2597 BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
2598 #endif
2599
2600 CHECK_TABLES();
2601
2602 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_delete_1);
2603
2604 /*
2605 * Clear all access bits to prevent any ets operation to access the
2606 * table while it is being deleted.
2607 */
2608 tb->common.status &= ~(DB_PROTECTED|DB_PUBLIC|DB_PRIVATE);
2609 tb->common.status |= DB_DELETE;
2610
2611 if (tb->common.owner != BIF_P->common.id) {
2612
2613 /*
2614 * The table is being deleted by a process other than its owner.
2615 * To make sure that the table will be completely deleted if the
2616 * current process will be killed (e.g. by an EXIT signal), we will
2617 * now transfer the ownership to the current process.
2618 */
2619
2620 Process *rp = erts_proc_lookup_raw(tb->common.owner);
2621 /*
2622 * Process 'rp' might be exiting, but our table lock prevents it
2623 * from terminating as it cannot complete erts_db_process_exiting().
2624 */
2625 ASSERT(!(ERTS_PSFLG_FREE & erts_atomic32_read_nob(&rp->state)));
2626
2627 delete_owned_table(rp, tb);
2628 BIF_P->flags |= F_USING_DB;
2629 tb->common.owner = BIF_P->common.id;
2630 save_owned_table(BIF_P, tb);
2631 }
2632
2633 if (is_table_named(tb))
2634 remove_named_tab(tb, 0);
2635
2636 /* disable inheritance */
2637 free_heir_data(tb);
2638 tb->common.heir = am_none;
2639
2640 reds -= free_fixations_locked(BIF_P, tb);
2641 tid_clear(BIF_P, tb);
2642 db_unlock(tb, LCK_WRITE);
2643
2644 reds = free_table_continue(BIF_P, tb, reds);
2645 if (reds < 0) {
2646 /*
2647 * Package the DbTable* pointer into a bignum so that it can be safely
2648 * passed through a trap. We used to pass the DbTable* pointer directly
2649 * (it looks like an continuation pointer), but that is will crash the
2650 * emulator if this BIF is call traced.
2651 */
2652 Eterm *hp = HAlloc(BIF_P, 2);
2653 hp[0] = make_pos_bignum_header(1);
2654 hp[1] = (Eterm) tb;
2655 BUMP_ALL_REDS(BIF_P);
2656 BIF_TRAP1(&ets_delete_continue_exp, BIF_P, make_big(hp));
2657 }
2658 else {
2659 BUMP_REDS(BIF_P, (initial_reds - reds));
2660 BIF_RET(am_true);
2661 }
2662 }
2663
2664 /*
2665 ** BIF ets:give_away(Tab, Pid, GiftData)
2666 */
ets_give_away_3(BIF_ALIST_3)2667 BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
2668 {
2669 Process* to_proc = NULL;
2670 ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
2671 Eterm to_pid = BIF_ARG_2;
2672 Eterm from_pid;
2673 DbTable* tb = NULL;
2674 Uint freason;
2675
2676 /*
2677 * Note that lock of the process must be taken before the lock
2678 * of the table.
2679 */
2680 to_proc = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, to_pid, to_locks);
2681 /*
2682 * If the table identifier has a problem, we want to report that even if
2683 * the Pid is bad.
2684 */
2685 tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, &freason);
2686 if (!tb)
2687 goto fail;
2688
2689 if (!to_proc) {
2690 freason = BADARG;
2691 goto fail;
2692 }
2693
2694 if (tb->common.owner != BIF_P->common.id) {
2695 BIF_P->fvalue = EXI_NOT_OWNER;
2696 freason = BADARG | EXF_HAS_EXT_INFO;
2697 goto fail;
2698 }
2699
2700 from_pid = tb->common.owner;
2701 if (to_pid == from_pid) {
2702 BIF_P->fvalue = EXI_OWNER;
2703 freason = BADARG | EXF_HAS_EXT_INFO;
2704 goto fail;
2705 }
2706
2707 delete_owned_table(BIF_P, tb);
2708 to_proc->flags |= F_USING_DB;
2709 tb->common.owner = to_pid;
2710 save_owned_table(to_proc, tb);
2711
2712 db_unlock(tb,LCK_WRITE);
2713 send_ets_transfer_message(BIF_P, to_proc, &to_locks,
2714 tb, BIF_ARG_3);
2715 erts_proc_unlock(to_proc, to_locks);
2716 UnUseTmpHeap(5,BIF_P);
2717 BIF_RET(am_true);
2718
2719 fail:
2720 if (to_proc != NULL && to_proc != BIF_P) erts_proc_unlock(to_proc, to_locks);
2721 if (tb != NULL) db_unlock(tb, LCK_WRITE);
2722
2723 return db_bif_fail(BIF_P, freason, BIF_ets_give_away_3, NULL);
2724 }
2725
ets_setopts_2(BIF_ALIST_2)2726 BIF_RETTYPE ets_setopts_2(BIF_ALIST_2)
2727 {
2728 DbTable* tb = NULL;
2729 Eterm* tp;
2730 Eterm opt;
2731 Eterm heir = THE_NON_VALUE;
2732 UWord heir_data = (UWord) THE_NON_VALUE;
2733 Uint32 protection = 0;
2734 DeclareTmpHeap(fakelist,2,BIF_P);
2735 Eterm tail;
2736
2737 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_setopts_2);
2738 if (tb == NULL) {
2739 BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
2740 }
2741
2742 UseTmpHeap(2,BIF_P);
2743 for (tail = is_tuple(BIF_ARG_2) ? CONS(fakelist, BIF_ARG_2, NIL) : BIF_ARG_2;
2744 is_list(tail);
2745 tail = CDR(list_val(tail))) {
2746
2747 opt = CAR(list_val(tail));
2748 if (!is_tuple(opt) || (tp = tuple_val(opt), arityval(tp[0]) < 2)) {
2749 goto badarg;
2750 }
2751
2752 switch (tp[1]) {
2753 case am_heir:
2754 if (heir != THE_NON_VALUE) goto badarg;
2755 heir = tp[2];
2756 if (arityval(tp[0]) == 2 && heir == am_none) {
2757 heir_data = am_undefined;
2758 }
2759 else if (arityval(tp[0]) == 3 && is_internal_pid(heir)) {
2760 heir_data = tp[3];
2761 }
2762 else goto badarg;
2763 break;
2764
2765 case am_protection:
2766 if (arityval(tp[0]) != 2 || protection != 0) goto badarg;
2767 switch (tp[2]) {
2768 case am_private: protection = DB_PRIVATE; break;
2769 case am_protected: protection = DB_PROTECTED; break;
2770 case am_public: protection = DB_PUBLIC; break;
2771 default: goto badarg;
2772 }
2773 break;
2774
2775 default: goto badarg;
2776 }
2777 }
2778
2779 if (tail != NIL)
2780 goto badarg;
2781
2782 if (tb->common.owner != BIF_P->common.id)
2783 goto badarg;
2784
2785 if (heir_data != THE_NON_VALUE) {
2786 free_heir_data(tb);
2787 set_heir(BIF_P, tb, heir, heir_data);
2788 }
2789 if (protection) {
2790 tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
2791 tb->common.status |= protection;
2792 }
2793
2794 db_unlock (tb,LCK_WRITE);
2795 UnUseTmpHeap(2,BIF_P);
2796 BIF_RET(am_true);
2797
2798 badarg:
2799 UnUseTmpHeap(2,BIF_P);
2800 if (tb != NULL) {
2801 db_unlock(tb,LCK_WRITE);
2802 }
2803 BIF_ERROR(BIF_P, BADARG);
2804 }
2805
2806 /*
2807 * Common for delete_all_objects and select_delete(DeleteAll).
2808 */
ets_internal_delete_all_2(BIF_ALIST_2)2809 BIF_RETTYPE ets_internal_delete_all_2(BIF_ALIST_2)
2810 {
2811 SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
2812 SWord reds = initial_reds;
2813 Eterm nitems_holder = THE_NON_VALUE;
2814 DbTable* tb;
2815 CHECK_TABLES();
2816
2817 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_internal_delete_all_2);
2818
2819 if (BIF_ARG_2 == am_undefined) {
2820 reds = tb->common.meth->db_delete_all_objects(BIF_P,
2821 tb,
2822 reds,
2823 &nitems_holder);
2824 ASSERT(nitems_holder != THE_NON_VALUE);
2825 ASSERT(!(tb->common.status & DB_BUSY));
2826
2827 if (reds < 0) {
2828 /*
2829 * Oboy, need to trap AND need to be atomic.
2830 * Solved by cooperative trapping where every process trying to
2831 * access this table (including this process) will "fail" to lookup
2832 * the table and instead pitch in deleting objects
2833 * (in delete_all_objects_continue) and then trap to self.
2834 */
2835 ASSERT((tb->common.status & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC))
2836 ==
2837 (tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC)));
2838 tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
2839 tb->common.status |= DB_BUSY;
2840 db_unlock(tb, LCK_WRITE);
2841 BUMP_ALL_REDS(BIF_P);
2842 BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_internal_delete_all_2), BIF_P,
2843 BIF_ARG_1, nitems_holder);
2844 }
2845 else {
2846 /* Done, no trapping needed */
2847 BUMP_REDS(BIF_P, (initial_reds - reds));
2848 }
2849
2850 }
2851 else {
2852 /*
2853 * The table lookup succeeded and second argument is nitems_holder
2854 * and not 'undefined', which means we have trapped at least once
2855 * and are now done.
2856 */
2857 nitems_holder = BIF_ARG_2;
2858 }
2859 db_unlock(tb, LCK_WRITE);
2860 {
2861 Eterm nitems =
2862 tb->common.meth->db_delete_all_objects_get_nitems_from_holder(BIF_P,
2863 nitems_holder);
2864 BIF_RET(nitems);
2865 }
2866 }
2867
delete_all_objects_continue(Process * p,DbTable * tb)2868 static void delete_all_objects_continue(Process* p, DbTable* tb)
2869 {
2870 SWord initial_reds = ERTS_BIF_REDS_LEFT(p);
2871 SWord reds = initial_reds;
2872
2873 ERTS_LC_ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
2874
2875 if ((tb->common.status & (DB_DELETE|DB_BUSY)) != DB_BUSY)
2876 return;
2877
2878 reds = tb->common.meth->db_delete_all_objects(p, tb, reds, NULL);
2879
2880 if (reds < 0) {
2881 BUMP_ALL_REDS(p);
2882 }
2883 else {
2884 tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
2885 tb->common.status &= ~DB_BUSY;
2886 BUMP_REDS(p, (initial_reds - reds));
2887 }
2888 }
2889
2890 /*
2891 ** Erase an object with given key, or maybe several objects if we have a bag
2892 ** Called as db_erase(Tab, Key), where Key is element 1 of the
2893 ** object(s) we want to erase
2894 */
ets_delete_2(BIF_ALIST_2)2895 BIF_RETTYPE ets_delete_2(BIF_ALIST_2)
2896 {
2897 DbTable* tb;
2898 int cret;
2899 Eterm ret;
2900
2901 CHECK_TABLES();
2902
2903 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_2);
2904
2905 cret = tb->common.meth->db_erase(tb,BIF_ARG_2,&ret);
2906
2907 db_unlock(tb, LCK_WRITE_REC);
2908
2909 switch (cret) {
2910 case DB_ERROR_NONE:
2911 BIF_RET(ret);
2912 case DB_ERROR_SYSRES:
2913 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2914 default:
2915 BIF_ERROR(BIF_P, BADARG);
2916 }
2917 }
2918
2919 /*
2920 ** Erase a specific object, or maybe several objects if we have a bag
2921 */
ets_delete_object_2(BIF_ALIST_2)2922 BIF_RETTYPE ets_delete_object_2(BIF_ALIST_2)
2923 {
2924 DbTable* tb;
2925 int cret;
2926 Eterm ret;
2927
2928 CHECK_TABLES();
2929
2930 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_object_2);
2931
2932 if (is_not_tuple(BIF_ARG_2) ||
2933 (arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
2934 db_unlock(tb, LCK_WRITE_REC);
2935 BIF_ERROR(BIF_P, BADARG);
2936 }
2937
2938 cret = tb->common.meth->db_erase_object(tb, BIF_ARG_2, &ret);
2939 db_unlock(tb, LCK_WRITE_REC);
2940
2941 switch (cret) {
2942 case DB_ERROR_NONE:
2943 BIF_RET(ret);
2944 case DB_ERROR_SYSRES:
2945 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2946 default:
2947 BIF_ERROR(BIF_P, BADARG);
2948 }
2949 }
2950
2951 /*
2952 ** This is for trapping, cannot be called directly.
2953 */
ets_select_delete_trap_1(BIF_ALIST_1)2954 static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1)
2955 {
2956 Process *p = BIF_P;
2957 Eterm a1 = BIF_ARG_1;
2958 BIF_RETTYPE result;
2959 DbTable* tb;
2960 int cret;
2961 Eterm ret;
2962 Eterm *tptr;
2963 db_lock_kind_t kind = LCK_WRITE_REC;
2964 enum DbIterSafety safety = ITER_SAFE;
2965
2966 CHECK_TABLES();
2967 ASSERT(is_tuple(a1));
2968 tptr = tuple_val(a1);
2969 ASSERT(arityval(*tptr) >= 1);
2970
2971 DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
2972 &ets_select_delete_continue_exp);
2973
2974 cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret,&safety);
2975
2976 if(!DID_TRAP(p,ret) && safety != ITER_SAFE) {
2977 ASSERT(erts_refc_read(&tb->common.fix_count,1));
2978 unfix_table_locked(p, tb, &kind);
2979 }
2980
2981 db_unlock(tb, kind);
2982
2983 switch (cret) {
2984 case DB_ERROR_NONE:
2985 ERTS_BIF_PREP_RET(result, ret);
2986 break;
2987 default:
2988 ERTS_BIF_PREP_ERROR(result, p, BADARG);
2989 break;
2990 }
2991 erts_match_set_release_result(p);
2992
2993 return result;
2994 }
2995
2996
2997 /*
2998 * ets:select_delete/2 without special case for "delete-all".
2999 */
ets_internal_select_delete_2(BIF_ALIST_2)3000 BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2)
3001 {
3002 BIF_RETTYPE result;
3003 DbTable* tb;
3004 int cret;
3005 Eterm ret;
3006 enum DbIterSafety safety;
3007
3008 CHECK_TABLES();
3009
3010 DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_internal_select_delete_2);
3011
3012 safety = ITERATION_SAFETY(BIF_P,tb);
3013 if (safety == ITER_UNSAFE) {
3014 local_fix_table(tb);
3015 }
3016 cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2,
3017 &ret, safety);
3018
3019 if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3020 fix_table_locked(BIF_P,tb);
3021 }
3022 if (safety == ITER_UNSAFE) {
3023 local_unfix_table(tb);
3024 }
3025 db_unlock(tb, LCK_WRITE_REC);
3026
3027 switch (cret) {
3028 case DB_ERROR_NONE:
3029 ERTS_BIF_PREP_RET(result, ret);
3030 break;
3031 case DB_ERROR_SYSRES:
3032 ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3033 break;
3034 default:
3035 ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3036 break;
3037 }
3038
3039 erts_match_set_release_result(BIF_P);
3040
3041 return result;
3042 }
3043
3044 /*
3045 * ets:all/0
3046 *
3047 * ets:all() calls ets:internal_request_all/0 which
3048 * requests information about all tables from
3049 * each scheduler thread. Each scheduler replies
3050 * to the calling process with information about
3051 * existing tables created on that specific scheduler.
3052 */
3053
/* One ets:all() request; allocated once per call and shared by all
 * schedulers (see ERTS_ETS_ALL_REQ_SIZE for the real size). */
struct ErtsEtsAllReq_ {
    erts_atomic32_t refc;          /* one reference per scheduler */
    Process *proc;                 /* requesting process (reply target) */
    ErtsOIRefStorage ref;          /* reference tagging the replies */
    ErtsEtsAllReqList list[1];     /* one per scheduler */
};
3060
/* Allocation size of an ErtsEtsAllReq with one list element per
 * scheduler (the struct itself already holds one). */
#define ERTS_ETS_ALL_REQ_SIZE                                   \
    (sizeof(ErtsEtsAllReq)                                      \
     + (sizeof(ErtsEtsAllReqList)                               \
        * (erts_no_schedulers - 1)))

/* Per-scheduler ets:all() processing state. */
typedef struct {
    ErtsEtsAllReq *ongoing;     /* request currently being served */
    ErlHeapFragment *hfrag;     /* partially built reply (when yielded) */
    DbTable *tab;               /* table the scan yielded on */
    ErtsEtsAllReq *queue;       /* circular list of queued requests */
} ErtsEtsAllData;

/* Tables handled before yielding */
#define ERTS_ETS_ALL_TB_YCNT 200
/*
 * Min yield count required before starting
 * an operation that will require yield.
 */
#define ERTS_ETS_ALL_TB_YCNT_START 10

#ifdef DEBUG
/* Test yielding... */
#undef ERTS_ETS_ALL_TB_YCNT
#undef ERTS_ETS_ALL_TB_YCNT_START
#define ERTS_ETS_ALL_TB_YCNT 10
#define ERTS_ETS_ALL_TB_YCNT_START 1
#endif
3088
/*
 * Build (part of) the reply to one ets:all() request on this
 * scheduler. Returns 1 when the scan yielded (state is saved through
 * the in/out parameters) and 0 when the reply has been sent.
 */
static int
ets_all_reply(ErtsSchedulerData *esdp, ErtsEtsAllReq **reqpp,
              ErlHeapFragment **hfragpp, DbTable **tablepp,
              int *yield_count_p)
{
    ErtsEtsAllReq *reqp = *reqpp;
    ErlHeapFragment *hfragp = *hfragpp;
    int ycount = *yield_count_p;
    DbTable *tb, *first;
    Uint sz;
    Eterm list, msg, ref, *hp;
    ErlOffHeap *ohp;
    ErtsMessage *mp;

    /*
     * - save_sched_table() inserts at end of circular list.
     *
     * - This function scans from the end so we know that
     *   the amount of tables to scan wont grow even if we
     *   yield.
     *
     * - remove_sched_table() updates the table we yielded
     *   on if it removes it.
     */

    if (hfragp) {
        /* Restart of a yielded operation... */
        ASSERT(hfragp->used_size < hfragp->alloc_size);
        ohp = &hfragp->off_heap;
        hp = &hfragp->mem[hfragp->used_size];
        list = *hp;   /* list head stashed here when we yielded */
        hfragp->used_size = hfragp->alloc_size;
        first = esdp->ets_tables.clist;
        tb = *tablepp;
    }
    else {
        /* A new operation... */
        ASSERT(!*tablepp);

        /* Max heap size needed... */
        sz = erts_atomic_read_nob(&esdp->ets_tables.count);
        sz *= ERTS_MAGIC_REF_THING_SIZE + 2;
        sz += 3 + ERTS_REF_THING_SIZE;
        hfragp = new_message_buffer(sz);

        hp = &hfragp->mem[0];
        ohp = &hfragp->off_heap;
        list = NIL;
        first = esdp->ets_tables.clist;
        tb = first ? first->common.all.prev : NULL;
    }

    if (tb) {
        while (1) {
            if (is_table_alive(tb)) {
                Eterm tid;
                /* Named tables are listed by name, others by tid. */
                if (is_table_named(tb))
                    tid = tb->common.the_name;
                else
                    tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid);
                list = CONS(hp, tid, list);
                hp += 2;
            }

            if (tb == first)
                break;

            tb = tb->common.all.prev;

            if (--ycount <= 0) {
                /* Budget exhausted: stash the list head just after the
                 * used part of the fragment and report a yield. */
                sz = hp - &hfragp->mem[0];
                ASSERT(hfragp->alloc_size > sz + 1);
                *hp = list;
                hfragp->used_size = sz;
                *hfragpp = hfragp;
                *reqpp = reqp;
                *tablepp = tb;
                *yield_count_p = 0;
                return 1; /* Yield! */
            }
        }
    }

    /* Scan complete: send {Ref, List} to the requesting process. */
    ref = erts_oiref_storage_make_ref(&reqp->ref, &hp);
    msg = TUPLE2(hp, ref, list);
    hp += 3;

    sz = hp - &hfragp->mem[0];
    ASSERT(sz <= hfragp->alloc_size);

    hfragp = erts_resize_message_buffer(hfragp, sz, &msg, 1);

    mp = erts_alloc_message(0, NULL);
    mp->data.heap_frag = hfragp;

    erts_queue_message(reqp->proc, 0, mp, msg, am_system);

    erts_proc_dec_refc(reqp->proc);

    /* The last scheduler to finish frees the request. */
    if (erts_atomic32_dec_read_nob(&reqp->refc) == 0)
        erts_free(ERTS_ALC_T_ETS_ALL_REQ, reqp);

    *reqpp = NULL;
    *hfragpp = NULL;
    *tablepp = NULL;
    *yield_count_p = ycount;

    return 0;
}
3198
/*
 * Drive yielded ets:all() work on this scheduler: finish the ongoing
 * request (if any), then start queued requests. Returns 0 when all
 * work is done and 1 when we must yield again.
 */
int
erts_handle_yielded_ets_all_request(ErtsSchedulerData *esdp,
                                    ErtsEtsAllYieldData *eaydp)
{
    int ix = (int) esdp->no - 1;   /* this scheduler's slot in req->list */
    int yc = ERTS_ETS_ALL_TB_YCNT;

    while (1) {
        if (!eaydp->ongoing) {
            ErtsEtsAllReq *ongoing;

            if (!eaydp->queue)
                return 0; /* All work completed! */

            /* Don't start a new request with too little budget left
             * to make meaningful progress on the table scan. */
            if (yc < ERTS_ETS_ALL_TB_YCNT_START &&
                yc > erts_atomic_read_nob(&esdp->ets_tables.count))
                return 1; /* Yield! */

            /* Unlink the first queued request and make it ongoing. */
            eaydp->ongoing = ongoing = eaydp->queue;
            if (ongoing->list[ix].next == ongoing)
                eaydp->queue = NULL;
            else {
                ongoing->list[ix].next->list[ix].prev = ongoing->list[ix].prev;
                ongoing->list[ix].prev->list[ix].next = ongoing->list[ix].next;
                eaydp->queue = ongoing->list[ix].next;
            }
            ASSERT(!eaydp->hfrag);
            ASSERT(!eaydp->tab);
        }

        if (ets_all_reply(esdp, &eaydp->ongoing, &eaydp->hfrag, &eaydp->tab, &yc))
            return 1; /* Yield! */
    }
}
3233
/*
 * Aux-work callback run on each scheduler for one ets:all() request:
 * reply directly when this scheduler has no ets:all() work in flight,
 * otherwise enqueue the request on its per-scheduler circular list.
 */
static void
handle_ets_all_request(void *vreq)
{
    ErtsSchedulerData *esdp = erts_get_scheduler_data();
    ErtsEtsAllYieldData *eayp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
    ErtsEtsAllReq *req = (ErtsEtsAllReq *) vreq;

    if (!eayp->ongoing && !eayp->queue) {
        /* No ets:all() operations ongoing... */
        ErlHeapFragment *hf = NULL;
        DbTable *tb = NULL;
        int yc = ERTS_ETS_ALL_TB_YCNT;
        if (ets_all_reply(esdp, &req, &hf, &tb, &yc)) {
            /* Yielded... save the state and ask for more aux work. */
            ASSERT(hf);
            eayp->ongoing = req;
            eayp->hfrag = hf;
            eayp->tab = tb;
            erts_notify_new_aux_yield_work(esdp);
        }
    }
    else {
        /* Ongoing ets:all() operations; queue up this request... */
        int ix = (int) esdp->no - 1;
        if (!eayp->queue) {
            /* First queued request forms a one-element circular list. */
            req->list[ix].next = req;
            req->list[ix].prev = req;
            eayp->queue = req;
        }
        else {
            /* Insert at the tail of the circular list. */
            req->list[ix].next = eayp->queue;
            req->list[ix].prev = eayp->queue->list[ix].prev;
            eayp->queue->list[ix].prev = req;
            req->list[ix].prev->list[ix].next = req;
        }
    }
}
3271
ets_internal_request_all_0(BIF_ALIST_0)3272 BIF_RETTYPE ets_internal_request_all_0(BIF_ALIST_0)
3273 {
3274 Eterm ref = erts_make_ref(BIF_P);
3275 ErtsEtsAllReq *req = erts_alloc(ERTS_ALC_T_ETS_ALL_REQ,
3276 ERTS_ETS_ALL_REQ_SIZE);
3277 erts_atomic32_init_nob(&req->refc,
3278 (erts_aint32_t) erts_no_schedulers);
3279 erts_oiref_storage_save(&req->ref, ref);
3280 req->proc = BIF_P;
3281 erts_proc_add_refc(BIF_P, (Sint) erts_no_schedulers);
3282
3283 if (erts_no_schedulers > 1)
3284 erts_schedule_multi_misc_aux_work(1,
3285 erts_no_schedulers,
3286 handle_ets_all_request,
3287 (void *) req);
3288
3289 handle_ets_all_request((void *) req);
3290 BIF_RET(ref);
3291 }
3292
3293 /*
3294 ** db_slot(Db, Slot) -> [Items].
3295 */
ets_slot_2(BIF_ALIST_2)3296 BIF_RETTYPE ets_slot_2(BIF_ALIST_2)
3297 {
3298 DbTable* tb;
3299 int cret;
3300 Eterm ret;
3301
3302 CHECK_TABLES();
3303
3304 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_slot_2);
3305
3306 /* The slot number is checked in table specific code. */
3307 cret = tb->common.meth->db_slot(BIF_P, tb, BIF_ARG_2, &ret);
3308 db_unlock(tb, LCK_READ);
3309 switch (cret) {
3310 case DB_ERROR_NONE:
3311 BIF_RET(ret);
3312 case DB_ERROR_SYSRES:
3313 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
3314 default:
3315 BIF_ERROR(BIF_P, BADARG);
3316 }
3317 }
3318
3319 /*
3320 ** The match BIF, called as ets:match(Table, Pattern), ets:match(Continuation) or ets:match(Table,Pattern,ChunkSize).
3321 */
3322
ets_match_1(BIF_ALIST_1)3323 BIF_RETTYPE ets_match_1(BIF_ALIST_1)
3324 {
3325 return ets_select1(BIF_P, BIF_ets_match_1, BIF_ARG_1);
3326 }
3327
ets_match_2(BIF_ALIST_2)3328 BIF_RETTYPE ets_match_2(BIF_ALIST_2)
3329 {
3330 DbTable* tb;
3331 Eterm ms;
3332 DeclareTmpHeap(buff,8,BIF_P);
3333 Eterm *hp = buff;
3334 Eterm res;
3335
3336 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_2);
3337
3338 UseTmpHeap(8,BIF_P);
3339 ms = CONS(hp, am_DollarDollar, NIL);
3340 hp += 2;
3341 ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3342 hp += 4;
3343 ms = CONS(hp, ms, NIL);
3344 res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
3345 UnUseTmpHeap(8,BIF_P);
3346 return res;
3347 }
3348
ets_match_3(BIF_ALIST_3)3349 BIF_RETTYPE ets_match_3(BIF_ALIST_3)
3350 {
3351 DbTable* tb;
3352 Eterm ms;
3353 Sint chunk_size;
3354 DeclareTmpHeap(buff,8,BIF_P);
3355 Eterm *hp = buff;
3356 Eterm res;
3357
3358 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_3);
3359
3360 /* Chunk size strictly greater than 0 */
3361 if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3362 db_unlock(tb, LCK_READ);
3363 BIF_ERROR(BIF_P, BADARG);
3364 }
3365
3366 UseTmpHeap(8,BIF_P);
3367 ms = CONS(hp, am_DollarDollar, NIL);
3368 hp += 2;
3369 ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3370 hp += 4;
3371 ms = CONS(hp, ms, NIL);
3372 res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
3373 UnUseTmpHeap(8,BIF_P);
3374 return res;
3375 }
3376
3377
ets_select_3(BIF_ALIST_3)3378 BIF_RETTYPE ets_select_3(BIF_ALIST_3)
3379 {
3380 DbTable* tb;
3381 Sint chunk_size;
3382
3383 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_3);
3384
3385 /* Chunk size strictly greater than 0 */
3386 if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3387 db_unlock(tb, LCK_READ);
3388 BIF_ERROR(BIF_P, BADARG);
3389 }
3390
3391 return ets_select3(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, chunk_size);
3392 }
3393
/*
 * Common implementation of the chunked select operations:
 * ets:select/3, ets:select_reverse/3 (via its own wrapper), ets:match/3
 * and ets:match_object/3.
 *
 * The table is locked (LCK_READ) on entry and always unlocked on return.
 * If the table-type callback traps, the table is fixed so the iteration
 * stays valid across the yield.
 */
static BIF_RETTYPE
ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size)
{
    BIF_RETTYPE result;
    int cret;
    Eterm ret;
    enum DbIterSafety safety;

    CHECK_TABLES();

    safety = ITERATION_SAFETY(p,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }
    cret = tb->common.meth->db_select_chunk(p, tb, tid,
                                            ms, chunk_size,
                                            0 /* not reversed */,
                                            &ret, safety);
    if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
        /* About to yield; keep the table fixed across the trap. */
        fix_table_locked(p, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }

    erts_match_set_release_result(p);

    return result;
}
3436
3437
/*
 * Trap continuation for ets:select/1/2/3.
 *
 * BIF_ARG_1 is the continuation tuple produced by the table-specific
 * select implementation; its first element is the table identifier.
 * Not callable directly from Erlang code.
 */
static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1)
{
    Process *p = BIF_P;
    Eterm a1 = BIF_ARG_1;
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    Eterm ret;
    Eterm *tptr;
    db_lock_kind_t kind = LCK_READ;
    enum DbIterSafety safety = ITER_SAFE;

    CHECK_TABLES();

    tptr = tuple_val(a1);
    ASSERT(arityval(*tptr) >= 1);

    DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
                      &ets_select_continue_exp);

    cret = tb->common.meth->db_select_continue(p, tb, a1, &ret, &safety);

    if (!DID_TRAP(p,ret)) {
        /* Iteration finished; release the fixation taken when trapping. */
        if (safety != ITER_SAFE) {
            ASSERT(erts_refc_read(&tb->common.fix_count,1));
            unfix_table_locked(p, tb, &kind);
        }
    }
    db_unlock(tb, kind);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }

    erts_match_set_release_result(p);

    return result;
}
3486
3487
ets_select_1(BIF_ALIST_1)3488 BIF_RETTYPE ets_select_1(BIF_ALIST_1)
3489 {
3490 return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1);
3491 /* TRAP: ets_select_trap_1 */
3492 }
3493
/*
 * Common implementation of the single-argument continuation BIFs:
 * ets:select/1, ets:select_reverse/1, ets:match/1 and ets:match_object/1.
 *
 * arg1 is either the atom '$end_of_table' or a continuation tuple whose
 * first element is the table identifier.
 */
static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1)
{
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    Eterm ret;
    Eterm *tptr;
    enum DbIterSafety safety, safety_copy;

    CHECK_TABLES();

    /*
     * Make sure that the table exists.
     */

    if (!is_tuple(arg1)) {
        if (arg1 == am_EOT) {
            /* Iteration already completed. */
            BIF_RET(am_EOT);
        }
        BIF_ERROR(p, BADARG);
    }
    tptr = tuple_val(arg1);
    if (arityval(*tptr) < 1)
        BIF_ERROR(p, BADARG);

    DB_GET_TABLE(tb, tptr[1], DB_READ, LCK_READ, bif_ix, NULL, p);

    safety = ITERATION_SAFETY(p,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }

    safety_copy = safety;
    cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret, &safety_copy);

    if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(p, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }

    erts_match_set_release_result(p);

    return result;
}
3556
ets_select_2(BIF_ALIST_2)3557 BIF_RETTYPE ets_select_2(BIF_ALIST_2)
3558 {
3559 DbTable* tb;
3560 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2);
3561 return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2);
3562 /* TRAP: ets_select_trap_1 */
3563 }
3564
/*
 * Common implementation of ets:select/2 and the ets:match/2 and
 * ets:match_object/2 wrappers.
 *
 * The table is locked (LCK_READ) on entry and always unlocked on return.
 */
static BIF_RETTYPE
ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms)
{
    BIF_RETTYPE result;
    int cret;
    enum DbIterSafety safety;
    Eterm ret;

    CHECK_TABLES();

    safety = ITERATION_SAFETY(p,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }

    cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret, safety);

    if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(p, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }

    erts_match_set_release_result(p);

    return result;
}
3606
/*
 * Trap continuation for ets:select_count/2; we get here instead of the
 * real BIF when trapping. BIF_ARG_1 is the continuation tuple whose
 * first element is the table identifier.
 */
static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1)
{
    Process *p = BIF_P;
    Eterm a1 = BIF_ARG_1;
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    Eterm ret;
    Eterm *tptr;
    db_lock_kind_t kind = LCK_READ;
    enum DbIterSafety safety = ITER_SAFE;

    CHECK_TABLES();

    tptr = tuple_val(a1);
    ASSERT(arityval(*tptr) >= 1);

    DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
                      &ets_select_count_continue_exp);

    cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret, &safety);

    if (!DID_TRAP(p,ret) && safety != ITER_SAFE) {
        /* Iteration finished; release the fixation taken when trapping. */
        ASSERT(erts_refc_read(&tb->common.fix_count,1));
        unfix_table_locked(p, tb, &kind);
    }
    db_unlock(tb, kind);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }

    erts_match_set_release_result(p);

    return result;
}
3652
/*
 * ets:select_count(Table, MatchSpec) -> non_neg_integer()
 *
 * Counts the objects matching the match spec. May trap to
 * ets_select_count_1 for long iterations.
 */
BIF_RETTYPE ets_select_count_2(BIF_ALIST_2)
{
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    enum DbIterSafety safety;
    Eterm ret;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_count_2);

    safety = ITERATION_SAFETY(BIF_P,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }
    cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2,
                                            &ret, safety);

    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(BIF_P, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);
    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
        break;
    }

    erts_match_set_release_result(BIF_P);

    return result;
}
3695
/*
 * Trap continuation for ets:select_replace/2; this is only reached by
 * trapping and cannot be called directly. BIF_ARG_1 is the continuation
 * tuple whose first element is the table identifier.
 */
static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1)
{
    Process *p = BIF_P;
    Eterm a1 = BIF_ARG_1;
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    Eterm ret;
    Eterm *tptr;
    db_lock_kind_t kind = LCK_WRITE_REC;
    enum DbIterSafety safety = ITER_SAFE;

    CHECK_TABLES();
    ASSERT(is_tuple(a1));
    tptr = tuple_val(a1);
    ASSERT(arityval(*tptr) >= 1);

    DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
                      &ets_select_replace_continue_exp);

    cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret,&safety);

    if(!DID_TRAP(p,ret) && safety != ITER_SAFE) {
        /* Iteration finished; release the fixation taken when trapping. */
        ASSERT(erts_refc_read(&tb->common.fix_count,1));
        unfix_table_locked(p, tb, &kind);
    }

    db_unlock(tb, kind);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, p, BADARG);
        break;
    }
    erts_match_set_release_result(p);

    return result;
}
3740
3741
/*
 * ets:select_replace(Table, MatchSpec) -> non_neg_integer()
 *
 * Atomically replaces matching objects as directed by the match spec.
 * Not supported for 'bag' tables. May trap to ets_select_replace_1.
 */
BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2)
{
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    Eterm ret;
    enum DbIterSafety safety;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_select_replace_2);

    if (tb->common.status & DB_BAG) {
        /* Bag implementation presented both semantic consistency
           and performance issues */
        db_unlock(tb, LCK_WRITE_REC);
        BIF_P->fvalue = EXI_TAB_TYPE;
        BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
    }

    safety = ITERATION_SAFETY(BIF_P,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }
    cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2,
                                              &ret, safety);

    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(BIF_P,tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_WRITE_REC);

    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
        break;
    }

    erts_match_set_release_result(BIF_P);

    return result;
}
3793
3794
/*
 * ets:select_reverse(Table, MatchSpec, ChunkSize)
 *
 * As ets:select/3 but the table-type callback is asked to iterate in
 * reverse order. ChunkSize must be a small integer > 0.
 */
BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3)
{
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    enum DbIterSafety safety;
    Eterm ret;
    Sint chunk_size;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_3);

    /* Chunk size strictly greater than 0 */
    if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
        db_unlock(tb, LCK_READ);
        BIF_ERROR(BIF_P, BADARG);
    }
    safety = ITERATION_SAFETY(BIF_P,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }
    cret = tb->common.meth->db_select_chunk(BIF_P,tb, BIF_ARG_1,
                                            BIF_ARG_2, chunk_size,
                                            1 /* reversed */, &ret, safety);
    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(BIF_P, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);
    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
        break;
    }
    erts_match_set_release_result(BIF_P);
    return result;
}
3841
ets_select_reverse_1(BIF_ALIST_1)3842 BIF_RETTYPE ets_select_reverse_1(BIF_ALIST_1)
3843 {
3844 return ets_select1(BIF_P, BIF_ets_select_reverse_1, BIF_ARG_1);
3845 }
3846
/*
 * ets:select_reverse(Table, MatchSpec)
 *
 * As ets:select/2 but the table-type callback is asked to iterate in
 * reverse order.
 */
BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2)
{
    BIF_RETTYPE result;
    DbTable* tb;
    int cret;
    enum DbIterSafety safety;
    Eterm ret;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_2);

    safety = ITERATION_SAFETY(BIF_P,tb);
    if (safety == ITER_UNSAFE) {
        /* Keep the table structure stable while we iterate. */
        local_fix_table(tb);
    }
    cret = tb->common.meth->db_select(BIF_P,tb, BIF_ARG_1, BIF_ARG_2,
                                      1 /*reversed*/, &ret, safety);

    if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
        /* Yielding; keep the table fixed across the trap. */
        fix_table_locked(BIF_P, tb);
    }
    if (safety == ITER_UNSAFE) {
        local_unfix_table(tb);
    }
    db_unlock(tb, LCK_READ);
    switch (cret) {
    case DB_ERROR_NONE:
        ERTS_BIF_PREP_RET(result, ret);
        break;
    case DB_ERROR_SYSRES:
        ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
        break;
    default:
        ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
        break;
    }
    erts_match_set_release_result(BIF_P);
    return result;
}
3887
3888
3889 /*
3890 ** ets:match_object(Continuation)
3891 */
ets_match_object_1(BIF_ALIST_1)3892 BIF_RETTYPE ets_match_object_1(BIF_ALIST_1)
3893 {
3894 return ets_select1(BIF_P, BIF_ets_match_object_1, BIF_ARG_1);
3895 }
3896
3897 /*
3898 ** ets:match_object(Table, Pattern)
3899 */
ets_match_object_2(BIF_ALIST_2)3900 BIF_RETTYPE ets_match_object_2(BIF_ALIST_2)
3901 {
3902 DbTable* tb;
3903 Eterm ms;
3904 DeclareTmpHeap(buff,8,BIF_P);
3905 Eterm *hp = buff;
3906 Eterm res;
3907
3908 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_2);
3909
3910 UseTmpHeap(8,BIF_P);
3911 ms = CONS(hp, am_DollarUnderscore, NIL);
3912 hp += 2;
3913 ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3914 hp += 4;
3915 ms = CONS(hp, ms, NIL);
3916 res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
3917 UnUseTmpHeap(8,BIF_P);
3918 return res;
3919 }
3920
3921 /*
3922 ** ets:match_object(Table,Pattern,ChunkSize)
3923 */
ets_match_object_3(BIF_ALIST_3)3924 BIF_RETTYPE ets_match_object_3(BIF_ALIST_3)
3925 {
3926 DbTable* tb;
3927 Sint chunk_size;
3928 Eterm ms;
3929 DeclareTmpHeap(buff,8,BIF_P);
3930 Eterm *hp = buff;
3931 Eterm res;
3932
3933 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_3);
3934
3935 /* Chunk size strictly greater than 0 */
3936 if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3937 db_unlock(tb, LCK_READ);
3938 BIF_ERROR(BIF_P, BADARG);
3939 }
3940
3941 UseTmpHeap(8,BIF_P);
3942 ms = CONS(hp, am_DollarUnderscore, NIL);
3943 hp += 2;
3944 ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3945 hp += 4;
3946 ms = CONS(hp, ms, NIL);
3947 res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
3948 UnUseTmpHeap(8,BIF_P);
3949 return res;
3950 }
3951
/*
 * ets:info(Table) -> [{Item, Value}] | undefined
 *
 * Collects all info items for a table. The 'size' and 'memory' items are
 * read from decentralized counters via erts_flxctr_snapshot(), which may
 * require trapping; on re-entry after such a trap BIF_ARG_1 is the tuple
 * {SnapshotResult, Table} instead of the plain table identifier.
 */
BIF_RETTYPE ets_info_1(BIF_ALIST_1)
{
    static Eterm fields[] = {am_protection, am_keypos, am_type, am_named_table,
            am_node, am_size, am_name, am_heir, am_owner, am_memory, am_compressed,
            am_write_concurrency,
            am_read_concurrency,
            am_decentralized_counters,
            am_id};
    Eterm results[sizeof(fields)/sizeof(Eterm)];
    DbTable* tb;
    Eterm res;
    int i;
    Eterm* hp;
    Uint freason;
    Sint size = -1;
    Sint memory = -1;
    Eterm table;
    int is_ctrs_read_result_set = 0;
    /*Process* rp = NULL;*/
    /* If/when we implement lockless private tables:
    Eterm owner;
    */
    /* Re-entry after trapping in erts_flxctr_snapshot()? Then BIF_ARG_1
     * is {SnapshotResult, Table}; extract the finished counter values. */
    if(is_tuple(BIF_ARG_1) &&
       is_tuple_arity(BIF_ARG_1, 2) &&
       erts_flxctr_is_snapshot_result(tuple_val(BIF_ARG_1)[1])) {
        Eterm counter_read_result = tuple_val(BIF_ARG_1)[1];
        table = tuple_val(BIF_ARG_1)[2];
        size = erts_flxctr_get_snapshot_result_after_trap(counter_read_result,
                                                          ERTS_DB_TABLE_NITEMS_COUNTER_ID);
        memory = erts_flxctr_get_snapshot_result_after_trap(counter_read_result,
                                                            ERTS_DB_TABLE_MEM_COUNTER_ID);
        is_ctrs_read_result_set = 1;
    } else {
        table = BIF_ARG_1;
    }
    if ((tb = db_get_table(BIF_P, table, DB_INFO, LCK_READ, &freason)) == NULL) {
        if (BIF_P->fvalue == EXI_TYPE) {
            /* TRAP or invalid table identifier (not atom or magic reference). */
            return db_bif_fail(BIF_P, freason, BIF_ets_info_1, NULL);
        } else {
            /* The table no longer exists. */
            BIF_RET(am_undefined);
        }
    }

    /* If/when we implement lockless private tables:
    owner = tb->common.owner;
    */

    /* If/when we implement lockless private tables:
    if ((tb->common.status & DB_PRIVATE) && owner != BIF_P->common.id) {
        db_unlock(tb, LCK_READ);
        rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
                                       owner, ERTS_PROC_LOCK_MAIN);
        if (rp == NULL) {
            BIF_RET(am_undefined);
        }
        if (rp == ERTS_PROC_LOCK_BUSY) {
            ERTS_BIF_YIELD1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, BIF_ARG_1);
        }
        if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL
            || tb->common.owner != owner) {
            if (BIF_P != rp)
                erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
            if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) {
                BIF_RET(am_undefined);
            }
            BIF_ERROR(BIF_P, BADARG);
        }
    }*/

    if (!is_ctrs_read_result_set) {
        ErtsFlxCtrSnapshotResult res =
            erts_flxctr_snapshot(&tb->common.counters, ERTS_ALC_T_ETS_CTRS, BIF_P);
        if (ERTS_FLXCTR_GET_RESULT_AFTER_TRAP == res.type) {
            /* Snapshot started; trap and resume with the snapshot handle
             * paired with the table identifier. */
            Eterm tuple;
            db_unlock(tb, LCK_READ);
            hp = HAlloc(BIF_P, 3);
            tuple = TUPLE2(hp, res.trap_resume_state, table);
            BIF_TRAP1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, tuple);
        } else if (res.type == ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP) {
            /* Snapshot already in progress elsewhere; yield and retry. */
            db_unlock(tb, LCK_READ);
            BIF_TRAP1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, table);
        } else {
            size = res.result[ERTS_DB_TABLE_NITEMS_COUNTER_ID];
            memory = res.result[ERTS_DB_TABLE_MEM_COUNTER_ID];
            is_ctrs_read_result_set = 1;
        }
    }
    for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
        if (is_ctrs_read_result_set && am_size == fields[i]) {
            results[i] = erts_make_integer(size, BIF_P);
        } else if (is_ctrs_read_result_set && am_memory == fields[i]) {
            /* The counter counts bytes; report 'memory' in words. */
            Sint words = (Sint) ((memory + sizeof(Sint) - 1) / sizeof(Sint));
            results[i] = erts_make_integer(words, BIF_P);
        } else {
            results[i] = table_info(BIF_P, tb, fields[i]);
            ASSERT(is_value(results[i]));
        }
    }
    db_unlock(tb, LCK_READ);

    /*if (rp != NULL && rp != BIF_P)
        erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);*/

    /* 5 words per item: 3 for the {Item, Value} tuple + 2 for the cons. */
    hp = HAlloc(BIF_P, 5*sizeof(fields)/sizeof(Eterm));
    res = NIL;
    for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
        Eterm tuple;
        tuple = TUPLE2(hp, fields[i], results[i]);
        hp += 3;
        res = CONS(hp, tuple, res);
        hp += 2;
    }
    BIF_RET(res);
}
4072
/*
 * ets:info(Table, Item) -> Value | undefined
 *
 * Extracts a single info item. 'size' and 'memory' are read from
 * decentralized counters via erts_flxctr_snapshot() and may trap; on
 * re-entry after such a trap BIF_ARG_1 is the snapshot result instead
 * of the table identifier. 'binary' is handled by a separate trap.
 */
BIF_RETTYPE ets_info_2(BIF_ALIST_2)
{
    DbTable* tb;
    Eterm ret = THE_NON_VALUE;
    Uint freason;
    /* Re-entry after trapping in erts_flxctr_snapshot()? */
    if (erts_flxctr_is_snapshot_result(BIF_ARG_1)) {
        Sint res;
        if (am_memory == BIF_ARG_2) {
            res = erts_flxctr_get_snapshot_result_after_trap(BIF_ARG_1,
                                                             ERTS_DB_TABLE_MEM_COUNTER_ID);
            /* The counter counts bytes; report 'memory' in words. */
            res = (Sint) ((res + sizeof(Sint) - 1) / sizeof(Sint));
        } else {
            res = erts_flxctr_get_snapshot_result_after_trap(BIF_ARG_1,
                                                             ERTS_DB_TABLE_NITEMS_COUNTER_ID);
        }
        BIF_RET(erts_make_integer(res, BIF_P));
    }

    if (BIF_ARG_2 == am_binary)
        BIF_TRAP1(ets_info_binary_trap, BIF_P, BIF_ARG_1);

    if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
        if (BIF_P->fvalue == EXI_TYPE) {
            /* TRAP or invalid table identifier (not atom or magic reference). */
            return db_bif_fail(BIF_P, freason, BIF_ets_info_2, NULL);
        } else {
            /* The table no longer exists. */
            BIF_RET(am_undefined);
        }
    }
    if (BIF_ARG_2 == am_size || BIF_ARG_2 == am_memory) {
        ErtsFlxCtrSnapshotResult res =
            erts_flxctr_snapshot(&tb->common.counters, ERTS_ALC_T_ETS_CTRS, BIF_P);
        if (ERTS_FLXCTR_GET_RESULT_AFTER_TRAP == res.type) {
            /* Snapshot started; trap and resume with the snapshot handle. */
            db_unlock(tb, LCK_READ);
            BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_info_2), BIF_P, res.trap_resume_state, BIF_ARG_2);
        } else if (res.type == ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP) {
            /* Snapshot already in progress elsewhere; yield and retry. */
            db_unlock(tb, LCK_READ);
            BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_info_2), BIF_P, BIF_ARG_1, BIF_ARG_2);
        } else if (BIF_ARG_2 == am_size) {
            ret = erts_make_integer(res.result[ERTS_DB_TABLE_NITEMS_COUNTER_ID], BIF_P);
        } else { /* BIF_ARG_2 == am_memory */
            Sint r = res.result[ERTS_DB_TABLE_MEM_COUNTER_ID];
            /* The counter counts bytes; report 'memory' in words. */
            r = (Sint) ((r + sizeof(Sint) - 1) / sizeof(Sint));
            ret = erts_make_integer(r, BIF_P);
        }
    } else {
        ret = table_info(BIF_P, tb, BIF_ARG_2);
    }
    db_unlock(tb, LCK_READ);
    if (is_non_value(ret)) {
        /* Unknown info item. */
        BIF_ERROR(BIF_P, BADARG);
    }
    BIF_RET(ret);
}
4132
4133
ets_is_compiled_ms_1(BIF_ALIST_1)4134 BIF_RETTYPE ets_is_compiled_ms_1(BIF_ALIST_1)
4135 {
4136 if (erts_db_get_match_prog_binary(BIF_ARG_1)) {
4137 BIF_RET(am_true);
4138 } else {
4139 BIF_RET(am_false);
4140 }
4141 }
4142
ets_match_spec_compile_1(BIF_ALIST_1)4143 BIF_RETTYPE ets_match_spec_compile_1(BIF_ALIST_1)
4144 {
4145 Uint freason;
4146 Binary *mp = db_match_set_compile(BIF_P, BIF_ARG_1, DCOMP_TABLE, &freason);
4147 Eterm *hp;
4148 if (mp == NULL) {
4149 BIF_ERROR(BIF_P, freason);
4150 }
4151
4152 hp = HAlloc(BIF_P, ERTS_MAGIC_REF_THING_SIZE);
4153
4154 BIF_RET(erts_db_make_match_prog_ref(BIF_P, mp, &hp));
4155 }
4156
/*
 * ets:match_spec_run_r(List, CompiledMatchSpec, Acc) -> Acc'
 *
 * Runs a compiled match spec against each element of List, prepending
 * each match result onto Acc (so results come out in reverse order).
 * Traps after CONTEXT_REDS elements with the remaining list as the new
 * first argument.
 */
BIF_RETTYPE ets_match_spec_run_r_3(BIF_ALIST_3)
{
    Eterm ret = BIF_ARG_3;
    int i = 0;
    Eterm *hp;
    Eterm lst;
    Binary *mp;
    Eterm res;
    Uint32 dummy;

    if (!(is_list(BIF_ARG_1) || BIF_ARG_1 == NIL)) {
    error:
        BIF_ERROR(BIF_P, BADARG);
    }

    mp = erts_db_get_match_prog_binary(BIF_ARG_2);
    if (!mp)
        goto error;

    if (BIF_ARG_1 == NIL) {
        BIF_RET(BIF_ARG_3);
    }
    for (lst = BIF_ARG_1; is_list(lst); lst = CDR(list_val(lst))) {
        if (++i > CONTEXT_REDS) {
            /* Used up the time slice; trap with the remaining list. */
            BUMP_ALL_REDS(BIF_P);
            BIF_TRAP3(BIF_TRAP_EXPORT(BIF_ets_match_spec_run_r_3),
                      BIF_P,lst,BIF_ARG_2,ret);
        }
        res = db_prog_match(BIF_P, BIF_P,
                            mp, CAR(list_val(lst)), NULL, 0,
                            ERTS_PAM_COPY_RESULT, &dummy);
        if (is_value(res)) {
            /* Element matched; cons the result onto the accumulator. */
            hp = HAlloc(BIF_P, 2);
            ret = CONS(hp,res,ret);
            /*hp += 2;*/
        }
    }
    if (lst != NIL) {
        /* Improper list. */
        goto error;
    }
    BIF_RET2(ret,i);
}
4199
erts_internal_ets_lookup_binary_info_2(BIF_ALIST_2)4200 BIF_RETTYPE erts_internal_ets_lookup_binary_info_2(BIF_ALIST_2)
4201 {
4202 DbTable* tb;
4203 int cret;
4204 Eterm ret;
4205
4206 CHECK_TABLES();
4207
4208 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_lookup_binary_info_2);
4209
4210 cret = tb->common.meth->db_get_binary_info(BIF_P, tb, BIF_ARG_2, &ret);
4211
4212 db_unlock(tb, LCK_READ);
4213
4214 switch (cret) {
4215 case DB_ERROR_NONE:
4216 BIF_RET(ret);
4217 case DB_ERROR_SYSRES:
4218 BIF_ERROR(BIF_P, SYSTEM_LIMIT);
4219 default:
4220 BIF_ERROR(BIF_P, BADARG);
4221 }
4222 }
4223
erts_internal_ets_raw_first_1(BIF_ALIST_1)4224 BIF_RETTYPE erts_internal_ets_raw_first_1(BIF_ALIST_1)
4225 {
4226 DbTable* tb;
4227 int cret;
4228 Eterm ret;
4229
4230 CHECK_TABLES();
4231
4232 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_raw_first_1);
4233
4234 cret = tb->common.meth->db_raw_first(BIF_P, tb, &ret);
4235
4236 db_unlock(tb, LCK_READ);
4237
4238 if (cret != DB_ERROR_NONE) {
4239 BIF_ERROR(BIF_P, BADARG);
4240 }
4241 BIF_RET(ret);
4242 }
4243
erts_internal_ets_raw_next_2(BIF_ALIST_2)4244 BIF_RETTYPE erts_internal_ets_raw_next_2(BIF_ALIST_2)
4245 {
4246 DbTable* tb;
4247 int cret;
4248 Eterm ret;
4249
4250 CHECK_TABLES();
4251
4252 DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_raw_next_2);
4253
4254 cret = tb->common.meth->db_raw_next(BIF_P, tb, BIF_ARG_2, &ret);
4255
4256 db_unlock(tb, LCK_READ);
4257
4258 if (cret != DB_ERROR_NONE) {
4259 BIF_ERROR(BIF_P, BADARG);
4260 }
4261 BIF_RET(ret);
4262 }
4263
4264 BIF_RETTYPE
erts_internal_ets_super_user_1(BIF_ALIST_1)4265 erts_internal_ets_super_user_1(BIF_ALIST_1)
4266 {
4267 if (BIF_ARG_1 == am_true)
4268 BIF_P->flags |= F_ETS_SUPER_USER;
4269 else if (BIF_ARG_1 == am_false)
4270 BIF_P->flags &= ~F_ETS_SUPER_USER;
4271 else
4272 BIF_ERROR(BIF_P, BADARG);
4273 BIF_RET(am_ok);
4274 }
4275
4276 /*
4277 ** External interface (NOT BIF's)
4278 */
4279
4280 int erts_ets_rwmtx_spin_count = -1;
4281
4282 /* Init the db */
4283
/*
 * Initialize the ETS subsystem:
 *  - map the configured spin-count category to an rwlock spin count,
 *  - set up the meta name table (named-table -> DbTable hash),
 *  - initialize the hash/tree/catree table implementations,
 *  - register the internal trap exports used by yielding operations.
 *
 * Fix: the erts_exit() message said "ets tabled"; corrected to
 * "ets tables".
 */
void init_db(ErtsDbSpinCount db_spin_count)
{
    int i;
    unsigned bits;
    size_t size;

    int max_spin_count = (1 << 15) - 1; /* internal limit */
    erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
    rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
    rwmtx_opt.lived = ERTS_RWMTX_LONG_LIVED;

    /* Map the requested spin-count category to an actual count; the
     * LOW/HIGH/VERY_HIGH categories scale with the number of schedulers
     * up to a per-category cap. */
    switch (db_spin_count) {
    case ERTS_DB_SPNCNT_NONE:
        erts_ets_rwmtx_spin_count = 0;
        break;
    case ERTS_DB_SPNCNT_VERY_LOW:
        erts_ets_rwmtx_spin_count = 100;
        break;
    case ERTS_DB_SPNCNT_LOW:
        erts_ets_rwmtx_spin_count = 200;
        erts_ets_rwmtx_spin_count += erts_no_schedulers * 50;
        if (erts_ets_rwmtx_spin_count > 1000)
            erts_ets_rwmtx_spin_count = 1000;
        break;
    case ERTS_DB_SPNCNT_HIGH:
        erts_ets_rwmtx_spin_count = 2000;
        erts_ets_rwmtx_spin_count += erts_no_schedulers * 100;
        if (erts_ets_rwmtx_spin_count > 15000)
            erts_ets_rwmtx_spin_count = 15000;
        break;
    case ERTS_DB_SPNCNT_VERY_HIGH:
        erts_ets_rwmtx_spin_count = 15000;
        erts_ets_rwmtx_spin_count += erts_no_schedulers * 500;
        if (erts_ets_rwmtx_spin_count > max_spin_count)
            erts_ets_rwmtx_spin_count = max_spin_count;
        break;
    case ERTS_DB_SPNCNT_EXTREMELY_HIGH:
        erts_ets_rwmtx_spin_count = max_spin_count;
        break;
    case ERTS_DB_SPNCNT_NORMAL:
    default:
        erts_ets_rwmtx_spin_count = -1; /* use rwmtx default */
        break;
    }

    if (erts_ets_rwmtx_spin_count >= 0)
        rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;

    for (i=0; i<META_NAME_TAB_LOCK_CNT; i++) {
        erts_rwmtx_init_opt(&meta_name_tab_rwlocks[i].lck, &rwmtx_opt,
                            "meta_name_tab", make_small(i),
                            ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DB);
    }

    erts_atomic_init_nob(&erts_ets_misc_mem_size, 0);
    db_initialize_util();

    if (user_requested_db_max_tabs < DB_DEF_MAX_TABS)
        db_max_tabs = DB_DEF_MAX_TABS;
    else
        db_max_tabs = user_requested_db_max_tabs;

    bits = erts_fit_in_bits_int32(db_max_tabs-1);
    if (bits > SMALL_BITS) {
        erts_exit(ERTS_ERROR_EXIT,"Max limit for ets tables too high %u (max %u).",
                  db_max_tabs, ((Uint)1)<<SMALL_BITS);
    }

    /*
     * There is no hard limit on the number of tables anymore, but
     * 'db_max_tabs' is still used to determine the size of the name
     * hash table.
     */
    meta_name_tab_mask = (((Uint) 1)<<bits) - 1;
    size = sizeof(struct meta_name_tab_entry)*(meta_name_tab_mask+1);
    meta_name_tab = erts_db_alloc_nt(ERTS_ALC_T_DB_TABLES, size);
    ERTS_ETS_MISC_MEM_ADD(size);

    for (i=0; i<=meta_name_tab_mask; i++) {
        meta_name_tab[i].pu.tb = NULL;
        meta_name_tab[i].u.name_atom = NIL;
    }

    db_initialize_hash();
    db_initialize_tree();
    db_initialize_catree();

    /* Non visual BIF to trap to. */
    erts_init_trap_export(&ets_select_delete_continue_exp,
                          am_ets, ERTS_MAKE_AM("select_delete_trap"), 1,
                          &ets_select_delete_trap_1);

    /* Non visual BIF to trap to. */
    erts_init_trap_export(&ets_select_count_continue_exp,
                          am_ets, ERTS_MAKE_AM("count_trap"), 1,
                          &ets_select_count_1);

    /* Non visual BIF to trap to. */
    erts_init_trap_export(&ets_select_replace_continue_exp,
                          am_ets, ERTS_MAKE_AM("replace_trap"), 1,
                          &ets_select_replace_1);

    /* Non visual BIF to trap to. */
    erts_init_trap_export(&ets_select_continue_exp,
                          am_ets, ERTS_MAKE_AM("select_trap"), 1,
                          &ets_select_trap_1);

    /* Non visual BIF to trap to. */
    erts_init_trap_export(&ets_delete_continue_exp,
                          am_ets, ERTS_MAKE_AM("delete_trap"), 1,
                          &ets_delete_trap);

    /* ets:info(Tab, binary) trap... */

    ets_info_binary_trap = erts_export_put(am_erts_internal,
                                           am_ets_info_binary,
                                           1);
}
4401
4402 void
erts_ets_sched_spec_data_init(ErtsSchedulerData * esdp)4403 erts_ets_sched_spec_data_init(ErtsSchedulerData *esdp)
4404 {
4405 ErtsEtsAllYieldData *eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
4406 eaydp->ongoing = NULL;
4407 eaydp->hfrag = NULL;
4408 eaydp->tab = NULL;
4409 eaydp->queue = NULL;
4410 esdp->ets_tables.clist = NULL;
4411 erts_atomic_init_nob(&esdp->ets_tables.count, 0);
4412 }
4413
4414
/* Transfer table ownership to the table's heir.
 *
 * In: Table LCK_WRITE, owned by 'p', with a heir that is a live-looking
 * internal pid different from 'p'.
 * Return TRUE : ok, table not mine and NOT locked anymore.
 * Return FALSE: failed, table still mine (LCK_WRITE)
 */
static int give_away_to_heir(Process* p, DbTable* tb)
{
    Process* to_proc;
    ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
    Eterm to_pid;
    UWord heir_data;

    ASSERT(tb->common.owner == p->common.id);
    ASSERT(is_internal_pid(tb->common.heir));
    ASSERT(tb->common.heir != p->common.id);
retry:
    to_pid = tb->common.heir;
    /* Try to lock the heir without blocking while we hold the table lock. */
    to_proc = erts_pid2proc_opt(p, ERTS_PROC_LOCK_MAIN,
                                to_pid, to_locks,
                                ERTS_P2P_FLG_TRY_LOCK);
    if (to_proc == ERTS_PROC_LOCK_BUSY) {
        /* Lock-order fallback: release the table lock, take the heir's
         * lock blocking, re-take the table lock, and re-validate
         * everything that may have changed in between. */
        db_unlock(tb,LCK_WRITE);
        to_proc = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN,
                                to_pid, to_locks);
        db_lock(tb,LCK_WRITE);
        ASSERT(tb != NULL);

        if (tb->common.owner != p->common.id) {
            if (to_proc != NULL ) {
                erts_proc_unlock(to_proc, to_locks);
            }
            db_unlock(tb,LCK_WRITE);
            return !0; /* ok, someone already gave my table away */
        }
        if (tb->common.heir != to_pid) {  /* someone changed the heir */
            if (to_proc != NULL ) {
                erts_proc_unlock(to_proc, to_locks);
            }
            if (to_pid == p->common.id || to_pid == am_none) {
                return 0; /* no real heir, table still mine */
            }
            goto retry;
        }
    }
    if (to_proc == NULL) {
        return 0; /* heir not alive, table still mine */
    }
    if (to_proc->common.u.alive.started_interval
        != tb->common.heir_started_interval) {
        erts_proc_unlock(to_proc, to_locks);
        return 0; /* heir dead and pid reused, table still mine */
    }

    /* Hand the table over: move it from our owned-table list to the
     * heir's, and update the owner field. */
    delete_owned_table(p, tb);
    to_proc->flags |= F_USING_DB;
    tb->common.owner = to_pid;
    save_owned_table(to_proc, tb);

    db_unlock(tb,LCK_WRITE);
    heir_data = tb->common.heir_data;
    if (!is_immed(heir_data)) {
        /* Non-immediate heir data is stored as a size-1 tuple DbTerm;
         * unwrap it before sending. */
        Eterm* tpv = ((DbTerm*)heir_data)->tpl;	/* tuple_val */
        ASSERT(arityval(*tpv) == 1);
        heir_data = tpv[1];
    }
    /* Notify the heir with an {'ETS-TRANSFER', Tab, FromPid, HeirData}
     * style message (built by the helper below). */
    send_ets_transfer_message(p, to_proc, &to_locks, tb, heir_data);
    erts_proc_unlock(to_proc, to_locks);
    return !0;
}
4483
4484 static void
send_ets_transfer_message(Process * c_p,Process * proc,ErtsProcLocks * locks,DbTable * tb,Eterm heir_data)4485 send_ets_transfer_message(Process *c_p, Process *proc,
4486 ErtsProcLocks *locks,
4487 DbTable *tb, Eterm heir_data)
4488 {
4489 Uint hsz, hd_sz;
4490 ErtsMessage *mp;
4491 Eterm *hp;
4492 ErlOffHeap *ohp;
4493 Eterm tid, hd_copy, msg, sender;
4494
4495 hsz = 5;
4496 if (!is_table_named(tb))
4497 hsz += ERTS_MAGIC_REF_THING_SIZE;
4498 if (is_immed(heir_data))
4499 hd_sz = 0;
4500 else {
4501 hd_sz = size_object(heir_data);
4502 hsz += hd_sz;
4503 }
4504
4505 mp = erts_alloc_message_heap(proc, locks, hsz, &hp, &ohp);
4506 if (is_table_named(tb))
4507 tid = tb->common.the_name;
4508 else
4509 tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid);
4510 if (!hd_sz)
4511 hd_copy = heir_data;
4512 else
4513 hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp);
4514 sender = c_p->common.id;
4515 msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy);
4516 ERL_MESSAGE_TOKEN(mp) = am_undefined;
4517 erts_queue_proc_message(c_p, proc, *locks, mp, msg);
4518 }
4519
4520
/* Auto-release fixation from exiting process */
static SWord proc_cleanup_fixed_table(Process* p, DbFixation* fix)
{
    DbTable* tb = btid2tab(fix->tabs.btid);
    SWord work = 0;

    ASSERT(fix->procs.p == p); (void)p;
    if (tb) {
        db_lock(tb, LCK_WRITE_REC);
        if (!(tb->common.status & DB_DELETE)) {
            erts_aint_t diff;
            int use_locks = !DB_LOCK_FREE(tb);

            if (use_locks)
                erts_mtx_lock(&tb->common.fixlock);

            ASSERT(fixing_procs_rbt_lookup(tb->common.fixing_procs, p));

            /* Drop this process' entire fixation count from the table. */
            diff = -((erts_aint_t) fix->counter);
            erts_refc_add(&tb->common.fix_count,diff,0);
            fix->counter = 0;

            fixing_procs_rbt_delete(&tb->common.fixing_procs, fix);

            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);

            /* If the last fixation is gone, purge pseudo-deleted
             * hash-table objects now. */
            if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)) {
                work += db_unfix_table_hash(&(tb->hash));
            }

            /* Account the fixation memory back to the table. */
            ASSERT(sizeof(DbFixation) == ERTS_ALC_DBG_BLK_SZ(fix));
            ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof(DbFixation), 0);
        }
        db_unlock(tb, LCK_WRITE_REC);
    }

    /* Release the fixation's table-id reference and the fixation itself
     * (done even if the table is already gone). */
    erts_bin_release(fix->tabs.btid);
    erts_free(ERTS_ALC_T_DB_FIXATION, fix);
    ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    ++work;

    return work;
}
4565
4566
/*
 * erts_db_process_exiting() is called when a process terminates.
 * It returns 0 when completely done, and !0 when it wants to
 * yield. *yield_state can hold a pointer to a state while
 * yielding.
 */
#define ERTS_DB_INTERNAL_ERROR(LSTR) \
    erts_exit(ERTS_ABORT_EXIT, "%s:%d:erts_db_process_exiting(): " LSTR "\n", \
              __FILE__, __LINE__)

int
erts_db_process_exiting(Process *c_p, ErtsProcLocks c_p_locks, void **yield_state)
{
    /* Small state machine: first give away or delete every owned table,
     * then release every fixation held by the dying process. */
    typedef struct {
        enum {
            GET_OWNED_TABLE,
            FREE_OWNED_TABLE,
            UNFIX_TABLES,
        }op;
        DbTable *tb;
    } CleanupState;
    CleanupState *state = (CleanupState *) *yield_state;
    Eterm pid = c_p->common.id;
    CleanupState default_state;
    SWord initial_reds = ERTS_BIF_REDS_LEFT(c_p);
    SWord reds = initial_reds;

    if (!state) {
        /* First call: start from the beginning with on-stack state. */
        state = &default_state;
        state->op = GET_OWNED_TABLE;
        state->tb = NULL;
    }

    do {
        switch (state->op) {
        case GET_OWNED_TABLE: {
            DbTable* tb;
            erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
            tb = (DbTable*) erts_psd_get(c_p, ERTS_PSD_ETS_OWNED_TABLES);
            erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);

            if (!tb) {
                /* Done with owned tables; now fixations */
                state->op = UNFIX_TABLES;
                break;
            }

            ASSERT(tb != state->tb);
            state->tb = tb;
            db_lock(tb, LCK_WRITE);
            /*
             * Ownership may have changed since we looked up the table.
             */
            if (tb->common.owner != pid) {
                db_unlock(tb, LCK_WRITE);
                break;
            }
            /* A successful give_away_to_heir() has already unlocked the
             * table; go fetch the next owned table. */
            if (tb->common.heir != am_none
                && tb->common.heir != pid
                && give_away_to_heir(c_p, tb)) {
                break;
            }
            /* Clear all access bits. */
            tb->common.status &= ~(DB_PROTECTED | DB_PUBLIC | DB_PRIVATE);
            tb->common.status |= DB_DELETE;

            if (is_table_named(tb))
                remove_named_tab(tb, 0);

            free_heir_data(tb);
            reds -= free_fixations_locked(c_p, tb);
            tid_clear(c_p, tb);
            db_unlock(tb, LCK_WRITE);
            state->op = FREE_OWNED_TABLE;
            break;
        }
        case FREE_OWNED_TABLE:
            /* Free table contents, possibly across several yields. */
            reds = free_table_continue(c_p, state->tb, reds);
            if (reds < 0)
                goto yield;

            state->op = GET_OWNED_TABLE;
            break;

        case UNFIX_TABLES: {
            DbFixation* fix;

            fix = (DbFixation*) erts_psd_get(c_p, ERTS_PSD_ETS_FIXED_TABLES);

            if (!fix) {
                /* Done */

                if (state != &default_state)
                    erts_free(ERTS_ALC_T_DB_PROC_CLEANUP, state);
                *yield_state = NULL;

                BUMP_REDS(c_p, (initial_reds - reds));
                return 0;
            }

            fixed_tabs_delete(c_p, fix);
            reds -= proc_cleanup_fixed_table(c_p, fix);

            break;
        }
        default:
            ERTS_DB_INTERNAL_ERROR("Bad internal state");
        }

    } while (reds > 0);

yield:

    if (state == &default_state) {
        /* Move the on-stack state to the heap before yielding. */
        *yield_state = erts_alloc(ERTS_ALC_T_DB_PROC_CLEANUP,
                                  sizeof(CleanupState));
        sys_memcpy(*yield_state, (void*) state, sizeof(CleanupState));
    }
    else
        ASSERT(state == *yield_state);

    return !0;
}
4690
4691
/* SMP note: table only need to be LCK_READ locked */
static void fix_table_locked(Process* p, DbTable* tb)
{
    DbFixation *fix;
    int use_locks = !DB_LOCK_FREE(tb);

    if (use_locks)
        erts_mtx_lock(&tb->common.fixlock);

    erts_refc_inc(&tb->common.fix_count,1);
    fix = tb->common.fixing_procs;
    if (fix == NULL) {
        /* First fixation of the table: record the fixation time
         * (reported by ets:info(T, safe_fixed[_monotonic_time])). */
        tb->common.time.monotonic
            = erts_get_monotonic_time(erts_proc_sched_data(p));
        tb->common.time.offset = erts_get_time_offset();
    }
    else {
        fix = fixing_procs_rbt_lookup(fix, p);
        if (fix) {
            /* Process already fixes this table; just bump its counter. */
            ASSERT(fixed_tabs_find(NULL, fix));
            ++(fix->counter);
            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);
            return;
        }
    }
    /* First fixation by this process: create a new fixation entry. */
    fix = (DbFixation *) erts_db_alloc(ERTS_ALC_T_DB_FIXATION,
                                       tb, sizeof(DbFixation));
    ERTS_ETS_MISC_MEM_ADD(sizeof(DbFixation));
    fix->tabs.btid = tb->common.btid;
    /* The fixation holds its own reference to the table identifier. */
    erts_refc_inc(&fix->tabs.btid->intern.refc, 2);
    fix->procs.p = p;
    fix->counter = 1;
    fixing_procs_rbt_insert(&tb->common.fixing_procs, fix);

    if (use_locks)
        erts_mtx_unlock(&tb->common.fixlock);

    p->flags |= F_USING_DB;

    fixed_tabs_insert(p, fix);
}
4734
/* SMP note: May re-lock table
*/
static void unfix_table_locked(Process* p,  DbTable* tb,
                               db_lock_kind_t* kind_p)
{
    DbFixation* fix;
    int use_locks = !DB_LOCK_FREE(tb);

    if (use_locks)
        erts_mtx_lock(&tb->common.fixlock);

    fix = fixing_procs_rbt_lookup(tb->common.fixing_procs, p);

    if (fix) {
        erts_refc_dec(&tb->common.fix_count,0);
        --(fix->counter);
        ASSERT(fix->counter >= 0);
        if (fix->counter == 0) {
            /* Last fixation by this process: remove and free the entry. */
            fixing_procs_rbt_delete(&tb->common.fixing_procs, fix);
            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);
            fixed_tabs_delete(p, fix);

            /* Plain dec is enough here: the live table still holds its
             * own reference to the table identifier. */
            erts_refc_dec(&fix->tabs.btid->intern.refc, 1);

            erts_db_free(ERTS_ALC_T_DB_FIXATION,
                         tb, (void *) fix, sizeof(DbFixation));
            ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
            goto unlocked;
        }
    }
    if (use_locks)
        erts_mtx_unlock(&tb->common.fixlock);
unlocked:

    if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)
        && erts_atomic_read_nob(&tb->hash.fixdel) != (erts_aint_t)NULL) {
        if (*kind_p == LCK_READ && tb->common.is_thread_safe) {
            /* Must have write lock while purging pseudo-deleted (OTP-8166) */
            if (use_locks) {
                erts_rwmtx_runlock(&tb->common.rwlock);
                erts_rwmtx_rwlock(&tb->common.rwlock);
            }
            *kind_p = LCK_WRITE;
            /* Table state may have changed while the lock was released. */
            if (tb->common.status & (DB_DELETE|DB_BUSY))
                return;
        }
        db_unfix_table_hash(&(tb->hash));
    }
}
4785
/* Context passed to free_fixations_op() by free_fixations_locked(). */
struct free_fixations_ctx
{
    Process* p;   /* process deleting the table */
    DbTable* tb;  /* table being deleted */
    SWord cnt;    /* number of fixations processed (used as reduction cost) */
};
4792
/* Foreach-destroy callback used by free_fixations_locked() when a table
 * is deleted: detach one fixation from the (dying) table. */
static int free_fixations_op(DbFixation* fix, void* vctx, Sint reds)
{
    struct free_fixations_ctx* ctx = (struct free_fixations_ctx*) vctx;
    erts_aint_t diff;

    ASSERT(btid2tab(fix->tabs.btid) == ctx->tb);
    ASSERT(fix->counter > 0);
    ASSERT(ctx->tb->common.status & DB_DELETE);

    /* Remove this fixation's whole count from the table. */
    diff = -((erts_aint_t) fix->counter);
    erts_refc_add(&ctx->tb->common.fix_count, diff, 0);

    if (fix->procs.p != ctx->p) { /* Fixated by other process */
        fix->counter = 0;

        /* Fake memory stats for table */
        ASSERT(sizeof(DbFixation) == ERTS_ALC_DBG_BLK_SZ(fix));
        ERTS_DB_ALC_MEM_UPDATE_(ctx->tb, sizeof(DbFixation), 0);

        erts_schedule_ets_free_fixation(fix->procs.p->common.id, fix);
        /*
         * Either sys task is scheduled and erts_db_execute_free_fixation()
         * will remove 'fix' or process will exit, drop sys task and
         * proc_cleanup_fixed_table() will remove 'fix'.
         */
    }
    else
    {
        /* Our own fixation: free it right away. */
        fixed_tabs_delete(fix->procs.p, fix);

        erts_bin_release(fix->tabs.btid);

        erts_db_free(ERTS_ALC_T_DB_FIXATION,
                     ctx->tb, (void *) fix, sizeof(DbFixation));
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    }
    ctx->cnt++;
    return 1;
}
4832
/* Sys-task callback: free a fixation whose counter was already zeroed by
 * free_fixations_op() when the table was deleted by another process. */
int erts_db_execute_free_fixation(Process* p, DbFixation* fix)
{
    ASSERT(fix->counter == 0);
    fixed_tabs_delete(p, fix);

    erts_bin_release(fix->tabs.btid);

    erts_free(ERTS_ALC_T_DB_FIXATION, fix);
    ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    return 1;
}
4844
free_fixations_locked(Process * p,DbTable * tb)4845 static SWord free_fixations_locked(Process* p, DbTable *tb)
4846 {
4847 struct free_fixations_ctx ctx;
4848
4849 ERTS_LC_ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
4850
4851 ctx.p = p;
4852 ctx.tb = tb;
4853 ctx.cnt = 0;
4854 fixing_procs_rbt_foreach_destroy(&tb->common.fixing_procs,
4855 free_fixations_op, &ctx);
4856 tb->common.fixing_procs = NULL;
4857 return ctx.cnt;
4858 }
4859
/*
 * Record 'heir' (and a table-owned copy of 'heir_data') on the table.
 * 'me' is the calling owner process. If the heir process is not alive,
 * the heir is reset to 'none' and no heir data is stored.
 */
static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data)
{
    tb->common.heir = heir;
    if (heir == am_none) {
        return;
    }
    if (heir == me->common.id) {
        erts_ensure_later_proc_interval(me->common.u.alive.started_interval);
        tb->common.heir_started_interval = me->common.u.alive.started_interval;
    }
    else {
        Process* heir_proc= erts_proc_lookup(heir);
        if (heir_proc != NULL) {
            erts_ensure_later_proc_interval(heir_proc->common.u.alive.started_interval);
            tb->common.heir_started_interval = heir_proc->common.u.alive.started_interval;
        } else {
            tb->common.heir = am_none; /* heir not alive */
            /*
             * Must return here: free_heir_data() only releases heir data
             * when heir != am_none, so allocating the DbTerm copy below
             * with heir reset to 'none' would leak table memory.
             */
            return;
        }
    }

    if (!is_immed(heir_data)) {
        DeclareTmpHeap(tmp,2,me);
        Eterm wrap_tpl;
        int size;
        DbTerm* dbterm;
        Eterm* top;
        ErlOffHeap tmp_offheap;

        UseTmpHeap(2,me);
        /* Make a dummy 1-tuple around data to use DbTerm */
        wrap_tpl = TUPLE1(tmp,heir_data);
        size = size_object(wrap_tpl);
        dbterm = erts_db_alloc(ERTS_ALC_T_DB_HEIR_DATA, (DbTable *)tb,
                               (sizeof(DbTerm) + sizeof(Eterm)*(size-1)));
        dbterm->size = size;
        top = dbterm->tpl;
        tmp_offheap.first = NULL;
        copy_struct(wrap_tpl, size, &top, &tmp_offheap);
        dbterm->first_oh = tmp_offheap.first;
        heir_data = (UWord)dbterm;
        UnUseTmpHeap(2,me);
        ASSERT(!is_immed(heir_data));
    }
    tb->common.heir_data = heir_data;
}
4905
/* Release the heap-allocated heir data held by the table, if any. */
static void free_heir_data(DbTable* tb)
{
    if (tb->common.heir != am_none) {
        UWord hd = tb->common.heir_data;
        if (!is_immed(hd)) {
            /* Heir data was wrapped in a dummy 1-tuple DbTerm by set_heir(). */
            DbTerm* dbterm = (DbTerm*) hd;
            db_cleanup_offheap_comp(dbterm);
            erts_db_free(ERTS_ALC_T_DB_HEIR_DATA, tb, (void *)dbterm,
                         sizeof(DbTerm) + (dbterm->size-1)*sizeof(Eterm));
        }
    }
#ifdef DEBUG
    tb->common.heir_data = am_undefined;
#endif
}
4918
/* Trap target for ets:delete/1: continue freeing the table contents.
 * BIF_ARG_1 is a one-digit bignum blob carrying the DbTable pointer. */
static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1)
{
    SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
    SWord reds = initial_reds;
    Eterm cont = BIF_ARG_1;
    Eterm* ptr = big_val(cont);
    DbTable *tb = *((DbTable **) (UWord) (ptr + 1));

    ASSERT(*ptr == make_pos_bignum_header(1));

    reds = free_table_continue(BIF_P, tb, reds);
    if (reds < 0) {
        /* Not done yet; trap to ourselves with the same continuation. */
        BUMP_ALL_REDS(BIF_P);
        BIF_TRAP1(&ets_delete_continue_exp, BIF_P, cont);
    }
    else {
        BUMP_REDS(BIF_P, (initial_reds - reds));
        BIF_RET(am_true);
    }
}
4939
4940
/*
 * free_table_continue() returns reductions left
 *    done if >= 0
 *    yield if < 0
 */
static SWord free_table_continue(Process *p, DbTable *tb, SWord reds)
{
    /* Delegate the actual freeing to the table-type implementation. */
    reds = tb->common.meth->db_free_table_continue(tb, reds);

    if (reds < 0) {
#ifdef HARDDEBUG
        erts_fprintf(stderr,"ets: free_table_cont %T (continue begin)\r\n",
                     tb->common.id);
#endif
        /* More work to be done. Let other processes work and call us again. */
    }
    else {
#ifdef HARDDEBUG
        erts_fprintf(stderr,"ets: free_table_cont %T (continue end)\r\n",
                     tb->common.id);
#endif
        /* Completely done - we will not get called again. */
        delete_owned_table(p, tb);
        table_dec_refc(tb, 0);
    }
    return reds;
}
4968
/* Context for fixing_procs_info_op(): accumulates the {Pid, Count} list
 * returned by ets:info(T, safe_fixed[_monotonic_time]). */
struct fixing_procs_info_ctx
{
    Process* p;   /* process to build the result on */
    Eterm list;   /* accumulated list of {Pid, FixCount} tuples */
};
4974
/* Foreach-callback: prepend a {Pid, FixCount} tuple for one fixation
 * to ctx->list. Always returns 1 (continue iteration). */
static int fixing_procs_info_op(DbFixation* fix, void* vctx, Sint reds)
{
    struct fixing_procs_info_ctx* ctx = (struct fixing_procs_info_ctx*) vctx;
    /* 3 words for the 2-tuple + 2 for the cons cell. */
    Eterm* heap = HAllocX(ctx->p, 3 + 2, 100);
    Eterm pair;

    pair = TUPLE2(heap, fix->procs.p->common.id, make_small(fix->counter));
    ctx->list = CONS(heap + 3, pair, ctx->list);
    return 1;
}
4987
/*
 * Implementation behind ets:info/2. Returns the value for item 'What',
 * or THE_NON_VALUE if the item is unknown. 'p' may be NULL when called
 * from print_table(), which only asks for items whose branches allocate
 * no heap (type, protection, compressed, read/write concurrency).
 */
static Eterm table_info(Process* p, DbTable* tb, Eterm What)
{
    Eterm ret = THE_NON_VALUE;
    int use_monotonic;

    if (What == am_size) {
        Uint size = (Uint) (DB_GET_APPROX_NITEMS(tb));
        ret = erts_make_integer(size, p);
    } else if (What == am_type) {
        if (tb->common.status & DB_SET)  {
            ret = am_set;
        } else if (tb->common.status & DB_DUPLICATE_BAG) {
            ret = am_duplicate_bag;
        } else if (tb->common.status & DB_ORDERED_SET) {
            ret = am_ordered_set;
        } else if (tb->common.status & DB_CA_ORDERED_SET) {
            /* A CA tree is presented to the user as an ordered_set. */
            ret = am_ordered_set;
        } else { /*TT*/
            ASSERT(tb->common.status & DB_BAG);
            ret = am_bag;
        }
    } else if (What == am_memory) {
        /* Report memory in words, rounded up. */
        Uint words = (Uint) ((DB_GET_APPROX_MEM_CONSUMED(tb)
                              + sizeof(Uint)
                              - 1)
                             / sizeof(Uint));
        ret = erts_make_integer(words, p);
    } else if (What == am_owner) {
        ret = tb->common.owner;
    } else if (What == am_heir) {
        ret = tb->common.heir;
    } else if (What == am_protection) {
        if (tb->common.status & DB_PRIVATE)
            ret = am_private;
        else if (tb->common.status & DB_PROTECTED)
            ret = am_protected;
        else if (tb->common.status & DB_PUBLIC)
            ret = am_public;
    } else if (What == am_write_concurrency) {
        ret = tb->common.status & DB_FINE_LOCKED ? am_true : am_false;
    } else if (What == am_read_concurrency) {
        ret = tb->common.status & DB_FREQ_READ ? am_true : am_false;
    } else if (What == am_name) {
        ret = tb->common.the_name;
    } else if (What == am_keypos) {
        ret = make_small(tb->common.keypos);
    } else if (What == am_node) {
        ret = erts_this_dist_entry->sysname;
    } else if (What == am_named_table) {
        ret = is_table_named(tb) ? am_true : am_false;
    } else if (What == am_compressed) {
        ret = tb->common.compress ? am_true : am_false;
    } else if (What == am_id) {
        ret = make_tid(p, tb);
    } else if (What == am_decentralized_counters) {
        ret = tb->common.counters.is_decentralized ? am_true : am_false;
    }

    /*
     * For debugging purposes
     */
    else if (What == am_data) {
        print_table(ERTS_PRINT_STDOUT, NULL, 1, tb);
        ret = am_true;
    } else if (ERTS_IS_ATOM_STR("fixed",What)) {
        if (IS_FIXED(tb))
            ret = am_true;
        else
            ret = am_false;
    } else if ((use_monotonic
                = ERTS_IS_ATOM_STR("safe_fixed_monotonic_time",
                                   What))
               || ERTS_IS_ATOM_STR("safe_fixed", What)) {
        if (!DB_LOCK_FREE(tb))
            erts_mtx_lock(&tb->common.fixlock);
        if (IS_FIXED(tb)) {
            Uint need;
            Eterm *hp;
            Eterm time;
            Sint64 mtime;
            struct fixing_procs_info_ctx ctx;

            /* 3 words for the result 2-tuple ... */
            need = 3;
            if (use_monotonic) {
                mtime = (Sint64) tb->common.time.monotonic;
                mtime += ERTS_MONOTONIC_OFFSET_NATIVE;
                if (!IS_SSMALL(mtime))
                    need += ERTS_SINT64_HEAP_SIZE(mtime);
            }
            else {
                mtime = 0;
                need += 4; /* ... plus a {MegaSecs,Secs,MicroSecs} 3-tuple */
            }
            ctx.p = p;
            ctx.list = NIL;
            fixing_procs_rbt_foreach(tb->common.fixing_procs,
                                     fixing_procs_info_op,
                                     &ctx);

            hp = HAlloc(p, need);
            if (use_monotonic)
                time = (IS_SSMALL(mtime)
                        ? make_small(mtime)
                        : erts_sint64_to_big(mtime, &hp));
            else {
                Uint ms, s, us;
                erts_make_timestamp_value(&ms, &s, &us,
                                          tb->common.time.monotonic,
                                          tb->common.time.offset);
                time = TUPLE3(hp, make_small(ms), make_small(s), make_small(us));
                hp += 4;
            }
            ret = TUPLE2(hp, time, ctx.list);
        } else {
            ret = am_false;
        }
        if (!DB_LOCK_FREE(tb))
            erts_mtx_unlock(&tb->common.fixlock);
    } else if (ERTS_IS_ATOM_STR("stats",What)) {
        if (IS_HASH_TABLE(tb->common.status)) {
            FloatDef f;
            DbHashStats stats;
            Eterm avg, std_dev_real, std_dev_exp;
            Eterm* hp;

            db_calc_stats_hash(&tb->hash, &stats);
            /* 1+7 words for the 7-tuple, plus three boxed floats. */
            hp = HAlloc(p, 1 + 7 + FLOAT_SIZE_OBJECT*3);
            f.fd = stats.avg_chain_len;
            avg = make_float(hp);
            PUT_DOUBLE(f, hp);
            hp += FLOAT_SIZE_OBJECT;

            f.fd = stats.std_dev_chain_len;
            std_dev_real = make_float(hp);
            PUT_DOUBLE(f, hp);
            hp += FLOAT_SIZE_OBJECT;

            f.fd = stats.std_dev_expected;
            std_dev_exp = make_float(hp);
            PUT_DOUBLE(f, hp);
            hp += FLOAT_SIZE_OBJECT;
            ret = TUPLE7(hp, make_small(erts_atomic_read_nob(&tb->hash.nactive)),
                         avg, std_dev_real, std_dev_exp,
                         make_small(stats.min_chain_len),
                         make_small(stats.max_chain_len),
                         make_small(stats.kept_items));
        }
        else if (IS_CATREE_TABLE(tb->common.status)) {
            DbCATreeStats stats;
            Eterm* hp;

            db_calc_stats_catree(&tb->catree, &stats);
            hp = HAlloc(p, 4);
            ret = TUPLE3(hp,
                         make_small(stats.route_nodes),
                         make_small(stats.base_nodes),
                         make_small(stats.max_depth));

        }
        else
            ret = am_false;
    }
    return ret;
}
5152
/* Print a description of one table (break handler / crash dump output). */
static void print_table(fmtfn_t to, void *to_arg, int show,  DbTable* tb)
{
    Eterm tid;
    Eterm heap[ERTS_MAGIC_REF_THING_SIZE];

    if (is_table_named(tb)) {
        tid = tb->common.the_name;
    } else {
        /* Build a temporary magic ref on the local 'heap' array;
         * it only needs to live for the erts_print() call below. */
        ErlOffHeap oh;
        ERTS_INIT_OFF_HEAP(&oh);
        write_magic_ref_thing(heap, &oh, (ErtsMagicBinary *) tb->common.btid);
        tid = make_internal_ref(heap);
    }

    erts_print(to, to_arg, "Table: %T\n", tid);
    erts_print(to, to_arg, "Name: %T\n", tb->common.the_name);

    tb->common.meth->db_print(to, to_arg, show, tb);

    erts_print(to, to_arg, "Objects: %d\n", (int)DB_GET_APPROX_NITEMS(tb));
    /* Word count rounded up, same formula as table_info(am_memory). */
    erts_print(to, to_arg, "Words: %bpu\n",
               (Uint) ((DB_GET_APPROX_MEM_CONSUMED(tb)
                        + sizeof(Uint)
                        - 1)
                       / sizeof(Uint)));
    erts_print(to, to_arg, "Type: %T\n", table_info(NULL, tb, am_type));
    erts_print(to, to_arg, "Protection: %T\n", table_info(NULL, tb, am_protection));
    erts_print(to, to_arg, "Compressed: %T\n", table_info(NULL, tb, am_compressed));
    erts_print(to, to_arg, "Write Concurrency: %T\n", table_info(NULL, tb, am_write_concurrency));
    erts_print(to, to_arg, "Read Concurrency: %T\n", table_info(NULL, tb, am_read_concurrency));
}
5184
/* Bundled output arguments for db_info_print(). */
typedef struct {
    fmtfn_t to;    /* print callback */
    void *to_arg;  /* opaque argument for the callback */
    int show;      /* 'show' flag passed through to print_table() */
} ErtsPrintDbInfo;
5190
5191 static void
db_info_print(DbTable * tb,void * vpdbip)5192 db_info_print(DbTable *tb, void *vpdbip)
5193 {
5194 ErtsPrintDbInfo *pdbip = (ErtsPrintDbInfo *) vpdbip;
5195 erts_print(pdbip->to, pdbip->to_arg, "=ets:%T\n", tb->common.owner);
5196 erts_print(pdbip->to, pdbip->to_arg, "Slot: %bpu\n", (Uint) tb);
5197 print_table(pdbip->to, pdbip->to_arg, pdbip->show, tb);
5198 }
5199
/* Called by break handler: print every ETS table to 'to'. */
void db_info(fmtfn_t to, void *to_arg, int show)
{
    ErtsPrintDbInfo info;

    info.show = show;
    info.to_arg = to_arg;
    info.to = to;

    erts_db_foreach_table(db_info_print, &info, !0);
}
5210
5211 Uint
erts_get_ets_misc_mem_size(void)5212 erts_get_ets_misc_mem_size(void)
5213 {
5214 ERTS_THR_MEMORY_BARRIER;
5215 /* Memory not allocated in ets_alloc */
5216 return (Uint) erts_atomic_read_nob(&erts_ets_misc_mem_size);
5217 }
5218
5219 /* SMP Note: May only be used when system is locked */
5220 void
erts_db_foreach_table(void (* func)(DbTable *,void *),void * arg,int alive_only)5221 erts_db_foreach_table(void (*func)(DbTable *, void *), void *arg, int alive_only)
5222 {
5223 int ix;
5224
5225 ASSERT(erts_thr_progress_is_blocking());
5226
5227 for (ix = 0; ix < erts_no_schedulers; ix++) {
5228 ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix);
5229 DbTable *first = esdp->ets_tables.clist;
5230 if (first) {
5231 DbTable *tb = first;
5232 do {
5233 if (!alive_only || is_table_alive(tb))
5234 (*func)(tb, arg);
5235 tb = tb->common.all.next;
5236 } while (tb != first);
5237 }
5238 }
5239 }
5240
/* SMP Note: May only be used when system is locked */
void
erts_db_foreach_offheap(DbTable *tb,
                        void (*func)(ErlOffHeap *, void *),
                        void *arg)
{
    /* Delegate to the table-type specific implementation. */
    tb->common.meth->db_foreach_offheap(tb, func, arg);
}
5249
/* Apply 'func' to off-heap data awaiting thread-progress reclamation
 * in every table implementation (hash, tree and catree). */
void
erts_db_foreach_thr_prgr_offheap(void (*func)(ErlOffHeap *, void *),
                                 void *arg)
{
    erts_db_foreach_thr_prgr_offheap_hash(func, arg);
    erts_db_foreach_thr_prgr_offheap_tree(func, arg);
    erts_db_foreach_thr_prgr_offheap_catree(func, arg);
}
5258
5259 /* retrieve max number of ets tables */
5260 Uint
erts_db_get_max_tabs()5261 erts_db_get_max_tabs()
5262 {
5263 return db_max_tabs;
5264 }
5265
erts_ets_table_count(void)5266 Uint erts_ets_table_count(void)
5267 {
5268 Uint tb_count = 0;
5269 Uint six;
5270
5271 for (six = 0; six < erts_no_schedulers; six++) {
5272 ErtsSchedulerData *esdp = &erts_aligned_scheduler_data[six].esd;
5273 tb_count += erts_atomic_read_nob(&esdp->ets_tables.count);
5274 }
5275 return tb_count;
5276 }
5277
/*
 * For testing of meta tables only.
 *
 * Given a name atom (as returned from ets:new/2), return a list of 'cnt'
 * number of other names that will hash to the same bucket in meta_name_tab.
 *
 * WARNING: Will bloat the atom table!
 */
Eterm
erts_ets_colliding_names(Process* p, Eterm name, Uint cnt)
{
    Eterm list = NIL;
    Eterm* hp = HAlloc(p,cnt*2);
    Uint index = atom_val(name) & meta_name_tab_mask;

    while (cnt) {
        if (index != atom_val(name)) {
            /* Atom indices are dense; create dummy atoms until 'index'
             * is a valid atom table slot. */
            while (index >= atom_table_size()) {
                char tmp[20];
                erts_snprintf(tmp, sizeof(tmp), "am%x", atom_table_size());
                erts_atom_put((byte *) tmp, sys_strlen(tmp), ERTS_ATOM_ENC_LATIN1, 1);
            }
            list = CONS(hp, make_atom(index), list);
            hp += 2;
            --cnt;
        }
        /* Step to the next atom index in the same meta_name_tab bucket. */
        index += meta_name_tab_mask + 1;
    }
    return list;
}
5308
5309 #ifdef ERTS_ENABLE_LOCK_COUNT
5310
/* Enable or disable lock counting for one table: its rwlock and fixlock,
 * plus the per-bucket/per-node locks of hash and catree tables. */
void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable) {
    if (DB_LOCK_FREE(tb))
        return;
    if(enable) {
        erts_lcnt_install_new_lock_info(&tb->common.rwlock.lcnt, "db_tab",
            tb->common.the_name, ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
        erts_lcnt_install_new_lock_info(&tb->common.fixlock.lcnt, "db_tab_fix",
            tb->common.the_name, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
    } else {
        erts_lcnt_uninstall(&tb->common.rwlock.lcnt);
        erts_lcnt_uninstall(&tb->common.fixlock.lcnt);
    }

    if(IS_HASH_TABLE(tb->common.status)) {
        erts_lcnt_enable_db_hash_lock_count(&tb->hash, enable);
    } else if(IS_CATREE_TABLE(tb->common.status)) {
        /* erts_lcnt_enable_db_catree_lock_count is not thread safe so
           the table needs to get locked */
        db_lock(tb, LCK_WRITE);
        erts_lcnt_enable_db_catree_lock_count(&tb->catree, enable);
        db_unlock(tb, LCK_WRITE);
    }
}
5334
lcnt_update_db_locks_per_sched(void * enable)5335 static void lcnt_update_db_locks_per_sched(void *enable) {
5336 ErtsSchedulerData *esdp;
5337 DbTable *head;
5338
5339 esdp = erts_get_scheduler_data();
5340 head = esdp->ets_tables.clist;
5341
5342 if(head) {
5343 DbTable *iterator = head;
5344
5345 do {
5346 if(is_table_alive(iterator)) {
5347 erts_lcnt_enable_db_lock_count(iterator, !!enable);
5348 }
5349
5350 iterator = iterator->common.all.next;
5351 } while (iterator != head);
5352 }
5353 }
5354
/* Schedule lcnt_update_db_locks_per_sched() on every scheduler, since
 * each scheduler owns its own circular list of tables. */
void erts_lcnt_update_db_locks(int enable) {
    erts_schedule_multi_misc_aux_work(0, erts_no_schedulers,
        &lcnt_update_db_locks_per_sched, (void*)(UWord)enable);
}
5359 #endif /* ERTS_ENABLE_LOCK_COUNT */
5360
#ifdef ETS_DBG_FORCE_TRAP
/* Debug/test flag: when non-zero, ETS BIFs deliberately trap/yield to
 * exercise their continuation code paths. */
int erts_ets_dbg_force_trap = 0;
#endif
5364
/* Test support: force (or stop forcing) splitting of a catree table.
 * Returns 0 if 'tid' is not a catree table, otherwise 1. */
int erts_ets_force_split(Eterm tid, int on)
{
    Eterm ignore;
    DbTable* tb = tid2tab(tid, &ignore);
    if (!tb || !IS_CATREE_TABLE(tb->common.type))
        return 0;

    db_lock(tb, LCK_WRITE);
    if (!(tb->common.status & DB_DELETE))
        db_catree_force_split(&tb->catree, on);
    db_unlock(tb, LCK_WRITE);
    return 1;
}
5378
/* Test support: enable/disable random split/join of a catree table.
 * Returns 0 if 'tid' is not a catree table, otherwise 1. */
int erts_ets_debug_random_split_join(Eterm tid, int on)
{
    Eterm ignore;
    DbTable* tb = tid2tab(tid, &ignore);
    if (!tb || !IS_CATREE_TABLE(tb->common.type))
        return 0;

    db_lock(tb, LCK_WRITE);
    if (!(tb->common.status & DB_DELETE))
        db_catree_debug_random_split_join(&tb->catree, on);
    db_unlock(tb, LCK_WRITE);
    return 1;
}
5392