/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson 2017. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

#define STATIC_ERLANG_NIF 1

#include "erl_nif.h"
#include "config.h"
#include "sys.h"

#ifdef VALGRIND
#   include <valgrind/memcheck.h>
#endif

#define ACCUMULATOR_SIZE (2 << 10)

#define FIND_NIF_RESCHEDULE_SIZE (1 << 20)

/* NIF interface declarations */
static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info);
static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info);
static void unload(ErlNifEnv *env, void* priv_data);

static ErlNifResourceType *rtype_buffer;

static ERL_NIF_TERM am_ok;
static ERL_NIF_TERM am_error;

static ERL_NIF_TERM am_lock_order_violation;

static ERL_NIF_TERM am_acquired;
static ERL_NIF_TERM am_busy;

static ERL_NIF_TERM am_continue;

static ERL_NIF_TERM am_out_of_memory;
static ERL_NIF_TERM am_not_found;

55 typedef struct {
56 #ifdef DEBUG
57 erts_atomic32_t concurrent_users;
58 #endif
59
60 ErlNifBinary accumulator;
61 size_t accumulated_bytes;
62 int accumulator_present;
63
64 ErlNifIOQueue *queue;
65
66 erts_atomic32_t external_lock;
67 } buffer_data_t;
68
static ERL_NIF_TERM new_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

static ERL_NIF_TERM peek_head_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM skip_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM size_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

static ERL_NIF_TERM write_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM copying_read_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

static ERL_NIF_TERM find_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

static ERL_NIF_TERM trylock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM unlock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

83 static ErlNifFunc nif_funcs[] = {
84 {"new", 0, new_nif},
85 {"size", 1, size_nif},
86 {"peek_head", 1, peek_head_nif},
87 {"copying_read", 2, copying_read_nif},
88 {"write", 2, write_nif},
89 {"skip", 2, skip_nif},
90 {"find_byte_index", 2, find_nif},
91 {"try_lock", 1, trylock_nif},
92 {"unlock", 1, unlock_nif},
93 };
94
ERL_NIF_INIT(prim_buffer, nif_funcs, load, NULL, upgrade, unload)

static void gc_buffer(ErlNifEnv *env, void* data);

/* Initializes the atoms and the buffer resource type used by this module.
 * Returns 0 on success; any other value makes the runtime reject the
 * library. */
static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info)
{
    am_ok = enif_make_atom(env, "ok");
    am_error = enif_make_atom(env, "error");

    am_lock_order_violation = enif_make_atom(env, "lock_order_violation");
    am_acquired = enif_make_atom(env, "acquired");
    am_busy = enif_make_atom(env, "busy");

    am_continue = enif_make_atom(env, "continue");

    am_out_of_memory = enif_make_atom(env, "out_of_memory");
    am_not_found = enif_make_atom(env, "not_found");

    rtype_buffer = enif_open_resource_type(env, NULL, "gc_buffer", gc_buffer,
                                           ERL_NIF_RT_CREATE, NULL);

    /* Fail the load if the resource type couldn't be created; continuing
     * would make new_nif crash on a NULL resource type later. */
    if(rtype_buffer == NULL) {
        return -1;
    }

    *priv_data = NULL;

    return 0;
}

/* Nothing to tear down; the module keeps no private data. */
static void unload(ErlNifEnv *env, void* priv_data)
{
    (void)env;
    (void)priv_data;
}

/* Code upgrade callback. We only know how to "upgrade" when neither the old
 * nor the new instance carries private data, in which case it's just a plain
 * load. */
static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info)
{
    if(*old_priv_data != NULL || *priv_data != NULL) {
        /* Don't know how to do that */
        return -1;
    }

    return load(env, priv_data, load_info) != 0 ? -1 : 0;
}

/* Resource destructor. An accumulator that has not yet been handed over to
 * the queue is still owned by the buffer and must be released here. */
