1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright (C) 2015-2020 Bareos GmbH & Co. KG
4#
5# This program is Free Software; you can redistribute it and/or
6# modify it under the terms of version three of the GNU Affero General Public
7# License as published by the Free Software Foundation, which is
8# listed in the file LICENSE.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13# Affero General Public License for more details.
14#
15# You should have received a copy of the GNU Affero General Public License
16# along with this program; if not, write to the Free Software
17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18# 02110-1301, USA.
19#
20# Author: Maik Aussendorf
21#
22# Uses Percona's xtrabackup for backup and restore of MySQL / MariaDB databases
23
24from bareosfd import *
25import os
26from subprocess import *
27from BareosFdPluginBaseclass import *
28import BareosFdWrapper
29import datetime
30import time
31import tempfile
32import shutil
33import json
34
35
class BareosFdPercona(BareosFdPluginBaseclass):
    """
    Plugin for backing up all mysql innodb databases found in a specific mysql server
    using the Percona xtrabackup tool.
    """

    def __init__(self, plugindef):
        """Initialize plugin state; option parsing happens later in
        parse_plugin_definition()."""
        # BareosFdPluginBaseclass.__init__(self, plugindef)
        super(BareosFdPercona, self).__init__(plugindef)
        # we first create and backup the stream and after that
        # the lsn file as restore-object; the list is consumed with pop(),
        # so entries are processed right-to-left ("stream" first)
        self.files_to_backup = ["lsnfile", "stream"]
        # temporary directory for xtrabackup's --extra-lsndir checkpoint file
        self.tempdir = tempfile.mkdtemp()
        # self.logdir = GetValue(bVarWorkingDir)
        self.logdir = "/var/log/bareos/"
        # default extra log file name; may be overridden/disabled by options
        self.log = "bareos-plugin-percona.log"
        # restore-object data per originating jobId, filled in restore_object_data()
        self.rop_data = {}
        # highest to_lsn seen in restore objects; base LSN for incremental backups
        self.max_to_lsn = 0
        # file handle for the optional extra log file (opened in plugin_io)
        self.err_fd = None
