1// Copyright (c) 2015 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "strconv" 12 "strings" 13 "sync" 14) 15 16// schema metadata for a keyspace 17type KeyspaceMetadata struct { 18 Name string 19 DurableWrites bool 20 StrategyClass string 21 StrategyOptions map[string]interface{} 22 Tables map[string]*TableMetadata 23 Functions map[string]*FunctionMetadata 24 Aggregates map[string]*AggregateMetadata 25 Views map[string]*ViewMetadata 26} 27 28// schema metadata for a table (a.k.a. column family) 29type TableMetadata struct { 30 Keyspace string 31 Name string 32 KeyValidator string 33 Comparator string 34 DefaultValidator string 35 KeyAliases []string 36 ColumnAliases []string 37 ValueAlias string 38 PartitionKey []*ColumnMetadata 39 ClusteringColumns []*ColumnMetadata 40 Columns map[string]*ColumnMetadata 41 OrderedColumns []string 42} 43 44// schema metadata for a column 45type ColumnMetadata struct { 46 Keyspace string 47 Table string 48 Name string 49 ComponentIndex int 50 Kind ColumnKind 51 Validator string 52 Type TypeInfo 53 ClusteringOrder string 54 Order ColumnOrder 55 Index ColumnIndexMetadata 56} 57 58// FunctionMetadata holds metadata for function constructs 59type FunctionMetadata struct { 60 Keyspace string 61 Name string 62 ArgumentTypes []TypeInfo 63 ArgumentNames []string 64 Body string 65 CalledOnNullInput bool 66 Language string 67 ReturnType TypeInfo 68} 69 70// AggregateMetadata holds metadata for aggregate constructs 71type AggregateMetadata struct { 72 Keyspace string 73 Name string 74 ArgumentTypes []TypeInfo 75 FinalFunc FunctionMetadata 76 InitCond string 77 ReturnType TypeInfo 78 StateFunc FunctionMetadata 79 StateType TypeInfo 80 81 stateFunc string 82 finalFunc string 83} 84 85// ViewMetadata holds the metadata for views. 86type ViewMetadata struct { 87 Keyspace string 88 Name string 89 FieldNames []string 90 FieldTypes []TypeInfo 91} 92 93// the ordering of the column with regard to its comparator 94type ColumnOrder bool 95 96const ( 97 ASC ColumnOrder = false 98 DESC = true 99) 100 101type ColumnIndexMetadata struct { 102 Name string 103 Type string 104 Options map[string]interface{} 105} 106 107type ColumnKind int 108 109const ( 110 ColumnUnkownKind ColumnKind = iota 111 ColumnPartitionKey 112 ColumnClusteringKey 113 ColumnRegular 114 ColumnCompact 115 ColumnStatic 116) 117 118func (c ColumnKind) String() string { 119 switch c { 120 case ColumnPartitionKey: 121 return "partition_key" 122 case ColumnClusteringKey: 123 return "clustering_key" 124 case ColumnRegular: 125 return "regular" 126 case ColumnCompact: 127 return "compact" 128 case ColumnStatic: 129 return "static" 130 default: 131 return fmt.Sprintf("unknown_column_%d", c) 132 } 133} 134 135func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error { 136 if typ.Type() != TypeVarchar { 137 return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ) 138 } 139 140 kind, err := columnKindFromSchema(string(p)) 141 if err != nil { 142 return err 143 } 144 *c = kind 145 146 return nil 147} 148 149func columnKindFromSchema(kind string) (ColumnKind, error) { 150 switch kind { 151 case "partition_key": 152 return ColumnPartitionKey, nil 153 case "clustering_key", "clustering": 154 return ColumnClusteringKey, nil 155 case "regular": 156 return ColumnRegular, nil 157 case "compact_value": 158 return ColumnCompact, nil 159 case "static": 160 return ColumnStatic, nil 161 default: 162 return -1, fmt.Errorf("unknown column kind: %q", kind) 163 } 164} 165 166// default alias values 167const ( 168 DEFAULT_KEY_ALIAS = "key" 169 DEFAULT_COLUMN_ALIAS = "column" 170 DEFAULT_VALUE_ALIAS = "value" 171) 172 173// queries the cluster for schema information for a specific keyspace 174type schemaDescriber struct { 175 session *Session 176 mu sync.Mutex 177 178 cache map[string]*KeyspaceMetadata 179} 180 181// creates a session bound schema describer which will query and cache 182// keyspace metadata 183func newSchemaDescriber(session *Session) *schemaDescriber { 184 return &schemaDescriber{ 185 session: session, 186 cache: map[string]*KeyspaceMetadata{}, 187 } 188} 189 190// returns the cached KeyspaceMetadata held by the describer for the named 191// keyspace. 192func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) { 193 s.mu.Lock() 194 defer s.mu.Unlock() 195 196 metadata, found := s.cache[keyspaceName] 197 if !found { 198 // refresh the cache for this keyspace 199 err := s.refreshSchema(keyspaceName) 200 if err != nil { 201 return nil, err 202 } 203 204 metadata = s.cache[keyspaceName] 205 } 206 207 return metadata, nil 208} 209 210// clears the already cached keyspace metadata 211func (s *schemaDescriber) clearSchema(keyspaceName string) { 212 s.mu.Lock() 213 defer s.mu.Unlock() 214 215 delete(s.cache, keyspaceName) 216} 217 218// forcibly updates the current KeyspaceMetadata held by the schema describer 219// for a given named keyspace. 220func (s *schemaDescriber) refreshSchema(keyspaceName string) error { 221 var err error 222 223 // query the system keyspace for schema data 224 // TODO retrieve concurrently 225 keyspace, err := getKeyspaceMetadata(s.session, keyspaceName) 226 if err != nil { 227 return err 228 } 229 tables, err := getTableMetadata(s.session, keyspaceName) 230 if err != nil { 231 return err 232 } 233 columns, err := getColumnMetadata(s.session, keyspaceName) 234 if err != nil { 235 return err 236 } 237 functions, err := getFunctionsMetadata(s.session, keyspaceName) 238 if err != nil { 239 return err 240 } 241 aggregates, err := getAggregatesMetadata(s.session, keyspaceName) 242 if err != nil { 243 return err 244 } 245 views, err := getViewsMetadata(s.session, keyspaceName) 246 if err != nil { 247 return err 248 } 249 250 // organize the schema data 251 compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views) 252 253 // update the cache 254 s.cache[keyspaceName] = keyspace 255 256 return nil 257} 258 259// "compiles" derived information about keyspace, table, and column metadata 260// for a keyspace from the basic queried metadata objects returned by 261// getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively; 262// Links the metadata objects together and derives the column composition of 263// the partition key and clustering key for a table. 264func compileMetadata( 265 protoVersion int, 266 keyspace *KeyspaceMetadata, 267 tables []TableMetadata, 268 columns []ColumnMetadata, 269 functions []FunctionMetadata, 270 aggregates []AggregateMetadata, 271 views []ViewMetadata, 272) { 273 keyspace.Tables = make(map[string]*TableMetadata) 274 for i := range tables { 275 tables[i].Columns = make(map[string]*ColumnMetadata) 276 277 keyspace.Tables[tables[i].Name] = &tables[i] 278 } 279 keyspace.Functions = make(map[string]*FunctionMetadata, len(functions)) 280 for i := range functions { 281 keyspace.Functions[functions[i].Name] = &functions[i] 282 } 283 keyspace.Aggregates = make(map[string]*AggregateMetadata, len(aggregates)) 284 for _, aggregate := range aggregates { 285 aggregate.FinalFunc = *keyspace.Functions[aggregate.finalFunc] 286 aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc] 287 keyspace.Aggregates[aggregate.Name] = &aggregate 288 } 289 keyspace.Views = make(map[string]*ViewMetadata, len(views)) 290 for i := range views { 291 keyspace.Views[views[i].Name] = &views[i] 292 } 293 294 // add columns from the schema data 295 for i := range columns { 296 col := &columns[i] 297 // decode the validator for TypeInfo and order 298 if col.ClusteringOrder != "" { // Cassandra 3.x+ 299 col.Type = getCassandraType(col.Validator) 300 col.Order = ASC 301 if col.ClusteringOrder == "desc" { 302 col.Order = DESC 303 } 304 } else { 305 validatorParsed := parseType(col.Validator) 306 col.Type = validatorParsed.types[0] 307 col.Order = ASC 308 if validatorParsed.reversed[0] { 309 col.Order = DESC 310 } 311 } 312 313 table, ok := keyspace.Tables[col.Table] 314 if !ok { 315 // if the schema is being updated we will race between seeing 316 // the metadata be complete. Potentially we should check for 317 // schema versions before and after reading the metadata and 318 // if they dont match try again. 319 continue 320 } 321 322 table.Columns[col.Name] = col 323 table.OrderedColumns = append(table.OrderedColumns, col.Name) 324 } 325 326 if protoVersion == protoVersion1 { 327 compileV1Metadata(tables) 328 } else { 329 compileV2Metadata(tables) 330 } 331} 332 333// Compiles derived information from TableMetadata which have had 334// ColumnMetadata added already. V1 protocol does not return as much 335// column metadata as V2+ (because V1 doesn't support the "type" column in the 336// system.schema_columns table) so determining PartitionKey and ClusterColumns 337// is more complex. 338func compileV1Metadata(tables []TableMetadata) { 339 for i := range tables { 340 table := &tables[i] 341 342 // decode the key validator 343 keyValidatorParsed := parseType(table.KeyValidator) 344 // decode the comparator 345 comparatorParsed := parseType(table.Comparator) 346 347 // the partition key length is the same as the number of types in the 348 // key validator 349 table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types)) 350 351 // V1 protocol only returns "regular" columns from 352 // system.schema_columns (there is no type field for columns) 353 // so the alias information is used to 354 // create the partition key and clustering columns 355 356 // construct the partition key from the alias 357 for i := range table.PartitionKey { 358 var alias string 359 if len(table.KeyAliases) > i { 360 alias = table.KeyAliases[i] 361 } else if i == 0 { 362 alias = DEFAULT_KEY_ALIAS 363 } else { 364 alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1) 365 } 366 367 column := &ColumnMetadata{ 368 Keyspace: table.Keyspace, 369 Table: table.Name, 370 Name: alias, 371 Type: keyValidatorParsed.types[i], 372 Kind: ColumnPartitionKey, 373 ComponentIndex: i, 374 } 375 376 table.PartitionKey[i] = column 377 table.Columns[alias] = column 378 } 379 380 // determine the number of clustering columns 381 size := len(comparatorParsed.types) 382 if comparatorParsed.isComposite { 383 if len(comparatorParsed.collections) != 0 || 384 (len(table.ColumnAliases) == size-1 && 385 comparatorParsed.types[size-1].Type() == TypeVarchar) { 386 size = size - 1 387 } 388 } else { 389 if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) { 390 size = 0 391 } 392 } 393 394 table.ClusteringColumns = make([]*ColumnMetadata, size) 395 396 for i := range table.ClusteringColumns { 397 var alias string 398 if len(table.ColumnAliases) > i { 399 alias = table.ColumnAliases[i] 400 } else if i == 0 { 401 alias = DEFAULT_COLUMN_ALIAS 402 } else { 403 alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1) 404 } 405 406 order := ASC 407 if comparatorParsed.reversed[i] { 408 order = DESC 409 } 410 411 column := &ColumnMetadata{ 412 Keyspace: table.Keyspace, 413 Table: table.Name, 414 Name: alias, 415 Type: comparatorParsed.types[i], 416 Order: order, 417 Kind: ColumnClusteringKey, 418 ComponentIndex: i, 419 } 420 421 table.ClusteringColumns[i] = column 422 table.Columns[alias] = column 423 } 424 425 if size != len(comparatorParsed.types)-1 { 426 alias := DEFAULT_VALUE_ALIAS 427 if len(table.ValueAlias) > 0 { 428 alias = table.ValueAlias 429 } 430 // decode the default validator 431 defaultValidatorParsed := parseType(table.DefaultValidator) 432 column := &ColumnMetadata{ 433 Keyspace: table.Keyspace, 434 Table: table.Name, 435 Name: alias, 436 Type: defaultValidatorParsed.types[0], 437 Kind: ColumnRegular, 438 } 439 table.Columns[alias] = column 440 } 441 } 442} 443 444// The simpler compile case for V2+ protocol 445func compileV2Metadata(tables []TableMetadata) { 446 for i := range tables { 447 table := &tables[i] 448 449 clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey) 450 table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount) 451 452 if table.KeyValidator != "" { 453 keyValidatorParsed := parseType(table.KeyValidator) 454 table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types)) 455 } else { // Cassandra 3.x+ 456 partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey) 457 table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount) 458 } 459 460 for _, columnName := range table.OrderedColumns { 461 column := table.Columns[columnName] 462 if column.Kind == ColumnPartitionKey { 463 table.PartitionKey[column.ComponentIndex] = column 464 } else if column.Kind == ColumnClusteringKey { 465 table.ClusteringColumns[column.ComponentIndex] = column 466 } 467 } 468 } 469} 470 471// returns the count of coluns with the given "kind" value. 472func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int { 473 maxComponentIndex := -1 474 for _, column := range columns { 475 if column.Kind == kind && column.ComponentIndex > maxComponentIndex { 476 maxComponentIndex = column.ComponentIndex 477 } 478 } 479 return maxComponentIndex + 1 480} 481 482// query only for the keyspace metadata for the specified keyspace from system.schema_keyspace 483func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) { 484 keyspace := &KeyspaceMetadata{Name: keyspaceName} 485 486 if session.useSystemSchema { // Cassandra 3.x+ 487 const stmt = ` 488 SELECT durable_writes, replication 489 FROM system_schema.keyspaces 490 WHERE keyspace_name = ?` 491 492 var replication map[string]string 493 494 iter := session.control.query(stmt, keyspaceName) 495 if iter.NumRows() == 0 { 496 return nil, ErrKeyspaceDoesNotExist 497 } 498 iter.Scan(&keyspace.DurableWrites, &replication) 499 err := iter.Close() 500 if err != nil { 501 return nil, fmt.Errorf("Error querying keyspace schema: %v", err) 502 } 503 504 keyspace.StrategyClass = replication["class"] 505 delete(replication, "class") 506 507 keyspace.StrategyOptions = make(map[string]interface{}, len(replication)) 508 for k, v := range replication { 509 keyspace.StrategyOptions[k] = v 510 } 511 } else { 512 513 const stmt = ` 514 SELECT durable_writes, strategy_class, strategy_options 515 FROM system.schema_keyspaces 516 WHERE keyspace_name = ?` 517 518 var strategyOptionsJSON []byte 519 520 iter := session.control.query(stmt, keyspaceName) 521 if iter.NumRows() == 0 { 522 return nil, ErrKeyspaceDoesNotExist 523 } 524 iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON) 525 err := iter.Close() 526 if err != nil { 527 return nil, fmt.Errorf("Error querying keyspace schema: %v", err) 528 } 529 530 err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions) 531 if err != nil { 532 return nil, fmt.Errorf( 533 "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v", 534 strategyOptionsJSON, keyspace.Name, err, 535 ) 536 } 537 } 538 539 return keyspace, nil 540} 541 542// query for only the table metadata in the specified keyspace from system.schema_columnfamilies 543func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) { 544 545 var ( 546 iter *Iter 547 scan func(iter *Iter, table *TableMetadata) bool 548 stmt string 549 550 keyAliasesJSON []byte 551 columnAliasesJSON []byte 552 ) 553 554 if session.useSystemSchema { // Cassandra 3.x+ 555 stmt = ` 556 SELECT 557 table_name 558 FROM system_schema.tables 559 WHERE keyspace_name = ?` 560 561 switchIter := func() *Iter { 562 iter.Close() 563 stmt = ` 564 SELECT 565 view_name 566 FROM system_schema.views 567 WHERE keyspace_name = ?` 568 iter = session.control.query(stmt, keyspaceName) 569 return iter 570 } 571 572 scan = func(iter *Iter, table *TableMetadata) bool { 573 r := iter.Scan( 574 &table.Name, 575 ) 576 if !r { 577 iter = switchIter() 578 if iter != nil { 579 switchIter = func() *Iter { return nil } 580 r = iter.Scan(&table.Name) 581 } 582 } 583 return r 584 } 585 } else if session.cfg.ProtoVersion == protoVersion1 { 586 // we have key aliases 587 stmt = ` 588 SELECT 589 columnfamily_name, 590 key_validator, 591 comparator, 592 default_validator, 593 key_aliases, 594 column_aliases, 595 value_alias 596 FROM system.schema_columnfamilies 597 WHERE keyspace_name = ?` 598 599 scan = func(iter *Iter, table *TableMetadata) bool { 600 return iter.Scan( 601 &table.Name, 602 &table.KeyValidator, 603 &table.Comparator, 604 &table.DefaultValidator, 605 &keyAliasesJSON, 606 &columnAliasesJSON, 607 &table.ValueAlias, 608 ) 609 } 610 } else { 611 stmt = ` 612 SELECT 613 columnfamily_name, 614 key_validator, 615 comparator, 616 default_validator 617 FROM system.schema_columnfamilies 618 WHERE keyspace_name = ?` 619 620 scan = func(iter *Iter, table *TableMetadata) bool { 621 return iter.Scan( 622 &table.Name, 623 &table.KeyValidator, 624 &table.Comparator, 625 &table.DefaultValidator, 626 ) 627 } 628 } 629 630 iter = session.control.query(stmt, keyspaceName) 631 632 tables := []TableMetadata{} 633 table := TableMetadata{Keyspace: keyspaceName} 634 635 for scan(iter, &table) { 636 var err error 637 638 // decode the key aliases 639 if keyAliasesJSON != nil { 640 table.KeyAliases = []string{} 641 err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases) 642 if err != nil { 643 iter.Close() 644 return nil, fmt.Errorf( 645 "Invalid JSON value '%s' as key_aliases for in table '%s': %v", 646 keyAliasesJSON, table.Name, err, 647 ) 648 } 649 } 650 651 // decode the column aliases 652 if columnAliasesJSON != nil { 653 table.ColumnAliases = []string{} 654 err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases) 655 if err != nil { 656 iter.Close() 657 return nil, fmt.Errorf( 658 "Invalid JSON value '%s' as column_aliases for in table '%s': %v", 659 columnAliasesJSON, table.Name, err, 660 ) 661 } 662 } 663 664 tables = append(tables, table) 665 table = TableMetadata{Keyspace: keyspaceName} 666 } 667 668 err := iter.Close() 669 if err != nil && err != ErrNotFound { 670 return nil, fmt.Errorf("Error querying table schema: %v", err) 671 } 672 673 return tables, nil 674} 675 676func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) { 677 // V1 does not support the type column, and all returned rows are 678 // of kind "regular". 679 const stmt = ` 680 SELECT 681 columnfamily_name, 682 column_name, 683 component_index, 684 validator, 685 index_name, 686 index_type, 687 index_options 688 FROM system.schema_columns 689 WHERE keyspace_name = ?` 690 691 var columns []ColumnMetadata 692 693 rows := s.control.query(stmt, keyspace).Scanner() 694 for rows.Next() { 695 var ( 696 column = ColumnMetadata{Keyspace: keyspace} 697 indexOptionsJSON []byte 698 ) 699 700 // all columns returned by V1 are regular 701 column.Kind = ColumnRegular 702 703 err := rows.Scan(&column.Table, 704 &column.Name, 705 &column.ComponentIndex, 706 &column.Validator, 707 &column.Index.Name, 708 &column.Index.Type, 709 &indexOptionsJSON) 710 711 if err != nil { 712 return nil, err 713 } 714 715 if len(indexOptionsJSON) > 0 { 716 err := json.Unmarshal(indexOptionsJSON, &column.Index.Options) 717 if err != nil { 718 return nil, fmt.Errorf( 719 "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v", 720 indexOptionsJSON, 721 column.Name, 722 column.Table, 723 err) 724 } 725 } 726 727 columns = append(columns, column) 728 } 729 730 if err := rows.Err(); err != nil { 731 return nil, err 732 } 733 734 return columns, nil 735} 736 737func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) { 738 // V2+ supports the type column 739 const stmt = ` 740 SELECT 741 columnfamily_name, 742 column_name, 743 component_index, 744 validator, 745 index_name, 746 index_type, 747 index_options, 748 type 749 FROM system.schema_columns 750 WHERE keyspace_name = ?` 751 752 var columns []ColumnMetadata 753 754 rows := s.control.query(stmt, keyspace).Scanner() 755 for rows.Next() { 756 var ( 757 column = ColumnMetadata{Keyspace: keyspace} 758 indexOptionsJSON []byte 759 ) 760 761 err := rows.Scan(&column.Table, 762 &column.Name, 763 &column.ComponentIndex, 764 &column.Validator, 765 &column.Index.Name, 766 &column.Index.Type, 767 &indexOptionsJSON, 768 &column.Kind, 769 ) 770 771 if err != nil { 772 return nil, err 773 } 774 775 if len(indexOptionsJSON) > 0 { 776 err := json.Unmarshal(indexOptionsJSON, &column.Index.Options) 777 if err != nil { 778 return nil, fmt.Errorf( 779 "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v", 780 indexOptionsJSON, 781 column.Name, 782 column.Table, 783 err) 784 } 785 } 786 787 columns = append(columns, column) 788 } 789 790 if err := rows.Err(); err != nil { 791 return nil, err 792 } 793 794 return columns, nil 795 796} 797 798func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) { 799 const stmt = ` 800 SELECT 801 table_name, 802 column_name, 803 clustering_order, 804 type, 805 kind, 806 position 807 FROM system_schema.columns 808 WHERE keyspace_name = ?` 809 810 var columns []ColumnMetadata 811 812 rows := s.control.query(stmt, keyspace).Scanner() 813 for rows.Next() { 814 column := ColumnMetadata{Keyspace: keyspace} 815 816 err := rows.Scan(&column.Table, 817 &column.Name, 818 &column.ClusteringOrder, 819 &column.Validator, 820 &column.Kind, 821 &column.ComponentIndex, 822 ) 823 824 if err != nil { 825 return nil, err 826 } 827 828 columns = append(columns, column) 829 } 830 831 if err := rows.Err(); err != nil { 832 return nil, err 833 } 834 835 // TODO(zariel): get column index info from system_schema.indexes 836 837 return columns, nil 838} 839 840// query for only the column metadata in the specified keyspace from system.schema_columns 841func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) { 842 var ( 843 columns []ColumnMetadata 844 err error 845 ) 846 847 // Deal with differences in protocol versions 848 if session.cfg.ProtoVersion == 1 { 849 columns, err = session.scanColumnMetadataV1(keyspaceName) 850 } else if session.useSystemSchema { // Cassandra 3.x+ 851 columns, err = session.scanColumnMetadataSystem(keyspaceName) 852 } else { 853 columns, err = session.scanColumnMetadataV2(keyspaceName) 854 } 855 856 if err != nil && err != ErrNotFound { 857 return nil, fmt.Errorf("Error querying column schema: %v", err) 858 } 859 860 return columns, nil 861} 862 863func getTypeInfo(t string) TypeInfo { 864 if strings.HasPrefix(t, apacheCassandraTypePrefix) { 865 t = apacheToCassandraType(t) 866 } 867 return getCassandraType(t) 868} 869 870func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, error) { 871 if session.cfg.ProtoVersion == protoVersion1 { 872 return nil, nil 873 } 874 var tableName string 875 if session.useSystemSchema { 876 tableName = "system_schema.types" 877 } else { 878 tableName = "system.schema_usertypes" 879 } 880 stmt := fmt.Sprintf(` 881 SELECT 882 type_name, 883 field_names, 884 field_types 885 FROM %s 886 WHERE keyspace_name = ?`, tableName) 887 888 var views []ViewMetadata 889 890 rows := session.control.query(stmt, keyspaceName).Scanner() 891 for rows.Next() { 892 view := ViewMetadata{Keyspace: keyspaceName} 893 var argumentTypes []string 894 err := rows.Scan(&view.Name, 895 &view.FieldNames, 896 &argumentTypes, 897 ) 898 if err != nil { 899 return nil, err 900 } 901 view.FieldTypes = make([]TypeInfo, len(argumentTypes)) 902 for i, argumentType := range argumentTypes { 903 view.FieldTypes[i] = getTypeInfo(argumentType) 904 } 905 views = append(views, view) 906 } 907 908 if err := rows.Err(); err != nil { 909 return nil, err 910 } 911 912 return views, nil 913} 914 915func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) { 916 if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { 917 return nil, nil 918 } 919 var tableName string 920 if session.useSystemSchema { 921 tableName = "system_schema.functions" 922 } else { 923 tableName = "system.schema_functions" 924 } 925 stmt := fmt.Sprintf(` 926 SELECT 927 function_name, 928 argument_types, 929 argument_names, 930 body, 931 called_on_null_input, 932 language, 933 return_type 934 FROM %s 935 WHERE keyspace_name = ?`, tableName) 936 937 var functions []FunctionMetadata 938 939 rows := session.control.query(stmt, keyspaceName).Scanner() 940 for rows.Next() { 941 function := FunctionMetadata{Keyspace: keyspaceName} 942 var argumentTypes []string 943 var returnType string 944 err := rows.Scan(&function.Name, 945 &argumentTypes, 946 &function.ArgumentNames, 947 &function.Body, 948 &function.CalledOnNullInput, 949 &function.Language, 950 &returnType, 951 ) 952 if err != nil { 953 return nil, err 954 } 955 function.ReturnType = getTypeInfo(returnType) 956 function.ArgumentTypes = make([]TypeInfo, len(argumentTypes)) 957 for i, argumentType := range argumentTypes { 958 function.ArgumentTypes[i] = getTypeInfo(argumentType) 959 } 960 functions = append(functions, function) 961 } 962 963 if err := rows.Err(); err != nil { 964 return nil, err 965 } 966 967 return functions, nil 968} 969 970func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) { 971 if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { 972 return nil, nil 973 } 974 var tableName string 975 if session.useSystemSchema { 976 tableName = "system_schema.aggregates" 977 } else { 978 tableName = "system.schema_aggregates" 979 } 980 981 stmt := fmt.Sprintf(` 982 SELECT 983 aggregate_name, 984 argument_types, 985 final_func, 986 initcond, 987 return_type, 988 state_func, 989 state_type 990 FROM %s 991 WHERE keyspace_name = ?`, tableName) 992 993 var aggregates []AggregateMetadata 994 995 rows := session.control.query(stmt, keyspaceName).Scanner() 996 for rows.Next() { 997 aggregate := AggregateMetadata{Keyspace: keyspaceName} 998 var argumentTypes []string 999 var returnType string 1000 var stateType string 1001 err := rows.Scan(&aggregate.Name, 1002 &argumentTypes, 1003 &aggregate.finalFunc, 1004 &aggregate.InitCond, 1005 &returnType, 1006 &aggregate.stateFunc, 1007 &stateType, 1008 ) 1009 if err != nil { 1010 return nil, err 1011 } 1012 aggregate.ReturnType = getTypeInfo(returnType) 1013 aggregate.StateType = getTypeInfo(stateType) 1014 aggregate.ArgumentTypes = make([]TypeInfo, len(argumentTypes)) 1015 for i, argumentType := range argumentTypes { 1016 aggregate.ArgumentTypes[i] = getTypeInfo(argumentType) 1017 } 1018 aggregates = append(aggregates, aggregate) 1019 } 1020 1021 if err := rows.Err(); err != nil { 1022 return nil, err 1023 } 1024 1025 return aggregates, nil 1026} 1027 1028// type definition parser state 1029type typeParser struct { 1030 input string 1031 index int 1032} 1033 1034// the type definition parser result 1035type typeParserResult struct { 1036 isComposite bool 1037 types []TypeInfo 1038 reversed []bool 1039 collections map[string]TypeInfo 1040} 1041 1042// Parse the type definition used for validator and comparator schema data 1043func parseType(def string) typeParserResult { 1044 parser := &typeParser{input: def} 1045 return parser.parse() 1046} 1047 1048const ( 1049 REVERSED_TYPE = "org.apache.cassandra.db.marshal.ReversedType" 1050 COMPOSITE_TYPE = "org.apache.cassandra.db.marshal.CompositeType" 1051 COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType" 1052 LIST_TYPE = "org.apache.cassandra.db.marshal.ListType" 1053 SET_TYPE = "org.apache.cassandra.db.marshal.SetType" 1054 MAP_TYPE = "org.apache.cassandra.db.marshal.MapType" 1055) 1056 1057// represents a class specification in the type def AST 1058type typeParserClassNode struct { 1059 name string 1060 params []typeParserParamNode 1061 // this is the segment of the input string that defined this node 1062 input string 1063} 1064 1065// represents a class parameter in the type def AST 1066type typeParserParamNode struct { 1067 name *string 1068 class typeParserClassNode 1069} 1070 1071func (t *typeParser) parse() typeParserResult { 1072 // parse the AST 1073 ast, ok := t.parseClassNode() 1074 if !ok { 1075 // treat this is a custom type 1076 return typeParserResult{ 1077 isComposite: false, 1078 types: []TypeInfo{ 1079 NativeType{ 1080 typ: TypeCustom, 1081 custom: t.input, 1082 }, 1083 }, 1084 reversed: []bool{false}, 1085 collections: nil, 1086 } 1087 } 1088 1089 // interpret the AST 1090 if strings.HasPrefix(ast.name, COMPOSITE_TYPE) { 1091 count := len(ast.params) 1092 1093 // look for a collections param 1094 last := ast.params[count-1] 1095 collections := map[string]TypeInfo{} 1096 if strings.HasPrefix(last.class.name, COLLECTION_TYPE) { 1097 count-- 1098 1099 for _, param := range last.class.params { 1100 // decode the name 1101 var name string 1102 decoded, err := hex.DecodeString(*param.name) 1103 if err != nil { 1104 Logger.Printf( 1105 "Error parsing type '%s', contains collection name '%s' with an invalid format: %v", 1106 t.input, 1107 *param.name, 1108 err, 1109 ) 1110 // just use the provided name 1111 name = *param.name 1112 } else { 1113 name = string(decoded) 1114 } 1115 collections[name] = param.class.asTypeInfo() 1116 } 1117 } 1118 1119 types := make([]TypeInfo, count) 1120 reversed := make([]bool, count) 1121 1122 for i, param := range ast.params[:count] { 1123 class := param.class 1124 reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE) 1125 if reversed[i] { 1126 class = class.params[0].class 1127 } 1128 types[i] = class.asTypeInfo() 1129 } 1130 1131 return typeParserResult{ 1132 isComposite: true, 1133 types: types, 1134 reversed: reversed, 1135 collections: collections, 1136 } 1137 } else { 1138 // not composite, so one type 1139 class := *ast 1140 reversed := strings.HasPrefix(class.name, REVERSED_TYPE) 1141 if reversed { 1142 class = class.params[0].class 1143 } 1144 typeInfo := class.asTypeInfo() 1145 1146 return typeParserResult{ 1147 isComposite: false, 1148 types: []TypeInfo{typeInfo}, 1149 reversed: []bool{reversed}, 1150 } 1151 } 1152} 1153 1154func (class *typeParserClassNode) asTypeInfo() TypeInfo { 1155 if strings.HasPrefix(class.name, LIST_TYPE) { 1156 elem := class.params[0].class.asTypeInfo() 1157 return CollectionType{ 1158 NativeType: NativeType{ 1159 typ: TypeList, 1160 }, 1161 Elem: elem, 1162 } 1163 } 1164 if strings.HasPrefix(class.name, SET_TYPE) { 1165 elem := class.params[0].class.asTypeInfo() 1166 return CollectionType{ 1167 NativeType: NativeType{ 1168 typ: TypeSet, 1169 }, 1170 Elem: elem, 1171 } 1172 } 1173 if strings.HasPrefix(class.name, MAP_TYPE) { 1174 key := class.params[0].class.asTypeInfo() 1175 elem := class.params[1].class.asTypeInfo() 1176 return CollectionType{ 1177 NativeType: NativeType{ 1178 typ: TypeMap, 1179 }, 1180 Key: key, 1181 Elem: elem, 1182 } 1183 } 1184 1185 // must be a simple type or custom type 1186 info := NativeType{typ: getApacheCassandraType(class.name)} 1187 if info.typ == TypeCustom { 1188 // add the entire class definition 1189 info.custom = class.input 1190 } 1191 return info 1192} 1193 1194// CLASS := ID [ PARAMS ] 1195func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) { 1196 t.skipWhitespace() 1197 1198 startIndex := t.index 1199 1200 name, ok := t.nextIdentifier() 1201 if !ok { 1202 return nil, false 1203 } 1204 1205 params, ok := t.parseParamNodes() 1206 if !ok { 1207 return nil, false 1208 } 1209 1210 endIndex := t.index 1211 1212 node = &typeParserClassNode{ 1213 name: name, 1214 params: params, 1215 input: t.input[startIndex:endIndex], 1216 } 1217 return node, true 1218} 1219 1220// PARAMS := "(" PARAM { "," PARAM } ")" 1221// PARAM := [ PARAM_NAME ":" ] CLASS 1222// PARAM_NAME := ID 1223func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) { 1224 t.skipWhitespace() 1225 1226 // the params are optional 1227 if t.index == len(t.input) || t.input[t.index] != '(' { 1228 return nil, true 1229 } 1230 1231 params = []typeParserParamNode{} 1232 1233 // consume the '(' 1234 t.index++ 1235 1236 t.skipWhitespace() 1237 1238 for t.input[t.index] != ')' { 1239 // look for a named param, but if no colon, then we want to backup 1240 backupIndex := t.index 1241 1242 // name will be a hex encoded version of a utf-8 string 1243 name, ok := t.nextIdentifier() 1244 if !ok { 1245 return nil, false 1246 } 1247 hasName := true 1248 1249 // TODO handle '=>' used for DynamicCompositeType 1250 1251 t.skipWhitespace() 1252 1253 if t.input[t.index] == ':' { 1254 // there is a name for this parameter 1255 1256 // consume the ':' 1257 t.index++ 1258 1259 t.skipWhitespace() 1260 } else { 1261 // no name, backup 1262 hasName = false 1263 t.index = backupIndex 1264 } 1265 1266 // parse the next full parameter 1267 classNode, ok := t.parseClassNode() 1268 if !ok { 1269 return nil, false 1270 } 1271 1272 if hasName { 1273 params = append( 1274 params, 1275 typeParserParamNode{name: &name, class: *classNode}, 1276 ) 1277 } else { 1278 params = append( 1279 params, 1280 typeParserParamNode{class: *classNode}, 1281 ) 1282 } 1283 1284 t.skipWhitespace() 1285 1286 if t.input[t.index] == ',' { 1287 // consume the comma 1288 t.index++ 1289 1290 t.skipWhitespace() 1291 } 1292 } 1293 1294 // consume the ')' 1295 t.index++ 1296 1297 return params, true 1298} 1299 1300func (t *typeParser) skipWhitespace() { 1301 for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) { 1302 t.index++ 1303 } 1304} 1305 1306func isWhitespaceChar(c byte) bool { 1307 return c == ' ' || c == '\n' || c == '\t' 1308} 1309 1310// ID := LETTER { LETTER } 1311// LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&" 1312func (t *typeParser) nextIdentifier() (id string, found bool) { 1313 startIndex := t.index 1314 for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) { 1315 t.index++ 1316 } 1317 if startIndex == t.index { 1318 return "", false 1319 } 1320 return t.input[startIndex:t.index], true 1321} 1322 1323func isIdentifierChar(c byte) bool { 1324 return (c >= '0' && c <= '9') || 1325 (c >= 'a' && c <= 'z') || 1326 (c >= 'A' && c <= 'Z') || 1327 c == '-' || 1328 c == '+' || 1329 c == '.' || 1330 c == '_' || 1331 c == '&' 1332} 1333