1# Copyright 2017 MongoDB, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Logical sessions for ordering sequential operations.
16
17Requires MongoDB 3.6.
18
19.. versionadded:: 3.6
20
21Causally Consistent Reads
22=========================
23
24.. code-block:: python
25
26  with client.start_session(causal_consistency=True) as session:
27      collection = client.db.collection
28      collection.update_one({'_id': 1}, {'$set': {'x': 10}}, session=session)
29      secondary_c = collection.with_options(
30          read_preference=ReadPreference.SECONDARY)
31
32      # A secondary read waits for replication of the write.
33      secondary_c.find_one({'_id': 1}, session=session)
34
35If `causal_consistency` is True (the default), read operations that use
36the session are causally after previous read and write operations. Using a
37causally consistent session, an application can read its own writes and is
38guaranteed monotonic reads, even when reading from replica set secondaries.
39
40.. mongodoc:: causal-consistency
41
42.. _transactions-ref:
43
44Transactions
45============
46
47.. versionadded:: 3.7
48
49MongoDB 4.0 adds support for transactions on replica set primaries. A
50transaction is associated with a :class:`ClientSession`. To start a transaction
51on a session, use :meth:`ClientSession.start_transaction` in a with-statement.
52Then, execute an operation within the transaction by passing the session to the
53operation:
54
55.. code-block:: python
56
57  orders = client.db.orders
58  inventory = client.db.inventory
59  with client.start_session() as session:
60      with session.start_transaction():
61          orders.insert_one({"sku": "abc123", "qty": 100}, session=session)
62          inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
63                               {"$inc": {"qty": -100}}, session=session)
64
Upon normal completion of the ``with session.start_transaction()`` block, the
transaction automatically calls :meth:`ClientSession.commit_transaction`.
67If the block exits with an exception, the transaction automatically calls
68:meth:`ClientSession.abort_transaction`.
69
70In general, multi-document transactions only support read/write (CRUD)
71operations on existing collections. However, MongoDB 4.4 adds support for
72creating collections and indexes with some limitations, including an
73insert operation that would result in the creation of a new collection.
74For a complete description of all the supported and unsupported operations
75see the `MongoDB server's documentation for transactions
76<http://dochub.mongodb.org/core/transactions>`_.
77
A session may only have a single active transaction at a time; however,
multiple transactions on the same session can be executed in sequence.
80
81Sharded Transactions
82^^^^^^^^^^^^^^^^^^^^
83
84.. versionadded:: 3.9
85
86PyMongo 3.9 adds support for transactions on sharded clusters running MongoDB
87>=4.2. Sharded transactions have the same API as replica set transactions.
88When running a transaction against a sharded cluster, the session is
89pinned to the mongos server selected for the first operation in the
90transaction. All subsequent operations that are part of the same transaction
91are routed to the same mongos server. When the transaction is completed, by
92running either commitTransaction or abortTransaction, the session is unpinned.
93
94.. mongodoc:: transactions
95
96.. _snapshot-reads-ref:
97
98Snapshot Reads
99==============
100
101.. versionadded:: 3.12
102
103MongoDB 5.0 adds support for snapshot reads. Snapshot reads are requested by
104passing the ``snapshot`` option to
105:meth:`~pymongo.mongo_client.MongoClient.start_session`.
106If ``snapshot`` is True, all read operations that use this session read data
107from the same snapshot timestamp. The server chooses the latest
108majority-committed snapshot timestamp when executing the first read operation
109using the session. Subsequent reads on this session read from the same
110snapshot timestamp. Snapshot reads are also supported when reading from
111replica set secondaries.
112
113.. code-block:: python
114
115  # Each read using this session reads data from the same point in time.
116  with client.start_session(snapshot=True) as session:
117      order = orders.find_one({"sku": "abc123"}, session=session)
118      inventory = inventory.find_one({"sku": "abc123"}, session=session)
119
120Snapshot Reads Limitations
121^^^^^^^^^^^^^^^^^^^^^^^^^^
122
123Snapshot reads sessions are incompatible with ``causal_consistency=True``.
124Only the following read operations are supported in a snapshot reads session:
125
126- :meth:`~pymongo.collection.Collection.find`
127- :meth:`~pymongo.collection.Collection.find_one`
128- :meth:`~pymongo.collection.Collection.aggregate`
129- :meth:`~pymongo.collection.Collection.count_documents`
130- :meth:`~pymongo.collection.Collection.distinct` (on unsharded collections)
131
132Classes
133=======
134"""
135
136import collections
137import uuid
138
139from bson.binary import Binary
140from bson.int64 import Int64
141from bson.py3compat import abc, integer_types
142from bson.son import SON
143from bson.timestamp import Timestamp
144
145from pymongo import monotonic
146from pymongo.cursor import _SocketManager
147from pymongo.errors import (ConfigurationError,
148                            ConnectionFailure,
149                            InvalidOperation,
150                            OperationFailure,
151                            PyMongoError,
152                            WTimeoutError)
153from pymongo.helpers import _RETRYABLE_ERROR_CODES
154from pymongo.read_concern import ReadConcern
155from pymongo.read_preferences import ReadPreference, _ServerMode
156from pymongo.server_type import SERVER_TYPE
157from pymongo.write_concern import WriteConcern
158
159
class SessionOptions(object):
    """Configuration applied to a newly created :class:`ClientSession`.

    :Parameters:
      - `causal_consistency` (optional): Whether read operations in the
        session are causally ordered. When left as ``None`` it resolves to
        ``True``, unless ``snapshot`` is enabled, in which case it resolves
        to ``False``.
      - `default_transaction_options` (optional): A
        :class:`TransactionOptions` instance holding the defaults for
        transactions started on the session.
      - `snapshot` (optional): If True, every read performed with the session
        reads from the same snapshot. Cannot be combined with
        ``causal_consistency=True``. Defaults to ``False``.

    :Raises:
      - :class:`~pymongo.errors.ConfigurationError` if ``snapshot`` and
        ``causal_consistency`` are both true.
      - :class:`TypeError` if ``default_transaction_options`` is neither
        ``None`` nor a :class:`TransactionOptions`.

    .. versionchanged:: 3.12
       Added the ``snapshot`` parameter.
    """
    def __init__(self,
                 causal_consistency=None,
                 default_transaction_options=None,
                 snapshot=False):
        if not snapshot:
            # Outside snapshot mode causal consistency is on unless the
            # caller explicitly passed a value.
            if causal_consistency is None:
                causal_consistency = True
        elif causal_consistency:
            raise ConfigurationError('snapshot reads do not support '
                                     'causal_consistency=True')
        else:
            # Snapshot sessions always run without causal consistency.
            causal_consistency = False
        self._causal_consistency = causal_consistency

        txn_defaults = default_transaction_options
        if (txn_defaults is not None and
                not isinstance(txn_defaults, TransactionOptions)):
            raise TypeError(
                "default_transaction_options must be an instance of "
                "pymongo.client_session.TransactionOptions, not: %r" %
                (txn_defaults,))
        self._default_transaction_options = txn_defaults
        self._snapshot = snapshot

    @property
    def causal_consistency(self):
        """The resolved causal consistency setting for the session."""
        return self._causal_consistency

    @property
    def default_transaction_options(self):
        """The :class:`TransactionOptions` used by default for transactions
        started on this session, or ``None`` if none were given.

        .. versionadded:: 3.7
        """
        return self._default_transaction_options

    @property
    def snapshot(self):
        """The ``snapshot`` value this object was created with.

        .. versionadded:: 3.12
        """
        return self._snapshot
218
219
class TransactionOptions(object):
    """Options for :meth:`ClientSession.start_transaction`.

    :Parameters:
      - `read_concern` (optional): The
        :class:`~pymongo.read_concern.ReadConcern` to use for this transaction.
        If ``None`` (the default) the :attr:`read_concern` of
        the :class:`MongoClient` is used.
      - `write_concern` (optional): The
        :class:`~pymongo.write_concern.WriteConcern` to use for this
        transaction. It must be acknowledged. If ``None`` (the default) the
        :attr:`write_concern` of the :class:`MongoClient` is used.
      - `read_preference` (optional): The read preference to use. If
        ``None`` (the default) the :attr:`read_preference` of this
        :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
        for options. Transactions which read must use
        :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
      - `max_commit_time_ms` (optional): The maximum amount of time to allow a
        single commitTransaction command to run. This option is an alias for
        maxTimeMS option on the commitTransaction command. If ``None`` (the
        default) maxTimeMS is not used.

    :Raises:
      - :class:`TypeError` if an argument that is not ``None`` has the wrong
        type.
      - :class:`~pymongo.errors.ConfigurationError` if ``write_concern`` is
        unacknowledged.

    .. versionchanged:: 3.9
       Added the ``max_commit_time_ms`` option.

    .. versionadded:: 3.7
    """
    def __init__(self, read_concern=None, write_concern=None,
                 read_preference=None, max_commit_time_ms=None):
        self._read_concern = read_concern
        self._write_concern = write_concern
        self._read_preference = read_preference
        self._max_commit_time_ms = max_commit_time_ms

        # Each option is validated only when it was actually supplied.
        if (read_concern is not None and
                not isinstance(read_concern, ReadConcern)):
            raise TypeError("read_concern must be an instance of "
                            "pymongo.read_concern.ReadConcern, not: %r" %
                            (read_concern,))

        if write_concern is not None:
            if not isinstance(write_concern, WriteConcern):
                raise TypeError("write_concern must be an instance of "
                                "pymongo.write_concern.WriteConcern, not: %r" %
                                (write_concern,))
            if not write_concern.acknowledged:
                raise ConfigurationError(
                    "transactions do not support unacknowledged write concern"
                    ": %r" % (write_concern,))

        if (read_preference is not None and
                not isinstance(read_preference, _ServerMode)):
            raise TypeError("%r is not valid for read_preference. See "
                            "pymongo.read_preferences for valid "
                            "options." % (read_preference,))

        if (max_commit_time_ms is not None and
                not isinstance(max_commit_time_ms, integer_types)):
            raise TypeError(
                "max_commit_time_ms must be an integer or None")

    @property
    def read_concern(self):
        """The :class:`~pymongo.read_concern.ReadConcern` given for this
        transaction, or ``None``.
        """
        return self._read_concern

    @property
    def write_concern(self):
        """The :class:`~pymongo.write_concern.WriteConcern` given for this
        transaction, or ``None``.
        """
        return self._write_concern

    @property
    def read_preference(self):
        """The :class:`~pymongo.read_preferences.ReadPreference` given for
        this transaction, or ``None``.
        """
        return self._read_preference

    @property
    def max_commit_time_ms(self):
        """The maxTimeMS to use when running a commitTransaction command,
        or ``None``.

        .. versionadded:: 3.9
        """
        return self._max_commit_time_ms
300
301
def _validate_session_write_concern(session, write_concern):
    """Check that a session may be combined with ``write_concern``.

    Returns the session to use for the next operation: ``session`` itself,
    or ``None`` when an implicit session has to be dropped because the write
    is unacknowledged.

    Raises :class:`~pymongo.errors.ConfigurationError` when an explicit
    session is paired with an unacknowledged write concern.
    """
    if not session:
        return session
    if write_concern is None or write_concern.acknowledged:
        return session

    # For unacknowledged writes without an explicit session,
    # drivers SHOULD NOT use an implicit session. If a driver
    # creates an implicit session for unacknowledged writes
    # without an explicit session, the driver MUST NOT send the
    # session ID.
    if session._implicit:
        return None
    raise ConfigurationError(
        'Explicit sessions are incompatible with '
        'unacknowledged write concern: %r' % (
            write_concern,))
322
323
class _TransactionContext(object):
    """Context manager returned by ``start_transaction``.

    Leaving the ``with`` block commits the session's transaction when no
    exception occurred and aborts it otherwise. Nothing is done if the
    session is no longer in a transaction at that point.
    """
    def __init__(self, session):
        self.__session = session

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        session = self.__session
        if not session.in_transaction:
            # The block already committed or aborted the transaction.
            return
        if exc_val is not None:
            session.abort_transaction()
        else:
            session.commit_transaction()
338
339
class _TxnState(object):
    """Integer constants naming the states of a session's transaction."""
    (
        # No transaction: the initial state and the state after a reset.
        NONE,
        # start_transaction() was called; commit/abort in this state send
        # no command to the server.
        STARTING,
        # Counted as active together with STARTING; also restored from
        # COMMITTED when a commit is explicitly retried.
        IN_PROGRESS,
        # commit_transaction() ran a commitTransaction command.
        COMMITTED,
        # commit_transaction() finished without sending a command.
        COMMITTED_EMPTY,
        # abort_transaction() was called.
        ABORTED,
    ) = range(1, 7)
