1""" Measure the relays. """
2import concurrent.futures
3import logging
4import os
5import queue
6import random
7import signal
8import sys
9import threading
10import time
11import traceback
12import uuid
13from argparse import ArgumentDefaultsHelpFormatter
14
15import sbws.util.requests as requests_utils
16import sbws.util.stem as stem_utils
17from sbws.globals import HTTP_GET_HEADERS, SOCKET_TIMEOUT, fail_hard
18
19from .. import settings
20from ..lib.circuitbuilder import GapsCircuitBuilder as CB
21from ..lib.destination import (
22    DestinationList,
23    connect_to_destination_over_circuit,
24)
25from ..lib.heartbeat import Heartbeat
26from ..lib.relaylist import RelayList
27from ..lib.relayprioritizer import RelayPrioritizer
28from ..lib.resultdump import (
29    ResultDump,
30    ResultError,
31    ResultErrorCircuit,
32    ResultErrorDestination,
33    ResultErrorSecondRelay,
34    ResultErrorStream,
35    ResultSuccess,
36)
37from ..util.state import State
38from ..util.timestamp import now_isodt_str
39
# Random source backed by the operating system, used in this module to choose
# the byte ranges to request and the second hop of the circuits.
rng = random.SystemRandom()
# Logger of this module.
log = logging.getLogger(__name__)
# Declare the objects that manage the threads global so that sbws can exit
# gracefully at any time.
# They are None until they are assigned; `stop_threads` joins `rd.thread` and
# closes `controller`.
rd = None
controller = None

# Message logged before the tracebacks of unexpected errors, asking to report
# them.
FILLUP_TICKET_MSG = """Something went wrong.
Please create an issue at
https://gitlab.torproject.org/tpo/network-health/sbws/-/issues with this
traceback."""
51
52
def stop_threads(signal, frame, exit_code=0):
    """Stop the helper threads and exit the process.

    It is registered as the SIGTERM handler and it is also called directly
    by ``main_loop`` when running in a testing network.

    :param signal: the signal number passed by the ``signal`` module. It is
        not used. Inside this function the name hides the ``signal`` module.
    :param frame: the stack frame passed by the ``signal`` module, not used.
    :param int exit_code: the status passed to ``sys.exit``.
    :raises SystemExit: always, with **exit_code**.
    """
    log.debug("Stopping sbws.")
    # Avoid new threads to start.
    settings.set_end_event()
    # `rd` and `controller` are None until they are assigned. If the signal
    # arrives before that, there is nothing to stop and the process must
    # still exit instead of failing with an AttributeError.
    if rd is not None:
        # Stop ResultDump thread
        rd.thread.join()
    if controller is not None:
        # Stop Tor thread
        controller.close()
    sys.exit(exit_code)
63
64
# Register `stop_threads` as the handler of SIGTERM.
signal.signal(signal.SIGTERM, stop_threads)
66
67
def dumpstacks():
    """Log, at critical level, the current stack of every thread.

    The message asking to report the problem is logged first, then, for each
    frame returned by ``sys._current_frames``, the name and the identifier of
    its thread followed by its formatted stack.
    """
    log.critical(FILLUP_TICKET_MSG)
    names_by_ident = {t.ident: t.name for t in threading.enumerate()}
    frames = sys._current_frames()
    for ident in frames:
        # Threads that are not known to `threading` get an empty name.
        name = names_by_ident.get(ident, "")
        log.critical("Thread: %s(%d)", name, ident)
        stack_lines = traceback.format_stack(frames[ident])
        log.critical("Traceback: %s", "".join(stack_lines))
76
77
def sigint_handler(signum=None, frame=None):
    """Open the Python debugger when SIGINT is received.

    The ``signal`` module calls the handlers with two positional arguments,
    so both have to be accepted even when they are not used. They are
    optional to keep working the calls without arguments.

    :param signum: the signal number, not used.
    :param frame: the stack frame at the moment of the signal, not used.
    """
    import pdb

    pdb.set_trace()
