import collections
import datetime
import logging
import random
from urllib.parse import urlparse

import requests
from stem.control import EventType

import sbws.util.stem as stem_utils
from sbws import settings
from sbws.globals import DESTINATION_VERIFY_CERTIFICATE

from ..globals import (
    DELTA_SECONDS_RETRY_DESTINATION,
    FACTOR_INCREMENT_DESTINATION_RETRY,
    MAX_NUM_DESTINATION_FAILURES,
    MAX_SECONDS_RETRY_DESTINATION,
    NUM_DESTINATION_ATTEMPTS_STORED,
)

log = logging.getLogger(__name__)


# Duplicate some code from DestinationList.from_config,
# it should be refactored.
def parse_destinations_countries(conf):
    """Returns the destinations' country as string separated by comma.

    Only enabled destinations are included, in the order their keys appear
    in the ``[destinations]`` section.

    :param conf: the parsed sbws configuration. It must contain a
        ``destinations`` section and a ``destinations.<key>`` section for
        every enabled destination key.
    :returns: the countries joined with ``","``.
    """
    destinations_countries = []
    for key in conf["destinations"].keys():
        # Not a destination key
        if key in ["usability_test_interval"]:
            continue
        # The destination is not enabled
        if not conf["destinations"].getboolean(key):
            continue
        destination_section = "destinations.{}".format(key)
        # NOTE(review): a destination section without ``country`` yields
        # None here, which makes the join below raise TypeError; presumably
        # config validation requires the option — confirm.
        destination_country = conf[destination_section].get("country", None)
        destinations_countries.append(destination_country)
    return ",".join(destinations_countries)


def _parse_verify_option(conf_section):
    """Return the ``verify`` value to pass to Requests for a destination.

    :param conf_section: the ``destinations.<key>`` configuration section.
    :returns: ``DESTINATION_VERIFY_CERTIFICATE`` when the option is absent,
        a bool when the option parses as a boolean, otherwise the raw option
        string (treated as a CA bundle path).
    """
    if "verify" not in conf_section:
        return DESTINATION_VERIFY_CERTIFICATE
    try:
        verify = conf_section.getboolean("verify")
    except ValueError:
        log.warning(
            "Currently sbws only supports verify=true/false, not a CA bundle "
            "file. We think %s is not a bool, and thus must be a CA bundle "
            "file. This is supposed to be allowed by the Python Requests "
            "library, but pastly couldn't get it to work in his afternoon "
            "of testing. So we will allow this, but expect Requests to throw "
            "SSLError exceptions later. Have fun!",
            conf_section["verify"],
        )
        return conf_section["verify"]
    if not verify:
        # disable urllib3 warning: InsecureRequestWarning
        import urllib3

        urllib3.disable_warnings()
    return verify


