import copy
import logging
import random
from datetime import datetime, timedelta
from threading import Lock

from stem import ControllerError, DescriptorUnavailable, Flag
from stem.descriptor.router_status_entry import RouterStatusEntryV3
from stem.descriptor.server_descriptor import ServerDescriptor

from ..globals import (
    MAX_RECENT_CONSENSUS_COUNT,
    MAX_RECENT_PRIORITY_LIST_COUNT,
    MAX_RECENT_PRIORITY_RELAY_COUNT,
    MEASUREMENTS_PERIOD,
)
from ..util import timestamps

log = logging.getLogger(__name__)


def valid_after_from_network_statuses(network_statuses):
    """Obtain the consensus Valid-After datetime from the ``document``
    attribute of a ``stem.descriptor.RouterStatusEntryV3``.

    :param list network_statuses: iterable of ``RouterStatusEntryV3``.

    :returns datetime: the Valid-After of the first entry that carries a
        consensus document, or the current UTC time (microseconds
        stripped) when none of them does.
    """
    for ns in network_statuses:
        document = getattr(ns, "document", None)
        if document:
            valid_after = getattr(document, "valid_after", None)
            if valid_after:
                return valid_after
    return datetime.utcnow().replace(microsecond=0)


class Relay:
    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        """
        Given a relay fingerprint, fetch all the information about a relay
        that sbws currently needs and store it in this class. Acts as an
        abstraction to hide the confusion that is Tor consensus/descriptor
        stuff.

        :param str fp: fingerprint of the relay.
        :param cont: active and valid stem Tor controller connection.
        :param ns: (optional) an already fetched ``RouterStatusEntryV3``
            for this relay; fetched from the controller when not given.
        :param desc: (optional) an already fetched ``ServerDescriptor``
            for this relay; fetched from the controller when not given.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        """
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # BUGFIX: without this assignment a failed fetch left
                # ``_desc`` unset and any later access raised
                # AttributeError (the ``_ns`` branch above already does
                # the equivalent).
                self._desc = None
        self.relay_in_recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT
        )
        # Use the same timestamp as the consensus, so that it can be tested
        # that the relay was in a consensus using this timestamp.
        # Note that this doesn't change the number of consensus the relay
        # was in.
        self.update_relay_in_recent_consensus(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``.
        self.relay_recent_priority_list = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``.
        self.relay_recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None when the
        descriptor or the attribute is missing."""
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the network status, or None when the
        network status or the attribute is missing."""
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns("nickname")

    @property
    def fingerprint(self):
        return self._from_ns("fingerprint")

    @property
    def flags(self):
        return self._from_ns("flags")

    @property
    def exit_policy(self):
        return self._from_desc("exit_policy")

    @property
    def average_bandwidth(self):
        return self._from_desc("average_bandwidth")

    @property
    def burst_bandwidth(self):
        return self._from_desc("burst_bandwidth")

    @property
    def observed_bandwidth(self):
        return self._from_desc("observed_bandwidth")

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in
        kilobytes.
        """
        if self._from_ns("bandwidth") is not None:
            return self._from_ns("bandwidth") * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # "measured" appears only in votes, "unmeasured" appears in the
        # consensus; therefore is_unmeasured is needed to know whether the
        # bandwidth value in the consensus is coming from bwauth
        # measurements or not.
        return self._from_ns("is_unmeasured")

    @property
    def address(self):
        return self._from_ns("address")

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
            trailing '='s, or None when there is no descriptor.
        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key.
        key = self._from_desc("ed25519_master_key")
        if key is None:
            return None
        return key.rstrip("=")

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this
        relay's network status.
        """
        network_status_document = self._from_ns("document")
        if network_status_document:
            return getattr(network_status_document, "valid_after", None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Datetime of the most recent consensus this relay was seen in."""
        return self.relay_in_recent_consensus.last()

    def update_relay_in_recent_consensus(self, timestamp=None):
        """Record that this relay was seen in a consensus at ``timestamp``."""
        self.relay_in_recent_consensus.update(timestamp)

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self.relay_in_recent_consensus)

    def can_exit_to_port(self, port, strict=False):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port or False otherwise.

        If ``strict`` is true, it only returns the exits that can exit to
        all IPs and that port.

        The exits that are IPv6 only or IPv4 but rejecting some public
        networks will return false.
        On July 2020, there were 67 out of 1095 exits like this.

        If ``strict`` is false, it returns any exit that can exit to some
        public IPs and that port.

        Note that the EXIT flag exists when the relay can exit to 443
        **and** 80. Currently all Web servers are using 443, so it would
        not be needed to check the EXIT flag too, using this function.
        """
        assert isinstance(port, int)
        # If we didn't get the descriptor, there isn't an exit policy.
        # When the attribute is gotten in getattr(self._desc, "exit_policy"),
        # is possible that stem's _input_rules is None and raises an
        # exception (#29899):
        # File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port # noqa
        #   if not self.exit_policy:
        # File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__ # noqa
        #   return len(self._get_rules())
        # File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules # noqa
        #   for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            if self.exit_policy:
                # Using `strip_private` to ignore reject rules to private
                # networks.
                # When ``strict`` is true, we could increase the chances
                # that the exit can exit via IPv6 too (``exit_policy_v6``).
                # However, in theory that is only known using
                # microdescriptors.
                return self.exit_policy.strip_private().can_exit_to(
                    port=port, strict=strict
                )
        except TypeError:
            return False
        return False

    def is_exit_not_bad_allowing_port(self, port, strict=False):
        """Whether this relay has the Exit flag, lacks the BadExit flag and
        can exit to ``port``."""
        # BUGFIX: ``flags`` is None when the network status could not be
        # obtained; such a relay can not be confirmed to be an exit, and
        # ``in None`` would raise TypeError.
        flags = self.flags or []
        return (
            Flag.BADEXIT not in flags
            and Flag.EXIT in flags
            and self.can_exit_to_port(port, strict)
        )

    def increment_relay_recent_measurement_attempt(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        self.relay_recent_measurement_attempt.update()

    @property
    def relay_recent_measurement_attempt_count(self):
        return len(self.relay_recent_measurement_attempt)

    def increment_relay_recent_priority_list(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start
        # counting.
        self.relay_recent_priority_list.update()

    @property
    def relay_recent_priority_list_count(self):
        return len(self.relay_recent_priority_list)

    # XXX: tech-debt: replace `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Update this relay's server descriptor (from the consensus)."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and the timestamp.
    def update_router_status(self, router_status):
        """Update this relay's router status (from the consensus)."""
        self._ns = router_status


class RelayList:
    """Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    """

    def __init__(
        self,
        args,
        conf,
        controller,
        measurements_period=MEASUREMENTS_PERIOD,
        state=None,
    ):
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
        # Initialize so that there's no error trying to access it.
        # In a future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are kept.
        self._measurements_period = measurements_period
        self._recent_measurement_attempt = timestamps.DateTimeSeq(
            [],
            MAX_RECENT_PRIORITY_RELAY_COUNT,
            state,
            "recent_measurement_attempt",
        )
        # Start with 0 for the min bw for our second hops.
        self._exit_min_bw = 0
        self._non_exit_min_bw = 0
        self._refresh()

    def _need_refresh(self):
        """Whether the cached consensus is at least one hour old."""
        # New consensuses happen every hour.
        return datetime.utcnow() >= self.last_consensus_timestamp + timedelta(
            seconds=60 * 60
        )

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        return self._recent_consensus.last()

    @property
    def relays(self):
        # See if we can get the list of relays without having to do a
        # refresh, which is expensive and blocks other threads.
        if self._need_refresh():
            log.debug(
                "We need to refresh our list of relays. "
                "Going to wait for lock."
            )
            # Whelp we couldn't just get the list of relays because the list
            # is stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug(
                    "We got the lock. Now to see if we still "
                    "need to refresh."
                )
                # Now we have the lock ... but wait! Maybe someone else
                # already did the refreshing. So check if it still needs
                # refreshing. If not, we can do nothing.
                if self._need_refresh():
                    log.debug("Yup we need to refresh our relays. Doing so.")
                    self._refresh()
                else:
                    log.debug(
                        "No we don't need to refresh our relays. "
                        "It was done by someone else."
                    )
                log.debug("Giving back the lock for refreshing relays.")
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list gets updated if
        # needed, since this method is used to know which fingerprints are
        # in the consensus.
        return [r.fingerprint for r in self.relays]

    def random_relay(self):
        """Return one relay chosen uniformly at random."""
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        return [r for r in self.relays if flag in r.flags]

    def _relays_without_flag(self, flag):
        return [r for r in self.relays if flag not in r.flags]

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in a future refactor.
        network_statuses = c.get_network_statuses()
        new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses])
        log.debug(
            "Number of relays in the current consensus: %d.",
            len(new_relays_dict),
        )

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._recent_consensus.update(timestamp)

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        relays = copy.deepcopy(self._relays)
        for r in relays:
            if r.fingerprint in new_relays_dict.keys():
                # If a relay was in the previous consensus and is in the
                # current one, update its timestamp, router status and
                # descriptor.
                fp = r.fingerprint
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
                r.update_relay_in_recent_consensus(timestamp)
                # BUGFIX: pre-initialize so that a failed fetch does not
                # leave ``descriptor`` unbound and raise NameError below.
                descriptor = None
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)
                # And remove it from the new consensus dict, as it has
                # already been added to the new list.
                new_relays_dict.pop(fp)

            # In #30727, the relay that is not in the current consensus but
            # is not "old", was added to the new list of relays too.
            # In #40037 we think it should not be measured, as it might
            # cause many circuit errors. It's already added to the
            # generator.
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus.
        for fp, ns in new_relays_dict.items():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)

        days = self._measurements_period / (60 * 60 * 24)
        log.debug(
            "Previous number of relays being measured %d", len(self._relays)
        )
        log.debug(
            "Number of relays not in the consensus in the last "
            "%d days: %d.",
            days,
            num_old_relays,
        )
        log.debug(
            "Number of relays to measure with the current consensus: %d",
            len(new_relays),
        )
        return new_relays

    def _refresh(self):
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info(
            "Number of consensuses obtained in the last %s days: %s.",
            int(self._measurements_period / 24 / 60 / 60),
            self.recent_consensus_count,
        )

        # Calculate minimum bandwidth value for 2nd hop after we refreshed
        # our available relays.
        self._calculate_min_bw_second_hop()

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._recent_consensus)

    def exits_not_bad_allowing_port(self, port, strict=False):
        """Exits without the BadExit flag that allow exiting to ``port``."""
        return [
            r
            for r in self.exits
            if r.is_exit_not_bad_allowing_port(port, strict)
        ]

    def increment_recent_measurement_attempt(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        self._recent_measurement_attempt.update()

    @property
    def recent_measurement_attempt_count(self):
        return len(self._recent_measurement_attempt)

    def _calculate_min_bw_second_hop(self):
        """
        Calculates the minimum bandwidth for both exit and non-exit relays
        chosen as a second hop by picking the lowest bandwidth value
        available from the top 75% of the respective category.
        """
        # Sort our sets of candidates according to bw, lowest amount first.
        # It's okay to keep things simple for the calculation and go over
        # all exits, including badexits.
        # BUGFIX: ``consensus_bandwidth`` is None when the network status
        # is missing; treat those relays as 0 instead of letting ``sorted``
        # raise TypeError comparing None with int.
        exit_candidates = sorted(
            self.exits, key=lambda r: r.consensus_bandwidth or 0
        )
        non_exit_candidates = sorted(
            self.non_exits, key=lambda r: r.consensus_bandwidth or 0
        )
        # BUGFIX: in a (test) network without exits or without non-exits,
        # indexing the empty candidate list raised IndexError; keep the
        # previous minima instead.
        if not exit_candidates or not non_exit_candidates:
            return
        # We know the bandwidth is sorted from least to most. Dividing the
        # length of the available relays by 4 gives us the position of the
        # relay with the lowest bandwidth from the top 75%. We do this both
        # for our exit and non-exit candidates.
        pos = int(len(exit_candidates) / 4)
        self._exit_min_bw = exit_candidates[pos].consensus_bandwidth
        pos = int(len(non_exit_candidates) / 4)
        # When there are not enough non-exits (a test network), pos is 0
        # and the minimum is left at its previous value.
        if pos:
            self._non_exit_min_bw = non_exit_candidates[
                pos
            ].consensus_bandwidth

    def exit_min_bw(self):
        """Minimum consensus bandwidth (in Bytes) for exit second hops."""
        return self._exit_min_bw

    def non_exit_min_bw(self):
        """Minimum consensus bandwidth (in Bytes) for non-exit second
        hops."""
        return self._non_exit_min_bw