"""
Create and verify ANSI X9.31 RSA signatures using OpenSSL libcrypto
"""


import ctypes.util
import glob
import os
import platform
import sys
from ctypes import c_char_p, c_int, c_void_p, cdll, create_string_buffer, pointer

import salt.utils.platform
import salt.utils.stringutils

# Constants taken from openssl-1.1.0c/include/openssl/crypto.h
OPENSSL_INIT_ADD_ALL_CIPHERS = 0x00000004
OPENSSL_INIT_ADD_ALL_DIGESTS = 0x00000008
OPENSSL_INIT_NO_LOAD_CONFIG = 0x00000080


def _find_libcrypto():
    """
    Find the path (or return the short name) of libcrypto.

    :rtype: str
    :return: A library path or short name to hand to ``cdll.LoadLibrary``
    :raises OSError: If no candidate libcrypto could be located
    """
    if sys.platform.startswith("win"):
        lib = "libeay32"
    elif salt.utils.platform.is_darwin():
        # Several locations are probed, in this order: Salt's own package,
        # Homebrew, MacPorts and finally the system library. The first
        # location that yields a match wins.
        # Look in Salt's package install location.
        lib = glob.glob("/opt/salt/lib/libcrypto.dylib")
        # Find library symlinks in Homebrew locations.
        brew_prefix = os.getenv("HOMEBREW_PREFIX", "/usr/local")
        lib = lib or glob.glob(
            os.path.join(brew_prefix, "opt/openssl/lib/libcrypto.dylib")
        )
        lib = lib or glob.glob(
            os.path.join(brew_prefix, "opt/openssl@*/lib/libcrypto.dylib")
        )
        # Look in MacPorts.
        lib = lib or glob.glob("/opt/local/lib/libcrypto.dylib")
        # On 10.15 the regular libcrypto.dylib is just a false pointer, so
        # look for the versioned libraries instead.
        if platform.mac_ver()[0].split(".")[:2] == ["10", "15"]:
            lib = lib or glob.glob("/usr/lib/libcrypto.*.dylib")
            # Descending sort so that the lexicographically greatest file
            # name is the one picked by lib[0] below.
            lib = sorted(lib, reverse=True)
        elif int(platform.mac_ver()[0].split(".")[0]) < 11:
            # Fall back on system libcrypto (only works before Big Sur)
            lib = lib or ["/usr/lib/libcrypto.dylib"]
        lib = lib[0] if lib else None
    elif getattr(sys, "frozen", False) and salt.utils.platform.is_smartos():
        # Frozen build on SmartOS: look next to the running executable.
        lib = glob.glob(os.path.join(os.path.dirname(sys.executable), "libcrypto.so*"))
        lib = lib[0] if lib else None
    else:
        lib = ctypes.util.find_library("crypto")
        if not lib:
            if salt.utils.platform.is_sunos():
                # Solaris-like distributions that use pkgsrc have libraries
                # in a non-standard location.
                # (SmartOS, OmniOS, OpenIndiana, ...)
                # This could be /opt/tools/lib (Global Zone) or
                # /opt/local/lib (non-Global Zone), thus the two checks
                # below
                lib = glob.glob("/opt/saltstack/salt/run/libcrypto.so*")
                lib = lib or glob.glob("/opt/local/lib/libcrypto.so*")
                lib = lib or glob.glob("/opt/tools/lib/libcrypto.so*")
                lib = lib[0] if lib else None
            elif salt.utils.platform.is_aix():
                if os.path.isdir("/opt/saltstack/salt/run") or os.path.isdir(
                    "/opt/salt/lib"
                ):
                    # preference for Salt installed fileset
                    lib = glob.glob("/opt/saltstack/salt/run/libcrypto.so*")
                    lib = lib or glob.glob("/opt/salt/lib/libcrypto.so*")
                else:
                    lib = glob.glob("/opt/freeware/lib/libcrypto.so*")
                lib = lib[0] if lib else None
    if not lib:
        raise OSError("Cannot locate OpenSSL libcrypto")
    return lib


def _load_libcrypto():
    """
    Attempt to load libcrypto.

    :return: The ctypes library handle for the libcrypto found by
        ``_find_libcrypto``
    :raises OSError: If libcrypto cannot be located
    """
    return cdll.LoadLibrary(_find_libcrypto())


