1#
2# test_multibytecodec.py
3#   Unit test for multibytecodec itself
4#
5
6import _multibytecodec
7import codecs
8import io
9import sys
10import textwrap
11import unittest
12from test import support
13from test.support import os_helper
14from test.support.os_helper import TESTFN
15
# Names of all CJK codecs exercised by the tests in this file, grouped by
# the extension module named in each group's comment.
ALL_CJKENCODINGS = [
    # _codecs_cn
    'gb2312',
    'gbk',
    'gb18030',
    'hz',
    # _codecs_hk
    'big5hkscs',
    # _codecs_jp
    'cp932',
    'shift_jis',
    'euc_jp',
    'euc_jisx0213',
    'shift_jisx0213',
    'euc_jis_2004',
    'shift_jis_2004',
    # _codecs_kr
    'cp949',
    'euc_kr',
    'johab',
    # _codecs_tw
    'big5',
    'cp950',
    # _codecs_iso2022
    'iso2022_jp',
    'iso2022_jp_1',
    'iso2022_jp_2',
    'iso2022_jp_2004',
    'iso2022_jp_3',
    'iso2022_jp_ext',
    'iso2022_kr',
]
32
class Test_MultibyteCodec(unittest.TestCase):
    # Checks that are run against every codec in ALL_CJKENCODINGS, plus
    # regression tests for the _multibytecodec module.  Per-encoding loops
    # use subTest() so that a failure names the offending encoding and does
    # not hide failures in the encodings that follow it.

    def test_nullcoding(self):
        # Empty input must produce empty output in both directions.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(b''.decode(enc), '')
                self.assertEqual(str(b'', enc), '')
                self.assertEqual(''.encode(enc), b'')

    def test_str_decode(self):
        # Plain ASCII letters must map to themselves, both when encoding
        # and (matching the test's name) when decoding.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual('abcd'.encode(enc), b'abcd')
                self.assertEqual(b'abcd'.decode(enc), 'abcd')

    def test_errorcallback_longindex(self):
        # An error handler that returns a position of sys.maxsize + 1 must
        # make the decoder raise IndexError.
        def myreplace(exc):
            return ('', sys.maxsize + 1)

        # NOTE: the handler stays registered for the rest of the process;
        # this test does not unregister it.
        codecs.register_error('test.cjktest', myreplace)
        dec = codecs.getdecoder('euc-kr')
        self.assertRaises(IndexError, dec,
                          b'apple\x92ham\x93spam', 'test.cjktest')

    def test_errorcallback_custom_ignore(self):
        # Issue #23215: MemoryError with custom error handlers and multibyte codecs
        data = 100 * "\udc00"
        codecs.register_error("test.ignore", codecs.ignore_errors)
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(data.encode(enc, "test.ignore"), b'')

    def test_codingspec(self):
        # Executes a source consisting only of a coding declaration for
        # each encoding.
        # NOTE(review): the source is passed to exec() as a str, so the
        # declaration is presumably not used to decode anything, and TESTFN
        # is never created by this test -- confirm whether a bytes/file
        # based check was intended.
        try:
            for enc in ALL_CJKENCODINGS:
                with self.subTest(encoding=enc):
                    code = '# coding: {}\n'.format(enc)
                    exec(code)
        finally:
            os_helper.unlink(TESTFN)

    def test_init_segfault(self):
        # bug #3305: this used to segfault
        for stream_cls in (_multibytecodec.MultibyteStreamReader,
                           _multibytecodec.MultibyteStreamWriter):
            with self.subTest(stream_cls=stream_cls.__name__):
                self.assertRaises(AttributeError, stream_cls, None)

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertRaises(TypeError, codecs.getdecoder(enc), "")
78
class Test_IncrementalEncoder(unittest.TestCase):
    # Tests for the incremental encoders returned by
    # codecs.getincrementalencoder(): encode(), reset(), getstate() and
    # setstate().

    def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
                         b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
                         b'\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('', True), b'')
        self.assertEqual(encoder.encode('', False), b'')
        self.assertIsNone(encoder.reset())

    def test_stateful(self):
        # jisx0213 encoder is stateful for a few code points, e.g.:
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        # U+00E6 alone is held back until the next character is known.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        # A final, empty encode() flushes the held-back character once.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')

    def test_stateful_keep_buffer(self):
        # A failed encode() must not discard the character held back by an
        # earlier call.
        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')

    def test_state_methods_with_buffer_state(self):
        # euc_jis_2004 stores state as a buffer of pending bytes
        encoder = codecs.getincrementalencoder('euc_jis_2004')()

        # Restoring the initial state replays a full sequence identically.
        initial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        encoder.setstate(initial_state)
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')

        # Restoring a state captured mid-sequence replays the second half.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        partial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        encoder.setstate(partial_state)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')

    def test_state_methods_with_non_buffer_state(self):
        # iso2022_jp stores state without using a buffer
        encoder = codecs.getincrementalencoder('iso2022_jp')()

        self.assertEqual(encoder.encode('z'), b'z')
        en_state = encoder.getstate()

        self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
        jp_state = encoder.getstate()
        self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')

        # With jp_state restored no escape sequence is emitted again.
        encoder.setstate(jp_state)
        self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')

        encoder.setstate(en_state)
        self.assertEqual(encoder.encode('z'), b'z')

    def test_getstate_returns_expected_value(self):
        # Note: getstate is implemented such that these state values
        # are expected to be the same across all builds of Python,
        # regardless of x32/64 bit, endianness and compiler.

        # euc_jis_2004 stores state as a buffer of pending bytes
        buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
        self.assertEqual(buffer_state_encoder.getstate(), 0)
        buffer_state_encoder.encode('\u00e6')
        self.assertEqual(buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x02"
                             b"\xc3\xa6"
                             b"\x00\x00\x00\x00\x00\x00\x00\x00",
                             'little'))
        buffer_state_encoder.encode('\u0300')
        self.assertEqual(buffer_state_encoder.getstate(), 0)

        # iso2022_jp stores state without using a buffer
        non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\x42\x42\x00\x00\x00\x00\x00\x00",
                             'little'))
        non_buffer_state_encoder.encode('\u3042')
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\xc2\x42\x00\x00\x00\x00\x00\x00",
                             'little'))

    def test_setstate_validates_input_size(self):
        # A state whose leading byte is 9 (a pending size of nine, per the
        # variable name) must be rejected with UnicodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        pending_size_nine = int.from_bytes(
            b"\x09"
            b"\x00\x00\x00\x00\x00\x00\x00\x00"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine)

    def test_setstate_validates_input_bytes(self):
        # A state carrying the single byte 0xFF, which is not valid UTF-8,
        # must be rejected with UnicodeDecodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        invalid_utf8 = int.from_bytes(
            b"\x01"
            b"\xff"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8)

    def test_issue5640(self):
        # After 'backslashreplace' has handled an unencodable character the
        # encoder must still encode following input normally.
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode('\xff'), b'\\xff')
        self.assertEqual(encoder.encode('\n'), b'\n')

    @support.cpython_only
    def test_subinterp(self):
        # bpo-42846: Test a CJK codec in a subinterpreter
        # _testcapi is an optional extension module: skip, rather than
        # error out, when it is not available.
        try:
            import _testcapi
        except ImportError:
            self.skipTest('requires the _testcapi extension module')
        encoding = 'cp932'
        text = "Python の開発は、1990 年ごろから開始されています。"
        code = textwrap.dedent("""
            import codecs
            encoding = %r
            text = %r
            encoder = codecs.getincrementalencoder(encoding)()
            text2 = encoder.encode(text).decode(encoding)
            if text2 != text:
                raise ValueError(f"encoding issue: {text2!a} != {text!a}")
        """) % (encoding, text)
        res = _testcapi.run_in_subinterp(code)
        self.assertEqual(res, 0)
