1# Copyright (C) 2015-2020 Damon Lynch <damonlynch@gmail.com>
2
3# This file is part of Rapid Photo Downloader.
4#
5# Rapid Photo Downloader is free software: you can redistribute it and/or
6# modify it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Rapid Photo Downloader is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Rapid Photo Downloader.  If not,
17# see <http://www.gnu.org/licenses/>.
18
19__author__ = 'Damon Lynch'
20__copyright__ = "Copyright 2015-2020, Damon Lynch"
21
22import argparse
23import sys
24import logging
25import pickle
26import os
27import shlex
28import time
29from collections import deque, namedtuple
30from typing import Optional, Set, List, Dict, Sequence, Any, Tuple, Union
31
32
33import psutil
34
35from PyQt5.QtCore import (pyqtSignal, QObject, pyqtSlot)
36from PyQt5.QtGui import (QPixmap, QImage)
37
38import zmq
39import zmq.log.handlers
40if zmq.pyzmq_version_info()[0] < 17:
41    from zmq.eventloop import ioloop
42else:
43    try:
44        from tornado import ioloop
45    except ImportError:
46        from zmq.eventloop import ioloop  # note: deprecated in pyzmq 17.0.0
47
48from zmq.eventloop.zmqstream import ZMQStream
49
50from raphodo.rpdfile import RPDFile, FileTypeCounter, FileSizeSum, Photo, Video
51from raphodo.devices import Device
52from raphodo.utilities import CacheDirs, set_pdeathsig
53from raphodo.constants import (
54    RenameAndMoveStatus, ExtractionTask, ExtractionProcessing, CameraErrorCode, FileType,
55    FileExtension, BackupStatus
56)
57from raphodo.proximity import TemporalProximityGroups
58from raphodo.storage import StorageSpace
59from raphodo.iplogging import ZeroMQSocketHandler
60from raphodo.viewutils import ThumbnailDataForProximity
61from raphodo.folderspreview import DownloadDestination, FoldersPreview
62from raphodo.problemnotification import (
63    ScanProblems, CopyingProblems, RenamingProblems, BackingUpProblems
64)
65
# Use the root logger so records from this module are handled together with
# those relayed from the worker processes via ProcessLoggerPublisher
logger = logging.getLogger()
67
68
def make_filter_from_worker_id(worker_id: Union[int, str]) -> bytes:
    r"""
    Returns a python byte string from an integer or string

    >>> make_filter_from_worker_id(54)
    b'54'

    >>> make_filter_from_worker_id('54')
    b'54'

    :param worker_id: the worker id, as an int or str
    :return: the worker id encoded as bytes, suitable for a 0mq filter
    :raise TypeError: if worker_id is neither an int nor a str
    """
    if isinstance(worker_id, int):
        return str(worker_id).encode()
    if isinstance(worker_id, str):
        return worker_id.encode()
    # Bug fix: raise an exception instance with a useful message instead of
    # the bare class via the unidiomatic ``raise(TypeError)``
    raise TypeError(
        'worker_id must be an int or str, not {}'.format(type(worker_id).__name__)
    )
84
85
def create_identity(worker_type: str, identity: str) -> bytes:
    r"""Generate identity for a worker's 0mq socket.

    >>> create_identity('Worker', '1')
    b'Worker-1'
    >>> create_identity('Thumbnail Extractor', '2')
    b'Thumbnail-Extractor-2'
    >>> create_identity('Thumbnail Extractor Plus', '22 2')
    b'Thumbnail-Extractor-Plus-22-2'
    """

    # 0mq identities must not contain whitespace: collapse each run of
    # whitespace in both components to a single hyphen, then join them.
    cleaned_type = '-'.join(worker_type.split())
    cleaned_identity = '-'.join(identity.split())
    return '{}-{}'.format(cleaned_type, cleaned_identity).encode()
99
100
def get_worker_id_from_identity(identity: bytes) -> int:
    r"""Extract worker id from the identity used in a 0mq socket

    >>> get_worker_id_from_identity(b'Worker-1')
    1
    >>> get_worker_id_from_identity(b'Thumbnail-Extractor-2')
    2
    >>> get_worker_id_from_identity(b'Thumbnail-Extractor-Plus-22-2')
    2
    """

    # The worker id is always the last hyphen-separated component
    _, _, worker_id = identity.decode().rpartition('-')
    return int(worker_id)
112
113
def create_inproc_msg(cmd: bytes,
                      worker_id: Optional[int]=None,
                      data: Optional[Any]=None) -> List[bytes]:
    """
    Create a list of three values to be sent via a PAIR socket
    between main and child threads using 0MQ.

    :param cmd: the directive, already in bytes
    :param worker_id: optional worker the message is aimed at
    :param data: optional payload, pickled before sending
    :return: the three message frames [cmd, worker_id, data]
    """

    id_frame = b'' if worker_id is None else make_filter_from_worker_id(worker_id)
    data_frame = b'' if data is None else pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
    return [cmd, id_frame, data_frame]
133
134
class ThreadNames:
    """
    Names of the manager threads, used as the endpoint name in the
    'inproc://{thread_name}' PAIR socket connections between the main
    thread and each manager.
    """
    rename = 'rename'
    scan = 'scan'
    copy = 'copy'
    backup = 'backup'
    thumbnail_daemon = 'thumbnail_daemon'
    thumbnailer = 'thumbnailer'
    offload = 'offload'
    logger = 'logger'
    load_balancer = 'load_balancer'
    new_version = 'new_version'
146
147
class ProcessManager:
    """
    Launch and keep track of a set of worker child processes.

    Subclasses set self._process_to_run to the script to execute and
    implement _get_command_line() to supply its arguments.
    """

    def __init__(self, logging_port: int,
                 thread_name: str) -> None:

        super().__init__()

        # Port workers use to publish their log messages to the main process
        self.logging_port = logging_port

        self.processes = {}  # type: Dict[int, psutil.Process]
        self._process_to_run = ''  # Implement in subclass

        self.thread_name = thread_name

        # Monitor which workers we have running
        self.workers = []  # type: List[int]

    def _get_cmd(self) -> str:
        """Return the interpreter and the full path of the script to run."""
        return '{} {}'.format(
            sys.executable, os.path.join(
                os.path.abspath(os.path.dirname(__file__)), self._process_to_run
            )
        )

    def _get_command_line(self, worker_id: int) -> str:
        """
        Implement in subclass
        """
        return ''

    def add_worker(self, worker_id: int) -> None:
        """
        Launch a new worker process and begin monitoring it.

        Exits the program if the process cannot be started.
        """

        command_line = self._get_command_line(worker_id)
        args = shlex.split(command_line)

        # run command immediately, without waiting a reply, and instruct the Linux
        # kernel to send a terminate signal should this process unexpectedly die
        try:
            proc = psutil.Popen(args, preexec_fn=set_pdeathsig())
        except OSError as e:
            logging.critical("Failed to start process: %s", command_line)
            logging.critical('OSError [Errno %s]: %s', e.errno, e.strerror)
            if e.errno == 8:
                # Errno 8 is ENOEXEC: the kernel could not execute the script
                logging.critical(
                    "Script shebang line might be malformed or missing: %s", self._get_cmd()
                )
            sys.exit(1)
        logging.debug("Started '%s' with pid %s", command_line, proc.pid)

        # Add to list of running workers
        self.workers.append(worker_id)
        self.processes[worker_id] = proc

    def forcefully_terminate(self) -> None:
        """
        Forcefully terminate any running child processes.
        """

        zombie_processes = [
            p for p in self.processes.values()
            if p.is_running() and p.status() == psutil.STATUS_ZOMBIE
        ]
        running_processes = [
            p for p in self.processes.values()
            if p.is_running() and p.status() != psutil.STATUS_ZOMBIE
        ]
        if hasattr(self, '_process_name'):
            logging.debug(
                "Forcefully terminating processes for %s: %s zombies, %s running.",
                self._process_name, len(zombie_processes), len(running_processes)
            )

        for p in zombie_processes:  # type: psutil.Process
            try:
                logging.debug("Killing zombie process %s with pid %s", p.name(), p.pid)
                p.kill()
            # Bug fix: catch Exception, not everything -- a bare except also
            # swallows SystemExit and KeyboardInterrupt
            except Exception:
                logging.error("Failed to kill process with pid %s", p.pid)
        for p in running_processes:  # type: psutil.Process
            try:
                logging.debug("Terminating process %s with pid %s", p.name(), p.pid)
                p.terminate()
            except Exception:
                logging.error("Terminating process with pid %s failed", p.pid)
        # Give terminated processes a moment to exit, then kill any stragglers
        gone, alive = psutil.wait_procs(running_processes, timeout=2)
        for p in alive:
            try:
                # Bug fix: these are survivors of terminate(), not zombies
                logging.debug("Killing process %s with pid %s", p.name(), p.pid)
                p.kill()
            except Exception:
                logging.error("Failed to kill process with pid %s", p.pid)

    def process_alive(self, worker_id: int) -> bool:
        """
        Process IDs are reused by the system. Check to make sure
        a new process has not been created with the same process id.

        :param worker_id: the process to check
        :return True if the process is the same, False otherwise
        """

        return self.processes[worker_id].is_running()
