1"""
2Return data to a mysql server
3
4:maintainer:    Dave Boucha <dave@saltstack.com>, Seth House <shouse@saltstack.com>
5:maturity:      mature
6:depends:       python-mysqldb
7:platform:      all
8
9To enable this returner, the minion will need the python client for mysql
10installed and the following values configured in the minion or master
11config. These are the defaults:
12
13.. code-block:: yaml
14
15    mysql.host: 'salt'
16    mysql.user: 'salt'
17    mysql.pass: 'salt'
18    mysql.db: 'salt'
19    mysql.port: 3306
20
21SSL is optional. The defaults are set to None. If you do not want to use SSL,
22either exclude these options or set them to None.
23
24.. code-block:: yaml
25
26    mysql.ssl_ca: None
27    mysql.ssl_cert: None
28    mysql.ssl_key: None
29
30Alternative configuration values can be used by prefacing the configuration
31with `alternative.`. Any values not found in the alternative configuration will
32be pulled from the default location. As stated above, SSL configuration is
33optional. The following ssl options are simply for illustration purposes:
34
35.. code-block:: yaml
36
37    alternative.mysql.host: 'salt'
38    alternative.mysql.user: 'salt'
39    alternative.mysql.pass: 'salt'
40    alternative.mysql.db: 'salt'
41    alternative.mysql.port: 3306
42    alternative.mysql.ssl_ca: '/etc/pki/mysql/certs/localhost.pem'
43    alternative.mysql.ssl_cert: '/etc/pki/mysql/certs/localhost.crt'
44    alternative.mysql.ssl_key: '/etc/pki/mysql/certs/localhost.key'
45
46Should you wish the returner data to be cleaned out every so often, set
47`keep_jobs` to the number of hours for the jobs to live in the tables.
48Setting it to `0` will cause the data to stay in the tables. The default
49setting for `keep_jobs` is set to `24`.
50
51Should you wish to archive jobs in a different table for later processing,
52set `archive_jobs` to True.  Salt will create 3 archive tables
53
54- `jids_archive`
55- `salt_returns_archive`
56- `salt_events_archive`
57
58and move the contents of `jids`, `salt_returns`, and `salt_events` that are
59more than `keep_jobs` hours old to these tables.
60
61Use the following mysql database schema:
62
63.. code-block:: sql
64
65    CREATE DATABASE  `salt`
66      DEFAULT CHARACTER SET utf8
67      DEFAULT COLLATE utf8_general_ci;
68
69    USE `salt`;
70
71    --
72    -- Table structure for table `jids`
73    --
74
75    DROP TABLE IF EXISTS `jids`;
76    CREATE TABLE `jids` (
77      `jid` varchar(255) NOT NULL,
78      `load` mediumtext NOT NULL,
79      UNIQUE KEY `jid` (`jid`)
80    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
81
82    --
83    -- Table structure for table `salt_returns`
84    --
85
86    DROP TABLE IF EXISTS `salt_returns`;
87    CREATE TABLE `salt_returns` (
88      `fun` varchar(50) NOT NULL,
89      `jid` varchar(255) NOT NULL,
90      `return` mediumtext NOT NULL,
91      `id` varchar(255) NOT NULL,
92      `success` varchar(10) NOT NULL,
93      `full_ret` mediumtext NOT NULL,
94      `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
95      KEY `id` (`id`),
96      KEY `jid` (`jid`),
97      KEY `fun` (`fun`)
98    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
99
100    --
101    -- Table structure for table `salt_events`
102    --
103
104    DROP TABLE IF EXISTS `salt_events`;
105    CREATE TABLE `salt_events` (
106    `id` BIGINT NOT NULL AUTO_INCREMENT,
107    `tag` varchar(255) NOT NULL,
108    `data` mediumtext NOT NULL,
109    `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
110    `master_id` varchar(255) NOT NULL,
111    PRIMARY KEY (`id`),
112    KEY `tag` (`tag`)
113    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
114
Required python modules: MySQLdb (PyMySQL is used as a fallback when MySQLdb cannot be imported)
116
117To use the mysql returner, append '--return mysql' to the salt command.
118
119.. code-block:: bash
120
121    salt '*' test.ping --return mysql
122
123To use the alternative configuration, append '--return_config alternative' to the salt command.
124
125.. versionadded:: 2015.5.0
126
127.. code-block:: bash
128
129    salt '*' test.ping --return mysql --return_config alternative
130
To override individual configuration items, append --return_kwargs '{"key": "value"}' to the salt command.
132
133.. versionadded:: 2016.3.0
134
135.. code-block:: bash
136
137    salt '*' test.ping --return mysql --return_kwargs '{"db": "another-salt"}'
138
139"""
140
141import logging
142import sys
143from contextlib import contextmanager
144
145import salt.exceptions
146import salt.returners
147import salt.utils.jid
148import salt.utils.json
149
# Let's not allow PyLint to complain about string substitution
151# pylint: disable=W1321,E1321
152
153
154try:
155    # Trying to import MySQLdb
156    import MySQLdb
157    import MySQLdb.cursors
158    import MySQLdb.converters
159    from MySQLdb.connections import OperationalError
160except ImportError:
161    try:
162        # MySQLdb import failed, try to import PyMySQL
163        import pymysql
164
165        pymysql.install_as_MySQLdb()
166        import MySQLdb
167        import MySQLdb.cursors
168        import MySQLdb.converters
169        from MySQLdb.err import OperationalError
170    except ImportError:
171        MySQLdb = None
172
# Module-level logger, named after this module
log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = "mysql"
177
178
def __virtual__():
    """
    Report whether a python mysql client could be imported.

    Returns a ``(loadable, reason)`` tuple. ``reason`` is an explanatory
    message when no client is available and an empty string otherwise.
    """
    if MySQLdb is None:
        return False, "No python mysql client installed."
    return bool(MySQLdb), ""
