1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package managedwriter 16 17import ( 18 "context" 19 "log" 20 "sync" 21 22 "go.opencensus.io/stats" 23 "go.opencensus.io/stats/view" 24 "go.opencensus.io/tag" 25) 26 27var ( 28 // Metrics on a stream are tagged with the stream ID. 29 keyStream = tag.MustNewKey("streamID") 30 31 // We allow users to annotate streams with a data origin for monitoring purposes. 32 // See the WithDataOrigin writer option for providing this. 33 keyDataOrigin = tag.MustNewKey("dataOrigin") 34) 35 36const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" 37 38var ( 39 // AppendRequests is a measure of the number of append requests sent. 40 // It is EXPERIMENTAL and subject to change or removal without notice. 41 AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) 42 43 // AppendBytes is a measure of the bytes sent as append requests. 44 // It is EXPERIMENTAL and subject to change or removal without notice. 45 AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes) 46 47 // AppendResponses is a measure of the number of append responses received. 48 // It is EXPERIMENTAL and subject to change or removal without notice. 49 AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) 50 51 // FlushRequests is a measure of the number of FlushRows requests sent. 52 // It is EXPERIMENTAL and subject to change or removal without notice. 53 FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) 54 55 // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. 56 // It is EXPERIMENTAL and subject to change or removal without notice. 57 AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) 58 59 // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. 60 // It is EXPERIMENTAL and subject to change or removal without notice. 61 AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) 62) 63 64var ( 65 // AppendRequestsView is a cumulative sum of AppendRequests. 66 // It is EXPERIMENTAL and subject to change or removal without notice. 67 AppendRequestsView *view.View 68 69 // AppendBytesView is a cumulative sum of AppendBytes. 70 // It is EXPERIMENTAL and subject to change or removal without notice. 71 AppendBytesView *view.View 72 73 // AppendResponsesView is a cumulative sum of AppendResponses. 74 // It is EXPERIMENTAL and subject to change or removal without notice. 75 AppendResponsesView *view.View 76 77 // FlushRequestsView is a cumulative sum of FlushRequests. 78 // It is EXPERIMENTAL and subject to change or removal without notice. 79 FlushRequestsView *view.View 80 81 // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. 82 // It is EXPERIMENTAL and subject to change or removal without notice. 83 AppendClientOpenView *view.View 84 85 // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. 86 // It is EXPERIMENTAL and subject to change or removal without notice. 87 AppendClientOpenRetryView *view.View 88) 89 90func init() { 91 AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) 92 AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin) 93 AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) 94 FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) 95 AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) 96 AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) 97} 98 99func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View { 100 return &view.View{ 101 Name: m.Name(), 102 Description: m.Description(), 103 TagKeys: keys, 104 Measure: m, 105 Aggregation: agg, 106 } 107} 108 109func createSumView(m stats.Measure, keys ...tag.Key) *view.View { 110 return createView(m, view.Sum(), keys...) 111} 112 113var logTagStreamOnce sync.Once 114var logTagOriginOnce sync.Once 115 116// keyContextWithStreamID returns a new context modified with the instrumentation tags. 117func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context { 118 ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) 119 if err != nil { 120 logTagStreamOnce.Do(func() { 121 log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err) 122 }) 123 } 124 ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin)) 125 if err != nil { 126 logTagOriginOnce.Do(func() { 127 log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err) 128 }) 129 } 130 return ctx 131} 132 133func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { 134 stats.Record(ctx, m.M(n)) 135} 136