1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2019. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Author: Kjell Winblad
23  */
24 
25 #include "erl_flxctr.h"
26 
/*
 * Number of slots in every decentralized counter array. Assigned by
 * erts_flxctr_setup(); all loops over the slots in this file go through
 * the macro below.
 */
static int reader_groups_array_size = 0;
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)
29 
30 static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
31 static int erts_flxctr_wait_dtor(Binary *context_bin);
32 
33 typedef struct {
34     ErtsThrPrgrLaterOp later_op;
35     Process* process;
36     ErtsFlxCtrDecentralizedCtrArray* array;
37     ErtsFlxCtrDecentralizedCtrArray* next_array;
38     ErtsAlcType_t alloc_type;
39     int nr_of_counters;
40     Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE];
41 } DecentralizedReadSnapshotInfo;
42 
/*
 * Values stored in the snapshot_status field of a decentralized counter
 * array.
 */
typedef enum {
    /* No snapshot is in progress for the array */
    ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
    /* A snapshot is in progress (initial state of a freshly created array) */
    ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
    /* Set by erts_flxctr_destroy() while a snapshot is in progress: the
       thread progress callback must free the array when it finishes */
    ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;
48 
49 static void
thr_prg_wake_up_and_count(void * bin_p)50 thr_prg_wake_up_and_count(void* bin_p)
51 {
52     Binary* bin = bin_p;
53     DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
54     Process* p = info->process;
55     ErtsFlxCtrDecentralizedCtrArray* array = info->array;
56     ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
57     int i, sched;
58     /* Reset result array */
59     for (i = 0; i < info->nr_of_counters; i++) {
60         info->result[i] = 0;
61     }
62     /* Read result from snapshot */
63     for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
64         for (i = 0; i < info->nr_of_counters; i++) {
65             info->result[i] = info->result[i] +
66                 erts_atomic_read_nob(&array->array[sched].counters[i]);
67         }
68     }
69     /* Update the next decentralized counter array */
70     for (i = 0; i < info->nr_of_counters; i++) {
71         erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
72     }
73     /* Announce that the snapshot is done */
74     {
75     Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
76     if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
77                                            ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
78                                            expected)) {
79         /* The CAS failed which means that this thread need to free the next array. */
80         erts_free(info->alloc_type, next->block_start);
81     }
82     }
83     /* Resume the process that requested the snapshot */
84     erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
85     if (!ERTS_PROC_IS_EXITING(p)) {
86         erts_resume(p, ERTS_PROC_LOCK_STATUS);
87     }
88     /* Free the memory that is no longer needed */
89     erts_free(info->alloc_type, array->block_start);
90     erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
91     erts_proc_dec_refc(p);
92     erts_bin_release(bin);
93 }
94 
95 typedef struct {
96     ErtsThrPrgrLaterOp later_op;
97     Process* process;
98 } ErtsFlxCtrWakeUpLaterInfo;
99 
100 static void
thr_prg_wake_up_later(void * bin_p)101 thr_prg_wake_up_later(void* bin_p)
102 {
103     Binary* bin = bin_p;
104     ErtsFlxCtrWakeUpLaterInfo* info = ERTS_MAGIC_BIN_DATA(bin);
105     Process* p = info->process;
106     /* Resume the requesting process */
107     erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
108     if (!ERTS_PROC_IS_EXITING(p)) {
109         erts_resume(p, ERTS_PROC_LOCK_STATUS);
110     }
111     erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
112     /* Free data */
113     erts_proc_dec_refc(p);
114     erts_bin_release(bin);
115 }
116 
117 static
erts_flxctr_read_ctx_bin_dtor(Binary * context_bin)118 int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin) {
119     (void)context_bin;
120     return 1;
121 }
122 
123 static
erts_flxctr_wait_dtor(Binary * context_bin)124 int erts_flxctr_wait_dtor(Binary *context_bin) {
125     (void)context_bin;
126     return 1;
127 }
128 
/*
 * Suspend process p and schedule thr_prg_wake_up_later() to resume it.
 *
 * A magic binary holding an ErtsFlxCtrWakeUpLaterInfo is created for
 * the callback. An extra reference is taken on both the binary and the
 * process; thr_prg_wake_up_later() releases them again.
 */
