#!@PYTHON_SHEBANG@

from __future__ import division
from optparse import OptionParser
import collections
import signal
import os
import stat
import sys
import re
import subprocess
import logging
import logging.handlers
import time
import datetime
import shutil
import traceback
import tempfile

# The MySQL driver is only needed when a backup is taken.  Move-back mode runs
# on the destination host and never opens a connection, so a missing driver
# must not prevent the script from starting there.
try:
    import MySQLdb
    import MySQLdb.connections
    from MySQLdb import OperationalError, ProgrammingError
except ImportError:
    MySQLdb = None
    OperationalError = ProgrammingError = None

logger = None
opts = None
rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS']
rocksdb_data_suffix = '.sst'
rocksdb_wal_suffix = '.log'
exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info',
                 'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash']
wdt_bin = 'wdt'


def is_manifest(fname):
    """Return True when fname starts with one of the rocksdb_files prefixes."""
    return fname.startswith(tuple(rocksdb_files))


class Writer(object):
    """Base class for backup writers; subclasses provide write(file_name)."""
    a = None

    def __init__(self):
        self.a = None


class StreamWriter(Writer):
    """Writer that streams each file to stdout through tar or xbstream."""
    stream_cmd = ''

    def __init__(self, stream_option, direct=0):
        """Select the streaming command.

        stream_option -- 'tar' or 'xbstream'; anything else raises Exception.
        direct        -- when true and xbstream is used, ' -d' is appended.
        """
        super(StreamWriter, self).__init__()
        if stream_option == 'tar':
            self.stream_cmd = 'tar chf -'
        elif stream_option == 'xbstream':
            self.stream_cmd = 'xbstream -c'
            if direct:
                self.stream_cmd = self.stream_cmd + ' -d'
        else:
            raise Exception("Only tar or xbstream is supported as streaming option.")

    def write(self, file_name):
        """Run the streaming command on file_name; raise Exception on failure.

        The command is executed without a shell so that a file name containing
        spaces or shell metacharacters is passed through as a single argument.
        """
        rc = subprocess.call(self.stream_cmd.split() + [file_name])
        if rc != 0:
            raise Exception("Got error on stream write: " + str(rc) + " " + file_name)


class MiscFilesProcessor(object):
    """Walks the MySQL datadir and hands non-RocksDB files to process_file().

    Subclasses override process_db() / process_file(); the defaults do nothing.
    """
    datadir = None
    # File extensions picked up from each database directory.  This must be an
    # alternation group: a character class ([frm|MYD|...]) would match any
    # file that merely has a dot followed by one of those letters.
    wildcard = r'.*\.(?:frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par)$'
    regex = None
    start_backup_time = None
    skip_check_frm_timestamp = None

    def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time):
        self.datadir = datadir
        self.regex = re.compile(self.wildcard)
        self.skip_check_frm_timestamp = skip_check_frm_timestamp
        self.start_backup_time = start_backup_time

    def process_db(self, db):
        """Hook called once per database directory; no-op by default."""
        pass

    def process_file(self, path):
        """Hook called once per selected file; no-op by default."""
        pass

    def check_frm_timestamp(self, fname, path):
        """Raise Exception if an .frm file was modified after the backup began.

        Does nothing when the check is disabled or fname is not an .frm file.
        """
        if self.skip_check_frm_timestamp or not fname.endswith('.frm'):
            return
        mtime = os.path.getmtime(path)
        if mtime > self.start_backup_time:
            time_format = '%Y-%m-%d %H:%M:%S'
            logger.error('FRM file %s was updated after starting backups. '
                         'Schema could have changed and the resulting copy may '
                         'not be valid. Aborting. '
                         '(backup time: %s, file modifled time: %s)',
                         path,
                         datetime.datetime.fromtimestamp(self.start_backup_time).strftime(time_format),
                         datetime.datetime.fromtimestamp(mtime).strftime(time_format))
            raise Exception("Inconsistent frm file timestamp")

    def process(self):
        """Traverse the datadir and process every selected file.

        Changes the working directory to the datadir so that the paths handed
        to process_file() are relative to it.
        """
        os.chdir(self.datadir)
        for db in self.get_databases():
            logger.info("Starting MySQL misc file traversal from database %s..", db)
            self.process_db(db)
            for f in self.get_files(db):
                if self.match(f):
                    rel_path = os.path.join(db, f)
                    self.check_frm_timestamp(f, rel_path)
                    self.process_file(rel_path)
        logger.info("Traversing misc files from data directory..")
        excluded = tuple(exclude_files)
        # Files directly under the datadir are all processed unless their name
        # starts or ends with one of the excluded names.
        for f in self.get_files(""):
            if f.startswith(excluded) or f.endswith(excluded):
                logger.info("Skipping %s", f)
                continue
            self.process_file(f)

    def match(self, filename):
        """Return True when filename has one of the extensions in wildcard."""
        return bool(self.regex.match(filename))

    def get_databases(self):
        """Return datadir entries that are treated as database directories.

        Regular files, dot-prefixed entries, sockets and "#rocksdb" are skipped.
        """
        dbs = []
        for entry in os.listdir(self.datadir):
            if os.path.isfile(os.path.join(self.datadir, entry)):
                continue
            if entry.startswith('.') or entry == "#rocksdb":
                continue
            if self._is_socket(entry):
                continue
            dbs.append(entry)
        return dbs

    def get_files(self, db):
        """Return the names of the regular files directly inside datadir/db."""
        dbdir = self.datadir + "/" + db
        return [f for f in os.listdir(dbdir)
                if os.path.isfile(os.path.join(dbdir, f))]

    def _is_socket(self, item):
        """Return True when datadir/item is a socket."""
        mode = os.stat(os.path.join(self.datadir, item)).st_mode
        return stat.S_ISSOCK(mode)