82
83
# Register `sigint_handler` as the handler of SIGINT.
signal.signal(signal.SIGINT, sigint_handler)
85
86
def timed_recv_from_server(session, dest, byte_range):
    """Request the **byte_range** from the URL at **dest**. If successful,
    return True and the time it took to download. Otherwise return False and an
    exception.

    :param session: object whose ``get`` method performs the HTTP request.
    :param dest: destination, its ``url`` and ``verify`` attributes are used.
    :param str byte_range: value of the ``Range`` header to send.
    :returns tuple: ``(True, seconds)`` or ``(False, exception)``.
    """

    start_time = time.time()
    # Build the headers of this request in a new dictionary instead of
    # storing the range in the module-level HTTP_GET_HEADERS. That dictionary
    # is shared by all the measurement threads, which could overwrite each
    # other's range between setting it and sending the request.
    headers = dict(HTTP_GET_HEADERS, Range=byte_range)
    # - response.elapsed "measures the time taken between sending the first
    #   byte of the request and finishing parsing the headers.
    #   It is therefore unaffected by consuming the response content"
    #   If this mean that the content has arrived, elapsed could be used to
    #   know the time it took.
    try:
        # headers are merged with the session ones, not overwritten.
        session.get(dest.url, headers=headers, verify=dest.verify)
    # All `requests` exceptions could be caught with
    # `requests.exceptions.RequestException`, but it seems that `requests`
    # does not catch all the ssl exceptions and urllib3 doesn't seem to have
    # a base exception class.
    except Exception as e:
        log.debug(e)
        return False, e
    end_time = time.time()
    return True, end_time - start_time
111
112
def get_random_range_string(content_length, size):
    """
    Build the value of an HTTP ``Range`` header that selects **size** random
    consecutive bytes of a file of **content_length** bytes.

    With content_length 100 and size 10 the possible results go from
    'bytes=0-9' to 'bytes=90-99'.
    """
    assert size <= content_length
    # The last position where a range of **size** bytes still fits.
    last_start = content_length - size
    # `range` excludes its end, so add 1 to be able to choose `last_start`.
    first_byte = rng.choice(range(last_start + 1))
    # In the HTTP header both ends are included, hence the -1.
    last_byte = first_byte + size - 1
    # Both ends are indexes, so they must be lower than the length.
    assert last_byte < content_length
    return f"bytes={first_byte}-{last_byte}"