249
250
class PullPipelineManager(ProcessManager, QObject):
    """
    Base class from which more specialized 0MQ classes are derived.

    Receives data into its sink via a ZMQ PULL socket, but does not
    specify how workers should be sent data.

    Outputs signals using Qt.
    """

    message = pyqtSignal(str)  # Derived class will change this
    sinkStarted = pyqtSignal()
    workerFinished = pyqtSignal(int)
    workerStopped = pyqtSignal(int)
    receiverPortSignal = pyqtSignal(int)

    def __init__(self, logging_port: int,
                 thread_name: str) -> None:
        super().__init__(logging_port=logging_port, thread_name=thread_name)

    def _start_sockets(self) -> None:
        """
        Create the 0MQ sockets the sink needs. Called from run_sink(),
        i.e. in the thread the sink runs in.
        """

        context = zmq.Context.instance()

        # Subclasses must define the type of port they need to send messages
        self.ventilator_socket = None
        self.ventilator_port = None

        # Sink socket to receive results of the workers
        self.receiver_socket = context.socket(zmq.PULL)
        self.receiver_port = self.receiver_socket.bind_to_random_port('tcp://*')

        # Socket to communicate directly with the sink, bypassing the workers
        self.terminate_socket = context.socket(zmq.PUSH)
        self.terminate_socket.connect("tcp://localhost:{}".format(self.receiver_port))

        # Socket to receive commands from main thread
        self.thread_controller = context.socket(zmq.PAIR)
        self.thread_controller.connect('inproc://{}'.format(self.thread_name))

        self.terminating = False

    @pyqtSlot()
    def run_sink(self) -> None:
        """
        The sink's event loop: poll the receiver and the thread controller
        sockets, dispatching worker results and main-thread directives,
        until a KILL command or the last worker finishes while terminating.
        """
        logging.debug("Running sink for %s", self._process_name)

        self._start_sockets()

        poller = zmq.Poller()
        poller.register(self.receiver_socket, zmq.POLLIN)
        poller.register(self.thread_controller, zmq.POLLIN)

        self.receiverPortSignal.emit(self.receiver_port)
        self.sinkStarted.emit()

        while True:
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                break
            if self.receiver_socket in socks:
                # Receive messages from the workers
                # (or the terminate socket)
                worker_id, directive, content = self.receiver_socket.recv_multipart()

                if directive == b'cmd':
                    command = content
                    assert command in (b"STOPPED", b"FINISHED", b"KILL")
                    if command == b"KILL":
                        # Terminate immediately, without regard for any
                        # incoming messages. This message is only sent
                        # from this manager to itself, using the
                        # self.terminate_socket
                        logging.debug("{} is terminating".format(self._process_name))
                        break
                    # This worker is done; remove from monitored workers and
                    # continue
                    worker_id = int(worker_id)
                    if command == b"STOPPED":
                        logging.debug("%s worker %s has stopped", self._process_name, worker_id)
                        self.workerStopped.emit(worker_id)
                    else:
                        # Worker has finished its work
                        self.workerFinished.emit(worker_id)
                    self.workers.remove(worker_id)
                    del self.processes[worker_id]
                    if not self.workers:
                        logging.debug("{} currently has no workers".format(self._process_name))
                    if not self.workers and self.terminating:
                        logging.debug("{} is exiting".format(self._process_name))
                        break
                else:
                    assert directive == b'data'
                    self.content = content
                    self.process_sink_data()

            if self.thread_controller in socks:
                # Receive messages from the main Rapid Photo Downloader thread
                self.process_thread_directive()

    def process_thread_directive(self) -> None:
        """
        Dispatch a directive received from the main thread to the
        corresponding handler method.
        """
        directive, worker_id, data = self.thread_controller.recv_multipart()

        # Directives: START, STOP, TERMINATE, SEND_TO_WORKER, STOP_WORKER, START_WORKER
        if directive == b'START':
            self.start()
        elif directive == b'START_WORKER':
            self.start_worker(worker_id=worker_id, data=data)
        elif directive == b'SEND_TO_WORKER':
            self.send_message_to_worker(worker_id=worker_id, data=data)
        elif directive == b'STOP':
            self.stop()
        elif directive == b'STOP_WORKER':
            self.stop_worker(worker_id=worker_id)
        elif directive == b'PAUSE':
            self.pause()
        elif directive == b'RESUME':
            self.resume(worker_id=worker_id)
        elif directive == b'TERMINATE':
            self.forcefully_terminate()
        else:
            # Bug fix: the format string has two placeholders, so both the
            # process name and the directive must be supplied as arguments
            logging.critical(
                "%s received unknown directive %s", self._process_name, directive.decode()
            )

    def process_sink_data(self) -> None:
        """Unpickle the received worker data and emit it via the Qt signal."""
        data = pickle.loads(self.content)
        self.message.emit(data)

    def terminate_sink(self) -> None:
        """Send the KILL command to our own sink, ending its event loop."""
        self.terminate_socket.send_multipart([b'0', b'cmd', b'KILL'])

    def _get_ventilator_start_message(self, worker_id: bytes) -> list:
        """Return the frames used to tell a worker to start."""
        return [worker_id, b'cmd', b'START']

    def start(self) -> None:
        logging.critical(
            "Member function start() not implemented in child class of %s", self._process_name
        )

    def start_worker(self, worker_id: bytes, data: bytes) -> None:
        logging.critical(
            "Member function start_worker() not implemented in child class of %s",
            self._process_name
        )

    def stop(self) -> None:
        logging.critical(
            "Member function stop() not implemented in child class of %s", self._process_name
        )

    def stop_worker(self, worker_id: int) -> None:
        logging.critical(
            "Member function stop_worker() not implemented in child class of %s",
            self._process_name
        )

    def pause(self) -> None:
        logging.critical(
            "Member function pause() not implemented in child class of %s", self._process_name
        )

    def resume(self, worker_id: Optional[bytes]) -> None:
        # Bug fix: this message previously named stop_worker() instead of resume()
        logging.critical(
            "Member function resume() not implemented in child class of %s", self._process_name
        )

    def send_message_to_worker(self, data: bytes, worker_id: Optional[bytes]=None) -> None:
        """
        Send data to a single worker (when worker_id is given) or broadcast
        it, unless the manager is terminating or has no workers.
        """
        if self.terminating:
            logging.debug(
                "%s not sending message to worker because manager is terminated", self._process_name
            )
            return
        if not self.workers:
            logging.debug(
                "%s not sending message to worker because there are no workers", self._process_name
            )
            return

        assert isinstance(data, bytes)

        if worker_id:
            message = [worker_id, b'data', data]
        else:
            message = [b'data', data]
        self.ventilator_socket.send_multipart(message)

    def forcefully_terminate(self) -> None:
        """
        Forcefully terminate any child processes and clean up.

        Shuts down the sink too.
        """

        super().forcefully_terminate()
        self.terminate_sink()
444
445
class LoadBalancerWorkerManager(ProcessManager):
    """
    Process manager for the pool of workers that sit behind a load
    balancer, passing each worker the ports it must use.
    """

    def __init__(self, no_workers: int,
                 backend_port: int,
                 sink_port: int,
                 logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name='')
        self.no_workers = no_workers
        self.backend_port = backend_port
        self.sink_port = sink_port

    def _get_command_line(self, worker_id: int) -> str:
        """Build the command line used to launch one worker process."""
        return '{} --request {} --send {} --identity {} --logging {}'.format(
            self._get_cmd(),
            self.backend_port,
            self.sink_port,
            worker_id,
            self.logging_port
        )

    def start_workers(self) -> None:
        """Launch the full pool of worker processes."""
        for wid in range(self.no_workers):
            self.add_worker(wid)

    def zombie_workers(self) -> List[int]:
        """Return the ids of workers whose processes have become zombies."""
        zombies = []
        for worker_id in self.workers:
            if self.processes[worker_id].status() == psutil.STATUS_ZOMBIE:
                zombies.append(worker_id)
        return zombies
