import collections
import datetime
import logging
import random
from urllib.parse import urlparse

import requests
from stem.control import EventType

import sbws.util.stem as stem_utils
from sbws import settings
from sbws.globals import DESTINATION_VERIFY_CERTIFICATE

from ..globals import (
    DELTA_SECONDS_RETRY_DESTINATION,
    FACTOR_INCREMENT_DESTINATION_RETRY,
    MAX_NUM_DESTINATION_FAILURES,
    MAX_SECONDS_RETRY_DESTINATION,
    NUM_DESTINATION_ATTEMPTS_STORED,
)

log = logging.getLogger(__name__)


# This duplicates some code from DestinationList.from_config;
# it should be refactored.
def parse_destinations_countries(conf):
    """Return the destinations' countries as a comma-separated string."""
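    # Illustrative (hypothetical) example: a config that enables a
    # destination "foo" whose section sets country = ZZ and a destination
    # "bar" whose section sets country = AA makes this return "ZZ,AA".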
    destinations_countries = []
    for key in conf["destinations"].keys():
        # Not a destination key
        if key in ["usability_test_interval"]:
            continue
        # The destination is not enabled
        if not conf["destinations"].getboolean(key):
            continue
        destination_section = "destinations.{}".format(key)
        destination_country = conf[destination_section].get("country", None)
        destinations_countries.append(destination_country)
    return ",".join(destinations_countries)


def _parse_verify_option(conf_section):
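    # Illustrative behaviour (hypothetical config values): "verify = true" or
    # "verify = false" are parsed as booleans; any other value, for example
    # "verify = /path/to/ca-bundle.crt", is returned as a string and left for
    # Requests to handle (possibly raising SSLError later).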
    if "verify" not in conf_section:
        return DESTINATION_VERIFY_CERTIFICATE
    try:
        verify = conf_section.getboolean("verify")
    except ValueError:
        log.warning(
            "Currently sbws only supports verify=true/false, not a CA bundle "
            "file. We think %s is not a bool, and thus must be a CA bundle "
            "file. This is supposed to be allowed by the Python Requests "
            "library, but pastly couldn't get it to work in his afternoon "
            "of testing. So we will allow this, but expect Requests to throw "
            "SSLError exceptions later. Have fun!",
            conf_section["verify"],
        )
        return conf_section["verify"]
    if not verify:
        # disable urllib3 warning: InsecureRequestWarning
        import urllib3

        urllib3.disable_warnings()
    return verify


def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
    """
    Connect to **dest** over the given **circ_id** using the given Requests
    **session**. Make sure the destination seems usable. Return True and a
    dictionary of helpful information if we connected and the destination is
    usable.  Otherwise return False and a string stating what the issue is.

    This function has two effects, and which one is the "side effect" depends
    on your goal.

    1. It creates a stream to the destination. It persists in the requests
    library **session** object so future requests use the same stream.
    Therefore, the primary effect of this function could be to open a
    connection to the destination over the given **circ_id** that measurements
    can be made over, which makes the usability checks a side effect (yet an
    important sanity check).

    2. It determines if a destination is usable. Therefore, the primary effect
    of this function could be to perform the usability checks and return the
    results of those checks, which makes the persistent stream a side effect
    that we don't care about.

    As of the time of writing, you'll find that sbws/core/scanner.py uses this
    function in order to obtain that stream over which to perform measurements.
    You will also find in sbws/lib/destination.py (this file) this function
    being used to determine if a Destination is usable. The first relies on the
    persistent stream side effect, the second ignores it (and in fact throws it
    away when it closes the circuit).

    :param dest Destination: the place to which we should connect
    :param circ_id str: the circuit we should connect over
    :param session Session: the Requests library session object to use to make
        the connection.
    :param cont Controller: the Stem library controller controlling Tor
    :param max_dl int: the maximum download size configured for the scanner;
        the destination's content must be at least this large to be usable.
    :returns: True and a dictionary if everything is in order and measurements
        should commence.  False and an error string otherwise.
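
    An illustrative (hypothetical) call; the ``session`` would in practice
    be configured to send its traffic through Tor's SOCKS port::

        usable, extra = connect_to_destination_over_circuit(
            dest, circ_id, session, cont, max_dl)
        if usable:
            content_length = extra["content_length"]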
    """
    assert isinstance(dest, Destination)
    log.debug("Connecting to destination over circuit.")
    # Do not start if sbws is stopping
    if settings.end_event.is_set():
        return False, "Shutting down."
    error_prefix = "When sending HTTP HEAD to {}, ".format(dest.url)
    with stem_utils.stream_building_lock:
        listener = stem_utils.attach_stream_to_circuit_listener(cont, circ_id)
        stem_utils.add_event_listener(cont, listener, EventType.STREAM)
        try:
            head = session.head(dest.url, verify=dest.verify)
        except requests.exceptions.RequestException as e:
            dest.add_failure()
            return False, "Could not connect to {} over circ {} {}: {}".format(
                dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e
            )
        finally:
            stem_utils.remove_event_listener(cont, listener)
    if head.status_code != requests.codes.ok:
        dest.add_failure()
        return (
            False,
            error_prefix + "we expected HTTP code "
            "{} not {}".format(requests.codes.ok, head.status_code),
        )
    if "content-length" not in head.headers:
        dest.add_failure()
        return (
            False,
            error_prefix + "we expect the header Content-Length "
            "to exist in the response",
        )
    content_length = int(head.headers["content-length"])
    if max_dl > content_length:
        dest.add_failure()
        return (
            False,
            error_prefix + "our maximum configured download size "
            "is {} but the content is only {}".format(max_dl, content_length),
        )
    log.debug("Connected to %s over circuit %s", dest.url, circ_id)
    # Any failure connecting to the destination has already called
    # add_failure above. Success can not be recorded at the start, so that
    # consecutive failures can still be detected.
    dest.add_success()
    return True, {"content_length": content_length}


class Destination:
    """Web server from which data is downloaded to measure bandwidth."""

    # NOTE: max_dl and verify should be optional and have defaults
    def __init__(
        self,
        url,
        max_dl,
        verify,
        max_num_failures=MAX_NUM_DESTINATION_FAILURES,
        delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
        max_seconds_between_retries=MAX_SECONDS_RETRY_DESTINATION,
        num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
        factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY,
    ):
        """Initializes the Web server from which the data is downloaded.

        :param str url: Web server data URL to download.
        :param int max_dl: Maximum size of the data to download.
        :param bool verify: Whether or not to verify the TLS certificate.
        :param int max_num_failures: Number of consecutive failures after
            which the destination is no longer considered functional.
        :param int delta_seconds_retry: Initial delay, in seconds, before
            retrying a destination that was not functional.
        :param int max_seconds_between_retries: Cap, in seconds, on the delay
            between retries.
        :param int num_attempts_stored: Number of attempts to store.
        :param int factor_increment_retry: Factor by which to increment the
            delay before trying to use a destination again.
        """
        self._max_dl = max_dl
        u = urlparse(url)
        self._url = u
        self._verify = verify

        # Attributes to decide whether a destination is functional or not.
        self._max_num_failures = max_num_failures
        self._num_attempts_stored = num_attempts_stored
        # Default delay, in seconds, before retrying a destination that was
        # not functional.
        self._default_delta_seconds_retry = delta_seconds_retry
        self._delta_seconds_retry = delta_seconds_retry
        # A cap on the time to wait between destination retries.
        self._max_seconds_between_retries = max_seconds_between_retries
        # Use a deque (FIFO) so that the history does not grow forever and
        # old attempts do not have to be removed explicitly.
        # Store tuples of (timestamp, result), where the result is 1 for
        # success and 0 for failure.
        # Initialize it as if it never failed.
        self._attempts = collections.deque(
            [
                (datetime.datetime.utcnow(), 1),
            ],
            maxlen=self._num_attempts_stored,
        )
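        # For illustration, after a failed attempt followed by a successful
        # one, the deque would look roughly like:
        #   deque([(t0, 1), (t1, 0), (t2, 1)], maxlen=num_attempts_stored)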
        self._factor = factor_increment_retry

    def _last_attempts(self, n=None):
        """Return the last ``n`` attempts at using the destination."""
        # deque does not accept slices;
        # a new deque is returned with the last n items
        # (or fewer if there were fewer).
        return collections.deque(
            self._attempts, maxlen=(n or self._max_num_failures)
        )

    def _are_last_attempts_failures(self, n=None):
        """
        Return True if the destination failed the last ``n`` times it was
        used.
        """
        # Count the number of failures among the last attempts.
        n = n if n else self._max_num_failures
        return [i[1] for i in self._last_attempts(n)].count(
            0
        ) >= self._max_num_failures

    def _increment_time_to_retry(self, factor=None):
        """
        Increment the time before the destination is tried again by a
        ``factor``.
        """
        self._delta_seconds_retry *= factor or self._factor
        if self._delta_seconds_retry > self._max_seconds_between_retries:
            self._delta_seconds_retry = self._max_seconds_between_retries
            log.debug(
                "Incremented the time to try destination %s past the "
                "limit, capping it at %s hours.",
                self.url,
                self._delta_seconds_retry / 60 / 60,
            )
        else:
            log.debug(
                "Incremented the time to try destination %s to %s hours.",
                self.url,
                self._delta_seconds_retry / 60 / 60,
            )

    def _get_last_try_in_seconds_ago(self):
        """
        Return the delta between the last try and now, as positive seconds.
        """
        # Timestamp of the last attempt.
        last_time = self._attempts[-1][0]
        return (datetime.datetime.utcnow() - last_time).total_seconds()

    def _is_last_try_old_enough(self, n=None):
        """
        Return True if the last attempt was more than
        ``_delta_seconds_retry`` seconds ago.
        """
        # If the last attempt is older than _delta_seconds_retry, try again
        return self._get_last_try_in_seconds_ago() > self._delta_seconds_retry

    def is_functional(self):
        """Whether or not connections to the destination are failing.

        Return True if:
            - it did not fail more than ``n`` (by default 3) consecutive
              times, or
            - the last time the destination was tried was more than ``x``
              (by default 3h) seconds ago.
        And False otherwise.

        When the destination is tried again after the consecutive failures,
        the time to try again is incremented and reset as soon as the
        destination does not fail.
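
        For example (assuming the retry factor doubles the delay), after
        3 consecutive failures the destination is retried roughly 3h later,
        then 6h later, then 12h later, until it succeeds or the delay
        reaches the configured cap.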
        """
        # NOTE: does a destination fail because several threads are using
        # it at the same time?
        # If a destination fails for 1 minute and there are 3 threads, the
        # 3 threads will fail.

        # Failed the last X consecutive times
        if self._are_last_attempts_failures():
            # The log here will appear for all the queued relays and
            # threads.
            log.debug(
                "The last %s times the destination %s failed. "
                "It last ran %s seconds ago. "
                "Disabled for %s seconds.",
                self._max_num_failures,
                self.url,
                self._get_last_try_in_seconds_ago(),
                self._delta_seconds_retry,
            )
            log.warning(
                "The last %s times a destination failed. "
                "It last ran %s seconds ago. "
                "Disabled for %s seconds. "
                "Please add more destinations or increase the maximum "
                "number of consecutive failures in the configuration.",
                self._max_num_failures,
                self._get_last_try_in_seconds_ago(),
                self._delta_seconds_retry,
            )
            # If it has not been used for a while and the last attempt was
            # long enough ago, try again.
            if self._is_last_try_old_enough():
                log.debug(
                    "The destination %s was not tried for %s seconds, "
                    "it is going to be tried again.",
                    self.url,
                    self._get_last_try_in_seconds_ago(),
                )
                # Set the next time to retry higher, in case this attempt fails
                self._increment_time_to_retry()
                return True
            return False
        # Reset the time to retry to the initial value
        # in case it was incremented.
        self._delta_seconds_retry = self._default_delta_seconds_retry
        return True

    def add_failure(self, dt=None):
        self._attempts.append((dt or datetime.datetime.utcnow(), 0))

    def add_success(self, dt=None):
        self._attempts.append((dt or datetime.datetime.utcnow(), 1))

    @property
    def url(self):
        return self._url.geturl()

    @property
    def verify(self):
        return self._verify

    @property
    def hostname(self):
        return self._url.hostname

    @property
    def port(self):
        p = self._url.port
        scheme = self._url.scheme
        if p is None:
            if scheme == "http":
                p = 80
            elif scheme == "https":
                p = 443
            else:
                assert None, "Unreachable. Unknown scheme {}".format(scheme)
        assert p is not None
        return p

    @staticmethod
    def from_config(conf_section, max_dl, number_threads):
        assert "url" in conf_section
        url = conf_section["url"]
        verify = _parse_verify_option(conf_section)
        try:
            # Because once a destination fails, all the threads that are
            # using it at that moment will fail too, multiply by the number
            # of threads.
            max_num_failures = (
                conf_section.getint("max_num_failures")
                or MAX_NUM_DESTINATION_FAILURES
            )
        except ValueError:
            # If the operator did not set a valid number, use the default.
            max_num_failures = MAX_NUM_DESTINATION_FAILURES

        max_num_failures *= number_threads
        return Destination(url, max_dl, verify, max_num_failures)


class DestinationList:
    def __init__(self, conf, dests, circuit_builder, relay_list, controller):
        assert len(dests) > 0
        for dest in dests:
            assert isinstance(dest, Destination)
        self._rng = random.SystemRandom()
        self._cont = controller
        self._cb = circuit_builder
        self._rl = relay_list
        self._all_dests = dests

    @property
    def functional_destinations(self):
        return [d for d in self._all_dests if d.is_functional()]

    @staticmethod
    def from_config(conf, circuit_builder, relay_list, controller):
        assert "destinations" in conf
        section = conf["destinations"]
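        # For illustration, the configuration is expected to look roughly
        # like this (hypothetical destination name and URL):
        #   [destinations]
        #   foo = on
        #
        #   [destinations.foo]
        #   url = https://example.com/sbws.bin
        #   verify = true
        #   country = ZZ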
        dests = []
        for key in section.keys():
            if key in ["usability_test_interval"]:
                continue
            if not section.getboolean(key):
                log.debug("%s is disabled; not loading it", key)
                continue
            dest_sec = "destinations.{}".format(key)
            assert dest_sec in conf  # validate_config should require this
            log.debug("Loading info for destination %s", key)
            dests.append(
                Destination.from_config(
                    conf[dest_sec],
                    # Multiply by the number of threads since all the threads
                    # will fail at the same time.
                    conf.getint("scanner", "max_download_size"),
                    conf.getint("scanner", "measurement_threads"),
                )
            )
        if len(dests) < 1:
            msg = (
                "No enabled destinations in config. Please see "
                '"docs/source/man_sbws.ini.rst" or "man 5 sbws.ini" '
                "for help adding and enabling destinations."
            )
            return None, msg
        return (
            DestinationList(
                conf, dests, circuit_builder, relay_list, controller
            ),
            "",
        )
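
    # Illustrative (hypothetical) usage:
    #   destinations, error_msg = DestinationList.from_config(
    #       conf, circuit_builder, relay_list, controller)
    #   if destinations is None:
    #       log.error(error_msg)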

    def next(self):
        """
        Return the next destination that should be used in a measurement.
        """
        # Do not perform usability tests here: every measurement already
        # shows whether a destination is usable or not, and functionality
        # should depend on the number of consecutive failures.
        # This removes the need for an extra lock for every measurement.
        # Do not change the order of the destinations, just return a
        # destination.
        # random.choice raises IndexError with an empty list.
        if self.functional_destinations:
            return self._rng.choice(self.functional_destinations)
        else:
            return None