"""
Return data to a mysql server

:maintainer:    Dave Boucha <dave@saltstack.com>, Seth House <shouse@saltstack.com>
:maturity:      mature
:depends:       python-mysqldb
:platform:      all

To enable this returner, the minion will need the python client for mysql
installed and the following values configured in the minion or master
config. These are the defaults:

.. code-block:: yaml

    mysql.host: 'salt'
    mysql.user: 'salt'
    mysql.pass: 'salt'
    mysql.db: 'salt'
    mysql.port: 3306

SSL is optional. The defaults are set to None. If you do not want to use SSL,
either exclude these options or set them to None.

.. code-block:: yaml

    mysql.ssl_ca: None
    mysql.ssl_cert: None
    mysql.ssl_key: None

Alternative configuration values can be used by prefacing the configuration
with `alternative.`. Any values not found in the alternative configuration will
be pulled from the default location. As stated above, SSL configuration is
optional. The following ssl options are simply for illustration purposes:

.. code-block:: yaml

    alternative.mysql.host: 'salt'
    alternative.mysql.user: 'salt'
    alternative.mysql.pass: 'salt'
    alternative.mysql.db: 'salt'
    alternative.mysql.port: 3306
    alternative.mysql.ssl_ca: '/etc/pki/mysql/certs/localhost.pem'
    alternative.mysql.ssl_cert: '/etc/pki/mysql/certs/localhost.crt'
    alternative.mysql.ssl_key: '/etc/pki/mysql/certs/localhost.key'

Should you wish the returner data to be cleaned out every so often, set
`keep_jobs` to the number of hours for the jobs to live in the tables.
Setting it to `0` will cause the data to stay in the tables. The default
setting for `keep_jobs` is set to `24`.

Should you wish to archive jobs in a different table for later processing,
set `archive_jobs` to True.  Salt will create 3 archive tables

- `jids_archive`
- `salt_returns_archive`
- `salt_events_archive`

and move the contents of `jids`, `salt_returns`, and `salt_events` that are
more than `keep_jobs` hours old to these tables.

Use the following mysql database schema:

.. code-block:: sql

    CREATE DATABASE  `salt`
      DEFAULT CHARACTER SET utf8
      DEFAULT COLLATE utf8_general_ci;

    USE `salt`;

    --
    -- Table structure for table `jids`
    --

    DROP TABLE IF EXISTS `jids`;
    CREATE TABLE `jids` (
      `jid` varchar(255) NOT NULL,
      `load` mediumtext NOT NULL,
      UNIQUE KEY `jid` (`jid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    --
    -- Table structure for table `salt_returns`
    --

    DROP TABLE IF EXISTS `salt_returns`;
    CREATE TABLE `salt_returns` (
      `fun` varchar(50) NOT NULL,
      `jid` varchar(255) NOT NULL,
      `return` mediumtext NOT NULL,
      `id` varchar(255) NOT NULL,
      `success` varchar(10) NOT NULL,
      `full_ret` mediumtext NOT NULL,
      `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      KEY `id` (`id`),
      KEY `jid` (`jid`),
      KEY `fun` (`fun`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    --
    -- Table structure for table `salt_events`
    --

    DROP TABLE IF EXISTS `salt_events`;
    CREATE TABLE `salt_events` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `tag` varchar(255) NOT NULL,
    `data` mediumtext NOT NULL,
    `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `master_id` varchar(255) NOT NULL,
    PRIMARY KEY (`id`),
    KEY `tag` (`tag`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Required python modules: MySQLdb

To use the mysql returner, append '--return mysql' to the salt command.

.. code-block:: bash

    salt '*' test.ping --return mysql

To use the alternative configuration, append '--return_config alternative' to the salt command.

.. versionadded:: 2015.5.0

.. code-block:: bash

    salt '*' test.ping --return mysql --return_config alternative

To override individual configuration items, append --return_kwargs '{"key": "value"}' to the salt command.

.. versionadded:: 2016.3.0

.. code-block:: bash

    salt '*' test.ping --return mysql --return_kwargs '{"db": "another-salt"}'

"""

