1 #include <stdlib.h>
2 #include "asm.h"
3 #include "lwp_messages.h"
4 #include "lwp_wkspace.h"
5
/*
 * Link the message buffer `msg` into the pending list of `mqueue`.
 *
 * The pending counter is incremented and `type` is stored as the message
 * priority.  `type` also selects the insertion routine:
 *   - LWP_MQ_SEND_REQUEST -> __lwpmq_msg_append()
 *   - LWP_MQ_SEND_URGENT  -> __lwpmq_msg_prepend()
 *   - anything else       -> the message is linked in behind every pending
 *                            message whose prio is <= its own.
 *
 * If this insertion brought the pending count to exactly 1 and a notify
 * handler is installed, the handler is invoked with notify_arg.
 */
void __lwpmq_msg_insert(mq_cntrl *mqueue,mq_buffercntrl *msg,u32 type)
{
	lwp_queue *pending;
	lwp_node *pos;

	++mqueue->num_pendingmsgs;
	msg->prio = type;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_msg_insert(%p,%p,%d)\n",mqueue,msg,type);
#endif

	switch(type) {
		case LWP_MQ_SEND_REQUEST:
			__lwpmq_msg_append(mqueue,msg);
			break;
		case LWP_MQ_SEND_URGENT:
			__lwpmq_msg_prepend(mqueue,msg);
			break;
		default:
			pending = &mqueue->pending_msgs;
			/* stop at the first message with a strictly larger prio,
			   or at the list tail if there is none */
			for(pos=pending->first;!__lwp_queue_istail(pending,pos);pos=pos->next) {
				if(((mq_buffercntrl*)pos)->prio>msg->prio) break;
			}
			__lwp_queue_insert(pos->prev,&msg->node);
			break;
	}

	/* only the transition to "one message pending" is signalled */
	if(mqueue->num_pendingmsgs!=1) return;
	if(mqueue->notify_handler) mqueue->notify_handler(mqueue->notify_arg);
}
46
/*
 * Set up the message queue `mqueue`.
 *
 * Records the limits, clears the notify handler, allocates one buffer area
 * from the workspace holding `max_pendingmsgs` slots, hands that area to the
 * inactive list, empties the pending list and initialises the wait queue
 * (priority or FIFO mode depending on __lwpmq_is_priority(attrs)).
 *
 * Each slot is sizeof(mq_buffercntrl) plus `max_msgsize` rounded up to a
 * multiple of sizeof(u32).
 *
 * Returns 1 on success.  Returns 0, with msq_buffers left NULL, when the
 * slot/area size does not fit in a u32 or when the workspace allocation
 * fails.
 */
