1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2019. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Author: Kjell Winblad
23 */
24
25 #include "erl_flxctr.h"
26
/*
 * Number of counter slots in every decentralized counter array. Assigned by
 * erts_flxctr_setup(); as a static object it is 0 until that happens.
 */
static int reader_groups_array_size;
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)
29
30 static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
31 static int erts_flxctr_wait_dtor(Binary *context_bin);
32
33 typedef struct {
34 ErtsThrPrgrLaterOp later_op;
35 Process* process;
36 ErtsFlxCtrDecentralizedCtrArray* array;
37 ErtsFlxCtrDecentralizedCtrArray* next_array;
38 ErtsAlcType_t alloc_type;
39 int nr_of_counters;
40 Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE];
41 } DecentralizedReadSnapshotInfo;
42
/* Values held in the snapshot_status field of a decentralized counter array. */
typedef enum {
    /* No snapshot involving this array is pending. */
    ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
    /* A snapshot is pending; new arrays are created in this state. */
    ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
    /* A snapshot is pending and erts_flxctr_destroy() has handed over the
       job of freeing this array to the thread-progress callback. */
    ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;
48
49 static void
thr_prg_wake_up_and_count(void * bin_p)50 thr_prg_wake_up_and_count(void* bin_p)
51 {
52 Binary* bin = bin_p;
53 DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
54 Process* p = info->process;
55 ErtsFlxCtrDecentralizedCtrArray* array = info->array;
56 ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
57 int i, sched;
58 /* Reset result array */
59 for (i = 0; i < info->nr_of_counters; i++) {
60 info->result[i] = 0;
61 }
62 /* Read result from snapshot */
63 for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
64 for (i = 0; i < info->nr_of_counters; i++) {
65 info->result[i] = info->result[i] +
66 erts_atomic_read_nob(&array->array[sched].counters[i]);
67 }
68 }
69 /* Update the next decentralized counter array */
70 for (i = 0; i < info->nr_of_counters; i++) {
71 erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
72 }
73 /* Announce that the snapshot is done */
74 {
75 Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
76 if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
77 ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
78 expected)) {
79 /* The CAS failed which means that this thread need to free the next array. */
80 erts_free(info->alloc_type, next->block_start);
81 }
82 }
83 /* Resume the process that requested the snapshot */
84 erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
85 if (!ERTS_PROC_IS_EXITING(p)) {
86 erts_resume(p, ERTS_PROC_LOCK_STATUS);
87 }
88 /* Free the memory that is no longer needed */
89 erts_free(info->alloc_type, array->block_start);
90 erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
91 erts_proc_dec_refc(p);
92 erts_bin_release(bin);
93 }
94
95 typedef struct {
96 ErtsThrPrgrLaterOp later_op;
97 Process* process;
98 } ErtsFlxCtrWakeUpLaterInfo;
99
100 static void
thr_prg_wake_up_later(void * bin_p)101 thr_prg_wake_up_later(void* bin_p)
102 {
103 Binary* bin = bin_p;
104 ErtsFlxCtrWakeUpLaterInfo* info = ERTS_MAGIC_BIN_DATA(bin);
105 Process* p = info->process;
106 /* Resume the requesting process */
107 erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
108 if (!ERTS_PROC_IS_EXITING(p)) {
109 erts_resume(p, ERTS_PROC_LOCK_STATUS);
110 }
111 erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
112 /* Free data */
113 erts_proc_dec_refc(p);
114 erts_bin_release(bin);
115 }
116
117 static
erts_flxctr_read_ctx_bin_dtor(Binary * context_bin)118 int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin) {
119 (void)context_bin;
120 return 1;
121 }
122
123 static
erts_flxctr_wait_dtor(Binary * context_bin)124 int erts_flxctr_wait_dtor(Binary *context_bin) {
125 (void)context_bin;
126 return 1;
127 }
128
/*
 * Suspend process p and schedule thr_prg_wake_up_later() to resume it once
 * thread progress has been made. p's reductions are also bumped, presumably
 * so that it yields.
 *
 * erts_suspend() is passed ERTS_PROC_LOCK_MAIN, so the caller presumably
 * holds p's main lock (confirm at call sites).
 */
