1# Copyright DataStax, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from binascii import unhexlify
16from bisect import bisect_left
17from collections import defaultdict, Mapping
18from functools import total_ordering
19from hashlib import md5
20from itertools import islice, cycle
21import json
22import logging
23import re
24import six
25from six.moves import zip
26import sys
27from threading import RLock
28import struct
29import random
30
31murmur3 = None
32try:
33    from cassandra.murmur3 import murmur3
34except ImportError as e:
35    pass
36
37from cassandra import SignatureDescriptor, ConsistencyLevel, InvalidRequest, Unauthorized
38import cassandra.cqltypes as types
39from cassandra.encoder import Encoder
40from cassandra.marshal import varint_unpack
41from cassandra.protocol import QueryMessage
42from cassandra.query import dict_factory, bind_params
43from cassandra.util import OrderedDict, Version
44from cassandra.pool import HostDistance
45from cassandra.connection import EndPoint
46
log = logging.getLogger(__name__)

# Every word the CQL grammar treats as a keyword (reserved or not).
cql_keywords = {
    'add', 'aggregate', 'all', 'allow', 'alter', 'and', 'apply', 'as', 'asc', 'ascii', 'authorize', 'batch', 'begin',
    'bigint', 'blob', 'boolean', 'by', 'called', 'clustering', 'columnfamily', 'compact', 'contains', 'count',
    'counter', 'create', 'custom', 'date', 'decimal', 'delete', 'desc', 'describe', 'distinct', 'double', 'drop',
    'entries', 'execute', 'exists', 'filtering', 'finalfunc', 'float', 'from', 'frozen', 'full', 'function',
    'functions', 'grant', 'if', 'in', 'index', 'inet', 'infinity', 'initcond', 'input', 'insert', 'int', 'into', 'is', 'json',
    'key', 'keys', 'keyspace', 'keyspaces', 'language', 'limit', 'list', 'login', 'map', 'materialized', 'modify', 'nan', 'nologin',
    'norecursive', 'nosuperuser', 'not', 'null', 'of', 'on', 'options', 'or', 'order', 'password', 'permission',
    'permissions', 'primary', 'rename', 'replace', 'returns', 'revoke', 'role', 'roles', 'schema', 'select', 'set',
    'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'table', 'text', 'time', 'timestamp', 'timeuuid',
    'tinyint', 'to', 'token', 'trigger', 'truncate', 'ttl', 'tuple', 'type', 'unlogged', 'update', 'use', 'user',
    'users', 'using', 'uuid', 'values', 'varchar', 'varint', 'view', 'where', 'with', 'writetime'
}
"""
Set of keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

# Keywords that may still be used as plain identifiers without quoting.
cql_keywords_unreserved = {
    'aggregate', 'all', 'as', 'ascii', 'bigint', 'blob', 'boolean', 'called', 'clustering', 'compact', 'contains',
    'count', 'counter', 'custom', 'date', 'decimal', 'distinct', 'double', 'exists', 'filtering', 'finalfunc', 'float',
    'frozen', 'function', 'functions', 'inet', 'initcond', 'input', 'int', 'json', 'key', 'keys', 'keyspaces',
    'language', 'list', 'login', 'map', 'nologin', 'nosuperuser', 'options', 'password', 'permission', 'permissions',
    'returns', 'role', 'roles', 'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'text', 'time',
    'timestamp', 'timeuuid', 'tinyint', 'trigger', 'ttl', 'tuple', 'type', 'user', 'users', 'uuid', 'values', 'varchar',
    'varint', 'writetime'
}
"""
Set of unreserved keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

