1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2019. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Author: Kjell Winblad
23  */
24 
25 #include "erl_flxctr.h"
26 
/*
 * Number of per-group slots in every decentralized counter array.
 * Starts at 0 and is set by erts_flxctr_setup() to the number of
 * decentralized counter groups plus one.
 */
static int reader_groups_array_size = 0;
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)

/* Destructors for the two kinds of magic binaries created in this file.
   erts_flxctr_is_snapshot_result() recognizes snapshot state binaries by
   comparing a binary's destructor with erts_flxctr_read_ctx_bin_dtor. */
static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
static int erts_flxctr_wait_dtor(Binary *context_bin);
32 
/*
 * State of one decentralized snapshot, kept in the data area of a magic
 * binary created by erts_flxctr_snapshot(). The binary is referenced both
 * from the requesting process (as a magic reference) and by the
 * thread-progress later-op thr_prg_wake_up_and_count().
 */
typedef struct {
    ErtsThrPrgrLaterOp later_op;                 /* storage used by erts_schedule_thr_prgr_later_op() */
    Process* process;                            /* process to resume when the sums are ready */
    ErtsFlxCtrDecentralizedCtrArray* array;      /* array that was swapped out; summed, then freed */
    ErtsFlxCtrDecentralizedCtrArray* next_array; /* array that was swapped in; receives the sums */
    ErtsAlcType_t alloc_type;                    /* allocator type used to free the arrays */
    int nr_of_counters;                          /* number of valid entries in result */
    Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE]; /* summed value of each counter */
} DecentralizedReadSnapshotInfo;
42 
/*
 * Values of ErtsFlxCtrDecentralizedCtrArray.snapshot_status.
 *
 * NOT_ONGOING:  no snapshot involving this array is in progress.
 * ONGOING:      the array was installed by erts_flxctr_snapshot() and
 *               thr_prg_wake_up_and_count() has not yet cleared the flag.
 *               New arrays start in this state.
 * ONGOING_TP_THREAD_DO_FREE: as ONGOING, but erts_flxctr_destroy() ran in
 *               the meantime and delegated freeing the array to
 *               thr_prg_wake_up_and_count().
 */