static void
suspend_until_thr_prg(Process* p)
{
    Binary* wait_bin =
        erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
                                 erts_flxctr_wait_dtor);
    ErtsFlxCtrWakeUpLaterInfo* wake_info = ERTS_MAGIC_BIN_DATA(wait_bin);

    wake_info->process = p;
    /* Reference that thr_prg_wake_up_later() releases. */
    erts_refc_inctest(&wait_bin->intern.refc, 1);
    erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
    /* Keep p alive until the callback has run. */
    erts_proc_inc_refc(p);
    ERTS_VBUMP_ALL_REDS(p);
    erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later,
                                    wait_bin,
                                    &wake_info->later_op);
}
143
144
145 static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type,Uint nr_of_counters)146 create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
147 /* Allocate an ErtsFlxCtrDecentralizedCtrArray and make sure that
148 the array field is located at the start of a cache line */
149 char* bytes =
150 erts_alloc(alloc_type,
151 sizeof(ErtsFlxCtrDecentralizedCtrArray) +
152 (sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *
153 ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +
154 ERTS_CACHE_LINE_SIZE);
155 void* block_start = bytes;
156 int bytes_to_next_cacheline_border;
157 ErtsFlxCtrDecentralizedCtrArray* array;
158 int i, sched;
159 bytes = &bytes[offsetof(ErtsFlxCtrDecentralizedCtrArray, array)];
160 bytes_to_next_cacheline_border =
161 ERTS_CACHE_LINE_SIZE - (((Uint)bytes) % ERTS_CACHE_LINE_SIZE);
162 array = (ErtsFlxCtrDecentralizedCtrArray*)
163 (&bytes[bytes_to_next_cacheline_border -
164 (int)offsetof(ErtsFlxCtrDecentralizedCtrArray, array)]);
165 ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
166 ASSERT(((Uint)array - (Uint)block_start) <= ERTS_CACHE_LINE_SIZE);
167 /* Initialize fields */
168 erts_atomic_init_nob(&array->snapshot_status, ERTS_FLXCTR_SNAPSHOT_ONGOING);
169 for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
170 for (i = 0; i < nr_of_counters; i++) {
171 erts_atomic_init_nob(&array->array[sched].counters[i], 0);
172 }
173 }
174 array->block_start = block_start;
175 return array;
176 }
177
erts_flxctr_setup(int decentralized_counter_groups)178 void erts_flxctr_setup(int decentralized_counter_groups)
179 {
180 reader_groups_array_size = decentralized_counter_groups+1;
181 }
182
/*
 * Initialize c to hold nr_of_counters counters, all zero.
 *
 * Decentralized: a cache-line aligned counter array is allocated with
 * alloc_type (released by erts_flxctr_destroy()) and marked as not being
 * snapshotted. Centralized: the counters live inline in c->u.counters.
 */