class MySQLBackup(MiscFilesProcessor):
    """Sends every selected misc file to the given writer."""
    writer = None

    def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time):
        MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
        self.writer = writer

    def process_file(self, fname):  # overriding base class
        self.writer.write(fname)


class MiscFilesLinkCreator(MiscFilesProcessor):
    """Hard-links every selected misc file into snapshot_dir."""
    snapshot_dir = None

    def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time):
        MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
        self.snapshot_dir = snapshot_dir

    def process_db(self, db):
        """Create the per-database sub directory inside the snapshot."""
        snapshot_sub_dir = os.path.join(self.snapshot_dir, db)
        os.makedirs(snapshot_sub_dir)

    def process_file(self, path):
        """Link path (relative to the datadir, the cwd) into the snapshot."""
        dst_path = os.path.join(self.snapshot_dir, path)
        os.link(path, dst_path)


# RocksDB backup
class RocksDBBackup(object):
    """One backup round over a RocksDB checkpoint directory.

    Files are sent through the writer and removed from the checkpoint as they
    go.  A round that runs out of time returns with finished == False; the
    next round receives it as prev and skips sst files already sent with the
    same size.
    """
    source_dir = None
    writer = None
    # sst files sent in this backup round
    sent_sst = {}
    # target sst files in this backup round
    target_sst = {}
    # sst files sent in all backup rounds
    total_sent_sst = {}
    # sum of sst file size sent in this backup round
    sent_sst_size = 0
    # sum of target sst file size in this backup round
    # if sent_sst_size becomes equal to target_sst_size,
    # it means the backup round finished backing up all sst files
    target_sst_size = 0
    # sum of all sst file size sent all backup rounds
    total_sent_sst_size = 0
    # sum of all target sst file size from all backup rounds
    total_target_sst_size = 0
    show_progress_size_interval = 1073741824  # 1GB
    wal_files = []
    manifest_files = []
    finished = False

    def __init__(self, source_dir, writer, prev):
        self.source_dir = source_dir
        self.writer = writer
        # File names are handed to the writer relative to the checkpoint dir.
        os.chdir(self.source_dir)
        self.init_target_files(prev)

    def init_target_files(self, prev):
        """Scan source_dir and classify its files for this round.

        prev -- the RocksDBBackup of the previous round, or None.
        """
        sst = {}
        self.sent_sst = {}
        self.target_sst = {}
        self.total_sent_sst = {}
        self.sent_sst_size = 0
        self.target_sst_size = 0
        self.total_sent_sst_size = 0
        self.total_target_sst_size = 0
        self.wal_files = []
        self.manifest_files = []

        for f in os.listdir(self.source_dir):
            if f.endswith(rocksdb_data_suffix):
                size = int(os.stat(os.path.join(self.source_dir, f)).st_size)
                # exactly the same file (same size) was sent in previous backup rounds
                if prev is not None and prev.total_sent_sst.get(f) == size:
                    continue
                sst[f] = size
                self.target_sst_size = self.target_sst_size + size
            elif is_manifest(f):
                self.manifest_files.append(f)
            elif f.endswith(rocksdb_wal_suffix):
                self.wal_files.append(f)
        self.target_sst = collections.OrderedDict(sorted(sst.items()))

        if prev is not None:
            # The dict is shared with prev on purpose: it accumulates over rounds.
            self.total_sent_sst = prev.total_sent_sst
            self.total_sent_sst_size = prev.total_sent_sst_size
            self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size
        else:
            self.total_target_sst_size = self.target_sst_size

    def do_backup_single(self, fname):
        """Send one file and remove it from the checkpoint directory."""
        self.writer.write(fname)
        os.remove(fname)

    def do_backup_sst(self, fname, size):
        """Send one sst file and update the per-round and total counters."""
        self.do_backup_single(fname)
        self.sent_sst[fname] = size
        self.total_sent_sst[fname] = size
        self.sent_sst_size = self.sent_sst_size + size
        self.total_sent_sst_size = self.total_sent_sst_size + size

    def do_backup_manifest(self):
        for f in self.manifest_files:
            self.do_backup_single(f)

    def do_backup_wal(self):
        for f in self.wal_files:
            self.do_backup_single(f)

    # this is the last snapshot round. backing up all the rest files
    def do_backup_final(self):
        logger.info("Backup WAL..")
        self.do_backup_wal()
        logger.info("Backup Manifest..")
        self.do_backup_manifest()
        self.do_cleanup()
        self.finished = True

    def do_cleanup(self):
        """Remove the checkpoint directory.

        Safe to call when the directory is already gone, which happens when a
        signal or an error arrives after the round cleaned up after itself.
        """
        if os.path.isdir(self.source_dir):
            shutil.rmtree(self.source_dir)
        logger.info("Cleaned up checkpoint from %s", self.source_dir)

    def do_backup_until(self, time_limit):
        """Send sst files until done or until time_limit seconds have passed.

        When every target sst file was sent, the WAL and manifest files are
        sent too and finished becomes True.  Otherwise the checkpoint is
        removed and the caller is expected to start another round.
        Returns self.
        """
        logger.info("Starting backup from snapshot: target files %d", len(self.target_sst))
        start_time = time.time()
        last_progress_time = start_time
        progress_size = 0
        # items() rather than iteritems() so this runs on Python 2 and 3.
        for fname, size in self.target_sst.items():
            self.do_backup_sst(fname, size)
            progress_size = progress_size + size
            elapsed_seconds = time.time() - start_time
            progress_seconds = time.time() - last_progress_time

            if self.should_show_progress(size):
                self.show_progress(progress_size, progress_seconds)
                progress_size = 0
                last_progress_time = time.time()

            if elapsed_seconds > time_limit and self.has_sent_all_sst() is False:
                logger.info("Snapshot round finished. Elapsed Time: %5.2f. Remaining sst files: %d",
                            elapsed_seconds, len(self.target_sst) - len(self.sent_sst))
                self.do_cleanup()
                break
        if self.has_sent_all_sst():
            self.do_backup_final()

        return self

    def should_show_progress(self, size):
        """Return True when sending size bytes crossed a progress interval."""
        interval = self.show_progress_size_interval
        before = int((self.total_sent_sst_size - size) / interval)
        after = int(self.total_sent_sst_size / interval)
        return after > before

    def show_progress(self, size, seconds):
        """Log overall progress and the speed of the last size bytes."""
        # Guard against a zero-length interval on coarse clocks.
        speed = size / seconds / 1024 / 1024 if seconds > 0 else 0.0
        logger.info("Backup Progress: %5.2f%% Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s",
                    self.total_sent_sst_size * 100 / self.total_target_sst_size,
                    self.total_sent_sst_size / 1024 / 1024 / 1024,
                    self.total_target_sst_size / 1024 / 1024 / 1024,
                    speed)

    def print_backup_report(self):
        logger.info("Sent %6.2f GB of sst files, %d files in total.",
                    self.total_sent_sst_size / 1024 / 1024 / 1024,
                    len(self.total_sent_sst))

    def has_sent_all_sst(self):
        """Return True when this round sent all of its target sst bytes."""
        return self.sent_sst_size == self.target_sst_size


