1import MySQLdb
2from MySQLdb.constants import CR
3from MySQLdb.constants import ER
4import os
5import random
6import signal
7import sys
8import threading
9import time
10import string
11import traceback
12import logging
13import argparse
14import array as arr
15
# This is a stress test for the drop cf feature. This test sends the following
17# commands to mysqld server in parallel: create table, drop table, add index,
18# drop index, manual compaction and drop cf, and verifies there are
19# no race conditions or deadlocks.
20
# This test uses 2 "metadata" tables to record the tables and column families
# that are used.
23# The first table is called "tables".
24
25# CREATE TABLE tables (
26#    table_id int NOT NULL,
27#    created BOOLEAN NOT NULL,
28#    primary_cf_id int,
29#    secondary_cf_id int,
30#    PRIMARY KEY (table_id),
31#    KEY secondary_key (created, secondary_cf_id)
32#    ) ENGINE=MEMORY;
33#
34# This table is populated before this script is run. Each row in this table
35# represents a table on which create table, drop table, add index, drop index
36# commands will run.
37#
38# The "created" column means if the table identified by "table_id"
39# is created or not. If the table is not created, create table command
40# can run on it. If the table is created, drop table command can run on it.
41#
# "primary_cf_id" represents the column family the primary key is in. The
# primary key is created by create table command.
44#
# "secondary_cf_id" represents the column family that secondary key is in.
46# If it is NULL and the table is created, add index can run on the
47# table. If it is not NULL and the table is created, drop index can
48# run on the table.
49#
50# The second table is called "cfs".
51# CREATE TABLE cfs (
52#    cf_id int NOT NULL,
53#    used BOOLEAN NOT NULL,
54#    ref_count int NOT NULL,
55#    PRIMARY KEY (cf_id)
56#    ) ENGINE=MEMORY;
57
58# This table is also populated before the script is run. Each row in this
# table represents a column family that can be used by create table and
# add index commands.
61#
62# The "used" column will be true if the column family identified by
63# "cf_id" has been created, but has not been dropped. It will be false
64# if it has never been created or has been dropped.
65#
66# The "ref_count" column means the number of indexes that is stored
67# in the column family.
68#
69# When drop cf and manual compaction command run, they will choose cfs
70# with "used" being true to make the test more effective. In addition,
71# drop cf will prefer cfs with ref_count being 0, i.e., the cfs don't
72# contain any indexes.
73#
74# tables and cfs tables use MEMORY storage engine, which doesn't
75# support transactions. As a result, the updates to these tables don't
76# happen atomically in create table, drop table, add index, drop index
77# operations.
78#
79# In this test we don't require that. ref_count is only used in drop cf
80# operation to skew the selection towards the cfs that don't store any
81# index. Temporary inconsistencies between tables and cfs tables don't cause
82# correctness issues.
83#
# Further, the locking granularity is table for the MEMORY storage
# engine. When updating tables or cfs table, the table is locked.
86# This doesn't cause perf issues for this test.
87
# Shared stop flag: worker threads poll it to determine if the test is
# stopping, and a worker sets it when it hits an unexpected exception.
TEST_STOP = False

# WEIGHTS holds the running (cumulative) totals of the per-command weights and
# TOTAL_WEIGHT their grand total; both are filled in by the __main__ section.
WEIGHTS = arr.array('i')
TOTAL_WEIGHT = 0

# Ids of the tables a worker is currently dropping or altering. The set is
# only touched while LOCK is held.
LOCK = threading.Lock()
LOCKED_TABLE_IDS = set()
95
96# given a percentage value, rolls a 100-sided die and return whether the
97# given value is above or equal to the die roll
98#
99# passing 0 should always return false and 100 should always return true
def roll_d100(p):
  """Return True with a probability of p percent.

  p must lie in [0, 100]; the die yields an integer in 1..100, so 0 never
  succeeds and 100 always does.
  """
  assert 0 <= p <= 100
  die = random.randint(1, 100)
  return die <= p
