1# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Tests for breezy.pack.""" 18 19from io import BytesIO 20 21from ... import errors, tests 22from .. import ( 23 pack, 24 ) 25 26 27class TestContainerSerialiser(tests.TestCase): 28 """Tests for the ContainerSerialiser class.""" 29 30 def test_construct(self): 31 """Test constructing a ContainerSerialiser.""" 32 pack.ContainerSerialiser() 33 34 def test_begin(self): 35 serialiser = pack.ContainerSerialiser() 36 self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n', 37 serialiser.begin()) 38 39 def test_end(self): 40 serialiser = pack.ContainerSerialiser() 41 self.assertEqual(b'E', serialiser.end()) 42 43 def test_bytes_record_no_name(self): 44 serialiser = pack.ContainerSerialiser() 45 record = serialiser.bytes_record(b'bytes', []) 46 self.assertEqual(b'B5\n\nbytes', record) 47 48 def test_bytes_record_one_name_with_one_part(self): 49 serialiser = pack.ContainerSerialiser() 50 record = serialiser.bytes_record(b'bytes', [(b'name',)]) 51 self.assertEqual(b'B5\nname\n\nbytes', record) 52 53 def test_bytes_record_one_name_with_two_parts(self): 54 serialiser = pack.ContainerSerialiser() 55 record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')]) 56 self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record) 57 58 def test_bytes_record_two_names(self): 59 serialiser = pack.ContainerSerialiser() 60 record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)]) 61 self.assertEqual(b'B5\nname1\nname2\n\nbytes', record) 62 63 def test_bytes_record_whitespace_in_name_part(self): 64 serialiser = pack.ContainerSerialiser() 65 self.assertRaises( 66 errors.InvalidRecordError, 67 serialiser.bytes_record, b'bytes', [(b'bad name',)]) 68 69 def test_bytes_record_header(self): 70 serialiser = pack.ContainerSerialiser() 71 record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)]) 72 self.assertEqual(b'B32\nname1\nname2\n\n', record) 73 74 75class TestContainerWriter(tests.TestCase): 76 77 def setUp(self): 78 super(TestContainerWriter, self).setUp() 79 self.output = BytesIO() 80 self.writer = pack.ContainerWriter(self.output.write) 81 82 def assertOutput(self, expected_output): 83 """Assert that the output of self.writer ContainerWriter is equal to 84 expected_output. 85 """ 86 self.assertEqual(expected_output, self.output.getvalue()) 87 88 def test_construct(self): 89 """Test constructing a ContainerWriter. 90 91 This uses None as the output stream to show that the constructor 92 doesn't try to use the output stream. 93 """ 94 pack.ContainerWriter(None) 95 96 def test_begin(self): 97 """The begin() method writes the container format marker line.""" 98 self.writer.begin() 99 self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n') 100 101 def test_zero_records_written_after_begin(self): 102 """After begin is written, 0 records have been written.""" 103 self.writer.begin() 104 self.assertEqual(0, self.writer.records_written) 105 106 def test_end(self): 107 """The end() method writes an End Marker record.""" 108 self.writer.begin() 109 self.writer.end() 110 self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE') 111 112 def test_empty_end_does_not_add_a_record_to_records_written(self): 113 """The end() method does not count towards the records written.""" 114 self.writer.begin() 115 self.writer.end() 116 self.assertEqual(0, self.writer.records_written) 117 118 def test_non_empty_end_does_not_add_a_record_to_records_written(self): 119 """The end() method does not count towards the records written.""" 120 self.writer.begin() 121 self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[]) 122 self.writer.end() 123 self.assertEqual(1, self.writer.records_written) 124 125 def test_add_bytes_record_no_name(self): 126 """Add a bytes record with no name.""" 127 self.writer.begin() 128 offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[]) 129 self.assertEqual((42, 7), (offset, length)) 130 self.assertOutput( 131 b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc') 132 133 def test_add_bytes_record_one_name(self): 134 """Add a bytes record with one name.""" 135 self.writer.begin() 136 137 offset, length = self.writer.add_bytes_record( 138 [b'abc'], len(b'abc'), names=[(b'name1', )]) 139 self.assertEqual((42, 13), (offset, length)) 140 self.assertOutput( 141 b'Bazaar pack format 1 (introduced in 0.18)\n' 142 b'B3\nname1\n\nabc') 143 144 def test_add_bytes_record_split_writes(self): 145 """Write a large record which does multiple IOs""" 146 147 writes = [] 148 real_write = self.writer.write_func 149 150 def record_writes(data): 151 writes.append(data) 152 return real_write(data) 153 154 self.writer.write_func = record_writes 155 self.writer._JOIN_WRITES_THRESHOLD = 2 156 157 self.writer.begin() 158 offset, length = self.writer.add_bytes_record( 159 [b'abcabc'], len(b'abcabc'), names=[(b'name1', )]) 160 self.assertEqual((42, 16), (offset, length)) 161 self.assertOutput( 162 b'Bazaar pack format 1 (introduced in 0.18)\n' 163 b'B6\nname1\n\nabcabc') 164 165 self.assertEqual([ 166 b'Bazaar pack format 1 (introduced in 0.18)\n', 167 b'B6\nname1\n\n', 168 b'abcabc'], 169 writes) 170 171 def test_add_bytes_record_two_names(self): 172 """Add a bytes record with two names.""" 173 self.writer.begin() 174 offset, length = self.writer.add_bytes_record( 175 [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )]) 176 self.assertEqual((42, 19), (offset, length)) 177 self.assertOutput( 178 b'Bazaar pack format 1 (introduced in 0.18)\n' 179 b'B3\nname1\nname2\n\nabc') 180 181 def test_add_bytes_record_two_names(self): 182 """Add a bytes record with two names.""" 183 self.writer.begin() 184 offset, length = self.writer.add_bytes_record( 185 [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )]) 186 self.assertEqual((42, 19), (offset, length)) 187 self.assertOutput( 188 b'Bazaar pack format 1 (introduced in 0.18)\n' 189 b'B3\nname1\nname2\n\nabc') 190 191 def test_add_bytes_record_two_element_name(self): 192 """Add a bytes record with a two-element name.""" 193 self.writer.begin() 194 offset, length = self.writer.add_bytes_record( 195 [b'abc'], len(b'abc'), names=[(b'name1', b'name2')]) 196 self.assertEqual((42, 19), (offset, length)) 197 self.assertOutput( 198 b'Bazaar pack format 1 (introduced in 0.18)\n' 199 b'B3\nname1\x00name2\n\nabc') 200 201 def test_add_second_bytes_record_gets_higher_offset(self): 202 self.writer.begin() 203 self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[]) 204 offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[]) 205 self.assertEqual((49, 7), (offset, length)) 206 self.assertOutput( 207 b'Bazaar pack format 1 (introduced in 0.18)\n' 208 b'B3\n\nabc' 209 b'B3\n\nabc') 210 211 def test_add_bytes_record_invalid_name(self): 212 """Adding a Bytes record with a name with whitespace in it raises 213 InvalidRecordError. 214 """ 215 self.writer.begin() 216 self.assertRaises( 217 errors.InvalidRecordError, 218 self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )]) 219 220 def test_add_bytes_records_add_to_records_written(self): 221 """Adding a Bytes record increments the records_written counter.""" 222 self.writer.begin() 223 self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[]) 224 self.assertEqual(1, self.writer.records_written) 225 self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[]) 226 self.assertEqual(2, self.writer.records_written) 227 228 229class TestContainerReader(tests.TestCase): 230 """Tests for the ContainerReader. 231 232 The ContainerReader reads format 1 containers, so these tests explicitly 233 test how it reacts to format 1 data. If a new version of the format is 234 added, then separate tests for that format should be added. 235 """ 236 237 def get_reader_for(self, data): 238 stream = BytesIO(data) 239 reader = pack.ContainerReader(stream) 240 return reader 241 242 def test_construct(self): 243 """Test constructing a ContainerReader. 244 245 This uses None as the output stream to show that the constructor 246 doesn't try to use the input stream. 247 """ 248 pack.ContainerReader(None) 249 250 def test_empty_container(self): 251 """Read an empty container.""" 252 reader = self.get_reader_for( 253 b"Bazaar pack format 1 (introduced in 0.18)\nE") 254 self.assertEqual([], list(reader.iter_records())) 255 256 def test_unknown_format(self): 257 """Unrecognised container formats raise UnknownContainerFormatError.""" 258 reader = self.get_reader_for(b"unknown format\n") 259 self.assertRaises( 260 errors.UnknownContainerFormatError, reader.iter_records) 261 262 def test_unexpected_end_of_container(self): 263 """Containers that don't end with an End Marker record should cause 264 UnexpectedEndOfContainerError to be raised. 265 """ 266 reader = self.get_reader_for( 267 b"Bazaar pack format 1 (introduced in 0.18)\n") 268 iterator = reader.iter_records() 269 self.assertRaises( 270 errors.UnexpectedEndOfContainerError, next, iterator) 271 272 def test_unknown_record_type(self): 273 """Unknown record types cause UnknownRecordTypeError to be raised.""" 274 reader = self.get_reader_for( 275 b"Bazaar pack format 1 (introduced in 0.18)\nX") 276 iterator = reader.iter_records() 277 self.assertRaises( 278 errors.UnknownRecordTypeError, next, iterator) 279 280 def test_container_with_one_unnamed_record(self): 281 """Read a container with one Bytes record. 282 283 Parsing Bytes records is more thoroughly exercised by 284 TestBytesRecordReader. This test is here to ensure that 285 ContainerReader's integration with BytesRecordReader is working. 286 """ 287 reader = self.get_reader_for( 288 b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE") 289 expected_records = [([], b'aaaaa')] 290 self.assertEqual( 291 expected_records, 292 [(names, read_bytes(None)) 293 for (names, read_bytes) in reader.iter_records()]) 294 295 def test_validate_empty_container(self): 296 """validate does not raise an error for a container with no records.""" 297 reader = self.get_reader_for( 298 b"Bazaar pack format 1 (introduced in 0.18)\nE") 299 # No exception raised 300 reader.validate() 301 302 def test_validate_non_empty_valid_container(self): 303 """validate does not raise an error for a container with a valid record. 304 """ 305 reader = self.get_reader_for( 306 b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE") 307 # No exception raised 308 reader.validate() 309 310 def test_validate_bad_format(self): 311 """validate raises an error for unrecognised format strings. 312 313 It may raise either UnexpectedEndOfContainerError or 314 UnknownContainerFormatError, depending on exactly what the string is. 315 """ 316 inputs = [ 317 b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"] 318 for input in inputs: 319 reader = self.get_reader_for(input) 320 self.assertRaises( 321 (errors.UnexpectedEndOfContainerError, 322 errors.UnknownContainerFormatError), 323 reader.validate) 324 325 def test_validate_bad_record_marker(self): 326 """validate raises UnknownRecordTypeError for unrecognised record 327 types. 328 """ 329 reader = self.get_reader_for( 330 b"Bazaar pack format 1 (introduced in 0.18)\nX") 331 self.assertRaises(errors.UnknownRecordTypeError, reader.validate) 332 333 def test_validate_data_after_end_marker(self): 334 """validate raises ContainerHasExcessDataError if there are any bytes 335 after the end of the container. 336 """ 337 reader = self.get_reader_for( 338 b"Bazaar pack format 1 (introduced in 0.18)\nEcrud") 339 self.assertRaises( 340 errors.ContainerHasExcessDataError, reader.validate) 341 342 def test_validate_no_end_marker(self): 343 """validate raises UnexpectedEndOfContainerError if there's no end of 344 container marker, even if the container up to this point has been 345 valid. 346 """ 347 reader = self.get_reader_for( 348 b"Bazaar pack format 1 (introduced in 0.18)\n") 349 self.assertRaises( 350 errors.UnexpectedEndOfContainerError, reader.validate) 351 352 def test_validate_duplicate_name(self): 353 """validate raises DuplicateRecordNameError if the same name occurs 354 multiple times in the container. 355 """ 356 reader = self.get_reader_for( 357 b"Bazaar pack format 1 (introduced in 0.18)\n" 358 b"B0\nname\n\n" 359 b"B0\nname\n\n" 360 b"E") 361 self.assertRaises(errors.DuplicateRecordNameError, reader.validate) 362 363 def test_validate_undecodeable_name(self): 364 """Names that aren't valid UTF-8 cause validate to fail.""" 365 reader = self.get_reader_for( 366 b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE") 367 self.assertRaises(errors.InvalidRecordError, reader.validate) 368 369 370class TestBytesRecordReader(tests.TestCase): 371 """Tests for reading and validating Bytes records with 372 BytesRecordReader. 373 374 Like TestContainerReader, this explicitly tests the reading of format 1 375 data. If a new version of the format is added, then a separate set of 376 tests for reading that format should be added. 377 """ 378 379 def get_reader_for(self, data): 380 stream = BytesIO(data) 381 reader = pack.BytesRecordReader(stream) 382 return reader 383 384 def test_record_with_no_name(self): 385 """Reading a Bytes record with no name returns an empty list of 386 names. 387 """ 388 reader = self.get_reader_for(b"5\n\naaaaa") 389 names, get_bytes = reader.read() 390 self.assertEqual([], names) 391 self.assertEqual(b'aaaaa', get_bytes(None)) 392 393 def test_record_with_one_name(self): 394 """Reading a Bytes record with one name returns a list of just that 395 name. 396 """ 397 reader = self.get_reader_for(b"5\nname1\n\naaaaa") 398 names, get_bytes = reader.read() 399 self.assertEqual([(b'name1', )], names) 400 self.assertEqual(b'aaaaa', get_bytes(None)) 401 402 def test_record_with_two_names(self): 403 """Reading a Bytes record with two names returns a list of both names. 404 """ 405 reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa") 406 names, get_bytes = reader.read() 407 self.assertEqual([(b'name1', ), (b'name2', )], names) 408 self.assertEqual(b'aaaaa', get_bytes(None)) 409 410 def test_record_with_two_part_names(self): 411 """Reading a Bytes record with a two_part name reads both.""" 412 reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa") 413 names, get_bytes = reader.read() 414 self.assertEqual([(b'name1', b'name2', )], names) 415 self.assertEqual(b'aaaaa', get_bytes(None)) 416 417 def test_invalid_length(self): 418 """If the length-prefix is not a number, parsing raises 419 InvalidRecordError. 420 """ 421 reader = self.get_reader_for(b"not a number\n") 422 self.assertRaises(errors.InvalidRecordError, reader.read) 423 424 def test_early_eof(self): 425 """Tests for premature EOF occuring during parsing Bytes records with 426 BytesRecordReader. 427 428 A incomplete container might be interrupted at any point. The 429 BytesRecordReader needs to cope with the input stream running out no 430 matter where it is in the parsing process. 431 432 In all cases, UnexpectedEndOfContainerError should be raised. 433 """ 434 complete_record = b"6\nname\n\nabcdef" 435 for count in range(0, len(complete_record)): 436 incomplete_record = complete_record[:count] 437 reader = self.get_reader_for(incomplete_record) 438 # We don't use assertRaises to make diagnosing failures easier 439 # (assertRaises doesn't allow a custom failure message). 440 try: 441 names, read_bytes = reader.read() 442 read_bytes(None) 443 except errors.UnexpectedEndOfContainerError: 444 pass 445 else: 446 self.fail( 447 "UnexpectedEndOfContainerError not raised when parsing %r" 448 % (incomplete_record,)) 449 450 def test_initial_eof(self): 451 """EOF before any bytes read at all.""" 452 reader = self.get_reader_for(b"") 453 self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read) 454 455 def test_eof_after_length(self): 456 """EOF after reading the length and before reading name(s).""" 457 reader = self.get_reader_for(b"123\n") 458 self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read) 459 460 def test_eof_during_name(self): 461 """EOF during reading a name.""" 462 reader = self.get_reader_for(b"123\nname") 463 self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read) 464 465 def test_read_invalid_name_whitespace(self): 466 """Names must have no whitespace.""" 467 # A name with a space. 468 reader = self.get_reader_for(b"0\nbad name\n\n") 469 self.assertRaises(errors.InvalidRecordError, reader.read) 470 471 # A name with a tab. 472 reader = self.get_reader_for(b"0\nbad\tname\n\n") 473 self.assertRaises(errors.InvalidRecordError, reader.read) 474 475 # A name with a vertical tab. 476 reader = self.get_reader_for(b"0\nbad\vname\n\n") 477 self.assertRaises(errors.InvalidRecordError, reader.read) 478 479 def test_validate_whitespace_in_name(self): 480 """Names must have no whitespace.""" 481 reader = self.get_reader_for(b"0\nbad name\n\n") 482 self.assertRaises(errors.InvalidRecordError, reader.validate) 483 484 def test_validate_interrupted_prelude(self): 485 """EOF during reading a record's prelude causes validate to fail.""" 486 reader = self.get_reader_for(b"") 487 self.assertRaises( 488 errors.UnexpectedEndOfContainerError, reader.validate) 489 490 def test_validate_interrupted_body(self): 491 """EOF during reading a record's body causes validate to fail.""" 492 reader = self.get_reader_for(b"1\n\n") 493 self.assertRaises( 494 errors.UnexpectedEndOfContainerError, reader.validate) 495 496 def test_validate_unparseable_length(self): 497 """An unparseable record length causes validate to fail.""" 498 reader = self.get_reader_for(b"\n\n") 499 self.assertRaises( 500 errors.InvalidRecordError, reader.validate) 501 502 def test_validate_undecodeable_name(self): 503 """Names that aren't valid UTF-8 cause validate to fail.""" 504 reader = self.get_reader_for(b"0\n\xcc\n\n") 505 self.assertRaises(errors.InvalidRecordError, reader.validate) 506 507 def test_read_max_length(self): 508 """If the max_length passed to the callable returned by read is not 509 None, then no more than that many bytes will be read. 510 """ 511 reader = self.get_reader_for(b"6\n\nabcdef") 512 names, get_bytes = reader.read() 513 self.assertEqual(b'abc', get_bytes(3)) 514 515 def test_read_no_max_length(self): 516 """If the max_length passed to the callable returned by read is None, 517 then all the bytes in the record will be read. 518 """ 519 reader = self.get_reader_for(b"6\n\nabcdef") 520 names, get_bytes = reader.read() 521 self.assertEqual(b'abcdef', get_bytes(None)) 522 523 def test_repeated_read_calls(self): 524 """Repeated calls to the callable returned from BytesRecordReader.read 525 will not read beyond the end of the record. 526 """ 527 reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX") 528 names, get_bytes = reader.read() 529 self.assertEqual(b'abcdef', get_bytes(None)) 530 self.assertEqual(b'', get_bytes(None)) 531 self.assertEqual(b'', get_bytes(99)) 532 533 534class TestMakeReadvReader(tests.TestCaseWithTransport): 535 536 def test_read_skipping_records(self): 537 pack_data = BytesIO() 538 writer = pack.ContainerWriter(pack_data.write) 539 writer.begin() 540 memos = [] 541 memos.append(writer.add_bytes_record([b'abc'], 3, names=[])) 542 memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )])) 543 memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )])) 544 memos.append(writer.add_bytes_record([b'jkl'], 3, names=[])) 545 writer.end() 546 transport = self.get_transport() 547 transport.put_bytes('mypack', pack_data.getvalue()) 548 requested_records = [memos[0], memos[2]] 549 reader = pack.make_readv_reader(transport, 'mypack', requested_records) 550 result = [] 551 for names, reader_func in reader.iter_records(): 552 result.append((names, reader_func(None))) 553 self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result) 554 555 556class TestReadvFile(tests.TestCaseWithTransport): 557 """Tests of the ReadVFile class. 558 559 Error cases are deliberately undefined: this code adapts the underlying 560 transport interface to a single 'streaming read' interface as 561 ContainerReader needs. 562 """ 563 564 def test_read_bytes(self): 565 """Test reading of both single bytes and all bytes in a hunk.""" 566 transport = self.get_transport() 567 transport.put_bytes('sample', b'0123456789') 568 f = pack.ReadVFile(transport.readv( 569 'sample', [(0, 1), (1, 2), (4, 1), (6, 2)])) 570 results = [] 571 results.append(f.read(1)) 572 results.append(f.read(2)) 573 results.append(f.read(1)) 574 results.append(f.read(1)) 575 results.append(f.read(1)) 576 self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results) 577 578 def test_readline(self): 579 """Test using readline() as ContainerReader does. 580 581 This is always within a readv hunk, never across it. 582 """ 583 transport = self.get_transport() 584 transport.put_bytes('sample', b'0\n2\n4\n') 585 f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)])) 586 results = [] 587 results.append(f.readline()) 588 results.append(f.readline()) 589 results.append(f.readline()) 590 self.assertEqual([b'0\n', b'2\n', b'4\n'], results) 591 592 def test_readline_and_read(self): 593 """Test exercising one byte reads, readline, and then read again.""" 594 transport = self.get_transport() 595 transport.put_bytes('sample', b'0\n2\n4\n') 596 f = pack.ReadVFile(transport.readv('sample', [(0, 6)])) 597 results = [] 598 results.append(f.read(1)) 599 results.append(f.readline()) 600 results.append(f.read(4)) 601 self.assertEqual([b'0', b'\n', b'2\n4\n'], results) 602 603 604class PushParserTestCase(tests.TestCase): 605 """Base class for TestCases involving ContainerPushParser.""" 606 607 def make_parser_expecting_record_type(self): 608 parser = pack.ContainerPushParser() 609 parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n") 610 return parser 611 612 def make_parser_expecting_bytes_record(self): 613 parser = pack.ContainerPushParser() 614 parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB") 615 return parser 616 617 def assertRecordParsing(self, expected_record, data): 618 """Assert that 'bytes' is parsed as a given bytes record. 619 620 :param expected_record: A tuple of (names, bytes). 621 """ 622 parser = self.make_parser_expecting_bytes_record() 623 parser.accept_bytes(data) 624 parsed_records = parser.read_pending_records() 625 self.assertEqual([expected_record], parsed_records) 626 627 628class TestContainerPushParser(PushParserTestCase): 629 """Tests for ContainerPushParser. 630 631 The ContainerPushParser reads format 1 containers, so these tests 632 explicitly test how it reacts to format 1 data. If a new version of the 633 format is added, then separate tests for that format should be added. 634 """ 635 636 def test_construct(self): 637 """ContainerPushParser can be constructed.""" 638 pack.ContainerPushParser() 639 640 def test_multiple_records_at_once(self): 641 """If multiple records worth of data are fed to the parser in one 642 string, the parser will correctly parse all the records. 643 644 (A naive implementation might stop after parsing the first record.) 645 """ 646 parser = self.make_parser_expecting_record_type() 647 parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2") 648 self.assertEqual( 649 [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')], 650 parser.read_pending_records()) 651 652 def test_multiple_empty_records_at_once(self): 653 """If multiple empty records worth of data are fed to the parser in one 654 string, the parser will correctly parse all the records. 655 656 (A naive implementation might stop after parsing the first empty 657 record, because the buffer size had not changed.) 658 """ 659 parser = self.make_parser_expecting_record_type() 660 parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n") 661 self.assertEqual( 662 [([(b'name1',)], b''), ([(b'name2',)], b'')], 663 parser.read_pending_records()) 664 665 666class TestContainerPushParserBytesParsing(PushParserTestCase): 667 """Tests for reading Bytes records with ContainerPushParser. 668 669 The ContainerPushParser reads format 1 containers, so these tests 670 explicitly test how it reacts to format 1 data. If a new version of the 671 format is added, then separate tests for that format should be added. 672 """ 673 674 def test_record_with_no_name(self): 675 """Reading a Bytes record with no name returns an empty list of 676 names. 677 """ 678 self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa") 679 680 def test_record_with_one_name(self): 681 """Reading a Bytes record with one name returns a list of just that 682 name. 683 """ 684 self.assertRecordParsing( 685 ([(b'name1', )], b'aaaaa'), 686 b"5\nname1\n\naaaaa") 687 688 def test_record_with_two_names(self): 689 """Reading a Bytes record with two names returns a list of both names. 690 """ 691 self.assertRecordParsing( 692 ([(b'name1', ), (b'name2', )], b'aaaaa'), 693 b"5\nname1\nname2\n\naaaaa") 694 695 def test_record_with_two_part_names(self): 696 """Reading a Bytes record with a two_part name reads both.""" 697 self.assertRecordParsing( 698 ([(b'name1', b'name2')], b'aaaaa'), 699 b"5\nname1\x00name2\n\naaaaa") 700 701 def test_invalid_length(self): 702 """If the length-prefix is not a number, parsing raises 703 InvalidRecordError. 704 """ 705 parser = self.make_parser_expecting_bytes_record() 706 self.assertRaises( 707 errors.InvalidRecordError, parser.accept_bytes, b"not a number\n") 708 709 def test_incomplete_record(self): 710 """If the bytes seen so far don't form a complete record, then there 711 will be nothing returned by read_pending_records. 712 """ 713 parser = self.make_parser_expecting_bytes_record() 714 parser.accept_bytes(b"5\n\nabcd") 715 self.assertEqual([], parser.read_pending_records()) 716 717 def test_accept_nothing(self): 718 """The edge case of parsing an empty string causes no error.""" 719 parser = self.make_parser_expecting_bytes_record() 720 parser.accept_bytes(b"") 721 722 def assertInvalidRecord(self, data): 723 """Assert that parsing the given bytes raises InvalidRecordError.""" 724 parser = self.make_parser_expecting_bytes_record() 725 self.assertRaises( 726 errors.InvalidRecordError, parser.accept_bytes, data) 727 728 def test_read_invalid_name_whitespace(self): 729 """Names must have no whitespace.""" 730 # A name with a space. 731 self.assertInvalidRecord(b"0\nbad name\n\n") 732 733 # A name with a tab. 734 self.assertInvalidRecord(b"0\nbad\tname\n\n") 735 736 # A name with a vertical tab. 737 self.assertInvalidRecord(b"0\nbad\vname\n\n") 738 739 def test_repeated_read_pending_records(self): 740 """read_pending_records will not return the same record twice.""" 741 parser = self.make_parser_expecting_bytes_record() 742 parser.accept_bytes(b"6\n\nabcdef") 743 self.assertEqual([([], b'abcdef')], parser.read_pending_records()) 744 self.assertEqual([], parser.read_pending_records()) 745