"""ZeroNet Bigfile plugin.

Adds support for large files split into fixed-size pieces: HTTP upload of
bigfiles, merkle-root hashing, ``.piecemap.msgpack`` handling, sparse-file
storage, piecefield exchange between peers and piece-by-piece downloading.
"""

import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
from util.Flag import flag
import util
from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

# One-time upload nonces: nonce -> upload info dict (kept across plugin reloads)
if "upload_nonces" not in locals():
    upload_nonces = {}


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        # The upload endpoint must be reachable cross-origin from the site iframe
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    @helper.encodeResponse
    def actionBigfileUpload(self):
        """Receive a multipart bigfile upload authorized by a one-time nonce.

        Hashes the incoming stream while writing it to site storage, writes the
        piecemap for multi-piece files and registers the file in content.json.
        Returns a JSON summary (merkle root, piece count/size, inner path).
        """
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]  # Nonce is single-use

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            file_hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
            hash_id = site.content_manager.hashfield.getHashId(file_hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)

        else:  # Big file
            file_name = helper.getFilename(inner_path)
            # Fix: close the piecemap file handle instead of leaking it
            with site.storage.open(upload_info["piecemap"], "wb") as piecemap_file:
                piecemap_file.write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return json.dumps({
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        })

    def readMultipartHeaders(self, wsgi_input):
        """Consume multipart part headers up to the blank separator line.

        Reads at most 100 lines; raises if no ``\\r\\n`` separator is found.
        Returns the number of header lines skipped.
        """
        found = False
        for i in range(100):
            line = wsgi_input.readline()
            if line == b"\r\n":
                found = True
                break
        if not found:
            raise Exception("No multipart header found")
        return i

    def actionFile(self, file_path, *args, **kwargs):
        # Serve bigfiles through a BigFile object so missing pieces get
        # requested on demand while streaming.
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)


@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        """Prepare a bigfile upload: validate signer, register an upload nonce.

        Returns the upload URL (containing the nonce), the piece size and the
        sanitized paths the client needs to perform the POST.
        """
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }

    @flag.no_multiuser
    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        """ADMIN command: set per-site automatic bigfile download size limit (MB)."""
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        # When deleting a bigfile also remove its piecemap file and its
        # content.json entry before delegating the actual delete.
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete .piecemap.msgpack file if exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
                    try:
                        self.site.storage.delete(piecemap_inner_path)
                    except Exception as err:
                        self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        # Strip an optional "|from-to" byte-range suffix before lookup
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        """Yield up to *size* bytes from *file_in* in *buff_size* chunks."""
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        """Hash *file_in* into sha512t pieces and a merkle root.

        Optionally copies the stream to *file_out* (always closed on exit).
        Returns (merkle_root, piece_size, {"sha512_pieces": [digest, ...]}).
        """
        self.site.settings["has_bigfile"] = True

        recv = 0
        # No-op `except ... raise` removed; `finally` alone guarantees cleanup.
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                        part = ""
            if len(part) > 0:  # Trailing partial piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        """Hash a file for content.json; bigfile path for optional files >=1MB."""
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        file_hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                file_hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not file_hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            # Fix: close the input handle (hashBigfile only closes file_out)
            with self.site.storage.open(inner_path, "rb") as file_in:
                merkle_root, piece_size, piecemap_info = self.hashBigfile(file_in, file_size)
            if not file_hash:
                file_hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                # Fix: close the piecemap file handle instead of leaking it
                with self.site.storage.open(piecemap_inner_path, "wb") as piecemap_file:
                    piecemap_file.write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(file_hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[file_hash].frombytes(b"\x01" * piece_num)

        back[file_relative_path] = {"sha512": file_hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        """Download (if needed) and unpack the piecemap for *inner_path*."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        with self.site.storage.open(piecemap_inner_path, "rb") as piecemap_file:
            piecemap = Msgpack.unpack(piecemap_file.read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        """Verify one piece against the piecemap; raise VerifyError on mismatch."""
        piecemap = self.getPiecemap(inner_path)
        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        # Byte-range paths ("path|from-to") are verified piece-wise
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)


@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        # sha512 -> BigfilePiecefield; restored from the settings cache
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        """Pre-create an (up to 5MB pre-allocated) sparse file for a bigfile."""
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        with open(file_path, 'wb') as f:
            f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        """Ensure piecefield/sparse-file bookkeeping for a bigfile exists."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or (file_info and "piecemap" not in file_info):  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                # Peek at the first bytes: an all-zero prefix suggests an empty sparse file
                with open(file_path, "rb") as f:
                    if f.read(128) == b"\0" * 128:
                        piece_data = b"\x00"
                    else:
                        piece_data = b"\x01"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].frombytes(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].frombytes(b"\x00" * piece_num)
            self.log.debug("Created bigfile: %s" % inner_path)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        """Return a BigFile wrapper for *inner_path*, or False if not a bigfile."""
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)


class BigFile(object):
    """File-like wrapper that downloads missing pieces on demand while reading."""

    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                        priority -= 1  # Decrease priority for further pieces
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

                if not self.site.storage.isFile(inner_path):
                    self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                    piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                    self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but makes sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            if "piece_size" in file_info:
                piece_i = int(pos / file_info["piece_size"])
                if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                    return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        # Lazily create piecefield-related attributes on first access
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        """Request the peer's piecefields; rate limited to once per minute."""
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        # Translate "path|from-to" into pos_from/pos_to arguments
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)


@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile size %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            # Judge the limit on a single piece, not the whole file
            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)

            # Use default function to download non-optional file
            if "piece_size" not in file_info:
                return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

            file_size = file_info["size"]
            piece_size = file_info["piece_size"]

            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)
            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=10, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()