1"""IMAP4 client.
2
3Based on RFC 2060.
4
5Public class:           IMAP4
6Public variable:        Debug
7Public functions:       Internaldate2tuple
8                        Int2AP
9                        ParseFlags
10                        Time2Internaldate
11"""
12
13# Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
14#
15# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
16# String method conversion by ESR, February 2001.
17# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001.
18# IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002.
19# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002.
20# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002.
21# GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005.
22
23__version__ = "2.58"
24
25import binascii, errno, random, re, socket, subprocess, sys, time, calendar
26from datetime import datetime, timezone, timedelta
27from io import DEFAULT_BUFFER_SIZE
28
29try:
30    import ssl
31    HAVE_SSL = True
32except ImportError:
33    HAVE_SSL = False
34
35__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple",
36           "Int2AP", "ParseFlags", "Time2Internaldate"]
37
38#       Globals
39
# Line terminator; append() rewrites every line ending of a message to this.
CRLF = b'\r\n'
# Module-wide default debug level, copied to IMAP4.debug at construction.
Debug = 0
# Default port for IMAP4.__init__() and IMAP4.open().
IMAP4_PORT = 143
# NOTE(review): presumably the default port of the SSL variant - it is not
# used in this part of the file.
IMAP4_SSL_PORT = 993
# Protocol names IMAP4._connect() looks for in the server's capabilities.
AllowedVersions = ('IMAP4REV1', 'IMAP4')        # Most recent first

# Maximal line length when calling readline(). This is to prevent
# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1)
# don't specify a line length. RFC 2683 suggests limiting client
# command lines to 1000 octets and that servers should be prepared
# to accept command lines up to 8000 octets, so we used to use 10K here.
# In the modern world (eg: gmail) the response to, for example, a
# search command can be quite large, so we now use 1M.
_MAXLINE = 1000000
54
55
56#       Commands
57
# Maps every known IMAP4 command name to the tuple of connection states
# (values of IMAP4.state) in which the command may be issued.  The table is
# also what IMAP4.__getattr__() consults to accept UPPERCASE method names,
# and what IMAP4.uid() uses to validate its 'command' argument.
Commands = {
        # name            valid states
        'APPEND':       ('AUTH', 'SELECTED'),
        'AUTHENTICATE': ('NONAUTH',),
        'CAPABILITY':   ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'CHECK':        ('SELECTED',),
        'CLOSE':        ('SELECTED',),
        'COPY':         ('SELECTED',),
        'CREATE':       ('AUTH', 'SELECTED'),
        'DELETE':       ('AUTH', 'SELECTED'),
        'DELETEACL':    ('AUTH', 'SELECTED'),
        'ENABLE':       ('AUTH', ),
        'EXAMINE':      ('AUTH', 'SELECTED'),
        'EXPUNGE':      ('SELECTED',),
        'FETCH':        ('SELECTED',),
        'GETACL':       ('AUTH', 'SELECTED'),
        'GETANNOTATION':('AUTH', 'SELECTED'),
        'GETQUOTA':     ('AUTH', 'SELECTED'),
        'GETQUOTAROOT': ('AUTH', 'SELECTED'),
        'MYRIGHTS':     ('AUTH', 'SELECTED'),
        'LIST':         ('AUTH', 'SELECTED'),
        'LOGIN':        ('NONAUTH',),
        'LOGOUT':       ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'LSUB':         ('AUTH', 'SELECTED'),
        'MOVE':         ('SELECTED',),
        'NAMESPACE':    ('AUTH', 'SELECTED'),
        'NOOP':         ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'PARTIAL':      ('SELECTED',),                                  # NB: obsolete
        'PROXYAUTH':    ('AUTH',),
        'RENAME':       ('AUTH', 'SELECTED'),
        'SEARCH':       ('SELECTED',),
        'SELECT':       ('AUTH', 'SELECTED'),
        'SETACL':       ('AUTH', 'SELECTED'),
        'SETANNOTATION':('AUTH', 'SELECTED'),
        'SETQUOTA':     ('AUTH', 'SELECTED'),
        'SORT':         ('SELECTED',),
        'STARTTLS':     ('NONAUTH',),
        'STATUS':       ('AUTH', 'SELECTED'),
        'STORE':        ('SELECTED',),
        'SUBSCRIBE':    ('AUTH', 'SELECTED'),
        'THREAD':       ('SELECTED',),
        'UID':          ('SELECTED',),
        'UNSUBSCRIBE':  ('AUTH', 'SELECTED'),
        }
102
103#       Patterns to match server responses
104
# Continuation request: '+', optionally followed by a space and 'data'.
Continuation = re.compile(br'\+( (?P<data>.*))?')
# 'FLAGS (...)' anywhere in a line; 'flags' is the text between the parentheses.
Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)')
# 'INTERNALDATE "dd-Mon-yyyy hh:mm:ss +hhmm"' anywhere in a line; the day may
# have a leading space, and the zone is split into sign, hours and minutes.
InternalDate = re.compile(br'.*INTERNALDATE "'
        br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
        br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
        br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
        br'"')
# Literal is no longer used; kept for backward compatibility.
Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII)
# Any single line ending (CRLF, lone CR or lone LF); append() uses it to
# normalise message line endings to CRLF.
MapCRLF = re.compile(br'\r\n|\r|\n')
# We no longer exclude the ']' character from the data portion of the response
# code, even though it violates the RFC.  Popular IMAP servers such as Gmail
# allow flags with ']', and there are programs (including imaplib!) that can
# produce them.  The problem with this is if the 'text' portion of the response
# includes a ']' we'll parse the response wrong (which is the point of the RFC
# restriction).  However, that seems less likely to be a problem in practice
# than being unable to correctly parse flags that include ']' chars, which
# was reported as a real-world problem in issue #21815.
Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]')
# Untagged response: '* TYPE', optionally followed by a space and 'data'.
Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
# Untagged_status is no longer used; kept for backward compatibility
Untagged_status = re.compile(
    br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII)
