1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zipkinv2 16 17import ( 18 "encoding/json" 19 "errors" 20 "fmt" 21 "net" 22 "strconv" 23 "time" 24 25 zipkinmodel "github.com/openzipkin/zipkin-go/model" 26 27 "go.opentelemetry.io/collector/internal/idutils" 28 "go.opentelemetry.io/collector/model/pdata" 29 "go.opentelemetry.io/collector/translator/conventions" 30 tracetranslator "go.opentelemetry.io/collector/translator/trace" 31 "go.opentelemetry.io/collector/translator/trace/internal/zipkin" 32) 33 34const ( 35 spanEventDataFormat = "%s|%s|%d" 36 spanLinkDataFormat = "%s|%s|%s|%s|%d" 37) 38 39var ( 40 sampled = true 41) 42 43// FromTranslator converts from pdata to Zipkin data model. 44type FromTranslator struct{} 45 46// FromTraces translates internal trace data into Zipkin v2 spans. 47// Returns a slice of Zipkin SpanModel's. 48func (t FromTranslator) FromTraces(td pdata.Traces) ([]*zipkinmodel.SpanModel, error) { 49 resourceSpans := td.ResourceSpans() 50 if resourceSpans.Len() == 0 { 51 return nil, nil 52 } 53 54 zSpans := make([]*zipkinmodel.SpanModel, 0, td.SpanCount()) 55 56 for i := 0; i < resourceSpans.Len(); i++ { 57 batch, err := resourceSpansToZipkinSpans(resourceSpans.At(i), td.SpanCount()/resourceSpans.Len()) 58 if err != nil { 59 return zSpans, err 60 } 61 if batch != nil { 62 zSpans = append(zSpans, batch...) 63 } 64 } 65 66 return zSpans, nil 67} 68 69func resourceSpansToZipkinSpans(rs pdata.ResourceSpans, estSpanCount int) ([]*zipkinmodel.SpanModel, error) { 70 resource := rs.Resource() 71 ilss := rs.InstrumentationLibrarySpans() 72 73 if resource.Attributes().Len() == 0 && ilss.Len() == 0 { 74 return nil, nil 75 } 76 77 localServiceName, zTags := resourceToZipkinEndpointServiceNameAndAttributeMap(resource) 78 79 zSpans := make([]*zipkinmodel.SpanModel, 0, estSpanCount) 80 for i := 0; i < ilss.Len(); i++ { 81 ils := ilss.At(i) 82 extractInstrumentationLibraryTags(ils.InstrumentationLibrary(), zTags) 83 spans := ils.Spans() 84 for j := 0; j < spans.Len(); j++ { 85 zSpan, err := spanToZipkinSpan(spans.At(j), localServiceName, zTags) 86 if err != nil { 87 return zSpans, err 88 } 89 zSpans = append(zSpans, zSpan) 90 } 91 } 92 93 return zSpans, nil 94} 95 96func extractInstrumentationLibraryTags(il pdata.InstrumentationLibrary, zTags map[string]string) { 97 if ilName := il.Name(); ilName != "" { 98 zTags[conventions.InstrumentationLibraryName] = ilName 99 } 100 if ilVer := il.Version(); ilVer != "" { 101 zTags[conventions.InstrumentationLibraryVersion] = ilVer 102 } 103} 104 105func spanToZipkinSpan( 106 span pdata.Span, 107 localServiceName string, 108 zTags map[string]string, 109) (*zipkinmodel.SpanModel, error) { 110 111 tags := aggregateSpanTags(span, zTags) 112 113 zs := &zipkinmodel.SpanModel{} 114 115 if span.TraceID().IsEmpty() { 116 return zs, errors.New("TraceID is invalid") 117 } 118 zs.TraceID = convertTraceID(span.TraceID()) 119 if span.SpanID().IsEmpty() { 120 return zs, errors.New("SpanID is invalid") 121 } 122 zs.ID = convertSpanID(span.SpanID()) 123 124 if len(span.TraceState()) > 0 { 125 tags[tracetranslator.TagW3CTraceState] = string(span.TraceState()) 126 } 127 128 if !span.ParentSpanID().IsEmpty() { 129 id := convertSpanID(span.ParentSpanID()) 130 zs.ParentID = &id 131 } 132 133 zs.Sampled = &sampled 134 zs.Name = span.Name() 135 startTime := span.StartTimestamp().AsTime() 136 137 // leave timestamp unset on zs (zipkin span) if 138 // otel span startTime is zero. Zipkin has a 139 // case where startTime is not set on the span. 140 // See handling of this (and setting of otel span 141 // to unix time zero) in zipkinv2_to_traces.go 142 if startTime.Unix() != 0 { 143 zs.Timestamp = startTime 144 } 145 146 if span.EndTimestamp() != 0 { 147 zs.Duration = time.Duration(span.EndTimestamp() - span.StartTimestamp()) 148 } 149 zs.Kind = spanKindToZipkinKind(span.Kind()) 150 if span.Kind() == pdata.SpanKindInternal { 151 tags[tracetranslator.TagSpanKind] = "internal" 152 } 153 154 redundantKeys := make(map[string]bool, 8) 155 zs.LocalEndpoint = zipkinEndpointFromTags(tags, localServiceName, false, redundantKeys) 156 zs.RemoteEndpoint = zipkinEndpointFromTags(tags, "", true, redundantKeys) 157 158 removeRedundentTags(redundantKeys, tags) 159 160 status := span.Status() 161 tags[tracetranslator.TagStatusCode] = status.Code().String() 162 if status.Message() != "" { 163 tags[tracetranslator.TagStatusMsg] = status.Message() 164 if int32(status.Code()) > 0 { 165 zs.Err = fmt.Errorf("%s", status.Message()) 166 } 167 } 168 169 if err := spanEventsToZipkinAnnotations(span.Events(), zs); err != nil { 170 return nil, err 171 } 172 if err := spanLinksToZipkinTags(span.Links(), tags); err != nil { 173 return nil, err 174 } 175 176 zs.Tags = tags 177 178 return zs, nil 179} 180 181func aggregateSpanTags(span pdata.Span, zTags map[string]string) map[string]string { 182 tags := make(map[string]string) 183 for key, val := range zTags { 184 tags[key] = val 185 } 186 spanTags := attributeMapToStringMap(span.Attributes()) 187 for key, val := range spanTags { 188 tags[key] = val 189 } 190 return tags 191} 192 193func spanEventsToZipkinAnnotations(events pdata.SpanEventSlice, zs *zipkinmodel.SpanModel) error { 194 if events.Len() > 0 { 195 zAnnos := make([]zipkinmodel.Annotation, events.Len()) 196 for i := 0; i < events.Len(); i++ { 197 event := events.At(i) 198 if event.Attributes().Len() == 0 && event.DroppedAttributesCount() == 0 { 199 zAnnos[i] = zipkinmodel.Annotation{ 200 Timestamp: event.Timestamp().AsTime(), 201 Value: event.Name(), 202 } 203 } else { 204 jsonStr, err := json.Marshal(tracetranslator.AttributeMapToMap(event.Attributes())) 205 if err != nil { 206 return err 207 } 208 zAnnos[i] = zipkinmodel.Annotation{ 209 Timestamp: event.Timestamp().AsTime(), 210 Value: fmt.Sprintf(spanEventDataFormat, event.Name(), jsonStr, 211 event.DroppedAttributesCount()), 212 } 213 } 214 } 215 zs.Annotations = zAnnos 216 } 217 return nil 218} 219 220func spanLinksToZipkinTags(links pdata.SpanLinkSlice, zTags map[string]string) error { 221 for i := 0; i < links.Len(); i++ { 222 link := links.At(i) 223 key := fmt.Sprintf("otlp.link.%d", i) 224 jsonStr, err := json.Marshal(tracetranslator.AttributeMapToMap(link.Attributes())) 225 if err != nil { 226 return err 227 } 228 zTags[key] = fmt.Sprintf(spanLinkDataFormat, link.TraceID().HexString(), 229 link.SpanID().HexString(), link.TraceState(), jsonStr, link.DroppedAttributesCount()) 230 } 231 return nil 232} 233 234func attributeMapToStringMap(attrMap pdata.AttributeMap) map[string]string { 235 rawMap := make(map[string]string) 236 attrMap.Range(func(k string, v pdata.AttributeValue) bool { 237 rawMap[k] = tracetranslator.AttributeValueToString(v) 238 return true 239 }) 240 return rawMap 241} 242 243func removeRedundentTags(redundantKeys map[string]bool, zTags map[string]string) { 244 for k, v := range redundantKeys { 245 if v { 246 delete(zTags, k) 247 } 248 } 249} 250 251func resourceToZipkinEndpointServiceNameAndAttributeMap( 252 resource pdata.Resource, 253) (serviceName string, zTags map[string]string) { 254 zTags = make(map[string]string) 255 attrs := resource.Attributes() 256 if attrs.Len() == 0 { 257 return tracetranslator.ResourceNoServiceName, zTags 258 } 259 260 attrs.Range(func(k string, v pdata.AttributeValue) bool { 261 zTags[k] = tracetranslator.AttributeValueToString(v) 262 return true 263 }) 264 265 serviceName = extractZipkinServiceName(zTags) 266 return serviceName, zTags 267} 268 269func extractZipkinServiceName(zTags map[string]string) string { 270 var serviceName string 271 if sn, ok := zTags[conventions.AttributeServiceName]; ok { 272 serviceName = sn 273 delete(zTags, conventions.AttributeServiceName) 274 } else if fn, ok := zTags[conventions.AttributeFaasName]; ok { 275 serviceName = fn 276 delete(zTags, conventions.AttributeFaasName) 277 zTags[zipkin.TagServiceNameSource] = conventions.AttributeFaasName 278 } else if fn, ok := zTags[conventions.AttributeK8sDeployment]; ok { 279 serviceName = fn 280 delete(zTags, conventions.AttributeK8sDeployment) 281 zTags[zipkin.TagServiceNameSource] = conventions.AttributeK8sDeployment 282 } else if fn, ok := zTags[conventions.AttributeProcessExecutableName]; ok { 283 serviceName = fn 284 delete(zTags, conventions.AttributeProcessExecutableName) 285 zTags[zipkin.TagServiceNameSource] = conventions.AttributeProcessExecutableName 286 } else { 287 serviceName = tracetranslator.ResourceNoServiceName 288 } 289 return serviceName 290} 291 292func spanKindToZipkinKind(kind pdata.SpanKind) zipkinmodel.Kind { 293 switch kind { 294 case pdata.SpanKindClient: 295 return zipkinmodel.Client 296 case pdata.SpanKindServer: 297 return zipkinmodel.Server 298 case pdata.SpanKindProducer: 299 return zipkinmodel.Producer 300 case pdata.SpanKindConsumer: 301 return zipkinmodel.Consumer 302 default: 303 return zipkinmodel.Undetermined 304 } 305} 306 307func zipkinEndpointFromTags( 308 zTags map[string]string, 309 localServiceName string, 310 remoteEndpoint bool, 311 redundantKeys map[string]bool, 312) (endpoint *zipkinmodel.Endpoint) { 313 314 serviceName := localServiceName 315 if peerSvc, ok := zTags[conventions.AttributePeerService]; ok && remoteEndpoint { 316 serviceName = peerSvc 317 redundantKeys[conventions.AttributePeerService] = true 318 } 319 320 var ipKey, portKey string 321 if remoteEndpoint { 322 ipKey, portKey = conventions.AttributeNetPeerIP, conventions.AttributeNetPeerPort 323 } else { 324 ipKey, portKey = conventions.AttributeNetHostIP, conventions.AttributeNetHostPort 325 } 326 327 var ip net.IP 328 ipv6Selected := false 329 if ipStr, ok := zTags[ipKey]; ok { 330 ipv6Selected = isIPv6Address(ipStr) 331 ip = net.ParseIP(ipStr) 332 redundantKeys[ipKey] = true 333 } 334 335 var port uint64 336 if portStr, ok := zTags[portKey]; ok { 337 port, _ = strconv.ParseUint(portStr, 10, 16) 338 redundantKeys[portKey] = true 339 } 340 341 if serviceName == "" && ip == nil { 342 return nil 343 } 344 345 zEndpoint := &zipkinmodel.Endpoint{ 346 ServiceName: serviceName, 347 Port: uint16(port), 348 } 349 if ipv6Selected { 350 zEndpoint.IPv6 = ip 351 } else { 352 zEndpoint.IPv4 = ip 353 } 354 355 return zEndpoint 356} 357 358func isIPv6Address(ipStr string) bool { 359 for i := 0; i < len(ipStr); i++ { 360 if ipStr[i] == ':' { 361 return true 362 } 363 } 364 return false 365} 366 367func convertTraceID(t pdata.TraceID) zipkinmodel.TraceID { 368 h, l := idutils.TraceIDToUInt64Pair(t) 369 return zipkinmodel.TraceID{High: h, Low: l} 370} 371 372func convertSpanID(s pdata.SpanID) zipkinmodel.ID { 373 return zipkinmodel.ID(idutils.SpanIDToUInt64(s)) 374} 375