1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson 2017. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 #define STATIC_ERLANG_NIF 1
22 
23 #include "erl_nif.h"
24 #include "config.h"
25 #include "sys.h"
26 
27 #ifdef VALGRIND
28 #  include <valgrind/memcheck.h>
29 #endif
30 
31 #define ACCUMULATOR_SIZE (2 << 10)
32 
33 #define FIND_NIF_RESCHEDULE_SIZE (1 << 20)
34 
35 /* NIF interface declarations */
36 static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info);
37 static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info);
38 static void unload(ErlNifEnv *env, void* priv_data);
39 
/* Resource type for buffer instances; destructor is gc_buffer(). */
static ErlNifResourceType *rtype_buffer;

/* Atoms initialized once in load(); safe to reuse across environments. */
static ERL_NIF_TERM am_ok;
static ERL_NIF_TERM am_error;

/* Raised by unlock_nif when the external lock was not held. */
static ERL_NIF_TERM am_lock_order_violation;

/* Returned by trylock_nif. */
static ERL_NIF_TERM am_acquired;
static ERL_NIF_TERM am_busy;

static ERL_NIF_TERM am_continue;

static ERL_NIF_TERM am_out_of_memory;
/* Returned by find_nif when the needle byte is absent. */
static ERL_NIF_TERM am_not_found;
54 
typedef struct {
#ifdef DEBUG
    /* Debug-build sanity counter; each NIF asserts it is the sole active
     * user of the buffer while it runs. */
    erts_atomic32_t concurrent_users;
#endif

    /* Small writes are coalesced into this binary (combine_small_writes)
     * and flushed into the queue by enqueue_write_accumulator. Only valid
     * when accumulator_present is non-zero. */
    ErlNifBinary accumulator;
    size_t accumulated_bytes;   /* Bytes currently used in the accumulator. */
    int accumulator_present;    /* Non-zero when accumulator is allocated. */

    ErlNifIOQueue *queue;       /* Queued data; destroyed in gc_buffer(). */

    /* 0/1 flag driven by try_lock/unlock; a cooperative lock for callers
     * outside this module. */
    erts_atomic32_t external_lock;
} buffer_data_t;
68 
69 static ERL_NIF_TERM new_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
70 
71 static ERL_NIF_TERM peek_head_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
72 static ERL_NIF_TERM skip_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
73 static ERL_NIF_TERM size_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
74 
75 static ERL_NIF_TERM write_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
76 static ERL_NIF_TERM copying_read_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
77 
78 static ERL_NIF_TERM find_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
79 
80 static ERL_NIF_TERM trylock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
81 static ERL_NIF_TERM unlock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
82 
/* Exported NIFs; names and arities must match prim_buffer.erl. */
static ErlNifFunc nif_funcs[] = {
    {"new", 0, new_nif},
    {"size", 1, size_nif},
    {"peek_head", 1, peek_head_nif},
    {"copying_read", 2, copying_read_nif},
    {"write", 2, write_nif},
    {"skip", 2, skip_nif},
    {"find_byte_index", 2, find_nif},
    {"try_lock", 1, trylock_nif},
    {"unlock", 1, unlock_nif},
};

ERL_NIF_INIT(prim_buffer, nif_funcs, load, NULL, upgrade, unload)
96 
97 static void gc_buffer(ErlNifEnv *env, void* data);
98 
/* Initializes the atoms and the buffer resource type used by this module.
 * Returns 0 on success, -1 if the resource type could not be opened (in
 * which case every subsequent resource operation would be invalid). */
static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info)
{
    am_ok = enif_make_atom(env, "ok");
    am_error = enif_make_atom(env, "error");

    am_lock_order_violation = enif_make_atom(env, "lock_order_violation");
    am_acquired = enif_make_atom(env, "acquired");
    am_busy = enif_make_atom(env, "busy");

    am_continue = enif_make_atom(env, "continue");

    am_out_of_memory = enif_make_atom(env, "out_of_memory");
    am_not_found = enif_make_atom(env, "not_found");

    rtype_buffer = enif_open_resource_type(env, NULL, "gc_buffer", gc_buffer,
        ERL_NIF_RT_CREATE, NULL);

    /* Fail the load rather than continue with an unusable resource type. */
    if(rtype_buffer == NULL) {
        return -1;
    }

    *priv_data = NULL;

    return 0;
}
120 
static void unload(ErlNifEnv *env, void* priv_data)
{
    /* Nothing to release: load() sets *priv_data to NULL, and all buffer
     * state lives in resource objects reclaimed by gc_buffer(). */
}
125 
/* Code upgrade hook. Only the trivial case (no private data on either
 * side) is supported; everything is then re-initialized through load(). */
