1from coincurve.context import GLOBAL_CONTEXT, Context
2from coincurve.types import Hasher
3from coincurve.utils import bytes_to_int, int_to_bytes, sha256
4
5from ._libsecp256k1 import ffi, lib
6
7MAX_SIG_LENGTH = 72
8CDATA_SIG_LENGTH = 64
9
10
def cdata_to_der(cdata, context: Context = GLOBAL_CONTEXT) -> bytes:
    """
    Serialize a signature cdata object into DER-encoded bytes.

    :param cdata: The cffi signature object handed to
        ``secp256k1_ecdsa_signature_serialize_der``.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: The serialized signature, cut to the length read back from the
        in/out length pointer after the call.
    """
    # The length pointer starts at the buffer capacity and is read back after
    # the call to know how many bytes of the buffer to return.
    length_ptr = ffi.new('size_t *', MAX_SIG_LENGTH)
    buffer = ffi.new(f'unsigned char[{MAX_SIG_LENGTH}]')

    lib.secp256k1_ecdsa_signature_serialize_der(context.ctx, buffer, length_ptr, cdata)

    written = length_ptr[0]
    return bytes(ffi.buffer(buffer, written))
18
19
def der_to_cdata(der: bytes, context: Context = GLOBAL_CONTEXT):
    """
    Parse a DER-encoded signature into a ``secp256k1_ecdsa_signature`` cdata object.

    :param der: The DER-encoded signature bytes.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: The newly allocated signature cdata filled in by the parser.
    :raises ValueError: If the library reports that parsing failed.
    """
    signature = ffi.new('secp256k1_ecdsa_signature *')

    if lib.secp256k1_ecdsa_signature_parse_der(context.ctx, signature, der, len(der)):
        return signature

    raise ValueError('The DER-encoded signature could not be parsed.')
28
29
def recover(message: bytes, recover_sig, hasher: Hasher = sha256, context: Context = GLOBAL_CONTEXT):
    """
    Recover the public key that produced a recoverable signature.

    :param message: The signed message, or the digest itself when ``hasher``
        is ``None``.
    :param recover_sig: The recoverable signature cdata object.
    :param hasher: Callable applied to ``message`` to obtain the digest; pass
        ``None`` to use ``message`` as the digest unchanged.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: A ``secp256k1_pubkey`` cdata object holding the recovered key.
    :raises ValueError: If the digest is not 32 bytes long or the library
        fails to recover a key.
    """
    if hasher is None:
        digest = message
    else:
        digest = hasher(message)

    if len(digest) != 32:
        raise ValueError('Message hash must be 32 bytes long.')

    public_key = ffi.new('secp256k1_pubkey *')
    if not lib.secp256k1_ecdsa_recover(context.ctx, public_key, recover_sig, digest):
        raise ValueError('failed to recover ECDSA public key')

    return public_key
40
41
def serialize_recoverable(recover_sig, context: Context = GLOBAL_CONTEXT) -> bytes:
    """
    Serialize a recoverable signature to compact bytes plus its recovery id.

    :param recover_sig: The recoverable signature cdata object.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: The ``CDATA_SIG_LENGTH``-byte compact signature followed by the
        recovery id encoded with ``int_to_bytes``.
    """
    recovery_id = ffi.new('int *')
    compact = ffi.new(f'unsigned char[{CDATA_SIG_LENGTH}]')

    lib.secp256k1_ecdsa_recoverable_signature_serialize_compact(context.ctx, compact, recovery_id, recover_sig)

    signature_bytes = bytes(ffi.buffer(compact, CDATA_SIG_LENGTH))
    return signature_bytes + int_to_bytes(recovery_id[0])
