1#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15#    notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17#    notice, this list of conditions and the following disclaimer in the
18#    documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20#    used to endorse or promote products derived from this software
21#    without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
34#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
47import Queue
48
49from traceback import format_exc
50from multiprocess import Process, current_process, active_children, Pool, util, connection
51from multiprocess.process import AuthenticationString
52from multiprocess.forking import exit, Popen, assert_spawning, ForkingPickler
53from multiprocess.util import Finalize, info
54
55try:
56    from dill import PicklingError
57except ImportError:
58    try:
59        from cPickle import PicklingError
60    except ImportError:
61        from pickle import PicklingError
62
63#
64# Register some things for pickling
65#
66
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` objects

    Returns the pair `(array.array, (typecode, data))` where `data` is
    whatever `a.tostring()` produces, so that calling the first element
    with the second as arguments rebuilds an equivalent array.
    '''
    rebuild_args = (a.typecode, a.tostring())
    return array.array, rebuild_args
# Register reduce_array with ForkingPickler as the reducer for array.array
ForkingPickler.register(array.array, reduce_array)
70
# Types of the objects returned by the `items`, `keys` and `values` methods
# of a dict (in that order), sampled from an empty dict.
view_types = [type({}.items()), type({}.keys()), type({}.values())]
72
73#
74# Type for identifying shared objects
75#
76
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple (`typeid`, `address`, `id`).  Its pickled state
    is exactly that 3-tuple.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # state is the three slots, in declaration order
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # `state` must unpack into exactly three values
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
95
96#
97# Function for communication with a manager's server process
98#
99
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a single request over connection `c` and return the response

    The request is the tuple `(id, methodname, args, kwds)`.  The reply
    read back from `c` is a `(kind, result)` pair: for '#RETURN' the
    result is returned, for any other kind the exception built by
    convert_to_error() is raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
109
def convert_to_error(kind, result):
    '''
    Turn a `(kind, result)` reply into an exception instance

    '#ERROR' replies already carry the exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry a string which is
    wrapped in a RemoteError.  Anything else yields a ValueError.  The
    exception is returned, not raised.
    '''
    if kind == '#ERROR':
        return result
    if kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    if kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
121
class RemoteError(Exception):
    '''
    Exception wrapping a textual error report received from the server

    str() frames the first constructor argument between two rules of 75
    dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
125
126#
127# Functions for finding the method names of an object
128#
129
def all_methods(obj):
    '''
    Return a list of the names in `dir(obj)` whose attribute is callable

    An attribute counts as callable when it has a `__call__` attribute.
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
140
def public_methods(obj):
    '''
    Return the names from all_methods(obj) that do not begin with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
146
147#
148# Server which is run in a process controlled by a manager
149#
150
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Shared objects live in `id_to_obj` (ident -> (obj, exposed, typeid map))
    and their reference counts in `id_to_refcount`; both are guarded by
    `mutex` wherever they are modified.
    '''
    # names of the methods which handle_request() is willing to call
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Start listening on `address`; `authkey` must be a bytes object
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        listener_type, _client_type = listener_client[serializer]

        # do authentication later
        self.listener = listener_type(address=address, backlog=16)
        self.address = self.listener.address

        # ident '0' is a placeholder entry which is never counted or listed
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Accept connections until interrupted, one daemon thread per request
        '''
        current_process()._manager_server = self
        try:
            while True:
                try:
                    conn = self.listener.accept()
                except (OSError, IOError):
                    # a failed accept is ignored; keep listening
                    continue
                worker = threading.Thread(target=self.handle_request,
                                          args=(conn,))
                worker.daemon = True
                worker.start()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            # a non-zero `stop` makes the serve_client() loops terminate
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Authenticate connection `c`, run the one requested public method
        and send back its result (or a traceback), then close `c`
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            handler = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = handler(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            # the reply could not be sent; try to report that instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `self.stop` is set, the peer closes the connection
        (EOFError) or a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # exceptions raised by the referent travel back as is
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is itself to be shared: register it
                        # and reply with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not available on the referent; see
                    # whether one of the fallbacks implements it
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Fallback for '#GETVALUE': return the referent itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Fallback for '__str__': return str() of the referent'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Fallback for '__repr__': return repr() of the referent'''
        return repr(obj)

    # method name -> plain function used by serve_client() when the
    # requested method raised AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting

        One entry per shared object, sorted by ident, giving its refcount
        and the first 75 characters of str() of the object.
        '''
        with self.mutex:
            lines = []
            for ident in sorted(self.id_to_obj.keys()):
                if ident == '0':
                    continue
                lines.append('  %s:       refcount=%s\n    %s' %
                             (ident, self.id_to_refcount[ident],
                              str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(lines)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # don't count ident='0'
        return len(self.id_to_obj) - 1

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c`, runs finalizers, terminates and
        joins child processes, then always exits with code 0.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))

            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__

            util._run_finalizers(0)

            for p in active_children():
                util.debug('terminating a child process of manager')
                p.terminate()

            for p in active_children():
                util.debug('terminating a child process of manager')
                p.join()

            util._run_finalizers()
            util.info('manager exiting with exitcode 0')
        except:
            import traceback
            traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is the tuple of method
        names made available for the object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                     self.registry[typeid]

            if factory is None:
                # no factory registered: the object itself was passed in
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            self.id_to_refcount.setdefault(ident, 0)
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        exposed = self.id_to_obj[token.id][1]
        return tuple(exposed)

    def accept_connection(self, c, name):
        '''
        Rename the current thread to `name`, acknowledge the request and
        then serve connection `c` from this thread via serve_client()
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''Add one to the reference count of the object with id `ident`'''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Subtract one from the reference count of the object with id
        `ident`, disposing of the object when the count reaches zero
        '''
        with self.mutex:
            count = self.id_to_refcount[ident]
            assert count >= 1
            count -= 1
            self.id_to_refcount[ident] = count
            if count == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
446#
447# Class to represent state of a manager
448#
449
class State(object):
    '''
    Holder for the state of a manager

    The single slot `value` is set to one of the constants INITIAL,
    STARTED or SHUTDOWN.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