static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info)
{
    if(*old_priv_data != NULL || *priv_data != NULL) {
        return -1; /* Don't know how to do that */
    }

    return load(env, priv_data, load_info) != 0 ? -1 : 0;
}
142 
/* Resource destructor: releases the pending write accumulator (when one is
 * live) and tears down the I/O queue. */
static void gc_buffer(ErlNifEnv *env, void* data) {
    buffer_data_t *state = (buffer_data_t*)data;

    if(state->accumulator_present) {
        enif_release_binary(&state->accumulator);
    }

    enif_ioq_destroy(state->queue);
}
152 
/* Unpacks a buffer resource handle into *buffer; returns 0 when the term
 * is not a buffer resource (in which case *buffer is left untouched). */
static int get_buffer_data(ErlNifEnv *env, ERL_NIF_TERM opaque, buffer_data_t **buffer) {
    void *resource;

    if(!enif_get_resource(env, opaque, rtype_buffer, &resource)) {
        return 0;
    }

    *buffer = (buffer_data_t*)resource;

    return 1;
}
156 
157 /* Copies a number of bytes from the head of the iovec, skipping "vec_skip"
158  * vector elements followed by "byte_skip" bytes on the target vector. */
/* Copies "size" bytes from the iovec into "data", starting "vec_skip"
 * vector elements plus "byte_skip" bytes in. Only the first visited
 * element is subject to the byte offset; callers guarantee that enough
 * data is present. */
static void copy_from_iovec(SysIOVec *iovec, int vec_len, int vec_skip,
        size_t byte_skip, size_t size, char *data) {

    size_t remaining, in_block_offset;
    int index;

    remaining = size;
    in_block_offset = byte_skip;

    for(index = vec_skip; remaining > 0; index++) {
        char *source;
        size_t chunk;

        ASSERT(index < vec_len);

        source = (char*)iovec[index].iov_base;
        chunk = MIN(remaining, iovec[index].iov_len - in_block_offset);

        sys_memcpy(&data[size - remaining], &source[in_block_offset], chunk);

        remaining -= chunk;
        in_block_offset = 0;
    }
}
187 
188 /* Convenience function for copy_from_iovec over queues. */
/* Convenience wrapper: runs copy_from_iovec over the queue's current
 * contents. */
static void copy_from_queue(ErlNifIOQueue *queue, int queue_skip,
    size_t byte_skip, size_t size, char *data) {

    int vec_len;
    SysIOVec *vec;

    vec = enif_ioq_peek(queue, &vec_len);
    ASSERT(queue_skip < vec_len);

    copy_from_iovec(vec, vec_len, queue_skip, byte_skip, size, data);
}
200 
/* Flushes the write accumulator into the queue, shrinking it to its used
 * size first. On success the queue owns the (former) accumulator binary.
 * Returns 0 on allocation/enqueue failure, leaving the buffer unchanged
 * enough for the caller to raise; 1 otherwise (including the no-op case). */
static int enqueue_write_accumulator(buffer_data_t *buffer) {
    ASSERT(!buffer->accumulator_present ^ (buffer->accumulated_bytes > 0));

    if(!buffer->accumulator_present || buffer->accumulated_bytes == 0) {
        /* Nothing pending; trivially successful. */
        return 1;
    }

    if(!enif_realloc_binary(&buffer->accumulator, buffer->accumulated_bytes)) {
        return 0;
    }

    if(!enif_ioq_enq_binary(buffer->queue, &buffer->accumulator, 0)) {
        return 0;
    }

    /* The queue owns the accumulator now. */
    buffer->accumulator_present = 0;
    buffer->accumulated_bytes = 0;

    return 1;
}
218 
/* Tries to absorb a write into the accumulator to avoid fragmenting the
 * queue with many small segments. Returns 0 when the write is too large to
 * coalesce (caller should enqueue it directly) or on allocation failure;
 * 1 when the data has been copied into the accumulator. */
