1# coding: utf-8
2# Copyright (c) 2016, 2021, Oracle and/or its affiliates.  All rights reserved.
3# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
4
5from __future__ import absolute_import
6
7import base64
8import email.utils
9import hashlib
10import io
11import functools
12import os
13
14from oci._vendor import six
15from oci.util import record_body_position_for_rewind, rewind_body, back_up_body_calculate_stream_content_length, read_stream_for_signing
16
17from ._vendor import httpsig_cffi, requests
18from .exceptions import InvalidPrivateKey, MissingPrivateKeyPassphrase
19
20from cryptography.exceptions import UnsupportedAlgorithm
21from cryptography.hazmat.backends import default_backend
22from cryptography.hazmat.primitives import serialization
23import logging
24
25logger = logging.getLogger(__name__)
26SIGNATURE_VERSION = "1"
27
28
29def load_private_key_from_file(filename, pass_phrase=None):
30    filename = os.path.expanduser(filename)
31    with io.open(filename, mode="rb") as f:
32        private_key_data = f.read().strip()
33    return load_private_key(private_key_data, pass_phrase)
34
35
36def load_private_key(secret, pass_phrase):
37    """Loads a private key that may use a pass_phrase.
38
39    Tries to correct or diagnose common errors:
40
41    - provided pass_phrase but didn't need one
42    - provided a public key
43    """
44    if isinstance(secret, six.text_type):
45        secret = secret.encode("ascii")
46    if isinstance(pass_phrase, six.text_type):
47        pass_phrase = pass_phrase.encode("ascii")
48
49    backend = default_backend()
50
51    try:
52        # 0) Try with pass_phrase
53        return serialization.load_pem_private_key(secret, pass_phrase, backend=backend)
54    except TypeError:
55        # 1) Either:
56        #    - key has pass_phrase and one wasn't provided
57        #    - key doesn't have pass_phrase and one was provided.
58        #
59        #    Can't fix the first, but we *can* fix the second.
60        #    This can happen if the DEFAULT profile has a pass_phrase but
61        #    another profile uses a key file without a pass_phrase.
62        if pass_phrase is None:
63            # 1.1) private key needed a pass_phrase and we don't have one
64            raise MissingPrivateKeyPassphrase("The provided key requires a passphrase.")
65        else:
66            # 1.2) try again without pass_phrase; could be an artifact from DEFAULT
67            return serialization.load_pem_private_key(secret, None, backend=backend)
68    except ValueError:
69        # 2) Try to determine what kind of failure this is.
70        #    Most likely, this is either a bad password or a public key.
71        #    If loading it as a public key fails, it's almost certainly a bad password.
72        for loader in [
73            serialization.load_der_public_key,
74            serialization.load_pem_public_key,
75            serialization.load_ssh_public_key
76        ]:
77            try:
78                loader(secret, backend=backend)
79            except (ValueError, UnsupportedAlgorithm):
80                # 2.1) Not a public key; try the next format
81                pass
82            else:
83                # 2.2) This is a public key
84                raise InvalidPrivateKey("Authentication requires a private key, but a public key was provided.")
85        # 2.3) Password is probably wrong.
86        raise InvalidPrivateKey("The provided key is not a private key, or the provided passphrase is incorrect.")
87
88
89def inject_missing_headers(request, sign_body, enforce_content_headers):
90    # Inject date, host, and content-type if missing
91    request.headers.setdefault(
92        "date", email.utils.formatdate(usegmt=True))
93
94    request.headers.setdefault(
95        "host", six.moves.urllib.parse.urlparse(request.url).netloc)
96
97    if hasattr(request.body, "buffer") or hasattr(request.body, "read"):
98        request.headers.setdefault("content-type", "application/octet-stream")
99    request.headers.setdefault("content-type", "application/json")
100
101    if enforce_content_headers:
102        # Requests with a body need to send content-type,
103        # content-length, and x-content-sha256
104        if "x-content-sha256" not in request.headers and sign_body:
105            body = request.body or ""
106            m = hashlib.sha256()
107            # Handle String types
108            if isinstance(body, six.string_types):
109                body = body.encode("utf-8")
110                request.headers.setdefault("content-length", str(len(body)))
111                m.update(body)
112            # Handle bytes
113            elif isinstance(body, (bytes, bytearray)):
114                m.update(body)
115            # Handling signing for Files/stdin
116            elif hasattr(body, "buffer") or hasattr(body, "read"):
117                is_body_rewindable, original_position = record_body_position_for_rewind(body)
118                if is_body_rewindable:
119                    content_length = read_stream_for_signing(m, body)
120                    if content_length == -1:
121                        raise IOError("Unable to read stream for signing! Please sign the stream yourself by using the custom header x-content-sha256")
122                    request.headers.setdefault("content-length", str(content_length))
123                    is_rewind_success = rewind_body(body, original_position)
124                    if not is_rewind_success:
125                        raise IOError("Unable to rewind request body while signing!")
126                else:
127                    logger.warning("Stream cannot be rewound, trying to backup and sign the body!")
128                    stream = back_up_body_calculate_stream_content_length(body)
129                    # Updating request body as it cannot be rewound
130                    request.body = stream.get("byte_content")
131                    m.update(stream.get("byte_content"))
132                    request.headers.setdefault("content-length", str(stream.get("content_length")))
133            # Update sha256 header
134            if m:
135                base64digest = base64.b64encode(m.digest())
136                base64string = base64digest.decode("utf-8")
137                request.headers["x-content-sha256"] = base64string
138
139
140# HeaderSigner doesn't support private keys with passwords.
141# Patched since the constructor parses the key in __init__
142class _PatchedHeaderSigner(httpsig_cffi.sign.HeaderSigner):
143    HEADER_SIGNER_TEMPLATE = 'Signature algorithm="rsa-sha256",headers="{}",keyId="{}",signature="%s",version="{}"'
144
145    """Internal.  If you need to construct a Signer, use :class:`~.Signer` instead."""
146    def __init__(self, key_id, private_key, headers):
147        # Dropped general support for the specific signing/hash the SDK uses.
148        self.sign_algorithm = "rsa"
149        self.hash_algorithm = "sha256"
150
151        self._hash = None
152        self._rsahash = httpsig_cffi.utils.HASHES[self.hash_algorithm]
153
154        self._rsa_private = private_key
155        self._rsa_public = self._rsa_private.public_key()
156
157        self.headers = headers
158        self.signature_template = self.HEADER_SIGNER_TEMPLATE.format(" ".join(headers), key_id, SIGNATURE_VERSION)
159
160    def reset_signer(self, key_id, private_key):
161        self._hash = None
162        self._rsa_private = private_key
163        self._rsa_public = self._rsa_private.public_key()
164        self.signature_template = self.HEADER_SIGNER_TEMPLATE.format(" ".join(self.headers), key_id, SIGNATURE_VERSION)
165
166
167# An abstract class whose subclasses can sign requests. This contains the core logic for creating a signer and signing
168# requests, but does not source the required information:
169#
170#   - api key
171#   - private key
172#   - headers
173#
174# As concrete implementations are expected to provide these and have their ways of sourcing/constructing them.
175class AbstractBaseSigner(requests.auth.AuthBase):
176    def create_signers(self, api_key, private_key, generic_headers, body_headers):
177        self._basic_signer = _PatchedHeaderSigner(
178            key_id=api_key,
179            private_key=private_key,
180            headers=generic_headers)
181
182        self._body_signer = _PatchedHeaderSigner(
183            key_id=api_key,
184            private_key=private_key,
185            headers=generic_headers + body_headers)
186
187    def validate_request(self, request):
188        verb = request.method.lower()
189        if verb not in ["get", "head", "delete", "put", "post", "patch"]:
190            raise ValueError("Don't know how to sign request verb {}".format(verb))
191
192    def do_request_sign(self, request, enforce_content_headers=True):
193        verb = request.method.lower()
194        sign_body = verb in ["put", "post", "patch"]
195        if sign_body and enforce_content_headers:
196            signer = self._body_signer
197        else:
198            signer = self._basic_signer
199            # The requests library sets the Transfer-Encoding header to 'chunked' if the
200            # body is a stream with 0 length. Object storage does not currently support this option,
201            # and the request will fail if it is not removed. This is the only hook available where we
202            # can do this after the header is added and before the request is sent.
203            request.headers.pop('Transfer-Encoding', None)
204
205        inject_missing_headers(request, sign_body, enforce_content_headers)
206        signed_headers = signer.sign(
207            request.headers,
208            host=six.moves.urllib.parse.urlparse(request.url).netloc,
209            method=request.method,
210            path=request.path_url)
211        request.headers.update(signed_headers)
212
213        return request
214
215    def __call__(self, request, enforce_content_headers=True):
216        self.validate_request(request)
217        return self.do_request_sign(request, enforce_content_headers)
218
219    @property
220    def without_content_headers(self):
221        return functools.partial(self, enforce_content_headers=False)
222
223
224class Signer(AbstractBaseSigner):
225    """
226    A requests auth instance that can be reused across requests. This signer is intended to be used
227    when signing requests for a given user and it requires that user's ID, their private key
228    and cerificate fingerprint.
229
230    The private key can be sourced from a file (private_key_file_location) or the PEM string can be
231    provided directly (private_key_content).
232
233    The headers to be signed by this signer are not customizable.
234
235    You can manually sign calls by creating an instance of the signer, and
236    providing it as the ``auth`` argument to Requests functions:
237
238    .. code-block:: python
239
240        import requests
241        from oci import Signer
242
243        auth = Signer(...)
244        resp = requests.get("https://...", auth=auth)
245
246
247    """
248
249    def __init__(self, tenancy, user, fingerprint, private_key_file_location, pass_phrase=None, private_key_content=None):
250        self.api_key = tenancy + "/" + user + "/" + fingerprint
251
252        if private_key_content:
253            self.private_key = load_private_key(private_key_content, pass_phrase)
254        else:
255            self.private_key = load_private_key_from_file(private_key_file_location, pass_phrase)
256
257        generic_headers = ["date", "(request-target)", "host"]
258        body_headers = ["content-length", "content-type", "x-content-sha256"]
259        self.create_signers(self.api_key, self.private_key, generic_headers, body_headers)
260
261    @staticmethod
262    def from_config(config):
263        from .config import validate_config
264        validate_config(config)
265        return Signer(
266            config['tenancy'],
267            config['user'],
268            config['fingerprint'],
269            private_key_file_location=config['key_file'],
270            pass_phrase=config.get('pass_phrase'),
271            private_key_content=config.get('key_content')
272        )
273