1# Copyright (C) 2015-2020 Damon Lynch <damonlynch@gmail.com> 2 3# This file is part of Rapid Photo Downloader. 4# 5# Rapid Photo Downloader is free software: you can redistribute it and/or 6# modify it under the terms of the GNU General Public License as published by 7# the Free Software Foundation, either version 3 of the License, or 8# (at your option) any later version. 9# 10# Rapid Photo Downloader is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with Rapid Photo Downloader. If not, 17# see <http://www.gnu.org/licenses/>. 18 19__author__ = 'Damon Lynch' 20__copyright__ = "Copyright 2015-2020, Damon Lynch" 21 22import argparse 23import sys 24import logging 25import pickle 26import os 27import shlex 28import time 29from collections import deque, namedtuple 30from typing import Optional, Set, List, Dict, Sequence, Any, Tuple, Union 31 32 33import psutil 34 35from PyQt5.QtCore import (pyqtSignal, QObject, pyqtSlot) 36from PyQt5.QtGui import (QPixmap, QImage) 37 38import zmq 39import zmq.log.handlers 40if zmq.pyzmq_version_info()[0] < 17: 41 from zmq.eventloop import ioloop 42else: 43 try: 44 from tornado import ioloop 45 except ImportError: 46 from zmq.eventloop import ioloop # note: deprecated in pyzmq 17.0.0 47 48from zmq.eventloop.zmqstream import ZMQStream 49 50from raphodo.rpdfile import RPDFile, FileTypeCounter, FileSizeSum, Photo, Video 51from raphodo.devices import Device 52from raphodo.utilities import CacheDirs, set_pdeathsig 53from raphodo.constants import ( 54 RenameAndMoveStatus, ExtractionTask, ExtractionProcessing, CameraErrorCode, FileType, 55 FileExtension, BackupStatus 56) 57from raphodo.proximity import TemporalProximityGroups 58from raphodo.storage import StorageSpace 59from 
raphodo.iplogging import ZeroMQSocketHandler 60from raphodo.viewutils import ThumbnailDataForProximity 61from raphodo.folderspreview import DownloadDestination, FoldersPreview 62from raphodo.problemnotification import ( 63 ScanProblems, CopyingProblems, RenamingProblems, BackingUpProblems 64) 65 66logger = logging.getLogger() 67 68 69def make_filter_from_worker_id(worker_id: Union[int, str]) -> bytes: 70 r""" 71 Returns a python byte string from an integer or string 72 73 >>> make_filter_from_worker_id(54) 74 b'54' 75 76 >>> make_filter_from_worker_id('54') 77 b'54' 78 """ 79 if isinstance(worker_id, int): 80 return str(worker_id).encode() 81 if isinstance(worker_id, str): 82 return worker_id.encode() 83 raise(TypeError) 84 85 86def create_identity(worker_type: str, identity: str) -> bytes: 87 r"""Generate identity for a worker's 0mq socket. 88 89 >>> create_identity('Worker', '1') 90 b'Worker-1' 91 >>> create_identity('Thumbnail Extractor', '2') 92 b'Thumbnail-Extractor-2' 93 >>> create_identity('Thumbnail Extractor Plus', '22 2') 94 b'Thumbnail-Extractor-Plus-22-2' 95 """ 96 97 # Replace any whitespace in the strings with a hyphen 98 return '{}-{}'.format('-'.join(worker_type.split()), '-'.join(identity.split())).encode() 99 100 101def get_worker_id_from_identity(identity: bytes) -> int: 102 r"""Extract worker id from the identity used in a 0mq socket 103 104 >>> get_worker_id_from_identity(b'Worker-1') 105 1 106 >>> get_worker_id_from_identity(b'Thumbnail-Extractor-2') 107 2 108 >>> get_worker_id_from_identity(b'Thumbnail-Extractor-Plus-22-2') 109 2 110 """ 111 return int(identity.decode().split('-')[-1]) 112 113 114def create_inproc_msg(cmd: bytes, 115 worker_id: Optional[int]=None, 116 data: Optional[Any]=None) -> List[bytes]: 117 """ 118 Create a list of three values to be sent via a PAIR socket 119 between main and child threads using 0MQ. 
120 """ 121 122 if worker_id is not None: 123 worker_id = make_filter_from_worker_id(worker_id) 124 else: 125 worker_id = b'' 126 127 if data is None: 128 data = b'' 129 else: 130 data = pickle.dumps(data, pickle.HIGHEST_PROTOCOL) 131 132 return [cmd, worker_id, data] 133 134 135class ThreadNames: 136 rename = 'rename' 137 scan = 'scan' 138 copy = 'copy' 139 backup = 'backup' 140 thumbnail_daemon = 'thumbnail_daemon' 141 thumbnailer = 'thumbnailer' 142 offload = 'offload' 143 logger = 'logger' 144 load_balancer = 'load_balancer' 145 new_version = 'new_version' 146 147 148class ProcessManager: 149 def __init__(self, logging_port: int, 150 thread_name: str) -> None: 151 152 super().__init__() 153 154 self.logging_port = logging_port 155 156 self.processes = {} # type: Dict[int, psutil.Process] 157 self._process_to_run = '' # Implement in subclass 158 159 self.thread_name = thread_name 160 161 # Monitor which workers we have running 162 self.workers = [] # type: List[int] 163 164 def _get_cmd(self) -> str: 165 return '{} {}'.format( 166 sys.executable, os.path.join( 167 os.path.abspath(os.path.dirname(__file__)), self._process_to_run 168 ) 169 ) 170 171 def _get_command_line(self, worker_id: int) -> str: 172 """ 173 Implement in subclass 174 """ 175 return '' 176 177 def add_worker(self, worker_id: int) -> None: 178 179 command_line = self._get_command_line(worker_id) 180 args = shlex.split(command_line) 181 182 # run command immediately, without waiting a reply, and instruct the Linux 183 # kernel to send a terminate signal should this process unexpectedly die 184 try: 185 proc = psutil.Popen(args, preexec_fn=set_pdeathsig()) 186 except OSError as e: 187 logging.critical("Failed to start process: %s", command_line) 188 logging.critical('OSError [Errno %s]: %s', e.errno, e.strerror) 189 if e.errno == 8: 190 logging.critical( 191 "Script shebang line might be malformed or missing: %s", self._get_cmd() 192 ) 193 sys.exit(1) 194 logging.debug("Started '%s' with pid %s", 
command_line, proc.pid) 195 196 # Add to list of running workers 197 self.workers.append(worker_id) 198 self.processes[worker_id] = proc 199 200 def forcefully_terminate(self) -> None: 201 """ 202 Forcefully terminate any running child processes. 203 """ 204 205 zombie_processes = [ 206 p for p in self.processes.values() 207 if p.is_running() and p.status() == psutil.STATUS_ZOMBIE 208 ] 209 running_processes = [ 210 p for p in self.processes.values() 211 if p.is_running() and p.status() != psutil.STATUS_ZOMBIE 212 ] 213 if hasattr(self, '_process_name'): 214 logging.debug( 215 "Forcefully terminating processes for %s: %s zombies, %s running.", 216 self._process_name, len(zombie_processes), len(running_processes) 217 ) 218 219 for p in zombie_processes: # type: psutil.Process 220 try: 221 logging.debug("Killing zombie process %s with pid %s", p.name(), p.pid) 222 p.kill() 223 except: 224 logging.error("Failed to kill process with pid %s", p.pid) 225 for p in running_processes: # type: psutil.Process 226 try: 227 logging.debug("Terminating process %s with pid %s", p.name(), p.pid) 228 p.terminate() 229 except: 230 logging.error("Terminating process with pid %s failed", p.pid) 231 gone, alive = psutil.wait_procs(running_processes, timeout=2) 232 for p in alive: 233 try: 234 logging.debug("Killing zombie process %s with pid %s", p.name(), p.pid) 235 p.kill() 236 except: 237 logging.error("Failed to kill process with pid %s", p.pid) 238 239 def process_alive(self, worker_id: int) -> bool: 240 """ 241 Process IDs are reused by the system. Check to make sure 242 a new process has not been created with the same process id. 243 244 :param worker_id: the process to check 245 :return True if the process is the same, False otherwise 246 """ 247 248 return self.processes[worker_id].is_running() 249 250 251class PullPipelineManager(ProcessManager, QObject): 252 """ 253 Base class from which more specialized 0MQ classes are derived. 

    Receives data into its sink via a ZMQ PULL socket, but does not
    specify how workers should be sent data.

    Outputs signals using Qt.
    """

    message = pyqtSignal(str)  # Derived class will change this
    sinkStarted = pyqtSignal()
    workerFinished = pyqtSignal(int)
    workerStopped = pyqtSignal(int)
    receiverPortSignal = pyqtSignal(int)

    def __init__(self, logging_port: int,
                 thread_name: str) -> None:
        super().__init__(logging_port=logging_port, thread_name=thread_name)

    def _start_sockets(self) -> None:
        # Sockets are created here, not in __init__, so they are created in
        # the thread that run_sink() executes in

        context = zmq.Context.instance()

        # Subclasses must define the type of port they need to send messages
        self.ventilator_socket = None
        self.ventilator_port = None

        # Sink socket to receive results of the workers
        self.receiver_socket = context.socket(zmq.PULL)
        self.receiver_port = self.receiver_socket.bind_to_random_port('tcp://*')

        # Socket to communicate directly with the sink, bypassing the workers
        self.terminate_socket = context.socket(zmq.PUSH)
        self.terminate_socket.connect("tcp://localhost:{}".format(self.receiver_port))

        # Socket to receive commands from main thread
        self.thread_controller = context.socket(zmq.PAIR)
        self.thread_controller.connect('inproc://{}'.format(self.thread_name))

        self.terminating = False

    @pyqtSlot()
    def run_sink(self) -> None:
        # Event loop for this manager's thread: polls both the worker sink
        # socket and the inproc command socket until told to exit
        logging.debug("Running sink for %s", self._process_name)

        self._start_sockets()

        poller = zmq.Poller()
        poller.register(self.receiver_socket, zmq.POLLIN)
        poller.register(self.thread_controller, zmq.POLLIN)

        self.receiverPortSignal.emit(self.receiver_port)
        self.sinkStarted.emit()

        while True:
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                break
            if self.receiver_socket in socks:
                # Receive messages from the workers
                # (or the terminate socket)
                worker_id, directive, content = 
self.receiver_socket.recv_multipart() 315 316 if directive == b'cmd': 317 command = content 318 assert command in (b"STOPPED", b"FINISHED", b"KILL") 319 if command == b"KILL": 320 # Terminate immediately, without regard for any 321 # incoming messages. This message is only sent 322 # from this manager to itself, using the 323 # self.terminate_socket 324 logging.debug("{} is terminating".format(self._process_name)) 325 break 326 # This worker is done; remove from monitored workers and 327 # continue 328 worker_id = int(worker_id) 329 if command == b"STOPPED": 330 logging.debug("%s worker %s has stopped", self._process_name, worker_id) 331 self.workerStopped.emit(worker_id) 332 else: 333 # Worker has finished its work 334 self.workerFinished.emit(worker_id) 335 self.workers.remove(worker_id) 336 del self.processes[worker_id] 337 if not self.workers: 338 logging.debug("{} currently has no workers".format(self._process_name)) 339 if not self.workers and self.terminating: 340 logging.debug("{} is exiting".format(self._process_name)) 341 break 342 else: 343 assert directive == b'data' 344 self.content = content 345 self.process_sink_data() 346 347 if self.thread_controller in socks: 348 # Receive messages from the main Rapid Photo Downloader thread 349 self.process_thread_directive() 350 351 def process_thread_directive(self) -> None: 352 directive, worker_id, data = self.thread_controller.recv_multipart() 353 354 # Directives: START, STOP, TERMINATE, SEND_TO_WORKER, STOP_WORKER, START_WORKER 355 if directive == b'START': 356 self.start() 357 elif directive == b'START_WORKER': 358 self.start_worker(worker_id=worker_id, data=data) 359 elif directive == b'SEND_TO_WORKER': 360 self.send_message_to_worker(worker_id=worker_id, data=data) 361 elif directive == b'STOP': 362 self.stop() 363 elif directive == b'STOP_WORKER': 364 self.stop_worker(worker_id=worker_id) 365 elif directive == b'PAUSE': 366 self.pause() 367 elif directive == b'RESUME': 368 
self.resume(worker_id=worker_id) 369 elif directive == b'TERMINATE': 370 self.forcefully_terminate() 371 else: 372 logging.critical("%s received unknown directive %s", directive.decode()) 373 374 def process_sink_data(self) -> None: 375 data = pickle.loads(self.content) 376 self.message.emit(data) 377 378 def terminate_sink(self) -> None: 379 self.terminate_socket.send_multipart([b'0', b'cmd', b'KILL']) 380 381 def _get_ventilator_start_message(self, worker_id: bytes) -> list: 382 return [worker_id, b'cmd', b'START'] 383 384 def start(self) -> None: 385 logging.critical( 386 "Member function start() not implemented in child class of %s", self._process_name 387 ) 388 389 def start_worker(self, worker_id: bytes, data: bytes) -> None: 390 logging.critical( 391 "Member function start_worker() not implemented in child class of %s", 392 self._process_name 393 ) 394 395 def stop(self) -> None: 396 logging.critical( 397 "Member function stop() not implemented in child class of %s", self._process_name 398 ) 399 400 def stop_worker(self, worker_id: int) -> None: 401 logging.critical( 402 "Member function stop_worker() not implemented in child class of %s", 403 self._process_name 404 ) 405 406 def pause(self) -> None: 407 logging.critical("Member function pause() not implemented in child class of %s", 408 self._process_name) 409 410 def resume(self, worker_id: Optional[bytes]) -> None: 411 logging.critical( 412 "Member function stop_worker() not implemented in child class of %s", self._process_name 413 ) 414 415 def send_message_to_worker(self, data: bytes, worker_id:Optional[bytes]=None) -> None: 416 if self.terminating: 417 logging.debug( 418 "%s not sending message to worker because manager is terminated", self._process_name 419 ) 420 return 421 if not self.workers: 422 logging.debug( 423 "%s not sending message to worker because there are no workers", self._process_name 424 ) 425 return 426 427 assert isinstance(data, bytes) 428 429 if worker_id: 430 message = [worker_id, 
b'data', data]
        else:
            # No worker id prefix: used by PUSH-style ventilators that have
            # a single daemon worker -- TODO confirm against subclasses
            message = [b'data', data]
        self.ventilator_socket.send_multipart(message)

    def forcefully_terminate(self) -> None:
        """
        Forcefully terminate any child processes and clean up.

        Shuts down the sink too.
        """

        super().forcefully_terminate()
        self.terminate_sink()


class LoadBalancerWorkerManager(ProcessManager):
    """Spawns the worker processes used by the load balancer process."""

    def __init__(self, no_workers: int,
                 backend_port: int,
                 sink_port: int,
                 logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name='')
        self.no_workers = no_workers
        self.backend_port = backend_port
        self.sink_port = sink_port

    def _get_command_line(self, worker_id: int) -> str:
        """Build the full command line used to launch one load balancer worker."""
        cmd = self._get_cmd()

        return '{} --request {} --send {} --identity {} --logging {}'.format(
            cmd,
            self.backend_port,
            self.sink_port,
            worker_id,
            self.logging_port
        )

    def start_workers(self) -> None:
        # Worker ids are simply 0..no_workers-1
        for worker_id in range(self.no_workers):
            self.add_worker(worker_id)

    def zombie_workers(self) -> List[int]:
        """Return ids of workers whose processes have become zombies."""
        return [
            worker_id for worker_id in self.workers
            if self.processes[worker_id].status() == psutil.STATUS_ZOMBIE
        ]


class LRUQueue:
    """LRUQueue class using ZMQStream/IOLoop for event dispatching"""

    def __init__(self, backend_socket: zmq.Socket,
                 frontend_socket: zmq.Socket,
                 controller_socket: zmq.Socket,
                 worker_type: str,
                 process_manager: LoadBalancerWorkerManager) -> None:

        self.worker_type = worker_type
        self.process_manager = process_manager
        # Queue of ready worker identities, least recently used first
        self.workers = deque()
        self.terminating = False
        self.terminating_workers = set()  # type: Set[bytes]
        self.stopped_workers = set()  # type: Set[int]

        self.backend = ZMQStream(backend_socket)
        self.frontend = ZMQStream(frontend_socket)
        self.controller = ZMQStream(controller_socket)
        self.backend.on_recv(self.handle_backend)
self.controller.on_recv(self.handle_controller)

        self.loop = ioloop.IOLoop.instance()

    def handle_controller(self, msg):
        # Any message on the controller socket means: shut everything down
        self.terminating = True

        while len(self.workers):
            worker_identity = self.workers.popleft()

            logging.debug(
                "%s load balancer sending stop cmd to worker %s",
                self.worker_type, worker_identity.decode()
            )
            self.backend.send_multipart([worker_identity, b'', b'cmd', b'STOP'])
            self.terminating_workers.add(worker_identity)

        # Failsafe: stop the IOLoop even if workers never confirm they stopped
        self.loop.add_timeout(time.time()+3, self.loop.stop)

    def handle_backend(self, msg):
        # Queue worker address for LRU routing
        worker_identity, empty, client_addr = msg[:3]

        # add worker back to the list of workers
        self.workers.append(worker_identity)

        zw = self.process_manager.zombie_workers()
        if zw:
            # NOTE(review): message hardcodes "thumbnail extractors" although
            # this queue is generic to any worker_type -- confirm intent
            logging.critical("%s dead thumbnail extractors", len(zw))

        # Second frame is empty
        assert empty == b''

        if msg[-1] == b'STOPPED' and self.terminating:
            # A worker confirmed it stopped; once all have, reap the child
            # processes and schedule the IOLoop to stop
            worker_id = get_worker_id_from_identity(worker_identity)
            self.stopped_workers.add(worker_id)
            self.terminating_workers.remove(worker_identity)
            if len(self.terminating_workers) == 0:
                for worker_id in self.stopped_workers:
                    p = self.process_manager.processes[worker_id]  # type: psutil.Process
                    if p.is_running():
                        pid = p.pid
                        if p.status() != psutil.STATUS_SLEEPING:
                            logging.debug(
                                "Waiting on %s process %s...", p.status(), pid
                            )
                            os.waitpid(pid, 0)
                            logging.debug("...process %s is finished", pid)
                        else:
                            logging.debug("Process %s is sleeping", pid)
                self.loop.add_timeout(time.time()+0.5, self.loop.stop)

        if len(self.workers) == 1:
            # on first recv, start accepting frontend messages
            self.frontend.on_recv(self.handle_frontend)

    def handle_frontend(self, request):
        # Dequeue and drop the next worker address
        worker_identity = self.workers.popleft()

        # Route the request to the least recently used worker
        message = [worker_identity, b''] + 
request
        self.backend.send_multipart(message)
        if len(self.workers) == 0:
            # stop receiving until workers become available again
            self.frontend.stop_on_recv()


class LoadBalancer:
    """
    Runs as its own process: routes work from a frontend PULL socket to a
    pool of workers on a backend ROUTER socket, least recently used first.
    """

    def __init__(self, worker_type: str, process_manager) -> None:

        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("--receive", required=True)
        self.parser.add_argument("--send", required=True)
        self.parser.add_argument("--controller", required=True)
        self.parser.add_argument("--logging", required=True)

        args = self.parser.parse_args()
        self.controller_port = args.controller

        context = zmq.Context()
        # Frontend: work requests arrive here
        frontend = context.socket(zmq.PULL)
        frontend_port = frontend.bind_to_random_port('tcp://*')

        # Backend: workers connect here
        backend = context.socket(zmq.ROUTER)
        backend_port = backend.bind_to_random_port('tcp://*')

        # Used for the startup handshake with LoadBalancerManager
        reply = context.socket(zmq.REP)
        reply.connect("tcp://localhost:{}".format(args.receive))

        controller = context.socket(zmq.PULL)
        controller.connect('tcp://localhost:{}'.format(self.controller_port))

        sink_port = args.send
        logging_port = args.logging

        self.logger_publisher = ProcessLoggerPublisher(
            context=context, name=worker_type, notification_port=args.logging
        )

        logging.debug(
            "{} load balancer waiting to be notified how many workers to initialize...".format(
                worker_type
            )
        )
        # Handshake: receive worker count, reply with our frontend port
        no_workers = int(reply.recv())
        logging.debug("...{} load balancer will use {} workers".format(worker_type, no_workers))
        reply.send(str(frontend_port).encode())

        process_manager = process_manager(no_workers, backend_port, sink_port, logging_port)
        process_manager.start_workers()

        # create queue with the sockets
        queue = LRUQueue(backend, frontend, controller, worker_type, process_manager)

        # start reactor, which is an infinite loop
        ioloop.IOLoop.instance().start()

        # Finished infinite loop: do some housekeeping
logging.debug("Forcefully terminating load balancer child processes") 617 process_manager.forcefully_terminate() 618 619 frontend.close() 620 backend.close() 621 622 623class LoadBalancerManager(ProcessManager, QObject): 624 """ 625 Launches and requests termination of the Load Balancer process 626 """ 627 628 load_balancer_started = pyqtSignal(int) 629 def __init__(self, context: zmq.Context, 630 no_workers: int, 631 sink_port: int, 632 logging_port: int, 633 thread_name: str) -> None: 634 super().__init__(logging_port=logging_port, thread_name=thread_name) 635 self.no_workers = no_workers 636 self.sink_port = sink_port 637 self.context = context 638 639 @pyqtSlot() 640 def start_load_balancer(self) -> None: 641 642 self.controller_socket = self.context.socket(zmq.PUSH) 643 self.controller_port = self.controller_socket.bind_to_random_port('tcp://*') 644 645 self.requester = self.context.socket(zmq.REQ) 646 self.requester_port = self.requester.bind_to_random_port('tcp://*') 647 648 self.thread_controller = self. 
context.socket(zmq.PAIR) 649 self.thread_controller.connect('inproc://{}'.format(self.thread_name)) 650 651 worker_id = 0 652 self.add_worker(worker_id) 653 self.requester.send(str(self.no_workers).encode()) 654 self.frontend_port = int(self.requester.recv()) 655 self.load_balancer_started.emit(self.frontend_port) 656 657 # wait for stop signal 658 directive, worker_id, data = self.thread_controller.recv_multipart() 659 assert directive == b'STOP' 660 self.stop() 661 662 def stop(self): 663 self.controller_socket.send(b'STOP') 664 665 def _get_command_line(self, worker_id: int) -> str: 666 cmd = self._get_cmd() 667 668 return '{} --receive {} --send {} --controller {} --logging {}'.format( 669 cmd, 670 self.requester_port, 671 self.sink_port, 672 self.controller_port, 673 self.logging_port 674 ) 675 676DAEMON_WORKER_ID = 0 677 678 679class PushPullDaemonManager(PullPipelineManager): 680 """ 681 Manage a single instance daemon worker process that waits to work on data 682 issued by this manager. The data to be worked on is issued in sequence, 683 one after the other. 684 685 Because this is a single daemon process, a Push-Pull model is most 686 suitable for sending the data. 
    """

    def _start_sockets(self) -> None:

        super()._start_sockets()

        context = zmq.Context.instance()

        # Ventilator socket to send message to worker
        self.ventilator_socket = context.socket(zmq.PUSH)
        self.ventilator_port = self.ventilator_socket.bind_to_random_port('tcp://*')

    def stop(self) -> None:
        """
        Permanently stop the daemon process and terminate
        """

        logging.debug("{} halting".format(self._process_name))
        self.terminating = True

        # Only send stop command if the process is still running
        if self.process_alive(DAEMON_WORKER_ID):
            try:
                self.ventilator_socket.send_multipart([b'cmd', b'STOP'], zmq.DONTWAIT)
            except zmq.Again:
                logging.debug(
                    "Terminating %s sink because child process did not receive message",
                    self._process_name
                )
                self.terminate_sink()
        else:
            # The process may have crashed. Stop the sink.
            self.terminate_sink()

    def _get_command_line(self, worker_id: int) -> str:
        """Build the full command line used to launch the daemon worker."""
        cmd = self._get_cmd()

        return '{} --receive {} --send {} --logging {}'.format(
            cmd,
            self.ventilator_port,
            self.receiver_port,
            self.logging_port
        )

    def _get_ventilator_start_message(self, worker_id: int) -> List[bytes]:
        # PUSH socket: no subscription filter prefix is needed
        return [b'cmd', b'START']

    def start(self) -> None:
        logging.debug("Starting worker for %s", self._process_name)
        self.add_worker(worker_id=DAEMON_WORKER_ID)


class PublishPullPipelineManager(PullPipelineManager):
    """
    Manage a collection of worker processes that wait to work on data
    issued by this manager. The data to be worked on is issued in sequence,
    one after the other, either once, or many times.

    Because there are multiple worker process, a Publish-Subscribe model is
    most suitable for sending data to workers.
    """

    def _start_sockets(self) -> None:

        super()._start_sockets()

        context = zmq.Context.instance()

        # Ventilator socket to send messages to workers on
        self.ventilator_socket = context.socket(zmq.PUB)
        self.ventilator_port = self.ventilator_socket.bind_to_random_port('tcp://*')

        # Socket to synchronize the start of each worker
        self.sync_service_socket = context.socket(zmq.REP)
        self.sync_service_port = self.sync_service_socket.bind_to_random_port("tcp://*")

        # Socket for worker control: pause, resume, stop
        self.controller_socket = context.socket(zmq.PUB)
        self.controller_port = self.controller_socket.bind_to_random_port("tcp://*")

    def stop(self) -> None:
        """
        Permanently stop all the workers and terminate
        """

        logging.debug("{} halting".format(self._process_name))
        self.terminating = True
        if self.workers:
            # Signal workers they must immediately stop
            termination_signal_sent = False
            alive_workers = [worker_id for worker_id in self.workers if
                             self.process_alive(worker_id)]
            for worker_id in alive_workers:

                # Send STOP on both channels: the control socket and the
                # data (ventilator) socket
                message = [make_filter_from_worker_id(worker_id), b'STOP']
                self.controller_socket.send_multipart(message)

                message = [make_filter_from_worker_id(worker_id), b'cmd', b'STOP']
                self.ventilator_socket.send_multipart(message)
                termination_signal_sent = True

            if not termination_signal_sent:
                # No live workers to signal: shut the sink down directly
                self.terminate_sink()
        else:
            self.terminate_sink()

    def stop_worker(self, worker_id: bytes) -> None:
        """
        Permanently stop one worker
        """

        if int(worker_id) in self.workers:
            message = [worker_id, b'STOP']
            self.controller_socket.send_multipart(message)
            message = [worker_id, b'cmd', b'STOP']
            self.ventilator_socket.send_multipart(message)

    def start_worker(self, worker_id: bytes, data: bytes) -> None:

        self.add_worker(int(worker_id))

        # Send START commands until scan
worker indicates it is ready to
        # receive data
        # Worker ID must be in bytes format
        while True:
            self.ventilator_socket.send_multipart(
                self._get_ventilator_start_message(worker_id))
            try:
                # look for synchronization request
                self.sync_service_socket.recv(zmq.DONTWAIT)
                # send synchronization reply
                self.sync_service_socket.send(b'')
                break
            except zmq.Again:
                # Briefly pause sending out START messages
                # There is no point flooding the network
                time.sleep(.01)

        # Send data to process to tell it what to work on
        self.send_message_to_worker(data=data, worker_id=worker_id)

    def _get_command_line(self, worker_id: int) -> str:
        """Build the full command line used to launch one pipeline worker."""
        cmd = self._get_cmd()

        return '{} --receive {} --send {} --controller {} --syncclient {} --filter {} --logging '\
               '{}'.format(
            cmd,
            self.ventilator_port,
            self.receiver_port,
            self.controller_port,
            self.sync_service_port,
            worker_id,
            self.logging_port
        )

    def __len__(self) -> int:
        # Number of workers currently being monitored
        return len(self.workers)

    def __contains__(self, item) -> bool:
        return item in self.workers

    def pause(self) -> None:
        # Pause every worker via the control channel
        for worker_id in self.workers:
            message = [make_filter_from_worker_id(worker_id), b'PAUSE']
            self.controller_socket.send_multipart(message)

    def resume(self, worker_id: bytes) -> None:
        # Resume one worker if a worker_id is given, else all of them
        if worker_id:
            workers = [int(worker_id)]
        else:
            workers = self.workers
        for worker_id in workers:
            message = [make_filter_from_worker_id(worker_id), b'RESUME']
            self.controller_socket.send_multipart(message)


class ProcessLoggerPublisher:
    """
    Setup the sockets for worker processes to send log messages to the
    main process.

    Two tasks: set up the PUB socket, and then tell the main process
    what port we're using via a second socket, and when we're closing it.
    """

    def __init__(self, context: zmq.Context, name: str, notification_port: int) -> None:

        # PUB socket that the logging handler writes records to
        self.logger_pub = context.socket(zmq.PUB)
        self.logger_pub_port = self.logger_pub.bind_to_random_port("tcp://*")
        self.handler = ZeroMQSocketHandler(self.logger_pub)
        self.handler.setLevel(logging.DEBUG)

        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

        # Notify the main process of the port we publish on
        self.logger_socket = context.socket(zmq.PUSH)
        self.logger_socket.connect("tcp://localhost:{}".format(notification_port))
        self.logger_socket.send_multipart([b'CONNECT', str(self.logger_pub_port).encode()])

    def close(self):
        # Tell the main process we are going away, then release the sockets
        self.logger.removeHandler(self.handler)
        self.logger_socket.send_multipart([b'DISCONNECT', str(self.logger_pub_port).encode()])
        self.logger_pub.close()
        self.logger_socket.close()


class WorkerProcess():
    """Base class for worker processes: common argparse options and hooks."""

    def __init__(self, worker_type: str) -> None:
        super().__init__()
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("--receive", required=True)
        self.parser.add_argument("--send", required=True)
        self.parser.add_argument("--logging", required=True)

    def cleanup_pre_stop(self) -> None:
        """
        Operations to run if process is stopped.

        Implement in child class if needed.
        """

        pass

    def setup_logging_pub(self, notification_port: int, name: str) -> None:
        """
        Sets up the 0MQ socket that sends out logging messages

        :param notification_port: port that should be notified about
         the new logging publisher
        :param name: descriptive name to place in the log messages
        """

        if self.worker_id is not None:
            name = '{}-{}'.format(name, self.worker_id.decode())
        self.logger_publisher = ProcessLoggerPublisher(
            context=self.context, name=name, notification_port=notification_port
        )

    def send_message_to_sink(self) -> None:
        # Forward self.content to the manager's sink socket

        self.sender.send_multipart([self.worker_id, b'data', self.content])

    def initialise_process(self) -> None:
        # Wait to receive "START" message
        worker_id, directive, content = self.receiver.recv_multipart()
        assert directive == b'cmd'
        assert content == b'START'

        # send a synchronization request
        self.sync_client.send(b'')

        # wait for synchronization reply
        self.sync_client.recv()

        # Receive next "START" message and discard, looking for data message
        while True:
            worker_id, directive, content = self.receiver.recv_multipart()
            if directive == b'data':
                break
            else:
                assert directive == b'cmd'
                assert content == b'START'

        self.content = content

    def do_work(self):
        # Implement in subclass
        pass


class DaemonProcess(WorkerProcess):
    """
    Single instance
    """
    def __init__(self, worker_type: str) -> None:
        super().__init__(worker_type)

        args = self.parser.parse_args()

        self.context = zmq.Context()
        # Socket to send messages along the pipe to
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.set_hwm(10)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.connect("tcp://localhost:{}".format(args.receive))

        # Single-instance daemon: no per-worker id
        self.worker_id = None

self.setup_logging_pub(notification_port=args.logging, name=worker_type)

    def run(self) -> None:
        # Implement in subclass
        pass

    def check_for_command(self, directive: bytes, content: bytes) -> None:
        if directive == b'cmd':
            assert content == b'STOP'
            self.cleanup_pre_stop()
            # signal to sink that we've terminated before finishing
            self.sender.send_multipart(
                [make_filter_from_worker_id(DAEMON_WORKER_ID), b'cmd', b'STOPPED']
            )
            sys.exit(0)

    def send_message_to_sink(self) -> None:
        # Must use a dummy value for the worker id, as there is only ever one
        # instance.
        self.sender.send_multipart(
            [make_filter_from_worker_id(DAEMON_WORKER_ID), b'data', self.content]
        )


class WorkerInPublishPullPipeline(WorkerProcess):
    """
    Worker counterpart to PublishPullPipelineManager; multiple instance.
    """
    def __init__(self, worker_type: str) -> None:
        super().__init__(worker_type)
        self.add_args()

        args = self.parser.parse_args()

        # The worker id doubles as the 0MQ subscription filter
        subscription_filter = self.worker_id = args.filter.encode()
        self.context = zmq.Context()

        self.setup_sockets(args, subscription_filter)
        self.setup_logging_pub(notification_port=args.logging, name=worker_type)

        self.initialise_process()
        self.do_work()

    def add_args(self) -> None:
        # Arguments beyond those every WorkerProcess takes
        self.parser.add_argument("--filter", required=True)
        self.parser.add_argument("--syncclient", required=True)
        self.parser.add_argument("--controller", required=True)

    def setup_sockets(self, args, subscription_filter: bytes) -> None:

        # Socket to send messages along the pipe to
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.set_hwm(10)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        # Socket to receive messages from the pipe
        self.receiver = self.context.socket(zmq.SUB)
        self.receiver.connect("tcp://localhost:{}".format(args.receive))
self.receiver.setsockopt(zmq.SUBSCRIBE, subscription_filter) 1035 1036 # Socket to receive controller messages: stop, pause, resume 1037 self.controller = self.context.socket(zmq.SUB) 1038 self.controller.connect("tcp://localhost:{}".format(args.controller)) 1039 self.controller.setsockopt(zmq.SUBSCRIBE, subscription_filter) 1040 1041 # Socket to synchronize the start of receiving data from upstream 1042 self.sync_client = self.context.socket(zmq.REQ) 1043 self.sync_client.connect("tcp://localhost:{}".format(args.syncclient)) 1044 1045 def check_for_command(self, directive: bytes, content) -> None: 1046 if directive == b'cmd': 1047 try: 1048 assert content == b'STOP' 1049 except AssertionError: 1050 logging.critical("Expected STOP command but instead got %s", content.decode()) 1051 else: 1052 self.cleanup_pre_stop() 1053 self.disconnect_logging() 1054 # signal to sink that we've terminated before finishing 1055 self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED']) 1056 sys.exit(0) 1057 1058 def check_for_controller_directive(self) -> None: 1059 try: 1060 # Don't block if process is running regularly 1061 # If there is no command,exception will occur 1062 worker_id, command = self.controller.recv_multipart(zmq.DONTWAIT) 1063 assert command in [b'PAUSE', b'STOP'] 1064 assert worker_id == self.worker_id 1065 1066 if command == b'PAUSE': 1067 # Because the process is paused, do a blocking read to 1068 # wait for the next command 1069 worker_id, command = self.controller.recv_multipart() 1070 assert (command in [b'RESUME', b'STOP']) 1071 if command == b'STOP': 1072 self.cleanup_pre_stop() 1073 # before finishing, signal to sink that we've terminated 1074 self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED']) 1075 sys.exit(0) 1076 except zmq.Again: 1077 pass # Continue working 1078 1079 def resume_work(self) -> None: 1080 worker_id, command = self.controller.recv_multipart() 1081 assert (command in [b'RESUME', b'STOP']) 1082 if command == b'STOP': 
            self.cleanup_pre_stop()
            self.disconnect_logging()
            # before finishing, signal to sink that we've terminated
            self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED'])
            sys.exit(0)

    def disconnect_logging(self) -> None:
        # Close the socket that publishes this worker's log records
        self.logger_publisher.close()

    def send_finished_command(self) -> None:
        # Tell the sink this worker has finished its work
        self.sender.send_multipart([self.worker_id, b'cmd', b'FINISHED'])


class LoadBalancerWorker:
    """
    Worker that requests work from a load balancer and pushes its output
    to a sink located in the main process.
    """

    def __init__(self, worker_type: str) -> None:
        super().__init__()
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("--request", required=True)
        self.parser.add_argument("--send", required=True)
        self.parser.add_argument("--identity", required=True)
        self.parser.add_argument("--logging", required=True)

        args = self.parser.parse_args()

        self.context = zmq.Context()

        self.requester = self.context.socket(zmq.REQ)
        self.identity = create_identity(worker_type, args.identity)
        self.requester.identity = self.identity
        self.requester.connect("tcp://localhost:{}".format(args.request))

        # Sender is located in the main process. It is where output (messages)
        # from this process are sent to.
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        self.logger_publisher = ProcessLoggerPublisher(
            context=self.context, name=worker_type, notification_port=args.logging
        )

        # Tell the load balancer we are ready for work
        self.requester.send(b"READY")
        self.do_work()

    def do_work(self) -> None:
        # Implement in subclass
        pass

    def cleanup_pre_stop(self) -> None:
        """
        Operations to run if process is stopped.

        Implement in child class if needed.
