1 /*
2 * A type which wraps a semaphore
3 *
4 * semaphore.c
5 *
6 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7 */
8
9 #include "processing.h"
10
/* The flavours of blocker a SemLock can represent (stored in SemLock.kind). */
enum {
    RECURSIVE_MUTEX,        /* re-acquirable by the thread that already owns it */
    SEMAPHORE,              /* counting semaphore */
    BOUNDED_SEMAPHORE       /* semaphore whose maximum is its initial value */
};
12
13 typedef struct {
14 PyObject_HEAD
15 SEM_HANDLE handle;
16 long last_tid;
17 int count;
18 int maxvalue;
19 int kind;
20 } SemLock;
21
/* Non-zero when `o` holds at least one acquisition and the calling thread
   is the one recorded in o->last_tid.  The argument is parenthesized so any
   pointer-valued expression (e.g. `&lock`) may be passed; note that it is
   evaluated twice. */
#define ISMINE(o) ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid)
23
24
25 #ifdef MS_WINDOWS
26
27 /*
28 * Windows definitions
29 */
30
31 static SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE };
32
33 #define SEM_FAILED NULL
34
35 #define SEM_CLEAR_ERROR() SetLastError(0)
36 #define SEM_GET_LAST_ERROR() GetLastError()
37 #define SEM_CREATE(name, val, max) CreateSemaphore(&sa, val, max, NULL)
38 #define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
39 #define SEM_GETVALUE(sem, pval) _SemLock_GetSemaphoreValue(sem, pval)
40 #define SEM_UNLINK(name) 0
41
42 static int
_SemLock_GetSemaphoreValue(HANDLE handle,long * value)43 _SemLock_GetSemaphoreValue(HANDLE handle, long *value)
44 {
45 long previous;
46
47 switch (WaitForSingleObject(handle, 0)) {
48 case WAIT_OBJECT_0:
49 if (!ReleaseSemaphore(handle, 1, &previous))
50 return STANDARD_ERROR;
51 *value = previous + 1;
52 return 0;
53 case WAIT_TIMEOUT:
54 *value = 0;
55 return 0;
56 default:
57 return STANDARD_ERROR;
58 }
59 }
60
61 static PyObject *
SemLock_acquire(SemLock * self,PyObject * args,PyObject * kwds)62 SemLock_acquire(SemLock *self, PyObject *args, PyObject *kwds)
63 {
64 int blocking = 1;
65 double timeout;
66 PyObject *timeout_obj = Py_None;
67 DWORD res, dwTimeout;
68
69 static char *kwlist[] = {"block", "timeout", NULL};
70
71 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
72 &blocking, &timeout_obj))
73 return NULL;
74
75 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
76 ++self->count;
77 Py_RETURN_TRUE;
78 }
79
80 /* work out timeout */
81 if (!blocking) {
82 dwTimeout = 0;
83 } else if (timeout_obj == Py_None) {
84 dwTimeout = INFINITE;
85 } else {
86 timeout = PyFloat_AsDouble(timeout_obj);
87 if (PyErr_Occurred())
88 return NULL;
89 timeout *= 1000.0; /* convert to millisecs */
90 if (timeout < 0.0) {
91 timeout = 0.0;
92 } else if (timeout >= ((double)INFINITE - 0.5)) {
93 PyErr_SetString(PyExc_OverflowError, "timeout is too large");
94 return NULL;
95 }
96 dwTimeout = (DWORD)(timeout + 0.5);
97 }
98
99 /* do the wait */
100 if (dwTimeout == 0) {
101 res = WaitForSingleObject(self->handle, 0);
102 } else if (main_thread == GetCurrentThreadId()) {
103 res = WaitForSingleObject(self->handle, 0);
104 if (res == WAIT_TIMEOUT) {
105 HANDLE handles[2] = {self->handle, hInterruptEvent};
106 ResetEvent(hInterruptEvent);
107 Py_BEGIN_ALLOW_THREADS
108 res = WaitForMultipleObjects(2, handles, FALSE, dwTimeout);
109 Py_END_ALLOW_THREADS
110 }
111 } else {
112 Py_BEGIN_ALLOW_THREADS
113 res = WaitForSingleObject(self->handle, dwTimeout);
114 Py_END_ALLOW_THREADS
115 }
116
117 /* handle result */
118 switch (res) {
119 case WAIT_TIMEOUT:
120 Py_RETURN_FALSE;
121 case WAIT_OBJECT_0:
122 self->last_tid = GetCurrentThreadId();
123 ++self->count;
124 Py_RETURN_TRUE;
125 case (WAIT_OBJECT_0 + 1): /* we got SIGINT; do like in timemodule.c */
126 Sleep(1);
127 errno = EINTR;
128 PyErr_SetFromErrno(PyExc_OSError);
129 return NULL;
130 case WAIT_FAILED:
131 return PyErr_SetFromWindowsErr(0);
132 default:
133 PyErr_SetString(PyExc_RuntimeError, "WaitForSingleObject() or "
134 "WaitForMultipleObjects() gave unrecognized value");
135 return NULL;
136 }
137 }
138
139 static PyObject *
SemLock_release(SemLock * self,PyObject * args)140 SemLock_release(SemLock *self, PyObject *args)
141 {
142 if (self->kind == RECURSIVE_MUTEX) {
143 if (!ISMINE(self)) {
144 PyErr_SetString(PyExc_AssertionError, "attempt to release "
145 "recursive lock not owned by thread");
146 return NULL;
147 }
148 if (self->count > 1) {
149 --self->count;
150 Py_RETURN_NONE;
151 }
152 assert(self->count == 1);
153 }
154
155 if (!ReleaseSemaphore(self->handle, 1, NULL)) {
156 if (GetLastError() == ERROR_TOO_MANY_POSTS) {
157 PyErr_SetString(PyExc_ValueError,
158 "semaphore or lock released too many times");
159 return NULL;
160 } else {
161 return PyErr_SetFromWindowsErr(0);
162 }
163 }
164
165 --self->count;
166 Py_RETURN_NONE;
167 }
168
169 #else /* !MS_WINDOWS */
170
171 /*
172 * Unix definitions
173 */
174
/* No separate "last error" state on this platform: clearing is a no-op and
   the query always yields 0. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR()        0

/* Create a new named semaphore; O_EXCL makes an existing name an error.
   `max` is not used here. */
