#!@PYTHON_SHEBANG@

from __future__ import division
from optparse import OptionParser
import collections
import signal
import os
import stat
import sys
import re
import subprocess
import logging
import logging.handlers
import time
import datetime
import shutil
import traceback
import tempfile

import MySQLdb
import MySQLdb.connections
from MySQLdb import OperationalError, ProgrammingError
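
# myrocks_hotbackup takes online (hot) backups of a MyRocks server. The flow,
# as implemented below, is:
#   1. Repeatedly create RocksDB checkpoints (SET GLOBAL
#      rocksdb_create_checkpoint) and stream the SST files that have not been
#      sent yet, until a round finishes within the checkpoint interval.
#   2. In the final round, also send the WAL and MANIFEST/CURRENT/OPTIONS files.
#   3. Stream the remaining MySQL files (.frm, MyISAM, etc.) from the datadir.
# Output is piped through tar or xbstream to stdout, or sent with WDT to a
# remote host. The --move_back mode restores streamed files into their target
# directories. See the usage strings near parse_options() for command examples.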

logger = None
opts = None
rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS']
rocksdb_data_suffix = '.sst'
rocksdb_wal_suffix = '.log'
exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info',
                 'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash']
wdt_bin = 'wdt'

def is_manifest(fname):
  for m in rocksdb_files:
    if fname.startswith(m):
      return True
  return False

class Writer(object):
  a = None
  def __init__(self):
    self.a = None

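# StreamWriter pipes each backed-up file to stdout through an external
# archiver: 'tar chf -' or 'xbstream -c' (with '-d' appended for xbstream when
# the server uses direct I/O reads). Downstream consumers, e.g. ssh | tar -xi
# on a remote host as shown in backup_usage below, unpack the stream.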
class StreamWriter(Writer):
  stream_cmd= ''

  def __init__(self, stream_option, direct = 0):
    super(StreamWriter, self).__init__()
    if stream_option == 'tar':
      self.stream_cmd= 'tar chf -'
    elif stream_option == 'xbstream':
      self.stream_cmd= 'xbstream -c'
      if direct:
        self.stream_cmd = self.stream_cmd + ' -d'
    else:
      raise Exception("Only tar or xbstream is supported as streaming option.")

  def write(self, file_name):
    rc= os.system(self.stream_cmd + " " + file_name)
    if (rc != 0):
      raise Exception("Got error on stream write: " + str(rc) + " " + file_name)


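# MiscFilesProcessor walks the MySQL data directory (excluding the #rocksdb
# subdirectory) and hands every matching non-RocksDB file to process_file().
# Subclasses decide what to do with each file: stream it, or hard-link it into
# a snapshot directory. check_frm_timestamp() aborts the backup if a .frm file
# changed after the backup started, since that indicates a concurrent schema
# change.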
class MiscFilesProcessor():
  datadir = None
  # Non-RocksDB server files to pick up from each database directory:
  # .frm files plus MyISAM/Aria/Merge/Archive/CSV data and
  # trigger/partition/db-option metadata.
  wildcard = r'.*\.(frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par)$'
  regex = None
  start_backup_time = None
  skip_check_frm_timestamp = None

  def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time):
    self.datadir = datadir
    self.regex = re.compile(self.wildcard)
    self.skip_check_frm_timestamp = skip_check_frm_timestamp
    self.start_backup_time = start_backup_time

  def process_db(self, db):
    # do nothing
    pass

  def process_file(self, path):
    # do nothing
    pass

  def check_frm_timestamp(self, fname, path):
    if not self.skip_check_frm_timestamp and fname.endswith('.frm'):
      if os.path.getmtime(path) > self.start_backup_time:
        logger.error('FRM file %s was updated after starting backups. '
                     'Schema could have changed and the resulting copy may '
                     'not be valid. Aborting. '
                     '(backup time: %s, file modified time: %s)',
                     path, datetime.datetime.fromtimestamp(self.start_backup_time).strftime('%Y-%m-%d %H:%M:%S'),
                     datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime('%Y-%m-%d %H:%M:%S'))
        raise Exception("Inconsistent frm file timestamp")

  def process(self):
    os.chdir(self.datadir)
    for db in self.get_databases():
      logger.info("Starting MySQL misc file traversal from database %s..", db)
      self.process_db(db)
      for f in self.get_files(db):
        if self.match(f):
          rel_path = os.path.join(db, f)
          self.check_frm_timestamp(f, rel_path)
          self.process_file(rel_path)
    logger.info("Traversing misc files from data directory..")
    for f in self.get_files(""):
      should_skip = False
      for e in exclude_files:
        if f.startswith(e) or f.endswith(e):
          logger.info("Skipping %s", f)
          should_skip = True
          break
      if not should_skip:
        self.process_file(f)

  def match(self, filename):
    if self.regex.match(filename):
      return True
    else:
      return False

  def get_databases(self):
    dbs = []
    dirs = [ d for d in os.listdir(self.datadir) \
            if not os.path.isfile(os.path.join(self.datadir,d))]
    for db in dirs:
      if not db.startswith('.') and not self._is_socket(db) and db != "#rocksdb":
        dbs.append(db)
    return dbs

  def get_files(self, db):
    dbdir = self.datadir + "/" + db
    return [ f for f in os.listdir(dbdir) \
            if os.path.isfile(os.path.join(dbdir,f))]

  def _is_socket(self, item):
      mode = os.stat(os.path.join(self.datadir, item)).st_mode
      if stat.S_ISSOCK(mode):
        return True
      return False


class MySQLBackup(MiscFilesProcessor):
  writer = None

  def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time):
    MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
    self.writer = writer

  def process_file(self, fname):    # overriding base class
    self.writer.write(fname)


class MiscFilesLinkCreator(MiscFilesProcessor):
  snapshot_dir = None

  def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time):
    MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
    self.snapshot_dir = snapshot_dir

  def process_db(self, db):
    snapshot_sub_dir = os.path.join(self.snapshot_dir, db)
    os.makedirs(snapshot_sub_dir)

  def process_file(self, path):
    dst_path = os.path.join(self.snapshot_dir, path)
    os.link(path, dst_path)


# RocksDB backup
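# Each RocksDBBackup instance handles one snapshot round: it scans the
# checkpoint directory, skips SST files already sent in a previous round with
# the same name and size, and streams the rest until the time limit expires.
# Because SST files are immutable, a file sent once never has to be re-sent.
# Only the final round (when every remaining SST fits within the time limit)
# also sends the WAL and MANIFEST/CURRENT/OPTIONS files and marks the backup
# finished.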
class RocksDBBackup():
  source_dir = None
  writer = None
  # sst files sent in this backup round
  sent_sst = {}
  # target sst files in this backup round
  target_sst = {}
  # sst files sent in all backup rounds
  total_sent_sst= {}
  # sum of sst file sizes sent in this backup round
  sent_sst_size = 0
  # sum of target sst file sizes in this backup round
  # if sent_sst_size becomes equal to target_sst_size,
  # it means the backup round finished backing up all sst files
  target_sst_size = 0
  # sum of all sst file sizes sent across all backup rounds
  total_sent_sst_size= 0
  # sum of all target sst file sizes from all backup rounds
  total_target_sst_size = 0
  show_progress_size_interval= 1073741824 # 1GB
  wal_files= []
  manifest_files= []
  finished= False

  def __init__(self, source_dir, writer, prev):
    self.source_dir = source_dir
    self.writer = writer
    os.chdir(self.source_dir)
    self.init_target_files(prev)

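  # Builds the work list for this round: SST files in the checkpoint that were
  # not already sent (same name and size) in a previous round, plus the WAL and
  # manifest-type files, and carries forward the running totals from the
  # previous round for progress reporting.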
  def init_target_files(self, prev):
    sst = {}
    self.sent_sst = {}
    self.target_sst= {}
    self.total_sent_sst = {}
    self.sent_sst_size = 0
    self.target_sst_size = 0
    self.total_sent_sst_size= 0
    self.total_target_sst_size= 0
    self.wal_files= []
    self.manifest_files= []

    for f in os.listdir(self.source_dir):
      if f.endswith(rocksdb_data_suffix):
        # exactly the same file (same size) was sent in previous backup rounds
        if prev is not None and f in prev.total_sent_sst and int(os.stat(f).st_size) == prev.total_sent_sst[f]:
          continue
        sst[f]= int(os.stat(f).st_size)
        self.target_sst_size = self.target_sst_size + os.stat(f).st_size
      elif is_manifest(f):
        self.manifest_files.append(f)
      elif f.endswith(rocksdb_wal_suffix):
        self.wal_files.append(f)
    self.target_sst= collections.OrderedDict(sorted(sst.items()))

    if prev is not None:
      self.total_sent_sst = prev.total_sent_sst
      self.total_sent_sst_size = prev.total_sent_sst_size
      self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size
    else:
      self.total_target_sst_size = self.target_sst_size

  def do_backup_single(self, fname):
    self.writer.write(fname)
    os.remove(fname)

  def do_backup_sst(self, fname, size):
    self.do_backup_single(fname)
    self.sent_sst[fname]= size
    self.total_sent_sst[fname]= size
    self.sent_sst_size = self.sent_sst_size + size
    self.total_sent_sst_size = self.total_sent_sst_size + size

  def do_backup_manifest(self):
    for f in self.manifest_files:
      self.do_backup_single(f)

  def do_backup_wal(self):
    for f in self.wal_files:
      self.do_backup_single(f)

  # this is the last snapshot round: back up all remaining files
  def do_backup_final(self):
    logger.info("Backup WAL..")
    self.do_backup_wal()
    logger.info("Backup Manifest..")
    self.do_backup_manifest()
    self.do_cleanup()
    self.finished= True

  def do_cleanup(self):
    shutil.rmtree(self.source_dir)
    logger.info("Cleaned up checkpoint from %s", self.source_dir)

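  # Streams target SST files until time_limit (seconds) is exceeded, printing
  # progress roughly once per show_progress_size_interval bytes. If the limit
  # is hit with files remaining, the checkpoint is removed and the caller is
  # expected to start another round; otherwise do_backup_final() completes the
  # backup.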
  def do_backup_until(self, time_limit):
    logger.info("Starting backup from snapshot: target files %d", len(self.target_sst))
    start_time= time.time()
    last_progress_time= start_time
    progress_size= 0
    for fname, size in self.target_sst.items():
      self.do_backup_sst(fname, size)
      progress_size= progress_size + size
      elapsed_seconds = time.time() - start_time
      progress_seconds = time.time() - last_progress_time

      if self.should_show_progress(size):
        self.show_progress(progress_size, progress_seconds)
        progress_size=0
        last_progress_time= time.time()

      if elapsed_seconds > time_limit and self.has_sent_all_sst() is False:
        logger.info("Snapshot round finished. Elapsed Time: %5.2f.  Remaining sst files: %d",
                    elapsed_seconds, len(self.target_sst) - len(self.sent_sst))
        self.do_cleanup()
        break
    if self.has_sent_all_sst():
      self.do_backup_final()

    return self

  def should_show_progress(self, size):
    if int(self.total_sent_sst_size/self.show_progress_size_interval) > int((self.total_sent_sst_size-size)/self.show_progress_size_interval):
      return True
    else:
      return False

  def show_progress(self, size, seconds):
    logger.info("Backup Progress: %5.2f%%   Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s",
                self.total_sent_sst_size*100/self.total_target_sst_size,
                self.total_sent_sst_size/1024/1024/1024,
                self.total_target_sst_size/1024/1024/1024,
                size/seconds/1024/1024)

  def print_backup_report(self):
    logger.info("Sent %6.2f GB of sst files, %d files in total.",
                self.total_sent_sst_size/1024/1024/1024,
                len(self.total_sent_sst))

  def has_sent_all_sst(self):
    if self.sent_sst_size == self.target_sst_size:
      return True
    return False


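# Thin wrapper around MySQLdb for the few server interactions the backup
# needs: connecting (unix socket, or TCP to 127.0.0.1), creating a RocksDB
# checkpoint via SET GLOBAL rocksdb_create_checkpoint, and reading @@datadir
# and @@global.rocksdb_use_direct_reads.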
class MySQLUtil:
  @staticmethod
  def connect(user, password, port, socket=None):
    if socket:
      dbh = MySQLdb.Connect(user=user,
                            passwd=password,
                            unix_socket=socket)
    else:
      dbh = MySQLdb.Connect(user=user,
                            passwd=password,
                            port=port,
                            host="127.0.0.1")
    return dbh

  @staticmethod
  def create_checkpoint(dbh, checkpoint_dir):
    sql = ("SET GLOBAL rocksdb_create_checkpoint='{0}'"
           .format(checkpoint_dir))
    cur= dbh.cursor()
    cur.execute(sql)
    cur.close()

  @staticmethod
  def get_datadir(dbh):
    sql = "SELECT @@datadir"
    cur = dbh.cursor()
    cur.execute(sql)
    row = cur.fetchone()
    return row[0]

  @staticmethod
  def is_directio_enabled(dbh):
    sql = "SELECT @@global.rocksdb_use_direct_reads"
    cur = dbh.cursor()
    cur.execute(sql)
    row = cur.fetchone()
    return row[0]

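# BackupRunner drives the streaming (tar/xbstream) backup: start_backup_round()
# creates a checkpoint directory named after the round number under
# --checkpoint_dir, backs it up for at most opts.checkpoint_interval seconds,
# and returns the RocksDBBackup object so the next round can skip files that
# were already sent. backup_mysql() streams the remaining non-RocksDB files
# afterwards.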
class BackupRunner:
  datadir = None
  start_backup_time = None

  def __init__(self, datadir):
    self.datadir = datadir
    self.start_backup_time = time.time()

  def start_backup_round(self, backup_round, prev_backup):
    def signal_handler(*args):
      logger.info("Got signal. Exit")
      if b is not None:
        logger.info("Cleaning up snapshot directory..")
        b.do_cleanup()
      sys.exit(1)

    b = None
    try:
      signal.signal(signal.SIGINT, signal_handler)
      w = None
      if not opts.output_stream:
        raise Exception("Currently only streaming backup is supported.")

      snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round)
      dbh = MySQLUtil.connect(opts.mysql_user,
                              opts.mysql_password,
                              opts.mysql_port,
                              opts.mysql_socket)
      direct = MySQLUtil.is_directio_enabled(dbh)
      logger.info("Direct I/O: %d", direct)

      w = StreamWriter(opts.output_stream, direct)

      if not self.datadir:
        self.datadir = MySQLUtil.get_datadir(dbh)
        logger.info("Set datadir: %s", self.datadir)
      logger.info("Creating checkpoint at %s", snapshot_dir)
      MySQLUtil.create_checkpoint(dbh, snapshot_dir)
      logger.info("Created checkpoint at %s", snapshot_dir)
      b = RocksDBBackup(snapshot_dir, w, prev_backup)
      return b.do_backup_until(opts.checkpoint_interval)
    except Exception as e:
      logger.error(e)
      logger.error(traceback.format_exc())
      if b is not None:
        logger.info("Cleaning up snapshot directory.")
        b.do_cleanup()
      sys.exit(1)

  def backup_mysql(self):
    try:
      w = None
      if opts.output_stream:
        w = StreamWriter(opts.output_stream)
      else:
        raise Exception("Currently only streaming backup is supported.")
      b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp,
                      self.start_backup_time)
      logger.info("Taking MySQL misc backups..")
      b.process()
      logger.info("MySQL misc backups done.")
    except Exception as e:
      logger.error(e)
      logger.error(traceback.format_exc())
      sys.exit(1)


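# WDTBackup implements the --stream=wdt mode: instead of piping an archive to
# stdout, each round hard-links the misc MySQL files into the checkpoint
# directory and ships the whole snapshot with the external 'wdt' tool to a
# receiver started over ssh on --destination, relying on WDT's download
# resumption so later rounds only transfer what is still missing.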
class WDTBackup:
  datadir = None
  start_backup_time = None

  def __init__(self, datadir):
    self.datadir = datadir
    self.start_backup_time = time.time()

  def cleanup(self, snapshot_dir, server_log):
    if server_log:
      server_log.seek(0)
      logger.info("WDT server log:")
      logger.info(server_log.read())
      server_log.close()
    if snapshot_dir:
      logger.info("Cleaning up snapshot dir %s", snapshot_dir)
      shutil.rmtree(snapshot_dir)

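  # One WDT backup round: create a checkpoint, hard-link the misc MySQL files
  # into it, start the remote wdt receiver over ssh (deleting the remote
  # CURRENT file first, since it is not append-only and therefore not
  # resumable), then run the local wdt sender against the URL the receiver
  # prints. Returns True only when both sender and receiver exit cleanly,
  # i.e. the round finished within the timeout.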
  def backup_with_timeout(self, backup_round):
    def signal_handler(*args):
      logger.info("Got signal. Exit")
      self.cleanup(snapshot_dir, server_log)
      sys.exit(1)

    logger.info("Starting backup round %d", backup_round)
    snapshot_dir = None
    server_log = None
    try:
      signal.signal(signal.SIGINT, signal_handler)
      # create rocksdb snapshot
      snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round))
      dbh = MySQLUtil.connect(opts.mysql_user,
                              opts.mysql_password,
                              opts.mysql_port,
                              opts.mysql_socket)
      logger.info("Creating checkpoint at %s", snapshot_dir)
      MySQLUtil.create_checkpoint(dbh, snapshot_dir)
      logger.info("Created checkpoint at %s", snapshot_dir)

      # get datadir if not provided
      if not self.datadir:
        self.datadir = MySQLUtil.get_datadir(dbh)
        logger.info("Set datadir: %s", self.datadir)

      # create links for misc files
      link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir,
                                          opts.skip_check_frm_timestamp,
                                          self.start_backup_time)
      link_creator.process()

      current_path = os.path.join(opts.backupdir, "CURRENT")

      # construct receiver cmd, using the data directory as recovery-id.
      # we delete the current file because it is not append-only, therefore not
      # resumable.
      remote_cmd = (
                "ssh {0} rm -f {1}; "
                "{2} -directory {3} -enable_download_resumption "
                "-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}"
          ).format(opts.destination,
                   current_path,
                   wdt_bin,
                   opts.backupdir,
                   self.datadir,
                   opts.checkpoint_interval,
                   opts.extra_wdt_receiver_options)
      logger.info("WDT remote cmd %s", remote_cmd)
      server_log = tempfile.TemporaryFile()
      remote_process = subprocess.Popen(remote_cmd.split(),
                                        stdout=subprocess.PIPE,
                                        stderr=server_log)
      wdt_url = remote_process.stdout.readline().strip()
      if not wdt_url:
        raise Exception("Unable to get connection url from wdt receiver")
      sender_cmd = (
                "{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks "
                "-avg_mbytes_per_sec {3} "
                "-enable_download_resumption -abort_after_seconds {4} {5}"
          ).format(wdt_bin,
                   wdt_url,
                   snapshot_dir,
                   opts.avg_mbytes_per_sec,
                   opts.checkpoint_interval,
                   opts.extra_wdt_sender_options)
      sender_status = os.system(sender_cmd) >> 8
      remote_status = remote_process.wait()
      self.cleanup(snapshot_dir, server_log)
      # TODO: handle retryable and non-retryable errors differently
      return (sender_status == 0 and remote_status == 0)

    except Exception as e:
      logger.error(e)
      logger.error(traceback.format_exc())
      self.cleanup(snapshot_dir, server_log)
      sys.exit(1)


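# Top-level loop for --stream=wdt: run WDT rounds until one returns success,
# meaning everything was transferred within the abort timeout. If an
# unfinished round ends before the checkpoint interval has elapsed, sleep out
# the remainder before starting the next round so checkpoints are not created
# back to back.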
def backup_using_wdt():
  if not opts.destination:
    logger.error("Must provide remote destination when using WDT")
    sys.exit(1)

  # TODO: detect whether WDT is installed
  logger.info("Backing up myrocks to %s using WDT", opts.destination)
  wdt_backup = WDTBackup(opts.datadir)
  finished = False
  backup_round = 1
  while not finished:
    start_time = time.time()
    finished = wdt_backup.backup_with_timeout(backup_round)
    end_time = time.time()
    duration_seconds = end_time - start_time
    if (not finished) and (duration_seconds < opts.checkpoint_interval):
      # round finished before timeout
      sleep_duration = (opts.checkpoint_interval - duration_seconds)
      logger.info("Sleeping for %f seconds", sleep_duration)
      time.sleep(sleep_duration)

    backup_round = backup_round + 1
  logger.info("Finished myrocks backup using WDT")


def init_logger():
  global logger
  logger = logging.getLogger('myrocks_hotbackup')
  logger.setLevel(logging.INFO)
  h1= logging.StreamHandler(sys.stderr)
  f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
                        "%Y-%m-%d %H:%M:%S")
  h1.setFormatter(f)
  logger.addHandler(h1)

backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup "
                "--user=root --password=pw --stream=wdt "
                "--checkpoint_dir=<directory where temporary backup hard links "
                "are created> --destination=<remote host name> --backup_dir="
                "<remote directory name>. This has to be executed at the src "
                "host.")
backup_usage = ("Backup: set -o pipefail; myrocks_hotbackup --user=root "
                "--password=pw --port=3306 --checkpoint_dir=<directory where "
                "temporary backup hard links are created> | "
                "ssh -o NoneEnabled=yes remote_server "
                "'tar -xi -C <directory on remote server where backups will "
                "be sent>' . You need to execute the backup command on the "
                "server being backed up.")
move_back_usage = ("Move-Back: myrocks_hotbackup --move_back "
                "--datadir=<dest mysql datadir> "
                "--rocksdb_datadir=<dest rocksdb datadir> "
                "--rocksdb_waldir=<dest rocksdb wal dir> "
                "--backup_dir=<where backup files are stored> . You need to "
                "execute the move-back command on the server the backup files "
                "were sent to.")


def parse_options():
  global opts
  parser = OptionParser(usage = "\n\n" + backup_usage + "\n\n" + \
          backup_wdt_usage + "\n\n" + move_back_usage)
  parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval',
                    default=300,
                    help='Time limit in seconds for each backup round before '
                    'the checkpoint is renewed')
  parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory',
                    default='/data/mysql/backup/snapshot',
                    help='Local directory name where checkpoints will be created.')
  parser.add_option('-d', '--datadir', type='string', dest='datadir',
                    default=None,
                    help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir')
  parser.add_option('-s', '--stream', type='string', dest='output_stream',
                    default='tar',
                    help='Streaming backup method. Currently tar, wdt '
                    'and xbstream are supported. Default is tar')
  parser.add_option('--destination', type='string', dest='destination',
                    default='',
                    help='Remote server name. Only used for WDT mode so far.')
  parser.add_option('--avg_mbytes_per_sec', type='int',
                    dest='avg_mbytes_per_sec',
                    default=500,
                    help='Average backup rate in MBytes/sec. WDT only.')
  parser.add_option('--extra_wdt_sender_options', type='string',
                    dest='extra_wdt_sender_options',
                    default='',
                    help='Extra options for WDT sender')
  parser.add_option('--extra_wdt_receiver_options', type='string',
                    dest='extra_wdt_receiver_options',
                    default='',
                    help='Extra options for WDT receiver')
  parser.add_option('-u', '--user', type='string', dest='mysql_user',
                    default='root',
                    help='MySQL user name')
  parser.add_option('-p', '--password', type='string', dest='mysql_password',
                    default='',
                    help='MySQL password')
  parser.add_option('-P', '--port', type='int', dest='mysql_port',
                    default=3306,
                    help='MySQL port number')
  parser.add_option('-S', '--socket', type='string', dest='mysql_socket',
                    default=None,
                    help='MySQL socket path. Takes precedence over --port.')
  parser.add_option('-m', '--move_back', action='store_true', dest='move_back',
                    default=False,
                    help='Move MyRocks backup files to their proper locations.')
  parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir',
                    default=None,
                    help='RocksDB target data directory where backup data files will be moved. Must be empty.')
  parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir',
                    default=None,
                    help='RocksDB target WAL directory where backup wal files will be moved. Must be empty.')
  parser.add_option('-b', '--backup_dir', type='string', dest='backupdir',
                    default=None,
                    help='backup mode for WDT: Remote directory to store '
                    'backup. move_back mode: Locations where backup '
                    'files are stored.')
  parser.add_option('-f', '--skip_check_frm_timestamp',
                    dest='skip_check_frm_timestamp',
                    action='store_true', default=False,
                    help='Skip checking whether frm files were updated after the backup started.')
  parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file',
                    default=None,
                    help='For debugging: wait until the specified file is '
                    'created before taking the MySQL misc file backup')

  opts, args = parser.parse_args()


def create_moveback_dir(directory):
  if not os.path.exists(directory):
    os.makedirs(directory)
  else:
    for f in os.listdir(directory):
      logger.error("Directory %s has file or directory %s!", directory, f)
      raise Exception("Move-back directory " + directory + " is not empty.")

def print_move_back_usage():
  logger.warning(move_back_usage)

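# move_back restores a completed backup: every file under --backup_dir is
# moved to its destination -- WAL (.log) files to --rocksdb_waldir, .sst and
# MANIFEST/CURRENT/OPTIONS files to --rocksdb_datadir, and everything else
# (including per-database subdirectories) to --datadir. Directories ending in
# '.rocksdb' are left in place, and the three destinations must be empty or
# not yet exist.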
def move_back():
  if opts.rocksdb_datadir is None or opts.rocksdb_waldir is None or opts.backupdir is None or opts.datadir is None:
    print_move_back_usage()
    sys.exit()
  create_moveback_dir(opts.datadir)
  create_moveback_dir(opts.rocksdb_datadir)
  create_moveback_dir(opts.rocksdb_waldir)

  os.chdir(opts.backupdir)
  for f in os.listdir(opts.backupdir):
    if os.path.isfile(os.path.join(opts.backupdir,f)):
      if f.endswith(rocksdb_wal_suffix):
        shutil.move(f, opts.rocksdb_waldir)
      elif f.endswith(rocksdb_data_suffix) or is_manifest(f):
        shutil.move(f, opts.rocksdb_datadir)
      else:
        shutil.move(f, opts.datadir)
    else: #directory
      if f.endswith('.rocksdb'):
        continue
      shutil.move(f, opts.datadir)

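# start_backup drives the default (tar/xbstream) mode: snapshot rounds run
# until one of them ships every remaining SST file within the checkpoint
# interval, then the misc MySQL files are streamed. The optional
# --debug_signal_file pause sits between the two phases for testing.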
def start_backup():
  logger.info("Starting backup.")
  runner = BackupRunner(opts.datadir)
  b = None
  backup_round= 1
  while True:
    b = runner.start_backup_round(backup_round, b)
    backup_round = backup_round + 1
    if b.finished is True:
      b.print_backup_report()
      logger.info("RocksDB Backup Done.")
      break
  if opts.debug_signal_file:
    while not os.path.exists(opts.debug_signal_file):
      logger.info("Waiting until %s is created..", opts.debug_signal_file)
      time.sleep(1)
  runner.backup_mysql()
  logger.info("All Backups Done.")


def main():
  parse_options()
  init_logger()

  if opts.move_back is True:
    move_back()
  elif opts.output_stream == 'wdt':
    backup_using_wdt()
  else:
    start_backup()

if __name__ == "__main__":
  main()