package aws

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/flagext"
	ot "github.com/opentracing/opentracing-go"
	otlog "github.com/opentracing/opentracing-go/log"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	awscommon "github.com/weaveworks/common/aws"
	"github.com/weaveworks/common/instrument"
	"golang.org/x/time/rate"

	"github.com/cortexproject/cortex/pkg/chunk"
	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/log"
	"github.com/cortexproject/cortex/pkg/util/math"
	"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

const (
	// Attribute names shared by index and chunk table items.
	hashKey  = "h" // hash key attribute, written as a string (S) value
	rangeKey = "r" // range key attribute, written as a binary (B) value
	valueKey = "c" // optional value attribute, written as a binary (B) value

	// For dynamodb errors
	// otherError is the reason recorded by recordDynamoError for errors that
	// are not awserr.Error values. NOTE(review): tableNameLabel and
	// errorReasonLabel are presumably metric label names used by the metrics
	// definitions elsewhere in the package — confirm.
	tableNameLabel   = "table"
	errorReasonLabel = "error"
	otherError       = "other"

	// See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
	// Maximum number of items sent in a single BatchWriteItem / BatchGetItem call.
	dynamoDBMaxWriteBatchSize = 25
	dynamoDBMaxReadBatchSize  = 100
	// AWS error code for requests DynamoDB rejects outright; such batches are
	// dropped (and counted) rather than retried.
	validationException = "ValidationException"
)

