1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package objstore 5 6import ( 7 "context" 8 "io" 9 10 "github.com/opentracing/opentracing-go" 11 12 "github.com/thanos-io/thanos/pkg/tracing" 13) 14 15// TracingBucket includes bucket operations in the traces. 16type TracingBucket struct { 17 bkt Bucket 18} 19 20func NewTracingBucket(bkt Bucket) InstrumentedBucket { 21 return TracingBucket{bkt: bkt} 22} 23 24func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) (err error) { 25 tracing.DoWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { 26 span.LogKV("dir", dir) 27 err = t.bkt.Iter(spanCtx, dir, f, options...) 28 }) 29 return 30} 31 32func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { 33 span, spanCtx := tracing.StartSpan(ctx, "bucket_get") 34 span.LogKV("name", name) 35 36 r, err := t.bkt.Get(spanCtx, name) 37 if err != nil { 38 span.LogKV("err", err) 39 span.Finish() 40 return nil, err 41 } 42 43 return newTracingReadCloser(r, span), nil 44} 45 46func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { 47 span, spanCtx := tracing.StartSpan(ctx, "bucket_getrange") 48 span.LogKV("name", name, "offset", off, "length", length) 49 50 r, err := t.bkt.GetRange(spanCtx, name, off, length) 51 if err != nil { 52 span.LogKV("err", err) 53 span.Finish() 54 return nil, err 55 } 56 57 return newTracingReadCloser(r, span), nil 58} 59 60func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) { 61 tracing.DoWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) { 62 span.LogKV("name", name) 63 exists, err = t.bkt.Exists(spanCtx, name) 64 }) 65 return 66} 67 68func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs ObjectAttributes, err error) { 69 tracing.DoWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) { 70 span.LogKV("name", name) 71 attrs, err = t.bkt.Attributes(spanCtx, name) 72 }) 73 return 74} 75 76func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { 77 tracing.DoWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { 78 span.LogKV("name", name) 79 err = t.bkt.Upload(spanCtx, name, r) 80 }) 81 return 82} 83 84func (t TracingBucket) Delete(ctx context.Context, name string) (err error) { 85 tracing.DoWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) { 86 span.LogKV("name", name) 87 err = t.bkt.Delete(spanCtx, name) 88 }) 89 return 90} 91 92func (t TracingBucket) Name() string { 93 return "tracing: " + t.bkt.Name() 94} 95 96func (t TracingBucket) Close() error { 97 return t.bkt.Close() 98} 99 100func (t TracingBucket) IsObjNotFoundErr(err error) bool { 101 return t.bkt.IsObjNotFoundErr(err) 102} 103 104func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { 105 if ib, ok := t.bkt.(InstrumentedBucket); ok { 106 return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} 107 } 108 return t 109} 110 111func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader { 112 return t.WithExpectedErrs(expectedFunc) 113} 114 115type tracingReadCloser struct { 116 r io.ReadCloser 117 s opentracing.Span 118 119 objSize int64 120 objSizeErr error 121 122 read int 123} 124 125func newTracingReadCloser(r io.ReadCloser, span opentracing.Span) io.ReadCloser { 126 // Since TryToGetSize can only reliably return size before doing any read calls, 127 // we call during "construction" and remember the results. 128 objSize, objSizeErr := TryToGetSize(r) 129 130 return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr} 131} 132 133func (t *tracingReadCloser) ObjectSize() (int64, error) { 134 return t.objSize, t.objSizeErr 135} 136 137func (t *tracingReadCloser) Read(p []byte) (int, error) { 138 n, err := t.r.Read(p) 139 if n > 0 { 140 t.read += n 141 } 142 if err != nil && err != io.EOF && t.s != nil { 143 t.s.LogKV("err", err) 144 } 145 return n, err 146} 147 148func (t *tracingReadCloser) Close() error { 149 err := t.r.Close() 150 if t.s != nil { 151 t.s.LogKV("read", t.read) 152 if err != nil { 153 t.s.LogKV("close err", err) 154 } 155 t.s.Finish() 156 t.s = nil 157 } 158 return err 159} 160