from CTS import *
from CTStests import CTSTest
from CTSaudits import ClusterAudit
from cts.watcher import LogWatcher


class ScenarioComponent:
    '''Abstract base class for one setup/teardown step of a Scenario.'''

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return TRUE if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        raise ValueError("Abstract Class member (IsApplicable)")

    def SetUp(self, CM):
        '''Set up the given ScenarioComponent'''
        raise ValueError("Abstract Class member (Setup)")

    def TearDown(self, CM):
        '''Tear down (undo) the given ScenarioComponent'''
        # Fixed: message previously said "(Setup)", which made failures confusing.
        raise ValueError("Abstract Class member (TearDown)")


class Scenario:
    '''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects.  Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using TearDown()
(in reverse order).

A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.

A partially set up scenario is torn down if it fails during setup.
'''

    def __init__(self, ClusterManager, Components, Audits, Tests):
        "Initialize the Scenario from the list of ScenarioComponents"

        self.ClusterManager = ClusterManager
        self.Components = Components
        self.Audits = Audits
        self.Tests = Tests

        self.BadNews = None    # LogWatcher; created in SetUp() once log config is known
        self.TestSets = []
        self.Stats = {"success": 0, "failure": 0, "BadNews": 0, "skipped": 0}
        self.Sets = []

        # Validate constructor arguments up front so misconfiguration fails fast.
        for comp in Components:
            if not issubclass(comp.__class__, ScenarioComponent):
                raise ValueError("Init value must be subclass of ScenarioComponent")

        for audit in Audits:
            if not issubclass(audit.__class__, ClusterAudit):
                raise ValueError("Init value must be subclass of ClusterAudit")

        for test in Tests:
            if not issubclass(test.__class__, CTSTest):
                raise ValueError("Init value must be a subclass of CTSTest")

    def IsApplicable(self):
        '''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
        '''
        for comp in self.Components:
            if not comp.IsApplicable():
                return None
        return 1

    def SetUp(self):
        '''Set up the Scenario. Return TRUE on success.

        Sets up each ScenarioComponent in order; on any failure the
        partially-built scenario is torn down again and None is returned.
        '''

        self.ClusterManager.prepare()
        self.audit()  # Also detects remote/local log config
        self.ClusterManager.StatsMark(0)
        self.ClusterManager.ns.WaitForAllNodesToComeUp(self.ClusterManager.Env["nodes"])

        self.audit()
        if self.ClusterManager.Env["valgrind-tests"]:
            self.ClusterManager.install_helper("cts.supp")

        self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
                                  self.ClusterManager.templates.get_patterns(
                                      self.ClusterManager.Env["Name"], "BadNews"), "BadNews", 0,
                                  kind=self.ClusterManager.Env["LogWatcher"],
                                  hosts=self.ClusterManager.Env["nodes"])
        self.BadNews.setwatch()  # Call after we've figured out what type of log watching to do in LogAudit

        j = 0
        while j < len(self.Components):
            if not self.Components[j].SetUp(self.ClusterManager):
                # OOPS!  We failed.  Tear partial setups down.
                self.audit()
                self.ClusterManager.log("Tearing down partial setup")
                self.TearDown(j)
                return None
            j = j + 1

        self.audit()
        return 1

    def TearDown(self, max=None):
        '''Tear Down the Scenario - in reverse order.

        max: index of the highest component to tear down (defaults to all).
        '''
        # NOTE: parameter name 'max' shadows the builtin but is kept for
        # backward compatibility with existing keyword callers.
        if max is None:
            max = len(self.Components) - 1
        j = max
        while j >= 0:
            self.Components[j].TearDown(self.ClusterManager)
            j = j - 1

        self.audit()
        self.ClusterManager.StatsExtract()

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if name not in self.Stats:
            self.Stats[name] = 0
        self.Stats[name] = self.Stats[name] + 1

    def run(self, Iterations):
        '''Run the scenario's test loop, with oprofile bracketing it.'''
        self.ClusterManager.oprofileStart()
        try:
            self.run_loop(Iterations)
            self.ClusterManager.oprofileStop()
        except:
            # Make sure profiling is stopped even when the loop blows up,
            # then let the original exception propagate.
            self.ClusterManager.oprofileStop()
            raise

    def run_loop(self, Iterations):
        '''Subclasses define the actual test-iteration policy here.'''
        raise ValueError("Abstract Class member (run_loop)")

    def run_test(self, test, testcount):
        '''Run a single test on a randomly chosen node.

        Returns 1 if the test actually ran (or failed), 0 if it was
        skipped or its setup failed.
        '''
        nodechoice = self.ClusterManager.Env.RandomNode()

        ret = 1
        where = ""
        did_run = 0

        self.ClusterManager.StatsMark(testcount)
        self.ClusterManager.instance_errorstoignore_clear()
        self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")

        starttime = test.set_timer()
        if not test.setup(nodechoice):
            self.ClusterManager.log("Setup failed")
            ret = 0

        elif not test.canrunnow(nodechoice):
            self.ClusterManager.log("Skipped")
            test.skipped()

        else:
            did_run = 1
            ret = test(nodechoice)

        if not test.teardown(nodechoice):
            self.ClusterManager.log("Teardown failed")
            if self.ClusterManager.Env["continue"] == 1:
                answer = "Y"
            else:
                try:
                    # Fixed: raw_input() does not exist on Python 3.
                    answer = input('Continue? [nY]')
                except EOFError as e:
                    answer = "n"
            if answer and answer == "n":
                raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
            ret = 0

        stoptime = time.time()
        self.ClusterManager.oprofileSave(testcount)

        elapsed_time = stoptime - starttime
        test_time = stoptime - test.get_timer()
        if not test["min_time"]:
            test["elapsed_time"] = elapsed_time
            test["min_time"] = test_time
            test["max_time"] = test_time
        else:
            test["elapsed_time"] = test["elapsed_time"] + elapsed_time
            if test_time < test["min_time"]:
                test["min_time"] = test_time
            if test_time > test["max_time"]:
                test["max_time"] = test_time

        if ret:
            self.incr("success")
            test.log_timer()
        else:
            self.incr("failure")
            self.ClusterManager.statall()
            did_run = 1  # Force the test count to be incremented anyway so test extraction works

        self.audit(test.errorstoignore())
        return did_run

    def summarize(self):
        '''Log overall and per-test statistics for the whole run.'''
        self.ClusterManager.log("****************")
        self.ClusterManager.log("Overall Results:" + repr(self.Stats))
        self.ClusterManager.log("****************")

        stat_filter = {
            "calls": 0,
            "failure": 0,
            "skipped": 0,
            "auditfail": 0,
        }
        self.ClusterManager.log("Test Summary")
        for test in self.Tests:
            for key in list(stat_filter.keys()):
                stat_filter[key] = test.Stats[key]
            self.ClusterManager.log(("Test %s: " % test.name).ljust(25) + " %s" % repr(stat_filter))

        self.ClusterManager.debug("Detailed Results")
        for test in self.Tests:
            self.ClusterManager.debug(("Test %s: " % test.name).ljust(25) + " %s" % repr(test.Stats))

        self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")

    def audit(self, LocalIgnore=None):
        '''Run all audits and scan the BadNews log watcher.

        LocalIgnore: optional extra regex patterns to ignore in BadNews.
        Returns the number of audits that failed.
        '''
        # Fixed: mutable default argument ([]) replaced with None sentinel.
        errcount = 0
        ignorelist = []
        ignorelist.append("CTS:")
        if LocalIgnore:
            ignorelist.extend(LocalIgnore)
        ignorelist.extend(self.ClusterManager.errorstoignore())
        ignorelist.extend(self.ClusterManager.instance_errorstoignore())

        # This makes sure everything is stabilized before starting...
        failed = 0
        for audit in self.Audits:
            if not audit():
                self.ClusterManager.log("Audit " + audit.name() + " FAILED.")
                failed += 1
            else:
                self.ClusterManager.debug("Audit " + audit.name() + " passed.")

        # Scan BadNews, but cap at 1000 entries so a log storm can't hang us.
        while errcount < 1000:
            match = None
            if self.BadNews:
                match = self.BadNews.look(0)

            if match:
                add_err = 1
                for ignore in ignorelist:
                    if add_err == 1 and re.search(ignore, match):
                        add_err = 0
                if add_err == 1:
                    self.ClusterManager.log("BadNews: " + match)
                    self.incr("BadNews")
                    errcount = errcount + 1
            else:
                break
        else:
            if self.ClusterManager.Env["continue"] == 1:
                answer = "Y"
            else:
                try:
                    # Fixed: raw_input() does not exist on Python 3.
                    answer = input('Big problems. Continue? [nY]')
                except EOFError as e:
                    answer = "n"
            if answer and answer == "n":
                self.ClusterManager.log("Shutting down.")
                self.summarize()
                self.TearDown()
                raise ValueError("Looks like we hit a BadNews jackpot!")

        if self.BadNews:
            self.BadNews.end()
        return failed


