1# -*- coding: utf-8 -*-
2
3"""
4Tests for the incremental XML serialisation API.
5"""
6
7from __future__ import absolute_import
8
9import io
10import os
11import sys
12import unittest
13import textwrap
14import tempfile
15
16from lxml.etree import LxmlSyntaxError
17
18from .common_imports import etree, BytesIO, HelperTestCase, skipIf, _str
19
20
21class _XmlFileTestCaseBase(HelperTestCase):
22    _file = None  # to be set by specific subtypes below
23
24    def test_element(self):
25        with etree.xmlfile(self._file) as xf:
26            with xf.element('test'):
27                pass
28        self.assertXml('<test></test>')
29
30    def test_element_write_text(self):
31        with etree.xmlfile(self._file) as xf:
32            with xf.element('test'):
33                xf.write('toast')
34        self.assertXml('<test>toast</test>')
35
36    def test_element_write_empty(self):
37        with etree.xmlfile(self._file) as xf:
38            with xf.element('test'):
39                xf.write(None)
40                xf.write('')
41                xf.write('')
42                xf.write(None)
43        self.assertXml('<test></test>')
44
45    def test_element_nested(self):
46        with etree.xmlfile(self._file) as xf:
47            with xf.element('test'):
48                with xf.element('toast'):
49                    with xf.element('taste'):
50                        xf.write('conTent')
51        self.assertXml('<test><toast><taste>conTent</taste></toast></test>')
52
53    def test_element_nested_with_text(self):
54        with etree.xmlfile(self._file) as xf:
55            with xf.element('test'):
56                xf.write('con')
57                with xf.element('toast'):
58                    xf.write('tent')
59                    with xf.element('taste'):
60                        xf.write('inside')
61                    xf.write('tnet')
62                xf.write('noc')
63        self.assertXml('<test>con<toast>tent<taste>inside</taste>'
64                       'tnet</toast>noc</test>')
65
66    def test_write_Element(self):
67        with etree.xmlfile(self._file) as xf:
68            xf.write(etree.Element('test'))
69        self.assertXml('<test/>')
70
71    def test_write_Element_repeatedly(self):
72        element = etree.Element('test')
73        with etree.xmlfile(self._file) as xf:
74            with xf.element('test'):
75                for i in range(100):
76                    xf.write(element)
77
78        tree = self._parse_file()
79        self.assertTrue(tree is not None)
80        self.assertEqual(100, len(tree.getroot()))
81        self.assertEqual({'test'}, {el.tag for el in tree.getroot()})
82
83    def test_namespace_nsmap(self):
84        with etree.xmlfile(self._file) as xf:
85            with xf.element('{nsURI}test', nsmap={'x': 'nsURI'}):
86                pass
87        self.assertXml('<x:test xmlns:x="nsURI"></x:test>')
88
89    def test_namespace_nested_nsmap(self):
90        with etree.xmlfile(self._file) as xf:
91            with xf.element('test', nsmap={'x': 'nsURI'}):
92                with xf.element('{nsURI}toast'):
93                    pass
94        self.assertXml('<test xmlns:x="nsURI"><x:toast></x:toast></test>')
95
96    def test_anonymous_namespace(self):
97        with etree.xmlfile(self._file) as xf:
98            with xf.element('{nsURI}test'):
99                pass
100        self.assertXml('<ns0:test xmlns:ns0="nsURI"></ns0:test>')
101
102    def test_namespace_nested_anonymous(self):
103        with etree.xmlfile(self._file) as xf:
104            with xf.element('test'):
105                with xf.element('{nsURI}toast'):
106                    pass
107        self.assertXml('<test><ns0:toast xmlns:ns0="nsURI"></ns0:toast></test>')
108
109    def test_default_namespace(self):
110        with etree.xmlfile(self._file) as xf:
111            with xf.element('{nsURI}test', nsmap={None: 'nsURI'}):
112                pass
113        self.assertXml('<test xmlns="nsURI"></test>')
114
115    def test_nested_default_namespace(self):
116        with etree.xmlfile(self._file) as xf:
117            with xf.element('{nsURI}test', nsmap={None: 'nsURI'}):
118                with xf.element('{nsURI}toast'):
119                    pass
120        self.assertXml('<test xmlns="nsURI"><toast></toast></test>')
121
122    def test_nested_default_namespace_and_other(self):
123        with etree.xmlfile(self._file) as xf:
124            with xf.element('{nsURI}test', nsmap={None: 'nsURI', 'p': 'ns2'}):
125                with xf.element('{nsURI}toast'):
126                    pass
127                with xf.element('{ns2}toast'):
128                    pass
129        self.assertXml(
130            '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>')
131
132    def test_pi(self):
133        with etree.xmlfile(self._file) as xf:
134            xf.write(etree.ProcessingInstruction('pypi'))
135            with xf.element('test'):
136                pass
137        self.assertXml('<?pypi ?><test></test>')
138
139    def test_comment(self):
140        with etree.xmlfile(self._file) as xf:
141            xf.write(etree.Comment('a comment'))
142            with xf.element('test'):
143                pass
144        self.assertXml('<!--a comment--><test></test>')
145
146    def test_attribute(self):
147        with etree.xmlfile(self._file) as xf:
148            with xf.element('test', attrib={'k': 'v'}):
149                pass
150        self.assertXml('<test k="v"></test>')
151
152    def test_attribute_extra(self):
153        with etree.xmlfile(self._file) as xf:
154            with xf.element('test', attrib={'k': 'v'}, n='N'):
155                pass
156        self.assertXml('<test k="v" n="N"></test>')
157
158    def test_attribute_extra_duplicate(self):
159        with etree.xmlfile(self._file) as xf:
160            with xf.element('test', attrib={'k': 'v'}, k='V'):
161                pass
162        self.assertXml('<test k="V"></test>')
163
164    def test_escaping(self):
165        with etree.xmlfile(self._file) as xf:
166            with xf.element('test'):
167                xf.write('Comments: <!-- text -->\n')
168                xf.write('Entities: &amp;')
169        self.assertXml(
170            '<test>Comments: &lt;!-- text --&gt;\nEntities: &amp;amp;</test>')
171
172    def test_encoding(self):
173        with etree.xmlfile(self._file, encoding='utf16') as xf:
174            with xf.element('test'):
175                xf.write('toast')
176        self.assertXml('<test>toast</test>', encoding='utf16')
177
178    def test_buffering(self):
179        with etree.xmlfile(self._file, buffered=False) as xf:
180            with xf.element('test'):
181                self.assertXml("<test>")
182                xf.write('toast')
183                self.assertXml("<test>toast")
184                with xf.element('taste'):
185                    self.assertXml("<test>toast<taste>")
186                    xf.write('some', etree.Element("more"), "toast")
187                    self.assertXml("<test>toast<taste>some<more/>toast")
188                self.assertXml("<test>toast<taste>some<more/>toast</taste>")
189                xf.write('end')
190                self.assertXml("<test>toast<taste>some<more/>toast</taste>end")
191            self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
192        self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
193
194    def test_flush(self):
195        with etree.xmlfile(self._file, buffered=True) as xf:
196            with xf.element('test'):
197                self.assertXml("")
198                xf.write('toast')
199                self.assertXml("")
200                with xf.element('taste'):
201                    self.assertXml("")
202                    xf.flush()
203                    self.assertXml("<test>toast<taste>")
204                self.assertXml("<test>toast<taste>")
205            self.assertXml("<test>toast<taste>")
206        self.assertXml("<test>toast<taste></taste></test>")
207
208    def test_non_io_exception_continues_closing(self):
209        try:
210            with etree.xmlfile(self._file) as xf:
211                with xf.element('root'):
212                    with xf.element('test'):
213                        xf.write("BEFORE")
214                        raise TypeError("FAIL!")
215                    xf.write("AFTER")
216        except TypeError as exc:
217            self.assertTrue("FAIL" in str(exc), exc)
218        else:
219            self.assertTrue(False, "exception not propagated")
220        self.assertXml("<root><test>BEFORE</test></root>")
221
222    def test_generator_close_continues_closing(self):
223        def gen():
224            with etree.xmlfile(self._file) as xf:
225                with xf.element('root'):
226                    while True:
227                        content = (yield)
228                        with xf.element('entry'):
229                            xf.write(content)
230
231        g = gen()
232        next(g)
233        g.send('A')
234        g.send('B')
235        g.send('C')
236        g.close()
237        self.assertXml("<root><entry>A</entry><entry>B</entry><entry>C</entry></root>")
238
239    def test_failure_preceding_text(self):
240        try:
241            with etree.xmlfile(self._file) as xf:
242                xf.write('toast')
243        except etree.LxmlSyntaxError:
244            self.assertTrue(True)
245        else:
246            self.assertTrue(False)
247
248    def test_failure_trailing_text(self):
249        with etree.xmlfile(self._file) as xf:
250            with xf.element('test'):
251                pass
252            try:
253                xf.write('toast')
254            except etree.LxmlSyntaxError:
255                self.assertTrue(True)
256            else:
257                self.assertTrue(False)
258
259    def test_failure_trailing_Element(self):
260        with etree.xmlfile(self._file) as xf:
261            with xf.element('test'):
262                pass
263            try:
264                xf.write(etree.Element('test'))
265            except etree.LxmlSyntaxError:
266                self.assertTrue(True)
267            else:
268                self.assertTrue(False)
269
270    def test_closing_out_of_order_in_error_case(self):
271        cm_exit = None
272        try:
273            with etree.xmlfile(self._file) as xf:
274                x = xf.element('test')
275                cm_exit = x.__exit__
276                x.__enter__()
277                raise ValueError('123')
278        except ValueError:
279            self.assertTrue(cm_exit)
280            try:
281                cm_exit(ValueError, ValueError("huhu"), None)
282            except etree.LxmlSyntaxError:
283                self.assertTrue(True)
284            else:
285                self.assertTrue(False)
286        else:
287            self.assertTrue(False)
288
289    def _read_file(self):
290        pos = self._file.tell()
291        self._file.seek(0)
292        try:
293            return self._file.read()
294        finally:
295            self._file.seek(pos)
296
297    def _parse_file(self):
298        pos = self._file.tell()
299        self._file.seek(0)
300        try:
301            return etree.parse(self._file)
302        finally:
303            self._file.seek(pos)
304
305    def tearDown(self):
306        if self._file is not None:
307            self._file.close()
308
309    def assertXml(self, expected, encoding='utf8'):
310        self.assertEqual(self._read_file().decode(encoding), expected)
311
312
313class BytesIOXmlFileTestCase(_XmlFileTestCaseBase):
314    def setUp(self):
315        self._file = BytesIO()
316
317    def test_filelike_close(self):
318        with etree.xmlfile(self._file, close=True) as xf:
319            with xf.element('test'):
320                pass
321        self.assertRaises(ValueError, self._file.getvalue)
322
323
324class TempXmlFileTestCase(_XmlFileTestCaseBase):
325    def setUp(self):
326        self._file = tempfile.TemporaryFile()
327
328
329@skipIf(sys.platform.startswith("win"), "Can't reopen temporary files on Windows")
330class TempPathXmlFileTestCase(_XmlFileTestCaseBase):
331    def setUp(self):
332        self._tmpfile = tempfile.NamedTemporaryFile()
333        self._file = self._tmpfile.name
334
335    def tearDown(self):
336        try:
337            self._tmpfile.close()
338        finally:
339            if os.path.exists(self._tmpfile.name):
340                os.unlink(self._tmpfile.name)
341
342    def _read_file(self):
343        self._tmpfile.seek(0)
344        return self._tmpfile.read()
345
346    def _parse_file(self):
347        self._tmpfile.seek(0)
348        return etree.parse(self._tmpfile)
349
350    @skipIf(True, "temp file behaviour is too platform specific here")
351    def test_buffering(self):
352        pass
353
354    @skipIf(True, "temp file behaviour is too platform specific here")
355    def test_flush(self):
356        pass
357
358
359class SimpleFileLikeXmlFileTestCase(_XmlFileTestCaseBase):
360    class SimpleFileLike(object):
361        def __init__(self, target):
362            self._target = target
363            self.write = target.write
364            self.tell = target.tell
365            self.seek = target.seek
366            self.closed = False
367
368        def close(self):
369            assert not self.closed
370            self.closed = True
371            self._target.close()
372
373    def setUp(self):
374        self._target = BytesIO()
375        self._file = self.SimpleFileLike(self._target)
376
377    def _read_file(self):
378        return self._target.getvalue()
379
380    def _parse_file(self):
381        pos = self._file.tell()
382        self._target.seek(0)
383        try:
384            return etree.parse(self._target)
385        finally:
386            self._target.seek(pos)
387
388    def test_filelike_not_closing(self):
389        with etree.xmlfile(self._file) as xf:
390            with xf.element('test'):
391                pass
392        self.assertFalse(self._file.closed)
393
394    def test_filelike_close(self):
395        with etree.xmlfile(self._file, close=True) as xf:
396            with xf.element('test'):
397                pass
398        self.assertTrue(self._file.closed)
399        self._file = None  # prevent closing in tearDown()
400
401    def test_write_fails(self):
402        class WriteError(Exception):
403            pass
404
405        class Writer(object):
406            def __init__(self, trigger):
407                self._trigger = trigger
408                self._failed = False
409
410            def write(self, data):
411                assert not self._failed, "write() called again after failure"
412                if self._trigger in data:
413                    self._failed = True
414                    raise WriteError("FAILED: " + self._trigger.decode('utf8'))
415
416        for trigger in ['text', 'root', 'tag', 'noflush']:
417            try:
418                with etree.xmlfile(Writer(trigger.encode('utf8')), encoding='utf8') as xf:
419                    with xf.element('root'):
420                        xf.flush()
421                        with xf.element('tag'):
422                            xf.write('text')
423                            xf.flush()
424                            xf.write('noflush')
425                        xf.flush()
426                    xf.flush()
427            except WriteError as exc:
428                self.assertTrue('FAILED: ' + trigger in str(exc))
429            else:
430                self.assertTrue(False, "exception not raised for '%s'" % trigger)
431
432
433class HtmlFileTestCase(_XmlFileTestCaseBase):
434    def setUp(self):
435        self._file = BytesIO()
436
437    def test_void_elements(self):
438        # http://www.w3.org/TR/html5/syntax.html#elements-0
439        void_elements = {
440            "area", "base", "br", "col", "embed", "hr", "img", "input",
441            "keygen", "link", "meta", "param", "source", "track", "wbr"}
442
443        # FIXME: These don't get serialized as void elements.
444        void_elements.difference_update([
445            'area', 'embed', 'keygen', 'source', 'track', 'wbr'
446        ])
447
448        for tag in sorted(void_elements):
449            with etree.htmlfile(self._file) as xf:
450                xf.write(etree.Element(tag))
451            self.assertXml('<%s>' % tag)
452            self._file = BytesIO()
453
454    def test_method_context_manager_misuse(self):
455        with etree.htmlfile(self._file) as xf:
456            with xf.element('foo'):
457                cm = xf.method('xml')
458                cm.__enter__()
459
460                self.assertRaises(LxmlSyntaxError, cm.__enter__)
461
462                cm2 = xf.method('xml')
463                cm2.__enter__()
464                cm2.__exit__(None, None, None)
465
466                self.assertRaises(LxmlSyntaxError, cm2.__exit__, None, None, None)
467
468                cm3 = xf.method('xml')
469                cm3.__enter__()
470                with xf.method('html'):
471                    self.assertRaises(LxmlSyntaxError, cm3.__exit__, None, None, None)
472
473    def test_xml_mode_write_inside_html(self):
474        tag = 'foo'
475        attrib = {'selected': 'bar'}
476        elt = etree.Element(tag, attrib=attrib)
477
478        with etree.htmlfile(self._file) as xf:
479            with xf.element("root"):
480                xf.write(elt)  # 1
481
482                assert elt.text is None
483                xf.write(elt, method='xml')  # 2
484
485                elt.text = ""
486                xf.write(elt, method='xml')  # 3
487
488                with xf.element(tag, attrib=attrib, method='xml'):
489                    pass # 4
490
491                xf.write(elt)  # 5
492
493                with xf.method('xml'):
494                    xf.write(elt)  # 6
495
496        self.assertXml(
497            '<root>'
498                '<foo selected></foo>'  # 1
499                '<foo selected="bar"/>'  # 2
500                '<foo selected="bar"></foo>'  # 3
501                '<foo selected="bar"></foo>'  # 4
502                '<foo selected></foo>'  # 5
503                '<foo selected="bar"></foo>'  # 6
504            '</root>')
505        self._file = BytesIO()
506
507    def test_xml_mode_element_inside_html(self):
508        # The htmlfile already outputs in xml mode for .element calls. This
509        # test actually illustrates a bug
510
511        with etree.htmlfile(self._file) as xf:
512            with xf.element("root"):
513                with xf.element('foo', attrib={'selected': 'bar'}):
514                    pass
515
516        self.assertXml(
517            '<root>'
518              # '<foo selected></foo>'  # FIXME: this is the correct output
519                                        # in html mode
520              '<foo selected="bar"></foo>'
521            '</root>')
522        self._file = BytesIO()
523
524    def test_attribute_quoting(self):
525        with etree.htmlfile(self._file) as xf:
526            with xf.element("tagname", attrib={"attr": '"misquoted"'}):
527                xf.write("foo")
528
529        self.assertXml('<tagname attr="&quot;misquoted&quot;">foo</tagname>')
530
531    def test_attribute_quoting_unicode(self):
532        with etree.htmlfile(self._file) as xf:
533            with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}):
534                xf.write("foo")
535
536        self.assertXml('<tagname attr="&quot;misqu&#xF6;ted&#x3344;&#x13344;&quot;">foo</tagname>')
537
538    def test_unescaped_script(self):
539        with etree.htmlfile(self._file) as xf:
540            elt = etree.Element('script')
541            elt.text = "if (a < b);"
542            xf.write(elt)
543        self.assertXml('<script>if (a < b);</script>')
544
545    def test_unescaped_script_incremental(self):
546        with etree.htmlfile(self._file) as xf:
547            with xf.element('script'):
548                xf.write("if (a < b);")
549
550        self.assertXml('<script>if (a < b);</script>')
551
552    def test_write_declaration(self):
553        with etree.htmlfile(self._file) as xf:
554            try:
555                xf.write_declaration()
556            except etree.LxmlSyntaxError:
557                self.assertTrue(True)
558            else:
559                self.assertTrue(False)
560            xf.write(etree.Element('html'))
561
562    def test_write_namespaced_element(self):
563        with etree.htmlfile(self._file) as xf:
564            xf.write(etree.Element('{some_ns}some_tag'))
565        self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>')
566
567    def test_open_namespaced_element(self):
568        with etree.htmlfile(self._file) as xf:
569            with xf.element("{some_ns}some_tag"):
570                pass
571        self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>')
572
573
574class AsyncXmlFileTestCase(HelperTestCase):
575    def test_async_api(self):
576        out = io.BytesIO()
577        xf = etree.xmlfile(out)
578        scm = xf.__enter__()
579        acm = xf.__aenter__()
580        list(acm.__await__())  # fake await to avoid destructor warning
581
582        def api_of(obj):
583            return sorted(name for name in dir(scm) if not name.startswith('__'))
584
585        a_api = api_of(acm)
586
587        self.assertEqual(api_of(scm), api_of(acm))
588        self.assertTrue('write' in a_api)
589        self.assertTrue('element' in a_api)
590        self.assertTrue('method' in a_api)
591        self.assertTrue(len(a_api) > 5)
592
593    def _run_async(self, coro):
594        while True:
595            try:
596                coro.send(None)
597            except StopIteration as ex:
598                return ex.value
599
600    @skipIf(sys.version_info < (3, 5), "requires support for async-def (Py3.5+)")
601    def test_async(self):
602        code = textwrap.dedent("""\
603        async def test_async_xmlfile(close=True, buffered=True):
604            class Writer(object):
605                def __init__(self):
606                    self._data = []
607                    self._all_data = None
608                    self._calls = 0
609
610                async def write(self, data):
611                    self._calls += 1
612                    self._data.append(data)
613
614                async def close(self):
615                    assert self._all_data is None
616                    assert self._data is not None
617                    self._all_data = b''.join(self._data)
618                    self._data = None  # make writing fail afterwards
619
620            async def generate(out, close=True, buffered=True):
621                async with etree.xmlfile(out, close=close, buffered=buffered) as xf:
622                    async with xf.element('root'):
623                        await xf.write('root-text')
624                        async with xf.method('html'):
625                            await xf.write(etree.Element('img', src='http://huhu.org/'))
626                        await xf.flush()
627                        for i in range(3):
628                            async with xf.element('el'):
629                                await xf.write('text-%d' % i)
630
631            out = Writer()
632            await generate(out, close=close, buffered=buffered)
633            if not close:
634                await out.close()
635            assert out._data is None, out._data
636            return out._all_data, out._calls
637        """)
638        lns = {}
639        exec(code, globals(), lns)
640        test_async_xmlfile = lns['test_async_xmlfile']
641
642        expected = (
643            b'<root>root-text<img src="http://huhu.org/">'
644            b'<el>text-0</el><el>text-1</el><el>text-2</el></root>'
645        )
646
647        data, calls = self._run_async(test_async_xmlfile(close=True))
648        self.assertEqual(expected, data)
649        self.assertEqual(2, calls)  # only flush() and close()
650
651        data, calls = self._run_async(test_async_xmlfile(close=False))
652        self.assertEqual(expected, data)
653        self.assertEqual(2, calls)  # only flush() and close()
654
655        data, unbuffered_calls = self._run_async(test_async_xmlfile(buffered=False))
656        self.assertEqual(expected, data)
657        self.assertTrue(unbuffered_calls > calls, unbuffered_calls)
658
659
660def test_suite():
661    suite = unittest.TestSuite()
662    suite.addTests([
663        unittest.makeSuite(BytesIOXmlFileTestCase),
664        unittest.makeSuite(TempXmlFileTestCase),
665        unittest.makeSuite(TempPathXmlFileTestCase),
666        unittest.makeSuite(SimpleFileLikeXmlFileTestCase),
667        unittest.makeSuite(HtmlFileTestCase),
668        unittest.makeSuite(AsyncXmlFileTestCase),
669    ])
670    return suite
671
672
673if __name__ == '__main__':
674    print('to test use test.py %s' % __file__)
675