# We compile these in _mode_xxx.
# _Literal: a line ending in '{size}'.
_Literal = br'.*{(?P<size>\d+)}$'
# _Untagged_status: '* <number> TYPE', optionally followed by more data.
_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
131
132
133
134class IMAP4:
135
136    r"""IMAP4 client class.
137
138    Instantiate with: IMAP4([host[, port]])
139
140            host - host's name (default: localhost);
141            port - port number (default: standard IMAP4 port).
142
143    All IMAP4rev1 commands are supported by methods of the same
144    name (in lower-case).
145
146    All arguments to commands are converted to strings, except for
147    AUTHENTICATE, and the last argument to APPEND which is passed as
148    an IMAP4 literal.  If necessary (the string contains any
149    non-printing characters or white-space and isn't enclosed with
150    either parentheses or double quotes) each string is quoted.
151    However, the 'password' argument to the LOGIN command is always
152    quoted.  If you want to avoid having an argument string quoted
153    (eg: the 'flags' argument to STORE) then enclose the string in
154    parentheses (eg: "(\Deleted)").
155
156    Each command returns a tuple: (type, [data, ...]) where 'type'
157    is usually 'OK' or 'NO', and 'data' is either the text from the
158    tagged response, or untagged results from command. Each 'data'
159    is either a string, or a tuple. If a tuple, then the first part
160    is the header of the response, and the second part contains
161    the data (ie: 'literal' value).
162
163    Errors raise the exception class <instance>.error("<reason>").
164    IMAP4 server errors raise <instance>.abort("<reason>"),
165    which is a sub-class of 'error'. Mailbox status changes
166    from READ-WRITE to READ-ONLY raise the exception class
167    <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
168
169    "error" exceptions imply a program error.
170    "abort" exceptions imply the connection should be reset, and
171            the command re-tried.
172    "readonly" exceptions imply the command should be re-tried.
173
174    Note: to use this module, you must read the RFCs pertaining to the
175    IMAP4 protocol, as the semantics of the arguments to each IMAP4
176    command are left to the invoker, not to mention the results. Also,
177    most IMAP servers implement a sub-set of the commands available here.
178    """
179
180    class error(Exception): pass    # Logical errors - debug required
181    class abort(error): pass        # Service errors - close and retry
182    class readonly(abort): pass     # Mailbox status changed to READ-ONLY
183
184    def __init__(self, host='', port=IMAP4_PORT):
185        self.debug = Debug
186        self.state = 'LOGOUT'
187        self.literal = None             # A literal argument to a command
188        self.tagged_commands = {}       # Tagged commands awaiting response
189        self.untagged_responses = {}    # {typ: [data, ...], ...}
190        self.continuation_response = '' # Last continuation response
191        self.is_readonly = False        # READ-ONLY desired state
192        self.tagnum = 0
193        self._tls_established = False
194        self._mode_ascii()
195
196        # Open socket to server.
197
198        self.open(host, port)
199
200        try:
201            self._connect()
202        except Exception:
203            try:
204                self.shutdown()
205            except OSError:
206                pass
207            raise
208
209    def _mode_ascii(self):
210        self.utf8_enabled = False
211        self._encoding = 'ascii'
212        self.Literal = re.compile(_Literal, re.ASCII)
213        self.Untagged_status = re.compile(_Untagged_status, re.ASCII)
214
215
216    def _mode_utf8(self):
217        self.utf8_enabled = True
218        self._encoding = 'utf-8'
219        self.Literal = re.compile(_Literal)
220        self.Untagged_status = re.compile(_Untagged_status)
221
222
223    def _connect(self):
224        # Create unique tag for this session,
225        # and compile tagged response matcher.
226
227        self.tagpre = Int2AP(random.randint(4096, 65535))
228        self.tagre = re.compile(br'(?P<tag>'
229                        + self.tagpre
230                        + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII)
231
232        # Get server welcome message,
233        # request and store CAPABILITY response.
234
235        if __debug__:
236            self._cmd_log_len = 10
237            self._cmd_log_idx = 0
238            self._cmd_log = {}           # Last `_cmd_log_len' interactions
239            if self.debug >= 1:
240                self._mesg('imaplib version %s' % __version__)
241                self._mesg('new IMAP4 connection, tag=%s' % self.tagpre)
242
243        self.welcome = self._get_response()
244        if 'PREAUTH' in self.untagged_responses:
245            self.state = 'AUTH'
246        elif 'OK' in self.untagged_responses:
247            self.state = 'NONAUTH'
248        else:
249            raise self.error(self.welcome)
250
251        self._get_capabilities()
252        if __debug__:
253            if self.debug >= 3:
254                self._mesg('CAPABILITIES: %r' % (self.capabilities,))
255
256        for version in AllowedVersions:
257            if not version in self.capabilities:
258                continue
259            self.PROTOCOL_VERSION = version
260            return
261
262        raise self.error('server not IMAP4 compliant')
263
264
265    def __getattr__(self, attr):
266        #       Allow UPPERCASE variants of IMAP4 command methods.
267        if attr in Commands:
268            return getattr(self, attr.lower())
269        raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
270
271    def __enter__(self):
272        return self
273
274    def __exit__(self, *args):
275        try:
276            self.logout()
277        except OSError:
278            pass
279
280
281    #       Overridable methods
282
283
284    def _create_socket(self):
285        # Default value of IMAP4.host is '', but socket.getaddrinfo()
286        # (which is used by socket.create_connection()) expects None
287        # as a default value for host.
288        host = None if not self.host else self.host
289        return socket.create_connection((host, self.port))
290
291    def open(self, host = '', port = IMAP4_PORT):
292        """Setup connection to remote server on "host:port"
293            (default: localhost:standard IMAP4 port).
294        This connection will be used by the routines:
295            read, readline, send, shutdown.
296        """
297        self.host = host
298        self.port = port
299        self.sock = self._create_socket()
300        self.file = self.sock.makefile('rb')
301
302
303    def read(self, size):
304        """Read 'size' bytes from remote."""
305        return self.file.read(size)
306
307
308    def readline(self):
309        """Read line from remote."""
310        line = self.file.readline(_MAXLINE + 1)
311        if len(line) > _MAXLINE:
312            raise self.error("got more than %d bytes" % _MAXLINE)
313        return line
314
315
316    def send(self, data):
317        """Send data to remote."""
318        self.sock.sendall(data)
319
320
321    def shutdown(self):
322        """Close I/O established in "open"."""
323        self.file.close()
324        try:
325            self.sock.shutdown(socket.SHUT_RDWR)
326        except OSError as exc:
327            # The server might already have closed the connection.
328            # On Windows, this may result in WSAEINVAL (error 10022):
329            # An invalid operation was attempted.
330            if (exc.errno != errno.ENOTCONN
331               and getattr(exc, 'winerror', 0) != 10022):
332                raise
333        finally:
334            self.sock.close()
335
336
337    def socket(self):
338        """Return socket instance used to connect to IMAP4 server.
339
340        socket = <instance>.socket()
341        """
342        return self.sock
343
344
345
346    #       Utility methods
347
348
349    def recent(self):
350        """Return most recent 'RECENT' responses if any exist,
351        else prompt server for an update using the 'NOOP' command.
352
353        (typ, [data]) = <instance>.recent()
354
355        'data' is None if no new messages,
356        else list of RECENT responses, most recent last.
357        """
358        name = 'RECENT'
359        typ, dat = self._untagged_response('OK', [None], name)
360        if dat[-1]:
361            return typ, dat
362        typ, dat = self.noop()  # Prod server for response
363        return self._untagged_response(typ, dat, name)
364
365
366    def response(self, code):
367        """Return data for response 'code' if received, or None.
368
369        Old value for response 'code' is cleared.
370
371        (code, [data]) = <instance>.response(code)
372        """
373        return self._untagged_response(code, [None], code.upper())
374
375
376
377    #       IMAP4 commands
378
379
380    def append(self, mailbox, flags, date_time, message):
381        """Append message to named mailbox.
382
383        (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
384
385                All args except `message' can be None.
386        """
387        name = 'APPEND'
388        if not mailbox:
389            mailbox = 'INBOX'
390        if flags:
391            if (flags[0],flags[-1]) != ('(',')'):
392                flags = '(%s)' % flags
393        else:
394            flags = None
395        if date_time:
396            date_time = Time2Internaldate(date_time)
397        else:
398            date_time = None
399        literal = MapCRLF.sub(CRLF, message)
400        if self.utf8_enabled:
401            literal = b'UTF8 (' + literal + b')'
402        self.literal = literal
403        return self._simple_command(name, mailbox, flags, date_time)
404
405
406    def authenticate(self, mechanism, authobject):
407        """Authenticate command - requires response processing.
408
409        'mechanism' specifies which authentication mechanism is to
410        be used - it must appear in <instance>.capabilities in the
411        form AUTH=<mechanism>.
412
413        'authobject' must be a callable object:
414
415                data = authobject(response)
416
417        It will be called to process server continuation responses; the
418        response argument it is passed will be a bytes.  It should return bytes
419        data that will be base64 encoded and sent to the server.  It should
420        return None if the client abort response '*' should be sent instead.
421        """
422        mech = mechanism.upper()
423        # XXX: shouldn't this code be removed, not commented out?
424        #cap = 'AUTH=%s' % mech
425        #if not cap in self.capabilities:       # Let the server decide!
426        #    raise self.error("Server doesn't allow %s authentication." % mech)
427        self.literal = _Authenticator(authobject).process
428        typ, dat = self._simple_command('AUTHENTICATE', mech)
429        if typ != 'OK':
430            raise self.error(dat[-1].decode('utf-8', 'replace'))
431        self.state = 'AUTH'
432        return typ, dat
433
434
435    def capability(self):
436        """(typ, [data]) = <instance>.capability()
437        Fetch capabilities list from server."""
438
439        name = 'CAPABILITY'
440        typ, dat = self._simple_command(name)
441        return self._untagged_response(typ, dat, name)
442
443
444    def check(self):
445        """Checkpoint mailbox on server.
446
447        (typ, [data]) = <instance>.check()
448        """
449        return self._simple_command('CHECK')
450
451
452    def close(self):
453        """Close currently selected mailbox.
454
455        Deleted messages are removed from writable mailbox.
456        This is the recommended command before 'LOGOUT'.
457
458        (typ, [data]) = <instance>.close()
459        """
460        try:
461            typ, dat = self._simple_command('CLOSE')
462        finally:
463            self.state = 'AUTH'
464        return typ, dat
465
466
467    def copy(self, message_set, new_mailbox):
468        """Copy 'message_set' messages onto end of 'new_mailbox'.
469
470        (typ, [data]) = <instance>.copy(message_set, new_mailbox)
471        """
472        return self._simple_command('COPY', message_set, new_mailbox)
473
474
475    def create(self, mailbox):
476        """Create new mailbox.
477
478        (typ, [data]) = <instance>.create(mailbox)
479        """
480        return self._simple_command('CREATE', mailbox)
481
482
483    def delete(self, mailbox):
484        """Delete old mailbox.
485
486        (typ, [data]) = <instance>.delete(mailbox)
487        """
488        return self._simple_command('DELETE', mailbox)
489
490    def deleteacl(self, mailbox, who):
491        """Delete the ACLs (remove any rights) set for who on mailbox.
492
493        (typ, [data]) = <instance>.deleteacl(mailbox, who)
494        """
495        return self._simple_command('DELETEACL', mailbox, who)
496
497    def enable(self, capability):
498        """Send an RFC5161 enable string to the server.
499
500        (typ, [data]) = <intance>.enable(capability)
501        """
502        if 'ENABLE' not in self.capabilities:
503            raise IMAP4.error("Server does not support ENABLE")
504        typ, data = self._simple_command('ENABLE', capability)
505        if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper():
506            self._mode_utf8()
507        return typ, data
508
509    def expunge(self):
510        """Permanently remove deleted items from selected mailbox.
511
512        Generates 'EXPUNGE' response for each deleted message.
513
514        (typ, [data]) = <instance>.expunge()
515
516        'data' is list of 'EXPUNGE'd message numbers in order received.
517        """
518        name = 'EXPUNGE'
519        typ, dat = self._simple_command(name)
520        return self._untagged_response(typ, dat, name)
521
522
523    def fetch(self, message_set, message_parts):
524        """Fetch (parts of) messages.
525
526        (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
527
528        'message_parts' should be a string of selected parts
529        enclosed in parentheses, eg: "(UID BODY[TEXT])".
530
531        'data' are tuples of message part envelope and data.
532        """
533        name = 'FETCH'
534        typ, dat = self._simple_command(name, message_set, message_parts)
535        return self._untagged_response(typ, dat, name)
536
537
538    def getacl(self, mailbox):
539        """Get the ACLs for a mailbox.
540
541        (typ, [data]) = <instance>.getacl(mailbox)
542        """
543        typ, dat = self._simple_command('GETACL', mailbox)
544        return self._untagged_response(typ, dat, 'ACL')
545
546
547    def getannotation(self, mailbox, entry, attribute):
548        """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute)
549        Retrieve ANNOTATIONs."""
550
551        typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute)
552        return self._untagged_response(typ, dat, 'ANNOTATION')
553
554
555    def getquota(self, root):
556        """Get the quota root's resource usage and limits.
557
558        Part of the IMAP4 QUOTA extension defined in rfc2087.
559
560        (typ, [data]) = <instance>.getquota(root)
561        """
562        typ, dat = self._simple_command('GETQUOTA', root)
563        return self._untagged_response(typ, dat, 'QUOTA')
564
565
566    def getquotaroot(self, mailbox):
567        """Get the list of quota roots for the named mailbox.
568
569        (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox)
570        """
571        typ, dat = self._simple_command('GETQUOTAROOT', mailbox)
572        typ, quota = self._untagged_response(typ, dat, 'QUOTA')
573        typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT')
574        return typ, [quotaroot, quota]
575
576
577    def list(self, directory='""', pattern='*'):
578        """List mailbox names in directory matching pattern.
579
580        (typ, [data]) = <instance>.list(directory='""', pattern='*')
581
582        'data' is list of LIST responses.
583        """
584        name = 'LIST'
585        typ, dat = self._simple_command(name, directory, pattern)
586        return self._untagged_response(typ, dat, name)
587
588
589    def login(self, user, password):
590        """Identify client using plaintext password.
591
592        (typ, [data]) = <instance>.login(user, password)
593
594        NB: 'password' will be quoted.
595        """
596        typ, dat = self._simple_command('LOGIN', user, self._quote(password))
597        if typ != 'OK':
598            raise self.error(dat[-1])
599        self.state = 'AUTH'
600        return typ, dat
601
602
603    def login_cram_md5(self, user, password):
604        """ Force use of CRAM-MD5 authentication.
605
606        (typ, [data]) = <instance>.login_cram_md5(user, password)
607        """
608        self.user, self.password = user, password
609        return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH)
610
611
612    def _CRAM_MD5_AUTH(self, challenge):
613        """ Authobject to use with CRAM-MD5 authentication. """
614        import hmac
615        pwd = (self.password.encode('utf-8') if isinstance(self.password, str)
616                                             else self.password)
617        return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest()
618
619
620    def logout(self):
621        """Shutdown connection to server.
622
623        (typ, [data]) = <instance>.logout()
624
625        Returns server 'BYE' response.
626        """
627        self.state = 'LOGOUT'
628        try: typ, dat = self._simple_command('LOGOUT')
629        except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]]
630        self.shutdown()
631        if 'BYE' in self.untagged_responses:
632            return 'BYE', self.untagged_responses['BYE']
633        return typ, dat
634
635
636    def lsub(self, directory='""', pattern='*'):
637        """List 'subscribed' mailbox names in directory matching pattern.
638
639        (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
640
641        'data' are tuples of message part envelope and data.
642        """
643        name = 'LSUB'
644        typ, dat = self._simple_command(name, directory, pattern)
645        return self._untagged_response(typ, dat, name)
646
647    def myrights(self, mailbox):
648        """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox).
649
650        (typ, [data]) = <instance>.myrights(mailbox)
651        """
652        typ,dat = self._simple_command('MYRIGHTS', mailbox)
653        return self._untagged_response(typ, dat, 'MYRIGHTS')
654
655    def namespace(self):
656        """ Returns IMAP namespaces ala rfc2342
657
658        (typ, [data, ...]) = <instance>.namespace()
659        """
660        name = 'NAMESPACE'
661        typ, dat = self._simple_command(name)
662        return self._untagged_response(typ, dat, name)
663
664
665    def noop(self):
666        """Send NOOP command.
667
668        (typ, [data]) = <instance>.noop()
669        """
670        if __debug__:
671            if self.debug >= 3:
672                self._dump_ur(self.untagged_responses)
673        return self._simple_command('NOOP')
674
675
676    def partial(self, message_num, message_part, start, length):
677        """Fetch truncated part of a message.
678
679        (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
680
681        'data' is tuple of message part envelope and data.
682        """
683        name = 'PARTIAL'
684        typ, dat = self._simple_command(name, message_num, message_part, start, length)
685        return self._untagged_response(typ, dat, 'FETCH')
686
687
688    def proxyauth(self, user):
689        """Assume authentication as "user".
690
691        Allows an authorised administrator to proxy into any user's
692        mailbox.
693
694        (typ, [data]) = <instance>.proxyauth(user)
695        """
696
697        name = 'PROXYAUTH'
698        return self._simple_command('PROXYAUTH', user)
699
700
701    def rename(self, oldmailbox, newmailbox):
702        """Rename old mailbox name to new.
703
704        (typ, [data]) = <instance>.rename(oldmailbox, newmailbox)
705        """
706        return self._simple_command('RENAME', oldmailbox, newmailbox)
707
708
709    def search(self, charset, *criteria):
710        """Search mailbox for matching messages.
711
712        (typ, [data]) = <instance>.search(charset, criterion, ...)
713
714        'data' is space separated list of matching message numbers.
715        If UTF8 is enabled, charset MUST be None.
716        """
717        name = 'SEARCH'
718        if charset:
719            if self.utf8_enabled:
720                raise IMAP4.error("Non-None charset not valid in UTF8 mode")
721            typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
722        else:
723            typ, dat = self._simple_command(name, *criteria)
724        return self._untagged_response(typ, dat, name)
725
726
727    def select(self, mailbox='INBOX', readonly=False):
728        """Select a mailbox.
729
730        Flush all untagged responses.
731
732        (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False)
733
734        'data' is count of messages in mailbox ('EXISTS' response).
735
736        Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so
737        other responses should be obtained via <instance>.response('FLAGS') etc.
738        """
739        self.untagged_responses = {}    # Flush old responses.
740        self.is_readonly = readonly
741        if readonly:
742            name = 'EXAMINE'
743        else:
744            name = 'SELECT'
745        typ, dat = self._simple_command(name, mailbox)
746        if typ != 'OK':
747            self.state = 'AUTH'     # Might have been 'SELECTED'
748            return typ, dat
749        self.state = 'SELECTED'
750        if 'READ-ONLY' in self.untagged_responses \
751                and not readonly:
752            if __debug__:
753                if self.debug >= 1:
754                    self._dump_ur(self.untagged_responses)
755            raise self.readonly('%s is not writable' % mailbox)
756        return typ, self.untagged_responses.get('EXISTS', [None])
757
758
759    def setacl(self, mailbox, who, what):
760        """Set a mailbox acl.
761
762        (typ, [data]) = <instance>.setacl(mailbox, who, what)
763        """
764        return self._simple_command('SETACL', mailbox, who, what)
765
766
767    def setannotation(self, *args):
768        """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+)
769        Set ANNOTATIONs."""
770
771        typ, dat = self._simple_command('SETANNOTATION', *args)
772        return self._untagged_response(typ, dat, 'ANNOTATION')
773
774
775    def setquota(self, root, limits):
776        """Set the quota root's resource limits.
777
778        (typ, [data]) = <instance>.setquota(root, limits)
779        """
780        typ, dat = self._simple_command('SETQUOTA', root, limits)
781        return self._untagged_response(typ, dat, 'QUOTA')
782
783
784    def sort(self, sort_criteria, charset, *search_criteria):
785        """IMAP4rev1 extension SORT command.
786
787        (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)
788        """
789        name = 'SORT'
790        #if not name in self.capabilities:      # Let the server decide!
791        #       raise self.error('unimplemented extension command: %s' % name)
792        if (sort_criteria[0],sort_criteria[-1]) != ('(',')'):
793            sort_criteria = '(%s)' % sort_criteria
794        typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria)
795        return self._untagged_response(typ, dat, name)
796
797
798    def starttls(self, ssl_context=None):
799        name = 'STARTTLS'
800        if not HAVE_SSL:
801            raise self.error('SSL support missing')
802        if self._tls_established:
803            raise self.abort('TLS session already established')
804        if name not in self.capabilities:
805            raise self.abort('TLS not supported by server')
806        # Generate a default SSL context if none was passed.
807        if ssl_context is None:
808            ssl_context = ssl._create_stdlib_context()
809        typ, dat = self._simple_command(name)
810        if typ == 'OK':
811            self.sock = ssl_context.wrap_socket(self.sock,
812                                                server_hostname=self.host)
813            self.file = self.sock.makefile('rb')
814            self._tls_established = True
815            self._get_capabilities()
816        else:
817            raise self.error("Couldn't establish TLS session")
818        return self._untagged_response(typ, dat, name)
819
820
821    def status(self, mailbox, names):
822        """Request named status conditions for mailbox.
823
824        (typ, [data]) = <instance>.status(mailbox, names)
825        """
826        name = 'STATUS'
827        #if self.PROTOCOL_VERSION == 'IMAP4':   # Let the server decide!
828        #    raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
829        typ, dat = self._simple_command(name, mailbox, names)
830        return self._untagged_response(typ, dat, name)
831
832
833    def store(self, message_set, command, flags):
834        """Alters flag dispositions for messages in mailbox.
835
836        (typ, [data]) = <instance>.store(message_set, command, flags)
837        """
838        if (flags[0],flags[-1]) != ('(',')'):
839            flags = '(%s)' % flags  # Avoid quoting the flags
840        typ, dat = self._simple_command('STORE', message_set, command, flags)
841        return self._untagged_response(typ, dat, 'FETCH')
842
843
844    def subscribe(self, mailbox):
845        """Subscribe to new mailbox.
846
847        (typ, [data]) = <instance>.subscribe(mailbox)
848        """
849        return self._simple_command('SUBSCRIBE', mailbox)
850
851
852    def thread(self, threading_algorithm, charset, *search_criteria):
853        """IMAPrev1 extension THREAD command.
854
855        (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...)
856        """
857        name = 'THREAD'
858        typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria)
859        return self._untagged_response(typ, dat, name)
860
861
862    def uid(self, command, *args):
863        """Execute "command arg ..." with messages identified by UID,
864                rather than message number.
865
866        (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
867
868        Returns response appropriate to 'command'.
869        """
870        command = command.upper()
871        if not command in Commands:
872            raise self.error("Unknown IMAP4 UID command: %s" % command)
873        if self.state not in Commands[command]:
874            raise self.error("command %s illegal in state %s, "
875                             "only allowed in states %s" %
876                             (command, self.state,
877                              ', '.join(Commands[command])))
878        name = 'UID'
879        typ, dat = self._simple_command(name, command, *args)
880        if command in ('SEARCH', 'SORT', 'THREAD'):
881            name = command
882        else:
883            name = 'FETCH'
884        return self._untagged_response(typ, dat, name)
885
886
887    def unsubscribe(self, mailbox):
888        """Unsubscribe from old mailbox.
889
890        (typ, [data]) = <instance>.unsubscribe(mailbox)
891        """
892        return self._simple_command('UNSUBSCRIBE', mailbox)
893
894
895    def xatom(self, name, *args):
896        """Allow simple extension commands
897                notified by server in CAPABILITY response.
898
899        Assumes command is legal in current state.
900
901        (typ, [data]) = <instance>.xatom(name, arg, ...)
902
903        Returns response appropriate to extension command `name'.
904        """
905        name = name.upper()
906        #if not name in self.capabilities:      # Let the server decide!
907        #    raise self.error('unknown extension command: %s' % name)
908        if not name in Commands:
909            Commands[name] = (self.state,)
910        return self._simple_command(name, *args)
911
912
913
914    #       Private methods
915
916
917    def _append_untagged(self, typ, dat):
918        if dat is None:
919            dat = b''
920        ur = self.untagged_responses
921        if __debug__:
922            if self.debug >= 5:
923                self._mesg('untagged_responses[%s] %s += ["%r"]' %
924                        (typ, len(ur.get(typ,'')), dat))
925        if typ in ur:
926            ur[typ].append(dat)
927        else:
928            ur[typ] = [dat]
929
930
931    def _check_bye(self):
932        bye = self.untagged_responses.get('BYE')
933        if bye:
934            raise self.abort(bye[-1].decode(self._encoding, 'replace'))
935
936
    def _command(self, name, *args):
        """Send command 'name' with 'args' to the server; return its tag.

        Raises self.error if 'name' is not legal in the current state,
        self.readonly if a READ-ONLY response has been recorded while
        self.is_readonly is false, and self.abort on a socket error.

        If self.literal is set it is consumed (reset to None) here.  A
        bound method is treated as a literal generator: it is called
        with each continuation response to produce the next chunk to
        send.  Any other value is sent once, after being announced as
        ' {size}' at the end of the command line.
        """

        if self.state not in Commands[name]:
            # Drop any pending literal so it is not sent with a later command.
            self.literal = None
            raise self.error("command %s illegal in state %s, "
                             "only allowed in states %s" %
                             (name, self.state,
                              ', '.join(Commands[name])))

        # Forget status responses left over from earlier commands.
        for typ in ('OK', 'NO', 'BAD'):
            if typ in self.untagged_responses:
                del self.untagged_responses[typ]

        if 'READ-ONLY' in self.untagged_responses \
        and not self.is_readonly:
            raise self.readonly('mailbox status changed to READ-ONLY')

        # Build the command line as bytes: tag, name, then each
        # argument; None arguments are skipped and str arguments are
        # encoded with self._encoding.
        tag = self._new_tag()
        name = bytes(name, self._encoding)
        data = tag + b' ' + name
        for arg in args:
            if arg is None: continue
            if isinstance(arg, str):
                arg = bytes(arg, self._encoding)
            data = data + b' ' + arg

        literal = self.literal
        if literal is not None:
            self.literal = None
            # A bound method (same type as self._command) is a literal
            # generator; anything else is the literal data itself.
            if type(literal) is type(self._command):
                literator = literal
            else:
                literator = None
                data = data + bytes(' {%s}' % len(literal), self._encoding)

        if __debug__:
            if self.debug >= 4:
                self._mesg('> %r' % data)
            else:
                self._log('> %r' % data)

        try:
            self.send(data + CRLF)
        except OSError as val:
            raise self.abort('socket error: %s' % val)

        if literal is None:
            return tag

        while 1:
            # Wait for continuation response

            # _get_response() returns None for a continuation response,
            # which ends this inner loop.
            while self._get_response():
                # The command already has a tagged completion, so no
                # literal is sent.
                if self.tagged_commands[tag]:   # BAD/NO?
                    return tag

            # Send literal

            if literator:
                literal = literator(self.continuation_response)

            if __debug__:
                if self.debug >= 4:
                    self._mesg('write literal size %s' % len(literal))

            try:
                self.send(literal)
                self.send(CRLF)
            except OSError as val:
                raise self.abort('socket error: %s' % val)

            # A plain literal is sent exactly once; a generator keeps
            # answering continuation requests until the command completes.
            if not literator:
                break

        return tag