class MySQLUtil:
    """Small helpers around a MySQLdb connection."""

    @staticmethod
    def connect(user, password, port, socket=None):
        """Open a connection; the unix socket is used when given."""
        if MySQLdb is None:
            raise Exception("MySQLdb module is required to connect to MySQL")
        if socket:
            dbh = MySQLdb.Connect(user=user,
                                  passwd=password,
                                  unix_socket=socket)
        else:
            dbh = MySQLdb.Connect(user=user,
                                  passwd=password,
                                  port=port,
                                  host="127.0.0.1")
        return dbh

    @staticmethod
    def create_checkpoint(dbh, checkpoint_dir):
        """Ask the server to create a RocksDB checkpoint at checkpoint_dir."""
        cur = dbh.cursor()
        try:
            # Let the driver quote the path instead of formatting it into SQL.
            cur.execute("SET GLOBAL rocksdb_create_checkpoint=%s",
                        (checkpoint_dir,))
        finally:
            cur.close()

    @staticmethod
    def get_datadir(dbh):
        """Return the server's @@datadir."""
        cur = dbh.cursor()
        try:
            cur.execute("SELECT @@datadir")
            row = cur.fetchone()
        finally:
            cur.close()
        return row[0]

    @staticmethod
    def is_directio_enabled(dbh):
        """Return the value of @@global.rocksdb_use_direct_reads."""
        cur = dbh.cursor()
        try:
            cur.execute("SELECT @@global.rocksdb_use_direct_reads")
            row = cur.fetchone()
        finally:
            cur.close()
        return row[0]