455
456#
457# Mapping from serializer name to Listener and Client types
458#
459
# serializer name -> (Listener type, Client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
464
465#
466# Definition of BaseManager
467#
468
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns its own server process with start() or attaches
    to an already running server with connect().  Types are made available
    through the class method register().
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    # server class instantiated by get_server() and _run_server()
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        `authkey` defaults to the authkey of the current process;
        `serializer` must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): `from_address` is not defined on BaseManager in the
        # visible part of this file -- confirm that it exists before relying
        # on managers being picklable.
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # use the class' `_Server` hook, as _run_server() does
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # a no-op request, just to check that the server answers
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given it must be callable (TypeError otherwise)
        and is called with `initargs` in the server process before the
        server is created.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has run; a manager which was
        # merely connect()ed has nothing to join.
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join/terminate below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and `method_to_typeid`
        fall back to the `_exposed_` and `_method_to_typeid_` attributes of
        the proxy type.  Unless `create_method` is false, a method named
        `typeid` is added to the class which creates a shared object of this
        type and returns a proxy for it.
        '''
        # give this class its own registry instead of mutating the one
        # inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now holds its own reference, so drop the one
                # which the server took on our behalf in create()
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
680
681#
# Subclass of set which gets cleared after a fork
683#
684
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork, and which
    pickles as an empty set of the same type
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # contents are deliberately not part of the pickled state
        return type(self), ()
690
691#
692# Definition of BaseProxy
693#
694
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread local storage, ProcessLocalSet of ids)
    _address_to_local = {}
    # guards `_address_to_local`
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`

        The authentication key is `authkey` if given, else the manager's
        key, else the key of the current process.  When `incref` is true
        the server is asked to take a reference on the proxy's behalf.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager and store
        it in the thread local storage
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a proxy of the registered type;
        any other non-'#RETURN' reply is raised as an exception.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the new proxy holds its own reference, so release the one
            # the server took when it created the object; use a one-shot
            # connection and close it once the request has been answered
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Ask the server to take a reference to the referent and register a
        finalizer which releases it again
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference held for `token`; registered as a finalizer
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Forget the manager and take a fresh reference for this process
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # the authkey is only included while a process is being spawned
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
862
863#
864# Function used for unpickling
865#
866
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When running inside the manager server which owns the object, the
    shared object itself is returned; otherwise `func` is called to build
    a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    if server and server.address == token.address:
        referent = server.id_to_obj[token.id][0]
        return referent

    # no new reference is taken while the current process is flagged as
    # `_inheriting`
    requested = kwds.pop('incref', True)
    incref = (
        requested and
        not getattr(current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
883
884#
885# Functions to create proxies and proxy types
886#
887
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The type is a subclass of BaseProxy with one forwarding method per
    exposed name.  Types are cached in `_cache` by `(name, exposed)`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # source of a method which forwards the call through _callmethod()
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
908
909
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    The proxy's class is built by `MakeProxyType` from `exposed`.  When
    `exposed` is None the method names are obtained with a 'get_methods'
    request sent over a temporary connection to `token.address`.

    `authkey` falls back to `manager._authkey` and then to
    `current_process().authkey`.  The fallback is resolved before the
    temporary connection is opened, so the 'get_methods' request is made
    with the same key that the returned proxy is given.

    The returned proxy has `_isauto` set to True.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key first: the lookup below must not connect with
    # authkey=None when a usable key is available.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            # the connection is only needed for this single request
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    proxy._isauto = True
    return proxy
934
935#
936# Types/callables which we will register with SyncManager
937#
938
class Namespace(object):
    '''
    Plain attribute container: keyword arguments become instance attributes
    '''
    def __init__(self, **kwds):
        vars(self).update(kwds)
    def __repr__(self):
        '''
        Return 'Namespace(name=value, ...)' with the entries sorted;
        attributes whose name starts with '_' are left out
        '''
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in vars(self).items()
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
950
class Value(object):
    '''
    Holder for a single value together with a typecode

    The typecode is only stored and shown in the repr.  `lock` is accepted
    by the constructor but is not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value
    def get(self):
        '''Return the stored value'''
        return self._value
    def set(self, value):
        '''Replace the stored value'''
        self._value = value
    def __repr__(self):
        parts = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % parts
    # attribute-style access to the same getter/setter pair
    value = property(get, set)
