1 /*
2 * A type which wraps a semaphore
3 *
4 * semaphore.c
5 *
6 * Copyright (c) 2006-2008, R Oudkerk
7 * Licensed to PSF under a Contributor Agreement.
8 */
9
10 #include "multiprocess.h"
11
12 enum { RECURSIVE_MUTEX, SEMAPHORE };
13
14 typedef struct {
15 PyObject_HEAD
16 SEM_HANDLE handle;
17 long last_tid;
18 int count;
19 int maxvalue;
20 int kind;
21 } SemLockObject;
22
23 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
24
25
26 #ifdef MS_WINDOWS
27
28 /*
29 * Windows definitions
30 */
31
/* Value SEM_CREATE yields on failure. */
#define SEM_FAILED NULL

/* Error bookkeeping goes through the thread's last-error code. */
#define SEM_CLEAR_ERROR() SetLastError(0)
#define SEM_GET_LAST_ERROR() GetLastError()

/* Handle lifecycle.  The name argument is ignored: the semaphore is created
   without a name, so SEM_UNLINK has nothing to do and always reports 0. */
#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
#define SEM_UNLINK(name) 0
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)

/* Current value, obtained through the _GetSemaphoreValue() helper. */
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
40
41 static int
_GetSemaphoreValue(HANDLE handle,long * value)42 _GetSemaphoreValue(HANDLE handle, long *value)
43 {
44 long previous;
45
46 switch (WaitForSingleObject(handle, 0)) {
47 case WAIT_OBJECT_0:
48 if (!ReleaseSemaphore(handle, 1, &previous))
49 return MP_STANDARD_ERROR;
50 *value = previous + 1;
51 return 0;
52 case WAIT_TIMEOUT:
53 *value = 0;
54 return 0;
55 default:
56 return MP_STANDARD_ERROR;
57 }
58 }
59
60 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)61 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
62 {
63 int blocking = 1;
64 double timeout;
65 PyObject *timeout_obj = Py_None;
66 DWORD res, full_msecs, nhandles;
67 HANDLE handles[2], sigint_event;
68
69 static char *kwlist[] = {"block", "timeout", NULL};
70
71 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
72 &blocking, &timeout_obj))
73 return NULL;
74
75 /* calculate timeout */
76 if (!blocking) {
77 full_msecs = 0;
78 } else if (timeout_obj == Py_None) {
79 full_msecs = INFINITE;
80 } else {
81 timeout = PyFloat_AsDouble(timeout_obj);
82 if (PyErr_Occurred())
83 return NULL;
84 timeout *= 1000.0; /* convert to millisecs */
85 if (timeout < 0.0) {
86 timeout = 0.0;
87 } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
88 PyErr_SetString(PyExc_OverflowError,
89 "timeout is too large");
90 return NULL;
91 }
92 full_msecs = (DWORD)(timeout + 0.5);
93 }
94
95 /* check whether we already own the lock */
96 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
97 ++self->count;
98 Py_RETURN_TRUE;
99 }
100
101 /* check whether we can acquire without releasing the GIL and blocking */
102 if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
103 self->last_tid = GetCurrentThreadId();
104 ++self->count;
105 Py_RETURN_TRUE;
106 }
107
108 /* prepare list of handles */
109 nhandles = 0;
110 handles[nhandles++] = self->handle;
111 if (_PyOS_IsMainThread()) {
112 sigint_event = _PyOS_SigintEvent();
113 assert(sigint_event != NULL);
114 handles[nhandles++] = sigint_event;
115 }
116
117 /* do the wait */
118 Py_BEGIN_ALLOW_THREADS
119 if (sigint_event != NULL)
120 ResetEvent(sigint_event);
121 res = WaitForMultipleObjects(nhandles, handles, FALSE, full_msecs);
122 Py_END_ALLOW_THREADS
123
124 /* handle result */
125 switch (res) {
126 case WAIT_TIMEOUT:
127 Py_RETURN_FALSE;
128 case WAIT_OBJECT_0 + 0:
129 self->last_tid = GetCurrentThreadId();
130 ++self->count;
131 Py_RETURN_TRUE;
132 case WAIT_OBJECT_0 + 1:
133 errno = EINTR;
134 return PyErr_SetFromErrno(PyExc_IOError);
135 case WAIT_FAILED:
136 return PyErr_SetFromWindowsErr(0);
137 default:
138 PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
139 "WaitForMultipleObjects() gave unrecognized "
140 "value %d", res);
141 return NULL;
142 }
143 }
144
145 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)146 semlock_release(SemLockObject *self, PyObject *args)
147 {
148 if (self->kind == RECURSIVE_MUTEX) {
149 if (!ISMINE(self)) {
150 PyErr_SetString(PyExc_AssertionError, "attempt to "
151 "release recursive lock not owned "
152 "by thread");
153 return NULL;
154 }
155 if (self->count > 1) {
156 --self->count;
157 Py_RETURN_NONE;
158 }
159 assert(self->count == 1);
160 }
161
162 if (!ReleaseSemaphore(self->handle, 1, NULL)) {
163 if (GetLastError() == ERROR_TOO_MANY_POSTS) {
164 PyErr_SetString(PyExc_ValueError, "semaphore or lock "
165 "released too many times");
166 return NULL;
167 } else {
168 return PyErr_SetFromWindowsErr(0);
169 }
170 }
171
172 --self->count;
173 Py_RETURN_NONE;
174 }
175
176 #else /* !MS_WINDOWS */
177
178 /*
179 * Unix definitions
180 */
181
/* There is no separate last-error state to manage on this platform. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR() 0

/* Handle lifecycle and inspection map directly onto the POSIX calls.  The
   `max` argument of SEM_CREATE is not used here. */