class BackupRunner:
    """Drives streaming (tar / xbstream) backups."""
    datadir = None
    start_backup_time = None

    def __init__(self, datadir):
        self.datadir = datadir
        self.start_backup_time = time.time()

    def start_backup_round(self, backup_round, prev_backup):
        """Create a checkpoint and stream it for one round.

        Returns the RocksDBBackup of this round.  Any error is logged, the
        checkpoint is removed and the process exits with status 1.
        """
        def signal_handler(*args):
            logger.info("Got signal. Exit")
            if b is not None:
                logger.info("Cleaning up snapshot directory..")
                b.do_cleanup()
            sys.exit(1)

        b = None
        try:
            signal.signal(signal.SIGINT, signal_handler)
            w = None
            if not opts.output_stream:
                raise Exception("Currently only streaming backup is supported.")

            snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round)
            dbh = MySQLUtil.connect(opts.mysql_user,
                                    opts.mysql_password,
                                    opts.mysql_port,
                                    opts.mysql_socket)
            direct = MySQLUtil.is_directio_enabled(dbh)
            logger.info("Direct I/O: %d", direct)

            w = StreamWriter(opts.output_stream, direct)

            if not self.datadir:
                self.datadir = MySQLUtil.get_datadir(dbh)
                logger.info("Set datadir: %s", self.datadir)
            logger.info("Creating checkpoint at %s", snapshot_dir)
            MySQLUtil.create_checkpoint(dbh, snapshot_dir)
            logger.info("Created checkpoint at %s", snapshot_dir)
            b = RocksDBBackup(snapshot_dir, w, prev_backup)
            return b.do_backup_until(opts.checkpoint_interval)
        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            if b is not None:
                logger.info("Cleaning up snapshot directory.")
                b.do_cleanup()
            sys.exit(1)

    def backup_mysql(self):
        """Stream the non-RocksDB files of the datadir; exit(1) on error."""
        try:
            w = None
            if opts.output_stream:
                w = StreamWriter(opts.output_stream)
            else:
                raise Exception("Currently only streaming backup is supported.")
            b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp,
                            self.start_backup_time)
            logger.info("Taking MySQL misc backups..")
            b.process()
            logger.info("MySQL misc backups done.")
        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            sys.exit(1)


