import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.event
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
from util.Flag import flag
import util
from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

if "upload_nonces" not in locals():
    upload_nonces = {}


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    @helper.encodeResponse
    def actionBigfileUpload(self):
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)

        else:  # Big file
            file_name = helper.getFilename(inner_path)
            with site.storage.open(upload_info["piecemap"], "wb") as piecemap_file:
                piecemap_file.write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # Reload cache

        return json.dumps({
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        })
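
    # Minimal client-side sketch (assumption: calling from the same machine with the
    # third-party `requests` library, which is not part of this plugin). The nonce URL
    # comes from UiWebsocketPlugin.actionBigfileUploadInit below; 43110 is ZeroNet's
    # default UI port:
    #
    #   import requests
    #   url = "http://127.0.0.1:43110" + upload_init_result["url"]  # hypothetical init result
    #   with open("video.mp4", "rb") as f:
    #       res = requests.post(url, files={"file": f})  # multipart/form-data body
    #   print(res.json()["merkle_root"])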

    def readMultipartHeaders(self, wsgi_input):
        found = False
        for i in range(100):  # Give up after 100 header lines
            line = wsgi_input.readline()
            if line == b"\r\n":  # An empty line marks the end of the part headers
                found = True
                break
        if not found:
            raise Exception("No multipart header found")
        return i

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)


@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }
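
    # Request/response shape sketch. From site code this action is reachable as the
    # "bigfileUploadInit" websocket command (assumption: ZeroNet's usual mapping of
    # cmd name -> actionCmdName method):
    #
    #   -> {"cmd": "bigfileUploadInit", "params": ["data/users/<auth>/video.mp4", 2621440]}
    #   <- {"url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=...", "piece_size": 1048576,
    #       "inner_path": "data/users/<auth>/video.mp4", "file_relative_path": "video.mp4"}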

    @flag.no_multiuser
    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete the .piecemap.msgpack file if it exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
                    try:
                        self.site.storage.delete(piecemap_inner_path)
                    except Exception as err:
                        self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")  # Strip the "|{from}-{to}" piece range
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info
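
    # Piece addressing convention used throughout this plugin: a byte range is
    # appended to the inner path as "|{from}-{to}", e.g.
    #
    #   "data/video.mp4|1048576-2097152"  # the second 1MB piece of data/video.mp4
    #
    # getFileInfo(), verifyFile(), optionalDownloaded() and SiteStorage.write()
    # all strip or parse this suffix with inner_path.split("|").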

    def readFile(self, file_in, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                        part = ""  # Reset so the trailing check below is skipped when the file ends on a piece boundary
            if len(part) > 0:  # Hash the last, partial piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }
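
    # Sketch of the result for a 2.5MB stream with the default 1MB piece size:
    # three sha512t piece digests (1MB + 1MB + 0.5MB) and a merkle root over them.
    # `content_manager` is assumed to be this site's ContentManager instance:
    #
    #   merkle_root, piece_size, piecemap_info = content_manager.hashBigfile(
    #       open("video.mp4", "rb"), size=2621440
    #   )
    #   assert len(piecemap_info["sha512_pieces"]) == 3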

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            with self.site.storage.open(inner_path, "rb") as file_in:
                merkle_root, piece_size, piecemap_info = self.hashBigfile(file_in, file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                with self.site.storage.open(piecemap_inner_path, "wb") as piecemap_file:
                    piecemap_file.write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to the hashfield and mark every piece as downloaded
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].frombytes(b"\x01" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        with self.site.storage.open(piecemap_inner_path, "rb") as f:
            piecemap = Msgpack.unpack(f.read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        piecemap = self.getPiecemap(inner_path)
        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark the piece as downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"

            # Only add to the site size on the first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove the other pieces of the file from the download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)


@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
            self.site.settings["cache"]["piecefields"] = {}
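
    # The cached piecefields restored above are written by SitePlugin.getSettingsCache()
    # below as base64(piecefield.pack()). Round-trip sketch:
    #
    #   packed = piecefield.pack()
    #   cached = base64.b64encode(packed).decode("utf8")  # stored in the site settings cache
    #   piecefield.unpack(base64.b64decode(cached))       # restored here on startup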

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        with open(file_path, 'wb') as f:
            f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        if os.name == "nt":  # Mark the file as sparse on Windows
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to a specific position by passing "|{from}-{to}" after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create the dir if it does not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write the file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                # Guess from the first bytes whether the file content is already there
                with open(file_path, "rb") as f:
                    is_empty = f.read(128) == b"\0" * 128
                piece_data = b"\x00" if is_empty else b"\x01"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].frombytes(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].frombytes(b"\x00" * piece_num)
            self.log.debug("Created bigfile: %s" % inner_path)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)


class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request the prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase the buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative to the file end
                pos = self.size + pos  # Use the real size instead of the size on disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
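
# Usage sketch: stream a bigfile while missing pieces are fetched on demand in
# BigFile.read(). `site` is assumed to be an already loaded Site instance and
# handle() a hypothetical consumer.
#
#   f = site.storage.openBigfile("data/video.mp4", prebuffer=2 * 1024 * 1024)
#   if f:  # openBigfile() returns False if the path is not a bigfile
#       with f:
#           while True:
#               chunk = f.read(64 * 1024)
#               if not chunk:
#                   break
#               handle(chunk)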


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start downloading the piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start downloading the piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            # Check membership first: peer.piecefields is a defaultdict, so indexing it
            # directly would insert an empty piecefield and hide that it is missing
            if task["sha512"] not in peer.piecefields:
                gevent.spawn(peer.updatePiecefields, force=True)
                return False  # Deny adding the peer while its piecefield is unknown
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)
                return False  # Deny adding the peer if the piece is not in its piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into the file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but make sure we actually have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            if "piece_size" in file_info:
                piece_i = int(pos / file_info["piece_size"])
                if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                    return False
        # Seek back to the position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add the peer to the site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign the current connection to the peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get the peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update the piecefield again within 1 minute
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)


@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            # Check the per-piece size instead of the whole file size
            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)

            # Use the default function to download a non-optional file
            if "piece_size" not in file_info:
                return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

            file_size = file_info["size"]
            piece_size = file_info["piece_size"]

            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)
            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
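
    # Example (sketch): queue every missing piece of a bigfile for download by
    # appending "|all" to the inner path, as WorkerManagerPlugin.addTask() does
    # for the autodownload size limit:
    #
    #   gevent.spawn(site.needFile, "data/video.mp4|all")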


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if the help distribute option is checked', default=10, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()
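
# Typical command-line usage (sketch; zeronet.py is ZeroNet's entry point):
#
#   python zeronet.py --autodownload_bigfile_size_limit 100  # help seed bigfiles up to 100MB
#   python zeronet.py --bigfile_size_limit 1024              # skip bigfiles over 1GB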