1# (c) 2018 Red Hat Inc. 2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 3 4from __future__ import (absolute_import, division, print_function) 5 6__metaclass__ = type 7 8DOCUMENTATION = """ 9--- 10author: Ansible Networking Team 11httpapi : checkpoint 12short_description: HttpApi Plugin for Checkpoint devices 13description: 14 - This HttpApi plugin provides methods to connect to Checkpoint 15 devices over a HTTP(S)-based api. 16version_added: "2.8" 17options: 18 domain: 19 type: str 20 description: 21 - Specifies the domain of the Check Point device 22 vars: 23 - name: ansible_checkpoint_domain 24 api_key: 25 type: str 26 description: 27 - Login with api-key instead of user & password 28 vars: 29 - name: ansible_api_key 30""" 31 32import json 33 34from ansible.module_utils.basic import to_text 35from ansible.errors import AnsibleConnectionFailure 36from ansible.module_utils.six.moves.urllib.error import HTTPError 37from ansible.plugins.httpapi import HttpApiBase 38from ansible.module_utils.connection import ConnectionError 39 40BASE_HEADERS = { 41 'Content-Type': 'application/json', 42 'User-Agent': 'Ansible', 43} 44 45 46class HttpApi(HttpApiBase): 47 def login(self, username, password): 48 payload = {} 49 cp_domain = self.get_option('domain') 50 cp_api_key = self.get_option('api_key') 51 if cp_domain: 52 payload['domain'] = cp_domain 53 if username and password and not cp_api_key: 54 payload['user'] = username 55 payload['password'] = password 56 elif cp_api_key and not username and not password: 57 payload['api-key'] = cp_api_key 58 else: 59 raise AnsibleConnectionFailure('[Username and password] or api_key are required for login') 60 url = '/web_api/login' 61 response, response_data = self.send_request(url, payload) 62 63 try: 64 self.connection._auth = {'X-chkp-sid': response_data['sid']} 65 self.connection._session_uid = response_data['uid'] 66 except KeyError: 67 raise ConnectionError( 68 'Server returned response without token info during connection authentication: %s' % response) 69 70 def logout(self): 71 url = '/web_api/logout' 72 73 response, dummy = self.send_request(url, None) 74 75 def get_session_uid(self): 76 return self.connection._session_uid 77 78 def send_request(self, path, body_params): 79 data = json.dumps(body_params) if body_params else '{}' 80 81 try: 82 self._display_request() 83 response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS) 84 value = self._get_response_value(response_data) 85 86 return response.getcode(), self._response_to_json(value) 87 except AnsibleConnectionFailure as e: 88 return 404, e.message 89 except HTTPError as e: 90 error = json.loads(e.read()) 91 return e.code, error 92 93 def _display_request(self): 94 self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url)) 95 96 def _get_response_value(self, response_data): 97 return to_text(response_data.getvalue()) 98 99 def _response_to_json(self, response_text): 100 try: 101 return json.loads(response_text) if response_text else {} 102 # JSONDecodeError only available on Python 3.5+ 103 except ValueError: 104 raise ConnectionError('Invalid JSON response: %s' % response_text) 105