1# Copyright (C) 1998-2018 by the Free Software Foundation, Inc. 2# 3# This program is free software; you can redistribute it and/or 4# modify it under the terms of the GNU General Public License 5# as published by the Free Software Foundation; either version 2 6# of the License, or (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, 16# USA. 17 18"""Generic queue runner class. 19""" 20 21import time 22import traceback 23from cStringIO import StringIO 24 25from Mailman import mm_cfg 26from Mailman import Utils 27from Mailman import Errors 28from Mailman import MailList 29from Mailman import i18n 30 31from Mailman.Logging.Syslog import syslog 32from Mailman.Queue.Switchboard import Switchboard 33 34import email.Errors 35 36try: 37 True, False 38except NameError: 39 True = 1 40 False = 0 41 42 43 44class Runner: 45 QDIR = None 46 SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME 47 48 def __init__(self, slice=None, numslices=1): 49 self._kids = {} 50 # Create our own switchboard. Don't use the switchboard cache because 51 # we want to provide slice and numslice arguments. 52 self._switchboard = Switchboard(self.QDIR, slice, numslices, True) 53 # Create the shunt switchboard 54 self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR) 55 self._stop = False 56 57 def __repr__(self): 58 return '<%s at %s>' % (self.__class__.__name__, id(self)) 59 60 def stop(self): 61 self._stop = True 62 63 def run(self): 64 # Start the main loop for this queue runner. 65 try: 66 try: 67 while True: 68 # Once through the loop that processes all the files in 69 # the queue directory. 70 filecnt = self._oneloop() 71 # Do the periodic work for the subclass. BAW: this 72 # shouldn't be called here. There should be one more 73 # _doperiodic() call at the end of the _oneloop() loop. 74 self._doperiodic() 75 # If the stop flag is set, we're done. 76 if self._stop: 77 break 78 # Give the runner an opportunity to snooze for a while, 79 # but pass it the file count so it can decide whether to 80 # do more work now or not. 81 self._snooze(filecnt) 82 except KeyboardInterrupt: 83 pass 84 finally: 85 # We've broken out of our main loop, so we want to reap all the 86 # subprocesses we've created and do any other necessary cleanups. 87 self._cleanup() 88 89 def _oneloop(self): 90 # First, list all the files in our queue directory. 91 # Switchboard.files() is guaranteed to hand us the files in FIFO 92 # order. Return an integer count of the number of files that were 93 # available for this qrunner to process. 94 files = self._switchboard.files() 95 for filebase in files: 96 try: 97 # Ask the switchboard for the message and metadata objects 98 # associated with this filebase. 99 msg, msgdata = self._switchboard.dequeue(filebase) 100 except Exception, e: 101 # This used to just catch email.Errors.MessageParseError, 102 # but other problems can occur in message parsing, e.g. 103 # ValueError, and exceptions can occur in unpickling too. 104 # We don't want the runner to die, so we just log and skip 105 # this entry, but maybe preserve it for analysis. 106 self._log(e) 107 if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES: 108 syslog('error', 109 'Skipping and preserving unparseable message: %s', 110 filebase) 111 preserve = True 112 else: 113 syslog('error', 114 'Ignoring unparseable message: %s', filebase) 115 preserve = False 116 self._switchboard.finish(filebase, preserve=preserve) 117 continue 118 try: 119 self._onefile(msg, msgdata) 120 self._switchboard.finish(filebase) 121 except Exception, e: 122 # All runners that implement _dispose() must guarantee that 123 # exceptions are caught and dealt with properly. Still, there 124 # may be a bug in the infrastructure, and we do not want those 125 # to cause messages to be lost. Any uncaught exceptions will 126 # cause the message to be stored in the shunt queue for human 127 # intervention. 128 self._log(e) 129 # Put a marker in the metadata for unshunting 130 msgdata['whichq'] = self._switchboard.whichq() 131 # It is possible that shunting can throw an exception, e.g. a 132 # permissions problem or a MemoryError due to a really large 133 # message. Try to be graceful. 134 try: 135 new_filebase = self._shunt.enqueue(msg, msgdata) 136 syslog('error', 'SHUNTING: %s', new_filebase) 137 self._switchboard.finish(filebase) 138 except Exception, e: 139 # The message wasn't successfully shunted. Log the 140 # exception and try to preserve the original queue entry 141 # for possible analysis. 142 self._log(e) 143 syslog('error', 144 'SHUNTING FAILED, preserving original entry: %s', 145 filebase) 146 self._switchboard.finish(filebase, preserve=True) 147 # Other work we want to do each time through the loop 148 Utils.reap(self._kids, once=True) 149 self._doperiodic() 150 if self._shortcircuit(): 151 break 152 return len(files) 153 154 def _onefile(self, msg, msgdata): 155 # Do some common sanity checking on the message metadata. It's got to 156 # be destined for a particular mailing list. This switchboard is used 157 # to shunt off badly formatted messages. We don't want to just trash 158 # them because they may be fixable with human intervention. Just get 159 # them out of our site though. 160 # 161 # Find out which mailing list this message is destined for. 162 listname = msgdata.get('listname') 163 if not listname: 164 listname = mm_cfg.MAILMAN_SITE_LIST 165 mlist = self._open_list(listname) 166 if not mlist: 167 syslog('error', 168 'Dequeuing message destined for missing list: %s', 169 listname) 170 self._shunt.enqueue(msg, msgdata) 171 return 172 # Now process this message, keeping track of any subprocesses that may 173 # have been spawned. We'll reap those later. 174 # 175 # We also want to set up the language context for this message. The 176 # context will be the preferred language for the user if a member of 177 # the list, or the list's preferred language. However, we must take 178 # special care to reset the defaults, otherwise subsequent messages 179 # may be translated incorrectly. BAW: I'm not sure I like this 180 # approach, but I can't think of anything better right now. 181 otranslation = i18n.get_translation() 182 sender = msg.get_sender() 183 if mlist: 184 lang = mlist.getMemberLanguage(sender) 185 else: 186 lang = mm_cfg.DEFAULT_SERVER_LANGUAGE 187 i18n.set_language(lang) 188 msgdata['lang'] = lang 189 try: 190 keepqueued = self._dispose(mlist, msg, msgdata) 191 finally: 192 i18n.set_translation(otranslation) 193 # Keep tabs on any child processes that got spawned. 194 kids = msgdata.get('_kids') 195 if kids: 196 self._kids.update(kids) 197 if keepqueued: 198 self._switchboard.enqueue(msg, msgdata) 199 200 def _open_list(self, listname): 201 # We no longer cache the list instances. Because of changes to 202 # MailList.py needed to avoid not reloading an updated list, caching 203 # is not as effective as it once was. Also, with OldStyleMemberships 204 # as the MemberAdaptor, there was a self-reference to the list which 205 # kept all lists in the cache. Changing this reference to a 206 # weakref.proxy created other issues. 207 try: 208 mlist = MailList.MailList(listname, lock=False) 209 except Errors.MMListError, e: 210 syslog('error', 'error opening list: %s\n%s', listname, e) 211 return None 212 return mlist 213 214 def _log(self, exc): 215 syslog('error', 'Uncaught runner exception: %s', exc) 216 s = StringIO() 217 traceback.print_exc(file=s) 218 syslog('error', s.getvalue()) 219 220 # 221 # Subclasses can override these methods. 222 # 223 def _cleanup(self): 224 """Clean up upon exit from the main processing loop. 225 226 Called when the Runner's main loop is stopped, this should perform 227 any necessary resource deallocation. Its return value is irrelevant. 228 """ 229 Utils.reap(self._kids) 230 231 def _dispose(self, mlist, msg, msgdata): 232 """Dispose of a single message destined for a mailing list. 233 234 Called for each message that the Runner is responsible for, this is 235 the primary overridable method for processing each message. 236 Subclasses, must provide implementation for this method. 237 238 mlist is the MailList instance this message is destined for. 239 240 msg is the Message object representing the message. 241 242 msgdata is a dictionary of message metadata. 243 """ 244 raise NotImplementedError 245 246 def _doperiodic(self): 247 """Do some processing `every once in a while'. 248 249 Called every once in a while both from the Runner's main loop, and 250 from the Runner's hash slice processing loop. You can do whatever 251 special periodic processing you want here, and the return value is 252 irrelevant. 253 """ 254 pass 255 256 def _snooze(self, filecnt): 257 """Sleep for a little while. 258 259 filecnt is the number of messages in the queue the last time through. 260 Sub-runners can decide to continue to do work, or sleep for a while 261 based on this value. By default, we only snooze if there was nothing 262 to do last time around. 263 """ 264 if filecnt or self.SLEEPTIME <= 0: 265 return 266 time.sleep(self.SLEEPTIME) 267 268 def _shortcircuit(self): 269 """Return a true value if the individual file processing loop should 270 exit before it's finished processing each message in the current slice 271 of hash space. A false value tells _oneloop() to continue processing 272 until the current snapshot of hash space is exhausted. 273 274 You could, for example, implement a throttling algorithm here. 275 """ 276 return self._stop 277