""" Measure the relays. """
import concurrent.futures
import logging
import os
import queue
import random
import signal
import sys
import threading
import time
import traceback
import uuid
from argparse import ArgumentDefaultsHelpFormatter

import sbws.util.requests as requests_utils
import sbws.util.stem as stem_utils
from sbws.globals import HTTP_GET_HEADERS, SOCKET_TIMEOUT, fail_hard

from .. import settings
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.destination import (
    DestinationList,
    connect_to_destination_over_circuit,
)
from ..lib.heartbeat import Heartbeat
from ..lib.relaylist import RelayList
from ..lib.relayprioritizer import RelayPrioritizer
from ..lib.resultdump import (
    ResultDump,
    ResultError,
    ResultErrorCircuit,
    ResultErrorDestination,
    ResultErrorSecondRelay,
    ResultErrorStream,
    ResultSuccess,
)
from ..util.state import State
from ..util.timestamp import now_isodt_str

rng = random.SystemRandom()
log = logging.getLogger(__name__)
# Declare the objects that manage the threads global so that sbws can exit
# gracefully at any time.
rd = None
controller = None

FILLUP_TICKET_MSG = """Something went wrong.
Please create an issue at
https://gitlab.torproject.org/tpo/network-health/sbws/-/issues with this
traceback."""


def stop_threads(signal, frame, exit_code=0):
    """Signal handler that stops the helper threads and exits the process.

    :param signal: signal number (unused, required by the handler protocol).
    :param frame: current stack frame (unused).
    :param int exit_code: status passed to ``sys.exit``.
    """
    global rd
    log.debug("Stopping sbws.")
    # Avoid new threads to start.
    settings.set_end_event()
    # Stop ResultDump thread. ``rd`` is still None when the signal arrives
    # before ``run_speedtest`` has created it.
    if rd is not None:
        rd.thread.join()
    # Stop Tor thread (same caveat as above).
    if controller is not None:
        controller.close()
    sys.exit(exit_code)


signal.signal(signal.SIGTERM, stop_threads)


def dumpstacks():
    """Log, at critical level, the stack of every running thread."""
    log.critical(FILLUP_TICKET_MSG)
    thread_id2name = {t.ident: t.name for t in threading.enumerate()}
    for thread_id, stack in sys._current_frames().items():
        log.critical(
            "Thread: %s(%d)", thread_id2name.get(thread_id, ""), thread_id
        )
        log.critical("Traceback: %s", "".join(traceback.format_stack(stack)))


def sigint_handler(signum=None, frame=None):
    """SIGINT handler: drop into the debugger.

    Handlers registered with ``signal.signal`` are called with two
    positional arguments (signal number and frame), so they must be accepted
    here even though they are not used.
    """
    import pdb

    pdb.set_trace()


signal.signal(signal.SIGINT, sigint_handler)


def timed_recv_from_server(session, dest, byte_range):
    """Request the **byte_range** from the URL at **dest**. If successful,
    return True and the time it took to download. Otherwise return False and an
    exception."""

    start_time = time.time()
    # Build a per-request copy of the headers: this function runs in several
    # measurement threads at once and writing "Range" into the shared
    # module-level dictionary would let one thread send another's range.
    headers = {**HTTP_GET_HEADERS, "Range": byte_range}
    # - response.elapsed "measures the time taken between sending the first
    #   byte of the request and finishing parsing the headers.
    #   It is therefore unaffected by consuming the response content"
    #   If this mean that the content has arrived, elapsed could be used to
    #   know the time it took.
    try:
        # headers are merged with the session ones, not overwritten.
        session.get(dest.url, headers=headers, verify=dest.verify)
    # All `requests` exceptions could be caught with
    # `requests.exceptions.RequestException`, but it seems that `requests`
    # does not catch all the ssl exceptions and urllib3 doesn't seem to have
    # a base exception class.
    except Exception as e:
        log.debug(e)
        return False, e
    end_time = time.time()
    return True, end_time - start_time