# Reserved = every keyword that is not explicitly listed as unreserved.
cql_keywords_reserved = cql_keywords.difference(cql_keywords_unreserved)
"""
Set of reserved keywords in CQL.
"""
87
# Module-level instance of cassandra.encoder.Encoder, created once at import time.
# NOTE(review): presumably shared by CQL value-formatting helpers elsewhere in this
# module (not visible in this chunk) — confirm before relying on it.
_encoder = Encoder()
89
90
class Metadata(object):
    """
    Holds a representation of the cluster schema and topology.
    """

    cluster_name = None
    """ The string name of the cluster. """

    keyspaces = None
    """
    A map from keyspace names to matching :class:`~.KeyspaceMetadata` instances.
    """

    partitioner = None
    """
    The string name of the partitioner for the cluster.
    """

    token_map = None
    """ A :class:`~.TokenMap` instance describing the ring topology. """

    def __init__(self):
        self.keyspaces = {}
        # endpoint -> Host; every access below is made while holding _hosts_lock
        self._hosts = {}
        self._hosts_lock = RLock()

    def export_schema_as_string(self):
        """
        Returns a string that can be executed as a query in order to recreate
        the entire schema.  The string is formatted to be human readable.
        """
        return "\n\n".join(ks.export_as_string() for ks in self.keyspaces.values())

    def refresh(self, connection, timeout, target_type=None, change_type=None, **kwargs):
        """
        Refresh schema metadata using a schema parser built for ``connection``.

        With no ``target_type`` the whole schema is rebuilt.  Otherwise only the
        element of that type (e.g. ``'keyspace'``, ``'table'``, ``'type'``,
        ``'function'``, ``'aggregate'``) identified by ``kwargs`` is re-read: it
        is updated when the parser returns metadata for it, and dropped
        otherwise.  ``change_type`` is accepted but not used here.

        Raises :exc:`ValueError` if ``target_type`` is not a known schema target.
        """
        server_version = self.get_host(connection.endpoint).release_version
        parser = get_schema_parser(connection, server_version, timeout)

        if not target_type:
            self._rebuild_all(parser)
            return

        tt_lower = target_type.lower()
        # Resolve the target-specific methods explicitly rather than catching
        # AttributeError around the whole body: an AttributeError raised *inside*
        # a parser or update method is a real bug and must not be reported as an
        # unknown target_type.
        parse_method = self._target_method(parser, 'get_', tt_lower, target_type)
        meta = parse_method(self.keyspaces, **kwargs)
        if meta:
            update_method = self._target_method(self, '_update_', tt_lower, target_type)
            if tt_lower == 'keyspace' and connection.protocol_version < 3:
                # we didn't have 'type' target in legacy protocol versions, so we need to query those too
                user_types = parser.get_types_map(self.keyspaces, **kwargs)
                self._update_keyspace(meta, user_types)
            else:
                update_method(meta)
        else:
            drop_method = self._target_method(self, '_drop_', tt_lower, target_type)
            drop_method(**kwargs)

    @staticmethod
    def _target_method(obj, prefix, tt_lower, target_type):
        """
        Return ``getattr(obj, prefix + tt_lower)``; raise :exc:`ValueError`
        naming ``target_type`` when no such attribute exists.
        """
        method = getattr(obj, prefix + tt_lower, None)
        if method is None:
            raise ValueError("Unknown schema target_type: '%s'" % target_type)
        return method

    def _rebuild_all(self, parser):
        """
        Replace all keyspace metadata with what ``parser.get_all_keyspaces()``
        yields, notifying the token map of added, updated and removed keyspaces.
        """
        current_keyspaces = set()
        for keyspace_meta in parser.get_all_keyspaces():
            current_keyspaces.add(keyspace_meta.name)
            old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
            self.keyspaces[keyspace_meta.name] = keyspace_meta
            if old_keyspace_meta:
                self._keyspace_updated(keyspace_meta.name)
            else:
                self._keyspace_added(keyspace_meta.name)

        # drop keyspaces the parser no longer reported
        removed_keyspaces = [name for name in self.keyspaces
                             if name not in current_keyspaces]
        self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items()
                              if name in current_keyspaces)
        for ksname in removed_keyspaces:
            self._keyspace_removed(ksname)

    def _update_keyspace(self, keyspace_meta, new_user_types=None):
        """
        Install ``keyspace_meta``, carrying over child metadata (tables, indexes,
        functions, aggregates, views and, unless ``new_user_types`` is given,
        user types) from the keyspace it replaces.
        """
        ks_name = keyspace_meta.name
        old_keyspace_meta = self.keyspaces.get(ks_name, None)
        self.keyspaces[ks_name] = keyspace_meta
        if old_keyspace_meta:
            keyspace_meta.tables = old_keyspace_meta.tables
            keyspace_meta.user_types = new_user_types if new_user_types is not None else old_keyspace_meta.user_types
            keyspace_meta.indexes = old_keyspace_meta.indexes
            keyspace_meta.functions = old_keyspace_meta.functions
            keyspace_meta.aggregates = old_keyspace_meta.aggregates
            keyspace_meta.views = old_keyspace_meta.views
            # only replication changes affect the token map
            if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy):
                self._keyspace_updated(ks_name)
        else:
            self._keyspace_added(ks_name)

    def _drop_keyspace(self, keyspace):
        if self.keyspaces.pop(keyspace, None):
            self._keyspace_removed(keyspace)

    def _update_table(self, meta):
        try:
            keyspace_meta = self.keyspaces[meta.keyspace_name]
            # this is unfortunate, but protocol v4 does not differentiate
            # between events for tables and views. <parser>.get_table will
            # return one or the other based on the query results.
            # Here we deal with that.
            if isinstance(meta, TableMetadata):
                keyspace_meta._add_table_metadata(meta)
            else:
                keyspace_meta._add_view_metadata(meta)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_table(self, keyspace, table):
        try:
            keyspace_meta = self.keyspaces[keyspace]
            keyspace_meta._drop_table_metadata(table)  # handles either table or view
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_type(self, type_meta):
        try:
            self.keyspaces[type_meta.keyspace].user_types[type_meta.name] = type_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_type(self, keyspace, type):
        try:
            self.keyspaces[keyspace].user_types.pop(type, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_function(self, function_meta):
        try:
            self.keyspaces[function_meta.keyspace].functions[function_meta.signature] = function_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_function(self, keyspace, function):
        try:
            self.keyspaces[keyspace].functions.pop(function.signature, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_aggregate(self, aggregate_meta):
        try:
            self.keyspaces[aggregate_meta.keyspace].aggregates[aggregate_meta.signature] = aggregate_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_aggregate(self, keyspace, aggregate):
        try:
            self.keyspaces[keyspace].aggregates.pop(aggregate.signature, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _keyspace_added(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_updated(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_removed(self, ksname):
        if self.token_map:
            self.token_map.remove_keyspace(ksname)

    def rebuild_token_map(self, partitioner, token_map):
        """
        Rebuild our view of the topology from fresh rows from the
        system topology tables.
        For internal use only.
        """
        self.partitioner = partitioner
        if partitioner.endswith('RandomPartitioner'):
            token_class = MD5Token
        elif partitioner.endswith('Murmur3Partitioner'):
            token_class = Murmur3Token
        elif partitioner.endswith('ByteOrderedPartitioner'):
            token_class = BytesToken
        else:
            # unsupported partitioner: no token-aware routing information
            self.token_map = None
            return

        token_to_host_owner = {}
        ring = []
        for host, token_strings in six.iteritems(token_map):
            for token_string in token_strings:
                token = token_class.from_string(token_string)
                ring.append(token)
                token_to_host_owner[token] = host

        all_tokens = sorted(ring)
        self.token_map = TokenMap(
            token_class, token_to_host_owner, all_tokens, self)

    def get_replicas(self, keyspace, key):
        """
        Returns a list of :class:`.Host` instances that are replicas for a given
        partition key.
        """
        t = self.token_map
        if not t:
            return []
        try:
            return t.get_replicas(keyspace, t.token_class.from_key(key))
        except NoMurmur3:
            return []

    def can_support_partitioner(self):
        """
        Return False only when the cluster uses Murmur3Partitioner but the
        optional ``cassandra.murmur3`` extension could not be imported.
        """
        if not self.partitioner:
            # partitioner is unknown until rebuild_token_map() has run
            return True
        return not (self.partitioner.endswith('Murmur3Partitioner') and murmur3 is None)

    def add_or_return_host(self, host):
        """
        Returns a tuple (host, new), where ``host`` is a Host
        instance, and ``new`` is a bool indicating whether
        the host was newly added.
        """
        with self._hosts_lock:
            try:
                return self._hosts[host.endpoint], False
            except KeyError:
                self._hosts[host.endpoint] = host
                return host, True

    def remove_host(self, host):
        """Remove ``host`` by endpoint; return True if it was present."""
        with self._hosts_lock:
            return bool(self._hosts.pop(host.endpoint, False))

    def get_host(self, endpoint_or_address):
        """
        Find a host in the metadata for a specific endpoint. If a string inet address is passed,
        iterate all hosts to match the :attr:`~.pool.Host.broadcast_rpc_address` attribute.
        """
        if not isinstance(endpoint_or_address, EndPoint):
            return self._get_host_by_address(endpoint_or_address)

        with self._hosts_lock:
            return self._hosts.get(endpoint_or_address)

    def _get_host_by_address(self, address):
        """Return the first host whose broadcast_rpc_address equals ``address``, or None."""
        # Hold the lock while scanning: iterating the dict while another thread
        # adds or removes a host would raise "dictionary changed size during iteration".
        with self._hosts_lock:
            for host in self._hosts.values():
                if host.broadcast_rpc_address == address:
                    return host
        return None

    def all_hosts(self):
        """
        Returns a list of all known :class:`.Host` instances in the cluster.
        """
        with self._hosts_lock:
            return list(self._hosts.values())
351
352
# Package prefix of Cassandra's built-in replication strategy class names.
REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."


def trim_if_startswith(s, prefix):
    """Return ``s`` without a leading ``prefix``; return ``s`` unchanged if it lacks it."""
    return s[len(prefix):] if s.startswith(prefix) else s
360
361
362_replication_strategies = {}
363
364
365class ReplicationStrategyTypeType(type):
366    def __new__(metacls, name, bases, dct):
367        dct.setdefault('name', name)
368        cls = type.__new__(metacls, name, bases, dct)
369        if not name.startswith('_'):
370            _replication_strategies[name] = cls
371        return cls
372
373
@six.add_metaclass(ReplicationStrategyTypeType)
class _ReplicationStrategy(object):
    """
    Base class for replication strategies. Concrete subclasses implement
    :meth:`make_token_replica_map` and :meth:`export_for_schema`.
    """

    options_map = None

    @classmethod
    def create(cls, strategy_class, options_map):
        """
        Build a strategy instance for ``strategy_class`` (with or without the
        ``org.apache.cassandra.locator.`` prefix) from ``options_map``.

        Unrecognized strategy names get an ``_UnknownStrategyBuilder`` that is
        also stored in the registry. Returns None when ``strategy_class`` is
        empty, or when construction raises (a warning is logged).
        """
        if not strategy_class:
            return None

        strategy_name = trim_if_startswith(strategy_class, REPLICATION_STRATEGY_CLASS_PREFIX)

        try:
            rs_class = _replication_strategies[strategy_name]
        except KeyError:
            # remember the builder so later lookups of this name hit the registry
            rs_class = _UnknownStrategyBuilder(strategy_name)
            _replication_strategies[strategy_name] = rs_class

        try:
            return rs_class(options_map)
        except Exception as exc:
            log.warning("Failed creating %s with options %s: %s", strategy_name, options_map, exc)
            return None

    def make_token_replica_map(self, token_to_host_owner, ring):
        raise NotImplementedError()

    def export_for_schema(self):
        raise NotImplementedError()


# Public alias for the (unregistered) base class.
ReplicationStrategy = _ReplicationStrategy
406
407
408class _UnknownStrategyBuilder(object):
409    def __init__(self, name):
410        self.name = name
411
412    def __call__(self, options_map):
413        strategy_instance = _UnknownStrategy(self.name, options_map)
414        return strategy_instance
415
416
class _UnknownStrategy(ReplicationStrategy):
    """
    Placeholder for a replication strategy the driver does not model. It keeps
    the strategy name and raw options so the keyspace can still be exported,
    but it builds no token-to-replica map.
    """

    def __init__(self, name, options_map):
        self.name = name
        # copy so the caller's mapping is not mutated by the 'class' entry below
        self.options_map = options_map.copy() if options_map is not None else dict()
        self.options_map['class'] = self.name

    def __eq__(self, other):
        return (isinstance(other, _UnknownStrategy) and
                self.name == other.name and
                self.options_map == other.options_map)

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.

        The ``'class'`` entry comes first, followed by the remaining options
        sorted by key.  Keys and values are rendered as CQL string literals
        (embedded single quotes are doubled), matching the string form the
        other strategies return.
        """
        def quote(value):
            return "'%s'" % (str(value).replace("'", "''"),)

        options = self.options_map or {}
        items = ["%s: %s" % (quote('class'), quote(options.get('class', self.name)))]
        items.extend("%s: %s" % (quote(key), quote(options[key]))
                     for key in sorted(options, key=str) if key != 'class')
        return "{%s}" % ", ".join(items)

    def make_token_replica_map(self, token_to_host_owner, ring):
        # replica placement is unknown for this strategy
        return {}
439
440
class SimpleStrategy(ReplicationStrategy):
    """
    Replication that places ``replication_factor`` replicas on the distinct
    hosts met walking the ring from each token onward.
    """

    replication_factor = None
    """
    The replication factor for this keyspace.
    """

    def __init__(self, options_map):
        try:
            rf = int(options_map['replication_factor'])
        except Exception:
            raise ValueError("SimpleStrategy requires an integer 'replication_factor' option")
        self.replication_factor = rf

    def make_token_replica_map(self, token_to_host_owner, ring):
        ring_size = len(ring)
        replica_map = {}
        for start, start_token in enumerate(ring):
            owners = []
            # visit each ring position at most once, stopping once RF hosts are found
            for step in range(ring_size):
                if len(owners) >= self.replication_factor:
                    break
                owner = token_to_host_owner[ring[(start + step) % ring_size]]
                if owner not in owners:
                    owners.append(owner)
            replica_map[start_token] = owners
        return replica_map

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        template = "{'class': 'SimpleStrategy', 'replication_factor': '%d'}"
        return template % (self.replication_factor,)

    def __eq__(self, other):
        return isinstance(other, SimpleStrategy) and self.replication_factor == other.replication_factor
481
482
class NetworkTopologyStrategy(ReplicationStrategy):

    dc_replication_factors = None
    """
    A map of datacenter names to the replication factor for that DC.
    """

    def __init__(self, dc_replication_factors):
        # normalize to {str dc name: int rf}; a non-integer factor raises ValueError
        self.dc_replication_factors = dict(
            (str(k), int(v)) for k, v in dc_replication_factors.items())

    def make_token_replica_map(self, token_to_host_owner, ring):
        """
        Map every token in ``ring`` to its list of replica hosts.

        For each datacenter with a positive replication factor, hosts are taken
        walking that DC's tokens from the current token onward.  Hosts on racks
        that already hold a replica are deferred until every rack of the DC has
        one, then used to fill any remaining slots.

        :param token_to_host_owner: dict mapping each token to its owning host
        :param ring: list of all tokens (presumably sorted, as built by
            ``Metadata.rebuild_token_map``)
        """
        dc_rf_map = dict((dc, int(rf))
                         for dc, rf in self.dc_replication_factors.items() if rf > 0)

        # build a map of DCs to lists of indexes into `ring` for tokens that
        # belong to that DC
        dc_to_token_offset = defaultdict(list)
        dc_racks = defaultdict(set)
        hosts_per_dc = defaultdict(set)
        for i, token in enumerate(ring):
            host = token_to_host_owner[token]
            dc_to_token_offset[host.datacenter].append(i)
            # NOTE(review): only hosts with both a datacenter and a rack are
            # counted, so a DC whose hosts all lack rack information gets
            # hosts_this_dc == 0 below and receives no replicas.
            if host.datacenter and host.rack:
                dc_racks[host.datacenter].add(host.rack)
                hosts_per_dc[host.datacenter].add(host)

        # A map of DCs to an index into the dc_to_token_offset value for that dc.
        # This is how we keep track of advancing around the ring for each DC.
        dc_to_current_index = defaultdict(int)

        replica_map = defaultdict(list)
        for i in range(len(ring)):
            replicas = replica_map[ring[i]]

            # go through each DC and find the replicas in that DC
            for dc in dc_to_token_offset.keys():
                if dc not in dc_rf_map:
                    continue

                # advance our per-DC index until we're up to at least the
                # current token in the ring
                token_offsets = dc_to_token_offset[dc]
                index = dc_to_current_index[dc]
                num_tokens = len(token_offsets)
                while index < num_tokens and token_offsets[index] < i:
                    index += 1
                dc_to_current_index[dc] = index

                replicas_remaining = dc_rf_map[dc]
                replicas_this_dc = 0
                skipped_hosts = []
                racks_placed = set()
                racks_this_dc = dc_racks[dc]
                hosts_this_dc = len(hosts_per_dc[dc])
                # visit each of this DC's tokens once, wrapping around the ring
                for token_offset in islice(cycle(token_offsets), index, index + num_tokens):
                    host = token_to_host_owner[ring[token_offset]]
                    if replicas_remaining == 0 or replicas_this_dc == hosts_this_dc:
                        break

                    if host in replicas:
                        continue

                    if host.rack in racks_placed and len(racks_placed) < len(racks_this_dc):
                        # Defer hosts on an already-used rack.  A host owning several
                        # tokens (vnodes) is met more than once; record it only once so
                        # it cannot be appended to ``replicas`` twice below.
                        if host not in skipped_hosts:
                            skipped_hosts.append(host)
                        continue

                    replicas.append(host)
                    replicas_this_dc += 1
                    replicas_remaining -= 1
                    racks_placed.add(host.rack)

                    if len(racks_placed) == len(racks_this_dc):
                        # every rack now holds a replica: fill remaining slots with
                        # the deferred hosts, in the order they were met
                        for skipped_host in skipped_hosts:
                            if replicas_remaining == 0:
                                break
                            replicas.append(skipped_host)
                            replicas_remaining -= 1
                        del skipped_hosts[:]

        return replica_map

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        ret = "{'class': 'NetworkTopologyStrategy'"
        for dc, repl_factor in sorted(self.dc_replication_factors.items()):
            ret += ", '%s': '%d'" % (dc, repl_factor)
        return ret + "}"

    def __eq__(self, other):
        if not isinstance(other, NetworkTopologyStrategy):
            return False

        return self.dc_replication_factors == other.dc_replication_factors
580
581
class LocalStrategy(ReplicationStrategy):
    """
    Strategy with no options and no token-to-replica map.
    """

    def __init__(self, options_map):
        """Accept ``options_map`` for interface compatibility; it is ignored."""

    def make_token_replica_map(self, token_to_host_owner, ring):
        return dict()

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        schema = "{'class': 'LocalStrategy'}"
        return schema

    def __eq__(self, other):
        return isinstance(other, LocalStrategy)
598
599
class KeyspaceMetadata(object):
    """
    A representation of the schema for a single keyspace.
    """

    name = None
    """ The string name of the keyspace. """

    durable_writes = True
    """
    A boolean indicating whether durable writes are enabled for this keyspace
    or not.
    """

    replication_strategy = None
    """
    A :class:`.ReplicationStrategy` subclass object.
    """

    tables = None
    """
    A map from table names to instances of :class:`~.TableMetadata`.
    """

    indexes = None
    """
    A dict mapping index names to :class:`.IndexMetadata` instances.
    """

    user_types = None
    """
    A map from user-defined type names to instances of :class:`~cassandra.metadata.UserType`.

    .. versionadded:: 2.1.0
    """

    functions = None
    """
    A map from user-defined function signatures to instances of :class:`~cassandra.metadata.Function`.

    .. versionadded:: 2.6.0
    """

    aggregates = None
    """
    A map from user-defined aggregate signatures to instances of :class:`~cassandra.metadata.Aggregate`.

    .. versionadded:: 2.6.0
    """

    views = None
    """
    A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
    """

    virtual = False
    """
    A boolean indicating if this is a virtual keyspace or not. Always ``False``
    for clusters running pre-4.0 versions of Cassandra.

    .. versionadded:: 3.15
    """

    _exc_info = None
    """ set if metadata parsing failed """

    def __init__(self, name, durable_writes, strategy_class, strategy_options):
        self.name = name
        self.durable_writes = durable_writes
        self.replication_strategy = ReplicationStrategy.create(strategy_class, strategy_options)
        self.tables = {}
        self.indexes = {}
        self.user_types = {}
        self.functions = {}
        self.aggregates = {}
        self.views = {}

    def export_as_string(self):
        """
        Returns a CQL query string that can be used to recreate the entire keyspace,
        including user-defined types and tables.
        """
        # keyspace statement, then UDTs (dependency order), functions, aggregates, tables
        parts = [self.as_cql_query() + ';']
        parts.extend(self.user_type_strings())
        parts.extend(f.export_as_string() for f in self.functions.values())
        parts.extend(a.export_as_string() for a in self.aggregates.values())
        parts.extend(t.export_as_string() for t in self.tables.values())
        cql = "\n\n".join(parts)

        if self._exc_info:
            # parsing failed: wrap everything in a comment preceded by the traceback
            import traceback
            header = "/*\nWarning: Keyspace %s is incomplete because of an error processing metadata.\n" % \
                (self.name)
            trace = "".join(traceback.format_exception(*self._exc_info))
            footer = "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql
            return header + trace + footer

        if not self.virtual:
            return cql

        return ("/*\nWarning: Keyspace {ks} is a virtual keyspace and cannot be recreated with CQL.\n"
                "Structure, for reference:*/\n"
                "{cql}\n"
                "").format(ks=self.name, cql=cql)

    def as_cql_query(self):
        """
        Returns a CQL query string that can be used to recreate just this keyspace,
        not including user-defined types and tables.
        """
        quoted_name = protect_name(self.name)
        if self.virtual:
            return "// VIRTUAL KEYSPACE {}".format(quoted_name)
        replication = self.replication_strategy.export_for_schema()
        create_stmt = "CREATE KEYSPACE %s WITH replication = %s " % (quoted_name, replication)
        durable = "true" if self.durable_writes else "false"
        return create_stmt + (' AND durable_writes = %s' % durable)

    def user_type_strings(self):
        """
        Return CREATE TYPE statements for all user types, ordered so that a type
        appears after any user types its fields reference.
        """
        ordered = []
        pending = self.user_types.copy()
        for type_name in sorted(pending):
            # earlier resolutions may already have consumed this type
            if type_name in pending:
                self.resolve_user_types(type_name, pending, ordered)
        return ordered

    def resolve_user_types(self, key, user_types, user_type_strings):
        """
        Pop ``key`` from ``user_types``, first emitting (recursively) any still
        pending user types referenced by its fields, then append its statement.
        """
        udt = user_types.pop(key)
        for field_type in udt.field_types:
            for referenced in types.cql_types_from_string(field_type):
                if referenced in user_types:
                    self.resolve_user_types(referenced, user_types, user_type_strings)
        user_type_strings.append(udt.export_as_string())

    def _add_table_metadata(self, table_metadata):
        """Install ``table_metadata``, keeping its views and syncing the index map."""
        old_indexes = {}
        previous = self.tables.get(table_metadata.name, None)
        if previous:
            # views are not queried with table, so they must be transferred to new
            table_metadata.views = previous.views
            # indexes will be updated with what is on the new metadata
            old_indexes = previous.indexes

        # note the intentional order of add before remove
        # this makes sure the maps are never absent something that existed before this update
        self.indexes.update(table_metadata.indexes)

        for index_name in old_indexes:
            if index_name not in table_metadata.indexes:
                self.indexes.pop(index_name, None)

        self.tables[table_metadata.name] = table_metadata

    def _drop_table_metadata(self, table_name):
        """Remove the table or view called ``table_name`` and its dependents."""
        table_meta = self.tables.pop(table_name, None)
        if not table_meta:
            # we can't tell table drops from views, so drop both
            # (name is unique among them, within a keyspace)
            view_meta = self.views.pop(table_name, None)
            if view_meta:
                base_table = self.tables.get(view_meta.base_table_name)
                if base_table is not None:
                    base_table.views.pop(table_name, None)
            return

        for index_name in table_meta.indexes:
            self.indexes.pop(index_name, None)
        for view_name in table_meta.views:
            self.views.pop(view_name, None)

    def _add_view_metadata(self, view_metadata):
        """Register a view on its base table; ignored if the base table is unknown."""
        base_table = self.tables.get(view_metadata.base_table_name)
        if base_table is None:
            return
        base_table.views[view_metadata.name] = view_metadata
        self.views[view_metadata.name] = view_metadata
773
774
class UserType(object):
    """
    A user defined type, as created by ``CREATE TYPE`` statements.

    User-defined types were introduced in Cassandra 2.1.

    .. versionadded:: 2.1.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this type is defined.
    """

    name = None
    """
    The name of this type.
    """

    field_names = None
    """
    An ordered list of the names for each field in this user-defined type.
    """

    field_types = None
    """
    An ordered list of the types for each field in this user-defined type.
    """

    def __init__(self, keyspace, name, field_names, field_types):
        self.keyspace = keyspace
        self.name = name
        # the field lists may arrive as None; store empty lists instead
        self.field_names = field_names or []
        self.field_types = field_types or []

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this type.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        if formatted:
            opener, field_join, padding, closer = "\n", ",\n", "    ", "\n)"
        else:
            opener, field_join, padding, closer = "", ", ", "", ")"

        header = "CREATE TYPE %s.%s (%s" % (
            protect_name(self.keyspace),
            protect_name(self.name),
            opener)
        body = field_join.join(
            "%s%s %s" % (padding, protect_name(field_name), field_type)
            for field_name, field_type in zip(self.field_names, self.field_types))
        return header + body + closer

    def export_as_string(self):
        """Return the formatted CREATE TYPE statement terminated by ';'."""
        return self.as_cql_query(formatted=True) + ';'
839
840
class Aggregate(object):
    """
    A user defined aggregate function, as created by ``CREATE AGGREGATE`` statements.

    Aggregate functions were introduced in Cassandra 2.2.

    .. versionadded:: 2.6.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this aggregate is defined
    """

    name = None
    """
    The name of this aggregate
    """

    argument_types = None
    """
    An ordered list of the types for each argument to the aggregate
    """

    final_func = None
    """
    Name of a final function
    """

    initial_condition = None
    """
    Initial condition of the aggregate
    """

    return_type = None
    """
    Return type of the aggregate
    """

    state_func = None
    """
    Name of a state function
    """

    state_type = None
    """
    Type of the aggregate state
    """

    def __init__(self, keyspace, name, argument_types, state_func,
                 state_type, final_func, initial_condition, return_type):
        self.keyspace, self.name = keyspace, name
        self.argument_types = argument_types
        self.state_func, self.state_type = state_func, state_type
        self.final_func = final_func
        self.initial_condition = initial_condition
        self.return_type = return_type

    def as_cql_query(self, formatted=False):
        """
        Build the ``CREATE AGGREGATE`` statement that recreates this aggregate.

        The ``FINALFUNC`` clause is emitted only when ``final_func`` is truthy,
        and ``INITCOND`` only when ``initial_condition`` is not ``None``.

        :param formatted: when :const:`True`, each clause starts on a new,
            indented line.
        :returns: the CQL statement, without a trailing semicolon.
        """
        sep = '\n    ' if formatted else ' '

        clauses = [
            "CREATE AGGREGATE %s.%s(%s)" % (
                protect_name(self.keyspace),
                protect_name(self.name),
                ', '.join(self.argument_types)),
            "SFUNC %s" % (protect_name(self.state_func),),
            "STYPE %s" % (self.state_type,),
        ]
        if self.final_func:
            clauses.append('FINALFUNC ' + protect_name(self.final_func))
        if self.initial_condition is not None:
            clauses.append('INITCOND ' + self.initial_condition)

        return sep.join(clauses)

    def export_as_string(self):
        """Return the formatted ``CREATE AGGREGATE`` statement terminated by ``;``."""
        return self.as_cql_query(formatted=True) + ";"

    @property
    def signature(self):
        """The aggregate's signature built from its name and argument types."""
        return SignatureDescriptor.format_signature(self.name, self.argument_types)
929
930
class Function(object):
    """
    A user defined function, as created by ``CREATE FUNCTION`` statements.

    User-defined functions were introduced in Cassandra 2.2

    .. versionadded:: 2.6.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this function is defined
    """

    name = None
    """
    The name of this function
    """

    argument_types = None
    """
    An ordered list of the types for each argument to the function
    """

    argument_names = None
    """
    An ordered list of the names of each argument to the function
    """

    return_type = None
    """
    Return type of the function
    """

    language = None
    """
    Language of the function body
    """

    body = None
    """
    Function body string
    """

    called_on_null_input = None
    """
    Flag indicating whether this function should be called for rows with null values
    (convenience function to avoid handling nulls explicitly if the result will just be null)
    """

    def __init__(self, keyspace, name, argument_types, argument_names,
                 return_type, language, body, called_on_null_input):
        self.keyspace, self.name = keyspace, name
        # argument_types (frozen<list<>>) is always a list, but argument_names
        # is not frozen before C* 3.0 and may arrive as None.
        self.argument_types = argument_types
        self.argument_names = argument_names if argument_names else []
        self.return_type = return_type
        self.language = language
        self.body = body
        self.called_on_null_input = called_on_null_input

    def as_cql_query(self, formatted=False):
        """
        Build the ``CREATE FUNCTION`` statement that recreates this function.

        :param formatted: when :const:`True`, each clause starts on a new,
            indented line.
        :returns: the CQL statement, without a trailing semicolon.
        """
        sep = '\n    ' if formatted else ' '
        qualified_name = "%s.%s" % (protect_name(self.keyspace), protect_name(self.name))
        params = ', '.join(
            "%s %s" % (protect_name(arg_name), arg_type)
            for arg_name, arg_type in zip(self.argument_names, self.argument_types))
        null_clause = "CALLED" if self.called_on_null_input else "RETURNS NULL"

        return sep.join((
            "CREATE FUNCTION %s(%s)" % (qualified_name, params),
            "%s ON NULL INPUT" % (null_clause,),
            "RETURNS %s" % (self.return_type,),
            "LANGUAGE %s" % (self.language,),
            "AS $$%s$$" % (self.body,),
        ))

    def export_as_string(self):
        """Return the formatted ``CREATE FUNCTION`` statement terminated by ``;``."""
        return self.as_cql_query(formatted=True) + ";"

    @property
    def signature(self):
        """The function's signature built from its name and argument types."""
        return SignatureDescriptor.format_signature(self.name, self.argument_types)
1022
1023
class TableMetadata(object):
    """
    A representation of the schema for a single table.
    """

    keyspace_name = None
    """ String name of this Table's keyspace """

    name = None
    """ The string name of the table. """

    partition_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns in
    the partition key for this table.  This will always hold at least one
    column.
    """

    clustering_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns
    in the clustering key for this table.  These are all of the
    :attr:`.primary_key` columns that are not in the :attr:`.partition_key`.

    Note that a table may have no clustering keys, in which case this will
    be an empty list.
    """

    @property
    def primary_key(self):
        """
        A list of :class:`.ColumnMetadata` representing the components of
        the primary key for this table.
        """
        return self.partition_key + self.clustering_key

    columns = None
    """
    A dict mapping column names to :class:`.ColumnMetadata` instances.
    """

    indexes = None
    """
    A dict mapping index names to :class:`.IndexMetadata` instances.
    """

    # When True, "COMPACT STORAGE" is emitted among the table properties.
    is_compact_storage = False

    options = None
    """
    A dict mapping table option names to their specific settings for this
    table.
    """

    # Legacy compaction option names. Only the keys are used in this class:
    # _make_option_strings drops them from the rendered options.
    compaction_options = {
        "min_compaction_threshold": "min_threshold",
        "max_compaction_threshold": "max_threshold",
        "compaction_strategy_class": "class"}

    triggers = None
    """
    A dict mapping trigger names to :class:`.TriggerMetadata` instances.
    """

    views = None
    """
    A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
    """

    _exc_info = None
    """ set if metadata parsing failed """

    virtual = False
    """
    A boolean indicating if this is a virtual table or not. Always ``False``
    for clusters running pre-4.0 versions of Cassandra.

    .. versionadded:: 3.15
    """

    @property
    def is_cql_compatible(self):
        """
        A boolean indicating if this table can be represented as CQL in export.

        Always ``False`` for virtual tables. Otherwise ``False`` only for
        compact-storage tables that have clustering columns and more than
        one column beyond the primary key.
        """
        if self.virtual:
            return False
        comparator = getattr(self, 'comparator', None)
        if comparator:
            # no compact storage with more than one column beyond PK if there
            # are clustering columns
            incompatible = (self.is_compact_storage and
                            len(self.columns) > len(self.primary_key) + 1 and
                            len(self.clustering_key) >= 1)

            return not incompatible
        return True

    extensions = None
    """
    Metadata describing configuration for table extensions
    """

    def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None, virtual=False):
        self.keyspace_name = keyspace_name
        self.name = name
        self.partition_key = [] if partition_key is None else partition_key
        self.clustering_key = [] if clustering_key is None else clustering_key
        self.columns = OrderedDict() if columns is None else columns
        self.indexes = {}
        self.options = {} if options is None else options
        self.comparator = None
        self.triggers = OrderedDict() if triggers is None else triggers
        self.views = {}
        self.virtual = virtual

    def export_as_string(self):
        """
        Returns a string of CQL queries that can be used to recreate this table
        along with all indexes on it.  The returned string is formatted to
        be human readable.

        Tables whose metadata failed to parse, virtual tables and tables that
        cannot be expressed in CQL are returned inside a ``/* ... */`` comment
        with an explanatory warning.
        """
        if self._exc_info:
            import traceback
            ret = "/*\nWarning: Table %s.%s is incomplete because of an error processing metadata.\n" % \
                  (self.keyspace_name, self.name)
            for line in traceback.format_exception(*self._exc_info):
                ret += line
            ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
        elif self.virtual:
            # Must be checked before is_cql_compatible: that property is always
            # False for virtual tables, so testing it first made this branch
            # unreachable and mislabeled virtual tables as legacy-API tables.
            ret = ('/*\nWarning: Table {ks}.{tab} is a virtual table and cannot be recreated with CQL.\n'
                   'Structure, for reference:\n'
                   '{cql}\n*/').format(ks=self.keyspace_name, tab=self.name, cql=self._all_as_cql())
        elif not self.is_cql_compatible:
            # If we can't produce this table with CQL, comment inline
            ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \
                  (self.keyspace_name, self.name)
            ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
        else:
            ret = self._all_as_cql()

        return ret

    def _all_as_cql(self):
        """
        Return the table DDL followed by its indexes, triggers, views and any
        CQL contributed by registered table extensions.
        """
        ret = self.as_cql_query(formatted=True)
        ret += ";"

        for index in self.indexes.values():
            ret += "\n%s;" % index.as_cql_query()

        for trigger_meta in self.triggers.values():
            ret += "\n%s;" % (trigger_meta.as_cql_query(),)

        for view_meta in self.views.values():
            ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),)

        if self.extensions:
            registry = _RegisteredExtensionType._extension_registry
            for k in six.viewkeys(registry) & self.extensions:  # no viewkeys on OrderedMapSerializeKey
                ext = registry[k]
                cql = ext.after_table_cql(self, k, self.extensions[k])
                if cql:
                    ret += "\n\n%s" % (cql,)

        return ret

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this table (index
        creations are not included).  If `formatted` is set to :const:`True`,
        extra whitespace will be added to make the query human readable.
        """
        ret = "%s TABLE %s.%s (%s" % (
            ('VIRTUAL' if self.virtual else 'CREATE'),
            protect_name(self.keyspace_name),
            protect_name(self.name),
            "\n" if formatted else "")

        if formatted:
            column_join = ",\n"
            padding = "    "
        else:
            column_join = ", "
            padding = ""

        columns = []
        for col in self.columns.values():
            columns.append("%s %s%s" % (protect_name(col.name), col.cql_type, ' static' if col.is_static else ''))

        # A single-column primary key is declared inline on the first column;
        # this assumes self.columns lists the partition key column first.
        if len(self.partition_key) == 1 and not self.clustering_key:
            columns[0] += " PRIMARY KEY"

        ret += column_join.join("%s%s" % (padding, col) for col in columns)

        # primary key
        if len(self.partition_key) > 1 or self.clustering_key:
            ret += "%s%sPRIMARY KEY (" % (column_join, padding)

            if len(self.partition_key) > 1:
                ret += "(%s)" % ", ".join(protect_name(col.name) for col in self.partition_key)
            else:
                ret += protect_name(self.partition_key[0].name)

            if self.clustering_key:
                ret += ", %s" % ", ".join(protect_name(col.name) for col in self.clustering_key)

            ret += ")"

        # properties
        ret += "%s) WITH " % ("\n" if formatted else "")
        ret += self._property_string(formatted, self.clustering_key, self.options, self.is_compact_storage)

        return ret

    @classmethod
    def _property_string(cls, formatted, clustering_key, options_map, is_compact_storage=False):
        """
        Render the ``WITH`` clause body: optional COMPACT STORAGE, the
        CLUSTERING ORDER (from each clustering column's ``is_reversed``) and
        the sorted option strings, joined by ``AND``.
        """
        properties = []
        if is_compact_storage:
            properties.append("COMPACT STORAGE")

        if clustering_key:
            cluster_str = "CLUSTERING ORDER BY "

            inner = []
            for col in clustering_key:
                ordering = "DESC" if col.is_reversed else "ASC"
                inner.append("%s %s" % (protect_name(col.name), ordering))

            cluster_str += "(%s)" % ", ".join(inner)
            properties.append(cluster_str)

        properties.extend(cls._make_option_strings(options_map))

        join_str = "\n    AND " if formatted else " AND "
        return join_str.join(properties)

    @classmethod
    def _make_option_strings(cls, options_map):
        """
        Convert a table options mapping into sorted ``name = value`` strings.

        The legacy JSON-encoded ``compaction_strategy_options`` and
        ``compression_parameters`` entries are expanded into ``compaction``
        and ``compression`` map literals; ``None``-valued options are skipped.
        """
        ret = []
        options_copy = dict(options_map.items())

        actual_options = json.loads(options_copy.pop('compaction_strategy_options', '{}'))
        value = options_copy.pop("compaction_strategy_class", None)
        actual_options.setdefault("class", value)

        compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()]
        ret.append('compaction = {%s}' % ', '.join(compaction_option_strings))

        for system_table_name in cls.compaction_options.keys():
            options_copy.pop(system_table_name, None)  # delete if present
        options_copy.pop('compaction_strategy_option', None)

        if not options_copy.get('compression'):
            params = json.loads(options_copy.pop('compression_parameters', '{}'))
            param_strings = ["'%s': '%s'" % (k, v) for k, v in params.items()]
            ret.append('compression = {%s}' % ', '.join(param_strings))

        for name, value in options_copy.items():
            if value is not None:
                if name == "comment":
                    value = value or ""
                ret.append("%s = %s" % (name, protect_value(value)))

        return list(sorted(ret))
1289
1290
class TableExtensionInterface(object):
    """
    Defines CQL/DDL for Cassandra table extensions.
    """
    # limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example
    @classmethod
    def after_table_cql(cls, table_meta, ext_key, ext_blob):
        """
        Called to produce CQL/DDL to follow the table definition.
        Should contain requisite terminating semicolon(s).

        The signature matches the table export call,
        ``ext.after_table_cql(table_meta, key, value)``; the previous
        ``(ext_key, ext_blob)`` signature made the default raise TypeError.

        :param table_meta: the table metadata being exported.
        :param ext_key: the extension name (key in the table's extensions map).
        :param ext_blob: the value stored under ``ext_key`` in the table's
            extensions map.
        :returns: a CQL string to append, or ``None`` to append nothing.
        """
        return None
1303
1304
1305class _RegisteredExtensionType(type):
1306
1307    _extension_registry = {}
1308
1309    def __new__(mcs, name, bases, dct):
1310        cls = super(_RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct)
1311        if name != 'RegisteredTableExtension':
1312            mcs._extension_registry[cls.name] = cls
1313        return cls
1314
1315
@six.add_metaclass(_RegisteredExtensionType)
class RegisteredTableExtension(TableExtensionInterface):
    """
    Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map).

    The metaclass registers every subclass under ``cls.name`` (this base
    class itself is skipped), so subclasses should set :attr:`name`; one
    that leaves it as ``None`` is registered under ``None``. When a table's
    ``extensions`` contain a registered key, that subclass's
    ``after_table_cql`` supplies CQL appended after the table export.
    """
    # The class name above must stay "RegisteredTableExtension": the metaclass
    # uses that exact name to avoid registering this base class.
    name = None
    """
    Name of the extension (key in the map)
    """
1325
1326
def protect_name(name):
    """Return ``name`` as a CQL identifier, double-quoting it when it is not a valid bare name."""
    escaped = maybe_escape_name(name)
    return escaped
1329
1330
def protect_names(names):
    """Apply :func:`protect_name` to every element of ``names`` and return a list."""
    return list(map(protect_name, names))
1333
1334
def protect_value(value):
    """
    Render a Python value as a CQL literal.

    ``None`` becomes ``NULL``. ints, floats and bools are rendered with
    ``str`` in lower case (so ``True`` becomes ``true``). Any other value is
    treated as a string: it is single-quoted, with embedded single quotes
    doubled.
    """
    if isinstance(value, (bool, int, float)):
        return str(value).lower()
    if value is not None:
        escaped = value.replace("'", "''")
        return "'%s'" % escaped
    return 'NULL'
1341
1342
# A bare (unquoted) CQL identifier: a lowercase ASCII letter followed by
# lowercase letters, digits or underscores.
valid_cql3_word_re = re.compile(r'^[a-z][0-9a-z_]*$')


def is_valid_name(name):
    """
    Return True if ``name`` can be used as an unquoted CQL identifier.

    ``None`` and reserved keywords (compared case-insensitively) are never
    valid.
    """
    if name is None or name.lower() in cql_keywords_reserved:
        return False
    return bool(valid_cql3_word_re.match(name))
1352
1353
def maybe_escape_name(name):
    """Return ``name`` unchanged if it is a valid bare CQL identifier, else its quoted form."""
    return name if is_valid_name(name) else escape_name(name)
1358
1359
def escape_name(name):
    """Wrap ``name`` in double quotes, doubling any embedded double quotes."""
    doubled = name.replace('"', '""')
    return '"' + doubled + '"'
1362
1363
class ColumnMetadata(object):
    """
    A representation of a single column in a table.
    """

    table = None
    """ The :class:`.TableMetadata` this column belongs to. """

    name = None
    """ The string name of this column. """

    cql_type = None
    """
    The CQL type for the column.
    """

    is_static = False
    """
    If this column is static (available in Cassandra 2.1+), this will
    be :const:`True`, otherwise :const:`False`.
    """

    is_reversed = False
    """
    If this column is reversed (DESC) as in clustering order
    """

    _cass_type = None

    def __init__(self, table_metadata, column_name, cql_type, is_static=False, is_reversed=False):
        self.table, self.name, self.cql_type = table_metadata, column_name, cql_type
        self.is_static, self.is_reversed = is_static, is_reversed

    def __str__(self):
        # "<name> <cql type>", e.g. "id int"
        name_and_type = (self.name, self.cql_type)
        return "%s %s" % name_and_type
1402
1403
class IndexMetadata(object):
    """
    A representation of a secondary index on a column.
    """
    keyspace_name = None
    """ A string name of the keyspace. """

    table_name = None
    """ A string name of the table this index is on. """

    name = None
    """ A string name for the index. """

    kind = None
    """ A string representing the kind of index (COMPOSITE, CUSTOM,...). """

    index_options = {}
    """ A dict of index options. """

    def __init__(self, keyspace_name, table_name, index_name, kind, index_options):
        self.keyspace_name, self.table_name = keyspace_name, table_name
        self.name = index_name
        self.kind = kind
        self.index_options = index_options

    def as_cql_query(self):
        """
        Returns a CQL query that can be used to recreate this index.

        ``index_options`` must contain ``"target"``. CUSTOM indexes must also
        contain ``"class_name"``, and any remaining options are rendered as a
        ``WITH OPTIONS`` map.
        """
        remaining = dict(self.index_options)
        target = remaining.pop("target")

        if self.kind == "CUSTOM":
            class_name = remaining.pop("class_name")
            statement = "CREATE CUSTOM INDEX %s ON %s.%s (%s) USING '%s'" % (
                protect_name(self.name),
                protect_name(self.keyspace_name),
                protect_name(self.table_name),
                target,
                class_name)
            if remaining:
                # PYTHON-1008: `statement` will always be a unicode
                encoded_opts = _encoder.cql_encode_all_types(remaining, as_text_type=True)
                statement += " WITH OPTIONS = %s" % encoded_opts
            return statement

        return "CREATE INDEX %s ON %s.%s (%s)" % (
            protect_name(self.name),
            protect_name(self.keyspace_name),
            protect_name(self.table_name),
            target)

    def export_as_string(self):
        """
        Returns a CQL query string that can be used to recreate this index.
        """
        return self.as_cql_query() + ";"
1461
1462
class TokenMap(object):
    """
    Information about the layout of the ring.
    """

    token_class = None
    """
    A subclass of :class:`.Token`, depending on what partitioner the cluster uses.
    """

    token_to_host_owner = None
    """
    A map of :class:`.Token` objects to the :class:`.Host` that owns that token.
    """

    tokens_to_hosts_by_ks = None
    """
    A map of keyspace names to a nested map of :class:`.Token` objects to
    sets of :class:`.Host` objects.
    """

    ring = None
    """
    An ordered list of :class:`.Token` instances in the ring.
    """

    _metadata = None

    def __init__(self, token_class, token_to_host_owner, all_tokens, metadata):
        self.token_class = token_class
        self.ring = all_tokens
        self.token_to_host_owner = token_to_host_owner

        self.tokens_to_hosts_by_ks = {}
        self._metadata = metadata
        # Serializes rebuilds of the per-keyspace replica maps.
        self._rebuild_lock = RLock()

    def rebuild_keyspace(self, keyspace, build_if_absent=False):
        """
        Recompute the replica map for ``keyspace`` while holding the rebuild lock.

        With ``build_if_absent=True`` a map is built only if none is cached
        yet. With ``build_if_absent=False`` only an already-cached map is
        refreshed. Nothing is stored when the keyspace is unknown to the
        metadata. On any unexpected error an empty map is cached and the
        error is logged, so queries are not broken by bad metadata state.
        """
        with self._rebuild_lock:
            try:
                current = self.tokens_to_hosts_by_ks.get(keyspace, None)
                if (build_if_absent and current is None) or (not build_if_absent and current is not None):
                    ks_meta = self._metadata.keyspaces.get(keyspace)
                    if ks_meta:
                        # Reuse the metadata fetched above. Looking the keyspace
                        # up again by name could raise KeyError if it was
                        # dropped in between.
                        replica_map = self.replica_map_for_keyspace(ks_meta)
                        self.tokens_to_hosts_by_ks[keyspace] = replica_map
            except Exception:
                # should not happen normally, but we don't want to blow up queries because of unexpected meta state
                # bypass until new map is generated
                self.tokens_to_hosts_by_ks[keyspace] = {}
                log.exception("Failed creating a token map for keyspace '%s' with %s. PLEASE REPORT THIS: https://datastax-oss.atlassian.net/projects/PYTHON", keyspace, self.token_to_host_owner)

    def replica_map_for_keyspace(self, ks_metadata):
        """
        Build the token-to-replicas map for ``ks_metadata`` with its replication
        strategy, or return ``None`` if the keyspace has no strategy.
        """
        strategy = ks_metadata.replication_strategy
        if strategy:
            return strategy.make_token_replica_map(self.token_to_host_owner, self.ring)
        else:
            return None

    def remove_keyspace(self, keyspace):
        """Forget the cached replica map for ``keyspace``, if any."""
        self.tokens_to_hosts_by_ks.pop(keyspace, None)

    def get_replicas(self, keyspace, token):
        """
        Get the replica :class:`.Host` instances responsible for ``token`` in
        ``keyspace``.

        The keyspace's replica map is built on first use. Returns an empty
        list when no replica map is available.
        """
        tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)
        if tokens_to_hosts is None:
            self.rebuild_keyspace(keyspace, build_if_absent=True)
            tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)

        if tokens_to_hosts:
            # The values in self.ring correspond to the end of the
            # token range up to and including the value listed.
            point = bisect_left(self.ring, token)
            if point == len(self.ring):
                # Past the last ring token: wrap around to the first range.
                return tokens_to_hosts[self.ring[0]]
            else:
                return tokens_to_hosts[self.ring[point]]
        return []
1544
1545
@total_ordering
class Token(object):
    """
    Abstract class representing a token.

    Tokens compare, order and hash by their ``value``. Comparing with an
    object that has no ``value`` attribute returns ``NotImplemented``, so
    ``token == None`` is ``False`` instead of raising AttributeError.
    """

    def __init__(self, token):
        self.value = token

    @classmethod
    def hash_fn(cls, key):
        """Map a partition key to a token value; the base class returns the key unchanged."""
        return key

    @classmethod
    def from_key(cls, key):
        """Create a token of this class from a partition key via :meth:`hash_fn`."""
        return cls(cls.hash_fn(key))

    @classmethod
    def from_string(cls, token_string):
        """Parse the server's string form of a token. Implemented by subclasses."""
        raise NotImplementedError()

    def __eq__(self, other):
        try:
            other_value = other.value
        except AttributeError:
            return NotImplemented
        return self.value == other_value

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, and total_ordering does not
        # supply __ne__. Without this, two equal tokens compared unequal on py2.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        try:
            other_value = other.value
        except AttributeError:
            return NotImplemented
        return self.value < other_value

    def __hash__(self):
        return hash(self.value)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.value)
    __str__ = __repr__
1579
1580
# Bounds of a signed 64-bit integer.
MIN_LONG = -(1 << 63)
MAX_LONG = (1 << 63) - 1
1583
1584
class NoMurmur3(Exception):
    """Raised when a Murmur3 hash is requested but the murmur3 extension could not be imported."""
1587
1588
class HashToken(Token):
    """Base class for tokens whose server string form is a decimal integer."""

    @classmethod
    def from_string(cls, token_string):
        """ `token_string` should be the string representation from the server. """
        # The hash partitioners store the token as a plain decimal value.
        numeric_value = int(token_string)
        return cls(numeric_value)
1596
1597
class Murmur3Token(HashToken):
    """
    A token for ``Murmur3Partitioner``.
    """

    @classmethod
    def hash_fn(cls, key):
        """
        Hash ``key`` with Murmur3. A result of MIN_LONG is mapped to MAX_LONG.

        :raises NoMurmur3: if the murmur3 extension is unavailable.
        """
        if murmur3 is None:
            raise NoMurmur3()
        hashed = int(murmur3(key))
        # NOTE(review): presumably mirrors the server's handling of the minimum
        # long value, which is excluded as a token -- confirm.
        return MAX_LONG if hashed == MIN_LONG else hashed

    def __init__(self, token):
        """ `token` is an int or string representing the token. """
        self.value = int(token)
1614
1615
class MD5Token(HashToken):
    """
    A token for ``RandomPartitioner``.
    """

    @classmethod
    def hash_fn(cls, key):
        """Return the absolute value of the MD5 digest of ``key`` read as a varint."""
        raw = key.encode('UTF-8') if isinstance(key, six.text_type) else key
        digest = md5(raw).digest()
        return abs(varint_unpack(digest))
1626
1627
class BytesToken(Token):
    """
    A token for ``ByteOrderedPartitioner``.
    """

    @classmethod
    def from_string(cls, token_string):
        """ `token_string` should be the string representation from the server. """
        # The BOP stores a hex string. unhexlify accepts unicode input everywhere
        # except pypy3, where it raises "TypeError: 'str' does not support the
        # buffer interface", so text is first encoded to ASCII bytes.
        hex_bytes = token_string.encode('ascii') if isinstance(token_string, six.text_type) else token_string
        return cls(unhexlify(hex_bytes))
1641
1642
class TriggerMetadata(object):
    """
    A representation of a trigger for a table.
    """

    table = None
    """ The :class:`.TableMetadata` this trigger belongs to. """

    name = None
    """ The string name of this trigger. """

    options = None
    """
    A dict mapping trigger option names to their specific settings for this
    table.
    """
    def __init__(self, table_metadata, trigger_name, options=None):
        self.table, self.name, self.options = table_metadata, trigger_name, options

    def as_cql_query(self):
        """
        Build the ``CREATE TRIGGER`` statement for this trigger.

        ``options`` must contain a ``'class'`` entry naming the trigger class.
        """
        owner = self.table
        return "CREATE TRIGGER %s ON %s.%s USING %s" % (
            protect_name(self.name),
            protect_name(owner.keyspace_name),
            protect_name(owner.name),
            protect_value(self.options['class']),
        )

    def export_as_string(self):
        """Return the ``CREATE TRIGGER`` statement terminated by ``;``."""
        return self.as_cql_query() + ";"
1675
1676
1677class _SchemaParser(object):
1678
1679    def __init__(self, connection, timeout):
1680        self.connection = connection
1681        self.timeout = timeout
1682
1683    def _handle_results(self, success, result, expected_failures=tuple()):
1684        """
1685        Given a bool and a ResultSet (the form returned per result from
1686        Connection.wait_for_responses), return a dictionary containing the
1687        results. Used to process results from asynchronous queries to system
1688        tables.
1689
1690        ``expected_failures`` will usually be used to allow callers to ignore
1691        ``InvalidRequest`` errors caused by a missing system keyspace. For
1692        example, some DSE versions report a 4.X server version, but do not have
1693        virtual tables. Thus, running against 4.X servers, SchemaParserV4 uses
1694        expected_failures to make a best-effort attempt to read those
1695        keyspaces, but treat them as empty if they're not found.
1696
1697        :param success: A boolean representing whether or not the query
1698        succeeded
1699        :param result: The resultset in question.
1700        :expected_failures: An Exception class or an iterable thereof. If the
1701        query failed, but raised an instance of an expected failure class, this
1702        will ignore the failure and return an empty list.
1703        """
1704        if not success and isinstance(result, expected_failures):
1705            return []
1706        elif success:
1707            return dict_factory(*result.results) if result else []
1708        else:
1709            raise result
1710
1711    def _query_build_row(self, query_string, build_func):
1712        result = self._query_build_rows(query_string, build_func)
1713        return result[0] if result else None
1714
1715    def _query_build_rows(self, query_string, build_func):
1716        query = QueryMessage(query=query_string, consistency_level=ConsistencyLevel.ONE)
1717        responses = self.connection.wait_for_responses((query), timeout=self.timeout, fail_on_error=False)
1718        (success, response) = responses[0]
1719        if success:
1720            result = dict_factory(*response.results)
1721            return [build_func(row) for row in result]
1722        elif isinstance(response, InvalidRequest):
1723            log.debug("user types table not found")
1724            return []
1725        else:
1726            raise response
1727
1728
class SchemaParserV22(_SchemaParser):
    """
    Schema parser that reads metadata from the ``system.schema_*`` tables
    named in the ``_SELECT_*`` statements below.

    :meth:`get_all_keyspaces` issues every ``_SELECT_*`` query in one batch
    (:meth:`_query_all`), groups the rows per keyspace/table
    (:meth:`_aggregate_results`) and builds the metadata model. The other
    ``get_*`` methods fetch and build a single schema element.
    """
    _SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces"
    _SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies"
    _SELECT_COLUMNS = "SELECT * FROM system.schema_columns"
    _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers"
    _SELECT_TYPES = "SELECT * FROM system.schema_usertypes"
    _SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions"
    _SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates"

    # Column holding the table name in the schema tables queried above;
    # subclasses reading a different schema layout override it.
    _table_name_col = 'columnfamily_name'

    # Column identifying a function's/aggregate's argument types. The
    # attribute name (including its spelling) is kept because subclasses
    # override it.
    _function_agg_arument_type_col = 'signature'

    # Table-row columns copied into TableMetadata.options by
    # _build_table_options().
    recognized_table_options = (
        "comment",
        "read_repair_chance",
        "dclocal_read_repair_chance",  # kept to be safe, but see _build_table_options()
        "local_read_repair_chance",
        "replicate_on_write",
        "gc_grace_seconds",
        "bloom_filter_fp_chance",
        "caching",
        "compaction_strategy_class",
        "compaction_strategy_options",
        "min_compaction_threshold",
        "max_compaction_threshold",
        "compression_parameters",
        "min_index_interval",
        "max_index_interval",
        "index_interval",
        "speculative_retry",
        "rows_per_partition_to_cache",
        "memtable_flush_period_in_ms",
        "populate_io_cache_on_flush",
        "compression",
        "default_time_to_live")

    def __init__(self, connection, timeout):
        super(SchemaParserV22, self).__init__(connection, timeout)
        # Raw rows returned by _query_all, one list per schema table.
        self.keyspaces_result = []
        self.tables_result = []
        self.columns_result = []
        self.triggers_result = []
        self.types_result = []
        self.functions_result = []
        self.aggregates_result = []

        # The same rows grouped by keyspace name (and by table name for the
        # nested maps); filled in by _aggregate_results.
        self.keyspace_table_rows = defaultdict(list)
        self.keyspace_table_col_rows = defaultdict(lambda: defaultdict(list))
        self.keyspace_type_rows = defaultdict(list)
        self.keyspace_func_rows = defaultdict(list)
        self.keyspace_agg_rows = defaultdict(list)
        self.keyspace_table_trigger_rows = defaultdict(lambda: defaultdict(list))

    def get_all_keyspaces(self):
        """
        Query every schema table and yield one ``KeyspaceMetadata`` per
        keyspace row, with its tables, user types, functions and aggregates
        attached.

        If building any of those children raises, the error is logged and
        stored on the keyspace's ``_exc_info``. The partially populated
        keyspace is still yielded.
        """
        self._query_all()

        for row in self.keyspaces_result:
            keyspace_meta = self._build_keyspace_metadata(row)
            ks_name = keyspace_meta.name

            try:
                for table_row in self.keyspace_table_rows.get(ks_name, []):
                    table_meta = self._build_table_metadata(table_row)
                    keyspace_meta._add_table_metadata(table_meta)

                for usertype_row in self.keyspace_type_rows.get(ks_name, []):
                    usertype = self._build_user_type(usertype_row)
                    keyspace_meta.user_types[usertype.name] = usertype

                for fn_row in self.keyspace_func_rows.get(ks_name, []):
                    fn = self._build_function(fn_row)
                    keyspace_meta.functions[fn.signature] = fn

                for agg_row in self.keyspace_agg_rows.get(ks_name, []):
                    agg = self._build_aggregate(agg_row)
                    keyspace_meta.aggregates[agg.signature] = agg
            except Exception:
                log.exception("Error while parsing metadata for keyspace %s. Metadata model will be incomplete.", keyspace_meta.name)
                keyspace_meta._exc_info = sys.exc_info()

            yield keyspace_meta

    def get_table(self, keyspaces, keyspace, table):
        """
        Fetch and build the metadata of a single table.

        ``keyspaces`` is not used by this parser.

        :return: the built ``TableMetadata``, or ``None`` if no matching table
            row exists.
        """
        cl = ConsistencyLevel.ONE
        where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col,), (keyspace, table), _encoder)
        cf_query = QueryMessage(query=self._SELECT_COLUMN_FAMILIES + where_clause, consistency_level=cl)
        col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl)
        triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl)
        (cf_success, cf_result), (col_success, col_result), (triggers_success, triggers_result) \
            = self.connection.wait_for_responses(cf_query, col_query, triggers_query, timeout=self.timeout, fail_on_error=False)
        table_result = self._handle_results(cf_success, cf_result)
        col_result = self._handle_results(col_success, col_result)

        # the triggers table doesn't exist in C* 1.2
        triggers_result = self._handle_results(triggers_success, triggers_result,
                                               expected_failures=InvalidRequest)

        if table_result:
            return self._build_table_metadata(table_result[0], col_result, triggers_result)
        return None

    def get_type(self, keyspaces, keyspace, type):
        """Fetch and build one user type, or return ``None`` if it is not found."""
        where_clause = bind_params(" WHERE keyspace_name = %s AND type_name = %s", (keyspace, type), _encoder)
        return self._query_build_row(self._SELECT_TYPES + where_clause, self._build_user_type)

    def get_types_map(self, keyspaces, keyspace):
        """Fetch all user types of ``keyspace`` as a ``{type name: UserType}`` dict."""
        where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder)
        # named user_types so the module-level ``types`` (cqltypes) is not shadowed
        user_types = self._query_build_rows(self._SELECT_TYPES + where_clause, self._build_user_type)
        return dict((t.name, t) for t in user_types)

    def get_function(self, keyspaces, keyspace, function):
        """
        Fetch and build one function, matched by keyspace, name and argument
        types; return ``None`` if it is not found.
        """
        where_clause = bind_params(" WHERE keyspace_name = %%s AND function_name = %%s AND %s = %%s" % (self._function_agg_arument_type_col,),
                                   (keyspace, function.name, function.argument_types), _encoder)
        return self._query_build_row(self._SELECT_FUNCTIONS + where_clause, self._build_function)

    def get_aggregate(self, keyspaces, keyspace, aggregate):
        """
        Fetch and build one aggregate, matched by keyspace, name and argument
        types; return ``None`` if it is not found.
        """
        where_clause = bind_params(" WHERE keyspace_name = %%s AND aggregate_name = %%s AND %s = %%s" % (self._function_agg_arument_type_col,),
                                   (keyspace, aggregate.name, aggregate.argument_types), _encoder)

        return self._query_build_row(self._SELECT_AGGREGATES + where_clause, self._build_aggregate)

    def get_keyspace(self, keyspaces, keyspace):
        """Fetch and build one keyspace (without its children), or ``None``."""
        where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder)
        return self._query_build_row(self._SELECT_KEYSPACES + where_clause, self._build_keyspace_metadata)

    @classmethod
    def _build_keyspace_metadata(cls, row):
        """
        Build ``KeyspaceMetadata`` from a keyspace row.

        If parsing fails, return a placeholder keyspace (strategy
        ``'UNKNOWN'``) carrying the error in ``_exc_info`` instead of raising.
        """
        try:
            ksm = cls._build_keyspace_metadata_internal(row)
        except Exception:
            name = row["keyspace_name"]
            ksm = KeyspaceMetadata(name, False, 'UNKNOWN', {})
            ksm._exc_info = sys.exc_info()  # capture exc_info before log because nose (test) logging clears it in certain circumstances
            log.exception("Error while parsing metadata for keyspace %s row(%s)", name, row)
        return ksm

    @staticmethod
    def _build_keyspace_metadata_internal(row):
        """Build ``KeyspaceMetadata``; ``strategy_options`` is stored as a JSON string."""
        name = row["keyspace_name"]
        durable_writes = row["durable_writes"]
        strategy_class = row["strategy_class"]
        strategy_options = json.loads(row["strategy_options"])
        return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)

    @classmethod
    def _build_user_type(cls, usertype_row):
        """Build a ``UserType``, converting each field type string to its CQL form."""
        field_types = [cls._schema_type_to_cql(t) for t in usertype_row['field_types']]
        return UserType(usertype_row['keyspace_name'], usertype_row['type_name'],
                        usertype_row['field_names'], field_types)

    @classmethod
    def _build_function(cls, function_row):
        """Build a ``Function`` from a functions-table row."""
        return_type = cls._schema_type_to_cql(function_row['return_type'])
        return Function(function_row['keyspace_name'], function_row['function_name'],
                        function_row[cls._function_agg_arument_type_col], function_row['argument_names'],
                        return_type, function_row['language'], function_row['body'],
                        function_row['called_on_null_input'])

    @classmethod
    def _build_aggregate(cls, aggregate_row):
        """
        Build an ``Aggregate`` from an aggregates-table row.

        A non-null ``initcond`` is deserialized with the state type
        (protocol version 3) and re-encoded as a CQL literal.
        """
        cass_state_type = types.lookup_casstype(aggregate_row['state_type'])
        initial_condition = aggregate_row['initcond']
        if initial_condition is not None:
            initial_condition = _encoder.cql_encode_all_types(cass_state_type.deserialize(initial_condition, 3))
        state_type = _cql_from_cass_type(cass_state_type)
        return_type = cls._schema_type_to_cql(aggregate_row['return_type'])
        return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'],
                         aggregate_row['signature'], aggregate_row['state_func'], state_type,
                         aggregate_row['final_func'], initial_condition, return_type)

    def _build_table_metadata(self, row, col_rows=None, trigger_rows=None):
        """
        Build a ``TableMetadata`` from a table (column family) row.

        :param row: the table's schema row.
        :param col_rows: the table's column rows. When empty or ``None``, the
            rows grouped by :meth:`_aggregate_results` are used.
        :param trigger_rows: the table's trigger rows, with the same fallback.
        :return: the ``TableMetadata``. Parsing errors are logged and stored on
            its ``_exc_info`` rather than raised.
        """
        keyspace_name = row["keyspace_name"]
        cfname = row[self._table_name_col]

        col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][cfname]
        trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][cfname]

        if not col_rows:  # CASSANDRA-8487
            log.warning("Building table metadata with no column meta for %s.%s",
                        keyspace_name, cfname)

        table_meta = TableMetadata(keyspace_name, cfname)

        try:
            comparator = types.lookup_casstype(row["comparator"])
            table_meta.comparator = comparator

            is_dct_comparator = issubclass(comparator, types.DynamicCompositeType)
            is_composite_comparator = issubclass(comparator, types.CompositeType)
            column_name_types = comparator.subtypes if is_composite_comparator else (comparator,)

            num_column_name_components = len(column_name_types)
            last_col = column_name_types[-1]

            column_aliases = row.get("column_aliases", None)

            clustering_rows = [r for r in col_rows
                               if r.get('type', None) == "clustering_key"]
            if len(clustering_rows) > 1:
                clustering_rows = sorted(clustering_rows, key=lambda r: r.get('component_index'))

            if column_aliases is not None:
                column_aliases = json.loads(column_aliases)

            if not column_aliases:  # column_aliases absent, JSON null or empty (PYTHON-562)
                column_aliases = [r.get('column_name') for r in clustering_rows]

            if is_composite_comparator:
                if issubclass(last_col, types.ColumnToCollectionType):
                    # collections
                    is_compact = False
                    has_value = False
                    clustering_size = num_column_name_components - 2
                elif (len(column_aliases) == num_column_name_components - 1 and
                      issubclass(last_col, types.UTF8Type)):
                    # aliases?
                    is_compact = False
                    has_value = False
                    clustering_size = num_column_name_components - 1
                else:
                    # compact table
                    is_compact = True
                    has_value = column_aliases or not col_rows
                    clustering_size = num_column_name_components

                    # Some thrift tables define names in composite types (see PYTHON-192).
                    # Build a real list: ``filter`` is a lazy iterator on Python 3 and
                    # column_aliases is passed to len() below.
                    if not column_aliases and hasattr(comparator, 'fieldnames'):
                        column_aliases = [name for name in comparator.fieldnames if name]
            else:
                is_compact = True
                if column_aliases or not col_rows or is_dct_comparator:
                    has_value = True
                    clustering_size = num_column_name_components
                else:
                    has_value = False
                    clustering_size = 0

            # partition key
            partition_rows = [r for r in col_rows
                              if r.get('type', None) == "partition_key"]

            if len(partition_rows) > 1:
                partition_rows = sorted(partition_rows, key=lambda r: r.get('component_index'))

            key_aliases = row.get("key_aliases")
            if key_aliases is not None:
                key_aliases = json.loads(key_aliases) if key_aliases else []
            else:
                # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it.
                key_aliases = [r.get('column_name') for r in partition_rows]

            key_validator = row.get("key_validator")
            if key_validator is not None:
                key_type = types.lookup_casstype(key_validator)
                key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type]
            else:
                key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows]

            for i, col_type in enumerate(key_types):
                # unnamed partition key components get generated names
                if len(key_aliases) > i:
                    column_name = key_aliases[i]
                elif i == 0:
                    column_name = "key"
                else:
                    column_name = "key%d" % i

                col = ColumnMetadata(table_meta, column_name, col_type.cql_parameterized_type())
                table_meta.columns[column_name] = col
                table_meta.partition_key.append(col)

            # clustering key
            for i in range(clustering_size):
                if len(column_aliases) > i:
                    column_name = column_aliases[i]
                else:
                    column_name = "column%d" % (i + 1)

                data_type = column_name_types[i]
                cql_type = _cql_from_cass_type(data_type)
                is_reversed = types.is_reversed_casstype(data_type)
                col = ColumnMetadata(table_meta, column_name, cql_type, is_reversed=is_reversed)
                table_meta.columns[column_name] = col
                table_meta.clustering_key.append(col)

            # value alias (if present)
            if has_value:
                value_alias_rows = [r for r in col_rows
                                    if r.get('type', None) == "compact_value"]

                if not key_aliases:  # TODO are we checking the right thing here?
                    value_alias = "value"
                else:
                    value_alias = row.get("value_alias", None)
                    if value_alias is None and value_alias_rows:  # CASSANDRA-8487
                        # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it.
                        value_alias = value_alias_rows[0].get('column_name')

                default_validator = row.get("default_validator")
                if default_validator:
                    validator = types.lookup_casstype(default_validator)
                elif value_alias_rows:  # CASSANDRA-8487
                    validator = types.lookup_casstype(value_alias_rows[0].get('validator'))
                else:
                    # No source for the value type: skip the value column rather
                    # than aborting the rest of the table's parsing.
                    validator = None

                if validator is not None:
                    cql_type = _cql_from_cass_type(validator)
                    col = ColumnMetadata(table_meta, value_alias, cql_type)
                    if value_alias:  # CASSANDRA-8487
                        table_meta.columns[value_alias] = col
                elif value_alias:
                    log.warning("No type information for value column %s of %s.%s; omitting it from table metadata",
                                value_alias, keyspace_name, cfname)

            # other normal columns
            for col_row in col_rows:
                column_meta = self._build_column_metadata(table_meta, col_row)
                if column_meta.name is not None:
                    table_meta.columns[column_meta.name] = column_meta
                    index_meta = self._build_index_metadata(column_meta, col_row)
                    if index_meta:
                        table_meta.indexes[index_meta.name] = index_meta

            for trigger_row in trigger_rows:
                trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
                table_meta.triggers[trigger_meta.name] = trigger_meta

            table_meta.options = self._build_table_options(row)
            table_meta.is_compact_storage = is_compact
        except Exception:
            table_meta._exc_info = sys.exc_info()
            log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, cfname, row, col_rows)

        return table_meta

    def _build_table_options(self, row):
        """ Setup the mostly-non-schema table options, like caching settings """
        options = dict((o, row.get(o)) for o in self.recognized_table_options if o in row)

        # the option name when creating tables is "dclocal_read_repair_chance",
        # but the column name in system.schema_columnfamilies is
        # "local_read_repair_chance".  We'll store this as dclocal_read_repair_chance,
        # since that's probably what users are expecting (and we need it for the
        # CREATE TABLE statement anyway).
        if "local_read_repair_chance" in options:
            val = options.pop("local_read_repair_chance")
            options["dclocal_read_repair_chance"] = val

        return options

    @classmethod
    def _build_column_metadata(cls, table_metadata, row):
        """
        Build ``ColumnMetadata`` from a column row.

        The parsed Cassandra type is kept on ``_cass_type`` because
        :meth:`_build_index_metadata` inspects it.
        """
        name = row["column_name"]
        type_string = row["validator"]
        data_type = types.lookup_casstype(type_string)
        cql_type = _cql_from_cass_type(data_type)
        is_static = row.get("type", None) == "static"
        is_reversed = types.is_reversed_casstype(data_type)
        column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed)
        column_meta._cass_type = data_type
        return column_meta

    @staticmethod
    def _build_index_metadata(column_metadata, row):
        """
        Return ``IndexMetadata`` when the column row names an index (via
        ``index_name`` or ``index_type``), otherwise ``None``.

        A CQL ``target`` string (``col``, ``keys(col)`` or ``full(col)``) is
        derived from the index options and column type and stored in the
        options under ``'target'``.
        """
        index_name = row.get("index_name")
        kind = row.get("index_type")
        if index_name or kind:
            options = row.get("index_options")
            options = json.loads(options) if options else {}
            options = options or {}  # if the json parsed to None, init empty dict

            # generate a CQL index identity string
            target = protect_name(column_metadata.name)
            if kind != "CUSTOM":
                if "index_keys" in options:
                    target = 'keys(%s)' % (target,)
                elif "index_values" in options:
                    # don't use any "function" for collection values
                    pass
                else:
                    # it might be a "full" index on a frozen collection, but
                    # we need to check the data type to verify that, because
                    # there is no special index option for full-collection
                    # indexes.
                    data_type = column_metadata._cass_type
                    collection_types = ('map', 'set', 'list')
                    if data_type.typename == "frozen" and data_type.subtypes[0].typename in collection_types:
                        # no index option for full-collection index
                        target = 'full(%s)' % (target,)
            options['target'] = target
            return IndexMetadata(column_metadata.table.keyspace_name, column_metadata.table.name, index_name, kind, options)
        return None

    @staticmethod
    def _build_trigger_metadata(table_metadata, row):
        """Build ``TriggerMetadata`` from a trigger row."""
        name = row["trigger_name"]
        options = row["trigger_options"]
        trigger_meta = TriggerMetadata(table_metadata, name, options)
        return trigger_meta

    def _query_all(self):
        """
        Query every schema table in one round of requests and store the rows
        on the ``*_result`` attributes, then group them via
        :meth:`_aggregate_results`.

        Keyspace, table and column failures are raised. Tables that may be
        missing on older servers (triggers, user types, functions, aggregates)
        are logged and treated as empty on ``InvalidRequest``. An
        ``Unauthorized`` error is tolerated for triggers only.
        """
        cl = ConsistencyLevel.ONE
        queries = [
            QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
            QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl),
            QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
            QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
            QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
            QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
            QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl)
        ]

        ((ks_success, ks_result),
         (table_success, table_result),
         (col_success, col_result),
         (types_success, types_result),
         (functions_success, functions_result),
         (aggregates_success, aggregates_result),
         (triggers_success, triggers_result)) = (
             self.connection.wait_for_responses(*queries, timeout=self.timeout,
                                                fail_on_error=False)
        )

        self.keyspaces_result = self._handle_results(ks_success, ks_result)
        self.tables_result = self._handle_results(table_success, table_result)
        self.columns_result = self._handle_results(col_success, col_result)

        # if we're connected to Cassandra < 2.0, the triggers table will not exist
        if triggers_success:
            self.triggers_result = dict_factory(*triggers_result.results)
        else:
            if isinstance(triggers_result, InvalidRequest):
                log.debug("triggers table not found")
            elif isinstance(triggers_result, Unauthorized):
                log.warning("this version of Cassandra does not allow access to schema_triggers metadata with authorization enabled (CASSANDRA-7967); "
                            "The driver will operate normally, but will not reflect triggers in the local metadata model, or schema strings.")
            else:
                raise triggers_result

        # if we're connected to Cassandra < 2.1, the usertypes table will not exist
        if types_success:
            self.types_result = dict_factory(*types_result.results)
        else:
            if isinstance(types_result, InvalidRequest):
                log.debug("user types table not found")
                self.types_result = []  # same "no rows" shape as the other results
            else:
                raise types_result

        # functions were introduced in Cassandra 2.2
        if functions_success:
            self.functions_result = dict_factory(*functions_result.results)
        else:
            if isinstance(functions_result, InvalidRequest):
                log.debug("user functions table not found")
            else:
                raise functions_result

        # aggregates were introduced in Cassandra 2.2
        if aggregates_success:
            self.aggregates_result = dict_factory(*aggregates_result.results)
        else:
            if isinstance(aggregates_result, InvalidRequest):
                log.debug("user aggregates table not found")
            else:
                raise aggregates_result

        self._aggregate_results()

    def _aggregate_results(self):
        """
        Group the raw ``*_result`` rows by keyspace name (and by table name for
        columns and triggers) into the ``keyspace_*_rows`` maps.

        Rows are appended, so the maps are not reset by this method.
        """
        m = self.keyspace_table_rows
        for row in self.tables_result:
            m[row["keyspace_name"]].append(row)

        m = self.keyspace_table_col_rows
        for row in self.columns_result:
            ksname = row["keyspace_name"]
            cfname = row[self._table_name_col]
            m[ksname][cfname].append(row)

        m = self.keyspace_type_rows
        for row in self.types_result:
            m[row["keyspace_name"]].append(row)

        m = self.keyspace_func_rows
        for row in self.functions_result:
            m[row["keyspace_name"]].append(row)

        m = self.keyspace_agg_rows
        for row in self.aggregates_result:
            m[row["keyspace_name"]].append(row)

        m = self.keyspace_table_trigger_rows
        for row in self.triggers_result:
            ksname = row["keyspace_name"]
            cfname = row[self._table_name_col]
            m[ksname][cfname].append(row)

    @staticmethod
    def _schema_type_to_cql(type_string):
        """Convert a Cassandra type string from a schema row to its CQL type."""
        cass_type = types.lookup_casstype(type_string)
        return _cql_from_cass_type(cass_type)
