1#!/usr/local/bin/python3.8 2 3# 4# This source file is part of appleseed. 5# Visit https://appleseedhq.net/ for additional information and resources. 6# 7# This software is released under the MIT license. 8# 9# Copyright (c) 2013 Francois Beaune, Jupiter Jazz Limited 10# Copyright (c) 2014-2018 Francois Beaune, The appleseedhq Organization 11# 12# Permission is hereby granted, free of charge, to any person obtaining a copy 13# of this software and associated documentation files (the "Software"), to deal 14# in the Software without restriction, including without limitation the rights 15# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 16# copies of the Software, and to permit persons to whom the Software is 17# furnished to do so, subject to the following conditions: 18# 19# The above copyright notice and this permission notice shall be included in 20# all copies or substantial portions of the Software. 21# 22# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 23# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 24# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 25# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 26# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 27# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 28# THE SOFTWARE. 29# 30 31from __future__ import division 32import argparse 33import datetime 34import glob 35import os 36import shutil 37import string 38import sys 39import time 40import traceback 41import xml.dom.minidom as xml 42 43 44# ------------------------------------------------------------------------------------------------- 45# Constants. 
# -------------------------------------------------------------------------------------------------

VERSION = "2.9"
RENDERS_DIR = "_renders"
ARCHIVES_DIR = "_archives"
LOGS_DIR = "_logs"
PAUSE_BETWEEN_UPDATES = 60  # in seconds
MB = 1024 * 1024


# -------------------------------------------------------------------------------------------------
# Utility functions.
# -------------------------------------------------------------------------------------------------

def safe_get_file_size(filepath):
    """Return the size in bytes of `filepath`, or 0 if the size cannot be read."""
    try:
        return os.path.getsize(filepath)
    except (OSError, ValueError):
        # Best effort: a missing/unreadable file (or an invalid path string)
        # simply counts for nothing. KeyboardInterrupt is no longer swallowed.
        return 0


def get_directory_size(directory):
    """Return the cumulated size in bytes of all files found below `directory`."""
    size = 0
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            size += safe_get_file_size(os.path.join(dirpath, filename))
    return size


def get_files(directory, pattern="*"):
    """Return the list of paths in `directory` that match the glob `pattern`."""
    return glob.glob(os.path.join(directory, pattern))


def safe_mkdir(dir):
    """
    Create directory `dir` (and its missing parents) unless it already exists.
    An empty path is ignored.
    """
    if not dir or os.path.isdir(dir):
        return
    try:
        os.makedirs(dir)
    except OSError:
        # Another process may have created the directory between the check
        # and the call; only propagate the error if it is still missing.
        if not os.path.isdir(dir):
            raise


def convert_path_to_local(path):
    """Return `path` with its separators converted to the convention of the current OS."""
    if os.name == "nt":
        return path.replace('/', '\\')
    return path.replace('\\', '/')