typedef enum {
    ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
    ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
    ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;
48 
/*
 * Bytes allocated for one decentralized counter array: the header, one
 * element per slot, plus one extra cache line of slack so that
 * create_decentralized_ctr_array() can shift the header until its element
 * array starts on a cache-line boundary.
 */
#define ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE    \
    (sizeof(ErtsFlxCtrDecentralizedCtrArray) +          \
     (sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *     \
      ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +             \
     ERTS_CACHE_LINE_SIZE)

#ifdef DEBUG
#define FLXCTR_MEM_DEBUG 1
#endif

#ifdef FLXCTR_MEM_DEBUG
/* Debug-only tally of bytes currently held by decentralized counter arrays;
   read by erts_flxctr_debug_memory_usage(). */
static erts_atomic_t debug_mem_usage;

/* Free a counter array block and subtract its size from debug_mem_usage.
   The size (a size_t) is converted to erts_aint_t *before* negation:
   negating the unsigned value yields a huge unsigned number whose
   conversion to a signed type is implementation-defined. */
#define FLXCTR_FREE(ALLOC_TYPE, ADDRESS) do {                           \
        erts_free((ALLOC_TYPE), (ADDRESS));                             \
        erts_atomic_add_mb(&debug_mem_usage,                            \
                           -(erts_aint_t)ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE); \
    } while(0)
#else
#define FLXCTR_FREE(ALLOC_TYPE, ADDRESS) erts_free((ALLOC_TYPE), (ADDRESS))
#endif
71 
/*
 * Thread-progress later-op scheduled by erts_flxctr_snapshot().
 *
 * bin_p is the snapshot's magic binary (DecentralizedReadSnapshotInfo).
 * The function does the following, in order:
 *  - sums every slot of the swapped-out array (info->array) into
 *    info->result;
 *  - adds those sums to slot 0 of the swapped-in array (info->next_array);
 *  - clears the swapped-in array's snapshot status, or frees that array
 *    if erts_flxctr_destroy() delegated the free to this function;
 *  - resumes the requesting process unless it is exiting;
 *  - frees the swapped-out array;
 *  - drops the process reference and the binary reference that
 *    erts_flxctr_snapshot() took for this later-op.
 * It presumably runs after thread progress so that no thread is still
 * updating the swapped-out array when it is summed (TODO confirm against
 * the update path).
 */
static void
thr_prg_wake_up_and_count(void* bin_p)
{
    Binary* bin = bin_p;
    DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
    Process* p = info->process;
    ErtsFlxCtrDecentralizedCtrArray* array = info->array;
    ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
    int i, sched;
    /* Reset result array */
    for (i = 0; i < info->nr_of_counters; i++) {
        info->result[i] = 0;
    }
    /* Read result from snapshot: sum each counter over all slots */
    for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
        for (i = 0; i < info->nr_of_counters; i++) {
            info->result[i] = info->result[i] +
                erts_atomic_read_nob(&array->array[sched].counters[i]);
        }
    }
    /* Update the next decentralized counter array so the counts from
       before the swap are not lost */
    for (i = 0; i < info->nr_of_counters; i++) {
        erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
    }
    /* Announce that the snapshot is done */
    {
        Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
        if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
                                               ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
                                               expected)) {
            /* The CAS failed: erts_flxctr_destroy() has already switched the
               status to ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE, so
               this thread must free the next array. */
            FLXCTR_FREE(info->alloc_type, next->block_start);
        }
    }
    /* Resume the process that requested the snapshot */
    erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
    if (!ERTS_PROC_IS_EXITING(p)) {
        erts_resume(p, ERTS_PROC_LOCK_STATUS);
    }
    /* Free the memory that is no longer needed (the swapped-out array) */
    FLXCTR_FREE(info->alloc_type, array->block_start);
    erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
    /* Drop the references taken in erts_flxctr_snapshot() */
    erts_proc_dec_refc(p);
    erts_bin_release(bin);
}
117 
/* State for a plain "wait until thread progress" later-op, created by
   suspend_until_thr_prg() and consumed by thr_prg_wake_up_later(). */
typedef struct {
    ErtsThrPrgrLaterOp later_op; /* storage used by erts_schedule_thr_prgr_later_op() */
    Process* process;            /* suspended process to resume */
} ErtsFlxCtrWakeUpLaterInfo;
122 
123 static void
thr_prg_wake_up_later(void * bin_p)124 thr_prg_wake_up_later(void* bin_p)
125 {
126     Binary* bin = bin_p;
127     ErtsFlxCtrWakeUpLaterInfo* info = ERTS_MAGIC_BIN_DATA(bin);
128     Process* p = info->process;
129     /* Resume the requesting process */
130     erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
131     if (!ERTS_PROC_IS_EXITING(p)) {
132         erts_resume(p, ERTS_PROC_LOCK_STATUS);
133     }
134     erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
135     /* Free data */
136     erts_proc_dec_refc(p);
137     erts_bin_release(bin);
138 }
139 
140 static
erts_flxctr_read_ctx_bin_dtor(Binary * context_bin)141 int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin) {
142     (void)context_bin;
143     return 1;
144 }
145 
146 static
erts_flxctr_wait_dtor(Binary * context_bin)147 int erts_flxctr_wait_dtor(Binary *context_bin) {
148     (void)context_bin;
149     return 1;
150 }
151 
/*
 * Suspend p and schedule thr_prg_wake_up_later() to resume it after thread
 * progress; used when the caller has to retry later. One reference on p
 * and one on the new wait binary are handed to the scheduled op, which
 * releases both.
 */