def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
    """
    Connect to **dest** over the given **circ_id** using the given Requests
    **session**. Make sure the destination seems usable. Return True and a
    dictionary of helpful information if we connected and the destination is
    usable. Otherwise return False and a string stating what the issue is.

    This function has two effects, and which one is the "side effect" depends
    on your goal.

    1. It creates a stream to the destination. It persists in the requests
    library **session** object so future requests use the same stream.
    Therefore, the primary effect of this function could be to open a
    connection to the destination that measurements can be made over the given
    **circ_id**, which makes the usability checks a side effect (yet important
    sanity check).

    2. It determines if a destination is usable. Therefore, the primary effect
    of this function could be to perform the usability checks and return the
    results of those checks, which makes the persistent stream a side effect
    that we don't care about.

    As of the time of writing, you'll find that sbws/core/scanner.py uses this
    function in order to obtain that stream over which to perform measurements.
    You will also find in sbws/lib/destination.py (this file) this function
    being used to determine if a Destination is usable. The first relies on the
    persistent stream side effect, the second ignores it (and in fact throws it
    away when it closes the circuit).

    Every failed check records a failure on **dest** (``add_failure``); a
    fully successful connection records a success (``add_success``).

    :param dest Destination: the place to which we should connect
    :param circ_id str: the circuit we should connect over
    :param session Session: the Requests library session object to use to make
        the connection.
    :param cont Controller: the Stem library controller controlling Tor
    :param max_dl int: the maximum configured download size; the destination
        is rejected when its Content-Length is smaller than this.
    :returns: True and a dictionary if everything is in order and measurements
        should commence. False and an error string otherwise.
    """
    assert isinstance(dest, Destination)
    log.debug("Connecting to destination over circuit.")
    # Do not start if sbws is stopping
    if settings.end_event.is_set():
        return False, "Shutting down."
    error_prefix = "When sending HTTP HEAD to {}, ".format(dest.url)
    # Hold the lock while the listener is registered so the stream created
    # by the HEAD request is the one attached to ``circ_id``.
    with stem_utils.stream_building_lock:
        listener = stem_utils.attach_stream_to_circuit_listener(cont, circ_id)
        stem_utils.add_event_listener(cont, listener, EventType.STREAM)
        try:
            head = session.head(dest.url, verify=dest.verify)
        except requests.exceptions.RequestException as e:
            dest.add_failure()
            return False, "Could not connect to {} over circ {} {}: {}".format(
                dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e
            )
        finally:
            stem_utils.remove_event_listener(cont, listener)
    if head.status_code != requests.codes.ok:
        dest.add_failure()
        return (
            False,
            error_prefix + "we expected HTTP code "
            "{} not {}".format(requests.codes.ok, head.status_code),
        )
    if "content-length" not in head.headers:
        dest.add_failure()
        return (
            False,
            error_prefix + "we expect the header Content-Length "
            "to exist in the response",
        )
    raw_content_length = head.headers["content-length"]
    try:
        content_length = int(raw_content_length)
    except ValueError:
        # A malformed header is a destination problem, not a crash: report
        # it the same way as the other failed checks.
        dest.add_failure()
        return (
            False,
            error_prefix + "we expect the header Content-Length "
            "to be an integer, not {!r}".format(raw_content_length),
        )
    if max_dl > content_length:
        dest.add_failure()
        return (
            False,
            error_prefix + "our maximum configured download size "
            "is {} but the content is only {}".format(max_dl, content_length),
        )
    log.debug("Connected to %s over circuit %s", dest.url, circ_id)
    # Any failure connecting to the destination will call add_failure,
    # It can not be set at the start, to be able to know whether it is
    # failing consecutive times.
    dest.add_success()
    return True, {"content_length": content_length}


