1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package testbed 16 17import ( 18 "fmt" 19 "strings" 20 21 "github.com/shirou/gopsutil/process" 22 "go.uber.org/zap" 23 "go.uber.org/zap/zapcore" 24 25 "go.opentelemetry.io/collector/component" 26 "go.opentelemetry.io/collector/internal/version" 27 "go.opentelemetry.io/collector/service" 28 "go.opentelemetry.io/collector/service/parserprovider" 29) 30 31// inProcessCollector implements the OtelcolRunner interfaces running a single otelcol as a go routine within the 32// same process as the test executor. 33type inProcessCollector struct { 34 logger *zap.Logger 35 factories component.Factories 36 configStr string 37 svc *service.Collector 38 appDone chan struct{} 39 stopped bool 40} 41 42// NewInProcessCollector crewtes a new inProcessCollector using the supplied component factories. 43func NewInProcessCollector(factories component.Factories) OtelcolRunner { 44 return &inProcessCollector{ 45 factories: factories, 46 } 47} 48 49func (ipp *inProcessCollector) PrepareConfig(configStr string) (configCleanup func(), err error) { 50 configCleanup = func() { 51 // NoOp 52 } 53 var logger *zap.Logger 54 logger, err = configureLogger() 55 if err != nil { 56 return configCleanup, err 57 } 58 ipp.logger = logger 59 ipp.configStr = configStr 60 return configCleanup, err 61} 62 63func (ipp *inProcessCollector) Start(args StartParams) error { 64 settings := service.CollectorSettings{ 65 BuildInfo: component.BuildInfo{ 66 Command: "otelcol", 67 Version: version.Version, 68 }, 69 Factories: ipp.factories, 70 ParserProvider: parserprovider.NewInMemory(strings.NewReader(ipp.configStr)), 71 } 72 var err error 73 ipp.svc, err = service.New(settings) 74 if err != nil { 75 return err 76 } 77 ipp.svc.Command().SetArgs(args.CmdArgs) 78 79 ipp.appDone = make(chan struct{}) 80 go func() { 81 defer close(ipp.appDone) 82 appErr := ipp.svc.Run() 83 if appErr != nil { 84 err = appErr 85 } 86 }() 87 88 for state := range ipp.svc.GetStateChannel() { 89 switch state { 90 case service.Starting: 91 // NoOp 92 case service.Running: 93 return err 94 default: 95 err = fmt.Errorf("unable to start, otelcol state is %d", state) 96 } 97 } 98 return err 99} 100 101func (ipp *inProcessCollector) Stop() (stopped bool, err error) { 102 if !ipp.stopped { 103 ipp.stopped = true 104 ipp.svc.Shutdown() 105 } 106 <-ipp.appDone 107 stopped = ipp.stopped 108 return stopped, err 109} 110 111func (ipp *inProcessCollector) WatchResourceConsumption() error { 112 return nil 113} 114 115func (ipp *inProcessCollector) GetProcessMon() *process.Process { 116 return nil 117} 118 119func (ipp *inProcessCollector) GetTotalConsumption() *ResourceConsumption { 120 return &ResourceConsumption{ 121 CPUPercentAvg: 0, 122 CPUPercentMax: 0, 123 RAMMiBAvg: 0, 124 RAMMiBMax: 0, 125 } 126} 127 128func (ipp *inProcessCollector) GetResourceConsumption() string { 129 return "" 130} 131 132func configureLogger() (*zap.Logger, error) { 133 conf := zap.NewDevelopmentConfig() 134 conf.Level.SetLevel(zapcore.InfoLevel) 135 logger, err := conf.Build() 136 return logger, err 137} 138