def get_random_range_string(content_length, size):
    """
    Return a random range of bytes of length **size**. **content_length** is
    the size of the file we will be requesting a range of bytes from.

    For example, for content_length of 100 and size 10, this function will
    return one of the following: '0-9', '1-10', '2-11', [...] '89-98', '90-99'
    """
    assert size <= content_length
    # start can be anywhere in the content_length as long as it is **size**
    # bytes away from the end or more. Because randrange is [start, stop)
    # (doesn't include the stop value), add 1 to the stop.
    start = rng.randrange(0, content_length - size + 1)
    # Unlike range, the byte range in an http header is [start, end] (does
    # include the end value), so we subtract one
    end = start + size - 1
    # start and end are indexes, while content_length is a length, therefore,
    # the largest index end should ever be should be less than the total length
    # of the content. For example, if content_length is 10, end could be
    # anywhere from 0 to 9.
    assert end < content_length
    return "bytes={}-{}".format(start, end)


def measure_rtt_to_server(session, conf, dest, content_length):
    """Make multiple end-to-end RTT measurements by making small HTTP requests
    over a circuit + stream that should already exist, persist, and not need
    rebuilding.

    :returns tuple: ``(rtts, None)`` with the list of RTTs (in seconds) when
        all the measurements succeed, ``(None, exception)`` as soon as one
        of them fails.

    """
    rtts = []
    size = conf.getint("scanner", "min_download_size")
    for _ in range(0, conf.getint("scanner", "num_rtts")):
        log.debug("Measuring RTT to %s", dest.url)
        random_range = get_random_range_string(content_length, size)
        success, data = timed_recv_from_server(session, dest, random_range)
        if not success:
            # data is an exception
            log.debug(
                "While measuring the RTT to %s we hit an exception "
                "(does the webserver support Range requests?): %s",
                dest.url,
                data,
            )
            return None, data
        assert success
        # data is an RTT
        assert isinstance(data, float) or isinstance(data, int)
        rtts.append(data)
    return rtts, None


def measure_bandwidth_to_server(session, conf, dest, content_length):
    """Download random byte ranges from **dest** until ``num_downloads``
    results have been kept or the scanner is stopping, adapting the amount
    requested to the time each download took.

    :returns tuple: ``(results, None)`` where results is a list of
        ``{"duration": seconds, "amount": bytes}`` dictionaries, or
        ``(None, exception)`` as soon as one download fails.

    """
    results = []
    num_downloads = conf.getint("scanner", "num_downloads")
    expected_amount = conf.getint("scanner", "initial_read_request")
    min_dl = conf.getint("scanner", "min_download_size")
    max_dl = conf.getint("scanner", "max_download_size")
    download_times = {
        "toofast": conf.getfloat("scanner", "download_toofast"),
        "min": conf.getfloat("scanner", "download_min"),
        "target": conf.getfloat("scanner", "download_target"),
        "max": conf.getfloat("scanner", "download_max"),
    }
    while len(results) < num_downloads and not settings.end_event.is_set():
        assert expected_amount >= min_dl
        assert expected_amount <= max_dl
        random_range = get_random_range_string(content_length, expected_amount)
        success, data = timed_recv_from_server(session, dest, random_range)
        if not success:
            # data is an exception
            log.debug(
                "While measuring the bandwidth to %s we hit an "
                "exception (does the webserver support Range "
                "requests?): %s",
                dest.url,
                data,
            )
            return None, data
        assert success
        # data is a download time
        assert isinstance(data, float) or isinstance(data, int)
        if _should_keep_result(
            expected_amount == max_dl, data, download_times
        ):
            results.append({"duration": data, "amount": expected_amount})
        expected_amount = _next_expected_amount(
            expected_amount, data, download_times, min_dl, max_dl
        )
    return results, None


