1# Copyright 2017 MongoDB, Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Logical sessions for ordering sequential operations. 16 17Requires MongoDB 3.6. 18 19.. versionadded:: 3.6 20 21Causally Consistent Reads 22========================= 23 24.. code-block:: python 25 26 with client.start_session(causal_consistency=True) as session: 27 collection = client.db.collection 28 collection.update_one({'_id': 1}, {'$set': {'x': 10}}, session=session) 29 secondary_c = collection.with_options( 30 read_preference=ReadPreference.SECONDARY) 31 32 # A secondary read waits for replication of the write. 33 secondary_c.find_one({'_id': 1}, session=session) 34 35If `causal_consistency` is True (the default), read operations that use 36the session are causally after previous read and write operations. Using a 37causally consistent session, an application can read its own writes and is 38guaranteed monotonic reads, even when reading from replica set secondaries. 39 40.. mongodoc:: causal-consistency 41 42.. _transactions-ref: 43 44Transactions 45============ 46 47.. versionadded:: 3.7 48 49MongoDB 4.0 adds support for transactions on replica set primaries. A 50transaction is associated with a :class:`ClientSession`. To start a transaction 51on a session, use :meth:`ClientSession.start_transaction` in a with-statement. 52Then, execute an operation within the transaction by passing the session to the 53operation: 54 55.. code-block:: python 56 57 orders = client.db.orders 58 inventory = client.db.inventory 59 with client.start_session() as session: 60 with session.start_transaction(): 61 orders.insert_one({"sku": "abc123", "qty": 100}, session=session) 62 inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}}, 63 {"$inc": {"qty": -100}}, session=session) 64 65Upon normal completion of ``with session.start_transaction()`` block, the 66transaction automatically calls :meth:`ClientSession.commit_transaction`. 67If the block exits with an exception, the transaction automatically calls 68:meth:`ClientSession.abort_transaction`. 69 70In general, multi-document transactions only support read/write (CRUD) 71operations on existing collections. However, MongoDB 4.4 adds support for 72creating collections and indexes with some limitations, including an 73insert operation that would result in the creation of a new collection. 74For a complete description of all the supported and unsupported operations 75see the `MongoDB server's documentation for transactions 76<http://dochub.mongodb.org/core/transactions>`_. 77 78A session may only have a single active transaction at a time, multiple 79transactions on the same session can be executed in sequence. 80 81Sharded Transactions 82^^^^^^^^^^^^^^^^^^^^ 83 84.. versionadded:: 3.9 85 86PyMongo 3.9 adds support for transactions on sharded clusters running MongoDB 87>=4.2. Sharded transactions have the same API as replica set transactions. 88When running a transaction against a sharded cluster, the session is 89pinned to the mongos server selected for the first operation in the 90transaction. All subsequent operations that are part of the same transaction 91are routed to the same mongos server. When the transaction is completed, by 92running either commitTransaction or abortTransaction, the session is unpinned. 93 94.. mongodoc:: transactions 95 96.. _snapshot-reads-ref: 97 98Snapshot Reads 99============== 100 101.. versionadded:: 3.12 102 103MongoDB 5.0 adds support for snapshot reads. Snapshot reads are requested by 104passing the ``snapshot`` option to 105:meth:`~pymongo.mongo_client.MongoClient.start_session`. 106If ``snapshot`` is True, all read operations that use this session read data 107from the same snapshot timestamp. The server chooses the latest 108majority-committed snapshot timestamp when executing the first read operation 109using the session. Subsequent reads on this session read from the same 110snapshot timestamp. Snapshot reads are also supported when reading from 111replica set secondaries. 112 113.. code-block:: python 114 115 # Each read using this session reads data from the same point in time. 116 with client.start_session(snapshot=True) as session: 117 order = orders.find_one({"sku": "abc123"}, session=session) 118 inventory = inventory.find_one({"sku": "abc123"}, session=session) 119 120Snapshot Reads Limitations 121^^^^^^^^^^^^^^^^^^^^^^^^^^ 122 123Snapshot reads sessions are incompatible with ``causal_consistency=True``. 124Only the following read operations are supported in a snapshot reads session: 125 126- :meth:`~pymongo.collection.Collection.find` 127- :meth:`~pymongo.collection.Collection.find_one` 128- :meth:`~pymongo.collection.Collection.aggregate` 129- :meth:`~pymongo.collection.Collection.count_documents` 130- :meth:`~pymongo.collection.Collection.distinct` (on unsharded collections) 131 132Classes 133======= 134""" 135 136import collections 137import uuid 138 139from bson.binary import Binary 140from bson.int64 import Int64 141from bson.py3compat import abc, integer_types 142from bson.son import SON 143from bson.timestamp import Timestamp 144 145from pymongo import monotonic 146from pymongo.cursor import _SocketManager 147from pymongo.errors import (ConfigurationError, 148 ConnectionFailure, 149 InvalidOperation, 150 OperationFailure, 151 PyMongoError, 152 WTimeoutError) 153from pymongo.helpers import _RETRYABLE_ERROR_CODES 154from pymongo.read_concern import ReadConcern 155from pymongo.read_preferences import ReadPreference, _ServerMode 156from pymongo.server_type import SERVER_TYPE 157from pymongo.write_concern import WriteConcern 158 159 160class SessionOptions(object): 161 """Options for a new :class:`ClientSession`. 162 163 :Parameters: 164 - `causal_consistency` (optional): If True, read operations are causally 165 ordered within the session. Defaults to True when the ``snapshot`` 166 option is ``False``. 167 - `default_transaction_options` (optional): The default 168 TransactionOptions to use for transactions started on this session. 169 - `snapshot` (optional): If True, then all reads performed using this 170 session will read from the same snapshot. This option is incompatible 171 with ``causal_consistency=True``. Defaults to ``False``. 172 173 .. versionchanged:: 3.12 174 Added the ``snapshot`` parameter. 175 """ 176 def __init__(self, 177 causal_consistency=None, 178 default_transaction_options=None, 179 snapshot=False): 180 if snapshot: 181 if causal_consistency: 182 raise ConfigurationError('snapshot reads do not support ' 183 'causal_consistency=True') 184 causal_consistency = False 185 elif causal_consistency is None: 186 causal_consistency = True 187 self._causal_consistency = causal_consistency 188 if default_transaction_options is not None: 189 if not isinstance(default_transaction_options, TransactionOptions): 190 raise TypeError( 191 "default_transaction_options must be an instance of " 192 "pymongo.client_session.TransactionOptions, not: %r" % 193 (default_transaction_options,)) 194 self._default_transaction_options = default_transaction_options 195 self._snapshot = snapshot 196 197 @property 198 def causal_consistency(self): 199 """Whether causal consistency is configured.""" 200 return self._causal_consistency 201 202 @property 203 def default_transaction_options(self): 204 """The default TransactionOptions to use for transactions started on 205 this session. 206 207 .. versionadded:: 3.7 208 """ 209 return self._default_transaction_options 210 211 @property 212 def snapshot(self): 213 """Whether snapshot reads are configured. 214 215 .. versionadded:: 3.12 216 """ 217 return self._snapshot 218 219 220class TransactionOptions(object): 221 """Options for :meth:`ClientSession.start_transaction`. 222 223 :Parameters: 224 - `read_concern` (optional): The 225 :class:`~pymongo.read_concern.ReadConcern` to use for this transaction. 226 If ``None`` (the default) the :attr:`read_preference` of 227 the :class:`MongoClient` is used. 228 - `write_concern` (optional): The 229 :class:`~pymongo.write_concern.WriteConcern` to use for this 230 transaction. If ``None`` (the default) the :attr:`read_preference` of 231 the :class:`MongoClient` is used. 232 - `read_preference` (optional): The read preference to use. If 233 ``None`` (the default) the :attr:`read_preference` of this 234 :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences` 235 for options. Transactions which read must use 236 :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`. 237 - `max_commit_time_ms` (optional): The maximum amount of time to allow a 238 single commitTransaction command to run. This option is an alias for 239 maxTimeMS option on the commitTransaction command. If ``None`` (the 240 default) maxTimeMS is not used. 241 242 .. versionchanged:: 3.9 243 Added the ``max_commit_time_ms`` option. 244 245 .. versionadded:: 3.7 246 """ 247 def __init__(self, read_concern=None, write_concern=None, 248 read_preference=None, max_commit_time_ms=None): 249 self._read_concern = read_concern 250 self._write_concern = write_concern 251 self._read_preference = read_preference 252 self._max_commit_time_ms = max_commit_time_ms 253 if read_concern is not None: 254 if not isinstance(read_concern, ReadConcern): 255 raise TypeError("read_concern must be an instance of " 256 "pymongo.read_concern.ReadConcern, not: %r" % 257 (read_concern,)) 258 if write_concern is not None: 259 if not isinstance(write_concern, WriteConcern): 260 raise TypeError("write_concern must be an instance of " 261 "pymongo.write_concern.WriteConcern, not: %r" % 262 (write_concern,)) 263 if not write_concern.acknowledged: 264 raise ConfigurationError( 265 "transactions do not support unacknowledged write concern" 266 ": %r" % (write_concern,)) 267 if read_preference is not None: 268 if not isinstance(read_preference, _ServerMode): 269 raise TypeError("%r is not valid for read_preference. See " 270 "pymongo.read_preferences for valid " 271 "options." % (read_preference,)) 272 if max_commit_time_ms is not None: 273 if not isinstance(max_commit_time_ms, integer_types): 274 raise TypeError( 275 "max_commit_time_ms must be an integer or None") 276 277 @property 278 def read_concern(self): 279 """This transaction's :class:`~pymongo.read_concern.ReadConcern`.""" 280 return self._read_concern 281 282 @property 283 def write_concern(self): 284 """This transaction's :class:`~pymongo.write_concern.WriteConcern`.""" 285 return self._write_concern 286 287 @property 288 def read_preference(self): 289 """This transaction's :class:`~pymongo.read_preferences.ReadPreference`. 290 """ 291 return self._read_preference 292 293 @property 294 def max_commit_time_ms(self): 295 """The maxTimeMS to use when running a commitTransaction command. 296 297 .. versionadded:: 3.9 298 """ 299 return self._max_commit_time_ms 300 301 302def _validate_session_write_concern(session, write_concern): 303 """Validate that an explicit session is not used with an unack'ed write. 304 305 Returns the session to use for the next operation. 306 """ 307 if session: 308 if write_concern is not None and not write_concern.acknowledged: 309 # For unacknowledged writes without an explicit session, 310 # drivers SHOULD NOT use an implicit session. If a driver 311 # creates an implicit session for unacknowledged writes 312 # without an explicit session, the driver MUST NOT send the 313 # session ID. 314 if session._implicit: 315 return None 316 else: 317 raise ConfigurationError( 318 'Explicit sessions are incompatible with ' 319 'unacknowledged write concern: %r' % ( 320 write_concern,)) 321 return session 322 323 324class _TransactionContext(object): 325 """Internal transaction context manager for start_transaction.""" 326 def __init__(self, session): 327 self.__session = session 328 329 def __enter__(self): 330 return self 331 332 def __exit__(self, exc_type, exc_val, exc_tb): 333 if self.__session.in_transaction: 334 if exc_val is None: 335 self.__session.commit_transaction() 336 else: 337 self.__session.abort_transaction() 338 339 340class _TxnState(object): 341 NONE = 1 342 STARTING = 2 343 IN_PROGRESS = 3 344 COMMITTED = 4 345 COMMITTED_EMPTY = 5 346 ABORTED = 6 347 348 349class _Transaction(object): 350 """Internal class to hold transaction information in a ClientSession.""" 351 def __init__(self, opts, client): 352 self.opts = opts 353 self.state = _TxnState.NONE 354 self.sharded = False 355 self.pinned_address = None 356 self.sock_mgr = None 357 self.recovery_token = None 358 self.attempt = 0 359 self.client = client 360 361 def active(self): 362 return self.state in (_TxnState.STARTING, _TxnState.IN_PROGRESS) 363 364 def starting(self): 365 return self.state == _TxnState.STARTING 366 367 @property 368 def pinned_conn(self): 369 if self.active() and self.sock_mgr: 370 return self.sock_mgr.sock 371 return None 372 373 def pin(self, server, sock_info): 374 self.sharded = True 375 self.pinned_address = server.description.address 376 if server.description.server_type == SERVER_TYPE.LoadBalancer: 377 sock_info.pin_txn() 378 self.sock_mgr = _SocketManager(sock_info, False) 379 380 def unpin(self): 381 self.pinned_address = None 382 if self.sock_mgr: 383 self.sock_mgr.close() 384 self.sock_mgr = None 385 386 def reset(self): 387 self.unpin() 388 self.state = _TxnState.NONE 389 self.sharded = False 390 self.recovery_token = None 391 self.attempt = 0 392 393 def __del__(self): 394 if self.sock_mgr: 395 # Reuse the cursor closing machinery to return the socket to the 396 # pool soon. 397 self.client._close_cursor_soon(0, None, self.sock_mgr) 398 self.sock_mgr = None 399 400 401def _reraise_with_unknown_commit(exc): 402 """Re-raise an exception with the UnknownTransactionCommitResult label.""" 403 exc._add_error_label("UnknownTransactionCommitResult") 404 raise 405 406 407def _max_time_expired_error(exc): 408 """Return true if exc is a MaxTimeMSExpired error.""" 409 return isinstance(exc, OperationFailure) and exc.code == 50 410 411 412# From the transactions spec, all the retryable writes errors plus 413# WriteConcernFailed. 414_UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset([ 415 64, # WriteConcernFailed 416 50, # MaxTimeMSExpired 417]) 418 419# From the Convenient API for Transactions spec, with_transaction must 420# halt retries after 120 seconds. 421# This limit is non-configurable and was chosen to be twice the 60 second 422# default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. 423_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 424 425 426def _within_time_limit(start_time): 427 """Are we within the with_transaction retry limit?""" 428 return monotonic.time() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT 429 430 431class ClientSession(object): 432 """A session for ordering sequential operations. 433 434 :class:`ClientSession` instances are **not thread-safe or fork-safe**. 435 They can only be used by one thread or process at a time. A single 436 :class:`ClientSession` cannot be used to run multiple operations 437 concurrently. 438 439 Should not be initialized directly by application developers - to create a 440 :class:`ClientSession`, call 441 :meth:`~pymongo.mongo_client.MongoClient.start_session`. 442 """ 443 def __init__(self, client, server_session, options, authset, implicit): 444 # A MongoClient, a _ServerSession, a SessionOptions, and a set. 445 self._client = client 446 self._server_session = server_session 447 self._options = options 448 self._authset = authset 449 self._cluster_time = None 450 self._operation_time = None 451 self._snapshot_time = None 452 # Is this an implicitly created session? 453 self._implicit = implicit 454 self._transaction = _Transaction(None, client) 455 456 def end_session(self): 457 """Finish this session. If a transaction has started, abort it. 458 459 It is an error to use the session after the session has ended. 460 """ 461 self._end_session(lock=True) 462 463 def _end_session(self, lock): 464 if self._server_session is not None: 465 try: 466 if self.in_transaction: 467 self.abort_transaction() 468 # It's possible we're still pinned here when the transaction 469 # is in the committed state when the session is discarded. 470 self._unpin() 471 finally: 472 self._client._return_server_session(self._server_session, lock) 473 self._server_session = None 474 475 def _check_ended(self): 476 if self._server_session is None: 477 raise InvalidOperation("Cannot use ended session") 478 479 def __enter__(self): 480 return self 481 482 def __exit__(self, exc_type, exc_val, exc_tb): 483 self._end_session(lock=True) 484 485 @property 486 def client(self): 487 """The :class:`~pymongo.mongo_client.MongoClient` this session was 488 created from. 489 """ 490 return self._client 491 492 @property 493 def options(self): 494 """The :class:`SessionOptions` this session was created with.""" 495 return self._options 496 497 @property 498 def session_id(self): 499 """A BSON document, the opaque server session identifier.""" 500 self._check_ended() 501 return self._server_session.session_id 502 503 @property 504 def cluster_time(self): 505 """The cluster time returned by the last operation executed 506 in this session. 507 """ 508 return self._cluster_time 509 510 @property 511 def operation_time(self): 512 """The operation time returned by the last operation executed 513 in this session. 514 """ 515 return self._operation_time 516 517 def _inherit_option(self, name, val): 518 """Return the inherited TransactionOption value.""" 519 if val: 520 return val 521 txn_opts = self.options.default_transaction_options 522 val = txn_opts and getattr(txn_opts, name) 523 if val: 524 return val 525 return getattr(self.client, name) 526 527 def with_transaction(self, callback, read_concern=None, write_concern=None, 528 read_preference=None, max_commit_time_ms=None): 529 """Execute a callback in a transaction. 530 531 This method starts a transaction on this session, executes ``callback`` 532 once, and then commits the transaction. For example:: 533 534 def callback(session): 535 orders = session.client.db.orders 536 inventory = session.client.db.inventory 537 orders.insert_one({"sku": "abc123", "qty": 100}, session=session) 538 inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}}, 539 {"$inc": {"qty": -100}}, session=session) 540 541 with client.start_session() as session: 542 session.with_transaction(callback) 543 544 To pass arbitrary arguments to the ``callback``, wrap your callable 545 with a ``lambda`` like this:: 546 547 def callback(session, custom_arg, custom_kwarg=None): 548 # Transaction operations... 549 550 with client.start_session() as session: 551 session.with_transaction( 552 lambda s: callback(s, "custom_arg", custom_kwarg=1)) 553 554 In the event of an exception, ``with_transaction`` may retry the commit 555 or the entire transaction, therefore ``callback`` may be invoked 556 multiple times by a single call to ``with_transaction``. Developers 557 should be mindful of this possiblity when writing a ``callback`` that 558 modifies application state or has any other side-effects. 559 Note that even when the ``callback`` is invoked multiple times, 560 ``with_transaction`` ensures that the transaction will be committed 561 at-most-once on the server. 562 563 The ``callback`` should not attempt to start new transactions, but 564 should simply run operations meant to be contained within a 565 transaction. The ``callback`` should also not commit the transaction; 566 this is handled automatically by ``with_transaction``. If the 567 ``callback`` does commit or abort the transaction without error, 568 however, ``with_transaction`` will return without taking further 569 action. 570 571 :class:`ClientSession` instances are **not thread-safe or fork-safe**. 572 Consequently, the ``callback`` must not attempt to execute multiple 573 operations concurrently. 574 575 When ``callback`` raises an exception, ``with_transaction`` 576 automatically aborts the current transaction. When ``callback`` or 577 :meth:`~ClientSession.commit_transaction` raises an exception that 578 includes the ``"TransientTransactionError"`` error label, 579 ``with_transaction`` starts a new transaction and re-executes 580 the ``callback``. 581 582 When :meth:`~ClientSession.commit_transaction` raises an exception with 583 the ``"UnknownTransactionCommitResult"`` error label, 584 ``with_transaction`` retries the commit until the result of the 585 transaction is known. 586 587 This method will cease retrying after 120 seconds has elapsed. This 588 timeout is not configurable and any exception raised by the 589 ``callback`` or by :meth:`ClientSession.commit_transaction` after the 590 timeout is reached will be re-raised. Applications that desire a 591 different timeout duration should not use this method. 592 593 :Parameters: 594 - `callback`: The callable ``callback`` to run inside a transaction. 595 The callable must accept a single argument, this session. Note, 596 under certain error conditions the callback may be run multiple 597 times. 598 - `read_concern` (optional): The 599 :class:`~pymongo.read_concern.ReadConcern` to use for this 600 transaction. 601 - `write_concern` (optional): The 602 :class:`~pymongo.write_concern.WriteConcern` to use for this 603 transaction. 604 - `read_preference` (optional): The read preference to use for this 605 transaction. If ``None`` (the default) the :attr:`read_preference` 606 of this :class:`Database` is used. See 607 :mod:`~pymongo.read_preferences` for options. 608 609 :Returns: 610 The return value of the ``callback``. 611 612 .. versionadded:: 3.9 613 """ 614 start_time = monotonic.time() 615 while True: 616 self.start_transaction( 617 read_concern, write_concern, read_preference, 618 max_commit_time_ms) 619 try: 620 ret = callback(self) 621 except Exception as exc: 622 if self.in_transaction: 623 self.abort_transaction() 624 if (isinstance(exc, PyMongoError) and 625 exc.has_error_label("TransientTransactionError") and 626 _within_time_limit(start_time)): 627 # Retry the entire transaction. 628 continue 629 raise 630 631 if not self.in_transaction: 632 # Assume callback intentionally ended the transaction. 633 return ret 634 635 while True: 636 try: 637 self.commit_transaction() 638 except PyMongoError as exc: 639 if (exc.has_error_label("UnknownTransactionCommitResult") 640 and _within_time_limit(start_time) 641 and not _max_time_expired_error(exc)): 642 # Retry the commit. 643 continue 644 645 if (exc.has_error_label("TransientTransactionError") and 646 _within_time_limit(start_time)): 647 # Retry the entire transaction. 648 break 649 raise 650 651 # Commit succeeded. 652 return ret 653 654 def start_transaction(self, read_concern=None, write_concern=None, 655 read_preference=None, max_commit_time_ms=None): 656 """Start a multi-statement transaction. 657 658 Takes the same arguments as :class:`TransactionOptions`. 659 660 .. versionchanged:: 3.9 661 Added the ``max_commit_time_ms`` option. 662 663 .. versionadded:: 3.7 664 """ 665 self._check_ended() 666 667 if self.options.snapshot: 668 raise InvalidOperation("Transactions are not supported in " 669 "snapshot sessions") 670 671 if self.in_transaction: 672 raise InvalidOperation("Transaction already in progress") 673 674 read_concern = self._inherit_option("read_concern", read_concern) 675 write_concern = self._inherit_option("write_concern", write_concern) 676 read_preference = self._inherit_option( 677 "read_preference", read_preference) 678 if max_commit_time_ms is None: 679 opts = self.options.default_transaction_options 680 if opts: 681 max_commit_time_ms = opts.max_commit_time_ms 682 683 self._transaction.opts = TransactionOptions( 684 read_concern, write_concern, read_preference, max_commit_time_ms) 685 self._transaction.reset() 686 self._transaction.state = _TxnState.STARTING 687 self._start_retryable_write() 688 return _TransactionContext(self) 689 690 def commit_transaction(self): 691 """Commit a multi-statement transaction. 692 693 .. versionadded:: 3.7 694 """ 695 self._check_ended() 696 state = self._transaction.state 697 if state is _TxnState.NONE: 698 raise InvalidOperation("No transaction started") 699 elif state in (_TxnState.STARTING, _TxnState.COMMITTED_EMPTY): 700 # Server transaction was never started, no need to send a command. 701 self._transaction.state = _TxnState.COMMITTED_EMPTY 702 return 703 elif state is _TxnState.ABORTED: 704 raise InvalidOperation( 705 "Cannot call commitTransaction after calling abortTransaction") 706 elif state is _TxnState.COMMITTED: 707 # We're explicitly retrying the commit, move the state back to 708 # "in progress" so that in_transaction returns true. 709 self._transaction.state = _TxnState.IN_PROGRESS 710 711 try: 712 self._finish_transaction_with_retry("commitTransaction") 713 except ConnectionFailure as exc: 714 # We do not know if the commit was successfully applied on the 715 # server or if it satisfied the provided write concern, set the 716 # unknown commit error label. 717 exc._remove_error_label("TransientTransactionError") 718 _reraise_with_unknown_commit(exc) 719 except WTimeoutError as exc: 720 # We do not know if the commit has satisfied the provided write 721 # concern, add the unknown commit error label. 722 _reraise_with_unknown_commit(exc) 723 except OperationFailure as exc: 724 if exc.code not in _UNKNOWN_COMMIT_ERROR_CODES: 725 # The server reports errorLabels in the case. 726 raise 727 # We do not know if the commit was successfully applied on the 728 # server or if it satisfied the provided write concern, set the 729 # unknown commit error label. 730 _reraise_with_unknown_commit(exc) 731 finally: 732 self._transaction.state = _TxnState.COMMITTED 733 734 def abort_transaction(self): 735 """Abort a multi-statement transaction. 736 737 .. versionadded:: 3.7 738 """ 739 self._check_ended() 740 741 state = self._transaction.state 742 if state is _TxnState.NONE: 743 raise InvalidOperation("No transaction started") 744 elif state is _TxnState.STARTING: 745 # Server transaction was never started, no need to send a command. 746 self._transaction.state = _TxnState.ABORTED 747 return 748 elif state is _TxnState.ABORTED: 749 raise InvalidOperation("Cannot call abortTransaction twice") 750 elif state in (_TxnState.COMMITTED, _TxnState.COMMITTED_EMPTY): 751 raise InvalidOperation( 752 "Cannot call abortTransaction after calling commitTransaction") 753 754 try: 755 self._finish_transaction_with_retry("abortTransaction") 756 except (OperationFailure, ConnectionFailure): 757 # The transactions spec says to ignore abortTransaction errors. 758 pass 759 finally: 760 self._transaction.state = _TxnState.ABORTED 761 self._unpin() 762 763 def _finish_transaction_with_retry(self, command_name): 764 """Run commit or abort with one retry after any retryable error. 765 766 :Parameters: 767 - `command_name`: Either "commitTransaction" or "abortTransaction". 768 """ 769 def func(session, sock_info, retryable): 770 return self._finish_transaction(sock_info, command_name) 771 return self._client._retry_internal(True, func, self, None) 772 773 def _finish_transaction(self, sock_info, command_name): 774 self._transaction.attempt += 1 775 opts = self._transaction.opts 776 wc = opts.write_concern 777 cmd = SON([(command_name, 1)]) 778 if command_name == "commitTransaction": 779 if opts.max_commit_time_ms: 780 cmd['maxTimeMS'] = opts.max_commit_time_ms 781 782 # Transaction spec says that after the initial commit attempt, 783 # subsequent commitTransaction commands should be upgraded to use 784 # w:"majority" and set a default value of 10 seconds for wtimeout. 785 if self._transaction.attempt > 1: 786 wc_doc = wc.document 787 wc_doc["w"] = "majority" 788 wc_doc.setdefault("wtimeout", 10000) 789 wc = WriteConcern(**wc_doc) 790 791 if self._transaction.recovery_token: 792 cmd['recoveryToken'] = self._transaction.recovery_token 793 794 return self._client.admin._command( 795 sock_info, 796 cmd, 797 session=self, 798 write_concern=wc, 799 parse_write_concern_error=True) 800 801 def _advance_cluster_time(self, cluster_time): 802 """Internal cluster time helper.""" 803 if self._cluster_time is None: 804 self._cluster_time = cluster_time 805 elif cluster_time is not None: 806 if cluster_time["clusterTime"] > self._cluster_time["clusterTime"]: 807 self._cluster_time = cluster_time 808 809 def advance_cluster_time(self, cluster_time): 810 """Update the cluster time for this session. 811 812 :Parameters: 813 - `cluster_time`: The 814 :data:`~pymongo.client_session.ClientSession.cluster_time` from 815 another `ClientSession` instance. 816 """ 817 if not isinstance(cluster_time, abc.Mapping): 818 raise TypeError( 819 "cluster_time must be a subclass of collections.Mapping") 820 if not isinstance(cluster_time.get("clusterTime"), Timestamp): 821 raise ValueError("Invalid cluster_time") 822 self._advance_cluster_time(cluster_time) 823 824 def _advance_operation_time(self, operation_time): 825 """Internal operation time helper.""" 826 if self._operation_time is None: 827 self._operation_time = operation_time 828 elif operation_time is not None: 829 if operation_time > self._operation_time: 830 self._operation_time = operation_time 831 832 def advance_operation_time(self, operation_time): 833 """Update the operation time for this session. 834 835 :Parameters: 836 - `operation_time`: The 837 :data:`~pymongo.client_session.ClientSession.operation_time` from 838 another `ClientSession` instance. 839 """ 840 if not isinstance(operation_time, Timestamp): 841 raise TypeError("operation_time must be an instance " 842 "of bson.timestamp.Timestamp") 843 self._advance_operation_time(operation_time) 844 845 def _process_response(self, reply): 846 """Process a response to a command that was run with this session.""" 847 self._advance_cluster_time(reply.get('$clusterTime')) 848 self._advance_operation_time(reply.get('operationTime')) 849 if self._options.snapshot and self._snapshot_time is None: 850 if 'cursor' in reply: 851 ct = reply['cursor'].get('atClusterTime') 852 else: 853 ct = reply.get('atClusterTime') 854 self._snapshot_time = ct 855 if self.in_transaction and self._transaction.sharded: 856 recovery_token = reply.get('recoveryToken') 857 if recovery_token: 858 self._transaction.recovery_token = recovery_token 859 860 @property 861 def has_ended(self): 862 """True if this session is finished.""" 863 return self._server_session is None 864 865 @property 866 def in_transaction(self): 867 """True if this session has an active multi-statement transaction. 868 869 .. versionadded:: 3.10 870 """ 871 return self._transaction.active() 872 873 @property 874 def _starting_transaction(self): 875 """True if this session is starting a multi-statement transaction. 876 """ 877 return self._transaction.starting() 878 879 @property 880 def _pinned_address(self): 881 """The mongos address this transaction was created on.""" 882 if self._transaction.active(): 883 return self._transaction.pinned_address 884 return None 885 886 @property 887 def _pinned_connection(self): 888 """The connection this transaction was started on.""" 889 return self._transaction.pinned_conn 890 891 def _pin(self, server, sock_info): 892 """Pin this session to the given Server or to the given connection.""" 893 self._transaction.pin(server, sock_info) 894 895 def _unpin(self): 896 """Unpin this session from any pinned Server.""" 897 self._transaction.unpin() 898 899 def _txn_read_preference(self): 900 """Return read preference of this transaction or None.""" 901 if self.in_transaction: 902 return self._transaction.opts.read_preference 903 return None 904 905 def _apply_to(self, command, is_retryable, read_preference, sock_info): 906 self._check_ended() 907 908 if self.options.snapshot: 909 self._update_read_concern(command, sock_info) 910 911 self._server_session.last_use = monotonic.time() 912 command['lsid'] = self._server_session.session_id 913 914 if is_retryable: 915 command['txnNumber'] = self._server_session.transaction_id 916 return 917 918 if self.in_transaction: 919 if read_preference != ReadPreference.PRIMARY: 920 raise InvalidOperation( 921 'read preference in a transaction must be primary, not: ' 922 '%r' % (read_preference,)) 923 924 if self._transaction.state == _TxnState.STARTING: 925 # First command begins a new transaction. 926 self._transaction.state = _TxnState.IN_PROGRESS 927 command['startTransaction'] = True 928 929 if self._transaction.opts.read_concern: 930 rc = self._transaction.opts.read_concern.document 931 if rc: 932 command['readConcern'] = rc 933 self._update_read_concern(command, sock_info) 934 935 command['txnNumber'] = self._server_session.transaction_id 936 command['autocommit'] = False 937 938 def _start_retryable_write(self): 939 self._check_ended() 940 self._server_session.inc_transaction_id() 941 942 def _update_read_concern(self, cmd, sock_info): 943 if (self.options.causal_consistency 944 and self.operation_time is not None): 945 cmd.setdefault('readConcern', {})[ 946 'afterClusterTime'] = self.operation_time 947 if self.options.snapshot: 948 if sock_info.max_wire_version < 13: 949 raise ConfigurationError( 950 'Snapshot reads require MongoDB 5.0 or later') 951 rc = cmd.setdefault('readConcern', {}) 952 rc['level'] = 'snapshot' 953 if self._snapshot_time is not None: 954 rc['atClusterTime'] = self._snapshot_time 955 956 957class _ServerSession(object): 958 def __init__(self, generation): 959 # Ensure id is type 4, regardless of CodecOptions.uuid_representation. 960 self.session_id = {'id': Binary(uuid.uuid4().bytes, 4)} 961 self.last_use = monotonic.time() 962 self._transaction_id = 0 963 self.dirty = False 964 self.generation = generation 965 966 def mark_dirty(self): 967 """Mark this session as dirty. 968 969 A server session is marked dirty when a command fails with a network 970 error. Dirty sessions are later discarded from the server session pool. 971 """ 972 self.dirty = True 973 974 def timed_out(self, session_timeout_minutes): 975 idle_seconds = monotonic.time() - self.last_use 976 977 # Timed out if we have less than a minute to live. 978 return idle_seconds > (session_timeout_minutes - 1) * 60 979 980 @property 981 def transaction_id(self): 982 """Positive 64-bit integer.""" 983 return Int64(self._transaction_id) 984 985 def inc_transaction_id(self): 986 self._transaction_id += 1 987 988 989class _ServerSessionPool(collections.deque): 990 """Pool of _ServerSession objects. 991 992 This class is not thread-safe, access it while holding the Topology lock. 993 """ 994 def __init__(self, *args, **kwargs): 995 super(_ServerSessionPool, self).__init__(*args, **kwargs) 996 self.generation = 0 997 998 def reset(self): 999 self.generation += 1 1000 self.clear() 1001 1002 def pop_all(self): 1003 ids = [] 1004 while self: 1005 ids.append(self.pop().session_id) 1006 return ids 1007 1008 def get_server_session(self, session_timeout_minutes): 1009 # Although the Driver Sessions Spec says we only clear stale sessions 1010 # in return_server_session, PyMongo can't take a lock when returning 1011 # sessions from a __del__ method (like in Cursor.__die), so it can't 1012 # clear stale sessions there. In case many sessions were returned via 1013 # __del__, check for stale sessions here too. 1014 self._clear_stale(session_timeout_minutes) 1015 1016 # The most recently used sessions are on the left. 1017 while self: 1018 s = self.popleft() 1019 if not s.timed_out(session_timeout_minutes): 1020 return s 1021 1022 return _ServerSession(self.generation) 1023 1024 def return_server_session(self, server_session, session_timeout_minutes): 1025 if session_timeout_minutes is not None: 1026 self._clear_stale(session_timeout_minutes) 1027 if server_session.timed_out(session_timeout_minutes): 1028 return 1029 self.return_server_session_no_lock(server_session) 1030 1031 def return_server_session_no_lock(self, server_session): 1032 # Discard sessions from an old pool to avoid duplicate sessions in the 1033 # child process after a fork. 1034 if (server_session.generation == self.generation and 1035 not server_session.dirty): 1036 self.appendleft(server_session) 1037 1038 def _clear_stale(self, session_timeout_minutes): 1039 # Clear stale sessions. The least recently used are on the right. 1040 while self: 1041 if self[-1].timed_out(session_timeout_minutes): 1042 self.pop() 1043 else: 1044 # The remaining sessions also haven't timed out. 1045 break 1046