135
136
def measure_rtt_to_server(session, conf, dest, content_length):
    """Measure several end-to-end RTTs with small HTTP requests.

    The requests are made over a circuit and stream that should already
    exist. The number of requests is the ``num_rtts`` option and their size
    the ``min_download_size`` option, both in the ``scanner`` section.

    :returns tuple: ``(rtts, None)``, where rtts is the list of RTTs in
        seconds, when all the requests succeed. ``(None, exception)`` as soon
        as one request fails.

    """
    request_size = conf.getint("scanner", "min_download_size")
    num_requests = conf.getint("scanner", "num_rtts")
    rtts = []
    for _ in range(num_requests):
        log.debug("Measuring RTT to %s", dest.url)
        byte_range = get_random_range_string(content_length, request_size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if ok:
            # outcome is an RTT
            assert isinstance(outcome, (float, int))
            rtts.append(outcome)
            continue
        # outcome is an exception
        log.debug(
            "While measuring the RTT to %s we hit an exception "
            "(does the webserver support Range requests?): %s",
            dest.url,
            outcome,
        )
        return None, outcome
    return rtts, None
167
168
def measure_bandwidth_to_server(session, conf, dest, content_length):
    """Download random ranges from **dest** until enough results are kept.

    The loop ends when ``num_downloads`` results have been kept or when the
    end event is set. After every download the amount to request next is
    recalculated from the time the download took.

    :returns tuple: ``(results, None)``, where results is a list of
        dictionaries with the keys ``duration`` and ``amount``, or
        ``(None, exception)`` as soon as one download fails.

    """
    num_downloads = conf.getint("scanner", "num_downloads")
    amount = conf.getint("scanner", "initial_read_request")
    min_dl = conf.getint("scanner", "min_download_size")
    max_dl = conf.getint("scanner", "max_download_size")
    download_times = {
        name: conf.getfloat("scanner", "download_" + name)
        for name in ("toofast", "min", "target", "max")
    }
    results = []
    while True:
        if len(results) >= num_downloads or settings.end_event.is_set():
            break
        assert amount >= min_dl
        assert amount <= max_dl
        byte_range = get_random_range_string(content_length, amount)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is an exception
            log.debug(
                "While measuring the bandwidth to %s we hit an exception "
                "(does the webserver support Range requests?): %s",
                dest.url,
                outcome,
            )
            return None, outcome
        # outcome is a download time
        assert isinstance(outcome, (float, int))
        requested_maximum = amount == max_dl
        if _should_keep_result(requested_maximum, outcome, download_times):
            results.append({"duration": outcome, "amount": amount})
        amount = _next_expected_amount(
            amount, outcome, download_times, min_dl, max_dl
        )
    return results, None
212
213
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    """
    Sbws builds two hop circuits. Given the **relay** to measure with
    destination **dest**, pick a second relay that is or is not an exit
    according to **is_exit**.

    :param relay: the relay to measure.
    :param dest: the destination, its ``port`` is used to select the exits.
    :param rl: the relay list that provides the candidates and the minimum
        bandwidths.
    :param cont: the controller passed to
        ``stem_utils.only_relays_with_bandwidth``.
    :param bool is_exit: whether the second relay has to be an exit.
    :returns: the chosen relay or None when there is no candidate.
    """
    # 40041: Instead of using exits that can exit to all IPs, to ensure that
    # they can make requests to the Web servers, try with the exits that
    # allow some IPs, since there're more.
    # In the case that a concrete exit can't exit to the Web server, it is not
    # a problem since the relay will be measured in the next loop with other
    # random exit.
    candidates = (
        rl.exits_not_bad_allowing_port(dest.port) if is_exit else rl.non_exits
    )
    # In the case the helper is an exit, the entry could be an exit too
    # (#40041), so ensure the helper is not the same as the entry, likely to
    # happen in a test network.
    if is_exit:
        candidates = [
            c for c in candidates if c.fingerprint != relay.fingerprint
        ]
    # Check that there are candidates only after removing the relay itself:
    # when it was the only candidate the list becomes empty here and picking
    # the fastest candidate below would fail with an IndexError.
    if not len(candidates):
        return None
    min_relay_bw = rl.exit_min_bw() if is_exit else rl.non_exit_min_bw()
    log.debug(
        "Picking a 2nd hop to measure %s from %d choices. is_exit=%s",
        relay.nickname,
        len(candidates),
        is_exit,
    )
    for min_bw_factor in [2, 1.75, 1.5, 1.25, 1]:
        # We might have a really slow/new relay. Try to measure it properly by
        # using only relays with or above our calculated min_relay_bw (see:
        # _calculate_min_bw_second_hop() in relaylist.py).
        min_bw = max(relay.consensus_bandwidth * min_bw_factor, min_relay_bw)
        new_candidates = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=min_bw
        )
        if len(new_candidates) > 0:
            chosen = rng.choice(new_candidates)
            log.debug(
                "Found %d candidate 2nd hops with at least %sx the bandwidth "
                "of %s. Returning %s (bw=%s).",
                len(new_candidates),
                min_bw_factor,
                relay.nickname,
                chosen.nickname,
                chosen.consensus_bandwidth,
            )
            return chosen
    # No candidate passed any of the filters, fall back to the candidate
    # with the highest consensus bandwidth (the first one in case of tie).
    chosen = max(candidates, key=lambda r: r.consensus_bandwidth)
    log.debug(
        "Didn't find any 2nd hops at least as fast as %s (bw=%s). It's "
        "probably really fast. Returning %s (bw=%s), the fastest "
        "candidate we have.",
        relay.nickname,
        relay.consensus_bandwidth,
        chosen.nickname,
        chosen.consensus_bandwidth,
    )
    return chosen
281
282
def error_no_helper(relay, dest, our_nick=""):
    """Log that no second relay could be selected and build the error.

    :returns list: a list with a single ``ResultErrorSecondRelay``, with an
        empty circuit, for **relay** and the URL of **dest**.
    """
    reason = "Unable to select a second relay"
    template = reason + " to help measure %s (%s)"
    log.debug(template, relay.fingerprint, relay.nickname)
    failure = ResultErrorSecondRelay(
        relay, [], dest.url, our_nick, msg=reason
    )
    return [failure]
291
292
def create_path_relay(relay, dest, rl, cb, relay_as_entry=True):
    """Choose a helper relay and build the two hop path to measure **relay**.

    When **relay_as_entry** is True, the relay is the first hop and the
    helper, which then has to be an exit, the second one. Otherwise the
    helper is the first hop, it does not have to be an exit, and the relay is
    the exit.

    :returns: the tuple ``(fingerprints, nicknames, exit_policy)``, where the
        exit policy is the one of the last hop, or the list with one error
        returned by ``error_no_helper`` when no helper is found.
    """
    # The helper `is_exit` arg would be better called `helper_as_exit`.
    helper = _pick_ideal_second_hop(
        relay, dest, rl, cb.controller, is_exit=relay_as_entry
    )
    if not helper:
        return error_no_helper(relay, dest)
    hops = (relay, helper) if relay_as_entry else (helper, relay)
    circ_fps = [hop.fingerprint for hop in hops]
    nicknames = [hop.nickname for hop in hops]
    exit_hop = hops[-1]
    return circ_fps, nicknames, exit_hop.exit_policy
