1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package managedwriter
16
17import (
18	"context"
19	"log"
20	"sync"
21
22	"go.opencensus.io/stats"
23	"go.opencensus.io/stats/view"
24	"go.opencensus.io/tag"
25)
26
// Tag keys used by the views and contexts defined in this file.
var (
	// keyStream tags metrics on a stream with the stream ID.
	keyStream = tag.MustNewKey("streamID")

	// keyDataOrigin tags metrics with a data origin. We allow users to
	// annotate streams with a data origin for monitoring purposes.
	// See the WithDataOrigin writer option for providing this.
	keyDataOrigin = tag.MustNewKey("dataOrigin")

	// keyError tags metrics using the status code of returned errors.
	keyError = tag.MustNewKey("error")
)
38
// DefaultOpenCensusViews retains the set of all opencensus views that this
// library has instrumented, to add view registration for exporters.
// It is populated by this package's init function.
var DefaultOpenCensusViews []*view.View

// statsPrefix is prepended to the name of every measure defined in this file.
const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
44
45var (
46	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
47	// It is EXPERIMENTAL and subject to change or removal without notice.
48	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
49
50	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
51	// It is EXPERIMENTAL and subject to change or removal without notice.
52	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
53
54	// AppendRequests is a measure of the number of append requests sent.
55	// It is EXPERIMENTAL and subject to change or removal without notice.
56	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
57
58	// AppendRequestBytes is a measure of the bytes sent as append requests.
59	// It is EXPERIMENTAL and subject to change or removal without notice.
60	AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
61
62	// AppendRequestErrors is a measure of the number of append requests that errored on send.
63	// It is EXPERIMENTAL and subject to change or removal without notice.
64	AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
65
66	// AppendRequestRows is a measure of the number of append rows sent.
67	// It is EXPERIMENTAL and subject to change or removal without notice.
68	AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
69
70	// AppendResponses is a measure of the number of append responses received.
71	// It is EXPERIMENTAL and subject to change or removal without notice.
72	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
73
74	// AppendResponseErrors is a measure of the number of append responses received with an error attached.
75	// It is EXPERIMENTAL and subject to change or removal without notice.
76	AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
77
78	// FlushRequests is a measure of the number of FlushRows requests sent.
79	// It is EXPERIMENTAL and subject to change or removal without notice.
80	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
81)
82
// Views over the measures defined in this file. They are declared here and
// assigned by this package's init function.
var (

	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenView *view.View

	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryView *view.View

	// AppendRequestsView is a cumulative sum of AppendRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestsView *view.View

	// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestBytesView *view.View

	// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestErrorsView *view.View

	// AppendRequestRowsView is a cumulative sum of AppendRequestRows.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestRowsView *view.View

	// AppendResponsesView is a cumulative sum of AppendResponses.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponsesView *view.View

	// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponseErrorsView *view.View

	// FlushRequestsView is a cumulative sum of FlushRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequestsView *view.View
)
121
122func init() {
123	AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
124	AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
125
126	AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
127	AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
128	AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
129	AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
130
131	AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
132	AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)
133
134	FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
135
136	DefaultOpenCensusViews = []*view.View{
137		AppendClientOpenView,
138		AppendClientOpenRetryView,
139
140		AppendRequestsView,
141		AppendRequestBytesView,
142		AppendRequestErrorsView,
143		AppendRequestRowsView,
144
145		AppendResponsesView,
146		AppendResponseErrorsView,
147
148		FlushRequestsView,
149	}
150}
151
152func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
153	return &view.View{
154		Name:        m.Name(),
155		Description: m.Description(),
156		TagKeys:     keys,
157		Measure:     m,
158		Aggregation: agg,
159	}
160}
161
162func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
163	return createView(m, view.Sum(), keys...)
164}
165
// Guards that limit tag-failure logging in keyContextWithTags to a single
// message per tag key.
var (
	// logTagStreamOnce gates the log message for a 'streamID' tagging failure.
	logTagStreamOnce sync.Once
	// logTagOriginOnce gates the log message for a 'dataOrigin' tagging failure.
	logTagOriginOnce sync.Once
)
168
169// keyContextWithStreamID returns a new context modified with the instrumentation tags.
170func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
171	ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
172	if err != nil {
173		logTagStreamOnce.Do(func() {
174			log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
175		})
176	}
177	ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
178	if err != nil {
179		logTagOriginOnce.Do(func() {
180			log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
181		})
182	}
183	return ctx
184}
185
186func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
187	stats.Record(ctx, m.M(n))
188}
189