347
348
class _Transaction(object):
    """Mutable record of a ClientSession's transaction.

    Tracks the options, the :class:`_TxnState`, the pinned server address
    (and, for load balancers, the pinned socket), the recovery token and the
    number of commit/abort attempts.
    """
    def __init__(self, opts, client):
        self.client = client
        self.opts = opts
        self.state = _TxnState.NONE
        self.sharded = False
        self.recovery_token = None
        self.attempt = 0
        self.pinned_address = None
        self.sock_mgr = None

    def active(self):
        """Return True while the transaction is starting or in progress."""
        live_states = (_TxnState.STARTING, _TxnState.IN_PROGRESS)
        return self.state in live_states

    def starting(self):
        """Return True if the transaction is in the STARTING state."""
        return self.state == _TxnState.STARTING

    @property
    def pinned_conn(self):
        """The pinned socket of an active transaction, otherwise ``None``."""
        if not self.active():
            return None
        manager = self.sock_mgr
        return manager.sock if manager else None

    def pin(self, server, sock_info):
        """Pin the transaction to ``server``.

        The socket itself is pinned only when the server is a load balancer.
        """
        self.sharded = True
        description = server.description
        self.pinned_address = description.address
        behind_load_balancer = (
            server.description.server_type == SERVER_TYPE.LoadBalancer)
        if behind_load_balancer:
            sock_info.pin_txn()
            self.sock_mgr = _SocketManager(sock_info, False)

    def unpin(self):
        """Forget the pinned address and close any pinned socket manager."""
        self.pinned_address = None
        manager = self.sock_mgr
        if manager:
            manager.close()
        self.sock_mgr = None

    def reset(self):
        """Unpin and restore the no-transaction state; ``opts`` is kept."""
        self.unpin()
        self.attempt = 0
        self.recovery_token = None
        self.sharded = False
        self.state = _TxnState.NONE

    def __del__(self):
        manager = self.sock_mgr
        if not manager:
            return
        # Reuse the cursor closing machinery to return the socket to the
        # pool soon.
        self.client._close_cursor_soon(0, None, manager)
        self.sock_mgr = None
