import MySQLdb
from MySQLdb.constants import CR
from MySQLdb.constants import ER
import os
import random
import signal
import sys
import threading
import time
import string
import traceback
import logging
import argparse
import array as arr

# This is a stress test for the drop cf feature. It sends the following
# commands to the mysqld server in parallel: create table, drop table, add
# index, drop index, manual compaction and drop cf, and verifies that there
# are no race conditions or deadlocks.

# This test uses two "metadata" tables to record the tables and column
# families that are in use.
# The first table is called "tables".

# CREATE TABLE tables (
#   table_id int NOT NULL,
#   created BOOLEAN NOT NULL,
#   primary_cf_id int,
#   secondary_cf_id int,
#   PRIMARY KEY (table_id),
#   KEY secondary_key (created, secondary_cf_id)
# ) ENGINE=MEMORY;
#
# This table is populated before this script is run. Each row in this table
# represents a table on which the create table, drop table, add index and
# drop index commands will run.
#
# The "created" column indicates whether the table identified by "table_id"
# has been created. If the table is not created, create table can run on it.
# If the table is created, drop table can run on it.
#
# "primary_cf_id" is the column family the primary key is stored in. The
# primary key is created by the create table command.
#
# "secondary_cf_id" is the column family the secondary key is stored in.
# If it is NULL and the table is created, add index can run on the
# table. If it is not NULL and the table is created, drop index can
# run on the table.
#
# The second table is called "cfs".
# CREATE TABLE cfs (
#   cf_id int NOT NULL,
#   used BOOLEAN NOT NULL,
#   ref_count int NOT NULL,
#   PRIMARY KEY (cf_id)
# ) ENGINE=MEMORY;

# This table is also populated before the script is run. Each row in this
# table represents a column family that can be used by the create table and
# add index commands.
#
# The "used" column is true if the column family identified by "cf_id" has
# been created but not dropped. It is false if the column family has never
# been created or has been dropped.
#
# The "ref_count" column is the number of indexes stored in the column
# family.
#
# When the drop cf and manual compaction commands run, they choose cfs with
# "used" set to true to make the test more effective. In addition, drop cf
# prefers (90% of the time) cfs whose ref_count is 0, i.e., cfs that don't
# contain any indexes.
#
# The tables and cfs tables use the MEMORY storage engine, which doesn't
# support transactions. As a result, the updates to these tables don't
# happen atomically in the create table, drop table, add index and drop
# index operations.
#
# This test doesn't require that. ref_count is only used by the drop cf
# operation to skew the selection towards cfs that don't store any index.
# Temporary inconsistencies between the tables and cfs tables don't cause
# correctness issues.
#
# Further, the MEMORY storage engine locks at table granularity: updating
# the tables or cfs table locks that whole table. This doesn't cause
# performance issues for this test.
# Global flag checked by worker threads to determine whether the test is
# stopping. A worker that fails sets it so the other workers wind down.
TEST_STOP = False
# Cumulative command weights, filled in by the __main__ section in this
# order: create table, drop table, add index, drop index, drop cf, manual
# compaction. A roll p in [1, TOTAL_WEIGHT] selects the first command whose
# cumulative weight is >= p.
WEIGHTS = arr.array('i', [])
# Sum of all command weights (the last entry of WEIGHTS once populated).
TOTAL_WEIGHT = 0

# Ids of tables some worker is currently dropping or altering, so that no two
# workers run drop table / add index / drop index on the same table at once.
# Only accessed while holding LOCK.
LOCKED_TABLE_IDS = set()
LOCK = threading.Lock()


def roll_d100(p):
    """Roll a 100-sided die and return whether p is >= the roll.

    Passing 0 always returns False and passing 100 always returns True.
    """
    assert p >= 0 and p <= 100
    return p >= random.randint(1, 100)