u32 __lwpmq_initialize(mq_cntrl *mqueue,mq_attr *attrs,u32 max_pendingmsgs,u32 max_msgsize)
{
	u32 alloc_msgsize;
	u32 slot_size;
	u32 buffering_req;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_initialize(%p,%p,%d,%d)\n",mqueue,attrs,max_pendingmsgs,max_msgsize);
#endif
	mqueue->max_pendingmsgs = max_pendingmsgs;
	mqueue->num_pendingmsgs = 0;
	mqueue->max_msgsize = max_msgsize;
	mqueue->msq_buffers = NULL;
	__lwpmq_set_notify(mqueue,NULL,NULL);

	/* round the payload size up to the next u32 boundary */
	alloc_msgsize = max_msgsize;
	if(alloc_msgsize&(sizeof(u32)-1))
		alloc_msgsize = (alloc_msgsize+sizeof(u32))&~(sizeof(u32)-1);

	/* a rounded size smaller than the request means the addition wrapped */
	if(alloc_msgsize<max_msgsize) return 0;

	slot_size = alloc_msgsize+(u32)sizeof(mq_buffercntrl);
	if(slot_size<alloc_msgsize) return 0;

	/* refuse a total that would wrap: an undersized area would be
	   carved into slots running past its end */
	if(max_pendingmsgs!=0 && slot_size>((u32)-1)/max_pendingmsgs) return 0;
	buffering_req = max_pendingmsgs*slot_size;

	mqueue->msq_buffers = (mq_buffer*)__lwp_wkspace_allocate(buffering_req);
	if(!mqueue->msq_buffers) return 0;

	__lwp_queue_initialize(&mqueue->inactive_msgs,mqueue->msq_buffers,max_pendingmsgs,slot_size);
	__lwp_queue_init_empty(&mqueue->pending_msgs);
	__lwp_threadqueue_init(&mqueue->wait_queue,__lwpmq_is_priority(attrs)?LWP_THREADQ_MODEPRIORITY:LWP_THREADQ_MODEFIFO,LWP_STATES_WAITING_FOR_MESSAGE,LWP_MQ_STATUS_TIMEOUT);

	return 1;
}
75
/*
 * Receive one message from `mqueue` into `buffer`, storing its length in
 * `*size`.
 *
 * Queue empty:
 *   - wait == 0: ret_code of the executing thread and the return value are
 *     both LWP_MQ_STATUS_UNSATISFIED_NOWAIT.
 *   - wait != 0: `buffer`, `size` and `id` are recorded in the executing
 *     thread's wait info and the thread is enqueued on the wait queue with
 *     `timeout`.  The function then returns LWP_MQ_STATUS_SUCCESSFUL.
 *     NOTE(review): the final outcome of the wait is presumably reported
 *     through wait.ret_code by the thread-queue code - confirm with callers.
 *
 * Queue not empty: the first pending message is removed (with interrupts
 * disabled), copied out, and its prio is stored in the executing thread's
 * wait.cnt.  If a thread can then be dequeued from the wait queue, its
 * wait.cnt / wait.ret_arg / wait.ret_arg_1 are taken as priority, source
 * buffer and size of a message that is copied into the slot just vacated
 * and inserted as pending; otherwise the slot is freed.
 */