2224
2225
2226class SchemaParserV3(SchemaParserV22):
2227    _SELECT_KEYSPACES = "SELECT * FROM system_schema.keyspaces"
2228    _SELECT_TABLES = "SELECT * FROM system_schema.tables"
2229    _SELECT_COLUMNS = "SELECT * FROM system_schema.columns"
2230    _SELECT_INDEXES = "SELECT * FROM system_schema.indexes"
2231    _SELECT_TRIGGERS = "SELECT * FROM system_schema.triggers"
2232    _SELECT_TYPES = "SELECT * FROM system_schema.types"
2233    _SELECT_FUNCTIONS = "SELECT * FROM system_schema.functions"
2234    _SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates"
2235    _SELECT_VIEWS = "SELECT * FROM system_schema.views"
2236
2237    _table_name_col = 'table_name'
2238
2239    _function_agg_arument_type_col = 'argument_types'
2240
2241    recognized_table_options = (
2242        'bloom_filter_fp_chance',
2243        'caching',
2244        'cdc',
2245        'comment',
2246        'compaction',
2247        'compression',
2248        'crc_check_chance',
2249        'dclocal_read_repair_chance',
2250        'default_time_to_live',
2251        'gc_grace_seconds',
2252        'max_index_interval',
2253        'memtable_flush_period_in_ms',
2254        'min_index_interval',
2255        'read_repair_chance',
2256        'speculative_retry')
2257
2258    def __init__(self, connection, timeout):
2259        super(SchemaParserV3, self).__init__(connection, timeout)
2260        self.indexes_result = []
2261        self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list))
2262        self.keyspace_view_rows = defaultdict(list)
2263
2264    def get_all_keyspaces(self):
2265        for keyspace_meta in super(SchemaParserV3, self).get_all_keyspaces():
2266            for row in self.keyspace_view_rows[keyspace_meta.name]:
2267                view_meta = self._build_view_metadata(row)
2268                keyspace_meta._add_view_metadata(view_meta)
2269            yield keyspace_meta
2270
2271    def get_table(self, keyspaces, keyspace, table):
2272        cl = ConsistencyLevel.ONE
2273        where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col), (keyspace, table), _encoder)
2274        cf_query = QueryMessage(query=self._SELECT_TABLES + where_clause, consistency_level=cl)
2275        col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl)
2276        indexes_query = QueryMessage(query=self._SELECT_INDEXES + where_clause, consistency_level=cl)
2277        triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl)
2278
2279        # in protocol v4 we don't know if this event is a view or a table, so we look for both
2280        where_clause = bind_params(" WHERE keyspace_name = %s AND view_name = %s", (keyspace, table), _encoder)
2281        view_query = QueryMessage(query=self._SELECT_VIEWS + where_clause,
2282                                  consistency_level=cl)
2283        ((cf_success, cf_result), (col_success, col_result),
2284         (indexes_sucess, indexes_result), (triggers_success, triggers_result),
2285         (view_success, view_result)) = (
2286             self.connection.wait_for_responses(
2287                 cf_query, col_query, indexes_query, triggers_query,
2288                 view_query, timeout=self.timeout, fail_on_error=False)
2289        )
2290        table_result = self._handle_results(cf_success, cf_result)
2291        col_result = self._handle_results(col_success, col_result)
2292        if table_result:
2293            indexes_result = self._handle_results(indexes_sucess, indexes_result)
2294            triggers_result = self._handle_results(triggers_success, triggers_result)
2295            return self._build_table_metadata(table_result[0], col_result, triggers_result, indexes_result)
2296
2297        view_result = self._handle_results(view_success, view_result)
2298        if view_result:
2299            return self._build_view_metadata(view_result[0], col_result)
2300
2301    @staticmethod
2302    def _build_keyspace_metadata_internal(row):
2303        name = row["keyspace_name"]
2304        durable_writes = row["durable_writes"]
2305        strategy_options = dict(row["replication"])
2306        strategy_class = strategy_options.pop("class")
2307        return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
2308
2309    @staticmethod
2310    def _build_aggregate(aggregate_row):
2311        return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'],
2312                         aggregate_row['argument_types'], aggregate_row['state_func'], aggregate_row['state_type'],
2313                         aggregate_row['final_func'], aggregate_row['initcond'], aggregate_row['return_type'])
2314
2315    def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None, virtual=False):
2316        keyspace_name = row["keyspace_name"]
2317        table_name = row[self._table_name_col]
2318
2319        col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][table_name]
2320        trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][table_name]
2321        index_rows = index_rows or self.keyspace_table_index_rows[keyspace_name][table_name]
2322
2323        table_meta = TableMetadataV3(keyspace_name, table_name, virtual=virtual)
2324        try:
2325            table_meta.options = self._build_table_options(row)
2326            flags = row.get('flags', set())
2327            if flags:
2328                compact_static = False
2329                table_meta.is_compact_storage = 'dense' in flags or 'super' in flags or 'compound' not in flags
2330                is_dense = 'dense' in flags
2331            elif virtual:
2332                compact_static = False
2333                table_meta.is_compact_storage = False
2334                is_dense = False
2335            else:
2336                compact_static = True
2337                table_meta.is_compact_storage = True
2338                is_dense = False
2339
2340            self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual)
2341
2342            for trigger_row in trigger_rows:
2343                trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
2344                table_meta.triggers[trigger_meta.name] = trigger_meta
2345
2346            for index_row in index_rows:
2347                index_meta = self._build_index_metadata(table_meta, index_row)
2348                if index_meta:
2349                    table_meta.indexes[index_meta.name] = index_meta
2350
2351            table_meta.extensions = row.get('extensions', {})
2352        except Exception:
2353            table_meta._exc_info = sys.exc_info()
2354            log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows)
2355
2356        return table_meta
2357
2358    def _build_table_options(self, row):
2359        """ Setup the mostly-non-schema table options, like caching settings """
2360        return dict((o, row.get(o)) for o in self.recognized_table_options if o in row)
2361
2362    def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False, virtual=False):
2363        # partition key
2364        partition_rows = [r for r in col_rows
2365                          if r.get('kind', None) == "partition_key"]
2366        if len(partition_rows) > 1:
2367            partition_rows = sorted(partition_rows, key=lambda row: row.get('position'))
2368        for r in partition_rows:
2369            # we have to add meta here (and not in the later loop) because TableMetadata.columns is an
2370            # OrderedDict, and it assumes keys are inserted first, in order, when exporting CQL
2371            column_meta = self._build_column_metadata(meta, r)
2372            meta.columns[column_meta.name] = column_meta
2373            meta.partition_key.append(meta.columns[r.get('column_name')])
2374
2375        # clustering key
2376        if not compact_static:
2377            clustering_rows = [r for r in col_rows
2378                               if r.get('kind', None) == "clustering"]
2379            if len(clustering_rows) > 1:
2380                clustering_rows = sorted(clustering_rows, key=lambda row: row.get('position'))
2381            for r in clustering_rows:
2382                column_meta = self._build_column_metadata(meta, r)
2383                meta.columns[column_meta.name] = column_meta
2384                meta.clustering_key.append(meta.columns[r.get('column_name')])
2385
2386        for col_row in (r for r in col_rows
2387                        if r.get('kind', None) not in ('partition_key', 'clustering_key')):
2388            column_meta = self._build_column_metadata(meta, col_row)
2389            if is_dense and column_meta.cql_type == types.cql_empty_type:
2390                continue
2391            if compact_static and not column_meta.is_static:
2392                # for compact static tables, we omit the clustering key and value, and only add the logical columns.
2393                # They are marked not static so that it generates appropriate CQL
2394                continue
2395            if compact_static:
2396                column_meta.is_static = False
2397            meta.columns[column_meta.name] = column_meta
2398
2399    def _build_view_metadata(self, row, col_rows=None):
2400        keyspace_name = row["keyspace_name"]
2401        view_name = row["view_name"]
2402        base_table_name = row["base_table_name"]
2403        include_all_columns = row["include_all_columns"]
2404        where_clause = row["where_clause"]
2405        col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][view_name]
2406        view_meta = MaterializedViewMetadata(keyspace_name, view_name, base_table_name,
2407                                             include_all_columns, where_clause, self._build_table_options(row))
2408        self._build_table_columns(view_meta, col_rows)
2409        view_meta.extensions = row.get('extensions', {})
2410
2411        return view_meta
2412
2413    @staticmethod
2414    def _build_column_metadata(table_metadata, row):
2415        name = row["column_name"]
2416        cql_type = row["type"]
2417        is_static = row.get("kind", None) == "static"
2418        is_reversed = row["clustering_order"].upper() == "DESC"
2419        column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed)
2420        return column_meta
2421
2422    @staticmethod
2423    def _build_index_metadata(table_metadata, row):
2424        index_name = row.get("index_name")
2425        kind = row.get("kind")
2426        if index_name or kind:
2427            index_options = row.get("options")
2428            return IndexMetadata(table_metadata.keyspace_name, table_metadata.name, index_name, kind, index_options)
2429        else:
2430            return None
2431
2432    @staticmethod
2433    def _build_trigger_metadata(table_metadata, row):
2434        name = row["trigger_name"]
2435        options = row["options"]
2436        trigger_meta = TriggerMetadata(table_metadata, name, options)
2437        return trigger_meta
2438
2439    def _query_all(self):
2440        cl = ConsistencyLevel.ONE
2441        queries = [
2442            QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
2443            QueryMessage(query=self._SELECT_TABLES, consistency_level=cl),
2444            QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
2445            QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
2446            QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
2447            QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
2448            QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl),
2449            QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl),
2450            QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl)
2451        ]
2452
2453        ((ks_success, ks_result),
2454         (table_success, table_result),
2455         (col_success, col_result),
2456         (types_success, types_result),
2457         (functions_success, functions_result),
2458         (aggregates_success, aggregates_result),
2459         (triggers_success, triggers_result),
2460         (indexes_success, indexes_result),
2461         (views_success, views_result)) = self.connection.wait_for_responses(
2462             *queries, timeout=self.timeout, fail_on_error=False
2463        )
2464
2465        self.keyspaces_result = self._handle_results(ks_success, ks_result)
2466        self.tables_result = self._handle_results(table_success, table_result)
2467        self.columns_result = self._handle_results(col_success, col_result)
2468        self.triggers_result = self._handle_results(triggers_success, triggers_result)
2469        self.types_result = self._handle_results(types_success, types_result)
2470        self.functions_result = self._handle_results(functions_success, functions_result)
2471        self.aggregates_result = self._handle_results(aggregates_success, aggregates_result)
2472        self.indexes_result = self._handle_results(indexes_success, indexes_result)
2473        self.views_result = self._handle_results(views_success, views_result)
2474
2475        self._aggregate_results()
2476
2477    def _aggregate_results(self):
2478        super(SchemaParserV3, self)._aggregate_results()
2479
2480        m = self.keyspace_table_index_rows
2481        for row in self.indexes_result:
2482            ksname = row["keyspace_name"]
2483            cfname = row[self._table_name_col]
2484            m[ksname][cfname].append(row)
2485
2486        m = self.keyspace_view_rows
2487        for row in self.views_result:
2488            m[row["keyspace_name"]].append(row)
2489
    @staticmethod
    def _schema_type_to_cql(type_string):
        """
        Return ``type_string`` unchanged: this parser performs no translation of
        schema type strings.
        """
        return type_string
