1 /*
2  * A type which wraps a semaphore
3  *
4  * semaphore.c
5  *
6  * Copyright (c) 2006-2008, R Oudkerk
7  * Licensed to PSF under a Contributor Agreement.
8  */
9 
#include "multiprocess.h"

#include <limits.h>
11 
/* Values stored in SemLockObject.kind. */
enum {
    RECURSIVE_MUTEX = 0,
    SEMAPHORE = 1
};
13 
14 typedef struct {
15     PyObject_HEAD
16     SEM_HANDLE handle;
17     long last_tid;
18     int count;
19     int maxvalue;
20     int kind;
21 } SemLockObject;
22 
/*
 * True when `o` has a positive acquisition count and the calling thread is
 * the one recorded in `o->last_tid`.
 *
 * The argument is parenthesized so that any pointer-valued expression
 * (e.g. `&obj` or `p + 1`) can be passed; note that it is evaluated twice,
 * so it must not have side effects.
 */
#define ISMINE(o) \
    ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid)
24 
25 
26 #ifdef MS_WINDOWS
27 
28 /*
29  * Windows definitions
30  */
31 
/* Handle value meaning "no semaphore". */
#define SEM_FAILED NULL

/* Error state is the calling thread's Win32 last-error code. */
#define SEM_CLEAR_ERROR()           SetLastError(0)
#define SEM_GET_LAST_ERROR()        GetLastError()

/* Semaphores are created without a name (`name` is ignored), so unlinking
   is a no-op that always reports success. */
#define SEM_CREATE(name, val, max)  CreateSemaphore(NULL, val, max, NULL)
#define SEM_UNLINK(name)            0

/* Both evaluate to 0 on success and a negative value on failure. */
#define SEM_CLOSE(sem)              (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval)     _GetSemaphoreValue(sem, pval)
40 
41 static int
_GetSemaphoreValue(HANDLE handle,long * value)42 _GetSemaphoreValue(HANDLE handle, long *value)
43 {
44     long previous;
45 
46     switch (WaitForSingleObject(handle, 0)) {
47     case WAIT_OBJECT_0:
48         if (!ReleaseSemaphore(handle, 1, &previous))
49             return MP_STANDARD_ERROR;
50         *value = previous + 1;
51         return 0;
52     case WAIT_TIMEOUT:
53         *value = 0;
54         return 0;
55     default:
56         return MP_STANDARD_ERROR;
57     }
58 }
59 
60 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)61 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
62 {
63     int blocking = 1;
64     double timeout;
65     PyObject *timeout_obj = Py_None;
66     DWORD res, full_msecs, nhandles;
67     HANDLE handles[2], sigint_event;
68 
69     static char *kwlist[] = {"block", "timeout", NULL};
70 
71     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
72                                      &blocking, &timeout_obj))
73         return NULL;
74 
75     /* calculate timeout */
76     if (!blocking) {
77         full_msecs = 0;
78     } else if (timeout_obj == Py_None) {
79         full_msecs = INFINITE;
80     } else {
81         timeout = PyFloat_AsDouble(timeout_obj);
82         if (PyErr_Occurred())
83             return NULL;
84         timeout *= 1000.0;      /* convert to millisecs */
85         if (timeout < 0.0) {
86             timeout = 0.0;
87         } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
88             PyErr_SetString(PyExc_OverflowError,
89                             "timeout is too large");
90             return NULL;
91         }
92         full_msecs = (DWORD)(timeout + 0.5);
93     }
94 
95     /* check whether we already own the lock */
96     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
97         ++self->count;
98         Py_RETURN_TRUE;
99     }
100 
101     /* check whether we can acquire without releasing the GIL and blocking */
102     if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
103         self->last_tid = GetCurrentThreadId();
104         ++self->count;
105         Py_RETURN_TRUE;
106     }
107 
108     /* prepare list of handles */
109     nhandles = 0;
110     handles[nhandles++] = self->handle;
111     if (_PyOS_IsMainThread()) {
112         sigint_event = _PyOS_SigintEvent();
113         assert(sigint_event != NULL);
114         handles[nhandles++] = sigint_event;
115     }
116 
117     /* do the wait */
118     Py_BEGIN_ALLOW_THREADS
119     if (sigint_event != NULL)
120         ResetEvent(sigint_event);
121     res = WaitForMultipleObjects(nhandles, handles, FALSE, full_msecs);
122     Py_END_ALLOW_THREADS
123 
124     /* handle result */
125     switch (res) {
126     case WAIT_TIMEOUT:
127         Py_RETURN_FALSE;
128     case WAIT_OBJECT_0 + 0:
129         self->last_tid = GetCurrentThreadId();
130         ++self->count;
131         Py_RETURN_TRUE;
132     case WAIT_OBJECT_0 + 1:
133         errno = EINTR;
134         return PyErr_SetFromErrno(PyExc_IOError);
135     case WAIT_FAILED:
136         return PyErr_SetFromWindowsErr(0);
137     default:
138         PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
139                      "WaitForMultipleObjects() gave unrecognized "
140                      "value %d", res);
141         return NULL;
142     }
143 }
144 
145 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)146 semlock_release(SemLockObject *self, PyObject *args)
147 {
148     if (self->kind == RECURSIVE_MUTEX) {
149         if (!ISMINE(self)) {
150             PyErr_SetString(PyExc_AssertionError, "attempt to "
151                             "release recursive lock not owned "
152                             "by thread");
153             return NULL;
154         }
155         if (self->count > 1) {
156             --self->count;
157             Py_RETURN_NONE;
158         }
159         assert(self->count == 1);
160     }
161 
162     if (!ReleaseSemaphore(self->handle, 1, NULL)) {
163         if (GetLastError() == ERROR_TOO_MANY_POSTS) {
164             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
165                             "released too many times");
166             return NULL;
167         } else {
168             return PyErr_SetFromWindowsErr(0);
169         }
170     }
171 
172     --self->count;
173     Py_RETURN_NONE;
174 }
175 
176 #else /* !MS_WINDOWS */
177 
178 /*
179  * Unix definitions
180  */
181 
/* There is no separate error state to clear or query here:
   SEM_CLEAR_ERROR() expands to nothing and SEM_GET_LAST_ERROR() to 0. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR()        0

/* Thin wrappers over the POSIX named-semaphore calls.  SEM_CREATE fails if
   the name already exists (O_EXCL) and ignores `max`. */
