1// Unless explicitly stated otherwise all files in this repository are licensed 2// under the Apache License Version 2.0. 3// This product includes software developed at Datadog (https://www.datadoghq.com/). 4// Copyright 2016 Datadog, Inc. 5 6// Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go). 7package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" 8 9import ( 10 "math" 11 "time" 12 13 "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" 14 "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" 15 "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" 16 "gopkg.in/DataDog/dd-trace-go.v1/internal/log" 17 18 "github.com/confluentinc/confluent-kafka-go/kafka" 19) 20 21// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer. 22func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) { 23 c, err := kafka.NewConsumer(conf) 24 if err != nil { 25 return nil, err 26 } 27 return WrapConsumer(c, opts...), nil 28} 29 30// NewProducer calls kafka.NewProducer and wraps the resulting Producer. 31func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) { 32 p, err := kafka.NewProducer(conf) 33 if err != nil { 34 return nil, err 35 } 36 return WrapProducer(p, opts...), nil 37} 38 39// A Consumer wraps a kafka.Consumer. 40type Consumer struct { 41 *kafka.Consumer 42 cfg *config 43 events chan kafka.Event 44 prev ddtrace.Span 45} 46 47// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced. 48func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer { 49 wrapped := &Consumer{ 50 Consumer: c, 51 cfg: newConfig(opts...), 52 } 53 log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Consumer: %#v", wrapped.cfg) 54 wrapped.events = wrapped.traceEventsChannel(c.Events()) 55 return wrapped 56} 57 58func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { 59 // in will be nil when consuming via the events channel is not enabled 60 if in == nil { 61 return nil 62 } 63 64 out := make(chan kafka.Event, 1) 65 go func() { 66 defer close(out) 67 for evt := range in { 68 var next ddtrace.Span 69 70 // only trace messages 71 if msg, ok := evt.(*kafka.Message); ok { 72 next = c.startSpan(msg) 73 } 74 75 out <- evt 76 77 if c.prev != nil { 78 c.prev.Finish() 79 } 80 c.prev = next 81 } 82 // finish any remaining span 83 if c.prev != nil { 84 c.prev.Finish() 85 c.prev = nil 86 } 87 }() 88 89 return out 90} 91 92func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span { 93 opts := []tracer.StartSpanOption{ 94 tracer.ServiceName(c.cfg.consumerServiceName), 95 tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic), 96 tracer.SpanType(ext.SpanTypeMessageConsumer), 97 tracer.Tag("partition", msg.TopicPartition.Partition), 98 tracer.Tag("offset", msg.TopicPartition.Offset), 99 tracer.Measured(), 100 } 101 if !math.IsNaN(c.cfg.analyticsRate) { 102 opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate)) 103 } 104 // kafka supports headers, so try to extract a span context 105 carrier := NewMessageCarrier(msg) 106 if spanctx, err := tracer.Extract(carrier); err == nil { 107 opts = append(opts, tracer.ChildOf(spanctx)) 108 } 109 span, _ := tracer.StartSpanFromContext(c.cfg.ctx, "kafka.consume", opts...) 110 // reinject the span context so consumers can pick it up 111 tracer.Inject(span.Context(), carrier) 112 return span 113} 114 115// Close calls the underlying Consumer.Close and if polling is enabled, finishes 116// any remaining span. 117func (c *Consumer) Close() error { 118 err := c.Consumer.Close() 119 // we only close the previous span if consuming via the events channel is 120 // not enabled, because otherwise there would be a data race from the 121 // consuming goroutine. 122 if c.events == nil && c.prev != nil { 123 c.prev.Finish() 124 c.prev = nil 125 } 126 return err 127} 128 129// Events returns the kafka Events channel (if enabled). Message events will be 130// traced. 131func (c *Consumer) Events() chan kafka.Event { 132 return c.events 133} 134 135// Poll polls the consumer for messages or events. Message will be 136// traced. 137func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) { 138 if c.prev != nil { 139 c.prev.Finish() 140 c.prev = nil 141 } 142 evt := c.Consumer.Poll(timeoutMS) 143 if msg, ok := evt.(*kafka.Message); ok { 144 c.prev = c.startSpan(msg) 145 } 146 return evt 147} 148 149// ReadMessage polls the consumer for a message. Message will be traced. 150func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { 151 if c.prev != nil { 152 c.prev.Finish() 153 c.prev = nil 154 } 155 msg, err := c.Consumer.ReadMessage(timeout) 156 if err != nil { 157 return nil, err 158 } 159 c.prev = c.startSpan(msg) 160 return msg, nil 161} 162 163// A Producer wraps a kafka.Producer. 164type Producer struct { 165 *kafka.Producer 166 cfg *config 167 produceChannel chan *kafka.Message 168} 169 170// WrapProducer wraps a kafka.Producer so requests are traced. 171func WrapProducer(p *kafka.Producer, opts ...Option) *Producer { 172 wrapped := &Producer{ 173 Producer: p, 174 cfg: newConfig(opts...), 175 } 176 log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg) 177 wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel()) 178 return wrapped 179} 180 181func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message { 182 if out == nil { 183 return out 184 } 185 186 in := make(chan *kafka.Message, 1) 187 go func() { 188 for msg := range in { 189 span := p.startSpan(msg) 190 out <- msg 191 span.Finish() 192 } 193 }() 194 195 return in 196} 197 198func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span { 199 opts := []tracer.StartSpanOption{ 200 tracer.ServiceName(p.cfg.producerServiceName), 201 tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic), 202 tracer.SpanType(ext.SpanTypeMessageProducer), 203 tracer.Tag("partition", msg.TopicPartition.Partition), 204 } 205 if !math.IsNaN(p.cfg.analyticsRate) { 206 opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate)) 207 } 208 carrier := NewMessageCarrier(msg) 209 span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...) 210 // inject the span context so consumers can pick it up 211 tracer.Inject(span.Context(), carrier) 212 return span 213} 214 215// Close calls the underlying Producer.Close and also closes the internal 216// wrapping producer channel. 217func (p *Producer) Close() { 218 close(p.produceChannel) 219 p.Producer.Close() 220} 221 222// Produce calls the underlying Producer.Produce and traces the request. 223func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error { 224 span := p.startSpan(msg) 225 226 // if the user has selected a delivery channel, we will wrap it and 227 // wait for the delivery event to finish the span 228 if deliveryChan != nil { 229 oldDeliveryChan := deliveryChan 230 deliveryChan = make(chan kafka.Event) 231 go func() { 232 var err error 233 evt := <-deliveryChan 234 if msg, ok := evt.(*kafka.Message); ok { 235 // delivery errors are returned via TopicPartition.Error 236 err = msg.TopicPartition.Error 237 } 238 span.Finish(tracer.WithError(err)) 239 oldDeliveryChan <- evt 240 }() 241 } 242 243 err := p.Producer.Produce(msg, deliveryChan) 244 // with no delivery channel, finish immediately 245 if deliveryChan == nil { 246 span.Finish(tracer.WithError(err)) 247 } 248 249 return err 250} 251 252// ProduceChannel returns a channel which can receive kafka Messages and will 253// send them to the underlying producer channel. 254func (p *Producer) ProduceChannel() chan *kafka.Message { 255 return p.produceChannel 256} 257