184
185
def _get_options(ret=None):
    """
    Build the option dictionary used for the MySQL connection.

    Values are resolved through ``salt.returners.get_returner_options`` with
    the built-in defaults below as fallback. Afterwards any string value
    equal to ``"none"`` (case-insensitive) is replaced by ``None`` and the
    ``port`` value is converted with ``int``.
    """
    fallback = {
        "host": "salt",
        "user": "salt",
        "pass": "salt",
        "db": "salt",
        "port": 3306,
        "ssl_ca": None,
        "ssl_cert": None,
        "ssl_key": None,
    }

    # Every option is looked up under its own name.
    attrs = {name: name for name in fallback}

    _options = salt.returners.get_returner_options(
        __virtualname__,
        ret,
        attrs,
        __salt__=__salt__,
        __opts__=__opts__,
        defaults=fallback,
    )

    # post processing
    for name, value in _options.items():
        spelled_none = isinstance(value, str) and value.lower() == "none"
        if spelled_none:
            # Ensure 'None' is rendered as None
            _options[name] = None
        if name == "port":
            # Ensure port is an int
            _options[name] = int(value)

    return _options
230
231
@contextmanager
def _get_serv(ret=None, commit=False):
    """
    Yield a mysql cursor, reusing the connection cached in ``__context__``
    when it still answers a ping.

    :param ret: passed to ``_get_options`` to resolve the connection settings
    :param commit: when ``True`` a ``COMMIT`` is executed after the ``with``
        body completes without error; otherwise a ``ROLLBACK`` is executed
    :raises salt.exceptions.SaltMasterError: if opening a new connection
        fails with an ``OperationalError``

    A ``MySQLdb.DatabaseError`` raised inside the ``with`` body is written to
    stderr, followed by a ``ROLLBACK``, and then re-raised. The cursor is
    closed when the context exits.
    """
    _options = _get_options(ret)

    connect = True
    if __context__ and "mysql_returner_conn" in __context__:
        try:
            log.debug("Trying to reuse MySQL connection pool")
            conn = __context__["mysql_returner_conn"]
            # An OperationalError here means the cached connection is not
            # usable; fall through and open a new one.
            conn.ping()
            connect = False
        except OperationalError as exc:
            log.debug("OperationalError on ping: %s", exc)

    if connect:
        log.debug("Generating new MySQL connection pool")
        try:
            # An empty ssl_options dictionary passed to MySQLdb.connect will
            # effectively connect w/o SSL.
            ssl_options = {}
            if _options.get("ssl_ca"):
                ssl_options["ca"] = _options.get("ssl_ca")
            if _options.get("ssl_cert"):
                ssl_options["cert"] = _options.get("ssl_cert")
            if _options.get("ssl_key"):
                ssl_options["key"] = _options.get("ssl_key")
            conn = MySQLdb.connect(
                host=_options.get("host"),
                user=_options.get("user"),
                passwd=_options.get("pass"),
                db=_options.get("db"),
                port=_options.get("port"),
                ssl=ssl_options,
            )

            # Caching is best effort: a __context__ that does not support
            # item assignment is silently skipped.
            try:
                __context__["mysql_returner_conn"] = conn
            except TypeError:
                pass
        except OperationalError as exc:
            # Chain the driver error so the root cause stays in the traceback.
            raise salt.exceptions.SaltMasterError(
                "MySQL returner could not connect to database: {exc}".format(exc=exc)
            ) from exc

    cursor = conn.cursor()

    try:
        yield cursor
    except MySQLdb.DatabaseError as err:
        error = err.args
        sys.stderr.write(str(error))
        cursor.execute("ROLLBACK")
        raise
    else:
        if commit:
            cursor.execute("COMMIT")
        else:
            cursor.execute("ROLLBACK")
    finally:
        # The connection is cached and reused, so release the per-call cursor
        # explicitly. A failure while closing must not mask an error that is
        # already propagating, hence it is only logged.
        try:
            cursor.close()
        except MySQLdb.Error as exc:
            log.debug("Error closing MySQL cursor: %s", exc)
