1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package managedwriter 16 17import ( 18 "context" 19 "log" 20 "sync" 21 22 "go.opencensus.io/stats" 23 "go.opencensus.io/stats/view" 24 "go.opencensus.io/tag" 25) 26 27var ( 28 // Metrics on a stream are tagged with the stream ID. 29 keyStream = tag.MustNewKey("streamID") 30 31 // We allow users to annotate streams with a data origin for monitoring purposes. 32 // See the WithDataOrigin writer option for providing this. 33 keyDataOrigin = tag.MustNewKey("dataOrigin") 34 35 // keyError tags metrics using the status code of returned errors. 36 keyError = tag.MustNewKey("error") 37) 38 39// DefaultOpenCensusViews retains the set of all opencensus views that this 40// library has instrumented, to add view registration for exporters. 41var DefaultOpenCensusViews []*view.View 42 43const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" 44 45var ( 46 // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. 47 // It is EXPERIMENTAL and subject to change or removal without notice. 48 AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) 49 50 // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. 51 // It is EXPERIMENTAL and subject to change or removal without notice. 52 AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) 53 54 // AppendRequests is a measure of the number of append requests sent. 55 // It is EXPERIMENTAL and subject to change or removal without notice. 56 AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) 57 58 // AppendRequestBytes is a measure of the bytes sent as append requests. 59 // It is EXPERIMENTAL and subject to change or removal without notice. 60 AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes) 61 62 // AppendRequestErrors is a measure of the number of append requests that errored on send. 63 // It is EXPERIMENTAL and subject to change or removal without notice. 64 AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless) 65 66 // AppendRequestRows is a measure of the number of append rows sent. 67 // It is EXPERIMENTAL and subject to change or removal without notice. 68 AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless) 69 70 // AppendResponses is a measure of the number of append responses received. 71 // It is EXPERIMENTAL and subject to change or removal without notice. 72 AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) 73 74 // AppendResponseErrors is a measure of the number of append responses received with an error attached. 75 // It is EXPERIMENTAL and subject to change or removal without notice. 76 AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) 77 78 // FlushRequests is a measure of the number of FlushRows requests sent. 79 // It is EXPERIMENTAL and subject to change or removal without notice. 80 FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) 81) 82 83var ( 84 85 // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. 86 // It is EXPERIMENTAL and subject to change or removal without notice. 87 AppendClientOpenView *view.View 88 89 // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. 90 // It is EXPERIMENTAL and subject to change or removal without notice. 91 AppendClientOpenRetryView *view.View 92 93 // AppendRequestsView is a cumulative sum of AppendRequests. 94 // It is EXPERIMENTAL and subject to change or removal without notice. 95 AppendRequestsView *view.View 96 97 // AppendRequestBytesView is a cumulative sum of AppendRequestBytes. 98 // It is EXPERIMENTAL and subject to change or removal without notice. 99 AppendRequestBytesView *view.View 100 101 // AppendRequestErrorsView is a cumulative sum of AppendRequestErrors. 102 // It is EXPERIMENTAL and subject to change or removal without notice. 103 AppendRequestErrorsView *view.View 104 105 // AppendRequestRowsView is a cumulative sum of AppendRows. 106 // It is EXPERIMENTAL and subject to change or removal without notice. 107 AppendRequestRowsView *view.View 108 109 // AppendResponsesView is a cumulative sum of AppendResponses. 110 // It is EXPERIMENTAL and subject to change or removal without notice. 111 AppendResponsesView *view.View 112 113 // AppendResponseErrorsView is a cumulative sum of AppendResponseErrors. 114 // It is EXPERIMENTAL and subject to change or removal without notice. 115 AppendResponseErrorsView *view.View 116 117 // FlushRequestsView is a cumulative sum of FlushRequests. 118 // It is EXPERIMENTAL and subject to change or removal without notice. 119 FlushRequestsView *view.View 120) 121 122func init() { 123 AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) 124 AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) 125 126 AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) 127 AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) 128 AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError) 129 AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) 130 131 AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) 132 AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) 133 134 FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) 135 136 DefaultOpenCensusViews = []*view.View{ 137 AppendClientOpenView, 138 AppendClientOpenRetryView, 139 140 AppendRequestsView, 141 AppendRequestBytesView, 142 AppendRequestErrorsView, 143 AppendRequestRowsView, 144 145 AppendResponsesView, 146 AppendResponseErrorsView, 147 148 FlushRequestsView, 149 } 150} 151 152func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View { 153 return &view.View{ 154 Name: m.Name(), 155 Description: m.Description(), 156 TagKeys: keys, 157 Measure: m, 158 Aggregation: agg, 159 } 160} 161 162func createSumView(m stats.Measure, keys ...tag.Key) *view.View { 163 return createView(m, view.Sum(), keys...) 164} 165 166var logTagStreamOnce sync.Once 167var logTagOriginOnce sync.Once 168 169// keyContextWithStreamID returns a new context modified with the instrumentation tags. 170func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context { 171 ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) 172 if err != nil { 173 logTagStreamOnce.Do(func() { 174 log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err) 175 }) 176 } 177 ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin)) 178 if err != nil { 179 logTagOriginOnce.Do(func() { 180 log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err) 181 }) 182 } 183 return ctx 184} 185 186func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { 187 stats.Record(ctx, m.M(n)) 188} 189