1 #include <stdlib.h>
2 #include "asm.h"
3 #include "lwp_messages.h"
4 #include "lwp_wkspace.h"
5 
/*
 * Link an already filled message buffer into the pending list of a queue.
 *
 * mqueue - queue that receives the message
 * msg    - buffer to link; its prio field is overwritten with 'type'
 * type   - LWP_MQ_SEND_REQUEST hands the buffer to __lwpmq_msg_append,
 *          LWP_MQ_SEND_URGENT hands it to __lwpmq_msg_prepend; any other
 *          value is treated as a sort key for a scan of the pending list
 *
 * The pending counter is incremented before the buffer is linked. When this
 * message is the only one pending and a notify handler is installed, the
 * handler is invoked with the stored notify argument.
 */
void __lwpmq_msg_insert(mq_cntrl *mqueue,mq_buffercntrl *msg,u32 type)
{
	++mqueue->num_pendingmsgs;
	msg->prio = type;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_msg_insert(%p,%p,%d)\n",mqueue,msg,type);
#endif

	switch(type) {
		case LWP_MQ_SEND_REQUEST:
			__lwpmq_msg_append(mqueue,msg);
			break;
		case LWP_MQ_SEND_URGENT:
			__lwpmq_msg_prepend(mqueue,msg);
			break;
		default:
		{
			lwp_queue *pending = &mqueue->pending_msgs;
			lwp_node *cursor;

			/*
			 * Walk past every entry whose prio is less than or equal to the
			 * new one, so entries with equal prio keep their arrival order.
			 * The scan stops at the first larger prio or at the list tail.
			 */
			for(cursor = pending->first;
			    !__lwp_queue_istail(pending,cursor);
			    cursor = cursor->next) {
				if(((mq_buffercntrl*)cursor)->prio>msg->prio)
					break;
			}

			/* link the new buffer relative to the stop position's predecessor */
			__lwp_queue_insert(cursor->prev,&msg->node);
		}
		break;
	}

	/* only the empty -> non-empty transition triggers the notification */
	if(mqueue->num_pendingmsgs==1 && mqueue->notify_handler)
		mqueue->notify_handler(mqueue->notify_arg);
}
46 
/*
 * Set up a message queue control block and allocate its message buffers.
 *
 * mqueue          - control block to initialise
 * attrs           - queue attributes; only consulted through
 *                   __lwpmq_is_priority() to pick the wait queue discipline
 * max_pendingmsgs - number of message buffers to allocate
 * max_msgsize     - largest payload, in bytes, a single message may carry
 *
 * Returns 1 on success. Returns 0 when the buffer area cannot be allocated
 * or when the requested dimensions do not fit into 32-bit size arithmetic;
 * in both failure cases mqueue->msq_buffers is left NULL.
 */
