1#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15#    notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17#    notice, this list of conditions and the following disclaimer in the
18#    documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20#    used to endorse or promote products derived from this software
21#    without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
34#
35
# Names exported by a star-import of this module.
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
47import queue
48
49from traceback import format_exc
50try:
51    from dill import PicklingError
52except ImportError:
53    from pickle import PicklingError
54from multiprocess import Process, current_process, active_children, Pool, util, connection
55from multiprocess.process import AuthenticationString
56from multiprocess.forking import exit, Popen, assert_spawning, ForkingPickler
57from multiprocess.util import Finalize, info
58
59#
60# Register some things for pickling
61#
62
def reduce_array(a):
    '''
    Reduction function used for pickling `array.array` objects

    Returns a `(callable, args)` pair: calling `array.array` with the
    typecode of `a` and its raw bytes rebuilds an equal array.
    '''
    # array.tostring() was only a deprecated alias of tobytes() and has been
    # removed in Python 3.9, so call tobytes() directly.
    return array.array, (a.typecode, a.tobytes())
# have ForkingPickler use reduce_array() for array.array instances
ForkingPickler.register(array.array, reduce_array)
66
# Concrete types of the objects returned by dict.items(), dict.keys()
# and dict.values().
view_types = [type({}.items()), type({}.keys()), type({}.values())]

# When those are not plain lists, register a reducer so that such objects
# get pickled as ordinary lists.
if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        '''
        Reduce `obj` to a list holding the same elements
        '''
        return list, (list(obj),)

    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
75
76#
77# Type for identifying shared objects
78#
79
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple `(typeid, address, id)`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # __slots__ means there is no instance __dict__, so the pickled
        # state is a plain tuple of the three fields
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        self.typeid, self.address, self.id = state

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
98
99#
100# Function for communication with a manager's server process
101#
102
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send one request over connection `c` and return the manager's response

    The request is the tuple `(id, methodname, args, kwds)`.  The payload
    of a '#RETURN' reply is returned; a reply of any other kind is turned
    into an exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
112
def convert_to_error(kind, result):
    '''
    Build the exception which corresponds to a non-'#RETURN' reply

    '#ERROR' carries the exception object itself.  '#TRACEBACK' and
    '#UNSERIALIZABLE' carry text, which gets wrapped in a RemoteError.
    Any other kind gives a ValueError.  The exception is returned, not
    raised.
    '''
    if kind == '#ERROR':
        return result

    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            text = result
        else:
            text = 'Unserializable message: %s\n' % result
        return RemoteError(text)

    return ValueError('Unrecognized message type')
124
class RemoteError(Exception):
    '''
    Exception wrapping text received from the manager's server

    str() shows the first argument between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n%s\n%s%s' % (rule, str(self.args[0]), rule)
128
129#
130# Functions for finding the method names of an object
131#
132
def all_methods(obj):
    '''
    Return a list of the names of all callable attributes of `obj`

    The names come from dir(obj), in the order dir() reports them.
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
143
def public_methods(obj):
    '''
    Return a list of the names of the methods of `obj` whose name does not
    begin with an underscore
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
149
150#
151# Server which is run in a process controlled by a manager
152#
153
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # names of the methods which handle_request() lets a client invoke
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object tables

        `registry` maps a typeid to the `(callable, exposed,
        method_to_typeid, proxytype)` tuple used by create(), `authkey`
        must be a bytes object and `serializer` is a key of
        `listener_client`.
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        # which number_of_objects() and debug_info() leave out
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Each accepted connection is given to handle_request() in a new
        daemon thread.  OSError/IOError raised by accept() are ignored;
        KeyboardInterrupt or SystemExit ends the loop.  On the way out
        `self.stop` is set and the listener is closed.
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a true value makes the loop in serve_client() finish
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one `(ignore, funcname, args, kwds)`
        request, calls the method named `funcname` (which must be listed
        in `public`) and sends back either ('#RETURN', result) or
        ('#TRACEBACK', text).  The connection is closed afterwards.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This is the whitelist which stops a client from calling
            # arbitrary attributes of the server, so it must not be an
            # `assert` statement (those are stripped under `python -O`).
            # AssertionError is kept so the reported traceback is unchanged.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # the reply itself could not be sent; try to report that
            # instead, and log the details whether or not this works
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `self.stop` becomes true, reading
        `(ident, methodname, args, kwds)` requests from `conn` and replying
        with one of '#RETURN', '#PROXY', '#ERROR', '#TRACEBACK' or
        '#UNSERIALIZABLE'.  The thread exits via sys.exit() when the peer
        closes the connection (EOFError) or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # an exception raised by the referent's method is sent
                    # back as an object for the caller to re-raise
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is to be shared as well: register it
                        # and reply with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    # raised before the request was unpacked
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # method not exposed or not found: see whether one of
                    # the fallbacks handles this name
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    # retry with just the repr of the message
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # handlers which serve_client() tries when a requested method name is
    # not exposed by (or not an attribute of) the referent
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting

        The result has one entry per shared object, sorted by ident, giving
        its refcount and the first 75 characters of str(obj).
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append('  %s:       refcount=%s\n    %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, restores stdout/stderr if they were
        replaced, runs the finalizers, terminates and joins any child
        processes and finally calls exit(0).
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('joining a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catches everything: whatever went wrong is
                # only reported, since the process exits just below
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  When the registry holds no callable for `typeid` the single
        positional argument is itself used as the shared object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Serve this connection in the current thread

        The thread is renamed to `name`, the request is acknowledged and
        serve_client() then takes over the connection.
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one to the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Subtract one from the reference count of the object with id `ident`

        The object is dropped from the tables once its count reaches zero.
        Raises AssertionError if the count is already below one.
        '''
        with self.mutex:
            # explicit check rather than `assert`, so that an unmatched
            # decref is still rejected under `python -O`
            if self.id_to_refcount[ident] < 1:
                raise AssertionError(
                    'refcount of %r is already below one' % ident
                    )
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
448
449#
450# Class to represent state of a manager
451#
452
class State(object):
    '''
    Holder for the state of a manager, kept in the `value` attribute
    '''
    __slots__ = ['value']

    # the values which `value` takes
    INITIAL, STARTED, SHUTDOWN = range(3)