399
400
def _reraise_with_unknown_commit(exc):
    """Label ``exc`` as an unknown commit result, then re-raise.

    Must be called while an exception is being handled (inside an ``except``
    block): the bare ``raise`` re-raises that active exception.
    """
    unknown_commit_label = "UnknownTransactionCommitResult"
    exc._add_error_label(unknown_commit_label)
    raise
405
406
def _max_time_expired_error(exc):
    """Return True if ``exc`` is an OperationFailure with error code 50
    (MaxTimeMSExpired).
    """
    if not isinstance(exc, OperationFailure):
        return False
    return exc.code == 50
410
411
# From the transactions spec, all the retryable writes errors plus
# WriteConcernFailed and MaxTimeMSExpired. commit_transaction adds the
# "UnknownTransactionCommitResult" label to an OperationFailure whose code
# is in this set.
_UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset([
    64,    # WriteConcernFailed
    50,    # MaxTimeMSExpired
])

# From the Convenient API for Transactions spec, with_transaction must
# halt retries after 120 seconds.
# This limit is non-configurable and was chosen to be twice the 60 second
# default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
# The value is in seconds.
_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
424
425
def _within_time_limit(start_time):
    """Return True while less than the with_transaction retry limit has
    elapsed since ``start_time`` (a ``monotonic.time()`` reading).
    """
    elapsed = monotonic.time() - start_time
    return elapsed < _WITH_TRANSACTION_RETRY_TIME_LIMIT
