1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package managedwriter
16
17import (
18	"context"
19	"log"
20	"sync"
21
22	"go.opencensus.io/stats"
23	"go.opencensus.io/stats/view"
24	"go.opencensus.io/tag"
25)
26
27var (
28	// Metrics on a stream are tagged with the stream ID.
29	keyStream = tag.MustNewKey("streamID")
30
31	// We allow users to annotate streams with a data origin for monitoring purposes.
32	// See the WithDataOrigin writer option for providing this.
33	keyDataOrigin = tag.MustNewKey("dataOrigin")
34)
35
36const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
37
38var (
39	// AppendRequests is a measure of the number of append requests sent.
40	// It is EXPERIMENTAL and subject to change or removal without notice.
41	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
42
43	// AppendBytes is a measure of the bytes sent as append requests.
44	// It is EXPERIMENTAL and subject to change or removal without notice.
45	AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
46
47	// AppendResponses is a measure of the number of append responses received.
48	// It is EXPERIMENTAL and subject to change or removal without notice.
49	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
50
51	// FlushRequests is a measure of the number of FlushRows requests sent.
52	// It is EXPERIMENTAL and subject to change or removal without notice.
53	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
54
55	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
56	// It is EXPERIMENTAL and subject to change or removal without notice.
57	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
58
59	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
60	// It is EXPERIMENTAL and subject to change or removal without notice.
61	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
62)
63
64var (
65	// AppendRequestsView is a cumulative sum of AppendRequests.
66	// It is EXPERIMENTAL and subject to change or removal without notice.
67	AppendRequestsView *view.View
68
69	// AppendBytesView is a cumulative sum of AppendBytes.
70	// It is EXPERIMENTAL and subject to change or removal without notice.
71	AppendBytesView *view.View
72
73	// AppendResponsesView is a cumulative sum of AppendResponses.
74	// It is EXPERIMENTAL and subject to change or removal without notice.
75	AppendResponsesView *view.View
76
77	// FlushRequestsView is a cumulative sum of FlushRequests.
78	// It is EXPERIMENTAL and subject to change or removal without notice.
79	FlushRequestsView *view.View
80
81	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
82	// It is EXPERIMENTAL and subject to change or removal without notice.
83	AppendClientOpenView *view.View
84
85	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
86	// It is EXPERIMENTAL and subject to change or removal without notice.
87	AppendClientOpenRetryView *view.View
88)
89
90func init() {
91	AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
92	AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
93	AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
94	FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
95	AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
96	AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
97}
98
99func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
100	return &view.View{
101		Name:        m.Name(),
102		Description: m.Description(),
103		TagKeys:     keys,
104		Measure:     m,
105		Aggregation: agg,
106	}
107}
108
109func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
110	return createView(m, view.Sum(), keys...)
111}
112
113var logTagStreamOnce sync.Once
114var logTagOriginOnce sync.Once
115
116// keyContextWithStreamID returns a new context modified with the instrumentation tags.
117func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
118	ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
119	if err != nil {
120		logTagStreamOnce.Do(func() {
121			log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
122		})
123	}
124	ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
125	if err != nil {
126		logTagOriginOnce.Do(func() {
127			log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
128		})
129	}
130	return ctx
131}
132
133func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
134	stats.Record(ctx, m.M(n))
135}
136