1import copy
2import logging
3import random
4from datetime import datetime, timedelta
5from threading import Lock
6
7from stem import ControllerError, DescriptorUnavailable, Flag
8from stem.descriptor.router_status_entry import RouterStatusEntryV3
9from stem.descriptor.server_descriptor import ServerDescriptor
10
11from ..globals import (
12    MAX_RECENT_CONSENSUS_COUNT,
13    MAX_RECENT_PRIORITY_LIST_COUNT,
14    MAX_RECENT_PRIORITY_RELAY_COUNT,
15    MEASUREMENTS_PERIOD,
16)
17from ..util import timestamps
18
19log = logging.getLogger(__name__)
20
21
def valid_after_from_network_statuses(network_statuses):
    """Return the consensus Valid-After datetime taken from the ``document``
    attribute of the first ``stem.descriptor.RouterStatusEntryV3`` that
    carries one.

    :param list network_statuses: router status entries to inspect.

    returns datetime: the Valid-After of the consensus, or the current UTC
        time (without microseconds) when none of the entries provides it.
    """
    for status_entry in network_statuses:
        consensus_document = getattr(status_entry, "document", None)
        if not consensus_document:
            continue
        valid_after = getattr(consensus_document, "valid_after", None)
        if valid_after:
            return valid_after
    return datetime.utcnow().replace(microsecond=0)
