1package zipkintracer
2
3import (
4	"encoding/binary"
5	"fmt"
6	"net"
7	"strconv"
8	"time"
9
10	otext "github.com/opentracing/opentracing-go/ext"
11	"github.com/opentracing/opentracing-go/log"
12
13	"github.com/openzipkin/zipkin-go-opentracing/flag"
14	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
15)
16
17var (
18	// SpanKindResource will be regarded as a SA annotation by Zipkin.
19	SpanKindResource = otext.SpanKindEnum("resource")
20)
21
22// Recorder implements the SpanRecorder interface.
23type Recorder struct {
24	collector    Collector
25	debug        bool
26	endpoint     *zipkincore.Endpoint
27	materializer func(logFields []log.Field) ([]byte, error)
28}
29
30// RecorderOption allows for functional options.
31type RecorderOption func(r *Recorder)
32
33// WithLogFmtMaterializer will convert OpenTracing Log fields to a LogFmt representation.
34func WithLogFmtMaterializer() RecorderOption {
35	return func(r *Recorder) {
36		r.materializer = MaterializeWithLogFmt
37	}
38}
39
40// WithJSONMaterializer will convert OpenTracing Log fields to a JSON representation.
41func WithJSONMaterializer() RecorderOption {
42	return func(r *Recorder) {
43		r.materializer = MaterializeWithJSON
44	}
45}
46
47// WithStrictMaterializer will only record event Log fields and discard the rest.
48func WithStrictMaterializer() RecorderOption {
49	return func(r *Recorder) {
50		r.materializer = StrictZipkinMaterializer
51	}
52}
53
54// NewRecorder creates a new Zipkin Recorder backed by the provided Collector.
55//
56// hostPort and serviceName allow you to set the default Zipkin endpoint
57// information which will be added to the application's standard core
58// annotations. hostPort will be resolved into an IPv4 and/or IPv6 address and
59// Port number, serviceName will be used as the application's service
60// identifier.
61//
62// If application does not listen for incoming requests or an endpoint Context
63// does not involve network address and/or port these cases can be solved like
64// this:
65//  # port is not applicable:
66//  NewRecorder(c, debug, "192.168.1.12:0", "ServiceA")
67//
68//  # network address and port are not applicable:
69//  NewRecorder(c, debug, "0.0.0.0:0", "ServiceB")
70func NewRecorder(c Collector, debug bool, hostPort, serviceName string, options ...RecorderOption) SpanRecorder {
71	r := &Recorder{
72		collector:    c,
73		debug:        debug,
74		endpoint:     makeEndpoint(hostPort, serviceName),
75		materializer: MaterializeWithLogFmt,
76	}
77	for _, opts := range options {
78		opts(r)
79	}
80	return r
81}
82
83// RecordSpan converts a RawSpan into the Zipkin representation of a span
84// and records it to the underlying collector.
85func (r *Recorder) RecordSpan(sp RawSpan) {
86	if !sp.Context.Sampled {
87		return
88	}
89
90	var parentSpanID *int64
91	if sp.Context.ParentSpanID != nil {
92		id := int64(*sp.Context.ParentSpanID)
93		parentSpanID = &id
94	}
95
96	var traceIDHigh *int64
97	if sp.Context.TraceID.High > 0 {
98		tidh := int64(sp.Context.TraceID.High)
99		traceIDHigh = &tidh
100	}
101
102	span := &zipkincore.Span{
103		Name:        sp.Operation,
104		ID:          int64(sp.Context.SpanID),
105		TraceID:     int64(sp.Context.TraceID.Low),
106		TraceIDHigh: traceIDHigh,
107		ParentID:    parentSpanID,
108		Debug:       r.debug || (sp.Context.Flags&flag.Debug == flag.Debug),
109	}
110	// only send timestamp and duration if this process owns the current span.
111	if sp.Context.Owner {
112		timestamp := sp.Start.UnixNano() / 1e3
113		duration := sp.Duration.Nanoseconds() / 1e3
114		// since we always time our spans we will round up to 1 microsecond if the
115		// span took less.
116		if duration == 0 {
117			duration = 1
118		}
119		span.Timestamp = &timestamp
120		span.Duration = &duration
121	}
122	if kind, ok := sp.Tags[string(otext.SpanKind)]; ok {
123		switch kind {
124		case otext.SpanKindRPCClient, otext.SpanKindRPCClientEnum:
125			annotate(span, sp.Start, zipkincore.CLIENT_SEND, r.endpoint)
126			annotate(span, sp.Start.Add(sp.Duration), zipkincore.CLIENT_RECV, r.endpoint)
127		case otext.SpanKindRPCServer, otext.SpanKindRPCServerEnum:
128			annotate(span, sp.Start, zipkincore.SERVER_RECV, r.endpoint)
129			annotate(span, sp.Start.Add(sp.Duration), zipkincore.SERVER_SEND, r.endpoint)
130		case SpanKindResource:
131			serviceName, ok := sp.Tags[string(otext.PeerService)]
132			if !ok {
133				serviceName = r.endpoint.GetServiceName()
134			}
135			host, ok := sp.Tags[string(otext.PeerHostname)].(string)
136			if !ok {
137				if r.endpoint.GetIpv4() > 0 {
138					ip := make([]byte, 4)
139					binary.BigEndian.PutUint32(ip, uint32(r.endpoint.GetIpv4()))
140					host = net.IP(ip).To4().String()
141				} else {
142					ip := r.endpoint.GetIpv6()
143					host = net.IP(ip).String()
144				}
145			}
146			var sPort string
147			port, ok := sp.Tags[string(otext.PeerPort)]
148			if !ok {
149				sPort = strconv.FormatInt(int64(r.endpoint.GetPort()), 10)
150			} else {
151				sPort = strconv.FormatInt(int64(port.(uint16)), 10)
152			}
153			re := makeEndpoint(net.JoinHostPort(host, sPort), serviceName.(string))
154			if re != nil {
155				annotateBinary(span, zipkincore.SERVER_ADDR, serviceName, re)
156			} else {
157				fmt.Printf("endpoint creation failed: host: %q port: %q", host, sPort)
158			}
159			annotate(span, sp.Start, zipkincore.CLIENT_SEND, r.endpoint)
160			annotate(span, sp.Start.Add(sp.Duration), zipkincore.CLIENT_RECV, r.endpoint)
161		default:
162			annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint)
163		}
164		delete(sp.Tags, string(otext.SpanKind))
165	} else {
166		annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint)
167	}
168
169	for key, value := range sp.Tags {
170		annotateBinary(span, key, value, r.endpoint)
171	}
172
173	for _, spLog := range sp.Logs {
174		if len(spLog.Fields) == 1 && spLog.Fields[0].Key() == "event" {
175			// proper Zipkin annotation
176			annotate(span, spLog.Timestamp, fmt.Sprintf("%+v", spLog.Fields[0].Value()), r.endpoint)
177			continue
178		}
179		// OpenTracing Log with key-value pair(s). Try to materialize using the
180		// materializer chosen for the recorder.
181		if logs, err := r.materializer(spLog.Fields); err != nil {
182			fmt.Printf("Materialization of OpenTracing LogFields failed: %+v", err)
183		} else {
184			annotate(span, spLog.Timestamp, string(logs), r.endpoint)
185		}
186	}
187	_ = r.collector.Collect(span)
188}
189
190// annotate annotates the span with the given value.
191func annotate(span *zipkincore.Span, timestamp time.Time, value string, host *zipkincore.Endpoint) {
192	if timestamp.IsZero() {
193		timestamp = time.Now()
194	}
195	span.Annotations = append(span.Annotations, &zipkincore.Annotation{
196		Timestamp: timestamp.UnixNano() / 1e3,
197		Value:     value,
198		Host:      host,
199	})
200}
201
202// annotateBinary annotates the span with a key and a value that will be []byte
203// encoded.
204func annotateBinary(span *zipkincore.Span, key string, value interface{}, host *zipkincore.Endpoint) {
205	if b, ok := value.(bool); ok {
206		if b {
207			value = "true"
208		} else {
209			value = "false"
210		}
211	}
212	span.BinaryAnnotations = append(span.BinaryAnnotations, &zipkincore.BinaryAnnotation{
213		Key:            key,
214		Value:          []byte(fmt.Sprintf("%+v", value)),
215		AnnotationType: zipkincore.AnnotationType_STRING,
216		Host:           host,
217	})
218}
219