229
class Test_IncrementalDecoder(unittest.TestCase):
    # Tests for the incremental decoders returned by
    # codecs.getincrementaldecoder(): decode(), reset(), getstate() and
    # setstate().

    def test_dbcs(self):
        # cp949 decoder is simple with only 1 or 2 bytes sequences.
        decoder = codecs.getincrementaldecoder('cp949')()
        # The trailing 0xBD of the first chunk is completed by the 0xE3
        # that starts the second chunk.
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
                         '\ud30c\uc774')
        self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
                         '\uc36c \ub9c8\uc744')
        self.assertEqual(decoder.decode(b''), '')

    def test_dbcs_keep_buffer(self):
        # A failed final decode() must leave the pending byte in place so
        # that a later call can still complete the character.
        decoder = codecs.getincrementaldecoder('cp949')()
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode,
                          b'\xcc\xbd', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

    def test_iso2022(self):
        # Escape sequences split across decode() calls produce no output
        # of their own but still affect how later bytes are decoded.
        decoder = codecs.getincrementaldecoder('iso2022-jp')()
        ESC = b'\x1b'
        self.assertEqual(decoder.decode(ESC + b'('), '')
        self.assertEqual(decoder.decode(b'B', True), '')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
        self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
        self.assertEqual(decoder.decode(b'$', True), '\u4e16')
        self.assertIsNone(decoder.reset())
        # After reset() the same bytes decode as plain ASCII again.
        self.assertEqual(decoder.decode(b'@$'), '@$')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            # subTest() makes a failure name the offending encoding.
            with self.subTest(encoding=enc):
                decoder = codecs.getincrementaldecoder(enc)()
                self.assertRaises(TypeError, decoder.decode, "")

    def test_state_methods(self):
        decoder = codecs.getincrementaldecoder('euc_jp')()

        # Decode a complete input sequence
        self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
        pending1, _ = decoder.getstate()
        self.assertEqual(pending1, b'')

        # Decode first half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa4'), '')
        pending2, flags2 = decoder.getstate()
        self.assertEqual(pending2, b'\xa4')

        # Decode second half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending3, _ = decoder.getstate()
        self.assertEqual(pending3, b'')

        # Jump back and decode second half of partial input sequence again
        decoder.setstate((pending2, flags2))
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending4, _ = decoder.getstate()
        self.assertEqual(pending4, b'')

        # Ensure state values are preserved correctly
        decoder.setstate((b'abc', 123456789))
        self.assertEqual(decoder.getstate(), (b'abc', 123456789))

    def test_setstate_validates_input(self):
        # setstate() must reject a non-tuple, wrongly typed tuple members,
        # and a nine-byte pending buffer.
        decoder = codecs.getincrementaldecoder('euc_jp')()
        self.assertRaises(TypeError, decoder.setstate, 123)
        self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
        self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
        self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0))
