1# coding: utf-8 2# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved. 3# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license. 4 5from __future__ import absolute_import 6 7import base64 8import email.utils 9import hashlib 10import io 11import functools 12import os 13 14from oci._vendor import six 15from oci.util import record_body_position_for_rewind, rewind_body, back_up_body_calculate_stream_content_length, read_stream_for_signing 16 17from ._vendor import httpsig_cffi, requests 18from .exceptions import InvalidPrivateKey, MissingPrivateKeyPassphrase 19 20from cryptography.exceptions import UnsupportedAlgorithm 21from cryptography.hazmat.backends import default_backend 22from cryptography.hazmat.primitives import serialization 23import logging 24 25logger = logging.getLogger(__name__) 26SIGNATURE_VERSION = "1" 27 28 29def load_private_key_from_file(filename, pass_phrase=None): 30 filename = os.path.expanduser(filename) 31 with io.open(filename, mode="rb") as f: 32 private_key_data = f.read().strip() 33 return load_private_key(private_key_data, pass_phrase) 34 35 36def load_private_key(secret, pass_phrase): 37 """Loads a private key that may use a pass_phrase. 38 39 Tries to correct or diagnose common errors: 40 41 - provided pass_phrase but didn't need one 42 - provided a public key 43 """ 44 if isinstance(secret, six.text_type): 45 secret = secret.encode("ascii") 46 if isinstance(pass_phrase, six.text_type): 47 pass_phrase = pass_phrase.encode("ascii") 48 49 backend = default_backend() 50 51 try: 52 # 0) Try with pass_phrase 53 return serialization.load_pem_private_key(secret, pass_phrase, backend=backend) 54 except TypeError: 55 # 1) Either: 56 # - key has pass_phrase and one wasn't provided 57 # - key doesn't have pass_phrase and one was provided. 58 # 59 # Can't fix the first, but we *can* fix the second. 60 # This can happen if the DEFAULT profile has a pass_phrase but 61 # another profile uses a key file without a pass_phrase. 62 if pass_phrase is None: 63 # 1.1) private key needed a pass_phrase and we don't have one 64 raise MissingPrivateKeyPassphrase("The provided key requires a passphrase.") 65 else: 66 # 1.2) try again without pass_phrase; could be an artifact from DEFAULT 67 return serialization.load_pem_private_key(secret, None, backend=backend) 68 except ValueError: 69 # 2) Try to determine what kind of failure this is. 70 # Most likely, this is either a bad password or a public key. 71 # If loading it as a public key fails, it's almost certainly a bad password. 72 for loader in [ 73 serialization.load_der_public_key, 74 serialization.load_pem_public_key, 75 serialization.load_ssh_public_key 76 ]: 77 try: 78 loader(secret, backend=backend) 79 except (ValueError, UnsupportedAlgorithm): 80 # 2.1) Not a public key; try the next format 81 pass 82 else: 83 # 2.2) This is a public key 84 raise InvalidPrivateKey("Authentication requires a private key, but a public key was provided.") 85 # 2.3) Password is probably wrong. 86 raise InvalidPrivateKey("The provided key is not a private key, or the provided passphrase is incorrect.") 87 88 89def inject_missing_headers(request, sign_body, enforce_content_headers): 90 # Inject date, host, and content-type if missing 91 request.headers.setdefault( 92 "date", email.utils.formatdate(usegmt=True)) 93 94 request.headers.setdefault( 95 "host", six.moves.urllib.parse.urlparse(request.url).netloc) 96 97 if hasattr(request.body, "buffer") or hasattr(request.body, "read"): 98 request.headers.setdefault("content-type", "application/octet-stream") 99 request.headers.setdefault("content-type", "application/json") 100 101 if enforce_content_headers: 102 # Requests with a body need to send content-type, 103 # content-length, and x-content-sha256 104 if "x-content-sha256" not in request.headers and sign_body: 105 body = request.body or "" 106 m = hashlib.sha256() 107 # Handle String types 108 if isinstance(body, six.string_types): 109 body = body.encode("utf-8") 110 request.headers.setdefault("content-length", str(len(body))) 111 m.update(body) 112 # Handle bytes 113 elif isinstance(body, (bytes, bytearray)): 114 m.update(body) 115 # Handling signing for Files/stdin 116 elif hasattr(body, "buffer") or hasattr(body, "read"): 117 is_body_rewindable, original_position = record_body_position_for_rewind(body) 118 if is_body_rewindable: 119 content_length = read_stream_for_signing(m, body) 120 if content_length == -1: 121 raise IOError("Unable to read stream for signing! Please sign the stream yourself by using the custom header x-content-sha256") 122 request.headers.setdefault("content-length", str(content_length)) 123 is_rewind_success = rewind_body(body, original_position) 124 if not is_rewind_success: 125 raise IOError("Unable to rewind request body while signing!") 126 else: 127 logger.warning("Stream cannot be rewound, trying to backup and sign the body!") 128 stream = back_up_body_calculate_stream_content_length(body) 129 # Updating request body as it cannot be rewound 130 request.body = stream.get("byte_content") 131 m.update(stream.get("byte_content")) 132 request.headers.setdefault("content-length", str(stream.get("content_length"))) 133 # Update sha256 header 134 if m: 135 base64digest = base64.b64encode(m.digest()) 136 base64string = base64digest.decode("utf-8") 137 request.headers["x-content-sha256"] = base64string 138 139 140# HeaderSigner doesn't support private keys with passwords. 141# Patched since the constructor parses the key in __init__ 142class _PatchedHeaderSigner(httpsig_cffi.sign.HeaderSigner): 143 HEADER_SIGNER_TEMPLATE = 'Signature algorithm="rsa-sha256",headers="{}",keyId="{}",signature="%s",version="{}"' 144 145 """Internal. If you need to construct a Signer, use :class:`~.Signer` instead.""" 146 def __init__(self, key_id, private_key, headers): 147 # Dropped general support for the specific signing/hash the SDK uses. 148 self.sign_algorithm = "rsa" 149 self.hash_algorithm = "sha256" 150 151 self._hash = None 152 self._rsahash = httpsig_cffi.utils.HASHES[self.hash_algorithm] 153 154 self._rsa_private = private_key 155 self._rsa_public = self._rsa_private.public_key() 156 157 self.headers = headers 158 self.signature_template = self.HEADER_SIGNER_TEMPLATE.format(" ".join(headers), key_id, SIGNATURE_VERSION) 159 160 def reset_signer(self, key_id, private_key): 161 self._hash = None 162 self._rsa_private = private_key 163 self._rsa_public = self._rsa_private.public_key() 164 self.signature_template = self.HEADER_SIGNER_TEMPLATE.format(" ".join(self.headers), key_id, SIGNATURE_VERSION) 165 166 167# An abstract class whose subclasses can sign requests. This contains the core logic for creating a signer and signing 168# requests, but does not source the required information: 169# 170# - api key 171# - private key 172# - headers 173# 174# As concrete implementations are expected to provide these and have their ways of sourcing/constructing them. 175class AbstractBaseSigner(requests.auth.AuthBase): 176 def create_signers(self, api_key, private_key, generic_headers, body_headers): 177 self._basic_signer = _PatchedHeaderSigner( 178 key_id=api_key, 179 private_key=private_key, 180 headers=generic_headers) 181 182 self._body_signer = _PatchedHeaderSigner( 183 key_id=api_key, 184 private_key=private_key, 185 headers=generic_headers + body_headers) 186 187 def validate_request(self, request): 188 verb = request.method.lower() 189 if verb not in ["get", "head", "delete", "put", "post", "patch"]: 190 raise ValueError("Don't know how to sign request verb {}".format(verb)) 191 192 def do_request_sign(self, request, enforce_content_headers=True): 193 verb = request.method.lower() 194 sign_body = verb in ["put", "post", "patch"] 195 if sign_body and enforce_content_headers: 196 signer = self._body_signer 197 else: 198 signer = self._basic_signer 199 # The requests library sets the Transfer-Encoding header to 'chunked' if the 200 # body is a stream with 0 length. Object storage does not currently support this option, 201 # and the request will fail if it is not removed. This is the only hook available where we 202 # can do this after the header is added and before the request is sent. 203 request.headers.pop('Transfer-Encoding', None) 204 205 inject_missing_headers(request, sign_body, enforce_content_headers) 206 signed_headers = signer.sign( 207 request.headers, 208 host=six.moves.urllib.parse.urlparse(request.url).netloc, 209 method=request.method, 210 path=request.path_url) 211 request.headers.update(signed_headers) 212 213 return request 214 215 def __call__(self, request, enforce_content_headers=True): 216 self.validate_request(request) 217 return self.do_request_sign(request, enforce_content_headers) 218 219 @property 220 def without_content_headers(self): 221 return functools.partial(self, enforce_content_headers=False) 222 223 224class Signer(AbstractBaseSigner): 225 """ 226 A requests auth instance that can be reused across requests. This signer is intended to be used 227 when signing requests for a given user and it requires that user's ID, their private key 228 and cerificate fingerprint. 229 230 The private key can be sourced from a file (private_key_file_location) or the PEM string can be 231 provided directly (private_key_content). 232 233 The headers to be signed by this signer are not customizable. 234 235 You can manually sign calls by creating an instance of the signer, and 236 providing it as the ``auth`` argument to Requests functions: 237 238 .. code-block:: python 239 240 import requests 241 from oci import Signer 242 243 auth = Signer(...) 244 resp = requests.get("https://...", auth=auth) 245 246 247 """ 248 249 def __init__(self, tenancy, user, fingerprint, private_key_file_location, pass_phrase=None, private_key_content=None): 250 self.api_key = tenancy + "/" + user + "/" + fingerprint 251 252 if private_key_content: 253 self.private_key = load_private_key(private_key_content, pass_phrase) 254 else: 255 self.private_key = load_private_key_from_file(private_key_file_location, pass_phrase) 256 257 generic_headers = ["date", "(request-target)", "host"] 258 body_headers = ["content-length", "content-type", "x-content-sha256"] 259 self.create_signers(self.api_key, self.private_key, generic_headers, body_headers) 260 261 @staticmethod 262 def from_config(config): 263 from .config import validate_config 264 validate_config(config) 265 return Signer( 266 config['tenancy'], 267 config['user'], 268 config['fingerprint'], 269 private_key_file_location=config['key_file'], 270 pass_phrase=config.get('pass_phrase'), 271 private_key_content=config.get('key_content') 272 ) 273