1 /*
2 * Copyright: 2018-2020, Bjorn Stahl
3 * License: 3-Clause BSD
4 * Description: Implements a support wrapper for the a12 function patterns used
5 * to implement multiple incoming a12 clients and map to one local connection point.
6 */
7 #include <arcan_shmif.h>
8 #include <arcan_shmif_server.h>
9 #include <errno.h>
10 #include <unistd.h>
11 #include <signal.h>
12 #include <poll.h>
13 #include <fcntl.h>
14 #include <inttypes.h>
15 #include <sys/wait.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <stdatomic.h>
19 #include <pthread.h>
20
21 #include "a12.h"
22 #include "a12_int.h"
23 #include "a12_helper.h"
24 #include "arcan_mem.h"
25
/*
 * State handed to each client_thread, one instance per forwarded segment.
 * The instance for the primary segment (chid 0) is built in
 * a12helper_a12cl_shmifsrv, instances for subsegments are copied from the
 * parent in on_srv_event. The thread frees the instance when it exits.
 */
struct shmifsrv_thread_data {
	/* local shmif client that this segment is bridged to */
	struct shmifsrv_client* C;

	/* a12 state, shared between all segment threads */
	struct a12_state* S;

	/* placeholder context registered as a12 destination for subsegments,
	 * its .user member is pointed back at this structure */
	struct arcan_shmif_cont fake;

	/* copy of the options passed to a12helper_a12cl_shmifsrv */
	struct a12helper_opts opts;

	/* size from the last FONTHINT event, re-used when the font blob arrives */
	float font_sz;

	/* write end of the wakeup pipe created in a12helper_a12cl_shmifsrv,
	 * shared by all segments */
	int kill_fd;

	/* a12 channel id of this segment, 0 for the primary one */
	uint8_t chid;
};
35
/* [THREADING]
 * We use a giant lock, as a12 does not tolerate several threads calling into
 * it at the same time (mostly due to the buffer function and the channel-
 * state tracker; not impossible to solve, just easier to handle like this).
 *
 * The buffer-out is simply a way for the main feed thread to forward the
 * current pending buffer output.
 */
/* defined further down, on_srv_event needs it to map new subsegments */
static bool spawn_thread(struct shmifsrv_thread_data* inarg);

/* serialises the calls into the shared a12 state */
static pthread_mutex_t giant_lock = PTHREAD_MUTEX_INITIALIZER;

/* debugging aid, label of the latest BEGIN_CRITICAL, written with the lock held */
static const char* last_lock;

/* when > 0, client threads hold back new video frames */
static _Atomic volatile size_t buffer_out = 0;

/* number of live client threads, the main loop waits for 0 before cleanup */
static _Atomic volatile uint8_t n_segments;

/*
 * Lock / unlock wrappers. No trailing semicolon after while(0): the call site
 * provides it, which keeps the macros usable as a single statement in an
 * unbraced if/else.
 */
#define BEGIN_CRITICAL(X, Y) do{pthread_mutex_lock(X); last_lock = (Y);} while(0)
#define END_CRITICAL(X) do{pthread_mutex_unlock(X);} while(0)
54
55 /*
56 * Figure out encoding parameters based on client type and buffer parameters.
57 * This is the first heurstic catch-all to later feed in backpressure,
58 * bandwidth and load estimation parameters.
59 */
vopts_from_segment(struct shmifsrv_thread_data * data,struct shmifsrv_vbuffer vb)60 static struct a12_vframe_opts vopts_from_segment(
61 struct shmifsrv_thread_data* data, struct shmifsrv_vbuffer vb)
62 {
63 /* force tpack regardless, tpack doesn't have tuning like this */
64 if (vb.flags.tpack){
65 a12int_trace(A12_TRACE_VIDEO, "tpack segment");
66 return (struct a12_vframe_opts){
67 .method = VFRAME_METHOD_TPACK_ZSTD
68 };
69 }
70
71 /* outsource eval */
72 if (data->opts.eval_vcodec){
73 return data->opts.eval_vcodec(data->S,
74 shmifsrv_client_type(data->C), &vb, data->opts.tag);
75 }
76
77 return (struct a12_vframe_opts){
78 .method = VFRAME_METHOD_DZSTD,
79 .bias = VFRAME_BIAS_BALANCED
80 };
81 }
82
/*
 * Declared here but not referenced in the visible part of this file.
 * NOTE(review): presumably meant for the (INCOMPLETE) checksum cache lookup
 * in incoming_bhandler, which talks about going to base64 - confirm.
 */
