1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Simple DNS server comparison benchmarking tool.
17
18Designed to assist system administrators in selection and prioritization.
19"""
20
21__author__ = 'tstromberg@google.com (Thomas Stromberg)'
22
23import Queue
24import random
25import threading
26import time
27
28
29class BenchmarkThreads(threading.Thread):
30  """Benchmark multiple nameservers in parallel."""
31
32  def __init__(self, input_queue, results_queue):
33    threading.Thread.__init__(self)
34    self.input = input_queue
35    self.results = results_queue
36
37  def run(self):
38    """Iterate over the queue, processing each item."""
39    while not self.input.empty():
40      try:
41        (ns, request_type, hostname) = self.input.get_nowait()
42        # We've moved this here so that it's after all of the random selection goes through.
43        if '__RANDOM__' in hostname:
44          hostname = hostname.replace('__RANDOM__', str(random.random() * random.randint(0, 99999)))
45
46        (response, duration, error_msg) = ns.TimedRequest(request_type, hostname)
47        self.results.put((ns, request_type, hostname, response, duration, error_msg))
48      except Queue.Empty:
49        return
50
51
52class Benchmark(object):
53  """The main benchmarking class."""
54
55  def __init__(self, nameservers, run_count=2, query_count=30, thread_count=1,
56               status_callback=None):
57    """Constructor.
58
59    Args:
60      nameservers: a list of NameServerData objects
61      run_count: How many test-runs to perform on each nameserver (int)
62      query_count: How many DNS lookups to test in each test-run (int)
63      thread_count: How many benchmark threads to use (int)
64      status_callback: Where to send msg() updates to.
65    """
66    self.query_count = query_count
67    self.run_count = run_count
68    self.thread_count = thread_count
69    self.nameservers = nameservers
70    self.results = {}
71    self.status_callback = status_callback
72
73  def msg(self, msg, **kwargs):
74    if self.status_callback:
75      self.status_callback(msg, **kwargs)
76
77  def _CheckForIndexHostsInResults(self, test_records):
78    """Check if we have already tested index hosts.
79
80    Args:
81      test_records: List of tuples of test records (type, record)
82
83    Returns:
84      A list of results that have already been tested
85      A list of records that still need to be tested.
86    """
87    needs_test = []
88    index_results = {}
89    for test in test_records:
90      matched = False
91      for ns in self.results:
92        for result in self.results[ns][0]:
93          hostname, request_type = result[0:2]
94          if (request_type, hostname) == test:
95            matched = True
96            index_results.setdefault(ns, []).append(result)
97            # So that we don't include the second results if duplicates exist.
98            break
99      if not matched:
100        needs_test.append(test)
101    return (index_results, needs_test)
102
103  def RunIndex(self, test_records):
104    """Run index tests using the same mechanism as a standard benchmark."""
105    if not test_records:
106      print 'No records to test.'
107      return None
108
109    index_results, pending_tests = self._CheckForIndexHostsInResults(test_records)
110    run_results = self._SingleTestRun(pending_tests)
111    for ns in run_results:
112      index_results.setdefault(ns, []).extend(run_results[ns])
113    return index_results
114
115  def Run(self, test_records=None):
116    """Run all test runs for all nameservers."""
117
118    # We don't want to keep stats on how many queries timed out from previous runs.
119    for ns in self.nameservers.enabled:
120      ns.ResetErrorCounts()
121
122    for _ in range(self.run_count):
123      run_results = self._SingleTestRun(test_records)
124      for ns in run_results:
125        self.results.setdefault(ns, []).append(run_results[ns])
126    return self.results
127
128  def _SingleTestRun(self, test_records):
129    """Manage and execute a single test-run on all nameservers.
130
131    We used to run all tests for a nameserver, but the results proved to be
132    unfair if the bandwidth was suddenly constrained. We now run a test on
133    each server before moving on to the next.
134
135    Args:
136      test_records: a list of tuples in the form of (request_type, hostname)
137
138    Returns:
139      results: A dictionary of tuples, keyed by nameserver.
140    """
141    input_queue = Queue.Queue()
142    shuffled_records = {}
143    results = {}
144    # Pre-compute the shuffled test records per-nameserver to avoid thread
145    # contention.
146    for ns in self.nameservers.enabled:
147      random.shuffle(test_records)
148      shuffled_records[ns.ip] = list(test_records)
149
150    # Feed the pre-computed records into the input queue.
151    for i in range(len(test_records)):
152      for ns in self.nameservers.enabled:
153        (request_type, hostname) = shuffled_records[ns.ip][i]
154        input_queue.put((ns, request_type, hostname))
155
156    results_queue = self._LaunchBenchmarkThreads(input_queue)
157    errors = []
158    while results_queue.qsize():
159      (ns, request_type, hostname, response, duration, error_msg) = results_queue.get()
160      if error_msg:
161        duration = ns.timeout * 1000
162        errors.append((ns, error_msg))
163      results.setdefault(ns, []).append((hostname, request_type, duration, response, error_msg))
164
165    for (ns, error_msg) in errors:
166      self.msg('Error querying %s: %s' % (ns, error_msg))
167    return results
168
169  def _LaunchBenchmarkThreads(self, input_queue):
170    """Launch and manage the benchmark threads."""
171    results_queue = Queue.Queue()
172    expected_total = input_queue.qsize()
173    threads = []
174    for unused_thread_num in range(0, self.thread_count):
175      thread = BenchmarkThreads(input_queue, results_queue)
176      thread.start()
177      threads.append(thread)
178
179    query_count = expected_total / len(self.nameservers.enabled)
180    status_message = ('Sending %s queries to %s servers' %
181                      (query_count, len(self.nameservers.enabled)))
182    while results_queue.qsize() != expected_total:
183      self.msg(status_message, count=results_queue.qsize(), total=expected_total)
184      time.sleep(0.5)
185
186    self.msg(status_message, count=results_queue.qsize(), total=expected_total)
187    for thread in threads:
188      thread.join()
189    return results_queue
190
191