void erts_flxctr_init(ErtsFlxCtr* c,
                      int is_decentralized,
                      Uint nr_of_counters,
                      ErtsAlcType_t alloc_type)
{
    ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
    c->is_decentralized = is_decentralized;
    c->nr_of_counters = nr_of_counters;
    if (c->is_decentralized) {
        ErtsFlxCtrDecentralizedCtrArray* array =
            create_decentralized_ctr_array(alloc_type, nr_of_counters);
        /* New arrays start out as "snapshot ongoing"; a freshly created
           counter has no snapshot in progress. */
        erts_atomic_set_nob(&array->snapshot_status,
                            ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
        erts_atomic_init_nob(&c->u.counters_ptr, (Sint)array);
        ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
    } else {
        Uint i; /* unsigned to match nr_of_counters (avoids sign-compare) */
        for (i = 0; i < nr_of_counters; i++) {
            erts_atomic_init_nob(&c->u.counters[i], 0);
        }
    }
}
205
/*
 * Free the counter array owned by a decentralized counter. Centralized
 * counters own no heap memory, so nothing is done for them.
 *
 * If a snapshot is in progress, this function first tries to delegate the
 * freeing to thr_prg_wake_up_and_count(). It does so by switching the
 * array's status from ERTS_FLXCTR_SNAPSHOT_ONGOING to
 * ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE. If that CAS fails, the
 * snapshot has already finished and the array is freed here.
 */
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t type)
{
    ErtsFlxCtrDecentralizedCtrArray* cur;
    Sint expected;

    if (!c->is_decentralized) {
        return;
    }
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        erts_free(type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
        return;
    }

    cur = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    /* Try to delegate the responsibility of freeing to
       thr_prg_wake_up_and_count */
    expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
    if (erts_atomic_cmpxchg_mb(&cur->snapshot_status,
                               ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
                               expected) != expected) {
        /* Delegation failed: no snapshot is ongoing anymore, so the
           freeing has to be done here. */
        ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
        erts_free(type, cur->block_start);
    }
}
230
231 ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr * c,ErtsAlcType_t alloc_type,Process * p)232 erts_flxctr_snapshot(ErtsFlxCtr* c,
233 ErtsAlcType_t alloc_type,
234 Process* p)
235 {
236 if (c->is_decentralized) {
237 ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
238 if (erts_flxctr_is_snapshot_ongoing(c)) {
239 /* Let the caller try again later */
240 ErtsFlxCtrSnapshotResult res =
241 {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
242 suspend_until_thr_prg(p);
243 return res;
244 } else {
245 Eterm* hp;
246 Binary* state_bin;
247 Eterm state_mref;
248 DecentralizedReadSnapshotInfo* info;
249 ErtsFlxCtrDecentralizedCtrArray* new_array =
250 create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
251 int success =
252 ((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
253 (Sint)new_array,
254 (Sint)array);
255 if (!success) {
256 /* Let the caller try again later */
257 ErtsFlxCtrSnapshotResult res =
258 {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
259 suspend_until_thr_prg(p);
260 erts_free(alloc_type, new_array->block_start);
261 return res;
262 }
263 /* Create binary with info about the operation that can be
264 sent to the caller and to a thread progress function */
265 state_bin =
266 erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
267 erts_flxctr_read_ctx_bin_dtor);
268 hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
269 state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
270 info = ERTS_MAGIC_BIN_DATA(state_bin);
271 info->alloc_type = alloc_type;
272 info->array = array;
273 info->next_array = new_array;
274 info->process = p;
275 info->nr_of_counters = c->nr_of_counters;
276 erts_proc_inc_refc(p);
277 erts_refc_inctest(&state_bin->intern.refc, 2);
278 erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
279 ERTS_VBUMP_ALL_REDS(p);
280 erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
281 state_bin,
282 &info->later_op);
283 {
284 ErtsFlxCtrSnapshotResult res = {
285 .type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
286 .trap_resume_state = state_mref};
287 return res;
288 }
289 }
290 } else {
291 ErtsFlxCtrSnapshotResult res;
292 int i;
293 res.type = ERTS_FLXCTR_DONE;
294 for (i = 0; i < c->nr_of_counters; i++){
295 res.result[i] = erts_flxctr_read_centralized(c, i);
296 }
297 return res;
298 }
299 }
300
301
erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,Uint counter_nr)302 Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
303 Uint counter_nr)
304 {
305 Binary* bin = erts_magic_ref2bin(result_holder);
306 DecentralizedReadSnapshotInfo* data = ERTS_MAGIC_BIN_DATA(bin);;
307 return data->result[counter_nr];
308 }
309
/*
 * Return non-zero iff term is an internal magic reference whose binary was
 * created by erts_flxctr_snapshot(). Such binaries are identified by their
 * destructor being erts_flxctr_read_ctx_bin_dtor.
 */
int erts_flxctr_is_snapshot_result(Eterm term)
{
    if (!is_internal_magic_ref(term)) {
        return 0;
    }
    return ERTS_MAGIC_BIN_DESTRUCTOR(erts_magic_ref2bin(term))
        == erts_flxctr_read_ctx_bin_dtor;
}
317
erts_flxctr_read_approx(ErtsFlxCtr * c,Uint counter_nr)318 Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
319 Uint counter_nr)
320 {
321 if (c->is_decentralized) {
322 ErtsFlxCtrDecentralizedCtrArray* counter = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
323 Sint sum = 0;
324 int sched;
325 for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
326 sum = sum + erts_atomic_read_nob(&counter->array[sched].counters[counter_nr]);
327 }
328 return sum;
329 } else {
330 return erts_flxctr_read_centralized(c, counter_nr);
331 }
332 }
333
/*
 * Return non-zero iff c is decentralized and the snapshot_status of its
 * current array, read with an acquire barrier, is anything other than
 * ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING.
 */
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
    ErtsFlxCtrDecentralizedCtrArray* arr;

    if (!c->is_decentralized) {
        return 0;
    }
    arr = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    return erts_atomic_read_acqb(&arr->snapshot_status)
        != ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING;
}
340
/*
 * If a snapshot of c is in progress, suspend p until thread progress and
 * return 1. The caller presumably traps and retries in that case. Otherwise
 * return 0 without touching p.
 */
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        return 0;
    }
    suspend_until_thr_prg(p);
    return 1;
}
350
/*
 * Set counter counter_nr to zero.
 *
 * For decentralized counters, each slot of the current array is cleared
 * with its own relaxed store. The reset as a whole is therefore not a single
 * atomic operation.
 */
void erts_flxctr_reset(ErtsFlxCtr* c,
                       Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* arr;
    int slot;

    if (!c->is_decentralized) {
        erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
        return;
    }
    arr = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        erts_atomic_set_nob(&arr->array[slot].counters[counter_nr], 0);
    }
}
365
366
erts_flxctr_set_slot(int group)367 void erts_flxctr_set_slot(int group) {
368 ErtsSchedulerData *esdp = erts_get_scheduler_data();
369 esdp->flxctr_slot_no = group;
370 }
371