312
313
def error_no_circuit(circ_fps, nicknames, reason, relay, dest, our_nick):
    """Log that the circuit could not be built and build the error.

    :returns list: a list with a single ``ResultErrorCircuit`` for **relay**,
        the path **circ_fps** and the URL of **dest**, with **reason** as
        message.
    """
    template = "Could not build circuit with path %s (%s): %s "
    log.debug(template, circ_fps, nicknames, reason)
    failure = ResultErrorCircuit(
        relay, circ_fps, dest.url, our_nick, msg=reason
    )
    return [failure]
324
325
def measure_relay(args, conf, destinations, cb, rl, relay):
    """
    Select a Web server, a relay to build the circuit,
    build the circuit and measure the bandwidth of the given relay.

    :param args: the command line arguments, not used in this function.
    :param conf: the configuration, from where the scanner nickname, the HTTP
        timeout and the measurement options are read.
    :param destinations: object whose ``next()`` returns the Web server to
        use or a false value when there is not any functional.
    :param cb: the circuit builder, used to build and close the circuits.
    :param rl: the relay list, used to pick the second relay.
    :param relay: the relay to measure.
    :return: a list with a single Result object, either a success or an
        error, or None when the session can not be created because the
        scanner is stopping.

    """
    log.debug("Measuring %s %s", relay.nickname, relay.fingerprint)
    our_nick = conf["scanner"]["nickname"]
    s = requests_utils.make_session(
        cb.controller, conf.getfloat("general", "http_timeout")
    )
    # Probably because the scanner is stopping.
    if s is None:
        if settings.end_event.is_set():
            return None
        else:
            # In future refactor this should be returned from the make_session
            reason = "Unable to get proxies."
            log.debug(
                reason + " to measure %s %s", relay.nickname, relay.fingerprint
            )
            return [
                ResultError(relay, [], "", our_nick, msg=reason),
            ]
    # Pick a destination
    dest = destinations.next()
    # When there're no any functional destinations.
    if not dest:
        # NOTE: When there're still functional destinations but only one of
        # them fail, the error will be included in `ResultErrorStream`.
        # Since this is being executed in a thread, the scanner can not
        # be stop here, but the `end_event` signal can be set so that the
        # main thread stop the scanner.
        # It might be useful to store the fact that the destinations fail,
        # so store here the error, and set the signal once the error is stored
        # (in `resultump`).
        log.critical(
            "There are not any functional destinations.\n"
            "It is recommended to set several destinations so that "
            "the scanner can continue if one fails."
        )
        reason = "No functional destinations"
        # Resultdump will set end_event after storing the error
        return [
            ResultErrorDestination(relay, [], "", our_nick, msg=reason),
        ]

    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    # Instead of ensuring that the relay can exit to all IPs, try first with
    # the relay as an exit, if it can exit to some IPs.
    if relay.is_exit_not_bad_allowing_port(dest.port):
        r = create_path_relay(relay, dest, rl, cb, relay_as_entry=False)
    else:
        r = create_path_relay(relay, dest, rl, cb)
    # When `error_no_helper` is triggered because a helper is not found, what
    # can happen in test networks with very few relays, it returns a list with
    # the error.
    if len(r) == 1:
        return r
    circ_fps, nicknames, exit_policy = r

    # Build the circuit
    circ_id, reason = cb.build_circuit(circ_fps)

    # If the circuit failed to get created, bad luck, it will be created again
    # with other helper.
    # Here we won't have the case that an exit tried to build the circuit as
    # entry and failed (#40029), cause not checking that it can exit all IPs.
    if not circ_id:
        return error_no_circuit(
            circ_fps, nicknames, reason, relay, dest, our_nick
        )
    log.debug(
        "Built circuit with path %s (%s) to measure %s (%s)",
        circ_fps,
        nicknames,
        relay.fingerprint,
        relay.nickname,
    )
    # Make a connection to the destination
    is_usable, usable_data = connect_to_destination_over_circuit(
        dest, circ_id, s, cb.controller, dest._max_dl
    )

    # In the case that the relay was used as an exit, but could not exit
    # to the Web server, try again using it as entry, to avoid that it would
    # always fail when there's only one Web server.
    if not is_usable and relay.is_exit_not_bad_allowing_port(dest.port):
        log.debug(
            "Exit %s (%s) that can't exit all ips, with exit policy %s, failed"
            " to connect to %s via circuit %s (%s). Reason: %s. Trying again "
            "with it as entry.",
            relay.fingerprint,
            relay.nickname,
            exit_policy,
            dest.url,
            circ_fps,
            nicknames,
            usable_data,
        )
        # The circuit that could not reach the destination is not going to
        # be used again. Close it before `circ_id` is replaced by the new
        # circuit (or before returning an error), otherwise nothing would
        # close it.
        cb.close_circuit(circ_id)
        r = create_path_relay(relay, dest, rl, cb)
        if len(r) == 1:
            return r
        circ_fps, nicknames, exit_policy = r
        circ_id, reason = cb.build_circuit(circ_fps)
        if not circ_id:
            log.info(
                "Exit %s (%s) that can't exit all ips, failed to create "
                " circuit as entry: %s (%s).",
                relay.fingerprint,
                relay.nickname,
                circ_fps,
                nicknames,
            )
            return error_no_circuit(
                circ_fps, nicknames, reason, relay, dest, our_nick
            )

        log.debug(
            "Built circuit with path %s (%s) to measure %s (%s)",
            circ_fps,
            nicknames,
            relay.fingerprint,
            relay.nickname,
        )
        is_usable, usable_data = connect_to_destination_over_circuit(
            dest, circ_id, s, cb.controller, dest._max_dl
        )
    if not is_usable:
        log.debug(
            "Failed to connect to %s to measure %s (%s) via circuit "
            "%s (%s). Exit policy: %s. Reason: %s.",
            dest.url,
            relay.fingerprint,
            relay.nickname,
            circ_fps,
            nicknames,
            exit_policy,
            usable_data,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=usable_data
            ),
        ]
    assert is_usable
    assert "content_length" in usable_data
    # FIRST: measure RTT
    rtts, reason = measure_rtt_to_server(
        s, conf, dest, usable_data["content_length"]
    )
    if rtts is None:
        log.debug(
            "Unable to measure RTT for %s (%s) to %s via circuit "
            "%s (%s): %s",
            relay.fingerprint,
            relay.nickname,
            dest.url,
            circ_fps,
            nicknames,
            reason,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=str(reason)
            ),
        ]
    # SECOND: measure bandwidth
    bw_results, reason = measure_bandwidth_to_server(
        s, conf, dest, usable_data["content_length"]
    )
    if bw_results is None:
        log.debug(
            "Failed to measure %s (%s) via circuit %s (%s) to %s. Exit"
            " policy: %s. Reason: %s.",
            relay.fingerprint,
            relay.nickname,
            circ_fps,
            nicknames,
            dest.url,
            exit_policy,
            reason,
        )
        cb.close_circuit(circ_id)
        return [
            ResultErrorStream(
                relay, circ_fps, dest.url, our_nick, msg=str(reason)
            ),
        ]
    cb.close_circuit(circ_id)
    # Finally: store result
    log.debug(
        "Success measurement for %s (%s) via circuit %s (%s) to %s",
        relay.fingerprint,
        relay.nickname,
        circ_fps,
        nicknames,
        dest.url,
    )
    return [
        ResultSuccess(rtts, bw_results, relay, circ_fps, dest.url, our_nick),
    ]
