1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16"""Simple DNS server comparison benchmarking tool. 17 18Designed to assist system administrators in selection and prioritization. 19""" 20 21__author__ = 'tstromberg@google.com (Thomas Stromberg)' 22 23import Queue 24import random 25import threading 26import time 27 28 29class BenchmarkThreads(threading.Thread): 30 """Benchmark multiple nameservers in parallel.""" 31 32 def __init__(self, input_queue, results_queue): 33 threading.Thread.__init__(self) 34 self.input = input_queue 35 self.results = results_queue 36 37 def run(self): 38 """Iterate over the queue, processing each item.""" 39 while not self.input.empty(): 40 try: 41 (ns, request_type, hostname) = self.input.get_nowait() 42 # We've moved this here so that it's after all of the random selection goes through. 43 if '__RANDOM__' in hostname: 44 hostname = hostname.replace('__RANDOM__', str(random.random() * random.randint(0, 99999))) 45 46 (response, duration, error_msg) = ns.TimedRequest(request_type, hostname) 47 self.results.put((ns, request_type, hostname, response, duration, error_msg)) 48 except Queue.Empty: 49 return 50 51 52class Benchmark(object): 53 """The main benchmarking class.""" 54 55 def __init__(self, nameservers, run_count=2, query_count=30, thread_count=1, 56 status_callback=None): 57 """Constructor. 58 59 Args: 60 nameservers: a list of NameServerData objects 61 run_count: How many test-runs to perform on each nameserver (int) 62 query_count: How many DNS lookups to test in each test-run (int) 63 thread_count: How many benchmark threads to use (int) 64 status_callback: Where to send msg() updates to. 65 """ 66 self.query_count = query_count 67 self.run_count = run_count 68 self.thread_count = thread_count 69 self.nameservers = nameservers 70 self.results = {} 71 self.status_callback = status_callback 72 73 def msg(self, msg, **kwargs): 74 if self.status_callback: 75 self.status_callback(msg, **kwargs) 76 77 def _CheckForIndexHostsInResults(self, test_records): 78 """Check if we have already tested index hosts. 79 80 Args: 81 test_records: List of tuples of test records (type, record) 82 83 Returns: 84 A list of results that have already been tested 85 A list of records that still need to be tested. 86 """ 87 needs_test = [] 88 index_results = {} 89 for test in test_records: 90 matched = False 91 for ns in self.results: 92 for result in self.results[ns][0]: 93 hostname, request_type = result[0:2] 94 if (request_type, hostname) == test: 95 matched = True 96 index_results.setdefault(ns, []).append(result) 97 # So that we don't include the second results if duplicates exist. 98 break 99 if not matched: 100 needs_test.append(test) 101 return (index_results, needs_test) 102 103 def RunIndex(self, test_records): 104 """Run index tests using the same mechanism as a standard benchmark.""" 105 if not test_records: 106 print 'No records to test.' 107 return None 108 109 index_results, pending_tests = self._CheckForIndexHostsInResults(test_records) 110 run_results = self._SingleTestRun(pending_tests) 111 for ns in run_results: 112 index_results.setdefault(ns, []).extend(run_results[ns]) 113 return index_results 114 115 def Run(self, test_records=None): 116 """Run all test runs for all nameservers.""" 117 118 # We don't want to keep stats on how many queries timed out from previous runs. 119 for ns in self.nameservers.enabled: 120 ns.ResetErrorCounts() 121 122 for _ in range(self.run_count): 123 run_results = self._SingleTestRun(test_records) 124 for ns in run_results: 125 self.results.setdefault(ns, []).append(run_results[ns]) 126 return self.results 127 128 def _SingleTestRun(self, test_records): 129 """Manage and execute a single test-run on all nameservers. 130 131 We used to run all tests for a nameserver, but the results proved to be 132 unfair if the bandwidth was suddenly constrained. We now run a test on 133 each server before moving on to the next. 134 135 Args: 136 test_records: a list of tuples in the form of (request_type, hostname) 137 138 Returns: 139 results: A dictionary of tuples, keyed by nameserver. 140 """ 141 input_queue = Queue.Queue() 142 shuffled_records = {} 143 results = {} 144 # Pre-compute the shuffled test records per-nameserver to avoid thread 145 # contention. 146 for ns in self.nameservers.enabled: 147 random.shuffle(test_records) 148 shuffled_records[ns.ip] = list(test_records) 149 150 # Feed the pre-computed records into the input queue. 151 for i in range(len(test_records)): 152 for ns in self.nameservers.enabled: 153 (request_type, hostname) = shuffled_records[ns.ip][i] 154 input_queue.put((ns, request_type, hostname)) 155 156 results_queue = self._LaunchBenchmarkThreads(input_queue) 157 errors = [] 158 while results_queue.qsize(): 159 (ns, request_type, hostname, response, duration, error_msg) = results_queue.get() 160 if error_msg: 161 duration = ns.timeout * 1000 162 errors.append((ns, error_msg)) 163 results.setdefault(ns, []).append((hostname, request_type, duration, response, error_msg)) 164 165 for (ns, error_msg) in errors: 166 self.msg('Error querying %s: %s' % (ns, error_msg)) 167 return results 168 169 def _LaunchBenchmarkThreads(self, input_queue): 170 """Launch and manage the benchmark threads.""" 171 results_queue = Queue.Queue() 172 expected_total = input_queue.qsize() 173 threads = [] 174 for unused_thread_num in range(0, self.thread_count): 175 thread = BenchmarkThreads(input_queue, results_queue) 176 thread.start() 177 threads.append(thread) 178 179 query_count = expected_total / len(self.nameservers.enabled) 180 status_message = ('Sending %s queries to %s servers' % 181 (query_count, len(self.nameservers.enabled))) 182 while results_queue.qsize() != expected_total: 183 self.msg(status_message, count=results_queue.qsize(), total=expected_total) 184 time.sleep(0.5) 185 186 self.msg(status_message, count=results_queue.qsize(), total=expected_total) 187 for thread in threads: 188 thread.join() 189 return results_queue 190 191