476
477
class LRUQueue:
    """LRUQueue class using ZMQStream/IOLoop for event dispatching"""

    def __init__(self, backend_socket: zmq.Socket,
                 frontend_socket: zmq.Socket,
                 controller_socket: zmq.Socket,
                 worker_type: str,
                 process_manager: LoadBalancerWorkerManager) -> None:

        self.worker_type = worker_type
        self.process_manager = process_manager
        # Ready workers, least recently used at the left end of the deque
        self.workers = deque()
        self.terminating = False
        # Workers sent a STOP that have not yet acknowledged it
        self.terminating_workers = set()  # type: Set[bytes]
        # Worker ids whose STOPPED acknowledgement has arrived
        self.stopped_workers = set()  # type: Set[int]

        # Wrap the sockets in streams so the IOLoop dispatches callbacks
        self.backend = ZMQStream(backend_socket)
        self.frontend = ZMQStream(frontend_socket)
        self.controller = ZMQStream(controller_socket)
        self.backend.on_recv(self.handle_backend)
        self.controller.on_recv(self.handle_controller)

        self.loop = ioloop.IOLoop.instance()

    def handle_controller(self, msg):
        # Any message on the controller socket means: shut everything down.
        # Send STOP to every ready worker, then stop the IOLoop after a
        # grace period whether or not all workers have acknowledged.
        self.terminating = True

        while len(self.workers):
            worker_identity = self.workers.popleft()

            logging.debug(
                "%s load balancer sending stop cmd to worker %s",
                self.worker_type, worker_identity.decode()
            )
            self.backend.send_multipart([worker_identity, b'', b'cmd', b'STOP'])
            self.terminating_workers.add(worker_identity)

        self.loop.add_timeout(time.time()+3, self.loop.stop)

    def handle_backend(self, msg):
        # A worker wrote to the backend: either it is (again) ready for
        # work, or it is acknowledging a STOP while we are terminating.

        # Queue worker address for LRU routing
        worker_identity, empty, client_addr = msg[:3]

        # add worker back to the list of workers
        self.workers.append(worker_identity)

        zw = self.process_manager.zombie_workers()
        if zw:
            # NOTE(review): message text assumes thumbnail extractors, but
            # this queue is generic over worker_type -- confirm intent
            logging.critical("%s dead thumbnail extractors", len(zw))

        # Second frame is empty
        assert empty == b''

        if msg[-1] == b'STOPPED' and self.terminating:
            worker_id = get_worker_id_from_identity(worker_identity)
            self.stopped_workers.add(worker_id)
            self.terminating_workers.remove(worker_identity)
            if len(self.terminating_workers) == 0:
                # All workers have acknowledged: reap each child process so
                # no zombies are left behind, then stop the loop shortly
                for worker_id in self.stopped_workers:
                    p = self.process_manager.processes[worker_id]  # type: psutil.Process
                    if p.is_running():
                        pid = p.pid
                        if p.status() != psutil.STATUS_SLEEPING:
                            logging.debug(
                                "Waiting on %s process %s...", p.status(), pid
                            )
                            os.waitpid(pid, 0)
                            logging.debug("...process %s is finished", pid)
                        else:
                            logging.debug("Process %s is sleeping", pid)
                self.loop.add_timeout(time.time()+0.5, self.loop.stop)

        if len(self.workers) == 1:
            # on first recv, start accepting frontend messages
            self.frontend.on_recv(self.handle_frontend)

    def handle_frontend(self, request):
        # Route an incoming work request to the least recently used worker

        #  Dequeue and drop the next worker address
        worker_identity = self.workers.popleft()

        message = [worker_identity, b''] + request
        self.backend.send_multipart(message)
        if len(self.workers) == 0:
            # stop receiving until workers become available again
            self.frontend.stop_on_recv()
563
564
class LoadBalancer:
    """
    Runs in its own process: routes work arriving on a frontend PULL
    socket to the least-recently-used worker via a ROUTER backend socket,
    using an LRUQueue driven by the 0MQ IOLoop.
    """

    def __init__(self, worker_type: str, process_manager) -> None:

        # All ports are supplied on the command line by the process that
        # launched this load balancer
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("--receive", required=True)
        self.parser.add_argument("--send", required=True)
        self.parser.add_argument("--controller", required=True)
        self.parser.add_argument("--logging", required=True)

        args = self.parser.parse_args()
        self.controller_port = args.controller

        context = zmq.Context()
        # Frontend: work requests arrive here
        frontend = context.socket(zmq.PULL)
        frontend_port = frontend.bind_to_random_port('tcp://*')

        # Backend: ROUTER socket used to address individual workers
        backend = context.socket(zmq.ROUTER)
        backend_port = backend.bind_to_random_port('tcp://*')

        # Handshake socket with the manager that launched us
        reply = context.socket(zmq.REP)
        reply.connect("tcp://localhost:{}".format(args.receive))

        # The manager signals shutdown on this socket
        controller = context.socket(zmq.PULL)
        controller.connect('tcp://localhost:{}'.format(self.controller_port))

        sink_port = args.send
        logging_port = args.logging

        # Relay this process's log records back to the main process
        self.logger_publisher = ProcessLoggerPublisher(
            context=context, name=worker_type, notification_port=args.logging
        )

        logging.debug(
            "{} load balancer waiting to be notified how many workers to initialize...".format(
                worker_type
            )
        )
        # The manager sends the worker count; we answer with the frontend port
        no_workers = int(reply.recv())
        logging.debug("...{} load balancer will use {} workers".format(worker_type, no_workers))
        reply.send(str(frontend_port).encode())

        # process_manager is a class, e.g. LoadBalancerWorkerManager:
        # instantiate it and launch the worker pool
        process_manager = process_manager(no_workers, backend_port, sink_port, logging_port)
        process_manager.start_workers()

        # create queue with the sockets
        queue = LRUQueue(backend, frontend, controller, worker_type, process_manager)

        # start reactor, which is an infinite loop
        ioloop.IOLoop.instance().start()

        # Finished infinite loop: do some housekeeping
        logging.debug("Forcefully terminating load balancer child processes")
        process_manager.forcefully_terminate()

        frontend.close()
        backend.close()
621
622
class LoadBalancerManager(ProcessManager, QObject):
    """
    Launches and requests termination of the Load Balancer process
    """

    load_balancer_started = pyqtSignal(int)

    def __init__(self, context: zmq.Context,
                 no_workers: int,
                 sink_port: int,
                 logging_port: int,
                 thread_name: str) -> None:
        super().__init__(logging_port=logging_port, thread_name=thread_name)
        self.no_workers = no_workers
        self.sink_port = sink_port
        self.context = context

    @pyqtSlot()
    def start_load_balancer(self) -> None:
        """
        Launch the load balancer process, perform the startup handshake,
        then block until the main thread tells us to stop it.
        """

        # Socket used to tell the load balancer process to shut down
        self.controller_socket = self.context.socket(zmq.PUSH)
        self.controller_port = self.controller_socket.bind_to_random_port('tcp://*')

        # REQ/REP handshake: we send the worker count, it replies with the
        # port its frontend is bound to
        self.requester = self.context.socket(zmq.REQ)
        self.requester_port = self.requester.bind_to_random_port('tcp://*')

        # Commands arriving from the main thread
        self.thread_controller = self.context.socket(zmq.PAIR)
        self.thread_controller.connect('inproc://{}'.format(self.thread_name))

        worker_id = 0
        self.add_worker(worker_id)
        self.requester.send(str(self.no_workers).encode())
        self.frontend_port = int(self.requester.recv())
        self.load_balancer_started.emit(self.frontend_port)

        # Block until the main thread signals stop
        directive, worker_id, data = self.thread_controller.recv_multipart()
        assert directive == b'STOP'
        self.stop()

    def stop(self):
        """Tell the load balancer process to shut itself down."""
        self.controller_socket.send(b'STOP')

    def _get_command_line(self, worker_id: int) -> str:
        """Build the command line used to launch the load balancer."""
        return '{} --receive {} --send {} --controller {} --logging {}'.format(
            self._get_cmd(),
            self.requester_port,
            self.sink_port,
            self.controller_port,
            self.logging_port
        )
