#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2020 Bareos GmbH & Co. KG
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of version three of the GNU Affero General Public
# License as published by the Free Software Foundation, which is
# listed in the file LICENSE.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
#
# Author: Maik Aussendorf
#
# Uses Percona's xtrabackup for backup and restore of MySQL / MariaDB databases
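#
# A FileSet entry to enable this plugin could look like the following
# (assumption: the module file is installed as bareos-fd-percona in the
# Bareos plugin directory; adapt module_path and options to your setup):
#
#   Plugin = "python:module_path=/usr/lib/bareos/plugins:module_name=bareos-fd-percona:mycnf=/root/.my.cnf"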

from bareosfd import *
from bareos_fd_consts import *
import os
from subprocess import *
from BareosFdPluginBaseclass import *
import BareosFdWrapper
import datetime
import time
import tempfile
import shutil
import json


class BareosFdPercona(BareosFdPluginBaseclass):
    """
    Plugin for backing up all MySQL/InnoDB databases found on a specific
    MySQL server using the Percona xtrabackup tool.
    """

    def __init__(self, context, plugindef):
        # BareosFdPluginBaseclass.__init__(self, context, plugindef)
        super(BareosFdPercona, self).__init__(context, plugindef)
        # We first create and back up the stream, and after that
        # the lsn file as restore object
        self.files_to_backup = ["lsnfile", "stream"]
        self.tempdir = tempfile.mkdtemp()
        # self.logdir = GetValue(context, bVariable['bVarWorkingDir'])
        self.logdir = "/var/log/bareos/"
        self.log = "bareos-plugin-percona.log"
        self.rop_data = {}
        self.max_to_lsn = 0
        self.err_fd = None
    def parse_plugin_definition(self, context, plugindef):
        """
        The default options should work out of the box in most use cases,
        i.e. when MySQL/MariaDB runs on the same host and can be accessed
        without user/password information, e.g. with a valid my.cnf for user root.
        """
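        # Options evaluated below (all optional):
        #   dumpbinary        - dump program to use (default: xtrabackup)
        #   restorecommand    - restore pipe command (default: "xbstream -x -C ")
        #   log               - extra logfile; "false" or unset disables it
        #   mycnf             - extra config file passed via --defaults-extra-file
        #   strictIncremental - "true": skip incremental backup if LSN is unchanged
        #   dumpoptions       - replaces the default "--backup --stream=xbstream"
        #   extradumpoptions  - appended to the dump options
        #   mysqlcmd          - command used to query the current LSN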
        BareosFdPluginBaseclass.parse_plugin_definition(self, context, plugindef)

        if "dumpbinary" in self.options:
            self.dumpbinary = self.options["dumpbinary"]
        else:
            self.dumpbinary = "xtrabackup"

        if "restorecommand" not in self.options:
            self.restorecommand = "xbstream -x -C "
        else:
            self.restorecommand = self.options["restorecommand"]

        # Default is not to write an extra logfile
        if "log" not in self.options:
            self.log = False
        elif self.options["log"] == "false":
            self.log = False
        elif os.path.isabs(self.options["log"]):
            self.log = self.options["log"]
        else:
            self.log = os.path.join(self.logdir, self.options["log"])

        # By default, the standard mysql config files will be used; set
        # this option to use an extra file
        self.connect_options = {"read_default_group": "client"}
        if "mycnf" in self.options:
            self.connect_options["read_default_file"] = self.options["mycnf"]
            self.mycnf = "--defaults-extra-file=%s " % self.options["mycnf"]
        else:
            self.mycnf = ""

        # If true, incremental jobs will only be performed if the LSN has
        # increased since the last job.
        if (
            "strictIncremental" in self.options
            and self.options["strictIncremental"] == "true"
        ):
            self.strictIncremental = True
        else:
            self.strictIncremental = False

        self.dumpoptions = self.mycnf

        # If dumpoptions is set, we use it here, otherwise the defaults
        if "dumpoptions" in self.options:
            self.dumpoptions += self.options["dumpoptions"]
        else:
            self.dumpoptions += " --backup --stream=xbstream"

        self.dumpoptions += " --extra-lsndir=%s" % self.tempdir

        if "extradumpoptions" in self.options:
            self.dumpoptions += " " + self.options["extradumpoptions"]

        # We need to call mysql to get the current Log Sequence Number (LSN)
        if "mysqlcmd" in self.options:
            self.mysqlcmd = self.options["mysqlcmd"]
        else:
            self.mysqlcmd = "mysql %s -r" % self.mycnf

        return bRCs["bRC_OK"]

    def check_plugin_options(self, context, mandatory_options=None):
        accurate_enabled = GetValue(context, bVariable["bVarAccurate"])
        if accurate_enabled is not None and accurate_enabled != 0:
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "start_backup_job: Accurate backup not allowed; please disable it in the Job resource\n",
            )
            return bRCs["bRC_Error"]
        else:
            return bRCs["bRC_OK"]

    def create_file(self, context, restorepkt):
        """
        On restore we create a subdirectory for the first base backup and for
        each incremental backup. Because Percona expects an empty directory,
        we create a directory tree starting with the jobId of the restore job.
        """
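        # The resulting directory layout per restored job is (example values):
        #   <dirname(ofname)>/<restoreJobId>/<from_lsn>_<to_lsn>_<origJobId>
        # e.g. .../1042/00000000000000000000_00000000000001626007_0000000997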
        FNAME = restorepkt.ofname
        DebugMessage(context, 100, "create file with %s called\n" % FNAME)
        self.writeDir = "%s/%d/" % (os.path.dirname(FNAME), self.jobId)
        # FNAME contains the originating jobId after the last "."
        origJobId = int(FNAME.rpartition(".")[-1])
        if origJobId in self.rop_data:
            rop_from_lsn = int(self.rop_data[origJobId]["from_lsn"])
            rop_to_lsn = int(self.rop_data[origJobId]["to_lsn"])
            self.writeDir += "%020d_" % rop_from_lsn
            self.writeDir += "%020d_" % rop_to_lsn
            self.writeDir += "%010d" % origJobId
        else:
            JobMessage(
                context,
                bJobMessageType["M_ERROR"],
                "No lsn information found in restore object for file %s from job %d\n"
                % (FNAME, origJobId),
            )

        # Create the restore directory, if it does not exist
        if not os.path.exists(self.writeDir):
            DebugMessage(
                context,
                200,
                "Directory %s does not exist, creating it now\n" % self.writeDir,
            )
            os.makedirs(self.writeDir)
        # Percona requires an empty directory
        if os.listdir(self.writeDir):
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "Restore with xbstream needs an empty directory: %s\n" % self.writeDir,
            )
            return bRCs["bRC_Error"]
        self.restorecommand += self.writeDir
        DebugMessage(
            context,
            100,
            'Restore using xbstream to extract files with "%s"\n' % self.restorecommand,
        )
        restorepkt.create_status = bCFs["CF_EXTRACT"]
        return bRCs["bRC_OK"]

    def start_backup_job(self, context):
        """
        In the incremental case, check whether the database has changed
        since the last backup.
        """
        check_option_bRC = self.check_plugin_options(context)
        if check_option_bRC != bRCs["bRC_OK"]:
            return check_option_bRC
        DebugMessage(
            context, 100, "start_backup_job, level: %s\n" % chr(self.level)
        )
        if chr(self.level) == "I":
            # Check if we have an LSN received via restore object from the previous job
            if self.max_to_lsn == 0:
                JobMessage(
                    context,
                    bJobMessageType["M_FATAL"],
                    "No LSN received to be used with incremental backup\n",
                )
                return bRCs["bRC_Error"]
            # Try to load the MySQLdb module
            hasMySQLdbModule = False
            try:
                import MySQLdb

                hasMySQLdbModule = True
                DebugMessage(context, 100, "Imported module MySQLdb\n")
            except ImportError:
                DebugMessage(
                    context,
                    100,
                    "Import of module MySQLdb failed. Using command pipe instead\n",
                )
            # contributed by https://github.com/kjetilho
            if hasMySQLdbModule:
                try:
                    conn = MySQLdb.connect(**self.connect_options)
                    cursor = conn.cursor()
                    cursor.execute("SHOW ENGINE INNODB STATUS")
                    result = cursor.fetchall()
                    if len(result) == 0:
                        JobMessage(
                            context,
                            bJobMessageType["M_FATAL"],
                            "Could not fetch SHOW ENGINE INNODB STATUS, unprivileged user?",
                        )
                        return bRCs["bRC_Error"]
                    info = result[0][2]
                    conn.close()
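                    # The InnoDB status output contains a line such as
                    # "Log sequence number 1626007" (the single-space format
                    # printed by MySQL/Percona 5.x is assumed here);
                    # word index 3 is the LSN.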
                    for line in info.split("\n"):
                        if line.startswith("Log sequence number"):
                            last_lsn = int(line.split(" ")[3])
                except Exception as e:
                    JobMessage(
                        context,
                        bJobMessageType["M_FATAL"],
                        "Could not get LSN, Error: %s" % e,
                    )
                    return bRCs["bRC_Error"]
            # Use the old method as fallback, if module MySQLdb is not available
            else:
                get_lsn_command = (
                    "echo 'SHOW ENGINE INNODB STATUS' | %s | grep 'Log sequence number' | cut -d ' ' -f 4"
                    % self.mysqlcmd
                )
                last_lsn_proc = Popen(
                    get_lsn_command, shell=True, stdout=PIPE, stderr=PIPE
                )
                # communicate() reads all output and waits for the process to
                # finish; calling wait() first could deadlock on full pipes
                (mysqlStdOut, mysqlStdErr) = last_lsn_proc.communicate()
                returnCode = last_lsn_proc.returncode
                if returnCode != 0 or mysqlStdErr:
                    JobMessage(
                        context,
                        bJobMessageType["M_FATAL"],
                        'Could not get LSN with command "%s", Error: %s'
                        % (get_lsn_command, mysqlStdErr),
                    )
                    return bRCs["bRC_Error"]
                else:
                    try:
                        last_lsn = int(mysqlStdOut)
                    except ValueError:
                        JobMessage(
                            context,
                            bJobMessageType["M_FATAL"],
                            'Error reading LSN: "%s" not an integer' % mysqlStdOut,
                        )
                        return bRCs["bRC_Error"]
            JobMessage(
                context, bJobMessageType["M_INFO"], "Backup until LSN: %d\n" % last_lsn
            )
            if (
                self.max_to_lsn > 0
                and self.max_to_lsn >= last_lsn
                and self.strictIncremental
            ):
                DebugMessage(
                    context,
                    100,
                    "Last LSN of DB %d is not higher than LSN from previous job %d. Skipping this incremental backup\n"
                    % (last_lsn, self.max_to_lsn),
                )
                self.files_to_backup = ["lsn_only"]
                return bRCs["bRC_OK"]
        return bRCs["bRC_OK"]

    def start_backup_file(self, context, savepkt):
        """
        This method is called when Bareos is ready to start the backup of a file.
        """
        if not self.files_to_backup:
            self.file_to_backup = None
            DebugMessage(context, 100, "start_backup_file: None\n")
        else:
            self.file_to_backup = self.files_to_backup.pop()
            DebugMessage(context, 100, "start_backup_file: %s\n" % self.file_to_backup)

        statp = StatPacket()
        savepkt.statp = statp

        if self.file_to_backup == "stream":
            # This is the database backup as xbstream
            savepkt.fname = "/_percona/xbstream.%010d" % self.jobId
            savepkt.type = bFileType["FT_REG"]
            if self.max_to_lsn > 0:
                self.dumpoptions += " --incremental-lsn=%d" % self.max_to_lsn
            self.dumpcommand = "%s %s" % (self.dumpbinary, self.dumpoptions)
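            # With the default options the resulting command looks roughly
            # like this (paths and LSN are example values):
            #   xtrabackup --defaults-extra-file=/root/.my.cnf --backup \
            #       --stream=xbstream --extra-lsndir=/tmp/tmpXYZ --incremental-lsn=1626007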
            DebugMessage(context, 100, "Dumper: '" + self.dumpcommand + "'\n")
        elif self.file_to_backup == "lsnfile":
            # The restore object containing the log sequence number (lsn)
            # Read checkpoints and create restore object
            checkpoints = {}
            # improve: Error handling
            with open("%s/xtrabackup_checkpoints" % self.tempdir) as lsnfile:
                for line in lsnfile:
                    key, value = line.partition("=")[::2]
                    checkpoints[key.strip()] = value.strip()
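            # xtrabackup_checkpoints typically contains key = value pairs
            # like the following (example values):
            #   backup_type = full-backuped
            #   from_lsn = 0
            #   to_lsn = 1626007
            #   last_lsn = 1626007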
            savepkt.fname = "/_percona/xtrabackup_checkpoints"
            savepkt.type = bFileType["FT_RESTORE_FIRST"]
            savepkt.object_name = savepkt.fname
            savepkt.object = bytearray(json.dumps(checkpoints))
            savepkt.object_len = len(savepkt.object)
            savepkt.object_index = int(time.time())
            shutil.rmtree(self.tempdir)
        elif self.file_to_backup == "lsn_only":
            # We have nothing to back up incrementally, so we just pass
            # along the restore object from the previous job
            savepkt.fname = "/_percona/xtrabackup_checkpoints"
            savepkt.type = bFileType["FT_RESTORE_FIRST"]
            savepkt.object_name = savepkt.fname
            savepkt.object = bytearray(self.row_rop_raw)
            savepkt.object_len = len(savepkt.object)
            savepkt.object_index = int(time.time())
        else:
            # should not happen
            JobMessage(
                context,
                bJobMessageType["M_FATAL"],
                "Unknown error. Don't know how to handle %s\n" % self.file_to_backup,
            )

        JobMessage(
            context,
            bJobMessageType["M_INFO"],
            "Starting backup of " + savepkt.fname + "\n",
        )
        return bRCs["bRC_OK"]

    def plugin_io(self, context, IOP):
        """
        Called for IO operations. We read from the pipe into buffers, or
        send data to xbstream on restore.
        """
        DebugMessage(context, 200, "plugin_io called with " + str(IOP.func) + "\n")

        if IOP.func == bIOPS["IO_OPEN"]:
            DebugMessage(context, 100, "plugin_io called with IO_OPEN\n")
            if self.log:
                try:
                    self.err_fd = open(self.log, "a")
                except IOError as msg:
                    DebugMessage(
                        context,
                        100,
                        "Could not open log file (%s): %s\n" % (self.log, str(msg)),
                    )
            if IOP.flags & (os.O_CREAT | os.O_WRONLY):
                if self.log:
                    self.err_fd.write(
                        '%s Restore Job %s opens stream with "%s"\n'
                        % (datetime.datetime.now(), self.jobId, self.restorecommand)
                    )
                self.stream = Popen(
                    self.restorecommand, shell=True, stdin=PIPE, stderr=self.err_fd
                )
            else:
                if self.log:
                    self.err_fd.write(
                        '%s Backup Job %s opens stream with "%s"\n'
                        % (datetime.datetime.now(), self.jobId, self.dumpcommand)
                    )
                self.stream = Popen(
                    self.dumpcommand, shell=True, stdout=PIPE, stderr=self.err_fd
                )
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_READ"]:
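            # Read up to IOP.count bytes from the dump process' stdout
            # straight into the pre-allocated buffer; readinto() returns
            # the number of bytes read (0 at EOF), reported via IOP.status.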
            IOP.buf = bytearray(IOP.count)
            IOP.status = self.stream.stdout.readinto(IOP.buf)
            IOP.io_errno = 0
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_WRITE"]:
            try:
                self.stream.stdin.write(IOP.buf)
                IOP.status = IOP.count
                IOP.io_errno = 0
            except IOError as msg:
                IOP.io_errno = -1
                DebugMessage(
                    context, 100, "Error writing data: " + str(msg) + "\n"
                )
            return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_CLOSE"]:
            DebugMessage(context, 100, "plugin_io called with IO_CLOSE\n")
            self.subprocess_returnCode = self.stream.poll()
            if self.subprocess_returnCode is None:
                # Subprocess is still running; wait until it finishes and get the results
                try:
                    self.stream.communicate()
                    self.subprocess_returnCode = self.stream.poll()
                except Exception:
                    JobMessage(
                        context,
                        bJobMessageType["M_ERROR"],
                        "Dump / restore command not finished properly\n",
                    )
                    return bRCs["bRC_Error"]
                return bRCs["bRC_OK"]
            else:
                DebugMessage(
                    context,
                    100,
                    "Subprocess has terminated with returncode: %d\n"
                    % self.subprocess_returnCode,
                )
                return bRCs["bRC_OK"]

        elif IOP.func == bIOPS["IO_SEEK"]:
            return bRCs["bRC_OK"]

        else:
            DebugMessage(
                context,
                100,
                "plugin_io called with unsupported IOP:" + str(IOP.func) + "\n",
            )
            return bRCs["bRC_OK"]

    def end_backup_file(self, context):
        """
        Check if the dump was successful.
        """
        # Usually the xtrabackup process should have terminated here, but on
        # some servers it may still be running at this point.
        if self.file_to_backup == "stream":
            returnCode = self.subprocess_returnCode
            if returnCode is None:
                JobMessage(
                    context,
                    bJobMessageType["M_ERROR"],
                    "Dump command not finished properly for unknown reason\n",
                )
                returnCode = -99
            else:
                DebugMessage(
                    context,
                    100,
                    "end_backup_file() entry point in Python called. Returncode: %d\n"
                    % self.stream.returncode,
                )
                if returnCode != 0:
                    msg = [
                        "Dump command returned non-zero value: %d" % returnCode,
                        'command: "%s"' % self.dumpcommand,
                    ]
                    if self.log:
                        msg += ['log file: "%s"' % self.log]
                    JobMessage(
                        context, bJobMessageType["M_FATAL"], ", ".join(msg) + "\n"
                    )
            if returnCode != 0:
                return bRCs["bRC_Error"]

            if self.log:
                self.err_fd.write(
                    "%s Backup Job %s closes stream\n"
                    % (datetime.datetime.now(), self.jobId)
                )
                self.err_fd.close()

        if self.files_to_backup:
            return bRCs["bRC_More"]
        else:
            return bRCs["bRC_OK"]

    def end_restore_file(self, context):
        """
        Check if writing to the restore command was successful.
        """
        returnCode = self.subprocess_returnCode
        if returnCode is None:
            JobMessage(
                context,
                bJobMessageType["M_ERROR"],
                "Restore command not finished properly for unknown reason\n",
            )
            returnCode = -99
        else:
            DebugMessage(
                context,
                100,
                "end_restore_file() entry point in Python called. Returncode: %d\n"
                % self.stream.returncode,
            )
            if returnCode != 0:
                msg = ["Restore command returned non-zero value: %d" % returnCode]
                if self.log:
                    msg += ['log file: "%s"' % self.log]
                JobMessage(context, bJobMessageType["M_ERROR"], ", ".join(msg) + "\n")
        if self.log:
            self.err_fd.write(
                "%s Restore Job %s closes stream\n"
                % (datetime.datetime.now(), self.jobId)
            )
            self.err_fd.close()

        if returnCode == 0:
            return bRCs["bRC_OK"]
        else:
            return bRCs["bRC_Error"]

    def restore_object_data(self, context, ROP):
        """
        Called on restore and on diff/inc jobs.
        """
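        # ROP.object carries the JSON-serialized xtrabackup_checkpoints of a
        # previous job, e.g. {"backup_type": "incremental", "from_lsn": "...",
        # "to_lsn": "1626007"}; note that all values arrive as strings.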
        # Improve: sanity / consistency check of the restore object
        self.row_rop_raw = ROP.object
        self.rop_data[ROP.jobid] = json.loads(str(self.row_rop_raw))
        if (
            "to_lsn" in self.rop_data[ROP.jobid]
            and int(self.rop_data[ROP.jobid]["to_lsn"]) > self.max_to_lsn
        ):
            self.max_to_lsn = int(self.rop_data[ROP.jobid]["to_lsn"])
            JobMessage(
                context,
                bJobMessageType["M_INFO"],
                "Got to_lsn %d from restore object of job %d\n"
                % (self.max_to_lsn, ROP.jobid),
            )
        return bRCs["bRC_OK"]


# vim: ts=4 tabstop=4 expandtab shiftwidth=4 softtabstop=4