#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright: (c) 2019, Loic Blot (@nerzhul) <loic.blot@unix-experience.fr>
# Copyright: (c) 2019, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type


ANSIBLE_METADATA = {'metadata_version': '1.1',
                    'status': ['preview'],
                    'supported_by': 'community'}


DOCUMENTATION = r'''
---
module: postgresql_publication
short_description: Add, update, or remove PostgreSQL publication
description:
- Add, update, or remove PostgreSQL publication.
version_added: "2.9"
options:
  name:
    description:
    - Name of the publication to add, update, or remove.
    required: true
    type: str
  db:
    description:
    - Name of the database to connect to and where
      the publication state will be changed.
    aliases: [ login_db ]
    type: str
  tables:
    description:
    - List of tables to add to the publication.
    - If no value is set all tables are targeted.
    - If the publication already exists for specific tables and I(tables) is not passed,
      nothing will be changed. If you need to add all tables to the publication with the same name,
      drop existent and create new without passing I(tables).
    type: list
    elements: str
  state:
    description:
    - The publication state.
    default: present
    choices: [ absent, present ]
    type: str
  parameters:
    description:
    - Dictionary with optional publication parameters.
    - Available parameters depend on PostgreSQL version.
    type: dict
  owner:
    description:
    - Publication owner.
    - If I(owner) is not defined, the owner will be set as I(login_user) or I(session_role).
    type: str
  cascade:
    description:
    - Drop publication dependencies. Has effect with I(state=absent) only.
    type: bool
    default: false
notes:
- PostgreSQL version must be 10 or greater.
seealso:
- name: CREATE PUBLICATION reference
  description: Complete reference of the CREATE PUBLICATION command documentation.
  link: https://www.postgresql.org/docs/current/sql-createpublication.html
- name: ALTER PUBLICATION reference
  description: Complete reference of the ALTER PUBLICATION command documentation.
  link: https://www.postgresql.org/docs/current/sql-alterpublication.html
- name: DROP PUBLICATION reference
  description: Complete reference of the DROP PUBLICATION command documentation.
  link: https://www.postgresql.org/docs/current/sql-droppublication.html
author:
- Loic Blot (@nerzhul) <loic.blot@unix-experience.fr>
- Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
extends_documentation_fragment:
- postgres
'''

EXAMPLES = r'''
- name: Create a new publication with name "acme" targeting all tables in database "test".
  postgresql_publication:
    db: test
    name: acme

- name: Create publication "acme" publishing only prices and vehicles tables.
  postgresql_publication:
    name: acme
    tables:
    - prices
    - vehicles

- name: >
    Create publication "acme", set user alice as an owner, targeting all tables.
    Allowable DML operations are INSERT and UPDATE only
  postgresql_publication:
    name: acme
    owner: alice
    parameters:
      publish: 'insert,update'

- name: >
    Assuming publication "acme" exists and there are targeted
    tables "prices" and "vehicles", add table "stores" to the publication.
  postgresql_publication:
    name: acme
    tables:
    - prices
    - vehicles
    - stores

- name: Remove publication "acme" if exists in database "test".
  postgresql_publication:
    db: test
    name: acme
    state: absent
'''

RETURN = r'''
exists:
  description:
  - Flag indicates the publication exists or not at the end of runtime.
  returned: always
  type: bool
  sample: true
queries:
  description: List of executed queries.
  returned: always
  type: str
  sample: [ 'DROP PUBLICATION "acme" CASCADE' ]
owner:
  description: Owner of the publication at the end of runtime.
  returned: if publication exists
  type: str
  sample: "alice"
tables:
  description:
  - List of tables in the publication at the end of runtime.
  - If all tables are published, returns empty list.
  returned: if publication exists
  type: list
  sample: ["\"public\".\"prices\"", "\"public\".\"vehicles\""]
alltables:
  description:
  - Flag indicates that all tables are published.
  returned: if publication exists
  type: bool
  sample: false
parameters:
  description: Publication parameters at the end of runtime.
  returned: if publication exists
  type: dict
  sample: {'publish': {'insert': false, 'delete': false, 'update': true}}
'''


try:
    from psycopg2.extras import DictCursor
except ImportError:
    # psycopg2 is checked by connect_to_db()
    # from ansible.module_utils.postgres
    pass

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.database import pg_quote_identifier
from ansible.module_utils.postgres import (
    connect_to_db,
    exec_sql,
    get_conn_params,
    postgres_common_argument_spec,
)
from ansible.module_utils.six import iteritems

SUPPORTED_PG_VERSION = 10000


################################
# Module functions and classes #
################################