429
430
431class ClientSession(object):
432    """A session for ordering sequential operations.
433
434    :class:`ClientSession` instances are **not thread-safe or fork-safe**.
435    They can only be used by one thread or process at a time. A single
436    :class:`ClientSession` cannot be used to run multiple operations
437    concurrently.
438
439    Should not be initialized directly by application developers - to create a
440    :class:`ClientSession`, call
441    :meth:`~pymongo.mongo_client.MongoClient.start_session`.
442    """
443    def __init__(self, client, server_session, options, authset, implicit):
444        # A MongoClient, a _ServerSession, a SessionOptions, and a set.
445        self._client = client
446        self._server_session = server_session
447        self._options = options
448        self._authset = authset
449        self._cluster_time = None
450        self._operation_time = None
451        self._snapshot_time = None
452        # Is this an implicitly created session?
453        self._implicit = implicit
454        self._transaction = _Transaction(None, client)
455
    def end_session(self):
        """Finish this session. If a transaction has started, abort it.

        The underlying server session is handed back to the client. Calling
        this method again on an already ended session does nothing.

        It is an error to use the session after the session has ended.
        """
        self._end_session(lock=True)
462
463    def _end_session(self, lock):
464        if self._server_session is not None:
465            try:
466                if self.in_transaction:
467                    self.abort_transaction()
468                # It's possible we're still pinned here when the transaction
469                # is in the committed state when the session is discarded.
470                self._unpin()
471            finally:
472                self._client._return_server_session(self._server_session, lock)
473                self._server_session = None
474
    def _check_ended(self):
        """Raise InvalidOperation if this session has already ended."""
        # _end_session() clears _server_session, so None means "ended".
        if self._server_session is None:
            raise InvalidOperation("Cannot use ended session")
478
    def __enter__(self):
        """Enter a ``with`` block; the session itself is the bound value."""
        return self
481
    def __exit__(self, exc_type, exc_val, exc_tb):
        """End the session when the ``with`` block exits.

        The exception arguments are not inspected, and nothing is returned,
        so an exception raised in the block keeps propagating.
        """
        self._end_session(lock=True)
484
    @property
    def client(self):
        """The :class:`~pymongo.mongo_client.MongoClient` this session was
        created from.

        This is the ``client`` object the session was constructed with.
        """
        return self._client
491
    @property
    def options(self):
        """The :class:`SessionOptions` this session was created with.

        The same object is returned on every access.
        """
        return self._options
496
    @property
    def session_id(self):
        """A BSON document, the opaque server session identifier.

        Raises :class:`~pymongo.errors.InvalidOperation` if the session has
        ended.
        """
        self._check_ended()
        return self._server_session.session_id
502
    @property
    def cluster_time(self):
        """The cluster time returned by the last operation executed
        in this session.

        ``None`` until a cluster time has been recorded for the session.
        """
        return self._cluster_time
509
    @property
    def operation_time(self):
        """The operation time returned by the last operation executed
        in this session.

        ``None`` until an operation time has been recorded for the session.
        """
        return self._operation_time
