1package objectclient 2 3import ( 4 "bytes" 5 "context" 6 "encoding/base64" 7 "io/ioutil" 8 9 "github.com/pkg/errors" 10 11 "github.com/grafana/loki/pkg/storage/chunk" 12 "github.com/grafana/loki/pkg/storage/chunk/util" 13) 14 15// KeyEncoder is used to encode chunk keys before writing/retrieving chunks 16// from the underlying ObjectClient 17type KeyEncoder func(string) string 18 19// Base64Encoder is used to encode chunk keys in base64 before storing/retrieving 20// them from the ObjectClient 21var Base64Encoder = func(key string) string { 22 return base64.StdEncoding.EncodeToString([]byte(key)) 23} 24 25// Client is used to store chunks in object store backends 26type Client struct { 27 store chunk.ObjectClient 28 keyEncoder KeyEncoder 29} 30 31// NewClient wraps the provided ObjectClient with a chunk.Client implementation 32func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client { 33 return &Client{ 34 store: store, 35 keyEncoder: encoder, 36 } 37} 38 39// Stop shuts down the object store and any underlying clients 40func (o *Client) Stop() { 41 o.store.Stop() 42} 43 44// PutChunks stores the provided chunks in the configured backend. If multiple errors are 45// returned, the last one sequentially will be propagated up. 46func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { 47 var ( 48 chunkKeys []string 49 chunkBufs [][]byte 50 ) 51 52 for i := range chunks { 53 buf, err := chunks[i].Encoded() 54 if err != nil { 55 return err 56 } 57 key := chunks[i].ExternalKey() 58 if o.keyEncoder != nil { 59 key = o.keyEncoder(key) 60 } 61 62 chunkKeys = append(chunkKeys, key) 63 chunkBufs = append(chunkBufs, buf) 64 } 65 66 incomingErrors := make(chan error) 67 for i := range chunkBufs { 68 go func(i int) { 69 incomingErrors <- o.store.PutObject(ctx, chunkKeys[i], bytes.NewReader(chunkBufs[i])) 70 }(i) 71 } 72 73 var lastErr error 74 for range chunkKeys { 75 err := <-incomingErrors 76 if err != nil { 77 lastErr = err 78 } 79 } 80 return lastErr 81} 82 83// GetChunks retrieves the specified chunks from the configured backend 84func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { 85 return util.GetParallelChunks(ctx, chunks, o.getChunk) 86} 87 88func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { 89 key := c.ExternalKey() 90 if o.keyEncoder != nil { 91 key = o.keyEncoder(key) 92 } 93 94 readCloser, err := o.store.GetObject(ctx, key) 95 if err != nil { 96 return chunk.Chunk{}, errors.WithStack(err) 97 } 98 99 defer readCloser.Close() 100 101 buf, err := ioutil.ReadAll(readCloser) 102 if err != nil { 103 return chunk.Chunk{}, errors.WithStack(err) 104 } 105 106 if err := c.Decode(decodeContext, buf); err != nil { 107 return chunk.Chunk{}, errors.WithStack(err) 108 } 109 return c, nil 110} 111 112// GetChunks retrieves the specified chunks from the configured backend 113func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error { 114 key := chunkID 115 if o.keyEncoder != nil { 116 key = o.keyEncoder(key) 117 } 118 return o.store.DeleteObject(ctx, key) 119} 120 121func (o *Client) IsChunkNotFoundErr(err error) bool { 122 return o.store.IsObjectNotFoundErr(err) 123} 124