1# -*- coding: utf-8 -*-
2
3#  Copyright (C) 2014 - Garrett Regier
4#
5#  This program is free software; you can redistribute it and/or modify
6#  it under the terms of the GNU General Public License as published by
7#  the Free Software Foundation; either version 2 of the License, or
8#  (at your option) any later version.
9#
10#  This program is distributed in the hope that it will be useful,
11#  but WITHOUT ANY WARRANTY; without even the implied warranty of
12#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13#  GNU General Public License for more details.
14#
15#  You should have received a copy of the GNU General Public License
16#  along with this program; if not, write to the Free Software
17#  Foundation, Inc.  51 Franklin Street, Fifth Floor, Boston, MA
18#  02110-1301 USA.
19
20from gi.repository import GLib
21
22import abc
23import collections
24import queue
25import threading
26import traceback
27
28from .debug import debug
29
30
class WorkerThread(threading.Thread, metaclass=abc.ABCMeta):
    """Thread that handles pushed tasks and reports results through a GLib idle.

    Tasks are queued with push() and passed one at a time to handle_task(),
    which subclasses must implement. Each return value is stored and later
    passed to ``callback`` from an idle source added with GLib.idle_add(),
    at most ``chunk_size`` results per idle invocation.
    """

    # Placed in a task queue to wake up a blocked queue.get() in run()
    __sentinel = object()

    def __init__(self, callback, chunk_size=1, *args, **kwargs):
        """Set up the worker; the thread is not started here.

        callback: called with a single argument, the value returned
                  by handle_task().
        chunk_size: maximum number of results given to the callback
                    per idle invocation.
        *args, **kwargs: forwarded to threading.Thread.
        """
        super().__init__(*args, **kwargs)

        self.__callback = callback
        self.__chunk_size = chunk_size

        # Set when run() should stop looping
        self.__quit = threading.Event()
        # Set while run() considers a result callback idle to be pending
        self.__has_idle = threading.Event()

        self.__tasks = queue.Queue()
        self.__results = collections.deque()

    @abc.abstractmethod
    def handle_task(self, *args, **kwargs):
        """Handle one task; called by run() with the arguments given to push().

        The return value is what the callback eventually receives. An
        exception raised here is printed by run() and the task yields
        no result.
        """
        raise NotImplementedError

    # TODO: add, put, push?
    def push(self, *args, **kwargs):
        """Queue a task; the arguments are later passed to handle_task()."""
        self.__tasks.put((args, kwargs))

    def __close(self, process_results):
        """Stop the worker and wait for it to finish.

        process_results: when true, the remaining results are passed to
                         the callback before returning; otherwise they
                         are dropped.
        """
        self.__quit.set()

        # Prevent the queue.get() from blocking forever
        self.__tasks.put(self.__sentinel)

        # Thread.join() raises RuntimeError for a thread that was never
        # started, and there is nothing to wait for in that case
        if self.is_alive():
            super().join()

        if not process_results:
            self.__results.clear()

        else:
            # Drain the results here, one chunk at a time
            while self.__in_idle() is GLib.SOURCE_CONTINUE:
                pass

    def terminate(self):
        """Stop the worker and drop any results not yet given to the callback."""
        self.__close(False)

    def join(self):
        """Stop the worker and give all remaining results to the callback.

        Overrides threading.Thread.join(); no timeout is accepted.
        """
        self.__close(True)

    def clear(self):
        """Drop the tasks still queued and the results not yet delivered."""
        if not self.is_alive():
            # Nothing is consuming the queues, so the handshake below
            # would block forever on the bounded queue
            self.__tasks = queue.Queue()
            self.__results.clear()
            return

        old_tasks = self.__tasks
        self.__tasks = queue.Queue(1)

        # Prevent the queue.get() from blocking forever
        old_tasks.put(self.__sentinel)

        # Block until the old queue has finished, otherwise
        # an old result could be added to the new results queue.
        # The queue holds a single item, so the second put() only
        # returns once the worker has taken the first sentinel
        self.__tasks.put(self.__sentinel)
        self.__tasks.put(self.__sentinel)

        old_tasks = self.__tasks
        self.__tasks = queue.Queue()

        # Switch to the new queue
        old_tasks.put(self.__sentinel)

        # Finally, we can now clear the results without
        # the possibility of any old results being added to them
        self.__results.clear()

    def run(self):
        """Handle tasks until the quit event is set.

        Each result is appended to the results deque, and a result
        callback idle is added when none is flagged as pending.
        """
        while not self.__quit.is_set():
            task = self.__tasks.get()
            if task is self.__sentinel:
                # Only a wake-up: check the quit event and
                # pick up the current task queue again
                continue

            args, kwargs = task

            try:
                result = self.handle_task(*args, **kwargs)

            except Exception:
                # A failing task must not stop the worker
                traceback.print_exc()
                continue

            self.__results.append(result)

            # Avoid having an idle for every result
            if not self.__has_idle.is_set():
                self.__has_idle.set()

                debug('%s<%s>: result callback idle started' %
                      (type(self).__name__, self.name))
                GLib.source_set_name_by_id(GLib.idle_add(self.__in_idle),
                                           '[gedit] git %s result callback idle' %
                                           (type(self).__name__,))

    def __in_idle(self):
        """Pass up to chunk_size results to the callback.

        Returns GLib.SOURCE_REMOVE once no results are left and
        GLib.SOURCE_CONTINUE otherwise. Exceptions raised by the
        callback are printed and do not stop the processing.
        """
        try:
            for _ in range(self.__chunk_size):
                result = self.__results.popleft()

                try:
                    self.__callback(result)

                except Exception:
                    traceback.print_exc()

        except IndexError:
            # Raised by popleft() when the results ran out.
            # Must be cleared before we check the results length
            self.__has_idle.clear()

            # Only remove the idle when there are no more items,
            # some could have been added after the IndexError was raised
            if not self.__results:
                debug('%s<%s>: result callback idle finished' %
                      (type(self).__name__, self.name))
                return GLib.SOURCE_REMOVE

        return GLib.SOURCE_CONTINUE
149
150# ex:ts=4:et:
151