1 /*
2 * E-UAE - The portable Amiga Emulator
3 *
4 * Thread and semaphore support using AmigaOS processes
5 *
6 * Copyright 2003-2005 Richard Drummond
7 */
8
9 #include "sysconfig.h"
10 #include "sysdeps.h"
11
12 #include <exec/exec.h>
13 #include <exec/ports.h>
14 #include <exec/lists.h>
15 #include <dos/dosextens.h>
16 #include <dos/dostags.h>
17
18 #include <proto/exec.h>
19 #include <proto/dos.h>
20 #include <proto/timer.h>
21 #include <clib/alib_protos.h>
22
23 #include "threaddep/thread.h"
24
25 /*
26 * Handle CreateNewProc() differences between AmigaOS-like systems
27 */
28 #ifdef __MORPHOS__
29 /* CreateNewProc() on MorphOS needs to be told that code is PPC */
30 # define myCreateNewProcTags(...) CreateNewProcTags(NP_CodeType, CODETYPE_PPC, __VA_ARGS__)
31 #else
32 # ifdef __amigaos4__
33 /* On OS4, we assert that the threads we create are our children */
34 # define myCreateNewProcTags(...) CreateNewProcTags(NP_Child, TRUE, __VA_ARGS__)
35 # else
36 # define myCreateNewProcTags CreateNewProcTags
37 # endif
38 #endif
39
40 /*
41 * Simple emulation of POSIX semaphores using message-passing
42 * with a proxy task.
43 *
44 */
45
46 static struct Process *proxy_thread;
47 static struct MsgPort *proxy_msgport;
48
49 static int sem_count;
50
51 /*
52 * Message packet which is sent to the proxy thread
53 * to effect blocking on a semaphore
54 */
55
56 /*
57 * Message types
58 */
59 typedef enum {
60 PROXY_MSG_BLOCK,
61 PROXY_MSG_UNBLOCK,
62 PROXY_MSG_DESTROY,
63 } ProxyMsgType;
64
65 struct ProxyMsg {
66 struct Message msg;
67 struct PSemaphore *sem;
68 struct Task *reply_task;
69 ProxyMsgType type;
70 };
71
72 /* Proxy replies to message sender with this signal */
73 #define PROXY_MSG_REPLY_SIG SIGBREAKF_CTRL_F
74
init_proxy_msg(struct ProxyMsg * msg,ProxyMsgType type,struct PSemaphore * sem)75 STATIC_INLINE void init_proxy_msg (struct ProxyMsg *msg, ProxyMsgType type, struct PSemaphore *sem)
76 {
77 memset (msg, 0, sizeof (struct ProxyMsg));
78
79 msg->msg.mn_Length = sizeof (struct ProxyMsg);
80 msg->sem = sem;
81 msg->type = type;
82 msg->reply_task = FindTask (NULL);
83 }
84
put_proxy_msg(struct ProxyMsg * msg)85 STATIC_INLINE void put_proxy_msg (struct ProxyMsg *msg)
86 {
87 PutMsg (proxy_msgport, (struct Message *)msg);
88 }
89
wait_proxy_reply(void)90 STATIC_INLINE BOOL wait_proxy_reply (void)
91 {
92 BOOL got_reply = FALSE;
93 ULONG sigs;
94
95 sigs = Wait (PROXY_MSG_REPLY_SIG | SIGBREAKF_CTRL_C);
96
97 if (sigs & PROXY_MSG_REPLY_SIG)
98 got_reply = TRUE;
99
100 return got_reply;
101 }
102
103 /*
104 * Do proxy's reply to a semaphore message
105 */
do_proxy_reply(struct ProxyMsg * msg)106 STATIC_INLINE void do_proxy_reply (struct ProxyMsg *msg)
107 {
108 ReplyMsg ((struct Message *)msg);
109
110 Signal (msg->reply_task, PROXY_MSG_REPLY_SIG);
111 }
112
113
114
115 /*
116 * Block current process on semaphore
117 */
BlockMe(struct PSemaphore * psem)118 static int BlockMe (struct PSemaphore *psem)
119 {
120 int result = -1;
121 struct ProxyMsg msg;
122
123 init_proxy_msg (&msg, PROXY_MSG_BLOCK, psem);
124
125 Forbid ();
126
127 put_proxy_msg (&msg);
128 if (wait_proxy_reply ())
129 result = 0;
130 else {
131 /* Hack - remove block request from semaphore's waiting list
132 * When the block is interrupted, the proxy won't do it */
133 Remove ((struct Node *)&msg);
134 msg.msg.mn_Node.ln_Succ = msg.msg.mn_Node.ln_Pred = NULL;
135 }
136
137 Permit ();
138
139 return result;
140 }
141
142 /*
143 * Unblock one process blocked on semaphore
144 */
UnblockOne(struct PSemaphore * psem)145 static void UnblockOne (struct PSemaphore *psem)
146 {
147 struct ProxyMsg msg;
148
149 init_proxy_msg (&msg, PROXY_MSG_UNBLOCK, psem);
150
151 put_proxy_msg (&msg);
152 wait_proxy_reply ();
153 }
154
155 /*
156 * Destroy the semaphore block
157 */
Destroy(struct PSemaphore * psem)158 static void Destroy (struct PSemaphore *psem)
159 {
160 struct ProxyMsg msg;
161
162 init_proxy_msg (&msg, PROXY_MSG_DESTROY, psem);
163
164 put_proxy_msg (&msg);
165 wait_proxy_reply ();
166 }
167
168
169
170 /**
171 ** Proxy thread
172 **/
173
handle_startup_msg(void)174 static void handle_startup_msg (void)
175 {
176 struct Process *self = (struct Process*) FindTask (NULL);
177 WaitPort (&self->pr_MsgPort);
178 ReplyMsg (GetMsg (&self->pr_MsgPort));
179 }
180
proxy_thread_main(void)181 static void proxy_thread_main (void)
182 {
183 ULONG sigmask;
184 int dont_exit = 1;
185 struct ProxyMsg *msg = NULL;
186
187 proxy_msgport = CreateMsgPort ();
188 sigmask = SIGBREAKF_CTRL_C | (1 << proxy_msgport->mp_SigBit);
189
190 /* Wait for and reply to startup msg */
191 handle_startup_msg ();
192
193 while (dont_exit) {
194 ULONG sigs = Wait (sigmask);
195
196 /*
197 * Handle any semaphore message sent us
198 */
199 while ((msg = (struct ProxyMsg *) GetMsg (proxy_msgport))) {
200 struct PSemaphore *psem = msg->sem;
201
202 switch (msg->type) {
203 case PROXY_MSG_BLOCK:
204 /* Block requesting task on semaphore.
205 *
206 * We effect the block by not replying to this message now. Just
207 * put the request in the waiting list and reply to it when we
208 * get an unblock request for this semaphore. The process requesting
209 * the block will sleep until we do (unless it is interrupted). */
210 AddTail ((struct List*)&psem->wait_queue, (struct Node *)msg);
211 break;
212
213 case PROXY_MSG_UNBLOCK:
214 /* A request to unblock a task on semaphore. */
215 {
216 struct ProxyMsg *wait_msg;
217
218 Forbid ();
219
220 /* If there's a task blocking on the semaphore, remove
221 * it from the waiting list */
222 wait_msg = (struct ProxyMsg *) RemHead ((struct List *)&psem->wait_queue);
223
224 /* Reply to the unblock request */
225 do_proxy_reply (msg);
226
227 /* Reply to the block request - this will wake up the
228 * blocked task. */
229 if (wait_msg)
230 do_proxy_reply (wait_msg);
231
232 Permit ();
233 }
234 break;
235
236 case PROXY_MSG_DESTROY:
237 Forbid ();
238 {
239 struct ProxyMsg *wait_msg;
240
241 while ((wait_msg = (struct ProxyMsg *) RemHead ((struct List *)&psem->wait_queue))) {
242 /* Break the block for all tasks blocked on the semaphore */
243 Signal (wait_msg->reply_task, SIGBREAKF_CTRL_C);
244 }
245 }
246 do_proxy_reply (msg);
247 Permit ();
248 break;
249 } /* switch (msg->type) */
250 } /* while */
251
252 /*
253 * Check for break signal and exit if received
254 */
255 if ((sigs & SIGBREAKF_CTRL_C) != 0)
256 dont_exit = FALSE;
257 } /* while (dont_exit) */
258
259 /*
260 * Clean up and exit
261 */
262 Forbid ();
263 {
264 struct ProxyMsg *rem_msg;
265
266 /* Reply to any unanswered requests */
267 while ((rem_msg = (struct ProxyMsg *) GetMsg (proxy_msgport)) != NULL)
268 do_proxy_reply (rem_msg);
269
270 /* reply to the quit msg */
271 if (msg != NULL)
272 do_proxy_reply (msg);
273
274 DeleteMsgPort (proxy_msgport);
275 proxy_msgport = 0;
276 proxy_thread = 0;
277 }
278 Permit ();
279 }
280
stop_proxy_thread(void)281 static void stop_proxy_thread (void)
282 {
283 if (proxy_thread)
284 Signal ((struct Task *)proxy_thread, SIGBREAKF_CTRL_C);
285 }
286
287 /*
288 * Start proxy thread
289 */
start_proxy_thread(void)290 static int start_proxy_thread (void)
291 {
292 int result = -1;
293 struct MsgPort *replyport = CreateMsgPort();
294 struct Process *p;
295
296 if (replyport) {
297 p = myCreateNewProcTags (NP_Name, (ULONG) "E-UAE semaphore proxy",
298 NP_Priority, 10,
299 NP_StackSize, 2048,
300 NP_Entry, (ULONG) proxy_thread_main,
301 TAG_DONE);
302 if (p) {
303 /* Send startup message */
304 struct Message msg;
305 msg.mn_ReplyPort = replyport;
306 msg.mn_Length = sizeof msg;
307 PutMsg (&p->pr_MsgPort, (struct Message*)&msg);
308 WaitPort (replyport);
309
310 proxy_thread = p;
311
312 atexit (stop_proxy_thread);
313
314 result = 0;
315 }
316 DeleteMsgPort (replyport);
317 }
318 return result;
319 }
320
321 /**
322 ** Emulation of POSIX semaphore API
323 ** (or, at least, UAE's version thereof).
324 **/
325
uae_sem_init(uae_sem_t * sem,int pshared,unsigned int value)326 int uae_sem_init (uae_sem_t *sem, int pshared, unsigned int value)
327 {
328 int result = 0;
329
330 InitSemaphore (&sem->mutex);
331 NewList ((struct List *)&sem->wait_queue);
332 sem->value = value;
333 sem->live = 1;
334
335 if (sem_count == 0)
336 result = start_proxy_thread ();
337
338 if (result != -1)
339 sem_count++;
340
341 return result;
342 }
343
uae_sem_destroy(uae_sem_t * sem)344 int uae_sem_destroy (uae_sem_t *sem)
345 {
346 ObtainSemaphore (&sem->mutex);
347
348 sem->live = 0;
349 Destroy (sem);
350
351 /* If this is the last semaphore to die,
352 * kill the proxy thread */
353 if (--sem_count == 0)
354 stop_proxy_thread ();
355
356 ReleaseSemaphore (&sem->mutex);
357
358 return 0;
359 }
360
uae_sem_wait(uae_sem_t * sem)361 int uae_sem_wait (uae_sem_t *sem)
362 {
363 int result = -1;
364
365 ObtainSemaphore (&sem->mutex);
366
367 if (sem->live) {
368 if (sem->value > 0) {
369 --sem->value;
370 result = 0;
371 } else {
372 ReleaseSemaphore (&sem->mutex);
373
374 /* Block on this semaphore by waiting for
375 * the proxy thread to reply to our lock request
376 */
377 result = BlockMe (sem);
378
379 ObtainSemaphore (&sem->mutex);
380
381 if (result != -1)
382 --sem->value;
383 }
384 }
385 ReleaseSemaphore (&sem->mutex);
386 return result;
387 }
388
uae_sem_trywait(uae_sem_t * sem)389 int uae_sem_trywait (uae_sem_t *sem)
390 {
391 int result = -1;
392
393 ObtainSemaphore (&sem->mutex);
394
395 if (sem->live) {
396 if (sem->value > 0) {
397 --sem->value;
398 result = 0;
399 }
400 }
401 ReleaseSemaphore (&sem->mutex);
402 return result;
403 }
404
uae_sem_get_value(uae_sem_t * sem)405 int uae_sem_get_value (uae_sem_t *sem)
406 {
407 int value;
408
409 ObtainSemaphore (&sem->mutex);
410 value = sem->value;
411 ReleaseSemaphore (&sem->mutex);
412
413 return value;
414 }
415
uae_sem_post(uae_sem_t * sem)416 int uae_sem_post (uae_sem_t *sem)
417 {
418 int result = -1;
419
420 ObtainSemaphore (&sem->mutex);
421
422 if (sem->live) {
423 sem->value++;
424
425 if (sem->value == 1) {
426 /* Ask the proxy to wake up a task blocked
427 * on this semaphore */
428 UnblockOne (sem);
429 }
430 result = 0;
431 }
432 ReleaseSemaphore (&sem->mutex);
433
434 return result;
435 }
436
437
438 /**
439 ** Thread support
440 **/
441
442 struct startupmsg
443 {
444 struct Message msg;
445 void *(*func) (void *);
446 void *arg;
447 };
448
do_thread(void)449 static void do_thread (void)
450 {
451 struct Process *pr = (struct Process *) FindTask (NULL);
452 struct startupmsg *msg;
453 void *(*func) (void *);
454 void *arg;
455
456 WaitPort (&pr->pr_MsgPort);
457 msg = (struct startupmsg *) GetMsg(&pr->pr_MsgPort);
458 func = msg->func;
459 arg = msg->arg;
460 ReplyMsg ((struct Message*)msg);
461
462 func (arg);
463 }
464
uae_start_thread(char * name,void * (* f)(void *),void * arg,uae_thread_id * foo)465 int uae_start_thread (char *name, void *(*f) (void *), void *arg, uae_thread_id *foo)
466 {
467 struct MsgPort *replyport = CreateMsgPort();
468
469 if (replyport) {
470 *foo = (struct Task *)myCreateNewProcTags (NP_Output, Output (),
471 NP_Input, Input (),
472 NP_Name, (ULONG) "UAE thread",
473 NP_CloseOutput, FALSE,
474 NP_CloseInput, FALSE,
475 NP_StackSize, 16384,
476 NP_Entry, (ULONG) do_thread,
477 TAG_DONE);
478
479 if(*foo) {
480 struct startupmsg msg;
481
482 msg.msg.mn_ReplyPort = replyport;
483 msg.msg.mn_Length = sizeof msg;
484 msg.func = f;
485 msg.arg = arg;
486 PutMsg (&((struct Process*)*foo)->pr_MsgPort, (struct Message*)&msg);
487 WaitPort (replyport);
488 }
489 DeleteMsgPort (replyport);
490 }
491
492 return *foo!=0;
493 }
494
uae_set_thread_priority(int pri)495 extern void uae_set_thread_priority (int pri)
496 {
497 SetTaskPri (FindTask (NULL), pri);
498 }
499