1 /*
2 * libiio - Library for interfacing industrial I/O (IIO) devices
3 *
4 * Copyright (C) 2014 Analog Devices, Inc.
5 * Author: Paul Cercueil <paul.cercueil@analog.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * */
18
19 #include "ops.h"
20 #include "parser.h"
21 #include "thread-pool.h"
22 #include "../debug.h"
23 #include "../iio-private.h"
24
25 #include <errno.h>
26 #include <limits.h>
27 #include <pthread.h>
28 #include <poll.h>
29 #include <stdbool.h>
30 #include <string.h>
31 #include <sys/eventfd.h>
32 #include <time.h>
33 #include <fcntl.h>
34 #include <signal.h>
35
/* Parser entry point, defined elsewhere (presumably the generated command
 * parser -- confirm against the build). */
int yyparse(yyscan_t scanner);

/* Forward declaration: struct ThdEntry below keeps a pointer to the DevEntry
 * it is attached to. */
struct DevEntry;
39
/* Corresponds to a thread reading from a device */
struct ThdEntry {
	/* Link in the per-client list (pdata->thdlist_head) */
	SLIST_ENTRY(ThdEntry) parser_list_entry;
	/* Link in the per-device list (DevEntry.thdlist_head) */
	SLIST_ENTRY(ThdEntry) dev_list_entry;
	/* nb: bytes left to transfer for the current R/W command;
	 * sample_size: sample size computed from this client's mask;
	 * samples_count: buffer size requested when the device was opened */
	unsigned int nb, sample_size, samples_count;
	/* Result code stored by signal_thread() for the waiting client */
	ssize_t err;

	/* Written by thd_entry_event_signal() to wake up the client thread */
	int eventfd;

	struct parser_pdata *pdata;
	struct iio_device *dev;
	struct DevEntry *entry;

	/* Channel mask requested by this client */
	uint32_t *mask;
	/* active: a R/W command is in progress for this client;
	 * is_writer: direction of the current R/W command;
	 * new_client: the command's reply header has not been sent yet;
	 * wait_for_open: the R/W thread has not (re)opened the device yet */
	bool active, is_writer, new_client, wait_for_open;
};
56
thd_entry_event_signal(struct ThdEntry * thd)57 static void thd_entry_event_signal(struct ThdEntry *thd)
58 {
59 uint64_t e = 1;
60 int ret;
61
62 do {
63 ret = write(thd->eventfd, &e, sizeof(e));
64 } while (ret == -1 && errno == EINTR);
65 }
66
thd_entry_event_wait(struct ThdEntry * thd,pthread_mutex_t * mutex,int fd_in)67 static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
68 int fd_in)
69 {
70 struct pollfd pfd[3];
71 uint64_t e;
72 int ret;
73
74 pthread_mutex_unlock(mutex);
75
76 pfd[0].fd = thd->eventfd;
77 pfd[0].events = POLLIN;
78 pfd[1].fd = fd_in;
79 pfd[1].events = POLLRDHUP;
80 pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
81 pfd[2].events = POLLIN;
82
83 do {
84 poll_nointr(pfd, 3);
85
86 if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
87 pthread_mutex_lock(mutex);
88 return -EPIPE;
89 }
90
91 do {
92 ret = read(thd->eventfd, &e, sizeof(e));
93 } while (ret == -1 && errno == EINTR);
94 } while (ret == -1 && errno == EAGAIN);
95
96 pthread_mutex_lock(mutex);
97
98 return 0;
99 }
100
/* Corresponds to an opened device */
struct DevEntry {
	/* Number of holders of this entry; dev_entry_put() frees the entry
	 * when it drops to zero */
	unsigned int ref_count;

	struct iio_device *dev;
	/* Buffer (re)created by the R/W thread each time update_mask is set */
	struct iio_buffer *buf;
	unsigned int sample_size, nb_clients;
	/* Set when a client is added or removed, so that the R/W thread
	 * recomputes the mask and recreates the buffer */
	bool update_mask;
	bool cyclic;
	/* Set by the R/W thread right before it stops serving this entry */
	bool closed;
	/* Set when the last client left and the buffer got cancelled;
	 * cleared by the R/W thread after creating a new buffer */
	bool cancelled;

	/* Linked list of ThdEntry structures corresponding
	 * to all the threads who opened the device */
	SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
	pthread_mutex_t thdlist_lock;

	/* Signalled to wake up the R/W thread when its state changed */
	pthread_cond_t rw_ready_cond;

	/* OR of the masks of all the clients; nb_words 32-bit words long */
	uint32_t *mask;
	size_t nb_words;
};
123
/* Context handed to the send_sample() / receive_sample() callbacks */
struct sample_cb_info {
	/* Client connection the samples are written to / read from */
	struct parser_pdata *pdata;
	/* nb_bytes: byte budget of the transfer;
	 * cpt: number of bytes processed so far, padding included */
	unsigned int nb_bytes, cpt;
	/* Channel mask of the client; channels outside of it are skipped */
	uint32_t *mask;
};
129
/* Protects iio_device_{set,get}_data() from concurrent access from multiple
 * clients. The data pointer stored there is the DevEntry of the device, so
 * this lock serializes lookup, creation and removal of that entry. */