def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    """
    Sbws builds two hop circuits. Given the **relay** to measure with
    destination **dest**, pick a second relay that is or is not an exit
    according to **is_exit**.

    :returns: the chosen relay, or None when there is no candidate.
    """
    # 40041: Instead of using exits that can exit to all IPs, to ensure that
    # they can make requests to the Web servers, try with the exits that
    # allow some IPs, since there're more.
    # In the case that a concrete exit can't exit to the Web server, it is not
    # a problem since the relay will be measured in the next loop with other
    # random exit.
    candidates = (
        rl.exits_not_bad_allowing_port(dest.port) if is_exit else rl.non_exits
    )
    # In the case the helper is an exit, the entry could be an exit too
    # (#40041), so ensure the helper is not the same as the entry, likely to
    # happen in a test network.
    if is_exit:
        candidates = [
            c for c in candidates if c.fingerprint != relay.fingerprint
        ]
    # Check for emptiness only after the filter above: the relay being
    # measured may have been the only candidate, and the fallback at the end
    # of this function indexes the first element.
    if not candidates:
        return None
    min_relay_bw = rl.exit_min_bw() if is_exit else rl.non_exit_min_bw()
    log.debug(
        "Picking a 2nd hop to measure %s from %d choices. is_exit=%s",
        relay.nickname,
        len(candidates),
        is_exit,
    )
    for min_bw_factor in [2, 1.75, 1.5, 1.25, 1]:
        min_bw = relay.consensus_bandwidth * min_bw_factor
        # We might have a really slow/new relay. Try to measure it properly by
        # using only relays with or above our calculated min_relay_bw (see:
        # _calculate_min_bw_second_hop() in relaylist.py).
        if min_bw < min_relay_bw:
            min_bw = min_relay_bw
        new_candidates = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=min_bw
        )
        if len(new_candidates) > 0:
            chosen = rng.choice(new_candidates)
            log.debug(
                "Found %d candidate 2nd hops with at least %sx the bandwidth "
                "of %s. Returning %s (bw=%s).",
                len(new_candidates),
                min_bw_factor,
                relay.nickname,
                chosen.nickname,
                chosen.consensus_bandwidth,
            )
            return chosen
    candidates = sorted(
        candidates, key=lambda r: r.consensus_bandwidth, reverse=True
    )
    chosen = candidates[0]
    log.debug(
        "Didn't find any 2nd hops at least as fast as %s (bw=%s). It's "
        "probably really fast. Returning %s (bw=%s), the fastest "
        "candidate we have.",
        relay.nickname,
        relay.consensus_bandwidth,
        chosen.nickname,
        chosen.consensus_bandwidth,
    )
    return chosen


def error_no_helper(relay, dest, our_nick=""):
    """Return a one-element list with the error result for the case in which
    no second relay could be selected to measure **relay**."""
    reason = "Unable to select a second relay"
    log.debug(
        reason + " to help measure %s (%s)", relay.fingerprint, relay.nickname
    )
    return [
        ResultErrorSecondRelay(relay, [], dest.url, our_nick, msg=reason),
    ]


def create_path_relay(relay, dest, rl, cb, relay_as_entry=True):
    """Choose a helper relay and build the two hop path to measure **relay**.

    :returns: a ``(circ_fps, nicknames, exit_policy)`` tuple, or the
        one-element list returned by ``error_no_helper`` when no helper is
        found. Callers distinguish both cases by the length of the value.
    """
    # the helper `is_exit` arg (should be better called `helper_as_exit`),
    # is True when the relay is the entry (helper has to be exit)
    # and False when the relay is not the entry, ie. is the exit (helper does
    # not have to be an exit)
    helper = _pick_ideal_second_hop(
        relay, dest, rl, cb.controller, is_exit=relay_as_entry
    )
    if not helper:
        return error_no_helper(relay, dest)
    if relay_as_entry:
        circ_fps = [relay.fingerprint, helper.fingerprint]
        nicknames = [relay.nickname, helper.nickname]
        exit_policy = helper.exit_policy
    else:
        circ_fps = [helper.fingerprint, relay.fingerprint]
        nicknames = [helper.nickname, relay.nickname]
        exit_policy = relay.exit_policy
    return circ_fps, nicknames, exit_policy