2493
2494
class SchemaParserV4(SchemaParserV3):
    """
    Schema parser used for server versions >= 4.0.0.

    On top of :class:`SchemaParserV3`, it reads virtual keyspaces, tables and
    columns from ``system_virtual_schema``. It also stops recognizing the table
    options removed in 4.0 (CASSANDRA-13910).
    """

    recognized_table_options = tuple(
        opt for opt in
        SchemaParserV3.recognized_table_options
        if opt not in (
            # removed in V4: CASSANDRA-13910
            'dclocal_read_repair_chance', 'read_repair_chance'
        )
    )

    _SELECT_VIRTUAL_KEYSPACES = 'SELECT * from system_virtual_schema.keyspaces'
    _SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables'
    _SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns'

    def __init__(self, connection, timeout):
        super(SchemaParserV4, self).__init__(connection, timeout)
        # Not populated by this class: virtual keyspace rows are read directly
        # from virtual_keyspaces_result. Kept as an attribute for existing users.
        self.virtual_keyspaces_rows = defaultdict(list)
        # keyspace name -> list of virtual table rows
        self.virtual_tables_rows = defaultdict(list)
        # keyspace name -> table name -> list of virtual column rows
        self.virtual_columns_rows = defaultdict(lambda: defaultdict(list))

    def _query_all(self):
        """
        Send the V3 schema queries plus the virtual-schema queries in one batch
        of requests, store each handled result on the parser, then aggregate.

        Failures of the virtual-schema queries with ``InvalidRequest`` are
        tolerated, because some DSE versions reporting 4.X lack those tables.
        """
        cl = ConsistencyLevel.ONE
        # todo: this duplicates V3; we should find a way for _query_all methods
        # to extend each other.
        queries = [
            # copied from V3
            QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl),
            QueryMessage(query=self._SELECT_TABLES, consistency_level=cl),
            QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
            QueryMessage(query=self._SELECT_TYPES, consistency_level=cl),
            QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
            QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
            QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl),
            QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl),
            QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl),
            # V4-only queries
            QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl),
            QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl),
            QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl)
        ]

        responses = self.connection.wait_for_responses(
            *queries, timeout=self.timeout, fail_on_error=False)
        (
            # copied from V3
            (ks_success, ks_result),
            (table_success, table_result),
            (col_success, col_result),
            (types_success, types_result),
            (functions_success, functions_result),
            (aggregates_success, aggregates_result),
            (triggers_success, triggers_result),
            (indexes_success, indexes_result),
            (views_success, views_result),
            # V4-only responses
            (virtual_ks_success, virtual_ks_result),
            (virtual_table_success, virtual_table_result),
            (virtual_column_success, virtual_column_result)
        ) = responses

        # copied from V3
        self.keyspaces_result = self._handle_results(ks_success, ks_result)
        self.tables_result = self._handle_results(table_success, table_result)
        self.columns_result = self._handle_results(col_success, col_result)
        self.triggers_result = self._handle_results(triggers_success, triggers_result)
        self.types_result = self._handle_results(types_success, types_result)
        self.functions_result = self._handle_results(functions_success, functions_result)
        self.aggregates_result = self._handle_results(aggregates_success, aggregates_result)
        self.indexes_result = self._handle_results(indexes_success, indexes_result)
        self.views_result = self._handle_results(views_success, views_result)
        # V4-only results
        # These tables don't exist in some DSE versions reporting 4.X so we can
        # ignore them if we got an error
        self.virtual_keyspaces_result = self._handle_results(
            virtual_ks_success, virtual_ks_result,
            expected_failures=InvalidRequest
        )
        self.virtual_tables_result = self._handle_results(
            virtual_table_success, virtual_table_result,
            expected_failures=InvalidRequest
        )
        self.virtual_columns_result = self._handle_results(
            virtual_column_success, virtual_column_result,
            expected_failures=InvalidRequest
        )

        self._aggregate_results()

    def _aggregate_results(self):
        """
        After the V3 aggregation, group virtual table rows by keyspace name and
        virtual column rows by keyspace and table name.
        """
        super(SchemaParserV4, self)._aggregate_results()

        m = self.virtual_tables_rows
        for row in self.virtual_tables_result:
            m[row["keyspace_name"]].append(row)

        m = self.virtual_columns_rows
        for row in self.virtual_columns_result:
            ks_name = row['keyspace_name']
            tab_name = row[self._table_name_col]
            m[ks_name][tab_name].append(row)

    def get_all_keyspaces(self):
        """
        Yield every keyspace produced by :meth:`SchemaParserV3.get_all_keyspaces`.
        Then yield one keyspace metadata object per virtual keyspace row, with
        ``virtual`` set to True and its virtual tables attached.
        """
        for x in super(SchemaParserV4, self).get_all_keyspaces():
            yield x

        # NOTE(review): virtual_*_result are presumably populated by the base
        # generator's query pass (_query_all); that call is not visible here.
        for row in self.virtual_keyspaces_result:
            ks_name = row['keyspace_name']
            keyspace_meta = self._build_keyspace_metadata(row)
            keyspace_meta.virtual = True

            for table_row in self.virtual_tables_rows.get(ks_name, []):
                table_name = table_row[self._table_name_col]

                col_rows = self.virtual_columns_rows[ks_name][table_name]
                keyspace_meta._add_table_metadata(
                    self._build_table_metadata(table_row,
                                               col_rows=col_rows,
                                               virtual=True)
                )
            yield keyspace_meta

    @staticmethod
    def _build_keyspace_metadata_internal(row):
        """
        Fill in the fields that virtual keyspace rows lack, then delegate to
        :meth:`SchemaParserV3._build_keyspace_metadata_internal`.

        A missing ``durable_writes`` becomes None. A missing (or None)
        ``replication`` becomes a mapping whose ``class`` is None. The work is
        done on copies, so neither the caller's row nor its replication mapping
        is modified.
        """
        row = dict(row)
        row["durable_writes"] = row.get("durable_writes", None)
        replication = dict(row.get("replication") or {})
        replication["class"] = replication.get("class", None)
        row["replication"] = replication
        return super(SchemaParserV4, SchemaParserV4)._build_keyspace_metadata_internal(row)