class WDTBackup:
    """Drives backups that are transferred with WDT."""
    datadir = None
    start_backup_time = None

    def __init__(self, datadir):
        self.datadir = datadir
        self.start_backup_time = time.time()

    def cleanup(self, snapshot_dir, server_log):
        """Log and close the receiver log, then remove the snapshot dir."""
        if server_log:
            server_log.seek(0)
            logger.info("WDT server log:")
            log_text = server_log.read()
            # The temporary file is binary; decode so Python 3 does not log b'..'.
            if not isinstance(log_text, str):
                log_text = log_text.decode('utf-8', 'replace')
            logger.info(log_text)
            server_log.close()
        if snapshot_dir:
            logger.info("Cleaning up snapshot dir %s", snapshot_dir)
            # The directory does not exist yet when the failure happened
            # before the checkpoint was created; do not mask that error.
            if os.path.isdir(snapshot_dir):
                shutil.rmtree(snapshot_dir)

    def backup_with_timeout(self, backup_round):
        """Run one WDT round.

        Returns True when both the sender and the receiver exited cleanly.
        Any exception is logged and the process exits with status 1.
        """
        def signal_handler(*args):
            logger.info("Got signal. Exit")
            self.cleanup(snapshot_dir, server_log)
            sys.exit(1)

        logger.info("Starting backup round %d", backup_round)
        snapshot_dir = None
        server_log = None
        try:
            signal.signal(signal.SIGINT, signal_handler)
            # create rocksdb snapshot
            snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round))
            dbh = MySQLUtil.connect(opts.mysql_user,
                                    opts.mysql_password,
                                    opts.mysql_port,
                                    opts.mysql_socket)
            logger.info("Creating checkpoint at %s", snapshot_dir)
            MySQLUtil.create_checkpoint(dbh, snapshot_dir)
            logger.info("Created checkpoint at %s", snapshot_dir)

            # get datadir if not provided
            if not self.datadir:
                self.datadir = MySQLUtil.get_datadir(dbh)
                logger.info("Set datadir: %s", self.datadir)

            # create links for misc files
            link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir,
                                                opts.skip_check_frm_timestamp,
                                                self.start_backup_time)
            link_creator.process()

            current_path = os.path.join(opts.backupdir, "CURRENT")

            # construct receiver cmd, using the data directory as recovery-id.
            # we delete the current file because it is not append-only, therefore not
            # resumable.
            remote_cmd = (
                "ssh {0} rm -f {1}; "
                "{2} -directory {3} -enable_download_resumption "
                "-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}"
            ).format(opts.destination,
                     current_path,
                     wdt_bin,
                     opts.backupdir,
                     self.datadir,
                     opts.checkpoint_interval,
                     opts.extra_wdt_receiver_options)
            logger.info("WDT remote cmd %s", remote_cmd)
            server_log = tempfile.TemporaryFile()
            remote_process = subprocess.Popen(remote_cmd.split(),
                                              stdout=subprocess.PIPE,
                                              stderr=server_log)
            wdt_url = remote_process.stdout.readline().strip()
            # The pipe yields bytes on Python 3.
            if not isinstance(wdt_url, str):
                wdt_url = wdt_url.decode('utf-8')
            if not wdt_url:
                raise Exception("Unable to get connection url from wdt receiver")
            sender_cmd = (
                "{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks "
                "-avg_mbytes_per_sec {3} "
                "-enable_download_resumption -abort_after_seconds {4} {5}"
            ).format(wdt_bin,
                     wdt_url,
                     snapshot_dir,
                     opts.avg_mbytes_per_sec,
                     opts.checkpoint_interval,
                     opts.extra_wdt_sender_options)
            # Check the raw wait status: shifting away the low byte would hide
            # a sender that was killed by a signal.
            sender_status = os.system(sender_cmd)
            remote_status = remote_process.wait()
            self.cleanup(snapshot_dir, server_log)
            # TODO: handle retryable and non-retyable errors differently
            return (sender_status == 0 and remote_status == 0)

        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            self.cleanup(snapshot_dir, server_log)
            sys.exit(1)