import logging
import sys
from contextlib import contextmanager

import salt.exceptions
import salt.returners
import salt.utils.jid
import salt.utils.json

# Let's not allow PyLint complain about string substitution
# pylint: disable=W1321,E1321


try:
    # Trying to import MySQLdb
    import MySQLdb
    import MySQLdb.cursors
    import MySQLdb.converters
    from MySQLdb.connections import OperationalError
except ImportError:
    try:
        # MySQLdb import failed, try to import PyMySQL
        import pymysql

        pymysql.install_as_MySQLdb()
        import MySQLdb
        import MySQLdb.cursors
        import MySQLdb.converters
        from MySQLdb.err import OperationalError
    except ImportError:
        MySQLdb = None

log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = "mysql"


def __virtual__():
    """
    Confirm that a python mysql client is installed.

    :return: a ``(loaded, reason)`` tuple; ``reason`` is an empty string when
        a client library (MySQLdb or PyMySQL) was imported successfully.
    """
    return bool(MySQLdb), "No python mysql client installed." if MySQLdb is None else ""


def _get_options(ret=None):
    """
    Returns options used for the MySQL connection.

    :param ret: optional return payload passed through to
        ``salt.returners.get_returner_options``
    :return: dict with the keys ``host``, ``user``, ``pass``, ``db``, ``port``,
        ``ssl_ca``, ``ssl_cert`` and ``ssl_key``
    :raises ValueError: if ``port`` is set to something that is not an integer
    """
    defaults = {
        "host": "salt",
        "user": "salt",
        "pass": "salt",
        "db": "salt",
        "port": 3306,
        "ssl_ca": None,
        "ssl_cert": None,
        "ssl_key": None,
    }

    attrs = {
        "host": "host",
        "user": "user",
        "pass": "pass",
        "db": "db",
        "port": "port",
        "ssl_ca": "ssl_ca",
        "ssl_cert": "ssl_cert",
        "ssl_key": "ssl_key",
    }

    _options = salt.returners.get_returner_options(
        __virtualname__,
        ret,
        attrs,
        __salt__=__salt__,
        __opts__=__opts__,
        defaults=defaults,
    )
    # post processing; iterate over a snapshot because values are rewritten
    for key, value in list(_options.items()):
        if isinstance(value, str) and value.lower() == "none":
            # Ensure 'None' is rendered as None
            value = None
            _options[key] = None
        if key == "port":
            # Ensure port is an int. A port of None (or 'None') used to reach
            # int() and blow up; fall back to the default port instead.
            _options[key] = defaults["port"] if value is None else int(value)

    return _options


@contextmanager
def _get_serv(ret=None, commit=False):
    """
    Return a mysql cursor

    The connection is cached in ``__context__["mysql_returner_conn"]`` and
    reused while ``ping()`` succeeds; otherwise a new one is opened.

    :param ret: optional return payload used to resolve connection options
    :param bool commit: issue ``COMMIT`` when the ``with`` body finishes
        without error; otherwise ``ROLLBACK`` is issued
    :raises salt.exceptions.SaltMasterError: if a new connection cannot be
        established
    """
    _options = _get_options(ret)

    connect = True
    if __context__ and "mysql_returner_conn" in __context__:
        try:
            log.debug("Trying to reuse MySQL connection pool")
            conn = __context__["mysql_returner_conn"]
            conn.ping()
            connect = False
        except OperationalError as exc:
            log.debug("OperationalError on ping: %s", exc)

    if connect:
        log.debug("Generating new MySQL connection pool")
        try:
            # An empty ssl_options dictionary passed to MySQLdb.connect will
            # effectively connect w/o SSL.
            ssl_options = {}
            if _options.get("ssl_ca"):
                ssl_options["ca"] = _options.get("ssl_ca")
            if _options.get("ssl_cert"):
                ssl_options["cert"] = _options.get("ssl_cert")
            if _options.get("ssl_key"):
                ssl_options["key"] = _options.get("ssl_key")
            conn = MySQLdb.connect(
                host=_options.get("host"),
                user=_options.get("user"),
                passwd=_options.get("pass"),
                db=_options.get("db"),
                port=_options.get("port"),
                ssl=ssl_options,
            )

            try:
                __context__["mysql_returner_conn"] = conn
            except TypeError:
                # __context__ does not support item assignment here; carry on
                # with an uncached connection.
                pass
        except OperationalError as exc:
            raise salt.exceptions.SaltMasterError(
                "MySQL returner could not connect to database: {exc}".format(exc=exc)
            ) from exc

    cursor = conn.cursor()

    try:
        yield cursor
    except MySQLdb.DatabaseError as err:
        error = err.args
        sys.stderr.write(str(error))
        cursor.execute("ROLLBACK")
        raise
    else:
        if commit:
            cursor.execute("COMMIT")
        else:
            cursor.execute("ROLLBACK")
    finally:
        # The connection is kept for reuse, but the cursor is per-call and
        # must not be left open. A failure to close is only logged so it
        # cannot mask an exception raised by the with-body.
        try:
            cursor.close()
        except MySQLdb.Error as exc:
            log.debug("Error closing MySQL cursor: %s", exc)


