1# Copyright (C) 1998-2018 by the Free Software Foundation, Inc.
2#
3# This program is free software; you can redistribute it and/or
4# modify it under the terms of the GNU General Public License
5# as published by the Free Software Foundation; either version 2
6# of the License, or (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
16# USA.
17
18"""Generic queue runner class.
19"""
20
21import time
22import traceback
23from cStringIO import StringIO
24
25from Mailman import mm_cfg
26from Mailman import Utils
27from Mailman import Errors
28from Mailman import MailList
29from Mailman import i18n
30
31from Mailman.Logging.Syslog import syslog
32from Mailman.Queue.Switchboard import Switchboard
33
34import email.Errors
35
# Make sure the names True and False exist.  Referencing them raises
# NameError on an interpreter that does not provide them, in which case
# they are defined here as the integers 1 and 0.
try:
    True, False
except NameError:
    True = 1
    False = 0
41
42
43
44class Runner:
45    QDIR = None
46    SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
47
48    def __init__(self, slice=None, numslices=1):
49        self._kids = {}
50        # Create our own switchboard.  Don't use the switchboard cache because
51        # we want to provide slice and numslice arguments.
52        self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
53        # Create the shunt switchboard
54        self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
55        self._stop = False
56
57    def __repr__(self):
58        return '<%s at %s>' % (self.__class__.__name__, id(self))
59
    def stop(self):
        """Ask the runner to stop by setting its stop flag.

        The flag is examined by run() after each pass over the queue, and it
        is what the default _shortcircuit() returns.
        """
        self._stop = True