675
# The single worker managed by a PushPullDaemonManager always has id 0
DAEMON_WORKER_ID = 0
677
678
class PushPullDaemonManager(PullPipelineManager):
    """
    Manage a single instance daemon worker process that waits to work on data
    issued by this manager. The data to be worked on is issued in sequence,
    one after the other.

    Because this is a single daemon process, a Push-Pull model is most
    suitable for sending the data.
    """

    def _start_sockets(self) -> None:
        """Add a PUSH ventilator socket to those created by the base class."""

        super()._start_sockets()

        context = zmq.Context.instance()

        # Only one worker, so a plain push socket suffices to send it work
        self.ventilator_socket = context.socket(zmq.PUSH)
        self.ventilator_port = self.ventilator_socket.bind_to_random_port('tcp://*')

    def stop(self) -> None:
        """
        Permanently stop the daemon process and terminate
        """

        logging.debug("{} halting".format(self._process_name))
        self.terminating = True

        if not self.process_alive(DAEMON_WORKER_ID):
            # The process may have crashed. Stop the sink.
            self.terminate_sink()
            return

        # Process is still running: send it the stop command without blocking
        try:
            self.ventilator_socket.send_multipart([b'cmd', b'STOP'], zmq.DONTWAIT)
        except zmq.Again:
            logging.debug(
                "Terminating %s sink because child process did not receive message",
                self._process_name
            )
            self.terminate_sink()

    def _get_command_line(self, worker_id: int) -> str:
        """Build the command line used to launch the daemon process."""

        return '{} --receive {} --send {} --logging {}'.format(
            self._get_cmd(),
            self.ventilator_port,
            self.receiver_port,
            self.logging_port
        )

    def _get_ventilator_start_message(self, worker_id: int) -> List[bytes]:
        # A single worker needs no id filter frame
        return [b'cmd', b'START']

    def start(self) -> None:
        """Launch the one and only daemon worker."""
        logging.debug("Starting worker for %s", self._process_name)
        self.add_worker(worker_id=DAEMON_WORKER_ID)
737
738
739class PublishPullPipelineManager(PullPipelineManager):
740    """
741    Manage a collection of worker processes that wait to work on data
742    issued by this manager. The data to be worked on is issued in sequence,
743    one after the other, either once, or many times.
744
745    Because there are multiple worker process, a Publish-Subscribe model is
746    most suitable for sending data to workers.
747    """
748
749    def _start_sockets(self) -> None:
750
751        super()._start_sockets()
752
753        context = zmq.Context.instance()
754
755        # Ventilator socket to send messages to workers on
756        self.ventilator_socket = context.socket(zmq.PUB)
757        self.ventilator_port= self.ventilator_socket.bind_to_random_port('tcp://*')
758
759        # Socket to synchronize the start of each worker
760        self.sync_service_socket = context.socket(zmq.REP)
761        self.sync_service_port = self.sync_service_socket.bind_to_random_port("tcp://*")
762
763        # Socket for worker control: pause, resume, stop
764        self.controller_socket = context.socket(zmq.PUB)
765        self.controller_port = self.controller_socket.bind_to_random_port("tcp://*")
766
767    def stop(self) -> None:
768        """
769        Permanently stop all the workers and terminate
770        """
771
772        logging.debug("{} halting".format(self._process_name))
773        self.terminating = True
774        if self.workers:
775            # Signal workers they must immediately stop
776            termination_signal_sent = False
777            alive_workers = [worker_id for worker_id in self.workers if
778                             self.process_alive(worker_id)]
779            for worker_id in alive_workers:
780
781                message = [make_filter_from_worker_id(worker_id),b'STOP']
782                self.controller_socket.send_multipart(message)
783
784                message = [make_filter_from_worker_id(worker_id), b'cmd', b'STOP']
785                self.ventilator_socket.send_multipart(message)
786                termination_signal_sent = True
787
788            if not termination_signal_sent:
789                self.terminate_sink()
790        else:
791            self.terminate_sink()
792
793    def stop_worker(self, worker_id: bytes) -> None:
794        """
795        Permanently stop one worker
796        """
797
798        if int(worker_id) in self.workers:
799            message = [worker_id, b'STOP']
800            self.controller_socket.send_multipart(message)
801            message = [worker_id, b'cmd', b'STOP']
802            self.ventilator_socket.send_multipart(message)
803
804    def start_worker(self, worker_id: bytes, data: bytes) -> None:
805
806        self.add_worker(int(worker_id))
807
808        # Send START commands until scan worker indicates it is ready to
809        # receive data
810        # Worker ID must be in bytes format
811        while True:
812            self.ventilator_socket.send_multipart(
813                self._get_ventilator_start_message(worker_id))
814            try:
815                # look for synchronization request
816                self.sync_service_socket.recv(zmq.DONTWAIT)
817                # send synchronization reply
818                self.sync_service_socket.send(b'')
819                break
820            except zmq.Again:
821                # Briefly pause sending out START messages
822                # There is no point flooding the network
823                time.sleep(.01)
824
825        # Send data to process to tell it what to work on
826        self.send_message_to_worker(data=data, worker_id=worker_id)
827
828    def _get_command_line(self, worker_id: int) -> str:
829        cmd = self._get_cmd()
830
831        return '{} --receive {} --send {} --controller {} --syncclient {} --filter {} --logging '\
832               '{}'.format(
833            cmd,
834            self.ventilator_port,
835            self.receiver_port,
836            self.controller_port,
837            self.sync_service_port,
838            worker_id,
839            self.logging_port
840        )
841
    def __len__(self) -> int:
        """Return the number of workers currently being managed."""
        return len(self.workers)
844
    def __contains__(self, item) -> bool:
        """Report whether a worker with this id is being managed."""
        return item in self.workers
847
848    def pause(self) -> None:
849        for worker_id in self.workers:
850            message = [make_filter_from_worker_id(worker_id), b'PAUSE']
851            self.controller_socket.send_multipart(message)
852
853    def resume(self, worker_id: bytes) -> None:
854        if worker_id:
855            workers = [int(worker_id)]
856        else:
857            workers = self.workers
858        for worker_id in workers:
859            message = [make_filter_from_worker_id(worker_id), b'RESUME']
860            self.controller_socket.send_multipart(message)
861
862
class ProcessLoggerPublisher:
    """
    Setup the sockets for worker processes to send log messages to the
    main process.

    Two tasks: set up the PUB socket, and then tell the main process
    what port we're using via a second socket, and when we're closing it.
    """

    def __init__(self, context: zmq.Context, name: str, notification_port: int) -> None:
        # NOTE(review): the name parameter is accepted but never used here;
        # confirm whether it was meant to tag the handler or logger

        # PUB socket on a random port, over which pickled log records travel
        self.logger_pub = context.socket(zmq.PUB)
        self.logger_pub_port = self.logger_pub.bind_to_random_port("tcp://*")
        self.handler = ZeroMQSocketHandler(self.logger_pub)
        self.handler.setLevel(logging.DEBUG)

        # Attach the 0MQ handler to the root logger so every record in this
        # process is forwarded to the main process
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

        # Tell the main process which port to subscribe to
        self.logger_socket = context.socket(zmq.PUSH)
        self.logger_socket.connect("tcp://localhost:{}".format(notification_port))
        self.logger_socket.send_multipart([b'CONNECT', str(self.logger_pub_port).encode()])

    def close(self):
        # Detach the handler, tell the main process to unsubscribe from our
        # port, and tear down both sockets
        self.logger.removeHandler(self.handler)
        self.logger_socket.send_multipart([b'DISCONNECT', str(self.logger_pub_port).encode()])
        self.logger_pub.close()
        self.logger_socket.close()
892
893
class WorkerProcess():
    """
    Base class for worker processes launched by a manager in the main
    process. Parses the ports passed on the command line and implements
    the START / synchronization handshake with the manager.
    """

    def __init__(self, worker_type: str) -> None:
        super().__init__()
        self.parser = argparse.ArgumentParser()
        # Ports used to talk to the manager in the main process
        for argument in ("--receive", "--send", "--logging"):
            self.parser.add_argument(argument, required=True)

    def cleanup_pre_stop(self) -> None:
        """
        Operations to run if process is stopped.

        Implement in child class if needed.
        """

        pass

    def setup_logging_pub(self, notification_port: int, name: str) -> None:
        """
        Sets up the 0MQ socket that sends out logging messages

        :param notification_port: port that should be notified about
         the new logging publisher
        :param name: descriptive name to place in the log messages
        """

        # Tag multi-instance workers with their id so log sources are
        # distinguishable
        if self.worker_id is not None:
            name = '{}-{}'.format(name, self.worker_id.decode())
        self.logger_publisher = ProcessLoggerPublisher(
            context=self.context, name=name, notification_port=notification_port
        )

    def send_message_to_sink(self) -> None:
        """Push self.content to the manager's sink, tagged with our id."""

        self.sender.send_multipart([self.worker_id, b'data', self.content])

    def initialise_process(self) -> None:
        """Perform the START handshake, then wait for the data payload."""

        # Wait to receive the initial "START" command
        worker_id, directive, content = self.receiver.recv_multipart()
        assert directive == b'cmd'
        assert content == b'START'

        # Send a synchronization request, then wait for the reply
        self.sync_client.send(b'')
        self.sync_client.recv()

        # Further "START" messages may still be in flight: discard them
        # until the data message arrives
        directive = None
        while directive != b'data':
            worker_id, directive, content = self.receiver.recv_multipart()
            if directive != b'data':
                assert directive == b'cmd'
                assert content == b'START'

        self.content = content

    def do_work(self):
        """Implement the worker's task in a child class."""
        pass
