1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package testbed
16
17import (
18	"log"
19	"net"
20	"os"
21	"path"
22	"path/filepath"
23	"testing"
24	"time"
25
26	"github.com/stretchr/testify/require"
27)
28
29// TestCase defines a running test case.
30type TestCase struct {
31	t *testing.T
32
33	// Directory where test case results and logs will be written.
34	resultDir string
35
36	// does not write out results when set to true
37	skipResults bool
38
39	// Agent config file path.
40	agentConfigFile string
41
42	// Load generator spec file path.
43	// loadSpecFile string
44
45	// Resource spec for agent.
46	resourceSpec ResourceSpec
47
48	// Agent process.
49	agentProc OtelcolRunner
50
51	Sender   DataSender
52	receiver DataReceiver
53
54	LoadGenerator *LoadGenerator
55	MockBackend   *MockBackend
56	validator     TestCaseValidator
57
58	startTime time.Time
59
60	// errorSignal indicates an error in the test case execution, e.g. process execution
61	// failure or exceeding resource consumption, etc. The actual error message is already
62	// logged, this is only an indicator on which you can wait to be informed.
63	errorSignal chan struct{}
64	// Duration is the requested duration of the tests. Configured via TESTBED_DURATION
65	// env variable and defaults to 15 seconds if env variable is unspecified.
66	Duration       time.Duration
67	doneSignal     chan struct{}
68	errorCause     string
69	resultsSummary TestResultsSummary
70}
71
const (
	// mibibyte is the divisor used to scale memory figures: 1024 * 1024.
	mibibyte = 1024 * 1024

	// testcaseDurationVar is the name of the environment variable from which
	// the requested test case duration is read.
	testcaseDurationVar = "TESTCASE_DURATION"
)
74
75// NewTestCase creates a new TestCase. It expects agent-config.yaml in the specified directory.
76func NewTestCase(
77	t *testing.T,
78	dataProvider DataProvider,
79	sender DataSender,
80	receiver DataReceiver,
81	agentProc OtelcolRunner,
82	validator TestCaseValidator,
83	resultsSummary TestResultsSummary,
84	opts ...TestCaseOption,
85) *TestCase {
86	tc := TestCase{
87		t:              t,
88		errorSignal:    make(chan struct{}),
89		doneSignal:     make(chan struct{}),
90		startTime:      time.Now(),
91		Sender:         sender,
92		receiver:       receiver,
93		agentProc:      agentProc,
94		validator:      validator,
95		resultsSummary: resultsSummary,
96	}
97
98	// Get requested test case duration from env variable.
99	duration := os.Getenv(testcaseDurationVar)
100	if duration == "" {
101		duration = "15s"
102	}
103	var err error
104	tc.Duration, err = time.ParseDuration(duration)
105	if err != nil {
106		log.Fatalf("Invalid "+testcaseDurationVar+": %v. Expecting a valid duration string.", duration)
107	}
108
109	// Apply all provided options.
110	for _, opt := range opts {
111		opt(&tc)
112	}
113
114	// Prepare directory for results.
115	tc.resultDir, err = filepath.Abs(path.Join("results", t.Name()))
116	require.NoErrorf(t, err, "Cannot resolve %s", t.Name())
117	require.NoErrorf(t, os.MkdirAll(tc.resultDir, os.ModePerm), "Cannot create directory %s", tc.resultDir)
118
119	// Set default resource check period.
120	tc.resourceSpec.ResourceCheckPeriod = 3 * time.Second
121	if tc.Duration < tc.resourceSpec.ResourceCheckPeriod {
122		// Resource check period should not be longer than entire test duration.
123		tc.resourceSpec.ResourceCheckPeriod = tc.Duration
124	}
125
126	tc.LoadGenerator, err = NewLoadGenerator(dataProvider, sender)
127	require.NoError(t, err, "Cannot create generator")
128
129	tc.MockBackend = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver)
130
131	go tc.logStats()
132
133	return &tc
134}
135
136func (tc *TestCase) composeTestResultFileName(fileName string) string {
137	fileName, err := filepath.Abs(path.Join(tc.resultDir, fileName))
138	require.NoError(tc.t, err, "Cannot resolve %s", fileName)
139	return fileName
140}
141
142// StartAgent starts the agent and redirects its standard output and standard error
143// to "agent.log" file located in the test directory.
144func (tc *TestCase) StartAgent(args ...string) {
145	if tc.agentConfigFile != "" {
146		args = append(args, "--config")
147		args = append(args, tc.agentConfigFile)
148	}
149	logFileName := tc.composeTestResultFileName("agent.log")
150
151	err := tc.agentProc.Start(StartParams{
152		Name:         "Agent",
153		LogFilePath:  logFileName,
154		CmdArgs:      args,
155		resourceSpec: &tc.resourceSpec,
156	})
157
158	if err != nil {
159		tc.indicateError(err)
160		return
161	}
162
163	// Start watching resource consumption.
164	go func() {
165		err := tc.agentProc.WatchResourceConsumption()
166		if err != nil {
167			tc.indicateError(err)
168		}
169	}()
170
171	endpoint := tc.LoadGenerator.sender.GetEndpoint()
172	if endpoint != nil {
173		// Wait for agent to start. We consider the agent started when we can
174		// connect to the port to which we intend to send load. We only do this
175		// if the endpoint is not-empty, i.e. the sender does use network (some senders
176		// like text log writers don't).
177		tc.WaitFor(func() bool {
178			conn, err := net.Dial(tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String())
179			if err == nil && conn != nil {
180				conn.Close()
181				return true
182			}
183			return false
184		})
185	}
186}
187
188// StopAgent stops agent process.
189func (tc *TestCase) StopAgent() {
190	if _, err := tc.agentProc.Stop(); err != nil {
191		tc.indicateError(err)
192	}
193}
194
195// StartLoad starts the load generator and redirects its standard output and standard error
196// to "load-generator.log" file located in the test directory.
197func (tc *TestCase) StartLoad(options LoadOptions) {
198	tc.LoadGenerator.Start(options)
199}
200
201// StopLoad stops load generator.
202func (tc *TestCase) StopLoad() {
203	tc.LoadGenerator.Stop()
204}
205
206// StartBackend starts the specified backend type.
207func (tc *TestCase) StartBackend() {
208	require.NoError(tc.t, tc.MockBackend.Start(), "Cannot start backend")
209}
210
211// StopBackend stops the backend.
212func (tc *TestCase) StopBackend() {
213	tc.MockBackend.Stop()
214}
215
216// EnableRecording enables recording of all data received by MockBackend.
217func (tc *TestCase) EnableRecording() {
218	tc.MockBackend.EnableRecording()
219}
220
221// AgentMemoryInfo returns raw memory info struct about the agent
222// as returned by github.com/shirou/gopsutil/process
223func (tc *TestCase) AgentMemoryInfo() (uint32, uint32, error) {
224	stat, err := tc.agentProc.GetProcessMon().MemoryInfo()
225	if err != nil {
226		return 0, 0, err
227	}
228	return uint32(stat.RSS / mibibyte), uint32(stat.VMS / mibibyte), nil
229}
230
231// Stop stops the load generator, the agent and the backend.
232func (tc *TestCase) Stop() {
233	// Stop all components
234	tc.StopLoad()
235	tc.StopAgent()
236	tc.StopBackend()
237
238	// Stop logging
239	close(tc.doneSignal)
240
241	if tc.skipResults {
242		return
243	}
244
245	// Report test results
246	tc.validator.RecordResults(tc)
247}
248
249// ValidateData validates data received by mock backend against what was generated and sent to the collector
250// instance(s) under test by the LoadGenerator.
251func (tc *TestCase) ValidateData() {
252	select {
253	case <-tc.errorSignal:
254		// Error is already signaled and recorded. Validating data is pointless.
255		return
256	default:
257	}
258
259	tc.validator.Validate(tc)
260}
261
262// Sleep for specified duration or until error is signaled.
263func (tc *TestCase) Sleep(d time.Duration) {
264	select {
265	case <-time.After(d):
266	case <-tc.errorSignal:
267	}
268}
269
270// WaitForN the specific condition for up to a specified duration. Records a test error
271// if time is out and condition does not become true. If error is signaled
272// while waiting the function will return false, but will not record additional
273// test error (we assume that signaled error is already recorded in indicateError()).
274func (tc *TestCase) WaitForN(cond func() bool, duration time.Duration, errMsg ...interface{}) bool {
275	startTime := time.Now()
276
277	// Start with 5 ms waiting interval between condition re-evaluation.
278	waitInterval := time.Millisecond * 5
279
280	for {
281		if cond() {
282			return true
283		}
284
285		select {
286		case <-time.After(waitInterval):
287		case <-tc.errorSignal:
288			return false
289		}
290
291		// Increase waiting interval exponentially up to 500 ms.
292		if waitInterval < time.Millisecond*500 {
293			waitInterval *= 2
294		}
295
296		if time.Since(startTime) > duration {
297			// Waited too long
298			tc.t.Error("Time out waiting for", errMsg)
299			return false
300		}
301	}
302}
303
304// WaitFor is like WaitForN but with a fixed duration of 10 seconds
305func (tc *TestCase) WaitFor(cond func() bool, errMsg ...interface{}) bool {
306	return tc.WaitForN(cond, time.Second*10, errMsg...)
307}
308
309func (tc *TestCase) indicateError(err error) {
310	// Print to log for visibility
311	log.Print(err.Error())
312
313	// Indicate error for the test
314	tc.t.Error(err.Error())
315
316	tc.errorCause = err.Error()
317
318	// Signal the error via channel
319	close(tc.errorSignal)
320}
321
322func (tc *TestCase) logStats() {
323	t := time.NewTicker(tc.resourceSpec.ResourceCheckPeriod)
324	defer t.Stop()
325
326	for {
327		select {
328		case <-t.C:
329			tc.logStatsOnce()
330		case <-tc.doneSignal:
331			return
332		}
333	}
334}
335
336func (tc *TestCase) logStatsOnce() {
337	log.Printf("%s | %s | %s",
338		tc.agentProc.GetResourceConsumption(),
339		tc.LoadGenerator.GetStats(),
340		tc.MockBackend.GetStats())
341}
342