// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "context" "log" "sync" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) var ( // Metrics on a stream are tagged with the stream ID. keyStream = tag.MustNewKey("streamID") // We allow users to annotate streams with a data origin for monitoring purposes. // See the WithDataOrigin writer option for providing this. keyDataOrigin = tag.MustNewKey("dataOrigin") // keyError tags metrics using the status code of returned errors. keyError = tag.MustNewKey("error") ) // DefaultOpenCensusViews retains the set of all opencensus views that this // library has instrumented, to add view registration for exporters. var DefaultOpenCensusViews []*view.View const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/" var ( // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) // AppendRequests is a measure of the number of append requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) // AppendRequestBytes is a measure of the bytes sent as append requests. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes) // AppendRequestErrors is a measure of the number of append requests that errored on send. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless) // AppendRequestRows is a measure of the number of append rows sent. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless) // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) // AppendResponseErrors is a measure of the number of append responses received with an error attached. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) ) var ( // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenView *view.View // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenRetryView *view.View // AppendRequestsView is a cumulative sum of AppendRequests. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestsView *view.View // AppendRequestBytesView is a cumulative sum of AppendRequestBytes. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestBytesView *view.View // AppendRequestErrorsView is a cumulative sum of AppendRequestErrors. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestErrorsView *view.View // AppendRequestRowsView is a cumulative sum of AppendRows. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRowsView *view.View // AppendResponsesView is a cumulative sum of AppendResponses. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponsesView *view.View // AppendResponseErrorsView is a cumulative sum of AppendResponseErrors. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrorsView *view.View // FlushRequestsView is a cumulative sum of FlushRequests. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequestsView *view.View ) func init() { AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError) AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) DefaultOpenCensusViews = []*view.View{ AppendClientOpenView, AppendClientOpenRetryView, AppendRequestsView, AppendRequestBytesView, AppendRequestErrorsView, AppendRequestRowsView, AppendResponsesView, AppendResponseErrorsView, FlushRequestsView, } } func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View { return &view.View{ Name: m.Name(), Description: m.Description(), TagKeys: keys, Measure: m, Aggregation: agg, } } func createSumView(m stats.Measure, keys ...tag.Key) *view.View { return createView(m, view.Sum(), keys...) } var logTagStreamOnce sync.Once var logTagOriginOnce sync.Once // keyContextWithStreamID returns a new context modified with the instrumentation tags. func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context { ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) if err != nil { logTagStreamOnce.Do(func() { log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err) }) } ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin)) if err != nil { logTagOriginOnce.Do(func() { log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err) }) } return ctx } func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { stats.Record(ctx, m.M(n)) }