955
956
class DaemonProcess(WorkerProcess):
    """
    Single instance worker process.

    Because there is only ever one instance, a dummy worker id
    (DAEMON_WORKER_ID) is used when messaging the sink.
    """
    def __init__(self, worker_type: str) -> None:
        super().__init__(worker_type)

        args = self.parser.parse_args()

        self.context = zmq.Context()
        # Socket to send messages along the pipe to
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.set_hwm(10)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        # Socket to receive commands and data from the manager
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.connect("tcp://localhost:{}".format(args.receive))

        # No per-worker id: see the class docstring
        self.worker_id = None

        self.setup_logging_pub(notification_port=args.logging, name=worker_type)

    def run(self) -> None:
        """Implement the daemon's event loop in a child class."""
        pass

    def check_for_command(self, directive: bytes, content: bytes) -> None:
        """
        Exit cleanly if the manager has sent a STOP command.

        :param directive: the directive received from the manager
        :param content: the command's content; STOP is the only one expected
        """
        if directive == b'cmd':
            assert content == b'STOP'
            self.cleanup_pre_stop()
            # signal to sink that we've terminated before finishing
            self.sender.send_multipart(
                [make_filter_from_worker_id(DAEMON_WORKER_ID), b'cmd', b'STOPPED']
            )
            sys.exit(0)

    def send_message_to_sink(self) -> None:
        # Must use a dummy value for the worker id, as there is only ever one
        # instance.
        self.sender.send_multipart(
            [make_filter_from_worker_id(DAEMON_WORKER_ID), b'data', self.content]
        )
998
999
class WorkerInPublishPullPipeline(WorkerProcess):
    """
    Worker counterpart to PublishPullPipelineManager; multiple instance.

    Each instance subscribes to messages filtered by its own worker id,
    and exits via sys.exit(0) when told to STOP.
    """
    def __init__(self, worker_type: str) -> None:
        """
        :param worker_type: descriptive name used in log messages
        """
        super().__init__(worker_type)
        self.add_args()

        args = self.parser.parse_args()

        # The subscription filter doubles as this worker's id
        subscription_filter = self.worker_id = args.filter.encode()
        self.context = zmq.Context()

        self.setup_sockets(args, subscription_filter)
        self.setup_logging_pub(notification_port=args.logging, name=worker_type)

        self.initialise_process()
        self.do_work()

    def add_args(self) -> None:
        """Add command line arguments beyond those of WorkerProcess."""
        self.parser.add_argument("--filter", required=True)
        self.parser.add_argument("--syncclient", required=True)
        self.parser.add_argument("--controller", required=True)

    def setup_sockets(self, args, subscription_filter: bytes) -> None:
        """
        Create the 0MQ sockets used to communicate with the manager.

        :param args: parsed command line arguments containing the ports
        :param subscription_filter: this worker's id, used to filter
         messages on the SUB sockets
        """

        # Socket to send messages along the pipe to
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.set_hwm(10)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        # Socket to receive messages from the pipe
        self.receiver = self.context.socket(zmq.SUB)
        self.receiver.connect("tcp://localhost:{}".format(args.receive))
        self.receiver.setsockopt(zmq.SUBSCRIBE, subscription_filter)

        # Socket to receive controller messages: stop, pause, resume
        self.controller = self.context.socket(zmq.SUB)
        self.controller.connect("tcp://localhost:{}".format(args.controller))
        self.controller.setsockopt(zmq.SUBSCRIBE, subscription_filter)

        # Socket to synchronize the start of receiving data from upstream
        self.sync_client = self.context.socket(zmq.REQ)
        self.sync_client.connect("tcp://localhost:{}".format(args.syncclient))

    def check_for_command(self, directive: bytes, content) -> None:
        """
        Exit cleanly if the manager has sent a STOP command over the
        data channel.

        :param directive: the directive received from the manager
        :param content: the command's content; STOP is the only one expected
        """
        if directive == b'cmd':
            try:
                assert content == b'STOP'
            except AssertionError:
                logging.critical("Expected STOP command but instead got %s", content.decode())
            else:
                self.cleanup_pre_stop()
                self.disconnect_logging()
                # signal to sink that we've terminated before finishing
                self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED'])
                sys.exit(0)

    def check_for_controller_directive(self) -> None:
        """
        Act on any PAUSE or STOP command the controller may have sent.

        Non-blocking unless a PAUSE arrives, in which case block until
        the next command (RESUME or STOP).
        """
        try:
            # Don't block if process is running regularly
            # If there is no command, an exception will occur
            worker_id, command = self.controller.recv_multipart(zmq.DONTWAIT)
            assert command in [b'PAUSE', b'STOP']
            assert worker_id == self.worker_id

            if command == b'PAUSE':
                # Because the process is paused, do a blocking read to
                # wait for the next command
                worker_id, command = self.controller.recv_multipart()
                assert (command in [b'RESUME', b'STOP'])
            if command == b'STOP':
                self.cleanup_pre_stop()
                # Deregister the logging publisher before exiting, for
                # consistency with check_for_command() and resume_work()
                self.disconnect_logging()
                # before finishing, signal to sink that we've terminated
                self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED'])
                sys.exit(0)
        except zmq.Again:
            pass  # Continue working

    def resume_work(self) -> None:
        """
        Block until the controller sends RESUME or STOP; exit cleanly
        on STOP.
        """
        worker_id, command = self.controller.recv_multipart()
        assert (command in [b'RESUME', b'STOP'])
        if command == b'STOP':
            self.cleanup_pre_stop()
            self.disconnect_logging()
            # before finishing, signal to sink that we've terminated
            self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED'])
            sys.exit(0)

    def disconnect_logging(self) -> None:
        """Close the logging publisher and notify the main process."""
        self.logger_publisher.close()

    def send_finished_command(self) -> None:
        """Tell the sink this worker has finished its workload."""
        self.sender.send_multipart([self.worker_id, b'cmd', b'FINISHED'])
1094
1095
class LoadBalancerWorker:
    """
    Base class for workers that request tasks from a load balancer and
    push their output to a sink in the main process.
    """

    def __init__(self, worker_type: str) -> None:
        """
        :param worker_type: descriptive name used in the worker's identity
         and in log messages
        """
        super().__init__()
        self.parser = argparse.ArgumentParser()
        for argument in ("--request", "--send", "--identity", "--logging"):
            self.parser.add_argument(argument, required=True)

        args = self.parser.parse_args()

        self.context = zmq.Context()

        # REQ socket over which tasks are requested from the load balancer
        self.requester = self.context.socket(zmq.REQ)
        self.identity = create_identity(worker_type, args.identity)
        self.requester.identity = self.identity
        self.requester.connect("tcp://localhost:{}".format(args.request))

        # Sender is located in the main process. It is where output (messages)
        # from this process are are sent to.
        self.sender = self.context.socket(zmq.PUSH)
        self.sender.connect("tcp://localhost:{}".format(args.send))

        self.logger_publisher = ProcessLoggerPublisher(
            context=self.context, name=worker_type, notification_port=args.logging
        )

        # Tell the load balancer we are ready for work
        self.requester.send(b"READY")
        self.do_work()

    def do_work(self) -> None:
        """Implement the worker's task in a subclass."""
        pass

    def cleanup_pre_stop(self) -> None:
        """
        Operations to run if process is stopped.

        Implement in child class if needed.
        """

        pass

    def exit(self):
        """Run cleanup, notify the load balancer, tear down, and exit."""
        self.cleanup_pre_stop()
        identity = self.requester.identity.decode()
        # signal to load balancer that we've terminated before finishing
        self.requester.send_multipart([b'', b'', b'STOPPED'])
        self.requester.close()
        self.sender.close()
        self.logger_publisher.close()
        self.context.term()
        logging.debug("%s with pid %s stopped", identity, os.getpid())
        sys.exit(0)

    def check_for_command(self, directive: bytes, content: bytes):
        """Exit cleanly if a STOP command has been received."""
        if directive == b'cmd':
            assert content == b'STOP'
            self.exit()