static int combine_small_writes(buffer_data_t *buffer, ErlNifIOVec *iovec) {
    char *target;

    ASSERT(!buffer->accumulator_present ^ (buffer->accumulated_bytes > 0));

    if(buffer->accumulated_bytes + iovec->size >= ACCUMULATOR_SIZE) {
        /* Writes of half the accumulator size or more bypass it entirely. */
        if(iovec->size >= (ACCUMULATOR_SIZE / 2)) {
            return 0;
        }

        /* Flush what we have so the small write fits in a fresh one. */
        if(!enqueue_write_accumulator(buffer)) {
            return 0;
        }
    }

    if(!buffer->accumulator_present) {
        if(!enif_alloc_binary(ACCUMULATOR_SIZE, &buffer->accumulator)) {
            return 0;
        }

        buffer->accumulator_present = 1;
    }

    target = (char*)&buffer->accumulator.data[buffer->accumulated_bytes];
    copy_from_iovec(iovec->iov, iovec->iovcnt, 0, 0, iovec->size, target);
    buffer->accumulated_bytes += iovec->size;

    return 1;
}
246 
247 /* *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** */
248 
/* Creates a fresh, empty buffer resource and returns it as a term, raising
 * out_of_memory if the backing I/O queue cannot be created. */
static ERL_NIF_TERM new_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ERL_NIF_TERM result;
    buffer_data_t *buffer;

    buffer = (buffer_data_t*)enif_alloc_resource(rtype_buffer, sizeof(buffer_data_t));
    buffer->queue = enif_ioq_create(ERL_NIF_IOQ_NORMAL);

    if(buffer->queue == NULL) {
        result = enif_raise_exception(env, am_out_of_memory);
    } else {
#ifdef DEBUG
        erts_atomic32_init_nob(&buffer->concurrent_users, 0);
#endif
        erts_atomic32_init_nob(&buffer->external_lock, 0);

        buffer->accumulated_bytes = 0;
        buffer->accumulator_present = 0;

        result = enif_make_resource(env, buffer);
    }

    /* Drop our reference; the returned term (if any) keeps the resource
     * alive. */
    enif_release_resource(buffer);

    return result;
}
274 
/* Returns the total number of readable bytes: queued data plus whatever is
 * still sitting in the write accumulator. */
static ERL_NIF_TERM size_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;
    size_t size;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    size = enif_ioq_size(buffer->queue);

    if(!buffer->accumulator_present) {
        ASSERT(buffer->accumulated_bytes == 0);
    } else {
        size += buffer->accumulated_bytes;
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return enif_make_uint64(env, size);
}
298 
/* Removes block_size bytes from the head of the buffer and returns them as
 * a freshly allocated binary. Raises out_of_memory if the accumulator
 * cannot be flushed, and badarg when the buffer holds fewer than
 * block_size bytes. */
static ERL_NIF_TERM copying_read_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ERL_NIF_TERM result;
    unsigned char *data;
    Uint64 block_size;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_uint64(env, argv[1], &block_size)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-build user count on the error path too;
         * otherwise a caught exception leaves it at 1 and the next
         * operation on this buffer trips a false assertion. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    }

    if(enif_ioq_size(buffer->queue) < block_size) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    data = enif_make_new_binary(env, block_size, &result);

    if(block_size > 0) {
        copy_from_queue(buffer->queue, 0, 0, block_size, (char*)data);
        enif_ioq_deq(buffer->queue, block_size, NULL);
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return result;
}
332 
/* Appends an iolist to the buffer. Small writes are coalesced into the
 * accumulator; larger ones are flushed and enqueued directly. When the
 * iolist could not be fully inspected in one pass, we reschedule ourselves
 * with the remaining tail. */
static ERL_NIF_TERM write_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ErlNifIOVec vec, *iovec = &vec;
    ERL_NIF_TERM tail;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_inspect_iovec(env, 64, argv[1], &tail, &iovec)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!combine_small_writes(buffer, iovec)) {
        if(!enqueue_write_accumulator(buffer) || !enif_ioq_enqv(buffer->queue, iovec, 0)) {
            /* Balance the debug-build user count on the error path so a
             * caught exception does not poison later assertions. */
            ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
            return enif_raise_exception(env, am_out_of_memory);
        }
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    if(!enif_is_empty_list(env, tail)) {
        const ERL_NIF_TERM new_argv[2] = {argv[0], tail};

        return enif_schedule_nif(env, "write", 0, &write_nif, argc, new_argv);
    }

    return am_ok;
}
362 
/* Returns the first queued binary without consuming it, flushing the
 * accumulator first. Raises out_of_memory on flush failure and badarg when
 * the queue is empty. */
