1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2019. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Author: Kjell Winblad
23 */
24
25 #include "erl_flxctr.h"
26
/*
 * Number of slots in every decentralized counter array. Each slot holds
 * one set of counters; readers sum all slots. Set once by
 * erts_flxctr_setup() to (decentralized_counter_groups + 1).
 */
static int reader_groups_array_size = 0;
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)

/*
 * Destructors for the two kinds of magic binaries created in this file.
 * The address of erts_flxctr_read_ctx_bin_dtor is also used by
 * erts_flxctr_is_snapshot_result() to recognize snapshot-result binaries.
 */
static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
static int erts_flxctr_wait_dtor(Binary *context_bin);
32
/*
 * State for one decentralized snapshot. Stored in the data area of a
 * magic binary that is both handed to the requesting process (as a magic
 * reference in the snapshot result) and passed to the thread progress
 * callback thr_prg_wake_up_and_count().
 */
typedef struct {
    /* Later-op handle passed to erts_schedule_thr_prgr_later_op() */
    ErtsThrPrgrLaterOp later_op;
    /* Process that requested the snapshot; resumed when it is done */
    Process* process;
    /* Array that was replaced; its slots are summed and then it is freed */
    ErtsFlxCtrDecentralizedCtrArray* array;
    /* Array installed in place of `array`; receives the summed values */
    ErtsFlxCtrDecentralizedCtrArray* next_array;
    /* Allocator type used for both counter arrays */
    ErtsAlcType_t alloc_type;
    /* Number of valid entries in result[] */
    int nr_of_counters;
    /* Per-counter sums filled in by thr_prg_wake_up_and_count() */
    Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE];
} DecentralizedReadSnapshotInfo;
42
/*
 * Values of the snapshot_status field of a decentralized counter array.
 *
 * create_decentralized_ctr_array() initializes every new array to
 * ERTS_FLXCTR_SNAPSHOT_ONGOING. erts_flxctr_init() immediately switches
 * its array to NOT_ONGOING. An array installed by erts_flxctr_snapshot()
 * stays ONGOING until thr_prg_wake_up_and_count() switches it.
 */
