# Standard library modules used directly below; historically these leaked in
# via "from CTS import *", but it is safer to import them explicitly.
import os
import re
import signal
import sys
import time

from CTS import *
from CTStests import CTSTest
from CTSaudits import ClusterAudit
from cts.watcher import LogWatcher

class ScenarioComponent:

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return TRUE if the current ScenarioComponent is applicable
        in the LabEnvironment given to the constructor.
        '''

        raise ValueError("Abstract Class member (IsApplicable)")

    def SetUp(self, CM):
        '''Set up the given ScenarioComponent'''
        raise ValueError("Abstract Class member (SetUp)")

    def TearDown(self, CM):
        '''Tear down (undo) the given ScenarioComponent'''
        raise ValueError("Abstract Class member (TearDown)")

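# A minimal concrete ScenarioComponent, shown for illustration only (the
# class name and its no-op behaviour are invented here, not part of CTS):
#
#   class NullComponent(ScenarioComponent):
#       def IsApplicable(self):
#           return 1                      # applicable everywhere
#       def SetUp(self, CM):
#           CM.log("NullComponent: nothing to set up")
#           return 1                      # report success
#       def TearDown(self, CM):
#           CM.log("NullComponent: nothing to tear down")
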

class Scenario:
    (
'''The basic idea of a scenario is that of an ordered list of
ScenarioComponent objects.  Each ScenarioComponent is SetUp() in turn,
and then after the tests have been run, they are torn down using TearDown()
(in reverse order).

A Scenario is applicable to a particular cluster manager iff each
ScenarioComponent is applicable.

A partially set up scenario is torn down if it fails during setup.
''')
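
    # Sketch of typical usage, assuming a ClusterManager ("cm") plus audit
    # and test lists built elsewhere by the lab environment (illustrative
    # only, not executed):
    #
    #   scenario = AllOnce(cm, [BootCluster(cm.Env)], audits, tests)
    #   if scenario.IsApplicable() and scenario.SetUp():
    #       scenario.run(len(tests))
    #       scenario.summarize()
    #       scenario.TearDown()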

    def __init__(self, ClusterManager, Components, Audits, Tests):

        "Initialize the Scenario from the list of ScenarioComponents"

        self.ClusterManager = ClusterManager
        self.Components = Components
        self.Audits = Audits
        self.Tests = Tests

        self.BadNews = None
        self.TestSets = []
        self.Stats = {"success": 0, "failure": 0, "BadNews": 0, "skipped": 0}
        self.Sets = []

        #self.ns=CTS.NodeStatus(self.Env)

        for comp in Components:
            if not issubclass(comp.__class__, ScenarioComponent):
                raise ValueError("Init value must be a subclass of ScenarioComponent")

        for audit in Audits:
            if not issubclass(audit.__class__, ClusterAudit):
                raise ValueError("Init value must be a subclass of ClusterAudit")

        for test in Tests:
            if not issubclass(test.__class__, CTSTest):
                raise ValueError("Init value must be a subclass of CTSTest")

    def IsApplicable(self):
        (
'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
'''
        )

        for comp in self.Components:
            if not comp.IsApplicable():
                return None
        return 1

    def SetUp(self):
        '''Set up the Scenario. Return TRUE on success.'''

        self.ClusterManager.prepare()
        self.audit() # Also detects remote/local log config
        self.ClusterManager.StatsMark(0)
        self.ClusterManager.ns.WaitForAllNodesToComeUp(self.ClusterManager.Env["nodes"])

        self.audit()
        if self.ClusterManager.Env["valgrind-tests"]:
            self.ClusterManager.install_helper("cts.supp")

        self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
                                  self.ClusterManager.templates.get_patterns(
                                      self.ClusterManager.Env["Name"], "BadNews"), "BadNews", 0,
                                  kind=self.ClusterManager.Env["LogWatcher"],
                                  hosts=self.ClusterManager.Env["nodes"])
        self.BadNews.setwatch() # Call after we've figured out what type of log watching to do in LogAudit

        for j, comp in enumerate(self.Components):
            if not comp.SetUp(self.ClusterManager):
                # OOPS!  We failed.  Tear partial setups down.
                self.audit()
                self.ClusterManager.log("Tearing down partial setup")
                self.TearDown(j)
                return None

        self.audit()
        return 1

    def TearDown(self, max=None):

        '''Tear Down the Scenario - in reverse order.'''

        if max is None:
            max = len(self.Components) - 1
        for j in range(max, -1, -1):
            self.Components[j].TearDown(self.ClusterManager)

        self.audit()
        self.ClusterManager.StatsExtract()

    def incr(self, name):
        '''Increment (or initialize) the value associated with the given name'''
        if name not in self.Stats:
            self.Stats[name] = 0
        self.Stats[name] += 1

    def run(self, Iterations):
        # Ensure profiling is stopped even if the test loop raises.
        self.ClusterManager.oprofileStart()
        try:
            self.run_loop(Iterations)
        finally:
            self.ClusterManager.oprofileStop()

    def run_loop(self, Iterations):
        raise ValueError("Abstract Class member (run_loop)")

    def run_test(self, test, testcount):
        nodechoice = self.ClusterManager.Env.RandomNode()

        ret = 1
        where = ""
        did_run = 0

        self.ClusterManager.StatsMark(testcount)
        self.ClusterManager.instance_errorstoignore_clear()
        self.ClusterManager.log(("Running test %s" % test.name).ljust(35)
                                + (" (%s) " % nodechoice).ljust(15)
                                + "[" + ("%d" % testcount).rjust(3) + "]")

        starttime = test.set_timer()
        if not test.setup(nodechoice):
            self.ClusterManager.log("Setup failed")
            ret = 0

        elif not test.canrunnow(nodechoice):
            self.ClusterManager.log("Skipped")
            test.skipped()

        else:
            did_run = 1
            ret = test(nodechoice)

        if not test.teardown(nodechoice):
            self.ClusterManager.log("Teardown failed")
            if self.ClusterManager.Env["continue"] == 1:
                answer = "Y"
            else:
                try:
                    answer = input('Continue? [nY]')
                except EOFError:
                    answer = "n"
            if answer and answer == "n":
                raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
            ret = 0

        stoptime = time.time()
        self.ClusterManager.oprofileSave(testcount)

        # Accumulate per-test timing statistics
        elapsed_time = stoptime - starttime
        test_time = stoptime - test.get_timer()
        if not test["min_time"]:
            test["elapsed_time"] = elapsed_time
            test["min_time"] = test_time
            test["max_time"] = test_time
        else:
            test["elapsed_time"] = test["elapsed_time"] + elapsed_time
            if test_time < test["min_time"]:
                test["min_time"] = test_time
            if test_time > test["max_time"]:
                test["max_time"] = test_time

        if ret:
            self.incr("success")
            test.log_timer()
        else:
            self.incr("failure")
            self.ClusterManager.statall()
            did_run = 1  # Force the test count to be incremented anyway so test extraction works

        self.audit(test.errorstoignore())
        return did_run

    def summarize(self):
        self.ClusterManager.log("****************")
        self.ClusterManager.log("Overall Results:" + repr(self.Stats))
        self.ClusterManager.log("****************")

        stat_filter = {
            "calls": 0,
            "failure": 0,
            "skipped": 0,
            "auditfail": 0,
            }
        self.ClusterManager.log("Test Summary")
        for test in self.Tests:
            for key in list(stat_filter.keys()):
                stat_filter[key] = test.Stats[key]
            self.ClusterManager.log(("Test %s: " % test.name).ljust(25) + " %s" % repr(stat_filter))

        self.ClusterManager.debug("Detailed Results")
        for test in self.Tests:
            self.ClusterManager.debug(("Test %s: " % test.name).ljust(25) + " %s" % repr(test.Stats))

        self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")

    def audit(self, LocalIgnore=None):
        errcount = 0
        ignorelist = ["CTS:"]
        if LocalIgnore:
            ignorelist.extend(LocalIgnore)
        ignorelist.extend(self.ClusterManager.errorstoignore())
        ignorelist.extend(self.ClusterManager.instance_errorstoignore())

        # This makes sure everything is stabilized before starting...
        failed = 0
        for audit in self.Audits:
            if not audit():
                self.ClusterManager.log("Audit " + audit.name() + " FAILED.")
                failed += 1
            else:
                self.ClusterManager.debug("Audit " + audit.name() + " passed.")

        # Drain the BadNews log watcher, logging anything that doesn't match
        # an ignore pattern.  The "else" clause below fires only if we hit
        # the 1000-message cap without breaking out, i.e. a flood of bad news.
        while errcount < 1000:
            match = None
            if self.BadNews:
                match = self.BadNews.look(0)

            if match:
                add_err = 1
                for ignore in ignorelist:
                    if add_err == 1 and re.search(ignore, match):
                        add_err = 0
                if add_err == 1:
                    self.ClusterManager.log("BadNews: " + match)
                    self.incr("BadNews")
                    errcount = errcount + 1
            else:
                break
        else:
            if self.ClusterManager.Env["continue"] == 1:
                answer = "Y"
            else:
                try:
                    answer = input('Big problems. Continue? [nY]')
                except EOFError:
                    answer = "n"
            if answer and answer == "n":
                self.ClusterManager.log("Shutting down.")
                self.summarize()
                self.TearDown()
                raise ValueError("Looks like we hit a BadNews jackpot!")

        if self.BadNews:
            self.BadNews.end()
        return failed


class AllOnce(Scenario):
    '''Every Test Once''' # Accessible as __doc__
    def run_loop(self, Iterations):
        testcount = 1
        for test in self.Tests:
            self.run_test(test, testcount)
            testcount += 1


class RandomTests(Scenario):
    '''Random Test Execution'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            test = self.ClusterManager.Env.RandomGen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class BasicSanity(Scenario):
    '''Basic Cluster Sanity'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            # Pick via the cluster manager's environment (Scenario has no
            # self.Environment attribute), mirroring RandomTests above.
            test = self.ClusterManager.Env.RandomGen.choice(self.Tests)
            self.run_test(test, testcount)
            testcount += 1


class Sequence(Scenario):
    '''Named Tests in Sequence'''
    def run_loop(self, Iterations):
        testcount = 1
        while testcount <= Iterations:
            for test in self.Tests:
                self.run_test(test, testcount)
                testcount += 1


class Boot(Scenario):
    '''Start the Cluster'''
    def run_loop(self, Iterations):
        # Booting the cluster is handled entirely by the components'
        # SetUp()/TearDown(); there are no tests to run.
        pass


class BootCluster(ScenarioComponent):
    (
'''BootCluster is the most basic of ScenarioComponents.
This ScenarioComponent simply starts the cluster manager on all the nodes.
It is fairly robust, as it waits for all nodes to come up before starting,
since they might have been rebooted or crashed beforehand.
''')
    def __init__(self, Env):
        # Env is accepted to match the ScenarioComponent interface, but
        # BootCluster keeps no per-instance state.
        pass

    def IsApplicable(self):
        '''BootCluster is so generic it is always Applicable'''
        return 1

    def SetUp(self, CM):
        '''Basic Cluster Manager startup.  Start everything'''

        CM.prepare()

        # Clear out the cobwebs ;-)
        CM.stopall(verbose=True, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all nodes.")
        return CM.startall(verbose=True, quick=True)

    def TearDown(self, CM, force=False):
        '''Tear down the given ScenarioComponent'''

        # Stop the cluster manager everywhere

        CM.log("Stopping Cluster Manager on all nodes")
        return CM.stopall(verbose=True, force=force)


class LeaveBooted(BootCluster):
    def TearDown(self, CM):
        '''Tear down the given ScenarioComponent (but leave the cluster up)'''

        CM.log("Leaving Cluster running on all nodes")
        return 1


class PingFest(ScenarioComponent):
    (
'''PingFest does a flood ping to each node in the cluster from the test machine.

If the LabEnvironment Parameter PingSize is set, it will be used as the size
of ping packet requested (via the -s option).  If it is not set, it defaults
to 1024 bytes.

According to the manual page for ping:
    Outputs packets as fast as they come back or one hundred times per
    second, whichever is more.  For every ECHO_REQUEST sent a period ``.''
    is printed, while for every ECHO_REPLY received a backspace is printed.
    This provides a rapid display of how many packets are being dropped.
    Only the super-user may use this option.  This can be very hard on a
    network and should be used with caution.
''')
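
    # For illustration: with the default PingSize of 1024 and a node named
    # "node1" (a hypothetical example), each child process effectively runs:
    #
    #   ping -qfn -s 1024 node1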

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''PingFests are always applicable ;-)
        '''

        return 1

    def SetUp(self, CM):
        '''Start the PingFest!'''

        self.PingSize = 1024
        if "PingSize" in CM.Env.keys():
            self.PingSize = CM.Env["PingSize"]

        CM.log("Starting %d byte flood pings" % self.PingSize)

        self.PingPids = []
        for node in CM.Env["nodes"]:
            self.PingPids.append(self._pingchild(node))

        CM.log("Ping PIDs: " + repr(self.PingPids))
        return 1

    def TearDown(self, CM):
        '''Stop it right now!  My ears are pinging!!'''

        for pid in self.PingPids:
            if pid is not None:
                CM.log("Stopping ping process %d" % pid)
                os.kill(pid, signal.SIGKILL)

    def _pingchild(self, node):

        Args = ["ping", "-qfn", "-s", str(self.PingSize), node]

        sys.stdin.flush()
        sys.stdout.flush()
        sys.stderr.flush()

        # In Python, os.fork() raises OSError on failure rather than
        # returning a negative pid as in C.
        try:
            pid = os.fork()
        except OSError:
            self.Env.log("Cannot fork ping child")
            return None
        if pid > 0:
            return pid

        # Otherwise, we're the child process.

        os.execvp("ping", Args)
        self.Env.log("Cannot execvp ping: " + repr(Args))
        sys.exit(1)


class PacketLoss(ScenarioComponent):
    (
'''
It would be useful to do some testing of CTS with a modest amount of packet loss
enabled - so we could see that everything runs like it should with a certain
amount of packet loss present.
''')

    def IsApplicable(self):
        '''always Applicable'''
        return 1

    def SetUp(self, CM):
        '''Reduce the reliability of communications'''
        if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0:
            return 1

        for node in CM.Env["nodes"]:
            CM.reducecomm_node(node)

        CM.log("Reduce the reliability of communications")

        return 1

    def TearDown(self, CM):
        '''Fix the reliability of communications'''

        if float(CM.Env["XmitLoss"]) == 0 and float(CM.Env["RecvLoss"]) == 0:
            return 1

        for node in CM.Env["nodes"]:
            CM.unisolate_node(node)

        CM.log("Fix the reliability of communications")
        return 1

class BasicSanityCheck(ScenarioComponent):
    (
'''
Start and stop the cluster manager on the BSC (Basic Sanity Check) node(s).
''')

    def IsApplicable(self):
        return self.Env["DoBSC"]

    def SetUp(self, CM):

        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on BSC node(s).")
        return CM.startall()

    def TearDown(self, CM):
        CM.log("Stopping Cluster Manager on BSC node(s).")
        return CM.stopall()


class Benchmark(ScenarioComponent):
    (
'''
Start and stop the cluster manager on all nodes, for benchmark runs.
''')

    def IsApplicable(self):
        return self.Env["benchmark"]

    def SetUp(self, CM):

        CM.prepare()

        # Clear out the cobwebs
        self.TearDown(CM, force=True)

        # Now start the Cluster Manager on all the nodes.
        CM.log("Starting Cluster Manager on all node(s).")
        return CM.startall()

    def TearDown(self, CM, force=False):
        CM.log("Stopping Cluster Manager on all node(s).")
        return CM.stopall(force=force)


class RollingUpgrade(ScenarioComponent):
    (
'''
Test a rolling upgrade between two versions of the stack
''')
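
    # Package layout assumed by install() below: per-version subdirectories
    # of the rpm-dir parameter, e.g. (paths are illustrative)
    #
    #   <rpm-dir>/<previous-version>/*.rpm
    #   <rpm-dir>/<current-version>/*.rpm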

    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        if not self.Env["rpm-dir"]:
            return None
        if not self.Env["current-version"]:
            return None
        if not self.Env["previous-version"]:
            return None

        return 1

    def install(self, node, version):
        # Relies on self.CM, which SetUp()/TearDown() record before calling us.
        target_dir = "/tmp/rpm-%s" % version
        src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)

        self.CM.rsh(node, "mkdir -p %s" % target_dir)
        self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
        self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))

        # Return codes above are not checked (as in the original); report success.
        return 1

    def upgrade(self, node):
        return self.install(node, self.CM.Env["current-version"])

    def downgrade(self, node):
        return self.install(node, self.CM.Env["previous-version"])

    def SetUp(self, CM):
        # Remember the cluster manager for install()/upgrade()/downgrade().
        self.CM = CM

        CM.debug(repr(self) + ": prepare")
        CM.prepare()

        # Clear out the cobwebs
        CM.stopall(force=True)

        CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])

        for node in self.Env["nodes"]:
            if not self.downgrade(node):
                CM.log("Couldn't downgrade %s" % node)
                return None

        return 1

    def TearDown(self, CM):
        self.CM = CM

        # Stop everything
        CM.log("Stopping Cluster Manager on Upgrade nodes.")
        CM.stopall()

        CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
        for node in self.Env["nodes"]:
            if not self.upgrade(node):
                CM.log("Couldn't upgrade %s" % node)
                return None

        return 1