293
294
def returner(ret):
    """
    Insert a job return into the `salt_returns` table.

    When the jid is the placeholder ``"req"`` a jid is obtained from
    ``prep_jid`` and the load is saved first. A ``SaltMasterError`` raised
    while storing the return is logged at critical level and not propagated.
    """
    insert_stmt = """INSERT INTO `salt_returns`
                     (`fun`, `jid`, `return`, `id`, `success`, `full_ret`)
                     VALUES (%s, %s, %s, %s, %s, %s)"""

    # if a minion is returning a standalone job, get a jobid
    standalone = ret["jid"] == "req"
    if standalone:
        ret["jid"] = prep_jid(nocache=ret.get("nocache", False))
        save_load(ret["jid"], ret)

    try:
        with _get_serv(ret, commit=True) as cur:
            row = (
                ret["fun"],
                ret["jid"],
                salt.utils.json.dumps(ret["return"]),
                ret["id"],
                ret.get("success", False),
                salt.utils.json.dumps(ret),
            )
            cur.execute(insert_stmt, row)
    except salt.exceptions.SaltMasterError as exc:
        log.critical(exc)
        log.critical(
            "Could not store return with MySQL returner. MySQL server unavailable."
        )
326
327
def event_return(events):
    """
    Insert each event into the `salt_events` table.

    Requires that configuration be enabled via 'event_return'
    option in master config.
    """
    # The statement is the same for every event, so build it once.
    insert_stmt = """INSERT INTO `salt_events` (`tag`, `data`, `master_id`)
                     VALUES (%s, %s, %s)"""

    with _get_serv(events, commit=True) as cur:
        for event in events:
            row = (
                event.get("tag", ""),
                salt.utils.json.dumps(event.get("data", "")),
                __opts__["id"],
            )
            cur.execute(insert_stmt, row)
342
343
def save_load(jid, load, minions=None):
    """
    Store the JSON-serialized load for the given jid in the `jids` table.

    A ``MySQLdb.IntegrityError`` raised by the insert is ignored.
    """
    insert_stmt = """INSERT INTO `jids` (`jid`, `load`) VALUES (%s, %s)"""

    with _get_serv(commit=True) as cur:
        try:
            payload = salt.utils.json.dumps(load)
            cur.execute(insert_stmt, (jid, payload))
        except MySQLdb.IntegrityError:
            # https://github.com/saltstack/salt/issues/22171
            # Without this try/except we get tons of duplicate entry errors
            # which result in job returns not being stored properly
            pass
359
360
def save_minions(jid, minions, syndic_id=None):  # pylint: disable=unused-argument
    """
    Intentionally does nothing; present only for API consistency.
    """
    return None
365
366
def get_load(jid):
    """
    Fetch the load stored for a jid.

    Returns the decoded JSON load, or an empty dict when no row matches.
    """
    query = """SELECT `load` FROM `jids` WHERE `jid` = %s;"""

    with _get_serv(ret=None, commit=True) as cur:
        cur.execute(query, (jid,))
        row = cur.fetchone()
        return salt.utils.json.loads(row[0]) if row else {}
