/*
 * libiio - Library for interfacing industrial I/O (IIO) devices
 *
 * Copyright (C) 2014 Analog Devices, Inc.
 * Author: Paul Cercueil <paul.cercueil@analog.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 */

#include "ops.h"
#include "parser.h"
#include "thread-pool.h"
#include "../debug.h"
#include "../iio-private.h"

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <poll.h>
#include <stdbool.h>
#include <string.h>
#include <sys/eventfd.h>
#include <time.h>
#include <fcntl.h>
#include <signal.h>

int yyparse(yyscan_t scanner);

struct DevEntry;

/* Corresponds to a thread reading from a device */
struct ThdEntry {
	SLIST_ENTRY(ThdEntry) parser_list_entry;
	SLIST_ENTRY(ThdEntry) dev_list_entry;
	unsigned int nb, sample_size, samples_count;
	ssize_t err;

	int eventfd;

	struct parser_pdata *pdata;
	struct iio_device *dev;
	struct DevEntry *entry;

	uint32_t *mask;
	bool active, is_writer, new_client, wait_for_open;
};

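/* Wake up a client thread blocked in thd_entry_event_wait() by writing to
 * its eventfd. */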
static void thd_entry_event_signal(struct ThdEntry *thd)
{
	uint64_t e = 1;
	int ret;

	do {
		ret = write(thd->eventfd, &e, sizeof(e));
	} while (ret == -1 && errno == EINTR);
}

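/* Wait until the thread's eventfd is signaled. The mutex is released while
 * waiting and re-acquired before returning. Returns -EPIPE if the client
 * hung up or if the thread pool is being stopped. */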
static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
	int fd_in)
{
	struct pollfd pfd[3];
	uint64_t e;
	int ret;

	pthread_mutex_unlock(mutex);

	pfd[0].fd = thd->eventfd;
	pfd[0].events = POLLIN;
	pfd[1].fd = fd_in;
	pfd[1].events = POLLRDHUP;
	pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
	pfd[2].events = POLLIN;

	do {
		poll_nointr(pfd, 3);

		if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
			pthread_mutex_lock(mutex);
			return -EPIPE;
		}

		do {
			ret = read(thd->eventfd, &e, sizeof(e));
		} while (ret == -1 && errno == EINTR);
	} while (ret == -1 && errno == EAGAIN);

	pthread_mutex_lock(mutex);

	return 0;
}

/* Corresponds to an opened device */
struct DevEntry {
	unsigned int ref_count;

	struct iio_device *dev;
	struct iio_buffer *buf;
	unsigned int sample_size, nb_clients;
	bool update_mask;
	bool cyclic;
	bool closed;
	bool cancelled;

	/* Linked list of ThdEntry structures corresponding
	 * to all the threads that opened the device */
	SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
	pthread_mutex_t thdlist_lock;

	pthread_cond_t rw_ready_cond;

	uint32_t *mask;
	size_t nb_words;
};

struct sample_cb_info {
	struct parser_pdata *pdata;
	unsigned int nb_bytes, cpt;
	uint32_t *mask;
};

/* Protects iio_device_{set,get}_data() from concurrent access from multiple
 * clients */
static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;

#if WITH_AIO
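/* Perform one asynchronous read or write with libaio, then wait for its
 * completion eventfd. A STOP event from the thread pool cancels the
 * transfer and is reported as EOF (return value 0). */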
static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
	bool do_read)
{
	ssize_t ret;
	struct pollfd pfd[2];
	unsigned int num_pfds;
	struct iocb iocb;
	struct iocb *ios[1];
	struct io_event e[1];

	ios[0] = &iocb;

	if (do_read)
		io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
	else
		io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);

	io_set_eventfd(&iocb, pdata->aio_eventfd);

	pthread_mutex_lock(&pdata->aio_mutex);

	ret = io_submit(pdata->aio_ctx, 1, ios);
	if (ret != 1) {
		pthread_mutex_unlock(&pdata->aio_mutex);
		IIO_ERROR("Failed to submit IO operation: %zd\n", ret);
		return -EIO;
	}

	pfd[0].fd = pdata->aio_eventfd;
	pfd[0].events = POLLIN;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;
	num_pfds = 2;

	do {
		poll_nointr(pfd, num_pfds);

		if (pfd[0].revents & POLLIN) {
			uint64_t event;
			ret = read(pdata->aio_eventfd, &event, sizeof(event));
			if (ret != sizeof(event)) {
				IIO_ERROR("Failed to read from eventfd: %d\n", -errno);
				ret = -EIO;
				break;
			}

			ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
			if (ret != 1) {
				IIO_ERROR("Failed to read IO events: %zd\n", ret);
				ret = -EIO;
				break;
			} else {
				ret = (long)e[0].res;
			}
		} else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) {
			/* Got a STOP event to abort this whole session */
			ret = io_cancel(pdata->aio_ctx, &iocb, e);
			if (ret != -EINPROGRESS && ret != -EINVAL) {
				IIO_ERROR("Failed to cancel IO transfer: %zd\n", ret);
				ret = -EIO;
				break;
			}
			/* It should not be long now until we get the cancellation event */
			num_pfds = 1;
		}
	} while (!(pfd[0].revents & POLLIN));

	pthread_mutex_unlock(&pdata->aio_mutex);

	/* Got STOP event, treat it as EOF */
	if (num_pfds == 1)
		return 0;

	return ret;
}

#define MAX_AIO_REQ_SIZE (1024 * 1024)

static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
{
	if (len > MAX_AIO_REQ_SIZE)
		len = MAX_AIO_REQ_SIZE;
	return async_io(pdata, dest, len, true);
}

static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest,
		size_t len)
{
	if (len > MAX_AIO_REQ_SIZE)
		len = MAX_AIO_REQ_SIZE;
	return async_io(pdata, (void *)dest, len, false);
}
#endif /* WITH_AIO */

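/* Blocking read from the input descriptor. Returns 0 (EOF) if the thread
 * pool signals a STOP event or if the peer hung up. */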
static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_in;
	pfd[0].events = POLLIN | POLLRDHUP;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLIN))
			continue;

		do {
			if (pdata->fd_in_is_socket)
				ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
			else
				ret = read(pdata->fd_in, dest, len);
		} while (ret == -1 && errno == EINTR);

		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}

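/* Blocking write to the output descriptor. Returns 0 (EOF) if the thread
 * pool signals a STOP event or if the peer hung up. */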
static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_out;
	pfd[0].events = POLLOUT;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLOUT))
			continue;

		do {
			if (pdata->fd_out_is_socket)
				ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
			else
				ret = write(pdata->fd_out, src, len);
		} while (ret == -1 && errno == EINTR);

		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}

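/* Write the whole buffer, looping over short writes. Returns the number of
 * bytes written, or a negative error code. */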
ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
{
	uintptr_t ptr = (uintptr_t) src;

	while (len) {
		ssize_t ret = pdata->writefd(pdata, (void *) ptr, len);
		if (ret < 0)
			return ret;
		if (!ret)
			return -EPIPE;
		ptr += ret;
		len -= ret;
	}

	return ptr - (uintptr_t) src;
}

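/* Read exactly 'len' bytes, looping over short reads. Returns the number of
 * bytes read, or a negative error code. */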
static ssize_t read_all(struct parser_pdata *pdata,
		void *dst, size_t len)
{
	uintptr_t ptr = (uintptr_t) dst;

	while (len) {
		ssize_t ret = pdata->readfd(pdata, (void *) ptr, len);
		if (ret < 0)
			return ret;
		if (!ret)
			return -EPIPE;
		ptr += ret;
		len -= ret;
	}

	return ptr - (uintptr_t) dst;
}

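/* Send the numeric return code of a command to the client. In verbose
 * (interactive) mode, negative values are printed as error strings. */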
static void print_value(struct parser_pdata *pdata, long value)
{
	if (pdata->verbose && value < 0) {
		char buf[1024];
		iio_strerror(-value, buf, sizeof(buf));
		output(pdata, "ERROR: ");
		output(pdata, buf);
		output(pdata, "\n");
	} else {
		char buf[128];
		iio_snprintf(buf, sizeof(buf), "%li\n", value);
		output(pdata, buf);
	}
}

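/* iio_buffer_foreach_sample() callback used when demultiplexing: forward one
 * sample to the client if its channel is enabled in the client's mask,
 * inserting padding bytes to keep the client's samples aligned. */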
static ssize_t send_sample(const struct iio_channel *chn,
		void *src, size_t length, void *d)
{
	struct sample_cb_info *info = d;
	if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
		return 0;
	if (info->nb_bytes < length)
		return 0;

	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char zero = 0;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->writefd(info->pdata, &zero, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	info->nb_bytes -= length;
	return write_all(info->pdata, src, length);
}

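/* iio_buffer_foreach_sample() callback used when multiplexing written data:
 * read one sample from the client if its channel is enabled in the client's
 * mask, skipping padding bytes where needed. */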
static ssize_t receive_sample(const struct iio_channel *chn,
		void *dst, size_t length, void *d)
{
	struct sample_cb_info *info = d;
	if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
		return 0;
	if (info->cpt == info->nb_bytes)
		return 0;

	/* Skip the padding if needed */
	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char foo;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->readfd(info->pdata, &foo, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	return read_all(info->pdata, dst, length);
}

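/* Send up to 'len' bytes of captured data to a reading client. For a new
 * client, the device's current channel mask is also sent before the data. */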
static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
{
	struct parser_pdata *pdata = thd->pdata;
	bool demux = server_demux && dev->sample_size != thd->sample_size;

	if (demux)
		len = (len / dev->sample_size) * thd->sample_size;
	if (len > thd->nb)
		len = thd->nb;

	print_value(pdata, len);

	if (thd->new_client) {
		unsigned int i;
		char buf[129], *ptr = buf;
		uint32_t *mask = demux ? thd->mask : dev->mask;
		ssize_t ret, len;

		len = sizeof(buf);
		/* Send the current mask */
		for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf);
				i--, ptr += 8) {
			iio_snprintf(ptr, len, "%08x", mask[i - 1]);
			len -= 8;
		}

		*ptr = '\n';
		len--;

		if (len < 0) {
			IIO_ERROR("send_data: string length error\n");
			return -ENOSPC;
		}

		ret = write_all(pdata, buf, ptr + 1 - buf);
		if (ret < 0)
			return ret;

		thd->new_client = false;
	}

	if (!demux) {
		/* Short path */
		return write_all(pdata, dev->buf->buffer, len);
	} else {
		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = len,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
	}
}

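/* Receive data from a writing client, either directly into the device buffer
 * (same sample size) or by multiplexing sample by sample. */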
static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
{
	struct parser_pdata *pdata = thd->pdata;

	/* Inform that no error occurred, and that we'll start reading data */
	if (thd->new_client) {
		print_value(thd->pdata, 0);
		thd->new_client = false;
	}

	if (dev->sample_size == thd->sample_size) {
		/* Short path: Receive directly in the buffer */

		size_t len = dev->buf->length;
		if (thd->nb < len)
			len = thd->nb;

		return read_all(pdata, dev->buf->buffer, len);
	} else {
		/* Long path: Mux the samples to the buffer */

		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = thd->nb,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf,
				receive_sample, &info);
	}
}

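/* Drop one reference to the DevEntry; the last reference frees it. */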
static void dev_entry_put(struct DevEntry *entry)
{
	bool free_entry = false;

	pthread_mutex_lock(&entry->thdlist_lock);
	entry->ref_count--;
	if (entry->ref_count == 0)
		free_entry = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (free_entry) {
		pthread_mutex_destroy(&entry->thdlist_lock);
		pthread_cond_destroy(&entry->rw_ready_cond);

		free(entry->mask);
		free(entry);
	}
}

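/* Mark a client thread's R/W request as completed with the given result and
 * wake it up. */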
static void signal_thread(struct ThdEntry *thd, ssize_t ret)
{
	thd->err = ret;
	thd->nb = 0;
	thd->active = false;
	thd_entry_event_signal(thd);
}

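/* Per-device R/W thread: (re)creates the buffer whenever the channel mask
 * changes, refills it for readers and pushes it for writers, and dispatches
 * the data to/from all client threads attached to the device. */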
static void rw_thd(struct thread_pool *pool, void *d)
{
	struct DevEntry *entry = d;
	struct ThdEntry *thd, *next_thd;
	struct iio_device *dev = entry->dev;
	unsigned int nb_words = entry->nb_words;
	ssize_t ret = 0;

	IIO_DEBUG("R/W thread started for device %s\n",
			dev->name ? dev->name : dev->id);

	while (true) {
		bool has_readers = false, has_writers = false,
		     mask_updated = false;
		unsigned int sample_size;

		/* NOTE: this while loop must exit with thdlist_lock locked. */
		pthread_mutex_lock(&entry->thdlist_lock);

		if (SLIST_EMPTY(&entry->thdlist_head))
			break;

		if (entry->update_mask) {
			unsigned int i;
			unsigned int samples_count = 0;

			memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				for (i = 0; i < nb_words; i++)
					entry->mask[i] |= thd->mask[i];

				if (thd->samples_count > samples_count)
					samples_count = thd->samples_count;
			}

			if (entry->buf)
				iio_buffer_destroy(entry->buf);

			for (i = 0; i < dev->nb_channels; i++) {
				struct iio_channel *chn = dev->channels[i];
				long index = chn->index;

				if (index < 0)
					continue;

				if (TEST_BIT(entry->mask, chn->number))
					iio_channel_enable(chn);
				else
					iio_channel_disable(chn);
			}

			entry->buf = iio_device_create_buffer(dev,
					samples_count, entry->cyclic);
			if (!entry->buf) {
				ret = -errno;
				IIO_ERROR("Unable to create buffer\n");
				break;
			}
			entry->cancelled = false;

			/* Signal the threads that we opened the device */
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				if (thd->wait_for_open) {
					thd->wait_for_open = false;
					signal_thread(thd, 0);
				}
			}

			IIO_DEBUG("IIO device %s reopened with new mask:\n",
					dev->id);
			for (i = 0; i < nb_words; i++)
				IIO_DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]);
			entry->update_mask = false;

			entry->sample_size = iio_device_get_sample_size(dev);
			mask_updated = true;
		}

		sample_size = entry->sample_size;

		SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
			thd->active = !thd->err && thd->nb >= sample_size;
			if (mask_updated && thd->active)
				signal_thread(thd, thd->nb);

			if (thd->is_writer)
				has_writers |= thd->active;
			else
				has_readers |= thd->active;
		}

		if (!has_readers && !has_writers) {
			pthread_cond_wait(&entry->rw_ready_cond,
					&entry->thdlist_lock);
		}

		pthread_mutex_unlock(&entry->thdlist_lock);

		if (!has_readers && !has_writers)
			continue;

		if (has_readers) {
			ssize_t nb_bytes;

			ret = iio_buffer_refill(entry->buf);

			pthread_mutex_lock(&entry->thdlist_lock);

			/*
			 * When the last client disconnects the buffer is
			 * cancelled and iio_buffer_refill() returns an error. A
			 * new client might have connected before we got here
			 * though, in that case the rw thread has to stay active
			 * and a new buffer is created. If the list is still empty the loop
			 * will exit normally.
			 */
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}

			if (ret < 0) {
				IIO_ERROR("Reading from device failed: %i\n",
						(int) ret);
				break;
			}

			nb_bytes = ret;

			/* We don't use SLIST_FOREACH here. As soon as a thread is
			 * signaled, its "thd" structure might be freed;
			 * SLIST_FOREACH would then cause a segmentation fault, as it
			 * reads "thd" to get the address of the next element. */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || thd->is_writer)
					continue;

				ret = send_data(entry, thd, nb_bytes);
				if (ret > 0)
					thd->nb -= ret;

				if (ret < 0 || thd->nb < sample_size)
					signal_thread(thd, (ret < 0) ?
							ret : (ssize_t) thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}

		if (has_writers) {
			ssize_t nb_bytes = 0;

			pthread_mutex_lock(&entry->thdlist_lock);

			/* Reset the size of the buffer to its maximum size */
			entry->buf->data_length = entry->buf->length;

			/* Same comment as above */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || !thd->is_writer)
					continue;

				ret = receive_data(entry, thd);
				if (ret > 0) {
					thd->nb -= ret;
					if (ret > nb_bytes)
						nb_bytes = ret;
				}

				if (ret < 0)
					signal_thread(thd, ret);
			}

			ret = iio_buffer_push_partial(entry->buf,
				nb_bytes / sample_size);
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}
			if (ret < 0) {
				IIO_ERROR("Writing to device failed: %i\n",
						(int) ret);
				break;
			}

			/* Signal threads which completed their RW command */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);
				if (thd->active && thd->is_writer &&
						thd->nb < sample_size)
					signal_thread(thd, thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	/* Signal all remaining threads */
	for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
		next_thd = SLIST_NEXT(thd, dev_list_entry);
		SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
		thd->wait_for_open = false;
		signal_thread(thd, ret);
	}
	if (entry->buf) {
		iio_buffer_destroy(entry->buf);
		entry->buf = NULL;
	}
	entry->closed = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	pthread_mutex_lock(&devlist_lock);
	/* It is possible that a new thread has already started, make sure to
	 * not overwrite it. */
	if (iio_device_get_data(dev) == entry)
		iio_device_set_data(dev, NULL);
	pthread_mutex_unlock(&devlist_lock);

	IIO_DEBUG("Stopping R/W thread for device %s\n",
			dev->name ? dev->name : dev->id);

	dev_entry_put(entry);
}

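/* Find the ThdEntry of this client for the given device, if it opened it. */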
static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
	struct iio_device *dev)
{
	struct ThdEntry *t;

	SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
		if (t->dev == dev)
			return t;
	}

	return NULL;
}

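/* Handle a READBUF/WRITEBUF request: register it with the device's R/W
 * thread, then block until the request has been serviced or has failed. */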
static ssize_t rw_buffer(struct parser_pdata *pdata,
		struct iio_device *dev, unsigned int nb, bool is_write)
{
	struct DevEntry *entry;
	struct ThdEntry *thd;
	ssize_t ret;

	if (!dev)
		return -ENODEV;

	thd = parser_lookup_thd_entry(pdata, dev);
	if (!thd)
		return -EBADF;

	entry = thd->entry;

	if (nb < entry->sample_size)
		return 0;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (entry->closed) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBADF;
	}

	if (thd->nb) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBUSY;
	}

	thd->new_client = true;
	thd->nb = nb;
	thd->err = 0;
	thd->is_writer = is_write;
	thd->active = true;

	pthread_cond_signal(&entry->rw_ready_cond);

	IIO_DEBUG("Waiting for completion...\n");
	while (thd->active) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	if (ret == 0)
		ret = thd->err;
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (ret > 0 && ret < (ssize_t) nb)
		print_value(thd->pdata, 0);

	IIO_DEBUG("Exiting rw_buffer with code %li\n", (long) ret);
	if (ret < 0)
		return ret;
	else
		return nb - ret;
}

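/* Parse the hexadecimal channel mask string sent by the client into an array
 * of 32-bit words. On success, *len is updated to the number of words. */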
static uint32_t *get_mask(const char *mask, size_t *len)
{
	size_t nb = (*len + 7) / 8;
	uint32_t *ptr, *words = calloc(nb, sizeof(*words));
	if (!words)
		return NULL;

	ptr = words + nb;
	while (*mask) {
		char buf[9];
		iio_snprintf(buf, sizeof(buf), "%.*s", 8, mask);
		sscanf(buf, "%08x", --ptr);
		mask += 8;
		IIO_DEBUG("Mask[%lu]: 0x%08x\n",
				(unsigned long) (words - ptr) / 4, *ptr);
	}

	*len = nb;
	return words;
}

static void free_thd_entry(struct ThdEntry *t)
{
	close(t->eventfd);
	free(t->mask);
	free(t);
}

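/* Detach a client thread from its DevEntry. If it was the last one, cancel
 * the buffer so that the R/W thread wakes up and exits. */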
static void remove_thd_entry(struct ThdEntry *t)
{
	struct DevEntry *entry = t->entry;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (!entry->closed) {
		entry->update_mask = true;
		SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
		if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
			entry->cancelled = true;
			iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */
		}

		pthread_cond_signal(&entry->rw_ready_cond);
	}
	pthread_mutex_unlock(&entry->thdlist_lock);
	dev_entry_put(entry);

	free_thd_entry(t);
}

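/* Handle the OPEN command: attach the client to the device's DevEntry,
 * creating the DevEntry and its R/W thread if the device was not yet open,
 * then wait until the R/W thread has actually opened the device. */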
static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret = -ENOMEM;
	struct DevEntry *entry;
	struct ThdEntry *thd;
	size_t len = strlen(mask);
	uint32_t *words;
	unsigned int nb_channels;
	unsigned int cyclic_retry = 500;

	if (!dev)
		return -ENODEV;

	nb_channels = dev->nb_channels;
	if (len != ((nb_channels + 31) / 32) * 8)
		return -EINVAL;

	words = get_mask(mask, &len);
	if (!words)
		return -ENOMEM;

	thd = zalloc(sizeof(*thd));
	if (!thd)
		goto err_free_words;

	thd->wait_for_open = true;
	thd->mask = words;
	thd->nb = 0;
	thd->samples_count = samples_count;
	thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
	thd->pdata = pdata;
	thd->dev = dev;
	thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

retry:
	/* Atomically look up the device entry and make sure that it is still
	 * active, or allocate a new one. */
	pthread_mutex_lock(&devlist_lock);
	entry = iio_device_get_data(dev);
	if (entry) {
		if (cyclic || entry->cyclic) {
			/* Only one client allowed in cyclic mode */
			pthread_mutex_unlock(&devlist_lock);

			/* There is an inherent race condition if a client
			 * creates a new cyclic buffer shortly after destroying
			 * a previous one, e.g.:
			 *
			 *     iio_buffer_destroy(buf);
			 *     buf = iio_device_create_buffer(dev, n, true);
			 *
			 * In this case the two buffers each use their own
			 * communication channels, which are unordered with
			 * respect to each other. E.g. the socket open might
			 * arrive before the socket close on the host side,
			 * even though they were sent in the opposite order on
			 * the client side. This race condition can cause an
			 * error to be reported back to the client, even though
			 * the code on the client side was well formed and
			 * would work fine e.g. using the local backend.
			 *
			 * To avoid this issue go to sleep for up to 50ms in
			 * intervals of 100us. This should be enough time for
			 * the issue to resolve itself. If there actually is
			 * contention on the buffer an error will eventually be
			 * returned, in which case the additional delay caused
			 * by the retries should not matter too much.
			 *
			 * This is not pretty but it works.
			 */
			if (cyclic_retry) {
				struct timespec wait;
				wait.tv_sec = 0;
				wait.tv_nsec = (100 * 1000);
				cyclic_retry--;
				nanosleep(&wait, &wait);
				goto retry;
			}

			ret = -EBUSY;
			goto err_free_thd;
		}

		pthread_mutex_lock(&entry->thdlist_lock);
		if (!entry->closed) {
			pthread_mutex_unlock(&devlist_lock);

			entry->ref_count++;

			SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
			thd->entry = entry;
			entry->update_mask = true;
			IIO_DEBUG("Added thread to client list\n");

			pthread_cond_signal(&entry->rw_ready_cond);

			/* Wait until the device is opened by the rw thread */
			while (thd->wait_for_open) {
				ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
				if (ret)
					break;
			}
			pthread_mutex_unlock(&entry->thdlist_lock);

			if (ret == 0)
				ret = (int) thd->err;
			if (ret < 0)
				remove_thd_entry(thd);
			else
				SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
			return ret;
		} else {
			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	entry = zalloc(sizeof(*entry));
	if (!entry) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_thd;
	}

	entry->ref_count = 2; /* One for thread, one for the client */

	entry->mask = malloc(len * sizeof(*words));
	if (!entry->mask) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry;
	}

	entry->cyclic = cyclic;
	entry->nb_words = len;
	entry->update_mask = true;
	entry->dev = dev;
	entry->buf = NULL;
	SLIST_INIT(&entry->thdlist_head);
	SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
	thd->entry = entry;
	IIO_DEBUG("Added thread to client list\n");

	pthread_mutex_init(&entry->thdlist_lock, NULL);
	pthread_cond_init(&entry->rw_ready_cond, NULL);

	ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
	if (ret) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry_mask;
	}

	IIO_DEBUG("Adding new device thread to device list\n");
	iio_device_set_data(dev, entry);
	pthread_mutex_unlock(&devlist_lock);

	pthread_mutex_lock(&entry->thdlist_lock);
	/* Wait until the device is opened by the rw thread */
	while (thd->wait_for_open) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (ret == 0)
		ret = (int) thd->err;
	if (ret < 0)
		remove_thd_entry(thd);
	else
		SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
	return ret;

err_free_entry_mask:
	free(entry->mask);
err_free_entry:
	free(entry);
err_free_thd:
	close(thd->eventfd);
	free(thd);
err_free_words:
	free(words);
	return ret;
}

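/* Handle the CLOSE command: detach the client from the device. */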
static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
{
	struct ThdEntry *t;

	if (!dev)
		return -ENODEV;

	t = parser_lookup_thd_entry(pdata, dev);
	if (!t)
		return -ENXIO;

	SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry);
	remove_thd_entry(t);

	return 0;
}

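/* Command entry points called by the parser. */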
int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
	print_value(pdata, ret);
	return ret;
}

int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
{
	int ret = close_dev_helper(pdata, dev);
	print_value(pdata, ret);
	return ret;
}

ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
		unsigned int nb, bool is_write)
{
	ssize_t ret = rw_buffer(pdata, dev, nb, is_write);
	if (ret <= 0 || is_write)
		print_value(pdata, ret);
	return ret;
}

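/* Read a device, debug or buffer attribute and send its value (preceded by
 * its length) to the client. */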
ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, enum iio_attr_type type)
{
	/* We use a very large buffer here, as if attr is NULL all the
	 * attributes will be read, which may represent a few kilobytes worth
	 * of data. */
	char buf[0x10000];
	ssize_t ret = -EINVAL;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_read(dev,
				attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_read(dev,
							attr, buf, sizeof(buf) - 1);
			break;
		default:
			ret = -EINVAL;
			break;
	}
	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}

ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, size_t len, enum iio_attr_type type)
{
	ssize_t ret = -ENOMEM;
	char *buf;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	buf = malloc(len);
	if (!buf)
		goto err_print_value;

	ret = read_all(pdata, buf, len);
	if (ret < 0)
		goto err_free_buffer;

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
			break;
		default:
			ret = -EINVAL;
			break;
	}

err_free_buffer:
	free(buf);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t read_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr)
{
	char buf[1024];
	ssize_t ret = -ENODEV;

	if (chn)
		ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
	else if (pdata->dev)
		ret = -ENXIO;
	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}

ssize_t write_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr, size_t len)
{
	ssize_t ret = -ENOMEM;
	char *buf = malloc(len);
	if (!buf)
		goto err_print_value;

	ret = read_all(pdata, buf, len);
	if (ret < 0)
		goto err_free_buffer;

	if (chn)
		ret = iio_channel_attr_write_raw(chn, attr, buf, len);
	else if (pdata->dev)
		ret = -ENXIO;
	else
		ret = -ENODEV;
err_free_buffer:
	free(buf);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t set_trigger(struct parser_pdata *pdata,
		struct iio_device *dev, const char *trigger)
{
	struct iio_device *trig = NULL;
	ssize_t ret = -ENOENT;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	if (trigger) {
		trig = iio_context_find_device(pdata->ctx, trigger);
		if (!trig)
			goto err_print_value;
	}

	ret = iio_device_set_trigger(dev, trig);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
{
	const struct iio_device *trigger;
	ssize_t ret;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	ret = iio_device_get_trigger(dev, &trigger);
	if (!ret && trigger) {
		char buf[256];

		ret = strlen(trigger->name);
		print_value(pdata, ret);

		iio_snprintf(buf, sizeof(buf), "%s\n", trigger->name);
		ret = write_all(pdata, buf, ret + 1);
	} else {
		print_value(pdata, ret);
	}
	return ret;
}

int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
{
	int ret = iio_context_set_timeout(pdata->ctx, timeout);
	print_value(pdata, ret);
	return ret;
}

int set_buffers_count(struct parser_pdata *pdata,
		struct iio_device *dev, long value)
{
	int ret = -EINVAL;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	if (value >= 1)
		ret = iio_device_set_kernel_buffers_count(
				dev, (unsigned int) value);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

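/* Read one command line from the client. On sockets, the data is peeked
 * first so that only the bytes up to (and including) the trailing newline
 * are consumed. */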
ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
{
	ssize_t ret;

	if (pdata->fd_in_is_socket) {
		struct pollfd pfd[2];
		bool found;
		size_t bytes_read = 0;

		pfd[0].fd = pdata->fd_in;
		pfd[0].events = POLLIN | POLLRDHUP;
		pfd[0].revents = 0;
		pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
		pfd[1].events = POLLIN;
		pfd[1].revents = 0;

		do {
			size_t i, to_trunc;

			poll_nointr(pfd, 2);

			if (pfd[1].revents & POLLIN ||
					pfd[0].revents & POLLRDHUP)
				return 0;

			/* First read from the socket, without advancing the
			 * read offset */
			ret = recv(pdata->fd_in, buf, len,
					MSG_NOSIGNAL | MSG_PEEK);
			if (ret < 0)
				return -errno;

			/* Look for the trailing \n */
			for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
			found = i < (size_t) ret;

			len -= ret;
			buf += ret;

			to_trunc = found ? i + 1 : (size_t) ret;

			/* Advance the read offset after the \n if found, or
			 * after the last character read otherwise */
			ret = recv(pdata->fd_in, NULL, to_trunc,
					MSG_NOSIGNAL | MSG_TRUNC);
			if (ret < 0)
				return -errno;

			bytes_read += to_trunc;
		} while (!found && len);

		/* No \n found? Just garbage data */
		if (!found)
			ret = -EIO;
		else
			ret = bytes_read;
	} else {
		ret = pdata->readfd(pdata, buf, len);
	}

	return ret;
}

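/* Main per-client loop: parse and execute commands coming from fd_in until
 * the client disconnects or the server is stopped. */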
void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
	bool is_socket, bool use_aio, struct thread_pool *pool)
{
	yyscan_t scanner;
	struct parser_pdata pdata;
	unsigned int i;
	int ret;

	pdata.ctx = ctx;
	pdata.stop = false;
	pdata.fd_in = fd_in;
	pdata.fd_out = fd_out;
	pdata.verbose = verbose;
	pdata.pool = pool;

	pdata.fd_in_is_socket = is_socket;
	pdata.fd_out_is_socket = is_socket;

	SLIST_INIT(&pdata.thdlist_head);

	if (use_aio) {
		/* Note: if WITH_AIO is not defined, use_aio is always false.
		 * We ensure that in iiod.c. */
#if WITH_AIO
		char err_str[1024];

		pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
		if (pdata.aio_eventfd < 0) {
			iio_strerror(errno, err_str, sizeof(err_str));
			IIO_ERROR("Failed to create AIO eventfd: %s\n", err_str);
			return;
		}

		pdata.aio_ctx = 0;
		ret = io_setup(1, &pdata.aio_ctx);
		if (ret < 0) {
			iio_strerror(-ret, err_str, sizeof(err_str));
			IIO_ERROR("Failed to create AIO context: %s\n", err_str);
			close(pdata.aio_eventfd);
			return;
		}
		pthread_mutex_init(&pdata.aio_mutex, NULL);
		pdata.readfd = readfd_aio;
		pdata.writefd = writefd_aio;
#endif
	} else {
		pdata.readfd = readfd_io;
		pdata.writefd = writefd_io;
	}

	yylex_init_extra(&pdata, &scanner);

	do {
		if (verbose)
			output(&pdata, "iio-daemon > ");
		ret = yyparse(scanner);
	} while (!pdata.stop && ret >= 0);

	yylex_destroy(scanner);

	/* Close all opened devices */
	for (i = 0; i < ctx->nb_devices; i++)
		close_dev_helper(&pdata, ctx->devices[i]);

#if WITH_AIO
	if (use_aio) {
		io_destroy(pdata.aio_ctx);
		close(pdata.aio_eventfd);
	}
#endif
}