1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package basic_test 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "runtime" 22 "sync" 23 "testing" 24 "time" 25 26 "github.com/stretchr/testify/require" 27 28 "go.opentelemetry.io/otel" 29 "go.opentelemetry.io/otel/attribute" 30 "go.opentelemetry.io/otel/metric" 31 export "go.opentelemetry.io/otel/sdk/export/metric" 32 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 33 controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" 34 "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" 35 processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" 36 "go.opentelemetry.io/otel/sdk/metric/processor/processortest" 37 "go.opentelemetry.io/otel/sdk/resource" 38) 39 40var testResource = resource.NewWithAttributes(attribute.String("R", "V")) 41 42type handler struct { 43 sync.Mutex 44 err error 45} 46 47func (h *handler) Handle(err error) { 48 h.Lock() 49 h.err = err 50 h.Unlock() 51} 52 53func (h *handler) Flush() error { 54 h.Lock() 55 err := h.err 56 h.err = nil 57 h.Unlock() 58 return err 59} 60 61var testHandler *handler 62 63func init() { 64 testHandler = new(handler) 65 otel.SetErrorHandler(testHandler) 66} 67 68func newExporter() *processortest.Exporter { 69 return processortest.NewExporter( 70 export.StatelessExportKindSelector(), 71 attribute.DefaultEncoder(), 72 ) 73} 74 75func newCheckpointer() export.Checkpointer { 76 return processortest.Checkpointer( 77 processortest.NewProcessor( 78 processortest.AggregatorSelector(), 79 attribute.DefaultEncoder(), 80 ), 81 ) 82} 83 84func TestPushDoubleStop(t *testing.T) { 85 ctx := context.Background() 86 exporter := newExporter() 87 checkpointer := newCheckpointer() 88 p := controller.New(checkpointer, controller.WithExporter(exporter)) 89 require.NoError(t, p.Start(ctx)) 90 require.NoError(t, p.Stop(ctx)) 91 require.NoError(t, p.Stop(ctx)) 92} 93 94func TestPushDoubleStart(t *testing.T) { 95 ctx := context.Background() 96 exporter := newExporter() 97 checkpointer := newCheckpointer() 98 p := controller.New(checkpointer, controller.WithExporter(exporter)) 99 require.NoError(t, p.Start(ctx)) 100 err := p.Start(ctx) 101 require.Error(t, err) 102 require.True(t, errors.Is(err, controller.ErrControllerStarted)) 103 require.NoError(t, p.Stop(ctx)) 104} 105 106func TestPushTicker(t *testing.T) { 107 exporter := newExporter() 108 checkpointer := newCheckpointer() 109 p := controller.New( 110 checkpointer, 111 controller.WithExporter(exporter), 112 controller.WithCollectPeriod(time.Second), 113 controller.WithResource(testResource), 114 ) 115 meter := p.MeterProvider().Meter("name") 116 117 mock := controllertest.NewMockClock() 118 p.SetClock(mock) 119 120 ctx := context.Background() 121 122 counter := metric.Must(meter).NewInt64Counter("counter.sum") 123 124 require.NoError(t, p.Start(ctx)) 125 126 counter.Add(ctx, 3) 127 128 require.EqualValues(t, map[string]float64{}, exporter.Values()) 129 130 mock.Add(time.Second) 131 runtime.Gosched() 132 133 require.EqualValues(t, map[string]float64{ 134 "counter.sum//R=V": 3, 135 }, exporter.Values()) 136 137 require.Equal(t, 1, exporter.ExportCount()) 138 exporter.Reset() 139 140 counter.Add(ctx, 7) 141 142 mock.Add(time.Second) 143 runtime.Gosched() 144 145 require.EqualValues(t, map[string]float64{ 146 "counter.sum//R=V": 10, 147 }, exporter.Values()) 148 149 require.Equal(t, 1, exporter.ExportCount()) 150 exporter.Reset() 151 152 require.NoError(t, p.Stop(ctx)) 153} 154 155func TestPushExportError(t *testing.T) { 156 injector := func(name string, e error) func(r export.Record) error { 157 return func(r export.Record) error { 158 if r.Descriptor().Name() == name { 159 return e 160 } 161 return nil 162 } 163 } 164 var errAggregator = fmt.Errorf("unexpected error") 165 var tests = []struct { 166 name string 167 injectedError error 168 expected map[string]float64 169 expectedError error 170 }{ 171 {"errNone", nil, map[string]float64{ 172 "counter1.sum/X=Y/R=V": 3, 173 "counter2.sum//R=V": 5, 174 }, nil}, 175 {"errNoData", aggregation.ErrNoData, map[string]float64{ 176 "counter2.sum//R=V": 5, 177 }, nil}, 178 {"errUnexpected", errAggregator, map[string]float64{}, errAggregator}, 179 } 180 for _, tt := range tests { 181 t.Run(tt.name, func(t *testing.T) { 182 exporter := newExporter() 183 exporter.InjectErr = injector("counter1.sum", tt.injectedError) 184 185 // This test validates the error handling 186 // behavior of the basic Processor is honored 187 // by the push processor. 188 checkpointer := processor.New(processortest.AggregatorSelector(), exporter) 189 p := controller.New( 190 checkpointer, 191 controller.WithExporter(exporter), 192 controller.WithCollectPeriod(time.Second), 193 controller.WithResource(testResource), 194 ) 195 196 mock := controllertest.NewMockClock() 197 p.SetClock(mock) 198 199 ctx := context.Background() 200 201 meter := p.MeterProvider().Meter("name") 202 counter1 := metric.Must(meter).NewInt64Counter("counter1.sum") 203 counter2 := metric.Must(meter).NewInt64Counter("counter2.sum") 204 205 require.NoError(t, p.Start(ctx)) 206 runtime.Gosched() 207 208 counter1.Add(ctx, 3, attribute.String("X", "Y")) 209 counter2.Add(ctx, 5) 210 211 require.Equal(t, 0, exporter.ExportCount()) 212 require.Nil(t, testHandler.Flush()) 213 214 mock.Add(time.Second) 215 runtime.Gosched() 216 217 require.Equal(t, 1, exporter.ExportCount()) 218 if tt.expectedError == nil { 219 require.EqualValues(t, tt.expected, exporter.Values()) 220 require.NoError(t, testHandler.Flush()) 221 } else { 222 err := testHandler.Flush() 223 require.Error(t, err) 224 require.Equal(t, tt.expectedError, err) 225 } 226 227 require.NoError(t, p.Stop(ctx)) 228 }) 229 } 230} 231