import time
import io
import binascii
from unittest import mock

import pytest

from Connection import ConnectionServer
from Content.ContentManager import VerifyError
from File import FileServer
from File import FileRequest
from Worker import WorkerManager
from Peer import Peer
from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked
from Test import Spy
from util import Msgpack


@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestBigfile:
    """Tests for Bigfile piecemap creation, piece verification and ranged download."""

    # Signing key for the test site's content.json
    privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"
    # Bigfile piece size used throughout the tests (1 MB)
    piece_size = 1024 * 1024

    def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10):
        """Create a 10MB test file (pieces x 1MB) with deterministic content and sign it.

        Returns the inner path of the created file.
        """
        f = site.storage.open(inner_path, "w")
        for i in range(pieces * 100):
            # Each write is 10KB of "TestN-----" repeated; 100 writes per 1MB piece
            f.write(("Test%s" % i).ljust(10, "-") * 1000)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)
        return inner_path

    def testPiecemapCreate(self, site):
        """Signing a bigfile should generate a piecemap with per-piece sha512 hashes."""
        inner_path = self.createBigfile(site)
        content = site.storage.loadJson("content.json")
        assert "data/optional.any.iso" in content["files_optional"]
        file_node = content["files_optional"][inner_path]
        assert file_node["size"] == 10 * 1000 * 1000
        assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6"
        assert file_node["piecemap"] == inner_path + ".piecemap.msgpack"

        piecemap = Msgpack.unpack(site.storage.open(file_node["piecemap"], "rb").read())["optional.any.iso"]
        assert len(piecemap["sha512_pieces"]) == 10
        assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1]
        assert binascii.hexlify(piecemap["sha512_pieces"][0]) == b"a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3"

    def testVerifyPiece(self, site):
        """Each piece should verify against its own hash and fail against another's."""
        inner_path = self.createBigfile(site)

        # Verify all 10 pieces
        f = site.storage.open(inner_path, "rb")
        for i in range(10):
            piece = io.BytesIO(f.read(1024 * 1024))
            piece.seek(0)
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        f.close()

        # Try to verify piece 0 with piece 1 hash
        with pytest.raises(VerifyError) as err:
            i = 1
            f = site.storage.open(inner_path, "rb")
            piece = io.BytesIO(f.read(1024 * 1024))
            f.close()
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        assert "Invalid hash" in str(err.value)

    def testSparseFile(self, site):
        """Sparse files should support writes at arbitrary offsets without a full fill."""
        inner_path = "sparsefile"

        # Create a 100MB sparse file
        site.storage.createSparseFile(inner_path, 100 * 1024 * 1024)

        # Write to file beginning
        s = time.time()
        site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), b"hellostart" * 1024)
        time_write_start = time.time() - s

        # Write to file end
        s = time.time()
        site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), b"helloend" * 1024)
        time_write_end = time.time() - s

        # Verify writes
        f = site.storage.open(inner_path)
        assert f.read(10) == b"hellostart"
        f.seek(99 * 1024 * 1024)
        assert f.read(8) == b"helloend"
        f.close()

        site.storage.delete(inner_path)

        # Writing to end should not take much longer, than writing to start
        assert time_write_end <= max(0.1, time_write_start * 1.1)

    def testRangedFileRequest(self, file_server, site, site_temp):
        """A peer should serve an exact 1MB byte range of a bigfile that verifies."""
        inner_path = self.createBigfile(site)

        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        client.sites[site_temp.address] = site_temp
        site_temp.connection_server = client
        connection = client.getConnection(file_server.ip, 1544)

        # Add file_server as peer to client
        peer_file_server = site_temp.addPeer(file_server.ip, 1544)

        buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))

        assert len(buff.getvalue()) == 1 * 1024 * 1024  # Correct block size
        assert buff.getvalue().startswith(b"Test524")  # Correct data
        buff.seek(0)
        assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff)  # Correct hash

        connection.close()
        client.stop()

    def testRangedFileDownload(self, file_server, site, site_temp):
        """needFile with a byte range should download only the requested pieces."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Make sure the file and the piecemap are in the optional hashfield
        file_info = site.content_manager.getFileInfo(inner_path)
        assert site.content_manager.hashfield.hasHash(file_info["sha512"])

        piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"]
        assert site.content_manager.hashfield.hasHash(piecemap_hash)

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        peer_client = site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        bad_files = site_temp.storage.verifyFiles(quick_check=True)["bad_files"]
        assert not bad_files

        # Download 5. and 10. block
        site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))

        # Verify 0. block not downloaded
        f = site_temp.storage.open(inner_path)
        assert f.read(10) == b"\0" * 10
        # Verify 5. and 10. block downloaded
        f.seek(5 * 1024 * 1024)
        assert f.read(7) == b"Test524"
        f.seek(9 * 1024 * 1024)
        assert f.read(7) == b"943---T"

        # Verify hashfield
        assert set(site_temp.content_manager.hashfield) == set([18343, 43727])  # 18343: data/optional.any.iso, 43727: data/optional.any.iso.piecemap.msgpack

    def testOpenBigfile(self, file_server, site, site_temp):
        """openBigfile should download pieces on demand as the virtual file is read."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

                f.seek(9 * 1024 * 1024)
                assert f.read(7) == b"943---T"

            assert len(requests) == 4  # 1x piecemap + 1x getpiecefield + 2x for pieces

            assert set(site_temp.content_manager.hashfield) == set([18343, 43727])

            assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001"
            assert f.sha512 in site_temp.getSettingsCache()["piecefields"]

            # Test requesting already downloaded
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

            assert len(requests) == 0

            # Test requesting multi-block overflow reads
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)  # We already have this block
                data = f.read(1024 * 1024 * 3)  # Our read overflow to 6. and 7. block
                assert data.startswith(b"Test524")
                assert data.endswith(b"Test838-")
                assert b"\0" not in data  # No null bytes allowed

            assert len(requests) == 2  # Two block download

            # Test out of range request
            f.seek(5 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 10 * 1000 * 1000 - (5 * 1024 * 1024)

            f.seek(30 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 0

    @pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked])
    def testPiecefield(self, piecefield_obj, site):
        """Both piecefield implementations should round-trip bytes and pack/unpack."""
        testdatas = [
            b"\x01" * 100 + b"\x00" * 900 + b"\x01" * 4000 + b"\x00" * 4999 + b"\x01",
            b"\x00\x01\x00\x01\x00\x01" * 10 + b"\x00\x01" * 90 + b"\x01\x00" * 400 + b"\x00" * 4999,
            b"\x01" * 10000,
            b"\x00" * 10000
        ]
        for testdata in testdatas:
            piecefield = piecefield_obj()

            piecefield.frombytes(testdata)
            assert piecefield.tobytes() == testdata
            assert piecefield[0] == testdata[0]
            assert piecefield[100] == testdata[100]
            assert piecefield[1000] == testdata[1000]
            assert piecefield[len(testdata) - 1] == testdata[len(testdata) - 1]

            packed = piecefield.pack()
            piecefield_new = piecefield_obj()
            piecefield_new.unpack(packed)
            assert piecefield.tobytes() == piecefield_new.tobytes()
            assert piecefield_new.tobytes() == testdata

    def testFileGet(self, file_server, site, site_temp):
        """getFile should error on a not-yet-downloaded block and succeed on a downloaded one."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Download second block
        with site_temp.storage.openBigfile(inner_path) as f:
            f.seek(1024 * 1024)
            assert f.read(1024)[0:1] != b"\0"

        # Make sure first block not downloaded
        with site_temp.storage.open(inner_path) as f:
            assert f.read(1024)[0:1] == b"\0"

        peer2 = site.addPeer(file_server.ip, 1545, return_peer=True)

        # Should drop error on first block request
        assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1))

        # Should not drop error for second block request
        assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2))

    def benchmarkPeerMemory(self, site, file_server):
        """Manual benchmark (not collected by pytest): memory cost of many peers."""
        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        import os
        import psutil
        meminfo = psutil.Process(os.getpid()).memory_info

        mem_s = meminfo()[0]
        s = time.time()
        for i in range(25000):
            site.addPeer(file_server.ip, i)
        print("%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024))  # 0.082s MEM: + 6800KB
        print(list(site.peers.values())[0].piecefields)

    def testUpdatePiecefield(self, file_server, site, site_temp):
        """updatePiecefields should query and populate the peer's piecefields."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)

        # Testing piecefield sync
        assert len(server2_peer1.piecefields) == 0
        assert server2_peer1.updatePiecefields()  # Query piecefields from peer
        assert len(server2_peer1.piecefields) > 0

    def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp):
        """Workers should not be assigned to peers whose piecefield lacks the piece."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)  # Working

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        # Add fake peers with optional files downloaded
        for i in range(5):
            fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544)
            fake_peer.hashfield = site.content_manager.hashfield
            fake_peer.has_hashfield = True

        with Spy.Spy(WorkerManager, "addWorker") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
            site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024))

        # It should only request parts from peer1 as the other peers does not have the requested parts in piecefields
        assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0

    def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp):
        """Each piece should be requested from the peer whose piecefield owns it."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2
        sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]

        # Create 10 fake peer for each piece
        for i in range(10):
            peer = Peer(file_server.ip, 1544, site_temp, server2)
            peer.piecefields[sha512][i] = b"\x01"
            peer.updateHashfield = mock.MagicMock(return_value=False)
            peer.updatePiecefields = mock.MagicMock(return_value=False)
            peer.findHashIds = mock.MagicMock(return_value={"nope": []})
            peer.hashfield = site.content_manager.hashfield
            peer.has_hashfield = True
            peer.key = "Peer:%s" % i
            site_temp.peers["Peer:%s" % i] = peer

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        with Spy.Spy(Peer, "getFile") as requests:
            for i in range(10):
                site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024))

        assert len(requests) == 10
        for i in range(10):
            assert requests[i][0] == site_temp.peers["Peer:%s" % i]  # Every part should be requested from piece owner peer

    def testDownloadStats(self, file_server, site, site_temp):
        """optional_downloaded should be counted once per file, not per piece or open."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Check size before downloads
        assert site_temp.settings["size"] < 10 * 1024 * 1024
        assert site_temp.settings["optional_downloaded"] == 0
        size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"]
        size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"]

        with site_temp.storage.openBigfile(inner_path) as f:
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

        with site_temp.storage.openBigfile(inner_path) as f:
            # Don't count twice
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

            # Add second block
            assert b"\0" not in f.read(1024 * 1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

    def testPrebuffer(self, file_server, site, site_temp):
        """Reading with prebuffer should queue download of the following blocks."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"
            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2

            time.sleep(0.5)  # Wait prebuffer download

            sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
            assert site_temp.storage.piecefields[sha512].tostring() == "0000011100"

            # No prebuffer beyond end of the file
            f.seek(9 * 1024 * 1024)
            assert b"\0" not in f.read(7)

            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0

    def testDownloadAllPieces(self, file_server, site, site_temp):
        """needFile with |all should fetch piecemap, piecefields and every piece once."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 12  # piecemap.msgpack, getPiecefields, 10 x piece

        # Don't re-download already got pieces
        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 0

    def testFileSize(self, file_server, site, site_temp):
        """Partially downloaded bigfile should take less space on disk than full size."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Download first block
        site_temp.needFile("%s|%s-%s" % (inner_path, 0 * 1024 * 1024, 1 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) < 1000 * 1000 * 10  # Size on the disk should be smaller than the real size

        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) == site.storage.getSize(inner_path)

    def testFileRename(self, file_server, site, site_temp):
        """Renaming a bigfile should keep already-downloaded pieces valid for the new name."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 0, 1 * self.piece_size))

        assert len([req for req in requests if req[1] == "streamFile"]) == 2  # 1 piece + piecemap

        # Rename the file
        inner_path_new = inner_path.replace(".iso", "-new.iso")
        site.storage.rename(inner_path, inner_path_new)
        site.storage.delete("data/optional.any.iso.piecemap.msgpack")
        assert site.content_manager.sign("content.json", self.privatekey, remove_missing_optional=True)

        files_optional = site.content_manager.contents["content.json"]["files_optional"].keys()

        assert "data/optional.any-new.iso.piecemap.msgpack" in files_optional
        assert "data/optional.any.iso.piecemap.msgpack" not in files_optional
        assert "data/optional.any.iso" not in files_optional

        with Spy.Spy(FileRequest, "route") as requests:
            site.publish()
            time.sleep(0.1)
            site_temp.download(blind_includes=True).join(timeout=5)  # Wait for download

            assert len([req[1] for req in requests if req[1] == "streamFile"]) == 0

            with site_temp.storage.openBigfile(inner_path_new, prebuffer=0) as f:
                f.read(1024)

                # First piece already downloaded
                assert [req for req in requests if req[1] == "streamFile"] == []

                # Second piece needs to be downloaded + changed piecemap
                f.seek(self.piece_size)
                f.read(1024)
                assert [req[3]["inner_path"] for req in requests if req[1] == "streamFile"] == [inner_path_new + ".piecemap.msgpack", inner_path_new]

    @pytest.mark.parametrize("size", [1024 * 3, 1024 * 1024 * 3, 1024 * 1024 * 30])
    def testNullFileRead(self, file_server, site, site_temp, size):
        """All-null files should download with the correct size, bigfile or not."""
        inner_path = "data/optional.iso"

        f = site.storage.open(inner_path, "w")
        f.write("\0" * size)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True).join(timeout=5)

        if "piecemap" in site.content_manager.getFileInfo(inner_path):  # Bigfile
            site_temp.needFile(inner_path + "|all")
        else:
            site_temp.needFile(inner_path)

        assert site_temp.storage.getSize(inner_path) == size