1// Unless explicitly stated otherwise all files in this repository are licensed
2// under the Apache License Version 2.0.
3// This product includes software developed at Datadog (https://www.datadoghq.com/).
4// Copyright 2016 Datadog, Inc.
5
6// Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
7package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
8
9import (
10	"math"
11	"time"
12
13	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
14	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
15	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
16	"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
17
18	"github.com/confluentinc/confluent-kafka-go/kafka"
19)
20
21// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
22func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
23	c, err := kafka.NewConsumer(conf)
24	if err != nil {
25		return nil, err
26	}
27	return WrapConsumer(c, opts...), nil
28}
29
30// NewProducer calls kafka.NewProducer and wraps the resulting Producer.
31func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
32	p, err := kafka.NewProducer(conf)
33	if err != nil {
34		return nil, err
35	}
36	return WrapProducer(p, opts...), nil
37}
38
// A Consumer wraps a kafka.Consumer.
type Consumer struct {
	*kafka.Consumer
	// cfg holds the tracing configuration built from the Options passed to
	// WrapConsumer/NewConsumer.
	cfg *config
	// events is the traced replacement for the underlying Events() channel;
	// it is nil when consuming via the events channel is not enabled.
	events chan kafka.Event
	// prev is the span for the most recently delivered message. It is
	// finished when the next message arrives, or on Close. When events is
	// non-nil it is owned by the consuming goroutine in traceEventsChannel;
	// otherwise it is owned by Poll/ReadMessage/Close.
	prev ddtrace.Span
}
46
47// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
48func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
49	wrapped := &Consumer{
50		Consumer: c,
51		cfg:      newConfig(opts...),
52	}
53	log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Consumer: %#v", wrapped.cfg)
54	wrapped.events = wrapped.traceEventsChannel(c.Events())
55	return wrapped
56}
57
// traceEventsChannel wraps the consumer's events channel, starting a span for
// each *kafka.Message that flows through it. A message's span is kept open
// until the next message arrives (so it covers the consumer's processing
// time) and the last span is finished when the input channel closes.
func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
	// in will be nil when consuming via the events channel is not enabled
	if in == nil {
		return nil
	}

	out := make(chan kafka.Event, 1)
	go func() {
		defer close(out)
		for evt := range in {
			var next ddtrace.Span

			// only trace messages
			if msg, ok := evt.(*kafka.Message); ok {
				next = c.startSpan(msg)
			}

			// forward the event before finishing the previous span, so the
			// previous span stays open while the consumer handles its message
			out <- evt

			if c.prev != nil {
				c.prev.Finish()
			}
			// c.prev is owned by this goroutine while the events channel is
			// enabled; Close deliberately leaves it alone in that case.
			c.prev = next
		}
		// finish any remaining span
		if c.prev != nil {
			c.prev.Finish()
			c.prev = nil
		}
	}()

	return out
}
91
92func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
93	opts := []tracer.StartSpanOption{
94		tracer.ServiceName(c.cfg.consumerServiceName),
95		tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic),
96		tracer.SpanType(ext.SpanTypeMessageConsumer),
97		tracer.Tag("partition", msg.TopicPartition.Partition),
98		tracer.Tag("offset", msg.TopicPartition.Offset),
99		tracer.Measured(),
100	}
101	if !math.IsNaN(c.cfg.analyticsRate) {
102		opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate))
103	}
104	// kafka supports headers, so try to extract a span context
105	carrier := NewMessageCarrier(msg)
106	if spanctx, err := tracer.Extract(carrier); err == nil {
107		opts = append(opts, tracer.ChildOf(spanctx))
108	}
109	span, _ := tracer.StartSpanFromContext(c.cfg.ctx, "kafka.consume", opts...)
110	// reinject the span context so consumers can pick it up
111	tracer.Inject(span.Context(), carrier)
112	return span
113}
114
115// Close calls the underlying Consumer.Close and if polling is enabled, finishes
116// any remaining span.
117func (c *Consumer) Close() error {
118	err := c.Consumer.Close()
119	// we only close the previous span if consuming via the events channel is
120	// not enabled, because otherwise there would be a data race from the
121	// consuming goroutine.
122	if c.events == nil && c.prev != nil {
123		c.prev.Finish()
124		c.prev = nil
125	}
126	return err
127}
128
129// Events returns the kafka Events channel (if enabled). Message events will be
130// traced.
131func (c *Consumer) Events() chan kafka.Event {
132	return c.events
133}
134
135// Poll polls the consumer for messages or events. Message will be
136// traced.
137func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
138	if c.prev != nil {
139		c.prev.Finish()
140		c.prev = nil
141	}
142	evt := c.Consumer.Poll(timeoutMS)
143	if msg, ok := evt.(*kafka.Message); ok {
144		c.prev = c.startSpan(msg)
145	}
146	return evt
147}
148
149// ReadMessage polls the consumer for a message. Message will be traced.
150func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
151	if c.prev != nil {
152		c.prev.Finish()
153		c.prev = nil
154	}
155	msg, err := c.Consumer.ReadMessage(timeout)
156	if err != nil {
157		return nil, err
158	}
159	c.prev = c.startSpan(msg)
160	return msg, nil
161}
162
// A Producer wraps a kafka.Producer.
type Producer struct {
	*kafka.Producer
	// cfg holds the tracing configuration built from the Options passed to
	// WrapProducer/NewProducer.
	cfg *config
	// produceChannel is the traced replacement for the underlying
	// ProduceChannel(); messages sent to it are traced and forwarded.
	produceChannel chan *kafka.Message
}
169
170// WrapProducer wraps a kafka.Producer so requests are traced.
171func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
172	wrapped := &Producer{
173		Producer: p,
174		cfg:      newConfig(opts...),
175	}
176	log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg)
177	wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
178	return wrapped
179}
180
181func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
182	if out == nil {
183		return out
184	}
185
186	in := make(chan *kafka.Message, 1)
187	go func() {
188		for msg := range in {
189			span := p.startSpan(msg)
190			out <- msg
191			span.Finish()
192		}
193	}()
194
195	return in
196}
197
198func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
199	opts := []tracer.StartSpanOption{
200		tracer.ServiceName(p.cfg.producerServiceName),
201		tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic),
202		tracer.SpanType(ext.SpanTypeMessageProducer),
203		tracer.Tag("partition", msg.TopicPartition.Partition),
204	}
205	if !math.IsNaN(p.cfg.analyticsRate) {
206		opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
207	}
208	carrier := NewMessageCarrier(msg)
209	span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...)
210	// inject the span context so consumers can pick it up
211	tracer.Inject(span.Context(), carrier)
212	return span
213}
214
215// Close calls the underlying Producer.Close and also closes the internal
216// wrapping producer channel.
217func (p *Producer) Close() {
218	close(p.produceChannel)
219	p.Producer.Close()
220}
221
222// Produce calls the underlying Producer.Produce and traces the request.
223func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
224	span := p.startSpan(msg)
225
226	// if the user has selected a delivery channel, we will wrap it and
227	// wait for the delivery event to finish the span
228	if deliveryChan != nil {
229		oldDeliveryChan := deliveryChan
230		deliveryChan = make(chan kafka.Event)
231		go func() {
232			var err error
233			evt := <-deliveryChan
234			if msg, ok := evt.(*kafka.Message); ok {
235				// delivery errors are returned via TopicPartition.Error
236				err = msg.TopicPartition.Error
237			}
238			span.Finish(tracer.WithError(err))
239			oldDeliveryChan <- evt
240		}()
241	}
242
243	err := p.Producer.Produce(msg, deliveryChan)
244	// with no delivery channel, finish immediately
245	if deliveryChan == nil {
246		span.Finish(tracer.WithError(err))
247	}
248
249	return err
250}
251
252// ProduceChannel returns a channel which can receive kafka Messages and will
253// send them to the underlying producer channel.
254func (p *Producer) ProduceChannel() chan *kafka.Message {
255	return p.produceChannel
256}
257