// DynamoDBConfig specifies config for a DynamoDB database.
56type DynamoDBConfig struct { 57 DynamoDB flagext.URLValue `yaml:"dynamodb_url"` 58 APILimit float64 `yaml:"api_limit"` 59 ThrottleLimit float64 `yaml:"throttle_limit"` 60 Metrics MetricsAutoScalingConfig `yaml:"metrics"` 61 ChunkGangSize int `yaml:"chunk_gang_size"` 62 ChunkGetMaxParallelism int `yaml:"chunk_get_max_parallelism"` 63 BackoffConfig backoff.Config `yaml:"backoff_config"` 64} 65 66// RegisterFlags adds the flags required to config this to the given FlagSet 67func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { 68 f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL with escaped Key and Secret encoded. "+ 69 "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.") 70 f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.") 71 f.Float64Var(&cfg.ThrottleLimit, "dynamodb.throttle-limit", 10.0, "DynamoDB rate cap to back off when throttled.") 72 f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk-gang-size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)") 73 f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get-max-parallelism", 32, "Max number of chunk-get operations to start in parallel") 74 f.DurationVar(&cfg.BackoffConfig.MinBackoff, "dynamodb.min-backoff", 100*time.Millisecond, "Minimum backoff time") 75 f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "dynamodb.max-backoff", 50*time.Second, "Maximum backoff time") 76 f.IntVar(&cfg.BackoffConfig.MaxRetries, "dynamodb.max-retries", 20, "Maximum number of times to retry an operation") 77 cfg.Metrics.RegisterFlags(f) 78} 79 80// StorageConfig specifies config for storing data on AWS. 
81type StorageConfig struct { 82 DynamoDBConfig `yaml:"dynamodb"` 83 S3Config `yaml:",inline"` 84} 85 86// RegisterFlags adds the flags required to config this to the given FlagSet 87func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) { 88 cfg.DynamoDBConfig.RegisterFlags(f) 89 cfg.S3Config.RegisterFlags(f) 90} 91 92// Validate config and returns error on failure 93func (cfg *StorageConfig) Validate() error { 94 if err := cfg.S3Config.Validate(); err != nil { 95 return errors.Wrap(err, "invalid S3 Storage config") 96 } 97 return nil 98} 99 100type dynamoDBStorageClient struct { 101 cfg DynamoDBConfig 102 schemaCfg chunk.SchemaConfig 103 104 DynamoDB dynamodbiface.DynamoDBAPI 105 // These rate-limiters let us slow down when DynamoDB signals provision limits. 106 writeThrottle *rate.Limiter 107 108 // These functions exists for mocking, so we don't have to write a whole load 109 // of boilerplate. 110 batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest 111 batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest 112 113 metrics *dynamoDBMetrics 114} 115 116// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient. 117func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) { 118 return newDynamoDBStorageClient(cfg, schemaCfg, reg) 119} 120 121// NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client. 122func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) { 123 return newDynamoDBStorageClient(cfg, schemaCfg, reg) 124} 125 126// newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client. 
127func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) { 128 dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) 129 if err != nil { 130 return nil, err 131 } 132 133 client := &dynamoDBStorageClient{ 134 cfg: cfg, 135 schemaCfg: schemaCfg, 136 DynamoDB: dynamoDB, 137 writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize), 138 metrics: newMetrics(reg), 139 } 140 client.batchGetItemRequestFn = client.batchGetItemRequest 141 client.batchWriteItemRequestFn = client.batchWriteItemRequest 142 return client, nil 143} 144 145// Stop implements chunk.IndexClient. 146func (a dynamoDBStorageClient) Stop() { 147} 148 149// NewWriteBatch implements chunk.IndexClient. 150func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch { 151 return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) 152} 153 154func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) { 155 for table, reqs := range unprocessed { 156 metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs))) 157 for _, req := range reqs { 158 item := req.PutRequest.Item 159 var hash, rnge string 160 if hashAttr, ok := item[hashKey]; ok { 161 if hashAttr.S != nil { 162 hash = *hashAttr.S 163 } 164 } 165 if rangeAttr, ok := item[rangeKey]; ok { 166 rnge = string(rangeAttr.B) 167 } 168 util.Event().Log("msg", "store retry", "table", table, "hashKey", hash, "rangeKey", rnge) 169 } 170 } 171} 172 173// BatchWrite writes requests to the underlying storage, handling retries and backoff. 174// Structure is identical to getDynamoDBChunks(), but operating on different datatypes 175// so cannot share implementation. If you fix a bug here fix it there too. 
176func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) error { 177 outstanding := input.(dynamoDBWriteBatch) 178 unprocessed := dynamoDBWriteBatch{} 179 180 backoff := backoff.New(ctx, a.cfg.BackoffConfig) 181 182 for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { 183 requests := dynamoDBWriteBatch{} 184 requests.TakeReqs(outstanding, dynamoDBMaxWriteBatchSize) 185 requests.TakeReqs(unprocessed, dynamoDBMaxWriteBatchSize) 186 187 request := a.batchWriteItemRequestFn(ctx, &dynamodb.BatchWriteItemInput{ 188 RequestItems: requests, 189 ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), 190 }) 191 192 err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { 193 return request.Send() 194 }) 195 resp := request.Data().(*dynamodb.BatchWriteItemOutput) 196 197 for _, cc := range resp.ConsumedCapacity { 198 a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName). 199 Add(float64(*cc.CapacityUnits)) 200 } 201 202 if err != nil { 203 for tableName := range requests { 204 recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics) 205 } 206 207 // If we get provisionedThroughputExceededException, then no items were processed, 208 // so back off and retry all. 209 if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) { 210 logWriteRetry(requests, a.metrics) 211 unprocessed.TakeReqs(requests, -1) 212 _ = a.writeThrottle.WaitN(ctx, len(requests)) 213 backoff.Wait() 214 continue 215 } else if ok && awsErr.Code() == validationException { 216 // this write will never work, so the only option is to drop the offending items and continue. 
217 level.Warn(log.Logger).Log("msg", "Data lost while flushing to DynamoDB", "err", awsErr) 218 level.Debug(log.Logger).Log("msg", "Dropped request details", "requests", requests) 219 util.Event().Log("msg", "ValidationException", "requests", requests) 220 // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context 221 // to determine if a request was dropped (or not) 222 for tableName := range requests { 223 a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc() 224 } 225 continue 226 } 227 228 // All other errors are critical. 229 return err 230 } 231 232 // If there are unprocessed items, retry those items. 233 unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems) 234 if len(unprocessedItems) > 0 { 235 logWriteRetry(unprocessedItems, a.metrics) 236 _ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) 237 unprocessed.TakeReqs(unprocessedItems, -1) 238 } 239 240 backoff.Reset() 241 } 242 243 if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 { 244 return fmt.Errorf("failed to write items to DynamoDB, %d values remaining: %s", valuesLeft, backoff.Err()) 245 } 246 return backoff.Err() 247} 248 249// QueryPages implements chunk.IndexClient. 
250func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { 251 return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) 252} 253 254func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { 255 input := &dynamodb.QueryInput{ 256 TableName: aws.String(query.TableName), 257 KeyConditions: map[string]*dynamodb.Condition{ 258 hashKey: { 259 AttributeValueList: []*dynamodb.AttributeValue{ 260 {S: aws.String(query.HashValue)}, 261 }, 262 ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), 263 }, 264 }, 265 ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), 266 } 267 268 if query.RangeValuePrefix != nil { 269 input.KeyConditions[rangeKey] = &dynamodb.Condition{ 270 AttributeValueList: []*dynamodb.AttributeValue{ 271 {B: query.RangeValuePrefix}, 272 }, 273 ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith), 274 } 275 } else if query.RangeValueStart != nil { 276 input.KeyConditions[rangeKey] = &dynamodb.Condition{ 277 AttributeValueList: []*dynamodb.AttributeValue{ 278 {B: query.RangeValueStart}, 279 }, 280 ComparisonOperator: aws.String(dynamodb.ComparisonOperatorGe), 281 } 282 } 283 284 // Filters 285 if query.ValueEqual != nil { 286 input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey)) 287 input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ 288 ":v": { 289 B: query.ValueEqual, 290 }, 291 } 292 } 293 294 pageCount := 0 295 defer func() { 296 a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount)) 297 }() 298 299 retryer := newRetryer(ctx, a.cfg.BackoffConfig) 300 err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { 301 if sp := ot.SpanFromContext(innerCtx); sp != nil { 302 sp.SetTag("tableName", 
query.TableName) 303 sp.SetTag("hashValue", query.HashValue) 304 } 305 return a.DynamoDB.QueryPagesWithContext(innerCtx, input, func(output *dynamodb.QueryOutput, _ bool) bool { 306 pageCount++ 307 if sp := ot.SpanFromContext(innerCtx); sp != nil { 308 sp.LogFields(otlog.Int("page", pageCount)) 309 } 310 311 if cc := output.ConsumedCapacity; cc != nil { 312 a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName). 313 Add(float64(*cc.CapacityUnits)) 314 } 315 316 return callback(query, &dynamoDBReadResponse{items: output.Items}) 317 }, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics)) 318 }) 319 if err != nil { 320 return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName) 321 } 322 return err 323} 324 325type dynamoDBRequest interface { 326 Send() error 327 Data() interface{} 328 Error() error 329 Retryable() bool 330} 331 332func (a dynamoDBStorageClient) batchGetItemRequest(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest { 333 req, _ := a.DynamoDB.BatchGetItemRequest(input) 334 req.SetContext(ctx) 335 return dynamoDBRequestAdapter{req} 336} 337 338func (a dynamoDBStorageClient) batchWriteItemRequest(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest { 339 req, _ := a.DynamoDB.BatchWriteItemRequest(input) 340 req.SetContext(ctx) 341 return dynamoDBRequestAdapter{req} 342} 343 344type dynamoDBRequestAdapter struct { 345 request *request.Request 346} 347 348func (a dynamoDBRequestAdapter) Data() interface{} { 349 return a.request.Data 350} 351 352func (a dynamoDBRequestAdapter) Send() error { 353 return a.request.Send() 354} 355 356func (a dynamoDBRequestAdapter) Error() error { 357 return a.request.Error 358} 359 360func (a dynamoDBRequestAdapter) Retryable() bool { 361 return aws.BoolValue(a.request.Retryable) 362} 363 364type chunksPlusError struct { 365 chunks []chunk.Chunk 366 err error 367} 368 369// GetChunks implements 
chunk.Client. 370func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { 371 log, ctx := spanlogger.New(ctx, "GetChunks.DynamoDB", ot.Tag{Key: "numChunks", Value: len(chunks)}) 372 defer log.Span.Finish() 373 level.Debug(log).Log("chunks requested", len(chunks)) 374 375 dynamoDBChunks := chunks 376 var err error 377 378 gangSize := a.cfg.ChunkGangSize * dynamoDBMaxReadBatchSize 379 if gangSize == 0 { // zero means turn feature off 380 gangSize = len(dynamoDBChunks) 381 } else { 382 if len(dynamoDBChunks)/gangSize > a.cfg.ChunkGetMaxParallelism { 383 gangSize = len(dynamoDBChunks)/a.cfg.ChunkGetMaxParallelism + 1 384 } 385 } 386 387 results := make(chan chunksPlusError) 388 for i := 0; i < len(dynamoDBChunks); i += gangSize { 389 go func(start int) { 390 end := start + gangSize 391 if end > len(dynamoDBChunks) { 392 end = len(dynamoDBChunks) 393 } 394 outChunks, err := a.getDynamoDBChunks(ctx, dynamoDBChunks[start:end]) 395 results <- chunksPlusError{outChunks, err} 396 }(i) 397 } 398 finalChunks := []chunk.Chunk{} 399 for i := 0; i < len(dynamoDBChunks); i += gangSize { 400 in := <-results 401 if in.err != nil { 402 err = in.err // TODO: cancel other sub-queries at this point 403 } 404 finalChunks = append(finalChunks, in.chunks...) 405 } 406 level.Debug(log).Log("chunks fetched", len(finalChunks)) 407 408 // Return any chunks we did receive: a partial result may be useful 409 return finalChunks, log.Error(err) 410} 411 412// As we're re-using the DynamoDB schema from the index for the chunk tables, 413// we need to provide a non-null, non-empty value for the range value. 414var placeholder = []byte{'c'} 415 416// Fetch a set of chunks from DynamoDB, handling retries and backoff. 417// Structure is identical to BatchWrite(), but operating on different datatypes 418// so cannot share implementation. If you fix a bug here fix it there too. 
419func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { 420 log, ctx := spanlogger.New(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)}) 421 defer log.Span.Finish() 422 outstanding := dynamoDBReadRequest{} 423 chunksByKey := map[string]chunk.Chunk{} 424 for _, chunk := range chunks { 425 key := chunk.ExternalKey() 426 chunksByKey[key] = chunk 427 tableName, err := a.schemaCfg.ChunkTableFor(chunk.From) 428 if err != nil { 429 return nil, log.Error(err) 430 } 431 outstanding.Add(tableName, key, placeholder) 432 } 433 434 result := []chunk.Chunk{} 435 unprocessed := dynamoDBReadRequest{} 436 backoff := backoff.New(ctx, a.cfg.BackoffConfig) 437 438 for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { 439 requests := dynamoDBReadRequest{} 440 requests.TakeReqs(outstanding, dynamoDBMaxReadBatchSize) 441 requests.TakeReqs(unprocessed, dynamoDBMaxReadBatchSize) 442 443 request := a.batchGetItemRequestFn(ctx, &dynamodb.BatchGetItemInput{ 444 RequestItems: requests, 445 ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), 446 }) 447 448 err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { 449 return request.Send() 450 }) 451 response := request.Data().(*dynamodb.BatchGetItemOutput) 452 453 for _, cc := range response.ConsumedCapacity { 454 a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName). 455 Add(float64(*cc.CapacityUnits)) 456 } 457 458 if err != nil { 459 for tableName := range requests { 460 recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics) 461 } 462 463 // If we get provisionedThroughputExceededException, then no items were processed, 464 // so back off and retry all. 
465 if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) { 466 unprocessed.TakeReqs(requests, -1) 467 backoff.Wait() 468 continue 469 } else if ok && awsErr.Code() == validationException { 470 // this read will never work, so the only option is to drop the offending request and continue. 471 level.Warn(log).Log("msg", "Error while fetching data from Dynamo", "err", awsErr) 472 level.Debug(log).Log("msg", "Dropped request details", "requests", requests) 473 // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context 474 // to determine if a request was dropped (or not) 475 for tableName := range requests { 476 a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc() 477 } 478 continue 479 } 480 481 // All other errors are critical. 482 return nil, err 483 } 484 485 processedChunks, err := processChunkResponse(response, chunksByKey) 486 if err != nil { 487 return nil, log.Error(err) 488 } 489 result = append(result, processedChunks...) 490 491 // If there are unprocessed items, retry those items. 
492 if unprocessedKeys := response.UnprocessedKeys; unprocessedKeys != nil && dynamoDBReadRequest(unprocessedKeys).Len() > 0 { 493 unprocessed.TakeReqs(unprocessedKeys, -1) 494 } 495 496 backoff.Reset() 497 } 498 499 if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 { 500 // Return the chunks we did fetch, because partial results may be useful 501 return result, log.Error(fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err())) 502 } 503 return result, nil 504} 505 506func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map[string]chunk.Chunk) ([]chunk.Chunk, error) { 507 result := []chunk.Chunk{} 508 decodeContext := chunk.NewDecodeContext() 509 for _, items := range response.Responses { 510 for _, item := range items { 511 key, ok := item[hashKey] 512 if !ok || key == nil || key.S == nil { 513 return nil, fmt.Errorf("Got response from DynamoDB with no hash key: %+v", item) 514 } 515 516 chunk, ok := chunksByKey[*key.S] 517 if !ok { 518 return nil, fmt.Errorf("Got response from DynamoDB with chunk I didn't ask for: %s", *key.S) 519 } 520 521 buf, ok := item[valueKey] 522 if !ok || buf == nil || buf.B == nil { 523 return nil, fmt.Errorf("Got response from DynamoDB with no value: %+v", item) 524 } 525 526 if err := chunk.Decode(decodeContext, buf.B); err != nil { 527 return nil, err 528 } 529 530 result = append(result, chunk) 531 } 532 } 533 return result, nil 534} 535 536// PutChunkAndIndex implements chunk.ObjectAndIndexClient 537// Combine both sets of writes before sending to DynamoDB, for performance 538func (a dynamoDBStorageClient) PutChunksAndIndex(ctx context.Context, chunks []chunk.Chunk, index chunk.WriteBatch) error { 539 dynamoDBWrites, err := a.writesForChunks(chunks) 540 if err != nil { 541 return err 542 } 543 dynamoDBWrites.TakeReqs(index.(dynamoDBWriteBatch), 0) 544 return a.BatchWrite(ctx, dynamoDBWrites) 545} 546 547// PutChunks implements chunk.Client. 
548func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { 549 dynamoDBWrites, err := a.writesForChunks(chunks) 550 if err != nil { 551 return err 552 } 553 return a.BatchWrite(ctx, dynamoDBWrites) 554} 555 556func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { 557 chunkRef, err := chunk.ParseExternalKey(userID, chunkID) 558 if err != nil { 559 return err 560 } 561 562 tableName, err := a.schemaCfg.ChunkTableFor(chunkRef.From) 563 if err != nil { 564 return err 565 } 566 567 dynamoDBWrites := dynamoDBWriteBatch{} 568 dynamoDBWrites.Delete(tableName, chunkID, placeholder) 569 return a.BatchWrite(ctx, dynamoDBWrites) 570} 571 572func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) { 573 var ( 574 dynamoDBWrites = dynamoDBWriteBatch{} 575 ) 576 577 for i := range chunks { 578 buf, err := chunks[i].Encoded() 579 if err != nil { 580 return nil, err 581 } 582 key := chunks[i].ExternalKey() 583 584 table, err := a.schemaCfg.ChunkTableFor(chunks[i].From) 585 if err != nil { 586 return nil, err 587 } 588 589 dynamoDBWrites.Add(table, key, placeholder, buf) 590 } 591 592 return dynamoDBWrites, nil 593} 594 595// Slice of values returned; map key is attribute name 596type dynamoDBReadResponse struct { 597 items []map[string]*dynamodb.AttributeValue 598} 599 600func (b *dynamoDBReadResponse) Iterator() chunk.ReadBatchIterator { 601 return &dynamoDBReadResponseIterator{ 602 i: -1, 603 dynamoDBReadResponse: b, 604 } 605} 606 607type dynamoDBReadResponseIterator struct { 608 i int 609 *dynamoDBReadResponse 610} 611 612func (b *dynamoDBReadResponseIterator) Next() bool { 613 b.i++ 614 return b.i < len(b.items) 615} 616 617func (b *dynamoDBReadResponseIterator) RangeValue() []byte { 618 return b.items[b.i][rangeKey].B 619} 620 621func (b *dynamoDBReadResponseIterator) Value() []byte { 622 chunkValue, ok := b.items[b.i][valueKey] 623 if !ok { 624 return nil 625 
} 626 return chunkValue.B 627} 628 629// map key is table name; value is a slice of things to 'put' 630type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest 631 632func (b dynamoDBWriteBatch) Len() int { 633 result := 0 634 for _, reqs := range b { 635 result += len(reqs) 636 } 637 return result 638} 639 640func (b dynamoDBWriteBatch) String() string { 641 var sb strings.Builder 642 sb.WriteByte('{') 643 for k, reqs := range b { 644 sb.WriteString(k) 645 sb.WriteString(": [") 646 for _, req := range reqs { 647 sb.WriteString(req.String()) 648 sb.WriteByte(',') 649 } 650 sb.WriteString("], ") 651 } 652 sb.WriteByte('}') 653 return sb.String() 654} 655 656func (b dynamoDBWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { 657 item := map[string]*dynamodb.AttributeValue{ 658 hashKey: {S: aws.String(hashValue)}, 659 rangeKey: {B: rangeValue}, 660 } 661 662 if value != nil { 663 item[valueKey] = &dynamodb.AttributeValue{B: value} 664 } 665 666 b[tableName] = append(b[tableName], &dynamodb.WriteRequest{ 667 PutRequest: &dynamodb.PutRequest{ 668 Item: item, 669 }, 670 }) 671} 672 673func (b dynamoDBWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) { 674 b[tableName] = append(b[tableName], &dynamodb.WriteRequest{ 675 DeleteRequest: &dynamodb.DeleteRequest{ 676 Key: map[string]*dynamodb.AttributeValue{ 677 hashKey: {S: aws.String(hashValue)}, 678 rangeKey: {B: rangeValue}, 679 }, 680 }, 681 }) 682} 683 684// Fill 'b' with WriteRequests from 'from' until 'b' has at most max requests. Remove those requests from 'from'. 685func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) { 686 outLen, inLen := b.Len(), from.Len() 687 toFill := inLen 688 if max > 0 { 689 toFill = math.Min(inLen, max-outLen) 690 } 691 for toFill > 0 { 692 for tableName, fromReqs := range from { 693 taken := math.Min(len(fromReqs), toFill) 694 if taken > 0 { 695 b[tableName] = append(b[tableName], fromReqs[:taken]...) 
696 from[tableName] = fromReqs[taken:] 697 toFill -= taken 698 } 699 } 700 } 701} 702 703// map key is table name 704type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes 705 706func (b dynamoDBReadRequest) Len() int { 707 result := 0 708 for _, reqs := range b { 709 result += len(reqs.Keys) 710 } 711 return result 712} 713 714func (b dynamoDBReadRequest) Add(tableName, hashValue string, rangeValue []byte) { 715 requests, ok := b[tableName] 716 if !ok { 717 requests = &dynamodb.KeysAndAttributes{ 718 AttributesToGet: []*string{ 719 aws.String(hashKey), 720 aws.String(valueKey), 721 }, 722 } 723 b[tableName] = requests 724 } 725 requests.Keys = append(requests.Keys, map[string]*dynamodb.AttributeValue{ 726 hashKey: {S: aws.String(hashValue)}, 727 rangeKey: {B: rangeValue}, 728 }) 729} 730 731// Fill 'b' with ReadRequests from 'from' until 'b' has at most max requests. Remove those requests from 'from'. 732func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) { 733 outLen, inLen := b.Len(), from.Len() 734 toFill := inLen 735 if max > 0 { 736 toFill = math.Min(inLen, max-outLen) 737 } 738 for toFill > 0 { 739 for tableName, fromReqs := range from { 740 taken := math.Min(len(fromReqs.Keys), toFill) 741 if taken > 0 { 742 if _, ok := b[tableName]; !ok { 743 b[tableName] = &dynamodb.KeysAndAttributes{ 744 AttributesToGet: []*string{ 745 aws.String(hashKey), 746 aws.String(valueKey), 747 }, 748 } 749 } 750 751 b[tableName].Keys = append(b[tableName].Keys, fromReqs.Keys[:taken]...) 
752 from[tableName].Keys = fromReqs.Keys[taken:] 753 toFill -= taken 754 } 755 } 756 } 757} 758 759func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) { 760 return func(req *request.Request) { 761 req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) { 762 if req.Error != nil { 763 recordDynamoError(tableName, req.Error, operation, metrics) 764 } 765 }) 766 } 767} 768 769func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) { 770 if awsErr, ok := err.(awserr.Error); ok { 771 metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1)) 772 } else { 773 metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1)) 774 } 775} 776 777// dynamoClientFromURL creates a new DynamoDB client from a URL. 778func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) { 779 dynamoDBSession, err := awsSessionFromURL(awsURL) 780 if err != nil { 781 return nil, err 782 } 783 return dynamodb.New(dynamoDBSession), nil 784} 785 786// awsSessionFromURL creates a new aws session from a URL. 
787func awsSessionFromURL(awsURL *url.URL) (client.ConfigProvider, error) { 788 if awsURL == nil { 789 return nil, fmt.Errorf("no URL specified for DynamoDB") 790 } 791 path := strings.TrimPrefix(awsURL.Path, "/") 792 if len(path) > 0 { 793 level.Warn(log.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) 794 } 795 config, err := awscommon.ConfigFromURL(awsURL) 796 if err != nil { 797 return nil, err 798 } 799 config = config.WithMaxRetries(0) // We do our own retries, so we can monitor them 800 config = config.WithHTTPClient(&http.Client{Transport: defaultTransport}) 801 return session.NewSession(config) 802} 803 804// Copy-pasted http.DefaultTransport 805var defaultTransport http.RoundTripper = &http.Transport{ 806 Proxy: http.ProxyFromEnvironment, 807 DialContext: (&net.Dialer{ 808 Timeout: 30 * time.Second, 809 KeepAlive: 30 * time.Second, 810 }).DialContext, 811 ForceAttemptHTTP2: true, 812 MaxIdleConns: 100, 813 // We will connect many times in parallel to the same DynamoDB server, 814 // see https://github.com/golang/go/issues/13801 815 MaxIdleConnsPerHost: 100, 816 IdleConnTimeout: 90 * time.Second, 817 TLSHandshakeTimeout: 10 * time.Second, 818 ExpectContinueTimeout: 1 * time.Second, 819} 820