1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes to work with bunches of nameservers."""
16
17__author__ = 'tstromberg@google.com (Thomas Stromberg)'
18
19import datetime
20import operator
21import os
22import pickle
23import Queue
24import random
25import subprocess
26import sys
27import tempfile
28import threading
29import time
30
31# 3rd party libraries
32import dns.resolver
33
34import conn_quality
35import nameserver
36import addr_util
37import util
38
# Multiplied by the requested server count to decide how many servers are
# kept around while the secondary cache is being built.
NS_CACHE_SLACK = 2
# Embedded in the secondary cache filename.
CACHE_VER = 4

# Timeout multipliers applied to preferred and system nameservers.
PREFERRED_HEALTH_TIMEOUT_MULTIPLIER = 1.5
SYSTEM_HEALTH_TIMEOUT_MULTIPLIER = 2

# Defaults used when pruning distant secondary nameservers.
TOO_DISTANT_MULTIPLIER = 4.75
MAX_NEARBY_SERVERS = 400

# If we can't ping more than this, go into slowmode.
MIN_PINGABLE_PERCENT = 20
MIN_HEALTHY_PERCENT = 10
SLOW_MODE_THREAD_COUNT = 6

# Windows behaves in unfortunate ways if too many threads are specified
MAX_SANE_THREAD_COUNT = 32 if sys.platform == 'win32' else 100

# Slow down for these, as they are used for timing.
MAX_INITIAL_HEALTH_THREAD_COUNT = 35
61
62
def InternalNameServers():
  """Return the list of DNS server IPs used by the host, via dnspython.

  Returns:
    The nameserver list reported by dns.resolver.Resolver(). When that list
    is empty (or could not be obtained) on a Windows platform, the result of
    WinIpConfigNameServers() is returned instead.
  """
  try:
    servers = dns.resolver.Resolver().nameservers
  except Exception:
    # Best effort only, but do not trap KeyboardInterrupt/SystemExit the way
    # a bare except would.
    print("Unable to get list of internal DNS servers.")
    servers = []

  # dnspython does not always get things right on Windows, particularly in
  # versions with right-to-left languages. Fall back to ipconfig /all
  if not servers and sys.platform.startswith('win'):
    return WinIpConfigNameServers()
  return servers
76
def WinIpConfigNameServers():
  """Return a list of DNS servers via ipconfig (Windows only).

  Returns:
    A list of the IPs extracted from every `ipconfig /all` output line that
    contains the text 'DNS Servers'.
  """
  servers = []
  proc = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
  # communicate() reads the pipe to EOF, closes it and waits for the child,
  # so neither the process nor the file descriptor is left dangling.
  output = proc.communicate()[0]
  # splitlines() copes with both '\r\n' and '\n' terminated output.
  for line in output.splitlines():
    if 'DNS Servers' in line:
      print("ipconfig: %s" % line)
      servers.extend(addr_util.ExtractIPsFromString(line))
  return servers
86
class OutgoingUdpInterception(Exception):
  """Error carrying a description of intercepted outgoing DNS traffic."""

  def __init__(self, value):
    # Keep the description on the instance; __str__ reports it.
    self.value = value

  def __str__(self):
    return '%r' % (self.value,)
94
95
class TooFewNameservers(Exception):
  """Error carrying a description of why too few nameservers remain."""

  def __init__(self, value):
    # Keep the description on the instance; __str__ reports it.
    self.value = value

  def __str__(self):
    description = self.value
    return repr(description)
103
104
class ThreadFailure(Exception):
  """Error raised without any arguments when worker threads cannot be used."""

  def __init__(self):
    # Takes no message; initialise the base class with empty args.
    Exception.__init__(self)
109
110
class QueryThreads(threading.Thread):
  """Worker thread that runs one kind of check on queued nameservers.

  Items are taken from the input queue until it is empty or stop() has been
  called. For every item one entry is put on the results queue: the outcome
  of the check, or None when a nameserver involved is disabled.
  """

  def __init__(self, input_queue, results_queue, action_type, checks=None):
    """Store the queues, the action to perform and its optional checks."""
    threading.Thread.__init__(self)
    self.halt = False
    self.checks = checks
    self.action_type = action_type
    self.input = input_queue
    self.results = results_queue

  def stop(self):
    """Ask run() to exit before it picks up another item."""
    self.halt = True

  def _CheckNameServer(self, ns):
    """Run the single-nameserver check selected by action_type."""
    action = self.action_type
    if action == 'ping':
      return ns.CheckHealth(fast_check=True)
    if action == 'health':
      return ns.CheckHealth(sanity_checks=self.checks)
    if action == 'final':
      return ns.CheckHealth(sanity_checks=self.checks, final_check=True)
    if action == 'port_behavior':
      return ns.CheckHealth(sanity_checks=self.checks, port_check=True)
    if action == 'censorship':
      return ns.CheckCensorship(self.checks)
    if action == 'store_wildcards':
      return ns.StoreWildcardCache()
    raise ValueError('Invalid action type: %s' % self.action_type)

  def run(self):
    """Iterate over the queue, processing each item."""
    while not (self.halt or self.input.empty()):
      try:
        item = self.input.get_nowait()
      except Queue.Empty:
        # Another worker emptied the queue between empty() and get_nowait().
        return

      # wildcard_check is special: its items are tuples of two nameservers,
      # everything else only has a single nameserver.
      pairwise = self.action_type == 'wildcard_check'
      if pairwise:
        (ns, other_ns) = item
        skip = ns.disabled or other_ns.disabled
      else:
        ns = item
        skip = ns.disabled

      if skip:
        outcome = None
      elif pairwise:
        outcome = (ns, ns.TestSharedCache(other_ns))
      else:
        outcome = self._CheckNameServer(ns)
      self.results.put(outcome)
163
164
165class NameServers(list):
166
167  def __init__(self, nameservers, global_servers=None, regional_servers=None,
168               num_servers=1,
169               include_internal=False, threads=5, status_callback=None,
170               timeout=5, health_timeout=5, ping_timeout=1,
171               skip_cache_collusion_checks=False,
172               ipv6_only=False):
173    self.seen_ips = set()
174    self.seen_names = set()
175    self.timeout = timeout
176    self.ping_timeout = ping_timeout
177    self.num_servers = num_servers
178    self.requested_health_timeout = health_timeout
179    self.skip_cache_collusion_checks = skip_cache_collusion_checks
180    self.health_timeout = health_timeout
181    self.min_healthy_percent = MIN_HEALTHY_PERCENT
182    self.status_callback = status_callback
183    self.cache_dir = tempfile.gettempdir()
184    self.ipv6_only = ipv6_only
185    if threads > MAX_SANE_THREAD_COUNT:
186      self.msg('Lowing thread count from %s to sane limit of %s' %
187               (threads, MAX_SANE_THREAD_COUNT))
188      self.thread_count = MAX_SANE_THREAD_COUNT
189    else:
190      self.thread_count = threads
191
192    self.ApplyCongestionFactor()
193    super(NameServers, self).__init__()
194    self.system_nameservers = InternalNameServers()
195    if nameservers:
196      for (ip, name) in nameservers:
197        if (not name or name == ip) and ip in self.system_nameservers:
198          name = 'SYS-%s' % ip
199        self.AddServer(ip, name, is_custom=True, is_preferred=True)
200
201    if global_servers:
202      for (ip, name) in global_servers:
203        self.AddServer(ip, name, is_global=True, is_preferred=True)
204
205    if regional_servers:
206      for (ip, name) in regional_servers:
207        self.AddServer(ip, name)
208
209    if include_internal:
210      for ip in self.system_nameservers:
211        self.AddServer(ip, 'SYS-%s' % ip, is_preferred=True)
212
213  @property
214  def preferred(self):
215    return [x for x in self if x.is_preferred]
216
217  @property
218  def enabled_preferred(self):
219    return [x for x in self.preferred if not x.disabled]
220
221  @property
222  def secondaries(self):
223    return [x for x in self.SortByNearest() if not x.is_preferred]
224
225  @property
226  def enabled_secondaries(self):
227    return [x for x in self.secondaries if not x.disabled]
228
229  @property
230  def enabled(self):
231    return [x for x in self if not x.disabled]
232
233  @property
234  def check_average(self):
235    return util.CalculateListAverage([x.check_average for x in self if not x.disabled])
236
237  def msg(self, msg, count=None, total=None, **kwargs):
238    if self.status_callback:
239      self.status_callback(msg, count=count, total=total, **kwargs)
240    else:
241      print '%s [%s/%s]' % (msg, count, total)
242
243  def AddServer(self, ip, name, is_preferred=False, is_global=False, is_regional=False,
244                is_custom=False):
245    """Add a server to the list given an IP and name."""
246
247    ns = nameserver.NameServer(ip, name=name, preferred=is_preferred)
248    if self.ipv6_only and not ns.is_ipv6:
249      return
250
251    ns.is_custom = is_custom
252
253    if ip in self.system_nameservers:
254      ns.is_system = True
255      ns.is_preferred = True
256      ns.is_custom = False
257      ns.system_position = self.system_nameservers.index(ip)
258
259    ns.is_global = is_global
260    ns.is_regional = is_regional
261    ns.timeout = self.timeout
262    ns.ping_timeout = self.ping_timeout
263
264    # Give them a little extra love for the road.
265    if ns.is_system:
266      ns.health_timeout = self.health_timeout * SYSTEM_HEALTH_TIMEOUT_MULTIPLIER
267      ns.ping_timeout = self.ping_timeout * SYSTEM_HEALTH_TIMEOUT_MULTIPLIER
268    elif ns.is_preferred:
269      ns.health_timeout = self.health_timeout * PREFERRED_HEALTH_TIMEOUT_MULTIPLIER
270      ns.ping_timeout = self.ping_timeout * PREFERRED_HEALTH_TIMEOUT_MULTIPLIER
271    else:
272      ns.health_timeout = self.health_timeout
273    self.append(ns)
274
275  def append(self, ns):
276    """Add a nameserver to the list, guaranteeing uniqueness."""
277    if ns.ip in self.seen_ips:
278      # Perhaps we already know of the IP, but do not have a proper name for it
279      if ns.name != ns.ip:
280        for existing_ns in self:
281          if existing_ns.ip == ns.ip and existing_ns.name == existing_ns.ip:
282            existing_ns.name = ns.name
283      return None
284
285    # Add an identifier to the name if necessary.
286    if ns.name in self.seen_names:
287      for identifier in range(2, 10):
288        new_name = ''.join((ns.name, '-', str(identifier)))
289        if new_name not in self.seen_names:
290          ns.name = new_name
291          break
292
293#    print "Adding: %s [%s]" % (ns.name, ns.ip)
294    super(NameServers, self).append(ns)
295    self.seen_ips.add(ns.ip)
296    self.seen_names.add(ns.name)
297
298  def ApplyCongestionFactor(self):
299    # If we are only benchmarking one server, don't bother w/ congestion checking.
300
301    if len(self) == 1:
302      return
303    cq = conn_quality.ConnectionQuality(status_callback=self.status_callback)
304    (intercepted, unused_level, multiplier) = cq.CheckConnectionQuality()[0:3]
305    if intercepted:
306      raise OutgoingUdpInterception(
307          'Your router or Internet Service Provider appears to be intercepting '
308          'and redirecting all outgoing DNS requests. This means you cannot '
309          'benchmark or utilize alternate DNS servers. Please adjust your '
310          'router configuration or file a support request with your ISP.'
311      )
312    if multiplier > 1:
313# TODO(tstromberg): Consider retiring this feature for good.
314#      self.timeout *= multiplier
315      self.health_timeout *= multiplier
316      self.ping_timeout *= multiplier
317      self.msg('Applied %.2fX timeout multiplier due to congestion: %2.1f ping, %2.1f health.'
318               % (multiplier, self.ping_timeout, self.health_timeout))
319
320  def InvokeSecondaryCache(self):
321    """Delete secondary ips that do not exist in the cache file."""
322    cached = False
323    if self.cache_dir:
324      cpath = self._SecondaryCachePath()
325      cache_data = self._LoadSecondaryCache(cpath)
326      if cache_data:
327        for ns in self:
328          ns.warnings = set()
329          ns.checks = []
330        cached = True
331        cached_ips = [x.ip for x in cache_data if not x.is_preferred]
332        for ns in list(self.secondaries):
333          if ns.ip not in cached_ips:
334            self.remove(ns)
335    return cached
336
337  def RemoveBrokenServers(self, delete_unwanted=False):
338    """Quietly remove broken servers."""
339    for ns in list(self):
340      if ns.disabled and delete_unwanted and (ns.is_ipv6 or not ns.is_preferred):
341        self.remove(ns)
342
343  def DisableDistantServers(self, multiplier=TOO_DISTANT_MULTIPLIER, max_servers=MAX_NEARBY_SERVERS):
344    """Disable servers who's fastest duration is multiplier * average of best 10 servers."""
345
346    self.RemoveBrokenServers(delete_unwanted=True)
347    secondaries = self.secondaries
348    fastest = [x for x in self.SortByFastest() if not x.disabled ][:10]
349    best_10 = util.CalculateListAverage([x.fastest_check_duration for x in fastest])
350    cutoff = best_10 * multiplier
351    self.msg("Removing secondary nameservers slower than %0.2fms (max=%s)" % (cutoff, max_servers))
352    for idx, ns in enumerate(secondaries):
353      if (ns.fastest_check_duration > cutoff) or idx > max_servers:
354        self.remove(ns)
355
356  def DisableUnwantedServers(self, target_count, delete_unwanted=False):
357    """Given a target count, delete nameservers that we do not plan to test."""
358    self.RemoveBrokenServers(delete_unwanted)
359
360    # Magic secondary mixing algorithm:
361    # - Half of them should be the "nearest" nameservers
362    # - Half of them should be the "fastest average" nameservers
363    preferred_count = len(self.enabled_preferred)
364    secondaries_needed = target_count - preferred_count
365    if secondaries_needed < 1 or not self.secondaries:
366      return
367    nearest_needed = int(secondaries_needed / 2.0)
368
369    if secondaries_needed < 50:
370      self.msg("Picking %s secondary servers to use (%s nearest, %s fastest)" %
371               (secondaries_needed, nearest_needed, secondaries_needed - nearest_needed))
372
373    # Phase two is picking the nearest secondary server
374    secondaries_to_keep = []
375    for ns in self.SortByNearest():
376
377      if not ns.is_preferred and not ns.disabled:
378        if not secondaries_to_keep and secondaries_needed < 15:
379          self.msg('%s appears to be the nearest regional (%0.2fms)' % (ns, ns.fastest_check_duration))
380        secondaries_to_keep.append(ns)
381        if len(secondaries_to_keep) >= nearest_needed:
382          break
383
384    # Phase three is removing all of the slower secondary servers
385    for ns in self.SortByFastest():
386      if not ns.is_preferred and not ns.disabled and ns not in secondaries_to_keep:
387        secondaries_to_keep.append(ns)
388        if len(secondaries_to_keep) >= secondaries_needed:
389          break
390
391    for ns in self.secondaries:
392      if ns not in secondaries_to_keep:
393#        print "REMOVE: Fastest: %0.2f Avg: %0.2f:  %s - %s" % (ns.fastest_check_duration, ns.check_average, ns, ns.checks)
394        self.remove(ns)
395#      else:
396#        print "KEEP  : Fastest: %0.2f Avg: %0.2f:  %s - %s" % (ns.fastest_check_duration, ns.check_average, ns, ns.checks)
397
398  def CheckHealth(self, primary_checks, secondary_checks, cache_dir=None, censor_tests=None):
399    """Filter out unhealthy or slow replica servers."""
400    if len(self) == 1:
401      return None
402
403    if cache_dir:
404      self.cache_dir = cache_dir
405
406    cpath = self._SecondaryCachePath()
407    try:
408      cached = self.InvokeSecondaryCache()
409    except:
410      self.msg('Failed to use secondary cache in [%s]: %s' % (cpath, util.GetLastExceptionString()))
411      cached = False
412    if not cached:
413      self.msg('Building initial DNS cache for %s nameservers (%s threads)' %
414               (len(self), self.thread_count))
415
416    self.PingNameServers()
417    if len(self.enabled) > int(self.num_servers * NS_CACHE_SLACK):
418      self.DisableDistantServers()
419    self.RunHealthCheckThreads(primary_checks)
420    if len(self.enabled) > self.num_servers:
421      self._DemoteSecondaryGlobalNameServers()
422    self.DisableUnwantedServers(target_count=int(self.num_servers * NS_CACHE_SLACK), delete_unwanted=True)
423    if not cached:
424      try:
425        self._UpdateSecondaryCache(cpath)
426      except:
427        self.msg('Failed to save secondary cache in [%s]: %s' % (cpath, util.GetLastExceptionString()))
428
429    if not self.skip_cache_collusion_checks:
430      self.CheckCacheCollusion()
431    self.DisableUnwantedServers(self.num_servers)
432
433    self.RunFinalHealthCheckThreads(secondary_checks)
434    if censor_tests:
435      self.RunCensorshipCheckThreads(censor_tests)
436    else:
437      # If we aren't doing censorship checks, quiet any possible false positives.
438      self._RemoveGlobalWarnings()
439    if not self.enabled:
440      raise TooFewNameservers('None of the nameservers tested are healthy')
441
442  def _RemoveGlobalWarnings(self):
443    """If all nameservers have the same warning, remove it. It's likely false."""
444    ns_count = len(self.enabled)
445    seen_counts = {}
446
447    # No sense in checking for duplicate warnings if we only have one server.
448    if len(self.enabled) == 1:
449      return
450
451    for ns in self.enabled:
452      for warning in ns.warnings:
453        seen_counts[warning] = seen_counts.get(warning, 0) + 1
454
455    for warning in seen_counts:
456      if seen_counts[warning] == ns_count:
457        self.msg('All nameservers have warning: %s (likely a false positive)' % warning)
458        for ns in self.enabled:
459          ns.warnings.remove(warning)
460
461  def _DemoteSecondaryGlobalNameServers(self):
462    """For global nameservers, demote the slower IP to secondary status."""
463    seen = {}
464    for ns in self.SortByFastest():
465      if ns.is_preferred:
466        # TODO(tstromberg): Have a better way of denoting secondary anycast.
467        provider = ns.name.replace('-2', '')
468        if provider in seen and not ns.is_system:
469          faster_ns = seen[provider]
470          self.msg('Making %s the primary anycast - faster than %s by %2.2fms' %
471                   (faster_ns.name_and_node, ns.name_and_node, ns.check_average - faster_ns.check_average))
472          ns.is_preferred = False
473        else:
474          seen[provider] = ns
475
476  def _SecondaryCachePath(self):
477    """Find a usable and unique location to store health results."""
478    secondary_ips = [x.ip for x in self.secondaries]
479    checksum = hash(str(sorted(secondary_ips)))
480    basefile = '.'.join(map(str, ('namebench', CACHE_VER, len(secondary_ips),
481                                  '_'.join(self.system_nameservers),
482                                  self.num_servers,
483                                  self.requested_health_timeout, checksum)))
484    return os.path.join(self.cache_dir, basefile)
485
486  def InvalidateSecondaryCache(self):
487    cpath = self._SecondaryCachePath()
488    if os.path.exists(cpath):
489      self.msg('Removing %s' % cpath)
490      os.unlink(cpath)
491
492  def _LoadSecondaryCache(self, cpath):
493    """Check if our health cache has any good data."""
494    if os.path.exists(cpath) and os.path.isfile(cpath):
495#      self.msg('Loading local server health cache: %s' % cpath)
496      cf = open(cpath, 'r')
497      try:
498        return pickle.load(cf)
499      except EOFError:
500        self.msg('No cached nameserver data found')
501    return False
502
503  def _UpdateSecondaryCache(self, cpath):
504    """Update the cache with our object."""
505    cf = open(cpath, 'w')
506    try:
507      pickle.dump(list(self), cf)
508    except TypeError, exc:
509      self.msg('Could not save cache: %s' % exc)
510
511  def SortByFastest(self):
512    """Return a list of healthy servers in fastest-first order."""
513    return sorted(self, key=operator.attrgetter('check_average'))
514
515  def SortByNearest(self):
516    """Return a list of healthy servers in fastest-first order."""
517    return sorted(self, key=operator.attrgetter('fastest_check_duration'))
518
519  def ResetTestResults(self):
520    """Reset the testng status of all disabled hosts."""
521    return [ns.ResetTestStatus() for ns in self]
522
523  def CheckCacheCollusion(self):
524    """Mark if any nameservers share cache, especially if they are slower."""
525    self.RunWildcardStoreThreads()
526    sleepy_time = 4
527    self.msg("Waiting %ss for TTL's to decrement." % sleepy_time)
528    time.sleep(sleepy_time)
529
530    test_combos = []
531    good_nameservers = [x for x in self.SortByFastest() if not x.disabled]
532    for ns in good_nameservers:
533      for compare_ns in good_nameservers:
534        if ns != compare_ns:
535          test_combos.append((compare_ns, ns))
536
537    results = self.RunCacheCollusionThreads(test_combos)
538    while not results.empty():
539      (ns, shared_ns) = results.get()
540      if shared_ns:
541        ns.shared_with.add(shared_ns)
542        shared_ns.shared_with.add(ns)
543        if ns.disabled or shared_ns.disabled:
544          continue
545
546        if ns.check_average > shared_ns.check_average:
547          slower = ns
548          faster = shared_ns
549        else:
550          slower = shared_ns
551          faster = ns
552
553        if slower.system_position == 0:
554          faster.disabled = 'Shares-cache with current primary DNS server'
555          slower.warnings.add('Replica of %s' % faster.ip)
556        elif slower.is_preferred and not faster.is_preferred:
557          faster.disabled = 'Replica of %s [%s]' % (slower.name, slower.ip)
558          slower.warnings.add('Replica of %s [%s]' % (faster.name, faster.ip))
559        else:
560          diff = slower.check_average - faster.check_average
561          self.msg("Disabling %s - slower replica of %s by %0.1fms." % (slower.name_and_node, faster.name_and_node, diff))
562          slower.disabled = 'Slower replica of %s [%s]' % (faster.name, faster.ip)
563          faster.warnings.add('Replica of %s [%s]' % (slower.name, slower.ip))
564
565  def _LaunchQueryThreads(self, action_type, status_message, items,
566                          thread_count=None, **kwargs):
567    """Launch query threads for a given action type.
568
569    Args:
570      action_type: a string describing an action type to pass
571      status_message: Status to show during updates.
572      items: A list of items to pass to the queue
573      thread_count: How many threads to use (int)
574      kwargs: Arguments to pass to QueryThreads()
575
576    Returns:
577      results_queue: Results from the query tests.
578
579    Raises:
580      TooFewNameservers: If no tested nameservers are healthy.
581    """
582    threads = []
583    input_queue = Queue.Queue()
584    results_queue = Queue.Queue()
585
586    # items are usually nameservers
587    random.shuffle(items)
588    for item in items:
589      input_queue.put(item)
590
591    if not thread_count:
592      thread_count = self.thread_count
593    if thread_count > len(items):
594      thread_count = len(items)
595
596    status_message += ' (%s threads)' % thread_count
597
598    self.msg(status_message, count=0, total=len(items))
599    for _ in range(0, thread_count):
600      thread = QueryThreads(input_queue, results_queue, action_type, **kwargs)
601      try:
602        thread.start()
603      except:
604        self.msg("ThreadingError with %s threads: waiting for completion before retrying." % thread_count)
605        for thread in threads:
606          thread.stop()
607          thread.join()
608        raise ThreadFailure()
609      threads.append(thread)
610
611    while results_queue.qsize() != len(items):
612      self.msg(status_message, count=results_queue.qsize(), total=len(items))
613      time.sleep(0.5)
614
615    self.msg(status_message, count=results_queue.qsize(), total=len(items))
616    for thread in threads:
617      thread.join()
618
619    if not self.enabled:
620      raise TooFewNameservers('None of the %s nameservers tested are healthy' % len(self))
621
622    return results_queue
623
624  def RunCacheCollusionThreads(self, test_combos):
625    """Schedule and manage threading for cache collusion checks."""
626    return self._LaunchQueryThreads('wildcard_check', 'Running cache-sharing checks on %s servers' % len(self.enabled), test_combos)
627
628  def PingNameServers(self):
629    """Quickly ping nameservers to see which are available."""
630    start = datetime.datetime.now()
631    try:
632      results = self._LaunchQueryThreads('ping', 'Checking nameserver availability', list(self.enabled))
633    except ThreadFailure:
634      self.msg("It looks like you couldn't handle %s threads, trying again with %s (slow)" % (self.thread_count, SLOW_MODE_THREAD_COUNT))
635      self.thread_count = SLOW_MODE_THREAD_COUNT
636      self.ResetTestResults()
637      results = self._LaunchQueryThreads('ping', 'Checking nameserver availability', list(self.enabled))
638
639    success_rate = (float(len(self.enabled)) / float(len(self))) * 100
640    if success_rate < MIN_PINGABLE_PERCENT:
641      self.msg('How odd! Only %0.1f percent of name servers were pingable. Trying again with %s threads (slow)'
642               % (success_rate, SLOW_MODE_THREAD_COUNT))
643      self.ResetTestResults()
644      self.thread_count = SLOW_MODE_THREAD_COUNT
645      results = self._LaunchQueryThreads('ping', 'Checking nameserver availability', list(self))
646    if self.enabled:
647      success_rate = (float(len(self.enabled)) / float(len(self))) * 100
648      self.msg('%s of %s servers are available (duration: %s)' %
649               (len(self.enabled), len(self), datetime.datetime.now() - start))
650
651    return results
652
653  def RunHealthCheckThreads(self, checks):
654    """Quickly ping nameservers to see which are healthy."""
655
656    status_msg = 'Running initial health checks on %s servers' % len(self.enabled)
657
658    if self.thread_count > MAX_INITIAL_HEALTH_THREAD_COUNT:
659      thread_count = MAX_INITIAL_HEALTH_THREAD_COUNT
660    else:
661      thread_count = self.thread_count
662
663    try:
664      results = self._LaunchQueryThreads('health', status_msg, list(self.enabled),
665                                         checks=checks, thread_count=thread_count)
666    except ThreadFailure:
667      self.msg("It looks like you couldn't handle %s threads, trying again with %s (slow)" % (thread_count, SLOW_MODE_THREAD_COUNT))
668      self.thread_count = SLOW_MODE_THREAD_COUNT
669      self.ResetTestResults()
670      results = self._LaunchQueryThreads('ping', 'Checking nameserver availability', list(self))
671
672    success_rate = (float(len(self.enabled)) / float(len(self))) * 100
673    if success_rate < self.min_healthy_percent:
674      self.msg('How odd! Only %0.1f percent of name servers are healthy. Trying again with %s threads (slow)'
675               % (success_rate, SLOW_MODE_THREAD_COUNT))
676      self.ResetTestResults()
677      self.thread_count = SLOW_MODE_THREAD_COUNT
678      time.sleep(5)
679      results = self._LaunchQueryThreads('health', status_msg, list(self.enabled),
680                                         checks=checks, thread_count=thread_count)
681    if self.enabled:
682      success_rate = (float(len(self.enabled)) / float(len(self))) * 100
683      self.msg('%s of %s tested name servers are healthy' % (len(self.enabled), len(self)))
684
685    return results
686
687  def RunFinalHealthCheckThreads(self, checks):
688    """Quickly ping nameservers to see which are healthy."""
689    status_msg = 'Running final health checks on %s servers' % len(self.enabled)
690    return self._LaunchQueryThreads('final', status_msg, list(self.enabled), checks=checks)
691
692  def RunCensorshipCheckThreads(self, checks):
693    """Quickly ping nameservers to see which are healthy."""
694    status_msg = 'Running censorship checks on %s servers' % len(self.enabled)
695    return self._LaunchQueryThreads('censorship', status_msg, list(self.enabled), checks=checks)
696
697  def RunPortBehaviorThreads(self):
698    """Get port behavior data."""
699    status_msg = 'Running port behavior checks on %s servers' % len(self.enabled)
700    return self._LaunchQueryThreads('port_behavior', status_msg, list(self.enabled))
701
702  def RunWildcardStoreThreads(self):
703    """Store a wildcard cache value for all nameservers (using threads)."""
704    status_msg = 'Waiting for wildcard cache queries from %s servers' % len(self.enabled)
705    return self._LaunchQueryThreads('store_wildcards', status_msg, list(self.enabled))
706
707