def backup_using_wdt():
    """Repeat WDT rounds until one finishes; a destination is required."""
    if not opts.destination:
        logger.error("Must provide remote destination when using WDT")
        sys.exit(1)

    # TODO: detect whether WDT is installed
    logger.info("Backing up myrocks to %s using WDT", opts.destination)
    wdt_backup = WDTBackup(opts.datadir)
    finished = False
    backup_round = 1
    while not finished:
        start_time = time.time()
        finished = wdt_backup.backup_with_timeout(backup_round)
        end_time = time.time()
        duration_seconds = end_time - start_time
        if (not finished) and (duration_seconds < opts.checkpoint_interval):
            # round finished before timeout
            sleep_duration = (opts.checkpoint_interval - duration_seconds)
            logger.info("Sleeping for %f seconds", sleep_duration)
            time.sleep(sleep_duration)

        backup_round = backup_round + 1
    logger.info("Finished myrocks backup using WDT")


def init_logger():
    """Create the module logger; it writes INFO and above to stderr."""
    global logger
    logger = logging.getLogger('myrocks_hotbackup')
    logger.setLevel(logging.INFO)
    h1 = logging.StreamHandler(sys.stderr)
    f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
                          "%Y-%m-%d %H:%M:%S")
    h1.setFormatter(f)
    logger.addHandler(h1)


backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup "
                    "--user=root --password=pw --stream=wdt "
                    "--checkpoint_dir=<directory where temporary backup hard links "
                    "are created> --destination=<remote host name> --backup_dir="
                    "<remote directory name>. This has to be executed at the src "
                    "host.")
backup_usage= "Backup: set -o pipefail; myrocks_hotbackup --user=root --password=pw --port=3306 --checkpoint_dir=<directory where temporary backup hard links are created> | ssh -o NoneEnabled=yes remote_server 'tar -xi -C <directory on remote server where backups will be sent>' . You need to execute backup command on a server where you take backups."
move_back_usage= "Move-Back: myrocks_hotbackup --move_back --datadir=<dest mysql datadir> --rocksdb_datadir=<dest rocksdb datadir> --rocksdb_waldir=<dest rocksdb wal dir> --backup_dir=<where backup files are stored> . You need to execute move-back command on a server where backup files are sent."


def parse_options():
    """Parse the command line into the module-level opts."""
    global opts
    parser = OptionParser(usage="\n\n" + backup_usage + "\n\n" +
                          backup_wdt_usage + "\n\n" + move_back_usage)
    parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval',
                      default=300,
                      help='Number of seconds to renew checkpoint')
    parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory',
                      default='/data/mysql/backup/snapshot',
                      help='Local directory name where checkpoints will be created.')
    parser.add_option('-d', '--datadir', type='string', dest='datadir',
                      default=None,
                      help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir')
    parser.add_option('-s', '--stream', type='string', dest='output_stream',
                      default='tar',
                      help='Setting streaming backup options. Currently tar, WDT '
                      'and xbstream are supported. Default is tar')
    parser.add_option('--destination', type='string', dest='destination',
                      default='',
                      help='Remote server name. Only used for WDT mode so far.')
    parser.add_option('--avg_mbytes_per_sec', type='int',
                      dest='avg_mbytes_per_sec',
                      default=500,
                      help='Average backup rate in MBytes/sec. WDT only.')
    parser.add_option('--extra_wdt_sender_options', type='string',
                      dest='extra_wdt_sender_options',
                      default='',
                      help='Extra options for WDT sender')
    parser.add_option('--extra_wdt_receiver_options', type='string',
                      dest='extra_wdt_receiver_options',
                      default='',
                      help='Extra options for WDT receiver')
    parser.add_option('-u', '--user', type='string', dest='mysql_user',
                      default='root',
                      help='MySQL user name')
    parser.add_option('-p', '--password', type='string', dest='mysql_password',
                      default='',
                      help='MySQL password name')
    parser.add_option('-P', '--port', type='int', dest='mysql_port',
                      default=3306,
                      help='MySQL port number')
    parser.add_option('-S', '--socket', type='string', dest='mysql_socket',
                      default=None,
                      help='MySQL socket path. Takes precedence over --port.')
    parser.add_option('-m', '--move_back', action='store_true', dest='move_back',
                      default=False,
                      help='Moving MyRocks backup files to proper locations.')
    parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir',
                      default=None,
                      help='RocksDB target data directory where backup data files will be moved. Must be empty.')
    parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir',
                      default=None,
                      help='RocksDB target data directory where backup wal files will be moved. Must be empty.')
    parser.add_option('-b', '--backup_dir', type='string', dest='backupdir',
                      default=None,
                      help='backup mode for WDT: Remote directory to store '
                      'backup. move_back mode: Locations where backup '
                      'files are stored.')
    parser.add_option('-f', '--skip_check_frm_timestamp',
                      dest='skip_check_frm_timestamp',
                      action='store_true', default=False,
                      help='skipping to check if frm files are updated after starting backup.')
    parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file',
                      default=None,
                      help='debugging purpose: waiting until the specified file is created')

    # Positional arguments are accepted but not used.
    opts, _unused_args = parser.parse_args()