103
def execute(cur, stmt):
  """Run stmt on cursor cur and sanity-check the reported row count.

  Raises MySQLdb.OperationalError (code CR.CONNECTION_ERROR) when the cursor
  reports a negative row count or 2**64 - 1, which is treated here as the sign
  of a possibly broken connection.
  """
  logging.debug("Executing %s" % stmt)
  cur.execute(stmt)

  affected = cur.rowcount
  # 18446744073709551615 == 2**64 - 1, i.e. -1 read as an unsigned 64-bit value
  looks_broken = affected < 0 or affected == 18446744073709551615
  if looks_broken:
    raise MySQLdb.OperationalError(MySQLdb.constants.CR.CONNECTION_ERROR,
                                   "Possible connection error, rowcount is %d"
                                   % affected)
112
def is_table_exists_error(exc):
  """Return True if exc carries the ER.TABLE_EXISTS_ERROR error code.

  The code is read from exc.args[0]. An exception without any args cannot
  carry an error code, so it yields False instead of raising IndexError
  (which would hide the original exception inside an except handler).
  """
  if not exc.args:
    return False
  return exc.args[0] == MySQLdb.constants.ER.TABLE_EXISTS_ERROR
116
def is_table_not_found_error(exc):
  """Return True if exc carries the ER.BAD_TABLE_ERROR error code.

  The code is read from exc.args[0]. An exception without any args cannot
  carry an error code, so it yields False instead of raising IndexError
  (which would hide the original exception inside an except handler).
  """
  if not exc.args:
    return False
  return exc.args[0] == MySQLdb.constants.ER.BAD_TABLE_ERROR
120
def is_no_such_table_error(exc):
    """Return True if exc carries the ER.NO_SUCH_TABLE error code.

    The code is read from exc.args[0]. An exception without any args cannot
    carry an error code, so it yields False instead of raising IndexError
    (which would hide the original exception inside an except handler).
    """
    if not exc.args:
        return False
    return exc.args[0] == MySQLdb.constants.ER.NO_SUCH_TABLE
124
def is_dup_key_error(exc):
    """Return True if exc carries the ER.DUP_KEYNAME error code.

    The code is read from exc.args[0]. An exception without any args cannot
    carry an error code, so it yields False instead of raising IndexError
    (which would hide the original exception inside an except handler).
    """
    if not exc.args:
        return False
    return exc.args[0] == MySQLdb.constants.ER.DUP_KEYNAME
128
def is_cant_drop_key_error(exc):
    """Return True if exc carries the ER.CANT_DROP_FIELD_OR_KEY error code.

    The code is read from exc.args[0]. An exception without any args cannot
    carry an error code, so it yields False instead of raising IndexError
    (which would hide the original exception inside an except handler).
    """
    if not exc.args:
        return False
    return exc.args[0] == MySQLdb.constants.ER.CANT_DROP_FIELD_OR_KEY
132
def wait_for_workers(workers):
  """Block until every worker has finished and count the failed ones.

  workers is a sequence of thread-like objects providing is_alive(), join()
  and an `exception` attribute (falsy when the worker succeeded).

  Returns the number of workers whose `exception` is set; each such
  exception is logged at error level. A KeyboardInterrupt received while
  waiting terminates the whole process with exit status 1.
  """
  logging.info("Waiting for %d workers", len(workers))

  # Polling keeps this thread responsive to keyboard interrupts; a plain
  # join() would make ctrl-c look like a hang. Only the given workers are
  # polled: counting every thread of the process would never terminate if
  # some unrelated thread happened to be alive.
  try:
    while any(w.is_alive() for w in workers):
      time.sleep(1)
  except KeyboardInterrupt:
    os._exit(1)

  num_failures = 0
  for w in workers:
    w.join()
    if w.exception:
      logging.error(w.exception)
      num_failures += 1

  return num_failures
