xref: /freebsd/tests/sys/opencrypto/cryptotest.py (revision d0b2dbfa)
1#!/usr/local/bin/python3
2#
3# Copyright (c) 2014 The FreeBSD Foundation
4# All rights reserved.
5# Copyright 2019 Enji Cooper
6#
7# This software was developed by John-Mark Gurney under
8# the sponsorship from the FreeBSD Foundation.
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12# 1.  Redistributions of source code must retain the above copyright
13#     notice, this list of conditions and the following disclaimer.
14# 2.  Redistributions in binary form must reproduce the above copyright
15#     notice, this list of conditions and the following disclaimer in the
16#     documentation and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28# SUCH DAMAGE.
29#
30#
31
32
33
34import binascii
35import errno
36import cryptodev
37import itertools
38import os
39import struct
40import unittest
41from cryptodev import *
42from glob import iglob
43
44katdir = '/usr/local/share/nist-kat'
45
46def katg(base, glob):
47    assert os.path.exists(katdir), "Please 'pkg install nist-kat'"
48    if not os.path.exists(os.path.join(katdir, base)):
49        raise unittest.SkipTest("Missing %s test vectors" % (base))
50    return iglob(os.path.join(katdir, base, glob))
51
52aesmodules = [ 'cryptosoft0', 'aesni0', 'armv8crypto0', 'ccr0', 'ccp0', 'ossl0', 'safexcel0', 'qat0' ]
53shamodules = [ 'cryptosoft0', 'aesni0', 'armv8crypto0', 'ccr0', 'ccp0', 'ossl0', 'safexcel0', 'qat0' ]
54
55def GenTestCase(cname):
56    try:
57        crid = cryptodev.Crypto.findcrid(cname)
58    except IOError:
59        return None
60
61    class GendCryptoTestCase(unittest.TestCase):
62        ###############
63        ##### AES #####
64        ###############
65        @unittest.skipIf(cname not in aesmodules, 'skipping AES-XTS on %s' % (cname))
66        def test_xts(self):
67            for i in katg('XTSTestVectors/format tweak value input - data unit seq no', '*.rsp'):
68                self.runXTS(i, cryptodev.CRYPTO_AES_XTS)
69
70        @unittest.skipIf(cname not in aesmodules, 'skipping AES-CBC on %s' % (cname))
71        def test_cbc(self):
72            for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
73                self.runCBC(i)
74
75        @unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
76        def test_ccm(self):
77            for i in katg('ccmtestvectors', 'V*.rsp'):
78                self.runCCMEncrypt(i)
79
80            for i in katg('ccmtestvectors', 'D*.rsp'):
81                self.runCCMDecrypt(i)
82
83        @unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
84        def test_gcm(self):
85            for i in katg('gcmtestvectors', 'gcmEncrypt*'):
86                self.runGCM(i, 'ENCRYPT')
87
88            for i in katg('gcmtestvectors', 'gcmDecrypt*'):
89                self.runGCM(i, 'DECRYPT')
90
91        def runGCM(self, fname, mode):
92            curfun = None
93            if mode == 'ENCRYPT':
94                swapptct = False
95                curfun = Crypto.encrypt
96            elif mode == 'DECRYPT':
97                swapptct = True
98                curfun = Crypto.decrypt
99            else:
100                raise RuntimeError('unknown mode: %r' % repr(mode))
101
102            columns = [ 'Count', 'Key', 'IV', 'CT', 'AAD', 'Tag', 'PT', ]
103            with cryptodev.KATParser(fname, columns) as parser:
104                self.runGCMWithParser(parser, mode)
105
106        def runGCMWithParser(self, parser, mode):
107            for _, lines in next(parser):
108                for data in lines:
109                    curcnt = int(data['Count'])
110                    cipherkey = binascii.unhexlify(data['Key'])
111                    iv = binascii.unhexlify(data['IV'])
112                    aad = binascii.unhexlify(data['AAD'])
113                    tag = binascii.unhexlify(data['Tag'])
114                    if 'FAIL' not in data:
115                        pt = binascii.unhexlify(data['PT'])
116                    ct = binascii.unhexlify(data['CT'])
117
118                    if len(iv) != 12:
119                        # XXX - isn't supported
120                        continue
121
122                    try:
123                        c = Crypto(cryptodev.CRYPTO_AES_NIST_GCM_16,
124                            cipherkey, crid=crid,
125                            maclen=16)
126                    except EnvironmentError as e:
127                        # Can't test algorithms the driver does not support.
128                        if e.errno != errno.EOPNOTSUPP:
129                            raise
130                        continue
131
132                    if mode == 'ENCRYPT':
133                        try:
134                            rct, rtag = c.encrypt(pt, iv, aad)
135                        except EnvironmentError as e:
136                            # Can't test inputs the driver does not support.
137                            if e.errno != errno.EINVAL:
138                                raise
139                            continue
140                        rtag = rtag[:len(tag)]
141                        data['rct'] = binascii.hexlify(rct)
142                        data['rtag'] = binascii.hexlify(rtag)
143                        self.assertEqual(rct, ct, repr(data))
144                        self.assertEqual(rtag, tag, repr(data))
145                    else:
146                        if len(tag) != 16:
147                            continue
148                        args = (ct, iv, aad, tag)
149                        if 'FAIL' in data:
150                            self.assertRaises(IOError,
151                                c.decrypt, *args)
152                        else:
153                            try:
154                                rpt, rtag = c.decrypt(*args)
155                            except EnvironmentError as e:
156                                # Can't test inputs the driver does not support.
157                                if e.errno != errno.EINVAL:
158                                    raise
159                                continue
160                            data['rpt'] = binascii.hexlify(rpt)
161                            data['rtag'] = binascii.hexlify(rtag)
162                            self.assertEqual(rpt, pt,
163                                repr(data))
164
165        def runCBC(self, fname):
166            columns = [ 'COUNT', 'KEY', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]
167            with cryptodev.KATParser(fname, columns) as parser:
168                self.runCBCWithParser(parser)
169
170        def runCBCWithParser(self, parser):
171            curfun = None
172            for mode, lines in next(parser):
173                if mode == 'ENCRYPT':
174                    swapptct = False
175                    curfun = Crypto.encrypt
176                elif mode == 'DECRYPT':
177                    swapptct = True
178                    curfun = Crypto.decrypt
179                else:
180                    raise RuntimeError('unknown mode: %r' % repr(mode))
181
182                for data in lines:
183                    curcnt = int(data['COUNT'])
184                    cipherkey = binascii.unhexlify(data['KEY'])
185                    iv = binascii.unhexlify(data['IV'])
186                    pt = binascii.unhexlify(data['PLAINTEXT'])
187                    ct = binascii.unhexlify(data['CIPHERTEXT'])
188
189                    if swapptct:
190                        pt, ct = ct, pt
191                    # run the fun
192                    c = Crypto(cryptodev.CRYPTO_AES_CBC, cipherkey, crid=crid)
193                    r = curfun(c, pt, iv)
194                    self.assertEqual(r, ct)
195
196        def runXTS(self, fname, meth):
197            columns = [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT',
198                        'CT']
199            with cryptodev.KATParser(fname, columns) as parser:
200                self.runXTSWithParser(parser, meth)
201
202        def runXTSWithParser(self, parser, meth):
203            curfun = None
204            for mode, lines in next(parser):
205                if mode == 'ENCRYPT':
206                    swapptct = False
207                    curfun = Crypto.encrypt
208                elif mode == 'DECRYPT':
209                    swapptct = True
210                    curfun = Crypto.decrypt
211                else:
212                    raise RuntimeError('unknown mode: %r' % repr(mode))
213
214                for data in lines:
215                    curcnt = int(data['COUNT'])
216                    nbits = int(data['DataUnitLen'])
217                    cipherkey = binascii.unhexlify(data['Key'])
218                    iv = struct.pack('QQ', int(data['DataUnitSeqNumber']), 0)
219                    pt = binascii.unhexlify(data['PT'])
220                    ct = binascii.unhexlify(data['CT'])
221
222                    if nbits % 128 != 0:
223                        # XXX - mark as skipped
224                        continue
225                    if swapptct:
226                        pt, ct = ct, pt
227                    # run the fun
228                    try:
229                        c = Crypto(meth, cipherkey, crid=crid)
230                        r = curfun(c, pt, iv)
231                    except EnvironmentError as e:
232                        # Can't test hashes the driver does not support.
233                        if e.errno != errno.EOPNOTSUPP:
234                            raise
235                        continue
236                    self.assertEqual(r, ct)
237
238        def runCCMEncrypt(self, fname):
239            with cryptodev.KATCCMParser(fname) as parser:
240                self.runCCMEncryptWithParser(parser)
241
242        def runCCMEncryptWithParser(self, parser):
243            for data in next(parser):
244                Nlen = int(data['Nlen'])
245                Tlen = int(data['Tlen'])
246                key = binascii.unhexlify(data['Key'])
247                nonce = binascii.unhexlify(data['Nonce'])
248                Alen = int(data['Alen'])
249                Plen = int(data['Plen'])
250                if Alen != 0:
251                    aad = binascii.unhexlify(data['Adata'])
252                else:
253                    aad = None
254                if Plen != 0:
255                    payload = binascii.unhexlify(data['Payload'])
256                else:
257                    payload = None
258                ct = binascii.unhexlify(data['CT'])
259
260                try:
261                    c = Crypto(crid=crid,
262                        cipher=cryptodev.CRYPTO_AES_CCM_16,
263                        key=key,
264                        mackey=key, maclen=Tlen, ivlen=Nlen)
265                    r, tag = Crypto.encrypt(c, payload,
266                        nonce, aad)
267                except EnvironmentError as e:
268                    if e.errno != errno.EOPNOTSUPP:
269                        raise
270                    continue
271
272                out = r + tag
273                self.assertEqual(out, ct,
274                    "Count " + data['Count'] + " Actual: " + \
275                    repr(binascii.hexlify(out)) + " Expected: " + \
276                    repr(data) + " on " + cname)
277
278        def runCCMDecrypt(self, fname):
279            with cryptodev.KATCCMParser(fname) as parser:
280                self.runCCMDecryptWithParser(parser)
281
282        def runCCMDecryptWithParser(self, parser):
283            for data in next(parser):
284                Nlen = int(data['Nlen'])
285                Tlen = int(data['Tlen'])
286                key = binascii.unhexlify(data['Key'])
287                nonce = binascii.unhexlify(data['Nonce'])
288                Alen = int(data['Alen'])
289                Plen = int(data['Plen'])
290                if Alen != 0:
291                    aad = binascii.unhexlify(data['Adata'])
292                else:
293                    aad = None
294                ct = binascii.unhexlify(data['CT'])
295                tag = ct[-Tlen:]
296                if Plen != 0:
297                    payload = ct[:-Tlen]
298                else:
299                    payload = None
300
301                try:
302                    c = Crypto(crid=crid,
303                        cipher=cryptodev.CRYPTO_AES_CCM_16,
304                        key=key,
305                        mackey=key, maclen=Tlen, ivlen=Nlen)
306                except EnvironmentError as e:
307                    if e.errno != errno.EOPNOTSUPP:
308                        raise
309                    continue
310
311                if data['Result'] == 'Fail':
312                    self.assertRaises(IOError,
313                        c.decrypt, payload, nonce, aad, tag)
314                else:
315                    r, tag = Crypto.decrypt(c, payload, nonce,
316                                            aad, tag)
317
318                    payload = binascii.unhexlify(data['Payload'])
319                    payload = payload[:Plen]
320                    self.assertEqual(r, payload,
321                        "Count " + data['Count'] + \
322                        " Actual: " + repr(binascii.hexlify(r)) + \
323                        " Expected: " + repr(data) + \
324                        " on " + cname)
325
326        ###############
327        ##### SHA #####
328        ###############
329        @unittest.skipIf(cname not in shamodules, 'skipping SHA on %s' % str(cname))
330        def test_sha(self):
331            for i in katg('shabytetestvectors', 'SHA*Msg.rsp'):
332                self.runSHA(i)
333
334        def runSHA(self, fname):
335            # Skip SHA512_(224|256) tests
336            if fname.find('SHA512_') != -1:
337                return
338            columns = [ 'Len', 'Msg', 'MD' ]
339            with cryptodev.KATParser(fname, columns) as parser:
340                self.runSHAWithParser(parser)
341
342        def runSHAWithParser(self, parser):
343            for hashlength, lines in next(parser):
344                # E.g., hashlength will be "L=20" (bytes)
345                hashlen = int(hashlength.split("=")[1])
346
347                if hashlen == 20:
348                    alg = cryptodev.CRYPTO_SHA1
349                elif hashlen == 28:
350                    alg = cryptodev.CRYPTO_SHA2_224
351                elif hashlen == 32:
352                    alg = cryptodev.CRYPTO_SHA2_256
353                elif hashlen == 48:
354                    alg = cryptodev.CRYPTO_SHA2_384
355                elif hashlen == 64:
356                    alg = cryptodev.CRYPTO_SHA2_512
357                else:
358                    # Skip unsupported hashes
359                    # Slurp remaining input in section
360                    for data in lines:
361                        continue
362                    continue
363
364                for data in lines:
365                    msg = binascii.unhexlify(data['Msg'])
366                    msg = msg[:int(data['Len'])]
367                    md = binascii.unhexlify(data['MD'])
368
369                    try:
370                        c = Crypto(mac=alg, crid=crid,
371                            maclen=hashlen)
372                    except EnvironmentError as e:
373                        # Can't test hashes the driver does not support.
374                        if e.errno != errno.EOPNOTSUPP:
375                            raise
376                        continue
377
378                    _, r = c.encrypt(msg, iv="")
379
380                    self.assertEqual(r, md, "Actual: " + \
381                        repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname)
382
383        @unittest.skipIf(cname not in shamodules, 'skipping SHA-HMAC on %s' % str(cname))
384        def test_sha1hmac(self):
385            for i in katg('hmactestvectors', 'HMAC.rsp'):
386                self.runSHA1HMAC(i)
387
388        def runSHA1HMAC(self, fname):
389            columns = [ 'Count', 'Klen', 'Tlen', 'Key', 'Msg', 'Mac' ]
390            with cryptodev.KATParser(fname, columns) as parser:
391                self.runSHA1HMACWithParser(parser)
392
393        def runSHA1HMACWithParser(self, parser):
394            for hashlength, lines in next(parser):
395                # E.g., hashlength will be "L=20" (bytes)
396                hashlen = int(hashlength.split("=")[1])
397
398                blocksize = None
399                if hashlen == 20:
400                    alg = cryptodev.CRYPTO_SHA1_HMAC
401                    blocksize = 64
402                elif hashlen == 28:
403                    alg = cryptodev.CRYPTO_SHA2_224_HMAC
404                    blocksize = 64
405                elif hashlen == 32:
406                    alg = cryptodev.CRYPTO_SHA2_256_HMAC
407                    blocksize = 64
408                elif hashlen == 48:
409                    alg = cryptodev.CRYPTO_SHA2_384_HMAC
410                    blocksize = 128
411                elif hashlen == 64:
412                    alg = cryptodev.CRYPTO_SHA2_512_HMAC
413                    blocksize = 128
414                else:
415                    # Skip unsupported hashes
416                    # Slurp remaining input in section
417                    for data in lines:
418                        continue
419                    continue
420
421                for data in lines:
422                    key = binascii.unhexlify(data['Key'])
423                    msg = binascii.unhexlify(data['Msg'])
424                    mac = binascii.unhexlify(data['Mac'])
425                    tlen = int(data['Tlen'])
426
427                    if len(key) > blocksize:
428                        continue
429
430                    try:
431                        c = Crypto(mac=alg, mackey=key,
432                            crid=crid, maclen=hashlen)
433                    except EnvironmentError as e:
434                        # Can't test hashes the driver does not support.
435                        if e.errno != errno.EOPNOTSUPP:
436                            raise
437                        continue
438
439                    _, r = c.encrypt(msg, iv="")
440
441                    self.assertEqual(r[:tlen], mac, "Actual: " + \
442                        repr(binascii.hexlify(r)) + " Expected: " + repr(data))
443
444    return GendCryptoTestCase
445
446cryptosoft = GenTestCase('cryptosoft0')
447aesni = GenTestCase('aesni0')
448armv8crypto = GenTestCase('armv8crypto0')
449ccr = GenTestCase('ccr0')
450ccp = GenTestCase('ccp0')
451ossl = GenTestCase('ossl0')
452safexcel = GenTestCase('safexcel0')
453qat = GenTestCase('qat0')
454
455if __name__ == '__main__':
456    unittest.main()
457