typedef enum {
    /* No snapshot in progress */
    ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
    /* A snapshot is in progress; the previous array is still being read */
    ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
    /* Set by erts_flxctr_destroy() while a snapshot is in progress: the
       thread progress callback must free this array when it finishes */
    ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;
48
/*
 * Size in bytes of one decentralized counter array allocation: the
 * header, one element per slot, plus one extra cache line of slack so
 * that create_decentralized_ctr_array() can align the element array to a
 * cache line boundary.
 */
#define ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE                \
    (sizeof(ErtsFlxCtrDecentralizedCtrArray) +                      \
     (sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *                 \
      ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +                         \
     ERTS_CACHE_LINE_SIZE)

/* Debug builds keep a running total of memory held by counter arrays */
#ifdef DEBUG
#define FLXCTR_MEM_DEBUG 1
#endif

#ifdef FLXCTR_MEM_DEBUG

/* Bytes currently held by decentralized counter arrays
   (read through erts_flxctr_debug_memory_usage()). */
static erts_atomic_t debug_mem_usage;

/*
 * Free a counter array block and subtract its size from debug_mem_usage.
 * The size is converted to the signed atomic type BEFORE negation:
 * negating the unsigned size_t value directly wraps around to a huge
 * positive number and only yields the intended result through an
 * implementation-defined unsigned-to-signed conversion.
 */
#define FLXCTR_FREE(ALLOC_TYPE, ADDRESS) do {                           \
        erts_free((ALLOC_TYPE), (ADDRESS));                             \
        erts_atomic_add_mb(&debug_mem_usage,                            \
                           -((erts_aint_t)                              \
                             ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE)); \
    } while(0)

#else

#define FLXCTR_FREE(ALLOC_TYPE, ADDRESS) erts_free((ALLOC_TYPE), (ADDRESS))

#endif
71
/*
 * Thread progress callback scheduled by erts_flxctr_snapshot(); bin_p is
 * the snapshot's magic binary holding a DecentralizedReadSnapshotInfo.
 *
 * Steps:
 *  1. Sum every slot of info->array into info->result.
 *  2. Add those sums to slot 0 of info->next_array, so the new array
 *     carries the accumulated values forward.
 *  3. Mark info->next_array as NOT_ONGOING. If the CAS fails because
 *     erts_flxctr_destroy() delegated freeing to this thread (status
 *     ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE), free it instead.
 *  4. Resume the requesting process unless it is exiting.
 *  5. Free info->array, then drop the process reference and the binary
 *     reference taken by erts_flxctr_snapshot().
 *
 * NOTE(review): info->array is read with relaxed (nob) loads. This relies
 * on thread progress guaranteeing that no thread is still updating the
 * old array; confirm against the counter increment path.
 */
static void
thr_prg_wake_up_and_count(void* bin_p)
{
    Binary* bin = bin_p;
    DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
    Process* p = info->process;
    ErtsFlxCtrDecentralizedCtrArray* array = info->array;
    ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
    int i, sched;
    /* Reset result array */
    for (i = 0; i < info->nr_of_counters; i++) {
        info->result[i] = 0;
    }
    /* Read result from snapshot */
    for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
        for (i = 0; i < info->nr_of_counters; i++) {
            info->result[i] = info->result[i] +
                erts_atomic_read_nob(&array->array[sched].counters[i]);
        }
    }
    /* Update the next decentralized counter array */
    for (i = 0; i < info->nr_of_counters; i++) {
        erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
    }
    /* Announce that the snapshot is done */
    {
        Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
        if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
                                               ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
                                               expected)) {
            /* The CAS failed, which means erts_flxctr_destroy() has
               delegated freeing of the next array to this thread. */
            FLXCTR_FREE(info->alloc_type, next->block_start);
        }
    }
    /* Resume the process that requested the snapshot */
    erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
    if (!ERTS_PROC_IS_EXITING(p)) {
        erts_resume(p, ERTS_PROC_LOCK_STATUS);
    }
    /* Free the old array; nothing references it any more */
    FLXCTR_FREE(info->alloc_type, array->block_start);
    erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
    /* Drop the references taken in erts_flxctr_snapshot() */
    erts_proc_dec_refc(p);
    erts_bin_release(bin);
}
117
/*
 * Payload of the magic binary used by suspend_until_thr_prg() to resume a
 * process from thr_prg_wake_up_later() after thread progress.
 */
typedef struct {
    /* Later-op handle passed to erts_schedule_thr_prgr_later_op() */
    ErtsThrPrgrLaterOp later_op;
    /* Suspended process to resume */
    Process* process;
} ErtsFlxCtrWakeUpLaterInfo;
122
123 static void
thr_prg_wake_up_later(void * bin_p)124 thr_prg_wake_up_later(void* bin_p)
125 {
126 Binary* bin = bin_p;
127 ErtsFlxCtrWakeUpLaterInfo* info = ERTS_MAGIC_BIN_DATA(bin);
128 Process* p = info->process;
129 /* Resume the requesting process */
130 erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
131 if (!ERTS_PROC_IS_EXITING(p)) {
132 erts_resume(p, ERTS_PROC_LOCK_STATUS);
133 }
134 erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
135 /* Free data */
136 erts_proc_dec_refc(p);
137 erts_bin_release(bin);
138 }
139
140 static
erts_flxctr_read_ctx_bin_dtor(Binary * context_bin)141 int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin) {
142 (void)context_bin;
143 return 1;
144 }
145
146 static
erts_flxctr_wait_dtor(Binary * context_bin)147 int erts_flxctr_wait_dtor(Binary *context_bin) {
148 (void)context_bin;
149 return 1;
150 }
151
/*
 * Suspend p and schedule thr_prg_wake_up_later() to resume it after
 * thread progress. A binary reference and a process reference are taken
 * here; the callback releases both.
 */
