1// Copyright (c) 2012 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"bytes"
9	"context"
10	"encoding/binary"
11	"errors"
12	"fmt"
13	"io"
14	"net"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19	"unicode"
20
21	"github.com/gocql/gocql/internal/lru"
22)
23
// Session is the interface used by users to interact with the database.
//
// It's safe for concurrent use by multiple goroutines and a typical usage
// scenario is to have one global session object to interact with the
// whole Cassandra cluster.
//
// This type extends the Node interface by adding a convenient query builder
// and automatically sets a default consistency level on all operations
// that do not have a consistency level set.
//
// NOTE(review): no Node interface is visible in this file; the paragraph
// above may be stale and should be confirmed.
type Session struct {
	// cons, pageSize, prefetch and trace are the session-wide query
	// defaults. The Set* methods write them under mu and
	// Query.defaultsFromSession copies them into each new Query.
	cons                Consistency
	pageSize            int
	prefetch            float64
	// routingKeyInfoCache caches the per-statement result of routingKeyInfo.
	routingKeyInfoCache routingKeyInfoLRU
	// schemaDescriber serves KeyspaceMetadata lookups.
	schemaDescriber     *schemaDescriber
	trace               Tracer
	// Observers copied from the ClusterConfig in NewSession.
	queryObserver       QueryObserver
	batchObserver       BatchObserver
	connectObserver     ConnectObserver
	frameObserver       FrameHeaderObserver
	// hostSource is queried in init for the initial host list and partitioner.
	hostSource          *ringDescriber
	// stmtsLRU is created in NewSession with capacity cfg.MaxPreparedStmts.
	stmtsLRU            *preparedLRU

	// connCfg is the connection configuration derived from cfg by connConfig.
	connCfg *ConnConfig

	executor *queryExecutor
	pool     *policyConnPool
	policy   HostSelectionPolicy

	ring     ring
	metadata clusterMetadata

	// mu guards the session-wide defaults (cons, pageSize, prefetch, trace).
	mu sync.RWMutex

	// control is only created when cfg.disableControlConn is false.
	control *controlConn

	// event handlers
	nodeEvents   *eventDebouncer
	schemaEvents *eventDebouncer

	// ring metadata
	useSystemSchema           bool
	hasAggregatesAndFunctions bool

	cfg ClusterConfig

	// ctx is cancelled by Close (through cancel); reconnectDownedHosts
	// stops when it is done.
	ctx    context.Context
	cancel context.CancelFunc

	// closeMu guards isClosed.
	closeMu  sync.RWMutex
	isClosed bool
}
76
77var queryPool = &sync.Pool{
78	New: func() interface{} {
79		return new(Query)
80	},
81}
82
83func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
84	var hosts []*HostInfo
85	for _, hostport := range addrs {
86		resolvedHosts, err := hostInfo(hostport, defaultPort)
87		if err != nil {
88			// Try other hosts if unable to resolve DNS name
89			if _, ok := err.(*net.DNSError); ok {
90				Logger.Printf("gocql: dns error: %v\n", err)
91				continue
92			}
93			return nil, err
94		}
95
96		hosts = append(hosts, resolvedHosts...)
97	}
98	if len(hosts) == 0 {
99		return nil, errors.New("failed to resolve any of the provided hostnames")
100	}
101	return hosts, nil
102}
103
104// NewSession wraps an existing Node.
105func NewSession(cfg ClusterConfig) (*Session, error) {
106	// Check that hosts in the ClusterConfig is not empty
107	if len(cfg.Hosts) < 1 {
108		return nil, ErrNoHosts
109	}
110
111	// Check that either Authenticator is set or AuthProvider, not both
112	if cfg.Authenticator != nil && cfg.AuthProvider != nil {
113		return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
114	}
115
116	// TODO: we should take a context in here at some point
117	ctx, cancel := context.WithCancel(context.TODO())
118
119	s := &Session{
120		cons:            cfg.Consistency,
121		prefetch:        0.25,
122		cfg:             cfg,
123		pageSize:        cfg.PageSize,
124		stmtsLRU:        &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
125		connectObserver: cfg.ConnectObserver,
126		ctx:             ctx,
127		cancel:          cancel,
128	}
129
130	s.schemaDescriber = newSchemaDescriber(s)
131
132	s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent)
133	s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent)
134
135	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
136
137	s.hostSource = &ringDescriber{session: s}
138
139	if cfg.PoolConfig.HostSelectionPolicy == nil {
140		cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
141	}
142	s.pool = cfg.PoolConfig.buildPool(s)
143
144	s.policy = cfg.PoolConfig.HostSelectionPolicy
145	s.policy.Init(s)
146
147	s.executor = &queryExecutor{
148		pool:   s.pool,
149		policy: cfg.PoolConfig.HostSelectionPolicy,
150	}
151
152	s.queryObserver = cfg.QueryObserver
153	s.batchObserver = cfg.BatchObserver
154	s.connectObserver = cfg.ConnectObserver
155	s.frameObserver = cfg.FrameHeaderObserver
156
157	//Check the TLS Config before trying to connect to anything external
158	connCfg, err := connConfig(&s.cfg)
159	if err != nil {
160		//TODO: Return a typed error
161		return nil, fmt.Errorf("gocql: unable to create session: %v", err)
162	}
163	s.connCfg = connCfg
164
165	if err := s.init(); err != nil {
166		s.Close()
167		if err == ErrNoConnectionsStarted {
168			//This error used to be generated inside NewSession & returned directly
169			//Forward it on up to be backwards compatible
170			return nil, ErrNoConnectionsStarted
171		} else {
172			// TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error
173			return nil, fmt.Errorf("gocql: unable to create session: %v", err)
174		}
175	}
176
177	return s, nil
178}
179
180func (s *Session) init() error {
181	hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port)
182	if err != nil {
183		return err
184	}
185	s.ring.endpoints = hosts
186
187	if !s.cfg.disableControlConn {
188		s.control = createControlConn(s)
189		if s.cfg.ProtoVersion == 0 {
190			proto, err := s.control.discoverProtocol(hosts)
191			if err != nil {
192				return fmt.Errorf("unable to discover protocol version: %v", err)
193			} else if proto == 0 {
194				return errors.New("unable to discovery protocol version")
195			}
196
197			// TODO(zariel): we really only need this in 1 place
198			s.cfg.ProtoVersion = proto
199			s.connCfg.ProtoVersion = proto
200		}
201
202		if err := s.control.connect(hosts); err != nil {
203			return err
204		}
205
206		if !s.cfg.DisableInitialHostLookup {
207			var partitioner string
208			newHosts, partitioner, err := s.hostSource.GetHosts()
209			if err != nil {
210				return err
211			}
212			s.policy.SetPartitioner(partitioner)
213			filteredHosts := make([]*HostInfo, 0, len(newHosts))
214			for _, host := range newHosts {
215				if !s.cfg.filterHost(host) {
216					filteredHosts = append(filteredHosts, host)
217				}
218			}
219			hosts = append(hosts, filteredHosts...)
220		}
221	}
222
223	hostMap := make(map[string]*HostInfo, len(hosts))
224	for _, host := range hosts {
225		hostMap[host.ConnectAddress().String()] = host
226	}
227
228	hosts = hosts[:0]
229	// each host will increment left and decrement it after connecting and once
230	// there's none left, we'll close hostCh
231	var left int64
232	// we will receive up to len(hostMap) of messages so create a buffer so we
233	// don't end up stuck in a goroutine if we stopped listening
234	connectedCh := make(chan struct{}, len(hostMap))
235	// we add one here because we don't want to end up closing hostCh until we're
236	// done looping and the decerement code might be reached before we've looped
237	// again
238	atomic.AddInt64(&left, 1)
239	for _, host := range hostMap {
240		host := s.ring.addOrUpdate(host)
241		if s.cfg.filterHost(host) {
242			continue
243		}
244
245		atomic.AddInt64(&left, 1)
246		go func() {
247			s.pool.addHost(host)
248			connectedCh <- struct{}{}
249
250			// if there are no hosts left, then close the hostCh to unblock the loop
251			// below if its still waiting
252			if atomic.AddInt64(&left, -1) == 0 {
253				close(connectedCh)
254			}
255		}()
256
257		hosts = append(hosts, host)
258	}
259	// once we're done looping we subtract the one we initially added and check
260	// to see if we should close
261	if atomic.AddInt64(&left, -1) == 0 {
262		close(connectedCh)
263	}
264
265	// before waiting for them to connect, add them all to the policy so we can
266	// utilize efficiencies by calling AddHosts if the policy supports it
267	type bulkAddHosts interface {
268		AddHosts([]*HostInfo)
269	}
270	if v, ok := s.policy.(bulkAddHosts); ok {
271		v.AddHosts(hosts)
272	} else {
273		for _, host := range hosts {
274			s.policy.AddHost(host)
275		}
276	}
277
278	readyPolicy, _ := s.policy.(ReadyPolicy)
279	// now loop over connectedCh until it's closed (meaning we've connected to all)
280	// or until the policy says we're ready
281	for range connectedCh {
282		if readyPolicy != nil && readyPolicy.Ready() {
283			break
284		}
285	}
286
287	// TODO(zariel): we probably dont need this any more as we verify that we
288	// can connect to one of the endpoints supplied by using the control conn.
289	// See if there are any connections in the pool
290	if s.cfg.ReconnectInterval > 0 {
291		go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
292	}
293
294	// If we disable the initial host lookup, we need to still check if the
295	// cluster is using the newer system schema or not... however, if control
296	// connection is disable, we really have no choice, so we just make our
297	// best guess...
298	if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
299		newer, _ := checkSystemSchema(s.control)
300		s.useSystemSchema = newer
301	} else {
302		version := s.ring.rrHost().Version()
303		s.useSystemSchema = version.AtLeast(3, 0, 0)
304		s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)
305	}
306
307	if s.pool.Size() == 0 {
308		return ErrNoConnectionsStarted
309	}
310
311	// Invoke KeyspaceChanged to let the policy cache the session keyspace
312	// parameters. This is used by tokenAwareHostPolicy to discover replicas.
313	if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
314		s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace})
315	}
316
317	return nil
318}
319
320// AwaitSchemaAgreement will wait until schema versions across all nodes in the
321// cluster are the same (as seen from the point of view of the control connection).
322// The maximum amount of time this takes is governed
323// by the MaxWaitSchemaAgreement setting in the configuration (default: 60s).
324// AwaitSchemaAgreement returns an error in case schema versions are not the same
325// after the timeout specified in MaxWaitSchemaAgreement elapses.
326func (s *Session) AwaitSchemaAgreement(ctx context.Context) error {
327	if s.cfg.disableControlConn {
328		return errNoControl
329	}
330	return s.control.withConn(func(conn *Conn) *Iter {
331		return &Iter{err: conn.awaitSchemaAgreement(ctx)}
332	}).err
333}
334
335func (s *Session) reconnectDownedHosts(intv time.Duration) {
336	reconnectTicker := time.NewTicker(intv)
337	defer reconnectTicker.Stop()
338
339	for {
340		select {
341		case <-reconnectTicker.C:
342			hosts := s.ring.allHosts()
343
344			// Print session.ring for debug.
345			if gocqlDebug {
346				buf := bytes.NewBufferString("Session.ring:")
347				for _, h := range hosts {
348					buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
349				}
350				Logger.Println(buf.String())
351			}
352
353			for _, h := range hosts {
354				if h.IsUp() {
355					continue
356				}
357				// we let the pool call handleNodeUp to change the host state
358				s.pool.addHost(h)
359			}
360		case <-s.ctx.Done():
361			return
362		}
363	}
364}
365
366// SetConsistency sets the default consistency level for this session. This
367// setting can also be changed on a per-query basis and the default value
368// is Quorum.
369func (s *Session) SetConsistency(cons Consistency) {
370	s.mu.Lock()
371	s.cons = cons
372	s.mu.Unlock()
373}
374
375// SetPageSize sets the default page size for this session. A value <= 0 will
376// disable paging. This setting can also be changed on a per-query basis.
377func (s *Session) SetPageSize(n int) {
378	s.mu.Lock()
379	s.pageSize = n
380	s.mu.Unlock()
381}
382
383// SetPrefetch sets the default threshold for pre-fetching new pages. If
384// there are only p*pageSize rows remaining, the next page will be requested
385// automatically. This value can also be changed on a per-query basis and
386// the default value is 0.25.
387func (s *Session) SetPrefetch(p float64) {
388	s.mu.Lock()
389	s.prefetch = p
390	s.mu.Unlock()
391}
392
393// SetTrace sets the default tracer for this session. This setting can also
394// be changed on a per-query basis.
395func (s *Session) SetTrace(trace Tracer) {
396	s.mu.Lock()
397	s.trace = trace
398	s.mu.Unlock()
399}
400
401// Query generates a new query object for interacting with the database.
402// Further details of the query may be tweaked using the resulting query
403// value before the query is executed. Query is automatically prepared
404// if it has not previously been executed.
405func (s *Session) Query(stmt string, values ...interface{}) *Query {
406	qry := queryPool.Get().(*Query)
407	qry.session = s
408	qry.stmt = stmt
409	qry.values = values
410	qry.defaultsFromSession()
411	return qry
412}
413
// QueryInfo is the statement metadata handed to the binding callback that
// was registered through Session.Bind.
//
// NOTE(review): the field meanings below are inferred from their names and
// types only; confirm against the code that populates QueryInfo.
type QueryInfo struct {
	Id          []byte       // presumably the prepared statement identifier
	Args        []ColumnInfo // presumably the bind-argument columns
	Rval        []ColumnInfo // presumably the result columns
	PKeyColumns []int        // presumably indexes of partition-key arguments
}
420
421// Bind generates a new query object based on the query statement passed in.
422// The query is automatically prepared if it has not previously been executed.
423// The binding callback allows the application to define which query argument
424// values will be marshalled as part of the query execution.
425// During execution, the meta data of the prepared query will be routed to the
426// binding callback, which is responsible for producing the query argument values.
427func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query {
428	qry := queryPool.Get().(*Query)
429	qry.session = s
430	qry.stmt = stmt
431	qry.binding = b
432	qry.defaultsFromSession()
433	return qry
434}
435
436// Close closes all connections. The session is unusable after this
437// operation.
438func (s *Session) Close() {
439
440	s.closeMu.Lock()
441	defer s.closeMu.Unlock()
442	if s.isClosed {
443		return
444	}
445	s.isClosed = true
446
447	if s.pool != nil {
448		s.pool.Close()
449	}
450
451	if s.control != nil {
452		s.control.close()
453	}
454
455	if s.nodeEvents != nil {
456		s.nodeEvents.stop()
457	}
458
459	if s.schemaEvents != nil {
460		s.schemaEvents.stop()
461	}
462
463	if s.cancel != nil {
464		s.cancel()
465	}
466}
467
468func (s *Session) Closed() bool {
469	s.closeMu.RLock()
470	closed := s.isClosed
471	s.closeMu.RUnlock()
472	return closed
473}
474
475func (s *Session) executeQuery(qry *Query) (it *Iter) {
476	// fail fast
477	if s.Closed() {
478		return &Iter{err: ErrSessionClosed}
479	}
480
481	iter, err := s.executor.executeQuery(qry)
482	if err != nil {
483		return &Iter{err: err}
484	}
485	if iter == nil {
486		panic("nil iter")
487	}
488
489	return iter
490}
491
492func (s *Session) removeHost(h *HostInfo) {
493	s.policy.RemoveHost(h)
494	s.pool.removeHost(h.ConnectAddress())
495	s.ring.removeHost(h.ConnectAddress())
496}
497
498// KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
499func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
500	// fail fast
501	if s.Closed() {
502		return nil, ErrSessionClosed
503	} else if keyspace == "" {
504		return nil, ErrNoKeyspace
505	}
506
507	return s.schemaDescriber.getSchema(keyspace)
508}
509
510func (s *Session) getConn() *Conn {
511	hosts := s.ring.allHosts()
512	for _, host := range hosts {
513		if !host.IsUp() {
514			continue
515		}
516
517		pool, ok := s.pool.getPool(host)
518		if !ok {
519			continue
520		} else if conn := pool.Pick(); conn != nil {
521			return conn
522		}
523	}
524
525	return nil
526}
527
528// returns routing key indexes and type info
529func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) {
530	s.routingKeyInfoCache.mu.Lock()
531
532	entry, cached := s.routingKeyInfoCache.lru.Get(stmt)
533	if cached {
534		// done accessing the cache
535		s.routingKeyInfoCache.mu.Unlock()
536		// the entry is an inflight struct similar to that used by
537		// Conn to prepare statements
538		inflight := entry.(*inflightCachedEntry)
539
540		// wait for any inflight work
541		inflight.wg.Wait()
542
543		if inflight.err != nil {
544			return nil, inflight.err
545		}
546
547		key, _ := inflight.value.(*routingKeyInfo)
548
549		return key, nil
550	}
551
552	// create a new inflight entry while the data is created
553	inflight := new(inflightCachedEntry)
554	inflight.wg.Add(1)
555	defer inflight.wg.Done()
556	s.routingKeyInfoCache.lru.Add(stmt, inflight)
557	s.routingKeyInfoCache.mu.Unlock()
558
559	var (
560		info         *preparedStatment
561		partitionKey []*ColumnMetadata
562	)
563
564	conn := s.getConn()
565	if conn == nil {
566		// TODO: better error?
567		inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
568		return nil, inflight.err
569	}
570
571	// get the query info for the statement
572	info, inflight.err = conn.prepareStatement(ctx, stmt, nil)
573	if inflight.err != nil {
574		// don't cache this error
575		s.routingKeyInfoCache.Remove(stmt)
576		return nil, inflight.err
577	}
578
579	// TODO: it would be nice to mark hosts here but as we are not using the policies
580	// to fetch hosts we cant
581
582	if info.request.colCount == 0 {
583		// no arguments, no routing key, and no error
584		return nil, nil
585	}
586
587	if len(info.request.pkeyColumns) > 0 {
588		// proto v4 dont need to calculate primary key columns
589		types := make([]TypeInfo, len(info.request.pkeyColumns))
590		for i, col := range info.request.pkeyColumns {
591			types[i] = info.request.columns[col].TypeInfo
592		}
593
594		routingKeyInfo := &routingKeyInfo{
595			indexes: info.request.pkeyColumns,
596			types:   types,
597		}
598
599		inflight.value = routingKeyInfo
600		return routingKeyInfo, nil
601	}
602
603	// get the table metadata
604	table := info.request.columns[0].Table
605
606	var keyspaceMetadata *KeyspaceMetadata
607	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
608	if inflight.err != nil {
609		// don't cache this error
610		s.routingKeyInfoCache.Remove(stmt)
611		return nil, inflight.err
612	}
613
614	tableMetadata, found := keyspaceMetadata.Tables[table]
615	if !found {
616		// unlikely that the statement could be prepared and the metadata for
617		// the table couldn't be found, but this may indicate either a bug
618		// in the metadata code, or that the table was just dropped.
619		inflight.err = ErrNoMetadata
620		// don't cache this error
621		s.routingKeyInfoCache.Remove(stmt)
622		return nil, inflight.err
623	}
624
625	partitionKey = tableMetadata.PartitionKey
626
627	size := len(partitionKey)
628	routingKeyInfo := &routingKeyInfo{
629		indexes: make([]int, size),
630		types:   make([]TypeInfo, size),
631	}
632
633	for keyIndex, keyColumn := range partitionKey {
634		// set an indicator for checking if the mapping is missing
635		routingKeyInfo.indexes[keyIndex] = -1
636
637		// find the column in the query info
638		for argIndex, boundColumn := range info.request.columns {
639			if keyColumn.Name == boundColumn.Name {
640				// there may be many such bound columns, pick the first
641				routingKeyInfo.indexes[keyIndex] = argIndex
642				routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
643				break
644			}
645		}
646
647		if routingKeyInfo.indexes[keyIndex] == -1 {
648			// missing a routing key column mapping
649			// no routing key, and no error
650			return nil, nil
651		}
652	}
653
654	// cache this result
655	inflight.value = routingKeyInfo
656
657	return routingKeyInfo, nil
658}
659
660func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter {
661	return conn.executeBatch(ctx, b)
662}
663
664func (s *Session) executeBatch(batch *Batch) *Iter {
665	// fail fast
666	if s.Closed() {
667		return &Iter{err: ErrSessionClosed}
668	}
669
670	// Prevent the execution of the batch if greater than the limit
671	// Currently batches have a limit of 65536 queries.
672	// https://datastax-oss.atlassian.net/browse/JAVA-229
673	if batch.Size() > BatchSizeMaximum {
674		return &Iter{err: ErrTooManyStmts}
675	}
676
677	iter, err := s.executor.executeQuery(batch)
678	if err != nil {
679		return &Iter{err: err}
680	}
681
682	return iter
683}
684
685// ExecuteBatch executes a batch operation and returns nil if successful
686// otherwise an error is returned describing the failure.
687func (s *Session) ExecuteBatch(batch *Batch) error {
688	iter := s.executeBatch(batch)
689	return iter.Close()
690}
691
692// ExecuteBatchCAS executes a batch operation and returns true if successful and
693// an iterator (to scan additional rows if more than one conditional statement)
694// was sent.
695// Further scans on the interator must also remember to include
696// the applied boolean as the first argument to *Iter.Scan
697func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) {
698	iter = s.executeBatch(batch)
699	if err := iter.checkErrAndNotFound(); err != nil {
700		iter.Close()
701		return false, nil, err
702	}
703
704	if len(iter.Columns()) > 1 {
705		dest = append([]interface{}{&applied}, dest...)
706		iter.Scan(dest...)
707	} else {
708		iter.Scan(&applied)
709	}
710
711	return applied, iter, nil
712}
713
714// MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS,
715// however it accepts a map rather than a list of arguments for the initial
716// scan.
717func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) {
718	iter = s.executeBatch(batch)
719	if err := iter.checkErrAndNotFound(); err != nil {
720		iter.Close()
721		return false, nil, err
722	}
723	iter.MapScan(dest)
724	applied = dest["[applied]"].(bool)
725	delete(dest, "[applied]")
726
727	// we usually close here, but instead of closing, just returin an error
728	// if MapScan failed. Although Close just returns err, using Close
729	// here might be confusing as we are not actually closing the iter
730	return applied, iter, iter.err
731}
732
// hostMetrics holds the per-host execution counters of a single query.
// Values handed out by queryMetrics are copies, so they can be read without
// holding the queryMetrics lock.
type hostMetrics struct {
	// Attempts is count of how many times this query has been attempted for this host.
	// An attempt is either a retry or fetching next page of results.
	Attempts int

	// TotalLatency is the sum of attempt latencies for this host in nanoseconds.
	TotalLatency int64
}
741
// queryMetrics aggregates the execution metrics of one query across hosts.
type queryMetrics struct {
	// l guards m and totalAttempts.
	l sync.RWMutex
	// m maps a host's connect address (in string form) to its metrics.
	m map[string]*hostMetrics
	// totalAttempts is total number of attempts.
	// Equal to sum of all hostMetrics' Attempts.
	totalAttempts int
}
749
750// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data.
751func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
752	qm := &queryMetrics{m: m}
753	for _, hm := range qm.m {
754		qm.totalAttempts += hm.Attempts
755	}
756	return qm
757}
758
759// hostMetrics returns a snapshot of metrics for given host.
760// If the metrics for host don't exist, they are created.
761func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
762	qm.l.Lock()
763	metrics := qm.hostMetricsLocked(host)
764	copied := new(hostMetrics)
765	*copied = *metrics
766	qm.l.Unlock()
767	return copied
768}
769
770// hostMetricsLocked gets or creates host metrics for given host.
771// It must be called only while holding qm.l lock.
772func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
773	metrics, exists := qm.m[host.ConnectAddress().String()]
774	if !exists {
775		// if the host is not in the map, it means it's been accessed for the first time
776		metrics = &hostMetrics{}
777		qm.m[host.ConnectAddress().String()] = metrics
778	}
779
780	return metrics
781}
782
783// attempts returns the number of times the query was executed.
784func (qm *queryMetrics) attempts() int {
785	qm.l.Lock()
786	attempts := qm.totalAttempts
787	qm.l.Unlock()
788	return attempts
789}
790
791func (qm *queryMetrics) latency() int64 {
792	qm.l.Lock()
793	var (
794		attempts int
795		latency  int64
796	)
797	for _, metric := range qm.m {
798		attempts += metric.Attempts
799		latency += metric.TotalLatency
800	}
801	qm.l.Unlock()
802	if attempts > 0 {
803		return latency / int64(attempts)
804	}
805	return 0
806}
807
808// attempt adds given number of attempts and latency for given host.
809// It returns previous total attempts.
810// If needsHostMetrics is true, a copy of updated hostMetrics is returned.
811func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration,
812	host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) {
813	qm.l.Lock()
814
815	totalAttempts := qm.totalAttempts
816	qm.totalAttempts += addAttempts
817
818	updateHostMetrics := qm.hostMetricsLocked(host)
819	updateHostMetrics.Attempts += addAttempts
820	updateHostMetrics.TotalLatency += addLatency.Nanoseconds()
821
822	var hostMetricsCopy *hostMetrics
823	if needsHostMetrics {
824		hostMetricsCopy = new(hostMetrics)
825		*hostMetricsCopy = *updateHostMetrics
826	}
827
828	qm.l.Unlock()
829	return totalAttempts, hostMetricsCopy
830}
831
// Query represents a CQL statement that can be executed.
//
// Queries are obtained from Session.Query or Session.Bind, which fill in the
// session defaults (see defaultsFromSession); the builder-style methods on
// Query override individual settings.
type Query struct {
	// stmt and values are set by Session.Query; Session.Bind sets stmt and
	// binding instead.
	stmt                  string
	values                []interface{}
	cons                  Consistency
	pageSize              int
	// routingKey is the explicit key set through RoutingKey; when nil,
	// GetRoutingKey tries to derive one.
	routingKey            []byte
	pageState             []byte
	prefetch              float64
	trace                 Tracer
	observer              QueryObserver
	session               *Session
	conn                  *Conn
	rt                    RetryPolicy
	spec                  SpeculativeExecutionPolicy
	binding               func(q *QueryInfo) ([]interface{}, error)
	serialCons            SerialConsistency
	defaultTimestamp      bool
	// defaultTimestampValue is set by WithTimestamp.
	defaultTimestampValue int64
	disableSkipMetadata   bool
	// context is set by WithContext; when nil, Context falls back to
	// context.Background().
	context               context.Context
	idempotent            bool
	customPayload         map[string][]byte
	// metrics is replaced with a fresh, empty queryMetrics by
	// defaultsFromSession.
	metrics               *queryMetrics

	disableAutoPage bool

	// getKeyspace is field so that it can be overridden in tests
	getKeyspace func() string

	// used by control conn queries to prevent triggering a write to systems
	// tables in AWS MCS see
	// NOTE(review): the sentence above is truncated in the original source.
	skipPrepare bool
}
866
867func (q *Query) defaultsFromSession() {
868	s := q.session
869
870	s.mu.RLock()
871	q.cons = s.cons
872	q.pageSize = s.pageSize
873	q.trace = s.trace
874	q.observer = s.queryObserver
875	q.prefetch = s.prefetch
876	q.rt = s.cfg.RetryPolicy
877	q.serialCons = s.cfg.SerialConsistency
878	q.defaultTimestamp = s.cfg.DefaultTimestamp
879	q.idempotent = s.cfg.DefaultIdempotence
880	q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
881
882	q.spec = &NonSpeculativeExecution{}
883	s.mu.RUnlock()
884}
885
886// Statement returns the statement that was used to generate this query.
887func (q Query) Statement() string {
888	return q.stmt
889}
890
891// String implements the stringer interface.
892func (q Query) String() string {
893	return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons)
894}
895
896//Attempts returns the number of times the query was executed.
897func (q *Query) Attempts() int {
898	return q.metrics.attempts()
899}
900
901func (q *Query) AddAttempts(i int, host *HostInfo) {
902	q.metrics.attempt(i, 0, host, false)
903}
904
905//Latency returns the average amount of nanoseconds per attempt of the query.
906func (q *Query) Latency() int64 {
907	return q.metrics.latency()
908}
909
910func (q *Query) AddLatency(l int64, host *HostInfo) {
911	q.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false)
912}
913
914// Consistency sets the consistency level for this query. If no consistency
915// level have been set, the default consistency level of the cluster
916// is used.
917func (q *Query) Consistency(c Consistency) *Query {
918	q.cons = c
919	return q
920}
921
922// GetConsistency returns the currently configured consistency level for
923// the query.
924func (q *Query) GetConsistency() Consistency {
925	return q.cons
926}
927
// SetConsistency sets the consistency level for this query. It is the same
// as Consistency but without a return value.
func (q *Query) SetConsistency(c Consistency) {
	q.cons = c
}
932
// CustomPayload sets the custom payload for this query and returns the
// query for chaining. The map is stored as-is, not copied.
func (q *Query) CustomPayload(customPayload map[string][]byte) *Query {
	q.customPayload = customPayload
	return q
}
938
939func (q *Query) Context() context.Context {
940	if q.context == nil {
941		return context.Background()
942	}
943	return q.context
944}
945
// Trace enables tracing of this query with the given tracer and returns the
// query for chaining. Look at the documentation of the Tracer interface to
// learn more about tracing.
func (q *Query) Trace(trace Tracer) *Query {
	q.trace = trace
	return q
}
952
// Observer enables query-level observer on this query and returns the query
// for chaining. It replaces the observer inherited from the session.
// The provided observer will be called every time this query is executed.
func (q *Query) Observer(observer QueryObserver) *Query {
	q.observer = observer
	return q
}
959
// PageSize will tell the iterator to fetch the result in pages of size n
// and returns the query for chaining.
// This is useful for iterating over large result sets, but setting the
// page size too low might decrease the performance. This feature is only
// available in Cassandra 2 and onwards.
func (q *Query) PageSize(n int) *Query {
	q.pageSize = n
	return q
}
968
// DefaultTimestamp will enable the with default timestamp flag on the query
// and returns the query for chaining.
// If enabled, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
//
// Only available on protocol >= 3
func (q *Query) DefaultTimestamp(enable bool) *Query {
	q.defaultTimestamp = enable
	return q
}
979
980// WithTimestamp will enable the with default timestamp flag on the query
981// like DefaultTimestamp does. But also allows to define value for timestamp.
982// It works the same way as USING TIMESTAMP in the query itself, but
983// should not break prepared query optimization.
984//
985// Only available on protocol >= 3
986func (q *Query) WithTimestamp(timestamp int64) *Query {
987	q.DefaultTimestamp(true)
988	q.defaultTimestampValue = timestamp
989	return q
990}
991
// RoutingKey sets the routing key to use when a token aware connection
// pool is used to optimize the routing of this query, and returns the query
// for chaining. An explicitly set key is returned unchanged by GetRoutingKey.
func (q *Query) RoutingKey(routingKey []byte) *Query {
	q.routingKey = routingKey
	return q
}
998
999func (q *Query) withContext(ctx context.Context) ExecutableQuery {
1000	// I really wish go had covariant types
1001	return q.WithContext(ctx)
1002}
1003
1004// WithContext returns a shallow copy of q with its context
1005// set to ctx.
1006//
1007// The provided context controls the entire lifetime of executing a
1008// query, queries will be canceled and return once the context is
1009// canceled.
1010func (q *Query) WithContext(ctx context.Context) *Query {
1011	q2 := *q
1012	q2.context = ctx
1013	return &q2
1014}
1015
// Cancel does nothing.
//
// Deprecated: cancel the context passed to WithContext instead.
func (q *Query) Cancel() {
	// TODO: delete
}
1020
1021func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
1022	return conn.executeQuery(ctx, q)
1023}
1024
1025func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
1026	latency := end.Sub(start)
1027	attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil)
1028
1029	if q.observer != nil {
1030		q.observer.ObserveQuery(q.Context(), ObservedQuery{
1031			Keyspace:  keyspace,
1032			Statement: q.stmt,
1033			Start:     start,
1034			End:       end,
1035			Rows:      iter.numRows,
1036			Host:      host,
1037			Metrics:   metricsForHost,
1038			Err:       iter.err,
1039			Attempt:   attempt,
1040		})
1041	}
1042}
1043
1044func (q *Query) retryPolicy() RetryPolicy {
1045	return q.rt
1046}
1047
1048// Keyspace returns the keyspace the query will be executed against.
1049func (q *Query) Keyspace() string {
1050	if q.getKeyspace != nil {
1051		return q.getKeyspace()
1052	}
1053	if q.session == nil {
1054		return ""
1055	}
1056	// TODO(chbannis): this should be parsed from the query or we should let
1057	// this be set by users.
1058	return q.session.cfg.Keyspace
1059}
1060
1061// GetRoutingKey gets the routing key to use for routing this query. If
1062// a routing key has not been explicitly set, then the routing key will
1063// be constructed if possible using the keyspace's schema and the query
1064// info for this query statement. If the routing key cannot be determined
1065// then nil will be returned with no error. On any error condition,
1066// an error description will be returned.
1067func (q *Query) GetRoutingKey() ([]byte, error) {
1068	if q.routingKey != nil {
1069		return q.routingKey, nil
1070	} else if q.binding != nil && len(q.values) == 0 {
1071		// If this query was created using session.Bind we wont have the query
1072		// values yet, so we have to pass down to the next policy.
1073		// TODO: Remove this and handle this case
1074		return nil, nil
1075	}
1076
1077	// try to determine the routing key
1078	routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt)
1079	if err != nil {
1080		return nil, err
1081	}
1082
1083	return createRoutingKey(routingKeyInfo, q.values)
1084}
1085
1086func (q *Query) shouldPrepare() bool {
1087
1088	stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool {
1089		return unicode.IsSpace(r) || r == ';'
1090	}), unicode.IsSpace)
1091
1092	var stmtType string
1093	if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 {
1094		stmtType = strings.ToLower(stmt[:n])
1095	}
1096	if stmtType == "begin" {
1097		if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 {
1098			stmtType = strings.ToLower(stmt[n+1:])
1099		}
1100	}
1101	switch stmtType {
1102	case "select", "insert", "update", "delete", "batch":
1103		return true
1104	}
1105	return false
1106}
1107
// Prefetch sets the threshold for pre-fetching new pages for this query. If
// there are only p*pageSize rows remaining, the next page will be requested
// automatically.
func (q *Query) Prefetch(p float64) *Query {
	q.prefetch = p
	return q
}
1115
// RetryPolicy sets the policy to use when retrying the query and returns q
// so that calls can be chained.
func (q *Query) RetryPolicy(r RetryPolicy) *Query {
	q.rt = r
	return q
}
1121
// SetSpeculativeExecutionPolicy sets the speculative execution policy for
// the query and returns q so that calls can be chained.
func (q *Query) SetSpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Query {
	q.spec = sp
	return q
}
1127
// speculativeExecutionPolicy returns the speculative execution policy
// currently set on the query (q.spec).
func (q *Query) speculativeExecutionPolicy() SpeculativeExecutionPolicy {
	return q.spec
}
1132
// IsIdempotent returns whether the query is marked as idempotent.
// Non-idempotent queries won't be retried.
// See "Retries and speculative execution" in package docs for more details.
func (q *Query) IsIdempotent() bool {
	return q.idempotent
}
1139
// Idempotent marks the query as being idempotent or not depending on
// the value, and returns q so that calls can be chained.
// Non-idempotent queries won't be retried.
// See "Retries and speculative execution" in package docs for more details.
func (q *Query) Idempotent(value bool) *Query {
	q.idempotent = value
	return q
}
1148
// Bind sets the query arguments of the query. This can also be used to rebind
// new query arguments to an existing query instance.
//
// Binding also clears any paging state stored on the query.
func (q *Query) Bind(v ...interface{}) *Query {
	q.values = v
	q.pageState = nil
	return q
}
1156
// SerialConsistency sets the consistency level for the
// serial phase of conditional updates. That consistency can only be
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
// SERIAL. This option will be ignored for anything other than a
// conditional update/insert.
func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
	q.serialCons = cons
	return q
}
1166
// PageState sets the paging state for the query so that paging resumes from
// a specific point. Setting this disables automatic paging for this query
// (disableAutoPage is set), and it must be used for all subsequent pages.
func (q *Query) PageState(state []byte) *Query {
	q.pageState = state
	q.disableAutoPage = true
	return q
}
1175
// NoSkipMetadata will override the internal result metadata cache so that the driver does not
// send skip_metadata for queries. This means that the result will always contain
// the metadata to parse the rows and will not reuse the metadata from the prepared
// statement. This should only be used to work around cassandra bugs, such as when using
// CAS operations which do not end in Cas.
//
// See https://issues.apache.org/jira/browse/CASSANDRA-11099
// https://github.com/gocql/gocql/issues/612
func (q *Query) NoSkipMetadata() *Query {
	q.disableSkipMetadata = true
	return q
}
1188
// Exec executes the query without returning any rows. The returned error is
// whatever closing the query's iterator reports.
func (q *Query) Exec() error {
	return q.Iter().Close()
}
1193
// isUseStatement reports whether stmt is a USE statement, i.e. whether its
// first three non-whitespace characters spell "use" in any letter case.
//
// Leading whitespace is skipped so that statements such as " USE ks" or
// "\nuse ks" are recognized as well.
func isUseStatement(stmt string) bool {
	stmt = strings.TrimLeftFunc(stmt, unicode.IsSpace)
	return len(stmt) >= 3 && strings.EqualFold(stmt[:3], "use")
}
1201
1202// Iter executes the query and returns an iterator capable of iterating
1203// over all results.
1204func (q *Query) Iter() *Iter {
1205	if isUseStatement(q.stmt) {
1206		return &Iter{err: ErrUseStmt}
1207	}
1208	// if the query was specifically run on a connection then re-use that
1209	// connection when fetching the next results
1210	if q.conn != nil {
1211		return q.conn.executeQuery(q.Context(), q)
1212	}
1213	return q.session.executeQuery(q)
1214}
1215
1216// MapScan executes the query, copies the columns of the first selected
1217// row into the map pointed at by m and discards the rest. If no rows
1218// were selected, ErrNotFound is returned.
1219func (q *Query) MapScan(m map[string]interface{}) error {
1220	iter := q.Iter()
1221	if err := iter.checkErrAndNotFound(); err != nil {
1222		return err
1223	}
1224	iter.MapScan(m)
1225	return iter.Close()
1226}
1227
1228// Scan executes the query, copies the columns of the first selected
1229// row into the values pointed at by dest and discards the rest. If no rows
1230// were selected, ErrNotFound is returned.
1231func (q *Query) Scan(dest ...interface{}) error {
1232	iter := q.Iter()
1233	if err := iter.checkErrAndNotFound(); err != nil {
1234		return err
1235	}
1236	iter.Scan(dest...)
1237	return iter.Close()
1238}
1239
1240// ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
1241// statement containing an IF clause). If the transaction fails because
1242// the existing values did not match, the previous values will be stored
1243// in dest.
1244//
1245// As for INSERT .. IF NOT EXISTS, previous values will be returned as if
1246// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to
1247// column mismatching. Use MapScanCAS to capture them safely.
1248func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) {
1249	q.disableSkipMetadata = true
1250	iter := q.Iter()
1251	if err := iter.checkErrAndNotFound(); err != nil {
1252		return false, err
1253	}
1254	if len(iter.Columns()) > 1 {
1255		dest = append([]interface{}{&applied}, dest...)
1256		iter.Scan(dest...)
1257	} else {
1258		iter.Scan(&applied)
1259	}
1260	return applied, iter.Close()
1261}
1262
1263// MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
1264// statement containing an IF clause). If the transaction fails because
1265// the existing values did not match, the previous values will be stored
1266// in dest map.
1267//
1268// As for INSERT .. IF NOT EXISTS, previous values will be returned as if
1269// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to
1270// column mismatching. MapScanCAS is added to capture them safely.
1271func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) {
1272	q.disableSkipMetadata = true
1273	iter := q.Iter()
1274	if err := iter.checkErrAndNotFound(); err != nil {
1275		return false, err
1276	}
1277	iter.MapScan(dest)
1278	applied = dest["[applied]"].(bool)
1279	delete(dest, "[applied]")
1280
1281	return applied, iter.Close()
1282}
1283
// Release resets the query and puts it back into the pool of queries.
// Released Queries cannot be reused.
//
// Example:
// 		qry := session.Query("SELECT * FROM my_table")
// 		qry.Exec()
// 		qry.Release()
func (q *Query) Release() {
	q.reset()
	queryPool.Put(q)
}
1295
// reset zeroes out all fields of a query so that it can be safely pooled.
// It overwrites *q with the zero Query value.
func (q *Query) reset() {
	*q = Query{}
}
1300
// Iter represents an iterator that can be used to iterate over all rows that
// were returned by a query. The iterator might send additional queries to the
// database during the iteration if paging was enabled.
//
// Field notes:
//   - err is the error reported by Close; once set, Scan stops returning rows.
//   - pos counts the rows already consumed from the current page and numRows
//     is the number of rows in that page.
//   - meta carries the result metadata (columns, paging state).
//   - next, when non-nil, fetches the following page.
//   - host is the host the query was sent to.
//   - framer supplies the raw column bytes; Close sets it to nil.
//   - closed is switched from 0 to 1 atomically by the first call to Close.
type Iter struct {
	err     error
	pos     int
	meta    resultMetadata
	numRows int
	next    *nextIter
	host    *HostInfo

	framer *framer
	closed int32
}
1315
// Host returns the host which the query was sent to.
func (iter *Iter) Host() *HostInfo {
	return iter.host
}
1320
// Columns returns the name and type of the selected columns, as recorded in
// the iterator's result metadata. The slice is returned without copying.
func (iter *Iter) Columns() []ColumnInfo {
	return iter.meta.columns
}
1325
// Scanner iterates over the rows of a result one at a time, in a manner
// similar to database/sql: call Next, then Scan, and finally Err.
type Scanner interface {
	// Next advances the row pointer to point at the next row. The row is valid until
	// the next call of Next. It returns true if there is a row which is available to be
	// scanned into with Scan.
	// Next must be called before every call to Scan.
	Next() bool

	// Scan copies the current row's columns into dest. If the length of dest does not equal
	// the number of columns returned in the row an error is returned. If an error is encountered
	// when unmarshalling a column into the value in dest an error is returned and the row is invalidated
	// until the next call to Next.
	// Next must be called before calling Scan; if it is not, an error is returned.
	Scan(...interface{}) error

	// Err returns the error, if there was one, that prevented iteration from completing.
	// Err will also release resources held by the iterator; the Scanner should not be used after it is called.
	Err() error
}
1344
// iterScanner is the Scanner implementation returned by Iter.Scanner.
//
// iter is the page currently being read, cols holds the raw bytes of the
// current row (one entry per result column) and valid records whether Next
// has produced a row that Scan has not consumed yet.
type iterScanner struct {
	iter  *Iter
	cols  [][]byte
	valid bool
}
1350
1351func (is *iterScanner) Next() bool {
1352	iter := is.iter
1353	if iter.err != nil {
1354		return false
1355	}
1356
1357	if iter.pos >= iter.numRows {
1358		if iter.next != nil {
1359			is.iter = iter.next.fetch()
1360			return is.Next()
1361		}
1362		return false
1363	}
1364
1365	for i := 0; i < len(is.cols); i++ {
1366		col, err := iter.readColumn()
1367		if err != nil {
1368			iter.err = err
1369			return false
1370		}
1371		is.cols[i] = col
1372	}
1373	iter.pos++
1374	is.valid = true
1375
1376	return true
1377}
1378
1379func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) {
1380	if dest[0] == nil {
1381		return 1, nil
1382	}
1383
1384	if col.TypeInfo.Type() == TypeTuple {
1385		// this will panic, actually a bug, please report
1386		tuple := col.TypeInfo.(TupleTypeInfo)
1387
1388		count := len(tuple.Elems)
1389		// here we pass in a slice of the struct which has the number number of
1390		// values as elements in the tuple
1391		if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil {
1392			return 0, err
1393		}
1394		return count, nil
1395	} else {
1396		if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil {
1397			return 0, err
1398		}
1399		return 1, nil
1400	}
1401}
1402
1403func (is *iterScanner) Scan(dest ...interface{}) error {
1404	if !is.valid {
1405		return errors.New("gocql: Scan called without calling Next")
1406	}
1407
1408	iter := is.iter
1409	// currently only support scanning into an expand tuple, such that its the same
1410	// as scanning in more values from a single column
1411	if len(dest) != iter.meta.actualColCount {
1412		return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
1413	}
1414
1415	// i is the current position in dest, could posible replace it and just use
1416	// slices of dest
1417	i := 0
1418	var err error
1419	for _, col := range iter.meta.columns {
1420		var n int
1421		n, err = scanColumn(is.cols[i], col, dest[i:])
1422		if err != nil {
1423			break
1424		}
1425		i += n
1426	}
1427
1428	is.valid = false
1429	return err
1430}
1431
1432func (is *iterScanner) Err() error {
1433	iter := is.iter
1434	is.iter = nil
1435	is.cols = nil
1436	is.valid = false
1437	return iter.Close()
1438}
1439
1440// Scanner returns a row Scanner which provides an interface to scan rows in a manner which is
1441// similar to database/sql. The iter should NOT be used again after calling this method.
1442func (iter *Iter) Scanner() Scanner {
1443	if iter == nil {
1444		return nil
1445	}
1446
1447	return &iterScanner{iter: iter, cols: make([][]byte, len(iter.meta.columns))}
1448}
1449
// readColumn reads the raw bytes of the next column value from the
// iterator's framer.
func (iter *Iter) readColumn() ([]byte, error) {
	return iter.framer.readBytesInternal()
}
1453
1454// Scan consumes the next row of the iterator and copies the columns of the
1455// current row into the values pointed at by dest. Use nil as a dest value
1456// to skip the corresponding column. Scan might send additional queries
1457// to the database to retrieve the next set of rows if paging was enabled.
1458//
1459// Scan returns true if the row was successfully unmarshaled or false if the
1460// end of the result set was reached or if an error occurred. Close should
1461// be called afterwards to retrieve any potential errors.
1462func (iter *Iter) Scan(dest ...interface{}) bool {
1463	if iter.err != nil {
1464		return false
1465	}
1466
1467	if iter.pos >= iter.numRows {
1468		if iter.next != nil {
1469			*iter = *iter.next.fetch()
1470			return iter.Scan(dest...)
1471		}
1472		return false
1473	}
1474
1475	if iter.next != nil && iter.pos >= iter.next.pos {
1476		iter.next.fetchAsync()
1477	}
1478
1479	// currently only support scanning into an expand tuple, such that its the same
1480	// as scanning in more values from a single column
1481	if len(dest) != iter.meta.actualColCount {
1482		iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
1483		return false
1484	}
1485
1486	// i is the current position in dest, could posible replace it and just use
1487	// slices of dest
1488	i := 0
1489	for _, col := range iter.meta.columns {
1490		colBytes, err := iter.readColumn()
1491		if err != nil {
1492			iter.err = err
1493			return false
1494		}
1495
1496		n, err := scanColumn(colBytes, col, dest[i:])
1497		if err != nil {
1498			iter.err = err
1499			return false
1500		}
1501		i += n
1502	}
1503
1504	iter.pos++
1505	return true
1506}
1507
1508// GetCustomPayload returns any parsed custom payload results if given in the
1509// response from Cassandra. Note that the result is not a copy.
1510//
1511// This additional feature of CQL Protocol v4
1512// allows additional results and query information to be returned by
1513// custom QueryHandlers running in your C* cluster.
1514// See https://datastax.github.io/java-driver/manual/custom_payloads/
1515func (iter *Iter) GetCustomPayload() map[string][]byte {
1516	return iter.framer.customPayload
1517}
1518
1519// Warnings returns any warnings generated if given in the response from Cassandra.
1520//
1521// This is only available starting with CQL Protocol v4.
1522func (iter *Iter) Warnings() []string {
1523	if iter.framer != nil {
1524		return iter.framer.header.warnings
1525	}
1526	return nil
1527}
1528
1529// Close closes the iterator and returns any errors that happened during
1530// the query or the iteration.
1531func (iter *Iter) Close() error {
1532	if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) {
1533		if iter.framer != nil {
1534			iter.framer = nil
1535		}
1536	}
1537
1538	return iter.err
1539}
1540
// WillSwitchPage reports whether the iterator has reached the end of the
// current page while a next page is available.
func (iter *Iter) WillSwitchPage() bool {
	return iter.pos >= iter.numRows && iter.next != nil
}
1546
1547// checkErrAndNotFound handle error and NotFound in one method.
1548func (iter *Iter) checkErrAndNotFound() error {
1549	if iter.err != nil {
1550		return iter.err
1551	} else if iter.numRows == 0 {
1552		return ErrNotFound
1553	}
1554	return nil
1555}
1556
// PageState returns the current paging state for a query, which can be used
// for subsequent queries to resume paging from this point.
func (iter *Iter) PageState() []byte {
	return iter.meta.pagingState
}
1562
// NumRows returns the number of rows in the current page. It is updated when
// new pages are fetched, so it is not the total number of rows this iter
// will return unless only a single page is returned.
func (iter *Iter) NumRows() int {
	return iter.numRows
}
1569
// nextIter holds state for fetching a single page in an iterator.
// A single page might be attempted multiple times due to retries.
//
// qry is the query executed to obtain the page and next caches its result.
// pos is the row position of the current page at which Iter.Scan starts the
// asynchronous fetch. oncea ensures the asynchronous fetch is started once
// and once ensures the query itself is executed once.
type nextIter struct {
	qry   *Query
	pos   int
	oncea sync.Once
	once  sync.Once
	next  *Iter
}
1579
// fetchAsync starts fetching the page on a new goroutine. oncea ensures the
// goroutine is started at most once, however often fetchAsync is called.
func (n *nextIter) fetchAsync() {
	n.oncea.Do(func() {
		go n.fetch()
	})
}
1585
1586func (n *nextIter) fetch() *Iter {
1587	n.once.Do(func() {
1588		// if the query was specifically run on a connection then re-use that
1589		// connection when fetching the next results
1590		if n.qry.conn != nil {
1591			n.next = n.qry.conn.executeQuery(n.qry.Context(), n.qry)
1592		} else {
1593			n.next = n.qry.session.executeQuery(n.qry)
1594		}
1595	})
1596	return n.next
1597}
1598
// Batch is a group of statements that is executed as one batch operation.
//
// Field notes:
//   - Type is the kind of batch (see BatchType) and Entries are the
//     statements added through Query and Bind.
//   - Cons is the consistency level (GetConsistency / SetConsistency) and
//     serialCons the serial consistency level (SerialConsistency).
//   - routingKey, when non-nil, is returned as-is by GetRoutingKey.
//   - CustomPayload is not referenced in this part of the file; presumably it
//     is sent along with the request — TODO confirm.
//   - rt, spec and observer are the retry policy, speculative execution
//     policy and batch observer.
//   - session is set by Session.NewBatch; the package-level NewBatch leaves
//     it nil.
//   - defaultTimestamp and defaultTimestampValue are set by DefaultTimestamp
//     and WithTimestamp.
//   - context is returned by Context (context.Background when nil).
//   - cancelBatch is not referenced in this part of the file.
//   - keyspace is returned by Keyspace and metrics tracks attempts and
//     latency.
type Batch struct {
	Type                  BatchType
	Entries               []BatchEntry
	Cons                  Consistency
	routingKey            []byte
	CustomPayload         map[string][]byte
	rt                    RetryPolicy
	spec                  SpeculativeExecutionPolicy
	observer              BatchObserver
	session               *Session
	serialCons            SerialConsistency
	defaultTimestamp      bool
	defaultTimestampValue int64
	context               context.Context
	cancelBatch           func()
	keyspace              string
	metrics               *queryMetrics
}
1617
// NewBatch creates a new batch operation without defaults from the cluster.
// The batch has empty metrics and a non-speculative execution policy; no
// session, retry policy or consistency is set on it.
//
// Deprecated: use session.NewBatch instead
func NewBatch(typ BatchType) *Batch {
	return &Batch{
		Type:    typ,
		metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
		spec:    &NonSpeculativeExecution{},
	}
}
1628
1629// NewBatch creates a new batch operation using defaults defined in the cluster
1630func (s *Session) NewBatch(typ BatchType) *Batch {
1631	s.mu.RLock()
1632	batch := &Batch{
1633		Type:             typ,
1634		rt:               s.cfg.RetryPolicy,
1635		serialCons:       s.cfg.SerialConsistency,
1636		observer:         s.batchObserver,
1637		session:          s,
1638		Cons:             s.cons,
1639		defaultTimestamp: s.cfg.DefaultTimestamp,
1640		keyspace:         s.cfg.Keyspace,
1641		metrics:          &queryMetrics{m: make(map[string]*hostMetrics)},
1642		spec:             &NonSpeculativeExecution{},
1643	}
1644
1645	s.mu.RUnlock()
1646	return batch
1647}
1648
// Observer enables a batch-level observer on this batch.
// The provided observer will be called every time this batched query is executed.
func (b *Batch) Observer(observer BatchObserver) *Batch {
	b.observer = observer
	return b
}
1655
// Keyspace returns the keyspace stored on the batch.
func (b *Batch) Keyspace() string {
	return b.keyspace
}
1659
// Attempts returns the number of attempts made to execute the batch, as
// recorded in the batch metrics.
func (b *Batch) Attempts() int {
	return b.metrics.attempts()
}
1664
// AddAttempts records i additional attempts against host in the batch
// metrics without adding any latency.
func (b *Batch) AddAttempts(i int, host *HostInfo) {
	b.metrics.attempt(i, 0, host, false)
}
1668
// Latency returns the average number of nanoseconds to execute a single attempt of the batch.
func (b *Batch) Latency() int64 {
	return b.metrics.latency()
}
1673
// AddLatency records l nanoseconds of latency against host in the batch
// metrics without counting an additional attempt.
func (b *Batch) AddLatency(l int64, host *HostInfo) {
	b.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false)
}
1677
// GetConsistency returns the currently configured consistency level for the batch
// operation.
func (b *Batch) GetConsistency() Consistency {
	return b.Cons
}
1683
// SetConsistency sets the consistency level for the batch operation.
func (b *Batch) SetConsistency(c Consistency) {
	b.Cons = c
}
1689
1690func (b *Batch) Context() context.Context {
1691	if b.context == nil {
1692		return context.Background()
1693	}
1694	return b.context
1695}
1696
1697func (b *Batch) IsIdempotent() bool {
1698	for _, entry := range b.Entries {
1699		if !entry.Idempotent {
1700			return false
1701		}
1702	}
1703	return true
1704}
1705
// speculativeExecutionPolicy returns the speculative execution policy
// currently set on the batch (b.spec).
func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy {
	return b.spec
}
1709
// SpeculativeExecutionPolicy sets the speculative execution policy for the
// batch and returns b so that calls can be chained.
func (b *Batch) SpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Batch {
	b.spec = sp
	return b
}
1714
1715// Query adds the query to the batch operation
1716func (b *Batch) Query(stmt string, args ...interface{}) {
1717	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
1718}
1719
1720// Bind adds the query to the batch operation and correlates it with a binding callback
1721// that will be invoked when the batch is executed. The binding callback allows the application
1722// to define which query argument values will be marshalled as part of the batch execution.
1723func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) {
1724	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind})
1725}
1726
// retryPolicy returns the retry policy currently set on the batch (b.rt).
func (b *Batch) retryPolicy() RetryPolicy {
	return b.rt
}
1730
// RetryPolicy sets the retry policy to use when executing the batch operation
// and returns b so that calls can be chained.
func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
	b.rt = r
	return b
}
1736
// withContext is the ExecutableQuery-typed counterpart of WithContext: it
// returns the same shallow copy, but as the interface type.
func (b *Batch) withContext(ctx context.Context) ExecutableQuery {
	return b.WithContext(ctx)
}
1740
1741// WithContext returns a shallow copy of b with its context
1742// set to ctx.
1743//
1744// The provided context controls the entire lifetime of executing a
1745// query, queries will be canceled and return once the context is
1746// canceled.
1747func (b *Batch) WithContext(ctx context.Context) *Batch {
1748	b2 := *b
1749	b2.context = ctx
1750	return &b2
1751}
1752
// Cancel does nothing.
//
// Deprecated: cancel the context passed to WithContext instead.
func (*Batch) Cancel() {
	// TODO: delete
}
1757
// Size returns the number of batch statements to be executed by the batch operation.
func (b *Batch) Size() int {
	return len(b.Entries)
}
1762
// SerialConsistency sets the consistency level for the
// serial phase of conditional updates. That consistency can only be
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
// SERIAL. This option will be ignored for anything other than a
// conditional update/insert.
//
// Only available for protocol 3 and above
func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
	b.serialCons = cons
	return b
}
1774
// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enabled, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
//
// Only available on protocol >= 3
func (b *Batch) DefaultTimestamp(enable bool) *Batch {
	b.defaultTimestamp = enable
	return b
}
1785
// WithTimestamp will enable the with default timestamp flag on the query
// like DefaultTimestamp does, and additionally sets the timestamp value.
// It works the same way as USING TIMESTAMP in the query itself, but
// should not break prepared query optimization.
//
// Only available on protocol >= 3
func (b *Batch) WithTimestamp(timestamp int64) *Batch {
	// Enable the flag first, then record the explicit value that goes with it.
	b.DefaultTimestamp(true)
	b.defaultTimestampValue = timestamp
	return b
}
1797
1798func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
1799	latency := end.Sub(start)
1800	attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil)
1801
1802	if b.observer == nil {
1803		return
1804	}
1805
1806	statements := make([]string, len(b.Entries))
1807	for i, entry := range b.Entries {
1808		statements[i] = entry.Stmt
1809	}
1810
1811	b.observer.ObserveBatch(b.Context(), ObservedBatch{
1812		Keyspace:   keyspace,
1813		Statements: statements,
1814		Start:      start,
1815		End:        end,
1816		// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
1817		Host:    host,
1818		Metrics: metricsForHost,
1819		Err:     iter.err,
1820		Attempt: attempt,
1821	})
1822}
1823
1824func (b *Batch) GetRoutingKey() ([]byte, error) {
1825	if b.routingKey != nil {
1826		return b.routingKey, nil
1827	}
1828
1829	if len(b.Entries) == 0 {
1830		return nil, nil
1831	}
1832
1833	entry := b.Entries[0]
1834	if entry.binding != nil {
1835		// bindings do not have the values let's skip it like Query does.
1836		return nil, nil
1837	}
1838	// try to determine the routing key
1839	routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt)
1840	if err != nil {
1841		return nil, err
1842	}
1843
1844	return createRoutingKey(routingKeyInfo, entry.Args)
1845}
1846
1847func createRoutingKey(routingKeyInfo *routingKeyInfo, values []interface{}) ([]byte, error) {
1848	if routingKeyInfo == nil {
1849		return nil, nil
1850	}
1851
1852	if len(routingKeyInfo.indexes) == 1 {
1853		// single column routing key
1854		routingKey, err := Marshal(
1855			routingKeyInfo.types[0],
1856			values[routingKeyInfo.indexes[0]],
1857		)
1858		if err != nil {
1859			return nil, err
1860		}
1861		return routingKey, nil
1862	}
1863
1864	// composite routing key
1865	buf := bytes.NewBuffer(make([]byte, 0, 256))
1866	for i := range routingKeyInfo.indexes {
1867		encoded, err := Marshal(
1868			routingKeyInfo.types[i],
1869			values[routingKeyInfo.indexes[i]],
1870		)
1871		if err != nil {
1872			return nil, err
1873		}
1874		lenBuf := []byte{0x00, 0x00}
1875		binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
1876		buf.Write(lenBuf)
1877		buf.Write(encoded)
1878		buf.WriteByte(0x00)
1879	}
1880	routingKey := buf.Bytes()
1881	return routingKey, nil
1882}
1883
// BatchType identifies the kind of a batch operation.
type BatchType byte