static void gc_buffer(ErlNifEnv *env, void* data) {
    buffer_data_t *buffer;

    buffer = (buffer_data_t*)data;

    if(buffer->accumulator_present) {
        enif_release_binary(&buffer->accumulator);
    }

    enif_ioq_destroy(buffer->queue);
}

/* Unwraps an opaque term into its buffer resource; returns zero when the
 * term is not a buffer. */
static int get_buffer_data(ErlNifEnv *env, ERL_NIF_TERM opaque, buffer_data_t **buffer) {
    void *resource;

    if(!enif_get_resource(env, opaque, rtype_buffer, &resource)) {
        return 0;
    }

    *buffer = (buffer_data_t*)resource;

    return 1;
}

157 /* Copies a number of bytes from the head of the iovec, skipping "vec_skip"
158 * vector elements followed by "byte_skip" bytes on the target vector. */
copy_from_iovec(SysIOVec * iovec,int vec_len,int vec_skip,size_t byte_skip,size_t size,char * data)159 static void copy_from_iovec(SysIOVec *iovec, int vec_len, int vec_skip,
160 size_t byte_skip, size_t size, char *data) {
161
162 size_t bytes_copied, skip_offset;
163 int vec_index;
164
165 skip_offset = byte_skip;
166 vec_index = vec_skip;
167 bytes_copied = 0;
168
169 while(bytes_copied < size) {
170 size_t block_size, copy_size;
171 char *block_start;
172
173 ASSERT(vec_index < vec_len);
174
175 block_start = (char*)iovec[vec_index].iov_base;
176 block_size = iovec[vec_index].iov_len;
177
178 copy_size = MIN(size - bytes_copied, block_size - skip_offset);
179 sys_memcpy(&data[bytes_copied], &block_start[skip_offset], copy_size);
180
181 bytes_copied += copy_size;
182 skip_offset = 0;
183
184 vec_index++;
185 }
186 }
187
188 /* Convenience function for copy_from_iovec over queues. */
copy_from_queue(ErlNifIOQueue * queue,int queue_skip,size_t byte_skip,size_t size,char * data)189 static void copy_from_queue(ErlNifIOQueue *queue, int queue_skip,
190 size_t byte_skip, size_t size, char *data) {
191
192 SysIOVec *queued_data;
193 int queue_length;
194
195 queued_data = enif_ioq_peek(queue, &queue_length);
196 ASSERT(queue_skip < queue_length);
197
198 copy_from_iovec(queued_data, queue_length, queue_skip, byte_skip, size, data);
199 }
200
/* Flushes the write accumulator into the IO queue, handing ownership of the
 * accumulator binary over to the queue. Returns zero on allocation failure,
 * nonzero on success (including when there was nothing to flush). */
static int enqueue_write_accumulator(buffer_data_t *buffer) {
    ASSERT(!buffer->accumulator_present ^ (buffer->accumulated_bytes > 0));

    if(!buffer->accumulator_present || buffer->accumulated_bytes == 0) {
        return 1;
    }

    /* Shrink the binary down to its used size before enqueueing it. */
    if(!enif_realloc_binary(&buffer->accumulator, buffer->accumulated_bytes)) {
        return 0;
    }

    if(!enif_ioq_enq_binary(buffer->queue, &buffer->accumulator, 0)) {
        return 0;
    }

    /* The queue owns the accumulator now. */
    buffer->accumulator_present = 0;
    buffer->accumulated_bytes = 0;

    return 1;
}

/* Tries to add a write to the accumulator instead of the queue proper.
 * Returns zero when the write is too large to be combined or on allocation
 * failure, in which case the caller has to enqueue the iovec itself. */