533
534
def dispatch_worker_thread(*a, **kw):
    """Call ``measure_relay`` unless the measurement can not succeed.

    If at the point where the relay is actually going to be measured there
    are not any functional destinations or the `end_event` is set, do not
    try to start measuring the relay, since it will fail anyway.

    :param a: the positional arguments for ``measure_relay``. The third one
        is expected to be the destinations.
    :param kw: the keyword arguments for ``measure_relay``.
    :returns: None when the relay is not measured, otherwise what
        ``measure_relay`` returns.
    """
    try:
        # a[2] is the argument `destinations`
        functional_destinations = a[2].functional_destinations
    # In case the arguments or the method change, catch the possible exceptions
    # but ignore here that there are not destinations.
    # AttributeError is what a third argument without the attribute raises.
    except (IndexError, TypeError, AttributeError):
        log.debug("Wrong argument or attribute.")
        functional_destinations = True
    if not functional_destinations or settings.end_event.is_set():
        return None
    return measure_relay(*a, **kw)
550
551
def _should_keep_result(did_request_maximum, result_time, download_times):
    """Tell whether a download that took **result_time** should be kept.

    When the maximum amount was requested, the result is kept if it took
    less than ``download_times["max"]``. Otherwise it also has to have taken
    at least ``download_times["min"]``.

    :returns bool: True to keep the result, False to discard it.
    """
    keep_msg = "Keeping measurement time {:.2f}".format(result_time)
    if did_request_maximum:
        keep = result_time < download_times["max"]
    else:
        keep = (
            result_time >= download_times["min"]
            and result_time < download_times["max"]
        )
    if keep:
        log.debug(keep_msg)
        return True
    if did_request_maximum:
        detail = " We requested the maximum amount allowed."
    else:
        detail = ""
    log.debug("Not keeping result time %f.%s", result_time, detail)
    return False