379
380
def get_jid(jid):
    """
    Fetch the returns recorded for a job id.

    Returns a dict mapping each `id` column value to its decoded `full_ret`.
    """
    query = """SELECT id, full_ret FROM `salt_returns`
                WHERE `jid` = %s"""

    with _get_serv(ret=None, commit=True) as cur:
        cur.execute(query, (jid,))
        rows = cur.fetchall()
        if not rows:
            return {}
        return {minion: salt.utils.json.loads(full_ret) for minion, full_ret in rows}
397
398
def get_fun(fun):
    """
    Return a dict of the last function called for all minions

    Keys are the `id` column values of the selected rows; values are the
    decoded `full_ret` of each row.
    """
    query = """SELECT s.id,s.jid, s.full_ret
                FROM `salt_returns` s
                JOIN ( SELECT MAX(`jid`) as jid
                    from `salt_returns` GROUP BY fun, id) max
                ON s.jid = max.jid
                WHERE s.fun = %s
                """

    with _get_serv(ret=None, commit=True) as cur:
        cur.execute(query, (fun,))
        rows = cur.fetchall()
        if not rows:
            return {}
        # The jid column is selected but not needed in the result.
        return {
            minion: salt.utils.json.loads(full_ret) for minion, _jid, full_ret in rows
        }
421
422
def get_jids():
    """
    Return every job id in the `jids` table.

    The result maps each jid to ``salt.utils.jid.format_jid_instance`` applied
    to the jid and its decoded load.
    """
    query = """SELECT DISTINCT `jid`, `load`
                FROM `jids`"""

    with _get_serv(ret=None, commit=True) as cur:
        cur.execute(query)
        rows = cur.fetchall()
        return {
            row[0]: salt.utils.jid.format_jid_instance(
                row[0], salt.utils.json.loads(row[1])
            )
            for row in rows
        }