class AllOnce(Scenario):
    '''Every Test Once'''  # Accessible as __doc__
    def run_loop(self, Iterations):
        # Iterations is ignored: each configured test runs exactly once.
        testcount = 1
        for test in self.Tests:
            self.run_test(test, testcount)
            testcount += 1


class RandomTests(Scenario):
    '''Random Test Execution'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            test = self.ClusterManager.Env.RandomGen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class BasicSanity(Scenario):
    '''Basic Cluster Sanity'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            # Fixed: previously referenced self.Environment, which is never
            # set anywhere (AttributeError); use the same source of
            # randomness as RandomTests.
            test = self.ClusterManager.Env.RandomGen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class Sequence(Scenario):
    '''Named Tests in Sequence'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            for test in self.Tests:
                self.run_test(test, testcount)
                testcount += 1


class Boot(Scenario):
    '''Start the Cluster'''
    def run_loop(self, Iterations):
        # Boot runs no tests; the work happens in the components' SetUp().
        testcount = 0


class BootCluster(ScenarioComponent):
    '''BootCluster is the most basic of ScenarioComponents.
This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust as it waits for all nodes to come up before starting
as they might have been rebooted or crashed for some reason beforehand.
'''
    def __init__(self, Env):
        # Store Env for consistency with the base class (previously 'pass').
        self.Env = Env

    def IsApplicable(self):
        '''BootCluster is so generic it is always Applicable'''
        return 1

    def SetUp(self, CM):
        '''Basic Cluster Manager startup. Start everything'''

        CM.prepare()

        # Clear out the cobwebs ;-)
        CM.stopall(verbose=True, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all nodes.")
        return CM.startall(verbose=True, quick=True)

    def TearDown(self, CM, force=False):
        '''Stop the cluster manager on all nodes.'''
        # Fixed: docstring previously (incorrectly) said "Set up".

        CM.log("Stopping Cluster Manager on all nodes")
        return CM.stopall(verbose=True, force=force)


class LeaveBooted(BootCluster):
    def TearDown(self, CM):
        '''Deliberately leave the cluster manager running everywhere.'''
        # Fixed: docstring previously (incorrectly) said "Set up".

        CM.log("Leaving Cluster running on all nodes")
        return 1


class PingFest(ScenarioComponent):
    '''PingFest does a flood ping to each node in the cluster from the test machine.

