/*
 * Copyright: 2018-2020, Bjorn Stahl
 * License: 3-Clause BSD
 * Description: Implements a support wrapper for the a12 function patterns
 * used to map multiple incoming a12 clients to one local connection point.
 */
#include <arcan_shmif.h>
#include <arcan_shmif_server.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <poll.h>
#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdatomic.h>
#include <pthread.h>

#include "a12.h"
#include "a12_int.h"
#include "a12_helper.h"
#include "arcan_mem.h"

struct shmifsrv_thread_data {
	struct shmifsrv_client* C;
	struct a12_state* S;
	struct arcan_shmif_cont fake;
	struct a12helper_opts opts;
	float font_sz;
	int kill_fd;
	uint8_t chid;
};

/* [THREADING]
 * We use one giant lock since the a12 state machine does not cope well with
 * being called from multiple threads, mostly due to the buffer function and
 * the channel- state tracker. That is not impossible to fix, just easier to
 * solve like this for now.
 *
 * The buffer_out counter is simply a way for the main feed thread to expose
 * the amount of currently pending buffer output.
 */
static bool spawn_thread(struct shmifsrv_thread_data* inarg);
static pthread_mutex_t giant_lock = PTHREAD_MUTEX_INITIALIZER;

static const char* last_lock;
static _Atomic volatile size_t buffer_out = 0;
static _Atomic volatile uint8_t n_segments;

#define BEGIN_CRITICAL(X, Y) do{pthread_mutex_lock(X); last_lock = Y;} while(0);
#define END_CRITICAL(X) do{pthread_mutex_unlock(X);} while(0);

/*
 * Figure out encoding parameters based on client type and buffer parameters.
 * This is the first heuristic catch-all to later feed in backpressure,
 * bandwidth and load estimation parameters.
 */
static struct a12_vframe_opts vopts_from_segment(
	struct shmifsrv_thread_data* data, struct shmifsrv_vbuffer vb)
{
/* force tpack regardless, tpack doesn't have tuning like this */
	if (vb.flags.tpack){
		a12int_trace(A12_TRACE_VIDEO, "tpack segment");
		return (struct a12_vframe_opts){
			.method = VFRAME_METHOD_TPACK_ZSTD
		};
	}

/* outsource eval */
	if (data->opts.eval_vcodec){
		return data->opts.eval_vcodec(data->S,
			shmifsrv_client_type(data->C), &vb, data->opts.tag);
	}

	return (struct a12_vframe_opts){
		.method = VFRAME_METHOD_DZSTD,
		.bias = VFRAME_BIAS_BALANCED
	};
}
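
/*
 * A minimal sketch, kept as a comment only, of the kind of callback a caller
 * could plug into opts.eval_vcodec. The authoritative prototype lives in
 * a12_helper.h; the shape below is inferred from the call-site above, and
 * VFRAME_METHOD_H264 assumes that codec support was enabled at build time:
 *
 *  static struct a12_vframe_opts pick_codec(struct a12_state* S,
 *  	int segid, struct shmifsrv_vbuffer* vb, void* tag)
 *  {
 *  	if (segid == SEGID_MEDIA && !vb->flags.subregion)
 *  		return (struct a12_vframe_opts){
 *  			.method = VFRAME_METHOD_H264,
 *  			.bias = VFRAME_BIAS_BALANCED
 *  		};
 *
 *  	return (struct a12_vframe_opts){
 *  		.method = VFRAME_METHOD_DZSTD,
 *  		.bias = VFRAME_BIAS_BALANCED
 *  	};
 *  }
 */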

extern uint8_t* arcan_base64_encode(
	const uint8_t* data, size_t inl, size_t* outl, enum arcan_memhint hint);

/*
 * the bhandler- related calls are already running inside the critical section
 * as they come from the data parsing itself.
 */
static void dispatch_bdata(
	struct a12_state* S, int fd, int type, struct shmifsrv_thread_data* D)
{
	struct shmifsrv_client* srv_cl = D->C;
	switch(type){
	case A12_BTYPE_STATE:
		shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
			.category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_RESTORE}, fd);
	break;
/* This differs from the local case in that the event carrying the desired
 * size etc. is sent in advance, so there will be twice the number of FONTHINT
 * events, which can result in glitches. This may cost an update / redraw, but
 * allows blocking font transfers while still maintaining the right size. */
	case A12_BTYPE_FONT:
		shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
			.category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_FONTHINT,
			.tgt.ioevs[1].iv = 1, .tgt.ioevs[2].fv = D->font_sz, .tgt.ioevs[3].iv = -1},
		fd);
	break;
/* the [4].iv here is an indication that the font should be appended to the
 * set used and defined by the first */
	case A12_BTYPE_FONT_SUPPL:
		shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
			.category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_FONTHINT,
			.tgt.ioevs[1].iv = 1, .tgt.ioevs[2].fv = D->font_sz, .tgt.ioevs[3].iv = -1,
			.tgt.ioevs[4].iv = 1}, fd
		);
	break;
/* Another subtle difference, similar to the fonthint detail, is that the blob
 * here actually lacks the identifier string, even if one was set previously.
 * This should be fine, as the requirement is that the client neither cares
 * about nor trusts the identifier: it is mainly used as a UI hint, and
 * whatever parser consumes the data should be tolerant of, or indicate, that
 * it did not understand it. Should this turn out to be too much of an issue,
 * the transfer- token system could be used to pair the event with the data. */
	case A12_BTYPE_BLOB:
		shmifsrv_enqueue_event(srv_cl, &(struct arcan_event){
				.category = EVENT_TARGET, .tgt.kind = TARGET_COMMAND_BCHUNK_IN}, fd);
	break;
	default:
		a12int_trace(A12_TRACE_SYSTEM,
			"kind=error:status=EBADTYPE:message=%d", type);
	break;
	}

	close(fd);
}

static struct a12_bhandler_res incoming_bhandler(
	struct a12_state* S, struct a12_bhandler_meta md, void* tag)
{
	struct a12_bhandler_res res = {
		.fd = -1,
		.flag = A12_BHANDLER_DONTWANT
	};

/* early out case - on completed / on cancelled, the descriptor either comes
 * from the temp or the cache so no unlink is needed, just fwd+close */
	if (md.fd != -1){

/* these events cover only one direction, the other (_OUT, _STORE, ...)
 * requires this side to be proactive when receiving the event, forward the
 * empty output descriptor and use the bstream-queue command combined with
 * flock- excl/nonblock to figure out when done. */
		if (md.dcont && md.dcont->user &&
			!md.streaming && md.state != A12_BHANDLER_CANCELLED){
			a12int_trace(A12_TRACE_BTRANSFER,
				"kind=accept:ch=%d:stream=%"PRIu64, md.channel, md.streamid);
			dispatch_bdata(S, md.fd, md.type, md.dcont->user);
		}
/* already been dispatched as a pipe */
		else
			close(md.fd);

		return res;
	}

/* So the handler wants a descriptor for us to store or stream the transfer
 * into. If it is streaming, a pipe is sufficient and we can start the fwd
 * immediately. */
	if (md.streaming){
		int fd[2];
		if (-1 != pipe(fd)){
			res.flag = A12_BHANDLER_NEWFD;
			res.fd = fd[1];
			dispatch_bdata(S, fd[0], md.type, md.dcont->user);
		}
		return res;
	}

/* INCOMPLETE
 * If there is a !0 checksum and a cache_dir has been set, check the cache for
 * a possible match by going to base64 and try to open. If successful, update
 * its timestamp, return that it was cached and trigger dispatch_bdata */
	size_t i = 0;
	for (; i < 16; i++){
		if (md.checksum[i] != 0){
			break;
		}
	}
	if (0 && i == 16){
		a12int_trace(A12_TRACE_MISSING,
			"btransfer cache-lookup and forward");
		res.flag = A12_BHANDLER_CACHED;
		return res;
	}
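
/* A rough sketch, assuming a hypothetical cache_dirfd option, of the lookup
 * the INCOMPLETE block above describes: name the entry after the base64 of
 * the checksum, openat() it, bump the timestamp and dispatch it like a
 * completed transfer. Kept as a comment until the option actually exists:
 *
 *  size_t b64_sz;
 *  uint8_t* b64 = arcan_base64_encode(md.checksum, 16, &b64_sz, 0);
 *  int cfd = openat(cache_dirfd, (char*) b64, O_RDONLY);
 *  arcan_mem_free(b64);
 *  if (-1 != cfd){
 *  	futimens(cfd, NULL);
 *  	dispatch_bdata(S, cfd, md.type, md.dcont->user);
 *  	res.flag = A12_BHANDLER_CACHED;
 *  	return res;
 *  }
 */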

/*
 * Last case, real file with a known destination type and size. Since there is
 * a format requirement that the non-streaming type is seekable, the fallback
 * if temp-dir creation stops working would be a memfd- like approach, with the
 * risk of the ftruncate size being too large for that. With the glory that is
 * overcommit and the OOM killer, we might just DGAF.
 */
/* INCOMPLETE:
 * we need a mkstemp that works with a dirfd as well and a mkdstemp for the
 * netproxy tool, so just abuse /tmp for the time being and then add them to
 * the src/platform/...
 */
	char pattern[] = {"/tmp/anetb_XXXXXX"};
	int fd = mkstemp(pattern);
	if (-1 == fd){
		a12int_trace(A12_TRACE_ALLOC, "source=btransfer:kind=eperm");
		return res;
	}

/* While we patiently wait for funlinkat, enjoy the race.. */
	unlink(pattern);
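
/* A stand-in sketch for that missing dirfd- aware mkstemp (hypothetical
 * helper, its name and placement in src/platform are assumptions; it expects
 * a pattern ending in six 'X' and uses a crude random source):
 *
 *  static int mkstemp_at(int dfd, char* pattern)
 *  {
 *  	char* xs = pattern + strlen(pattern) - 6;
 *  	for (size_t retry = 0; retry < 64; retry++){
 *  		for (size_t i = 0; i < 6; i++)
 *  			xs[i] = 'a' + (rand() % 26);
 *  		int fd = openat(dfd, pattern, O_CREAT | O_EXCL | O_RDWR, 0600);
 *  		if (-1 != fd || errno != EEXIST)
 *  			return fd;
 *  	}
 *  	return -1;
 *  }
 */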

/* Note: the size field comes across a privilege barrier and can be > very <
 * large, possibly allow an option to set an upper cap and reject single
 * transfers over a certain limit */
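
/* Such a cap, with a hypothetical bsize_cap field in a12helper_opts reachable
 * through the [tag] argument, could be as simple as:
 *
 *  struct a12helper_opts* hopts = tag;
 *  if (hopts->bsize_cap && md.known_size > hopts->bsize_cap){
 *  	close(fd);
 *  	return res;
 *  }
 */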
	if (-1 == ftruncate(fd, md.known_size)){
		a12int_trace(A12_TRACE_ALLOC, "source=btransfer:kind=enospace");
		close(fd);
	}
	else {
		res.fd = fd;
		res.flag = A12_BHANDLER_NEWFD;
	}

	return res;
}

static void setup_descriptor_store(struct shmifsrv_thread_data* data,
	struct shmifsrv_client* cl, struct arcan_event* ev)
{
	a12int_trace(A12_TRACE_MISSING, "descriptor_store_setup");
}
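
/* A sketch of where this stub is heading, under the assumptions spelled out
 * in the _STORE / BCHUNK_OUT comments further down: create an empty, seekable
 * file, hand it to the client for writing, then queue it as an outgoing
 * binary stream once the client releases it (e.g. the flock- excl/nonblock
 * trick used for the inbound direction):
 *
 *  char tmp[] = "/tmp/anetb_out_XXXXXX";
 *  int fd = mkstemp(tmp);
 *  if (-1 == fd)
 *  	return;
 *  unlink(tmp);
 *  shmifsrv_enqueue_event(cl, ev, fd);
 *  track_outgoing(data, fd, ev);
 *
 * where track_outgoing is a hypothetical bookkeeping helper that keeps the
 * (fd, transfer token) pair around until the client is done writing and the
 * bstream can be queued on the right channel.
 */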

static void redirect_exit(
	struct shmifsrv_client* C, int level, const char* path)
{
	if (!path)
		return;

	struct arcan_event ev = {
		.category = EVENT_TARGET,
		.tgt.kind = TARGET_COMMAND_DEVICE_NODE,
		.tgt.ioevs[0].iv = -1,
		.tgt.ioevs[1].iv = level,
/* this will ignore the GUID setting */
		};

	snprintf(ev.tgt.message, COUNT_OF(ev.tgt.message), "%s", path);
	shmifsrv_enqueue_event(C, &ev, -1);
	a12int_trace(A12_TRACE_EVENT, "kind=redirect:destination=%s", path);
}

/*
 * [THREADING]
 * Called from within a _lock / _unlock block
 */
static void on_srv_event(
	struct arcan_shmif_cont* cont, int chid, struct arcan_event* ev, void* tag)
{
	struct shmifsrv_thread_data* data = tag;
	if (!cont){
		a12int_trace(A12_TRACE_SYSTEM, "kind=error:type=EINVALCH:val=%d", chid);
		return;
	}
	struct shmifsrv_client* srv_cl = data->C;

/*
 * cache this so we can re-inject on font hints where there is a descriptor as
 * that is delivered as a sideband via btransfer
 */
	if (ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_FONTHINT){
		data->font_sz = ev->tgt.ioevs[2].fv;
		ev->tgt.ioevs[1].iv = 0;
		ev->tgt.ioevs[0].iv = -1;

/* mask continuation entirely */
		if (ev->tgt.ioevs[4].iv)
			return;
	}

/*
 * when activated (and we have another possible known connection point)
 * set that as the client fallback, so that we can do both explicit
 * migration and crash recovery
 */
	if (data->opts.devicehint_cp && chid == 0 &&
		ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_ACTIVATE){
		a12int_trace(A12_TRACE_EVENT,
			"kind=activate:fallback_to:%s", data->opts.devicehint_cp);
		struct arcan_event ev = {
			.category = EVENT_TARGET,
			.tgt.kind = TARGET_COMMAND_DEVICE_NODE,
			.tgt.ioevs[0].iv = -1,
			.tgt.ioevs[1].iv = 4 /* fallback */
		};

		snprintf(ev.tgt.message,
			COUNT_OF(ev.tgt.message), "%s", data->opts.devicehint_cp);
		shmifsrv_enqueue_event(srv_cl, &ev, -1);
	}

	a12int_trace(A12_TRACE_EVENT,
		"kind=forward:chid=%d:eventstr=%s", chid, arcan_shmif_eventstr(ev, NULL, 0));

/*
 * Some events are valuable to intercept and tweak:
 *
 * EXIT - unconditional termination is problematic; if we have a local
 * connection- point defined already, it is better to send the window there
 * for the primary segment.
 *
 * The thing is, we can't guarantee that the EXIT event will arrive if the
 * channel is forcibly closed etc. so it also needs to be accounted for later
 */
	if (data->opts.redirect_exit && chid == 0 &&
		ev->category == EVENT_TARGET && ev->tgt.kind == TARGET_COMMAND_EXIT){
		redirect_exit(srv_cl, 2, data->opts.redirect_exit);
		return;
	}

/*
 * NEWSEGMENT - necessary, need to map to new channels.
 */
	if (ev->category != EVENT_TARGET || ev->tgt.kind != TARGET_COMMAND_NEWSEGMENT){
		if (!srv_cl){
			a12int_trace(A12_TRACE_SYSTEM, "kind=error:type=EINVAL:val=%d", chid);
		}
		else{
/* The descriptor events are more complicated as they can be either incoming
 * or outgoing. When _STORE or BCHUNK_OUT events arrive here, they won't
 * actually have a descriptor with them; we need to create the temp. resource,
 * add the descriptor and then a12_enqueue_bstream accordingly. In addition,
 * we actually need to pair the token that was added in the descriptor slot so
 * the other side knows where to dump it into */
			if (arcan_shmif_descrevent(ev)){
				if (ev->tgt.kind == TARGET_COMMAND_STORE ||
					ev->tgt.kind == TARGET_COMMAND_BCHUNK_OUT){
					a12int_trace(A12_TRACE_BTRANSFER, "kind=status:message=outgoing_bchunk");
					setup_descriptor_store(data, srv_cl, ev);
				}
				else
					shmifsrv_enqueue_event(srv_cl, ev, ev->tgt.ioevs[0].iv);
			}
			else
				shmifsrv_enqueue_event(srv_cl, ev, -1);
		}
		return;
	}

/* First we need a copy of the current processing thread structure */
	struct shmifsrv_thread_data* new_data =
		malloc(sizeof(struct shmifsrv_thread_data));
	if (!new_data){
		a12int_trace(A12_TRACE_SYSTEM,
			"kind=error:type=ENOMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
		a12_set_channel(data->S, chid);
		a12_channel_close(data->S);
		return;
	}
	*new_data = *data;

/* Then we forward the subsegment to the local client */
	new_data->chid = ev->tgt.ioevs[0].iv;
	new_data->C = shmifsrv_send_subsegment(
		srv_cl, ev->tgt.ioevs[2].iv, 0, 32, 32, chid, ev->tgt.ioevs[3].uiv);

/* That can well fail (descriptors etc.) */
	if (!new_data->C){
		a12int_trace(A12_TRACE_SYSTEM,
			"kind=error:type=ENOSRVMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
		free(new_data);
		a12_set_channel(data->S, chid);
		a12_channel_close(data->S);
		return;
	}

/* Attach to a new processing thread, and tie this channel to the 'fake'
 * segment that we only use to extract events back out of for now; when we
 * deal with 'output' segments as well, this is where we'd need to do the
 * encoding dance */
	new_data->fake.user = new_data;
	a12_set_destination(data->S, &new_data->fake, new_data->chid);

	if (!spawn_thread(new_data)){
		a12int_trace(A12_TRACE_SYSTEM,
			"kind=error:type=ENOTHRDMEM:src_ch=%d:dst_ch=%d", chid, ev->tgt.ioevs[0].iv);
/* spawn_thread frees [new_data] itself on failure */
		a12_set_channel(data->S, chid);
		a12_channel_close(data->S);
		return;
	}

	a12int_trace(A12_TRACE_ALLOC,
		"kind=new_channel:src_ch=%d:dst_ch=%d", chid, (int)new_data->chid);
}

static void on_audio_cb(shmif_asample* buf,
	size_t n_samples, unsigned channels, unsigned rate, void* tag)
{
	struct a12_state* S = tag;
	a12_channel_aframe(S, buf, n_samples,
		(struct a12_aframe_cfg){
			.channels = channels,
			.samplerate = rate
		},
		(struct a12_aframe_opts){
			.method = AFRAME_METHOD_RAW
		}
	);
}

static void* client_thread(void* inarg)
{
	struct shmifsrv_thread_data* data = inarg;
	static const short errmask = POLLERR | POLLNVAL | POLLHUP;
	struct pollfd pfd[2] = {
		{ .fd = shmifsrv_client_handle(data->C), .events = POLLIN | errmask },
		{ .fd = data->kill_fd, .events = errmask }
	};

/* We don't have a monitorable trigger for inbound video/audio frames, so some
 * timeout is in order for the time being. It might be useful to add that kind
 * of signalling to shmif though */
	size_t poll_step = 4;

/* the ext-io thread might be sleeping waiting for input, when we finished
 * one pass/burst and know there is queued data to be sent, wake it up */
	bool dirty = false;
	redirect_exit(data->C, 4, data->opts.redirect_exit);

	for(;;){
		if (dirty){
			write(data->kill_fd, &data->chid, 1);
			dirty = false;
		}

		if (-1 == poll(pfd, 2, poll_step)){
			if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR){
				a12int_trace(A12_TRACE_SYSTEM,
					"kind=error:status=EPOLL:message=%s", strerror(errno));
				break;
			}
		}

/* kill socket or poll socket died */
		if ((pfd[0].revents & errmask) || (pfd[1].revents & errmask)){
			a12int_trace(A12_TRACE_SYSTEM,
				"kind=error:status=EBADFD:client=%d:killsig=%d", pfd[0].revents, pfd[1].revents);
			break;
		}

		struct arcan_event ev;

		while (shmifsrv_dequeue_events(data->C, &ev, 1)){
			if (arcan_shmif_descrevent(&ev)){
				a12int_trace(A12_TRACE_SYSTEM,
					"kind=error:status=EINVAL:message=client->server descriptor event");
				continue;
			}

/* server-consumed or should be forwarded? */
			BEGIN_CRITICAL(&giant_lock, "client_event");
				if (shmifsrv_process_event(data->C, &ev)){
					a12int_trace(A12_TRACE_EVENT,
						"kind=consumed:channel=%d:eventstr=%s",
						data->chid, arcan_shmif_eventstr(&ev, NULL, 0)
					);
				}
				else {
					a12_set_channel(data->S, data->chid);
					a12int_trace(A12_TRACE_EVENT, "kind=forward:channel=%d:eventstr=%s",
						data->chid, arcan_shmif_eventstr(&ev, NULL, 0));
					a12_channel_enqueue(data->S, &ev);
					dirty = true;
				}
			END_CRITICAL(&giant_lock);
		}

		int pv;
		while ((pv = shmifsrv_poll(data->C)) != CLIENT_NOT_READY){
/* Dead client, send the close message and that should cascade down the rest
 * and kill relevant sockets. */
			if (pv == CLIENT_DEAD){
				a12int_trace(A12_TRACE_EVENT, "client=dead");
				goto out;
			}

/* the shared buffer_out marks if we should wait a bit before releasing the
 * client, so as not to keep oversaturating the outbound queue with incoming
 * video frames. We could threshold this to something more reasonable, or just
 * have two congestion levels, one for the focused channel and one lower for
 * the rest */
			if (pv & CLIENT_VBUFFER_READY){
				if (atomic_load(&buffer_out) > 0){
					break;
				}
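
/* A sketch of the two- level variant mentioned above; 'focused' would be a
 * hypothetical flag tracked from the event layer, and the byte threshold is
 * an arbitrary placeholder:
 *
 *  size_t cap = data->focused ? 64 * 1024 : 0;
 *  if (atomic_load(&buffer_out) > cap)
 *  	break;
 */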

/* check the congestion window - there are many more options for congestion
 * control here, and the tuning is not figured out. One avenue would be to
 * track which channel has a segment with focus, and prioritise those higher. */
				struct a12_iostat stat = a12_state_iostat(data->S);
				struct shmifsrv_vbuffer vb = shmifsrv_video(data->C);

				if (data->opts.vframe_block &&
					stat.vframe_backpressure >= data->opts.vframe_soft_block){

/* the soft block caps at ~20% of buffer diffs for large buffers, the other
 * option is to have aggregation and dirty rectangles here, then invalidate if
 * they accumulate to cover all */
					size_t px_c = vb.w * vb.h;
					size_t reg_c =
						(vb.region.x2 - vb.region.x1) * (vb.region.y2 - vb.region.y1);
					bool allow_soft = vb.flags.subregion &&
						(reg_c < px_c) && ((float)reg_c / (float)px_c) <= 0.2;

					if (stat.vframe_backpressure >= data->opts.vframe_block || !allow_soft){
						a12int_trace(A12_TRACE_VDETAIL,
							"vbuffer=defer:congestion=%zu:soft=%zu:limit=%zu",
							stat.vframe_backpressure, data->opts.vframe_soft_block,
							data->opts.vframe_block
						);
						break;
					}
				}

/* Two options: one is to map the dma-buf ourselves and do the readback, or,
 * with streams, map the stream and convert to h264 on the GPU, but the easiest
 * for now is to just reject and let the caller do the readback. This is
 * currently done by default in shmifsrv. */
				BEGIN_CRITICAL(&giant_lock, "video-buffer");
					a12_set_channel(data->S, data->chid);

/* vopts_from_segment here lets the caller pick compression parameters (coarse),
 * including the special 'defer this frame until later' */
					a12_channel_vframe(data->S, &vb, vopts_from_segment(data, vb));
					dirty = true;
				END_CRITICAL(&giant_lock);
				stat = a12_state_iostat(data->S);
				a12int_trace(A12_TRACE_VDETAIL,
					"vbuffer=release:time_ms=%zu:time_ms_px=%.4f:congestion=%zu",
					stat.ms_vframe, stat.ms_vframe_px,
					stat.vframe_backpressure
				);

/* the other part is to, after a certain while of VBUFFER_READY but no
 * buffer- out space, track if any of our segments have focus and, if so,
 * inject the frame anyhow (should help responsiveness), increase the video
 * compression time- tradeoff and defer the step stage so the client gets that
 * we are limited */
				shmifsrv_video_step(data->C);
			}

/* send audio anyway, as not all clients are providing audio and there are
 * fewer tricks that can be applied to the backpressured client; dynamic
 * resampling and heavier compression are options here as well though */
			if (pv & CLIENT_ABUFFER_READY){
				a12int_trace(A12_TRACE_AUDIO, "audio-buffer");
				BEGIN_CRITICAL(&giant_lock, "audio_buffer");
					a12_set_channel(data->S, data->chid);
					shmifsrv_audio(data->C, on_audio_cb, data->S);
					dirty = true;
				END_CRITICAL(&giant_lock);
			}
		}

	}

out:
	BEGIN_CRITICAL(&giant_lock, "client_death");
		a12_set_channel(data->S, data->chid);
		a12_channel_close(data->S);
		write(data->kill_fd, &data->chid, 1);
		a12int_trace(A12_TRACE_SYSTEM, "client died");
	END_CRITICAL(&giant_lock);

/* only shut-down everything on the primary- segment failure */
	if (data->chid == 0 && data->kill_fd != -1)
		close(data->kill_fd);

/* don't kill the shmifsrv client session for the primary one */
	if (data->chid != 0){
		shmifsrv_free(data->C, SHMIFSRV_FREE_NO_DMS);
	}

	atomic_fetch_sub(&n_segments, 1);

	free(inarg);
	return NULL;
}

static bool spawn_thread(struct shmifsrv_thread_data* inarg)
{
	pthread_t pth;
	pthread_attr_t pthattr;
	pthread_attr_init(&pthattr);
	pthread_attr_setdetachstate(&pthattr, PTHREAD_CREATE_DETACHED);
	atomic_fetch_add(&n_segments, 1);

/* pthread_create returns 0 on success and an error number (not -1) on failure */
	if (0 != pthread_create(&pth, &pthattr, client_thread, inarg)){
		BEGIN_CRITICAL(&giant_lock, "cleanup-spawn");
			atomic_fetch_sub(&n_segments, 1);
			a12int_trace(A12_TRACE_ALLOC, "could not spawn thread");
			free(inarg);
		END_CRITICAL(&giant_lock);
		return false;
	}

	return true;
}

void a12helper_a12cl_shmifsrv(struct a12_state* S,
	struct shmifsrv_client* C, int fd_in, int fd_out, struct a12helper_opts opts)
{
	uint8_t* outbuf = NULL;
	size_t outbuf_sz = 0;

/* tie an empty context as channel destination, we use this as a type- wrapper
 * for the shmifsrv_client now, this logic is slightly different on the
 * shmifsrv_client side. */
	struct arcan_shmif_cont fake = {};
	a12_set_destination(S, &fake, 0);
	a12_set_bhandler(S, incoming_bhandler, &opts);

	int pipe_pair[2];
	if (-1 == pipe(pipe_pair))
		return;

/*
 * Spawn the processing- thread that will take care of a shmifsrv_client
 */
	struct shmifsrv_thread_data* arg;
	arg = malloc(sizeof(struct shmifsrv_thread_data));
	if (!arg){
		close(pipe_pair[0]);
		close(pipe_pair[1]);
		return;
	}

/*
 * the kill_fd will be shared among the other segments, so it is only
 * here where there is reason to clean it up like this
 */
	*arg = (struct shmifsrv_thread_data){
		.C = C,
		.S = S,
		.kill_fd = pipe_pair[1],
		.opts = opts,
		.chid = 0
	};
	if (!spawn_thread(arg)){
		close(pipe_pair[0]);
		close(pipe_pair[1]);
		return;
	}
	fake.user = arg;

/* Socket in/out liveness, buffer flush / dispatch */
	size_t n_fd = 2;
	static const short errmask = POLLERR | POLLNVAL | POLLHUP;
	struct pollfd fds[3] = {
		{ .fd = fd_in, .events = POLLIN | errmask},
		{ .fd = pipe_pair[0], .events = POLLIN | errmask},
		{ .fd = fd_out, .events = POLLOUT | errmask}
	};

/* flush authentication leftovers */
	a12_unpack(S, NULL, 0, arg, on_srv_event);

	uint8_t inbuf[9000];
	while(a12_ok(S) && -1 != poll(fds, n_fd, -1)){

/* death by poll? */
		if ((fds[0].revents & errmask) ||
				(fds[1].revents & errmask) ||
				(n_fd == 3 && (fds[2].revents & errmask))){
			break;
		}

/* flush wakeup data from threads */
		if (fds[1].revents){
			if (a12_trace_targets & A12_TRACE_TRANSFER){
				BEGIN_CRITICAL(&giant_lock, "flush-iopipe");
					a12int_trace(
						A12_TRACE_TRANSFER, "client thread wakeup");
				END_CRITICAL(&giant_lock);
			}

			read(fds[1].fd, inbuf, 9000);
		}

/* pending out, flush or grab next out buffer */
		if (n_fd == 3 && (fds[2].revents & POLLOUT) && outbuf_sz){
			ssize_t nw = write(fd_out, outbuf, outbuf_sz);

			if (a12_trace_targets & A12_TRACE_TRANSFER){
				BEGIN_CRITICAL(&giant_lock, "buffer-send");
				a12int_trace(
					A12_TRACE_TRANSFER, "send %zd (left %zu) bytes", nw, outbuf_sz);
				END_CRITICAL(&giant_lock);
			}

			if (nw > 0){
				outbuf += nw;
				outbuf_sz -= nw;
			}
		}

/* then read and unpack incoming data, note that in the on_srv_event handler
 * we ALREADY HOLD THE LOCK so it is a deadlock condition to try and lock there */
		if (fds[0].revents & POLLIN){
			ssize_t nr = recv(fd_in, inbuf, 9000, 0);
			if (-1 == nr && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR){
				if (a12_trace_targets & A12_TRACE_SYSTEM){
					BEGIN_CRITICAL(&giant_lock, "data error");
						a12int_trace(A12_TRACE_SYSTEM, "data-in, error: %s", strerror(errno));
					END_CRITICAL(&giant_lock);
				}
				break;
			}

/* pollin- says yes, but recv says no? */
			if (nr == 0){
				BEGIN_CRITICAL(&giant_lock, "socket closed");
				a12int_trace(A12_TRACE_SYSTEM, "data-in, other side closed connection");
				END_CRITICAL(&giant_lock);
				break;
			}

			BEGIN_CRITICAL(&giant_lock, "unpack-event");
				a12int_trace(A12_TRACE_TRANSFER, "unpack %zd bytes", nr);
				a12_unpack(S, inbuf, nr, arg, on_srv_event);
			END_CRITICAL(&giant_lock);
		}

		if (!outbuf_sz){
			BEGIN_CRITICAL(&giant_lock, "get-buffer");
				outbuf_sz = a12_flush(S, &outbuf, 0);
			END_CRITICAL(&giant_lock);
		}
		n_fd = outbuf_sz > 0 ? 3 : 2;
	}

	a12int_trace(A12_TRACE_SYSTEM, "(srv) shutting down connection");
	close(pipe_pair[0]);

/* spin until the client threads have noticed the closed wake-up pipe and exited */
	while(atomic_load(&n_segments) > 0){}

	if (!a12_free(S)){
		a12int_trace(A12_TRACE_ALLOC, "error cleaning up a12 context");
	}

/* only the primary segment left, we will try and migrate that one,
 * sending the DEVICE_NODE migrate event and performing a non-dms drop */
	redirect_exit(C, 2, opts.redirect_exit);
}