153
class WorkerThread(threading.Thread):
    """Thread that sends randomly chosen DDL / column family commands to mysqld.

    Each worker opens its own connection in runme() and keeps issuing commands
    until it has sent OPTIONS.num_requests of them or TEST_STOP becomes true.
    The thread starts itself at the end of __init__.

    Attributes:
        thread_id: the value passed to the constructor.
        con, cur: the MySQLdb connection and cursor (None until runme()).
        exception: formatted traceback of the failure, or None.
        total_time: run time in seconds, set when run() finishes.
    """

    def __init__(self, thread_id):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.con = None
        self.cur = None
        self.num_requests = OPTIONS.num_requests
        self.loop_num = 0
        self.exception = None

        self.start_time = time.time()
        self.total_time = 0

        self.start()

    def insert_table_id(self, table_id):
        """Try to reserve table_id for this worker.

        Returns True if the id was added to LOCKED_TABLE_IDS, False if
        it was already in the set.
        """
        # "with" guarantees LOCK is released even if something raises
        with LOCK:
            reserved = table_id not in LOCKED_TABLE_IDS
            if reserved:
                logging.debug("%d is added to locked table id set" % table_id)
                LOCKED_TABLE_IDS.add(table_id)

        logging.debug("insert_table_id returns %s" % reserved)
        return reserved

    def remove_table_id(self, table_id):
        """Release a table id reserved with insert_table_id().

        Raises KeyError if table_id is not in LOCKED_TABLE_IDS; LOCK is
        released in that case as well.
        """
        with LOCK:
            logging.debug("%d is removed from locked table id set" % table_id)
            LOCKED_TABLE_IDS.remove(table_id)

    def rollback_and_sleep(self):
        """Roll back the current transaction and pause for 50 ms."""
        self.con.rollback()
        time.sleep(0.05)

    def insert_data_to_table(self, table_name):
        """Insert rows (1, 1)..(6, 6) into table_name in two flushed batches.

        Each batch of three rows is committed and followed by a forced
        memtable flush.
        """
        for first in (1, 4):
            rows = ", ".join("(%d, %d)" % (value, value)
                             for value in range(first, first + 3))
            execute(self.cur, "INSERT INTO %s VALUES %s" % (table_name, rows))

            self.con.commit()
            execute(self.cur,
                    "SET GLOBAL rocksdb_force_flush_memtable_now = true")

    def _pick_random_cf(self):
        """Return the cf_id of a random row of cfs, or None if cfs is empty."""
        execute(self.cur, "SELECT cf_id FROM cfs "
                          "ORDER BY RAND() LIMIT 1")
        row = self.cur.fetchone()
        return None if row is None else row[0]

    def _ref_cf(self, cf_id):
        """Mark cf_id as used and add one to its ref_count in cfs."""
        execute(self.cur, "UPDATE cfs "
                          "SET used = 1, ref_count = ref_count + 1 "
                          "WHERE cf_id = %d" % cf_id)

    def _unref_cf(self, cf_id):
        """Subtract one from the ref_count of cf_id in cfs."""
        execute(self.cur, "UPDATE cfs "
                          "SET ref_count = ref_count - 1 "
                          "WHERE cf_id = %d" % cf_id)

    def handle_create_table(self):
        """Create a randomly chosen not-yet-created table in a random cf.

        After the table is created and populated, the tables and cfs rows are
        updated. Interface/operational errors and "table exists" errors from
        CREATE TABLE make the command back off; anything else propagates.
        """
        logging.debug("handle create table")
        execute(self.cur, "SELECT table_id FROM tables "
                          "WHERE created = 0 ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("create table doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id = self.cur.fetchone()[0]
        table_name = 'tbl%02d' % table_id

        primary_cf_id = self._pick_random_cf()
        if primary_cf_id is None:
            # cfs is empty: nothing to create the table in
            logging.info("create table doesn't find a suitable cf")
            self.rollback_and_sleep()
            return
        primary_cf_name = 'cf-%02d' % primary_cf_id

        try:
            stmt = ("CREATE TABLE %s ("
                    "id1 int unsigned NOT NULL,"
                    "id2 int unsigned NOT NULL,"
                    "PRIMARY KEY (id1) COMMENT '%s'"
                    ") ENGINE=ROCKSDB" % (table_name, primary_cf_name))
            execute(self.cur, stmt)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            return
        except Exception as e:
            if not is_table_exists_error(e):
                raise
            self.rollback_and_sleep()
            return

        self.insert_data_to_table(table_name)

        execute(self.cur,
                "UPDATE tables "
                "SET created = 1, primary_cf_id = %d, secondary_cf_id = NULL "
                "WHERE table_id = %d" % (primary_cf_id, table_id))
        self._ref_cf(primary_cf_id)

        self.con.commit()
        logging.debug("create table done")

    def handle_drop_table(self):
        """Drop a randomly chosen created table and update the metadata.

        The table id is reserved for the duration of the command and always
        released again, whether the command succeeds, backs off or fails.
        """
        logging.info("handle drop table")

        execute(self.cur,
                "SELECT table_id, primary_cf_id, secondary_cf_id FROM tables "
                "WHERE created = 1 ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("drop table doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id, primary_cf_id, secondary_cf_id = self.cur.fetchone()[:3]

        if not self.insert_table_id(table_id):
            logging.info("can't drop table because the table is locked")
            self.rollback_and_sleep()
            return

        try:
            table_name = 'tbl%02d' % table_id

            try:
                execute(self.cur, "DROP TABLE %s" % table_name)
            except Exception as e:
                if not is_table_not_found_error(e):
                    raise
                self.rollback_and_sleep()
                return

            execute(self.cur,
                    "UPDATE tables "
                    "SET created = 0, primary_cf_id = NULL, "
                    "secondary_cf_id = NULL "
                    "WHERE table_id = %d" % table_id)

            for cf_id in (primary_cf_id, secondary_cf_id):
                if cf_id is not None:
                    self._unref_cf(cf_id)

            self.con.commit()
        finally:
            # release the reservation on every path, otherwise the table
            # could never be picked again by drop table/add index/drop index
            self.remove_table_id(table_id)

        logging.debug("drop table done")

    def handle_add_index(self):
        """Add secondary_key, in a random cf, to a random table lacking it.

        The table id is reserved for the duration of the command and always
        released again. Interface/operational errors, "no such table" and
        "duplicate key name" errors make the command back off.
        """
        logging.info("handle add index")

        execute(self.cur, "SELECT table_id FROM tables "
                          "WHERE created = 1 and secondary_cf_id is NULL "
                          "ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("add index doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id = self.cur.fetchone()[0]

        if not self.insert_table_id(table_id):
            logging.info("can't add index because the table is locked")
            self.rollback_and_sleep()
            return

        try:
            table_name = 'tbl%02d' % table_id

            secondary_cf_id = self._pick_random_cf()
            if secondary_cf_id is None:
                # cfs is empty: nothing to put the index in
                logging.info("add index doesn't find a suitable cf")
                self.rollback_and_sleep()
                return
            secondary_cf_name = 'cf-%02d' % secondary_cf_id

            try:
                stmt = ("ALTER TABLE %s "
                        "ADD INDEX secondary_key (id2) "
                        "COMMENT '%s'" % (table_name, secondary_cf_name))
                execute(self.cur, stmt)
            except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
                self.rollback_and_sleep()
                return
            except Exception as e:
                if not (is_no_such_table_error(e) or is_dup_key_error(e)):
                    raise
                self.rollback_and_sleep()
                return

            execute(self.cur,
                    "UPDATE tables "
                    "SET secondary_cf_id = %d "
                    "WHERE table_id = %d" % (secondary_cf_id, table_id))
            self._ref_cf(secondary_cf_id)

            self.con.commit()
        finally:
            # release the reservation on every path (see handle_drop_table)
            self.remove_table_id(table_id)

        logging.debug("add index done")

    def handle_drop_index(self):
        """Drop secondary_key from a random created table that has it.

        The table id is reserved for the duration of the command and always
        released again. "No such table" and "can't drop key" errors make the
        command back off; anything else propagates.
        """
        logging.info("handle drop index")

        execute(self.cur,
                "SELECT table_id, secondary_cf_id FROM tables "
                "WHERE created = 1 and secondary_cf_id is NOT NULL "
                "ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("drop index doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id, secondary_cf_id = self.cur.fetchone()[:2]

        if not self.insert_table_id(table_id):
            logging.info("can't drop index because the table is locked")
            self.rollback_and_sleep()
            return

        try:
            table_name = 'tbl%02d' % table_id

            try:
                execute(self.cur, "ALTER TABLE %s "
                                  "DROP INDEX secondary_key" % table_name)
            except Exception as e:
                if not (is_no_such_table_error(e)
                        or is_cant_drop_key_error(e)):
                    raise
                self.rollback_and_sleep()
                return

            execute(self.cur, "UPDATE tables "
                              "SET secondary_cf_id = NULL "
                              "WHERE table_id = %d" % table_id)
            self._unref_cf(secondary_cf_id)

            self.con.commit()
        finally:
            # release the reservation on every path (see handle_drop_table)
            self.remove_table_id(table_id)

        logging.debug("drop index done")

    def handle_manual_compaction(self):
        """Request a manual compaction of a randomly chosen used cf."""
        logging.info("handle manual compaction")

        execute(self.cur, "SELECT cf_id FROM cfs "
                          " WHERE used = 1 "
                          "ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("manual compaction doesn't find a suitable cf")
            self.rollback_and_sleep()
            return

        cf_name = 'cf-%02d' % self.cur.fetchone()[0]

        try:
            execute(self.cur,
                    "SET @@global.rocksdb_compact_cf = '%s'" % cf_name)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            # back off without claiming that the compaction was done
            self.rollback_and_sleep()
            return

        self.con.commit()
        logging.debug("manual compaction done")

    def handle_drop_cf(self):
        """Try to drop a randomly chosen used cf.

        90% of the time a cf with ref_count = 0 is chosen, otherwise one with
        ref_count > 0. On success the cf is marked unused in cfs.
        """
        logging.info("handle drop cf")

        ref_count_filter = "= 0" if roll_d100(90) else "> 0"
        execute(self.cur, "SELECT cf_id FROM cfs "
                          " WHERE used = 1 and ref_count " + ref_count_filter +
                          " ORDER BY RAND() LIMIT 1")

        if self.cur.rowcount != 1:
            logging.info("drop cf doesn't find a suitable cf")
            self.rollback_and_sleep()
            return

        cf_id = self.cur.fetchone()[0]
        cf_name = 'cf-%02d' % cf_id

        try:
            execute(self.cur,
                    "SET @@global.rocksdb_delete_cf = '%s'" % cf_name)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            return

        execute(self.cur, "UPDATE cfs "
                          "SET used = 0 "
                          "WHERE cf_id = %d" % cf_id)

        self.con.commit()
        logging.debug("drop cf done")

    def runme(self):
        """Connect to mysqld and issue weighted random commands.

        Loops until num_requests commands were issued or TEST_STOP is set.
        Raises Exception if no connection object is obtained; exceptions from
        the command handlers propagate to the caller.
        """
        self.con = MySQLdb.connect(user=OPTIONS.user, host=OPTIONS.host,
                                   port=OPTIONS.port, db=OPTIONS.db)

        if not self.con:
            raise Exception("Unable to connect to mysqld server")

        self.con.autocommit(False)
        self.cur = self.con.cursor()

        # same order as the cumulative bounds stored in WEIGHTS
        handlers = (
            self.handle_create_table,
            self.handle_drop_table,
            self.handle_add_index,
            self.handle_drop_index,
            self.handle_drop_cf,
            self.handle_manual_compaction,
        )

        while self.loop_num < self.num_requests and not TEST_STOP:
            p = random.randint(1, TOTAL_WEIGHT)
            # the first cumulative bound that covers p selects the command
            for bound, handler in zip(WEIGHTS, handlers):
                if p <= bound:
                    handler()
                    break
            self.loop_num += 1

    def run(self):
        """Thread entry point: run runme() and record the outcome.

        On any exception the formatted traceback is stored in self.exception
        and TEST_STOP is set so that all workers stop. The total run time is
        recorded and the connection, if one was opened, is closed.
        """
        global TEST_STOP

        try:
            logging.info("Started")
            self.runme()
            logging.info("Completed successfully")
        except Exception:
            self.exception = traceback.format_exc()
            logging.error(self.exception)
            TEST_STOP = True
        finally:
            self.total_time = time.time() - self.start_time
            logging.info("Total run time: %.2f s" % self.total_time)
            # con is still None when connecting itself failed
            if self.con is not None:
                self.con.close()
569
# Script entry point: parse the options, start the workers, wait for them and,
# if none of them failed, drop the tables the test left behind.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Drop cf stress')

    # The short and the long spelling have to be separate option strings. A
    # single string such as '-d, --db' registers one literal flag, which made
    # the long forms unusable.
    parser.add_argument('-d', '--db', dest='db', default='test',
                        help="mysqld server database to test with")

    parser.add_argument('-H', '--host', dest='host', default='127.0.0.1',
                        help="mysqld server host ip address")

    parser.add_argument('-L', '--log-file', dest='log_file', default=None,
                        help="log file for output")

    parser.add_argument('-w', '--num-workers', dest='num_workers', type=int,
                        default=16,
                        help="number of worker threads to test with")

    parser.add_argument('-P', '--port', dest='port', default=3307, type=int,
                        help='mysqld server host port')

    parser.add_argument('-r', '--num-requests', dest='num_requests', type=int,
                        default=100000000,
                        help="number of requests issued per worker thread")

    parser.add_argument('-u', '--user', dest='user', default='root',
                        help="user to log into the mysql server")

    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help="enable debug logging")

    parser.add_argument('--ct-weight', dest='create_table_weight', type=int,
                        default=3,
                        help="weight of create table command")

    parser.add_argument('--dt-weight', dest='drop_table_weight', type=int,
                        default=3,
                        help="weight of drop table command")

    parser.add_argument('--ai-weight', dest='add_index_weight', type=int,
                        default=3,
                        help="weight of add index command")

    parser.add_argument('--di-weight', dest='drop_index_weight', type=int,
                        default=3,
                        help="weight of drop index command")

    parser.add_argument('--dc-weight', dest='drop_cf_weight', type=int,
                        default=3,
                        help="weight of drop cf command")

    parser.add_argument('--mc-weight', dest='manual_compaction_weight',
                        type=int,
                        default=1,
                        help="weight of manual compaction command")

    OPTIONS = parser.parse_args()

    # The order must match the order in which WorkerThread.runme() consults
    # WEIGHTS: create table, drop table, add index, drop index, drop cf,
    # manual compaction.
    command_weights = (
        OPTIONS.create_table_weight,
        OPTIONS.drop_table_weight,
        OPTIONS.add_index_weight,
        OPTIONS.drop_index_weight,
        OPTIONS.drop_cf_weight,
        OPTIONS.manual_compaction_weight,
    )
    # The workers draw random.randint(1, TOTAL_WEIGHT), which needs a positive
    # total, and the cumulative bounds only make sense for weights >= 0.
    if min(command_weights) < 0:
        parser.error("command weights must not be negative")
    if sum(command_weights) <= 0:
        parser.error("at least one command weight must be positive")

    log_level = logging.DEBUG if OPTIONS.verbose else logging.INFO

    logging.basicConfig(level=log_level,
                        format='%(asctime)s %(threadName)s [%(levelname)s] '
                               '%(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename=OPTIONS.log_file)

    con = MySQLdb.connect(user=OPTIONS.user, host=OPTIONS.host,
                          port=OPTIONS.port, db=OPTIONS.db)

    if not con:
        raise Exception("Unable to connect to mysqld server")

    try:
        # WEIGHTS receives the running totals, TOTAL_WEIGHT the grand total
        for weight in command_weights:
            TOTAL_WEIGHT += weight
            WEIGHTS.append(TOTAL_WEIGHT)

        # every worker starts running as soon as it is constructed
        workers = [WorkerThread(i) for i in range(OPTIONS.num_workers)]

        workers_failed = wait_for_workers(workers)
        if workers_failed > 0:
            logging.error("Test detected %d failures, aborting"
                          % workers_failed)
            sys.exit(1)

        # Clean up what the test left behind. Only the current database is
        # considered, since the unqualified DROP TABLE acts on it, and the
        # names are quoted because e.g. a name containing '-' is not a valid
        # bare identifier.
        cur = con.cursor()
        for pattern in ('tbl%', 'cf-%'):
            execute(cur,
                    "SELECT table_name FROM information_schema.tables "
                    "WHERE table_schema = DATABASE() "
                    "AND table_name like '" + pattern + "'")

            for row in cur.fetchall():
                execute(cur, "DROP TABLE `%s`" % row[0])
    finally:
        # also runs when sys.exit(1) above raises SystemExit
        con.close()

    sys.exit(0)
693
694