1012
1013
1014    def _command_complete(self, name, tag):
1015        # BYE is expected after LOGOUT
1016        if name != 'LOGOUT':
1017            self._check_bye()
1018        try:
1019            typ, data = self._get_tagged_response(tag)
1020        except self.abort as val:
1021            raise self.abort('command: %s => %s' % (name, val))
1022        except self.error as val:
1023            raise self.error('command: %s => %s' % (name, val))
1024        if name != 'LOGOUT':
1025            self._check_bye()
1026        if typ == 'BAD':
1027            raise self.error('%s command error: %s %s' % (name, typ, data))
1028        return typ, data
1029
1030
1031    def _get_capabilities(self):
1032        typ, dat = self.capability()
1033        if dat == [None]:
1034            raise self.error('no CAPABILITY response from server')
1035        dat = str(dat[-1], self._encoding)
1036        dat = dat.upper()
1037        self.capabilities = tuple(dat.split())
1038
1039
    def _get_response(self):
        """Read one response from the server and record it.

        A tagged response is stored in self.tagged_commands under its
        tag; an untagged response (with any literals that follow it)
        is appended to self.untagged_responses.  A continuation
        response is saved in self.continuation_response.

        Returns None for a continuation response, otherwise the first
        response line received.  Raises self.abort for a tagged
        response with an unknown tag or a line that matches none of
        the response patterns.
        """

        # Read response and store.
        #
        # Returns None for continuation responses,
        # otherwise first response line received.

        resp = self._get_line()

        # Command completion response?

        # NB: every _match() call stores its match object in self.mo,
        # which the code below reads back.
        if self._match(self.tagre, resp):
            tag = self.mo.group('tag')
            if not tag in self.tagged_commands:
                raise self.abort('unexpected tagged response: %r' % resp)

            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            dat = self.mo.group('data')
            self.tagged_commands[tag] = (typ, [dat])
        else:
            dat2 = None

            # '*' (untagged) responses?

            # Untagged_status is only tried when Untagged_response fails.
            if not self._match(Untagged_response, resp):
                if self._match(self.Untagged_status, resp):
                    dat2 = self.mo.group('data2')

            # self.mo holds the result of whichever pattern was tried
            # last; None means neither untagged pattern matched.
            if self.mo is None:
                # Only other possibility is '+' (continuation) response...

                if self._match(Continuation, resp):
                    self.continuation_response = self.mo.group('data')
                    return None     # NB: indicates continuation

                raise self.abort("unexpected response: %r" % resp)

            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            dat = self.mo.group('data')
            if dat is None: dat = b''        # Null untagged response
            if dat2: dat = dat + b' ' + dat2

            # Is there a literal to come?

            while self._match(self.Literal, dat):

                # Read literal direct from connection.

                size = int(self.mo.group('size'))
                if __debug__:
                    if self.debug >= 4:
                        self._mesg('read literal size %s' % size)
                data = self.read(size)

                # Store response with literal as tuple

                self._append_untagged(typ, (dat, data))

                # Read trailer - possibly containing another literal

                dat = self._get_line()

            # Store the line itself, or the trailer that followed the
            # last literal.
            self._append_untagged(typ, dat)

        # Bracketed response information?

        # A response code found in an OK/NO/BAD response is also
        # recorded as an untagged response under the code's own name.
        if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            self._append_untagged(typ, self.mo.group('data'))

        if __debug__:
            if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
                self._mesg('%s response: %r' % (typ, dat))

        return resp
