1# Copyright DataStax, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16This module houses the main classes you will interact with,
17:class:`.Cluster` and :class:`.Session`.
18"""
19from __future__ import absolute_import
20
21import atexit
22from collections import defaultdict, Mapping
23from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
24from copy import copy
25from functools import partial, wraps
26from itertools import groupby, count
27import logging
28from warnings import warn
29from random import random
30import six
31from six.moves import filter, range, queue as Queue
32import socket
33import sys
34import time
35from threading import Lock, RLock, Thread, Event
36
37import weakref
38from weakref import WeakValueDictionary
39try:
40    from weakref import WeakSet
41except ImportError:
42    from cassandra.util import WeakSet  # NOQA
43
44from cassandra import (ConsistencyLevel, AuthenticationFailed,
45                       OperationTimedOut, UnsupportedOperation,
46                       SchemaTargetType, DriverException, ProtocolVersion,
47                       UnresolvableContactPoints)
48from cassandra.connection import (ConnectionException, ConnectionShutdown,
49                                  ConnectionHeartbeat, ProtocolVersionUnsupported,
50                                  EndPoint, DefaultEndPoint, DefaultEndPointFactory)
51from cassandra.cqltypes import UserType
52from cassandra.encoder import Encoder
53from cassandra.protocol import (QueryMessage, ResultMessage,
54                                ErrorMessage, ReadTimeoutErrorMessage,
55                                WriteTimeoutErrorMessage,
56                                UnavailableErrorMessage,
57                                OverloadedErrorMessage,
58                                PrepareMessage, ExecuteMessage,
59                                PreparedQueryNotFound,
60                                IsBootstrappingErrorMessage,
61                                TruncateError, ServerError,
62                                BatchMessage, RESULT_KIND_PREPARED,
63                                RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
64                                RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler)
65from cassandra.metadata import Metadata, protect_name, murmur3
66from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
67                                ExponentialReconnectionPolicy, HostDistance,
68                                RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
69                                NoSpeculativeExecutionPolicy)
70from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
71                            HostConnectionPool, HostConnection,
72                            NoConnectionsAvailable)
73from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
74                             BatchStatement, bind_params, QueryTrace, TraceUnavailable,
75                             named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
76from cassandra.timestamps import MonotonicTimestampGenerator
77
78
79def _is_eventlet_monkey_patched():
80    if 'eventlet.patcher' not in sys.modules:
81        return False
82    import eventlet.patcher
83    return eventlet.patcher.is_monkey_patched('socket')
84
85
86def _is_gevent_monkey_patched():
87    if 'gevent.monkey' not in sys.modules:
88        return False
89    import gevent.socket
90    return socket.socket is gevent.socket.socket
91
92
93# default to gevent when we are monkey patched with gevent, eventlet when
94# monkey patched with eventlet, otherwise if libev is available, use that as
95# the default because it's fastest. Otherwise, use asyncore.
96if _is_gevent_monkey_patched():
97    from cassandra.io.geventreactor import GeventConnection as DefaultConnection
98elif _is_eventlet_monkey_patched():
99    from cassandra.io.eventletreactor import EventletConnection as DefaultConnection
100else:
101    try:
102        from cassandra.io.libevreactor import LibevConnection as DefaultConnection  # NOQA
103    except ImportError:
104        from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection  # NOQA
105
# Forces load of utf8 encoding module to avoid deadlock that occurs
# if code that is being imported tries to import the module in a separate
# thread.
# See http://bugs.python.org/issue10923
110"".encode('utf8')
111
112log = logging.getLogger(__name__)
113
114
115DEFAULT_MIN_REQUESTS = 5
116DEFAULT_MAX_REQUESTS = 100
117
118DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST = 2
119DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST = 8
120
121DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST = 1
122DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST = 2
123
124
125_NOT_SET = object()
126
127
128class NoHostAvailable(Exception):
129    """
130    Raised when an operation is attempted but all connections are
131    busy, defunct, closed, or resulted in errors when used.
132    """
133
134    errors = None
135    """
136    A map of the form ``{ip: exception}`` which details the particular
137    Exception that was caught for each host the operation was attempted
138    against.
139    """
140
141    def __init__(self, message, errors):
142        Exception.__init__(self, message, errors)
143        self.errors = errors
144
145
146def _future_completed(future):
147    """ Helper for run_in_executor() """
148    exc = future.exception()
149    if exc:
150        log.debug("Failed to run task on executor", exc_info=exc)
151
152
153def run_in_executor(f):
154    """
155    A decorator to run the given method in the ThreadPoolExecutor.
156    """
157
158    @wraps(f)
159    def new_f(self, *args, **kwargs):
160
161        if self.is_shutdown:
162            return
163        try:
164            future = self.executor.submit(f, self, *args, **kwargs)
165            future.add_done_callback(_future_completed)
166        except Exception:
167            log.exception("Failed to submit task to executor")
168
169    return new_f
170
171
172_clusters_for_shutdown = set()
173
174
175def _register_cluster_shutdown(cluster):
176    _clusters_for_shutdown.add(cluster)
177
178
179def _discard_cluster_shutdown(cluster):
180    _clusters_for_shutdown.discard(cluster)
181
182
183def _shutdown_clusters():
    clusters = _clusters_for_shutdown.copy()  # copy because shutdown() discards entries from the global set
185    for cluster in clusters:
186        cluster.shutdown()
187
188
189atexit.register(_shutdown_clusters)
190
191
192def default_lbp_factory():
193    if murmur3 is not None:
194        return TokenAwarePolicy(DCAwareRoundRobinPolicy())
195    return DCAwareRoundRobinPolicy()
196
197
198def _addrinfo_or_none(contact_point, port):
199    """
    A helper function that wraps socket.getaddrinfo and returns None
    when it fails (e.g. when it cannot resolve one of the hostnames).
    Used to address PYTHON-895.
203    """
204    try:
205        return socket.getaddrinfo(contact_point, port,
206                                  socket.AF_UNSPEC, socket.SOCK_STREAM)
207    except socket.gaierror:
208        log.debug('Could not resolve hostname "{}" '
209                  'with port {}'.format(contact_point, port))
210        return None
211
212
213def _resolve_contact_points(contact_points, port):
214    resolved = tuple(_addrinfo_or_none(p, port)
215                     for p in contact_points)
216
217    if resolved and all((x is None for x in resolved)):
218        raise UnresolvableContactPoints(contact_points, port)
219
220    resolved = tuple(r for r in resolved if r is not None)
221
222    return [endpoint[4][0]
223            for addrinfo in resolved
224            for endpoint in addrinfo]
225
226
227class ExecutionProfile(object):
228    load_balancing_policy = None
229    """
230    An instance of :class:`.policies.LoadBalancingPolicy` or one of its subclasses.
231
232    Used in determining host distance for establishing connections, and routing requests.
233
234    Defaults to ``TokenAwarePolicy(DCAwareRoundRobinPolicy())`` if not specified
235    """
236
237    retry_policy = None
238    """
239    An instance of :class:`.policies.RetryPolicy` instance used when :class:`.Statement` objects do not have a
240    :attr:`~.Statement.retry_policy` explicitly set.
241
242    Defaults to :class:`.RetryPolicy` if not specified
243    """
244
245    consistency_level = ConsistencyLevel.LOCAL_ONE
246    """
247    :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement`.
248    """
249
250    serial_consistency_level = None
251    """
252    Serial :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement` (for LWT conditional statements).
253    """
254
255    request_timeout = 10.0
256    """
257    Request timeout used when not overridden in :meth:`.Session.execute`
258    """
259
260    row_factory = staticmethod(tuple_factory)
261    """
262    A callable to format results, accepting ``(colnames, rows)`` where ``colnames`` is a list of column names, and
263    ``rows`` is a list of tuples, with each tuple representing a row of parsed values.
264
265    Some example implementations:
266
267    - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
268    - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
269    - :func:`cassandra.query.dict_factory` - return a result row as a dict
270    - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
271    """
272
273    speculative_execution_policy = None
274    """
275    An instance of :class:`.policies.SpeculativeExecutionPolicy`
276
277    Defaults to :class:`.NoSpeculativeExecutionPolicy` if not specified
278    """
279
280    # indicates if lbp was set explicitly or uses default values
281    _load_balancing_policy_explicit = False
282
283    def __init__(self, load_balancing_policy=_NOT_SET, retry_policy=None,
284                 consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None,
285                 request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None):
286
287        if load_balancing_policy is _NOT_SET:
288            self._load_balancing_policy_explicit = False
289            self.load_balancing_policy = default_lbp_factory()
290        else:
291            self._load_balancing_policy_explicit = True
292            self.load_balancing_policy = load_balancing_policy
293        self.retry_policy = retry_policy or RetryPolicy()
294        self.consistency_level = consistency_level
295
296        if (serial_consistency_level is not None and
297                not ConsistencyLevel.is_serial(serial_consistency_level)):
298            raise ValueError("serial_consistency_level must be either "
299                             "ConsistencyLevel.SERIAL "
300                             "or ConsistencyLevel.LOCAL_SERIAL.")
301        self.serial_consistency_level = serial_consistency_level
302
303        self.request_timeout = request_timeout
304        self.row_factory = row_factory
305        self.speculative_execution_policy = speculative_execution_policy or NoSpeculativeExecutionPolicy()
306
307
308class ProfileManager(object):
309
310    def __init__(self):
311        self.profiles = dict()
312
313    def _profiles_without_explicit_lbps(self):
314        names = (profile_name for
315                 profile_name, profile in self.profiles.items()
316                 if not profile._load_balancing_policy_explicit)
317        return tuple(
318            'EXEC_PROFILE_DEFAULT' if n is EXEC_PROFILE_DEFAULT else n
319            for n in names
320        )
321
322    def distance(self, host):
323        distances = set(p.load_balancing_policy.distance(host) for p in self.profiles.values())
324        return HostDistance.LOCAL if HostDistance.LOCAL in distances else \
325            HostDistance.REMOTE if HostDistance.REMOTE in distances else \
326            HostDistance.IGNORED
327
328    def populate(self, cluster, hosts):
329        for p in self.profiles.values():
330            p.load_balancing_policy.populate(cluster, hosts)
331
332    def check_supported(self):
333        for p in self.profiles.values():
334            p.load_balancing_policy.check_supported()
335
336    def on_up(self, host):
337        for p in self.profiles.values():
338            p.load_balancing_policy.on_up(host)
339
340    def on_down(self, host):
341        for p in self.profiles.values():
342            p.load_balancing_policy.on_down(host)
343
344    def on_add(self, host):
345        for p in self.profiles.values():
346            p.load_balancing_policy.on_add(host)
347
348    def on_remove(self, host):
349        for p in self.profiles.values():
350            p.load_balancing_policy.on_remove(host)
351
352    @property
353    def default(self):
354        """
355        internal-only; no checks are done because this entry is populated on cluster init
356        """
357        return self.profiles[EXEC_PROFILE_DEFAULT]
358
359
360EXEC_PROFILE_DEFAULT = object()
361"""
362Key for the ``Cluster`` default execution profile, used when no other profile is selected in
363``Session.execute(execution_profile)``.
364
365Use this as the key in ``Cluster(execution_profiles)`` to override the default profile.
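
For example, a minimal sketch of overriding the default profile at construction time
(the consistency level and timeout shown are illustrative, not recommendations)::

    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT

    profile = ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_QUORUM,
                               request_timeout=30.0)
    cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile})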
366"""
367
368
369class _ConfigMode(object):
370    UNCOMMITTED = 0
371    LEGACY = 1
372    PROFILES = 2
373
374
375class Cluster(object):
376    """
377    The main class to use when interacting with a Cassandra cluster.
378    Typically, one instance of this class will be created for each
379    separate Cassandra cluster that your application interacts with.
380
381    Example usage::
382
383        >>> from cassandra.cluster import Cluster
384        >>> cluster = Cluster(['192.168.1.1', '192.168.1.2'])
385        >>> session = cluster.connect()
386        >>> session.execute("CREATE KEYSPACE ...")
387        >>> ...
388        >>> cluster.shutdown()
389
390    ``Cluster`` and ``Session`` also provide context management functions
391    which implicitly handle shutdown when leaving scope.
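
    For example, a sketch using the context manager form (the contact point is a
    placeholder)::

        with Cluster(['192.168.1.1']) as cluster:
            session = cluster.connect()
            session.execute("SELECT release_version FROM system.local")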
392    """
393
394    contact_points = ['127.0.0.1']
395    """
396    The list of contact points to try connecting for cluster discovery. A
397    contact point can be a string (ip, hostname) or a
398    :class:`.connection.EndPoint` instance.
399
400    Defaults to loopback interface.
401
    Note: When using :class:`.DCAwareRoundRobinPolicy` with no explicit
    local_dc set (as is the default), the DC is chosen from an arbitrary
    host in contact_points. In this case, contact_points should contain
    only nodes from a single, local DC.

    Note: In the next major version, if you specify contact points, you will
    also be required to explicitly specify a load-balancing policy. This
    change will help prevent cases where users had hard-to-debug issues
    surrounding unintuitive default load-balancing policy behavior.
411    """
412    # tracks if contact_points was set explicitly or with default values
413    _contact_points_explicit = None
414
415    port = 9042
416    """
417    The server-side port to open connections to. Defaults to 9042.
418    """
419
420    cql_version = None
421    """
422    If a specific version of CQL should be used, this may be set to that
423    string version.  Otherwise, the highest CQL version supported by the
424    server will be automatically used.
425    """
426
427    protocol_version = ProtocolVersion.V4
428    """
429    The maximum version of the native protocol to use.
430
431    See :class:`.ProtocolVersion` for more information about versions.
432
433    If not set in the constructor, the driver will automatically downgrade
434    version based on a negotiation with the server, but it is most efficient
435    to set this to the maximum supported by your version of Cassandra.
436    Setting this will also prevent conflicting versions negotiated if your
437    cluster is upgraded.
438
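    For example, a sketch pinning the protocol version explicitly (choose the highest
    version your cluster supports)::

        from cassandra import ProtocolVersion
        from cassandra.cluster import Cluster

        cluster = Cluster(protocol_version=ProtocolVersion.V4)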
439    """
440
    allow_beta_protocol_version = False
    """
    Setting true injects a flag in all messages that makes the server accept and use "beta" protocol version.
    Used for testing new protocol features incrementally before the new version is complete.
    """

    no_compact = False
    """
    Setting true sends the NO_COMPACT startup option when establishing connections,
    so that tables created with COMPACT STORAGE are exposed without their
    compact-storage semantics (requires a server version that supports the option).
    """
449
450    compression = True
451    """
452    Controls compression for communications between the driver and Cassandra.
453    If left as the default of :const:`True`, either lz4 or snappy compression
454    may be used, depending on what is supported by both the driver
455    and Cassandra.  If both are fully supported, lz4 will be preferred.
456
457    You may also set this to 'snappy' or 'lz4' to request that specific
458    compression type.
459
460    Setting this to :const:`False` disables compression.
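
    For example, a sketch requesting a specific algorithm (assumes the matching
    compression library is installed)::

        cluster = Cluster(compression='lz4')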
461    """
462
463    _auth_provider = None
464    _auth_provider_callable = None
465
466    @property
467    def auth_provider(self):
468        """
469        When :attr:`~.Cluster.protocol_version` is 2 or higher, this should
470        be an instance of a subclass of :class:`~cassandra.auth.AuthProvider`,
471        such as :class:`~.PlainTextAuthProvider`.
472
473        When :attr:`~.Cluster.protocol_version` is 1, this should be
474        a function that accepts one argument, the IP address of a node,
475        and returns a dict of credentials for that node.
476
477        When not using authentication, this should be left as :const:`None`.
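
        For example, a sketch for protocol_version 2 or higher (the credentials are
        placeholders)::

            from cassandra.auth import PlainTextAuthProvider
            from cassandra.cluster import Cluster

            auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
            cluster = Cluster(auth_provider=auth_provider)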
478        """
479        return self._auth_provider
480
481    @auth_provider.setter  # noqa
482    def auth_provider(self, value):
483        if not value:
484            self._auth_provider = value
485            return
486
487        try:
488            self._auth_provider_callable = value.new_authenticator
489        except AttributeError:
490            if self.protocol_version > 1:
491                raise TypeError("auth_provider must implement the cassandra.auth.AuthProvider "
492                                "interface when protocol_version >= 2")
493            elif not callable(value):
494                raise TypeError("auth_provider must be callable when protocol_version == 1")
495            self._auth_provider_callable = value
496
497        self._auth_provider = value
498
499    _load_balancing_policy = None
500    @property
501    def load_balancing_policy(self):
502        """
503        An instance of :class:`.policies.LoadBalancingPolicy` or
504        one of its subclasses.
505
506        .. versionchanged:: 2.6.0
507
        Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`)
        when using CPython (where the murmur3 extension is available), and to
        :class:`~.DCAwareRoundRobinPolicy` otherwise. The default local DC will be
        chosen from the contact points.
511
512        **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
513        DC locality and remote nodes.**
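
        For example, a sketch pinning requests to a known local datacenter (the DC
        name is a placeholder)::

            from cassandra.cluster import Cluster
            from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy

            cluster = Cluster(
                load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='dc1')))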
514        """
515        return self._load_balancing_policy
516
517    @load_balancing_policy.setter
518    def load_balancing_policy(self, lbp):
519        if self._config_mode == _ConfigMode.PROFILES:
520            raise ValueError("Cannot set Cluster.load_balancing_policy while using Configuration Profiles. Set this in a profile instead.")
521        self._load_balancing_policy = lbp
522        self._config_mode = _ConfigMode.LEGACY
523
524    @property
525    def _default_load_balancing_policy(self):
526        return self.profile_manager.default.load_balancing_policy
527
528    reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
529    """
530    An instance of :class:`.policies.ReconnectionPolicy`. Defaults to an instance
531    of :class:`.ExponentialReconnectionPolicy` with a base delay of one second and
532    a max delay of ten minutes.
533    """
534
535    _default_retry_policy = RetryPolicy()
536    @property
537    def default_retry_policy(self):
538        """
539        A default :class:`.policies.RetryPolicy` instance to use for all
540        :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
541        explicitly set.
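
        For example, a sketch using one of the policies shipped in
        :mod:`cassandra.policies`::

            from cassandra.policies import DowngradingConsistencyRetryPolicy

            cluster.default_retry_policy = DowngradingConsistencyRetryPolicy()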
542        """
543        return self._default_retry_policy
544
545    @default_retry_policy.setter
546    def default_retry_policy(self, policy):
547        if self._config_mode == _ConfigMode.PROFILES:
548            raise ValueError("Cannot set Cluster.default_retry_policy while using Configuration Profiles. Set this in a profile instead.")
549        self._default_retry_policy = policy
550        self._config_mode = _ConfigMode.LEGACY
551
552    conviction_policy_factory = SimpleConvictionPolicy
553    """
554    A factory function which creates instances of
555    :class:`.policies.ConvictionPolicy`.  Defaults to
556    :class:`.policies.SimpleConvictionPolicy`.
557    """
558
559    address_translator = IdentityTranslator()
560    """
561    :class:`.policies.AddressTranslator` instance to be used in translating server node addresses
562    to driver connection addresses.
563    """
564
565    connect_to_remote_hosts = True
566    """
567    If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE`
568    by the :attr:`~.Cluster.load_balancing_policy` will have a connection
569    opened to them.  Otherwise, they will not have a connection opened to them.
570
571    Note that the default load balancing policy ignores remote hosts by default.
572
573    .. versionadded:: 2.1.0
574    """
575
576    metrics_enabled = False
577    """
578    Whether or not metric collection is enabled.  If enabled, :attr:`.metrics`
579    will be an instance of :class:`~cassandra.metrics.Metrics`.
580    """
581
582    metrics = None
583    """
584    An instance of :class:`cassandra.metrics.Metrics` if :attr:`.metrics_enabled` is
585    :const:`True`, else :const:`None`.
586    """
587
588    ssl_options = None
589    """
590    Using ssl_options without ssl_context is deprecated and will be removed in the
591    next major release.
592
593    An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket`` (or
594    ``ssl.wrap_socket()`` if used without ssl_context) when new sockets are created.
595    This should be used when client encryption is enabled in Cassandra.
596
597    The following documentation only applies when ssl_options is used without ssl_context.
598
599    By default, a ``ca_certs`` value should be supplied (the value should be
600    a string pointing to the location of the CA certs file), and you probably
601    want to specify ``ssl_version`` as ``ssl.PROTOCOL_TLSv1`` to match
602    Cassandra's default protocol.
603
604    .. versionchanged:: 3.3.0
605
606    In addition to ``wrap_socket`` kwargs, clients may also specify ``'check_hostname': True`` to verify the cert hostname
    as outlined in RFC 2818 and RFC 6125. Note that this requires the server certificate to be transferred, so the
    option ``'cert_reqs': ssl.CERT_REQUIRED`` should almost always be set as well. Note also that this functionality
    was not added to the Python standard library until versions 2.7.9 and 3.2. To enable this mechanism in earlier
    versions, patch ``ssl.match_hostname``
610    with a custom or `back-ported function <https://pypi.org/project/backports.ssl_match_hostname/>`_.
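
    For example, a sketch of common options when used without ssl_context (the
    certificate path is a placeholder)::

        import ssl

        cluster = Cluster(ssl_options={'ca_certs': '/path/to/rootca.crt',
                                       'ssl_version': ssl.PROTOCOL_TLSv1,
                                       'cert_reqs': ssl.CERT_REQUIRED,
                                       'check_hostname': True})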
611    """
612
613    ssl_context = None
614    """
615    An optional ``ssl.SSLContext`` instance which will be used when new sockets are created.
616    This should be used when client encryption is enabled in Cassandra.
617
618    ``wrap_socket`` options can be set using :attr:`~Cluster.ssl_options`. ssl_options will
619    be used as kwargs for ``ssl.SSLContext.wrap_socket``.
620
621    .. versionadded:: 3.17.0
622    """
623
624    sockopts = None
625    """
626    An optional list of tuples which will be used as arguments to
627    ``socket.setsockopt()`` for all created sockets.
628
    Note: some drivers find setting TCP_NODELAY beneficial in the context of
630    their execution model. It was not found generally beneficial for this driver.
631    To try with your own workload, set ``sockopts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
632    """
633
634    max_schema_agreement_wait = 10
635    """
636    The maximum duration (in seconds) that the driver will wait for schema
637    agreement across the cluster. Defaults to ten seconds.
638    If set <= 0, the driver will bypass schema agreement waits altogether.
639    """
640
641    metadata = None
642    """
643    An instance of :class:`cassandra.metadata.Metadata`.
644    """
645
646    connection_class = DefaultConnection
647    """
648    This determines what event loop system will be used for managing
649    I/O with Cassandra.  These are the current options:
650
651    * :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
652    * :class:`cassandra.io.libevreactor.LibevConnection`
653    * :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
654    * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
655    * :class:`cassandra.io.twistedreactor.TwistedConnection`
656    * EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`
657
658    By default, ``AsyncoreConnection`` will be used, which uses
659    the ``asyncore`` module in the Python standard library.
660
661    If ``libev`` is installed, ``LibevConnection`` will be used instead.
662
663    If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
664    connection class will be used automatically.
665
    ``AsyncioConnection``, which uses the ``asyncio`` module in the Python
    standard library, is also available, but currently experimental. Note that
    it requires ``asyncio`` features that were only introduced in Python 3.4.6
    (for the 3.4 line) and 3.5.1 (for the 3.5 line).
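
    For example, a sketch selecting a reactor explicitly (assumes the corresponding
    dependency is installed)::

        from cassandra.cluster import Cluster
        from cassandra.io.libevreactor import LibevConnection

        cluster = Cluster(connection_class=LibevConnection)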
670    """
671
672    control_connection_timeout = 2.0
673    """
674    A timeout, in seconds, for queries made by the control connection, such
675    as querying the current schema and information about nodes in the cluster.
676    If set to :const:`None`, there will be no timeout for these queries.
677    """
678
679    idle_heartbeat_interval = 30
680    """
681    Interval, in seconds, on which to heartbeat idle connections. This helps
682    keep connections open through network devices that expire idle connections.
683    It also helps discover bad connections early in low-traffic scenarios.
684    Setting to zero disables heartbeats.
685    """
686
687    idle_heartbeat_timeout = 30
688    """
    Timeout, in seconds, for which the heartbeat waits for idle connection responses.
690    Lowering this value can help to discover bad connections earlier.
691    """
692
693    schema_event_refresh_window = 2
694    """
695    Window, in seconds, within which a schema component will be refreshed after
696    receiving a schema_change event.
697
698    The driver delays a random amount of time in the range [0.0, window)
699    before executing the refresh. This serves two purposes:
700
701    1.) Spread the refresh for deployments with large fanout from C* to client tier,
702    preventing a 'thundering herd' problem with many clients refreshing simultaneously.
703
704    2.) Remove redundant refreshes. Redundant events arriving within the delay period
705    are discarded, and only one refresh is executed.
706
707    Setting this to zero will execute refreshes immediately.
708
709    Setting this negative will disable schema refreshes in response to push events
710    (refreshes will still occur in response to schema change responses to DDL statements
711    executed by Sessions of this Cluster).
712    """
713
714    topology_event_refresh_window = 10
715    """
716    Window, in seconds, within which the node and token list will be refreshed after
717    receiving a topology_change event.
718
719    Setting this to zero will execute refreshes immediately.
720
721    Setting this negative will disable node refreshes in response to push events.
722
723    See :attr:`.schema_event_refresh_window` for discussion of rationale
724    """
725
726    status_event_refresh_window = 2
727    """
728    Window, in seconds, within which the driver will start the reconnect after
729    receiving a status_change event.
730
731    Setting this to zero will connect immediately.
732
733    This is primarily used to avoid 'thundering herd' in deployments with large fanout from cluster to clients.
734    When nodes come up, clients attempt to reprepare prepared statements (depending on :attr:`.reprepare_on_up`), and
735    establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.
736    """
737
738    prepare_on_all_hosts = True
739    """
740    Specifies whether statements should be prepared on all hosts, or just one.
741
742    This can reasonably be disabled on long-running applications with numerous clients preparing statements on startup,
743    where a randomized initial condition of the load balancing policy can be expected to distribute prepares from
744    different clients across the cluster.
745    """
746
747    reprepare_on_up = True
748    """
749    Specifies whether all known prepared statements should be prepared on a node when it comes up.
750
    Disabling this may be used to avoid overwhelming a node when it returns, or when it is suspected that the node was
    only marked down due to a network issue. If statements are not reprepared, they are prepared on the first execution, causing
753    an extra roundtrip for one or more client requests.
754    """
755
756    connect_timeout = 5
757    """
758    Timeout, in seconds, for creating new connections.
759
760    This timeout covers the entire connection negotiation, including TCP
761    establishment, options passing, and authentication.
762    """
763
764    timestamp_generator = None
765    """
766    An object, shared between all sessions created by this cluster instance,
767    that generates timestamps when client-side timestamp generation is enabled.
768    By default, each :class:`Cluster` uses a new
769    :class:`~.MonotonicTimestampGenerator`.
770
771    Applications can set this value for custom timestamp behavior. See the
772    documentation for :meth:`Session.timestamp_generator`.
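
    For example, a minimal sketch of a custom generator (a timestamp generator is a
    callable returning an integer timestamp in microseconds since the UNIX epoch)::

        import time

        def my_timestamp_generator():
            return int(time.time() * 1e6)

        cluster = Cluster(timestamp_generator=my_timestamp_generator)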
773    """
774
775    @property
776    def schema_metadata_enabled(self):
777        """
778        Flag indicating whether internal schema metadata is updated.
779
780        When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
781        can be used to speed initial connection, and reduce load on client and server during operation. Turning this off
782        gives away token aware request routing, and programmatic inspection of the metadata model.
783        """
784        return self.control_connection._schema_meta_enabled
785
786    @schema_metadata_enabled.setter
787    def schema_metadata_enabled(self, enabled):
788        self.control_connection._schema_meta_enabled = bool(enabled)
789
790    @property
791    def token_metadata_enabled(self):
792        """
793        Flag indicating whether internal token metadata is updated.
794
795        When disabled, the driver does not query node token information on connect, or on topology change events. This
796        can be used to speed initial connection, and reduce load on client and server during operation. It is most useful
797        in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
798        gives away token aware request routing, and programmatic inspection of the token ring.
799        """
800        return self.control_connection._token_meta_enabled
801
802    @token_metadata_enabled.setter
803    def token_metadata_enabled(self, enabled):
804        self.control_connection._token_meta_enabled = bool(enabled)
805
806    endpoint_factory = None
807    """
808    An :class:`~.connection.EndPointFactory` instance to use internally when creating
809    a socket connection to a node. You can ignore this unless you need a special
810    connection mechanism.
811    """
812
813    profile_manager = None
814    _config_mode = _ConfigMode.UNCOMMITTED
815
816    sessions = None
817    control_connection = None
818    scheduler = None
819    executor = None
820    is_shutdown = False
821    _is_setup = False
822    _prepared_statements = None
823    _prepared_statement_lock = None
824    _idle_heartbeat = None
825    _protocol_version_explicit = False
826    _discount_down_events = True
827
828    _user_types = None
829    """
830    A map of {keyspace: {type_name: UserType}}
831    """
832
833    _listeners = None
834    _listener_lock = None
835
836    def __init__(self,
837                 contact_points=_NOT_SET,
838                 port=9042,
839                 compression=True,
840                 auth_provider=None,
841                 load_balancing_policy=None,
842                 reconnection_policy=None,
843                 default_retry_policy=None,
844                 conviction_policy_factory=None,
845                 metrics_enabled=False,
846                 connection_class=None,
847                 ssl_options=None,
848                 sockopts=None,
849                 cql_version=None,
850                 protocol_version=_NOT_SET,
851                 executor_threads=2,
852                 max_schema_agreement_wait=10,
853                 control_connection_timeout=2.0,
854                 idle_heartbeat_interval=30,
855                 schema_event_refresh_window=2,
856                 topology_event_refresh_window=10,
857                 connect_timeout=5,
858                 schema_metadata_enabled=True,
859                 token_metadata_enabled=True,
860                 address_translator=None,
861                 status_event_refresh_window=2,
862                 prepare_on_all_hosts=True,
863                 reprepare_on_up=True,
864                 execution_profiles=None,
865                 allow_beta_protocol_version=False,
866                 timestamp_generator=None,
867                 idle_heartbeat_timeout=30,
868                 no_compact=False,
869                 ssl_context=None,
870                 endpoint_factory=None):
871        """
872        ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
        establishing connection pools or refreshing metadata.
874
875        Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
876        """
877        if contact_points is not None:
878            if contact_points is _NOT_SET:
879                self._contact_points_explicit = False
880                contact_points = ['127.0.0.1']
881            else:
882                self._contact_points_explicit = True
883
884            if isinstance(contact_points, six.string_types):
885                raise TypeError("contact_points should not be a string, it should be a sequence (e.g. list) of strings")
886
887            if None in contact_points:
888                raise ValueError("contact_points should not contain None (it can resolve to localhost)")
889            self.contact_points = contact_points
890
891        self.port = port
892
893        self.endpoint_factory = endpoint_factory or DefaultEndPointFactory()
894        self.endpoint_factory.configure(self)
895
896        raw_contact_points = [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]
897        self.endpoints_resolved = [cp for cp in self.contact_points if isinstance(cp, EndPoint)]
898
899        try:
900            self.endpoints_resolved += [DefaultEndPoint(address, self.port)
901                                        for address in _resolve_contact_points(raw_contact_points, self.port)]
902        except UnresolvableContactPoints:
903            # rethrow if no EndPoint was provided
904            if not self.endpoints_resolved:
905                raise
906
907        self.compression = compression
908
909        if protocol_version is not _NOT_SET:
910            self.protocol_version = protocol_version
911            self._protocol_version_explicit = True
912        self.allow_beta_protocol_version = allow_beta_protocol_version
913
914        self.no_compact = no_compact
915
916        self.auth_provider = auth_provider
917
918        if load_balancing_policy is not None:
919            if isinstance(load_balancing_policy, type):
920                raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
921            self.load_balancing_policy = load_balancing_policy
922        else:
923            self._load_balancing_policy = default_lbp_factory()  # set internal attribute to avoid committing to legacy config mode
924
925        if reconnection_policy is not None:
926            if isinstance(reconnection_policy, type):
927                raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
928            self.reconnection_policy = reconnection_policy
929
930        if default_retry_policy is not None:
931            if isinstance(default_retry_policy, type):
932                raise TypeError("default_retry_policy should not be a class, it should be an instance of that class")
933            self.default_retry_policy = default_retry_policy
934
935        if conviction_policy_factory is not None:
936            if not callable(conviction_policy_factory):
937                raise ValueError("conviction_policy_factory must be callable")
938            self.conviction_policy_factory = conviction_policy_factory
939
940        if address_translator is not None:
941            if isinstance(address_translator, type):
942                raise TypeError("address_translator should not be a class, it should be an instance of that class")
943            self.address_translator = address_translator
944
945        if connection_class is not None:
946            self.connection_class = connection_class
947
948        if timestamp_generator is not None:
949            if not callable(timestamp_generator):
950                raise ValueError("timestamp_generator must be callable")
951            self.timestamp_generator = timestamp_generator
952        else:
953            self.timestamp_generator = MonotonicTimestampGenerator()
954
955        self.profile_manager = ProfileManager()
956        self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
957                                                                               self.default_retry_policy,
958                                                                               Session._default_consistency_level,
959                                                                               Session._default_serial_consistency_level,
960                                                                               Session._default_timeout,
961                                                                               Session._row_factory)
962        # legacy mode if either of these is not default
963        if load_balancing_policy or default_retry_policy:
964            if execution_profiles:
965                raise ValueError("Clusters constructed with execution_profiles should not specify legacy parameters "
966                                 "load_balancing_policy or default_retry_policy. Configure this in a profile instead.")
967
968            self._config_mode = _ConfigMode.LEGACY
969            warn("Legacy execution parameters will be removed in 4.0. Consider using "
970                 "execution profiles.", DeprecationWarning)
971
972        else:
973            if execution_profiles:
974                self.profile_manager.profiles.update(execution_profiles)
975                self._config_mode = _ConfigMode.PROFILES
976
977        if self._contact_points_explicit:
978            if self._config_mode is _ConfigMode.PROFILES:
979                default_lbp_profiles = self.profile_manager._profiles_without_explicit_lbps()
980                if default_lbp_profiles:
981                    log.warning(
982                        'Cluster.__init__ called with contact_points '
983                        'specified, but load-balancing policies are not '
984                        'specified in some ExecutionProfiles. In the next '
985                        'major version, this will raise an error; please '
986                        'specify a load-balancing policy. '
987                        '(contact_points = {cp}, '
988                        'EPs without explicit LBPs = {eps})'
989                        ''.format(cp=contact_points, eps=default_lbp_profiles))
990            else:
991                if load_balancing_policy is None:
992                    log.warning(
993                        'Cluster.__init__ called with contact_points '
994                        'specified, but no load_balancing_policy. In the next '
995                        'major version, this will raise an error; please '
996                        'specify a load-balancing policy. '
997                        '(contact_points = {cp}, lbp = {lbp})'
998                        ''.format(cp=contact_points, lbp=load_balancing_policy))
999
1000        self.metrics_enabled = metrics_enabled
1001
1002        if ssl_options and not ssl_context:
1003            warn('Using ssl_options without ssl_context is '
1004                 'deprecated and will result in an error in '
1005                 'the next major release. Please use ssl_context '
1006                 'to prepare for that release.',
1007                 DeprecationWarning)
1008
1009        self.ssl_options = ssl_options
1010        self.ssl_context = ssl_context
1011        self.sockopts = sockopts
1012        self.cql_version = cql_version
1013        self.max_schema_agreement_wait = max_schema_agreement_wait
1014        self.control_connection_timeout = control_connection_timeout
1015        self.idle_heartbeat_interval = idle_heartbeat_interval
1016        self.idle_heartbeat_timeout = idle_heartbeat_timeout
1017        self.schema_event_refresh_window = schema_event_refresh_window
1018        self.topology_event_refresh_window = topology_event_refresh_window
1019        self.status_event_refresh_window = status_event_refresh_window
1020        self.connect_timeout = connect_timeout
1021        self.prepare_on_all_hosts = prepare_on_all_hosts
1022        self.reprepare_on_up = reprepare_on_up
1023
1024        self._listeners = set()
1025        self._listener_lock = Lock()
1026
1027        # let Session objects be GC'ed (and shutdown) when the user no longer
1028        # holds a reference.
1029        self.sessions = WeakSet()
1030        self.metadata = Metadata()
1031        self.control_connection = None
1032        self._prepared_statements = WeakValueDictionary()
1033        self._prepared_statement_lock = Lock()
1034
1035        self._user_types = defaultdict(dict)
1036
1037        self._min_requests_per_connection = {
1038            HostDistance.LOCAL: DEFAULT_MIN_REQUESTS,
1039            HostDistance.REMOTE: DEFAULT_MIN_REQUESTS
1040        }
1041
1042        self._max_requests_per_connection = {
1043            HostDistance.LOCAL: DEFAULT_MAX_REQUESTS,
1044            HostDistance.REMOTE: DEFAULT_MAX_REQUESTS
1045        }
1046
1047        self._core_connections_per_host = {
1048            HostDistance.LOCAL: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
1049            HostDistance.REMOTE: DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST
1050        }
1051
1052        self._max_connections_per_host = {
1053            HostDistance.LOCAL: DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST,
1054            HostDistance.REMOTE: DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST
1055        }
1056
1057        self.executor = ThreadPoolExecutor(max_workers=executor_threads)
1058        self.scheduler = _Scheduler(self.executor)
1059
1060        self._lock = RLock()
1061
1062        if self.metrics_enabled:
1063            from cassandra.metrics import Metrics
1064            self.metrics = Metrics(weakref.proxy(self))
1065
1066        self.control_connection = ControlConnection(
1067            self, self.control_connection_timeout,
1068            self.schema_event_refresh_window, self.topology_event_refresh_window,
1069            self.status_event_refresh_window,
1070            schema_metadata_enabled, token_metadata_enabled)
1071
1072    def register_user_type(self, keyspace, user_type, klass):
1073        """
1074        Registers a class to use to represent a particular user-defined type.
1075        Query parameters for this user-defined type will be assumed to be
1076        instances of `klass`.  Result sets for this user-defined type will
1077        be instances of `klass`.  If no class is registered for a user-defined
1078        type, a namedtuple will be used for result sets, and non-prepared
1079        statements may not encode parameters for this type correctly.
1080
1081        `keyspace` is the name of the keyspace that the UDT is defined in.
1082
1083        `user_type` is the string name of the UDT to register the mapping
1084        for.
1085
1086        `klass` should be a class with attributes whose names match the
        fields of the user-defined type.  The constructor must accept kwargs
1088        for each of the fields in the UDT.
1089
1090        This method should only be called after the type has been created
1091        within Cassandra.
1092
1093        Example::
1094
1095            cluster = Cluster(protocol_version=3)
1096            session = cluster.connect()
1097            session.set_keyspace('mykeyspace')
1098            session.execute("CREATE TYPE address (street text, zipcode int)")
1099            session.execute("CREATE TABLE users (id int PRIMARY KEY, location address)")
1100
1101            # create a class to map to the "address" UDT
1102            class Address(object):
1103
1104                def __init__(self, street, zipcode):
1105                    self.street = street
1106                    self.zipcode = zipcode
1107
1108            cluster.register_user_type('mykeyspace', 'address', Address)
1109
1110            # insert a row using an instance of Address
1111            session.execute("INSERT INTO users (id, location) VALUES (%s, %s)",
1112                            (0, Address("123 Main St.", 78723)))
1113
1114            # results will include Address instances
1115            results = session.execute("SELECT * FROM users")
1116            row = results[0]
            print(row.id, row.location.street, row.location.zipcode)
1118
1119        """
1120        if self.protocol_version < 3:
1121            log.warning("User Type serialization is only supported in native protocol version 3+ (%d in use). "
1122                        "CQL encoding for simple statements will still work, but named tuples will "
1123                        "be returned when reading type %s.%s.", self.protocol_version, keyspace, user_type)
1124
1125        self._user_types[keyspace][user_type] = klass
1126        for session in tuple(self.sessions):
1127            session.user_type_registered(keyspace, user_type, klass)
1128        UserType.evict_udt_class(keyspace, user_type)
1129
1130    def add_execution_profile(self, name, profile, pool_wait_timeout=5):
1131        """
1132        Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
1133        and :meth:`.Session.execute_async`. This method will raise if the profile already exists.
1134
1135        Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
1136        provides a way of adding them dynamically.
1137
1138        Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
1139        this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
1140        upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
1141        `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
1142        for timeout semantics).
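
        For example, a sketch (the profile name, settings, and query are illustrative)::

            from cassandra.cluster import ExecutionProfile

            analytics_profile = ExecutionProfile(request_timeout=60.0)
            cluster.add_execution_profile('analytics', analytics_profile)
            session.execute("SELECT * FROM mykeyspace.big_table", execution_profile='analytics')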
1143        """
1144        if not isinstance(profile, ExecutionProfile):
1145            raise TypeError("profile must be an instance of ExecutionProfile")
1146        if self._config_mode == _ConfigMode.LEGACY:
1147            raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly.")
1148        if name in self.profile_manager.profiles:
1149            raise ValueError("Profile {} already exists".format(name))
1150        contact_points_but_no_lbp = (
1151            self._contact_points_explicit and not
1152            profile._load_balancing_policy_explicit)
1153        if contact_points_but_no_lbp:
1154            log.warning(
1155                'Tried to add an ExecutionProfile with name {name}. '
1156                '{self} was explicitly configured with contact_points, but '
1157                '{ep} was not explicitly configured with a '
1158                'load_balancing_policy. In the next major version, trying to '
1159                'add an ExecutionProfile without an explicitly configured LBP '
1160                'to a cluster with explicitly configured contact_points will '
1161                'raise an exception; please specify a load-balancing policy '
1162                'in the ExecutionProfile.'
1163                ''.format(name=repr(name), self=self, ep=profile))
1164
1165        self.profile_manager.profiles[name] = profile
1166        profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
1167        # on_up after populate allows things like DCA LBP to choose default local dc
1168        for host in filter(lambda h: h.is_up, self.metadata.all_hosts()):
1169            profile.load_balancing_policy.on_up(host)
1170        futures = set()
1171        for session in tuple(self.sessions):
1172            futures.update(session.update_created_pools())
1173        _, not_done = wait_futures(futures, pool_wait_timeout)
1174        if not_done:
            raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout)
1176
1177    def get_min_requests_per_connection(self, host_distance):
1178        return self._min_requests_per_connection[host_distance]
1179
1180    def set_min_requests_per_connection(self, host_distance, min_requests):
1181        """
1182        Sets a threshold for concurrent requests per connection, below which
1183        connections will be considered for disposal (down to core connections;
1184        see :meth:`~Cluster.set_core_connections_per_host`).
1185
1186        Pertains to connection pool management in protocol versions {1,2}.
1187        """
1188        if self.protocol_version >= 3:
1189            raise UnsupportedOperation(
1190                "Cluster.set_min_requests_per_connection() only has an effect "
1191                "when using protocol_version 1 or 2.")
1192        if min_requests < 0 or min_requests > 126 or \
1193           min_requests >= self._max_requests_per_connection[host_distance]:
            raise ValueError("min_requests must be 0-126 and less than the max_requests for this host_distance (%d)" %
                             (self._max_requests_per_connection[host_distance],))
1196        self._min_requests_per_connection[host_distance] = min_requests
1197
1198    def get_max_requests_per_connection(self, host_distance):
1199        return self._max_requests_per_connection[host_distance]
1200
1201    def set_max_requests_per_connection(self, host_distance, max_requests):
1202        """
1203        Sets a threshold for concurrent requests per connection, above which new
1204        connections will be created to a host (up to max connections;
1205        see :meth:`~Cluster.set_max_connections_per_host`).
1206
1207        Pertains to connection pool management in protocol versions {1,2}.
1208        """
1209        if self.protocol_version >= 3:
1210            raise UnsupportedOperation(
1211                "Cluster.set_max_requests_per_connection() only has an effect "
1212                "when using protocol_version 1 or 2.")
1213        if max_requests < 1 or max_requests > 127 or \
1214           max_requests <= self._min_requests_per_connection[host_distance]:
1215            raise ValueError("max_requests must be 1-127 and greater than the min_requests for this host_distance (%d)" %
1216                             (self._min_requests_per_connection[host_distance],))
1217        self._max_requests_per_connection[host_distance] = max_requests
1218
1219    def get_core_connections_per_host(self, host_distance):
1220        """
1221        Gets the minimum number of connections per Session that will be opened
1222        for each host with :class:`~.HostDistance` equal to `host_distance`.
1223        The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
1224        :attr:`~HostDistance.REMOTE`.
1225
1226        This property is ignored if :attr:`~.Cluster.protocol_version` is
1227        3 or higher.
1228        """
1229        return self._core_connections_per_host[host_distance]
1230
1231    def set_core_connections_per_host(self, host_distance, core_connections):
1232        """
1233        Sets the minimum number of connections per Session that will be opened
1234        for each host with :class:`~.HostDistance` equal to `host_distance`.
1235        The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
1236        :attr:`~HostDistance.REMOTE`.
1237
1238        Protocol version 1 and 2 are limited in the number of concurrent
1239        requests they can send per connection. The driver implements connection
1240        pooling to support higher levels of concurrency.
1241
1242        If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
1243        is not supported (there is always one connection per host, unless
1244        the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
        and using this will result in an :exc:`~.UnsupportedOperation`.
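
        For example, a sketch (only applicable with protocol_version 1 or 2)::

            from cassandra.policies import HostDistance

            cluster.set_core_connections_per_host(HostDistance.LOCAL, 3)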
1246        """
1247        if self.protocol_version >= 3:
1248            raise UnsupportedOperation(
1249                "Cluster.set_core_connections_per_host() only has an effect "
1250                "when using protocol_version 1 or 2.")
1251        old = self._core_connections_per_host[host_distance]
1252        self._core_connections_per_host[host_distance] = core_connections
1253        if old < core_connections:
1254            self._ensure_core_connections()
1255
1256    def get_max_connections_per_host(self, host_distance):
1257        """
1258        Gets the maximum number of connections per Session that will be opened
1259        for each host with :class:`~.HostDistance` equal to `host_distance`.
1260        The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for
1261        :attr:`~HostDistance.REMOTE`.
1262
1263        This property is ignored if :attr:`~.Cluster.protocol_version` is
1264        3 or higher.
1265        """
1266        return self._max_connections_per_host[host_distance]
1267
1268    def set_max_connections_per_host(self, host_distance, max_connections):
1269        """
1270        Sets the maximum number of connections per Session that will be opened
1271        for each host with :class:`~.HostDistance` equal to `host_distance`.
        The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for
        :attr:`~HostDistance.REMOTE`.
1274
1275        If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
1276        is not supported (there is always one connection per host, unless
1277        the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
        and using this will result in an :exc:`~.UnsupportedOperation`.
1279        """
1280        if self.protocol_version >= 3:
1281            raise UnsupportedOperation(
1282                "Cluster.set_max_connections_per_host() only has an effect "
1283                "when using protocol_version 1 or 2.")
1284        self._max_connections_per_host[host_distance] = max_connections
1285
1286    def connection_factory(self, endpoint, *args, **kwargs):
1287        """
1288        Called to create a new connection with proper configuration.
1289        Intended for internal use only.
1290        """
1291        kwargs = self._make_connection_kwargs(endpoint, kwargs)
1292        return self.connection_class.factory(endpoint, self.connect_timeout, *args, **kwargs)
1293
1294    def _make_connection_factory(self, host, *args, **kwargs):
1295        kwargs = self._make_connection_kwargs(host.endpoint, kwargs)
1296        return partial(self.connection_class.factory, host.endpoint, self.connect_timeout, *args, **kwargs)
1297
1298    def _make_connection_kwargs(self, endpoint, kwargs_dict):
1299        if self._auth_provider_callable:
1300            kwargs_dict.setdefault('authenticator', self._auth_provider_callable(endpoint.address))
1301
1302        kwargs_dict.setdefault('port', self.port)
1303        kwargs_dict.setdefault('compression', self.compression)
1304        kwargs_dict.setdefault('sockopts', self.sockopts)
1305        kwargs_dict.setdefault('ssl_options', self.ssl_options)
1306        kwargs_dict.setdefault('ssl_context', self.ssl_context)
1307        kwargs_dict.setdefault('cql_version', self.cql_version)
1308        kwargs_dict.setdefault('protocol_version', self.protocol_version)
1309        kwargs_dict.setdefault('user_type_map', self._user_types)
1310        kwargs_dict.setdefault('allow_beta_protocol_version', self.allow_beta_protocol_version)
1311        kwargs_dict.setdefault('no_compact', self.no_compact)
1312
1313        return kwargs_dict
1314
1315    def protocol_downgrade(self, host_endpoint, previous_version):
1316        if self._protocol_version_explicit:
1317            raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,))
1318
1319        new_version = ProtocolVersion.get_lower_supported(previous_version)
1320        if new_version < ProtocolVersion.MIN_SUPPORTED:
1321            raise DriverException(
1322                "Cannot downgrade protocol version below minimum supported version: %d" % (ProtocolVersion.MIN_SUPPORTED,))
1323
1324        log.warning("Downgrading core protocol version from %d to %d for %s. "
1325                    "To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. "
1326                    "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
1327        self.protocol_version = new_version
1328
1329    def connect(self, keyspace=None, wait_for_all_pools=False):
1330        """
        Creates and returns a new :class:`~.Session` object.  If `keyspace`
        is specified, that keyspace will be the default keyspace for
        operations on the ``Session``.  If `wait_for_all_pools` is :const:`True`,
        this call will not return until connection pool creation has completed
        (successfully or not) for every known host.
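
        For example (a sketch; the contact point and keyspace name are
        placeholders)::

            cluster = Cluster(['127.0.0.1'])
            session = cluster.connect('mykeyspace')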
1334        """
1335        with self._lock:
1336            if self.is_shutdown:
1337                raise DriverException("Cluster is already shut down")
1338
1339            if not self._is_setup:
1340                log.debug("Connecting to cluster, contact points: %s; protocol version: %s",
1341                          self.contact_points, self.protocol_version)
1342                self.connection_class.initialize_reactor()
1343                _register_cluster_shutdown(self)
1344                for endpoint in self.endpoints_resolved:
1345                    host, new = self.add_host(endpoint, signal=False)
1346                    if new:
1347                        host.set_up()
1348                        for listener in self.listeners:
1349                            listener.on_add(host)
1350
1351                self.profile_manager.populate(
1352                    weakref.proxy(self), self.metadata.all_hosts())
1353                self.load_balancing_policy.populate(
1354                    weakref.proxy(self), self.metadata.all_hosts()
1355                )
1356
1357                try:
1358                    self.control_connection.connect()
1359
1360                    # we set all contact points up for connecting, but we won't infer state after this
1361                    for endpoint in self.endpoints_resolved:
1362                        h = self.metadata.get_host(endpoint)
1363                        if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
1364                            h.is_up = None
1365
1366                    log.debug("Control connection created")
1367                except Exception:
1368                    log.exception("Control connection failed to connect, "
1369                                  "shutting down Cluster:")
1370                    self.shutdown()
1371                    raise
1372
1373                self.profile_manager.check_supported()  # todo: rename this method
1374
1375                if self.idle_heartbeat_interval:
1376                    self._idle_heartbeat = ConnectionHeartbeat(
1377                        self.idle_heartbeat_interval,
1378                        self.get_connection_holders,
1379                        timeout=self.idle_heartbeat_timeout
1380                    )
1381                self._is_setup = True
1382
1383        session = self._new_session(keyspace)
1384        if wait_for_all_pools:
1385            wait_futures(session._initial_connect_futures)
1386        return session
1387
1388    def get_connection_holders(self):
1389        holders = []
1390        for s in tuple(self.sessions):
1391            holders.extend(s.get_pools())
1392        holders.append(self.control_connection)
1393        return holders
1394
1395    def shutdown(self):
1396        """
        Closes all sessions and connections associated with this Cluster.
1398        To ensure all connections are properly closed, **you should always
1399        call shutdown() on a Cluster instance when you are done with it**.
1400
        Once shut down, a Cluster should not be used for any purpose.
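
        Because :class:`.Cluster` also implements the context manager protocol,
        shutdown can happen implicitly (a minimal sketch)::

            with Cluster(['127.0.0.1']) as cluster:
                session = cluster.connect()
                # ... use the session ...
            # cluster.shutdown() has been called automatically at this point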
1402        """
1403        with self._lock:
1404            if self.is_shutdown:
1405                return
1406            else:
1407                self.is_shutdown = True
1408
1409        if self._idle_heartbeat:
1410            self._idle_heartbeat.stop()
1411
1412        self.scheduler.shutdown()
1413
1414        self.control_connection.shutdown()
1415
1416        for session in tuple(self.sessions):
1417            session.shutdown()
1418
1419        self.executor.shutdown()
1420
1421        _discard_cluster_shutdown(self)
1422
1423    def __enter__(self):
1424        return self
1425
1426    def __exit__(self, *args):
1427        self.shutdown()
1428
1429    def _new_session(self, keyspace):
1430        session = Session(self, self.metadata.all_hosts(), keyspace)
1431        self._session_register_user_types(session)
1432        self.sessions.add(session)
1433        return session
1434
1435    def _session_register_user_types(self, session):
1436        for keyspace, type_map in six.iteritems(self._user_types):
1437            for udt_name, klass in six.iteritems(type_map):
1438                session.user_type_registered(keyspace, udt_name, klass)
1439
1440    def _cleanup_failed_on_up_handling(self, host):
1441        self.profile_manager.on_down(host)
1442        self.control_connection.on_down(host)
1443        for session in tuple(self.sessions):
1444            session.remove_pool(host)
1445
1446        self._start_reconnector(host, is_host_addition=False)
1447
1448    def _on_up_future_completed(self, host, futures, results, lock, finished_future):
1449        with lock:
1450            futures.discard(finished_future)
1451
1452            try:
1453                results.append(finished_future.result())
1454            except Exception as exc:
1455                results.append(exc)
1456
1457            if futures:
1458                return
1459
1460        try:
1461            # all futures have completed at this point
1462            for exc in [f for f in results if isinstance(f, Exception)]:
1463                log.error("Unexpected failure while marking node %s up:", host, exc_info=exc)
1464                self._cleanup_failed_on_up_handling(host)
1465                return
1466
1467            if not all(results):
1468                log.debug("Connection pool could not be created, not marking node %s up", host)
1469                self._cleanup_failed_on_up_handling(host)
1470                return
1471
1472            log.info("Connection pools established for node %s", host)
1473            # mark the host as up and notify all listeners
1474            host.set_up()
1475            for listener in self.listeners:
1476                listener.on_up(host)
1477        finally:
1478            with host.lock:
1479                host._currently_handling_node_up = False
1480
1481        # see if there are any pools to add or remove now that the host is marked up
1482        for session in tuple(self.sessions):
1483            session.update_created_pools()
1484
1485    def on_up(self, host):
1486        """
1487        Intended for internal use only.
1488        """
1489        if self.is_shutdown:
1490            return
1491
1492        log.debug("Waiting to acquire lock for handling up status of node %s", host)
1493        with host.lock:
1494            if host._currently_handling_node_up:
1495                log.debug("Another thread is already handling up status of node %s", host)
1496                return
1497
1498            if host.is_up:
1499                log.debug("Host %s was already marked up", host)
1500                return
1501
1502            host._currently_handling_node_up = True
1503        log.debug("Starting to handle up status of node %s", host)
1504
1505        have_future = False
1506        futures = set()
1507        try:
1508            log.info("Host %s may be up; will prepare queries and open connection pool", host)
1509
1510            reconnector = host.get_and_set_reconnection_handler(None)
1511            if reconnector:
1512                log.debug("Now that host %s is up, cancelling the reconnection handler", host)
1513                reconnector.cancel()
1514
1515            if self.profile_manager.distance(host) != HostDistance.IGNORED:
1516                self._prepare_all_queries(host)
1517                log.debug("Done preparing all queries for host %s, ", host)
1518
1519            for session in tuple(self.sessions):
1520                session.remove_pool(host)
1521
1522            log.debug("Signalling to load balancing policies that host %s is up", host)
1523            self.profile_manager.on_up(host)
1524
1525            log.debug("Signalling to control connection that host %s is up", host)
1526            self.control_connection.on_up(host)
1527
1528            log.debug("Attempting to open new connection pools for host %s", host)
1529            futures_lock = Lock()
1530            futures_results = []
1531            callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
1532            for session in tuple(self.sessions):
1533                future = session.add_or_renew_pool(host, is_host_addition=False)
1534                if future is not None:
1535                    have_future = True
1536                    future.add_done_callback(callback)
1537                    futures.add(future)
1538        except Exception:
1539            log.exception("Unexpected failure handling node %s being marked up:", host)
1540            for future in futures:
1541                future.cancel()
1542
1543            self._cleanup_failed_on_up_handling(host)
1544
1545            with host.lock:
1546                host._currently_handling_node_up = False
1547            raise
1548        else:
1549            if not have_future:
1550                with host.lock:
1551                    host.set_up()
1552                    host._currently_handling_node_up = False
1553
1554        # for testing purposes
1555        return futures
1556
1557    def _start_reconnector(self, host, is_host_addition):
1558        if self.profile_manager.distance(host) == HostDistance.IGNORED:
1559            return
1560
1561        schedule = self.reconnection_policy.new_schedule()
1562
1563        # in order to not hold references to this Cluster open and prevent
1564        # proper shutdown when the program ends, we'll just make a closure
1565        # of the current Cluster attributes to create new Connections with
1566        conn_factory = self._make_connection_factory(host)
1567
1568        reconnector = _HostReconnectionHandler(
1569            host, conn_factory, is_host_addition, self.on_add, self.on_up,
1570            self.scheduler, schedule, host.get_and_set_reconnection_handler,
1571            new_handler=None)
1572
1573        old_reconnector = host.get_and_set_reconnection_handler(reconnector)
1574        if old_reconnector:
1575            log.debug("Old host reconnector found for %s, cancelling", host)
1576            old_reconnector.cancel()
1577
1578        log.debug("Starting reconnector for host %s", host)
1579        reconnector.start()
1580
1581    @run_in_executor
1582    def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
1583        """
1584        Intended for internal use only.
1585        """
1586        if self.is_shutdown:
1587            return
1588
1589        with host.lock:
1590            was_up = host.is_up
1591
1592            # ignore down signals if we have open pools to the host
1593            # this is to avoid closing pools when a control connection host became isolated
1594            if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
1595                connected = False
1596                for session in tuple(self.sessions):
1597                    pool_states = session.get_pool_state()
1598                    pool_state = pool_states.get(host)
1599                    if pool_state:
1600                        connected |= pool_state['open_count'] > 0
1601                if connected:
1602                    return
1603
1604            host.set_down()
1605            if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
1606                return
1607
1608        log.warning("Host %s has been marked down", host)
1609
1610        self.profile_manager.on_down(host)
1611        self.control_connection.on_down(host)
1612        for session in tuple(self.sessions):
1613            session.on_down(host)
1614
1615        for listener in self.listeners:
1616            listener.on_down(host)
1617
1618        self._start_reconnector(host, is_host_addition)
1619
1620    def on_add(self, host, refresh_nodes=True):
1621        if self.is_shutdown:
1622            return
1623
1624        log.debug("Handling new host %r and notifying listeners", host)
1625
1626        distance = self.profile_manager.distance(host)
1627        if distance != HostDistance.IGNORED:
1628            self._prepare_all_queries(host)
1629            log.debug("Done preparing queries for new host %r", host)
1630
1631        self.profile_manager.on_add(host)
1632        self.control_connection.on_add(host, refresh_nodes)
1633
1634        if distance == HostDistance.IGNORED:
1635            log.debug("Not adding connection pool for new host %r because the "
1636                      "load balancing policy has marked it as IGNORED", host)
1637            self._finalize_add(host, set_up=False)
1638            return
1639
1640        futures_lock = Lock()
1641        futures_results = []
1642        futures = set()
1643
1644        def future_completed(future):
1645            with futures_lock:
1646                futures.discard(future)
1647
1648                try:
1649                    futures_results.append(future.result())
1650                except Exception as exc:
1651                    futures_results.append(exc)
1652
1653                if futures:
1654                    return
1655
1656            log.debug('All futures have completed for added host %s', host)
1657
1658            for exc in [f for f in futures_results if isinstance(f, Exception)]:
1659                log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
1660                return
1661
1662            if not all(futures_results):
1663                log.warning("Connection pool could not be created, not marking node %s up", host)
1664                return
1665
1666            self._finalize_add(host)
1667
1668        have_future = False
1669        for session in tuple(self.sessions):
1670            future = session.add_or_renew_pool(host, is_host_addition=True)
1671            if future is not None:
1672                have_future = True
1673                futures.add(future)
1674                future.add_done_callback(future_completed)
1675
1676        if not have_future:
1677            self._finalize_add(host)
1678
1679    def _finalize_add(self, host, set_up=True):
1680        if set_up:
1681            host.set_up()
1682
1683        for listener in self.listeners:
1684            listener.on_add(host)
1685
1686        # see if there are any pools to add or remove now that the host is marked up
1687        for session in tuple(self.sessions):
1688            session.update_created_pools()
1689
1690    def on_remove(self, host):
1691        if self.is_shutdown:
1692            return
1693
1694        log.debug("Removing host %s", host)
1695        host.set_down()
1696        self.profile_manager.on_remove(host)
1697        for session in tuple(self.sessions):
1698            session.on_remove(host)
1699        for listener in self.listeners:
1700            listener.on_remove(host)
1701        self.control_connection.on_remove(host)
1702
1703    def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
1704        is_down = host.signal_connection_failure(connection_exc)
1705        if is_down:
1706            self.on_down(host, is_host_addition, expect_host_to_be_down)
1707        return is_down
1708
1709    def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True):
1710        """
1711        Called when adding initial contact points and when the control
1712        connection subsequently discovers a new node.
1713        Returns a Host instance, and a flag indicating whether it was new in
1714        the metadata.
1715        Intended for internal use only.
1716        """
1717        host, new = self.metadata.add_or_return_host(Host(endpoint, self.conviction_policy_factory, datacenter, rack))
1718        if new and signal:
1719            log.info("New Cassandra host %r discovered", host)
1720            self.on_add(host, refresh_nodes)
1721
1722        return host, new
1723
1724    def remove_host(self, host):
1725        """
1726        Called when the control connection observes that a node has left the
1727        ring.  Intended for internal use only.
1728        """
1729        if host and self.metadata.remove_host(host):
1730            log.info("Cassandra host %s removed", host)
1731            self.on_remove(host)
1732
1733    def register_listener(self, listener):
1734        """
1735        Adds a :class:`cassandra.policies.HostStateListener` subclass instance to
1736        the list of listeners to be notified when a host is added, removed,
1737        marked up, or marked down.
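
        For example (a minimal sketch; the listener below is hypothetical and
        simply prints state transitions)::

            from cassandra.policies import HostStateListener

            class PrintingListener(HostStateListener):
                def on_up(self, host):
                    print("Host %s is up" % host)

                def on_down(self, host):
                    print("Host %s is down" % host)

                def on_add(self, host):
                    print("Host %s was added" % host)

                def on_remove(self, host):
                    print("Host %s was removed" % host)

            cluster.register_listener(PrintingListener())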
1738        """
1739        with self._listener_lock:
1740            self._listeners.add(listener)
1741
1742    def unregister_listener(self, listener):
1743        """ Removes a registered listener. """
1744        with self._listener_lock:
1745            self._listeners.remove(listener)
1746
1747    @property
1748    def listeners(self):
1749        with self._listener_lock:
1750            return self._listeners.copy()
1751
1752    def _ensure_core_connections(self):
1753        """
1754        If any host has fewer than the configured number of core connections
1755        open, attempt to open connections until that number is met.
1756        """
1757        for session in tuple(self.sessions):
1758            for pool in tuple(session._pools.values()):
1759                pool.ensure_core_connections()
1760
1761    @staticmethod
1762    def _validate_refresh_schema(keyspace, table, usertype, function, aggregate):
1763        if any((table, usertype, function, aggregate)):
1764            if not keyspace:
1765                raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function, aggregate}")
            if sum(1 for e in (table, usertype, function, aggregate) if e) > 1:
1767                raise ValueError("{table, usertype, function, aggregate} are mutually exclusive")
1768
1769    @staticmethod
1770    def _target_type_from_refresh_args(keyspace, table, usertype, function, aggregate):
1771        if aggregate:
1772            return SchemaTargetType.AGGREGATE
1773        elif function:
1774            return SchemaTargetType.FUNCTION
1775        elif usertype:
1776            return SchemaTargetType.TYPE
1777        elif table:
1778            return SchemaTargetType.TABLE
1779        elif keyspace:
1780            return SchemaTargetType.KEYSPACE
1781        return None
1782
1783    def get_control_connection_host(self):
1784        """
1785        Returns the control connection host metadata.
1786        """
1787        connection = self.control_connection._connection
1788        endpoint = connection.endpoint if connection else None
1789        return self.metadata.get_host(endpoint) if endpoint else None
1790
1791    def refresh_schema_metadata(self, max_schema_agreement_wait=None):
1792        """
1793        Synchronously refresh all schema metadata.
1794
1795        By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
1796        and :attr:`~.Cluster.control_connection_timeout`.
1797
1798        Passing max_schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`.
1799
1800        Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.
1801
1802        An Exception is raised if schema refresh fails for any reason.
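
        For example (a minimal sketch)::

            cluster.refresh_schema_metadata()  # wait for schema agreement, then refresh
            cluster.refresh_schema_metadata(max_schema_agreement_wait=0)  # refresh immediately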
1803        """
1804        if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True):
1805            raise DriverException("Schema metadata was not refreshed. See log for details.")
1806
1807    def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
1808        """
1809        Synchronously refresh keyspace metadata. This applies to keyspace-level information such as replication
1810        and durability settings. It does not refresh tables, types, etc. contained in the keyspace.
1811
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1813        """
1814        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace,
1815                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1816            raise DriverException("Keyspace metadata was not refreshed. See log for details.")
1817
1818    def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None):
1819        """
1820        Synchronously refresh table metadata. This applies to a table, and any triggers or indexes attached
1821        to the table.
1822
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1824        """
1825        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table,
1826                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1827            raise DriverException("Table metadata was not refreshed. See log for details.")
1828
1829    def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None):
1830        """
1831        Synchronously refresh materialized view metadata.
1832
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1834        """
1835        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view,
1836                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1837            raise DriverException("View metadata was not refreshed. See log for details.")
1838
1839    def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None):
1840        """
1841        Synchronously refresh user defined type metadata.
1842
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1844        """
1845        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type,
1846                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1847            raise DriverException("User Type metadata was not refreshed. See log for details.")
1848
1849    def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None):
1850        """
1851        Synchronously refresh user defined function metadata.
1852
1853        ``function`` is a :class:`cassandra.UserFunctionDescriptor`.
1854
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1856        """
1857        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function,
1858                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1859            raise DriverException("User Function metadata was not refreshed. See log for details.")
1860
1861    def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None):
1862        """
1863        Synchronously refresh user defined aggregate metadata.
1864
1865        ``aggregate`` is a :class:`cassandra.UserAggregateDescriptor`.
1866
        See :meth:`~.Cluster.refresh_schema_metadata` for a description of ``max_schema_agreement_wait`` behavior.
1868        """
1869        if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate,
1870                                                      schema_agreement_wait=max_schema_agreement_wait, force=True):
1871            raise DriverException("User Aggregate metadata was not refreshed. See log for details.")
1872
1873    def refresh_nodes(self, force_token_rebuild=False):
1874        """
1875        Synchronously refresh the node list and token metadata
1876
1877        `force_token_rebuild` can be used to rebuild the token map metadata, even if no new nodes are discovered.
1878
1879        An Exception is raised if node refresh fails for any reason.
1880        """
1881        if not self.control_connection.refresh_node_list_and_token_map(force_token_rebuild):
1882            raise DriverException("Node list was not refreshed. See log for details.")
1883
1884    def set_meta_refresh_enabled(self, enabled):
1885        """
        *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` and :attr:`~.Cluster.token_metadata_enabled` instead
1887
1888        Sets a flag to enable (True) or disable (False) all metadata refresh queries.
1889        This applies to both schema and node topology.
1890
1891        Disabling this is useful to minimize refreshes during multiple changes.
1892
1893        Meta refresh must be enabled for the driver to become aware of any cluster
1894        topology changes or schema updates.
1895        """
1896        warn("Cluster.set_meta_refresh_enabled is deprecated and will be removed in 4.0. Set "
1897             "Cluster.schema_metadata_enabled and Cluster.token_metadata_enabled instead.", DeprecationWarning)
1898        self.schema_metadata_enabled = enabled
1899        self.token_metadata_enabled = enabled
1900
1901    @classmethod
1902    def _send_chunks(cls, connection, host, chunks, set_keyspace=False):
1903        for ks_chunk in chunks:
1904            messages = [PrepareMessage(query=s.query_string,
1905                                       keyspace=s.keyspace if set_keyspace else None)
1906                        for s in ks_chunk]
1907            # TODO: make this timeout configurable somehow?
1908            responses = connection.wait_for_responses(*messages, timeout=5.0, fail_on_error=False)
1909            for success, response in responses:
1910                if not success:
1911                    log.debug("Got unexpected response when preparing "
1912                              "statement on host %s: %r", host, response)
1913
1914    def _prepare_all_queries(self, host):
1915        if not self._prepared_statements or not self.reprepare_on_up:
1916            return
1917
1918        log.debug("Preparing all known prepared statements against host %s", host)
1919        connection = None
1920        try:
1921            connection = self.connection_factory(host.endpoint)
1922            statements = list(self._prepared_statements.values())
1923            if ProtocolVersion.uses_keyspace_flag(self.protocol_version):
1924                # V5 protocol and higher, no need to set the keyspace
1925                chunks = []
1926                for i in range(0, len(statements), 10):
1927                    chunks.append(statements[i:i + 10])
                self._send_chunks(connection, host, chunks, True)
1929            else:
1930                for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace):
1931                    if keyspace is not None:
1932                        connection.set_keyspace_blocking(keyspace)
1933
1934                    # prepare 10 statements at a time
1935                    ks_statements = list(ks_statements)
1936                    chunks = []
1937                    for i in range(0, len(ks_statements), 10):
1938                        chunks.append(ks_statements[i:i + 10])
1939                    self._send_chunks(connection, host, chunks)
1940
1941            log.debug("Done preparing all known prepared statements against host %s", host)
1942        except OperationTimedOut as timeout:
1943            log.warning("Timed out trying to prepare all statements on host %s: %s", host, timeout)
1944        except (ConnectionException, socket.error) as exc:
1945            log.warning("Error trying to prepare all statements on host %s: %r", host, exc)
1946        except Exception:
1947            log.exception("Error trying to prepare all statements on host %s", host)
1948        finally:
1949            if connection:
1950                connection.close()
1951
1952    def add_prepared(self, query_id, prepared_statement):
1953        with self._prepared_statement_lock:
1954            self._prepared_statements[query_id] = prepared_statement
1955
1956
1957class Session(object):
1958    """
1959    A collection of connection pools for each host in the cluster.
1960    Instances of this class should not be created directly, only
1961    using :meth:`.Cluster.connect()`.
1962
1963    Queries and statements can be executed through ``Session`` instances
1964    using the :meth:`~.Session.execute()` and :meth:`~.Session.execute_async()`
1965    methods.
1966
1967    Example usage::
1968
1969        >>> session = cluster.connect()
1970        >>> session.set_keyspace("mykeyspace")
1971        >>> session.execute("SELECT * FROM mycf")
1972
1973    """
1974
1975    cluster = None
1976    hosts = None
1977    keyspace = None
1978    is_shutdown = False
1979
    _row_factory = staticmethod(named_tuple_factory)

    @property
1982    def row_factory(self):
1983        """
1984        The format to return row results in.  By default, each
1985        returned row will be a named tuple.  You can alternatively
1986        use any of the following:
1987
1988        - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
1989        - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
1990        - :func:`cassandra.query.dict_factory` - return a result row as a dict
1991        - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
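
        For example, to receive rows as dictionaries when using legacy
        (non-profile) configuration (a minimal sketch; the query is a placeholder)::

            from cassandra.query import dict_factory

            session.row_factory = dict_factory
            rows = session.execute("SELECT name, age FROM users")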
1992
1993        """
1994        return self._row_factory
1995
1996    @row_factory.setter
1997    def row_factory(self, rf):
1998        self._validate_set_legacy_config('row_factory', rf)
1999
2000    _default_timeout = 10.0
2001
2002    @property
2003    def default_timeout(self):
2004        """
2005        A default timeout, measured in seconds, for queries executed through
2006        :meth:`.execute()` or :meth:`.execute_async()`.  This default may be
2007        overridden with the `timeout` parameter for either of those methods.
2008
2009        Setting this to :const:`None` will cause no timeouts to be set by default.
2010
2011        Please see :meth:`.ResponseFuture.result` for details on the scope and
2012        effect of this timeout.
2013
2014        .. versionadded:: 2.0.0
2015        """
2016        return self._default_timeout
2017
2018    @default_timeout.setter
2019    def default_timeout(self, timeout):
2020        self._validate_set_legacy_config('default_timeout', timeout)
2021
2022    _default_consistency_level = ConsistencyLevel.LOCAL_ONE
2023
2024    @property
2025    def default_consistency_level(self):
2026        """
        *Deprecated:* use execution profiles instead

        The default :class:`~ConsistencyLevel` for operations executed through
2029        this session.  This default may be overridden by setting the
2030        :attr:`~.Statement.consistency_level` on individual statements.
2031
2032        .. versionadded:: 1.2.0
2033
2034        .. versionchanged:: 3.0.0
2035
2036            default changed from ONE to LOCAL_ONE
2037        """
2038        return self._default_consistency_level
2039
2040    @default_consistency_level.setter
2041    def default_consistency_level(self, cl):
2042        """
2043        *Deprecated:* use execution profiles instead
2044        """
2045        warn("Setting the consistency level at the session level will be removed in 4.0. Consider using "
2046             "execution profiles and setting the desired consitency level to the EXEC_PROFILE_DEFAULT profile."
2047             , DeprecationWarning)
2048        self._validate_set_legacy_config('default_consistency_level', cl)
2049
2050    _default_serial_consistency_level = None
2051
2052    @property
2053    def default_serial_consistency_level(self):
2054        """
        The default :class:`~ConsistencyLevel` for the serial phase of conditional updates executed through
2056        this session.  This default may be overridden by setting the
2057        :attr:`~.Statement.serial_consistency_level` on individual statements.
2058
2059        Only valid for ``protocol_version >= 2``.
2060        """
2061        return self._default_serial_consistency_level
2062
2063    @default_serial_consistency_level.setter
2064    def default_serial_consistency_level(self, cl):
2065        if (cl is not None and
2066                not ConsistencyLevel.is_serial(cl)):
2067            raise ValueError("default_serial_consistency_level must be either "
2068                             "ConsistencyLevel.SERIAL "
2069                             "or ConsistencyLevel.LOCAL_SERIAL.")
2070
2071        self._validate_set_legacy_config('default_serial_consistency_level', cl)
2072
2073    max_trace_wait = 2.0
2074    """
2075    The maximum amount of time (in seconds) the driver will wait for trace
2076    details to be populated server-side for a query before giving up.
2077    If the `trace` parameter for :meth:`~.execute()` or :meth:`~.execute_async()`
2078    is :const:`True`, the driver will repeatedly attempt to fetch trace
2079    details for the query (using exponential backoff) until this limit is
2080    hit.  If the limit is passed, an error will be logged and the
2081    :attr:`.Statement.trace` will be left as :const:`None`. """
2082
2083    default_fetch_size = 5000
2084    """
2085    By default, this many rows will be fetched at a time. Setting
2086    this to :const:`None` will disable automatic paging for large query
2087    results.  The fetch size can be also specified per-query through
2088    :attr:`.Statement.fetch_size`.
2089
2090    This only takes effect when protocol version 2 or higher is used.
2091    See :attr:`.Cluster.protocol_version` for details.
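
    For example (a minimal sketch)::

        session.default_fetch_size = 100   # page through results 100 rows at a time
        session.default_fetch_size = None  # disable automatic paging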
2092
2093    .. versionadded:: 2.0.0
2094    """
2095
2096    use_client_timestamp = True
2097    """
2098    When using protocol version 3 or higher, write timestamps may be supplied
2099    client-side at the protocol level.  (Normally they are generated
2100    server-side by the coordinator node.)  Note that timestamps specified
2101    within a CQL query will override this timestamp.
2102
2103    .. versionadded:: 2.1.0
2104    """
2105
2106    timestamp_generator = None
2107    """
2108    When :attr:`use_client_timestamp` is set, sessions call this object and use
2109    the result as the timestamp.  (Note that timestamps specified within a CQL
2110    query will override this timestamp.)  By default, a new
2111    :class:`~.MonotonicTimestampGenerator` is created for
2112    each :class:`Cluster` instance.
2113
2114    Applications can set this value for custom timestamp behavior.  For
2115    example, an application could share a timestamp generator across
2116    :class:`Cluster` objects to guarantee that the application will use unique,
    increasing timestamps across clusters, or set it to ``lambda:
2118    int(time.time() * 1e6)`` if losing records over clock inconsistencies is
2119    acceptable for the application. Custom :attr:`timestamp_generator` s should
2120    be callable, and calling them should return an integer representing microseconds
2121    since some point in time, typically UNIX epoch.
2122
2123    .. versionadded:: 3.8.0
2124    """
2125
2126    encoder = None
2127    """
2128    A :class:`~cassandra.encoder.Encoder` instance that will be used when
2129    formatting query parameters for non-prepared statements.  This is not used
2130    for prepared statements (because prepared statements give the driver more
2131    information about what CQL types are expected, allowing it to accept a
2132    wider range of python types).
2133
2134    The encoder uses a mapping from python types to encoder methods (for
    specific CQL types).  This mapping can be modified by users as they see
2136    fit.  Methods of :class:`~cassandra.encoder.Encoder` should be used for mapping
2137    values if possible, because they take precautions to avoid injections and
2138    properly sanitize data.
2139
2140    Example::
2141
2142        cluster = Cluster()
2143        session = cluster.connect("mykeyspace")
2144        session.encoder.mapping[tuple] = session.encoder.cql_encode_tuple
2145
2146        session.execute("CREATE TABLE mytable (k int PRIMARY KEY, col tuple<int, ascii>)")
2147        session.execute("INSERT INTO mytable (k, col) VALUES (%s, %s)", [0, (123, 'abc')])
2148
2149    .. versionadded:: 2.1.0
2150    """
2151
2152    client_protocol_handler = ProtocolHandler
2153    """
    Specifies a protocol handler that will be used for client-initiated requests (i.e. not
2155    internal driver requests). This can be used to override or extend features such as
2156    message or type ser/des.
2157
2158    The default pure python implementation is :class:`cassandra.protocol.ProtocolHandler`.
2159
2160    When compiled with Cython, there are also built-in faster alternatives. See :ref:`faster_deser`
2161    """
2162
2163    _lock = None
2164    _pools = None
2165    _profile_manager = None
2166    _metrics = None
2167    _request_init_callbacks = None
2168
2169    def __init__(self, cluster, hosts, keyspace=None):
2170        self.cluster = cluster
2171        self.hosts = hosts
2172        self.keyspace = keyspace
2173
2174        self._lock = RLock()
2175        self._pools = {}
2176        self._profile_manager = cluster.profile_manager
2177        self._metrics = cluster.metrics
2178        self._request_init_callbacks = []
2179        self._protocol_version = self.cluster.protocol_version
2180
2181        self.encoder = Encoder()
2182
2183        # create connection pools in parallel
2184        self._initial_connect_futures = set()
2185        for host in hosts:
2186            future = self.add_or_renew_pool(host, is_host_addition=False)
2187            if future:
2188                self._initial_connect_futures.add(future)
2189
2190        futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
2191        while futures.not_done and not any(f.result() for f in futures.done):
2192            futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
2193
2194        if not any(f.result() for f in self._initial_connect_futures):
2195            msg = "Unable to connect to any servers"
2196            if self.keyspace:
2197                msg += " using keyspace '%s'" % self.keyspace
2198            raise NoHostAvailable(msg, [h.address for h in hosts])
2199
2200    def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False,
2201                custom_payload=None, execution_profile=EXEC_PROFILE_DEFAULT,
2202                paging_state=None, host=None):
2203        """
2204        Execute the given query and synchronously wait for the response.
2205
2206        If an error is encountered while executing the query, an Exception
2207        will be raised.
2208
2209        `query` may be a query string or an instance of :class:`cassandra.query.Statement`.
2210
2211        `parameters` may be a sequence or dict of parameters to bind.  If a
        sequence is used, ``%s`` should be used as the placeholder for each
2213        argument.  If a dict is used, ``%(name)s`` style placeholders must
2214        be used.
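
        For example (a sketch; the ``users`` table is a placeholder)::

            session.execute("INSERT INTO users (id, name) VALUES (%s, %s)",
                            (1, "alice"))
            session.execute("INSERT INTO users (id, name) VALUES (%(id)s, %(name)s)",
                            {'id': 2, 'name': 'bob'})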
2215
2216        `timeout` should specify a floating-point timeout (in seconds) after
2217        which an :exc:`.OperationTimedOut` exception will be raised if the query
2218        has not completed.  If not set, the timeout defaults to
2219        :attr:`~.Session.default_timeout`.  If set to :const:`None`, there is
2220        no timeout. Please see :meth:`.ResponseFuture.result` for details on
2221        the scope and effect of this timeout.
2222
2223        If `trace` is set to :const:`True`, the query will be sent with tracing enabled.
2224        The trace details can be obtained using the returned :class:`.ResultSet` object.
2225
2226        `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
        If `query` is a Statement with its own custom_payload, the message payload
        will be a union of the two, with the values specified here taking precedence.
2229
        `execution_profile` is the execution profile to use for this request. It can be a key to a profile configured
        via :meth:`Cluster.add_execution_profile` or an instance (from :meth:`Session.execution_profile_clone_update`,
        for example).
2233
2234        `paging_state` is an optional paging state, reused from a previous :class:`ResultSet`.
2235
2236        `host` is the :class:`pool.Host` that should handle the query. Using this is discouraged except in a few
2237        cases, e.g., querying node-local tables and applying schema changes.
2238        """
2239        return self.execute_async(query, parameters, trace, custom_payload,
2240                                  timeout, execution_profile, paging_state, host).result()
2241
2242    def execute_async(self, query, parameters=None, trace=False, custom_payload=None,
2243                      timeout=_NOT_SET, execution_profile=EXEC_PROFILE_DEFAULT,
2244                      paging_state=None, host=None):
2245        """
2246        Execute the given query and return a :class:`~.ResponseFuture` object
2247        which callbacks may be attached to for asynchronous response
2248        delivery.  You may also call :meth:`~.ResponseFuture.result()`
2249        on the :class:`.ResponseFuture` to synchronously block for results at
2250        any time.
2251
2252        See :meth:`Session.execute` for parameter definitions.
2253
2254        Example usage::
2255
2256            >>> session = cluster.connect()
2257            >>> future = session.execute_async("SELECT * FROM mycf")
2258
2259            >>> def log_results(results):
2260            ...     for row in results:
2261            ...         log.info("Results: %s", row)
2262
2263            >>> def log_error(exc):
            ...     log.error("Operation failed: %s", exc)
2265
2266            >>> future.add_callbacks(log_results, log_error)
2267
2268        Async execution with blocking wait for results::
2269
2270            >>> future = session.execute_async("SELECT * FROM mycf")
2271            >>> # do other stuff...
2272
2273            >>> try:
2274            ...     results = future.result()
2275            ... except Exception:
2276            ...     log.exception("Operation failed:")
2277
2278        """
2279        future = self._create_response_future(
2280            query, parameters, trace, custom_payload, timeout,
2281            execution_profile, paging_state, host)
2282        future._protocol_handler = self.client_protocol_handler
2283        self._on_request(future)
2284        future.send_request()
2285        return future
2286
2287    def _create_response_future(self, query, parameters, trace, custom_payload,
2288                                timeout, execution_profile=EXEC_PROFILE_DEFAULT,
2289                                paging_state=None, host=None):
2290        """ Returns the ResponseFuture before calling send_request() on it """
2291
2292        prepared_statement = None
2293
2294        if isinstance(query, six.string_types):
2295            query = SimpleStatement(query)
2296        elif isinstance(query, PreparedStatement):
2297            query = query.bind(parameters)
2298
2299        if self.cluster._config_mode == _ConfigMode.LEGACY:
2300            if execution_profile is not EXEC_PROFILE_DEFAULT:
2301                raise ValueError("Cannot specify execution_profile while using legacy parameters.")
2302
2303            if timeout is _NOT_SET:
2304                timeout = self.default_timeout
2305
2306            cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level
2307            serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else self.default_serial_consistency_level
2308
2309            retry_policy = query.retry_policy or self.cluster.default_retry_policy
2310            row_factory = self.row_factory
2311            load_balancing_policy = self.cluster.load_balancing_policy
2312            spec_exec_policy = None
2313        else:
2314            execution_profile = self._maybe_get_execution_profile(execution_profile)
2315
2316            if timeout is _NOT_SET:
2317                timeout = execution_profile.request_timeout
2318
2319            cl = query.consistency_level if query.consistency_level is not None else execution_profile.consistency_level
2320            serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else execution_profile.serial_consistency_level
2321
2322            retry_policy = query.retry_policy or execution_profile.retry_policy
2323            row_factory = execution_profile.row_factory
2324            load_balancing_policy = execution_profile.load_balancing_policy
2325            spec_exec_policy = execution_profile.speculative_execution_policy
2326
2327        fetch_size = query.fetch_size
2328        if fetch_size is FETCH_SIZE_UNSET and self._protocol_version >= 2:
2329            fetch_size = self.default_fetch_size
2330        elif self._protocol_version == 1:
2331            fetch_size = None
2332
2333        start_time = time.time()
2334        if self._protocol_version >= 3 and self.use_client_timestamp:
2335            timestamp = self.cluster.timestamp_generator()
2336        else:
2337            timestamp = None
2338
2339        if isinstance(query, SimpleStatement):
2340            query_string = query.query_string
2341            statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
2342            if parameters:
2343                query_string = bind_params(query_string, parameters, self.encoder)
2344            message = QueryMessage(
2345                query_string, cl, serial_cl,
2346                fetch_size, timestamp=timestamp,
2347                keyspace=statement_keyspace)
2348        elif isinstance(query, BoundStatement):
2349            prepared_statement = query.prepared_statement
2350            message = ExecuteMessage(
2351                prepared_statement.query_id, query.values, cl,
2352                serial_cl, fetch_size,
2353                timestamp=timestamp, skip_meta=bool(prepared_statement.result_metadata),
2354                result_metadata_id=prepared_statement.result_metadata_id)
2355        elif isinstance(query, BatchStatement):
2356            if self._protocol_version < 2:
2357                raise UnsupportedOperation(
2358                    "BatchStatement execution is only supported with protocol version "
2359                    "2 or higher (supported in Cassandra 2.0 and higher).  Consider "
2360                    "setting Cluster.protocol_version to 2 to support this operation.")
2361            statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
2362            message = BatchMessage(
2363                query.batch_type, query._statements_and_parameters, cl,
2364                serial_cl, timestamp, statement_keyspace)
2365
2366        message.tracing = trace
2367
2368        message.update_custom_payload(query.custom_payload)
2369        message.update_custom_payload(custom_payload)
2370        message.allow_beta_protocol_version = self.cluster.allow_beta_protocol_version
2371        message.paging_state = paging_state
2372
2373        spec_exec_plan = spec_exec_policy.new_plan(query.keyspace or self.keyspace, query) if query.is_idempotent and spec_exec_policy else None
2374        return ResponseFuture(
2375            self, message, query, timeout, metrics=self._metrics,
2376            prepared_statement=prepared_statement, retry_policy=retry_policy, row_factory=row_factory,
2377            load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
2378            host=host)
2379
2380    def _execution_profile_to_string(self, name):
2381        if name is EXEC_PROFILE_DEFAULT:
2382            return 'EXEC_PROFILE_DEFAULT'
2383        return '"%s"' % (name,)
2384
2385    def get_execution_profile(self, name):
2386        """
2387        Returns the execution profile associated with the provided ``name``.
2388
2389        :param name: The name (or key) of the execution profile.
2390        """
2391        profiles = self.cluster.profile_manager.profiles
2392        try:
2393            return profiles[name]
2394        except KeyError:
2395            eps = [self._execution_profile_to_string(ep) for ep in profiles.keys()]
2396            raise ValueError("Invalid execution_profile: %s; valid profiles are: %s." % (
2397                self._execution_profile_to_string(name), ', '.join(eps)))
2398
2399    def _maybe_get_execution_profile(self, ep):
2400        return ep if isinstance(ep, ExecutionProfile) else self.get_execution_profile(ep)
2401
2402    def execution_profile_clone_update(self, ep, **kwargs):
2403        """
2404        Returns a clone of the ``ep`` profile.  ``kwargs`` can be specified to update attributes
2405        of the returned profile.
2406
        This is a shallow clone, so any objects referenced by the profile are shared. In particular, the load balancing
        policy instance continues to be maintained through its inclusion in the active profiles, and updates to any
        other shared, mutable objects will be visible to the active profile. In cases where this is not desirable, be
        sure to replace the instance instead of manipulating the shared object.
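
        For example, to derive a profile with a longer request timeout from the
        default profile (a minimal sketch; ``query`` is a placeholder, and
        ``EXEC_PROFILE_DEFAULT`` comes from :mod:`cassandra.cluster`)::

            long_timeout_profile = session.execution_profile_clone_update(
                EXEC_PROFILE_DEFAULT, request_timeout=30.0)
            session.execute(query, execution_profile=long_timeout_profile)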
2411        """
2412        clone = copy(self._maybe_get_execution_profile(ep))
2413        for attr, value in kwargs.items():
2414            setattr(clone, attr, value)
2415        return clone
2416
2417    def add_request_init_listener(self, fn, *args, **kwargs):
2418        """
2419        Adds a callback with arguments to be called when any request is created.
2420
2421        It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created,
2422        and before the request is sent\*. This can be used to create extensions by adding result callbacks to the
2423        response future.
2424
2425        \* where `response_future` is the :class:`.ResponseFuture` for the request.
2426
2427        Note that the init callback is done on the client thread creating the request, so you may need to consider
        synchronization if you have multiple threads. Any callbacks added to the response future will be executed
        on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking applies (see the
        Note in :meth:`.ResponseFuture.add_callbacks`).
2431
2432        See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the
2433        source tree for an example.
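
        A minimal sketch (``count_requests`` and ``stats`` are hypothetical)::

            def count_requests(response_future, stats):
                stats['requests'] += 1

            stats = {'requests': 0}
            session.add_request_init_listener(count_requests, stats)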
2434        """
2435        self._request_init_callbacks.append((fn, args, kwargs))
2436
2437    def remove_request_init_listener(self, fn, *args, **kwargs):
2438        """
2439        Removes a callback and arguments from the list.
2440
2441        See :meth:`.Session.add_request_init_listener`.
2442        """
2443        self._request_init_callbacks.remove((fn, args, kwargs))
2444
2445    def _on_request(self, response_future):
2446        for fn, args, kwargs in self._request_init_callbacks:
2447            fn(response_future, *args, **kwargs)
2448
2449    def prepare(self, query, custom_payload=None, keyspace=None):
2450        """
2451        Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
2452        instance which can be used as follows::
2453
2454            >>> session = cluster.connect("mykeyspace")
2455            >>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
2456            >>> prepared = session.prepare(query)
2457            >>> session.execute(prepared, (user.id, user.name, user.age))
2458
2459        Or you may bind values to the prepared statement ahead of time::
2460
2461            >>> prepared = session.prepare(query)
2462            >>> bound_stmt = prepared.bind((user.id, user.name, user.age))
2463            >>> session.execute(bound_stmt)
2464
2465        Of course, prepared statements may (and should) be reused::
2466
2467            >>> prepared = session.prepare(query)
2468            >>> for user in users:
2469            ...     bound = prepared.bind((user.id, user.name, user.age))
2470            ...     session.execute(bound)
2471
2472        Alternatively, if :attr:`~.Cluster.protocol_version` is 5 or higher
2473        (requires Cassandra 4.0+), the keyspace can be specified as a
        parameter. This allows you to avoid hard-coding the keyspace in the
        query string, even without specifying a keyspace in :meth:`~.Cluster.connect`.
        It will even let you prepare and use statements against a keyspace other
        than the one originally specified on connection::
2478
2479            >>> analyticskeyspace_prepared = session.prepare(
2480            ...     "INSERT INTO user_activity id, last_activity VALUES (?, ?)",
2481            ...     keyspace="analyticskeyspace")  # note the different keyspace
2482
2483        **Important**: PreparedStatements should be prepared only once.
2484        Preparing the same query more than once will likely affect performance.
2485
2486        `custom_payload` is a key value map to be passed along with the prepare
2487        message. See :ref:`custom_payload`.
2488        """
2489        message = PrepareMessage(query=query, keyspace=keyspace)
2490        future = ResponseFuture(self, message, query=None, timeout=self.default_timeout)
2491        try:
2492            future.send_request()
2493            query_id, bind_metadata, pk_indexes, result_metadata, result_metadata_id = future.result()
2494        except Exception:
2495            log.exception("Error preparing query:")
2496            raise
2497
2498        prepared_keyspace = keyspace if keyspace else None
2499        prepared_statement = PreparedStatement.from_message(
2500            query_id, bind_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace,
2501            self._protocol_version, result_metadata, result_metadata_id)
2502        prepared_statement.custom_payload = future.custom_payload
2503
2504        self.cluster.add_prepared(query_id, prepared_statement)
2505
2506        if self.cluster.prepare_on_all_hosts:
2507            host = future._current_host
2508            try:
2509                self.prepare_on_all_hosts(prepared_statement.query_string, host, prepared_keyspace)
2510            except Exception:
2511                log.exception("Error preparing query on all hosts:")
2512
2513        return prepared_statement
2514
2515    def prepare_on_all_hosts(self, query, excluded_host, keyspace=None):
2516        """
2517        Prepare the given query on all hosts, excluding ``excluded_host``.
2518        Intended for internal use only.
2519        """
2520        futures = []
2521        for host in tuple(self._pools.keys()):
2522            if host != excluded_host and host.is_up:
2523                future = ResponseFuture(self, PrepareMessage(query=query, keyspace=keyspace),
2524                                            None, self.default_timeout)
2525
2526                # we don't care about errors preparing against specific hosts,
2527                # since we can always prepare them as needed when the prepared
2528                # statement is used.  Just log errors and continue on.
2529                try:
2530                    request_id = future._query(host)
2531                except Exception:
2532                    log.exception("Error preparing query for host %s:", host)
2533                    continue
2534
2535                if request_id is None:
                    # the error has already been logged by ResponseFuture
2537                    log.debug("Failed to prepare query for host %s: %r",
2538                              host, future._errors.get(host))
2539                    continue
2540
2541                futures.append((host, future))
2542
2543        for host, future in futures:
2544            try:
2545                future.result()
2546            except Exception:
2547                log.exception("Error preparing query for host %s:", host)
2548
2549    def shutdown(self):
2550        """
        Close all connections.  ``Session`` instances should not be used
        for any purpose after being shut down.
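
        A ``Session`` may also be used as a context manager (see ``__enter__`` /
        ``__exit__`` below), in which case it is shut down automatically when the
        block exits. A minimal sketch, assuming ``cluster`` is an existing
        :class:`.Cluster`::

            >>> with cluster.connect() as session:
            ...     session.execute("SELECT release_version FROM system.local")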
2553        """
2554        with self._lock:
2555            if self.is_shutdown:
2556                return
2557            else:
2558                self.is_shutdown = True
2559
        # PYTHON-673. If shutdown was called shortly after session init, avoid
        # a race by cancelling any initial connection attempts that haven't
        # started, then blocking on any that have.
2563        for future in self._initial_connect_futures:
2564            future.cancel()
2565        wait_futures(self._initial_connect_futures)
2566
2567        for pool in tuple(self._pools.values()):
2568            pool.shutdown()
2569
2570    def __enter__(self):
2571        return self
2572
2573    def __exit__(self, *args):
2574        self.shutdown()
2575
2576    def __del__(self):
2577        try:
2578            # Ensure all connections are closed, in case the Session object is deleted by the GC
2579            self.shutdown()
2580        except:
2581            # Ignore all errors. Shutdown errors can be caught by the user
2582            # when cluster.shutdown() is called explicitly.
2583            pass
2584
2585    def add_or_renew_pool(self, host, is_host_addition):
2586        """
2587        For internal use only.
2588        """
2589        distance = self._profile_manager.distance(host)
2590        if distance == HostDistance.IGNORED:
2591            return None
2592
2593        def run_add_or_renew_pool():
2594            try:
2595                if self._protocol_version >= 3:
2596                    new_pool = HostConnection(host, distance, self)
2597                else:
                    # legacy protocol versions (v1/v2) use a pool of connections per host
2599                    new_pool = HostConnectionPool(host, distance, self)
2600            except AuthenticationFailed as auth_exc:
2601                conn_exc = ConnectionException(str(auth_exc), host=host)
2602                self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
2603                return False
2604            except Exception as conn_exc:
2605                log.warning("Failed to create connection pool for new host %s:",
2606                            host, exc_info=conn_exc)
2607                # the host itself will still be marked down, so we need to pass
2608                # a special flag to make sure the reconnector is created
2609                self.cluster.signal_connection_failure(
2610                    host, conn_exc, is_host_addition, expect_host_to_be_down=True)
2611                return False
2612
2613            previous = self._pools.get(host)
2614            with self._lock:
2615                while new_pool._keyspace != self.keyspace:
2616                    self._lock.release()
2617                    set_keyspace_event = Event()
2618                    errors_returned = []
2619
2620                    def callback(pool, errors):
2621                        errors_returned.extend(errors)
2622                        set_keyspace_event.set()
2623
2624                    new_pool._set_keyspace_for_all_conns(self.keyspace, callback)
2625                    set_keyspace_event.wait(self.cluster.connect_timeout)
2626                    if not set_keyspace_event.is_set() or errors_returned:
2627                        log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
2628                        self.cluster.on_down(host, is_host_addition)
2629                        new_pool.shutdown()
2630                        self._lock.acquire()
2631                        return False
2632                    self._lock.acquire()
2633                self._pools[host] = new_pool
2634
2635            log.debug("Added pool for host %s to session", host)
2636            if previous:
2637                previous.shutdown()
2638
2639            return True
2640
2641        return self.submit(run_add_or_renew_pool)
2642
2643    def remove_pool(self, host):
2644        pool = self._pools.pop(host, None)
2645        if pool:
2646            log.debug("Removed connection pool for %r", host)
2647            return self.submit(pool.shutdown)
2648        else:
2649            return None
2650
2651    def update_created_pools(self):
2652        """
        When the set of live nodes changes, the load balancer may change its
        mind about host distances. It might change the distance for the node
        that came or left, but also for other nodes (for instance, if a node
        dies, a previously ignored node may now be considered).
2657
2658        This method ensures that all hosts for which a pool should exist
2659        have one, and hosts that shouldn't don't.
2660
2661        For internal use only.
2662        """
2663        futures = set()
2664        for host in self.cluster.metadata.all_hosts():
2665            distance = self._profile_manager.distance(host)
2666            pool = self._pools.get(host)
2667            future = None
2668            if not pool or pool.is_shutdown:
2669                # we don't eagerly set is_up on previously ignored hosts. None is included here
2670                # to allow us to attempt connections to hosts that have gone from ignored to something
2671                # else.
2672                if distance != HostDistance.IGNORED and host.is_up in (True, None):
2673                    future = self.add_or_renew_pool(host, False)
2674            elif distance != pool.host_distance:
2675                # the distance has changed
2676                if distance == HostDistance.IGNORED:
2677                    future = self.remove_pool(host)
2678                else:
2679                    pool.host_distance = distance
2680            if future:
2681                futures.add(future)
2682        return futures
2683
2684    def on_down(self, host):
2685        """
2686        Called by the parent Cluster instance when a node is marked down.
2687        Only intended for internal use.
2688        """
2689        future = self.remove_pool(host)
2690        if future:
2691            future.add_done_callback(lambda f: self.update_created_pools())
2692
2693    def on_remove(self, host):
2694        """ Internal """
2695        self.on_down(host)
2696
2697    def set_keyspace(self, keyspace):
2698        """
2699        Set the default keyspace for all queries made through this Session.
2700        This operation blocks until complete.
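
        This is equivalent to issuing a ``USE`` statement, e.g.::

            >>> session.set_keyspace('mykeyspace')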
2701        """
2702        self.execute('USE %s' % (protect_name(keyspace),))
2703
2704    def _set_keyspace_for_all_pools(self, keyspace, callback):
2705        """
2706        Asynchronously sets the keyspace on all pools.  When all
2707        pools have set all of their connections, `callback` will be
2708        called with a dictionary of all errors that occurred, keyed
2709        by the `Host` that they occurred against.
2710        """
2711        with self._lock:
2712            self.keyspace = keyspace
2713            remaining_callbacks = set(self._pools.values())
2714        errors = {}
2715
2716        if not remaining_callbacks:
2717            callback(errors)
2718            return
2719
2720        def pool_finished_setting_keyspace(pool, host_errors):
2721            remaining_callbacks.remove(pool)
2722            if host_errors:
2723                errors[pool.host] = host_errors
2724
2725            if not remaining_callbacks:
                callback(errors)
2727
2728        for pool in tuple(self._pools.values()):
2729            pool._set_keyspace_for_all_conns(keyspace, pool_finished_setting_keyspace)
2730
2731    def user_type_registered(self, keyspace, user_type, klass):
2732        """
2733        Called by the parent Cluster instance when the user registers a new
2734        mapping from a user-defined type to a class.  Intended for internal
2735        use only.
2736        """
2737        try:
2738            ks_meta = self.cluster.metadata.keyspaces[keyspace]
2739        except KeyError:
2740            raise UserTypeDoesNotExist(
2741                'Keyspace %s does not exist or has not been discovered by the driver' % (keyspace,))
2742
2743        try:
2744            type_meta = ks_meta.user_types[user_type]
2745        except KeyError:
2746            raise UserTypeDoesNotExist(
2747                'User type %s does not exist in keyspace %s' % (user_type, keyspace))
2748
2749        field_names = type_meta.field_names
2750        if six.PY2:
2751            # go from unicode to string to avoid decode errors from implicit
2752            # decode when formatting non-ascii values
2753            field_names = [fn.encode('utf-8') for fn in field_names]
2754
2755        def encode(val):
2756            return '{ %s }' % ' , '.join('%s : %s' % (
2757                field_name,
2758                self.encoder.cql_encode_all_types(getattr(val, field_name, None))
2759            ) for field_name in field_names)
2760
2761        self.encoder.mapping[klass] = encode
2762
2763    def submit(self, fn, *args, **kwargs):
2764        """ Internal """
2765        if not self.is_shutdown:
2766            return self.cluster.executor.submit(fn, *args, **kwargs)
2767
2768    def get_pool_state(self):
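        # snapshot of per-host connection pool state, keyed by Host (useful for diagnostics)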
2769        return dict((host, pool.get_state()) for host, pool in tuple(self._pools.items()))
2770
2771    def get_pools(self):
2772        return self._pools.values()
2773
2774    def _validate_set_legacy_config(self, attr_name, value):
2775        if self.cluster._config_mode == _ConfigMode.PROFILES:
2776            raise ValueError("Cannot set Session.%s while using Configuration Profiles. Set this in a profile instead." % (attr_name,))
2777        setattr(self, '_' + attr_name, value)
2778        self.cluster._config_mode = _ConfigMode.LEGACY
2779
2780
2781class UserTypeDoesNotExist(Exception):
2782    """
2783    An attempt was made to use a user-defined type that does not exist.
2784
2785    .. versionadded:: 2.1.0
2786    """
2787    pass
2788
2789
2790class _ControlReconnectionHandler(_ReconnectionHandler):
2791    """
2792    Internal
2793    """
2794
2795    def __init__(self, control_connection, *args, **kwargs):
2796        _ReconnectionHandler.__init__(self, *args, **kwargs)
2797        self.control_connection = weakref.proxy(control_connection)
2798
2799    def try_reconnect(self):
2800        return self.control_connection._reconnect_internal()
2801
2802    def on_reconnection(self, connection):
2803        self.control_connection._set_new_connection(connection)
2804
2805    def on_exception(self, exc, next_delay):
        # stop reconnecting on auth failures; otherwise just log and keep trying
2807        if isinstance(exc, AuthenticationFailed):
2808            return False
2809        else:
2810            log.debug("Error trying to reconnect control connection: %r", exc)
2811            return True
2812
2813
2814def _watch_callback(obj_weakref, method_name, *args, **kwargs):
2815    """
2816    A callback handler for the ControlConnection that tolerates
2817    weak references.
2818    """
2819    obj = obj_weakref()
2820    if obj is None:
2821        return
2822    getattr(obj, method_name)(*args, **kwargs)
2823
2824
2825def _clear_watcher(conn, expiring_weakref):
2826    """
2827    Called when the ControlConnection object is about to be finalized.
2828    This clears watchers on the underlying Connection object.
2829    """
2830    try:
2831        conn.control_conn_disposed()
2832    except ReferenceError:
2833        pass
2834
2835
2836class ControlConnection(object):
2837    """
2838    Internal
2839    """
2840
2841    _SELECT_PEERS = "SELECT * FROM system.peers"
2842    _SELECT_PEERS_NO_TOKENS = "SELECT host_id, peer, data_center, rack, rpc_address, release_version, schema_version FROM system.peers"
2843    _SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'"
2844    _SELECT_LOCAL_NO_TOKENS = "SELECT host_id, cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2845    # Used only when token_metadata_enabled is set to False
2846    _SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS = "SELECT rpc_address FROM system.local WHERE key='local'"
2847
2848    _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers"
2849    _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
2850
2851    _is_shutdown = False
2852    _timeout = None
2853    _protocol_version = None
2854
2855    _schema_event_refresh_window = None
2856    _topology_event_refresh_window = None
2857    _status_event_refresh_window = None
2858
2859    _schema_meta_enabled = True
2860    _token_meta_enabled = True
2861
2862    # for testing purposes
2863    _time = time
2864
2865    def __init__(self, cluster, timeout,
2866                 schema_event_refresh_window,
2867                 topology_event_refresh_window,
2868                 status_event_refresh_window,
2869                 schema_meta_enabled=True,
2870                 token_meta_enabled=True):
2871        # use a weak reference to allow the Cluster instance to be GC'ed (and
2872        # shutdown) since implementing __del__ disables the cycle detector
2873        self._cluster = weakref.proxy(cluster)
2874        self._connection = None
2875        self._timeout = timeout
2876
2877        self._schema_event_refresh_window = schema_event_refresh_window
2878        self._topology_event_refresh_window = topology_event_refresh_window
2879        self._status_event_refresh_window = status_event_refresh_window
2880        self._schema_meta_enabled = schema_meta_enabled
2881        self._token_meta_enabled = token_meta_enabled
2882
2883        self._lock = RLock()
2884        self._schema_agreement_lock = Lock()
2885
2886        self._reconnection_handler = None
2887        self._reconnection_lock = RLock()
2888
2889        self._event_schedule_times = {}
2890
2891    def connect(self):
2892        if self._is_shutdown:
2893            return
2894
2895        self._protocol_version = self._cluster.protocol_version
2896        self._set_new_connection(self._reconnect_internal())
2897
2898    def _set_new_connection(self, conn):
2899        """
        Replace the existing connection (if there is one) and close the old one.
2901        """
2902        with self._lock:
2903            old = self._connection
2904            self._connection = conn
2905
2906        if old:
2907            log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
2908            old.close()
2909
2910    def _reconnect_internal(self):
2911        """
2912        Tries to connect to each host in the query plan until one succeeds
2913        or every attempt fails. If successful, a new Connection will be
2914        returned.  Otherwise, :exc:`NoHostAvailable` will be raised
2915        with an "errors" arg that is a dict mapping host addresses
2916        to the exception that was raised when an attempt was made to open
2917        a connection to that host.
2918        """
2919        errors = {}
2920        lbp = (
2921            self._cluster.load_balancing_policy
2922            if self._cluster._config_mode == _ConfigMode.LEGACY else
2923            self._cluster._default_load_balancing_policy
2924        )
2925
2926        for host in lbp.make_query_plan():
2927            try:
2928                return self._try_connect(host)
2929            except ConnectionException as exc:
2930                errors[str(host.endpoint)] = exc
2931                log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
2932                self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
2933            except Exception as exc:
2934                errors[str(host.endpoint)] = exc
2935                log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
2936            if self._is_shutdown:
2937                raise DriverException("[control connection] Reconnection in progress during shutdown")
2938
2939        raise NoHostAvailable("Unable to connect to any servers", errors)
2940
2941    def _try_connect(self, host):
2942        """
2943        Creates a new Connection, registers for pushed events, and refreshes
2944        node/token and schema metadata.
2945        """
2946        log.debug("[control connection] Opening new connection to %s", host)
2947
2948        while True:
2949            try:
2950                connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
2951                if self._is_shutdown:
2952                    connection.close()
2953                    raise DriverException("Reconnecting during shutdown")
2954                break
2955            except ProtocolVersionUnsupported as e:
2956                self._cluster.protocol_downgrade(host.endpoint, e.startup_version)
2957
2958        log.debug("[control connection] Established new connection %r, "
2959                  "registering watchers and refreshing schema and topology",
2960                  connection)
2961
2962        # use weak references in both directions
2963        # _clear_watcher will be called when this ControlConnection is about to be finalized
2964        # _watch_callback will get the actual callback from the Connection and relay it to
        # this object (after dereferencing a weakref)
2966        self_weakref = weakref.ref(self, partial(_clear_watcher, weakref.proxy(connection)))
2967        try:
2968            connection.register_watchers({
2969                "TOPOLOGY_CHANGE": partial(_watch_callback, self_weakref, '_handle_topology_change'),
2970                "STATUS_CHANGE": partial(_watch_callback, self_weakref, '_handle_status_change'),
2971                "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
2972            }, register_timeout=self._timeout)
2973
2974            sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS
2975            sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
2976            peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
2977            local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
2978            shared_results = connection.wait_for_responses(
2979                peers_query, local_query, timeout=self._timeout)
2980
2981            self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
2982            self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
2983        except Exception:
2984            connection.close()
2985            raise
2986
2987        return connection
2988
2989    def reconnect(self):
2990        if self._is_shutdown:
2991            return
2992
2993        self._submit(self._reconnect)
2994
2995    def _reconnect(self):
2996        log.debug("[control connection] Attempting to reconnect")
2997        try:
2998            self._set_new_connection(self._reconnect_internal())
2999        except NoHostAvailable:
3000            # make a retry schedule (which includes backoff)
3001            schedule = self._cluster.reconnection_policy.new_schedule()
3002
3003            with self._reconnection_lock:
3004
3005                # cancel existing reconnection attempts
3006                if self._reconnection_handler:
3007                    self._reconnection_handler.cancel()
3008
3009                # when a connection is successfully made, _set_new_connection
3010                # will be called with the new connection and then our
3011                # _reconnection_handler will be cleared out
3012                self._reconnection_handler = _ControlReconnectionHandler(
3013                    self, self._cluster.scheduler, schedule,
3014                    self._get_and_set_reconnection_handler,
3015                    new_handler=None)
3016                self._reconnection_handler.start()
3017        except Exception:
3018            log.debug("[control connection] error reconnecting", exc_info=True)
3019            raise
3020
3021    def _get_and_set_reconnection_handler(self, new_handler):
3022        """
3023        Called by the _ControlReconnectionHandler when a new connection
3024        is successfully created.  Clears out the _reconnection_handler on
3025        this ControlConnection.
3026        """
3027        with self._reconnection_lock:
3028            old = self._reconnection_handler
3029            self._reconnection_handler = new_handler
3030            return old
3031
3032    def _submit(self, *args, **kwargs):
3033        try:
3034            if not self._cluster.is_shutdown:
3035                return self._cluster.executor.submit(*args, **kwargs)
3036        except ReferenceError:
3037            pass
3038        return None
3039
3040    def shutdown(self):
3041        # stop trying to reconnect (if we are)
3042        with self._reconnection_lock:
3043            if self._reconnection_handler:
3044                self._reconnection_handler.cancel()
3045
3046        with self._lock:
3047            if self._is_shutdown:
3048                return
3049            else:
3050                self._is_shutdown = True
3051
3052            log.debug("Shutting down control connection")
3053            if self._connection:
3054                self._connection.close()
3055                self._connection = None
3056
3057    def refresh_schema(self, force=False, **kwargs):
3058        try:
3059            if self._connection:
3060                return self._refresh_schema(self._connection, force=force, **kwargs)
3061        except ReferenceError:
3062            pass  # our weak reference to the Cluster is no good
3063        except Exception:
3064            log.debug("[control connection] Error refreshing schema", exc_info=True)
3065            self._signal_error()
3066        return False
3067
3068    def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs):
3069        if self._cluster.is_shutdown:
3070            return False
3071
3072        agreed = self.wait_for_schema_agreement(connection,
3073                                                preloaded_results=preloaded_results,
3074                                                wait_time=schema_agreement_wait)
3075
3076        if not self._schema_meta_enabled and not force:
3077            log.debug("[control connection] Skipping schema refresh because schema metadata is disabled")
3078            return False
3079
3080        if not agreed:
3081            log.debug("Skipping schema refresh due to lack of schema agreement")
3082            return False
3083
3084        self._cluster.metadata.refresh(connection, self._timeout, **kwargs)
3085
3086        return True
3087
3088    def refresh_node_list_and_token_map(self, force_token_rebuild=False):
3089        try:
3090            if self._connection:
3091                self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
3092                return True
3093        except ReferenceError:
3094            pass  # our weak reference to the Cluster is no good
3095        except Exception:
3096            log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
3097            self._signal_error()
3098        return False
3099
3100    def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3101                                         force_token_rebuild=False):
3102
3103        if preloaded_results:
3104            log.debug("[control connection] Refreshing node list and token map using preloaded results")
3105            peers_result = preloaded_results[0]
3106            local_result = preloaded_results[1]
3107        else:
3108            cl = ConsistencyLevel.ONE
3109            if not self._token_meta_enabled:
3110                log.debug("[control connection] Refreshing node list without token map")
3111                sel_peers = self._SELECT_PEERS_NO_TOKENS
3112                sel_local = self._SELECT_LOCAL_NO_TOKENS
3113            else:
3114                log.debug("[control connection] Refreshing node list and token map")
3115                sel_peers = self._SELECT_PEERS
3116                sel_local = self._SELECT_LOCAL
3117            peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
3118            local_query = QueryMessage(query=sel_local, consistency_level=cl)
3119            peers_result, local_result = connection.wait_for_responses(
3120                peers_query, local_query, timeout=self._timeout)
3121
3122        peers_result = dict_factory(*peers_result.results)
3123
3124        partitioner = None
3125        token_map = {}
3126
3127        found_hosts = set()
3128        if local_result.results:
3129            found_hosts.add(connection.endpoint)
3130            local_rows = dict_factory(*(local_result.results))
3131            local_row = local_rows[0]
3132            cluster_name = local_row["cluster_name"]
3133            self._cluster.metadata.cluster_name = cluster_name
3134
3135            partitioner = local_row.get("partitioner")
3136            tokens = local_row.get("tokens")
3137
3138            host = self._cluster.metadata.get_host(connection.endpoint)
3139            if host:
3140                datacenter = local_row.get("data_center")
3141                rack = local_row.get("rack")
3142                self._update_location_info(host, datacenter, rack)
3143                host.host_id = local_row.get("host_id")
3144                host.listen_address = local_row.get("listen_address")
3145                host.broadcast_address = local_row.get("broadcast_address")
3146
3147                host.broadcast_rpc_address = self._address_from_row(local_row)
3148                if host.broadcast_rpc_address is None:
3149                    if self._token_meta_enabled:
3150                        # local rpc_address is not available, use the connection endpoint
3151                        host.broadcast_rpc_address = connection.endpoint.address
3152                    else:
3153                        # local rpc_address has not been queried yet, try to fetch it
3154                        # separately, which might fail because C* < 2.1.6 doesn't have rpc_address
3155                        # in system.local. See CASSANDRA-9436.
3156                        local_rpc_address_query = QueryMessage(query=self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS,
3157                                                               consistency_level=ConsistencyLevel.ONE)
3158                        success, local_rpc_address_result = connection.wait_for_response(
3159                            local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
3160                        if success:
3161                            row = dict_factory(*local_rpc_address_result.results)
3162                            host.broadcast_rpc_address = row[0]['rpc_address']
3163                        else:
3164                            host.broadcast_rpc_address = connection.endpoint.address
3165
3166                host.release_version = local_row.get("release_version")
3167                host.dse_version = local_row.get("dse_version")
3168                host.dse_workload = local_row.get("workload")
3169
3170                if partitioner and tokens:
3171                    token_map[host] = tokens
3172
3173        # Check metadata.partitioner to see if we haven't built anything yet. If
3174        # every node in the cluster was in the contact points, we won't discover
3175        # any new nodes, so we need this additional check.  (See PYTHON-90)
3176        should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None
3177        for row in peers_result:
3178            endpoint = self._cluster.endpoint_factory.create(row)
3179
3180            tokens = row.get("tokens", None)
3181            if 'tokens' in row and not tokens:  # it was selected, but empty
                log.warning("Excluding host (%s) with no tokens in system.peers table of %s.", endpoint, connection.endpoint)
3183                continue
3184            if endpoint in found_hosts:
3185                log.warning("Found multiple hosts with the same endpoint (%s). Excluding peer %s", endpoint, row.get("peer"))
3186                continue
3187
3188            found_hosts.add(endpoint)
3189
3190            host = self._cluster.metadata.get_host(endpoint)
3191            datacenter = row.get("data_center")
3192            rack = row.get("rack")
3193            if host is None:
3194                log.debug("[control connection] Found new host to connect to: %s", endpoint)
3195                host, _ = self._cluster.add_host(endpoint, datacenter, rack, signal=True, refresh_nodes=False)
3196                should_rebuild_token_map = True
3197            else:
3198                should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)
3199
3200            host.host_id = row.get("host_id")
3201            host.broadcast_address = row.get("peer")
3202            host.broadcast_rpc_address = self._address_from_row(row)
3203            host.release_version = row.get("release_version")
3204            host.dse_version = row.get("dse_version")
3205            host.dse_workload = row.get("workload")
3206
3207            if partitioner and tokens:
3208                token_map[host] = tokens
3209
3210        for old_host in self._cluster.metadata.all_hosts():
            if old_host.endpoint != connection.endpoint and old_host.endpoint not in found_hosts:
3212                should_rebuild_token_map = True
3213                log.debug("[control connection] Removing host not found in peers metadata: %r", old_host)
3214                self._cluster.remove_host(old_host)
3215
3216        log.debug("[control connection] Finished fetching ring info")
3217        if partitioner and should_rebuild_token_map:
3218            log.debug("[control connection] Rebuilding token map due to topology changes")
3219            self._cluster.metadata.rebuild_token_map(partitioner, token_map)
3220
3221    def _update_location_info(self, host, datacenter, rack):
3222        if host.datacenter == datacenter and host.rack == rack:
3223            return False
3224
3225        # If the dc/rack information changes, we need to update the load balancing policy.
3226        # For that, we remove and re-add the node against the policy. Not the most elegant, and assumes
3227        # that the policy will update correctly, but in practice this should work.
3228        self._cluster.profile_manager.on_down(host)
3229        host.set_location_info(datacenter, rack)
3230        self._cluster.profile_manager.on_up(host)
3231        return True
3232
3233    def _delay_for_event_type(self, event_type, delay_window):
        # this serves to order processing of correlated events (received within the window);
        # the window and randomization still have the desired effect of skewing refreshes across client instances
3236        next_time = self._event_schedule_times.get(event_type, 0)
3237        now = self._time.time()
3238        if now <= next_time:
3239            this_time = next_time + 0.01
3240            delay = this_time - now
3241        else:
3242            delay = random() * delay_window
3243            this_time = now + delay
3244        self._event_schedule_times[event_type] = this_time
3245        return delay
3246
3247    def _refresh_nodes_if_not_up(self, host):
3248        """
3249        Used to mitigate refreshes for nodes that are already known.
3250        Some versions of the server send superfluous NEW_NODE messages in addition to UP events.
3251        """
3252        if not host or not host.is_up:
3253            self.refresh_node_list_and_token_map()
3254
3255    def _handle_topology_change(self, event):
3256        change_type = event["change_type"]
3257        host = self._cluster.metadata.get_host(event["address"][0])
3258        if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
3259            if self._topology_event_refresh_window >= 0:
3260                delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
3261                self._cluster.scheduler.schedule_unique(delay, self._refresh_nodes_if_not_up, host)
3262        elif change_type == "REMOVED_NODE":
3263            self._cluster.scheduler.schedule_unique(0, self._cluster.remove_host, host)
3264
3265    def _handle_status_change(self, event):
3266        change_type = event["change_type"]
3267        host = self._cluster.metadata.get_host(event["address"][0])
3268        if change_type == "UP":
3269            delay = self._delay_for_event_type('status_change', self._status_event_refresh_window)
3270            if host is None:
3271                # this is the first time we've seen the node
3272                self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map)
3273            else:
3274                self._cluster.scheduler.schedule_unique(delay, self._cluster.on_up, host)
3275        elif change_type == "DOWN":
            # Note that there is a slight risk we can receive the event late and thus
            # mark the host down even though we had already reconnected successfully.
            # But it is unlikely, and doesn't have much consequence since we'll try reconnecting
            # right away, so we favor the detection to make Host.is_up more accurate.
3280            if host is not None:
3281                # this will be run by the scheduler
3282                self._cluster.on_down(host, is_host_addition=False)
3283
3284    def _handle_schema_change(self, event):
3285        if self._schema_event_refresh_window < 0:
3286            return
3287        delay = self._delay_for_event_type('schema_change', self._schema_event_refresh_window)
3288        self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, **event)
3289
3290    def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
3291
3292        total_timeout = wait_time if wait_time is not None else self._cluster.max_schema_agreement_wait
3293        if total_timeout <= 0:
3294            return True
3295
3296        # Each schema change typically generates two schema refreshes, one
3297        # from the response type and one from the pushed notification. Holding
3298        # a lock is just a simple way to cut down on the number of schema queries
3299        # we'll make.
3300        with self._schema_agreement_lock:
3301            if self._is_shutdown:
3302                return
3303
3304            if not connection:
3305                connection = self._connection
3306
3307            if preloaded_results:
3308                log.debug("[control connection] Attempting to use preloaded results for schema agreement")
3309
3310                peers_result = preloaded_results[0]
3311                local_result = preloaded_results[1]
3312                schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
3313                if schema_mismatches is None:
3314                    return True
3315
3316            log.debug("[control connection] Waiting for schema agreement")
3317            start = self._time.time()
3318            elapsed = 0
3319            cl = ConsistencyLevel.ONE
3320            schema_mismatches = None
3321            while elapsed < total_timeout:
3322                peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl)
3323                local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
3324                try:
3325                    timeout = min(self._timeout, total_timeout - elapsed)
3326                    peers_result, local_result = connection.wait_for_responses(
3327                        peers_query, local_query, timeout=timeout)
3328                except OperationTimedOut as timeout:
3329                    log.debug("[control connection] Timed out waiting for "
3330                              "response during schema agreement check: %s", timeout)
3331                    elapsed = self._time.time() - start
3332                    continue
3333                except ConnectionShutdown:
3334                    if self._is_shutdown:
3335                        log.debug("[control connection] Aborting wait for schema match due to shutdown")
3336                        return None
3337                    else:
3338                        raise
3339
3340                schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
3341                if schema_mismatches is None:
3342                    return True
3343
3344                log.debug("[control connection] Schemas mismatched, trying again")
3345                self._time.sleep(0.2)
3346                elapsed = self._time.time() - start
3347
3348            log.warning("Node %s is reporting a schema disagreement: %s",
3349                        connection.endpoint, schema_mismatches)
3350            return False
3351
3352    def _get_schema_mismatches(self, peers_result, local_result, local_address):
3353        peers_result = dict_factory(*peers_result.results)
3354
3355        versions = defaultdict(set)
3356        if local_result.results:
3357            local_row = dict_factory(*local_result.results)[0]
3358            if local_row.get("schema_version"):
3359                versions[local_row.get("schema_version")].add(local_address)
3360
3361        for row in peers_result:
3362            schema_ver = row.get('schema_version')
3363            if not schema_ver:
3364                continue
3365            endpoint = self._cluster.endpoint_factory.create(row)
3366            peer = self._cluster.metadata.get_host(endpoint)
3367            if peer and peer.is_up is not False:
3368                versions[schema_ver].add(endpoint)
3369
3370        if len(versions) == 1:
3371            log.debug("[control connection] Schemas match")
3372            return None
3373
3374        return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
3375
3376    def _address_from_row(self, row):
3377        """
3378        Parse the broadcast rpc address from a row and return it untranslated.
3379        """
3380        addr = None
3381        if "rpc_address" in row:
3382            addr = row.get("rpc_address")  # peers and local
3383        if "native_transport_address" in row:
3384            addr = row.get("native_transport_address")
3385        if not addr or addr in ["0.0.0.0", "::"]:
3386            addr = row.get("peer")
3387
3388        return addr
3389
3390    def _signal_error(self):
3391        with self._lock:
3392            if self._is_shutdown:
3393                return
3394
3395            # try just signaling the cluster, as this will trigger a reconnect
3396            # as part of marking the host down
3397            if self._connection and self._connection.is_defunct:
3398                host = self._cluster.metadata.get_host(self._connection.endpoint)
3399                # host may be None if it's already been removed, but that indicates
3400                # that errors have already been reported, so we're fine
3401                if host:
3402                    self._cluster.signal_connection_failure(
3403                        host, self._connection.last_error, is_host_addition=False)
3404                    return
3405
3406        # if the connection is not defunct or the host already left, reconnect
3407        # manually
3408        self.reconnect()
3409
3410    def on_up(self, host):
3411        pass
3412
3413    def on_down(self, host):
3414
3415        conn = self._connection
3416        if conn and conn.endpoint == host.endpoint and \
3417                self._reconnection_handler is None:
3418            log.debug("[control connection] Control connection host (%s) is "
3419                      "considered down, starting reconnection", host)
3420            # this will result in a task being submitted to the executor to reconnect
3421            self.reconnect()
3422
3423    def on_add(self, host, refresh_nodes=True):
3424        if refresh_nodes:
3425            self.refresh_node_list_and_token_map(force_token_rebuild=True)
3426
3427    def on_remove(self, host):
3428        c = self._connection
3429        if c and c.endpoint == host.endpoint:
3430            log.debug("[control connection] Control connection host (%s) is being removed. Reconnecting", host)
3431            # refresh will be done on reconnect
3432            self.reconnect()
3433        else:
3434            self.refresh_node_list_and_token_map(force_token_rebuild=True)
3435
3436    def get_connections(self):
3437        c = getattr(self, '_connection', None)
3438        return [c] if c else []
3439
3440    def return_connection(self, connection):
3441        if connection is self._connection and (connection.is_defunct or connection.is_closed):
3442            self.reconnect()
3443
3444
3445def _stop_scheduler(scheduler, thread):
3446    try:
3447        if not scheduler.is_shutdown:
3448            scheduler.shutdown()
3449    except ReferenceError:
3450        pass
3451
3452    thread.join()
3453
3454
3455class _Scheduler(Thread):
3456
3457    _queue = None
3458    _scheduled_tasks = None
3459    _executor = None
3460    is_shutdown = False
3461
3462    def __init__(self, executor):
3463        self._queue = Queue.PriorityQueue()
3464        self._scheduled_tasks = set()
3465        self._count = count()
3466        self._executor = executor
3467
3468        Thread.__init__(self, name="Task Scheduler")
3469        self.daemon = True
3470        self.start()
3471
3472    def shutdown(self):
3473        try:
3474            log.debug("Shutting down Cluster Scheduler")
3475        except AttributeError:
3476            # this can happen on interpreter shutdown
3477            pass
3478        self.is_shutdown = True
3479        self._queue.put_nowait((0, 0, None))
3480        self.join()
3481
3482    def schedule(self, delay, fn, *args, **kwargs):
3483        self._insert_task(delay, (fn, args, tuple(kwargs.items())))
3484
3485    def schedule_unique(self, delay, fn, *args, **kwargs):
3486        task = (fn, args, tuple(kwargs.items()))
3487        if task not in self._scheduled_tasks:
3488            self._insert_task(delay, task)
3489        else:
3490            log.debug("Ignoring schedule_unique for already-scheduled task: %r", task)
3491
3492    def _insert_task(self, delay, task):
3493        if not self.is_shutdown:
3494            run_at = time.time() + delay
3495            self._scheduled_tasks.add(task)
3496            self._queue.put_nowait((run_at, next(self._count), task))
3497        else:
3498            log.debug("Ignoring scheduled task after shutdown: %r", task)
3499
3500    def run(self):
3501        while True:
3502            if self.is_shutdown:
3503                return
3504
3505            try:
3506                while True:
3507                    run_at, i, task = self._queue.get(block=True, timeout=None)
3508                    if self.is_shutdown:
3509                        if task:
3510                            log.debug("Not executing scheduled task due to Scheduler shutdown")
3511                        return
3512                    if run_at <= time.time():
3513                        self._scheduled_tasks.discard(task)
3514                        fn, args, kwargs = task
3515                        kwargs = dict(kwargs)
3516                        future = self._executor.submit(fn, *args, **kwargs)
3517                        future.add_done_callback(self._log_if_failed)
3518                    else:
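                        # not due yet; put the task back and poll again after a short sleep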
3519                        self._queue.put_nowait((run_at, i, task))
3520                        break
3521            except Queue.Empty:
3522                pass
3523
3524            time.sleep(0.1)
3525
3526    def _log_if_failed(self, future):
3527        exc = future.exception()
3528        if exc:
3529            log.warning(
                "An internally scheduled task failed with an unhandled exception:",
3531                exc_info=exc)
3532
3533
3534def refresh_schema_and_set_result(control_conn, response_future, connection, **kwargs):
3535    try:
3536        log.debug("Refreshing schema in response to schema change. "
3537                  "%s", kwargs)
3538        response_future.is_schema_agreed = control_conn._refresh_schema(connection, **kwargs)
3539    except Exception:
3540        log.exception("Exception refreshing schema in response to schema change:")
3541        response_future.session.submit(control_conn.refresh_schema, **kwargs)
3542    finally:
3543        response_future._set_final_result(None)
3544
3545
3546class ResponseFuture(object):
3547    """
3548    An asynchronous response delivery mechanism that is returned from calls
3549    to :meth:`.Session.execute_async()`.
3550
3551    There are two ways for results to be delivered:
3552     - Synchronously, by calling :meth:`.result()`
3553     - Asynchronously, by attaching callback and errback functions via
3554       :meth:`.add_callback()`, :meth:`.add_errback()`, and
3555       :meth:`.add_callbacks()`.
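
    A minimal usage sketch (``handle_rows`` and ``handle_error`` are hypothetical
    user functions)::

        >>> future = session.execute_async("SELECT release_version FROM system.local")
        >>> future.add_callbacks(handle_rows, handle_error)   # asynchronous delivery
        >>> rows = future.result()                            # or block for the result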
3556    """
3557
3558    query = None
3559    """
3560    The :class:`~.Statement` instance that is being executed through this
3561    :class:`.ResponseFuture`.
3562    """
3563
3564    is_schema_agreed = True
3565    """
    For DDL requests, this may be set to ``False`` if the schema agreement poll after the response fails.
3567
3568    Always ``True`` for non-DDL requests.
3569    """
3570
3571    request_encoded_size = None
3572    """
3573    Size of the request message sent
3574    """
3575
3576    coordinator_host = None
3577    """
    The host from which we received a response
3579    """
3580
3581    attempted_hosts = None
3582    """
3583    A list of hosts tried, including all speculative executions, retries, and pages
3584    """
3585
3586    session = None
3587    row_factory = None
3588    message = None
3589    default_timeout = None
3590
3591    _retry_policy = None
3592    _profile_manager = None
3593
3594    _req_id = None
3595    _final_result = _NOT_SET
3596    _col_names = None
3597    _col_types = None
3598    _final_exception = None
3599    _query_traces = None
3600    _callbacks = None
3601    _errbacks = None
3602    _current_host = None
3603    _connection = None
3604    _query_retries = 0
3605    _start_time = None
3606    _metrics = None
3607    _paging_state = None
3608    _custom_payload = None
3609    _warnings = None
3610    _timer = None
3611    _protocol_handler = ProtocolHandler
3612    _spec_execution_plan = NoSpeculativeExecutionPlan()
3613    _host = None
3614
3615    _warned_timeout = False
3616
3617    def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None,
3618                 retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None,
3619                 speculative_execution_plan=None, host=None):
3620        self.session = session
3621        # TODO: normalize handling of retry policy and row factory
3622        self.row_factory = row_factory or session.row_factory
3623        self._load_balancer = load_balancer or session.cluster._default_load_balancing_policy
3624        self.message = message
3625        self.query = query
3626        self.timeout = timeout
3627        self._retry_policy = retry_policy
3628        self._metrics = metrics
3629        self.prepared_statement = prepared_statement
3630        self._callback_lock = Lock()
3631        self._start_time = start_time or time.time()
3632        self._host = host
3633        self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
3634        self._make_query_plan()
3635        self._event = Event()
3636        self._errors = {}
3637        self._callbacks = []
3638        self._errbacks = []
3639        self.attempted_hosts = []
3640        self._start_timer()
3641
3642    @property
3643    def _time_remaining(self):
3644        if self.timeout is None:
3645            return None
3646        return (self._start_time + self.timeout) - time.time()
3647
3648    def _start_timer(self):
3649        if self._timer is None:
3650            spec_delay = self._spec_execution_plan.next_execution(self._current_host)
3651            if spec_delay >= 0:
3652                if self._time_remaining is None or self._time_remaining > spec_delay:
3653                    self._timer = self.session.cluster.connection_class.create_timer(spec_delay, self._on_speculative_execute)
3654                    return
3655            if self._time_remaining is not None:
3656                self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout)
3657
3658    def _cancel_timer(self):
3659        if self._timer:
3660            self._timer.cancel()
3661
3662    def _on_timeout(self, _attempts=0):
3663        """
3664        Called when the request associated with this ResponseFuture times out.
3665
        This function may reschedule itself. The ``_attempts`` parameter tracks
        the number of times this has happened. This parameter should only be
        set in cases where ``_on_timeout`` reschedules itself.
3669        """
3670        # PYTHON-853: for short timeouts, we sometimes race with our __init__
3671        if self._connection is None and _attempts < 3:
3672            self._timer = self.session.cluster.connection_class.create_timer(
3673                0.01,
3674                partial(self._on_timeout, _attempts=_attempts + 1)
3675            )
3676            return
3677
3678        if self._connection is not None:
3679            try:
3680                self._connection._requests.pop(self._req_id)
            # Removing the pending request prevents a race with the event loop
            # thread: if the awaited response arrives after this, it will be ignored.
            # A KeyError means the response has already been received and handled.
3684            except KeyError:
3685                return
3686
3687            pool = self.session._pools.get(self._current_host)
3688            if pool and not pool.is_shutdown:
3689                with self._connection.lock:
3690                    self._connection.request_ids.append(self._req_id)
3691
3692                pool.return_connection(self._connection)
3693
3694        errors = self._errors
3695        if not errors:
3696            if self.is_schema_agreed:
3697                key = str(self._current_host.endpoint) if self._current_host else 'no host queried before timeout'
3698                errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
3699            else:
3700                connection = self.session.cluster.control_connection._connection
3701                host = str(connection.endpoint) if connection else 'unknown'
3702                errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}
3703
3704        self._set_final_exception(OperationTimedOut(errors, self._current_host))
3705
3706    def _on_speculative_execute(self):
3707        self._timer = None
3708        if not self._event.is_set():
3709
            # PYTHON-836: speculative queries must not start until after
            # the query has been sent from the main thread; otherwise the
            # query from the main thread may raise NoHostAvailable
            # if the query_plan has been exhausted by the speculative queries.
            # This also prevents a race condition accessing the iterator.
            # We reschedule this call until the main thread has succeeded
            # in making a query.
3717            if not self.attempted_hosts:
3718                self._timer = self.session.cluster.connection_class.create_timer(0.01, self._on_speculative_execute)
3719                return
3720
3721            if self._time_remaining is not None:
3722                if self._time_remaining <= 0:
3723                    self._on_timeout()
3724                    return
3725            self.send_request(error_no_hosts=False)
3726            self._start_timer()
3727
3728    def _make_query_plan(self):
3729        # set the query_plan according to the load balancing policy,
3730        # or to the explicit host target if set
3731        if self._host:
3732            # returning a single value effectively disables retries
3733            self.query_plan = [self._host]
3734        else:
3735            # convert the list/generator/etc to an iterator so that subsequent
3736            # calls to send_request (which retries may do) will resume where
3737            # they last left off
3738            self.query_plan = iter(self._load_balancer.make_query_plan(self.session.keyspace, self.query))
3739
3740    def send_request(self, error_no_hosts=True):
3741        """ Internal """
3742        # query_plan is an iterator, so this will resume where we last left
3743        # off if send_request() is called multiple times
3744        for host in self.query_plan:
3745            req_id = self._query(host)
3746            if req_id is not None:
3747                self._req_id = req_id
3748                return True
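            # this host attempt failed; if the client-side timeout has already
            # elapsed, give up now rather than trying every remaining host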
3749            if self.timeout is not None and time.time() - self._start_time > self.timeout:
3750                self._on_timeout()
3751                return True
3752
3753        if error_no_hosts:
3754            self._set_final_exception(NoHostAvailable(
3755                "Unable to complete the operation against any hosts", self._errors))
3756        return False
3757
3758    def _query(self, host, message=None, cb=None):
3759        if message is None:
3760            message = self.message
3761
3762        pool = self.session._pools.get(host)
3763        if not pool:
3764            self._errors[host] = ConnectionException("Host has been marked down or removed")
3765            return None
3766        elif pool.is_shutdown:
3767            self._errors[host] = ConnectionException("Pool is shutdown")
3768            return None
3769
3770        self._current_host = host
3771
3772        connection = None
3773        try:
3774            # TODO get connectTimeout from cluster settings
3775            connection, request_id = pool.borrow_connection(timeout=2.0)
3776            self._connection = connection
3777            result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
3778
3779            if cb is None:
3780                cb = partial(self._set_result, host, connection, pool)
3781
3782            self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
3783                                                            encoder=self._protocol_handler.encode_message,
3784                                                            decoder=self._protocol_handler.decode_message,
3785                                                            result_metadata=result_meta)
3786            self.attempted_hosts.append(host)
3787            return request_id
3788        except NoConnectionsAvailable as exc:
3789            log.debug("All connections for host %s are at capacity, moving to the next host", host)
3790            self._errors[host] = exc
3791            return None
3792        except Exception as exc:
3793            log.debug("Error querying host %s", host, exc_info=True)
3794            self._errors[host] = exc
3795            if self._metrics is not None:
3796                self._metrics.on_connection_error()
3797            if connection:
3798                pool.return_connection(connection)
3799            return None
3800
3801    @property
3802    def has_more_pages(self):
3803        """
3804        Returns :const:`True` if there are more pages left in the
3805        query results, :const:`False` otherwise.  This should only
3806        be checked after the first page has been returned.
3807
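        A minimal check, assuming ``session`` is a connected :class:`.Session`
        and an illustrative ``users`` table::

            >>> future = session.execute_async("SELECT * FROM users")
            >>> rows = future.result()           # wait for the first page
            >>> if future.has_more_pages:
            ...     future.start_fetching_next_page()
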
3808        .. versionadded:: 2.0.0
3809        """
3810        return self._paging_state is not None
3811
3812    @property
3813    def warnings(self):
3814        """
3815        Warnings returned from the server, if any. This will only be
3816        set for protocol_version 4+.
3817
3818        Warnings may be returned for such things as oversized batches,
3819        or too many tombstones in slice queries.
3820
        Ensure the future is complete before trying to access this property
        (call :meth:`.result()`, or wait until a callback has been invoked).
        Otherwise it may raise an exception if the response has not yet been received.
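
        For example, assuming ``future`` is a completed :class:`.ResponseFuture`::

            >>> rows = future.result()                 # ensure completion first
            >>> if future.warnings:
            ...     log.warning("Server warnings: %s", future.warnings)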
3824        """
3825        # TODO: When timers are introduced, just make this wait
3826        if not self._event.is_set():
3827            raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized")
3828        return self._warnings
3829
3830    @property
3831    def custom_payload(self):
3832        """
3833        The custom payload returned from the server, if any. This will only be
3834        set by Cassandra servers implementing a custom QueryHandler, and only
3835        for protocol_version 4+.
3836
        Ensure the future is complete before trying to access this property
        (call :meth:`.result()`, or wait until a callback has been invoked).
        Otherwise it may raise an exception if the response has not yet been received.
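
        For example (the payload is only populated when the server uses a
        custom ``QueryHandler``)::

            >>> rows = future.result()
            >>> payload = future.custom_payload or {}
            >>> for key, value in payload.items():
            ...     log.debug("Payload %s: %r", key, value)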
3840
3841        :return: :ref:`custom_payload`.
3842        """
3843        # TODO: When timers are introduced, just make this wait
3844        if not self._event.is_set():
3845            raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized")
3846        return self._custom_payload
3847
3848    def start_fetching_next_page(self):
3849        """
3850        If there are more pages left in the query result, this asynchronously
3851        starts fetching the next page.  If there are no pages left, :exc:`.QueryExhausted`
3852        is raised.  Also see :attr:`.has_more_pages`.
3853
3854        This should only be called after the first page has been returned.
3855
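        A sketch of callback-driven paging; ``handle_page``, ``handle_error``
        and ``process_row`` are hypothetical user functions::

            >>> def handle_page(rows, future):
            ...     for row in rows:
            ...         process_row(row)
            ...     if future.has_more_pages:
            ...         future.start_fetching_next_page()

            >>> future = session.execute_async("SELECT * FROM users")
            >>> future.add_callbacks(handle_page, handle_error, callback_args=(future,))
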
3856        .. versionadded:: 2.0.0
3857        """
3858        if not self._paging_state:
3859            raise QueryExhausted()
3860
3861        self._make_query_plan()
3862        self.message.paging_state = self._paging_state
3863        self._event.clear()
3864        self._final_result = _NOT_SET
3865        self._final_exception = None
3866        self._start_timer()
3867        self.send_request()
3868
3869    def _reprepare(self, prepare_message, host, connection, pool):
3870        cb = partial(self.session.submit, self._execute_after_prepare, host, connection, pool)
3871        request_id = self._query(host, prepare_message, cb=cb)
3872        if request_id is None:
3873            # try to submit the original prepared statement on some other host
3874            self.send_request()
3875
3876    def _set_result(self, host, connection, pool, response):
3877        try:
3878            self.coordinator_host = host
3879            if pool:
3880                pool.return_connection(connection)
3881
3882            trace_id = getattr(response, 'trace_id', None)
3883            if trace_id:
3884                if not self._query_traces:
3885                    self._query_traces = []
3886                self._query_traces.append(QueryTrace(trace_id, self.session))
3887
3888            self._warnings = getattr(response, 'warnings', None)
3889            self._custom_payload = getattr(response, 'custom_payload', None)
3890
3891            if isinstance(response, ResultMessage):
3892                if response.kind == RESULT_KIND_SET_KEYSPACE:
3893                    session = getattr(self, 'session', None)
3894                    # since we're running on the event loop thread, we need to
3895                    # use a non-blocking method for setting the keyspace on
3896                    # all connections in this session, otherwise the event
3897                    # loop thread will deadlock waiting for keyspaces to be
3898                    # set.  This uses a callback chain which ends with
3899                    # self._set_keyspace_completed() being called in the
3900                    # event loop thread.
3901                    if session:
3902                        session._set_keyspace_for_all_pools(
3903                            response.results, self._set_keyspace_completed)
3904                elif response.kind == RESULT_KIND_SCHEMA_CHANGE:
3905                    # refresh the schema before responding, but do it in another
3906                    # thread instead of the event loop thread
3907                    self.is_schema_agreed = False
3908                    self.session.submit(
3909                        refresh_schema_and_set_result,
3910                        self.session.cluster.control_connection,
3911                        self, connection, **response.results)
3912                else:
3913                    results = getattr(response, 'results', None)
3914                    if results is not None and response.kind == RESULT_KIND_ROWS:
3915                        self._paging_state = response.paging_state
3916                        self._col_types = response.col_types
3917                        self._col_names = results[0]
3918                        results = self.row_factory(*results)
3919                    self._set_final_result(results)
3920            elif isinstance(response, ErrorMessage):
3921                retry_policy = self._retry_policy
3922
3923                if isinstance(response, ReadTimeoutErrorMessage):
3924                    if self._metrics is not None:
3925                        self._metrics.on_read_timeout()
3926                    retry = retry_policy.on_read_timeout(
3927                        self.query, retry_num=self._query_retries, **response.info)
3928                elif isinstance(response, WriteTimeoutErrorMessage):
3929                    if self._metrics is not None:
3930                        self._metrics.on_write_timeout()
3931                    retry = retry_policy.on_write_timeout(
3932                        self.query, retry_num=self._query_retries, **response.info)
3933                elif isinstance(response, UnavailableErrorMessage):
3934                    if self._metrics is not None:
3935                        self._metrics.on_unavailable()
3936                    retry = retry_policy.on_unavailable(
3937                        self.query, retry_num=self._query_retries, **response.info)
3938                elif isinstance(response, (OverloadedErrorMessage,
3939                                           IsBootstrappingErrorMessage,
3940                                           TruncateError, ServerError)):
3941                    log.warning("Host %s error: %s.", host, response.summary)
3942                    if self._metrics is not None:
3943                        self._metrics.on_other_error()
3944                    retry = retry_policy.on_request_error(
3945                        self.query, self.message.consistency_level, error=response,
3946                        retry_num=self._query_retries)
3947                elif isinstance(response, PreparedQueryNotFound):
3948                    if self.prepared_statement:
3949                        query_id = self.prepared_statement.query_id
3950                        assert query_id == response.info, \
3951                            "Got different query ID in server response (%s) than we " \
3952                            "had before (%s)" % (response.info, query_id)
3953                    else:
3954                        query_id = response.info
3955
3956                    try:
3957                        prepared_statement = self.session.cluster._prepared_statements[query_id]
3958                    except KeyError:
3959                        if not self.prepared_statement:
                            log.error("Tried to execute unknown prepared statement: id=%s",
                                      query_id.encode('hex') if six.PY2 else query_id.hex())
3962                            self._set_final_exception(response)
3963                            return
3964                        else:
3965                            prepared_statement = self.prepared_statement
3966                            self.session.cluster._prepared_statements[query_id] = prepared_statement
3967
3968                    current_keyspace = self._connection.keyspace
3969                    prepared_keyspace = prepared_statement.keyspace
3970                    if not ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) \
                            and prepared_keyspace and current_keyspace != prepared_keyspace:
3972                        self._set_final_exception(
3973                            ValueError("The Session's current keyspace (%s) does "
3974                                       "not match the keyspace the statement was "
3975                                       "prepared with (%s)" %
3976                                       (current_keyspace, prepared_keyspace)))
3977                        return
3978
3979                    log.debug("Re-preparing unrecognized prepared statement against host %s: %s",
3980                              host, prepared_statement.query_string)
3981                    prepared_keyspace = prepared_statement.keyspace \
3982                        if ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) else None
3983                    prepare_message = PrepareMessage(query=prepared_statement.query_string,
3984                                                     keyspace=prepared_keyspace)
3985                    # since this might block, run on the executor to avoid hanging
3986                    # the event loop thread
3987                    self.session.submit(self._reprepare, prepare_message, host, connection, pool)
3988                    return
3989                else:
3990                    if hasattr(response, 'to_exception'):
3991                        self._set_final_exception(response.to_exception())
3992                    else:
3993                        self._set_final_exception(response)
3994                    return
3995
3996                self._handle_retry_decision(retry, response, host)
3997            elif isinstance(response, ConnectionException):
3998                if self._metrics is not None:
3999                    self._metrics.on_connection_error()
4000                if not isinstance(response, ConnectionShutdown):
4001                    self._connection.defunct(response)
4002                retry = self._retry_policy.on_request_error(
4003                    self.query, self.message.consistency_level, error=response,
4004                    retry_num=self._query_retries)
4005                self._handle_retry_decision(retry, response, host)
4006            elif isinstance(response, Exception):
4007                if hasattr(response, 'to_exception'):
4008                    self._set_final_exception(response.to_exception())
4009                else:
4010                    self._set_final_exception(response)
4011            else:
4012                # we got some other kind of response message
4013                msg = "Got unexpected message: %r" % (response,)
4014                exc = ConnectionException(msg, host)
4015                self._cancel_timer()
4016                self._connection.defunct(exc)
4017                self._set_final_exception(exc)
4018        except Exception as exc:
4019            # almost certainly caused by a bug, but we need to set something here
4020            log.exception("Unexpected exception while handling result in ResponseFuture:")
4021            self._set_final_exception(exc)
4022
4023    def _set_keyspace_completed(self, errors):
4024        if not errors:
4025            self._set_final_result(None)
4026        else:
4027            self._set_final_exception(ConnectionException(
4028                "Failed to set keyspace on all hosts: %s" % (errors,)))
4029
4030    def _execute_after_prepare(self, host, connection, pool, response):
4031        """
4032        Handle the response to our attempt to prepare a statement.
4033        If it succeeded, run the original query again against the same host.
4034        """
4035        if pool:
4036            pool.return_connection(connection)
4037
4038        if self._final_exception:
4039            return
4040
4041        if isinstance(response, ResultMessage):
4042            if response.kind == RESULT_KIND_PREPARED:
4043                if self.prepared_statement:
4044                    # result metadata is the only thing that could have
4045                    # changed from an alter
4046                    (_, _, _,
4047                     self.prepared_statement.result_metadata,
4048                     new_metadata_id) = response.results
4049                    if new_metadata_id is not None:
4050                        self.prepared_statement.result_metadata_id = new_metadata_id
4051
4052                # use self._query to re-use the same host and
4053                # at the same time properly borrow the connection
4054                request_id = self._query(host)
4055                if request_id is None:
4056                    # this host errored out, move on to the next
4057                    self.send_request()
4058            else:
4059                self._set_final_exception(ConnectionException(
4060                    "Got unexpected response when preparing statement "
4061                    "on host %s: %s" % (host, response)))
4062        elif isinstance(response, ErrorMessage):
4063            if hasattr(response, 'to_exception'):
4064                self._set_final_exception(response.to_exception())
4065            else:
4066                self._set_final_exception(response)
4067        elif isinstance(response, ConnectionException):
4068            log.debug("Connection error when preparing statement on host %s: %s",
4069                      host, response)
4070            # try again on a different host, preparing again if necessary
4071            self._errors[host] = response
4072            self.send_request()
4073        else:
4074            self._set_final_exception(ConnectionException(
4075                "Got unexpected response type when preparing "
4076                "statement on host %s: %s" % (host, response)))
4077
4078    def _set_final_result(self, response):
4079        self._cancel_timer()
4080        if self._metrics is not None:
4081            self._metrics.request_timer.addValue(time.time() - self._start_time)
4082
4083        with self._callback_lock:
4084            self._final_result = response
4085            # save off current callbacks inside lock for execution outside it
4086            # -- prevents case where _final_result is set, then a callback is
4087            # added and executed on the spot, then executed again as a
4088            # registered callback
4089            to_call = tuple(
4090                partial(fn, response, *args, **kwargs)
4091                for (fn, args, kwargs) in self._callbacks
4092            )
4093
4094        self._event.set()
4095
4096        # apply each callback
4097        for callback_partial in to_call:
4098            callback_partial()
4099
4100    def _set_final_exception(self, response):
4101        self._cancel_timer()
4102        if self._metrics is not None:
4103            self._metrics.request_timer.addValue(time.time() - self._start_time)
4104
4105        with self._callback_lock:
4106            self._final_exception = response
4107            # save off current errbacks inside lock for execution outside it --
4108            # prevents case where _final_exception is set, then an errback is
4109            # added and executed on the spot, then executed again as a
4110            # registered errback
4111            to_call = tuple(
4112                partial(fn, response, *args, **kwargs)
4113                for (fn, args, kwargs) in self._errbacks
4114            )
4115        self._event.set()
4116
        # apply each errback
4118        for callback_partial in to_call:
4119            callback_partial()
4120
4121    def _handle_retry_decision(self, retry_decision, response, host):
4122
4123        def exception_from_response(response):
4124            if hasattr(response, 'to_exception'):
4125                return response.to_exception()
4126            else:
4127                return response
4128
4129        retry_type, consistency = retry_decision
4130        if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST):
4131            self._query_retries += 1
4132            reuse = retry_type == RetryPolicy.RETRY
4133            self._retry(reuse, consistency, host)
4134        elif retry_type is RetryPolicy.RETHROW:
4135            self._set_final_exception(exception_from_response(response))
4136        else:  # IGNORE
4137            if self._metrics is not None:
4138                self._metrics.on_ignore()
4139            self._set_final_result(None)
4140
4141        self._errors[host] = exception_from_response(response)
4142
4143    def _retry(self, reuse_connection, consistency_level, host):
4144        if self._final_exception:
4145            # the connection probably broke while we were waiting
4146            # to retry the operation
4147            return
4148
4149        if self._metrics is not None:
4150            self._metrics.on_retry()
4151        if consistency_level is not None:
4152            self.message.consistency_level = consistency_level
4153
4154        # don't retry on the event loop thread
4155        self.session.submit(self._retry_task, reuse_connection, host)
4156
4157    def _retry_task(self, reuse_connection, host):
4158        if self._final_exception:
4159            # the connection probably broke while we were waiting
4160            # to retry the operation
4161            return
4162
4163        if reuse_connection and self._query(host) is not None:
4164            return
4165
4166        # otherwise, move onto another host
4167        self.send_request()
4168
4169    def result(self):
4170        """
4171        Return the final result or raise an Exception if errors were
4172        encountered.  If the final result or error has not been set
4173        yet, this method will block until it is set, or the timeout
4174        set for the request expires.
4175
4176        Timeout is specified in the Session request execution functions.
4177        If the timeout is exceeded, an :exc:`cassandra.OperationTimedOut` will be raised.
4178        This is a client-side timeout. For more information
4179        about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.
4180
4181        Example usage::
4182
4183            >>> future = session.execute_async("SELECT * FROM mycf")
4184            >>> # do other stuff...
4185
4186            >>> try:
4187            ...     rows = future.result()
4188            ...     for row in rows:
4189            ...         ... # process results
4190            ... except Exception:
4191            ...     log.exception("Operation failed:")
4192
4193        """
4194        self._event.wait()
4195        if self._final_result is not _NOT_SET:
4196            return ResultSet(self, self._final_result)
4197        else:
4198            raise self._final_exception
4199
4200    def get_query_trace_ids(self):
4201        """
4202        Returns the trace session ids for this future, if tracing was enabled (does not fetch trace data).
4203        """
4204        return [trace.trace_id for trace in self._query_traces]
4205
4206    def get_query_trace(self, max_wait=None, query_cl=ConsistencyLevel.LOCAL_ONE):
4207        """
4208        Fetches and returns the query trace of the last response, or `None` if tracing was
4209        not enabled.
4210
4211        Note that this may raise an exception if there are problems retrieving the trace
4212        details from Cassandra. If the trace is not available after `max_wait`,
4213        :exc:`cassandra.query.TraceUnavailable` will be raised.
4214
4215        If the ResponseFuture is not done (async execution) and you try to retrieve the trace,
4216        :exc:`cassandra.query.TraceUnavailable` will be raised.
4217
4218        `query_cl` is the consistency level used to poll the trace tables.
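
        A sketch, assuming ``statement`` is any statement executed with tracing requested::

            >>> future = session.execute_async(statement, trace=True)
            >>> rows = future.result()
            >>> trace = future.get_query_trace(max_wait=2.0)
            >>> if trace:
            ...     for event in trace.events:
            ...         log.debug("%s on %s", event.description, event.source)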
4219        """
4220        if self._final_result is _NOT_SET and self._final_exception is None:
4221            raise TraceUnavailable(
4222                "Trace information was not available. The ResponseFuture is not done.")
4223
4224        if self._query_traces:
4225            return self._get_query_trace(len(self._query_traces) - 1, max_wait, query_cl)
4226
4227    def get_all_query_traces(self, max_wait_per=None, query_cl=ConsistencyLevel.LOCAL_ONE):
4228        """
4229        Fetches and returns the query traces for all query pages, if tracing was enabled.
4230
4231        See note in :meth:`~.get_query_trace` regarding possible exceptions.
4232        """
4233        if self._query_traces:
4234            return [self._get_query_trace(i, max_wait_per, query_cl) for i in range(len(self._query_traces))]
4235        return []
4236
4237    def _get_query_trace(self, i, max_wait, query_cl):
4238        trace = self._query_traces[i]
4239        if not trace.events:
4240            trace.populate(max_wait=max_wait, query_cl=query_cl)
4241        return trace
4242
4243    def add_callback(self, fn, *args, **kwargs):
4244        """
4245        Attaches a callback function to be called when the final results arrive.
4246
4247        By default, `fn` will be called with the results as the first and only
4248        argument.  If `*args` or `**kwargs` are supplied, they will be passed
4249        through as additional positional or keyword arguments to `fn`.
4250
4251        If an error is hit while executing the operation, a callback attached
4252        here will not be called.  Use :meth:`.add_errback()` or :meth:`add_callbacks()`
4253        if you wish to handle that case.
4254
4255        If the final result has already been seen when this method is called,
4256        the callback will be called immediately (before this method returns).
4257
        Note: in the case that the result is not available when the callback is added,
        the callback is executed by the IO event thread. This means that the callback
        should not block or attempt further synchronous requests, because no further
        IO will be processed until the callback returns.
4262
4263        **Important**: if the callback you attach results in an exception being
4264        raised, **the exception will be ignored**, so please ensure your
4265        callback handles all error cases that you care about.
4266
4267        Usage example::
4268
4269            >>> session = cluster.connect("mykeyspace")
4270
4271            >>> def handle_results(rows, start_time, should_log=False):
4272            ...     if should_log:
4273            ...         log.info("Total time: %f", time.time() - start_time)
4274            ...     ...
4275
4276            >>> future = session.execute_async("SELECT * FROM users")
4277            >>> future.add_callback(handle_results, time.time(), should_log=True)
4278
4279        """
4280        run_now = False
4281        with self._callback_lock:
4282            # Always add fn to self._callbacks, even when we're about to
4283            # execute it, to prevent races with functions like
4284            # start_fetching_next_page that reset _final_result
4285            self._callbacks.append((fn, args, kwargs))
4286            if self._final_result is not _NOT_SET:
4287                run_now = True
4288        if run_now:
4289            fn(self._final_result, *args, **kwargs)
4290        return self
4291
4292    def add_errback(self, fn, *args, **kwargs):
4293        """
4294        Like :meth:`.add_callback()`, but handles error cases.
4295        An Exception instance will be passed as the first positional argument
4296        to `fn`.
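
        Usage example, following the :meth:`.add_callback` example above::

            >>> def log_error(exc, query):
            ...     log.error("Query '%s' failed: %s", query, exc)

            >>> query = "SELECT * FROM users"
            >>> future = session.execute_async(query)
            >>> future.add_errback(log_error, query)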
4297        """
4298        run_now = False
4299        with self._callback_lock:
4300            # Always add fn to self._errbacks, even when we're about to execute
4301            # it, to prevent races with functions like start_fetching_next_page
4302            # that reset _final_exception
4303            self._errbacks.append((fn, args, kwargs))
4304            if self._final_exception:
4305                run_now = True
4306        if run_now:
4307            fn(self._final_exception, *args, **kwargs)
4308        return self
4309
4310    def add_callbacks(self, callback, errback,
4311                      callback_args=(), callback_kwargs=None,
4312                      errback_args=(), errback_kwargs=None):
4313        """
4314        A convenient combination of :meth:`.add_callback()` and
4315        :meth:`.add_errback()`.
4316
4317        Example usage::
4318
4319            >>> session = cluster.connect()
4320            >>> query = "SELECT * FROM mycf"
4321            >>> future = session.execute_async(query)
4322
4323            >>> def log_results(results, level='debug'):
4324            ...     for row in results:
4325            ...         log.log(level, "Result: %s", row)
4326
4327            >>> def log_error(exc, query):
4328            ...     log.error("Query '%s' failed: %s", query, exc)
4329
4330            >>> future.add_callbacks(
4331            ...     callback=log_results, callback_kwargs={'level': 'info'},
4332            ...     errback=log_error, errback_args=(query,))
4333
4334        """
4335        self.add_callback(callback, *callback_args, **(callback_kwargs or {}))
4336        self.add_errback(errback, *errback_args, **(errback_kwargs or {}))
4337
4338    def clear_callbacks(self):
4339        with self._callback_lock:
4340            self._callbacks = []
4341            self._errbacks = []
4342
4343    def __str__(self):
4344        result = "(no result yet)" if self._final_result is _NOT_SET else self._final_result
4345        return "<ResponseFuture: query='%s' request_id=%s result=%s exception=%s coordinator_host=%s>" \
4346               % (self.query, self._req_id, result, self._final_exception, self.coordinator_host)
4347    __repr__ = __str__
4348
4349
4350class QueryExhausted(Exception):
4351    """
4352    Raised when :meth:`.ResponseFuture.start_fetching_next_page()` is called and
4353    there are no more pages.  You can check :attr:`.ResponseFuture.has_more_pages`
4354    before calling to avoid this.
4355
4356    .. versionadded:: 2.0.0
4357    """
4358    pass
4359
4360
4361class ResultSet(object):
4362    """
4363    An iterator over the rows from a query result. Also supplies basic equality
    and indexing methods for backward-compatibility. These methods materialize
4365    the entire result set (loading all pages), and should only be used if the
4366    total result size is understood. Warnings are emitted when paged results
4367    are materialized in this fashion.
4368
4369    You can treat this as a normal iterator over rows::
4370
4371        >>> from cassandra.query import SimpleStatement
4372        >>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
4373        >>> for user_row in session.execute(statement):
4374        ...     process_user(user_row)
4375
4376    Whenever there are no more rows in the current page, the next page will
4377    be fetched transparently.  However, note that it *is* possible for
4378    an :class:`Exception` to be raised while fetching the next page, just
4379    like you might see on a normal call to ``session.execute()``.
4380    """
4381
4382    def __init__(self, response_future, initial_response):
4383        self.response_future = response_future
4384        self.column_names = response_future._col_names
4385        self.column_types = response_future._col_types
4386        self._set_current_rows(initial_response)
4387        self._page_iter = None
4388        self._list_mode = False
4389
4390    @property
4391    def has_more_pages(self):
4392        """
4393        True if the last response indicated more pages; False otherwise
4394        """
4395        return self.response_future.has_more_pages
4396
4397    @property
4398    def current_rows(self):
4399        """
        The list of rows in the current page. May be empty if the result was
        empty, or if this is the last page.
4402        """
4403        return self._current_rows or []
4404
4405    def one(self):
4406        """
4407        Return a single row of the results or None if empty. This is basically
4408        a shortcut to `result_set.current_rows[0]` and should only be used when
4409        you know a query returns a single row. Consider using an iterator if the
4410        ResultSet contains more than one row.
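
        For example, assuming an illustrative ``users`` table and ``user_id`` value::

            >>> user = session.execute("SELECT * FROM users WHERE id = %s", [user_id]).one()
            >>> if user is not None:
            ...     log.info("Found user %s", user.name)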
4411        """
4412        row = None
4413        if self._current_rows:
4414            try:
4415                row = self._current_rows[0]
4416            except TypeError:  # generator object is not subscriptable, PYTHON-1026
4417                row = next(iter(self._current_rows))
4418
4419        return row
4420
4421    def __iter__(self):
4422        if self._list_mode:
4423            return iter(self._current_rows)
4424        self._page_iter = iter(self._current_rows)
4425        return self
4426
4427    def next(self):
4428        try:
4429            return next(self._page_iter)
4430        except StopIteration:
4431            if not self.response_future.has_more_pages:
4432                if not self._list_mode:
4433                    self._current_rows = []
4434                raise
4435
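        # the current page is exhausted but more pages remain: fetch the next
        # page synchronously and resume iteration from it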
4436        self.fetch_next_page()
4437        self._page_iter = iter(self._current_rows)
4438
4439        return next(self._page_iter)
4440
4441    __next__ = next
4442
4443    def fetch_next_page(self):
4444        """
        Manually, synchronously fetch the next page. Supplied for manually retrieving pages
        and inspecting :attr:`~.current_rows`. It is not necessary to call this when iterating
        through results; paging happens implicitly in iteration.
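
        A manual-paging sketch, reusing the statement from the class-level example::

            >>> results = session.execute(SimpleStatement("SELECT * FROM users", fetch_size=10))
            >>> page = results.current_rows
            >>> while results.has_more_pages:
            ...     results.fetch_next_page()
            ...     page = results.current_rows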
4448        """
4449        if self.response_future.has_more_pages:
4450            self.response_future.start_fetching_next_page()
4451            result = self.response_future.result()
4452            self._current_rows = result._current_rows  # ResultSet has already _set_current_rows to the appropriate form
4453        else:
4454            self._current_rows = []
4455
4456    def _set_current_rows(self, result):
4457        if isinstance(result, Mapping):
4458            self._current_rows = [result] if result else []
4459            return
4460        try:
4461            iter(result)  # can't check directly for generator types because cython generators are different
4462            self._current_rows = result
4463        except TypeError:
4464            self._current_rows = [result] if result else []
4465
4466    def _fetch_all(self):
4467        self._current_rows = list(self)
4468        self._page_iter = None
4469
4470    def _enter_list_mode(self, operator):
4471        if self._list_mode:
4472            return
4473        if self._page_iter:
4474            raise RuntimeError("Cannot use %s when results have been iterated." % operator)
4475        if self.response_future.has_more_pages:
4476            log.warning("Using %s on paged results causes entire result set to be materialized.", operator)
4477        self._fetch_all()  # done regardless of paging status in case the row factory produces a generator
4478        self._list_mode = True
4479
4480    def __eq__(self, other):
4481        self._enter_list_mode("equality operator")
4482        return self._current_rows == other
4483
4484    def __getitem__(self, i):
4485        if i == 0:
4486            warn("ResultSet indexing support will be removed in 4.0. Consider using "
4487                 "ResultSet.one() to get a single row.", DeprecationWarning)
4488        self._enter_list_mode("index operator")
4489        return self._current_rows[i]
4490
4491    def __nonzero__(self):
4492        return bool(self._current_rows)
4493
4494    __bool__ = __nonzero__
4495
4496    def get_query_trace(self, max_wait_sec=None):
4497        """
4498        Gets the last query trace from the associated future.
4499        See :meth:`.ResponseFuture.get_query_trace` for details.
4500        """
4501        return self.response_future.get_query_trace(max_wait_sec)
4502
4503    def get_all_query_traces(self, max_wait_sec_per=None):
4504        """
4505        Gets all query traces from the associated future.
4506        See :meth:`.ResponseFuture.get_all_query_traces` for details.
4507        """
4508        return self.response_future.get_all_query_traces(max_wait_sec_per)
4509
4510    @property
4511    def was_applied(self):
4512        """
        For LWT results, returns whether the transaction was applied.

        The result is indeterminate if called on a result that was not an LWT request,
        or on a :class:`.query.BatchStatement` containing LWT, since in the latter case
        the batch either succeeds or fails as a whole.

        Only valid when one of the internal row factories is in use.
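
        A minimal LWT sketch (keyspace, table and values are illustrative)::

            >>> result = session.execute(
            ...     "INSERT INTO users (id, name) VALUES (%s, %s) IF NOT EXISTS",
            ...     (user_id, "alice"))
            >>> if not result.was_applied:
            ...     existing = result.one()  # the row that prevented the insert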
4520        """
4521        if self.response_future.row_factory not in (named_tuple_factory, dict_factory, tuple_factory):
4522            raise RuntimeError("Cannot determine LWT result with row factory %s" % (self.response_future.row_factory,))
4523
4524        is_batch_statement = isinstance(self.response_future.query, BatchStatement)
4525        if is_batch_statement and (not self.column_names or self.column_names[0] != "[applied]"):
            raise RuntimeError("No LWTs were present in the BatchStatement")
4527
4528        if not is_batch_statement and len(self.current_rows) != 1:
4529            raise RuntimeError("LWT result should have exactly one row. This has %d." % (len(self.current_rows)))
4530
4531        row = self.current_rows[0]
4532        if isinstance(row, tuple):
4533            return row[0]
4534        else:
4535            return row['[applied]']
4536
4537    @property
4538    def paging_state(self):
4539        """
4540        Server paging state of the query. Can be `None` if the query was not paged.
4541
4542        The driver treats paging state as opaque, but it may contain primary key data, so applications may want to
4543        avoid sending this to untrusted parties.
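
        A sketch of resuming a paged query from a saved state (the statement is
        assumed to be the same in both executions)::

            >>> results = session.execute(statement)
            >>> saved = results.paging_state
            >>> # later, possibly from another process:
            >>> resumed = session.execute(statement, paging_state=saved)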
4544        """
4545        return self.response_future._paging_state
4546