1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# Copyright (C) 2015-2020 Bareos GmbH & Co. KG 4# 5# This program is Free Software; you can redistribute it and/or 6# modify it under the terms of version three of the GNU Affero General Public 7# License as published by the Free Software Foundation, which is 8# listed in the file LICENSE. 9# 10# This program is distributed in the hope that it will be useful, but 11# WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13# Affero General Public License for more details. 14# 15# You should have received a copy of the GNU Affero General Public License 16# along with this program; if not, write to the Free Software 17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 18# 02110-1301, USA. 19# 20# Author: Maik Aussendorf 21# 22# Uses Percona's xtrabackup for backup and restore of MySQL / MariaDB databases 23 24from bareosfd import * 25import os 26from subprocess import * 27from BareosFdPluginBaseclass import * 28import BareosFdWrapper 29import datetime 30import time 31import tempfile 32import shutil 33import json 34 35 36class BareosFdPercona(BareosFdPluginBaseclass): 37 """ 38 Plugin for backing up all mysql innodb databases found in a specific mysql server 39 using the Percona xtrabackup tool. 40 """ 41 42 def __init__(self, plugindef): 43 # BareosFdPluginBaseclass.__init__(self, plugindef) 44 super(BareosFdPercona, self).__init__(plugindef) 45 # we first create and backup the stream and after that 46 # the lsn file as restore-object 47 self.files_to_backup = ["lsnfile", "stream"] 48 self.tempdir = tempfile.mkdtemp() 49 # self.logdir = GetValue(bVarWorkingDir) 50 self.logdir = "/var/log/bareos/" 51 self.log = "bareos-plugin-percona.log" 52 self.rop_data = {} 53 self.max_to_lsn = 0 54 self.err_fd = None 55 56 def parse_plugin_definition(self, plugindef): 57 """ 58 We have default options that should work out of the box in the most use cases 59 that the mysql/mariadb is on the same host and can be accessed without user/password information, 60 e.g. with a valid my.cnf for user root. 61 """ 62 BareosFdPluginBaseclass.parse_plugin_definition(self, plugindef) 63 64 if "dumpbinary" in self.options: 65 self.dumpbinary = self.options["dumpbinary"] 66 else: 67 self.dumpbinary = "xtrabackup" 68 69 if "restorecommand" not in self.options: 70 self.restorecommand = "xbstream -x -C " 71 else: 72 self.restorecommand = self.options["restorecommand"] 73 74 # Default is not to write an extra logfile 75 if "log" not in self.options: 76 self.log = False 77 elif self.options["log"] == "false": 78 self.log = False 79 elif os.path.isabs(self.options["log"]): 80 self.log = self.options["log"] 81 else: 82 self.log = os.path.join(self.logdir, self.options["log"]) 83 84 # By default, standard mysql-config files will be used, set 85 # this option to use extra files 86 self.connect_options = {"read_default_group": "client"} 87 if "mycnf" in self.options: 88 self.connect_options["read_default_file"] = self.options["mycnf"] 89 self.mycnf = "--defaults-extra-file=%s " % self.options["mycnf"] 90 else: 91 self.mycnf = "" 92 93 # If true, incremental jobs will only be performed, if LSN has increased 94 # since last call. 95 if ( 96 "strictIncremental" in self.options 97 and self.options["strictIncremental"] == "true" 98 ): 99 self.strictIncremental = True 100 else: 101 self.strictIncremental = False 102 103 self.dumpoptions = self.mycnf 104 105 # if dumpoptions is set, we use that here, otherwise defaults 106 if "dumpoptions" in self.options: 107 self.dumpoptions += self.options["dumpoptions"] 108 else: 109 self.dumpoptions += " --backup --stream=xbstream" 110 111 self.dumpoptions += " --extra-lsndir=%s" % self.tempdir 112 113 if "extradumpoptions" in self.options: 114 self.dumpoptions += " " + self.options["extradumpoptions"] 115 116 # We need to call mysql to get the current Log Sequece Number (LSN) 117 if "mysqlcmd" in self.options: 118 self.mysqlcmd = self.options["mysqlcmd"] 119 else: 120 self.mysqlcmd = "mysql %s -r" % self.mycnf 121 122 return bRC_OK 123 124 def check_plugin_options(self, mandatory_options=None): 125 accurate_enabled = GetValue(bVarAccurate) 126 if accurate_enabled is not None and accurate_enabled != 0: 127 JobMessage( 128 M_FATAL, 129 "start_backup_job: Accurate backup not allowed please disable in Job\n", 130 ) 131 return bRC_Error 132 else: 133 return bRC_OK 134 135 def create_file(self, restorepkt): 136 """ 137 On restore we create a subdirectory for the first base backup and each incremental backup. 138 Because percona expects an empty directory, we create a tree starting with jobId/ of restore job 139 """ 140 FNAME = restorepkt.ofname 141 DebugMessage(100, "create file with %s called\n" % FNAME) 142 self.writeDir = "%s/%d/" % (os.path.dirname(FNAME), self.jobId) 143 # FNAME contains originating jobId after last . 144 origJobId = int(FNAME.rpartition(".")[-1]) 145 if origJobId in self.rop_data: 146 rop_from_lsn = int(self.rop_data[origJobId]["from_lsn"]) 147 rop_to_lsn = int(self.rop_data[origJobId]["to_lsn"]) 148 self.writeDir += "/%020d_" % rop_from_lsn 149 self.writeDir += "%020d_" % rop_to_lsn 150 self.writeDir += "%010d" % origJobId 151 else: 152 JobMessage( 153 M_ERROR, 154 "No lsn information found in restore object for file %s from job %d\n" 155 % (FNAME, origJobId), 156 ) 157 158 # Create restore directory, if not existent 159 if not os.path.exists(self.writeDir): 160 bareosfd.DebugMessage( 161 200, 162 "Directory %s does not exist, creating it now\n" % self.writeDir, 163 ) 164 os.makedirs(self.writeDir) 165 # Percona requires empty directory 166 if os.listdir(self.writeDir): 167 JobMessage( 168 M_FATAL, 169 "Restore with xbstream needs empty directory: %s\n" % self.writeDir, 170 ) 171 return bRC_Error 172 self.restorecommand += self.writeDir 173 DebugMessage( 174 100, 175 'Restore using xbstream to extract files with "%s"\n' % self.restorecommand, 176 ) 177 restorepkt.create_status = CF_EXTRACT 178 return bRC_OK 179 180 def start_backup_job(self): 181 """ 182 We will check, if database has changed since last backup 183 in the incremental case 184 """ 185 check_option_bRC = self.check_plugin_options() 186 if check_option_bRC != bRC_OK: 187 return check_option_bRC 188 bareosfd.DebugMessage(100, "start_backup_job, level: %s\n" % chr(self.level)) 189 if chr(self.level) == "I": 190 # We check, if we have a LSN received by restore object from previous job 191 if self.max_to_lsn == 0: 192 JobMessage( 193 M_FATAL, 194 "No LSN received to be used with incremental backup\n", 195 ) 196 return bRC_Error 197 # Try to load MySQLdb module 198 hasMySQLdbModule = False 199 try: 200 import MySQLdb 201 202 hasMySQLdbModule = True 203 bareosfd.DebugMessage(100, "Imported module MySQLdb\n") 204 except ImportError: 205 bareosfd.DebugMessage( 206 100, 207 "Import of module MySQLdb failed. Using command pipe instead\n", 208 ) 209 # contributed by https://github.com/kjetilho 210 if hasMySQLdbModule: 211 try: 212 conn = MySQLdb.connect(**self.connect_options) 213 cursor = conn.cursor() 214 cursor.execute("SHOW ENGINE INNODB STATUS") 215 result = cursor.fetchall() 216 if len(result) == 0: 217 JobMessage( 218 M_FATAL, 219 "Could not fetch SHOW ENGINE INNODB STATUS, unprivileged user?", 220 ) 221 return bRC_Error 222 info = result[0][2] 223 conn.close() 224 for line in info.split("\n"): 225 if line.startswith("Log sequence number"): 226 last_lsn = int(line.split(" ")[-1]) 227 except Exception as e: 228 JobMessage( 229 M_FATAL, 230 "Could not get LSN, Error: %s" % e, 231 ) 232 return bRC_Error 233 # use old method as fallback, if module MySQLdb not available 234 else: 235 get_lsn_command = ( 236 "echo 'SHOW ENGINE INNODB STATUS' | %s | grep 'Log sequence number' | awk '{ print $4 }'" 237 % self.mysqlcmd 238 ) 239 last_lsn_proc = Popen( 240 get_lsn_command, shell=True, stdout=PIPE, stderr=PIPE 241 ) 242 last_lsn_proc.wait() 243 returnCode = last_lsn_proc.poll() 244 (mysqlStdOut, mysqlStdErr) = last_lsn_proc.communicate() 245 if returnCode != 0 or mysqlStdErr: 246 JobMessage( 247 M_FATAL, 248 'Could not get LSN with command "%s", Error: %s' 249 % (get_lsn_command, mysqlStdErr), 250 ) 251 return bRC_Error 252 else: 253 try: 254 last_lsn = int(mysqlStdOut) 255 except: 256 JobMessage( 257 M_FATAL, 258 'Error reading LSN: "%s" not an integer' % mysqlStdOut, 259 ) 260 return bRC_Error 261 JobMessage(M_INFO, "Backup until LSN: %d\n" % last_lsn) 262 if ( 263 self.max_to_lsn > 0 264 and self.max_to_lsn >= last_lsn 265 and self.strictIncremental 266 ): 267 bareosfd.DebugMessage( 268 100, 269 "Last LSN of DB %d is not higher than LSN from previous job %d. Skipping this incremental backup\n" 270 % (last_lsn, self.max_to_lsn), 271 ) 272 self.files_to_backup = ["lsn_only"] 273 return bRC_OK 274 return bRC_OK 275 276 def start_backup_file(self, savepkt): 277 """ 278 This method is called, when Bareos is ready to start backup a file 279 """ 280 if not self.files_to_backup: 281 self.file_to_backup = None 282 DebugMessage(100, "start_backup_file: None\n") 283 else: 284 self.file_to_backup = self.files_to_backup.pop() 285 DebugMessage(100, "start_backup_file: %s\n" % self.file_to_backup) 286 287 statp = StatPacket() 288 savepkt.statp = statp 289 290 if self.file_to_backup == "stream": 291 # This is the database backup as xbstream 292 savepkt.fname = "/_percona/xbstream.%010d" % self.jobId 293 savepkt.type = FT_REG 294 if self.max_to_lsn > 0: 295 self.dumpoptions += " --incremental-lsn=%d" % self.max_to_lsn 296 self.dumpcommand = "%s %s" % (self.dumpbinary, self.dumpoptions) 297 DebugMessage(100, "Dumper: '" + self.dumpcommand + "'\n") 298 elif self.file_to_backup == "lsnfile": 299 # The restore object containing the log sequence number (lsn) 300 # Read checkpoints and create restore object 301 checkpoints = {} 302 # improve: Error handling 303 with open("%s/xtrabackup_checkpoints" % self.tempdir) as lsnfile: 304 for line in lsnfile: 305 key, value = line.partition("=")[::2] 306 checkpoints[key.strip()] = value.strip() 307 savepkt.fname = "/_percona/xtrabackup_checkpoints" 308 savepkt.type = FT_RESTORE_FIRST 309 savepkt.object_name = savepkt.fname 310 savepkt.object = bytearray(json.dumps(checkpoints), encoding="utf8") 311 savepkt.object_len = len(savepkt.object) 312 savepkt.object_index = int(time.time()) 313 shutil.rmtree(self.tempdir) 314 elif self.file_to_backup == "lsn_only": 315 # We have nothing to backup incremental, so we just have to pass 316 # the restore object from previous job 317 savepkt.fname = "/_percona/xtrabackup_checkpoints" 318 savepkt.type = FT_RESTORE_FIRST 319 savepkt.object_name = savepkt.fname 320 savepkt.object = bytearray(self.row_rop_raw) 321 savepkt.object_len = len(savepkt.object) 322 savepkt.object_index = int(time.time()) 323 else: 324 # should not happen 325 JobMessage( 326 M_FATAL, 327 "Unknown error. Don't know how to handle %s\n" % self.file_to_backup, 328 ) 329 330 JobMessage( 331 M_INFO, 332 "Starting backup of " + savepkt.fname + "\n", 333 ) 334 return bRC_OK 335 336 def plugin_io(self, IOP): 337 """ 338 Called for io operations. We read from pipe into buffers or on restore 339 send to xbstream 340 """ 341 DebugMessage(200, "plugin_io called with " + str(IOP.func) + "\n") 342 343 if IOP.func == IO_OPEN: 344 DebugMessage(100, "plugin_io called with IO_OPEN\n") 345 if self.log: 346 try: 347 self.err_fd = open(self.log, "a") 348 except IOError as msg: 349 DebugMessage( 350 100, 351 "Could not open log file (%s): %s\n" 352 % (self.log, format(str(msg))), 353 ) 354 if IOP.flags & (os.O_CREAT | os.O_WRONLY): 355 if self.log: 356 self.err_fd.write( 357 '%s Restore Job %s opens stream with "%s"\n' 358 % (datetime.datetime.now(), self.jobId, self.restorecommand) 359 ) 360 self.stream = Popen( 361 self.restorecommand, shell=True, stdin=PIPE, stderr=self.err_fd 362 ) 363 else: 364 if self.log: 365 self.err_fd.write( 366 '%s Backup Job %s opens stream with "%s"\n' 367 % (datetime.datetime.now(), self.jobId, self.dumpcommand) 368 ) 369 self.stream = Popen( 370 self.dumpcommand, shell=True, stdout=PIPE, stderr=self.err_fd 371 ) 372 return bRC_OK 373 374 elif IOP.func == IO_READ: 375 IOP.buf = bytearray(IOP.count) 376 IOP.status = self.stream.stdout.readinto(IOP.buf) 377 IOP.io_errno = 0 378 return bRC_OK 379 380 elif IOP.func == IO_WRITE: 381 try: 382 self.stream.stdin.write(IOP.buf) 383 IOP.status = IOP.count 384 IOP.io_errno = 0 385 except IOError as msg: 386 IOP.io_errno = -1 387 DebugMessage(100, "Error writing data: " + format(str(msg)) + "\n") 388 return bRC_OK 389 390 elif IOP.func == IO_CLOSE: 391 DebugMessage(100, "plugin_io called with IO_CLOSE\n") 392 self.subprocess_returnCode = self.stream.poll() 393 if self.subprocess_returnCode is None: 394 # Subprocess is open, we wait until it finishes and get results 395 try: 396 self.stream.communicate() 397 self.subprocess_returnCode = self.stream.poll() 398 except: 399 JobMessage( 400 M_ERROR, 401 "Dump / restore command not finished properly\n", 402 ) 403 bRC_Error 404 return bRC_OK 405 else: 406 DebugMessage( 407 100, 408 "Subprocess has terminated with returncode: %d\n" 409 % self.subprocess_returnCode, 410 ) 411 return bRC_OK 412 413 elif IOP.func == IO_SEEK: 414 return bRC_OK 415 416 else: 417 DebugMessage( 418 100, 419 "plugin_io called with unsupported IOP:" + str(IOP.func) + "\n", 420 ) 421 return bRC_OK 422 423 def end_backup_file(self): 424 """ 425 Check if dump was successful. 426 """ 427 # Usually the xtrabackup process should have terminated here, but on some servers 428 # it has not always. 429 if self.file_to_backup == "stream": 430 returnCode = self.subprocess_returnCode 431 if returnCode is None: 432 JobMessage( 433 M_ERROR, 434 "Dump command not finished properly for unknown reason\n", 435 ) 436 returnCode = -99 437 else: 438 DebugMessage( 439 100, 440 "end_backup_file() entry point in Python called. Returncode: %d\n" 441 % self.stream.returncode, 442 ) 443 if returnCode != 0: 444 msg = [ 445 "Dump command returned non-zero value: %d" % returnCode, 446 'command: "%s"' % self.dumpcommand, 447 ] 448 if self.log: 449 msg += ['log file: "%s"' % self.log] 450 JobMessage(M_FATAL, ", ".join(msg) + "\n") 451 if returnCode != 0: 452 return bRC_Error 453 454 if self.log: 455 self.err_fd.write( 456 "%s Backup Job %s closes stream\n" 457 % (datetime.datetime.now(), self.jobId) 458 ) 459 self.err_fd.close() 460 461 if self.files_to_backup: 462 return bRC_More 463 else: 464 return bRC_OK 465 466 def end_restore_file(self): 467 """ 468 Check if writing to restore command was successful. 469 """ 470 returnCode = self.subprocess_returnCode 471 if returnCode is None: 472 JobMessage( 473 M_ERROR, 474 "Restore command not finished properly for unknown reason\n", 475 ) 476 returnCode = -99 477 else: 478 DebugMessage( 479 100, 480 "end_restore_file() entry point in Python called. Returncode: %d\n" 481 % self.stream.returncode, 482 ) 483 if returnCode != 0: 484 msg = ["Restore command returned non-zero value: %d" % return_code] 485 if self.log: 486 msg += ['log file: "%s"' % self.log] 487 JobMessage(M_ERROR, ", ".join(msg) + "\n") 488 if self.log: 489 self.err_fd.write( 490 "%s Restore Job %s closes stream\n" 491 % (datetime.datetime.now(), self.jobId) 492 ) 493 self.err_fd.close() 494 495 if returnCode == 0: 496 return bRC_OK 497 else: 498 return bRC_Error 499 500 def restore_object_data(self, ROP): 501 """ 502 Called on restore and on diff/inc jobs. 503 """ 504 # Improve: sanity / consistence check of restore object 505 self.row_rop_raw = ROP.object 506 self.rop_data[ROP.jobid] = json.loads((self.row_rop_raw.decode("utf-8"))) 507 if ( 508 "to_lsn" in self.rop_data[ROP.jobid] 509 and int(self.rop_data[ROP.jobid]["to_lsn"]) > self.max_to_lsn 510 ): 511 self.max_to_lsn = int(self.rop_data[ROP.jobid]["to_lsn"]) 512 JobMessage( 513 M_INFO, 514 "Got to_lsn %d from restore object of job %d\n" 515 % (self.max_to_lsn, ROP.jobid), 516 ) 517 return bRC_OK 518 519 520# vim: ts=4 tabstop=4 expandtab shiftwidth=4 softtabstop=4 521