1# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Tests for breezy.pack."""
18
19from io import BytesIO
20
21from ... import errors, tests
22from .. import (
23    pack,
24    )
25
26
class TestContainerSerialiser(tests.TestCase):
    """Check the byte strings produced by ContainerSerialiser."""

    def test_construct(self):
        """A ContainerSerialiser can be created with no arguments."""
        pack.ContainerSerialiser()

    def test_begin(self):
        """begin() returns the format marker line."""
        marker = b'Bazaar pack format 1 (introduced in 0.18)\n'
        self.assertEqual(marker, pack.ContainerSerialiser().begin())

    def test_end(self):
        """end() returns the single-byte End Marker."""
        self.assertEqual(b'E', pack.ContainerSerialiser().end())

    def test_bytes_record_no_name(self):
        """A record with no names has an empty name section."""
        serialised = pack.ContainerSerialiser().bytes_record(b'bytes', [])
        self.assertEqual(b'B5\n\nbytes', serialised)

    def test_bytes_record_one_name_with_one_part(self):
        """A one-part name is written on a line of its own."""
        names = [(b'name',)]
        serialised = pack.ContainerSerialiser().bytes_record(b'bytes', names)
        self.assertEqual(b'B5\nname\n\nbytes', serialised)

    def test_bytes_record_one_name_with_two_parts(self):
        """The parts of a multi-part name are joined with a NUL byte."""
        names = [(b'part1', b'part2')]
        serialised = pack.ContainerSerialiser().bytes_record(b'bytes', names)
        self.assertEqual(b'B5\npart1\x00part2\n\nbytes', serialised)

    def test_bytes_record_two_names(self):
        """Each name of a record is written on its own line."""
        names = [(b'name1',), (b'name2',)]
        serialised = pack.ContainerSerialiser().bytes_record(b'bytes', names)
        self.assertEqual(b'B5\nname1\nname2\n\nbytes', serialised)

    def test_bytes_record_whitespace_in_name_part(self):
        """A name part containing whitespace raises InvalidRecordError."""
        bad_names = [(b'bad name',)]
        self.assertRaises(
            errors.InvalidRecordError,
            pack.ContainerSerialiser().bytes_record, b'bytes', bad_names)

    def test_bytes_record_header(self):
        """bytes_header() returns the record prelude without any body."""
        names = [(b'name1',), (b'name2',)]
        header = pack.ContainerSerialiser().bytes_header(32, names)
        self.assertEqual(b'B32\nname1\nname2\n\n', header)
