1# Copyright DataStax, Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16This module houses the main classes you will interact with, 17:class:`.Cluster` and :class:`.Session`. 18""" 19from __future__ import absolute_import 20 21import atexit 22from collections import defaultdict, Mapping 23from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures 24from copy import copy 25from functools import partial, wraps 26from itertools import groupby, count 27import logging 28from warnings import warn 29from random import random 30import six 31from six.moves import filter, range, queue as Queue 32import socket 33import sys 34import time 35from threading import Lock, RLock, Thread, Event 36 37import weakref 38from weakref import WeakValueDictionary 39try: 40 from weakref import WeakSet 41except ImportError: 42 from cassandra.util import WeakSet # NOQA 43 44from cassandra import (ConsistencyLevel, AuthenticationFailed, 45 OperationTimedOut, UnsupportedOperation, 46 SchemaTargetType, DriverException, ProtocolVersion, 47 UnresolvableContactPoints) 48from cassandra.connection import (ConnectionException, ConnectionShutdown, 49 ConnectionHeartbeat, ProtocolVersionUnsupported, 50 EndPoint, DefaultEndPoint, DefaultEndPointFactory) 51from cassandra.cqltypes import UserType 52from cassandra.encoder import Encoder 53from cassandra.protocol import (QueryMessage, ResultMessage, 54 ErrorMessage, ReadTimeoutErrorMessage, 55 WriteTimeoutErrorMessage, 56 UnavailableErrorMessage, 57 OverloadedErrorMessage, 58 PrepareMessage, ExecuteMessage, 59 PreparedQueryNotFound, 60 IsBootstrappingErrorMessage, 61 TruncateError, ServerError, 62 BatchMessage, RESULT_KIND_PREPARED, 63 RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS, 64 RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler) 65from cassandra.metadata import Metadata, protect_name, murmur3 66from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, 67 ExponentialReconnectionPolicy, HostDistance, 68 RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan, 69 NoSpeculativeExecutionPolicy) 70from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler, 71 HostConnectionPool, HostConnection, 72 NoConnectionsAvailable) 73from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement, 74 BatchStatement, bind_params, QueryTrace, TraceUnavailable, 75 named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET) 76from cassandra.timestamps import MonotonicTimestampGenerator 77 78 79def _is_eventlet_monkey_patched(): 80 if 'eventlet.patcher' not in sys.modules: 81 return False 82 import eventlet.patcher 83 return eventlet.patcher.is_monkey_patched('socket') 84 85 86def _is_gevent_monkey_patched(): 87 if 'gevent.monkey' not in sys.modules: 88 return False 89 import gevent.socket 90 return socket.socket is gevent.socket.socket 91 92 93# default to gevent when we are monkey patched with gevent, eventlet when 94# monkey patched with eventlet, otherwise if libev is available, use that as 95# the default because it's fastest. Otherwise, use asyncore. 96if _is_gevent_monkey_patched(): 97 from cassandra.io.geventreactor import GeventConnection as DefaultConnection 98elif _is_eventlet_monkey_patched(): 99 from cassandra.io.eventletreactor import EventletConnection as DefaultConnection 100else: 101 try: 102 from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA 103 except ImportError: 104 from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA 105 106# Forces load of utf8 encoding module to avoid deadlock that occurs 107# if code that is being imported tries to import the module in a seperate 108# thread. 109# See http://bugs.python.org/issue10923 110"".encode('utf8') 111 112log = logging.getLogger(__name__) 113 114 115DEFAULT_MIN_REQUESTS = 5 116DEFAULT_MAX_REQUESTS = 100 117 118DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST = 2 119DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST = 8 120 121DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST = 1 122DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST = 2 123 124 125_NOT_SET = object() 126 127 128class NoHostAvailable(Exception): 129 """ 130 Raised when an operation is attempted but all connections are 131 busy, defunct, closed, or resulted in errors when used. 132 """ 133 134 errors = None 135 """ 136 A map of the form ``{ip: exception}`` which details the particular 137 Exception that was caught for each host the operation was attempted 138 against. 139 """ 140 141 def __init__(self, message, errors): 142 Exception.__init__(self, message, errors) 143 self.errors = errors 144 145 146def _future_completed(future): 147 """ Helper for run_in_executor() """ 148 exc = future.exception() 149 if exc: 150 log.debug("Failed to run task on executor", exc_info=exc) 151 152 153def run_in_executor(f): 154 """ 155 A decorator to run the given method in the ThreadPoolExecutor. 156 """ 157 158 @wraps(f) 159 def new_f(self, *args, **kwargs): 160 161 if self.is_shutdown: 162 return 163 try: 164 future = self.executor.submit(f, self, *args, **kwargs) 165 future.add_done_callback(_future_completed) 166 except Exception: 167 log.exception("Failed to submit task to executor") 168 169 return new_f 170 171 172_clusters_for_shutdown = set() 173 174 175def _register_cluster_shutdown(cluster): 176 _clusters_for_shutdown.add(cluster) 177 178 179def _discard_cluster_shutdown(cluster): 180 _clusters_for_shutdown.discard(cluster) 181 182 183def _shutdown_clusters(): 184 clusters = _clusters_for_shutdown.copy() # copy because shutdown modifies the global set "discard" 185 for cluster in clusters: 186 cluster.shutdown() 187 188 189atexit.register(_shutdown_clusters) 190 191 192def default_lbp_factory(): 193 if murmur3 is not None: 194 return TokenAwarePolicy(DCAwareRoundRobinPolicy()) 195 return DCAwareRoundRobinPolicy() 196 197 198def _addrinfo_or_none(contact_point, port): 199 """ 200 A helper function that wraps socket.getaddrinfo and returns None 201 when it fails to, e.g. resolve one of the hostnames. Used to address 202 PYTHON-895. 203 """ 204 try: 205 return socket.getaddrinfo(contact_point, port, 206 socket.AF_UNSPEC, socket.SOCK_STREAM) 207 except socket.gaierror: 208 log.debug('Could not resolve hostname "{}" ' 209 'with port {}'.format(contact_point, port)) 210 return None 211 212 213def _resolve_contact_points(contact_points, port): 214 resolved = tuple(_addrinfo_or_none(p, port) 215 for p in contact_points) 216 217 if resolved and all((x is None for x in resolved)): 218 raise UnresolvableContactPoints(contact_points, port) 219 220 resolved = tuple(r for r in resolved if r is not None) 221 222 return [endpoint[4][0] 223 for addrinfo in resolved 224 for endpoint in addrinfo] 225 226 227class ExecutionProfile(object): 228 load_balancing_policy = None 229 """ 230 An instance of :class:`.policies.LoadBalancingPolicy` or one of its subclasses. 231 232 Used in determining host distance for establishing connections, and routing requests. 233 234 Defaults to ``TokenAwarePolicy(DCAwareRoundRobinPolicy())`` if not specified 235 """ 236 237 retry_policy = None 238 """ 239 An instance of :class:`.policies.RetryPolicy` instance used when :class:`.Statement` objects do not have a 240 :attr:`~.Statement.retry_policy` explicitly set. 241 242 Defaults to :class:`.RetryPolicy` if not specified 243 """ 244 245 consistency_level = ConsistencyLevel.LOCAL_ONE 246 """ 247 :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement`. 248 """ 249 250 serial_consistency_level = None 251 """ 252 Serial :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement` (for LWT conditional statements). 253 """ 254 255 request_timeout = 10.0 256 """ 257 Request timeout used when not overridden in :meth:`.Session.execute` 258 """ 259 260 row_factory = staticmethod(tuple_factory) 261 """ 262 A callable to format results, accepting ``(colnames, rows)`` where ``colnames`` is a list of column names, and 263 ``rows`` is a list of tuples, with each tuple representing a row of parsed values. 264 265 Some example implementations: 266 267 - :func:`cassandra.query.tuple_factory` - return a result row as a tuple 268 - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple 269 - :func:`cassandra.query.dict_factory` - return a result row as a dict 270 - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict 271 """ 272 273 speculative_execution_policy = None 274 """ 275 An instance of :class:`.policies.SpeculativeExecutionPolicy` 276 277 Defaults to :class:`.NoSpeculativeExecutionPolicy` if not specified 278 """ 279 280 # indicates if lbp was set explicitly or uses default values 281 _load_balancing_policy_explicit = False 282 283 def __init__(self, load_balancing_policy=_NOT_SET, retry_policy=None, 284 consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None, 285 request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None): 286 287 if load_balancing_policy is _NOT_SET: 288 self._load_balancing_policy_explicit = False 289 self.load_balancing_policy = default_lbp_factory() 290 else: 291 self._load_balancing_policy_explicit = True 292 self.load_balancing_policy = load_balancing_policy 293 self.retry_policy = retry_policy or RetryPolicy() 294 self.consistency_level = consistency_level 295 296 if (serial_consistency_level is not None and 297 not ConsistencyLevel.is_serial(serial_consistency_level)): 298 raise ValueError("serial_consistency_level must be either " 299 "ConsistencyLevel.SERIAL " 300 "or ConsistencyLevel.LOCAL_SERIAL.") 301 self.serial_consistency_level = serial_consistency_level 302 303 self.request_timeout = request_timeout 304 self.row_factory = row_factory 305 self.speculative_execution_policy = speculative_execution_policy or NoSpeculativeExecutionPolicy() 306 307 308class ProfileManager(object): 309 310 def __init__(self): 311 self.profiles = dict() 312 313 def _profiles_without_explicit_lbps(self): 314 names = (profile_name for 315 profile_name, profile in self.profiles.items() 316 if not profile._load_balancing_policy_explicit) 317 return tuple( 318 'EXEC_PROFILE_DEFAULT' if n is EXEC_PROFILE_DEFAULT else n 319 for n in names 320 ) 321 322 def distance(self, host): 323 distances = set(p.load_balancing_policy.distance(host) for p in self.profiles.values()) 324 return HostDistance.LOCAL if HostDistance.LOCAL in distances else \ 325 HostDistance.REMOTE if HostDistance.REMOTE in distances else \ 326 HostDistance.IGNORED 327 328 def populate(self, cluster, hosts): 329 for p in self.profiles.values(): 330 p.load_balancing_policy.populate(cluster, hosts) 331 332 def check_supported(self): 333 for p in self.profiles.values(): 334 p.load_balancing_policy.check_supported() 335 336 def on_up(self, host): 337 for p in self.profiles.values(): 338 p.load_balancing_policy.on_up(host) 339 340 def on_down(self, host): 341 for p in self.profiles.values(): 342 p.load_balancing_policy.on_down(host) 343 344 def on_add(self, host): 345 for p in self.profiles.values(): 346 p.load_balancing_policy.on_add(host) 347 348 def on_remove(self, host): 349 for p in self.profiles.values(): 350 p.load_balancing_policy.on_remove(host) 351 352 @property 353 def default(self): 354 """ 355 internal-only; no checks are done because this entry is populated on cluster init 356 """ 357 return self.profiles[EXEC_PROFILE_DEFAULT] 358 359 360EXEC_PROFILE_DEFAULT = object() 361""" 362Key for the ``Cluster`` default execution profile, used when no other profile is selected in 363``Session.execute(execution_profile)``. 364 365Use this as the key in ``Cluster(execution_profiles)`` to override the default profile. 366""" 367 368 369class _ConfigMode(object): 370 UNCOMMITTED = 0 371 LEGACY = 1 372 PROFILES = 2 373 374 375class Cluster(object): 376 """ 377 The main class to use when interacting with a Cassandra cluster. 378 Typically, one instance of this class will be created for each 379 separate Cassandra cluster that your application interacts with. 380 381 Example usage:: 382 383 >>> from cassandra.cluster import Cluster 384 >>> cluster = Cluster(['192.168.1.1', '192.168.1.2']) 385 >>> session = cluster.connect() 386 >>> session.execute("CREATE KEYSPACE ...") 387 >>> ... 388 >>> cluster.shutdown() 389 390 ``Cluster`` and ``Session`` also provide context management functions 391 which implicitly handle shutdown when leaving scope. 392 """ 393 394 contact_points = ['127.0.0.1'] 395 """ 396 The list of contact points to try connecting for cluster discovery. A 397 contact point can be a string (ip, hostname) or a 398 :class:`.connection.EndPoint` instance. 399 400 Defaults to loopback interface. 401 402 Note: When using :class:`.DCAwareLoadBalancingPolicy` with no explicit 403 local_dc set (as is the default), the DC is chosen from an arbitrary 404 host in contact_points. In this case, contact_points should contain 405 only nodes from a single, local DC. 406 407 Note: In the next major version, if you specify contact points, you will 408 also be required to also explicitly specify a load-balancing policy. This 409 change will help prevent cases where users had hard-to-debug issues 410 surrounding unintuitive default load-balancing policy behavior. 411 """ 412 # tracks if contact_points was set explicitly or with default values 413 _contact_points_explicit = None 414 415 port = 9042 416 """ 417 The server-side port to open connections to. Defaults to 9042. 418 """ 419 420 cql_version = None 421 """ 422 If a specific version of CQL should be used, this may be set to that 423 string version. Otherwise, the highest CQL version supported by the 424 server will be automatically used. 425 """ 426 427 protocol_version = ProtocolVersion.V4 428 """ 429 The maximum version of the native protocol to use. 430 431 See :class:`.ProtocolVersion` for more information about versions. 432 433 If not set in the constructor, the driver will automatically downgrade 434 version based on a negotiation with the server, but it is most efficient 435 to set this to the maximum supported by your version of Cassandra. 436 Setting this will also prevent conflicting versions negotiated if your 437 cluster is upgraded. 438 439 """ 440 441 allow_beta_protocol_version = False 442 443 no_compact = False 444 445 """ 446 Setting true injects a flag in all messages that makes the server accept and use "beta" protocol version. 447 Used for testing new protocol features incrementally before the new version is complete. 448 """ 449 450 compression = True 451 """ 452 Controls compression for communications between the driver and Cassandra. 453 If left as the default of :const:`True`, either lz4 or snappy compression 454 may be used, depending on what is supported by both the driver 455 and Cassandra. If both are fully supported, lz4 will be preferred. 456 457 You may also set this to 'snappy' or 'lz4' to request that specific 458 compression type. 459 460 Setting this to :const:`False` disables compression. 461 """ 462 463 _auth_provider = None 464 _auth_provider_callable = None 465 466 @property 467 def auth_provider(self): 468 """ 469 When :attr:`~.Cluster.protocol_version` is 2 or higher, this should 470 be an instance of a subclass of :class:`~cassandra.auth.AuthProvider`, 471 such as :class:`~.PlainTextAuthProvider`. 472 473 When :attr:`~.Cluster.protocol_version` is 1, this should be 474 a function that accepts one argument, the IP address of a node, 475 and returns a dict of credentials for that node. 476 477 When not using authentication, this should be left as :const:`None`. 478 """ 479 return self._auth_provider 480 481 @auth_provider.setter # noqa 482 def auth_provider(self, value): 483 if not value: 484 self._auth_provider = value 485 return 486 487 try: 488 self._auth_provider_callable = value.new_authenticator 489 except AttributeError: 490 if self.protocol_version > 1: 491 raise TypeError("auth_provider must implement the cassandra.auth.AuthProvider " 492 "interface when protocol_version >= 2") 493 elif not callable(value): 494 raise TypeError("auth_provider must be callable when protocol_version == 1") 495 self._auth_provider_callable = value 496 497 self._auth_provider = value 498 499 _load_balancing_policy = None 500 @property 501 def load_balancing_policy(self): 502 """ 503 An instance of :class:`.policies.LoadBalancingPolicy` or 504 one of its subclasses. 505 506 .. versionchanged:: 2.6.0 507 508 Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`). 509 when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy` 510 otherwise. Default local DC will be chosen from contact points. 511 512 **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to 513 DC locality and remote nodes.** 514 """ 515 return self._load_balancing_policy 516 517 @load_balancing_policy.setter 518 def load_balancing_policy(self, lbp): 519 if self._config_mode == _ConfigMode.PROFILES: 520 raise ValueError("Cannot set Cluster.load_balancing_policy while using Configuration Profiles. Set this in a profile instead.") 521 self._load_balancing_policy = lbp 522 self._config_mode = _ConfigMode.LEGACY 523 524 @property 525 def _default_load_balancing_policy(self): 526 return self.profile_manager.default.load_balancing_policy 527 528 reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0) 529 """ 530 An instance of :class:`.policies.ReconnectionPolicy`. Defaults to an instance 531 of :class:`.ExponentialReconnectionPolicy` with a base delay of one second and 532 a max delay of ten minutes. 533 """ 534 535 _default_retry_policy = RetryPolicy() 536 @property 537 def default_retry_policy(self): 538 """ 539 A default :class:`.policies.RetryPolicy` instance to use for all 540 :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy` 541 explicitly set. 542 """ 543 return self._default_retry_policy 544 545 @default_retry_policy.setter 546 def default_retry_policy(self, policy): 547 if self._config_mode == _ConfigMode.PROFILES: 548 raise ValueError("Cannot set Cluster.default_retry_policy while using Configuration Profiles. Set this in a profile instead.") 549 self._default_retry_policy = policy 550 self._config_mode = _ConfigMode.LEGACY 551 552 conviction_policy_factory = SimpleConvictionPolicy 553 """ 554 A factory function which creates instances of 555 :class:`.policies.ConvictionPolicy`. Defaults to 556 :class:`.policies.SimpleConvictionPolicy`. 557 """ 558 559 address_translator = IdentityTranslator() 560 """ 561 :class:`.policies.AddressTranslator` instance to be used in translating server node addresses 562 to driver connection addresses. 563 """ 564 565 connect_to_remote_hosts = True 566 """ 567 If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE` 568 by the :attr:`~.Cluster.load_balancing_policy` will have a connection 569 opened to them. Otherwise, they will not have a connection opened to them. 570 571 Note that the default load balancing policy ignores remote hosts by default. 572 573 .. versionadded:: 2.1.0 574 """ 575 576 metrics_enabled = False 577 """ 578 Whether or not metric collection is enabled. If enabled, :attr:`.metrics` 579 will be an instance of :class:`~cassandra.metrics.Metrics`. 580 """ 581 582 metrics = None 583 """ 584 An instance of :class:`cassandra.metrics.Metrics` if :attr:`.metrics_enabled` is 585 :const:`True`, else :const:`None`. 586 """ 587 588 ssl_options = None 589 """ 590 Using ssl_options without ssl_context is deprecated and will be removed in the 591 next major release. 592 593 An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket`` (or 594 ``ssl.wrap_socket()`` if used without ssl_context) when new sockets are created. 595 This should be used when client encryption is enabled in Cassandra. 596 597 The following documentation only applies when ssl_options is used without ssl_context. 598 599 By default, a ``ca_certs`` value should be supplied (the value should be 600 a string pointing to the location of the CA certs file), and you probably 601 want to specify ``ssl_version`` as ``ssl.PROTOCOL_TLSv1`` to match 602 Cassandra's default protocol. 603 604 .. versionchanged:: 3.3.0 605 606 In addition to ``wrap_socket`` kwargs, clients may also specify ``'check_hostname': True`` to verify the cert hostname 607 as outlined in RFC 2818 and RFC 6125. Note that this requires the certificate to be transferred, so 608 should almost always require the option ``'cert_reqs': ssl.CERT_REQUIRED``. Note also that this functionality was not built into 609 Python standard library until (2.7.9, 3.2). To enable this mechanism in earlier versions, patch ``ssl.match_hostname`` 610 with a custom or `back-ported function <https://pypi.org/project/backports.ssl_match_hostname/>`_. 611 """ 612 613 ssl_context = None 614 """ 615 An optional ``ssl.SSLContext`` instance which will be used when new sockets are created. 616 This should be used when client encryption is enabled in Cassandra. 617 618 ``wrap_socket`` options can be set using :attr:`~Cluster.ssl_options`. ssl_options will 619 be used as kwargs for ``ssl.SSLContext.wrap_socket``. 620 621 .. versionadded:: 3.17.0 622 """ 623 624 sockopts = None 625 """ 626 An optional list of tuples which will be used as arguments to 627 ``socket.setsockopt()`` for all created sockets. 628 629 Note: some drivers find setting TCPNODELAY beneficial in the context of 630 their execution model. It was not found generally beneficial for this driver. 631 To try with your own workload, set ``sockopts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]`` 632 """ 633 634 max_schema_agreement_wait = 10 635 """ 636 The maximum duration (in seconds) that the driver will wait for schema 637 agreement across the cluster. Defaults to ten seconds. 638 If set <= 0, the driver will bypass schema agreement waits altogether. 639 """ 640 641 metadata = None 642 """ 643 An instance of :class:`cassandra.metadata.Metadata`. 644 """ 645 646 connection_class = DefaultConnection 647 """ 648 This determines what event loop system will be used for managing 649 I/O with Cassandra. These are the current options: 650 651 * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` 652 * :class:`cassandra.io.libevreactor.LibevConnection` 653 * :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details) 654 * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details) 655 * :class:`cassandra.io.twistedreactor.TwistedConnection` 656 * EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection` 657 658 By default, ``AsyncoreConnection`` will be used, which uses 659 the ``asyncore`` module in the Python standard library. 660 661 If ``libev`` is installed, ``LibevConnection`` will be used instead. 662 663 If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding 664 connection class will be used automatically. 665 666 ``AsyncioConnection``, which uses the ``asyncio`` module in the Python 667 standard library, is also available, but currently experimental. Note that 668 it requires ``asyncio`` features that were only introduced in the 3.4 line 669 in 3.4.6, and in the 3.5 line in 3.5.1. 670 """ 671 672 control_connection_timeout = 2.0 673 """ 674 A timeout, in seconds, for queries made by the control connection, such 675 as querying the current schema and information about nodes in the cluster. 676 If set to :const:`None`, there will be no timeout for these queries. 677 """ 678 679 idle_heartbeat_interval = 30 680 """ 681 Interval, in seconds, on which to heartbeat idle connections. This helps 682 keep connections open through network devices that expire idle connections. 683 It also helps discover bad connections early in low-traffic scenarios. 684 Setting to zero disables heartbeats. 685 """ 686 687 idle_heartbeat_timeout = 30 688 """ 689 Timeout, in seconds, on which the heartbeat wait for idle connection responses. 690 Lowering this value can help to discover bad connections earlier. 691 """ 692 693 schema_event_refresh_window = 2 694 """ 695 Window, in seconds, within which a schema component will be refreshed after 696 receiving a schema_change event. 697 698 The driver delays a random amount of time in the range [0.0, window) 699 before executing the refresh. This serves two purposes: 700 701 1.) Spread the refresh for deployments with large fanout from C* to client tier, 702 preventing a 'thundering herd' problem with many clients refreshing simultaneously. 703 704 2.) Remove redundant refreshes. Redundant events arriving within the delay period 705 are discarded, and only one refresh is executed. 706 707 Setting this to zero will execute refreshes immediately. 708 709 Setting this negative will disable schema refreshes in response to push events 710 (refreshes will still occur in response to schema change responses to DDL statements 711 executed by Sessions of this Cluster). 712 """ 713 714 topology_event_refresh_window = 10 715 """ 716 Window, in seconds, within which the node and token list will be refreshed after 717 receiving a topology_change event. 718 719 Setting this to zero will execute refreshes immediately. 720 721 Setting this negative will disable node refreshes in response to push events. 722 723 See :attr:`.schema_event_refresh_window` for discussion of rationale 724 """ 725 726 status_event_refresh_window = 2 727 """ 728 Window, in seconds, within which the driver will start the reconnect after 729 receiving a status_change event. 730 731 Setting this to zero will connect immediately. 732 733 This is primarily used to avoid 'thundering herd' in deployments with large fanout from cluster to clients. 734 When nodes come up, clients attempt to reprepare prepared statements (depending on :attr:`.reprepare_on_up`), and 735 establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor. 736 """ 737 738 prepare_on_all_hosts = True 739 """ 740 Specifies whether statements should be prepared on all hosts, or just one. 741 742 This can reasonably be disabled on long-running applications with numerous clients preparing statements on startup, 743 where a randomized initial condition of the load balancing policy can be expected to distribute prepares from 744 different clients across the cluster. 745 """ 746 747 reprepare_on_up = True 748 """ 749 Specifies whether all known prepared statements should be prepared on a node when it comes up. 750 751 May be used to avoid overwhelming a node on return, or if it is supposed that the node was only marked down due to 752 network. If statements are not reprepared, they are prepared on the first execution, causing 753 an extra roundtrip for one or more client requests. 754 """ 755 756 connect_timeout = 5 757 """ 758 Timeout, in seconds, for creating new connections. 759 760 This timeout covers the entire connection negotiation, including TCP 761 establishment, options passing, and authentication. 762 """ 763 764 timestamp_generator = None 765 """ 766 An object, shared between all sessions created by this cluster instance, 767 that generates timestamps when client-side timestamp generation is enabled. 768 By default, each :class:`Cluster` uses a new 769 :class:`~.MonotonicTimestampGenerator`. 770 771 Applications can set this value for custom timestamp behavior. See the 772 documentation for :meth:`Session.timestamp_generator`. 773 """ 774 775 @property 776 def schema_metadata_enabled(self): 777 """ 778 Flag indicating whether internal schema metadata is updated. 779 780 When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This 781 can be used to speed initial connection, and reduce load on client and server during operation. Turning this off 782 gives away token aware request routing, and programmatic inspection of the metadata model. 783 """ 784 return self.control_connection._schema_meta_enabled 785 786 @schema_metadata_enabled.setter 787 def schema_metadata_enabled(self, enabled): 788 self.control_connection._schema_meta_enabled = bool(enabled) 789 790 @property 791 def token_metadata_enabled(self): 792 """ 793 Flag indicating whether internal token metadata is updated. 794 795 When disabled, the driver does not query node token information on connect, or on topology change events. This 796 can be used to speed initial connection, and reduce load on client and server during operation. It is most useful 797 in large clusters using vnodes, where the token map can be expensive to compute. Turning this off 798 gives away token aware request routing, and programmatic inspection of the token ring. 799 """ 800 return self.control_connection._token_meta_enabled 801 802 @token_metadata_enabled.setter 803 def token_metadata_enabled(self, enabled): 804 self.control_connection._token_meta_enabled = bool(enabled) 805 806 endpoint_factory = None 807 """ 808 An :class:`~.connection.EndPointFactory` instance to use internally when creating 809 a socket connection to a node. You can ignore this unless you need a special 810 connection mechanism. 811 """ 812 813 profile_manager = None 814 _config_mode = _ConfigMode.UNCOMMITTED 815 816 sessions = None 817 control_connection = None 818 scheduler = None 819 executor = None 820 is_shutdown = False 821 _is_setup = False 822 _prepared_statements = None 823 _prepared_statement_lock = None 824 _idle_heartbeat = None 825 _protocol_version_explicit = False 826 _discount_down_events = True 827 828 _user_types = None 829 """ 830 A map of {keyspace: {type_name: UserType}} 831 """ 832 833 _listeners = None 834 _listener_lock = None 835 836 def __init__(self, 837 contact_points=_NOT_SET, 838 port=9042, 839 compression=True, 840 auth_provider=None, 841 load_balancing_policy=None, 842 reconnection_policy=None, 843 default_retry_policy=None, 844 conviction_policy_factory=None, 845 metrics_enabled=False, 846 connection_class=None, 847 ssl_options=None, 848 sockopts=None, 849 cql_version=None, 850 protocol_version=_NOT_SET, 851 executor_threads=2, 852 max_schema_agreement_wait=10, 853 control_connection_timeout=2.0, 854 idle_heartbeat_interval=30, 855 schema_event_refresh_window=2, 856 topology_event_refresh_window=10, 857 connect_timeout=5, 858 schema_metadata_enabled=True, 859 token_metadata_enabled=True, 860 address_translator=None, 861 status_event_refresh_window=2, 862 prepare_on_all_hosts=True, 863 reprepare_on_up=True, 864 execution_profiles=None, 865 allow_beta_protocol_version=False, 866 timestamp_generator=None, 867 idle_heartbeat_timeout=30, 868 no_compact=False, 869 ssl_context=None, 870 endpoint_factory=None): 871 """ 872 ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as 873 extablishing connection pools or refreshing metadata. 874 875 Any of the mutable Cluster attributes may be set as keyword arguments to the constructor. 876 """ 877 if contact_points is not None: 878 if contact_points is _NOT_SET: 879 self._contact_points_explicit = False 880 contact_points = ['127.0.0.1'] 881 else: 882 self._contact_points_explicit = True 883 884 if isinstance(contact_points, six.string_types): 885 raise TypeError("contact_points should not be a string, it should be a sequence (e.g. list) of strings") 886 887 if None in contact_points: 888 raise ValueError("contact_points should not contain None (it can resolve to localhost)") 889 self.contact_points = contact_points 890 891 self.port = port 892 893 self.endpoint_factory = endpoint_factory or DefaultEndPointFactory() 894 self.endpoint_factory.configure(self) 895 896 raw_contact_points = [cp for cp in self.contact_points if not isinstance(cp, EndPoint)] 897 self.endpoints_resolved = [cp for cp in self.contact_points if isinstance(cp, EndPoint)] 898 899 try: 900 self.endpoints_resolved += [DefaultEndPoint(address, self.port) 901 for address in _resolve_contact_points(raw_contact_points, self.port)] 902 except UnresolvableContactPoints: 903 # rethrow if no EndPoint was provided 904 if not self.endpoints_resolved: 905 raise 906 907 self.compression = compression 908 909 if protocol_version is not _NOT_SET: 910 self.protocol_version = protocol_version 911 self._protocol_version_explicit = True 912 self.allow_beta_protocol_version = allow_beta_protocol_version 913 914 self.no_compact = no_compact 915 916 self.auth_provider = auth_provider 917 918 if load_balancing_policy is not None: 919 if isinstance(load_balancing_policy, type): 920 raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class") 921 self.load_balancing_policy = load_balancing_policy 922 else: 923 self._load_balancing_policy = default_lbp_factory() # set internal attribute to avoid committing to legacy config mode 924 925 if reconnection_policy is not None: 926 if isinstance(reconnection_policy, type): 927 raise TypeError("reconnection_policy should not be a class, it should be an instance of that class") 928 self.reconnection_policy = reconnection_policy 929 930 if default_retry_policy is not None: 931 if isinstance(default_retry_policy, type): 932 raise TypeError("default_retry_policy should not be a class, it should be an instance of that class") 933 self.default_retry_policy = default_retry_policy 934 935 if conviction_policy_factory is not None: 936 if not callable(conviction_policy_factory): 937 raise ValueError("conviction_policy_factory must be callable") 938 self.conviction_policy_factory = conviction_policy_factory 939 940 if address_translator is not None: 941 if isinstance(address_translator, type): 942 raise TypeError("address_translator should not be a class, it should be an instance of that class") 943 self.address_translator = address_translator 944 945 if connection_class is not None: 946 self.connection_class = connection_class 947 948 if timestamp_generator is not None: 949 if not callable(timestamp_generator): 950 raise ValueError("timestamp_generator must be callable") 951 self.timestamp_generator = timestamp_generator 952 else: 953 self.timestamp_generator = MonotonicTimestampGenerator() 954 955 self.profile_manager = ProfileManager() 956 self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy, 957 self.default_retry_policy, 958 Session._default_consistency_level, 959 Session._default_serial_consistency_level, 960 Session._default_timeout, 961 Session._row_factory) 962 # legacy mode if either of these is not default 963 if load_balancing_policy or default_retry_policy: 964 if execution_profiles: 965 raise ValueError("Clusters constructed with execution_profiles should not specify legacy parameters " 966 "load_balancing_policy or default_retry_policy. Configure this in a profile instead.") 967 968 self._config_mode = _ConfigMode.LEGACY 969 warn("Legacy execution parameters will be removed in 4.0. Consider using " 970 "execution profiles.", DeprecationWarning) 971 972 else: 973 if execution_profiles: 974 self.profile_manager.profiles.update(execution_profiles) 975 self._config_mode = _ConfigMode.PROFILES 976 977 if self._contact_points_explicit: 978 if self._config_mode is _ConfigMode.PROFILES: 979 default_lbp_profiles = self.profile_manager._profiles_without_explicit_lbps() 980 if default_lbp_profiles: 981 log.warning( 982 'Cluster.__init__ called with contact_points ' 983 'specified, but load-balancing policies are not ' 984 'specified in some ExecutionProfiles. In the next ' 985 'major version, this will raise an error; please ' 986 'specify a load-balancing policy. ' 987 '(contact_points = {cp}, ' 988 'EPs without explicit LBPs = {eps})' 989 ''.format(cp=contact_points, eps=default_lbp_profiles)) 990 else: 991 if load_balancing_policy is None: 992 log.warning( 993 'Cluster.__init__ called with contact_points ' 994 'specified, but no load_balancing_policy. In the next ' 995 'major version, this will raise an error; please ' 996 'specify a load-balancing policy. ' 997 '(contact_points = {cp}, lbp = {lbp})' 998 ''.format(cp=contact_points, lbp=load_balancing_policy)) 999 1000 self.metrics_enabled = metrics_enabled 1001 1002 if ssl_options and not ssl_context: 1003 warn('Using ssl_options without ssl_context is ' 1004 'deprecated and will result in an error in ' 1005 'the next major release. Please use ssl_context ' 1006 'to prepare for that release.', 1007 DeprecationWarning) 1008 1009 self.ssl_options = ssl_options 1010 self.ssl_context = ssl_context 1011 self.sockopts = sockopts 1012 self.cql_version = cql_version 1013 self.max_schema_agreement_wait = max_schema_agreement_wait 1014 self.control_connection_timeout = control_connection_timeout 1015 self.idle_heartbeat_interval = idle_heartbeat_interval 1016 self.idle_heartbeat_timeout = idle_heartbeat_timeout 1017 self.schema_event_refresh_window = schema_event_refresh_window 1018 self.topology_event_refresh_window = topology_event_refresh_window 1019 self.status_event_refresh_window = status_event_refresh_window 1020 self.connect_timeout = connect_timeout 1021 self.prepare_on_all_hosts = prepare_on_all_hosts 1022 self.reprepare_on_up = reprepare_on_up 1023 1024 self._listeners = set() 1025 self._listener_lock = Lock() 1026 1027 # let Session objects be GC'ed (and shutdown) when the user no longer 1028 # holds a reference. 1029 self.sessions = WeakSet() 1030 self.metadata = Metadata() 1031 self.control_connection = None 1032 self._prepared_statements = WeakValueDictionary() 1033 self._prepared_statement_lock = Lock() 1034 1035 self._user_types = defaultdict(dict) 1036 1037 self._min_requests_per_connection = { 1038 HostDistance.LOCAL: DEFAULT_MIN_REQUESTS, 1039 HostDistance.REMOTE: DEFAULT_MIN_REQUESTS 1040 } 1041 1042 self._max_requests_per_connection = { 1043 HostDistance.LOCAL: DEFAULT_MAX_REQUESTS, 1044 HostDistance.REMOTE: DEFAULT_MAX_REQUESTS 1045 } 1046 1047 self._core_connections_per_host = { 1048 HostDistance.LOCAL: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST, 1049 HostDistance.REMOTE: DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST 1050 } 1051 1052 self._max_connections_per_host = { 1053 HostDistance.LOCAL: DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST, 1054 HostDistance.REMOTE: DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST 1055 } 1056 1057 self.executor = ThreadPoolExecutor(max_workers=executor_threads) 1058 self.scheduler = _Scheduler(self.executor) 1059 1060 self._lock = RLock() 1061 1062 if self.metrics_enabled: 1063 from cassandra.metrics import Metrics 1064 self.metrics = Metrics(weakref.proxy(self)) 1065 1066 self.control_connection = ControlConnection( 1067 self, self.control_connection_timeout, 1068 self.schema_event_refresh_window, self.topology_event_refresh_window, 1069 self.status_event_refresh_window, 1070 schema_metadata_enabled, token_metadata_enabled) 1071 1072 def register_user_type(self, keyspace, user_type, klass): 1073 """ 1074 Registers a class to use to represent a particular user-defined type. 1075 Query parameters for this user-defined type will be assumed to be 1076 instances of `klass`. Result sets for this user-defined type will 1077 be instances of `klass`. If no class is registered for a user-defined 1078 type, a namedtuple will be used for result sets, and non-prepared 1079 statements may not encode parameters for this type correctly. 1080 1081 `keyspace` is the name of the keyspace that the UDT is defined in. 1082 1083 `user_type` is the string name of the UDT to register the mapping 1084 for. 1085 1086 `klass` should be a class with attributes whose names match the 1087 fields of the user-defined type. The constructor must accepts kwargs 1088 for each of the fields in the UDT. 1089 1090 This method should only be called after the type has been created 1091 within Cassandra. 1092 1093 Example:: 1094 1095 cluster = Cluster(protocol_version=3) 1096 session = cluster.connect() 1097 session.set_keyspace('mykeyspace') 1098 session.execute("CREATE TYPE address (street text, zipcode int)") 1099 session.execute("CREATE TABLE users (id int PRIMARY KEY, location address)") 1100 1101 # create a class to map to the "address" UDT 1102 class Address(object): 1103 1104 def __init__(self, street, zipcode): 1105 self.street = street 1106 self.zipcode = zipcode 1107 1108 cluster.register_user_type('mykeyspace', 'address', Address) 1109 1110 # insert a row using an instance of Address 1111 session.execute("INSERT INTO users (id, location) VALUES (%s, %s)", 1112 (0, Address("123 Main St.", 78723))) 1113 1114 # results will include Address instances 1115 results = session.execute("SELECT * FROM users") 1116 row = results[0] 1117 print row.id, row.location.street, row.location.zipcode 1118 1119 """ 1120 if self.protocol_version < 3: 1121 log.warning("User Type serialization is only supported in native protocol version 3+ (%d in use). " 1122 "CQL encoding for simple statements will still work, but named tuples will " 1123 "be returned when reading type %s.%s.", self.protocol_version, keyspace, user_type) 1124 1125 self._user_types[keyspace][user_type] = klass 1126 for session in tuple(self.sessions): 1127 session.user_type_registered(keyspace, user_type, klass) 1128 UserType.evict_udt_class(keyspace, user_type) 1129 1130 def add_execution_profile(self, name, profile, pool_wait_timeout=5): 1131 """ 1132 Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute` 1133 and :meth:`.Session.execute_async`. This method will raise if the profile already exists. 1134 1135 Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method 1136 provides a way of adding them dynamically. 1137 1138 Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default, 1139 this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately 1140 upon return. This behavior can be controlled using ``pool_wait_timeout`` (see 1141 `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_ 1142 for timeout semantics). 1143 """ 1144 if not isinstance(profile, ExecutionProfile): 1145 raise TypeError("profile must be an instance of ExecutionProfile") 1146 if self._config_mode == _ConfigMode.LEGACY: 1147 raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly.") 1148 if name in self.profile_manager.profiles: 1149 raise ValueError("Profile {} already exists".format(name)) 1150 contact_points_but_no_lbp = ( 1151 self._contact_points_explicit and not 1152 profile._load_balancing_policy_explicit) 1153 if contact_points_but_no_lbp: 1154 log.warning( 1155 'Tried to add an ExecutionProfile with name {name}. ' 1156 '{self} was explicitly configured with contact_points, but ' 1157 '{ep} was not explicitly configured with a ' 1158 'load_balancing_policy. In the next major version, trying to ' 1159 'add an ExecutionProfile without an explicitly configured LBP ' 1160 'to a cluster with explicitly configured contact_points will ' 1161 'raise an exception; please specify a load-balancing policy ' 1162 'in the ExecutionProfile.' 1163 ''.format(name=repr(name), self=self, ep=profile)) 1164 1165 self.profile_manager.profiles[name] = profile 1166 profile.load_balancing_policy.populate(self, self.metadata.all_hosts()) 1167 # on_up after populate allows things like DCA LBP to choose default local dc 1168 for host in filter(lambda h: h.is_up, self.metadata.all_hosts()): 1169 profile.load_balancing_policy.on_up(host) 1170 futures = set() 1171 for session in tuple(self.sessions): 1172 futures.update(session.update_created_pools()) 1173 _, not_done = wait_futures(futures, pool_wait_timeout) 1174 if not_done: 1175 raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.") 1176 1177 def get_min_requests_per_connection(self, host_distance): 1178 return self._min_requests_per_connection[host_distance] 1179 1180 def set_min_requests_per_connection(self, host_distance, min_requests): 1181 """ 1182 Sets a threshold for concurrent requests per connection, below which 1183 connections will be considered for disposal (down to core connections; 1184 see :meth:`~Cluster.set_core_connections_per_host`). 1185 1186 Pertains to connection pool management in protocol versions {1,2}. 1187 """ 1188 if self.protocol_version >= 3: 1189 raise UnsupportedOperation( 1190 "Cluster.set_min_requests_per_connection() only has an effect " 1191 "when using protocol_version 1 or 2.") 1192 if min_requests < 0 or min_requests > 126 or \ 1193 min_requests >= self._max_requests_per_connection[host_distance]: 1194 raise ValueError("min_requests must be 0-126 and less than the max_requests for this host_distance (%d)" % 1195 (self._min_requests_per_connection[host_distance],)) 1196 self._min_requests_per_connection[host_distance] = min_requests 1197 1198 def get_max_requests_per_connection(self, host_distance): 1199 return self._max_requests_per_connection[host_distance] 1200 1201 def set_max_requests_per_connection(self, host_distance, max_requests): 1202 """ 1203 Sets a threshold for concurrent requests per connection, above which new 1204 connections will be created to a host (up to max connections; 1205 see :meth:`~Cluster.set_max_connections_per_host`). 1206 1207 Pertains to connection pool management in protocol versions {1,2}. 1208 """ 1209 if self.protocol_version >= 3: 1210 raise UnsupportedOperation( 1211 "Cluster.set_max_requests_per_connection() only has an effect " 1212 "when using protocol_version 1 or 2.") 1213 if max_requests < 1 or max_requests > 127 or \ 1214 max_requests <= self._min_requests_per_connection[host_distance]: 1215 raise ValueError("max_requests must be 1-127 and greater than the min_requests for this host_distance (%d)" % 1216 (self._min_requests_per_connection[host_distance],)) 1217 self._max_requests_per_connection[host_distance] = max_requests 1218 1219 def get_core_connections_per_host(self, host_distance): 1220 """ 1221 Gets the minimum number of connections per Session that will be opened 1222 for each host with :class:`~.HostDistance` equal to `host_distance`. 1223 The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for 1224 :attr:`~HostDistance.REMOTE`. 1225 1226 This property is ignored if :attr:`~.Cluster.protocol_version` is 1227 3 or higher. 1228 """ 1229 return self._core_connections_per_host[host_distance] 1230 1231 def set_core_connections_per_host(self, host_distance, core_connections): 1232 """ 1233 Sets the minimum number of connections per Session that will be opened 1234 for each host with :class:`~.HostDistance` equal to `host_distance`. 1235 The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for 1236 :attr:`~HostDistance.REMOTE`. 1237 1238 Protocol version 1 and 2 are limited in the number of concurrent 1239 requests they can send per connection. The driver implements connection 1240 pooling to support higher levels of concurrency. 1241 1242 If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this 1243 is not supported (there is always one connection per host, unless 1244 the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`) 1245 and using this will result in an :exc:`~.UnsupporteOperation`. 1246 """ 1247 if self.protocol_version >= 3: 1248 raise UnsupportedOperation( 1249 "Cluster.set_core_connections_per_host() only has an effect " 1250 "when using protocol_version 1 or 2.") 1251 old = self._core_connections_per_host[host_distance] 1252 self._core_connections_per_host[host_distance] = core_connections 1253 if old < core_connections: 1254 self._ensure_core_connections() 1255 1256 def get_max_connections_per_host(self, host_distance): 1257 """ 1258 Gets the maximum number of connections per Session that will be opened 1259 for each host with :class:`~.HostDistance` equal to `host_distance`. 1260 The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for 1261 :attr:`~HostDistance.REMOTE`. 1262 1263 This property is ignored if :attr:`~.Cluster.protocol_version` is 1264 3 or higher. 1265 """ 1266 return self._max_connections_per_host[host_distance] 1267 1268 def set_max_connections_per_host(self, host_distance, max_connections): 1269 """ 1270 Sets the maximum number of connections per Session that will be opened 1271 for each host with :class:`~.HostDistance` equal to `host_distance`. 1272 The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for 1273 :attr:`~HostDistance.REMOTE`. 1274 1275 If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this 1276 is not supported (there is always one connection per host, unless 1277 the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`) 1278 and using this will result in an :exc:`~.UnsupporteOperation`. 1279 """ 1280 if self.protocol_version >= 3: 1281 raise UnsupportedOperation( 1282 "Cluster.set_max_connections_per_host() only has an effect " 1283 "when using protocol_version 1 or 2.") 1284 self._max_connections_per_host[host_distance] = max_connections 1285 1286 def connection_factory(self, endpoint, *args, **kwargs): 1287 """ 1288 Called to create a new connection with proper configuration. 1289 Intended for internal use only. 1290 """ 1291 kwargs = self._make_connection_kwargs(endpoint, kwargs) 1292 return self.connection_class.factory(endpoint, self.connect_timeout, *args, **kwargs) 1293 1294 def _make_connection_factory(self, host, *args, **kwargs): 1295 kwargs = self._make_connection_kwargs(host.endpoint, kwargs) 1296 return partial(self.connection_class.factory, host.endpoint, self.connect_timeout, *args, **kwargs) 1297 1298 def _make_connection_kwargs(self, endpoint, kwargs_dict): 1299 if self._auth_provider_callable: 1300 kwargs_dict.setdefault('authenticator', self._auth_provider_callable(endpoint.address)) 1301 1302 kwargs_dict.setdefault('port', self.port) 1303 kwargs_dict.setdefault('compression', self.compression) 1304 kwargs_dict.setdefault('sockopts', self.sockopts) 1305 kwargs_dict.setdefault('ssl_options', self.ssl_options) 1306 kwargs_dict.setdefault('ssl_context', self.ssl_context) 1307 kwargs_dict.setdefault('cql_version', self.cql_version) 1308 kwargs_dict.setdefault('protocol_version', self.protocol_version) 1309 kwargs_dict.setdefault('user_type_map', self._user_types) 1310 kwargs_dict.setdefault('allow_beta_protocol_version', self.allow_beta_protocol_version) 1311 kwargs_dict.setdefault('no_compact', self.no_compact) 1312 1313 return kwargs_dict 1314 1315 def protocol_downgrade(self, host_endpoint, previous_version): 1316 if self._protocol_version_explicit: 1317 raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,)) 1318 1319 new_version = ProtocolVersion.get_lower_supported(previous_version) 1320 if new_version < ProtocolVersion.MIN_SUPPORTED: 1321 raise DriverException( 1322 "Cannot downgrade protocol version below minimum supported version: %d" % (ProtocolVersion.MIN_SUPPORTED,)) 1323 1324 log.warning("Downgrading core protocol version from %d to %d for %s. " 1325 "To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. " 1326 "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint) 1327 self.protocol_version = new_version 1328 1329 def connect(self, keyspace=None, wait_for_all_pools=False): 1330 """ 1331 Creates and returns a new :class:`~.Session` object. If `keyspace` 1332 is specified, that keyspace will be the default keyspace for 1333 operations on the ``Session``. 1334 """ 1335 with self._lock: 1336 if self.is_shutdown: 1337 raise DriverException("Cluster is already shut down") 1338 1339 if not self._is_setup: 1340 log.debug("Connecting to cluster, contact points: %s; protocol version: %s", 1341 self.contact_points, self.protocol_version) 1342 self.connection_class.initialize_reactor() 1343 _register_cluster_shutdown(self) 1344 for endpoint in self.endpoints_resolved: 1345 host, new = self.add_host(endpoint, signal=False) 1346 if new: 1347 host.set_up() 1348 for listener in self.listeners: 1349 listener.on_add(host) 1350 1351 self.profile_manager.populate( 1352 weakref.proxy(self), self.metadata.all_hosts()) 1353 self.load_balancing_policy.populate( 1354 weakref.proxy(self), self.metadata.all_hosts() 1355 ) 1356 1357 try: 1358 self.control_connection.connect() 1359 1360 # we set all contact points up for connecting, but we won't infer state after this 1361 for endpoint in self.endpoints_resolved: 1362 h = self.metadata.get_host(endpoint) 1363 if h and self.profile_manager.distance(h) == HostDistance.IGNORED: 1364 h.is_up = None 1365 1366 log.debug("Control connection created") 1367 except Exception: 1368 log.exception("Control connection failed to connect, " 1369 "shutting down Cluster:") 1370 self.shutdown() 1371 raise 1372 1373 self.profile_manager.check_supported() # todo: rename this method 1374 1375 if self.idle_heartbeat_interval: 1376 self._idle_heartbeat = ConnectionHeartbeat( 1377 self.idle_heartbeat_interval, 1378 self.get_connection_holders, 1379 timeout=self.idle_heartbeat_timeout 1380 ) 1381 self._is_setup = True 1382 1383 session = self._new_session(keyspace) 1384 if wait_for_all_pools: 1385 wait_futures(session._initial_connect_futures) 1386 return session 1387 1388 def get_connection_holders(self): 1389 holders = [] 1390 for s in tuple(self.sessions): 1391 holders.extend(s.get_pools()) 1392 holders.append(self.control_connection) 1393 return holders 1394 1395 def shutdown(self): 1396 """ 1397 Closes all sessions and connection associated with this Cluster. 1398 To ensure all connections are properly closed, **you should always 1399 call shutdown() on a Cluster instance when you are done with it**. 1400 1401 Once shutdown, a Cluster should not be used for any purpose. 1402 """ 1403 with self._lock: 1404 if self.is_shutdown: 1405 return 1406 else: 1407 self.is_shutdown = True 1408 1409 if self._idle_heartbeat: 1410 self._idle_heartbeat.stop() 1411 1412 self.scheduler.shutdown() 1413 1414 self.control_connection.shutdown() 1415 1416 for session in tuple(self.sessions): 1417 session.shutdown() 1418 1419 self.executor.shutdown() 1420 1421 _discard_cluster_shutdown(self) 1422 1423 def __enter__(self): 1424 return self 1425 1426 def __exit__(self, *args): 1427 self.shutdown() 1428 1429 def _new_session(self, keyspace): 1430 session = Session(self, self.metadata.all_hosts(), keyspace) 1431 self._session_register_user_types(session) 1432 self.sessions.add(session) 1433 return session 1434 1435 def _session_register_user_types(self, session): 1436 for keyspace, type_map in six.iteritems(self._user_types): 1437 for udt_name, klass in six.iteritems(type_map): 1438 session.user_type_registered(keyspace, udt_name, klass) 1439 1440 def _cleanup_failed_on_up_handling(self, host): 1441 self.profile_manager.on_down(host) 1442 self.control_connection.on_down(host) 1443 for session in tuple(self.sessions): 1444 session.remove_pool(host) 1445 1446 self._start_reconnector(host, is_host_addition=False) 1447 1448 def _on_up_future_completed(self, host, futures, results, lock, finished_future): 1449 with lock: 1450 futures.discard(finished_future) 1451 1452 try: 1453 results.append(finished_future.result()) 1454 except Exception as exc: 1455 results.append(exc) 1456 1457 if futures: 1458 return 1459 1460 try: 1461 # all futures have completed at this point 1462 for exc in [f for f in results if isinstance(f, Exception)]: 1463 log.error("Unexpected failure while marking node %s up:", host, exc_info=exc) 1464 self._cleanup_failed_on_up_handling(host) 1465 return 1466 1467 if not all(results): 1468 log.debug("Connection pool could not be created, not marking node %s up", host) 1469 self._cleanup_failed_on_up_handling(host) 1470 return 1471 1472 log.info("Connection pools established for node %s", host) 1473 # mark the host as up and notify all listeners 1474 host.set_up() 1475 for listener in self.listeners: 1476 listener.on_up(host) 1477 finally: 1478 with host.lock: 1479 host._currently_handling_node_up = False 1480 1481 # see if there are any pools to add or remove now that the host is marked up 1482 for session in tuple(self.sessions): 1483 session.update_created_pools() 1484 1485 def on_up(self, host): 1486 """ 1487 Intended for internal use only. 1488 """ 1489 if self.is_shutdown: 1490 return 1491 1492 log.debug("Waiting to acquire lock for handling up status of node %s", host) 1493 with host.lock: 1494 if host._currently_handling_node_up: 1495 log.debug("Another thread is already handling up status of node %s", host) 1496 return 1497 1498 if host.is_up: 1499 log.debug("Host %s was already marked up", host) 1500 return 1501 1502 host._currently_handling_node_up = True 1503 log.debug("Starting to handle up status of node %s", host) 1504 1505 have_future = False 1506 futures = set() 1507 try: 1508 log.info("Host %s may be up; will prepare queries and open connection pool", host) 1509 1510 reconnector = host.get_and_set_reconnection_handler(None) 1511 if reconnector: 1512 log.debug("Now that host %s is up, cancelling the reconnection handler", host) 1513 reconnector.cancel() 1514 1515 if self.profile_manager.distance(host) != HostDistance.IGNORED: 1516 self._prepare_all_queries(host) 1517 log.debug("Done preparing all queries for host %s, ", host) 1518 1519 for session in tuple(self.sessions): 1520 session.remove_pool(host) 1521 1522 log.debug("Signalling to load balancing policies that host %s is up", host) 1523 self.profile_manager.on_up(host) 1524 1525 log.debug("Signalling to control connection that host %s is up", host) 1526 self.control_connection.on_up(host) 1527 1528 log.debug("Attempting to open new connection pools for host %s", host) 1529 futures_lock = Lock() 1530 futures_results = [] 1531 callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock) 1532 for session in tuple(self.sessions): 1533 future = session.add_or_renew_pool(host, is_host_addition=False) 1534 if future is not None: 1535 have_future = True 1536 future.add_done_callback(callback) 1537 futures.add(future) 1538 except Exception: 1539 log.exception("Unexpected failure handling node %s being marked up:", host) 1540 for future in futures: 1541 future.cancel() 1542 1543 self._cleanup_failed_on_up_handling(host) 1544 1545 with host.lock: 1546 host._currently_handling_node_up = False 1547 raise 1548 else: 1549 if not have_future: 1550 with host.lock: 1551 host.set_up() 1552 host._currently_handling_node_up = False 1553 1554 # for testing purposes 1555 return futures 1556 1557 def _start_reconnector(self, host, is_host_addition): 1558 if self.profile_manager.distance(host) == HostDistance.IGNORED: 1559 return 1560 1561 schedule = self.reconnection_policy.new_schedule() 1562 1563 # in order to not hold references to this Cluster open and prevent 1564 # proper shutdown when the program ends, we'll just make a closure 1565 # of the current Cluster attributes to create new Connections with 1566 conn_factory = self._make_connection_factory(host) 1567 1568 reconnector = _HostReconnectionHandler( 1569 host, conn_factory, is_host_addition, self.on_add, self.on_up, 1570 self.scheduler, schedule, host.get_and_set_reconnection_handler, 1571 new_handler=None) 1572 1573 old_reconnector = host.get_and_set_reconnection_handler(reconnector) 1574 if old_reconnector: 1575 log.debug("Old host reconnector found for %s, cancelling", host) 1576 old_reconnector.cancel() 1577 1578 log.debug("Starting reconnector for host %s", host) 1579 reconnector.start() 1580 1581 @run_in_executor 1582 def on_down(self, host, is_host_addition, expect_host_to_be_down=False): 1583 """ 1584 Intended for internal use only. 1585 """ 1586 if self.is_shutdown: 1587 return 1588 1589 with host.lock: 1590 was_up = host.is_up 1591 1592 # ignore down signals if we have open pools to the host 1593 # this is to avoid closing pools when a control connection host became isolated 1594 if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED: 1595 connected = False 1596 for session in tuple(self.sessions): 1597 pool_states = session.get_pool_state() 1598 pool_state = pool_states.get(host) 1599 if pool_state: 1600 connected |= pool_state['open_count'] > 0 1601 if connected: 1602 return 1603 1604 host.set_down() 1605 if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting(): 1606 return 1607 1608 log.warning("Host %s has been marked down", host) 1609 1610 self.profile_manager.on_down(host) 1611 self.control_connection.on_down(host) 1612 for session in tuple(self.sessions): 1613 session.on_down(host) 1614 1615 for listener in self.listeners: 1616 listener.on_down(host) 1617 1618 self._start_reconnector(host, is_host_addition) 1619 1620 def on_add(self, host, refresh_nodes=True): 1621 if self.is_shutdown: 1622 return 1623 1624 log.debug("Handling new host %r and notifying listeners", host) 1625 1626 distance = self.profile_manager.distance(host) 1627 if distance != HostDistance.IGNORED: 1628 self._prepare_all_queries(host) 1629 log.debug("Done preparing queries for new host %r", host) 1630 1631 self.profile_manager.on_add(host) 1632 self.control_connection.on_add(host, refresh_nodes) 1633 1634 if distance == HostDistance.IGNORED: 1635 log.debug("Not adding connection pool for new host %r because the " 1636 "load balancing policy has marked it as IGNORED", host) 1637 self._finalize_add(host, set_up=False) 1638 return 1639 1640 futures_lock = Lock() 1641 futures_results = [] 1642 futures = set() 1643 1644 def future_completed(future): 1645 with futures_lock: 1646 futures.discard(future) 1647 1648 try: 1649 futures_results.append(future.result()) 1650 except Exception as exc: 1651 futures_results.append(exc) 1652 1653 if futures: 1654 return 1655 1656 log.debug('All futures have completed for added host %s', host) 1657 1658 for exc in [f for f in futures_results if isinstance(f, Exception)]: 1659 log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc) 1660 return 1661 1662 if not all(futures_results): 1663 log.warning("Connection pool could not be created, not marking node %s up", host) 1664 return 1665 1666 self._finalize_add(host) 1667 1668 have_future = False 1669 for session in tuple(self.sessions): 1670 future = session.add_or_renew_pool(host, is_host_addition=True) 1671 if future is not None: 1672 have_future = True 1673 futures.add(future) 1674 future.add_done_callback(future_completed) 1675 1676 if not have_future: 1677 self._finalize_add(host) 1678 1679 def _finalize_add(self, host, set_up=True): 1680 if set_up: 1681 host.set_up() 1682 1683 for listener in self.listeners: 1684 listener.on_add(host) 1685 1686 # see if there are any pools to add or remove now that the host is marked up 1687 for session in tuple(self.sessions): 1688 session.update_created_pools() 1689 1690 def on_remove(self, host): 1691 if self.is_shutdown: 1692 return 1693 1694 log.debug("Removing host %s", host) 1695 host.set_down() 1696 self.profile_manager.on_remove(host) 1697 for session in tuple(self.sessions): 1698 session.on_remove(host) 1699 for listener in self.listeners: 1700 listener.on_remove(host) 1701 self.control_connection.on_remove(host) 1702 1703 def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False): 1704 is_down = host.signal_connection_failure(connection_exc) 1705 if is_down: 1706 self.on_down(host, is_host_addition, expect_host_to_be_down) 1707 return is_down 1708 1709 def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True): 1710 """ 1711 Called when adding initial contact points and when the control 1712 connection subsequently discovers a new node. 1713 Returns a Host instance, and a flag indicating whether it was new in 1714 the metadata. 1715 Intended for internal use only. 1716 """ 1717 host, new = self.metadata.add_or_return_host(Host(endpoint, self.conviction_policy_factory, datacenter, rack)) 1718 if new and signal: 1719 log.info("New Cassandra host %r discovered", host) 1720 self.on_add(host, refresh_nodes) 1721 1722 return host, new 1723 1724 def remove_host(self, host): 1725 """ 1726 Called when the control connection observes that a node has left the 1727 ring. Intended for internal use only. 1728 """ 1729 if host and self.metadata.remove_host(host): 1730 log.info("Cassandra host %s removed", host) 1731 self.on_remove(host) 1732 1733 def register_listener(self, listener): 1734 """ 1735 Adds a :class:`cassandra.policies.HostStateListener` subclass instance to 1736 the list of listeners to be notified when a host is added, removed, 1737 marked up, or marked down. 1738 """ 1739 with self._listener_lock: 1740 self._listeners.add(listener) 1741 1742 def unregister_listener(self, listener): 1743 """ Removes a registered listener. """ 1744 with self._listener_lock: 1745 self._listeners.remove(listener) 1746 1747 @property 1748 def listeners(self): 1749 with self._listener_lock: 1750 return self._listeners.copy() 1751 1752 def _ensure_core_connections(self): 1753 """ 1754 If any host has fewer than the configured number of core connections 1755 open, attempt to open connections until that number is met. 1756 """ 1757 for session in tuple(self.sessions): 1758 for pool in tuple(session._pools.values()): 1759 pool.ensure_core_connections() 1760 1761 @staticmethod 1762 def _validate_refresh_schema(keyspace, table, usertype, function, aggregate): 1763 if any((table, usertype, function, aggregate)): 1764 if not keyspace: 1765 raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function, aggregate}") 1766 if sum(1 for e in (table, usertype, function) if e) > 1: 1767 raise ValueError("{table, usertype, function, aggregate} are mutually exclusive") 1768 1769 @staticmethod 1770 def _target_type_from_refresh_args(keyspace, table, usertype, function, aggregate): 1771 if aggregate: 1772 return SchemaTargetType.AGGREGATE 1773 elif function: 1774 return SchemaTargetType.FUNCTION 1775 elif usertype: 1776 return SchemaTargetType.TYPE 1777 elif table: 1778 return SchemaTargetType.TABLE 1779 elif keyspace: 1780 return SchemaTargetType.KEYSPACE 1781 return None 1782 1783 def get_control_connection_host(self): 1784 """ 1785 Returns the control connection host metadata. 1786 """ 1787 connection = self.control_connection._connection 1788 endpoint = connection.endpoint if connection else None 1789 return self.metadata.get_host(endpoint) if endpoint else None 1790 1791 def refresh_schema_metadata(self, max_schema_agreement_wait=None): 1792 """ 1793 Synchronously refresh all schema metadata. 1794 1795 By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait` 1796 and :attr:`~.Cluster.control_connection_timeout`. 1797 1798 Passing max_schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`. 1799 1800 Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately. 1801 1802 An Exception is raised if schema refresh fails for any reason. 1803 """ 1804 if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True): 1805 raise DriverException("Schema metadata was not refreshed. See log for details.") 1806 1807 def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None): 1808 """ 1809 Synchronously refresh keyspace metadata. This applies to keyspace-level information such as replication 1810 and durability settings. It does not refresh tables, types, etc. contained in the keyspace. 1811 1812 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1813 """ 1814 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace, 1815 schema_agreement_wait=max_schema_agreement_wait, force=True): 1816 raise DriverException("Keyspace metadata was not refreshed. See log for details.") 1817 1818 def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None): 1819 """ 1820 Synchronously refresh table metadata. This applies to a table, and any triggers or indexes attached 1821 to the table. 1822 1823 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1824 """ 1825 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, 1826 schema_agreement_wait=max_schema_agreement_wait, force=True): 1827 raise DriverException("Table metadata was not refreshed. See log for details.") 1828 1829 def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None): 1830 """ 1831 Synchronously refresh materialized view metadata. 1832 1833 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1834 """ 1835 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, 1836 schema_agreement_wait=max_schema_agreement_wait, force=True): 1837 raise DriverException("View metadata was not refreshed. See log for details.") 1838 1839 def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None): 1840 """ 1841 Synchronously refresh user defined type metadata. 1842 1843 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1844 """ 1845 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, 1846 schema_agreement_wait=max_schema_agreement_wait, force=True): 1847 raise DriverException("User Type metadata was not refreshed. See log for details.") 1848 1849 def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None): 1850 """ 1851 Synchronously refresh user defined function metadata. 1852 1853 ``function`` is a :class:`cassandra.UserFunctionDescriptor`. 1854 1855 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1856 """ 1857 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, 1858 schema_agreement_wait=max_schema_agreement_wait, force=True): 1859 raise DriverException("User Function metadata was not refreshed. See log for details.") 1860 1861 def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None): 1862 """ 1863 Synchronously refresh user defined aggregate metadata. 1864 1865 ``aggregate`` is a :class:`cassandra.UserAggregateDescriptor`. 1866 1867 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior 1868 """ 1869 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, 1870 schema_agreement_wait=max_schema_agreement_wait, force=True): 1871 raise DriverException("User Aggregate metadata was not refreshed. See log for details.") 1872 1873 def refresh_nodes(self, force_token_rebuild=False): 1874 """ 1875 Synchronously refresh the node list and token metadata 1876 1877 `force_token_rebuild` can be used to rebuild the token map metadata, even if no new nodes are discovered. 1878 1879 An Exception is raised if node refresh fails for any reason. 1880 """ 1881 if not self.control_connection.refresh_node_list_and_token_map(force_token_rebuild): 1882 raise DriverException("Node list was not refreshed. See log for details.") 1883 1884 def set_meta_refresh_enabled(self, enabled): 1885 """ 1886 *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` :attr:`~.Cluster.token_metadata_enabled` instead 1887 1888 Sets a flag to enable (True) or disable (False) all metadata refresh queries. 1889 This applies to both schema and node topology. 1890 1891 Disabling this is useful to minimize refreshes during multiple changes. 1892 1893 Meta refresh must be enabled for the driver to become aware of any cluster 1894 topology changes or schema updates. 1895 """ 1896 warn("Cluster.set_meta_refresh_enabled is deprecated and will be removed in 4.0. Set " 1897 "Cluster.schema_metadata_enabled and Cluster.token_metadata_enabled instead.", DeprecationWarning) 1898 self.schema_metadata_enabled = enabled 1899 self.token_metadata_enabled = enabled 1900 1901 @classmethod 1902 def _send_chunks(cls, connection, host, chunks, set_keyspace=False): 1903 for ks_chunk in chunks: 1904 messages = [PrepareMessage(query=s.query_string, 1905 keyspace=s.keyspace if set_keyspace else None) 1906 for s in ks_chunk] 1907 # TODO: make this timeout configurable somehow? 1908 responses = connection.wait_for_responses(*messages, timeout=5.0, fail_on_error=False) 1909 for success, response in responses: 1910 if not success: 1911 log.debug("Got unexpected response when preparing " 1912 "statement on host %s: %r", host, response) 1913 1914 def _prepare_all_queries(self, host): 1915 if not self._prepared_statements or not self.reprepare_on_up: 1916 return 1917 1918 log.debug("Preparing all known prepared statements against host %s", host) 1919 connection = None 1920 try: 1921 connection = self.connection_factory(host.endpoint) 1922 statements = list(self._prepared_statements.values()) 1923 if ProtocolVersion.uses_keyspace_flag(self.protocol_version): 1924 # V5 protocol and higher, no need to set the keyspace 1925 chunks = [] 1926 for i in range(0, len(statements), 10): 1927 chunks.append(statements[i:i + 10]) 1928 self._send_chunks(connection, host, chunks, True) 1929 else: 1930 for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace): 1931 if keyspace is not None: 1932 connection.set_keyspace_blocking(keyspace) 1933 1934 # prepare 10 statements at a time 1935 ks_statements = list(ks_statements) 1936 chunks = [] 1937 for i in range(0, len(ks_statements), 10): 1938 chunks.append(ks_statements[i:i + 10]) 1939 self._send_chunks(connection, host, chunks) 1940 1941 log.debug("Done preparing all known prepared statements against host %s", host) 1942 except OperationTimedOut as timeout: 1943 log.warning("Timed out trying to prepare all statements on host %s: %s", host, timeout) 1944 except (ConnectionException, socket.error) as exc: 1945 log.warning("Error trying to prepare all statements on host %s: %r", host, exc) 1946 except Exception: 1947 log.exception("Error trying to prepare all statements on host %s", host) 1948 finally: 1949 if connection: 1950 connection.close() 1951 1952 def add_prepared(self, query_id, prepared_statement): 1953 with self._prepared_statement_lock: 1954 self._prepared_statements[query_id] = prepared_statement 1955 1956 1957class Session(object): 1958 """ 1959 A collection of connection pools for each host in the cluster. 1960 Instances of this class should not be created directly, only 1961 using :meth:`.Cluster.connect()`. 1962 1963 Queries and statements can be executed through ``Session`` instances 1964 using the :meth:`~.Session.execute()` and :meth:`~.Session.execute_async()` 1965 methods. 1966 1967 Example usage:: 1968 1969 >>> session = cluster.connect() 1970 >>> session.set_keyspace("mykeyspace") 1971 >>> session.execute("SELECT * FROM mycf") 1972 1973 """ 1974 1975 cluster = None 1976 hosts = None 1977 keyspace = None 1978 is_shutdown = False 1979 1980 _row_factory = staticmethod(named_tuple_factory) 1981 @property 1982 def row_factory(self): 1983 """ 1984 The format to return row results in. By default, each 1985 returned row will be a named tuple. You can alternatively 1986 use any of the following: 1987 1988 - :func:`cassandra.query.tuple_factory` - return a result row as a tuple 1989 - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple 1990 - :func:`cassandra.query.dict_factory` - return a result row as a dict 1991 - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict 1992 1993 """ 1994 return self._row_factory 1995 1996 @row_factory.setter 1997 def row_factory(self, rf): 1998 self._validate_set_legacy_config('row_factory', rf) 1999 2000 _default_timeout = 10.0 2001 2002 @property 2003 def default_timeout(self): 2004 """ 2005 A default timeout, measured in seconds, for queries executed through 2006 :meth:`.execute()` or :meth:`.execute_async()`. This default may be 2007 overridden with the `timeout` parameter for either of those methods. 2008 2009 Setting this to :const:`None` will cause no timeouts to be set by default. 2010 2011 Please see :meth:`.ResponseFuture.result` for details on the scope and 2012 effect of this timeout. 2013 2014 .. versionadded:: 2.0.0 2015 """ 2016 return self._default_timeout 2017 2018 @default_timeout.setter 2019 def default_timeout(self, timeout): 2020 self._validate_set_legacy_config('default_timeout', timeout) 2021 2022 _default_consistency_level = ConsistencyLevel.LOCAL_ONE 2023 2024 @property 2025 def default_consistency_level(self): 2026 """ 2027 *Deprecated:* use execution profiles instead 2028 The default :class:`~ConsistencyLevel` for operations executed through 2029 this session. This default may be overridden by setting the 2030 :attr:`~.Statement.consistency_level` on individual statements. 2031 2032 .. versionadded:: 1.2.0 2033 2034 .. versionchanged:: 3.0.0 2035 2036 default changed from ONE to LOCAL_ONE 2037 """ 2038 return self._default_consistency_level 2039 2040 @default_consistency_level.setter 2041 def default_consistency_level(self, cl): 2042 """ 2043 *Deprecated:* use execution profiles instead 2044 """ 2045 warn("Setting the consistency level at the session level will be removed in 4.0. Consider using " 2046 "execution profiles and setting the desired consitency level to the EXEC_PROFILE_DEFAULT profile." 2047 , DeprecationWarning) 2048 self._validate_set_legacy_config('default_consistency_level', cl) 2049 2050 _default_serial_consistency_level = None 2051 2052 @property 2053 def default_serial_consistency_level(self): 2054 """ 2055 The default :class:`~ConsistencyLevel` for serial phase of conditional updates executed through 2056 this session. This default may be overridden by setting the 2057 :attr:`~.Statement.serial_consistency_level` on individual statements. 2058 2059 Only valid for ``protocol_version >= 2``. 2060 """ 2061 return self._default_serial_consistency_level 2062 2063 @default_serial_consistency_level.setter 2064 def default_serial_consistency_level(self, cl): 2065 if (cl is not None and 2066 not ConsistencyLevel.is_serial(cl)): 2067 raise ValueError("default_serial_consistency_level must be either " 2068 "ConsistencyLevel.SERIAL " 2069 "or ConsistencyLevel.LOCAL_SERIAL.") 2070 2071 self._validate_set_legacy_config('default_serial_consistency_level', cl) 2072 2073 max_trace_wait = 2.0 2074 """ 2075 The maximum amount of time (in seconds) the driver will wait for trace 2076 details to be populated server-side for a query before giving up. 2077 If the `trace` parameter for :meth:`~.execute()` or :meth:`~.execute_async()` 2078 is :const:`True`, the driver will repeatedly attempt to fetch trace 2079 details for the query (using exponential backoff) until this limit is 2080 hit. If the limit is passed, an error will be logged and the 2081 :attr:`.Statement.trace` will be left as :const:`None`. """ 2082 2083 default_fetch_size = 5000 2084 """ 2085 By default, this many rows will be fetched at a time. Setting 2086 this to :const:`None` will disable automatic paging for large query 2087 results. The fetch size can be also specified per-query through 2088 :attr:`.Statement.fetch_size`. 2089 2090 This only takes effect when protocol version 2 or higher is used. 2091 See :attr:`.Cluster.protocol_version` for details. 2092 2093 .. versionadded:: 2.0.0 2094 """ 2095 2096 use_client_timestamp = True 2097 """ 2098 When using protocol version 3 or higher, write timestamps may be supplied 2099 client-side at the protocol level. (Normally they are generated 2100 server-side by the coordinator node.) Note that timestamps specified 2101 within a CQL query will override this timestamp. 2102 2103 .. versionadded:: 2.1.0 2104 """ 2105 2106 timestamp_generator = None 2107 """ 2108 When :attr:`use_client_timestamp` is set, sessions call this object and use 2109 the result as the timestamp. (Note that timestamps specified within a CQL 2110 query will override this timestamp.) By default, a new 2111 :class:`~.MonotonicTimestampGenerator` is created for 2112 each :class:`Cluster` instance. 2113 2114 Applications can set this value for custom timestamp behavior. For 2115 example, an application could share a timestamp generator across 2116 :class:`Cluster` objects to guarantee that the application will use unique, 2117 increasing timestamps across clusters, or set it to to ``lambda: 2118 int(time.time() * 1e6)`` if losing records over clock inconsistencies is 2119 acceptable for the application. Custom :attr:`timestamp_generator` s should 2120 be callable, and calling them should return an integer representing microseconds 2121 since some point in time, typically UNIX epoch. 2122 2123 .. versionadded:: 3.8.0 2124 """ 2125 2126 encoder = None 2127 """ 2128 A :class:`~cassandra.encoder.Encoder` instance that will be used when 2129 formatting query parameters for non-prepared statements. This is not used 2130 for prepared statements (because prepared statements give the driver more 2131 information about what CQL types are expected, allowing it to accept a 2132 wider range of python types). 2133 2134 The encoder uses a mapping from python types to encoder methods (for 2135 specific CQL types). This mapping can be be modified by users as they see 2136 fit. Methods of :class:`~cassandra.encoder.Encoder` should be used for mapping 2137 values if possible, because they take precautions to avoid injections and 2138 properly sanitize data. 2139 2140 Example:: 2141 2142 cluster = Cluster() 2143 session = cluster.connect("mykeyspace") 2144 session.encoder.mapping[tuple] = session.encoder.cql_encode_tuple 2145 2146 session.execute("CREATE TABLE mytable (k int PRIMARY KEY, col tuple<int, ascii>)") 2147 session.execute("INSERT INTO mytable (k, col) VALUES (%s, %s)", [0, (123, 'abc')]) 2148 2149 .. versionadded:: 2.1.0 2150 """ 2151 2152 client_protocol_handler = ProtocolHandler 2153 """ 2154 Specifies a protocol handler that will be used for client-initiated requests (i.e. no 2155 internal driver requests). This can be used to override or extend features such as 2156 message or type ser/des. 2157 2158 The default pure python implementation is :class:`cassandra.protocol.ProtocolHandler`. 2159 2160 When compiled with Cython, there are also built-in faster alternatives. See :ref:`faster_deser` 2161 """ 2162 2163 _lock = None 2164 _pools = None 2165 _profile_manager = None 2166 _metrics = None 2167 _request_init_callbacks = None 2168 2169 def __init__(self, cluster, hosts, keyspace=None): 2170 self.cluster = cluster 2171 self.hosts = hosts 2172 self.keyspace = keyspace 2173 2174 self._lock = RLock() 2175 self._pools = {} 2176 self._profile_manager = cluster.profile_manager 2177 self._metrics = cluster.metrics 2178 self._request_init_callbacks = [] 2179 self._protocol_version = self.cluster.protocol_version 2180 2181 self.encoder = Encoder() 2182 2183 # create connection pools in parallel 2184 self._initial_connect_futures = set() 2185 for host in hosts: 2186 future = self.add_or_renew_pool(host, is_host_addition=False) 2187 if future: 2188 self._initial_connect_futures.add(future) 2189 2190 futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED) 2191 while futures.not_done and not any(f.result() for f in futures.done): 2192 futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED) 2193 2194 if not any(f.result() for f in self._initial_connect_futures): 2195 msg = "Unable to connect to any servers" 2196 if self.keyspace: 2197 msg += " using keyspace '%s'" % self.keyspace 2198 raise NoHostAvailable(msg, [h.address for h in hosts]) 2199 2200 def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, 2201 custom_payload=None, execution_profile=EXEC_PROFILE_DEFAULT, 2202 paging_state=None, host=None): 2203 """ 2204 Execute the given query and synchronously wait for the response. 2205 2206 If an error is encountered while executing the query, an Exception 2207 will be raised. 2208 2209 `query` may be a query string or an instance of :class:`cassandra.query.Statement`. 2210 2211 `parameters` may be a sequence or dict of parameters to bind. If a 2212 sequence is used, ``%s`` should be used the placeholder for each 2213 argument. If a dict is used, ``%(name)s`` style placeholders must 2214 be used. 2215 2216 `timeout` should specify a floating-point timeout (in seconds) after 2217 which an :exc:`.OperationTimedOut` exception will be raised if the query 2218 has not completed. If not set, the timeout defaults to 2219 :attr:`~.Session.default_timeout`. If set to :const:`None`, there is 2220 no timeout. Please see :meth:`.ResponseFuture.result` for details on 2221 the scope and effect of this timeout. 2222 2223 If `trace` is set to :const:`True`, the query will be sent with tracing enabled. 2224 The trace details can be obtained using the returned :class:`.ResultSet` object. 2225 2226 `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. 2227 If `query` is a Statement with its own custom_payload. The message payload 2228 will be a union of the two, with the values specified here taking precedence. 2229 2230 `execution_profile` is the execution profile to use for this request. It can be a key to a profile configured 2231 via :meth:`Cluster.add_execution_profile` or an instance (from :meth:`Session.execution_profile_clone_update`, 2232 for example 2233 2234 `paging_state` is an optional paging state, reused from a previous :class:`ResultSet`. 2235 2236 `host` is the :class:`pool.Host` that should handle the query. Using this is discouraged except in a few 2237 cases, e.g., querying node-local tables and applying schema changes. 2238 """ 2239 return self.execute_async(query, parameters, trace, custom_payload, 2240 timeout, execution_profile, paging_state, host).result() 2241 2242 def execute_async(self, query, parameters=None, trace=False, custom_payload=None, 2243 timeout=_NOT_SET, execution_profile=EXEC_PROFILE_DEFAULT, 2244 paging_state=None, host=None): 2245 """ 2246 Execute the given query and return a :class:`~.ResponseFuture` object 2247 which callbacks may be attached to for asynchronous response 2248 delivery. You may also call :meth:`~.ResponseFuture.result()` 2249 on the :class:`.ResponseFuture` to synchronously block for results at 2250 any time. 2251 2252 See :meth:`Session.execute` for parameter definitions. 2253 2254 Example usage:: 2255 2256 >>> session = cluster.connect() 2257 >>> future = session.execute_async("SELECT * FROM mycf") 2258 2259 >>> def log_results(results): 2260 ... for row in results: 2261 ... log.info("Results: %s", row) 2262 2263 >>> def log_error(exc): 2264 >>> log.error("Operation failed: %s", exc) 2265 2266 >>> future.add_callbacks(log_results, log_error) 2267 2268 Async execution with blocking wait for results:: 2269 2270 >>> future = session.execute_async("SELECT * FROM mycf") 2271 >>> # do other stuff... 2272 2273 >>> try: 2274 ... results = future.result() 2275 ... except Exception: 2276 ... log.exception("Operation failed:") 2277 2278 """ 2279 future = self._create_response_future( 2280 query, parameters, trace, custom_payload, timeout, 2281 execution_profile, paging_state, host) 2282 future._protocol_handler = self.client_protocol_handler 2283 self._on_request(future) 2284 future.send_request() 2285 return future 2286 2287 def _create_response_future(self, query, parameters, trace, custom_payload, 2288 timeout, execution_profile=EXEC_PROFILE_DEFAULT, 2289 paging_state=None, host=None): 2290 """ Returns the ResponseFuture before calling send_request() on it """ 2291 2292 prepared_statement = None 2293 2294 if isinstance(query, six.string_types): 2295 query = SimpleStatement(query) 2296 elif isinstance(query, PreparedStatement): 2297 query = query.bind(parameters) 2298 2299 if self.cluster._config_mode == _ConfigMode.LEGACY: 2300 if execution_profile is not EXEC_PROFILE_DEFAULT: 2301 raise ValueError("Cannot specify execution_profile while using legacy parameters.") 2302 2303 if timeout is _NOT_SET: 2304 timeout = self.default_timeout 2305 2306 cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level 2307 serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else self.default_serial_consistency_level 2308 2309 retry_policy = query.retry_policy or self.cluster.default_retry_policy 2310 row_factory = self.row_factory 2311 load_balancing_policy = self.cluster.load_balancing_policy 2312 spec_exec_policy = None 2313 else: 2314 execution_profile = self._maybe_get_execution_profile(execution_profile) 2315 2316 if timeout is _NOT_SET: 2317 timeout = execution_profile.request_timeout 2318 2319 cl = query.consistency_level if query.consistency_level is not None else execution_profile.consistency_level 2320 serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else execution_profile.serial_consistency_level 2321 2322 retry_policy = query.retry_policy or execution_profile.retry_policy 2323 row_factory = execution_profile.row_factory 2324 load_balancing_policy = execution_profile.load_balancing_policy 2325 spec_exec_policy = execution_profile.speculative_execution_policy 2326 2327 fetch_size = query.fetch_size 2328 if fetch_size is FETCH_SIZE_UNSET and self._protocol_version >= 2: 2329 fetch_size = self.default_fetch_size 2330 elif self._protocol_version == 1: 2331 fetch_size = None 2332 2333 start_time = time.time() 2334 if self._protocol_version >= 3 and self.use_client_timestamp: 2335 timestamp = self.cluster.timestamp_generator() 2336 else: 2337 timestamp = None 2338 2339 if isinstance(query, SimpleStatement): 2340 query_string = query.query_string 2341 statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None 2342 if parameters: 2343 query_string = bind_params(query_string, parameters, self.encoder) 2344 message = QueryMessage( 2345 query_string, cl, serial_cl, 2346 fetch_size, timestamp=timestamp, 2347 keyspace=statement_keyspace) 2348 elif isinstance(query, BoundStatement): 2349 prepared_statement = query.prepared_statement 2350 message = ExecuteMessage( 2351 prepared_statement.query_id, query.values, cl, 2352 serial_cl, fetch_size, 2353 timestamp=timestamp, skip_meta=bool(prepared_statement.result_metadata), 2354 result_metadata_id=prepared_statement.result_metadata_id) 2355 elif isinstance(query, BatchStatement): 2356 if self._protocol_version < 2: 2357 raise UnsupportedOperation( 2358 "BatchStatement execution is only supported with protocol version " 2359 "2 or higher (supported in Cassandra 2.0 and higher). Consider " 2360 "setting Cluster.protocol_version to 2 to support this operation.") 2361 statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None 2362 message = BatchMessage( 2363 query.batch_type, query._statements_and_parameters, cl, 2364 serial_cl, timestamp, statement_keyspace) 2365 2366 message.tracing = trace 2367 2368 message.update_custom_payload(query.custom_payload) 2369 message.update_custom_payload(custom_payload) 2370 message.allow_beta_protocol_version = self.cluster.allow_beta_protocol_version 2371 message.paging_state = paging_state 2372 2373 spec_exec_plan = spec_exec_policy.new_plan(query.keyspace or self.keyspace, query) if query.is_idempotent and spec_exec_policy else None 2374 return ResponseFuture( 2375 self, message, query, timeout, metrics=self._metrics, 2376 prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory, 2377 load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan, 2378 host=host) 2379 2380 def _execution_profile_to_string(self, name): 2381 if name is EXEC_PROFILE_DEFAULT: 2382 return 'EXEC_PROFILE_DEFAULT' 2383 return '"%s"' % (name,) 2384 2385 def get_execution_profile(self, name): 2386 """ 2387 Returns the execution profile associated with the provided ``name``. 2388 2389 :param name: The name (or key) of the execution profile. 2390 """ 2391 profiles = self.cluster.profile_manager.profiles 2392 try: 2393 return profiles[name] 2394 except KeyError: 2395 eps = [self._execution_profile_to_string(ep) for ep in profiles.keys()] 2396 raise ValueError("Invalid execution_profile: %s; valid profiles are: %s." % ( 2397 self._execution_profile_to_string(name), ', '.join(eps))) 2398 2399 def _maybe_get_execution_profile(self, ep): 2400 return ep if isinstance(ep, ExecutionProfile) else self.get_execution_profile(ep) 2401 2402 def execution_profile_clone_update(self, ep, **kwargs): 2403 """ 2404 Returns a clone of the ``ep`` profile. ``kwargs`` can be specified to update attributes 2405 of the returned profile. 2406 2407 This is a shallow clone, so any objects referenced by the profile are shared. This means Load Balancing Policy 2408 is maintained by inclusion in the active profiles. It also means updating any other rich objects will be seen 2409 by the active profile. In cases where this is not desirable, be sure to replace the instance instead of manipulating 2410 the shared object. 2411 """ 2412 clone = copy(self._maybe_get_execution_profile(ep)) 2413 for attr, value in kwargs.items(): 2414 setattr(clone, attr, value) 2415 return clone 2416 2417 def add_request_init_listener(self, fn, *args, **kwargs): 2418 """ 2419 Adds a callback with arguments to be called when any request is created. 2420 2421 It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created, 2422 and before the request is sent\*. This can be used to create extensions by adding result callbacks to the 2423 response future. 2424 2425 \* where `response_future` is the :class:`.ResponseFuture` for the request. 2426 2427 Note that the init callback is done on the client thread creating the request, so you may need to consider 2428 synchronization if you have multiple threads. Any callbacks added to the response future will be executed 2429 on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in 2430 :meth:`.ResponseFuture.add_callbacks`. 2431 2432 See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the 2433 source tree for an example. 2434 """ 2435 self._request_init_callbacks.append((fn, args, kwargs)) 2436 2437 def remove_request_init_listener(self, fn, *args, **kwargs): 2438 """ 2439 Removes a callback and arguments from the list. 2440 2441 See :meth:`.Session.add_request_init_listener`. 2442 """ 2443 self._request_init_callbacks.remove((fn, args, kwargs)) 2444 2445 def _on_request(self, response_future): 2446 for fn, args, kwargs in self._request_init_callbacks: 2447 fn(response_future, *args, **kwargs) 2448 2449 def prepare(self, query, custom_payload=None, keyspace=None): 2450 """ 2451 Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement` 2452 instance which can be used as follows:: 2453 2454 >>> session = cluster.connect("mykeyspace") 2455 >>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)" 2456 >>> prepared = session.prepare(query) 2457 >>> session.execute(prepared, (user.id, user.name, user.age)) 2458 2459 Or you may bind values to the prepared statement ahead of time:: 2460 2461 >>> prepared = session.prepare(query) 2462 >>> bound_stmt = prepared.bind((user.id, user.name, user.age)) 2463 >>> session.execute(bound_stmt) 2464 2465 Of course, prepared statements may (and should) be reused:: 2466 2467 >>> prepared = session.prepare(query) 2468 >>> for user in users: 2469 ... bound = prepared.bind((user.id, user.name, user.age)) 2470 ... session.execute(bound) 2471 2472 Alternatively, if :attr:`~.Cluster.protocol_version` is 5 or higher 2473 (requires Cassandra 4.0+), the keyspace can be specified as a 2474 parameter. This will allow you to avoid specifying the keyspace in the 2475 query without specifying a keyspace in :meth:`~.Cluster.connect`. It 2476 even will let you prepare and use statements against a keyspace other 2477 than the one originally specified on connection: 2478 2479 >>> analyticskeyspace_prepared = session.prepare( 2480 ... "INSERT INTO user_activity id, last_activity VALUES (?, ?)", 2481 ... keyspace="analyticskeyspace") # note the different keyspace 2482 2483 **Important**: PreparedStatements should be prepared only once. 2484 Preparing the same query more than once will likely affect performance. 2485 2486 `custom_payload` is a key value map to be passed along with the prepare 2487 message. See :ref:`custom_payload`. 2488 """ 2489 message = PrepareMessage(query=query, keyspace=keyspace) 2490 future = ResponseFuture(self, message, query=None, timeout=self.default_timeout) 2491 try: 2492 future.send_request() 2493 query_id, bind_metadata, pk_indexes, result_metadata, result_metadata_id = future.result() 2494 except Exception: 2495 log.exception("Error preparing query:") 2496 raise 2497 2498 prepared_keyspace = keyspace if keyspace else None 2499 prepared_statement = PreparedStatement.from_message( 2500 query_id, bind_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace, 2501 self._protocol_version, result_metadata, result_metadata_id) 2502 prepared_statement.custom_payload = future.custom_payload 2503 2504 self.cluster.add_prepared(query_id, prepared_statement) 2505 2506 if self.cluster.prepare_on_all_hosts: 2507 host = future._current_host 2508 try: 2509 self.prepare_on_all_hosts(prepared_statement.query_string, host, prepared_keyspace) 2510 except Exception: 2511 log.exception("Error preparing query on all hosts:") 2512 2513 return prepared_statement 2514 2515 def prepare_on_all_hosts(self, query, excluded_host, keyspace=None): 2516 """ 2517 Prepare the given query on all hosts, excluding ``excluded_host``. 2518 Intended for internal use only. 2519 """ 2520 futures = [] 2521 for host in tuple(self._pools.keys()): 2522 if host != excluded_host and host.is_up: 2523 future = ResponseFuture(self, PrepareMessage(query=query, keyspace=keyspace), 2524 None, self.default_timeout) 2525 2526 # we don't care about errors preparing against specific hosts, 2527 # since we can always prepare them as needed when the prepared 2528 # statement is used. Just log errors and continue on. 2529 try: 2530 request_id = future._query(host) 2531 except Exception: 2532 log.exception("Error preparing query for host %s:", host) 2533 continue 2534 2535 if request_id is None: 2536 # the error has already been logged by ResponsFuture 2537 log.debug("Failed to prepare query for host %s: %r", 2538 host, future._errors.get(host)) 2539 continue 2540 2541 futures.append((host, future)) 2542 2543 for host, future in futures: 2544 try: 2545 future.result() 2546 except Exception: 2547 log.exception("Error preparing query for host %s:", host) 2548 2549 def shutdown(self): 2550 """ 2551 Close all connections. ``Session`` instances should not be used 2552 for any purpose after being shutdown. 2553 """ 2554 with self._lock: 2555 if self.is_shutdown: 2556 return 2557 else: 2558 self.is_shutdown = True 2559 2560 # PYTHON-673. If shutdown was called shortly after session init, avoid 2561 # a race by cancelling any initial connection attempts haven't started, 2562 # then blocking on any that have. 2563 for future in self._initial_connect_futures: 2564 future.cancel() 2565 wait_futures(self._initial_connect_futures) 2566 2567 for pool in tuple(self._pools.values()): 2568 pool.shutdown() 2569 2570 def __enter__(self): 2571 return self 2572 2573 def __exit__(self, *args): 2574 self.shutdown() 2575 2576 def __del__(self): 2577 try: 2578 # Ensure all connections are closed, in case the Session object is deleted by the GC 2579 self.shutdown() 2580 except: 2581 # Ignore all errors. Shutdown errors can be caught by the user 2582 # when cluster.shutdown() is called explicitly. 2583 pass 2584 2585 def add_or_renew_pool(self, host, is_host_addition): 2586 """ 2587 For internal use only. 2588 """ 2589 distance = self._profile_manager.distance(host) 2590 if distance == HostDistance.IGNORED: 2591 return None 2592 2593 def run_add_or_renew_pool(): 2594 try: 2595 if self._protocol_version >= 3: 2596 new_pool = HostConnection(host, distance, self) 2597 else: 2598 # TODO remove host pool again ??? 2599 new_pool = HostConnectionPool(host, distance, self) 2600 except AuthenticationFailed as auth_exc: 2601 conn_exc = ConnectionException(str(auth_exc), host=host) 2602 self.cluster.signal_connection_failure(host, conn_exc, is_host_addition) 2603 return False 2604 except Exception as conn_exc: 2605 log.warning("Failed to create connection pool for new host %s:", 2606 host, exc_info=conn_exc) 2607 # the host itself will still be marked down, so we need to pass 2608 # a special flag to make sure the reconnector is created 2609 self.cluster.signal_connection_failure( 2610 host, conn_exc, is_host_addition, expect_host_to_be_down=True) 2611 return False 2612 2613 previous = self._pools.get(host) 2614 with self._lock: 2615 while new_pool._keyspace != self.keyspace: 2616 self._lock.release() 2617 set_keyspace_event = Event() 2618 errors_returned = [] 2619 2620 def callback(pool, errors): 2621 errors_returned.extend(errors) 2622 set_keyspace_event.set() 2623 2624 new_pool._set_keyspace_for_all_conns(self.keyspace, callback) 2625 set_keyspace_event.wait(self.cluster.connect_timeout) 2626 if not set_keyspace_event.is_set() or errors_returned: 2627 log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned) 2628 self.cluster.on_down(host, is_host_addition) 2629 new_pool.shutdown() 2630 self._lock.acquire() 2631 return False 2632 self._lock.acquire() 2633 self._pools[host] = new_pool 2634 2635 log.debug("Added pool for host %s to session", host) 2636 if previous: 2637 previous.shutdown() 2638 2639 return True 2640 2641 return self.submit(run_add_or_renew_pool) 2642 2643 def remove_pool(self, host): 2644 pool = self._pools.pop(host, None) 2645 if pool: 2646 log.debug("Removed connection pool for %r", host) 2647 return self.submit(pool.shutdown) 2648 else: 2649 return None 2650 2651 def update_created_pools(self): 2652 """ 2653 When the set of live nodes change, the loadbalancer will change its 2654 mind on host distances. It might change it on the node that came/left 2655 but also on other nodes (for instance, if a node dies, another 2656 previously ignored node may be now considered). 2657 2658 This method ensures that all hosts for which a pool should exist 2659 have one, and hosts that shouldn't don't. 2660 2661 For internal use only. 2662 """ 2663 futures = set() 2664 for host in self.cluster.metadata.all_hosts(): 2665 distance = self._profile_manager.distance(host) 2666 pool = self._pools.get(host) 2667 future = None 2668 if not pool or pool.is_shutdown: 2669 # we don't eagerly set is_up on previously ignored hosts. None is included here 2670 # to allow us to attempt connections to hosts that have gone from ignored to something 2671 # else. 2672 if distance != HostDistance.IGNORED and host.is_up in (True, None): 2673 future = self.add_or_renew_pool(host, False) 2674 elif distance != pool.host_distance: 2675 # the distance has changed 2676 if distance == HostDistance.IGNORED: 2677 future = self.remove_pool(host) 2678 else: 2679 pool.host_distance = distance 2680 if future: 2681 futures.add(future) 2682 return futures 2683 2684 def on_down(self, host): 2685 """ 2686 Called by the parent Cluster instance when a node is marked down. 2687 Only intended for internal use. 2688 """ 2689 future = self.remove_pool(host) 2690 if future: 2691 future.add_done_callback(lambda f: self.update_created_pools()) 2692 2693 def on_remove(self, host): 2694 """ Internal """ 2695 self.on_down(host) 2696 2697 def set_keyspace(self, keyspace): 2698 """ 2699 Set the default keyspace for all queries made through this Session. 2700 This operation blocks until complete. 2701 """ 2702 self.execute('USE %s' % (protect_name(keyspace),)) 2703 2704 def _set_keyspace_for_all_pools(self, keyspace, callback): 2705 """ 2706 Asynchronously sets the keyspace on all pools. When all 2707 pools have set all of their connections, `callback` will be 2708 called with a dictionary of all errors that occurred, keyed 2709 by the `Host` that they occurred against. 2710 """ 2711 with self._lock: 2712 self.keyspace = keyspace 2713 remaining_callbacks = set(self._pools.values()) 2714 errors = {} 2715 2716 if not remaining_callbacks: 2717 callback(errors) 2718 return 2719 2720 def pool_finished_setting_keyspace(pool, host_errors): 2721 remaining_callbacks.remove(pool) 2722 if host_errors: 2723 errors[pool.host] = host_errors 2724 2725 if not remaining_callbacks: 2726 callback(host_errors) 2727 2728 for pool in tuple(self._pools.values()): 2729 pool._set_keyspace_for_all_conns(keyspace, pool_finished_setting_keyspace) 2730 2731 def user_type_registered(self, keyspace, user_type, klass): 2732 """ 2733 Called by the parent Cluster instance when the user registers a new 2734 mapping from a user-defined type to a class. Intended for internal 2735 use only. 2736 """ 2737 try: 2738 ks_meta = self.cluster.metadata.keyspaces[keyspace] 2739 except KeyError: 2740 raise UserTypeDoesNotExist( 2741 'Keyspace %s does not exist or has not been discovered by the driver' % (keyspace,)) 2742 2743 try: 2744 type_meta = ks_meta.user_types[user_type] 2745 except KeyError: 2746 raise UserTypeDoesNotExist( 2747 'User type %s does not exist in keyspace %s' % (user_type, keyspace)) 2748 2749 field_names = type_meta.field_names 2750 if six.PY2: 2751 # go from unicode to string to avoid decode errors from implicit 2752 # decode when formatting non-ascii values 2753 field_names = [fn.encode('utf-8') for fn in field_names] 2754 2755 def encode(val): 2756 return '{ %s }' % ' , '.join('%s : %s' % ( 2757 field_name, 2758 self.encoder.cql_encode_all_types(getattr(val, field_name, None)) 2759 ) for field_name in field_names) 2760 2761 self.encoder.mapping[klass] = encode 2762 2763 def submit(self, fn, *args, **kwargs): 2764 """ Internal """ 2765 if not self.is_shutdown: 2766 return self.cluster.executor.submit(fn, *args, **kwargs) 2767 2768 def get_pool_state(self): 2769 return dict((host, pool.get_state()) for host, pool in tuple(self._pools.items())) 2770 2771 def get_pools(self): 2772 return self._pools.values() 2773 2774 def _validate_set_legacy_config(self, attr_name, value): 2775 if self.cluster._config_mode == _ConfigMode.PROFILES: 2776 raise ValueError("Cannot set Session.%s while using Configuration Profiles. Set this in a profile instead." % (attr_name,)) 2777 setattr(self, '_' + attr_name, value) 2778 self.cluster._config_mode = _ConfigMode.LEGACY 2779 2780 2781class UserTypeDoesNotExist(Exception): 2782 """ 2783 An attempt was made to use a user-defined type that does not exist. 2784 2785 .. versionadded:: 2.1.0 2786 """ 2787 pass 2788 2789 2790class _ControlReconnectionHandler(_ReconnectionHandler): 2791 """ 2792 Internal 2793 """ 2794 2795 def __init__(self, control_connection, *args, **kwargs): 2796 _ReconnectionHandler.__init__(self, *args, **kwargs) 2797 self.control_connection = weakref.proxy(control_connection) 2798 2799 def try_reconnect(self): 2800 return self.control_connection._reconnect_internal() 2801 2802 def on_reconnection(self, connection): 2803 self.control_connection._set_new_connection(connection) 2804 2805 def on_exception(self, exc, next_delay): 2806 # TODO only overridden to add logging, so add logging 2807 if isinstance(exc, AuthenticationFailed): 2808 return False 2809 else: 2810 log.debug("Error trying to reconnect control connection: %r", exc) 2811 return True 2812 2813 2814def _watch_callback(obj_weakref, method_name, *args, **kwargs): 2815 """ 2816 A callback handler for the ControlConnection that tolerates 2817 weak references. 2818 """ 2819 obj = obj_weakref() 2820 if obj is None: 2821 return 2822 getattr(obj, method_name)(*args, **kwargs) 2823 2824 2825def _clear_watcher(conn, expiring_weakref): 2826 """ 2827 Called when the ControlConnection object is about to be finalized. 2828 This clears watchers on the underlying Connection object. 2829 """ 2830 try: 2831 conn.control_conn_disposed() 2832 except ReferenceError: 2833 pass 2834 2835 2836class ControlConnection(object): 2837 """ 2838 Internal 2839 """ 2840 2841 _SELECT_PEERS = "SELECT * FROM system.peers" 2842 _SELECT_PEERS_NO_TOKENS = "SELECT host_id, peer, data_center, rack, rpc_address, release_version, schema_version FROM system.peers" 2843 _SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'" 2844 _SELECT_LOCAL_NO_TOKENS = "SELECT host_id, cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'" 2845 # Used only when token_metadata_enabled is set to False 2846 _SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS = "SELECT rpc_address FROM system.local WHERE key='local'" 2847 2848 _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers" 2849 _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'" 2850 2851 _is_shutdown = False 2852 _timeout = None 2853 _protocol_version = None 2854 2855 _schema_event_refresh_window = None 2856 _topology_event_refresh_window = None 2857 _status_event_refresh_window = None 2858 2859 _schema_meta_enabled = True 2860 _token_meta_enabled = True 2861 2862 # for testing purposes 2863 _time = time 2864 2865 def __init__(self, cluster, timeout, 2866 schema_event_refresh_window, 2867 topology_event_refresh_window, 2868 status_event_refresh_window, 2869 schema_meta_enabled=True, 2870 token_meta_enabled=True): 2871 # use a weak reference to allow the Cluster instance to be GC'ed (and 2872 # shutdown) since implementing __del__ disables the cycle detector 2873 self._cluster = weakref.proxy(cluster) 2874 self._connection = None 2875 self._timeout = timeout 2876 2877 self._schema_event_refresh_window = schema_event_refresh_window 2878 self._topology_event_refresh_window = topology_event_refresh_window 2879 self._status_event_refresh_window = status_event_refresh_window 2880 self._schema_meta_enabled = schema_meta_enabled 2881 self._token_meta_enabled = token_meta_enabled 2882 2883 self._lock = RLock() 2884 self._schema_agreement_lock = Lock() 2885 2886 self._reconnection_handler = None 2887 self._reconnection_lock = RLock() 2888 2889 self._event_schedule_times = {} 2890 2891 def connect(self): 2892 if self._is_shutdown: 2893 return 2894 2895 self._protocol_version = self._cluster.protocol_version 2896 self._set_new_connection(self._reconnect_internal()) 2897 2898 def _set_new_connection(self, conn): 2899 """ 2900 Replace existing connection (if there is one) and close it. 2901 """ 2902 with self._lock: 2903 old = self._connection 2904 self._connection = conn 2905 2906 if old: 2907 log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn) 2908 old.close() 2909 2910 def _reconnect_internal(self): 2911 """ 2912 Tries to connect to each host in the query plan until one succeeds 2913 or every attempt fails. If successful, a new Connection will be 2914 returned. Otherwise, :exc:`NoHostAvailable` will be raised 2915 with an "errors" arg that is a dict mapping host addresses 2916 to the exception that was raised when an attempt was made to open 2917 a connection to that host. 2918 """ 2919 errors = {} 2920 lbp = ( 2921 self._cluster.load_balancing_policy 2922 if self._cluster._config_mode == _ConfigMode.LEGACY else 2923 self._cluster._default_load_balancing_policy 2924 ) 2925 2926 for host in lbp.make_query_plan(): 2927 try: 2928 return self._try_connect(host) 2929 except ConnectionException as exc: 2930 errors[str(host.endpoint)] = exc 2931 log.warning("[control connection] Error connecting to %s:", host, exc_info=True) 2932 self._cluster.signal_connection_failure(host, exc, is_host_addition=False) 2933 except Exception as exc: 2934 errors[str(host.endpoint)] = exc 2935 log.warning("[control connection] Error connecting to %s:", host, exc_info=True) 2936 if self._is_shutdown: 2937 raise DriverException("[control connection] Reconnection in progress during shutdown") 2938 2939 raise NoHostAvailable("Unable to connect to any servers", errors) 2940 2941 def _try_connect(self, host): 2942 """ 2943 Creates a new Connection, registers for pushed events, and refreshes 2944 node/token and schema metadata. 2945 """ 2946 log.debug("[control connection] Opening new connection to %s", host) 2947 2948 while True: 2949 try: 2950 connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True) 2951 if self._is_shutdown: 2952 connection.close() 2953 raise DriverException("Reconnecting during shutdown") 2954 break 2955 except ProtocolVersionUnsupported as e: 2956 self._cluster.protocol_downgrade(host.endpoint, e.startup_version) 2957 2958 log.debug("[control connection] Established new connection %r, " 2959 "registering watchers and refreshing schema and topology", 2960 connection) 2961 2962 # use weak references in both directions 2963 # _clear_watcher will be called when this ControlConnection is about to be finalized 2964 # _watch_callback will get the actual callback from the Connection and relay it to 2965 # this object (after a dereferencing a weakref) 2966 self_weakref = weakref.ref(self, partial(_clear_watcher, weakref.proxy(connection))) 2967 try: 2968 connection.register_watchers({ 2969 "TOPOLOGY_CHANGE": partial(_watch_callback, self_weakref, '_handle_topology_change'), 2970 "STATUS_CHANGE": partial(_watch_callback, self_weakref, '_handle_status_change'), 2971 "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change') 2972 }, register_timeout=self._timeout) 2973 2974 sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS 2975 sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS 2976 peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE) 2977 local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE) 2978 shared_results = connection.wait_for_responses( 2979 peers_query, local_query, timeout=self._timeout) 2980 2981 self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results) 2982 self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1) 2983 except Exception: 2984 connection.close() 2985 raise 2986 2987 return connection 2988 2989 def reconnect(self): 2990 if self._is_shutdown: 2991 return 2992 2993 self._submit(self._reconnect) 2994 2995 def _reconnect(self): 2996 log.debug("[control connection] Attempting to reconnect") 2997 try: 2998 self._set_new_connection(self._reconnect_internal()) 2999 except NoHostAvailable: 3000 # make a retry schedule (which includes backoff) 3001 schedule = self._cluster.reconnection_policy.new_schedule() 3002 3003 with self._reconnection_lock: 3004 3005 # cancel existing reconnection attempts 3006 if self._reconnection_handler: 3007 self._reconnection_handler.cancel() 3008 3009 # when a connection is successfully made, _set_new_connection 3010 # will be called with the new connection and then our 3011 # _reconnection_handler will be cleared out 3012 self._reconnection_handler = _ControlReconnectionHandler( 3013 self, self._cluster.scheduler, schedule, 3014 self._get_and_set_reconnection_handler, 3015 new_handler=None) 3016 self._reconnection_handler.start() 3017 except Exception: 3018 log.debug("[control connection] error reconnecting", exc_info=True) 3019 raise 3020 3021 def _get_and_set_reconnection_handler(self, new_handler): 3022 """ 3023 Called by the _ControlReconnectionHandler when a new connection 3024 is successfully created. Clears out the _reconnection_handler on 3025 this ControlConnection. 3026 """ 3027 with self._reconnection_lock: 3028 old = self._reconnection_handler 3029 self._reconnection_handler = new_handler 3030 return old 3031 3032 def _submit(self, *args, **kwargs): 3033 try: 3034 if not self._cluster.is_shutdown: 3035 return self._cluster.executor.submit(*args, **kwargs) 3036 except ReferenceError: 3037 pass 3038 return None 3039 3040 def shutdown(self): 3041 # stop trying to reconnect (if we are) 3042 with self._reconnection_lock: 3043 if self._reconnection_handler: 3044 self._reconnection_handler.cancel() 3045 3046 with self._lock: 3047 if self._is_shutdown: 3048 return 3049 else: 3050 self._is_shutdown = True 3051 3052 log.debug("Shutting down control connection") 3053 if self._connection: 3054 self._connection.close() 3055 self._connection = None 3056 3057 def refresh_schema(self, force=False, **kwargs): 3058 try: 3059 if self._connection: 3060 return self._refresh_schema(self._connection, force=force, **kwargs) 3061 except ReferenceError: 3062 pass # our weak reference to the Cluster is no good 3063 except Exception: 3064 log.debug("[control connection] Error refreshing schema", exc_info=True) 3065 self._signal_error() 3066 return False 3067 3068 def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs): 3069 if self._cluster.is_shutdown: 3070 return False 3071 3072 agreed = self.wait_for_schema_agreement(connection, 3073 preloaded_results=preloaded_results, 3074 wait_time=schema_agreement_wait) 3075 3076 if not self._schema_meta_enabled and not force: 3077 log.debug("[control connection] Skipping schema refresh because schema metadata is disabled") 3078 return False 3079 3080 if not agreed: 3081 log.debug("Skipping schema refresh due to lack of schema agreement") 3082 return False 3083 3084 self._cluster.metadata.refresh(connection, self._timeout, **kwargs) 3085 3086 return True 3087 3088 def refresh_node_list_and_token_map(self, force_token_rebuild=False): 3089 try: 3090 if self._connection: 3091 self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild) 3092 return True 3093 except ReferenceError: 3094 pass # our weak reference to the Cluster is no good 3095 except Exception: 3096 log.debug("[control connection] Error refreshing node list and token map", exc_info=True) 3097 self._signal_error() 3098 return False 3099 3100 def _refresh_node_list_and_token_map(self, connection, preloaded_results=None, 3101 force_token_rebuild=False): 3102 3103 if preloaded_results: 3104 log.debug("[control connection] Refreshing node list and token map using preloaded results") 3105 peers_result = preloaded_results[0] 3106 local_result = preloaded_results[1] 3107 else: 3108 cl = ConsistencyLevel.ONE 3109 if not self._token_meta_enabled: 3110 log.debug("[control connection] Refreshing node list without token map") 3111 sel_peers = self._SELECT_PEERS_NO_TOKENS 3112 sel_local = self._SELECT_LOCAL_NO_TOKENS 3113 else: 3114 log.debug("[control connection] Refreshing node list and token map") 3115 sel_peers = self._SELECT_PEERS 3116 sel_local = self._SELECT_LOCAL 3117 peers_query = QueryMessage(query=sel_peers, consistency_level=cl) 3118 local_query = QueryMessage(query=sel_local, consistency_level=cl) 3119 peers_result, local_result = connection.wait_for_responses( 3120 peers_query, local_query, timeout=self._timeout) 3121 3122 peers_result = dict_factory(*peers_result.results) 3123 3124 partitioner = None 3125 token_map = {} 3126 3127 found_hosts = set() 3128 if local_result.results: 3129 found_hosts.add(connection.endpoint) 3130 local_rows = dict_factory(*(local_result.results)) 3131 local_row = local_rows[0] 3132 cluster_name = local_row["cluster_name"] 3133 self._cluster.metadata.cluster_name = cluster_name 3134 3135 partitioner = local_row.get("partitioner") 3136 tokens = local_row.get("tokens") 3137 3138 host = self._cluster.metadata.get_host(connection.endpoint) 3139 if host: 3140 datacenter = local_row.get("data_center") 3141 rack = local_row.get("rack") 3142 self._update_location_info(host, datacenter, rack) 3143 host.host_id = local_row.get("host_id") 3144 host.listen_address = local_row.get("listen_address") 3145 host.broadcast_address = local_row.get("broadcast_address") 3146 3147 host.broadcast_rpc_address = self._address_from_row(local_row) 3148 if host.broadcast_rpc_address is None: 3149 if self._token_meta_enabled: 3150 # local rpc_address is not available, use the connection endpoint 3151 host.broadcast_rpc_address = connection.endpoint.address 3152 else: 3153 # local rpc_address has not been queried yet, try to fetch it 3154 # separately, which might fail because C* < 2.1.6 doesn't have rpc_address 3155 # in system.local. See CASSANDRA-9436. 3156 local_rpc_address_query = QueryMessage(query=self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS, 3157 consistency_level=ConsistencyLevel.ONE) 3158 success, local_rpc_address_result = connection.wait_for_response( 3159 local_rpc_address_query, timeout=self._timeout, fail_on_error=False) 3160 if success: 3161 row = dict_factory(*local_rpc_address_result.results) 3162 host.broadcast_rpc_address = row[0]['rpc_address'] 3163 else: 3164 host.broadcast_rpc_address = connection.endpoint.address 3165 3166 host.release_version = local_row.get("release_version") 3167 host.dse_version = local_row.get("dse_version") 3168 host.dse_workload = local_row.get("workload") 3169 3170 if partitioner and tokens: 3171 token_map[host] = tokens 3172 3173 # Check metadata.partitioner to see if we haven't built anything yet. If 3174 # every node in the cluster was in the contact points, we won't discover 3175 # any new nodes, so we need this additional check. (See PYTHON-90) 3176 should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None 3177 for row in peers_result: 3178 endpoint = self._cluster.endpoint_factory.create(row) 3179 3180 tokens = row.get("tokens", None) 3181 if 'tokens' in row and not tokens: # it was selected, but empty 3182 log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (endpoint, connection.endpoint)) 3183 continue 3184 if endpoint in found_hosts: 3185 log.warning("Found multiple hosts with the same endpoint (%s). Excluding peer %s", endpoint, row.get("peer")) 3186 continue 3187 3188 found_hosts.add(endpoint) 3189 3190 host = self._cluster.metadata.get_host(endpoint) 3191 datacenter = row.get("data_center") 3192 rack = row.get("rack") 3193 if host is None: 3194 log.debug("[control connection] Found new host to connect to: %s", endpoint) 3195 host, _ = self._cluster.add_host(endpoint, datacenter, rack, signal=True, refresh_nodes=False) 3196 should_rebuild_token_map = True 3197 else: 3198 should_rebuild_token_map |= self._update_location_info(host, datacenter, rack) 3199 3200 host.host_id = row.get("host_id") 3201 host.broadcast_address = row.get("peer") 3202 host.broadcast_rpc_address = self._address_from_row(row) 3203 host.release_version = row.get("release_version") 3204 host.dse_version = row.get("dse_version") 3205 host.dse_workload = row.get("workload") 3206 3207 if partitioner and tokens: 3208 token_map[host] = tokens 3209 3210 for old_host in self._cluster.metadata.all_hosts(): 3211 if old_host.endpoint.address != connection.endpoint and old_host.endpoint not in found_hosts: 3212 should_rebuild_token_map = True 3213 log.debug("[control connection] Removing host not found in peers metadata: %r", old_host) 3214 self._cluster.remove_host(old_host) 3215 3216 log.debug("[control connection] Finished fetching ring info") 3217 if partitioner and should_rebuild_token_map: 3218 log.debug("[control connection] Rebuilding token map due to topology changes") 3219 self._cluster.metadata.rebuild_token_map(partitioner, token_map) 3220 3221 def _update_location_info(self, host, datacenter, rack): 3222 if host.datacenter == datacenter and host.rack == rack: 3223 return False 3224 3225 # If the dc/rack information changes, we need to update the load balancing policy. 3226 # For that, we remove and re-add the node against the policy. Not the most elegant, and assumes 3227 # that the policy will update correctly, but in practice this should work. 3228 self._cluster.profile_manager.on_down(host) 3229 host.set_location_info(datacenter, rack) 3230 self._cluster.profile_manager.on_up(host) 3231 return True 3232 3233 def _delay_for_event_type(self, event_type, delay_window): 3234 # this serves to order processing correlated events (received within the window) 3235 # the window and randomization still have the desired effect of skew across client instances 3236 next_time = self._event_schedule_times.get(event_type, 0) 3237 now = self._time.time() 3238 if now <= next_time: 3239 this_time = next_time + 0.01 3240 delay = this_time - now 3241 else: 3242 delay = random() * delay_window 3243 this_time = now + delay 3244 self._event_schedule_times[event_type] = this_time 3245 return delay 3246 3247 def _refresh_nodes_if_not_up(self, host): 3248 """ 3249 Used to mitigate refreshes for nodes that are already known. 3250 Some versions of the server send superfluous NEW_NODE messages in addition to UP events. 3251 """ 3252 if not host or not host.is_up: 3253 self.refresh_node_list_and_token_map() 3254 3255 def _handle_topology_change(self, event): 3256 change_type = event["change_type"] 3257 host = self._cluster.metadata.get_host(event["address"][0]) 3258 if change_type == "NEW_NODE" or change_type == "MOVED_NODE": 3259 if self._topology_event_refresh_window >= 0: 3260 delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window) 3261 self._cluster.scheduler.schedule_unique(delay, self._refresh_nodes_if_not_up, host) 3262 elif change_type == "REMOVED_NODE": 3263 self._cluster.scheduler.schedule_unique(0, self._cluster.remove_host, host) 3264 3265 def _handle_status_change(self, event): 3266 change_type = event["change_type"] 3267 host = self._cluster.metadata.get_host(event["address"][0]) 3268 if change_type == "UP": 3269 delay = self._delay_for_event_type('status_change', self._status_event_refresh_window) 3270 if host is None: 3271 # this is the first time we've seen the node 3272 self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map) 3273 else: 3274 self._cluster.scheduler.schedule_unique(delay, self._cluster.on_up, host) 3275 elif change_type == "DOWN": 3276 # Note that there is a slight risk we can receive the event late and thus 3277 # mark the host down even though we already had reconnected successfully. 3278 # But it is unlikely, and don't have too much consequence since we'll try reconnecting 3279 # right away, so we favor the detection to make the Host.is_up more accurate. 3280 if host is not None: 3281 # this will be run by the scheduler 3282 self._cluster.on_down(host, is_host_addition=False) 3283 3284 def _handle_schema_change(self, event): 3285 if self._schema_event_refresh_window < 0: 3286 return 3287 delay = self._delay_for_event_type('schema_change', self._schema_event_refresh_window) 3288 self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, **event) 3289 3290 def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None): 3291 3292 total_timeout = wait_time if wait_time is not None else self._cluster.max_schema_agreement_wait 3293 if total_timeout <= 0: 3294 return True 3295 3296 # Each schema change typically generates two schema refreshes, one 3297 # from the response type and one from the pushed notification. Holding 3298 # a lock is just a simple way to cut down on the number of schema queries 3299 # we'll make. 3300 with self._schema_agreement_lock: 3301 if self._is_shutdown: 3302 return 3303 3304 if not connection: 3305 connection = self._connection 3306 3307 if preloaded_results: 3308 log.debug("[control connection] Attempting to use preloaded results for schema agreement") 3309 3310 peers_result = preloaded_results[0] 3311 local_result = preloaded_results[1] 3312 schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) 3313 if schema_mismatches is None: 3314 return True 3315 3316 log.debug("[control connection] Waiting for schema agreement") 3317 start = self._time.time() 3318 elapsed = 0 3319 cl = ConsistencyLevel.ONE 3320 schema_mismatches = None 3321 while elapsed < total_timeout: 3322 peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl) 3323 local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl) 3324 try: 3325 timeout = min(self._timeout, total_timeout - elapsed) 3326 peers_result, local_result = connection.wait_for_responses( 3327 peers_query, local_query, timeout=timeout) 3328 except OperationTimedOut as timeout: 3329 log.debug("[control connection] Timed out waiting for " 3330 "response during schema agreement check: %s", timeout) 3331 elapsed = self._time.time() - start 3332 continue 3333 except ConnectionShutdown: 3334 if self._is_shutdown: 3335 log.debug("[control connection] Aborting wait for schema match due to shutdown") 3336 return None 3337 else: 3338 raise 3339 3340 schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) 3341 if schema_mismatches is None: 3342 return True 3343 3344 log.debug("[control connection] Schemas mismatched, trying again") 3345 self._time.sleep(0.2) 3346 elapsed = self._time.time() - start 3347 3348 log.warning("Node %s is reporting a schema disagreement: %s", 3349 connection.endpoint, schema_mismatches) 3350 return False 3351 3352 def _get_schema_mismatches(self, peers_result, local_result, local_address): 3353 peers_result = dict_factory(*peers_result.results) 3354 3355 versions = defaultdict(set) 3356 if local_result.results: 3357 local_row = dict_factory(*local_result.results)[0] 3358 if local_row.get("schema_version"): 3359 versions[local_row.get("schema_version")].add(local_address) 3360 3361 for row in peers_result: 3362 schema_ver = row.get('schema_version') 3363 if not schema_ver: 3364 continue 3365 endpoint = self._cluster.endpoint_factory.create(row) 3366 peer = self._cluster.metadata.get_host(endpoint) 3367 if peer and peer.is_up is not False: 3368 versions[schema_ver].add(endpoint) 3369 3370 if len(versions) == 1: 3371 log.debug("[control connection] Schemas match") 3372 return None 3373 3374 return dict((version, list(nodes)) for version, nodes in six.iteritems(versions)) 3375 3376 def _address_from_row(self, row): 3377 """ 3378 Parse the broadcast rpc address from a row and return it untranslated. 3379 """ 3380 addr = None 3381 if "rpc_address" in row: 3382 addr = row.get("rpc_address") # peers and local 3383 if "native_transport_address" in row: 3384 addr = row.get("native_transport_address") 3385 if not addr or addr in ["0.0.0.0", "::"]: 3386 addr = row.get("peer") 3387 3388 return addr 3389 3390 def _signal_error(self): 3391 with self._lock: 3392 if self._is_shutdown: 3393 return 3394 3395 # try just signaling the cluster, as this will trigger a reconnect 3396 # as part of marking the host down 3397 if self._connection and self._connection.is_defunct: 3398 host = self._cluster.metadata.get_host(self._connection.endpoint) 3399 # host may be None if it's already been removed, but that indicates 3400 # that errors have already been reported, so we're fine 3401 if host: 3402 self._cluster.signal_connection_failure( 3403 host, self._connection.last_error, is_host_addition=False) 3404 return 3405 3406 # if the connection is not defunct or the host already left, reconnect 3407 # manually 3408 self.reconnect() 3409 3410 def on_up(self, host): 3411 pass 3412 3413 def on_down(self, host): 3414 3415 conn = self._connection 3416 if conn and conn.endpoint == host.endpoint and \ 3417 self._reconnection_handler is None: 3418 log.debug("[control connection] Control connection host (%s) is " 3419 "considered down, starting reconnection", host) 3420 # this will result in a task being submitted to the executor to reconnect 3421 self.reconnect() 3422 3423 def on_add(self, host, refresh_nodes=True): 3424 if refresh_nodes: 3425 self.refresh_node_list_and_token_map(force_token_rebuild=True) 3426 3427 def on_remove(self, host): 3428 c = self._connection 3429 if c and c.endpoint == host.endpoint: 3430 log.debug("[control connection] Control connection host (%s) is being removed. Reconnecting", host) 3431 # refresh will be done on reconnect 3432 self.reconnect() 3433 else: 3434 self.refresh_node_list_and_token_map(force_token_rebuild=True) 3435 3436 def get_connections(self): 3437 c = getattr(self, '_connection', None) 3438 return [c] if c else [] 3439 3440 def return_connection(self, connection): 3441 if connection is self._connection and (connection.is_defunct or connection.is_closed): 3442 self.reconnect() 3443 3444 3445def _stop_scheduler(scheduler, thread): 3446 try: 3447 if not scheduler.is_shutdown: 3448 scheduler.shutdown() 3449 except ReferenceError: 3450 pass 3451 3452 thread.join() 3453 3454 3455class _Scheduler(Thread): 3456 3457 _queue = None 3458 _scheduled_tasks = None 3459 _executor = None 3460 is_shutdown = False 3461 3462 def __init__(self, executor): 3463 self._queue = Queue.PriorityQueue() 3464 self._scheduled_tasks = set() 3465 self._count = count() 3466 self._executor = executor 3467 3468 Thread.__init__(self, name="Task Scheduler") 3469 self.daemon = True 3470 self.start() 3471 3472 def shutdown(self): 3473 try: 3474 log.debug("Shutting down Cluster Scheduler") 3475 except AttributeError: 3476 # this can happen on interpreter shutdown 3477 pass 3478 self.is_shutdown = True 3479 self._queue.put_nowait((0, 0, None)) 3480 self.join() 3481 3482 def schedule(self, delay, fn, *args, **kwargs): 3483 self._insert_task(delay, (fn, args, tuple(kwargs.items()))) 3484 3485 def schedule_unique(self, delay, fn, *args, **kwargs): 3486 task = (fn, args, tuple(kwargs.items())) 3487 if task not in self._scheduled_tasks: 3488 self._insert_task(delay, task) 3489 else: 3490 log.debug("Ignoring schedule_unique for already-scheduled task: %r", task) 3491 3492 def _insert_task(self, delay, task): 3493 if not self.is_shutdown: 3494 run_at = time.time() + delay 3495 self._scheduled_tasks.add(task) 3496 self._queue.put_nowait((run_at, next(self._count), task)) 3497 else: 3498 log.debug("Ignoring scheduled task after shutdown: %r", task) 3499 3500 def run(self): 3501 while True: 3502 if self.is_shutdown: 3503 return 3504 3505 try: 3506 while True: 3507 run_at, i, task = self._queue.get(block=True, timeout=None) 3508 if self.is_shutdown: 3509 if task: 3510 log.debug("Not executing scheduled task due to Scheduler shutdown") 3511 return 3512 if run_at <= time.time(): 3513 self._scheduled_tasks.discard(task) 3514 fn, args, kwargs = task 3515 kwargs = dict(kwargs) 3516 future = self._executor.submit(fn, *args, **kwargs) 3517 future.add_done_callback(self._log_if_failed) 3518 else: 3519 self._queue.put_nowait((run_at, i, task)) 3520 break 3521 except Queue.Empty: 3522 pass 3523 3524 time.sleep(0.1) 3525 3526 def _log_if_failed(self, future): 3527 exc = future.exception() 3528 if exc: 3529 log.warning( 3530 "An internally scheduled tasked failed with an unhandled exception:", 3531 exc_info=exc) 3532 3533 3534def refresh_schema_and_set_result(control_conn, response_future, connection, **kwargs): 3535 try: 3536 log.debug("Refreshing schema in response to schema change. " 3537 "%s", kwargs) 3538 response_future.is_schema_agreed = control_conn._refresh_schema(connection, **kwargs) 3539 except Exception: 3540 log.exception("Exception refreshing schema in response to schema change:") 3541 response_future.session.submit(control_conn.refresh_schema, **kwargs) 3542 finally: 3543 response_future._set_final_result(None) 3544 3545 3546class ResponseFuture(object): 3547 """ 3548 An asynchronous response delivery mechanism that is returned from calls 3549 to :meth:`.Session.execute_async()`. 3550 3551 There are two ways for results to be delivered: 3552 - Synchronously, by calling :meth:`.result()` 3553 - Asynchronously, by attaching callback and errback functions via 3554 :meth:`.add_callback()`, :meth:`.add_errback()`, and 3555 :meth:`.add_callbacks()`. 3556 """ 3557 3558 query = None 3559 """ 3560 The :class:`~.Statement` instance that is being executed through this 3561 :class:`.ResponseFuture`. 3562 """ 3563 3564 is_schema_agreed = True 3565 """ 3566 For DDL requests, this may be set ``False`` if the schema agreement poll after the response fails. 3567 3568 Always ``True`` for non-DDL requests. 3569 """ 3570 3571 request_encoded_size = None 3572 """ 3573 Size of the request message sent 3574 """ 3575 3576 coordinator_host = None 3577 """ 3578 The host from which we recieved a response 3579 """ 3580 3581 attempted_hosts = None 3582 """ 3583 A list of hosts tried, including all speculative executions, retries, and pages 3584 """ 3585 3586 session = None 3587 row_factory = None 3588 message = None 3589 default_timeout = None 3590 3591 _retry_policy = None 3592 _profile_manager = None 3593 3594 _req_id = None 3595 _final_result = _NOT_SET 3596 _col_names = None 3597 _col_types = None 3598 _final_exception = None 3599 _query_traces = None 3600 _callbacks = None 3601 _errbacks = None 3602 _current_host = None 3603 _connection = None 3604 _query_retries = 0 3605 _start_time = None 3606 _metrics = None 3607 _paging_state = None 3608 _custom_payload = None 3609 _warnings = None 3610 _timer = None 3611 _protocol_handler = ProtocolHandler 3612 _spec_execution_plan = NoSpeculativeExecutionPlan() 3613 _host = None 3614 3615 _warned_timeout = False 3616 3617 def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None, 3618 retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None, 3619 speculative_execution_plan=None, host=None): 3620 self.session = session 3621 # TODO: normalize handling of retry policy and row factory 3622 self.row_factory = row_factory or session.row_factory 3623 self._load_balancer = load_balancer or session.cluster._default_load_balancing_policy 3624 self.message = message 3625 self.query = query 3626 self.timeout = timeout 3627 self._retry_policy = retry_policy 3628 self._metrics = metrics 3629 self.prepared_statement = prepared_statement 3630 self._callback_lock = Lock() 3631 self._start_time = start_time or time.time() 3632 self._host = host 3633 self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan 3634 self._make_query_plan() 3635 self._event = Event() 3636 self._errors = {} 3637 self._callbacks = [] 3638 self._errbacks = [] 3639 self.attempted_hosts = [] 3640 self._start_timer() 3641 3642 @property 3643 def _time_remaining(self): 3644 if self.timeout is None: 3645 return None 3646 return (self._start_time + self.timeout) - time.time() 3647 3648 def _start_timer(self): 3649 if self._timer is None: 3650 spec_delay = self._spec_execution_plan.next_execution(self._current_host) 3651 if spec_delay >= 0: 3652 if self._time_remaining is None or self._time_remaining > spec_delay: 3653 self._timer = self.session.cluster.connection_class.create_timer(spec_delay, self._on_speculative_execute) 3654 return 3655 if self._time_remaining is not None: 3656 self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout) 3657 3658 def _cancel_timer(self): 3659 if self._timer: 3660 self._timer.cancel() 3661 3662 def _on_timeout(self, _attempts=0): 3663 """ 3664 Called when the request associated with this ResponseFuture times out. 3665 3666 This function may reschedule itself. The ``_attempts`` parameter tracks 3667 the number of times this has happened. This parameter should only be 3668 set in those cases, where ``_on_timeout`` reschedules itself. 3669 """ 3670 # PYTHON-853: for short timeouts, we sometimes race with our __init__ 3671 if self._connection is None and _attempts < 3: 3672 self._timer = self.session.cluster.connection_class.create_timer( 3673 0.01, 3674 partial(self._on_timeout, _attempts=_attempts + 1) 3675 ) 3676 return 3677 3678 if self._connection is not None: 3679 try: 3680 self._connection._requests.pop(self._req_id) 3681 # This prevents the race condition of the 3682 # event loop thread just receiving the waited message 3683 # If it arrives after this, it will be ignored 3684 except KeyError: 3685 return 3686 3687 pool = self.session._pools.get(self._current_host) 3688 if pool and not pool.is_shutdown: 3689 with self._connection.lock: 3690 self._connection.request_ids.append(self._req_id) 3691 3692 pool.return_connection(self._connection) 3693 3694 errors = self._errors 3695 if not errors: 3696 if self.is_schema_agreed: 3697 key = str(self._current_host.endpoint) if self._current_host else 'no host queried before timeout' 3698 errors = {key: "Client request timeout. See Session.execute[_async](timeout)"} 3699 else: 3700 connection = self.session.cluster.control_connection._connection 3701 host = str(connection.endpoint) if connection else 'unknown' 3702 errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."} 3703 3704 self._set_final_exception(OperationTimedOut(errors, self._current_host)) 3705 3706 def _on_speculative_execute(self): 3707 self._timer = None 3708 if not self._event.is_set(): 3709 3710 # PYTHON-836, the speculative queries must be after 3711 # the query is sent from the main thread, otherwise the 3712 # query from the main thread may raise NoHostAvailable 3713 # if the _query_plan has been exhausted by the specualtive queries. 3714 # This also prevents a race condition accessing the iterator. 3715 # We reschedule this call until the main thread has succeeded 3716 # making a query 3717 if not self.attempted_hosts: 3718 self._timer = self.session.cluster.connection_class.create_timer(0.01, self._on_speculative_execute) 3719 return 3720 3721 if self._time_remaining is not None: 3722 if self._time_remaining <= 0: 3723 self._on_timeout() 3724 return 3725 self.send_request(error_no_hosts=False) 3726 self._start_timer() 3727 3728 def _make_query_plan(self): 3729 # set the query_plan according to the load balancing policy, 3730 # or to the explicit host target if set 3731 if self._host: 3732 # returning a single value effectively disables retries 3733 self.query_plan = [self._host] 3734 else: 3735 # convert the list/generator/etc to an iterator so that subsequent 3736 # calls to send_request (which retries may do) will resume where 3737 # they last left off 3738 self.query_plan = iter(self._load_balancer.make_query_plan(self.session.keyspace, self.query)) 3739 3740 def send_request(self, error_no_hosts=True): 3741 """ Internal """ 3742 # query_plan is an iterator, so this will resume where we last left 3743 # off if send_request() is called multiple times 3744 for host in self.query_plan: 3745 req_id = self._query(host) 3746 if req_id is not None: 3747 self._req_id = req_id 3748 return True 3749 if self.timeout is not None and time.time() - self._start_time > self.timeout: 3750 self._on_timeout() 3751 return True 3752 3753 if error_no_hosts: 3754 self._set_final_exception(NoHostAvailable( 3755 "Unable to complete the operation against any hosts", self._errors)) 3756 return False 3757 3758 def _query(self, host, message=None, cb=None): 3759 if message is None: 3760 message = self.message 3761 3762 pool = self.session._pools.get(host) 3763 if not pool: 3764 self._errors[host] = ConnectionException("Host has been marked down or removed") 3765 return None 3766 elif pool.is_shutdown: 3767 self._errors[host] = ConnectionException("Pool is shutdown") 3768 return None 3769 3770 self._current_host = host 3771 3772 connection = None 3773 try: 3774 # TODO get connectTimeout from cluster settings 3775 connection, request_id = pool.borrow_connection(timeout=2.0) 3776 self._connection = connection 3777 result_meta = self.prepared_statement.result_metadata if self.prepared_statement else [] 3778 3779 if cb is None: 3780 cb = partial(self._set_result, host, connection, pool) 3781 3782 self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, 3783 encoder=self._protocol_handler.encode_message, 3784 decoder=self._protocol_handler.decode_message, 3785 result_metadata=result_meta) 3786 self.attempted_hosts.append(host) 3787 return request_id 3788 except NoConnectionsAvailable as exc: 3789 log.debug("All connections for host %s are at capacity, moving to the next host", host) 3790 self._errors[host] = exc 3791 return None 3792 except Exception as exc: 3793 log.debug("Error querying host %s", host, exc_info=True) 3794 self._errors[host] = exc 3795 if self._metrics is not None: 3796 self._metrics.on_connection_error() 3797 if connection: 3798 pool.return_connection(connection) 3799 return None 3800 3801 @property 3802 def has_more_pages(self): 3803 """ 3804 Returns :const:`True` if there are more pages left in the 3805 query results, :const:`False` otherwise. This should only 3806 be checked after the first page has been returned. 3807 3808 .. versionadded:: 2.0.0 3809 """ 3810 return self._paging_state is not None 3811 3812 @property 3813 def warnings(self): 3814 """ 3815 Warnings returned from the server, if any. This will only be 3816 set for protocol_version 4+. 3817 3818 Warnings may be returned for such things as oversized batches, 3819 or too many tombstones in slice queries. 3820 3821 Ensure the future is complete before trying to access this property 3822 (call :meth:`.result()`, or after callback is invoked). 3823 Otherwise it may throw if the response has not been received. 3824 """ 3825 # TODO: When timers are introduced, just make this wait 3826 if not self._event.is_set(): 3827 raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized") 3828 return self._warnings 3829 3830 @property 3831 def custom_payload(self): 3832 """ 3833 The custom payload returned from the server, if any. This will only be 3834 set by Cassandra servers implementing a custom QueryHandler, and only 3835 for protocol_version 4+. 3836 3837 Ensure the future is complete before trying to access this property 3838 (call :meth:`.result()`, or after callback is invoked). 3839 Otherwise it may throw if the response has not been received. 3840 3841 :return: :ref:`custom_payload`. 3842 """ 3843 # TODO: When timers are introduced, just make this wait 3844 if not self._event.is_set(): 3845 raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized") 3846 return self._custom_payload 3847 3848 def start_fetching_next_page(self): 3849 """ 3850 If there are more pages left in the query result, this asynchronously 3851 starts fetching the next page. If there are no pages left, :exc:`.QueryExhausted` 3852 is raised. Also see :attr:`.has_more_pages`. 3853 3854 This should only be called after the first page has been returned. 3855 3856 .. versionadded:: 2.0.0 3857 """ 3858 if not self._paging_state: 3859 raise QueryExhausted() 3860 3861 self._make_query_plan() 3862 self.message.paging_state = self._paging_state 3863 self._event.clear() 3864 self._final_result = _NOT_SET 3865 self._final_exception = None 3866 self._start_timer() 3867 self.send_request() 3868 3869 def _reprepare(self, prepare_message, host, connection, pool): 3870 cb = partial(self.session.submit, self._execute_after_prepare, host, connection, pool) 3871 request_id = self._query(host, prepare_message, cb=cb) 3872 if request_id is None: 3873 # try to submit the original prepared statement on some other host 3874 self.send_request() 3875 3876 def _set_result(self, host, connection, pool, response): 3877 try: 3878 self.coordinator_host = host 3879 if pool: 3880 pool.return_connection(connection) 3881 3882 trace_id = getattr(response, 'trace_id', None) 3883 if trace_id: 3884 if not self._query_traces: 3885 self._query_traces = [] 3886 self._query_traces.append(QueryTrace(trace_id, self.session)) 3887 3888 self._warnings = getattr(response, 'warnings', None) 3889 self._custom_payload = getattr(response, 'custom_payload', None) 3890 3891 if isinstance(response, ResultMessage): 3892 if response.kind == RESULT_KIND_SET_KEYSPACE: 3893 session = getattr(self, 'session', None) 3894 # since we're running on the event loop thread, we need to 3895 # use a non-blocking method for setting the keyspace on 3896 # all connections in this session, otherwise the event 3897 # loop thread will deadlock waiting for keyspaces to be 3898 # set. This uses a callback chain which ends with 3899 # self._set_keyspace_completed() being called in the 3900 # event loop thread. 3901 if session: 3902 session._set_keyspace_for_all_pools( 3903 response.results, self._set_keyspace_completed) 3904 elif response.kind == RESULT_KIND_SCHEMA_CHANGE: 3905 # refresh the schema before responding, but do it in another 3906 # thread instead of the event loop thread 3907 self.is_schema_agreed = False 3908 self.session.submit( 3909 refresh_schema_and_set_result, 3910 self.session.cluster.control_connection, 3911 self, connection, **response.results) 3912 else: 3913 results = getattr(response, 'results', None) 3914 if results is not None and response.kind == RESULT_KIND_ROWS: 3915 self._paging_state = response.paging_state 3916 self._col_types = response.col_types 3917 self._col_names = results[0] 3918 results = self.row_factory(*results) 3919 self._set_final_result(results) 3920 elif isinstance(response, ErrorMessage): 3921 retry_policy = self._retry_policy 3922 3923 if isinstance(response, ReadTimeoutErrorMessage): 3924 if self._metrics is not None: 3925 self._metrics.on_read_timeout() 3926 retry = retry_policy.on_read_timeout( 3927 self.query, retry_num=self._query_retries, **response.info) 3928 elif isinstance(response, WriteTimeoutErrorMessage): 3929 if self._metrics is not None: 3930 self._metrics.on_write_timeout() 3931 retry = retry_policy.on_write_timeout( 3932 self.query, retry_num=self._query_retries, **response.info) 3933 elif isinstance(response, UnavailableErrorMessage): 3934 if self._metrics is not None: 3935 self._metrics.on_unavailable() 3936 retry = retry_policy.on_unavailable( 3937 self.query, retry_num=self._query_retries, **response.info) 3938 elif isinstance(response, (OverloadedErrorMessage, 3939 IsBootstrappingErrorMessage, 3940 TruncateError, ServerError)): 3941 log.warning("Host %s error: %s.", host, response.summary) 3942 if self._metrics is not None: 3943 self._metrics.on_other_error() 3944 retry = retry_policy.on_request_error( 3945 self.query, self.message.consistency_level, error=response, 3946 retry_num=self._query_retries) 3947 elif isinstance(response, PreparedQueryNotFound): 3948 if self.prepared_statement: 3949 query_id = self.prepared_statement.query_id 3950 assert query_id == response.info, \ 3951 "Got different query ID in server response (%s) than we " \ 3952 "had before (%s)" % (response.info, query_id) 3953 else: 3954 query_id = response.info 3955 3956 try: 3957 prepared_statement = self.session.cluster._prepared_statements[query_id] 3958 except KeyError: 3959 if not self.prepared_statement: 3960 log.error("Tried to execute unknown prepared statement: id=%s", 3961 query_id.encode('hex')) 3962 self._set_final_exception(response) 3963 return 3964 else: 3965 prepared_statement = self.prepared_statement 3966 self.session.cluster._prepared_statements[query_id] = prepared_statement 3967 3968 current_keyspace = self._connection.keyspace 3969 prepared_keyspace = prepared_statement.keyspace 3970 if not ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) \ 3971 and prepared_keyspace and current_keyspace != prepared_keyspace: 3972 self._set_final_exception( 3973 ValueError("The Session's current keyspace (%s) does " 3974 "not match the keyspace the statement was " 3975 "prepared with (%s)" % 3976 (current_keyspace, prepared_keyspace))) 3977 return 3978 3979 log.debug("Re-preparing unrecognized prepared statement against host %s: %s", 3980 host, prepared_statement.query_string) 3981 prepared_keyspace = prepared_statement.keyspace \ 3982 if ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) else None 3983 prepare_message = PrepareMessage(query=prepared_statement.query_string, 3984 keyspace=prepared_keyspace) 3985 # since this might block, run on the executor to avoid hanging 3986 # the event loop thread 3987 self.session.submit(self._reprepare, prepare_message, host, connection, pool) 3988 return 3989 else: 3990 if hasattr(response, 'to_exception'): 3991 self._set_final_exception(response.to_exception()) 3992 else: 3993 self._set_final_exception(response) 3994 return 3995 3996 self._handle_retry_decision(retry, response, host) 3997 elif isinstance(response, ConnectionException): 3998 if self._metrics is not None: 3999 self._metrics.on_connection_error() 4000 if not isinstance(response, ConnectionShutdown): 4001 self._connection.defunct(response) 4002 retry = self._retry_policy.on_request_error( 4003 self.query, self.message.consistency_level, error=response, 4004 retry_num=self._query_retries) 4005 self._handle_retry_decision(retry, response, host) 4006 elif isinstance(response, Exception): 4007 if hasattr(response, 'to_exception'): 4008 self._set_final_exception(response.to_exception()) 4009 else: 4010 self._set_final_exception(response) 4011 else: 4012 # we got some other kind of response message 4013 msg = "Got unexpected message: %r" % (response,) 4014 exc = ConnectionException(msg, host) 4015 self._cancel_timer() 4016 self._connection.defunct(exc) 4017 self._set_final_exception(exc) 4018 except Exception as exc: 4019 # almost certainly caused by a bug, but we need to set something here 4020 log.exception("Unexpected exception while handling result in ResponseFuture:") 4021 self._set_final_exception(exc) 4022 4023 def _set_keyspace_completed(self, errors): 4024 if not errors: 4025 self._set_final_result(None) 4026 else: 4027 self._set_final_exception(ConnectionException( 4028 "Failed to set keyspace on all hosts: %s" % (errors,))) 4029 4030 def _execute_after_prepare(self, host, connection, pool, response): 4031 """ 4032 Handle the response to our attempt to prepare a statement. 4033 If it succeeded, run the original query again against the same host. 4034 """ 4035 if pool: 4036 pool.return_connection(connection) 4037 4038 if self._final_exception: 4039 return 4040 4041 if isinstance(response, ResultMessage): 4042 if response.kind == RESULT_KIND_PREPARED: 4043 if self.prepared_statement: 4044 # result metadata is the only thing that could have 4045 # changed from an alter 4046 (_, _, _, 4047 self.prepared_statement.result_metadata, 4048 new_metadata_id) = response.results 4049 if new_metadata_id is not None: 4050 self.prepared_statement.result_metadata_id = new_metadata_id 4051 4052 # use self._query to re-use the same host and 4053 # at the same time properly borrow the connection 4054 request_id = self._query(host) 4055 if request_id is None: 4056 # this host errored out, move on to the next 4057 self.send_request() 4058 else: 4059 self._set_final_exception(ConnectionException( 4060 "Got unexpected response when preparing statement " 4061 "on host %s: %s" % (host, response))) 4062 elif isinstance(response, ErrorMessage): 4063 if hasattr(response, 'to_exception'): 4064 self._set_final_exception(response.to_exception()) 4065 else: 4066 self._set_final_exception(response) 4067 elif isinstance(response, ConnectionException): 4068 log.debug("Connection error when preparing statement on host %s: %s", 4069 host, response) 4070 # try again on a different host, preparing again if necessary 4071 self._errors[host] = response 4072 self.send_request() 4073 else: 4074 self._set_final_exception(ConnectionException( 4075 "Got unexpected response type when preparing " 4076 "statement on host %s: %s" % (host, response))) 4077 4078 def _set_final_result(self, response): 4079 self._cancel_timer() 4080 if self._metrics is not None: 4081 self._metrics.request_timer.addValue(time.time() - self._start_time) 4082 4083 with self._callback_lock: 4084 self._final_result = response 4085 # save off current callbacks inside lock for execution outside it 4086 # -- prevents case where _final_result is set, then a callback is 4087 # added and executed on the spot, then executed again as a 4088 # registered callback 4089 to_call = tuple( 4090 partial(fn, response, *args, **kwargs) 4091 for (fn, args, kwargs) in self._callbacks 4092 ) 4093 4094 self._event.set() 4095 4096 # apply each callback 4097 for callback_partial in to_call: 4098 callback_partial() 4099 4100 def _set_final_exception(self, response): 4101 self._cancel_timer() 4102 if self._metrics is not None: 4103 self._metrics.request_timer.addValue(time.time() - self._start_time) 4104 4105 with self._callback_lock: 4106 self._final_exception = response 4107 # save off current errbacks inside lock for execution outside it -- 4108 # prevents case where _final_exception is set, then an errback is 4109 # added and executed on the spot, then executed again as a 4110 # registered errback 4111 to_call = tuple( 4112 partial(fn, response, *args, **kwargs) 4113 for (fn, args, kwargs) in self._errbacks 4114 ) 4115 self._event.set() 4116 4117 # apply each callback 4118 for callback_partial in to_call: 4119 callback_partial() 4120 4121 def _handle_retry_decision(self, retry_decision, response, host): 4122 4123 def exception_from_response(response): 4124 if hasattr(response, 'to_exception'): 4125 return response.to_exception() 4126 else: 4127 return response 4128 4129 retry_type, consistency = retry_decision 4130 if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST): 4131 self._query_retries += 1 4132 reuse = retry_type == RetryPolicy.RETRY 4133 self._retry(reuse, consistency, host) 4134 elif retry_type is RetryPolicy.RETHROW: 4135 self._set_final_exception(exception_from_response(response)) 4136 else: # IGNORE 4137 if self._metrics is not None: 4138 self._metrics.on_ignore() 4139 self._set_final_result(None) 4140 4141 self._errors[host] = exception_from_response(response) 4142 4143 def _retry(self, reuse_connection, consistency_level, host): 4144 if self._final_exception: 4145 # the connection probably broke while we were waiting 4146 # to retry the operation 4147 return 4148 4149 if self._metrics is not None: 4150 self._metrics.on_retry() 4151 if consistency_level is not None: 4152 self.message.consistency_level = consistency_level 4153 4154 # don't retry on the event loop thread 4155 self.session.submit(self._retry_task, reuse_connection, host) 4156 4157 def _retry_task(self, reuse_connection, host): 4158 if self._final_exception: 4159 # the connection probably broke while we were waiting 4160 # to retry the operation 4161 return 4162 4163 if reuse_connection and self._query(host) is not None: 4164 return 4165 4166 # otherwise, move onto another host 4167 self.send_request() 4168 4169 def result(self): 4170 """ 4171 Return the final result or raise an Exception if errors were 4172 encountered. If the final result or error has not been set 4173 yet, this method will block until it is set, or the timeout 4174 set for the request expires. 4175 4176 Timeout is specified in the Session request execution functions. 4177 If the timeout is exceeded, an :exc:`cassandra.OperationTimedOut` will be raised. 4178 This is a client-side timeout. For more information 4179 about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`. 4180 4181 Example usage:: 4182 4183 >>> future = session.execute_async("SELECT * FROM mycf") 4184 >>> # do other stuff... 4185 4186 >>> try: 4187 ... rows = future.result() 4188 ... for row in rows: 4189 ... ... # process results 4190 ... except Exception: 4191 ... log.exception("Operation failed:") 4192 4193 """ 4194 self._event.wait() 4195 if self._final_result is not _NOT_SET: 4196 return ResultSet(self, self._final_result) 4197 else: 4198 raise self._final_exception 4199 4200 def get_query_trace_ids(self): 4201 """ 4202 Returns the trace session ids for this future, if tracing was enabled (does not fetch trace data). 4203 """ 4204 return [trace.trace_id for trace in self._query_traces] 4205 4206 def get_query_trace(self, max_wait=None, query_cl=ConsistencyLevel.LOCAL_ONE): 4207 """ 4208 Fetches and returns the query trace of the last response, or `None` if tracing was 4209 not enabled. 4210 4211 Note that this may raise an exception if there are problems retrieving the trace 4212 details from Cassandra. If the trace is not available after `max_wait`, 4213 :exc:`cassandra.query.TraceUnavailable` will be raised. 4214 4215 If the ResponseFuture is not done (async execution) and you try to retrieve the trace, 4216 :exc:`cassandra.query.TraceUnavailable` will be raised. 4217 4218 `query_cl` is the consistency level used to poll the trace tables. 4219 """ 4220 if self._final_result is _NOT_SET and self._final_exception is None: 4221 raise TraceUnavailable( 4222 "Trace information was not available. The ResponseFuture is not done.") 4223 4224 if self._query_traces: 4225 return self._get_query_trace(len(self._query_traces) - 1, max_wait, query_cl) 4226 4227 def get_all_query_traces(self, max_wait_per=None, query_cl=ConsistencyLevel.LOCAL_ONE): 4228 """ 4229 Fetches and returns the query traces for all query pages, if tracing was enabled. 4230 4231 See note in :meth:`~.get_query_trace` regarding possible exceptions. 4232 """ 4233 if self._query_traces: 4234 return [self._get_query_trace(i, max_wait_per, query_cl) for i in range(len(self._query_traces))] 4235 return [] 4236 4237 def _get_query_trace(self, i, max_wait, query_cl): 4238 trace = self._query_traces[i] 4239 if not trace.events: 4240 trace.populate(max_wait=max_wait, query_cl=query_cl) 4241 return trace 4242 4243 def add_callback(self, fn, *args, **kwargs): 4244 """ 4245 Attaches a callback function to be called when the final results arrive. 4246 4247 By default, `fn` will be called with the results as the first and only 4248 argument. If `*args` or `**kwargs` are supplied, they will be passed 4249 through as additional positional or keyword arguments to `fn`. 4250 4251 If an error is hit while executing the operation, a callback attached 4252 here will not be called. Use :meth:`.add_errback()` or :meth:`add_callbacks()` 4253 if you wish to handle that case. 4254 4255 If the final result has already been seen when this method is called, 4256 the callback will be called immediately (before this method returns). 4257 4258 Note: in the case that the result is not available when the callback is added, 4259 the callback is executed by IO event thread. This means that the callback 4260 should not block or attempt further synchronous requests, because no further 4261 IO will be processed until the callback returns. 4262 4263 **Important**: if the callback you attach results in an exception being 4264 raised, **the exception will be ignored**, so please ensure your 4265 callback handles all error cases that you care about. 4266 4267 Usage example:: 4268 4269 >>> session = cluster.connect("mykeyspace") 4270 4271 >>> def handle_results(rows, start_time, should_log=False): 4272 ... if should_log: 4273 ... log.info("Total time: %f", time.time() - start_time) 4274 ... ... 4275 4276 >>> future = session.execute_async("SELECT * FROM users") 4277 >>> future.add_callback(handle_results, time.time(), should_log=True) 4278 4279 """ 4280 run_now = False 4281 with self._callback_lock: 4282 # Always add fn to self._callbacks, even when we're about to 4283 # execute it, to prevent races with functions like 4284 # start_fetching_next_page that reset _final_result 4285 self._callbacks.append((fn, args, kwargs)) 4286 if self._final_result is not _NOT_SET: 4287 run_now = True 4288 if run_now: 4289 fn(self._final_result, *args, **kwargs) 4290 return self 4291 4292 def add_errback(self, fn, *args, **kwargs): 4293 """ 4294 Like :meth:`.add_callback()`, but handles error cases. 4295 An Exception instance will be passed as the first positional argument 4296 to `fn`. 4297 """ 4298 run_now = False 4299 with self._callback_lock: 4300 # Always add fn to self._errbacks, even when we're about to execute 4301 # it, to prevent races with functions like start_fetching_next_page 4302 # that reset _final_exception 4303 self._errbacks.append((fn, args, kwargs)) 4304 if self._final_exception: 4305 run_now = True 4306 if run_now: 4307 fn(self._final_exception, *args, **kwargs) 4308 return self 4309 4310 def add_callbacks(self, callback, errback, 4311 callback_args=(), callback_kwargs=None, 4312 errback_args=(), errback_kwargs=None): 4313 """ 4314 A convenient combination of :meth:`.add_callback()` and 4315 :meth:`.add_errback()`. 4316 4317 Example usage:: 4318 4319 >>> session = cluster.connect() 4320 >>> query = "SELECT * FROM mycf" 4321 >>> future = session.execute_async(query) 4322 4323 >>> def log_results(results, level='debug'): 4324 ... for row in results: 4325 ... log.log(level, "Result: %s", row) 4326 4327 >>> def log_error(exc, query): 4328 ... log.error("Query '%s' failed: %s", query, exc) 4329 4330 >>> future.add_callbacks( 4331 ... callback=log_results, callback_kwargs={'level': 'info'}, 4332 ... errback=log_error, errback_args=(query,)) 4333 4334 """ 4335 self.add_callback(callback, *callback_args, **(callback_kwargs or {})) 4336 self.add_errback(errback, *errback_args, **(errback_kwargs or {})) 4337 4338 def clear_callbacks(self): 4339 with self._callback_lock: 4340 self._callbacks = [] 4341 self._errbacks = [] 4342 4343 def __str__(self): 4344 result = "(no result yet)" if self._final_result is _NOT_SET else self._final_result 4345 return "<ResponseFuture: query='%s' request_id=%s result=%s exception=%s coordinator_host=%s>" \ 4346 % (self.query, self._req_id, result, self._final_exception, self.coordinator_host) 4347 __repr__ = __str__ 4348 4349 4350class QueryExhausted(Exception): 4351 """ 4352 Raised when :meth:`.ResponseFuture.start_fetching_next_page()` is called and 4353 there are no more pages. You can check :attr:`.ResponseFuture.has_more_pages` 4354 before calling to avoid this. 4355 4356 .. versionadded:: 2.0.0 4357 """ 4358 pass 4359 4360 4361class ResultSet(object): 4362 """ 4363 An iterator over the rows from a query result. Also supplies basic equality 4364 and indexing methods for backward-compatability. These methods materialize 4365 the entire result set (loading all pages), and should only be used if the 4366 total result size is understood. Warnings are emitted when paged results 4367 are materialized in this fashion. 4368 4369 You can treat this as a normal iterator over rows:: 4370 4371 >>> from cassandra.query import SimpleStatement 4372 >>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10) 4373 >>> for user_row in session.execute(statement): 4374 ... process_user(user_row) 4375 4376 Whenever there are no more rows in the current page, the next page will 4377 be fetched transparently. However, note that it *is* possible for 4378 an :class:`Exception` to be raised while fetching the next page, just 4379 like you might see on a normal call to ``session.execute()``. 4380 """ 4381 4382 def __init__(self, response_future, initial_response): 4383 self.response_future = response_future 4384 self.column_names = response_future._col_names 4385 self.column_types = response_future._col_types 4386 self._set_current_rows(initial_response) 4387 self._page_iter = None 4388 self._list_mode = False 4389 4390 @property 4391 def has_more_pages(self): 4392 """ 4393 True if the last response indicated more pages; False otherwise 4394 """ 4395 return self.response_future.has_more_pages 4396 4397 @property 4398 def current_rows(self): 4399 """ 4400 The list of current page rows. May be empty if the result was empty, 4401 or this is the last page. 4402 """ 4403 return self._current_rows or [] 4404 4405 def one(self): 4406 """ 4407 Return a single row of the results or None if empty. This is basically 4408 a shortcut to `result_set.current_rows[0]` and should only be used when 4409 you know a query returns a single row. Consider using an iterator if the 4410 ResultSet contains more than one row. 4411 """ 4412 row = None 4413 if self._current_rows: 4414 try: 4415 row = self._current_rows[0] 4416 except TypeError: # generator object is not subscriptable, PYTHON-1026 4417 row = next(iter(self._current_rows)) 4418 4419 return row 4420 4421 def __iter__(self): 4422 if self._list_mode: 4423 return iter(self._current_rows) 4424 self._page_iter = iter(self._current_rows) 4425 return self 4426 4427 def next(self): 4428 try: 4429 return next(self._page_iter) 4430 except StopIteration: 4431 if not self.response_future.has_more_pages: 4432 if not self._list_mode: 4433 self._current_rows = [] 4434 raise 4435 4436 self.fetch_next_page() 4437 self._page_iter = iter(self._current_rows) 4438 4439 return next(self._page_iter) 4440 4441 __next__ = next 4442 4443 def fetch_next_page(self): 4444 """ 4445 Manually, synchronously fetch the next page. Supplied for manually retrieving pages 4446 and inspecting :meth:`~.current_page`. It is not necessary to call this when iterating 4447 through results; paging happens implicitly in iteration. 4448 """ 4449 if self.response_future.has_more_pages: 4450 self.response_future.start_fetching_next_page() 4451 result = self.response_future.result() 4452 self._current_rows = result._current_rows # ResultSet has already _set_current_rows to the appropriate form 4453 else: 4454 self._current_rows = [] 4455 4456 def _set_current_rows(self, result): 4457 if isinstance(result, Mapping): 4458 self._current_rows = [result] if result else [] 4459 return 4460 try: 4461 iter(result) # can't check directly for generator types because cython generators are different 4462 self._current_rows = result 4463 except TypeError: 4464 self._current_rows = [result] if result else [] 4465 4466 def _fetch_all(self): 4467 self._current_rows = list(self) 4468 self._page_iter = None 4469 4470 def _enter_list_mode(self, operator): 4471 if self._list_mode: 4472 return 4473 if self._page_iter: 4474 raise RuntimeError("Cannot use %s when results have been iterated." % operator) 4475 if self.response_future.has_more_pages: 4476 log.warning("Using %s on paged results causes entire result set to be materialized.", operator) 4477 self._fetch_all() # done regardless of paging status in case the row factory produces a generator 4478 self._list_mode = True 4479 4480 def __eq__(self, other): 4481 self._enter_list_mode("equality operator") 4482 return self._current_rows == other 4483 4484 def __getitem__(self, i): 4485 if i == 0: 4486 warn("ResultSet indexing support will be removed in 4.0. Consider using " 4487 "ResultSet.one() to get a single row.", DeprecationWarning) 4488 self._enter_list_mode("index operator") 4489 return self._current_rows[i] 4490 4491 def __nonzero__(self): 4492 return bool(self._current_rows) 4493 4494 __bool__ = __nonzero__ 4495 4496 def get_query_trace(self, max_wait_sec=None): 4497 """ 4498 Gets the last query trace from the associated future. 4499 See :meth:`.ResponseFuture.get_query_trace` for details. 4500 """ 4501 return self.response_future.get_query_trace(max_wait_sec) 4502 4503 def get_all_query_traces(self, max_wait_sec_per=None): 4504 """ 4505 Gets all query traces from the associated future. 4506 See :meth:`.ResponseFuture.get_all_query_traces` for details. 4507 """ 4508 return self.response_future.get_all_query_traces(max_wait_sec_per) 4509 4510 @property 4511 def was_applied(self): 4512 """ 4513 For LWT results, returns whether the transaction was applied. 4514 4515 Result is indeterminate if called on a result that was not an LWT request or on 4516 a :class:`.query.BatchStatement` containing LWT. In the latter case either all the batch 4517 succeeds or fails. 4518 4519 Only valid when one of the of the internal row factories is in use. 4520 """ 4521 if self.response_future.row_factory not in (named_tuple_factory, dict_factory, tuple_factory): 4522 raise RuntimeError("Cannot determine LWT result with row factory %s" % (self.response_future.row_factory,)) 4523 4524 is_batch_statement = isinstance(self.response_future.query, BatchStatement) 4525 if is_batch_statement and (not self.column_names or self.column_names[0] != "[applied]"): 4526 raise RuntimeError("No LWT were present in the BatchStatement") 4527 4528 if not is_batch_statement and len(self.current_rows) != 1: 4529 raise RuntimeError("LWT result should have exactly one row. This has %d." % (len(self.current_rows))) 4530 4531 row = self.current_rows[0] 4532 if isinstance(row, tuple): 4533 return row[0] 4534 else: 4535 return row['[applied]'] 4536 4537 @property 4538 def paging_state(self): 4539 """ 4540 Server paging state of the query. Can be `None` if the query was not paged. 4541 4542 The driver treats paging state as opaque, but it may contain primary key data, so applications may want to 4543 avoid sending this to untrusted parties. 4544 """ 4545 return self.response_future._paging_state 4546