1 /*
2    (c) Copyright 2011  Denis Oliver Kropp <dok@directfb.org>
3 
4    All rights reserved.
5 
6    This program is free software; you can redistribute it and/or
7    modify it under the terms of the GNU General Public License
8    as published by the Free Software Foundation; either version
9    2 of the License, or (at your option) any later version.
10 */
11 
12 //#define DIRECT_ENABLE_DEBUG
13 
14 #include <config.h>
15 
16 #include <direct/debug.h>
17 #include <direct/direct.h>
18 #include <direct/hash.h>
19 #include <direct/log.h>
20 #include <direct/mem.h>
21 #include <direct/memcpy.h>
22 #include <direct/messages.h>
23 #include <direct/thread.h>
24 #include <direct/util.h>
25 
26 #include <sys/fcntl.h>
27 #include <sys/ioctl.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30 
31 #include <linux/one.h>
32 
33 #include "One.h"
34 
35 #ifndef ONEDEV
36 #define ONEDEV "/dev/one0"
37 #endif
38 
39 
40 D_DEBUG_DOMAIN( One_Main,   "One/Main",   "One Main" );
41 D_DEBUG_DOMAIN( One_Queue,  "One/Queue",  "One Queue" );
42 D_DEBUG_DOMAIN( One_Thread, "One/Thread", "One Thread" );
43 
44 /*********************************************************************************************************************/
45 
46 static DirectMutex  one_lock;
47 static unsigned int one_refs;
48 static int          one_fd = -1;   /* File descriptor of the One Kernel Device */
49 
50 __attribute__((constructor))
51 static void
__One_Init(void)52 __One_Init( void )
53 {
54      direct_mutex_init( &one_lock );
55 }
56 
57 /*********************************************************************************************************************/
58 
59 DirectResult
One_Initialize()60 One_Initialize()
61 {
62      DirectResult ret = DR_OK;
63 
64      D_DEBUG_AT( One_Main, "%s()\n", __FUNCTION__ );
65 
66      ret = direct_initialize();
67      if (ret)
68           return ret;
69 
70      direct_mutex_lock( &one_lock );
71 
72      if (!one_refs) {
73           /* Open the One Kernel Device. */
74           one_fd = open( ONEDEV, O_RDWR );
75           if (one_fd < 0) {
76                ret = errno2result( errno );
77                D_PERROR( "One/Main: Opening " ONEDEV " failed!\n" );
78           }
79      }
80 
81      if (ret == DR_OK)
82           one_refs++;
83 
84      direct_mutex_unlock( &one_lock );
85 
86      return ret;
87 }
88 
89 DirectResult
One_Shutdown()90 One_Shutdown()
91 {
92      D_DEBUG_AT( One_Main, "%s()\n", __FUNCTION__ );
93 
94      direct_mutex_lock( &one_lock );
95 
96      if (!--one_refs) {
97           /* Close the One Kernel Device. */
98           close( one_fd );
99           one_fd = -1;
100      }
101 
102      direct_mutex_unlock( &one_lock );
103 
104      direct_shutdown();
105 
106      return DR_OK;
107 }
108 
109 /*********************************************************************************************************************/
110 
111 DirectResult
OneQueue_New(OneQueueFlags flags,OneQID queue_id,OneQID * ret_id)112 OneQueue_New( OneQueueFlags  flags,
113               OneQID         queue_id,
114               OneQID        *ret_id )
115 {
116      DirectResult ret = DR_OK;
117      OneQueueNew  queue_new;
118 
119      D_DEBUG_AT( One_Queue, "%s( flags 0x%08x )\n", __FUNCTION__, flags );
120 
121      D_ASSERT( ret_id != NULL );
122 
123      queue_new.flags    = flags;
124      queue_new.queue_id = queue_id;
125 
126      while (ioctl( one_fd, ONE_QUEUE_NEW, &queue_new )) {
127           switch (errno) {
128                case EINTR:
129                     continue;
130                default:
131                     break;
132           }
133 
134           ret = errno2result( errno );
135 
136           D_PERROR( "One/Queue: ONE_QUEUE_NEW failed!\n" );
137 
138           goto error;
139      }
140 
141      D_DEBUG_AT( One_Queue, "  -> QID 0x%08x\n", queue_new.queue_id );
142 
143      *ret_id = queue_new.queue_id;
144 
145 error:
146      return ret;
147 }
148 
149 DirectResult
OneQueue_Destroy(OneQID queue_id)150 OneQueue_Destroy( OneQID queue_id )
151 {
152      DirectResult    ret = DR_OK;
153      OneQueueDestroy queue_destroy;
154 
155      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x )\n", __FUNCTION__, queue_id );
156 
157      queue_destroy.queue_id = queue_id;
158 
159      while (ioctl( one_fd, ONE_QUEUE_DESTROY, &queue_destroy )) {
160           switch (errno) {
161                case EINTR:
162                     continue;
163                default:
164                     break;
165           }
166 
167           ret = errno2result( errno );
168 
169           D_PERROR( "One/Queue: ONE_QUEUE_DESTROY failed!\n" );
170 
171           goto error;
172      }
173 
174 error:
175      return ret;
176 }
177 
178 DirectResult
OneQueue_Attach(OneQID queue_id,OneQID target_id)179 OneQueue_Attach( OneQID queue_id,
180                  OneQID target_id )
181 {
182      DirectResult   ret = DR_OK;
183      OneQueueAttach queue_attach;
184 
185      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, target QID 0x%08x )\n", __FUNCTION__, queue_id, target_id );
186 
187      queue_attach.queue_id  = queue_id;
188      queue_attach.target_id = target_id;
189 
190      while (ioctl( one_fd, ONE_QUEUE_ATTACH, &queue_attach )) {
191           switch (errno) {
192                case EINTR:
193                     continue;
194                default:
195                     break;
196           }
197 
198           ret = errno2result( errno );
199 
200           D_PERROR( "One/Queue: ONE_QUEUE_ATTACH failed!\n" );
201 
202           goto error;
203      }
204 
205 error:
206      return ret;
207 }
208 
209 DirectResult
OneQueue_Detach(OneQID queue_id,OneQID target_id)210 OneQueue_Detach( OneQID queue_id,
211                  OneQID target_id )
212 {
213      DirectResult   ret = DR_OK;
214      OneQueueDetach queue_detach;
215 
216      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, target QID 0x%08x )\n", __FUNCTION__, queue_id, target_id );
217 
218      queue_detach.queue_id  = queue_id;
219      queue_detach.target_id = target_id;
220 
221      while (ioctl( one_fd, ONE_QUEUE_DETACH, &queue_detach )) {
222           switch (errno) {
223                case EINTR:
224                     continue;
225                default:
226                     break;
227           }
228 
229           ret = errno2result( errno );
230 
231           D_PERROR( "One/Queue: ONE_QUEUE_DETACH failed!\n" );
232 
233           goto error;
234      }
235 
236 error:
237      return ret;
238 }
239 
240 DirectResult
OneQueue_Dispatch(OneQID queue_id,void * data,size_t length)241 OneQueue_Dispatch( OneQID  queue_id,
242                    void   *data,
243                    size_t  length )
244 {
245      DirectResult     ret = DR_OK;
246      OneQueueDispatch queue_dispatch;
247      struct iovec     iov[1];
248 
249      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, data %p, length %zu )\n", __FUNCTION__, queue_id, data, length );
250 
251      D_ASSERT( data != NULL );
252 
253      iov[0].iov_base = data;
254      iov[0].iov_len  = length;
255 
256      queue_dispatch.header.queue_id     = queue_id;
257      queue_dispatch.header.flags        = ONE_PACKET_NO_FLAGS;
258      queue_dispatch.header.size         = length;
259      queue_dispatch.header.uncompressed = length;
260 
261      queue_dispatch.iov       = iov;
262      queue_dispatch.iov_count = 1;
263 
264      while (ioctl( one_fd, ONE_QUEUE_DISPATCH, &queue_dispatch )) {
265           switch (errno) {
266                case EINTR:
267                     continue;
268                default:
269                     break;
270           }
271 
272           ret = errno2result( errno );
273 
274           D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH failed!\n" );
275 
276           goto error;
277      }
278 
279 error:
280      return ret;
281 }
282 
283 DirectResult
OneQueue_DispatchV(OneQID queue_id,void ** datas,size_t * lengths,size_t count)284 OneQueue_DispatchV( OneQID         queue_id,
285                     void         **datas,
286                     size_t        *lengths,
287                     size_t         count )
288 {
289      DirectResult     ret = DR_OK;
290      OneQueueDispatch queue_dispatch;
291      size_t           i;
292      size_t           length = 0;
293      struct iovec     iov[count];
294 
295      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, datas %p, lengths %p, count %zu )\n", __FUNCTION__, queue_id, datas, lengths, count );
296 
297      D_ASSERT( datas != NULL );
298      D_ASSERT( lengths != NULL );
299      D_ASSERT( count > 0 );
300 
301      for (i=0; i<count; i++) {
302           iov[i].iov_base = datas[i];
303           iov[i].iov_len  = lengths[i];
304 
305           length += lengths[i];
306      }
307 
308      D_DEBUG_AT( One_Queue, "  -> total length %zu\n", length );
309 
310      queue_dispatch.header.queue_id     = queue_id;
311      queue_dispatch.header.flags        = ONE_PACKET_NO_FLAGS;
312      queue_dispatch.header.size         = length;
313      queue_dispatch.header.uncompressed = length;
314 
315      queue_dispatch.iov       = iov;
316      queue_dispatch.iov_count = count;
317 
318      while (ioctl( one_fd, ONE_QUEUE_DISPATCH, &queue_dispatch )) {
319           switch (errno) {
320                case EINTR:
321                     continue;
322                default:
323                     break;
324           }
325 
326           ret = errno2result( errno );
327 
328           D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH failed!\n" );
329 
330           goto error;
331      }
332 
333 error:
334      return ret;
335 }
336 
337 DirectResult
OneQueue_Receive(const OneQID * queue_ids,unsigned int queue_count,void * buf,size_t length,size_t * ret_received,bool headerless,int timeout_ms)338 OneQueue_Receive( const OneQID *queue_ids,
339                   unsigned int  queue_count,
340                   void         *buf,
341                   size_t        length,
342                   size_t       *ret_received,
343                   bool          headerless,
344                   int           timeout_ms )
345 {
346      DirectResult    ret = DR_OK;
347      OneQueueReceive queue_receive;
348      struct iovec    iov[2];
349      OnePacketHeader header;
350 
351 #if D_DEBUG_ENABLED
352      unsigned int    i;
353 
354      D_DEBUG_AT( One_Queue, "%s( ids %p, count %u, buf %p, length %zu )\n", __FUNCTION__, queue_ids, queue_count, buf, length );
355 
356      for (i=0; i<queue_count; i++)
357           D_DEBUG_AT( One_Queue, "  -> QID 0x%08x\n", queue_ids[i] );
358 #endif
359 
360      D_ASSERT( queue_ids != NULL );
361      D_ASSERT( buf != NULL );
362      D_ASSERT( ret_received != NULL );
363 
364      if (headerless) {
365           iov[0].iov_base = &header;
366           iov[0].iov_len  = sizeof(header);
367 
368           iov[1].iov_base = buf;
369           iov[1].iov_len  = length;
370      }
371      else {
372           iov[0].iov_base = buf;
373           iov[0].iov_len  = length;
374      }
375 
376      queue_receive.ids       = queue_ids;
377      queue_receive.ids_count = queue_count;
378 
379      queue_receive.iov       = iov;
380      queue_receive.iov_count = headerless ? 2 : 1;
381 
382      queue_receive.timeout_ms = timeout_ms;
383 
384      while (ioctl( one_fd, ONE_QUEUE_RECEIVE, &queue_receive )) {
385           switch (errno) {
386                case EINTR:
387                     continue;
388                case ETIMEDOUT:
389                     D_DEBUG_AT( One_Queue, "  -> TIMEOUT\n" );
390                     ret = DR_TIMEOUT;
391                     goto error;
392                default:
393                     break;
394           }
395 
396           ret = errno2result( errno );
397 
398           D_PERROR( "One/Queue: ONE_QUEUE_RECEIVE failed!\n" );
399 
400           goto error;
401      }
402 
403      D_DEBUG_AT( One_Queue, "  -> received %zu\n", queue_receive.ret_received );
404 
405      if (headerless)
406           *ret_received = queue_receive.ret_received - sizeof(header);
407      else
408           *ret_received = queue_receive.ret_received;
409 
410 error:
411      return ret;
412 }
413 
414 DirectResult
OneQueue_ReceiveV(const OneQID * queue_ids,unsigned int queue_count,void ** buf,size_t * length,size_t count,size_t * ret_received,bool headerless,int timeout_ms)415 OneQueue_ReceiveV( const OneQID  *queue_ids,
416                    unsigned int   queue_count,
417                    void         **buf,
418                    size_t        *length,
419                    size_t         count,
420                    size_t        *ret_received,
421                    bool           headerless,
422                    int            timeout_ms )
423 {
424      DirectResult    ret = DR_OK;
425      unsigned int    i;
426      OneQueueReceive queue_receive;
427      struct iovec    iov[count+1];
428      OnePacketHeader header;
429 
430 #if D_DEBUG_ENABLED
431      D_DEBUG_AT( One_Queue, "%s( ids %p, count %u, buf %p, length %p, count %zu )\n", __FUNCTION__, queue_ids, queue_count, buf, length, count );
432 
433      for (i=0; i<queue_count; i++)
434           D_DEBUG_AT( One_Queue, "  -> QID 0x%08x\n", queue_ids[i] );
435 #endif
436 
437      D_ASSERT( queue_ids != NULL );
438      D_ASSERT( buf != NULL );
439      D_ASSERT( ret_received != NULL );
440 
441      if (headerless) {
442           iov[0].iov_base = &header;
443           iov[0].iov_len  = sizeof(header);
444 
445           for (i=0; i<count; i++) {
446                iov[i+1].iov_base = buf[i];
447                iov[i+1].iov_len  = length[i];
448           }
449      }
450      else {
451           for (i=0; i<count; i++) {
452                iov[i+0].iov_base = buf[i];
453                iov[i+0].iov_len  = length[i];
454           }
455      }
456 
457      queue_receive.ids       = queue_ids;
458      queue_receive.ids_count = queue_count;
459 
460      queue_receive.iov       = iov;
461      queue_receive.iov_count = headerless ? (count+1) : count;
462 
463      queue_receive.timeout_ms = timeout_ms;
464 
465      while (ioctl( one_fd, ONE_QUEUE_RECEIVE, &queue_receive )) {
466           switch (errno) {
467                case EINTR:
468                     continue;
469                case ETIMEDOUT:
470                     D_DEBUG_AT( One_Queue, "  -> TIMEOUT\n" );
471                     ret = DR_TIMEOUT;
472                     goto error;
473                default:
474                     break;
475           }
476 
477           ret = errno2result( errno );
478 
479           D_PERROR( "One/Queue: ONE_QUEUE_RECEIVE failed!\n" );
480 
481           goto error;
482      }
483 
484      D_DEBUG_AT( One_Queue, "  -> received %zu\n", queue_receive.ret_received );
485 
486      if (headerless)
487           *ret_received = queue_receive.ret_received - sizeof(header);
488      else
489           *ret_received = queue_receive.ret_received;
490 
491 error:
492      return ret;
493 }
494 
495 DirectResult
OneQueue_DispatchReceive(OneQID queue_id,void * data,size_t data_length,const OneQID * queue_ids,unsigned int queue_count,void * buf,size_t buf_length,size_t * ret_received,bool headerless,int timeout_ms)496 OneQueue_DispatchReceive( OneQID          queue_id,
497                           void           *data,
498                           size_t          data_length,
499                           const OneQID   *queue_ids,
500                           unsigned int    queue_count,
501                           void           *buf,
502                           size_t          buf_length,
503                           size_t         *ret_received,
504                           bool            headerless,
505                           int             timeout_ms )
506 {
507      DirectResult            ret = DR_OK;
508      OneQueueDispatch        queue_dispatch;
509      OneQueueDispatchReceive queue_dispatch_receive;
510      struct iovec            data_iov[1];
511      struct iovec            buf_iov[2];
512      OnePacketHeader         header;
513 
514 #if D_DEBUG_ENABLED
515      unsigned int    i;
516 
517      D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, data %p, length %zu, ids %p, count %u, buf %p, length %zu )\n",
518                  __FUNCTION__, queue_id, data, data_length, queue_ids, queue_count, buf, buf_length );
519 
520      for (i=0; i<queue_count; i++)
521           D_DEBUG_AT( One_Queue, "  -> QID 0x%08x\n", queue_ids[i] );
522 #endif
523 
524      D_ASSERT( data != NULL );
525      D_ASSERT( queue_ids != NULL );
526      D_ASSERT( buf != NULL );
527      D_ASSERT( ret_received != NULL );
528 
529      queue_dispatch_receive.dispatch       = &queue_dispatch;
530      queue_dispatch_receive.dispatch_count = 1;
531 
532      data_iov[0].iov_base = data;
533      data_iov[0].iov_len  = data_length;
534 
535      queue_dispatch.header.queue_id     = queue_id;
536      queue_dispatch.header.flags        = ONE_PACKET_NO_FLAGS;
537      queue_dispatch.header.size         = data_length;
538      queue_dispatch.header.uncompressed = data_length;
539 
540      queue_dispatch.iov       = data_iov;
541      queue_dispatch.iov_count = 1;
542 
543      if (headerless) {
544           buf_iov[0].iov_base = &header;
545           buf_iov[0].iov_len  = sizeof(header);
546 
547           buf_iov[1].iov_base = buf;
548           buf_iov[1].iov_len  = buf_length;
549      }
550      else {
551           buf_iov[0].iov_base = buf;
552           buf_iov[0].iov_len  = buf_length;
553      }
554 
555      queue_dispatch_receive.receive.ids       = queue_ids;
556      queue_dispatch_receive.receive.ids_count = queue_count;
557 
558      queue_dispatch_receive.receive.iov       = buf_iov;
559      queue_dispatch_receive.receive.iov_count = headerless ? 2 : 1;
560 
561      queue_dispatch_receive.receive.timeout_ms = timeout_ms;
562 
563      while (ioctl( one_fd, ONE_QUEUE_DISPATCH_RECEIVE, &queue_dispatch_receive )) {
564           switch (errno) {
565                case EINTR:
566                     continue;
567                case ETIMEDOUT:
568                     D_DEBUG_AT( One_Queue, "  -> TIMEOUT\n" );
569                     ret = DR_TIMEOUT;
570                     goto error;
571                default:
572                     break;
573           }
574 
575           ret = errno2result( errno );
576 
577           D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH_RECEIVE failed!\n" );
578 
579           goto error;
580      }
581 
582      D_DEBUG_AT( One_Queue, "  -> received %zu\n", queue_dispatch_receive.receive.ret_received );
583 
584      if (headerless)
585           *ret_received = queue_dispatch_receive.receive.ret_received - sizeof(header);
586      else
587           *ret_received = queue_dispatch_receive.receive.ret_received;
588 
589 error:
590      return ret;
591 }
592 
593 DirectResult
OneQueue_WakeUp(const OneQID * queue_ids,unsigned int queue_count)594 OneQueue_WakeUp( const OneQID *queue_ids,
595                  unsigned int  queue_count )
596 {
597      DirectResult   ret = DR_OK;
598      OneQueueWakeUp queue_wakeup;
599 
600 #if D_DEBUG_ENABLED
601      unsigned int   i;
602 
603      D_DEBUG_AT( One_Queue, "%s( ids %p, count %u )\n", __FUNCTION__, queue_ids, queue_count );
604 
605      for (i=0; i<queue_count; i++)
606           D_DEBUG_AT( One_Queue, "  -> QID 0x%08x\n", queue_ids[i] );
607 #endif
608 
609      D_ASSERT( queue_ids != NULL );
610 
611      queue_wakeup.ids       = queue_ids;
612      queue_wakeup.ids_count = queue_count;
613 
614      while (ioctl( one_fd, ONE_QUEUE_WAKEUP, &queue_wakeup )) {
615           switch (errno) {
616                case EINTR:
617                     continue;
618                default:
619                     break;
620           }
621 
622           ret = errno2result( errno );
623 
624           D_PERROR( "One/Queue: ONE_QUEUE_WAKEUP failed!\n" );
625 
626           break;
627      }
628 
629      return ret;
630 }
631 
632 DirectResult
OneQueue_SetName(OneQID queue_id,const char * name)633 OneQueue_SetName( OneQID      queue_id,
634                   const char *name )
635 {
636      DirectResult ret = DR_OK;
637      OneEntryInfo info;
638 
639      D_DEBUG_AT( One_Queue, "%s( id %u, name '%s' )\n", __FUNCTION__, queue_id, name );
640 
641      D_ASSERT( name != NULL );
642 
643      info.type = ONE_QUEUE;
644      info.id   = queue_id;
645 
646      direct_snputs( info.name, name, ONE_ENTRY_INFO_NAME_LENGTH );
647 
648      while (ioctl( one_fd, ONE_ENTRY_SET_INFO, &info )) {
649           switch (errno) {
650                case EINTR:
651                     continue;
652                default:
653                     break;
654           }
655 
656           ret = errno2result( errno );
657 
658           D_PERROR( "One/Queue: ONE_ENTRY_SET_INFO failed!\n" );
659 
660           break;
661      }
662 
663      return ret;
664 }
665 
666 /*********************************************************************************************************************/
667 
668 #define RECEIVE_BUFFER_SIZE   65536
669 
670 /**********************************************************************************************************************/
671 
672 typedef struct {
673      int                 magic;
674 
675      OneQID              queue_id;
676 
677      OneThreadDispatch   dispatch;
678      void               *context;
679 } AddedQueue;
680 
681 struct __One_OneThread {
682      int                 magic;
683 
684      DirectMutex         lock;
685 
686      bool                stop;
687 
688      DirectThread       *thread;
689 
690      DirectHash         *queues;
691 
692      OneQID             *queue_ids;
693      unsigned int        queue_count;
694 
695      unsigned int        queues_age;
696 };
697 
698 static void *
OneThread_Dispatcher(DirectThread * thread,void * arg)699 OneThread_Dispatcher( DirectThread *thread,
700                       void         *arg )
701 {
702      DirectResult  ret;
703      OneThread    *data = arg;
704      char         *buf;
705      OneQID       *ids          = NULL;
706      unsigned int  ids_capacity = 1000;
707      unsigned int  ids_count    = 0;
708      unsigned int  ids_age      = 0;
709 
710      D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
711 
712      buf = D_MALLOC( RECEIVE_BUFFER_SIZE );
713      if (!buf) {
714           D_OOM();
715           return NULL;
716      }
717 
718      ids = D_MALLOC( sizeof(OneQID) * ids_capacity );
719      if (!ids) {
720           D_OOM();
721           return NULL;
722      }
723 
724      while (!data->stop) {
725           size_t length;
726           size_t offset;
727 
728           direct_thread_lock( thread );
729 
730           if (data->queues_age != ids_age) {
731                if (!data->queue_count) {
732                     // FIXME: use wait queue
733                     usleep( 10000 );
734 
735                     direct_thread_unlock( thread );
736                     continue;
737                }
738 
739                ids_age   = data->queues_age;
740                ids_count = data->queue_count;
741 
742                if (ids_count > ids_capacity) {
743                     while (ids_count > ids_capacity)
744                          ids_capacity *= 2;
745 
746                     D_FREE( ids );
747 
748                     D_ASSERT( data->queue_count > 0 );
749 
750                     ids = D_MALLOC( sizeof(OneQID) * ids_capacity );
751                     if (!ids) {
752                          D_OOM();
753                          direct_thread_unlock( thread );
754                          return NULL;
755                     }
756                }
757 
758                direct_memcpy( ids, data->queue_ids, sizeof(OneQID) * data->queue_count );
759           }
760 
761           direct_thread_unlock( thread );
762 
763           ret = OneQueue_Receive( ids, ids_count, buf, RECEIVE_BUFFER_SIZE, &length, false, 0 );
764           if (ret) {
765                D_DERROR( ret, "IComaComponent/One: Could not receive from Component Queue!\n" );
766                break;
767           }
768 
769           D_DEBUG_AT( One_Thread, "%s() -> received %zu bytes\n", __FUNCTION__, length );
770 
771           for (offset=0; offset < length; ) {
772                AddedQueue      *queue;
773                OnePacketHeader *header = (OnePacketHeader *)(buf + offset);
774                size_t           size   = header->uncompressed;
775 
776                D_DEBUG_AT( One_Thread, "  -> size %zu\n", size );
777 
778                offset += sizeof(OnePacketHeader) + size;
779 
780                if (offset > length) {
781                     D_WARN( "invalid packet (offset %zu, length %zu)", offset, length );
782                     continue;
783                }
784 
785                direct_thread_lock( thread );
786 
787                queue = direct_hash_lookup( data->queues, header->queue_id );
788 
789                direct_thread_unlock( thread );
790 
791                if (queue) {
792                     D_MAGIC_ASSERT( queue, AddedQueue );
793 
794                     D_DEBUG_AT( One_Thread, "  -> added queue %p, dispatch %p\n", queue, queue->dispatch );
795 
796                     queue->dispatch( queue->context, header, header + 1, data );
797                }
798           }
799      }
800 
801      D_FREE( buf );
802 
803      return NULL;
804 }
805 
806 DirectResult
OneThread_Create(const char * name,OneThread ** ret_thread)807 OneThread_Create( const char  *name,
808                   OneThread  **ret_thread )
809 {
810      DirectResult  ret;
811      OneThread    *thread;
812 
813      D_DEBUG_AT( One_Thread, "%s( '%s' )\n", __FUNCTION__, name );
814 
815      D_ASSERT( name != NULL );
816      D_ASSERT( ret_thread != NULL );
817 
818      thread = D_CALLOC( 1, sizeof(OneThread) );
819      if (!thread)
820           return D_OOM();
821 
822 
823      // FIXME: Get rid of id array management when adding more queue connection support (attach)
824 
825      thread->queue_ids = D_MALLOC( sizeof(OneQID) );
826      if (!thread->queue_ids) {
827           ret = D_OOM();
828           goto error;
829      }
830 
831      ret = OneQueue_New( ONE_QUEUE_NO_FLAGS, ONE_QID_NONE, &thread->queue_ids[0] );
832      if (ret)
833           goto error;
834 
835      thread->queue_count = 1;
836      thread->queues_age  = 1;
837 
838 
839      ret = direct_hash_create( 23, &thread->queues );
840      if (ret)
841           goto error_hash_create;
842 
843      direct_mutex_init( &thread->lock );
844 
845      *ret_thread = thread;
846 
847      thread->thread = direct_thread_create( DTT_DEFAULT, OneThread_Dispatcher, thread, name );
848 
849      return DR_OK;
850 
851 
852 error_hash_create:
853      OneQueue_Destroy( thread->queue_ids[0] );
854 
855 error:
856      if (thread->queue_ids)
857           D_FREE( thread->queue_ids );
858 
859      D_FREE( thread );
860 
861      return ret;
862 }
863 
864 void
OneThread_Destroy(OneThread * thread)865 OneThread_Destroy( OneThread *thread )
866 {
867      D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
868 
869 }
870 
871 DirectResult
OneThread_AddQueue(OneThread * thread,OneQID queue_id,OneThreadDispatch dispatch,void * context)872 OneThread_AddQueue( OneThread         *thread,
873                     OneQID             queue_id,
874                     OneThreadDispatch  dispatch,
875                     void              *context )
876 {
877      AddedQueue *queue;
878 
879      D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
880 
881      direct_thread_lock( thread->thread );
882 
883      queue = direct_hash_lookup( thread->queues, queue_id );
884      if (queue) {
885           direct_thread_unlock( thread->thread );
886           return DR_BUSY;
887      }
888 
889      queue = D_CALLOC( 1, sizeof(AddedQueue) );
890      if (!queue) {
891           direct_thread_unlock( thread->thread );
892           return D_OOM();
893      }
894 
895      queue->queue_id = queue_id;
896      queue->dispatch = dispatch;
897      queue->context  = context;
898 
899      D_MAGIC_SET( queue, AddedQueue );
900 
901      direct_hash_insert( thread->queues, queue_id, queue );
902 
903      thread->queue_ids = D_REALLOC( thread->queue_ids, sizeof(OneQID) * (thread->queue_count + 1) );
904 
905      thread->queue_ids[thread->queue_count] = queue_id;
906 
907      thread->queue_count++;
908 
909      thread->queues_age++;
910 
911      direct_thread_unlock( thread->thread );
912 
913      OneQueue_WakeUp( &thread->queue_ids[0], 1 );
914 
915      return DR_OK;
916 }
917 
918 DirectResult
OneThread_RemoveQueue(OneThread * thread,OneQID queue_id)919 OneThread_RemoveQueue( OneThread *thread,
920                        OneQID     queue_id )
921 {
922      AddedQueue   *queue;
923      unsigned int  i;
924 
925      D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
926 
927      direct_thread_lock( thread->thread );
928 
929      queue = direct_hash_lookup( thread->queues, queue_id );
930      if (!queue) {
931           direct_mutex_unlock( &thread->lock );
932           return DR_ITEMNOTFOUND;
933      }
934 
935      direct_hash_remove( thread->queues, queue_id );
936 
937      for (i=0; i<thread->queue_count; i++) {
938           if (thread->queue_ids[i] == queue_id)
939                break;
940      }
941 
942      if (i >= thread->queue_count)
943           D_BUG( "queue_ids are missing element" );
944 
945      for (i++; i<thread->queue_count; i++)
946           thread->queue_ids[i-1] = thread->queue_ids[i];
947 
948      thread->queue_ids = D_REALLOC( thread->queue_ids, sizeof(OneQID) * thread->queue_count );
949 
950      thread->queue_count--;
951 
952      thread->queues_age++;
953 
954      direct_mutex_unlock( &thread->lock );
955 
956      OneQueue_WakeUp( &thread->queue_ids[0], 1 );
957 
958      D_MAGIC_CLEAR( queue );
959 
960      D_FREE( queue );
961 
962      return DR_OK;
963 }
964 
965