def returner(ret):
    """
    Return data to a mysql server

    :param dict ret: job return; the keys ``jid``, ``fun``, ``return`` and
        ``id`` are read, ``success`` defaults to ``False``

    A connection failure (``SaltMasterError``) is logged as critical and
    swallowed, so the return is dropped rather than raising.
    """
    # if a minion is returning a standalone job, get a jobid
    if ret["jid"] == "req":
        ret["jid"] = prep_jid(nocache=ret.get("nocache", False))
        save_load(ret["jid"], ret)

    try:
        with _get_serv(ret, commit=True) as cur:
            sql = """INSERT INTO `salt_returns`
                     (`fun`, `jid`, `return`, `id`, `success`, `full_ret`)
                     VALUES (%s, %s, %s, %s, %s, %s)"""

            cur.execute(
                sql,
                (
                    ret["fun"],
                    ret["jid"],
                    salt.utils.json.dumps(ret["return"]),
                    ret["id"],
                    ret.get("success", False),
                    salt.utils.json.dumps(ret),
                ),
            )
    except salt.exceptions.SaltMasterError as exc:
        log.critical(exc)
        log.critical(
            "Could not store return with MySQL returner. MySQL server unavailable."
        )


def event_return(events):
    """
    Return event to mysql server

    Requires that configuration be enabled via 'event_return'
    option in master config.

    :param events: iterable of event dicts; ``tag`` and ``data`` are read from
        each, both defaulting to an empty string
    """
    with _get_serv(events, commit=True) as cur:
        for event in events:
            tag = event.get("tag", "")
            data = event.get("data", "")
            sql = """INSERT INTO `salt_events` (`tag`, `data`, `master_id`)
                     VALUES (%s, %s, %s)"""
            cur.execute(sql, (tag, salt.utils.json.dumps(data), __opts__["id"]))


def save_load(jid, load, minions=None):
    """
    Save the load to the specified jid id

    :param jid: job id used as the row key in ``jids``
    :param load: JSON-serialisable job load
    :param minions: unused here
    """
    with _get_serv(commit=True) as cur:

        sql = """INSERT INTO `jids` (`jid`, `load`) VALUES (%s, %s)"""

        try:
            cur.execute(sql, (jid, salt.utils.json.dumps(load)))
        except MySQLdb.IntegrityError:
            # https://github.com/saltstack/salt/issues/22171
            # Without this try/except we get tons of duplicate entry errors
            # which result in job returns not being stored properly
            pass


def save_minions(jid, minions, syndic_id=None):  # pylint: disable=unused-argument
    """
    Included for API consistency
    """