2624
2625
class TableMetadataV3(TableMetadata):
    """
    Table metadata for the V3+ schema layout.

    Always reports itself as CQL compatible. Options listed in ``option_maps``
    whose value is a mapping are rendered as CQL map literals.
    """
    compaction_options = {}

    option_maps = ['compaction', 'compression', 'caching']

    @property
    def is_cql_compatible(self):
        return True

    @classmethod
    def _make_option_strings(cls, options_map):
        """
        Render ``options_map`` as a sorted list of ``name = value`` CQL strings.

        Options named in ``option_maps`` whose value is a mapping become map
        literals, with every key and value written as a CQL string literal
        (embedded single quotes doubled). Other options with a non-None value
        are rendered with ``protect_value``; a falsy ``comment`` becomes ``''``.
        """
        def quote(text):
            # CQL string literal: wrap in single quotes and double embedded ones.
            return "'%s'" % ("%s" % (text,)).replace("'", "''")

        ret = []
        options_copy = dict(options_map.items())

        for option in cls.option_maps:
            value = options_copy.get(option)
            if isinstance(value, Mapping):
                del options_copy[option]
                params = ("%s: %s" % (quote(k), quote(v)) for k, v in value.items())
                ret.append("%s = {%s}" % (option, ', '.join(params)))

        for name, value in options_copy.items():
            if value is not None:
                if name == "comment":
                    value = value or ""
                ret.append("%s = %s" % (name, protect_value(value)))

        return sorted(ret)