1156
1157
class ProcessLoggingManager(QObject):
    """
    Receive and log logging messages from workers.

    An alternative might be using python logging's QueueListener, which
    like this code, runs on its own thread.
    """

    # Emitted once the info socket is bound, carrying its port number
    ready = pyqtSignal(int)

    @pyqtSlot()
    def startReceiver(self) -> None:
        """
        Run the receive loop: subscribe to worker logging publishers as
        they announce themselves, and hand their records to the logger.
        Blocks until a STOP directive or KeyboardInterrupt.
        """
        context = zmq.Context.instance()
        self.receiver = context.socket(zmq.SUB)
        # Subscribe to all variates of logging messages
        self.receiver.setsockopt(zmq.SUBSCRIBE, b'')

        # Socket to receive subscription information, and the stop command
        info_socket = context.socket(zmq.PULL)
        self.info_port = info_socket.bind_to_random_port('tcp://*')

        poller = zmq.Poller()
        poller.register(self.receiver, zmq.POLLIN)
        poller.register(info_socket, zmq.POLLIN)

        self.ready.emit(self.info_port)

        while True:
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                break

            if self.receiver in socks:
                # NOTE(review): messages are pickled log records sent by
                # trusted worker processes of this application; pickle must
                # never be fed data from an untrusted source
                message = self.receiver.recv()
                record = logging.makeLogRecord(pickle.loads(message))
                # NOTE(review): `logger` is presumably a module-level logger
                # defined elsewhere in this file — confirm
                logger.handle(record)

            if info_socket in socks:
                directive, content = info_socket.recv_multipart()
                if directive == b'STOP':
                    break
                elif directive == b'CONNECT':
                    self.addSubscription(content)
                else:
                    assert directive == b'DISCONNECT'
                    self.removeSubscription(content)

    def addSubscription(self, port: bytes) -> None:
        """
        Subscribe to the logging publisher on the given port.

        :param port: the publisher's port number, as bytes
        """
        try:
            port = int(port)
        except ValueError:
            logging.critical('Incorrect port value in add logging subscription: %s', port)
        else:
            logging.debug("Subscribing to logging on port %s", port)
            self.receiver.connect("tcp://localhost:{}".format(port))

    def removeSubscription(self, port: bytes):
        """
        Unsubscribe from the logging publisher on the given port.

        :param port: the publisher's port number, as bytes
        """
        try:
            port = int(port)
        except ValueError:
            logging.critical('Incorrect port value in remove logging subscription: %s', port)
        else:
            logging.debug("Unsubscribing to logging on port %s", port)
            self.receiver.disconnect("tcp://localhost:{}".format(port))
1223
1224
def stop_process_logging_manager(info_port: int) -> None:
    """
    Stop ProcessLoggingManager thread

    :param info_port: the port number the manager uses
    """

    # A one-shot PUSH socket is enough: send the STOP directive and return
    push = zmq.Context.instance().socket(zmq.PUSH)
    push.connect("tcp://localhost:{}".format(info_port))
    push.send_multipart([b'STOP', b''])
1236
1237
class ScanArguments:
    """
    Pass arguments to the scan process
    """
    def __init__(self, device: Device,
                 ignore_other_types: bool,
                 log_gphoto2: bool) -> None:
        """
        :param device: the device to scan
        :param ignore_other_types: ignore file types like TIFF
        :param log_gphoto2: whether to generate detailed gphoto2 log messages
        """

        self.log_gphoto2 = log_gphoto2
        self.ignore_other_types = ignore_other_types
        self.device = device
1257
1258
class ScanResults:
    """
    Receive results from the scan process
    """

    def __init__(self, rpd_files: Optional[List[RPDFile]] = None,
                 file_type_counter: Optional[FileTypeCounter] = None,
                 file_size_sum: Optional[FileSizeSum] = None,
                 error_code: Optional[CameraErrorCode] = None,
                 scan_id: Optional[int] = None,
                 optimal_display_name: Optional[str] = None,
                 storage_space: Optional[List[StorageSpace]] = None,
                 storage_descriptions: Optional[List[str]] = None,
                 sample_photo: Optional[Photo] = None,
                 sample_video: Optional[Video] = None,
                 problems: Optional[ScanProblems] = None,
                 fatal_error: Optional[bool] = None,
                 camera_removed: Optional[bool] = None,
                 entire_video_required: Optional[bool] = None,
                 entire_photo_required: Optional[bool] = None) -> None:
        # Files found by the scan
        self.rpd_files = rpd_files
        self.file_type_counter = file_type_counter
        self.file_size_sum = file_size_sum
        self.sample_photo = sample_photo
        self.sample_video = sample_video
        # Scanned device details
        self.scan_id = scan_id
        self.optimal_display_name = optimal_display_name
        self.storage_space = storage_space
        self.storage_descriptions = storage_descriptions
        self.entire_video_required = entire_video_required
        self.entire_photo_required = entire_photo_required
        # Error / problem details
        self.error_code = error_code
        self.problems = problems
        self.fatal_error = fatal_error
        self.camera_removed = camera_removed
1294
1295
class CopyFilesArguments:
    """
    Pass arguments to the copyfiles process
    """

    def __init__(self, scan_id: int,
                 device: Device,
                 photo_download_folder: str,
                 video_download_folder: str,
                 files: List[RPDFile],
                 verify_file: bool,
                 generate_thumbnails: bool,
                 log_gphoto2: bool) -> None:
        """
        :param scan_id: scan id of the device the files are on
        :param device: the device to copy from
        :param photo_download_folder: destination folder for photos
        :param video_download_folder: destination folder for videos
        :param files: the files to copy
        :param verify_file: whether to verify files after copying
        :param generate_thumbnails: whether to generate thumbnails while
         copying
        :param log_gphoto2: whether to generate detailed gphoto2 log messages
        """

        self.scan_id = scan_id
        self.device = device
        self.photo_download_folder = photo_download_folder
        self.video_download_folder = video_download_folder
        self.files = files
        self.verify_file = verify_file
        self.generate_thumbnails = generate_thumbnails
        self.log_gphoto2 = log_gphoto2
1317
1318
class CopyFilesResults:
    """
    Receive results from the copyfiles process
    """

    def __init__(self, scan_id: Optional[int] = None,
                 photo_temp_dir: Optional[str] = None,
                 video_temp_dir: Optional[str] = None,
                 total_downloaded: Optional[int] = None,
                 chunk_downloaded: Optional[int] = None,
                 copy_succeeded: Optional[bool] = None,
                 rpd_file: Optional[RPDFile] = None,
                 download_count: Optional[int] = None,
                 mdata_exceptions: Optional[Tuple] = None,
                 problems: Optional[CopyingProblems] = None,
                 camera_removed: Optional[bool] = None) -> None:
        """
        :param scan_id: scan id of the device the files are being
         downloaded from
        :param photo_temp_dir: temp directory path, used to copy
         photos into until they're renamed
        :param video_temp_dir: temp directory path, used to copy
         videos into until they're renamed
        :param total_downloaded: how many bytes in total have been
         downloaded
        :param chunk_downloaded: how many bytes were downloaded since
         the last message
        :param copy_succeeded: whether the copy was successful or not
        :param rpd_file: details of the file that was copied
        :param download_count: a running count of how many files
         have been copied. Used in download tracking.
        :param mdata_exceptions: details of errors setting file metadata
        :param problems: details of any problems encountered copying files,
         not including metedata write problems.
        :param camera_removed: presumably set when the camera was removed
         mid-download — confirm against sender
        """

        self.scan_id = scan_id

        self.photo_temp_dir = photo_temp_dir
        self.video_temp_dir = video_temp_dir

        self.total_downloaded = total_downloaded
        self.chunk_downloaded = chunk_downloaded

        self.copy_succeeded = copy_succeeded
        self.rpd_file = rpd_file
        self.download_count = download_count

        self.mdata_exceptions = mdata_exceptions
        self.problems = problems
        self.camera_removed = camera_removed
1370
1371
class ThumbnailDaemonData:
    """
    Pass arguments to the thumbnail daemon process.

    Occurs after a file is downloaded & renamed, and also
    after a file is backed up.
    """

    def __init__(self, frontend_port: Optional[int] = None,
                 rpd_file: Optional[RPDFile] = None,
                 write_fdo_thumbnail: Optional[bool] = None,
                 use_thumbnail_cache: Optional[bool] = None,
                 backup_full_file_names: Optional[List[str]] = None,
                 fdo_name: Optional[str] = None,
                 force_exiftool: Optional[bool] = None) -> None:
        self.frontend_port = frontend_port
        self.rpd_file = rpd_file
        self.fdo_name = fdo_name
        self.write_fdo_thumbnail = write_fdo_thumbnail
        self.use_thumbnail_cache = use_thumbnail_cache
        self.backup_full_file_names = backup_full_file_names
        self.force_exiftool = force_exiftool
