1# -*- coding: utf-8 -*- 2 3""" 4Tests for the incremental XML serialisation API. 5""" 6 7from __future__ import absolute_import 8 9import io 10import os 11import sys 12import unittest 13import textwrap 14import tempfile 15 16from lxml.etree import LxmlSyntaxError 17 18from .common_imports import etree, BytesIO, HelperTestCase, skipIf, _str 19 20 21class _XmlFileTestCaseBase(HelperTestCase): 22 _file = None # to be set by specific subtypes below 23 24 def test_element(self): 25 with etree.xmlfile(self._file) as xf: 26 with xf.element('test'): 27 pass 28 self.assertXml('<test></test>') 29 30 def test_element_write_text(self): 31 with etree.xmlfile(self._file) as xf: 32 with xf.element('test'): 33 xf.write('toast') 34 self.assertXml('<test>toast</test>') 35 36 def test_element_write_empty(self): 37 with etree.xmlfile(self._file) as xf: 38 with xf.element('test'): 39 xf.write(None) 40 xf.write('') 41 xf.write('') 42 xf.write(None) 43 self.assertXml('<test></test>') 44 45 def test_element_nested(self): 46 with etree.xmlfile(self._file) as xf: 47 with xf.element('test'): 48 with xf.element('toast'): 49 with xf.element('taste'): 50 xf.write('conTent') 51 self.assertXml('<test><toast><taste>conTent</taste></toast></test>') 52 53 def test_element_nested_with_text(self): 54 with etree.xmlfile(self._file) as xf: 55 with xf.element('test'): 56 xf.write('con') 57 with xf.element('toast'): 58 xf.write('tent') 59 with xf.element('taste'): 60 xf.write('inside') 61 xf.write('tnet') 62 xf.write('noc') 63 self.assertXml('<test>con<toast>tent<taste>inside</taste>' 64 'tnet</toast>noc</test>') 65 66 def test_write_Element(self): 67 with etree.xmlfile(self._file) as xf: 68 xf.write(etree.Element('test')) 69 self.assertXml('<test/>') 70 71 def test_write_Element_repeatedly(self): 72 element = etree.Element('test') 73 with etree.xmlfile(self._file) as xf: 74 with xf.element('test'): 75 for i in range(100): 76 xf.write(element) 77 78 tree = self._parse_file() 79 self.assertTrue(tree is not None) 80 self.assertEqual(100, len(tree.getroot())) 81 self.assertEqual({'test'}, {el.tag for el in tree.getroot()}) 82 83 def test_namespace_nsmap(self): 84 with etree.xmlfile(self._file) as xf: 85 with xf.element('{nsURI}test', nsmap={'x': 'nsURI'}): 86 pass 87 self.assertXml('<x:test xmlns:x="nsURI"></x:test>') 88 89 def test_namespace_nested_nsmap(self): 90 with etree.xmlfile(self._file) as xf: 91 with xf.element('test', nsmap={'x': 'nsURI'}): 92 with xf.element('{nsURI}toast'): 93 pass 94 self.assertXml('<test xmlns:x="nsURI"><x:toast></x:toast></test>') 95 96 def test_anonymous_namespace(self): 97 with etree.xmlfile(self._file) as xf: 98 with xf.element('{nsURI}test'): 99 pass 100 self.assertXml('<ns0:test xmlns:ns0="nsURI"></ns0:test>') 101 102 def test_namespace_nested_anonymous(self): 103 with etree.xmlfile(self._file) as xf: 104 with xf.element('test'): 105 with xf.element('{nsURI}toast'): 106 pass 107 self.assertXml('<test><ns0:toast xmlns:ns0="nsURI"></ns0:toast></test>') 108 109 def test_default_namespace(self): 110 with etree.xmlfile(self._file) as xf: 111 with xf.element('{nsURI}test', nsmap={None: 'nsURI'}): 112 pass 113 self.assertXml('<test xmlns="nsURI"></test>') 114 115 def test_nested_default_namespace(self): 116 with etree.xmlfile(self._file) as xf: 117 with xf.element('{nsURI}test', nsmap={None: 'nsURI'}): 118 with xf.element('{nsURI}toast'): 119 pass 120 self.assertXml('<test xmlns="nsURI"><toast></toast></test>') 121 122 def test_nested_default_namespace_and_other(self): 123 with etree.xmlfile(self._file) as xf: 124 with xf.element('{nsURI}test', nsmap={None: 'nsURI', 'p': 'ns2'}): 125 with xf.element('{nsURI}toast'): 126 pass 127 with xf.element('{ns2}toast'): 128 pass 129 self.assertXml( 130 '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>') 131 132 def test_pi(self): 133 with etree.xmlfile(self._file) as xf: 134 xf.write(etree.ProcessingInstruction('pypi')) 135 with xf.element('test'): 136 pass 137 self.assertXml('<?pypi ?><test></test>') 138 139 def test_comment(self): 140 with etree.xmlfile(self._file) as xf: 141 xf.write(etree.Comment('a comment')) 142 with xf.element('test'): 143 pass 144 self.assertXml('<!--a comment--><test></test>') 145 146 def test_attribute(self): 147 with etree.xmlfile(self._file) as xf: 148 with xf.element('test', attrib={'k': 'v'}): 149 pass 150 self.assertXml('<test k="v"></test>') 151 152 def test_attribute_extra(self): 153 with etree.xmlfile(self._file) as xf: 154 with xf.element('test', attrib={'k': 'v'}, n='N'): 155 pass 156 self.assertXml('<test k="v" n="N"></test>') 157 158 def test_attribute_extra_duplicate(self): 159 with etree.xmlfile(self._file) as xf: 160 with xf.element('test', attrib={'k': 'v'}, k='V'): 161 pass 162 self.assertXml('<test k="V"></test>') 163 164 def test_escaping(self): 165 with etree.xmlfile(self._file) as xf: 166 with xf.element('test'): 167 xf.write('Comments: <!-- text -->\n') 168 xf.write('Entities: &') 169 self.assertXml( 170 '<test>Comments: <!-- text -->\nEntities: &amp;</test>') 171 172 def test_encoding(self): 173 with etree.xmlfile(self._file, encoding='utf16') as xf: 174 with xf.element('test'): 175 xf.write('toast') 176 self.assertXml('<test>toast</test>', encoding='utf16') 177 178 def test_buffering(self): 179 with etree.xmlfile(self._file, buffered=False) as xf: 180 with xf.element('test'): 181 self.assertXml("<test>") 182 xf.write('toast') 183 self.assertXml("<test>toast") 184 with xf.element('taste'): 185 self.assertXml("<test>toast<taste>") 186 xf.write('some', etree.Element("more"), "toast") 187 self.assertXml("<test>toast<taste>some<more/>toast") 188 self.assertXml("<test>toast<taste>some<more/>toast</taste>") 189 xf.write('end') 190 self.assertXml("<test>toast<taste>some<more/>toast</taste>end") 191 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>") 192 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>") 193 194 def test_flush(self): 195 with etree.xmlfile(self._file, buffered=True) as xf: 196 with xf.element('test'): 197 self.assertXml("") 198 xf.write('toast') 199 self.assertXml("") 200 with xf.element('taste'): 201 self.assertXml("") 202 xf.flush() 203 self.assertXml("<test>toast<taste>") 204 self.assertXml("<test>toast<taste>") 205 self.assertXml("<test>toast<taste>") 206 self.assertXml("<test>toast<taste></taste></test>") 207 208 def test_non_io_exception_continues_closing(self): 209 try: 210 with etree.xmlfile(self._file) as xf: 211 with xf.element('root'): 212 with xf.element('test'): 213 xf.write("BEFORE") 214 raise TypeError("FAIL!") 215 xf.write("AFTER") 216 except TypeError as exc: 217 self.assertTrue("FAIL" in str(exc), exc) 218 else: 219 self.assertTrue(False, "exception not propagated") 220 self.assertXml("<root><test>BEFORE</test></root>") 221 222 def test_generator_close_continues_closing(self): 223 def gen(): 224 with etree.xmlfile(self._file) as xf: 225 with xf.element('root'): 226 while True: 227 content = (yield) 228 with xf.element('entry'): 229 xf.write(content) 230 231 g = gen() 232 next(g) 233 g.send('A') 234 g.send('B') 235 g.send('C') 236 g.close() 237 self.assertXml("<root><entry>A</entry><entry>B</entry><entry>C</entry></root>") 238 239 def test_failure_preceding_text(self): 240 try: 241 with etree.xmlfile(self._file) as xf: 242 xf.write('toast') 243 except etree.LxmlSyntaxError: 244 self.assertTrue(True) 245 else: 246 self.assertTrue(False) 247 248 def test_failure_trailing_text(self): 249 with etree.xmlfile(self._file) as xf: 250 with xf.element('test'): 251 pass 252 try: 253 xf.write('toast') 254 except etree.LxmlSyntaxError: 255 self.assertTrue(True) 256 else: 257 self.assertTrue(False) 258 259 def test_failure_trailing_Element(self): 260 with etree.xmlfile(self._file) as xf: 261 with xf.element('test'): 262 pass 263 try: 264 xf.write(etree.Element('test')) 265 except etree.LxmlSyntaxError: 266 self.assertTrue(True) 267 else: 268 self.assertTrue(False) 269 270 def test_closing_out_of_order_in_error_case(self): 271 cm_exit = None 272 try: 273 with etree.xmlfile(self._file) as xf: 274 x = xf.element('test') 275 cm_exit = x.__exit__ 276 x.__enter__() 277 raise ValueError('123') 278 except ValueError: 279 self.assertTrue(cm_exit) 280 try: 281 cm_exit(ValueError, ValueError("huhu"), None) 282 except etree.LxmlSyntaxError: 283 self.assertTrue(True) 284 else: 285 self.assertTrue(False) 286 else: 287 self.assertTrue(False) 288 289 def _read_file(self): 290 pos = self._file.tell() 291 self._file.seek(0) 292 try: 293 return self._file.read() 294 finally: 295 self._file.seek(pos) 296 297 def _parse_file(self): 298 pos = self._file.tell() 299 self._file.seek(0) 300 try: 301 return etree.parse(self._file) 302 finally: 303 self._file.seek(pos) 304 305 def tearDown(self): 306 if self._file is not None: 307 self._file.close() 308 309 def assertXml(self, expected, encoding='utf8'): 310 self.assertEqual(self._read_file().decode(encoding), expected) 311 312 313class BytesIOXmlFileTestCase(_XmlFileTestCaseBase): 314 def setUp(self): 315 self._file = BytesIO() 316 317 def test_filelike_close(self): 318 with etree.xmlfile(self._file, close=True) as xf: 319 with xf.element('test'): 320 pass 321 self.assertRaises(ValueError, self._file.getvalue) 322 323 324class TempXmlFileTestCase(_XmlFileTestCaseBase): 325 def setUp(self): 326 self._file = tempfile.TemporaryFile() 327 328 329@skipIf(sys.platform.startswith("win"), "Can't reopen temporary files on Windows") 330class TempPathXmlFileTestCase(_XmlFileTestCaseBase): 331 def setUp(self): 332 self._tmpfile = tempfile.NamedTemporaryFile() 333 self._file = self._tmpfile.name 334 335 def tearDown(self): 336 try: 337 self._tmpfile.close() 338 finally: 339 if os.path.exists(self._tmpfile.name): 340 os.unlink(self._tmpfile.name) 341 342 def _read_file(self): 343 self._tmpfile.seek(0) 344 return self._tmpfile.read() 345 346 def _parse_file(self): 347 self._tmpfile.seek(0) 348 return etree.parse(self._tmpfile) 349 350 @skipIf(True, "temp file behaviour is too platform specific here") 351 def test_buffering(self): 352 pass 353 354 @skipIf(True, "temp file behaviour is too platform specific here") 355 def test_flush(self): 356 pass 357 358 359class SimpleFileLikeXmlFileTestCase(_XmlFileTestCaseBase): 360 class SimpleFileLike(object): 361 def __init__(self, target): 362 self._target = target 363 self.write = target.write 364 self.tell = target.tell 365 self.seek = target.seek 366 self.closed = False 367 368 def close(self): 369 assert not self.closed 370 self.closed = True 371 self._target.close() 372 373 def setUp(self): 374 self._target = BytesIO() 375 self._file = self.SimpleFileLike(self._target) 376 377 def _read_file(self): 378 return self._target.getvalue() 379 380 def _parse_file(self): 381 pos = self._file.tell() 382 self._target.seek(0) 383 try: 384 return etree.parse(self._target) 385 finally: 386 self._target.seek(pos) 387 388 def test_filelike_not_closing(self): 389 with etree.xmlfile(self._file) as xf: 390 with xf.element('test'): 391 pass 392 self.assertFalse(self._file.closed) 393 394 def test_filelike_close(self): 395 with etree.xmlfile(self._file, close=True) as xf: 396 with xf.element('test'): 397 pass 398 self.assertTrue(self._file.closed) 399 self._file = None # prevent closing in tearDown() 400 401 def test_write_fails(self): 402 class WriteError(Exception): 403 pass 404 405 class Writer(object): 406 def __init__(self, trigger): 407 self._trigger = trigger 408 self._failed = False 409 410 def write(self, data): 411 assert not self._failed, "write() called again after failure" 412 if self._trigger in data: 413 self._failed = True 414 raise WriteError("FAILED: " + self._trigger.decode('utf8')) 415 416 for trigger in ['text', 'root', 'tag', 'noflush']: 417 try: 418 with etree.xmlfile(Writer(trigger.encode('utf8')), encoding='utf8') as xf: 419 with xf.element('root'): 420 xf.flush() 421 with xf.element('tag'): 422 xf.write('text') 423 xf.flush() 424 xf.write('noflush') 425 xf.flush() 426 xf.flush() 427 except WriteError as exc: 428 self.assertTrue('FAILED: ' + trigger in str(exc)) 429 else: 430 self.assertTrue(False, "exception not raised for '%s'" % trigger) 431 432 433class HtmlFileTestCase(_XmlFileTestCaseBase): 434 def setUp(self): 435 self._file = BytesIO() 436 437 def test_void_elements(self): 438 # http://www.w3.org/TR/html5/syntax.html#elements-0 439 void_elements = { 440 "area", "base", "br", "col", "embed", "hr", "img", "input", 441 "keygen", "link", "meta", "param", "source", "track", "wbr"} 442 443 # FIXME: These don't get serialized as void elements. 444 void_elements.difference_update([ 445 'area', 'embed', 'keygen', 'source', 'track', 'wbr' 446 ]) 447 448 for tag in sorted(void_elements): 449 with etree.htmlfile(self._file) as xf: 450 xf.write(etree.Element(tag)) 451 self.assertXml('<%s>' % tag) 452 self._file = BytesIO() 453 454 def test_method_context_manager_misuse(self): 455 with etree.htmlfile(self._file) as xf: 456 with xf.element('foo'): 457 cm = xf.method('xml') 458 cm.__enter__() 459 460 self.assertRaises(LxmlSyntaxError, cm.__enter__) 461 462 cm2 = xf.method('xml') 463 cm2.__enter__() 464 cm2.__exit__(None, None, None) 465 466 self.assertRaises(LxmlSyntaxError, cm2.__exit__, None, None, None) 467 468 cm3 = xf.method('xml') 469 cm3.__enter__() 470 with xf.method('html'): 471 self.assertRaises(LxmlSyntaxError, cm3.__exit__, None, None, None) 472 473 def test_xml_mode_write_inside_html(self): 474 tag = 'foo' 475 attrib = {'selected': 'bar'} 476 elt = etree.Element(tag, attrib=attrib) 477 478 with etree.htmlfile(self._file) as xf: 479 with xf.element("root"): 480 xf.write(elt) # 1 481 482 assert elt.text is None 483 xf.write(elt, method='xml') # 2 484 485 elt.text = "" 486 xf.write(elt, method='xml') # 3 487 488 with xf.element(tag, attrib=attrib, method='xml'): 489 pass # 4 490 491 xf.write(elt) # 5 492 493 with xf.method('xml'): 494 xf.write(elt) # 6 495 496 self.assertXml( 497 '<root>' 498 '<foo selected></foo>' # 1 499 '<foo selected="bar"/>' # 2 500 '<foo selected="bar"></foo>' # 3 501 '<foo selected="bar"></foo>' # 4 502 '<foo selected></foo>' # 5 503 '<foo selected="bar"></foo>' # 6 504 '</root>') 505 self._file = BytesIO() 506 507 def test_xml_mode_element_inside_html(self): 508 # The htmlfile already outputs in xml mode for .element calls. This 509 # test actually illustrates a bug 510 511 with etree.htmlfile(self._file) as xf: 512 with xf.element("root"): 513 with xf.element('foo', attrib={'selected': 'bar'}): 514 pass 515 516 self.assertXml( 517 '<root>' 518 # '<foo selected></foo>' # FIXME: this is the correct output 519 # in html mode 520 '<foo selected="bar"></foo>' 521 '</root>') 522 self._file = BytesIO() 523 524 def test_attribute_quoting(self): 525 with etree.htmlfile(self._file) as xf: 526 with xf.element("tagname", attrib={"attr": '"misquoted"'}): 527 xf.write("foo") 528 529 self.assertXml('<tagname attr=""misquoted"">foo</tagname>') 530 531 def test_attribute_quoting_unicode(self): 532 with etree.htmlfile(self._file) as xf: 533 with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}): 534 xf.write("foo") 535 536 self.assertXml('<tagname attr=""misquöted㍄𓍄"">foo</tagname>') 537 538 def test_unescaped_script(self): 539 with etree.htmlfile(self._file) as xf: 540 elt = etree.Element('script') 541 elt.text = "if (a < b);" 542 xf.write(elt) 543 self.assertXml('<script>if (a < b);</script>') 544 545 def test_unescaped_script_incremental(self): 546 with etree.htmlfile(self._file) as xf: 547 with xf.element('script'): 548 xf.write("if (a < b);") 549 550 self.assertXml('<script>if (a < b);</script>') 551 552 def test_write_declaration(self): 553 with etree.htmlfile(self._file) as xf: 554 try: 555 xf.write_declaration() 556 except etree.LxmlSyntaxError: 557 self.assertTrue(True) 558 else: 559 self.assertTrue(False) 560 xf.write(etree.Element('html')) 561 562 def test_write_namespaced_element(self): 563 with etree.htmlfile(self._file) as xf: 564 xf.write(etree.Element('{some_ns}some_tag')) 565 self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>') 566 567 def test_open_namespaced_element(self): 568 with etree.htmlfile(self._file) as xf: 569 with xf.element("{some_ns}some_tag"): 570 pass 571 self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>') 572 573 574class AsyncXmlFileTestCase(HelperTestCase): 575 def test_async_api(self): 576 out = io.BytesIO() 577 xf = etree.xmlfile(out) 578 scm = xf.__enter__() 579 acm = xf.__aenter__() 580 list(acm.__await__()) # fake await to avoid destructor warning 581 582 def api_of(obj): 583 return sorted(name for name in dir(scm) if not name.startswith('__')) 584 585 a_api = api_of(acm) 586 587 self.assertEqual(api_of(scm), api_of(acm)) 588 self.assertTrue('write' in a_api) 589 self.assertTrue('element' in a_api) 590 self.assertTrue('method' in a_api) 591 self.assertTrue(len(a_api) > 5) 592 593 def _run_async(self, coro): 594 while True: 595 try: 596 coro.send(None) 597 except StopIteration as ex: 598 return ex.value 599 600 @skipIf(sys.version_info < (3, 5), "requires support for async-def (Py3.5+)") 601 def test_async(self): 602 code = textwrap.dedent("""\ 603 async def test_async_xmlfile(close=True, buffered=True): 604 class Writer(object): 605 def __init__(self): 606 self._data = [] 607 self._all_data = None 608 self._calls = 0 609 610 async def write(self, data): 611 self._calls += 1 612 self._data.append(data) 613 614 async def close(self): 615 assert self._all_data is None 616 assert self._data is not None 617 self._all_data = b''.join(self._data) 618 self._data = None # make writing fail afterwards 619 620 async def generate(out, close=True, buffered=True): 621 async with etree.xmlfile(out, close=close, buffered=buffered) as xf: 622 async with xf.element('root'): 623 await xf.write('root-text') 624 async with xf.method('html'): 625 await xf.write(etree.Element('img', src='http://huhu.org/')) 626 await xf.flush() 627 for i in range(3): 628 async with xf.element('el'): 629 await xf.write('text-%d' % i) 630 631 out = Writer() 632 await generate(out, close=close, buffered=buffered) 633 if not close: 634 await out.close() 635 assert out._data is None, out._data 636 return out._all_data, out._calls 637 """) 638 lns = {} 639 exec(code, globals(), lns) 640 test_async_xmlfile = lns['test_async_xmlfile'] 641 642 expected = ( 643 b'<root>root-text<img src="http://huhu.org/">' 644 b'<el>text-0</el><el>text-1</el><el>text-2</el></root>' 645 ) 646 647 data, calls = self._run_async(test_async_xmlfile(close=True)) 648 self.assertEqual(expected, data) 649 self.assertEqual(2, calls) # only flush() and close() 650 651 data, calls = self._run_async(test_async_xmlfile(close=False)) 652 self.assertEqual(expected, data) 653 self.assertEqual(2, calls) # only flush() and close() 654 655 data, unbuffered_calls = self._run_async(test_async_xmlfile(buffered=False)) 656 self.assertEqual(expected, data) 657 self.assertTrue(unbuffered_calls > calls, unbuffered_calls) 658 659 660def test_suite(): 661 suite = unittest.TestSuite() 662 suite.addTests([ 663 unittest.makeSuite(BytesIOXmlFileTestCase), 664 unittest.makeSuite(TempXmlFileTestCase), 665 unittest.makeSuite(TempPathXmlFileTestCase), 666 unittest.makeSuite(SimpleFileLikeXmlFileTestCase), 667 unittest.makeSuite(HtmlFileTestCase), 668 unittest.makeSuite(AsyncXmlFileTestCase), 669 ]) 670 return suite 671 672 673if __name__ == '__main__': 674 print('to test use test.py %s' % __file__) 675