def create_moveback_dir(directory):
    """Create directory, or verify that an existing one is empty.

    Raises Exception when the directory already contains an entry; the first
    entry found is logged.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)
        return
    for f in os.listdir(directory):
        logger.error("Directory %s has file or directory %s!", directory, f)
        # A bare "raise" here had no active exception to re-raise and
        # surfaced as an unrelated error; raise an explicit one instead.
        raise Exception("Directory %s is not empty" % directory)


def print_move_back_usage():
    logger.warning(move_back_usage)


def move_back():
    """Move backup files from opts.backupdir to their final locations.

    WAL files go to the RocksDB wal dir, sst and manifest-like files to the
    RocksDB data dir, everything else to the MySQL datadir.  Directories whose
    name ends with '.rocksdb' are left in place.
    """
    required = (opts.rocksdb_datadir, opts.rocksdb_waldir,
                opts.backupdir, opts.datadir)
    if any(value is None for value in required):
        print_move_back_usage()
        # Missing options are an error; do not report success to the caller.
        sys.exit(1)
    create_moveback_dir(opts.datadir)
    create_moveback_dir(opts.rocksdb_datadir)
    create_moveback_dir(opts.rocksdb_waldir)

    # Entries are moved by their name relative to the backup directory.
    os.chdir(opts.backupdir)
    for f in os.listdir(opts.backupdir):
        if os.path.isfile(os.path.join(opts.backupdir, f)):
            if f.endswith(rocksdb_wal_suffix):
                shutil.move(f, opts.rocksdb_waldir)
            elif f.endswith(rocksdb_data_suffix) or is_manifest(f):
                shutil.move(f, opts.rocksdb_datadir)
            else:
                shutil.move(f, opts.datadir)
        else:  # directory
            if f.endswith('.rocksdb'):
                continue
            shutil.move(f, opts.datadir)


def start_backup():
    """Run streaming backup rounds until RocksDB is done, then misc files."""
    logger.info("Starting backup.")
    runner = BackupRunner(opts.datadir)
    b = None
    backup_round = 1
    while True:
        b = runner.start_backup_round(backup_round, b)
        backup_round = backup_round + 1
        if b.finished is True:
            b.print_backup_report()
            logger.info("RocksDB Backup Done.")
            break
    if opts.debug_signal_file:
        # Debugging aid: block until the given file shows up.
        while not os.path.exists(opts.debug_signal_file):
            logger.info("Waiting until %s is created..", opts.debug_signal_file)
            time.sleep(1)
    runner.backup_mysql()
    logger.info("All Backups Done.")


def main():
    """Entry point: dispatch to move-back, WDT backup or streaming backup."""
    parse_options()
    init_logger()

    if opts.move_back is True:
        move_back()
    elif opts.output_stream == 'wdt':
        backup_using_wdt()
    else:
        start_backup()


if __name__ == "__main__":
    main()