static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;
133
134 #if WITH_AIO
async_io(struct parser_pdata * pdata,void * buf,size_t len,bool do_read)135 static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
136 bool do_read)
137 {
138 ssize_t ret;
139 struct pollfd pfd[2];
140 unsigned int num_pfds;
141 struct iocb iocb;
142 struct iocb *ios[1];
143 struct io_event e[1];
144
145 ios[0] = &iocb;
146
147 if (do_read)
148 io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
149 else
150 io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);
151
152 io_set_eventfd(&iocb, pdata->aio_eventfd);
153
154 pthread_mutex_lock(&pdata->aio_mutex);
155
156 ret = io_submit(pdata->aio_ctx, 1, ios);
157 if (ret != 1) {
158 pthread_mutex_unlock(&pdata->aio_mutex);
159 IIO_ERROR("Failed to submit IO operation: %zd\n", ret);
160 return -EIO;
161 }
162
163 pfd[0].fd = pdata->aio_eventfd;
164 pfd[0].events = POLLIN;
165 pfd[0].revents = 0;
166 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
167 pfd[1].events = POLLIN;
168 pfd[1].revents = 0;
169 num_pfds = 2;
170
171 do {
172 poll_nointr(pfd, num_pfds);
173
174 if (pfd[0].revents & POLLIN) {
175 uint64_t event;
176 ret = read(pdata->aio_eventfd, &event, sizeof(event));
177 if (ret != sizeof(event)) {
178 IIO_ERROR("Failed to read from eventfd: %d\n", -errno);
179 ret = -EIO;
180 break;
181 }
182
183 ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
184 if (ret != 1) {
185 IIO_ERROR("Failed to read IO events: %zd\n", ret);
186 ret = -EIO;
187 break;
188 } else {
189 ret = (long)e[0].res;
190 }
191 } else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) {
192 /* Got a STOP event to abort this whole session */
193 ret = io_cancel(pdata->aio_ctx, &iocb, e);
194 if (ret != -EINPROGRESS && ret != -EINVAL) {
195 IIO_ERROR("Failed to cancel IO transfer: %zd\n", ret);
196 ret = -EIO;
197 break;
198 }
199 /* It should not be long now until we get the cancellation event */
200 num_pfds = 1;
201 }
202 } while (!(pfd[0].revents & POLLIN));
203
204 pthread_mutex_unlock(&pdata->aio_mutex);
205
206 /* Got STOP event, treat it as EOF */
207 if (num_pfds == 1)
208 return 0;
209
210 return ret;
211 }
212
/* Largest size, in bytes, of a single request handed to async_io() */
#define MAX_AIO_REQ_SIZE (1024 * 1024)
214
readfd_aio(struct parser_pdata * pdata,void * dest,size_t len)215 static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
216 {
217 if (len > MAX_AIO_REQ_SIZE)
218 len = MAX_AIO_REQ_SIZE;
219 return async_io(pdata, dest, len, true);
220 }
221
writefd_aio(struct parser_pdata * pdata,const void * dest,size_t len)222 static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest,
223 size_t len)
224 {
225 if (len > MAX_AIO_REQ_SIZE)
226 len = MAX_AIO_REQ_SIZE;
227 return async_io(pdata, (void *)dest, len, false);
228 }
229 #endif /* WITH_AIO */
230
readfd_io(struct parser_pdata * pdata,void * dest,size_t len)231 static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
232 {
233 ssize_t ret;
234 struct pollfd pfd[2];
235
236 pfd[0].fd = pdata->fd_in;
237 pfd[0].events = POLLIN | POLLRDHUP;
238 pfd[0].revents = 0;
239 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
240 pfd[1].events = POLLIN;
241 pfd[1].revents = 0;
242
243 do {
244 poll_nointr(pfd, 2);
245
246 /* Got STOP event, or client closed the socket: treat it as EOF */
247 if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
248 return 0;
249 if (pfd[0].revents & POLLERR)
250 return -EIO;
251 if (!(pfd[0].revents & POLLIN))
252 continue;
253
254 do {
255 if (pdata->fd_in_is_socket)
256 ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
257 else
258 ret = read(pdata->fd_in, dest, len);
259 } while (ret == -1 && errno == EINTR);
260
261 if (ret != -1 || errno != EAGAIN)
262 break;
263 } while (true);
264
265 if (ret == -1)
266 return -errno;
267
268 return ret;
269 }
270
writefd_io(struct parser_pdata * pdata,const void * src,size_t len)271 static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
272 {
273 ssize_t ret;
274 struct pollfd pfd[2];
275
276 pfd[0].fd = pdata->fd_out;
277 pfd[0].events = POLLOUT;
278 pfd[0].revents = 0;
279 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
280 pfd[1].events = POLLIN;
281 pfd[1].revents = 0;
282
283 do {
284 poll_nointr(pfd, 2);
285
286 /* Got STOP event, or client closed the socket: treat it as EOF */
287 if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
288 return 0;
289 if (pfd[0].revents & POLLERR)
290 return -EIO;
291 if (!(pfd[0].revents & POLLOUT))
292 continue;
293
294 do {
295 if (pdata->fd_out_is_socket)
296 ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
297 else
298 ret = write(pdata->fd_out, src, len);
299 } while (ret == -1 && errno == EINTR);
300
301 if (ret != -1 || errno != EAGAIN)
302 break;
303 } while (true);
304
305 if (ret == -1)
306 return -errno;
307
308 return ret;
309 }
310
write_all(struct parser_pdata * pdata,const void * src,size_t len)311 ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
312 {
313 uintptr_t ptr = (uintptr_t) src;
314
315 while (len) {
316 ssize_t ret = pdata->writefd(pdata, (void *) ptr, len);
317 if (ret < 0)
318 return ret;
319 if (!ret)
320 return -EPIPE;
321 ptr += ret;
322 len -= ret;
323 }
324
325 return ptr - (uintptr_t) src;
326 }
327
read_all(struct parser_pdata * pdata,void * dst,size_t len)328 static ssize_t read_all(struct parser_pdata *pdata,
329 void *dst, size_t len)
330 {
331 uintptr_t ptr = (uintptr_t) dst;
332
333 while (len) {
334 ssize_t ret = pdata->readfd(pdata, (void *) ptr, len);
335 if (ret < 0)
336 return ret;
337 if (!ret)
338 return -EPIPE;
339 ptr += ret;
340 len -= ret;
341 }
342
343 return ptr - (uintptr_t) dst;
344 }
345
print_value(struct parser_pdata * pdata,long value)346 static void print_value(struct parser_pdata *pdata, long value)
347 {
348 if (pdata->verbose && value < 0) {
349 char buf[1024];
350 iio_strerror(-value, buf, sizeof(buf));
351 output(pdata, "ERROR: ");
352 output(pdata, buf);
353 output(pdata, "\n");
354 } else {
355 char buf[128];
356 iio_snprintf(buf, sizeof(buf), "%li\n", value);
357 output(pdata, buf);
358 }
359 }
360
send_sample(const struct iio_channel * chn,void * src,size_t length,void * d)361 static ssize_t send_sample(const struct iio_channel *chn,
362 void *src, size_t length, void *d)
363 {
364 struct sample_cb_info *info = d;
365 if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
366 return 0;
367 if (info->nb_bytes < length)
368 return 0;
369
370 if (info->cpt % length) {
371 unsigned int i, goal = length - info->cpt % length;
372 char zero = 0;
373 ssize_t ret;
374
375 for (i = 0; i < goal; i++) {
376 ret = info->pdata->writefd(info->pdata, &zero, 1);
377 if (ret < 0)
378 return ret;
379 }
380 info->cpt += goal;
381 }
382
383 info->cpt += length;
384 info->nb_bytes -= length;
385 return write_all(info->pdata, src, length);
386 }
387
receive_sample(const struct iio_channel * chn,void * dst,size_t length,void * d)388 static ssize_t receive_sample(const struct iio_channel *chn,
389 void *dst, size_t length, void *d)
390 {
391 struct sample_cb_info *info = d;
392 if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
393 return 0;
394 if (info->cpt == info->nb_bytes)
395 return 0;
396
397 /* Skip the padding if needed */
398 if (info->cpt % length) {
399 unsigned int i, goal = length - info->cpt % length;
400 char foo;
401 ssize_t ret;
402
403 for (i = 0; i < goal; i++) {
404 ret = info->pdata->readfd(info->pdata, &foo, 1);
405 if (ret < 0)
406 return ret;
407 }
408 info->cpt += goal;
409 }
410
411 info->cpt += length;
412 return read_all(info->pdata, dst, length);
413 }
414
send_data(struct DevEntry * dev,struct ThdEntry * thd,size_t len)415 static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
416 {
417 struct parser_pdata *pdata = thd->pdata;
418 bool demux = server_demux && dev->sample_size != thd->sample_size;
419
420 if (demux)
421 len = (len / dev->sample_size) * thd->sample_size;
422 if (len > thd->nb)
423 len = thd->nb;
424
425 print_value(pdata, len);
426
427 if (thd->new_client) {
428 unsigned int i;
429 char buf[129], *ptr = buf;
430 uint32_t *mask = demux ? thd->mask : dev->mask;
431 ssize_t ret, len;
432
433 len = sizeof(buf);
434 /* Send the current mask */
435 for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf);
436 i--, ptr += 8) {
437 iio_snprintf(ptr, len, "%08x", mask[i - 1]);
438 len -= 8;
439 }
440
441 *ptr = '\n';
442 len--;
443
444 if (len < 0) {
445 IIO_ERROR("send_data: string length error\n");
446 return -ENOSPC;
447 }
448
449 ret = write_all(pdata, buf, ptr + 1 - buf);
450 if (ret < 0)
451 return ret;
452
453 thd->new_client = false;
454 }
455
456 if (!demux) {
457 /* Short path */
458 return write_all(pdata, dev->buf->buffer, len);
459 } else {
460 struct sample_cb_info info = {
461 .pdata = pdata,
462 .cpt = 0,
463 .nb_bytes = len,
464 .mask = thd->mask,
465 };
466
467 return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
468 }
469 }
470
receive_data(struct DevEntry * dev,struct ThdEntry * thd)471 static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
472 {
473 struct parser_pdata *pdata = thd->pdata;
474
475 /* Inform that no error occurred, and that we'll start reading data */
476 if (thd->new_client) {
477 print_value(thd->pdata, 0);
478 thd->new_client = false;
479 }
480
481 if (dev->sample_size == thd->sample_size) {
482 /* Short path: Receive directly in the buffer */
483
484 size_t len = dev->buf->length;
485 if (thd->nb < len)
486 len = thd->nb;
487
488 return read_all(pdata, dev->buf->buffer, len);
489 } else {
490 /* Long path: Mux the samples to the buffer */
491
492 struct sample_cb_info info = {
493 .pdata = pdata,
494 .cpt = 0,
495 .nb_bytes = thd->nb,
496 .mask = thd->mask,
497 };
498
499 return iio_buffer_foreach_sample(dev->buf,
500 receive_sample, &info);
501 }
502 }
503
dev_entry_put(struct DevEntry * entry)504 static void dev_entry_put(struct DevEntry *entry)
505 {
506 bool free_entry = false;
507
508 pthread_mutex_lock(&entry->thdlist_lock);
509 entry->ref_count--;
510 if (entry->ref_count == 0)
511 free_entry = true;
512 pthread_mutex_unlock(&entry->thdlist_lock);
513
514 if (free_entry) {
515 pthread_mutex_destroy(&entry->thdlist_lock);
516 pthread_cond_destroy(&entry->rw_ready_cond);
517
518 free(entry->mask);
519 free(entry);
520 }
521 }
522
signal_thread(struct ThdEntry * thd,ssize_t ret)523 static void signal_thread(struct ThdEntry *thd, ssize_t ret)
524 {
525 thd->err = ret;
526 thd->nb = 0;
527 thd->active = false;
528 thd_entry_event_signal(thd);
529 }
530
/*
 * Per-device R/W worker, started through thread_pool_add_thread() with the
 * DevEntry of the device as "d".
 *
 * Each iteration of the main loop:
 *  - leaves the loop if no client is attached to the device anymore;
 *  - if the set of clients changed (entry->update_mask), recomputes the
 *    combined channel mask, recreates the buffer and signals the clients
 *    that were waiting for the device to be opened;
 *  - works out which clients have a R/W command in progress, and sleeps on
 *    rw_ready_cond if there is none;
 *  - for readers, refills the buffer and sends its content to each of them;
 *  - for writers, receives their data into the buffer and pushes it.
 *
 * On exit, every remaining client is signalled with the last error code, the
 * buffer is destroyed, the entry is marked closed and detached from the
 * device, and the thread's reference to the entry is dropped.
 */