37
38
class Relay:
    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        """
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay.
        :param cont: active and valid stem Tor controller connection
        :param ns: router status (``RouterStatusEntryV3``) of the relay, when
            already known; otherwise it is fetched via the controller.
        :param desc: server descriptor (``ServerDescriptor``) of the relay,
            when already known; otherwise it is fetched via the controller.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        """
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Bug fix: the attribute must always exist; without this,
                # any later ``_from_desc`` call would raise AttributeError.
                self._desc = None
        self.relay_in_recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT
        )
        # Use the same timestamp as the consensus, so that it can be tested
        # that the relay was in a consensus using this timestamp.
        # Note that this doesn't change the number of consensus the relay was
        # in.
        self.update_relay_in_recent_consensus(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None if there is
        no descriptor or it lacks the attribute."""
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status, or None if there is no
        router status or it lacks the attribute."""
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns("nickname")

    @property
    def fingerprint(self):
        return self._from_ns("fingerprint")

    @property
    def flags(self):
        return self._from_ns("flags")

    @property
    def exit_policy(self):
        return self._from_desc("exit_policy")

    @property
    def average_bandwidth(self):
        return self._from_desc("average_bandwidth")

    @property
    def burst_bandwidth(self):
        return self._from_desc("burst_bandwidth")

    @property
    def observed_bandwidth(self):
        return self._from_desc("observed_bandwidth")

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.
        Returns None when the router status has no bandwidth value.
        """
        # Fetch once instead of twice; the value may be 0, which is valid.
        bandwidth = self._from_ns("bandwidth")
        if bandwidth is not None:
            return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only votes, unmeasured appears in consensus
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is coming from bwauth measurements or not.
        return self._from_ns("is_unmeasured")

    @property
    def address(self):
        return self._from_ns("address")

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None when it is not available.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc("ed25519_master_key")
        if key is None:
            return None
        return key.rstrip("=")

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status.
        """
        network_status_document = self._from_ns("document")
        if network_status_document:
            return getattr(network_status_document, "valid_after", None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Datetime of the most recent consensus this relay was seen in."""
        return self.relay_in_recent_consensus.last()

    def update_relay_in_recent_consensus(self, timestamp=None):
        """Record that the relay was seen in a consensus at ``timestamp``."""
        self.relay_in_recent_consensus.update(timestamp)

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self.relay_in_recent_consensus)

    def can_exit_to_port(self, port, strict=False):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port or False otherwise.

        If ``strict`` is true, it only returns the exits that can exit to all
        IPs and that port.

        The exits that are IPv6 only or IPv4 but rejecting some public networks
        will return false.
        On July 2020, there were 67 out of 1095 exits like this.

        If ``strict`` is false, it returns any exit that can exit to some
        public IPs and that port.

        Note that the EXIT flag exists when the relay can exit to 443 **and**
        80. Currently all Web servers are using 443, so it would not be needed
        to check the EXIT flag too, using this function.

        """
        assert isinstance(port, int)
        # If we didn't get the descriptor, there isn't an exit policy.
        # When the attribute is gotten in getattr(self._desc, "exit_policy"),
        # is possible that stem's _input_rules is None and raises an exception
        # (#29899):
        #   File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port  # noqa
        #     if not self.exit_policy:
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            if self.exit_policy:
                # Using `strip_private` to ignore reject rules to private
                # networks.
                # When ``strict`` is true, We could increase the chances that
                # the exit can exit via IPv6 too (``exit_policy_v6``). However,
                # in theory that is only known using microdescriptors.
                return self.exit_policy.strip_private().can_exit_to(
                    port=port, strict=strict
                )
        except TypeError:
            return False
        return False

    def is_exit_not_bad_allowing_port(self, port, strict=False):
        """True when the relay has the EXIT flag, lacks the BADEXIT flag and
        its policy accepts exiting to ``port`` (see ``can_exit_to_port``)."""
        return (
            Flag.BADEXIT not in self.flags
            and Flag.EXIT in self.flags
            and self.can_exit_to_port(port, strict)
        )

    def increment_relay_recent_measurement_attempt(self):
        """
        Increment The number of times that a relay has been queued
        to be measured.

        It is call from :func:`~sbws.core.scaner.main_loop`.
        """
        self.relay_recent_measurement_attempt.update()

    @property
    def relay_recent_measurement_attempt_count(self):
        return len(self.relay_recent_measurement_attempt)

    def increment_relay_recent_priority_list(self):
        """
        The number of times that a relay is "prioritized" to be measured.

        It is call from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start counting
        self.relay_recent_priority_list.update()

    @property
    def relay_recent_priority_list_count(self):
        return len(self.relay_recent_priority_list)

    # XXX: tech-debt: replace `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Update this relay server descriptor (from the consensus)."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and the timestamp.
    def update_router_status(self, router_status):
        """Update this relay router status (from the consensus)."""
        self._ns = router_status
277
278
class RelayList:
    """Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    """

    def __init__(
        self,
        args,
        conf,
        controller,
        measurements_period=MEASUREMENTS_PERIOD,
        state=None,
    ):
        """
        :param args: command line arguments (unused here, kept for the
            callers' construction convention).
        :param conf: sbws configuration (unused here, same reason).
        :param controller: active and valid stem Tor controller connection.
        :param int measurements_period: seconds for which measurements are
            kept.
        :param state: optional ``state`` object to persist timestamp lists.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are keep.
        self._measurements_period = measurements_period
        self._recent_measurement_attempt = timestamps.DateTimeSeq(
            [],
            MAX_RECENT_PRIORITY_RELAY_COUNT,
            state,
            "recent_measurement_attempt",
        )
        # Start with 0 for the min bw for our second hops
        self._exit_min_bw = 0
        self._non_exit_min_bw = 0
        self._refresh()

    def _need_refresh(self):
        """True when the last consensus is more than one hour old."""
        # New consensuses happen every hour.
        return datetime.utcnow() >= self.last_consensus_timestamp + timedelta(
            seconds=60 * 60
        )

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        return self._recent_consensus.last()

    @property
    def relays(self):
        """Return the current list of :class:`Relay`, refreshing it first
        when the consensus is stale (see ``_need_refresh``)."""
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
            log.debug(
                "We need to refresh our list of relays. "
                "Going to wait for lock."
            )
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug(
                    "We got the lock. Now to see if we still "
                    "need to refresh."
                )
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
                    log.debug("Yup we need to refresh our relays. Doing so.")
                    self._refresh()
                else:
                    log.debug(
                        "No we don't need to refresh our relays. "
                        "It was done by someone else."
                    )
            log.debug("Giving back the lock for refreshing relays.")
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list get updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

    def random_relay(self):
        """Return one relay chosen uniformly at random (CSPRNG)."""
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        """Relays whose router status includes ``flag``."""
        return [r for r in self.relays if flag in r.flags]

    def _relays_without_flag(self, flag):
        """Relays whose router status does not include ``flag``."""
        return [r for r in self.relays if flag not in r.flags]

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        network_statuses = c.get_network_statuses()
        new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses])
        log.debug(
            "Number of relays in the current consensus: %d.",
            len(new_relays_dict),
        )

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._recent_consensus.update(timestamp)

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        relays = copy.deepcopy(self._relays)
        for r in relays:
            if r.fingerprint in new_relays_dict.keys():
                # If a relay in the previous consensus and is in the current
                # one, update its timestamp, router status and descriptor.
                fp = r.fingerprint
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
                r.update_relay_in_recent_consensus(timestamp)
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                    # Bug fix: without this, `descriptor` would be unbound
                    # below and raise NameError after the exception.
                    descriptor = None
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)
                # And remove it from the new consensus dict, as it has
                # already added to the new list.
                new_relays_dict.pop(fp)

            # In #30727, the relay that is not in the current consensus but is
            # not "old", was added to the new list of relays too.
            # In #40037 we think it should not be measured, as it might cause
            # many circuit errors. It's already added to the generator.
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
        for fp, ns in new_relays_dict.items():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)

        days = self._measurements_period / (60 * 60 * 24)
        log.debug(
            "Previous number of relays being measured %d", len(self._relays)
        )
        log.debug(
            "Number of relays not in the in the consensus in the last "
            "%d days: %d.",
            days,
            num_old_relays,
        )
        log.debug(
            "Number of relays to measure with the current consensus: " "%d",
            len(new_relays),
        )
        return new_relays

    def _refresh(self):
        """Fetch a fresh relay list and recompute derived values."""
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info(
            "Number of consensuses obtained in the last %s days: %s.",
            int(self._measurements_period / 24 / 60 / 60),
            self.recent_consensus_count,
        )

        # Calculate minimum bandwidth value for 2nd hop after we refreshed
        # our available relays.
        self._calculate_min_bw_second_hop()

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._recent_consensus)

    def exits_not_bad_allowing_port(self, port, strict=False):
        """Exits without the BADEXIT flag whose policy accepts ``port``."""
        return [
            r
            for r in self.exits
            if r.is_exit_not_bad_allowing_port(port, strict)
        ]

    def increment_recent_measurement_attempt(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is call from :func:`~sbws.core.scaner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        self._recent_measurement_attempt.update()

    @property
    def recent_measurement_attempt_count(self):
        return len(self._recent_measurement_attempt)

    def _calculate_min_bw_second_hop(self):
        """
        Calculates the minimum bandwidth for both exit and non-exit relays
        chosen as a second hop by picking the lowest bandwidth value available
        from the top 75% of the respective category.
        """
        # Sort our sets of candidates according to bw, lowest amount first.
        # It's okay to keep things simple for the calculation and go over all
        # exits, including badexits.
        # NOTE(review): `consensus_bandwidth` can be None for unlisted
        # relays, which would make `sorted` raise TypeError — confirm
        # whether that can happen here.
        exit_candidates = sorted(
            self.exits, key=lambda r: r.consensus_bandwidth
        )
        non_exit_candidates = sorted(
            self.non_exits, key=lambda r: r.consensus_bandwidth
        )
        # We know the bandwidth is sorted from least to most. Dividing the
        # length of the available relays by 4 gives us the position of the
        # relay with the lowest bandwidth from the top 75%. We do this both
        # for our exit and non-exit candidates.
        # Bug fix: guard against an empty exit list (e.g. a test network),
        # which would otherwise raise IndexError.
        if exit_candidates:
            pos = int(len(exit_candidates) / 4)
            self._exit_min_bw = exit_candidates[pos].consensus_bandwidth
        pos = int(len(non_exit_candidates) / 4)
        # when there are not non-exits in a test network
        if pos:
            self._non_exit_min_bw = non_exit_candidates[
                pos
            ].consensus_bandwidth

    def exit_min_bw(self):
        """Minimum consensus bandwidth for exit second hops (Bytes)."""
        return self._exit_min_bw

    def non_exit_min_bw(self):
        """Minimum consensus bandwidth for non-exit second hops (Bytes)."""
        return self._non_exit_min_bw
549