1# vim: sts=4 sw=4 et
2from __future__ import absolute_import
3
4"""
5M2Crypto wrapper for OpenSSL ENGINE API.
6
7Pavel Shramov
8IMEC MSU
9"""
10
11from M2Crypto import EVP, Err, X509, m2, six
12from typing import AnyStr, Callable, Optional  # noqa
13
14
class EngineError(Exception):
    """Exception raised by this module when an ENGINE operation fails."""
17
18
# Hand the EngineError class to the m2 extension module at import time.
# NOTE(review): presumably this lets the C layer raise EngineError for
# engine failures -- confirm against the m2 bindings.
m2.engine_init_error(EngineError)
20
21
class Engine(object):
    """Wrapper for ENGINE object.

    The underlying ENGINE pointer is kept in ``self._ptr``.  When the
    instance was created with a true ``_pyfree`` the pointer is released
    through ``m2.engine_free`` once the instance is destroyed.
    """

    # Kept as a class attribute so that __del__ reaches the free function
    # through ``self`` rather than through the ``m2`` module global
    # (presumably so it is still reachable during interpreter shutdown).
    m2_engine_free = m2.engine_free

    def __init__(self, id=None, _ptr=None, _pyfree=1):
        # type: (Optional[bytes], Optional[bytes], int) -> None
        """Create new Engine from ENGINE pointer or obtain by id.

        :param id: Engine id handed to ``m2.engine_by_id``; only used
            when ``_ptr`` is not supplied.
        :param _ptr: Already existing ENGINE pointer to wrap.
        :param _pyfree: When true, the pointer is freed together with
            this object.
        :raises ValueError: If neither ``id`` nor ``_ptr`` is supplied,
            or if ``m2.engine_by_id`` yields no engine for ``id``.
        """
        if not _ptr and not id:
            raise ValueError("No engine id specified")
        self._ptr = _ptr
        if not self._ptr:
            self._ptr = m2.engine_by_id(id)
            if not self._ptr:
                raise ValueError("Unknown engine: %s" % id)
        # Assigned last: if construction failed above, __del__ finds no
        # ``_pyfree`` attribute and therefore frees nothing.
        self._pyfree = _pyfree

    def __del__(self):
        # type: () -> None
        """Free the ENGINE pointer if this object owns it."""
        # getattr() covers instances whose __init__ raised before
        # ``_pyfree`` was assigned.
        if getattr(self, '_pyfree', 0):
            self.m2_engine_free(self._ptr)

    def init(self):
        # type: () -> int
        """Obtain a functional reference to the engine.

        :return: 0 on error, non-zero on success."""
        return m2.engine_init(self._ptr)

    def finish(self):
        # type: () -> int
        """Release a functional and structural reference to the engine.

        :return: The result of ``m2.engine_finish``.
        """
        return m2.engine_finish(self._ptr)

    def ctrl_cmd_string(self, cmd, arg, optional=0):
        # type: (AnyStr, Optional[AnyStr], int) -> None
        """Call ENGINE_ctrl_cmd_string

        :param cmd: Name of the control command.
        :param arg: Argument of the command, or None for no argument.
        :param optional: Passed unchanged to
            ``m2.engine_ctrl_cmd_string``.
        :raises EngineError: If the call returns a false value; the
            message is taken from ``Err.get_error()``.
        """
        # Both strings are normalised to the native ``str`` type before
        # they are handed to the extension module.
        cmd = six.ensure_str(cmd)
        if arg is not None:
            arg = six.ensure_str(arg)
        if not m2.engine_ctrl_cmd_string(self._ptr, cmd, arg, optional):
            raise EngineError(Err.get_error())

    def get_name(self):
        # type: () -> bytes
        """Return engine name"""
        return m2.engine_get_name(self._ptr)

    def get_id(self):
        # type: () -> bytes
        """Return engine id"""
        return m2.engine_get_id(self._ptr)

    def set_default(self, methods=m2.ENGINE_METHOD_ALL):
        # type: (int) -> int
        """
        Use this engine as default for methods specified in argument

        :param methods: Possible values are bitwise OR of m2.ENGINE_METHOD_*
        :return: The result of ``m2.engine_set_default``.
        """
        return m2.engine_set_default(self._ptr, methods)

    def _engine_load_key(self, func, name, pin=None):
        # type: (Callable, bytes, Optional[bytes]) -> EVP.PKey
        """Helper function for loading keys

        :param func: Loader that is called as
            ``func(engine_ptr, name, ui, callback_data)`` and returns a
            key pointer, or a false value on failure.
        :param name: Key identifier forwarded to ``func``.
        :param pin: Optional pin stored in the callback data.
        :return: The loaded key wrapped in ``EVP.PKey``.
        :raises EngineError: If ``func`` returns a false value; the
            message is taken from ``Err.get_error()``.
        """
        ui = m2.ui_openssl()
        cbd = m2.engine_pkcs11_data_new(pin)
        try:
            kptr = func(self._ptr, name, ui, cbd)
            if not kptr:
                raise EngineError(Err.get_error())
            key = EVP.PKey(kptr, _pyfree=1)
        finally:
            # The callback data is released on success and on failure.
            m2.engine_pkcs11_data_free(cbd)
        return key

    def load_private_key(self, name, pin=None):
        # type: (bytes, Optional[bytes]) -> EVP.PKey
        """Load private key with engine methods (e.g from smartcard).

        If pin is not set it will be asked.

        :param name: Key identifier understood by the engine.
        :param pin: Optional pin for the key.
        :return: The private key as ``EVP.PKey``.
        :raises EngineError: If the key cannot be loaded.
        """
        return self._engine_load_key(m2.engine_load_private_key, name, pin)

    def load_public_key(self, name, pin=None):
        # type: (bytes, Optional[bytes]) -> EVP.PKey
        """Load public key with engine methods (e.g from smartcard).

        :param name: Key identifier understood by the engine.
        :param pin: Optional pin for the key.
        :return: The public key as ``EVP.PKey``.
        :raises EngineError: If the key cannot be loaded.
        """
        return self._engine_load_key(m2.engine_load_public_key, name, pin)

    def load_certificate(self, name):
        # type: (bytes) -> X509.X509
        """Load certificate from engine (e.g from smartcard).

        NOTE: This function may be not implemented by engine!

        :param name: Certificate identifier understood by the engine.
        :return: The certificate wrapped in ``X509.X509``.
        :raises EngineError: If ``m2.engine_load_certificate`` returns a
            false value.
        """
        cptr = m2.engine_load_certificate(self._ptr, name)
        if not cptr:
            raise EngineError("Certificate or card not found")
        return X509.X509(cptr, _pyfree=1)