static void rw_thd(struct thread_pool *pool, void *d)
{
	struct DevEntry *entry = d;
	struct ThdEntry *thd, *next_thd;
	struct iio_device *dev = entry->dev;
	unsigned int nb_words = entry->nb_words;
	ssize_t ret = 0;

	IIO_DEBUG("R/W thread started for device %s\n",
			dev->name ? dev->name : dev->id);

	while (true) {
		bool has_readers = false, has_writers = false,
		     mask_updated = false;
		unsigned int sample_size;

		/* NOTE: this while loop must exit with thdlist_lock locked. */
		pthread_mutex_lock(&entry->thdlist_lock);

		if (SLIST_EMPTY(&entry->thdlist_head))
			break;

		if (entry->update_mask) {
			unsigned int i;
			unsigned int samples_count = 0;

			/* The device mask is the OR of all the client masks,
			 * and the buffer is as large as the largest request */
			memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				for (i = 0; i < nb_words; i++)
					entry->mask[i] |= thd->mask[i];

				if (thd->samples_count > samples_count)
					samples_count = thd->samples_count;
			}

			if (entry->buf)
				iio_buffer_destroy(entry->buf);

			/* Enable exactly the channels present in the mask;
			 * channels with a negative index are left alone */
			for (i = 0; i < dev->nb_channels; i++) {
				struct iio_channel *chn = dev->channels[i];
				long index = chn->index;

				if (index < 0)
					continue;

				if (TEST_BIT(entry->mask, chn->number))
					iio_channel_enable(chn);
				else
					iio_channel_disable(chn);
			}

			entry->buf = iio_device_create_buffer(dev,
					samples_count, entry->cyclic);
			if (!entry->buf) {
				ret = -errno;
				IIO_ERROR("Unable to create buffer\n");
				break;
			}
			entry->cancelled = false;

			/* Signal the threads that we opened the device */
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				if (thd->wait_for_open) {
					thd->wait_for_open = false;
					signal_thread(thd, 0);
				}
			}

			IIO_DEBUG("IIO device %s reopened with new mask:\n",
					dev->id);
			for (i = 0; i < nb_words; i++)
				IIO_DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]);
			entry->update_mask = false;

			entry->sample_size = iio_device_get_sample_size(dev);
			mask_updated = true;
		}

		sample_size = entry->sample_size;

		/* A client is active if it has no error pending and still has
		 * at least one sample to transfer. Commands that were in
		 * progress while the buffer got recreated are completed
		 * early, with the number of bytes left as result. */
		SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
			thd->active = !thd->err && thd->nb >= sample_size;
			if (mask_updated && thd->active)
				signal_thread(thd, thd->nb);

			if (thd->is_writer)
				has_writers |= thd->active;
			else
				has_readers |= thd->active;
		}

		/* Nothing to do: sleep until a client changes our state */
		if (!has_readers && !has_writers) {
			pthread_cond_wait(&entry->rw_ready_cond,
					&entry->thdlist_lock);
		}

		pthread_mutex_unlock(&entry->thdlist_lock);

		if (!has_readers && !has_writers)
			continue;

		if (has_readers) {
			ssize_t nb_bytes;

			/* Called without the lock held */
			ret = iio_buffer_refill(entry->buf);

			pthread_mutex_lock(&entry->thdlist_lock);

			/*
			 * When the last client disconnects the buffer is
			 * cancelled and iio_buffer_refill() returns an error. A
			 * new client might have connected before we got here
			 * though, in that case the rw thread has to stay active
			 * and a new buffer is created. If the list is still empty the loop
			 * will exit normally.
			 */
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}

			if (ret < 0) {
				IIO_ERROR("Reading from device failed: %i\n",
						(int) ret);
				break;
			}

			nb_bytes = ret;

			/* We don't use SLIST_FOREACH here. As soon as a thread is
			 * signaled, its "thd" structure might be freed;
			 * SLIST_FOREACH would then cause a segmentation fault, as it
			 * reads "thd" to get the address of the next element. */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || thd->is_writer)
					continue;

				ret = send_data(entry, thd, nb_bytes);
				if (ret > 0)
					thd->nb -= ret;

				/* Complete the command on error, or once less
				 * than one sample is left to send */
				if (ret < 0 || thd->nb < sample_size)
					signal_thread(thd, (ret < 0) ?
							ret : (ssize_t) thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}

		if (has_writers) {
			ssize_t nb_bytes = 0;

			pthread_mutex_lock(&entry->thdlist_lock);

			/* Reset the size of the buffer to its maximum size */
			entry->buf->data_length = entry->buf->length;

			/* Same comment as above */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || !thd->is_writer)
					continue;

				/* nb_bytes ends up as the largest amount
				 * received from a single writer */
				ret = receive_data(entry, thd);
				if (ret > 0) {
					thd->nb -= ret;
					if (ret > nb_bytes)
						nb_bytes = ret;
				}

				if (ret < 0)
					signal_thread(thd, ret);
			}

			ret = iio_buffer_push_partial(entry->buf,
					nb_bytes / sample_size);
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}
			if (ret < 0) {
				IIO_ERROR("Writing to device failed: %i\n",
						(int) ret);
				break;
			}

			/* Signal threads which completed their RW command */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);
				if (thd->active && thd->is_writer &&
						thd->nb < sample_size)
					signal_thread(thd, thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	/* Signal all remaining threads */
	for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
		next_thd = SLIST_NEXT(thd, dev_list_entry);
		SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
		thd->wait_for_open = false;
		signal_thread(thd, ret);
	}
	if (entry->buf) {
		iio_buffer_destroy(entry->buf);
		entry->buf = NULL;
	}
	/* From now on, clients must not attach to this entry anymore */
	entry->closed = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	pthread_mutex_lock(&devlist_lock);
	/* It is possible that a new thread has already started, make sure to
	 * not overwrite it. */
	if (iio_device_get_data(dev) == entry)
		iio_device_set_data(dev, NULL);
	pthread_mutex_unlock(&devlist_lock);

	IIO_DEBUG("Stopping R/W thread for device %s\n",
			dev->name ? dev->name : dev->id);

	/* Drop the reference held by this thread */
	dev_entry_put(entry);
}
761
parser_lookup_thd_entry(struct parser_pdata * pdata,struct iio_device * dev)762 static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
763 struct iio_device *dev)
764 {
765 struct ThdEntry *t;
766
767 SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
768 if (t->dev == dev)
769 return t;
770 }
771
772 return NULL;
773 }
774
rw_buffer(struct parser_pdata * pdata,struct iio_device * dev,unsigned int nb,bool is_write)775 static ssize_t rw_buffer(struct parser_pdata *pdata,
776 struct iio_device *dev, unsigned int nb, bool is_write)
777 {
778 struct DevEntry *entry;
779 struct ThdEntry *thd;
780 ssize_t ret;
781
782 if (!dev)
783 return -ENODEV;
784
785 thd = parser_lookup_thd_entry(pdata, dev);
786 if (!thd)
787 return -EBADF;
788
789 entry = thd->entry;
790
791 if (nb < entry->sample_size)
792 return 0;
793
794 pthread_mutex_lock(&entry->thdlist_lock);
795 if (entry->closed) {
796 pthread_mutex_unlock(&entry->thdlist_lock);
797 return -EBADF;
798 }
799
800 if (thd->nb) {
801 pthread_mutex_unlock(&entry->thdlist_lock);
802 return -EBUSY;
803 }
804
805 thd->new_client = true;
806 thd->nb = nb;
807 thd->err = 0;
808 thd->is_writer = is_write;
809 thd->active = true;
810
811 pthread_cond_signal(&entry->rw_ready_cond);
812
813 IIO_DEBUG("Waiting for completion...\n");
814 while (thd->active) {
815 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
816 if (ret)
817 break;
818 }
819 if (ret == 0)
820 ret = thd->err;
821 pthread_mutex_unlock(&entry->thdlist_lock);
822
823 if (ret > 0 && ret < (ssize_t) nb)
824 print_value(thd->pdata, 0);
825
826 IIO_DEBUG("Exiting rw_buffer with code %li\n", (long) ret);
827 if (ret < 0)
828 return ret;
829 else
830 return nb - ret;
831 }
832
/*
 * Parse a channel mask given as a string of hexadecimal digits, 8 digits per
 * 32-bit word, most significant word first.
 *
 * On entry, *len is the length of the string; on success it is replaced by
 * the number of 32-bit words of the mask.
 *
 * Returns a newly allocated array of words, least significant word first,
 * that the caller must free(); or NULL if the allocation failed. Groups of
 * digits that cannot be parsed are left as zero.
 */