If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option).  If it is not set, it defaults
to 1024 bytes.

According to the manual page for ping:
    Outputs packets as fast as they come back or one hundred times per
    second, whichever is more.  For every ECHO_REQUEST sent a period ``.''
    is printed, while for every ECHO_REPLY received a backspace is printed.
    This provides a rapid display of how many packets are being dropped.
    Only the super-user may use this option.  This can be very hard on a net-
    work and should be used with caution.
'''

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''PingFests are always applicable ;-)
        '''
        return 1

    def SetUp(self, CM):
        '''Start the PingFest!'''

        self.PingSize = 1024
        if "PingSize" in CM.Env.keys():
            self.PingSize = CM.Env["PingSize"]

        CM.log("Starting %d byte flood pings" % self.PingSize)

        self.PingPids = []
        for node in CM.Env["nodes"]:
            self.PingPids.append(self._pingchild(node))

        CM.log("Ping PIDs: " + repr(self.PingPids))
        return 1

    def TearDown(self, CM):
        '''Stop it right now!  My ears are pinging!!'''

        for pid in self.PingPids:
            if pid is not None:
                CM.log("Stopping ping process %d" % pid)
                os.kill(pid, signal.SIGKILL)

    def _pingchild(self, node):
        '''Fork a flood-ping child for 'node'; return its pid (None on failure).'''

        Args = ["ping", "-qfn", "-s", str(self.PingSize), node]

        # Flush stdio before fork() so buffered output isn't duplicated.
        sys.stdin.flush()
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()

        if pid < 0:
            self.Env.log("Cannot fork ping child")
            return None
        if pid > 0:
            return pid

        # Otherwise, we're the child process.

        os.execvp("ping", Args)
        self.Env.log("Cannot execvp ping: " + repr(Args))
        sys.exit(1)


