1######################################################################
2#
3# File: test/unit/v1/test_raw_api.py
4#
5# Copyright 2019 Backblaze Inc. All Rights Reserved.
6#
7# License https://www.backblaze.com/using_b2_code.html
8#
9######################################################################
10import pytest
11
12from ..test_base import TestBase
13
14from .deps import EncryptionAlgorithm
15from .deps import EncryptionKey
16from .deps import EncryptionMode
17from .deps import EncryptionSetting
18from .deps import B2RawHTTPApi
19from .deps import B2Http
20from .deps import BucketRetentionSetting, RetentionPeriod, RetentionMode
21from .deps_exception import UnusableFileName, WrongEncryptionModeForBucketDefault
22
# Characters used to build test filenames.
# A letter Delta (U+0394); it takes two bytes when encoded as UTF-8.
TWO_BYTE_UNICHR = '\N{GREEK CAPITAL LETTER DELTA}'
# A control character whose code (31) is below 32.
CHAR_UNDER_32 = '\x1f'
# The DEL character (code 127).
DEL_CHAR = '\x7f'
27
28
class TestRawAPIFilenames(TestBase):
    """Test that the filename checker passes conforming names and rejects those that don't."""

    def setUp(self):
        """Create the raw API object whose ``check_b2_filename`` is under test."""
        self.raw_api = B2RawHTTPApi(B2Http())

    def _should_be_ok(self, filename):
        """Call with test filenames that follow the filename rules.

        :param filename: unicode (or str) that follows the rules
        """
        print(u"Filename \"{0}\" should be OK".format(filename))
        # assertIsNone reports the unexpected value on failure, unlike assertTrue(x is None).
        self.assertIsNone(self.raw_api.check_b2_filename(filename))

    def _should_raise(self, filename, exception_message):
        """Call with filenames that don't follow the rules (so the rule checker should raise).

        :param filename: unicode (or str) that doesn't follow the rules
        :param exception_message: regexp that matches the exception's detailed message
        """
        print(
            u"Filename \"{0}\" should raise UnusableFileName(\".*{1}.*\").".format(
                filename, exception_message
            )
        )
        # assertRaisesRegexp was a deprecated alias and no longer exists in Python 3.12+;
        # assertRaisesRegex is the supported name.
        # NOTE(review): assumes TestBase derives from unittest.TestCase -- confirm.
        with self.assertRaisesRegex(UnusableFileName, exception_message):
            self.raw_api.check_b2_filename(filename)

    def test_b2_filename_checker(self):
        """Test a conforming and non-conforming filename for each rule.

        From the B2 docs (https://www.backblaze.com/b2/docs/files.html):
        - Names can be pretty much any UTF-8 string up to 1024 bytes long.
        - No character codes below 32 are allowed.
        - Backslashes are not allowed.
        - DEL characters (127) are not allowed.
        - File names cannot start with "/", end with "/", or contain "//".
        - Maximum of 250 bytes of UTF-8 in each segment (part between slashes) of a file name.
        """
        print("test b2 filename rules")

        # Examples from doc:
        self._should_be_ok('Kitten Videos')
        self._should_be_ok(u'\u81ea\u7531.txt')

        # Check length
        # 1024 bytes is ok if the segments are at most 250 chars.
        # (4 segments of 250 'x' plus a '/' each = 1004 bytes, plus 20 'y' = 1024.)
        s_1024 = 4 * (250 * 'x' + '/') + 20 * 'y'
        self._should_be_ok(s_1024)
        # 1025 is too long.
        self._should_raise(s_1024 + u'x', "too long")
        # 1024 bytes with two byte characters should also work.
        # (125 two-byte characters are 250 bytes per segment.)
        s_1024_two_byte = 4 * (125 * TWO_BYTE_UNICHR + u'/') + 20 * u'y'
        self._should_be_ok(s_1024_two_byte)
        # But 1025 bytes is too long.
        self._should_raise(s_1024_two_byte + u'x', "too long")

        # Names with unicode values < 32, and DEL aren't allowed.
        self._should_raise(u'hey' + CHAR_UNDER_32, "contains code.*less than 32")
        # Unicode in the filename shouldn't break the exception message.
        self._should_raise(TWO_BYTE_UNICHR + CHAR_UNDER_32, "contains code.*less than 32")
        self._should_raise(DEL_CHAR, "DEL.*not allowed")

        # Names can't start or end with '/' or contain '//'
        self._should_raise(u'/hey', "not start.*/")
        self._should_raise(u'hey/', "not .*end.*/")
        self._should_raise(u'not//allowed', "contain.*//")

        # Reject segments longer than 250 bytes
        self._should_raise(u'foo/' + 251 * u'x', "segment too long")
        # So a segment of 125 two-byte chars plus one should also fail.
        self._should_raise(u'foo/' + 125 * TWO_BYTE_UNICHR + u'x', "segment too long")
