1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zipkinv2
16
17import (
18	"encoding/json"
19	"errors"
20	"fmt"
21	"net"
22	"strconv"
23	"time"
24
25	zipkinmodel "github.com/openzipkin/zipkin-go/model"
26
27	"go.opentelemetry.io/collector/internal/idutils"
28	"go.opentelemetry.io/collector/model/pdata"
29	"go.opentelemetry.io/collector/translator/conventions"
30	tracetranslator "go.opentelemetry.io/collector/translator/trace"
31	"go.opentelemetry.io/collector/translator/trace/internal/zipkin"
32)
33
34const (
35	spanEventDataFormat = "%s|%s|%d"
36	spanLinkDataFormat  = "%s|%s|%s|%s|%d"
37)
38
39var (
40	sampled = true
41)
42
43// FromTranslator converts from pdata to Zipkin data model.
44type FromTranslator struct{}
45
46// FromTraces translates internal trace data into Zipkin v2 spans.
47// Returns a slice of Zipkin SpanModel's.
48func (t FromTranslator) FromTraces(td pdata.Traces) ([]*zipkinmodel.SpanModel, error) {
49	resourceSpans := td.ResourceSpans()
50	if resourceSpans.Len() == 0 {
51		return nil, nil
52	}
53
54	zSpans := make([]*zipkinmodel.SpanModel, 0, td.SpanCount())
55
56	for i := 0; i < resourceSpans.Len(); i++ {
57		batch, err := resourceSpansToZipkinSpans(resourceSpans.At(i), td.SpanCount()/resourceSpans.Len())
58		if err != nil {
59			return zSpans, err
60		}
61		if batch != nil {
62			zSpans = append(zSpans, batch...)
63		}
64	}
65
66	return zSpans, nil
67}
68
69func resourceSpansToZipkinSpans(rs pdata.ResourceSpans, estSpanCount int) ([]*zipkinmodel.SpanModel, error) {
70	resource := rs.Resource()
71	ilss := rs.InstrumentationLibrarySpans()
72
73	if resource.Attributes().Len() == 0 && ilss.Len() == 0 {
74		return nil, nil
75	}
76
77	localServiceName, zTags := resourceToZipkinEndpointServiceNameAndAttributeMap(resource)
78
79	zSpans := make([]*zipkinmodel.SpanModel, 0, estSpanCount)
80	for i := 0; i < ilss.Len(); i++ {
81		ils := ilss.At(i)
82		extractInstrumentationLibraryTags(ils.InstrumentationLibrary(), zTags)
83		spans := ils.Spans()
84		for j := 0; j < spans.Len(); j++ {
85			zSpan, err := spanToZipkinSpan(spans.At(j), localServiceName, zTags)
86			if err != nil {
87				return zSpans, err
88			}
89			zSpans = append(zSpans, zSpan)
90		}
91	}
92
93	return zSpans, nil
94}
95
96func extractInstrumentationLibraryTags(il pdata.InstrumentationLibrary, zTags map[string]string) {
97	if ilName := il.Name(); ilName != "" {
98		zTags[conventions.InstrumentationLibraryName] = ilName
99	}
100	if ilVer := il.Version(); ilVer != "" {
101		zTags[conventions.InstrumentationLibraryVersion] = ilVer
102	}
103}
104
105func spanToZipkinSpan(
106	span pdata.Span,
107	localServiceName string,
108	zTags map[string]string,
109) (*zipkinmodel.SpanModel, error) {
110
111	tags := aggregateSpanTags(span, zTags)
112
113	zs := &zipkinmodel.SpanModel{}
114
115	if span.TraceID().IsEmpty() {
116		return zs, errors.New("TraceID is invalid")
117	}
118	zs.TraceID = convertTraceID(span.TraceID())
119	if span.SpanID().IsEmpty() {
120		return zs, errors.New("SpanID is invalid")
121	}
122	zs.ID = convertSpanID(span.SpanID())
123
124	if len(span.TraceState()) > 0 {
125		tags[tracetranslator.TagW3CTraceState] = string(span.TraceState())
126	}
127
128	if !span.ParentSpanID().IsEmpty() {
129		id := convertSpanID(span.ParentSpanID())
130		zs.ParentID = &id
131	}
132
133	zs.Sampled = &sampled
134	zs.Name = span.Name()
135	startTime := span.StartTimestamp().AsTime()
136
137	// leave timestamp unset on zs (zipkin span) if
138	// otel span startTime is zero.  Zipkin has a
139	// case where startTime is not set on the span.
140	// See handling of this (and setting of otel span
141	// to unix time zero) in zipkinv2_to_traces.go
142	if startTime.Unix() != 0 {
143		zs.Timestamp = startTime
144	}
145
146	if span.EndTimestamp() != 0 {
147		zs.Duration = time.Duration(span.EndTimestamp() - span.StartTimestamp())
148	}
149	zs.Kind = spanKindToZipkinKind(span.Kind())
150	if span.Kind() == pdata.SpanKindInternal {
151		tags[tracetranslator.TagSpanKind] = "internal"
152	}
153
154	redundantKeys := make(map[string]bool, 8)
155	zs.LocalEndpoint = zipkinEndpointFromTags(tags, localServiceName, false, redundantKeys)
156	zs.RemoteEndpoint = zipkinEndpointFromTags(tags, "", true, redundantKeys)
157
158	removeRedundentTags(redundantKeys, tags)
159
160	status := span.Status()
161	tags[tracetranslator.TagStatusCode] = status.Code().String()
162	if status.Message() != "" {
163		tags[tracetranslator.TagStatusMsg] = status.Message()
164		if int32(status.Code()) > 0 {
165			zs.Err = fmt.Errorf("%s", status.Message())
166		}
167	}
168
169	if err := spanEventsToZipkinAnnotations(span.Events(), zs); err != nil {
170		return nil, err
171	}
172	if err := spanLinksToZipkinTags(span.Links(), tags); err != nil {
173		return nil, err
174	}
175
176	zs.Tags = tags
177
178	return zs, nil
179}
180
181func aggregateSpanTags(span pdata.Span, zTags map[string]string) map[string]string {
182	tags := make(map[string]string)
183	for key, val := range zTags {
184		tags[key] = val
185	}
186	spanTags := attributeMapToStringMap(span.Attributes())
187	for key, val := range spanTags {
188		tags[key] = val
189	}
190	return tags
191}
192
193func spanEventsToZipkinAnnotations(events pdata.SpanEventSlice, zs *zipkinmodel.SpanModel) error {
194	if events.Len() > 0 {
195		zAnnos := make([]zipkinmodel.Annotation, events.Len())
196		for i := 0; i < events.Len(); i++ {
197			event := events.At(i)
198			if event.Attributes().Len() == 0 && event.DroppedAttributesCount() == 0 {
199				zAnnos[i] = zipkinmodel.Annotation{
200					Timestamp: event.Timestamp().AsTime(),
201					Value:     event.Name(),
202				}
203			} else {
204				jsonStr, err := json.Marshal(tracetranslator.AttributeMapToMap(event.Attributes()))
205				if err != nil {
206					return err
207				}
208				zAnnos[i] = zipkinmodel.Annotation{
209					Timestamp: event.Timestamp().AsTime(),
210					Value: fmt.Sprintf(spanEventDataFormat, event.Name(), jsonStr,
211						event.DroppedAttributesCount()),
212				}
213			}
214		}
215		zs.Annotations = zAnnos
216	}
217	return nil
218}
219
220func spanLinksToZipkinTags(links pdata.SpanLinkSlice, zTags map[string]string) error {
221	for i := 0; i < links.Len(); i++ {
222		link := links.At(i)
223		key := fmt.Sprintf("otlp.link.%d", i)
224		jsonStr, err := json.Marshal(tracetranslator.AttributeMapToMap(link.Attributes()))
225		if err != nil {
226			return err
227		}
228		zTags[key] = fmt.Sprintf(spanLinkDataFormat, link.TraceID().HexString(),
229			link.SpanID().HexString(), link.TraceState(), jsonStr, link.DroppedAttributesCount())
230	}
231	return nil
232}
233
234func attributeMapToStringMap(attrMap pdata.AttributeMap) map[string]string {
235	rawMap := make(map[string]string)
236	attrMap.Range(func(k string, v pdata.AttributeValue) bool {
237		rawMap[k] = tracetranslator.AttributeValueToString(v)
238		return true
239	})
240	return rawMap
241}
242
243func removeRedundentTags(redundantKeys map[string]bool, zTags map[string]string) {
244	for k, v := range redundantKeys {
245		if v {
246			delete(zTags, k)
247		}
248	}
249}
250
251func resourceToZipkinEndpointServiceNameAndAttributeMap(
252	resource pdata.Resource,
253) (serviceName string, zTags map[string]string) {
254	zTags = make(map[string]string)
255	attrs := resource.Attributes()
256	if attrs.Len() == 0 {
257		return tracetranslator.ResourceNoServiceName, zTags
258	}
259
260	attrs.Range(func(k string, v pdata.AttributeValue) bool {
261		zTags[k] = tracetranslator.AttributeValueToString(v)
262		return true
263	})
264
265	serviceName = extractZipkinServiceName(zTags)
266	return serviceName, zTags
267}
268
269func extractZipkinServiceName(zTags map[string]string) string {
270	var serviceName string
271	if sn, ok := zTags[conventions.AttributeServiceName]; ok {
272		serviceName = sn
273		delete(zTags, conventions.AttributeServiceName)
274	} else if fn, ok := zTags[conventions.AttributeFaasName]; ok {
275		serviceName = fn
276		delete(zTags, conventions.AttributeFaasName)
277		zTags[zipkin.TagServiceNameSource] = conventions.AttributeFaasName
278	} else if fn, ok := zTags[conventions.AttributeK8sDeployment]; ok {
279		serviceName = fn
280		delete(zTags, conventions.AttributeK8sDeployment)
281		zTags[zipkin.TagServiceNameSource] = conventions.AttributeK8sDeployment
282	} else if fn, ok := zTags[conventions.AttributeProcessExecutableName]; ok {
283		serviceName = fn
284		delete(zTags, conventions.AttributeProcessExecutableName)
285		zTags[zipkin.TagServiceNameSource] = conventions.AttributeProcessExecutableName
286	} else {
287		serviceName = tracetranslator.ResourceNoServiceName
288	}
289	return serviceName
290}
291
292func spanKindToZipkinKind(kind pdata.SpanKind) zipkinmodel.Kind {
293	switch kind {
294	case pdata.SpanKindClient:
295		return zipkinmodel.Client
296	case pdata.SpanKindServer:
297		return zipkinmodel.Server
298	case pdata.SpanKindProducer:
299		return zipkinmodel.Producer
300	case pdata.SpanKindConsumer:
301		return zipkinmodel.Consumer
302	default:
303		return zipkinmodel.Undetermined
304	}
305}
306
307func zipkinEndpointFromTags(
308	zTags map[string]string,
309	localServiceName string,
310	remoteEndpoint bool,
311	redundantKeys map[string]bool,
312) (endpoint *zipkinmodel.Endpoint) {
313
314	serviceName := localServiceName
315	if peerSvc, ok := zTags[conventions.AttributePeerService]; ok && remoteEndpoint {
316		serviceName = peerSvc
317		redundantKeys[conventions.AttributePeerService] = true
318	}
319
320	var ipKey, portKey string
321	if remoteEndpoint {
322		ipKey, portKey = conventions.AttributeNetPeerIP, conventions.AttributeNetPeerPort
323	} else {
324		ipKey, portKey = conventions.AttributeNetHostIP, conventions.AttributeNetHostPort
325	}
326
327	var ip net.IP
328	ipv6Selected := false
329	if ipStr, ok := zTags[ipKey]; ok {
330		ipv6Selected = isIPv6Address(ipStr)
331		ip = net.ParseIP(ipStr)
332		redundantKeys[ipKey] = true
333	}
334
335	var port uint64
336	if portStr, ok := zTags[portKey]; ok {
337		port, _ = strconv.ParseUint(portStr, 10, 16)
338		redundantKeys[portKey] = true
339	}
340
341	if serviceName == "" && ip == nil {
342		return nil
343	}
344
345	zEndpoint := &zipkinmodel.Endpoint{
346		ServiceName: serviceName,
347		Port:        uint16(port),
348	}
349	if ipv6Selected {
350		zEndpoint.IPv6 = ip
351	} else {
352		zEndpoint.IPv4 = ip
353	}
354
355	return zEndpoint
356}
357
358func isIPv6Address(ipStr string) bool {
359	for i := 0; i < len(ipStr); i++ {
360		if ipStr[i] == ':' {
361			return true
362		}
363	}
364	return false
365}
366
367func convertTraceID(t pdata.TraceID) zipkinmodel.TraceID {
368	h, l := idutils.TraceIDToUInt64Pair(t)
369	return zipkinmodel.TraceID{High: h, Low: l}
370}
371
372func convertSpanID(s pdata.SpanID) zipkinmodel.ID {
373	return zipkinmodel.ID(idutils.SpanIDToUInt64(s))
374}
375