1package cassandra
2
3import (
4	"bytes"
5	"context"
6	"crypto/tls"
7	"flag"
8	"fmt"
9	"io/ioutil"
10	"strings"
11	"time"
12
13	"github.com/go-kit/kit/log"
14	"github.com/go-kit/kit/log/level"
15	"github.com/gocql/gocql"
16	"github.com/pkg/errors"
17	"github.com/prometheus/client_golang/prometheus"
18	"golang.org/x/sync/semaphore"
19
20	util_log "github.com/cortexproject/cortex/pkg/util/log"
21	"github.com/grafana/dskit/flagext"
22
23	"github.com/grafana/loki/pkg/storage/chunk"
24	"github.com/grafana/loki/pkg/storage/chunk/util"
25)
26
27// Config for a StorageClient
28type Config struct {
29	Addresses                string              `yaml:"addresses"`
30	Port                     int                 `yaml:"port"`
31	Keyspace                 string              `yaml:"keyspace"`
32	Consistency              string              `yaml:"consistency"`
33	ReplicationFactor        int                 `yaml:"replication_factor"`
34	DisableInitialHostLookup bool                `yaml:"disable_initial_host_lookup"`
35	SSL                      bool                `yaml:"SSL"`
36	HostVerification         bool                `yaml:"host_verification"`
37	HostSelectionPolicy      string              `yaml:"host_selection_policy"`
38	CAPath                   string              `yaml:"CA_path"`
39	CertPath                 string              `yaml:"tls_cert_path"`
40	KeyPath                  string              `yaml:"tls_key_path"`
41	Auth                     bool                `yaml:"auth"`
42	Username                 string              `yaml:"username"`
43	Password                 flagext.Secret      `yaml:"password"`
44	PasswordFile             string              `yaml:"password_file"`
45	CustomAuthenticators     flagext.StringSlice `yaml:"custom_authenticators"`
46	Timeout                  time.Duration       `yaml:"timeout"`
47	ConnectTimeout           time.Duration       `yaml:"connect_timeout"`
48	ReconnectInterval        time.Duration       `yaml:"reconnect_interval"`
49	Retries                  int                 `yaml:"max_retries"`
50	MaxBackoff               time.Duration       `yaml:"retry_max_backoff"`
51	MinBackoff               time.Duration       `yaml:"retry_min_backoff"`
52	QueryConcurrency         int                 `yaml:"query_concurrency"`
53	NumConnections           int                 `yaml:"num_connections"`
54	ConvictHosts             bool                `yaml:"convict_hosts_on_failure"`
55	TableOptions             string              `yaml:"table_options"`
56}
57
58const (
59	HostPolicyRoundRobin = "round-robin"
60	HostPolicyTokenAware = "token-aware"
61)
62
63// RegisterFlags adds the flags required to config this to the given FlagSet
64func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
65	f.StringVar(&cfg.Addresses, "cassandra.addresses", "", "Comma-separated hostnames or IPs of Cassandra instances.")
66	f.IntVar(&cfg.Port, "cassandra.port", 9042, "Port that Cassandra is running on")
67	f.StringVar(&cfg.Keyspace, "cassandra.keyspace", "", "Keyspace to use in Cassandra.")
68	f.StringVar(&cfg.Consistency, "cassandra.consistency", "QUORUM", "Consistency level for Cassandra.")
69	f.IntVar(&cfg.ReplicationFactor, "cassandra.replication-factor", 3, "Replication factor to use in Cassandra.")
70	f.BoolVar(&cfg.DisableInitialHostLookup, "cassandra.disable-initial-host-lookup", false, "Instruct the cassandra driver to not attempt to get host info from the system.peers table.")
71	f.BoolVar(&cfg.SSL, "cassandra.ssl", false, "Use SSL when connecting to cassandra instances.")
72	f.BoolVar(&cfg.HostVerification, "cassandra.host-verification", true, "Require SSL certificate validation.")
73	f.StringVar(&cfg.HostSelectionPolicy, "cassandra.host-selection-policy", HostPolicyRoundRobin, "Policy for selecting Cassandra host. Supported values are: round-robin, token-aware.")
74	f.StringVar(&cfg.CAPath, "cassandra.ca-path", "", "Path to certificate file to verify the peer.")
75	f.StringVar(&cfg.CertPath, "cassandra.tls-cert-path", "", "Path to certificate file used by TLS.")
76	f.StringVar(&cfg.KeyPath, "cassandra.tls-key-path", "", "Path to private key file used by TLS.")
77	f.BoolVar(&cfg.Auth, "cassandra.auth", false, "Enable password authentication when connecting to cassandra.")
78	f.StringVar(&cfg.Username, "cassandra.username", "", "Username to use when connecting to cassandra.")
79	f.Var(&cfg.Password, "cassandra.password", "Password to use when connecting to cassandra.")
80	f.StringVar(&cfg.PasswordFile, "cassandra.password-file", "", "File containing password to use when connecting to cassandra.")
81	f.Var(&cfg.CustomAuthenticators, "cassandra.custom-authenticator", "If set, when authenticating with cassandra a custom authenticator will be expected during the handshake. This flag can be set multiple times.")
82	f.DurationVar(&cfg.Timeout, "cassandra.timeout", 2*time.Second, "Timeout when connecting to cassandra.")
83	f.DurationVar(&cfg.ConnectTimeout, "cassandra.connect-timeout", 5*time.Second, "Initial connection timeout, used during initial dial to server.")
84	f.DurationVar(&cfg.ReconnectInterval, "cassandra.reconnent-interval", 1*time.Second, "Interval to retry connecting to cassandra nodes marked as DOWN.")
85	f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. Set to 0 to disable retries.")
86	f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request.")
87	f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request.")
88	f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. Set to 0 to disable the limit.")
89	f.IntVar(&cfg.NumConnections, "cassandra.num-connections", 2, "Number of TCP connections per host.")
90	f.BoolVar(&cfg.ConvictHosts, "cassandra.convict-hosts-on-failure", true, "Convict hosts of being down on failure.")
91	f.StringVar(&cfg.TableOptions, "cassandra.table-options", "", "Table options used to create index or chunk tables. This value is used as plain text in the table `WITH` like this, \"CREATE TABLE <generated_by_cortex> (...) WITH <cassandra.table-options>\". For details, see https://cortexmetrics.io/docs/production/cassandra. By default it will use the default table options of your Cassandra cluster.")
92}
93
94func (cfg *Config) Validate() error {
95	if cfg.Password.Value != "" && cfg.PasswordFile != "" {
96		return errors.Errorf("The password and password_file config options are mutually exclusive.")
97	}
98	if cfg.SSL && cfg.HostVerification && len(strings.Split(cfg.Addresses, ",")) != 1 {
99		return errors.Errorf("Host verification is only possible for a single host.")
100	}
101	if cfg.SSL && cfg.CertPath != "" && cfg.KeyPath == "" {
102		return errors.Errorf("TLS certificate specified, but private key configuration is missing.")
103	}
104	if cfg.SSL && cfg.KeyPath != "" && cfg.CertPath == "" {
105		return errors.Errorf("TLS private key specified, but certificate configuration is missing.")
106	}
107	return nil
108}
109
110func (cfg *Config) session(name string, reg prometheus.Registerer) (*gocql.Session, error) {
111	cluster := gocql.NewCluster(strings.Split(cfg.Addresses, ",")...)
112	cluster.Port = cfg.Port
113	cluster.Keyspace = cfg.Keyspace
114	cluster.BatchObserver = observer{}
115	cluster.QueryObserver = observer{}
116	cluster.Timeout = cfg.Timeout
117	cluster.ConnectTimeout = cfg.ConnectTimeout
118	cluster.ReconnectInterval = cfg.ReconnectInterval
119	cluster.NumConns = cfg.NumConnections
120	cluster.Logger = log.With(util_log.Logger, "module", "gocql", "client", name)
121	cluster.Registerer = prometheus.WrapRegistererWith(
122		prometheus.Labels{"client": name}, reg)
123	if cfg.Retries > 0 {
124		cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{
125			NumRetries: cfg.Retries,
126			Min:        cfg.MinBackoff,
127			Max:        cfg.MaxBackoff,
128		}
129	}
130	if !cfg.ConvictHosts {
131		cluster.ConvictionPolicy = noopConvictionPolicy{}
132	}
133	if err := cfg.setClusterConfig(cluster); err != nil {
134		return nil, errors.WithStack(err)
135	}
136
137	session, err := cluster.CreateSession()
138	if err == nil {
139		return session, nil
140	}
141	// ErrNoConnectionsStarted will be returned if keyspace don't exist or is invalid.
142	// ref. https://github.com/gocql/gocql/blob/07ace3bab0f84bb88477bab5d79ba1f7e1da0169/cassandra_test.go#L85-L97
143	if err != gocql.ErrNoConnectionsStarted {
144		return nil, errors.WithStack(err)
145	}
146	// keyspace not exist
147	if err := cfg.createKeyspace(); err != nil {
148		return nil, errors.WithStack(err)
149	}
150	session, err = cluster.CreateSession()
151	return session, errors.WithStack(err)
152}
153
154// apply config settings to a cassandra ClusterConfig
155func (cfg *Config) setClusterConfig(cluster *gocql.ClusterConfig) error {
156	consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency)
157	if err != nil {
158		return errors.Wrap(err, "unable to parse the configured consistency")
159	}
160
161	cluster.Consistency = consistency
162	cluster.DisableInitialHostLookup = cfg.DisableInitialHostLookup
163
164	if cfg.SSL {
165		tlsConfig := &tls.Config{}
166
167		if cfg.CertPath != "" {
168			cert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath)
169			if err != nil {
170				return errors.Wrap(err, "Unable to load TLS certificate and private key")
171			}
172
173			tlsConfig.Certificates = []tls.Certificate{cert}
174		}
175
176		if cfg.HostVerification {
177			tlsConfig.ServerName = strings.Split(cfg.Addresses, ",")[0]
178
179			cluster.SslOpts = &gocql.SslOptions{
180				CaPath:                 cfg.CAPath,
181				EnableHostVerification: true,
182				Config:                 tlsConfig,
183			}
184		} else {
185			cluster.SslOpts = &gocql.SslOptions{
186				EnableHostVerification: false,
187				Config:                 tlsConfig,
188			}
189		}
190	}
191
192	if cfg.HostSelectionPolicy == HostPolicyRoundRobin {
193		cluster.PoolConfig.HostSelectionPolicy = gocql.RoundRobinHostPolicy()
194	} else if cfg.HostSelectionPolicy == HostPolicyTokenAware {
195		cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
196	} else {
197		return errors.New("Unknown host selection policy")
198	}
199
200	if cfg.Auth {
201		password := cfg.Password.Value
202		if cfg.PasswordFile != "" {
203			passwordBytes, err := ioutil.ReadFile(cfg.PasswordFile)
204			if err != nil {
205				return errors.Errorf("Could not read Cassandra password file: %v", err)
206			}
207			passwordBytes = bytes.TrimRight(passwordBytes, "\n")
208			password = string(passwordBytes)
209		}
210		if len(cfg.CustomAuthenticators) != 0 {
211			cluster.Authenticator = CustomPasswordAuthenticator{
212				ApprovedAuthenticators: cfg.CustomAuthenticators,
213				Username:               cfg.Username,
214				Password:               password,
215			}
216			return nil
217		}
218		cluster.Authenticator = gocql.PasswordAuthenticator{
219			Username: cfg.Username,
220			Password: password,
221		}
222	}
223	return nil
224}
225
226// createKeyspace will create the desired keyspace if it doesn't exist.
227func (cfg *Config) createKeyspace() error {
228	cluster := gocql.NewCluster(strings.Split(cfg.Addresses, ",")...)
229	cluster.Port = cfg.Port
230	cluster.Keyspace = "system"
231	cluster.Timeout = 20 * time.Second
232	cluster.ConnectTimeout = 20 * time.Second
233
234	if err := cfg.setClusterConfig(cluster); err != nil {
235		return errors.WithStack(err)
236	}
237
238	session, err := cluster.CreateSession()
239	if err != nil {
240		return errors.WithStack(err)
241	}
242	defer session.Close()
243
244	err = session.Query(fmt.Sprintf(
245		`CREATE KEYSPACE IF NOT EXISTS %s
246		 WITH replication = {
247			 'class' : 'SimpleStrategy',
248			 'replication_factor' : %d
249		 }`,
250		cfg.Keyspace, cfg.ReplicationFactor)).Exec()
251	return errors.WithStack(err)
252}
253
254// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra.
255type StorageClient struct {
256	cfg            Config
257	schemaCfg      chunk.SchemaConfig
258	readSession    *gocql.Session
259	writeSession   *gocql.Session
260	querySemaphore *semaphore.Weighted
261}
262
263// NewStorageClient returns a new StorageClient.
264func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*StorageClient, error) {
265	readSession, err := cfg.session("index-read", registerer)
266	if err != nil {
267		return nil, errors.WithStack(err)
268	}
269
270	writeSession, err := cfg.session("index-write", registerer)
271	if err != nil {
272		return nil, errors.WithStack(err)
273	}
274
275	var querySemaphore *semaphore.Weighted
276	if cfg.QueryConcurrency > 0 {
277		querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency))
278	}
279
280	client := &StorageClient{
281		cfg:            cfg,
282		schemaCfg:      schemaCfg,
283		readSession:    readSession,
284		writeSession:   writeSession,
285		querySemaphore: querySemaphore,
286	}
287	return client, nil
288}
289
290// Stop implement chunk.IndexClient.
291func (s *StorageClient) Stop() {
292	s.readSession.Close()
293	s.writeSession.Close()
294}
295
296// Cassandra batching isn't really useful in this case, its more to do multiple
297// atomic writes.  Therefore we just do a bunch of writes in parallel.
298type writeBatch struct {
299	entries []chunk.IndexEntry
300	deletes []chunk.IndexEntry
301}
302
303// NewWriteBatch implement chunk.IndexClient.
304func (s *StorageClient) NewWriteBatch() chunk.WriteBatch {
305	return &writeBatch{}
306}
307
308func (b *writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
309	b.entries = append(b.entries, chunk.IndexEntry{
310		TableName:  tableName,
311		HashValue:  hashValue,
312		RangeValue: rangeValue,
313		Value:      value,
314	})
315}
316
317func (b *writeBatch) Delete(tableName, hashValue string, rangeValue []byte) {
318	b.deletes = append(b.deletes, chunk.IndexEntry{
319		TableName:  tableName,
320		HashValue:  hashValue,
321		RangeValue: rangeValue,
322	})
323}
324
325// BatchWrite implement chunk.IndexClient.
326func (s *StorageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
327	b := batch.(*writeBatch)
328
329	for _, entry := range b.entries {
330		err := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)",
331			entry.TableName), entry.HashValue, entry.RangeValue, entry.Value).WithContext(ctx).Exec()
332		if err != nil {
333			return errors.WithStack(err)
334		}
335	}
336
337	for _, entry := range b.deletes {
338		err := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ? and range = ?",
339			entry.TableName), entry.HashValue, entry.RangeValue).WithContext(ctx).Exec()
340		if err != nil {
341			return errors.WithStack(err)
342		}
343	}
344
345	return nil
346}
347
348// QueryPages implement chunk.IndexClient.
349func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
350	return util.DoParallelQueries(ctx, s.query, queries, callback)
351}
352
353func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error {
354	if s.querySemaphore != nil {
355		if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
356			return err
357		}
358		defer s.querySemaphore.Release(1)
359	}
360
361	var q *gocql.Query
362
363	switch {
364	case len(query.RangeValuePrefix) > 0 && query.ValueEqual == nil:
365		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?",
366			query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'))
367
368	case len(query.RangeValuePrefix) > 0 && query.ValueEqual != nil:
369		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING",
370			query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'), query.ValueEqual)
371
372	case len(query.RangeValueStart) > 0 && query.ValueEqual == nil:
373		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?",
374			query.TableName), query.HashValue, query.RangeValueStart)
375
376	case len(query.RangeValueStart) > 0 && query.ValueEqual != nil:
377		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING",
378			query.TableName), query.HashValue, query.RangeValueStart, query.ValueEqual)
379
380	case query.ValueEqual == nil:
381		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?",
382			query.TableName), query.HashValue)
383
384	case query.ValueEqual != nil:
385		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? ALLOW FILTERING",
386			query.TableName), query.HashValue, query.ValueEqual)
387	}
388
389	iter := q.WithContext(ctx).Iter()
390	defer iter.Close()
391	scanner := iter.Scanner()
392	for scanner.Next() {
393		b := &readBatch{}
394		if err := scanner.Scan(&b.rangeValue, &b.value); err != nil {
395			return errors.WithStack(err)
396		}
397		if !callback(query, b) {
398			return nil
399		}
400	}
401	return errors.WithStack(scanner.Err())
402}
403
404// Allow other packages to interact with Cassandra directly
405func (s *StorageClient) GetReadSession() *gocql.Session {
406	return s.readSession
407}
408
409// readBatch represents a batch of rows read from Cassandra.
410type readBatch struct {
411	rangeValue []byte
412	value      []byte
413}
414
415func (r *readBatch) Iterator() chunk.ReadBatchIterator {
416	return &readBatchIter{
417		readBatch: r,
418	}
419}
420
421type readBatchIter struct {
422	consumed bool
423	*readBatch
424}
425
426func (b *readBatchIter) Next() bool {
427	if b.consumed {
428		return false
429	}
430	b.consumed = true
431	return true
432}
433
434func (b *readBatchIter) RangeValue() []byte {
435	return b.rangeValue
436}
437
438func (b *readBatchIter) Value() []byte {
439	return b.value
440}
441
442// ObjectClient implements chunk.ObjectClient for Cassandra.
443type ObjectClient struct {
444	cfg            Config
445	schemaCfg      chunk.SchemaConfig
446	readSession    *gocql.Session
447	writeSession   *gocql.Session
448	querySemaphore *semaphore.Weighted
449}
450
451// NewObjectClient returns a new ObjectClient.
452func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) {
453	readSession, err := cfg.session("chunks-read", registerer)
454	if err != nil {
455		return nil, errors.WithStack(err)
456	}
457
458	writeSession, err := cfg.session("chunks-write", registerer)
459	if err != nil {
460		return nil, errors.WithStack(err)
461	}
462
463	var querySemaphore *semaphore.Weighted
464	if cfg.QueryConcurrency > 0 {
465		querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency))
466	}
467
468	client := &ObjectClient{
469		cfg:            cfg,
470		schemaCfg:      schemaCfg,
471		readSession:    readSession,
472		writeSession:   writeSession,
473		querySemaphore: querySemaphore,
474	}
475	return client, nil
476}
477
478// PutChunks implements chunk.ObjectClient.
479func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
480	for i := range chunks {
481		buf, err := chunks[i].Encoded()
482		if err != nil {
483			return errors.WithStack(err)
484		}
485		key := chunks[i].ExternalKey()
486		tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From)
487		if err != nil {
488			return err
489		}
490
491		// Must provide a range key, even though its not useds - hence 0x00.
492		q := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)",
493			tableName), key, buf)
494		if err := q.WithContext(ctx).Exec(); err != nil {
495			return errors.WithStack(err)
496		}
497	}
498
499	return nil
500}
501
502// GetChunks implements chunk.ObjectClient.
503func (s *ObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
504	return util.GetParallelChunks(ctx, input, s.getChunk)
505}
506
507func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
508	if s.querySemaphore != nil {
509		if err := s.querySemaphore.Acquire(ctx, 1); err != nil {
510			return input, err
511		}
512		defer s.querySemaphore.Release(1)
513	}
514
515	tableName, err := s.schemaCfg.ChunkTableFor(input.From)
516	if err != nil {
517		return input, err
518	}
519
520	var buf []byte
521	if err := s.readSession.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()).
522		WithContext(ctx).Scan(&buf); err != nil {
523		return input, errors.WithStack(err)
524	}
525	err = input.Decode(decodeContext, buf)
526	return input, err
527}
528
529func (s *ObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
530	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
531	if err != nil {
532		return err
533	}
534
535	tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From)
536	if err != nil {
537		return err
538	}
539
540	q := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ?",
541		tableName), chunkID)
542	if err := q.WithContext(ctx).Exec(); err != nil {
543		return errors.WithStack(err)
544	}
545
546	return nil
547}
548
549func (s *ObjectClient) IsChunkNotFoundErr(_ error) bool {
550	return false
551}
552
553// Stop implement chunk.ObjectClient.
554func (s *ObjectClient) Stop() {
555	s.readSession.Close()
556	s.writeSession.Close()
557}
558
559type noopConvictionPolicy struct{}
560
561// AddFailure should return `true` if the host should be convicted, `false` otherwise.
562// Convicted means connections are removed - we don't want that.
563// Implementats gocql.ConvictionPolicy.
564func (noopConvictionPolicy) AddFailure(err error, host *gocql.HostInfo) bool {
565	level.Error(util_log.Logger).Log("msg", "Cassandra host failure", "err", err, "host", host.String())
566	return false
567}
568
569// Implementats gocql.ConvictionPolicy.
570func (noopConvictionPolicy) Reset(host *gocql.HostInfo) {}
571