def error_no_circuit(circ_fps, nicknames, reason, relay, dest, our_nick):
    """Return a one-element list with the error result for a circuit that
    could not be built."""
    log.debug(
        "Could not build circuit with path %s (%s): %s ",
        circ_fps,
        nicknames,
        reason,
    )
    return [
        ResultErrorCircuit(relay, circ_fps, dest.url, our_nick, msg=reason),
    ]


def measure_relay(args, conf, destinations, cb, rl, relay):
    """
    Select a Web server, a relay to build the circuit,
    build the circuit and measure the bandwidth of the given relay.

    :returns: a one-element list with a success or error result object, or
        None when no session could be created because the scanner is
        stopping.

    """
    log.debug("Measuring %s %s", relay.nickname, relay.fingerprint)
    our_nick = conf["scanner"]["nickname"]
    s = requests_utils.make_session(
        cb.controller, conf.getfloat("general", "http_timeout")
    )
    # Probably because the scanner is stopping.
    if s is None:
        if settings.end_event.is_set():
            return None
        else:
            # In future refactor this should be returned from the make_session
            reason = "Unable to get proxies."
            log.debug(
                reason + " to measure %s %s", relay.nickname, relay.fingerprint
            )
            return [
                ResultError(relay, [], "", our_nick, msg=reason),
            ]
    # Pick a destination
    dest = destinations.next()
    # When there're no any functional destinations.
    if not dest:
        # NOTE: When there're still functional destinations but only one of
        # them fail, the error will be included in `ResultErrorStream`.
        # Since this is being executed in a thread, the scanner can not
        # be stop here, but the `end_event` signal can be set so that the
        # main thread stop the scanner.
        # It might be useful to store the fact that the destinations fail,
        # so store here the error, and set the signal once the error is stored
        # (in `resultump`).
        log.critical(
            "There are not any functional destinations.\n"
            "It is recommended to set several destinations so that "
            "the scanner can continue if one fails."
        )
        reason = "No functional destinations"
        # Resultdump will set end_event after storing the error
        return [
            ResultErrorDestination(relay, [], "", our_nick, msg=reason),
        ]

    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    # Instead of ensuring that the relay can exit to all IPs, try first with
    # the relay as an exit, if it can exit to some IPs.
    if relay.is_exit_not_bad_allowing_port(dest.port):
        r = create_path_relay(relay, dest, rl, cb, relay_as_entry=False)
    else:
        r = create_path_relay(relay, dest, rl, cb)
    # When `error_no_helper` is triggered because a helper is not found, what
    # can happen in test networks with very few relays, it returns a list with
    # the error.
    if len(r) == 1:
        return r
    circ_fps, nicknames, exit_policy = r

    # Build the circuit
    circ_id, reason = cb.build_circuit(circ_fps)

    # If the circuit failed to get created, bad luck, it will be created again
    # with other helper.
    # Here we won't have the case that an exit tried to build the circuit as
    # entry and failed (#40029), cause not checking that it can exit all IPs.
    if not circ_id:
        return error_no_circuit(
            circ_fps, nicknames, reason, relay, dest, our_nick
        )
    log.debug(
        "Built circuit with path %s (%s) to measure %s (%s)",
        circ_fps,
        nicknames,
        relay.fingerprint,
        relay.nickname,
    )
    # Make a connection to the destination
    is_usable, usable_data = connect_to_destination_over_circuit(
        dest, circ_id, s, cb.controller, dest._max_dl
    )

    # In the case that the relay was used as an exit, but could not exit
    # to the Web server, try again using it as entry, to avoid that it would
    # always fail when there's only one Web server.
    if not is_usable and relay.is_exit_not_bad_allowing_port(dest.port):
        log.debug(
            "Exit %s (%s) that can't exit all ips, with exit policy %s, failed"
            " to connect to %s via circuit %s (%s). Reason: %s. Trying again "
            "with it as entry.",
            relay.fingerprint,
            relay.nickname,
            exit_policy,
            dest.url,
            circ_fps,
            nicknames,
            usable_data,
        )
        # The first circuit is not going to be used anymore; close it before
        # `circ_id` is overwritten, otherwise it would stay open.
        cb.close_circuit(circ_id)
        r = create_path_relay(relay, dest, rl, cb)
        if len(r) == 1:
            return r
        circ_fps, nicknames, exit_policy = r
        circ_id, reason = cb.build_circuit(circ_fps)
        if not circ_id:
            log.info(
                "Exit %s (%s) that can't exit all ips, failed to create "
                " circuit as entry: %s (%s).",
                relay.fingerprint,
                relay.nickname,
                circ_fps,
                nicknames,
            )
            return error_no_circuit(
                circ_fps, nicknames, reason, relay, dest, our_nick
            )

        log.debug(
            "Built circuit with path %s (%s) to measure %s (%s)",
            circ_fps,
            nicknames,
            relay.fingerprint,
            relay.nickname,
        )
        is_usable, usable_data = connect_to_destination_over_circuit(
            dest, circ_id, s, cb.controller, dest._max_dl
        )
    if not is_usable:
        log.debug(
            "Failed to connect to %s to measure %s (%s) via circuit "
            "%s (%s). Exit policy: %s. Reason: %s.",
            dest.url,
            relay.fingerprint,
            relay.nickname,
            circ_fps,
            nicknames,
            exit_policy,
            usable_data,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=usable_data
            ),
        ]
    assert is_usable
    assert "content_length" in usable_data
    # FIRST: measure RTT
    rtts, reason = measure_rtt_to_server(
        s, conf, dest, usable_data["content_length"]
    )
    if rtts is None:
        log.debug(
            "Unable to measure RTT for %s (%s) to %s via circuit "
            "%s (%s): %s",
            relay.fingerprint,
            relay.nickname,
            dest.url,
            circ_fps,
            nicknames,
            reason,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=str(reason)
            ),
        ]
    # SECOND: measure bandwidth
    bw_results, reason = measure_bandwidth_to_server(
        s, conf, dest, usable_data["content_length"]
    )
    if bw_results is None:
        log.debug(
            "Failed to measure %s (%s) via circuit %s (%s) to %s. Exit"
            " policy: %s. Reason: %s.",
            relay.fingerprint,
            relay.nickname,
            circ_fps,
            nicknames,
            dest.url,
            exit_policy,
            reason,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=str(reason)
            ),
        ]
    cb.close_circuit(circ_id)
    # Finally: store result
    log.debug(
        "Success measurement for %s (%s) via circuit %s (%s) to %s",
        relay.fingerprint,
        relay.nickname,
        circ_fps,
        nicknames,
        dest.url,
    )
    return [
        ResultSuccess(rtts, bw_results, relay, circ_fps, dest.url, our_nick),
    ]


