# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from binascii import unhexlify
from bisect import bisect_left
from collections import defaultdict
try:
    # The ABC aliases in ``collections`` were removed in Python 3.10;
    # ``collections.abc`` has provided them since Python 3.3.
    from collections.abc import Mapping
except ImportError:  # Python 2
    from collections import Mapping
from functools import total_ordering
from hashlib import md5
from itertools import islice, cycle
import json
import logging
import re
import six
from six.moves import zip
import sys
from threading import RLock
import struct
import random

murmur3 = None
try:
    from cassandra.murmur3 import murmur3
except ImportError as e:
    # The C extension is optional; murmur3 stays None when it is unavailable.
    pass

from cassandra import SignatureDescriptor, ConsistencyLevel, InvalidRequest, Unauthorized
import cassandra.cqltypes as types
from cassandra.encoder import Encoder
from cassandra.marshal import varint_unpack
from cassandra.protocol import QueryMessage
from cassandra.query import dict_factory, bind_params
from cassandra.util import OrderedDict, Version
from cassandra.pool import HostDistance
from cassandra.connection import EndPoint

log = logging.getLogger(__name__)

cql_keywords = set((
    'add', 'aggregate', 'all', 'allow', 'alter', 'and', 'apply', 'as', 'asc', 'ascii', 'authorize', 'batch', 'begin',
    'bigint', 'blob', 'boolean', 'by', 'called', 'clustering', 'columnfamily', 'compact', 'contains', 'count',
    'counter', 'create', 'custom', 'date', 'decimal', 'delete', 'desc', 'describe', 'distinct', 'double', 'drop',
    'entries', 'execute', 'exists', 'filtering', 'finalfunc', 'float', 'from', 'frozen', 'full', 'function',
    'functions', 'grant', 'if', 'in', 'index', 'inet', 'infinity', 'initcond', 'input', 'insert', 'int', 'into', 'is', 'json',
    'key', 'keys', 'keyspace', 'keyspaces', 'language', 'limit', 'list', 'login', 'map', 'materialized', 'modify', 'nan', 'nologin',
    'norecursive', 'nosuperuser', 'not', 'null', 'of', 'on', 'options', 'or', 'order', 'password', 'permission',
    'permissions', 'primary', 'rename', 'replace', 'returns', 'revoke', 'role', 'roles', 'schema', 'select', 'set',
    'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'table', 'text', 'time', 'timestamp', 'timeuuid',
    'tinyint', 'to', 'token', 'trigger', 'truncate', 'ttl', 'tuple', 'type', 'unlogged', 'update', 'use', 'user',
    'users', 'using', 'uuid', 'values', 'varchar', 'varint', 'view', 'where', 'with', 'writetime'
))
"""
Set of keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

cql_keywords_unreserved = set((
    'aggregate', 'all', 'as', 'ascii', 'bigint', 'blob', 'boolean', 'called', 'clustering', 'compact', 'contains',
    'count', 'counter', 'custom', 'date', 'decimal', 'distinct', 'double', 'exists', 'filtering', 'finalfunc', 'float',
    'frozen', 'function', 'functions', 'inet', 'initcond', 'input', 'int', 'json', 'key', 'keys', 'keyspaces',
    'language', 'list', 'login', 'map', 'nologin', 'nosuperuser', 'options', 'password', 'permission', 'permissions',
    'returns', 'role', 'roles', 'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'text', 'time',
    'timestamp', 'timeuuid', 'tinyint', 'trigger', 'ttl', 'tuple', 'type', 'user', 'users', 'uuid', 'values', 'varchar',
    'varint', 'writetime'
))
"""
Set of unreserved keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

cql_keywords_reserved = cql_keywords - cql_keywords_unreserved
"""
Set of reserved keywords in CQL.
"""

_encoder = Encoder()


