1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaegerreceiver
16
17import (
18	"context"
19	"fmt"
20	"html"
21	"io/ioutil"
22	"mime"
23	"net"
24	"net/http"
25	"sync"
26
27	apacheThrift "github.com/apache/thrift/lib/go/thrift"
28	"github.com/gorilla/mux"
29	"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
30	jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
31	"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
32	"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
33	"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
34	"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
35	"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
36	collectorSampling "github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
37	staticStrategyStore "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
38	"github.com/jaegertracing/jaeger/proto-gen/api_v2"
39	"github.com/jaegertracing/jaeger/thrift-gen/agent"
40	"github.com/jaegertracing/jaeger/thrift-gen/baggage"
41	"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
42	"github.com/jaegertracing/jaeger/thrift-gen/sampling"
43	"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
44	"github.com/uber/jaeger-lib/metrics"
45	"go.uber.org/zap"
46	"google.golang.org/grpc"
47
48	"go.opentelemetry.io/collector/client"
49	"go.opentelemetry.io/collector/component"
50	"go.opentelemetry.io/collector/config"
51	"go.opentelemetry.io/collector/config/configgrpc"
52	"go.opentelemetry.io/collector/config/confighttp"
53	"go.opentelemetry.io/collector/consumer"
54	"go.opentelemetry.io/collector/consumer/consumererror"
55	"go.opentelemetry.io/collector/obsreport"
56	jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
57)
58
// configuration defines the behavior and the ports that
// the Jaeger receiver will use. The receiver's *Enabled accessors treat an
// endpoint as disabled when its port is not greater than zero.
type configuration struct {
	// Collector-side endpoints (Thrift, Thrift over HTTP, gRPC).
	CollectorThriftPort         int
	CollectorHTTPPort           int
	CollectorHTTPSettings       confighttp.HTTPServerSettings
	CollectorGRPCPort           int
	CollectorGRPCServerSettings configgrpc.GRPCServerSettings

	// Agent-side endpoints (Thrift over UDP, agent HTTP server) and the
	// remote sampling settings: the client settings used to dial the upstream
	// sampling endpoint and the strategies file for the static strategy store.
	AgentCompactThriftPort       int
	AgentCompactThriftConfig     ServerConfigUDP
	AgentBinaryThriftPort        int
	AgentBinaryThriftConfig      ServerConfigUDP
	AgentHTTPPort                int
	RemoteSamplingClientSettings configgrpc.GRPCClientSettings
	RemoteSamplingStrategyFile   string
}
76
// jReceiver is used to receive spans that were originally intended to be sent to Jaeger.
// This receiver is basically a Jaeger collector that can also serve the
// Jaeger agent endpoints.
type jReceiver struct {
	// nextConsumer is the consumer that translated traces are handed to.
	nextConsumer consumer.Traces
	id           config.ComponentID

	// config holds the ports and settings of every endpoint; the accessor
	// methods treat a nil config as "everything disabled".
	config *configuration

	// Collector-side servers, created by startCollector.
	grpc            *grpc.Server
	collectorServer *http.Server

	// Agent-side state, created by startAgent.
	agentSamplingManager *jSamplingConfig.SamplingManager
	agentProcessors      []processors.Processor
	agentServer          *http.Server

	// goroutines counts the serving goroutines so Shutdown can wait for them.
	goroutines sync.WaitGroup

	logger *zap.Logger

	// Observability helpers for the gRPC and collector HTTP transports.
	grpcObsrecv *obsreport.Receiver
	httpObsrecv *obsreport.Receiver
}
99
const (
	// Transport names passed to obsreport.ReceiverSettings.
	agentTransportBinary   = "udp_thrift_binary"
	agentTransportCompact  = "udp_thrift_compact"
	collectorHTTPTransport = "collector_http"
	grpcTransport          = "grpc"

	// Payload format names passed to the obsreport EndTracesOp calls.
	thriftFormat   = "thrift"
	protobufFormat = "protobuf"
)
109
var (
	// acceptedThriftFormats is the set of media types that
	// decodeThriftHTTPBody accepts in the Content-Type header.
	acceptedThriftFormats = map[string]struct{}{
		"application/x-thrift":                 {},
		"application/vnd.apache.thrift.binary": {},
	}
)
116
117// newJaegerReceiver creates a TracesReceiver that receives traffic as a Jaeger collector, and
118// also as a Jaeger agent.
119func newJaegerReceiver(
120	id config.ComponentID,
121	config *configuration,
122	nextConsumer consumer.Traces,
123	set component.ReceiverCreateSettings,
124) *jReceiver {
125	return &jReceiver{
126		config:       config,
127		nextConsumer: nextConsumer,
128		id:           id,
129		logger:       set.Logger,
130		grpcObsrecv:  obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: grpcTransport}),
131		httpObsrecv:  obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: collectorHTTPTransport}),
132	}
133}
134
135func (jr *jReceiver) agentCompactThriftAddr() string {
136	var port int
137	if jr.config != nil {
138		port = jr.config.AgentCompactThriftPort
139	}
140	return fmt.Sprintf(":%d", port)
141}
142
143func (jr *jReceiver) agentCompactThriftEnabled() bool {
144	return jr.config != nil && jr.config.AgentCompactThriftPort > 0
145}
146
147func (jr *jReceiver) agentBinaryThriftAddr() string {
148	var port int
149	if jr.config != nil {
150		port = jr.config.AgentBinaryThriftPort
151	}
152	return fmt.Sprintf(":%d", port)
153}
154
155func (jr *jReceiver) agentBinaryThriftEnabled() bool {
156	return jr.config != nil && jr.config.AgentBinaryThriftPort > 0
157}
158
159func (jr *jReceiver) agentHTTPAddr() string {
160	var port int
161	if jr.config != nil {
162		port = jr.config.AgentHTTPPort
163	}
164	return fmt.Sprintf(":%d", port)
165}
166
167func (jr *jReceiver) agentHTTPEnabled() bool {
168	return jr.config != nil && jr.config.AgentHTTPPort > 0
169}
170
171func (jr *jReceiver) collectorGRPCAddr() string {
172	var port int
173	if jr.config != nil {
174		port = jr.config.CollectorGRPCPort
175	}
176	return fmt.Sprintf(":%d", port)
177}
178
179func (jr *jReceiver) collectorGRPCEnabled() bool {
180	return jr.config != nil && jr.config.CollectorGRPCPort > 0
181}
182
183func (jr *jReceiver) collectorHTTPEnabled() bool {
184	return jr.config != nil && jr.config.CollectorHTTPPort > 0
185}
186
187func (jr *jReceiver) Start(_ context.Context, host component.Host) error {
188	if err := jr.startAgent(host); err != nil {
189		return err
190	}
191
192	return jr.startCollector(host)
193}
194
195func (jr *jReceiver) Shutdown(ctx context.Context) error {
196	var errs []error
197
198	if jr.agentServer != nil {
199		if aerr := jr.agentServer.Shutdown(ctx); aerr != nil {
200			errs = append(errs, aerr)
201		}
202	}
203	for _, processor := range jr.agentProcessors {
204		processor.Stop()
205	}
206
207	if jr.collectorServer != nil {
208		if cerr := jr.collectorServer.Shutdown(ctx); cerr != nil {
209			errs = append(errs, cerr)
210		}
211	}
212	if jr.grpc != nil {
213		jr.grpc.GracefulStop()
214	}
215
216	jr.goroutines.Wait()
217	return consumererror.Combine(errs)
218}
219
220func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.Traces) (int, error) {
221	if batch == nil {
222		return 0, nil
223	}
224	td := jaegertranslator.ThriftBatchToInternalTraces(batch)
225	return len(batch.Spans), consumer.ConsumeTraces(ctx, td)
226}
227
// Compile-time checks that agentHandler and jReceiver satisfy the Jaeger
// interfaces they are used as.
var _ agent.Agent = (*agentHandler)(nil)
var _ api_v2.CollectorServiceServer = (*jReceiver)(nil)
var _ configmanager.ClientConfigManager = (*jReceiver)(nil)
231
// agentHandler implements agent.Agent for one agent UDP transport. It
// forwards received batches to nextConsumer and records each operation
// through obsrecv.
type agentHandler struct {
	// id and transport are set by startAgent.
	// NOTE(review): neither is read by the handler methods visible here.
	id           config.ComponentID
	transport    string
	nextConsumer consumer.Traces
	obsrecv      *obsreport.Receiver
}
238
239// EmitZipkinBatch is unsupported agent's
240func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err error) {
241	panic("unsupported receiver")
242}
243
244// EmitBatch implements thrift-gen/agent/Agent and it forwards
245// Jaeger spans received by the Jaeger agent processor.
246func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
247	ctx = h.obsrecv.StartTracesOp(ctx)
248	numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
249	h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
250	return err
251}
252
253func (jr *jReceiver) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
254	return jr.agentSamplingManager.GetSamplingStrategy(ctx, serviceName)
255}
256
257func (jr *jReceiver) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
258	br, err := jr.agentSamplingManager.GetBaggageRestrictions(ctx, serviceName)
259	if err != nil {
260		// Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373
261		// As of today, GetBaggageRestrictions() always returns an error.
262		// However, we `return nil, nil` here in order to serve a valid `200 OK` response.
263		return nil, nil
264	}
265	return br, nil
266}
267
268func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
269	if c, ok := client.FromGRPC(ctx); ok {
270		ctx = client.NewContext(ctx, c)
271	}
272
273	ctx = jr.grpcObsrecv.StartTracesOp(ctx)
274
275	td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())
276
277	err := jr.nextConsumer.ConsumeTraces(ctx, td)
278	jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
279	if err != nil {
280		return nil, err
281	}
282
283	return &api_v2.PostSpansResponse{}, nil
284}
285
286func (jr *jReceiver) startAgent(host component.Host) error {
287	if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() {
288		return nil
289	}
290
291	if jr.agentBinaryThriftEnabled() {
292		h := &agentHandler{
293			id:           jr.id,
294			transport:    agentTransportBinary,
295			nextConsumer: jr.nextConsumer,
296			obsrecv:      obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: agentTransportBinary}),
297		}
298		processor, err := jr.buildProcessor(jr.agentBinaryThriftAddr(), jr.config.AgentBinaryThriftConfig, apacheThrift.NewTBinaryProtocolFactoryConf(nil), h)
299		if err != nil {
300			return err
301		}
302		jr.agentProcessors = append(jr.agentProcessors, processor)
303	}
304
305	if jr.agentCompactThriftEnabled() {
306		h := &agentHandler{
307			id:           jr.id,
308			transport:    agentTransportCompact,
309			nextConsumer: jr.nextConsumer,
310			obsrecv:      obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: agentTransportCompact}),
311		}
312		processor, err := jr.buildProcessor(jr.agentCompactThriftAddr(), jr.config.AgentCompactThriftConfig, apacheThrift.NewTCompactProtocolFactoryConf(nil), h)
313		if err != nil {
314			return err
315		}
316		jr.agentProcessors = append(jr.agentProcessors, processor)
317	}
318
319	jr.goroutines.Add(len(jr.agentProcessors))
320	for _, processor := range jr.agentProcessors {
321		go func(p processors.Processor) {
322			defer jr.goroutines.Done()
323			p.Serve()
324		}(processor)
325	}
326
327	// Start upstream grpc client before serving sampling endpoints over HTTP
328	if jr.config.RemoteSamplingClientSettings.Endpoint != "" {
329		grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions(host.GetExtensions())
330		if err != nil {
331			jr.logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err))
332			return err
333		}
334		conn, err := grpc.Dial(jr.config.RemoteSamplingClientSettings.Endpoint, grpcOpts...)
335		if err != nil {
336			jr.logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", jr.config.RemoteSamplingClientSettings.Endpoint))
337			return err
338		}
339
340		jr.agentSamplingManager = jSamplingConfig.NewConfigManager(conn)
341	}
342
343	if jr.agentHTTPEnabled() {
344		jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPAddr(), jr, metrics.NullFactory)
345
346		jr.goroutines.Add(1)
347		go func() {
348			defer jr.goroutines.Done()
349			if err := jr.agentServer.ListenAndServe(); err != http.ErrServerClosed {
350				host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err))
351			}
352		}()
353	}
354
355	return nil
356}
357
358func (jr *jReceiver) buildProcessor(address string, cfg ServerConfigUDP, factory apacheThrift.TProtocolFactory, a agent.Agent) (processors.Processor, error) {
359	handler := agent.NewAgentProcessor(a)
360	transport, err := thriftudp.NewTUDPServerTransport(address)
361	if err != nil {
362		return nil, err
363	}
364	if cfg.SocketBufferSize > 0 {
365		if err = transport.SetSocketBufferSize(cfg.SocketBufferSize); err != nil {
366			return nil, err
367		}
368	}
369	server, err := servers.NewTBufferedServer(transport, cfg.QueueSize, cfg.MaxPacketSize, metrics.NullFactory)
370	if err != nil {
371		return nil, err
372	}
373	processor, err := processors.NewThriftProcessor(server, cfg.Workers, metrics.NullFactory, factory, handler, jr.logger)
374	if err != nil {
375		return nil, err
376	}
377	return processor, nil
378}
379
380func (jr *jReceiver) decodeThriftHTTPBody(r *http.Request) (*jaeger.Batch, *httpError) {
381	bodyBytes, err := ioutil.ReadAll(r.Body)
382	r.Body.Close()
383	if err != nil {
384		return nil, &httpError{
385			handler.UnableToReadBodyErrFormat,
386			http.StatusInternalServerError,
387		}
388	}
389
390	contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
391	if err != nil {
392		return nil, &httpError{
393			fmt.Sprintf("Cannot parse content type: %v", err),
394			http.StatusBadRequest,
395		}
396	}
397	if _, ok := acceptedThriftFormats[contentType]; !ok {
398		return nil, &httpError{
399			fmt.Sprintf("Unsupported content type: %v", contentType),
400			http.StatusBadRequest,
401		}
402	}
403
404	tdes := apacheThrift.NewTDeserializer()
405	batch := &jaeger.Batch{}
406	if err = tdes.Read(r.Context(), batch, bodyBytes); err != nil {
407		return nil, &httpError{
408			fmt.Sprintf(handler.UnableToReadBodyErrFormat, err),
409			http.StatusBadRequest,
410		}
411	}
412	return batch, nil
413}
414
415// HandleThriftHTTPBatch implements Jaeger HTTP Thrift handler.
416func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Request) {
417	ctx := r.Context()
418	if c, ok := client.FromHTTP(r); ok {
419		ctx = client.NewContext(ctx, c)
420	}
421
422	ctx = jr.httpObsrecv.StartTracesOp(ctx)
423
424	batch, hErr := jr.decodeThriftHTTPBody(r)
425	if hErr != nil {
426		http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
427		jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr)
428		return
429	}
430
431	numSpans, err := consumeTraces(ctx, batch, jr.nextConsumer)
432	if err != nil {
433		http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
434	} else {
435		w.WriteHeader(http.StatusAccepted)
436	}
437	jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
438}
439
440func (jr *jReceiver) startCollector(host component.Host) error {
441	if !jr.collectorGRPCEnabled() && !jr.collectorHTTPEnabled() {
442		return nil
443	}
444
445	if jr.collectorHTTPEnabled() {
446		cln, cerr := jr.config.CollectorHTTPSettings.ToListener()
447		if cerr != nil {
448			return fmt.Errorf("failed to bind to Collector address %q: %v",
449				jr.config.CollectorHTTPSettings.Endpoint, cerr)
450		}
451
452		nr := mux.NewRouter()
453		nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost)
454		jr.collectorServer = jr.config.CollectorHTTPSettings.ToServer(nr)
455		jr.goroutines.Add(1)
456		go func() {
457			defer jr.goroutines.Done()
458			if err := jr.collectorServer.Serve(cln); err != http.ErrServerClosed {
459				host.ReportFatalError(err)
460			}
461		}()
462	}
463
464	if jr.collectorGRPCEnabled() {
465		opts, err := jr.config.CollectorGRPCServerSettings.ToServerOption(host.GetExtensions())
466		if err != nil {
467			return fmt.Errorf("failed to build the options for the Jaeger gRPC Collector: %v", err)
468		}
469
470		jr.grpc = grpc.NewServer(opts...)
471		gaddr := jr.collectorGRPCAddr()
472		gln, gerr := net.Listen("tcp", gaddr)
473		if gerr != nil {
474			return fmt.Errorf("failed to bind to gRPC address %q: %v", gaddr, gerr)
475		}
476
477		api_v2.RegisterCollectorServiceServer(jr.grpc, jr)
478
479		// init and register sampling strategy store
480		ss, gerr := staticStrategyStore.NewStrategyStore(staticStrategyStore.Options{
481			StrategiesFile: jr.config.RemoteSamplingStrategyFile,
482		}, jr.logger)
483		if gerr != nil {
484			return fmt.Errorf("failed to create collector strategy store: %v", gerr)
485		}
486		api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))
487
488		jr.goroutines.Add(1)
489		go func() {
490			defer jr.goroutines.Done()
491			if err := jr.grpc.Serve(gln); err != nil && err != grpc.ErrServerStopped {
492				host.ReportFatalError(err)
493			}
494		}()
495	}
496
497	return nil
498}
499