u32 __lwpmq_initialize(mq_cntrl *mqueue,mq_attr *attrs,u32 max_pendingmsgs,u32 max_msgsize)
{
	u32 alloc_msgsize;
	u32 slot_size;
	u32 buffering_req;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_initialize(%p,%p,%d,%d)\n",mqueue,attrs,max_pendingmsgs,max_msgsize);
#endif
	mqueue->max_pendingmsgs = max_pendingmsgs;
	mqueue->num_pendingmsgs = 0;
	mqueue->max_msgsize = max_msgsize;
	__lwpmq_set_notify(mqueue,NULL,NULL);

	/* round the payload size up to the next multiple of sizeof(u32) */
	alloc_msgsize = max_msgsize;
	if(alloc_msgsize&(sizeof(u32)-1))
		alloc_msgsize = (alloc_msgsize+sizeof(u32))&~(sizeof(u32)-1);

	/* each slot holds the buffer control header followed by the payload */
	slot_size = (u32)(alloc_msgsize+sizeof(mq_buffercntrl));

	/*
	 * Refuse dimensions whose size computation wraps around: a wrapped
	 * total would produce an allocation smaller than the area that is
	 * subsequently split into max_pendingmsgs slots.
	 */
	if(alloc_msgsize<max_msgsize
	   || slot_size<alloc_msgsize
	   || (max_pendingmsgs!=0 && slot_size>((u32)~(u32)0)/max_pendingmsgs)) {
		mqueue->msq_buffers = NULL;
		return 0;
	}

	buffering_req = max_pendingmsgs*slot_size;
	mqueue->msq_buffers = (mq_buffer*)__lwp_wkspace_allocate(buffering_req);

	if(!mqueue->msq_buffers) return 0;

	/* all slots start on the inactive list; nothing is pending yet */
	__lwp_queue_initialize(&mqueue->inactive_msgs,mqueue->msq_buffers,max_pendingmsgs,slot_size);
	__lwp_queue_init_empty(&mqueue->pending_msgs);
	__lwp_threadqueue_init(&mqueue->wait_queue,
	                       __lwpmq_is_priority(attrs)?LWP_THREADQ_MODEPRIORITY:LWP_THREADQ_MODEFIFO,
	                       LWP_STATES_WAITING_FOR_MESSAGE,
	                       LWP_MQ_STATUS_TIMEOUT);

	return 1;
}
75 
/*
 * Receive one message from a queue, optionally blocking until one arrives.
 *
 * mqueue  - queue to receive from
 * id      - stored in the executing thread's wait record when it blocks
 * buffer  - destination for the message payload
 * size    - receives the payload length in bytes
 * wait    - non-zero: block on the queue's wait queue when nothing is pending
 * timeout - passed to __lwp_threadqueue_enqueue() when blocking
 *
 * Returns LWP_MQ_STATUS_UNSATISFIED_NOWAIT when nothing is pending and
 * 'wait' is zero; otherwise LWP_MQ_STATUS_SUCCESSFUL, both after copying a
 * pending message and after the caller has been enqueued to wait. The same
 * status is also stored in the executing thread's wait.ret_code, except that
 * the blocking path leaves the initial SUCCESSFUL value in place.
 */