static void suspend_until_thr_prg(Process* p)
{
    Binary* wake_bin =
        erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
                                 erts_flxctr_wait_dtor);
    ErtsFlxCtrWakeUpLaterInfo* wake = ERTS_MAGIC_BIN_DATA(wake_bin);

    wake->process = p;
    erts_refc_inctest(&wake_bin->intern.refc, 1);
    erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
    erts_proc_inc_refc(p);
    /* NOTE(review): presumably uses up the remaining reductions so that
       the caller yields - confirm against the macro definition */
    ERTS_VBUMP_ALL_REDS(p);
    erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later,
                                    wake_bin,
                                    &wake->later_op);
}
143 
144 
145 static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type,Uint nr_of_counters)146 create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
147     /* Allocate an ErtsFlxCtrDecentralizedCtrArray and make sure that
148        the array field is located at the start of a cache line */
149     char* bytes =
150         erts_alloc(alloc_type,
151                    sizeof(ErtsFlxCtrDecentralizedCtrArray) +
152                    (sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *
153                     ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +
154                    ERTS_CACHE_LINE_SIZE);
155     void* block_start = bytes;
156     int bytes_to_next_cacheline_border;
157     ErtsFlxCtrDecentralizedCtrArray* array;
158     int i, sched;
159     bytes = &bytes[offsetof(ErtsFlxCtrDecentralizedCtrArray, array)];
160     bytes_to_next_cacheline_border =
161         ERTS_CACHE_LINE_SIZE - (((Uint)bytes) % ERTS_CACHE_LINE_SIZE);
162     array = (ErtsFlxCtrDecentralizedCtrArray*)
163         (&bytes[bytes_to_next_cacheline_border -
164                 (int)offsetof(ErtsFlxCtrDecentralizedCtrArray, array)]);
165     ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
166     ASSERT(((Uint)array - (Uint)block_start) <= ERTS_CACHE_LINE_SIZE);
167     /* Initialize fields */
168     erts_atomic_init_nob(&array->snapshot_status, ERTS_FLXCTR_SNAPSHOT_ONGOING);
169     for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
170         for (i = 0; i < nr_of_counters; i++) {
171             erts_atomic_init_nob(&array->array[sched].counters[i], 0);
172         }
173     }
174     array->block_start = block_start;
175     return array;
176 }
177 
erts_flxctr_setup(int decentralized_counter_groups)178 void erts_flxctr_setup(int decentralized_counter_groups)
179 {
180     reader_groups_array_size = decentralized_counter_groups+1;
181 }
182 
/*
 * Initialize the flexible counter c with nr_of_counters counters, all
 * set to 0.
 *
 * c                - counter to initialize
 * is_decentralized - non-zero: allocate a decentralized counter array
 *                    with alloc_type and store a pointer to it in c;
 *                    zero: use the atomics embedded in c
 * nr_of_counters   - must not exceed ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE
 *                    (asserted)
 * alloc_type       - allocator type for the decentralized array
 */
