import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING

from ._encoding import ListStringTable
from ._encoding import MsgpackEncoderV03
from ._encoding import MsgpackEncoderV05
from .logger import get_logger


__all__ = ["MsgpackEncoderV03", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]


if TYPE_CHECKING:
    # Imported for the type comments below only.
    from ..span import Span


log = get_logger(__name__)


class _EncoderBase(object):
    """
    Abstract encoder interface.

    Subclasses provide both ``encode_traces`` and ``encode``; the versions
    defined here only raise ``NotImplementedError``.
    """

    def encode_traces(self, traces):
        # type: (List[List[Span]]) -> str
        """
        Serialize a list of traces, where each trace is itself a list of
        spans. Implementations normalize the spans according to their
        encoding format before serializing; the nesting of the traces is
        left as it is.

        :param traces: A list of traces that should be serialized
        """
        raise NotImplementedError()

    def encode(self, obj):
        # type: (List[List[Any]]) -> str
        """
        Serialize ``obj`` in the underlying wire format of the encoder.

        This method must be implemented by subclasses and is meant to be
        called by the encoder's own methods only.
        """
        raise NotImplementedError()


class JSONEncoder(_EncoderBase):
    """
    Encoder that serializes traces as a JSON list of lists of span dicts.
    """

    content_type = "application/json"

    def encode_traces(self, traces):
        """
        Turn every span into its ``to_dict()`` form and dump the nested
        list as JSON.

        :param traces: A list of traces that should be serialized
        """
        payload = []
        for trace in traces:
            payload.append([span.to_dict() for span in trace])
        return self.encode(payload)

    @staticmethod
    def encode(obj):
        # type: (Any) -> str
        """
        Return ``obj`` dumped as a JSON string.
        """
        return json.dumps(obj)


class JSONEncoderV2(JSONEncoder):
    """
    JSONEncoderV2 encodes traces to the new intake API format.

    The payload is a JSON object with a single ``"traces"`` key, and the
    ``trace_id``, ``parent_id`` and ``span_id`` of every span are written as
    hexadecimal strings.
    """

    content_type = "application/json"

    def encode_traces(self, traces):
        # type: (List[List[Span]]) -> str
        """
        Convert every span with ``_convert_span`` and dump the result under
        the ``"traces"`` key.

        :param traces: A list of traces that should be serialized
        """
        convert = JSONEncoderV2._convert_span
        payload = {"traces": [[convert(span) for span in trace] for trace in traces]}
        return self.encode(payload)

    @staticmethod
    def _convert_span(span):
        # type: (Span) -> Dict[str, Any]
        """
        Return ``span.to_dict()`` with the three id fields replaced by their
        hexadecimal string form. A missing id is encoded like a zero id.
        """
        converted = span.to_dict()
        # The order matters only when a key is absent: it is then inserted
        # in this sequence.
        for id_key in ("trace_id", "parent_id", "span_id"):
            converted[id_key] = JSONEncoderV2._encode_id_to_hex(converted.get(id_key))
        return converted

    @staticmethod
    def _encode_id_to_hex(dd_id):
        # type: (Optional[int]) -> str
        """
        Format an id as upper-case hexadecimal, zero-padded to at least 16
        digits. ``None`` and ``0`` both yield sixteen zeros.
        """
        return "%0.16X" % int(dd_id) if dd_id else "0000000000000000"

    @staticmethod
    def _decode_id_to_hex(hex_id):
        # type: (Optional[str]) -> int
        """
        Parse a hexadecimal id string into an integer. ``None`` and the
        empty string both yield ``0``.
        """
        return int(hex_id, 16) if hex_id else 0


# Msgpack encoder class for each API version string; "v0.3" and "v0.4" use
# the same encoder.
MSGPACK_ENCODERS = {
    "v0.3": MsgpackEncoderV03,
    "v0.4": MsgpackEncoderV03,
    "v0.5": MsgpackEncoderV05,
}