1// Copyright (c) 2015 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"encoding/hex"
9	"encoding/json"
10	"fmt"
11	"strconv"
12	"strings"
13	"sync"
14)
15
16// schema metadata for a keyspace
17type KeyspaceMetadata struct {
18	Name            string
19	DurableWrites   bool
20	StrategyClass   string
21	StrategyOptions map[string]interface{}
22	Tables          map[string]*TableMetadata
23	Functions       map[string]*FunctionMetadata
24	Aggregates      map[string]*AggregateMetadata
25	Views           map[string]*ViewMetadata
26}
27
28// schema metadata for a table (a.k.a. column family)
29type TableMetadata struct {
30	Keyspace          string
31	Name              string
32	KeyValidator      string
33	Comparator        string
34	DefaultValidator  string
35	KeyAliases        []string
36	ColumnAliases     []string
37	ValueAlias        string
38	PartitionKey      []*ColumnMetadata
39	ClusteringColumns []*ColumnMetadata
40	Columns           map[string]*ColumnMetadata
41	OrderedColumns    []string
42}
43
44// schema metadata for a column
45type ColumnMetadata struct {
46	Keyspace        string
47	Table           string
48	Name            string
49	ComponentIndex  int
50	Kind            ColumnKind
51	Validator       string
52	Type            TypeInfo
53	ClusteringOrder string
54	Order           ColumnOrder
55	Index           ColumnIndexMetadata
56}
57
58// FunctionMetadata holds metadata for function constructs
59type FunctionMetadata struct {
60	Keyspace          string
61	Name              string
62	ArgumentTypes     []TypeInfo
63	ArgumentNames     []string
64	Body              string
65	CalledOnNullInput bool
66	Language          string
67	ReturnType        TypeInfo
68}
69
70// AggregateMetadata holds metadata for aggregate constructs
71type AggregateMetadata struct {
72	Keyspace      string
73	Name          string
74	ArgumentTypes []TypeInfo
75	FinalFunc     FunctionMetadata
76	InitCond      string
77	ReturnType    TypeInfo
78	StateFunc     FunctionMetadata
79	StateType     TypeInfo
80
81	stateFunc string
82	finalFunc string
83}
84
85// ViewMetadata holds the metadata for views.
86type ViewMetadata struct {
87	Keyspace   string
88	Name       string
89	FieldNames []string
90	FieldTypes []TypeInfo
91}
92
93// the ordering of the column with regard to its comparator
94type ColumnOrder bool
95
96const (
97	ASC  ColumnOrder = false
98	DESC             = true
99)
100
101type ColumnIndexMetadata struct {
102	Name    string
103	Type    string
104	Options map[string]interface{}
105}
106
107type ColumnKind int
108
109const (
110	ColumnUnkownKind ColumnKind = iota
111	ColumnPartitionKey
112	ColumnClusteringKey
113	ColumnRegular
114	ColumnCompact
115	ColumnStatic
116)
117
118func (c ColumnKind) String() string {
119	switch c {
120	case ColumnPartitionKey:
121		return "partition_key"
122	case ColumnClusteringKey:
123		return "clustering_key"
124	case ColumnRegular:
125		return "regular"
126	case ColumnCompact:
127		return "compact"
128	case ColumnStatic:
129		return "static"
130	default:
131		return fmt.Sprintf("unknown_column_%d", c)
132	}
133}
134
135func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
136	if typ.Type() != TypeVarchar {
137		return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
138	}
139
140	kind, err := columnKindFromSchema(string(p))
141	if err != nil {
142		return err
143	}
144	*c = kind
145
146	return nil
147}
148
149func columnKindFromSchema(kind string) (ColumnKind, error) {
150	switch kind {
151	case "partition_key":
152		return ColumnPartitionKey, nil
153	case "clustering_key", "clustering":
154		return ColumnClusteringKey, nil
155	case "regular":
156		return ColumnRegular, nil
157	case "compact_value":
158		return ColumnCompact, nil
159	case "static":
160		return ColumnStatic, nil
161	default:
162		return -1, fmt.Errorf("unknown column kind: %q", kind)
163	}
164}
165
166// default alias values
167const (
168	DEFAULT_KEY_ALIAS    = "key"
169	DEFAULT_COLUMN_ALIAS = "column"
170	DEFAULT_VALUE_ALIAS  = "value"
171)
172
173// queries the cluster for schema information for a specific keyspace
174type schemaDescriber struct {
175	session *Session
176	mu      sync.Mutex
177
178	cache map[string]*KeyspaceMetadata
179}
180
181// creates a session bound schema describer which will query and cache
182// keyspace metadata
183func newSchemaDescriber(session *Session) *schemaDescriber {
184	return &schemaDescriber{
185		session: session,
186		cache:   map[string]*KeyspaceMetadata{},
187	}
188}
189
190// returns the cached KeyspaceMetadata held by the describer for the named
191// keyspace.
192func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
193	s.mu.Lock()
194	defer s.mu.Unlock()
195
196	metadata, found := s.cache[keyspaceName]
197	if !found {
198		// refresh the cache for this keyspace
199		err := s.refreshSchema(keyspaceName)
200		if err != nil {
201			return nil, err
202		}
203
204		metadata = s.cache[keyspaceName]
205	}
206
207	return metadata, nil
208}
209
210// clears the already cached keyspace metadata
211func (s *schemaDescriber) clearSchema(keyspaceName string) {
212	s.mu.Lock()
213	defer s.mu.Unlock()
214
215	delete(s.cache, keyspaceName)
216}
217
218// forcibly updates the current KeyspaceMetadata held by the schema describer
219// for a given named keyspace.
220func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
221	var err error
222
223	// query the system keyspace for schema data
224	// TODO retrieve concurrently
225	keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
226	if err != nil {
227		return err
228	}
229	tables, err := getTableMetadata(s.session, keyspaceName)
230	if err != nil {
231		return err
232	}
233	columns, err := getColumnMetadata(s.session, keyspaceName)
234	if err != nil {
235		return err
236	}
237	functions, err := getFunctionsMetadata(s.session, keyspaceName)
238	if err != nil {
239		return err
240	}
241	aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
242	if err != nil {
243		return err
244	}
245	views, err := getViewsMetadata(s.session, keyspaceName)
246	if err != nil {
247		return err
248	}
249
250	// organize the schema data
251	compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views)
252
253	// update the cache
254	s.cache[keyspaceName] = keyspace
255
256	return nil
257}
258
259// "compiles" derived information about keyspace, table, and column metadata
260// for a keyspace from the basic queried metadata objects returned by
261// getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
262// Links the metadata objects together and derives the column composition of
263// the partition key and clustering key for a table.
264func compileMetadata(
265	protoVersion int,
266	keyspace *KeyspaceMetadata,
267	tables []TableMetadata,
268	columns []ColumnMetadata,
269	functions []FunctionMetadata,
270	aggregates []AggregateMetadata,
271	views []ViewMetadata,
272) {
273	keyspace.Tables = make(map[string]*TableMetadata)
274	for i := range tables {
275		tables[i].Columns = make(map[string]*ColumnMetadata)
276
277		keyspace.Tables[tables[i].Name] = &tables[i]
278	}
279	keyspace.Functions = make(map[string]*FunctionMetadata, len(functions))
280	for i := range functions {
281		keyspace.Functions[functions[i].Name] = &functions[i]
282	}
283	keyspace.Aggregates = make(map[string]*AggregateMetadata, len(aggregates))
284	for _, aggregate := range aggregates {
285		aggregate.FinalFunc = *keyspace.Functions[aggregate.finalFunc]
286		aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc]
287		keyspace.Aggregates[aggregate.Name] = &aggregate
288	}
289	keyspace.Views = make(map[string]*ViewMetadata, len(views))
290	for i := range views {
291		keyspace.Views[views[i].Name] = &views[i]
292	}
293
294	// add columns from the schema data
295	for i := range columns {
296		col := &columns[i]
297		// decode the validator for TypeInfo and order
298		if col.ClusteringOrder != "" { // Cassandra 3.x+
299			col.Type = getCassandraType(col.Validator)
300			col.Order = ASC
301			if col.ClusteringOrder == "desc" {
302				col.Order = DESC
303			}
304		} else {
305			validatorParsed := parseType(col.Validator)
306			col.Type = validatorParsed.types[0]
307			col.Order = ASC
308			if validatorParsed.reversed[0] {
309				col.Order = DESC
310			}
311		}
312
313		table, ok := keyspace.Tables[col.Table]
314		if !ok {
315			// if the schema is being updated we will race between seeing
316			// the metadata be complete. Potentially we should check for
317			// schema versions before and after reading the metadata and
318			// if they dont match try again.
319			continue
320		}
321
322		table.Columns[col.Name] = col
323		table.OrderedColumns = append(table.OrderedColumns, col.Name)
324	}
325
326	if protoVersion == protoVersion1 {
327		compileV1Metadata(tables)
328	} else {
329		compileV2Metadata(tables)
330	}
331}
332
333// Compiles derived information from TableMetadata which have had
334// ColumnMetadata added already. V1 protocol does not return as much
335// column metadata as V2+ (because V1 doesn't support the "type" column in the
336// system.schema_columns table) so determining PartitionKey and ClusterColumns
337// is more complex.
338func compileV1Metadata(tables []TableMetadata) {
339	for i := range tables {
340		table := &tables[i]
341
342		// decode the key validator
343		keyValidatorParsed := parseType(table.KeyValidator)
344		// decode the comparator
345		comparatorParsed := parseType(table.Comparator)
346
347		// the partition key length is the same as the number of types in the
348		// key validator
349		table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
350
351		// V1 protocol only returns "regular" columns from
352		// system.schema_columns (there is no type field for columns)
353		// so the alias information is used to
354		// create the partition key and clustering columns
355
356		// construct the partition key from the alias
357		for i := range table.PartitionKey {
358			var alias string
359			if len(table.KeyAliases) > i {
360				alias = table.KeyAliases[i]
361			} else if i == 0 {
362				alias = DEFAULT_KEY_ALIAS
363			} else {
364				alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
365			}
366
367			column := &ColumnMetadata{
368				Keyspace:       table.Keyspace,
369				Table:          table.Name,
370				Name:           alias,
371				Type:           keyValidatorParsed.types[i],
372				Kind:           ColumnPartitionKey,
373				ComponentIndex: i,
374			}
375
376			table.PartitionKey[i] = column
377			table.Columns[alias] = column
378		}
379
380		// determine the number of clustering columns
381		size := len(comparatorParsed.types)
382		if comparatorParsed.isComposite {
383			if len(comparatorParsed.collections) != 0 ||
384				(len(table.ColumnAliases) == size-1 &&
385					comparatorParsed.types[size-1].Type() == TypeVarchar) {
386				size = size - 1
387			}
388		} else {
389			if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
390				size = 0
391			}
392		}
393
394		table.ClusteringColumns = make([]*ColumnMetadata, size)
395
396		for i := range table.ClusteringColumns {
397			var alias string
398			if len(table.ColumnAliases) > i {
399				alias = table.ColumnAliases[i]
400			} else if i == 0 {
401				alias = DEFAULT_COLUMN_ALIAS
402			} else {
403				alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
404			}
405
406			order := ASC
407			if comparatorParsed.reversed[i] {
408				order = DESC
409			}
410
411			column := &ColumnMetadata{
412				Keyspace:       table.Keyspace,
413				Table:          table.Name,
414				Name:           alias,
415				Type:           comparatorParsed.types[i],
416				Order:          order,
417				Kind:           ColumnClusteringKey,
418				ComponentIndex: i,
419			}
420
421			table.ClusteringColumns[i] = column
422			table.Columns[alias] = column
423		}
424
425		if size != len(comparatorParsed.types)-1 {
426			alias := DEFAULT_VALUE_ALIAS
427			if len(table.ValueAlias) > 0 {
428				alias = table.ValueAlias
429			}
430			// decode the default validator
431			defaultValidatorParsed := parseType(table.DefaultValidator)
432			column := &ColumnMetadata{
433				Keyspace: table.Keyspace,
434				Table:    table.Name,
435				Name:     alias,
436				Type:     defaultValidatorParsed.types[0],
437				Kind:     ColumnRegular,
438			}
439			table.Columns[alias] = column
440		}
441	}
442}
443
444// The simpler compile case for V2+ protocol
445func compileV2Metadata(tables []TableMetadata) {
446	for i := range tables {
447		table := &tables[i]
448
449		clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
450		table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
451
452		if table.KeyValidator != "" {
453			keyValidatorParsed := parseType(table.KeyValidator)
454			table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
455		} else { // Cassandra 3.x+
456			partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
457			table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
458		}
459
460		for _, columnName := range table.OrderedColumns {
461			column := table.Columns[columnName]
462			if column.Kind == ColumnPartitionKey {
463				table.PartitionKey[column.ComponentIndex] = column
464			} else if column.Kind == ColumnClusteringKey {
465				table.ClusteringColumns[column.ComponentIndex] = column
466			}
467		}
468	}
469}
470
471// returns the count of coluns with the given "kind" value.
472func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
473	maxComponentIndex := -1
474	for _, column := range columns {
475		if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
476			maxComponentIndex = column.ComponentIndex
477		}
478	}
479	return maxComponentIndex + 1
480}
481
482// query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
483func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
484	keyspace := &KeyspaceMetadata{Name: keyspaceName}
485
486	if session.useSystemSchema { // Cassandra 3.x+
487		const stmt = `
488		SELECT durable_writes, replication
489		FROM system_schema.keyspaces
490		WHERE keyspace_name = ?`
491
492		var replication map[string]string
493
494		iter := session.control.query(stmt, keyspaceName)
495		if iter.NumRows() == 0 {
496			return nil, ErrKeyspaceDoesNotExist
497		}
498		iter.Scan(&keyspace.DurableWrites, &replication)
499		err := iter.Close()
500		if err != nil {
501			return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
502		}
503
504		keyspace.StrategyClass = replication["class"]
505		delete(replication, "class")
506
507		keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
508		for k, v := range replication {
509			keyspace.StrategyOptions[k] = v
510		}
511	} else {
512
513		const stmt = `
514		SELECT durable_writes, strategy_class, strategy_options
515		FROM system.schema_keyspaces
516		WHERE keyspace_name = ?`
517
518		var strategyOptionsJSON []byte
519
520		iter := session.control.query(stmt, keyspaceName)
521		if iter.NumRows() == 0 {
522			return nil, ErrKeyspaceDoesNotExist
523		}
524		iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
525		err := iter.Close()
526		if err != nil {
527			return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
528		}
529
530		err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
531		if err != nil {
532			return nil, fmt.Errorf(
533				"Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
534				strategyOptionsJSON, keyspace.Name, err,
535			)
536		}
537	}
538
539	return keyspace, nil
540}
541
542// query for only the table metadata in the specified keyspace from system.schema_columnfamilies
543func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
544
545	var (
546		iter *Iter
547		scan func(iter *Iter, table *TableMetadata) bool
548		stmt string
549
550		keyAliasesJSON    []byte
551		columnAliasesJSON []byte
552	)
553
554	if session.useSystemSchema { // Cassandra 3.x+
555		stmt = `
556		SELECT
557			table_name
558		FROM system_schema.tables
559		WHERE keyspace_name = ?`
560
561		switchIter := func() *Iter {
562			iter.Close()
563			stmt = `
564				SELECT
565					view_name
566				FROM system_schema.views
567				WHERE keyspace_name = ?`
568			iter = session.control.query(stmt, keyspaceName)
569			return iter
570		}
571
572		scan = func(iter *Iter, table *TableMetadata) bool {
573			r := iter.Scan(
574				&table.Name,
575			)
576			if !r {
577				iter = switchIter()
578				if iter != nil {
579					switchIter = func() *Iter { return nil }
580					r = iter.Scan(&table.Name)
581				}
582			}
583			return r
584		}
585	} else if session.cfg.ProtoVersion == protoVersion1 {
586		// we have key aliases
587		stmt = `
588		SELECT
589			columnfamily_name,
590			key_validator,
591			comparator,
592			default_validator,
593			key_aliases,
594			column_aliases,
595			value_alias
596		FROM system.schema_columnfamilies
597		WHERE keyspace_name = ?`
598
599		scan = func(iter *Iter, table *TableMetadata) bool {
600			return iter.Scan(
601				&table.Name,
602				&table.KeyValidator,
603				&table.Comparator,
604				&table.DefaultValidator,
605				&keyAliasesJSON,
606				&columnAliasesJSON,
607				&table.ValueAlias,
608			)
609		}
610	} else {
611		stmt = `
612		SELECT
613			columnfamily_name,
614			key_validator,
615			comparator,
616			default_validator
617		FROM system.schema_columnfamilies
618		WHERE keyspace_name = ?`
619
620		scan = func(iter *Iter, table *TableMetadata) bool {
621			return iter.Scan(
622				&table.Name,
623				&table.KeyValidator,
624				&table.Comparator,
625				&table.DefaultValidator,
626			)
627		}
628	}
629
630	iter = session.control.query(stmt, keyspaceName)
631
632	tables := []TableMetadata{}
633	table := TableMetadata{Keyspace: keyspaceName}
634
635	for scan(iter, &table) {
636		var err error
637
638		// decode the key aliases
639		if keyAliasesJSON != nil {
640			table.KeyAliases = []string{}
641			err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
642			if err != nil {
643				iter.Close()
644				return nil, fmt.Errorf(
645					"Invalid JSON value '%s' as key_aliases for in table '%s': %v",
646					keyAliasesJSON, table.Name, err,
647				)
648			}
649		}
650
651		// decode the column aliases
652		if columnAliasesJSON != nil {
653			table.ColumnAliases = []string{}
654			err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
655			if err != nil {
656				iter.Close()
657				return nil, fmt.Errorf(
658					"Invalid JSON value '%s' as column_aliases for in table '%s': %v",
659					columnAliasesJSON, table.Name, err,
660				)
661			}
662		}
663
664		tables = append(tables, table)
665		table = TableMetadata{Keyspace: keyspaceName}
666	}
667
668	err := iter.Close()
669	if err != nil && err != ErrNotFound {
670		return nil, fmt.Errorf("Error querying table schema: %v", err)
671	}
672
673	return tables, nil
674}
675
676func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
677	// V1 does not support the type column, and all returned rows are
678	// of kind "regular".
679	const stmt = `
680		SELECT
681				columnfamily_name,
682				column_name,
683				component_index,
684				validator,
685				index_name,
686				index_type,
687				index_options
688			FROM system.schema_columns
689			WHERE keyspace_name = ?`
690
691	var columns []ColumnMetadata
692
693	rows := s.control.query(stmt, keyspace).Scanner()
694	for rows.Next() {
695		var (
696			column           = ColumnMetadata{Keyspace: keyspace}
697			indexOptionsJSON []byte
698		)
699
700		// all columns returned by V1 are regular
701		column.Kind = ColumnRegular
702
703		err := rows.Scan(&column.Table,
704			&column.Name,
705			&column.ComponentIndex,
706			&column.Validator,
707			&column.Index.Name,
708			&column.Index.Type,
709			&indexOptionsJSON)
710
711		if err != nil {
712			return nil, err
713		}
714
715		if len(indexOptionsJSON) > 0 {
716			err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
717			if err != nil {
718				return nil, fmt.Errorf(
719					"Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
720					indexOptionsJSON,
721					column.Name,
722					column.Table,
723					err)
724			}
725		}
726
727		columns = append(columns, column)
728	}
729
730	if err := rows.Err(); err != nil {
731		return nil, err
732	}
733
734	return columns, nil
735}
736
737func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
738	// V2+ supports the type column
739	const stmt = `
740			SELECT
741				columnfamily_name,
742				column_name,
743				component_index,
744				validator,
745				index_name,
746				index_type,
747				index_options,
748				type
749			FROM system.schema_columns
750			WHERE keyspace_name = ?`
751
752	var columns []ColumnMetadata
753
754	rows := s.control.query(stmt, keyspace).Scanner()
755	for rows.Next() {
756		var (
757			column           = ColumnMetadata{Keyspace: keyspace}
758			indexOptionsJSON []byte
759		)
760
761		err := rows.Scan(&column.Table,
762			&column.Name,
763			&column.ComponentIndex,
764			&column.Validator,
765			&column.Index.Name,
766			&column.Index.Type,
767			&indexOptionsJSON,
768			&column.Kind,
769		)
770
771		if err != nil {
772			return nil, err
773		}
774
775		if len(indexOptionsJSON) > 0 {
776			err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
777			if err != nil {
778				return nil, fmt.Errorf(
779					"Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
780					indexOptionsJSON,
781					column.Name,
782					column.Table,
783					err)
784			}
785		}
786
787		columns = append(columns, column)
788	}
789
790	if err := rows.Err(); err != nil {
791		return nil, err
792	}
793
794	return columns, nil
795
796}
797
798func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
799	const stmt = `
800			SELECT
801				table_name,
802				column_name,
803				clustering_order,
804				type,
805				kind,
806				position
807			FROM system_schema.columns
808			WHERE keyspace_name = ?`
809
810	var columns []ColumnMetadata
811
812	rows := s.control.query(stmt, keyspace).Scanner()
813	for rows.Next() {
814		column := ColumnMetadata{Keyspace: keyspace}
815
816		err := rows.Scan(&column.Table,
817			&column.Name,
818			&column.ClusteringOrder,
819			&column.Validator,
820			&column.Kind,
821			&column.ComponentIndex,
822		)
823
824		if err != nil {
825			return nil, err
826		}
827
828		columns = append(columns, column)
829	}
830
831	if err := rows.Err(); err != nil {
832		return nil, err
833	}
834
835	// TODO(zariel): get column index info from system_schema.indexes
836
837	return columns, nil
838}
839
840// query for only the column metadata in the specified keyspace from system.schema_columns
841func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
842	var (
843		columns []ColumnMetadata
844		err     error
845	)
846
847	// Deal with differences in protocol versions
848	if session.cfg.ProtoVersion == 1 {
849		columns, err = session.scanColumnMetadataV1(keyspaceName)
850	} else if session.useSystemSchema { // Cassandra 3.x+
851		columns, err = session.scanColumnMetadataSystem(keyspaceName)
852	} else {
853		columns, err = session.scanColumnMetadataV2(keyspaceName)
854	}
855
856	if err != nil && err != ErrNotFound {
857		return nil, fmt.Errorf("Error querying column schema: %v", err)
858	}
859
860	return columns, nil
861}
862
863func getTypeInfo(t string) TypeInfo {
864	if strings.HasPrefix(t, apacheCassandraTypePrefix) {
865		t = apacheToCassandraType(t)
866	}
867	return getCassandraType(t)
868}
869
870func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, error) {
871	if session.cfg.ProtoVersion == protoVersion1 {
872		return nil, nil
873	}
874	var tableName string
875	if session.useSystemSchema {
876		tableName = "system_schema.types"
877	} else {
878		tableName = "system.schema_usertypes"
879	}
880	stmt := fmt.Sprintf(`
881		SELECT
882			type_name,
883			field_names,
884			field_types
885		FROM %s
886		WHERE keyspace_name = ?`, tableName)
887
888	var views []ViewMetadata
889
890	rows := session.control.query(stmt, keyspaceName).Scanner()
891	for rows.Next() {
892		view := ViewMetadata{Keyspace: keyspaceName}
893		var argumentTypes []string
894		err := rows.Scan(&view.Name,
895			&view.FieldNames,
896			&argumentTypes,
897		)
898		if err != nil {
899			return nil, err
900		}
901		view.FieldTypes = make([]TypeInfo, len(argumentTypes))
902		for i, argumentType := range argumentTypes {
903			view.FieldTypes[i] = getTypeInfo(argumentType)
904		}
905		views = append(views, view)
906	}
907
908	if err := rows.Err(); err != nil {
909		return nil, err
910	}
911
912	return views, nil
913}
914
915func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) {
916	if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
917		return nil, nil
918	}
919	var tableName string
920	if session.useSystemSchema {
921		tableName = "system_schema.functions"
922	} else {
923		tableName = "system.schema_functions"
924	}
925	stmt := fmt.Sprintf(`
926		SELECT
927			function_name,
928			argument_types,
929			argument_names,
930			body,
931			called_on_null_input,
932			language,
933			return_type
934		FROM %s
935		WHERE keyspace_name = ?`, tableName)
936
937	var functions []FunctionMetadata
938
939	rows := session.control.query(stmt, keyspaceName).Scanner()
940	for rows.Next() {
941		function := FunctionMetadata{Keyspace: keyspaceName}
942		var argumentTypes []string
943		var returnType string
944		err := rows.Scan(&function.Name,
945			&argumentTypes,
946			&function.ArgumentNames,
947			&function.Body,
948			&function.CalledOnNullInput,
949			&function.Language,
950			&returnType,
951		)
952		if err != nil {
953			return nil, err
954		}
955		function.ReturnType = getTypeInfo(returnType)
956		function.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
957		for i, argumentType := range argumentTypes {
958			function.ArgumentTypes[i] = getTypeInfo(argumentType)
959		}
960		functions = append(functions, function)
961	}
962
963	if err := rows.Err(); err != nil {
964		return nil, err
965	}
966
967	return functions, nil
968}
969
970func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) {
971	if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
972		return nil, nil
973	}
974	var tableName string
975	if session.useSystemSchema {
976		tableName = "system_schema.aggregates"
977	} else {
978		tableName = "system.schema_aggregates"
979	}
980
981	stmt := fmt.Sprintf(`
982		SELECT
983			aggregate_name,
984			argument_types,
985			final_func,
986			initcond,
987			return_type,
988			state_func,
989			state_type
990		FROM %s
991		WHERE keyspace_name = ?`, tableName)
992
993	var aggregates []AggregateMetadata
994
995	rows := session.control.query(stmt, keyspaceName).Scanner()
996	for rows.Next() {
997		aggregate := AggregateMetadata{Keyspace: keyspaceName}
998		var argumentTypes []string
999		var returnType string
1000		var stateType string
1001		err := rows.Scan(&aggregate.Name,
1002			&argumentTypes,
1003			&aggregate.finalFunc,
1004			&aggregate.InitCond,
1005			&returnType,
1006			&aggregate.stateFunc,
1007			&stateType,
1008		)
1009		if err != nil {
1010			return nil, err
1011		}
1012		aggregate.ReturnType = getTypeInfo(returnType)
1013		aggregate.StateType = getTypeInfo(stateType)
1014		aggregate.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
1015		for i, argumentType := range argumentTypes {
1016			aggregate.ArgumentTypes[i] = getTypeInfo(argumentType)
1017		}
1018		aggregates = append(aggregates, aggregate)
1019	}
1020
1021	if err := rows.Err(); err != nil {
1022		return nil, err
1023	}
1024
1025	return aggregates, nil
1026}
1027
1028// type definition parser state
1029type typeParser struct {
1030	input string
1031	index int
1032}
1033
1034// the type definition parser result
1035type typeParserResult struct {
1036	isComposite bool
1037	types       []TypeInfo
1038	reversed    []bool
1039	collections map[string]TypeInfo
1040}
1041
1042// Parse the type definition used for validator and comparator schema data
1043func parseType(def string) typeParserResult {
1044	parser := &typeParser{input: def}
1045	return parser.parse()
1046}
1047
1048const (
1049	REVERSED_TYPE   = "org.apache.cassandra.db.marshal.ReversedType"
1050	COMPOSITE_TYPE  = "org.apache.cassandra.db.marshal.CompositeType"
1051	COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
1052	LIST_TYPE       = "org.apache.cassandra.db.marshal.ListType"
1053	SET_TYPE        = "org.apache.cassandra.db.marshal.SetType"
1054	MAP_TYPE        = "org.apache.cassandra.db.marshal.MapType"
1055)
1056
1057// represents a class specification in the type def AST
1058type typeParserClassNode struct {
1059	name   string
1060	params []typeParserParamNode
1061	// this is the segment of the input string that defined this node
1062	input string
1063}
1064
1065// represents a class parameter in the type def AST
1066type typeParserParamNode struct {
1067	name  *string
1068	class typeParserClassNode
1069}
1070
1071func (t *typeParser) parse() typeParserResult {
1072	// parse the AST
1073	ast, ok := t.parseClassNode()
1074	if !ok {
1075		// treat this is a custom type
1076		return typeParserResult{
1077			isComposite: false,
1078			types: []TypeInfo{
1079				NativeType{
1080					typ:    TypeCustom,
1081					custom: t.input,
1082				},
1083			},
1084			reversed:    []bool{false},
1085			collections: nil,
1086		}
1087	}
1088
1089	// interpret the AST
1090	if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
1091		count := len(ast.params)
1092
1093		// look for a collections param
1094		last := ast.params[count-1]
1095		collections := map[string]TypeInfo{}
1096		if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
1097			count--
1098
1099			for _, param := range last.class.params {
1100				// decode the name
1101				var name string
1102				decoded, err := hex.DecodeString(*param.name)
1103				if err != nil {
1104					Logger.Printf(
1105						"Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
1106						t.input,
1107						*param.name,
1108						err,
1109					)
1110					// just use the provided name
1111					name = *param.name
1112				} else {
1113					name = string(decoded)
1114				}
1115				collections[name] = param.class.asTypeInfo()
1116			}
1117		}
1118
1119		types := make([]TypeInfo, count)
1120		reversed := make([]bool, count)
1121
1122		for i, param := range ast.params[:count] {
1123			class := param.class
1124			reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
1125			if reversed[i] {
1126				class = class.params[0].class
1127			}
1128			types[i] = class.asTypeInfo()
1129		}
1130
1131		return typeParserResult{
1132			isComposite: true,
1133			types:       types,
1134			reversed:    reversed,
1135			collections: collections,
1136		}
1137	} else {
1138		// not composite, so one type
1139		class := *ast
1140		reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
1141		if reversed {
1142			class = class.params[0].class
1143		}
1144		typeInfo := class.asTypeInfo()
1145
1146		return typeParserResult{
1147			isComposite: false,
1148			types:       []TypeInfo{typeInfo},
1149			reversed:    []bool{reversed},
1150		}
1151	}
1152}
1153
1154func (class *typeParserClassNode) asTypeInfo() TypeInfo {
1155	if strings.HasPrefix(class.name, LIST_TYPE) {
1156		elem := class.params[0].class.asTypeInfo()
1157		return CollectionType{
1158			NativeType: NativeType{
1159				typ: TypeList,
1160			},
1161			Elem: elem,
1162		}
1163	}
1164	if strings.HasPrefix(class.name, SET_TYPE) {
1165		elem := class.params[0].class.asTypeInfo()
1166		return CollectionType{
1167			NativeType: NativeType{
1168				typ: TypeSet,
1169			},
1170			Elem: elem,
1171		}
1172	}
1173	if strings.HasPrefix(class.name, MAP_TYPE) {
1174		key := class.params[0].class.asTypeInfo()
1175		elem := class.params[1].class.asTypeInfo()
1176		return CollectionType{
1177			NativeType: NativeType{
1178				typ: TypeMap,
1179			},
1180			Key:  key,
1181			Elem: elem,
1182		}
1183	}
1184
1185	// must be a simple type or custom type
1186	info := NativeType{typ: getApacheCassandraType(class.name)}
1187	if info.typ == TypeCustom {
1188		// add the entire class definition
1189		info.custom = class.input
1190	}
1191	return info
1192}
1193
1194// CLASS := ID [ PARAMS ]
1195func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
1196	t.skipWhitespace()
1197
1198	startIndex := t.index
1199
1200	name, ok := t.nextIdentifier()
1201	if !ok {
1202		return nil, false
1203	}
1204
1205	params, ok := t.parseParamNodes()
1206	if !ok {
1207		return nil, false
1208	}
1209
1210	endIndex := t.index
1211
1212	node = &typeParserClassNode{
1213		name:   name,
1214		params: params,
1215		input:  t.input[startIndex:endIndex],
1216	}
1217	return node, true
1218}
1219
1220// PARAMS := "(" PARAM { "," PARAM } ")"
1221// PARAM := [ PARAM_NAME ":" ] CLASS
1222// PARAM_NAME := ID
1223func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
1224	t.skipWhitespace()
1225
1226	// the params are optional
1227	if t.index == len(t.input) || t.input[t.index] != '(' {
1228		return nil, true
1229	}
1230
1231	params = []typeParserParamNode{}
1232
1233	// consume the '('
1234	t.index++
1235
1236	t.skipWhitespace()
1237
1238	for t.input[t.index] != ')' {
1239		// look for a named param, but if no colon, then we want to backup
1240		backupIndex := t.index
1241
1242		// name will be a hex encoded version of a utf-8 string
1243		name, ok := t.nextIdentifier()
1244		if !ok {
1245			return nil, false
1246		}
1247		hasName := true
1248
1249		// TODO handle '=>' used for DynamicCompositeType
1250
1251		t.skipWhitespace()
1252
1253		if t.input[t.index] == ':' {
1254			// there is a name for this parameter
1255
1256			// consume the ':'
1257			t.index++
1258
1259			t.skipWhitespace()
1260		} else {
1261			// no name, backup
1262			hasName = false
1263			t.index = backupIndex
1264		}
1265
1266		// parse the next full parameter
1267		classNode, ok := t.parseClassNode()
1268		if !ok {
1269			return nil, false
1270		}
1271
1272		if hasName {
1273			params = append(
1274				params,
1275				typeParserParamNode{name: &name, class: *classNode},
1276			)
1277		} else {
1278			params = append(
1279				params,
1280				typeParserParamNode{class: *classNode},
1281			)
1282		}
1283
1284		t.skipWhitespace()
1285
1286		if t.input[t.index] == ',' {
1287			// consume the comma
1288			t.index++
1289
1290			t.skipWhitespace()
1291		}
1292	}
1293
1294	// consume the ')'
1295	t.index++
1296
1297	return params, true
1298}
1299
1300func (t *typeParser) skipWhitespace() {
1301	for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
1302		t.index++
1303	}
1304}
1305
1306func isWhitespaceChar(c byte) bool {
1307	return c == ' ' || c == '\n' || c == '\t'
1308}
1309
1310// ID := LETTER { LETTER }
1311// LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
1312func (t *typeParser) nextIdentifier() (id string, found bool) {
1313	startIndex := t.index
1314	for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
1315		t.index++
1316	}
1317	if startIndex == t.index {
1318		return "", false
1319	}
1320	return t.input[startIndex:t.index], true
1321}
1322
1323func isIdentifierChar(c byte) bool {
1324	return (c >= '0' && c <= '9') ||
1325		(c >= 'a' && c <= 'z') ||
1326		(c >= 'A' && c <= 'Z') ||
1327		c == '-' ||
1328		c == '+' ||
1329		c == '.' ||
1330		c == '_' ||
1331		c == '&'
1332}
1333