1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package trace
16
17import (
18	"sync"
19	"time"
20
21	"go.opencensus.io/internal"
22)
23
// Sizing limits for the per-name sample buckets.
const (
	// defaultBucketSize is the latency and error bucket size given to
	// stores created by spanStoreForNameCreateIfNew.
	defaultBucketSize = 10

	// maxBucketSize is the largest bucket size ConfigureBucketSizes will
	// pass on; larger requests are reduced to this value.
	maxBucketSize = 100000
)
28
29var (
30	ssmu       sync.RWMutex // protects spanStores
31	spanStores = make(map[string]*spanStore)
32)
33
34// This exists purely to avoid exposing internal methods used by z-Pages externally.
35type internalOnly struct{}
36
37func init() {
38	//TODO(#412): remove
39	internal.Trace = &internalOnly{}
40}
41
42// ReportActiveSpans returns the active spans for the given name.
43func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
44	s := spanStoreForName(name)
45	if s == nil {
46		return nil
47	}
48	var out []*SpanData
49	s.mu.Lock()
50	defer s.mu.Unlock()
51	for span := range s.active {
52		out = append(out, span.makeSpanData())
53	}
54	return out
55}
56
57// ReportSpansByError returns a sample of error spans.
58//
59// If code is nonzero, only spans with that status code are returned.
60func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
61	s := spanStoreForName(name)
62	if s == nil {
63		return nil
64	}
65	var out []*SpanData
66	s.mu.Lock()
67	defer s.mu.Unlock()
68	if code != 0 {
69		if b, ok := s.errors[code]; ok {
70			for _, sd := range b.buffer {
71				if sd == nil {
72					break
73				}
74				out = append(out, sd)
75			}
76		}
77	} else {
78		for _, b := range s.errors {
79			for _, sd := range b.buffer {
80				if sd == nil {
81					break
82				}
83				out = append(out, sd)
84			}
85		}
86	}
87	return out
88}
89
90// ConfigureBucketSizes sets the number of spans to keep per latency and error
91// bucket for different span names.
92func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
93	for _, bc := range bcs {
94		latencyBucketSize := bc.MaxRequestsSucceeded
95		if latencyBucketSize < 0 {
96			latencyBucketSize = 0
97		}
98		if latencyBucketSize > maxBucketSize {
99			latencyBucketSize = maxBucketSize
100		}
101		errorBucketSize := bc.MaxRequestsErrors
102		if errorBucketSize < 0 {
103			errorBucketSize = 0
104		}
105		if errorBucketSize > maxBucketSize {
106			errorBucketSize = maxBucketSize
107		}
108		spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
109	}
110}
111
112// ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
113func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
114	out := make(map[string]internal.PerMethodSummary)
115	ssmu.RLock()
116	defer ssmu.RUnlock()
117	for name, s := range spanStores {
118		s.mu.Lock()
119		p := internal.PerMethodSummary{
120			Active: len(s.active),
121		}
122		for code, b := range s.errors {
123			p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
124				ErrorCode: code,
125				Size:      b.size(),
126			})
127		}
128		for i, b := range s.latency {
129			min, max := latencyBucketBounds(i)
130			p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
131				MinLatency: min,
132				MaxLatency: max,
133				Size:       b.size(),
134			})
135		}
136		s.mu.Unlock()
137		out[name] = p
138	}
139	return out
140}
141
142// ReportSpansByLatency returns a sample of successful spans.
143//
144// minLatency is the minimum latency of spans to be returned.
145// maxLatency, if nonzero, is the maximum latency of spans to be returned.
146func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
147	s := spanStoreForName(name)
148	if s == nil {
149		return nil
150	}
151	var out []*SpanData
152	s.mu.Lock()
153	defer s.mu.Unlock()
154	for i, b := range s.latency {
155		min, max := latencyBucketBounds(i)
156		if i+1 != len(s.latency) && max <= minLatency {
157			continue
158		}
159		if maxLatency != 0 && maxLatency < min {
160			continue
161		}
162		for _, sd := range b.buffer {
163			if sd == nil {
164				break
165			}
166			if minLatency != 0 || maxLatency != 0 {
167				d := sd.EndTime.Sub(sd.StartTime)
168				if d < minLatency {
169					continue
170				}
171				if maxLatency != 0 && d > maxLatency {
172					continue
173				}
174			}
175			out = append(out, sd)
176		}
177	}
178	return out
179}
180
181// spanStore keeps track of spans stored for a particular span name.
182//
183// It contains all active spans; a sample of spans for failed requests,
184// categorized by error code; and a sample of spans for successful requests,
185// bucketed by latency.
186type spanStore struct {
187	mu                     sync.Mutex // protects everything below.
188	active                 map[*Span]struct{}
189	errors                 map[int32]*bucket
190	latency                []bucket
191	maxSpansPerErrorBucket int
192}
193
194// newSpanStore creates a span store.
195func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
196	s := &spanStore{
197		active:                 make(map[*Span]struct{}),
198		latency:                make([]bucket, len(defaultLatencies)+1),
199		maxSpansPerErrorBucket: errorBucketSize,
200	}
201	for i := range s.latency {
202		s.latency[i] = makeBucket(latencyBucketSize)
203	}
204	return s
205}
206
207// spanStoreForName returns the spanStore for the given name.
208//
209// It returns nil if it doesn't exist.
210func spanStoreForName(name string) *spanStore {
211	var s *spanStore
212	ssmu.RLock()
213	s, _ = spanStores[name]
214	ssmu.RUnlock()
215	return s
216}
217
218// spanStoreForNameCreateIfNew returns the spanStore for the given name.
219//
220// It creates it if it didn't exist.
221func spanStoreForNameCreateIfNew(name string) *spanStore {
222	ssmu.RLock()
223	s, ok := spanStores[name]
224	ssmu.RUnlock()
225	if ok {
226		return s
227	}
228	ssmu.Lock()
229	defer ssmu.Unlock()
230	s, ok = spanStores[name]
231	if ok {
232		return s
233	}
234	s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
235	spanStores[name] = s
236	return s
237}
238
239// spanStoreSetSize resizes the spanStore for the given name.
240//
241// It creates it if it didn't exist.
242func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
243	ssmu.RLock()
244	s, ok := spanStores[name]
245	ssmu.RUnlock()
246	if ok {
247		s.resize(latencyBucketSize, errorBucketSize)
248		return
249	}
250	ssmu.Lock()
251	defer ssmu.Unlock()
252	s, ok = spanStores[name]
253	if ok {
254		s.resize(latencyBucketSize, errorBucketSize)
255		return
256	}
257	s = newSpanStore(name, latencyBucketSize, errorBucketSize)
258	spanStores[name] = s
259}
260
261func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
262	s.mu.Lock()
263	for i := range s.latency {
264		s.latency[i].resize(latencyBucketSize)
265	}
266	for _, b := range s.errors {
267		b.resize(errorBucketSize)
268	}
269	s.maxSpansPerErrorBucket = errorBucketSize
270	s.mu.Unlock()
271}
272
273// add adds a span to the active bucket of the spanStore.
274func (s *spanStore) add(span *Span) {
275	s.mu.Lock()
276	s.active[span] = struct{}{}
277	s.mu.Unlock()
278}
279
280// finished removes a span from the active set, and adds a corresponding
281// SpanData to a latency or error bucket.
282func (s *spanStore) finished(span *Span, sd *SpanData) {
283	latency := sd.EndTime.Sub(sd.StartTime)
284	if latency < 0 {
285		latency = 0
286	}
287	code := sd.Status.Code
288
289	s.mu.Lock()
290	delete(s.active, span)
291	if code == 0 {
292		s.latency[latencyBucket(latency)].add(sd)
293	} else {
294		if s.errors == nil {
295			s.errors = make(map[int32]*bucket)
296		}
297		if b := s.errors[code]; b != nil {
298			b.add(sd)
299		} else {
300			b := makeBucket(s.maxSpansPerErrorBucket)
301			s.errors[code] = &b
302			b.add(sd)
303		}
304	}
305	s.mu.Unlock()
306}
307