458
459#
460# Mapping from serializer name to Listener and Client types
461#
462
listener_client = {
    # serializer name : (Listener type, Client type)
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
467
468#
469# Definition of BaseManager
470#
471
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); register()
    # gives each class its own copy before adding to it
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        Record the settings of the manager; nothing is started yet

        `authkey` defaults to the authkey of the current process and
        `serializer` must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process

        A 'dummy' request is sent to check that the server answers; the
        manager is then marked as started.
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the server handles one request per such connection, so there
            # is nothing to gain from keeping it open
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the server process
        before the server is created.  Raises TypeError if `initializer`
        is neither None nor callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has run; without it there is
        # nothing to wait for
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer which start() installs
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: if the request fails we fall through to
                # terminate() below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` fall back to the `_exposed_` and
        `_method_to_typeid_` attributes of the proxy type.  Unless
        `create_method` is false, a method named `typeid` is added to the
        class which creates a shared object and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            # do not modify the registry inherited from a base class
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now exists, so give up the extra reference
                # which the server took in create()
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
679
680#
# Subclass of set which gets cleared after a fork
682#
683
class ProcessLocalSet(set):
    '''
    Set which registers an after-fork callback that clears it
    '''
    def __init__(self):
        def _forget_contents(obj):
            obj.clear()
        util.register_after_fork(self, _forget_contents)

    def __reduce__(self):
        # pickle as a new, empty instance of the same class
        cls = type(self)
        return cls, ()
689
690#
691# Definition of BaseProxy
692#
693
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (ForkAwareLocal, ProcessLocalSet), shared by all
    # the proxies which refer to that manager
    _address_to_local = {}
    # guards _address_to_local
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`

        The authkey used is, in order of preference: `authkey`, the
        authkey of `manager`, the authkey of the current process.  Unless
        `incref` is false the server is asked to add a reference.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager

        The connection is kept in `self._tls.connection` and is therefore
        not closed here.
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy, whose type is looked
        up in the registry of `self._manager`.  Any other non-'#RETURN'
        reply is raised as an exception.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # Give up the reference which the server took when it created
            # the object.  This uses a separate one-shot connection (not
            # the thread's long-lived one), which is closed when done.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Ask the server to add a reference and arrange for a later decref

        A finalizer calling _decref() is stored in `self._close`.
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # one-shot connection: do not leave it open
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Give up this process's reference to the object named by `token`

        Failures while talking to the server are only logged.  When the
        process holds no more references for this manager, the calling
        thread's connection is closed as well.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Forget the manager object and take a fresh reference
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            # auto-proxies are rebuilt through AutoProxy with the same
            # exposed methods
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
860
861#
862# Function used for unpickling
863#
864
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Inside the server process which owns the referent the shared object
    itself is returned; anywhere else `func` is called to build a proxy.
    '''
    this_process = current_process()
    server = getattr(this_process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # an explicit false 'incref' is passed through unchanged; otherwise
    # the reference is only taken when the process is not inheriting
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(this_process, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
881
882#
883# Functions to create proxies and proxy types
884#
885
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Each name in `exposed` becomes a method which forwards to
    `_callmethod`.  The types are memoized in `_cache` under the key
    `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    namespace = {}
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
906
907
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    `serializer` selects the client class from `listener_client`.  When
    `exposed` is None the method names are fetched with a `get_methods`
    request over a short-lived connection to `token.address`.  When
    `authkey` is None it falls back to `manager._authkey` (if a manager is
    given) and then to the current process's authkey.  The returned proxy
    has `_isauto` set to True.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key before it is first used.  Previously the
    # `get_methods` connection below was opened with the raw, possibly-None
    # `authkey` while the proxy itself received the resolved one.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    proxy._isauto = True
    return proxy
