1# -*- coding: utf-8 -*-
2#
3"""
4test_abnf.py
5websocket - WebSocket client library for Python
6
7Copyright 2021 engn33r
8
9Licensed under the Apache License, Version 2.0 (the "License");
10you may not use this file except in compliance with the License.
11You may obtain a copy of the License at
12
13    http://www.apache.org/licenses/LICENSE-2.0
14
15Unless required by applicable law or agreed to in writing, software
16distributed under the License is distributed on an "AS IS" BASIS,
17WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18See the License for the specific language governing permissions and
19limitations under the License.
20"""
21
22import os
23import websocket as ws
24from websocket._abnf import *
25import sys
26import unittest
27sys.path[0:0] = [""]
28
29
30class ABNFTest(unittest.TestCase):
31
32    def testInit(self):
33        a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
34        self.assertEqual(a.fin, 0)
35        self.assertEqual(a.rsv1, 0)
36        self.assertEqual(a.rsv2, 0)
37        self.assertEqual(a.rsv3, 0)
38        self.assertEqual(a.opcode, 9)
39        self.assertEqual(a.data, '')
40        a_bad = ABNF(0,1,0,0, opcode=77)
41        self.assertEqual(a_bad.rsv1, 1)
42        self.assertEqual(a_bad.opcode, 77)
43
44    def testValidate(self):
45        a_invalid_ping = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
46        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_invalid_ping.validate, skip_utf8_validation=False)
47        a_bad_rsv_value = ABNF(0,1,0,0, opcode=ABNF.OPCODE_TEXT)
48        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_rsv_value.validate, skip_utf8_validation=False)
49        a_bad_opcode = ABNF(0,0,0,0, opcode=77)
50        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_opcode.validate, skip_utf8_validation=False)
51        a_bad_close_frame = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01')
52        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame.validate, skip_utf8_validation=False)
53        a_bad_close_frame_2 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01\x8a\xaa\xff\xdd')
54        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_2.validate, skip_utf8_validation=False)
55        a_bad_close_frame_3 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x03\xe7')
56        self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_3.validate, skip_utf8_validation=True)
57
58    def testMask(self):
59        abnf_none_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data=None)
60        bytes_val = bytes("aaaa", 'utf-8')
61        self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val)
62        abnf_str_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data="a")
63        self.assertEqual(abnf_str_data._get_masked(bytes_val), b'aaaa\x00')
64
65    def testFormat(self):
66        abnf_bad_rsv_bits = ABNF(2,0,0,0, opcode=ABNF.OPCODE_TEXT)
67        self.assertRaises(ValueError, abnf_bad_rsv_bits.format)
68        abnf_bad_opcode = ABNF(0,0,0,0, opcode=5)
69        self.assertRaises(ValueError, abnf_bad_opcode.format)
70        abnf_length_10 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij")
71        self.assertEqual(b'\x01', abnf_length_10.format()[0].to_bytes(1, 'big'))
72        self.assertEqual(b'\x8a', abnf_length_10.format()[1].to_bytes(1, 'big'))
73        self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__())
74        abnf_length_20 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij")
75        self.assertEqual(b'\x02', abnf_length_20.format()[0].to_bytes(1, 'big'))
76        self.assertEqual(b'\x94', abnf_length_20.format()[1].to_bytes(1, 'big'))
77        abnf_no_mask = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, mask=0, data=b'\x01\x8a\xcc')
78        self.assertEqual(b'\x01\x03\x01\x8a\xcc', abnf_no_mask.format())
79
80    def testFrameBuffer(self):
81        fb = frame_buffer(0, True)
82        self.assertEqual(fb.recv, 0)
83        self.assertEqual(fb.skip_utf8_validation, True)
84        fb.clear
85        self.assertEqual(fb.header, None)
86        self.assertEqual(fb.length, None)
87        self.assertEqual(fb.mask, None)
88        self.assertEqual(fb.has_mask(), False)
89
90
91if __name__ == "__main__":
92    unittest.main()
93