1  /*
2   * E-UAE - The portable Amiga Emulator
3   *
4   * Thread and semaphore support using AmigaOS processes
5   *
6   * Copyright 2003-2005 Richard Drummond
7   */
8 
9 #include "sysconfig.h"
10 #include "sysdeps.h"
11 
12 #include <exec/exec.h>
13 #include <exec/ports.h>
14 #include <exec/lists.h>
15 #include <dos/dosextens.h>
16 #include <dos/dostags.h>
17 
18 #include <proto/exec.h>
19 #include <proto/dos.h>
20 #include <proto/timer.h>
21 #include <clib/alib_protos.h>
22 
23 #include "threaddep/thread.h"
24 
25 /*
26  * Handle CreateNewProc() differences between AmigaOS-like systems
27  */
28 #ifdef __MORPHOS__
29 /* CreateNewProc() on MorphOS needs to be told that code is PPC */
30 # define myCreateNewProcTags(...) CreateNewProcTags(NP_CodeType, CODETYPE_PPC, __VA_ARGS__)
31 #else
32 # ifdef __amigaos4__
33 /* On OS4, we assert that the threads we create are our children */
34 #  define myCreateNewProcTags(...) CreateNewProcTags(NP_Child, TRUE, __VA_ARGS__)
35 # else
36 #  define myCreateNewProcTags CreateNewProcTags
37 # endif
38 #endif
39 
40 /*
41  * Simple emulation of POSIX semaphores using message-passing
42  * with a proxy task.
43  *
44  */
45 
46 static struct Process *proxy_thread;
47 static struct MsgPort *proxy_msgport;
48 
49 static int sem_count;
50 
51 /*
52  * Message packet which is sent to the proxy thread
53  * to effect blocking on a semaphore
54  */
55 
56 /*
57  * Message types
58  */
59 typedef enum {
60     PROXY_MSG_BLOCK,
61     PROXY_MSG_UNBLOCK,
62     PROXY_MSG_DESTROY,
63 } ProxyMsgType;
64 
65 struct ProxyMsg {
66     struct Message     msg;
67     struct PSemaphore *sem;
68     struct Task       *reply_task;
69     ProxyMsgType       type;
70 };
71 
72 /* Proxy replies to message sender with this signal */
73 #define PROXY_MSG_REPLY_SIG		SIGBREAKF_CTRL_F
74 
init_proxy_msg(struct ProxyMsg * msg,ProxyMsgType type,struct PSemaphore * sem)75 STATIC_INLINE void init_proxy_msg (struct ProxyMsg *msg, ProxyMsgType type, struct PSemaphore *sem)
76 {
77     memset (msg, 0, sizeof (struct ProxyMsg));
78 
79     msg->msg.mn_Length = sizeof (struct ProxyMsg);
80     msg->sem           = sem;
81     msg->type          = type;
82     msg->reply_task    = FindTask (NULL);
83 }
84 
put_proxy_msg(struct ProxyMsg * msg)85 STATIC_INLINE void put_proxy_msg (struct ProxyMsg *msg)
86 {
87     PutMsg (proxy_msgport, (struct Message *)msg);
88 }
89 
wait_proxy_reply(void)90 STATIC_INLINE BOOL wait_proxy_reply (void)
91 {
92     BOOL  got_reply = FALSE;
93     ULONG sigs;
94 
95     sigs = Wait (PROXY_MSG_REPLY_SIG | SIGBREAKF_CTRL_C);
96 
97     if (sigs & PROXY_MSG_REPLY_SIG)
98 	got_reply = TRUE;
99 
100     return got_reply;
101 }
102 
103 /*
104  * Do proxy's reply to a semaphore message
105  */
do_proxy_reply(struct ProxyMsg * msg)106 STATIC_INLINE void do_proxy_reply (struct ProxyMsg *msg)
107 {
108    ReplyMsg ((struct Message *)msg);
109 
110    Signal (msg->reply_task, PROXY_MSG_REPLY_SIG);
111 }
112 
113 
114 
115 /*
116  * Block current process on semaphore
117  */
BlockMe(struct PSemaphore * psem)118 static int BlockMe (struct PSemaphore *psem)
119 {
120     int result = -1;
121     struct ProxyMsg msg;
122 
123     init_proxy_msg (&msg, PROXY_MSG_BLOCK, psem);
124 
125     Forbid ();
126 
127     put_proxy_msg (&msg);
128     if (wait_proxy_reply ())
129 	result = 0;
130     else {
131 	/* Hack - remove block request from semaphore's waiting list
132 	 * When the block is interrupted, the proxy won't do it */
133 	Remove ((struct Node *)&msg);
134 	msg.msg.mn_Node.ln_Succ = msg.msg.mn_Node.ln_Pred = NULL;
135     }
136 
137     Permit ();
138 
139     return result;
140 }
141 
142 /*
143  * Unblock one process blocked on semaphore
144  */
UnblockOne(struct PSemaphore * psem)145 static void UnblockOne (struct PSemaphore *psem)
146 {
147     struct ProxyMsg msg;
148 
149     init_proxy_msg (&msg, PROXY_MSG_UNBLOCK, psem);
150 
151     put_proxy_msg (&msg);
152     wait_proxy_reply ();
153 }
154 
155 /*
156  * Destroy the semaphore block
157  */
Destroy(struct PSemaphore * psem)158 static void Destroy (struct PSemaphore *psem)
159 {
160     struct ProxyMsg msg;
161 
162     init_proxy_msg (&msg, PROXY_MSG_DESTROY, psem);
163 
164     put_proxy_msg (&msg);
165     wait_proxy_reply ();
166 }
167 
168 
169 
170 /**
171  ** Proxy thread
172  **/
173 
handle_startup_msg(void)174 static void handle_startup_msg (void)
175 {
176     struct Process *self = (struct Process*) FindTask (NULL);
177     WaitPort (&self->pr_MsgPort);
178     ReplyMsg (GetMsg (&self->pr_MsgPort));
179 }
180 
proxy_thread_main(void)181 static void proxy_thread_main (void)
182 {
183     ULONG sigmask;
184     int dont_exit = 1;
185     struct ProxyMsg *msg = NULL;
186 
187     proxy_msgport = CreateMsgPort ();
188     sigmask = SIGBREAKF_CTRL_C | (1 << proxy_msgport->mp_SigBit);
189 
190     /* Wait for and reply to startup msg */
191     handle_startup_msg ();
192 
193     while (dont_exit) {
194 	ULONG sigs = Wait (sigmask);
195 
196 	/*
197 	 * Handle any semaphore message sent us
198 	 */
199 	while ((msg = (struct ProxyMsg *) GetMsg (proxy_msgport))) {
200 	    struct PSemaphore *psem = msg->sem;
201 
202 	    switch (msg->type) {
203 		case PROXY_MSG_BLOCK:
204 		    /* Block requesting task on semaphore.
205 		     *
206 		     * We effect the block by not replying to this message now. Just
207 		     * put the request in the waiting list and reply to it when we
208 		     * get an unblock request for this semaphore. The process requesting
209 		     * the block will sleep until we do (unless it is interrupted). */
210 		    AddTail ((struct List*)&psem->wait_queue, (struct Node *)msg);
211 		    break;
212 
213 		case PROXY_MSG_UNBLOCK:
214 		    /* A request to unblock a task on semaphore. */
215 		    {
216 			struct ProxyMsg *wait_msg;
217 
218 			Forbid ();
219 
220 			/* If there's a task blocking on the semaphore, remove
221 			 * it from the waiting list */
222 			wait_msg = (struct ProxyMsg *) RemHead ((struct List *)&psem->wait_queue);
223 
224 			/* Reply to the unblock request */
225 			do_proxy_reply (msg);
226 
227 			/* Reply to the block request - this will wake up the
228 			 * blocked task. */
229 			if (wait_msg)
230 			    do_proxy_reply (wait_msg);
231 
232 			Permit ();
233 		    }
234 		    break;
235 
236 		case PROXY_MSG_DESTROY:
237 		    Forbid ();
238 		    {
239 			struct ProxyMsg *wait_msg;
240 
241 			while ((wait_msg = (struct ProxyMsg *) RemHead ((struct List *)&psem->wait_queue))) {
242 			    /* Break the block for all tasks blocked on the semaphore */
243 			    Signal (wait_msg->reply_task, SIGBREAKF_CTRL_C);
244 			}
245 		    }
246 		    do_proxy_reply (msg);
247 		    Permit ();
248 		    break;
249 	    } /* switch (msg->type) */
250 	} /* while */
251 
252 	/*
253 	 * Check for break signal and exit if received
254 	 */
255 	if ((sigs & SIGBREAKF_CTRL_C) != 0)
256 	    dont_exit = FALSE;
257     } /* while (dont_exit) */
258 
259    /*
260     * Clean up and exit
261     */
262    Forbid ();
263    {
264 	struct ProxyMsg *rem_msg;
265 
266 	/* Reply to any unanswered requests */
267 	while ((rem_msg = (struct ProxyMsg *) GetMsg (proxy_msgport)) != NULL)
268 	    do_proxy_reply (rem_msg);
269 
270 	/* reply to the quit msg */
271 	if (msg != NULL)
272 	    do_proxy_reply (msg);
273 
274 	DeleteMsgPort (proxy_msgport);
275 	proxy_msgport = 0;
276 	proxy_thread  = 0;
277    }
278    Permit ();
279 }
280 
stop_proxy_thread(void)281 static void stop_proxy_thread (void)
282 {
283     if (proxy_thread)
284 	Signal ((struct Task *)proxy_thread, SIGBREAKF_CTRL_C);
285 }
286 
287 /*
288  * Start proxy thread
289  */
start_proxy_thread(void)290 static int start_proxy_thread (void)
291 {
292     int result = -1;
293     struct MsgPort *replyport = CreateMsgPort();
294     struct Process *p;
295 
296     if (replyport) {
297 	p = myCreateNewProcTags (NP_Name,	(ULONG) "E-UAE semaphore proxy",
298 				 NP_Priority,		10,
299 				 NP_StackSize,		2048,
300 				 NP_Entry,	(ULONG) proxy_thread_main,
301 				 TAG_DONE);
302 	if (p) {
303 	    /* Send startup message */
304 	    struct Message msg;
305 	    msg.mn_ReplyPort = replyport;
306 	    msg.mn_Length    = sizeof msg;
307 	    PutMsg (&p->pr_MsgPort, (struct Message*)&msg);
308 	    WaitPort (replyport);
309 
310 	    proxy_thread = p;
311 
312 	    atexit (stop_proxy_thread);
313 
314 	    result = 0;
315 	}
316 	DeleteMsgPort (replyport);
317     }
318     return result;
319 }
320 
321 /**
322  ** Emulation of POSIX semaphore API
323  ** (or, at least, UAE's version thereof).
324  **/
325 
uae_sem_init(uae_sem_t * sem,int pshared,unsigned int value)326 int uae_sem_init (uae_sem_t *sem, int pshared, unsigned int value)
327 {
328     int result = 0;
329 
330     InitSemaphore (&sem->mutex);
331     NewList ((struct List *)&sem->wait_queue);
332     sem->value = value;
333     sem->live  = 1;
334 
335     if (sem_count == 0)
336 	result = start_proxy_thread ();
337 
338     if (result != -1)
339 	sem_count++;
340 
341     return result;
342 }
343 
uae_sem_destroy(uae_sem_t * sem)344 int uae_sem_destroy (uae_sem_t *sem)
345 {
346     ObtainSemaphore (&sem->mutex);
347 
348     sem->live = 0;
349     Destroy (sem);
350 
351     /* If this is the last semaphore to die,
352      * kill the proxy thread */
353     if (--sem_count == 0)
354 	stop_proxy_thread ();
355 
356     ReleaseSemaphore (&sem->mutex);
357 
358     return 0;
359 }
360 
uae_sem_wait(uae_sem_t * sem)361 int uae_sem_wait (uae_sem_t *sem)
362 {
363     int result = -1;
364 
365     ObtainSemaphore (&sem->mutex);
366 
367     if (sem->live) {
368 	if (sem->value > 0) {
369 	    --sem->value;
370 	    result = 0;
371 	} else {
372 	    ReleaseSemaphore (&sem->mutex);
373 
374 	    /* Block on this semaphore by waiting for
375 	     * the proxy thread to reply to our lock request
376 	     */
377 	    result = BlockMe (sem);
378 
379 	    ObtainSemaphore (&sem->mutex);
380 
381 	    if (result != -1)
382 		--sem->value;
383 	}
384     }
385     ReleaseSemaphore (&sem->mutex);
386     return result;
387 }
388 
uae_sem_trywait(uae_sem_t * sem)389 int uae_sem_trywait (uae_sem_t *sem)
390 {
391     int result = -1;
392 
393     ObtainSemaphore (&sem->mutex);
394 
395     if (sem->live) {
396 	if (sem->value > 0) {
397 	    --sem->value;
398 	    result = 0;
399 	}
400     }
401     ReleaseSemaphore (&sem->mutex);
402     return result;
403 }
404 
uae_sem_get_value(uae_sem_t * sem)405 int uae_sem_get_value (uae_sem_t *sem)
406 {
407     int value;
408 
409     ObtainSemaphore (&sem->mutex);
410     value = sem->value;
411     ReleaseSemaphore (&sem->mutex);
412 
413     return value;
414 }
415 
uae_sem_post(uae_sem_t * sem)416 int uae_sem_post (uae_sem_t *sem)
417 {
418     int result = -1;
419 
420     ObtainSemaphore (&sem->mutex);
421 
422     if (sem->live) {
423 	sem->value++;
424 
425 	if (sem->value == 1) {
426 	    /* Ask the proxy to wake up a task blocked
427 	     * on this semaphore */
428 	    UnblockOne (sem);
429 	}
430 	result = 0;
431     }
432     ReleaseSemaphore (&sem->mutex);
433 
434     return result;
435 }
436 
437 
438 /**
439  ** Thread support
440  **/
441 
442 struct startupmsg
443 {
444     struct Message msg;
445     void           *(*func) (void *);
446     void           *arg;
447 };
448 
do_thread(void)449 static void do_thread (void)
450 {
451    struct Process *pr = (struct Process *) FindTask (NULL);
452    struct startupmsg *msg;
453    void *(*func) (void *);
454    void *arg;
455 
456    WaitPort (&pr->pr_MsgPort);
457    msg = (struct startupmsg *) GetMsg(&pr->pr_MsgPort);
458    func = msg->func;
459    arg  = msg->arg;
460    ReplyMsg ((struct Message*)msg);
461 
462    func (arg);
463 }
464 
uae_start_thread(char * name,void * (* f)(void *),void * arg,uae_thread_id * foo)465 int uae_start_thread (char *name, void *(*f) (void *), void *arg, uae_thread_id *foo)
466 {
467     struct MsgPort *replyport = CreateMsgPort();
468 
469     if (replyport) {
470 	*foo = (struct Task *)myCreateNewProcTags (NP_Output,		   Output (),
471 						   NP_Input,		   Input (),
472 						   NP_Name,	   (ULONG) "UAE thread",
473 						   NP_CloseOutput,	   FALSE,
474 						   NP_CloseInput,	   FALSE,
475 						   NP_StackSize,	   16384,
476 						   NP_Entry,	   (ULONG) do_thread,
477 						   TAG_DONE);
478 
479 	if(*foo) {
480 	    struct startupmsg msg;
481 
482 	    msg.msg.mn_ReplyPort = replyport;
483 	    msg.msg.mn_Length    = sizeof msg;
484 	    msg.func             = f;
485 	    msg.arg              = arg;
486 	    PutMsg (&((struct Process*)*foo)->pr_MsgPort, (struct Message*)&msg);
487 	    WaitPort (replyport);
488 	}
489 	DeleteMsgPort (replyport);
490     }
491 
492     return *foo!=0;
493 }
494 
uae_set_thread_priority(int pri)495 extern void uae_set_thread_priority (int pri)
496 {
497     SetTaskPri (FindTask (NULL), pri);
498 }
499