932
933#
934# Types/callables which we will register with SyncManager
935#
936
class Namespace(object):
    '''
    Simple attribute container built from keyword arguments
    '''
    def __init__(self, **kwds):
        vars(self).update(kwds)
    def __repr__(self):
        # Snapshot the attributes first, then show only the names that do
        # not start with an underscore, in sorted order.
        snapshot = list(vars(self).items())
        shown = sorted('%s=%r' % (name, value)
                       for name, value in snapshot
                       if not name.startswith('_'))
        return 'Namespace(%s)' % ', '.join(shown)
948
class Value(object):
    '''
    Plain holder pairing a typecode with a value

    `lock` is accepted by the constructor but not used.  The value is
    reachable through `get()`/`set()` and through the `value` property.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value
    def get(self):
        return self._value
    def set(self, value):
        self._value = value
    def __repr__(self):
        description = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % description
    value = property(fget=get, fset=set)
960
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`

    `lock` is accepted but not used by this function.
    '''
    return array.array(typecode, sequence)
963
964#
965# Proxy types used by SyncManager
966#
967
class IteratorProxy(BaseProxy):
    '''
    Proxy with the iterator/generator methods `__next__`, `send`, `throw`
    and `close`

    Each of those methods hands its own name and its positional arguments
    to `_callmethod()`; `__iter__` is answered by the proxy itself.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # The proxy acts as its own iterator.
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
980
981
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents that offer `acquire()` and `release()`

    Can be used as a context manager: entering calls `acquire` with no
    arguments and leaving calls `release`.
    '''
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        '''
        Call `acquire` on the referent and return its result

        `timeout` is optional; when it is None only `blocking` is sent.
        '''
        # Forward `timeout` only when given, so calls that do not use it
        # send exactly the same arguments as before.
        args = (blocking,) if timeout is None else (blocking, timeout)
        return self._callmethod('acquire', args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
992
993
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify` and `notify_all` to `AcquirerProxy`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self, n=1):
        '''
        Call `notify(n)` on the referent; `n` defaults to 1
        '''
        # `n` mirrors `threading.Condition.notify(n=1)`, so a plain
        # `notify()` still wakes a single waiter.
        return self._callmethod('notify', (n,))
    def notify_all(self):
        return self._callmethod('notify_all')
1002
class EventProxy(BaseProxy):
    '''
    Proxy with the event methods `is_set`, `set`, `clear` and `wait`

    Each method passes its own name (and, for `wait`, the timeout) to
    `_callmethod()` and returns the result.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        # `timeout` is always sent, including the default None.
        return self._callmethod('wait', (timeout,))
1013
class NamespaceProxy(BaseProxy):
    '''
    Proxy giving attribute-style access through `_callmethod()`

    Names that start with an underscore are read, set and deleted on the
    proxy object itself.  Every other name is passed to `_callmethod()` as
    a `__getattribute__`, `__setattr__` or `__delattr__` request.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    # `key.startswith('_')` is used instead of `key[0] == '_'` so that an
    # empty attribute name is forwarded like any other public name rather
    # than raising IndexError on the proxy side.
    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch `_callmethod` via `object` to bypass this class's own
        # attribute hooks.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1031
1032
class ValueProxy(BaseProxy):
    '''
    Proxy with `get` and `set` methods and a `value` property built on them
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    # Reading `proxy.value` calls `get()`; assigning to it calls `set()`.
    value = property(get, set)
1040
1041
# Generated proxy type with one forwarding method per listed name.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    `BaseListProxy` with in-place operators that return the proxy

    `+=` is sent as an `extend` call and `*=` as an `__imul__` call; in
    both cases the proxy itself is returned, so the name stays bound to
    the proxy rather than to the result of the call.
    '''
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1056
1057
# Generated proxy type with one forwarding method per listed name.
# XXX `has_key` is a Python 2 dict method; presumably unneeded in Py3.0,
# like the slice methods of the list proxy -- confirm before removing.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
1063
1064
# Generated proxy type covering length, item access and the old slice hooks.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1068
1069
# Generated proxy type with one forwarding method per listed pool method.
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Maps method names to typeids.  Both typeids used here ('AsyncResult' and
# 'Iterator') are registered on SyncManager further down in this file.
# NOTE(review): presumably the results of these methods are handed back as
# proxies of the given typeid instead of by value -- confirm against the
# code that reads `_method_to_typeid_`.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1080
1081#
1082# Definition of SyncManager
1083#
1084
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads (`Event`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`
    and `Condition`) and the queues `Queue` and `JoinableQueue`, plus
    `Pool`, `list`, `dict`, `Value`, `Array` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1095
# Each call passes a typeid, the callable used for that typeid and,
# where given, the proxy type to use for it.

# The two queue typeids share `queue.Queue` and name no proxy type.
# NOTE(review): presumably `register()` then falls back to a default
# proxy -- confirm against its definition.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
# Thread synchronization primitives.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
# Pool, containers and simple value holders.
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (registered with `create_method=False` and without a callable)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1115