def tail_file(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    Based on code from http://stackoverflow.com/a/7047765/393756.

    `f` must be seekable. It may yield text or bytes; the returned lines have
    the same type as what `f.read()` returns. Only absolute seeks are used
    because text streams reject non-zero end-relative seeks on Python 3.
    An empty file or a non-positive `window` yields an empty list.
    """

    BUFFER_SIZE = 1024

    if window <= 0:
        return []

    f.seek(0, 2)
    remaining = f.tell()

    # One extra line break is required since the first block read may start
    # in the middle of a line.
    lines_needed = window + 1
    chunks = []

    while lines_needed > 0 and remaining > 0:
        if remaining > BUFFER_SIZE:
            # Read the whole block of size BUFFER_SIZE that precedes what was already read.
            f.seek(remaining - BUFFER_SIZE, 0)
            chunk = f.read(BUFFER_SIZE)
        else:
            # Less than one block left: read the unread beginning of the file.
            f.seek(0, 0)
            chunk = f.read(remaining)

        chunks.insert(0, chunk)
        newline = b"\n" if isinstance(chunk, bytes) else u"\n"
        lines_needed -= chunk.count(newline)
        remaining -= BUFFER_SIZE

    if not chunks:
        return []

    # chunks[0][:0] is an empty separator of the same type as the chunks.
    return chunks[0][:0].join(chunks).splitlines()[-window:]


def format_message(severity, msg):
    """
    Prefix every line of `msg` with the current timestamp, the "mgr" tag and
    `severity`, and return the lines joined with newlines.
    An empty `msg` yields an empty string.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    padded_severity = severity.ljust(7)
    return "\n".join("{0} mgr {1} | {2}".format(timestamp, padded_severity, line)
                     for line in msg.splitlines())


# -------------------------------------------------------------------------------------------------
# Log backend to write to the console, using colors on systems that support them.
# -------------------------------------------------------------------------------------------------

class ConsoleBackend:
    """Prints messages to the console, coloring warnings and errors when supported."""

    @staticmethod
    def info(msg):
        """Print `msg` without any coloring."""
        print("{0}".format(msg))

    @staticmethod
    def warning(msg):
        """Print `msg`, wrapped in the escape sequence "\\033[93m" when coloring is supported."""
        ConsoleBackend._print_colored("\033[93m", msg)

    @staticmethod
    def error(msg):
        """Print `msg`, wrapped in the escape sequence "\\033[91m" when coloring is supported."""
        ConsoleBackend._print_colored("\033[91m", msg)

    @staticmethod
    def is_coloring_supported():
        """Return True when running on macOS, the only platform for which coloring is enabled."""
        # The platform identifier lives in sys.platform; the previous
        # comparison against the os.system function was always False.
        return sys.platform == 'darwin'

    @staticmethod
    def _print_colored(color_prefix, msg):
        # Emit `msg` between `color_prefix` and the reset sequence when coloring is supported.
        if ConsoleBackend.is_coloring_supported():
            print("{0}{1}\033[0m".format(color_prefix, msg))
        else:
            print("{0}".format(msg))


# -------------------------------------------------------------------------------------------------
# Log backend to write to a log file.
# -------------------------------------------------------------------------------------------------

class LogFileBackend:
    """Appends messages to the log file located at `path`."""

    def __init__(self, path):
        self.path = path

    def write(self, msg):
        """Append `msg` and a newline to the log file, creating its directory if needed."""
        safe_mkdir(os.path.dirname(self.path))

        with open(self.path, "a") as log_file:
            log_file.write(msg + "\n")


# -------------------------------------------------------------------------------------------------
# Log class to simultaneously write to a log file and to the console.
# -------------------------------------------------------------------------------------------------

class Log:
    """Logger that records each formatted message in a log file and echoes it to the console."""

    def __init__(self, path):
        self.log_file = LogFileBackend(path)

    def _record_and_print(self, severity, msg, console_sink):
        # Format once, append to the log file, then hand the same text to the console.
        text = format_message(severity, msg)
        self.log_file.write(text)
        console_sink(text)

    def info(self, msg):
        """Log `msg` with severity "info" to both the file and the console."""
        self._record_and_print("info", msg, ConsoleBackend.info)

    def warning(self, msg):
        """Log `msg` with severity "warning" to both the file and the console."""
        self._record_and_print("warning", msg, ConsoleBackend.warning)

    def error(self, msg):
        """Log `msg` with severity "error" to both the file and the console."""
        self._record_and_print("error", msg, ConsoleBackend.error)

    @staticmethod
    def info_no_log(msg):
        """Print `msg` with severity "info" on the console only."""
        text = format_message("info", msg)
        ConsoleBackend.info(text)

    @staticmethod
    def warning_no_log(msg):
        """Print `msg` with severity "warning" on the console only."""
        text = format_message("warning", msg)
        ConsoleBackend.warning(text)

    @staticmethod
    def error_no_log(msg):
        """Print `msg` with severity "error" on the console only."""
        text = format_message("error", msg)
        ConsoleBackend.error(text)


# -------------------------------------------------------------------------------------------------
# Dependency database.
# -------------------------------------------------------------------------------------------------

class DependencyDB:
    """
    Maps project files ("roots") to the set of files they reference.

    `roots` is a dictionary whose keys are file names relative to
    `source_directory` and whose values are sets of dependency paths
    extracted from the XML content of these files.
    """

    def __init__(self, source_directory, log):
        self.source_directory = source_directory
        self.log = log
        self.roots = {}

    def update(self, new_roots):
        """
        Synchronize the database with `new_roots`, an iterable of file names.

        Dependencies are extracted for roots not yet in the database (roots
        whose extraction fails are skipped and will be retried on the next
        call); roots absent from `new_roots` are dropped. Every addition and
        removal is reported through the log.
        """
        # Materialize once: the argument is traversed twice and may be a
        # one-shot iterator. The set keeps membership tests constant-time.
        new_roots = list(new_roots)
        wanted = set(new_roots)

        for root in new_roots:
            if root not in self.roots:
                success, deps = self.__extract_dependencies(root)
                if success:
                    self.roots[root] = deps
                    self.log.info(" added {0}".format(root))

        updated_roots = {}

        for root, deps in self.roots.items():
            if root in wanted:
                updated_roots[root] = deps
            else:
                self.log.info(" removed {0}".format(root))

        self.roots = updated_roots

    def get_all_dependencies(self):
        """Return the union of the dependencies of all roots, as a new set."""
        deps = set()
        for root_deps in self.roots.values():
            deps.update(root_deps)
        return deps

    def __extract_dependencies(self, filename):
        """
        Parse the XML file `filename` of the source directory and collect the
        `value` attribute of every <parameter name="filename"> element and of
        every element child of <parameters name="filename"> elements.

        Returns (True, set of local paths) on success, (False, empty set) if
        the file cannot be read or parsed.
        """
        try:
            filepath = os.path.join(self.source_directory, filename)

            # Read raw bytes so that the XML parser honors the encoding
            # declared by the document itself.
            with open(filepath, 'rb') as project_file:
                contents = project_file.read()

            xmldoc = xml.parseString(contents)
            deps = set()

            for node in xmldoc.getElementsByTagName('parameter'):
                if node.getAttribute('name') == 'filename':
                    deps.add(convert_path_to_local(node.getAttribute('value')))

            for node in xmldoc.getElementsByTagName('parameters'):
                if node.getAttribute('name') == 'filename':
                    for child in node.childNodes:
                        if child.nodeType == xml.Node.ELEMENT_NODE:
                            deps.add(convert_path_to_local(child.getAttribute('value')))

            return True, deps

        except Exception:
            # Best effort: an unreadable or malformed file is reported as a
            # failure to the caller. KeyboardInterrupt and SystemExit do not
            # derive from Exception and therefore still propagate.
            return False, set()


# -------------------------------------------------------------------------------------------------
# Management logic.
# -------------------------------------------------------------------------------------------------

def _list_basenames(directory, pattern):
    """Return, as a list, the base names of the files of `directory` matching the glob `pattern`."""
    return [os.path.basename(filepath) for filepath in get_files(directory, pattern)]


class Manager:
    """
    Keeps a target directory in sync with the project files of a source directory.

    `args` must provide `source_directory`, `target_directory`,
    `frames_directory` (may be None) and `max_size` (in bytes); `log` must
    provide `info()` and `error()`.
    """

    def __init__(self, args, log):
        self.args = args
        self.log = log
        self.frames_directory = os.path.join(self.args.target_directory, RENDERS_DIR)
        self.archives_directory = os.path.join(self.args.target_directory, ARCHIVES_DIR)
        self.all_uploaded_dependency_db = DependencyDB(self.args.target_directory, log)
        self.own_uploaded_dependency_db = DependencyDB(self.args.source_directory, log)
        self.completed_dependency_db = DependencyDB(self.args.source_directory, log)

        # State refreshed at the beginning of every call to manage().
        self.target_directory_size = 0
        self.source_files = []
        self.uploaded_files = []
        self.inprogress_files = {}
        self.completed_files = []

    def manage(self):
        """Run one full management pass (status report, cleanup, uploads)."""
        self.compute_target_directory_size()
        self.gather_files()
        self.print_status()
        if self.args.frames_directory is not None:
            self.move_frames()
        self.update_dependency_dbs()
        self.remove_orphan_dependencies()
        self.upload_project_files()
        self.upload_missing_dependencies()

    def compute_target_directory_size(self):
        """Measure the current size in bytes of the target directory."""
        self.target_directory_size = get_directory_size(self.args.target_directory)

    def gather_files(self):
        """Collect the names of the source, uploaded, in-progress and completed project files."""
        self.log.info("gathering files...")
        # Real lists are required: these collections are measured with len()
        # and searched repeatedly, which one-shot map() iterators do not allow.
        self.source_files = _list_basenames(self.args.source_directory, "*.appleseed")
        self.uploaded_files = self.gather_uploaded_files()
        self.inprogress_files = self.gather_inprogress_files()
        self.completed_files = _list_basenames(self.archives_directory, "*.appleseed")
        self.log.info(" found {0} source files (this shot) in {1}".format(len(self.source_files), self.args.source_directory))
        self.log.info(" found {0} uploaded files (all shots) in {1}".format(len(self.uploaded_files), self.args.target_directory))
        self.log.info(" found {0} in-progress files (all shots) in {1}".format(len(self.inprogress_files), self.args.target_directory))
        self.log.info(" found {0} completed files (all shots) in {1}".format(len(self.completed_files), self.archives_directory))

    def gather_uploaded_files(self):
        """Return the list of names of the *.appleseed files of the target directory."""
        return _list_basenames(self.args.target_directory, "*.appleseed")

    def gather_inprogress_files(self):
        """
        Return a dictionary mapping each project file name to the list of its owners.

        A file `<name>.appleseed.<owner>` of the target directory contributes
        `<owner>` to the entry `<name>.appleseed`.
        """
        inprogress = {}
        for filename in _list_basenames(self.args.target_directory, "*.appleseed.*"):
            parts = filename.split(".")
            # Only names whose next-to-last component is "appleseed" qualify.
            if len(parts) >= 3 and parts[-2] == "appleseed":
                owner = parts[-1]
                stripped_filename = filename[:-(1 + len(owner))]
                inprogress.setdefault(stripped_filename, []).append(owner)
        return inprogress

    def print_status(self):
        """Log the progress, frame assignments, pings and target directory size."""
        self.log.info("-------------------------------------------------------------------")
        self.print_progress()
        self.print_assignments()
        self.print_pings()
        self.print_target_directory_size()
        self.log.info("-------------------------------------------------------------------")

    def print_progress(self):
        """Log how many source files are completed, rendering and pending."""
        total = len(self.source_files)
        completed = self.count_completed_frames()
        rendering = self.count_inprogress_frames()
        pending = self.count_pending_frames()
        progress = 100.0 * completed / total if total > 0 else 0.0
        self.log.info("PROGRESS: {0}/{1} completed ({2:.2f} %), {3} rendering, {4} pending"
                      .format(completed, total, progress, rendering, pending))

    def print_assignments(self):
        """Log the owners of every source file that is in progress."""
        assigned = [filename for filename in self.source_files if filename in self.inprogress_files]
        if assigned:
            self.log.info("frame assignments:")
            for filename in assigned:
                self.log.info(" {0}: {1}".format(filename, ", ".join(self.inprogress_files[filename])))
        else:
            self.log.info("no frame assigned.")

    def print_pings(self):
        """
        Log, oldest first, the last ping of every owner of an in-progress source file.
        Owners whose ping cannot be read are omitted.
        """
        owners = set()
        for filename in self.source_files:
            owners.update(self.inprogress_files.get(filename, ()))

        all_pings = [(owner, self.read_ping(owner)) for owner in owners]
        pings = sorted((entry for entry in all_pings if entry[1] is not None),
                       key=lambda entry: entry[1])

        if pings:
            max_owner_length = max(len(owner) for owner in owners)
            self.log.info("pings:")
            for owner, ping in pings:
                padding = " " * (max_owner_length + 1 - len(owner))
                self.log.info(" {0}:{1}{2}".format(owner, padding, self.format_ping(ping)))
        else:
            self.log.info("no pings.")

    def read_ping(self, owner):
        """
        Return the timestamp that starts the last line of the log file of `owner`,
        or None if the log is missing, unreadable, empty or not timestamped.
        """
        TIMESTAMP_LENGTH = 26
        log_path = os.path.join(self.args.target_directory, LOGS_DIR, owner + ".log")
        try:
            with open(log_path) as log_file:
                last_line = tail_file(log_file, 1)[0]
            return datetime.datetime.strptime(last_line[:TIMESTAMP_LENGTH], "%Y-%m-%d %H:%M:%S.%f")
        except (IOError, OSError, IndexError, ValueError):
            # IndexError: empty log; ValueError: last line has no valid timestamp.
            return None

    def format_ping(self, ping):
        """Return a text giving the time elapsed since `ping` and `ping` itself."""
        elapsed = datetime.datetime.now() - ping
        return "{0} ago (at {1})".format(elapsed, ping)

    def print_target_directory_size(self):
        """Log the size of the target directory relative to the maximum allowed size."""
        size_mb = self.target_directory_size / MB
        max_size_mb = self.args.max_size / MB
        full = 100.0 * size_mb / max_size_mb if max_size_mb > 0 else 100.0
        self.log.info("size of target directory: {0:.2f}/{1} mb ({2:.2f} % full)"
                      .format(size_mb, max_size_mb, full))

    def count_completed_frames(self):
        """Return the number of source files that are also in the completed files."""
        return sum(1 for filename in self.source_files if filename in self.completed_files)

    def count_inprogress_frames(self):
        """Return the number of source files that are in progress."""
        return sum(1 for filename in self.source_files if filename in self.inprogress_files)

    def count_pending_frames(self):
        """Return the number of source files that are neither completed nor in progress."""
        return sum(1 for filename in self.source_files
                   if filename not in self.completed_files and filename not in self.inprogress_files)

    def move_frames(self):
        """Move every file of the renders directory to the user-supplied frames directory."""
        self.log.info("moving frames...")
        for filepath in get_files(self.frames_directory):
            self.move_frame(filepath)

    def move_frame(self, source_filepath):
        """Move `source_filepath` into the user-supplied frames directory, creating it if needed."""
        filename = os.path.basename(source_filepath)
        dest_filepath = os.path.join(self.args.frames_directory, filename)
        self.log.info(" moving {0}".format(filename))
        safe_mkdir(self.args.frames_directory)
        shutil.move(source_filepath, dest_filepath)

    def update_dependency_dbs(self):
        """Refresh all dependency databases."""
        self.update_uploaded_dependency_db()
        self.update_completed_dependency_db()

    def update_uploaded_dependency_db(self):
        """Refresh the databases of uploaded/in-progress files (all shots, then this shot)."""
        self.log.info("updating dependency database of uploaded and in-progress files (all shots)...")
        all_roots = _list_basenames(self.args.target_directory, "*.appleseed*")
        self.all_uploaded_dependency_db.update(all_roots)

        self.log.info("updating dependency database of uploaded files (this shot)...")
        own_roots = [filename for filename in self.source_files
                     if filename in self.inprogress_files or filename in self.uploaded_files]
        self.own_uploaded_dependency_db.update(own_roots)

    def update_completed_dependency_db(self):
        """Refresh the database of the completed files of this shot."""
        self.log.info("updating dependency database of completed files (this shot)...")
        roots = [filename for filename in self.source_files if filename in self.completed_files]
        self.completed_dependency_db.update(roots)

    def remove_orphan_dependencies(self):
        """
        Remove from the target directory the dependencies of completed files
        that no uploaded or in-progress file references.
        """
        self.log.info("removing orphan dependencies...")
        removed = 0
        all_uploaded_files_dependencies = self.all_uploaded_dependency_db.get_all_dependencies()
        for dep in self.completed_dependency_db.get_all_dependencies():
            if dep not in all_uploaded_files_dependencies:
                count = self.remove_file(dep)
                if count > 0:
                    self.log.info(" removed {0}".format(dep))
                removed += count
        if removed > 0:
            self.log.info(" removed {0} dependencies".format(removed))

    def upload_project_files(self):
        """Upload the source files that are neither in progress nor completed, then their dependencies."""
        self.log.info("uploading project files...")
        for filename in self.source_files:
            if filename not in self.inprogress_files and filename not in self.completed_files:
                if self.upload_file(filename) > 0:
                    self.log.info(" uploaded {0}".format(filename))
        self.uploaded_files = self.gather_uploaded_files()
        self.update_uploaded_dependency_db()
        self.upload_missing_dependencies()

    def upload_missing_dependencies(self):
        """Upload the dependencies of this shot's uploaded files that are missing from the target directory."""
        self.log.info("uploading missing dependencies...")
        uploaded = 0
        for dep in self.own_uploaded_dependency_db.get_all_dependencies():
            count = self.upload_file(dep)
            if count > 0:
                self.log.info(" uploaded {0}".format(dep))
            uploaded += count
        if uploaded > 0:
            self.log.info(" uploaded {0} dependencies".format(uploaded))

    def remove_file(self, filename):
        """
        Delete `filename` from the target directory and update the tracked directory size.
        Returns 1 if the file was removed, 0 if it does not exist or could not be removed.
        """
        filepath = os.path.join(self.args.target_directory, filename)
        if not os.path.isfile(filepath):
            return 0

        try:
            filesize = safe_get_file_size(filepath)
            os.remove(filepath)
            self.target_directory_size = max(self.target_directory_size - filesize, 0)
            return 1
        except (IOError, OSError) as ex:
            # os.remove() reports failures with OSError.
            self.log.error(" could not remove {0}: {1}".format(filepath, ex.strerror or ex))
            return 0

    def upload_file(self, filename):
        """
        Copy `filename` from the source directory to the target directory.
        Returns 1 if the file was copied, 0 if it is already present, if it
        would make the target directory exceed the maximum size, or if the
        copy failed.
        """
        dest_filepath = os.path.join(self.args.target_directory, filename)
        if os.path.isfile(dest_filepath):
            return 0

        source_filepath = os.path.join(self.args.source_directory, filename)
        filesize = safe_get_file_size(source_filepath)
        if self.target_directory_size + filesize > self.args.max_size:
            return 0

        try:
            safe_mkdir(os.path.dirname(dest_filepath))
            shutil.copyfile(source_filepath, dest_filepath)
            self.target_directory_size += filesize
            return 1
        except (IOError, OSError) as ex:
            self.log.error(" could not upload {0}: {1}".format(source_filepath, ex.strerror or ex))
            return 0


# -------------------------------------------------------------------------------------------------
# Entry point.
# -------------------------------------------------------------------------------------------------

def main():
    """
    Parse the command line, then run management passes every
    PAUSE_BETWEEN_UPDATES seconds until interrupted. Errors raised by a pass
    are logged and do not stop the loop.
    """
    # Parse the command line.
    parser = argparse.ArgumentParser(description="send a shot to a folder being watched by "
                                     "appleseed render nodes.")
    parser.add_argument("-s", "--max-size", metavar="MB", type=int,
                        help="set the maximum allowed size in mb of the target directory "
                        "(default is 1 terabyte)")
    parser.add_argument("--source", metavar="source-directory", dest="source_directory",
                        required=True, help="directory containing the source shot data")
    parser.add_argument("--target", metavar="target-directory", dest="target_directory",
                        required=True, help="directory being watched by render nodes")
    parser.add_argument("--frames", metavar="frames-directory", dest="frames_directory",
                        help="directory where the rendered frames should be stored")
    args = parser.parse_args()

    if args.max_size is None:
        args.max_size = 2 ** 40                 # default to 1 terabyte
    else:
        args.max_size *= MB                     # convert to bytes

    # Start the log.
    log = Log(os.path.join(args.target_directory, LOGS_DIR, "rendermanager.log"))
    log.info("--- starting logging ---")
    log.info("running rendermanager.py version {0}.".format(VERSION))

    manager = Manager(args, log)

    # Main management loop.
    try:
        while True:
            try:
                manager.manage()
            except Exception:
                # KeyboardInterrupt and SystemExit do not derive from
                # Exception: they reach the outer handler and end the loop.
                log.error(traceback.format_exc())

            log.info_no_log("waiting {0} seconds...".format(PAUSE_BETWEEN_UPDATES))
            time.sleep(PAUSE_BETWEEN_UPDATES)

    except (KeyboardInterrupt, SystemExit):
        pass

    log.info("exiting...")


if __name__ == "__main__":
    main()