1 /*
2  * A type which wraps a semaphore
3  *
4  * semaphore.c
5  *
6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7  */
8 
9 #include "processing.h"
10 
/*
 * The kinds of blocker a SemLock can be.  The numeric value is what
 * SemLock_new() receives as its integer `kind` argument, so the order
 * of the enumerators must not change.
 */
enum {
    RECURSIVE_MUTEX = 0,
    SEMAPHORE = 1,
    BOUNDED_SEMAPHORE = 2
};
12 
13 typedef struct {
14     PyObject_HEAD
15     SEM_HANDLE handle;
16     long last_tid;
17     int count;
18     int maxvalue;
19     int kind;
20 } SemLock;
21 
/*
 * True when `o` has a positive acquisition count and the calling thread
 * is the one recorded in `o->last_tid`.
 *
 * The argument is parenthesised so that any pointer expression (for
 * example `&lock`) can be passed.  It is evaluated twice, so it must not
 * have side effects.
 */
#define ISMINE(o) \
    ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid)
23 
24 
25 #ifdef MS_WINDOWS
26 
27 /*
28  * Windows definitions
29  */
30 
31 static SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE };
32 
33 #define SEM_FAILED NULL
34 
35 #define SEM_CLEAR_ERROR() SetLastError(0)
36 #define SEM_GET_LAST_ERROR() GetLastError()
37 #define SEM_CREATE(name, val, max) CreateSemaphore(&sa, val, max, NULL)
38 #define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
39 #define SEM_GETVALUE(sem, pval) _SemLock_GetSemaphoreValue(sem, pval)
40 #define SEM_UNLINK(name) 0
41 
42 static int
_SemLock_GetSemaphoreValue(HANDLE handle,long * value)43 _SemLock_GetSemaphoreValue(HANDLE handle, long *value)
44 {
45     long previous;
46 
47     switch (WaitForSingleObject(handle, 0)) {
48     case WAIT_OBJECT_0:
49         if (!ReleaseSemaphore(handle, 1, &previous))
50             return STANDARD_ERROR;
51         *value = previous + 1;
52         return 0;
53     case WAIT_TIMEOUT:
54         *value = 0;
55         return 0;
56     default:
57         return STANDARD_ERROR;
58     }
59 }
60 
61 static PyObject *
SemLock_acquire(SemLock * self,PyObject * args,PyObject * kwds)62 SemLock_acquire(SemLock *self, PyObject *args, PyObject *kwds)
63 {
64     int blocking = 1;
65     double timeout;
66     PyObject *timeout_obj = Py_None;
67     DWORD res, dwTimeout;
68 
69     static char *kwlist[] = {"block", "timeout", NULL};
70 
71     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
72                                      &blocking, &timeout_obj))
73         return NULL;
74 
75     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
76         ++self->count;
77         Py_RETURN_TRUE;
78     }
79 
80     /* work out timeout */
81     if (!blocking) {
82         dwTimeout = 0;
83     } else if (timeout_obj == Py_None) {
84         dwTimeout = INFINITE;
85     } else {
86         timeout = PyFloat_AsDouble(timeout_obj);
87         if (PyErr_Occurred())
88             return NULL;
89         timeout *= 1000.0;      /* convert to millisecs */
90         if (timeout < 0.0) {
91             timeout = 0.0;
92         } else if (timeout >= ((double)INFINITE - 0.5)) {
93             PyErr_SetString(PyExc_OverflowError, "timeout is too large");
94             return NULL;
95         }
96         dwTimeout = (DWORD)(timeout + 0.5);
97     }
98 
99     /* do the wait */
100     if (dwTimeout == 0) {
101         res = WaitForSingleObject(self->handle, 0);
102     } else if (main_thread == GetCurrentThreadId()) {
103         res = WaitForSingleObject(self->handle, 0);
104         if (res == WAIT_TIMEOUT) {
105             HANDLE handles[2] = {self->handle, hInterruptEvent};
106             ResetEvent(hInterruptEvent);
107             Py_BEGIN_ALLOW_THREADS
108             res = WaitForMultipleObjects(2, handles, FALSE, dwTimeout);
109             Py_END_ALLOW_THREADS
110         }
111     } else {
112         Py_BEGIN_ALLOW_THREADS
113         res = WaitForSingleObject(self->handle, dwTimeout);
114         Py_END_ALLOW_THREADS
115     }
116 
117     /* handle result */
118     switch (res) {
119     case WAIT_TIMEOUT:
120         Py_RETURN_FALSE;
121     case WAIT_OBJECT_0:
122         self->last_tid = GetCurrentThreadId();
123         ++self->count;
124         Py_RETURN_TRUE;
125     case (WAIT_OBJECT_0 + 1):  /* we got SIGINT; do like in timemodule.c */
126         Sleep(1);
127         errno = EINTR;
128         PyErr_SetFromErrno(PyExc_OSError);
129         return NULL;
130     case WAIT_FAILED:
131         return PyErr_SetFromWindowsErr(0);
132     default:
133         PyErr_SetString(PyExc_RuntimeError, "WaitForSingleObject() or "
134                         "WaitForMultipleObjects() gave unrecognized value");
135         return NULL;
136     }
137 }
138 
139 static PyObject *
SemLock_release(SemLock * self,PyObject * args)140 SemLock_release(SemLock *self, PyObject *args)
141 {
142     if (self->kind == RECURSIVE_MUTEX) {
143         if (!ISMINE(self)) {
144             PyErr_SetString(PyExc_AssertionError, "attempt to release "
145                             "recursive lock not owned by thread");
146             return NULL;
147         }
148         if (self->count > 1) {
149             --self->count;
150             Py_RETURN_NONE;
151         }
152         assert(self->count == 1);
153     }
154 
155     if (!ReleaseSemaphore(self->handle, 1, NULL)) {
156         if (GetLastError() == ERROR_TOO_MANY_POSTS) {
157             PyErr_SetString(PyExc_ValueError,
158                             "semaphore or lock released too many times");
159             return NULL;
160         } else {
161             return PyErr_SetFromWindowsErr(0);
162         }
163     }
164 
165     --self->count;
166     Py_RETURN_NONE;
167 }
168 
169 #else /* !MS_WINDOWS */
170 
171 /*
172  * Unix definitions
173  */
174 
/* No separate "last error" state is tracked on this platform, so these
   two expand to nothing and to a constant 0 respectively. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR()        0

/* Platform wrappers used by the code shared by all platforms.
   `max` is not used by SEM_CREATE here. */