#define SEM_CREATE(name, val, max)  sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem)              sem_close(sem)
#define SEM_GETVALUE(sem, pval)     sem_getvalue(sem, pval)
#define SEM_UNLINK(name)            sem_unlink(name)

/* Where sem_unlink() is unusable, replace it by a stub that reports success */
#if HAVE_BROKEN_SEM_UNLINK
#  define sem_unlink(name)          0
#endif
185
186 #if !HAVE_SEM_TIMEDWAIT
187 # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
188
189 int
sem_timedwait_save(sem_t * sem,struct timespec * deadline,PyThreadState * _save)190 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
191 {
192 int res;
193 unsigned long delay, difference;
194 struct timeval now, tvdeadline, tvdelay;
195
196 errno = 0;
197 tvdeadline.tv_sec = deadline->tv_sec;
198 tvdeadline.tv_usec = deadline->tv_nsec / 1000;
199
200 for (delay = 0 ; ; delay += 1000) {
201 /* poll */
202 if (sem_trywait(sem) == 0)
203 return 0;
204 else if (errno != EAGAIN)
205 return STANDARD_ERROR;
206
207 /* get current time */
208 if (gettimeofday(&now, NULL) < 0)
209 return STANDARD_ERROR;
210
211 /* check for timeout */
212 if (tvdeadline.tv_sec < now.tv_sec ||
213 (tvdeadline.tv_sec == now.tv_sec &&
214 tvdeadline.tv_usec <= now.tv_usec)) {
215 errno = ETIMEDOUT;
216 return STANDARD_ERROR;
217 }
218
219 /* calculate how much time is left */
220 difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
221 (tvdeadline.tv_usec - now.tv_usec);
222
223 /* check delay not too long -- maximum is 20 msecs */
224 if (delay > 20000)
225 delay = 20000;
226 if (delay > difference)
227 delay = difference;
228
229 /* sleep */
230 tvdelay.tv_sec = delay / 1000000;
231 tvdelay.tv_usec = delay % 1000000;
232 if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
233 return STANDARD_ERROR;
234
235 /* check for signals */
236 Py_BLOCK_THREADS
237 res = PyErr_CheckSignals();
238 Py_UNBLOCK_THREADS
239
240 if (res) {
241 errno = EINTR;
242 return EXCEPTION_HAS_BEEN_SET;
243 }
244 }
245 }
246
247 #endif /* !HAVE_SEM_TIMEDWAIT */
248
249 static PyObject *
SemLock_acquire(SemLock * self,PyObject * args,PyObject * kwds)250 SemLock_acquire(SemLock *self, PyObject *args, PyObject *kwds)
251 {
252 int blocking = 1, res;
253 double timeout;
254 PyObject *timeout_obj = Py_None;
255 struct timespec deadline = {0};
256 struct timeval now;
257 long sec, nsec;
258
259 static char *kwlist[] = {"block", "timeout", NULL};
260
261 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
262 &blocking, &timeout_obj))
263 return NULL;
264
265 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
266 ++self->count;
267 Py_RETURN_TRUE;
268 }
269
270 if (timeout_obj != Py_None) {
271 timeout = PyFloat_AsDouble(timeout_obj);
272 if (PyErr_Occurred())
273 return NULL;
274 if (timeout < 0.0)
275 timeout = 0.0;
276
277 if (gettimeofday(&now, NULL) < 0) {
278 PyErr_SetFromErrno(PyExc_OSError);
279 return NULL;
280 }
281 sec = (long) timeout;
282 nsec = (long) (1e9 * (timeout - sec) + 0.5);
283 deadline.tv_sec = now.tv_sec + sec;
284 deadline.tv_nsec = now.tv_usec * 1000 + nsec;
285 deadline.tv_sec += (deadline.tv_nsec / 1000000000);
286 deadline.tv_nsec %= 1000000000;
287 }
288
289 do {
290 Py_BEGIN_ALLOW_THREADS
291 if (blocking && timeout_obj == Py_None)
292 res = sem_wait(self->handle);
293 else if (!blocking)
294 res = sem_trywait(self->handle);
295 else
296 res = sem_timedwait(self->handle, &deadline);
297 Py_END_ALLOW_THREADS
298 if (res == EXCEPTION_HAS_BEEN_SET)
299 break;
300 } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
301
302 if (res < 0) {
303 if (errno == EAGAIN || errno == ETIMEDOUT)
304 Py_RETURN_FALSE;
305 else if (errno == EINTR)
306 return NULL;
307 else
308 return PyErr_SetFromErrno(PyExc_OSError);
309 }
310
311 ++self->count;
312 self->last_tid = PyThread_get_thread_ident();
313
314 Py_RETURN_TRUE;
315 }
316
317 static PyObject *
SemLock_release(SemLock * self,PyObject * args)318 SemLock_release(SemLock *self, PyObject *args)
319 {
320 switch (self->kind) {
321 case RECURSIVE_MUTEX:
322 if (!ISMINE(self)) {
323 PyErr_SetString(PyExc_AssertionError, "attempt to release "
324 "recursive lock not owned by thread");
325 return NULL;
326 }
327 if (self->count > 1) {
328 --self->count;
329 Py_RETURN_NONE;
330 }
331 assert(self->count == 1);
332 break;
333 #if HAVE_BROKEN_SEM_GETVALUE
334 case BOUNDED_SEMAPHORE:
335 /* We will only check properly the Lock case (where maxvalue == 1) */
336 if (self->maxvalue == 1) {
337 /* make sure that already locked */
338 if (sem_trywait(self->handle) < 0) {
339 if (errno != EAGAIN)
340 return PyErr_SetFromErrno(PyExc_OSError);
341 /* it is already locked as expected */
342 } else {
343 /* it was not locked -- so undo wait and raise error */
344 if (sem_post(self->handle) < 0)
345 return PyErr_SetFromErrno(PyExc_OSError);
346 PyErr_SetString(PyExc_ValueError,
347 "semaphore or lock released too many times");
348 return NULL;
349 }
350 }
351 #else
352 case BOUNDED_SEMAPHORE:
353 {
354 int sval;
355
356 /* This check is not an absolute guarantee that the semaphore
357 does not rise above maxvalue. */
358 if (sem_getvalue(self->handle, &sval) < 0) {
359 return PyErr_SetFromErrno(PyExc_OSError);
360 } else if (sval >= self->maxvalue) {
361 PyErr_SetString(PyExc_ValueError,
362 "semaphore or lock released too many times");
363 return NULL;
364 }
365 }
366 #endif
367 }
368
369 if (sem_post(self->handle) < 0)
370 return PyErr_SetFromErrno(PyExc_OSError);
371
372 --self->count;
373 Py_RETURN_NONE;
374 }
375
376 #endif /* !MS_WINDOWS */
377
378 /*
379 * All platforms
380 */
381
382 static PyObject *
_SemLock_create(PyTypeObject * type,SEM_HANDLE handle,int kind,int maxvalue)383 _SemLock_create(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
384 {
385 SemLock *self;
386
387 self = (SemLock*)type->tp_alloc(type, 0);
388 if (!self)
389 return NULL;
390 self->handle = handle;
391 self->kind = kind;
392 self->count = 0;
393 self->last_tid = 0;
394 self->maxvalue = maxvalue;
395 return (PyObject*)self;
396 }
397
398 static PyObject *
SemLock_new(PyTypeObject * type,PyObject * args,PyObject * kwds)399 SemLock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
400 {
401 char buffer[256];
402 SEM_HANDLE handle = SEM_FAILED;
403 int kind, maxvalue, value;
404 PyObject *result;
405 static char *kwlist[] = {"kind", "value", NULL};
406 static int counter = 0;
407
408 if (!PyArg_ParseTupleAndKeywords(args, kwds, "ii", kwlist, &kind, &value))
409 return NULL;
410
411 if (kind < RECURSIVE_MUTEX || kind > BOUNDED_SEMAPHORE) {
412 PyErr_SetString(PyExc_ValueError, "unrecongnized blocker type");
413 return NULL;
414 }
415
416 PyOS_snprintf(buffer, sizeof(buffer), "/pr%d-%d", getpid(), counter++);
417
418 if (kind == BOUNDED_SEMAPHORE)
419 maxvalue = value;
420 else if (kind == RECURSIVE_MUTEX)
421 maxvalue = 1;
422 else
423 maxvalue = INT_MAX;
424
425 SEM_CLEAR_ERROR();
426 handle = SEM_CREATE(buffer, value, maxvalue);
427 /* On Windows we should fail if GetLastError() == ERROR_ALREADY_EXISTS */
428 if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
429 goto failure;
430
431 if (SEM_UNLINK(buffer) < 0)
432 goto failure;
433
434 result = _SemLock_create(type, handle, kind, maxvalue);
435 if (!result)
436 goto failure;
437
438 return result;
439
440 failure:
441 if (handle != SEM_FAILED)
442 SEM_CLOSE(handle);
443 SetException(NULL, STANDARD_ERROR);
444 return NULL;
445 }
446
447 static PyObject *
SemLock_rebuild(PyTypeObject * type,PyObject * args)448 SemLock_rebuild(PyTypeObject *type, PyObject *args)
449 {
450 SEM_HANDLE handle;
451 int kind, maxvalue;
452
453 if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii", &handle, &kind, &maxvalue))
454 return NULL;
455
456 return _SemLock_create(type, handle, kind, maxvalue);
457 }
458
459 static void
SemLock_dealloc(SemLock * self)460 SemLock_dealloc(SemLock* self)
461 {
462 if (self->handle != SEM_FAILED)
463 SEM_CLOSE(self->handle);
464 self->ob_type->tp_free((PyObject*)self);
465 }
466
467 static PyObject *
SemLock_count(SemLock * self)468 SemLock_count(SemLock *self)
469 {
470 return PyInt_FromLong((long)self->count);
471 }
472
473 static PyObject *
SemLock_ismine(SemLock * self)474 SemLock_ismine(SemLock *self)
475 {
476 /* only makes sense for a lock */
477 return PyBool_FromLong(ISMINE(self));
478 }
479
480 static PyObject *
SemLock_getvalue(SemLock * self)481 SemLock_getvalue(SemLock *self)
482 {
483 #if HAVE_BROKEN_SEM_GETVALUE
484 PyErr_SetNone(PyExc_NotImplementedError);
485 return NULL;
486 #else
487 int sval;
488 if (SEM_GETVALUE(self->handle, &sval) < 0)
489 SetException(NULL, STANDARD_ERROR);
490 return PyInt_FromLong((long)sval);
491 #endif
492 }
493
494 static PyObject *
SemLock_afterfork(SemLock * self)495 SemLock_afterfork(SemLock *self)
496 {
497 self->count = 0;
498 Py_RETURN_NONE;
499 }
500
501 /*
502 * Semaphore methods
503 */
504
505 static PyMethodDef SemLock_methods[] = {
506 {"acquire", (PyCFunction)SemLock_acquire, METH_KEYWORDS,
507 "acquire the semaphore/lock"},
508 {"release", (PyCFunction)SemLock_release, METH_NOARGS,
509 "release the semaphore/lock"},
510 {"__enter__", (PyCFunction)SemLock_acquire, METH_KEYWORDS,
511 "enter the semaphore/lock"},
512 {"__exit__", (PyCFunction)SemLock_release, METH_VARARGS,
513 "exit the semaphore/lock"},
514 {"_count", (PyCFunction)SemLock_count, METH_NOARGS,
515 "number of `acquire()`s minus number of `release()`s for this process"},
516 {"_isMine", (PyCFunction)SemLock_ismine, METH_NOARGS,
517 "whether the lock is owned by this thread"},
518 {"_getValue", (PyCFunction)SemLock_getvalue, METH_NOARGS,
519 "get the value of the semaphore"},
520 {"_rebuild", (PyCFunction)SemLock_rebuild, METH_VARARGS | METH_CLASS,
521 ""},
522 {"_afterFork", (PyCFunction)SemLock_afterfork, METH_NOARGS,
523 "rezero the net acquisition count after fork()"},
524 {NULL}
525 };
526
527 /*
528 * Member table
529 */
530
531 static PyMemberDef SemLock_members[] = {
532 {"handle", T_SEM_HANDLE, offsetof(SemLock, handle), READONLY, ""},
533 {"kind", T_INT, offsetof(SemLock, kind), READONLY, ""},
534 {"maxvalue", T_INT, offsetof(SemLock, maxvalue), READONLY, ""},
535 {NULL}
536 };
537
538 /*
539 * Semaphore type
540 */
541
542 PyTypeObject SemLockType = {
543 PyObject_HEAD_INIT(NULL)
544 0, /* ob_size */
545 "_processing.SemLock", /* tp_name */
546 sizeof(SemLock), /* tp_basicsize */
547 0, /* tp_itemsize */
548 (destructor)SemLock_dealloc,
549 /* tp_dealloc */
550 0, /* tp_print */
551 0, /* tp_getattr */
552 0, /* tp_setattr */
553 0, /* tp_compare */
554 0, /* tp_repr */
555 0, /* tp_as_number */
556 0, /* tp_as_sequence */
557 0, /* tp_as_mapping */
558 0, /* tp_hash */
559 0, /* tp_call */
560 0, /* tp_str */
561 0, /* tp_getattro */
562 0, /* tp_setattro */
563 0, /* tp_as_buffer */
564 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
565 /* tp_flags */
566 "Semaphore/Mutex type", /* tp_doc */
567 0, /* tp_traverse */
568 0, /* tp_clear */
569 0, /* tp_richcompare */
570 0, /* tp_weaklistoffset */
571 0, /* tp_iter */
572 0, /* tp_iternext */
573 SemLock_methods, /* tp_methods */
574 SemLock_members, /* tp_members */
575 0, /* tp_getset */
576 0, /* tp_base */
577 0, /* tp_dict */
578 0, /* tp_descr_get */
579 0, /* tp_descr_set */
580 0, /* tp_dictoffset */
581 0, /* tp_init */
582 0, /* tp_alloc */
583 (newfunc)SemLock_new, /* tp_new */
584 };
585