1118
1119
1120    def _get_tagged_response(self, tag):
1121
1122        while 1:
1123            result = self.tagged_commands[tag]
1124            if result is not None:
1125                del self.tagged_commands[tag]
1126                return result
1127
1128            # If we've seen a BYE at this point, the socket will be
1129            # closed, so report the BYE now.
1130
1131            self._check_bye()
1132
1133            # Some have reported "unexpected response" exceptions.
1134            # Note that ignoring them here causes loops.
1135            # Instead, send me details of the unexpected response and
1136            # I'll update the code in `_get_response()'.
1137
1138            try:
1139                self._get_response()
1140            except self.abort as val:
1141                if __debug__:
1142                    if self.debug >= 1:
1143                        self.print_log()
1144                raise
1145
1146
1147    def _get_line(self):
1148
1149        line = self.readline()
1150        if not line:
1151            raise self.abort('socket error: EOF')
1152
1153        # Protocol mandates all lines terminated by CRLF
1154        if not line.endswith(b'\r\n'):
1155            raise self.abort('socket error: unterminated line: %r' % line)
1156
1157        line = line[:-2]
1158        if __debug__:
1159            if self.debug >= 4:
1160                self._mesg('< %r' % line)
1161            else:
1162                self._log('< %r' % line)
1163        return line
1164
1165
1166    def _match(self, cre, s):
1167
1168        # Run compiled regular expression match method on 's'.
1169        # Save result, return success.
1170
1171        self.mo = cre.match(s)
1172        if __debug__:
1173            if self.mo is not None and self.debug >= 5:
1174                self._mesg("\tmatched %r => %r" % (cre.pattern, self.mo.groups()))
1175        return self.mo is not None
1176
1177
1178    def _new_tag(self):
1179
1180        tag = self.tagpre + bytes(str(self.tagnum), self._encoding)
1181        self.tagnum = self.tagnum + 1
1182        self.tagged_commands[tag] = None
1183        return tag
1184
1185
1186    def _quote(self, arg):
1187
1188        arg = arg.replace('\\', '\\\\')
1189        arg = arg.replace('"', '\\"')
1190
1191        return '"' + arg + '"'
1192
1193
1194    def _simple_command(self, name, *args):
1195
1196        return self._command_complete(name, self._command(name, *args))
1197
1198
1199    def _untagged_response(self, typ, dat, name):
1200        if typ == 'NO':
1201            return typ, dat
1202        if not name in self.untagged_responses:
1203            return typ, [None]
1204        data = self.untagged_responses.pop(name)
1205        if __debug__:
1206            if self.debug >= 5:
1207                self._mesg('untagged_responses[%s] => %s' % (name, data))
1208        return typ, data
1209
1210
1211    if __debug__:
1212
1213        def _mesg(self, s, secs=None):
1214            if secs is None:
1215                secs = time.time()
1216            tm = time.strftime('%M:%S', time.localtime(secs))
1217            sys.stderr.write('  %s.%02d %s\n' % (tm, (secs*100)%100, s))
1218            sys.stderr.flush()
1219
1220        def _dump_ur(self, dict):
1221            # Dump untagged responses (in `dict').
1222            l = dict.items()
1223            if not l: return
1224            t = '\n\t\t'
1225            l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l)
1226            self._mesg('untagged responses dump:%s%s' % (t, t.join(l)))
1227
1228        def _log(self, line):
1229            # Keep log of last `_cmd_log_len' interactions for debugging.
1230            self._cmd_log[self._cmd_log_idx] = (line, time.time())
1231            self._cmd_log_idx += 1
1232            if self._cmd_log_idx >= self._cmd_log_len:
1233                self._cmd_log_idx = 0
1234
1235        def print_log(self):
1236            self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log))
1237            i, n = self._cmd_log_idx, self._cmd_log_len
1238            while n:
1239                try:
1240                    self._mesg(*self._cmd_log[i])
1241                except:
1242                    pass
1243                i += 1
1244                if i >= self._cmd_log_len:
1245                    i = 0
1246                n -= 1
1247
1248
if HAVE_SSL:

    class IMAP4_SSL(IMAP4):

        """IMAP4 client class whose connection is wrapped in SSL

        Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context]]]]])

                host - host's name (default: localhost);
                port - port number (default: standard IMAP4 SSL port);
                keyfile - PEM formatted file that contains your private key (default: None);
                certfile - PEM formatted certificate chain file (default: None);
                ssl_context - a SSLContext object that contains your certificate chain
                              and private key (default: None)
                Note: ssl_context cannot be combined with keyfile or
                certfile; doing so raises ValueError.  keyfile and
                certfile are deprecated and trigger a DeprecationWarning.

        for more documentation see the docstring of the parent class IMAP4.
        """


        def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None,
                     certfile=None, ssl_context=None):
            context_given = ssl_context is not None
            if context_given and keyfile is not None:
                raise ValueError("ssl_context and keyfile arguments are mutually "
                                 "exclusive")
            if context_given and certfile is not None:
                raise ValueError("ssl_context and certfile arguments are mutually "
                                 "exclusive")
            if not (keyfile is None and certfile is None):
                import warnings
                # stacklevel 2 attributes the warning to the caller.
                warnings.warn("keyfile and certfile are deprecated, use a "
                              "custom ssl_context instead", DeprecationWarning, 2)
            self.keyfile, self.certfile = keyfile, certfile
            if not context_given:
                # Build a context from the (possibly None) key material.
                ssl_context = ssl._create_stdlib_context(certfile=certfile,
                                                         keyfile=keyfile)
            self.ssl_context = ssl_context
            IMAP4.__init__(self, host, port)

        def _create_socket(self):
            # Wrap the socket created by the parent class with the
            # stored SSL context, passing self.host as server_hostname.
            plain_sock = IMAP4._create_socket(self)
            return self.ssl_context.wrap_socket(plain_sock,
                                                server_hostname=self.host)

        def open(self, host='', port=IMAP4_SSL_PORT):
            """Setup connection to remote server on "host:port".
                (default: localhost:standard IMAP4 SSL port).
            Delegates to IMAP4.open(); only the default port differs.
            This connection will be used by the routines:
                read, readline, send, shutdown.
            """
            IMAP4.open(self, host, port)

    __all__.append("IMAP4_SSL")