62
63    def run(self):
64        # Start the main loop for this queue runner.
65        try:
66            try:
67                while True:
68                    # Once through the loop that processes all the files in
69                    # the queue directory.
70                    filecnt = self._oneloop()
71                    # Do the periodic work for the subclass.  BAW: this
72                    # shouldn't be called here.  There should be one more
73                    # _doperiodic() call at the end of the _oneloop() loop.
74                    self._doperiodic()
75                    # If the stop flag is set, we're done.
76                    if self._stop:
77                        break
78                    # Give the runner an opportunity to snooze for a while,
79                    # but pass it the file count so it can decide whether to
80                    # do more work now or not.
81                    self._snooze(filecnt)
82            except KeyboardInterrupt:
83                pass
84        finally:
85            # We've broken out of our main loop, so we want to reap all the
86            # subprocesses we've created and do any other necessary cleanups.
87            self._cleanup()
88
89    def _oneloop(self):
90        # First, list all the files in our queue directory.
91        # Switchboard.files() is guaranteed to hand us the files in FIFO
92        # order.  Return an integer count of the number of files that were
93        # available for this qrunner to process.
94        files = self._switchboard.files()
95        for filebase in files:
96            try:
97                # Ask the switchboard for the message and metadata objects
98                # associated with this filebase.
99                msg, msgdata = self._switchboard.dequeue(filebase)
100            except Exception, e:
101                # This used to just catch email.Errors.MessageParseError,
102                # but other problems can occur in message parsing, e.g.
103                # ValueError, and exceptions can occur in unpickling too.
104                # We don't want the runner to die, so we just log and skip
105                # this entry, but maybe preserve it for analysis.
106                self._log(e)
107                if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
108                    syslog('error',
109                           'Skipping and preserving unparseable message: %s',
110                           filebase)
111                    preserve = True
112                else:
113                    syslog('error',
114                           'Ignoring unparseable message: %s', filebase)
115                    preserve = False
116                self._switchboard.finish(filebase, preserve=preserve)
117                continue
118            try:
119                self._onefile(msg, msgdata)
120                self._switchboard.finish(filebase)
121            except Exception, e:
122                # All runners that implement _dispose() must guarantee that
123                # exceptions are caught and dealt with properly.  Still, there
124                # may be a bug in the infrastructure, and we do not want those
125                # to cause messages to be lost.  Any uncaught exceptions will
126                # cause the message to be stored in the shunt queue for human
127                # intervention.
128                self._log(e)
129                # Put a marker in the metadata for unshunting
130                msgdata['whichq'] = self._switchboard.whichq()
131                # It is possible that shunting can throw an exception, e.g. a
132                # permissions problem or a MemoryError due to a really large
133                # message.  Try to be graceful.
134                try:
135                    new_filebase = self._shunt.enqueue(msg, msgdata)
136                    syslog('error', 'SHUNTING: %s', new_filebase)
137                    self._switchboard.finish(filebase)
138                except Exception, e:
139                    # The message wasn't successfully shunted.  Log the
140                    # exception and try to preserve the original queue entry
141                    # for possible analysis.
142                    self._log(e)
143                    syslog('error',
144                           'SHUNTING FAILED, preserving original entry: %s',
145                           filebase)
146                    self._switchboard.finish(filebase, preserve=True)
147            # Other work we want to do each time through the loop
148            Utils.reap(self._kids, once=True)
149            self._doperiodic()
150            if self._shortcircuit():
151                break
152        return len(files)
153
154    def _onefile(self, msg, msgdata):
155        # Do some common sanity checking on the message metadata.  It's got to
156        # be destined for a particular mailing list.  This switchboard is used
157        # to shunt off badly formatted messages.  We don't want to just trash
158        # them because they may be fixable with human intervention.  Just get
159        # them out of our site though.
160        #
161        # Find out which mailing list this message is destined for.
162        listname = msgdata.get('listname')
163        if not listname:
164            listname = mm_cfg.MAILMAN_SITE_LIST
165        mlist = self._open_list(listname)
166        if not mlist:
167            syslog('error',
168                   'Dequeuing message destined for missing list: %s',
169                   listname)
170            self._shunt.enqueue(msg, msgdata)
171            return
172        # Now process this message, keeping track of any subprocesses that may
173        # have been spawned.  We'll reap those later.
174        #
175        # We also want to set up the language context for this message.  The
176        # context will be the preferred language for the user if a member of
177        # the list, or the list's preferred language.  However, we must take
178        # special care to reset the defaults, otherwise subsequent messages
179        # may be translated incorrectly.  BAW: I'm not sure I like this
180        # approach, but I can't think of anything better right now.
181        otranslation = i18n.get_translation()
182        sender = msg.get_sender()
183        if mlist:
184            lang = mlist.getMemberLanguage(sender)
185        else:
186            lang = mm_cfg.DEFAULT_SERVER_LANGUAGE
187        i18n.set_language(lang)
188        msgdata['lang'] = lang
189        try:
190            keepqueued = self._dispose(mlist, msg, msgdata)
191        finally:
192            i18n.set_translation(otranslation)
193        # Keep tabs on any child processes that got spawned.
194        kids = msgdata.get('_kids')
195        if kids:
196            self._kids.update(kids)
197        if keepqueued:
198            self._switchboard.enqueue(msg, msgdata)
199
200    def _open_list(self, listname):
201        # We no longer cache the list instances.  Because of changes to
202        # MailList.py needed to avoid not reloading an updated list, caching
203        # is not as effective as it once was.  Also, with OldStyleMemberships
204        # as the MemberAdaptor, there was a self-reference to the list which
205        # kept all lists in the cache.  Changing this reference to a
206        # weakref.proxy created other issues.
207        try:
208            mlist = MailList.MailList(listname, lock=False)
209        except Errors.MMListError, e:
210            syslog('error', 'error opening list: %s\n%s', listname, e)
211            return None
212        return mlist
213
214    def _log(self, exc):
215        syslog('error', 'Uncaught runner exception: %s', exc)
216        s = StringIO()
217        traceback.print_exc(file=s)
218        syslog('error', s.getvalue())
219
220    #
221    # Subclasses can override these methods.
222    #
223    def _cleanup(self):
224        """Clean up upon exit from the main processing loop.
225
226        Called when the Runner's main loop is stopped, this should perform
227        any necessary resource deallocation.  Its return value is irrelevant.
228        """
229        Utils.reap(self._kids)
230
231    def _dispose(self, mlist, msg, msgdata):
232        """Dispose of a single message destined for a mailing list.
233
234        Called for each message that the Runner is responsible for, this is
235        the primary overridable method for processing each message.
236        Subclasses, must provide implementation for this method.
237
238        mlist is the MailList instance this message is destined for.
239
240        msg is the Message object representing the message.
241
242        msgdata is a dictionary of message metadata.
243        """
244        raise NotImplementedError
245
246    def _doperiodic(self):
247        """Do some processing `every once in a while'.
248
249        Called every once in a while both from the Runner's main loop, and
250        from the Runner's hash slice processing loop.  You can do whatever
251        special periodic processing you want here, and the return value is
252        irrelevant.
253        """
254        pass
255
256    def _snooze(self, filecnt):
257        """Sleep for a little while.
258
259        filecnt is the number of messages in the queue the last time through.
260        Sub-runners can decide to continue to do work, or sleep for a while
261        based on this value.  By default, we only snooze if there was nothing
262        to do last time around.
263        """
264        if filecnt or self.SLEEPTIME <= 0:
265            return
266        time.sleep(self.SLEEPTIME)
267
268    def _shortcircuit(self):
269        """Return a true value if the individual file processing loop should
270        exit before it's finished processing each message in the current slice
271        of hash space.  A false value tells _oneloop() to continue processing
272        until the current snapshot of hash space is exhausted.
273
274        You could, for example, implement a throttling algorithm here.
275        """
276        return self._stop
277