class PacketLoss(ScenarioComponent):
    '''
It would be useful to do some testing of CTS with a modest amount of packet loss
enabled - so we could see that everything runs like it should with a certain
amount of packet loss present.
'''

    def IsApplicable(self):
        '''always Applicable'''
        return 1

    def SetUp(self, CM):
        '''Reduce the reliability of communications'''
        if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0:
            return 1

        for node in CM.Env["nodes"]:
            CM.reducecomm_node(node)

        CM.log("Reduce the reliability of communications")

        return 1

    def TearDown(self, CM):
        '''Fix the reliability of communications'''

        if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0:
            return 1

        # NOTE(review): SetUp uses reducecomm_node() but TearDown uses
        # unisolate_node() -- presumably unisolate also clears the packet-loss
        # rules; confirm against the cluster manager implementation.
        for node in CM.Env["nodes"]:
            CM.unisolate_node(node)

        CM.log("Fix the reliability of communications")
        # Return success for consistency with SetUp (previously returned None).
        return 1


class BasicSanityCheck(ScenarioComponent):
    '''Stop, then restart, the cluster manager on the BSC node(s).'''

    def IsApplicable(self):
        return self.Env["DoBSC"]

    def SetUp(self, CM):
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on BSC node(s).")
        return CM.startall()

    def TearDown(self, CM):
        CM.log("Stopping Cluster Manager on BSC node(s).")
        return CM.stopall()


class Benchmark(ScenarioComponent):
    '''Force-restart the cluster manager everywhere for benchmark runs.'''

    def IsApplicable(self):
        return self.Env["benchmark"]

    def SetUp(self, CM):
        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all node(s).")
        return CM.startall()

    def TearDown(self, CM, force=False):
        # Fixed: SetUp calls TearDown(CM, force=True), but this method
        # previously took no 'force' parameter (TypeError at runtime).
        CM.log("Stopping Cluster Manager on all node(s).")
        return CM.stopall(force=force)


class RollingUpgrade(ScenarioComponent):
    '''
Test a rolling upgrade between two versions of the stack
'''

    def __init__(self, Env):
        self.Env = Env
        self.CM = None  # set by SetUp()/TearDown(); used by install()

    def IsApplicable(self):
        if not self.Env["rpm-dir"]:
            return None
        if not self.Env["current-version"]:
            return None
        if not self.Env["previous-version"]:
            return None

        return 1

    def install(self, node, version):
        '''Copy and force-install the RPMs for 'version' onto 'node'.'''

        target_dir = "/tmp/rpm-%s" % version
        src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)

        rc = self.CM.rsh(node, "mkdir -p %s" % target_dir)
        rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
        rc = self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))

        # Fixed: previously returned self.success(), which does not exist on
        # ScenarioComponent (AttributeError).  NOTE(review): the rsh/cp return
        # codes above are not checked -- consider propagating them.
        return 1

    def upgrade(self, node):
        return self.install(node, self.CM.Env["current-version"])

    def downgrade(self, node):
        return self.install(node, self.CM.Env["previous-version"])

    def SetUp(self, CM):
        # Fixed: self.CM was never assigned, so install()/downgrade() raised
        # AttributeError.
        self.CM = CM

        print(repr(self) + "prepare")
        CM.prepare()

        # Clear out the cobwebs
        CM.stopall(force=True)

        CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])

        for node in self.Env["nodes"]:
            if not self.downgrade(node):
                CM.log("Couldn't downgrade %s" % node)
                return None

        return 1

    def TearDown(self, CM):
        self.CM = CM  # see SetUp(); upgrade() needs it

        # Stop everything
        CM.log("Stopping Cluster Manager on Upgrade nodes.")
        CM.stopall()

        CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
        for node in self.Env["nodes"]:
            if not self.upgrade(node):
                CM.log("Couldn't upgrade %s" % node)
                return None

        return 1