static void suspend_until_thr_prg(Process* p)
{
    Binary* wait_bin =
        erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
                                 erts_flxctr_wait_dtor);
    ErtsFlxCtrWakeUpLaterInfo* wait_info = ERTS_MAGIC_BIN_DATA(wait_bin);

    wait_info->process = p;
    /* Binary reference owned by the later-op */
    erts_refc_inctest(&wait_bin->intern.refc, 1);

    erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
    /* Process reference owned by the later-op */
    erts_proc_inc_refc(p);
    ERTS_VBUMP_ALL_REDS(p);

    erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later,
                                    wait_bin,
                                    &wait_info->later_op);
}
166 
erts_flxctr_nr_of_allocated_bytes(ErtsFlxCtr * c)167 size_t erts_flxctr_nr_of_allocated_bytes(ErtsFlxCtr* c)
168 {
169     if (c->is_decentralized) {
170         return ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE;
171     } else {
172         return 0;
173     }
174 }
175 
/*
 * Allocate and initialize a decentralized counter array.
 *
 * The raw block is ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE bytes, which
 * includes one cache line of slack. The header is placed inside it so that
 * its `array` field starts on a cache-line boundary. The raw block address is
 * stored in block_start and is what FLXCTR_FREE must be given.
 *
 * The first nr_of_counters counters of every slot are set to 0.
 * snapshot_status starts as ERTS_FLXCTR_SNAPSHOT_ONGOING.
 * erts_flxctr_init() overrides it to NOT_ONGOING. erts_flxctr_snapshot()
 * keeps it until thr_prg_wake_up_and_count() clears it.
 */
static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
    /* Allocate an ErtsFlxCtrDecentralizedCtrArray and make sure that
       the array field is located at the start of a cache line */
    char* bytes =
        erts_alloc(alloc_type,
                   ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE);
    void* block_start = bytes;
    int bytes_to_next_cacheline_border;
    ErtsFlxCtrDecentralizedCtrArray* array;
    int i, sched;
#ifdef FLXCTR_MEM_DEBUG
    erts_atomic_add_mb(&debug_mem_usage, ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE);
#endif
    /* Address the array field would have if the header started at
       block_start */
    bytes = &bytes[offsetof(ErtsFlxCtrDecentralizedCtrArray, array)];
    /* Distance to the next cache-line boundary, in 1..ERTS_CACHE_LINE_SIZE
       (a whole line when already aligned; the slack line covers that) */
    bytes_to_next_cacheline_border =
        ERTS_CACHE_LINE_SIZE - (((Uint)bytes) % ERTS_CACHE_LINE_SIZE);
    /* Shift the header by that distance so that its array field lands on
       the boundary; the header then starts at
       block_start + bytes_to_next_cacheline_border */
    array = (ErtsFlxCtrDecentralizedCtrArray*)
        (&bytes[bytes_to_next_cacheline_border -
                (int)offsetof(ErtsFlxCtrDecentralizedCtrArray, array)]);
    ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
    ASSERT(((Uint)array - (Uint)block_start) <= ERTS_CACHE_LINE_SIZE);
    /* Initialize fields */
    erts_atomic_init_nob(&array->snapshot_status, ERTS_FLXCTR_SNAPSHOT_ONGOING);
    for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
        for (i = 0; i < nr_of_counters; i++) {
            erts_atomic_init_nob(&array->array[sched].counters[i], 0);
        }
    }
    array->block_start = block_start;
    return array;
}
208 
erts_flxctr_setup(int decentralized_counter_groups)209 void erts_flxctr_setup(int decentralized_counter_groups)
210 {
211     reader_groups_array_size = decentralized_counter_groups+1;
212 #ifdef FLXCTR_MEM_DEBUG
213     erts_atomic_init_mb(&debug_mem_usage, 0);
214 #endif
215 }
216 
/*
 * Initialize counter c with nr_of_counters zeroed counters (at most
 * ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE).
 *
 * A centralized counter stores its values inline in c->u.counters.
 * A decentralized counter gets a freshly allocated, cache-line-aligned
 * counter array (allocated with alloc_type) that is marked as having no
 * snapshot in progress.
 */
