1# Copyright (C) 2007-2020 by the Free Software Foundation, Inc.
2#
3# This file is part of GNU Mailman.
4#
5# GNU Mailman is free software: you can redistribute it and/or modify it under
6# the terms of the GNU General Public License as published by the Free
7# Software Foundation, either version 3 of the License, or (at your option)
8# any later version.
9#
10# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13# more details.
14#
15# You should have received a copy of the GNU General Public License along with
16# GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
17
18"""Application level bounce handling."""
19
20import re
21import uuid
22import logging
23
24from email.mime.message import MIMEMessage
25from email.mime.text import MIMEText
26from email.utils import parseaddr
27from mailman.config import config
28from mailman.core.i18n import _
29from mailman.email.message import OwnerNotification, UserNotification
30from mailman.interfaces.bounce import UnrecognizedBounceDisposition
31from mailman.interfaces.listmanager import IListManager
32from mailman.interfaces.pending import IPendable, IPendings
33from mailman.interfaces.subscriptions import ISubscriptionService
34from mailman.interfaces.template import ITemplateLoader
35from mailman.utilities.email import split_email
36from mailman.utilities.string import expand, oneline, wrap
37from public import public
38from string import Template
39from zope.component import getUtility
40from zope.interface import implementer
41
42
43log = logging.getLogger('mailman.config')
44elog = logging.getLogger('mailman.error')
45blog = logging.getLogger('mailman.bounce')
46
47DOT = '.'
48NL = '\n'
49
50
51@public
52def bounce_message(mlist, msg, error=None):
53    """Bounce the message back to the original author.
54
55    :param mlist: The mailing list that the message was posted to.
56    :type mlist: `IMailingList`
57    :param msg: The original message.
58    :type msg: `email.message.Message`
59    :param error: Optional exception causing the bounce.  The exception
60        instance must have a `.message` attribute.  The exception *may* have a
61        non-None `.reasons` attribute which would be a list of reasons for the
62        rejection, and it may have a non-None `.substitutions` attribute.  The
63        latter, along with the formatted reasons will be interpolated into the
64        message (`.reasons` gets put into the `$reasons` placeholder).
65    :type error: RejectMessage
66    """
67    # Bounce a message back to the sender, with an error message if provided
68    # in the exception argument.  .sender might be None or the empty string.
69    if not msg.sender:
70        # We can't bounce the message if we don't know who it's supposed to go
71        # to.
72        return
73    subject = msg.get('subject', _('(no subject)'))
74    subject = oneline(subject, mlist.preferred_language.charset)
75    notice = (_('[No bounce details are available]')
76              if error is None
77              else str(error))
78    # Currently we always craft bounces as MIME messages.
79    bmsg = UserNotification(msg.sender, mlist.owner_address, subject,
80                            lang=mlist.preferred_language)
81    # BAW: Be sure you set the type before trying to attach, or you'll get
82    # a MultipartConversionError.
83    bmsg.set_type('multipart/mixed')
84    txt = MIMEText(notice, _charset=mlist.preferred_language.charset)
85    bmsg.attach(txt)
86    bmsg.attach(MIMEMessage(msg))
87    bmsg.send(mlist)
88
89
90class _BaseVERPParser:
91    """Base class for parsing VERP messages.
92
93    Sadly not every MTA bounces VERP messages correctly, or consistently.
94    First, the To: header is checked, then Delivered-To: (Postfix),
95    Envelope-To: (Exim) and Apparently-To:.  Note that there can be multiple
96    headers so we need to search them all
97    """
98
99    def __init__(self, pattern):
100        self._pattern = pattern
101        self._cre = re.compile(pattern, re.IGNORECASE)
102
103    def get_verp(self, mlist, msg):
104        """Extract a set of VERP bounce addresses.
105
106        :param mlist: The mailing list being checked.
107        :type mlist: `IMailingList`
108        :param msg: The message being parsed.
109        :type msg: `email.message.Message`
110        :return: The set of addresses extracted from the VERP headers.
111        :rtype: set of strings
112        """
113        blocal, bdomain = split_email(mlist.bounces_address)
114        values = set()
115        verp_matches = set()
116        for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
117            values.update(msg.get_all(header, []))
118        for field in values:
119            address = parseaddr(field)[1]
120            if not address:
121                # This header was empty.
122                continue
123            mo = self._cre.search(address)
124            if not mo:
125                # This did not match the VERP regexp.
126                continue
127            try:
128                if blocal != mo.group('bounces'):
129                    # This was not a bounce to our mailing list.
130                    continue
131                original_address = self._get_address(mo)
132            except IndexError:
133                elog.error('Bad VERP pattern: {0}'.format(self._pattern))
134                return set()
135            else:
136                if original_address is not None:
137                    verp_matches.add(original_address)
138        return verp_matches
139
140
141@public
142class StandardVERP(_BaseVERPParser):
143    def __init__(self):
144        super().__init__(config.mta.verp_regexp)
145
146    def _get_address(self, match_object):
147        return '{0}@{1}'.format(*match_object.group('local', 'domain'))
148
149
150@public
151class ProbeVERP(_BaseVERPParser):
152    def __init__(self):
153        super().__init__(config.mta.verp_probe_regexp)
154
155    def _get_address(self, match_object):
156        # Extract the token and get the matching address.
157        token = match_object.group('token')
158        pendable = getUtility(IPendings).confirm(token)
159        if pendable is None:
160            # The token must have already been confirmed, or it may have been
161            # evicted from the database already.
162            return None
163        # We had to pend the uuid as a unicode.
164        member_id = uuid.UUID(hex=pendable['member_id'])
165        member = getUtility(ISubscriptionService).get_member(member_id)
166        if member is None:
167            return None
168        return member.address.email
169
170
171@implementer(IPendable)
172class _ProbePendable(dict):
173    """The pendable dictionary for probe messages."""
174    PEND_TYPE = 'probe'
175
176
177@public
178def send_probe(member, msg=None, message_id=None):
179    """Send a VERP probe to the member.
180
181    :param member: The member to send the probe to.  From this object, both
182        the user and the mailing list can be determined.
183    :type member: IMember
184    :param msg: The bouncing message that caused the probe to be sent.
185    :type msg:
186    :param message_id: MessageID of the bouncing message.
187    :type message_id: str
188    :return: The token representing this probe in the pendings database.
189    :rtype: string
190    """
191    if (message_id or msg) is None:
192        raise ValueError('Required at least one of "message_id" and "msg".')
193    mlist = getUtility(IListManager).get_by_list_id(
194        member.mailing_list.list_id)
195    template = getUtility(ITemplateLoader).get(
196        'list:user:notice:probe', mlist,
197        language=member.preferred_language.code,
198        # For backward compatibility.
199        code=member.preferred_language.code,
200        )
201    text = wrap(expand(template, mlist, dict(
202        sender_email=member.subscriber.email,
203        # For backward compatibility.
204        address=member.address.email,
205        email=member.address.email,
206        owneraddr=mlist.owner_address,
207        )))
208    if message_id is None:
209        message_id = msg['message-id']
210    if isinstance(message_id, bytes):
211        message_id = message_id.decode('ascii')
212    pendable = _ProbePendable(
213        # We can only pend unicodes.
214        member_id=member.member_id.hex,
215        message_id=message_id,
216        )
217    token = getUtility(IPendings).add(pendable)
218    mailbox, domain_parts = split_email(mlist.bounces_address)
219    probe_sender = Template(config.mta.verp_probe_format).safe_substitute(
220        bounces=mailbox,
221        token=token,
222        domain=DOT.join(domain_parts),
223        )
224    # Calculate the Subject header, in the member's preferred language.
225    with _.using(member.preferred_language.code):
226        subject = _('$mlist.display_name mailing list probe message')
227    # Craft the probe message.  This will be a multipart where the first part
228    # is the probe text and the second part is the message that caused this
229    # probe to be sent, if it provied.
230    probe = UserNotification(member.address.email, probe_sender,
231                             subject, lang=member.preferred_language)
232    probe.set_type('multipart/mixed')
233    notice = MIMEText(text, _charset=mlist.preferred_language.charset)
234    probe.attach(notice)
235    if msg is not None:
236        probe.attach(MIMEMessage(msg))
237    # Probes should not have the Precedence: bulk header.
238    probe.send(mlist, sender=probe_sender, verp=False, probe_token=token,
239               add_precedence=False)
240    # When we send a probe, we reset the score.
241    member.bounce_score = 0
242    return token
243
244
245@public
246def maybe_forward(mlist, msg):
247    """Possibly forward bounce messages with no recognizable addresses.
248
249    :param mlist: The mailing list.
250    :type mlist: `IMailingList`
251    :param msg: The bounce message to scan.
252    :type msg: `Message`
253    """
254    message_id = msg['message-id']
255    if (mlist.forward_unrecognized_bounces_to
256            is UnrecognizedBounceDisposition.discard):
257        blog.error('Discarding unrecognized bounce: {0}'.format(message_id))
258        return
259    # The notification is either going to go to the list's administrators
260    # (owners and moderators), or to the site administrators.  Most of the
261    # notification is exactly the same in either case.
262    subject = _('Uncaught bounce notification')
263    template = getUtility(ITemplateLoader).get(
264        'list:admin:notice:unrecognized', mlist)
265    text = expand(template, mlist)
266    text_part = MIMEText(text, _charset=mlist.preferred_language.charset)
267    attachment = MIMEMessage(msg)
268    if (mlist.forward_unrecognized_bounces_to
269            is UnrecognizedBounceDisposition.administrators):
270        keywords = dict(roster=mlist.administrators)
271    elif (mlist.forward_unrecognized_bounces_to
272          is UnrecognizedBounceDisposition.site_owner):
273        keywords = {}
274    else:
275        raise AssertionError('Invalid forwarding disposition: {0}'.format(
276                             mlist.forward_unrecognized_bounces_to))
277    # Create the notification and send it.
278    notice = OwnerNotification(mlist, subject, **keywords)
279    notice.set_type('multipart/mixed')
280    notice.attach(text_part)
281    notice.attach(attachment)
282    notice.send(mlist)
283