118
119
def load_dynamic_engine(id, sopath):
    # type: (bytes, AnyStr) -> Engine
    """Load a dynamic engine from a shared object and give it an id.

    :param id: Id to assign to the loaded engine.
    :param sopath: Path of the shared object; text is encoded as UTF-8.
    :return: The 'dynamic' Engine after the load commands were issued.
    :raises EngineError: If any of the control commands fails.
    """
    so_path = sopath
    if isinstance(so_path, six.text_type):
        so_path = so_path.encode('utf8')
    m2.engine_load_dynamic()
    engine = Engine('dynamic')
    # The commands are issued in this fixed order.
    commands = (
        ('SO_PATH', so_path),
        ('ID', id),
        ('LIST_ADD', '1'),
        ('LOAD', None),
    )
    for command, argument in commands:
        engine.ctrl_cmd_string(command, argument)
    return engine
132
133
def load_dynamic():
    # type: () -> None
    """Load dynamic engine.

    Thin wrapper that calls ``m2.engine_load_dynamic`` and discards
    its result.
    """
    m2.engine_load_dynamic()
138
139
def load_openssl():
    # type: () -> None
    """Load openssl engine.

    Thin wrapper that calls ``m2.engine_load_openssl`` and discards
    its result.
    """
    m2.engine_load_openssl()
144
145
def cleanup():
    # type: () -> None
    """Clean up after the engines by calling ``m2.engine_cleanup``.

    If you load any engines, you need to clean up after your application
    is finished with the engines.
    """
    m2.engine_cleanup()
151