1package zipkintracer 2 3import ( 4 "encoding/binary" 5 "fmt" 6 "net" 7 "strconv" 8 "time" 9 10 otext "github.com/opentracing/opentracing-go/ext" 11 "github.com/opentracing/opentracing-go/log" 12 13 "github.com/openzipkin/zipkin-go-opentracing/flag" 14 "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" 15) 16 17var ( 18 // SpanKindResource will be regarded as a SA annotation by Zipkin. 19 SpanKindResource = otext.SpanKindEnum("resource") 20) 21 22// Recorder implements the SpanRecorder interface. 23type Recorder struct { 24 collector Collector 25 debug bool 26 endpoint *zipkincore.Endpoint 27 materializer func(logFields []log.Field) ([]byte, error) 28} 29 30// RecorderOption allows for functional options. 31type RecorderOption func(r *Recorder) 32 33// WithLogFmtMaterializer will convert OpenTracing Log fields to a LogFmt representation. 34func WithLogFmtMaterializer() RecorderOption { 35 return func(r *Recorder) { 36 r.materializer = MaterializeWithLogFmt 37 } 38} 39 40// WithJSONMaterializer will convert OpenTracing Log fields to a JSON representation. 41func WithJSONMaterializer() RecorderOption { 42 return func(r *Recorder) { 43 r.materializer = MaterializeWithJSON 44 } 45} 46 47// WithStrictMaterializer will only record event Log fields and discard the rest. 48func WithStrictMaterializer() RecorderOption { 49 return func(r *Recorder) { 50 r.materializer = StrictZipkinMaterializer 51 } 52} 53 54// NewRecorder creates a new Zipkin Recorder backed by the provided Collector. 55// 56// hostPort and serviceName allow you to set the default Zipkin endpoint 57// information which will be added to the application's standard core 58// annotations. hostPort will be resolved into an IPv4 and/or IPv6 address and 59// Port number, serviceName will be used as the application's service 60// identifier. 61// 62// If application does not listen for incoming requests or an endpoint Context 63// does not involve network address and/or port these cases can be solved like 64// this: 65// # port is not applicable: 66// NewRecorder(c, debug, "192.168.1.12:0", "ServiceA") 67// 68// # network address and port are not applicable: 69// NewRecorder(c, debug, "0.0.0.0:0", "ServiceB") 70func NewRecorder(c Collector, debug bool, hostPort, serviceName string, options ...RecorderOption) SpanRecorder { 71 r := &Recorder{ 72 collector: c, 73 debug: debug, 74 endpoint: makeEndpoint(hostPort, serviceName), 75 materializer: MaterializeWithLogFmt, 76 } 77 for _, opts := range options { 78 opts(r) 79 } 80 return r 81} 82 83// RecordSpan converts a RawSpan into the Zipkin representation of a span 84// and records it to the underlying collector. 85func (r *Recorder) RecordSpan(sp RawSpan) { 86 if !sp.Context.Sampled { 87 return 88 } 89 90 var parentSpanID *int64 91 if sp.Context.ParentSpanID != nil { 92 id := int64(*sp.Context.ParentSpanID) 93 parentSpanID = &id 94 } 95 96 var traceIDHigh *int64 97 if sp.Context.TraceID.High > 0 { 98 tidh := int64(sp.Context.TraceID.High) 99 traceIDHigh = &tidh 100 } 101 102 span := &zipkincore.Span{ 103 Name: sp.Operation, 104 ID: int64(sp.Context.SpanID), 105 TraceID: int64(sp.Context.TraceID.Low), 106 TraceIDHigh: traceIDHigh, 107 ParentID: parentSpanID, 108 Debug: r.debug || (sp.Context.Flags&flag.Debug == flag.Debug), 109 } 110 // only send timestamp and duration if this process owns the current span. 111 if sp.Context.Owner { 112 timestamp := sp.Start.UnixNano() / 1e3 113 duration := sp.Duration.Nanoseconds() / 1e3 114 // since we always time our spans we will round up to 1 microsecond if the 115 // span took less. 116 if duration == 0 { 117 duration = 1 118 } 119 span.Timestamp = ×tamp 120 span.Duration = &duration 121 } 122 if kind, ok := sp.Tags[string(otext.SpanKind)]; ok { 123 switch kind { 124 case otext.SpanKindRPCClient, otext.SpanKindRPCClientEnum: 125 annotate(span, sp.Start, zipkincore.CLIENT_SEND, r.endpoint) 126 annotate(span, sp.Start.Add(sp.Duration), zipkincore.CLIENT_RECV, r.endpoint) 127 case otext.SpanKindRPCServer, otext.SpanKindRPCServerEnum: 128 annotate(span, sp.Start, zipkincore.SERVER_RECV, r.endpoint) 129 annotate(span, sp.Start.Add(sp.Duration), zipkincore.SERVER_SEND, r.endpoint) 130 case SpanKindResource: 131 serviceName, ok := sp.Tags[string(otext.PeerService)] 132 if !ok { 133 serviceName = r.endpoint.GetServiceName() 134 } 135 host, ok := sp.Tags[string(otext.PeerHostname)].(string) 136 if !ok { 137 if r.endpoint.GetIpv4() > 0 { 138 ip := make([]byte, 4) 139 binary.BigEndian.PutUint32(ip, uint32(r.endpoint.GetIpv4())) 140 host = net.IP(ip).To4().String() 141 } else { 142 ip := r.endpoint.GetIpv6() 143 host = net.IP(ip).String() 144 } 145 } 146 var sPort string 147 port, ok := sp.Tags[string(otext.PeerPort)] 148 if !ok { 149 sPort = strconv.FormatInt(int64(r.endpoint.GetPort()), 10) 150 } else { 151 sPort = strconv.FormatInt(int64(port.(uint16)), 10) 152 } 153 re := makeEndpoint(net.JoinHostPort(host, sPort), serviceName.(string)) 154 if re != nil { 155 annotateBinary(span, zipkincore.SERVER_ADDR, serviceName, re) 156 } else { 157 fmt.Printf("endpoint creation failed: host: %q port: %q", host, sPort) 158 } 159 annotate(span, sp.Start, zipkincore.CLIENT_SEND, r.endpoint) 160 annotate(span, sp.Start.Add(sp.Duration), zipkincore.CLIENT_RECV, r.endpoint) 161 default: 162 annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint) 163 } 164 delete(sp.Tags, string(otext.SpanKind)) 165 } else { 166 annotateBinary(span, zipkincore.LOCAL_COMPONENT, r.endpoint.GetServiceName(), r.endpoint) 167 } 168 169 for key, value := range sp.Tags { 170 annotateBinary(span, key, value, r.endpoint) 171 } 172 173 for _, spLog := range sp.Logs { 174 if len(spLog.Fields) == 1 && spLog.Fields[0].Key() == "event" { 175 // proper Zipkin annotation 176 annotate(span, spLog.Timestamp, fmt.Sprintf("%+v", spLog.Fields[0].Value()), r.endpoint) 177 continue 178 } 179 // OpenTracing Log with key-value pair(s). Try to materialize using the 180 // materializer chosen for the recorder. 181 if logs, err := r.materializer(spLog.Fields); err != nil { 182 fmt.Printf("Materialization of OpenTracing LogFields failed: %+v", err) 183 } else { 184 annotate(span, spLog.Timestamp, string(logs), r.endpoint) 185 } 186 } 187 _ = r.collector.Collect(span) 188} 189 190// annotate annotates the span with the given value. 191func annotate(span *zipkincore.Span, timestamp time.Time, value string, host *zipkincore.Endpoint) { 192 if timestamp.IsZero() { 193 timestamp = time.Now() 194 } 195 span.Annotations = append(span.Annotations, &zipkincore.Annotation{ 196 Timestamp: timestamp.UnixNano() / 1e3, 197 Value: value, 198 Host: host, 199 }) 200} 201 202// annotateBinary annotates the span with a key and a value that will be []byte 203// encoded. 204func annotateBinary(span *zipkincore.Span, key string, value interface{}, host *zipkincore.Endpoint) { 205 if b, ok := value.(bool); ok { 206 if b { 207 value = "true" 208 } else { 209 value = "false" 210 } 211 } 212 span.BinaryAnnotations = append(span.BinaryAnnotations, &zipkincore.BinaryAnnotation{ 213 Key: key, 214 Value: []byte(fmt.Sprintf("%+v", value)), 215 AnnotationType: zipkincore.AnnotationType_STRING, 216 Host: host, 217 }) 218} 219