516
517    def _inherit_option(self, name, val):
518        """Return the inherited TransactionOption value."""
519        if val:
520            return val
521        txn_opts = self.options.default_transaction_options
522        val = txn_opts and getattr(txn_opts, name)
523        if val:
524            return val
525        return getattr(self.client, name)
526
    def with_transaction(self, callback, read_concern=None, write_concern=None,
                         read_preference=None, max_commit_time_ms=None):
        """Execute a callback in a transaction.

        This method starts a transaction on this session, executes ``callback``
        once, and then commits the transaction. For example::

          def callback(session):
              orders = session.client.db.orders
              inventory = session.client.db.inventory
              orders.insert_one({"sku": "abc123", "qty": 100}, session=session)
              inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
                                   {"$inc": {"qty": -100}}, session=session)

          with client.start_session() as session:
              session.with_transaction(callback)

        To pass arbitrary arguments to the ``callback``, wrap your callable
        with a ``lambda`` like this::

          def callback(session, custom_arg, custom_kwarg=None):
              # Transaction operations...

          with client.start_session() as session:
              session.with_transaction(
                  lambda s: callback(s, "custom_arg", custom_kwarg=1))

        In the event of an exception, ``with_transaction`` may retry the commit
        or the entire transaction, therefore ``callback`` may be invoked
        multiple times by a single call to ``with_transaction``. Developers
        should be mindful of this possibility when writing a ``callback`` that
        modifies application state or has any other side-effects.
        Note that even when the ``callback`` is invoked multiple times,
        ``with_transaction`` ensures that the transaction will be committed
        at-most-once on the server.

        The ``callback`` should not attempt to start new transactions, but
        should simply run operations meant to be contained within a
        transaction. The ``callback`` should also not commit the transaction;
        this is handled automatically by ``with_transaction``. If the
        ``callback`` does commit or abort the transaction without error,
        however, ``with_transaction`` will return without taking further
        action.

        :class:`ClientSession` instances are **not thread-safe or fork-safe**.
        Consequently, the ``callback`` must not attempt to execute multiple
        operations concurrently.

        When ``callback`` raises an exception, ``with_transaction``
        automatically aborts the current transaction. When ``callback`` or
        :meth:`~ClientSession.commit_transaction` raises an exception that
        includes the ``"TransientTransactionError"`` error label,
        ``with_transaction`` starts a new transaction and re-executes
        the ``callback``.

        When :meth:`~ClientSession.commit_transaction` raises an exception with
        the ``"UnknownTransactionCommitResult"`` error label,
        ``with_transaction`` retries the commit until the result of the
        transaction is known. A commit that failed with a MaxTimeMSExpired
        error is not retried.

        This method will cease retrying after 120 seconds has elapsed. This
        timeout is not configurable and any exception raised by the
        ``callback`` or by :meth:`ClientSession.commit_transaction` after the
        timeout is reached will be re-raised. Applications that desire a
        different timeout duration should not use this method.

        :Parameters:
          - `callback`: The callable ``callback`` to run inside a transaction.
            The callable must accept a single argument, this session. Note,
            under certain error conditions the callback may be run multiple
            times.
          - `read_concern` (optional): The
            :class:`~pymongo.read_concern.ReadConcern` to use for this
            transaction.
          - `write_concern` (optional): The
            :class:`~pymongo.write_concern.WriteConcern` to use for this
            transaction.
          - `read_preference` (optional): The read preference to use for this
            transaction. If ``None`` (the default) the value from this
            session's default transaction options is used, falling back to
            the :attr:`read_preference` of this session's
            :class:`MongoClient`. See :mod:`~pymongo.read_preferences` for
            options.
          - `max_commit_time_ms` (optional): The maximum amount of time to
            allow a single commitTransaction command to run. If ``None`` (the
            default) the value from this session's default transaction
            options, if any, is used.

        :Returns:
          The return value of the ``callback``.

        .. versionadded:: 3.9
        """
        # All retry decisions below are measured from this single timestamp.
        start_time = monotonic.time()
        while True:
            self.start_transaction(
                read_concern, write_concern, read_preference,
                max_commit_time_ms)
            try:
                ret = callback(self)
            except Exception as exc:
                # The callback may already have ended the transaction itself.
                if self.in_transaction:
                    self.abort_transaction()
                if (isinstance(exc, PyMongoError) and
                        exc.has_error_label("TransientTransactionError") and
                        _within_time_limit(start_time)):
                    # Retry the entire transaction.
                    continue
                raise

            if not self.in_transaction:
                # Assume callback intentionally ended the transaction.
                return ret

            # Inner loop: retry only the commit; ``break`` falls back to the
            # outer loop, which restarts the whole transaction.
            while True:
                try:
                    self.commit_transaction()
                except PyMongoError as exc:
                    if (exc.has_error_label("UnknownTransactionCommitResult")
                            and _within_time_limit(start_time)
                            and not _max_time_expired_error(exc)):
                        # Retry the commit.
                        continue

                    if (exc.has_error_label("TransientTransactionError") and
                            _within_time_limit(start_time)):
                        # Retry the entire transaction.
                        break
                    raise

                # Commit succeeded.
                return ret
