1# 2# Module providing the `SyncManager` class for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# All rights reserved. 9# 10# Redistribution and use in source and binary forms, with or without 11# modification, are permitted provided that the following conditions 12# are met: 13# 14# 1. Redistributions of source code must retain the above copyright 15# notice, this list of conditions and the following disclaimer. 16# 2. Redistributions in binary form must reproduce the above copyright 17# notice, this list of conditions and the following disclaimer in the 18# documentation and/or other materials provided with the distribution. 19# 3. Neither the name of author nor the names of any contributors may be 20# used to endorse or promote products derived from this software 21# without specific prior written permission. 22# 23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33# SUCH DAMAGE. 34# 35 36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 37 38# 39# Imports 40# 41 42import os 43import sys 44import weakref 45import threading 46import array 47import Queue 48 49from traceback import format_exc 50from multiprocess import Process, current_process, active_children, Pool, util, connection 51from multiprocess.process import AuthenticationString 52from multiprocess.forking import exit, Popen, assert_spawning, ForkingPickler 53from multiprocess.util import Finalize, info 54 55try: 56 from dill import PicklingError 57except ImportError: 58 try: 59 from cPickle import PicklingError 60 except ImportError: 61 from pickle import PicklingError 62 63# 64# Register some things for pickling 65# 66 67def reduce_array(a): 68 return array.array, (a.typecode, a.tostring()) 69ForkingPickler.register(array.array, reduce_array) 70 71view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 72 73# 74# Type for identifying shared objects 75# 76 77class Token(object): 78 ''' 79 Type to uniquely indentify a shared object 80 ''' 81 __slots__ = ('typeid', 'address', 'id') 82 83 def __init__(self, typeid, address, id): 84 (self.typeid, self.address, self.id) = (typeid, address, id) 85 86 def __getstate__(self): 87 return (self.typeid, self.address, self.id) 88 89 def __setstate__(self, state): 90 (self.typeid, self.address, self.id) = state 91 92 def __repr__(self): 93 return 'Token(typeid=%r, address=%r, id=%r)' % \ 94 (self.typeid, self.address, self.id) 95 96# 97# Function for communication with a manager's server process 98# 99 100def dispatch(c, id, methodname, args=(), kwds={}): 101 ''' 102 Send a message to manager using connection `c` and return response 103 ''' 104 c.send((id, methodname, args, kwds)) 105 kind, result = c.recv() 106 if kind == '#RETURN': 107 return result 108 raise convert_to_error(kind, result) 109 110def convert_to_error(kind, result): 111 if kind == '#ERROR': 112 return result 113 elif kind == '#TRACEBACK': 114 assert type(result) is str 115 return RemoteError(result) 116 elif kind == '#UNSERIALIZABLE': 117 assert type(result) is str 118 return RemoteError('Unserializable message: %s\n' % result) 119 else: 120 return ValueError('Unrecognized message type') 121 122class RemoteError(Exception): 123 def __str__(self): 124 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 125 126# 127# Functions for finding the method names of an object 128# 129 130def all_methods(obj): 131 ''' 132 Return a list of names of methods of `obj` 133 ''' 134 temp = [] 135 for name in dir(obj): 136 func = getattr(obj, name) 137 if hasattr(func, '__call__'): 138 temp.append(name) 139 return temp 140 141def public_methods(obj): 142 ''' 143 Return a list of names of methods of `obj` which do not start with '_' 144 ''' 145 return [name for name in all_methods(obj) if name[0] != '_'] 146 147# 148# Server which is run in a process controlled by a manager 149# 150 151class Server(object): 152 ''' 153 Server class which runs in a process controlled by a manager object 154 ''' 155 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 156 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 157 158 def __init__(self, registry, address, authkey, serializer): 159 assert isinstance(authkey, bytes) 160 self.registry = registry 161 self.authkey = AuthenticationString(authkey) 162 Listener, Client = listener_client[serializer] 163 164 # do authentication later 165 self.listener = Listener(address=address, backlog=16) 166 self.address = self.listener.address 167 168 self.id_to_obj = {'0': (None, ())} 169 self.id_to_refcount = {} 170 self.mutex = threading.RLock() 171 self.stop = 0 172 173 def serve_forever(self): 174 ''' 175 Run the server forever 176 ''' 177 current_process()._manager_server = self 178 try: 179 try: 180 while 1: 181 try: 182 c = self.listener.accept() 183 except (OSError, IOError): 184 continue 185 t = threading.Thread(target=self.handle_request, args=(c,)) 186 t.daemon = True 187 t.start() 188 except (KeyboardInterrupt, SystemExit): 189 pass 190 finally: 191 self.stop = 999 192 self.listener.close() 193 194 def handle_request(self, c): 195 ''' 196 Handle a new connection 197 ''' 198 funcname = result = request = None 199 try: 200 connection.deliver_challenge(c, self.authkey) 201 connection.answer_challenge(c, self.authkey) 202 request = c.recv() 203 ignore, funcname, args, kwds = request 204 assert funcname in self.public, '%r unrecognized' % funcname 205 func = getattr(self, funcname) 206 except Exception: 207 msg = ('#TRACEBACK', format_exc()) 208 else: 209 try: 210 result = func(c, *args, **kwds) 211 except Exception: 212 msg = ('#TRACEBACK', format_exc()) 213 else: 214 msg = ('#RETURN', result) 215 try: 216 c.send(msg) 217 except Exception, e: 218 try: 219 c.send(('#TRACEBACK', format_exc())) 220 except Exception: 221 pass 222 util.info('Failure to send message: %r', msg) 223 util.info(' ... request was %r', request) 224 util.info(' ... exception was %r', e) 225 226 c.close() 227 228 def serve_client(self, conn): 229 ''' 230 Handle requests from the proxies in a particular process/thread 231 ''' 232 util.debug('starting server thread to service %r', 233 threading.current_thread().name) 234 235 recv = conn.recv 236 send = conn.send 237 id_to_obj = self.id_to_obj 238 239 while not self.stop: 240 241 try: 242 methodname = obj = None 243 request = recv() 244 ident, methodname, args, kwds = request 245 obj, exposed, gettypeid = id_to_obj[ident] 246 247 if methodname not in exposed: 248 raise AttributeError( 249 'method %r of %r object is not in exposed=%r' % 250 (methodname, type(obj), exposed) 251 ) 252 253 function = getattr(obj, methodname) 254 255 try: 256 res = function(*args, **kwds) 257 except Exception, e: 258 msg = ('#ERROR', e) 259 else: 260 typeid = gettypeid and gettypeid.get(methodname, None) 261 if typeid: 262 rident, rexposed = self.create(conn, typeid, res) 263 token = Token(typeid, self.address, rident) 264 msg = ('#PROXY', (rexposed, token)) 265 else: 266 msg = ('#RETURN', res) 267 268 except AttributeError: 269 if methodname is None: 270 msg = ('#TRACEBACK', format_exc()) 271 else: 272 try: 273 fallback_func = self.fallback_mapping[methodname] 274 result = fallback_func( 275 self, conn, ident, obj, *args, **kwds 276 ) 277 msg = ('#RETURN', result) 278 except Exception: 279 msg = ('#TRACEBACK', format_exc()) 280 281 except EOFError: 282 util.debug('got EOF -- exiting thread serving %r', 283 threading.current_thread().name) 284 sys.exit(0) 285 286 except Exception: 287 msg = ('#TRACEBACK', format_exc()) 288 289 try: 290 try: 291 send(msg) 292 except Exception, e: 293 send(('#UNSERIALIZABLE', format_exc())) 294 except Exception, e: 295 util.info('exception in thread serving %r', 296 threading.current_thread().name) 297 util.info(' ... message was %r', msg) 298 util.info(' ... exception was %r', e) 299 conn.close() 300 sys.exit(1) 301 302 def fallback_getvalue(self, conn, ident, obj): 303 return obj 304 305 def fallback_str(self, conn, ident, obj): 306 return str(obj) 307 308 def fallback_repr(self, conn, ident, obj): 309 return repr(obj) 310 311 fallback_mapping = { 312 '__str__':fallback_str, 313 '__repr__':fallback_repr, 314 '#GETVALUE':fallback_getvalue 315 } 316 317 def dummy(self, c): 318 pass 319 320 def debug_info(self, c): 321 ''' 322 Return some info --- useful to spot problems with refcounting 323 ''' 324 self.mutex.acquire() 325 try: 326 result = [] 327 keys = self.id_to_obj.keys() 328 keys.sort() 329 for ident in keys: 330 if ident != '0': 331 result.append(' %s: refcount=%s\n %s' % 332 (ident, self.id_to_refcount[ident], 333 str(self.id_to_obj[ident][0])[:75])) 334 return '\n'.join(result) 335 finally: 336 self.mutex.release() 337 338 def number_of_objects(self, c): 339 ''' 340 Number of shared objects 341 ''' 342 return len(self.id_to_obj) - 1 # don't count ident='0' 343 344 def shutdown(self, c): 345 ''' 346 Shutdown this process 347 ''' 348 try: 349 try: 350 util.debug('manager received shutdown message') 351 c.send(('#RETURN', None)) 352 353 if sys.stdout != sys.__stdout__: 354 util.debug('resetting stdout, stderr') 355 sys.stdout = sys.__stdout__ 356 sys.stderr = sys.__stderr__ 357 358 util._run_finalizers(0) 359 360 for p in active_children(): 361 util.debug('terminating a child process of manager') 362 p.terminate() 363 364 for p in active_children(): 365 util.debug('terminating a child process of manager') 366 p.join() 367 368 util._run_finalizers() 369 util.info('manager exiting with exitcode 0') 370 except: 371 import traceback 372 traceback.print_exc() 373 finally: 374 exit(0) 375 376 def create(self, c, typeid, *args, **kwds): 377 ''' 378 Create a new shared object and return its id 379 ''' 380 self.mutex.acquire() 381 try: 382 callable, exposed, method_to_typeid, proxytype = \ 383 self.registry[typeid] 384 385 if callable is None: 386 assert len(args) == 1 and not kwds 387 obj = args[0] 388 else: 389 obj = callable(*args, **kwds) 390 391 if exposed is None: 392 exposed = public_methods(obj) 393 if method_to_typeid is not None: 394 assert type(method_to_typeid) is dict 395 exposed = list(exposed) + list(method_to_typeid) 396 397 ident = '%x' % id(obj) # convert to string because xmlrpclib 398 # only has 32 bit signed integers 399 util.debug('%r callable returned object with id %r', typeid, ident) 400 401 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 402 if ident not in self.id_to_refcount: 403 self.id_to_refcount[ident] = 0 404 # increment the reference count immediately, to avoid 405 # this object being garbage collected before a Proxy 406 # object for it can be created. The caller of create() 407 # is responsible for doing a decref once the Proxy object 408 # has been created. 409 self.incref(c, ident) 410 return ident, tuple(exposed) 411 finally: 412 self.mutex.release() 413 414 def get_methods(self, c, token): 415 ''' 416 Return the methods of the shared object indicated by token 417 ''' 418 return tuple(self.id_to_obj[token.id][1]) 419 420 def accept_connection(self, c, name): 421 ''' 422 Spawn a new thread to serve this connection 423 ''' 424 threading.current_thread().name = name 425 c.send(('#RETURN', None)) 426 self.serve_client(c) 427 428 def incref(self, c, ident): 429 self.mutex.acquire() 430 try: 431 self.id_to_refcount[ident] += 1 432 finally: 433 self.mutex.release() 434 435 def decref(self, c, ident): 436 self.mutex.acquire() 437 try: 438 assert self.id_to_refcount[ident] >= 1 439 self.id_to_refcount[ident] -= 1 440 if self.id_to_refcount[ident] == 0: 441 del self.id_to_obj[ident], self.id_to_refcount[ident] 442 util.debug('disposing of obj with id %r', ident) 443 finally: 444 self.mutex.release() 445 446# 447# Class to represent state of a manager 448# 449 450class State(object): 451 __slots__ = ['value'] 452 INITIAL = 0 453 STARTED = 1 454 SHUTDOWN = 2 455 456# 457# Mapping from serializer name to Listener and Client types 458# 459 460listener_client = { 461 'pickle' : (connection.Listener, connection.Client), 462 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 463 } 464 465# 466# Definition of BaseManager 467# 468 469class BaseManager(object): 470 ''' 471 Base class for managers 472 ''' 473 _registry = {} 474 _Server = Server 475 476 def __init__(self, address=None, authkey=None, serializer='pickle'): 477 if authkey is None: 478 authkey = current_process().authkey 479 self._address = address # XXX not final address if eg ('', 0) 480 self._authkey = AuthenticationString(authkey) 481 self._state = State() 482 self._state.value = State.INITIAL 483 self._serializer = serializer 484 self._Listener, self._Client = listener_client[serializer] 485 486 def __reduce__(self): 487 return type(self).from_address, \ 488 (self._address, self._authkey, self._serializer) 489 490 def get_server(self): 491 ''' 492 Return server object with serve_forever() method and address attribute 493 ''' 494 assert self._state.value == State.INITIAL 495 return Server(self._registry, self._address, 496 self._authkey, self._serializer) 497 498 def connect(self): 499 ''' 500 Connect manager object to the server process 501 ''' 502 Listener, Client = listener_client[self._serializer] 503 conn = Client(self._address, authkey=self._authkey) 504 dispatch(conn, None, 'dummy') 505 self._state.value = State.STARTED 506 507 def start(self, initializer=None, initargs=()): 508 ''' 509 Spawn a server process for this manager object 510 ''' 511 assert self._state.value == State.INITIAL 512 513 if initializer is not None and not hasattr(initializer, '__call__'): 514 raise TypeError('initializer must be a callable') 515 516 # pipe over which we will retrieve address of server 517 reader, writer = connection.Pipe(duplex=False) 518 519 # spawn process which runs a server 520 self._process = Process( 521 target=type(self)._run_server, 522 args=(self._registry, self._address, self._authkey, 523 self._serializer, writer, initializer, initargs), 524 ) 525 ident = ':'.join(str(i) for i in self._process._identity) 526 self._process.name = type(self).__name__ + '-' + ident 527 self._process.start() 528 529 # get address of server 530 writer.close() 531 self._address = reader.recv() 532 reader.close() 533 534 # register a finalizer 535 self._state.value = State.STARTED 536 self.shutdown = util.Finalize( 537 self, type(self)._finalize_manager, 538 args=(self._process, self._address, self._authkey, 539 self._state, self._Client), 540 exitpriority=0 541 ) 542 543 @classmethod 544 def _run_server(cls, registry, address, authkey, serializer, writer, 545 initializer=None, initargs=()): 546 ''' 547 Create a server, report its address and run it 548 ''' 549 if initializer is not None: 550 initializer(*initargs) 551 552 # create server 553 server = cls._Server(registry, address, authkey, serializer) 554 555 # inform parent process of the server's address 556 writer.send(server.address) 557 writer.close() 558 559 # run the manager 560 util.info('manager serving at %r', server.address) 561 server.serve_forever() 562 563 def _create(self, typeid, *args, **kwds): 564 ''' 565 Create a new shared object; return the token and exposed tuple 566 ''' 567 assert self._state.value == State.STARTED, 'server not yet started' 568 conn = self._Client(self._address, authkey=self._authkey) 569 try: 570 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 571 finally: 572 conn.close() 573 return Token(typeid, self._address, id), exposed 574 575 def join(self, timeout=None): 576 ''' 577 Join the manager process (if it has been spawned) 578 ''' 579 self._process.join(timeout) 580 581 def _debug_info(self): 582 ''' 583 Return some info about the servers shared objects and connections 584 ''' 585 conn = self._Client(self._address, authkey=self._authkey) 586 try: 587 return dispatch(conn, None, 'debug_info') 588 finally: 589 conn.close() 590 591 def _number_of_objects(self): 592 ''' 593 Return the number of shared objects 594 ''' 595 conn = self._Client(self._address, authkey=self._authkey) 596 try: 597 return dispatch(conn, None, 'number_of_objects') 598 finally: 599 conn.close() 600 601 def __enter__(self): 602 return self 603 604 def __exit__(self, exc_type, exc_val, exc_tb): 605 self.shutdown() 606 607 @staticmethod 608 def _finalize_manager(process, address, authkey, state, _Client): 609 ''' 610 Shutdown the manager process; will be registered as a finalizer 611 ''' 612 if process.is_alive(): 613 util.info('sending shutdown message to manager') 614 try: 615 conn = _Client(address, authkey=authkey) 616 try: 617 dispatch(conn, None, 'shutdown') 618 finally: 619 conn.close() 620 except Exception: 621 pass 622 623 process.join(timeout=0.2) 624 if process.is_alive(): 625 util.info('manager still alive') 626 if hasattr(process, 'terminate'): 627 util.info('trying to `terminate()` manager process') 628 process.terminate() 629 process.join(timeout=0.1) 630 if process.is_alive(): 631 util.info('manager still alive after terminate') 632 633 state.value = State.SHUTDOWN 634 try: 635 del BaseProxy._address_to_local[address] 636 except KeyError: 637 pass 638 639 address = property(lambda self: self._address) 640 641 @classmethod 642 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 643 method_to_typeid=None, create_method=True): 644 ''' 645 Register a typeid with the manager type 646 ''' 647 if '_registry' not in cls.__dict__: 648 cls._registry = cls._registry.copy() 649 650 if proxytype is None: 651 proxytype = AutoProxy 652 653 exposed = exposed or getattr(proxytype, '_exposed_', None) 654 655 method_to_typeid = method_to_typeid or \ 656 getattr(proxytype, '_method_to_typeid_', None) 657 658 if method_to_typeid: 659 for key, value in method_to_typeid.items(): 660 assert type(key) is str, '%r is not a string' % key 661 assert type(value) is str, '%r is not a string' % value 662 663 cls._registry[typeid] = ( 664 callable, exposed, method_to_typeid, proxytype 665 ) 666 667 if create_method: 668 def temp(self, *args, **kwds): 669 util.debug('requesting creation of a shared %r object', typeid) 670 token, exp = self._create(typeid, *args, **kwds) 671 proxy = proxytype( 672 token, self._serializer, manager=self, 673 authkey=self._authkey, exposed=exp 674 ) 675 conn = self._Client(token.address, authkey=self._authkey) 676 dispatch(conn, None, 'decref', (token.id,)) 677 return proxy 678 temp.__name__ = typeid 679 setattr(cls, typeid, temp) 680 681# 682# Subclass of set which get cleared after a fork 683# 684 685class ProcessLocalSet(set): 686 def __init__(self): 687 util.register_after_fork(self, lambda obj: obj.clear()) 688 def __reduce__(self): 689 return type(self), () 690 691# 692# Definition of BaseProxy 693# 694 695class BaseProxy(object): 696 ''' 697 A base for proxies of shared objects 698 ''' 699 _address_to_local = {} 700 _mutex = util.ForkAwareThreadLock() 701 702 def __init__(self, token, serializer, manager=None, 703 authkey=None, exposed=None, incref=True): 704 BaseProxy._mutex.acquire() 705 try: 706 tls_idset = BaseProxy._address_to_local.get(token.address, None) 707 if tls_idset is None: 708 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 709 BaseProxy._address_to_local[token.address] = tls_idset 710 finally: 711 BaseProxy._mutex.release() 712 713 # self._tls is used to record the connection used by this 714 # thread to communicate with the manager at token.address 715 self._tls = tls_idset[0] 716 717 # self._idset is used to record the identities of all shared 718 # objects for which the current process owns references and 719 # which are in the manager at token.address 720 self._idset = tls_idset[1] 721 722 self._token = token 723 self._id = self._token.id 724 self._manager = manager 725 self._serializer = serializer 726 self._Client = listener_client[serializer][1] 727 728 if authkey is not None: 729 self._authkey = AuthenticationString(authkey) 730 elif self._manager is not None: 731 self._authkey = self._manager._authkey 732 else: 733 self._authkey = current_process().authkey 734 735 if incref: 736 self._incref() 737 738 util.register_after_fork(self, BaseProxy._after_fork) 739 740 def _connect(self): 741 util.debug('making connection to manager') 742 name = current_process().name 743 if threading.current_thread().name != 'MainThread': 744 name += '|' + threading.current_thread().name 745 conn = self._Client(self._token.address, authkey=self._authkey) 746 dispatch(conn, None, 'accept_connection', (name,)) 747 self._tls.connection = conn 748 749 def _callmethod(self, methodname, args=(), kwds={}): 750 ''' 751 Try to call a method of the referrent and return a copy of the result 752 ''' 753 try: 754 conn = self._tls.connection 755 except AttributeError: 756 util.debug('thread %r does not own a connection', 757 threading.current_thread().name) 758 self._connect() 759 conn = self._tls.connection 760 761 conn.send((self._id, methodname, args, kwds)) 762 kind, result = conn.recv() 763 764 if kind == '#RETURN': 765 return result 766 elif kind == '#PROXY': 767 exposed, token = result 768 proxytype = self._manager._registry[token.typeid][-1] 769 token.address = self._token.address 770 proxy = proxytype( 771 token, self._serializer, manager=self._manager, 772 authkey=self._authkey, exposed=exposed 773 ) 774 conn = self._Client(token.address, authkey=self._authkey) 775 dispatch(conn, None, 'decref', (token.id,)) 776 return proxy 777 raise convert_to_error(kind, result) 778 779 def _getvalue(self): 780 ''' 781 Get a copy of the value of the referent 782 ''' 783 return self._callmethod('#GETVALUE') 784 785 def _incref(self): 786 conn = self._Client(self._token.address, authkey=self._authkey) 787 dispatch(conn, None, 'incref', (self._id,)) 788 util.debug('INCREF %r', self._token.id) 789 790 self._idset.add(self._id) 791 792 state = self._manager and self._manager._state 793 794 self._close = util.Finalize( 795 self, BaseProxy._decref, 796 args=(self._token, self._authkey, state, 797 self._tls, self._idset, self._Client), 798 exitpriority=10 799 ) 800 801 @staticmethod 802 def _decref(token, authkey, state, tls, idset, _Client): 803 idset.discard(token.id) 804 805 # check whether manager is still alive 806 if state is None or state.value == State.STARTED: 807 # tell manager this process no longer cares about referent 808 try: 809 util.debug('DECREF %r', token.id) 810 conn = _Client(token.address, authkey=authkey) 811 dispatch(conn, None, 'decref', (token.id,)) 812 except Exception, e: 813 util.debug('... decref failed %s', e) 814 815 else: 816 util.debug('DECREF %r -- manager already shutdown', token.id) 817 818 # check whether we can close this thread's connection because 819 # the process owns no more references to objects for this manager 820 if not idset and hasattr(tls, 'connection'): 821 util.debug('thread %r has no more proxies so closing conn', 822 threading.current_thread().name) 823 tls.connection.close() 824 del tls.connection 825 826 def _after_fork(self): 827 self._manager = None 828 try: 829 self._incref() 830 except Exception, e: 831 # the proxy may just be for a manager which has shutdown 832 util.info('incref failed: %s' % e) 833 834 def __reduce__(self): 835 kwds = {} 836 if Popen.thread_is_spawning(): 837 kwds['authkey'] = self._authkey 838 839 if getattr(self, '_isauto', False): 840 kwds['exposed'] = self._exposed_ 841 return (RebuildProxy, 842 (AutoProxy, self._token, self._serializer, kwds)) 843 else: 844 return (RebuildProxy, 845 (type(self), self._token, self._serializer, kwds)) 846 847 def __deepcopy__(self, memo): 848 return self._getvalue() 849 850 def __repr__(self): 851 return '<%s object, typeid %r at %s>' % \ 852 (type(self).__name__, self._token.typeid, '0x%x' % id(self)) 853 854 def __str__(self): 855 ''' 856 Return representation of the referent (or a fall-back if that fails) 857 ''' 858 try: 859 return self._callmethod('__repr__') 860 except Exception: 861 return repr(self)[:-1] + "; '__str__()' failed>" 862 863# 864# Function used for unpickling 865# 866 867def RebuildProxy(func, token, serializer, kwds): 868 ''' 869 Function used for unpickling proxy objects. 870 871 If possible the shared object is returned, or otherwise a proxy for it. 872 ''' 873 server = getattr(current_process(), '_manager_server', None) 874 875 if server and server.address == token.address: 876 return server.id_to_obj[token.id][0] 877 else: 878 incref = ( 879 kwds.pop('incref', True) and 880 not getattr(current_process(), '_inheriting', False) 881 ) 882 return func(token, serializer, incref=incref, **kwds) 883 884# 885# Functions to create proxies and proxy types 886# 887 888def MakeProxyType(name, exposed, _cache={}): 889 ''' 890 Return a proxy type whose methods are given by `exposed` 891 ''' 892 exposed = tuple(exposed) 893 try: 894 return _cache[(name, exposed)] 895 except KeyError: 896 pass 897 898 dic = {} 899 900 for meth in exposed: 901 exec '''def %s(self, *args, **kwds): 902 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic 903 904 ProxyType = type(name, (BaseProxy,), dic) 905 ProxyType._exposed_ = exposed 906 _cache[(name, exposed)] = ProxyType 907 return ProxyType 908 909 910def AutoProxy(token, serializer, manager=None, authkey=None, 911 exposed=None, incref=True): 912 ''' 913 Return an auto-proxy for `token` 914 ''' 915 _Client = listener_client[serializer][1] 916 917 if exposed is None: 918 conn = _Client(token.address, authkey=authkey) 919 try: 920 exposed = dispatch(conn, None, 'get_methods', (token,)) 921 finally: 922 conn.close() 923 924 if authkey is None and manager is not None: 925 authkey = manager._authkey 926 if authkey is None: 927 authkey = current_process().authkey 928 929 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 930 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 931 incref=incref) 932 proxy._isauto = True 933 return proxy 934 935# 936# Types/callables which we will register with SyncManager 937# 938 939class Namespace(object): 940 def __init__(self, **kwds): 941 self.__dict__.update(kwds) 942 def __repr__(self): 943 items = self.__dict__.items() 944 temp = [] 945 for name, value in items: 946 if not name.startswith('_'): 947 temp.append('%s=%r' % (name, value)) 948 temp.sort() 949 return 'Namespace(%s)' % str.join(', ', temp) 950 951class Value(object): 952 def __init__(self, typecode, value, lock=True): 953 self._typecode = typecode 954 self._value = value 955 def get(self): 956 return self._value 957 def set(self, value): 958 self._value = value 959 def __repr__(self): 960 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 961 value = property(get, set) 962 963def Array(typecode, sequence, lock=True): 964 return array.array(typecode, sequence) 965 966# 967# Proxy types used by SyncManager 968# 969 970class IteratorProxy(BaseProxy): 971 # XXX remove methods for Py3.0 and Py2.6 972 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') 973 def __iter__(self): 974 return self 975 def __next__(self, *args): 976 return self._callmethod('__next__', args) 977 def next(self, *args): 978 return self._callmethod('next', args) 979 def send(self, *args): 980 return self._callmethod('send', args) 981 def throw(self, *args): 982 return self._callmethod('throw', args) 983 def close(self, *args): 984 return self._callmethod('close', args) 985 986 987class AcquirerProxy(BaseProxy): 988 _exposed_ = ('acquire', 'release') 989 def acquire(self, blocking=True): 990 return self._callmethod('acquire', (blocking,)) 991 def release(self): 992 return self._callmethod('release') 993 def __enter__(self): 994 return self._callmethod('acquire') 995 def __exit__(self, exc_type, exc_val, exc_tb): 996 return self._callmethod('release') 997 998 999class ConditionProxy(AcquirerProxy): 1000 # XXX will Condition.notfyAll() name be available in Py3.0? 1001 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 1002 def wait(self, timeout=None): 1003 return self._callmethod('wait', (timeout,)) 1004 def notify(self): 1005 return self._callmethod('notify') 1006 def notify_all(self): 1007 return self._callmethod('notify_all') 1008 1009class EventProxy(BaseProxy): 1010 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1011 def is_set(self): 1012 return self._callmethod('is_set') 1013 def set(self): 1014 return self._callmethod('set') 1015 def clear(self): 1016 return self._callmethod('clear') 1017 def wait(self, timeout=None): 1018 return self._callmethod('wait', (timeout,)) 1019 1020class NamespaceProxy(BaseProxy): 1021 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1022 def __getattr__(self, key): 1023 if key[0] == '_': 1024 return object.__getattribute__(self, key) 1025 callmethod = object.__getattribute__(self, '_callmethod') 1026 return callmethod('__getattribute__', (key,)) 1027 def __setattr__(self, key, value): 1028 if key[0] == '_': 1029 return object.__setattr__(self, key, value) 1030 callmethod = object.__getattribute__(self, '_callmethod') 1031 return callmethod('__setattr__', (key, value)) 1032 def __delattr__(self, key): 1033 if key[0] == '_': 1034 return object.__delattr__(self, key) 1035 callmethod = object.__getattribute__(self, '_callmethod') 1036 return callmethod('__delattr__', (key,)) 1037 1038 1039class ValueProxy(BaseProxy): 1040 _exposed_ = ('get', 'set') 1041 def get(self): 1042 return self._callmethod('get') 1043 def set(self, value): 1044 return self._callmethod('set', (value,)) 1045 value = property(get, set) 1046 1047 1048BaseListProxy = MakeProxyType('BaseListProxy', ( 1049 '__add__', '__contains__', '__delitem__', '__delslice__', 1050 '__getitem__', '__getslice__', '__len__', '__mul__', 1051 '__reversed__', '__rmul__', '__setitem__', '__setslice__', 1052 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1053 'reverse', 'sort', '__imul__' 1054 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 1055class ListProxy(BaseListProxy): 1056 def __iadd__(self, value): 1057 self._callmethod('extend', (value,)) 1058 return self 1059 def __imul__(self, value): 1060 self._callmethod('__imul__', (value,)) 1061 return self 1062 1063 1064DictProxy = MakeProxyType('DictProxy', ( 1065 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', 1066 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', 1067 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1068 )) 1069DictProxy._method_to_typeid_ = { 1070 '__iter__': 'Iterator', 1071 } 1072 1073 1074ArrayProxy = MakeProxyType('ArrayProxy', ( 1075 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' 1076 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 1077 1078 1079PoolProxy = MakeProxyType('PoolProxy', ( 1080 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1081 'map', 'map_async', 'terminate' 1082 )) 1083PoolProxy._method_to_typeid_ = { 1084 'apply_async': 'AsyncResult', 1085 'map_async': 'AsyncResult', 1086 'imap': 'Iterator', 1087 'imap_unordered': 'Iterator' 1088 } 1089 1090# 1091# Definition of SyncManager 1092# 1093 1094class SyncManager(BaseManager): 1095 ''' 1096 Subclass of `BaseManager` which supports a number of shared object types. 1097 1098 The types registered are those intended for the synchronization 1099 of threads, plus `dict`, `list` and `Namespace`. 1100 1101 The `multiprocessing.Manager()` function creates started instances of 1102 this class. 1103 ''' 1104 1105SyncManager.register('Queue', Queue.Queue) 1106SyncManager.register('JoinableQueue', Queue.Queue) 1107SyncManager.register('Event', threading.Event, EventProxy) 1108SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1109SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1110SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1111SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1112 AcquirerProxy) 1113SyncManager.register('Condition', threading.Condition, ConditionProxy) 1114SyncManager.register('Pool', Pool, PoolProxy) 1115SyncManager.register('list', list, ListProxy) 1116SyncManager.register('dict', dict, DictProxy) 1117SyncManager.register('Value', Value, ValueProxy) 1118SyncManager.register('Array', Array, ArrayProxy) 1119SyncManager.register('Namespace', Namespace, NamespaceProxy) 1120 1121# types returned by methods of PoolProxy 1122SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1123SyncManager.register('AsyncResult', create_method=False) 1124