1package cassandra 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "flag" 8 "fmt" 9 "io/ioutil" 10 "strings" 11 "time" 12 13 "github.com/go-kit/kit/log" 14 "github.com/go-kit/kit/log/level" 15 "github.com/gocql/gocql" 16 "github.com/pkg/errors" 17 "github.com/prometheus/client_golang/prometheus" 18 "golang.org/x/sync/semaphore" 19 20 util_log "github.com/cortexproject/cortex/pkg/util/log" 21 "github.com/grafana/dskit/flagext" 22 23 "github.com/grafana/loki/pkg/storage/chunk" 24 "github.com/grafana/loki/pkg/storage/chunk/util" 25) 26 27// Config for a StorageClient 28type Config struct { 29 Addresses string `yaml:"addresses"` 30 Port int `yaml:"port"` 31 Keyspace string `yaml:"keyspace"` 32 Consistency string `yaml:"consistency"` 33 ReplicationFactor int `yaml:"replication_factor"` 34 DisableInitialHostLookup bool `yaml:"disable_initial_host_lookup"` 35 SSL bool `yaml:"SSL"` 36 HostVerification bool `yaml:"host_verification"` 37 HostSelectionPolicy string `yaml:"host_selection_policy"` 38 CAPath string `yaml:"CA_path"` 39 CertPath string `yaml:"tls_cert_path"` 40 KeyPath string `yaml:"tls_key_path"` 41 Auth bool `yaml:"auth"` 42 Username string `yaml:"username"` 43 Password flagext.Secret `yaml:"password"` 44 PasswordFile string `yaml:"password_file"` 45 CustomAuthenticators flagext.StringSlice `yaml:"custom_authenticators"` 46 Timeout time.Duration `yaml:"timeout"` 47 ConnectTimeout time.Duration `yaml:"connect_timeout"` 48 ReconnectInterval time.Duration `yaml:"reconnect_interval"` 49 Retries int `yaml:"max_retries"` 50 MaxBackoff time.Duration `yaml:"retry_max_backoff"` 51 MinBackoff time.Duration `yaml:"retry_min_backoff"` 52 QueryConcurrency int `yaml:"query_concurrency"` 53 NumConnections int `yaml:"num_connections"` 54 ConvictHosts bool `yaml:"convict_hosts_on_failure"` 55 TableOptions string `yaml:"table_options"` 56} 57 58const ( 59 HostPolicyRoundRobin = "round-robin" 60 HostPolicyTokenAware = "token-aware" 61) 62 63// RegisterFlags adds the flags required to config this to the given FlagSet 64func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 65 f.StringVar(&cfg.Addresses, "cassandra.addresses", "", "Comma-separated hostnames or IPs of Cassandra instances.") 66 f.IntVar(&cfg.Port, "cassandra.port", 9042, "Port that Cassandra is running on") 67 f.StringVar(&cfg.Keyspace, "cassandra.keyspace", "", "Keyspace to use in Cassandra.") 68 f.StringVar(&cfg.Consistency, "cassandra.consistency", "QUORUM", "Consistency level for Cassandra.") 69 f.IntVar(&cfg.ReplicationFactor, "cassandra.replication-factor", 3, "Replication factor to use in Cassandra.") 70 f.BoolVar(&cfg.DisableInitialHostLookup, "cassandra.disable-initial-host-lookup", false, "Instruct the cassandra driver to not attempt to get host info from the system.peers table.") 71 f.BoolVar(&cfg.SSL, "cassandra.ssl", false, "Use SSL when connecting to cassandra instances.") 72 f.BoolVar(&cfg.HostVerification, "cassandra.host-verification", true, "Require SSL certificate validation.") 73 f.StringVar(&cfg.HostSelectionPolicy, "cassandra.host-selection-policy", HostPolicyRoundRobin, "Policy for selecting Cassandra host. Supported values are: round-robin, token-aware.") 74 f.StringVar(&cfg.CAPath, "cassandra.ca-path", "", "Path to certificate file to verify the peer.") 75 f.StringVar(&cfg.CertPath, "cassandra.tls-cert-path", "", "Path to certificate file used by TLS.") 76 f.StringVar(&cfg.KeyPath, "cassandra.tls-key-path", "", "Path to private key file used by TLS.") 77 f.BoolVar(&cfg.Auth, "cassandra.auth", false, "Enable password authentication when connecting to cassandra.") 78 f.StringVar(&cfg.Username, "cassandra.username", "", "Username to use when connecting to cassandra.") 79 f.Var(&cfg.Password, "cassandra.password", "Password to use when connecting to cassandra.") 80 f.StringVar(&cfg.PasswordFile, "cassandra.password-file", "", "File containing password to use when connecting to cassandra.") 81 f.Var(&cfg.CustomAuthenticators, "cassandra.custom-authenticator", "If set, when authenticating with cassandra a custom authenticator will be expected during the handshake. This flag can be set multiple times.") 82 f.DurationVar(&cfg.Timeout, "cassandra.timeout", 2*time.Second, "Timeout when connecting to cassandra.") 83 f.DurationVar(&cfg.ConnectTimeout, "cassandra.connect-timeout", 5*time.Second, "Initial connection timeout, used during initial dial to server.") 84 f.DurationVar(&cfg.ReconnectInterval, "cassandra.reconnent-interval", 1*time.Second, "Interval to retry connecting to cassandra nodes marked as DOWN.") 85 f.IntVar(&cfg.Retries, "cassandra.max-retries", 0, "Number of retries to perform on a request. Set to 0 to disable retries.") 86 f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request.") 87 f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request.") 88 f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. Set to 0 to disable the limit.") 89 f.IntVar(&cfg.NumConnections, "cassandra.num-connections", 2, "Number of TCP connections per host.") 90 f.BoolVar(&cfg.ConvictHosts, "cassandra.convict-hosts-on-failure", true, "Convict hosts of being down on failure.") 91 f.StringVar(&cfg.TableOptions, "cassandra.table-options", "", "Table options used to create index or chunk tables. This value is used as plain text in the table `WITH` like this, \"CREATE TABLE <generated_by_cortex> (...) WITH <cassandra.table-options>\". For details, see https://cortexmetrics.io/docs/production/cassandra. By default it will use the default table options of your Cassandra cluster.") 92} 93 94func (cfg *Config) Validate() error { 95 if cfg.Password.Value != "" && cfg.PasswordFile != "" { 96 return errors.Errorf("The password and password_file config options are mutually exclusive.") 97 } 98 if cfg.SSL && cfg.HostVerification && len(strings.Split(cfg.Addresses, ",")) != 1 { 99 return errors.Errorf("Host verification is only possible for a single host.") 100 } 101 if cfg.SSL && cfg.CertPath != "" && cfg.KeyPath == "" { 102 return errors.Errorf("TLS certificate specified, but private key configuration is missing.") 103 } 104 if cfg.SSL && cfg.KeyPath != "" && cfg.CertPath == "" { 105 return errors.Errorf("TLS private key specified, but certificate configuration is missing.") 106 } 107 return nil 108} 109 110func (cfg *Config) session(name string, reg prometheus.Registerer) (*gocql.Session, error) { 111 cluster := gocql.NewCluster(strings.Split(cfg.Addresses, ",")...) 112 cluster.Port = cfg.Port 113 cluster.Keyspace = cfg.Keyspace 114 cluster.BatchObserver = observer{} 115 cluster.QueryObserver = observer{} 116 cluster.Timeout = cfg.Timeout 117 cluster.ConnectTimeout = cfg.ConnectTimeout 118 cluster.ReconnectInterval = cfg.ReconnectInterval 119 cluster.NumConns = cfg.NumConnections 120 cluster.Logger = log.With(util_log.Logger, "module", "gocql", "client", name) 121 cluster.Registerer = prometheus.WrapRegistererWith( 122 prometheus.Labels{"client": name}, reg) 123 if cfg.Retries > 0 { 124 cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ 125 NumRetries: cfg.Retries, 126 Min: cfg.MinBackoff, 127 Max: cfg.MaxBackoff, 128 } 129 } 130 if !cfg.ConvictHosts { 131 cluster.ConvictionPolicy = noopConvictionPolicy{} 132 } 133 if err := cfg.setClusterConfig(cluster); err != nil { 134 return nil, errors.WithStack(err) 135 } 136 137 session, err := cluster.CreateSession() 138 if err == nil { 139 return session, nil 140 } 141 // ErrNoConnectionsStarted will be returned if keyspace don't exist or is invalid. 142 // ref. https://github.com/gocql/gocql/blob/07ace3bab0f84bb88477bab5d79ba1f7e1da0169/cassandra_test.go#L85-L97 143 if err != gocql.ErrNoConnectionsStarted { 144 return nil, errors.WithStack(err) 145 } 146 // keyspace not exist 147 if err := cfg.createKeyspace(); err != nil { 148 return nil, errors.WithStack(err) 149 } 150 session, err = cluster.CreateSession() 151 return session, errors.WithStack(err) 152} 153 154// apply config settings to a cassandra ClusterConfig 155func (cfg *Config) setClusterConfig(cluster *gocql.ClusterConfig) error { 156 consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency) 157 if err != nil { 158 return errors.Wrap(err, "unable to parse the configured consistency") 159 } 160 161 cluster.Consistency = consistency 162 cluster.DisableInitialHostLookup = cfg.DisableInitialHostLookup 163 164 if cfg.SSL { 165 tlsConfig := &tls.Config{} 166 167 if cfg.CertPath != "" { 168 cert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath) 169 if err != nil { 170 return errors.Wrap(err, "Unable to load TLS certificate and private key") 171 } 172 173 tlsConfig.Certificates = []tls.Certificate{cert} 174 } 175 176 if cfg.HostVerification { 177 tlsConfig.ServerName = strings.Split(cfg.Addresses, ",")[0] 178 179 cluster.SslOpts = &gocql.SslOptions{ 180 CaPath: cfg.CAPath, 181 EnableHostVerification: true, 182 Config: tlsConfig, 183 } 184 } else { 185 cluster.SslOpts = &gocql.SslOptions{ 186 EnableHostVerification: false, 187 Config: tlsConfig, 188 } 189 } 190 } 191 192 if cfg.HostSelectionPolicy == HostPolicyRoundRobin { 193 cluster.PoolConfig.HostSelectionPolicy = gocql.RoundRobinHostPolicy() 194 } else if cfg.HostSelectionPolicy == HostPolicyTokenAware { 195 cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) 196 } else { 197 return errors.New("Unknown host selection policy") 198 } 199 200 if cfg.Auth { 201 password := cfg.Password.Value 202 if cfg.PasswordFile != "" { 203 passwordBytes, err := ioutil.ReadFile(cfg.PasswordFile) 204 if err != nil { 205 return errors.Errorf("Could not read Cassandra password file: %v", err) 206 } 207 passwordBytes = bytes.TrimRight(passwordBytes, "\n") 208 password = string(passwordBytes) 209 } 210 if len(cfg.CustomAuthenticators) != 0 { 211 cluster.Authenticator = CustomPasswordAuthenticator{ 212 ApprovedAuthenticators: cfg.CustomAuthenticators, 213 Username: cfg.Username, 214 Password: password, 215 } 216 return nil 217 } 218 cluster.Authenticator = gocql.PasswordAuthenticator{ 219 Username: cfg.Username, 220 Password: password, 221 } 222 } 223 return nil 224} 225 226// createKeyspace will create the desired keyspace if it doesn't exist. 227func (cfg *Config) createKeyspace() error { 228 cluster := gocql.NewCluster(strings.Split(cfg.Addresses, ",")...) 229 cluster.Port = cfg.Port 230 cluster.Keyspace = "system" 231 cluster.Timeout = 20 * time.Second 232 cluster.ConnectTimeout = 20 * time.Second 233 234 if err := cfg.setClusterConfig(cluster); err != nil { 235 return errors.WithStack(err) 236 } 237 238 session, err := cluster.CreateSession() 239 if err != nil { 240 return errors.WithStack(err) 241 } 242 defer session.Close() 243 244 err = session.Query(fmt.Sprintf( 245 `CREATE KEYSPACE IF NOT EXISTS %s 246 WITH replication = { 247 'class' : 'SimpleStrategy', 248 'replication_factor' : %d 249 }`, 250 cfg.Keyspace, cfg.ReplicationFactor)).Exec() 251 return errors.WithStack(err) 252} 253 254// StorageClient implements chunk.IndexClient and chunk.ObjectClient for Cassandra. 255type StorageClient struct { 256 cfg Config 257 schemaCfg chunk.SchemaConfig 258 readSession *gocql.Session 259 writeSession *gocql.Session 260 querySemaphore *semaphore.Weighted 261} 262 263// NewStorageClient returns a new StorageClient. 264func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*StorageClient, error) { 265 readSession, err := cfg.session("index-read", registerer) 266 if err != nil { 267 return nil, errors.WithStack(err) 268 } 269 270 writeSession, err := cfg.session("index-write", registerer) 271 if err != nil { 272 return nil, errors.WithStack(err) 273 } 274 275 var querySemaphore *semaphore.Weighted 276 if cfg.QueryConcurrency > 0 { 277 querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency)) 278 } 279 280 client := &StorageClient{ 281 cfg: cfg, 282 schemaCfg: schemaCfg, 283 readSession: readSession, 284 writeSession: writeSession, 285 querySemaphore: querySemaphore, 286 } 287 return client, nil 288} 289 290// Stop implement chunk.IndexClient. 291func (s *StorageClient) Stop() { 292 s.readSession.Close() 293 s.writeSession.Close() 294} 295 296// Cassandra batching isn't really useful in this case, its more to do multiple 297// atomic writes. Therefore we just do a bunch of writes in parallel. 298type writeBatch struct { 299 entries []chunk.IndexEntry 300 deletes []chunk.IndexEntry 301} 302 303// NewWriteBatch implement chunk.IndexClient. 304func (s *StorageClient) NewWriteBatch() chunk.WriteBatch { 305 return &writeBatch{} 306} 307 308func (b *writeBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { 309 b.entries = append(b.entries, chunk.IndexEntry{ 310 TableName: tableName, 311 HashValue: hashValue, 312 RangeValue: rangeValue, 313 Value: value, 314 }) 315} 316 317func (b *writeBatch) Delete(tableName, hashValue string, rangeValue []byte) { 318 b.deletes = append(b.deletes, chunk.IndexEntry{ 319 TableName: tableName, 320 HashValue: hashValue, 321 RangeValue: rangeValue, 322 }) 323} 324 325// BatchWrite implement chunk.IndexClient. 326func (s *StorageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { 327 b := batch.(*writeBatch) 328 329 for _, entry := range b.entries { 330 err := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)", 331 entry.TableName), entry.HashValue, entry.RangeValue, entry.Value).WithContext(ctx).Exec() 332 if err != nil { 333 return errors.WithStack(err) 334 } 335 } 336 337 for _, entry := range b.deletes { 338 err := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ? and range = ?", 339 entry.TableName), entry.HashValue, entry.RangeValue).WithContext(ctx).Exec() 340 if err != nil { 341 return errors.WithStack(err) 342 } 343 } 344 345 return nil 346} 347 348// QueryPages implement chunk.IndexClient. 349func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { 350 return util.DoParallelQueries(ctx, s.query, queries, callback) 351} 352 353func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error { 354 if s.querySemaphore != nil { 355 if err := s.querySemaphore.Acquire(ctx, 1); err != nil { 356 return err 357 } 358 defer s.querySemaphore.Release(1) 359 } 360 361 var q *gocql.Query 362 363 switch { 364 case len(query.RangeValuePrefix) > 0 && query.ValueEqual == nil: 365 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?", 366 query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff')) 367 368 case len(query.RangeValuePrefix) > 0 && query.ValueEqual != nil: 369 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING", 370 query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'), query.ValueEqual) 371 372 case len(query.RangeValueStart) > 0 && query.ValueEqual == nil: 373 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?", 374 query.TableName), query.HashValue, query.RangeValueStart) 375 376 case len(query.RangeValueStart) > 0 && query.ValueEqual != nil: 377 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING", 378 query.TableName), query.HashValue, query.RangeValueStart, query.ValueEqual) 379 380 case query.ValueEqual == nil: 381 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?", 382 query.TableName), query.HashValue) 383 384 case query.ValueEqual != nil: 385 q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? ALLOW FILTERING", 386 query.TableName), query.HashValue, query.ValueEqual) 387 } 388 389 iter := q.WithContext(ctx).Iter() 390 defer iter.Close() 391 scanner := iter.Scanner() 392 for scanner.Next() { 393 b := &readBatch{} 394 if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { 395 return errors.WithStack(err) 396 } 397 if !callback(query, b) { 398 return nil 399 } 400 } 401 return errors.WithStack(scanner.Err()) 402} 403 404// Allow other packages to interact with Cassandra directly 405func (s *StorageClient) GetReadSession() *gocql.Session { 406 return s.readSession 407} 408 409// readBatch represents a batch of rows read from Cassandra. 410type readBatch struct { 411 rangeValue []byte 412 value []byte 413} 414 415func (r *readBatch) Iterator() chunk.ReadBatchIterator { 416 return &readBatchIter{ 417 readBatch: r, 418 } 419} 420 421type readBatchIter struct { 422 consumed bool 423 *readBatch 424} 425 426func (b *readBatchIter) Next() bool { 427 if b.consumed { 428 return false 429 } 430 b.consumed = true 431 return true 432} 433 434func (b *readBatchIter) RangeValue() []byte { 435 return b.rangeValue 436} 437 438func (b *readBatchIter) Value() []byte { 439 return b.value 440} 441 442// ObjectClient implements chunk.ObjectClient for Cassandra. 443type ObjectClient struct { 444 cfg Config 445 schemaCfg chunk.SchemaConfig 446 readSession *gocql.Session 447 writeSession *gocql.Session 448 querySemaphore *semaphore.Weighted 449} 450 451// NewObjectClient returns a new ObjectClient. 452func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) { 453 readSession, err := cfg.session("chunks-read", registerer) 454 if err != nil { 455 return nil, errors.WithStack(err) 456 } 457 458 writeSession, err := cfg.session("chunks-write", registerer) 459 if err != nil { 460 return nil, errors.WithStack(err) 461 } 462 463 var querySemaphore *semaphore.Weighted 464 if cfg.QueryConcurrency > 0 { 465 querySemaphore = semaphore.NewWeighted(int64(cfg.QueryConcurrency)) 466 } 467 468 client := &ObjectClient{ 469 cfg: cfg, 470 schemaCfg: schemaCfg, 471 readSession: readSession, 472 writeSession: writeSession, 473 querySemaphore: querySemaphore, 474 } 475 return client, nil 476} 477 478// PutChunks implements chunk.ObjectClient. 479func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { 480 for i := range chunks { 481 buf, err := chunks[i].Encoded() 482 if err != nil { 483 return errors.WithStack(err) 484 } 485 key := chunks[i].ExternalKey() 486 tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) 487 if err != nil { 488 return err 489 } 490 491 // Must provide a range key, even though its not useds - hence 0x00. 492 q := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", 493 tableName), key, buf) 494 if err := q.WithContext(ctx).Exec(); err != nil { 495 return errors.WithStack(err) 496 } 497 } 498 499 return nil 500} 501 502// GetChunks implements chunk.ObjectClient. 503func (s *ObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { 504 return util.GetParallelChunks(ctx, input, s.getChunk) 505} 506 507func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) { 508 if s.querySemaphore != nil { 509 if err := s.querySemaphore.Acquire(ctx, 1); err != nil { 510 return input, err 511 } 512 defer s.querySemaphore.Release(1) 513 } 514 515 tableName, err := s.schemaCfg.ChunkTableFor(input.From) 516 if err != nil { 517 return input, err 518 } 519 520 var buf []byte 521 if err := s.readSession.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()). 522 WithContext(ctx).Scan(&buf); err != nil { 523 return input, errors.WithStack(err) 524 } 525 err = input.Decode(decodeContext, buf) 526 return input, err 527} 528 529func (s *ObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { 530 chunkRef, err := chunk.ParseExternalKey(userID, chunkID) 531 if err != nil { 532 return err 533 } 534 535 tableName, err := s.schemaCfg.ChunkTableFor(chunkRef.From) 536 if err != nil { 537 return err 538 } 539 540 q := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ?", 541 tableName), chunkID) 542 if err := q.WithContext(ctx).Exec(); err != nil { 543 return errors.WithStack(err) 544 } 545 546 return nil 547} 548 549func (s *ObjectClient) IsChunkNotFoundErr(_ error) bool { 550 return false 551} 552 553// Stop implement chunk.ObjectClient. 554func (s *ObjectClient) Stop() { 555 s.readSession.Close() 556 s.writeSession.Close() 557} 558 559type noopConvictionPolicy struct{} 560 561// AddFailure should return `true` if the host should be convicted, `false` otherwise. 562// Convicted means connections are removed - we don't want that. 563// Implementats gocql.ConvictionPolicy. 564func (noopConvictionPolicy) AddFailure(err error, host *gocql.HostInfo) bool { 565 level.Error(util_log.Logger).Log("msg", "Cassandra host failure", "err", err, "host", host.String()) 566 return false 567} 568 569// Implementats gocql.ConvictionPolicy. 570func (noopConvictionPolicy) Reset(host *gocql.HostInfo) {} 571