653
654    def start_transaction(self, read_concern=None, write_concern=None,
655                          read_preference=None, max_commit_time_ms=None):
656        """Start a multi-statement transaction.
657
658        Takes the same arguments as :class:`TransactionOptions`.
659
660        .. versionchanged:: 3.9
661           Added the ``max_commit_time_ms`` option.
662
663        .. versionadded:: 3.7
664        """
665        self._check_ended()
666
667        if self.options.snapshot:
668            raise InvalidOperation("Transactions are not supported in "
669                                   "snapshot sessions")
670
671        if self.in_transaction:
672            raise InvalidOperation("Transaction already in progress")
673
674        read_concern = self._inherit_option("read_concern", read_concern)
675        write_concern = self._inherit_option("write_concern", write_concern)
676        read_preference = self._inherit_option(
677            "read_preference", read_preference)
678        if max_commit_time_ms is None:
679            opts = self.options.default_transaction_options
680            if opts:
681                max_commit_time_ms = opts.max_commit_time_ms
682
683        self._transaction.opts = TransactionOptions(
684            read_concern, write_concern, read_preference, max_commit_time_ms)
685        self._transaction.reset()
686        self._transaction.state = _TxnState.STARTING
687        self._start_retryable_write()
688        return _TransactionContext(self)
689
    def commit_transaction(self):
        """Commit a multi-statement transaction.

        If no server transaction was ever started (the transaction is still
        in its starting state, or was already committed that way) this
        returns without sending a command. Calling it again after a commit
        retries the commit.

        Raises :class:`~pymongo.errors.InvalidOperation` if the session has
        ended, no transaction was started, or the transaction was aborted.
        Connection failures, write concern timeouts and operation failures
        whose code is in ``_UNKNOWN_COMMIT_ERROR_CODES`` are re-raised with
        the ``"UnknownTransactionCommitResult"`` error label added.

        .. versionadded:: 3.7
        """
        self._check_ended()
        state = self._transaction.state
        if state is _TxnState.NONE:
            raise InvalidOperation("No transaction started")
        elif state in (_TxnState.STARTING, _TxnState.COMMITTED_EMPTY):
            # Server transaction was never started, no need to send a command.
            self._transaction.state = _TxnState.COMMITTED_EMPTY
            return
        elif state is _TxnState.ABORTED:
            raise InvalidOperation(
                "Cannot call commitTransaction after calling abortTransaction")
        elif state is _TxnState.COMMITTED:
            # We're explicitly retrying the commit, move the state back to
            # "in progress" so that in_transaction returns true.
            self._transaction.state = _TxnState.IN_PROGRESS

        # The helper below re-raises the exception being handled, so it must
        # be called from inside the ``except`` blocks.
        try:
            self._finish_transaction_with_retry("commitTransaction")
        except ConnectionFailure as exc:
            # We do not know if the commit was successfully applied on the
            # server or if it satisfied the provided write concern, set the
            # unknown commit error label.
            exc._remove_error_label("TransientTransactionError")
            _reraise_with_unknown_commit(exc)
        except WTimeoutError as exc:
            # We do not know if the commit has satisfied the provided write
            # concern, add the unknown commit error label.
            _reraise_with_unknown_commit(exc)
        except OperationFailure as exc:
            if exc.code not in _UNKNOWN_COMMIT_ERROR_CODES:
                # The server reports errorLabels in the case.
                raise
            # We do not know if the commit was successfully applied on the
            # server or if it satisfied the provided write concern, set the
            # unknown commit error label.
            _reraise_with_unknown_commit(exc)
        finally:
            # The state becomes COMMITTED whether or not the command raised.
            self._transaction.state = _TxnState.COMMITTED
733
734    def abort_transaction(self):
735        """Abort a multi-statement transaction.
736
737        .. versionadded:: 3.7
738        """
739        self._check_ended()
740
741        state = self._transaction.state
742        if state is _TxnState.NONE:
743            raise InvalidOperation("No transaction started")
744        elif state is _TxnState.STARTING:
745            # Server transaction was never started, no need to send a command.
746            self._transaction.state = _TxnState.ABORTED
747            return
748        elif state is _TxnState.ABORTED:
749            raise InvalidOperation("Cannot call abortTransaction twice")
750        elif state in (_TxnState.COMMITTED, _TxnState.COMMITTED_EMPTY):
751            raise InvalidOperation(
752                "Cannot call abortTransaction after calling commitTransaction")
753
754        try:
755            self._finish_transaction_with_retry("abortTransaction")
756        except (OperationFailure, ConnectionFailure):
757            # The transactions spec says to ignore abortTransaction errors.
758            pass
759        finally:
760            self._transaction.state = _TxnState.ABORTED
761            self._unpin()
762
    def _finish_transaction_with_retry(self, command_name):
        """Run commit or abort with one retry after any retryable error.

        The command itself is built and sent by :meth:`_finish_transaction`;
        retrying is delegated to the client's ``_retry_internal`` helper.

        :Parameters:
          - `command_name`: Either "commitTransaction" or "abortTransaction".

        :Returns:
          Whatever the client's ``_retry_internal`` returns.
        """
        # Adapter with the three-argument callback signature; only the socket
        # is forwarded because the session is ``self``.
        def func(session, sock_info, retryable):
            return self._finish_transaction(sock_info, command_name)
        # NOTE(review): the positional arguments presumably mean
        # (retryable, func, session, bulk) -- confirm against
        # MongoClient._retry_internal.
        return self._client._retry_internal(True, func, self, None)