962
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`

    `lock` is accepted but not used.
    '''
    created = array.array(typecode, sequence)
    return created
965
966#
967# Proxy types used by SyncManager
968#
969
class IteratorProxy(BaseProxy):
    '''
    Proxy exposing the iterator and generator methods

    Every method passes its positional arguments, as a tuple, to
    `self._callmethod` under the method's own name.  Iterating over the
    proxy yields the proxy itself as the iterator.
    '''
    # XXX remove methods for Py3.0 and Py2.6
    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')

    def __iter__(self):
        return self

    # both spellings of the "next item" method are provided
    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def next(self, *args):
        return self._callmethod('next', args)

    # generator-specific methods
    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
985
986
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()` and `release()` methods

    Also usable as a context manager: entering calls 'acquire' and leaving
    calls 'release' through `self._callmethod`.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        '''Call 'acquire' with `blocking` as the only argument'''
        return self._callmethod('acquire', (blocking,))

    def release(self):
        '''Call 'release' without arguments'''
        return self._callmethod('release')

    def __enter__(self):
        # note: 'acquire' is called with no arguments here, unlike acquire()
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        # the exception details are ignored; the result of 'release' is
        # returned as the value of __exit__
        return self._callmethod('release')
997
998
class ConditionProxy(AcquirerProxy):
    '''
    `AcquirerProxy` extended with `wait()`, `notify()` and `notify_all()`
    '''
    # XXX will Condition.notifyAll() name be available in Py3.0?
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        '''Call 'wait' with `timeout` as the only argument'''
        return self._callmethod('wait', (timeout,))

    def notify(self):
        '''Call 'notify' without arguments'''
        return self._callmethod('notify')

    def notify_all(self):
        '''Call 'notify_all' without arguments'''
        return self._callmethod('notify_all')