def _escape_sql_literal(value):
    """Escape a string so it can be embedded in a single-quoted SQL literal.

    Single quotes are doubled, which is the standard SQL way to represent
    a quote inside a string constant.

    Note: this relies on ``standard_conforming_strings = on`` (the PostgreSQL
    default), where a backslash has no special meaning inside a literal.

    Args:
        value (str): Raw value.

    Returns:
        str: Value that is safe to place between single quotes.
    """
    return value.replace("'", "''")


def transform_tables_representation(tbl_list):
    """Add 'public.' to names of tables where a schema identifier is absent
    and add quotes to each element.

    The passed list is modified in place and also returned.

    Args:
        tbl_list (list): List of table names.

    Returns:
        tbl_list (list): Changed list.
    """
    for i, table in enumerate(tbl_list):
        table = table.strip()
        if '.' not in table:
            # No schema was passed, fall back to the default one:
            table = 'public.%s' % table

        tbl_list[i] = pg_quote_identifier(table, 'table')

    return tbl_list


class PgPublication(object):
    """Class to work with PostgreSQL publication.

    Args:
        module (AnsibleModule): Object of AnsibleModule class.
        cursor (cursor): Cursor object of psycopg2 library to work with PostgreSQL.
        name (str): The name of the publication.

    Attributes:
        module (AnsibleModule): Object of AnsibleModule class.
        cursor (cursor): Cursor object of psycopg2 library to work with PostgreSQL.
        name (str): Name of the publication.
        executed_queries (list): List of executed queries.
        attrs (dict): Dict with publication attributes.
        exists (bool): Flag indicates the publication exists or not.
    """

    def __init__(self, module, cursor, name):
        self.module = module
        self.cursor = cursor
        self.name = name
        self.executed_queries = []
        self.attrs = {
            'alltables': False,
            'tables': [],
            'parameters': {},
            'owner': '',
        }
        self.exists = self.check_pub()

    def get_info(self):
        """Refresh the publication information.

        Returns:
            ``self.attrs``.
        """
        self.exists = self.check_pub()
        return self.attrs

    def check_pub(self):
        """Check the publication and refresh ``self.attrs`` publication attribute.

        Returns:
            True if the publication with ``self.name`` exists, False otherwise.
        """
        pub_info = self.__get_general_pub_info()

        if not pub_info:
            # Publication does not exist:
            return False

        self.attrs['owner'] = pub_info.get('pubowner')

        # Publication DML operations:
        publish = {
            'insert': pub_info.get('pubinsert', False),
            'update': pub_info.get('pubupdate', False),
            'delete': pub_info.get('pubdelete', False),
        }
        # 'truncate' is reported only when it is enabled
        # (the column is available from PostgreSQL 11):
        if pub_info.get('pubtruncate'):
            publish['truncate'] = pub_info.get('pubtruncate')

        self.attrs['parameters']['publish'] = publish

        # Both 'alltables' and 'tables' are set explicitly in each branch,
        # so a refresh never leaves values from a previous state behind.
        if not pub_info.get('puballtables'):
            self.attrs['alltables'] = False

            # Join rows [['schema', 'table'], ...] to ['"schema"."table"', ...]
            # for better representation:
            table_info = self.__get_tables_pub_info()
            for i, schema_and_table in enumerate(table_info):
                table_info[i] = pg_quote_identifier('.'.join(schema_and_table), 'table')

            self.attrs['tables'] = table_info
        else:
            self.attrs['alltables'] = True
            self.attrs['tables'] = []

        # Publication exists:
        return True

    def create(self, tables, params, owner, check_mode=True):
        """Create the publication.

        Args:
            tables (list): List with names of the tables that need to be added to the publication.
            params (dict): Dict contains optional publication parameters and their values.
            owner (str): Name of the publication owner.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            changed (bool): True if publication has been created, otherwise False.
        """
        query_fragments = ["CREATE PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]

        if tables:
            query_fragments.append("FOR TABLE %s" % ', '.join(tables))
        else:
            query_fragments.append("FOR ALL TABLES")

        if params:
            # Make list ["param = 'value'", ...] from params dict:
            params_list = ["%s = '%s'" % (key, val) for (key, val) in iteritems(params)]

            # Add the list to query_fragments:
            query_fragments.append("WITH (%s)" % ', '.join(params_list))

        changed = self.__exec_sql(' '.join(query_fragments), check_mode=check_mode)

        if owner:
            # If check_mode, just add possible SQL to
            # executed_queries and return:
            self.__pub_set_owner(owner, check_mode=check_mode)

        return changed

    def update(self, tables, params, owner, check_mode=True):
        """Update the publication.

        Args:
            tables (list): List with names of the tables that need to be presented in the publication.
            params (dict): Dict contains optional publication parameters and their values.
            owner (str): Name of the publication owner.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            changed (bool): True if publication has been updated, otherwise False.
        """
        # The result is accumulated with "or changed", so a later step
        # can never hide a change made by an earlier one.
        changed = False

        # Add or drop tables from published tables suit:
        if tables and not self.attrs['alltables']:

            # 1. If needs to add table to the publication:
            for tbl in tables:
                if tbl not in self.attrs['tables']:
                    changed = self.__pub_add_table(tbl, check_mode=check_mode) or changed

            # 2. If there is a table in targeted tables
            # that's not presented in the passed tables:
            for tbl in self.attrs['tables']:
                if tbl not in tables:
                    changed = self.__pub_drop_table(tbl, check_mode=check_mode) or changed

        elif tables and self.attrs['alltables']:
            changed = self.__pub_set_tables(tables, check_mode=check_mode) or changed

        # Update pub parameters:
        if params:
            for key, val in iteritems(params):
                if self.attrs['parameters'].get(key):

                    # In PostgreSQL 10/11 only 'publish' optional parameter is presented.
                    if key == 'publish':
                        # 'publish' value can be only a string with comma-separated items
                        # of allowed DML operations like 'insert,update' or
                        # 'insert,update,delete', etc.
                        # Compare the requested operations with the enabled ones as sets.
                        # Comparing only the keys of the current dict would miss
                        # an operation that is not reported yet (e.g. 'truncate'
                        # while it is switched off). Items are normalized so that
                        # 'insert, update' equals 'insert,update'.
                        requested = set(op.strip().lower() for op in val.split(',') if op.strip())
                        enabled = set(op for op, is_on in iteritems(self.attrs['parameters']['publish']) if is_on)

                        if requested != enabled:
                            changed = self.__pub_set_param(key, val, check_mode=check_mode) or changed

                    # Default behavior for other cases:
                    elif self.attrs['parameters'][key] != val:
                        changed = self.__pub_set_param(key, val, check_mode=check_mode) or changed

                else:
                    # If the parameter was not set before:
                    changed = self.__pub_set_param(key, val, check_mode=check_mode) or changed

        # Update pub owner:
        if owner and owner != self.attrs['owner']:
            changed = self.__pub_set_owner(owner, check_mode=check_mode) or changed

        return changed

    def drop(self, cascade=False, check_mode=True):
        """Drop the publication.

        Kwargs:
            cascade (bool): Flag indicates that publication needs to be deleted
                with its dependencies.
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            changed (bool): True if publication has been dropped,
                False if there was nothing to drop.
        """
        if not self.exists:
            # Nothing to do; return an explicit boolean instead of None:
            return False

        query_fragments = ["DROP PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]
        if cascade:
            query_fragments.append("CASCADE")

        return self.__exec_sql(' '.join(query_fragments), check_mode=check_mode)

    def __get_general_pub_info(self):
        """Get and return general publication information.

        Returns:
            Dict with publication information if successful, False otherwise.
        """
        # Check pg_publication.pubtruncate exists (supported from PostgreSQL 11):
        pgtrunc_sup = exec_sql(self, ("SELECT 1 FROM information_schema.columns "
                                      "WHERE table_name = 'pg_publication' "
                                      "AND column_name = 'pubtruncate'"), add_to_executed=False)

        columns = "r.rolname AS pubowner, p.puballtables, p.pubinsert, p.pubupdate , p.pubdelete"
        if pgtrunc_sup:
            columns += ", p.pubtruncate"

        # The name is put into a string literal, so its quotes must be escaped:
        query = ("SELECT %s FROM pg_publication AS p "
                 "JOIN pg_catalog.pg_roles AS r "
                 "ON p.pubowner = r.oid "
                 "WHERE p.pubname = '%s'" % (columns, _escape_sql_literal(self.name)))

        result = exec_sql(self, query, add_to_executed=False)
        if result:
            return result[0]
        else:
            return False

    def __get_tables_pub_info(self):
        """Get and return tables that are published by the publication.

        Returns:
            List of rows (schemaname, tablename) with published tables.
        """
        # The name is put into a string literal, so its quotes must be escaped:
        query = ("SELECT schemaname, tablename "
                 "FROM pg_publication_tables WHERE pubname = '%s'" % _escape_sql_literal(self.name))
        return exec_sql(self, query, add_to_executed=False)

    def __pub_add_table(self, table, check_mode=False):
        """Add a table to the publication.

        Args:
            table (str): Table name.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        query = ("ALTER PUBLICATION %s ADD TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
                                                        pg_quote_identifier(table, 'table')))
        return self.__exec_sql(query, check_mode=check_mode)

    def __pub_drop_table(self, table, check_mode=False):
        """Drop a table from the publication.

        Args:
            table (str): Table name.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        query = ("ALTER PUBLICATION %s DROP TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
                                                         pg_quote_identifier(table, 'table')))
        return self.__exec_sql(query, check_mode=check_mode)

    def __pub_set_tables(self, tables, check_mode=False):
        """Set a table suit that need to be published by the publication.

        Args:
            tables (list): List of tables.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        quoted_tables = [pg_quote_identifier(t, 'table') for t in tables]
        query = ("ALTER PUBLICATION %s SET TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
                                                        ', '.join(quoted_tables)))
        return self.__exec_sql(query, check_mode=check_mode)

    def __pub_set_param(self, param, value, check_mode=False):
        """Set an optional publication parameter.

        Args:
            param (str): Name of the parameter.
            value (str): Parameter value.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        query = ("ALTER PUBLICATION %s SET (%s = '%s')" % (pg_quote_identifier(self.name, 'publication'),
                                                           param, value))
        return self.__exec_sql(query, check_mode=check_mode)

    def __pub_set_owner(self, role, check_mode=False):
        """Set a publication owner.

        Args:
            role (str): Role (user) name that needs to be set as a publication owner.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just make SQL, add it to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        query = ("ALTER PUBLICATION %s OWNER TO %s" % (pg_quote_identifier(self.name, 'publication'),
                                                       pg_quote_identifier(role, 'role')))
        return self.__exec_sql(query, check_mode=check_mode)

    def __exec_sql(self, query, check_mode=False):
        """Execute SQL query.

        Note: If we need just to get information from the database,
            we use ``exec_sql`` function directly.

        Args:
            query (str): Query that needs to be executed.

        Kwargs:
            check_mode (bool): If True, don't actually change anything,
                just add ``query`` to ``self.executed_queries`` and return True.

        Returns:
            True if successful, False otherwise.
        """
        if check_mode:
            self.executed_queries.append(query)
            return True
        else:
            return exec_sql(self, query, ddl=True)


# ===========================================
# Module execution.
#


def main():
    """Parse the module arguments and bring the publication to the requested state."""
    argument_spec = postgres_common_argument_spec()
    argument_spec.update(
        name=dict(type='str', required=True),
        db=dict(type='str', aliases=['login_db']),
        state=dict(type='str', default='present', choices=['absent', 'present']),
        tables=dict(type='list'),
        parameters=dict(type='dict'),
        owner=dict(type='str'),
        cascade=dict(type='bool', default=False),
    )
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
    )

    # Parameters handling:
    name = module.params['name']
    state = module.params['state']
    tables = module.params['tables']
    params = module.params['parameters']
    owner = module.params['owner']
    cascade = module.params['cascade']

    if state == 'absent':
        if tables:
            module.warn('parameter "tables" is ignored when "state=absent"')
        if params:
            module.warn('parameter "parameters" is ignored when "state=absent"')
        if owner:
            module.warn('parameter "owner" is ignored when "state=absent"')

    if state == 'present' and cascade:
        module.warn('parameter "cascade" is ignored when "state=present"')

    # Connect to DB and make cursor object:
    conn_params = get_conn_params(module, module.params)
    # We check publication state without DML queries execution, so set autocommit:
    db_connection = connect_to_db(module, conn_params, autocommit=True)
    cursor = db_connection.cursor(cursor_factory=DictCursor)

    # Check version:
    if cursor.connection.server_version < SUPPORTED_PG_VERSION:
        module.fail_json(msg="PostgreSQL server version should be 10.0 or greater")

    # Nothing was changed by default:
    changed = False

    ###################################
    # Create object and do rock'n'roll:
    publication = PgPublication(module, cursor, name)

    if tables:
        tables = transform_tables_representation(tables)

    # If module.check_mode=True, nothing will be changed:
    if state == 'present':
        if not publication.exists:
            changed = publication.create(tables, params, owner, check_mode=module.check_mode)

        else:
            changed = publication.update(tables, params, owner, check_mode=module.check_mode)

    elif state == 'absent':
        changed = publication.drop(cascade=cascade, check_mode=module.check_mode)

    # Get final publication info:
    pub_fin_info = {}
    if state == 'present' or (state == 'absent' and module.check_mode):
        pub_fin_info = publication.get_info()
    elif state == 'absent' and not module.check_mode:
        publication.exists = False

    # Connection is not needed any more:
    cursor.close()
    db_connection.close()

    # Update publication info and return ret values:
    module.exit_json(changed=changed, queries=publication.executed_queries, exists=publication.exists, **pub_fin_info)


if __name__ == '__main__':
    main()