void erts_flxctr_init(ErtsFlxCtr* c,
                      int is_decentralized,
                      Uint nr_of_counters,
                      ErtsAlcType_t alloc_type)
{
    ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
    c->is_decentralized = is_decentralized;
    c->nr_of_counters = nr_of_counters;

    if (!c->is_decentralized) {
        Uint ix;
        for (ix = 0; ix < nr_of_counters; ix++) {
            erts_atomic_init_nob(&c->u.counters[ix], 0);
        }
        return;
    }

    {
        ErtsFlxCtrDecentralizedCtrArray* fresh =
            create_decentralized_ctr_array(alloc_type, nr_of_counters);
        /* New arrays start as ONGOING; a plain init has no snapshot */
        erts_atomic_set_nob(&fresh->snapshot_status,
                            ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
        erts_atomic_init_nob(&c->u.counters_ptr, (Sint)fresh);
        ASSERT(((Uint)fresh->array) % ERTS_CACHE_LINE_SIZE == 0);
    }
}
239 
/*
 * Release the counter array of a decentralized counter (a centralized
 * counter owns no separate memory). alloc_type is expected to be the
 * allocator type the array was created with.
 *
 * While a snapshot is in progress, thr_prg_wake_up_and_count() still
 * writes to the current array, so it cannot be freed here. Instead the
 * status is CASed from ONGOING to ONGOING_TP_THREAD_DO_FREE. That makes the
 * callback's own ONGOING -> NOT_ONGOING CAS fail, and the callback then
 * frees the array. If this CAS fails, the callback has already finished and
 * the array is freed here.
 */
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t alloc_type)
{
    if (c->is_decentralized) {
        if (erts_flxctr_is_snapshot_ongoing(c)) {
            ErtsFlxCtrDecentralizedCtrArray* array =
                ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
            /* Try to delegate the responsibility of freeing to
               thr_prg_wake_up_and_count */
            Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
            if (expected !=
                erts_atomic_cmpxchg_mb(&array->snapshot_status,
                                       ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
                                       expected)) {
                /* The delegation was unsuccessful which means that no
                   snapshot is ongoing anymore and the freeing needs
                   to be done here */
                ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
                FLXCTR_FREE(alloc_type, array->block_start);
            }
        } else {
            FLXCTR_FREE(alloc_type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
        }
    }
}
264 
/*
 * Read all counters of c on behalf of process p.
 *
 * Centralized counter: the values are read directly and returned with
 * type ERTS_FLXCTR_DONE.
 *
 * Decentralized counter: a fresh, zeroed counter array is swapped into
 * c->u.counters_ptr with a CAS. p is suspended, and
 * thr_prg_wake_up_and_count() is scheduled as a thread-progress later-op.
 * That op sums the swapped-out array, adds the sums to the new array and
 * resumes p. The returned result has type ERTS_FLXCTR_GET_RESULT_AFTER_TRAP.
 * Its trap_resume_state is a magic reference from which the sums are read
 * with erts_flxctr_get_snapshot_result_after_trap().
 *
 * If a snapshot is already in progress, or the CAS loses a race, p is
 * suspended until thread progress and ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP is
 * returned so the caller can retry.
 *
 * NOTE(review): p's main lock appears to be required, since erts_suspend()
 * is called with ERTS_PROC_LOCK_MAIN. Confirm against callers.
 */
ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr* c,
                     ErtsAlcType_t alloc_type,
                     Process* p)
{
    if (c->is_decentralized) {
        ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
        if (erts_flxctr_is_snapshot_ongoing(c)) {
            /* Let the caller try again later */
            ErtsFlxCtrSnapshotResult res =
                {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
            suspend_until_thr_prg(p);
            return res;
        } else {
            Eterm* hp;
            Binary* state_bin;
            Eterm state_mref;
            DecentralizedReadSnapshotInfo* info;
            /* The new array starts with snapshot_status ONGOING */
            ErtsFlxCtrDecentralizedCtrArray* new_array =
                create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
            int success =
                ((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
                                                        (Sint)new_array,
                                                        (Sint)array);
            if (!success) {
                /* Let the caller try again later. new_array was never
                   published, so it can be freed right away. */
                ErtsFlxCtrSnapshotResult res =
                    {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
                suspend_until_thr_prg(p);
                FLXCTR_FREE(alloc_type, new_array->block_start);
                return res;
            }
            /* Create binary with info about the operation that can be
               sent to the caller and to a thread progress function */
            state_bin =
                erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
                                         erts_flxctr_read_ctx_bin_dtor);
            hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
            info = ERTS_MAGIC_BIN_DATA(state_bin);
            info->alloc_type = alloc_type;
            info->array = array;
            info->next_array = new_array;
            info->process = p;
            info->nr_of_counters = c->nr_of_counters;
            /* Process reference owned by the later-op
               (dropped in thr_prg_wake_up_and_count) */
            erts_proc_inc_refc(p);
            /* Binary reference owned by the later-op; the other one is
               presumably held via state_mref */
            erts_refc_inctest(&state_bin->intern.refc, 2);
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
            ERTS_VBUMP_ALL_REDS(p);
            erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
                                            state_bin,
                                            &info->later_op);
            {
                ErtsFlxCtrSnapshotResult res = {
                    .type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
                    .trap_resume_state = state_mref};
                return res;
            }
        }
    } else {
        ErtsFlxCtrSnapshotResult res;
        int i;
        res.type = ERTS_FLXCTR_DONE;
        for (i = 0; i < c->nr_of_counters; i++){
            res.result[i] = erts_flxctr_read_centralized(c, i);
        }
        return res;
    }
}
334 
335 
erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,Uint counter_nr)336 Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
337                                             Uint counter_nr)
338 {
339     Binary* bin = erts_magic_ref2bin(result_holder);
340     DecentralizedReadSnapshotInfo* data = ERTS_MAGIC_BIN_DATA(bin);;
341     return data->result[counter_nr];
342 }
343 
/*
 * Nonzero iff term is an internal magic reference to a snapshot state
 * binary created by erts_flxctr_snapshot(). Such binaries are recognized
 * by their destructor.
 */
