1// Copyright (c) 2012 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"bytes"
9	"context"
10	"encoding/binary"
11	"errors"
12	"fmt"
13	"io"
14	"net"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19	"unicode"
20
21	"github.com/gocql/gocql/internal/lru"
22)
23
// Session is the interface used by users to interact with the database.
//
// It's safe for concurrent use by multiple goroutines and a typical usage
// scenario is to have one global session object to interact with the
// whole Cassandra cluster.
//
// Session provides a convenient query builder (Query, Bind) and copies its
// session-wide defaults (consistency, page size, prefetch threshold, tracer)
// into every query it creates, where they can be overridden per query.
type Session struct {
	// cons, pageSize, prefetch and trace are the defaults copied into new
	// queries by defaultsFromSession; they are guarded by mu.
	cons                Consistency
	pageSize            int
	prefetch            float64
	// routingKeyInfoCache caches routing key info per statement text.
	routingKeyInfoCache routingKeyInfoLRU
	// schemaDescriber serves KeyspaceMetadata lookups.
	schemaDescriber     *schemaDescriber
	trace               Tracer
	// Observers copied from the ClusterConfig in NewSession.
	queryObserver       QueryObserver
	batchObserver       BatchObserver
	connectObserver     ConnectObserver
	frameObserver       FrameHeaderObserver
	// hostSource is used by init to discover the cluster's hosts.
	hostSource          *ringDescriber
	// stmtsLRU is the prepared statement cache, sized by cfg.MaxPreparedStmts.
	stmtsLRU            *preparedLRU

	// connCfg is the per-connection configuration derived from cfg.
	connCfg *ConnConfig

	// executor runs queries and batches using pool and policy.
	executor *queryExecutor
	pool     *policyConnPool
	policy   HostSelectionPolicy

	// ring tracks known hosts.
	ring     ring
	// NOTE(review): metadata is not referenced in this part of the file;
	// purpose unconfirmed.
	metadata clusterMetadata

	// mu guards the query defaults (cons, pageSize, prefetch, trace).
	mu sync.RWMutex

	// control is the control connection; nil when cfg.disableControlConn.
	control *controlConn

	// event handlers
	nodeEvents   *eventDebouncer
	schemaEvents *eventDebouncer

	// ring metadata
	// NOTE(review): hosts is not referenced in this part of the file (init
	// uses s.ring); confirm whether it is still needed.
	hosts                     []HostInfo
	useSystemSchema           bool
	hasAggregatesAndFunctions bool

	// cfg is the copy of the ClusterConfig passed to NewSession.
	cfg ClusterConfig

	// quit is closed by Close to stop background loops such as
	// reconnectDownedHosts.
	quit chan struct{}

	// closeMu guards isClosed.
	closeMu  sync.RWMutex
	isClosed bool
}
76
77var queryPool = &sync.Pool{
78	New: func() interface{} {
79		return new(Query)
80	},
81}
82
83func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
84	var hosts []*HostInfo
85	for _, hostport := range addrs {
86		resolvedHosts, err := hostInfo(hostport, defaultPort)
87		if err != nil {
88			// Try other hosts if unable to resolve DNS name
89			if _, ok := err.(*net.DNSError); ok {
90				Logger.Printf("gocql: dns error: %v\n", err)
91				continue
92			}
93			return nil, err
94		}
95
96		hosts = append(hosts, resolvedHosts...)
97	}
98	if len(hosts) == 0 {
99		return nil, errors.New("failed to resolve any of the provided hostnames")
100	}
101	return hosts, nil
102}
103
104// NewSession wraps an existing Node.
105func NewSession(cfg ClusterConfig) (*Session, error) {
106	// Check that hosts in the ClusterConfig is not empty
107	if len(cfg.Hosts) < 1 {
108		return nil, ErrNoHosts
109	}
110
111	// Check that either Authenticator is set or AuthProvider, not both
112	if cfg.Authenticator != nil && cfg.AuthProvider != nil {
113		return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
114	}
115
116	s := &Session{
117		cons:            cfg.Consistency,
118		prefetch:        0.25,
119		cfg:             cfg,
120		pageSize:        cfg.PageSize,
121		stmtsLRU:        &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
122		quit:            make(chan struct{}),
123		connectObserver: cfg.ConnectObserver,
124	}
125
126	s.schemaDescriber = newSchemaDescriber(s)
127
128	s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent)
129	s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent)
130
131	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
132
133	s.hostSource = &ringDescriber{session: s}
134
135	if cfg.PoolConfig.HostSelectionPolicy == nil {
136		cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
137	}
138	s.pool = cfg.PoolConfig.buildPool(s)
139
140	s.policy = cfg.PoolConfig.HostSelectionPolicy
141	s.policy.Init(s)
142
143	s.executor = &queryExecutor{
144		pool:   s.pool,
145		policy: cfg.PoolConfig.HostSelectionPolicy,
146	}
147
148	s.queryObserver = cfg.QueryObserver
149	s.batchObserver = cfg.BatchObserver
150	s.connectObserver = cfg.ConnectObserver
151	s.frameObserver = cfg.FrameHeaderObserver
152
153	//Check the TLS Config before trying to connect to anything external
154	connCfg, err := connConfig(&s.cfg)
155	if err != nil {
156		//TODO: Return a typed error
157		return nil, fmt.Errorf("gocql: unable to create session: %v", err)
158	}
159	s.connCfg = connCfg
160
161	if err := s.init(); err != nil {
162		s.Close()
163		if err == ErrNoConnectionsStarted {
164			//This error used to be generated inside NewSession & returned directly
165			//Forward it on up to be backwards compatible
166			return nil, ErrNoConnectionsStarted
167		} else {
168			// TODO(zariel): dont wrap this error in fmt.Errorf, return a typed error
169			return nil, fmt.Errorf("gocql: unable to create session: %v", err)
170		}
171	}
172
173	return s, nil
174}
175
// init resolves the configured contact points and seeds the ring with them.
// Unless the control connection is disabled, it opens the control
// connection, discovering the protocol version first when cfg.ProtoVersion
// is 0, and optionally looks up the rest of the ring. It then adds every
// unique host to the ring and passes it to addNewNode. It starts the
// downed-host reconnect loop and decides which system schema to use.
//
// It returns ErrNoConnectionsStarted when the pool ends up empty. On any
// error, the caller (NewSession) closes the session.
func (s *Session) init() error {
	// Resolve the user-supplied contact points and seed the ring endpoints.
	hosts, err := addrsToHosts(s.cfg.Hosts, s.cfg.Port)
	if err != nil {
		return err
	}
	s.ring.endpoints = hosts

	if !s.cfg.disableControlConn {
		s.control = createControlConn(s)
		// No protocol version configured: probe the contact points for one
		// and record it on both the cluster and the connection configs.
		if s.cfg.ProtoVersion == 0 {
			proto, err := s.control.discoverProtocol(hosts)
			if err != nil {
				return fmt.Errorf("unable to discover protocol version: %v", err)
			} else if proto == 0 {
				return errors.New("unable to discovery protocol version")
			}

			// TODO(zariel): we really only need this in 1 place
			s.cfg.ProtoVersion = proto
			s.connCfg.ProtoVersion = proto
		}

		if err := s.control.connect(hosts); err != nil {
			return err
		}

		if !s.cfg.DisableInitialHostLookup {
			// Discover the rest of the ring via the control connection.
			// The := below reuses partitioner but declares new newHosts and
			// err (shadowing the function-level err).
			var partitioner string
			newHosts, partitioner, err := s.hostSource.GetHosts()
			if err != nil {
				return err
			}
			s.policy.SetPartitioner(partitioner)
			// cfg.filterHost is applied only to discovered hosts, not to the
			// user-supplied contact points.
			filteredHosts := make([]*HostInfo, 0, len(newHosts))
			for _, host := range newHosts {
				if !s.cfg.filterHost(host) {
					filteredHosts = append(filteredHosts, host)
				}
			}
			// NOTE(review): hosts shares its backing array with
			// s.ring.endpoints, so this append may write into spare capacity
			// of that array (not visible through ring.endpoints' length).
			hosts = append(hosts, filteredHosts...)
		}
	}

	// De-duplicate by connect address. Discovered hosts were appended after
	// the contact points, so for a shared address the discovered HostInfo
	// is the one kept.
	hostMap := make(map[string]*HostInfo, len(hosts))
	for _, host := range hosts {
		hostMap[host.ConnectAddress().String()] = host
	}

	// Map iteration order is random, so hosts are registered in no
	// particular order.
	for _, host := range hostMap {
		host = s.ring.addOrUpdate(host)
		s.addNewNode(host)
	}

	// TODO(zariel): we probably dont need this any more as we verify that we
	// can connect to one of the endpoints supplied by using the control conn.
	// See if there are any connections in the pool
	// The loop exits when s.quit is closed by Close.
	if s.cfg.ReconnectInterval > 0 {
		go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
	}

	// If we disable the initial host lookup, we need to still check if the
	// cluster is using the newer system schema or not... however, if control
	// connection is disable, we really have no choice, so we just make our
	// best guess...
	// NOTE(review): in the first branch the checkSystemSchema error is
	// ignored and hasAggregatesAndFunctions stays false.
	if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
		newer, _ := checkSystemSchema(s.control)
		s.useSystemSchema = newer
	} else {
		version := s.ring.rrHost().Version()
		s.useSystemSchema = version.AtLeast(3, 0, 0)
		s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)
	}

	if s.pool.Size() == 0 {
		return ErrNoConnectionsStarted
	}

	// Invoke KeyspaceChanged to let the policy cache the session keyspace
	// parameters. This is used by tokenAwareHostPolicy to discover replicas.
	if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
		s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace})
	}

	return nil
}
261
262func (s *Session) reconnectDownedHosts(intv time.Duration) {
263	reconnectTicker := time.NewTicker(intv)
264	defer reconnectTicker.Stop()
265
266	for {
267		select {
268		case <-reconnectTicker.C:
269			hosts := s.ring.allHosts()
270
271			// Print session.ring for debug.
272			if gocqlDebug {
273				buf := bytes.NewBufferString("Session.ring:")
274				for _, h := range hosts {
275					buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
276				}
277				Logger.Println(buf.String())
278			}
279
280			for _, h := range hosts {
281				if h.IsUp() {
282					continue
283				}
284				s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
285			}
286		case <-s.quit:
287			return
288		}
289	}
290}
291
292// SetConsistency sets the default consistency level for this session. This
293// setting can also be changed on a per-query basis and the default value
294// is Quorum.
295func (s *Session) SetConsistency(cons Consistency) {
296	s.mu.Lock()
297	s.cons = cons
298	s.mu.Unlock()
299}
300
301// SetPageSize sets the default page size for this session. A value <= 0 will
302// disable paging. This setting can also be changed on a per-query basis.
303func (s *Session) SetPageSize(n int) {
304	s.mu.Lock()
305	s.pageSize = n
306	s.mu.Unlock()
307}
308
309// SetPrefetch sets the default threshold for pre-fetching new pages. If
310// there are only p*pageSize rows remaining, the next page will be requested
311// automatically. This value can also be changed on a per-query basis and
312// the default value is 0.25.
313func (s *Session) SetPrefetch(p float64) {
314	s.mu.Lock()
315	s.prefetch = p
316	s.mu.Unlock()
317}
318
319// SetTrace sets the default tracer for this session. This setting can also
320// be changed on a per-query basis.
321func (s *Session) SetTrace(trace Tracer) {
322	s.mu.Lock()
323	s.trace = trace
324	s.mu.Unlock()
325}
326
327// Query generates a new query object for interacting with the database.
328// Further details of the query may be tweaked using the resulting query
329// value before the query is executed. Query is automatically prepared
330// if it has not previously been executed.
331func (s *Session) Query(stmt string, values ...interface{}) *Query {
332	qry := queryPool.Get().(*Query)
333	qry.session = s
334	qry.stmt = stmt
335	qry.values = values
336	qry.defaultsFromSession()
337	return qry
338}
339
// QueryInfo carries the metadata of a prepared query. It is passed to the
// binding callback given to Session.Bind, which uses it to produce the
// query's argument values.
type QueryInfo struct {
	// Id identifies the prepared statement (presumably the server-assigned
	// prepared ID — confirm).
	Id          []byte
	// Args presumably describes the statement's bind markers — confirm.
	Args        []ColumnInfo
	// Rval presumably describes the result columns — confirm.
	Rval        []ColumnInfo
	// PKeyColumns presumably holds indexes of the partition key columns
	// among Args — confirm.
	PKeyColumns []int
}
346
347// Bind generates a new query object based on the query statement passed in.
348// The query is automatically prepared if it has not previously been executed.
349// The binding callback allows the application to define which query argument
350// values will be marshalled as part of the query execution.
351// During execution, the meta data of the prepared query will be routed to the
352// binding callback, which is responsible for producing the query argument values.
353func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error)) *Query {
354	qry := queryPool.Get().(*Query)
355	qry.session = s
356	qry.stmt = stmt
357	qry.binding = b
358	qry.defaultsFromSession()
359	return qry
360}
361
362// Close closes all connections. The session is unusable after this
363// operation.
364func (s *Session) Close() {
365
366	s.closeMu.Lock()
367	defer s.closeMu.Unlock()
368	if s.isClosed {
369		return
370	}
371	s.isClosed = true
372
373	if s.pool != nil {
374		s.pool.Close()
375	}
376
377	if s.control != nil {
378		s.control.close()
379	}
380
381	if s.nodeEvents != nil {
382		s.nodeEvents.stop()
383	}
384
385	if s.schemaEvents != nil {
386		s.schemaEvents.stop()
387	}
388
389	if s.quit != nil {
390		close(s.quit)
391	}
392}
393
394func (s *Session) Closed() bool {
395	s.closeMu.RLock()
396	closed := s.isClosed
397	s.closeMu.RUnlock()
398	return closed
399}
400
401func (s *Session) executeQuery(qry *Query) (it *Iter) {
402	// fail fast
403	if s.Closed() {
404		return &Iter{err: ErrSessionClosed}
405	}
406
407	iter, err := s.executor.executeQuery(qry)
408	if err != nil {
409		return &Iter{err: err}
410	}
411	if iter == nil {
412		panic("nil iter")
413	}
414
415	return iter
416}
417
418func (s *Session) removeHost(h *HostInfo) {
419	s.policy.RemoveHost(h)
420	s.pool.removeHost(h.ConnectAddress())
421	s.ring.removeHost(h.ConnectAddress())
422}
423
424// KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
425func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
426	// fail fast
427	if s.Closed() {
428		return nil, ErrSessionClosed
429	} else if keyspace == "" {
430		return nil, ErrNoKeyspace
431	}
432
433	return s.schemaDescriber.getSchema(keyspace)
434}
435
436func (s *Session) getConn() *Conn {
437	hosts := s.ring.allHosts()
438	for _, host := range hosts {
439		if !host.IsUp() {
440			continue
441		}
442
443		pool, ok := s.pool.getPool(host)
444		if !ok {
445			continue
446		} else if conn := pool.Pick(); conn != nil {
447			return conn
448		}
449	}
450
451	return nil
452}
453
454// returns routing key indexes and type info
455func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyInfo, error) {
456	s.routingKeyInfoCache.mu.Lock()
457
458	entry, cached := s.routingKeyInfoCache.lru.Get(stmt)
459	if cached {
460		// done accessing the cache
461		s.routingKeyInfoCache.mu.Unlock()
462		// the entry is an inflight struct similar to that used by
463		// Conn to prepare statements
464		inflight := entry.(*inflightCachedEntry)
465
466		// wait for any inflight work
467		inflight.wg.Wait()
468
469		if inflight.err != nil {
470			return nil, inflight.err
471		}
472
473		key, _ := inflight.value.(*routingKeyInfo)
474
475		return key, nil
476	}
477
478	// create a new inflight entry while the data is created
479	inflight := new(inflightCachedEntry)
480	inflight.wg.Add(1)
481	defer inflight.wg.Done()
482	s.routingKeyInfoCache.lru.Add(stmt, inflight)
483	s.routingKeyInfoCache.mu.Unlock()
484
485	var (
486		info         *preparedStatment
487		partitionKey []*ColumnMetadata
488	)
489
490	conn := s.getConn()
491	if conn == nil {
492		// TODO: better error?
493		inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
494		return nil, inflight.err
495	}
496
497	// get the query info for the statement
498	info, inflight.err = conn.prepareStatement(ctx, stmt, nil)
499	if inflight.err != nil {
500		// don't cache this error
501		s.routingKeyInfoCache.Remove(stmt)
502		return nil, inflight.err
503	}
504
505	// TODO: it would be nice to mark hosts here but as we are not using the policies
506	// to fetch hosts we cant
507
508	if info.request.colCount == 0 {
509		// no arguments, no routing key, and no error
510		return nil, nil
511	}
512
513	if len(info.request.pkeyColumns) > 0 {
514		// proto v4 dont need to calculate primary key columns
515		types := make([]TypeInfo, len(info.request.pkeyColumns))
516		for i, col := range info.request.pkeyColumns {
517			types[i] = info.request.columns[col].TypeInfo
518		}
519
520		routingKeyInfo := &routingKeyInfo{
521			indexes: info.request.pkeyColumns,
522			types:   types,
523		}
524
525		inflight.value = routingKeyInfo
526		return routingKeyInfo, nil
527	}
528
529	// get the table metadata
530	table := info.request.columns[0].Table
531
532	var keyspaceMetadata *KeyspaceMetadata
533	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
534	if inflight.err != nil {
535		// don't cache this error
536		s.routingKeyInfoCache.Remove(stmt)
537		return nil, inflight.err
538	}
539
540	tableMetadata, found := keyspaceMetadata.Tables[table]
541	if !found {
542		// unlikely that the statement could be prepared and the metadata for
543		// the table couldn't be found, but this may indicate either a bug
544		// in the metadata code, or that the table was just dropped.
545		inflight.err = ErrNoMetadata
546		// don't cache this error
547		s.routingKeyInfoCache.Remove(stmt)
548		return nil, inflight.err
549	}
550
551	partitionKey = tableMetadata.PartitionKey
552
553	size := len(partitionKey)
554	routingKeyInfo := &routingKeyInfo{
555		indexes: make([]int, size),
556		types:   make([]TypeInfo, size),
557	}
558
559	for keyIndex, keyColumn := range partitionKey {
560		// set an indicator for checking if the mapping is missing
561		routingKeyInfo.indexes[keyIndex] = -1
562
563		// find the column in the query info
564		for argIndex, boundColumn := range info.request.columns {
565			if keyColumn.Name == boundColumn.Name {
566				// there may be many such bound columns, pick the first
567				routingKeyInfo.indexes[keyIndex] = argIndex
568				routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
569				break
570			}
571		}
572
573		if routingKeyInfo.indexes[keyIndex] == -1 {
574			// missing a routing key column mapping
575			// no routing key, and no error
576			return nil, nil
577		}
578	}
579
580	// cache this result
581	inflight.value = routingKeyInfo
582
583	return routingKeyInfo, nil
584}
585
586func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter {
587	return conn.executeBatch(ctx, b)
588}
589
590func (s *Session) executeBatch(batch *Batch) *Iter {
591	// fail fast
592	if s.Closed() {
593		return &Iter{err: ErrSessionClosed}
594	}
595
596	// Prevent the execution of the batch if greater than the limit
597	// Currently batches have a limit of 65536 queries.
598	// https://datastax-oss.atlassian.net/browse/JAVA-229
599	if batch.Size() > BatchSizeMaximum {
600		return &Iter{err: ErrTooManyStmts}
601	}
602
603	iter, err := s.executor.executeQuery(batch)
604	if err != nil {
605		return &Iter{err: err}
606	}
607
608	return iter
609}
610
611// ExecuteBatch executes a batch operation and returns nil if successful
612// otherwise an error is returned describing the failure.
613func (s *Session) ExecuteBatch(batch *Batch) error {
614	iter := s.executeBatch(batch)
615	return iter.Close()
616}
617
618// ExecuteBatchCAS executes a batch operation and returns true if successful and
619// an iterator (to scan aditional rows if more than one conditional statement)
620// was sent.
621// Further scans on the interator must also remember to include
622// the applied boolean as the first argument to *Iter.Scan
623func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) {
624	iter = s.executeBatch(batch)
625	if err := iter.checkErrAndNotFound(); err != nil {
626		iter.Close()
627		return false, nil, err
628	}
629
630	if len(iter.Columns()) > 1 {
631		dest = append([]interface{}{&applied}, dest...)
632		iter.Scan(dest...)
633	} else {
634		iter.Scan(&applied)
635	}
636
637	return applied, iter, nil
638}
639
640// MapExecuteBatchCAS executes a batch operation much like ExecuteBatchCAS,
641// however it accepts a map rather than a list of arguments for the initial
642// scan.
643func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) (applied bool, iter *Iter, err error) {
644	iter = s.executeBatch(batch)
645	if err := iter.checkErrAndNotFound(); err != nil {
646		iter.Close()
647		return false, nil, err
648	}
649	iter.MapScan(dest)
650	applied = dest["[applied]"].(bool)
651	delete(dest, "[applied]")
652
653	// we usually close here, but instead of closing, just returin an error
654	// if MapScan failed. Although Close just returns err, using Close
655	// here might be confusing as we are not actually closing the iter
656	return applied, iter, iter.err
657}
658
// hostMetrics accumulates execution statistics for one query on one host.
type hostMetrics struct {
	// Attempts is the number of times the query was attempted on the host.
	Attempts     int
	// TotalLatency is the sum of the attempts' latencies, in nanoseconds.
	TotalLatency int64
}
663
// queryMetrics holds a query's per-host metrics, keyed by the host's connect
// address string.
type queryMetrics struct {
	// l guards m and the hostMetrics values it points to.
	l sync.RWMutex
	m map[string]*hostMetrics
}
668
// Query represents a CQL statement that can be executed.
type Query struct {
	// stmt is the CQL statement text.
	stmt                  string
	// values are the bind values (set by Session.Query or Query.Bind).
	values                []interface{}
	cons                  Consistency
	pageSize              int
	// routingKey, when non-nil, is returned as-is by GetRoutingKey.
	routingKey            []byte
	// routingKeyBuffer is scratch space reused by GetRoutingKey for
	// composite routing keys.
	routingKeyBuffer      []byte
	// pageState is set by PageState to resume paging.
	pageState             []byte
	prefetch              float64
	trace                 Tracer
	// observer is notified of each attempt by attempt().
	observer              QueryObserver
	session               *Session
	rt                    RetryPolicy
	spec                  SpeculativeExecutionPolicy
	// binding is the value-producing callback given to Session.Bind.
	binding               func(q *QueryInfo) ([]interface{}, error)
	serialCons            SerialConsistency
	// defaultTimestamp and defaultTimestampValue are set by
	// DefaultTimestamp and WithTimestamp.
	defaultTimestamp      bool
	defaultTimestampValue int64
	// disableSkipMetadata is set by NoSkipMetadata.
	disableSkipMetadata   bool
	// context is set by WithContext; Context() substitutes
	// context.Background() when it is nil.
	context               context.Context
	idempotent            bool
	customPayload         map[string][]byte
	// metrics holds per-host attempt counts and latencies.
	metrics               *queryMetrics

	// disableAutoPage is set by PageState.
	disableAutoPage bool
}
696
697func (q *Query) defaultsFromSession() {
698	s := q.session
699
700	s.mu.RLock()
701	q.cons = s.cons
702	q.pageSize = s.pageSize
703	q.trace = s.trace
704	q.observer = s.queryObserver
705	q.prefetch = s.prefetch
706	q.rt = s.cfg.RetryPolicy
707	q.serialCons = s.cfg.SerialConsistency
708	q.defaultTimestamp = s.cfg.DefaultTimestamp
709	q.idempotent = s.cfg.DefaultIdempotence
710	q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
711
712	q.spec = &NonSpeculativeExecution{}
713	s.mu.RUnlock()
714}
715
716func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics {
717	q.metrics.l.Lock()
718	metrics, exists := q.metrics.m[host.ConnectAddress().String()]
719	if !exists {
720		// if the host is not in the map, it means it's been accessed for the first time
721		metrics = &hostMetrics{}
722		q.metrics.m[host.ConnectAddress().String()] = metrics
723	}
724	q.metrics.l.Unlock()
725
726	return metrics
727}
728
729// Statement returns the statement that was used to generate this query.
730func (q Query) Statement() string {
731	return q.stmt
732}
733
734// String implements the stringer interface.
735func (q Query) String() string {
736	return fmt.Sprintf("[query statement=%q values=%+v consistency=%s]", q.stmt, q.values, q.cons)
737}
738
739//Attempts returns the number of times the query was executed.
740func (q *Query) Attempts() int {
741	q.metrics.l.Lock()
742	var attempts int
743	for _, metric := range q.metrics.m {
744		attempts += metric.Attempts
745	}
746	q.metrics.l.Unlock()
747	return attempts
748}
749
750func (q *Query) AddAttempts(i int, host *HostInfo) {
751	hostMetric := q.getHostMetrics(host)
752	q.metrics.l.Lock()
753	hostMetric.Attempts += i
754	q.metrics.l.Unlock()
755}
756
757//Latency returns the average amount of nanoseconds per attempt of the query.
758func (q *Query) Latency() int64 {
759	q.metrics.l.Lock()
760	var attempts int
761	var latency int64
762	for _, metric := range q.metrics.m {
763		attempts += metric.Attempts
764		latency += metric.TotalLatency
765	}
766	q.metrics.l.Unlock()
767	if attempts > 0 {
768		return latency / int64(attempts)
769	}
770	return 0
771}
772
773func (q *Query) AddLatency(l int64, host *HostInfo) {
774	hostMetric := q.getHostMetrics(host)
775	q.metrics.l.Lock()
776	hostMetric.TotalLatency += l
777	q.metrics.l.Unlock()
778}
779
780// Consistency sets the consistency level for this query. If no consistency
781// level have been set, the default consistency level of the cluster
782// is used.
783func (q *Query) Consistency(c Consistency) *Query {
784	q.cons = c
785	return q
786}
787
788// GetConsistency returns the currently configured consistency level for
789// the query.
790func (q *Query) GetConsistency() Consistency {
791	return q.cons
792}
793
794// Same as Consistency but without a return value
795func (q *Query) SetConsistency(c Consistency) {
796	q.cons = c
797}
798
799// CustomPayload sets the custom payload level for this query.
800func (q *Query) CustomPayload(customPayload map[string][]byte) *Query {
801	q.customPayload = customPayload
802	return q
803}
804
805func (q *Query) Context() context.Context {
806	if q.context == nil {
807		return context.Background()
808	}
809	return q.context
810}
811
812// Trace enables tracing of this query. Look at the documentation of the
813// Tracer interface to learn more about tracing.
814func (q *Query) Trace(trace Tracer) *Query {
815	q.trace = trace
816	return q
817}
818
819// Observer enables query-level observer on this query.
820// The provided observer will be called every time this query is executed.
821func (q *Query) Observer(observer QueryObserver) *Query {
822	q.observer = observer
823	return q
824}
825
826// PageSize will tell the iterator to fetch the result in pages of size n.
827// This is useful for iterating over large result sets, but setting the
828// page size too low might decrease the performance. This feature is only
829// available in Cassandra 2 and onwards.
830func (q *Query) PageSize(n int) *Query {
831	q.pageSize = n
832	return q
833}
834
835// DefaultTimestamp will enable the with default timestamp flag on the query.
836// If enable, this will replace the server side assigned
837// timestamp as default timestamp. Note that a timestamp in the query itself
838// will still override this timestamp. This is entirely optional.
839//
840// Only available on protocol >= 3
841func (q *Query) DefaultTimestamp(enable bool) *Query {
842	q.defaultTimestamp = enable
843	return q
844}
845
846// WithTimestamp will enable the with default timestamp flag on the query
847// like DefaultTimestamp does. But also allows to define value for timestamp.
848// It works the same way as USING TIMESTAMP in the query itself, but
849// should not break prepared query optimization
850//
851// Only available on protocol >= 3
852func (q *Query) WithTimestamp(timestamp int64) *Query {
853	q.DefaultTimestamp(true)
854	q.defaultTimestampValue = timestamp
855	return q
856}
857
858// RoutingKey sets the routing key to use when a token aware connection
859// pool is used to optimize the routing of this query.
860func (q *Query) RoutingKey(routingKey []byte) *Query {
861	q.routingKey = routingKey
862	return q
863}
864
865func (q *Query) withContext(ctx context.Context) ExecutableQuery {
866	// I really wish go had covariant types
867	return q.WithContext(ctx)
868}
869
870// WithContext returns a shallow copy of q with its context
871// set to ctx.
872//
873// The provided context controls the entire lifetime of executing a
874// query, queries will be canceled and return once the context is
875// canceled.
876func (q *Query) WithContext(ctx context.Context) *Query {
877	q2 := *q
878	q2.context = ctx
879	return &q2
880}
881
// Cancel does nothing.
//
// Deprecated: cancel the context passed to WithContext instead.
func (q *Query) Cancel() {
	// TODO: delete
}
886
887func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
888	return conn.executeQuery(ctx, q)
889}
890
891func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
892	q.AddAttempts(1, host)
893	q.AddLatency(end.Sub(start).Nanoseconds(), host)
894
895	if q.observer != nil {
896		q.observer.ObserveQuery(q.Context(), ObservedQuery{
897			Keyspace:  keyspace,
898			Statement: q.stmt,
899			Start:     start,
900			End:       end,
901			Rows:      iter.numRows,
902			Host:      host,
903			Metrics:   q.getHostMetrics(host),
904			Err:       iter.err,
905		})
906	}
907}
908
909func (q *Query) retryPolicy() RetryPolicy {
910	return q.rt
911}
912
913// Keyspace returns the keyspace the query will be executed against.
914func (q *Query) Keyspace() string {
915	if q.session == nil {
916		return ""
917	}
918	// TODO(chbannis): this should be parsed from the query or we should let
919	// this be set by users.
920	return q.session.cfg.Keyspace
921}
922
923// GetRoutingKey gets the routing key to use for routing this query. If
924// a routing key has not been explicitly set, then the routing key will
925// be constructed if possible using the keyspace's schema and the query
926// info for this query statement. If the routing key cannot be determined
927// then nil will be returned with no error. On any error condition,
928// an error description will be returned.
929func (q *Query) GetRoutingKey() ([]byte, error) {
930	if q.routingKey != nil {
931		return q.routingKey, nil
932	} else if q.binding != nil && len(q.values) == 0 {
933		// If this query was created using session.Bind we wont have the query
934		// values yet, so we have to pass down to the next policy.
935		// TODO: Remove this and handle this case
936		return nil, nil
937	}
938
939	// try to determine the routing key
940	routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt)
941	if err != nil {
942		return nil, err
943	}
944
945	if routingKeyInfo == nil {
946		return nil, nil
947	}
948
949	if len(routingKeyInfo.indexes) == 1 {
950		// single column routing key
951		routingKey, err := Marshal(
952			routingKeyInfo.types[0],
953			q.values[routingKeyInfo.indexes[0]],
954		)
955		if err != nil {
956			return nil, err
957		}
958		return routingKey, nil
959	}
960
961	// We allocate that buffer only once, so that further re-bind/exec of the
962	// same query don't allocate more memory.
963	if q.routingKeyBuffer == nil {
964		q.routingKeyBuffer = make([]byte, 0, 256)
965	}
966
967	// composite routing key
968	buf := bytes.NewBuffer(q.routingKeyBuffer)
969	for i := range routingKeyInfo.indexes {
970		encoded, err := Marshal(
971			routingKeyInfo.types[i],
972			q.values[routingKeyInfo.indexes[i]],
973		)
974		if err != nil {
975			return nil, err
976		}
977		lenBuf := []byte{0x00, 0x00}
978		binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded)))
979		buf.Write(lenBuf)
980		buf.Write(encoded)
981		buf.WriteByte(0x00)
982	}
983	routingKey := buf.Bytes()
984	return routingKey, nil
985}
986
987func (q *Query) shouldPrepare() bool {
988
989	stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool {
990		return unicode.IsSpace(r) || r == ';'
991	}), unicode.IsSpace)
992
993	var stmtType string
994	if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 {
995		stmtType = strings.ToLower(stmt[:n])
996	}
997	if stmtType == "begin" {
998		if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 {
999			stmtType = strings.ToLower(stmt[n+1:])
1000		}
1001	}
1002	switch stmtType {
1003	case "select", "insert", "update", "delete", "batch":
1004		return true
1005	}
1006	return false
1007}
1008
1009// SetPrefetch sets the default threshold for pre-fetching new pages. If
1010// there are only p*pageSize rows remaining, the next page will be requested
1011// automatically.
1012func (q *Query) Prefetch(p float64) *Query {
1013	q.prefetch = p
1014	return q
1015}
1016
1017// RetryPolicy sets the policy to use when retrying the query.
1018func (q *Query) RetryPolicy(r RetryPolicy) *Query {
1019	q.rt = r
1020	return q
1021}
1022
1023// SetSpeculativeExecutionPolicy sets the execution policy
1024func (q *Query) SetSpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Query {
1025	q.spec = sp
1026	return q
1027}
1028
1029// speculativeExecutionPolicy fetches the policy
1030func (q *Query) speculativeExecutionPolicy() SpeculativeExecutionPolicy {
1031	return q.spec
1032}
1033
1034func (q *Query) IsIdempotent() bool {
1035	return q.idempotent
1036}
1037
1038// Idempotent marks the query as being idempotent or not depending on
1039// the value.
1040func (q *Query) Idempotent(value bool) *Query {
1041	q.idempotent = value
1042	return q
1043}
1044
1045// Bind sets query arguments of query. This can also be used to rebind new query arguments
1046// to an existing query instance.
1047func (q *Query) Bind(v ...interface{}) *Query {
1048	q.values = v
1049	return q
1050}
1051
1052// SerialConsistency sets the consistency level for the
1053// serial phase of conditional updates. That consistency can only be
1054// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
1055// SERIAL. This option will be ignored for anything else that a
1056// conditional update/insert.
1057func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
1058	q.serialCons = cons
1059	return q
1060}
1061
1062// PageState sets the paging state for the query to resume paging from a specific
1063// point in time. Setting this will disable to query paging for this query, and
1064// must be used for all subsequent pages.
1065func (q *Query) PageState(state []byte) *Query {
1066	q.pageState = state
1067	q.disableAutoPage = true
1068	return q
1069}
1070
1071// NoSkipMetadata will override the internal result metadata cache so that the driver does not
1072// send skip_metadata for queries, this means that the result will always contain
1073// the metadata to parse the rows and will not reuse the metadata from the prepared
1074// staement. This should only be used to work around cassandra bugs, such as when using
1075// CAS operations which do not end in Cas.
1076//
1077// See https://issues.apache.org/jira/browse/CASSANDRA-11099
1078// https://github.com/gocql/gocql/issues/612
1079func (q *Query) NoSkipMetadata() *Query {
1080	q.disableSkipMetadata = true
1081	return q
1082}
1083
1084// Exec executes the query without returning any rows.
1085func (q *Query) Exec() error {
1086	return q.Iter().Close()
1087}
1088
// isUseStatement reports whether stmt is a CQL USE statement, i.e. whether
// its first non-whitespace characters are "use" in any letter case.
// Query.Iter rejects such statements with ErrUseStmt.
func isUseStatement(stmt string) bool {
	// Skip leading whitespace so that statements written as multi-line raw
	// strings (e.g. "\n\tUSE ks") cannot slip past the check.
	stmt = strings.TrimLeftFunc(stmt, unicode.IsSpace)
	if len(stmt) < 3 {
		return false
	}
	return strings.EqualFold(stmt[:3], "use")
}
1096
1097// Iter executes the query and returns an iterator capable of iterating
1098// over all results.
1099func (q *Query) Iter() *Iter {
1100	if isUseStatement(q.stmt) {
1101		return &Iter{err: ErrUseStmt}
1102	}
1103	return q.session.executeQuery(q)
1104}
1105
1106// MapScan executes the query, copies the columns of the first selected
1107// row into the map pointed at by m and discards the rest. If no rows
1108// were selected, ErrNotFound is returned.
1109func (q *Query) MapScan(m map[string]interface{}) error {
1110	iter := q.Iter()
1111	if err := iter.checkErrAndNotFound(); err != nil {
1112		return err
1113	}
1114	iter.MapScan(m)
1115	return iter.Close()
1116}
1117
1118// Scan executes the query, copies the columns of the first selected
1119// row into the values pointed at by dest and discards the rest. If no rows
1120// were selected, ErrNotFound is returned.
1121func (q *Query) Scan(dest ...interface{}) error {
1122	iter := q.Iter()
1123	if err := iter.checkErrAndNotFound(); err != nil {
1124		return err
1125	}
1126	iter.Scan(dest...)
1127	return iter.Close()
1128}
1129
1130// ScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
1131// statement containing an IF clause). If the transaction fails because
1132// the existing values did not match, the previous values will be stored
1133// in dest.
1134func (q *Query) ScanCAS(dest ...interface{}) (applied bool, err error) {
1135	q.disableSkipMetadata = true
1136	iter := q.Iter()
1137	if err := iter.checkErrAndNotFound(); err != nil {
1138		return false, err
1139	}
1140	if len(iter.Columns()) > 1 {
1141		dest = append([]interface{}{&applied}, dest...)
1142		iter.Scan(dest...)
1143	} else {
1144		iter.Scan(&applied)
1145	}
1146	return applied, iter.Close()
1147}
1148
1149// MapScanCAS executes a lightweight transaction (i.e. an UPDATE or INSERT
1150// statement containing an IF clause). If the transaction fails because
1151// the existing values did not match, the previous values will be stored
1152// in dest map.
1153//
1154// As for INSERT .. IF NOT EXISTS, previous values will be returned as if
1155// SELECT * FROM. So using ScanCAS with INSERT is inherently prone to
1156// column mismatching. MapScanCAS is added to capture them safely.
1157func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error) {
1158	q.disableSkipMetadata = true
1159	iter := q.Iter()
1160	if err := iter.checkErrAndNotFound(); err != nil {
1161		return false, err
1162	}
1163	iter.MapScan(dest)
1164	applied = dest["[applied]"].(bool)
1165	delete(dest, "[applied]")
1166
1167	return applied, iter.Close()
1168}
1169
1170// Release releases a query back into a pool of queries. Released Queries
1171// cannot be reused.
1172//
1173// Example:
1174// 		qry := session.Query("SELECT * FROM my_table")
1175// 		qry.Exec()
1176// 		qry.Release()
1177func (q *Query) Release() {
1178	q.reset()
1179	queryPool.Put(q)
1180}
1181
1182// reset zeroes out all fields of a query so that it can be safely pooled.
1183func (q *Query) reset() {
1184	*q = Query{}
1185}
1186
// Iter represents an iterator that can be used to iterate over all rows that
// were returned by a query. The iterator might send additional queries to the
// database during the iteration if paging was enabled.
//
// Fields:
//   - err: the error recorded during the query or iteration; Close returns it.
//   - pos: index of the next row to read within the current page.
//   - meta: column metadata (and paging state) of the current page.
//   - numRows: number of rows in the current page.
//   - next: fetcher for the following page; nil when there is nothing further
//     to fetch.
//   - host: the host the query was sent to (see Host).
//   - framer: source of the raw column bytes; set to nil by Close.
//   - closed: set to 1 atomically by the first call to Close.
type Iter struct {
	err     error
	pos     int
	meta    resultMetadata
	numRows int
	next    *nextIter
	host    *HostInfo

	framer *framer
	closed int32
}
1201
1202// Host returns the host which the query was sent to.
1203func (iter *Iter) Host() *HostInfo {
1204	return iter.host
1205}
1206
1207// Columns returns the name and type of the selected columns.
1208func (iter *Iter) Columns() []ColumnInfo {
1209	return iter.meta.columns
1210}
1211
// Scanner provides database/sql-style row iteration over a query result.
// It is obtained from Iter.Scanner.
type Scanner interface {
	// Next advances the row pointer to point at the next row, the row is valid until
	// the next call of Next. It returns true if there is a row which is available to be
	// scanned into with Scan.
	// Next must be called before every call to Scan.
	Next() bool

	// Scan copies the current row's columns into dest. If the length of dest does not equal
	// the number of columns returned in the row an error is returned. If an error is encountered
	// when unmarshalling a column into the value in dest an error is returned and the row is invalidated
	// until the next call to Next.
	// Next must be called before calling Scan, if it is not an error is returned.
	Scan(...interface{}) error

	// Err returns the error, if any, that occurred during iteration and prevented it
	// from completing. Err also releases the resources held by the iterator; the
	// Scanner must not be used after Err has been called.
	Err() error
}
1230
// iterScanner is the Scanner implementation returned by Iter.Scanner.
//
// Fields:
//   - iter: iterator for the current page; replaced by the next page's
//     iterator when the current one is exhausted.
//   - cols: raw bytes of each column of the current row, indexed by column
//     position (one entry per element of iter.meta.columns).
//   - valid: true after Next has buffered a row, until Scan consumes it.
type iterScanner struct {
	iter  *Iter
	cols  [][]byte
	valid bool
}
1236
1237func (is *iterScanner) Next() bool {
1238	iter := is.iter
1239	if iter.err != nil {
1240		return false
1241	}
1242
1243	if iter.pos >= iter.numRows {
1244		if iter.next != nil {
1245			is.iter = iter.next.fetch()
1246			return is.Next()
1247		}
1248		return false
1249	}
1250
1251	for i := 0; i < len(is.cols); i++ {
1252		col, err := iter.readColumn()
1253		if err != nil {
1254			iter.err = err
1255			return false
1256		}
1257		is.cols[i] = col
1258	}
1259	iter.pos++
1260	is.valid = true
1261
1262	return true
1263}
1264
1265func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) {
1266	if dest[0] == nil {
1267		return 1, nil
1268	}
1269
1270	if col.TypeInfo.Type() == TypeTuple {
1271		// this will panic, actually a bug, please report
1272		tuple := col.TypeInfo.(TupleTypeInfo)
1273
1274		count := len(tuple.Elems)
1275		// here we pass in a slice of the struct which has the number number of
1276		// values as elements in the tuple
1277		if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil {
1278			return 0, err
1279		}
1280		return count, nil
1281	} else {
1282		if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil {
1283			return 0, err
1284		}
1285		return 1, nil
1286	}
1287}
1288
1289func (is *iterScanner) Scan(dest ...interface{}) error {
1290	if !is.valid {
1291		return errors.New("gocql: Scan called without calling Next")
1292	}
1293
1294	iter := is.iter
1295	// currently only support scanning into an expand tuple, such that its the same
1296	// as scanning in more values from a single column
1297	if len(dest) != iter.meta.actualColCount {
1298		return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
1299	}
1300
1301	// i is the current position in dest, could posible replace it and just use
1302	// slices of dest
1303	i := 0
1304	var err error
1305	for _, col := range iter.meta.columns {
1306		var n int
1307		n, err = scanColumn(is.cols[i], col, dest[i:])
1308		if err != nil {
1309			break
1310		}
1311		i += n
1312	}
1313
1314	is.valid = false
1315	return err
1316}
1317
1318func (is *iterScanner) Err() error {
1319	iter := is.iter
1320	is.iter = nil
1321	is.cols = nil
1322	is.valid = false
1323	return iter.Close()
1324}
1325
1326// Scanner returns a row Scanner which provides an interface to scan rows in a manner which is
1327// similar to database/sql. The iter should NOT be used again after calling this method.
1328func (iter *Iter) Scanner() Scanner {
1329	if iter == nil {
1330		return nil
1331	}
1332
1333	return &iterScanner{iter: iter, cols: make([][]byte, len(iter.meta.columns))}
1334}
1335
1336func (iter *Iter) readColumn() ([]byte, error) {
1337	return iter.framer.readBytesInternal()
1338}
1339
// Scan consumes the next row of the iterator and copies the columns of the
// current row into the values pointed at by dest. Use nil as a dest value
// to skip the corresponding column. Scan might send additional queries
// to the database to retrieve the next set of rows if paging was enabled.
//
// Tuple columns are expanded: a tuple column consumes one dest entry per
// tuple element, so len(dest) must equal the metadata's actualColCount
// rather than the number of columns.
//
// Scan returns true if the row was successfully unmarshaled or false if the
// end of the result set was reached or if an error occurred. Close should
// be called afterwards to retrieve any potential errors.
func (iter *Iter) Scan(dest ...interface{}) bool {
	if iter.err != nil {
		return false
	}

	if iter.pos >= iter.numRows {
		// The current page is exhausted. Replace this iterator's state with the
		// next page's iterator and retry. nextIter.fetch runs the query at most
		// once, so a background prefetch may already have done the work.
		if iter.next != nil {
			*iter = *iter.next.fetch()
			return iter.Scan(dest...)
		}
		return false
	}

	// Past the prefetch threshold, start loading the next page in the
	// background. A goroutine is started on every call past the threshold,
	// but the sync.Once inside fetch keeps the query from running twice.
	if iter.next != nil && iter.pos >= iter.next.pos {
		go iter.next.fetch()
	}

	// currently only support scanning into an expand tuple, such that its the same
	// as scanning in more values from a single column
	if len(dest) != iter.meta.actualColCount {
		iter.err = fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
		return false
	}

	// i is the current position in dest, could posible replace it and just use
	// slices of dest
	i := 0
	for _, col := range iter.meta.columns {
		// Every column is read from the framer, even when its dest is nil, so
		// the framer stays positioned at the start of the next column.
		colBytes, err := iter.readColumn()
		if err != nil {
			iter.err = err
			return false
		}

		n, err := scanColumn(colBytes, col, dest[i:])
		if err != nil {
			iter.err = err
			return false
		}
		i += n
	}

	iter.pos++
	return true
}
1393
1394// GetCustomPayload returns any parsed custom payload results if given in the
1395// response from Cassandra. Note that the result is not a copy.
1396//
1397// This additional feature of CQL Protocol v4
1398// allows additional results and query information to be returned by
1399// custom QueryHandlers running in your C* cluster.
1400// See https://datastax.github.io/java-driver/manual/custom_payloads/
1401func (iter *Iter) GetCustomPayload() map[string][]byte {
1402	return iter.framer.customPayload
1403}
1404
1405// Warnings returns any warnings generated if given in the response from Cassandra.
1406//
1407// This is only available starting with CQL Protocol v4.
1408func (iter *Iter) Warnings() []string {
1409	if iter.framer != nil {
1410		return iter.framer.header.warnings
1411	}
1412	return nil
1413}
1414
1415// Close closes the iterator and returns any errors that happened during
1416// the query or the iteration.
1417func (iter *Iter) Close() error {
1418	if atomic.CompareAndSwapInt32(&iter.closed, 0, 1) {
1419		if iter.framer != nil {
1420			iter.framer = nil
1421		}
1422	}
1423
1424	return iter.err
1425}
1426
1427// WillSwitchPage detects if iterator reached end of current page
1428// and the next page is available.
1429func (iter *Iter) WillSwitchPage() bool {
1430	return iter.pos >= iter.numRows && iter.next != nil
1431}
1432
1433// checkErrAndNotFound handle error and NotFound in one method.
1434func (iter *Iter) checkErrAndNotFound() error {
1435	if iter.err != nil {
1436		return iter.err
1437	} else if iter.numRows == 0 {
1438		return ErrNotFound
1439	}
1440	return nil
1441}
1442
1443// PageState return the current paging state for a query which can be used for
1444// subsequent queries to resume paging this point.
1445func (iter *Iter) PageState() []byte {
1446	return iter.meta.pagingState
1447}
1448
1449// NumRows returns the number of rows in this pagination, it will update when new
1450// pages are fetched, it is not the value of the total number of rows this iter
1451// will return unless there is only a single page returned.
1452func (iter *Iter) NumRows() int {
1453	return iter.numRows
1454}
1455
// nextIter lazily fetches the next page of a paged query.
//
// Fields:
//   - qry: the query executed to obtain the next page.
//   - pos: row position in the current page at which Iter.Scan starts
//     fetching this page in the background.
//   - once: guarantees qry is executed at most once, however many
//     goroutines call fetch.
//   - next: the iterator produced by that execution, returned by fetch.
type nextIter struct {
	qry  *Query
	pos  int
	once sync.Once
	next *Iter
}
1462
1463func (n *nextIter) fetch() *Iter {
1464	n.once.Do(func() {
1465		n.next = n.qry.session.executeQuery(n.qry)
1466	})
1467	return n.next
1468}
1469
// Batch groups several statements (Entries) to be executed as one batch
// operation of the given Type.
//
// Exported fields:
//   - Type: the batch type (see BatchType).
//   - Entries: the statements added via Query or Bind.
//   - Cons: consistency level (see GetConsistency / SetConsistency).
//   - CustomPayload: custom payload map.
//
// Unexported fields are set through the builder methods:
//   - rt: set by RetryPolicy.
//   - spec: set by SpeculativeExecutionPolicy.
//   - observer: set by Observer.
//   - serialCons: set by SerialConsistency.
//   - defaultTimestamp / defaultTimestampValue: set by DefaultTimestamp and
//     WithTimestamp.
//   - context: set by WithContext; Context falls back to context.Background
//     when it is nil.
//   - keyspace: the session's configured keyspace (see Session.NewBatch).
//   - metrics: per-host attempt counts and latencies, guarded by metrics.l.
//
// NOTE(review): cancelBatch is not read or written by any code in this part
// of the file; confirm whether it is still used.
type Batch struct {
	Type                  BatchType
	Entries               []BatchEntry
	Cons                  Consistency
	CustomPayload         map[string][]byte
	rt                    RetryPolicy
	spec                  SpeculativeExecutionPolicy
	observer              BatchObserver
	serialCons            SerialConsistency
	defaultTimestamp      bool
	defaultTimestampValue int64
	context               context.Context
	cancelBatch           func()
	keyspace              string
	metrics               *queryMetrics
}
1486
1487// NewBatch creates a new batch operation without defaults from the cluster
1488//
1489// Deprecated: use session.NewBatch instead
1490func NewBatch(typ BatchType) *Batch {
1491	return &Batch{
1492		Type:    typ,
1493		metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
1494		spec:    &NonSpeculativeExecution{},
1495	}
1496}
1497
1498// NewBatch creates a new batch operation using defaults defined in the cluster
1499func (s *Session) NewBatch(typ BatchType) *Batch {
1500	s.mu.RLock()
1501	batch := &Batch{
1502		Type:             typ,
1503		rt:               s.cfg.RetryPolicy,
1504		serialCons:       s.cfg.SerialConsistency,
1505		observer:         s.batchObserver,
1506		Cons:             s.cons,
1507		defaultTimestamp: s.cfg.DefaultTimestamp,
1508		keyspace:         s.cfg.Keyspace,
1509		metrics:          &queryMetrics{m: make(map[string]*hostMetrics)},
1510		spec:             &NonSpeculativeExecution{},
1511	}
1512
1513	s.mu.RUnlock()
1514	return batch
1515}
1516
1517func (b *Batch) getHostMetrics(host *HostInfo) *hostMetrics {
1518	b.metrics.l.Lock()
1519	metrics, exists := b.metrics.m[host.ConnectAddress().String()]
1520	if !exists {
1521		// if the host is not in the map, it means it's been accessed for the first time
1522		metrics = &hostMetrics{}
1523		b.metrics.m[host.ConnectAddress().String()] = metrics
1524	}
1525	b.metrics.l.Unlock()
1526
1527	return metrics
1528}
1529
1530// Observer enables batch-level observer on this batch.
1531// The provided observer will be called every time this batched query is executed.
1532func (b *Batch) Observer(observer BatchObserver) *Batch {
1533	b.observer = observer
1534	return b
1535}
1536
1537func (b *Batch) Keyspace() string {
1538	return b.keyspace
1539}
1540
1541// Attempts returns the number of attempts made to execute the batch.
1542func (b *Batch) Attempts() int {
1543	b.metrics.l.Lock()
1544	defer b.metrics.l.Unlock()
1545
1546	var attempts int
1547	for _, metric := range b.metrics.m {
1548		attempts += metric.Attempts
1549	}
1550	return attempts
1551}
1552
1553func (b *Batch) AddAttempts(i int, host *HostInfo) {
1554	hostMetric := b.getHostMetrics(host)
1555	b.metrics.l.Lock()
1556	hostMetric.Attempts += i
1557	b.metrics.l.Unlock()
1558}
1559
1560//Latency returns the average number of nanoseconds to execute a single attempt of the batch.
1561func (b *Batch) Latency() int64 {
1562	b.metrics.l.Lock()
1563	defer b.metrics.l.Unlock()
1564
1565	var (
1566		attempts int
1567		latency  int64
1568	)
1569	for _, metric := range b.metrics.m {
1570		attempts += metric.Attempts
1571		latency += metric.TotalLatency
1572	}
1573	if attempts > 0 {
1574		return latency / int64(attempts)
1575	}
1576	return 0
1577}
1578
1579func (b *Batch) AddLatency(l int64, host *HostInfo) {
1580	hostMetric := b.getHostMetrics(host)
1581	b.metrics.l.Lock()
1582	hostMetric.TotalLatency += l
1583	b.metrics.l.Unlock()
1584}
1585
1586// GetConsistency returns the currently configured consistency level for the batch
1587// operation.
1588func (b *Batch) GetConsistency() Consistency {
1589	return b.Cons
1590}
1591
1592// SetConsistency sets the currently configured consistency level for the batch
1593// operation.
1594func (b *Batch) SetConsistency(c Consistency) {
1595	b.Cons = c
1596}
1597
1598func (b *Batch) Context() context.Context {
1599	if b.context == nil {
1600		return context.Background()
1601	}
1602	return b.context
1603}
1604
1605func (b *Batch) IsIdempotent() bool {
1606	for _, entry := range b.Entries {
1607		if !entry.Idempotent {
1608			return false
1609		}
1610	}
1611	return true
1612}
1613
1614func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy {
1615	return b.spec
1616}
1617
1618func (b *Batch) SpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Batch {
1619	b.spec = sp
1620	return b
1621}
1622
1623// Query adds the query to the batch operation
1624func (b *Batch) Query(stmt string, args ...interface{}) {
1625	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
1626}
1627
1628// Bind adds the query to the batch operation and correlates it with a binding callback
1629// that will be invoked when the batch is executed. The binding callback allows the application
1630// to define which query argument values will be marshalled as part of the batch execution.
1631func (b *Batch) Bind(stmt string, bind func(q *QueryInfo) ([]interface{}, error)) {
1632	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, binding: bind})
1633}
1634
1635func (b *Batch) retryPolicy() RetryPolicy {
1636	return b.rt
1637}
1638
1639// RetryPolicy sets the retry policy to use when executing the batch operation
1640func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
1641	b.rt = r
1642	return b
1643}
1644
1645func (b *Batch) withContext(ctx context.Context) ExecutableQuery {
1646	return b.WithContext(ctx)
1647}
1648
1649// WithContext returns a shallow copy of b with its context
1650// set to ctx.
1651//
1652// The provided context controls the entire lifetime of executing a
1653// query, queries will be canceled and return once the context is
1654// canceled.
1655func (b *Batch) WithContext(ctx context.Context) *Batch {
1656	b2 := *b
1657	b2.context = ctx
1658	return &b2
1659}
1660
1661// Deprecate: does nothing, cancel the context passed to WithContext
1662func (*Batch) Cancel() {
1663	// TODO: delete
1664}
1665
1666// Size returns the number of batch statements to be executed by the batch operation.
1667func (b *Batch) Size() int {
1668	return len(b.Entries)
1669}
1670
1671// SerialConsistency sets the consistency level for the
1672// serial phase of conditional updates. That consistency can only be
1673// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
1674// SERIAL. This option will be ignored for anything else that a
1675// conditional update/insert.
1676//
1677// Only available for protocol 3 and above
1678func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
1679	b.serialCons = cons
1680	return b
1681}
1682
1683// DefaultTimestamp will enable the with default timestamp flag on the query.
1684// If enable, this will replace the server side assigned
1685// timestamp as default timestamp. Note that a timestamp in the query itself
1686// will still override this timestamp. This is entirely optional.
1687//
1688// Only available on protocol >= 3
1689func (b *Batch) DefaultTimestamp(enable bool) *Batch {
1690	b.defaultTimestamp = enable
1691	return b
1692}
1693
1694// WithTimestamp will enable the with default timestamp flag on the query
1695// like DefaultTimestamp does. But also allows to define value for timestamp.
1696// It works the same way as USING TIMESTAMP in the query itself, but
1697// should not break prepared query optimization
1698//
1699// Only available on protocol >= 3
1700func (b *Batch) WithTimestamp(timestamp int64) *Batch {
1701	b.DefaultTimestamp(true)
1702	b.defaultTimestampValue = timestamp
1703	return b
1704}
1705
1706func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
1707	b.AddAttempts(1, host)
1708	b.AddLatency(end.Sub(start).Nanoseconds(), host)
1709
1710	if b.observer == nil {
1711		return
1712	}
1713
1714	statements := make([]string, len(b.Entries))
1715	for i, entry := range b.Entries {
1716		statements[i] = entry.Stmt
1717	}
1718
1719	b.observer.ObserveBatch(b.Context(), ObservedBatch{
1720		Keyspace:   keyspace,
1721		Statements: statements,
1722		Start:      start,
1723		End:        end,
1724		// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
1725		Host:    host,
1726		Metrics: b.getHostMetrics(host),
1727		Err:     iter.err,
1728	})
1729}
1730
1731func (b *Batch) GetRoutingKey() ([]byte, error) {
1732	// TODO: use the first statement in the batch as the routing key?
1733	return nil, nil
1734}
1735
// BatchType selects the kind of batch a Batch is executed as.
type BatchType byte