1136 """ 1137 1138 pass 1139 1140 def exit(self): 1141 self.cleanup_pre_stop() 1142 identity = self.requester.identity.decode() 1143 # signal to load balancer that we've terminated before finishing 1144 self.requester.send_multipart([b'', b'', b'STOPPED']) 1145 self.requester.close() 1146 self.sender.close() 1147 self.logger_publisher.close() 1148 self.context.term() 1149 logging.debug("%s with pid %s stopped", identity, os.getpid()) 1150 sys.exit(0) 1151 1152 def check_for_command(self, directive: bytes, content: bytes): 1153 if directive == b'cmd': 1154 assert content == b'STOP' 1155 self.exit() 1156 1157 1158class ProcessLoggingManager(QObject): 1159 """ 1160 Receive and log logging messages from workers. 1161 1162 An alternative might be using python logging's QueueListener, which 1163 like this code, runs on its own thread. 1164 """ 1165 1166 ready = pyqtSignal(int) 1167 1168 @pyqtSlot() 1169 def startReceiver(self) -> None: 1170 context = zmq.Context.instance() 1171 self.receiver = context.socket(zmq.SUB) 1172 # Subscribe to all variates of logging messages 1173 self.receiver.setsockopt(zmq.SUBSCRIBE, b'') 1174 1175 # Socket to receive subscription information, and the stop command 1176 info_socket = context.socket(zmq.PULL) 1177 self.info_port = info_socket.bind_to_random_port('tcp://*') 1178 1179 poller = zmq.Poller() 1180 poller.register(self.receiver, zmq.POLLIN) 1181 poller.register(info_socket, zmq.POLLIN) 1182 1183 self.ready.emit(self.info_port) 1184 1185 while True: 1186 try: 1187 socks = dict(poller.poll()) 1188 except KeyboardInterrupt: 1189 break 1190 1191 if self.receiver in socks: 1192 message = self.receiver.recv() 1193 record = logging.makeLogRecord(pickle.loads(message)) 1194 logger.handle(record) 1195 1196 if info_socket in socks: 1197 directive, content = info_socket.recv_multipart() 1198 if directive == b'STOP': 1199 break 1200 elif directive == b'CONNECT': 1201 self.addSubscription(content) 1202 else: 1203 assert directive == 
b'DISCONNECT' 1204 self.removeSubscription(content) 1205 1206 def addSubscription(self, port: bytes) -> None: 1207 try: 1208 port = int(port) 1209 except ValueError: 1210 logging.critical('Incorrect port value in add logging subscription: %s', port) 1211 else: 1212 logging.debug("Subscribing to logging on port %s", port) 1213 self.receiver.connect("tcp://localhost:{}".format(port)) 1214 1215 def removeSubscription(self, port: bytes): 1216 try: 1217 port = int(port) 1218 except ValueError: 1219 logging.critical('Incorrect port value in remove logging subscription: %s', port) 1220 else: 1221 logging.debug("Unsubscribing to logging on port %s", port) 1222 self.receiver.disconnect("tcp://localhost:{}".format(port)) 1223 1224 1225def stop_process_logging_manager(info_port: int) -> None: 1226 """ 1227 Stop ProcessLoggingManager thread 1228 1229 :param info_port: the port number the manager uses 1230 """ 1231 1232 context = zmq.Context.instance() 1233 command = context.socket(zmq.PUSH) 1234 command.connect("tcp://localhost:{}".format(info_port)) 1235 command.send_multipart([b'STOP', b'']) 1236 1237 1238class ScanArguments: 1239 """ 1240 Pass arguments to the scan process 1241 """ 1242 def __init__(self, device: Device, 1243 ignore_other_types: bool, 1244 log_gphoto2: bool) -> None: 1245 """ 1246 Pass arguments to the scan process 1247 1248 :param device: the device to scan 1249 :param ignore_other_types: ignore file types like TIFF 1250 :param log_gphoto2: whether to generate detailed gphoto2 log 1251 messages 1252 """ 1253 1254 self.device = device 1255 self.ignore_other_types = ignore_other_types 1256 self.log_gphoto2 = log_gphoto2 1257 1258 1259class ScanResults: 1260 """ 1261 Receive results from the scan process 1262 """ 1263 1264 def __init__(self, rpd_files: Optional[List[RPDFile]]=None, 1265 file_type_counter: Optional[FileTypeCounter]=None, 1266 file_size_sum: Optional[FileSizeSum]=None, 1267 error_code: Optional[CameraErrorCode]=None, 1268 scan_id: 
Optional[int]=None, 1269 optimal_display_name: Optional[str]=None, 1270 storage_space: Optional[List[StorageSpace]]=None, 1271 storage_descriptions: Optional[List[str]]=None, 1272 sample_photo: Optional[Photo]=None, 1273 sample_video: Optional[Video]=None, 1274 problems: Optional[ScanProblems]=None, 1275 fatal_error: Optional[bool]=None, 1276 camera_removed: Optional[bool]=None, 1277 entire_video_required: Optional[bool]=None, 1278 entire_photo_required: Optional[bool]=None) -> None: 1279 self.rpd_files = rpd_files 1280 self.file_type_counter = file_type_counter 1281 self.file_size_sum = file_size_sum 1282 self.error_code = error_code 1283 self.scan_id = scan_id 1284 self.optimal_display_name = optimal_display_name 1285 self.storage_space = storage_space 1286 self.storage_descriptions = storage_descriptions 1287 self.sample_photo = sample_photo 1288 self.sample_video = sample_video 1289 self.problems = problems 1290 self.fatal_error = fatal_error 1291 self.camera_removed = camera_removed 1292 self.entire_video_required = entire_video_required 1293 self.entire_photo_required = entire_photo_required 1294 1295 1296class CopyFilesArguments: 1297 """ 1298 Pass arguments to the copyfiles process 1299 """ 1300 1301 def __init__(self, scan_id: int, 1302 device: Device, 1303 photo_download_folder: str, 1304 video_download_folder: str, 1305 files: List[RPDFile], 1306 verify_file: bool, 1307 generate_thumbnails: bool, 1308 log_gphoto2: bool) -> None: 1309 self.scan_id = scan_id 1310 self.device = device 1311 self.photo_download_folder = photo_download_folder 1312 self.video_download_folder = video_download_folder 1313 self.files = files 1314 self.generate_thumbnails = generate_thumbnails 1315 self.verify_file = verify_file 1316 self.log_gphoto2 = log_gphoto2 1317 1318 1319class CopyFilesResults: 1320 """ 1321 Receive results from the copyfiles process 1322 """ 1323 1324 def __init__(self, scan_id: Optional[int]=None, 1325 photo_temp_dir: Optional[str]=None, 1326 
                 video_temp_dir: Optional[str]=None,
                 total_downloaded: Optional[int]=None,
                 chunk_downloaded: Optional[int]=None,
                 copy_succeeded: Optional[bool]=None,
                 rpd_file: Optional[RPDFile]=None,
                 download_count: Optional[int]=None,
                 mdata_exceptions: Optional[Tuple]=None,
                 problems: Optional[CopyingProblems]=None,
                 camera_removed: Optional[bool]=None) -> None:
        """

        :param scan_id: scan id of the device the files are being
         downloaded from
        :param photo_temp_dir: temp directory path, used to copy
         photos into until they're renamed
        :param video_temp_dir: temp directory path, used to copy
         videos into until they're renamed
        :param total_downloaded: how many bytes in total have been
         downloaded
        :param chunk_downloaded: how many bytes were downloaded since
         the last message
        :param copy_succeeded: whether the copy was successful or not
        :param rpd_file: details of the file that was copied
        :param download_count: a running count of how many files
         have been copied. Used in download tracking.
        :param mdata_exceptions: details of errors setting file metadata
        :param problems: details of any problems encountered copying files,
         not including metadata write problems.
        :param camera_removed: whether the camera was removed while
         copying from it
        """

        self.scan_id = scan_id

        self.photo_temp_dir = photo_temp_dir
        self.video_temp_dir = video_temp_dir

        self.total_downloaded = total_downloaded
        self.chunk_downloaded = chunk_downloaded

        self.copy_succeeded = copy_succeeded
        self.rpd_file = rpd_file
        self.download_count = download_count
        self.mdata_exceptions = mdata_exceptions
        self.problems = problems
        self.camera_removed = camera_removed