static void suspend_until_thr_prg(Process* p)
{
    Binary* wait_bin =
        erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
                                 erts_flxctr_wait_dtor);
    ErtsFlxCtrWakeUpLaterInfo* wait_info = ERTS_MAGIC_BIN_DATA(wait_bin);

    wait_info->process = p;
    /* Reference owned by the scheduled later-op */
    erts_refc_inctest(&wait_bin->intern.refc, 1);
    erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
    erts_proc_inc_refc(p);
    /* Consume the remaining reductions of p */
    ERTS_VBUMP_ALL_REDS(p);
    erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later,
                                    wait_bin,
                                    &wait_info->later_op);
}
166
/*
 * Number of bytes separately allocated for the counters of c: one
 * counter array for a decentralized counter, none for a centralized one
 * (its counters are stored inline in c).
 */
size_t erts_flxctr_nr_of_allocated_bytes(ErtsFlxCtr* c)
{
    return c->is_decentralized
        ? ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE
        : 0;
}
175
/*
 * Allocate and initialize a decentralized counter array with
 * ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS slots of nr_of_counters counters.
 *
 * The block is over-allocated by one cache line (see
 * ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE). The header is placed so
 * that its `array` member starts on a cache line boundary. The real start
 * of the allocation is stored in block_start, and that is the address to
 * pass to FLXCTR_FREE.
 *
 * All counters start at 0. snapshot_status starts as
 * ERTS_FLXCTR_SNAPSHOT_ONGOING; callers that are not installing the array
 * as part of a snapshot must reset it (as erts_flxctr_init() does).
 */
static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
    /* Allocate an ErtsFlxCtrDecentralizedCtrArray and make sure that
       the array field is located at the start of a cache line */
    char* bytes =
        erts_alloc(alloc_type,
                   ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE);
    void* block_start = bytes;
    int bytes_to_next_cacheline_border;
    ErtsFlxCtrDecentralizedCtrArray* array;
    int i, sched;
#ifdef FLXCTR_MEM_DEBUG
    erts_atomic_add_mb(&debug_mem_usage, ERTS_FLXCTR_DECENTRALIZED_COUNTER_ARRAY_SIZE);
#endif
    /* Where `array` would land if the header started at the block start */
    bytes = &bytes[offsetof(ErtsFlxCtrDecentralizedCtrArray, array)];
    /* Distance to the next cache line border. When `bytes` is already
       aligned this is a full cache line; the extra cache line included in
       the allocation size covers that case. */
    bytes_to_next_cacheline_border =
        ERTS_CACHE_LINE_SIZE - (((Uint)bytes) % ERTS_CACHE_LINE_SIZE);
    /* Shift the header so that its `array` member sits on that border */
    array = (ErtsFlxCtrDecentralizedCtrArray*)
        (&bytes[bytes_to_next_cacheline_border -
                (int)offsetof(ErtsFlxCtrDecentralizedCtrArray, array)]);
    ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
    ASSERT(((Uint)array - (Uint)block_start) <= ERTS_CACHE_LINE_SIZE);
    /* Initialize fields */
    erts_atomic_init_nob(&array->snapshot_status, ERTS_FLXCTR_SNAPSHOT_ONGOING);
    for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
        for (i = 0; i < nr_of_counters; i++) {
            erts_atomic_init_nob(&array->array[sched].counters[i], 0);
        }
    }
    /* Remember the real allocation start for FLXCTR_FREE */
    array->block_start = block_start;
    return array;
}
208
erts_flxctr_setup(int decentralized_counter_groups)209 void erts_flxctr_setup(int decentralized_counter_groups)
210 {
211 reader_groups_array_size = decentralized_counter_groups+1;
212 #ifdef FLXCTR_MEM_DEBUG
213 erts_atomic_init_mb(&debug_mem_usage, 0);
214 #endif
215 }
216
/*
 * Initialize c with nr_of_counters zeroed counters
 * (at most ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE).
 *
 * A centralized counter keeps its counters inline in c. A decentralized
 * counter gets a freshly allocated counter array (from alloc_type) with
 * no snapshot ongoing.
 */