101
102
class BucketTestBase:
    """Shared base for the bucket tests: provides ``self.raw_api`` to every test."""

    @pytest.fixture(autouse=True)
    def init(self, mocker):
        """Build a ``B2RawHTTPApi`` around a ``MagicMock`` and store it on the instance.

        :param mocker: fixture object exposing ``MagicMock``
        """
        self.raw_api = B2RawHTTPApi(mocker.MagicMock())
108
109
class TestUpdateBucket(BucketTestBase):
    """Tests for ``update_bucket`` on the raw API."""

    def test_assertion_raises(self):
        """Calling with only the four positional arguments must raise AssertionError."""
        positional = ('test', 'account_auth_token', 'account_id', 'bucket_id')
        with pytest.raises(AssertionError):
            self.raw_api.update_bucket(*positional)

    @pytest.mark.parametrize(
        'bucket_type,bucket_info,default_retention',
        [
            (None, {}, None),
            (
                'allPublic',
                None,
                BucketRetentionSetting(RetentionMode.COMPLIANCE, RetentionPeriod(years=1)),
            ),
        ],
    )
    def test_assertion_not_raises(self, bucket_type, bucket_info, default_retention):
        """Each parametrized combination of settings must be accepted without raising."""
        settings = {
            'bucket_type': bucket_type,
            'bucket_info': bucket_info,
            'default_retention': default_retention,
        }
        self.raw_api.update_bucket(
            'test', 'account_auth_token', 'account_id', 'bucket_id', **settings
        )

    @pytest.mark.parametrize(
        'encryption_setting,',
        [
            EncryptionSetting(
                mode=EncryptionMode.SSE_C,
                algorithm=EncryptionAlgorithm.AES256,
                key=EncryptionKey(b'key', 'key-id'),
            ),
            EncryptionSetting(mode=EncryptionMode.UNKNOWN),
        ],
    )
    def test_update_bucket_wrong_encryption(self, encryption_setting):
        """Each parametrized setting must raise WrongEncryptionModeForBucketDefault."""
        positional = ('test', 'account_auth_token', 'account_id', 'bucket_id')
        with pytest.raises(WrongEncryptionModeForBucketDefault):
            self.raw_api.update_bucket(
                *positional,
                bucket_type='allPublic',
                default_server_side_encryption=encryption_setting,
            )
157
158
class TestCreateBucket(BucketTestBase):
    """Tests for ``create_bucket`` on the raw API."""

    @pytest.mark.parametrize(
        'encryption_setting,',
        [
            EncryptionSetting(
                mode=EncryptionMode.SSE_C,
                algorithm=EncryptionAlgorithm.AES256,
                key=EncryptionKey(b'key', 'key-id'),
            ),
            EncryptionSetting(mode=EncryptionMode.UNKNOWN),
        ],
    )
    def test_create_bucket_wrong_encryption(self, encryption_setting):
        """Each parametrized setting must raise WrongEncryptionModeForBucketDefault."""
        positional = ('test', 'account_auth_token', 'account_id', 'bucket_id')
        keyword = {
            'bucket_type': 'allPrivate',
            'bucket_info': {},
            'default_server_side_encryption': encryption_setting,
        }
        with pytest.raises(WrongEncryptionModeForBucketDefault):
            self.raw_api.create_bucket(*positional, **keyword)
184