1 
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4 
5 #include "Python.h"
6 #include "frameobject.h"
7 #include "interpreteridobject.h"
8 
9 
10 static char *
_copy_raw_string(PyObject * strobj)11 _copy_raw_string(PyObject *strobj)
12 {
13     const char *str = PyUnicode_AsUTF8(strobj);
14     if (str == NULL) {
15         return NULL;
16     }
17     char *copied = PyMem_Malloc(strlen(str)+1);
18     if (copied == NULL) {
19         PyErr_NoMemory();
20         return NULL;
21     }
22     strcpy(copied, str);
23     return copied;
24 }
25 
26 static PyInterpreterState *
_get_current(void)27 _get_current(void)
28 {
29     // _PyInterpreterState_Get() aborts if lookup fails, so don't need
30     // to check the result for NULL.
31     return _PyInterpreterState_Get();
32 }
33 
34 
35 /* data-sharing-specific code ***********************************************/
36 
37 struct _sharednsitem {
38     char *name;
39     _PyCrossInterpreterData data;
40 };
41 
42 static void _sharednsitem_clear(struct _sharednsitem *);  // forward
43 
44 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)45 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
46 {
47     item->name = _copy_raw_string(key);
48     if (item->name == NULL) {
49         return -1;
50     }
51     if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
52         _sharednsitem_clear(item);
53         return -1;
54     }
55     return 0;
56 }
57 
58 static void
_sharednsitem_clear(struct _sharednsitem * item)59 _sharednsitem_clear(struct _sharednsitem *item)
60 {
61     if (item->name != NULL) {
62         PyMem_Free(item->name);
63         item->name = NULL;
64     }
65     _PyCrossInterpreterData_Release(&item->data);
66 }
67 
68 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)69 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
70 {
71     PyObject *name = PyUnicode_FromString(item->name);
72     if (name == NULL) {
73         return -1;
74     }
75     PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
76     if (value == NULL) {
77         Py_DECREF(name);
78         return -1;
79     }
80     int res = PyDict_SetItem(ns, name, value);
81     Py_DECREF(name);
82     Py_DECREF(value);
83     return res;
84 }
85 
86 typedef struct _sharedns {
87     Py_ssize_t len;
88     struct _sharednsitem* items;
89 } _sharedns;
90 
91 static _sharedns *
_sharedns_new(Py_ssize_t len)92 _sharedns_new(Py_ssize_t len)
93 {
94     _sharedns *shared = PyMem_NEW(_sharedns, 1);
95     if (shared == NULL) {
96         PyErr_NoMemory();
97         return NULL;
98     }
99     shared->len = len;
100     shared->items = PyMem_NEW(struct _sharednsitem, len);
101     if (shared->items == NULL) {
102         PyErr_NoMemory();
103         PyMem_Free(shared);
104         return NULL;
105     }
106     return shared;
107 }
108 
109 static void
_sharedns_free(_sharedns * shared)110 _sharedns_free(_sharedns *shared)
111 {
112     for (Py_ssize_t i=0; i < shared->len; i++) {
113         _sharednsitem_clear(&shared->items[i]);
114     }
115     PyMem_Free(shared->items);
116     PyMem_Free(shared);
117 }
118 
119 static _sharedns *
_get_shared_ns(PyObject * shareable)120 _get_shared_ns(PyObject *shareable)
121 {
122     if (shareable == NULL || shareable == Py_None) {
123         return NULL;
124     }
125     Py_ssize_t len = PyDict_Size(shareable);
126     if (len == 0) {
127         return NULL;
128     }
129 
130     _sharedns *shared = _sharedns_new(len);
131     if (shared == NULL) {
132         return NULL;
133     }
134     Py_ssize_t pos = 0;
135     for (Py_ssize_t i=0; i < len; i++) {
136         PyObject *key, *value;
137         if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
138             break;
139         }
140         if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
141             break;
142         }
143     }
144     if (PyErr_Occurred()) {
145         _sharedns_free(shared);
146         return NULL;
147     }
148     return shared;
149 }
150 
151 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)152 _sharedns_apply(_sharedns *shared, PyObject *ns)
153 {
154     for (Py_ssize_t i=0; i < shared->len; i++) {
155         if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
156             return -1;
157         }
158     }
159     return 0;
160 }
161 
162 // Ultimately we'd like to preserve enough information about the
163 // exception and traceback that we could re-constitute (or at least
164 // simulate, a la traceback.TracebackException), and even chain, a copy
165 // of the exception in the calling interpreter.
166 
167 typedef struct _sharedexception {
168     char *name;
169     char *msg;
170 } _sharedexception;
171 
172 static _sharedexception *
_sharedexception_new(void)173 _sharedexception_new(void)
174 {
175     _sharedexception *err = PyMem_NEW(_sharedexception, 1);
176     if (err == NULL) {
177         PyErr_NoMemory();
178         return NULL;
179     }
180     err->name = NULL;
181     err->msg = NULL;
182     return err;
183 }
184 
185 static void
_sharedexception_clear(_sharedexception * exc)186 _sharedexception_clear(_sharedexception *exc)
187 {
188     if (exc->name != NULL) {
189         PyMem_Free(exc->name);
190     }
191     if (exc->msg != NULL) {
192         PyMem_Free(exc->msg);
193     }
194 }
195 
196 static void
_sharedexception_free(_sharedexception * exc)197 _sharedexception_free(_sharedexception *exc)
198 {
199     _sharedexception_clear(exc);
200     PyMem_Free(exc);
201 }
202 
203 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)204 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
205 {
206     assert(exctype != NULL);
207     char *failure = NULL;
208 
209     _sharedexception *err = _sharedexception_new();
210     if (err == NULL) {
211         goto finally;
212     }
213 
214     PyObject *name = PyUnicode_FromFormat("%S", exctype);
215     if (name == NULL) {
216         failure = "unable to format exception type name";
217         goto finally;
218     }
219     err->name = _copy_raw_string(name);
220     Py_DECREF(name);
221     if (err->name == NULL) {
222         if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
223             failure = "out of memory copying exception type name";
224         } else {
225             failure = "unable to encode and copy exception type name";
226         }
227         goto finally;
228     }
229 
230     if (exc != NULL) {
231         PyObject *msg = PyUnicode_FromFormat("%S", exc);
232         if (msg == NULL) {
233             failure = "unable to format exception message";
234             goto finally;
235         }
236         err->msg = _copy_raw_string(msg);
237         Py_DECREF(msg);
238         if (err->msg == NULL) {
239             if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
240                 failure = "out of memory copying exception message";
241             } else {
242                 failure = "unable to encode and copy exception message";
243             }
244             goto finally;
245         }
246     }
247 
248 finally:
249     if (failure != NULL) {
250         PyErr_Clear();
251         if (err->name != NULL) {
252             PyMem_Free(err->name);
253             err->name = NULL;
254         }
255         err->msg = failure;
256     }
257     return err;
258 }
259 
260 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)261 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
262 {
263     if (exc->name != NULL) {
264         if (exc->msg != NULL) {
265             PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
266         }
267         else {
268             PyErr_SetString(wrapperclass, exc->name);
269         }
270     }
271     else if (exc->msg != NULL) {
272         PyErr_SetString(wrapperclass, exc->msg);
273     }
274     else {
275         PyErr_SetNone(wrapperclass);
276     }
277 }
278 
279 
280 /* channel-specific code ****************************************************/
281 
282 #define CHANNEL_SEND 1
283 #define CHANNEL_BOTH 0
284 #define CHANNEL_RECV -1
285 
286 static PyObject *ChannelError;
287 static PyObject *ChannelNotFoundError;
288 static PyObject *ChannelClosedError;
289 static PyObject *ChannelEmptyError;
290 static PyObject *ChannelNotEmptyError;
291 
292 static int
channel_exceptions_init(PyObject * ns)293 channel_exceptions_init(PyObject *ns)
294 {
295     // XXX Move the exceptions into per-module memory?
296 
297     // A channel-related operation failed.
298     ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
299                                       PyExc_RuntimeError, NULL);
300     if (ChannelError == NULL) {
301         return -1;
302     }
303     if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
304         return -1;
305     }
306 
307     // An operation tried to use a channel that doesn't exist.
308     ChannelNotFoundError = PyErr_NewException(
309             "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
310     if (ChannelNotFoundError == NULL) {
311         return -1;
312     }
313     if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
314         return -1;
315     }
316 
317     // An operation tried to use a closed channel.
318     ChannelClosedError = PyErr_NewException(
319             "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
320     if (ChannelClosedError == NULL) {
321         return -1;
322     }
323     if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
324         return -1;
325     }
326 
327     // An operation tried to pop from an empty channel.
328     ChannelEmptyError = PyErr_NewException(
329             "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
330     if (ChannelEmptyError == NULL) {
331         return -1;
332     }
333     if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
334         return -1;
335     }
336 
337     // An operation tried to close a non-empty channel.
338     ChannelNotEmptyError = PyErr_NewException(
339             "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
340     if (ChannelNotEmptyError == NULL) {
341         return -1;
342     }
343     if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
344         return -1;
345     }
346 
347     return 0;
348 }
349 
350 /* the channel queue */
351 
352 struct _channelitem;
353 
354 typedef struct _channelitem {
355     _PyCrossInterpreterData *data;
356     struct _channelitem *next;
357 } _channelitem;
358 
359 static _channelitem *
_channelitem_new(void)360 _channelitem_new(void)
361 {
362     _channelitem *item = PyMem_NEW(_channelitem, 1);
363     if (item == NULL) {
364         PyErr_NoMemory();
365         return NULL;
366     }
367     item->data = NULL;
368     item->next = NULL;
369     return item;
370 }
371 
372 static void
_channelitem_clear(_channelitem * item)373 _channelitem_clear(_channelitem *item)
374 {
375     if (item->data != NULL) {
376         _PyCrossInterpreterData_Release(item->data);
377         PyMem_Free(item->data);
378         item->data = NULL;
379     }
380     item->next = NULL;
381 }
382 
383 static void
_channelitem_free(_channelitem * item)384 _channelitem_free(_channelitem *item)
385 {
386     _channelitem_clear(item);
387     PyMem_Free(item);
388 }
389 
390 static void
_channelitem_free_all(_channelitem * item)391 _channelitem_free_all(_channelitem *item)
392 {
393     while (item != NULL) {
394         _channelitem *last = item;
395         item = item->next;
396         _channelitem_free(last);
397     }
398 }
399 
400 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)401 _channelitem_popped(_channelitem *item)
402 {
403     _PyCrossInterpreterData *data = item->data;
404     item->data = NULL;
405     _channelitem_free(item);
406     return data;
407 }
408 
409 typedef struct _channelqueue {
410     int64_t count;
411     _channelitem *first;
412     _channelitem *last;
413 } _channelqueue;
414 
415 static _channelqueue *
_channelqueue_new(void)416 _channelqueue_new(void)
417 {
418     _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
419     if (queue == NULL) {
420         PyErr_NoMemory();
421         return NULL;
422     }
423     queue->count = 0;
424     queue->first = NULL;
425     queue->last = NULL;
426     return queue;
427 }
428 
429 static void
_channelqueue_clear(_channelqueue * queue)430 _channelqueue_clear(_channelqueue *queue)
431 {
432     _channelitem_free_all(queue->first);
433     queue->count = 0;
434     queue->first = NULL;
435     queue->last = NULL;
436 }
437 
438 static void
_channelqueue_free(_channelqueue * queue)439 _channelqueue_free(_channelqueue *queue)
440 {
441     _channelqueue_clear(queue);
442     PyMem_Free(queue);
443 }
444 
445 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)446 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
447 {
448     _channelitem *item = _channelitem_new();
449     if (item == NULL) {
450         return -1;
451     }
452     item->data = data;
453 
454     queue->count += 1;
455     if (queue->first == NULL) {
456         queue->first = item;
457     }
458     else {
459         queue->last->next = item;
460     }
461     queue->last = item;
462     return 0;
463 }
464 
465 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)466 _channelqueue_get(_channelqueue *queue)
467 {
468     _channelitem *item = queue->first;
469     if (item == NULL) {
470         return NULL;
471     }
472     queue->first = item->next;
473     if (queue->last == item) {
474         queue->last = NULL;
475     }
476     queue->count -= 1;
477 
478     return _channelitem_popped(item);
479 }
480 
481 /* channel-interpreter associations */
482 
483 struct _channelend;
484 
485 typedef struct _channelend {
486     struct _channelend *next;
487     int64_t interp;
488     int open;
489 } _channelend;
490 
491 static _channelend *
_channelend_new(int64_t interp)492 _channelend_new(int64_t interp)
493 {
494     _channelend *end = PyMem_NEW(_channelend, 1);
495     if (end == NULL) {
496         PyErr_NoMemory();
497         return NULL;
498     }
499     end->next = NULL;
500     end->interp = interp;
501     end->open = 1;
502     return end;
503 }
504 
505 static void
_channelend_free(_channelend * end)506 _channelend_free(_channelend *end)
507 {
508     PyMem_Free(end);
509 }
510 
511 static void
_channelend_free_all(_channelend * end)512 _channelend_free_all(_channelend *end)
513 {
514     while (end != NULL) {
515         _channelend *last = end;
516         end = end->next;
517         _channelend_free(last);
518     }
519 }
520 
521 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)522 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
523 {
524     _channelend *prev = NULL;
525     _channelend *end = first;
526     while (end != NULL) {
527         if (end->interp == interp) {
528             break;
529         }
530         prev = end;
531         end = end->next;
532     }
533     if (pprev != NULL) {
534         *pprev = prev;
535     }
536     return end;
537 }
538 
539 typedef struct _channelassociations {
540     // Note that the list entries are never removed for interpreter
541     // for which the channel is closed.  This should be a problem in
542     // practice.  Also, a channel isn't automatically closed when an
543     // interpreter is destroyed.
544     int64_t numsendopen;
545     int64_t numrecvopen;
546     _channelend *send;
547     _channelend *recv;
548 } _channelends;
549 
550 static _channelends *
_channelends_new(void)551 _channelends_new(void)
552 {
553     _channelends *ends = PyMem_NEW(_channelends, 1);
554     if (ends== NULL) {
555         return NULL;
556     }
557     ends->numsendopen = 0;
558     ends->numrecvopen = 0;
559     ends->send = NULL;
560     ends->recv = NULL;
561     return ends;
562 }
563 
564 static void
_channelends_clear(_channelends * ends)565 _channelends_clear(_channelends *ends)
566 {
567     _channelend_free_all(ends->send);
568     ends->send = NULL;
569     ends->numsendopen = 0;
570 
571     _channelend_free_all(ends->recv);
572     ends->recv = NULL;
573     ends->numrecvopen = 0;
574 }
575 
576 static void
_channelends_free(_channelends * ends)577 _channelends_free(_channelends *ends)
578 {
579     _channelends_clear(ends);
580     PyMem_Free(ends);
581 }
582 
583 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)584 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
585                  int send)
586 {
587     _channelend *end = _channelend_new(interp);
588     if (end == NULL) {
589         return NULL;
590     }
591 
592     if (prev == NULL) {
593         if (send) {
594             ends->send = end;
595         }
596         else {
597             ends->recv = end;
598         }
599     }
600     else {
601         prev->next = end;
602     }
603     if (send) {
604         ends->numsendopen += 1;
605     }
606     else {
607         ends->numrecvopen += 1;
608     }
609     return end;
610 }
611 
612 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)613 _channelends_associate(_channelends *ends, int64_t interp, int send)
614 {
615     _channelend *prev;
616     _channelend *end = _channelend_find(send ? ends->send : ends->recv,
617                                         interp, &prev);
618     if (end != NULL) {
619         if (!end->open) {
620             PyErr_SetString(ChannelClosedError, "channel already closed");
621             return -1;
622         }
623         // already associated
624         return 0;
625     }
626     if (_channelends_add(ends, prev, interp, send) == NULL) {
627         return -1;
628     }
629     return 0;
630 }
631 
632 static int
_channelends_is_open(_channelends * ends)633 _channelends_is_open(_channelends *ends)
634 {
635     if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
636         return 1;
637     }
638     if (ends->send == NULL && ends->recv == NULL) {
639         return 1;
640     }
641     return 0;
642 }
643 
644 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)645 _channelends_close_end(_channelends *ends, _channelend *end, int send)
646 {
647     end->open = 0;
648     if (send) {
649         ends->numsendopen -= 1;
650     }
651     else {
652         ends->numrecvopen -= 1;
653     }
654 }
655 
656 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)657 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
658 {
659     _channelend *prev;
660     _channelend *end;
661     if (which >= 0) {  // send/both
662         end = _channelend_find(ends->send, interp, &prev);
663         if (end == NULL) {
664             // never associated so add it
665             end = _channelends_add(ends, prev, interp, 1);
666             if (end == NULL) {
667                 return -1;
668             }
669         }
670         _channelends_close_end(ends, end, 1);
671     }
672     if (which <= 0) {  // recv/both
673         end = _channelend_find(ends->recv, interp, &prev);
674         if (end == NULL) {
675             // never associated so add it
676             end = _channelends_add(ends, prev, interp, 0);
677             if (end == NULL) {
678                 return -1;
679             }
680         }
681         _channelends_close_end(ends, end, 0);
682     }
683     return 0;
684 }
685 
686 static void
_channelends_close_all(_channelends * ends,int which,int force)687 _channelends_close_all(_channelends *ends, int which, int force)
688 {
689     // XXX Handle the ends.
690     // XXX Handle force is True.
691 
692     // Ensure all the "send"-associated interpreters are closed.
693     _channelend *end;
694     for (end = ends->send; end != NULL; end = end->next) {
695         _channelends_close_end(ends, end, 1);
696     }
697 
698     // Ensure all the "recv"-associated interpreters are closed.
699     for (end = ends->recv; end != NULL; end = end->next) {
700         _channelends_close_end(ends, end, 0);
701     }
702 }
703 
704 /* channels */
705 
706 struct _channel;
707 struct _channel_closing;
708 static void _channel_clear_closing(struct _channel *);
709 static void _channel_finish_closing(struct _channel *);
710 
711 typedef struct _channel {
712     PyThread_type_lock mutex;
713     _channelqueue *queue;
714     _channelends *ends;
715     int open;
716     struct _channel_closing *closing;
717 } _PyChannelState;
718 
719 static _PyChannelState *
_channel_new(void)720 _channel_new(void)
721 {
722     _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
723     if (chan == NULL) {
724         return NULL;
725     }
726     chan->mutex = PyThread_allocate_lock();
727     if (chan->mutex == NULL) {
728         PyMem_Free(chan);
729         PyErr_SetString(ChannelError,
730                         "can't initialize mutex for new channel");
731         return NULL;
732     }
733     chan->queue = _channelqueue_new();
734     if (chan->queue == NULL) {
735         PyMem_Free(chan);
736         return NULL;
737     }
738     chan->ends = _channelends_new();
739     if (chan->ends == NULL) {
740         _channelqueue_free(chan->queue);
741         PyMem_Free(chan);
742         return NULL;
743     }
744     chan->open = 1;
745     chan->closing = NULL;
746     return chan;
747 }
748 
749 static void
_channel_free(_PyChannelState * chan)750 _channel_free(_PyChannelState *chan)
751 {
752     _channel_clear_closing(chan);
753     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
754     _channelqueue_free(chan->queue);
755     _channelends_free(chan->ends);
756     PyThread_release_lock(chan->mutex);
757 
758     PyThread_free_lock(chan->mutex);
759     PyMem_Free(chan);
760 }
761 
762 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)763 _channel_add(_PyChannelState *chan, int64_t interp,
764              _PyCrossInterpreterData *data)
765 {
766     int res = -1;
767     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
768 
769     if (!chan->open) {
770         PyErr_SetString(ChannelClosedError, "channel closed");
771         goto done;
772     }
773     if (_channelends_associate(chan->ends, interp, 1) != 0) {
774         goto done;
775     }
776 
777     if (_channelqueue_put(chan->queue, data) != 0) {
778         goto done;
779     }
780 
781     res = 0;
782 done:
783     PyThread_release_lock(chan->mutex);
784     return res;
785 }
786 
787 static _PyCrossInterpreterData *
_channel_next(_PyChannelState * chan,int64_t interp)788 _channel_next(_PyChannelState *chan, int64_t interp)
789 {
790     _PyCrossInterpreterData *data = NULL;
791     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
792 
793     if (!chan->open) {
794         PyErr_SetString(ChannelClosedError, "channel closed");
795         goto done;
796     }
797     if (_channelends_associate(chan->ends, interp, 0) != 0) {
798         goto done;
799     }
800 
801     data = _channelqueue_get(chan->queue);
802     if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
803         chan->open = 0;
804     }
805 
806 done:
807     PyThread_release_lock(chan->mutex);
808     if (chan->queue->count == 0) {
809         _channel_finish_closing(chan);
810     }
811     return data;
812 }
813 
814 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)815 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
816 {
817     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
818 
819     int res = -1;
820     if (!chan->open) {
821         PyErr_SetString(ChannelClosedError, "channel already closed");
822         goto done;
823     }
824 
825     if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
826         goto done;
827     }
828     chan->open = _channelends_is_open(chan->ends);
829 
830     res = 0;
831 done:
832     PyThread_release_lock(chan->mutex);
833     return res;
834 }
835 
836 static int
_channel_close_all(_PyChannelState * chan,int end,int force)837 _channel_close_all(_PyChannelState *chan, int end, int force)
838 {
839     int res = -1;
840     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
841 
842     if (!chan->open) {
843         PyErr_SetString(ChannelClosedError, "channel already closed");
844         goto done;
845     }
846 
847     if (!force && chan->queue->count > 0) {
848         PyErr_SetString(ChannelNotEmptyError,
849                         "may not be closed if not empty (try force=True)");
850         goto done;
851     }
852 
853     chan->open = 0;
854 
855     // We *could* also just leave these in place, since we've marked
856     // the channel as closed already.
857     _channelends_close_all(chan->ends, end, force);
858 
859     res = 0;
860 done:
861     PyThread_release_lock(chan->mutex);
862     return res;
863 }
864 
865 /* the set of channels */
866 
867 struct _channelref;
868 
869 typedef struct _channelref {
870     int64_t id;
871     _PyChannelState *chan;
872     struct _channelref *next;
873     Py_ssize_t objcount;
874 } _channelref;
875 
876 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)877 _channelref_new(int64_t id, _PyChannelState *chan)
878 {
879     _channelref *ref = PyMem_NEW(_channelref, 1);
880     if (ref == NULL) {
881         return NULL;
882     }
883     ref->id = id;
884     ref->chan = chan;
885     ref->next = NULL;
886     ref->objcount = 0;
887     return ref;
888 }
889 
890 //static void
891 //_channelref_clear(_channelref *ref)
892 //{
893 //    ref->id = -1;
894 //    ref->chan = NULL;
895 //    ref->next = NULL;
896 //    ref->objcount = 0;
897 //}
898 
899 static void
_channelref_free(_channelref * ref)900 _channelref_free(_channelref *ref)
901 {
902     if (ref->chan != NULL) {
903         _channel_clear_closing(ref->chan);
904     }
905     //_channelref_clear(ref);
906     PyMem_Free(ref);
907 }
908 
909 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)910 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
911 {
912     _channelref *prev = NULL;
913     _channelref *ref = first;
914     while (ref != NULL) {
915         if (ref->id == id) {
916             break;
917         }
918         prev = ref;
919         ref = ref->next;
920     }
921     if (pprev != NULL) {
922         *pprev = prev;
923     }
924     return ref;
925 }
926 
927 typedef struct _channels {
928     PyThread_type_lock mutex;
929     _channelref *head;
930     int64_t numopen;
931     int64_t next_id;
932 } _channels;
933 
934 static int
_channels_init(_channels * channels)935 _channels_init(_channels *channels)
936 {
937     if (channels->mutex == NULL) {
938         channels->mutex = PyThread_allocate_lock();
939         if (channels->mutex == NULL) {
940             PyErr_SetString(ChannelError,
941                             "can't initialize mutex for channel management");
942             return -1;
943         }
944     }
945     channels->head = NULL;
946     channels->numopen = 0;
947     channels->next_id = 0;
948     return 0;
949 }
950 
951 static int64_t
_channels_next_id(_channels * channels)952 _channels_next_id(_channels *channels)  // needs lock
953 {
954     int64_t id = channels->next_id;
955     if (id < 0) {
956         /* overflow */
957         PyErr_SetString(ChannelError,
958                         "failed to get a channel ID");
959         return -1;
960     }
961     channels->next_id += 1;
962     return id;
963 }
964 
965 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)966 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
967 {
968     _PyChannelState *chan = NULL;
969     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
970     if (pmutex != NULL) {
971         *pmutex = NULL;
972     }
973 
974     _channelref *ref = _channelref_find(channels->head, id, NULL);
975     if (ref == NULL) {
976         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
977         goto done;
978     }
979     if (ref->chan == NULL || !ref->chan->open) {
980         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
981         goto done;
982     }
983 
984     if (pmutex != NULL) {
985         // The mutex will be closed by the caller.
986         *pmutex = channels->mutex;
987     }
988 
989     chan = ref->chan;
990 done:
991     if (pmutex == NULL || *pmutex == NULL) {
992         PyThread_release_lock(channels->mutex);
993     }
994     return chan;
995 }
996 
997 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)998 _channels_add(_channels *channels, _PyChannelState *chan)
999 {
1000     int64_t cid = -1;
1001     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1002 
1003     // Create a new ref.
1004     int64_t id = _channels_next_id(channels);
1005     if (id < 0) {
1006         goto done;
1007     }
1008     _channelref *ref = _channelref_new(id, chan);
1009     if (ref == NULL) {
1010         goto done;
1011     }
1012 
1013     // Add it to the list.
1014     // We assume that the channel is a new one (not already in the list).
1015     ref->next = channels->head;
1016     channels->head = ref;
1017     channels->numopen += 1;
1018 
1019     cid = id;
1020 done:
1021     PyThread_release_lock(channels->mutex);
1022     return cid;
1023 }
1024 
1025 /* forward */
1026 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1027 
1028 static int
_channels_close(_channels * channels,int64_t cid,_PyChannelState ** pchan,int end,int force)1029 _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
1030                 int end, int force)
1031 {
1032     int res = -1;
1033     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1034     if (pchan != NULL) {
1035         *pchan = NULL;
1036     }
1037 
1038     _channelref *ref = _channelref_find(channels->head, cid, NULL);
1039     if (ref == NULL) {
1040         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
1041         goto done;
1042     }
1043 
1044     if (ref->chan == NULL) {
1045         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1046         goto done;
1047     }
1048     else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
1049         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1050         goto done;
1051     }
1052     else {
1053         if (_channel_close_all(ref->chan, end, force) != 0) {
1054             if (end == CHANNEL_SEND &&
1055                     PyErr_ExceptionMatches(ChannelNotEmptyError)) {
1056                 if (ref->chan->closing != NULL) {
1057                     PyErr_Format(ChannelClosedError,
1058                                  "channel %" PRId64 " closed", cid);
1059                     goto done;
1060                 }
1061                 // Mark the channel as closing and return.  The channel
1062                 // will be cleaned up in _channel_next().
1063                 PyErr_Clear();
1064                 if (_channel_set_closing(ref, channels->mutex) != 0) {
1065                     goto done;
1066                 }
1067                 if (pchan != NULL) {
1068                     *pchan = ref->chan;
1069                 }
1070                 res = 0;
1071             }
1072             goto done;
1073         }
1074         if (pchan != NULL) {
1075             *pchan = ref->chan;
1076         }
1077         else  {
1078             _channel_free(ref->chan);
1079         }
1080         ref->chan = NULL;
1081     }
1082 
1083     res = 0;
1084 done:
1085     PyThread_release_lock(channels->mutex);
1086     return res;
1087 }
1088 
1089 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1090 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1091                      _PyChannelState **pchan)
1092 {
1093     if (ref == channels->head) {
1094         channels->head = ref->next;
1095     }
1096     else {
1097         prev->next = ref->next;
1098     }
1099     channels->numopen -= 1;
1100 
1101     if (pchan != NULL) {
1102         *pchan = ref->chan;
1103     }
1104     _channelref_free(ref);
1105 }
1106 
1107 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1108 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1109 {
1110     int res = -1;
1111     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1112 
1113     if (pchan != NULL) {
1114         *pchan = NULL;
1115     }
1116 
1117     _channelref *prev = NULL;
1118     _channelref *ref = _channelref_find(channels->head, id, &prev);
1119     if (ref == NULL) {
1120         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1121         goto done;
1122     }
1123 
1124     _channels_remove_ref(channels, ref, prev, pchan);
1125 
1126     res = 0;
1127 done:
1128     PyThread_release_lock(channels->mutex);
1129     return res;
1130 }
1131 
1132 static int
_channels_add_id_object(_channels * channels,int64_t id)1133 _channels_add_id_object(_channels *channels, int64_t id)
1134 {
1135     int res = -1;
1136     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1137 
1138     _channelref *ref = _channelref_find(channels->head, id, NULL);
1139     if (ref == NULL) {
1140         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1141         goto done;
1142     }
1143     ref->objcount += 1;
1144 
1145     res = 0;
1146 done:
1147     PyThread_release_lock(channels->mutex);
1148     return res;
1149 }
1150 
1151 static void
_channels_drop_id_object(_channels * channels,int64_t id)1152 _channels_drop_id_object(_channels *channels, int64_t id)
1153 {
1154     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1155 
1156     _channelref *prev = NULL;
1157     _channelref *ref = _channelref_find(channels->head, id, &prev);
1158     if (ref == NULL) {
1159         // Already destroyed.
1160         goto done;
1161     }
1162     ref->objcount -= 1;
1163 
1164     // Destroy if no longer used.
1165     if (ref->objcount == 0) {
1166         _PyChannelState *chan = NULL;
1167         _channels_remove_ref(channels, ref, prev, &chan);
1168         if (chan != NULL) {
1169             _channel_free(chan);
1170         }
1171     }
1172 
1173 done:
1174     PyThread_release_lock(channels->mutex);
1175 }
1176 
1177 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1178 _channels_list_all(_channels *channels, int64_t *count)
1179 {
1180     int64_t *cids = NULL;
1181     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1182     int64_t numopen = channels->numopen;
1183     if (numopen >= PY_SSIZE_T_MAX) {
1184         PyErr_SetString(PyExc_RuntimeError, "too many channels open");
1185         goto done;
1186     }
1187     int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1188     if (ids == NULL) {
1189         goto done;
1190     }
1191     _channelref *ref = channels->head;
1192     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1193         ids[i] = ref->id;
1194     }
1195     *count = channels->numopen;
1196 
1197     cids = ids;
1198 done:
1199     PyThread_release_lock(channels->mutex);
1200     return cids;
1201 }
1202 
1203 /* support for closing non-empty channels */
1204 
1205 struct _channel_closing {
1206     struct _channelref *ref;
1207 };
1208 
1209 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1210 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1211     struct _channel *chan = ref->chan;
1212     if (chan == NULL) {
1213         // already closed
1214         return 0;
1215     }
1216     int res = -1;
1217     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1218     if (chan->closing != NULL) {
1219         PyErr_SetString(ChannelClosedError, "channel closed");
1220         goto done;
1221     }
1222     chan->closing = PyMem_NEW(struct _channel_closing, 1);
1223     if (chan->closing == NULL) {
1224         goto done;
1225     }
1226     chan->closing->ref = ref;
1227 
1228     res = 0;
1229 done:
1230     PyThread_release_lock(chan->mutex);
1231     return res;
1232 }
1233 
1234 static void
_channel_clear_closing(struct _channel * chan)1235 _channel_clear_closing(struct _channel *chan) {
1236     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1237     if (chan->closing != NULL) {
1238         PyMem_Free(chan->closing);
1239         chan->closing = NULL;
1240     }
1241     PyThread_release_lock(chan->mutex);
1242 }
1243 
1244 static void
_channel_finish_closing(struct _channel * chan)1245 _channel_finish_closing(struct _channel *chan) {
1246     struct _channel_closing *closing = chan->closing;
1247     if (closing == NULL) {
1248         return;
1249     }
1250     _channelref *ref = closing->ref;
1251     _channel_clear_closing(chan);
1252     // Do the things that would have been done in _channels_close().
1253     ref->chan = NULL;
1254     _channel_free(chan);
1255 }
1256 
1257 /* "high"-level channel-related functions */
1258 
1259 static int64_t
_channel_create(_channels * channels)1260 _channel_create(_channels *channels)
1261 {
1262     _PyChannelState *chan = _channel_new();
1263     if (chan == NULL) {
1264         return -1;
1265     }
1266     int64_t id = _channels_add(channels, chan);
1267     if (id < 0) {
1268         _channel_free(chan);
1269         return -1;
1270     }
1271     return id;
1272 }
1273 
1274 static int
_channel_destroy(_channels * channels,int64_t id)1275 _channel_destroy(_channels *channels, int64_t id)
1276 {
1277     _PyChannelState *chan = NULL;
1278     if (_channels_remove(channels, id, &chan) != 0) {
1279         return -1;
1280     }
1281     if (chan != NULL) {
1282         _channel_free(chan);
1283     }
1284     return 0;
1285 }
1286 
1287 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1288 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1289 {
1290     PyInterpreterState *interp = _get_current();
1291     if (interp == NULL) {
1292         return -1;
1293     }
1294 
1295     // Look up the channel.
1296     PyThread_type_lock mutex = NULL;
1297     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1298     if (chan == NULL) {
1299         return -1;
1300     }
1301     // Past this point we are responsible for releasing the mutex.
1302 
1303     if (chan->closing != NULL) {
1304         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1305         PyThread_release_lock(mutex);
1306         return -1;
1307     }
1308 
1309     // Convert the object to cross-interpreter data.
1310     _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1311     if (data == NULL) {
1312         PyThread_release_lock(mutex);
1313         return -1;
1314     }
1315     if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1316         PyThread_release_lock(mutex);
1317         PyMem_Free(data);
1318         return -1;
1319     }
1320 
1321     // Add the data to the channel.
1322     int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1323     PyThread_release_lock(mutex);
1324     if (res != 0) {
1325         _PyCrossInterpreterData_Release(data);
1326         PyMem_Free(data);
1327         return -1;
1328     }
1329 
1330     return 0;
1331 }
1332 
1333 static PyObject *
_channel_recv(_channels * channels,int64_t id)1334 _channel_recv(_channels *channels, int64_t id)
1335 {
1336     PyInterpreterState *interp = _get_current();
1337     if (interp == NULL) {
1338         return NULL;
1339     }
1340 
1341     // Look up the channel.
1342     PyThread_type_lock mutex = NULL;
1343     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1344     if (chan == NULL) {
1345         return NULL;
1346     }
1347     // Past this point we are responsible for releasing the mutex.
1348 
1349     // Pop off the next item from the channel.
1350     _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1351     PyThread_release_lock(mutex);
1352     if (data == NULL) {
1353         if (!PyErr_Occurred()) {
1354             PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", id);
1355         }
1356         return NULL;
1357     }
1358 
1359     // Convert the data back to an object.
1360     PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1361     if (obj == NULL) {
1362         return NULL;
1363     }
1364     _PyCrossInterpreterData_Release(data);
1365     PyMem_Free(data);
1366 
1367     return obj;
1368 }
1369 
1370 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1371 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1372 {
1373     PyInterpreterState *interp = _get_current();
1374     if (interp == NULL) {
1375         return -1;
1376     }
1377 
1378     // Look up the channel.
1379     PyThread_type_lock mutex = NULL;
1380     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1381     if (chan == NULL) {
1382         return -1;
1383     }
1384     // Past this point we are responsible for releasing the mutex.
1385 
1386     // Close one or both of the two ends.
1387     int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1388     PyThread_release_lock(mutex);
1389     return res;
1390 }
1391 
1392 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1393 _channel_close(_channels *channels, int64_t id, int end, int force)
1394 {
1395     return _channels_close(channels, id, NULL, end, force);
1396 }
1397 
1398 /* ChannelID class */
1399 
1400 static PyTypeObject ChannelIDtype;
1401 
1402 typedef struct channelid {
1403     PyObject_HEAD
1404     int64_t id;
1405     int end;
1406     int resolve;
1407     _channels *channels;
1408 } channelid;
1409 
1410 static int
channel_id_converter(PyObject * arg,void * ptr)1411 channel_id_converter(PyObject *arg, void *ptr)
1412 {
1413     int64_t cid;
1414     if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1415         cid = ((channelid *)arg)->id;
1416     }
1417     else if (PyIndex_Check(arg)) {
1418         cid = PyLong_AsLongLong(arg);
1419         if (cid == -1 && PyErr_Occurred()) {
1420             return 0;
1421         }
1422         if (cid < 0) {
1423             PyErr_Format(PyExc_ValueError,
1424                         "channel ID must be a non-negative int, got %R", arg);
1425             return 0;
1426         }
1427     }
1428     else {
1429         PyErr_Format(PyExc_TypeError,
1430                      "channel ID must be an int, got %.100s",
1431                      arg->ob_type->tp_name);
1432         return 0;
1433     }
1434     *(int64_t *)ptr = cid;
1435     return 1;
1436 }
1437 
1438 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1439 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1440              int force, int resolve)
1441 {
1442     channelid *self = PyObject_New(channelid, cls);
1443     if (self == NULL) {
1444         return NULL;
1445     }
1446     self->id = cid;
1447     self->end = end;
1448     self->resolve = resolve;
1449     self->channels = channels;
1450 
1451     if (_channels_add_id_object(channels, cid) != 0) {
1452         if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1453             PyErr_Clear();
1454         }
1455         else {
1456             Py_DECREF((PyObject *)self);
1457             return NULL;
1458         }
1459     }
1460 
1461     return self;
1462 }
1463 
1464 static _channels * _global_channels(void);
1465 
1466 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1467 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1468 {
1469     static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1470     int64_t cid;
1471     int send = -1;
1472     int recv = -1;
1473     int force = 0;
1474     int resolve = 0;
1475     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1476                                      "O&|$pppp:ChannelID.__new__", kwlist,
1477                                      channel_id_converter, &cid, &send, &recv, &force, &resolve))
1478         return NULL;
1479 
1480     // Handle "send" and "recv".
1481     if (send == 0 && recv == 0) {
1482         PyErr_SetString(PyExc_ValueError,
1483                         "'send' and 'recv' cannot both be False");
1484         return NULL;
1485     }
1486 
1487     int end = 0;
1488     if (send == 1) {
1489         if (recv == 0 || recv == -1) {
1490             end = CHANNEL_SEND;
1491         }
1492     }
1493     else if (recv == 1) {
1494         end = CHANNEL_RECV;
1495     }
1496 
1497     return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1498                                     force, resolve);
1499 }
1500 
1501 static void
channelid_dealloc(PyObject * v)1502 channelid_dealloc(PyObject *v)
1503 {
1504     int64_t cid = ((channelid *)v)->id;
1505     _channels *channels = ((channelid *)v)->channels;
1506     Py_TYPE(v)->tp_free(v);
1507 
1508     _channels_drop_id_object(channels, cid);
1509 }
1510 
1511 static PyObject *
channelid_repr(PyObject * self)1512 channelid_repr(PyObject *self)
1513 {
1514     PyTypeObject *type = Py_TYPE(self);
1515     const char *name = _PyType_Name(type);
1516 
1517     channelid *cid = (channelid *)self;
1518     const char *fmt;
1519     if (cid->end == CHANNEL_SEND) {
1520         fmt = "%s(%" PRId64 ", send=True)";
1521     }
1522     else if (cid->end == CHANNEL_RECV) {
1523         fmt = "%s(%" PRId64 ", recv=True)";
1524     }
1525     else {
1526         fmt = "%s(%" PRId64 ")";
1527     }
1528     return PyUnicode_FromFormat(fmt, name, cid->id);
1529 }
1530 
1531 static PyObject *
channelid_str(PyObject * self)1532 channelid_str(PyObject *self)
1533 {
1534     channelid *cid = (channelid *)self;
1535     return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1536 }
1537 
1538 static PyObject *
channelid_int(PyObject * self)1539 channelid_int(PyObject *self)
1540 {
1541     channelid *cid = (channelid *)self;
1542     return PyLong_FromLongLong(cid->id);
1543 }
1544 
1545 static PyNumberMethods channelid_as_number = {
1546      0,                        /* nb_add */
1547      0,                        /* nb_subtract */
1548      0,                        /* nb_multiply */
1549      0,                        /* nb_remainder */
1550      0,                        /* nb_divmod */
1551      0,                        /* nb_power */
1552      0,                        /* nb_negative */
1553      0,                        /* nb_positive */
1554      0,                        /* nb_absolute */
1555      0,                        /* nb_bool */
1556      0,                        /* nb_invert */
1557      0,                        /* nb_lshift */
1558      0,                        /* nb_rshift */
1559      0,                        /* nb_and */
1560      0,                        /* nb_xor */
1561      0,                        /* nb_or */
1562      (unaryfunc)channelid_int, /* nb_int */
1563      0,                        /* nb_reserved */
1564      0,                        /* nb_float */
1565 
1566      0,                        /* nb_inplace_add */
1567      0,                        /* nb_inplace_subtract */
1568      0,                        /* nb_inplace_multiply */
1569      0,                        /* nb_inplace_remainder */
1570      0,                        /* nb_inplace_power */
1571      0,                        /* nb_inplace_lshift */
1572      0,                        /* nb_inplace_rshift */
1573      0,                        /* nb_inplace_and */
1574      0,                        /* nb_inplace_xor */
1575      0,                        /* nb_inplace_or */
1576 
1577      0,                        /* nb_floor_divide */
1578      0,                        /* nb_true_divide */
1579      0,                        /* nb_inplace_floor_divide */
1580      0,                        /* nb_inplace_true_divide */
1581 
1582      (unaryfunc)channelid_int, /* nb_index */
1583 };
1584 
1585 static Py_hash_t
channelid_hash(PyObject * self)1586 channelid_hash(PyObject *self)
1587 {
1588     channelid *cid = (channelid *)self;
1589     PyObject *id = PyLong_FromLongLong(cid->id);
1590     if (id == NULL) {
1591         return -1;
1592     }
1593     Py_hash_t hash = PyObject_Hash(id);
1594     Py_DECREF(id);
1595     return hash;
1596 }
1597 
1598 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1599 channelid_richcompare(PyObject *self, PyObject *other, int op)
1600 {
1601     if (op != Py_EQ && op != Py_NE) {
1602         Py_RETURN_NOTIMPLEMENTED;
1603     }
1604 
1605     if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1606         Py_RETURN_NOTIMPLEMENTED;
1607     }
1608 
1609     channelid *cid = (channelid *)self;
1610     int equal;
1611     if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1612         channelid *othercid = (channelid *)other;
1613         equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1614     }
1615     else if (PyLong_Check(other)) {
1616         /* Fast path */
1617         int overflow;
1618         long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1619         if (othercid == -1 && PyErr_Occurred()) {
1620             return NULL;
1621         }
1622         equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1623     }
1624     else if (PyNumber_Check(other)) {
1625         PyObject *pyid = PyLong_FromLongLong(cid->id);
1626         if (pyid == NULL) {
1627             return NULL;
1628         }
1629         PyObject *res = PyObject_RichCompare(pyid, other, op);
1630         Py_DECREF(pyid);
1631         return res;
1632     }
1633     else {
1634         Py_RETURN_NOTIMPLEMENTED;
1635     }
1636 
1637     if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1638         Py_RETURN_TRUE;
1639     }
1640     Py_RETURN_FALSE;
1641 }
1642 
1643 static PyObject *
_channel_from_cid(PyObject * cid,int end)1644 _channel_from_cid(PyObject *cid, int end)
1645 {
1646     PyObject *highlevel = PyImport_ImportModule("interpreters");
1647     if (highlevel == NULL) {
1648         PyErr_Clear();
1649         highlevel = PyImport_ImportModule("test.support.interpreters");
1650         if (highlevel == NULL) {
1651             return NULL;
1652         }
1653     }
1654     const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1655                                                   "SendChannel";
1656     PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1657     Py_DECREF(highlevel);
1658     if (cls == NULL) {
1659         return NULL;
1660     }
1661     PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1662     Py_DECREF(cls);
1663     if (chan == NULL) {
1664         return NULL;
1665     }
1666     return chan;
1667 }
1668 
1669 struct _channelid_xid {
1670     int64_t id;
1671     int end;
1672     int resolve;
1673 };
1674 
1675 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1676 _channelid_from_xid(_PyCrossInterpreterData *data)
1677 {
1678     struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1679     // Note that we do not preserve the "resolve" flag.
1680     PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1681                                              _global_channels(), 0, 0);
1682     if (xid->end == 0) {
1683         return cid;
1684     }
1685     if (!xid->resolve) {
1686         return cid;
1687     }
1688 
1689     /* Try returning a high-level channel end but fall back to the ID. */
1690     PyObject *chan = _channel_from_cid(cid, xid->end);
1691     if (chan == NULL) {
1692         PyErr_Clear();
1693         return cid;
1694     }
1695     Py_DECREF(cid);
1696     return chan;
1697 }
1698 
1699 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1700 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1701 {
1702     struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1703     if (xid == NULL) {
1704         return -1;
1705     }
1706     xid->id = ((channelid *)obj)->id;
1707     xid->end = ((channelid *)obj)->end;
1708     xid->resolve = ((channelid *)obj)->resolve;
1709 
1710     data->data = xid;
1711     Py_INCREF(obj);
1712     data->obj = obj;
1713     data->new_object = _channelid_from_xid;
1714     data->free = PyMem_Free;
1715     return 0;
1716 }
1717 
1718 static PyObject *
channelid_end(PyObject * self,void * end)1719 channelid_end(PyObject *self, void *end)
1720 {
1721     int force = 1;
1722     channelid *cid = (channelid *)self;
1723     if (end != NULL) {
1724         return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1725                                         cid->channels, force, cid->resolve);
1726     }
1727 
1728     if (cid->end == CHANNEL_SEND) {
1729         return PyUnicode_InternFromString("send");
1730     }
1731     if (cid->end == CHANNEL_RECV) {
1732         return PyUnicode_InternFromString("recv");
1733     }
1734     return PyUnicode_InternFromString("both");
1735 }
1736 
1737 static int _channelid_end_send = CHANNEL_SEND;
1738 static int _channelid_end_recv = CHANNEL_RECV;
1739 
1740 static PyGetSetDef channelid_getsets[] = {
1741     {"end", (getter)channelid_end, NULL,
1742      PyDoc_STR("'send', 'recv', or 'both'")},
1743     {"send", (getter)channelid_end, NULL,
1744      PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1745     {"recv", (getter)channelid_end, NULL,
1746      PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1747     {NULL}
1748 };
1749 
1750 PyDoc_STRVAR(channelid_doc,
1751 "A channel ID identifies a channel and may be used as an int.");
1752 
1753 static PyTypeObject ChannelIDtype = {
1754     PyVarObject_HEAD_INIT(&PyType_Type, 0)
1755     "_xxsubinterpreters.ChannelID", /* tp_name */
1756     sizeof(channelid),              /* tp_basicsize */
1757     0,                              /* tp_itemsize */
1758     (destructor)channelid_dealloc,  /* tp_dealloc */
1759     0,                              /* tp_vectorcall_offset */
1760     0,                              /* tp_getattr */
1761     0,                              /* tp_setattr */
1762     0,                              /* tp_as_async */
1763     (reprfunc)channelid_repr,       /* tp_repr */
1764     &channelid_as_number,           /* tp_as_number */
1765     0,                              /* tp_as_sequence */
1766     0,                              /* tp_as_mapping */
1767     channelid_hash,                 /* tp_hash */
1768     0,                              /* tp_call */
1769     (reprfunc)channelid_str,        /* tp_str */
1770     0,                              /* tp_getattro */
1771     0,                              /* tp_setattro */
1772     0,                              /* tp_as_buffer */
1773     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
1774     channelid_doc,                  /* tp_doc */
1775     0,                              /* tp_traverse */
1776     0,                              /* tp_clear */
1777     channelid_richcompare,          /* tp_richcompare */
1778     0,                              /* tp_weaklistoffset */
1779     0,                              /* tp_iter */
1780     0,                              /* tp_iternext */
1781     0,                              /* tp_methods */
1782     0,                              /* tp_members */
1783     channelid_getsets,              /* tp_getset */
1784     0,                              /* tp_base */
1785     0,                              /* tp_dict */
1786     0,                              /* tp_descr_get */
1787     0,                              /* tp_descr_set */
1788     0,                              /* tp_dictoffset */
1789     0,                              /* tp_init */
1790     0,                              /* tp_alloc */
1791     // Note that we do not set tp_new to channelid_new.  Instead we
1792     // set it to NULL, meaning it cannot be instantiated from Python
1793     // code.  We do this because there is a strong relationship between
1794     // channel IDs and the channel lifecycle, so this limitation avoids
1795     // related complications.
1796     NULL,                           /* tp_new */
1797 };
1798 
1799 
1800 /* interpreter-specific code ************************************************/
1801 
1802 static PyObject * RunFailedError = NULL;
1803 
1804 static int
interp_exceptions_init(PyObject * ns)1805 interp_exceptions_init(PyObject *ns)
1806 {
1807     // XXX Move the exceptions into per-module memory?
1808 
1809     if (RunFailedError == NULL) {
1810         // An uncaught exception came out of interp_run_string().
1811         RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1812                                             PyExc_RuntimeError, NULL);
1813         if (RunFailedError == NULL) {
1814             return -1;
1815         }
1816         if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1817             return -1;
1818         }
1819     }
1820 
1821     return 0;
1822 }
1823 
1824 static int
_is_running(PyInterpreterState * interp)1825 _is_running(PyInterpreterState *interp)
1826 {
1827     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1828     if (PyThreadState_Next(tstate) != NULL) {
1829         PyErr_SetString(PyExc_RuntimeError,
1830                         "interpreter has more than one thread");
1831         return -1;
1832     }
1833     PyFrameObject *frame = tstate->frame;
1834     if (frame == NULL) {
1835         if (PyErr_Occurred() != NULL) {
1836             return -1;
1837         }
1838         return 0;
1839     }
1840     return (int)(frame->f_executing);
1841 }
1842 
1843 static int
_ensure_not_running(PyInterpreterState * interp)1844 _ensure_not_running(PyInterpreterState *interp)
1845 {
1846     int is_running = _is_running(interp);
1847     if (is_running < 0) {
1848         return -1;
1849     }
1850     if (is_running) {
1851         PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1852         return -1;
1853     }
1854     return 0;
1855 }
1856 
1857 static int
_run_script(PyInterpreterState * interp,const char * codestr,_sharedns * shared,_sharedexception ** exc)1858 _run_script(PyInterpreterState *interp, const char *codestr,
1859             _sharedns *shared, _sharedexception **exc)
1860 {
1861     PyObject *exctype = NULL;
1862     PyObject *excval = NULL;
1863     PyObject *tb = NULL;
1864 
1865     PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
1866     if (main_mod == NULL) {
1867         goto error;
1868     }
1869     PyObject *ns = PyModule_GetDict(main_mod);  // borrowed
1870     Py_DECREF(main_mod);
1871     if (ns == NULL) {
1872         goto error;
1873     }
1874     Py_INCREF(ns);
1875 
1876     // Apply the cross-interpreter data.
1877     if (shared != NULL) {
1878         if (_sharedns_apply(shared, ns) != 0) {
1879             Py_DECREF(ns);
1880             goto error;
1881         }
1882     }
1883 
1884     // Run the string (see PyRun_SimpleStringFlags).
1885     PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
1886     Py_DECREF(ns);
1887     if (result == NULL) {
1888         goto error;
1889     }
1890     else {
1891         Py_DECREF(result);  // We throw away the result.
1892     }
1893 
1894     *exc = NULL;
1895     return 0;
1896 
1897 error:
1898     PyErr_Fetch(&exctype, &excval, &tb);
1899 
1900     _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
1901     Py_XDECREF(exctype);
1902     Py_XDECREF(excval);
1903     Py_XDECREF(tb);
1904     if (sharedexc == NULL) {
1905         fprintf(stderr, "RunFailedError: script raised an uncaught exception");
1906         PyErr_Clear();
1907         sharedexc = NULL;
1908     }
1909     else {
1910         assert(!PyErr_Occurred());
1911     }
1912     *exc = sharedexc;
1913     return -1;
1914 }
1915 
1916 static int
_run_script_in_interpreter(PyInterpreterState * interp,const char * codestr,PyObject * shareables)1917 _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
1918                            PyObject *shareables)
1919 {
1920     if (_ensure_not_running(interp) < 0) {
1921         return -1;
1922     }
1923 
1924     _sharedns *shared = _get_shared_ns(shareables);
1925     if (shared == NULL && PyErr_Occurred()) {
1926         return -1;
1927     }
1928 
1929     // Switch to interpreter.
1930     PyThreadState *save_tstate = NULL;
1931     if (interp != _PyInterpreterState_Get()) {
1932         // XXX Using the "head" thread isn't strictly correct.
1933         PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1934         // XXX Possible GILState issues?
1935         save_tstate = PyThreadState_Swap(tstate);
1936     }
1937 
1938     // Run the script.
1939     _sharedexception *exc = NULL;
1940     int result = _run_script(interp, codestr, shared, &exc);
1941 
1942     // Switch back.
1943     if (save_tstate != NULL) {
1944         PyThreadState_Swap(save_tstate);
1945     }
1946 
1947     // Propagate any exception out to the caller.
1948     if (exc != NULL) {
1949         _sharedexception_apply(exc, RunFailedError);
1950         _sharedexception_free(exc);
1951     }
1952     else if (result != 0) {
1953         // We were unable to allocate a shared exception.
1954         PyErr_NoMemory();
1955     }
1956 
1957     if (shared != NULL) {
1958         _sharedns_free(shared);
1959     }
1960 
1961     return result;
1962 }
1963 
1964 
1965 /* module level code ********************************************************/
1966 
1967 /* globals is the process-global state for the module.  It holds all
1968    the data that we need to share between interpreters, so it cannot
1969    hold PyObject values. */
1970 static struct globals {
1971     _channels channels;
1972 } _globals = {{0}};
1973 
1974 static int
_init_globals(void)1975 _init_globals(void)
1976 {
1977     if (_channels_init(&_globals.channels) != 0) {
1978         return -1;
1979     }
1980     return 0;
1981 }
1982 
1983 static _channels *
_global_channels(void)1984 _global_channels(void) {
1985     return &_globals.channels;
1986 }
1987 
1988 static PyObject *
interp_create(PyObject * self,PyObject * args)1989 interp_create(PyObject *self, PyObject *args)
1990 {
1991     if (!PyArg_UnpackTuple(args, "create", 0, 0)) {
1992         return NULL;
1993     }
1994 
1995     // Create and initialize the new interpreter.
1996     PyThreadState *save_tstate = PyThreadState_Get();
1997     // XXX Possible GILState issues?
1998     PyThreadState *tstate = Py_NewInterpreter();
1999     PyThreadState_Swap(save_tstate);
2000     if (tstate == NULL) {
2001         /* Since no new thread state was created, there is no exception to
2002            propagate; raise a fresh one after swapping in the old thread
2003            state. */
2004         PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
2005         return NULL;
2006     }
2007     PyObject *idobj = _PyInterpreterState_GetIDObject(tstate->interp);
2008     if (idobj == NULL) {
2009         // XXX Possible GILState issues?
2010         save_tstate = PyThreadState_Swap(tstate);
2011         Py_EndInterpreter(tstate);
2012         PyThreadState_Swap(save_tstate);
2013         return NULL;
2014     }
2015     _PyInterpreterState_RequireIDRef(tstate->interp, 1);
2016     return idobj;
2017 }
2018 
2019 PyDoc_STRVAR(create_doc,
2020 "create() -> ID\n\
2021 \n\
2022 Create a new interpreter and return a unique generated ID.");
2023 
2024 
2025 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2026 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2027 {
2028     static char *kwlist[] = {"id", NULL};
2029     PyObject *id;
2030     // XXX Use "L" for id?
2031     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2032                                      "O:destroy", kwlist, &id)) {
2033         return NULL;
2034     }
2035 
2036     // Look up the interpreter.
2037     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2038     if (interp == NULL) {
2039         return NULL;
2040     }
2041 
2042     // Ensure we don't try to destroy the current interpreter.
2043     PyInterpreterState *current = _get_current();
2044     if (current == NULL) {
2045         return NULL;
2046     }
2047     if (interp == current) {
2048         PyErr_SetString(PyExc_RuntimeError,
2049                         "cannot destroy the current interpreter");
2050         return NULL;
2051     }
2052 
2053     // Ensure the interpreter isn't running.
2054     /* XXX We *could* support destroying a running interpreter but
2055        aren't going to worry about it for now. */
2056     if (_ensure_not_running(interp) < 0) {
2057         return NULL;
2058     }
2059 
2060     // Destroy the interpreter.
2061     //PyInterpreterState_Delete(interp);
2062     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2063     // XXX Possible GILState issues?
2064     PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2065     Py_EndInterpreter(tstate);
2066     PyThreadState_Swap(save_tstate);
2067 
2068     Py_RETURN_NONE;
2069 }
2070 
2071 PyDoc_STRVAR(destroy_doc,
2072 "destroy(id)\n\
2073 \n\
2074 Destroy the identified interpreter.\n\
2075 \n\
2076 Attempting to destroy the current interpreter results in a RuntimeError.\n\
2077 So does an unrecognized ID.");
2078 
2079 
2080 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2081 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2082 {
2083     PyObject *ids, *id;
2084     PyInterpreterState *interp;
2085 
2086     ids = PyList_New(0);
2087     if (ids == NULL) {
2088         return NULL;
2089     }
2090 
2091     interp = PyInterpreterState_Head();
2092     while (interp != NULL) {
2093         id = _PyInterpreterState_GetIDObject(interp);
2094         if (id == NULL) {
2095             Py_DECREF(ids);
2096             return NULL;
2097         }
2098         // insert at front of list
2099         int res = PyList_Insert(ids, 0, id);
2100         Py_DECREF(id);
2101         if (res < 0) {
2102             Py_DECREF(ids);
2103             return NULL;
2104         }
2105 
2106         interp = PyInterpreterState_Next(interp);
2107     }
2108 
2109     return ids;
2110 }
2111 
2112 PyDoc_STRVAR(list_all_doc,
2113 "list_all() -> [ID]\n\
2114 \n\
2115 Return a list containing the ID of every existing interpreter.");
2116 
2117 
2118 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2119 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2120 {
2121     PyInterpreterState *interp =_get_current();
2122     if (interp == NULL) {
2123         return NULL;
2124     }
2125     return _PyInterpreterState_GetIDObject(interp);
2126 }
2127 
2128 PyDoc_STRVAR(get_current_doc,
2129 "get_current() -> ID\n\
2130 \n\
2131 Return the ID of current interpreter.");
2132 
2133 
2134 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2135 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2136 {
2137     // Currently, 0 is always the main interpreter.
2138     PY_INT64_T id = 0;
2139     return _PyInterpreterID_New(id);
2140 }
2141 
2142 PyDoc_STRVAR(get_main_doc,
2143 "get_main() -> ID\n\
2144 \n\
2145 Return the ID of main interpreter.");
2146 
2147 
2148 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2149 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2150 {
2151     static char *kwlist[] = {"id", "script", "shared", NULL};
2152     PyObject *id, *code;
2153     PyObject *shared = NULL;
2154     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2155                                      "OU|O:run_string", kwlist,
2156                                      &id, &code, &shared)) {
2157         return NULL;
2158     }
2159 
2160     // Look up the interpreter.
2161     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2162     if (interp == NULL) {
2163         return NULL;
2164     }
2165 
2166     // Extract code.
2167     Py_ssize_t size;
2168     const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2169     if (codestr == NULL) {
2170         return NULL;
2171     }
2172     if (strlen(codestr) != (size_t)size) {
2173         PyErr_SetString(PyExc_ValueError,
2174                         "source code string cannot contain null bytes");
2175         return NULL;
2176     }
2177 
2178     // Run the code in the interpreter.
2179     if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2180         return NULL;
2181     }
2182     Py_RETURN_NONE;
2183 }
2184 
2185 PyDoc_STRVAR(run_string_doc,
2186 "run_string(id, script, shared)\n\
2187 \n\
2188 Execute the provided string in the identified interpreter.\n\
2189 \n\
2190 See PyRun_SimpleStrings.");
2191 
2192 
2193 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2194 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2195 {
2196     static char *kwlist[] = {"obj", NULL};
2197     PyObject *obj;
2198     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2199                                      "O:is_shareable", kwlist, &obj)) {
2200         return NULL;
2201     }
2202 
2203     if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2204         Py_RETURN_TRUE;
2205     }
2206     PyErr_Clear();
2207     Py_RETURN_FALSE;
2208 }
2209 
2210 PyDoc_STRVAR(is_shareable_doc,
2211 "is_shareable(obj) -> bool\n\
2212 \n\
2213 Return True if the object's data may be shared between interpreters and\n\
2214 False otherwise.");
2215 
2216 
2217 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2218 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2219 {
2220     static char *kwlist[] = {"id", NULL};
2221     PyObject *id;
2222     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2223                                      "O:is_running", kwlist, &id)) {
2224         return NULL;
2225     }
2226 
2227     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2228     if (interp == NULL) {
2229         return NULL;
2230     }
2231     int is_running = _is_running(interp);
2232     if (is_running < 0) {
2233         return NULL;
2234     }
2235     if (is_running) {
2236         Py_RETURN_TRUE;
2237     }
2238     Py_RETURN_FALSE;
2239 }
2240 
2241 PyDoc_STRVAR(is_running_doc,
2242 "is_running(id) -> bool\n\
2243 \n\
2244 Return whether or not the identified interpreter is running.");
2245 
2246 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2247 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2248 {
2249     int64_t cid = _channel_create(&_globals.channels);
2250     if (cid < 0) {
2251         return NULL;
2252     }
2253     PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2254                                             &_globals.channels, 0, 0);
2255     if (id == NULL) {
2256         if (_channel_destroy(&_globals.channels, cid) != 0) {
2257             // XXX issue a warning?
2258         }
2259         return NULL;
2260     }
2261     assert(((channelid *)id)->channels != NULL);
2262     return id;
2263 }
2264 
2265 PyDoc_STRVAR(channel_create_doc,
2266 "channel_create() -> cid\n\
2267 \n\
2268 Create a new cross-interpreter channel and return a unique generated ID.");
2269 
2270 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2271 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2272 {
2273     static char *kwlist[] = {"cid", NULL};
2274     int64_t cid;
2275     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2276                                      channel_id_converter, &cid)) {
2277         return NULL;
2278     }
2279 
2280     if (_channel_destroy(&_globals.channels, cid) != 0) {
2281         return NULL;
2282     }
2283     Py_RETURN_NONE;
2284 }
2285 
2286 PyDoc_STRVAR(channel_destroy_doc,
2287 "channel_destroy(cid)\n\
2288 \n\
2289 Close and finalize the channel.  Afterward attempts to use the channel\n\
2290 will behave as though it never existed.");
2291 
2292 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2293 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2294 {
2295     int64_t count = 0;
2296     int64_t *cids = _channels_list_all(&_globals.channels, &count);
2297     if (cids == NULL) {
2298         if (count == 0) {
2299             return PyList_New(0);
2300         }
2301         return NULL;
2302     }
2303     PyObject *ids = PyList_New((Py_ssize_t)count);
2304     if (ids == NULL) {
2305         goto finally;
2306     }
2307     int64_t *cur = cids;
2308     for (int64_t i=0; i < count; cur++, i++) {
2309         PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2310                                                 &_globals.channels, 0, 0);
2311         if (id == NULL) {
2312             Py_DECREF(ids);
2313             ids = NULL;
2314             break;
2315         }
2316         PyList_SET_ITEM(ids, i, id);
2317     }
2318 
2319 finally:
2320     PyMem_Free(cids);
2321     return ids;
2322 }
2323 
2324 PyDoc_STRVAR(channel_list_all_doc,
2325 "channel_list_all() -> [cid]\n\
2326 \n\
2327 Return the list of all IDs for active channels.");
2328 
2329 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2330 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2331 {
2332     static char *kwlist[] = {"cid", "obj", NULL};
2333     int64_t cid;
2334     PyObject *obj;
2335     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2336                                      channel_id_converter, &cid, &obj)) {
2337         return NULL;
2338     }
2339 
2340     if (_channel_send(&_globals.channels, cid, obj) != 0) {
2341         return NULL;
2342     }
2343     Py_RETURN_NONE;
2344 }
2345 
2346 PyDoc_STRVAR(channel_send_doc,
2347 "channel_send(cid, obj)\n\
2348 \n\
2349 Add the object's data to the channel's queue.");
2350 
2351 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2352 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2353 {
2354     static char *kwlist[] = {"cid", NULL};
2355     int64_t cid;
2356     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_recv", kwlist,
2357                                      channel_id_converter, &cid)) {
2358         return NULL;
2359     }
2360 
2361     return _channel_recv(&_globals.channels, cid);
2362 }
2363 
2364 PyDoc_STRVAR(channel_recv_doc,
2365 "channel_recv(cid) -> obj\n\
2366 \n\
2367 Return a new object from the data at the from of the channel's queue.");
2368 
2369 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2370 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2371 {
2372     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2373     int64_t cid;
2374     int send = 0;
2375     int recv = 0;
2376     int force = 0;
2377     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2378                                      "O&|$ppp:channel_close", kwlist,
2379                                      channel_id_converter, &cid, &send, &recv, &force)) {
2380         return NULL;
2381     }
2382 
2383     if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2384         return NULL;
2385     }
2386     Py_RETURN_NONE;
2387 }
2388 
2389 PyDoc_STRVAR(channel_close_doc,
2390 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2391 \n\
2392 Close the channel for all interpreters.\n\
2393 \n\
2394 If the channel is empty then the keyword args are ignored and both\n\
2395 ends are immediately closed.  Otherwise, if 'force' is True then\n\
2396 all queued items are released and both ends are immediately\n\
2397 closed.\n\
2398 \n\
2399 If the channel is not empty *and* 'force' is False then following\n\
2400 happens:\n\
2401 \n\
2402  * recv is True (regardless of send):\n\
2403    - raise ChannelNotEmptyError\n\
2404  * recv is None and send is None:\n\
2405    - raise ChannelNotEmptyError\n\
2406  * send is True and recv is not True:\n\
2407    - fully close the 'send' end\n\
2408    - close the 'recv' end to interpreters not already receiving\n\
2409    - fully close it once empty\n\
2410 \n\
2411 Closing an already closed channel results in a ChannelClosedError.\n\
2412 \n\
2413 Once the channel's ID has no more ref counts in any interpreter\n\
2414 the channel will be destroyed.");
2415 
2416 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2417 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2418 {
2419     // Note that only the current interpreter is affected.
2420     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2421     int64_t cid;
2422     int send = 0;
2423     int recv = 0;
2424     int force = 0;
2425     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2426                                      "O&|$ppp:channel_release", kwlist,
2427                                      channel_id_converter, &cid, &send, &recv, &force)) {
2428         return NULL;
2429     }
2430     if (send == 0 && recv == 0) {
2431         send = 1;
2432         recv = 1;
2433     }
2434 
2435     // XXX Handle force is True.
2436     // XXX Fix implicit release.
2437 
2438     if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2439         return NULL;
2440     }
2441     Py_RETURN_NONE;
2442 }
2443 
2444 PyDoc_STRVAR(channel_release_doc,
2445 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2446 \n\
2447 Close the channel for the current interpreter.  'send' and 'recv'\n\
2448 (bool) may be used to indicate the ends to close.  By default both\n\
2449 ends are closed.  Closing an already closed end is a noop.");
2450 
2451 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2452 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2453 {
2454     return channelid_new(&ChannelIDtype, args, kwds);
2455 }
2456 
2457 static PyMethodDef module_functions[] = {
2458     {"create",                    (PyCFunction)interp_create,
2459      METH_VARARGS, create_doc},
2460     {"destroy",                   (PyCFunction)(void(*)(void))interp_destroy,
2461      METH_VARARGS | METH_KEYWORDS, destroy_doc},
2462     {"list_all",                  interp_list_all,
2463      METH_NOARGS, list_all_doc},
2464     {"get_current",               interp_get_current,
2465      METH_NOARGS, get_current_doc},
2466     {"get_main",                  interp_get_main,
2467      METH_NOARGS, get_main_doc},
2468     {"is_running",                (PyCFunction)(void(*)(void))interp_is_running,
2469      METH_VARARGS | METH_KEYWORDS, is_running_doc},
2470     {"run_string",                (PyCFunction)(void(*)(void))interp_run_string,
2471      METH_VARARGS | METH_KEYWORDS, run_string_doc},
2472 
2473     {"is_shareable",              (PyCFunction)(void(*)(void))object_is_shareable,
2474      METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2475 
2476     {"channel_create",            channel_create,
2477      METH_NOARGS, channel_create_doc},
2478     {"channel_destroy",           (PyCFunction)(void(*)(void))channel_destroy,
2479      METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2480     {"channel_list_all",          channel_list_all,
2481      METH_NOARGS, channel_list_all_doc},
2482     {"channel_send",              (PyCFunction)(void(*)(void))channel_send,
2483      METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2484     {"channel_recv",              (PyCFunction)(void(*)(void))channel_recv,
2485      METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2486     {"channel_close",             (PyCFunction)(void(*)(void))channel_close,
2487      METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2488     {"channel_release",           (PyCFunction)(void(*)(void))channel_release,
2489      METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2490     {"_channel_id",               (PyCFunction)(void(*)(void))channel__channel_id,
2491      METH_VARARGS | METH_KEYWORDS, NULL},
2492 
2493     {NULL,                        NULL}           /* sentinel */
2494 };
2495 
2496 
2497 /* initialization function */
2498 
2499 PyDoc_STRVAR(module_doc,
2500 "This module provides primitive operations to manage Python interpreters.\n\
2501 The 'interpreters' module provides a more convenient interface.");
2502 
2503 static struct PyModuleDef interpretersmodule = {
2504     PyModuleDef_HEAD_INIT,
2505     "_xxsubinterpreters",  /* m_name */
2506     module_doc,            /* m_doc */
2507     -1,                    /* m_size */
2508     module_functions,      /* m_methods */
2509     NULL,                  /* m_slots */
2510     NULL,                  /* m_traverse */
2511     NULL,                  /* m_clear */
2512     NULL                   /* m_free */
2513 };
2514 
2515 
2516 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2517 PyInit__xxsubinterpreters(void)
2518 {
2519     if (_init_globals() != 0) {
2520         return NULL;
2521     }
2522 
2523     /* Initialize types */
2524     if (PyType_Ready(&ChannelIDtype) != 0) {
2525         return NULL;
2526     }
2527 
2528     /* Create the module */
2529     PyObject *module = PyModule_Create(&interpretersmodule);
2530     if (module == NULL) {
2531         return NULL;
2532     }
2533 
2534     /* Add exception types */
2535     PyObject *ns = PyModule_GetDict(module);  // borrowed
2536     if (interp_exceptions_init(ns) != 0) {
2537         return NULL;
2538     }
2539     if (channel_exceptions_init(ns) != 0) {
2540         return NULL;
2541     }
2542 
2543     /* Add other types */
2544     Py_INCREF(&ChannelIDtype);
2545     if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2546         return NULL;
2547     }
2548     Py_INCREF(&_PyInterpreterID_Type);
2549     if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2550         return NULL;
2551     }
2552 
2553     if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2554         return NULL;
2555     }
2556 
2557     return module;
2558 }
2559