1# Copyright 2011-present MongoDB, Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you 4# may not use this file except in compliance with the License. You 5# may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 12# implied. See the License for the specific language governing 13# permissions and limitations under the License. 14 15import contextlib 16import copy 17import os 18import platform 19import socket 20import sys 21import threading 22import collections 23import weakref 24 25from pymongo.ssl_support import ( 26 SSLError as _SSLError, 27 HAS_SNI as _HAVE_SNI, 28 IPADDR_SAFE as _IPADDR_SAFE) 29 30from bson import DEFAULT_CODEC_OPTIONS 31from bson.py3compat import imap, itervalues, _unicode 32from bson.son import SON 33from pymongo import auth, helpers, thread_util, __version__ 34from pymongo.client_session import _validate_session_write_concern 35from pymongo.common import (MAX_BSON_SIZE, 36 MAX_IDLE_TIME_SEC, 37 MAX_MESSAGE_SIZE, 38 MAX_POOL_SIZE, 39 MAX_WIRE_VERSION, 40 MAX_WRITE_BATCH_SIZE, 41 MIN_POOL_SIZE, 42 ORDERED_TYPES, 43 WAIT_QUEUE_TIMEOUT) 44from pymongo.errors import (AutoReconnect, 45 CertificateError, 46 ConnectionFailure, 47 ConfigurationError, 48 InvalidOperation, 49 DocumentTooLarge, 50 NetworkTimeout, 51 NotPrimaryError, 52 OperationFailure, 53 PyMongoError) 54from pymongo.hello_compat import HelloCompat 55from pymongo._ipaddress import is_ip_address 56from pymongo.ismaster import IsMaster 57from pymongo.monotonic import time as _time 58from pymongo.monitoring import (ConnectionCheckOutFailedReason, 59 ConnectionClosedReason) 60from pymongo.network import (command, 61 receive_message) 62from pymongo.read_preferences import ReadPreference 63from pymongo.server_api import _add_to_command 64from pymongo.server_type import SERVER_TYPE 65from pymongo.socket_checker import SocketChecker 66# Always use our backport so we always have support for IP address matching 67from pymongo.ssl_match_hostname import match_hostname 68 69try: 70 from fcntl import fcntl, F_GETFD, F_SETFD, FD_CLOEXEC 71 def _set_non_inheritable_non_atomic(fd): 72 """Set the close-on-exec flag on the given file descriptor.""" 73 flags = fcntl(fd, F_GETFD) 74 fcntl(fd, F_SETFD, flags | FD_CLOEXEC) 75except ImportError: 76 # Windows, various platforms we don't claim to support 77 # (Jython, IronPython, ...), systems that don't provide 78 # everything we need from fcntl, etc. 79 def _set_non_inheritable_non_atomic(dummy): 80 """Dummy function for platforms that don't provide fcntl.""" 81 pass 82 83_MAX_TCP_KEEPIDLE = 120 84_MAX_TCP_KEEPINTVL = 10 85_MAX_TCP_KEEPCNT = 9 86 87if sys.platform == 'win32': 88 try: 89 import _winreg as winreg 90 except ImportError: 91 import winreg 92 93 def _query(key, name, default): 94 try: 95 value, _ = winreg.QueryValueEx(key, name) 96 # Ensure the value is a number or raise ValueError. 97 return int(value) 98 except (OSError, ValueError): 99 # QueryValueEx raises OSError when the key does not exist (i.e. 100 # the system is using the Windows default value). 101 return default 102 103 try: 104 with winreg.OpenKey( 105 winreg.HKEY_LOCAL_MACHINE, 106 r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters") as key: 107 _WINDOWS_TCP_IDLE_MS = _query(key, "KeepAliveTime", 7200000) 108 _WINDOWS_TCP_INTERVAL_MS = _query(key, "KeepAliveInterval", 1000) 109 except OSError: 110 # We could not check the default values because winreg.OpenKey failed. 111 # Assume the system is using the default values. 112 _WINDOWS_TCP_IDLE_MS = 7200000 113 _WINDOWS_TCP_INTERVAL_MS = 1000 114 115 def _set_keepalive_times(sock): 116 idle_ms = min(_WINDOWS_TCP_IDLE_MS, _MAX_TCP_KEEPIDLE * 1000) 117 interval_ms = min(_WINDOWS_TCP_INTERVAL_MS, 118 _MAX_TCP_KEEPINTVL * 1000) 119 if (idle_ms < _WINDOWS_TCP_IDLE_MS or 120 interval_ms < _WINDOWS_TCP_INTERVAL_MS): 121 sock.ioctl(socket.SIO_KEEPALIVE_VALS, 122 (1, idle_ms, interval_ms)) 123else: 124 def _set_tcp_option(sock, tcp_option, max_value): 125 if hasattr(socket, tcp_option): 126 sockopt = getattr(socket, tcp_option) 127 try: 128 # PYTHON-1350 - NetBSD doesn't implement getsockopt for 129 # TCP_KEEPIDLE and friends. Don't attempt to set the 130 # values there. 131 default = sock.getsockopt(socket.IPPROTO_TCP, sockopt) 132 if default > max_value: 133 sock.setsockopt(socket.IPPROTO_TCP, sockopt, max_value) 134 except socket.error: 135 pass 136 137 def _set_keepalive_times(sock): 138 _set_tcp_option(sock, 'TCP_KEEPIDLE', _MAX_TCP_KEEPIDLE) 139 _set_tcp_option(sock, 'TCP_KEEPINTVL', _MAX_TCP_KEEPINTVL) 140 _set_tcp_option(sock, 'TCP_KEEPCNT', _MAX_TCP_KEEPCNT) 141 142_METADATA = SON([ 143 ('driver', SON([('name', 'PyMongo'), ('version', __version__)])), 144]) 145 146if sys.platform.startswith('linux'): 147 # platform.linux_distribution was deprecated in Python 3.5. 148 if sys.version_info[:2] < (3, 5): 149 # Distro name and version (e.g. Ubuntu 16.04 xenial) 150 _name = ' '.join([part for part in 151 platform.linux_distribution() if part]) 152 else: 153 _name = platform.system() 154 _METADATA['os'] = SON([ 155 ('type', platform.system()), 156 ('name', _name), 157 ('architecture', platform.machine()), 158 # Kernel version (e.g. 4.4.0-17-generic). 159 ('version', platform.release()) 160 ]) 161elif sys.platform == 'darwin': 162 _METADATA['os'] = SON([ 163 ('type', platform.system()), 164 ('name', platform.system()), 165 ('architecture', platform.machine()), 166 # (mac|i|tv)OS(X) version (e.g. 10.11.6) instead of darwin 167 # kernel version. 168 ('version', platform.mac_ver()[0]) 169 ]) 170elif sys.platform == 'win32': 171 _METADATA['os'] = SON([ 172 ('type', platform.system()), 173 # "Windows XP", "Windows 7", "Windows 10", etc. 174 ('name', ' '.join((platform.system(), platform.release()))), 175 ('architecture', platform.machine()), 176 # Windows patch level (e.g. 5.1.2600-SP3) 177 ('version', '-'.join(platform.win32_ver()[1:3])) 178 ]) 179elif sys.platform.startswith('java'): 180 _name, _ver, _arch = platform.java_ver()[-1] 181 _METADATA['os'] = SON([ 182 # Linux, Windows 7, Mac OS X, etc. 183 ('type', _name), 184 ('name', _name), 185 # x86, x86_64, AMD64, etc. 186 ('architecture', _arch), 187 # Linux kernel version, OSX version, etc. 188 ('version', _ver) 189 ]) 190else: 191 # Get potential alias (e.g. SunOS 5.11 becomes Solaris 2.11) 192 _aliased = platform.system_alias( 193 platform.system(), platform.release(), platform.version()) 194 _METADATA['os'] = SON([ 195 ('type', platform.system()), 196 ('name', ' '.join([part for part in _aliased[:2] if part])), 197 ('architecture', platform.machine()), 198 ('version', _aliased[2]) 199 ]) 200 201if platform.python_implementation().startswith('PyPy'): 202 _METADATA['platform'] = ' '.join( 203 (platform.python_implementation(), 204 '.'.join(imap(str, sys.pypy_version_info)), 205 '(Python %s)' % '.'.join(imap(str, sys.version_info)))) 206elif sys.platform.startswith('java'): 207 _METADATA['platform'] = ' '.join( 208 (platform.python_implementation(), 209 '.'.join(imap(str, sys.version_info)), 210 '(%s)' % ' '.join((platform.system(), platform.release())))) 211else: 212 _METADATA['platform'] = ' '.join( 213 (platform.python_implementation(), 214 '.'.join(imap(str, sys.version_info)))) 215 216 217# If the first getaddrinfo call of this interpreter's life is on a thread, 218# while the main thread holds the import lock, getaddrinfo deadlocks trying 219# to import the IDNA codec. Import it here, where presumably we're on the 220# main thread, to avoid the deadlock. See PYTHON-607. 221u'foo'.encode('idna') 222 223# Remove after PYTHON-2712 224_MOCK_SERVICE_ID = False 225 226 227def _raise_connection_failure(address, error, msg_prefix=None): 228 """Convert a socket.error to ConnectionFailure and raise it.""" 229 host, port = address 230 # If connecting to a Unix socket, port will be None. 231 if port is not None: 232 msg = '%s:%d: %s' % (host, port, error) 233 else: 234 msg = '%s: %s' % (host, error) 235 if msg_prefix: 236 msg = msg_prefix + msg 237 if isinstance(error, socket.timeout): 238 raise NetworkTimeout(msg) 239 elif isinstance(error, _SSLError) and 'timed out' in str(error): 240 # CPython 2.7 and PyPy 2.x do not distinguish network 241 # timeouts from other SSLErrors (https://bugs.python.org/issue10272). 242 # Luckily, we can work around this limitation because the phrase 243 # 'timed out' appears in all the timeout related SSLErrors raised 244 # on the above platforms. 245 raise NetworkTimeout(msg) 246 else: 247 raise AutoReconnect(msg) 248 249 250class PoolOptions(object): 251 252 __slots__ = ('__max_pool_size', '__min_pool_size', 253 '__max_idle_time_seconds', 254 '__connect_timeout', '__socket_timeout', 255 '__wait_queue_timeout', '__wait_queue_multiple', 256 '__ssl_context', '__ssl_match_hostname', '__socket_keepalive', 257 '__event_listeners', '__appname', '__driver', '__metadata', 258 '__compression_settings', '__server_api', '__load_balanced') 259 260 def __init__(self, max_pool_size=MAX_POOL_SIZE, 261 min_pool_size=MIN_POOL_SIZE, 262 max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None, 263 socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT, 264 wait_queue_multiple=None, ssl_context=None, 265 ssl_match_hostname=True, socket_keepalive=True, 266 event_listeners=None, appname=None, driver=None, 267 compression_settings=None, server_api=None, 268 load_balanced=None): 269 self.__max_pool_size = max_pool_size 270 self.__min_pool_size = min_pool_size 271 self.__max_idle_time_seconds = max_idle_time_seconds 272 self.__connect_timeout = connect_timeout 273 self.__socket_timeout = socket_timeout 274 self.__wait_queue_timeout = wait_queue_timeout 275 self.__wait_queue_multiple = wait_queue_multiple 276 self.__ssl_context = ssl_context 277 self.__ssl_match_hostname = ssl_match_hostname 278 self.__socket_keepalive = socket_keepalive 279 self.__event_listeners = event_listeners 280 self.__appname = appname 281 self.__driver = driver 282 self.__compression_settings = compression_settings 283 self.__server_api = server_api 284 self.__load_balanced = load_balanced 285 self.__metadata = copy.deepcopy(_METADATA) 286 if appname: 287 self.__metadata['application'] = {'name': appname} 288 289 # Combine the "driver" MongoClient option with PyMongo's info, like: 290 # { 291 # 'driver': { 292 # 'name': 'PyMongo|MyDriver', 293 # 'version': '3.7.0|1.2.3', 294 # }, 295 # 'platform': 'CPython 3.6.0|MyPlatform' 296 # } 297 if driver: 298 if driver.name: 299 self.__metadata['driver']['name'] = "%s|%s" % ( 300 _METADATA['driver']['name'], driver.name) 301 if driver.version: 302 self.__metadata['driver']['version'] = "%s|%s" % ( 303 _METADATA['driver']['version'], driver.version) 304 if driver.platform: 305 self.__metadata['platform'] = "%s|%s" % ( 306 _METADATA['platform'], driver.platform) 307 308 @property 309 def non_default_options(self): 310 """The non-default options this pool was created with. 311 312 Added for CMAP's :class:`PoolCreatedEvent`. 313 """ 314 opts = {} 315 if self.__max_pool_size != MAX_POOL_SIZE: 316 opts['maxPoolSize'] = self.__max_pool_size 317 if self.__min_pool_size != MIN_POOL_SIZE: 318 opts['minPoolSize'] = self.__min_pool_size 319 if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC: 320 opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000 321 if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT: 322 opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000 323 return opts 324 325 @property 326 def max_pool_size(self): 327 """The maximum allowable number of concurrent connections to each 328 connected server. Requests to a server will block if there are 329 `maxPoolSize` outstanding connections to the requested server. 330 Defaults to 100. Cannot be 0. 331 332 When a server's pool has reached `max_pool_size`, operations for that 333 server block waiting for a socket to be returned to the pool. If 334 ``waitQueueTimeoutMS`` is set, a blocked operation will raise 335 :exc:`~pymongo.errors.ConnectionFailure` after a timeout. 336 By default ``waitQueueTimeoutMS`` is not set. 337 """ 338 return self.__max_pool_size 339 340 @property 341 def min_pool_size(self): 342 """The minimum required number of concurrent connections that the pool 343 will maintain to each connected server. Default is 0. 344 """ 345 return self.__min_pool_size 346 347 @property 348 def max_idle_time_seconds(self): 349 """The maximum number of seconds that a connection can remain 350 idle in the pool before being removed and replaced. Defaults to 351 `None` (no limit). 352 """ 353 return self.__max_idle_time_seconds 354 355 @property 356 def connect_timeout(self): 357 """How long a connection can take to be opened before timing out. 358 """ 359 return self.__connect_timeout 360 361 @property 362 def socket_timeout(self): 363 """How long a send or receive on a socket can take before timing out. 364 """ 365 return self.__socket_timeout 366 367 @property 368 def wait_queue_timeout(self): 369 """How long a thread will wait for a socket from the pool if the pool 370 has no free sockets. 371 """ 372 return self.__wait_queue_timeout 373 374 @property 375 def wait_queue_multiple(self): 376 """Multiplied by max_pool_size to give the number of threads allowed 377 to wait for a socket at one time. 378 """ 379 return self.__wait_queue_multiple 380 381 @property 382 def ssl_context(self): 383 """An SSLContext instance or None. 384 """ 385 return self.__ssl_context 386 387 @property 388 def ssl_match_hostname(self): 389 """Call ssl.match_hostname if cert_reqs is not ssl.CERT_NONE. 390 """ 391 return self.__ssl_match_hostname 392 393 @property 394 def socket_keepalive(self): 395 """Whether to send periodic messages to determine if a connection 396 is closed. 397 """ 398 return self.__socket_keepalive 399 400 @property 401 def event_listeners(self): 402 """An instance of pymongo.monitoring._EventListeners. 403 """ 404 return self.__event_listeners 405 406 @property 407 def appname(self): 408 """The application name, for sending with hello in server handshake. 409 """ 410 return self.__appname 411 412 @property 413 def driver(self): 414 """Driver name and version, for sending with hello in handshake. 415 """ 416 return self.__driver 417 418 @property 419 def compression_settings(self): 420 return self.__compression_settings 421 422 @property 423 def metadata(self): 424 """A dict of metadata about the application, driver, os, and platform. 425 """ 426 return self.__metadata.copy() 427 428 @property 429 def server_api(self): 430 """A pymongo.server_api.ServerApi or None. 431 """ 432 return self.__server_api 433 434 @property 435 def load_balanced(self): 436 """True if this Pool is configured in load balanced mode. 437 """ 438 return self.__load_balanced 439 440 441def _negotiate_creds(all_credentials): 442 """Return one credential that needs mechanism negotiation, if any. 443 """ 444 if all_credentials: 445 for creds in all_credentials.values(): 446 if creds.mechanism == 'DEFAULT' and creds.username: 447 return creds 448 return None 449 450 451def _speculative_context(all_credentials): 452 """Return the _AuthContext to use for speculative auth, if any. 453 """ 454 if all_credentials and len(all_credentials) == 1: 455 creds = next(itervalues(all_credentials)) 456 return auth._AuthContext.from_credentials(creds) 457 return None 458 459 460class _CancellationContext(object): 461 def __init__(self): 462 self._cancelled = False 463 464 def cancel(self): 465 """Cancel this context.""" 466 self._cancelled = True 467 468 @property 469 def cancelled(self): 470 """Was cancel called?""" 471 return self._cancelled 472 473 474class SocketInfo(object): 475 """Store a socket with some metadata. 476 477 :Parameters: 478 - `sock`: a raw socket object 479 - `pool`: a Pool instance 480 - `address`: the server's (host, port) 481 - `id`: the id of this socket in it's pool 482 """ 483 def __init__(self, sock, pool, address, id): 484 self.pool_ref = weakref.ref(pool) 485 self.sock = sock 486 self.address = address 487 self.id = id 488 self.authset = set() 489 self.closed = False 490 self.last_checkin_time = _time() 491 self.performed_handshake = False 492 self.is_writable = False 493 self.max_wire_version = MAX_WIRE_VERSION 494 self.max_bson_size = MAX_BSON_SIZE 495 self.max_message_size = MAX_MESSAGE_SIZE 496 self.max_write_batch_size = MAX_WRITE_BATCH_SIZE 497 self.supports_sessions = False 498 self.hello_ok = None 499 self.is_mongos = False 500 self.op_msg_enabled = False 501 self.listeners = pool.opts.event_listeners 502 self.enabled_for_cmap = pool.enabled_for_cmap 503 self.compression_settings = pool.opts.compression_settings 504 self.compression_context = None 505 self.socket_checker = SocketChecker() 506 # Support for mechanism negotiation on the initial handshake. 507 # Maps credential to saslSupportedMechs. 508 self.negotiated_mechanisms = {} 509 self.auth_ctx = {} 510 511 # The pool's generation changes with each reset() so we can close 512 # sockets created before the last reset. 513 self.pool_gen = pool.gen 514 self.generation = self.pool_gen.get_overall() 515 self.ready = False 516 self.cancel_context = None 517 if not pool.handshake: 518 # This is a Monitor connection. 519 self.cancel_context = _CancellationContext() 520 self.opts = pool.opts 521 self.more_to_come = False 522 # For load balancer support. 523 self.service_id = None 524 # When executing a transaction in load balancing mode, this flag is 525 # set to true to indicate that the session now owns the connection. 526 self.pinned_txn = False 527 self.pinned_cursor = False 528 self.active = False 529 530 def pin_txn(self): 531 self.pinned_txn = True 532 assert not self.pinned_cursor 533 534 def pin_cursor(self): 535 self.pinned_cursor = True 536 assert not self.pinned_txn 537 538 def unpin(self): 539 pool = self.pool_ref() 540 if pool: 541 pool.return_socket(self) 542 else: 543 self.close_socket(ConnectionClosedReason.STALE) 544 545 def hello_cmd(self): 546 if self.opts.server_api or self.hello_ok: 547 return SON([(HelloCompat.CMD, 1)]) 548 else: 549 return SON([(HelloCompat.LEGACY_CMD, 1), ('helloOk', True)]) 550 551 def hello(self, all_credentials=None): 552 return self._hello(None, None, None, all_credentials) 553 554 def _hello(self, cluster_time, topology_version, 555 heartbeat_frequency, all_credentials): 556 cmd = self.hello_cmd() 557 performing_handshake = not self.performed_handshake 558 awaitable = False 559 if performing_handshake: 560 self.performed_handshake = True 561 cmd['client'] = self.opts.metadata 562 if self.compression_settings: 563 cmd['compression'] = self.compression_settings.compressors 564 if self.opts.load_balanced: 565 cmd['loadBalanced'] = True 566 elif topology_version is not None: 567 cmd['topologyVersion'] = topology_version 568 cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000) 569 awaitable = True 570 # If connect_timeout is None there is no timeout. 571 if self.opts.connect_timeout: 572 self.sock.settimeout( 573 self.opts.connect_timeout + heartbeat_frequency) 574 575 if self.max_wire_version >= 6 and cluster_time is not None: 576 cmd['$clusterTime'] = cluster_time 577 578 # XXX: Simplify in PyMongo 4.0 when all_credentials is always a single 579 # unchangeable value per MongoClient. 580 creds = _negotiate_creds(all_credentials) 581 if creds: 582 cmd['saslSupportedMechs'] = creds.source + '.' + creds.username 583 auth_ctx = _speculative_context(all_credentials) 584 if auth_ctx: 585 cmd['speculativeAuthenticate'] = auth_ctx.speculate_command() 586 587 doc = self.command('admin', cmd, publish_events=False, 588 exhaust_allowed=awaitable) 589 # PYTHON-2712 will remove this topologyVersion fallback logic. 590 if self.opts.load_balanced and _MOCK_SERVICE_ID: 591 process_id = doc.get('topologyVersion', {}).get('processId') 592 doc.setdefault('serviceId', process_id) 593 if not self.opts.load_balanced: 594 doc.pop('serviceId', None) 595 hello = IsMaster(doc, awaitable=awaitable) 596 self.is_writable = hello.is_writable 597 self.max_wire_version = hello.max_wire_version 598 self.max_bson_size = hello.max_bson_size 599 self.max_message_size = hello.max_message_size 600 self.max_write_batch_size = hello.max_write_batch_size 601 self.supports_sessions = ( 602 hello.logical_session_timeout_minutes is not None) 603 self.hello_ok = hello.hello_ok 604 self.is_mongos = hello.server_type == SERVER_TYPE.Mongos 605 if performing_handshake and self.compression_settings: 606 ctx = self.compression_settings.get_compression_context( 607 hello.compressors) 608 self.compression_context = ctx 609 610 self.op_msg_enabled = hello.max_wire_version >= 6 611 if creds: 612 self.negotiated_mechanisms[creds] = hello.sasl_supported_mechs 613 if auth_ctx: 614 auth_ctx.parse_response(hello) 615 if auth_ctx.speculate_succeeded(): 616 self.auth_ctx[auth_ctx.credentials] = auth_ctx 617 if self.opts.load_balanced: 618 if not hello.service_id: 619 raise ConfigurationError( 620 'Driver attempted to initialize in load balancing mode,' 621 ' but the server does not support this mode') 622 self.service_id = hello.service_id 623 self.generation = self.pool_gen.get(self.service_id) 624 return hello 625 626 def _next_reply(self): 627 reply = self.receive_message(None) 628 self.more_to_come = reply.more_to_come 629 unpacked_docs = reply.unpack_response() 630 response_doc = unpacked_docs[0] 631 helpers._check_command_response(response_doc, self.max_wire_version) 632 # Remove after PYTHON-2712. 633 if not self.opts.load_balanced: 634 response_doc.pop('serviceId', None) 635 return response_doc 636 637 def command(self, dbname, spec, secondary_ok=False, 638 read_preference=ReadPreference.PRIMARY, 639 codec_options=DEFAULT_CODEC_OPTIONS, check=True, 640 allowable_errors=None, check_keys=False, 641 read_concern=None, 642 write_concern=None, 643 parse_write_concern_error=False, 644 collation=None, 645 session=None, 646 client=None, 647 retryable_write=False, 648 publish_events=True, 649 user_fields=None, 650 exhaust_allowed=False): 651 """Execute a command or raise an error. 652 653 :Parameters: 654 - `dbname`: name of the database on which to run the command 655 - `spec`: a command document as a dict, SON, or mapping object 656 - `secondary_ok`: whether to set the secondaryOkay wire protocol bit 657 - `read_preference`: a read preference 658 - `codec_options`: a CodecOptions instance 659 - `check`: raise OperationFailure if there are errors 660 - `allowable_errors`: errors to ignore if `check` is True 661 - `check_keys`: if True, check `spec` for invalid keys 662 - `read_concern`: The read concern for this command. 663 - `write_concern`: The write concern for this command. 664 - `parse_write_concern_error`: Whether to parse the 665 ``writeConcernError`` field in the command response. 666 - `collation`: The collation for this command. 667 - `session`: optional ClientSession instance. 668 - `client`: optional MongoClient for gossipping $clusterTime. 669 - `retryable_write`: True if this command is a retryable write. 670 - `publish_events`: Should we publish events for this command? 671 - `user_fields` (optional): Response fields that should be decoded 672 using the TypeDecoders from codec_options, passed to 673 bson._decode_all_selective. 674 """ 675 self.validate_session(client, session) 676 session = _validate_session_write_concern(session, write_concern) 677 678 # Ensure command name remains in first place. 679 if not isinstance(spec, ORDERED_TYPES): 680 spec = SON(spec) 681 682 if (read_concern and self.max_wire_version < 4 683 and not read_concern.ok_for_legacy): 684 raise ConfigurationError( 685 'read concern level of %s is not valid ' 686 'with a max wire version of %d.' 687 % (read_concern.level, self.max_wire_version)) 688 if not (write_concern is None or write_concern.acknowledged or 689 collation is None): 690 raise ConfigurationError( 691 'Collation is unsupported for unacknowledged writes.') 692 if (self.max_wire_version >= 5 and 693 write_concern and 694 not write_concern.is_server_default): 695 spec['writeConcern'] = write_concern.document 696 elif self.max_wire_version < 5 and collation is not None: 697 raise ConfigurationError( 698 'Must be connected to MongoDB 3.4+ to use a collation.') 699 700 self.add_server_api(spec) 701 if session: 702 session._apply_to(spec, retryable_write, read_preference, 703 self) 704 self.send_cluster_time(spec, session, client) 705 listeners = self.listeners if publish_events else None 706 unacknowledged = write_concern and not write_concern.acknowledged 707 if self.op_msg_enabled: 708 self._raise_if_not_writable(unacknowledged) 709 try: 710 return command(self, dbname, spec, secondary_ok, 711 self.is_mongos, read_preference, codec_options, 712 session, client, check, allowable_errors, 713 self.address, check_keys, listeners, 714 self.max_bson_size, read_concern, 715 parse_write_concern_error=parse_write_concern_error, 716 collation=collation, 717 compression_ctx=self.compression_context, 718 use_op_msg=self.op_msg_enabled, 719 unacknowledged=unacknowledged, 720 user_fields=user_fields, 721 exhaust_allowed=exhaust_allowed) 722 except (OperationFailure, NotPrimaryError): 723 raise 724 # Catch socket.error, KeyboardInterrupt, etc. and close ourselves. 725 except BaseException as error: 726 self._raise_connection_failure(error) 727 728 def send_message(self, message, max_doc_size): 729 """Send a raw BSON message or raise ConnectionFailure. 730 731 If a network exception is raised, the socket is closed. 732 """ 733 if (self.max_bson_size is not None 734 and max_doc_size > self.max_bson_size): 735 raise DocumentTooLarge( 736 "BSON document too large (%d bytes) - the connected server " 737 "supports BSON document sizes up to %d bytes." % 738 (max_doc_size, self.max_bson_size)) 739 740 try: 741 self.sock.sendall(message) 742 except BaseException as error: 743 self._raise_connection_failure(error) 744 745 def receive_message(self, request_id): 746 """Receive a raw BSON message or raise ConnectionFailure. 747 748 If any exception is raised, the socket is closed. 749 """ 750 try: 751 return receive_message(self, request_id, self.max_message_size) 752 except BaseException as error: 753 self._raise_connection_failure(error) 754 755 def _raise_if_not_writable(self, unacknowledged): 756 """Raise NotPrimaryError on unacknowledged write if this socket is not 757 writable. 758 """ 759 if unacknowledged and not self.is_writable: 760 # Write won't succeed, bail as if we'd received a not primary error. 761 raise NotPrimaryError("not primary", { 762 "ok": 0, "errmsg": "not primary", "code": 10107}) 763 764 def legacy_write(self, request_id, msg, max_doc_size, with_last_error): 765 """Send OP_INSERT, etc., optionally returning response as a dict. 766 767 Can raise ConnectionFailure or OperationFailure. 768 769 :Parameters: 770 - `request_id`: an int. 771 - `msg`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message, 772 perhaps with a getlasterror command appended. 773 - `max_doc_size`: size in bytes of the largest document in `msg`. 774 - `with_last_error`: True if a getlasterror command is appended. 775 """ 776 self._raise_if_not_writable(not with_last_error) 777 778 self.send_message(msg, max_doc_size) 779 if with_last_error: 780 reply = self.receive_message(request_id) 781 return helpers._check_gle_response(reply.command_response(), 782 self.max_wire_version) 783 784 def write_command(self, request_id, msg): 785 """Send "insert" etc. command, returning response as a dict. 786 787 Can raise ConnectionFailure or OperationFailure. 788 789 :Parameters: 790 - `request_id`: an int. 791 - `msg`: bytes, the command message. 792 """ 793 self.send_message(msg, 0) 794 reply = self.receive_message(request_id) 795 result = reply.command_response() 796 797 # Raises NotPrimaryError or OperationFailure. 798 helpers._check_command_response(result, self.max_wire_version) 799 return result 800 801 def check_auth(self, all_credentials): 802 """Update this socket's authentication. 803 804 Log in or out to bring this socket's credentials up to date with 805 those provided. Can raise ConnectionFailure or OperationFailure. 806 807 :Parameters: 808 - `all_credentials`: dict, maps auth source to MongoCredential. 809 """ 810 if all_credentials or self.authset: 811 cached = set(itervalues(all_credentials)) 812 authset = self.authset.copy() 813 814 # Logout any credentials that no longer exist in the cache. 815 for credentials in authset - cached: 816 auth.logout(credentials.source, self) 817 self.authset.discard(credentials) 818 819 for credentials in cached - authset: 820 self.authenticate(credentials) 821 822 # CMAP spec says to publish the ready event only after authenticating 823 # the connection. 824 if not self.ready: 825 self.ready = True 826 if self.enabled_for_cmap: 827 self.listeners.publish_connection_ready(self.address, self.id) 828 829 def authenticate(self, credentials): 830 """Log in to the server and store these credentials in `authset`. 831 832 Can raise ConnectionFailure or OperationFailure. 833 834 :Parameters: 835 - `credentials`: A MongoCredential. 836 """ 837 auth.authenticate(credentials, self) 838 self.authset.add(credentials) 839 # negotiated_mechanisms are no longer needed. 840 self.negotiated_mechanisms.pop(credentials, None) 841 self.auth_ctx.pop(credentials, None) 842 843 def validate_session(self, client, session): 844 """Validate this session before use with client. 845 846 Raises error if this session is logged in as a different user or 847 the client is not the one that created the session. 848 """ 849 if session: 850 if session._client is not client: 851 raise InvalidOperation( 852 'Can only use session with the MongoClient that' 853 ' started it') 854 if session._authset != self.authset: 855 raise InvalidOperation( 856 'Cannot use session after authenticating with different' 857 ' credentials') 858 859 def close_socket(self, reason): 860 """Close this connection with a reason.""" 861 if self.closed: 862 return 863 self._close_socket() 864 if reason and self.enabled_for_cmap: 865 self.listeners.publish_connection_closed( 866 self.address, self.id, reason) 867 868 def _close_socket(self): 869 """Close this connection.""" 870 if self.closed: 871 return 872 self.closed = True 873 if self.cancel_context: 874 self.cancel_context.cancel() 875 # Note: We catch exceptions to avoid spurious errors on interpreter 876 # shutdown. 877 try: 878 self.sock.close() 879 except Exception: 880 pass 881 882 def socket_closed(self): 883 """Return True if we know socket has been closed, False otherwise.""" 884 return self.socket_checker.socket_closed(self.sock) 885 886 def send_cluster_time(self, command, session, client): 887 """Add cluster time for MongoDB >= 3.6.""" 888 if self.max_wire_version >= 6 and client: 889 client._send_cluster_time(command, session) 890 891 def add_server_api(self, command): 892 """Add server_api parameters.""" 893 if self.opts.server_api: 894 _add_to_command(command, self.opts.server_api) 895 896 def update_last_checkin_time(self): 897 self.last_checkin_time = _time() 898 899 def update_is_writable(self, is_writable): 900 self.is_writable = is_writable 901 902 def idle_time_seconds(self): 903 """Seconds since this socket was last checked into its pool.""" 904 return _time() - self.last_checkin_time 905 906 def _raise_connection_failure(self, error): 907 # Catch *all* exceptions from socket methods and close the socket. In 908 # regular Python, socket operations only raise socket.error, even if 909 # the underlying cause was a Ctrl-C: a signal raised during socket.recv 910 # is expressed as an EINTR error from poll. See internal_select_ex() in 911 # socketmodule.c. All error codes from poll become socket.error at 912 # first. Eventually in PyEval_EvalFrameEx the interpreter checks for 913 # signals and throws KeyboardInterrupt into the current frame on the 914 # main thread. 915 # 916 # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue, 917 # ...) is called in Python code, which experiences the signal as a 918 # KeyboardInterrupt from the start, rather than as an initial 919 # socket.error, so we catch that, close the socket, and reraise it. 920 # 921 # The connection closed event will be emitted later in return_socket. 922 if self.ready: 923 reason = None 924 else: 925 reason = ConnectionClosedReason.ERROR 926 self.close_socket(reason) 927 # SSLError from PyOpenSSL inherits directly from Exception. 928 if isinstance(error, (IOError, OSError, _SSLError)): 929 _raise_connection_failure(self.address, error) 930 else: 931 raise 932 933 def __eq__(self, other): 934 return self.sock == other.sock 935 936 def __ne__(self, other): 937 return not self == other 938 939 def __hash__(self): 940 return hash(self.sock) 941 942 def __repr__(self): 943 return "SocketInfo(%s)%s at %s" % ( 944 repr(self.sock), 945 self.closed and " CLOSED" or "", 946 id(self) 947 ) 948 949 950def _create_connection(address, options): 951 """Given (host, port) and PoolOptions, connect and return a socket object. 952 953 Can raise socket.error. 954 955 This is a modified version of create_connection from CPython >= 2.7. 956 """ 957 host, port = address 958 959 # Check if dealing with a unix domain socket 960 if host.endswith('.sock'): 961 if not hasattr(socket, "AF_UNIX"): 962 raise ConnectionFailure("UNIX-sockets are not supported " 963 "on this system") 964 sock = socket.socket(socket.AF_UNIX) 965 # SOCK_CLOEXEC not supported for Unix sockets. 966 _set_non_inheritable_non_atomic(sock.fileno()) 967 try: 968 sock.connect(host) 969 return sock 970 except socket.error: 971 sock.close() 972 raise 973 974 # Don't try IPv6 if we don't support it. Also skip it if host 975 # is 'localhost' (::1 is fine). Avoids slow connect issues 976 # like PYTHON-356. 977 family = socket.AF_INET 978 if socket.has_ipv6 and host != 'localhost': 979 family = socket.AF_UNSPEC 980 981 err = None 982 for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM): 983 af, socktype, proto, dummy, sa = res 984 # SOCK_CLOEXEC was new in CPython 3.2, and only available on a limited 985 # number of platforms (newer Linux and *BSD). Starting with CPython 3.4 986 # all file descriptors are created non-inheritable. See PEP 446. 987 try: 988 sock = socket.socket( 989 af, socktype | getattr(socket, 'SOCK_CLOEXEC', 0), proto) 990 except socket.error: 991 # Can SOCK_CLOEXEC be defined even if the kernel doesn't support 992 # it? 993 sock = socket.socket(af, socktype, proto) 994 # Fallback when SOCK_CLOEXEC isn't available. 995 _set_non_inheritable_non_atomic(sock.fileno()) 996 try: 997 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 998 sock.settimeout(options.connect_timeout) 999 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1000 options.socket_keepalive) 1001 if options.socket_keepalive: 1002 _set_keepalive_times(sock) 1003 sock.connect(sa) 1004 return sock 1005 except socket.error as e: 1006 err = e 1007 sock.close() 1008 1009 if err is not None: 1010 raise err 1011 else: 1012 # This likely means we tried to connect to an IPv6 only 1013 # host with an OS/kernel or Python interpreter that doesn't 1014 # support IPv6. The test case is Jython2.5.1 which doesn't 1015 # support IPv6 at all. 1016 raise socket.error('getaddrinfo failed') 1017 1018 1019def _configured_socket(address, options): 1020 """Given (host, port) and PoolOptions, return a configured socket. 1021 1022 Can raise socket.error, ConnectionFailure, or CertificateError. 1023 1024 Sets socket's SSL and timeout options. 1025 """ 1026 sock = _create_connection(address, options) 1027 ssl_context = options.ssl_context 1028 1029 if ssl_context is not None: 1030 host = address[0] 1031 try: 1032 # According to RFC6066, section 3, IPv4 and IPv6 literals are 1033 # not permitted for SNI hostname. 1034 # Previous to Python 3.7 wrap_socket would blindly pass 1035 # IP addresses as SNI hostname. 1036 # https://bugs.python.org/issue32185 1037 # We have to pass hostname / ip address to wrap_socket 1038 # to use SSLContext.check_hostname. 1039 if _HAVE_SNI and (not is_ip_address(host) or _IPADDR_SAFE): 1040 sock = ssl_context.wrap_socket(sock, server_hostname=host) 1041 else: 1042 sock = ssl_context.wrap_socket(sock) 1043 except CertificateError: 1044 sock.close() 1045 # Raise CertificateError directly like we do after match_hostname 1046 # below. 1047 raise 1048 except (IOError, OSError, _SSLError) as exc: 1049 sock.close() 1050 # We raise AutoReconnect for transient and permanent SSL handshake 1051 # failures alike. Permanent handshake failures, like protocol 1052 # mismatch, will be turned into ServerSelectionTimeoutErrors later. 1053 _raise_connection_failure(address, exc, "SSL handshake failed: ") 1054 if (ssl_context.verify_mode and not 1055 getattr(ssl_context, "check_hostname", False) and 1056 options.ssl_match_hostname): 1057 try: 1058 match_hostname(sock.getpeercert(), hostname=host) 1059 except CertificateError: 1060 sock.close() 1061 raise 1062 1063 sock.settimeout(options.socket_timeout) 1064 return sock 1065 1066 1067class _PoolClosedError(PyMongoError): 1068 """Internal error raised when a thread tries to get a connection from a 1069 closed pool. 1070 """ 1071 pass 1072 1073 1074class _PoolGeneration(object): 1075 def __init__(self): 1076 # Maps service_id to generation. 1077 self._generations = collections.defaultdict(int) 1078 # Overall pool generation. 1079 self._generation = 0 1080 1081 def get(self, service_id): 1082 """Get the generation for the given service_id.""" 1083 if service_id is None: 1084 return self._generation 1085 return self._generations[service_id] 1086 1087 def get_overall(self): 1088 """Get the Pool's overall generation.""" 1089 return self._generation 1090 1091 def inc(self, service_id): 1092 """Increment the generation for the given service_id.""" 1093 self._generation += 1 1094 if service_id is None: 1095 for service_id in self._generations: 1096 self._generations[service_id] += 1 1097 else: 1098 self._generations[service_id] += 1 1099 1100 def stale(self, gen, service_id): 1101 """Return if the given generation for a given service_id is stale.""" 1102 return gen != self.get(service_id) 1103 1104 1105class PoolState(object): 1106 PAUSED = 1 1107 READY = 2 1108 CLOSED = 3 1109 1110 1111# Do *not* explicitly inherit from object or Jython won't call __del__ 1112# http://bugs.jython.org/issue1057 1113class Pool: 1114 def __init__(self, address, options, handshake=True): 1115 """ 1116 :Parameters: 1117 - `address`: a (hostname, port) tuple 1118 - `options`: a PoolOptions instance 1119 - `handshake`: whether to call hello for each new SocketInfo 1120 """ 1121 # Check a socket's health with socket_closed() every once in a while. 1122 # Can override for testing: 0 to always check, None to never check. 1123 self._check_interval_seconds = 1 1124 # LIFO pool. Sockets are ordered on idle time. Sockets claimed 1125 # and returned to pool from the left side. Stale sockets removed 1126 # from the right side. 1127 self.sockets = collections.deque() 1128 self.lock = threading.Lock() 1129 self.active_sockets = 0 1130 # Monotonically increasing connection ID required for CMAP Events. 1131 self.next_connection_id = 1 1132 self.closed = False 1133 # Track whether the sockets in this pool are writeable or not. 1134 self.is_writable = None 1135 1136 # Keep track of resets, so we notice sockets created before the most 1137 # recent reset and close them. 1138 # self.generation = 0 1139 self.gen = _PoolGeneration() 1140 self.pid = os.getpid() 1141 self.address = address 1142 self.opts = options 1143 self.handshake = handshake 1144 # Don't publish events in Monitor pools. 1145 self.enabled_for_cmap = ( 1146 self.handshake and 1147 self.opts.event_listeners is not None and 1148 self.opts.event_listeners.enabled_for_cmap) 1149 1150 if (self.opts.wait_queue_multiple is None or 1151 self.opts.max_pool_size is None): 1152 max_waiters = None 1153 else: 1154 max_waiters = ( 1155 self.opts.max_pool_size * self.opts.wait_queue_multiple) 1156 1157 self._socket_semaphore = thread_util.create_semaphore( 1158 self.opts.max_pool_size, max_waiters) 1159 if self.enabled_for_cmap: 1160 self.opts.event_listeners.publish_pool_created( 1161 self.address, self.opts.non_default_options) 1162 # Retain references to pinned connections to prevent the CPython GC 1163 # from thinking that a cursor's pinned connection can be GC'd when the 1164 # cursor is GC'd (see PYTHON-2751). 1165 self.__pinned_sockets = set() 1166 self.ncursors = 0 1167 self.ntxns = 0 1168 1169 def _reset(self, close, service_id=None): 1170 with self.lock: 1171 if self.closed: 1172 return 1173 self.gen.inc(service_id) 1174 newpid = os.getpid() 1175 if self.pid != newpid: 1176 self.pid = newpid 1177 self.active_sockets = 0 1178 if service_id is None: 1179 sockets, self.sockets = self.sockets, collections.deque() 1180 else: 1181 discard = collections.deque() 1182 keep = collections.deque() 1183 for sock_info in self.sockets: 1184 if sock_info.service_id == service_id: 1185 discard.append(sock_info) 1186 else: 1187 keep.append(sock_info) 1188 sockets = discard 1189 self.sockets = keep 1190 1191 if close: 1192 self.closed = True 1193 1194 listeners = self.opts.event_listeners 1195 # CMAP spec says that close() MUST close sockets before publishing the 1196 # PoolClosedEvent but that reset() SHOULD close sockets *after* 1197 # publishing the PoolClearedEvent. 1198 if close: 1199 for sock_info in sockets: 1200 sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED) 1201 if self.enabled_for_cmap: 1202 listeners.publish_pool_closed(self.address) 1203 else: 1204 if self.enabled_for_cmap: 1205 listeners.publish_pool_cleared(self.address, 1206 service_id=service_id) 1207 for sock_info in sockets: 1208 sock_info.close_socket(ConnectionClosedReason.STALE) 1209 1210 def update_is_writable(self, is_writable): 1211 """Updates the is_writable attribute on all sockets currently in the 1212 Pool. 1213 """ 1214 self.is_writable = is_writable 1215 with self.lock: 1216 for socket in self.sockets: 1217 socket.update_is_writable(self.is_writable) 1218 1219 def reset(self, service_id=None): 1220 self._reset(close=False, service_id=service_id) 1221 1222 def close(self): 1223 self._reset(close=True) 1224 1225 def stale_generation(self, gen, service_id): 1226 return self.gen.stale(gen, service_id) 1227 1228 def remove_stale_sockets(self, reference_generation, all_credentials): 1229 """Removes stale sockets then adds new ones if pool is too small and 1230 has not been reset. The `reference_generation` argument specifies the 1231 `generation` at the point in time this operation was requested on the 1232 pool. 1233 """ 1234 if self.opts.max_idle_time_seconds is not None: 1235 with self.lock: 1236 while (self.sockets and 1237 self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds): 1238 sock_info = self.sockets.pop() 1239 sock_info.close_socket(ConnectionClosedReason.IDLE) 1240 1241 while True: 1242 with self.lock: 1243 if (len(self.sockets) + self.active_sockets >= 1244 self.opts.min_pool_size): 1245 # There are enough sockets in the pool. 1246 break 1247 1248 # We must acquire the semaphore to respect max_pool_size. 1249 if not self._socket_semaphore.acquire(False): 1250 break 1251 try: 1252 sock_info = self.connect(all_credentials) 1253 with self.lock: 1254 # Close connection and return if the pool was reset during 1255 # socket creation or while acquiring the pool lock. 1256 if self.gen.get_overall() != reference_generation: 1257 sock_info.close_socket(ConnectionClosedReason.STALE) 1258 break 1259 self.sockets.appendleft(sock_info) 1260 finally: 1261 self._socket_semaphore.release() 1262 1263 def connect(self, all_credentials=None): 1264 """Connect to Mongo and return a new SocketInfo. 1265 1266 Can raise ConnectionFailure or CertificateError. 1267 1268 Note that the pool does not keep a reference to the socket -- you 1269 must call return_socket() when you're done with it. 1270 """ 1271 with self.lock: 1272 conn_id = self.next_connection_id 1273 self.next_connection_id += 1 1274 1275 listeners = self.opts.event_listeners 1276 if self.enabled_for_cmap: 1277 listeners.publish_connection_created(self.address, conn_id) 1278 1279 try: 1280 sock = _configured_socket(self.address, self.opts) 1281 except BaseException as error: 1282 if self.enabled_for_cmap: 1283 listeners.publish_connection_closed( 1284 self.address, conn_id, ConnectionClosedReason.ERROR) 1285 1286 if isinstance(error, (IOError, OSError, _SSLError)): 1287 _raise_connection_failure(self.address, error) 1288 1289 raise 1290 1291 sock_info = SocketInfo(sock, self, self.address, conn_id) 1292 try: 1293 if self.handshake: 1294 sock_info.hello(all_credentials) 1295 self.is_writable = sock_info.is_writable 1296 1297 sock_info.check_auth(all_credentials) 1298 except BaseException: 1299 sock_info.close_socket(ConnectionClosedReason.ERROR) 1300 raise 1301 1302 return sock_info 1303 1304 @contextlib.contextmanager 1305 def get_socket(self, all_credentials, handler=None): 1306 """Get a socket from the pool. Use with a "with" statement. 1307 1308 Returns a :class:`SocketInfo` object wrapping a connected 1309 :class:`socket.socket`. 1310 1311 This method should always be used in a with-statement:: 1312 1313 with pool.get_socket(credentials) as socket_info: 1314 socket_info.send_message(msg) 1315 data = socket_info.receive_message(op_code, request_id) 1316 1317 The socket is logged in or out as needed to match ``all_credentials`` 1318 using the correct authentication mechanism for the server's wire 1319 protocol version. 1320 1321 Can raise ConnectionFailure or OperationFailure. 1322 1323 :Parameters: 1324 - `all_credentials`: dict, maps auth source to MongoCredential. 1325 - `handler` (optional): A _MongoClientErrorHandler. 1326 """ 1327 listeners = self.opts.event_listeners 1328 if self.enabled_for_cmap: 1329 listeners.publish_connection_check_out_started(self.address) 1330 1331 sock_info = self._get_socket(all_credentials) 1332 if self.enabled_for_cmap: 1333 listeners.publish_connection_checked_out( 1334 self.address, sock_info.id) 1335 try: 1336 yield sock_info 1337 except: 1338 # Exception in caller. Ensure the connection gets returned. 1339 # Note that when pinned is True, the session owns the 1340 # connection and it is responsible for checking the connection 1341 # back into the pool. 1342 pinned = sock_info.pinned_txn or sock_info.pinned_cursor 1343 if handler: 1344 # Perform SDAM error handling rules while the connection is 1345 # still checked out. 1346 exc_type, exc_val, _ = sys.exc_info() 1347 handler.handle(exc_type, exc_val) 1348 if not pinned and sock_info.active: 1349 self.return_socket(sock_info) 1350 raise 1351 if sock_info.pinned_txn: 1352 with self.lock: 1353 self.__pinned_sockets.add(sock_info) 1354 self.ntxns += 1 1355 elif sock_info.pinned_cursor: 1356 with self.lock: 1357 self.__pinned_sockets.add(sock_info) 1358 self.ncursors += 1 1359 elif sock_info.active: 1360 self.return_socket(sock_info) 1361 1362 def _get_socket(self, all_credentials): 1363 """Get or create a SocketInfo. Can raise ConnectionFailure.""" 1364 # We use the pid here to avoid issues with fork / multiprocessing. 1365 # See test.test_client:TestClient.test_fork for an example of 1366 # what could go wrong otherwise 1367 if self.pid != os.getpid(): 1368 self.reset() 1369 1370 if self.closed: 1371 if self.enabled_for_cmap: 1372 self.opts.event_listeners.publish_connection_check_out_failed( 1373 self.address, ConnectionCheckOutFailedReason.POOL_CLOSED) 1374 raise _PoolClosedError( 1375 'Attempted to check out a connection from closed connection ' 1376 'pool') 1377 1378 # Get a free socket or create one. 1379 if not self._socket_semaphore.acquire( 1380 True, self.opts.wait_queue_timeout): 1381 self._raise_wait_queue_timeout() 1382 1383 # We've now acquired the semaphore and must release it on error. 1384 sock_info = None 1385 incremented = False 1386 try: 1387 with self.lock: 1388 self.active_sockets += 1 1389 incremented = True 1390 1391 while sock_info is None: 1392 try: 1393 with self.lock: 1394 sock_info = self.sockets.popleft() 1395 except IndexError: 1396 # Can raise ConnectionFailure or CertificateError. 1397 sock_info = self.connect(all_credentials) 1398 else: 1399 if self._perished(sock_info): 1400 sock_info = None 1401 sock_info.check_auth(all_credentials) 1402 except BaseException: 1403 if sock_info: 1404 # We checked out a socket but authentication failed. 1405 sock_info.close_socket(ConnectionClosedReason.ERROR) 1406 self._socket_semaphore.release() 1407 1408 if incremented: 1409 with self.lock: 1410 self.active_sockets -= 1 1411 1412 if self.enabled_for_cmap: 1413 self.opts.event_listeners.publish_connection_check_out_failed( 1414 self.address, ConnectionCheckOutFailedReason.CONN_ERROR) 1415 raise 1416 1417 sock_info.active = True 1418 return sock_info 1419 1420 def return_socket(self, sock_info): 1421 """Return the socket to the pool, or if it's closed discard it. 1422 1423 :Parameters: 1424 - `sock_info`: The socket to check into the pool. 1425 """ 1426 txn = sock_info.pinned_txn 1427 cursor = sock_info.pinned_cursor 1428 sock_info.active = False 1429 sock_info.pinned_txn = False 1430 sock_info.pinned_cursor = False 1431 self.__pinned_sockets.discard(sock_info) 1432 listeners = self.opts.event_listeners 1433 if self.enabled_for_cmap: 1434 listeners.publish_connection_checked_in(self.address, sock_info.id) 1435 if self.pid != os.getpid(): 1436 self.reset() 1437 else: 1438 if self.closed: 1439 sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED) 1440 elif sock_info.closed: 1441 # CMAP requires the closed event be emitted after the check in. 1442 if self.enabled_for_cmap: 1443 listeners.publish_connection_closed( 1444 self.address, sock_info.id, 1445 ConnectionClosedReason.ERROR) 1446 else: 1447 with self.lock: 1448 # Hold the lock to ensure this section does not race with 1449 # Pool.reset(). 1450 if self.stale_generation(sock_info.generation, 1451 sock_info.service_id): 1452 sock_info.close_socket(ConnectionClosedReason.STALE) 1453 else: 1454 sock_info.update_last_checkin_time() 1455 sock_info.update_is_writable(self.is_writable) 1456 self.sockets.appendleft(sock_info) 1457 1458 self._socket_semaphore.release() 1459 with self.lock: 1460 if txn: 1461 self.ntxns -= 1 1462 elif cursor: 1463 self.ncursors -= 1 1464 self.active_sockets -= 1 1465 1466 def _perished(self, sock_info): 1467 """Return True and close the connection if it is "perished". 1468 1469 This side-effecty function checks if this socket has been idle for 1470 for longer than the max idle time, or if the socket has been closed by 1471 some external network error, or if the socket's generation is outdated. 1472 1473 Checking sockets lets us avoid seeing *some* 1474 :class:`~pymongo.errors.AutoReconnect` exceptions on server 1475 hiccups, etc. We only check if the socket was closed by an external 1476 error if it has been > 1 second since the socket was checked into the 1477 pool, to keep performance reasonable - we can't avoid AutoReconnects 1478 completely anyway. 1479 """ 1480 idle_time_seconds = sock_info.idle_time_seconds() 1481 # If socket is idle, open a new one. 1482 if (self.opts.max_idle_time_seconds is not None and 1483 idle_time_seconds > self.opts.max_idle_time_seconds): 1484 sock_info.close_socket(ConnectionClosedReason.IDLE) 1485 return True 1486 1487 if (self._check_interval_seconds is not None and ( 1488 0 == self._check_interval_seconds or 1489 idle_time_seconds > self._check_interval_seconds)): 1490 if sock_info.socket_closed(): 1491 sock_info.close_socket(ConnectionClosedReason.ERROR) 1492 return True 1493 1494 if self.stale_generation(sock_info.generation, sock_info.service_id): 1495 sock_info.close_socket(ConnectionClosedReason.STALE) 1496 return True 1497 1498 return False 1499 1500 def _raise_wait_queue_timeout(self): 1501 listeners = self.opts.event_listeners 1502 if self.enabled_for_cmap: 1503 listeners.publish_connection_check_out_failed( 1504 self.address, ConnectionCheckOutFailedReason.TIMEOUT) 1505 if self.opts.load_balanced: 1506 other_ops = self.active_sockets - self.ncursors - self.ntxns 1507 raise ConnectionFailure( 1508 'Timeout waiting for connection from the connection pool. ' 1509 'maxPoolSize: %s, connections in use by cursors: %s, ' 1510 'connections in use by transactions: %s, connections in use ' 1511 'by other operations: %s, wait_queue_timeout: %s' % ( 1512 self.opts.max_pool_size, self.ncursors, self.ntxns, 1513 other_ops, self.opts.wait_queue_timeout)) 1514 raise ConnectionFailure( 1515 'Timed out while checking out a connection from connection pool. ' 1516 'maxPoolSize: %s, wait_queue_timeout: %s' % ( 1517 self.opts.max_pool_size, self.opts.wait_queue_timeout)) 1518 1519 def __del__(self): 1520 # Avoid ResourceWarnings in Python 3 1521 # Close all sockets without calling reset() or close() because it is 1522 # not safe to acquire a lock in __del__. 1523 for sock_info in self.sockets: 1524 sock_info.close_socket(None) 1525