void erts_flxctr_init(ErtsFlxCtr* c,
                      int is_decentralized,
                      Uint nr_of_counters,
                      ErtsAlcType_t alloc_type)
{
    ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
    c->is_decentralized = is_decentralized;
    c->nr_of_counters = nr_of_counters;

    if (!c->is_decentralized) {
        int ix;
        for (ix = 0; ix < nr_of_counters; ix++) {
            erts_atomic_init_nob(&c->u.counters[ix], 0);
        }
        return;
    }

    {
        ErtsFlxCtrDecentralizedCtrArray* fresh =
            create_decentralized_ctr_array(alloc_type, nr_of_counters);
        /* New arrays start out as "snapshot ongoing"; a freshly
           initialized counter has no snapshot in progress. */
        erts_atomic_set_nob(&fresh->snapshot_status,
                            ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
        erts_atomic_init_nob(&c->u.counters_ptr, (Sint)fresh);
        ASSERT(((Uint)fresh->array) % ERTS_CACHE_LINE_SIZE == 0);
    }
}
239
/*
 * Release the memory owned by c.
 *
 * Centralized counters own no separately allocated memory. For a
 * decentralized counter with no snapshot in progress, the current array
 * is freed right away. While a snapshot is in progress, the array's
 * status is CAS'ed from ONGOING to ONGOING_TP_THREAD_DO_FREE, which hands
 * the freeing over to thr_prg_wake_up_and_count(). If that CAS loses
 * (the snapshot completed in the meantime), the array is freed here.
 */
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t alloc_type)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    Sint expected;

    if (!c->is_decentralized) {
        return;
    }
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        FLXCTR_FREE(alloc_type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
        return;
    }

    /* A snapshot is in flight: try to delegate freeing to the thread
       progress callback */
    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
    if (erts_atomic_cmpxchg_mb(&ctr_array->snapshot_status,
                               ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
                               expected) == expected) {
        return; /* Delegated */
    }

    /* The snapshot finished before the hand-over; free the array here */
    ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
    FLXCTR_FREE(alloc_type, ctr_array->block_start);
}
264
/*
 * Take a snapshot of all counters in c on behalf of process p.
 *
 * Centralized counter: the values are read directly and returned with
 * type ERTS_FLXCTR_DONE.
 *
 * Decentralized counter:
 *  - If a snapshot is already ongoing, or another thread replaced the
 *    counter array first (CAS failure), p is suspended until thread
 *    progress and ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP is returned.
 *  - Otherwise a new array is installed in c, p is suspended, and
 *    thr_prg_wake_up_and_count() is scheduled to sum the old array after
 *    thread progress. The result has type
 *    ERTS_FLXCTR_GET_RESULT_AFTER_TRAP and carries a magic reference
 *    (trap_resume_state); erts_flxctr_get_snapshot_result_after_trap()
 *    reads the sums from it.
 *
 * alloc_type is the allocator used for the counter arrays.
 */
ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr* c,
                     ErtsAlcType_t alloc_type,
                     Process* p)
{
    if (c->is_decentralized) {
        ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
        if (erts_flxctr_is_snapshot_ongoing(c)) {
            /* Let the caller try again later */
            ErtsFlxCtrSnapshotResult res =
                {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
            suspend_until_thr_prg(p);
            return res;
        } else {
            Eterm* hp;
            Binary* state_bin;
            Eterm state_mref;
            DecentralizedReadSnapshotInfo* info;
            /* New array starts in the ERTS_FLXCTR_SNAPSHOT_ONGOING state */
            ErtsFlxCtrDecentralizedCtrArray* new_array =
                create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
            /* Install the new array only if no other thread replaced the
               array since it was read above */
            int success =
                ((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
                                                        (Sint)new_array,
                                                        (Sint)array);
            if (!success) {
                /* Let the caller try again later */
                ErtsFlxCtrSnapshotResult res =
                    {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
                suspend_until_thr_prg(p);
                /* new_array was never published; free it directly */
                FLXCTR_FREE(alloc_type, new_array->block_start);
                return res;
            }
            /* Create binary with info about the operation that can be
               sent to the caller and to a thread progress function */
            state_bin =
                erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
                                         erts_flxctr_read_ctx_bin_dtor);
            hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
            info = ERTS_MAGIC_BIN_DATA(state_bin);
            info->alloc_type = alloc_type;
            info->array = array;
            info->next_array = new_array;
            info->process = p;
            info->nr_of_counters = c->nr_of_counters;
            /* Process and binary references are released by
               thr_prg_wake_up_and_count() */
            erts_proc_inc_refc(p);
            erts_refc_inctest(&state_bin->intern.refc, 2);
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
            ERTS_VBUMP_ALL_REDS(p);
            erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
                                            state_bin,
                                            &info->later_op);
            {
                ErtsFlxCtrSnapshotResult res = {
                    .type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
                    .trap_resume_state = state_mref};
                return res;
            }
        }
    } else {
        /* Centralized: read every counter directly */
        ErtsFlxCtrSnapshotResult res;
        int i;
        res.type = ERTS_FLXCTR_DONE;
        for (i = 0; i < c->nr_of_counters; i++){
            res.result[i] = erts_flxctr_read_centralized(c, i);
        }
        return res;
    }
}
334
335
erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,Uint counter_nr)336 Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
337 Uint counter_nr)
338 {
339 Binary* bin = erts_magic_ref2bin(result_holder);
340 DecentralizedReadSnapshotInfo* data = ERTS_MAGIC_BIN_DATA(bin);;
341 return data->result[counter_nr];
342 }
343
/*
 * Non-zero iff term is an internal magic reference to a snapshot-result
 * binary created by erts_flxctr_snapshot(), recognized by its destructor.
 */
