import os
import sys
import time
import struct
import socket
import random

from eventlet.green import threading
from eventlet.zipkin._thrift.zipkinCore import ttypes
from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND


# Collector client used by TraceData.flush(). It starts as None and has to be
# assigned by code outside this module before any span is flushed.
client = None
_tls = threading.local()  # thread local storage


def put_annotation(msg, endpoint=None):
    """ This is the annotation API.
    You can add your own annotation from within your code.
    The annotation is recorded with a timestamp automatically.
    e.g.) put_annotation('cache hit for %s' % request)

    Nothing is recorded unless the current thread is tracing a sampled
    request (see is_sample()).

    :param msg: String message
    :param endpoint: host info
    """
    if is_sample():
        a = ZipkinDataBuilder.build_annotation(msg, endpoint)
        trace_data = get_trace_data()
        trace_data.add_annotation(a)


def put_key_value(key, value, endpoint=None):
    """ This is the binary annotation API.
    You can add your own key-value extra information from within your code.
    A key-value pair doesn't have a time component.
    e.g.) put_key_value('http.uri', '/hoge/index.html')

    Nothing is recorded unless the current thread is tracing a sampled
    request (see is_sample()).

    :param key: String
    :param value: String
    :param endpoint: host info
    """
    if is_sample():
        b = ZipkinDataBuilder.build_binary_annotation(key, value, endpoint)
        trace_data = get_trace_data()
        trace_data.add_binary_annotation(b)


def is_tracing():
    """ Return whether the current thread is tracing or not """
    return hasattr(_tls, 'trace_data')


def is_sample():
    """ Return whether it should record trace information
        for the request or not
    """
    return is_tracing() and _tls.trace_data.sampled


def get_trace_data():
    """ Return the current thread's TraceData, or None when not tracing """
    if is_tracing():
        return _tls.trace_data


def set_trace_data(trace_data):
    """ Attach ``trace_data`` to the current thread """
    _tls.trace_data = trace_data


def init_trace_data():
    """ Remove any trace data attached to the current thread """
    if is_tracing():
        del _tls.trace_data


def _uniq_id():
    """
    Create a random 64-bit signed integer appropriate
    for use as trace and span IDs.
    XXX: By experimentation zipkin has trouble recording traces with ids
    larger than (2 ** 56) - 1
    """
    return random.randint(0, (2 ** 56) - 1)


def generate_trace_id():
    """ Return a new random trace ID """
    return _uniq_id()


def generate_span_id():
    """ Return a new random span ID """
    return _uniq_id()


class TraceData(object):
    """ Accumulates the annotations of one span and sends the span to the
    collector once the END_ANNOTATION value has been recorded.
    """

    # Annotation value that marks the span as complete and triggers flush().
    END_ANNOTATION = SERVER_SEND

    def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint):
        """
        :param name: RPC name (String)
        :param trace_id: int
        :param span_id: int
        :param parent_id: int or None
        :param sampled: lets the downstream servers know
                    if I should record trace data for the request (bool)
        :param endpoint: zipkin._thrift.zipkinCore.ttypes.Endpoint
        """
        self.name = name
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_id = parent_id
        self.sampled = sampled
        self.endpoint = endpoint
        self.annotations = []
        self.bannotations = []
        # Set by flush(); later annotations are silently dropped.
        self._done = False

    def add_annotation(self, annotation):
        """ Record a timed annotation.

        An annotation without a host gets this span's endpoint. Recording
        the END_ANNOTATION value flushes the span to the collector.
        """
        if annotation.host is None:
            annotation.host = self.endpoint
        if not self._done:
            self.annotations.append(annotation)
            if annotation.value == self.END_ANNOTATION:
                self.flush()

    def add_binary_annotation(self, bannotation):
        """ Record a key-value annotation.

        An annotation without a host gets this span's endpoint.
        """
        if bannotation.host is None:
            bannotation.host = self.endpoint
        if not self._done:
            self.bannotations.append(bannotation)

    def flush(self):
        """ Build the span, hand it to the module-level ``client`` and mark
        this trace data as done.
        """
        span = ZipkinDataBuilder.build_span(name=self.name,
                                            trace_id=self.trace_id,
                                            span_id=self.span_id,
                                            parent_id=self.parent_id,
                                            annotations=self.annotations,
                                            bannotations=self.bannotations)
        client.send_to_collector(span)
        self.annotations = []
        self.bannotations = []
        self._done = True


class ZipkinDataBuilder(object):
    """ Static factory helpers for the zipkin thrift types """

    @staticmethod
    def build_span(name, trace_id, span_id, parent_id,
                   annotations, bannotations):
        """ Return a ttypes.Span built from the given fields """
        return ttypes.Span(
            name=name,
            trace_id=trace_id,
            id=span_id,
            parent_id=parent_id,
            annotations=annotations,
            binary_annotations=bannotations
        )

    @staticmethod
    def build_annotation(value, endpoint=None):
        """ Return a ttypes.Annotation stamped with the current time.

        :param value: annotation message; it is normalised to the native
                      ``str`` type on both Python 2 and Python 3
        :param endpoint: host info
        """
        if not isinstance(value, str):
            if isinstance(value, bytes):
                # Python 3 only (on Python 2 bytes is str): str(b'x') would
                # yield "b'x'", so decode instead.
                value = value.decode('utf-8', 'replace')
            elif isinstance(value, type(u'')):
                # Python 2 only: ``unicode`` text becomes a UTF-8 byte str.
                # type(u'') avoids the ``unicode`` name, which does not
                # exist on Python 3.
                value = value.encode('utf-8')
        # Microseconds since the epoch, truncated to an integer because
        # struct-based integer packing rejects floats on Python 3.
        timestamp = int(time.time() * 1000 * 1000)
        return ttypes.Annotation(timestamp, str(value), endpoint)

    @staticmethod
    def build_binary_annotation(key, value, endpoint=None):
        """ Return a ttypes.BinaryAnnotation of type STRING """
        annotation_type = ttypes.AnnotationType.STRING
        return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint)

    @staticmethod
    def build_endpoint(ipv4=None, port=None, service_name=None):
        """ Return a ttypes.Endpoint.

        :param ipv4: dotted-quad address string or None
        :param port: port number or None
        :param service_name: defaults to the running script's file name
        """
        if ipv4 is not None:
            ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4)
        if service_name is None:
            service_name = ZipkinDataBuilder._get_script_name()
        return ttypes.Endpoint(
            ipv4=ipv4,
            port=port,
            service_name=service_name
        )

    @staticmethod
    def _ipv4_to_int(ipv4):
        """ Convert a dotted-quad string to a signed 32-bit integer
        (network byte order)
        """
        return struct.unpack('!i', socket.inet_aton(ipv4))[0]

    @staticmethod
    def _get_script_name():
        """ Return the base name of the running script (sys.argv[0]) """
        return os.path.basename(sys.argv[0])