1#!/usr/local/bin/python3.8
2
3#
4# This source file is part of appleseed.
5# Visit https://appleseedhq.net/ for additional information and resources.
6#
7# This software is released under the MIT license.
8#
9# Copyright (c) 2013 Francois Beaune, Jupiter Jazz Limited
10# Copyright (c) 2014-2018 Francois Beaune, The appleseedhq Organization
11#
12# Permission is hereby granted, free of charge, to any person obtaining a copy
13# of this software and associated documentation files (the "Software"), to deal
14# in the Software without restriction, including without limitation the rights
15# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16# copies of the Software, and to permit persons to whom the Software is
17# furnished to do so, subject to the following conditions:
18#
19# The above copyright notice and this permission notice shall be included in
20# all copies or substantial portions of the Software.
21#
22# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
28# THE SOFTWARE.
29#
30
31from __future__ import division
32import argparse
33import datetime
34import glob
35import os
36import shutil
37import string
38import sys
39import time
40import traceback
41import xml.dom.minidom as xml
42
43
44# -------------------------------------------------------------------------------------------------
45# Constants.
46# -------------------------------------------------------------------------------------------------
47
# Version of this script, reported in the log when the script starts.
VERSION = "2.9"
# Subdirectory of the target directory from which rendered frames are collected.
RENDERS_DIR = "_renders"
# Subdirectory of the target directory where completed project files are looked up.
ARCHIVES_DIR = "_archives"
# Subdirectory of the target directory holding this script's log file and the
# per-owner log files read to compute pings.
LOGS_DIR = "_logs"
# Delay between two consecutive management passes of the main loop.
PAUSE_BETWEEN_UPDATES = 60   # in seconds
# Number of bytes in one megabyte, used to convert sizes to and from bytes.
MB = 1024 * 1024
54
55
56# -------------------------------------------------------------------------------------------------
57# Utility functions.
58# -------------------------------------------------------------------------------------------------
59
def safe_get_file_size(filepath):
    """
    Returns the size in bytes of the file at `filepath`, or 0 if it cannot be determined.

    This is a best-effort query: any error raised while querying the size (missing file,
    permission problem, invalid path...) is reported as a size of 0.
    """

    try:
        return os.path.getsize(filepath)
    except Exception:
        # Catch Exception rather than using a bare except so that KeyboardInterrupt
        # and SystemExit still propagate and the script remains interruptible.
        return 0
65
66
def get_directory_size(directory):
    """
    Returns the cumulated size in bytes of all the files found by walking `directory`.
    Files whose size cannot be queried count for 0 bytes.
    """

    total = 0
    for parent, _subdirs, names in os.walk(directory):
        total += sum(safe_get_file_size(os.path.join(parent, name)) for name in names)
    return total
74
75
def get_files(directory, pattern="*"):
    """
    Returns the list of paths inside `directory` matching the glob pattern `pattern`.
    Each returned path is prefixed by `directory`.
    """

    return list(glob.glob(os.path.join(directory, pattern)))
81
82
def safe_mkdir(dir):
    """
    Creates the directory `dir`, including any missing parent, unless it already exists.

    An empty path is ignored: this is what os.path.dirname() returns for a bare filename,
    and it designates the current directory, which needs no creation.

    Raises OSError if the directory does not exist and cannot be created.
    """

    if not dir:
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Attempt the creation first instead of checking for existence beforehand:
        # another process may create the directory between the check and the creation.
        # Only propagate the error if the path is still missing afterward.
        if not os.path.exists(dir):
            raise
86
87
def convert_path_to_local(path):
    """
    Returns `path` with its separators replaced by the ones of the local platform:
    backslashes when os.name is "nt", forward slashes otherwise.
    """

    foreign_separator, local_separator = ('/', '\\') if os.name == "nt" else ('\\', '/')
    return path.replace(foreign_separator, local_separator)
