1import email.message
2import mailbox
3import pathlib
4import shutil
5import socket
6import subprocess
7import textwrap
8import time
9import os
10
11import pytest
12
13
14def pytest_report_header():
15    which = shutil.which('notmuch')
16    vers = subprocess.run(['notmuch', '--version'], stdout=subprocess.PIPE)
17    return ['{} ({})'.format(vers.stdout.decode(errors='replace').strip(),which)]
18
19
20@pytest.fixture(scope='function')
21def tmppath(tmpdir):
22    """The tmpdir fixture wrapped in pathlib.Path."""
23    return pathlib.Path(str(tmpdir))
24
25
26@pytest.fixture
27def notmuch(maildir):
28    """Return a function which runs notmuch commands on our test maildir.
29
30    This uses the notmuch-config file created by the ``maildir``
31    fixture.
32    """
33    def run(*args):
34        """Run a notmuch command.
35
36        This function runs with a timeout error as many notmuch
37        commands may block if multiple processes are trying to open
38        the database in write-mode.  It is all too easy to
39        accidentally do this in the unittests.
40        """
41        cfg_fname = maildir.path / 'notmuch-config'
42        cmd = ['notmuch'] + list(args)
43        env = os.environ.copy()
44        env['NOTMUCH_CONFIG'] = str(cfg_fname)
45        proc = subprocess.run(cmd,
46                              timeout=5,
47                              env=env)
48        proc.check_returncode()
49    return run
50
51
52@pytest.fixture
53def maildir(tmppath):
54    """A basic test interface to a valid maildir directory.
55
56    This creates a valid maildir and provides a simple mechanism to
57    deliver test emails to it.  It also writes a notmuch-config file
58    in the top of the maildir.
59    """
60    cur = tmppath / 'cur'
61    cur.mkdir()
62    new = tmppath / 'new'
63    new.mkdir()
64    tmp = tmppath / 'tmp'
65    tmp.mkdir()
66    cfg_fname = tmppath/'notmuch-config'
67    with cfg_fname.open('w') as fp:
68        fp.write(textwrap.dedent("""\
69            [database]
70            path={tmppath!s}
71            [user]
72            name=Some Hacker
73            primary_email=dst@example.com
74            [new]
75            tags=unread;inbox;
76            ignore=
77            [search]
78            exclude_tags=deleted;spam;
79            [maildir]
80            synchronize_flags=true
81            """.format(tmppath=tmppath)))
82    return MailDir(tmppath)
83
84
85class MailDir:
86    """An interface around a correct maildir."""
87
88    def __init__(self, path):
89        self._path = pathlib.Path(path)
90        self.mailbox = mailbox.Maildir(str(path))
91        self._idcount = 0
92
93    @property
94    def path(self):
95        """The pathname of the maildir."""
96        return self._path
97
98    def _next_msgid(self):
99        """Return a new unique message ID."""
100        msgid = '{}@{}'.format(self._idcount, socket.getfqdn())
101        self._idcount += 1
102        return msgid
103
104    def deliver(self,
105                subject='Test mail',
106                body='This is a test mail',
107                to='dst@example.com',
108                frm='src@example.com',
109                headers=None,
110                new=False,      # Move to new dir or cur dir?
111                keywords=None,  # List of keywords or labels
112                seen=False,     # Seen flag (cur dir only)
113                replied=False,  # Replied flag (cur dir only)
114                flagged=False):  # Flagged flag (cur dir only)
115        """Deliver a new mail message in the mbox.
116
117        This does only adds the message to maildir, does not insert it
118        into the notmuch database.
119
120        :returns: A tuple of (msgid, pathname).
121        """
122        msgid = self._next_msgid()
123        when = time.time()
124        msg = email.message.EmailMessage()
125        msg.add_header('Received', 'by MailDir; {}'.format(time.ctime(when)))
126        msg.add_header('Message-ID', '<{}>'.format(msgid))
127        msg.add_header('Date', time.ctime(when))
128        msg.add_header('From', frm)
129        msg.add_header('To', to)
130        msg.add_header('Subject', subject)
131        if headers:
132            for h, v in headers:
133                msg.add_header(h, v)
134        msg.set_content(body)
135        mdmsg = mailbox.MaildirMessage(msg)
136        if not new:
137            mdmsg.set_subdir('cur')
138        if flagged:
139            mdmsg.add_flag('F')
140        if replied:
141            mdmsg.add_flag('R')
142        if seen:
143            mdmsg.add_flag('S')
144        boxid = self.mailbox.add(mdmsg)
145        basename = boxid
146        if mdmsg.get_info():
147            basename += mailbox.Maildir.colon + mdmsg.get_info()
148        msgpath = self.path / mdmsg.get_subdir() / basename
149        return (msgid, msgpath)
150