u32 __lwpmq_seize(mq_cntrl *mqueue,u32 id,void *buffer,u32 *size,u32 wait,u64 timeout)
{
	u32 level;
	mq_buffercntrl *slot;
	lwp_cntrl *self,*sender;

	self = _thr_executing;
	self->wait.ret_code = LWP_MQ_STATUS_SUCCESSFUL;
#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_seize(%p,%d,%p,%p,%d,%d)\n",mqueue,id,buffer,size,wait,mqueue->num_pendingmsgs);
#endif

	_CPU_ISR_Disable(level);
	if(mqueue->num_pendingmsgs==0) {
		if(!wait) {
			_CPU_ISR_Restore(level);
			self->wait.ret_code = LWP_MQ_STATUS_UNSATISFIED_NOWAIT;
			return LWP_MQ_STATUS_UNSATISFIED_NOWAIT;
		}

		/* park the caller; the wait info is filled in before
		   interrupts are re-enabled */
		__lwp_threadqueue_csenter(&mqueue->wait_queue);
		self->wait.queue = &mqueue->wait_queue;
		self->wait.id = id;
		self->wait.ret_arg = (void*)buffer;
		self->wait.ret_arg_1 = (void*)size;
		_CPU_ISR_Restore(level);

		__lwp_threadqueue_enqueue(&mqueue->wait_queue,timeout);
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/* counter and list are updated together under the ISR lock */
	--mqueue->num_pendingmsgs;
	slot = __lwpmq_get_pendingmsg(mqueue);
	_CPU_ISR_Restore(level);

	*size = slot->contents.size;
	self->wait.cnt = slot->prio;
	__lwpmq_buffer_copy(buffer,slot->contents.buffer,*size);

	sender = __lwp_threadqueue_dequeue(&mqueue->wait_queue);
	if(sender) {
		/* reuse the vacated slot for the waiting thread's message */
		slot->prio = sender->wait.cnt;
		slot->contents.size = (u32)sender->wait.ret_arg_1;
		__lwpmq_buffer_copy(slot->contents.buffer,sender->wait.ret_arg,slot->contents.size);

		__lwpmq_msg_insert(mqueue,slot,slot->prio);
	} else {
		__lwpmq_free_msg(mqueue,slot);
	}
	return LWP_MQ_STATUS_SUCCESSFUL;
}
128
/*
 * Send `size` bytes from `buffer` to `mqueue` with submit type `type`.
 *
 * Outcomes, in the order they are tried:
 *   - size > max_msgsize: LWP_MQ_STATUS_INVALID_SIZE.
 *   - no message pending and a thread can be dequeued from the wait queue:
 *     the data is copied straight into that thread's wait.ret_arg, the size
 *     is stored through wait.ret_arg_1, wait.cnt is set to `type`;
 *     LWP_MQ_STATUS_SUCCESSFUL.
 *   - fewer than max_pendingmsgs pending: a buffer is allocated, filled and
 *     inserted via __lwpmq_msg_insert(); LWP_MQ_STATUS_SUCCESSFUL, or
 *     LWP_MQ_STATUS_UNSATISFIED if no buffer could be allocated.
 *   - queue full, wait == 0: LWP_MQ_STATUS_TOO_MANY.
 *   - queue full, called while __lwp_isr_in_progress():
 *     LWP_MQ_STATUS_UNSATISFIED.
 *   - otherwise the executing thread records buffer/size/type/id in its
 *     wait info, is enqueued on the wait queue with `timeout`, and
 *     LWP_MQ_STATUS_UNSATISFIED_WAIT is returned.
 */
u32 __lwpmq_submit(mq_cntrl *mqueue,u32 id,void *buffer,u32 size,u32 type,u32 wait,u64 timeout)
{
	u32 level;
	lwp_cntrl *receiver;
	lwp_cntrl *self;
	mq_buffercntrl *slot;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_submit(%p,%p,%d,%d,%d,%d)\n",mqueue,buffer,size,id,type,wait);
#endif
	if(size>mqueue->max_msgsize)
		return LWP_MQ_STATUS_INVALID_SIZE;

	/* direct hand-off: the wait queue is only consulted while nothing is pending */
	if(mqueue->num_pendingmsgs==0
	   && (receiver=__lwp_threadqueue_dequeue(&mqueue->wait_queue))!=NULL) {
		__lwpmq_buffer_copy(receiver->wait.ret_arg,buffer,size);
		*(u32*)receiver->wait.ret_arg_1 = size;
		receiver->wait.cnt = type;
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	if(mqueue->num_pendingmsgs<mqueue->max_pendingmsgs) {
		slot = __lwpmq_allocate_msg(mqueue);
		if(!slot) return LWP_MQ_STATUS_UNSATISFIED;

		__lwpmq_buffer_copy(slot->contents.buffer,buffer,size);
		slot->contents.size = size;
		slot->prio = type;
		__lwpmq_msg_insert(mqueue,slot,type);
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/* queue is full from here on */
	if(!wait) return LWP_MQ_STATUS_TOO_MANY;
	if(__lwp_isr_in_progress()) return LWP_MQ_STATUS_UNSATISFIED;

	self = _thr_executing;

	_CPU_ISR_Disable(level);
	__lwp_threadqueue_csenter(&mqueue->wait_queue);
	self->wait.queue = &mqueue->wait_queue;
	self->wait.id = id;
	self->wait.ret_arg = (void*)buffer;
	/* the size travels by value in the pointer-typed field;
	   __lwpmq_seize() casts it back to u32 */
	self->wait.ret_arg_1 = (void*)size;
	self->wait.cnt = type;
	_CPU_ISR_Restore(level);

	__lwp_threadqueue_enqueue(&mqueue->wait_queue,timeout);

	return LWP_MQ_STATUS_UNSATISFIED_WAIT;
}
181
/*
 * Deliver the message in `buffer` to every thread currently on the wait
 * queue of `mqueue`.
 *
 * If any message is pending nothing is delivered: `*count` is set to 0.
 * Otherwise each dequeued thread gets the data copied into its
 * wait.ret_arg and the copied length stored through its wait.ret_arg_1,
 * and `*count` receives the number of threads served.
 *
 * At most max_msgsize bytes are copied.  The length reported to each
 * receiver is the number of bytes actually copied, so a receiver is never
 * told about more data than its buffer was given.
 *
 * `id` is not used by this routine.
 *
 * Always returns LWP_MQ_STATUS_SUCCESSFUL.
 */
u32 __lwpmq_broadcast(mq_cntrl *mqueue,void *buffer,u32 size,u32 id,u32 *count)
{
	lwp_cntrl *thread;
	lwp_waitinfo *waitp;
	u32 num_broadcast;
	u32 rsize;
#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_broadcast(%p,%p,%d,%d,%p)\n",mqueue,buffer,size,id,count);
#endif
	if(mqueue->num_pendingmsgs!=0) {
		*count = 0;
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/* the clamp does not depend on the receiver, so compute it once */
	rsize = size;
	if(rsize>mqueue->max_msgsize)
		rsize = mqueue->max_msgsize;

	num_broadcast = 0;
	while((thread=__lwp_threadqueue_dequeue(&mqueue->wait_queue))) {
		waitp = &thread->wait;
		++num_broadcast;

		__lwpmq_buffer_copy(waitp->ret_arg,buffer,rsize);
		*(u32*)waitp->ret_arg_1 = rsize;
	}
	*count = num_broadcast;
	return LWP_MQ_STATUS_SUCCESSFUL;
}
211
/*
 * Tear down the message queue `mqueue`.
 *
 * Every thread on the wait queue is flushed with `status`, any pending
 * messages are moved back to the inactive list, and the buffer area is
 * returned to the workspace.
 */
void __lwpmq_close(mq_cntrl *mqueue,u32 status)
{
	__lwp_threadqueue_flush(&mqueue->wait_queue,status);

	/* __lwpmq_flush_support() splices pending_msgs.first..last without
	   checking for an empty list, so only call it when something is
	   pending - the same guard __lwpmq_flush() applies */
	if(mqueue->num_pendingmsgs!=0)
		__lwpmq_flush_support(mqueue);

	__lwp_wkspace_free(mqueue->msq_buffers);
}
218
__lwpmq_flush(mq_cntrl * mqueue)219 u32 __lwpmq_flush(mq_cntrl *mqueue)
220 {
221 if(mqueue->num_pendingmsgs!=0)
222 return __lwpmq_flush_support(mqueue);
223 else
224 return 0;
225 }
226
__lwpmq_flush_support(mq_cntrl * mqueue)227 u32 __lwpmq_flush_support(mq_cntrl *mqueue)
228 {
229 u32 level;
230 lwp_node *inactive;
231 lwp_node *mqueue_first;
232 lwp_node *mqueue_last;
233 u32 cnt;
234
235 _CPU_ISR_Disable(level);
236
237 inactive = mqueue->inactive_msgs.first;
238 mqueue_first = mqueue->pending_msgs.first;
239 mqueue_last = mqueue->pending_msgs.last;
240
241 mqueue->inactive_msgs.first = mqueue_first;
242 mqueue_last->next = inactive;
243 inactive->prev = mqueue_last;
244 mqueue_first->prev = __lwp_queue_head(&mqueue->inactive_msgs);
245
246 __lwp_queue_init_empty(&mqueue->pending_msgs);
247
248 cnt = mqueue->num_pendingmsgs;
249 mqueue->num_pendingmsgs = 0;
250
251 _CPU_ISR_Restore(level);
252 return cnt;
253 }
254
/*
 * Flush every thread on the wait queue of `mqueue`, passing
 * LWP_MQ_STATUS_UNSATISFIED_NOWAIT as the flush status.
 */
void __lwpmq_flush_waitthreads(mq_cntrl *mqueue)
{
	__lwp_threadqueue_flush(
		&mqueue->wait_queue,
		LWP_MQ_STATUS_UNSATISFIED_NOWAIT
	);
}
259