1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package objstore
5
6import (
7	"context"
8	"io"
9
10	"github.com/opentracing/opentracing-go"
11
12	"github.com/thanos-io/thanos/pkg/tracing"
13)
14
15// TracingBucket includes bucket operations in the traces.
16type TracingBucket struct {
17	bkt Bucket
18}
19
20func NewTracingBucket(bkt Bucket) InstrumentedBucket {
21	return TracingBucket{bkt: bkt}
22}
23
24func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) (err error) {
25	tracing.DoWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) {
26		span.LogKV("dir", dir)
27		err = t.bkt.Iter(spanCtx, dir, f, options...)
28	})
29	return
30}
31
32func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
33	span, spanCtx := tracing.StartSpan(ctx, "bucket_get")
34	span.LogKV("name", name)
35
36	r, err := t.bkt.Get(spanCtx, name)
37	if err != nil {
38		span.LogKV("err", err)
39		span.Finish()
40		return nil, err
41	}
42
43	return newTracingReadCloser(r, span), nil
44}
45
46func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
47	span, spanCtx := tracing.StartSpan(ctx, "bucket_getrange")
48	span.LogKV("name", name, "offset", off, "length", length)
49
50	r, err := t.bkt.GetRange(spanCtx, name, off, length)
51	if err != nil {
52		span.LogKV("err", err)
53		span.Finish()
54		return nil, err
55	}
56
57	return newTracingReadCloser(r, span), nil
58}
59
60func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) {
61	tracing.DoWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) {
62		span.LogKV("name", name)
63		exists, err = t.bkt.Exists(spanCtx, name)
64	})
65	return
66}
67
68func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs ObjectAttributes, err error) {
69	tracing.DoWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) {
70		span.LogKV("name", name)
71		attrs, err = t.bkt.Attributes(spanCtx, name)
72	})
73	return
74}
75
76func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
77	tracing.DoWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) {
78		span.LogKV("name", name)
79		err = t.bkt.Upload(spanCtx, name, r)
80	})
81	return
82}
83
84func (t TracingBucket) Delete(ctx context.Context, name string) (err error) {
85	tracing.DoWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) {
86		span.LogKV("name", name)
87		err = t.bkt.Delete(spanCtx, name)
88	})
89	return
90}
91
92func (t TracingBucket) Name() string {
93	return "tracing: " + t.bkt.Name()
94}
95
96func (t TracingBucket) Close() error {
97	return t.bkt.Close()
98}
99
100func (t TracingBucket) IsObjNotFoundErr(err error) bool {
101	return t.bkt.IsObjNotFoundErr(err)
102}
103
104func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket {
105	if ib, ok := t.bkt.(InstrumentedBucket); ok {
106		return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)}
107	}
108	return t
109}
110
111func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader {
112	return t.WithExpectedErrs(expectedFunc)
113}
114
115type tracingReadCloser struct {
116	r io.ReadCloser
117	s opentracing.Span
118
119	objSize    int64
120	objSizeErr error
121
122	read int
123}
124
125func newTracingReadCloser(r io.ReadCloser, span opentracing.Span) io.ReadCloser {
126	// Since TryToGetSize can only reliably return size before doing any read calls,
127	// we call during "construction" and remember the results.
128	objSize, objSizeErr := TryToGetSize(r)
129
130	return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr}
131}
132
133func (t *tracingReadCloser) ObjectSize() (int64, error) {
134	return t.objSize, t.objSizeErr
135}
136
137func (t *tracingReadCloser) Read(p []byte) (int, error) {
138	n, err := t.r.Read(p)
139	if n > 0 {
140		t.read += n
141	}
142	if err != nil && err != io.EOF && t.s != nil {
143		t.s.LogKV("err", err)
144	}
145	return n, err
146}
147
148func (t *tracingReadCloser) Close() error {
149	err := t.r.Close()
150	if t.s != nil {
151		t.s.LogKV("read", t.read)
152		if err != nil {
153			t.s.LogKV("close err", err)
154		}
155		t.s.Finish()
156		t.s = nil
157	}
158	return err
159}
160