1// Copyright (c) 2012 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "bytes" 9 "context" 10 "encoding/binary" 11 "errors" 12 "fmt" 13 "io" 14 "net" 15 "strings" 16 "sync" 17 "sync/atomic" 18 "time" 19 "unicode" 20 21 "github.com/gocql/gocql/internal/lru" 22) 23 24// Session is the interface used by users to interact with the database. 25// 26// It's safe for concurrent use by multiple goroutines and a typical usage 27// scenario is to have one global session object to interact with the 28// whole Cassandra cluster. 29// 30// This type extends the Node interface by adding a convinient query builder 31// and automatically sets a default consistency level on all operations 32// that do not have a consistency level set. 33type Session struct { 34 cons Consistency 35 pageSize int 36 prefetch float64 37 routingKeyInfoCache routingKeyInfoLRU 38 schemaDescriber *schemaDescriber 39 trace Tracer 40 queryObserver QueryObserver 41 batchObserver BatchObserver 42 connectObserver ConnectObserver 43 frameObserver FrameHeaderObserver 44 hostSource *ringDescriber 45 stmtsLRU *preparedLRU 46 47 connCfg *ConnConfig 48 49 executor *queryExecutor 50 pool *policyConnPool 51 policy HostSelectionPolicy 52 53 ring ring 54 metadata clusterMetadata 55 56 mu sync.RWMutex 57 58 control *controlConn 59 60 // event handlers 61 nodeEvents *eventDebouncer 62 schemaEvents *eventDebouncer 63 64 // ring metadata 65 hosts []HostInfo 66 useSystemSchema bool 67 hasAggregatesAndFunctions bool 68 69 cfg ClusterConfig 70 71 quit chan struct{} 72 73 closeMu sync.RWMutex 74 isClosed bool 75} 76 77var queryPool = &sync.Pool{ 78 New: func() interface{} { 79 return new(Query) 80 }, 81} 82 83func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) { 84 var hosts []*HostInfo 85 for _, hostport := range addrs { 86 resolvedHosts, err := hostInfo(hostport, defaultPort) 87 if err != nil { 88 // Try other hosts if unable to resolve DNS name 89 if _, ok := err.(*net.DNSError); ok { 90 Logger.Printf("gocql: dns error: %v\n", err) 91 continue 92 } 93 return nil, err 94 } 95 96 hosts = append(hosts, resolvedHosts...) 97 } 98 if len(hosts) == 0 { 99 return nil, errors.New("failed to resolve any of the provided hostnames") 100 } 101 return hosts, nil 102} 103 104// NewSession wraps an existing Node. 105func NewSession(cfg ClusterConfig) (*Session, error) { 106 // Check that hosts in the ClusterConfig is not empty 107 if len(cfg.Hosts) < 1 { 108 return nil, ErrNoHosts 109 } 110 111 // Check that either Authenticator is set or AuthProvider, not both 112 if cfg.Authenticator != nil && cfg.AuthProvider != nil { 113 return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") 114 } 115 116 s := &Session{ 117 cons: cfg.Consistency, 118 prefetch: 0.25, 119 cfg: cfg, 120 pageSize: cfg.PageSize, 121 stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)}, 122 quit: make(chan struct{}), 123 connectObserver: cfg.ConnectObserver, 124 } 125 126 s.schemaDescriber = newSchemaDescriber(s) 127 128 s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent) 129 s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent) 130 131 s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo) 132 133 s.hostSource = &ringDescriber{session: s} 134 135 if cfg.PoolConfig.HostSelectionPolicy == nil { 136 cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy() 137 } 138 s.pool = cfg.PoolConfig.buildPool(s) 139 140 s.policy = cfg.PoolConfig.HostSelectionPolicy 141 s.policy.Init(s) 142 143 s.executor = &queryExecutor{ 144 pool: s.pool, 145 policy: cfg.PoolConfig.HostSelectionPolicy, 146 } 147 148 s.queryObserver = cfg.QueryObserver 149 s.batchObserver = cfg.BatchObserver 150 s.connectObserver = cfg.ConnectObserver 151 s.frameObserver = cfg.FrameHeaderObserver 152 153 //Check the TLS Config before trying to connect to anything external 154 connCfg, err := connConfig(&s.cfg) 155 if err != nil { 156 //TODO: Return a typed error 157 return nil, fmt.Errorf("gocql: unable to create session: %v", err) 158 } 159 s.connCfg = connCfg 160 161 if err := s.init(); err != nil { 162 s.Close() 163 if err == ErrNoConnectionsStarted { 164 //This error used to be generated inside NewSession & returned directly 165 //Forward it on up to be backwards compatible 166 return nil, ErrNoConnectionsStarted 167 } else { 168 // TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error 169 return nil, fmt.Errorf("gocql: unable to create session: %v", err) 170 } 171 } 172 173 return s, nil 174} 175 176func (s *Session) init() error { 177 hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port) 178 if err != nil { 179 return err 180 } 181 s.ring.endpoints = hosts 182 183 if !s.cfg.disableControlConn { 184 s.control = createControlConn(s) 185 if s.cfg.ProtoVersion == 0 { 186 proto, err := s.control.discoverProtocol(hosts) 187 if err != nil { 188 return fmt.Errorf("unable to discover protocol version: %v", err) 189 } else if proto == 0 { 190 return errors.New("unable to discovery protocol version") 191 } 192 193 // TODO(zariel): we really only need this in 1 place 194 s.cfg.ProtoVersion = proto 195 s.connCfg.ProtoVersion = proto 196 } 197 198 if err := s.control.connect(hosts); err != nil { 199 return err 200 } 201 202 if !s.cfg.DisableInitialHostLookup { 203 var partitioner string 204 newHosts, partitioner, err := s.hostSource.GetHosts() 205 if err != nil { 206 return err 207 } 208 s.policy.SetPartitioner(partitioner) 209 filteredHosts := make([]*HostInfo, 0, len(newHosts)) 210 for _, host := range newHosts { 211 if !s.cfg.filterHost(host) { 212 filteredHosts = append(filteredHosts, host) 213 } 214 } 215 hosts = append(hosts, filteredHosts...) 216 } 217 } 218 219 hostMap := make(map[string]*HostInfo, len(hosts)) 220 for _, host := range hosts { 221 hostMap[host.ConnectAddress().String()] = host 222 } 223 224 for _, host := range hostMap { 225 host = s.ring.addOrUpdate(host) 226 s.addNewNode(host) 227 } 228 229 // TODO(zariel): we probably dont need this any more as we verify that we 230 // can connect to one of the endpoints supplied by using the control conn. 231 // See if there are any connections in the pool 232 if s.cfg.ReconnectInterval > 0 { 233 go s.reconnectDownedHosts(s.cfg.ReconnectInterval) 234 } 235 236 // If we disable the initial host lookup, we need to still check if the 237 // cluster is using the newer system schema or not... however, if control 238 // connection is disable, we really have no choice, so we just make our 239 // best guess... 240 if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup { 241 newer, _ := checkSystemSchema(s.control) 242 s.useSystemSchema = newer 243 } else { 244 version := s.ring.rrHost().Version() 245 s.useSystemSchema = version.AtLeast(3, 0, 0) 246 s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0) 247 } 248 249 if s.pool.Size() == 0 { 250 return ErrNoConnectionsStarted 251 } 252 253 // Invoke KeyspaceChanged to let the policy cache the session keyspace 254 // parameters. This is used by tokenAwareHostPolicy to discover replicas. 255 if !s.cfg.disableControlConn && s.cfg.Keyspace != "" { 256 s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace}) 257 } 258 259 return nil 260} 261 262func (s *Session) reconnectDownedHosts(intv time.Duration) { 263 reconnectTicker := time.NewTicker(intv) 264 defer reconnectTicker.Stop() 265 266 for { 267 select { 268 case <-reconnectTicker.C: 269 hosts := s.ring.allHosts() 270 271 // Print session.ring for debug. 272 if gocqlDebug { 273 buf := bytes.NewBufferString("Session.ring:") 274 for _, h := range hosts { 275 buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") 276 } 277 Logger.Println(buf.String()) 278 } 279 280 for _, h := range hosts { 281 if h.IsUp() { 282 continue 283 } 284 s.handleNodeUp(h.ConnectAddress(), h.Port(), true) 285 } 286 case <-s.quit: 287 return 288 } 289 } 290} 291 292// SetConsistency sets the default consistency level for this session. This 293// setting can also be changed on a per-query basis and the default value 294// is Quorum. 295func (s *Session) SetConsistency(cons Consistency) { 296 s.mu.Lock() 297 s.cons = cons 298 s.mu.Unlock() 299} 300 301// SetPageSize sets the default page size for this session. A value <= 0 will 302// disable paging. This setting can also be changed on a per-query basis. 303func (s *Session) SetPageSize(n int) { 304 s.mu.Lock() 305 s.pageSize = n 306 s.mu.Unlock() 307} 308 309// SetPrefetch sets the default threshold for pre-fetching new pages. If 310// there are only p*pageSize rows remaining, the next page will be requested 311// automatically. This value can also be changed on a per-query basis and 312// the default value is 0.25. 313func (s *Session) SetPrefetch(p float64) { 314 s.mu.Lock() 315 s.prefetch = p 316 s.mu.Unlock() 317} 318 319// SetTrace sets the default tracer for this session. This setting can also 320// be changed on a per-query basis. 321func (s *Session) SetTrace(trace Tracer) { 322 s.mu.Lock() 323 s.trace = trace 324 s.mu.Unlock() 325} 326 327// Query generates a new query object for interacting with the database. 328// Further details of the query may be tweaked using the resulting query 329// value before the query is executed. Query is automatically prepared 330// if it has not previously been executed. 331func (s *Session) Query(stmt string, values ...interface{}) *Query { 332 qry := queryPool.Get().(*Query) 333 qry.session = s 334 qry.stmt = stmt 335 qry.values = values 336 qry.defaultsFromSession() 337 return qry 338} 339 340type QueryInfo struct { 341 Id []byte 342 Args []ColumnInfo 343 Rval []ColumnInfo 344 PKeyColumns []int 345} 346 347// Bind generates a new query object based on the query statement passed in. 348// The query is automatically prepared if it has not previously been executed. 349// The binding callback allows the application to define which query argument 350// values will be marshalled as part of the query execution. 351// During execution, the meta data of the prepared query will be routed to the 352// binding callback, which is responsible for producing the query argument values. 353func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query { 354 qry := queryPool.Get().(*Query) 355 qry.session = s 356 qry.stmt = stmt 357 qry.binding = b 358 qry.defaultsFromSession() 359 return qry 360} 361 362// Close closes all connections. The session is unusable after this 363// operation. 364func (s *Session) Close() { 365 366 s.closeMu.Lock() 367 defer s.closeMu.Unlock() 368 if s.isClosed { 369 return 370 } 371 s.isClosed = true 372 373 if s.pool != nil { 374 s.pool.Close() 375 } 376 377 if s.control != nil { 378 s.control.close() 379 } 380 381 if s.nodeEvents != nil { 382 s.nodeEvents.stop() 383 } 384 385 if s.schemaEvents != nil { 386 s.schemaEvents.stop() 387 } 388 389 if s.quit != nil { 390 close(s.quit) 391 } 392} 393 394func (s *Session) Closed() bool { 395 s.closeMu.RLock() 396 closed := s.isClosed 397 s.closeMu.RUnlock() 398 return closed 399} 400 401func (s *Session) executeQuery(qry *Query) (it *Iter) { 402 // fail fast 403 if s.Closed() { 404 return &Iter{err: ErrSessionClosed} 405 } 406 407 iter, err := s.executor.executeQuery(qry) 408 if err != nil { 409 return &Iter{err: err} 410 } 411 if iter == nil { 412 panic("nil iter") 413 } 414 415 return iter 416} 417 418func (s *Session) removeHost(h *HostInfo) { 419 s.policy.RemoveHost(h) 420 s.pool.removeHost(h.ConnectAddress()) 421 s.ring.removeHost(h.ConnectAddress()) 422} 423 424// KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist. 425func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { 426 // fail fast 427 if s.Closed() { 428 return nil, ErrSessionClosed 429 } else if keyspace == "" { 430 return nil, ErrNoKeyspace 431 } 432 433 return s.schemaDescriber.getSchema(keyspace) 434} 435 436func (s *Session) getConn() *Conn { 437 hosts := s.ring.allHosts() 438 for _, host := range hosts { 439 if !host.IsUp() { 440 continue 441 } 442 443 pool, ok := s.pool.getPool(host) 444 if !ok { 445 continue 446 } else if conn := pool.Pick(); conn != nil { 447 return conn 448 } 449 } 450 451 return nil 452} 453 454// returns routing key indexes and type info 455func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) { 456 s.routingKeyInfoCache.mu.Lock() 457 458 entry, cached := s.routingKeyInfoCache.lru.Get(stmt) 459 if cached { 460 // done accessing the cache 461 s.routingKeyInfoCache.mu.Unlock() 462 // the entry is an inflight struct similar to that used by 463 // Conn to prepare statements 464 inflight := entry.(*inflightCachedEntry) 465 466 // wait for any inflight work 467 inflight.wg.Wait() 468 469 if inflight.err != nil { 470 return nil, inflight.err 471 } 472 473 key, _ := inflight.value.(*routingKeyInfo) 474 475 return key, nil 476 } 477 478 // create a new inflight entry while the data is created 479 inflight := new(inflightCachedEntry) 480 inflight.wg.Add(1) 481 defer inflight.wg.Done() 482 s.routingKeyInfoCache.lru.Add(stmt, inflight) 483 s.routingKeyInfoCache.mu.Unlock() 484 485 var ( 486 info *preparedStatment 487 partitionKey []*ColumnMetadata 488 ) 489 490 conn := s.getConn() 491 if conn == nil { 492 // TODO: better error? 493 inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available") 494 return nil, inflight.err 495 } 496 497 // get the query info for the statement 498 info, inflight.err = conn.prepareStatement(ctx, stmt, nil) 499 if inflight.err != nil { 500 // don't cache this error 501 s.routingKeyInfoCache.Remove(stmt) 502 return nil, inflight.err 503 } 504 505 // TODO: it would be nice to mark hosts here but as we are not using the policies 506 // to fetch hosts we cant 507 508 if info.request.colCount == 0 { 509 // no arguments, no routing key, and no error 510 return nil, nil 511 } 512 513 if len(info.request.pkeyColumns) > 0 { 514 // proto v4 dont need to calculate primary key columns 515 types := make([]TypeInfo, len(info.request.pkeyColumns)) 516 for i, col := range info.request.pkeyColumns { 517 types[i] = info.request.columns[col].TypeInfo 518 } 519 520 routingKeyInfo := &routingKeyInfo{ 521 indexes: info.request.pkeyColumns, 522 types: types, 523 } 524 525 inflight.value = routingKeyInfo 526 return routingKeyInfo, nil 527 } 528 529 // get the table metadata 530 table := info.request.columns[0].Table 531 532 var keyspaceMetadata *KeyspaceMetadata 533 keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace) 534 if inflight.err != nil { 535 // don't cache this error 536 s.routingKeyInfoCache.Remove(stmt) 537 return nil, inflight.err 538 } 539 540 tableMetadata, found := keyspaceMetadata.Tables[table] 541 if !found { 542 // unlikely that the statement could be prepared and the metadata for 543 // the table couldn't be found, but this may indicate either a bug 544 // in the metadata code, or that the table was just dropped. 545 inflight.err = ErrNoMetadata 546 // don't cache this error 547 s.routingKeyInfoCache.Remove(stmt) 548 return nil, inflight.err 549 } 550 551 partitionKey = tableMetadata.PartitionKey 552 553 size := len(partitionKey) 554 routingKeyInfo := &routingKeyInfo{ 555 indexes: make([]int, size), 556 types: make([]TypeInfo, size), 557 } 558 559 for keyIndex, keyColumn := range partitionKey { 560 // set an indicator for checking if the mapping is missing 561 routingKeyInfo.indexes[keyIndex] = -1 562 563 // find the column in the query info 564 for argIndex, boundColumn := range info.request.columns { 565 if keyColumn.Name == boundColumn.Name { 566 // there may be many such bound columns, pick the first 567 routingKeyInfo.indexes[keyIndex] = argIndex 568 routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo 569 break 570 } 571 } 572 573 if routingKeyInfo.indexes[keyIndex] == -1 { 574 // missing a routing key column mapping 575 // no routing key, and no error 576 return nil, nil 577 } 578 } 579 580 // cache this result 581 inflight.value = routingKeyInfo 582 583 return routingKeyInfo, nil 584} 585 586func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter { 587 return conn.executeBatch(ctx, b) 588} 589 590func (s *Session) executeBatch(batch *Batch) *Iter { 591 // fail fast 592 if s.Closed() { 593 return &Iter{err: ErrSessionClosed} 594 } 595 596 // Prevent the execution of the batch if greater than the limit 597 // Currently batches have a limit of 65536 queries. 598 // https://datastax-oss.atlassian.net/browse/JAVA-229 599 if batch.Size() > BatchSizeMaximum { 600 return &Iter{err: ErrTooManyStmts} 601 } 602 603 iter, err := s.executor.executeQuery(batch) 604 if err != nil { 605 return &Iter{err: err} 606 } 607 608 return iter 609} 610 611// ExecuteBatch executes a batch operation and returns nil if successful 612// otherwise an error is returned describing the failure. 613func (s *Session) ExecuteBatch(batch *Batch) error { 614 iter := s.executeBatch(batch) 615 return iter.Close() 616} 617 618// ExecuteBatchCAS executes a batch operation and returns true if successful and 619// an iterator (to scan aditional rows if more than one conditional statement) 620// was sent. 621// Further scans on the interator must also remember to include 622// the applied boolean as the first argument to *Iter.Scan 623func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) { 624 iter = s.executeBatch(batch) 625 if err := iter.checkErrAndNotFound(); err != nil { 626 iter.Close() 627 return false, nil, err 628 } 629 630 if len(iter.Columns()) > 1 { 631 dest = append([]interface{}{&applied}, dest...) 632 iter.Scan(dest...) 633 } else { 634 iter.Scan(&applied) 635 } 636 637 return applied, iter, nil 638} 639 640// MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS, 641// however it accepts a map rather than a list of arguments for the initial 642// scan. 643func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) { 644 iter = s.executeBatch(batch) 645 if err := iter.checkErrAndNotFound(); err != nil { 646 iter.Close() 647 return false, nil, err 648 } 649 iter.MapScan(dest) 650 applied = dest["[applied]"].(bool) 651 delete(dest, "[applied]") 652 653 // we usually close here, but instead of closing, just returin an error 654 // if MapScan failed. Although Close just returns err, using Close 655 // here might be confusing as we are not actually closing the iter 656 return applied, iter, iter.err 657} 658 659type hostMetrics struct { 660 Attempts int 661 TotalLatency int64 662} 663 664type queryMetrics struct { 665 l sync.RWMutex 666 m map[string]*hostMetrics 667} 668 669// Query represents a CQL statement that can be executed. 670type Query struct { 671 stmt string 672 values []interface{} 673 cons Consistency 674 pageSize int 675 routingKey []byte 676 routingKeyBuffer []byte 677 pageState []byte 678 prefetch float64 679 trace Tracer 680 observer QueryObserver 681 session *Session 682 rt RetryPolicy 683 spec SpeculativeExecutionPolicy 684 binding func(q *QueryInfo) ([]interface{}, error) 685 serialCons SerialConsistency 686 defaultTimestamp bool 687 defaultTimestampValue int64 688 disableSkipMetadata bool 689 context context.Context 690 idempotent bool 691 customPayload map[string][]byte 692 metrics *queryMetrics 693 694 disableAutoPage bool 695} 696 697func (q *Query) defaultsFromSession() { 698 s := q.session 699 700 s.mu.RLock() 701 q.cons = s.cons 702 q.pageSize = s.pageSize 703 q.trace = s.trace 704 q.observer = s.queryObserver 705 q.prefetch = s.prefetch 706 q.rt = s.cfg.RetryPolicy 707 q.serialCons = s.cfg.SerialConsistency 708 q.defaultTimestamp = s.cfg.DefaultTimestamp 709 q.idempotent = s.cfg.DefaultIdempotence 710 q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} 711 712 q.spec = &NonSpeculativeExecution{} 713 s.mu.RUnlock() 714} 715 716func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics { 717 q.metrics.l.Lock() 718 metrics, exists := q.metrics.m[host.ConnectAddress().String()] 719 if !exists { 720 // if the host is not in the map, it means it's been accessed for the first time 721 metrics = &hostMetrics{} 722 q.metrics.m[host.ConnectAddress().String()] = metrics 723 } 724 q.metrics.l.Unlock() 725 726 return metrics 727} 728 729// Statement returns the statement that was used to generate this query. 730func (q Query) Statement() string { 731 return q.stmt 732} 733 734// String implements the stringer interface. 735func (q Query) String() string { 736 return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons) 737} 738 739//Attempts returns the number of times the query was executed. 740func (q *Query) Attempts() int { 741 q.metrics.l.Lock() 742 var attempts int 743 for _, metric := range q.metrics.m { 744 attempts += metric.Attempts 745 } 746 q.metrics.l.Unlock() 747 return attempts 748} 749 750func (q *Query) AddAttempts(i int, host *HostInfo) { 751 hostMetric := q.getHostMetrics(host) 752 q.metrics.l.Lock() 753 hostMetric.Attempts += i 754 q.metrics.l.Unlock() 755} 756 757//Latency returns the average amount of nanoseconds per attempt of the query. 758func (q *Query) Latency() int64 { 759 q.metrics.l.Lock() 760 var attempts int 761 var latency int64 762 for _, metric := range q.metrics.m { 763 attempts += metric.Attempts 764 latency += metric.TotalLatency 765 } 766 q.metrics.l.Unlock() 767 if attempts > 0 { 768 return latency / int64(attempts) 769 } 770 return 0 771} 772 773func (q *Query) AddLatency(l int64, host *HostInfo) { 774 hostMetric := q.getHostMetrics(host) 775 q.metrics.l.Lock() 776 hostMetric.TotalLatency += l 777 q.metrics.l.Unlock() 778} 779 780// Consistency sets the consistency level for this query. If no consistency 781// level have been set, the default consistency level of the cluster 782// is used. 783func (q *Query) Consistency(c Consistency) *Query { 784 q.cons = c 785 return q 786} 787 788// GetConsistency returns the currently configured consistency level for 789// the query. 790func (q *Query) GetConsistency() Consistency { 791 return q.cons 792} 793 794// Same as Consistency but without a return value 795func (q *Query) SetConsistency(c Consistency) { 796 q.cons = c 797} 798 799// CustomPayload sets the custom payload level for this query. 800func (q *Query) CustomPayload(customPayload map[string][]byte) *Query { 801 q.customPayload = customPayload 802 return q 803} 804 805func (q *Query) Context() context.Context { 806 if q.context == nil { 807 return context.Background() 808 } 809 return q.context 810} 811 812// Trace enables tracing of this query. Look at the documentation of the 813// Tracer interface to learn more about tracing. 814func (q *Query) Trace(trace Tracer) *Query { 815 q.trace = trace 816 return q 817} 818 819// Observer enables query-level observer on this query. 820// The provided observer will be called every time this query is executed. 821func (q *Query) Observer(observer QueryObserver) *Query { 822 q.observer = observer 823 return q 824} 825 826// PageSize will tell the iterator to fetch the result in pages of size n. 827// This is useful for iterating over large result sets, but setting the 828// page size too low might decrease the performance. This feature is only 829// available in Cassandra 2 and onwards. 830func (q *Query) PageSize(n int) *Query { 831 q.pageSize = n 832 return q 833} 834 835// DefaultTimestamp will enable the with default timestamp flag on the query. 836// If enable, this will replace the server side assigned 837// timestamp as default timestamp. Note that a timestamp in the query itself 838// will still override this timestamp. This is entirely optional. 839// 840// Only available on protocol >= 3 841func (q *Query) DefaultTimestamp(enable bool) *Query { 842 q.defaultTimestamp = enable 843 return q 844} 845 846// WithTimestamp will enable the with default timestamp flag on the query 847// like DefaultTimestamp does. But also allows to define value for timestamp. 848// It works the same way as USING TIMESTAMP in the query itself, but 849// should not break prepared query optimization 850// 851// Only available on protocol >= 3 852func (q *Query) WithTimestamp(timestamp int64) *Query { 853 q.DefaultTimestamp(true) 854 q.defaultTimestampValue = timestamp 855 return q 856} 857 858// RoutingKey sets the routing key to use when a token aware connection 859// pool is used to optimize the routing of this query. 860func (q *Query) RoutingKey(routingKey []byte) *Query { 861 q.routingKey = routingKey 862 return q 863} 864 865func (q *Query) withContext(ctx context.Context) ExecutableQuery { 866 // I really wish go had covariant types 867 return q.WithContext(ctx) 868} 869 870// WithContext returns a shallow copy of q with its context 871// set to ctx. 872// 873// The provided context controls the entire lifetime of executing a 874// query, queries will be canceled and return once the context is 875// canceled. 876func (q *Query) WithContext(ctx context.Context) *Query { 877 q2 := *q 878 q2.context = ctx 879 return &q2 880} 881 882// Deprecate: does nothing, cancel the context passed to WithContext 883func (q *Query) Cancel() { 884 // TODO: delete 885} 886 887func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { 888 return conn.executeQuery(ctx, q) 889} 890 891func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { 892 q.AddAttempts(1, host) 893 q.AddLatency(end.Sub(start).Nanoseconds(), host) 894 895 if q.observer != nil { 896 q.observer.ObserveQuery(q.Context(), ObservedQuery{ 897 Keyspace: keyspace, 898 Statement: q.stmt, 899 Start: start, 900 End: end, 901 Rows: iter.numRows, 902 Host: host, 903 Metrics: q.getHostMetrics(host), 904 Err: iter.err, 905 }) 906 } 907} 908 909func (q *Query) retryPolicy() RetryPolicy { 910 return q.rt 911} 912 913// Keyspace returns the keyspace the query will be executed against. 914func (q *Query) Keyspace() string { 915 if q.session == nil { 916 return "" 917 } 918 // TODO(chbannis): this should be parsed from the query or we should let 919 // this be set by users. 920 return q.session.cfg.Keyspace 921} 922 923// GetRoutingKey gets the routing key to use for routing this query. If 924// a routing key has not been explicitly set, then the routing key will 925// be constructed if possible using the keyspace's schema and the query 926// info for this query statement. If the routing key cannot be determined 927// then nil will be returned with no error. On any error condition, 928// an error description will be returned. 929func (q *Query) GetRoutingKey() ([]byte, error) { 930 if q.routingKey != nil { 931 return q.routingKey, nil 932 } else if q.binding != nil && len(q.values) == 0 { 933 // If this query was created using session.Bind we wont have the query 934 // values yet, so we have to pass down to the next policy. 935 // TODO: Remove this and handle this case 936 return nil, nil 937 } 938 939 // try to determine the routing key 940 routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt) 941 if err != nil { 942 return nil, err 943 } 944 945 if routingKeyInfo == nil { 946 return nil, nil 947 } 948 949 if len(routingKeyInfo.indexes) == 1 { 950 // single column routing key 951 routingKey, err := Marshal( 952 routingKeyInfo.types[0], 953 q.values[routingKeyInfo.indexes[0]], 954 ) 955 if err != nil { 956 return nil, err 957 } 958 return routingKey, nil 959 } 960 961 // We allocate that buffer only once, so that further re-bind/exec of the 962 // same query don't allocate more memory. 963 if q.routingKeyBuffer == nil { 964 q.routingKeyBuffer = make([]byte, 0, 256) 965 } 966 967 // composite routing key 968 buf := bytes.NewBuffer(q.routingKeyBuffer) 969 for i := range routingKeyInfo.indexes { 970 encoded, err := Marshal( 971 routingKeyInfo.types[i], 972 q.values[routingKeyInfo.indexes[i]], 973 ) 974 if err != nil { 975 return nil, err 976 } 977 lenBuf := []byte{0x00, 0x00} 978 binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) 979 buf.Write(lenBuf) 980 buf.Write(encoded) 981 buf.WriteByte(0x00) 982 } 983 routingKey := buf.Bytes() 984 return routingKey, nil 985} 986 987func (q *Query) shouldPrepare() bool { 988 989 stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool { 990 return unicode.IsSpace(r) || r == ';' 991 }), unicode.IsSpace) 992 993 var stmtType string 994 if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 { 995 stmtType = strings.ToLower(stmt[:n]) 996 } 997 if stmtType == "begin" { 998 if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 { 999 stmtType = strings.ToLower(stmt[n+1:]) 1000 } 1001 } 1002 switch stmtType { 1003 case "select", "insert", "update", "delete", "batch": 1004 return true 1005 } 1006 return false 1007} 1008 1009// SetPrefetch sets the default threshold for pre-fetching new pages. If 1010// there are only p*pageSize rows remaining, the next page will be requested 1011// automatically. 1012func (q *Query) Prefetch(p float64) *Query { 1013 q.prefetch = p 1014 return q 1015} 1016 1017// RetryPolicy sets the policy to use when retrying the query. 1018func (q *Query) RetryPolicy(r RetryPolicy) *Query { 1019 q.rt = r 1020 return q 1021} 1022 1023// SetSpeculativeExecutionPolicy sets the execution policy 1024func (q *Query) SetSpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Query { 1025 q.spec = sp 1026 return q 1027} 1028 1029// speculativeExecutionPolicy fetches the policy 1030func (q *Query) speculativeExecutionPolicy() SpeculativeExecutionPolicy { 1031 return q.spec 1032} 1033 1034func (q *Query) IsIdempotent() bool { 1035 return q.idempotent 1036} 1037 1038// Idempotent marks the query as being idempotent or not depending on 1039// the value. 1040func (q *Query) Idempotent(value bool) *Query { 1041 q.idempotent = value 1042 return q 1043} 1044 1045// Bind sets query arguments of query. This can also be used to rebind new query arguments 1046// to an existing query instance. 1047func (q *Query) Bind(v ...interface{}) *Query { 1048 q.values = v 1049 return q 1050} 1051 1052// SerialConsistency sets the consistency level for the 1053// serial phase of conditional updates. That consistency can only be 1054// either SERIAL or LOCAL_SERIAL and if not present, it defaults to 1055// SERIAL. This option will be ignored for anything else that a 1056// conditional update/insert. 1057func (q *Query) SerialConsistency(cons SerialConsistency) *Query { 1058 q.serialCons = cons 1059 return q 1060} 1061 1062// PageState sets the paging state for the query to resume paging from a specific 1063// point in time. Setting this will disable to query paging for this query, and 1064// must be used for all subsequent pages. 1065func (q *Query) PageState(state []byte) *Query { 1066 q.pageState = state 1067 q.disableAutoPage = true 1068 return q 1069} 1070 1071// NoSkipMetadata will override the internal result metadata cache so that the driver does not 1072// send skip_metadata for queries, this means that the result will always contain 1073// the metadata to parse the rows and will not reuse the metadata from the prepared 1074// staement. This should only be used to work around cassandra bugs, such as when using 1075// CAS operations which do not end in Cas. 1076// 1077// See https://issues.apache.org/jira/browse/CASSANDRA-11099 1078// https://github.com/gocql/gocql/issues/612 1079func (q *Query) NoSkipMetadata() *Query { 1080 q.disableSkipMetadata = true 1081 return q 1082} 1083 1084// Exec executes the query without returning any rows. 1085func (q *Query) Exec() error { 1086 return q.Iter().Close() 1087} 1088 1089func isUseStatement(stmt string) bool { 1090 if len(stmt) < 3 { 1091 return false 1092 } 1093 1094 return strings.EqualFold(stmt[0:3], "use") 1095} 1096 1097// Iter executes the query and returns an iterator capable of iterating 1098// over all results. 1099func (q *Query) Iter() *Iter { 1100 if isUseStatement(q.stmt) { 1101 return &Iter{err: ErrUseStmt} 1102 } 1103 return q.session.executeQuery(q) 1104} 1105 1106// MapScan executes the query, copies the columns of the first selected 1107// row into the map pointed at by m and discards the rest. If no rows 1108// were selected, ErrNotFound is returned. 1109func (q *Query) MapScan(m map[string]interface{}) error { 1110 iter := q.Iter() 1111 if err := iter.checkErrAndNotFound(); err != nil { 1112 return err 1113 } 1114 iter.MapScan(m) 1115 return iter.Close() 1116} 1117 1118// Scan executes the query, copies the columns of the first selected 1119// row into the values pointed at by dest and discards the rest. If no rows 1120// were selected, ErrNotFound is returned. 1121func (q *Query) Scan(dest ...interface{}) error { 1122 iter := q.Iter() 1123 if err := iter.checkErrAndNotFound(); err != nil { 1124 return err 1125 } 1126 iter.Scan(dest...) 1127 return iter.Close() 1128} 1129 1130// ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT 1131// statement containing an IF clause). If the transaction fails because 1132// the existing values did not match, the previous values will be stored 1133// in dest. 1134func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) { 1135 q.disableSkipMetadata = true 1136 iter := q.Iter() 1137 if err := iter.checkErrAndNotFound(); err != nil { 1138 return false, err 1139 } 1140 if len(iter.Columns()) > 1 { 1141 dest = append([]interface{}{&applied}, dest...) 1142 iter.Scan(dest...) 1143 } else { 1144 iter.Scan(&applied) 1145 } 1146 return applied, iter.Close() 1147} 1148 1149// MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT 1150// statement containing an IF clause). If the transaction fails because 1151// the existing values did not match, the previous values will be stored 1152// in dest map. 1153// 1154// As for INSERT .. IF NOT EXISTS, previous values will be returned as if 1155// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to 1156// column mismatching. MapScanCAS is added to capture them safely. 1157func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) { 1158 q.disableSkipMetadata = true 1159 iter := q.Iter() 1160 if err := iter.checkErrAndNotFound(); err != nil { 1161 return false, err 1162 } 1163 iter.MapScan(dest) 1164 applied = dest["[applied]"].(bool) 1165 delete(dest, "[applied]") 1166 1167 return applied, iter.Close() 1168} 1169 1170// Release releases a query back into a pool of queries. Released Queries 1171// cannot be reused. 1172// 1173// Example: 1174// qry := session.Query("SELECT * FROM my_table") 1175// qry.Exec() 1176// qry.Release() 1177func (q *Query) Release() { 1178 q.reset() 1179 queryPool.Put(q) 1180} 1181 1182// reset zeroes out all fields of a query so that it can be safely pooled. 1183func (q *Query) reset() { 1184 *q = Query{} 1185} 1186 1187// Iter represents an iterator that can be used to iterate over all rows that 1188// were returned by a query. The iterator might send additional queries to the 1189// database during the iteration if paging was enabled. 1190type Iter struct { 1191 err error 1192 pos int 1193 meta resultMetadata 1194 numRows int 1195 next *nextIter 1196 host *HostInfo 1197 1198 framer *framer 1199 closed int32 1200} 1201 1202// Host returns the host which the query was sent to. 1203func (iter *Iter) Host() *HostInfo { 1204 return iter.host 1205} 1206 1207// Columns returns the name and type of the selected columns. 1208func (iter *Iter) Columns() []ColumnInfo { 1209 return iter.meta.columns 1210} 1211 1212type Scanner interface { 1213 // Next advances the row pointer to point at the next row, the row is valid until 1214 // the next call of Next. It returns true if there is a row which is available to be 1215 // scanned into with Scan. 1216 // Next must be called before every call to Scan. 1217 Next() bool 1218 1219 // Scan copies the current row's columns into dest. If the length of dest does not equal 1220 // the number of columns returned in the row an error is returned. If an error is encountered 1221 // when unmarshalling a column into the value in dest an error is returned and the row is invalidated 1222 // until the next call to Next. 1223 // Next must be called before calling Scan, if it is not an error is returned. 1224 Scan(...interface{}) error 1225 1226 // Err returns the if there was one during iteration that resulted in iteration being unable to complete. 1227 // Err will also release resources held by the iterator, the Scanner should not used after being called. 1228 Err() error 1229} 1230 1231type iterScanner struct { 1232 iter *Iter 1233 cols [][]byte 1234 valid bool 1235} 1236 1237func (is *iterScanner) Next() bool { 1238 iter := is.iter 1239 if iter.err != nil { 1240 return false 1241 } 1242 1243 if iter.pos >= iter.numRows { 1244 if iter.next != nil { 1245 is.iter = iter.next.fetch() 1246 return is.Next() 1247 } 1248 return false 1249 } 1250 1251 for i := 0; i < len(is.cols); i++ { 1252 col, err := iter.readColumn() 1253 if err != nil { 1254 iter.err = err 1255 return false 1256 } 1257 is.cols[i] = col 1258 } 1259 iter.pos++ 1260 is.valid = true 1261 1262 return true 1263} 1264 1265func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) { 1266 if dest[0] == nil { 1267 return 1, nil 1268 } 1269 1270 if col.TypeInfo.Type() == TypeTuple { 1271 // this will panic, actually a bug, please report 1272 tuple := col.TypeInfo.(TupleTypeInfo) 1273 1274 count := len(tuple.Elems) 1275 // here we pass in a slice of the struct which has the number number of 1276 // values as elements in the tuple 1277 if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil { 1278 return 0, err 1279 } 1280 return count, nil 1281 } else { 1282 if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil { 1283 return 0, err 1284 } 1285 return 1, nil 1286 } 1287} 1288 1289func (is *iterScanner) Scan(dest ...interface{}) error { 1290 if !is.valid { 1291 return errors.New("gocql: Scan called without calling Next") 1292 } 1293 1294 iter := is.iter 1295 // currently only support scanning into an expand tuple, such that its the same 1296 // as scanning in more values from a single column 1297 if len(dest) != iter.meta.actualColCount { 1298 return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount) 1299 } 1300 1301 // i is the current position in dest, could posible replace it and just use 1302 // slices of dest 1303 i := 0 1304 var err error 1305 for _, col := range iter.meta.columns { 1306 var n int 1307 n, err = scanColumn(is.cols[i], col, dest[i:]) 1308 if err != nil { 1309 break 1310 } 1311 i += n 1312 } 1313 1314 is.valid = false 1315 return err 1316} 1317 1318func (is *iterScanner) Err() error { 1319 iter := is.iter 1320 is.iter = nil 1321 is.cols = nil 1322 is.valid = false 1323 return iter.Close() 1324} 1325 1326// Scanner returns a row Scanner which provides an interface to scan rows in a manner which is 1327// similar to database/sql. The iter should NOT be used again after calling this method. 1328func (iter *Iter) Scanner() Scanner { 1329 if iter == nil { 1330 return nil 1331 } 1332 1333 return &iterScanner{iter: iter, cols: make([][]byte, len(iter.meta.columns))} 1334} 1335 1336func (iter *Iter) readColumn() ([]byte, error) { 1337 return iter.framer.readBytesInternal() 1338} 1339 1340// Scan consumes the next row of the iterator and copies the columns of the 1341// current row into the values pointed at by dest. Use nil as a dest value 1342// to skip the corresponding column. Scan might send additional queries 1343// to the database to retrieve the next set of rows if paging was enabled. 1344// 1345// Scan returns true if the row was successfully unmarshaled or false if the 1346// end of the result set was reached or if an error occurred. Close should 1347// be called afterwards to retrieve any potential errors. 1348func (iter *Iter) Scan(dest ...interface{}) bool { 1349 if iter.err != nil { 1350 return false 1351 } 1352 1353 if iter.pos >= iter.numRows { 1354 if iter.next != nil { 1355 *iter = *iter.next.fetch() 1356 return iter.Scan(dest...) 1357 } 1358 return false 1359 } 1360 1361 if iter.next != nil && iter.pos >= iter.next.pos { 1362 go iter.next.fetch() 1363 } 1364 1365 // currently only support scanning into an expand tuple, such that its the same 1366 // as scanning in more values from a single column 1367 if len(dest) != iter.meta.actualColCount { 1368 iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount) 1369 return false 1370 } 1371 1372 // i is the current position in dest, could posible replace it and just use 1373 // slices of dest 1374 i := 0 1375 for _, col := range iter.meta.columns { 1376 colBytes, err := iter.readColumn() 1377 if err != nil { 1378 iter.err = err 1379 return false 1380 } 1381 1382 n, err := scanColumn(colBytes, col, dest[i:]) 1383 if err != nil { 1384 iter.err = err 1385 return false 1386 } 1387 i += n 1388 } 1389 1390 iter.pos++ 1391 return true 1392} 1393 1394// GetCustomPayload returns any parsed custom payload results if given in the 1395// response from Cassandra. Note that the result is not a copy. 1396// 1397// This additional feature of CQL Protocol v4 1398// allows additional results and query information to be returned by 1399// custom QueryHandlers running in your C* cluster. 1400// See https://datastax.github.io/java-driver/manual/custom_payloads/ 1401func (iter *Iter) GetCustomPayload() map[string][]byte { 1402 return iter.framer.customPayload 1403} 1404 1405// Warnings returns any warnings generated if given in the response from Cassandra. 1406// 1407// This is only available starting with CQL Protocol v4. 1408func (iter *Iter) Warnings() []string { 1409 if iter.framer != nil { 1410 return iter.framer.header.warnings 1411 } 1412 return nil 1413} 1414 1415// Close closes the iterator and returns any errors that happened during 1416// the query or the iteration. 1417func (iter *Iter) Close() error { 1418 if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) { 1419 if iter.framer != nil { 1420 iter.framer = nil 1421 } 1422 } 1423 1424 return iter.err 1425} 1426 1427// WillSwitchPage detects if iterator reached end of current page 1428// and the next page is available. 1429func (iter *Iter) WillSwitchPage() bool { 1430 return iter.pos >= iter.numRows && iter.next != nil 1431} 1432 1433// checkErrAndNotFound handle error and NotFound in one method. 1434func (iter *Iter) checkErrAndNotFound() error { 1435 if iter.err != nil { 1436 return iter.err 1437 } else if iter.numRows == 0 { 1438 return ErrNotFound 1439 } 1440 return nil 1441} 1442 1443// PageState return the current paging state for a query which can be used for 1444// subsequent queries to resume paging this point. 1445func (iter *Iter) PageState() []byte { 1446 return iter.meta.pagingState 1447} 1448 1449// NumRows returns the number of rows in this pagination, it will update when new 1450// pages are fetched, it is not the value of the total number of rows this iter 1451// will return unless there is only a single page returned. 1452func (iter *Iter) NumRows() int { 1453 return iter.numRows 1454} 1455 1456type nextIter struct { 1457 qry *Query 1458 pos int 1459 once sync.Once 1460 next *Iter 1461} 1462 1463func (n *nextIter) fetch() *Iter { 1464 n.once.Do(func() { 1465 n.next = n.qry.session.executeQuery(n.qry) 1466 }) 1467 return n.next 1468} 1469 1470type Batch struct { 1471 Type BatchType 1472 Entries []BatchEntry 1473 Cons Consistency 1474 CustomPayload map[string][]byte 1475 rt RetryPolicy 1476 spec SpeculativeExecutionPolicy 1477 observer BatchObserver 1478 serialCons SerialConsistency 1479 defaultTimestamp bool 1480 defaultTimestampValue int64 1481 context context.Context 1482 cancelBatch func() 1483 keyspace string 1484 metrics *queryMetrics 1485} 1486 1487// NewBatch creates a new batch operation without defaults from the cluster 1488// 1489// Deprecated: use session.NewBatch instead 1490func NewBatch(typ BatchType) *Batch { 1491 return &Batch{ 1492 Type: typ, 1493 metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, 1494 spec: &NonSpeculativeExecution{}, 1495 } 1496} 1497 1498// NewBatch creates a new batch operation using defaults defined in the cluster 1499func (s *Session) NewBatch(typ BatchType) *Batch { 1500 s.mu.RLock() 1501 batch := &Batch{ 1502 Type: typ, 1503 rt: s.cfg.RetryPolicy, 1504 serialCons: s.cfg.SerialConsistency, 1505 observer: s.batchObserver, 1506 Cons: s.cons, 1507 defaultTimestamp: s.cfg.DefaultTimestamp, 1508 keyspace: s.cfg.Keyspace, 1509 metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, 1510 spec: &NonSpeculativeExecution{}, 1511 } 1512 1513 s.mu.RUnlock() 1514 return batch 1515} 1516 1517func (b *Batch) getHostMetrics(host *HostInfo) *hostMetrics { 1518 b.metrics.l.Lock() 1519 metrics, exists := b.metrics.m[host.ConnectAddress().String()] 1520 if !exists { 1521 // if the host is not in the map, it means it's been accessed for the first time 1522 metrics = &hostMetrics{} 1523 b.metrics.m[host.ConnectAddress().String()] = metrics 1524 } 1525 b.metrics.l.Unlock() 1526 1527 return metrics 1528} 1529 1530// Observer enables batch-level observer on this batch. 1531// The provided observer will be called every time this batched query is executed. 1532func (b *Batch) Observer(observer BatchObserver) *Batch { 1533 b.observer = observer 1534 return b 1535} 1536 1537func (b *Batch) Keyspace() string { 1538 return b.keyspace 1539} 1540 1541// Attempts returns the number of attempts made to execute the batch. 1542func (b *Batch) Attempts() int { 1543 b.metrics.l.Lock() 1544 defer b.metrics.l.Unlock() 1545 1546 var attempts int 1547 for _, metric := range b.metrics.m { 1548 attempts += metric.Attempts 1549 } 1550 return attempts 1551} 1552 1553func (b *Batch) AddAttempts(i int, host *HostInfo) { 1554 hostMetric := b.getHostMetrics(host) 1555 b.metrics.l.Lock() 1556 hostMetric.Attempts += i 1557 b.metrics.l.Unlock() 1558} 1559 1560//Latency returns the average number of nanoseconds to execute a single attempt of the batch. 1561func (b *Batch) Latency() int64 { 1562 b.metrics.l.Lock() 1563 defer b.metrics.l.Unlock() 1564 1565 var ( 1566 attempts int 1567 latency int64 1568 ) 1569 for _, metric := range b.metrics.m { 1570 attempts += metric.Attempts 1571 latency += metric.TotalLatency 1572 } 1573 if attempts > 0 { 1574 return latency / int64(attempts) 1575 } 1576 return 0 1577} 1578 1579func (b *Batch) AddLatency(l int64, host *HostInfo) { 1580 hostMetric := b.getHostMetrics(host) 1581 b.metrics.l.Lock() 1582 hostMetric.TotalLatency += l 1583 b.metrics.l.Unlock() 1584} 1585 1586// GetConsistency returns the currently configured consistency level for the batch 1587// operation. 1588func (b *Batch) GetConsistency() Consistency { 1589 return b.Cons 1590} 1591 1592// SetConsistency sets the currently configured consistency level for the batch 1593// operation. 1594func (b *Batch) SetConsistency(c Consistency) { 1595 b.Cons = c 1596} 1597 1598func (b *Batch) Context() context.Context { 1599 if b.context == nil { 1600 return context.Background() 1601 } 1602 return b.context 1603} 1604 1605func (b *Batch) IsIdempotent() bool { 1606 for _, entry := range b.Entries { 1607 if !entry.Idempotent { 1608 return false 1609 } 1610 } 1611 return true 1612} 1613 1614func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy { 1615 return b.spec 1616} 1617 1618func (b *Batch) SpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Batch { 1619 b.spec = sp 1620 return b 1621} 1622 1623// Query adds the query to the batch operation 1624func (b *Batch) Query(stmt string, args ...interface{}) { 1625 b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args}) 1626} 1627 1628// Bind adds the query to the batch operation and correlates it with a binding callback 1629// that will be invoked when the batch is executed. The binding callback allows the application 1630// to define which query argument values will be marshalled as part of the batch execution. 1631func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) { 1632 b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind}) 1633} 1634 1635func (b *Batch) retryPolicy() RetryPolicy { 1636 return b.rt 1637} 1638 1639// RetryPolicy sets the retry policy to use when executing the batch operation 1640func (b *Batch) RetryPolicy(r RetryPolicy) *Batch { 1641 b.rt = r 1642 return b 1643} 1644 1645func (b *Batch) withContext(ctx context.Context) ExecutableQuery { 1646 return b.WithContext(ctx) 1647} 1648 1649// WithContext returns a shallow copy of b with its context 1650// set to ctx. 1651// 1652// The provided context controls the entire lifetime of executing a 1653// query, queries will be canceled and return once the context is 1654// canceled. 1655func (b *Batch) WithContext(ctx context.Context) *Batch { 1656 b2 := *b 1657 b2.context = ctx 1658 return &b2 1659} 1660 1661// Deprecate: does nothing, cancel the context passed to WithContext 1662func (*Batch) Cancel() { 1663 // TODO: delete 1664} 1665 1666// Size returns the number of batch statements to be executed by the batch operation. 1667func (b *Batch) Size() int { 1668 return len(b.Entries) 1669} 1670 1671// SerialConsistency sets the consistency level for the 1672// serial phase of conditional updates. That consistency can only be 1673// either SERIAL or LOCAL_SERIAL and if not present, it defaults to 1674// SERIAL. This option will be ignored for anything else that a 1675// conditional update/insert. 1676// 1677// Only available for protocol 3 and above 1678func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch { 1679 b.serialCons = cons 1680 return b 1681} 1682 1683// DefaultTimestamp will enable the with default timestamp flag on the query. 1684// If enable, this will replace the server side assigned 1685// timestamp as default timestamp. Note that a timestamp in the query itself 1686// will still override this timestamp. This is entirely optional. 1687// 1688// Only available on protocol >= 3 1689func (b *Batch) DefaultTimestamp(enable bool) *Batch { 1690 b.defaultTimestamp = enable 1691 return b 1692} 1693 1694// WithTimestamp will enable the with default timestamp flag on the query 1695// like DefaultTimestamp does. But also allows to define value for timestamp. 1696// It works the same way as USING TIMESTAMP in the query itself, but 1697// should not break prepared query optimization 1698// 1699// Only available on protocol >= 3 1700func (b *Batch) WithTimestamp(timestamp int64) *Batch { 1701 b.DefaultTimestamp(true) 1702 b.defaultTimestampValue = timestamp 1703 return b 1704} 1705 1706func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { 1707 b.AddAttempts(1, host) 1708 b.AddLatency(end.Sub(start).Nanoseconds(), host) 1709 1710 if b.observer == nil { 1711 return 1712 } 1713 1714 statements := make([]string, len(b.Entries)) 1715 for i, entry := range b.Entries { 1716 statements[i] = entry.Stmt 1717 } 1718 1719 b.observer.ObserveBatch(b.Context(), ObservedBatch{ 1720 Keyspace: keyspace, 1721 Statements: statements, 1722 Start: start, 1723 End: end, 1724 // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS 1725 Host: host, 1726 Metrics: b.getHostMetrics(host), 1727 Err: iter.err, 1728 }) 1729} 1730 1731func (b *Batch) GetRoutingKey() ([]byte, error) { 1732 // TODO: use the first statement in the batch as the routing key? 1733 return nil, nil 1734} 1735 1736type BatchType byte 1737 1738const ( 1739 LoggedBatch BatchType = 0 1740 UnloggedBatch BatchType = 1 1741 CounterBatch BatchType = 2 1742) 1743 1744type BatchEntry struct { 1745 Stmt string 1746 Args []interface{} 1747 Idempotent bool 1748 binding func(q *QueryInfo) ([]interface{}, error) 1749} 1750 1751type ColumnInfo struct { 1752 Keyspace string 1753 Table string 1754 Name string 1755 TypeInfo TypeInfo 1756} 1757 1758func (c ColumnInfo) String() string { 1759 return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo) 1760} 1761 1762// routing key indexes LRU cache 1763type routingKeyInfoLRU struct { 1764 lru *lru.Cache 1765 mu sync.Mutex 1766} 1767 1768type routingKeyInfo struct { 1769 indexes []int 1770 types []TypeInfo 1771} 1772 1773func (r *routingKeyInfo) String() string { 1774 return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types) 1775} 1776 1777func (r *routingKeyInfoLRU) Remove(key string) { 1778 r.mu.Lock() 1779 r.lru.Remove(key) 1780 r.mu.Unlock() 1781} 1782 1783//Max adjusts the maximum size of the cache and cleans up the oldest records if 1784//the new max is lower than the previous value. Not concurrency safe. 1785func (r *routingKeyInfoLRU) Max(max int) { 1786 r.mu.Lock() 1787 for r.lru.Len() > max { 1788 r.lru.RemoveOldest() 1789 } 1790 r.lru.MaxEntries = max 1791 r.mu.Unlock() 1792} 1793 1794type inflightCachedEntry struct { 1795 wg sync.WaitGroup 1796 err error 1797 value interface{} 1798} 1799 1800// Tracer is the interface implemented by query tracers. Tracers have the 1801// ability to obtain a detailed event log of all events that happened during 1802// the execution of a query from Cassandra. Gathering this information might 1803// be essential for debugging and optimizing queries, but this feature should 1804// not be used on production systems with very high load. 1805type Tracer interface { 1806 Trace(traceId []byte) 1807} 1808 1809type traceWriter struct { 1810 session *Session 1811 w io.Writer 1812 mu sync.Mutex 1813} 1814 1815// NewTraceWriter returns a simple Tracer implementation that outputs 1816// the event log in a textual format. 1817func NewTraceWriter(session *Session, w io.Writer) Tracer { 1818 return &traceWriter{session: session, w: w} 1819} 1820 1821func (t *traceWriter) Trace(traceId []byte) { 1822 var ( 1823 coordinator string 1824 duration int 1825 ) 1826 iter := t.session.control.query(`SELECT coordinator, duration 1827 FROM system_traces.sessions 1828 WHERE session_id = ?`, traceId) 1829 1830 iter.Scan(&coordinator, &duration) 1831 if err := iter.Close(); err != nil { 1832 t.mu.Lock() 1833 fmt.Fprintln(t.w, "Error:", err) 1834 t.mu.Unlock() 1835 return 1836 } 1837 1838 var ( 1839 timestamp time.Time 1840 activity string 1841 source string 1842 elapsed int 1843 ) 1844 1845 t.mu.Lock() 1846 defer t.mu.Unlock() 1847 1848 fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n", 1849 traceId, coordinator, time.Duration(duration)*time.Microsecond) 1850 1851 iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed 1852 FROM system_traces.events 1853 WHERE session_id = ?`, traceId) 1854 1855 for iter.Scan(×tamp, &activity, &source, &elapsed) { 1856 fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n", 1857 timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed) 1858 } 1859 1860 if err := iter.Close(); err != nil { 1861 fmt.Fprintln(t.w, "Error:", err) 1862 } 1863} 1864 1865type ObservedQuery struct { 1866 Keyspace string 1867 Statement string 1868 1869 Start time.Time // time immediately before the query was called 1870 End time.Time // time immediately after the query returned 1871 1872 // Rows is the number of rows in the current iter. 1873 // In paginated queries, rows from previous scans are not counted. 1874 // Rows is not used in batch queries and remains at the default value 1875 Rows int 1876 1877 // Host is the informations about the host that performed the query 1878 Host *HostInfo 1879 1880 // The metrics per this host 1881 Metrics *hostMetrics 1882 1883 // Err is the error in the query. 1884 // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error 1885 Err error 1886} 1887 1888// QueryObserver is the interface implemented by query observers / stat collectors. 1889// 1890// Experimental, this interface and use may change 1891type QueryObserver interface { 1892 // ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled. 1893 // It doesn't get called if there is no query because the session is closed or there are no connections available. 1894 // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil. 1895 ObserveQuery(context.Context, ObservedQuery) 1896} 1897 1898type ObservedBatch struct { 1899 Keyspace string 1900 Statements []string 1901 1902 Start time.Time // time immediately before the batch query was called 1903 End time.Time // time immediately after the batch query returned 1904 1905 // Host is the informations about the host that performed the batch 1906 Host *HostInfo 1907 1908 // Err is the error in the batch query. 1909 // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error 1910 Err error 1911 1912 // The metrics per this host 1913 Metrics *hostMetrics 1914} 1915 1916// BatchObserver is the interface implemented by batch observers / stat collectors. 1917type BatchObserver interface { 1918 // ObserveBatch gets called on every batch query to cassandra. 1919 // It also gets called once for each query in a batch. 1920 // It doesn't get called if there is no query because the session is closed or there are no connections available. 1921 // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil. 1922 // Unlike QueryObserver.ObserveQuery it does no reporting on rows read. 1923 ObserveBatch(context.Context, ObservedBatch) 1924} 1925 1926type ObservedConnect struct { 1927 // Host is the information about the host about to connect 1928 Host *HostInfo 1929 1930 Start time.Time // time immediately before the dial is called 1931 End time.Time // time immediately after the dial returned 1932 1933 // Err is the connection error (if any) 1934 Err error 1935} 1936 1937// ConnectObserver is the interface implemented by connect observers / stat collectors. 1938type ConnectObserver interface { 1939 // ObserveConnect gets called when a new connection to cassandra is made. 1940 ObserveConnect(ObservedConnect) 1941} 1942 1943type Error struct { 1944 Code int 1945 Message string 1946} 1947 1948func (e Error) Error() string { 1949 return e.Message 1950} 1951 1952var ( 1953 ErrNotFound = errors.New("not found") 1954 ErrUnavailable = errors.New("unavailable") 1955 ErrUnsupported = errors.New("feature not supported") 1956 ErrTooManyStmts = errors.New("too many statements") 1957 ErrUseStmt = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.") 1958 ErrSessionClosed = errors.New("session has been closed") 1959 ErrNoConnections = errors.New("gocql: no hosts available in the pool") 1960 ErrNoKeyspace = errors.New("no keyspace provided") 1961 ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist") 1962 ErrNoMetadata = errors.New("no metadata available") 1963) 1964 1965type ErrProtocol struct{ error } 1966 1967func NewErrProtocol(format string, args ...interface{}) error { 1968 return ErrProtocol{fmt.Errorf(format, args...)} 1969} 1970 1971// BatchSizeMaximum is the maximum number of statements a batch operation can have. 1972// This limit is set by cassandra and could change in the future. 1973const BatchSizeMaximum = 65535 1974