def dispatch_worker_thread(*a, **kw):
    """Run ``measure_relay`` with the given arguments unless the scanner is
    stopping or there are no functional destinations, in which case return
    None without measuring."""
    # If at the point where the relay is actually going to be measured there
    # are not any functional destinations or the `end_event` is set, do not
    # try to start measuring the relay, since it will fail anyway.
    try:
        # a[2] is the argument `destinations`
        functional_destinations = a[2].functional_destinations
    # In case the arguments or the method change, catch the possible exceptions
    # but ignore here that there are not destinations.
    # A missing attribute raises AttributeError, so catch it too.
    except (IndexError, TypeError, AttributeError):
        log.debug("Wrong argument or attribute.")
        functional_destinations = True
    if not functional_destinations or settings.end_event.is_set():
        return None
    return measure_relay(*a, **kw)


def _should_keep_result(did_request_maximum, result_time, download_times):
    """Return whether a download that took **result_time** seconds should be
    stored as a measurement."""
    # In the normal case, we didn't ask for the maximum allowed amount. So we
    # should only allow ourselves to keep results that are between the min and
    # max allowed time
    msg = "Keeping measurement time {:.2f}".format(result_time)
    if (
        not did_request_maximum
        and result_time >= download_times["min"]
        and result_time < download_times["max"]
    ):
        log.debug(msg)
        return True
    # If we did request the maximum amount, we should keep the result as long
    # as it took less than the maximum amount of time
    if did_request_maximum and result_time < download_times["max"]:
        log.debug(msg)
        return True
    # In all other cases, return false
    log.debug(
        "Not keeping result time %f.%s",
        result_time,
        ""
        if not did_request_maximum
        else " We requested the maximum " "amount allowed.",
    )
    return False


