1"""Support code for CI environments."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import abc
6import base64
7import json
8import os
9import tempfile
10
11
12from .. import types as t
13
14from ..encoding import (
15    to_bytes,
16    to_text,
17)
18
19from ..io import (
20    read_text_file,
21    write_text_file,
22)
23
24from ..config import (
25    CommonConfig,
26    TestConfig,
27)
28
29from ..util import (
30    ABC,
31    ApplicationError,
32    display,
33    get_subclasses,
34    import_plugins,
35    raw_command,
36)
37
38
39class ChangeDetectionNotSupported(ApplicationError):
40    """Exception for cases where change detection is not supported."""
41
42
43class AuthContext:
44    """Context information required for Ansible Core CI authentication."""
45    def __init__(self):  # type: () -> None
46        self.region = None  # type: t.Optional[str]
47
48
49class CIProvider(ABC):
50    """Base class for CI provider plugins."""
51    priority = 500
52
53    @staticmethod
54    @abc.abstractmethod
55    def is_supported():  # type: () -> bool
56        """Return True if this provider is supported in the current running environment."""
57
58    @property
59    @abc.abstractmethod
60    def code(self):  # type: () -> str
61        """Return a unique code representing this provider."""
62
63    @property
64    @abc.abstractmethod
65    def name(self):  # type: () -> str
66        """Return descriptive name for this provider."""
67
68    @abc.abstractmethod
69    def generate_resource_prefix(self):  # type: () -> str
70        """Return a resource prefix specific to this CI provider."""
71
72    @abc.abstractmethod
73    def get_base_branch(self):  # type: () -> str
74        """Return the base branch or an empty string."""
75
76    @abc.abstractmethod
77    def detect_changes(self, args):  # type: (TestConfig) -> t.Optional[t.List[str]]
78        """Initialize change detection."""
79
80    @abc.abstractmethod
81    def supports_core_ci_auth(self, context):  # type: (AuthContext) -> bool
82        """Return True if Ansible Core CI is supported."""
83
84    @abc.abstractmethod
85    def prepare_core_ci_auth(self, context):  # type: (AuthContext) -> t.Dict[str, t.Any]
86        """Return authentication details for Ansible Core CI."""
87
88    @abc.abstractmethod
89    def get_git_details(self, args):  # type: (CommonConfig) -> t.Optional[t.Dict[str, t.Any]]
90        """Return details about git in the current environment."""
91
92
93def get_ci_provider():  # type: () -> CIProvider
94    """Return a CI provider instance for the current environment."""
95    try:
96        return get_ci_provider.provider
97    except AttributeError:
98        pass
99
100    provider = None
101
102    import_plugins('ci')
103
104    candidates = sorted(get_subclasses(CIProvider), key=lambda c: (c.priority, c.__name__))
105
106    for candidate in candidates:
107        if candidate.is_supported():
108            provider = candidate()
109            break
110
111    if provider.code:
112        display.info('Detected CI provider: %s' % provider.name)
113
114    get_ci_provider.provider = provider
115
116    return provider
117
118
119class AuthHelper(ABC):
120    """Public key based authentication helper for Ansible Core CI."""
121    def sign_request(self, request):  # type: (t.Dict[str, t.Any]) -> None
122        """Sign the given auth request and make the public key available."""
123        payload_bytes = to_bytes(json.dumps(request, sort_keys=True))
124        signature_raw_bytes = self.sign_bytes(payload_bytes)
125        signature = to_text(base64.b64encode(signature_raw_bytes))
126
127        request.update(signature=signature)
128
129    def initialize_private_key(self):  # type: () -> str
130        """
131        Initialize and publish a new key pair (if needed) and return the private key.
132        The private key is cached across ansible-test invocations so it is only generated and published once per CI job.
133        """
134        path = os.path.expanduser('~/.ansible-core-ci-private.key')
135
136        if os.path.exists(to_bytes(path)):
137            private_key_pem = read_text_file(path)
138        else:
139            private_key_pem = self.generate_private_key()
140            write_text_file(path, private_key_pem)
141
142        return private_key_pem
143
144    @abc.abstractmethod
145    def sign_bytes(self, payload_bytes):  # type: (bytes) -> bytes
146        """Sign the given payload and return the signature, initializing a new key pair if required."""
147
148    @abc.abstractmethod
149    def publish_public_key(self, public_key_pem):  # type: (str) -> None
150        """Publish the given public key."""
151
152    @abc.abstractmethod
153    def generate_private_key(self):  # type: () -> str
154        """Generate a new key pair, publishing the public key and returning the private key."""
155
156
157class CryptographyAuthHelper(AuthHelper, ABC):  # pylint: disable=abstract-method
158    """Cryptography based public key based authentication helper for Ansible Core CI."""
159    def sign_bytes(self, payload_bytes):  # type: (bytes) -> bytes
160        """Sign the given payload and return the signature, initializing a new key pair if required."""
161        # import cryptography here to avoid overhead and failures in environments which do not use/provide it
162        from cryptography.hazmat.backends import default_backend
163        from cryptography.hazmat.primitives import hashes
164        from cryptography.hazmat.primitives.asymmetric import ec
165        from cryptography.hazmat.primitives.serialization import load_pem_private_key
166
167        private_key_pem = self.initialize_private_key()
168        private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend())
169
170        signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256()))
171
172        return signature_raw_bytes
173
174    def generate_private_key(self):  # type: () -> str
175        """Generate a new key pair, publishing the public key and returning the private key."""
176        # import cryptography here to avoid overhead and failures in environments which do not use/provide it
177        from cryptography.hazmat.backends import default_backend
178        from cryptography.hazmat.primitives import serialization
179        from cryptography.hazmat.primitives.asymmetric import ec
180
181        private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
182        public_key = private_key.public_key()
183
184        private_key_pem = to_text(private_key.private_bytes(
185            encoding=serialization.Encoding.PEM,
186            format=serialization.PrivateFormat.PKCS8,
187            encryption_algorithm=serialization.NoEncryption(),
188        ))
189
190        public_key_pem = to_text(public_key.public_bytes(
191            encoding=serialization.Encoding.PEM,
192            format=serialization.PublicFormat.SubjectPublicKeyInfo,
193        ))
194
195        self.publish_public_key(public_key_pem)
196
197        return private_key_pem
198
199
200class OpenSSLAuthHelper(AuthHelper, ABC):  # pylint: disable=abstract-method
201    """OpenSSL based public key based authentication helper for Ansible Core CI."""
202    def sign_bytes(self, payload_bytes):  # type: (bytes) -> bytes
203        """Sign the given payload and return the signature, initializing a new key pair if required."""
204        private_key_pem = self.initialize_private_key()
205
206        with tempfile.NamedTemporaryFile() as private_key_file:
207            private_key_file.write(to_bytes(private_key_pem))
208            private_key_file.flush()
209
210            with tempfile.NamedTemporaryFile() as payload_file:
211                payload_file.write(payload_bytes)
212                payload_file.flush()
213
214                with tempfile.NamedTemporaryFile() as signature_file:
215                    raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True)
216                    signature_raw_bytes = signature_file.read()
217
218        return signature_raw_bytes
219
220    def generate_private_key(self):  # type: () -> str
221        """Generate a new key pair, publishing the public key and returning the private key."""
222        private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0]
223        public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0]
224
225        self.publish_public_key(public_key_pem)
226
227        return private_key_pem
228