578
579
580def _next_expected_amount(
581    expected_amount, result_time, download_times, min_dl, max_dl
582):
583    if result_time < download_times["toofast"]:
584        # Way too fast, greatly increase the amount we ask for
585        expected_amount = int(expected_amount * 5)
586    elif (
587        result_time < download_times["min"]
588        or result_time >= download_times["max"]
589    ):
590        # As long as the result is between min/max, keep the expected amount
591        # the same. Otherwise, adjust so we are aiming for the target amount.
592        expected_amount = int(
593            expected_amount * download_times["target"] / result_time
594        )
595    # Make sure we don't request too much or too little
596    expected_amount = max(min_dl, expected_amount)
597    expected_amount = min(max_dl, expected_amount)
598    return expected_amount
599
600
def measurement_writer(result_dump, measurement):
    """Put **measurement** in the queue of **result_dump**.

    The call blocks at most 3 seconds waiting for room in the queue. When
    the queue is still full after that time, the measurement is lost, a
    warning is logged and the scanner continues working.
    """
    # Since result_dump thread is calling queue.get() every second,
    # the queue should be full for only 1 second.
    results_queue = result_dump.queue
    try:
        results_queue.put(measurement, timeout=3)
    except queue.Full:
        log.warning(
            "The queue with measurements is full, when adding %s.\n"
            "It is possible that the thread that get them to "
            "write them to the disk (ResultDump.enter) is stalled.",
            measurement,
        )
615
616
def log_measurement_exception(target, exception):
    """Log an exception that finished a measurement thread.

    Nothing else is logged when the end event is set, since exceptions are
    expected while sbws is stopping.

    :param target: the relay that was being measured, its ``fingerprint``
        and ``nickname`` are logged.
    :param Exception exception: the exception raised in the thread.
    """
    # Use the logger instead of writing to the standard output.
    log.debug("in result putter error")
    if settings.end_event.is_set():
        return
    # The only object that can be here if there is not any uncatched
    # exception is stem.SocketClosed when stopping sbws
    # An exception here means that the worker thread finished.
    log.warning(FILLUP_TICKET_MSG)
    # To print the traceback that happened in the thread, not here in
    # the main process.
    thread_traceback = "".join(
        traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
    )
    log.warning(thread_traceback)
    # `str.join` accepts a single iterable, so the relay and the traceback
    # are passed as logging arguments instead of being joined.
    log.debug(
        "%s %s %s", target.fingerprint, target.nickname, thread_traceback
    )