static uint32_t *get_mask(const char *mask, size_t *len)
{
	size_t nb = (*len + 7) / 8;
	uint32_t *ptr, *words = calloc(nb, sizeof(*words));
	if (!words)
		return NULL;

	ptr = words + nb;

	/* Never store more words than were allocated, whatever the actual
	 * length of the string is */
	while (*mask && ptr > words) {
		char buf[9];
		unsigned int word = 0;

		iio_snprintf(buf, sizeof(buf), "%.*s", 8, mask);

		/* Parse into an unsigned int, which is what %x expects */
		sscanf(buf, "%8x", &word);
		*--ptr = word;

		/* Advance by what was actually consumed, so that a short last
		 * group does not make us step past the terminator */
		mask += strlen(buf);

		IIO_DEBUG("Mask[%lu]: 0x%08x\n",
				(unsigned long) (ptr - words), *ptr);
	}

	*len = nb;
	return words;
}
853
free_thd_entry(struct ThdEntry * t)854 static void free_thd_entry(struct ThdEntry *t)
855 {
856 close(t->eventfd);
857 free(t->mask);
858 free(t);
859 }
860
remove_thd_entry(struct ThdEntry * t)861 static void remove_thd_entry(struct ThdEntry *t)
862 {
863 struct DevEntry *entry = t->entry;
864
865 pthread_mutex_lock(&entry->thdlist_lock);
866 if (!entry->closed) {
867 entry->update_mask = true;
868 SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
869 if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
870 entry->cancelled = true;
871 iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */
872 }
873
874 pthread_cond_signal(&entry->rw_ready_cond);
875 }
876 pthread_mutex_unlock(&entry->thdlist_lock);
877 dev_entry_put(entry);
878
879 free_thd_entry(t);
880 }
881
open_dev_helper(struct parser_pdata * pdata,struct iio_device * dev,size_t samples_count,const char * mask,bool cyclic)882 static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
883 size_t samples_count, const char *mask, bool cyclic)
884 {
885 int ret = -ENOMEM;
886 struct DevEntry *entry;
887 struct ThdEntry *thd;
888 size_t len = strlen(mask);
889 uint32_t *words;
890 unsigned int nb_channels;
891 unsigned int cyclic_retry = 500;
892
893 if (!dev)
894 return -ENODEV;
895
896 nb_channels = dev->nb_channels;
897 if (len != ((nb_channels + 31) / 32) * 8)
898 return -EINVAL;
899
900 words = get_mask(mask, &len);
901 if (!words)
902 return -ENOMEM;
903
904 thd = zalloc(sizeof(*thd));
905 if (!thd)
906 goto err_free_words;
907
908 thd->wait_for_open = true;
909 thd->mask = words;
910 thd->nb = 0;
911 thd->samples_count = samples_count;
912 thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
913 thd->pdata = pdata;
914 thd->dev = dev;
915 thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
916
917 retry:
918 /* Atomically look up the thread and make sure that it is still active
919 * or allocate new one. */
920 pthread_mutex_lock(&devlist_lock);
921 entry = iio_device_get_data(dev);
922 if (entry) {
923 if (cyclic || entry->cyclic) {
924 /* Only one client allowed in cyclic mode */
925 pthread_mutex_unlock(&devlist_lock);
926
927 /* There is an inherent race condition if a client
928 * creates a new cyclic buffer shortly after destroying
929 * a previous. E.g. like
930 *
931 * iio_buffer_destroy(buf);
932 * buf = iio_device_create_buffer(dev, n, true);
933 *
934 * In this case the two buffers each use their own
935 * communication channel which are unordered to each
936 * other. E.g. the socket open might arrive before the
937 * socket close on the host side, even though they were
938 * sent in the opposite order on the client side. This
939 * race condition can cause an error being reported back
940 * to the client, even though the code on the client
941 * side was well formed and would work fine e.g. using
942 * the local backend.
943 *
944 * To avoid this issue go to sleep for up to 50ms in
945 * intervals of 100us. This should be enough time for
946 * the issue to resolve itself. If there actually is
947 * contention on the buffer an error will eventually be
948 * returned in which case the additional delay cause by
949 * the retires should not matter too much.
950 *
951 * This is not pretty but it works.
952 */
953 if (cyclic_retry) {
954 struct timespec wait;
955 wait.tv_sec = 0;
956 wait.tv_nsec = (100 * 1000);
957 cyclic_retry--;
958 nanosleep(&wait, &wait);
959 goto retry;
960 }
961
962 ret = -EBUSY;
963 goto err_free_thd;
964 }
965
966 pthread_mutex_lock(&entry->thdlist_lock);
967 if (!entry->closed) {
968 pthread_mutex_unlock(&devlist_lock);
969
970 entry->ref_count++;
971
972 SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
973 thd->entry = entry;
974 entry->update_mask = true;
975 IIO_DEBUG("Added thread to client list\n");
976
977 pthread_cond_signal(&entry->rw_ready_cond);
978
979 /* Wait until the device is opened by the rw thread */
980 while (thd->wait_for_open) {
981 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
982 if (ret)
983 break;
984 }
985 pthread_mutex_unlock(&entry->thdlist_lock);
986
987 if (ret == 0)
988 ret = (int) thd->err;
989 if (ret < 0)
990 remove_thd_entry(thd);
991 else
992 SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
993 return ret;
994 } else {
995 pthread_mutex_unlock(&entry->thdlist_lock);
996 }
997 }
998
999 entry = zalloc(sizeof(*entry));
1000 if (!entry) {
1001 pthread_mutex_unlock(&devlist_lock);
1002 goto err_free_thd;
1003 }
1004
1005 entry->ref_count = 2; /* One for thread, one for the client */
1006
1007 entry->mask = malloc(len * sizeof(*words));
1008 if (!entry->mask) {
1009 pthread_mutex_unlock(&devlist_lock);
1010 goto err_free_entry;
1011 }
1012
1013 entry->cyclic = cyclic;
1014 entry->nb_words = len;
1015 entry->update_mask = true;
1016 entry->dev = dev;
1017 entry->buf = NULL;
1018 SLIST_INIT(&entry->thdlist_head);
1019 SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
1020 thd->entry = entry;
1021 IIO_DEBUG("Added thread to client list\n");
1022
1023 pthread_mutex_init(&entry->thdlist_lock, NULL);
1024 pthread_cond_init(&entry->rw_ready_cond, NULL);
1025
1026 ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
1027 if (ret) {
1028 pthread_mutex_unlock(&devlist_lock);
1029 goto err_free_entry_mask;
1030 }
1031
1032 IIO_DEBUG("Adding new device thread to device list\n");
1033 iio_device_set_data(dev, entry);
1034 pthread_mutex_unlock(&devlist_lock);
1035
1036 pthread_mutex_lock(&entry->thdlist_lock);
1037 /* Wait until the device is opened by the rw thread */
1038 while (thd->wait_for_open) {
1039 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
1040 if (ret)
1041 break;
1042 }
1043 pthread_mutex_unlock(&entry->thdlist_lock);
1044
1045 if (ret == 0)
1046 ret = (int) thd->err;
1047 if (ret < 0)
1048 remove_thd_entry(thd);
1049 else
1050 SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
1051 return ret;
1052
1053 err_free_entry_mask:
1054 free(entry->mask);
1055 err_free_entry:
1056 free(entry);
1057 err_free_thd:
1058 close(thd->eventfd);
1059 free(thd);
1060 err_free_words:
1061 free(words);
1062 return ret;
1063 }
1064
close_dev_helper(struct parser_pdata * pdata,struct iio_device * dev)1065 static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
1066 {
1067 struct ThdEntry *t;
1068
1069 if (!dev)
1070 return -ENODEV;
1071
1072 t = parser_lookup_thd_entry(pdata, dev);
1073 if (!t)
1074 return -ENXIO;
1075
1076 SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry);
1077 remove_thd_entry(t);
1078
1079 return 0;
1080 }
1081
/* Open "dev" for the client and send the resulting status code to it.
 * Returns that same status code (see open_dev_helper()). */