class ThumbnailDaemonData:
    """
    Pass arguments to the thumbnail daemon process.

    Occurs after a file is downloaded & renamed, and also
    after a file is backed up.
1378 """ 1379 1380 def __init__(self, frontend_port: Optional[int]=None, 1381 rpd_file: Optional[RPDFile]=None, 1382 write_fdo_thumbnail: Optional[bool]=None, 1383 use_thumbnail_cache: Optional[bool]=None, 1384 backup_full_file_names: Optional[List[str]]=None, 1385 fdo_name: Optional[str]=None, 1386 force_exiftool: Optional[bool]=None) -> None: 1387 self.frontend_port = frontend_port 1388 self.rpd_file = rpd_file 1389 self.write_fdo_thumbnail = write_fdo_thumbnail 1390 self.use_thumbnail_cache = use_thumbnail_cache 1391 self.backup_full_file_names = backup_full_file_names 1392 self.fdo_name = fdo_name 1393 self.force_exiftool = force_exiftool 1394 1395 1396class RenameAndMoveFileData: 1397 """ 1398 Pass arguments to the renameandmovefile process 1399 """ 1400 1401 def __init__(self, rpd_file: RPDFile=None, 1402 download_count: int=None, 1403 download_succeeded: bool=None, 1404 message: RenameAndMoveStatus=None) -> None: 1405 self.rpd_file = rpd_file 1406 self.download_count = download_count 1407 self.download_succeeded = download_succeeded 1408 self.message = message 1409 1410 1411class RenameAndMoveFileResults: 1412 def __init__(self, move_succeeded: bool=None, 1413 rpd_file: RPDFile=None, 1414 download_count: int=None, 1415 stored_sequence_no: int=None, 1416 downloads_today: List[str]=None, 1417 problems: Optional[RenamingProblems]=None) -> None: 1418 self.move_succeeded = move_succeeded 1419 self.rpd_file = rpd_file 1420 self.download_count = download_count 1421 self.stored_sequence_no = stored_sequence_no 1422 self.downloads_today = downloads_today 1423 self.problems = problems 1424 1425 1426class OffloadData: 1427 def __init__(self, thumbnail_rows: Optional[Sequence[ThumbnailDataForProximity]]=None, 1428 proximity_seconds: int=None, 1429 rpd_files: Optional[Sequence[RPDFile]]=None, 1430 strip_characters: Optional[bool]=None, 1431 folders_preview: Optional[FoldersPreview]=None) -> None: 1432 self.thumbnail_rows = thumbnail_rows 1433 self.proximity_seconds = 
proximity_seconds 1434 self.rpd_files = rpd_files 1435 self.strip_characters = strip_characters 1436 self.folders_preview = folders_preview 1437 1438 1439class OffloadResults: 1440 def __init__(self, proximity_groups: Optional[TemporalProximityGroups]=None, 1441 folders_preview: Optional[FoldersPreview]=None) -> None: 1442 self.proximity_groups = proximity_groups 1443 self.folders_preview = folders_preview 1444 1445 1446class BackupArguments: 1447 """ 1448 Pass start up data to the back up process 1449 """ 1450 def __init__(self, path: str, device_name: str) -> None: 1451 self.path = path 1452 self.device_name = device_name 1453 1454 1455class BackupFileData: 1456 """ 1457 Pass file data to the backup process 1458 """ 1459 def __init__(self, rpd_file: Optional[RPDFile]=None, 1460 move_succeeded: Optional[bool]=None, 1461 do_backup: Optional[bool]=None, 1462 path_suffix: Optional[str]=None, 1463 backup_duplicate_overwrite: Optional[bool]=None, 1464 verify_file: Optional[bool]=None, 1465 download_count: Optional[int]=None, 1466 save_fdo_thumbnail: Optional[int]=None, 1467 message: Optional[BackupStatus]=None) -> None: 1468 self.rpd_file = rpd_file 1469 self.move_succeeded = move_succeeded 1470 self.do_backup = do_backup 1471 self.path_suffix = path_suffix 1472 self.backup_duplicate_overwrite = backup_duplicate_overwrite 1473 self.verify_file = verify_file 1474 self.download_count = download_count 1475 self.save_fdo_thumbnail = save_fdo_thumbnail 1476 self.message = message 1477 1478 1479class BackupResults: 1480 def __init__(self, scan_id: int, 1481 device_id: int, 1482 total_downloaded: Optional[int]=None, 1483 chunk_downloaded: Optional[int]=None, 1484 backup_succeeded: Optional[bool]=None, 1485 do_backup: Optional[bool]=None, 1486 rpd_file: Optional[RPDFile] = None, 1487 backup_full_file_name: Optional[str]=None, 1488 mdata_exceptions: Optional[Tuple] = None, 1489 problems: Optional[BackingUpProblems]=None) -> None: 1490 self.scan_id = scan_id 1491 self.device_id 
= device_id 1492 self.total_downloaded = total_downloaded 1493 self.chunk_downloaded = chunk_downloaded 1494 self.backup_succeeded = backup_succeeded 1495 self.do_backup = do_backup 1496 self.rpd_file = rpd_file 1497 self.backup_full_file_name = backup_full_file_name 1498 self.mdata_exceptions = mdata_exceptions 1499 self.problems = problems 1500 1501 1502class GenerateThumbnailsArguments: 1503 def __init__(self, scan_id: int, 1504 rpd_files: List[RPDFile], 1505 name: str, 1506 proximity_seconds: int, 1507 cache_dirs: CacheDirs, 1508 need_photo_cache_dir: bool, 1509 need_video_cache_dir: bool, 1510 frontend_port: int, 1511 log_gphoto2: bool, 1512 camera: Optional[str]=None, 1513 port: Optional[str]=None, 1514 entire_video_required: Optional[bool]=None, 1515 entire_photo_required: Optional[bool]=None) -> None: 1516 """ 1517 List of files for which thumbnails are to be generated. 1518 All files are assumed to have the same scan id. 1519 :param scan_id: id of the scan 1520 :param rpd_files: files from which to extract thumbnails 1521 :param name: name of the device 1522 :param proximity_seconds: the time elapsed between consecutive 1523 shots that is used to prioritize the order of thumbnail 1524 generation 1525 :param cache_dirs: the location where the cache directories 1526 should be created 1527 :param need_photo_cache_dir: if True, must use cache dir 1528 to extract photo thumbnail 1529 :param need_video_cache_dir: if True, must use cache dir 1530 to extract video thumbnail 1531 :param frontend_port: port to use to send to load balancer's 1532 front end 1533 :param log_gphoto2: if True, log libgphoto2 logging messages 1534 :param camera: If the thumbnails are being downloaded from a 1535 camera, this is the name of the camera, else None 1536 :param port: If the thumbnails are being downloaded from a 1537 camera, this is the port of the camera, else None 1538 :param entire_video_required: if the entire video is required 1539 to extract the thumbnail 1540 :param 
entire_photo_required: if the entire photo is required 1541 to extract the thumbnail 1542 """ 1543 1544 self.rpd_files = rpd_files 1545 self.scan_id = scan_id 1546 self.name = name 1547 self.proximity_seconds = proximity_seconds 1548 self.cache_dirs = cache_dirs 1549 self.need_photo_cache_dir = need_photo_cache_dir 1550 self.need_video_cache_dir = need_video_cache_dir 1551 self.frontend_port = frontend_port 1552 if camera is not None: 1553 assert port is not None 1554 assert entire_video_required is not None 1555 self.camera = camera 1556 self.port = port 1557 self.log_gphoto2 = log_gphoto2 1558 self.entire_video_required = entire_video_required 1559 self.entire_photo_required = entire_photo_required 1560 1561 1562class GenerateThumbnailsResults: 1563 def __init__(self, rpd_file: Optional[RPDFile]=None, 1564 thumbnail_bytes: Optional[bytes]=None, 1565 scan_id: Optional[int]=None, 1566 cache_dirs: Optional[CacheDirs]=None, 1567 camera_removed: Optional[bool]=None,) -> None: 1568 self.rpd_file = rpd_file 1569 # If thumbnail_bytes is None, there is no thumbnail 1570 self.thumbnail_bytes = thumbnail_bytes 1571 self.scan_id = scan_id 1572 self.cache_dirs = cache_dirs 1573 self.camera_removed = camera_removed 1574 1575 1576class ThumbnailExtractorArgument: 1577 def __init__(self, rpd_file: RPDFile, 1578 task: ExtractionTask, 1579 processing: Set[ExtractionProcessing], 1580 full_file_name_to_work_on: str, 1581 secondary_full_file_name: str, 1582 exif_buffer: Optional[bytearray], 1583 thumbnail_bytes: bytes, 1584 use_thumbnail_cache: bool, 1585 file_to_work_on_is_temporary: bool, 1586 write_fdo_thumbnail: bool, 1587 send_thumb_to_main: bool, 1588 force_exiftool: bool) -> None: 1589 self.rpd_file = rpd_file 1590 self.task = task 1591 self.processing = processing 1592 self.full_file_name_to_work_on = full_file_name_to_work_on 1593 self.secondary_full_file_name = secondary_full_file_name 1594 self.file_to_work_on_is_temporary = file_to_work_on_is_temporary 1595 
        self.exif_buffer = exif_buffer
        self.thumbnail_bytes = thumbnail_bytes
        self.use_thumbnail_cache = use_thumbnail_cache
        self.write_fdo_thumbnail = write_fdo_thumbnail
        self.send_thumb_to_main = send_thumb_to_main
        self.force_exiftool = force_exiftool