def _init_libcrypto():
    """
    Set up libcrypto argtypes and initialize the library

    :return: The loaded libcrypto handle with the prototypes of the functions
        used by this module declared
    """
    libcrypto = _load_libcrypto()

    try:
        # If we're greater than OpenSSL 1.1.0, no need to do the init
        openssl_version_num = libcrypto.OpenSSL_version_num
        if callable(openssl_version_num):
            openssl_version_num = openssl_version_num()
        if openssl_version_num < 0x10100000:
            # NOTE(review): called without arguments here; confirm against
            # the OPENSSL_init_crypto prototype if this branch is ever hit.
            libcrypto.OPENSSL_init_crypto()
    except AttributeError:
        # Support for OpenSSL < 1.1 (OPENSSL_API_COMPAT < 0x10100000L)
        libcrypto.OPENSSL_no_config()
        libcrypto.OPENSSL_add_all_algorithms_noconf()

    libcrypto.RSA_new.argtypes = ()
    libcrypto.RSA_new.restype = c_void_p
    libcrypto.RSA_free.argtypes = (c_void_p,)
    # ctypes only honours ``argtypes`` (a sequence); a misspelt ``argtype``
    # attribute would be silently ignored and leave the prototype undeclared.
    libcrypto.RSA_size.argtypes = (c_void_p,)
    libcrypto.BIO_new_mem_buf.argtypes = (c_char_p, c_int)
    libcrypto.BIO_new_mem_buf.restype = c_void_p
    libcrypto.BIO_free.argtypes = (c_void_p,)
    libcrypto.PEM_read_bio_RSAPrivateKey.argtypes = (
        c_void_p,
        c_void_p,
        c_void_p,
        c_void_p,
    )
    libcrypto.PEM_read_bio_RSAPrivateKey.restype = c_void_p
    libcrypto.PEM_read_bio_RSA_PUBKEY.argtypes = (
        c_void_p,
        c_void_p,
        c_void_p,
        c_void_p,
    )
    libcrypto.PEM_read_bio_RSA_PUBKEY.restype = c_void_p
    libcrypto.RSA_private_encrypt.argtypes = (
        c_int,
        c_char_p,
        c_char_p,
        c_void_p,
        c_int,
    )
    libcrypto.RSA_public_decrypt.argtypes = (c_int, c_char_p, c_char_p, c_void_p, c_int)

    return libcrypto


libcrypto = _init_libcrypto()

# openssl/rsa.h:#define RSA_X931_PADDING 5
RSA_X931_PADDING = 5


class RSAX931Signer:
    """
    Create ANSI X9.31 RSA signatures using OpenSSL libcrypto
    """

    def __init__(self, keydata):
        """
        Init an RSAX931Signer instance

        :param str keydata: The RSA private key in PEM format
        :raises ValueError: If libcrypto cannot read the key
        """
        keydata = salt.utils.stringutils.to_bytes(keydata, "ascii")
        self._bio = libcrypto.BIO_new_mem_buf(keydata, len(keydata))
        self._rsa = c_void_p(libcrypto.RSA_new())
        if not libcrypto.PEM_read_bio_RSAPrivateKey(
            self._bio, pointer(self._rsa), None, None
        ):
            raise ValueError("invalid RSA private key")

    # pylint: disable=W1701
    def __del__(self):
        # __init__ can raise before either handle is assigned (for example
        # when the key data is not ASCII), so only release what exists.
        bio = getattr(self, "_bio", None)
        if bio is not None:
            libcrypto.BIO_free(bio)
        rsa = getattr(self, "_rsa", None)
        if rsa is not None:
            libcrypto.RSA_free(rsa)

    # pylint: enable=W1701

    def sign(self, msg):
        """
        Sign a message (digest) using the private key

        :param str msg: The message (digest) to sign; it is converted to
            bytes before being passed to libcrypto
        :rtype: bytes
        :return: The signature
        :raises ValueError: If the encryption failed
        """
        # Allocate a buffer large enough for the signature. Freed by ctypes.
        buf = create_string_buffer(libcrypto.RSA_size(self._rsa))
        msg = salt.utils.stringutils.to_bytes(msg)
        size = libcrypto.RSA_private_encrypt(
            len(msg), msg, buf, self._rsa, RSA_X931_PADDING
        )
        if size < 0:
            raise ValueError("Unable to encrypt message")
        return buf[0:size]


class RSAX931Verifier:
    """
    Verify ANSI X9.31 RSA signatures using OpenSSL libcrypto
    """

    def __init__(self, pubdata):
        """
        Init an RSAX931Verifier instance

        :param str pubdata: The RSA public key in PEM format
        :raises ValueError: If libcrypto cannot read the key
        """
        pubdata = salt.utils.stringutils.to_bytes(pubdata, "ascii")
        # Drop every "RSA " occurrence before handing the data to
        # PEM_read_bio_RSA_PUBKEY.
        # NOTE(review): presumably this turns "BEGIN RSA PUBLIC KEY" headers
        # into "BEGIN PUBLIC KEY" - confirm.
        pubdata = pubdata.replace(b"RSA ", b"")
        self._bio = libcrypto.BIO_new_mem_buf(pubdata, len(pubdata))
        self._rsa = c_void_p(libcrypto.RSA_new())
        if not libcrypto.PEM_read_bio_RSA_PUBKEY(
            self._bio, pointer(self._rsa), None, None
        ):
            raise ValueError("invalid RSA public key")

    # pylint: disable=W1701
    def __del__(self):
        # __init__ can raise before either handle is assigned (for example
        # when the key data is not ASCII), so only release what exists.
        bio = getattr(self, "_bio", None)
        if bio is not None:
            libcrypto.BIO_free(bio)
        rsa = getattr(self, "_rsa", None)
        if rsa is not None:
            libcrypto.RSA_free(rsa)

    # pylint: enable=W1701

    def verify(self, signed):
        """
        Recover the message (digest) from the signature using the public key

        :param str signed: The signature created with the private key; it is
            converted to bytes before being passed to libcrypto
        :rtype: bytes
        :return: The message (digest) recovered from the signature
        :raises ValueError: If the decryption failed
        """
        # Allocate a buffer large enough for the signature. Freed by ctypes.
        buf = create_string_buffer(libcrypto.RSA_size(self._rsa))
        signed = salt.utils.stringutils.to_bytes(signed)
        size = libcrypto.RSA_public_decrypt(
            len(signed), signed, buf, self._rsa, RSA_X931_PADDING
        )
        if size < 0:
            raise ValueError("Unable to decrypt message")
        return buf[0:size]