1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package testbed 16 17import ( 18 "log" 19 "net" 20 "os" 21 "path" 22 "path/filepath" 23 "testing" 24 "time" 25 26 "github.com/stretchr/testify/require" 27) 28 29// TestCase defines a running test case. 30type TestCase struct { 31 t *testing.T 32 33 // Directory where test case results and logs will be written. 34 resultDir string 35 36 // does not write out results when set to true 37 skipResults bool 38 39 // Agent config file path. 40 agentConfigFile string 41 42 // Load generator spec file path. 43 // loadSpecFile string 44 45 // Resource spec for agent. 46 resourceSpec ResourceSpec 47 48 // Agent process. 49 agentProc OtelcolRunner 50 51 Sender DataSender 52 receiver DataReceiver 53 54 LoadGenerator *LoadGenerator 55 MockBackend *MockBackend 56 validator TestCaseValidator 57 58 startTime time.Time 59 60 // errorSignal indicates an error in the test case execution, e.g. process execution 61 // failure or exceeding resource consumption, etc. The actual error message is already 62 // logged, this is only an indicator on which you can wait to be informed. 63 errorSignal chan struct{} 64 // Duration is the requested duration of the tests. Configured via TESTBED_DURATION 65 // env variable and defaults to 15 seconds if env variable is unspecified. 66 Duration time.Duration 67 doneSignal chan struct{} 68 errorCause string 69 resultsSummary TestResultsSummary 70} 71 72const mibibyte = 1024 * 1024 73const testcaseDurationVar = "TESTCASE_DURATION" 74 75// NewTestCase creates a new TestCase. It expects agent-config.yaml in the specified directory. 76func NewTestCase( 77 t *testing.T, 78 dataProvider DataProvider, 79 sender DataSender, 80 receiver DataReceiver, 81 agentProc OtelcolRunner, 82 validator TestCaseValidator, 83 resultsSummary TestResultsSummary, 84 opts ...TestCaseOption, 85) *TestCase { 86 tc := TestCase{ 87 t: t, 88 errorSignal: make(chan struct{}), 89 doneSignal: make(chan struct{}), 90 startTime: time.Now(), 91 Sender: sender, 92 receiver: receiver, 93 agentProc: agentProc, 94 validator: validator, 95 resultsSummary: resultsSummary, 96 } 97 98 // Get requested test case duration from env variable. 99 duration := os.Getenv(testcaseDurationVar) 100 if duration == "" { 101 duration = "15s" 102 } 103 var err error 104 tc.Duration, err = time.ParseDuration(duration) 105 if err != nil { 106 log.Fatalf("Invalid "+testcaseDurationVar+": %v. Expecting a valid duration string.", duration) 107 } 108 109 // Apply all provided options. 110 for _, opt := range opts { 111 opt(&tc) 112 } 113 114 // Prepare directory for results. 115 tc.resultDir, err = filepath.Abs(path.Join("results", t.Name())) 116 require.NoErrorf(t, err, "Cannot resolve %s", t.Name()) 117 require.NoErrorf(t, os.MkdirAll(tc.resultDir, os.ModePerm), "Cannot create directory %s", tc.resultDir) 118 119 // Set default resource check period. 120 tc.resourceSpec.ResourceCheckPeriod = 3 * time.Second 121 if tc.Duration < tc.resourceSpec.ResourceCheckPeriod { 122 // Resource check period should not be longer than entire test duration. 123 tc.resourceSpec.ResourceCheckPeriod = tc.Duration 124 } 125 126 tc.LoadGenerator, err = NewLoadGenerator(dataProvider, sender) 127 require.NoError(t, err, "Cannot create generator") 128 129 tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) 130 131 go tc.logStats() 132 133 return &tc 134} 135 136func (tc *TestCase) composeTestResultFileName(fileName string) string { 137 fileName, err := filepath.Abs(path.Join(tc.resultDir, fileName)) 138 require.NoError(tc.t, err, "Cannot resolve %s", fileName) 139 return fileName 140} 141 142// StartAgent starts the agent and redirects its standard output and standard error 143// to "agent.log" file located in the test directory. 144func (tc *TestCase) StartAgent(args ...string) { 145 if tc.agentConfigFile != "" { 146 args = append(args, "--config") 147 args = append(args, tc.agentConfigFile) 148 } 149 logFileName := tc.composeTestResultFileName("agent.log") 150 151 err := tc.agentProc.Start(StartParams{ 152 Name: "Agent", 153 LogFilePath: logFileName, 154 CmdArgs: args, 155 resourceSpec: &tc.resourceSpec, 156 }) 157 158 if err != nil { 159 tc.indicateError(err) 160 return 161 } 162 163 // Start watching resource consumption. 164 go func() { 165 err := tc.agentProc.WatchResourceConsumption() 166 if err != nil { 167 tc.indicateError(err) 168 } 169 }() 170 171 endpoint := tc.LoadGenerator.sender.GetEndpoint() 172 if endpoint != nil { 173 // Wait for agent to start. We consider the agent started when we can 174 // connect to the port to which we intend to send load. We only do this 175 // if the endpoint is not-empty, i.e. the sender does use network (some senders 176 // like text log writers don't). 177 tc.WaitFor(func() bool { 178 conn, err := net.Dial(tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String()) 179 if err == nil && conn != nil { 180 conn.Close() 181 return true 182 } 183 return false 184 }) 185 } 186} 187 188// StopAgent stops agent process. 189func (tc *TestCase) StopAgent() { 190 if _, err := tc.agentProc.Stop(); err != nil { 191 tc.indicateError(err) 192 } 193} 194 195// StartLoad starts the load generator and redirects its standard output and standard error 196// to "load-generator.log" file located in the test directory. 197func (tc *TestCase) StartLoad(options LoadOptions) { 198 tc.LoadGenerator.Start(options) 199} 200 201// StopLoad stops load generator. 202func (tc *TestCase) StopLoad() { 203 tc.LoadGenerator.Stop() 204} 205 206// StartBackend starts the specified backend type. 207func (tc *TestCase) StartBackend() { 208 require.NoError(tc.t, tc.MockBackend.Start(), "Cannot start backend") 209} 210 211// StopBackend stops the backend. 212func (tc *TestCase) StopBackend() { 213 tc.MockBackend.Stop() 214} 215 216// EnableRecording enables recording of all data received by MockBackend. 217func (tc *TestCase) EnableRecording() { 218 tc.MockBackend.EnableRecording() 219} 220 221// AgentMemoryInfo returns raw memory info struct about the agent 222// as returned by github.com/shirou/gopsutil/process 223func (tc *TestCase) AgentMemoryInfo() (uint32, uint32, error) { 224 stat, err := tc.agentProc.GetProcessMon().MemoryInfo() 225 if err != nil { 226 return 0, 0, err 227 } 228 return uint32(stat.RSS / mibibyte), uint32(stat.VMS / mibibyte), nil 229} 230 231// Stop stops the load generator, the agent and the backend. 232func (tc *TestCase) Stop() { 233 // Stop all components 234 tc.StopLoad() 235 tc.StopAgent() 236 tc.StopBackend() 237 238 // Stop logging 239 close(tc.doneSignal) 240 241 if tc.skipResults { 242 return 243 } 244 245 // Report test results 246 tc.validator.RecordResults(tc) 247} 248 249// ValidateData validates data received by mock backend against what was generated and sent to the collector 250// instance(s) under test by the LoadGenerator. 251func (tc *TestCase) ValidateData() { 252 select { 253 case <-tc.errorSignal: 254 // Error is already signaled and recorded. Validating data is pointless. 255 return 256 default: 257 } 258 259 tc.validator.Validate(tc) 260} 261 262// Sleep for specified duration or until error is signaled. 263func (tc *TestCase) Sleep(d time.Duration) { 264 select { 265 case <-time.After(d): 266 case <-tc.errorSignal: 267 } 268} 269 270// WaitForN the specific condition for up to a specified duration. Records a test error 271// if time is out and condition does not become true. If error is signaled 272// while waiting the function will return false, but will not record additional 273// test error (we assume that signaled error is already recorded in indicateError()). 274func (tc *TestCase) WaitForN(cond func() bool, duration time.Duration, errMsg ...interface{}) bool { 275 startTime := time.Now() 276 277 // Start with 5 ms waiting interval between condition re-evaluation. 278 waitInterval := time.Millisecond * 5 279 280 for { 281 if cond() { 282 return true 283 } 284 285 select { 286 case <-time.After(waitInterval): 287 case <-tc.errorSignal: 288 return false 289 } 290 291 // Increase waiting interval exponentially up to 500 ms. 292 if waitInterval < time.Millisecond*500 { 293 waitInterval *= 2 294 } 295 296 if time.Since(startTime) > duration { 297 // Waited too long 298 tc.t.Error("Time out waiting for", errMsg) 299 return false 300 } 301 } 302} 303 304// WaitFor is like WaitForN but with a fixed duration of 10 seconds 305func (tc *TestCase) WaitFor(cond func() bool, errMsg ...interface{}) bool { 306 return tc.WaitForN(cond, time.Second*10, errMsg...) 307} 308 309func (tc *TestCase) indicateError(err error) { 310 // Print to log for visibility 311 log.Print(err.Error()) 312 313 // Indicate error for the test 314 tc.t.Error(err.Error()) 315 316 tc.errorCause = err.Error() 317 318 // Signal the error via channel 319 close(tc.errorSignal) 320} 321 322func (tc *TestCase) logStats() { 323 t := time.NewTicker(tc.resourceSpec.ResourceCheckPeriod) 324 defer t.Stop() 325 326 for { 327 select { 328 case <-t.C: 329 tc.logStatsOnce() 330 case <-tc.doneSignal: 331 return 332 } 333 } 334} 335 336func (tc *TestCase) logStatsOnce() { 337 log.Printf("%s | %s | %s", 338 tc.agentProc.GetResourceConsumption(), 339 tc.LoadGenerator.GetStats(), 340 tc.MockBackend.GetStats()) 341} 342