def _next_expected_amount(
    expected_amount, result_time, download_times, min_dl, max_dl
):
    """Return the number of bytes to request in the next download, given how
    long the previous one took, clamped to ``[min_dl, max_dl]``."""
    if result_time < download_times["toofast"]:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif (
        result_time < download_times["min"]
        or result_time >= download_times["max"]
    ):
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times["target"] / result_time
        )
    # Make sure we don't request too much or too little
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
    return expected_amount


def measurement_writer(result_dump, measurement):
    """Put **measurement** in the ``result_dump`` queue, logging a warning
    (and dropping the measurement) when the queue stays full."""
    # Since result_dump thread is calling queue.get() every second,
    # the queue should be full for only 1 second.
    # This call blocks at maximum timeout seconds.
    try:
        result_dump.queue.put(measurement, timeout=3)
    except queue.Full:
        # The result would be lost, the scanner will continue working.
        log.warning(
            "The queue with measurements is full, when adding %s.\n"
            "It is possible that the thread that get them to "
            "write them to the disk (ResultDump.enter) is stalled.",
            measurement,
        )


def log_measurement_exception(target, exception):
    """Log an exception raised in a worker thread while measuring **target**.

    Nothing beyond a debug line is logged when the scanner is stopping.
    """
    log.debug("in result putter error")
    if settings.end_event.is_set():
        return
    # The only object that can be here if there is not any uncatched
    # exception is stem.SocketClosed when stopping sbws
    # An exception here means that the worker thread finished.
    log.warning(FILLUP_TICKET_MSG)
    # To print the traceback that happened in the thread, not here in
    # the main process.
    formatted_tb = "".join(
        traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
    )
    log.warning(formatted_tb)
    # `str.join` takes a single iterable, so the relay identifiers are passed
    # as separate logging arguments instead.
    log.debug("%s %s %s", target.fingerprint, target.nickname, formatted_tb)


def main_loop(
    args,
    conf,
    controller,
    relay_list,
    circuit_builder,
    result_dump,
    relay_prioritizer,
    destinations,
):
    r"""Create the queue of future measurements for every relay to measure.

    It starts a loop that will be run while there is not and event signaling
    that sbws is stopping (because of SIGTERM or SIGINT).

    Then the ``ThreadPoolExecutor`` (executor) queues all the relays to
    measure in ``Future`` objects. These objects have an ``state``.

    The executor starts a new thread for every relay to measure, which runs
    ``measure_relay`` until there are ``max_pending_results`` threads.
    After that, it will reuse a thread that has finished for every relay to
    measure.

    Then ``process_completed_futures`` is call, to obtain the results in the
    completed ``future``\s.

    """
    log.info("Started the main loop to measure the relays.")
    hbeat = Heartbeat(conf.getpath("paths", "state_fname"))

    # Set the time to wait for a thread to finish as the half of an HTTP
    # request timeout.
    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        num_relays = 0
        loop_tstart = time.time()

        # Register relay fingerprints to the heartbeat module
        hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
        # num_threads
        max_pending_results = conf.getint("scanner", "measurement_threads")
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_pending_results, thread_name_prefix="measurer"
        ) as executor:
            log.info("In the executor, queue all future measurements.")
            # With futures, there's no need for callback, what it was the
            # callback with multiprocessing library can be just a function
            # that gets executed when the future result is obtained.
            pending_results = {
                executor.submit(
                    dispatch_worker_thread,
                    args,
                    conf,
                    destinations,
                    circuit_builder,
                    relay_list,
                    target,
                ): target
                for target in relay_prioritizer.best_priority()
            }
            log.debug("Measurements queued.")
            # After the submitting all the targets to the executor, the pool
            # has queued all the relays and pending_results has the list of all
            # `Future`s.

            # Each target relay_recent_measurement_attempt is incremented in
            # `process_completed_futures` as well as hbeat measured
            # fingerprints.
            num_relays = len(pending_results)
            # Without a callback, it's needed to pass `result_dump` here to
            # call the function that writes the measurement when it's
            # finished.
            process_completed_futures(
                executor,
                hbeat,
                result_dump,
                pending_results,
            )
            wait_futures_completed(pending_results)

        # Print the heartbeat message
        hbeat.print_heartbeat_message()

        loop_tstop = time.time()
        loop_tdelta = (loop_tstop - loop_tstart) / 60
        # At this point, we know the relays that were queued to be
        # measured.
        log.debug(
            "Attempted to measure %s relays in %s minutes",
            num_relays,
            loop_tdelta,
        )
        # In a testing network, exit after first loop
        if controller.get_conf("TestingTorNetwork") == "1":
            log.info("In a testing network, exiting after the first loop.")
            # Threads should be closed nicely in some refactor
            stop_threads(signal.SIGTERM, None)


