1# 2# Module providing the `SyncManager` class for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# All rights reserved. 9# 10# Redistribution and use in source and binary forms, with or without 11# modification, are permitted provided that the following conditions 12# are met: 13# 14# 1. Redistributions of source code must retain the above copyright 15# notice, this list of conditions and the following disclaimer. 16# 2. Redistributions in binary form must reproduce the above copyright 17# notice, this list of conditions and the following disclaimer in the 18# documentation and/or other materials provided with the distribution. 19# 3. Neither the name of author nor the names of any contributors may be 20# used to endorse or promote products derived from this software 21# without specific prior written permission. 22# 23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33# SUCH DAMAGE. 
#

__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]

#
# Imports
#

import os
import sys
import weakref
import threading
import array
import queue

from traceback import format_exc
try:
    from dill import PicklingError
except ImportError:
    from pickle import PicklingError
from multiprocess import Process, current_process, active_children, Pool, util, connection
from multiprocess.process import AuthenticationString
from multiprocess.forking import exit, Popen, assert_spawning, ForkingPickler
from multiprocess.util import Finalize, info

#
# Register some things for pickling
#

def reduce_array(a):
    '''
    Reduce the array `a` to `(array.array, (typecode, raw data))` for pickling
    '''
    # `array.tostring()` is a deprecated alias which newer interpreters no
    # longer provide; prefer `tobytes()` and only fall back to the old name
    # on interpreters that lack it.
    to_bytes = getattr(a, 'tobytes', None)
    if to_bytes is None:
        to_bytes = a.tostring
    return array.array, (a.typecode, to_bytes())
ForkingPickler.register(array.array, reduce_array)

view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce a dict view to a plain list holding the same items
        '''
        return list, (list(obj),)
    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        import copyreg
        copyreg.pickle(view_type, rebuild_as_list)

#
# Type for identifying shared objects
#

class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return 'Token(typeid=%r, address=%r, id=%r)' % \
               (self.typeid, self.address, self.id)

#
# Function for communication with a manager's server process
#

def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    Raises the exception built by `convert_to_error()` for any reply whose
    kind is not '#RETURN'.
    '''
    # `kwds` is only forwarded, never mutated, so the shared default is safe
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)

def convert_to_error(kind, result):
    '''
    Return (not raise) the exception object matching a non-'#RETURN' reply
    '''
    if kind == '#ERROR':
        # the server sent the exception instance itself
        return result
    elif kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    elif kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    else:
        return ValueError('Unrecognized message type')

class RemoteError(Exception):
    '''
    Exception wrapping the text of a traceback sent back by the server
    '''
    def __str__(self):
        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)

#
# Functions for finding the method names of an object
#

def all_methods(obj):
    '''
    Return a list of names of methods of `obj`
    '''
    temp = []
    for name in dir(obj):
        func = getattr(obj, name)
        if hasattr(func, '__call__'):
            temp.append(name)
    return temp

def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if name[0] != '_']

#
# Server which is run in a process controlled by a manager
#

class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # names of the methods which handle_request() is willing to call
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident '0' is a placeholder entry which is never counted or disposed
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever
        '''
        # RebuildProxy() looks this attribute up to detect the server process
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a true value makes every serve_client() loop terminate
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, runs the single request it sends (which must
        name a method listed in `public`), replies and closes the connection.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # best effort: tell the client why the reply could not be sent
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is itself shared: reply with a token
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not exposed or does not exist; a few
                    # names are still served through fallback_mapping
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # pseudo-methods served by serve_client() even when not exposed
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            keys = list(self.id_to_obj.keys())
            keys.sort()
            for ident in keys:
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately broad: whatever goes wrong is reported and
                # the process still exits through the `finally` below
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair `(ident, exposed)` where `exposed` is a tuple of
        method names.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no factory registered: share the object passed in
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`, disposing of it at zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)

#
# Class to represent state of a manager
#

class State(object):
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2

#
# Mapping from serializer name to Listener and Client types
#

listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }

#
# Definition of BaseManager
#

class BaseManager(object):
    '''
    Base class for managers
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # use the class-level hook, as _run_server() does, so that a
        # subclass overriding `_Server` is honoured on both code paths
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        # the connection is only a probe; close it like the other helpers do
        try:
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists after start(); a manager attached with
        # connect() has nothing to join
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join()/terminate() below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # give each manager class its own registry instead of mutating the
        # one inherited from its base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # drop the reference taken by Server.create() now that the
                # proxy holds its own; close the one-shot connection after
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)

#
# Subclass of set which get cleared after a fork
#

class ProcessLocalSet(set):
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        return type(self), ()

#
# Definition of BaseProxy
#

class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # one-shot connection, distinct from this thread's long-lived
            # one, used to drop the reference taken by Server.create()
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Take a reference on the referent and register the matching decref
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release this process's reference to the object named by `token`
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"

#
# Function used for unpickling
#

def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    if server and server.address == token.address:
        # we are inside the manager's own server process: hand back the
        # real object rather than a proxy to ourselves
        return server.id_to_obj[token.id][0]
    else:
        incref = (
            kwds.pop('incref', True) and
            not getattr(current_process(), '_inheriting', False)
            )
        return func(token, serializer, incref=incref, **kwds)

#
# Functions to create proxies and proxy types
#

def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` (a deliberately shared default argument),
    keyed by `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    # generate one forwarding method per exposed name
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType


def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the list of methods is fetched from the manager
    at `token.address`.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* talking to the manager, so that the
    # `get_methods` request below is made with the same key the proxy
    # itself will use instead of a possibly-None argument.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # BaseProxy.__reduce__() checks this flag to pickle as an AutoProxy
    proxy._isauto = True
    return proxy

#
# Types/callables which we will register with SyncManager
#

class Namespace(object):
    '''
    Simple attribute container; `repr()` lists the non-underscore attributes
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)
    def __repr__(self):
        items = list(self.__dict__.items())
        temp = []
        for name, value in items:
            if not name.startswith('_'):
                temp.append('%s=%r' % (name, value))
        temp.sort()
        return 'Namespace(%s)' % str.join(', ', temp)

class Value(object):
    '''
    Holder for a single value, readable and writable through `value`
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode = typecode
        self._value = value
    def get(self):
        return self._value
    def set(self, value):
        self._value = value
    def __repr__(self):
        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    value = property(get, set)

def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but unused
    '''
    return array.array(typecode, sequence)

#
# Proxy types used by SyncManager
#

class IteratorProxy(BaseProxy):
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)


class AcquirerProxy(BaseProxy):
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        '''
        Call `acquire` on the referent and return its result

        `timeout` is forwarded only when given, so callers which omit it
        send exactly the same request as before.
        '''
        args = (blocking,) if timeout is None else (blocking, timeout)
        return self._callmethod('acquire', args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')


class ConditionProxy(AcquirerProxy):
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self):
        return self._callmethod('notify')
    def notify_all(self):
        return self._callmethod('notify_all')

class EventProxy(BaseProxy):
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

class NamespaceProxy(BaseProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    # Names starting with '_' belong to the proxy itself; every other
    # attribute access is forwarded to the referent.
    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))


class ValueProxy(BaseProxy):
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    value = property(get, set)


BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    # the in-place operators must return the proxy, not the remote result
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self


# NOTE(review): 'has_key' is not a method of the Python 3 `dict` -- confirm
# whether it should still be exposed here.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))


ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0


PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# results of these methods are returned as proxies of the given typeid
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }

#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)