// The available batch types, numbered 0 (logged), 1 (unlogged) and
// 2 (counter).
const (
	LoggedBatch BatchType = iota
	UnloggedBatch
	CounterBatch
)
1891
// BatchEntry is a single statement of a Batch.
//
// Stmt is the statement text and Args its arguments. Idempotent marks the
// entry as idempotent; a batch is idempotent only if all of its entries are.
// binding, when set (see Batch.Bind), is the callback that supplies the
// argument values when the batch is executed.
type BatchEntry struct {
	Stmt       string
	Args       []interface{}
	Idempotent bool
	binding    func(q *QueryInfo) ([]interface{}, error)
}
1898
// ColumnInfo describes one column of a result: the keyspace and table it
// belongs to, its name and its type.
type ColumnInfo struct {
	Keyspace string
	Table    string
	Name     string
	TypeInfo TypeInfo
}
1905
// String returns a human-readable description of the column listing its
// keyspace, table, name and type.
func (c ColumnInfo) String() string {
	return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo)
}
1909
// routingKeyInfoLRU is the LRU cache of routing key indexes. The Remove and
// Max methods take mu before touching lru.
type routingKeyInfoLRU struct {
	lru *lru.Cache
	mu  sync.Mutex
}
1915
// routingKeyInfo describes how to build the routing key of a statement:
// indexes[i] is the position, within the bound values, of the i-th routing
// key column and types[i] is the type used to marshal that value.
type routingKeyInfo struct {
	indexes []int
	types   []TypeInfo
}
1920
// String returns a human-readable description listing the routing key
// indexes and types.
func (r *routingKeyInfo) String() string {
	return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types)
}
1924
1925func (r *routingKeyInfoLRU) Remove(key string) {
1926	r.mu.Lock()
1927	r.lru.Remove(key)
1928	r.mu.Unlock()
1929}
1930
1931//Max adjusts the maximum size of the cache and cleans up the oldest records if
1932//the new max is lower than the previous value. Not concurrency safe.
1933func (r *routingKeyInfoLRU) Max(max int) {
1934	r.mu.Lock()
1935	for r.lru.Len() > max {
1936		r.lru.RemoveOldest()
1937	}
1938	r.lru.MaxEntries = max
1939	r.mu.Unlock()
1940}
1941
// inflightCachedEntry bundles a wait group with a value and an error.
// NOTE(review): presumably it represents a cache entry that is still being
// computed, with wg signalling completion and err/value holding the outcome;
// it is not used in this part of the file, so confirm against its callers.
type inflightCachedEntry struct {
	wg    sync.WaitGroup
	err   error
	value interface{}
}
1947
// Tracer is the interface implemented by query tracers. Tracers have the
// ability to obtain a detailed event log of all events that happened during
// the execution of a query from Cassandra. Gathering this information might
// be essential for debugging and optimizing queries, but this feature should
// not be used on production systems with very high load.
type Tracer interface {
	// Trace is called with the id of the tracing session to report on.
	Trace(traceId []byte)
}
1956
// traceWriter is the Tracer returned by NewTraceWriter. It looks trace data
// up through session and writes a textual report to w; mu is held while
// writing to w.
type traceWriter struct {
	session *Session
	w       io.Writer
	mu      sync.Mutex
}
1962
// NewTraceWriter returns a simple Tracer implementation that outputs
// the event log in a textual format to w, reading the trace data through
// session.
func NewTraceWriter(session *Session, w io.Writer) Tracer {
	return &traceWriter{session: session, w: w}
}
1968
1969func (t *traceWriter) Trace(traceId []byte) {
1970	var (
1971		coordinator string
1972		duration    int
1973	)
1974	iter := t.session.control.query(`SELECT coordinator, duration
1975			FROM system_traces.sessions
1976			WHERE session_id = ?`, traceId)
1977
1978	iter.Scan(&coordinator, &duration)
1979	if err := iter.Close(); err != nil {
1980		t.mu.Lock()
1981		fmt.Fprintln(t.w, "Error:", err)
1982		t.mu.Unlock()
1983		return
1984	}
1985
1986	var (
1987		timestamp time.Time
1988		activity  string
1989		source    string
1990		elapsed   int
1991	)
1992
1993	t.mu.Lock()
1994	defer t.mu.Unlock()
1995
1996	fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
1997		traceId, coordinator, time.Duration(duration)*time.Microsecond)
1998
1999	iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed
2000			FROM system_traces.events
2001			WHERE session_id = ?`, traceId)
2002
2003	for iter.Scan(&timestamp, &activity, &source, &elapsed) {
2004		fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
2005			timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
2006	}
2007
2008	if err := iter.Close(); err != nil {
2009		fmt.Fprintln(t.w, "Error:", err)
2010	}
2011}
2012
// ObservedQuery describes a single attempt at executing a query, as passed
// to QueryObserver.ObserveQuery.
type ObservedQuery struct {
	Keyspace  string
	Statement string

	Start time.Time // time immediately before the query was called
	End   time.Time // time immediately after the query returned

	// Rows is the number of rows in the current iter.
	// In paginated queries, rows from previous scans are not counted.
	// Rows is not used in batch queries and remains at the default value
	Rows int

	// Host is the information about the host that performed the query
	Host *HostInfo

	// The metrics per this host
	Metrics *hostMetrics

	// Err is the error in the query.
	// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
	Err error

	// Attempt is the index of attempt at executing this query.
	// The first attempt is number zero and any retries have non-zero attempt number.
	Attempt int
}
2039
// QueryObserver is the interface implemented by query observers / stat collectors.
//
// Experimental, this interface and use may change
type QueryObserver interface {
	// ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled.
	// It doesn't get called if there is no query because the session is closed or there are no connections available.
	// The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
	ObserveQuery(context.Context, ObservedQuery)
}
2049
// ObservedBatch describes a single batch execution. It is the value handed to
// BatchObserver.ObserveBatch.
type ObservedBatch struct {
	// Keyspace and Statements identify the observed batch.
	// NOTE(review): presumably the keyspace the batch ran against and the
	// text of each statement in it — confirm where this struct is populated.
	Keyspace   string
	Statements []string

	Start time.Time // time immediately before the batch query was called
	End   time.Time // time immediately after the batch query returned

	// Host is the information about the host that performed the batch.
	Host *HostInfo

	// Err is the error in the batch query.
	// It only tracks network errors or errors of bad Cassandra syntax; in
	// particular, selects with no match return a nil error.
	Err error

	// Metrics holds the metrics for this host.
	Metrics *hostMetrics

	// Attempt is the index of attempt at executing this query.
	// The first attempt is number zero and any retries have non-zero attempt number.
	Attempt int
}
2071
// BatchObserver is the interface implemented by batch observers / stat collectors.
type BatchObserver interface {
	// ObserveBatch gets called on every batch query to Cassandra.
	// It also gets called once for each query in a batch.
	// It doesn't get called if there is no query because the session is closed or there are no connections available.
	// The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
	// Unlike QueryObserver.ObserveQuery it does no reporting on rows read.
	// The ObservedBatch argument carries the details of the observed execution.
	ObserveBatch(context.Context, ObservedBatch)
}
2081
// ObservedConnect describes a single connection attempt. It is the value
// handed to ConnectObserver.ObserveConnect.
type ObservedConnect struct {
	// Host is the information about the host about to connect.
	Host *HostInfo

	Start time.Time // time immediately before the dial is called
	End   time.Time // time immediately after the dial returned

	// Err is the connection error (if any); it is nil otherwise.
	Err error
}
2092
// ConnectObserver is the interface implemented by connect observers / stat collectors.
type ConnectObserver interface {
	// ObserveConnect gets called when a new connection to Cassandra is made.
	// The ObservedConnect argument carries the host, the dial timing and the
	// connection error (if any).
	ObserveConnect(ObservedConnect)
}
2098
// Error is a simple error value that pairs a numeric code with a
// human-readable message.
type Error struct {
	Code    int
	Message string
}

// Error implements the built-in error interface. It returns Message only;
// Code is not included in the returned text, so callers that need the code
// must read the field directly.
func (e Error) Error() string {
	return e.Message
}
2107
// Sentinel errors exported by the package. Each one is a distinct value
// created with errors.New, so callers can test for a specific condition by
// comparing a returned error against the variable (for example with errors.Is).
// NOTE(review): which operations return which sentinel is not visible in this
// part of the file — check the call sites before documenting them further.
var (
	ErrNotFound             = errors.New("not found")
	ErrUnavailable          = errors.New("unavailable")
	ErrUnsupported          = errors.New("feature not supported")
	ErrTooManyStmts         = errors.New("too many statements")
	ErrUseStmt              = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.")
	ErrSessionClosed        = errors.New("session has been closed")
	ErrNoConnections        = errors.New("gocql: no hosts available in the pool")
	ErrNoKeyspace           = errors.New("no keyspace provided")
	ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist")
	ErrNoMetadata           = errors.New("no metadata available")
)
2120
// ErrProtocol wraps another error by embedding it, so callers can recognise
// the category by type (type assertion or errors.As) while Error() is
// promoted from the wrapped value. As the name suggests it is intended for
// protocol-level failures.
type ErrProtocol struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the chain behind an ErrProtocol, e.g. a cause attached with %w in the format
// passed to NewErrProtocol. Embedding the error interface only promotes
// Error(), so without this method the chain would stop at ErrProtocol.
func (e ErrProtocol) Unwrap() error {
	return e.error
}

// NewErrProtocol formats a message with fmt.Errorf and returns it wrapped in
// an ErrProtocol. format and args follow the fmt.Errorf rules, including %w.
func NewErrProtocol(format string, args ...interface{}) error {
	return ErrProtocol{fmt.Errorf(format, args...)}
}
2126
// BatchSizeMaximum is the maximum number of statements a batch operation can have.
// This limit is set by Cassandra and could change in the future.
// 65535 is the largest value an unsigned 16-bit integer can hold.
// NOTE(review): presumably the protocol encodes the statement count in 16 bits — confirm against the protocol spec.
const BatchSizeMaximum = 65535
2130