def get_load(jid):
    """
    Return the load data that marks a specified jid

    :return: the decoded load, or an empty dict when the jid is not stored
    """
    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT `load` FROM `jids` WHERE `jid` = %s;"""
        cur.execute(sql, (jid,))
        data = cur.fetchone()
        if data:
            return salt.utils.json.loads(data[0])
        return {}


def get_jid(jid):
    """
    Return the information returned when the specified job id was executed

    :return: dict mapping minion id to that minion's decoded ``full_ret``
    """
    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT id, full_ret FROM `salt_returns`
                WHERE `jid` = %s"""

        cur.execute(sql, (jid,))
        data = cur.fetchall()
        ret = {}
        if data:
            for minion, full_ret in data:
                ret[minion] = salt.utils.json.loads(full_ret)
        return ret


def get_fun(fun):
    """
    Return a dict of the last function called for all minions

    :return: dict mapping minion id to the decoded ``full_ret`` of the rows
        selected by the query below
    """
    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT s.id,s.jid, s.full_ret
                FROM `salt_returns` s
                JOIN ( SELECT MAX(`jid`) as jid
                    from `salt_returns` GROUP BY fun, id) max
                ON s.jid = max.jid
                WHERE s.fun = %s
                """

        cur.execute(sql, (fun,))
        data = cur.fetchall()

        ret = {}
        if data:
            for minion, _, full_ret in data:
                ret[minion] = salt.utils.json.loads(full_ret)
        return ret


def get_jids():
    """
    Return a list of all job ids

    :return: dict mapping each jid to the value produced by
        ``salt.utils.jid.format_jid_instance`` for its decoded load
    """
    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT DISTINCT `jid`, `load`
                FROM `jids`"""

        cur.execute(sql)
        return {
            jid: salt.utils.jid.format_jid_instance(jid, salt.utils.json.loads(load))
            for jid, load in cur.fetchall()
        }