u32 __lwpmq_seize(mq_cntrl *mqueue,u32 id,void *buffer,u32 *size,u32 wait,u64 timeout)
{
	u32 level;
	u32 length;
	mq_buffercntrl *msg;
	lwp_cntrl *self = _thr_executing;
	lwp_cntrl *waiter;

	self->wait.ret_code = LWP_MQ_STATUS_SUCCESSFUL;
#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_seize(%p,%d,%p,%p,%d,%d)\n",mqueue,id,buffer,size,wait,mqueue->num_pendingmsgs);
#endif

	_CPU_ISR_Disable(level);
	if(mqueue->num_pendingmsgs==0) {
		/* nothing to hand out: either fail immediately or go to sleep */
		if(!wait) {
			_CPU_ISR_Restore(level);
			self->wait.ret_code = LWP_MQ_STATUS_UNSATISFIED_NOWAIT;
			return LWP_MQ_STATUS_UNSATISFIED_NOWAIT;
		}

		/* record where the message must be delivered before blocking */
		__lwp_threadqueue_csenter(&mqueue->wait_queue);
		self->wait.queue = &mqueue->wait_queue;
		self->wait.id = id;
		self->wait.ret_arg = (void*)buffer;
		self->wait.ret_arg_1 = (void*)size;
		_CPU_ISR_Restore(level);

		__lwp_threadqueue_enqueue(&mqueue->wait_queue,timeout);
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/* claim the message while interrupts are still disabled */
	mqueue->num_pendingmsgs--;
	msg = __lwpmq_get_pendingmsg(mqueue);
	_CPU_ISR_Restore(level);

	length = msg->contents.size;
	*size = length;
	self->wait.cnt = msg->prio;
	__lwpmq_buffer_copy(buffer,msg->contents.buffer,length);

	waiter = __lwp_threadqueue_dequeue(&mqueue->wait_queue);
	if(!waiter) {
		/* nobody is waiting on the queue: hand the buffer back */
		__lwpmq_free_msg(mqueue,msg);
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/*
	 * A thread was waiting on the queue. Its wait record is read the way
	 * the blocking path of __lwpmq_submit() fills it (ret_arg = payload,
	 * ret_arg_1 = size stored by value, cnt = type), so the buffer just
	 * drained is refilled with that message and queued again.
	 */
	msg->prio = waiter->wait.cnt;
	msg->contents.size = (u32)waiter->wait.ret_arg_1;
	__lwpmq_buffer_copy(msg->contents.buffer,waiter->wait.ret_arg,msg->contents.size);

	__lwpmq_msg_insert(mqueue,msg,msg->prio);
	return LWP_MQ_STATUS_SUCCESSFUL;
}
128 
/*
 * Send a message to a queue.
 *
 * mqueue  - destination queue
 * id      - stored in the executing thread's wait record when it blocks
 * buffer  - payload to send
 * size    - payload length in bytes; must not exceed mqueue->max_msgsize
 * type    - message type/priority, forwarded to __lwpmq_msg_insert()
 * wait    - non-zero: block when the queue is full
 * timeout - passed to __lwp_threadqueue_enqueue() when blocking
 *
 * Returns
 *   LWP_MQ_STATUS_INVALID_SIZE     - size exceeds max_msgsize
 *   LWP_MQ_STATUS_SUCCESSFUL       - delivered to a waiting thread or queued
 *   LWP_MQ_STATUS_UNSATISFIED      - no buffer could be allocated, or the
 *                                    queue is full and an ISR is in progress
 *   LWP_MQ_STATUS_TOO_MANY         - queue full and 'wait' is zero
 *   LWP_MQ_STATUS_UNSATISFIED_WAIT - the caller was enqueued to wait
 */
u32 __lwpmq_submit(mq_cntrl *mqueue,u32 id,void *buffer,u32 size,u32 type,u32 wait,u64 timeout)
{
	u32 level;
	lwp_cntrl *receiver;
	lwp_cntrl *self;
	mq_buffercntrl *slot;

#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_submit(%p,%p,%d,%d,%d,%d)\n",mqueue,buffer,size,id,type,wait);
#endif
	if(size>mqueue->max_msgsize)
		return LWP_MQ_STATUS_INVALID_SIZE;

	/*
	 * With nothing pending, a thread may be waiting on the queue; if one
	 * is dequeued, copy straight into the buffer it registered.
	 */
	if(mqueue->num_pendingmsgs==0
	   && (receiver = __lwp_threadqueue_dequeue(&mqueue->wait_queue))!=NULL) {
		__lwpmq_buffer_copy(receiver->wait.ret_arg,buffer,size);
		*(u32*)receiver->wait.ret_arg_1 = size;
		receiver->wait.cnt = type;
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	if(mqueue->num_pendingmsgs>=mqueue->max_pendingmsgs) {
		/* queue is full */
		if(!wait) return LWP_MQ_STATUS_TOO_MANY;
		if(__lwp_isr_in_progress()) return LWP_MQ_STATUS_UNSATISFIED;

		self = _thr_executing;

		/* note: the size travels by value inside the pointer-typed field */
		_CPU_ISR_Disable(level);
		__lwp_threadqueue_csenter(&mqueue->wait_queue);
		self->wait.queue = &mqueue->wait_queue;
		self->wait.id = id;
		self->wait.ret_arg = (void*)buffer;
		self->wait.ret_arg_1 = (void*)size;
		self->wait.cnt = type;
		_CPU_ISR_Restore(level);

		__lwp_threadqueue_enqueue(&mqueue->wait_queue,timeout);
		return LWP_MQ_STATUS_UNSATISFIED_WAIT;
	}

	/* room left: copy the payload into a free buffer and queue it */
	slot = __lwpmq_allocate_msg(mqueue);
	if(!slot) return LWP_MQ_STATUS_UNSATISFIED;

	__lwpmq_buffer_copy(slot->contents.buffer,buffer,size);
	slot->contents.size = size;
	slot->prio = type;
	__lwpmq_msg_insert(mqueue,slot,type);
	return LWP_MQ_STATUS_SUCCESSFUL;
}
181 
/*
 * Deliver one message to every thread currently waiting on the queue.
 *
 * mqueue - queue whose waiting threads are served
 * buffer - payload to copy to each waiting thread
 * size   - payload length in bytes; at most mqueue->max_msgsize bytes are
 *          copied, and that clamped length is what each receiver is told
 * id     - not used by the delivery logic (only printed in debug builds)
 * count  - receives the number of threads that were served
 *
 * When messages are already pending nothing is delivered and *count is set
 * to 0. Always returns LWP_MQ_STATUS_SUCCESSFUL.
 */
u32 __lwpmq_broadcast(mq_cntrl *mqueue,void *buffer,u32 size,u32 id,u32 *count)
{
	lwp_cntrl *thread;
	u32 num_broadcast;
	lwp_waitinfo *waitp;
	u32 rsize;
#ifdef _LWPMQ_DEBUG
	printf("__lwpmq_broadcast(%p,%p,%d,%d,%p)\n",mqueue,buffer,size,id,count);
#endif
	if(mqueue->num_pendingmsgs!=0) {
		*count = 0;
		return LWP_MQ_STATUS_SUCCESSFUL;
	}

	/* the clamp does not depend on the receiver, so compute it once */
	rsize = size;
	if(size>mqueue->max_msgsize)
		rsize = mqueue->max_msgsize;

	num_broadcast = 0;
	while((thread=__lwp_threadqueue_dequeue(&mqueue->wait_queue))) {
		waitp = &thread->wait;
		++num_broadcast;

		__lwpmq_buffer_copy(waitp->ret_arg,buffer,rsize);
		/*
		 * Report the number of bytes actually copied; reporting the
		 * unclamped size would describe data the receiver never got.
		 */
		*(u32*)waitp->ret_arg_1 = rsize;
	}
	*count = num_broadcast;
	return LWP_MQ_STATUS_SUCCESSFUL;
}
211 
/*
 * Tear down a message queue.
 *
 * mqueue - queue to close
 * status - handed to __lwp_threadqueue_flush() for the threads still
 *          waiting on the queue
 *
 * The steps run in this order: flush the wait queue, move the pending
 * messages back via __lwpmq_flush_support(), release the buffer area.
 */
void __lwpmq_close(mq_cntrl *mqueue,u32 status)
{
	/* 1. flush every thread still waiting on the queue */
	__lwp_threadqueue_flush(&mqueue->wait_queue,status);

	/* 2. reclaim whatever is still pending */
	__lwpmq_flush_support(mqueue);

	/* 3. give the buffer area back to the workspace */
	__lwp_wkspace_free(mqueue->msq_buffers);
}
218 
__lwpmq_flush(mq_cntrl * mqueue)219 u32 __lwpmq_flush(mq_cntrl *mqueue)
220 {
221 	if(mqueue->num_pendingmsgs!=0)
222 		return __lwpmq_flush_support(mqueue);
223 	else
224 		return 0;
225 }
226 
__lwpmq_flush_support(mq_cntrl * mqueue)227 u32 __lwpmq_flush_support(mq_cntrl *mqueue)
228 {
229 	u32 level;
230 	lwp_node *inactive;
231 	lwp_node *mqueue_first;
232 	lwp_node *mqueue_last;
233 	u32 cnt;
234 
235 	_CPU_ISR_Disable(level);
236 
237 	inactive = mqueue->inactive_msgs.first;
238 	mqueue_first = mqueue->pending_msgs.first;
239 	mqueue_last = mqueue->pending_msgs.last;
240 
241 	mqueue->inactive_msgs.first = mqueue_first;
242 	mqueue_last->next = inactive;
243 	inactive->prev = mqueue_last;
244 	mqueue_first->prev = __lwp_queue_head(&mqueue->inactive_msgs);
245 
246 	__lwp_queue_init_empty(&mqueue->pending_msgs);
247 
248 	cnt = mqueue->num_pendingmsgs;
249 	mqueue->num_pendingmsgs = 0;
250 
251 	_CPU_ISR_Restore(level);
252 	return cnt;
253 }
254 
/*
 * Flush every thread waiting on the queue.
 *
 * mqueue - queue whose wait queue is flushed
 *
 * The wait queue is flushed with LWP_MQ_STATUS_UNSATISFIED_NOWAIT as the
 * status handed to __lwp_threadqueue_flush().
 */
void __lwpmq_flush_waitthreads(mq_cntrl *mqueue)
{
	__lwp_threadqueue_flush(
		&mqueue->wait_queue,
		LWP_MQ_STATUS_UNSATISFIED_NOWAIT
	);
}
259