1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaegerreceiver 16 17import ( 18 "context" 19 "fmt" 20 "html" 21 "io/ioutil" 22 "mime" 23 "net" 24 "net/http" 25 "sync" 26 27 apacheThrift "github.com/apache/thrift/lib/go/thrift" 28 "github.com/gorilla/mux" 29 "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" 30 jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" 31 "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" 32 "github.com/jaegertracing/jaeger/cmd/agent/app/processors" 33 "github.com/jaegertracing/jaeger/cmd/agent/app/servers" 34 "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" 35 "github.com/jaegertracing/jaeger/cmd/collector/app/handler" 36 collectorSampling "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" 37 staticStrategyStore "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" 38 "github.com/jaegertracing/jaeger/proto-gen/api_v2" 39 "github.com/jaegertracing/jaeger/thrift-gen/agent" 40 "github.com/jaegertracing/jaeger/thrift-gen/baggage" 41 "github.com/jaegertracing/jaeger/thrift-gen/jaeger" 42 "github.com/jaegertracing/jaeger/thrift-gen/sampling" 43 "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" 44 "github.com/uber/jaeger-lib/metrics" 45 "go.uber.org/zap" 46 "google.golang.org/grpc" 47 48 "go.opentelemetry.io/collector/client" 49 "go.opentelemetry.io/collector/component" 50 "go.opentelemetry.io/collector/config" 51 "go.opentelemetry.io/collector/config/configgrpc" 52 "go.opentelemetry.io/collector/config/confighttp" 53 "go.opentelemetry.io/collector/consumer" 54 "go.opentelemetry.io/collector/consumer/consumererror" 55 "go.opentelemetry.io/collector/obsreport" 56 jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" 57) 58 59// configuration defines the behavior and the ports that 60// the Jaeger receiver will use. 61type configuration struct { 62 CollectorThriftPort int 63 CollectorHTTPPort int 64 CollectorHTTPSettings confighttp.HTTPServerSettings 65 CollectorGRPCPort int 66 CollectorGRPCServerSettings configgrpc.GRPCServerSettings 67 68 AgentCompactThriftPort int 69 AgentCompactThriftConfig ServerConfigUDP 70 AgentBinaryThriftPort int 71 AgentBinaryThriftConfig ServerConfigUDP 72 AgentHTTPPort int 73 RemoteSamplingClientSettings configgrpc.GRPCClientSettings 74 RemoteSamplingStrategyFile string 75} 76 77// Receiver type is used to receive spans that were originally intended to be sent to Jaeger. 78// This receiver is basically a Jaeger collector. 79type jReceiver struct { 80 nextConsumer consumer.Traces 81 id config.ComponentID 82 83 config *configuration 84 85 grpc *grpc.Server 86 collectorServer *http.Server 87 88 agentSamplingManager *jSamplingConfig.SamplingManager 89 agentProcessors []processors.Processor 90 agentServer *http.Server 91 92 goroutines sync.WaitGroup 93 94 logger *zap.Logger 95 96 grpcObsrecv *obsreport.Receiver 97 httpObsrecv *obsreport.Receiver 98} 99 100const ( 101 agentTransportBinary = "udp_thrift_binary" 102 agentTransportCompact = "udp_thrift_compact" 103 collectorHTTPTransport = "collector_http" 104 grpcTransport = "grpc" 105 106 thriftFormat = "thrift" 107 protobufFormat = "protobuf" 108) 109 110var ( 111 acceptedThriftFormats = map[string]struct{}{ 112 "application/x-thrift": {}, 113 "application/vnd.apache.thrift.binary": {}, 114 } 115) 116 117// newJaegerReceiver creates a TracesReceiver that receives traffic as a Jaeger collector, and 118// also as a Jaeger agent. 119func newJaegerReceiver( 120 id config.ComponentID, 121 config *configuration, 122 nextConsumer consumer.Traces, 123 set component.ReceiverCreateSettings, 124) *jReceiver { 125 return &jReceiver{ 126 config: config, 127 nextConsumer: nextConsumer, 128 id: id, 129 logger: set.Logger, 130 grpcObsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: grpcTransport}), 131 httpObsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: collectorHTTPTransport}), 132 } 133} 134 135func (jr *jReceiver) agentCompactThriftAddr() string { 136 var port int 137 if jr.config != nil { 138 port = jr.config.AgentCompactThriftPort 139 } 140 return fmt.Sprintf(":%d", port) 141} 142 143func (jr *jReceiver) agentCompactThriftEnabled() bool { 144 return jr.config != nil && jr.config.AgentCompactThriftPort > 0 145} 146 147func (jr *jReceiver) agentBinaryThriftAddr() string { 148 var port int 149 if jr.config != nil { 150 port = jr.config.AgentBinaryThriftPort 151 } 152 return fmt.Sprintf(":%d", port) 153} 154 155func (jr *jReceiver) agentBinaryThriftEnabled() bool { 156 return jr.config != nil && jr.config.AgentBinaryThriftPort > 0 157} 158 159func (jr *jReceiver) agentHTTPAddr() string { 160 var port int 161 if jr.config != nil { 162 port = jr.config.AgentHTTPPort 163 } 164 return fmt.Sprintf(":%d", port) 165} 166 167func (jr *jReceiver) agentHTTPEnabled() bool { 168 return jr.config != nil && jr.config.AgentHTTPPort > 0 169} 170 171func (jr *jReceiver) collectorGRPCAddr() string { 172 var port int 173 if jr.config != nil { 174 port = jr.config.CollectorGRPCPort 175 } 176 return fmt.Sprintf(":%d", port) 177} 178 179func (jr *jReceiver) collectorGRPCEnabled() bool { 180 return jr.config != nil && jr.config.CollectorGRPCPort > 0 181} 182 183func (jr *jReceiver) collectorHTTPEnabled() bool { 184 return jr.config != nil && jr.config.CollectorHTTPPort > 0 185} 186 187func (jr *jReceiver) Start(_ context.Context, host component.Host) error { 188 if err := jr.startAgent(host); err != nil { 189 return err 190 } 191 192 return jr.startCollector(host) 193} 194 195func (jr *jReceiver) Shutdown(ctx context.Context) error { 196 var errs []error 197 198 if jr.agentServer != nil { 199 if aerr := jr.agentServer.Shutdown(ctx); aerr != nil { 200 errs = append(errs, aerr) 201 } 202 } 203 for _, processor := range jr.agentProcessors { 204 processor.Stop() 205 } 206 207 if jr.collectorServer != nil { 208 if cerr := jr.collectorServer.Shutdown(ctx); cerr != nil { 209 errs = append(errs, cerr) 210 } 211 } 212 if jr.grpc != nil { 213 jr.grpc.GracefulStop() 214 } 215 216 jr.goroutines.Wait() 217 return consumererror.Combine(errs) 218} 219 220func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.Traces) (int, error) { 221 if batch == nil { 222 return 0, nil 223 } 224 td := jaegertranslator.ThriftBatchToInternalTraces(batch) 225 return len(batch.Spans), consumer.ConsumeTraces(ctx, td) 226} 227 228var _ agent.Agent = (*agentHandler)(nil) 229var _ api_v2.CollectorServiceServer = (*jReceiver)(nil) 230var _ configmanager.ClientConfigManager = (*jReceiver)(nil) 231 232type agentHandler struct { 233 id config.ComponentID 234 transport string 235 nextConsumer consumer.Traces 236 obsrecv *obsreport.Receiver 237} 238 239// EmitZipkinBatch is unsupported agent's 240func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err error) { 241 panic("unsupported receiver") 242} 243 244// EmitBatch implements thrift-gen/agent/Agent and it forwards 245// Jaeger spans received by the Jaeger agent processor. 246func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error { 247 ctx = h.obsrecv.StartTracesOp(ctx) 248 numSpans, err := consumeTraces(ctx, batch, h.nextConsumer) 249 h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err) 250 return err 251} 252 253func (jr *jReceiver) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) { 254 return jr.agentSamplingManager.GetSamplingStrategy(ctx, serviceName) 255} 256 257func (jr *jReceiver) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) { 258 br, err := jr.agentSamplingManager.GetBaggageRestrictions(ctx, serviceName) 259 if err != nil { 260 // Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373 261 // As of today, GetBaggageRestrictions() always returns an error. 262 // However, we `return nil, nil` here in order to serve a valid `200 OK` response. 263 return nil, nil 264 } 265 return br, nil 266} 267 268func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { 269 if c, ok := client.FromGRPC(ctx); ok { 270 ctx = client.NewContext(ctx, c) 271 } 272 273 ctx = jr.grpcObsrecv.StartTracesOp(ctx) 274 275 td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch()) 276 277 err := jr.nextConsumer.ConsumeTraces(ctx, td) 278 jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(r.GetBatch().Spans), err) 279 if err != nil { 280 return nil, err 281 } 282 283 return &api_v2.PostSpansResponse{}, nil 284} 285 286func (jr *jReceiver) startAgent(host component.Host) error { 287 if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() { 288 return nil 289 } 290 291 if jr.agentBinaryThriftEnabled() { 292 h := &agentHandler{ 293 id: jr.id, 294 transport: agentTransportBinary, 295 nextConsumer: jr.nextConsumer, 296 obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: agentTransportBinary}), 297 } 298 processor, err := jr.buildProcessor(jr.agentBinaryThriftAddr(), jr.config.AgentBinaryThriftConfig, apacheThrift.NewTBinaryProtocolFactoryConf(nil), h) 299 if err != nil { 300 return err 301 } 302 jr.agentProcessors = append(jr.agentProcessors, processor) 303 } 304 305 if jr.agentCompactThriftEnabled() { 306 h := &agentHandler{ 307 id: jr.id, 308 transport: agentTransportCompact, 309 nextConsumer: jr.nextConsumer, 310 obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: agentTransportCompact}), 311 } 312 processor, err := jr.buildProcessor(jr.agentCompactThriftAddr(), jr.config.AgentCompactThriftConfig, apacheThrift.NewTCompactProtocolFactoryConf(nil), h) 313 if err != nil { 314 return err 315 } 316 jr.agentProcessors = append(jr.agentProcessors, processor) 317 } 318 319 jr.goroutines.Add(len(jr.agentProcessors)) 320 for _, processor := range jr.agentProcessors { 321 go func(p processors.Processor) { 322 defer jr.goroutines.Done() 323 p.Serve() 324 }(processor) 325 } 326 327 // Start upstream grpc client before serving sampling endpoints over HTTP 328 if jr.config.RemoteSamplingClientSettings.Endpoint != "" { 329 grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions(host.GetExtensions()) 330 if err != nil { 331 jr.logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err)) 332 return err 333 } 334 conn, err := grpc.Dial(jr.config.RemoteSamplingClientSettings.Endpoint, grpcOpts...) 335 if err != nil { 336 jr.logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", jr.config.RemoteSamplingClientSettings.Endpoint)) 337 return err 338 } 339 340 jr.agentSamplingManager = jSamplingConfig.NewConfigManager(conn) 341 } 342 343 if jr.agentHTTPEnabled() { 344 jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPAddr(), jr, metrics.NullFactory) 345 346 jr.goroutines.Add(1) 347 go func() { 348 defer jr.goroutines.Done() 349 if err := jr.agentServer.ListenAndServe(); err != http.ErrServerClosed { 350 host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err)) 351 } 352 }() 353 } 354 355 return nil 356} 357 358func (jr *jReceiver) buildProcessor(address string, cfg ServerConfigUDP, factory apacheThrift.TProtocolFactory, a agent.Agent) (processors.Processor, error) { 359 handler := agent.NewAgentProcessor(a) 360 transport, err := thriftudp.NewTUDPServerTransport(address) 361 if err != nil { 362 return nil, err 363 } 364 if cfg.SocketBufferSize > 0 { 365 if err = transport.SetSocketBufferSize(cfg.SocketBufferSize); err != nil { 366 return nil, err 367 } 368 } 369 server, err := servers.NewTBufferedServer(transport, cfg.QueueSize, cfg.MaxPacketSize, metrics.NullFactory) 370 if err != nil { 371 return nil, err 372 } 373 processor, err := processors.NewThriftProcessor(server, cfg.Workers, metrics.NullFactory, factory, handler, jr.logger) 374 if err != nil { 375 return nil, err 376 } 377 return processor, nil 378} 379 380func (jr *jReceiver) decodeThriftHTTPBody(r *http.Request) (*jaeger.Batch, *httpError) { 381 bodyBytes, err := ioutil.ReadAll(r.Body) 382 r.Body.Close() 383 if err != nil { 384 return nil, &httpError{ 385 handler.UnableToReadBodyErrFormat, 386 http.StatusInternalServerError, 387 } 388 } 389 390 contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) 391 if err != nil { 392 return nil, &httpError{ 393 fmt.Sprintf("Cannot parse content type: %v", err), 394 http.StatusBadRequest, 395 } 396 } 397 if _, ok := acceptedThriftFormats[contentType]; !ok { 398 return nil, &httpError{ 399 fmt.Sprintf("Unsupported content type: %v", contentType), 400 http.StatusBadRequest, 401 } 402 } 403 404 tdes := apacheThrift.NewTDeserializer() 405 batch := &jaeger.Batch{} 406 if err = tdes.Read(r.Context(), batch, bodyBytes); err != nil { 407 return nil, &httpError{ 408 fmt.Sprintf(handler.UnableToReadBodyErrFormat, err), 409 http.StatusBadRequest, 410 } 411 } 412 return batch, nil 413} 414 415// HandleThriftHTTPBatch implements Jaeger HTTP Thrift handler. 416func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Request) { 417 ctx := r.Context() 418 if c, ok := client.FromHTTP(r); ok { 419 ctx = client.NewContext(ctx, c) 420 } 421 422 ctx = jr.httpObsrecv.StartTracesOp(ctx) 423 424 batch, hErr := jr.decodeThriftHTTPBody(r) 425 if hErr != nil { 426 http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode) 427 jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr) 428 return 429 } 430 431 numSpans, err := consumeTraces(ctx, batch, jr.nextConsumer) 432 if err != nil { 433 http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) 434 } else { 435 w.WriteHeader(http.StatusAccepted) 436 } 437 jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err) 438} 439 440func (jr *jReceiver) startCollector(host component.Host) error { 441 if !jr.collectorGRPCEnabled() && !jr.collectorHTTPEnabled() { 442 return nil 443 } 444 445 if jr.collectorHTTPEnabled() { 446 cln, cerr := jr.config.CollectorHTTPSettings.ToListener() 447 if cerr != nil { 448 return fmt.Errorf("failed to bind to Collector address %q: %v", 449 jr.config.CollectorHTTPSettings.Endpoint, cerr) 450 } 451 452 nr := mux.NewRouter() 453 nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost) 454 jr.collectorServer = jr.config.CollectorHTTPSettings.ToServer(nr) 455 jr.goroutines.Add(1) 456 go func() { 457 defer jr.goroutines.Done() 458 if err := jr.collectorServer.Serve(cln); err != http.ErrServerClosed { 459 host.ReportFatalError(err) 460 } 461 }() 462 } 463 464 if jr.collectorGRPCEnabled() { 465 opts, err := jr.config.CollectorGRPCServerSettings.ToServerOption(host.GetExtensions()) 466 if err != nil { 467 return fmt.Errorf("failed to build the options for the Jaeger gRPC Collector: %v", err) 468 } 469 470 jr.grpc = grpc.NewServer(opts...) 471 gaddr := jr.collectorGRPCAddr() 472 gln, gerr := net.Listen("tcp", gaddr) 473 if gerr != nil { 474 return fmt.Errorf("failed to bind to gRPC address %q: %v", gaddr, gerr) 475 } 476 477 api_v2.RegisterCollectorServiceServer(jr.grpc, jr) 478 479 // init and register sampling strategy store 480 ss, gerr := staticStrategyStore.NewStrategyStore(staticStrategyStore.Options{ 481 StrategiesFile: jr.config.RemoteSamplingStrategyFile, 482 }, jr.logger) 483 if gerr != nil { 484 return fmt.Errorf("failed to create collector strategy store: %v", gerr) 485 } 486 api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss)) 487 488 jr.goroutines.Add(1) 489 go func() { 490 defer jr.goroutines.Done() 491 if err := jr.grpc.Serve(gln); err != nil && err != grpc.ErrServerStopped { 492 host.ReportFatalError(err) 493 } 494 }() 495 } 496 497 return nil 498} 499