class Destination:
    """Web server from which data is downloaded to measure bandwidth."""

    # NOTE: max_dl and verify should be optional and have defaults
    def __init__(
        self,
        url,
        max_dl,
        verify,
        max_num_failures=MAX_NUM_DESTINATION_FAILURES,
        delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
        max_seconds_between_retries=MAX_SECONDS_RETRY_DESTINATION,
        num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
        factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY,
    ):
        """Initializes the Web server from which the data is downloaded.

        :param str url: Web server data URL to download.
        :param int max_dl: Maximum size of the data to download.
        :param bool verify: Whether to verify or not the TLS certificate.
        :param int max_num_failures: Number of consecutive failures when the
            destination is not considered functional.
        :param int delta_seconds_retry: Delta time to try a destination
            that was not functional.
        :param int max_seconds_between_retries: Cap on the delta time between
            retries after it has been incremented.
        :param int num_attempts_stored: Number of attempts to store.
        :param int factor_increment_retry: Factor to increment delta by
            before trying to use a destination again.
        """
        self._max_dl = max_dl
        u = urlparse(url)
        self._url = u
        self._verify = verify

        # Attributes to decide whether a destination is functional or not.
        self._max_num_failures = max_num_failures
        self._num_attempts_stored = num_attempts_stored
        # Default delta time to try a destination that was not functional.
        self._default_delta_seconds_retry = delta_seconds_retry
        self._delta_seconds_retry = delta_seconds_retry
        # A cap on the time to wait between destination retries.
        self._max_seconds_between_retries = max_seconds_between_retries
        # Using a deque (FIFO) to do not grow forever and
        # to do not have to remove old attempts.
        # Store tuples of timestamp and whether the destination succeed or not
        # (succeed, 1, failed, 0).
        # Initialize it as if it never failed.
        self._attempts = collections.deque(
            [
                (datetime.datetime.utcnow(), 1),
            ],
            maxlen=self._num_attempts_stored,
        )
        self._factor = factor_increment_retry

    def _last_attempts(self, n=None):
        """Return the last ``n`` attempts the destination was used.

        ``n`` defaults to ``max_num_failures``.
        """
        # deque does not accept slices,
        # a new deque is returned with the last n items
        # (or less if there were less).
        return collections.deque(
            self._attempts, maxlen=(n or self._max_num_failures)
        )

    def _are_last_attempts_failures(self, n=None):
        """
        Return True if, among the last ``n`` attempts (default
        ``max_num_failures``), at least ``max_num_failures`` failed.
        """
        # Count the number that there was a failure when used
        n = n if n else self._max_num_failures
        return [i[1] for i in self._last_attempts(n)].count(
            0
        ) >= self._max_num_failures

    def _increment_time_to_retry(self, factor=None):
        """
        Increment the time a destination will be tried again by a ``factor``,
        capped at ``max_seconds_between_retries``.
        """
        self._delta_seconds_retry *= factor or self._factor
        if self._delta_seconds_retry > self._max_seconds_between_retries:
            self._delta_seconds_retry = self._max_seconds_between_retries
            log.debug(
                "Incremented the time to try destination %s past the "
                "limit, capping it at %s hours.",
                self.url,
                self._delta_seconds_retry / 60 / 60,
            )
        else:
            log.debug(
                "Incremented the time to try destination %s to %s " "hours.",
                self.url,
                self._delta_seconds_retry / 60 / 60,
            )

    def _get_last_try_in_seconds_ago(self):
        """
        Return the delta between the last try and now, as positive seconds.
        """
        # Timestamp of the last attempt.
        last_time = self._attempts[-1][0]
        return (datetime.datetime.utcnow() - last_time).total_seconds()

    def _is_last_try_old_enough(self, n=None):
        """
        Return True if the last attempt is older than the current retry
        delta (``_delta_seconds_retry``).

        ``n`` is unused and kept only for backward compatibility.
        """
        # If the last attempt is older than _delta_seconds_retry, try again
        return self._get_last_try_in_seconds_ago() > self._delta_seconds_retry

    def is_functional(self):
        """Whether connections to a destination are failing or not.

        Return True if:
            - It did not fail more than n (by default 3) consecutive times.
            - The last time the destination was tried
              was more than x (by default 3h) seconds ago.
        And False otherwise.

        When the destination is tried again after the consecutive failures,
        the time to try again is incremented and reset as soon as the
        destination does not fail.

        Note that this is not a pure query: when the retry window has
        elapsed, each call increments the retry delay.
        """
        # NOTE: does a destination fail because several threads are using
        # it at the same time?
        # If a destination fails for 1 minute and there're 3 threads, the
        # 3 threads will fail.

        # Failed the last X consecutive times
        if self._are_last_attempts_failures():
            # The log here will appear in all the the queued relays and
            # threads.
            log.debug(
                "The last %s times the destination %s failed. "
                "It last ran %s seconds ago. "
                "Disabled for %s seconds.",
                self._max_num_failures,
                self.url,
                self._get_last_try_in_seconds_ago(),
                self._delta_seconds_retry,
            )
            log.warning(
                "The last %s times a destination failed. "
                "It last ran %s seconds ago. "
                "Disabled for %s seconds. "
                "Please, add more destinations or increase the "
                "maximum number of consecutive failures "
                "in the configuration.",
                self._max_num_failures,
                self._get_last_try_in_seconds_ago(),
                self._delta_seconds_retry,
            )
            # It was not used for a while and the last time it was used
            # was long ago, then try again
            if self._is_last_try_old_enough():
                log.debug(
                    "The destination %s was not tried for %s seconds, "
                    "it is going to be tried again.",
                    self.url,
                    self._get_last_try_in_seconds_ago(),
                )
                # Set the next time to retry higher, in case this attempt fails
                self._increment_time_to_retry()
                return True
            return False
        # Reset the time to retry to the initial value
        # In case it was incremented
        self._delta_seconds_retry = self._default_delta_seconds_retry
        return True

    def add_failure(self, dt=None):
        """Record a failed attempt at ``dt`` (default: now, naive UTC)."""
        self._attempts.append((dt or datetime.datetime.utcnow(), 0))

    def add_success(self, dt=None):
        """Record a successful attempt at ``dt`` (default: now, naive UTC)."""
        self._attempts.append((dt or datetime.datetime.utcnow(), 1))

    @property
    def url(self):
        return self._url.geturl()

    @property
    def verify(self):
        return self._verify

    @property
    def hostname(self):
        return self._url.hostname

    @property
    def port(self):
        """The URL's port, or the scheme's default port (80/443)."""
        p = self._url.port
        scheme = self._url.scheme
        if p is None:
            if scheme == "http":
                p = 80
            elif scheme == "https":
                p = 443
            else:
                assert None, "Unreachable. Unknown scheme {}".format(scheme)
        assert p is not None
        return p

    @staticmethod
    def from_config(conf_section, max_dl, number_threads):
        """Build a Destination from a ``destinations.<key>`` section.

        :param conf_section: the destination's configuration section; it
            must contain ``url``.
        :param int max_dl: maximum download size.
        :param int number_threads: number of measurement threads; the
            configured ``max_num_failures`` is multiplied by it.
        """
        assert "url" in conf_section
        url = conf_section["url"]
        verify = _parse_verify_option(conf_section)
        try:
            # Because once a destination fails, all the threads that are
            # using it at that moment will fail too, multiply by the number
            # of threads.
            max_num_failures = (
                conf_section.getint("max_num_failures")
                or MAX_NUM_DESTINATION_FAILURES
            )
        except ValueError:
            # If the operator did not setup the number, set to the default.
            max_num_failures = MAX_NUM_DESTINATION_FAILURES

        max_num_failures *= number_threads
        return Destination(url, max_dl, verify, max_num_failures)


