1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package jaeger 5 6import ( 7 "context" 8 "net" 9 "sync" 10 "testing" 11 "time" 12 13 "github.com/apache/thrift/lib/go/thrift" 14 "github.com/stretchr/testify/require" 15 16 "storj.io/monkit-jaeger/gen-go/agent" 17 "storj.io/monkit-jaeger/gen-go/jaeger" 18) 19 20// withAgent starts a mock agent on a local udp port. 21func withAgent(t *testing.T, f func(mock *MockAgent)) { 22 mock := NewMockAgent() 23 24 var wg sync.WaitGroup 25 wg.Add(1) 26 go func() { 27 err := mock.Serve() 28 require.NoError(t, err) 29 wg.Done() 30 }() 31 32 mock.WaitForStart() 33 34 f(mock) 35 36 err := mock.Close() 37 require.NoError(t, err) 38 wg.Wait() 39} 40 41// MockAgent implements jaeger agent interface. 42type MockAgent struct { 43 conn *net.UDPConn 44 addr string 45 46 cond *sync.Cond 47 started chan struct{} 48 closed bool 49 batches []*jaeger.Batch 50} 51 52func NewMockAgent() *MockAgent { 53 return &MockAgent{ 54 cond: sync.NewCond(new(sync.Mutex)), 55 batches: make([]*jaeger.Batch, 0), 56 started: make(chan struct{}), 57 } 58} 59 60// EmitBatch implements jaeger agent interface. 61func (m *MockAgent) EmitBatch(ctx context.Context, batch *jaeger.Batch) (err error) { 62 m.cond.L.Lock() 63 defer m.cond.L.Unlock() 64 65 m.batches = append(m.batches, batch) 66 m.cond.Broadcast() 67 68 return nil 69} 70 71func (m *MockAgent) WaitForBatches(dur time.Duration) []*jaeger.Batch { 72 done := false 73 timer := time.AfterFunc(dur, func() { 74 m.cond.L.Lock() 75 done = true 76 m.cond.L.Unlock() 77 m.cond.Broadcast() 78 }) 79 defer timer.Stop() 80 81 m.cond.L.Lock() 82 defer m.cond.L.Unlock() 83 84 for len(m.batches) == 0 && !done { 85 m.cond.Wait() 86 } 87 88 if done { 89 return nil 90 } 91 92 batches := make([]*jaeger.Batch, len(m.batches)) 93 copy(batches, m.batches) 94 return batches 95} 96 97// Addr returns the address of the agent. 98func (m *MockAgent) Addr() string { 99 return m.addr 100} 101 102// Close shutdown mock agent server. 103func (m *MockAgent) Close() error { 104 m.cond.L.Lock() 105 m.closed = true 106 m.cond.L.Unlock() 107 108 return m.conn.Close() 109} 110 111// Serve starts the mock agent. 112func (m *MockAgent) Serve() error { 113 addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") 114 if err != nil { 115 return err 116 } 117 m.conn, err = net.ListenUDP(addr.Network(), addr) 118 if err != nil { 119 return err 120 } 121 122 m.addr = m.conn.LocalAddr().String() 123 124 handler := agent.NewAgentProcessor(m) 125 protocolFact := thrift.NewTCompactProtocolFactory() 126 trans := thrift.NewTMemoryBufferLen(maxPacketSize) 127 buf := make([]byte, maxPacketSize) 128 129 close(m.started) 130 for !m.isClosed() { 131 n, err := m.conn.Read(buf) 132 if err == nil { 133 trans.Write(buf[:n]) 134 protocol := protocolFact.GetProtocol(trans) 135 _, _ = handler.Process(context.Background(), protocol, protocol) 136 } 137 } 138 return nil 139} 140 141// WaitForStart returns when mock agent server is ready. 142func (m *MockAgent) WaitForStart() { 143 <-m.started 144} 145 146func (m *MockAgent) isClosed() bool { 147 m.cond.L.Lock() 148 defer m.cond.L.Unlock() 149 return m.closed 150} 151