1from coincurve.context import GLOBAL_CONTEXT, Context 2from coincurve.types import Hasher 3from coincurve.utils import bytes_to_int, int_to_bytes, sha256 4 5from ._libsecp256k1 import ffi, lib 6 7MAX_SIG_LENGTH = 72 8CDATA_SIG_LENGTH = 64 9 10 11def cdata_to_der(cdata, context: Context = GLOBAL_CONTEXT) -> bytes: 12 der = ffi.new('unsigned char[%d]' % MAX_SIG_LENGTH) 13 der_length = ffi.new('size_t *', MAX_SIG_LENGTH) 14 15 lib.secp256k1_ecdsa_signature_serialize_der(context.ctx, der, der_length, cdata) 16 17 return bytes(ffi.buffer(der, der_length[0])) 18 19 20def der_to_cdata(der: bytes, context: Context = GLOBAL_CONTEXT): 21 cdata = ffi.new('secp256k1_ecdsa_signature *') 22 parsed = lib.secp256k1_ecdsa_signature_parse_der(context.ctx, cdata, der, len(der)) 23 24 if not parsed: 25 raise ValueError('The DER-encoded signature could not be parsed.') 26 27 return cdata 28 29 30def recover(message: bytes, recover_sig, hasher: Hasher = sha256, context: Context = GLOBAL_CONTEXT): 31 msg_hash = hasher(message) if hasher is not None else message 32 if len(msg_hash) != 32: 33 raise ValueError('Message hash must be 32 bytes long.') 34 pubkey = ffi.new('secp256k1_pubkey *') 35 36 recovered = lib.secp256k1_ecdsa_recover(context.ctx, pubkey, recover_sig, msg_hash) 37 if recovered: 38 return pubkey 39 raise ValueError('failed to recover ECDSA public key') 40 41 42def serialize_recoverable(recover_sig, context: Context = GLOBAL_CONTEXT) -> bytes: 43 output = ffi.new('unsigned char[%d]' % CDATA_SIG_LENGTH) 44 recid = ffi.new('int *') 45 46 lib.secp256k1_ecdsa_recoverable_signature_serialize_compact(context.ctx, output, recid, recover_sig) 47 48 return bytes(ffi.buffer(output, CDATA_SIG_LENGTH)) + int_to_bytes(recid[0]) 49 50 51def deserialize_recoverable(serialized: bytes, context: Context = GLOBAL_CONTEXT): 52 if len(serialized) != 65: 53 raise ValueError('Serialized signature must be 65 bytes long.') 54 55 ser_sig, rec_id = serialized[:64], bytes_to_int(serialized[64:]) 56 57 if not 0 <= rec_id <= 3: 58 raise ValueError('Invalid recovery id.') 59 60 recover_sig = ffi.new('secp256k1_ecdsa_recoverable_signature *') 61 62 parsed = lib.secp256k1_ecdsa_recoverable_signature_parse_compact(context.ctx, recover_sig, ser_sig, rec_id) 63 if not parsed: 64 raise ValueError('Failed to parse recoverable signature.') 65 66 return recover_sig 67 68 69""" 70Warning: 71 The functions below may change and are not tested! 72""" 73 74 75def serialize_compact(raw_sig, context: Context = GLOBAL_CONTEXT): # no cov 76 output = ffi.new('unsigned char[%d]' % CDATA_SIG_LENGTH) 77 78 res = lib.secp256k1_ecdsa_signature_serialize_compact(context.ctx, output, raw_sig) 79 assert res == 1 80 81 return bytes(ffi.buffer(output, CDATA_SIG_LENGTH)) 82 83 84def deserialize_compact(ser_sig: bytes, context: Context = GLOBAL_CONTEXT): # no cov 85 if len(ser_sig) != 64: 86 raise Exception('invalid signature length') 87 88 raw_sig = ffi.new('secp256k1_ecdsa_signature *') 89 res = lib.secp256k1_ecdsa_signature_parse_compact(context.ctx, raw_sig, ser_sig) 90 assert res == 1 91 92 return raw_sig 93 94 95def signature_normalize(raw_sig, context: Context = GLOBAL_CONTEXT): # no cov 96 """ 97 Check and optionally convert a signature to a normalized lower-S form. 98 99 This function always return a tuple containing a boolean (True if 100 not previously normalized or False if signature was already 101 normalized), and the normalized signature. 102 """ 103 sigout = ffi.new('secp256k1_ecdsa_signature *') 104 105 res = lib.secp256k1_ecdsa_signature_normalize(context.ctx, sigout, raw_sig) 106 107 return not not res, sigout 108 109 110def recoverable_convert(recover_sig, context: Context = GLOBAL_CONTEXT): # no cov 111 normal_sig = ffi.new('secp256k1_ecdsa_signature *') 112 113 lib.secp256k1_ecdsa_recoverable_signature_convert(context.ctx, normal_sig, recover_sig) 114 115 return normal_sig 116