class DestinationList:
    """The set of configured destinations, from which one is picked for
    every measurement."""

    def __init__(self, conf, dests, circuit_builder, relay_list, controller):
        # ``conf`` is accepted for interface compatibility but not used here.
        assert len(dests) > 0
        for dest in dests:
            assert isinstance(dest, Destination)
        self._rng = random.SystemRandom()
        self._cont = controller
        self._cb = circuit_builder
        self._rl = relay_list
        self._all_dests = dests

    @property
    def functional_destinations(self):
        """The destinations whose ``is_functional()`` currently returns True.

        Evaluating this may increment the retry delay of destinations whose
        retry window has elapsed, so avoid evaluating it more than once per
        decision.
        """
        return [d for d in self._all_dests if d.is_functional()]

    @staticmethod
    def from_config(conf, circuit_builder, relay_list, controller):
        """Build a DestinationList from the enabled destinations in ``conf``.

        :returns: a ``(DestinationList, "")`` tuple, or ``(None, msg)`` with
            an explanatory message when no destination is enabled.
        """
        assert "destinations" in conf
        section = conf["destinations"]
        dests = []
        for key in section.keys():
            if key in ["usability_test_interval"]:
                continue
            if not section.getboolean(key):
                log.debug("%s is disabled; not loading it", key)
                continue
            dest_sec = "destinations.{}".format(key)
            assert dest_sec in conf  # validate_config should require this
            log.debug("Loading info for destination %s", key)
            dests.append(
                Destination.from_config(
                    conf[dest_sec],
                    conf.getint("scanner", "max_download_size"),
                    # Multiply by the number of threads since all the threads
                    # will fail at the same time.
                    conf.getint("scanner", "measurement_threads"),
                )
            )
        if not dests:
            msg = (
                "No enabled destinations in config. Please see "
                '"docs/source/man_sbws.ini.rst" or "man 5 sbws.ini" '
                "for help adding and enabling destinations."
            )
            return None, msg
        return (
            DestinationList(
                conf, dests, circuit_builder, relay_list, controller
            ),
            "",
        )

    def next(self):
        """
        Returns the next destination that should be used in a measurement,
        or None if no destination is currently functional.
        """
        # Do not perform usability tests since a destination is already proven
        # usable or not in every measurement, and it should depend on a X
        # number of failures.
        # This removes the need for an extra lock for every measurement.
        # Do not change the order of the destinations, just return a
        # destination.
        # Evaluate the functional destinations exactly once:
        # ``is_functional`` increments the retry delay when a retry window
        # has elapsed, so a second evaluation could return a different (even
        # empty) list, and random.choice raises IndexError with an empty
        # list. It would also apply the back-off factor twice.
        functional = self.functional_destinations
        if not functional:
            return None
        return self._rng.choice(functional)