1# Copyright 2011-present MongoDB, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you
4# may not use this file except in compliance with the License.  You
5# may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.  See the License for the specific language governing
13# permissions and limitations under the License.
14
15import contextlib
16import copy
17import os
18import platform
19import socket
20import sys
21import threading
22import collections
23import weakref
24
25from pymongo.ssl_support import (
26    SSLError as _SSLError,
27    HAS_SNI as _HAVE_SNI,
28    IPADDR_SAFE as _IPADDR_SAFE)
29
30from bson import DEFAULT_CODEC_OPTIONS
31from bson.py3compat import imap, itervalues, _unicode
32from bson.son import SON
33from pymongo import auth, helpers, thread_util, __version__
34from pymongo.client_session import _validate_session_write_concern
35from pymongo.common import (MAX_BSON_SIZE,
36                            MAX_IDLE_TIME_SEC,
37                            MAX_MESSAGE_SIZE,
38                            MAX_POOL_SIZE,
39                            MAX_WIRE_VERSION,
40                            MAX_WRITE_BATCH_SIZE,
41                            MIN_POOL_SIZE,
42                            ORDERED_TYPES,
43                            WAIT_QUEUE_TIMEOUT)
44from pymongo.errors import (AutoReconnect,
45                            CertificateError,
46                            ConnectionFailure,
47                            ConfigurationError,
48                            InvalidOperation,
49                            DocumentTooLarge,
50                            NetworkTimeout,
51                            NotPrimaryError,
52                            OperationFailure,
53                            PyMongoError)
54from pymongo.hello_compat import HelloCompat
55from pymongo._ipaddress import is_ip_address
56from pymongo.ismaster import IsMaster
57from pymongo.monotonic import time as _time
58from pymongo.monitoring import (ConnectionCheckOutFailedReason,
59                                ConnectionClosedReason)
60from pymongo.network import (command,
61                             receive_message)
62from pymongo.read_preferences import ReadPreference
63from pymongo.server_api import _add_to_command
64from pymongo.server_type import SERVER_TYPE
65from pymongo.socket_checker import SocketChecker
66# Always use our backport so we always have support for IP address matching
67from pymongo.ssl_match_hostname import match_hostname
68
69try:
70    from fcntl import fcntl, F_GETFD, F_SETFD, FD_CLOEXEC
71    def _set_non_inheritable_non_atomic(fd):
72        """Set the close-on-exec flag on the given file descriptor."""
73        flags = fcntl(fd, F_GETFD)
74        fcntl(fd, F_SETFD, flags | FD_CLOEXEC)
75except ImportError:
76    # Windows, various platforms we don't claim to support
77    # (Jython, IronPython, ...), systems that don't provide
78    # everything we need from fcntl, etc.
79    def _set_non_inheritable_non_atomic(dummy):
80        """Dummy function for platforms that don't provide fcntl."""
81        pass
82
83_MAX_TCP_KEEPIDLE = 120
84_MAX_TCP_KEEPINTVL = 10
85_MAX_TCP_KEEPCNT = 9
86
87if sys.platform == 'win32':
88    try:
89        import _winreg as winreg
90    except ImportError:
91        import winreg
92
93    def _query(key, name, default):
94        try:
95            value, _ = winreg.QueryValueEx(key, name)
96            # Ensure the value is a number or raise ValueError.
97            return int(value)
98        except (OSError, ValueError):
99            # QueryValueEx raises OSError when the key does not exist (i.e.
100            # the system is using the Windows default value).
101            return default
102
103    try:
104        with winreg.OpenKey(
105                winreg.HKEY_LOCAL_MACHINE,
106                r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters") as key:
107            _WINDOWS_TCP_IDLE_MS = _query(key, "KeepAliveTime", 7200000)
108            _WINDOWS_TCP_INTERVAL_MS = _query(key, "KeepAliveInterval", 1000)
109    except OSError:
110        # We could not check the default values because winreg.OpenKey failed.
111        # Assume the system is using the default values.
112        _WINDOWS_TCP_IDLE_MS = 7200000
113        _WINDOWS_TCP_INTERVAL_MS = 1000
114
115    def _set_keepalive_times(sock):
116        idle_ms = min(_WINDOWS_TCP_IDLE_MS, _MAX_TCP_KEEPIDLE * 1000)
117        interval_ms = min(_WINDOWS_TCP_INTERVAL_MS,
118                          _MAX_TCP_KEEPINTVL * 1000)
119        if (idle_ms < _WINDOWS_TCP_IDLE_MS or
120                interval_ms < _WINDOWS_TCP_INTERVAL_MS):
121            sock.ioctl(socket.SIO_KEEPALIVE_VALS,
122                       (1, idle_ms, interval_ms))
123else:
124    def _set_tcp_option(sock, tcp_option, max_value):
125        if hasattr(socket, tcp_option):
126            sockopt = getattr(socket, tcp_option)
127            try:
128                # PYTHON-1350 - NetBSD doesn't implement getsockopt for
129                # TCP_KEEPIDLE and friends. Don't attempt to set the
130                # values there.
131                default = sock.getsockopt(socket.IPPROTO_TCP, sockopt)
132                if default > max_value:
133                    sock.setsockopt(socket.IPPROTO_TCP, sockopt, max_value)
134            except socket.error:
135                pass
136
137    def _set_keepalive_times(sock):
138        _set_tcp_option(sock, 'TCP_KEEPIDLE', _MAX_TCP_KEEPIDLE)
139        _set_tcp_option(sock, 'TCP_KEEPINTVL', _MAX_TCP_KEEPINTVL)
140        _set_tcp_option(sock, 'TCP_KEEPCNT', _MAX_TCP_KEEPCNT)
141
142_METADATA = SON([
143    ('driver', SON([('name', 'PyMongo'), ('version', __version__)])),
144])
145
146if sys.platform.startswith('linux'):
147    # platform.linux_distribution was deprecated in Python 3.5.
148    if sys.version_info[:2] < (3, 5):
149        # Distro name and version (e.g. Ubuntu 16.04 xenial)
150        _name = ' '.join([part for part in
151                          platform.linux_distribution() if part])
152    else:
153        _name = platform.system()
154    _METADATA['os'] = SON([
155        ('type', platform.system()),
156        ('name', _name),
157        ('architecture', platform.machine()),
158        # Kernel version (e.g. 4.4.0-17-generic).
159        ('version', platform.release())
160    ])
161elif sys.platform == 'darwin':
162    _METADATA['os'] = SON([
163        ('type', platform.system()),
164        ('name', platform.system()),
165        ('architecture', platform.machine()),
166        # (mac|i|tv)OS(X) version (e.g. 10.11.6) instead of darwin
167        # kernel version.
168        ('version', platform.mac_ver()[0])
169    ])
170elif sys.platform == 'win32':
171    _METADATA['os'] = SON([
172        ('type', platform.system()),
173        # "Windows XP", "Windows 7", "Windows 10", etc.
174        ('name', ' '.join((platform.system(), platform.release()))),
175        ('architecture', platform.machine()),
176        # Windows patch level (e.g. 5.1.2600-SP3)
177        ('version', '-'.join(platform.win32_ver()[1:3]))
178    ])
179elif sys.platform.startswith('java'):
180    _name, _ver, _arch = platform.java_ver()[-1]
181    _METADATA['os'] = SON([
182        # Linux, Windows 7, Mac OS X, etc.
183        ('type', _name),
184        ('name', _name),
185        # x86, x86_64, AMD64, etc.
186        ('architecture', _arch),
187        # Linux kernel version, OSX version, etc.
188        ('version', _ver)
189    ])
190else:
191    # Get potential alias (e.g. SunOS 5.11 becomes Solaris 2.11)
192    _aliased = platform.system_alias(
193        platform.system(), platform.release(), platform.version())
194    _METADATA['os'] = SON([
195        ('type', platform.system()),
196        ('name', ' '.join([part for part in _aliased[:2] if part])),
197        ('architecture', platform.machine()),
198        ('version', _aliased[2])
199    ])
200
201if platform.python_implementation().startswith('PyPy'):
202    _METADATA['platform'] = ' '.join(
203        (platform.python_implementation(),
204         '.'.join(imap(str, sys.pypy_version_info)),
205         '(Python %s)' % '.'.join(imap(str, sys.version_info))))
206elif sys.platform.startswith('java'):
207    _METADATA['platform'] = ' '.join(
208        (platform.python_implementation(),
209         '.'.join(imap(str, sys.version_info)),
210         '(%s)' % ' '.join((platform.system(), platform.release()))))
211else:
212    _METADATA['platform'] = ' '.join(
213        (platform.python_implementation(),
214         '.'.join(imap(str, sys.version_info))))
215
216
217# If the first getaddrinfo call of this interpreter's life is on a thread,
218# while the main thread holds the import lock, getaddrinfo deadlocks trying
219# to import the IDNA codec. Import it here, where presumably we're on the
220# main thread, to avoid the deadlock. See PYTHON-607.
221u'foo'.encode('idna')
222
223# Remove after PYTHON-2712
224_MOCK_SERVICE_ID = False
225
226
227def _raise_connection_failure(address, error, msg_prefix=None):
228    """Convert a socket.error to ConnectionFailure and raise it."""
229    host, port = address
230    # If connecting to a Unix socket, port will be None.
231    if port is not None:
232        msg = '%s:%d: %s' % (host, port, error)
233    else:
234        msg = '%s: %s' % (host, error)
235    if msg_prefix:
236        msg = msg_prefix + msg
237    if isinstance(error, socket.timeout):
238        raise NetworkTimeout(msg)
239    elif isinstance(error, _SSLError) and 'timed out' in str(error):
240        # CPython 2.7 and PyPy 2.x do not distinguish network
241        # timeouts from other SSLErrors (https://bugs.python.org/issue10272).
242        # Luckily, we can work around this limitation because the phrase
243        # 'timed out' appears in all the timeout related SSLErrors raised
244        # on the above platforms.
245        raise NetworkTimeout(msg)
246    else:
247        raise AutoReconnect(msg)
248
249
250class PoolOptions(object):
251
252    __slots__ = ('__max_pool_size', '__min_pool_size',
253                 '__max_idle_time_seconds',
254                 '__connect_timeout', '__socket_timeout',
255                 '__wait_queue_timeout', '__wait_queue_multiple',
256                 '__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
257                 '__event_listeners', '__appname', '__driver', '__metadata',
258                 '__compression_settings', '__server_api', '__load_balanced')
259
260    def __init__(self, max_pool_size=MAX_POOL_SIZE,
261                 min_pool_size=MIN_POOL_SIZE,
262                 max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None,
263                 socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT,
264                 wait_queue_multiple=None, ssl_context=None,
265                 ssl_match_hostname=True, socket_keepalive=True,
266                 event_listeners=None, appname=None, driver=None,
267                 compression_settings=None, server_api=None,
268                 load_balanced=None):
269        self.__max_pool_size = max_pool_size
270        self.__min_pool_size = min_pool_size
271        self.__max_idle_time_seconds = max_idle_time_seconds
272        self.__connect_timeout = connect_timeout
273        self.__socket_timeout = socket_timeout
274        self.__wait_queue_timeout = wait_queue_timeout
275        self.__wait_queue_multiple = wait_queue_multiple
276        self.__ssl_context = ssl_context
277        self.__ssl_match_hostname = ssl_match_hostname
278        self.__socket_keepalive = socket_keepalive
279        self.__event_listeners = event_listeners
280        self.__appname = appname
281        self.__driver = driver
282        self.__compression_settings = compression_settings
283        self.__server_api = server_api
284        self.__load_balanced = load_balanced
285        self.__metadata = copy.deepcopy(_METADATA)
286        if appname:
287            self.__metadata['application'] = {'name': appname}
288
289        # Combine the "driver" MongoClient option with PyMongo's info, like:
290        # {
291        #    'driver': {
292        #        'name': 'PyMongo|MyDriver',
293        #        'version': '3.7.0|1.2.3',
294        #    },
295        #    'platform': 'CPython 3.6.0|MyPlatform'
296        # }
297        if driver:
298            if driver.name:
299                self.__metadata['driver']['name'] = "%s|%s" % (
300                    _METADATA['driver']['name'], driver.name)
301            if driver.version:
302                self.__metadata['driver']['version'] = "%s|%s" % (
303                    _METADATA['driver']['version'], driver.version)
304            if driver.platform:
305                self.__metadata['platform'] = "%s|%s" % (
306                    _METADATA['platform'], driver.platform)
307
308    @property
309    def non_default_options(self):
310        """The non-default options this pool was created with.
311
312        Added for CMAP's :class:`PoolCreatedEvent`.
313        """
314        opts = {}
315        if self.__max_pool_size != MAX_POOL_SIZE:
316            opts['maxPoolSize'] = self.__max_pool_size
317        if self.__min_pool_size != MIN_POOL_SIZE:
318            opts['minPoolSize'] = self.__min_pool_size
319        if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC:
320            opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000
321        if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT:
322            opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000
323        return opts
324
325    @property
326    def max_pool_size(self):
327        """The maximum allowable number of concurrent connections to each
328        connected server. Requests to a server will block if there are
329        `maxPoolSize` outstanding connections to the requested server.
330        Defaults to 100. Cannot be 0.
331
332        When a server's pool has reached `max_pool_size`, operations for that
333        server block waiting for a socket to be returned to the pool. If
334        ``waitQueueTimeoutMS`` is set, a blocked operation will raise
335        :exc:`~pymongo.errors.ConnectionFailure` after a timeout.
336        By default ``waitQueueTimeoutMS`` is not set.
337        """
338        return self.__max_pool_size
339
340    @property
341    def min_pool_size(self):
342        """The minimum required number of concurrent connections that the pool
343        will maintain to each connected server. Default is 0.
344        """
345        return self.__min_pool_size
346
347    @property
348    def max_idle_time_seconds(self):
349        """The maximum number of seconds that a connection can remain
350        idle in the pool before being removed and replaced. Defaults to
351        `None` (no limit).
352        """
353        return self.__max_idle_time_seconds
354
355    @property
356    def connect_timeout(self):
357        """How long a connection can take to be opened before timing out.
358        """
359        return self.__connect_timeout
360
361    @property
362    def socket_timeout(self):
363        """How long a send or receive on a socket can take before timing out.
364        """
365        return self.__socket_timeout
366
367    @property
368    def wait_queue_timeout(self):
369        """How long a thread will wait for a socket from the pool if the pool
370        has no free sockets.
371        """
372        return self.__wait_queue_timeout
373
374    @property
375    def wait_queue_multiple(self):
376        """Multiplied by max_pool_size to give the number of threads allowed
377        to wait for a socket at one time.
378        """
379        return self.__wait_queue_multiple
380
381    @property
382    def ssl_context(self):
383        """An SSLContext instance or None.
384        """
385        return self.__ssl_context
386
387    @property
388    def ssl_match_hostname(self):
389        """Call ssl.match_hostname if cert_reqs is not ssl.CERT_NONE.
390        """
391        return self.__ssl_match_hostname
392
393    @property
394    def socket_keepalive(self):
395        """Whether to send periodic messages to determine if a connection
396        is closed.
397        """
398        return self.__socket_keepalive
399
400    @property
401    def event_listeners(self):
402        """An instance of pymongo.monitoring._EventListeners.
403        """
404        return self.__event_listeners
405
406    @property
407    def appname(self):
408        """The application name, for sending with hello in server handshake.
409        """
410        return self.__appname
411
412    @property
413    def driver(self):
414        """Driver name and version, for sending with hello in handshake.
415        """
416        return self.__driver
417
418    @property
419    def compression_settings(self):
420        return self.__compression_settings
421
422    @property
423    def metadata(self):
424        """A dict of metadata about the application, driver, os, and platform.
425        """
426        return self.__metadata.copy()
427
428    @property
429    def server_api(self):
430        """A pymongo.server_api.ServerApi or None.
431        """
432        return self.__server_api
433
434    @property
435    def load_balanced(self):
436        """True if this Pool is configured in load balanced mode.
437        """
438        return self.__load_balanced
439
440
441def _negotiate_creds(all_credentials):
442    """Return one credential that needs mechanism negotiation, if any.
443    """
444    if all_credentials:
445        for creds in all_credentials.values():
446            if creds.mechanism == 'DEFAULT' and creds.username:
447                return creds
448    return None
449
450
451def _speculative_context(all_credentials):
452    """Return the _AuthContext to use for speculative auth, if any.
453    """
454    if all_credentials and len(all_credentials) == 1:
455        creds = next(itervalues(all_credentials))
456        return auth._AuthContext.from_credentials(creds)
457    return None
458
459
460class _CancellationContext(object):
461    def __init__(self):
462        self._cancelled = False
463
464    def cancel(self):
465        """Cancel this context."""
466        self._cancelled = True
467
468    @property
469    def cancelled(self):
470        """Was cancel called?"""
471        return self._cancelled
472
473
474class SocketInfo(object):
475    """Store a socket with some metadata.
476
477    :Parameters:
478      - `sock`: a raw socket object
479      - `pool`: a Pool instance
480      - `address`: the server's (host, port)
481      - `id`: the id of this socket in it's pool
482    """
483    def __init__(self, sock, pool, address, id):
484        self.pool_ref = weakref.ref(pool)
485        self.sock = sock
486        self.address = address
487        self.id = id
488        self.authset = set()
489        self.closed = False
490        self.last_checkin_time = _time()
491        self.performed_handshake = False
492        self.is_writable = False
493        self.max_wire_version = MAX_WIRE_VERSION
494        self.max_bson_size = MAX_BSON_SIZE
495        self.max_message_size = MAX_MESSAGE_SIZE
496        self.max_write_batch_size = MAX_WRITE_BATCH_SIZE
497        self.supports_sessions = False
498        self.hello_ok = None
499        self.is_mongos = False
500        self.op_msg_enabled = False
501        self.listeners = pool.opts.event_listeners
502        self.enabled_for_cmap = pool.enabled_for_cmap
503        self.compression_settings = pool.opts.compression_settings
504        self.compression_context = None
505        self.socket_checker = SocketChecker()
506        # Support for mechanism negotiation on the initial handshake.
507        # Maps credential to saslSupportedMechs.
508        self.negotiated_mechanisms = {}
509        self.auth_ctx = {}
510
511        # The pool's generation changes with each reset() so we can close
512        # sockets created before the last reset.
513        self.pool_gen = pool.gen
514        self.generation = self.pool_gen.get_overall()
515        self.ready = False
516        self.cancel_context = None
517        if not pool.handshake:
518            # This is a Monitor connection.
519            self.cancel_context = _CancellationContext()
520        self.opts = pool.opts
521        self.more_to_come = False
522        # For load balancer support.
523        self.service_id = None
524        # When executing a transaction in load balancing mode, this flag is
525        # set to true to indicate that the session now owns the connection.
526        self.pinned_txn = False
527        self.pinned_cursor = False
528        self.active = False
529
530    def pin_txn(self):
531        self.pinned_txn = True
532        assert not self.pinned_cursor
533
534    def pin_cursor(self):
535        self.pinned_cursor = True
536        assert not self.pinned_txn
537
538    def unpin(self):
539        pool = self.pool_ref()
540        if pool:
541            pool.return_socket(self)
542        else:
543            self.close_socket(ConnectionClosedReason.STALE)
544
545    def hello_cmd(self):
546        if self.opts.server_api or self.hello_ok:
547            return SON([(HelloCompat.CMD, 1)])
548        else:
549            return SON([(HelloCompat.LEGACY_CMD, 1), ('helloOk', True)])
550
551    def hello(self, all_credentials=None):
552        return self._hello(None, None, None, all_credentials)
553
554    def _hello(self, cluster_time, topology_version,
555                  heartbeat_frequency, all_credentials):
556        cmd = self.hello_cmd()
557        performing_handshake = not self.performed_handshake
558        awaitable = False
559        if performing_handshake:
560            self.performed_handshake = True
561            cmd['client'] = self.opts.metadata
562            if self.compression_settings:
563                cmd['compression'] = self.compression_settings.compressors
564            if self.opts.load_balanced:
565                cmd['loadBalanced'] = True
566        elif topology_version is not None:
567            cmd['topologyVersion'] = topology_version
568            cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000)
569            awaitable = True
570            # If connect_timeout is None there is no timeout.
571            if self.opts.connect_timeout:
572                self.sock.settimeout(
573                    self.opts.connect_timeout + heartbeat_frequency)
574
575        if self.max_wire_version >= 6 and cluster_time is not None:
576            cmd['$clusterTime'] = cluster_time
577
578        # XXX: Simplify in PyMongo 4.0 when all_credentials is always a single
579        # unchangeable value per MongoClient.
580        creds = _negotiate_creds(all_credentials)
581        if creds:
582            cmd['saslSupportedMechs'] = creds.source + '.' + creds.username
583        auth_ctx = _speculative_context(all_credentials)
584        if auth_ctx:
585            cmd['speculativeAuthenticate'] = auth_ctx.speculate_command()
586
587        doc = self.command('admin', cmd, publish_events=False,
588                           exhaust_allowed=awaitable)
589        # PYTHON-2712 will remove this topologyVersion fallback logic.
590        if self.opts.load_balanced and _MOCK_SERVICE_ID:
591            process_id = doc.get('topologyVersion', {}).get('processId')
592            doc.setdefault('serviceId', process_id)
593        if not self.opts.load_balanced:
594            doc.pop('serviceId', None)
595        hello = IsMaster(doc, awaitable=awaitable)
596        self.is_writable = hello.is_writable
597        self.max_wire_version = hello.max_wire_version
598        self.max_bson_size = hello.max_bson_size
599        self.max_message_size = hello.max_message_size
600        self.max_write_batch_size = hello.max_write_batch_size
601        self.supports_sessions = (
602            hello.logical_session_timeout_minutes is not None)
603        self.hello_ok = hello.hello_ok
604        self.is_mongos = hello.server_type == SERVER_TYPE.Mongos
605        if performing_handshake and self.compression_settings:
606            ctx = self.compression_settings.get_compression_context(
607                hello.compressors)
608            self.compression_context = ctx
609
610        self.op_msg_enabled = hello.max_wire_version >= 6
611        if creds:
612            self.negotiated_mechanisms[creds] = hello.sasl_supported_mechs
613        if auth_ctx:
614            auth_ctx.parse_response(hello)
615            if auth_ctx.speculate_succeeded():
616                self.auth_ctx[auth_ctx.credentials] = auth_ctx
617        if self.opts.load_balanced:
618            if not hello.service_id:
619                raise ConfigurationError(
620                    'Driver attempted to initialize in load balancing mode,'
621                    ' but the server does not support this mode')
622            self.service_id = hello.service_id
623            self.generation = self.pool_gen.get(self.service_id)
624        return hello
625
626    def _next_reply(self):
627        reply = self.receive_message(None)
628        self.more_to_come = reply.more_to_come
629        unpacked_docs = reply.unpack_response()
630        response_doc = unpacked_docs[0]
631        helpers._check_command_response(response_doc, self.max_wire_version)
632        # Remove after PYTHON-2712.
633        if not self.opts.load_balanced:
634            response_doc.pop('serviceId', None)
635        return response_doc
636
637    def command(self, dbname, spec, secondary_ok=False,
638                read_preference=ReadPreference.PRIMARY,
639                codec_options=DEFAULT_CODEC_OPTIONS, check=True,
640                allowable_errors=None, check_keys=False,
641                read_concern=None,
642                write_concern=None,
643                parse_write_concern_error=False,
644                collation=None,
645                session=None,
646                client=None,
647                retryable_write=False,
648                publish_events=True,
649                user_fields=None,
650                exhaust_allowed=False):
651        """Execute a command or raise an error.
652
653        :Parameters:
654          - `dbname`: name of the database on which to run the command
655          - `spec`: a command document as a dict, SON, or mapping object
656          - `secondary_ok`: whether to set the secondaryOkay wire protocol bit
657          - `read_preference`: a read preference
658          - `codec_options`: a CodecOptions instance
659          - `check`: raise OperationFailure if there are errors
660          - `allowable_errors`: errors to ignore if `check` is True
661          - `check_keys`: if True, check `spec` for invalid keys
662          - `read_concern`: The read concern for this command.
663          - `write_concern`: The write concern for this command.
664          - `parse_write_concern_error`: Whether to parse the
665            ``writeConcernError`` field in the command response.
666          - `collation`: The collation for this command.
667          - `session`: optional ClientSession instance.
668          - `client`: optional MongoClient for gossipping $clusterTime.
669          - `retryable_write`: True if this command is a retryable write.
670          - `publish_events`: Should we publish events for this command?
671          - `user_fields` (optional): Response fields that should be decoded
672            using the TypeDecoders from codec_options, passed to
673            bson._decode_all_selective.
674        """
675        self.validate_session(client, session)
676        session = _validate_session_write_concern(session, write_concern)
677
678        # Ensure command name remains in first place.
679        if not isinstance(spec, ORDERED_TYPES):
680            spec = SON(spec)
681
682        if (read_concern and self.max_wire_version < 4
683                and not read_concern.ok_for_legacy):
684            raise ConfigurationError(
685                'read concern level of %s is not valid '
686                'with a max wire version of %d.'
687                % (read_concern.level, self.max_wire_version))
688        if not (write_concern is None or write_concern.acknowledged or
689                collation is None):
690            raise ConfigurationError(
691                'Collation is unsupported for unacknowledged writes.')
692        if (self.max_wire_version >= 5 and
693                write_concern and
694                not write_concern.is_server_default):
695            spec['writeConcern'] = write_concern.document
696        elif self.max_wire_version < 5 and collation is not None:
697            raise ConfigurationError(
698                'Must be connected to MongoDB 3.4+ to use a collation.')
699
700        self.add_server_api(spec)
701        if session:
702            session._apply_to(spec, retryable_write, read_preference,
703                              self)
704        self.send_cluster_time(spec, session, client)
705        listeners = self.listeners if publish_events else None
706        unacknowledged = write_concern and not write_concern.acknowledged
707        if self.op_msg_enabled:
708            self._raise_if_not_writable(unacknowledged)
709        try:
710            return command(self, dbname, spec, secondary_ok,
711                           self.is_mongos, read_preference, codec_options,
712                           session, client, check, allowable_errors,
713                           self.address, check_keys, listeners,
714                           self.max_bson_size, read_concern,
715                           parse_write_concern_error=parse_write_concern_error,
716                           collation=collation,
717                           compression_ctx=self.compression_context,
718                           use_op_msg=self.op_msg_enabled,
719                           unacknowledged=unacknowledged,
720                           user_fields=user_fields,
721                           exhaust_allowed=exhaust_allowed)
722        except (OperationFailure, NotPrimaryError):
723            raise
724        # Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
725        except BaseException as error:
726            self._raise_connection_failure(error)
727
728    def send_message(self, message, max_doc_size):
729        """Send a raw BSON message or raise ConnectionFailure.
730
731        If a network exception is raised, the socket is closed.
732        """
733        if (self.max_bson_size is not None
734                and max_doc_size > self.max_bson_size):
735            raise DocumentTooLarge(
736                "BSON document too large (%d bytes) - the connected server "
737                "supports BSON document sizes up to %d bytes." %
738                (max_doc_size, self.max_bson_size))
739
740        try:
741            self.sock.sendall(message)
742        except BaseException as error:
743            self._raise_connection_failure(error)
744
745    def receive_message(self, request_id):
746        """Receive a raw BSON message or raise ConnectionFailure.
747
748        If any exception is raised, the socket is closed.
749        """
750        try:
751            return receive_message(self, request_id, self.max_message_size)
752        except BaseException as error:
753            self._raise_connection_failure(error)
754
755    def _raise_if_not_writable(self, unacknowledged):
756        """Raise NotPrimaryError on unacknowledged write if this socket is not
757        writable.
758        """
759        if unacknowledged and not self.is_writable:
760            # Write won't succeed, bail as if we'd received a not primary error.
761            raise NotPrimaryError("not primary", {
762                "ok": 0, "errmsg": "not primary", "code": 10107})
763
764    def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
765        """Send OP_INSERT, etc., optionally returning response as a dict.
766
767        Can raise ConnectionFailure or OperationFailure.
768
769        :Parameters:
770          - `request_id`: an int.
771          - `msg`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message,
772            perhaps with a getlasterror command appended.
773          - `max_doc_size`: size in bytes of the largest document in `msg`.
774          - `with_last_error`: True if a getlasterror command is appended.
775        """
776        self._raise_if_not_writable(not with_last_error)
777
778        self.send_message(msg, max_doc_size)
779        if with_last_error:
780            reply = self.receive_message(request_id)
781            return helpers._check_gle_response(reply.command_response(),
782                                               self.max_wire_version)
783
784    def write_command(self, request_id, msg):
785        """Send "insert" etc. command, returning response as a dict.
786
787        Can raise ConnectionFailure or OperationFailure.
788
789        :Parameters:
790          - `request_id`: an int.
791          - `msg`: bytes, the command message.
792        """
793        self.send_message(msg, 0)
794        reply = self.receive_message(request_id)
795        result = reply.command_response()
796
797        # Raises NotPrimaryError or OperationFailure.
798        helpers._check_command_response(result, self.max_wire_version)
799        return result
800
801    def check_auth(self, all_credentials):
802        """Update this socket's authentication.
803
804        Log in or out to bring this socket's credentials up to date with
805        those provided. Can raise ConnectionFailure or OperationFailure.
806
807        :Parameters:
808          - `all_credentials`: dict, maps auth source to MongoCredential.
809        """
810        if all_credentials or self.authset:
811            cached = set(itervalues(all_credentials))
812            authset = self.authset.copy()
813
814            # Logout any credentials that no longer exist in the cache.
815            for credentials in authset - cached:
816                auth.logout(credentials.source, self)
817                self.authset.discard(credentials)
818
819            for credentials in cached - authset:
820                self.authenticate(credentials)
821
822        # CMAP spec says to publish the ready event only after authenticating
823        # the connection.
824        if not self.ready:
825            self.ready = True
826            if self.enabled_for_cmap:
827                self.listeners.publish_connection_ready(self.address, self.id)
828
829    def authenticate(self, credentials):
830        """Log in to the server and store these credentials in `authset`.
831
832        Can raise ConnectionFailure or OperationFailure.
833
834        :Parameters:
835          - `credentials`: A MongoCredential.
836        """
837        auth.authenticate(credentials, self)
838        self.authset.add(credentials)
839        # negotiated_mechanisms are no longer needed.
840        self.negotiated_mechanisms.pop(credentials, None)
841        self.auth_ctx.pop(credentials, None)
842
843    def validate_session(self, client, session):
844        """Validate this session before use with client.
845
846        Raises error if this session is logged in as a different user or
847        the client is not the one that created the session.
848        """
849        if session:
850            if session._client is not client:
851                raise InvalidOperation(
852                    'Can only use session with the MongoClient that'
853                    ' started it')
854            if session._authset != self.authset:
855                raise InvalidOperation(
856                    'Cannot use session after authenticating with different'
857                    ' credentials')
858
859    def close_socket(self, reason):
860        """Close this connection with a reason."""
861        if self.closed:
862            return
863        self._close_socket()
864        if reason and self.enabled_for_cmap:
865            self.listeners.publish_connection_closed(
866                self.address, self.id, reason)
867
868    def _close_socket(self):
869        """Close this connection."""
870        if self.closed:
871            return
872        self.closed = True
873        if self.cancel_context:
874            self.cancel_context.cancel()
875        # Note: We catch exceptions to avoid spurious errors on interpreter
876        # shutdown.
877        try:
878            self.sock.close()
879        except Exception:
880            pass
881
882    def socket_closed(self):
883        """Return True if we know socket has been closed, False otherwise."""
884        return self.socket_checker.socket_closed(self.sock)
885
886    def send_cluster_time(self, command, session, client):
887        """Add cluster time for MongoDB >= 3.6."""
888        if self.max_wire_version >= 6 and client:
889            client._send_cluster_time(command, session)
890
891    def add_server_api(self, command):
892        """Add server_api parameters."""
893        if self.opts.server_api:
894            _add_to_command(command, self.opts.server_api)
895
896    def update_last_checkin_time(self):
897        self.last_checkin_time = _time()
898
899    def update_is_writable(self, is_writable):
900        self.is_writable = is_writable
901
902    def idle_time_seconds(self):
903        """Seconds since this socket was last checked into its pool."""
904        return _time() - self.last_checkin_time
905
906    def _raise_connection_failure(self, error):
907        # Catch *all* exceptions from socket methods and close the socket. In
908        # regular Python, socket operations only raise socket.error, even if
909        # the underlying cause was a Ctrl-C: a signal raised during socket.recv
910        # is expressed as an EINTR error from poll. See internal_select_ex() in
911        # socketmodule.c. All error codes from poll become socket.error at
912        # first. Eventually in PyEval_EvalFrameEx the interpreter checks for
913        # signals and throws KeyboardInterrupt into the current frame on the
914        # main thread.
915        #
916        # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
917        # ...) is called in Python code, which experiences the signal as a
918        # KeyboardInterrupt from the start, rather than as an initial
919        # socket.error, so we catch that, close the socket, and reraise it.
920        #
921        # The connection closed event will be emitted later in return_socket.
922        if self.ready:
923            reason = None
924        else:
925            reason = ConnectionClosedReason.ERROR
926        self.close_socket(reason)
927        # SSLError from PyOpenSSL inherits directly from Exception.
928        if isinstance(error, (IOError, OSError, _SSLError)):
929            _raise_connection_failure(self.address, error)
930        else:
931            raise
932
933    def __eq__(self, other):
934        return self.sock == other.sock
935
936    def __ne__(self, other):
937        return not self == other
938
939    def __hash__(self):
940        return hash(self.sock)
941
942    def __repr__(self):
943        return "SocketInfo(%s)%s at %s" % (
944            repr(self.sock),
945            self.closed and " CLOSED" or "",
946            id(self)
947        )
948
949
950def _create_connection(address, options):
951    """Given (host, port) and PoolOptions, connect and return a socket object.
952
953    Can raise socket.error.
954
955    This is a modified version of create_connection from CPython >= 2.7.
956    """
957    host, port = address
958
959    # Check if dealing with a unix domain socket
960    if host.endswith('.sock'):
961        if not hasattr(socket, "AF_UNIX"):
962            raise ConnectionFailure("UNIX-sockets are not supported "
963                                    "on this system")
964        sock = socket.socket(socket.AF_UNIX)
965        # SOCK_CLOEXEC not supported for Unix sockets.
966        _set_non_inheritable_non_atomic(sock.fileno())
967        try:
968            sock.connect(host)
969            return sock
970        except socket.error:
971            sock.close()
972            raise
973
974    # Don't try IPv6 if we don't support it. Also skip it if host
975    # is 'localhost' (::1 is fine). Avoids slow connect issues
976    # like PYTHON-356.
977    family = socket.AF_INET
978    if socket.has_ipv6 and host != 'localhost':
979        family = socket.AF_UNSPEC
980
981    err = None
982    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
983        af, socktype, proto, dummy, sa = res
984        # SOCK_CLOEXEC was new in CPython 3.2, and only available on a limited
985        # number of platforms (newer Linux and *BSD). Starting with CPython 3.4
986        # all file descriptors are created non-inheritable. See PEP 446.
987        try:
988            sock = socket.socket(
989                af, socktype | getattr(socket, 'SOCK_CLOEXEC', 0), proto)
990        except socket.error:
991            # Can SOCK_CLOEXEC be defined even if the kernel doesn't support
992            # it?
993            sock = socket.socket(af, socktype, proto)
994        # Fallback when SOCK_CLOEXEC isn't available.
995        _set_non_inheritable_non_atomic(sock.fileno())
996        try:
997            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
998            sock.settimeout(options.connect_timeout)
999            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,
1000                            options.socket_keepalive)
1001            if options.socket_keepalive:
1002                _set_keepalive_times(sock)
1003            sock.connect(sa)
1004            return sock
1005        except socket.error as e:
1006            err = e
1007            sock.close()
1008
1009    if err is not None:
1010        raise err
1011    else:
1012        # This likely means we tried to connect to an IPv6 only
1013        # host with an OS/kernel or Python interpreter that doesn't
1014        # support IPv6. The test case is Jython2.5.1 which doesn't
1015        # support IPv6 at all.
1016        raise socket.error('getaddrinfo failed')
1017
1018
1019def _configured_socket(address, options):
1020    """Given (host, port) and PoolOptions, return a configured socket.
1021
1022    Can raise socket.error, ConnectionFailure, or CertificateError.
1023
1024    Sets socket's SSL and timeout options.
1025    """
1026    sock = _create_connection(address, options)
1027    ssl_context = options.ssl_context
1028
1029    if ssl_context is not None:
1030        host = address[0]
1031        try:
1032            # According to RFC6066, section 3, IPv4 and IPv6 literals are
1033            # not permitted for SNI hostname.
1034            # Previous to Python 3.7 wrap_socket would blindly pass
1035            # IP addresses as SNI hostname.
1036            # https://bugs.python.org/issue32185
1037            # We have to pass hostname / ip address to wrap_socket
1038            # to use SSLContext.check_hostname.
1039            if _HAVE_SNI and (not is_ip_address(host) or _IPADDR_SAFE):
1040                sock = ssl_context.wrap_socket(sock, server_hostname=host)
1041            else:
1042                sock = ssl_context.wrap_socket(sock)
1043        except CertificateError:
1044            sock.close()
1045            # Raise CertificateError directly like we do after match_hostname
1046            # below.
1047            raise
1048        except (IOError, OSError, _SSLError) as exc:
1049            sock.close()
1050            # We raise AutoReconnect for transient and permanent SSL handshake
1051            # failures alike. Permanent handshake failures, like protocol
1052            # mismatch, will be turned into ServerSelectionTimeoutErrors later.
1053            _raise_connection_failure(address, exc, "SSL handshake failed: ")
1054        if (ssl_context.verify_mode and not
1055                getattr(ssl_context, "check_hostname", False) and
1056                options.ssl_match_hostname):
1057            try:
1058                match_hostname(sock.getpeercert(), hostname=host)
1059            except CertificateError:
1060                sock.close()
1061                raise
1062
1063    sock.settimeout(options.socket_timeout)
1064    return sock
1065
1066
1067class _PoolClosedError(PyMongoError):
1068    """Internal error raised when a thread tries to get a connection from a
1069    closed pool.
1070    """
1071    pass
1072
1073
1074class _PoolGeneration(object):
1075    def __init__(self):
1076        # Maps service_id to generation.
1077        self._generations = collections.defaultdict(int)
1078        # Overall pool generation.
1079        self._generation = 0
1080
1081    def get(self, service_id):
1082        """Get the generation for the given service_id."""
1083        if service_id is None:
1084            return self._generation
1085        return self._generations[service_id]
1086
1087    def get_overall(self):
1088        """Get the Pool's overall generation."""
1089        return self._generation
1090
1091    def inc(self, service_id):
1092        """Increment the generation for the given service_id."""
1093        self._generation += 1
1094        if service_id is None:
1095            for service_id in self._generations:
1096                self._generations[service_id] += 1
1097        else:
1098            self._generations[service_id] += 1
1099
1100    def stale(self, gen, service_id):
1101        """Return if the given generation for a given service_id is stale."""
1102        return gen != self.get(service_id)
1103
1104
1105class PoolState(object):
1106    PAUSED = 1
1107    READY = 2
1108    CLOSED = 3
1109
1110
1111# Do *not* explicitly inherit from object or Jython won't call __del__
1112# http://bugs.jython.org/issue1057
1113class Pool:
1114    def __init__(self, address, options, handshake=True):
1115        """
1116        :Parameters:
1117          - `address`: a (hostname, port) tuple
1118          - `options`: a PoolOptions instance
1119          - `handshake`: whether to call hello for each new SocketInfo
1120        """
1121        # Check a socket's health with socket_closed() every once in a while.
1122        # Can override for testing: 0 to always check, None to never check.
1123        self._check_interval_seconds = 1
1124        # LIFO pool. Sockets are ordered on idle time. Sockets claimed
1125        # and returned to pool from the left side. Stale sockets removed
1126        # from the right side.
1127        self.sockets = collections.deque()
1128        self.lock = threading.Lock()
1129        self.active_sockets = 0
1130        # Monotonically increasing connection ID required for CMAP Events.
1131        self.next_connection_id = 1
1132        self.closed = False
1133        # Track whether the sockets in this pool are writeable or not.
1134        self.is_writable = None
1135
1136        # Keep track of resets, so we notice sockets created before the most
1137        # recent reset and close them.
1138        # self.generation = 0
1139        self.gen = _PoolGeneration()
1140        self.pid = os.getpid()
1141        self.address = address
1142        self.opts = options
1143        self.handshake = handshake
1144        # Don't publish events in Monitor pools.
1145        self.enabled_for_cmap = (
1146                self.handshake and
1147                self.opts.event_listeners is not None and
1148                self.opts.event_listeners.enabled_for_cmap)
1149
1150        if (self.opts.wait_queue_multiple is None or
1151                self.opts.max_pool_size is None):
1152            max_waiters = None
1153        else:
1154            max_waiters = (
1155                self.opts.max_pool_size * self.opts.wait_queue_multiple)
1156
1157        self._socket_semaphore = thread_util.create_semaphore(
1158            self.opts.max_pool_size, max_waiters)
1159        if self.enabled_for_cmap:
1160            self.opts.event_listeners.publish_pool_created(
1161                self.address, self.opts.non_default_options)
1162        # Retain references to pinned connections to prevent the CPython GC
1163        # from thinking that a cursor's pinned connection can be GC'd when the
1164        # cursor is GC'd (see PYTHON-2751).
1165        self.__pinned_sockets = set()
1166        self.ncursors = 0
1167        self.ntxns = 0
1168
1169    def _reset(self, close, service_id=None):
1170        with self.lock:
1171            if self.closed:
1172                return
1173            self.gen.inc(service_id)
1174            newpid = os.getpid()
1175            if self.pid != newpid:
1176                self.pid = newpid
1177                self.active_sockets = 0
1178            if service_id is None:
1179                sockets, self.sockets = self.sockets, collections.deque()
1180            else:
1181                discard = collections.deque()
1182                keep = collections.deque()
1183                for sock_info in self.sockets:
1184                    if sock_info.service_id == service_id:
1185                        discard.append(sock_info)
1186                    else:
1187                        keep.append(sock_info)
1188                sockets = discard
1189                self.sockets = keep
1190
1191            if close:
1192                self.closed = True
1193
1194        listeners = self.opts.event_listeners
1195        # CMAP spec says that close() MUST close sockets before publishing the
1196        # PoolClosedEvent but that reset() SHOULD close sockets *after*
1197        # publishing the PoolClearedEvent.
1198        if close:
1199            for sock_info in sockets:
1200                sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
1201            if self.enabled_for_cmap:
1202                listeners.publish_pool_closed(self.address)
1203        else:
1204            if self.enabled_for_cmap:
1205                listeners.publish_pool_cleared(self.address,
1206                                               service_id=service_id)
1207            for sock_info in sockets:
1208                sock_info.close_socket(ConnectionClosedReason.STALE)
1209
1210    def update_is_writable(self, is_writable):
1211        """Updates the is_writable attribute on all sockets currently in the
1212        Pool.
1213        """
1214        self.is_writable = is_writable
1215        with self.lock:
1216            for socket in self.sockets:
1217                socket.update_is_writable(self.is_writable)
1218
1219    def reset(self, service_id=None):
1220        self._reset(close=False, service_id=service_id)
1221
1222    def close(self):
1223        self._reset(close=True)
1224
1225    def stale_generation(self, gen, service_id):
1226        return self.gen.stale(gen, service_id)
1227
1228    def remove_stale_sockets(self, reference_generation, all_credentials):
1229        """Removes stale sockets then adds new ones if pool is too small and
1230        has not been reset. The `reference_generation` argument specifies the
1231        `generation` at the point in time this operation was requested on the
1232        pool.
1233        """
1234        if self.opts.max_idle_time_seconds is not None:
1235            with self.lock:
1236                while (self.sockets and
1237                       self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds):
1238                    sock_info = self.sockets.pop()
1239                    sock_info.close_socket(ConnectionClosedReason.IDLE)
1240
1241        while True:
1242            with self.lock:
1243                if (len(self.sockets) + self.active_sockets >=
1244                        self.opts.min_pool_size):
1245                    # There are enough sockets in the pool.
1246                    break
1247
1248            # We must acquire the semaphore to respect max_pool_size.
1249            if not self._socket_semaphore.acquire(False):
1250                break
1251            try:
1252                sock_info = self.connect(all_credentials)
1253                with self.lock:
1254                    # Close connection and return if the pool was reset during
1255                    # socket creation or while acquiring the pool lock.
1256                    if self.gen.get_overall() != reference_generation:
1257                        sock_info.close_socket(ConnectionClosedReason.STALE)
1258                        break
1259                    self.sockets.appendleft(sock_info)
1260            finally:
1261                self._socket_semaphore.release()
1262
1263    def connect(self, all_credentials=None):
1264        """Connect to Mongo and return a new SocketInfo.
1265
1266        Can raise ConnectionFailure or CertificateError.
1267
1268        Note that the pool does not keep a reference to the socket -- you
1269        must call return_socket() when you're done with it.
1270        """
1271        with self.lock:
1272            conn_id = self.next_connection_id
1273            self.next_connection_id += 1
1274
1275        listeners = self.opts.event_listeners
1276        if self.enabled_for_cmap:
1277            listeners.publish_connection_created(self.address, conn_id)
1278
1279        try:
1280            sock = _configured_socket(self.address, self.opts)
1281        except BaseException as error:
1282            if self.enabled_for_cmap:
1283                listeners.publish_connection_closed(
1284                    self.address, conn_id, ConnectionClosedReason.ERROR)
1285
1286            if isinstance(error, (IOError, OSError, _SSLError)):
1287                _raise_connection_failure(self.address, error)
1288
1289            raise
1290
1291        sock_info = SocketInfo(sock, self, self.address, conn_id)
1292        try:
1293            if self.handshake:
1294                sock_info.hello(all_credentials)
1295                self.is_writable = sock_info.is_writable
1296
1297            sock_info.check_auth(all_credentials)
1298        except BaseException:
1299            sock_info.close_socket(ConnectionClosedReason.ERROR)
1300            raise
1301
1302        return sock_info
1303
1304    @contextlib.contextmanager
1305    def get_socket(self, all_credentials, handler=None):
1306        """Get a socket from the pool. Use with a "with" statement.
1307
1308        Returns a :class:`SocketInfo` object wrapping a connected
1309        :class:`socket.socket`.
1310
1311        This method should always be used in a with-statement::
1312
1313            with pool.get_socket(credentials) as socket_info:
1314                socket_info.send_message(msg)
1315                data = socket_info.receive_message(op_code, request_id)
1316
1317        The socket is logged in or out as needed to match ``all_credentials``
1318        using the correct authentication mechanism for the server's wire
1319        protocol version.
1320
1321        Can raise ConnectionFailure or OperationFailure.
1322
1323        :Parameters:
1324          - `all_credentials`: dict, maps auth source to MongoCredential.
1325          - `handler` (optional): A _MongoClientErrorHandler.
1326        """
1327        listeners = self.opts.event_listeners
1328        if self.enabled_for_cmap:
1329            listeners.publish_connection_check_out_started(self.address)
1330
1331        sock_info = self._get_socket(all_credentials)
1332        if self.enabled_for_cmap:
1333            listeners.publish_connection_checked_out(
1334                self.address, sock_info.id)
1335        try:
1336            yield sock_info
1337        except:
1338            # Exception in caller. Ensure the connection gets returned.
1339            # Note that when pinned is True, the session owns the
1340            # connection and it is responsible for checking the connection
1341            # back into the pool.
1342            pinned = sock_info.pinned_txn or sock_info.pinned_cursor
1343            if handler:
1344                # Perform SDAM error handling rules while the connection is
1345                # still checked out.
1346                exc_type, exc_val, _ = sys.exc_info()
1347                handler.handle(exc_type, exc_val)
1348            if not pinned and sock_info.active:
1349                self.return_socket(sock_info)
1350            raise
1351        if sock_info.pinned_txn:
1352            with self.lock:
1353                self.__pinned_sockets.add(sock_info)
1354                self.ntxns += 1
1355        elif sock_info.pinned_cursor:
1356            with self.lock:
1357                self.__pinned_sockets.add(sock_info)
1358                self.ncursors += 1
1359        elif sock_info.active:
1360            self.return_socket(sock_info)
1361
1362    def _get_socket(self, all_credentials):
1363        """Get or create a SocketInfo. Can raise ConnectionFailure."""
1364        # We use the pid here to avoid issues with fork / multiprocessing.
1365        # See test.test_client:TestClient.test_fork for an example of
1366        # what could go wrong otherwise
1367        if self.pid != os.getpid():
1368            self.reset()
1369
1370        if self.closed:
1371            if self.enabled_for_cmap:
1372                self.opts.event_listeners.publish_connection_check_out_failed(
1373                    self.address, ConnectionCheckOutFailedReason.POOL_CLOSED)
1374            raise _PoolClosedError(
1375                'Attempted to check out a connection from closed connection '
1376                'pool')
1377
1378        # Get a free socket or create one.
1379        if not self._socket_semaphore.acquire(
1380                True, self.opts.wait_queue_timeout):
1381            self._raise_wait_queue_timeout()
1382
1383        # We've now acquired the semaphore and must release it on error.
1384        sock_info = None
1385        incremented = False
1386        try:
1387            with self.lock:
1388                self.active_sockets += 1
1389                incremented = True
1390
1391            while sock_info is None:
1392                try:
1393                    with self.lock:
1394                        sock_info = self.sockets.popleft()
1395                except IndexError:
1396                    # Can raise ConnectionFailure or CertificateError.
1397                    sock_info = self.connect(all_credentials)
1398                else:
1399                    if self._perished(sock_info):
1400                        sock_info = None
1401            sock_info.check_auth(all_credentials)
1402        except BaseException:
1403            if sock_info:
1404                # We checked out a socket but authentication failed.
1405                sock_info.close_socket(ConnectionClosedReason.ERROR)
1406            self._socket_semaphore.release()
1407
1408            if incremented:
1409                with self.lock:
1410                    self.active_sockets -= 1
1411
1412            if self.enabled_for_cmap:
1413                self.opts.event_listeners.publish_connection_check_out_failed(
1414                    self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
1415            raise
1416
1417        sock_info.active = True
1418        return sock_info
1419
1420    def return_socket(self, sock_info):
1421        """Return the socket to the pool, or if it's closed discard it.
1422
1423        :Parameters:
1424          - `sock_info`: The socket to check into the pool.
1425        """
1426        txn = sock_info.pinned_txn
1427        cursor = sock_info.pinned_cursor
1428        sock_info.active = False
1429        sock_info.pinned_txn = False
1430        sock_info.pinned_cursor = False
1431        self.__pinned_sockets.discard(sock_info)
1432        listeners = self.opts.event_listeners
1433        if self.enabled_for_cmap:
1434            listeners.publish_connection_checked_in(self.address, sock_info.id)
1435        if self.pid != os.getpid():
1436            self.reset()
1437        else:
1438            if self.closed:
1439                sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
1440            elif sock_info.closed:
1441                # CMAP requires the closed event be emitted after the check in.
1442                if self.enabled_for_cmap:
1443                    listeners.publish_connection_closed(
1444                        self.address, sock_info.id,
1445                        ConnectionClosedReason.ERROR)
1446            else:
1447                with self.lock:
1448                    # Hold the lock to ensure this section does not race with
1449                    # Pool.reset().
1450                    if self.stale_generation(sock_info.generation,
1451                                             sock_info.service_id):
1452                        sock_info.close_socket(ConnectionClosedReason.STALE)
1453                    else:
1454                        sock_info.update_last_checkin_time()
1455                        sock_info.update_is_writable(self.is_writable)
1456                        self.sockets.appendleft(sock_info)
1457
1458        self._socket_semaphore.release()
1459        with self.lock:
1460            if txn:
1461                self.ntxns -= 1
1462            elif cursor:
1463                self.ncursors -= 1
1464            self.active_sockets -= 1
1465
1466    def _perished(self, sock_info):
1467        """Return True and close the connection if it is "perished".
1468
1469        This side-effecty function checks if this socket has been idle for
1470        for longer than the max idle time, or if the socket has been closed by
1471        some external network error, or if the socket's generation is outdated.
1472
1473        Checking sockets lets us avoid seeing *some*
1474        :class:`~pymongo.errors.AutoReconnect` exceptions on server
1475        hiccups, etc. We only check if the socket was closed by an external
1476        error if it has been > 1 second since the socket was checked into the
1477        pool, to keep performance reasonable - we can't avoid AutoReconnects
1478        completely anyway.
1479        """
1480        idle_time_seconds = sock_info.idle_time_seconds()
1481        # If socket is idle, open a new one.
1482        if (self.opts.max_idle_time_seconds is not None and
1483                idle_time_seconds > self.opts.max_idle_time_seconds):
1484            sock_info.close_socket(ConnectionClosedReason.IDLE)
1485            return True
1486
1487        if (self._check_interval_seconds is not None and (
1488                0 == self._check_interval_seconds or
1489                idle_time_seconds > self._check_interval_seconds)):
1490            if sock_info.socket_closed():
1491                sock_info.close_socket(ConnectionClosedReason.ERROR)
1492                return True
1493
1494        if self.stale_generation(sock_info.generation, sock_info.service_id):
1495            sock_info.close_socket(ConnectionClosedReason.STALE)
1496            return True
1497
1498        return False
1499
1500    def _raise_wait_queue_timeout(self):
1501        listeners = self.opts.event_listeners
1502        if self.enabled_for_cmap:
1503            listeners.publish_connection_check_out_failed(
1504                self.address, ConnectionCheckOutFailedReason.TIMEOUT)
1505        if self.opts.load_balanced:
1506            other_ops = self.active_sockets - self.ncursors - self.ntxns
1507            raise ConnectionFailure(
1508                'Timeout waiting for connection from the connection pool. '
1509                'maxPoolSize: %s, connections in use by cursors: %s, '
1510                'connections in use by transactions: %s, connections in use '
1511                'by other operations: %s, wait_queue_timeout: %s' % (
1512                    self.opts.max_pool_size, self.ncursors, self.ntxns,
1513                    other_ops, self.opts.wait_queue_timeout))
1514        raise ConnectionFailure(
1515            'Timed out while checking out a connection from connection pool. '
1516            'maxPoolSize: %s, wait_queue_timeout: %s' % (
1517                self.opts.max_pool_size, self.opts.wait_queue_timeout))
1518
1519    def __del__(self):
1520        # Avoid ResourceWarnings in Python 3
1521        # Close all sockets without calling reset() or close() because it is
1522        # not safe to acquire a lock in __del__.
1523        for sock_info in self.sockets:
1524            sock_info.close_socket(None)
1525