1import json
2from typing import Any
3from typing import Dict
4from typing import List
5from typing import Optional
6from typing import TYPE_CHECKING
7
8from ._encoding import ListStringTable
9from ._encoding import MsgpackEncoderV03
10from ._encoding import MsgpackEncoderV05
11from .logger import get_logger
12
13
14__all__ = ["MsgpackEncoderV03", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]
15
16
17if TYPE_CHECKING:
18    from ..span import Span
19
20
21log = get_logger(__name__)
22
23
24class _EncoderBase(object):
25    """
26    Encoder interface that provides the logic to encode traces and service.
27    """
28
29    def encode_traces(self, traces):
30        # type: (List[List[Span]]) -> str
31        """
32        Encodes a list of traces, expecting a list of items where each items
33        is a list of spans. Before dumping the string in a serialized format all
34        traces are normalized according to the encoding format. The trace
35        nesting is not changed.
36
37        :param traces: A list of traces that should be serialized
38        """
39        raise NotImplementedError()
40
41    def encode(self, obj):
42        # type: (List[List[Any]]) -> str
43        """
44        Defines the underlying format used during traces or services encoding.
45        This method must be implemented and should only be used by the internal
46        functions.
47        """
48        raise NotImplementedError()
49
50
51class JSONEncoder(_EncoderBase):
52    content_type = "application/json"
53
54    def encode_traces(self, traces):
55        normalized_traces = [[span.to_dict() for span in trace] for trace in traces]
56        return self.encode(normalized_traces)
57
58    @staticmethod
59    def encode(obj):
60        # type: (Any) -> str
61        return json.dumps(obj)
62
63
64class JSONEncoderV2(JSONEncoder):
65    """
66    JSONEncoderV2 encodes traces to the new intake API format.
67    """
68
69    content_type = "application/json"
70
71    def encode_traces(self, traces):
72        # type: (List[List[Span]]) -> str
73        normalized_traces = [[JSONEncoderV2._convert_span(span) for span in trace] for trace in traces]
74        return self.encode({"traces": normalized_traces})
75
76    @staticmethod
77    def _convert_span(span):
78        # type: (Span) -> Dict[str, Any]
79        sp = span.to_dict()
80        sp["trace_id"] = JSONEncoderV2._encode_id_to_hex(sp.get("trace_id"))
81        sp["parent_id"] = JSONEncoderV2._encode_id_to_hex(sp.get("parent_id"))
82        sp["span_id"] = JSONEncoderV2._encode_id_to_hex(sp.get("span_id"))
83        return sp
84
85    @staticmethod
86    def _encode_id_to_hex(dd_id):
87        # type: (Optional[int]) -> str
88        if not dd_id:
89            return "0000000000000000"
90        return "%0.16X" % int(dd_id)
91
92    @staticmethod
93    def _decode_id_to_hex(hex_id):
94        # type: (Optional[str]) -> int
95        if not hex_id:
96            return 0
97        return int(hex_id, 16)
98
99
100MSGPACK_ENCODERS = {
101    "v0.3": MsgpackEncoderV03,
102    "v0.4": MsgpackEncoderV03,
103    "v0.5": MsgpackEncoderV05,
104}
105