772
773    def _finish_transaction(self, sock_info, command_name):
774        self._transaction.attempt += 1
775        opts = self._transaction.opts
776        wc = opts.write_concern
777        cmd = SON([(command_name, 1)])
778        if command_name == "commitTransaction":
779            if opts.max_commit_time_ms:
780                cmd['maxTimeMS'] = opts.max_commit_time_ms
781
782            # Transaction spec says that after the initial commit attempt,
783            # subsequent commitTransaction commands should be upgraded to use
784            # w:"majority" and set a default value of 10 seconds for wtimeout.
785            if self._transaction.attempt > 1:
786                wc_doc = wc.document
787                wc_doc["w"] = "majority"
788                wc_doc.setdefault("wtimeout", 10000)
789                wc = WriteConcern(**wc_doc)
790
791        if self._transaction.recovery_token:
792            cmd['recoveryToken'] = self._transaction.recovery_token
793
794        return self._client.admin._command(
795            sock_info,
796            cmd,
797            session=self,
798            write_concern=wc,
799            parse_write_concern_error=True)
800
801    def _advance_cluster_time(self, cluster_time):
802        """Internal cluster time helper."""
803        if self._cluster_time is None:
804            self._cluster_time = cluster_time
805        elif cluster_time is not None:
806            if cluster_time["clusterTime"] > self._cluster_time["clusterTime"]:
807                self._cluster_time = cluster_time
808
809    def advance_cluster_time(self, cluster_time):
810        """Update the cluster time for this session.
811
812        :Parameters:
813          - `cluster_time`: The
814            :data:`~pymongo.client_session.ClientSession.cluster_time` from
815            another `ClientSession` instance.
816        """
817        if not isinstance(cluster_time, abc.Mapping):
818            raise TypeError(
819                "cluster_time must be a subclass of collections.Mapping")
820        if not isinstance(cluster_time.get("clusterTime"), Timestamp):
821            raise ValueError("Invalid cluster_time")
822        self._advance_cluster_time(cluster_time)
823
824    def _advance_operation_time(self, operation_time):
825        """Internal operation time helper."""
826        if self._operation_time is None:
827            self._operation_time = operation_time
828        elif operation_time is not None:
829            if operation_time > self._operation_time:
830                self._operation_time = operation_time
831
832    def advance_operation_time(self, operation_time):
833        """Update the operation time for this session.
834
835        :Parameters:
836          - `operation_time`: The
837            :data:`~pymongo.client_session.ClientSession.operation_time` from
838            another `ClientSession` instance.
839        """
840        if not isinstance(operation_time, Timestamp):
841            raise TypeError("operation_time must be an instance "
842                            "of bson.timestamp.Timestamp")
843        self._advance_operation_time(operation_time)
844
845    def _process_response(self, reply):
846        """Process a response to a command that was run with this session."""
847        self._advance_cluster_time(reply.get('$clusterTime'))
848        self._advance_operation_time(reply.get('operationTime'))
849        if self._options.snapshot and self._snapshot_time is None:
850            if 'cursor' in reply:
851                ct = reply['cursor'].get('atClusterTime')
852            else:
853                ct = reply.get('atClusterTime')
854            self._snapshot_time = ct
855        if self.in_transaction and self._transaction.sharded:
856            recovery_token = reply.get('recoveryToken')
857            if recovery_token:
858                self._transaction.recovery_token = recovery_token
859
860    @property
861    def has_ended(self):
862        """True if this session is finished."""
863        return self._server_session is None
864
865    @property
866    def in_transaction(self):
867        """True if this session has an active multi-statement transaction.
868
869        .. versionadded:: 3.10
870        """
871        return self._transaction.active()
872
873    @property
874    def _starting_transaction(self):
875        """True if this session is starting a multi-statement transaction.
876        """
877        return self._transaction.starting()
878
879    @property
880    def _pinned_address(self):
881        """The mongos address this transaction was created on."""
882        if self._transaction.active():
883            return self._transaction.pinned_address
884        return None
885
886    @property
887    def _pinned_connection(self):
888        """The connection this transaction was started on."""
889        return self._transaction.pinned_conn
890
891    def _pin(self, server, sock_info):
892        """Pin this session to the given Server or to the given connection."""
893        self._transaction.pin(server, sock_info)
894
895    def _unpin(self):
896        """Unpin this session from any pinned Server."""
897        self._transaction.unpin()
898
899    def _txn_read_preference(self):
900        """Return read preference of this transaction or None."""
901        if self.in_transaction:
902            return self._transaction.opts.read_preference
903        return None
904
905    def _apply_to(self, command, is_retryable, read_preference, sock_info):
906        self._check_ended()
907
908        if self.options.snapshot:
909            self._update_read_concern(command, sock_info)
910
911        self._server_session.last_use = monotonic.time()
912        command['lsid'] = self._server_session.session_id
913
914        if is_retryable:
915            command['txnNumber'] = self._server_session.transaction_id
916            return
917
918        if self.in_transaction:
919            if read_preference != ReadPreference.PRIMARY:
920                raise InvalidOperation(
921                    'read preference in a transaction must be primary, not: '
922                    '%r' % (read_preference,))
923
924            if self._transaction.state == _TxnState.STARTING:
925                # First command begins a new transaction.
926                self._transaction.state = _TxnState.IN_PROGRESS
927                command['startTransaction'] = True
928
929                if self._transaction.opts.read_concern:
930                    rc = self._transaction.opts.read_concern.document
931                    if rc:
932                        command['readConcern'] = rc
933                self._update_read_concern(command, sock_info)
934
935            command['txnNumber'] = self._server_session.transaction_id
936            command['autocommit'] = False
937
938    def _start_retryable_write(self):
939        self._check_ended()
940        self._server_session.inc_transaction_id()
941
942    def _update_read_concern(self, cmd, sock_info):
943        if (self.options.causal_consistency
944                and self.operation_time is not None):
945            cmd.setdefault('readConcern', {})[
946                'afterClusterTime'] = self.operation_time
947        if self.options.snapshot:
948            if sock_info.max_wire_version < 13:
949                raise ConfigurationError(
950                    'Snapshot reads require MongoDB 5.0 or later')
951            rc = cmd.setdefault('readConcern', {})
952            rc['level'] = 'snapshot'
953            if self._snapshot_time is not None:
954                rc['atClusterTime'] = self._snapshot_time
955
956
class _ServerSession(object):
    """Client-side state for a single server session.

    Holds the session id document (sent with commands as ``lsid``), the
    time the session was last used, a transaction number counter, a
    ``dirty`` flag and the pool generation the session was created under.
    """

    def __init__(self, generation):
        """Create a new session with a fresh random id.

        :Parameters:
          - `generation`: The generation of the pool creating this session.
        """
        # Ensure id is type 4, regardless of CodecOptions.uuid_representation.
        self.session_id = {'id': Binary(uuid.uuid4().bytes, 4)}
        self.last_use = monotonic.time()
        self._transaction_id = 0
        self.dirty = False
        self.generation = generation

    def mark_dirty(self):
        """Mark this session as dirty.

        A server session is marked dirty when a command fails with a network
        error. Dirty sessions are later discarded from the server session pool.
        """
        self.dirty = True

    def timed_out(self, session_timeout_minutes):
        """Return True if this session has less than a minute left to live.

        :Parameters:
          - `session_timeout_minutes`: The session timeout in minutes, or
            None when no timeout is known. With None the session is never
            reported as timed out, matching how the pool treats a None
            timeout when a session is returned to it.
        """
        if session_timeout_minutes is None:
            # Without a known timeout there is nothing to compare against;
            # previously this raised TypeError on ``None - 1``.
            return False

        idle_seconds = monotonic.time() - self.last_use

        # Timed out if we have less than a minute to live.
        return idle_seconds > (session_timeout_minutes - 1) * 60

    @property
    def transaction_id(self):
        """Positive 64-bit integer."""
        return Int64(self._transaction_id)

    def inc_transaction_id(self):
        """Increment the transaction number by one."""
        self._transaction_id += 1