def _log_memory_usage():
    """Log the memory used by this process and the memory available.

    This is only a diagnostics aid: when ``psutil`` can not be imported the
    information is skipped instead of raising inside an exception handler.
    """
    try:
        import psutil
    except ImportError:
        log.debug("psutil is not available, not logging memory usage.")
        return
    log.warning(psutil.Process(os.getpid()).memory_full_info())
    available_memory = psutil.virtual_memory().available
    log.warning("Memory available %s MB.", available_memory / 1024 ** 2)


def process_completed_futures(executor, hbeat, result_dump, pending_results):
    """Obtain the relays' measurements as they finish.

    For every ``Future`` measurements that gets completed, obtain the
    ``result`` and call ``measurement_writer``, which put the ``Result`` in
    ``ResultDump`` queue and complete immediately.

    ``ResultDump`` thread (started before and out of this function) will get
    the ``Result`` from the queue and write it to disk, so this doesn't block
    the measurement threads.

    If there was an exception not caught by ``measure_relay``, it will call
    instead ``log_measurement_exception``, which logs the error and complete
    immediately.

    :param executor: the executor the futures were submitted to; it is used
        as a context manager here.
    :param dict pending_results: maps every ``Future`` to its target relay.

    """
    num_relays_to_measure = num_pending_results = len(pending_results)
    with executor:
        for future_measurement in concurrent.futures.as_completed(
            pending_results
        ):
            target = pending_results[future_measurement]
            # 40023, disable to decrease state.dat json lines
            # relay_list.increment_recent_measurement_attempt()
            target.increment_relay_recent_measurement_attempt()

            # Register this measurement to the heartbeat module
            hbeat.register_measured_fpr(target.fingerprint)
            log.debug(
                "Future measurement for target %s (%s) is done: %s",
                target.fingerprint,
                target.nickname,
                future_measurement.done(),
            )
            try:
                measurement = future_measurement.result()
            except Exception as e:
                log_measurement_exception(target, e)
                _log_memory_usage()
                dumpstacks()
            else:
                # Pass the measurement as a logging argument: with the `%`
                # operator a tuple would be expanded as several arguments.
                log.info("Measurement ready: %s", measurement)
                measurement_writer(result_dump, measurement)
            # `pending_results` has all the initial queued `Future`s,
            # they don't decrease as they get completed, but we know 1 has be
            # completed in each loop,
            num_pending_results -= 1
            log.info(
                "Pending measurements: %s out of %s: ",
                num_pending_results,
                num_relays_to_measure,
            )