int erts_flxctr_is_snapshot_result(Eterm term)
{
    if (!is_internal_magic_ref(term)) {
        return 0;
    }
    return ERTS_MAGIC_BIN_DESTRUCTOR(erts_magic_ref2bin(term))
        == erts_flxctr_read_ctx_bin_dtor;
}
351 
erts_flxctr_read_approx(ErtsFlxCtr * c,Uint counter_nr)352 Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
353                              Uint counter_nr)
354 {
355     if (c->is_decentralized) {
356         ErtsFlxCtrDecentralizedCtrArray* counter = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
357         Sint sum = 0;
358         int sched;
359         for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
360             sum = sum + erts_atomic_read_nob(&counter->array[sched].counters[counter_nr]);
361         }
362         return sum;
363     } else {
364         return erts_flxctr_read_centralized(c, counter_nr);
365     }
366 }
367 
/*
 * 1 if c is decentralized and its current array's snapshot status is
 * anything other than ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING, otherwise 0.
 * The status is read with acquire semantics.
 */
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
    Sint status;

    if (!c->is_decentralized) {
        return 0;
    }
    status = erts_atomic_read_acqb(&ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->snapshot_status);
    return status != ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING;
}
374 
/*
 * If a snapshot of c is in progress, suspend p until thread progress and
 * return 1 (the caller should retry). Otherwise return 0 and leave p
 * running.
 */
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        return 0;
    }
    suspend_until_thr_prg(p);
    return 1;
}
384 
/*
 * Set counter counter_nr to zero (every slot of a decentralized counter).
 *
 * NOTE(review): if a snapshot is in progress, thr_prg_wake_up_and_count()
 * later adds the swapped-out array's sums into slot 0 of the current array,
 * which would undo part of this reset. Callers presumably check
 * erts_flxctr_is_snapshot_ongoing() first; confirm.
 */
void erts_flxctr_reset(ErtsFlxCtr* c,
                       Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* slots;
    int slot;

    if (!c->is_decentralized) {
        erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
        return;
    }

    slots = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        erts_atomic_set_nob(&slots->array[slot].counters[counter_nr], 0);
    }
}
399 
400 
erts_flxctr_set_slot(int group)401 void erts_flxctr_set_slot(int group)
402 {
403     ErtsSchedulerData *esdp = erts_get_scheduler_data();
404     esdp->flxctr_slot_no = group;
405 }
406 
erts_flxctr_debug_memory_usage(void)407 Sint erts_flxctr_debug_memory_usage(void)
408 {
409 #ifdef FLXCTR_MEM_DEBUG
410     return erts_atomic_read_mb(&debug_mem_usage);
411 #else
412     return -1;
413 #endif
414 }
415 
416 
417