void erts_flxctr_init(ErtsFlxCtr* c,
                      int is_decentralized,
                      Uint nr_of_counters,
                      ErtsAlcType_t alloc_type)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    int idx;

    ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
    c->is_decentralized = is_decentralized;
    c->nr_of_counters = nr_of_counters;

    if (!c->is_decentralized) {
        /* Centralized: zero the embedded counters */
        for (idx = 0; idx < nr_of_counters; idx++) {
            erts_atomic_init_nob(&c->u.counters[idx], 0);
        }
        return;
    }

    ctr_array = create_decentralized_ctr_array(alloc_type, nr_of_counters);
    /* A freshly created array is marked as "snapshot ongoing"; clear
       that before publishing the array */
    erts_atomic_set_nob(&ctr_array->snapshot_status,
                        ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
    erts_atomic_init_nob(&c->u.counters_ptr, (Sint)ctr_array);
    ASSERT(((Uint)ctr_array->array) % ERTS_CACHE_LINE_SIZE == 0);
}
205 
/*
 * Release the memory owned by the counter c.
 *
 * Centralized counters own no separate memory, so nothing is done for
 * them. For a decentralized counter the counter array is freed with
 * allocator type `type`, unless a snapshot is in progress; in that case
 * the function tries to hand the job of freeing the array over to
 * thr_prg_wake_up_and_count().
 */
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t type)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    Sint expected;

    if (!c->is_decentralized) {
        return;
    }

    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        erts_free(type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
        return;
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    /* Try to delegate the responsibility of freeing to
       thr_prg_wake_up_and_count */
    expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
    if (expected !=
        erts_atomic_cmpxchg_mb(&ctr_array->snapshot_status,
                               ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
                               expected)) {
        /* Delegation failed: the snapshot finished in the meantime, so
           the array has to be freed here */
        ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
        erts_free(type, ctr_array->block_start);
    }
}
230 
231 ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr * c,ErtsAlcType_t alloc_type,Process * p)232 erts_flxctr_snapshot(ErtsFlxCtr* c,
233                      ErtsAlcType_t alloc_type,
234                      Process* p)
235 {
236     if (c->is_decentralized) {
237         ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
238         if (erts_flxctr_is_snapshot_ongoing(c)) {
239             /* Let the caller try again later */
240             ErtsFlxCtrSnapshotResult res =
241                 {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
242             suspend_until_thr_prg(p);
243             return res;
244         } else {
245             Eterm* hp;
246             Binary* state_bin;
247             Eterm state_mref;
248             DecentralizedReadSnapshotInfo* info;
249             ErtsFlxCtrDecentralizedCtrArray* new_array =
250                 create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
251             int success =
252                 ((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
253                                                         (Sint)new_array,
254                                                         (Sint)array);
255             if (!success) {
256                 /* Let the caller try again later */
257                 ErtsFlxCtrSnapshotResult res =
258                     {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
259                 suspend_until_thr_prg(p);
260                 erts_free(alloc_type, new_array->block_start);
261                 return res;
262             }
263             /* Create binary with info about the operation that can be
264                sent to the caller and to a thread progress function */
265             state_bin =
266                 erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
267                                          erts_flxctr_read_ctx_bin_dtor);
268             hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
269             state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
270             info = ERTS_MAGIC_BIN_DATA(state_bin);
271             info->alloc_type = alloc_type;
272             info->array = array;
273             info->next_array = new_array;
274             info->process = p;
275             info->nr_of_counters = c->nr_of_counters;
276             erts_proc_inc_refc(p);
277             erts_refc_inctest(&state_bin->intern.refc, 2);
278             erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
279             ERTS_VBUMP_ALL_REDS(p);
280             erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
281                                             state_bin,
282                                             &info->later_op);
283             {
284                 ErtsFlxCtrSnapshotResult res = {
285                     .type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
286                     .trap_resume_state = state_mref};
287                 return res;
288             }
289         }
290     } else {
291         ErtsFlxCtrSnapshotResult res;
292         int i;
293         res.type = ERTS_FLXCTR_DONE;
294         for (i = 0; i < c->nr_of_counters; i++){
295             res.result[i] = erts_flxctr_read_centralized(c, i);
296         }
297         return res;
298     }
299 }
300 
301 
erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,Uint counter_nr)302 Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
303                                             Uint counter_nr)
304 {
305     Binary* bin = erts_magic_ref2bin(result_holder);
306     DecentralizedReadSnapshotInfo* data = ERTS_MAGIC_BIN_DATA(bin);;
307     return data->result[counter_nr];
308 }
309 
/*
 * Return non-zero if term is an internal magic reference whose binary
 * uses erts_flxctr_read_ctx_bin_dtor as its destructor, i.e. if it is
 * snapshot state created by erts_flxctr_snapshot(). Returns 0
 * otherwise.
 */
int erts_flxctr_is_snapshot_result(Eterm term)
{
    Binary* candidate;

    if (!is_internal_magic_ref(term)) {
        return 0;
    }
    candidate = erts_magic_ref2bin(term);
    return ERTS_MAGIC_BIN_DESTRUCTOR(candidate) == erts_flxctr_read_ctx_bin_dtor;
}
317 
erts_flxctr_read_approx(ErtsFlxCtr * c,Uint counter_nr)318 Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
319                              Uint counter_nr)
320 {
321     if (c->is_decentralized) {
322         ErtsFlxCtrDecentralizedCtrArray* counter = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
323         Sint sum = 0;
324         int sched;
325         for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
326             sum = sum + erts_atomic_read_nob(&counter->array[sched].counters[counter_nr]);
327         }
328         return sum;
329     } else {
330         return erts_flxctr_read_centralized(c, counter_nr);
331     }
332 }
333 
/*
 * Return 1 if c is decentralized and the snapshot_status of its
 * current counter array is anything other than
 * ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING; return 0 otherwise.
 */
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
    if (!c->is_decentralized) {
        return 0;
    }
    return ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING !=
        erts_atomic_read_acqb(&ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->snapshot_status);
}
340 
/*
 * If a snapshot of c is in progress, suspend p via
 * suspend_until_thr_prg() and return 1. Otherwise do nothing and
 * return 0.
 */
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        return 0;
    }
    suspend_until_thr_prg(p);
    return 1;
}
350 
/*
 * Set counter counter_nr of c to 0. For a decentralized counter every
 * slot is cleared, one at a time.
 */
void erts_flxctr_reset(ErtsFlxCtr* c,
                       Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    int slot;

    if (!c->is_decentralized) {
        erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
        return;
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        erts_atomic_set_nob(&ctr_array->array[slot].counters[counter_nr], 0);
    }
}
365 
366 
erts_flxctr_set_slot(int group)367 void erts_flxctr_set_slot(int group) {
368     ErtsSchedulerData *esdp = erts_get_scheduler_data();
369     esdp->flxctr_slot_no = group;
370 }
371