49
50
def deserialize_recoverable(serialized: bytes, context: Context = GLOBAL_CONTEXT):
    """
    Parse a 65-byte serialized recoverable signature into a cdata object.

    The first 64 bytes are the compact signature and the final byte is the
    recovery id, which must be in the range 0 to 3 inclusive.

    :param serialized: The 65-byte serialized recoverable signature.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: A ``secp256k1_ecdsa_recoverable_signature`` cdata object.
    :raises ValueError: If the input is not 65 bytes long, the recovery id is
        out of range, or the library fails to parse the signature.
    """
    if len(serialized) != 65:
        raise ValueError('Serialized signature must be 65 bytes long.')

    compact = serialized[:64]
    recovery_id = bytes_to_int(serialized[64:])

    if recovery_id < 0 or recovery_id > 3:
        raise ValueError('Invalid recovery id.')

    signature = ffi.new('secp256k1_ecdsa_recoverable_signature *')
    if lib.secp256k1_ecdsa_recoverable_signature_parse_compact(context.ctx, signature, compact, recovery_id):
        return signature

    raise ValueError('Failed to parse recoverable signature.')
67
68
69"""
70Warning:
71    The functions below may change and are not tested!
72"""
73
74
def serialize_compact(raw_sig, context: Context = GLOBAL_CONTEXT):  # no cov
    """
    Serialize a signature cdata object into its compact byte form.

    :param raw_sig: The signature cdata object to serialize.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: ``CDATA_SIG_LENGTH`` bytes copied out of the output buffer.
    """
    compact = ffi.new(f'unsigned char[{CDATA_SIG_LENGTH}]')

    status = lib.secp256k1_ecdsa_signature_serialize_compact(context.ctx, compact, raw_sig)
    # Sanity check on the library's return code.
    assert status == 1

    # Slicing the cffi buffer copies its contents out as a bytes object.
    return ffi.buffer(compact, CDATA_SIG_LENGTH)[:]
82
83
def deserialize_compact(ser_sig: bytes, context: Context = GLOBAL_CONTEXT):  # no cov
    """
    Parse a 64-byte compact signature into a ``secp256k1_ecdsa_signature`` cdata object.

    :param ser_sig: The compact signature; must be exactly 64 bytes long.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: The newly allocated signature cdata filled in by the parser.
    :raises ValueError: If ``ser_sig`` is not 64 bytes long or the library
        reports that it could not be parsed.
    """
    if len(ser_sig) != CDATA_SIG_LENGTH:
        # ValueError matches the other parsers in this module and is still an
        # ``Exception`` for any caller that caught the previous bare one.
        raise ValueError('invalid signature length')

    raw_sig = ffi.new('secp256k1_ecdsa_signature *')
    parsed = lib.secp256k1_ecdsa_signature_parse_compact(context.ctx, raw_sig, ser_sig)

    # Raise explicitly rather than assert: assert statements are removed under
    # ``python -O``, which would hand back a signature that failed to parse.
    if parsed != 1:
        raise ValueError('The compact signature could not be parsed.')

    return raw_sig
93
94
def signature_normalize(raw_sig, context: Context = GLOBAL_CONTEXT):  # no cov
    """
    Convert a signature to its normalized lower-S form.

    This function always returns a two-item tuple: a boolean (True if the
    input was not already normalized, False if it was) followed by the
    normalized signature as a new ``secp256k1_ecdsa_signature`` cdata object.
    """
    normalized = ffi.new('secp256k1_ecdsa_signature *')

    was_changed = lib.secp256k1_ecdsa_signature_normalize(context.ctx, normalized, raw_sig)

    # The library hands back an integer flag; expose it as a real bool.
    return bool(was_changed), normalized
108
109
def recoverable_convert(recover_sig, context: Context = GLOBAL_CONTEXT):  # no cov
    """
    Convert a recoverable signature into a plain ``secp256k1_ecdsa_signature``.

    :param recover_sig: The recoverable signature cdata object to convert.
    :param context: The context whose ``ctx`` handle is passed to the library.
    :return: A newly allocated signature cdata object filled in by the library.
    """
    converted = ffi.new('secp256k1_ecdsa_signature *')
    lib.secp256k1_ecdsa_recoverable_signature_convert(context.ctx, converted, recover_sig)
    return converted
116