int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int status;

	status = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
	print_value(pdata, status);

	return status;
}
1089
/* Close "dev" for the client and send the resulting status code to it.
 * Returns that same status code (see close_dev_helper()). */
int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
{
	int status;

	status = close_dev_helper(pdata, dev);
	print_value(pdata, status);

	return status;
}
1096
/* Run a R/W command of "nb" bytes on "dev" and return the result of
 * rw_buffer(). The result is also sent to the client, except after a read
 * that transferred data. */
ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
		unsigned int nb, bool is_write)
{
	ssize_t result = rw_buffer(pdata, dev, nb, is_write);
	bool successful_read = result > 0 && !is_write;

	if (!successful_read)
		print_value(pdata, result);

	return result;
}
1105
read_dev_attr(struct parser_pdata * pdata,struct iio_device * dev,const char * attr,enum iio_attr_type type)1106 ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
1107 const char *attr, enum iio_attr_type type)
1108 {
1109 /* We use a very large buffer here, as if attr is NULL all the
1110 * attributes will be read, which may represents a few kilobytes worth
1111 * of data. */
1112 char buf[0x10000];
1113 ssize_t ret = -EINVAL;
1114
1115 if (!dev) {
1116 print_value(pdata, -ENODEV);
1117 return -ENODEV;
1118 }
1119
1120 switch (type) {
1121 case IIO_ATTR_TYPE_DEVICE:
1122 ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
1123 break;
1124 case IIO_ATTR_TYPE_DEBUG:
1125 ret = iio_device_debug_attr_read(dev,
1126 attr, buf, sizeof(buf) - 1);
1127 break;
1128 case IIO_ATTR_TYPE_BUFFER:
1129 ret = iio_device_buffer_attr_read(dev,
1130 attr, buf, sizeof(buf) - 1);
1131 break;
1132 default:
1133 ret = -EINVAL;
1134 break;
1135 }
1136 print_value(pdata, ret);
1137 if (ret < 0)
1138 return ret;
1139
1140 buf[ret] = '\n';
1141 return write_all(pdata, buf, ret + 1);
1142 }
1143
write_dev_attr(struct parser_pdata * pdata,struct iio_device * dev,const char * attr,size_t len,enum iio_attr_type type)1144 ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
1145 const char *attr, size_t len, enum iio_attr_type type)
1146 {
1147 ssize_t ret = -ENOMEM;
1148 char *buf;
1149
1150 if (!dev) {
1151 ret = -ENODEV;
1152 goto err_print_value;
1153 }
1154
1155 buf = malloc(len);
1156 if (!buf)
1157 goto err_print_value;
1158
1159 ret = read_all(pdata, buf, len);
1160 if (ret < 0)
1161 goto err_free_buffer;
1162
1163 switch (type) {
1164 case IIO_ATTR_TYPE_DEVICE:
1165 ret = iio_device_attr_write_raw(dev, attr, buf, len);
1166 break;
1167 case IIO_ATTR_TYPE_DEBUG:
1168 ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
1169 break;
1170 case IIO_ATTR_TYPE_BUFFER:
1171 ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
1172 break;
1173 default:
1174 ret = -EINVAL;
1175 break;
1176 }
1177
1178 err_free_buffer:
1179 free(buf);
1180 err_print_value:
1181 print_value(pdata, ret);
1182 return ret;
1183 }
1184
read_chn_attr(struct parser_pdata * pdata,struct iio_channel * chn,const char * attr)1185 ssize_t read_chn_attr(struct parser_pdata *pdata,
1186 struct iio_channel *chn, const char *attr)
1187 {
1188 char buf[1024];
1189 ssize_t ret = -ENODEV;
1190
1191 if (chn)
1192 ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
1193 else if (pdata->dev)
1194 ret = -ENXIO;
1195 print_value(pdata, ret);
1196 if (ret < 0)
1197 return ret;
1198
1199 buf[ret] = '\n';
1200 return write_all(pdata, buf, ret + 1);
1201 }
1202
write_chn_attr(struct parser_pdata * pdata,struct iio_channel * chn,const char * attr,size_t len)1203 ssize_t write_chn_attr(struct parser_pdata *pdata,
1204 struct iio_channel *chn, const char *attr, size_t len)
1205 {
1206 ssize_t ret = -ENOMEM;
1207 char *buf = malloc(len);
1208 if (!buf)
1209 goto err_print_value;
1210
1211 ret = read_all(pdata, buf, len);
1212 if (ret < 0)
1213 goto err_free_buffer;
1214
1215 if (chn)
1216 ret = iio_channel_attr_write_raw(chn, attr, buf, len);
1217 else if (pdata->dev)
1218 ret = -ENXIO;
1219 else
1220 ret = -ENODEV;
1221 err_free_buffer:
1222 free(buf);
1223 err_print_value:
1224 print_value(pdata, ret);
1225 return ret;
1226 }
1227
set_trigger(struct parser_pdata * pdata,struct iio_device * dev,const char * trigger)1228 ssize_t set_trigger(struct parser_pdata *pdata,
1229 struct iio_device *dev, const char *trigger)
1230 {
1231 struct iio_device *trig = NULL;
1232 ssize_t ret = -ENOENT;
1233
1234 if (!dev) {
1235 ret = -ENODEV;
1236 goto err_print_value;
1237 }
1238
1239 if (trigger) {
1240 trig = iio_context_find_device(pdata->ctx, trigger);
1241 if (!trig)
1242 goto err_print_value;
1243 }
1244
1245 ret = iio_device_set_trigger(dev, trig);
1246 err_print_value:
1247 print_value(pdata, ret);
1248 return ret;
1249 }
1250
get_trigger(struct parser_pdata * pdata,struct iio_device * dev)1251 ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
1252 {
1253 const struct iio_device *trigger;
1254 ssize_t ret;
1255
1256 if (!dev) {
1257 print_value(pdata, -ENODEV);
1258 return -ENODEV;
1259 }
1260
1261 ret = iio_device_get_trigger(dev, &trigger);
1262 if (!ret && trigger) {
1263 char buf[256];
1264
1265 ret = strlen(trigger->name);
1266 print_value(pdata, ret);
1267
1268 iio_snprintf(buf, sizeof(buf), "%s\n", trigger->name);
1269 ret = write_all(pdata, buf, ret + 1);
1270 } else {
1271 print_value(pdata, ret);
1272 }
1273 return ret;
1274 }
1275
set_timeout(struct parser_pdata * pdata,unsigned int timeout)1276 int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
1277 {
1278 int ret = iio_context_set_timeout(pdata->ctx, timeout);
1279 print_value(pdata, ret);
1280 return ret;
1281 }
1282
/* Set the number of kernel buffers of "dev" to "value".
 *
 * The status is always reported with print_value() and returned: -ENODEV if
 * dev is NULL, -EINVAL if value is lower than 1 or does not fit in an
 * unsigned int, or the result of iio_device_set_kernel_buffers_count()
 * otherwise. */