#define SEM_CREATE(name, val, max)  sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem)              sem_close(sem)
#define SEM_GETVALUE(sem, pval)     sem_getvalue(sem, pval)
#define SEM_UNLINK(name)            sem_unlink(name)

/* Where sem_unlink() is flagged as broken, turn it into a no-op that
   reports success. */
#if HAVE_BROKEN_SEM_UNLINK
#  define sem_unlink(name) 0
#endif
185 
186 #if !HAVE_SEM_TIMEDWAIT
187 #  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
188 
189 int
sem_timedwait_save(sem_t * sem,struct timespec * deadline,PyThreadState * _save)190 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
191 {
192     int res;
193     unsigned long delay, difference;
194     struct timeval now, tvdeadline, tvdelay;
195 
196     errno = 0;
197     tvdeadline.tv_sec = deadline->tv_sec;
198     tvdeadline.tv_usec = deadline->tv_nsec / 1000;
199 
200     for (delay = 0 ; ; delay += 1000) {
201         /* poll */
202         if (sem_trywait(sem) == 0)
203             return 0;
204         else if (errno != EAGAIN)
205             return STANDARD_ERROR;
206 
207         /* get current time */
208         if (gettimeofday(&now, NULL) < 0)
209             return STANDARD_ERROR;
210 
211         /* check for timeout */
212         if (tvdeadline.tv_sec < now.tv_sec ||
213             (tvdeadline.tv_sec == now.tv_sec &&
214              tvdeadline.tv_usec <= now.tv_usec)) {
215             errno = ETIMEDOUT;
216             return STANDARD_ERROR;
217         }
218 
219         /* calculate how much time is left */
220         difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
221             (tvdeadline.tv_usec - now.tv_usec);
222 
223         /* check delay not too long -- maximum is 20 msecs */
224         if (delay > 20000)
225             delay = 20000;
226         if (delay > difference)
227             delay = difference;
228 
229         /* sleep */
230         tvdelay.tv_sec = delay / 1000000;
231         tvdelay.tv_usec = delay % 1000000;
232         if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
233             return STANDARD_ERROR;
234 
235         /* check for signals */
236         Py_BLOCK_THREADS
237         res = PyErr_CheckSignals();
238         Py_UNBLOCK_THREADS
239 
240         if (res) {
241             errno = EINTR;
242             return EXCEPTION_HAS_BEEN_SET;
243         }
244     }
245 }
246 
247 #endif /* !HAVE_SEM_TIMEDWAIT */
248 
249 static PyObject *
SemLock_acquire(SemLock * self,PyObject * args,PyObject * kwds)250 SemLock_acquire(SemLock *self, PyObject *args, PyObject *kwds)
251 {
252     int blocking = 1, res;
253     double timeout;
254     PyObject *timeout_obj = Py_None;
255     struct timespec deadline = {0};
256     struct timeval now;
257     long sec, nsec;
258 
259     static char *kwlist[] = {"block", "timeout", NULL};
260 
261     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
262                                      &blocking, &timeout_obj))
263         return NULL;
264 
265     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
266         ++self->count;
267         Py_RETURN_TRUE;
268     }
269 
270     if (timeout_obj != Py_None) {
271         timeout = PyFloat_AsDouble(timeout_obj);
272         if (PyErr_Occurred())
273             return NULL;
274         if (timeout < 0.0)
275             timeout = 0.0;
276 
277         if (gettimeofday(&now, NULL) < 0) {
278             PyErr_SetFromErrno(PyExc_OSError);
279             return NULL;
280         }
281         sec = (long) timeout;
282         nsec = (long) (1e9 * (timeout - sec) + 0.5);
283         deadline.tv_sec = now.tv_sec + sec;
284         deadline.tv_nsec = now.tv_usec * 1000 + nsec;
285         deadline.tv_sec += (deadline.tv_nsec / 1000000000);
286         deadline.tv_nsec %= 1000000000;
287     }
288 
289     do {
290         Py_BEGIN_ALLOW_THREADS
291         if (blocking && timeout_obj == Py_None)
292             res = sem_wait(self->handle);
293         else if (!blocking)
294             res = sem_trywait(self->handle);
295         else
296             res = sem_timedwait(self->handle, &deadline);
297         Py_END_ALLOW_THREADS
298         if (res == EXCEPTION_HAS_BEEN_SET)
299             break;
300     } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
301 
302     if (res < 0) {
303         if (errno == EAGAIN || errno == ETIMEDOUT)
304             Py_RETURN_FALSE;
305         else if (errno == EINTR)
306             return NULL;
307         else
308             return PyErr_SetFromErrno(PyExc_OSError);
309     }
310 
311     ++self->count;
312     self->last_tid = PyThread_get_thread_ident();
313 
314     Py_RETURN_TRUE;
315 }
316 
317 static PyObject *
SemLock_release(SemLock * self,PyObject * args)318 SemLock_release(SemLock *self, PyObject *args)
319 {
320     switch (self->kind) {
321     case RECURSIVE_MUTEX:
322         if (!ISMINE(self)) {
323             PyErr_SetString(PyExc_AssertionError, "attempt to release "
324                             "recursive lock not owned by thread");
325             return NULL;
326         }
327         if (self->count > 1) {
328             --self->count;
329             Py_RETURN_NONE;
330         }
331         assert(self->count == 1);
332         break;
333 #if HAVE_BROKEN_SEM_GETVALUE
334     case BOUNDED_SEMAPHORE:
335         /* We will only check properly the Lock case (where maxvalue == 1) */
336         if (self->maxvalue == 1) {
337             /* make sure that already locked */
338             if (sem_trywait(self->handle) < 0) {
339                 if (errno != EAGAIN)
340                     return PyErr_SetFromErrno(PyExc_OSError);
341                 /* it is already locked as expected */
342             } else {
343                 /* it was not locked -- so undo wait and raise error */
344                 if (sem_post(self->handle) < 0)
345                     return PyErr_SetFromErrno(PyExc_OSError);
346                 PyErr_SetString(PyExc_ValueError,
347                                 "semaphore or lock released too many times");
348                 return NULL;
349             }
350         }
351 #else
352     case BOUNDED_SEMAPHORE:
353         {
354             int sval;
355 
356             /* This check is not an absolute guarantee that the semaphore
357                does not rise above maxvalue. */
358             if (sem_getvalue(self->handle, &sval) < 0) {
359                 return PyErr_SetFromErrno(PyExc_OSError);
360             } else if (sval >= self->maxvalue) {
361                 PyErr_SetString(PyExc_ValueError,
362                                 "semaphore or lock released too many times");
363                 return NULL;
364             }
365         }
366 #endif
367     }
368 
369     if (sem_post(self->handle) < 0)
370         return PyErr_SetFromErrno(PyExc_OSError);
371 
372     --self->count;
373     Py_RETURN_NONE;
374 }
375 
376 #endif /* !MS_WINDOWS */
377 
378 /*
379  * All platforms
380  */
381 
382 static PyObject *
_SemLock_create(PyTypeObject * type,SEM_HANDLE handle,int kind,int maxvalue)383 _SemLock_create(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
384 {
385     SemLock *self;
386 
387     self = (SemLock*)type->tp_alloc(type, 0);
388     if (!self)
389         return NULL;
390     self->handle = handle;
391     self->kind = kind;
392     self->count = 0;
393     self->last_tid = 0;
394     self->maxvalue = maxvalue;
395     return (PyObject*)self;
396 }
397 
398 static PyObject *
SemLock_new(PyTypeObject * type,PyObject * args,PyObject * kwds)399 SemLock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
400 {
401     char buffer[256];
402     SEM_HANDLE handle = SEM_FAILED;
403     int kind, maxvalue, value;
404     PyObject *result;
405     static char *kwlist[] = {"kind", "value", NULL};
406     static int counter = 0;
407 
408     if (!PyArg_ParseTupleAndKeywords(args, kwds, "ii", kwlist, &kind, &value))
409         return NULL;
410 
411     if (kind < RECURSIVE_MUTEX || kind > BOUNDED_SEMAPHORE) {
412         PyErr_SetString(PyExc_ValueError, "unrecongnized blocker type");
413         return NULL;
414     }
415 
416     PyOS_snprintf(buffer, sizeof(buffer), "/pr%d-%d", getpid(), counter++);
417 
418     if (kind == BOUNDED_SEMAPHORE)
419         maxvalue = value;
420     else if (kind == RECURSIVE_MUTEX)
421         maxvalue = 1;
422     else
423         maxvalue = INT_MAX;
424 
425     SEM_CLEAR_ERROR();
426     handle = SEM_CREATE(buffer, value, maxvalue);
427     /* On Windows we should fail if GetLastError() == ERROR_ALREADY_EXISTS */
428     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
429         goto failure;
430 
431     if (SEM_UNLINK(buffer) < 0)
432         goto failure;
433 
434     result = _SemLock_create(type, handle, kind, maxvalue);
435     if (!result)
436         goto failure;
437 
438     return result;
439 
440  failure:
441     if (handle != SEM_FAILED)
442         SEM_CLOSE(handle);
443     SetException(NULL, STANDARD_ERROR);
444     return NULL;
445 }
446 
447 static PyObject *
SemLock_rebuild(PyTypeObject * type,PyObject * args)448 SemLock_rebuild(PyTypeObject *type, PyObject *args)
449 {
450     SEM_HANDLE handle;
451     int kind, maxvalue;
452 
453     if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii", &handle, &kind, &maxvalue))
454         return NULL;
455 
456     return _SemLock_create(type, handle, kind, maxvalue);
457 }
458 
459 static void
SemLock_dealloc(SemLock * self)460 SemLock_dealloc(SemLock* self)
461 {
462     if (self->handle != SEM_FAILED)
463         SEM_CLOSE(self->handle);
464     self->ob_type->tp_free((PyObject*)self);
465 }
466 
467 static PyObject *
SemLock_count(SemLock * self)468 SemLock_count(SemLock *self)
469 {
470     return PyInt_FromLong((long)self->count);
471 }
472 
473 static PyObject *
SemLock_ismine(SemLock * self)474 SemLock_ismine(SemLock *self)
475 {
476     /* only makes sense for a lock */
477     return PyBool_FromLong(ISMINE(self));
478 }
479 
480 static PyObject *
SemLock_getvalue(SemLock * self)481 SemLock_getvalue(SemLock *self)
482 {
483 #if HAVE_BROKEN_SEM_GETVALUE
484     PyErr_SetNone(PyExc_NotImplementedError);
485     return NULL;
486 #else
487     int sval;
488     if (SEM_GETVALUE(self->handle, &sval) < 0)
489         SetException(NULL, STANDARD_ERROR);
490     return PyInt_FromLong((long)sval);
491 #endif
492 }
493 
494 static PyObject *
SemLock_afterfork(SemLock * self)495 SemLock_afterfork(SemLock *self)
496 {
497     self->count = 0;
498     Py_RETURN_NONE;
499 }
500 
501 /*
502  * Semaphore methods
503  */
504 
505 static PyMethodDef SemLock_methods[] = {
506     {"acquire", (PyCFunction)SemLock_acquire, METH_KEYWORDS,
507      "acquire the semaphore/lock"},
508     {"release", (PyCFunction)SemLock_release, METH_NOARGS,
509      "release the semaphore/lock"},
510     {"__enter__", (PyCFunction)SemLock_acquire, METH_KEYWORDS,
511      "enter the semaphore/lock"},
512     {"__exit__", (PyCFunction)SemLock_release, METH_VARARGS,
513      "exit the semaphore/lock"},
514     {"_count", (PyCFunction)SemLock_count, METH_NOARGS,
515      "number of `acquire()`s minus number of `release()`s for this process"},
516     {"_isMine", (PyCFunction)SemLock_ismine, METH_NOARGS,
517      "whether the lock is owned by this thread"},
518     {"_getValue", (PyCFunction)SemLock_getvalue, METH_NOARGS,
519      "get the value of the semaphore"},
520     {"_rebuild", (PyCFunction)SemLock_rebuild, METH_VARARGS | METH_CLASS,
521      ""},
522     {"_afterFork", (PyCFunction)SemLock_afterfork, METH_NOARGS,
523      "rezero the net acquisition count after fork()"},
524     {NULL}
525 };
526 
527 /*
528  * Member table
529  */
530 
531 static PyMemberDef SemLock_members[] = {
532     {"handle", T_SEM_HANDLE, offsetof(SemLock, handle), READONLY, ""},
533     {"kind", T_INT, offsetof(SemLock, kind), READONLY, ""},
534     {"maxvalue", T_INT, offsetof(SemLock, maxvalue), READONLY, ""},
535     {NULL}
536 };
537 
538 /*
539  * Semaphore type
540  */
541 
542 PyTypeObject SemLockType = {
543     PyObject_HEAD_INIT(NULL)
544     0,                         /* ob_size */
545     "_processing.SemLock",     /* tp_name */
546     sizeof(SemLock),           /* tp_basicsize */
547     0,                         /* tp_itemsize */
548     (destructor)SemLock_dealloc,
549                                /* tp_dealloc */
550     0,                         /* tp_print */
551     0,                         /* tp_getattr */
552     0,                         /* tp_setattr */
553     0,                         /* tp_compare */
554     0,                         /* tp_repr */
555     0,                         /* tp_as_number */
556     0,                         /* tp_as_sequence */
557     0,                         /* tp_as_mapping */
558     0,                         /* tp_hash */
559     0,                         /* tp_call */
560     0,                         /* tp_str */
561     0,                         /* tp_getattro */
562     0,                         /* tp_setattro */
563     0,                         /* tp_as_buffer */
564     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
565                                /* tp_flags */
566     "Semaphore/Mutex type",    /* tp_doc */
567     0,                         /* tp_traverse */
568     0,                         /* tp_clear */
569     0,                         /* tp_richcompare */
570     0,                         /* tp_weaklistoffset */
571     0,                         /* tp_iter */
572     0,                         /* tp_iternext */
573     SemLock_methods,           /* tp_methods */
574     SemLock_members,           /* tp_members */
575     0,                         /* tp_getset */
576     0,                         /* tp_base */
577     0,                         /* tp_dict */
578     0,                         /* tp_descr_get */
579     0,                         /* tp_descr_set */
580     0,                         /* tp_dictoffset */
581     0,                         /* tp_init */
582     0,                         /* tp_alloc */
583     (newfunc)SemLock_new,      /* tp_new */
584 };
585