#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_UNLINK(name) sem_unlink(name)
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)

/* OS X 10.4 defines SEM_FAILED as -1 instead of (sem_t *)-1; this gives
   compiler warnings, and (potentially) undefined behaviour. */
#ifdef __APPLE__
#  undef SEM_FAILED
#  define SEM_FAILED ((sem_t *)-1)
#endif

/* Without sem_unlink(), unlinking is a no-op that reports success. */
#ifndef HAVE_SEM_UNLINK
#  define sem_unlink(name) 0
#endif

/* The HAVE_SEM_TIMEDWAIT guard is commented out, so every sem_timedwait()
   call in this file is redirected to sem_timedwait_save().  The macro expects
   a `_save` variable in scope at the call site (presumably the one declared
   by Py_BEGIN_ALLOW_THREADS -- confirm). */
//ifndef HAVE_SEM_TIMEDWAIT
#  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
202
203 int
sem_timedwait_save(sem_t * sem,struct timespec * deadline,PyThreadState * _save)204 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
205 {
206 int res;
207 unsigned long delay, difference;
208 struct timeval now, tvdeadline, tvdelay;
209
210 errno = 0;
211 tvdeadline.tv_sec = deadline->tv_sec;
212 tvdeadline.tv_usec = deadline->tv_nsec / 1000;
213
214 for (delay = 0 ; ; delay += 1000) {
215 /* poll */
216 if (sem_trywait(sem) == 0)
217 return 0;
218 else if (errno != EAGAIN)
219 return MP_STANDARD_ERROR;
220
221 /* get current time */
222 if (gettimeofday(&now, NULL) < 0)
223 return MP_STANDARD_ERROR;
224
225 /* check for timeout */
226 if (tvdeadline.tv_sec < now.tv_sec ||
227 (tvdeadline.tv_sec == now.tv_sec &&
228 tvdeadline.tv_usec <= now.tv_usec)) {
229 errno = ETIMEDOUT;
230 return MP_STANDARD_ERROR;
231 }
232
233 /* calculate how much time is left */
234 difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
235 (tvdeadline.tv_usec - now.tv_usec);
236
237 /* check delay not too long -- maximum is 20 msecs */
238 if (delay > 20000)
239 delay = 20000;
240 if (delay > difference)
241 delay = difference;
242
243 /* sleep */
244 tvdelay.tv_sec = delay / 1000000;
245 tvdelay.tv_usec = delay % 1000000;
246 if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
247 return MP_STANDARD_ERROR;
248
249 /* check for signals */
250 Py_BLOCK_THREADS
251 res = PyErr_CheckSignals();
252 Py_UNBLOCK_THREADS
253
254 if (res) {
255 errno = EINTR;
256 return MP_EXCEPTION_HAS_BEEN_SET;
257 }
258 }
259 }
260
261 //endif /* !HAVE_SEM_TIMEDWAIT */
262
263 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)264 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
265 {
266 int blocking = 1, res, err = 0;
267 double timeout;
268 PyObject *timeout_obj = Py_None;
269 struct timespec deadline = {0};
270 struct timeval now;
271 long sec, nsec;
272
273 static char *kwlist[] = {"block", "timeout", NULL};
274
275 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
276 &blocking, &timeout_obj))
277 return NULL;
278
279 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
280 ++self->count;
281 Py_RETURN_TRUE;
282 }
283
284 if (timeout_obj != Py_None) {
285 timeout = PyFloat_AsDouble(timeout_obj);
286 if (PyErr_Occurred())
287 return NULL;
288 if (timeout < 0.0)
289 timeout = 0.0;
290
291 if (gettimeofday(&now, NULL) < 0) {
292 PyErr_SetFromErrno(PyExc_OSError);
293 return NULL;
294 }
295 sec = (long) timeout;
296 nsec = (long) (1e9 * (timeout - sec) + 0.5);
297 deadline.tv_sec = now.tv_sec + sec;
298 deadline.tv_nsec = now.tv_usec * 1000 + nsec;
299 deadline.tv_sec += (deadline.tv_nsec / 1000000000);
300 deadline.tv_nsec %= 1000000000;
301 }
302
303 do {
304 Py_BEGIN_ALLOW_THREADS
305 if (blocking && timeout_obj == Py_None)
306 res = sem_wait(self->handle);
307 else if (!blocking)
308 res = sem_trywait(self->handle);
309 else
310 res = sem_timedwait(self->handle, &deadline);
311 Py_END_ALLOW_THREADS
312 err = errno;
313 if (res == MP_EXCEPTION_HAS_BEEN_SET)
314 break;
315 } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
316
317 if (res < 0) {
318 errno = err;
319 if (errno == EAGAIN || errno == ETIMEDOUT)
320 Py_RETURN_FALSE;
321 else if (errno == EINTR)
322 return NULL;
323 else
324 return PyErr_SetFromErrno(PyExc_OSError);
325 }
326
327 ++self->count;
328 self->last_tid = PyThread_get_thread_ident();
329
330 Py_RETURN_TRUE;
331 }
332
333 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)334 semlock_release(SemLockObject *self, PyObject *args)
335 {
336 if (self->kind == RECURSIVE_MUTEX) {
337 if (!ISMINE(self)) {
338 PyErr_SetString(PyExc_AssertionError, "attempt to "
339 "release recursive lock not owned "
340 "by thread");
341 return NULL;
342 }
343 if (self->count > 1) {
344 --self->count;
345 Py_RETURN_NONE;
346 }
347 assert(self->count == 1);
348 } else {
349 #ifdef HAVE_BROKEN_SEM_GETVALUE
350 /* We will only check properly the maxvalue == 1 case */
351 if (self->maxvalue == 1) {
352 /* make sure that already locked */
353 if (sem_trywait(self->handle) < 0) {
354 if (errno != EAGAIN) {
355 PyErr_SetFromErrno(PyExc_OSError);
356 return NULL;
357 }
358 /* it is already locked as expected */
359 } else {
360 /* it was not locked so undo wait and raise */
361 if (sem_post(self->handle) < 0) {
362 PyErr_SetFromErrno(PyExc_OSError);
363 return NULL;
364 }
365 PyErr_SetString(PyExc_ValueError, "semaphore "
366 "or lock released too many "
367 "times");
368 return NULL;
369 }
370 }
371 #else
372 int sval;
373
374 /* This check is not an absolute guarantee that the semaphore
375 does not rise above maxvalue. */
376 if (sem_getvalue(self->handle, &sval) < 0) {
377 return PyErr_SetFromErrno(PyExc_OSError);
378 } else if (sval >= self->maxvalue) {
379 PyErr_SetString(PyExc_ValueError, "semaphore or lock "
380 "released too many times");
381 return NULL;
382 }
383 #endif
384 }
385
386 if (sem_post(self->handle) < 0)
387 return PyErr_SetFromErrno(PyExc_OSError);
388
389 --self->count;
390 Py_RETURN_NONE;
391 }
392
393 #endif /* !MS_WINDOWS */
394
395 /*
396 * All platforms
397 */
398
399 static PyObject *
newsemlockobject(PyTypeObject * type,SEM_HANDLE handle,int kind,int maxvalue)400 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
401 {
402 SemLockObject *self;
403
404 self = PyObject_New(SemLockObject, type);
405 if (!self)
406 return NULL;
407 self->handle = handle;
408 self->kind = kind;
409 self->count = 0;
410 self->last_tid = 0;
411 self->maxvalue = maxvalue;
412 return (PyObject*)self;
413 }
414
415 static PyObject *
semlock_new(PyTypeObject * type,PyObject * args,PyObject * kwds)416 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
417 {
418 char buffer[256];
419 SEM_HANDLE handle = SEM_FAILED;
420 int kind, maxvalue, value;
421 PyObject *result;
422 static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
423 static int counter = 0;
424
425 if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
426 &kind, &value, &maxvalue))
427 return NULL;
428
429 if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
430 PyErr_SetString(PyExc_ValueError, "unrecognized kind");
431 return NULL;
432 }
433
434 PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
435
436 SEM_CLEAR_ERROR();
437 handle = SEM_CREATE(buffer, value, maxvalue);
438 /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
439 if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
440 goto failure;
441
442 if (SEM_UNLINK(buffer) < 0)
443 goto failure;
444
445 result = newsemlockobject(type, handle, kind, maxvalue);
446 if (!result)
447 goto failure;
448
449 return result;
450
451 failure:
452 if (handle != SEM_FAILED)
453 SEM_CLOSE(handle);
454 mp_SetError(NULL, MP_STANDARD_ERROR);
455 return NULL;
456 }
457
458 static PyObject *
semlock_rebuild(PyTypeObject * type,PyObject * args)459 semlock_rebuild(PyTypeObject *type, PyObject *args)
460 {
461 SEM_HANDLE handle;
462 int kind, maxvalue;
463
464 if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
465 &handle, &kind, &maxvalue))
466 return NULL;
467
468 return newsemlockobject(type, handle, kind, maxvalue);
469 }
470
471 static void
semlock_dealloc(SemLockObject * self)472 semlock_dealloc(SemLockObject* self)
473 {
474 if (self->handle != SEM_FAILED)
475 SEM_CLOSE(self->handle);
476 PyObject_Del(self);
477 }
478
479 static PyObject *
semlock_count(SemLockObject * self)480 semlock_count(SemLockObject *self)
481 {
482 return PyLong_FromLong((long)self->count);
483 }
484
485 static PyObject *
semlock_ismine(SemLockObject * self)486 semlock_ismine(SemLockObject *self)
487 {
488 /* only makes sense for a lock */
489 return PyBool_FromLong(ISMINE(self));
490 }
491
492 static PyObject *
semlock_getvalue(SemLockObject * self)493 semlock_getvalue(SemLockObject *self)
494 {
495 #ifdef HAVE_BROKEN_SEM_GETVALUE
496 PyErr_SetNone(PyExc_NotImplementedError);
497 return NULL;
498 #else
499 int sval;
500 if (SEM_GETVALUE(self->handle, &sval) < 0)
501 return mp_SetError(NULL, MP_STANDARD_ERROR);
502 /* some posix implementations use negative numbers to indicate
503 the number of waiting threads */
504 if (sval < 0)
505 sval = 0;
506 return PyLong_FromLong((long)sval);
507 #endif
508 }
509
510 static PyObject *
semlock_iszero(SemLockObject * self)511 semlock_iszero(SemLockObject *self)
512 {
513 #ifdef HAVE_BROKEN_SEM_GETVALUE
514 if (sem_trywait(self->handle) < 0) {
515 if (errno == EAGAIN)
516 Py_RETURN_TRUE;
517 return mp_SetError(NULL, MP_STANDARD_ERROR);
518 } else {
519 if (sem_post(self->handle) < 0)
520 return mp_SetError(NULL, MP_STANDARD_ERROR);
521 Py_RETURN_FALSE;
522 }
523 #else
524 int sval;
525 if (SEM_GETVALUE(self->handle, &sval) < 0)
526 return mp_SetError(NULL, MP_STANDARD_ERROR);
527 return PyBool_FromLong((long)sval == 0);
528 #endif
529 }
530
531 static PyObject *
semlock_afterfork(SemLockObject * self)532 semlock_afterfork(SemLockObject *self)
533 {
534 self->count = 0;
535 Py_RETURN_NONE;
536 }
537
538 /*
539 * Semaphore methods
540 */
541
542 static PyMethodDef semlock_methods[] = {
543 {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
544 "acquire the semaphore/lock"},
545 {"release", (PyCFunction)semlock_release, METH_NOARGS,
546 "release the semaphore/lock"},
547 {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
548 "enter the semaphore/lock"},
549 {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
550 "exit the semaphore/lock"},
551 {"_count", (PyCFunction)semlock_count, METH_NOARGS,
552 "num of `acquire()`s minus num of `release()`s for this process"},
553 {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
554 "whether the lock is owned by this thread"},
555 {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
556 "get the value of the semaphore"},
557 {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
558 "returns whether semaphore has value zero"},
559 {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
560 ""},
561 {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
562 "rezero the net acquisition count after fork()"},
563 {NULL}
564 };
565
566 /*
567 * Member table
568 */
569
570 static PyMemberDef semlock_members[] = {
571 {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
572 ""},
573 {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
574 ""},
575 {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
576 ""},
577 {NULL}
578 };
579
580 /*
581 * Semaphore type
582 */
583
584 PyTypeObject SemLockType = {
585 PyVarObject_HEAD_INIT(NULL, 0)
586 /* tp_name */ "_multiprocess.SemLock",
587 /* tp_basicsize */ sizeof(SemLockObject),
588 /* tp_itemsize */ 0,
589 /* tp_dealloc */ (destructor)semlock_dealloc,
590 /* tp_print */ 0,
591 /* tp_getattr */ 0,
592 /* tp_setattr */ 0,
593 /* tp_reserved */ 0,
594 /* tp_repr */ 0,
595 /* tp_as_number */ 0,
596 /* tp_as_sequence */ 0,
597 /* tp_as_mapping */ 0,
598 /* tp_hash */ 0,
599 /* tp_call */ 0,
600 /* tp_str */ 0,
601 /* tp_getattro */ 0,
602 /* tp_setattro */ 0,
603 /* tp_as_buffer */ 0,
604 /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
605 /* tp_doc */ "Semaphore/Mutex type",
606 /* tp_traverse */ 0,
607 /* tp_clear */ 0,
608 /* tp_richcompare */ 0,
609 /* tp_weaklistoffset */ 0,
610 /* tp_iter */ 0,
611 /* tp_iternext */ 0,
612 /* tp_methods */ semlock_methods,
613 /* tp_members */ semlock_members,
614 /* tp_getset */ 0,
615 /* tp_base */ 0,
616 /* tp_dict */ 0,
617 /* tp_descr_get */ 0,
618 /* tp_descr_set */ 0,
619 /* tp_dictoffset */ 0,
620 /* tp_init */ 0,
621 /* tp_alloc */ 0,
622 /* tp_new */ semlock_new,
623 };
624