static int combine_small_writes(buffer_data_t *buffer, ErlNifIOVec *iovec) {
    int fits_in_accumulator;

    ASSERT(!buffer->accumulator_present ^ (buffer->accumulated_bytes > 0));

    fits_in_accumulator =
        (buffer->accumulated_bytes + iovec->size) < ACCUMULATOR_SIZE;

    if(!fits_in_accumulator) {
        /* Writes of at least half the accumulator size go straight to the
         * queue. */
        if(iovec->size >= (ACCUMULATOR_SIZE / 2)) {
            return 0;
        }

        if(!enqueue_write_accumulator(buffer)) {
            return 0;
        }
    }

    if(!buffer->accumulator_present) {
        if(!enif_alloc_binary(ACCUMULATOR_SIZE, &buffer->accumulator)) {
            return 0;
        }

        buffer->accumulator_present = 1;
    }

    copy_from_iovec(iovec->iov, iovec->iovcnt, 0, 0, iovec->size,
        (char*)&buffer->accumulator.data[buffer->accumulated_bytes]);
    buffer->accumulated_bytes += iovec->size;

    return 1;
}

/* *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** */

/* Creates a fresh, empty buffer resource and returns it as a term. */
static ERL_NIF_TERM new_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ERL_NIF_TERM result;
    buffer_data_t *buffer;

    buffer = (buffer_data_t*)enif_alloc_resource(rtype_buffer, sizeof(buffer_data_t));
    buffer->queue = enif_ioq_create(ERL_NIF_IOQ_NORMAL);

    if(buffer->queue == NULL) {
        result = enif_raise_exception(env, am_out_of_memory);
    } else {
#ifdef DEBUG
        erts_atomic32_init_nob(&buffer->concurrent_users, 0);
#endif
        erts_atomic32_init_nob(&buffer->external_lock, 0);

        buffer->accumulator_present = 0;
        buffer->accumulated_bytes = 0;

        result = enif_make_resource(env, buffer);
    }

    /* Drop our reference; the resource term (if any) keeps it alive. */
    enif_release_resource(buffer);

    return result;
}

/* Returns the total number of buffered bytes, including those that are
 * still sitting in the write accumulator. */
static ERL_NIF_TERM size_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    size_t total_size;
    buffer_data_t *buffer;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    total_size = enif_ioq_size(buffer->queue);

    if(!buffer->accumulator_present) {
        ASSERT(buffer->accumulated_bytes == 0);
    } else {
        total_size += buffer->accumulated_bytes;
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return enif_make_uint64(env, total_size);
}

/* Destructively reads block_size bytes from the head of the buffer into a
 * freshly allocated binary. Raises badarg when the buffer holds fewer bytes
 * than requested, and out_of_memory when flushing the accumulator fails. */
static ERL_NIF_TERM copying_read_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ERL_NIF_TERM result;
    unsigned char *data;
    Uint64 block_size;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_uint64(env, argv[1], &block_size)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-only user count on error paths as well, so the
         * concurrency assertions don't trip on the next operation. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    }

    if(enif_ioq_size(buffer->queue) < block_size) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    data = enif_make_new_binary(env, block_size, &result);

    if(block_size > 0) {
        copy_from_queue(buffer->queue, 0, 0, block_size, (char*)data);
        enif_ioq_deq(buffer->queue, block_size, NULL);
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return result;
}

/* Appends an iolist to the buffer, combining small writes through the
 * accumulator. Reschedules itself when the iolist has more segments than
 * enif_inspect_iovec processed in one go. */
static ERL_NIF_TERM write_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ErlNifIOVec vec, *iovec = &vec;
    ERL_NIF_TERM tail;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_inspect_iovec(env, 64, argv[1], &tail, &iovec)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!combine_small_writes(buffer, iovec)) {
        if(!enqueue_write_accumulator(buffer) || !enif_ioq_enqv(buffer->queue, iovec, 0)) {
            /* Balance the debug-only user count on the error path too, so
             * the concurrency assertions don't trip on the next operation. */
            ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
            return enif_raise_exception(env, am_out_of_memory);
        }
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    /* More of the iolist remains; schedule ourselves to continue with it. */
    if(!enif_is_empty_list(env, tail)) {
        const ERL_NIF_TERM new_argv[2] = {argv[0], tail};

        return enif_schedule_nif(env, "write", 0, &write_nif, argc, new_argv);
    }

    return am_ok;
}

