#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2020 Bareos GmbH & Co. KG
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of version three of the GNU Affero General Public
# License as published by the Free Software Foundation, which is
# listed in the file LICENSE.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
# Author: Maik Aussendorf
#
# Uses Percona's xtrabackup for backup and restore of MySQL / MariaDB databases

from bareosfd import *
from bareos_fd_consts import *
import os
from subprocess import *
from BareosFdPluginBaseclass import *
import BareosFdWrapper
import datetime
import time
import tempfile
import shutil
import json


class BareosFdPercona(BareosFdPluginBaseclass):
    """
    Plugin for backing up all mysql innodb databases found in a specific mysql server
    using the Percona xtrabackup tool.

    Backup: streams the database via "xtrabackup --backup --stream=xbstream"
    and stores the xtrabackup_checkpoints file (LSN range) as a restore object
    so incremental jobs know where to continue.
    Restore: feeds the stored stream into "xbstream -x" into a per-job,
    per-LSN-range subdirectory.
    """

    def __init__(self, context, plugindef):
        super(BareosFdPercona, self).__init__(context, plugindef)
        # We first create and backup the stream and after that the lsn
        # file as restore-object (files_to_backup is consumed via pop(),
        # so "stream" is processed first).
        self.files_to_backup = ["lsnfile", "stream"]
        # Scratch directory handed to xtrabackup via --extra-lsndir; it
        # receives xtrabackup_checkpoints and is removed after backup.
        self.tempdir = tempfile.mkdtemp()
        # self.logdir = GetValue(context, bVariable['bVarWorkingDir'])
        self.logdir = "/var/log/bareos/"
        self.log = "bareos-plugin-percona.log"
        # Maps originating jobId -> parsed xtrabackup_checkpoints contents,
        # filled from restore objects in restore_object_data().
        self.rop_data = {}
        # Highest to_lsn seen in restore objects of previous jobs;
        # 0 means "no previous LSN known".
        self.max_to_lsn = 0
        # File descriptor of the optional extra logfile (see option "log").
        self.err_fd = None

    def parse_plugin_definition(self, context, plugindef):
        """
        We have default options that should work out of the box in the most use cases
        that the mysql/mariadb is on the same host and can be accessed without user/password information,
        e.g. with a valid my.cnf for user root.

        Recognized options: dumpbinary, restorecommand, log, mycnf,
        strictIncremental, dumpoptions, extradumpoptions, mysqlcmd.
        """
        BareosFdPluginBaseclass.parse_plugin_definition(self, context, plugindef)

        # Binary used to produce the backup stream.
        if "dumpbinary" in self.options:
            self.dumpbinary = self.options["dumpbinary"]
        else:
            self.dumpbinary = "xtrabackup"

        # Command used on restore; the target directory is appended later
        # in create_file().
        if "restorecommand" not in self.options:
            self.restorecommand = "xbstream -x -C "
        else:
            self.restorecommand = self.options["restorecommand"]

        # Default is not to write an extra logfile
        if "log" not in self.options:
            self.log = False
        elif self.options["log"] == "false":
            self.log = False
        elif os.path.isabs(self.options["log"]):
            self.log = self.options["log"]
        else:
            self.log = os.path.join(self.logdir, self.options["log"])

        # By default, standard mysql-config files will be used, set
        # this option to use extra files
        self.connect_options = {"read_default_group": "client"}
        if "mycnf" in self.options:
            self.connect_options["read_default_file"] = self.options["mycnf"]
            self.mycnf = "--defaults-extra-file=%s " % self.options["mycnf"]
        else:
            self.mycnf = ""

        # If true, incremental jobs will only be performed, if LSN has increased
        # since last call.
        if (
            "strictIncremental" in self.options
            and self.options["strictIncremental"] == "true"
        ):
            self.strictIncremental = True
        else:
            self.strictIncremental = False

        self.dumpoptions = self.mycnf

        # if dumpoptions is set, we use that here, otherwise defaults
        if "dumpoptions" in self.options:
            self.dumpoptions += self.options["dumpoptions"]
        else:
            self.dumpoptions += " --backup --stream=xbstream"

        # Write xtrabackup_checkpoints into our scratch dir so we can ship
        # it as a restore object after the stream backup.
        self.dumpoptions += " --extra-lsndir=%s" % self.tempdir

        if "extradumpoptions" in self.options:
            self.dumpoptions += " " + self.options["extradumpoptions"]

        # We need to call mysql to get the current Log Sequece Number (LSN)
        if "mysqlcmd" in self.options:
            self.mysqlcmd = self.options["mysqlcmd"]
        else:
            self.mysqlcmd = "mysql %s -r" % self.mycnf

        return bRCs["bRC_OK"]

    def check_plugin_options(self, context, mandatory_options=None):
        """
        Verify job settings are compatible with this plugin: accurate
        mode must be disabled, since the backup is a single stream and
        not a file-by-file backup.
        """
        accurate_enabled = GetValue(context, bVariable["bVarAccurate"])
        if accurate_enabled is not None and accurate_enabled != 0:
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "start_backup_job: Accurate backup not allowed please disable in Job\n",
            )
            return bRCs["bRC_Error"]
        else:
            return bRCs["bRC_OK"]

    def create_file(self, context, restorepkt):
        """
        On restore we create a subdirectory for the first base backup and each incremental backup.
        Because percona expects an empty directory, we create a tree starting with jobId/ of restore job
        """
        FNAME = restorepkt.ofname
        DebugMessage(context, 100, "create file with %s called\n" % FNAME)
        self.writeDir = "%s/%d/" % (os.path.dirname(FNAME), self.jobId)
        # FNAME contains originating jobId after last .
        origJobId = int(FNAME.rpartition(".")[-1])
        if origJobId in self.rop_data:
            # Encode the LSN range and originating job id into the directory
            # name (zero-padded), so restored increments sort in apply order.
            rop_from_lsn = int(self.rop_data[origJobId]["from_lsn"])
            rop_to_lsn = int(self.rop_data[origJobId]["to_lsn"])
            self.writeDir += "/%020d_" % rop_from_lsn
            self.writeDir += "%020d_" % rop_to_lsn
            self.writeDir += "%010d" % origJobId
        else:
            # Not fatal: extraction still happens, but into a directory
            # without LSN information in its name.
            JobMessage(
                context,
                bJobMessageType["M_ERROR"],
                "No lsn information found in restore object for file %s from job %d\n"
                % (FNAME, origJobId),
            )

        # Create restore directory, if not existent
        if not os.path.exists(self.writeDir):
            # Fix: was bareosfd.DebugMessage, but only "from bareosfd import *"
            # is imported, so the module name itself is not in scope.
            DebugMessage(
                context,
                200,
                "Directory %s does not exist, creating it now\n" % self.writeDir,
            )
            os.makedirs(self.writeDir)
        # Percona requires empty directory
        if os.listdir(self.writeDir):
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "Restore with xbstream needs empty directory: %s\n" % self.writeDir,
            )
            return bRCs["bRC_Error"]
        self.restorecommand += self.writeDir
        DebugMessage(
            context,
            100,
            'Restore using xbstream to extract files with "%s"\n' % self.restorecommand,
        )
        restorepkt.create_status = bCFs["CF_EXTRACT"]
        return bRCs["bRC_OK"]

    def start_backup_job(self, context):
        """
        We will check, if database has changed since last backup
        in the incremental case
        """
        check_option_bRC = self.check_plugin_options(context)
        if check_option_bRC != bRCs["bRC_OK"]:
            return check_option_bRC
        DebugMessage(
            context, 100, "start_backup_job, level: %s\n" % chr(self.level)
        )
        if chr(self.level) == "I":
            # We check, if we have a LSN received by restore object from previous job
            if self.max_to_lsn == 0:
                JobMessage(
                    context,
                    bJobMessageType["M_FATAL"],
                    "No LSN received to be used with incremental backup\n",
                )
                return bRCs["bRC_Error"]
            # Try to load MySQLdb module
            hasMySQLdbModule = False
            try:
                import MySQLdb

                hasMySQLdbModule = True
                DebugMessage(context, 100, "Imported module MySQLdb\n")
            except ImportError:
                DebugMessage(
                    context,
                    100,
                    "Import of module MySQLdb failed. Using command pipe instead\n",
                )
            # Fix: initialize, so a missing "Log sequence number" line leads
            # to a clean fatal job message instead of a NameError below.
            last_lsn = None
            # contributed by https://github.com/kjetilho
            if hasMySQLdbModule:
                try:
                    conn = MySQLdb.connect(**self.connect_options)
                    cursor = conn.cursor()
                    cursor.execute("SHOW ENGINE INNODB STATUS")
                    result = cursor.fetchall()
                    if len(result) == 0:
                        JobMessage(
                            context,
                            bJobMessageType["M_FATAL"],
                            "Could not fetch SHOW ENGINE INNODB STATUS, unprivileged user?",
                        )
                        return bRCs["bRC_Error"]
                    info = result[0][2]
                    conn.close()
                    for line in info.split("\n"):
                        if line.startswith("Log sequence number"):
                            last_lsn = int(line.split(" ")[3])
                except Exception as e:
                    JobMessage(
                        context,
                        bJobMessageType["M_FATAL"],
                        "Could not get LSN, Error: %s" % e,
                    )
                    return bRCs["bRC_Error"]
            # use old method as fallback, if module MySQLdb not available
            else:
                get_lsn_command = (
                    "echo 'SHOW ENGINE INNODB STATUS' | %s | grep 'Log sequence number' | cut -d ' ' -f 4"
                    % self.mysqlcmd
                )
                last_lsn_proc = Popen(
                    get_lsn_command, shell=True, stdout=PIPE, stderr=PIPE
                )
                last_lsn_proc.wait()
                returnCode = last_lsn_proc.poll()
                (mysqlStdOut, mysqlStdErr) = last_lsn_proc.communicate()
                if returnCode != 0 or mysqlStdErr:
                    JobMessage(
                        context,
                        bJobMessageType["M_FATAL"],
                        'Could not get LSN with command "%s", Error: %s'
                        % (get_lsn_command, mysqlStdErr),
                    )
                    return bRCs["bRC_Error"]
                else:
                    try:
                        last_lsn = int(mysqlStdOut)
                    except ValueError:
                        JobMessage(
                            context,
                            bJobMessageType["M_FATAL"],
                            'Error reading LSN: "%s" not an integer' % mysqlStdOut,
                        )
                        return bRCs["bRC_Error"]
            if last_lsn is None:
                JobMessage(
                    context,
                    bJobMessageType["M_FATAL"],
                    "Could not determine LSN from SHOW ENGINE INNODB STATUS\n",
                )
                return bRCs["bRC_Error"]
            JobMessage(
                context, bJobMessageType["M_INFO"], "Backup until LSN: %d\n" % last_lsn
            )
            if (
                self.max_to_lsn > 0
                and self.max_to_lsn >= last_lsn
                and self.strictIncremental
            ):
                DebugMessage(
                    context,
                    100,
                    "Last LSN of DB %d is not higher than LSN from previous job %d. Skipping this incremental backup\n"
                    % (last_lsn, self.max_to_lsn),
                )
                # Only re-ship the previous restore object; no stream backup.
                self.files_to_backup = ["lsn_only"]
                return bRCs["bRC_OK"]
        return bRCs["bRC_OK"]

    def start_backup_file(self, context, savepkt):
        """
        This method is called, when Bareos is ready to start backup a file
        """
        if not self.files_to_backup:
            self.file_to_backup = None
            DebugMessage(context, 100, "start_backup_file: None\n")
        else:
            self.file_to_backup = self.files_to_backup.pop()
            DebugMessage(context, 100, "start_backup_file: %s\n" % self.file_to_backup)

        statp = StatPacket()
        savepkt.statp = statp

        if self.file_to_backup == "stream":
            # This is the database backup as xbstream
            savepkt.fname = "/_percona/xbstream.%010d" % self.jobId
            savepkt.type = bFileType["FT_REG"]
            if self.max_to_lsn > 0:
                self.dumpoptions += " --incremental-lsn=%d" % self.max_to_lsn
            self.dumpcommand = "%s %s" % (self.dumpbinary, self.dumpoptions)
            DebugMessage(context, 100, "Dumper: '" + self.dumpcommand + "'\n")
        elif self.file_to_backup == "lsnfile":
            # The restore object containing the log sequence number (lsn)
            # Read checkpoints and create restore object
            checkpoints = {}
            # improve: Error handling
            with open("%s/xtrabackup_checkpoints" % self.tempdir) as lsnfile:
                for line in lsnfile:
                    # Lines look like "key = value"; keep everything as strings.
                    key, value = line.partition("=")[::2]
                    checkpoints[key.strip()] = value.strip()
            savepkt.fname = "/_percona/xtrabackup_checkpoints"
            savepkt.type = bFileType["FT_RESTORE_FIRST"]
            savepkt.object_name = savepkt.fname
            savepkt.object = bytearray(json.dumps(checkpoints))
            savepkt.object_len = len(savepkt.object)
            savepkt.object_index = int(time.time())
            shutil.rmtree(self.tempdir)
        elif self.file_to_backup == "lsn_only":
            # We have nothing to backup incremental, so we just have to pass
            # the restore object from previous job
            savepkt.fname = "/_percona/xtrabackup_checkpoints"
            savepkt.type = bFileType["FT_RESTORE_FIRST"]
            savepkt.object_name = savepkt.fname
            # row_rop_raw was stored by restore_object_data() of this job.
            savepkt.object = bytearray(self.row_rop_raw)
            savepkt.object_len = len(savepkt.object)
            savepkt.object_index = int(time.time())
        else:
            # should not happen
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "Unknown error. Don't know how to handle %s\n" % self.file_to_backup,
            )

        JobMessage(
            context,
            bJobMessageType["M_INFO"],
            "Starting backup of " + savepkt.fname + "\n",
        )
        return bRCs["bRC_OK"]

    def plugin_io(self, context, IOP):
        """
        Called for io operations. We read from pipe into buffers or on restore
        send to xbstream
        """
        DebugMessage(context, 200, "plugin_io called with " + str(IOP.func) + "\n")

        if IOP.func == bIOPS["IO_OPEN"]:
            DebugMessage(context, 100, "plugin_io called with IO_OPEN\n")
            if self.log:
                try:
                    self.err_fd = open(self.log, "a")
                except IOError as msg:
                    DebugMessage(
                        context,
                        100,
                        "Could not open log file (%s): %s\n"
                        % (self.log, format(str(msg))),
                    )
            # Write flags set -> restore job; otherwise it is a backup.
            if IOP.flags & (os.O_CREAT | os.O_WRONLY):
                if self.log:
                    self.err_fd.write(
                        '%s Restore Job %s opens stream with "%s"\n'
                        % (datetime.datetime.now(), self.jobId, self.restorecommand)
                    )
                self.stream = Popen(
                    self.restorecommand, shell=True, stdin=PIPE, stderr=self.err_fd
                )
            else:
                if self.log:
                    self.err_fd.write(
                        '%s Backup Job %s opens stream with "%s"\n'
                        % (datetime.datetime.now(), self.jobId, self.dumpcommand)
                    )
                self.stream = Popen(
                    self.dumpcommand, shell=True, stdout=PIPE, stderr=self.err_fd
                )
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_READ"]:
            IOP.buf = bytearray(IOP.count)
            IOP.status = self.stream.stdout.readinto(IOP.buf)
            IOP.io_errno = 0
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_WRITE"]:
            try:
                self.stream.stdin.write(IOP.buf)
                IOP.status = IOP.count
                IOP.io_errno = 0
            except IOError as msg:
                IOP.io_errno = -1
                DebugMessage(
                    context, 100, "Error writing data: " + format(str(msg)) + "\n"
                )
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_CLOSE"]:
            DebugMessage(context, 100, "plugin_io called with IO_CLOSE\n")
            self.subprocess_returnCode = self.stream.poll()
            if self.subprocess_returnCode is None:
                # Subprocess is open, we wait until it finishes and get results
                try:
                    self.stream.communicate()
                    self.subprocess_returnCode = self.stream.poll()
                except Exception:
                    JobMessage(
                        context,
                        bJobMessageType["M_ERROR"],
                        "Dump / restore command not finished properly\n",
                    )
                    # Fix: the original evaluated bRCs["bRC_Error"] without
                    # returning it, silently reporting success instead.
                    return bRCs["bRC_Error"]
                return bRCs["bRC_OK"]
            else:
                DebugMessage(
                    context,
                    100,
                    "Subprocess has terminated with returncode: %d\n"
                    % self.subprocess_returnCode,
                )
                return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_SEEK"]:
            return bRCs["bRC_OK"]

        else:
            DebugMessage(
                context,
                100,
                "plugin_io called with unsupported IOP:" + str(IOP.func) + "\n",
            )
            return bRCs["bRC_OK"]

    def end_backup_file(self, context):
        """
        Check if dump was successful.
        """
        # Usually the xtrabackup process should have terminated here, but on some servers
        # it has not always.
        if self.file_to_backup == "stream":
            returnCode = self.subprocess_returnCode
            if returnCode is None:
                JobMessage(
                    context,
                    bJobMessageType["M_ERROR"],
                    "Dump command not finished properly for unknown reason\n",
                )
                returnCode = -99
            else:
                DebugMessage(
                    context,
                    100,
                    "end_backup_file() entry point in Python called. Returncode: %d\n"
                    % self.stream.returncode,
                )
                if returnCode != 0:
                    msg = [
                        "Dump command returned non-zero value: %d" % returnCode,
                        'command: "%s"' % self.dumpcommand,
                    ]
                    if self.log:
                        msg += ['log file: "%s"' % self.log]
                    JobMessage(
                        context, bJobMessageType["M_FATAL"], ", ".join(msg) + "\n"
                    )
            if returnCode != 0:
                return bRCs["bRC_Error"]

            if self.log:
                self.err_fd.write(
                    "%s Backup Job %s closes stream\n"
                    % (datetime.datetime.now(), self.jobId)
                )
                self.err_fd.close()

        # More files pending (the lsn restore object) -> ask for another round.
        if self.files_to_backup:
            return bRCs["bRC_More"]
        else:
            return bRCs["bRC_OK"]

    def end_restore_file(self, context):
        """
        Check if writing to restore command was successful.
        """
        returnCode = self.subprocess_returnCode
        if returnCode is None:
            JobMessage(
                context,
                bJobMessageType["M_ERROR"],
                "Restore command not finished properly for unknown reason\n",
            )
            returnCode = -99
        else:
            DebugMessage(
                context,
                100,
                "end_restore_file() entry point in Python called. Returncode: %d\n"
                % self.stream.returncode,
            )
            if returnCode != 0:
                # Fix: was undefined name "return_code" -> NameError whenever
                # the restore command exited non-zero.
                msg = ["Restore command returned non-zero value: %d" % returnCode]
                if self.log:
                    msg += ['log file: "%s"' % self.log]
                JobMessage(context, bJobMessageType["M_ERROR"], ", ".join(msg) + "\n")
        if self.log:
            self.err_fd.write(
                "%s Restore Job %s closes stream\n"
                % (datetime.datetime.now(), self.jobId)
            )
            self.err_fd.close()

        if returnCode == 0:
            return bRCs["bRC_OK"]
        else:
            return bRCs["bRC_Error"]

    def restore_object_data(self, context, ROP):
        """
        Called on restore and on diff/inc jobs.
        Stores the xtrabackup_checkpoints data of a previous job and tracks
        the highest to_lsn seen, which incremental backups continue from.
        """
        # Improve: sanity / consistence check of restore object
        self.row_rop_raw = ROP.object
        self.rop_data[ROP.jobid] = json.loads(str(self.row_rop_raw))
        if (
            "to_lsn" in self.rop_data[ROP.jobid]
            # Fix: to_lsn is a JSON string; compare numerically — on Python 3
            # a str > int comparison raises TypeError.
            and int(self.rop_data[ROP.jobid]["to_lsn"]) > self.max_to_lsn
        ):
            self.max_to_lsn = int(self.rop_data[ROP.jobid]["to_lsn"])
            JobMessage(
                context,
                bJobMessageType["M_INFO"],
                "Got to_lsn %d from restore object of job %d\n"
                % (self.max_to_lsn, ROP.jobid),
            )
        return bRCs["bRC_OK"]


# vim: ts=4 tabstop=4 expandtab shiftwidth=4 softtabstop=4