93
94
def tail_file(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    Based on code from http://stackoverflow.com/a/7047765/393756.

    `f` must be an open, seekable file object. The file is read backward from its end in
    blocks of BUFFER_SIZE until enough line breaks have been seen, so only the tail of the
    file is read. The position of `f` is modified.

    Returns an empty list if the file is empty or if `window` is not positive.
    """

    BUFFER_SIZE = 1024

    if window <= 0:
        return []

    # Find the end of the file.
    f.seek(0, 2)
    remaining = f.tell()

    # One extra line break guarantees that the first returned line is complete.
    lines_needed = window + 1
    chunks = []

    while lines_needed > 0 and remaining > 0:
        # Only seek relatively to the beginning of the file: Python 3 text streams
        # reject non-zero offsets relative to the end of the stream.
        # NOTE(review): offsets are assumed to match read lengths, i.e. one byte per
        # character and no newline translation -- confirm for non-ASCII log files.
        start = max(remaining - BUFFER_SIZE, 0)
        f.seek(start, 0)

        # Read one block, or only what was not read yet when reaching the beginning.
        chunk = f.read(remaining - start)
        chunks.insert(0, chunk)

        newline = b"\n" if isinstance(chunk, bytes) else "\n"
        lines_needed -= chunk.count(newline)
        remaining = start

    if not chunks:
        return []

    # Join with an empty separator of the same type as the chunks (text or bytes).
    return chunks[0][:0].join(chunks).splitlines()[-window:]
128
129
def format_message(severity, msg):
    """
    Returns `msg` with each of its lines prefixed by the current timestamp, the "mgr" tag
    and `severity` left-justified on 7 characters. All lines share the same timestamp.
    """

    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    tag = severity.ljust(7)

    formatted_lines = []
    for line in msg.splitlines():
        formatted_lines.append("{0} mgr   {1} | {2}".format(stamp, tag, line))

    return "\n".join(formatted_lines)
136
137
138# -------------------------------------------------------------------------------------------------
139# Log backend to write to the console, using colors on systems that support them.
140# -------------------------------------------------------------------------------------------------
141
class ConsoleBackend:
    """
    Prints messages to the console. Warnings and errors are wrapped in ANSI color
    escape sequences when is_coloring_supported() returns True.
    """

    @staticmethod
    def info(msg):
        """Print `msg` as is."""
        print("{0}".format(msg))

    @staticmethod
    def warning(msg):
        """Print `msg`, using escape code 93 (bright yellow) when coloring is supported."""
        if ConsoleBackend.is_coloring_supported():
            print("\033[93m{0}\033[0m".format(msg))
        else:
            print("{0}".format(msg))

    @staticmethod
    def error(msg):
        """Print `msg`, using escape code 91 (bright red) when coloring is supported."""
        if ConsoleBackend.is_coloring_supported():
            print("\033[91m{0}\033[0m".format(msg))
        else:
            print("{0}".format(msg))

    @staticmethod
    def is_coloring_supported():
        """Return True if messages should be colored, which is only the case on macOS."""
        # The platform identifier is sys.platform. os.system is a function and never
        # compares equal to a string, which silently disabled coloring everywhere.
        return sys.platform == 'darwin'
165
166
167# -------------------------------------------------------------------------------------------------
168# Log backend to write to a log file.
169# -------------------------------------------------------------------------------------------------
170
class LogFileBackend:
    """
    Appends messages to the log file at `path`. The file is reopened for every message
    and its parent directory is created on demand.
    """

    def __init__(self, path):
        self.path = path

    def write(self, msg):
        """Append `msg` followed by a line break to the log file."""
        directory = os.path.dirname(self.path)

        # A bare filename has an empty directory component, which designates the
        # current directory: there is nothing to create in that case.
        if directory:
            safe_mkdir(directory)

        with open(self.path, "a") as file:
            file.write(msg + "\n")
181
182
183# -------------------------------------------------------------------------------------------------
184# Log class to simultaneously write to a log file and to the console.
185# -------------------------------------------------------------------------------------------------
186
class Log:
    """
    Front end of the logging system. The instance methods format a message, append it to
    the log file, then print it to the console. The static *_no_log variants only print
    the formatted message to the console.
    """

    def __init__(self, path):
        self.log_file = LogFileBackend(path)

    def _write_and_print(self, severity, console_printer, msg):
        # Format once so that the log file and the console show the same timestamp.
        text = format_message(severity, msg)
        self.log_file.write(text)
        console_printer(text)

    def info(self, msg):
        self._write_and_print("info", ConsoleBackend.info, msg)

    def warning(self, msg):
        self._write_and_print("warning", ConsoleBackend.warning, msg)

    def error(self, msg):
        self._write_and_print("error", ConsoleBackend.error, msg)

    @staticmethod
    def info_no_log(msg):
        text = format_message("info", msg)
        ConsoleBackend.info(text)

    @staticmethod
    def warning_no_log(msg):
        text = format_message("warning", msg)
        ConsoleBackend.warning(text)

    @staticmethod
    def error_no_log(msg):
        text = format_message("error", msg)
        ConsoleBackend.error(text)
218
219
220# -------------------------------------------------------------------------------------------------
221# Dependency database.
222# -------------------------------------------------------------------------------------------------
223
class DependencyDB:
    """
    Keeps track of the files referenced by a set of project files (the "roots").

    `roots` maps the name of each root to the set of paths it references. Root names
    are resolved relatively to `source_directory` when their dependencies are extracted.
    """

    def __init__(self, source_directory, log):
        self.source_directory = source_directory
        self.log = log
        self.roots = {}

    def update(self, new_roots):
        """
        Synchronize the database with `new_roots`, an iterable of root names.

        Dependencies are extracted for roots that are not in the database yet; a root
        whose dependencies cannot be extracted is not added. Roots of the database that
        are absent from `new_roots` are dropped. Additions and removals are logged.
        """

        # new_roots is traversed several times: materialize it in case it is a one-shot
        # iterator (such as the result of map() in Python 3), and build a set to make
        # membership tests cheap.
        new_roots = list(new_roots)
        new_roots_set = set(new_roots)

        for root in new_roots:
            if root not in self.roots:
                success, deps = self.__extract_dependencies(root)
                if success:
                    self.roots[root] = deps
                    self.log.info("  added {0}".format(root))

        updated_roots = {}

        for root in self.roots:
            if root in new_roots_set:
                updated_roots[root] = self.roots[root]
            else:
                self.log.info("  removed {0}".format(root))

        self.roots = updated_roots

    def get_all_dependencies(self):
        """Return the union of the dependencies of all roots as a new set."""
        return set().union(*self.roots.values())

    def __extract_dependencies(self, filename):
        """
        Parse the XML file `filename` and collect the paths it references.

        A path is the `value` attribute of a <parameter name="filename"> element, or of
        an element child of a <parameters name="filename"> element. Paths are converted
        to the local path convention.

        Returns (True, set of paths) on success and (False, empty set) if the file
        cannot be read or parsed.
        """

        try:
            filepath = os.path.join(self.source_directory, filename)

            # Let the XML parser open the file so that the encoding declared by the
            # document is honored.
            xmldoc = xml.parse(filepath)
            values = []

            for node in xmldoc.getElementsByTagName('parameter'):
                if node.getAttribute('name') == 'filename':
                    values.append(node.getAttribute('value'))

            for node in xmldoc.getElementsByTagName('parameters'):
                if node.getAttribute('name') == 'filename':
                    for child in node.childNodes:
                        if child.nodeType == xml.Node.ELEMENT_NODE:
                            values.append(child.getAttribute('value'))

            # getAttribute() returns an empty string for a missing attribute; an empty
            # path cannot designate a file.
            deps = set(convert_path_to_local(value) for value in values if value)

            return True, deps

        except Exception:
            # KeyboardInterrupt and SystemExit do not derive from Exception and thus
            # still propagate. The previous `except KeyboardInterrupt, SystemExit:`
            # form is a syntax error in Python 3 and, in Python 2, only catches
            # KeyboardInterrupt while rebinding the name SystemExit.
            return False, set()
282
283
284# -------------------------------------------------------------------------------------------------
285# Management logic.
286# -------------------------------------------------------------------------------------------------
287
class Manager:
    """
    Performs the management passes for one shot: reports status, moves rendered frames,
    maintains the dependency databases, removes dependencies that are no longer needed
    from the target directory, and uploads project files and their dependencies to it
    without exceeding the maximum allowed size.

    `args` must provide `source_directory`, `target_directory`, `frames_directory`
    (may be None) and `max_size` (in bytes). `log` is the Log used for all reporting.
    """

    def __init__(self, args, log):
        self.args = args
        self.log = log
        self.frames_directory = os.path.join(self.args.target_directory, RENDERS_DIR)
        self.archives_directory = os.path.join(self.args.target_directory, ARCHIVES_DIR)
        self.all_uploaded_dependency_db = DependencyDB(self.args.target_directory, log)
        self.own_uploaded_dependency_db = DependencyDB(self.args.source_directory, log)
        self.completed_dependency_db = DependencyDB(self.args.source_directory, log)

    def manage(self):
        """Run one complete management pass."""
        self.compute_target_directory_size()
        self.gather_files()
        self.print_status()
        if self.args.frames_directory is not None:
            self.move_frames()
        self.update_dependency_dbs()
        self.remove_orphan_dependencies()
        self.upload_project_files()
        self.upload_missing_dependencies()

    @staticmethod
    def _list_basenames(directory, pattern):
        """Return, as a list, the base names of the entries of `directory` matching `pattern`."""
        # A real list is required: the results are measured with len() and searched
        # repeatedly, which the iterator returned by map() in Python 3 does not support.
        return [os.path.basename(filepath) for filepath in get_files(directory, pattern)]

    def compute_target_directory_size(self):
        """Measure the current size in bytes of the target directory."""
        self.target_directory_size = get_directory_size(self.args.target_directory)

    def gather_files(self):
        """Collect and log the source, uploaded, in-progress and completed project files."""
        self.log.info("gathering files...")
        self.source_files = self._list_basenames(self.args.source_directory, "*.appleseed")
        self.uploaded_files = self.gather_uploaded_files()
        self.inprogress_files = self.gather_inprogress_files()
        self.completed_files = self._list_basenames(self.archives_directory, "*.appleseed")
        self.log.info("  found {0} source files (this shot) in {1}".format(len(self.source_files), self.args.source_directory))
        self.log.info("  found {0} uploaded files (all shots) in {1}".format(len(self.uploaded_files), self.args.target_directory))
        self.log.info("  found {0} in-progress files (all shots) in {1}".format(len(self.inprogress_files), self.args.target_directory))
        self.log.info("  found {0} completed files (all shots) in {1}".format(len(self.completed_files), self.archives_directory))

    def gather_uploaded_files(self):
        """Return the names of the project files present in the target directory."""
        return self._list_basenames(self.args.target_directory, "*.appleseed")

    def gather_inprogress_files(self):
        """
        Return a dictionary mapping the name of each in-progress project file to the list
        of its owners. In-progress files are the files of the target directory named
        <project file name>.<owner>.
        """
        inprogress = {}
        for filename in self._list_basenames(self.args.target_directory, "*.appleseed.*"):
            parts = filename.split(".")
            if len(parts) >= 3 and parts[-2] == "appleseed":
                owner = parts[-1]
                # Strip the owner and the dot preceding it.
                stripped_filename = filename[:-(1 + len(owner))]
                inprogress.setdefault(stripped_filename, []).append(owner)
        return inprogress

    def print_status(self):
        """Log the progress, frame assignments, pings and size of the target directory."""
        self.log.info("-------------------------------------------------------------------")
        self.print_progress()
        self.print_assignments()
        self.print_pings()
        self.print_target_directory_size()
        self.log.info("-------------------------------------------------------------------")

    def print_progress(self):
        """Log how many frames of this shot are completed, rendering and pending."""
        total = len(self.source_files)
        completed = self.count_completed_frames()
        rendering = self.count_inprogress_frames()
        pending = self.count_pending_frames()
        progress = 100.0 * completed / total if total > 0 else 0.0
        self.log.info("PROGRESS: {0}/{1} completed ({2:.2f} %), {3} rendering, {4} pending"
                      .format(completed, total, progress, rendering, pending))

    def print_assignments(self):
        """Log the owners of each in-progress frame of this shot."""
        assignments = {}
        for filename in self.source_files:
            if filename in self.inprogress_files:
                assignments[filename] = ", ".join(self.inprogress_files[filename])
        if len(assignments) > 0:
            self.log.info("frame assignments:")
            for filename in assignments:
                self.log.info("  {0}: {1}".format(filename, assignments[filename]))
        else:
            self.log.info("no frame assigned.")

    def print_pings(self):
        """Log the last time each owner of a frame of this shot wrote to its log file."""
        owners = set()
        for filename in self.source_files:
            if filename in self.inprogress_files:
                owners.update(self.inprogress_files[filename])
        unsorted_pings = [(owner, self.read_ping(owner)) for owner in owners]
        # Owners whose ping could not be read are left out.
        filtered_pings = [x for x in unsorted_pings if x[1] is not None]
        pings = sorted(filtered_pings, key=lambda x: x[1])
        if len(pings) > 0:
            max_owner_length = max(len(owner) for owner in owners)
            self.log.info("pings:")
            for (owner, ping) in pings:
                padding = " " * (max_owner_length + 1 - len(owner))
                self.log.info("  {0}:{1}{2}".format(owner, padding, self.format_ping(ping)))
        else:
            self.log.info("no pings.")

    def read_ping(self, owner):
        """
        Return the timestamp starting the last line of the log file of `owner` as a
        datetime, or None if the log file cannot be read, is empty, or if its last line
        does not start with a valid timestamp.
        """
        TIMESTAMP_LENGTH = 26
        log_filepath = os.path.join(self.args.target_directory, LOGS_DIR, owner + ".log")
        try:
            with open(log_filepath) as file:
                last_line = tail_file(file, 1)[0]
            return datetime.datetime.strptime(last_line[:TIMESTAMP_LENGTH], "%Y-%m-%d %H:%M:%S.%f")
        except (IOError, OSError, IndexError, ValueError):
            # IndexError: the log file is empty. ValueError: the last line is malformed
            # or cannot be decoded. Neither must abort the whole management pass.
            return None

    def format_ping(self, ping):
        """Return a description of `ping` including the time elapsed since then."""
        elapsed = datetime.datetime.now() - ping
        return "{0} ago (at {1})".format(elapsed, ping)

    def print_target_directory_size(self):
        """Log the size of the target directory relatively to the maximum allowed size."""
        size_mb = self.target_directory_size / MB
        max_size_mb = self.args.max_size / MB
        full = 100.0 * size_mb / max_size_mb if max_size_mb > 0 else 100.0
        self.log.info("size of target directory: {0:.2f}/{1} mb ({2:.2f} % full)"
                      .format(size_mb, max_size_mb, full))

    def count_completed_frames(self):
        """Return the number of source files that are completed."""
        return sum(1 for filename in self.source_files if filename in self.completed_files)

    def count_inprogress_frames(self):
        """Return the number of source files that are in progress."""
        return sum(1 for filename in self.source_files if filename in self.inprogress_files)

    def count_pending_frames(self):
        """Return the number of source files that are neither completed nor in progress."""
        return sum(1 for filename in self.source_files
                   if filename not in self.completed_files and filename not in self.inprogress_files)

    def move_frames(self):
        """Move everything found in the renders directory to the frames directory."""
        self.log.info("moving frames...")
        for filepath in get_files(self.frames_directory):
            self.move_frame(filepath)

    def move_frame(self, source_filepath):
        """Move `source_filepath` to the frames directory, creating it if necessary."""
        filename = os.path.basename(source_filepath)
        dest_filepath = os.path.join(self.args.frames_directory, filename)
        self.log.info("  moving {0}".format(filename))
        safe_mkdir(self.args.frames_directory)
        shutil.move(source_filepath, dest_filepath)

    def update_dependency_dbs(self):
        """Refresh all the dependency databases."""
        self.update_uploaded_dependency_db()
        self.update_completed_dependency_db()

    def update_uploaded_dependency_db(self):
        """Refresh the databases of uploaded and in-progress files (all shots, this shot)."""
        self.log.info("updating dependency database of uploaded and in-progress files (all shots)...")
        all_roots = self._list_basenames(self.args.target_directory, "*.appleseed*")
        self.all_uploaded_dependency_db.update(all_roots)

        self.log.info("updating dependency database of uploaded files (this shot)...")
        own_roots = [filename for filename in self.source_files
                     if filename in self.inprogress_files or filename in self.uploaded_files]
        self.own_uploaded_dependency_db.update(own_roots)

    def update_completed_dependency_db(self):
        """Refresh the database of the completed files of this shot."""
        self.log.info("updating dependency database of completed files (this shot)...")
        roots = [filename for filename in self.source_files if filename in self.completed_files]
        self.completed_dependency_db.update(roots)

    def remove_orphan_dependencies(self):
        """
        Remove from the target directory the dependencies of completed files that are
        not referenced by any uploaded or in-progress file.
        """
        self.log.info("removing orphan dependencies...")
        removed = 0
        all_uploaded_files_dependencies = self.all_uploaded_dependency_db.get_all_dependencies()
        for dep in self.completed_dependency_db.get_all_dependencies():
            if dep not in all_uploaded_files_dependencies:
                count = self.remove_file(dep)
                if count > 0:
                    self.log.info("  removed {0}".format(dep))
                removed += count
        if removed > 0:
            self.log.info("  removed {0} dependencies".format(removed))

    def upload_project_files(self):
        """
        Upload the project files of this shot that are neither in progress nor completed,
        uploading the dependencies of each project file right after it.
        """
        self.log.info("uploading project files...")
        for filename in self.source_files:
            if filename not in self.inprogress_files and filename not in self.completed_files:
                if self.upload_file(filename) > 0:
                    self.log.info("  uploaded {0}".format(filename))
                    self.uploaded_files = self.gather_uploaded_files()
                    self.update_uploaded_dependency_db()
                    self.upload_missing_dependencies()

    def upload_missing_dependencies(self):
        """Upload the dependencies of the uploaded files of this shot that are missing."""
        self.log.info("uploading missing dependencies...")
        uploaded = 0
        for dep in self.own_uploaded_dependency_db.get_all_dependencies():
            count = self.upload_file(dep)
            if count > 0:
                self.log.info("  uploaded {0}".format(dep))
            uploaded += count
        if uploaded > 0:
            self.log.info("  uploaded {0} dependencies".format(uploaded))

    def remove_file(self, filename):
        """
        Remove `filename` from the target directory and update the tracked directory size.
        Returns the number of files removed (0 or 1); failures are logged.
        """
        filepath = os.path.join(self.args.target_directory, filename)
        if not os.path.isfile(filepath):
            return 0

        try:
            filesize = safe_get_file_size(filepath)
            os.remove(filepath)
            self.target_directory_size = max(self.target_directory_size - filesize, 0)
            return 1
        except (IOError, OSError) as ex:
            # os.remove() raises OSError, which is distinct from IOError in Python 2.
            self.log.error("  could not remove {0}: {1}".format(filepath, ex.strerror))
            return 0

    def upload_file(self, filename):
        """
        Copy `filename` from the source directory to the target directory, unless it is
        already there or the copy would make the target directory exceed the maximum size.
        Returns the number of files uploaded (0 or 1); failures are logged.
        """
        dest_filepath = os.path.join(self.args.target_directory, filename)
        if os.path.isfile(dest_filepath):
            return 0

        source_filepath = os.path.join(self.args.source_directory, filename)
        filesize = safe_get_file_size(source_filepath)
        if self.target_directory_size + filesize > self.args.max_size:
            return 0

        try:
            safe_mkdir(os.path.dirname(dest_filepath))
            shutil.copyfile(source_filepath, dest_filepath)
            self.target_directory_size += filesize
            return 1
        except (IOError, OSError) as ex:
            # Creating the destination directory raises OSError, which is distinct
            # from IOError in Python 2.
            self.log.error("  could not upload {0}: {1}".format(source_filepath, ex.strerror))
            return 0
512
513
514# -------------------------------------------------------------------------------------------------
515# Entry point.
516# -------------------------------------------------------------------------------------------------
517
def main():
    """
    Entry point: parse the command line, start logging, then run management passes
    every PAUSE_BETWEEN_UPDATES seconds until the script is interrupted.

    An error occurring during a management pass is logged and does not stop the loop.
    """

    # Parse the command line.
    parser = argparse.ArgumentParser(description="send a shot to a folder being watched by "
                                     "appleseed render nodes.")
    # type=int makes argparse report an invalid size as a usage error. Python integers
    # have arbitrary precision, so the Python 2-only long() is not needed.
    parser.add_argument("-s", "--max-size", metavar="MB", type=int,
                        help="set the maximum allowed size in mb of the target directory "
                        "(default is 1 terabyte)")
    parser.add_argument("--source", metavar="source-directory", dest="source_directory",
                        required=True, help="directory containing the source shot data")
    parser.add_argument("--target", metavar="target-directory", dest="target_directory",
                        required=True, help="directory being watched by render nodes")
    parser.add_argument("--frames", metavar="frames-directory", dest="frames_directory",
                        help="directory where the rendered frames should be stored")
    args = parser.parse_args()

    if args.max_size is None:
        args.max_size = 2 ** 40                 # default to 1 terabyte
    else:
        args.max_size *= MB                     # convert to bytes

    # Start the log.
    log = Log(os.path.join(args.target_directory, LOGS_DIR, "rendermanager.log"))
    log.info("--- starting logging ---")
    log.info("running rendermanager.py version {0}.".format(VERSION))

    manager = Manager(args, log)

    # Main management loop.
    try:
        while True:
            try:
                manager.manage()
            except Exception:
                # KeyboardInterrupt and SystemExit do not derive from Exception:
                # they propagate to the outer handler and stop the loop.
                log.error(traceback.format_exc())

            log.info_no_log("waiting {0} seconds...".format(PAUSE_BETWEEN_UPDATES))
            time.sleep(PAUSE_BETWEEN_UPDATES)

    except (KeyboardInterrupt, SystemExit):
        # Both exceptions must be listed in a tuple: `except A, B:` is a syntax error
        # in Python 3 and only catches A in Python 2.
        pass

    log.info("exiting...")
566
# Only run the manager when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
569