class RenameMoveFileManager(PushPullDaemonManager):
    """
    Manages the single instance daemon process that renames and moves
    files that have just been downloaded
    """

    # Emitted with (move_succeeded, rpd_file, download_count)
    message = pyqtSignal(bool, RPDFile, int)
    # Emitted with (stored_sequence_no, downloads_today)
    sequencesUpdate = pyqtSignal(int, list)
    renameProblems = pyqtSignal('PyQt_PyObject')

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.rename)
        self._process_name = 'Rename and Move File Manager'
        self._process_to_run = 'renameandmovefile.py'

    def process_sink_data(self):
        """
        Unpickle the worker's message and emit the signal matching its
        content: a rename/move result, renaming problems, or a sequence
        number update.
        """
        data = pickle.loads(self.content)  # type: RenameAndMoveFileResults
        if data.move_succeeded is not None:

            self.message.emit(data.move_succeeded, data.rpd_file, data.download_count)

        elif data.problems is not None:
            self.renameProblems.emit(data.problems)
        else:
            assert data.stored_sequence_no is not None
            assert data.downloads_today is not None
            assert isinstance(data.downloads_today, list)
            self.sequencesUpdate.emit(data.stored_sequence_no, data.downloads_today)


class ThumbnailDaemonManager(PushPullDaemonManager):
    """
    Manages the process that extracts thumbnails after the file
    has already been downloaded and that writes FreeDesktop.org
    thumbnails. Not to be confused with ThumbnailManagerPara, which
    manages thumbnailing using processes that run in parallel,
    one for each device.
1640 """ 1641 1642 message = pyqtSignal(RPDFile, QPixmap) 1643 1644 def __init__(self, logging_port: int) -> None: 1645 super().__init__(logging_port=logging_port, thread_name=ThreadNames.thumbnail_daemon) 1646 self._process_name = 'Thumbnail Daemon Manager' 1647 self._process_to_run = 'thumbnaildaemon.py' 1648 1649 def process_sink_data(self) -> None: 1650 data = pickle.loads(self.content) # type: GenerateThumbnailsResults 1651 if data.thumbnail_bytes is None: 1652 thumbnail = QPixmap() 1653 else: 1654 thumbnail = QImage.fromData(data.thumbnail_bytes) 1655 if thumbnail.isNull(): 1656 thumbnail = QPixmap() 1657 else: 1658 thumbnail = QPixmap.fromImage(thumbnail) 1659 self.message.emit(data.rpd_file, thumbnail) 1660 1661 1662class OffloadManager(PushPullDaemonManager): 1663 """ 1664 Handles tasks best run in a separate process 1665 """ 1666 1667 message = pyqtSignal(TemporalProximityGroups) 1668 downloadFolders = pyqtSignal(FoldersPreview) 1669 1670 def __init__(self, logging_port: int) -> None: 1671 super().__init__(logging_port=logging_port, thread_name=ThreadNames.offload) 1672 self._process_name = 'Offload Manager' 1673 self._process_to_run = 'offload.py' 1674 1675 def process_sink_data(self) -> None: 1676 data = pickle.loads(self.content) # type: OffloadResults 1677 if data.proximity_groups is not None: 1678 self.message.emit(data.proximity_groups) 1679 elif data.folders_preview is not None: 1680 self.downloadFolders.emit(data.folders_preview) 1681 1682 1683class ScanManager(PublishPullPipelineManager): 1684 """ 1685 Handles the processes that scan devices (cameras, external devices, 1686 this computer path) 1687 """ 1688 scannedFiles = pyqtSignal( 1689 'PyQt_PyObject', 'PyQt_PyObject', FileTypeCounter, 'PyQt_PyObject', bool, bool 1690 ) 1691 deviceError = pyqtSignal(int, CameraErrorCode) 1692 deviceDetails = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject', str) 1693 scanProblems = pyqtSignal(int, 'PyQt_PyObject') 1694 fatalError = pyqtSignal(int) 1695 
    cameraRemovedDuringScan = pyqtSignal(int)

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.scan)
        self._process_name = 'Scan Manager'
        self._process_to_run = 'scan.py'

    def process_sink_data(self) -> None:
        """
        Unpickle a ScanResults message and emit the signal that matches
        its content: scanned files, a device error, device details, scan
        problems, camera removal, or a fatal error.
        """
        data = pickle.loads(self.content)  # type: ScanResults
        if data.rpd_files is not None:
            assert data.file_type_counter
            assert data.file_size_sum
            assert data.entire_video_required is not None
            assert data.entire_photo_required is not None
            self.scannedFiles.emit(
                data.rpd_files,
                (data.sample_photo, data.sample_video),
                data.file_type_counter,
                data.file_size_sum,
                data.entire_video_required,
                data.entire_photo_required
            )
        else:
            assert data.scan_id is not None
            if data.error_code is not None:
                self.deviceError.emit(data.scan_id, data.error_code)
            elif data.optimal_display_name is not None:
                self.deviceDetails.emit(
                    data.scan_id, data.storage_space, data.storage_descriptions,
                    data.optimal_display_name
                )
            elif data.problems is not None:
                self.scanProblems.emit(data.scan_id, data.problems)
            elif data.camera_removed is not None:
                self.cameraRemovedDuringScan.emit(
                    data.scan_id
                )
            else:
                assert data.fatal_error
                self.fatalError.emit(data.scan_id)


