1 /*
2 (c) Copyright 2011 Denis Oliver Kropp <dok@directfb.org>
3
4 All rights reserved.
5
6 This program is free software; you can redistribute it and/or
7 modify it under the terms of the GNU General Public License
8 as published by the Free Software Foundation; either version
9 2 of the License, or (at your option) any later version.
10 */
11
12 //#define DIRECT_ENABLE_DEBUG
13
14 #include <config.h>
15
16 #include <direct/debug.h>
17 #include <direct/direct.h>
18 #include <direct/hash.h>
19 #include <direct/log.h>
20 #include <direct/mem.h>
21 #include <direct/memcpy.h>
22 #include <direct/messages.h>
23 #include <direct/thread.h>
24 #include <direct/util.h>
25
26 #include <sys/fcntl.h>
27 #include <sys/ioctl.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30
31 #include <linux/one.h>
32
33 #include "One.h"
34
35 #ifndef ONEDEV
36 #define ONEDEV "/dev/one0"
37 #endif
38
39
40 D_DEBUG_DOMAIN( One_Main, "One/Main", "One Main" );
41 D_DEBUG_DOMAIN( One_Queue, "One/Queue", "One Queue" );
42 D_DEBUG_DOMAIN( One_Thread, "One/Thread", "One Thread" );
43
44 /*********************************************************************************************************************/
45
46 static DirectMutex one_lock;
47 static unsigned int one_refs;
48 static int one_fd = -1; /* File descriptor of the One Kernel Device */
49
50 __attribute__((constructor))
51 static void
__One_Init(void)52 __One_Init( void )
53 {
54 direct_mutex_init( &one_lock );
55 }
56
57 /*********************************************************************************************************************/
58
59 DirectResult
One_Initialize()60 One_Initialize()
61 {
62 DirectResult ret = DR_OK;
63
64 D_DEBUG_AT( One_Main, "%s()\n", __FUNCTION__ );
65
66 ret = direct_initialize();
67 if (ret)
68 return ret;
69
70 direct_mutex_lock( &one_lock );
71
72 if (!one_refs) {
73 /* Open the One Kernel Device. */
74 one_fd = open( ONEDEV, O_RDWR );
75 if (one_fd < 0) {
76 ret = errno2result( errno );
77 D_PERROR( "One/Main: Opening " ONEDEV " failed!\n" );
78 }
79 }
80
81 if (ret == DR_OK)
82 one_refs++;
83
84 direct_mutex_unlock( &one_lock );
85
86 return ret;
87 }
88
89 DirectResult
One_Shutdown()90 One_Shutdown()
91 {
92 D_DEBUG_AT( One_Main, "%s()\n", __FUNCTION__ );
93
94 direct_mutex_lock( &one_lock );
95
96 if (!--one_refs) {
97 /* Close the One Kernel Device. */
98 close( one_fd );
99 one_fd = -1;
100 }
101
102 direct_mutex_unlock( &one_lock );
103
104 direct_shutdown();
105
106 return DR_OK;
107 }
108
109 /*********************************************************************************************************************/
110
111 DirectResult
OneQueue_New(OneQueueFlags flags,OneQID queue_id,OneQID * ret_id)112 OneQueue_New( OneQueueFlags flags,
113 OneQID queue_id,
114 OneQID *ret_id )
115 {
116 DirectResult ret = DR_OK;
117 OneQueueNew queue_new;
118
119 D_DEBUG_AT( One_Queue, "%s( flags 0x%08x )\n", __FUNCTION__, flags );
120
121 D_ASSERT( ret_id != NULL );
122
123 queue_new.flags = flags;
124 queue_new.queue_id = queue_id;
125
126 while (ioctl( one_fd, ONE_QUEUE_NEW, &queue_new )) {
127 switch (errno) {
128 case EINTR:
129 continue;
130 default:
131 break;
132 }
133
134 ret = errno2result( errno );
135
136 D_PERROR( "One/Queue: ONE_QUEUE_NEW failed!\n" );
137
138 goto error;
139 }
140
141 D_DEBUG_AT( One_Queue, " -> QID 0x%08x\n", queue_new.queue_id );
142
143 *ret_id = queue_new.queue_id;
144
145 error:
146 return ret;
147 }
148
149 DirectResult
OneQueue_Destroy(OneQID queue_id)150 OneQueue_Destroy( OneQID queue_id )
151 {
152 DirectResult ret = DR_OK;
153 OneQueueDestroy queue_destroy;
154
155 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x )\n", __FUNCTION__, queue_id );
156
157 queue_destroy.queue_id = queue_id;
158
159 while (ioctl( one_fd, ONE_QUEUE_DESTROY, &queue_destroy )) {
160 switch (errno) {
161 case EINTR:
162 continue;
163 default:
164 break;
165 }
166
167 ret = errno2result( errno );
168
169 D_PERROR( "One/Queue: ONE_QUEUE_DESTROY failed!\n" );
170
171 goto error;
172 }
173
174 error:
175 return ret;
176 }
177
178 DirectResult
OneQueue_Attach(OneQID queue_id,OneQID target_id)179 OneQueue_Attach( OneQID queue_id,
180 OneQID target_id )
181 {
182 DirectResult ret = DR_OK;
183 OneQueueAttach queue_attach;
184
185 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, target QID 0x%08x )\n", __FUNCTION__, queue_id, target_id );
186
187 queue_attach.queue_id = queue_id;
188 queue_attach.target_id = target_id;
189
190 while (ioctl( one_fd, ONE_QUEUE_ATTACH, &queue_attach )) {
191 switch (errno) {
192 case EINTR:
193 continue;
194 default:
195 break;
196 }
197
198 ret = errno2result( errno );
199
200 D_PERROR( "One/Queue: ONE_QUEUE_ATTACH failed!\n" );
201
202 goto error;
203 }
204
205 error:
206 return ret;
207 }
208
209 DirectResult
OneQueue_Detach(OneQID queue_id,OneQID target_id)210 OneQueue_Detach( OneQID queue_id,
211 OneQID target_id )
212 {
213 DirectResult ret = DR_OK;
214 OneQueueDetach queue_detach;
215
216 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, target QID 0x%08x )\n", __FUNCTION__, queue_id, target_id );
217
218 queue_detach.queue_id = queue_id;
219 queue_detach.target_id = target_id;
220
221 while (ioctl( one_fd, ONE_QUEUE_DETACH, &queue_detach )) {
222 switch (errno) {
223 case EINTR:
224 continue;
225 default:
226 break;
227 }
228
229 ret = errno2result( errno );
230
231 D_PERROR( "One/Queue: ONE_QUEUE_DETACH failed!\n" );
232
233 goto error;
234 }
235
236 error:
237 return ret;
238 }
239
240 DirectResult
OneQueue_Dispatch(OneQID queue_id,void * data,size_t length)241 OneQueue_Dispatch( OneQID queue_id,
242 void *data,
243 size_t length )
244 {
245 DirectResult ret = DR_OK;
246 OneQueueDispatch queue_dispatch;
247 struct iovec iov[1];
248
249 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, data %p, length %zu )\n", __FUNCTION__, queue_id, data, length );
250
251 D_ASSERT( data != NULL );
252
253 iov[0].iov_base = data;
254 iov[0].iov_len = length;
255
256 queue_dispatch.header.queue_id = queue_id;
257 queue_dispatch.header.flags = ONE_PACKET_NO_FLAGS;
258 queue_dispatch.header.size = length;
259 queue_dispatch.header.uncompressed = length;
260
261 queue_dispatch.iov = iov;
262 queue_dispatch.iov_count = 1;
263
264 while (ioctl( one_fd, ONE_QUEUE_DISPATCH, &queue_dispatch )) {
265 switch (errno) {
266 case EINTR:
267 continue;
268 default:
269 break;
270 }
271
272 ret = errno2result( errno );
273
274 D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH failed!\n" );
275
276 goto error;
277 }
278
279 error:
280 return ret;
281 }
282
283 DirectResult
OneQueue_DispatchV(OneQID queue_id,void ** datas,size_t * lengths,size_t count)284 OneQueue_DispatchV( OneQID queue_id,
285 void **datas,
286 size_t *lengths,
287 size_t count )
288 {
289 DirectResult ret = DR_OK;
290 OneQueueDispatch queue_dispatch;
291 size_t i;
292 size_t length = 0;
293 struct iovec iov[count];
294
295 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, datas %p, lengths %p, count %zu )\n", __FUNCTION__, queue_id, datas, lengths, count );
296
297 D_ASSERT( datas != NULL );
298 D_ASSERT( lengths != NULL );
299 D_ASSERT( count > 0 );
300
301 for (i=0; i<count; i++) {
302 iov[i].iov_base = datas[i];
303 iov[i].iov_len = lengths[i];
304
305 length += lengths[i];
306 }
307
308 D_DEBUG_AT( One_Queue, " -> total length %zu\n", length );
309
310 queue_dispatch.header.queue_id = queue_id;
311 queue_dispatch.header.flags = ONE_PACKET_NO_FLAGS;
312 queue_dispatch.header.size = length;
313 queue_dispatch.header.uncompressed = length;
314
315 queue_dispatch.iov = iov;
316 queue_dispatch.iov_count = count;
317
318 while (ioctl( one_fd, ONE_QUEUE_DISPATCH, &queue_dispatch )) {
319 switch (errno) {
320 case EINTR:
321 continue;
322 default:
323 break;
324 }
325
326 ret = errno2result( errno );
327
328 D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH failed!\n" );
329
330 goto error;
331 }
332
333 error:
334 return ret;
335 }
336
337 DirectResult
OneQueue_Receive(const OneQID * queue_ids,unsigned int queue_count,void * buf,size_t length,size_t * ret_received,bool headerless,int timeout_ms)338 OneQueue_Receive( const OneQID *queue_ids,
339 unsigned int queue_count,
340 void *buf,
341 size_t length,
342 size_t *ret_received,
343 bool headerless,
344 int timeout_ms )
345 {
346 DirectResult ret = DR_OK;
347 OneQueueReceive queue_receive;
348 struct iovec iov[2];
349 OnePacketHeader header;
350
351 #if D_DEBUG_ENABLED
352 unsigned int i;
353
354 D_DEBUG_AT( One_Queue, "%s( ids %p, count %u, buf %p, length %zu )\n", __FUNCTION__, queue_ids, queue_count, buf, length );
355
356 for (i=0; i<queue_count; i++)
357 D_DEBUG_AT( One_Queue, " -> QID 0x%08x\n", queue_ids[i] );
358 #endif
359
360 D_ASSERT( queue_ids != NULL );
361 D_ASSERT( buf != NULL );
362 D_ASSERT( ret_received != NULL );
363
364 if (headerless) {
365 iov[0].iov_base = &header;
366 iov[0].iov_len = sizeof(header);
367
368 iov[1].iov_base = buf;
369 iov[1].iov_len = length;
370 }
371 else {
372 iov[0].iov_base = buf;
373 iov[0].iov_len = length;
374 }
375
376 queue_receive.ids = queue_ids;
377 queue_receive.ids_count = queue_count;
378
379 queue_receive.iov = iov;
380 queue_receive.iov_count = headerless ? 2 : 1;
381
382 queue_receive.timeout_ms = timeout_ms;
383
384 while (ioctl( one_fd, ONE_QUEUE_RECEIVE, &queue_receive )) {
385 switch (errno) {
386 case EINTR:
387 continue;
388 case ETIMEDOUT:
389 D_DEBUG_AT( One_Queue, " -> TIMEOUT\n" );
390 ret = DR_TIMEOUT;
391 goto error;
392 default:
393 break;
394 }
395
396 ret = errno2result( errno );
397
398 D_PERROR( "One/Queue: ONE_QUEUE_RECEIVE failed!\n" );
399
400 goto error;
401 }
402
403 D_DEBUG_AT( One_Queue, " -> received %zu\n", queue_receive.ret_received );
404
405 if (headerless)
406 *ret_received = queue_receive.ret_received - sizeof(header);
407 else
408 *ret_received = queue_receive.ret_received;
409
410 error:
411 return ret;
412 }
413
414 DirectResult
OneQueue_ReceiveV(const OneQID * queue_ids,unsigned int queue_count,void ** buf,size_t * length,size_t count,size_t * ret_received,bool headerless,int timeout_ms)415 OneQueue_ReceiveV( const OneQID *queue_ids,
416 unsigned int queue_count,
417 void **buf,
418 size_t *length,
419 size_t count,
420 size_t *ret_received,
421 bool headerless,
422 int timeout_ms )
423 {
424 DirectResult ret = DR_OK;
425 unsigned int i;
426 OneQueueReceive queue_receive;
427 struct iovec iov[count+1];
428 OnePacketHeader header;
429
430 #if D_DEBUG_ENABLED
431 D_DEBUG_AT( One_Queue, "%s( ids %p, count %u, buf %p, length %p, count %zu )\n", __FUNCTION__, queue_ids, queue_count, buf, length, count );
432
433 for (i=0; i<queue_count; i++)
434 D_DEBUG_AT( One_Queue, " -> QID 0x%08x\n", queue_ids[i] );
435 #endif
436
437 D_ASSERT( queue_ids != NULL );
438 D_ASSERT( buf != NULL );
439 D_ASSERT( ret_received != NULL );
440
441 if (headerless) {
442 iov[0].iov_base = &header;
443 iov[0].iov_len = sizeof(header);
444
445 for (i=0; i<count; i++) {
446 iov[i+1].iov_base = buf[i];
447 iov[i+1].iov_len = length[i];
448 }
449 }
450 else {
451 for (i=0; i<count; i++) {
452 iov[i+0].iov_base = buf[i];
453 iov[i+0].iov_len = length[i];
454 }
455 }
456
457 queue_receive.ids = queue_ids;
458 queue_receive.ids_count = queue_count;
459
460 queue_receive.iov = iov;
461 queue_receive.iov_count = headerless ? (count+1) : count;
462
463 queue_receive.timeout_ms = timeout_ms;
464
465 while (ioctl( one_fd, ONE_QUEUE_RECEIVE, &queue_receive )) {
466 switch (errno) {
467 case EINTR:
468 continue;
469 case ETIMEDOUT:
470 D_DEBUG_AT( One_Queue, " -> TIMEOUT\n" );
471 ret = DR_TIMEOUT;
472 goto error;
473 default:
474 break;
475 }
476
477 ret = errno2result( errno );
478
479 D_PERROR( "One/Queue: ONE_QUEUE_RECEIVE failed!\n" );
480
481 goto error;
482 }
483
484 D_DEBUG_AT( One_Queue, " -> received %zu\n", queue_receive.ret_received );
485
486 if (headerless)
487 *ret_received = queue_receive.ret_received - sizeof(header);
488 else
489 *ret_received = queue_receive.ret_received;
490
491 error:
492 return ret;
493 }
494
495 DirectResult
OneQueue_DispatchReceive(OneQID queue_id,void * data,size_t data_length,const OneQID * queue_ids,unsigned int queue_count,void * buf,size_t buf_length,size_t * ret_received,bool headerless,int timeout_ms)496 OneQueue_DispatchReceive( OneQID queue_id,
497 void *data,
498 size_t data_length,
499 const OneQID *queue_ids,
500 unsigned int queue_count,
501 void *buf,
502 size_t buf_length,
503 size_t *ret_received,
504 bool headerless,
505 int timeout_ms )
506 {
507 DirectResult ret = DR_OK;
508 OneQueueDispatch queue_dispatch;
509 OneQueueDispatchReceive queue_dispatch_receive;
510 struct iovec data_iov[1];
511 struct iovec buf_iov[2];
512 OnePacketHeader header;
513
514 #if D_DEBUG_ENABLED
515 unsigned int i;
516
517 D_DEBUG_AT( One_Queue, "%s( QID 0x%08x, data %p, length %zu, ids %p, count %u, buf %p, length %zu )\n",
518 __FUNCTION__, queue_id, data, data_length, queue_ids, queue_count, buf, buf_length );
519
520 for (i=0; i<queue_count; i++)
521 D_DEBUG_AT( One_Queue, " -> QID 0x%08x\n", queue_ids[i] );
522 #endif
523
524 D_ASSERT( data != NULL );
525 D_ASSERT( queue_ids != NULL );
526 D_ASSERT( buf != NULL );
527 D_ASSERT( ret_received != NULL );
528
529 queue_dispatch_receive.dispatch = &queue_dispatch;
530 queue_dispatch_receive.dispatch_count = 1;
531
532 data_iov[0].iov_base = data;
533 data_iov[0].iov_len = data_length;
534
535 queue_dispatch.header.queue_id = queue_id;
536 queue_dispatch.header.flags = ONE_PACKET_NO_FLAGS;
537 queue_dispatch.header.size = data_length;
538 queue_dispatch.header.uncompressed = data_length;
539
540 queue_dispatch.iov = data_iov;
541 queue_dispatch.iov_count = 1;
542
543 if (headerless) {
544 buf_iov[0].iov_base = &header;
545 buf_iov[0].iov_len = sizeof(header);
546
547 buf_iov[1].iov_base = buf;
548 buf_iov[1].iov_len = buf_length;
549 }
550 else {
551 buf_iov[0].iov_base = buf;
552 buf_iov[0].iov_len = buf_length;
553 }
554
555 queue_dispatch_receive.receive.ids = queue_ids;
556 queue_dispatch_receive.receive.ids_count = queue_count;
557
558 queue_dispatch_receive.receive.iov = buf_iov;
559 queue_dispatch_receive.receive.iov_count = headerless ? 2 : 1;
560
561 queue_dispatch_receive.receive.timeout_ms = timeout_ms;
562
563 while (ioctl( one_fd, ONE_QUEUE_DISPATCH_RECEIVE, &queue_dispatch_receive )) {
564 switch (errno) {
565 case EINTR:
566 continue;
567 case ETIMEDOUT:
568 D_DEBUG_AT( One_Queue, " -> TIMEOUT\n" );
569 ret = DR_TIMEOUT;
570 goto error;
571 default:
572 break;
573 }
574
575 ret = errno2result( errno );
576
577 D_PERROR( "One/Queue: ONE_QUEUE_DISPATCH_RECEIVE failed!\n" );
578
579 goto error;
580 }
581
582 D_DEBUG_AT( One_Queue, " -> received %zu\n", queue_dispatch_receive.receive.ret_received );
583
584 if (headerless)
585 *ret_received = queue_dispatch_receive.receive.ret_received - sizeof(header);
586 else
587 *ret_received = queue_dispatch_receive.receive.ret_received;
588
589 error:
590 return ret;
591 }
592
593 DirectResult
OneQueue_WakeUp(const OneQID * queue_ids,unsigned int queue_count)594 OneQueue_WakeUp( const OneQID *queue_ids,
595 unsigned int queue_count )
596 {
597 DirectResult ret = DR_OK;
598 OneQueueWakeUp queue_wakeup;
599
600 #if D_DEBUG_ENABLED
601 unsigned int i;
602
603 D_DEBUG_AT( One_Queue, "%s( ids %p, count %u )\n", __FUNCTION__, queue_ids, queue_count );
604
605 for (i=0; i<queue_count; i++)
606 D_DEBUG_AT( One_Queue, " -> QID 0x%08x\n", queue_ids[i] );
607 #endif
608
609 D_ASSERT( queue_ids != NULL );
610
611 queue_wakeup.ids = queue_ids;
612 queue_wakeup.ids_count = queue_count;
613
614 while (ioctl( one_fd, ONE_QUEUE_WAKEUP, &queue_wakeup )) {
615 switch (errno) {
616 case EINTR:
617 continue;
618 default:
619 break;
620 }
621
622 ret = errno2result( errno );
623
624 D_PERROR( "One/Queue: ONE_QUEUE_WAKEUP failed!\n" );
625
626 break;
627 }
628
629 return ret;
630 }
631
632 DirectResult
OneQueue_SetName(OneQID queue_id,const char * name)633 OneQueue_SetName( OneQID queue_id,
634 const char *name )
635 {
636 DirectResult ret = DR_OK;
637 OneEntryInfo info;
638
639 D_DEBUG_AT( One_Queue, "%s( id %u, name '%s' )\n", __FUNCTION__, queue_id, name );
640
641 D_ASSERT( name != NULL );
642
643 info.type = ONE_QUEUE;
644 info.id = queue_id;
645
646 direct_snputs( info.name, name, ONE_ENTRY_INFO_NAME_LENGTH );
647
648 while (ioctl( one_fd, ONE_ENTRY_SET_INFO, &info )) {
649 switch (errno) {
650 case EINTR:
651 continue;
652 default:
653 break;
654 }
655
656 ret = errno2result( errno );
657
658 D_PERROR( "One/Queue: ONE_ENTRY_SET_INFO failed!\n" );
659
660 break;
661 }
662
663 return ret;
664 }
665
666 /*********************************************************************************************************************/
667
668 #define RECEIVE_BUFFER_SIZE 65536
669
670 /**********************************************************************************************************************/
671
672 typedef struct {
673 int magic;
674
675 OneQID queue_id;
676
677 OneThreadDispatch dispatch;
678 void *context;
679 } AddedQueue;
680
681 struct __One_OneThread {
682 int magic;
683
684 DirectMutex lock;
685
686 bool stop;
687
688 DirectThread *thread;
689
690 DirectHash *queues;
691
692 OneQID *queue_ids;
693 unsigned int queue_count;
694
695 unsigned int queues_age;
696 };
697
698 static void *
OneThread_Dispatcher(DirectThread * thread,void * arg)699 OneThread_Dispatcher( DirectThread *thread,
700 void *arg )
701 {
702 DirectResult ret;
703 OneThread *data = arg;
704 char *buf;
705 OneQID *ids = NULL;
706 unsigned int ids_capacity = 1000;
707 unsigned int ids_count = 0;
708 unsigned int ids_age = 0;
709
710 D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
711
712 buf = D_MALLOC( RECEIVE_BUFFER_SIZE );
713 if (!buf) {
714 D_OOM();
715 return NULL;
716 }
717
718 ids = D_MALLOC( sizeof(OneQID) * ids_capacity );
719 if (!ids) {
720 D_OOM();
721 return NULL;
722 }
723
724 while (!data->stop) {
725 size_t length;
726 size_t offset;
727
728 direct_thread_lock( thread );
729
730 if (data->queues_age != ids_age) {
731 if (!data->queue_count) {
732 // FIXME: use wait queue
733 usleep( 10000 );
734
735 direct_thread_unlock( thread );
736 continue;
737 }
738
739 ids_age = data->queues_age;
740 ids_count = data->queue_count;
741
742 if (ids_count > ids_capacity) {
743 while (ids_count > ids_capacity)
744 ids_capacity *= 2;
745
746 D_FREE( ids );
747
748 D_ASSERT( data->queue_count > 0 );
749
750 ids = D_MALLOC( sizeof(OneQID) * ids_capacity );
751 if (!ids) {
752 D_OOM();
753 direct_thread_unlock( thread );
754 return NULL;
755 }
756 }
757
758 direct_memcpy( ids, data->queue_ids, sizeof(OneQID) * data->queue_count );
759 }
760
761 direct_thread_unlock( thread );
762
763 ret = OneQueue_Receive( ids, ids_count, buf, RECEIVE_BUFFER_SIZE, &length, false, 0 );
764 if (ret) {
765 D_DERROR( ret, "IComaComponent/One: Could not receive from Component Queue!\n" );
766 break;
767 }
768
769 D_DEBUG_AT( One_Thread, "%s() -> received %zu bytes\n", __FUNCTION__, length );
770
771 for (offset=0; offset < length; ) {
772 AddedQueue *queue;
773 OnePacketHeader *header = (OnePacketHeader *)(buf + offset);
774 size_t size = header->uncompressed;
775
776 D_DEBUG_AT( One_Thread, " -> size %zu\n", size );
777
778 offset += sizeof(OnePacketHeader) + size;
779
780 if (offset > length) {
781 D_WARN( "invalid packet (offset %zu, length %zu)", offset, length );
782 continue;
783 }
784
785 direct_thread_lock( thread );
786
787 queue = direct_hash_lookup( data->queues, header->queue_id );
788
789 direct_thread_unlock( thread );
790
791 if (queue) {
792 D_MAGIC_ASSERT( queue, AddedQueue );
793
794 D_DEBUG_AT( One_Thread, " -> added queue %p, dispatch %p\n", queue, queue->dispatch );
795
796 queue->dispatch( queue->context, header, header + 1, data );
797 }
798 }
799 }
800
801 D_FREE( buf );
802
803 return NULL;
804 }
805
806 DirectResult
OneThread_Create(const char * name,OneThread ** ret_thread)807 OneThread_Create( const char *name,
808 OneThread **ret_thread )
809 {
810 DirectResult ret;
811 OneThread *thread;
812
813 D_DEBUG_AT( One_Thread, "%s( '%s' )\n", __FUNCTION__, name );
814
815 D_ASSERT( name != NULL );
816 D_ASSERT( ret_thread != NULL );
817
818 thread = D_CALLOC( 1, sizeof(OneThread) );
819 if (!thread)
820 return D_OOM();
821
822
823 // FIXME: Get rid of id array management when adding more queue connection support (attach)
824
825 thread->queue_ids = D_MALLOC( sizeof(OneQID) );
826 if (!thread->queue_ids) {
827 ret = D_OOM();
828 goto error;
829 }
830
831 ret = OneQueue_New( ONE_QUEUE_NO_FLAGS, ONE_QID_NONE, &thread->queue_ids[0] );
832 if (ret)
833 goto error;
834
835 thread->queue_count = 1;
836 thread->queues_age = 1;
837
838
839 ret = direct_hash_create( 23, &thread->queues );
840 if (ret)
841 goto error_hash_create;
842
843 direct_mutex_init( &thread->lock );
844
845 *ret_thread = thread;
846
847 thread->thread = direct_thread_create( DTT_DEFAULT, OneThread_Dispatcher, thread, name );
848
849 return DR_OK;
850
851
852 error_hash_create:
853 OneQueue_Destroy( thread->queue_ids[0] );
854
855 error:
856 if (thread->queue_ids)
857 D_FREE( thread->queue_ids );
858
859 D_FREE( thread );
860
861 return ret;
862 }
863
864 void
OneThread_Destroy(OneThread * thread)865 OneThread_Destroy( OneThread *thread )
866 {
867 D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
868
869 }
870
871 DirectResult
OneThread_AddQueue(OneThread * thread,OneQID queue_id,OneThreadDispatch dispatch,void * context)872 OneThread_AddQueue( OneThread *thread,
873 OneQID queue_id,
874 OneThreadDispatch dispatch,
875 void *context )
876 {
877 AddedQueue *queue;
878
879 D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
880
881 direct_thread_lock( thread->thread );
882
883 queue = direct_hash_lookup( thread->queues, queue_id );
884 if (queue) {
885 direct_thread_unlock( thread->thread );
886 return DR_BUSY;
887 }
888
889 queue = D_CALLOC( 1, sizeof(AddedQueue) );
890 if (!queue) {
891 direct_thread_unlock( thread->thread );
892 return D_OOM();
893 }
894
895 queue->queue_id = queue_id;
896 queue->dispatch = dispatch;
897 queue->context = context;
898
899 D_MAGIC_SET( queue, AddedQueue );
900
901 direct_hash_insert( thread->queues, queue_id, queue );
902
903 thread->queue_ids = D_REALLOC( thread->queue_ids, sizeof(OneQID) * (thread->queue_count + 1) );
904
905 thread->queue_ids[thread->queue_count] = queue_id;
906
907 thread->queue_count++;
908
909 thread->queues_age++;
910
911 direct_thread_unlock( thread->thread );
912
913 OneQueue_WakeUp( &thread->queue_ids[0], 1 );
914
915 return DR_OK;
916 }
917
918 DirectResult
OneThread_RemoveQueue(OneThread * thread,OneQID queue_id)919 OneThread_RemoveQueue( OneThread *thread,
920 OneQID queue_id )
921 {
922 AddedQueue *queue;
923 unsigned int i;
924
925 D_DEBUG_AT( One_Thread, "%s()\n", __FUNCTION__ );
926
927 direct_thread_lock( thread->thread );
928
929 queue = direct_hash_lookup( thread->queues, queue_id );
930 if (!queue) {
931 direct_mutex_unlock( &thread->lock );
932 return DR_ITEMNOTFOUND;
933 }
934
935 direct_hash_remove( thread->queues, queue_id );
936
937 for (i=0; i<thread->queue_count; i++) {
938 if (thread->queue_ids[i] == queue_id)
939 break;
940 }
941
942 if (i >= thread->queue_count)
943 D_BUG( "queue_ids are missing element" );
944
945 for (i++; i<thread->queue_count; i++)
946 thread->queue_ids[i-1] = thread->queue_ids[i];
947
948 thread->queue_ids = D_REALLOC( thread->queue_ids, sizeof(OneQID) * thread->queue_count );
949
950 thread->queue_count--;
951
952 thread->queues_age++;
953
954 direct_mutex_unlock( &thread->lock );
955
956 OneQueue_WakeUp( &thread->queue_ids[0], 1 );
957
958 D_MAGIC_CLEAR( queue );
959
960 D_FREE( queue );
961
962 return DR_OK;
963 }
964
965