extern uint8_t* arcan_base64_encode(
	const uint8_t* data, size_t inl, size_t* outl, enum arcan_memhint hint);
85
86 /*
87 * the bhandler -related calls are already running inside critical as
88 * they come from the data parsing itself.
89 */
dispatch_bdata(struct a12_state * S,int fd,int type,struct shmifsrv_thread_data * D)90 static void dispatch_bdata(
91 struct a12_state* S, int fd, int type, struct shmifsrv_thread_data* D)
92 {
93 struct shmifsrv_client* srv_cl = D->C;
94 switch(type){
95 case A12_BTYPE_STATE:
96 shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
97 .category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_RESTORE}, fd);
98 break;
99 /* There is a difference to the local case here in that the event which carries
100 * the desired size etc. will be sent in advance, thus there will be two times
101 * the number of FONTHINTS, which can result in glitches. This may cost an
102 * update / redraw, but allows blocking font transfers and still maintaing the
103 * right size. */
104 case A12_BTYPE_FONT:
105 shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
106 .category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_FONTHINT,
107 .tgt.ioevs[1].iv = 1, .tgt.ioevs[2].fv = D->font_sz, .tgt.ioevs[3].iv = -1},
108 fd);
109 break;
110 /* the [4].iv here is an indication that the font should be appended to the
111 * set used and defined by the first */
112 case A12_BTYPE_FONT_SUPPL:
113 shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
114 .category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_FONTHINT,
115 .tgt.ioevs[1].iv = 1, .tgt.ioevs[2].fv = D->font_sz, .tgt.ioevs[3].iv = -1,
116 .tgt.ioevs[4].iv = 1}, fd
117 );
118 break;
119 /* Another subtle difference similar to the fonthint detail is that the blob
120 * here actually lacks the identifier string, even if it was set previously.
121 * If that turns out to be an issue (the requirement is that client should not
122 * care/trust the identifier regardless, as it is mainly used as a UI hint and
123 * whatever parser should be tolerant or indicative of how it didn't understand
124 * the data. Should this be too much of an issue, the transfer- token system
125 * could be used to pair the event */
126 case A12_BTYPE_BLOB:
127 shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
128 .category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_BCHUNK_IN}, fd);
129 break;
130 default:
131 a12int_trace(A12_TRACE_SYSTEM,
132 "kind=error:status=EBADTYPE:message=%d", type);
133 break;
134 }
135
136 close(fd);
137 }
138
incoming_bhandler(struct a12_state * S,struct a12_bhandler_meta md,void * tag)139 static struct a12_bhandler_res incoming_bhandler(
140 struct a12_state* S, struct a12_bhandler_meta md, void* tag)
141 {
142 struct a12_bhandler_res res = {
143 .fd = -1,
144 .flag = A12_BHANDLER_DONTWANT
145 };
146
147 /* early out case - on completed / on cancelled, the descriptor either
148 * comes from the temp or the cache so no unlink is needed, just fwd+close */
149 if (md.fd != -1){
150
151 /* these events cover only one direction, the other (_OUT, _STORE, ...)
152 * requires this side to be proactive when receiving the event, forward the
153 * empty output descriptor and use the bstream-queue command combined with
154 * flock- excl/nonblock to figure out when done. */
155 if (md.dcont && md.dcont->user &&
156 !md.streaming && md.state != A12_BHANDLER_CANCELLED){
157 a12int_trace(A12_TRACE_BTRANSFER,
158 "kind=accept:ch=%d:stream=%"PRIu64, md.channel, md.streamid);
159 dispatch_bdata(S, md.fd, md.type, md.dcont->user);
160 }
161 /* already been dispatched as a pipe */
162 else
163 close(md.fd);
164
165 return res;
166 }
167
168 /* So the handler wants a descriptor for us to store or stream the transfer
169 * into. If it is streaming, a pipe is sufficient and we can start the fwd
170 * immediately. */
171 if (md.streaming){
172 int fd[2];
173 if (-1 != pipe(fd)){
174 res.flag = A12_BHANDLER_NEWFD;
175 res.fd = fd[1];
176 dispatch_bdata(S, md.fd, res.fd, md.dcont->user);
177 }
178 return res;
179 }
180
181 /* INCOMPLETE
182 * If there is a !0 checksum and a cache_dir has been set, check the cache for
183 * a possible match by going to base64 and try to open. If successful, update
184 * its timestamp, return that it was cached and trigger dispatch_bdata */
185 size_t i = 0;
186 for (; i < 16; i++){
187 if (md.checksum[i] != 0){
188 break;
189 }
190 }
191 if (0 && i == 16){
192 a12int_trace(A12_TRACE_MISSING,
193 "btransfer cache-lookup and forward");
194 res.flag = A12_BHANDLER_CACHED;
195 return res;
196 }
197
198 /*
199 * Last case, real file with a known destination type and size. Since there is
200 * a format requirement that the non-streaming type is seekable, we only have a
201 * memfd- like approach if the temp-dir creation fails to work with ftruncate
202 * being too large. With the glory that is overcommit and OMK, we might just
203 * DGAF.
204 */
205 /* INCOMPLETE:
206 * we need a mkstemp that works with a dirfd as well and a mkdstemp for the
207 * netproxy tool, so just abuse /tmp for the time being and then add them to
208 * the src/platform/...
209 */
210 char pattern[] = {"/tmp/anetb_XXXXXX"};
211 int fd = mkstemp(pattern);
212 if (-1 == fd){
213 a12int_trace(A12_TRACE_ALLOC, "source=btransfer:kind=eperm");
214 return res;
215 }
216
217 /* In patient wait for funlinkat, enjoy the race.. */
218 unlink(pattern);
219
220 /* Note: the size field comes across a privilege barrier and can be > very <
221 * large, possible allow an option to set an upper cap and reject single
222 * transfers over a certain limit */
223 if (-1 == ftruncate(fd, md.known_size)){
224 a12int_trace(A12_TRACE_ALLOC, "source=btransfer:kind=enospace");
225 close(fd);
226 }
227 else {
228 res.fd = fd;
229 res.flag = A12_BHANDLER_NEWFD;
230 }
231
232 return res;
233 }
234
setup_descriptor_store(struct shmifsrv_thread_data * data,struct shmifsrv_client * cl,struct arcan_event * ev)235 static void setup_descriptor_store(struct shmifsrv_thread_data* data,
236 struct shmifsrv_client* cl, struct arcan_event* ev)
237 {
238 a12int_trace(A12_TRACE_MISSING, "descriptor_store_setup");
239 }
240
redirect_exit(struct shmifsrv_client * C,int level,const char * path)241 static void redirect_exit(
242 struct shmifsrv_client* C, int level, const char* path)
243 {
244 if (!path)
245 return;
246
247 struct arcan_event ev = {
248 .category = EVENT_TARGET,
249 .tgt.kind = TARGET_COMMAND_DEVICE_NODE,
250 .tgt.ioevs[0].iv = -1,
251 .tgt.ioevs[1].iv = level,
252 /* this will ignore the GUID setting */
253 };
254
255 snprintf(ev.tgt.message, COUNT_OF(ev.tgt.message), "%s", path);
256 shmifsrv_enqueue_event(C, &ev, -1);
257 a12int_trace(A12_TRACE_EVENT, "kind=redirect:destination=%s", path);
258 }
259
260 /*
261 * [THREADING]
262 * Called from within a _lock / _unlock block
263 */
on_srv_event(struct arcan_shmif_cont * cont,int chid,struct arcan_event * ev,void * tag)264 static void on_srv_event(
265 struct arcan_shmif_cont* cont, int chid, struct arcan_event* ev, void* tag)
266 {
267 struct shmifsrv_thread_data* data = tag;
268 if (!cont){
269 a12int_trace(A12_TRACE_SYSTEM, "kind=error:type=EINVALCH:val=%d", chid);
270 return;
271 }
272 struct shmifsrv_client* srv_cl = data->C;
273
274 /*
275 * cache this so we can re-inject on font hints where there is a descriptor as
276 * that is delivered as a sideband via btransfer
277 */
278 if (ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_FONTHINT){
279 data->font_sz = ev->tgt.ioevs[2].fv;
280 ev->tgt.ioevs[1].iv = 0;
281 ev->tgt.ioevs[0].iv = -1;
282
283 /* mask continuation entirely */
284 if (ev->tgt.ioevs[4].iv)
285 return;
286 }
287
288 /*
289 * when activated (and we have another possible known connection point)
290 * set that as the client fallback, so that we can do both explicit
291 * migration and crash recovery
292 */
293 if (data->opts.devicehint_cp && chid == 0 &&
294 ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_ACTIVATE){
295 a12int_trace(A12_TRACE_EVENT,
296 "kind=activate:fallback_to:%s", data->opts.devicehint_cp);
297 struct arcan_event ev = {
298 .category = EVENT_TARGET,
299 .tgt.kind = TARGET_COMMAND_DEVICE_NODE,
300 .tgt.ioevs[0].iv = -1,
301 .tgt.ioevs[1].iv = 4 /* fallback */
302 };
303
304 snprintf(ev.tgt.message,
305 COUNT_OF(ev.tgt.message), "%s", data->opts.devicehint_cp);
306 shmifsrv_enqueue_event(srv_cl, &ev, -1);
307 }
308
309 a12int_trace(A12_TRACE_EVENT,
310 "kind=forward:chid=%d:eventstr=%s", chid, arcan_shmif_eventstr(ev, NULL, 0));
311
312 /*
313 * Some events are valuable to intercept and tweak:
314 *
315 * EXIT - unconditional termination is problematic, if we have a local connection-
316 * point defined already, it is better to send the window there for the primary
317 * segment.
318 *
319 * The thing is, we can't guarantee that the EXIT event will arrive if the
320 * channel is forcibly closed etc. so it also needs to be accounted for later
321 */
322 if (data->opts.redirect_exit && chid == 0 &&
323 ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_EXIT){
324 redirect_exit(srv_cl, 2, data->opts.redirect_exit);
325 return;
326 }
327
328 /*
329 * NEWEVENT - necessary, need to map to new channels.
330 */
331 if (ev->category != EVENT_TARGET || ev->tgt.kind != TARGET_COMMAND_NEWSEGMENT){
332 if (!srv_cl){
333 a12int_trace(A12_TRACE_SYSTEM, "kind=error:type=EINVAL:val=%d", chid);
334 }
335 else{
336 /* The descriptor events are more complicated as they can be either incoming or
337 * outgoing. When we talk _STORE or BCHUNK_OUT events arriving here, they won't
338 * actually have a descriptor with them, we need to create the temp. resource,
339 * add the descriptor and then a12_enqueue_bstream accordingly. In addition,
340 * we actually need to pair the token that was added in the descriptor slot so
341 * the other side know where to dump it into */
342 if (arcan_shmif_descrevent(ev)){
343 if (ev->tgt.kind == TARGET_COMMAND_STORE ||
344 ev->tgt.kind == TARGET_COMMAND_BCHUNK_OUT){
345 a12int_trace(A12_TRACE_BTRANSFER, "kind=status:message=outgoing_bchunk");
346 setup_descriptor_store(data, srv_cl, ev);
347 }
348 else
349 shmifsrv_enqueue_event(srv_cl, ev, ev->tgt.ioevs[0].iv);
350 }
351 else
352 shmifsrv_enqueue_event(srv_cl, ev, -1);
353 }
354 return;
355 }
356
357 /* First we need a copy of the current processing thread structure */
358 struct shmifsrv_thread_data* new_data =
359 malloc(sizeof(struct shmifsrv_thread_data));
360 if (!new_data){
361 a12int_trace(A12_TRACE_SYSTEM,
362 "kind=error:type=ENOMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
363 a12_set_channel(data->S, chid);
364 a12_channel_close(data->S);
365 return;
366 }
367 *new_data = *data;
368
369 /* Then we forward the subsegment to the local client */
370 new_data->chid = ev->tgt.ioevs[0].iv;
371 new_data->C = shmifsrv_send_subsegment(
372 srv_cl, ev->tgt.ioevs[2].iv, 0, 32, 32, chid, ev->tgt.ioevs[3].uiv);
373
374 /* That can well fail (descriptors etc.) */
375 if (!new_data->C){
376 a12int_trace(A12_TRACE_SYSTEM,
377 "kind=error:type=ENOSRVMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
378 free(new_data);
379 a12_set_channel(data->S, chid);
380 a12_channel_close(data->S);
381 }
382
383 /* Attach to a new processing thread, and tie this channel to the 'fake'
384 * segment that we only use to extract back the events and so on to for now,
385 * when we deal with 'output' segments as well, this is where we'd need to do
386 * the encoding dance */
387 new_data->fake.user = new_data;
388 a12_set_destination(data->S, &new_data->fake, new_data->chid);
389
390 if (!spawn_thread(new_data)){
391 a12int_trace(A12_TRACE_SYSTEM,
392 "kind=error:type=ENOTHRDMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
393 free(new_data);
394 a12_set_channel(data->S, chid);
395 a12_channel_close(data->S);
396 return;
397 }
398
399 a12int_trace(A12_TRACE_ALLOC,
400 "kind=new_channel:src_ch=%d:dst_ch=%d", chid, (int)new_data->chid);
401 }
402
/*
 * Audio callback handed to shmifsrv_audio, [tag] is the a12 state.
 * Forwarding is currently switched off by the unconditional return, the raw
 * aframe submission below is left in place for when it is enabled again.
 */
static void on_audio_cb(shmif_asample* buf,
	size_t n_samples, unsigned channels, unsigned rate, void* tag)
{
	return;

	struct a12_state* S = tag;
	struct a12_aframe_cfg cfg = {
		.channels = channels,
		.samplerate = rate
	};
	struct a12_aframe_opts aopts = {
		.method = AFRAME_METHOD_RAW
	};

	a12_channel_aframe(S, buf, n_samples, cfg, aopts);
}
418
/*
 * Worker for one segment, [inarg] is a heap allocated struct
 * shmifsrv_thread_data that this thread frees before returning.
 *
 * Each pass of the loop:
 *  1. wakes the main thread through kill_fd if the previous pass queued data
 *  2. polls the client handle and kill_fd with a short timeout
 *  3. drains client events, either consumed locally by shmifsrv_process_event
 *     or enqueued on the a12 channel
 *  4. forwards pending video (subject to backpressure) and audio buffers
 *
 * On the way out the a12 channel is closed, the main thread is woken one last
 * time and n_segments is decremented. Always returns NULL.
 */
static void* client_thread(void* inarg)
{
	struct shmifsrv_thread_data* data = inarg;
	static const short errmask = POLLERR | POLLNVAL | POLLHUP;

	/* the second initializer is positional, so errmask ends up in .events */
	struct pollfd pfd[2] = {
		{ .fd = shmifsrv_client_handle(data->C), .events = POLLIN | errmask },
		{ .fd = data->kill_fd, errmask }
	};

	/* We don't have a monitorable trigger for inbound video/audio frames, so some
	 * timeout is in order for the time being. It might be useful to add that kind
	 * of signalling to shmif though */
	size_t poll_step = 4;

	/* the ext-io thread might be sleeping waiting for input, when we finished
	 * one pass/burst and know there is queued data to be sent, wake it up */
	bool dirty = false;

	/* no-op unless opts.redirect_exit is set; 4 is the same level value that
	 * on_srv_event labels 'fallback' */
	redirect_exit(data->C, 4, data->opts.redirect_exit);

	for(;;){
		if (dirty){
			write(data->kill_fd, &data->chid, 1);
			dirty = false;
		}

		if (-1 == poll(pfd, 2, poll_step)){
			if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR){
				a12int_trace(A12_TRACE_SYSTEM,
					"kind=error:status=EPOLL:message=%s", strerror(errno));
				break;
			}
		}

		/* kill socket or poll socket died */
		if ((pfd[0].revents & errmask) || (pfd[1].revents & errmask)){
			a12int_trace(A12_TRACE_SYSTEM,
				"kind=error:status=EBADFD:client=%d:killsig=%d", pfd[0].revents, pfd[1].revents);
			break;
		}

		struct arcan_event ev;

		while (shmifsrv_dequeue_events(data->C, &ev, 1)){
			/* descriptor carrying events from the client are dropped, not forwarded */
			if (arcan_shmif_descrevent(&ev)){
				a12int_trace(A12_TRACE_SYSTEM,
					"kind=error:status=EINVAL:message=client->server descriptor event");
				continue;
			}

			/* server-consumed or should be forwarded? */
			BEGIN_CRITICAL(&giant_lock, "client_event");
			if (shmifsrv_process_event(data->C, &ev)){
				a12int_trace(A12_TRACE_EVENT,
					"kind=consumed:channel=%d:eventstr=%s",
					data->chid, arcan_shmif_eventstr(&ev, NULL, 0)
				);
			}
			else {
				a12_set_channel(data->S, data->chid);
				a12int_trace(A12_TRACE_EVENT, "kind=forward:channel=%d:eventstr=%s",
					data->chid, arcan_shmif_eventstr(&ev, NULL, 0));
				a12_channel_enqueue(data->S, &ev);
				dirty = true;
			}
			END_CRITICAL(&giant_lock);
		}

		/* note that 'break' in this loop only leaves the inner while and thus
		 * defers the buffer to a later pass, only 'goto out' ends the thread */
		int pv;
		while ((pv = shmifsrv_poll(data->C)) != CLIENT_NOT_READY){
			/* Dead client, send the close message and that should cascade down the rest
			 * and kill relevant sockets. */
			if (pv == CLIENT_DEAD){
				a12int_trace(A12_TRACE_EVENT, "client=dead");
				goto out;
			}

			/* the shared buffer_out marks if we should wait a bit before releasing the
			 * client as to not keep oversaturating with incoming video frames, we could
			 * threshold this to something more reasonable, or just have two congestion
			 * levels, one for focused channel and one lower for the rest */
			if (pv & CLIENT_VBUFFER_READY){
				if (atomic_load(&buffer_out) > 0){
					break;
				}

				/* check the congestion window - there are many more options for congestion
				 * control here, and the tuning is not figured out. One venue would be to
				 * track which channel has a segment with focus, and prioritise those higher.
				 * NOTE(review): the iostat is sampled without holding giant_lock - confirm
				 * that a12_state_iostat is safe to call that way */
				struct a12_iostat stat = a12_state_iostat(data->S);
				struct shmifsrv_vbuffer vb = shmifsrv_video(data->C);

				if (data->opts.vframe_block &&
					stat.vframe_backpressure >= data->opts.vframe_soft_block){

					/* the soft block caps at ~20% of buffer difs for large buffers, the other
					 * option is to have aggregation and dirty rectangles here, then invalidate if
					 * they accumulate to cover all */
					size_t px_c = vb.w * vb.h;
					size_t reg_c =
						(vb.region.x2 - vb.region.x1) * (vb.region.y2 - vb.region.y1);
					bool allow_soft = vb.flags.subregion &&
						(reg_c < px_c) && ((float)reg_c / (float)px_c) <= 0.2;

					/* above the hard limit, or a too large update above the soft one: defer */
					if (stat.vframe_backpressure >= data->opts.vframe_block || !allow_soft){
						a12int_trace(A12_TRACE_VDETAIL,
							"vbuffer=defer:congestion=%zu:soft=:%zu:limit=%zu",
							stat.vframe_backpressure, data->opts.vframe_soft_block,
							data->opts.vframe_block
						);
						break;
					}
				}

				/* two option, one is to map the dma-buf ourselves and do the readback, or with
				 * streams map the stream and convert to h264 on gpu, but easiest now is to
				 * just reject and let the caller do the readback. this is currently done by
				 * default in shmifsrv.*/
				BEGIN_CRITICAL(&giant_lock, "video-buffer");
				a12_set_channel(data->S, data->chid);

				/* vopts_from_segment here lets the caller pick compression parameters (coarse),
				 * including the special 'defer this frame until later' */
				a12_channel_vframe(data->S, &vb, vopts_from_segment(data, vb));
				dirty = true;
				END_CRITICAL(&giant_lock);
				stat = a12_state_iostat(data->S);
				a12int_trace(A12_TRACE_VDETAIL,
					"vbuffer=release:time_ms=%zu:time_ms_px=%.4f:congestion=%zu",
					stat.ms_vframe, stat.ms_vframe_px,
					stat.vframe_backpressure
				);

				/* the other part is to, after a certain while of VBUFFER_READY but not any
				 * buffer- out space, track if any of our segments have focus, if so, inject it
				 * anyhow (should help responsiveness), increase video compression time-
				 * tradeoff and defer the step stage so the client gets that we are limited */
				shmifsrv_video_step(data->C);
			}

			/* send audio anyway, as not all clients are providing audio and there is less
			 * tricks that can be applied from the backpressured client, dynamic resampling
			 * and heavier compression is an option here as well though */
			if (pv & CLIENT_ABUFFER_READY){
				a12int_trace(A12_TRACE_AUDIO, "audio-buffer");
				BEGIN_CRITICAL(&giant_lock, "audio_buffer");
				a12_set_channel(data->S, data->chid);
				shmifsrv_audio(data->C, on_audio_cb, data->S);
				dirty = true;
				END_CRITICAL(&giant_lock);
			}
		}

	}

	/* reached both through 'goto out' and by breaking out of the for loop */
out:
	BEGIN_CRITICAL(&giant_lock, "client_death");
	a12_set_channel(data->S, data->chid);
	a12_channel_close(data->S);
	write(data->kill_fd, &data->chid, 1);
	a12int_trace(A12_TRACE_SYSTEM, "client died");
	END_CRITICAL(&giant_lock);

	/* only shut-down everything on the primary- segment failure */
	if (data->chid == 0 && data->kill_fd != -1)
		close(data->kill_fd);

	/* don't kill the shmifsrv client session for the primary one */
	if (data->chid != 0){
		shmifsrv_free(data->C, SHMIFSRV_FREE_NO_DMS);
	}

	/* lets the shutdown wait in a12helper_a12cl_shmifsrv make progress */
	atomic_fetch_sub(&n_segments, 1);

	free(inarg);
	return NULL;
}
595
spawn_thread(struct shmifsrv_thread_data * inarg)596 static bool spawn_thread(struct shmifsrv_thread_data* inarg)
597 {
598 pthread_t pth;
599 pthread_attr_t pthattr;
600 pthread_attr_init(&pthattr);
601 pthread_attr_setdetachstate(&pthattr, PTHREAD_CREATE_DETACHED);
602 atomic_fetch_add(&n_segments, 1);
603
604 if (-1 == pthread_create(&pth, &pthattr, client_thread, inarg)){
605 BEGIN_CRITICAL(&giant_lock, "cleanup-spawn");
606 atomic_fetch_sub(&n_segments, 1);
607 a12int_trace(A12_TRACE_ALLOC, "could not spawn thread");
608 free(inarg);
609 END_CRITICAL(&giant_lock);
610 return false;
611 }
612
613 return true;
614 }
615
a12helper_a12cl_shmifsrv(struct a12_state * S,struct shmifsrv_client * C,int fd_in,int fd_out,struct a12helper_opts opts)616 void a12helper_a12cl_shmifsrv(struct a12_state* S,
617 struct shmifsrv_client* C, int fd_in, int fd_out, struct a12helper_opts opts)
618 {
619 uint8_t* outbuf = NULL;
620 size_t outbuf_sz = 0;
621
622
623 /* tie an empty context as channel destination, we use this as a type- wrapper
624 * for the shmifsrv_client now, this logic is slightly different on the
625 * shmifsrv_client side. */
626 struct arcan_shmif_cont fake = {};
627 a12_set_destination(S, &fake, 0);
628 a12_set_bhandler(S, incoming_bhandler, &opts);
629
630 int pipe_pair[2];
631 if (-1 == pipe(pipe_pair))
632 return;
633
634 /*
635 * Spawn the processing- thread that will take care of a shmifsrv_client
636 */
637 struct shmifsrv_thread_data* arg;
638 arg = malloc(sizeof(struct shmifsrv_thread_data));
639 if (!arg){
640 close(pipe_pair[0]);
641 close(pipe_pair[1]);
642 return;
643 }
644
645 /*
646 * the kill_fd will be shared among the other segments, so it is only
647 * here where there is reason to clean it up like this
648 */
649 *arg = (struct shmifsrv_thread_data){
650 .C = C,
651 .S = S,
652 .kill_fd = pipe_pair[1],
653 .opts = opts,
654 .chid = 0
655 };
656 if (!spawn_thread(arg)){
657 close(pipe_pair[0]);
658 close(pipe_pair[1]);
659 return;
660 }
661 fake.user = arg;
662
663 /* Socket in/out liveness, buffer flush / dispatch */
664 size_t n_fd = 2;
665 static const short errmask = POLLERR | POLLNVAL | POLLHUP;
666 struct pollfd fds[3] = {
667 { .fd = fd_in, .events = POLLIN | errmask},
668 { .fd = pipe_pair[0], .events = POLLIN | errmask},
669 { .fd = fd_out, .events = POLLOUT | errmask}
670 };
671
672 /* flush authentication leftovers */
673 a12_unpack(S, NULL, 0, arg, on_srv_event);
674
675 uint8_t inbuf[9000];
676 while(a12_ok(S) && -1 != poll(fds, n_fd, -1)){
677
678 /* death by poll? */
679 if ((fds[0].revents & errmask) ||
680 (fds[1].revents & errmask) ||
681 (n_fd == 3 && (fds[2].revents & errmask))){
682 break;
683 }
684
685 /* flush wakeup data from threads */
686 if (fds[1].revents){
687 if (a12_trace_targets & A12_TRACE_TRANSFER){
688 BEGIN_CRITICAL(&giant_lock, "flush-iopipe");
689 a12int_trace(
690 A12_TRACE_TRANSFER, "client thread wakeup");
691 END_CRITICAL(&giant_lock);
692 }
693
694 read(fds[1].fd, inbuf, 9000);
695 }
696
697 /* pending out, flush or grab next out buffer */
698 if (n_fd == 3 && (fds[2].revents & POLLOUT) && outbuf_sz){
699 ssize_t nw = write(fd_out, outbuf, outbuf_sz);
700
701 if (a12_trace_targets & A12_TRACE_TRANSFER){
702 BEGIN_CRITICAL(&giant_lock, "buffer-send");
703 a12int_trace(
704 A12_TRACE_TRANSFER, "send %zd (left %zu) bytes", nw, outbuf_sz);
705 END_CRITICAL(&giant_lock);
706 }
707
708 if (nw > 0){
709 outbuf += nw;
710 outbuf_sz -= nw;
711 }
712 }
713
714 /* then read and unpack incoming data, note that in the on_srv_event handler
715 * we ALREADY HOLD THE LOCK so it is a deadlock condition to try and lock there */
716 if (fds[0].revents & POLLIN){
717 ssize_t nr = recv(fd_in, inbuf, 9000, 0);
718 if (-1 == nr && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR){
719 if (a12_trace_targets & A12_TRACE_SYSTEM){
720 BEGIN_CRITICAL(&giant_lock, "data error");
721 a12int_trace(A12_TRACE_SYSTEM, "data-in, error: %s", strerror(errno));
722 END_CRITICAL(&giant_lock);
723 }
724 break;
725 }
726 /* pollin- says yes, but recv says no? */
727 if (nr == 0){
728 BEGIN_CRITICAL(&giant_lock, "socket closed");
729 a12int_trace(A12_TRACE_SYSTEM, "data-in, other side closed connection");
730 END_CRITICAL(&giant_lock);
731 break;
732 }
733
734 BEGIN_CRITICAL(&giant_lock, "unpack-event");
735 a12int_trace(A12_TRACE_TRANSFER, "unpack %zd bytes", nr);
736 a12_unpack(S, inbuf, nr, arg, on_srv_event);
737 END_CRITICAL(&giant_lock);
738 }
739
740 if (!outbuf_sz){
741 BEGIN_CRITICAL(&giant_lock, "get-buffer");
742 outbuf_sz = a12_flush(S, &outbuf, 0);
743 END_CRITICAL(&giant_lock);
744 }
745 n_fd = outbuf_sz > 0 ? 3 : 2;
746 }
747
748 a12int_trace(A12_TRACE_SYSTEM, "(srv) shutting down connection");
749 close(pipe_pair[0]);
750 while(atomic_load(&n_segments) > 0){}
751 if (!a12_free(S)){
752 a12int_trace(A12_TRACE_ALLOC, "error cleaning up a12 context");
753 }
754
755 /* only the primary segment left, we will try and migrate that one,
756 * sending the DEVICE_NODE migrate event and performing a non-dms drop */
757 redirect_exit(C, 2, opts.redirect_exit);
758 }
759