static ERL_NIF_TERM peek_head_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    ERL_NIF_TERM result;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-build user count on error paths; otherwise a
         * caught exception leaves the counter at 1 and later operations
         * trip a false assertion. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    }

    if(!enif_ioq_peek_head(env, buffer->queue, NULL, &result)) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return result;
}
386 
/* Discards block_size bytes from the head of the buffer. Raises
 * out_of_memory if the accumulator cannot be flushed, and badarg when the
 * buffer holds fewer than block_size bytes. */
static ERL_NIF_TERM skip_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    Uint64 block_size;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_uint64(env, argv[1], &block_size)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-build user count on error paths so a caught
         * exception does not poison later assertions. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    } else if(enif_ioq_size(buffer->queue) < block_size) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    enif_ioq_deq(buffer->queue, block_size, NULL);

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return am_ok;
}
411 
/* Searches the buffer for the first occurrence of a byte value, returning
 * {ok, ByteIndex} or not_found. Very large buffers are punted to a dirty
 * CPU scheduler; medium ones bill a proportional timeslice instead. */
static ERL_NIF_TERM find_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    int queue_length, queue_index;
    SysIOVec *queued_data;
    size_t queue_size;

    size_t search_offset;
    int needle;

    if(argc != 2 || !get_buffer_data(env, argv[0], &buffer)
                 || !enif_get_int(env, argv[1], &needle)) {
        return enif_make_badarg(env);
    }

    ASSERT(erts_atomic32_inc_read_acqb(&buffer->concurrent_users) == 1);

    if(!enqueue_write_accumulator(buffer)) {
        /* Balance the debug-build user count on error paths, matching the
         * dirty-reschedule path below which already decrements. */
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_raise_exception(env, am_out_of_memory);
    } else if(needle < 0 || needle > 255) {
        ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);
        return enif_make_badarg(env);
    }

    queued_data = enif_ioq_peek(buffer->queue, &queue_length);
    queue_size = enif_ioq_size(buffer->queue);
    queue_index = 0;

    search_offset = 0;

    if(queue_size > (FIND_NIF_RESCHEDULE_SIZE / 100)) {
        if(enif_thread_type() == ERL_NIF_THR_NORMAL_SCHEDULER) {
            int timeslice_percent;

            if(queue_size >= FIND_NIF_RESCHEDULE_SIZE) {
                ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

                return enif_schedule_nif(env, "find",
                    ERL_NIF_DIRTY_JOB_CPU_BOUND, &find_nif, argc, argv);
            }

            timeslice_percent = (queue_size * 100) / FIND_NIF_RESCHEDULE_SIZE;
            enif_consume_timeslice(env, timeslice_percent);
        }
    }

    while(queue_index < queue_length) {
        char *needle_address;
        char *block_start;
        size_t block_size;

        block_start = queued_data[queue_index].iov_base;
        block_size = queued_data[queue_index].iov_len;

        needle_address = memchr(block_start, needle, block_size);

        if(needle_address != NULL) {
            size_t result = search_offset + (needle_address - block_start);

            ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

            return enif_make_tuple2(env, am_ok, enif_make_uint64(env, result));
        }

        search_offset += block_size;
        queue_index++;
    }

    ASSERT(erts_atomic32_dec_read_relb(&buffer->concurrent_users) == 0);

    return am_not_found;
}
483 
484 /* */
485 
/* Attempts to take the cooperative external lock; returns 'acquired' on
 * success and 'busy' when it is already held. */
static ERL_NIF_TERM trylock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    return (erts_atomic32_cmpxchg_acqb(&buffer->external_lock, 1, 0) == 0) ?
        am_acquired : am_busy;
}
499 
/* Releases the cooperative external lock; raises lock_order_violation if
 * the lock was not held. */
static ERL_NIF_TERM unlock_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    buffer_data_t *buffer;

    if(argc != 1 || !get_buffer_data(env, argv[0], &buffer)) {
        return enif_make_badarg(env);
    }

    if(erts_atomic32_cmpxchg_relb(&buffer->external_lock, 0, 1) != 0) {
        return am_ok;
    }

    return enif_raise_exception(env, am_lock_order_violation);
}
513