package aws

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/flagext"
	ot "github.com/opentracing/opentracing-go"
	otlog "github.com/opentracing/opentracing-go/log"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	awscommon "github.com/weaveworks/common/aws"
	"github.com/weaveworks/common/instrument"
	"golang.org/x/time/rate"

	"github.com/cortexproject/cortex/pkg/chunk"
	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/log"
	"github.com/cortexproject/cortex/pkg/util/math"
	"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

const (
	hashKey  = "h"
	rangeKey = "r"
	valueKey = "c"

	// Labels for DynamoDB error metrics.
	tableNameLabel   = "table"
	errorReasonLabel = "error"
	otherError       = "other"

	// See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
	dynamoDBMaxWriteBatchSize = 25
	dynamoDBMaxReadBatchSize  = 100
	validationException       = "ValidationException"
)

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
	DynamoDB               flagext.URLValue         `yaml:"dynamodb_url"`
	APILimit               float64                  `yaml:"api_limit"`
	ThrottleLimit          float64                  `yaml:"throttle_limit"`
	Metrics                MetricsAutoScalingConfig `yaml:"metrics"`
	ChunkGangSize          int                      `yaml:"chunk_gang_size"`
	ChunkGetMaxParallelism int                      `yaml:"chunk_get_max_parallelism"`
	BackoffConfig          backoff.Config           `yaml:"backoff_config"`
}

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
	f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL with escaped Key and Secret encoded. "+
		"If only a region is specified as the host, the proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.")
	f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per-second limit.")
	f.Float64Var(&cfg.ThrottleLimit, "dynamodb.throttle-limit", 10.0, "DynamoDB rate cap to back off when throttled.")
	f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk-gang-size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
	f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get-max-parallelism", 32, "Max number of chunk-get operations to start in parallel")
	f.DurationVar(&cfg.BackoffConfig.MinBackoff, "dynamodb.min-backoff", 100*time.Millisecond, "Minimum backoff time")
	f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "dynamodb.max-backoff", 50*time.Second, "Maximum backoff time")
	f.IntVar(&cfg.BackoffConfig.MaxRetries, "dynamodb.max-retries", 20, "Maximum number of times to retry an operation")
	cfg.Metrics.RegisterFlags(f)
}
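
// The sketch below (hypothetical, not part of the original file) shows one way
// to drive RegisterFlags: register the DynamoDB flags on a fresh FlagSet and
// override a couple of defaults from an argument list. The URL and retry count
// are illustrative values only.
func exampleRegisterDynamoDBFlags() (DynamoDBConfig, error) {
	var cfg DynamoDBConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	// Any flag registered above can be overridden this way.
	err := fs.Parse([]string{
		"-dynamodb.url=dynamodb://us-east-1",
		"-dynamodb.max-retries=10",
	})
	return cfg, err
}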

// StorageConfig specifies config for storing data on AWS.
type StorageConfig struct {
	DynamoDBConfig `yaml:"dynamodb"`
	S3Config       `yaml:",inline"`
}

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.DynamoDBConfig.RegisterFlags(f)
	cfg.S3Config.RegisterFlags(f)
}

// Validate validates the config and returns an error on failure.
func (cfg *StorageConfig) Validate() error {
	if err := cfg.S3Config.Validate(); err != nil {
		return errors.Wrap(err, "invalid S3 Storage config")
	}
	return nil
}

type dynamoDBStorageClient struct {
	cfg       DynamoDBConfig
	schemaCfg chunk.SchemaConfig

	DynamoDB dynamodbiface.DynamoDBAPI
	// This rate-limiter lets us slow down when DynamoDB signals provisioned-capacity limits.
	writeThrottle *rate.Limiter

	// These functions exist for mocking, so we don't have to write a whole load
	// of boilerplate.
	batchGetItemRequestFn   func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest
	batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest

	metrics *dynamoDBMetrics
}

// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) {
	return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client.
func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) {
	return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client.
func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) {
	dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL)
	if err != nil {
		return nil, err
	}

	client := &dynamoDBStorageClient{
		cfg:           cfg,
		schemaCfg:     schemaCfg,
		DynamoDB:      dynamoDB,
		writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize),
		metrics:       newMetrics(reg),
	}
	client.batchGetItemRequestFn = client.batchGetItemRequest
	client.batchWriteItemRequestFn = client.batchWriteItemRequest
	return client, nil
}

// Stop implements chunk.IndexClient.
func (a dynamoDBStorageClient) Stop() {
}

// NewWriteBatch implements chunk.IndexClient.
func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch {
	return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
}

// logWriteRetry records throttling metrics and emits an event log line for
// each write request that is about to be retried.
func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) {
	for table, reqs := range unprocessed {
		metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
		for _, req := range reqs {
			item := req.PutRequest.Item
			var hash, rnge string
			if hashAttr, ok := item[hashKey]; ok {
				if hashAttr.S != nil {
					hash = *hashAttr.S
				}
			}
			if rangeAttr, ok := item[rangeKey]; ok {
				rnge = string(rangeAttr.B)
			}
			util.Event().Log("msg", "store retry", "table", table, "hashKey", hash, "rangeKey", rnge)
		}
	}
}

// BatchWrite writes requests to the underlying storage, handling retries and backoff.
// Structure is identical to getDynamoDBChunks(), but operating on different datatypes
// so cannot share implementation. If you fix a bug here, fix it there too.
func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) error {
	outstanding := input.(dynamoDBWriteBatch)
	unprocessed := dynamoDBWriteBatch{}

	backoff := backoff.New(ctx, a.cfg.BackoffConfig)

	for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
		requests := dynamoDBWriteBatch{}
		requests.TakeReqs(outstanding, dynamoDBMaxWriteBatchSize)
		requests.TakeReqs(unprocessed, dynamoDBMaxWriteBatchSize)

		request := a.batchWriteItemRequestFn(ctx, &dynamodb.BatchWriteItemInput{
			RequestItems:           requests,
			ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
		})

		err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			return request.Send()
		})
		resp := request.Data().(*dynamodb.BatchWriteItemOutput)

		for _, cc := range resp.ConsumedCapacity {
			a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName).
				Add(float64(*cc.CapacityUnits))
		}

		if err != nil {
			for tableName := range requests {
				recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics)
			}

			// If we get a ProvisionedThroughputExceededException, then no items were processed,
			// so back off and retry all.
			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
				logWriteRetry(requests, a.metrics)
				unprocessed.TakeReqs(requests, -1)
				_ = a.writeThrottle.WaitN(ctx, requests.Len()) // throttle by request count, matching the unprocessed-items path below
				backoff.Wait()
				continue
			} else if ok && awsErr.Code() == validationException {
				// This write will never work, so the only option is to drop the offending items and continue.
				level.Warn(log.Logger).Log("msg", "Data lost while flushing to DynamoDB", "err", awsErr)
				level.Debug(log.Logger).Log("msg", "Dropped request details", "requests", requests)
				util.Event().Log("msg", "ValidationException", "requests", requests)
				// Record the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
				// to determine whether a request was dropped.
				for tableName := range requests {
					a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc()
				}
				continue
			}

			// All other errors are critical.
			return err
		}

		// If there are unprocessed items, retry those items.
		unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems)
		if len(unprocessedItems) > 0 {
			logWriteRetry(unprocessedItems, a.metrics)
			_ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len())
			unprocessed.TakeReqs(unprocessedItems, -1)
		}

		backoff.Reset()
	}

	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
		return fmt.Errorf("failed to write items to DynamoDB, %d values remaining: %s", valuesLeft, backoff.Err())
	}
	return backoff.Err()
}
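
// A minimal usage sketch (hypothetical; the table name, keys and values are
// made up for illustration): build a write batch via NewWriteBatch, add one
// index entry, and flush it with BatchWrite, which handles batching,
// throttling and retries internally.
func exampleBatchWrite(ctx context.Context, client *dynamoDBStorageClient) error {
	batch := client.NewWriteBatch()
	batch.(dynamoDBWriteBatch).Add("index_2021", "user1:d18000:metric", []byte("range-key"), []byte("value"))
	return client.BatchWrite(ctx, batch)
}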

// QueryPages implements chunk.IndexClient.
func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
	return chunk_util.DoParallelQueries(ctx, a.query, queries, callback)
}

func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
	input := &dynamodb.QueryInput{
		TableName: aws.String(query.TableName),
		KeyConditions: map[string]*dynamodb.Condition{
			hashKey: {
				AttributeValueList: []*dynamodb.AttributeValue{
					{S: aws.String(query.HashValue)},
				},
				ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq),
			},
		},
		ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
	}

	if query.RangeValuePrefix != nil {
		input.KeyConditions[rangeKey] = &dynamodb.Condition{
			AttributeValueList: []*dynamodb.AttributeValue{
				{B: query.RangeValuePrefix},
			},
			ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith),
		}
	} else if query.RangeValueStart != nil {
		input.KeyConditions[rangeKey] = &dynamodb.Condition{
			AttributeValueList: []*dynamodb.AttributeValue{
				{B: query.RangeValueStart},
			},
			ComparisonOperator: aws.String(dynamodb.ComparisonOperatorGe),
		}
	}

	// Filters
	if query.ValueEqual != nil {
		input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey))
		input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{
			":v": {
				B: query.ValueEqual,
			},
		}
	}

	pageCount := 0
	defer func() {
		a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount))
	}()

	retryer := newRetryer(ctx, a.cfg.BackoffConfig)
	err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error {
		if sp := ot.SpanFromContext(innerCtx); sp != nil {
			sp.SetTag("tableName", query.TableName)
			sp.SetTag("hashValue", query.HashValue)
		}
		return a.DynamoDB.QueryPagesWithContext(innerCtx, input, func(output *dynamodb.QueryOutput, _ bool) bool {
			pageCount++
			if sp := ot.SpanFromContext(innerCtx); sp != nil {
				sp.LogFields(otlog.Int("page", pageCount))
			}

			if cc := output.ConsumedCapacity; cc != nil {
				a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
					Add(float64(*cc.CapacityUnits))
			}

			return callback(query, &dynamoDBReadResponse{items: output.Items})
		}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics))
	})
	if err != nil {
		return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName)
	}
	return nil
}
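
// A usage sketch (hypothetical; the table, hash value and prefix are
// illustrative): issue a single prefix query through QueryPages and walk each
// page with the ReadBatch iterator. Returning true from the callback keeps
// paging; returning false stops early.
func exampleQueryPages(ctx context.Context, client *dynamoDBStorageClient) error {
	query := chunk.IndexQuery{
		TableName:        "index_2021",
		HashValue:        "user1:d18000:metric",
		RangeValuePrefix: []byte("prefix"),
	}
	return client.QueryPages(ctx, []chunk.IndexQuery{query}, func(q chunk.IndexQuery, rb chunk.ReadBatch) bool {
		for it := rb.Iterator(); it.Next(); {
			_ = it.RangeValue() // process each matching row here
		}
		return true
	})
}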

type dynamoDBRequest interface {
	Send() error
	Data() interface{}
	Error() error
	Retryable() bool
}

func (a dynamoDBStorageClient) batchGetItemRequest(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest {
	req, _ := a.DynamoDB.BatchGetItemRequest(input)
	req.SetContext(ctx)
	return dynamoDBRequestAdapter{req}
}

func (a dynamoDBStorageClient) batchWriteItemRequest(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest {
	req, _ := a.DynamoDB.BatchWriteItemRequest(input)
	req.SetContext(ctx)
	return dynamoDBRequestAdapter{req}
}

type dynamoDBRequestAdapter struct {
	request *request.Request
}

func (a dynamoDBRequestAdapter) Data() interface{} {
	return a.request.Data
}

func (a dynamoDBRequestAdapter) Send() error {
	return a.request.Send()
}

func (a dynamoDBRequestAdapter) Error() error {
	return a.request.Error
}

func (a dynamoDBRequestAdapter) Retryable() bool {
	return aws.BoolValue(a.request.Retryable)
}

type chunksPlusError struct {
	chunks []chunk.Chunk
	err    error
}

// GetChunks implements chunk.Client.
func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	log, ctx := spanlogger.New(ctx, "GetChunks.DynamoDB", ot.Tag{Key: "numChunks", Value: len(chunks)})
	defer log.Span.Finish()
	level.Debug(log).Log("chunks requested", len(chunks))

	dynamoDBChunks := chunks
	var err error

	gangSize := a.cfg.ChunkGangSize * dynamoDBMaxReadBatchSize
	if gangSize == 0 { // zero means turn feature off
		gangSize = len(dynamoDBChunks)
	} else {
		if len(dynamoDBChunks)/gangSize > a.cfg.ChunkGetMaxParallelism {
			gangSize = len(dynamoDBChunks)/a.cfg.ChunkGetMaxParallelism + 1
		}
	}
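
	// Worked example (illustrative numbers, not from the original source): with
	// the default ChunkGangSize=10 the base gang covers 10*100 = 1000 chunks.
	// Fetching 50,000 chunks would need 50 gangs, exceeding the default
	// ChunkGetMaxParallelism of 32, so the gang grows to 50000/32+1 = 1563
	// chunks and only ~32 goroutines are started below.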

	results := make(chan chunksPlusError)
	for i := 0; i < len(dynamoDBChunks); i += gangSize {
		go func(start int) {
			end := start + gangSize
			if end > len(dynamoDBChunks) {
				end = len(dynamoDBChunks)
			}
			outChunks, err := a.getDynamoDBChunks(ctx, dynamoDBChunks[start:end])
			results <- chunksPlusError{outChunks, err}
		}(i)
	}
	finalChunks := []chunk.Chunk{}
	for i := 0; i < len(dynamoDBChunks); i += gangSize {
		in := <-results
		if in.err != nil {
			err = in.err // TODO: cancel other sub-queries at this point
		}
		finalChunks = append(finalChunks, in.chunks...)
	}
	level.Debug(log).Log("chunks fetched", len(finalChunks))

	// Return any chunks we did receive: a partial result may be useful.
	return finalChunks, log.Error(err)
}

// As we're re-using the DynamoDB schema from the index for the chunk tables,
// we need to provide a non-null, non-empty value for the range value.
var placeholder = []byte{'c'}

// getDynamoDBChunks fetches a set of chunks from DynamoDB, handling retries and backoff.
// Structure is identical to BatchWrite(), but operating on different datatypes
// so cannot share implementation. If you fix a bug here, fix it there too.
func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	log, ctx := spanlogger.New(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)})
	defer log.Span.Finish()
	outstanding := dynamoDBReadRequest{}
	chunksByKey := map[string]chunk.Chunk{}
	for _, chunk := range chunks {
		key := chunk.ExternalKey()
		chunksByKey[key] = chunk
		tableName, err := a.schemaCfg.ChunkTableFor(chunk.From)
		if err != nil {
			return nil, log.Error(err)
		}
		outstanding.Add(tableName, key, placeholder)
	}

	result := []chunk.Chunk{}
	unprocessed := dynamoDBReadRequest{}
	backoff := backoff.New(ctx, a.cfg.BackoffConfig)

	for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
		requests := dynamoDBReadRequest{}
		requests.TakeReqs(outstanding, dynamoDBMaxReadBatchSize)
		requests.TakeReqs(unprocessed, dynamoDBMaxReadBatchSize)

		request := a.batchGetItemRequestFn(ctx, &dynamodb.BatchGetItemInput{
			RequestItems:           requests,
			ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
		})

		err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			return request.Send()
		})
		response := request.Data().(*dynamodb.BatchGetItemOutput)

		for _, cc := range response.ConsumedCapacity {
			a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName).
				Add(float64(*cc.CapacityUnits))
		}

		if err != nil {
			for tableName := range requests {
				recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics)
			}

			// If we get a ProvisionedThroughputExceededException, then no items were processed,
			// so back off and retry all.
			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
				unprocessed.TakeReqs(requests, -1)
				backoff.Wait()
				continue
			} else if ok && awsErr.Code() == validationException {
				// This read will never work, so the only option is to drop the offending request and continue.
				level.Warn(log).Log("msg", "Error while fetching data from Dynamo", "err", awsErr)
				level.Debug(log).Log("msg", "Dropped request details", "requests", requests)
				// Record the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
				// to determine whether a request was dropped.
				for tableName := range requests {
					a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc()
				}
				continue
			}

			// All other errors are critical.
			return nil, err
		}

		processedChunks, err := processChunkResponse(response, chunksByKey)
		if err != nil {
			return nil, log.Error(err)
		}
		result = append(result, processedChunks...)

		// If there are unprocessed items, retry those items.
		if unprocessedKeys := response.UnprocessedKeys; unprocessedKeys != nil && dynamoDBReadRequest(unprocessedKeys).Len() > 0 {
			unprocessed.TakeReqs(unprocessedKeys, -1)
		}

		backoff.Reset()
	}

	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
		// Return the chunks we did fetch, because partial results may be useful.
		return result, log.Error(fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err()))
	}
	return result, nil
}

// processChunkResponse decodes the raw items in a BatchGetItem response back
// into the chunks that were requested.
func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map[string]chunk.Chunk) ([]chunk.Chunk, error) {
	result := []chunk.Chunk{}
	decodeContext := chunk.NewDecodeContext()
	for _, items := range response.Responses {
		for _, item := range items {
			key, ok := item[hashKey]
			if !ok || key == nil || key.S == nil {
				return nil, fmt.Errorf("got response from DynamoDB with no hash key: %+v", item)
			}

			chunk, ok := chunksByKey[*key.S]
			if !ok {
				return nil, fmt.Errorf("got response from DynamoDB with chunk we did not ask for: %s", *key.S)
			}

			buf, ok := item[valueKey]
			if !ok || buf == nil || buf.B == nil {
				return nil, fmt.Errorf("got response from DynamoDB with no value: %+v", item)
			}

			if err := chunk.Decode(decodeContext, buf.B); err != nil {
				return nil, err
			}

			result = append(result, chunk)
		}
	}
	return result, nil
}

// PutChunksAndIndex implements chunk.ObjectAndIndexClient.
// It combines both sets of writes before sending to DynamoDB, for performance.
func (a dynamoDBStorageClient) PutChunksAndIndex(ctx context.Context, chunks []chunk.Chunk, index chunk.WriteBatch) error {
	dynamoDBWrites, err := a.writesForChunks(chunks)
	if err != nil {
		return err
	}
	dynamoDBWrites.TakeReqs(index.(dynamoDBWriteBatch), 0)
	return a.BatchWrite(ctx, dynamoDBWrites)
}

// PutChunks implements chunk.Client.
func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
	dynamoDBWrites, err := a.writesForChunks(chunks)
	if err != nil {
		return err
	}
	return a.BatchWrite(ctx, dynamoDBWrites)
}

// DeleteChunk implements chunk.Client.
func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) error {
	chunkRef, err := chunk.ParseExternalKey(userID, chunkID)
	if err != nil {
		return err
	}

	tableName, err := a.schemaCfg.ChunkTableFor(chunkRef.From)
	if err != nil {
		return err
	}

	dynamoDBWrites := dynamoDBWriteBatch{}
	dynamoDBWrites.Delete(tableName, chunkID, placeholder)
	return a.BatchWrite(ctx, dynamoDBWrites)
}

// writesForChunks turns a set of chunks into the DynamoDB write requests that store them.
func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) {
	dynamoDBWrites := dynamoDBWriteBatch{}

	for i := range chunks {
		buf, err := chunks[i].Encoded()
		if err != nil {
			return nil, err
		}
		key := chunks[i].ExternalKey()

		table, err := a.schemaCfg.ChunkTableFor(chunks[i].From)
		if err != nil {
			return nil, err
		}

		dynamoDBWrites.Add(table, key, placeholder, buf)
	}

	return dynamoDBWrites, nil
}

// dynamoDBReadResponse wraps a slice of DynamoDB items; each item maps
// attribute name to value.
type dynamoDBReadResponse struct {
	items []map[string]*dynamodb.AttributeValue
}

func (b *dynamoDBReadResponse) Iterator() chunk.ReadBatchIterator {
	return &dynamoDBReadResponseIterator{
		i:                    -1,
		dynamoDBReadResponse: b,
	}
}

type dynamoDBReadResponseIterator struct {
	i int
	*dynamoDBReadResponse
}

func (b *dynamoDBReadResponseIterator) Next() bool {
	b.i++
	return b.i < len(b.items)
}

func (b *dynamoDBReadResponseIterator) RangeValue() []byte {
	return b.items[b.i][rangeKey].B
}

func (b *dynamoDBReadResponseIterator) Value() []byte {
	chunkValue, ok := b.items[b.i][valueKey]
	if !ok {
		return nil
	}
	return chunkValue.B
}

// Map key is table name; value is a slice of things to 'put'.
type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest

func (b dynamoDBWriteBatch) Len() int {
	result := 0
	for _, reqs := range b {
		result += len(reqs)
	}
	return result
}

func (b dynamoDBWriteBatch) String() string {
	var sb strings.Builder
	sb.WriteByte('{')
	for k, reqs := range b {
		sb.WriteString(k)
		sb.WriteString(": [")
		for _, req := range reqs {
			sb.WriteString(req.String())
			sb.WriteByte(',')
		}
		sb.WriteString("], ")
	}
	sb.WriteByte('}')
	return sb.String()
}

func (b dynamoDBWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
	item := map[string]*dynamodb.AttributeValue{
		hashKey:  {S: aws.String(hashValue)},
		rangeKey: {B: rangeValue},
	}

	if value != nil {
		item[valueKey] = &dynamodb.AttributeValue{B: value}
	}

	b[tableName] = append(b[tableName], &dynamodb.WriteRequest{
		PutRequest: &dynamodb.PutRequest{
			Item: item,
		},
	})
}

func (b dynamoDBWriteBatch) Delete(tableName, hashValue string, rangeValue []byte) {
	b[tableName] = append(b[tableName], &dynamodb.WriteRequest{
		DeleteRequest: &dynamodb.DeleteRequest{
			Key: map[string]*dynamodb.AttributeValue{
				hashKey:  {S: aws.String(hashValue)},
				rangeKey: {B: rangeValue},
			},
		},
	})
}

// TakeReqs fills 'b' with WriteRequests from 'from' until 'b' has at most max
// requests, removing those requests from 'from'. A max of zero or below means
// take everything.
func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) {
	outLen, inLen := b.Len(), from.Len()
	toFill := inLen
	if max > 0 {
		toFill = math.Min(inLen, max-outLen)
	}
	for toFill > 0 {
		for tableName, fromReqs := range from {
			taken := math.Min(len(fromReqs), toFill)
			if taken > 0 {
				b[tableName] = append(b[tableName], fromReqs[:taken]...)
				from[tableName] = fromReqs[taken:]
				toFill -= taken
			}
		}
	}
}
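
// A small sketch (hypothetical table and key names) of the TakeReqs contract:
// requests are moved, not copied, so after the call the destination holds at
// most dynamoDBMaxWriteBatchSize requests and the source has shrunk by the
// same amount.
func exampleTakeReqs() {
	outstanding := dynamoDBWriteBatch{}
	for i := 0; i < 40; i++ {
		outstanding.Add("table", fmt.Sprintf("hash%d", i), []byte("r"), []byte("v"))
	}
	requests := dynamoDBWriteBatch{}
	requests.TakeReqs(outstanding, dynamoDBMaxWriteBatchSize)
	// Now requests.Len() == 25 and outstanding.Len() == 15.
}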

// Map key is table name.
type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes

func (b dynamoDBReadRequest) Len() int {
	result := 0
	for _, reqs := range b {
		result += len(reqs.Keys)
	}
	return result
}

func (b dynamoDBReadRequest) Add(tableName, hashValue string, rangeValue []byte) {
	requests, ok := b[tableName]
	if !ok {
		requests = &dynamodb.KeysAndAttributes{
			AttributesToGet: []*string{
				aws.String(hashKey),
				aws.String(valueKey),
			},
		}
		b[tableName] = requests
	}
	requests.Keys = append(requests.Keys, map[string]*dynamodb.AttributeValue{
		hashKey:  {S: aws.String(hashValue)},
		rangeKey: {B: rangeValue},
	})
}

// TakeReqs fills 'b' with read requests from 'from' until 'b' has at most max
// requests, removing those requests from 'from'. A max of zero or below means
// take everything.
func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
	outLen, inLen := b.Len(), from.Len()
	toFill := inLen
	if max > 0 {
		toFill = math.Min(inLen, max-outLen)
	}
	for toFill > 0 {
		for tableName, fromReqs := range from {
			taken := math.Min(len(fromReqs.Keys), toFill)
			if taken > 0 {
				if _, ok := b[tableName]; !ok {
					b[tableName] = &dynamodb.KeysAndAttributes{
						AttributesToGet: []*string{
							aws.String(hashKey),
							aws.String(valueKey),
						},
					}
				}

				b[tableName].Keys = append(b[tableName].Keys, fromReqs.Keys[:taken]...)
				from[tableName].Keys = fromReqs.Keys[taken:]
				toFill -= taken
			}
		}
	}
}

// withErrorHandler returns a request option that records a failure metric for
// every attempt that completes with an error.
func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) {
	return func(req *request.Request) {
		req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) {
			if req.Error != nil {
				recordDynamoError(tableName, req.Error, operation, metrics)
			}
		})
	}
}

func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) {
	if awsErr, ok := err.(awserr.Error); ok {
		metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
	} else {
		metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
	}
}

// dynamoClientFromURL creates a new DynamoDB client from a URL.
func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) {
	dynamoDBSession, err := awsSessionFromURL(awsURL)
	if err != nil {
		return nil, err
	}
	return dynamodb.New(dynamoDBSession), nil
}

// awsSessionFromURL creates a new AWS session from a URL.
func awsSessionFromURL(awsURL *url.URL) (client.ConfigProvider, error) {
	if awsURL == nil {
		return nil, fmt.Errorf("no URL specified for DynamoDB")
	}
	path := strings.TrimPrefix(awsURL.Path, "/")
	if len(path) > 0 {
		level.Warn(log.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path)
	}
	config, err := awscommon.ConfigFromURL(awsURL)
	if err != nil {
		return nil, err
	}
	config = config.WithMaxRetries(0) // We do our own retries, so we can monitor them
	config = config.WithHTTPClient(&http.Client{Transport: defaultTransport})
	return session.NewSession(config)
}
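
// A hypothetical sketch of building a client from a region-only URL; the
// "dynamodb://us-east-1" form relies on ConfigFromURL deducing the real
// endpoint from the host, as described on the -dynamodb.url flag above.
func exampleClientFromURL() (dynamodbiface.DynamoDBAPI, error) {
	u, err := url.Parse("dynamodb://us-east-1")
	if err != nil {
		return nil, err
	}
	return dynamoClientFromURL(u)
}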

// Copy-pasted from http.DefaultTransport.
var defaultTransport http.RoundTripper = &http.Transport{
	Proxy: http.ProxyFromEnvironment,
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}).DialContext,
	ForceAttemptHTTP2: true,
	MaxIdleConns:      100,
	// We will connect many times in parallel to the same DynamoDB server,
	// see https://github.com/golang/go/issues/13801
	MaxIdleConnsPerHost:   100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
