1"""Support code for CI environments.""" 2from __future__ import (absolute_import, division, print_function) 3__metaclass__ = type 4 5import abc 6import base64 7import json 8import os 9import tempfile 10 11 12from .. import types as t 13 14from ..encoding import ( 15 to_bytes, 16 to_text, 17) 18 19from ..io import ( 20 read_text_file, 21 write_text_file, 22) 23 24from ..config import ( 25 CommonConfig, 26 TestConfig, 27) 28 29from ..util import ( 30 ABC, 31 ApplicationError, 32 display, 33 get_subclasses, 34 import_plugins, 35 raw_command, 36) 37 38 39class ChangeDetectionNotSupported(ApplicationError): 40 """Exception for cases where change detection is not supported.""" 41 42 43class AuthContext: 44 """Context information required for Ansible Core CI authentication.""" 45 def __init__(self): # type: () -> None 46 pass 47 48 49class CIProvider(ABC): 50 """Base class for CI provider plugins.""" 51 priority = 500 52 53 @staticmethod 54 @abc.abstractmethod 55 def is_supported(): # type: () -> bool 56 """Return True if this provider is supported in the current running environment.""" 57 58 @property 59 @abc.abstractmethod 60 def code(self): # type: () -> str 61 """Return a unique code representing this provider.""" 62 63 @property 64 @abc.abstractmethod 65 def name(self): # type: () -> str 66 """Return descriptive name for this provider.""" 67 68 @abc.abstractmethod 69 def generate_resource_prefix(self): # type: () -> str 70 """Return a resource prefix specific to this CI provider.""" 71 72 @abc.abstractmethod 73 def get_base_branch(self): # type: () -> str 74 """Return the base branch or an empty string.""" 75 76 @abc.abstractmethod 77 def detect_changes(self, args): # type: (TestConfig) -> t.Optional[t.List[str]] 78 """Initialize change detection.""" 79 80 @abc.abstractmethod 81 def supports_core_ci_auth(self, context): # type: (AuthContext) -> bool 82 """Return True if Ansible Core CI is supported.""" 83 84 @abc.abstractmethod 85 def prepare_core_ci_auth(self, context): # type: (AuthContext) -> t.Dict[str, t.Any] 86 """Return authentication details for Ansible Core CI.""" 87 88 @abc.abstractmethod 89 def get_git_details(self, args): # type: (CommonConfig) -> t.Optional[t.Dict[str, t.Any]] 90 """Return details about git in the current environment.""" 91 92 93def get_ci_provider(): # type: () -> CIProvider 94 """Return a CI provider instance for the current environment.""" 95 try: 96 return get_ci_provider.provider 97 except AttributeError: 98 pass 99 100 provider = None 101 102 import_plugins('ci') 103 104 candidates = sorted(get_subclasses(CIProvider), key=lambda c: (c.priority, c.__name__)) 105 106 for candidate in candidates: 107 if candidate.is_supported(): 108 provider = candidate() 109 break 110 111 if provider.code: 112 display.info('Detected CI provider: %s' % provider.name) 113 114 get_ci_provider.provider = provider 115 116 return provider 117 118 119class AuthHelper(ABC): 120 """Public key based authentication helper for Ansible Core CI.""" 121 def sign_request(self, request): # type: (t.Dict[str, t.Any]) -> None 122 """Sign the given auth request and make the public key available.""" 123 payload_bytes = to_bytes(json.dumps(request, sort_keys=True)) 124 signature_raw_bytes = self.sign_bytes(payload_bytes) 125 signature = to_text(base64.b64encode(signature_raw_bytes)) 126 127 request.update(signature=signature) 128 129 def initialize_private_key(self): # type: () -> str 130 """ 131 Initialize and publish a new key pair (if needed) and return the private key. 132 The private key is cached across ansible-test invocations so it is only generated and published once per CI job. 133 """ 134 path = os.path.expanduser('~/.ansible-core-ci-private.key') 135 136 if os.path.exists(to_bytes(path)): 137 private_key_pem = read_text_file(path) 138 else: 139 private_key_pem = self.generate_private_key() 140 write_text_file(path, private_key_pem) 141 142 return private_key_pem 143 144 @abc.abstractmethod 145 def sign_bytes(self, payload_bytes): # type: (bytes) -> bytes 146 """Sign the given payload and return the signature, initializing a new key pair if required.""" 147 148 @abc.abstractmethod 149 def publish_public_key(self, public_key_pem): # type: (str) -> None 150 """Publish the given public key.""" 151 152 @abc.abstractmethod 153 def generate_private_key(self): # type: () -> str 154 """Generate a new key pair, publishing the public key and returning the private key.""" 155 156 157class CryptographyAuthHelper(AuthHelper, ABC): # pylint: disable=abstract-method 158 """Cryptography based public key based authentication helper for Ansible Core CI.""" 159 def sign_bytes(self, payload_bytes): # type: (bytes) -> bytes 160 """Sign the given payload and return the signature, initializing a new key pair if required.""" 161 # import cryptography here to avoid overhead and failures in environments which do not use/provide it 162 from cryptography.hazmat.backends import default_backend 163 from cryptography.hazmat.primitives import hashes 164 from cryptography.hazmat.primitives.asymmetric import ec 165 from cryptography.hazmat.primitives.serialization import load_pem_private_key 166 167 private_key_pem = self.initialize_private_key() 168 private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend()) 169 170 signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256())) 171 172 return signature_raw_bytes 173 174 def generate_private_key(self): # type: () -> str 175 """Generate a new key pair, publishing the public key and returning the private key.""" 176 # import cryptography here to avoid overhead and failures in environments which do not use/provide it 177 from cryptography.hazmat.backends import default_backend 178 from cryptography.hazmat.primitives import serialization 179 from cryptography.hazmat.primitives.asymmetric import ec 180 181 private_key = ec.generate_private_key(ec.SECP384R1(), default_backend()) 182 public_key = private_key.public_key() 183 184 # noinspection PyUnresolvedReferences 185 private_key_pem = to_text(private_key.private_bytes( 186 encoding=serialization.Encoding.PEM, 187 format=serialization.PrivateFormat.PKCS8, 188 encryption_algorithm=serialization.NoEncryption(), 189 )) 190 191 # noinspection PyTypeChecker 192 public_key_pem = to_text(public_key.public_bytes( 193 encoding=serialization.Encoding.PEM, 194 format=serialization.PublicFormat.SubjectPublicKeyInfo, 195 )) 196 197 self.publish_public_key(public_key_pem) 198 199 return private_key_pem 200 201 202class OpenSSLAuthHelper(AuthHelper, ABC): # pylint: disable=abstract-method 203 """OpenSSL based public key based authentication helper for Ansible Core CI.""" 204 def sign_bytes(self, payload_bytes): # type: (bytes) -> bytes 205 """Sign the given payload and return the signature, initializing a new key pair if required.""" 206 private_key_pem = self.initialize_private_key() 207 208 with tempfile.NamedTemporaryFile() as private_key_file: 209 private_key_file.write(to_bytes(private_key_pem)) 210 private_key_file.flush() 211 212 with tempfile.NamedTemporaryFile() as payload_file: 213 payload_file.write(payload_bytes) 214 payload_file.flush() 215 216 with tempfile.NamedTemporaryFile() as signature_file: 217 raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True) 218 signature_raw_bytes = signature_file.read() 219 220 return signature_raw_bytes 221 222 def generate_private_key(self): # type: () -> str 223 """Generate a new key pair, publishing the public key and returning the private key.""" 224 private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0] 225 public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0] 226 227 self.publish_public_key(public_key_pem) 228 229 return private_key_pem 230