55
56    def parse_plugin_definition(self, plugindef):
57        """
58        We have default options that should work out of the box in the most  use cases
59        that the mysql/mariadb is on the same host and can be accessed without user/password information,
60        e.g. with a valid my.cnf for user root.
61        """
62        BareosFdPluginBaseclass.parse_plugin_definition(self, plugindef)
63
64        if "dumpbinary" in self.options:
65            self.dumpbinary = self.options["dumpbinary"]
66        else:
67            self.dumpbinary = "xtrabackup"
68
69        if "restorecommand" not in self.options:
70            self.restorecommand = "xbstream -x -C "
71        else:
72            self.restorecommand = self.options["restorecommand"]
73
74        # Default is not to write an extra logfile
75        if "log" not in self.options:
76            self.log = False
77        elif self.options["log"] == "false":
78            self.log = False
79        elif os.path.isabs(self.options["log"]):
80            self.log = self.options["log"]
81        else:
82            self.log = os.path.join(self.logdir, self.options["log"])
83
84        # By default, standard mysql-config files will be used, set
85        # this option to use extra files
86        self.connect_options = {"read_default_group": "client"}
87        if "mycnf" in self.options:
88            self.connect_options["read_default_file"] = self.options["mycnf"]
89            self.mycnf = "--defaults-extra-file=%s " % self.options["mycnf"]
90        else:
91            self.mycnf = ""
92
93        # If true, incremental jobs will only be performed, if LSN has increased
94        # since last call.
95        if (
96            "strictIncremental" in self.options
97            and self.options["strictIncremental"] == "true"
98        ):
99            self.strictIncremental = True
100        else:
101            self.strictIncremental = False
102
103        self.dumpoptions = self.mycnf
104
105        # if dumpoptions is set, we use that here, otherwise defaults
106        if "dumpoptions" in self.options:
107            self.dumpoptions += self.options["dumpoptions"]
108        else:
109            self.dumpoptions += " --backup --stream=xbstream"
110
111        self.dumpoptions += " --extra-lsndir=%s" % self.tempdir
112
113        if "extradumpoptions" in self.options:
114            self.dumpoptions += " " + self.options["extradumpoptions"]
115
116        # We need to call mysql to get the current Log Sequece Number (LSN)
117        if "mysqlcmd" in self.options:
118            self.mysqlcmd = self.options["mysqlcmd"]
119        else:
120            self.mysqlcmd = "mysql %s -r" % self.mycnf
121
122        return bRC_OK
123
124    def check_plugin_options(self, mandatory_options=None):
125        accurate_enabled = GetValue(bVarAccurate)
126        if accurate_enabled is not None and accurate_enabled != 0:
127            JobMessage(
128                M_FATAL,
129                "start_backup_job: Accurate backup not allowed please disable in Job\n",
130            )
131            return bRC_Error
132        else:
133            return bRC_OK
134
135    def create_file(self, restorepkt):
136        """
137        On restore we create a subdirectory for the first base backup and each incremental backup.
138        Because percona expects an empty directory, we create a tree starting with jobId/ of restore job
139        """
140        FNAME = restorepkt.ofname
141        DebugMessage(100, "create file with %s called\n" % FNAME)
142        self.writeDir = "%s/%d/" % (os.path.dirname(FNAME), self.jobId)
143        # FNAME contains originating jobId after last .
144        origJobId = int(FNAME.rpartition(".")[-1])
145        if origJobId in self.rop_data:
146            rop_from_lsn = int(self.rop_data[origJobId]["from_lsn"])
147            rop_to_lsn = int(self.rop_data[origJobId]["to_lsn"])
148            self.writeDir += "/%020d_" % rop_from_lsn
149            self.writeDir += "%020d_" % rop_to_lsn
150            self.writeDir += "%010d" % origJobId
151        else:
152            JobMessage(
153                M_ERROR,
154                "No lsn information found in restore object for file %s from job %d\n"
155                % (FNAME, origJobId),
156            )
157
158        # Create restore directory, if not existent
159        if not os.path.exists(self.writeDir):
160            bareosfd.DebugMessage(
161                200,
162                "Directory %s does not exist, creating it now\n" % self.writeDir,
163            )
164            os.makedirs(self.writeDir)
165        # Percona requires empty directory
166        if os.listdir(self.writeDir):
167            JobMessage(
168                M_FATAL,
169                "Restore with xbstream needs empty directory: %s\n" % self.writeDir,
170            )
171            return bRC_Error
172        self.restorecommand += self.writeDir
173        DebugMessage(
174            100,
175            'Restore using xbstream to extract files with "%s"\n' % self.restorecommand,
176        )
177        restorepkt.create_status = CF_EXTRACT
178        return bRC_OK
179
180    def start_backup_job(self):
181        """
182        We will check, if database has changed since last backup
183        in the incremental case
184        """
185        check_option_bRC = self.check_plugin_options()
186        if check_option_bRC != bRC_OK:
187            return check_option_bRC
188        bareosfd.DebugMessage(100, "start_backup_job, level: %s\n" % chr(self.level))
189        if chr(self.level) == "I":
190            # We check, if we have a LSN received by restore object from previous job
191            if self.max_to_lsn == 0:
192                JobMessage(
193                    M_FATAL,
194                    "No LSN received to be used with incremental backup\n",
195                )
196                return bRC_Error
197            # Try to load MySQLdb module
198            hasMySQLdbModule = False
199            try:
200                import MySQLdb
201
202                hasMySQLdbModule = True
203                bareosfd.DebugMessage(100, "Imported module MySQLdb\n")
204            except ImportError:
205                bareosfd.DebugMessage(
206                    100,
207                    "Import of module MySQLdb failed. Using command pipe instead\n",
208                )
209            # contributed by https://github.com/kjetilho
210            if hasMySQLdbModule:
211                try:
212                    conn = MySQLdb.connect(**self.connect_options)
213                    cursor = conn.cursor()
214                    cursor.execute("SHOW ENGINE INNODB STATUS")
215                    result = cursor.fetchall()
216                    if len(result) == 0:
217                        JobMessage(
218                            M_FATAL,
219                            "Could not fetch SHOW ENGINE INNODB STATUS, unprivileged user?",
220                        )
221                        return bRC_Error
222                    info = result[0][2]
223                    conn.close()
224                    for line in info.split("\n"):
225                        if line.startswith("Log sequence number"):
226                            last_lsn = int(line.split(" ")[-1])
227                except Exception as e:
228                    JobMessage(
229                        M_FATAL,
230                        "Could not get LSN, Error: %s" % e,
231                    )
232                    return bRC_Error
233            # use old method as fallback, if module MySQLdb not available
234            else:
235                get_lsn_command = (
236                    "echo 'SHOW ENGINE INNODB STATUS' | %s | grep 'Log sequence number' | awk '{ print $4 }'"
237                    % self.mysqlcmd
238                )
239                last_lsn_proc = Popen(
240                    get_lsn_command, shell=True, stdout=PIPE, stderr=PIPE
241                )
242                last_lsn_proc.wait()
243                returnCode = last_lsn_proc.poll()
244                (mysqlStdOut, mysqlStdErr) = last_lsn_proc.communicate()
245                if returnCode != 0 or mysqlStdErr:
246                    JobMessage(
247                        M_FATAL,
248                        'Could not get LSN with command "%s", Error: %s'
249                        % (get_lsn_command, mysqlStdErr),
250                    )
251                    return bRC_Error
252                else:
253                    try:
254                        last_lsn = int(mysqlStdOut)
255                    except:
256                        JobMessage(
257                            M_FATAL,
258                            'Error reading LSN: "%s" not an integer' % mysqlStdOut,
259                        )
260                        return bRC_Error
261            JobMessage(M_INFO, "Backup until LSN: %d\n" % last_lsn)
262            if (
263                self.max_to_lsn > 0
264                and self.max_to_lsn >= last_lsn
265                and self.strictIncremental
266            ):
267                bareosfd.DebugMessage(
268                    100,
269                    "Last LSN of DB %d is not higher than LSN from previous job %d. Skipping this incremental backup\n"
270                    % (last_lsn, self.max_to_lsn),
271                )
272                self.files_to_backup = ["lsn_only"]
273                return bRC_OK
274        return bRC_OK
275
276    def start_backup_file(self, savepkt):
277        """
278        This method is called, when Bareos is ready to start backup a file
279        """
280        if not self.files_to_backup:
281            self.file_to_backup = None
282            DebugMessage(100, "start_backup_file: None\n")
283        else:
284            self.file_to_backup = self.files_to_backup.pop()
285            DebugMessage(100, "start_backup_file: %s\n" % self.file_to_backup)
286
287        statp = StatPacket()
288        savepkt.statp = statp
289
290        if self.file_to_backup == "stream":
291            # This is the database backup as xbstream
292            savepkt.fname = "/_percona/xbstream.%010d" % self.jobId
293            savepkt.type = FT_REG
294            if self.max_to_lsn > 0:
295                self.dumpoptions += " --incremental-lsn=%d" % self.max_to_lsn
296            self.dumpcommand = "%s %s" % (self.dumpbinary, self.dumpoptions)
297            DebugMessage(100, "Dumper: '" + self.dumpcommand + "'\n")
298        elif self.file_to_backup == "lsnfile":
299            # The restore object containing the log sequence number (lsn)
300            # Read checkpoints and create restore object
301            checkpoints = {}
302            # improve: Error handling
303            with open("%s/xtrabackup_checkpoints" % self.tempdir) as lsnfile:
304                for line in lsnfile:
305                    key, value = line.partition("=")[::2]
306                    checkpoints[key.strip()] = value.strip()
307            savepkt.fname = "/_percona/xtrabackup_checkpoints"
308            savepkt.type = FT_RESTORE_FIRST
309            savepkt.object_name = savepkt.fname
310            savepkt.object = bytearray(json.dumps(checkpoints), encoding="utf8")
311            savepkt.object_len = len(savepkt.object)
312            savepkt.object_index = int(time.time())
313            shutil.rmtree(self.tempdir)
314        elif self.file_to_backup == "lsn_only":
315            # We have nothing to backup incremental, so we just have to pass
316            # the restore object from previous job
317            savepkt.fname = "/_percona/xtrabackup_checkpoints"
318            savepkt.type = FT_RESTORE_FIRST
319            savepkt.object_name = savepkt.fname
320            savepkt.object = bytearray(self.row_rop_raw)
321            savepkt.object_len = len(savepkt.object)
322            savepkt.object_index = int(time.time())
323        else:
324            # should not happen
325            JobMessage(
326                M_FATAL,
327                "Unknown error. Don't know how to handle %s\n" % self.file_to_backup,
328            )
329
330        JobMessage(
331            M_INFO,
332            "Starting backup of " + savepkt.fname + "\n",
333        )
334        return bRC_OK
335
336    def plugin_io(self, IOP):
337        """
338        Called for io operations. We read from pipe into buffers or on restore
339        send to xbstream
340        """
341        DebugMessage(200, "plugin_io called with " + str(IOP.func) + "\n")
342
343        if IOP.func == IO_OPEN:
344            DebugMessage(100, "plugin_io called with IO_OPEN\n")
345            if self.log:
346                try:
347                    self.err_fd = open(self.log, "a")
348                except IOError as msg:
349                    DebugMessage(
350                        100,
351                        "Could not open log file (%s): %s\n"
352                        % (self.log, format(str(msg))),
353                    )
354            if IOP.flags & (os.O_CREAT | os.O_WRONLY):
355                if self.log:
356                    self.err_fd.write(
357                        '%s Restore Job %s opens stream with "%s"\n'
358                        % (datetime.datetime.now(), self.jobId, self.restorecommand)
359                    )
360                self.stream = Popen(
361                    self.restorecommand, shell=True, stdin=PIPE, stderr=self.err_fd
362                )
363            else:
364                if self.log:
365                    self.err_fd.write(
366                        '%s Backup Job %s opens stream with "%s"\n'
367                        % (datetime.datetime.now(), self.jobId, self.dumpcommand)
368                    )
369                self.stream = Popen(
370                    self.dumpcommand, shell=True, stdout=PIPE, stderr=self.err_fd
371                )
372            return bRC_OK
373
374        elif IOP.func == IO_READ:
375            IOP.buf = bytearray(IOP.count)
376            IOP.status = self.stream.stdout.readinto(IOP.buf)
377            IOP.io_errno = 0
378            return bRC_OK
379
380        elif IOP.func == IO_WRITE:
381            try:
382                self.stream.stdin.write(IOP.buf)
383                IOP.status = IOP.count
384                IOP.io_errno = 0
385            except IOError as msg:
386                IOP.io_errno = -1
387                DebugMessage(100, "Error writing data: " + format(str(msg)) + "\n")
388            return bRC_OK
389
390        elif IOP.func == IO_CLOSE:
391            DebugMessage(100, "plugin_io called with IO_CLOSE\n")
392            self.subprocess_returnCode = self.stream.poll()
393            if self.subprocess_returnCode is None:
394                # Subprocess is open, we wait until it finishes and get results
395                try:
396                    self.stream.communicate()
397                    self.subprocess_returnCode = self.stream.poll()
398                except:
399                    JobMessage(
400                        M_ERROR,
401                        "Dump / restore command not finished properly\n",
402                    )
403                    bRC_Error
404                return bRC_OK
405            else:
406                DebugMessage(
407                    100,
408                    "Subprocess has terminated with returncode: %d\n"
409                    % self.subprocess_returnCode,
410                )
411                return bRC_OK
412
413        elif IOP.func == IO_SEEK:
414            return bRC_OK
415
416        else:
417            DebugMessage(
418                100,
419                "plugin_io called with unsupported IOP:" + str(IOP.func) + "\n",
420            )
421            return bRC_OK
422
423    def end_backup_file(self):
424        """
425        Check if dump was successful.
426        """
427        # Usually the xtrabackup process should have terminated here, but on some servers
428        # it has not always.
429        if self.file_to_backup == "stream":
430            returnCode = self.subprocess_returnCode
431            if returnCode is None:
432                JobMessage(
433                    M_ERROR,
434                    "Dump command not finished properly for unknown reason\n",
435                )
436                returnCode = -99
437            else:
438                DebugMessage(
439                    100,
440                    "end_backup_file() entry point in Python called. Returncode: %d\n"
441                    % self.stream.returncode,
442                )
443                if returnCode != 0:
444                    msg = [
445                        "Dump command returned non-zero value: %d" % returnCode,
446                        'command: "%s"' % self.dumpcommand,
447                    ]
448                    if self.log:
449                        msg += ['log file: "%s"' % self.log]
450                    JobMessage(M_FATAL, ", ".join(msg) + "\n")
451            if returnCode != 0:
452                return bRC_Error
453
454            if self.log:
455                self.err_fd.write(
456                    "%s Backup Job %s closes stream\n"
457                    % (datetime.datetime.now(), self.jobId)
458                )
459                self.err_fd.close()
460
461        if self.files_to_backup:
462            return bRC_More
463        else:
464            return bRC_OK
465
466    def end_restore_file(self):
467        """
468        Check if writing to restore command was successful.
469        """
470        returnCode = self.subprocess_returnCode
471        if returnCode is None:
472            JobMessage(
473                M_ERROR,
474                "Restore command not finished properly for unknown reason\n",
475            )
476            returnCode = -99
477        else:
478            DebugMessage(
479                100,
480                "end_restore_file() entry point in Python called. Returncode: %d\n"
481                % self.stream.returncode,
482            )
483            if returnCode != 0:
484                msg = ["Restore command returned non-zero value: %d" % return_code]
485                if self.log:
486                    msg += ['log file: "%s"' % self.log]
487                JobMessage(M_ERROR, ", ".join(msg) + "\n")
488        if self.log:
489            self.err_fd.write(
490                "%s Restore Job %s closes stream\n"
491                % (datetime.datetime.now(), self.jobId)
492            )
493            self.err_fd.close()
494
495        if returnCode == 0:
496            return bRC_OK
497        else:
498            return bRC_Error
499
500    def restore_object_data(self, ROP):
501        """
502        Called on restore and on diff/inc jobs.
503        """
504        # Improve: sanity / consistence check of restore object
505        self.row_rop_raw = ROP.object
506        self.rop_data[ROP.jobid] = json.loads((self.row_rop_raw.decode("utf-8")))
507        if (
508            "to_lsn" in self.rop_data[ROP.jobid]
509            and int(self.rop_data[ROP.jobid]["to_lsn"]) > self.max_to_lsn
510        ):
511            self.max_to_lsn = int(self.rop_data[ROP.jobid]["to_lsn"])
512            JobMessage(
513                M_INFO,
514                "Got to_lsn %d from restore object of job %d\n"
515                % (self.max_to_lsn, ROP.jobid),
516            )
517        return bRC_OK
518
519
520# vim: ts=4 tabstop=4 expandtab shiftwidth=4 softtabstop=4
521