73
74
class TestContainerWriter(tests.TestCase):
    """Tests for ContainerWriter, driven through an in-memory byte buffer."""

    def setUp(self):
        super(TestContainerWriter, self).setUp()
        # Everything the writer emits is captured here for inspection.
        self.output = BytesIO()
        self.writer = pack.ContainerWriter(self.output.write)

    def assertOutput(self, expected_output):
        """Assert that the output of self.writer ContainerWriter is equal to
        expected_output.
        """
        self.assertEqual(expected_output, self.output.getvalue())

    def test_construct(self):
        """Test constructing a ContainerWriter.

        This uses None as the output stream to show that the constructor
        doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """The begin() method writes the container format marker line."""
        self.writer.begin()
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')

    def test_zero_records_written_after_begin(self):
        """After begin is written, 0 records have been written."""
        self.writer.begin()
        self.assertEqual(0, self.writer.records_written)

    def test_end(self):
        """The end() method writes an End Marker record."""
        self.writer.begin()
        self.writer.end()
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')

    def test_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.end()
        self.assertEqual(0, self.writer.records_written)

    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
        """After one record and end(), records_written is still 1."""
        self.writer.begin()
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
        self.writer.end()
        self.assertEqual(1, self.writer.records_written)

    def test_add_bytes_record_no_name(self):
        """Add a bytes record with no name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            [b'abc'], len(b'abc'), names=[])
        # The format marker line is 42 bytes long, so the record starts there.
        self.assertEqual((42, 7), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')

    def test_add_bytes_record_one_name(self):
        """Add a bytes record with one name."""
        self.writer.begin()

        offset, length = self.writer.add_bytes_record(
            [b'abc'], len(b'abc'), names=[(b'name1', )])
        self.assertEqual((42, 13), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\n\nabc')

    def test_add_bytes_record_split_writes(self):
        """Write a large record which does multiple IOs"""

        writes = []
        real_write = self.writer.write_func

        def record_writes(data):
            # Note each chunk handed to the writer, then pass it through.
            writes.append(data)
            return real_write(data)

        self.writer.write_func = record_writes
        # With the threshold lowered to 2, the header and the 6-byte body are
        # expected to reach write_func as separate calls (asserted below).
        self.writer._JOIN_WRITES_THRESHOLD = 2

        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            [b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
        self.assertEqual((42, 16), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B6\nname1\n\nabcabc')

        self.assertEqual([
            b'Bazaar pack format 1 (introduced in 0.18)\n',
            b'B6\nname1\n\n',
            b'abcabc'],
            writes)

    def test_add_bytes_record_two_names(self):
        """Add a bytes record with two names."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\nname2\n\nabc')

    def test_add_bytes_record_two_element_name(self):
        """Add a bytes record with a two-element name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            [b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\x00name2\n\nabc')

    def test_add_second_bytes_record_gets_higher_offset(self):
        """A second record starts where the first one ended."""
        self.writer.begin()
        self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
        offset, length = self.writer.add_bytes_record(
            [b'abc'], len(b'abc'), names=[])
        # 42 bytes of format marker plus the 7-byte first record.
        self.assertEqual((49, 7), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\n\nabc'
            b'B3\n\nabc')

    def test_add_bytes_record_invalid_name(self):
        """Adding a Bytes record with a name with whitespace in it raises
        InvalidRecordError.
        """
        self.writer.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            self.writer.add_bytes_record,
            [b'abc'], len(b'abc'), names=[(b'bad name', )])

    def test_add_bytes_records_add_to_records_written(self):
        """Adding a Bytes record increments the records_written counter."""
        self.writer.begin()
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
        self.assertEqual(1, self.writer.records_written)
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
        self.assertEqual(2, self.writer.records_written)
227
228
class TestContainerReader(tests.TestCase):
    """Tests for the ContainerReader.

    The ContainerReader reads format 1 containers, so these tests explicitly
    test how it reacts to format 1 data.  If a new version of the format is
    added, then separate tests for that format should be added.
    """

    def get_reader_for(self, data):
        """Return a ContainerReader reading from an in-memory copy of data."""
        stream = BytesIO(data)
        reader = pack.ContainerReader(stream)
        return reader

    def test_construct(self):
        """Test constructing a ContainerReader.

        This uses None as the input stream to show that the constructor
        doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """Read an empty container."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
        self.assertEqual([], list(reader.iter_records()))

    def test_unknown_format(self):
        """Unrecognised container formats raise UnknownContainerFormatError."""
        reader = self.get_reader_for(b"unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, reader.iter_records)

    def test_unexpected_end_of_container(self):
        """Containers that don't end with an End Marker record should cause
        UnexpectedEndOfContainerError to be raised.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, next, iterator)

    def test_unknown_record_type(self):
        """Unknown record types cause UnknownRecordTypeError to be raised."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, next, iterator)

    def test_container_with_one_unnamed_record(self):
        """Read a container with one Bytes record.

        Parsing Bytes records is more thoroughly exercised by
        TestBytesRecordReader.  This test is here to ensure that
        ContainerReader's integration with BytesRecordReader is working.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
        expected_records = [([], b'aaaaa')]
        self.assertEqual(
            expected_records,
            [(names, read_bytes(None))
             for (names, read_bytes) in reader.iter_records()])

    def test_validate_empty_container(self):
        """validate does not raise an error for a container with no records."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
        # No exception raised
        reader.validate()

    def test_validate_non_empty_valid_container(self):
        """validate does not raise an error for a container with a valid record.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
        # No exception raised
        reader.validate()

    def test_validate_bad_format(self):
        """validate raises an error for unrecognised format strings.

        It may raise either UnexpectedEndOfContainerError or
        UnknownContainerFormatError, depending on exactly what the string is.
        """
        bad_containers = [
            b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
        for data in bad_containers:
            reader = self.get_reader_for(data)
            self.assertRaises(
                (errors.UnexpectedEndOfContainerError,
                 errors.UnknownContainerFormatError),
                reader.validate)

    def test_validate_bad_record_marker(self):
        """validate raises UnknownRecordTypeError for unrecognised record
        types.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)

    def test_validate_data_after_end_marker(self):
        """validate raises ContainerHasExcessDataError if there are any bytes
        after the end of the container.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, reader.validate)

    def test_validate_no_end_marker(self):
        """validate raises UnexpectedEndOfContainerError if there's no end of
        container marker, even if the container up to this point has been
        valid.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_duplicate_name(self):
        """validate raises DuplicateRecordNameError if the same name occurs
        multiple times in the container.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n"
            b"B0\nname\n\n"
            b"B0\nname\n\n"
            b"E")
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)
368
369
class TestBytesRecordReader(tests.TestCase):
    """Tests for reading and validating Bytes records with
    BytesRecordReader.

    As with TestContainerReader, these tests deal only with format 1 data.
    A new format version should get its own, separate set of reading tests.
    """

    def get_reader_for(self, data):
        """Return a BytesRecordReader reading from an in-memory copy of data."""
        return pack.BytesRecordReader(BytesIO(data))

    def test_record_with_no_name(self):
        """A Bytes record without names yields an empty list of names."""
        names, get_bytes = self.get_reader_for(b"5\n\naaaaa").read()
        self.assertEqual([], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_one_name(self):
        """A Bytes record with one name yields a list holding just that
        name.
        """
        names, get_bytes = self.get_reader_for(b"5\nname1\n\naaaaa").read()
        self.assertEqual([(b'name1', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_two_names(self):
        """A Bytes record with two names yields a list of both names."""
        record = b"5\nname1\nname2\n\naaaaa"
        names, get_bytes = self.get_reader_for(record).read()
        self.assertEqual([(b'name1', ), (b'name2', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_two_part_names(self):
        """Both parts of a two-part name are read."""
        record = b"5\nname1\x00name2\n\naaaaa"
        names, get_bytes = self.get_reader_for(record).read()
        self.assertEqual([(b'name1', b'name2', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_invalid_length(self):
        """A length prefix that is not a number makes read raise
        InvalidRecordError.
        """
        reader = self.get_reader_for(b"not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring while parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point.  The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = b"6\nname\n\nabcdef"
        for cut in range(len(complete_record)):
            truncated = complete_record[:cut]
            reader = self.get_reader_for(truncated)
            # assertRaises is avoided on purpose: it cannot carry a custom
            # failure message, and the message names the offending prefix.
            try:
                _, read_bytes = reader.read()
                read_bytes(None)
            except errors.UnexpectedEndOfContainerError:
                continue
            self.fail(
                "UnexpectedEndOfContainerError not raised when parsing %r"
                % (truncated,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        empty_reader = self.get_reader_for(b"")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, empty_reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        truncated_reader = self.get_reader_for(b"123\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, truncated_reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        truncated_reader = self.get_reader_for(b"123\nname")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, truncated_reader.read)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        records_with_bad_names = (
            b"0\nbad name\n\n",     # a space
            b"0\nbad\tname\n\n",    # a tab
            b"0\nbad\vname\n\n",    # a vertical tab
            )
        for record in records_with_bad_names:
            reader = self.get_reader_for(record)
            self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """Names must have no whitespace."""
        bad_name_reader = self.get_reader_for(b"0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, bad_name_reader.validate)

    def test_validate_interrupted_prelude(self):
        """EOF during reading a record's prelude causes validate to fail."""
        empty_reader = self.get_reader_for(b"")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, empty_reader.validate)

    def test_validate_interrupted_body(self):
        """EOF during reading a record's body causes validate to fail."""
        bodiless_reader = self.get_reader_for(b"1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, bodiless_reader.validate)

    def test_validate_unparseable_length(self):
        """An unparseable record length causes validate to fail."""
        no_length_reader = self.get_reader_for(b"\n\n")
        self.assertRaises(
            errors.InvalidRecordError, no_length_reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        bad_utf8_reader = self.get_reader_for(b"0\n\xcc\n\n")
        self.assertRaises(errors.InvalidRecordError, bad_utf8_reader.validate)

    def test_read_max_length(self):
        """When the callable returned by read is given a max_length other
        than None, it reads no more than that many bytes.
        """
        _, get_bytes = self.get_reader_for(b"6\n\nabcdef").read()
        self.assertEqual(b'abc', get_bytes(3))

    def test_read_no_max_length(self):
        """When the callable returned by read is given None as max_length,
        it reads every byte of the record.
        """
        _, get_bytes = self.get_reader_for(b"6\n\nabcdef").read()
        self.assertEqual(b'abcdef', get_bytes(None))

    def test_repeated_read_calls(self):
        """Calling the callable returned from BytesRecordReader.read again
        never reads past the end of the record.
        """
        stream_data = b"6\n\nabcdefB3\nnext-record\nXXX"
        _, get_bytes = self.get_reader_for(stream_data).read()
        self.assertEqual(b'abcdef', get_bytes(None))
        # The record is exhausted, so later reads return nothing.
        self.assertEqual(b'', get_bytes(None))
        self.assertEqual(b'', get_bytes(99))
532
533
class TestMakeReadvReader(tests.TestCaseWithTransport):
    """Tests for pack.make_readv_reader."""

    def test_read_skipping_records(self):
        """Only the records whose memos were requested are yielded."""
        buf = BytesIO()
        writer = pack.ContainerWriter(buf.write)
        writer.begin()
        # (body, names) for each record, in the order they are written.
        records_to_write = [
            (b'abc', []),
            (b'def', [(b'name1', )]),
            (b'ghi', [(b'name2', )]),
            (b'jkl', []),
            ]
        # The value returned by add_bytes_record is kept as the record's memo.
        memos = [
            writer.add_bytes_record([body], 3, names=names)
            for body, names in records_to_write]
        writer.end()
        transport = self.get_transport()
        transport.put_bytes('mypack', buf.getvalue())
        # Ask for the first and third records only.
        wanted = [memos[0], memos[2]]
        reader = pack.make_readv_reader(transport, 'mypack', wanted)
        found = [
            (names, read_body(None))
            for names, read_body in reader.iter_records()]
        self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], found)
554
555
class TestReadvFile(tests.TestCaseWithTransport):
    """Tests of the ReadVFile class.

    Error cases are deliberately left undefined: ReadVFile adapts the
    underlying transport interface to the single 'streaming read' interface
    that ContainerReader needs.
    """

    def test_read_bytes(self):
        """Test reading of both single bytes and all bytes in a hunk."""
        transport = self.get_transport()
        transport.put_bytes('sample', b'0123456789')
        hunks = [(0, 1), (1, 2), (4, 1), (6, 2)]
        f = pack.ReadVFile(transport.readv('sample', hunks))
        # Reads are issued in order, one per requested size.
        results = [f.read(size) for size in (1, 2, 1, 1, 1)]
        self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)

    def test_readline(self):
        """Test using readline() as ContainerReader does.

        This is always within a readv hunk, never across it.
        """
        transport = self.get_transport()
        transport.put_bytes('sample', b'0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
        results = [f.readline() for _ in range(3)]
        self.assertEqual([b'0\n', b'2\n', b'4\n'], results)

    def test_readline_and_read(self):
        """Test exercising one byte reads, readline, and then read again."""
        transport = self.get_transport()
        transport.put_bytes('sample', b'0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
        # List elements are evaluated left to right, so the calls happen in
        # the order written.
        results = [f.read(1), f.readline(), f.read(4)]
        self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
602
603
class PushParserTestCase(tests.TestCase):
    """Shared helpers for TestCases that drive a ContainerPushParser."""

    def make_parser_expecting_record_type(self):
        """Return a parser that has been fed only the format marker line."""
        push_parser = pack.ContainerPushParser()
        push_parser.accept_bytes(
            b"Bazaar pack format 1 (introduced in 0.18)\n")
        return push_parser

    def make_parser_expecting_bytes_record(self):
        """Return a parser fed the format marker line and a 'B' type byte."""
        push_parser = pack.ContainerPushParser()
        push_parser.accept_bytes(
            b"Bazaar pack format 1 (introduced in 0.18)\nB")
        return push_parser

    def assertRecordParsing(self, expected_record, data):
        """Assert that 'data' is parsed as a given bytes record.

        :param expected_record: A tuple of (names, bytes).
        :param data: The bytes fed to a parser that has already accepted the
            format marker line and the 'B' record type byte.
        """
        push_parser = self.make_parser_expecting_bytes_record()
        push_parser.accept_bytes(data)
        self.assertEqual(
            [expected_record], push_parser.read_pending_records())
626
627
class TestContainerPushParser(PushParserTestCase):
    """Tests for ContainerPushParser.

    These tests feed the parser format 1 data only.  Should another version
    of the format appear, it should get separate tests of its own.
    """

    def test_construct(self):
        """ContainerPushParser can be constructed."""
        pack.ContainerPushParser()

    def test_multiple_records_at_once(self):
        """Several records fed to the parser in a single string are all
        parsed.

        (A naive implementation might stop after parsing the first record.)
        """
        expected = [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')]
        push_parser = self.make_parser_expecting_record_type()
        push_parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
        self.assertEqual(expected, push_parser.read_pending_records())

    def test_multiple_empty_records_at_once(self):
        """Several empty records fed to the parser in a single string are all
        parsed.

        (A naive implementation might stop after parsing the first empty
        record, because the buffer size had not changed.)
        """
        expected = [([(b'name1',)], b''), ([(b'name2',)], b'')]
        push_parser = self.make_parser_expecting_record_type()
        push_parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
        self.assertEqual(expected, push_parser.read_pending_records())
664
665
class TestContainerPushParserBytesParsing(PushParserTestCase):
    """Tests for reading Bytes records with ContainerPushParser.

    These tests feed the parser format 1 data only.  Should another version
    of the format appear, it should get separate tests of its own.
    """

    def test_record_with_no_name(self):
        """A Bytes record without names yields an empty list of names."""
        expected = ([], b'aaaaa')
        self.assertRecordParsing(expected, b"5\n\naaaaa")

    def test_record_with_one_name(self):
        """A Bytes record with one name yields a list holding just that
        name.
        """
        expected = ([(b'name1', )], b'aaaaa')
        self.assertRecordParsing(expected, b"5\nname1\n\naaaaa")

    def test_record_with_two_names(self):
        """A Bytes record with two names yields a list of both names."""
        expected = ([(b'name1', ), (b'name2', )], b'aaaaa')
        self.assertRecordParsing(expected, b"5\nname1\nname2\n\naaaaa")

    def test_record_with_two_part_names(self):
        """Both parts of a two-part name are read."""
        expected = ([(b'name1', b'name2')], b'aaaaa')
        self.assertRecordParsing(expected, b"5\nname1\x00name2\n\naaaaa")

    def test_invalid_length(self):
        """A length prefix that is not a number makes accept_bytes raise
        InvalidRecordError.
        """
        push_parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError,
            push_parser.accept_bytes, b"not a number\n")

    def test_incomplete_record(self):
        """While the bytes seen so far do not form a complete record,
        read_pending_records returns nothing.
        """
        push_parser = self.make_parser_expecting_bytes_record()
        # The record declares five body bytes but only four are supplied.
        push_parser.accept_bytes(b"5\n\nabcd")
        self.assertEqual([], push_parser.read_pending_records())

    def test_accept_nothing(self):
        """The edge case of parsing an empty string causes no error."""
        self.make_parser_expecting_bytes_record().accept_bytes(b"")

    def assertInvalidRecord(self, data):
        """Assert that parsing the given bytes raises InvalidRecordError."""
        push_parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, push_parser.accept_bytes, data)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        records_with_bad_names = (
            b"0\nbad name\n\n",     # a space
            b"0\nbad\tname\n\n",    # a tab
            b"0\nbad\vname\n\n",    # a vertical tab
            )
        for record in records_with_bad_names:
            self.assertInvalidRecord(record)

    def test_repeated_read_pending_records(self):
        """read_pending_records will not return the same record twice."""
        push_parser = self.make_parser_expecting_bytes_record()
        push_parser.accept_bytes(b"6\n\nabcdef")
        first_batch = push_parser.read_pending_records()
        self.assertEqual([([], b'abcdef')], first_batch)
        self.assertEqual([], push_parser.read_pending_records())
745