1304
1305
class IMAP4_stream(IMAP4):

    """IMAP4 client class that talks to the server through the
    stdin/stdout pipes of a child process

    Instantiate with: IMAP4_stream(command)

            "command" - a string that can be passed to subprocess.Popen()

    for more documentation see the docstring of the parent class IMAP4.
    """


    def __init__(self, command):
        # The command must be stored before the parent initializer
        # runs, since open() reads it.
        self.command = command
        IMAP4.__init__(self)


    def open(self, host = None, port = None):
        """Start the command and use its pipes as the connection.
        'host' and 'port' are accepted but not used.
        This connection will be used by the routines:
            read, readline, send, shutdown.
        """
        # For compatibility with parent class
        self.host = self.port = None
        self.sock = self.file = None
        # NB: the command string is run through the shell.
        child = subprocess.Popen(self.command,
            bufsize=DEFAULT_BUFFER_SIZE,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            shell=True, close_fds=True)
        self.process = child
        self.writefile = child.stdin
        self.readfile = child.stdout

    def read(self, size):
        """Read 'size' bytes from the child's output."""
        source = self.readfile
        return source.read(size)


    def readline(self):
        """Read one line from the child's output."""
        source = self.readfile
        return source.readline()


    def send(self, data):
        """Write data to the child's input and flush it."""
        sink = self.writefile
        sink.write(data)
        sink.flush()


    def shutdown(self):
        """Close both pipes set up in "open", then wait for the child."""
        for pipe in (self.readfile, self.writefile):
            pipe.close()
        self.process.wait()