987
988
class _ServerSessionPool(collections.deque):
    """Pool of _ServerSession objects.

    The most recently used sessions sit at the left end of the deque and
    the least recently used at the right end.

    This class is not thread-safe, access it while holding the Topology lock.
    """

    def __init__(self, *args, **kwargs):
        """Create the pool; positional and keyword args go to deque."""
        super(_ServerSessionPool, self).__init__(*args, **kwargs)
        self.generation = 0

    def reset(self):
        """Start a new generation and drop every pooled session."""
        self.generation += 1
        self.clear()

    def pop_all(self):
        """Empty the pool and return the removed sessions' ids.

        Ids are listed from the right end of the pool to the left end.
        """
        session_ids = [session.session_id for session in reversed(self)]
        self.clear()
        return session_ids

    def get_server_session(self, session_timeout_minutes):
        """Return a pooled session that has not timed out, or a new one.

        :Parameters:
          - `session_timeout_minutes`: The timeout used to decide whether
            a pooled session is stale.
        """
        # The Driver Sessions Spec only asks for stale sessions to be
        # cleared in return_server_session. PyMongo cannot take a lock when
        # a session is returned from a __del__ method (like in
        # Cursor.__die), so stale sessions are not cleared on that path.
        # Clear them here as well in case many sessions came back that way.
        self._clear_stale(session_timeout_minutes)

        # The most recently used sessions are on the left.
        while self:
            candidate = self.popleft()
            if candidate.timed_out(session_timeout_minutes):
                continue
            return candidate

        return _ServerSession(self.generation)

    def return_server_session(self, server_session, session_timeout_minutes):
        """Return a session to the pool unless it has timed out.

        :Parameters:
          - `server_session`: The session being returned.
          - `session_timeout_minutes`: The session timeout, or None to skip
            both the stale sweep and the timeout check.
        """
        if session_timeout_minutes is None:
            self.return_server_session_no_lock(server_session)
            return

        self._clear_stale(session_timeout_minutes)
        if not server_session.timed_out(session_timeout_minutes):
            self.return_server_session_no_lock(server_session)

    def return_server_session_no_lock(self, server_session):
        """Put a session back on the left end of the pool if it is reusable.

        Sessions from another pool generation and dirty sessions are
        silently dropped.
        """
        # Discard sessions from an old pool to avoid duplicate sessions in the
        # child process after a fork.
        same_generation = server_session.generation == self.generation
        if same_generation and not server_session.dirty:
            self.appendleft(server_session)

    def _clear_stale(self, session_timeout_minutes):
        """Drop timed-out sessions from the right end of the pool."""
        # The least recently used sessions are on the right, so stop at the
        # first one that has not timed out: the rest are fresher still.
        while self and self[-1].timed_out(session_timeout_minutes):
            self.pop()
1046