class Metadata(object):
    """
    Holds a representation of the cluster schema and topology.
    """

    cluster_name = None
    """ The string name of the cluster. """

    keyspaces = None
    """
    A map from keyspace names to matching :class:`~.KeyspaceMetadata` instances.
    """

    partitioner = None
    """
    The string name of the partitioner for the cluster.
    """

    token_map = None
    """ A :class:`~.TokenMap` instance describing the ring topology. """

    def __init__(self):
        self.keyspaces = {}
        # endpoint -> Host; guarded by _hosts_lock
        self._hosts = {}
        self._hosts_lock = RLock()

    def export_schema_as_string(self):
        """
        Returns a string that can be executed as a query in order to recreate
        the entire schema. The string is formatted to be human readable.
        """
        return "\n\n".join(ks.export_as_string() for ks in self.keyspaces.values())

    def refresh(self, connection, timeout, target_type=None, change_type=None, **kwargs):
        """
        Refresh schema metadata using ``connection``.

        With no ``target_type`` the whole schema is rebuilt. Otherwise
        ``target_type`` (case-insensitive) selects the parser's
        ``get_<target>`` method and this object's ``_update_<target>`` /
        ``_drop_<target>`` handlers. ``kwargs`` are forwarded to the parser
        and to the drop handler. ``change_type`` is accepted but not used here.

        :raises ValueError: if ``target_type`` has no matching handlers.
        """
        server_version = self.get_host(connection.endpoint).release_version
        parser = get_schema_parser(connection, server_version, timeout)

        if not target_type:
            self._rebuild_all(parser)
            return

        tt_lower = target_type.lower()
        # Resolve every handler up front with a narrow try, so an
        # AttributeError raised while parsing or applying the metadata is not
        # misreported as an unknown target type.
        try:
            parse_method = getattr(parser, 'get_' + tt_lower)
            update_method = getattr(self, '_update_' + tt_lower)
            drop_method = getattr(self, '_drop_' + tt_lower)
        except AttributeError:
            raise ValueError("Unknown schema target_type: '%s'" % target_type)

        meta = parse_method(self.keyspaces, **kwargs)
        if meta:
            if tt_lower == 'keyspace' and connection.protocol_version < 3:
                # we didn't have 'type' target in legacy protocol versions, so we need to query those too
                user_types = parser.get_types_map(self.keyspaces, **kwargs)
                self._update_keyspace(meta, user_types)
            else:
                update_method(meta)
        else:
            # the parser found nothing: the target no longer exists
            drop_method(**kwargs)

    def _rebuild_all(self, parser):
        """Replace all keyspace metadata with what ``parser`` returns, notifying the token map."""
        current_keyspaces = set()
        for keyspace_meta in parser.get_all_keyspaces():
            current_keyspaces.add(keyspace_meta.name)
            old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
            self.keyspaces[keyspace_meta.name] = keyspace_meta
            if old_keyspace_meta:
                self._keyspace_updated(keyspace_meta.name)
            else:
                self._keyspace_added(keyspace_meta.name)

        # remove not-just-added keyspaces
        removed_keyspaces = [name for name in self.keyspaces.keys()
                             if name not in current_keyspaces]
        self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items()
                              if name in current_keyspaces)
        for ksname in removed_keyspaces:
            self._keyspace_removed(ksname)

    def _update_keyspace(self, keyspace_meta, new_user_types=None):
        """
        Install ``keyspace_meta``, carrying over child objects (tables, types,
        indexes, functions, aggregates, views) from the previous metadata for
        the same keyspace. ``new_user_types``, when given, replaces the types.
        """
        ks_name = keyspace_meta.name
        old_keyspace_meta = self.keyspaces.get(ks_name, None)
        self.keyspaces[ks_name] = keyspace_meta
        if old_keyspace_meta:
            keyspace_meta.tables = old_keyspace_meta.tables
            keyspace_meta.user_types = new_user_types if new_user_types is not None else old_keyspace_meta.user_types
            keyspace_meta.indexes = old_keyspace_meta.indexes
            keyspace_meta.functions = old_keyspace_meta.functions
            keyspace_meta.aggregates = old_keyspace_meta.aggregates
            keyspace_meta.views = old_keyspace_meta.views
            # only replication changes affect token ownership
            if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy):
                self._keyspace_updated(ks_name)
        else:
            self._keyspace_added(ks_name)

    def _drop_keyspace(self, keyspace):
        if self.keyspaces.pop(keyspace, None):
            self._keyspace_removed(keyspace)

    def _update_table(self, meta):
        try:
            keyspace_meta = self.keyspaces[meta.keyspace_name]
            # this is unfortunate, but protocol v4 does not differentiate
            # between events for tables and views. <parser>.get_table will
            # return one or the other based on the query results.
            # Here we deal with that.
            if isinstance(meta, TableMetadata):
                keyspace_meta._add_table_metadata(meta)
            else:
                keyspace_meta._add_view_metadata(meta)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_table(self, keyspace, table):
        try:
            keyspace_meta = self.keyspaces[keyspace]
            keyspace_meta._drop_table_metadata(table)  # handles either table or view
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_type(self, type_meta):
        try:
            self.keyspaces[type_meta.keyspace].user_types[type_meta.name] = type_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_type(self, keyspace, type):
        try:
            self.keyspaces[keyspace].user_types.pop(type, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_function(self, function_meta):
        try:
            self.keyspaces[function_meta.keyspace].functions[function_meta.signature] = function_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_function(self, keyspace, function):
        try:
            self.keyspaces[keyspace].functions.pop(function.signature, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_aggregate(self, aggregate_meta):
        try:
            self.keyspaces[aggregate_meta.keyspace].aggregates[aggregate_meta.signature] = aggregate_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_aggregate(self, keyspace, aggregate):
        try:
            self.keyspaces[keyspace].aggregates.pop(aggregate.signature, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _keyspace_added(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_updated(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_removed(self, ksname):
        if self.token_map:
            self.token_map.remove_keyspace(ksname)

    def rebuild_token_map(self, partitioner, token_map):
        """
        Rebuild our view of the topology from fresh rows from the
        system topology tables.
        For internal use only.
        """
        self.partitioner = partitioner
        if partitioner.endswith('RandomPartitioner'):
            token_class = MD5Token
        elif partitioner.endswith('Murmur3Partitioner'):
            token_class = Murmur3Token
        elif partitioner.endswith('ByteOrderedPartitioner'):
            token_class = BytesToken
        else:
            # unsupported partitioner: token-aware routing is disabled
            self.token_map = None
            return

        token_to_host_owner = {}
        ring = []
        for host, token_strings in six.iteritems(token_map):
            for token_string in token_strings:
                token = token_class.from_string(token_string)
                ring.append(token)
                token_to_host_owner[token] = host

        all_tokens = sorted(ring)
        self.token_map = TokenMap(
            token_class, token_to_host_owner, all_tokens, self)

    def get_replicas(self, keyspace, key):
        """
        Returns a list of :class:`.Host` instances that are replicas for a given
        partition key.
        """
        t = self.token_map
        if not t:
            return []
        try:
            return t.get_replicas(keyspace, t.token_class.from_key(key))
        except NoMurmur3:
            return []

    def can_support_partitioner(self):
        # Murmur3 token hashing needs the optional C extension imported above.
        if self.partitioner.endswith('Murmur3Partitioner') and murmur3 is None:
            return False
        else:
            return True

    def add_or_return_host(self, host):
        """
        Returns a tuple (host, new), where ``host`` is a Host
        instance, and ``new`` is a bool indicating whether
        the host was newly added.
        """
        with self._hosts_lock:
            try:
                return self._hosts[host.endpoint], False
            except KeyError:
                self._hosts[host.endpoint] = host
                return host, True

    def remove_host(self, host):
        """Remove ``host``; return True if it was known, False otherwise."""
        with self._hosts_lock:
            return bool(self._hosts.pop(host.endpoint, False))

    def get_host(self, endpoint_or_address):
        """
        Find a host in the metadata for a specific endpoint. If a string inet address is passed,
        iterate all hosts to match the :attr:`~.pool.Host.broadcast_rpc_address` attribute.
        """
        if not isinstance(endpoint_or_address, EndPoint):
            return self._get_host_by_address(endpoint_or_address)

        return self._hosts.get(endpoint_or_address)

    def _get_host_by_address(self, address):
        """Return the first host whose broadcast_rpc_address equals ``address``, else None."""
        # Hold the lock while iterating, as all_hosts() does: a concurrent
        # add/remove would otherwise change the dict size mid-iteration.
        with self._hosts_lock:
            for host in six.itervalues(self._hosts):
                if host.broadcast_rpc_address == address:
                    return host
        return None

    def all_hosts(self):
        """
        Returns a list of all known :class:`.Host` instances in the cluster.
        """
        with self._hosts_lock:
            return list(self._hosts.values())


REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
354 355 356def trim_if_startswith(s, prefix): 357 if s.startswith(prefix): 358 return s[len(prefix):] 359 return s 360 361 362_replication_strategies = {} 363 364 365class ReplicationStrategyTypeType(type): 366 def __new__(metacls, name, bases, dct): 367 dct.setdefault('name', name) 368 cls = type.__new__(metacls, name, bases, dct) 369 if not name.startswith('_'): 370 _replication_strategies[name] = cls 371 return cls 372 373 374@six.add_metaclass(ReplicationStrategyTypeType) 375class _ReplicationStrategy(object): 376 options_map = None 377 378 @classmethod 379 def create(cls, strategy_class, options_map): 380 if not strategy_class: 381 return None 382 383 strategy_name = trim_if_startswith(strategy_class, REPLICATION_STRATEGY_CLASS_PREFIX) 384 385 rs_class = _replication_strategies.get(strategy_name, None) 386 if rs_class is None: 387 rs_class = _UnknownStrategyBuilder(strategy_name) 388 _replication_strategies[strategy_name] = rs_class 389 390 try: 391 rs_instance = rs_class(options_map) 392 except Exception as exc: 393 log.warning("Failed creating %s with options %s: %s", strategy_name, options_map, exc) 394 return None 395 396 return rs_instance 397 398 def make_token_replica_map(self, token_to_host_owner, ring): 399 raise NotImplementedError() 400 401 def export_for_schema(self): 402 raise NotImplementedError() 403 404 405ReplicationStrategy = _ReplicationStrategy 406 407 408class _UnknownStrategyBuilder(object): 409 def __init__(self, name): 410 self.name = name 411 412 def __call__(self, options_map): 413 strategy_instance = _UnknownStrategy(self.name, options_map) 414 return strategy_instance 415 416 417class _UnknownStrategy(ReplicationStrategy): 418 def __init__(self, name, options_map): 419 self.name = name 420 self.options_map = options_map.copy() if options_map is not None else dict() 421 self.options_map['class'] = self.name 422 423 def __eq__(self, other): 424 return (isinstance(other, _UnknownStrategy) and 425 self.name == other.name and 426 
self.options_map == other.options_map) 427 428 def export_for_schema(self): 429 """ 430 Returns a string version of these replication options which are 431 suitable for use in a CREATE KEYSPACE statement. 432 """ 433 if self.options_map: 434 return dict((str(key), str(value)) for key, value in self.options_map.items()) 435 return "{'class': '%s'}" % (self.name, ) 436 437 def make_token_replica_map(self, token_to_host_owner, ring): 438 return {} 439 440 441class SimpleStrategy(ReplicationStrategy): 442 443 replication_factor = None 444 """ 445 The replication factor for this keyspace. 446 """ 447 448 def __init__(self, options_map): 449 try: 450 self.replication_factor = int(options_map['replication_factor']) 451 except Exception: 452 raise ValueError("SimpleStrategy requires an integer 'replication_factor' option") 453 454 def make_token_replica_map(self, token_to_host_owner, ring): 455 replica_map = {} 456 for i in range(len(ring)): 457 j, hosts = 0, list() 458 while len(hosts) < self.replication_factor and j < len(ring): 459 token = ring[(i + j) % len(ring)] 460 host = token_to_host_owner[token] 461 if host not in hosts: 462 hosts.append(host) 463 j += 1 464 465 replica_map[ring[i]] = hosts 466 return replica_map 467 468 def export_for_schema(self): 469 """ 470 Returns a string version of these replication options which are 471 suitable for use in a CREATE KEYSPACE statement. 472 """ 473 return "{'class': 'SimpleStrategy', 'replication_factor': '%d'}" \ 474 % (self.replication_factor,) 475 476 def __eq__(self, other): 477 if not isinstance(other, SimpleStrategy): 478 return False 479 480 return self.replication_factor == other.replication_factor 481 482 483class NetworkTopologyStrategy(ReplicationStrategy): 484 485 dc_replication_factors = None 486 """ 487 A map of datacenter names to the replication factor for that DC. 
488 """ 489 490 def __init__(self, dc_replication_factors): 491 self.dc_replication_factors = dict( 492 (str(k), int(v)) for k, v in dc_replication_factors.items()) 493 494 def make_token_replica_map(self, token_to_host_owner, ring): 495 dc_rf_map = dict((dc, int(rf)) 496 for dc, rf in self.dc_replication_factors.items() if rf > 0) 497 498 # build a map of DCs to lists of indexes into `ring` for tokens that 499 # belong to that DC 500 dc_to_token_offset = defaultdict(list) 501 dc_racks = defaultdict(set) 502 hosts_per_dc = defaultdict(set) 503 for i, token in enumerate(ring): 504 host = token_to_host_owner[token] 505 dc_to_token_offset[host.datacenter].append(i) 506 if host.datacenter and host.rack: 507 dc_racks[host.datacenter].add(host.rack) 508 hosts_per_dc[host.datacenter].add(host) 509 510 # A map of DCs to an index into the dc_to_token_offset value for that dc. 511 # This is how we keep track of advancing around the ring for each DC. 512 dc_to_current_index = defaultdict(int) 513 514 replica_map = defaultdict(list) 515 for i in range(len(ring)): 516 replicas = replica_map[ring[i]] 517 518 # go through each DC and find the replicas in that DC 519 for dc in dc_to_token_offset.keys(): 520 if dc not in dc_rf_map: 521 continue 522 523 # advance our per-DC index until we're up to at least the 524 # current token in the ring 525 token_offsets = dc_to_token_offset[dc] 526 index = dc_to_current_index[dc] 527 num_tokens = len(token_offsets) 528 while index < num_tokens and token_offsets[index] < i: 529 index += 1 530 dc_to_current_index[dc] = index 531 532 replicas_remaining = dc_rf_map[dc] 533 replicas_this_dc = 0 534 skipped_hosts = [] 535 racks_placed = set() 536 racks_this_dc = dc_racks[dc] 537 hosts_this_dc = len(hosts_per_dc[dc]) 538 for token_offset in islice(cycle(token_offsets), index, index + num_tokens): 539 host = token_to_host_owner[ring[token_offset]] 540 if replicas_remaining == 0 or replicas_this_dc == hosts_this_dc: 541 break 542 543 if host in 
replicas: 544 continue 545 546 if host.rack in racks_placed and len(racks_placed) < len(racks_this_dc): 547 skipped_hosts.append(host) 548 continue 549 550 replicas.append(host) 551 replicas_this_dc += 1 552 replicas_remaining -= 1 553 racks_placed.add(host.rack) 554 555 if len(racks_placed) == len(racks_this_dc): 556 for host in skipped_hosts: 557 if replicas_remaining == 0: 558 break 559 replicas.append(host) 560 replicas_remaining -= 1 561 del skipped_hosts[:] 562 563 return replica_map 564 565 def export_for_schema(self): 566 """ 567 Returns a string version of these replication options which are 568 suitable for use in a CREATE KEYSPACE statement. 569 """ 570 ret = "{'class': 'NetworkTopologyStrategy'" 571 for dc, repl_factor in sorted(self.dc_replication_factors.items()): 572 ret += ", '%s': '%d'" % (dc, repl_factor) 573 return ret + "}" 574 575 def __eq__(self, other): 576 if not isinstance(other, NetworkTopologyStrategy): 577 return False 578 579 return self.dc_replication_factors == other.dc_replication_factors 580 581 582class LocalStrategy(ReplicationStrategy): 583 def __init__(self, options_map): 584 pass 585 586 def make_token_replica_map(self, token_to_host_owner, ring): 587 return {} 588 589 def export_for_schema(self): 590 """ 591 Returns a string version of these replication options which are 592 suitable for use in a CREATE KEYSPACE statement. 593 """ 594 return "{'class': 'LocalStrategy'}" 595 596 def __eq__(self, other): 597 return isinstance(other, LocalStrategy) 598 599 600class KeyspaceMetadata(object): 601 """ 602 A representation of the schema for a single keyspace. 603 """ 604 605 name = None 606 """ The string name of the keyspace. """ 607 608 durable_writes = True 609 """ 610 A boolean indicating whether durable writes are enabled for this keyspace 611 or not. 612 """ 613 614 replication_strategy = None 615 """ 616 A :class:`.ReplicationStrategy` subclass object. 
617 """ 618 619 tables = None 620 """ 621 A map from table names to instances of :class:`~.TableMetadata`. 622 """ 623 624 indexes = None 625 """ 626 A dict mapping index names to :class:`.IndexMetadata` instances. 627 """ 628 629 user_types = None 630 """ 631 A map from user-defined type names to instances of :class:`~cassandra.metadata.UserType`. 632 633 .. versionadded:: 2.1.0 634 """ 635 636 functions = None 637 """ 638 A map from user-defined function signatures to instances of :class:`~cassandra.metadata.Function`. 639 640 .. versionadded:: 2.6.0 641 """ 642 643 aggregates = None 644 """ 645 A map from user-defined aggregate signatures to instances of :class:`~cassandra.metadata.Aggregate`. 646 647 .. versionadded:: 2.6.0 648 """ 649 650 views = None 651 """ 652 A dict mapping view names to :class:`.MaterializedViewMetadata` instances. 653 """ 654 655 virtual = False 656 """ 657 A boolean indicating if this is a virtual keyspace or not. Always ``False`` 658 for clusters running pre-4.0 versions of Cassandra. 659 660 .. versionadded:: 3.15 661 """ 662 663 _exc_info = None 664 """ set if metadata parsing failed """ 665 666 def __init__(self, name, durable_writes, strategy_class, strategy_options): 667 self.name = name 668 self.durable_writes = durable_writes 669 self.replication_strategy = ReplicationStrategy.create(strategy_class, strategy_options) 670 self.tables = {} 671 self.indexes = {} 672 self.user_types = {} 673 self.functions = {} 674 self.aggregates = {} 675 self.views = {} 676 677 def export_as_string(self): 678 """ 679 Returns a CQL query string that can be used to recreate the entire keyspace, 680 including user-defined types and tables. 
681 """ 682 cql = "\n\n".join([self.as_cql_query() + ';'] + 683 self.user_type_strings() + 684 [f.export_as_string() for f in self.functions.values()] + 685 [a.export_as_string() for a in self.aggregates.values()] + 686 [t.export_as_string() for t in self.tables.values()]) 687 if self._exc_info: 688 import traceback 689 ret = "/*\nWarning: Keyspace %s is incomplete because of an error processing metadata.\n" % \ 690 (self.name) 691 for line in traceback.format_exception(*self._exc_info): 692 ret += line 693 ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql 694 return ret 695 if self.virtual: 696 return ("/*\nWarning: Keyspace {ks} is a virtual keyspace and cannot be recreated with CQL.\n" 697 "Structure, for reference:*/\n" 698 "{cql}\n" 699 "").format(ks=self.name, cql=cql) 700 return cql 701 702 def as_cql_query(self): 703 """ 704 Returns a CQL query string that can be used to recreate just this keyspace, 705 not including user-defined types and tables. 
706 """ 707 if self.virtual: 708 return "// VIRTUAL KEYSPACE {}".format(protect_name(self.name)) 709 ret = "CREATE KEYSPACE %s WITH replication = %s " % ( 710 protect_name(self.name), 711 self.replication_strategy.export_for_schema()) 712 return ret + (' AND durable_writes = %s' % ("true" if self.durable_writes else "false")) 713 714 def user_type_strings(self): 715 user_type_strings = [] 716 user_types = self.user_types.copy() 717 keys = sorted(user_types.keys()) 718 for k in keys: 719 if k in user_types: 720 self.resolve_user_types(k, user_types, user_type_strings) 721 return user_type_strings 722 723 def resolve_user_types(self, key, user_types, user_type_strings): 724 user_type = user_types.pop(key) 725 for type_name in user_type.field_types: 726 for sub_type in types.cql_types_from_string(type_name): 727 if sub_type in user_types: 728 self.resolve_user_types(sub_type, user_types, user_type_strings) 729 user_type_strings.append(user_type.export_as_string()) 730 731 def _add_table_metadata(self, table_metadata): 732 old_indexes = {} 733 old_meta = self.tables.get(table_metadata.name, None) 734 if old_meta: 735 # views are not queried with table, so they must be transferred to new 736 table_metadata.views = old_meta.views 737 # indexes will be updated with what is on the new metadata 738 old_indexes = old_meta.indexes 739 740 # note the intentional order of add before remove 741 # this makes sure the maps are never absent something that existed before this update 742 for index_name, index_metadata in six.iteritems(table_metadata.indexes): 743 self.indexes[index_name] = index_metadata 744 745 for index_name in (n for n in old_indexes if n not in table_metadata.indexes): 746 self.indexes.pop(index_name, None) 747 748 self.tables[table_metadata.name] = table_metadata 749 750 def _drop_table_metadata(self, table_name): 751 table_meta = self.tables.pop(table_name, None) 752 if table_meta: 753 for index_name in table_meta.indexes: 754 self.indexes.pop(index_name, None) 
755 for view_name in table_meta.views: 756 self.views.pop(view_name, None) 757 return 758 # we can't tell table drops from views, so drop both 759 # (name is unique among them, within a keyspace) 760 view_meta = self.views.pop(table_name, None) 761 if view_meta: 762 try: 763 self.tables[view_meta.base_table_name].views.pop(table_name, None) 764 except KeyError: 765 pass 766 767 def _add_view_metadata(self, view_metadata): 768 try: 769 self.tables[view_metadata.base_table_name].views[view_metadata.name] = view_metadata 770 self.views[view_metadata.name] = view_metadata 771 except KeyError: 772 pass 773 774 775class UserType(object): 776 """ 777 A user defined type, as created by ``CREATE TYPE`` statements. 778 779 User-defined types were introduced in Cassandra 2.1. 780 781 .. versionadded:: 2.1.0 782 """ 783 784 keyspace = None 785 """ 786 The string name of the keyspace in which this type is defined. 787 """ 788 789 name = None 790 """ 791 The name of this type. 792 """ 793 794 field_names = None 795 """ 796 An ordered list of the names for each field in this user-defined type. 797 """ 798 799 field_types = None 800 """ 801 An ordered list of the types for each field in this user-defined type. 802 """ 803 804 def __init__(self, keyspace, name, field_names, field_types): 805 self.keyspace = keyspace 806 self.name = name 807 # non-frozen collections can return None 808 self.field_names = field_names or [] 809 self.field_types = field_types or [] 810 811 def as_cql_query(self, formatted=False): 812 """ 813 Returns a CQL query that can be used to recreate this type. 814 If `formatted` is set to :const:`True`, extra whitespace will 815 be added to make the query more readable. 
816 """ 817 ret = "CREATE TYPE %s.%s (%s" % ( 818 protect_name(self.keyspace), 819 protect_name(self.name), 820 "\n" if formatted else "") 821 822 if formatted: 823 field_join = ",\n" 824 padding = " " 825 else: 826 field_join = ", " 827 padding = "" 828 829 fields = [] 830 for field_name, field_type in zip(self.field_names, self.field_types): 831 fields.append("%s %s" % (protect_name(field_name), field_type)) 832 833 ret += field_join.join("%s%s" % (padding, field) for field in fields) 834 ret += "\n)" if formatted else ")" 835 return ret 836 837 def export_as_string(self): 838 return self.as_cql_query(formatted=True) + ';' 839 840 841class Aggregate(object): 842 """ 843 A user defined aggregate function, as created by ``CREATE AGGREGATE`` statements. 844 845 Aggregate functions were introduced in Cassandra 2.2 846 847 .. versionadded:: 2.6.0 848 """ 849 850 keyspace = None 851 """ 852 The string name of the keyspace in which this aggregate is defined 853 """ 854 855 name = None 856 """ 857 The name of this aggregate 858 """ 859 860 argument_types = None 861 """ 862 An ordered list of the types for each argument to the aggregate 863 """ 864 865 final_func = None 866 """ 867 Name of a final function 868 """ 869 870 initial_condition = None 871 """ 872 Initial condition of the aggregate 873 """ 874 875 return_type = None 876 """ 877 Return type of the aggregate 878 """ 879 880 state_func = None 881 """ 882 Name of a state function 883 """ 884 885 state_type = None 886 """ 887 Type of the aggregate state 888 """ 889 890 def __init__(self, keyspace, name, argument_types, state_func, 891 state_type, final_func, initial_condition, return_type): 892 self.keyspace = keyspace 893 self.name = name 894 self.argument_types = argument_types 895 self.state_func = state_func 896 self.state_type = state_type 897 self.final_func = final_func 898 self.initial_condition = initial_condition 899 self.return_type = return_type 900 901 def as_cql_query(self, formatted=False): 902 """ 
903 Returns a CQL query that can be used to recreate this aggregate. 904 If `formatted` is set to :const:`True`, extra whitespace will 905 be added to make the query more readable. 906 """ 907 sep = '\n ' if formatted else ' ' 908 keyspace = protect_name(self.keyspace) 909 name = protect_name(self.name) 910 type_list = ', '.join(self.argument_types) 911 state_func = protect_name(self.state_func) 912 state_type = self.state_type 913 914 ret = "CREATE AGGREGATE %(keyspace)s.%(name)s(%(type_list)s)%(sep)s" \ 915 "SFUNC %(state_func)s%(sep)s" \ 916 "STYPE %(state_type)s" % locals() 917 918 ret += ''.join((sep, 'FINALFUNC ', protect_name(self.final_func))) if self.final_func else '' 919 ret += ''.join((sep, 'INITCOND ', self.initial_condition)) if self.initial_condition is not None else '' 920 921 return ret 922 923 def export_as_string(self): 924 return self.as_cql_query(formatted=True) + ';' 925 926 @property 927 def signature(self): 928 return SignatureDescriptor.format_signature(self.name, self.argument_types) 929 930 931class Function(object): 932 """ 933 A user defined function, as created by ``CREATE FUNCTION`` statements. 934 935 User-defined functions were introduced in Cassandra 2.2 936 937 .. 
versionadded:: 2.6.0 938 """ 939 940 keyspace = None 941 """ 942 The string name of the keyspace in which this function is defined 943 """ 944 945 name = None 946 """ 947 The name of this function 948 """ 949 950 argument_types = None 951 """ 952 An ordered list of the types for each argument to the function 953 """ 954 955 argument_names = None 956 """ 957 An ordered list of the names of each argument to the function 958 """ 959 960 return_type = None 961 """ 962 Return type of the function 963 """ 964 965 language = None 966 """ 967 Language of the function body 968 """ 969 970 body = None 971 """ 972 Function body string 973 """ 974 975 called_on_null_input = None 976 """ 977 Flag indicating whether this function should be called for rows with null values 978 (convenience function to avoid handling nulls explicitly if the result will just be null) 979 """ 980 981 def __init__(self, keyspace, name, argument_types, argument_names, 982 return_type, language, body, called_on_null_input): 983 self.keyspace = keyspace 984 self.name = name 985 self.argument_types = argument_types 986 # argument_types (frozen<list<>>) will always be a list 987 # argument_name is not frozen in C* < 3.0 and may return None 988 self.argument_names = argument_names or [] 989 self.return_type = return_type 990 self.language = language 991 self.body = body 992 self.called_on_null_input = called_on_null_input 993 994 def as_cql_query(self, formatted=False): 995 """ 996 Returns a CQL query that can be used to recreate this function. 997 If `formatted` is set to :const:`True`, extra whitespace will 998 be added to make the query more readable. 
999 """ 1000 sep = '\n ' if formatted else ' ' 1001 keyspace = protect_name(self.keyspace) 1002 name = protect_name(self.name) 1003 arg_list = ', '.join(["%s %s" % (protect_name(n), t) 1004 for n, t in zip(self.argument_names, self.argument_types)]) 1005 typ = self.return_type 1006 lang = self.language 1007 body = self.body 1008 on_null = "CALLED" if self.called_on_null_input else "RETURNS NULL" 1009 1010 return "CREATE FUNCTION %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \ 1011 "%(on_null)s ON NULL INPUT%(sep)s" \ 1012 "RETURNS %(typ)s%(sep)s" \ 1013 "LANGUAGE %(lang)s%(sep)s" \ 1014 "AS $$%(body)s$$" % locals() 1015 1016 def export_as_string(self): 1017 return self.as_cql_query(formatted=True) + ';' 1018 1019 @property 1020 def signature(self): 1021 return SignatureDescriptor.format_signature(self.name, self.argument_types) 1022 1023 1024class TableMetadata(object): 1025 """ 1026 A representation of the schema for a single table. 1027 """ 1028 1029 keyspace_name = None 1030 """ String name of this Table's keyspace """ 1031 1032 name = None 1033 """ The string name of the table. """ 1034 1035 partition_key = None 1036 """ 1037 A list of :class:`.ColumnMetadata` instances representing the columns in 1038 the partition key for this table. This will always hold at least one 1039 column. 1040 """ 1041 1042 clustering_key = None 1043 """ 1044 A list of :class:`.ColumnMetadata` instances representing the columns 1045 in the clustering key for this table. These are all of the 1046 :attr:`.primary_key` columns that are not in the :attr:`.partition_key`. 1047 1048 Note that a table may have no clustering keys, in which case this will 1049 be an empty list. 1050 """ 1051 1052 @property 1053 def primary_key(self): 1054 """ 1055 A list of :class:`.ColumnMetadata` representing the components of 1056 the primary key for this table. 
1057 """ 1058 return self.partition_key + self.clustering_key 1059 1060 columns = None 1061 """ 1062 A dict mapping column names to :class:`.ColumnMetadata` instances. 1063 """ 1064 1065 indexes = None 1066 """ 1067 A dict mapping index names to :class:`.IndexMetadata` instances. 1068 """ 1069 1070 is_compact_storage = False 1071 1072 options = None 1073 """ 1074 A dict mapping table option names to their specific settings for this 1075 table. 1076 """ 1077 1078 compaction_options = { 1079 "min_compaction_threshold": "min_threshold", 1080 "max_compaction_threshold": "max_threshold", 1081 "compaction_strategy_class": "class"} 1082 1083 triggers = None 1084 """ 1085 A dict mapping trigger names to :class:`.TriggerMetadata` instances. 1086 """ 1087 1088 views = None 1089 """ 1090 A dict mapping view names to :class:`.MaterializedViewMetadata` instances. 1091 """ 1092 1093 _exc_info = None 1094 """ set if metadata parsing failed """ 1095 1096 virtual = False 1097 """ 1098 A boolean indicating if this is a virtual table or not. Always ``False`` 1099 for clusters running pre-4.0 versions of Cassandra. 1100 1101 .. 
versionadded:: 3.15 1102 """ 1103 1104 @property 1105 def is_cql_compatible(self): 1106 """ 1107 A boolean indicating if this table can be represented as CQL in export 1108 """ 1109 if self.virtual: 1110 return False 1111 comparator = getattr(self, 'comparator', None) 1112 if comparator: 1113 # no compact storage with more than one column beyond PK if there 1114 # are clustering columns 1115 incompatible = (self.is_compact_storage and 1116 len(self.columns) > len(self.primary_key) + 1 and 1117 len(self.clustering_key) >= 1) 1118 1119 return not incompatible 1120 return True 1121 1122 extensions = None 1123 """ 1124 Metadata describing configuration for table extensions 1125 """ 1126 1127 def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None, virtual=False): 1128 self.keyspace_name = keyspace_name 1129 self.name = name 1130 self.partition_key = [] if partition_key is None else partition_key 1131 self.clustering_key = [] if clustering_key is None else clustering_key 1132 self.columns = OrderedDict() if columns is None else columns 1133 self.indexes = {} 1134 self.options = {} if options is None else options 1135 self.comparator = None 1136 self.triggers = OrderedDict() if triggers is None else triggers 1137 self.views = {} 1138 self.virtual = virtual 1139 1140 def export_as_string(self): 1141 """ 1142 Returns a string of CQL queries that can be used to recreate this table 1143 along with all indexes on it. The returned string is formatted to 1144 be human readable. 
1145 """ 1146 if self._exc_info: 1147 import traceback 1148 ret = "/*\nWarning: Table %s.%s is incomplete because of an error processing metadata.\n" % \ 1149 (self.keyspace_name, self.name) 1150 for line in traceback.format_exception(*self._exc_info): 1151 ret += line 1152 ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql() 1153 elif not self.is_cql_compatible: 1154 # If we can't produce this table with CQL, comment inline 1155 ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \ 1156 (self.keyspace_name, self.name) 1157 ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql() 1158 elif self.virtual: 1159 ret = ('/*\nWarning: Table {ks}.{tab} is a virtual table and cannot be recreated with CQL.\n' 1160 'Structure, for reference:\n' 1161 '{cql}\n*/').format(ks=self.keyspace_name, tab=self.name, cql=self._all_as_cql()) 1162 1163 else: 1164 ret = self._all_as_cql() 1165 1166 return ret 1167 1168 def _all_as_cql(self): 1169 ret = self.as_cql_query(formatted=True) 1170 ret += ";" 1171 1172 for index in self.indexes.values(): 1173 ret += "\n%s;" % index.as_cql_query() 1174 1175 for trigger_meta in self.triggers.values(): 1176 ret += "\n%s;" % (trigger_meta.as_cql_query(),) 1177 1178 for view_meta in self.views.values(): 1179 ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),) 1180 1181 if self.extensions: 1182 registry = _RegisteredExtensionType._extension_registry 1183 for k in six.viewkeys(registry) & self.extensions: # no viewkeys on OrderedMapSerializeKey 1184 ext = registry[k] 1185 cql = ext.after_table_cql(self, k, self.extensions[k]) 1186 if cql: 1187 ret += "\n\n%s" % (cql,) 1188 1189 return ret 1190 1191 def as_cql_query(self, formatted=False): 1192 """ 1193 Returns a CQL query that can be used to recreate this table (index 
1194 creations are not included). If `formatted` is set to :const:`True`, 1195 extra whitespace will be added to make the query human readable. 1196 """ 1197 ret = "%s TABLE %s.%s (%s" % ( 1198 ('VIRTUAL' if self.virtual else 'CREATE'), 1199 protect_name(self.keyspace_name), 1200 protect_name(self.name), 1201 "\n" if formatted else "") 1202 1203 if formatted: 1204 column_join = ",\n" 1205 padding = " " 1206 else: 1207 column_join = ", " 1208 padding = "" 1209 1210 columns = [] 1211 for col in self.columns.values(): 1212 columns.append("%s %s%s" % (protect_name(col.name), col.cql_type, ' static' if col.is_static else '')) 1213 1214 if len(self.partition_key) == 1 and not self.clustering_key: 1215 columns[0] += " PRIMARY KEY" 1216 1217 ret += column_join.join("%s%s" % (padding, col) for col in columns) 1218 1219 # primary key 1220 if len(self.partition_key) > 1 or self.clustering_key: 1221 ret += "%s%sPRIMARY KEY (" % (column_join, padding) 1222 1223 if len(self.partition_key) > 1: 1224 ret += "(%s)" % ", ".join(protect_name(col.name) for col in self.partition_key) 1225 else: 1226 ret += protect_name(self.partition_key[0].name) 1227 1228 if self.clustering_key: 1229 ret += ", %s" % ", ".join(protect_name(col.name) for col in self.clustering_key) 1230 1231 ret += ")" 1232 1233 # properties 1234 ret += "%s) WITH " % ("\n" if formatted else "") 1235 ret += self._property_string(formatted, self.clustering_key, self.options, self.is_compact_storage) 1236 1237 return ret 1238 1239 @classmethod 1240 def _property_string(cls, formatted, clustering_key, options_map, is_compact_storage=False): 1241 properties = [] 1242 if is_compact_storage: 1243 properties.append("COMPACT STORAGE") 1244 1245 if clustering_key: 1246 cluster_str = "CLUSTERING ORDER BY " 1247 1248 inner = [] 1249 for col in clustering_key: 1250 ordering = "DESC" if col.is_reversed else "ASC" 1251 inner.append("%s %s" % (protect_name(col.name), ordering)) 1252 1253 cluster_str += "(%s)" % ", ".join(inner) 1254 
properties.append(cluster_str) 1255 1256 properties.extend(cls._make_option_strings(options_map)) 1257 1258 join_str = "\n AND " if formatted else " AND " 1259 return join_str.join(properties) 1260 1261 @classmethod 1262 def _make_option_strings(cls, options_map): 1263 ret = [] 1264 options_copy = dict(options_map.items()) 1265 1266 actual_options = json.loads(options_copy.pop('compaction_strategy_options', '{}')) 1267 value = options_copy.pop("compaction_strategy_class", None) 1268 actual_options.setdefault("class", value) 1269 1270 compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] 1271 ret.append('compaction = {%s}' % ', '.join(compaction_option_strings)) 1272 1273 for system_table_name in cls.compaction_options.keys(): 1274 options_copy.pop(system_table_name, None) # delete if present 1275 options_copy.pop('compaction_strategy_option', None) 1276 1277 if not options_copy.get('compression'): 1278 params = json.loads(options_copy.pop('compression_parameters', '{}')) 1279 param_strings = ["'%s': '%s'" % (k, v) for k, v in params.items()] 1280 ret.append('compression = {%s}' % ', '.join(param_strings)) 1281 1282 for name, value in options_copy.items(): 1283 if value is not None: 1284 if name == "comment": 1285 value = value or "" 1286 ret.append("%s = %s" % (name, protect_value(value))) 1287 1288 return list(sorted(ret)) 1289 1290 1291class TableExtensionInterface(object): 1292 """ 1293 Defines CQL/DDL for Cassandra table extensions. 1294 """ 1295 # limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example 1296 @classmethod 1297 def after_table_cql(cls, ext_key, ext_blob): 1298 """ 1299 Called to produce CQL/DDL to follow the table definition. 1300 Should contain requisite terminating semicolon(s). 
1301 """ 1302 pass 1303 1304 1305class _RegisteredExtensionType(type): 1306 1307 _extension_registry = {} 1308 1309 def __new__(mcs, name, bases, dct): 1310 cls = super(_RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct) 1311 if name != 'RegisteredTableExtension': 1312 mcs._extension_registry[cls.name] = cls 1313 return cls 1314 1315 1316@six.add_metaclass(_RegisteredExtensionType) 1317class RegisteredTableExtension(TableExtensionInterface): 1318 """ 1319 Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map). 1320 """ 1321 name = None 1322 """ 1323 Name of the extension (key in the map) 1324 """ 1325 1326 1327def protect_name(name): 1328 return maybe_escape_name(name) 1329 1330 1331def protect_names(names): 1332 return [protect_name(n) for n in names] 1333 1334 1335def protect_value(value): 1336 if value is None: 1337 return 'NULL' 1338 if isinstance(value, (int, float, bool)): 1339 return str(value).lower() 1340 return "'%s'" % value.replace("'", "''") 1341 1342 1343valid_cql3_word_re = re.compile(r'^[a-z][0-9a-z_]*$') 1344 1345 1346def is_valid_name(name): 1347 if name is None: 1348 return False 1349 if name.lower() in cql_keywords_reserved: 1350 return False 1351 return valid_cql3_word_re.match(name) is not None 1352 1353 1354def maybe_escape_name(name): 1355 if is_valid_name(name): 1356 return name 1357 return escape_name(name) 1358 1359 1360def escape_name(name): 1361 return '"%s"' % (name.replace('"', '""'),) 1362 1363 1364class ColumnMetadata(object): 1365 """ 1366 A representation of a single column in a table. 1367 """ 1368 1369 table = None 1370 """ The :class:`.TableMetadata` this column belongs to. """ 1371 1372 name = None 1373 """ The string name of this column. """ 1374 1375 cql_type = None 1376 """ 1377 The CQL type for the column. 
1378 """ 1379 1380 is_static = False 1381 """ 1382 If this column is static (available in Cassandra 2.1+), this will 1383 be :const:`True`, otherwise :const:`False`. 1384 """ 1385 1386 is_reversed = False 1387 """ 1388 If this column is reversed (DESC) as in clustering order 1389 """ 1390 1391 _cass_type = None 1392 1393 def __init__(self, table_metadata, column_name, cql_type, is_static=False, is_reversed=False): 1394 self.table = table_metadata 1395 self.name = column_name 1396 self.cql_type = cql_type 1397 self.is_static = is_static 1398 self.is_reversed = is_reversed 1399 1400 def __str__(self): 1401 return "%s %s" % (self.name, self.cql_type) 1402 1403 1404class IndexMetadata(object): 1405 """ 1406 A representation of a secondary index on a column. 1407 """ 1408 keyspace_name = None 1409 """ A string name of the keyspace. """ 1410 1411 table_name = None 1412 """ A string name of the table this index is on. """ 1413 1414 name = None 1415 """ A string name for the index. """ 1416 1417 kind = None 1418 """ A string representing the kind of index (COMPOSITE, CUSTOM,...). """ 1419 1420 index_options = {} 1421 """ A dict of index options. """ 1422 1423 def __init__(self, keyspace_name, table_name, index_name, kind, index_options): 1424 self.keyspace_name = keyspace_name 1425 self.table_name = table_name 1426 self.name = index_name 1427 self.kind = kind 1428 self.index_options = index_options 1429 1430 def as_cql_query(self): 1431 """ 1432 Returns a CQL query that can be used to recreate this index. 
1433 """ 1434 options = dict(self.index_options) 1435 index_target = options.pop("target") 1436 if self.kind != "CUSTOM": 1437 return "CREATE INDEX %s ON %s.%s (%s)" % ( 1438 protect_name(self.name), 1439 protect_name(self.keyspace_name), 1440 protect_name(self.table_name), 1441 index_target) 1442 else: 1443 class_name = options.pop("class_name") 1444 ret = "CREATE CUSTOM INDEX %s ON %s.%s (%s) USING '%s'" % ( 1445 protect_name(self.name), 1446 protect_name(self.keyspace_name), 1447 protect_name(self.table_name), 1448 index_target, 1449 class_name) 1450 if options: 1451 # PYTHON-1008: `ret` will always be a unicode 1452 opts_cql_encoded = _encoder.cql_encode_all_types(options, as_text_type=True) 1453 ret += " WITH OPTIONS = %s" % opts_cql_encoded 1454 return ret 1455 1456 def export_as_string(self): 1457 """ 1458 Returns a CQL query string that can be used to recreate this index. 1459 """ 1460 return self.as_cql_query() + ';' 1461 1462 1463class TokenMap(object): 1464 """ 1465 Information about the layout of the ring. 1466 """ 1467 1468 token_class = None 1469 """ 1470 A subclass of :class:`.Token`, depending on what partitioner the cluster uses. 1471 """ 1472 1473 token_to_host_owner = None 1474 """ 1475 A map of :class:`.Token` objects to the :class:`.Host` that owns that token. 1476 """ 1477 1478 tokens_to_hosts_by_ks = None 1479 """ 1480 A map of keyspace names to a nested map of :class:`.Token` objects to 1481 sets of :class:`.Host` objects. 1482 """ 1483 1484 ring = None 1485 """ 1486 An ordered list of :class:`.Token` instances in the ring. 
1487 """ 1488 1489 _metadata = None 1490 1491 def __init__(self, token_class, token_to_host_owner, all_tokens, metadata): 1492 self.token_class = token_class 1493 self.ring = all_tokens 1494 self.token_to_host_owner = token_to_host_owner 1495 1496 self.tokens_to_hosts_by_ks = {} 1497 self._metadata = metadata 1498 self._rebuild_lock = RLock() 1499 1500 def rebuild_keyspace(self, keyspace, build_if_absent=False): 1501 with self._rebuild_lock: 1502 try: 1503 current = self.tokens_to_hosts_by_ks.get(keyspace, None) 1504 if (build_if_absent and current is None) or (not build_if_absent and current is not None): 1505 ks_meta = self._metadata.keyspaces.get(keyspace) 1506 if ks_meta: 1507 replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) 1508 self.tokens_to_hosts_by_ks[keyspace] = replica_map 1509 except Exception: 1510 # should not happen normally, but we don't want to blow up queries because of unexpected meta state 1511 # bypass until new map is generated 1512 self.tokens_to_hosts_by_ks[keyspace] = {} 1513 log.exception("Failed creating a token map for keyspace '%s' with %s. PLEASE REPORT THIS: https://datastax-oss.atlassian.net/projects/PYTHON", keyspace, self.token_to_host_owner) 1514 1515 def replica_map_for_keyspace(self, ks_metadata): 1516 strategy = ks_metadata.replication_strategy 1517 if strategy: 1518 return strategy.make_token_replica_map(self.token_to_host_owner, self.ring) 1519 else: 1520 return None 1521 1522 def remove_keyspace(self, keyspace): 1523 self.tokens_to_hosts_by_ks.pop(keyspace, None) 1524 1525 def get_replicas(self, keyspace, token): 1526 """ 1527 Get a set of :class:`.Host` instances representing all of the 1528 replica nodes for a given :class:`.Token`. 
1529 """ 1530 tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None) 1531 if tokens_to_hosts is None: 1532 self.rebuild_keyspace(keyspace, build_if_absent=True) 1533 tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None) 1534 1535 if tokens_to_hosts: 1536 # The values in self.ring correspond to the end of the 1537 # token range up to and including the value listed. 1538 point = bisect_left(self.ring, token) 1539 if point == len(self.ring): 1540 return tokens_to_hosts[self.ring[0]] 1541 else: 1542 return tokens_to_hosts[self.ring[point]] 1543 return [] 1544 1545 1546@total_ordering 1547class Token(object): 1548 """ 1549 Abstract class representing a token. 1550 """ 1551 1552 def __init__(self, token): 1553 self.value = token 1554 1555 @classmethod 1556 def hash_fn(cls, key): 1557 return key 1558 1559 @classmethod 1560 def from_key(cls, key): 1561 return cls(cls.hash_fn(key)) 1562 1563 @classmethod 1564 def from_string(cls, token_string): 1565 raise NotImplementedError() 1566 1567 def __eq__(self, other): 1568 return self.value == other.value 1569 1570 def __lt__(self, other): 1571 return self.value < other.value 1572 1573 def __hash__(self): 1574 return hash(self.value) 1575 1576 def __repr__(self): 1577 return "<%s: %s>" % (self.__class__.__name__, self.value) 1578 __str__ = __repr__ 1579 1580 1581MIN_LONG = -(2 ** 63) 1582MAX_LONG = (2 ** 63) - 1 1583 1584 1585class NoMurmur3(Exception): 1586 pass 1587 1588 1589class HashToken(Token): 1590 1591 @classmethod 1592 def from_string(cls, token_string): 1593 """ `token_string` should be the string representation from the server. """ 1594 # The hash partitioners just store the deciman value 1595 return cls(int(token_string)) 1596 1597 1598class Murmur3Token(HashToken): 1599 """ 1600 A token for ``Murmur3Partitioner``. 
1601 """ 1602 1603 @classmethod 1604 def hash_fn(cls, key): 1605 if murmur3 is not None: 1606 h = int(murmur3(key)) 1607 return h if h != MIN_LONG else MAX_LONG 1608 else: 1609 raise NoMurmur3() 1610 1611 def __init__(self, token): 1612 """ `token` is an int or string representing the token. """ 1613 self.value = int(token) 1614 1615 1616class MD5Token(HashToken): 1617 """ 1618 A token for ``RandomPartitioner``. 1619 """ 1620 1621 @classmethod 1622 def hash_fn(cls, key): 1623 if isinstance(key, six.text_type): 1624 key = key.encode('UTF-8') 1625 return abs(varint_unpack(md5(key).digest())) 1626 1627 1628class BytesToken(Token): 1629 """ 1630 A token for ``ByteOrderedPartitioner``. 1631 """ 1632 1633 @classmethod 1634 def from_string(cls, token_string): 1635 """ `token_string` should be the string representation from the server. """ 1636 # unhexlify works fine with unicode input in everythin but pypy3, where it Raises "TypeError: 'str' does not support the buffer interface" 1637 if isinstance(token_string, six.text_type): 1638 token_string = token_string.encode('ascii') 1639 # The BOP stores a hex string 1640 return cls(unhexlify(token_string)) 1641 1642 1643class TriggerMetadata(object): 1644 """ 1645 A representation of a trigger for a table. 1646 """ 1647 1648 table = None 1649 """ The :class:`.TableMetadata` this trigger belongs to. """ 1650 1651 name = None 1652 """ The string name of this trigger. """ 1653 1654 options = None 1655 """ 1656 A dict mapping trigger option names to their specific settings for this 1657 table. 
1658 """ 1659 def __init__(self, table_metadata, trigger_name, options=None): 1660 self.table = table_metadata 1661 self.name = trigger_name 1662 self.options = options 1663 1664 def as_cql_query(self): 1665 ret = "CREATE TRIGGER %s ON %s.%s USING %s" % ( 1666 protect_name(self.name), 1667 protect_name(self.table.keyspace_name), 1668 protect_name(self.table.name), 1669 protect_value(self.options['class']) 1670 ) 1671 return ret 1672 1673 def export_as_string(self): 1674 return self.as_cql_query() + ';' 1675 1676 1677class _SchemaParser(object): 1678 1679 def __init__(self, connection, timeout): 1680 self.connection = connection 1681 self.timeout = timeout 1682 1683 def _handle_results(self, success, result, expected_failures=tuple()): 1684 """ 1685 Given a bool and a ResultSet (the form returned per result from 1686 Connection.wait_for_responses), return a dictionary containing the 1687 results. Used to process results from asynchronous queries to system 1688 tables. 1689 1690 ``expected_failures`` will usually be used to allow callers to ignore 1691 ``InvalidRequest`` errors caused by a missing system keyspace. For 1692 example, some DSE versions report a 4.X server version, but do not have 1693 virtual tables. Thus, running against 4.X servers, SchemaParserV4 uses 1694 expected_failures to make a best-effort attempt to read those 1695 keyspaces, but treat them as empty if they're not found. 1696 1697 :param success: A boolean representing whether or not the query 1698 succeeded 1699 :param result: The resultset in question. 1700 :expected_failures: An Exception class or an iterable thereof. If the 1701 query failed, but raised an instance of an expected failure class, this 1702 will ignore the failure and return an empty list. 
1703 """ 1704 if not success and isinstance(result, expected_failures): 1705 return [] 1706 elif success: 1707 return dict_factory(*result.results) if result else [] 1708 else: 1709 raise result 1710 1711 def _query_build_row(self, query_string, build_func): 1712 result = self._query_build_rows(query_string, build_func) 1713 return result[0] if result else None 1714 1715 def _query_build_rows(self, query_string, build_func): 1716 query = QueryMessage(query=query_string, consistency_level=ConsistencyLevel.ONE) 1717 responses = self.connection.wait_for_responses((query), timeout=self.timeout, fail_on_error=False) 1718 (success, response) = responses[0] 1719 if success: 1720 result = dict_factory(*response.results) 1721 return [build_func(row) for row in result] 1722 elif isinstance(response, InvalidRequest): 1723 log.debug("user types table not found") 1724 return [] 1725 else: 1726 raise response 1727 1728 1729class SchemaParserV22(_SchemaParser): 1730 _SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces" 1731 _SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies" 1732 _SELECT_COLUMNS = "SELECT * FROM system.schema_columns" 1733 _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers" 1734 _SELECT_TYPES = "SELECT * FROM system.schema_usertypes" 1735 _SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions" 1736 _SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates" 1737 1738 _table_name_col = 'columnfamily_name' 1739 1740 _function_agg_arument_type_col = 'signature' 1741 1742 recognized_table_options = ( 1743 "comment", 1744 "read_repair_chance", 1745 "dclocal_read_repair_chance", # kept to be safe, but see _build_table_options() 1746 "local_read_repair_chance", 1747 "replicate_on_write", 1748 "gc_grace_seconds", 1749 "bloom_filter_fp_chance", 1750 "caching", 1751 "compaction_strategy_class", 1752 "compaction_strategy_options", 1753 "min_compaction_threshold", 1754 "max_compaction_threshold", 1755 "compression_parameters", 1756 
"min_index_interval", 1757 "max_index_interval", 1758 "index_interval", 1759 "speculative_retry", 1760 "rows_per_partition_to_cache", 1761 "memtable_flush_period_in_ms", 1762 "populate_io_cache_on_flush", 1763 "compression", 1764 "default_time_to_live") 1765 1766 def __init__(self, connection, timeout): 1767 super(SchemaParserV22, self).__init__(connection, timeout) 1768 self.keyspaces_result = [] 1769 self.tables_result = [] 1770 self.columns_result = [] 1771 self.triggers_result = [] 1772 self.types_result = [] 1773 self.functions_result = [] 1774 self.aggregates_result = [] 1775 1776 self.keyspace_table_rows = defaultdict(list) 1777 self.keyspace_table_col_rows = defaultdict(lambda: defaultdict(list)) 1778 self.keyspace_type_rows = defaultdict(list) 1779 self.keyspace_func_rows = defaultdict(list) 1780 self.keyspace_agg_rows = defaultdict(list) 1781 self.keyspace_table_trigger_rows = defaultdict(lambda: defaultdict(list)) 1782 1783 def get_all_keyspaces(self): 1784 self._query_all() 1785 1786 for row in self.keyspaces_result: 1787 keyspace_meta = self._build_keyspace_metadata(row) 1788 1789 try: 1790 for table_row in self.keyspace_table_rows.get(keyspace_meta.name, []): 1791 table_meta = self._build_table_metadata(table_row) 1792 keyspace_meta._add_table_metadata(table_meta) 1793 1794 for usertype_row in self.keyspace_type_rows.get(keyspace_meta.name, []): 1795 usertype = self._build_user_type(usertype_row) 1796 keyspace_meta.user_types[usertype.name] = usertype 1797 1798 for fn_row in self.keyspace_func_rows.get(keyspace_meta.name, []): 1799 fn = self._build_function(fn_row) 1800 keyspace_meta.functions[fn.signature] = fn 1801 1802 for agg_row in self.keyspace_agg_rows.get(keyspace_meta.name, []): 1803 agg = self._build_aggregate(agg_row) 1804 keyspace_meta.aggregates[agg.signature] = agg 1805 except Exception: 1806 log.exception("Error while parsing metadata for keyspace %s. 
Metadata model will be incomplete.", keyspace_meta.name) 1807 keyspace_meta._exc_info = sys.exc_info() 1808 1809 yield keyspace_meta 1810 1811 def get_table(self, keyspaces, keyspace, table): 1812 cl = ConsistencyLevel.ONE 1813 where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col,), (keyspace, table), _encoder) 1814 cf_query = QueryMessage(query=self._SELECT_COLUMN_FAMILIES + where_clause, consistency_level=cl) 1815 col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl) 1816 triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl) 1817 (cf_success, cf_result), (col_success, col_result), (triggers_success, triggers_result) \ 1818 = self.connection.wait_for_responses(cf_query, col_query, triggers_query, timeout=self.timeout, fail_on_error=False) 1819 table_result = self._handle_results(cf_success, cf_result) 1820 col_result = self._handle_results(col_success, col_result) 1821 1822 # the triggers table doesn't exist in C* 1.2 1823 triggers_result = self._handle_results(triggers_success, triggers_result, 1824 expected_failures=InvalidRequest) 1825 1826 if table_result: 1827 return self._build_table_metadata(table_result[0], col_result, triggers_result) 1828 1829 def get_type(self, keyspaces, keyspace, type): 1830 where_clause = bind_params(" WHERE keyspace_name = %s AND type_name = %s", (keyspace, type), _encoder) 1831 return self._query_build_row(self._SELECT_TYPES + where_clause, self._build_user_type) 1832 1833 def get_types_map(self, keyspaces, keyspace): 1834 where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder) 1835 types = self._query_build_rows(self._SELECT_TYPES + where_clause, self._build_user_type) 1836 return dict((t.name, t) for t in types) 1837 1838 def get_function(self, keyspaces, keyspace, function): 1839 where_clause = bind_params(" WHERE keyspace_name = %%s AND function_name = %%s AND %s = %%s" % 
(self._function_agg_arument_type_col,), 1840 (keyspace, function.name, function.argument_types), _encoder) 1841 return self._query_build_row(self._SELECT_FUNCTIONS + where_clause, self._build_function) 1842 1843 def get_aggregate(self, keyspaces, keyspace, aggregate): 1844 where_clause = bind_params(" WHERE keyspace_name = %%s AND aggregate_name = %%s AND %s = %%s" % (self._function_agg_arument_type_col,), 1845 (keyspace, aggregate.name, aggregate.argument_types), _encoder) 1846 1847 return self._query_build_row(self._SELECT_AGGREGATES + where_clause, self._build_aggregate) 1848 1849 def get_keyspace(self, keyspaces, keyspace): 1850 where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder) 1851 return self._query_build_row(self._SELECT_KEYSPACES + where_clause, self._build_keyspace_metadata) 1852 1853 @classmethod 1854 def _build_keyspace_metadata(cls, row): 1855 try: 1856 ksm = cls._build_keyspace_metadata_internal(row) 1857 except Exception: 1858 name = row["keyspace_name"] 1859 ksm = KeyspaceMetadata(name, False, 'UNKNOWN', {}) 1860 ksm._exc_info = sys.exc_info() # capture exc_info before log because nose (test) logging clears it in certain circumstances 1861 log.exception("Error while parsing metadata for keyspace %s row(%s)", name, row) 1862 return ksm 1863 1864 @staticmethod 1865 def _build_keyspace_metadata_internal(row): 1866 name = row["keyspace_name"] 1867 durable_writes = row["durable_writes"] 1868 strategy_class = row["strategy_class"] 1869 strategy_options = json.loads(row["strategy_options"]) 1870 return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) 1871 1872 @classmethod 1873 def _build_user_type(cls, usertype_row): 1874 field_types = list(map(cls._schema_type_to_cql, usertype_row['field_types'])) 1875 return UserType(usertype_row['keyspace_name'], usertype_row['type_name'], 1876 usertype_row['field_names'], field_types) 1877 1878 @classmethod 1879 def _build_function(cls, function_row): 1880 
return_type = cls._schema_type_to_cql(function_row['return_type']) 1881 return Function(function_row['keyspace_name'], function_row['function_name'], 1882 function_row[cls._function_agg_arument_type_col], function_row['argument_names'], 1883 return_type, function_row['language'], function_row['body'], 1884 function_row['called_on_null_input']) 1885 1886 @classmethod 1887 def _build_aggregate(cls, aggregate_row): 1888 cass_state_type = types.lookup_casstype(aggregate_row['state_type']) 1889 initial_condition = aggregate_row['initcond'] 1890 if initial_condition is not None: 1891 initial_condition = _encoder.cql_encode_all_types(cass_state_type.deserialize(initial_condition, 3)) 1892 state_type = _cql_from_cass_type(cass_state_type) 1893 return_type = cls._schema_type_to_cql(aggregate_row['return_type']) 1894 return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], 1895 aggregate_row['signature'], aggregate_row['state_func'], state_type, 1896 aggregate_row['final_func'], initial_condition, return_type) 1897 1898 def _build_table_metadata(self, row, col_rows=None, trigger_rows=None): 1899 keyspace_name = row["keyspace_name"] 1900 cfname = row[self._table_name_col] 1901 1902 col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][cfname] 1903 trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][cfname] 1904 1905 if not col_rows: # CASSANDRA-8487 1906 log.warning("Building table metadata with no column meta for %s.%s", 1907 keyspace_name, cfname) 1908 1909 table_meta = TableMetadata(keyspace_name, cfname) 1910 1911 try: 1912 comparator = types.lookup_casstype(row["comparator"]) 1913 table_meta.comparator = comparator 1914 1915 is_dct_comparator = issubclass(comparator, types.DynamicCompositeType) 1916 is_composite_comparator = issubclass(comparator, types.CompositeType) 1917 column_name_types = comparator.subtypes if is_composite_comparator else (comparator,) 1918 1919 num_column_name_components = 
len(column_name_types) 1920 last_col = column_name_types[-1] 1921 1922 column_aliases = row.get("column_aliases", None) 1923 1924 clustering_rows = [r for r in col_rows 1925 if r.get('type', None) == "clustering_key"] 1926 if len(clustering_rows) > 1: 1927 clustering_rows = sorted(clustering_rows, key=lambda row: row.get('component_index')) 1928 1929 if column_aliases is not None: 1930 column_aliases = json.loads(column_aliases) 1931 1932 if not column_aliases: # json load failed or column_aliases empty PYTHON-562 1933 column_aliases = [r.get('column_name') for r in clustering_rows] 1934 1935 if is_composite_comparator: 1936 if issubclass(last_col, types.ColumnToCollectionType): 1937 # collections 1938 is_compact = False 1939 has_value = False 1940 clustering_size = num_column_name_components - 2 1941 elif (len(column_aliases) == num_column_name_components - 1 and 1942 issubclass(last_col, types.UTF8Type)): 1943 # aliases? 1944 is_compact = False 1945 has_value = False 1946 clustering_size = num_column_name_components - 1 1947 else: 1948 # compact table 1949 is_compact = True 1950 has_value = column_aliases or not col_rows 1951 clustering_size = num_column_name_components 1952 1953 # Some thrift tables define names in composite types (see PYTHON-192) 1954 if not column_aliases and hasattr(comparator, 'fieldnames'): 1955 column_aliases = filter(None, comparator.fieldnames) 1956 else: 1957 is_compact = True 1958 if column_aliases or not col_rows or is_dct_comparator: 1959 has_value = True 1960 clustering_size = num_column_name_components 1961 else: 1962 has_value = False 1963 clustering_size = 0 1964 1965 # partition key 1966 partition_rows = [r for r in col_rows 1967 if r.get('type', None) == "partition_key"] 1968 1969 if len(partition_rows) > 1: 1970 partition_rows = sorted(partition_rows, key=lambda row: row.get('component_index')) 1971 1972 key_aliases = row.get("key_aliases") 1973 if key_aliases is not None: 1974 key_aliases = json.loads(key_aliases) if 
key_aliases else [] 1975 else: 1976 # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. 1977 key_aliases = [r.get('column_name') for r in partition_rows] 1978 1979 key_validator = row.get("key_validator") 1980 if key_validator is not None: 1981 key_type = types.lookup_casstype(key_validator) 1982 key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type] 1983 else: 1984 key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows] 1985 1986 for i, col_type in enumerate(key_types): 1987 if len(key_aliases) > i: 1988 column_name = key_aliases[i] 1989 elif i == 0: 1990 column_name = "key" 1991 else: 1992 column_name = "key%d" % i 1993 1994 col = ColumnMetadata(table_meta, column_name, col_type.cql_parameterized_type()) 1995 table_meta.columns[column_name] = col 1996 table_meta.partition_key.append(col) 1997 1998 # clustering key 1999 for i in range(clustering_size): 2000 if len(column_aliases) > i: 2001 column_name = column_aliases[i] 2002 else: 2003 column_name = "column%d" % (i + 1) 2004 2005 data_type = column_name_types[i] 2006 cql_type = _cql_from_cass_type(data_type) 2007 is_reversed = types.is_reversed_casstype(data_type) 2008 col = ColumnMetadata(table_meta, column_name, cql_type, is_reversed=is_reversed) 2009 table_meta.columns[column_name] = col 2010 table_meta.clustering_key.append(col) 2011 2012 # value alias (if present) 2013 if has_value: 2014 value_alias_rows = [r for r in col_rows 2015 if r.get('type', None) == "compact_value"] 2016 2017 if not key_aliases: # TODO are we checking the right thing here? 2018 value_alias = "value" 2019 else: 2020 value_alias = row.get("value_alias", None) 2021 if value_alias is None and value_alias_rows: # CASSANDRA-8487 2022 # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. 
2023 value_alias = value_alias_rows[0].get('column_name') 2024 2025 default_validator = row.get("default_validator") 2026 if default_validator: 2027 validator = types.lookup_casstype(default_validator) 2028 else: 2029 if value_alias_rows: # CASSANDRA-8487 2030 validator = types.lookup_casstype(value_alias_rows[0].get('validator')) 2031 2032 cql_type = _cql_from_cass_type(validator) 2033 col = ColumnMetadata(table_meta, value_alias, cql_type) 2034 if value_alias: # CASSANDRA-8487 2035 table_meta.columns[value_alias] = col 2036 2037 # other normal columns 2038 for col_row in col_rows: 2039 column_meta = self._build_column_metadata(table_meta, col_row) 2040 if column_meta.name is not None: 2041 table_meta.columns[column_meta.name] = column_meta 2042 index_meta = self._build_index_metadata(column_meta, col_row) 2043 if index_meta: 2044 table_meta.indexes[index_meta.name] = index_meta 2045 2046 for trigger_row in trigger_rows: 2047 trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) 2048 table_meta.triggers[trigger_meta.name] = trigger_meta 2049 2050 table_meta.options = self._build_table_options(row) 2051 table_meta.is_compact_storage = is_compact 2052 except Exception: 2053 table_meta._exc_info = sys.exc_info() 2054 log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, cfname, row, col_rows) 2055 2056 return table_meta 2057 2058 def _build_table_options(self, row): 2059 """ Setup the mostly-non-schema table options, like caching settings """ 2060 options = dict((o, row.get(o)) for o in self.recognized_table_options if o in row) 2061 2062 # the option name when creating tables is "dclocal_read_repair_chance", 2063 # but the column name in system.schema_columnfamilies is 2064 # "local_read_repair_chance". We'll store this as dclocal_read_repair_chance, 2065 # since that's probably what users are expecting (and we need it for the 2066 # CREATE TABLE statement anyway). 
2067 if "local_read_repair_chance" in options: 2068 val = options.pop("local_read_repair_chance") 2069 options["dclocal_read_repair_chance"] = val 2070 2071 return options 2072 2073 @classmethod 2074 def _build_column_metadata(cls, table_metadata, row): 2075 name = row["column_name"] 2076 type_string = row["validator"] 2077 data_type = types.lookup_casstype(type_string) 2078 cql_type = _cql_from_cass_type(data_type) 2079 is_static = row.get("type", None) == "static" 2080 is_reversed = types.is_reversed_casstype(data_type) 2081 column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed) 2082 column_meta._cass_type = data_type 2083 return column_meta 2084 2085 @staticmethod 2086 def _build_index_metadata(column_metadata, row): 2087 index_name = row.get("index_name") 2088 kind = row.get("index_type") 2089 if index_name or kind: 2090 options = row.get("index_options") 2091 options = json.loads(options) if options else {} 2092 options = options or {} # if the json parsed to None, init empty dict 2093 2094 # generate a CQL index identity string 2095 target = protect_name(column_metadata.name) 2096 if kind != "CUSTOM": 2097 if "index_keys" in options: 2098 target = 'keys(%s)' % (target,) 2099 elif "index_values" in options: 2100 # don't use any "function" for collection values 2101 pass 2102 else: 2103 # it might be a "full" index on a frozen collection, but 2104 # we need to check the data type to verify that, because 2105 # there is no special index option for full-collection 2106 # indexes. 
2107 data_type = column_metadata._cass_type 2108 collection_types = ('map', 'set', 'list') 2109 if data_type.typename == "frozen" and data_type.subtypes[0].typename in collection_types: 2110 # no index option for full-collection index 2111 target = 'full(%s)' % (target,) 2112 options['target'] = target 2113 return IndexMetadata(column_metadata.table.keyspace_name, column_metadata.table.name, index_name, kind, options) 2114 2115 @staticmethod 2116 def _build_trigger_metadata(table_metadata, row): 2117 name = row["trigger_name"] 2118 options = row["trigger_options"] 2119 trigger_meta = TriggerMetadata(table_metadata, name, options) 2120 return trigger_meta 2121 2122 def _query_all(self): 2123 cl = ConsistencyLevel.ONE 2124 queries = [ 2125 QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), 2126 QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl), 2127 QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), 2128 QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), 2129 QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), 2130 QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), 2131 QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl) 2132 ] 2133 2134 ((ks_success, ks_result), 2135 (table_success, table_result), 2136 (col_success, col_result), 2137 (types_success, types_result), 2138 (functions_success, functions_result), 2139 (aggregates_success, aggregates_result), 2140 (triggers_success, triggers_result)) = ( 2141 self.connection.wait_for_responses(*queries, timeout=self.timeout, 2142 fail_on_error=False) 2143 ) 2144 2145 self.keyspaces_result = self._handle_results(ks_success, ks_result) 2146 self.tables_result = self._handle_results(table_success, table_result) 2147 self.columns_result = self._handle_results(col_success, col_result) 2148 2149 # if we're connected to Cassandra < 2.0, the triggers table will not exist 2150 if triggers_success: 2151 self.triggers_result = 
dict_factory(*triggers_result.results) 2152 else: 2153 if isinstance(triggers_result, InvalidRequest): 2154 log.debug("triggers table not found") 2155 elif isinstance(triggers_result, Unauthorized): 2156 log.warning("this version of Cassandra does not allow access to schema_triggers metadata with authorization enabled (CASSANDRA-7967); " 2157 "The driver will operate normally, but will not reflect triggers in the local metadata model, or schema strings.") 2158 else: 2159 raise triggers_result 2160 2161 # if we're connected to Cassandra < 2.1, the usertypes table will not exist 2162 if types_success: 2163 self.types_result = dict_factory(*types_result.results) 2164 else: 2165 if isinstance(types_result, InvalidRequest): 2166 log.debug("user types table not found") 2167 self.types_result = {} 2168 else: 2169 raise types_result 2170 2171 # functions were introduced in Cassandra 2.2 2172 if functions_success: 2173 self.functions_result = dict_factory(*functions_result.results) 2174 else: 2175 if isinstance(functions_result, InvalidRequest): 2176 log.debug("user functions table not found") 2177 else: 2178 raise functions_result 2179 2180 # aggregates were introduced in Cassandra 2.2 2181 if aggregates_success: 2182 self.aggregates_result = dict_factory(*aggregates_result.results) 2183 else: 2184 if isinstance(aggregates_result, InvalidRequest): 2185 log.debug("user aggregates table not found") 2186 else: 2187 raise aggregates_result 2188 2189 self._aggregate_results() 2190 2191 def _aggregate_results(self): 2192 m = self.keyspace_table_rows 2193 for row in self.tables_result: 2194 m[row["keyspace_name"]].append(row) 2195 2196 m = self.keyspace_table_col_rows 2197 for row in self.columns_result: 2198 ksname = row["keyspace_name"] 2199 cfname = row[self._table_name_col] 2200 m[ksname][cfname].append(row) 2201 2202 m = self.keyspace_type_rows 2203 for row in self.types_result: 2204 m[row["keyspace_name"]].append(row) 2205 2206 m = self.keyspace_func_rows 2207 for row in 
self.functions_result: 2208 m[row["keyspace_name"]].append(row) 2209 2210 m = self.keyspace_agg_rows 2211 for row in self.aggregates_result: 2212 m[row["keyspace_name"]].append(row) 2213 2214 m = self.keyspace_table_trigger_rows 2215 for row in self.triggers_result: 2216 ksname = row["keyspace_name"] 2217 cfname = row[self._table_name_col] 2218 m[ksname][cfname].append(row) 2219 2220 @staticmethod 2221 def _schema_type_to_cql(type_string): 2222 cass_type = types.lookup_casstype(type_string) 2223 return _cql_from_cass_type(cass_type) 2224 2225 2226class SchemaParserV3(SchemaParserV22): 2227 _SELECT_KEYSPACES = "SELECT * FROM system_schema.keyspaces" 2228 _SELECT_TABLES = "SELECT * FROM system_schema.tables" 2229 _SELECT_COLUMNS = "SELECT * FROM system_schema.columns" 2230 _SELECT_INDEXES = "SELECT * FROM system_schema.indexes" 2231 _SELECT_TRIGGERS = "SELECT * FROM system_schema.triggers" 2232 _SELECT_TYPES = "SELECT * FROM system_schema.types" 2233 _SELECT_FUNCTIONS = "SELECT * FROM system_schema.functions" 2234 _SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates" 2235 _SELECT_VIEWS = "SELECT * FROM system_schema.views" 2236 2237 _table_name_col = 'table_name' 2238 2239 _function_agg_arument_type_col = 'argument_types' 2240 2241 recognized_table_options = ( 2242 'bloom_filter_fp_chance', 2243 'caching', 2244 'cdc', 2245 'comment', 2246 'compaction', 2247 'compression', 2248 'crc_check_chance', 2249 'dclocal_read_repair_chance', 2250 'default_time_to_live', 2251 'gc_grace_seconds', 2252 'max_index_interval', 2253 'memtable_flush_period_in_ms', 2254 'min_index_interval', 2255 'read_repair_chance', 2256 'speculative_retry') 2257 2258 def __init__(self, connection, timeout): 2259 super(SchemaParserV3, self).__init__(connection, timeout) 2260 self.indexes_result = [] 2261 self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list)) 2262 self.keyspace_view_rows = defaultdict(list) 2263 2264 def get_all_keyspaces(self): 2265 for keyspace_meta in 
super(SchemaParserV3, self).get_all_keyspaces(): 2266 for row in self.keyspace_view_rows[keyspace_meta.name]: 2267 view_meta = self._build_view_metadata(row) 2268 keyspace_meta._add_view_metadata(view_meta) 2269 yield keyspace_meta 2270 2271 def get_table(self, keyspaces, keyspace, table): 2272 cl = ConsistencyLevel.ONE 2273 where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col), (keyspace, table), _encoder) 2274 cf_query = QueryMessage(query=self._SELECT_TABLES + where_clause, consistency_level=cl) 2275 col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl) 2276 indexes_query = QueryMessage(query=self._SELECT_INDEXES + where_clause, consistency_level=cl) 2277 triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl) 2278 2279 # in protocol v4 we don't know if this event is a view or a table, so we look for both 2280 where_clause = bind_params(" WHERE keyspace_name = %s AND view_name = %s", (keyspace, table), _encoder) 2281 view_query = QueryMessage(query=self._SELECT_VIEWS + where_clause, 2282 consistency_level=cl) 2283 ((cf_success, cf_result), (col_success, col_result), 2284 (indexes_sucess, indexes_result), (triggers_success, triggers_result), 2285 (view_success, view_result)) = ( 2286 self.connection.wait_for_responses( 2287 cf_query, col_query, indexes_query, triggers_query, 2288 view_query, timeout=self.timeout, fail_on_error=False) 2289 ) 2290 table_result = self._handle_results(cf_success, cf_result) 2291 col_result = self._handle_results(col_success, col_result) 2292 if table_result: 2293 indexes_result = self._handle_results(indexes_sucess, indexes_result) 2294 triggers_result = self._handle_results(triggers_success, triggers_result) 2295 return self._build_table_metadata(table_result[0], col_result, triggers_result, indexes_result) 2296 2297 view_result = self._handle_results(view_success, view_result) 2298 if view_result: 2299 return 
self._build_view_metadata(view_result[0], col_result) 2300 2301 @staticmethod 2302 def _build_keyspace_metadata_internal(row): 2303 name = row["keyspace_name"] 2304 durable_writes = row["durable_writes"] 2305 strategy_options = dict(row["replication"]) 2306 strategy_class = strategy_options.pop("class") 2307 return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) 2308 2309 @staticmethod 2310 def _build_aggregate(aggregate_row): 2311 return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], 2312 aggregate_row['argument_types'], aggregate_row['state_func'], aggregate_row['state_type'], 2313 aggregate_row['final_func'], aggregate_row['initcond'], aggregate_row['return_type']) 2314 2315 def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None, virtual=False): 2316 keyspace_name = row["keyspace_name"] 2317 table_name = row[self._table_name_col] 2318 2319 col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][table_name] 2320 trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][table_name] 2321 index_rows = index_rows or self.keyspace_table_index_rows[keyspace_name][table_name] 2322 2323 table_meta = TableMetadataV3(keyspace_name, table_name, virtual=virtual) 2324 try: 2325 table_meta.options = self._build_table_options(row) 2326 flags = row.get('flags', set()) 2327 if flags: 2328 compact_static = False 2329 table_meta.is_compact_storage = 'dense' in flags or 'super' in flags or 'compound' not in flags 2330 is_dense = 'dense' in flags 2331 elif virtual: 2332 compact_static = False 2333 table_meta.is_compact_storage = False 2334 is_dense = False 2335 else: 2336 compact_static = True 2337 table_meta.is_compact_storage = True 2338 is_dense = False 2339 2340 self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual) 2341 2342 for trigger_row in trigger_rows: 2343 trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) 2344 
table_meta.triggers[trigger_meta.name] = trigger_meta 2345 2346 for index_row in index_rows: 2347 index_meta = self._build_index_metadata(table_meta, index_row) 2348 if index_meta: 2349 table_meta.indexes[index_meta.name] = index_meta 2350 2351 table_meta.extensions = row.get('extensions', {}) 2352 except Exception: 2353 table_meta._exc_info = sys.exc_info() 2354 log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) 2355 2356 return table_meta 2357 2358 def _build_table_options(self, row): 2359 """ Setup the mostly-non-schema table options, like caching settings """ 2360 return dict((o, row.get(o)) for o in self.recognized_table_options if o in row) 2361 2362 def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False, virtual=False): 2363 # partition key 2364 partition_rows = [r for r in col_rows 2365 if r.get('kind', None) == "partition_key"] 2366 if len(partition_rows) > 1: 2367 partition_rows = sorted(partition_rows, key=lambda row: row.get('position')) 2368 for r in partition_rows: 2369 # we have to add meta here (and not in the later loop) because TableMetadata.columns is an 2370 # OrderedDict, and it assumes keys are inserted first, in order, when exporting CQL 2371 column_meta = self._build_column_metadata(meta, r) 2372 meta.columns[column_meta.name] = column_meta 2373 meta.partition_key.append(meta.columns[r.get('column_name')]) 2374 2375 # clustering key 2376 if not compact_static: 2377 clustering_rows = [r for r in col_rows 2378 if r.get('kind', None) == "clustering"] 2379 if len(clustering_rows) > 1: 2380 clustering_rows = sorted(clustering_rows, key=lambda row: row.get('position')) 2381 for r in clustering_rows: 2382 column_meta = self._build_column_metadata(meta, r) 2383 meta.columns[column_meta.name] = column_meta 2384 meta.clustering_key.append(meta.columns[r.get('column_name')]) 2385 2386 for col_row in (r for r in col_rows 2387 if r.get('kind', None) not 
in ('partition_key', 'clustering_key')): 2388 column_meta = self._build_column_metadata(meta, col_row) 2389 if is_dense and column_meta.cql_type == types.cql_empty_type: 2390 continue 2391 if compact_static and not column_meta.is_static: 2392 # for compact static tables, we omit the clustering key and value, and only add the logical columns. 2393 # They are marked not static so that it generates appropriate CQL 2394 continue 2395 if compact_static: 2396 column_meta.is_static = False 2397 meta.columns[column_meta.name] = column_meta 2398 2399 def _build_view_metadata(self, row, col_rows=None): 2400 keyspace_name = row["keyspace_name"] 2401 view_name = row["view_name"] 2402 base_table_name = row["base_table_name"] 2403 include_all_columns = row["include_all_columns"] 2404 where_clause = row["where_clause"] 2405 col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][view_name] 2406 view_meta = MaterializedViewMetadata(keyspace_name, view_name, base_table_name, 2407 include_all_columns, where_clause, self._build_table_options(row)) 2408 self._build_table_columns(view_meta, col_rows) 2409 view_meta.extensions = row.get('extensions', {}) 2410 2411 return view_meta 2412 2413 @staticmethod 2414 def _build_column_metadata(table_metadata, row): 2415 name = row["column_name"] 2416 cql_type = row["type"] 2417 is_static = row.get("kind", None) == "static" 2418 is_reversed = row["clustering_order"].upper() == "DESC" 2419 column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed) 2420 return column_meta 2421 2422 @staticmethod 2423 def _build_index_metadata(table_metadata, row): 2424 index_name = row.get("index_name") 2425 kind = row.get("kind") 2426 if index_name or kind: 2427 index_options = row.get("options") 2428 return IndexMetadata(table_metadata.keyspace_name, table_metadata.name, index_name, kind, index_options) 2429 else: 2430 return None 2431 2432 @staticmethod 2433 def _build_trigger_metadata(table_metadata, row): 2434 name = 
row["trigger_name"] 2435 options = row["options"] 2436 trigger_meta = TriggerMetadata(table_metadata, name, options) 2437 return trigger_meta 2438 2439 def _query_all(self): 2440 cl = ConsistencyLevel.ONE 2441 queries = [ 2442 QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), 2443 QueryMessage(query=self._SELECT_TABLES, consistency_level=cl), 2444 QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), 2445 QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), 2446 QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), 2447 QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), 2448 QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl), 2449 QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl), 2450 QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl) 2451 ] 2452 2453 ((ks_success, ks_result), 2454 (table_success, table_result), 2455 (col_success, col_result), 2456 (types_success, types_result), 2457 (functions_success, functions_result), 2458 (aggregates_success, aggregates_result), 2459 (triggers_success, triggers_result), 2460 (indexes_success, indexes_result), 2461 (views_success, views_result)) = self.connection.wait_for_responses( 2462 *queries, timeout=self.timeout, fail_on_error=False 2463 ) 2464 2465 self.keyspaces_result = self._handle_results(ks_success, ks_result) 2466 self.tables_result = self._handle_results(table_success, table_result) 2467 self.columns_result = self._handle_results(col_success, col_result) 2468 self.triggers_result = self._handle_results(triggers_success, triggers_result) 2469 self.types_result = self._handle_results(types_success, types_result) 2470 self.functions_result = self._handle_results(functions_success, functions_result) 2471 self.aggregates_result = self._handle_results(aggregates_success, aggregates_result) 2472 self.indexes_result = self._handle_results(indexes_success, indexes_result) 2473 self.views_result = 
self._handle_results(views_success, views_result) 2474 2475 self._aggregate_results() 2476 2477 def _aggregate_results(self): 2478 super(SchemaParserV3, self)._aggregate_results() 2479 2480 m = self.keyspace_table_index_rows 2481 for row in self.indexes_result: 2482 ksname = row["keyspace_name"] 2483 cfname = row[self._table_name_col] 2484 m[ksname][cfname].append(row) 2485 2486 m = self.keyspace_view_rows 2487 for row in self.views_result: 2488 m[row["keyspace_name"]].append(row) 2489 2490 @staticmethod 2491 def _schema_type_to_cql(type_string): 2492 return type_string 2493 2494 2495class SchemaParserV4(SchemaParserV3): 2496 2497 recognized_table_options = tuple( 2498 opt for opt in 2499 SchemaParserV3.recognized_table_options 2500 if opt not in ( 2501 # removed in V4: CASSANDRA-13910 2502 'dclocal_read_repair_chance', 'read_repair_chance' 2503 ) 2504 ) 2505 2506 _SELECT_VIRTUAL_KEYSPACES = 'SELECT * from system_virtual_schema.keyspaces' 2507 _SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables' 2508 _SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns' 2509 2510 def __init__(self, connection, timeout): 2511 super(SchemaParserV4, self).__init__(connection, timeout) 2512 self.virtual_keyspaces_rows = defaultdict(list) 2513 self.virtual_tables_rows = defaultdict(list) 2514 self.virtual_columns_rows = defaultdict(lambda: defaultdict(list)) 2515 2516 def _query_all(self): 2517 cl = ConsistencyLevel.ONE 2518 # todo: this duplicates V3; we should find a way for _query_all methods 2519 # to extend each other. 
2520 queries = [ 2521 # copied from V3 2522 QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), 2523 QueryMessage(query=self._SELECT_TABLES, consistency_level=cl), 2524 QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), 2525 QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), 2526 QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), 2527 QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), 2528 QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl), 2529 QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl), 2530 QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl), 2531 # V4-only queries 2532 QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl), 2533 QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl), 2534 QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl) 2535 ] 2536 2537 responses = self.connection.wait_for_responses( 2538 *queries, timeout=self.timeout, fail_on_error=False) 2539 ( 2540 # copied from V3 2541 (ks_success, ks_result), 2542 (table_success, table_result), 2543 (col_success, col_result), 2544 (types_success, types_result), 2545 (functions_success, functions_result), 2546 (aggregates_success, aggregates_result), 2547 (triggers_success, triggers_result), 2548 (indexes_success, indexes_result), 2549 (views_success, views_result), 2550 # V4-only responses 2551 (virtual_ks_success, virtual_ks_result), 2552 (virtual_table_success, virtual_table_result), 2553 (virtual_column_success, virtual_column_result) 2554 ) = responses 2555 2556 # copied from V3 2557 self.keyspaces_result = self._handle_results(ks_success, ks_result) 2558 self.tables_result = self._handle_results(table_success, table_result) 2559 self.columns_result = self._handle_results(col_success, col_result) 2560 self.triggers_result = self._handle_results(triggers_success, triggers_result) 2561 self.types_result = self._handle_results(types_success, 
types_result) 2562 self.functions_result = self._handle_results(functions_success, functions_result) 2563 self.aggregates_result = self._handle_results(aggregates_success, aggregates_result) 2564 self.indexes_result = self._handle_results(indexes_success, indexes_result) 2565 self.views_result = self._handle_results(views_success, views_result) 2566 # V4-only results 2567 # These tables don't exist in some DSE versions reporting 4.X so we can 2568 # ignore them if we got an error 2569 self.virtual_keyspaces_result = self._handle_results( 2570 virtual_ks_success, virtual_ks_result, 2571 expected_failures=InvalidRequest 2572 ) 2573 self.virtual_tables_result = self._handle_results( 2574 virtual_table_success, virtual_table_result, 2575 expected_failures=InvalidRequest 2576 ) 2577 self.virtual_columns_result = self._handle_results( 2578 virtual_column_success, virtual_column_result, 2579 expected_failures=InvalidRequest 2580 ) 2581 2582 self._aggregate_results() 2583 2584 def _aggregate_results(self): 2585 super(SchemaParserV4, self)._aggregate_results() 2586 2587 m = self.virtual_tables_rows 2588 for row in self.virtual_tables_result: 2589 m[row["keyspace_name"]].append(row) 2590 2591 m = self.virtual_columns_rows 2592 for row in self.virtual_columns_result: 2593 ks_name = row['keyspace_name'] 2594 tab_name = row[self._table_name_col] 2595 m[ks_name][tab_name].append(row) 2596 2597 def get_all_keyspaces(self): 2598 for x in super(SchemaParserV4, self).get_all_keyspaces(): 2599 yield x 2600 2601 for row in self.virtual_keyspaces_result: 2602 ks_name = row['keyspace_name'] 2603 keyspace_meta = self._build_keyspace_metadata(row) 2604 keyspace_meta.virtual = True 2605 2606 for table_row in self.virtual_tables_rows.get(ks_name, []): 2607 table_name = table_row[self._table_name_col] 2608 2609 col_rows = self.virtual_columns_rows[ks_name][table_name] 2610 keyspace_meta._add_table_metadata( 2611 self._build_table_metadata(table_row, 2612 col_rows=col_rows, 2613 virtual=True) 
2614 ) 2615 yield keyspace_meta 2616 2617 @staticmethod 2618 def _build_keyspace_metadata_internal(row): 2619 # necessary fields that aren't int virtual ks 2620 row["durable_writes"] = row.get("durable_writes", None) 2621 row["replication"] = row.get("replication", {}) 2622 row["replication"]["class"] = row["replication"].get("class", None) 2623 return super(SchemaParserV4, SchemaParserV4)._build_keyspace_metadata_internal(row) 2624 2625 2626class TableMetadataV3(TableMetadata): 2627 compaction_options = {} 2628 2629 option_maps = ['compaction', 'compression', 'caching'] 2630 2631 @property 2632 def is_cql_compatible(self): 2633 return True 2634 2635 @classmethod 2636 def _make_option_strings(cls, options_map): 2637 ret = [] 2638 options_copy = dict(options_map.items()) 2639 2640 for option in cls.option_maps: 2641 value = options_copy.get(option) 2642 if isinstance(value, Mapping): 2643 del options_copy[option] 2644 params = ("'%s': '%s'" % (k, v) for k, v in value.items()) 2645 ret.append("%s = {%s}" % (option, ', '.join(params))) 2646 2647 for name, value in options_copy.items(): 2648 if value is not None: 2649 if name == "comment": 2650 value = value or "" 2651 ret.append("%s = %s" % (name, protect_value(value))) 2652 2653 return list(sorted(ret)) 2654 2655 2656class MaterializedViewMetadata(object): 2657 """ 2658 A representation of a materialized view on a table 2659 """ 2660 2661 keyspace_name = None 2662 2663 """ A string name of the view.""" 2664 2665 name = None 2666 """ A string name of the view.""" 2667 2668 base_table_name = None 2669 """ A string name of the base table for this view.""" 2670 2671 partition_key = None 2672 """ 2673 A list of :class:`.ColumnMetadata` instances representing the columns in 2674 the partition key for this view. This will always hold at least one 2675 column. 2676 """ 2677 2678 clustering_key = None 2679 """ 2680 A list of :class:`.ColumnMetadata` instances representing the columns 2681 in the clustering key for this view. 
    Note that a table may have no clustering keys, in which case this will
    be an empty list.
    """

    columns = None
    """
    A dict mapping column names to :class:`.ColumnMetadata` instances.
    """

    include_all_columns = None
    """ A flag indicating whether the view was created AS SELECT * """

    where_clause = None
    """ String WHERE clause for the view select statement. From server metadata """

    options = None
    """
    A dict mapping table option names to their specific settings for this
    view.
    """

    extensions = None
    """
    Metadata describing configuration for table extensions
    """

    def __init__(self, keyspace_name, view_name, base_table_name, include_all_columns, where_clause, options):
        """
        Create metadata for a materialized view.

        :param keyspace_name: name of the keyspace containing the view.
        :param view_name: name of the view (stored as ``name``).
        :param base_table_name: name of the table the view selects from.
        :param include_all_columns: True if the view selects ``*``; controls
            whether :meth:`as_cql_query` emits ``SELECT *`` or an explicit
            column list.
        :param where_clause: text of the view's WHERE clause.
        :param options: dict of view options; a falsy value (e.g. ``None``)
            is stored as an empty dict.
        """
        self.keyspace_name = keyspace_name
        self.name = view_name
        self.base_table_name = base_table_name
        # Key and column containers start empty; presumably they are filled in
        # by the schema parser after construction -- TODO confirm with caller.
        self.partition_key = []
        self.clustering_key = []
        self.columns = OrderedDict()
        self.include_all_columns = include_all_columns
        self.where_clause = where_clause
        self.options = options or {}
        # NOTE(review): ``extensions`` is not assigned here, so it stays at the
        # class-level default of None unless something sets it later.

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this materialized view.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.

        The statement has the shape ``CREATE MATERIALIZED VIEW ks.name AS
        SELECT ... FROM ks.base WHERE ... PRIMARY KEY (...) WITH ...``. For
        every extension name that is both registered in
        ``_RegisteredExtensionType._extension_registry`` and present in
        ``self.extensions``, any non-empty CQL returned by that extension's
        ``after_table_cql`` hook is appended after a blank line.
        """
        # Clause separator: a newline plus indent when formatted, otherwise a space.
        sep = '\n    ' if formatted else ' '
        keyspace = protect_name(self.keyspace_name)
        name = protect_name(self.name)

        selected_cols = '*' if self.include_all_columns else ', '.join(protect_name(col.name) for col in self.columns.values())
        base_table = protect_name(self.base_table_name)
        where_clause = self.where_clause

        # A composite partition key needs its own inner parentheses,
        # e.g. ((a, b), c); a single-column one is written as (a, c).
        part_key = ', '.join(protect_name(col.name) for col in self.partition_key)
        if len(self.partition_key) > 1:
            pk = "((%s)" % part_key
        else:
            pk = "(%s" % part_key
        if self.clustering_key:
            pk += ", %s" % ', '.join(protect_name(col.name) for col in self.clustering_key)
        pk += ")"

        # The WITH clause is rendered by the table formatter.
        properties = TableMetadataV3._property_string(formatted, self.clustering_key, self.options)

        # NOTE: the template below is filled from locals(), so the local
        # variable names above must match the %(...)s placeholders exactly.
        ret = ("CREATE MATERIALIZED VIEW %(keyspace)s.%(name)s AS%(sep)s"
               "SELECT %(selected_cols)s%(sep)s"
               "FROM %(keyspace)s.%(base_table)s%(sep)s"
               "WHERE %(where_clause)s%(sep)s"
               "PRIMARY KEY %(pk)s%(sep)s"
               "WITH %(properties)s") % locals()

        if self.extensions:
            registry = _RegisteredExtensionType._extension_registry
            # Set intersection: only extensions that are both registered and
            # present on this view contribute extra CQL.
            for k in six.viewkeys(registry) & self.extensions:  # no viewkeys on OrderedMapSerializeKey
                ext = registry[k]
                cql = ext.after_table_cql(self, k, self.extensions[k])
                if cql:
                    ret += "\n\n%s" % (cql,)
        return ret

    def export_as_string(self):
        """
        Return the formatted CQL from :meth:`as_cql_query` (``formatted=True``),
        terminated with a semicolon.
        """
        return self.as_cql_query(formatted=True) + ";"


def get_schema_parser(connection, server_version, timeout):
    """
    Pick the schema parser implementation for a server version.

    :param connection: passed through to the parser constructor.
    :param server_version: value accepted by :class:`~cassandra.util.Version`
        (presumably the server's release version string -- TODO confirm).
    :param timeout: passed through to the parser constructor.
    :returns: ``SchemaParserV4`` for versions >= 4.0.0, ``SchemaParserV3``
        for versions >= 3.0.0, and ``SchemaParserV22`` otherwise.
    """
    version = Version(server_version)
    if version >= Version('4.0.0'):
        return SchemaParserV4(connection, timeout)
    if version >= Version('3.0.0'):
        return SchemaParserV3(connection, timeout)
    else:
        # we could further specialize by version. Right now just refactoring the
        # multi-version parser we have as of C* 2.2.0rc1.
        return SchemaParserV22(connection, timeout)


def _cql_from_cass_type(cass_type):
    """
    A string representation of the type for this column, such as "varchar"
    or "map<string, int>".

    ``cass_type`` is a type class (it is checked with ``issubclass``). A
    ``types.ReversedType`` is unwrapped, and its first subtype is rendered
    instead.
    """
    if issubclass(cass_type, types.ReversedType):
        return cass_type.subtypes[0].cql_parameterized_type()
    else:
        return cass_type.cql_parameterized_type()


# Sentinel used as a dict key by group_keys_by_replica() to collect the keys
# for which no live replica was found.
NO_VALID_REPLICA = object()


def group_keys_by_replica(session, keyspace, table, keys):
    """
    Returns a :class:`dict` with the keys grouped per host. This can be
    used to more accurately group keys into IN clauses or to batch the keys
    per host.

    If a valid replica is not found for a particular key it will be grouped under
    :data:`~.NO_VALID_REPLICA`

    For each key, up replicas that the load balancing policy rates as
    ``HostDistance.LOCAL`` are preferred; if there are none, any up replica
    is used. One host is picked at random from the chosen candidates.

    :param session: session whose ``cluster`` supplies schema metadata,
        protocol version, load balancing policy and replica lookup.
    :param keyspace: keyspace name.
    :param table: table name within ``keyspace``. Presumably an unknown
        keyspace or table raises ``KeyError`` from the metadata lookup --
        TODO confirm.
    :param keys: iterable of keys. Each key is a sequence of partition key
        values, in the table's partition key column order.
    :returns: a plain ``dict`` that maps a replica host (or
        ``NO_VALID_REPLICA``) to the list of keys assigned to it.

    Example usage::
        result = group_keys_by_replica(
                     session, "system", "peers",
                     (("127.0.0.1", ), ("127.0.0.2", ))
                 )
    """
    cluster = session.cluster

    partition_keys = cluster.metadata.keyspaces[keyspace].tables[table].partition_key

    # One serializer per partition key column, looked up by its CQL type name.
    serializers = list(types._cqltypes[partition_key.cql_type] for partition_key in partition_keys)
    keys_per_host = defaultdict(list)
    distance = cluster._default_load_balancing_policy.distance

    for key in keys:
        # NOTE(review): zip() stops at the shorter input, so a key with extra
        # or missing components is silently truncated rather than rejected.
        serialized_key = [serializer.serialize(pk, cluster.protocol_version)
                          for serializer, pk in zip(serializers, key)]
        if len(serialized_key) == 1:
            routing_key = serialized_key[0]
        else:
            # Composite routing key: each component is encoded as a 2-byte
            # big-endian length, the component bytes, then a single 0 byte.
            routing_key = b"".join(struct.pack(">H%dsB" % len(p), len(p), p, 0) for p in serialized_key)
        all_replicas = cluster.metadata.get_replicas(keyspace, routing_key)
        # First check if there are local replicas
        valid_replicas = [host for host in all_replicas if
                          host.is_up and distance(host) == HostDistance.LOCAL]
        if not valid_replicas:
            valid_replicas = [host for host in all_replicas if host.is_up]

        if valid_replicas:
            keys_per_host[random.choice(valid_replicas)].append(key)
        else:
            # We will group under this statement all the keys for which
            # we haven't found a valid replica
            keys_per_host[NO_VALID_REPLICA].append(key)

    return dict(keys_per_host)