def get_jids_filter(count, filter_find_job=True):
    """
    Return a list of all job ids

    :param int count: show not more than the count of most recent jobs
    :param bool filter_find_job: filter out 'saltutil.find_job' jobs
    :raises ValueError: if ``count`` cannot be converted to an integer
    """
    # ``count`` is formatted into the statement (the LIKE pattern below
    # contains literal '%' and is executed without bound arguments), so force
    # it to an integer first; anything else must never reach the SQL text.
    count = int(count)

    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT * FROM (
                     SELECT DISTINCT `jid` ,`load` FROM `jids`
                     {0}
                     ORDER BY `jid` DESC limit {1}
                     ) `tmp`
                 ORDER BY `jid`;"""
        where = """WHERE `load` NOT LIKE '%"fun": "saltutil.find_job"%' """

        cur.execute(sql.format(where if filter_find_job else "", count))
        return [
            salt.utils.jid.format_jid_instance_ext(jid, salt.utils.json.loads(load))
            for jid, load in cur.fetchall()
        ]


def get_minions():
    """
    Return a list of minions

    :return: list of the distinct ``id`` values found in ``salt_returns``
    """
    with _get_serv(ret=None, commit=True) as cur:

        sql = """SELECT DISTINCT id
                FROM `salt_returns`"""

        cur.execute(sql)
        return [row[0] for row in cur.fetchall()]


def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
    """
    Do any work necessary to prepare a JID, including sending a custom id

    :return: ``passed_jid`` when given, otherwise a newly generated jid
    """
    return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)


def _execute_and_commit(cur, sql, args, failure_msg):
    """
    Run one statement followed by ``COMMIT`` on ``cur``.

    :param cur: open cursor
    :param str sql: statement to execute
    :param args: bound parameters, or ``None`` for none
    :param str failure_msg: message logged before the database error text
    :raises salt.exceptions.SaltRunnerError: wrapping any ``MySQLdb.Error``
    """
    try:
        cur.execute(sql, args)
        cur.execute("COMMIT")
    except MySQLdb.Error as exc:
        log.error(failure_msg)
        log.error(str(exc))
        raise salt.exceptions.SaltRunnerError(str(exc)) from exc


def _purge_jobs(timestamp):
    """
    Purge records from the returner tables.

    Rows are removed from ``jids`` first, because that delete is keyed on the
    ``salt_returns`` rows that the second statement removes.

    :param timestamp: Purge rows older than this timestamp
    :return: True
    :raises salt.exceptions.SaltRunnerError: if any delete fails
    """
    with _get_serv() as cur:
        _execute_and_commit(
            cur,
            "delete from `jids` where jid in (select distinct jid from salt_returns"
            " where alter_time < %s)",
            (timestamp,),
            "mysql returner archiver was unable to delete contents of table 'jids'",
        )
        _execute_and_commit(
            cur,
            "delete from `salt_returns` where alter_time < %s",
            (timestamp,),
            "mysql returner archiver was unable to delete contents of table"
            " 'salt_returns'",
        )
        _execute_and_commit(
            cur,
            "delete from `salt_events` where alter_time < %s",
            (timestamp,),
            "mysql returner archiver was unable to delete contents of table"
            " 'salt_events'",
        )

    return True


def _archive_jobs(timestamp):
    """
    Copy rows to a set of backup tables, then purge rows.

    :param timestamp: Archive rows older than this timestamp
    :return: the result of ``_purge_jobs(timestamp)``
    :raises salt.exceptions.SaltRunnerError: if creating an archive table or
        copying rows fails
    """
    source_tables = ["jids", "salt_returns", "salt_events"]

    with _get_serv() as cur:
        target_tables = {}
        for table_name in source_tables:
            tmp_table_name = table_name + "_archive"
            _execute_and_commit(
                cur,
                "create table if not exists {} like {}".format(
                    tmp_table_name, table_name
                ),
                None,
                "mysql returner archiver was unable to create the archive tables.",
            )
            target_tables[table_name] = tmp_table_name

        # Kept inline rather than routed through _execute_and_commit: this
        # step additionally logs any non-database exception before re-raising.
        try:
            sql = (
                "insert into `{}` select * from `{}` where jid in (select distinct jid"
                " from salt_returns where alter_time < %s)".format(
                    target_tables["jids"], "jids"
                )
            )
            cur.execute(sql, (timestamp,))
            cur.execute("COMMIT")
        except MySQLdb.Error as e:
            log.error(
                "mysql returner archiver was unable to copy contents of table 'jids'"
            )
            log.error(str(e))
            raise salt.exceptions.SaltRunnerError(str(e)) from e
        except Exception as e:  # pylint: disable=broad-except
            log.error(e)
            raise

        _execute_and_commit(
            cur,
            "insert into `{}` select * from `{}` where alter_time < %s".format(
                target_tables["salt_returns"], "salt_returns"
            ),
            (timestamp,),
            "mysql returner archiver was unable to copy contents of table"
            " 'salt_returns'",
        )
        _execute_and_commit(
            cur,
            "insert into `{}` select * from `{}` where alter_time < %s".format(
                target_tables["salt_events"], "salt_events"
            ),
            (timestamp,),
            "mysql returner archiver was unable to copy contents of table"
            " 'salt_events'",
        )

    return _purge_jobs(timestamp)


def clean_old_jobs():
    """
    Called in the master's event loop every loop_interval. Archives and/or
    deletes the events and job details from the database.

    Does nothing unless ``keep_jobs`` is set to a positive number of hours.
    Rows older than that are archived when ``archive_jobs`` is enabled and
    purged otherwise.

    :return: None
    :raises salt.exceptions.SaltRunnerError: if the cut-off timestamp cannot
        be computed or the archive/purge fails
    """
    keep_jobs = __opts__.get("keep_jobs", False)
    if not keep_jobs or int(keep_jobs) <= 0:
        return

    try:
        with _get_serv() as cur:
            # Let the server compute the cut-off so it is in the same clock
            # and timezone as the stored alter_time values. The interval is
            # bound as a parameter instead of formatted into the statement.
            cur.execute(
                "select date_sub(now(), interval %s hour) as stamp;", (keep_jobs,)
            )
            rows = cur.fetchall()
            stamp = rows[0][0]

        if __opts__.get("archive_jobs", False):
            _archive_jobs(stamp)
        else:
            _purge_jobs(stamp)
    except MySQLdb.Error as e:
        log.error(
            "Mysql returner was unable to get timestamp for purge/archive of jobs"
        )
        log.error(str(e))
        raise salt.exceptions.SaltRunnerError(str(e)) from e