int set_buffers_count(struct parser_pdata *pdata,
		struct iio_device *dev, long value)
{
	int ret;

	if (!dev) {
		ret = -ENODEV;
	} else if (value < 1 || (unsigned long) value > UINT_MAX) {
		/* Where long is wider than unsigned int, a too-large value
		 * would otherwise be silently truncated by the cast below. */
		ret = -EINVAL;
	} else {
		ret = iio_device_set_kernel_buffers_count(
				dev, (unsigned int) value);
	}

	print_value(pdata, ret);
	return ret;
}
1300
read_line(struct parser_pdata * pdata,char * buf,size_t len)1301 ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
1302 {
1303 ssize_t ret;
1304
1305 if (pdata->fd_in_is_socket) {
1306 struct pollfd pfd[2];
1307 bool found;
1308 size_t bytes_read = 0;
1309
1310 pfd[0].fd = pdata->fd_in;
1311 pfd[0].events = POLLIN | POLLRDHUP;
1312 pfd[0].revents = 0;
1313 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
1314 pfd[1].events = POLLIN;
1315 pfd[1].revents = 0;
1316
1317 do {
1318 size_t i, to_trunc;
1319
1320 poll_nointr(pfd, 2);
1321
1322 if (pfd[1].revents & POLLIN ||
1323 pfd[0].revents & POLLRDHUP)
1324 return 0;
1325
1326 /* First read from the socket, without advancing the
1327 * read offset */
1328 ret = recv(pdata->fd_in, buf, len,
1329 MSG_NOSIGNAL | MSG_PEEK);
1330 if (ret < 0)
1331 return -errno;
1332
1333 /* Lookup for the trailing \n */
1334 for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
1335 found = i < (size_t) ret;
1336
1337 len -= ret;
1338 buf += ret;
1339
1340 to_trunc = found ? i + 1 : (size_t) ret;
1341
1342 /* Advance the read offset after the \n if found, or
1343 * after the last character read otherwise */
1344 ret = recv(pdata->fd_in, NULL, to_trunc,
1345 MSG_NOSIGNAL | MSG_TRUNC);
1346 if (ret < 0)
1347 return -errno;
1348
1349 bytes_read += to_trunc;
1350 } while (!found && len);
1351
1352 /* No \n found? Just garbage data */
1353 if (!found)
1354 ret = -EIO;
1355 else
1356 ret = bytes_read;
1357 } else {
1358 ret = pdata->readfd(pdata, buf, len);
1359 }
1360
1361 return ret;
1362 }
1363
interpreter(struct iio_context * ctx,int fd_in,int fd_out,bool verbose,bool is_socket,bool use_aio,struct thread_pool * pool)1364 void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
1365 bool is_socket, bool use_aio, struct thread_pool *pool)
1366 {
1367 yyscan_t scanner;
1368 struct parser_pdata pdata;
1369 unsigned int i;
1370 int ret;
1371
1372 pdata.ctx = ctx;
1373 pdata.stop = false;
1374 pdata.fd_in = fd_in;
1375 pdata.fd_out = fd_out;
1376 pdata.verbose = verbose;
1377 pdata.pool = pool;
1378
1379 pdata.fd_in_is_socket = is_socket;
1380 pdata.fd_out_is_socket = is_socket;
1381
1382 SLIST_INIT(&pdata.thdlist_head);
1383
1384 if (use_aio) {
1385 /* Note: if WITH_AIO is not defined, use_aio is always false.
1386 * We ensure that in iiod.c. */
1387 #if WITH_AIO
1388 char err_str[1024];
1389
1390 pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
1391 if (pdata.aio_eventfd < 0) {
1392 iio_strerror(errno, err_str, sizeof(err_str));
1393 IIO_ERROR("Failed to create AIO eventfd: %s\n", err_str);
1394 return;
1395 }
1396
1397 pdata.aio_ctx = 0;
1398 ret = io_setup(1, &pdata.aio_ctx);
1399 if (ret < 0) {
1400 iio_strerror(-ret, err_str, sizeof(err_str));
1401 IIO_ERROR("Failed to create AIO context: %s\n", err_str);
1402 close(pdata.aio_eventfd);
1403 return;
1404 }
1405 pthread_mutex_init(&pdata.aio_mutex, NULL);
1406 pdata.readfd = readfd_aio;
1407 pdata.writefd = writefd_aio;
1408 #endif
1409 } else {
1410 pdata.readfd = readfd_io;
1411 pdata.writefd = writefd_io;
1412 }
1413
1414 yylex_init_extra(&pdata, &scanner);
1415
1416 do {
1417 if (verbose)
1418 output(&pdata, "iio-daemon > ");
1419 ret = yyparse(scanner);
1420 } while (!pdata.stop && ret >= 0);
1421
1422 yylex_destroy(scanner);
1423
1424 /* Close all opened devices */
1425 for (i = 0; i < ctx->nb_devices; i++)
1426 close_dev_helper(&pdata, ctx->devices[i]);
1427
1428 #if WITH_AIO
1429 if (use_aio) {
1430 io_destroy(pdata.aio_ctx);
1431 close(pdata.aio_eventfd);
1432 }
1433 #endif
1434 }
1435