class BackupManager(PublishPullPipelineManager):
    """
    Each backup "device" (it could be an external drive, or a user-
    specified path on the local file system) has associated with it one
    worker process. For example if photos and videos are both being
    backed up to the same external hard drive, one worker process
    handles both the photos and the videos. However if photos are being
    backed up to one drive, and videos to another, there would be a
    worker process for each drive (2 in total).
1746 """ 1747 message = pyqtSignal(int, bool, bool, RPDFile, str, 'PyQt_PyObject') 1748 bytesBackedUp = pyqtSignal('PyQt_PyObject', 'PyQt_PyObject') 1749 backupProblems = pyqtSignal(int, 'PyQt_PyObject') 1750 1751 def __init__(self, logging_port: int) -> None: 1752 super().__init__(logging_port=logging_port, thread_name=ThreadNames.backup) 1753 self._process_name = 'Backup Manager' 1754 self._process_to_run = 'backupfile.py' 1755 1756 def process_sink_data(self) -> None: 1757 data = pickle.loads(self.content) # type: BackupResults 1758 if data.total_downloaded is not None: 1759 assert data.scan_id is not None 1760 assert data.chunk_downloaded >= 0 1761 assert data.total_downloaded >= 0 1762 self.bytesBackedUp.emit(data.scan_id, data.chunk_downloaded) 1763 elif data.backup_succeeded is not None: 1764 assert data.do_backup is not None 1765 assert data.rpd_file is not None 1766 self.message.emit( 1767 data.device_id, data.backup_succeeded, data.do_backup, data.rpd_file, 1768 data.backup_full_file_name, data.mdata_exceptions 1769 ) 1770 else: 1771 assert data.problems is not None 1772 self.backupProblems.emit(data.device_id, data.problems) 1773 1774 1775class CopyFilesManager(PublishPullPipelineManager): 1776 """ 1777 Manage the processes that copy files from devices to the computer 1778 during the download process 1779 """ 1780 1781 message = pyqtSignal(bool, RPDFile, int, 'PyQt_PyObject') 1782 tempDirs = pyqtSignal(int, str,str) 1783 bytesDownloaded = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject') 1784 copyProblems = pyqtSignal(int, 'PyQt_PyObject') 1785 cameraRemoved = pyqtSignal(int) 1786 1787 def __init__(self, logging_port: int) -> None: 1788 super().__init__(logging_port=logging_port, thread_name=ThreadNames.copy) 1789 self._process_name = 'Copy Files Manager' 1790 self._process_to_run = 'copyfiles.py' 1791 1792 def process_sink_data(self) -> None: 1793 data = pickle.loads(self.content) # type: CopyFilesResults 1794 if data.total_downloaded is not None: 1795 
assert data.scan_id is not None 1796 if data.chunk_downloaded < 0: 1797 logging.critical("Chunk downloaded is less than zero: %s", data.chunk_downloaded) 1798 if data.total_downloaded < 0: 1799 logging.critical("Chunk downloaded is less than zero: %s", data.total_downloaded) 1800 1801 self.bytesDownloaded.emit(data.scan_id, data.total_downloaded, data.chunk_downloaded) 1802 1803 elif data.copy_succeeded is not None: 1804 assert data.rpd_file is not None 1805 assert data.download_count is not None 1806 self.message.emit( 1807 data.copy_succeeded, data.rpd_file, data.download_count, data.mdata_exceptions 1808 ) 1809 1810 elif data.problems is not None: 1811 self.copyProblems.emit(data.scan_id, data.problems) 1812 elif data.camera_removed is not None: 1813 self.cameraRemoved.emit(data.scan_id) 1814 1815 else: 1816 assert (data.photo_temp_dir is not None and 1817 data.video_temp_dir is not None) 1818 assert data.scan_id is not None 1819 self.tempDirs.emit(data.scan_id, data.photo_temp_dir, data.video_temp_dir)