643
644
def main_loop(
    args,
    conf,
    controller,
    relay_list,
    circuit_builder,
    result_dump,
    relay_prioritizer,
    destinations,
):
    """Queue and run the measurements of the relays, loop after loop.

    A new loop is started while the end event is not set. In every loop:

    - the fingerprints of the relay list are registered in the heartbeat,
    - a ``ThreadPoolExecutor`` is created with as many workers as the
      ``measurement_threads`` option says, and one ``Future`` running
      ``dispatch_worker_thread`` is submitted for every relay returned by
      the relay prioritizer,
    - ``process_completed_futures`` obtains the results of the futures as
      they complete and ``wait_futures_completed`` waits for the remaining
      ones,
    - the heartbeat message is printed.

    In a testing network, ``stop_threads`` is called after the first loop.

    """
    log.info("Started the main loop to measure the relays.")
    hbeat = Heartbeat(conf.getpath("paths", "state_fname"))

    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        num_relays = 0
        started_at = time.time()

        # Register relay fingerprints to the heartbeat module
        hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
        num_threads = conf.getint("scanner", "measurement_threads")
        pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads, thread_name_prefix="measurer"
        )
        with pool as executor:
            log.info("In the executor, queue all future measurements.")
            # With futures there is no need for a callback, the function
            # that handles a result is just called when the result of the
            # future is obtained.
            pending_results = {}
            for target in relay_prioritizer.best_priority():
                future = executor.submit(
                    dispatch_worker_thread,
                    args,
                    conf,
                    destinations,
                    circuit_builder,
                    relay_list,
                    target,
                )
                pending_results[future] = target
            # Now the pool has queued all the relays and `pending_results`
            # maps every `Future` to its relay.
            log.debug("Measurements queued.")

            # Each target relay_recent_measurement_attempt is incremented in
            # `process_completed_futures` as well as hbeat measured
            # fingerprints.
            num_relays = len(pending_results)
            # Without a callback, `result_dump` has to be passed here so
            # that the measurements are written when they finish.
            process_completed_futures(
                executor, hbeat, result_dump, pending_results
            )
            wait_futures_completed(pending_results)

        # Print the heartbeat message
        hbeat.print_heartbeat_message()

        finished_at = time.time()
        elapsed_minutes = (finished_at - started_at) / 60
        # At this point, we know the relays that were queued to be
        # measured.
        log.debug(
            "Attempted to measure %s relays in %s minutes",
            num_relays,
            elapsed_minutes,
        )
        # In a testing network, exit after first loop
        in_testing_network = controller.get_conf("TestingTorNetwork") == "1"
        if in_testing_network:
            log.info("In a testing network, exiting after the first loop.")
            # Threads should be closed nicely in some refactor
            stop_threads(signal.SIGTERM, None)
743
744
def process_completed_futures(executor, hbeat, result_dump, pending_results):
    """Handle the measurement of every relay as soon as it finishes.

    For every ``Future`` that completes, the measurement attempt is counted
    in its relay and the relay fingerprint is registered in the heartbeat.
    Then the result of the future is passed to ``measurement_writer``, which
    puts it in the ``ResultDump`` queue.

    If obtaining the result raises an exception, it is passed to
    ``log_measurement_exception`` instead, and the memory usage and the
    stacks of all the threads are logged.

    :param executor: the executor running the futures, used here as a
        context manager.
    :param hbeat: the heartbeat where measured fingerprints are registered.
    :param result_dump: passed to ``measurement_writer``.
    :param dict pending_results: the futures as keys, their relays as
        values.

    """
    total = remaining = len(pending_results)
    with executor:
        completed = concurrent.futures.as_completed(pending_results)
        for done_future in completed:
            target = pending_results[done_future]
            # 40023, disable to decrease state.dat json lines
            # relay_list.increment_recent_measurement_attempt()
            target.increment_relay_recent_measurement_attempt()

            # Register this measurement to the heartbeat module
            hbeat.register_measured_fpr(target.fingerprint)
            log.debug(
                "Future measurement for target %s (%s) is done: %s",
                target.fingerprint,
                target.nickname,
                done_future.done(),
            )
            try:
                measurement = done_future.result()
            except Exception as error:
                log_measurement_exception(target, error)
                import psutil

                process = psutil.Process(os.getpid())
                log.warning(process.memory_full_info())
                available_bytes = psutil.virtual_memory().available
                log.warning(
                    "Memory available %s MB.", available_bytes / 1024 ** 2
                )
                dumpstacks()
            else:
                log.info("Measurement ready: %s" % (measurement))
                measurement_writer(result_dump, measurement)
            # `pending_results` keeps all the initially queued futures, so
            # count here the one that has been handled in this iteration.
            remaining -= 1
            log.info(
                "Pending measurements: %s out of %s: ", remaining, total
            )
804
805
def wait_futures_completed(pending_results):
    """Wait for last futures to finish, before starting new loop.

    It waits at most ``SOCKET_TIMEOUT + 10`` seconds. The relays of the
    futures that were cancelled, or that did not finish in that time, are
    logged, followed by the stacks of all the threads.

    :param dict pending_results: the ``Future`` objects as keys and the
        relays being measured as values.
    """
    log.info("Wait for any remaining measurements.")
    done, not_done = concurrent.futures.wait(
        pending_results,
        timeout=SOCKET_TIMEOUT + 10,  # HTTP timeout is 10
        return_when=concurrent.futures.ALL_COMPLETED,
    )
    log.info("Completed futures: %s", len(done))
    # `done` and `not_done` only contain futures, the relay of each of them
    # has to be obtained from `pending_results`.
    cancelled = [f for f in done if f.cancelled()]
    if cancelled:
        log.warning("Cancelled futures: %s", len(cancelled))
        for future in cancelled:
            log.debug(pending_results[future].fingerprint)
        # The stacks are the same for all the futures, dump them once.
        dumpstacks()
    if not_done:
        log.warning("Not completed futures: %s", len(not_done))
        for future in not_done:
            log.debug(pending_results[future].fingerprint)
        dumpstacks()