#define SEM_CREATE(name, val, max)  sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem)              sem_close(sem)
#define SEM_GETVALUE(sem, pval)     sem_getvalue(sem, pval)
#define SEM_UNLINK(name)            sem_unlink(name)

/* OS X 10.4 defines SEM_FAILED as -1 instead of (sem_t *)-1;  this gives
   compiler warnings, and (potentially) undefined behaviour. */
#ifdef __APPLE__
#  undef SEM_FAILED
#  define SEM_FAILED ((sem_t *)-1)
#endif

/* Without sem_unlink() unlinking degrades to a no-op reporting success. */
#ifndef HAVE_SEM_UNLINK
#  define sem_unlink(name) 0
#endif

/* The HAVE_SEM_TIMEDWAIT guard below is commented out, so sem_timedwait is
   always redirected to the polling implementation sem_timedwait_save().
   The redirect passes `_save`, which must therefore be in scope wherever
   sem_timedwait() is called. */
//ifndef HAVE_SEM_TIMEDWAIT
#  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
202 
203 int
sem_timedwait_save(sem_t * sem,struct timespec * deadline,PyThreadState * _save)204 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
205 {
206     int res;
207     unsigned long delay, difference;
208     struct timeval now, tvdeadline, tvdelay;
209 
210     errno = 0;
211     tvdeadline.tv_sec = deadline->tv_sec;
212     tvdeadline.tv_usec = deadline->tv_nsec / 1000;
213 
214     for (delay = 0 ; ; delay += 1000) {
215         /* poll */
216         if (sem_trywait(sem) == 0)
217             return 0;
218         else if (errno != EAGAIN)
219             return MP_STANDARD_ERROR;
220 
221         /* get current time */
222         if (gettimeofday(&now, NULL) < 0)
223             return MP_STANDARD_ERROR;
224 
225         /* check for timeout */
226         if (tvdeadline.tv_sec < now.tv_sec ||
227             (tvdeadline.tv_sec == now.tv_sec &&
228              tvdeadline.tv_usec <= now.tv_usec)) {
229             errno = ETIMEDOUT;
230             return MP_STANDARD_ERROR;
231         }
232 
233         /* calculate how much time is left */
234         difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
235             (tvdeadline.tv_usec - now.tv_usec);
236 
237         /* check delay not too long -- maximum is 20 msecs */
238         if (delay > 20000)
239             delay = 20000;
240         if (delay > difference)
241             delay = difference;
242 
243         /* sleep */
244         tvdelay.tv_sec = delay / 1000000;
245         tvdelay.tv_usec = delay % 1000000;
246         if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
247             return MP_STANDARD_ERROR;
248 
249         /* check for signals */
250         Py_BLOCK_THREADS
251         res = PyErr_CheckSignals();
252         Py_UNBLOCK_THREADS
253 
254         if (res) {
255             errno = EINTR;
256             return MP_EXCEPTION_HAS_BEEN_SET;
257         }
258     }
259 }
260 
261 //endif /* !HAVE_SEM_TIMEDWAIT */
262 
263 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)264 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
265 {
266     int blocking = 1, res, err = 0;
267     double timeout;
268     PyObject *timeout_obj = Py_None;
269     struct timespec deadline = {0};
270     struct timeval now;
271     long sec, nsec;
272 
273     static char *kwlist[] = {"block", "timeout", NULL};
274 
275     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
276                                      &blocking, &timeout_obj))
277         return NULL;
278 
279     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
280         ++self->count;
281         Py_RETURN_TRUE;
282     }
283 
284     if (timeout_obj != Py_None) {
285         timeout = PyFloat_AsDouble(timeout_obj);
286         if (PyErr_Occurred())
287             return NULL;
288         if (timeout < 0.0)
289             timeout = 0.0;
290 
291         if (gettimeofday(&now, NULL) < 0) {
292             PyErr_SetFromErrno(PyExc_OSError);
293             return NULL;
294         }
295         sec = (long) timeout;
296         nsec = (long) (1e9 * (timeout - sec) + 0.5);
297         deadline.tv_sec = now.tv_sec + sec;
298         deadline.tv_nsec = now.tv_usec * 1000 + nsec;
299         deadline.tv_sec += (deadline.tv_nsec / 1000000000);
300         deadline.tv_nsec %= 1000000000;
301     }
302 
303     do {
304         Py_BEGIN_ALLOW_THREADS
305         if (blocking && timeout_obj == Py_None)
306             res = sem_wait(self->handle);
307         else if (!blocking)
308             res = sem_trywait(self->handle);
309         else
310             res = sem_timedwait(self->handle, &deadline);
311         Py_END_ALLOW_THREADS
312         err = errno;
313         if (res == MP_EXCEPTION_HAS_BEEN_SET)
314             break;
315     } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
316 
317     if (res < 0) {
318         errno = err;
319         if (errno == EAGAIN || errno == ETIMEDOUT)
320             Py_RETURN_FALSE;
321         else if (errno == EINTR)
322             return NULL;
323         else
324             return PyErr_SetFromErrno(PyExc_OSError);
325     }
326 
327     ++self->count;
328     self->last_tid = PyThread_get_thread_ident();
329 
330     Py_RETURN_TRUE;
331 }
332 
333 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)334 semlock_release(SemLockObject *self, PyObject *args)
335 {
336     if (self->kind == RECURSIVE_MUTEX) {
337         if (!ISMINE(self)) {
338             PyErr_SetString(PyExc_AssertionError, "attempt to "
339                             "release recursive lock not owned "
340                             "by thread");
341             return NULL;
342         }
343         if (self->count > 1) {
344             --self->count;
345             Py_RETURN_NONE;
346         }
347         assert(self->count == 1);
348     } else {
349 #ifdef HAVE_BROKEN_SEM_GETVALUE
350         /* We will only check properly the maxvalue == 1 case */
351         if (self->maxvalue == 1) {
352             /* make sure that already locked */
353             if (sem_trywait(self->handle) < 0) {
354                 if (errno != EAGAIN) {
355                     PyErr_SetFromErrno(PyExc_OSError);
356                     return NULL;
357                 }
358                 /* it is already locked as expected */
359             } else {
360                 /* it was not locked so undo wait and raise  */
361                 if (sem_post(self->handle) < 0) {
362                     PyErr_SetFromErrno(PyExc_OSError);
363                     return NULL;
364                 }
365                 PyErr_SetString(PyExc_ValueError, "semaphore "
366                                 "or lock released too many "
367                                 "times");
368                 return NULL;
369             }
370         }
371 #else
372         int sval;
373 
374         /* This check is not an absolute guarantee that the semaphore
375            does not rise above maxvalue. */
376         if (sem_getvalue(self->handle, &sval) < 0) {
377             return PyErr_SetFromErrno(PyExc_OSError);
378         } else if (sval >= self->maxvalue) {
379             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
380                             "released too many times");
381             return NULL;
382         }
383 #endif
384     }
385 
386     if (sem_post(self->handle) < 0)
387         return PyErr_SetFromErrno(PyExc_OSError);
388 
389     --self->count;
390     Py_RETURN_NONE;
391 }
392 
393 #endif /* !MS_WINDOWS */
394 
395 /*
396  * All platforms
397  */
398 
399 static PyObject *
newsemlockobject(PyTypeObject * type,SEM_HANDLE handle,int kind,int maxvalue)400 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
401 {
402     SemLockObject *self;
403 
404     self = PyObject_New(SemLockObject, type);
405     if (!self)
406         return NULL;
407     self->handle = handle;
408     self->kind = kind;
409     self->count = 0;
410     self->last_tid = 0;
411     self->maxvalue = maxvalue;
412     return (PyObject*)self;
413 }
414 
415 static PyObject *
semlock_new(PyTypeObject * type,PyObject * args,PyObject * kwds)416 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
417 {
418     char buffer[256];
419     SEM_HANDLE handle = SEM_FAILED;
420     int kind, maxvalue, value;
421     PyObject *result;
422     static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
423     static int counter = 0;
424 
425     if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
426                                      &kind, &value, &maxvalue))
427         return NULL;
428 
429     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
430         PyErr_SetString(PyExc_ValueError, "unrecognized kind");
431         return NULL;
432     }
433 
434     PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
435 
436     SEM_CLEAR_ERROR();
437     handle = SEM_CREATE(buffer, value, maxvalue);
438     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
439     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
440         goto failure;
441 
442     if (SEM_UNLINK(buffer) < 0)
443         goto failure;
444 
445     result = newsemlockobject(type, handle, kind, maxvalue);
446     if (!result)
447         goto failure;
448 
449     return result;
450 
451   failure:
452     if (handle != SEM_FAILED)
453         SEM_CLOSE(handle);
454     mp_SetError(NULL, MP_STANDARD_ERROR);
455     return NULL;
456 }
457 
458 static PyObject *
semlock_rebuild(PyTypeObject * type,PyObject * args)459 semlock_rebuild(PyTypeObject *type, PyObject *args)
460 {
461     SEM_HANDLE handle;
462     int kind, maxvalue;
463 
464     if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
465                           &handle, &kind, &maxvalue))
466         return NULL;
467 
468     return newsemlockobject(type, handle, kind, maxvalue);
469 }
470 
471 static void
semlock_dealloc(SemLockObject * self)472 semlock_dealloc(SemLockObject* self)
473 {
474     if (self->handle != SEM_FAILED)
475         SEM_CLOSE(self->handle);
476     PyObject_Del(self);
477 }
478 
479 static PyObject *
semlock_count(SemLockObject * self)480 semlock_count(SemLockObject *self)
481 {
482     return PyLong_FromLong((long)self->count);
483 }
484 
485 static PyObject *
semlock_ismine(SemLockObject * self)486 semlock_ismine(SemLockObject *self)
487 {
488     /* only makes sense for a lock */
489     return PyBool_FromLong(ISMINE(self));
490 }
491 
492 static PyObject *
semlock_getvalue(SemLockObject * self)493 semlock_getvalue(SemLockObject *self)
494 {
495 #ifdef HAVE_BROKEN_SEM_GETVALUE
496     PyErr_SetNone(PyExc_NotImplementedError);
497     return NULL;
498 #else
499     int sval;
500     if (SEM_GETVALUE(self->handle, &sval) < 0)
501         return mp_SetError(NULL, MP_STANDARD_ERROR);
502     /* some posix implementations use negative numbers to indicate
503        the number of waiting threads */
504     if (sval < 0)
505         sval = 0;
506     return PyLong_FromLong((long)sval);
507 #endif
508 }
509 
510 static PyObject *
semlock_iszero(SemLockObject * self)511 semlock_iszero(SemLockObject *self)
512 {
513 #ifdef HAVE_BROKEN_SEM_GETVALUE
514     if (sem_trywait(self->handle) < 0) {
515         if (errno == EAGAIN)
516             Py_RETURN_TRUE;
517         return mp_SetError(NULL, MP_STANDARD_ERROR);
518     } else {
519         if (sem_post(self->handle) < 0)
520             return mp_SetError(NULL, MP_STANDARD_ERROR);
521         Py_RETURN_FALSE;
522     }
523 #else
524     int sval;
525     if (SEM_GETVALUE(self->handle, &sval) < 0)
526         return mp_SetError(NULL, MP_STANDARD_ERROR);
527     return PyBool_FromLong((long)sval == 0);
528 #endif
529 }
530 
531 static PyObject *
semlock_afterfork(SemLockObject * self)532 semlock_afterfork(SemLockObject *self)
533 {
534     self->count = 0;
535     Py_RETURN_NONE;
536 }
537 
538 /*
539  * Semaphore methods
540  */
541 
542 static PyMethodDef semlock_methods[] = {
543     {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
544      "acquire the semaphore/lock"},
545     {"release", (PyCFunction)semlock_release, METH_NOARGS,
546      "release the semaphore/lock"},
547     {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
548      "enter the semaphore/lock"},
549     {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
550      "exit the semaphore/lock"},
551     {"_count", (PyCFunction)semlock_count, METH_NOARGS,
552      "num of `acquire()`s minus num of `release()`s for this process"},
553     {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
554      "whether the lock is owned by this thread"},
555     {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
556      "get the value of the semaphore"},
557     {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
558      "returns whether semaphore has value zero"},
559     {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
560      ""},
561     {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
562      "rezero the net acquisition count after fork()"},
563     {NULL}
564 };
565 
566 /*
567  * Member table
568  */
569 
570 static PyMemberDef semlock_members[] = {
571     {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
572      ""},
573     {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
574      ""},
575     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
576      ""},
577     {NULL}
578 };
579 
580 /*
581  * Semaphore type
582  */
583 
584 PyTypeObject SemLockType = {
585     PyVarObject_HEAD_INIT(NULL, 0)
586     /* tp_name           */ "_multiprocess.SemLock",
587     /* tp_basicsize      */ sizeof(SemLockObject),
588     /* tp_itemsize       */ 0,
589     /* tp_dealloc        */ (destructor)semlock_dealloc,
590     /* tp_print          */ 0,
591     /* tp_getattr        */ 0,
592     /* tp_setattr        */ 0,
593     /* tp_reserved       */ 0,
594     /* tp_repr           */ 0,
595     /* tp_as_number      */ 0,
596     /* tp_as_sequence    */ 0,
597     /* tp_as_mapping     */ 0,
598     /* tp_hash           */ 0,
599     /* tp_call           */ 0,
600     /* tp_str            */ 0,
601     /* tp_getattro       */ 0,
602     /* tp_setattro       */ 0,
603     /* tp_as_buffer      */ 0,
604     /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
605     /* tp_doc            */ "Semaphore/Mutex type",
606     /* tp_traverse       */ 0,
607     /* tp_clear          */ 0,
608     /* tp_richcompare    */ 0,
609     /* tp_weaklistoffset */ 0,
610     /* tp_iter           */ 0,
611     /* tp_iternext       */ 0,
612     /* tp_methods        */ semlock_methods,
613     /* tp_members        */ semlock_members,
614     /* tp_getset         */ 0,
615     /* tp_base           */ 0,
616     /* tp_dict           */ 0,
617     /* tp_descr_get      */ 0,
618     /* tp_descr_set      */ 0,
619     /* tp_dictoffset     */ 0,
620     /* tp_init           */ 0,
621     /* tp_alloc          */ 0,
622     /* tp_new            */ semlock_new,
623 };
624