1 /*
2 * Copyright (c) 2016-2021 Hanspeter Portner (dev@open-music-kontrollers.ch)
3 *
4 * This is free software: you can redistribute it and/or modify
5 * it under the terms of the Artistic License 2.0 as published by
6 * The Perl Foundation.
7 *
8 * This source is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * Artistic License 2.0 for more details.
12 *
13 * You should have received a copy of the Artistic License 2.0
14 * along the source as a COPYING file. If not, obtain it from
15 * http://www.perlfoundation.org/artistic_license_2_0.
16 */
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <math.h>
21
22 #include <eteroj.h>
23 #include <varchunk.h>
24 #include <osc.lv2/reader.h>
25 #include <osc.lv2/writer.h>
26 #include <osc.lv2/forge.h>
27 #include <osc.lv2/stream.h>
28 #include <props.h>
29
#define BUF_SIZE 0x100000 // 1M, size passed to varchunk_new() for each of the three ring buffers
#define MTU_SIZE 1500 // capacity in bytes of one scheduled-bundle slot (list_t.buf)
#define LIST_SIZE 2048 // number of slots in the scheduled-bundle pool (plughandle_t.list)
#define MAX_NPROPS 3 // number of entries in the property definition table (defs)
#define STR_LEN 128 // size in bytes of the fixed string buffers (URL, error text)
35
// forward declarations of the structures defined below
typedef struct _plugstate_t plugstate_t;
typedef struct _list_t list_t;
typedef struct _plughandle_t plughandle_t;
39
// one slot of the pool of time-tagged bundles waiting to be dispatched
struct _list_t {
	double frames; // dispatch time as returned by osc_sched->osc2frames()
	size_t size; // number of valid bytes in buf, 0 marks the slot as invalid
	uint8_t buf [MTU_SIZE]; // raw copy of the bundle
};
45
// property storage, addressed through the offsetof() entries of the defs table
struct _plugstate_t {
	char osc_url [STR_LEN]; // storage of the ETEROJ_URL_URI property
	char osc_error [STR_LEN]; // storage of the ETEROJ_ERROR_URI property
	int32_t osc_connected; // storage of the ETEROJ_CONNECTED_URI property
};
51
// per-instance plugin data, allocated in instantiate() and released in cleanup()
struct _plughandle_t {
	// host features picked up in instantiate()
	LV2_URID_Map *map;
	LV2_URID_Unmap *unmap;
	LV2_Worker_Schedule *sched;
	// URIDs obtained from props_map() in instantiate()
	struct {
		LV2_URID eteroj_connected;
		LV2_URID eteroj_error;
	} uris;

	PROPS_T(props, MAX_NPROPS);
	LV2_OSC_URID osc_urid;
	LV2_Atom_Forge forge; // writes into the osc_out port buffer during run()
	LV2_Atom_Forge_Ref ref; // last forge result, 0 makes run() clear the output sequence

	volatile bool status_updated; // set in _handle_enum(), consumed and reset in run()
	bool rolling; // true between a successful lv2_osc_stream_init() and lv2_osc_stream_deinit()

	// both are handed to props_init()
	plugstate_t state;
	plugstate_t stash;

	// optional log feature, logger is only initialized when log is set
	LV2_Log_Log *log;
	LV2_Log_Logger logger;

	// port buffers, see connect_port(): 0 = osc_in, 1 = osc_out
	const LV2_Atom_Sequence *osc_in;
	LV2_Atom_Sequence *osc_out;

	LV2_OSC_Schedule *osc_sched; // optional host feature, without it bundles are dispatched immediately
	list_t list [LIST_SIZE]; // pool of scheduled bundles
	unsigned nlist; // number of slots currently in use

	struct {
		LV2_OSC_Driver driver; // callbacks handed to lv2_osc_stream_init()
		LV2_OSC_Stream stream;
		varchunk_t *from_worker; // written by the stream driver callbacks, read in run()
		varchunk_t *to_worker; // written in run(), read by the stream driver callbacks
		varchunk_t *to_thread; // written by _url_change(), read in _work()
	} data;