827
828
def run_speedtest(args, conf):
    """Create every object the scanner needs and hand over to the main loop.

    In order, this function:

    - launches or connects to Tor and keeps the controller in the module
      global ``controller``;
    - stores the Tor version in the state file and initializes the HTTP
      headers with the scanner nickname, the scanner uuid and that version;
    - creates the relay list, the circuit builder, the result dump (kept in
      the module global ``rd``) and the relay prioritizer;
    - builds the destination list from the configuration, failing hard with
      the returned error message when there are no destinations;
    - calls ``main_loop`` with all of the above. A keyboard interrupt or any
      other exception raised by it is logged and followed by a call to
      ``dumpstacks()``.

    :param args: the command line arguments, passed on to the created
        objects.
    :param conf: the configuration, queried with ``get``, ``getint`` and
        ``getpath``.

    """
    global rd, controller

    controller = stem_utils.launch_or_connect_to_tor(conf)

    # The state is opened here only because conf is not global (yet).
    state_path = conf.getpath("paths", "state_fname")
    state = State(state_path)
    # XXX: tech-debt: obtaining the controller and writing the state should
    # live in their own function, so that the stored tor version can be unit
    # tested.
    # The tor version is stored every time the scanner starts.
    state["tor_version"] = str(controller.get_version())
    # The HTTP headers must be initialized only once.
    nickname = conf.get("scanner", "nickname")
    settings.init_http_headers(nickname, state["uuid"], state["tor_version"])

    # RelayList receives the data period as an extra argument.
    data_period = conf.getint("general", "data_period")
    relay_list = RelayList(args, conf, controller, data_period, state)
    circuit_builder = CB(args, conf, controller, relay_list)
    rd = ResultDump(args, conf)
    prioritizer = RelayPrioritizer(args, conf, relay_list, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, circuit_builder, relay_list, controller
    )
    if not destinations:
        fail_hard(error_msg)

    try:
        main_loop(
            args,
            conf,
            controller,
            relay_list,
            circuit_builder,
            rd,
            prioritizer,
            destinations,
        )
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        dumpstacks()
    except Exception as e:
        log.exception(e)
        dumpstacks()
882
883
def gen_parser(sub):
    """Register the ``scanner`` subcommand.

    :param sub: the subparsers action (as returned by
        ``ArgumentParser.add_subparsers``) to add the ``scanner`` parser to.
    """
    description = (
        "The scanner side of sbws. "
        "This should be run on a well-connected machine on the Internet "
        "with a healthy amount of spare bandwidth. "
        "This continuously builds circuits, measures relays, "
        "and dumps results into a datadir, commonly found in ~/.sbws"
    )
    sub.add_parser(
        "scanner",
        description=description,
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
894
895
def main(args, conf):
    """Validate the scanner configuration, prepare the state and run it.

    Fails hard when the configured number of measurement threads is lower
    than 1 or when the maximum download size is smaller than the minimum.
    Otherwise it creates the data directory if needed, records in the state
    file when the scanner started, generates the scanner uuid the first time
    and calls ``run_speedtest``.

    :param args: the command line arguments, passed on to ``run_speedtest``.
    :param conf: the configuration, queried with ``getint`` and ``getpath``.
    """
    # A single measurement thread is accepted, only 0 or less is rejected.
    if conf.getint("scanner", "measurement_threads") < 1:
        fail_hard("Number of measurement threads must be at least 1")

    min_dl = conf.getint("scanner", "min_download_size")
    max_dl = conf.getint("scanner", "max_download_size")
    if max_dl < min_dl:
        fail_hard(
            "Max download size %d cannot be smaller than min %d",
            max_dl,
            min_dl,
        )

    os.makedirs(conf.getpath("paths", "datadir"), exist_ok=True)

    state = State(conf.getpath("paths", "state_fname"))
    state["scanner_started"] = now_isodt_str()
    # Generate a unique identifier for each scanner, only if the state does
    # not have one yet.
    if "uuid" not in state:
        state["uuid"] = str(uuid.uuid4())

    run_speedtest(args, conf)
918