1// Copyright (c) 2017 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package testutils 16 17import ( 18 "encoding/json" 19 "errors" 20 "fmt" 21 "net/http" 22 "net/http/httptest" 23 "sync" 24 "sync/atomic" 25 26 "github.com/uber/jaeger-client-go/thrift" 27 28 "github.com/uber/jaeger-client-go/thrift-gen/agent" 29 "github.com/uber/jaeger-client-go/thrift-gen/jaeger" 30 "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" 31 "github.com/uber/jaeger-client-go/utils" 32) 33 34// StartMockAgent runs a mock representation of jaeger-agent. 35// This function returns a started server. 36func StartMockAgent() (*MockAgent, error) { 37 transport, err := NewTUDPServerTransport("127.0.0.1:0") 38 if err != nil { 39 return nil, err 40 } 41 42 samplingManager := newSamplingManager() 43 samplingHandler := &samplingHandler{manager: samplingManager} 44 samplingServer := httptest.NewServer(samplingHandler) 45 46 agent := &MockAgent{ 47 transport: transport, 48 samplingMgr: samplingManager, 49 samplingSrv: samplingServer, 50 } 51 52 var started sync.WaitGroup 53 started.Add(1) 54 go agent.serve(&started) 55 started.Wait() 56 57 return agent, nil 58} 59 60// Close stops the serving of traffic 61func (s *MockAgent) Close() { 62 atomic.StoreUint32(&s.serving, 0) 63 s.transport.Close() 64 s.samplingSrv.Close() 65} 66 67// MockAgent is a mock representation of Jaeger Agent. 68// It receives spans over UDP, and has an HTTP endpoint for sampling strategies. 69type MockAgent struct { 70 transport *TUDPTransport 71 jaegerBatches []*jaeger.Batch 72 mutex sync.Mutex 73 serving uint32 74 samplingMgr *samplingManager 75 samplingSrv *httptest.Server 76} 77 78// SpanServerAddr returns the UDP host:port where MockAgent listens for spans 79func (s *MockAgent) SpanServerAddr() string { 80 return s.transport.Addr().String() 81} 82 83// SpanServerClient returns a UDP client that can be used to send spans to the MockAgent 84func (s *MockAgent) SpanServerClient() (agent.Agent, error) { 85 return utils.NewAgentClientUDP(s.SpanServerAddr(), 0) 86} 87 88// SamplingServerAddr returns the host:port of HTTP server exposing sampling strategy endpoint 89func (s *MockAgent) SamplingServerAddr() string { 90 return s.samplingSrv.Listener.Addr().String() 91} 92 93func (s *MockAgent) serve(started *sync.WaitGroup) { 94 handler := agent.NewAgentProcessor(s) 95 protocolFact := thrift.NewTCompactProtocolFactory() 96 buf := make([]byte, utils.UDPPacketMaxLength, utils.UDPPacketMaxLength) 97 trans := thrift.NewTMemoryBufferLen(utils.UDPPacketMaxLength) 98 99 atomic.StoreUint32(&s.serving, 1) 100 started.Done() 101 for s.IsServing() { 102 n, err := s.transport.Read(buf) 103 if err == nil { 104 trans.Write(buf[:n]) 105 protocol := protocolFact.GetProtocol(trans) 106 handler.Process(protocol, protocol) 107 } 108 } 109} 110 111// EmitZipkinBatch is deprecated, use EmitBatch 112func (s *MockAgent) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { 113 // TODO remove this for 3.0.0 114 return errors.New("Not implemented") 115} 116 117// GetZipkinSpans is deprecated use GetJaegerBatches 118func (s *MockAgent) GetZipkinSpans() []*zipkincore.Span { 119 return nil 120} 121 122// ResetZipkinSpans is deprecated use ResetJaegerBatches 123func (s *MockAgent) ResetZipkinSpans() {} 124 125// EmitBatch implements EmitBatch() of TChanSamplingManagerServer 126func (s *MockAgent) EmitBatch(batch *jaeger.Batch) (err error) { 127 s.mutex.Lock() 128 defer s.mutex.Unlock() 129 s.jaegerBatches = append(s.jaegerBatches, batch) 130 return err 131} 132 133// IsServing indicates whether the server is currently serving traffic 134func (s *MockAgent) IsServing() bool { 135 return atomic.LoadUint32(&s.serving) == 1 136} 137 138// AddSamplingStrategy registers a sampling strategy for a service 139func (s *MockAgent) AddSamplingStrategy(service string, strategy interface{}) { 140 s.samplingMgr.AddSamplingStrategy(service, strategy) 141} 142 143// GetJaegerBatches returns accumulated Jaeger batches 144func (s *MockAgent) GetJaegerBatches() []*jaeger.Batch { 145 s.mutex.Lock() 146 defer s.mutex.Unlock() 147 n := len(s.jaegerBatches) 148 batches := make([]*jaeger.Batch, n, n) 149 copy(batches, s.jaegerBatches) 150 return batches 151} 152 153// ResetJaegerBatches discards accumulated Jaeger batches 154func (s *MockAgent) ResetJaegerBatches() { 155 s.mutex.Lock() 156 defer s.mutex.Unlock() 157 s.jaegerBatches = nil 158} 159 160type samplingHandler struct { 161 manager *samplingManager 162} 163 164func (h *samplingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 165 services := r.URL.Query()["service"] 166 if len(services) == 0 { 167 http.Error(w, "'service' parameter is empty", http.StatusBadRequest) 168 return 169 } 170 if len(services) > 1 { 171 http.Error(w, "'service' parameter must occur only once", http.StatusBadRequest) 172 return 173 } 174 resp, err := h.manager.GetSamplingStrategy(services[0]) 175 if err != nil { 176 http.Error(w, fmt.Sprintf("Error retrieving strategy: %+v", err), http.StatusInternalServerError) 177 return 178 } 179 json, err := json.Marshal(resp) 180 if err != nil { 181 http.Error(w, "Cannot marshall Thrift to JSON", http.StatusInternalServerError) 182 return 183 } 184 w.Header().Add("Content-Type", "application/json") 185 if _, err := w.Write(json); err != nil { 186 return 187 } 188} 189