1import os
2import sys
3import time
4import struct
5import socket
6import random
7
8from eventlet.green import threading
9from eventlet.zipkin._thrift.zipkinCore import ttypes
10from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND
11
12
# Collector client used by TraceData.flush() to send finished spans.
# It is None at import time; presumably it is assigned by setup code outside
# this module before any span is flushed -- TODO confirm against the caller.
client = None
# Per-thread storage; the tracing helpers below keep the current TraceData in
# its `trace_data` attribute.
_tls = threading.local()  # thread local storage
15
16
def put_annotation(msg, endpoint=None):
    """ Annotation API: record a timestamped message on the current span.

    The timestamp is attached automatically when the annotation is built.
    Nothing is recorded unless the current thread is tracing a sampled
    request.
    e.g.) put_annotation('cache hit for %s' % request)

    :param msg: String message
    :param endpoint: host info
    """
    if not is_sample():
        return
    annotation = ZipkinDataBuilder.build_annotation(msg, endpoint)
    get_trace_data().add_annotation(annotation)
30
31
def put_key_value(key, value, endpoint=None):
    """ Binary annotation API: attach a key-value pair to the current span.

    Unlike put_annotation(), a key-value pair has no time component.
    Nothing is recorded unless the current thread is tracing a sampled
    request.
    e.g.) put_key_value('http.uri', '/hoge/index.html')

    :param key: String
    :param value: String
    :param endpoint: host info
    """
    if not is_sample():
        return
    binary_annotation = ZipkinDataBuilder.build_binary_annotation(
        key, value, endpoint)
    get_trace_data().add_binary_annotation(binary_annotation)
46
47
def is_tracing():
    """ Return True when the current thread has trace data attached """
    try:
        _tls.trace_data
    except AttributeError:
        # No trace data has been stored for this thread.
        return False
    return True
51
52
def is_sample():
    """ Return whether trace information should be recorded
        for the current request.

        False when the thread is not tracing at all; otherwise the
        `sampled` value of the thread's trace data.
    """
    if not is_tracing():
        return False
    return _tls.trace_data.sampled
58
59
def get_trace_data():
    """ Return the current thread's trace data, or None when not tracing """
    return _tls.trace_data if is_tracing() else None
63
64
def set_trace_data(trace_data):
    """ Attach trace data to the current thread.

    After this call is_tracing() returns True for the thread and
    get_trace_data() returns the given object.

    :param trace_data: object to store (the helpers in this module read
                       its `sampled` attribute and call its add_* methods)
    """
    _tls.trace_data = trace_data
67
68
def init_trace_data():
    """ Reset the current thread's tracing state.

    Despite the name, this only removes any trace data attached to the
    thread; it does not create a new one.
    """
    if not is_tracing():
        return
    del _tls.trace_data
72
73
def _uniq_id():
    """
    Return a random non-negative integer for use as a trace or span ID.

    The value fits in a signed 64-bit integer but is deliberately limited
    to the range [0, 2 ** 56 - 1].
    XXX: By experimentation zipkin has trouble recording traces with ids
    larger than (2 ** 56) - 1
    """
    largest_id = (2 ** 56) - 1
    return random.randint(0, largest_id)
82
83
def generate_trace_id():
    """ Return a new random ID for a trace (see _uniq_id) """
    trace_id = _uniq_id()
    return trace_id
86
87
def generate_span_id():
    """ Return a new random ID for a span (see _uniq_id) """
    span_id = _uniq_id()
    return span_id
