1# -*- coding: utf-8 -*- 2 3# Copyright (C) 2014 - Garrett Regier 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program; if not, write to the Free Software 17# Foundation, Inc. 51 Franklin Street, Fifth Floor, Boston, MA 18# 02110-1301 USA. 19 20from gi.repository import GLib 21 22import abc 23import collections 24import queue 25import threading 26import traceback 27 28from .debug import debug 29 30 31class WorkerThread(threading.Thread): 32 __metaclass__ = abc.ABCMeta 33 34 __sentinel = object() 35 36 def __init__(self, callback, chunk_size=1, *args, **kwargs): 37 super().__init__(*args, **kwargs) 38 39 self.__callback = callback 40 self.__chunk_size = chunk_size 41 42 self.__quit = threading.Event() 43 self.__has_idle = threading.Event() 44 45 self.__tasks = queue.Queue() 46 self.__results = collections.deque() 47 48 @abc.abstractmethod 49 def handle_task(self, *args, **kwargs): 50 raise NotImplementedError 51 52 # TODO: add, put, push? 53 def push(self, *args, **kwargs): 54 self.__tasks.put((args, kwargs)) 55 56 def __close(self, process_results): 57 self.__quit.set() 58 59 # Prevent the queue.get() from blocking forever 60 self.__tasks.put(self.__sentinel) 61 62 super().join() 63 64 if not process_results: 65 self.__results.clear() 66 67 else: 68 while self.__in_idle() is GLib.SOURCE_CONTINUE: 69 pass 70 71 def terminate(self): 72 self.__close(False) 73 74 def join(self): 75 self.__close(True) 76 77 def clear(self): 78 old_tasks = self.__tasks 79 self.__tasks = queue.Queue(1) 80 81 # Prevent the queue.get() from blocking forever 82 old_tasks.put(self.__sentinel) 83 84 # Block until the old queue has finished, otherwise 85 # a old result could be added to the new results queue 86 self.__tasks.put(self.__sentinel) 87 self.__tasks.put(self.__sentinel) 88 89 old_tasks = self.__tasks 90 self.__tasks = queue.Queue() 91 92 # Switch to the new queue 93 old_tasks.put(self.__sentinel) 94 95 # Finally, we can now create a new deque without 96 # the possibility of any old results being added to it 97 self.__results.clear() 98 99 def run(self): 100 while not self.__quit.is_set(): 101 task = self.__tasks.get() 102 if task is self.__sentinel: 103 continue 104 105 args, kwargs = task 106 107 try: 108 result = self.handle_task(*args, **kwargs) 109 110 except Exception: 111 traceback.print_exc() 112 continue 113 114 self.__results.append(result) 115 116 # Avoid having an idle for every result 117 if not self.__has_idle.is_set(): 118 self.__has_idle.set() 119 120 debug('%s<%s>: result callback idle started' % 121 (type(self).__name__, self.name)) 122 GLib.source_set_name_by_id(GLib.idle_add(self.__in_idle), 123 '[gedit] git %s result callback idle' % 124 (type(self).__name__,)) 125 126 def __in_idle(self): 127 try: 128 for i in range(self.__chunk_size): 129 result = self.__results.popleft() 130 131 try: 132 self.__callback(result) 133 134 except Exception: 135 traceback.print_exc() 136 137 except IndexError: 138 # Must be cleared before we check the results length 139 self.__has_idle.clear() 140 141 # Only remove the idle when there are no more items, 142 # some could have been added after the IndexError was raised 143 if len(self.__results) == 0: 144 debug('%s<%s>: result callback idle finished' % 145 (type(self).__name__, self.name)) 146 return GLib.SOURCE_REMOVE 147 148 return GLib.SOURCE_CONTINUE 149 150# ex:ts=4:et: 151