1#!/usr/local/bin/python3.8
2# -*- coding: utf-8 -*-
3#
4# (c) 2018, Ryan Conway (@rylon)
5# (c) 2018, Scott Buchanan <sbuchanan@ri.pn> (onepassword.py used as starting point)
6# (c) 2018, Ansible Project
7# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
8
9
10from __future__ import (absolute_import, division, print_function)
11__metaclass__ = type
12
13
14DOCUMENTATION = '''
15module: onepassword_info
16author:
17    - Ryan Conway (@Rylon)
18requirements:
19    - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
20notes:
21    - Tested with C(op) version 0.5.5
22    - "Based on the C(onepassword) lookup plugin by Scott Buchanan <sbuchanan@ri.pn>."
23short_description: Gather items from 1Password
24description:
25    - M(community.general.onepassword_info) wraps the C(op) command line utility to fetch data about one or more 1Password items.
26    - A fatal error occurs if any of the items being searched for can not be found.
27    - Recommend using with the C(no_log) option to avoid logging the values of the secrets being retrieved.
28    - This module was called C(onepassword_facts) before Ansible 2.9, returning C(ansible_facts).
29      Note that the M(community.general.onepassword_info) module no longer returns C(ansible_facts)!
30      You must now use the C(register) option to use the facts in other tasks.
31options:
32    search_terms:
33        type: list
34        elements: dict
35        description:
36            - A list of one or more search terms.
37            - Each search term can either be a simple string or it can be a dictionary for more control.
38            - When passing a simple string, I(field) is assumed to be C(password).
39            - When passing a dictionary, the following fields are available.
40        suboptions:
41            name:
42                type: str
43                description:
44                    - The name of the 1Password item to search for (required).
45            field:
46                type: str
47                description:
48                    - The name of the field to search for within this item (optional, defaults to "password" (or "document" if the item has an attachment).
49            section:
50                type: str
51                description:
52                    - The name of a section within this item containing the specified field (optional, will search all sections if not specified).
53            vault:
54                type: str
55                description:
56                    - The name of the particular 1Password vault to search, useful if your 1Password user has access to multiple vaults (optional).
57        required: True
58    auto_login:
59        type: dict
60        description:
61            - A dictionary containing authentication details. If this is set, M(community.general.onepassword_info)
62              will attempt to sign in to 1Password automatically.
63            - Without this option, you must have already logged in via the 1Password CLI before running Ansible.
64            - It is B(highly) recommended to store 1Password credentials in an Ansible Vault. Ensure that the key used to encrypt
65              the Ansible Vault is equal to or greater in strength than the 1Password master password.
66        suboptions:
67            subdomain:
68                type: str
69                description:
70                    - 1Password subdomain name (<subdomain>.1password.com).
71                    - If this is not specified, the most recent subdomain will be used.
72            username:
73                type: str
74                description:
75                    - 1Password username.
76                    - Only required for initial sign in.
77            master_password:
78                type: str
79                description:
80                    - The master password for your subdomain.
81                    - This is always required when specifying C(auto_login).
82                required: True
83            secret_key:
84                type: str
85                description:
86                    - The secret key for your subdomain.
87                    - Only required for initial sign in.
88        default: {}
89        required: False
90    cli_path:
91        type: path
92        description: Used to specify the exact path to the C(op) command line interface
93        required: False
94        default: 'op'
95'''
96
97EXAMPLES = '''
98# Gather secrets from 1Password, assuming there is a 'password' field:
99- name: Get a password
100  community.general.onepassword_info:
101    search_terms: My 1Password item
102  delegate_to: localhost
103  register: my_1password_item
104  no_log: true         # Don't want to log the secrets to the console!
105
106# Gather secrets from 1Password, with more advanced search terms:
107- name: Get a password
108  community.general.onepassword_info:
109    search_terms:
110      - name:    My 1Password item
111        field:   Custom field name       # optional, defaults to 'password'
112        section: Custom section name     # optional, defaults to 'None'
113        vault:   Name of the vault       # optional, only necessary if there is more than 1 Vault available
114  delegate_to: localhost
115  register: my_1password_item
116  no_log: True                           # Don't want to log the secrets to the console!
117
118# Gather secrets combining simple and advanced search terms to retrieve two items, one of which we fetch two
119# fields. In the first 'password' is fetched, as a field name is not specified (default behaviour) and in the
120# second, 'Custom field name' is fetched, as that is specified explicitly.
121- name: Get a password
122  community.general.onepassword_info:
123    search_terms:
124      - My 1Password item                # 'name' is optional when passing a simple string...
125      - name: My Other 1Password item    # ...but it can also be set for consistency
126      - name:    My 1Password item
127        field:   Custom field name       # optional, defaults to 'password'
128        section: Custom section name     # optional, defaults to 'None'
129        vault:   Name of the vault       # optional, only necessary if there is more than 1 Vault available
130      - name: A 1Password item with document attachment
131  delegate_to: localhost
132  register: my_1password_item
133  no_log: true                           # Don't want to log the secrets to the console!
134
135- name: Debug a password (for example)
136  ansible.builtin.debug:
137    msg: "{{ my_1password_item['onepassword']['My 1Password item'] }}"
138'''
139
140RETURN = '''
141---
142# One or more dictionaries for each matching item from 1Password, along with the appropriate fields.
143# This shows the response you would expect to receive from the third example documented above.
144onepassword:
145    description: Dictionary of each 1password item matching the given search terms, shows what would be returned from the third example above.
146    returned: success
147    type: dict
148    sample:
149        "My 1Password item":
150            password: the value of this field
151            Custom field name: the value of this field
152        "My Other 1Password item":
153            password: the value of this field
154        "A 1Password item with document attachment":
155            document: the contents of the document attached to this item
156'''
157
158
159import errno
160import json
161import os
162import re
163
164from subprocess import Popen, PIPE
165
166from ansible.module_utils.common.text.converters import to_bytes, to_native
167from ansible.module_utils.basic import AnsibleModule
168
169
170class AnsibleModuleError(Exception):
171    def __init__(self, results):
172        self.results = results
173
174    def __repr__(self):
175        return self.results
176
177
178class OnePasswordInfo(object):
179
180    def __init__(self):
181        self.cli_path = module.params.get('cli_path')
182        self.config_file_path = '~/.op/config'
183        self.auto_login = module.params.get('auto_login')
184        self.logged_in = False
185        self.token = None
186
187        terms = module.params.get('search_terms')
188        self.terms = self.parse_search_terms(terms)
189
190    def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
191        if self.token:
192            # Adds the session token to all commands if we're logged in.
193            args += [to_bytes('--session=') + self.token]
194
195        command = [self.cli_path] + args
196        p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE)
197        out, err = p.communicate(input=command_input)
198        rc = p.wait()
199        if not ignore_errors and rc != expected_rc:
200            raise AnsibleModuleError(to_native(err))
201        return rc, out, err
202
203    def _parse_field(self, data_json, item_id, field_name, section_title=None):
204        data = json.loads(data_json)
205
206        if ('documentAttributes' in data['details']):
207            # This is actually a document, let's fetch the document data instead!
208            document = self._run(["get", "document", data['overview']['title']])
209            return {'document': document[1].strip()}
210
211        else:
212            # This is not a document, let's try to find the requested field
213
214            # Some types of 1Password items have a 'password' field directly alongside the 'fields' attribute,
215            # not inside it, so we need to check there first.
216            if (field_name in data['details']):
217                return {field_name: data['details'][field_name]}
218
219            # Otherwise we continue looking inside the 'fields' attribute for the specified field.
220            else:
221                if section_title is None:
222                    for field_data in data['details'].get('fields', []):
223                        if field_data.get('name', '').lower() == field_name.lower():
224                            return {field_name: field_data.get('value', '')}
225
226                # Not found it yet, so now lets see if there are any sections defined
227                # and search through those for the field. If a section was given, we skip
228                # any non-matching sections, otherwise we search them all until we find the field.
229                for section_data in data['details'].get('sections', []):
230                    if section_title is not None and section_title.lower() != section_data['title'].lower():
231                        continue
232                    for field_data in section_data.get('fields', []):
233                        if field_data.get('t', '').lower() == field_name.lower():
234                            return {field_name: field_data.get('v', '')}
235
236        # We will get here if the field could not be found in any section and the item wasn't a document to be downloaded.
237        optional_section_title = '' if section_title is None else " in the section '%s'" % section_title
238        module.fail_json(msg="Unable to find an item in 1Password named '%s' with the field '%s'%s." % (item_id, field_name, optional_section_title))
239
240    def parse_search_terms(self, terms):
241        processed_terms = []
242
243        for term in terms:
244            if not isinstance(term, dict):
245                term = {'name': term}
246
247            if 'name' not in term:
248                module.fail_json(msg="Missing required 'name' field from search term, got: '%s'" % to_native(term))
249
250            term['field'] = term.get('field', 'password')
251            term['section'] = term.get('section', None)
252            term['vault'] = term.get('vault', None)
253
254            processed_terms.append(term)
255
256        return processed_terms
257
258    def get_raw(self, item_id, vault=None):
259        try:
260            args = ["get", "item", item_id]
261            if vault is not None:
262                args += ['--vault={0}'.format(vault)]
263            rc, output, dummy = self._run(args)
264            return output
265
266        except Exception as e:
267            if re.search(".*not found.*", to_native(e)):
268                module.fail_json(msg="Unable to find an item in 1Password named '%s'." % item_id)
269            else:
270                module.fail_json(msg="Unexpected error attempting to find an item in 1Password named '%s': %s" % (item_id, to_native(e)))
271
272    def get_field(self, item_id, field, section=None, vault=None):
273        output = self.get_raw(item_id, vault)
274        return self._parse_field(output, item_id, field, section) if output != '' else ''
275
276    def full_login(self):
277        if self.auto_login is not None:
278            if None in [self.auto_login.get('subdomain'), self.auto_login.get('username'),
279                        self.auto_login.get('secret_key'), self.auto_login.get('master_password')]:
280                module.fail_json(msg='Unable to perform initial sign in to 1Password. '
281                                     'subdomain, username, secret_key, and master_password are required to perform initial sign in.')
282
283            args = [
284                'signin',
285                '{0}.1password.com'.format(self.auto_login['subdomain']),
286                to_bytes(self.auto_login['username']),
287                to_bytes(self.auto_login['secret_key']),
288                '--output=raw',
289            ]
290
291            try:
292                rc, out, err = self._run(args, command_input=to_bytes(self.auto_login['master_password']))
293                self.token = out.strip()
294            except AnsibleModuleError as e:
295                module.fail_json(msg="Failed to perform initial sign in to 1Password: %s" % to_native(e))
296        else:
297            module.fail_json(msg="Unable to perform an initial sign in to 1Password. Please run '%s sigin' "
298                                 "or define credentials in 'auto_login'. See the module documentation for details." % self.cli_path)
299
300    def get_token(self):
301        # If the config file exists, assume an initial signin has taken place and try basic sign in
302        if os.path.isfile(self.config_file_path):
303
304            if self.auto_login is not None:
305
306                # Since we are not currently signed in, master_password is required at a minimum
307                if not self.auto_login.get('master_password'):
308                    module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.")
309
310                # Try signing in using the master_password and a subdomain if one is provided
311                try:
312                    args = ['signin', '--output=raw']
313
314                    if self.auto_login.get('subdomain'):
315                        args = ['signin', self.auto_login['subdomain'], '--output=raw']
316
317                    rc, out, err = self._run(args, command_input=to_bytes(self.auto_login['master_password']))
318                    self.token = out.strip()
319
320                except AnsibleModuleError:
321                    self.full_login()
322
323            else:
324                self.full_login()
325
326        else:
327            # Attempt a full sign in since there appears to be no existing sign in
328            self.full_login()
329
330    def assert_logged_in(self):
331        try:
332            rc, out, err = self._run(['get', 'account'], ignore_errors=True)
333            if rc == 0:
334                self.logged_in = True
335            if not self.logged_in:
336                self.get_token()
337        except OSError as e:
338            if e.errno == errno.ENOENT:
339                module.fail_json(msg="1Password CLI tool '%s' not installed in path on control machine" % self.cli_path)
340            raise e
341
342    def run(self):
343        result = {}
344
345        self.assert_logged_in()
346
347        for term in self.terms:
348            value = self.get_field(term['name'], term['field'], term['section'], term['vault'])
349
350            if term['name'] in result:
351                # If we already have a result for this key, we have to append this result dictionary
352                # to the existing one. This is only applicable when there is a single item
353                # in 1Password which has two different fields, and we want to retrieve both of them.
354                result[term['name']].update(value)
355            else:
356                # If this is the first result for this key, simply set it.
357                result[term['name']] = value
358
359        return result
360
361
362def main():
363    global module
364    module = AnsibleModule(
365        argument_spec=dict(
366            cli_path=dict(type='path', default='op'),
367            auto_login=dict(type='dict', options=dict(
368                subdomain=dict(type='str'),
369                username=dict(type='str'),
370                master_password=dict(required=True, type='str', no_log=True),
371                secret_key=dict(type='str', no_log=True),
372            ), default=None),
373            search_terms=dict(required=True, type='list', elements='dict'),
374        ),
375        supports_check_mode=True
376    )
377
378    results = {'onepassword': OnePasswordInfo().run()}
379
380    module.exit_json(changed=False, **results)
381
382
383if __name__ == '__main__':
384    main()
385