307
308class Test_StreamReader(unittest.TestCase):
309    def test_bug1728403(self):
310        try:
311            f = open(TESTFN, 'wb')
312            try:
313                f.write(b'\xa1')
314            finally:
315                f.close()
316            f = codecs.open(TESTFN, encoding='cp949')
317            try:
318                self.assertRaises(UnicodeDecodeError, f.read, 2)
319            finally:
320                f.close()
321        finally:
322            os_helper.unlink(TESTFN)
323
class Test_StreamWriter(unittest.TestCase):
    # Each test writes text through a codecs stream writer and checks the
    # bytes accumulated so far in the underlying BytesIO after every write.

    def test_gb18030(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        # (text to write, expected sink contents after the write)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'123\x907\x959'),
            ('\uac00\u00ac', b'123\x907\x959\x827\xcf5\x810\x851'),
        )
        for chunk, written_so_far in steps:
            writer.write(chunk)
            self.assertEqual(sink.getvalue(), written_so_far)

    def test_utf_8(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('utf-8')(sink)
        # (text to write, expected sink contents after the write)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'123\xf0\x92\x8d\x85'),
            ('\uac00\u00ac', b'123\xf0\x92\x8d\x85'
                             b'\xea\xb0\x80\xc2\xac'),
        )
        for chunk, written_so_far in steps:
            writer.write(chunk)
            self.assertEqual(sink.getvalue(), written_so_far)

    def test_streamwriter_strwrite(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        writer.write('abcd')
        written = sink.getvalue()
        self.assertEqual(written, b'abcd')
353
class Test_ISO2022(unittest.TestCase):
    # Regression tests specific to the ISO 2022 family of codecs.

    def test_g2(self):
        # The byte sequence ESC . A followed by ESC N i must decode to
        # U+00E9 (going by the test name, this exercises the G2 set).
        iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
        uni = ':hu4:unit\xe9 de famille'
        self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)

    def test_iso2022_jp_g0(self):
        # The encoded output must not contain the byte 0x0E ...
        self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
        for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
            with self.subTest(encoding=encoding):
                e = '\u3406'.encode(encoding)
                # ... and must be 7-bit clean.  isascii() rejects every
                # byte >= 0x80; the former "x > 0x80" comparison let the
                # byte 0x80 itself slip through.
                self.assertTrue(e.isascii(), e)

    def test_bug1572832(self):
        # Encoding every non-BMP code point must complete without crashing.
        for x in range(0x10000, 0x110000):
            # Any ISO 2022 codec will cause the segfault
            chr(x).encode('iso_2022_jp', 'ignore')
370
class TestStateful(unittest.TestCase):
    # Checks a stateful codec's output with and without the trailing reset
    # sequence.  The class attributes below parameterize the tests.
    text = '\u4E16\u4E16'
    encoding = 'iso-2022-jp'
    # Encoded text before any reset sequence has been emitted.
    expected = b'\x1b$B@$@$'
    # Bytes emitted when the encoder is finalized.
    reset = b'\x1b(B'
    expected_reset = expected + reset

    def test_encode(self):
        # One-shot encoding includes the reset sequence.
        encoded = self.text.encode(self.encoding)
        self.assertEqual(encoded, self.expected_reset)

    def test_incrementalencoder(self):
        # Character-by-character encoding emits the reset sequence only on
        # the first finalizing call.
        encoder = codecs.getincrementalencoder(self.encoding)()
        pieces = [encoder.encode(char) for char in self.text]
        self.assertEqual(b''.join(pieces), self.expected)
        self.assertEqual(encoder.encode('', final=True), self.reset)
        self.assertEqual(encoder.encode('', final=True), b'')

    def test_incrementalencoder_final(self):
        # Passing final=True together with the last character emits the
        # reset sequence immediately.
        encoder = codecs.getincrementalencoder(self.encoding)()
        char_count = len(self.text)
        pieces = []
        for position, char in enumerate(self.text, start=1):
            is_last = position == char_count
            pieces.append(encoder.encode(char, is_last))
        self.assertEqual(b''.join(pieces), self.expected_reset)
        self.assertEqual(encoder.encode('', final=True), b'')
398
class TestHZStateful(TestStateful):
    # Runs the tests inherited from TestStateful against the 'hz' codec by
    # overriding the class attributes that parameterize them.
    encoding = 'hz'
    text = '\u804a\u804a'
    # Bytes emitted when the encoder is finalized.
    reset = b'~}'
    # Encoded text before any reset sequence has been emitted.
    expected = b'~{ADAD'
    expected_reset = expected + reset
405
406
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
409