1360
1361
1362
class _Authenticator:

    """Private helper that base64-decodes server challenges and
            base64-encodes the replies of an authentication mechanism.
    """

    def __init__(self, mechinst):
        # Callable object to provide/process data
        self.mech = mechinst

    def process(self, data):
        # Decode the challenge, hand it to the mechanism and encode
        # its answer; an answer of None aborts the conversation.
        answer = self.mech(self.decode(data))
        return b'*' if answer is None else self.encode(answer)

    def encode(self, inp):
        #
        #  Base64-encode 'inp' (str input is UTF-8 encoded first) in
        #  48-byte pieces, dropping the line feed that
        #  binascii.b2a_base64 appends to each piece.  48 is a
        #  multiple of 3, so only the final piece can need padding
        #  and the pieces concatenate into one valid encoding.
        #
        if isinstance(inp, str):
            inp = inp.encode('utf-8')
        if not inp:
            return b''
        pieces = []
        for start in range(0, len(inp), 48):
            encoded = binascii.b2a_base64(inp[start:start + 48])
            if encoded:
                pieces.append(encoded[:-1])
        return b''.join(pieces)

    def decode(self, inp):
        # An empty (or missing) challenge decodes to empty bytes.
        return binascii.a2b_base64(inp) if inp else b''
1406
# Month abbreviations indexed by month number; index 0 is an empty
# placeholder so that Months[1] is 'Jan'.
Months = ['', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
# Month abbreviation (as bytes) -> month number 1..12.
Mon2num = {abbrev.encode(): number
           for number, abbrev in enumerate(Months) if abbrev}
1409
def Internaldate2tuple(resp):
    """Parse an IMAP4 INTERNALDATE string.

    Return the corresponding local time as a time.struct_time, or
    None if 'resp' does not match the INTERNALDATE pattern.
    """

    found = InternalDate.match(resp)
    if not found:
        return None

    month = Mon2num[found.group('mon')]
    sign = found.group('zonen')

    day, year, hour, minute, second, zone_hours, zone_minutes = [
        int(found.group(field))
        for field in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem')]

    # INTERNALDATE timezone must be subtracted to get UT

    offset = (zone_hours*60 + zone_minutes)*60
    if sign == b'-':
        offset = -offset

    stamp = (year, month, day, hour, minute, second, -1, -1, -1)
    return time.localtime(calendar.timegm(stamp) - offset)
1442
1443
1444
def Int2AP(num):

    """Convert integer to A-P string representation.

    The absolute value of 'num', truncated to an integer, is written
    in base 16 using the letters A-P as digits (bytes result).  Zero
    gives an empty result.
    """

    alphabet = b'ABCDEFGHIJKLMNOP'
    remaining = int(abs(num))
    digits = []         # least significant digit first
    while remaining:
        remaining, digit = divmod(remaining, 16)
        digits.append(alphabet[digit:digit+1])
    digits.reverse()
    return b''.join(digits)
1455
1456
1457
def ParseFlags(resp):

    """Convert IMAP4 flags response to python tuple.

    Returns the whitespace-separated words of the 'flags' group of
    the Flags pattern, or an empty tuple when 'resp' does not match.
    """

    found = Flags.match(resp)
    if found:
        return tuple(found.group('flags').split())
    return ()
1467
1468
def Time2Internaldate(date_time):

    """Convert date_time to IMAP4 INTERNALDATE representation.

    Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'.  The
    date_time argument can be a number (int or float) representing
    seconds since epoch (as returned by time.time()), a 9-tuple
    representing local time, an instance of time.struct_time (as
    returned by time.localtime()), an aware datetime instance or a
    double-quoted string.  In the last case, it is assumed to already
    be in the correct format.

    Raises ValueError for a naive datetime and for any other argument,
    including a string (even an empty one) that is not double-quoted.
    """
    if isinstance(date_time, (int, float)):
        dt = datetime.fromtimestamp(date_time,
                                    timezone.utc).astimezone()
    elif isinstance(date_time, tuple):
        try:
            gmtoff = date_time.tm_gmtoff
        except AttributeError:
            # Plain tuple: derive the UTC offset from the time module.
            if time.daylight:
                dst = date_time[8]
                if dst == -1:
                    # DST flag unknown: let mktime()/localtime() decide.
                    dst = time.localtime(time.mktime(date_time))[8]
                gmtoff = -(time.timezone, time.altzone)[dst]
            else:
                gmtoff = -time.timezone
        delta = timedelta(seconds=gmtoff)
        dt = datetime(*date_time[:6], tzinfo=timezone(delta))
    elif isinstance(date_time, datetime):
        if date_time.tzinfo is None:
            raise ValueError("date_time must be aware")
        dt = date_time
    # One-character slices (not indexing) so that an empty string falls
    # through to the ValueError below instead of raising IndexError.
    elif (isinstance(date_time, str)
          and (date_time[:1], date_time[-1:]) == ('"', '"')):
        return date_time        # Assume in correct format
    else:
        raise ValueError("date_time not of a known type")
    # The month name comes from Months rather than %b.
    fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
    return dt.strftime(fmt)
1507
1508
1509
if __name__ == '__main__':

    # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]'
    # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"'
    # to test the IMAP4_stream class
    #
    # Options: -d <level> sets the debug level, -s <command> runs the
    # tests over IMAP4_stream using that command.

    import getopt, getpass

    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
    except getopt.error:
        # Unparseable options: fall back to the defaults.
        optlist, args = (), ()

    stream_command = None
    for opt,val in optlist:
        if opt == '-d':
            Debug = int(val)
        elif opt == '-s':
            stream_command = val
            if not args: args = (stream_command,)

    if not args: args = ('',)

    host = args[0]

    USER = getpass.getuser()
    PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost"))

    # NOTE(review): append() is defined outside this section; confirm
    # whether it accepts a str message or requires bytes.
    test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'}
    test_seq1 = (
    ('login', (USER, PASSWD)),
    ('create', ('/tmp/xxx 1',)),
    ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
    ('CREATE', ('/tmp/yyz 2',)),
    ('append', ('/tmp/yyz 2', None, None, test_mesg)),
    ('list', ('/tmp', 'yy*')),
    ('select', ('/tmp/yyz 2',)),
    ('search', (None, 'SUBJECT', 'test')),
    ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')),
    ('store', ('1', 'FLAGS', r'(\Deleted)')),
    ('namespace', ()),
    ('expunge', ()),
    ('recent', ()),
    ('close', ()),
    )

    test_seq2 = (
    ('select', ()),
    ('response',('UIDVALIDITY',)),
    ('uid', ('SEARCH', 'ALL')),
    ('response', ('EXISTS',)),
    ('append', (None, None, None, test_mesg)),
    ('recent', ()),
    ('logout', ()),
    )

    def run(cmd, args):
        # Call method 'cmd' of the connection with 'args', tracing the
        # call and its result; a NO completion is turned into M.error.
        M._mesg('%s %s' % (cmd, args))
        typ, dat = getattr(M, cmd)(*args)
        M._mesg('%s => %s %s' % (cmd, typ, dat))
        # The response data is bytes, which cannot be raised directly.
        if typ == 'NO': raise M.error(dat[0])
        return dat

    try:
        if stream_command:
            M = IMAP4_stream(stream_command)
        else:
            M = IMAP4(host)
        if M.state == 'AUTH':
            test_seq1 = test_seq1[1:]   # Login not needed
        M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
        M._mesg('CAPABILITIES = %r' % (M.capabilities,))

        for cmd,args in test_seq1:
            run(cmd, args)

        # Delete the mailboxes created above.
        for ml in run('list', ('/tmp/', 'yy%')):
            if ml is None:      # No LIST data was returned
                continue
            # Response data is bytes, so the pattern must be bytes too.
            mo = re.match(br'.*"([^"]+)"$', ml)
            if mo: path = mo.group(1)
            else: path = ml.split()[-1]
            run('delete', (path,))

        for cmd,args in test_seq2:
            dat = run(cmd, args)

            if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
                continue

            # Fetch the message with the last UID found, if any.
            uid = (dat[-1] or b'').split()
            if not uid: continue
            # Decode the UID: formatting bytes with %s would send its
            # repr (b'...') to the server.
            run('uid', ('FETCH', uid[-1].decode('ascii'),
                    '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))

        print('\nAll tests OK.')

    except:
        # Catch-all is intentional: report the failure, then re-raise.
        print('\nTests failed.')

        if not Debug:
            print('''
If you would like to see debugging output,
try: %s -d5
''' % sys.argv[0])

        raise
1615