1394
1395
class RenameAndMoveFileData:
    """
    Pass arguments to the renameandmovefile process
    """

    def __init__(self, rpd_file: RPDFile=None,
                 download_count: int=None,
                 download_succeeded: bool=None,
                 message: RenameAndMoveStatus=None) -> None:
        self.message = message
        self.download_succeeded = download_succeeded
        self.download_count = download_count
        self.rpd_file = rpd_file
1409
1410
class RenameAndMoveFileResults:
    """
    Receive results from the renameandmovefile process
    """

    def __init__(self, move_succeeded: bool=None,
                 rpd_file: RPDFile=None,
                 download_count: int=None,
                 stored_sequence_no: int=None,
                 downloads_today: List[str]=None,
                 problems: Optional[RenamingProblems]=None) -> None:
        self.problems = problems
        self.downloads_today = downloads_today
        self.stored_sequence_no = stored_sequence_no
        self.download_count = download_count
        self.rpd_file = rpd_file
        self.move_succeeded = move_succeeded
1424
1425
class OffloadData:
    """
    Pass arguments to the offload process
    """

    def __init__(self, thumbnail_rows: Optional[Sequence[ThumbnailDataForProximity]]=None,
                 proximity_seconds: int=None,
                 rpd_files: Optional[Sequence[RPDFile]]=None,
                 strip_characters: Optional[bool]=None,
                 folders_preview: Optional[FoldersPreview]=None) -> None:
        self.folders_preview = folders_preview
        self.strip_characters = strip_characters
        self.rpd_files = rpd_files
        self.proximity_seconds = proximity_seconds
        self.thumbnail_rows = thumbnail_rows
1437
1438
class OffloadResults:
    """
    Receive results from the offload process
    """

    def __init__(self, proximity_groups: Optional[TemporalProximityGroups]=None,
                 folders_preview: Optional[FoldersPreview]=None) -> None:
        self.folders_preview = folders_preview
        self.proximity_groups = proximity_groups
1444
1445
class BackupArguments:
    """
    Pass start up data to the back up process

    :param path: the path being backed up to
    :param device_name: descriptive name of the backup device
    """
    def __init__(self, path: str, device_name: str) -> None:
        self.device_name = device_name
        self.path = path
1453
1454
class BackupFileData:
    """
    Pass file data to the backup process
    """
    def __init__(self, rpd_file: Optional[RPDFile] = None,
                 move_succeeded: Optional[bool] = None,
                 do_backup: Optional[bool] = None,
                 path_suffix: Optional[str] = None,
                 backup_duplicate_overwrite: Optional[bool] = None,
                 verify_file: Optional[bool] = None,
                 download_count: Optional[int] = None,
                 save_fdo_thumbnail: Optional[int] = None,
                 message: Optional[BackupStatus] = None) -> None:
        self.message = message
        self.save_fdo_thumbnail = save_fdo_thumbnail
        self.download_count = download_count
        self.verify_file = verify_file
        self.backup_duplicate_overwrite = backup_duplicate_overwrite
        self.path_suffix = path_suffix
        self.do_backup = do_backup
        self.move_succeeded = move_succeeded
        self.rpd_file = rpd_file
1477
1478
class BackupResults:
    """
    Receive results from the backup process
    """

    def __init__(self, scan_id: int,
                 device_id: int,
                 total_downloaded: Optional[int] = None,
                 chunk_downloaded: Optional[int] = None,
                 backup_succeeded: Optional[bool] = None,
                 do_backup: Optional[bool] = None,
                 rpd_file: Optional[RPDFile] = None,
                 backup_full_file_name: Optional[str] = None,
                 mdata_exceptions: Optional[Tuple] = None,
                 problems: Optional[BackingUpProblems] = None) -> None:
        # Identity of the source device
        self.scan_id = scan_id
        self.device_id = device_id
        # Progress tracking
        self.total_downloaded = total_downloaded
        self.chunk_downloaded = chunk_downloaded
        # Outcome of the backup of one file
        self.backup_succeeded = backup_succeeded
        self.do_backup = do_backup
        self.rpd_file = rpd_file
        self.backup_full_file_name = backup_full_file_name
        self.mdata_exceptions = mdata_exceptions
        self.problems = problems
1500
1501
class GenerateThumbnailsArguments:
    """
    Pass arguments to the thumbnail generation process.
    """

    def __init__(self, scan_id: int,
                 rpd_files: List[RPDFile],
                 name: str,
                 proximity_seconds: int,
                 cache_dirs: CacheDirs,
                 need_photo_cache_dir: bool,
                 need_video_cache_dir: bool,
                 frontend_port: int,
                 log_gphoto2: bool,
                 camera: Optional[str]=None,
                 port: Optional[str]=None,
                 entire_video_required: Optional[bool]=None,
                 entire_photo_required: Optional[bool]=None) -> None:
        """
        List of files for which thumbnails are to be generated.
        All files  are assumed to have the same scan id.

        :param scan_id: id of the scan
        :param rpd_files: files from which to extract thumbnails
        :param name: name of the device
        :param proximity_seconds: the time elapsed between consecutive
         shots that is used to prioritize the order of thumbnail
         generation
        :param cache_dirs: the location where the cache directories
         should be created
        :param need_photo_cache_dir: if True, must use cache dir
         to extract photo thumbnail
        :param need_video_cache_dir: if True, must use cache dir
         to extract video thumbnail
        :param frontend_port: port to use to send to load balancer's
         front end
        :param log_gphoto2: if True, log libgphoto2 logging messages
        :param camera: If the thumbnails are being downloaded from a
         camera, this is the name of the camera, else None
        :param port: If the thumbnails are being downloaded from a
         camera, this is the port of the camera, else None
        :param entire_video_required: if the entire video is required
         to extract the thumbnail
        :param entire_photo_required: if the entire photo is required
         to extract the thumbnail
        """

        self.scan_id = scan_id
        self.rpd_files = rpd_files
        self.name = name
        self.proximity_seconds = proximity_seconds
        self.cache_dirs = cache_dirs
        self.need_photo_cache_dir = need_photo_cache_dir
        self.need_video_cache_dir = need_video_cache_dir
        self.frontend_port = frontend_port
        self.log_gphoto2 = log_gphoto2
        if camera is not None:
            assert port is not None
            # NOTE(review): only entire_video_required is validated here,
            # not entire_photo_required — confirm the asymmetry is intended
            assert entire_video_required is not None
        self.camera = camera
        self.port = port
        self.entire_video_required = entire_video_required
        self.entire_photo_required = entire_photo_required
1560
1561
class GenerateThumbnailsResults:
    """
    Receive results from the thumbnail generation process.
    """

    def __init__(self, rpd_file: Optional[RPDFile] = None,
                 thumbnail_bytes: Optional[bytes] = None,
                 scan_id: Optional[int] = None,
                 cache_dirs: Optional[CacheDirs] = None,
                 camera_removed: Optional[bool] = None) -> None:
        self.rpd_file = rpd_file
        self.scan_id = scan_id
        self.cache_dirs = cache_dirs
        self.camera_removed = camera_removed
        # If thumbnail_bytes is None, there is no thumbnail
        self.thumbnail_bytes = thumbnail_bytes
1574
1575
class ThumbnailExtractorArgument:
    """
    Pass arguments to an individual thumbnail extractor worker.
    """

    def __init__(self, rpd_file: RPDFile,
                 task: ExtractionTask,
                 processing: Set[ExtractionProcessing],
                 full_file_name_to_work_on: str,
                 secondary_full_file_name: str,
                 exif_buffer: Optional[bytearray],
                 thumbnail_bytes: bytes,
                 use_thumbnail_cache: bool,
                 file_to_work_on_is_temporary: bool,
                 write_fdo_thumbnail: bool,
                 send_thumb_to_main: bool,
                 force_exiftool: bool) -> None:
        self.rpd_file = rpd_file
        self.task = task
        self.processing = processing
        self.full_file_name_to_work_on = full_file_name_to_work_on
        self.secondary_full_file_name = secondary_full_file_name
        self.exif_buffer = exif_buffer
        self.thumbnail_bytes = thumbnail_bytes
        self.use_thumbnail_cache = use_thumbnail_cache
        self.file_to_work_on_is_temporary = file_to_work_on_is_temporary
        self.write_fdo_thumbnail = write_fdo_thumbnail
        self.send_thumb_to_main = send_thumb_to_main
        self.force_exiftool = force_exiftool
