1import json
2import io
3
4import pytest
5
6from Crypt import CryptBitcoin
7from Content.ContentManager import VerifyError, SignError
8
9
10@pytest.mark.usefixtures("resetSettings")
11class TestContentUser:
12    def testSigners(self, site):
13        # File info for not existing user file
14        file_info = site.content_manager.getFileInfo("data/users/notexist/data.json")
15        assert file_info["content_inner_path"] == "data/users/notexist/content.json"
16        file_info = site.content_manager.getFileInfo("data/users/notexist/a/b/data.json")
17        assert file_info["content_inner_path"] == "data/users/notexist/content.json"
18        valid_signers = site.content_manager.getValidSigners("data/users/notexist/content.json")
19        assert valid_signers == ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"]
20
21        # File info for exsitsing user file
22        valid_signers = site.content_manager.getValidSigners("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
23        assert '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT' in valid_signers  # The site address
24        assert '14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet' in valid_signers  # Admin user defined in data/users/content.json
25        assert '1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C' in valid_signers  # The user itself
26        assert len(valid_signers) == 3  # No more valid signers
27
28        # Valid signer for banned user
29        user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
30        user_content["cert_user_id"] = "bad@zeroid.bit"
31
32        valid_signers = site.content_manager.getValidSigners("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
33        assert '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT' in valid_signers  # The site address
34        assert '14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet' in valid_signers  # Admin user defined in data/users/content.json
35        assert '1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C' not in valid_signers  # The user itself
36
37    def testRules(self, site):
38        # We going to manipulate it this test rules based on data/users/content.json
39        user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
40
41        # Known user
42        user_content["cert_auth_type"] = "web"
43        user_content["cert_user_id"] = "nofish@zeroid.bit"
44        rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
45        assert rules["max_size"] == 100000
46        assert "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" in rules["signers"]
47
48        # Unknown user
49        user_content["cert_auth_type"] = "web"
50        user_content["cert_user_id"] = "noone@zeroid.bit"
51        rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
52        assert rules["max_size"] == 10000
53        assert "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" in rules["signers"]
54
55        # User with more size limit based on auth type
56        user_content["cert_auth_type"] = "bitmsg"
57        user_content["cert_user_id"] = "noone@zeroid.bit"
58        rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
59        assert rules["max_size"] == 15000
60        assert "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" in rules["signers"]
61
62        # Banned user
63        user_content["cert_auth_type"] = "web"
64        user_content["cert_user_id"] = "bad@zeroid.bit"
65        rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
66        assert "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" not in rules["signers"]
67
68    def testRulesAddress(self, site):
69        user_inner_path = "data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/content.json"
70        user_content = site.storage.loadJson(user_inner_path)
71
72        rules = site.content_manager.getRules(user_inner_path, user_content)
73        assert rules["max_size"] == 10000
74        assert "1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9" in rules["signers"]
75
76        users_content = site.content_manager.contents["data/users/content.json"]
77
78        # Ban user based on address
79        users_content["user_contents"]["permissions"]["1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9"] = False
80        rules = site.content_manager.getRules(user_inner_path, user_content)
81        assert "1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9" not in rules["signers"]
82
83        # Change max allowed size
84        users_content["user_contents"]["permissions"]["1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9"] = {"max_size": 20000}
85        rules = site.content_manager.getRules(user_inner_path, user_content)
86        assert rules["max_size"] == 20000
87
88    def testVerifyAddress(self, site):
89        privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"  # For 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT
90        user_inner_path = "data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/content.json"
91        data_dict = site.storage.loadJson(user_inner_path)
92        users_content = site.content_manager.contents["data/users/content.json"]
93
94        data = io.BytesIO(json.dumps(data_dict).encode())
95        assert site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
96
97        # Test error on 15k data.json
98        data_dict["files"]["data.json"]["size"] = 1024 * 15
99        del data_dict["signs"]  # Remove signs before signing
100        data_dict["signs"] = {
101            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
102        }
103        data = io.BytesIO(json.dumps(data_dict).encode())
104        with pytest.raises(VerifyError) as err:
105            site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
106        assert "Include too large" in str(err.value)
107
108        # Give more space based on address
109        users_content["user_contents"]["permissions"]["1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9"] = {"max_size": 20000}
110        del data_dict["signs"]  # Remove signs before signing
111        data_dict["signs"] = {
112            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
113        }
114        data = io.BytesIO(json.dumps(data_dict).encode())
115        assert site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
116
117    def testVerify(self, site):
118        privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"  # For 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT
119        user_inner_path = "data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/content.json"
120        data_dict = site.storage.loadJson(user_inner_path)
121        users_content = site.content_manager.contents["data/users/content.json"]
122
123        data = io.BytesIO(json.dumps(data_dict).encode())
124        assert site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
125
126        # Test max size exception by setting allowed to 0
127        rules = site.content_manager.getRules(user_inner_path, data_dict)
128        assert rules["max_size"] == 10000
129        assert users_content["user_contents"]["permission_rules"][".*"]["max_size"] == 10000
130
131        users_content["user_contents"]["permission_rules"][".*"]["max_size"] = 0
132        rules = site.content_manager.getRules(user_inner_path, data_dict)
133        assert rules["max_size"] == 0
134        data = io.BytesIO(json.dumps(data_dict).encode())
135
136        with pytest.raises(VerifyError) as err:
137            site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
138        assert "Include too large" in str(err.value)
139        users_content["user_contents"]["permission_rules"][".*"]["max_size"] = 10000  # Reset
140
141        # Test max optional size exception
142        # 1 MB gif = Allowed
143        data_dict["files_optional"]["peanut-butter-jelly-time.gif"]["size"] = 1024 * 1024
144        del data_dict["signs"]  # Remove signs before signing
145        data_dict["signs"] = {
146            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
147        }
148        data = io.BytesIO(json.dumps(data_dict).encode())
149        assert site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
150
151        # 100 MB gif = Not allowed
152        data_dict["files_optional"]["peanut-butter-jelly-time.gif"]["size"] = 100 * 1024 * 1024
153        del data_dict["signs"]  # Remove signs before signing
154        data_dict["signs"] = {
155            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
156        }
157        data = io.BytesIO(json.dumps(data_dict).encode())
158        with pytest.raises(VerifyError) as err:
159            site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
160        assert "Include optional files too large" in str(err.value)
161        data_dict["files_optional"]["peanut-butter-jelly-time.gif"]["size"] = 1024 * 1024  # Reset
162
163        # hello.exe = Not allowed
164        data_dict["files_optional"]["hello.exe"] = data_dict["files_optional"]["peanut-butter-jelly-time.gif"]
165        del data_dict["signs"]  # Remove signs before signing
166        data_dict["signs"] = {
167            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
168        }
169        data = io.BytesIO(json.dumps(data_dict).encode())
170        with pytest.raises(VerifyError) as err:
171            site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
172        assert "Optional file not allowed" in str(err.value)
173        del data_dict["files_optional"]["hello.exe"]  # Reset
174
175        # Includes not allowed in user content
176        data_dict["includes"] = {"other.json": {}}
177        del data_dict["signs"]  # Remove signs before signing
178        data_dict["signs"] = {
179            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(data_dict, sort_keys=True), privatekey)
180        }
181        data = io.BytesIO(json.dumps(data_dict).encode())
182        with pytest.raises(VerifyError) as err:
183            site.content_manager.verifyFile(user_inner_path, data, ignore_same=False)
184        assert "Includes not allowed" in str(err.value)
185
186    def testCert(self, site):
187        # user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C"
188        user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
189        # cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
190        cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
191
192        # Check if the user file is loaded
193        assert "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents
194        user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
195        rules_content = site.content_manager.contents["data/users/content.json"]
196
197        # Override valid cert signers for the test
198        rules_content["user_contents"]["cert_signers"]["zeroid.bit"] = [
199            "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet",
200            "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"
201        ]
202
203        # Check valid cert signers
204        rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
205        assert rules["cert_signers"] == {"zeroid.bit": [
206            "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet",
207            "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"
208        ]}
209
210        # Sign a valid cert
211        user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (
212            user_content["cert_auth_type"],
213            user_content["cert_user_id"].split("@")[0]
214        ), cert_priv)
215
216        # Verify cert
217        assert site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
218
219        # Verify if the cert is valid for other address
220        assert not site.content_manager.verifyCert("data/users/badaddress/content.json", user_content)
221
222        # Sign user content
223        signed_content = site.content_manager.sign(
224            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
225        )
226
227        # Test user cert
228        assert site.content_manager.verifyFile(
229            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
230            io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
231        )
232
233        # Test banned user
234        cert_user_id = user_content["cert_user_id"]  # My username
235        site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][cert_user_id] = False
236        with pytest.raises(VerifyError) as err:
237            site.content_manager.verifyFile(
238                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
239                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
240            )
241        assert "Valid signs: 0/1" in str(err.value)
242        del site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][cert_user_id]  # Reset
243
244        # Test invalid cert
245        user_content["cert_sign"] = CryptBitcoin.sign(
246            "badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv
247        )
248        signed_content = site.content_manager.sign(
249            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
250        )
251        with pytest.raises(VerifyError) as err:
252            site.content_manager.verifyFile(
253                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
254                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
255            )
256        assert "Invalid cert" in str(err.value)
257
258        # Test banned user, signed by the site owner
259        user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (
260            user_content["cert_auth_type"],
261            user_content["cert_user_id"].split("@")[0]
262        ), cert_priv)
263        cert_user_id = user_content["cert_user_id"]  # My username
264        site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][cert_user_id] = False
265
266        site_privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"  # For 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT
267        del user_content["signs"]  # Remove signs before signing
268        user_content["signs"] = {
269            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(user_content, sort_keys=True), site_privatekey)
270        }
271        assert site.content_manager.verifyFile(
272            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
273            io.BytesIO(json.dumps(user_content).encode()), ignore_same=False
274        )
275
276    def testMissingCert(self, site):
277        user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
278        cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
279
280        user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
281        rules_content = site.content_manager.contents["data/users/content.json"]
282
283        # Override valid cert signers for the test
284        rules_content["user_contents"]["cert_signers"]["zeroid.bit"] = [
285            "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet",
286            "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"
287        ]
288
289        # Sign a valid cert
290        user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (
291            user_content["cert_auth_type"],
292            user_content["cert_user_id"].split("@")[0]
293        ), cert_priv)
294        signed_content = site.content_manager.sign(
295            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
296        )
297
298        assert site.content_manager.verifyFile(
299            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
300            io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
301        )
302
303        # Test invalid cert_user_id
304        user_content["cert_user_id"] = "nodomain"
305        user_content["signs"] = {
306            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(user_content, sort_keys=True), user_priv)
307        }
308        signed_content = site.content_manager.sign(
309            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
310        )
311        with pytest.raises(VerifyError) as err:
312            site.content_manager.verifyFile(
313                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
314                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
315            )
316        assert "Invalid domain in cert_user_id" in str(err.value)
317
318        # Test removed cert
319        del user_content["cert_user_id"]
320        del user_content["cert_auth_type"]
321        del user_content["signs"]  # Remove signs before signing
322        user_content["signs"] = {
323            "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": CryptBitcoin.sign(json.dumps(user_content, sort_keys=True), user_priv)
324        }
325        signed_content = site.content_manager.sign(
326            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
327        )
328        with pytest.raises(VerifyError) as err:
329            site.content_manager.verifyFile(
330                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
331                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
332            )
333        assert "Missing cert_user_id" in str(err.value)
334
335
336    def testCertSignersPattern(self, site):
337        user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
338        cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"  # For 14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet
339
340        user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
341        rules_content = site.content_manager.contents["data/users/content.json"]
342
343        # Override valid cert signers for the test
344        rules_content["user_contents"]["cert_signers_pattern"] = "14wgQ[0-9][A-Z]"
345
346        # Sign a valid cert
347        user_content["cert_user_id"] = "certuser@14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
348        user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (
349            user_content["cert_auth_type"],
350            "certuser"
351        ), cert_priv)
352        signed_content = site.content_manager.sign(
353            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
354        )
355
356        assert site.content_manager.verifyFile(
357            "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
358            io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
359        )
360
361        # Cert does not matches the pattern
362        rules_content["user_contents"]["cert_signers_pattern"] = "14wgX[0-9][A-Z]"
363
364        with pytest.raises(VerifyError) as err:
365            site.content_manager.verifyFile(
366                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
367                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
368            )
369        assert "Invalid cert signer: 14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet" in str(err.value)
370
371        # Removed cert_signers_pattern
372        del rules_content["user_contents"]["cert_signers_pattern"]
373
374        with pytest.raises(VerifyError) as err:
375            site.content_manager.verifyFile(
376                "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
377                io.BytesIO(json.dumps(signed_content).encode()), ignore_same=False
378            )
379        assert "Invalid cert signer: 14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet" in str(err.value)
380
381
382    def testNewFile(self, site):
383        privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"  # For 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT
384        inner_path = "data/users/1NEWrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"
385
386        site.storage.writeJson(inner_path, {"test": "data"})
387        site.content_manager.sign(inner_path, privatekey)
388        assert "test" in site.storage.loadJson(inner_path)
389
390        site.storage.delete(inner_path)
391