int erts_flxctr_is_snapshot_result(Eterm term)
{
    return is_internal_magic_ref(term) &&
        (ERTS_MAGIC_BIN_DESTRUCTOR(erts_magic_ref2bin(term)) ==
         erts_flxctr_read_ctx_bin_dtor);
}
351
erts_flxctr_read_approx(ErtsFlxCtr * c,Uint counter_nr)352 Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
353 Uint counter_nr)
354 {
355 if (c->is_decentralized) {
356 ErtsFlxCtrDecentralizedCtrArray* counter = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
357 Sint sum = 0;
358 int sched;
359 for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
360 sum = sum + erts_atomic_read_nob(&counter->array[sched].counters[counter_nr]);
361 }
362 return sum;
363 } else {
364 return erts_flxctr_read_centralized(c, counter_nr);
365 }
366 }
367
/*
 * Non-zero iff c is decentralized and its current counter array's
 * snapshot_status is anything other than ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING
 * (read with acquire semantics).
 */
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;

    if (!c->is_decentralized) {
        return 0;
    }
    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    return erts_atomic_read_acqb(&ctr_array->snapshot_status) !=
        ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING;
}
374
/*
 * If a snapshot of c is in progress, suspend p until thread progress and
 * return 1; otherwise return 0 without touching p.
 */
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        return 0;
    }
    suspend_until_thr_prg(p);
    return 1;
}
384
/*
 * Set counter counter_nr of c to zero. For a decentralized counter, every
 * slot of the current array is cleared with relaxed (nob) stores.
 */
void erts_flxctr_reset(ErtsFlxCtr* c,
                       Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    int slot;

    if (!c->is_decentralized) {
        erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
        return;
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        erts_atomic_set_nob(&ctr_array->array[slot].counters[counter_nr], 0);
    }
}
399
400
erts_flxctr_set_slot(int group)401 void erts_flxctr_set_slot(int group)
402 {
403 ErtsSchedulerData *esdp = erts_get_scheduler_data();
404 esdp->flxctr_slot_no = group;
405 }
406
erts_flxctr_debug_memory_usage(void)407 Sint erts_flxctr_debug_memory_usage(void)
408 {
409 #ifdef FLXCTR_MEM_DEBUG
410 return erts_atomic_read_mb(&debug_mem_usage);
411 #else
412 return -1;
413 #endif
414 }
415
416
417