/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2011-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Scheduler specific pre-allocators. Each scheduler
 *              thread allocates memory in its own private chunk of
 *              memory. Memory blocks deallocated by remote
 *              schedulers (or other threads) are passed back to
 *              the chunk owner via a lock-free data structure.
 *
 * Author: Rickard Green
 */
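
/*
 * Rough usage sketch (illustrative only; the real entry points are the
 * inline helpers in erl_sched_spec_pre_alloc.h and the pre-allocator
 * wrapper macros built on top of them; my_block_t and pre_alloc_size
 * below are placeholders, not names used by this module):
 *
 *     erts_sspa_data_t *data =
 *         erts_sspa_create(sizeof(my_block_t), pre_alloc_size, 0, NULL);
 *
 *   - The owning scheduler allocates from, and frees to, the local
 *     free list of its own chunk (erts_sspa_cix2chunk(data, cix)).
 *   - Any other thread hands a block back with
 *     erts_sspa_remote_free(chdr, blk, cinit).
 *   - The owner reclaims remotely freed blocks with
 *     erts_sspa_process_remote_frees() when its local list runs dry.
 */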

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif


#include "erl_process.h"
#include "erl_thr_progress.h"

erts_sspa_data_t *
erts_sspa_create(size_t blk_sz, int pa_size, int nthreads, const char* name)
{
    erts_sspa_data_t *data;
    size_t tot_size;
    size_t chunk_mem_size;
    char *p;
    char *chunk_start;
    int cix;
    int no_blocks = pa_size;
    int no_blocks_per_chunk;
    size_t aligned_blk_sz;

#if !defined(ERTS_STRUCTURE_ALIGNED_ALLOC)
    /* Force 64-bit alignment... */
    aligned_blk_sz = ((blk_sz - 1) / 8) * 8 + 8;
#else
    /* Alignment of structure is enough... */
    aligned_blk_sz = blk_sz;
#endif

    if (!name) { /* schedulers only variant */
        ASSERT(!nthreads);
        nthreads = erts_no_schedulers;
    }
    else {
        ASSERT(nthreads > 0);
    }

    if (nthreads == 1)
        no_blocks_per_chunk = no_blocks;
    else {
        int extra = (no_blocks - 1)/4 + 1;
        if (extra == 0)
            extra = 1;
        no_blocks_per_chunk = no_blocks;
        no_blocks_per_chunk += extra * nthreads;
        no_blocks_per_chunk /= nthreads;
    }
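
    /*
     * Each chunk gets roughly its even share of the requested blocks plus
     * about 25% slack. For example (illustration only), pa_size = 100 and
     * nthreads = 4 gives extra = 25 and
     * no_blocks_per_chunk = (100 + 25*4)/4 = 50, i.e. 200 blocks in total.
     */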
    no_blocks = no_blocks_per_chunk * nthreads;
    chunk_mem_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_chunk_header_t));
    chunk_mem_size += aligned_blk_sz * no_blocks_per_chunk;
    chunk_mem_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(chunk_mem_size);
    tot_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_data_t));
    tot_size += chunk_mem_size * nthreads;

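    /*
     * Resulting (cache line aligned) memory layout:
     *
     *   [erts_sspa_data_t | chunk 0 | chunk 1 | ... | chunk nthreads-1]
     *
     * where each chunk is a cache line aligned header followed by
     * no_blocks_per_chunk blocks of aligned_blk_sz bytes each.
     */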
    p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_PRE_ALLOC_DATA, tot_size);
    data = (erts_sspa_data_t *) p;
    p += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_data_t));
    chunk_start = p;

    data->chunks_mem_size = chunk_mem_size;
    data->start = chunk_start;
    data->end = chunk_start + chunk_mem_size * nthreads;
    data->nthreads = nthreads;

    if (name) { /* thread variant */
        erts_tsd_key_create(&data->tsd_key, (char*)name);
        erts_atomic_init_nob(&data->id_generator, 0);
    }

    /* Initialize all chunks */
    for (cix = 0; cix < nthreads; cix++) {
        erts_sspa_chunk_t *chnk = erts_sspa_cix2chunk(data, cix);
        erts_sspa_chunk_header_t *chdr = &chnk->aligned.header;
        erts_sspa_blk_t *blk;
        int i;

        erts_atomic_init_nob(&chdr->tail.data.last, (erts_aint_t) &chdr->tail.data.marker);
        erts_atomic_init_nob(&chdr->tail.data.marker.next_atmc, ERTS_AINT_NULL);
        erts_atomic_init_nob(&chdr->tail.data.um_refc[0], 0);
        erts_atomic_init_nob(&chdr->tail.data.um_refc[1], 0);
        erts_atomic32_init_nob(&chdr->tail.data.um_refc_ix, 0);

        chdr->head.no_thr_progress_check = 0;
        chdr->head.used_marker = 1;
        chdr->head.first = &chdr->tail.data.marker;
        chdr->head.unref_end = &chdr->tail.data.marker;
        chdr->head.next.thr_progress = erts_thr_progress_current();
        chdr->head.next.thr_progress_reached = 1;
        chdr->head.next.um_refc_ix = 1;
        chdr->head.next.unref_end = &chdr->tail.data.marker;

        p = &chnk->data[0];
        chdr->local.first = (erts_sspa_blk_t *) p;
        blk = (erts_sspa_blk_t *) p;
        for (i = 0; i < no_blocks_per_chunk; i++) {
            blk = (erts_sspa_blk_t *) p;
            p += aligned_blk_sz;
            blk->next_ptr = (erts_sspa_blk_t *) p;
        }

        blk->next_ptr = NULL;
        chdr->local.last = blk;
        chdr->local.cnt = no_blocks_per_chunk;
        chdr->local.lim = no_blocks_per_chunk / 3;

        ERTS_SSPA_DBG_CHK_LCL(chdr);
    }

    return data;
}

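/*
 * Lock free enqueue onto a chunk's remote free list (multiple producers,
 * single consumer). The common case is a successful compare-and-swap on
 * the next pointer of the block referenced by tail.data.last. If that
 * fails, other enqueuers got there first; since the list is only a pool
 * of free blocks, order does not matter, so instead of chasing the moving
 * tail we insert the block somewhere between the tail we read and the
 * actual end of the list. The 'cinit' argument merely seeds whether we
 * start by retrying at the same node or by stepping forward, which
 * spreads contending enqueuers over different insertion points.
 */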
static ERTS_INLINE void
enqueue_remote_managed_thread(erts_sspa_chunk_header_t *chdr,
                              erts_sspa_blk_t *this,
                              int cinit)
{
    erts_aint_t itmp;
    erts_sspa_blk_t *enq;

    erts_atomic_init_nob(&this->next_atmc, ERTS_AINT_NULL);
    /* Enqueue at end of list... */

    enq = (erts_sspa_blk_t *) erts_atomic_read_nob(&chdr->tail.data.last);
    itmp = erts_atomic_cmpxchg_relb(&enq->next_atmc,
                                    (erts_aint_t) this,
                                    ERTS_AINT_NULL);
    if (itmp == ERTS_AINT_NULL) {
        /* We are required to move last pointer */
#ifdef DEBUG
        ASSERT(ERTS_AINT_NULL == erts_atomic_read_nob(&this->next_atmc));
        ASSERT(((erts_aint_t) enq)
               == erts_atomic_xchg_relb(&chdr->tail.data.last,
                                        (erts_aint_t) this));
#else
        erts_atomic_set_relb(&chdr->tail.data.last, (erts_aint_t) this);
#endif
    }
    else {
        /*
         * We *need* to insert element somewhere in between the
         * last element we read earlier and the actual last element.
         */
        int i = cinit;

        while (1) {
            erts_aint_t itmp2;
            erts_atomic_set_nob(&this->next_atmc, itmp);
            itmp2 = erts_atomic_cmpxchg_relb(&enq->next_atmc,
                                             (erts_aint_t) this,
                                             itmp);
            if (itmp == itmp2)
                break; /* inserted this */
            if ((i & 1) == 0)
                itmp = itmp2;
            else {
                enq = (erts_sspa_blk_t *) itmp2;
                itmp = erts_atomic_read_acqb(&enq->next_atmc);
                ASSERT(itmp != ERTS_AINT_NULL);
            }
            i++;
        }
    }
}

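/*
 * tail.data.last always points at some block, and an enqueuer that has
 * read it may still be about to write that block's next pointer, so the
 * block at the tail can never be handed back to the local free list. A
 * dummy marker block is therefore kept in the queue: when the consumed
 * ('unreferenced') end has caught up with the current tail, try to
 * re-append the marker so that the remaining real blocks can eventually
 * be reclaimed as well.
 */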
static ERTS_INLINE erts_aint_t
check_insert_marker(erts_sspa_chunk_header_t *chdr, erts_aint_t ilast)
{
    if (!chdr->head.used_marker
        && chdr->head.unref_end == (erts_sspa_blk_t *) ilast) {
        erts_aint_t itmp;
        erts_sspa_blk_t *last = (erts_sspa_blk_t *) ilast;

        erts_atomic_init_nob(&chdr->tail.data.marker.next_atmc, ERTS_AINT_NULL);
        itmp = erts_atomic_cmpxchg_relb(&last->next_atmc,
                                        (erts_aint_t) &chdr->tail.data.marker,
                                        ERTS_AINT_NULL);
        if (itmp == ERTS_AINT_NULL) {
            ilast = (erts_aint_t) &chdr->tail.data.marker;
            chdr->head.used_marker = !0;
            erts_atomic_set_relb(&chdr->tail.data.last, ilast);
        }
    }
    return ilast;
}

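/*
 * Free a block belonging to another thread's chunk. Managed threads are
 * covered by the thread progress machinery, so they can enqueue right
 * away. Unmanaged threads instead bump one of the two um_refc counters
 * around the enqueue; the chunk owner waits for the counter of the
 * previous index to reach zero before it considers blocks enqueued under
 * that index unreferenced (see fetch_remote()).
 */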
void
erts_sspa_remote_free(erts_sspa_chunk_header_t *chdr,
                      erts_sspa_blk_t *blk,
                      int cinit)
{
    int um_refc_ix = 0;
    int managed_thread = erts_thr_progress_is_managed_thread();
    if (!managed_thread) {
        um_refc_ix = erts_atomic32_read_acqb(&chdr->tail.data.um_refc_ix);
        while (1) {
            int tmp_um_refc_ix;
            erts_atomic_inc_acqb(&chdr->tail.data.um_refc[um_refc_ix]);
            tmp_um_refc_ix = erts_atomic32_read_acqb(&chdr->tail.data.um_refc_ix);
            if (tmp_um_refc_ix == um_refc_ix)
                break;
            erts_atomic_dec_relb(&chdr->tail.data.um_refc[um_refc_ix]);
            um_refc_ix = tmp_um_refc_ix;
        }
    }

    enqueue_remote_managed_thread(chdr, blk, cinit);

    if (!managed_thread)
        erts_atomic_dec_relb(&chdr->tail.data.um_refc[um_refc_ix]);
}

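/*
 * Move remotely freed blocks from the lock free queue to the owner's
 * local free list. At most 'max' blocks are taken, and only blocks in
 * front of head.unref_end, i.e. blocks that no producer can still be
 * referencing. The unreferenced end is advanced lazily: once all managed
 * threads have passed the recorded thread progress point and the
 * relevant um_refc counter for unmanaged threads has dropped to zero,
 * everything up to the previously observed tail is known to be safe.
 */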
static ERTS_INLINE void
fetch_remote(erts_sspa_chunk_header_t *chdr, int max)
{
    int new_local = 0;

    if (chdr->head.no_thr_progress_check < ERTS_SSPA_FORCE_THR_CHECK_PROGRESS)
        chdr->head.no_thr_progress_check++;
    else {
        erts_aint_t ilast;

        chdr->head.no_thr_progress_check = 0;

        ilast = erts_atomic_read_nob(&chdr->tail.data.last);
        if (((erts_sspa_blk_t *) ilast) == &chdr->tail.data.marker
            && chdr->head.first == &chdr->tail.data.marker)
            return;

        if (chdr->head.next.thr_progress_reached
            || erts_thr_progress_has_reached(chdr->head.next.thr_progress)) {
            int um_refc_ix;
            chdr->head.next.thr_progress_reached = 1;
            um_refc_ix = chdr->head.next.um_refc_ix;
            if (erts_atomic_read_nob(&chdr->tail.data.um_refc[um_refc_ix]) == 0) {

                ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);

                /* Move unreferenced end pointer forward... */

                chdr->head.unref_end = chdr->head.next.unref_end;

                ilast = check_insert_marker(chdr, ilast);

                if (chdr->head.unref_end != (erts_sspa_blk_t *) ilast) {
                    chdr->head.next.unref_end = (erts_sspa_blk_t *) ilast;
                    chdr->head.next.thr_progress = erts_thr_progress_later(NULL);
                    erts_atomic32_set_relb(&chdr->tail.data.um_refc_ix,
                                           um_refc_ix);
                    chdr->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
                    chdr->head.next.thr_progress_reached = 0;
                }
            }
        }
    }

    if (new_local < max && chdr->head.first != chdr->head.unref_end) {
        erts_sspa_blk_t *first, *this, *next, *last;
        first = chdr->head.first;
        if (first == &chdr->tail.data.marker) {
            chdr->head.used_marker = 0;
            first = ((erts_sspa_blk_t *)
                     erts_atomic_read_nob(&first->next_atmc));
            chdr->head.first = first;
        }
        if (first != chdr->head.unref_end) {

            ERTS_SSPA_DBG_CHK_LCL(chdr);

            this = last = first;
            do {
                next = (erts_sspa_blk_t *) erts_atomic_read_nob(&this->next_atmc);
                if (this == &chdr->tail.data.marker)
                    chdr->head.used_marker = 0;
                else {
                    last->next_ptr = this;
                    last = this;
                    new_local++;
                }
                this = next;
            } while (new_local < max && this != chdr->head.unref_end);
            chdr->head.first = this;
            if (!chdr->local.last)
                chdr->local.first = first;
            else
                chdr->local.last->next_ptr = first;
            chdr->local.last = last;
            last->next_ptr = NULL;
            chdr->local.cnt += new_local;

            ERTS_SSPA_DBG_CHK_LCL(chdr);
        }
    }

}

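/*
 * Owner side entry point used when the local free list runs dry: pull in
 * remotely freed blocks and, if 'old_res' is still NULL, pop one block
 * from the (possibly refilled) local list. A rough calling pattern
 * (illustrative sketch only; the allocation fast path lives in the
 * inline helpers of the corresponding header file):
 *
 *     if (!blk)
 *         blk = erts_sspa_process_remote_frees(chdr, NULL);
 */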
erts_sspa_blk_t *
erts_sspa_process_remote_frees(erts_sspa_chunk_header_t *chdr,
                               erts_sspa_blk_t *old_res)
{
    erts_sspa_blk_t *res = old_res;

    fetch_remote(chdr, ERTS_SSPA_MAX_GET_NEW_LOCAL);

    if (!res && chdr->local.first) {

        ERTS_SSPA_DBG_CHK_LCL(chdr);

        res = chdr->local.first;
        chdr->local.first = res->next_ptr;
        chdr->local.cnt--;
        if (!chdr->local.first)
            chdr->local.last = NULL;

        ERTS_SSPA_DBG_CHK_LCL(chdr);
    }

    return res;
}