1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3
4# Copyright: (c) 2019, Loic Blot (@nerzhul) <loic.blot@unix-experience.fr>
5# Copyright: (c) 2019, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
6# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
7
8from __future__ import absolute_import, division, print_function
9__metaclass__ = type
10
11
12ANSIBLE_METADATA = {'metadata_version': '1.1',
13                    'status': ['preview'],
14                    'supported_by': 'community'}
15
16
17DOCUMENTATION = r'''
18---
19module: postgresql_publication
20short_description: Add, update, or remove PostgreSQL publication
21description:
22- Add, update, or remove PostgreSQL publication.
23version_added: "2.9"
24options:
25  name:
26    description:
27    - Name of the publication to add, update, or remove.
28    required: true
29    type: str
30  db:
31    description:
32    - Name of the database to connect to and where
33      the publication state will be changed.
34    aliases: [ login_db ]
35    type: str
36  tables:
37    description:
38    - List of tables to add to the publication.
39    - If no value is set all tables are targeted.
40    - If the publication already exists for specific tables and I(tables) is not passed,
41      nothing will be changed. If you need to add all tables to the publication with the same name,
42      drop existent and create new without passing I(tables).
43    type: list
44    elements: str
45  state:
46    description:
47    - The publication state.
48    default: present
49    choices: [ absent, present ]
50    type: str
51  parameters:
52    description:
53    - Dictionary with optional publication parameters.
54    - Available parameters depend on PostgreSQL version.
55    type: dict
56  owner:
57    description:
58    - Publication owner.
59    - If I(owner) is not defined, the owner will be set as I(login_user) or I(session_role).
60    type: str
61  cascade:
62    description:
63    - Drop publication dependencies. Has effect with I(state=absent) only.
64    type: bool
65    default: false
66notes:
67- PostgreSQL version must be 10 or greater.
68seealso:
69- name: CREATE PUBLICATION reference
70  description: Complete reference of the CREATE PUBLICATION command documentation.
71  link: https://www.postgresql.org/docs/current/sql-createpublication.html
72- name: ALTER PUBLICATION reference
73  description: Complete reference of the ALTER PUBLICATION command documentation.
74  link: https://www.postgresql.org/docs/current/sql-alterpublication.html
75- name: DROP PUBLICATION reference
76  description: Complete reference of the DROP PUBLICATION command documentation.
77  link: https://www.postgresql.org/docs/current/sql-droppublication.html
78author:
79- Loic Blot (@nerzhul) <loic.blot@unix-experience.fr>
80- Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
81extends_documentation_fragment:
82- postgres
83'''
84
85EXAMPLES = r'''
86- name: Create a new publication with name "acme" targeting all tables in database "test".
87  postgresql_publication:
88    db: test
89    name: acme
90
91- name: Create publication "acme" publishing only prices and vehicles tables.
92  postgresql_publication:
93    name: acme
94    tables:
95    - prices
96    - vehicles
97
98- name: >
99    Create publication "acme", set user alice as an owner, targeting all tables.
100    Allowable DML operations are INSERT and UPDATE only
101  postgresql_publication:
102    name: acme
103    owner: alice
104    parameters:
105      publish: 'insert,update'
106
107- name: >
108    Assuming publication "acme" exists and there are targeted
109    tables "prices" and "vehicles", add table "stores" to the publication.
110  postgresql_publication:
111    name: acme
112    tables:
113    - prices
114    - vehicles
115    - stores
116
117- name: Remove publication "acme" if exists in database "test".
118  postgresql_publication:
119    db: test
120    name: acme
121    state: absent
122'''
123
124RETURN = r'''
125exists:
126  description:
127  - Flag indicates the publication exists or not at the end of runtime.
128  returned: always
129  type: bool
130  sample: true
131queries:
132  description: List of executed queries.
133  returned: always
134  type: str
135  sample: [ 'DROP PUBLICATION "acme" CASCADE' ]
136owner:
137  description: Owner of the publication at the end of runtime.
138  returned: if publication exists
139  type: str
140  sample: "alice"
141tables:
142  description:
143  - List of tables in the publication at the end of runtime.
144  - If all tables are published, returns empty list.
145  returned: if publication exists
146  type: list
147  sample: ["\"public\".\"prices\"", "\"public\".\"vehicles\""]
148alltables:
149  description:
150  - Flag indicates that all tables are published.
151  returned: if publication exists
152  type: bool
153  sample: false
154parameters:
155  description: Publication parameters at the end of runtime.
156  returned: if publication exists
157  type: dict
158  sample: {'publish': {'insert': false, 'delete': false, 'update': true}}
159'''
160
161
162try:
163    from psycopg2.extras import DictCursor
164except ImportError:
165    # psycopg2 is checked by connect_to_db()
166    # from ansible.module_utils.postgres
167    pass
168
169from ansible.module_utils.basic import AnsibleModule
170from ansible.module_utils.database import pg_quote_identifier
171from ansible.module_utils.postgres import (
172    connect_to_db,
173    exec_sql,
174    get_conn_params,
175    postgres_common_argument_spec,
176)
177from ansible.module_utils.six import iteritems
178
179SUPPORTED_PG_VERSION = 10000
180
181
182################################
183# Module functions and classes #
184################################
185
186def transform_tables_representation(tbl_list):
187    """Add 'public.' to names of tables where a schema identifier is absent
188    and add quotes to each element.
189
190    Args:
191        tbl_list (list): List of table names.
192
193    Returns:
194        tbl_list (list): Changed list.
195    """
196    for i, table in enumerate(tbl_list):
197        if '.' not in table:
198            tbl_list[i] = pg_quote_identifier('public.%s' % table.strip(), 'table')
199        else:
200            tbl_list[i] = pg_quote_identifier(table.strip(), 'table')
201
202    return tbl_list
203
204
205class PgPublication():
206    """Class to work with PostgreSQL publication.
207
208    Args:
209        module (AnsibleModule): Object of AnsibleModule class.
210        cursor (cursor): Cursor object of psycopg2 library to work with PostgreSQL.
211        name (str): The name of the publication.
212
213    Attributes:
214        module (AnsibleModule): Object of AnsibleModule class.
215        cursor (cursor): Cursor object of psycopg2 library to work with PostgreSQL.
216        name (str): Name of the publication.
217        executed_queries (list): List of executed queries.
218        attrs (dict): Dict with publication attributes.
219        exists (bool): Flag indicates the publication exists or not.
220    """
221
222    def __init__(self, module, cursor, name):
223        self.module = module
224        self.cursor = cursor
225        self.name = name
226        self.executed_queries = []
227        self.attrs = {
228            'alltables': False,
229            'tables': [],
230            'parameters': {},
231            'owner': '',
232        }
233        self.exists = self.check_pub()
234
235    def get_info(self):
236        """Refresh the publication information.
237
238        Returns:
239            ``self.attrs``.
240        """
241        self.exists = self.check_pub()
242        return self.attrs
243
244    def check_pub(self):
245        """Check the publication and refresh ``self.attrs`` publication attribute.
246
247        Returns:
248            True if the publication with ``self.name`` exists, False otherwise.
249        """
250
251        pub_info = self.__get_general_pub_info()
252
253        if not pub_info:
254            # Publication does not exist:
255            return False
256
257        self.attrs['owner'] = pub_info.get('pubowner')
258
259        # Publication DML operations:
260        self.attrs['parameters']['publish'] = {}
261        self.attrs['parameters']['publish']['insert'] = pub_info.get('pubinsert', False)
262        self.attrs['parameters']['publish']['update'] = pub_info.get('pubupdate', False)
263        self.attrs['parameters']['publish']['delete'] = pub_info.get('pubdelete', False)
264        if pub_info.get('pubtruncate'):
265            self.attrs['parameters']['publish']['truncate'] = pub_info.get('pubtruncate')
266
267        # If alltables flag is False, get the list of targeted tables:
268        if not pub_info.get('puballtables'):
269            table_info = self.__get_tables_pub_info()
270            # Join sublists [['schema', 'table'], ...] to ['schema.table', ...]
271            # for better representation:
272            for i, schema_and_table in enumerate(table_info):
273                table_info[i] = pg_quote_identifier('.'.join(schema_and_table), 'table')
274
275            self.attrs['tables'] = table_info
276        else:
277            self.attrs['alltables'] = True
278
279        # Publication exists:
280        return True
281
282    def create(self, tables, params, owner, check_mode=True):
283        """Create the publication.
284
285        Args:
286            tables (list): List with names of the tables that need to be added to the publication.
287            params (dict): Dict contains optional publication parameters and their values.
288            owner (str): Name of the publication owner.
289
290        Kwargs:
291            check_mode (bool): If True, don't actually change anything,
292                just make SQL, add it to ``self.executed_queries`` and return True.
293
294        Returns:
295            changed (bool): True if publication has been created, otherwise False.
296        """
297        changed = True
298
299        query_fragments = ["CREATE PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]
300
301        if tables:
302            query_fragments.append("FOR TABLE %s" % ', '.join(tables))
303        else:
304            query_fragments.append("FOR ALL TABLES")
305
306        if params:
307            params_list = []
308            # Make list ["param = 'value'", ...] from params dict:
309            for (key, val) in iteritems(params):
310                params_list.append("%s = '%s'" % (key, val))
311
312            # Add the list to query_fragments:
313            query_fragments.append("WITH (%s)" % ', '.join(params_list))
314
315        changed = self.__exec_sql(' '.join(query_fragments), check_mode=check_mode)
316
317        if owner:
318            # If check_mode, just add possible SQL to
319            # executed_queries and return:
320            self.__pub_set_owner(owner, check_mode=check_mode)
321
322        return changed
323
324    def update(self, tables, params, owner, check_mode=True):
325        """Update the publication.
326
327        Args:
328            tables (list): List with names of the tables that need to be presented in the publication.
329            params (dict): Dict contains optional publication parameters and their values.
330            owner (str): Name of the publication owner.
331
332        Kwargs:
333            check_mode (bool): If True, don't actually change anything,
334                just make SQL, add it to ``self.executed_queries`` and return True.
335
336        Returns:
337            changed (bool): True if publication has been updated, otherwise False.
338        """
339        changed = False
340
341        # Add or drop tables from published tables suit:
342        if tables and not self.attrs['alltables']:
343
344            # 1. If needs to add table to the publication:
345            for tbl in tables:
346                if tbl not in self.attrs['tables']:
347                    # If needs to add table to the publication:
348                    changed = self.__pub_add_table(tbl, check_mode=check_mode)
349
350            # 2. if there is a table in targeted tables
351            # that's not presented in the passed tables:
352            for tbl in self.attrs['tables']:
353                if tbl not in tables:
354                    changed = self.__pub_drop_table(tbl, check_mode=check_mode)
355
356        elif tables and self.attrs['alltables']:
357            changed = self.__pub_set_tables(tables, check_mode=check_mode)
358
359        # Update pub parameters:
360        if params:
361            for key, val in iteritems(params):
362                if self.attrs['parameters'].get(key):
363
364                    # In PostgreSQL 10/11 only 'publish' optional parameter is presented.
365                    if key == 'publish':
366                        # 'publish' value can be only a string with comma-separated items
367                        # of allowed DML operations like 'insert,update' or
368                        # 'insert,update,delete', etc.
369                        # Make dictionary to compare with current attrs later:
370                        val_dict = self.attrs['parameters']['publish'].copy()
371                        val_list = val.split(',')
372                        for v in val_dict:
373                            if v in val_list:
374                                val_dict[v] = True
375                            else:
376                                val_dict[v] = False
377
378                        # Compare val_dict and the dict with current 'publish' parameters,
379                        # if they're different, set new values:
380                        if val_dict != self.attrs['parameters']['publish']:
381                            changed = self.__pub_set_param(key, val, check_mode=check_mode)
382
383                    # Default behavior for other cases:
384                    elif self.attrs['parameters'][key] != val:
385                        changed = self.__pub_set_param(key, val, check_mode=check_mode)
386
387                else:
388                    # If the parameter was not set before:
389                    changed = self.__pub_set_param(key, val, check_mode=check_mode)
390
391        # Update pub owner:
392        if owner:
393            if owner != self.attrs['owner']:
394                changed = self.__pub_set_owner(owner, check_mode=check_mode)
395
396        return changed
397
398    def drop(self, cascade=False, check_mode=True):
399        """Drop the publication.
400
401        Kwargs:
402            cascade (bool): Flag indicates that publication needs to be deleted
403                with its dependencies.
404            check_mode (bool): If True, don't actually change anything,
405                just make SQL, add it to ``self.executed_queries`` and return True.
406
407        Returns:
408            changed (bool): True if publication has been updated, otherwise False.
409        """
410        if self.exists:
411            query_fragments = []
412            query_fragments.append("DROP PUBLICATION %s" % pg_quote_identifier(self.name, 'publication'))
413            if cascade:
414                query_fragments.append("CASCADE")
415
416            return self.__exec_sql(' '.join(query_fragments), check_mode=check_mode)
417
418    def __get_general_pub_info(self):
419        """Get and return general publication information.
420
421        Returns:
422            Dict with publication information if successful, False otherwise.
423        """
424        # Check pg_publication.pubtruncate exists (supported from PostgreSQL 11):
425        pgtrunc_sup = exec_sql(self, ("SELECT 1 FROM information_schema.columns "
426                                      "WHERE table_name = 'pg_publication' "
427                                      "AND column_name = 'pubtruncate'"), add_to_executed=False)
428
429        if pgtrunc_sup:
430            query = ("SELECT r.rolname AS pubowner, p.puballtables, p.pubinsert, "
431                     "p.pubupdate , p.pubdelete, p.pubtruncate FROM pg_publication AS p "
432                     "JOIN pg_catalog.pg_roles AS r "
433                     "ON p.pubowner = r.oid "
434                     "WHERE p.pubname = '%s'" % self.name)
435        else:
436            query = ("SELECT r.rolname AS pubowner, p.puballtables, p.pubinsert, "
437                     "p.pubupdate , p.pubdelete FROM pg_publication AS p "
438                     "JOIN pg_catalog.pg_roles AS r "
439                     "ON p.pubowner = r.oid "
440                     "WHERE p.pubname = '%s'" % self.name)
441
442        result = exec_sql(self, query, add_to_executed=False)
443        if result:
444            return result[0]
445        else:
446            return False
447
448    def __get_tables_pub_info(self):
449        """Get and return tables that are published by the publication.
450
451        Returns:
452            List of dicts with published tables.
453        """
454        query = ("SELECT schemaname, tablename "
455                 "FROM pg_publication_tables WHERE pubname = '%s'" % self.name)
456        return exec_sql(self, query, add_to_executed=False)
457
458    def __pub_add_table(self, table, check_mode=False):
459        """Add a table to the publication.
460
461        Args:
462            table (str): Table name.
463
464        Kwargs:
465            check_mode (bool): If True, don't actually change anything,
466                just make SQL, add it to ``self.executed_queries`` and return True.
467
468        Returns:
469            True if successful, False otherwise.
470        """
471        query = ("ALTER PUBLICATION %s ADD TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
472                                                        pg_quote_identifier(table, 'table')))
473        return self.__exec_sql(query, check_mode=check_mode)
474
475    def __pub_drop_table(self, table, check_mode=False):
476        """Drop a table from the publication.
477
478        Args:
479            table (str): Table name.
480
481        Kwargs:
482            check_mode (bool): If True, don't actually change anything,
483                just make SQL, add it to ``self.executed_queries`` and return True.
484
485        Returns:
486            True if successful, False otherwise.
487        """
488        query = ("ALTER PUBLICATION %s DROP TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
489                                                         pg_quote_identifier(table, 'table')))
490        return self.__exec_sql(query, check_mode=check_mode)
491
492    def __pub_set_tables(self, tables, check_mode=False):
493        """Set a table suit that need to be published by the publication.
494
495        Args:
496            tables (list): List of tables.
497
498        Kwargs:
499            check_mode (bool): If True, don't actually change anything,
500                just make SQL, add it to ``self.executed_queries`` and return True.
501
502        Returns:
503            True if successful, False otherwise.
504        """
505        quoted_tables = [pg_quote_identifier(t, 'table') for t in tables]
506        query = ("ALTER PUBLICATION %s SET TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
507                                                        ', '.join(quoted_tables)))
508        return self.__exec_sql(query, check_mode=check_mode)
509
510    def __pub_set_param(self, param, value, check_mode=False):
511        """Set an optional publication parameter.
512
513        Args:
514            param (str): Name of the parameter.
515            value (str): Parameter value.
516
517        Kwargs:
518            check_mode (bool): If True, don't actually change anything,
519                just make SQL, add it to ``self.executed_queries`` and return True.
520
521        Returns:
522            True if successful, False otherwise.
523        """
524        query = ("ALTER PUBLICATION %s SET (%s = '%s')" % (pg_quote_identifier(self.name, 'publication'),
525                                                           param, value))
526        return self.__exec_sql(query, check_mode=check_mode)
527
528    def __pub_set_owner(self, role, check_mode=False):
529        """Set a publication owner.
530
531        Args:
532            role (str): Role (user) name that needs to be set as a publication owner.
533
534        Kwargs:
535            check_mode (bool): If True, don't actually change anything,
536                just make SQL, add it to ``self.executed_queries`` and return True.
537
538        Returns:
539            True if successful, False otherwise.
540        """
541        query = ("ALTER PUBLICATION %s OWNER TO %s" % (pg_quote_identifier(self.name, 'publication'),
542                                                       pg_quote_identifier(role, 'role')))
543        return self.__exec_sql(query, check_mode=check_mode)
544
545    def __exec_sql(self, query, check_mode=False):
546        """Execute SQL query.
547
548        Note: If we need just to get information from the database,
549            we use ``exec_sql`` function directly.
550
551        Args:
552            query (str): Query that needs to be executed.
553
554        Kwargs:
555            check_mode (bool): If True, don't actually change anything,
556                just add ``query`` to ``self.executed_queries`` and return True.
557
558        Returns:
559            True if successful, False otherwise.
560        """
561        if check_mode:
562            self.executed_queries.append(query)
563            return True
564        else:
565            return exec_sql(self, query, ddl=True)
566
567
568# ===========================================
569# Module execution.
570#
571
572
573def main():
574    argument_spec = postgres_common_argument_spec()
575    argument_spec.update(
576        name=dict(required=True),
577        db=dict(type='str', aliases=['login_db']),
578        state=dict(type='str', default='present', choices=['absent', 'present']),
579        tables=dict(type='list'),
580        parameters=dict(type='dict'),
581        owner=dict(type='str'),
582        cascade=dict(type='bool', default=False),
583    )
584    module = AnsibleModule(
585        argument_spec=argument_spec,
586        supports_check_mode=True,
587    )
588
589    # Parameters handling:
590    name = module.params['name']
591    state = module.params['state']
592    tables = module.params['tables']
593    params = module.params['parameters']
594    owner = module.params['owner']
595    cascade = module.params['cascade']
596
597    if state == 'absent':
598        if tables:
599            module.warn('parameter "tables" is ignored when "state=absent"')
600        if params:
601            module.warn('parameter "parameters" is ignored when "state=absent"')
602        if owner:
603            module.warn('parameter "owner" is ignored when "state=absent"')
604
605    if state == 'present' and cascade:
606        module.warn('parameter "cascade" is ignored when "state=present"')
607
608    # Connect to DB and make cursor object:
609    conn_params = get_conn_params(module, module.params)
610    # We check publication state without DML queries execution, so set autocommit:
611    db_connection = connect_to_db(module, conn_params, autocommit=True)
612    cursor = db_connection.cursor(cursor_factory=DictCursor)
613
614    # Check version:
615    if cursor.connection.server_version < SUPPORTED_PG_VERSION:
616        module.fail_json(msg="PostgreSQL server version should be 10.0 or greater")
617
618    # Nothing was changed by default:
619    changed = False
620
621    ###################################
622    # Create object and do rock'n'roll:
623    publication = PgPublication(module, cursor, name)
624
625    if tables:
626        tables = transform_tables_representation(tables)
627
628    # If module.check_mode=True, nothing will be changed:
629    if state == 'present':
630        if not publication.exists:
631            changed = publication.create(tables, params, owner, check_mode=module.check_mode)
632
633        else:
634            changed = publication.update(tables, params, owner, check_mode=module.check_mode)
635
636    elif state == 'absent':
637        changed = publication.drop(cascade=cascade, check_mode=module.check_mode)
638
639    # Get final publication info:
640    pub_fin_info = {}
641    if state == 'present' or (state == 'absent' and module.check_mode):
642        pub_fin_info = publication.get_info()
643    elif state == 'absent' and not module.check_mode:
644        publication.exists = False
645
646    # Connection is not needed any more:
647    cursor.close()
648    db_connection.close()
649
650    # Update publication info and return ret values:
651    module.exit_json(changed=changed, queries=publication.executed_queries, exists=publication.exists, **pub_fin_info)
652
653
654if __name__ == '__main__':
655    main()
656