90
91
class TraceData(object):
    """ Collects the annotations of one span and sends them on completion.

    Annotations are buffered until one whose value equals END_ANNOTATION
    is added; at that point the span is built and handed to the module
    level `client`. Once flushed, further annotations are ignored.
    """

    # Adding an annotation with this value finishes (flushes) the span.
    END_ANNOTATION = SERVER_SEND

    def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint):
        """
        :param name: RPC name (String)
        :param trace_id: int
        :param span_id: int
        :param parent_id: int or None
        :param sampled: lets the downstream servers know
                    if I should record trace data for the request (bool)
        :param endpoint: zipkin._thrift.zipkinCore.ttypes.EndPoint
        """
        # Identity of the span.
        self.name = name
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_id = parent_id
        # Recording policy and the default host for annotations.
        self.sampled = sampled
        self.endpoint = endpoint
        # Buffers for timed and key-value annotations until flush().
        self.annotations = []
        self.bannotations = []
        # Becomes True after the span has been sent.
        self._done = False

    def add_annotation(self, annotation):
        """ Buffer a timed annotation; flush the span on END_ANNOTATION.

        An annotation without a host gets this span's endpoint. After the
        span has been flushed the annotation is not stored.
        """
        if annotation.host is None:
            annotation.host = self.endpoint
        if self._done:
            return
        self.annotations.append(annotation)
        if annotation.value == self.END_ANNOTATION:
            self.flush()

    def add_binary_annotation(self, bannotation):
        """ Buffer a key-value annotation.

        A binary annotation without a host gets this span's endpoint. After
        the span has been flushed the annotation is not stored.
        """
        if bannotation.host is None:
            bannotation.host = self.endpoint
        if self._done:
            return
        self.bannotations.append(bannotation)

    def flush(self):
        """ Build the span from the buffered data and send it via `client`.

        The buffers are replaced with fresh lists (the sent span keeps the
        old ones) and the instance is marked as done.
        """
        finished_span = ZipkinDataBuilder.build_span(
            self.name,
            self.trace_id,
            self.span_id,
            self.parent_id,
            self.annotations,
            self.bannotations,
        )
        client.send_to_collector(finished_span)
        # Rebind rather than clear in place: the span still references
        # the lists that were just sent.
        self.annotations = []
        self.bannotations = []
        self._done = True
141
142
class ZipkinDataBuilder(object):
    """ Namespace of static helpers that build the Thrift objects
    (span, annotation, binary annotation, endpoint) used by the tracer.
    """

    @staticmethod
    def build_span(name, trace_id, span_id, parent_id,
                   annotations, bannotations):
        """ Build a ttypes.Span from the given values.

        :param name: RPC name
        :param trace_id: ID of the trace
        :param span_id: ID of this span (stored as `id`)
        :param parent_id: ID of the parent span, or None
        :param annotations: list of timed annotations
        :param bannotations: list of binary (key-value) annotations
        """
        return ttypes.Span(
            name=name,
            trace_id=trace_id,
            id=span_id,
            parent_id=parent_id,
            annotations=annotations,
            binary_annotations=bannotations
        )

    @staticmethod
    def build_annotation(value, endpoint=None):
        """ Build a ttypes.Annotation stamped with the current time.

        The timestamp is time.time() expressed in microseconds. The value
        is normalised to the native `str` type on both Python 2 and 3.

        :param value: annotation message (text, bytes or any object
                      convertible with str())
        :param endpoint: host info, or None
        """
        if str is bytes:
            # Python 2: native str is a byte string, so text is encoded.
            # `unicode` is only evaluated here, where it exists.
            if isinstance(value, unicode):  # noqa: F821
                value = value.encode('utf-8')
        elif isinstance(value, bytes):
            # Python 3: str(b'x') would yield "b'x'", so decode instead.
            value = value.decode('utf-8', 'replace')
        # NOTE(review): the timestamp is a float; confirm that the Thrift
        # serializer in use accepts it for the timestamp field.
        return ttypes.Annotation(time.time() * 1000 * 1000,
                                 str(value), endpoint)

    @staticmethod
    def build_binary_annotation(key, value, endpoint=None):
        """ Build a ttypes.BinaryAnnotation of type STRING.

        :param key: annotation key
        :param value: annotation value (passed through unchanged)
        :param endpoint: host info, or None
        """
        annotation_type = ttypes.AnnotationType.STRING
        return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint)

    @staticmethod
    def build_endpoint(ipv4=None, port=None, service_name=None):
        """ Build a ttypes.Endpoint.

        :param ipv4: dotted-quad address string, or None; converted to a
                     signed 32-bit integer when given
        :param port: port number, or None
        :param service_name: service name; defaults to the running
                             script's file name
        """
        if ipv4 is not None:
            ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4)
        if service_name is None:
            service_name = ZipkinDataBuilder._get_script_name()
        return ttypes.Endpoint(
            ipv4=ipv4,
            port=port,
            service_name=service_name
        )

    @staticmethod
    def _ipv4_to_int(ipv4):
        """ Convert an IPv4 address string to a signed 32-bit integer
        (network byte order), e.g. '255.255.255.255' -> -1.
        """
        return struct.unpack('!i', socket.inet_aton(ipv4))[0]

    @staticmethod
    def _get_script_name():
        """ Return the file name of the running script (from sys.argv[0]) """
        return os.path.basename(sys.argv[0])
187