1008
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set()`, `set()`, `clear()` and `wait()`

    Each method calls `self._callmethod` with its own name.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        '''Call 'wait' with `timeout` as the only argument'''
        return self._callmethod('wait', (timeout,))
1019
class NamespaceProxy(BaseProxy):
    '''
    Proxy that passes attribute access on to `_callmethod`

    Names starting with '_' are treated as attributes of the proxy object
    itself and handled by the `object` implementation.  Every other name is
    handed to `_callmethod` as a '__getattribute__', '__setattr__' or
    '__delattr__' call.  `_callmethod` is always fetched with
    `object.__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__delattr__', (key,))
        return object.__delattr__(self, key)
1037
1038
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get()` and `set()`, plus a `value` property built on them
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        '''Call 'get' without arguments and return the result'''
        return self._callmethod('get')

    def set(self, value):
        '''Call 'set' with `value` as the only argument'''
        return self._callmethod('set', (value,))

    # attribute-style access to the same getter/setter pair
    value = property(get, set)
1046
1047
# Generated base class for `ListProxy`: `MakeProxyType` creates one method per
# name listed here, each calling `_callmethod` with that name.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    `BaseListProxy` with in-place operators that return the proxy

    Augmented assignment rebinds the target to the value returned by
    `__iadd__` / `__imul__`, so both return `self` here and discard the
    result of `_callmethod`.
    '''
    def __iadd__(self, value):
        # `proxy += value` is carried out as an 'extend' call
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # `proxy *= value` is carried out as an '__imul__' call
        self._callmethod('__imul__', (value,))
        return self
1062
1063
# Generated proxy type for dictionaries: `MakeProxyType` creates one method
# per name listed here, each calling `_callmethod` with that name.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# Maps a method name to a typeid.  NOTE(review): presumably the result of
# `__iter__` is then handed back as a proxy of the 'Iterator' typeid rather
# than by value -- confirm against the code that reads `_method_to_typeid_`.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
1072
1073
# Generated proxy type exposing only length and item/slice access; it is the
# proxy type passed when 'Array' is registered on SyncManager.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1077
1078
# Generated proxy type for `Pool`: `MakeProxyType` creates one method per name
# listed here, each calling `_callmethod` with that name.
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Maps a method name to a typeid.  NOTE(review): presumably the results of
# these methods are handed back as proxies of the named typeid rather than by
# value -- confirm against the code that reads `_method_to_typeid_`.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1089
1090#
1091# Definition of SyncManager
1092#
1093
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The class body is empty; the supported types are added by the
    `SyncManager.register()` calls that follow the class at module level.
    They are the types intended for the synchronization of threads
    (`Queue`, `JoinableQueue`, `Event`, `Lock`, `RLock`, `Semaphore`,
    `BoundedSemaphore`, `Condition`), plus `Pool`, `list`, `dict`, `Value`,
    `Array` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1104
# Registrations on SyncManager.  Positional arguments are the typeid, the
# object passed as second argument (a class or factory function) and, where
# given, the proxy class.  'Queue' and 'JoinableQueue' are registered without
# a proxy class.
SyncManager.register('Queue', Queue.Queue)
# NOTE: 'JoinableQueue' is registered with the same `Queue.Queue` as 'Queue'
SyncManager.register('JoinableQueue', Queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# typeids named in `_method_to_typeid_` of PoolProxy and DictProxy; both are
# registered with create_method=False and without a second positional argument
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1124