1601
1602
class RenameMoveFileManager(PushPullDaemonManager):
    """
    Manages the single instance daemon process that renames and moves
    files that have just been downloaded
    """

    # move_succeeded, rpd_file, download_count
    message = pyqtSignal(bool, RPDFile, int)
    # stored_sequence_no, downloads_today
    sequencesUpdate = pyqtSignal(int, list)
    renameProblems = pyqtSignal('PyQt_PyObject')

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.rename)
        self._process_name = 'Rename and Move File Manager'
        self._process_to_run = 'renameandmovefile.py'

    def process_sink_data(self):
        """Dispatch a result from the worker to the matching signal."""
        # Data comes from our own worker process, so unpickling is safe
        data = pickle.loads(self.content)  # type: RenameAndMoveFileResults

        if data.move_succeeded is not None:
            self.message.emit(data.move_succeeded, data.rpd_file, data.download_count)
            return

        if data.problems is not None:
            self.renameProblems.emit(data.problems)
            return

        assert data.stored_sequence_no is not None
        assert data.downloads_today is not None
        assert isinstance(data.downloads_today, list)
        self.sequencesUpdate.emit(data.stored_sequence_no, data.downloads_today)
1631
1632
class ThumbnailDaemonManager(PushPullDaemonManager):
    """
    Manages the process that extracts thumbnails after the file
    has already been downloaded and that writes FreeDesktop.org
    thumbnails. Not to be confused with ThumbnailManagerPara, which
    manages thumbnailing using processes that run in parallel,
    one for each device.
    """

    message = pyqtSignal(RPDFile, QPixmap)

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.thumbnail_daemon)
        self._process_name = 'Thumbnail Daemon Manager'
        self._process_to_run = 'thumbnaildaemon.py'

    def process_sink_data(self) -> None:
        """Convert received thumbnail bytes into a QPixmap and emit it."""
        data = pickle.loads(self.content)  # type: GenerateThumbnailsResults
        # Fall back to a null QPixmap when there are no bytes or decoding fails
        pixmap = QPixmap()
        if data.thumbnail_bytes is not None:
            image = QImage.fromData(data.thumbnail_bytes)
            if not image.isNull():
                pixmap = QPixmap.fromImage(image)
        self.message.emit(data.rpd_file, pixmap)
1660
1661
class OffloadManager(PushPullDaemonManager):
    """
    Handles tasks best run in a separate process
    """

    message = pyqtSignal(TemporalProximityGroups)
    downloadFolders = pyqtSignal(FoldersPreview)

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.offload)
        self._process_name = 'Offload Manager'
        self._process_to_run = 'offload.py'

    def process_sink_data(self) -> None:
        """Relay whichever result the offload process produced."""
        data = pickle.loads(self.content)  # type: OffloadResults
        if data.proximity_groups is not None:
            self.message.emit(data.proximity_groups)
            return
        if data.folders_preview is not None:
            self.downloadFolders.emit(data.folders_preview)
1681
1682
class ScanManager(PublishPullPipelineManager):
    """
    Handles the processes that scan devices (cameras, external devices,
    this computer path)
    """

    scannedFiles = pyqtSignal(
        'PyQt_PyObject', 'PyQt_PyObject', FileTypeCounter, 'PyQt_PyObject', bool, bool
    )
    deviceError = pyqtSignal(int, CameraErrorCode)
    deviceDetails = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject', str)
    scanProblems = pyqtSignal(int, 'PyQt_PyObject')
    fatalError = pyqtSignal(int)
    cameraRemovedDuringScan = pyqtSignal(int)

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.scan)
        self._process_name = 'Scan Manager'
        self._process_to_run = 'scan.py'

    def process_sink_data(self) -> None:
        """Route scan results: either a batch of files, or per-device status."""
        data = pickle.loads(self.content)  # type: ScanResults
        if data.rpd_files is not None:
            # A batch of scanned files, together with samples and totals
            assert data.file_type_counter
            assert data.file_size_sum
            assert data.entire_video_required is not None
            assert data.entire_photo_required is not None
            self.scannedFiles.emit(
                data.rpd_files,
                (data.sample_photo, data.sample_video),
                data.file_type_counter,
                data.file_size_sum,
                data.entire_video_required,
                data.entire_photo_required
            )
            return

        # All device status/error results identify the device by scan_id
        assert data.scan_id is not None
        if data.error_code is not None:
            self.deviceError.emit(data.scan_id, data.error_code)
        elif data.optimal_display_name is not None:
            self.deviceDetails.emit(
                data.scan_id, data.storage_space, data.storage_descriptions,
                data.optimal_display_name
            )
        elif data.problems is not None:
            self.scanProblems.emit(data.scan_id, data.problems)
        elif data.camera_removed is not None:
            self.cameraRemovedDuringScan.emit(data.scan_id)
        else:
            assert data.fatal_error
            self.fatalError.emit(data.scan_id)
1735
1736
class BackupManager(PublishPullPipelineManager):
    """
    Each backup "device" (it could be an external drive, or a user-
    specified path on the local file system) has associated with it one
    worker process. For example if photos and videos are both being
    backed up to the same external hard drive, one worker process
    handles both the photos and the videos. However if photos are being
    backed up to one drive, and videos to another, there would be a
    worker process for each drive (2 in total).
    """

    message = pyqtSignal(int, bool, bool, RPDFile, str, 'PyQt_PyObject')
    bytesBackedUp = pyqtSignal('PyQt_PyObject', 'PyQt_PyObject')
    backupProblems = pyqtSignal(int, 'PyQt_PyObject')

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.backup)
        self._process_name = 'Backup Manager'
        self._process_to_run = 'backupfile.py'

    def process_sink_data(self) -> None:
        """Dispatch backup progress, per-file results, or problem reports."""
        data = pickle.loads(self.content)  # type: BackupResults
        if data.total_downloaded is not None:
            # Progress update for an in-flight backup
            assert data.scan_id is not None
            assert data.chunk_downloaded >= 0
            assert data.total_downloaded >= 0
            self.bytesBackedUp.emit(data.scan_id, data.chunk_downloaded)
            return
        if data.backup_succeeded is not None:
            # Per-file completion result
            assert data.do_backup is not None
            assert data.rpd_file is not None
            self.message.emit(
                data.device_id, data.backup_succeeded, data.do_backup, data.rpd_file,
                data.backup_full_file_name, data.mdata_exceptions
            )
            return
        # Otherwise the worker is reporting problems for a device
        assert data.problems is not None
        self.backupProblems.emit(data.device_id, data.problems)
1773
1774
class CopyFilesManager(PublishPullPipelineManager):
    """
    Manage the processes that copy files from devices to the computer
    during the download process
    """

    # copy success flag, the file, its download count, metadata exceptions
    message = pyqtSignal(bool, RPDFile, int, 'PyQt_PyObject')
    # scan_id, photo temp dir, video temp dir
    tempDirs = pyqtSignal(int, str, str)
    # scan_id, total bytes downloaded, bytes in this chunk
    bytesDownloaded = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject')
    copyProblems = pyqtSignal(int, 'PyQt_PyObject')
    cameraRemoved = pyqtSignal(int)

    def __init__(self, logging_port: int) -> None:
        super().__init__(logging_port=logging_port, thread_name=ThreadNames.copy)
        self._process_name = 'Copy Files Manager'
        self._process_to_run = 'copyfiles.py'

    def process_sink_data(self) -> None:
        """
        Dispatch copy worker results to the appropriate signal: byte
        progress, per-file completion, problems, camera removal, or the
        temporary directories created for the download.
        """
        data = pickle.loads(self.content)  # type: CopyFilesResults
        if data.total_downloaded is not None:
            assert data.scan_id is not None
            # Negative values indicate a logic error in the worker; log loudly
            # but keep going, since progress reporting is not worth a crash.
            if data.chunk_downloaded < 0:
                logging.critical("Chunk downloaded is less than zero: %s", data.chunk_downloaded)
            if data.total_downloaded < 0:
                # Fixed copy-paste error: this message previously said
                # "Chunk downloaded" while reporting total_downloaded.
                logging.critical("Total downloaded is less than zero: %s", data.total_downloaded)

            self.bytesDownloaded.emit(data.scan_id, data.total_downloaded, data.chunk_downloaded)

        elif data.copy_succeeded is not None:
            assert data.rpd_file is not None
            assert data.download_count is not None
            self.message.emit(
                data.copy_succeeded, data.rpd_file, data.download_count, data.mdata_exceptions
            )

        elif data.problems is not None:
            self.copyProblems.emit(data.scan_id, data.problems)
        elif data.camera_removed is not None:
            self.cameraRemoved.emit(data.scan_id)

        else:
            # First message from a worker: the temp dirs it created
            assert (data.photo_temp_dir is not None and
                    data.video_temp_dir is not None)
            assert data.scan_id is not None
            self.tempDirs.emit(data.scan_id, data.photo_temp_dir, data.video_temp_dir)