1// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package testutils
16
17import (
18	"encoding/json"
19	"errors"
20	"fmt"
21	"net/http"
22	"net/http/httptest"
23	"sync"
24	"sync/atomic"
25
26	"github.com/uber/jaeger-client-go/thrift"
27
28	"github.com/uber/jaeger-client-go/thrift-gen/agent"
29	"github.com/uber/jaeger-client-go/thrift-gen/jaeger"
30	"github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
31	"github.com/uber/jaeger-client-go/utils"
32)
33
34// StartMockAgent runs a mock representation of jaeger-agent.
35// This function returns a started server.
36func StartMockAgent() (*MockAgent, error) {
37	transport, err := NewTUDPServerTransport("127.0.0.1:0")
38	if err != nil {
39		return nil, err
40	}
41
42	samplingManager := newSamplingManager()
43	samplingHandler := &samplingHandler{manager: samplingManager}
44	samplingServer := httptest.NewServer(samplingHandler)
45
46	agent := &MockAgent{
47		transport:   transport,
48		samplingMgr: samplingManager,
49		samplingSrv: samplingServer,
50	}
51
52	var started sync.WaitGroup
53	started.Add(1)
54	go agent.serve(&started)
55	started.Wait()
56
57	return agent, nil
58}
59
// Close stops the serving of traffic.
// It clears the serving flag, then closes the UDP transport and the sampling
// HTTP server. It does not wait for the serve goroutine to exit.
func (s *MockAgent) Close() {
	// Clear the flag first so the serve loop condition is already false when
	// its pending Read returns (presumably closing the transport unblocks it).
	atomic.StoreUint32(&s.serving, 0)
	s.transport.Close()
	s.samplingSrv.Close()
}
66
// MockAgent is a mock representation of Jaeger Agent.
// It receives spans over UDP, and has an HTTP endpoint for sampling strategies.
type MockAgent struct {
	// transport is the UDP transport the serve loop reads packets from.
	transport     *TUDPTransport
	// jaegerBatches accumulates batches passed to EmitBatch; guarded by mutex.
	jaegerBatches []*jaeger.Batch
	// mutex protects jaegerBatches.
	mutex         sync.Mutex
	// serving is 1 while the serve loop should run and 0 otherwise; it is
	// accessed only through sync/atomic.
	serving       uint32
	// samplingMgr stores the strategies registered via AddSamplingStrategy.
	samplingMgr   *samplingManager
	// samplingSrv is the HTTP test server exposing the sampling endpoint.
	samplingSrv   *httptest.Server
}
77
78// SpanServerAddr returns the UDP host:port where MockAgent listens for spans
79func (s *MockAgent) SpanServerAddr() string {
80	return s.transport.Addr().String()
81}
82
83// SpanServerClient returns a UDP client that can be used to send spans to the MockAgent
84func (s *MockAgent) SpanServerClient() (agent.Agent, error) {
85	return utils.NewAgentClientUDP(s.SpanServerAddr(), 0)
86}
87
88// SamplingServerAddr returns the host:port of HTTP server exposing sampling strategy endpoint
89func (s *MockAgent) SamplingServerAddr() string {
90	return s.samplingSrv.Listener.Addr().String()
91}
92
93func (s *MockAgent) serve(started *sync.WaitGroup) {
94	handler := agent.NewAgentProcessor(s)
95	protocolFact := thrift.NewTCompactProtocolFactory()
96	buf := make([]byte, utils.UDPPacketMaxLength, utils.UDPPacketMaxLength)
97	trans := thrift.NewTMemoryBufferLen(utils.UDPPacketMaxLength)
98
99	atomic.StoreUint32(&s.serving, 1)
100	started.Done()
101	for s.IsServing() {
102		n, err := s.transport.Read(buf)
103		if err == nil {
104			trans.Write(buf[:n])
105			protocol := protocolFact.GetProtocol(trans)
106			handler.Process(protocol, protocol)
107		}
108	}
109}
110
111// EmitZipkinBatch is deprecated, use EmitBatch
112func (s *MockAgent) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
113	// TODO remove this for 3.0.0
114	return errors.New("Not implemented")
115}
116
117// GetZipkinSpans is deprecated use GetJaegerBatches
118func (s *MockAgent) GetZipkinSpans() []*zipkincore.Span {
119	return nil
120}
121
// ResetZipkinSpans is a no-op; the mock does not record Zipkin spans.
//
// Deprecated: use ResetJaegerBatches.
func (s *MockAgent) ResetZipkinSpans() {}
124
125// EmitBatch implements EmitBatch() of TChanSamplingManagerServer
126func (s *MockAgent) EmitBatch(batch *jaeger.Batch) (err error) {
127	s.mutex.Lock()
128	defer s.mutex.Unlock()
129	s.jaegerBatches = append(s.jaegerBatches, batch)
130	return err
131}
132
133// IsServing indicates whether the server is currently serving traffic
134func (s *MockAgent) IsServing() bool {
135	return atomic.LoadUint32(&s.serving) == 1
136}
137
138// AddSamplingStrategy registers a sampling strategy for a service
139func (s *MockAgent) AddSamplingStrategy(service string, strategy interface{}) {
140	s.samplingMgr.AddSamplingStrategy(service, strategy)
141}
142
143// GetJaegerBatches returns accumulated Jaeger batches
144func (s *MockAgent) GetJaegerBatches() []*jaeger.Batch {
145	s.mutex.Lock()
146	defer s.mutex.Unlock()
147	n := len(s.jaegerBatches)
148	batches := make([]*jaeger.Batch, n, n)
149	copy(batches, s.jaegerBatches)
150	return batches
151}
152
153// ResetJaegerBatches discards accumulated Jaeger batches
154func (s *MockAgent) ResetJaegerBatches() {
155	s.mutex.Lock()
156	defer s.mutex.Unlock()
157	s.jaegerBatches = nil
158}
159
// samplingHandler is the http.Handler behind the mock sampling endpoint.
// It answers requests with the strategy that manager returns for the
// requested service.
type samplingHandler struct {
	// manager is the source of sampling strategies looked up in ServeHTTP.
	manager *samplingManager
}
163
164func (h *samplingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
165	services := r.URL.Query()["service"]
166	if len(services) == 0 {
167		http.Error(w, "'service' parameter is empty", http.StatusBadRequest)
168		return
169	}
170	if len(services) > 1 {
171		http.Error(w, "'service' parameter must occur only once", http.StatusBadRequest)
172		return
173	}
174	resp, err := h.manager.GetSamplingStrategy(services[0])
175	if err != nil {
176		http.Error(w, fmt.Sprintf("Error retrieving strategy: %+v", err), http.StatusInternalServerError)
177		return
178	}
179	json, err := json.Marshal(resp)
180	if err != nil {
181		http.Error(w, "Cannot marshall Thrift to JSON", http.StatusInternalServerError)
182		return
183	}
184	w.Header().Add("Content-Type", "application/json")
185	if _, err := w.Write(json); err != nil {
186		return
187	}
188}
189