1# (c) 2018 Red Hat Inc. 2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 3 4from __future__ import (absolute_import, division, print_function) 5 6__metaclass__ = type 7 8DOCUMENTATION = """ 9--- 10author: Ansible Networking Team 11httpapi : checkpoint 12short_description: HttpApi Plugin for Checkpoint devices 13description: 14 - This HttpApi plugin provides methods to connect to Checkpoint 15 devices over a HTTP(S)-based api. 16version_added: "2.8" 17""" 18 19import json 20 21from ansible.module_utils.basic import to_text 22from ansible.errors import AnsibleConnectionFailure 23from ansible.module_utils.six.moves.urllib.error import HTTPError 24from ansible.plugins.httpapi import HttpApiBase 25from ansible.module_utils.connection import ConnectionError 26 27BASE_HEADERS = { 28 'Content-Type': 'application/json', 29} 30 31 32class HttpApi(HttpApiBase): 33 def login(self, username, password): 34 if username and password: 35 payload = {'user': username, 'password': password} 36 url = '/web_api/login' 37 response, response_data = self.send_request(url, payload) 38 else: 39 raise AnsibleConnectionFailure('Username and password are required for login') 40 41 try: 42 self.connection._auth = {'X-chkp-sid': response_data['sid']} 43 self.connection._session_uid = response_data['uid'] 44 except KeyError: 45 raise ConnectionError( 46 'Server returned response without token info during connection authentication: %s' % response) 47 48 def logout(self): 49 url = '/web_api/logout' 50 51 response, dummy = self.send_request(url, None) 52 53 def get_session_uid(self): 54 return self.connection._session_uid 55 56 def send_request(self, path, body_params): 57 data = json.dumps(body_params) if body_params else '{}' 58 59 try: 60 self._display_request() 61 response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS) 62 value = self._get_response_value(response_data) 63 64 return response.getcode(), self._response_to_json(value) 65 except AnsibleConnectionFailure as e: 66 return 404, e.message 67 except HTTPError as e: 68 error = json.loads(e.read()) 69 return e.code, error 70 71 def _display_request(self): 72 self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url)) 73 74 def _get_response_value(self, response_data): 75 return to_text(response_data.getvalue()) 76 77 def _response_to_json(self, response_text): 78 try: 79 return json.loads(response_text) if response_text else {} 80 # JSONDecodeError only available on Python 3.5+ 81 except ValueError: 82 raise ConnectionError('Invalid JSON response: %s' % response_text) 83