1// Copyright (c) 2012 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "bytes" 9 "context" 10 "encoding/binary" 11 "errors" 12 "fmt" 13 "io" 14 "net" 15 "strings" 16 "sync" 17 "sync/atomic" 18 "time" 19 "unicode" 20 21 "github.com/gocql/gocql/internal/lru" 22) 23 24// Session is the interface used by users to interact with the database. 25// 26// It's safe for concurrent use by multiple goroutines and a typical usage 27// scenario is to have one global session object to interact with the 28// whole Cassandra cluster. 29// 30// This type extends the Node interface by adding a convenient query builder 31// and automatically sets a default consistency level on all operations 32// that do not have a consistency level set. 33type Session struct { 34 cons Consistency 35 pageSize int 36 prefetch float64 37 routingKeyInfoCache routingKeyInfoLRU 38 schemaDescriber *schemaDescriber 39 trace Tracer 40 queryObserver QueryObserver 41 batchObserver BatchObserver 42 connectObserver ConnectObserver 43 frameObserver FrameHeaderObserver 44 hostSource *ringDescriber 45 stmtsLRU *preparedLRU 46 47 connCfg *ConnConfig 48 49 executor *queryExecutor 50 pool *policyConnPool 51 policy HostSelectionPolicy 52 53 ring ring 54 metadata clusterMetadata 55 56 mu sync.RWMutex 57 58 control *controlConn 59 60 // event handlers 61 nodeEvents *eventDebouncer 62 schemaEvents *eventDebouncer 63 64 // ring metadata 65 useSystemSchema bool 66 hasAggregatesAndFunctions bool 67 68 cfg ClusterConfig 69 70 ctx context.Context 71 cancel context.CancelFunc 72 73 closeMu sync.RWMutex 74 isClosed bool 75} 76 77var queryPool = &sync.Pool{ 78 New: func() interface{} { 79 return new(Query) 80 }, 81} 82 83func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) { 84 var hosts []*HostInfo 85 for _, hostport := range addrs { 86 resolvedHosts, err := hostInfo(hostport, defaultPort) 87 if err != nil { 88 // Try other hosts if unable to resolve DNS name 89 if _, ok := err.(*net.DNSError); ok { 90 Logger.Printf("gocql: dns error: %v\n", err) 91 continue 92 } 93 return nil, err 94 } 95 96 hosts = append(hosts, resolvedHosts...) 97 } 98 if len(hosts) == 0 { 99 return nil, errors.New("failed to resolve any of the provided hostnames") 100 } 101 return hosts, nil 102} 103 104// NewSession wraps an existing Node. 105func NewSession(cfg ClusterConfig) (*Session, error) { 106 // Check that hosts in the ClusterConfig is not empty 107 if len(cfg.Hosts) < 1 { 108 return nil, ErrNoHosts 109 } 110 111 // Check that either Authenticator is set or AuthProvider, not both 112 if cfg.Authenticator != nil && cfg.AuthProvider != nil { 113 return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") 114 } 115 116 // TODO: we should take a context in here at some point 117 ctx, cancel := context.WithCancel(context.TODO()) 118 119 s := &Session{ 120 cons: cfg.Consistency, 121 prefetch: 0.25, 122 cfg: cfg, 123 pageSize: cfg.PageSize, 124 stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)}, 125 connectObserver: cfg.ConnectObserver, 126 ctx: ctx, 127 cancel: cancel, 128 } 129 130 s.schemaDescriber = newSchemaDescriber(s) 131 132 s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent) 133 s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent) 134 135 s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo) 136 137 s.hostSource = &ringDescriber{session: s} 138 139 if cfg.PoolConfig.HostSelectionPolicy == nil { 140 cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy() 141 } 142 s.pool = cfg.PoolConfig.buildPool(s) 143 144 s.policy = cfg.PoolConfig.HostSelectionPolicy 145 s.policy.Init(s) 146 147 s.executor = &queryExecutor{ 148 pool: s.pool, 149 policy: cfg.PoolConfig.HostSelectionPolicy, 150 } 151 152 s.queryObserver = cfg.QueryObserver 153 s.batchObserver = cfg.BatchObserver 154 s.connectObserver = cfg.ConnectObserver 155 s.frameObserver = cfg.FrameHeaderObserver 156 157 //Check the TLS Config before trying to connect to anything external 158 connCfg, err := connConfig(&s.cfg) 159 if err != nil { 160 //TODO: Return a typed error 161 return nil, fmt.Errorf("gocql: unable to create session: %v", err) 162 } 163 s.connCfg = connCfg 164 165 if err := s.init(); err != nil { 166 s.Close() 167 if err == ErrNoConnectionsStarted { 168 //This error used to be generated inside NewSession & returned directly 169 //Forward it on up to be backwards compatible 170 return nil, ErrNoConnectionsStarted 171 } else { 172 // TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error 173 return nil, fmt.Errorf("gocql: unable to create session: %v", err) 174 } 175 } 176 177 return s, nil 178} 179 180func (s *Session) init() error { 181 hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port) 182 if err != nil { 183 return err 184 } 185 s.ring.endpoints = hosts 186 187 if !s.cfg.disableControlConn { 188 s.control = createControlConn(s) 189 if s.cfg.ProtoVersion == 0 { 190 proto, err := s.control.discoverProtocol(hosts) 191 if err != nil { 192 return fmt.Errorf("unable to discover protocol version: %v", err) 193 } else if proto == 0 { 194 return errors.New("unable to discovery protocol version") 195 } 196 197 // TODO(zariel): we really only need this in 1 place 198 s.cfg.ProtoVersion = proto 199 s.connCfg.ProtoVersion = proto 200 } 201 202 if err := s.control.connect(hosts); err != nil { 203 return err 204 } 205 206 if !s.cfg.DisableInitialHostLookup { 207 var partitioner string 208 newHosts, partitioner, err := s.hostSource.GetHosts() 209 if err != nil { 210 return err 211 } 212 s.policy.SetPartitioner(partitioner) 213 filteredHosts := make([]*HostInfo, 0, len(newHosts)) 214 for _, host := range newHosts { 215 if !s.cfg.filterHost(host) { 216 filteredHosts = append(filteredHosts, host) 217 } 218 } 219 hosts = append(hosts, filteredHosts...) 220 } 221 } 222 223 hostMap := make(map[string]*HostInfo, len(hosts)) 224 for _, host := range hosts { 225 hostMap[host.ConnectAddress().String()] = host 226 } 227 228 hosts = hosts[:0] 229 // each host will increment left and decrement it after connecting and once 230 // there's none left, we'll close hostCh 231 var left int64 232 // we will receive up to len(hostMap) of messages so create a buffer so we 233 // don't end up stuck in a goroutine if we stopped listening 234 connectedCh := make(chan struct{}, len(hostMap)) 235 // we add one here because we don't want to end up closing hostCh until we're 236 // done looping and the decerement code might be reached before we've looped 237 // again 238 atomic.AddInt64(&left, 1) 239 for _, host := range hostMap { 240 host := s.ring.addOrUpdate(host) 241 if s.cfg.filterHost(host) { 242 continue 243 } 244 245 atomic.AddInt64(&left, 1) 246 go func() { 247 s.pool.addHost(host) 248 connectedCh <- struct{}{} 249 250 // if there are no hosts left, then close the hostCh to unblock the loop 251 // below if its still waiting 252 if atomic.AddInt64(&left, -1) == 0 { 253 close(connectedCh) 254 } 255 }() 256 257 hosts = append(hosts, host) 258 } 259 // once we're done looping we subtract the one we initially added and check 260 // to see if we should close 261 if atomic.AddInt64(&left, -1) == 0 { 262 close(connectedCh) 263 } 264 265 // before waiting for them to connect, add them all to the policy so we can 266 // utilize efficiencies by calling AddHosts if the policy supports it 267 type bulkAddHosts interface { 268 AddHosts([]*HostInfo) 269 } 270 if v, ok := s.policy.(bulkAddHosts); ok { 271 v.AddHosts(hosts) 272 } else { 273 for _, host := range hosts { 274 s.policy.AddHost(host) 275 } 276 } 277 278 readyPolicy, _ := s.policy.(ReadyPolicy) 279 // now loop over connectedCh until it's closed (meaning we've connected to all) 280 // or until the policy says we're ready 281 for range connectedCh { 282 if readyPolicy != nil && readyPolicy.Ready() { 283 break 284 } 285 } 286 287 // TODO(zariel): we probably dont need this any more as we verify that we 288 // can connect to one of the endpoints supplied by using the control conn. 289 // See if there are any connections in the pool 290 if s.cfg.ReconnectInterval > 0 { 291 go s.reconnectDownedHosts(s.cfg.ReconnectInterval) 292 } 293 294 // If we disable the initial host lookup, we need to still check if the 295 // cluster is using the newer system schema or not... however, if control 296 // connection is disable, we really have no choice, so we just make our 297 // best guess... 298 if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup { 299 newer, _ := checkSystemSchema(s.control) 300 s.useSystemSchema = newer 301 } else { 302 version := s.ring.rrHost().Version() 303 s.useSystemSchema = version.AtLeast(3, 0, 0) 304 s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0) 305 } 306 307 if s.pool.Size() == 0 { 308 return ErrNoConnectionsStarted 309 } 310 311 // Invoke KeyspaceChanged to let the policy cache the session keyspace 312 // parameters. This is used by tokenAwareHostPolicy to discover replicas. 313 if !s.cfg.disableControlConn && s.cfg.Keyspace != "" { 314 s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace}) 315 } 316 317 return nil 318} 319 320// AwaitSchemaAgreement will wait until schema versions across all nodes in the 321// cluster are the same (as seen from the point of view of the control connection). 322// The maximum amount of time this takes is governed 323// by the MaxWaitSchemaAgreement setting in the configuration (default: 60s). 324// AwaitSchemaAgreement returns an error in case schema versions are not the same 325// after the timeout specified in MaxWaitSchemaAgreement elapses. 326func (s *Session) AwaitSchemaAgreement(ctx context.Context) error { 327 if s.cfg.disableControlConn { 328 return errNoControl 329 } 330 return s.control.withConn(func(conn *Conn) *Iter { 331 return &Iter{err: conn.awaitSchemaAgreement(ctx)} 332 }).err 333} 334 335func (s *Session) reconnectDownedHosts(intv time.Duration) { 336 reconnectTicker := time.NewTicker(intv) 337 defer reconnectTicker.Stop() 338 339 for { 340 select { 341 case <-reconnectTicker.C: 342 hosts := s.ring.allHosts() 343 344 // Print session.ring for debug. 345 if gocqlDebug { 346 buf := bytes.NewBufferString("Session.ring:") 347 for _, h := range hosts { 348 buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") 349 } 350 Logger.Println(buf.String()) 351 } 352 353 for _, h := range hosts { 354 if h.IsUp() { 355 continue 356 } 357 // we let the pool call handleNodeUp to change the host state 358 s.pool.addHost(h) 359 } 360 case <-s.ctx.Done(): 361 return 362 } 363 } 364} 365 366// SetConsistency sets the default consistency level for this session. This 367// setting can also be changed on a per-query basis and the default value 368// is Quorum. 369func (s *Session) SetConsistency(cons Consistency) { 370 s.mu.Lock() 371 s.cons = cons 372 s.mu.Unlock() 373} 374 375// SetPageSize sets the default page size for this session. A value <= 0 will 376// disable paging. This setting can also be changed on a per-query basis. 377func (s *Session) SetPageSize(n int) { 378 s.mu.Lock() 379 s.pageSize = n 380 s.mu.Unlock() 381} 382 383// SetPrefetch sets the default threshold for pre-fetching new pages. If 384// there are only p*pageSize rows remaining, the next page will be requested 385// automatically. This value can also be changed on a per-query basis and 386// the default value is 0.25. 387func (s *Session) SetPrefetch(p float64) { 388 s.mu.Lock() 389 s.prefetch = p 390 s.mu.Unlock() 391} 392 393// SetTrace sets the default tracer for this session. This setting can also 394// be changed on a per-query basis. 395func (s *Session) SetTrace(trace Tracer) { 396 s.mu.Lock() 397 s.trace = trace 398 s.mu.Unlock() 399} 400 401// Query generates a new query object for interacting with the database. 402// Further details of the query may be tweaked using the resulting query 403// value before the query is executed. Query is automatically prepared 404// if it has not previously been executed. 405func (s *Session) Query(stmt string, values ...interface{}) *Query { 406 qry := queryPool.Get().(*Query) 407 qry.session = s 408 qry.stmt = stmt 409 qry.values = values 410 qry.defaultsFromSession() 411 return qry 412} 413 414type QueryInfo struct { 415 Id []byte 416 Args []ColumnInfo 417 Rval []ColumnInfo 418 PKeyColumns []int 419} 420 421// Bind generates a new query object based on the query statement passed in. 422// The query is automatically prepared if it has not previously been executed. 423// The binding callback allows the application to define which query argument 424// values will be marshalled as part of the query execution. 425// During execution, the meta data of the prepared query will be routed to the 426// binding callback, which is responsible for producing the query argument values. 427func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query { 428 qry := queryPool.Get().(*Query) 429 qry.session = s 430 qry.stmt = stmt 431 qry.binding = b 432 qry.defaultsFromSession() 433 return qry 434} 435 436// Close closes all connections. The session is unusable after this 437// operation. 438func (s *Session) Close() { 439 440 s.closeMu.Lock() 441 defer s.closeMu.Unlock() 442 if s.isClosed { 443 return 444 } 445 s.isClosed = true 446 447 if s.pool != nil { 448 s.pool.Close() 449 } 450 451 if s.control != nil { 452 s.control.close() 453 } 454 455 if s.nodeEvents != nil { 456 s.nodeEvents.stop() 457 } 458 459 if s.schemaEvents != nil { 460 s.schemaEvents.stop() 461 } 462 463 if s.cancel != nil { 464 s.cancel() 465 } 466} 467 468func (s *Session) Closed() bool { 469 s.closeMu.RLock() 470 closed := s.isClosed 471 s.closeMu.RUnlock() 472 return closed 473} 474 475func (s *Session) executeQuery(qry *Query) (it *Iter) { 476 // fail fast 477 if s.Closed() { 478 return &Iter{err: ErrSessionClosed} 479 } 480 481 iter, err := s.executor.executeQuery(qry) 482 if err != nil { 483 return &Iter{err: err} 484 } 485 if iter == nil { 486 panic("nil iter") 487 } 488 489 return iter 490} 491 492func (s *Session) removeHost(h *HostInfo) { 493 s.policy.RemoveHost(h) 494 s.pool.removeHost(h.ConnectAddress()) 495 s.ring.removeHost(h.ConnectAddress()) 496} 497 498// KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist. 499func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { 500 // fail fast 501 if s.Closed() { 502 return nil, ErrSessionClosed 503 } else if keyspace == "" { 504 return nil, ErrNoKeyspace 505 } 506 507 return s.schemaDescriber.getSchema(keyspace) 508} 509 510func (s *Session) getConn() *Conn { 511 hosts := s.ring.allHosts() 512 for _, host := range hosts { 513 if !host.IsUp() { 514 continue 515 } 516 517 pool, ok := s.pool.getPool(host) 518 if !ok { 519 continue 520 } else if conn := pool.Pick(); conn != nil { 521 return conn 522 } 523 } 524 525 return nil 526} 527 528// returns routing key indexes and type info 529func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) { 530 s.routingKeyInfoCache.mu.Lock() 531 532 entry, cached := s.routingKeyInfoCache.lru.Get(stmt) 533 if cached { 534 // done accessing the cache 535 s.routingKeyInfoCache.mu.Unlock() 536 // the entry is an inflight struct similar to that used by 537 // Conn to prepare statements 538 inflight := entry.(*inflightCachedEntry) 539 540 // wait for any inflight work 541 inflight.wg.Wait() 542 543 if inflight.err != nil { 544 return nil, inflight.err 545 } 546 547 key, _ := inflight.value.(*routingKeyInfo) 548 549 return key, nil 550 } 551 552 // create a new inflight entry while the data is created 553 inflight := new(inflightCachedEntry) 554 inflight.wg.Add(1) 555 defer inflight.wg.Done() 556 s.routingKeyInfoCache.lru.Add(stmt, inflight) 557 s.routingKeyInfoCache.mu.Unlock() 558 559 var ( 560 info *preparedStatment 561 partitionKey []*ColumnMetadata 562 ) 563 564 conn := s.getConn() 565 if conn == nil { 566 // TODO: better error? 567 inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available") 568 return nil, inflight.err 569 } 570 571 // get the query info for the statement 572 info, inflight.err = conn.prepareStatement(ctx, stmt, nil) 573 if inflight.err != nil { 574 // don't cache this error 575 s.routingKeyInfoCache.Remove(stmt) 576 return nil, inflight.err 577 } 578 579 // TODO: it would be nice to mark hosts here but as we are not using the policies 580 // to fetch hosts we cant 581 582 if info.request.colCount == 0 { 583 // no arguments, no routing key, and no error 584 return nil, nil 585 } 586 587 if len(info.request.pkeyColumns) > 0 { 588 // proto v4 dont need to calculate primary key columns 589 types := make([]TypeInfo, len(info.request.pkeyColumns)) 590 for i, col := range info.request.pkeyColumns { 591 types[i] = info.request.columns[col].TypeInfo 592 } 593 594 routingKeyInfo := &routingKeyInfo{ 595 indexes: info.request.pkeyColumns, 596 types: types, 597 } 598 599 inflight.value = routingKeyInfo 600 return routingKeyInfo, nil 601 } 602 603 // get the table metadata 604 table := info.request.columns[0].Table 605 606 var keyspaceMetadata *KeyspaceMetadata 607 keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace) 608 if inflight.err != nil { 609 // don't cache this error 610 s.routingKeyInfoCache.Remove(stmt) 611 return nil, inflight.err 612 } 613 614 tableMetadata, found := keyspaceMetadata.Tables[table] 615 if !found { 616 // unlikely that the statement could be prepared and the metadata for 617 // the table couldn't be found, but this may indicate either a bug 618 // in the metadata code, or that the table was just dropped. 619 inflight.err = ErrNoMetadata 620 // don't cache this error 621 s.routingKeyInfoCache.Remove(stmt) 622 return nil, inflight.err 623 } 624 625 partitionKey = tableMetadata.PartitionKey 626 627 size := len(partitionKey) 628 routingKeyInfo := &routingKeyInfo{ 629 indexes: make([]int, size), 630 types: make([]TypeInfo, size), 631 } 632 633 for keyIndex, keyColumn := range partitionKey { 634 // set an indicator for checking if the mapping is missing 635 routingKeyInfo.indexes[keyIndex] = -1 636 637 // find the column in the query info 638 for argIndex, boundColumn := range info.request.columns { 639 if keyColumn.Name == boundColumn.Name { 640 // there may be many such bound columns, pick the first 641 routingKeyInfo.indexes[keyIndex] = argIndex 642 routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo 643 break 644 } 645 } 646 647 if routingKeyInfo.indexes[keyIndex] == -1 { 648 // missing a routing key column mapping 649 // no routing key, and no error 650 return nil, nil 651 } 652 } 653 654 // cache this result 655 inflight.value = routingKeyInfo 656 657 return routingKeyInfo, nil 658} 659 660func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter { 661 return conn.executeBatch(ctx, b) 662} 663 664func (s *Session) executeBatch(batch *Batch) *Iter { 665 // fail fast 666 if s.Closed() { 667 return &Iter{err: ErrSessionClosed} 668 } 669 670 // Prevent the execution of the batch if greater than the limit 671 // Currently batches have a limit of 65536 queries. 672 // https://datastax-oss.atlassian.net/browse/JAVA-229 673 if batch.Size() > BatchSizeMaximum { 674 return &Iter{err: ErrTooManyStmts} 675 } 676 677 iter, err := s.executor.executeQuery(batch) 678 if err != nil { 679 return &Iter{err: err} 680 } 681 682 return iter 683} 684 685// ExecuteBatch executes a batch operation and returns nil if successful 686// otherwise an error is returned describing the failure. 687func (s *Session) ExecuteBatch(batch *Batch) error { 688 iter := s.executeBatch(batch) 689 return iter.Close() 690} 691 692// ExecuteBatchCAS executes a batch operation and returns true if successful and 693// an iterator (to scan additional rows if more than one conditional statement) 694// was sent. 695// Further scans on the interator must also remember to include 696// the applied boolean as the first argument to *Iter.Scan 697func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) { 698 iter = s.executeBatch(batch) 699 if err := iter.checkErrAndNotFound(); err != nil { 700 iter.Close() 701 return false, nil, err 702 } 703 704 if len(iter.Columns()) > 1 { 705 dest = append([]interface{}{&applied}, dest...) 706 iter.Scan(dest...) 707 } else { 708 iter.Scan(&applied) 709 } 710 711 return applied, iter, nil 712} 713 714// MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS, 715// however it accepts a map rather than a list of arguments for the initial 716// scan. 717func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) { 718 iter = s.executeBatch(batch) 719 if err := iter.checkErrAndNotFound(); err != nil { 720 iter.Close() 721 return false, nil, err 722 } 723 iter.MapScan(dest) 724 applied = dest["[applied]"].(bool) 725 delete(dest, "[applied]") 726 727 // we usually close here, but instead of closing, just returin an error 728 // if MapScan failed. Although Close just returns err, using Close 729 // here might be confusing as we are not actually closing the iter 730 return applied, iter, iter.err 731} 732 733type hostMetrics struct { 734 // Attempts is count of how many times this query has been attempted for this host. 735 // An attempt is either a retry or fetching next page of results. 736 Attempts int 737 738 // TotalLatency is the sum of attempt latencies for this host in nanoseconds. 739 TotalLatency int64 740} 741 742type queryMetrics struct { 743 l sync.RWMutex 744 m map[string]*hostMetrics 745 // totalAttempts is total number of attempts. 746 // Equal to sum of all hostMetrics' Attempts. 747 totalAttempts int 748} 749 750// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data. 751func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics { 752 qm := &queryMetrics{m: m} 753 for _, hm := range qm.m { 754 qm.totalAttempts += hm.Attempts 755 } 756 return qm 757} 758 759// hostMetrics returns a snapshot of metrics for given host. 760// If the metrics for host don't exist, they are created. 761func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics { 762 qm.l.Lock() 763 metrics := qm.hostMetricsLocked(host) 764 copied := new(hostMetrics) 765 *copied = *metrics 766 qm.l.Unlock() 767 return copied 768} 769 770// hostMetricsLocked gets or creates host metrics for given host. 771// It must be called only while holding qm.l lock. 772func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics { 773 metrics, exists := qm.m[host.ConnectAddress().String()] 774 if !exists { 775 // if the host is not in the map, it means it's been accessed for the first time 776 metrics = &hostMetrics{} 777 qm.m[host.ConnectAddress().String()] = metrics 778 } 779 780 return metrics 781} 782 783// attempts returns the number of times the query was executed. 784func (qm *queryMetrics) attempts() int { 785 qm.l.Lock() 786 attempts := qm.totalAttempts 787 qm.l.Unlock() 788 return attempts 789} 790 791func (qm *queryMetrics) latency() int64 { 792 qm.l.Lock() 793 var ( 794 attempts int 795 latency int64 796 ) 797 for _, metric := range qm.m { 798 attempts += metric.Attempts 799 latency += metric.TotalLatency 800 } 801 qm.l.Unlock() 802 if attempts > 0 { 803 return latency / int64(attempts) 804 } 805 return 0 806} 807 808// attempt adds given number of attempts and latency for given host. 809// It returns previous total attempts. 810// If needsHostMetrics is true, a copy of updated hostMetrics is returned. 811func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration, 812 host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) { 813 qm.l.Lock() 814 815 totalAttempts := qm.totalAttempts 816 qm.totalAttempts += addAttempts 817 818 updateHostMetrics := qm.hostMetricsLocked(host) 819 updateHostMetrics.Attempts += addAttempts 820 updateHostMetrics.TotalLatency += addLatency.Nanoseconds() 821 822 var hostMetricsCopy *hostMetrics 823 if needsHostMetrics { 824 hostMetricsCopy = new(hostMetrics) 825 *hostMetricsCopy = *updateHostMetrics 826 } 827 828 qm.l.Unlock() 829 return totalAttempts, hostMetricsCopy 830} 831 832// Query represents a CQL statement that can be executed. 833type Query struct { 834 stmt string 835 values []interface{} 836 cons Consistency 837 pageSize int 838 routingKey []byte 839 pageState []byte 840 prefetch float64 841 trace Tracer 842 observer QueryObserver 843 session *Session 844 conn *Conn 845 rt RetryPolicy 846 spec SpeculativeExecutionPolicy 847 binding func(q *QueryInfo) ([]interface{}, error) 848 serialCons SerialConsistency 849 defaultTimestamp bool 850 defaultTimestampValue int64 851 disableSkipMetadata bool 852 context context.Context 853 idempotent bool 854 customPayload map[string][]byte 855 metrics *queryMetrics 856 857 disableAutoPage bool 858 859 // getKeyspace is field so that it can be overriden in tests 860 getKeyspace func() string 861 862 // used by control conn queries to prevent triggering a write to systems 863 // tables in AWS MCS see 864 skipPrepare bool 865} 866 867func (q *Query) defaultsFromSession() { 868 s := q.session 869 870 s.mu.RLock() 871 q.cons = s.cons 872 q.pageSize = s.pageSize 873 q.trace = s.trace 874 q.observer = s.queryObserver 875 q.prefetch = s.prefetch 876 q.rt = s.cfg.RetryPolicy 877 q.serialCons = s.cfg.SerialConsistency 878 q.defaultTimestamp = s.cfg.DefaultTimestamp 879 q.idempotent = s.cfg.DefaultIdempotence 880 q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} 881 882 q.spec = &NonSpeculativeExecution{} 883 s.mu.RUnlock() 884} 885 886// Statement returns the statement that was used to generate this query. 887func (q Query) Statement() string { 888 return q.stmt 889} 890 891// String implements the stringer interface. 892func (q Query) String() string { 893 return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons) 894} 895 896//Attempts returns the number of times the query was executed. 897func (q *Query) Attempts() int { 898 return q.metrics.attempts() 899} 900 901func (q *Query) AddAttempts(i int, host *HostInfo) { 902 q.metrics.attempt(i, 0, host, false) 903} 904 905//Latency returns the average amount of nanoseconds per attempt of the query. 906func (q *Query) Latency() int64 { 907 return q.metrics.latency() 908} 909 910func (q *Query) AddLatency(l int64, host *HostInfo) { 911 q.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) 912} 913 914// Consistency sets the consistency level for this query. If no consistency 915// level have been set, the default consistency level of the cluster 916// is used. 917func (q *Query) Consistency(c Consistency) *Query { 918 q.cons = c 919 return q 920} 921 922// GetConsistency returns the currently configured consistency level for 923// the query. 924func (q *Query) GetConsistency() Consistency { 925 return q.cons 926} 927 928// Same as Consistency but without a return value 929func (q *Query) SetConsistency(c Consistency) { 930 q.cons = c 931} 932 933// CustomPayload sets the custom payload level for this query. 934func (q *Query) CustomPayload(customPayload map[string][]byte) *Query { 935 q.customPayload = customPayload 936 return q 937} 938 939func (q *Query) Context() context.Context { 940 if q.context == nil { 941 return context.Background() 942 } 943 return q.context 944} 945 946// Trace enables tracing of this query. Look at the documentation of the 947// Tracer interface to learn more about tracing. 948func (q *Query) Trace(trace Tracer) *Query { 949 q.trace = trace 950 return q 951} 952 953// Observer enables query-level observer on this query. 954// The provided observer will be called every time this query is executed. 955func (q *Query) Observer(observer QueryObserver) *Query { 956 q.observer = observer 957 return q 958} 959 960// PageSize will tell the iterator to fetch the result in pages of size n. 961// This is useful for iterating over large result sets, but setting the 962// page size too low might decrease the performance. This feature is only 963// available in Cassandra 2 and onwards. 964func (q *Query) PageSize(n int) *Query { 965 q.pageSize = n 966 return q 967} 968 969// DefaultTimestamp will enable the with default timestamp flag on the query. 970// If enable, this will replace the server side assigned 971// timestamp as default timestamp. Note that a timestamp in the query itself 972// will still override this timestamp. This is entirely optional. 973// 974// Only available on protocol >= 3 975func (q *Query) DefaultTimestamp(enable bool) *Query { 976 q.defaultTimestamp = enable 977 return q 978} 979 980// WithTimestamp will enable the with default timestamp flag on the query 981// like DefaultTimestamp does. But also allows to define value for timestamp. 982// It works the same way as USING TIMESTAMP in the query itself, but 983// should not break prepared query optimization. 984// 985// Only available on protocol >= 3 986func (q *Query) WithTimestamp(timestamp int64) *Query { 987 q.DefaultTimestamp(true) 988 q.defaultTimestampValue = timestamp 989 return q 990} 991 992// RoutingKey sets the routing key to use when a token aware connection 993// pool is used to optimize the routing of this query. 994func (q *Query) RoutingKey(routingKey []byte) *Query { 995 q.routingKey = routingKey 996 return q 997} 998 999func (q *Query) withContext(ctx context.Context) ExecutableQuery { 1000 // I really wish go had covariant types 1001 return q.WithContext(ctx) 1002} 1003 1004// WithContext returns a shallow copy of q with its context 1005// set to ctx. 1006// 1007// The provided context controls the entire lifetime of executing a 1008// query, queries will be canceled and return once the context is 1009// canceled. 1010func (q *Query) WithContext(ctx context.Context) *Query { 1011 q2 := *q 1012 q2.context = ctx 1013 return &q2 1014} 1015 1016// Deprecate: does nothing, cancel the context passed to WithContext 1017func (q *Query) Cancel() { 1018 // TODO: delete 1019} 1020 1021func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { 1022 return conn.executeQuery(ctx, q) 1023} 1024 1025func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { 1026 latency := end.Sub(start) 1027 attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil) 1028 1029 if q.observer != nil { 1030 q.observer.ObserveQuery(q.Context(), ObservedQuery{ 1031 Keyspace: keyspace, 1032 Statement: q.stmt, 1033 Start: start, 1034 End: end, 1035 Rows: iter.numRows, 1036 Host: host, 1037 Metrics: metricsForHost, 1038 Err: iter.err, 1039 Attempt: attempt, 1040 }) 1041 } 1042} 1043 1044func (q *Query) retryPolicy() RetryPolicy { 1045 return q.rt 1046} 1047 1048// Keyspace returns the keyspace the query will be executed against. 1049func (q *Query) Keyspace() string { 1050 if q.getKeyspace != nil { 1051 return q.getKeyspace() 1052 } 1053 if q.session == nil { 1054 return "" 1055 } 1056 // TODO(chbannis): this should be parsed from the query or we should let 1057 // this be set by users. 1058 return q.session.cfg.Keyspace 1059} 1060 1061// GetRoutingKey gets the routing key to use for routing this query. If 1062// a routing key has not been explicitly set, then the routing key will 1063// be constructed if possible using the keyspace's schema and the query 1064// info for this query statement. If the routing key cannot be determined 1065// then nil will be returned with no error. On any error condition, 1066// an error description will be returned. 1067func (q *Query) GetRoutingKey() ([]byte, error) { 1068 if q.routingKey != nil { 1069 return q.routingKey, nil 1070 } else if q.binding != nil && len(q.values) == 0 { 1071 // If this query was created using session.Bind we wont have the query 1072 // values yet, so we have to pass down to the next policy. 1073 // TODO: Remove this and handle this case 1074 return nil, nil 1075 } 1076 1077 // try to determine the routing key 1078 routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt) 1079 if err != nil { 1080 return nil, err 1081 } 1082 1083 return createRoutingKey(routingKeyInfo, q.values) 1084} 1085 1086func (q *Query) shouldPrepare() bool { 1087 1088 stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool { 1089 return unicode.IsSpace(r) || r == ';' 1090 }), unicode.IsSpace) 1091 1092 var stmtType string 1093 if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 { 1094 stmtType = strings.ToLower(stmt[:n]) 1095 } 1096 if stmtType == "begin" { 1097 if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 { 1098 stmtType = strings.ToLower(stmt[n+1:]) 1099 } 1100 } 1101 switch stmtType { 1102 case "select", "insert", "update", "delete", "batch": 1103 return true 1104 } 1105 return false 1106} 1107 1108// SetPrefetch sets the default threshold for pre-fetching new pages. If 1109// there are only p*pageSize rows remaining, the next page will be requested 1110// automatically. 1111func (q *Query) Prefetch(p float64) *Query { 1112 q.prefetch = p 1113 return q 1114} 1115 1116// RetryPolicy sets the policy to use when retrying the query. 1117func (q *Query) RetryPolicy(r RetryPolicy) *Query { 1118 q.rt = r 1119 return q 1120} 1121 1122// SetSpeculativeExecutionPolicy sets the execution policy 1123func (q *Query) SetSpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Query { 1124 q.spec = sp 1125 return q 1126} 1127 1128// speculativeExecutionPolicy fetches the policy 1129func (q *Query) speculativeExecutionPolicy() SpeculativeExecutionPolicy { 1130 return q.spec 1131} 1132 1133// IsIdempotent returns whether the query is marked as idempotent. 1134// Non-idempotent query won't be retried. 1135// See "Retries and speculative execution" in package docs for more details. 1136func (q *Query) IsIdempotent() bool { 1137 return q.idempotent 1138} 1139 1140// Idempotent marks the query as being idempotent or not depending on 1141// the value. 1142// Non-idempotent query won't be retried. 1143// See "Retries and speculative execution" in package docs for more details. 1144func (q *Query) Idempotent(value bool) *Query { 1145 q.idempotent = value 1146 return q 1147} 1148 1149// Bind sets query arguments of query. This can also be used to rebind new query arguments 1150// to an existing query instance. 1151func (q *Query) Bind(v ...interface{}) *Query { 1152 q.values = v 1153 q.pageState = nil 1154 return q 1155} 1156 1157// SerialConsistency sets the consistency level for the 1158// serial phase of conditional updates. That consistency can only be 1159// either SERIAL or LOCAL_SERIAL and if not present, it defaults to 1160// SERIAL. This option will be ignored for anything else that a 1161// conditional update/insert. 1162func (q *Query) SerialConsistency(cons SerialConsistency) *Query { 1163 q.serialCons = cons 1164 return q 1165} 1166 1167// PageState sets the paging state for the query to resume paging from a specific 1168// point in time. Setting this will disable to query paging for this query, and 1169// must be used for all subsequent pages. 1170func (q *Query) PageState(state []byte) *Query { 1171 q.pageState = state 1172 q.disableAutoPage = true 1173 return q 1174} 1175 1176// NoSkipMetadata will override the internal result metadata cache so that the driver does not 1177// send skip_metadata for queries, this means that the result will always contain 1178// the metadata to parse the rows and will not reuse the metadata from the prepared 1179// statement. This should only be used to work around cassandra bugs, such as when using 1180// CAS operations which do not end in Cas. 1181// 1182// See https://issues.apache.org/jira/browse/CASSANDRA-11099 1183// https://github.com/gocql/gocql/issues/612 1184func (q *Query) NoSkipMetadata() *Query { 1185 q.disableSkipMetadata = true 1186 return q 1187} 1188 1189// Exec executes the query without returning any rows. 1190func (q *Query) Exec() error { 1191 return q.Iter().Close() 1192} 1193 1194func isUseStatement(stmt string) bool { 1195 if len(stmt) < 3 { 1196 return false 1197 } 1198 1199 return strings.EqualFold(stmt[0:3], "use") 1200} 1201 1202// Iter executes the query and returns an iterator capable of iterating 1203// over all results. 1204func (q *Query) Iter() *Iter { 1205 if isUseStatement(q.stmt) { 1206 return &Iter{err: ErrUseStmt} 1207 } 1208 // if the query was specifically run on a connection then re-use that 1209 // connection when fetching the next results 1210 if q.conn != nil { 1211 return q.conn.executeQuery(q.Context(), q) 1212 } 1213 return q.session.executeQuery(q) 1214} 1215 1216// MapScan executes the query, copies the columns of the first selected 1217// row into the map pointed at by m and discards the rest. If no rows 1218// were selected, ErrNotFound is returned. 1219func (q *Query) MapScan(m map[string]interface{}) error { 1220 iter := q.Iter() 1221 if err := iter.checkErrAndNotFound(); err != nil { 1222 return err 1223 } 1224 iter.MapScan(m) 1225 return iter.Close() 1226} 1227 1228// Scan executes the query, copies the columns of the first selected 1229// row into the values pointed at by dest and discards the rest. If no rows 1230// were selected, ErrNotFound is returned. 1231func (q *Query) Scan(dest ...interface{}) error { 1232 iter := q.Iter() 1233 if err := iter.checkErrAndNotFound(); err != nil { 1234 return err 1235 } 1236 iter.Scan(dest...) 1237 return iter.Close() 1238} 1239 1240// ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT 1241// statement containing an IF clause). If the transaction fails because 1242// the existing values did not match, the previous values will be stored 1243// in dest. 1244// 1245// As for INSERT .. IF NOT EXISTS, previous values will be returned as if 1246// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to 1247// column mismatching. Use MapScanCAS to capture them safely. 1248func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) { 1249 q.disableSkipMetadata = true 1250 iter := q.Iter() 1251 if err := iter.checkErrAndNotFound(); err != nil { 1252 return false, err 1253 } 1254 if len(iter.Columns()) > 1 { 1255 dest = append([]interface{}{&applied}, dest...) 1256 iter.Scan(dest...) 1257 } else { 1258 iter.Scan(&applied) 1259 } 1260 return applied, iter.Close() 1261} 1262 1263// MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT 1264// statement containing an IF clause). If the transaction fails because 1265// the existing values did not match, the previous values will be stored 1266// in dest map. 1267// 1268// As for INSERT .. IF NOT EXISTS, previous values will be returned as if 1269// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to 1270// column mismatching. MapScanCAS is added to capture them safely. 1271func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) { 1272 q.disableSkipMetadata = true 1273 iter := q.Iter() 1274 if err := iter.checkErrAndNotFound(); err != nil { 1275 return false, err 1276 } 1277 iter.MapScan(dest) 1278 applied = dest["[applied]"].(bool) 1279 delete(dest, "[applied]") 1280 1281 return applied, iter.Close() 1282} 1283 1284// Release releases a query back into a pool of queries. Released Queries 1285// cannot be reused. 1286// 1287// Example: 1288// qry := session.Query("SELECT * FROM my_table") 1289// qry.Exec() 1290// qry.Release() 1291func (q *Query) Release() { 1292 q.reset() 1293 queryPool.Put(q) 1294} 1295 1296// reset zeroes out all fields of a query so that it can be safely pooled. 1297func (q *Query) reset() { 1298 *q = Query{} 1299} 1300 1301// Iter represents an iterator that can be used to iterate over all rows that 1302// were returned by a query. The iterator might send additional queries to the 1303// database during the iteration if paging was enabled. 1304type Iter struct { 1305 err error 1306 pos int 1307 meta resultMetadata 1308 numRows int 1309 next *nextIter 1310 host *HostInfo 1311 1312 framer *framer 1313 closed int32 1314} 1315 1316// Host returns the host which the query was sent to. 1317func (iter *Iter) Host() *HostInfo { 1318 return iter.host 1319} 1320 1321// Columns returns the name and type of the selected columns. 1322func (iter *Iter) Columns() []ColumnInfo { 1323 return iter.meta.columns 1324} 1325 1326type Scanner interface { 1327 // Next advances the row pointer to point at the next row, the row is valid until 1328 // the next call of Next. It returns true if there is a row which is available to be 1329 // scanned into with Scan. 1330 // Next must be called before every call to Scan. 1331 Next() bool 1332 1333 // Scan copies the current row's columns into dest. If the length of dest does not equal 1334 // the number of columns returned in the row an error is returned. If an error is encountered 1335 // when unmarshalling a column into the value in dest an error is returned and the row is invalidated 1336 // until the next call to Next. 1337 // Next must be called before calling Scan, if it is not an error is returned. 1338 Scan(...interface{}) error 1339 1340 // Err returns the if there was one during iteration that resulted in iteration being unable to complete. 1341 // Err will also release resources held by the iterator, the Scanner should not used after being called. 1342 Err() error 1343} 1344 1345type iterScanner struct { 1346 iter *Iter 1347 cols [][]byte 1348 valid bool 1349} 1350 1351func (is *iterScanner) Next() bool { 1352 iter := is.iter 1353 if iter.err != nil { 1354 return false 1355 } 1356 1357 if iter.pos >= iter.numRows { 1358 if iter.next != nil { 1359 is.iter = iter.next.fetch() 1360 return is.Next() 1361 } 1362 return false 1363 } 1364 1365 for i := 0; i < len(is.cols); i++ { 1366 col, err := iter.readColumn() 1367 if err != nil { 1368 iter.err = err 1369 return false 1370 } 1371 is.cols[i] = col 1372 } 1373 iter.pos++ 1374 is.valid = true 1375 1376 return true 1377} 1378 1379func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) { 1380 if dest[0] == nil { 1381 return 1, nil 1382 } 1383 1384 if col.TypeInfo.Type() == TypeTuple { 1385 // this will panic, actually a bug, please report 1386 tuple := col.TypeInfo.(TupleTypeInfo) 1387 1388 count := len(tuple.Elems) 1389 // here we pass in a slice of the struct which has the number number of 1390 // values as elements in the tuple 1391 if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil { 1392 return 0, err 1393 } 1394 return count, nil 1395 } else { 1396 if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil { 1397 return 0, err 1398 } 1399 return 1, nil 1400 } 1401} 1402 1403func (is *iterScanner) Scan(dest ...interface{}) error { 1404 if !is.valid { 1405 return errors.New("gocql: Scan called without calling Next") 1406 } 1407 1408 iter := is.iter 1409 // currently only support scanning into an expand tuple, such that its the same 1410 // as scanning in more values from a single column 1411 if len(dest) != iter.meta.actualColCount { 1412 return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount) 1413 } 1414 1415 // i is the current position in dest, could posible replace it and just use 1416 // slices of dest 1417 i := 0 1418 var err error 1419 for _, col := range iter.meta.columns { 1420 var n int 1421 n, err = scanColumn(is.cols[i], col, dest[i:]) 1422 if err != nil { 1423 break 1424 } 1425 i += n 1426 } 1427 1428 is.valid = false 1429 return err 1430} 1431 1432func (is *iterScanner) Err() error { 1433 iter := is.iter 1434 is.iter = nil 1435 is.cols = nil 1436 is.valid = false 1437 return iter.Close() 1438} 1439 1440// Scanner returns a row Scanner which provides an interface to scan rows in a manner which is 1441// similar to database/sql. The iter should NOT be used again after calling this method. 1442func (iter *Iter) Scanner() Scanner { 1443 if iter == nil { 1444 return nil 1445 } 1446 1447 return &iterScanner{iter: iter, cols: make([][]byte, len(iter.meta.columns))} 1448} 1449 1450func (iter *Iter) readColumn() ([]byte, error) { 1451 return iter.framer.readBytesInternal() 1452} 1453 1454// Scan consumes the next row of the iterator and copies the columns of the 1455// current row into the values pointed at by dest. Use nil as a dest value 1456// to skip the corresponding column. Scan might send additional queries 1457// to the database to retrieve the next set of rows if paging was enabled. 1458// 1459// Scan returns true if the row was successfully unmarshaled or false if the 1460// end of the result set was reached or if an error occurred. Close should 1461// be called afterwards to retrieve any potential errors. 1462func (iter *Iter) Scan(dest ...interface{}) bool { 1463 if iter.err != nil { 1464 return false 1465 } 1466 1467 if iter.pos >= iter.numRows { 1468 if iter.next != nil { 1469 *iter = *iter.next.fetch() 1470 return iter.Scan(dest...) 1471 } 1472 return false 1473 } 1474 1475 if iter.next != nil && iter.pos >= iter.next.pos { 1476 iter.next.fetchAsync() 1477 } 1478 1479 // currently only support scanning into an expand tuple, such that its the same 1480 // as scanning in more values from a single column 1481 if len(dest) != iter.meta.actualColCount { 1482 iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount) 1483 return false 1484 } 1485 1486 // i is the current position in dest, could posible replace it and just use 1487 // slices of dest 1488 i := 0 1489 for _, col := range iter.meta.columns { 1490 colBytes, err := iter.readColumn() 1491 if err != nil { 1492 iter.err = err 1493 return false 1494 } 1495 1496 n, err := scanColumn(colBytes, col, dest[i:]) 1497 if err != nil { 1498 iter.err = err 1499 return false 1500 } 1501 i += n 1502 } 1503 1504 iter.pos++ 1505 return true 1506} 1507 1508// GetCustomPayload returns any parsed custom payload results if given in the 1509// response from Cassandra. Note that the result is not a copy. 1510// 1511// This additional feature of CQL Protocol v4 1512// allows additional results and query information to be returned by 1513// custom QueryHandlers running in your C* cluster. 1514// See https://datastax.github.io/java-driver/manual/custom_payloads/ 1515func (iter *Iter) GetCustomPayload() map[string][]byte { 1516 return iter.framer.customPayload 1517} 1518 1519// Warnings returns any warnings generated if given in the response from Cassandra. 1520// 1521// This is only available starting with CQL Protocol v4. 1522func (iter *Iter) Warnings() []string { 1523 if iter.framer != nil { 1524 return iter.framer.header.warnings 1525 } 1526 return nil 1527} 1528 1529// Close closes the iterator and returns any errors that happened during 1530// the query or the iteration. 1531func (iter *Iter) Close() error { 1532 if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) { 1533 if iter.framer != nil { 1534 iter.framer = nil 1535 } 1536 } 1537 1538 return iter.err 1539} 1540 1541// WillSwitchPage detects if iterator reached end of current page 1542// and the next page is available. 1543func (iter *Iter) WillSwitchPage() bool { 1544 return iter.pos >= iter.numRows && iter.next != nil 1545} 1546 1547// checkErrAndNotFound handle error and NotFound in one method. 1548func (iter *Iter) checkErrAndNotFound() error { 1549 if iter.err != nil { 1550 return iter.err 1551 } else if iter.numRows == 0 { 1552 return ErrNotFound 1553 } 1554 return nil 1555} 1556 1557// PageState return the current paging state for a query which can be used for 1558// subsequent queries to resume paging this point. 1559func (iter *Iter) PageState() []byte { 1560 return iter.meta.pagingState 1561} 1562 1563// NumRows returns the number of rows in this pagination, it will update when new 1564// pages are fetched, it is not the value of the total number of rows this iter 1565// will return unless there is only a single page returned. 1566func (iter *Iter) NumRows() int { 1567 return iter.numRows 1568} 1569 1570// nextIter holds state for fetching a single page in an iterator. 1571// single page might be attempted multiple times due to retries. 1572type nextIter struct { 1573 qry *Query 1574 pos int 1575 oncea sync.Once 1576 once sync.Once 1577 next *Iter 1578} 1579 1580func (n *nextIter) fetchAsync() { 1581 n.oncea.Do(func() { 1582 go n.fetch() 1583 }) 1584} 1585 1586func (n *nextIter) fetch() *Iter { 1587 n.once.Do(func() { 1588 // if the query was specifically run on a connection then re-use that 1589 // connection when fetching the next results 1590 if n.qry.conn != nil { 1591 n.next = n.qry.conn.executeQuery(n.qry.Context(), n.qry) 1592 } else { 1593 n.next = n.qry.session.executeQuery(n.qry) 1594 } 1595 }) 1596 return n.next 1597} 1598 1599type Batch struct { 1600 Type BatchType 1601 Entries []BatchEntry 1602 Cons Consistency 1603 routingKey []byte 1604 CustomPayload map[string][]byte 1605 rt RetryPolicy 1606 spec SpeculativeExecutionPolicy 1607 observer BatchObserver 1608 session *Session 1609 serialCons SerialConsistency 1610 defaultTimestamp bool 1611 defaultTimestampValue int64 1612 context context.Context 1613 cancelBatch func() 1614 keyspace string 1615 metrics *queryMetrics 1616} 1617 1618// NewBatch creates a new batch operation without defaults from the cluster 1619// 1620// Deprecated: use session.NewBatch instead 1621func NewBatch(typ BatchType) *Batch { 1622 return &Batch{ 1623 Type: typ, 1624 metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, 1625 spec: &NonSpeculativeExecution{}, 1626 } 1627} 1628 1629// NewBatch creates a new batch operation using defaults defined in the cluster 1630func (s *Session) NewBatch(typ BatchType) *Batch { 1631 s.mu.RLock() 1632 batch := &Batch{ 1633 Type: typ, 1634 rt: s.cfg.RetryPolicy, 1635 serialCons: s.cfg.SerialConsistency, 1636 observer: s.batchObserver, 1637 session: s, 1638 Cons: s.cons, 1639 defaultTimestamp: s.cfg.DefaultTimestamp, 1640 keyspace: s.cfg.Keyspace, 1641 metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, 1642 spec: &NonSpeculativeExecution{}, 1643 } 1644 1645 s.mu.RUnlock() 1646 return batch 1647} 1648 1649// Observer enables batch-level observer on this batch. 1650// The provided observer will be called every time this batched query is executed. 1651func (b *Batch) Observer(observer BatchObserver) *Batch { 1652 b.observer = observer 1653 return b 1654} 1655 1656func (b *Batch) Keyspace() string { 1657 return b.keyspace 1658} 1659 1660// Attempts returns the number of attempts made to execute the batch. 1661func (b *Batch) Attempts() int { 1662 return b.metrics.attempts() 1663} 1664 1665func (b *Batch) AddAttempts(i int, host *HostInfo) { 1666 b.metrics.attempt(i, 0, host, false) 1667} 1668 1669//Latency returns the average number of nanoseconds to execute a single attempt of the batch. 1670func (b *Batch) Latency() int64 { 1671 return b.metrics.latency() 1672} 1673 1674func (b *Batch) AddLatency(l int64, host *HostInfo) { 1675 b.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) 1676} 1677 1678// GetConsistency returns the currently configured consistency level for the batch 1679// operation. 1680func (b *Batch) GetConsistency() Consistency { 1681 return b.Cons 1682} 1683 1684// SetConsistency sets the currently configured consistency level for the batch 1685// operation. 1686func (b *Batch) SetConsistency(c Consistency) { 1687 b.Cons = c 1688} 1689 1690func (b *Batch) Context() context.Context { 1691 if b.context == nil { 1692 return context.Background() 1693 } 1694 return b.context 1695} 1696 1697func (b *Batch) IsIdempotent() bool { 1698 for _, entry := range b.Entries { 1699 if !entry.Idempotent { 1700 return false 1701 } 1702 } 1703 return true 1704} 1705 1706func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy { 1707 return b.spec 1708} 1709 1710func (b *Batch) SpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Batch { 1711 b.spec = sp 1712 return b 1713} 1714 1715// Query adds the query to the batch operation 1716func (b *Batch) Query(stmt string, args ...interface{}) { 1717 b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args}) 1718} 1719 1720// Bind adds the query to the batch operation and correlates it with a binding callback 1721// that will be invoked when the batch is executed. The binding callback allows the application 1722// to define which query argument values will be marshalled as part of the batch execution. 1723func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) { 1724 b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind}) 1725} 1726 1727func (b *Batch) retryPolicy() RetryPolicy { 1728 return b.rt 1729} 1730 1731// RetryPolicy sets the retry policy to use when executing the batch operation 1732func (b *Batch) RetryPolicy(r RetryPolicy) *Batch { 1733 b.rt = r 1734 return b 1735} 1736 1737func (b *Batch) withContext(ctx context.Context) ExecutableQuery { 1738 return b.WithContext(ctx) 1739} 1740 1741// WithContext returns a shallow copy of b with its context 1742// set to ctx. 1743// 1744// The provided context controls the entire lifetime of executing a 1745// query, queries will be canceled and return once the context is 1746// canceled. 1747func (b *Batch) WithContext(ctx context.Context) *Batch { 1748 b2 := *b 1749 b2.context = ctx 1750 return &b2 1751} 1752 1753// Deprecate: does nothing, cancel the context passed to WithContext 1754func (*Batch) Cancel() { 1755 // TODO: delete 1756} 1757 1758// Size returns the number of batch statements to be executed by the batch operation. 1759func (b *Batch) Size() int { 1760 return len(b.Entries) 1761} 1762 1763// SerialConsistency sets the consistency level for the 1764// serial phase of conditional updates. That consistency can only be 1765// either SERIAL or LOCAL_SERIAL and if not present, it defaults to 1766// SERIAL. This option will be ignored for anything else that a 1767// conditional update/insert. 1768// 1769// Only available for protocol 3 and above 1770func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch { 1771 b.serialCons = cons 1772 return b 1773} 1774 1775// DefaultTimestamp will enable the with default timestamp flag on the query. 1776// If enable, this will replace the server side assigned 1777// timestamp as default timestamp. Note that a timestamp in the query itself 1778// will still override this timestamp. This is entirely optional. 1779// 1780// Only available on protocol >= 3 1781func (b *Batch) DefaultTimestamp(enable bool) *Batch { 1782 b.defaultTimestamp = enable 1783 return b 1784} 1785 1786// WithTimestamp will enable the with default timestamp flag on the query 1787// like DefaultTimestamp does. But also allows to define value for timestamp. 1788// It works the same way as USING TIMESTAMP in the query itself, but 1789// should not break prepared query optimization. 1790// 1791// Only available on protocol >= 3 1792func (b *Batch) WithTimestamp(timestamp int64) *Batch { 1793 b.DefaultTimestamp(true) 1794 b.defaultTimestampValue = timestamp 1795 return b 1796} 1797 1798func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { 1799 latency := end.Sub(start) 1800 attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil) 1801 1802 if b.observer == nil { 1803 return 1804 } 1805 1806 statements := make([]string, len(b.Entries)) 1807 for i, entry := range b.Entries { 1808 statements[i] = entry.Stmt 1809 } 1810 1811 b.observer.ObserveBatch(b.Context(), ObservedBatch{ 1812 Keyspace: keyspace, 1813 Statements: statements, 1814 Start: start, 1815 End: end, 1816 // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS 1817 Host: host, 1818 Metrics: metricsForHost, 1819 Err: iter.err, 1820 Attempt: attempt, 1821 }) 1822} 1823 1824func (b *Batch) GetRoutingKey() ([]byte, error) { 1825 if b.routingKey != nil { 1826 return b.routingKey, nil 1827 } 1828 1829 if len(b.Entries) == 0 { 1830 return nil, nil 1831 } 1832 1833 entry := b.Entries[0] 1834 if entry.binding != nil { 1835 // bindings do not have the values let's skip it like Query does. 1836 return nil, nil 1837 } 1838 // try to determine the routing key 1839 routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt) 1840 if err != nil { 1841 return nil, err 1842 } 1843 1844 return createRoutingKey(routingKeyInfo, entry.Args) 1845} 1846 1847func createRoutingKey(routingKeyInfo *routingKeyInfo, values []interface{}) ([]byte, error) { 1848 if routingKeyInfo == nil { 1849 return nil, nil 1850 } 1851 1852 if len(routingKeyInfo.indexes) == 1 { 1853 // single column routing key 1854 routingKey, err := Marshal( 1855 routingKeyInfo.types[0], 1856 values[routingKeyInfo.indexes[0]], 1857 ) 1858 if err != nil { 1859 return nil, err 1860 } 1861 return routingKey, nil 1862 } 1863 1864 // composite routing key 1865 buf := bytes.NewBuffer(make([]byte, 0, 256)) 1866 for i := range routingKeyInfo.indexes { 1867 encoded, err := Marshal( 1868 routingKeyInfo.types[i], 1869 values[routingKeyInfo.indexes[i]], 1870 ) 1871 if err != nil { 1872 return nil, err 1873 } 1874 lenBuf := []byte{0x00, 0x00} 1875 binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) 1876 buf.Write(lenBuf) 1877 buf.Write(encoded) 1878 buf.WriteByte(0x00) 1879 } 1880 routingKey := buf.Bytes() 1881 return routingKey, nil 1882} 1883 1884type BatchType byte 1885 1886const ( 1887 LoggedBatch BatchType = 0 1888 UnloggedBatch BatchType = 1 1889 CounterBatch BatchType = 2 1890) 1891 1892type BatchEntry struct { 1893 Stmt string 1894 Args []interface{} 1895 Idempotent bool 1896 binding func(q *QueryInfo) ([]interface{}, error) 1897} 1898 1899type ColumnInfo struct { 1900 Keyspace string 1901 Table string 1902 Name string 1903 TypeInfo TypeInfo 1904} 1905 1906func (c ColumnInfo) String() string { 1907 return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo) 1908} 1909 1910// routing key indexes LRU cache 1911type routingKeyInfoLRU struct { 1912 lru *lru.Cache 1913 mu sync.Mutex 1914} 1915 1916type routingKeyInfo struct { 1917 indexes []int 1918 types []TypeInfo 1919} 1920 1921func (r *routingKeyInfo) String() string { 1922 return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types) 1923} 1924 1925func (r *routingKeyInfoLRU) Remove(key string) { 1926 r.mu.Lock() 1927 r.lru.Remove(key) 1928 r.mu.Unlock() 1929} 1930 1931//Max adjusts the maximum size of the cache and cleans up the oldest records if 1932//the new max is lower than the previous value. Not concurrency safe. 1933func (r *routingKeyInfoLRU) Max(max int) { 1934 r.mu.Lock() 1935 for r.lru.Len() > max { 1936 r.lru.RemoveOldest() 1937 } 1938 r.lru.MaxEntries = max 1939 r.mu.Unlock() 1940} 1941 1942type inflightCachedEntry struct { 1943 wg sync.WaitGroup 1944 err error 1945 value interface{} 1946} 1947 1948// Tracer is the interface implemented by query tracers. Tracers have the 1949// ability to obtain a detailed event log of all events that happened during 1950// the execution of a query from Cassandra. Gathering this information might 1951// be essential for debugging and optimizing queries, but this feature should 1952// not be used on production systems with very high load. 1953type Tracer interface { 1954 Trace(traceId []byte) 1955} 1956 1957type traceWriter struct { 1958 session *Session 1959 w io.Writer 1960 mu sync.Mutex 1961} 1962 1963// NewTraceWriter returns a simple Tracer implementation that outputs 1964// the event log in a textual format. 1965func NewTraceWriter(session *Session, w io.Writer) Tracer { 1966 return &traceWriter{session: session, w: w} 1967} 1968 1969func (t *traceWriter) Trace(traceId []byte) { 1970 var ( 1971 coordinator string 1972 duration int 1973 ) 1974 iter := t.session.control.query(`SELECT coordinator, duration 1975 FROM system_traces.sessions 1976 WHERE session_id = ?`, traceId) 1977 1978 iter.Scan(&coordinator, &duration) 1979 if err := iter.Close(); err != nil { 1980 t.mu.Lock() 1981 fmt.Fprintln(t.w, "Error:", err) 1982 t.mu.Unlock() 1983 return 1984 } 1985 1986 var ( 1987 timestamp time.Time 1988 activity string 1989 source string 1990 elapsed int 1991 ) 1992 1993 t.mu.Lock() 1994 defer t.mu.Unlock() 1995 1996 fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n", 1997 traceId, coordinator, time.Duration(duration)*time.Microsecond) 1998 1999 iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed 2000 FROM system_traces.events 2001 WHERE session_id = ?`, traceId) 2002 2003 for iter.Scan(×tamp, &activity, &source, &elapsed) { 2004 fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n", 2005 timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed) 2006 } 2007 2008 if err := iter.Close(); err != nil { 2009 fmt.Fprintln(t.w, "Error:", err) 2010 } 2011} 2012 2013type ObservedQuery struct { 2014 Keyspace string 2015 Statement string 2016 2017 Start time.Time // time immediately before the query was called 2018 End time.Time // time immediately after the query returned 2019 2020 // Rows is the number of rows in the current iter. 2021 // In paginated queries, rows from previous scans are not counted. 2022 // Rows is not used in batch queries and remains at the default value 2023 Rows int 2024 2025 // Host is the informations about the host that performed the query 2026 Host *HostInfo 2027 2028 // The metrics per this host 2029 Metrics *hostMetrics 2030 2031 // Err is the error in the query. 2032 // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error 2033 Err error 2034 2035 // Attempt is the index of attempt at executing this query. 2036 // The first attempt is number zero and any retries have non-zero attempt number. 2037 Attempt int 2038} 2039 2040// QueryObserver is the interface implemented by query observers / stat collectors. 2041// 2042// Experimental, this interface and use may change 2043type QueryObserver interface { 2044 // ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled. 2045 // It doesn't get called if there is no query because the session is closed or there are no connections available. 2046 // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil. 2047 ObserveQuery(context.Context, ObservedQuery) 2048} 2049 2050type ObservedBatch struct { 2051 Keyspace string 2052 Statements []string 2053 2054 Start time.Time // time immediately before the batch query was called 2055 End time.Time // time immediately after the batch query returned 2056 2057 // Host is the informations about the host that performed the batch 2058 Host *HostInfo 2059 2060 // Err is the error in the batch query. 2061 // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error 2062 Err error 2063 2064 // The metrics per this host 2065 Metrics *hostMetrics 2066 2067 // Attempt is the index of attempt at executing this query. 2068 // The first attempt is number zero and any retries have non-zero attempt number. 2069 Attempt int 2070} 2071 2072// BatchObserver is the interface implemented by batch observers / stat collectors. 2073type BatchObserver interface { 2074 // ObserveBatch gets called on every batch query to cassandra. 2075 // It also gets called once for each query in a batch. 2076 // It doesn't get called if there is no query because the session is closed or there are no connections available. 2077 // The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil. 2078 // Unlike QueryObserver.ObserveQuery it does no reporting on rows read. 2079 ObserveBatch(context.Context, ObservedBatch) 2080} 2081 2082type ObservedConnect struct { 2083 // Host is the information about the host about to connect 2084 Host *HostInfo 2085 2086 Start time.Time // time immediately before the dial is called 2087 End time.Time // time immediately after the dial returned 2088 2089 // Err is the connection error (if any) 2090 Err error 2091} 2092 2093// ConnectObserver is the interface implemented by connect observers / stat collectors. 2094type ConnectObserver interface { 2095 // ObserveConnect gets called when a new connection to cassandra is made. 2096 ObserveConnect(ObservedConnect) 2097} 2098 2099type Error struct { 2100 Code int 2101 Message string 2102} 2103 2104func (e Error) Error() string { 2105 return e.Message 2106} 2107 2108var ( 2109 ErrNotFound = errors.New("not found") 2110 ErrUnavailable = errors.New("unavailable") 2111 ErrUnsupported = errors.New("feature not supported") 2112 ErrTooManyStmts = errors.New("too many statements") 2113 ErrUseStmt = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.") 2114 ErrSessionClosed = errors.New("session has been closed") 2115 ErrNoConnections = errors.New("gocql: no hosts available in the pool") 2116 ErrNoKeyspace = errors.New("no keyspace provided") 2117 ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist") 2118 ErrNoMetadata = errors.New("no metadata available") 2119) 2120 2121type ErrProtocol struct{ error } 2122 2123func NewErrProtocol(format string, args ...interface{}) error { 2124 return ErrProtocol{fmt.Errorf(format, args...)} 2125} 2126 2127// BatchSizeMaximum is the maximum number of statements a batch operation can have. 2128// This limit is set by cassandra and could change in the future. 2129const BatchSizeMaximum = 65535 2130