def wait_futures_completed(pending_results):
    """Wait for last futures to finish, before starting new loop.

    :param dict pending_results: maps every ``Future`` to its target relay.
    """
    log.info("Wait for any remaining measurements.")
    done, not_done = concurrent.futures.wait(
        pending_results,
        timeout=SOCKET_TIMEOUT + 10,  # HTTP timeout is 10
        return_when=concurrent.futures.ALL_COMPLETED,
    )
    log.info("Completed futures: %s", len(done))
    # log.debug([f.__dict__ for f in done])
    cancelled = [f for f in done if f.cancelled()]
    if cancelled:
        log.warning("Cancelled futures: %s", len(cancelled))
        # `cancelled` holds futures only; the relay is looked up in
        # `pending_results`.
        for f in cancelled:
            log.debug(pending_results[f].fingerprint)
        dumpstacks()
    if not_done:
        log.warning("Not completed futures: %s", len(not_done))
        for f in not_done:
            log.debug(pending_results[f].fingerprint)
        dumpstacks()


def run_speedtest(args, conf):
    """Initializes all the data and threads needed to measure the relays.

    It launches or connect to Tor in a thread.
    It initializes the list of relays seen in the Tor network.
    It starts a thread to read the previous measurements and wait for new
    measurements to write them to the disk.
    It initializes a class that will be used to order the relays depending
    on their measurements age.
    It initializes the list of destinations that will be used for the
    measurements.
    It initializes the thread pool that will launch the measurement threads.
    The pool starts 3 other threads that are not the measurement (worker)
    threads.
    Finally, it calls the function that will manage the measurement threads.

    """
    global rd, controller

    controller = stem_utils.launch_or_connect_to_tor(conf)

    # When there will be a refactor where conf is global, this can be removed
    # from here.
    state = State(conf.getpath("paths", "state_fname"))
    # XXX: tech-debt: create new function to obtain the controller and to
    # write the state, so that a unit test to check the state tor version can
    # be created
    # Store tor version whenever the scanner starts.
    state["tor_version"] = str(controller.get_version())
    # Call only once to initialize http_headers
    settings.init_http_headers(
        conf.get("scanner", "nickname"), state["uuid"], state["tor_version"]
    )
    # To do not have to pass args and conf to RelayList, pass an extra
    # argument with the data_period
    measurements_period = conf.getint("general", "data_period")
    rl = RelayList(args, conf, controller, measurements_period, state)
    cb = CB(args, conf, controller, rl)
    rd = ResultDump(args, conf)
    rp = RelayPrioritizer(args, conf, rl, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, cb, rl, controller
    )
    if not destinations:
        fail_hard(error_msg)
    try:
        main_loop(args, conf, controller, rl, cb, rd, rp, destinations)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        dumpstacks()
    except Exception as e:
        log.exception(e)
        dumpstacks()


def gen_parser(sub):
    """Add the ``scanner`` sub-command parser to **sub**."""
    d = (
        "The scanner side of sbws. This should be run on a well-connected "
        "machine on the Internet with a healthy amount of spare bandwidth. "
        "This continuously builds circuits, measures relays, and dumps "
        "results into a datadir, commonly found in ~/.sbws"
    )
    sub.add_parser(
        "scanner", formatter_class=ArgumentDefaultsHelpFormatter, description=d
    )


def main(args, conf):
    """Validate the scanner configuration, initialize the state file and
    start measuring."""
    if conf.getint("scanner", "measurement_threads") < 1:
        # The check rejects values below 1, so 1 itself is valid.
        fail_hard("Number of measurement threads must be at least 1")

    min_dl = conf.getint("scanner", "min_download_size")
    max_dl = conf.getint("scanner", "max_download_size")
    if max_dl < min_dl:
        fail_hard(
            "Max download size %d cannot be smaller than min %d",
            max_dl,
            min_dl,
        )

    os.makedirs(conf.getpath("paths", "datadir"), exist_ok=True)

    state = State(conf.getpath("paths", "state_fname"))
    state["scanner_started"] = now_isodt_str()
    # Generate an unique identifier for each scanner
    if "uuid" not in state:
        state["uuid"] = str(uuid.uuid4())

    run_speedtest(args, conf)