440
441
def get_jids_filter(count, filter_find_job=True):
    """
    Return a list of the most recent job ids, ordered by ascending jid.

    :param int count: show not more than the count of most recent jobs
    :param bool filter_find_job: filter out 'saltutil.find_job' jobs
    :return: list of ``salt.utils.jid.format_jid_instance_ext`` results
    :raises TypeError, ValueError: if ``count`` cannot be converted to ``int``
    """
    # ``count`` is spliced into the statement text (the WHERE fragment holds
    # literal '%' characters, so the statement is executed without driver-side
    # parameters). Coerce it to an integer first so that nothing but a number
    # can ever reach the SQL.
    limit = int(count)

    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT * FROM (
                     SELECT DISTINCT `jid` ,`load` FROM `jids`
                     {0}
                     ORDER BY `jid` DESC limit {1}
                     ) `tmp`
                 ORDER BY `jid`;"""
        where = """WHERE `load` NOT LIKE '%"fun": "saltutil.find_job"%' """

        cur.execute(sql.format(where if filter_find_job else "", limit))
        data = cur.fetchall()
        return [
            salt.utils.jid.format_jid_instance_ext(
                jid[0], salt.utils.json.loads(jid[1])
            )
            for jid in data
        ]
468
469
def get_minions():
    """
    Return the distinct `id` values found in the `salt_returns` table as a list.
    """
    query = """SELECT DISTINCT id
                FROM `salt_returns`"""

    with _get_serv(ret=None, commit=True) as cur:
        cur.execute(query)
        rows = cur.fetchall()
        return [row[0] for row in rows]
485
486
def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
    """
    Return the jid to use for a job.

    A caller-supplied ``passed_jid`` is returned unchanged; otherwise a new
    jid is generated with ``salt.utils.jid.gen_jid``.
    """
    if passed_jid is None:
        return salt.utils.jid.gen_jid(__opts__)
    return passed_jid
492
493
def _purge_jobs(timestamp):
    """
    Purge records from the returner tables.

    Rows are deleted from `jids`, `salt_returns` and `salt_events`, in that
    order, with a ``COMMIT`` after each table.

    :param timestamp: Purge rows older than this timestamp
    :return: ``True`` once all three deletes have been committed
    :raises salt.exceptions.SaltRunnerError: if a delete fails with a
        ``MySQLdb.Error``
    """
    # `jids` goes first: its delete selects the affected jids from
    # `salt_returns`, so those rows must still be present at that point.
    purge_statements = (
        (
            "jids",
            "delete from `jids` where jid in (select distinct jid from salt_returns"
            " where alter_time < %s)",
        ),
        ("salt_returns", "delete from `salt_returns` where alter_time < %s"),
        ("salt_events", "delete from `salt_events` where alter_time < %s"),
    )

    with _get_serv() as cur:
        for table, sql in purge_statements:
            try:
                cur.execute(sql, (timestamp,))
                cur.execute("COMMIT")
            except MySQLdb.Error as e:
                log.error(
                    "mysql returner archiver was unable to delete contents of table"
                    " '%s'",
                    table,
                )
                log.error(str(e))
                # Chain the driver error so the root cause stays visible.
                raise salt.exceptions.SaltRunnerError(str(e)) from e

    return True
540
541
def _archive_jobs(timestamp):
    """
    Copy old rows into the ``*_archive`` tables, then purge them.

    Each archive table is created (if missing) like its source table, the
    matching rows are copied table by table with a ``COMMIT`` after each
    statement, and finally ``_purge_jobs`` is called.

    :param timestamp: Archive rows older than this timestamp
    :return: the result of ``_purge_jobs(timestamp)``
    """
    archive_of = {}

    with _get_serv() as cur:
        # Step 1: make sure every archive table exists.
        for source in ("jids", "salt_returns", "salt_events"):
            try:
                archive = source + "_archive"
                create_stmt = "create table if not exists {} like {}".format(
                    archive, source
                )
                cur.execute(create_stmt)
                cur.execute("COMMIT")
                archive_of[source] = archive
            except MySQLdb.Error as e:
                log.error(
                    "mysql returner archiver was unable to create the archive tables."
                )
                log.error(str(e))
                raise salt.exceptions.SaltRunnerError(str(e))

        # Step 2: copy `jids`, selected through the jids found in salt_returns.
        try:
            copy_jids = (
                "insert into `{}` select * from `{}` where jid in (select distinct jid"
                " from salt_returns where alter_time < %s)".format(
                    archive_of["jids"], "jids"
                )
            )
            cur.execute(copy_jids, (timestamp,))
            cur.execute("COMMIT")
        except MySQLdb.Error as e:
            log.error(
                "mysql returner archiver was unable to copy contents of table 'jids'"
            )
            log.error(str(e))
            raise salt.exceptions.SaltRunnerError(str(e))
        except Exception as e:  # pylint: disable=broad-except
            log.error(e)
            raise

        # Step 3: copy the tables that are filtered directly on alter_time.
        dated_copies = (
            (
                "salt_returns",
                "mysql returner archiver was unable to copy contents of table"
                " 'salt_returns'",
            ),
            (
                "salt_events",
                "mysql returner archiver was unable to copy contents of table"
                " 'salt_events'",
            ),
        )
        for source, failure_msg in dated_copies:
            try:
                copy_rows = (
                    "insert into `{}` select * from `{}` where alter_time < %s".format(
                        archive_of[source], source
                    )
                )
                cur.execute(copy_rows, (timestamp,))
                cur.execute("COMMIT")
            except MySQLdb.Error as e:
                log.error(failure_msg)
                log.error(str(e))
                raise salt.exceptions.SaltRunnerError(str(e))

    return _purge_jobs(timestamp)
616
617
def clean_old_jobs():
    """
    Called in the master's event loop every loop_interval.  Archives and/or
    deletes the events and job details from the database.

    Nothing happens unless the `keep_jobs` option is set to a value whose
    integer conversion is greater than zero. The cut-off timestamp is computed
    by the database as "now minus `keep_jobs` hours"; rows older than that are
    archived when `archive_jobs` is set, otherwise purged.

    :return: None
    :raises salt.exceptions.SaltRunnerError: if a ``MySQLdb.Error`` is raised
        while computing the timestamp
    """
    keep_jobs = __opts__.get("keep_jobs", False)
    if not keep_jobs:
        return
    keep_hours = int(keep_jobs)
    if keep_hours <= 0:
        return

    try:
        with _get_serv() as cur:
            # Interpolate the validated integer, not the raw config value, so
            # only a number can end up in the statement text.
            sql = "select date_sub(now(), interval {} hour) as stamp;".format(
                keep_hours
            )
            cur.execute(sql)
            rows = cur.fetchall()
            stamp = rows[0][0]

        if __opts__.get("archive_jobs", False):
            _archive_jobs(stamp)
        else:
            _purge_jobs(stamp)
    except MySQLdb.Error as e:
        log.error(
            "Mysql returner was unable to get timestamp for purge/archive of jobs"
        )
        log.error(str(e))
        # Chain the driver error so the root cause stays visible.
        raise salt.exceptions.SaltRunnerError(str(e)) from e
644