import json

try:
    # Python 2.x
    import urllib2 as HTTPClient
    from urllib2 import HTTPError
except ImportError:
    # Python 3.x
    import urllib.request as HTTPClient
    from urllib.error import HTTPError

DEFAULT_ENDPOINT_URL = 'https://dc.services.visualstudio.com/v2/track'


class SenderBase(object):
    """The base class for all types of senders for use in conjunction with an implementation of :class:`QueueBase`.

    The queue will notify the sender that it needs to pick up items. The concrete sender implementation will
    listen to these notifications and will pull items from the queue getting at most :func:`send_buffer_size` items.
    It will then call :func:`send` using the list of items pulled from the queue.
    """
    def __init__(self, service_endpoint_uri):
        """Initializes a new instance of the class.

        Args:
            service_endpoint_uri (str) the address of the service to send telemetry data to.
        """
        self._service_endpoint_uri = service_endpoint_uri
        self._queue = None
        self._send_buffer_size = 100
        self._timeout = 10

    @property
    def service_endpoint_uri(self):
        """The HTTP or HTTPS endpoint that this sender will send data to.

        Args:
            value (str). the service endpoint URI.

        Returns:
            str. the service endpoint URI.
        """
        return self._service_endpoint_uri

    @service_endpoint_uri.setter
    def service_endpoint_uri(self, value):
        """The service endpoint URI where this sender will send data to.

        Args:
            value (str). the service endpoint URI.
        """
        self._service_endpoint_uri = value

    @property
    def queue(self):
        """The queue that this sender is draining. While :class:`SenderBase` doesn't implement any means of doing
        so, derivations of this class do.

        Args:
            value (:class:`QueueBase`). the queue instance that this sender is draining.

        Returns:
            :class:`QueueBase`. the queue instance that this sender is draining (None until one is assigned).
        """
        return self._queue

    @queue.setter
    def queue(self, value):
        """The queue that this sender is draining. While :class:`SenderBase` doesn't implement any means of doing
        so, derivations of this class do.

        Args:
            value (:class:`QueueBase`). the queue instance that this sender is draining.
        """
        self._queue = value

    @property
    def send_timeout(self):
        """Time in seconds that the sender should wait before giving up.

        Args:
            seconds (float). Timeout in seconds.

        Returns:
            float. Timeout in seconds.
        """
        return self._timeout

    @send_timeout.setter
    def send_timeout(self, seconds):
        """Configures the timeout in seconds the sender waits for a response from the server.

        Args:
            seconds (float). Timeout in seconds.
        """
        self._timeout = seconds

    @property
    def send_buffer_size(self):
        """The buffer size for a single batch of telemetry. This is the maximum number of items in a single service
        request that this sender is going to send.

        Args:
            value (int). the maximum number of items in a telemetry batch.

        Returns:
            int. the maximum number of items in a telemetry batch.
        """
        return self._send_buffer_size

    @send_buffer_size.setter
    def send_buffer_size(self, value):
        """The buffer size for a single batch of telemetry. This is the maximum number of items in a single service
        request that this sender is going to send.

        Args:
            value (int). the maximum number of items in a telemetry batch. Values below 1 are raised to 1.
        """
        # A batch must hold at least one item.
        self._send_buffer_size = max(value, 1)

    def send(self, data_to_send):
        """ Immediately sends the data passed in to :func:`service_endpoint_uri`. If the service request fails, the
        passed in items are pushed back to the :func:`queue`.

        A 2xx response counts as success. An HTTP 400 response is not retried: the items are dropped rather
        than pushed back. Any other failure (other HTTP errors, connection problems, timeouts) pushes the items
        back to the queue, provided a queue has been assigned.

        Args:
            data_to_send (Array): an array of :class:`contracts.Envelope` objects to send to the service.
        """
        if not data_to_send:
            # Nothing to transmit; skip the pointless round trip.
            return

        request_payload = json.dumps([item.write() for item in data_to_send])
        headers = {'Accept': 'application/json', 'Content-Type': 'application/json; charset=utf-8'}
        request = HTTPClient.Request(self._service_endpoint_uri, bytearray(request_payload, 'utf-8'), headers)

        try:
            response = HTTPClient.urlopen(request, timeout=self._timeout)
            try:
                status_code = response.getcode()
            finally:
                # Always release the connection; try/finally keeps this usable with urllib2 as well.
                response.close()
            if 200 <= status_code < 300:
                return
        except HTTPError as e:
            if e.getcode() == 400:
                # Items rejected with 400 are not put back on the queue.
                return
        except Exception:
            # Deliberate best effort: a failed request must not raise into the caller;
            # the items are re-queued below instead.
            pass

        if self._queue is None:
            # No queue attached, so there is nowhere to put the unsent items.
            return

        # Add our unsent data back on to the queue
        for item in data_to_send:
            self._queue.put(item)