def execute(cur, stmt):
    """Execute stmt on cur, treating an invalid row count as a connection error.

    Raises:
        MySQLdb.OperationalError: if the cursor reports a negative row count
            or 18446744073709551615 (2**64 - 1), which this test treats as a
            possible lost connection.
    """
    ROW_COUNT_ERROR = 18446744073709551615  # 2**64 - 1
    logging.debug("Executing %s", stmt)
    cur.execute(stmt)
    if cur.rowcount < 0 or cur.rowcount == ROW_COUNT_ERROR:
        raise MySQLdb.OperationalError(MySQLdb.constants.CR.CONNECTION_ERROR,
                                       "Possible connection error, rowcount is %d"
                                       % cur.rowcount)


def _error_code(exc):
    """Return exc.args[0] (the MySQL error code), or None if exc has no args."""
    return exc.args[0] if exc.args else None


def is_table_exists_error(exc):
    """Return True if exc carries MySQL error code ER.TABLE_EXISTS_ERROR."""
    return _error_code(exc) == MySQLdb.constants.ER.TABLE_EXISTS_ERROR


def is_table_not_found_error(exc):
    """Return True if exc carries MySQL error code ER.BAD_TABLE_ERROR."""
    return _error_code(exc) == MySQLdb.constants.ER.BAD_TABLE_ERROR


def is_no_such_table_error(exc):
    """Return True if exc carries MySQL error code ER.NO_SUCH_TABLE."""
    return _error_code(exc) == MySQLdb.constants.ER.NO_SUCH_TABLE


def is_dup_key_error(exc):
    """Return True if exc carries MySQL error code ER.DUP_KEYNAME."""
    return _error_code(exc) == MySQLdb.constants.ER.DUP_KEYNAME


def is_cant_drop_key_error(exc):
    """Return True if exc carries MySQL error code ER.CANT_DROP_FIELD_OR_KEY."""
    return _error_code(exc) == MySQLdb.constants.ER.CANT_DROP_FIELD_OR_KEY


def wait_for_workers(workers):
    """Wait for every worker thread to finish and return how many failed.

    A worker counts as failed when its `exception` attribute is set; the
    recorded traceback is logged.
    """
    logging.info("Waiting for %d workers", len(workers))

    # polling here allows this thread to be responsive to keyboard interrupt
    # exceptions, otherwise a user hitting ctrl-c would see the load_generator as
    # hanging and unresponsive. os._exit is used because the workers are not
    # daemon threads, so a normal exit would presumably wait for them.
    try:
        while threading.active_count() > 1:
            time.sleep(1)
    except KeyboardInterrupt:
        os._exit(1)

    num_failures = 0
    for w in workers:
        w.join()
        if w.exception:
            logging.error(w.exception)
            num_failures += 1

    return num_failures