2654
2655
class MaterializedViewMetadata(object):
    """
    A representation of a materialized view on a table
    """

    keyspace_name = None
    """ A string name of the keyspace this view belongs to."""

    name = None
    """ A string name of the view."""

    base_table_name = None
    """ A string name of the base table for this view."""

    partition_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns in
    the partition key for this view.  This will always hold at least one
    column.
    """

    clustering_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns
    in the clustering key for this view.

    Note that a view may have no clustering keys, in which case this will
    be an empty list.
    """

    columns = None
    """
    A dict mapping column names to :class:`.ColumnMetadata` instances.
    """

    include_all_columns = None
    """ A flag indicating whether the view was created AS SELECT * """

    where_clause = None
    """ String WHERE clause for the view select statement. From server metadata """

    options = None
    """
    A dict mapping table option names to their specific settings for this
    view.
    """

    extensions = None
    """
    Metadata describing configuration for table extensions
    """

    def __init__(self, keyspace_name, view_name, base_table_name, include_all_columns, where_clause, options):
        self.keyspace_name = keyspace_name
        self.name = view_name
        self.base_table_name = base_table_name
        self.partition_key = []
        self.clustering_key = []
        self.columns = OrderedDict()
        self.include_all_columns = include_all_columns
        self.where_clause = where_clause
        self.options = options or {}

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this view.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.

        CQL produced by registered extensions present in :attr:`extensions`
        is appended after the CREATE statement.
        """
        sep = '\n    ' if formatted else ' '

        if self.include_all_columns:
            selected_cols = '*'
        else:
            selected_cols = ', '.join(protect_name(col.name) for col in self.columns.values())

        # PRIMARY KEY ((p1, p2), c1, ...) for a composite partition key,
        # otherwise (p1, c1, ...).
        part_key = ', '.join(protect_name(col.name) for col in self.partition_key)
        if len(self.partition_key) > 1:
            pk = "((%s)" % part_key
        else:
            pk = "(%s" % part_key
        if self.clustering_key:
            pk += ", %s" % ', '.join(protect_name(col.name) for col in self.clustering_key)
        pk += ")"

        properties = TableMetadataV3._property_string(formatted, self.clustering_key, self.options)

        ret = ("CREATE MATERIALIZED VIEW %(keyspace)s.%(name)s AS%(sep)s"
               "SELECT %(selected_cols)s%(sep)s"
               "FROM %(keyspace)s.%(base_table)s%(sep)s"
               "WHERE %(where_clause)s%(sep)s"
               "PRIMARY KEY %(pk)s%(sep)s"
               "WITH %(properties)s") % {
            'keyspace': protect_name(self.keyspace_name),
            'name': protect_name(self.name),
            'sep': sep,
            'selected_cols': selected_cols,
            'base_table': protect_name(self.base_table_name),
            'where_clause': self.where_clause,
            'pk': pk,
            'properties': properties,
        }

        if self.extensions:
            registry = _RegisteredExtensionType._extension_registry
            for k in six.viewkeys(registry) & self.extensions:  # no viewkeys on OrderedMapSerializeKey
                ext = registry[k]
                cql = ext.after_table_cql(self, k, self.extensions[k])
                if cql:
                    ret += "\n\n%s" % (cql,)
        return ret

    def export_as_string(self):
        """Return the formatted CREATE MATERIALIZED VIEW statement terminated by ';'."""
        return self.as_cql_query(formatted=True) + ";"
2763
2764
def get_schema_parser(connection, server_version, timeout):
    """
    Return a schema parser suited to ``server_version``.

    Versions >= 4.0.0 get :class:`SchemaParserV4`, versions >= 3.0.0 get
    :class:`SchemaParserV3`, and anything older gets ``SchemaParserV22``.
    """
    version = Version(server_version)
    if version >= Version('4.0.0'):
        parser_class = SchemaParserV4
    elif version >= Version('3.0.0'):
        parser_class = SchemaParserV3
    else:
        # we could further specialize by version. Right now just refactoring the
        # multi-version parser we have as of C* 2.2.0rc1.
        parser_class = SchemaParserV22
    return parser_class(connection, timeout)
2775
2776
def _cql_from_cass_type(cass_type):
    """
    Return the CQL string for ``cass_type``, such as "varchar" or
    "map<string, int>".

    A ``ReversedType`` is unwrapped to its first subtype before rendering.
    """
    base_type = cass_type.subtypes[0] if issubclass(cass_type, types.ReversedType) else cass_type
    return base_type.cql_parameterized_type()
2786
2787
# Sentinel group key used by group_keys_by_replica() for keys with no up
# replica; compare against it by identity.
NO_VALID_REPLICA = object()
2789
2790
def group_keys_by_replica(session, keyspace, table, keys):
    """
    Returns a :class:`dict` with the keys grouped per host. This can be
    used to more accurately group by IN clause or to batch the keys per host.

    Each key must be a sequence holding exactly one value per partition key
    column of ``keyspace.table``, in partition key order. For each key, a random
    up replica is chosen. Replicas whose distance under the cluster's default
    load balancing policy is ``HostDistance.LOCAL`` are preferred.

    If a valid replica is not found for a particular key it will be grouped under
    :class:`~.NO_VALID_REPLICA`

    :raises ValueError: if a key does not have one value per partition key
        column. Without this check, extra or missing components (for example a
        bare string key iterated character by character) would silently produce
        a wrong routing key.

    Example usage::
        result = group_keys_by_replica(
                     session, "system", "peers",
                     (("127.0.0.1", ), ("127.0.0.2", ))
                 )
    """
    cluster = session.cluster

    partition_keys = cluster.metadata.keyspaces[keyspace].tables[table].partition_key

    serializers = [types._cqltypes[partition_key.cql_type] for partition_key in partition_keys]
    keys_per_host = defaultdict(list)
    distance = cluster._default_load_balancing_policy.distance

    for key in keys:
        values = tuple(key)
        if len(values) != len(serializers):
            raise ValueError(
                "Key %r has %d value(s) but %s.%s has %d partition key column(s)"
                % (key, len(values), keyspace, table, len(serializers)))

        serialized_key = [serializer.serialize(pk, cluster.protocol_version)
                          for serializer, pk in zip(serializers, values)]
        if len(serialized_key) == 1:
            routing_key = serialized_key[0]
        else:
            # Composite key: each component is packed as a big-endian unsigned
            # short length, the component bytes, then a 0 byte.
            routing_key = b"".join(struct.pack(">H%dsB" % len(p), len(p), p, 0) for p in serialized_key)
        all_replicas = cluster.metadata.get_replicas(keyspace, routing_key)
        # First check if there are local replicas
        valid_replicas = [host for host in all_replicas if
                          host.is_up and distance(host) == HostDistance.LOCAL]
        if not valid_replicas:
            valid_replicas = [host for host in all_replicas if host.is_up]

        if valid_replicas:
            keys_per_host[random.choice(valid_replicas)].append(key)
        else:
            # We will group under this statement all the keys for which
            # we haven't found a valid replica
            keys_per_host[NO_VALID_REPLICA].append(key)

    return dict(keys_per_host)
2835