1#!/usr/local/bin/python3.8 2# -*- coding: utf-8 -*- 3# 4# (c) 2018, Ryan Conway (@rylon) 5# (c) 2018, Scott Buchanan <sbuchanan@ri.pn> (onepassword.py used as starting point) 6# (c) 2018, Ansible Project 7# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 8 9 10from __future__ import (absolute_import, division, print_function) 11__metaclass__ = type 12 13 14DOCUMENTATION = ''' 15module: onepassword_info 16author: 17 - Ryan Conway (@Rylon) 18requirements: 19 - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/) 20notes: 21 - Tested with C(op) version 0.5.5 22 - "Based on the C(onepassword) lookup plugin by Scott Buchanan <sbuchanan@ri.pn>." 23short_description: Gather items from 1Password 24description: 25 - M(community.general.onepassword_info) wraps the C(op) command line utility to fetch data about one or more 1Password items. 26 - A fatal error occurs if any of the items being searched for can not be found. 27 - Recommend using with the C(no_log) option to avoid logging the values of the secrets being retrieved. 28 - This module was called C(onepassword_facts) before Ansible 2.9, returning C(ansible_facts). 29 Note that the M(community.general.onepassword_info) module no longer returns C(ansible_facts)! 30 You must now use the C(register) option to use the facts in other tasks. 31options: 32 search_terms: 33 type: list 34 elements: dict 35 description: 36 - A list of one or more search terms. 37 - Each search term can either be a simple string or it can be a dictionary for more control. 38 - When passing a simple string, I(field) is assumed to be C(password). 39 - When passing a dictionary, the following fields are available. 40 suboptions: 41 name: 42 type: str 43 description: 44 - The name of the 1Password item to search for (required). 45 field: 46 type: str 47 description: 48 - The name of the field to search for within this item (optional, defaults to "password" (or "document" if the item has an attachment). 49 section: 50 type: str 51 description: 52 - The name of a section within this item containing the specified field (optional, will search all sections if not specified). 53 vault: 54 type: str 55 description: 56 - The name of the particular 1Password vault to search, useful if your 1Password user has access to multiple vaults (optional). 57 required: True 58 auto_login: 59 type: dict 60 description: 61 - A dictionary containing authentication details. If this is set, M(community.general.onepassword_info) 62 will attempt to sign in to 1Password automatically. 63 - Without this option, you must have already logged in via the 1Password CLI before running Ansible. 64 - It is B(highly) recommended to store 1Password credentials in an Ansible Vault. Ensure that the key used to encrypt 65 the Ansible Vault is equal to or greater in strength than the 1Password master password. 66 suboptions: 67 subdomain: 68 type: str 69 description: 70 - 1Password subdomain name (<subdomain>.1password.com). 71 - If this is not specified, the most recent subdomain will be used. 72 username: 73 type: str 74 description: 75 - 1Password username. 76 - Only required for initial sign in. 77 master_password: 78 type: str 79 description: 80 - The master password for your subdomain. 81 - This is always required when specifying C(auto_login). 82 required: True 83 secret_key: 84 type: str 85 description: 86 - The secret key for your subdomain. 87 - Only required for initial sign in. 88 default: {} 89 required: False 90 cli_path: 91 type: path 92 description: Used to specify the exact path to the C(op) command line interface 93 required: False 94 default: 'op' 95''' 96 97EXAMPLES = ''' 98# Gather secrets from 1Password, assuming there is a 'password' field: 99- name: Get a password 100 community.general.onepassword_info: 101 search_terms: My 1Password item 102 delegate_to: localhost 103 register: my_1password_item 104 no_log: true # Don't want to log the secrets to the console! 105 106# Gather secrets from 1Password, with more advanced search terms: 107- name: Get a password 108 community.general.onepassword_info: 109 search_terms: 110 - name: My 1Password item 111 field: Custom field name # optional, defaults to 'password' 112 section: Custom section name # optional, defaults to 'None' 113 vault: Name of the vault # optional, only necessary if there is more than 1 Vault available 114 delegate_to: localhost 115 register: my_1password_item 116 no_log: True # Don't want to log the secrets to the console! 117 118# Gather secrets combining simple and advanced search terms to retrieve two items, one of which we fetch two 119# fields. In the first 'password' is fetched, as a field name is not specified (default behaviour) and in the 120# second, 'Custom field name' is fetched, as that is specified explicitly. 121- name: Get a password 122 community.general.onepassword_info: 123 search_terms: 124 - My 1Password item # 'name' is optional when passing a simple string... 125 - name: My Other 1Password item # ...but it can also be set for consistency 126 - name: My 1Password item 127 field: Custom field name # optional, defaults to 'password' 128 section: Custom section name # optional, defaults to 'None' 129 vault: Name of the vault # optional, only necessary if there is more than 1 Vault available 130 - name: A 1Password item with document attachment 131 delegate_to: localhost 132 register: my_1password_item 133 no_log: true # Don't want to log the secrets to the console! 134 135- name: Debug a password (for example) 136 ansible.builtin.debug: 137 msg: "{{ my_1password_item['onepassword']['My 1Password item'] }}" 138''' 139 140RETURN = ''' 141--- 142# One or more dictionaries for each matching item from 1Password, along with the appropriate fields. 143# This shows the response you would expect to receive from the third example documented above. 144onepassword: 145 description: Dictionary of each 1password item matching the given search terms, shows what would be returned from the third example above. 146 returned: success 147 type: dict 148 sample: 149 "My 1Password item": 150 password: the value of this field 151 Custom field name: the value of this field 152 "My Other 1Password item": 153 password: the value of this field 154 "A 1Password item with document attachment": 155 document: the contents of the document attached to this item 156''' 157 158 159import errno 160import json 161import os 162import re 163 164from subprocess import Popen, PIPE 165 166from ansible.module_utils.common.text.converters import to_bytes, to_native 167from ansible.module_utils.basic import AnsibleModule 168 169 170class AnsibleModuleError(Exception): 171 def __init__(self, results): 172 self.results = results 173 174 def __repr__(self): 175 return self.results 176 177 178class OnePasswordInfo(object): 179 180 def __init__(self): 181 self.cli_path = module.params.get('cli_path') 182 self.config_file_path = '~/.op/config' 183 self.auto_login = module.params.get('auto_login') 184 self.logged_in = False 185 self.token = None 186 187 terms = module.params.get('search_terms') 188 self.terms = self.parse_search_terms(terms) 189 190 def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False): 191 if self.token: 192 # Adds the session token to all commands if we're logged in. 193 args += [to_bytes('--session=') + self.token] 194 195 command = [self.cli_path] + args 196 p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE) 197 out, err = p.communicate(input=command_input) 198 rc = p.wait() 199 if not ignore_errors and rc != expected_rc: 200 raise AnsibleModuleError(to_native(err)) 201 return rc, out, err 202 203 def _parse_field(self, data_json, item_id, field_name, section_title=None): 204 data = json.loads(data_json) 205 206 if ('documentAttributes' in data['details']): 207 # This is actually a document, let's fetch the document data instead! 208 document = self._run(["get", "document", data['overview']['title']]) 209 return {'document': document[1].strip()} 210 211 else: 212 # This is not a document, let's try to find the requested field 213 214 # Some types of 1Password items have a 'password' field directly alongside the 'fields' attribute, 215 # not inside it, so we need to check there first. 216 if (field_name in data['details']): 217 return {field_name: data['details'][field_name]} 218 219 # Otherwise we continue looking inside the 'fields' attribute for the specified field. 220 else: 221 if section_title is None: 222 for field_data in data['details'].get('fields', []): 223 if field_data.get('name', '').lower() == field_name.lower(): 224 return {field_name: field_data.get('value', '')} 225 226 # Not found it yet, so now lets see if there are any sections defined 227 # and search through those for the field. If a section was given, we skip 228 # any non-matching sections, otherwise we search them all until we find the field. 229 for section_data in data['details'].get('sections', []): 230 if section_title is not None and section_title.lower() != section_data['title'].lower(): 231 continue 232 for field_data in section_data.get('fields', []): 233 if field_data.get('t', '').lower() == field_name.lower(): 234 return {field_name: field_data.get('v', '')} 235 236 # We will get here if the field could not be found in any section and the item wasn't a document to be downloaded. 237 optional_section_title = '' if section_title is None else " in the section '%s'" % section_title 238 module.fail_json(msg="Unable to find an item in 1Password named '%s' with the field '%s'%s." % (item_id, field_name, optional_section_title)) 239 240 def parse_search_terms(self, terms): 241 processed_terms = [] 242 243 for term in terms: 244 if not isinstance(term, dict): 245 term = {'name': term} 246 247 if 'name' not in term: 248 module.fail_json(msg="Missing required 'name' field from search term, got: '%s'" % to_native(term)) 249 250 term['field'] = term.get('field', 'password') 251 term['section'] = term.get('section', None) 252 term['vault'] = term.get('vault', None) 253 254 processed_terms.append(term) 255 256 return processed_terms 257 258 def get_raw(self, item_id, vault=None): 259 try: 260 args = ["get", "item", item_id] 261 if vault is not None: 262 args += ['--vault={0}'.format(vault)] 263 rc, output, dummy = self._run(args) 264 return output 265 266 except Exception as e: 267 if re.search(".*not found.*", to_native(e)): 268 module.fail_json(msg="Unable to find an item in 1Password named '%s'." % item_id) 269 else: 270 module.fail_json(msg="Unexpected error attempting to find an item in 1Password named '%s': %s" % (item_id, to_native(e))) 271 272 def get_field(self, item_id, field, section=None, vault=None): 273 output = self.get_raw(item_id, vault) 274 return self._parse_field(output, item_id, field, section) if output != '' else '' 275 276 def full_login(self): 277 if self.auto_login is not None: 278 if None in [self.auto_login.get('subdomain'), self.auto_login.get('username'), 279 self.auto_login.get('secret_key'), self.auto_login.get('master_password')]: 280 module.fail_json(msg='Unable to perform initial sign in to 1Password. ' 281 'subdomain, username, secret_key, and master_password are required to perform initial sign in.') 282 283 args = [ 284 'signin', 285 '{0}.1password.com'.format(self.auto_login['subdomain']), 286 to_bytes(self.auto_login['username']), 287 to_bytes(self.auto_login['secret_key']), 288 '--output=raw', 289 ] 290 291 try: 292 rc, out, err = self._run(args, command_input=to_bytes(self.auto_login['master_password'])) 293 self.token = out.strip() 294 except AnsibleModuleError as e: 295 module.fail_json(msg="Failed to perform initial sign in to 1Password: %s" % to_native(e)) 296 else: 297 module.fail_json(msg="Unable to perform an initial sign in to 1Password. Please run '%s sigin' " 298 "or define credentials in 'auto_login'. See the module documentation for details." % self.cli_path) 299 300 def get_token(self): 301 # If the config file exists, assume an initial signin has taken place and try basic sign in 302 if os.path.isfile(self.config_file_path): 303 304 if self.auto_login is not None: 305 306 # Since we are not currently signed in, master_password is required at a minimum 307 if not self.auto_login.get('master_password'): 308 module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.") 309 310 # Try signing in using the master_password and a subdomain if one is provided 311 try: 312 args = ['signin', '--output=raw'] 313 314 if self.auto_login.get('subdomain'): 315 args = ['signin', self.auto_login['subdomain'], '--output=raw'] 316 317 rc, out, err = self._run(args, command_input=to_bytes(self.auto_login['master_password'])) 318 self.token = out.strip() 319 320 except AnsibleModuleError: 321 self.full_login() 322 323 else: 324 self.full_login() 325 326 else: 327 # Attempt a full sign in since there appears to be no existing sign in 328 self.full_login() 329 330 def assert_logged_in(self): 331 try: 332 rc, out, err = self._run(['get', 'account'], ignore_errors=True) 333 if rc == 0: 334 self.logged_in = True 335 if not self.logged_in: 336 self.get_token() 337 except OSError as e: 338 if e.errno == errno.ENOENT: 339 module.fail_json(msg="1Password CLI tool '%s' not installed in path on control machine" % self.cli_path) 340 raise e 341 342 def run(self): 343 result = {} 344 345 self.assert_logged_in() 346 347 for term in self.terms: 348 value = self.get_field(term['name'], term['field'], term['section'], term['vault']) 349 350 if term['name'] in result: 351 # If we already have a result for this key, we have to append this result dictionary 352 # to the existing one. This is only applicable when there is a single item 353 # in 1Password which has two different fields, and we want to retrieve both of them. 354 result[term['name']].update(value) 355 else: 356 # If this is the first result for this key, simply set it. 357 result[term['name']] = value 358 359 return result 360 361 362def main(): 363 global module 364 module = AnsibleModule( 365 argument_spec=dict( 366 cli_path=dict(type='path', default='op'), 367 auto_login=dict(type='dict', options=dict( 368 subdomain=dict(type='str'), 369 username=dict(type='str'), 370 master_password=dict(required=True, type='str', no_log=True), 371 secret_key=dict(type='str', no_log=True), 372 ), default=None), 373 search_terms=dict(required=True, type='list', elements='dict'), 374 ), 375 supports_check_mode=True 376 ) 377 378 results = {'onepassword': OnePasswordInfo().run()} 379 380 module.exit_json(changed=False, **results) 381 382 383if __name__ == '__main__': 384 main() 385