1import time
2import io
3import binascii
4
5import pytest
6import mock
7
8from Connection import ConnectionServer
9from Content.ContentManager import VerifyError
10from File import FileServer
11from File import FileRequest
12from Worker import WorkerManager
13from Peer import Peer
14from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked
15from Test import Spy
16from util import Msgpack
17
18
@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestBigfile:
    """Tests for bigfile handling: piecemap creation, piece verification,
    ranged transfer, piecefields and download statistics."""
    # Private key used to sign content.json of the test site
    privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"
    # Size of one bigfile piece in bytes (1MB)
    piece_size = 1024 * 1024

    def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10):
        """Create a 10 x 1MB-piece test file (10,000,000 bytes of "TestN---" filler)
        on the site, sign content.json and return the file's inner path."""
        f = site.storage.open(inner_path, "w")
        for i in range(pieces * 100):
            f.write(("Test%s" % i).ljust(10, "-") * 1000)  # 10KB per iteration
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)
        return inner_path
32
33    def testPiecemapCreate(self, site):
34        inner_path = self.createBigfile(site)
35        content = site.storage.loadJson("content.json")
36        assert "data/optional.any.iso" in content["files_optional"]
37        file_node = content["files_optional"][inner_path]
38        assert file_node["size"] == 10 * 1000 * 1000
39        assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6"
40        assert file_node["piecemap"] == inner_path + ".piecemap.msgpack"
41
42        piecemap = Msgpack.unpack(site.storage.open(file_node["piecemap"], "rb").read())["optional.any.iso"]
43        assert len(piecemap["sha512_pieces"]) == 10
44        assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1]
45        assert binascii.hexlify(piecemap["sha512_pieces"][0]) == b"a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3"
46
47    def testVerifyPiece(self, site):
48        inner_path = self.createBigfile(site)
49
50        # Verify all 10 piece
51        f = site.storage.open(inner_path, "rb")
52        for i in range(10):
53            piece = io.BytesIO(f.read(1024 * 1024))
54            piece.seek(0)
55            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
56        f.close()
57
58        # Try to verify piece 0 with piece 1 hash
59        with pytest.raises(VerifyError) as err:
60            i = 1
61            f = site.storage.open(inner_path, "rb")
62            piece = io.BytesIO(f.read(1024 * 1024))
63            f.close()
64            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
65        assert "Invalid hash" in str(err.value)
66
67    def testSparseFile(self, site):
68        inner_path = "sparsefile"
69
70        # Create a 100MB sparse file
71        site.storage.createSparseFile(inner_path, 100 * 1024 * 1024)
72
73        # Write to file beginning
74        s = time.time()
75        f = site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), b"hellostart" * 1024)
76        time_write_start = time.time() - s
77
78        # Write to file end
79        s = time.time()
80        f = site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), b"helloend" * 1024)
81        time_write_end = time.time() - s
82
83        # Verify writes
84        f = site.storage.open(inner_path)
85        assert f.read(10) == b"hellostart"
86        f.seek(99 * 1024 * 1024)
87        assert f.read(8) == b"helloend"
88        f.close()
89
90        site.storage.delete(inner_path)
91
92        # Writing to end shold not take much longer, than writing to start
93        assert time_write_end <= max(0.1, time_write_start * 1.1)
94
95    def testRangedFileRequest(self, file_server, site, site_temp):
96        inner_path = self.createBigfile(site)
97
98        file_server.sites[site.address] = site
99        client = FileServer(file_server.ip, 1545)
100        client.sites[site_temp.address] = site_temp
101        site_temp.connection_server = client
102        connection = client.getConnection(file_server.ip, 1544)
103
104        # Add file_server as peer to client
105        peer_file_server = site_temp.addPeer(file_server.ip, 1544)
106
107        buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
108
109        assert len(buff.getvalue()) == 1 * 1024 * 1024  # Correct block size
110        assert buff.getvalue().startswith(b"Test524")  # Correct data
111        buff.seek(0)
112        assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff)  # Correct hash
113
114        connection.close()
115        client.stop()
116
    def testRangedFileDownload(self, file_server, site, site_temp):
        """Downloading selected pieces should fetch only those pieces and
        register the bigfile + piecemap hash ids in the client's hashfield."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Make sure the file and the piecemap in the optional hashfield
        file_info = site.content_manager.getFileInfo(inner_path)
        assert site.content_manager.hashfield.hasHash(file_info["sha512"])

        piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"]
        assert site.content_manager.hashfield.hasHash(piecemap_hash)

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        peer_client = site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        bad_files = site_temp.storage.verifyFiles(quick_check=True)["bad_files"]
        assert not bad_files

        # client_piecefield = peer_client.piecefields[file_info["sha512"]].tostring()
        # assert client_piecefield == "1" * 10

        # Download 5. and 10. block

        site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))

        # Verify 0. block not downloaded (still sparse null bytes)
        f = site_temp.storage.open(inner_path)
        assert f.read(10) == b"\0" * 10
        # Verify 5. and 10. block downloaded
        f.seek(5 * 1024 * 1024)
        assert f.read(7) == b"Test524"
        f.seek(9 * 1024 * 1024)
        assert f.read(7) == b"943---T"

        # Verify hashfield
        assert set(site_temp.content_manager.hashfield) == set([18343, 43727])  # 18343: data/optional.any.iso, 43727: data/optional.any.iso.hashmap.msgpack
161
    def testOpenBigfile(self, file_server, site, site_temp):
        """Reads through the virtual bigfile handle should download the touched
        pieces on demand and never re-request already downloaded ones."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

                f.seek(9 * 1024 * 1024)
                assert f.read(7) == b"943---T"

            assert len(requests) == 4  # 1x piecemap + 1x getpiecefield + 2x for pieces

            assert set(site_temp.content_manager.hashfield) == set([18343, 43727])

            assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001"
            assert f.sha512 in site_temp.getSettingsCache()["piecefields"]

            # Test requesting already downloaded
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

            assert len(requests) == 0

            # Test requesting multi-block overflow reads
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)  # We already have this block
                data = f.read(1024 * 1024 * 3)  # Our read overflows to the 6. and 7. block
                assert data.startswith(b"Test524")
                assert data.endswith(b"Test838-")
                assert b"\0" not in data  # No null bytes allowed

            assert len(requests) == 2  # Two block download

            # Test out of range request (real size is 10,000,000 bytes, < 10MB)
            f.seek(5 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 10 * 1000 * 1000 - (5 * 1024 * 1024)

            f.seek(30 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 0
220
221    @pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked])
222    def testPiecefield(self, piecefield_obj, site):
223        testdatas = [
224            b"\x01" * 100 + b"\x00" * 900 + b"\x01" * 4000 + b"\x00" * 4999 + b"\x01",
225            b"\x00\x01\x00\x01\x00\x01" * 10 + b"\x00\x01" * 90 + b"\x01\x00" * 400 + b"\x00" * 4999,
226            b"\x01" * 10000,
227            b"\x00" * 10000
228        ]
229        for testdata in testdatas:
230            piecefield = piecefield_obj()
231
232            piecefield.frombytes(testdata)
233            assert piecefield.tobytes() == testdata
234            assert piecefield[0] == testdata[0]
235            assert piecefield[100] == testdata[100]
236            assert piecefield[1000] == testdata[1000]
237            assert piecefield[len(testdata) - 1] == testdata[len(testdata) - 1]
238
239            packed = piecefield.pack()
240            piecefield_new = piecefield_obj()
241            piecefield_new.unpack(packed)
242            assert piecefield.tobytes() == piecefield_new.tobytes()
243            assert piecefield_new.tobytes() == testdata
244
245    def testFileGet(self, file_server, site, site_temp):
246        inner_path = self.createBigfile(site)
247
248        # Init source server
249        site.connection_server = file_server
250        file_server.sites[site.address] = site
251
252        # Init client server
253        site_temp.connection_server = FileServer(file_server.ip, 1545)
254        site_temp.connection_server.sites[site_temp.address] = site_temp
255        site_temp.addPeer(file_server.ip, 1544)
256
257        # Download site
258        site_temp.download(blind_includes=True).join(timeout=5)
259
260        # Download second block
261        with site_temp.storage.openBigfile(inner_path) as f:
262            f.seek(1024 * 1024)
263            assert f.read(1024)[0:1] != b"\0"
264
265        # Make sure first block not download
266        with site_temp.storage.open(inner_path) as f:
267            assert f.read(1024)[0:1] == b"\0"
268
269        peer2 = site.addPeer(file_server.ip, 1545, return_peer=True)
270
271        # Should drop error on first block request
272        assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1))
273
274        # Should not drop error for second block request
275        assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2))
276
277    def benchmarkPeerMemory(self, site, file_server):
278        # Init source server
279        site.connection_server = file_server
280        file_server.sites[site.address] = site
281
282        import psutil, os
283        meminfo = psutil.Process(os.getpid()).memory_info
284
285        mem_s = meminfo()[0]
286        s = time.time()
287        for i in range(25000):
288            site.addPeer(file_server.ip, i)
289        print("%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024))  # 0.082s MEM: + 6800KB
290        print(list(site.peers.values())[0].piecefields)
291
292    def testUpdatePiecefield(self, file_server, site, site_temp):
293        inner_path = self.createBigfile(site)
294
295        server1 = file_server
296        server1.sites[site.address] = site
297        server2 = FileServer(file_server.ip, 1545)
298        server2.sites[site_temp.address] = site_temp
299        site_temp.connection_server = server2
300
301        # Add file_server as peer to client
302        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)
303
304        # Testing piecefield sync
305        assert len(server2_peer1.piecefields) == 0
306        assert server2_peer1.updatePiecefields()  # Query piecefields from peer
307        assert len(server2_peer1.piecefields) > 0
308
309    def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp):
310        inner_path = self.createBigfile(site)
311
312        server1 = file_server
313        server1.sites[site.address] = site
314        server2 = FileServer(file_server.ip, 1545)
315        server2.sites[site_temp.address] = site_temp
316        site_temp.connection_server = server2
317
318        # Add file_server as peer to client
319        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)  # Working
320
321        site_temp.downloadContent("content.json", download_files=False)
322        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")
323
324        # Add fake peers with optional files downloaded
325        for i in range(5):
326            fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544)
327            fake_peer.hashfield = site.content_manager.hashfield
328            fake_peer.has_hashfield = True
329
330        with Spy.Spy(WorkerManager, "addWorker") as requests:
331            site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
332            site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024))
333
334        # It should only request parts from peer1 as the other peers does not have the requested parts in piecefields
335        assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0
336
    def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp):
        """Each piece should be requested from the peer whose piecefield owns it."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2
        sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]

        # Create 10 fake peer for each piece
        for i in range(10):
            peer = Peer(file_server.ip, 1544, site_temp, server2)
            peer.piecefields[sha512][i] = b"\x01"  # Each fake peer owns exactly one piece
            # Mock out network-triggering update calls
            peer.updateHashfield = mock.MagicMock(return_value=False)
            peer.updatePiecefields = mock.MagicMock(return_value=False)
            peer.findHashIds = mock.MagicMock(return_value={"nope": []})
            peer.hashfield = site.content_manager.hashfield
            peer.has_hashfield = True
            peer.key = "Peer:%s" % i
            site_temp.peers["Peer:%s" % i] = peer

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        with Spy.Spy(Peer, "getFile") as requests:
            for i in range(10):
                site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024))

        assert len(requests) == 10
        for i in range(10):
            assert requests[i][0] == site_temp.peers["Peer:%s" % i]  # Every part should be requested from piece owner peer
369
    def testDownloadStats(self, file_server, site, site_temp):
        """optional_downloaded should be counted once per file, not per piece."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Check size before downloads
        assert site_temp.settings["size"] < 10 * 1024 * 1024
        assert site_temp.settings["optional_downloaded"] == 0
        size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"]
        size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"]

        with site_temp.storage.openBigfile(inner_path) as f:
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

        with site_temp.storage.openBigfile(inner_path) as f:
            # Don't count twice
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

            # Add second block (already counted: the stat must not increase again)
            assert b"\0" not in f.read(1024 * 1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile
406
    def testPrebuffer(self, file_server, site, site_temp):
        """Reading with prebuffer should also schedule the following pieces,
        but never past the end of the file."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"
            # assert len(requests) == 3  # 1x piecemap + 1x getpiecefield + 1x for pieces
            # The 2MB prebuffer schedules the 2 following pieces as tasks
            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2

            time.sleep(0.5)  # Wait prebuffer download

            sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
            assert site_temp.storage.piecefields[sha512].tostring() == "0000011100"

            # No prebuffer beyond end of the file
            f.seek(9 * 1024 * 1024)
            assert b"\0" not in f.read(7)

            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0
442
443    def testDownloadAllPieces(self, file_server, site, site_temp):
444        inner_path = self.createBigfile(site)
445
446        # Init source server
447        site.connection_server = file_server
448        file_server.sites[site.address] = site
449
450        # Init client server
451        client = ConnectionServer(file_server.ip, 1545)
452        site_temp.connection_server = client
453        site_temp.addPeer(file_server.ip, 1544)
454
455        # Download site
456        site_temp.download(blind_includes=True).join(timeout=5)
457
458        # Open virtual file
459        assert not site_temp.storage.isFile(inner_path)
460
461        with Spy.Spy(FileRequest, "route") as requests:
462            site_temp.needFile("%s|all" % inner_path)
463
464        assert len(requests) == 12  # piecemap.msgpack, getPiecefields, 10 x piece
465
466        # Don't re-download already got pieces
467        with Spy.Spy(FileRequest, "route") as requests:
468            site_temp.needFile("%s|all" % inner_path)
469
470        assert len(requests) == 0
471
472    def testFileSize(self, file_server, site, site_temp):
473        inner_path = self.createBigfile(site)
474
475        # Init source server
476        site.connection_server = file_server
477        file_server.sites[site.address] = site
478
479        # Init client server
480        client = ConnectionServer(file_server.ip, 1545)
481        site_temp.connection_server = client
482        site_temp.addPeer(file_server.ip, 1544)
483
484        # Download site
485        site_temp.download(blind_includes=True).join(timeout=5)
486
487        # Open virtual file
488        assert not site_temp.storage.isFile(inner_path)
489
490        # Download first block
491        site_temp.needFile("%s|%s-%s" % (inner_path, 0 * 1024 * 1024, 1 * 1024 * 1024))
492        assert site_temp.storage.getSize(inner_path) < 1000 * 1000 * 10  # Size on the disk should be smaller than the real size
493
494        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))
495        assert site_temp.storage.getSize(inner_path) == site.storage.getSize(inner_path)
496
    def testFileRename(self, file_server, site, site_temp):
        """After renaming a bigfile and re-signing, already downloaded pieces
        stay usable; only the new piecemap and new pieces are streamed."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 0, 1 * self.piece_size))

        assert len([req for req in requests if req[1] == "streamFile"]) == 2  # 1 piece + piecemap

        # Rename the file
        inner_path_new = inner_path.replace(".iso", "-new.iso")
        site.storage.rename(inner_path, inner_path_new)
        site.storage.delete("data/optional.any.iso.piecemap.msgpack")
        assert site.content_manager.sign("content.json", self.privatekey, remove_missing_optional=True)

        files_optional = site.content_manager.contents["content.json"]["files_optional"].keys()

        # Only the renamed file and its piecemap remain in the optional file list
        assert "data/optional.any-new.iso.piecemap.msgpack" in files_optional
        assert "data/optional.any.iso.piecemap.msgpack" not in files_optional
        assert "data/optional.any.iso" not in files_optional

        with Spy.Spy(FileRequest, "route") as requests:
            site.publish()
            time.sleep(0.1)
            site_temp.download(blind_includes=True).join(timeout=5)  # Wait for download

            assert len([req[1] for req in requests if req[1] == "streamFile"]) == 0

            with site_temp.storage.openBigfile(inner_path_new, prebuffer=0) as f:
                f.read(1024)

                # First piece already downloaded
                assert [req for req in requests if req[1] == "streamFile"] == []

                # Second piece needs to be downloaded + changed piecemap
                f.seek(self.piece_size)
                f.read(1024)
                assert [req[3]["inner_path"] for req in requests if req[1] == "streamFile"] == [inner_path_new + ".piecemap.msgpack", inner_path_new]
546
547    @pytest.mark.parametrize("size", [1024 * 3, 1024 * 1024 * 3, 1024 * 1024 * 30])
548    def testNullFileRead(self, file_server, site, site_temp, size):
549        inner_path = "data/optional.iso"
550
551        f = site.storage.open(inner_path, "w")
552        f.write("\0" * size)
553        f.close()
554        assert site.content_manager.sign("content.json", self.privatekey)
555
556        # Init source server
557        site.connection_server = file_server
558        file_server.sites[site.address] = site
559
560        # Init client server
561        site_temp.connection_server = FileServer(file_server.ip, 1545)
562        site_temp.connection_server.sites[site_temp.address] = site_temp
563        site_temp.addPeer(file_server.ip, 1544)
564
565        # Download site
566        site_temp.download(blind_includes=True).join(timeout=5)
567
568        if "piecemap" in site.content_manager.getFileInfo(inner_path):  # Bigfile
569            site_temp.needFile(inner_path + "|all")
570        else:
571            site_temp.needFile(inner_path)
572
573
574        assert site_temp.storage.getSize(inner_path) == size
575