1// Copyright (C) 2020 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package jaeger
5
6import (
7	"context"
8	"net"
9	"sync"
10	"testing"
11	"time"
12
13	"github.com/apache/thrift/lib/go/thrift"
14	"github.com/stretchr/testify/require"
15
16	"storj.io/monkit-jaeger/gen-go/agent"
17	"storj.io/monkit-jaeger/gen-go/jaeger"
18)
19
20// withAgent starts a mock agent on a local udp port.
21func withAgent(t *testing.T, f func(mock *MockAgent)) {
22	mock := NewMockAgent()
23
24	var wg sync.WaitGroup
25	wg.Add(1)
26	go func() {
27		err := mock.Serve()
28		require.NoError(t, err)
29		wg.Done()
30	}()
31
32	mock.WaitForStart()
33
34	f(mock)
35
36	err := mock.Close()
37	require.NoError(t, err)
38	wg.Wait()
39}
40
41// MockAgent implements jaeger agent interface.
42type MockAgent struct {
43	conn *net.UDPConn
44	addr string
45
46	cond    *sync.Cond
47	started chan struct{}
48	closed  bool
49	batches []*jaeger.Batch
50}
51
52func NewMockAgent() *MockAgent {
53	return &MockAgent{
54		cond:    sync.NewCond(new(sync.Mutex)),
55		batches: make([]*jaeger.Batch, 0),
56		started: make(chan struct{}),
57	}
58}
59
60// EmitBatch implements jaeger agent interface.
61func (m *MockAgent) EmitBatch(ctx context.Context, batch *jaeger.Batch) (err error) {
62	m.cond.L.Lock()
63	defer m.cond.L.Unlock()
64
65	m.batches = append(m.batches, batch)
66	m.cond.Broadcast()
67
68	return nil
69}
70
71func (m *MockAgent) WaitForBatches(dur time.Duration) []*jaeger.Batch {
72	done := false
73	timer := time.AfterFunc(dur, func() {
74		m.cond.L.Lock()
75		done = true
76		m.cond.L.Unlock()
77		m.cond.Broadcast()
78	})
79	defer timer.Stop()
80
81	m.cond.L.Lock()
82	defer m.cond.L.Unlock()
83
84	for len(m.batches) == 0 && !done {
85		m.cond.Wait()
86	}
87
88	if done {
89		return nil
90	}
91
92	batches := make([]*jaeger.Batch, len(m.batches))
93	copy(batches, m.batches)
94	return batches
95}
96
97// Addr returns the address of the agent.
98func (m *MockAgent) Addr() string {
99	return m.addr
100}
101
102// Close shutdown mock agent server.
103func (m *MockAgent) Close() error {
104	m.cond.L.Lock()
105	m.closed = true
106	m.cond.L.Unlock()
107
108	return m.conn.Close()
109}
110
111// Serve starts the mock agent.
112func (m *MockAgent) Serve() error {
113	addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
114	if err != nil {
115		return err
116	}
117	m.conn, err = net.ListenUDP(addr.Network(), addr)
118	if err != nil {
119		return err
120	}
121
122	m.addr = m.conn.LocalAddr().String()
123
124	handler := agent.NewAgentProcessor(m)
125	protocolFact := thrift.NewTCompactProtocolFactory()
126	trans := thrift.NewTMemoryBufferLen(maxPacketSize)
127	buf := make([]byte, maxPacketSize)
128
129	close(m.started)
130	for !m.isClosed() {
131		n, err := m.conn.Read(buf)
132		if err == nil {
133			trans.Write(buf[:n])
134			protocol := protocolFact.GetProtocol(trans)
135			_, _ = handler.Process(context.Background(), protocol, protocol)
136		}
137	}
138	return nil
139}
140
141// WaitForStart returns when mock agent server is ready.
142func (m *MockAgent) WaitForStart() {
143	<-m.started
144}
145
146func (m *MockAgent) isClosed() bool {
147	m.cond.L.Lock()
148	defer m.cond.L.Unlock()
149	return m.closed
150}
151