// Supported batch types.
// NOTE(review): the numeric values presumably match the batch type byte of
// the CQL native protocol; confirm before changing them.
const (
	LoggedBatch   BatchType = 0
	UnloggedBatch BatchType = 1
	CounterBatch  BatchType = 2
)
1743
// BatchEntry is a single statement within a Batch.
//
// Fields:
//   - Stmt: the CQL statement text.
//   - Args: argument values, as passed to Batch.Query.
//   - Idempotent: whether the entry is idempotent; Batch.IsIdempotent
//     requires all entries to set it.
//   - binding: callback installed by Batch.Bind that produces the argument
//     values when the batch is executed.
type BatchEntry struct {
	Stmt       string
	Args       []interface{}
	Idempotent bool
	binding    func(q *QueryInfo) ([]interface{}, error)
}
1750
// ColumnInfo describes one column of a result set: the keyspace and table it
// belongs to, its name, and its type. TypeInfo drives how the column's raw
// bytes are unmarshaled by scanColumn; tuple columns are expanded into one
// scan destination per element.
type ColumnInfo struct {
	Keyspace string
	Table    string
	Name     string
	TypeInfo TypeInfo
}
1757
1758func (c ColumnInfo) String() string {
1759	return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo)
1760}
1761
// routingKeyInfoLRU is an LRU cache of routing key index information.
// Access to lru is guarded by mu (see Remove and Max).
type routingKeyInfoLRU struct {
	lru *lru.Cache
	mu  sync.Mutex
}
1767
// routingKeyInfo holds the data cached per statement in routingKeyInfoLRU.
// NOTE(review): indexes presumably lists the positions of the bound values
// that make up the routing key, with types holding the matching column
// types; confirm against the code that builds it.
type routingKeyInfo struct {
	indexes []int
	types   []TypeInfo
}
1772
1773func (r *routingKeyInfo) String() string {
1774	return fmt.Sprintf("routing key index=%v types=%v", r.indexes, r.types)
1775}
1776
1777func (r *routingKeyInfoLRU) Remove(key string) {
1778	r.mu.Lock()
1779	r.lru.Remove(key)
1780	r.mu.Unlock()
1781}
1782
1783//Max adjusts the maximum size of the cache and cleans up the oldest records if
1784//the new max is lower than the previous value. Not concurrency safe.
1785func (r *routingKeyInfoLRU) Max(max int) {
1786	r.mu.Lock()
1787	for r.lru.Len() > max {
1788		r.lru.RemoveOldest()
1789	}
1790	r.lru.MaxEntries = max
1791	r.mu.Unlock()
1792}
1793
// inflightCachedEntry holds the result of a computation together with a
// WaitGroup.
// NOTE(review): presumably concurrent callers wait on wg while the first
// caller fills in err and value, so that a cached value is computed only
// once; confirm against the code that uses it.
type inflightCachedEntry struct {
	wg    sync.WaitGroup
	err   error
	value interface{}
}
1799
// Tracer is the interface implemented by query tracers. Tracers have the
// ability to obtain a detailed event log of all events that happened during
// the execution of a query from Cassandra. Gathering this information might
// be essential for debugging and optimizing queries, but this feature should
// not be used on production systems with very high load.
type Tracer interface {
	// Trace receives the id of a trace session. The traceWriter
	// implementation uses it as session_id when querying system_traces.
	Trace(traceId []byte)
}
1808
// traceWriter is the Tracer returned by NewTraceWriter.
//
// Fields:
//   - session: used to query the system_traces tables through its control
//     connection.
//   - w: destination of the textual trace output.
//   - mu: serializes writes to w so output from concurrent traces is not
//     interleaved.
type traceWriter struct {
	session *Session
	w       io.Writer
	mu      sync.Mutex
}
1814
1815// NewTraceWriter returns a simple Tracer implementation that outputs
1816// the event log in a textual format.
1817func NewTraceWriter(session *Session, w io.Writer) Tracer {
1818	return &traceWriter{session: session, w: w}
1819}
1820
1821func (t *traceWriter) Trace(traceId []byte) {
1822	var (
1823		coordinator string
1824		duration    int
1825	)
1826	iter := t.session.control.query(`SELECT coordinator, duration
1827			FROM system_traces.sessions
1828			WHERE session_id = ?`, traceId)
1829
1830	iter.Scan(&coordinator, &duration)
1831	if err := iter.Close(); err != nil {
1832		t.mu.Lock()
1833		fmt.Fprintln(t.w, "Error:", err)
1834		t.mu.Unlock()
1835		return
1836	}
1837
1838	var (
1839		timestamp time.Time
1840		activity  string
1841		source    string
1842		elapsed   int
1843	)
1844
1845	t.mu.Lock()
1846	defer t.mu.Unlock()
1847
1848	fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
1849		traceId, coordinator, time.Duration(duration)*time.Microsecond)
1850
1851	iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed
1852			FROM system_traces.events
1853			WHERE session_id = ?`, traceId)
1854
1855	for iter.Scan(&timestamp, &activity, &source, &elapsed) {
1856		fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
1857			timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
1858	}
1859
1860	if err := iter.Close(); err != nil {
1861		fmt.Fprintln(t.w, "Error:", err)
1862	}
1863}
1864
// ObservedQuery describes a single query execution reported to a
// QueryObserver.
type ObservedQuery struct {
	Keyspace  string
	Statement string

	Start time.Time // time immediately before the query was called
	End   time.Time // time immediately after the query returned

	// Rows is the number of rows in the current iter.
	// In paginated queries, rows from previous scans are not counted.
	// Rows is not used in batch queries and remains at the default value
	Rows int

	// Host is the information about the host that performed the query
	Host *HostInfo

	// The metrics per this host
	Metrics *hostMetrics

	// Err is the error in the query.
	// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
	Err error
}
1887
// QueryObserver is the interface implemented by query observers / stat collectors.
//
// Experimental, this interface and use may change
type QueryObserver interface {
	// ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled.
	// It doesn't get called if there is no query because the session is closed or there are no connections available.
	// The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
	ObserveQuery(context.Context, ObservedQuery)
}
1897
// ObservedBatch describes a single batch execution attempt reported to a
// BatchObserver. Statements lists the statement text of every batch entry.
type ObservedBatch struct {
	Keyspace   string
	Statements []string

	Start time.Time // time immediately before the batch query was called
	End   time.Time // time immediately after the batch query returned

	// Host is the information about the host that performed the batch
	Host *HostInfo

	// Err is the error in the batch query.
	// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
	Err error

	// The metrics per this host
	Metrics *hostMetrics
}
1915
// BatchObserver is the interface implemented by batch observers / stat collectors.
type BatchObserver interface {
	// ObserveBatch gets called on every batch query to cassandra.
	// Batch.attempt makes a single call per recorded attempt, passing the
	// statements of all batch entries together in ObservedBatch.Statements.
	// It doesn't get called if there is no query because the session is closed or there are no connections available.
	// The error reported only shows query errors, i.e. if a SELECT is valid but finds no matches it will be nil.
	// Unlike QueryObserver.ObserveQuery it does no reporting on rows read.
	ObserveBatch(context.Context, ObservedBatch)
}
1925
// ObservedConnect describes a single connection attempt reported to a
// ConnectObserver.
type ObservedConnect struct {
	// Host is the information about the host about to connect
	Host *HostInfo

	Start time.Time // time immediately before the dial is called
	End   time.Time // time immediately after the dial returned

	// Err is the connection error (if any)
	Err error
}
1936
// ConnectObserver is the interface implemented by connect observers / stat collectors.
type ConnectObserver interface {
	// ObserveConnect gets called when a new connection to cassandra is made.
	// The ObservedConnect argument carries the target host, dial timing and
	// any connection error.
	ObserveConnect(ObservedConnect)
}
1942
// Error is an error carrying a numeric Code and a human-readable Message.
// Its Error method returns Message only.
// NOTE(review): Code presumably holds the server's protocol error code —
// confirm against the code that constructs it.
type Error struct {
	Code    int
	Message string
}
1947
1948func (e Error) Error() string {
1949	return e.Message
1950}
1951
// Sentinel errors returned by the driver.
var (
	// ErrNotFound is returned, for example, by Query.Scan and Query.MapScan
	// when the query selected no rows.
	ErrNotFound             = errors.New("not found")
	ErrUnavailable          = errors.New("unavailable")
	ErrUnsupported          = errors.New("feature not supported")
	ErrTooManyStmts         = errors.New("too many statements")
	// ErrUseStmt is reported by Query.Iter (and therefore Exec, Scan, ...)
	// for statements whose first non-whitespace characters are "use" in
	// any case.
	ErrUseStmt              = errors.New("use statements aren't supported. Please see https://github.com/gocql/gocql for explanation.")
	ErrSessionClosed        = errors.New("session has been closed")
	ErrNoConnections        = errors.New("gocql: no hosts available in the pool")
	ErrNoKeyspace           = errors.New("no keyspace provided")
	ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist")
	ErrNoMetadata           = errors.New("no metadata available")
)
1964
// ErrProtocol wraps an error so that it can be distinguished by type;
// NewErrProtocol builds one from a format string.
// NOTE(review): presumably used for malformed or unexpected protocol frames —
// confirm against its call sites.
type ErrProtocol struct{ error }
1966
1967func NewErrProtocol(format string, args ...interface{}) error {
1968	return ErrProtocol{fmt.Errorf(format, args...)}
1969}
1970
// BatchSizeMaximum is the maximum number of statements a batch operation can have.
// This limit is set by cassandra and could change in the future.
// NOTE(review): 65535 is the largest unsigned 16-bit value, presumably
// because the protocol encodes the statement count as a [short]; confirm
// against the native protocol spec.
const BatchSizeMaximum = 65535
1974