class WorkerThread(threading.Thread):
    """Worker that repeatedly issues one randomly chosen command to mysqld.

    The thread starts itself from the constructor. It runs until it has
    issued OPTIONS.num_requests commands or TEST_STOP is set. On an
    unexpected error it stores the traceback in `exception` and sets
    TEST_STOP.
    """

    def __init__(self, thread_id):
        # thread_id is accepted for the caller's convenience but not used.
        threading.Thread.__init__(self)
        self.con = None
        self.cur = None
        self.num_requests = OPTIONS.num_requests
        self.loop_num = 0
        self.exception = None

        self.start_time = time.time()
        self.total_time = 0

        self.start()

    def insert_table_id(self, table_id):
        """Try to lock table_id for this worker.

        Returns True if the id was free (it is now locked by the caller), or
        False if another worker already holds it.
        """
        with LOCK:
            ret = table_id not in LOCKED_TABLE_IDS
            if ret:
                logging.debug("%d is added to locked table id set", table_id)
                LOCKED_TABLE_IDS.add(table_id)
        logging.debug("insert_table_id returns %s", ret)
        return ret

    def remove_table_id(self, table_id):
        """Unlock table_id. Raises KeyError if it was not locked."""
        with LOCK:
            logging.debug("%d is removed from locked table id set", table_id)
            LOCKED_TABLE_IDS.remove(table_id)

    def _run_with_table_locked(self, table_id, command, action, *args):
        """Run action(*args) while holding the lock on table_id.

        If another worker holds the lock, log that `command` can't run, roll
        back and return without calling action. Otherwise the lock is
        released on every exit path, including early returns and exceptions
        inside action.
        """
        if not self.insert_table_id(table_id):
            logging.info("can't %s because the table is locked", command)
            self.rollback_and_sleep()
            return
        try:
            action(*args)
        finally:
            self.remove_table_id(table_id)

    def rollback_and_sleep(self):
        """Roll back the current transaction and pause briefly (50 ms)."""
        self.con.rollback()
        time.sleep(0.05)

    def _pick_random_cf(self):
        """Return a random cf_id from the cfs table, or None if it is empty."""
        stmt = ("SELECT cf_id FROM cfs "
                "ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)
        if self.cur.rowcount != 1:
            return None
        return self.cur.fetchone()[0]

    def _mark_cf_referenced(self, cf_id):
        """Mark cf_id as used and increment its ref_count in cfs."""
        stmt = ("UPDATE cfs "
                "SET used = 1, ref_count = ref_count + 1 "
                "WHERE cf_id = %d" % cf_id)
        execute(self.cur, stmt)

    def _decrement_cf_ref_count(self, cf_id):
        """Decrement ref_count of cf_id in cfs; does nothing for None."""
        if cf_id is None:
            return
        stmt = ("UPDATE cfs "
                "SET ref_count = ref_count - 1 "
                "WHERE cf_id = %d" % cf_id)
        execute(self.cur, stmt)

    def insert_data_to_table(self, table_name):
        """Insert rows (1, 1) through (6, 6) into table_name in two batches.

        Each batch of three rows is committed and followed by
        SET GLOBAL rocksdb_force_flush_memtable_now = true. Presumably this
        spreads the table's data over several flushes; TODO confirm intent.
        """
        for first in (1, 4):
            values = ", ".join("(%d, %d)" % (v, v)
                               for v in range(first, first + 3))
            execute(self.cur, "INSERT INTO %s VALUES %s" % (table_name, values))

            self.con.commit()
            stmt = ("SET GLOBAL rocksdb_force_flush_memtable_now = true")
            execute(self.cur, stmt)

    def handle_create_table(self):
        """Create a random not-yet-created table with its PK in a random cf.

        Picks a row of `tables` with created = 0 and a random row of `cfs`.
        It creates the RocksDB table, loads a few rows and records the result
        in the metadata tables. A concurrent create of the same table
        (table exists) or a connection-level error is rolled back and ignored.
        """
        logging.debug("handle create table")
        stmt = ("SELECT table_id FROM tables "
                "WHERE created = 0 ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("create table doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id = self.cur.fetchone()[0]
        table_name = 'tbl%02d' % table_id

        primary_cf_id = self._pick_random_cf()
        if primary_cf_id is None:
            logging.info("create table doesn't find a suitable cf")
            self.rollback_and_sleep()
            return
        primary_cf_name = 'cf-%02d' % primary_cf_id

        try:
            stmt = ("CREATE TABLE %s ("
                    "id1 int unsigned NOT NULL,"
                    "id2 int unsigned NOT NULL,"
                    "PRIMARY KEY (id1) COMMENT '%s'"
                    ") ENGINE=ROCKSDB" % (table_name, primary_cf_name))
            execute(self.cur, stmt)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            return
        except MySQLdb.Error as e:
            if is_table_exists_error(e):
                self.rollback_and_sleep()
                return
            raise

        self.insert_data_to_table(table_name)

        stmt = ("UPDATE tables "
                "SET created = 1, primary_cf_id = %d, secondary_cf_id = NULL "
                "WHERE table_id = %d" % (primary_cf_id, table_id))
        execute(self.cur, stmt)

        self._mark_cf_referenced(primary_cf_id)

        self.con.commit()
        logging.debug("create table done")

    def handle_drop_table(self):
        """Drop a random created table that no other worker has locked."""
        logging.info("handle drop table")

        stmt = ("SELECT table_id, primary_cf_id, secondary_cf_id FROM tables "
                "WHERE created = 1 ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("drop table doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id, primary_cf_id, secondary_cf_id = self.cur.fetchone()
        self._run_with_table_locked(table_id, "drop table",
                                    self._drop_locked_table,
                                    table_id, primary_cf_id, secondary_cf_id)

    def _drop_locked_table(self, table_id, primary_cf_id, secondary_cf_id):
        """Drop table table_id (lock already held) and update the metadata."""
        table_name = 'tbl%02d' % table_id

        try:
            stmt = ("DROP TABLE %s" % table_name)
            execute(self.cur, stmt)
        except MySQLdb.Error as e:
            if is_table_not_found_error(e):
                self.rollback_and_sleep()
                return
            raise

        stmt = ("UPDATE tables "
                "SET created = 0, primary_cf_id = NULL, secondary_cf_id = NULL "
                "WHERE table_id = %d" % table_id)
        execute(self.cur, stmt)

        self._decrement_cf_ref_count(primary_cf_id)
        self._decrement_cf_ref_count(secondary_cf_id)

        self.con.commit()
        logging.debug("drop table done")

    def handle_add_index(self):
        """Add a secondary index, in a random cf, to a random created table.

        Only tables without a secondary index (secondary_cf_id IS NULL) that
        no other worker has locked are considered.
        """
        logging.info("handle add index")

        stmt = ("SELECT table_id FROM tables "
                "WHERE created = 1 and secondary_cf_id is NULL "
                "ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("add index doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id = self.cur.fetchone()[0]
        self._run_with_table_locked(table_id, "add index",
                                    self._add_index_locked, table_id)

    def _add_index_locked(self, table_id):
        """Add secondary_key to table table_id (lock already held)."""
        table_name = 'tbl%02d' % table_id

        secondary_cf_id = self._pick_random_cf()
        if secondary_cf_id is None:
            logging.info("add index doesn't find a suitable cf")
            self.rollback_and_sleep()
            return
        secondary_cf_name = 'cf-%02d' % secondary_cf_id

        try:
            stmt = ("ALTER TABLE %s "
                    "ADD INDEX secondary_key (id2) "
                    "COMMENT '%s'" % (table_name, secondary_cf_name))
            execute(self.cur, stmt)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            return
        except MySQLdb.Error as e:
            if is_no_such_table_error(e) or is_dup_key_error(e):
                self.rollback_and_sleep()
                return
            raise

        stmt = ("UPDATE tables "
                "SET secondary_cf_id = %d "
                "WHERE table_id = %d" % (secondary_cf_id, table_id))
        execute(self.cur, stmt)

        self._mark_cf_referenced(secondary_cf_id)

        self.con.commit()
        logging.debug("add index done")

    def handle_drop_index(self):
        """Drop the secondary index of a random created table that has one."""
        logging.info("handle drop index")

        stmt = ("SELECT table_id, secondary_cf_id FROM tables "
                "WHERE created = 1 and secondary_cf_id is NOT NULL "
                "ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("drop index doesn't find a suitable table")
            self.rollback_and_sleep()
            return

        table_id, secondary_cf_id = self.cur.fetchone()
        self._run_with_table_locked(table_id, "drop index",
                                    self._drop_index_locked,
                                    table_id, secondary_cf_id)

    def _drop_index_locked(self, table_id, secondary_cf_id):
        """Drop secondary_key from table table_id (lock already held)."""
        table_name = 'tbl%02d' % table_id

        try:
            stmt = ("ALTER TABLE %s "
                    "DROP INDEX secondary_key" % table_name)
            execute(self.cur, stmt)
        except MySQLdb.Error as e:
            if is_no_such_table_error(e) or is_cant_drop_key_error(e):
                self.rollback_and_sleep()
                return
            raise

        stmt = ("UPDATE tables "
                "SET secondary_cf_id = NULL "
                "WHERE table_id = %d" % table_id)
        execute(self.cur, stmt)

        self._decrement_cf_ref_count(secondary_cf_id)

        self.con.commit()
        logging.debug("drop index done")

    def handle_manual_compaction(self):
        """Run a manual compaction on a random cf marked as used."""
        logging.info("handle manual compaction")

        stmt = ("SELECT cf_id FROM cfs "
                " WHERE used = 1 "
                "ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("manual compaction doesn't find a suitable cf")
            self.rollback_and_sleep()
            return

        cf_id = self.cur.fetchone()[0]
        cf_name = 'cf-%02d' % cf_id

        try:
            stmt = ("SET @@global.rocksdb_compact_cf = '%s'" % cf_name)
            execute(self.cur, stmt)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            # Don't commit or report success after a failed compaction.
            return

        self.con.commit()
        logging.debug("manual compaction done")

    def handle_drop_cf(self):
        """Try to drop a random cf marked as used.

        90% of the time it picks among cfs with ref_count = 0, otherwise among
        cfs with ref_count > 0. A connection-level error from the drop is
        rolled back and ignored. On success the cf is marked unused.
        """
        logging.info("handle drop cf")

        if roll_d100(90):
            stmt = ("SELECT cf_id FROM cfs "
                    " WHERE used = 1 and ref_count = 0"
                    " ORDER BY RAND() LIMIT 1")
        else:
            stmt = ("SELECT cf_id FROM cfs "
                    " WHERE used = 1 and ref_count > 0"
                    " ORDER BY RAND() LIMIT 1")
        execute(self.cur, stmt)

        if self.cur.rowcount != 1:
            logging.info("drop cf doesn't find a suitable cf")
            self.rollback_and_sleep()
            return

        cf_id = self.cur.fetchone()[0]
        cf_name = 'cf-%02d' % cf_id

        try:
            stmt = ("SET @@global.rocksdb_delete_cf = '%s'" % cf_name)
            execute(self.cur, stmt)
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            self.rollback_and_sleep()
            return

        stmt = ("UPDATE cfs "
                "SET used = 0 "
                "WHERE cf_id = %d" % cf_id)
        execute(self.cur, stmt)

        self.con.commit()
        logging.debug("drop cf done")

    def runme(self):
        """Connect to mysqld and issue weighted random commands until done."""
        self.con = MySQLdb.connect(user=OPTIONS.user, host=OPTIONS.host,
                                   port=OPTIONS.port, db=OPTIONS.db)

        if not self.con:
            raise Exception("Unable to connect to mysqld server")

        self.con.autocommit(False)
        self.cur = self.con.cursor()

        # Must stay in the same order as the cumulative weights in WEIGHTS.
        handlers = (self.handle_create_table,
                    self.handle_drop_table,
                    self.handle_add_index,
                    self.handle_drop_index,
                    self.handle_drop_cf,
                    self.handle_manual_compaction)

        while self.loop_num < self.num_requests and not TEST_STOP:
            p = random.randint(1, TOTAL_WEIGHT)
            for cumulative_weight, handler in zip(WEIGHTS, handlers):
                if p <= cumulative_weight:
                    handler()
                    break
            self.loop_num += 1

    def run(self):
        """Thread entry point: run the workload and record any failure."""
        global TEST_STOP

        try:
            logging.info("Started")
            self.runme()
            logging.info("Completed successfully")
        except Exception:
            self.exception = traceback.format_exc()
            logging.error(self.exception)
            TEST_STOP = True
        finally:
            self.total_time = time.time() - self.start_time
            logging.info("Total run time: %.2f s" % self.total_time)
            # self.con is still None if connecting failed; closing it would
            # raise AttributeError and hide the original error.
            if self.con is not None:
                self.con.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Drop cf stress')

    # Short and long option strings must be separate arguments: a single
    # string such as '-d, --db' registers one literal option of that name
    # and leaves the long form unusable.
    parser.add_argument('-d', '--db', dest='db', default='test',
                        help="mysqld server database to test with")

    parser.add_argument('-H', '--host', dest='host', default='127.0.0.1',
                        help="mysqld server host ip address")

    parser.add_argument('-L', '--log-file', dest='log_file', default=None,
                        help="log file for output")

    parser.add_argument('-w', '--num-workers', dest='num_workers', type=int,
                        default=16,
                        help="number of worker threads to test with")

    parser.add_argument('-P', '--port', dest='port', default=3307, type=int,
                        help='mysqld server host port')

    parser.add_argument('-r', '--num-requests', dest='num_requests', type=int,
                        default=100000000,
                        help="number of requests issued per worker thread")

    parser.add_argument('-u', '--user', dest='user', default='root',
                        help="user to log into the mysql server")

    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help="enable debug logging")

    parser.add_argument('--ct-weight', dest='create_table_weight', type=int,
                        default=3,
                        help="weight of create table command")

    parser.add_argument('--dt-weight', dest='drop_table_weight', type=int,
                        default=3,
                        help="weight of drop table command")

    parser.add_argument('--ai-weight', dest='add_index_weight', type=int,
                        default=3,
                        help="weight of add index command")

    parser.add_argument('--di-weight', dest='drop_index_weight', type=int,
                        default=3,
                        help="weight of drop index command")

    parser.add_argument('--dc-weight', dest='drop_cf_weight', type=int,
                        default=3,
                        help="weight of drop cf command")

    parser.add_argument('--mc-weight', dest='manual_compaction_weight',
                        type=int,
                        default=1,
                        help="weight of manual compaction command")

    # Read as a global by WorkerThread.
    OPTIONS = parser.parse_args()

    # Same order as the handlers dispatched in WorkerThread.runme.
    command_weights = (OPTIONS.create_table_weight,
                       OPTIONS.drop_table_weight,
                       OPTIONS.add_index_weight,
                       OPTIONS.drop_index_weight,
                       OPTIONS.drop_cf_weight,
                       OPTIONS.manual_compaction_weight)
    # Workers call random.randint(1, TOTAL_WEIGHT), which needs a positive
    # total; a negative weight would make the cumulative weights
    # non-monotonic.
    if (any(weight < 0 for weight in command_weights)
            or sum(command_weights) <= 0):
        parser.error("command weights must be non-negative with a positive sum")

    if OPTIONS.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    logging.basicConfig(level=log_level,
                        format='%(asctime)s %(threadName)s [%(levelname)s] '
                               '%(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename=OPTIONS.log_file)

    con = MySQLdb.connect(user=OPTIONS.user, host=OPTIONS.host,
                          port=OPTIONS.port, db=OPTIONS.db)

    if not con:
        raise Exception("Unable to connect to mysqld server")

    for weight in command_weights:
        TOTAL_WEIGHT += weight
        WEIGHTS.append(TOTAL_WEIGHT)

    workers = [WorkerThread(i) for i in range(OPTIONS.num_workers)]

    workers_failed = wait_for_workers(workers)
    if workers_failed > 0:
        logging.error("Test detected %d failures, aborting", workers_failed)
        sys.exit(1)

    # Clean up the tables created by the test. information_schema.tables
    # lists every schema on the server, so restrict the lookup to the
    # connection's database; the unqualified DROP TABLE only targets it.
    cur = con.cursor()
    for pattern in ('tbl%', 'cf-%'):
        stmt = ("SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = DATABASE() AND table_name like '%s'"
                % pattern)
        execute(cur, stmt)

        for (table_name,) in cur.fetchall():
            # Backquoted so names containing '-' are valid identifiers.
            stmt = ("DROP TABLE `%s`" % table_name)
            execute(cur, stmt)

    con.close()

    sys.exit(0)