/* Returns the first binary in the queue without removing it; raises badarg
 * when the buffer is empty. */
static ERL_NIF_TERM peek_head_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ERL_NIF_TERM result;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-only user count on error paths as well, so the
         * concurrency assertions don't trip on the next operation. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    }

    if(!enif_ioq_peek_head(env, buffer->queue, NULL, &result)) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return result;
}

/* Discards block_size bytes from the head of the buffer; raises badarg when
 * the buffer holds fewer bytes than requested. */
static ERL_NIF_TERM skip_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    Uint64 block_size;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_uint64(env, argv[1], &block_size)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-only user count on error paths as well, so the
         * concurrency assertions don't trip on the next operation. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    } else if(enif_ioq_size(buffer->queue) < block_size) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    enif_ioq_deq(buffer->queue, block_size, NULL);

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return am_ok;
}

/* Finds the buffer-wide index of the first occurrence of a byte value,
 * returning {ok, Index} or 'not_found'. Searches of large buffers are moved
 * to a dirty scheduler, and smaller ones bill a proportional share of the
 * caller's timeslice. */
static ERL_NIF_TERM find_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    int queue_length, queue_index;
    SysIOVec *queued_data;
    size_t queue_size;

    size_t search_offset;
    int needle;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_int(env, argv[1], &needle)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-only user count on error paths as well, so the
         * concurrency assertions don't trip on the next operation. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    } else if(needle < 0 || needle > 255) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    queued_data = enif_ioq_peek(buffer->queue, &queue_length);
    queue_size = enif_ioq_size(buffer->queue);
    queue_index = 0;

    search_offset = 0;

    if(queue_size > (FIND_NIF_RESCHEDULE_SIZE / 100)) {
        if(enif_thread_type() == ERL_NIF_THR_NORMAL_SCHEDULER) {
            int timeslice_percent;

            if(queue_size >= FIND_NIF_RESCHEDULE_SIZE) {
                ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

                return enif_schedule_nif(env, "find",
                    ERL_NIF_DIRTY_JOB_CPU_BOUND, &find_nif, argc, argv);
            }

            timeslice_percent = (queue_size * 100) / FIND_NIF_RESCHEDULE_SIZE;
            enif_consume_timeslice(env, timeslice_percent);
        }
    }

    while(queue_index < queue_length) {
        char *needle_address;
        char *block_start;
        size_t block_size;

        block_start = queued_data[queue_index].iov_base;
        block_size = queued_data[queue_index].iov_len;

        needle_address = memchr(block_start, needle, block_size);

        if(needle_address != NULL) {
            size_t result = search_offset + (needle_address - block_start);

            ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

            return enif_make_tuple2(env, am_ok, enif_make_uint64(env, result));
        }

        search_offset += block_size;
        queue_index++;
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return am_not_found;
}

/* try_lock/unlock: cooperative, user-visible locking of a buffer. */

/* Attempts to take the user-visible buffer lock; returns 'acquired' on
 * success and 'busy' when the lock is already held. */
static ERL_NIF_TERM trylock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;
    int was_free;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    was_free =
        (erts_atomic32_cmpxchg_acqb(&buffer->external_lock, 1, 0) == 0);

    return was_free ? am_acquired : am_busy;
}

/* Releases the user-visible buffer lock; unlocking a buffer that isn't
 * locked raises a lock_order_violation exception. */
static ERL_NIF_TERM unlock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    if(erts_atomic32_cmpxchg_relb(&buffer->external_lock, 0, 1) != 0) {
        return am_ok;
    }

    return enif_raise_exception(env, am_lock_order_violation);
}
