1package objectclient
2
3import (
4	"bytes"
5	"context"
6	"encoding/base64"
7	"io/ioutil"
8
9	"github.com/pkg/errors"
10
11	"github.com/grafana/loki/pkg/storage/chunk"
12	"github.com/grafana/loki/pkg/storage/chunk/util"
13)
14
15// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
16// from the underlying ObjectClient
17type KeyEncoder func(string) string
18
19// Base64Encoder is used to encode chunk keys in base64 before storing/retrieving
20// them from the ObjectClient
21var Base64Encoder = func(key string) string {
22	return base64.StdEncoding.EncodeToString([]byte(key))
23}
24
25// Client is used to store chunks in object store backends
26type Client struct {
27	store      chunk.ObjectClient
28	keyEncoder KeyEncoder
29}
30
31// NewClient wraps the provided ObjectClient with a chunk.Client implementation
32func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client {
33	return &Client{
34		store:      store,
35		keyEncoder: encoder,
36	}
37}
38
39// Stop shuts down the object store and any underlying clients
40func (o *Client) Stop() {
41	o.store.Stop()
42}
43
44// PutChunks stores the provided chunks in the configured backend. If multiple errors are
45// returned, the last one sequentially will be propagated up.
46func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
47	var (
48		chunkKeys []string
49		chunkBufs [][]byte
50	)
51
52	for i := range chunks {
53		buf, err := chunks[i].Encoded()
54		if err != nil {
55			return err
56		}
57		key := chunks[i].ExternalKey()
58		if o.keyEncoder != nil {
59			key = o.keyEncoder(key)
60		}
61
62		chunkKeys = append(chunkKeys, key)
63		chunkBufs = append(chunkBufs, buf)
64	}
65
66	incomingErrors := make(chan error)
67	for i := range chunkBufs {
68		go func(i int) {
69			incomingErrors <- o.store.PutObject(ctx, chunkKeys[i], bytes.NewReader(chunkBufs[i]))
70		}(i)
71	}
72
73	var lastErr error
74	for range chunkKeys {
75		err := <-incomingErrors
76		if err != nil {
77			lastErr = err
78		}
79	}
80	return lastErr
81}
82
83// GetChunks retrieves the specified chunks from the configured backend
84func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
85	return util.GetParallelChunks(ctx, chunks, o.getChunk)
86}
87
88func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
89	key := c.ExternalKey()
90	if o.keyEncoder != nil {
91		key = o.keyEncoder(key)
92	}
93
94	readCloser, err := o.store.GetObject(ctx, key)
95	if err != nil {
96		return chunk.Chunk{}, errors.WithStack(err)
97	}
98
99	defer readCloser.Close()
100
101	buf, err := ioutil.ReadAll(readCloser)
102	if err != nil {
103		return chunk.Chunk{}, errors.WithStack(err)
104	}
105
106	if err := c.Decode(decodeContext, buf); err != nil {
107		return chunk.Chunk{}, errors.WithStack(err)
108	}
109	return c, nil
110}
111
112// GetChunks retrieves the specified chunks from the configured backend
113func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
114	key := chunkID
115	if o.keyEncoder != nil {
116		key = o.keyEncoder(key)
117	}
118	return o.store.DeleteObject(ctx, key)
119}
120
121func (o *Client) IsChunkNotFoundErr(err error) bool {
122	return o.store.IsObjectNotFoundErr(err)
123}
124