	char *osc_url; // heap copy of the current URL, replaced in _work(), freed in cleanup()
};
92
93 static LV2_State_Status
_state_save(LV2_Handle instance,LV2_State_Store_Function store,LV2_State_Handle state,uint32_t flags,const LV2_Feature * const * features)94 _state_save(LV2_Handle instance, LV2_State_Store_Function store,
95 LV2_State_Handle state, uint32_t flags,
96 const LV2_Feature *const *features)
97 {
98 plughandle_t *handle = (plughandle_t *)instance;
99
100 return props_save(&handle->props, store, state, flags, features);
101 }
102
103 static LV2_State_Status
_state_restore(LV2_Handle instance,LV2_State_Retrieve_Function retrieve,LV2_State_Handle state,uint32_t flags,const LV2_Feature * const * features)104 _state_restore(LV2_Handle instance, LV2_State_Retrieve_Function retrieve,
105 LV2_State_Handle state, uint32_t flags,
106 const LV2_Feature *const *features)
107 {
108 plughandle_t *handle = (plughandle_t *)instance;
109
110 return props_restore(&handle->props, retrieve, state, flags, features);
111 }
112
// state interface table, returned by extension_data() for LV2_STATE__interface
static const LV2_State_Interface state_iface = {
	.save = _state_save,
	.restore = _state_restore
};
117
118 // non-rt
119 static void *
_data_recv_req(void * data,size_t size,size_t * max)120 _data_recv_req(void *data, size_t size, size_t *max)
121 {
122 plughandle_t *handle = data;
123
124 return varchunk_write_request_max(handle->data.from_worker, size, max);
125 }
126
127 // non-rt
128 static void
_data_recv_adv(void * data,size_t written)129 _data_recv_adv(void *data, size_t written)
130 {
131 plughandle_t *handle = data;
132
133 varchunk_write_advance(handle->data.from_worker, written);
134 }
135
136 // non-rt
137 static const void *
_data_send_req(void * data,size_t * len)138 _data_send_req(void *data, size_t *len)
139 {
140 plughandle_t *handle = data;
141
142 return varchunk_read_request(handle->data.to_worker, len);
143 }
144
145 // non-rt
146 static void
_data_send_adv(void * data)147 _data_send_adv(void *data)
148 {
149 plughandle_t *handle = data;
150
151 varchunk_read_advance(handle->data.to_worker);
152 }
153
154 // rt
155 static void
_url_change(plughandle_t * handle,const char * url)156 _url_change(plughandle_t *handle, const char *url)
157 {
158 LV2_OSC_Writer writer;
159 uint8_t buf [STR_LEN];
160 lv2_osc_writer_initialize(&writer, buf, STR_LEN);
161 lv2_osc_writer_message_vararg(&writer, "/eteroj/url", "s", url);
162 size_t size;
163 lv2_osc_writer_finalize(&writer, &size);
164
165 if(size)
166 {
167 uint8_t *dst;
168 if((dst = varchunk_write_request(handle->data.to_thread, size))) //FIXME use request_max
169 {
170 memcpy(dst, buf, size);
171 varchunk_write_advance(handle->data.to_thread, size);
172 }
173 }
174 }
175
176 static void
_intercept(void * data,int64_t frames,props_impl_t * impl)177 _intercept(void *data, int64_t frames, props_impl_t *impl)
178 {
179 plughandle_t *handle = data;
180
181 _url_change(handle, impl->value.body);
182 }
183
// property definition table handed to props_init(), the offsets refer to
// members of plugstate_t
static const props_def_t defs [MAX_NPROPS] = {
	// URL, a change triggers _intercept()
	{
		.property = ETEROJ_URL_URI,
		.offset = offsetof(plugstate_t, osc_url),
		.type = LV2_ATOM__String,
		.max_size = STR_LEN,
		.event_cb = _intercept
	},
	// error text, declared with readable access
	{
		.property = ETEROJ_ERROR_URI,
		.offset = offsetof(plugstate_t, osc_error),
		.access = LV2_PATCH__readable,
		.type = LV2_ATOM__String,
		.max_size = STR_LEN
	},
	// connection flag, declared with readable access
	{
		.property = ETEROJ_CONNECTED_URI,
		.offset = offsetof(plugstate_t, osc_connected),
		.access = LV2_PATCH__readable,
		.type = LV2_ATOM__Bool,
	}
};
206
207 static LV2_Handle
instantiate(const LV2_Descriptor * descriptor,double rate,const char * bundle_path,const LV2_Feature * const * features)208 instantiate(const LV2_Descriptor* descriptor, double rate,
209 const char *bundle_path, const LV2_Feature *const *features)
210 {
211 plughandle_t *handle = calloc(1, sizeof(plughandle_t));
212 if(!handle)
213 {
214 return NULL;
215 }
216
217 mlock(handle, sizeof(plughandle_t));
218
219 for(unsigned i=0; features[i]; i++)
220 {
221 if(!strcmp(features[i]->URI, LV2_URID__map))
222 {
223 handle->map = features[i]->data;
224 }
225 else if(!strcmp(features[i]->URI, LV2_URID__unmap))
226 {
227 handle->unmap = features[i]->data;
228 }
229 else if(!strcmp(features[i]->URI, LV2_LOG__log))
230 {
231 handle->log = features[i]->data;
232 }
233 else if(!strcmp(features[i]->URI, LV2_OSC__schedule))
234 {
235 handle->osc_sched = features[i]->data;
236 }
237 else if(!strcmp(features[i]->URI, LV2_WORKER__schedule))
238 {
239 handle->sched= features[i]->data;
240 }
241 }
242
243 if(!handle->map || !handle->unmap)
244 {
245 fprintf(stderr,
246 "%s: Host does not support urid:(un)map\n", descriptor->URI);
247 free(handle);
248 return NULL;
249 }
250 if(!handle->sched)
251 {
252 fprintf(stderr, "%s: Host does not support work:schedule\n", descriptor->URI);
253 free(handle);
254 return NULL;
255 }
256
257 if(handle->log)
258 {
259 lv2_log_logger_init(&handle->logger, handle->map, handle->log);
260 }
261 lv2_osc_urid_init(&handle->osc_urid, handle->map);
262 lv2_atom_forge_init(&handle->forge, handle->map);
263
264 // init data
265 handle->data.from_worker = varchunk_new(BUF_SIZE, true);
266 handle->data.to_worker = varchunk_new(BUF_SIZE, true);
267 handle->data.to_thread = varchunk_new(BUF_SIZE, true);
268 if(!handle->data.from_worker || !handle->data.to_worker || !handle->data.to_thread)
269 {
270 free(handle);
271 return NULL;
272 }
273
274 handle->data.driver.write_req = _data_recv_req;
275 handle->data.driver.write_adv = _data_recv_adv;
276 handle->data.driver.read_req = _data_send_req;
277 handle->data.driver.read_adv = _data_send_adv;
278
279 if(!props_init(&handle->props, descriptor->URI,
280 defs, MAX_NPROPS, &handle->state, &handle->stash,
281 handle->map, handle))
282 {
283 free(handle);
284 return NULL;
285 }
286
287 handle->uris.eteroj_connected = props_map(&handle->props, ETEROJ_CONNECTED_URI);
288 handle->uris.eteroj_error = props_map(&handle->props, ETEROJ_ERROR_URI);
289
290 return handle;
291 }
292
293 static void
connect_port(LV2_Handle instance,uint32_t port,void * data)294 connect_port(LV2_Handle instance, uint32_t port, void *data)
295 {
296 plughandle_t *handle = (plughandle_t *)instance;
297
298 switch(port)
299 {
300 case 0:
301 handle->osc_in = (const LV2_Atom_Sequence *)data;
302 break;
303 case 1:
304 handle->osc_out = (LV2_Atom_Sequence *)data;
305 break;
306 default:
307 break;
308 }
309 }
310
311 static inline list_t *
_add_list(plughandle_t * handle)312 _add_list(plughandle_t *handle)
313 {
314 if(handle->nlist >= LIST_SIZE)
315 {
316 return NULL;
317 }
318
319 return &handle->list[handle->nlist++];
320 }
321
322 static inline void
_invalidate_list(list_t * list)323 _invalidate_list(list_t *list)
324 {
325 list->size = 0; // invalidate
326 }
327
328 static int
_cmp(const void * a,const void * b)329 _cmp(const void *a, const void *b)
330 {
331 const list_t *A = a;
332 const list_t *B = b;
333
334 if(A->size && B->size)
335 {
336 if(A->frames < B->frames)
337 {
338 return -1;
339 }
340 else if(A->frames > B->frames)
341 {
342 return 1;
343 }
344
345 return 0;
346 }
347 else if(A->size)
348 {
349 return -1;
350 }
351 else if(B->size)
352 {
353 return 1;
354 }
355
356 return 0;
357 }
358
359 static inline void
_sort_list(plughandle_t * handle)360 _sort_list(plughandle_t *handle)
361 {
362 qsort(handle->list, handle->nlist, sizeof(list_t), _cmp);
363 }
364
365 static inline void
_parse(plughandle_t * handle,double frames,const uint8_t * buf,size_t size)366 _parse(plughandle_t *handle, double frames, const uint8_t *buf, size_t size)
367 {
368 if(handle->ref)
369 {
370 handle->ref = lv2_atom_forge_frame_time(&handle->forge, frames);
371 }
372 if(handle->ref)
373 {
374 handle->ref = lv2_osc_forge_packet(&handle->forge, &handle->osc_urid,
375 handle->map, buf, size);
376 }
377 }
378
379 static inline void
_unroll(plughandle_t * handle,const uint8_t * buf,size_t size)380 _unroll(plughandle_t *handle, const uint8_t *buf, size_t size)
381 {
382 LV2_OSC_Reader reader;
383 lv2_osc_reader_initialize(&reader, buf, size);
384
385 if(lv2_osc_reader_is_bundle(&reader))
386 {
387 LV2_OSC_Item *itm = OSC_READER_BUNDLE_BEGIN(&reader, size);
388
389 // immediate dispatch ?
390 if( (itm->timetag == LV2_OSC_IMMEDIATE) || !handle->osc_sched )
391 {
392 _parse(handle, 0.0, buf, size);
393 }
394 else if(size <= MTU_SIZE)
395 {
396 list_t *l = _add_list(handle);
397 if(l)
398 {
399 const double frames = handle->osc_sched->osc2frames(
400 handle->osc_sched->handle, itm->timetag);
401
402 l->frames = frames;
403 l->size = size;
404 memcpy(l->buf, buf, size);
405 }
406 else if(handle->log)
407 {
408 lv2_log_trace(&handle->logger, "message pool overflow");
409 }
410 }
411 else if(handle->log)
412 {
413 lv2_log_trace(&handle->logger, "message too long");
414 }
415 }
416 else if(lv2_osc_reader_is_message(&reader)) // immediate dispatch
417 {
418 _parse(handle, 0.0, buf, size);
419 }
420 }
421
422 static void
run(LV2_Handle instance,uint32_t nsamples)423 run(LV2_Handle instance, uint32_t nsamples)
424 {
425 plughandle_t *handle = instance;
426
427 LV2_Atom_Forge *forge = &handle->forge;
428 LV2_Atom_Forge_Frame frame;
429 uint32_t capacity;
430
431 // prepare sequence forges
432 capacity = handle->osc_out->atom.size;
433 lv2_atom_forge_set_buffer(forge, (uint8_t *)handle->osc_out, capacity);
434 handle->ref = lv2_atom_forge_sequence_head(forge, &frame, 0);
435
436 props_idle(&handle->props, &handle->forge, 0, &handle->ref);
437
438 // write outgoing data
439 LV2_ATOM_SEQUENCE_FOREACH(handle->osc_in, ev)
440 {
441 const LV2_Atom_Object *obj = (const LV2_Atom_Object *)&ev->body;
442
443 if(obj->atom.type == forge->Object)
444 {
445 if( !props_advance(&handle->props, &handle->forge, ev->time.frames, obj, &handle->ref)
446 && lv2_osc_is_message_or_bundle_type(&handle->osc_urid, obj->body.otype) )
447 {
448 uint8_t *dst;
449 size_t reserve = obj->atom.size;
450 if((dst = varchunk_write_request(handle->data.to_worker, reserve)))
451 {
452 LV2_OSC_Writer writer;
453 lv2_osc_writer_initialize(&writer, dst, reserve);
454 lv2_osc_writer_packet(&writer, &handle->osc_urid, handle->unmap, obj->atom.size, &obj->body);
455 size_t written;
456 lv2_osc_writer_finalize(&writer, &written);
457
458 if(written)
459 {
460 varchunk_write_advance(handle->data.to_worker, written);
461 }
462 }
463 else if(handle->log)
464 {
465 lv2_log_trace(&handle->logger, "output ringbuffer overflow");
466 }
467 }
468 }
469 }
470
471 // wake worker once per period
472 const int32_t dummy = 0;
473 handle->sched->schedule_work(handle->sched->handle, sizeof(int32_t), &dummy);
474
475 // reschedule scheduled bundles
476 if(handle->osc_sched)
477 {
478 for(list_t *l = handle->list; l->size; l++)
479 {
480 uint64_t time = be64toh(*(uint64_t *)(l->buf + 8));
481
482 double frames = handle->osc_sched->osc2frames(handle->osc_sched->handle, time);
483 if(frames < 0.0) // we may occasionally get -1 frames events when rescheduling
484 {
485 l->frames = 0.0;
486 }
487 else
488 {
489 l->frames = frames;
490 }
491 }
492 }
493
494 const unsigned nlist = handle->nlist;
495
496 // read incoming data
497 const uint8_t *ptr;
498 size_t size;
499 while((ptr = varchunk_read_request(handle->data.from_worker, &size)))
500 {
501 _unroll(handle, ptr, size);
502
503 varchunk_read_advance(handle->data.from_worker);
504 }
505
506 const unsigned added = handle->nlist - nlist;
507
508 if(added)
509 {
510 _sort_list(handle);
511 }
512
513 unsigned deleted = 0;
514
515 // handle scheduled bundles
516 for(list_t *l = handle->list; l->size; l++)
517 {
518 if(l->frames < 0.0) // late event
519 {
520 if(handle->log)
521 {
522 lv2_log_trace(&handle->logger, "late event: %lf samples", l->frames);
523 }
524
525 l->frames = 0.0; // dispatch as early as possible
526 }
527 else if(l->frames >= nsamples) // not scheduled for this period
528 {
529 break;
530 }
531
532 _parse(handle, l->frames, l->buf, l->size);
533
534 _invalidate_list(l);
535 deleted++;
536 }
537
538 if(deleted)
539 {
540 _sort_list(handle);
541 handle->nlist -= deleted;
542 }
543
544 if(handle->status_updated)
545 {
546 props_set(&handle->props, forge, nsamples-1, handle->uris.eteroj_connected, &handle->ref);
547 props_set(&handle->props, forge, nsamples-1, handle->uris.eteroj_error, &handle->ref);
548
549 handle->status_updated = false;
550 }
551
552 if(handle->ref)
553 {
554 lv2_atom_forge_pop(forge, &frame);
555 }
556 else
557 {
558 lv2_atom_sequence_clear(handle->osc_out);
559 }
560 }
561
562 static inline void
_handle_enum(plughandle_t * handle,LV2_OSC_Enum ev)563 _handle_enum(plughandle_t *handle, LV2_OSC_Enum ev)
564 {
565 const char *err = "";
566
567 if(ev & LV2_OSC_ERR)
568 {
569 char buf [STR_LEN] = { '\0' };
570 err = strerror_r(ev & LV2_OSC_ERR, buf, sizeof(buf));
571
572 if(!err)
573 {
574 err = "Unknown";
575 }
576
577 if(handle->log)
578 {
579 lv2_log_trace(&handle->logger, "%s\n", err);
580 }
581 }
582
583 if(strcmp(handle->state.osc_error, err))
584 {
585 strncpy(handle->state.osc_error, err, STR_LEN-1);
586
587 props_impl_t *impl = _props_impl_get(&handle->props, handle->uris.eteroj_error);
588 if(impl)
589 {
590 _props_impl_set(&handle->props, impl, handle->forge.String, strlen(err), err);
591 }
592
593 handle->status_updated = true;
594 }
595
596 const bool connected = (ev & LV2_OSC_CONN) == LV2_OSC_CONN;
597
598 if(handle->state.osc_connected != connected)
599 {
600 #if 0
601 if(handle->log)
602 lv2_log_trace(&handle->logger, "connected: %i\n", connected);
603 #endif
604
605 handle->state.osc_connected = connected;
606 handle->status_updated = true;
607 }
608 }
609
610 static inline LV2_OSC_Enum
_activate(plughandle_t * handle)611 _activate(plughandle_t *handle)
612 {
613 if(!handle->rolling && handle->osc_url)
614 {
615 const LV2_OSC_Enum ev = lv2_osc_stream_init(&handle->data.stream,
616 handle->osc_url, &handle->data.driver, handle);
617
618 if( (ev & LV2_OSC_ERR) == LV2_OSC_NONE)
619 {
620 handle->rolling = true;
621 }
622
623 return ev;
624 }
625
626 return LV2_OSC_NONE;
627 }
628
629 static void
activate(LV2_Handle instance)630 activate(LV2_Handle instance)
631 {
632 plughandle_t *handle = instance;
633
634 _activate(handle);
635 }
636
637 static inline void
_deactivate(plughandle_t * handle)638 _deactivate(plughandle_t *handle)
639 {
640 if(handle->rolling)
641 {
642 lv2_osc_stream_deinit(&handle->data.stream);
643 handle->rolling = false;
644 }
645 }
646
647 static void
deactivate(LV2_Handle instance)648 deactivate(LV2_Handle instance)
649 {
650 plughandle_t *handle = instance;
651
652 _deactivate(handle);
653 }
654
655 static void
cleanup(LV2_Handle instance)656 cleanup(LV2_Handle instance)
657 {
658 plughandle_t *handle = instance;
659
660 varchunk_free(handle->data.from_worker);
661 varchunk_free(handle->data.to_worker);
662 varchunk_free(handle->data.to_thread);
663
664 if(handle->osc_url)
665 {
666 free(handle->osc_url);
667 }
668
669 munlock(handle, sizeof(plughandle_t));
670 free(handle);
671 }
672
673 // non-rt thread
674 static LV2_Worker_Status
_work(LV2_Handle instance,LV2_Worker_Respond_Function respond,LV2_Worker_Respond_Handle target,uint32_t _size,const void * _body)675 _work(LV2_Handle instance,
676 LV2_Worker_Respond_Function respond,
677 LV2_Worker_Respond_Handle target,
678 uint32_t _size,
679 const void *_body)
680 {
681 plughandle_t *handle = instance;
682 char *osc_url = NULL;
683
684 size_t size;
685 const uint8_t *body;
686 while((body = varchunk_read_request(handle->data.to_thread, &size)))
687 {
688 LV2_OSC_Reader reader;
689 LV2_OSC_Arg arg = {0};
690 lv2_osc_reader_initialize(&reader, body, size);
691 lv2_osc_reader_arg_begin(&reader, &arg, size);
692
693 if(!strcmp(arg.path, "/eteroj/url"))
694 {
695 if(osc_url)
696 {
697 free(osc_url);
698 }
699 osc_url = strdup(arg.s);
700 }
701
702 varchunk_read_advance(handle->data.to_thread);
703 }
704
705 if(osc_url)
706 {
707 if(handle->osc_url)
708 {
709 free(handle->osc_url);
710 }
711 handle->osc_url = osc_url;
712
713 _deactivate(handle);
714 }
715
716 LV2_OSC_Enum ev = _activate(handle);
717
718 if(handle->rolling)
719 {
720 ev |= lv2_osc_stream_run(&handle->data.stream);
721 }
722
723 respond(target, sizeof(LV2_OSC_Enum), &ev);
724
725 return LV2_WORKER_SUCCESS;
726 }
727
728 // rt-thread
729 static LV2_Worker_Status
_work_response(LV2_Handle instance,uint32_t size,const void * body)730 _work_response(LV2_Handle instance, uint32_t size, const void *body)
731 {
732 plughandle_t *handle = instance;
733
734 if(size == sizeof(LV2_OSC_Enum))
735 {
736 const LV2_OSC_Enum *ev = body;
737
738 _handle_enum(handle, *ev);
739 }
740
741 return LV2_WORKER_SUCCESS;
742 }
743
744 // rt-thread
745 static LV2_Worker_Status
_end_run(LV2_Handle instance)746 _end_run(LV2_Handle instance)
747 {
748 // do nothing
749
750 return LV2_WORKER_SUCCESS;
751 }
752
// worker interface table, returned by extension_data() for LV2_WORKER__interface
static const LV2_Worker_Interface work_iface = {
	.work = _work,
	.work_response = _work_response,
	.end_run = _end_run
};
758
759 static const void*
extension_data(const char * uri)760 extension_data(const char* uri)
761 {
762 if(!strcmp(uri, LV2_WORKER__interface))
763 {
764 return &work_iface;
765 }
766 else if(!strcmp(uri, LV2_STATE__interface))
767 {
768 return &state_iface;
769 }
770
771 return NULL;
772 }
773
// plugin descriptor for ETEROJ_IO_URI, wiring up the callbacks defined in this file
const LV2_Descriptor eteroj_io = {
	.URI = ETEROJ_IO_URI,
	.instantiate = instantiate,
	.connect_port = connect_port,
	.activate = activate,
	.run = run,
	.deactivate = deactivate,
	.cleanup = cleanup,
	.extension_data = extension_data
};
784