1"""
2Display number of unread messages from IMAP account.
3
4Configuration parameters:
5    allow_urgent: display urgency on unread messages (default False)
6    auth_scope: scope to use with OAuth2 (default 'https://mail.google.com/')
7    auth_token: path to where the pickled access/refresh token will be saved
8        after successful credential authorization.
9        (default '~/.config/py3status/imap_auth_token.pickle')
10    cache_timeout: refresh interval for this module (default 60)
11    client_secret: the path to the client secret file with OAuth 2.0
12        credentials (if None then OAuth not used) (default None)
13    criterion: status of emails to check for (default 'UNSEEN')
14    debug: log warnings (default False)
15    degraded_when_stale: color as degraded when updating failed (default True)
16    format: display format for this module (default 'Mail: {unseen}')
17    hide_if_zero: hide this module when no new mail (default False)
18    mailbox: name of the mailbox to check (default 'INBOX')
19    password: login password (default None)
20    port: number to use (default '993')
21    read_timeout: timeout for read(2) syscalls (default 5)
22    security: login authentication method: 'ssl' or 'starttls'
23        (startssl needs python 3.2 or later) (default 'ssl')
24    server: server to connect (default None)
25    use_idle: use IMAP4 IDLE instead of polling; requires compatible
26        server; uses cache_timeout for IDLE's timeout; will auto detect
27        when set to None (default None)
28    user: login user (default None)
29
30Format placeholders:
31    {unseen} number of unread emails
32
33Color options:
34    color_new_mail: use color when new mail arrives, default to color_good
35
36OAuth:
37    OAuth2 will be used for authentication instead of a password if the
38    client_secret path is set.
39
40    To create a client_secret for your Google account, visit
41    https://console.developers.google.com/ and create an "OAuth client ID" from
42    the credentials tab.
43
44    This client secret enables the app (in this case, the IMAP py3status module)
45    to request access to a user's email. Therefore the client secret doesn't
46    have to be for the same Google account as the email account being accessed.
47
48    When the IMAP module first tries to access your email account a browser
49    window will open asking for authorization to access your email.
50    After authorization is complete, an access/refresh token will be saved to
51    the path configured in auth_token.
52
53    Requires: Using OAuth requires the google-auth and google-auth-oauthlib
54    libraries to be installed.
55
56    Note: the same client secret file can be used as with the py3status Google
57    Calendar module.
58
59@author obb, girst
60
61SAMPLE OUTPUT
62{'full_text': 'Mail: 36', 'color': '#00FF00'}
63"""
64import imaplib
65from threading import Thread
66from time import sleep
67from ssl import create_default_context
68from socket import setdefaulttimeout, error as socket_error
69from pathlib import Path
70
71STRING_UNAVAILABLE = "N/A"
72NO_DATA_YET = -1
73
74
75class Py3status:
76    """"""
77
78    # available configuration parameters
79    allow_urgent = False
80    auth_scope = "https://mail.google.com/"
81    auth_token = "~/.config/py3status/imap_auth_token.pickle"
82    cache_timeout = 60
83    client_secret = None
84    criterion = "UNSEEN"
85    debug = False
86    degraded_when_stale = True
87    format = "Mail: {unseen}"
88    hide_if_zero = False
89    mailbox = "INBOX"
90    password = None
91    port = "993"
92    read_timeout = 5
93    security = "ssl"
94    server = None
95    use_idle = None
96    user = None
97
98    class Meta:
99        deprecated = {
100            "rename": [
101                {
102                    "param": "new_mail_color",
103                    "new": "color_new_mail",
104                    "msg": "obsolete parameter use `color_new_mail`",
105                },
106                {
107                    "param": "imap_server",
108                    "new": "server",
109                    "msg": "obsolete parameter use `server`",
110                },
111            ]
112        }
113
114    def post_config_hook(self):
115        # class variables:
116        self.mail_count = NO_DATA_YET
117        self.connection = None
118        self.mail_error = None  # cannot throw self.py3.error from thread
119        self.network_error = None
120        self.command_tag = (
121            0  # IMAPcommands are tagged, so responses can be matched up to requests
122        )
123        self.idle_thread = Thread()
124
125        if self.client_secret:
126            self.client_secret = Path(self.client_secret).expanduser()
127        self.auth_token = Path(self.auth_token).expanduser()
128
129        if self.security not in ["ssl", "starttls"]:
130            raise ValueError("Unknown security protocol")
131
132    def imap(self):
133        # I -- acquire mail_count
134        if self.use_idle is not False:
135            if not self.idle_thread.is_alive():
136                sleep(
137                    self.read_timeout
138                )  # rate-limit thread-restarting (when network is offline)
139                self.idle_thread = Thread(target=self._get_mail_count)
140                self.idle_thread.daemon = True
141                self.idle_thread.start()
142        else:
143            self._get_mail_count()
144        response = {"cached_until": self.py3.time_in(self.cache_timeout)}
145        if self.mail_error is not None:
146            self.py3.log(self.mail_error, level=self.py3.LOG_ERROR)
147            self.py3.error(self.mail_error)
148            self.mail_error = None
149
150        # II -- format response
151        response["full_text"] = self.py3.safe_format(
152            self.format, {"unseen": self.mail_count}
153        )
154
155        if self.mail_count is None:
156            response["color"] = (self.py3.COLOR_BAD,)
157            response["full_text"] = self.py3.safe_format(
158                self.format, {"unseen": STRING_UNAVAILABLE}
159            )
160        elif self.mail_count == NO_DATA_YET:
161            response["full_text"] = ""
162        elif self.mail_count == 0 and self.hide_if_zero:
163            response["full_text"] = ""
164        elif self.mail_count > 0:
165            response["color"] = self.py3.COLOR_NEW_MAIL or self.py3.COLOR_GOOD
166            response["urgent"] = self.allow_urgent
167        if self.network_error is not None and self.degraded_when_stale:
168            response["color"] = self.py3.COLOR_DEGRADED
169
170        return response
171
172    def _check_if_idle(self, connection):
173        supports_idle = "IDLE" in connection.capabilities
174
175        self.use_idle = supports_idle
176        self.py3.log("Will use {}".format("idling" if self.use_idle else "polling"))
177        if self.use_idle and not supports_idle:
178            self.py3.error("Server does not support IDLE")
179
180    def _get_creds(self):
181        from google_auth_oauthlib.flow import InstalledAppFlow
182        from google.auth.transport.requests import Request
183        from google.auth.exceptions import TransportError
184        import pickle
185
186        self.creds = None
187
188        # Open pickle file with access and refresh tokens if it exists
189        if self.auth_token.exists():
190            with self.auth_token.open("rb") as token:
191                self.creds = pickle.load(token)
192
193        if not self.creds or not self.creds.valid:
194            try:
195                if self.creds and self.creds.expired and self.creds.refresh_token:
196                    # Credentials expired but contain refresh token
197                    self.creds.refresh(Request())
198                else:
199                    # No valid credentials so open authorisation URL in browser
200                    flow = InstalledAppFlow.from_client_secrets_file(
201                        self.client_secret, [self.auth_scope]
202                    )
203                    self.creds = flow.run_local_server(port=0)
204                # Save the credentials for the next run
205                with self.auth_token.open("wb") as token:
206                    pickle.dump(self.creds, token)
207            except TransportError as e:
208                # Treat the same as a socket_error
209                raise socket_error(e)
210
211    def _connection_ssl(self):
212        if self.client_secret:
213            # Use OAUTH
214            self._get_creds()
215        setdefaulttimeout(self.read_timeout)
216        connection = imaplib.IMAP4_SSL(self.server, int(self.port))
217        return connection
218
219    def _connection_starttls(self):
220        setdefaulttimeout(self.read_timeout)
221        connection = imaplib.IMAP4(self.server, int(self.port))
222        connection.starttls(create_default_context())
223        return connection
224
225    def _connect(self):
226        if self.security == "ssl":
227            self.connection = self._connection_ssl()
228        elif self.security == "starttls":
229            self.connection = self._connection_starttls()
230        if self.use_idle is None:
231            self._check_if_idle(self.connection)
232
233        # trigger a socket.timeout if any IMAP request isn't completed in time:
234        self.connection.socket().settimeout(self.read_timeout)
235
236    def _disconnect(self):
237        try:
238            if self.connection is not None:
239                if self.connection.state == "SELECTED":
240                    self.connection.close()
241                self.connection.logout()
242        except:  # noqa e722
243            pass
244        finally:
245            self.connection = None
246
247    def _idle(self):
248        """
249        since imaplib doesn't support IMAP4r1 IDLE, we'll do it by hand
250        """
251        socket = None
252
253        try:
254            # build a new command tag (Xnnn) as bytes:
255            self.command_tag = (self.command_tag + 1) % 1000
256            command_tag = b"X" + bytes(str(self.command_tag).zfill(3), "ascii")
257
258            # make sure we have selected anything before idling:
259            directories = self.mailbox.split(",")
260            self.connection.select(directories[0])
261
262            socket = self.connection.socket()
263
264            # send IDLE command and check response:
265            socket.write(command_tag + b" IDLE\r\n")
266            try:
267                response = socket.read(4096).decode("ascii")
268            except socket_error:
269                raise imaplib.IMAP4.abort("Server didn't respond to 'IDLE' in time")
270            # Dovecot will responde with "+ idling", courier will return "+ entering idle mode"
271            # RFC 2177 (https://tools.ietf.org/html/rfc2177) only requires the "+" character.
272            if not response.lower().startswith("+"):
273                raise imaplib.IMAP4.abort(f"While initializing IDLE: {response}")
274
275            # wait for changes (EXISTS, EXPUNGE, etc.):
276            socket.settimeout(self.cache_timeout)
277            while True:
278                try:
279                    response = socket.read(4096).decode("ascii")
280                    if response.upper().startswith("* OK"):
281                        continue  # ignore '* OK Still here'
282                    else:
283                        break
284                except socket_error:  # IDLE timed out
285                    break
286
287        finally:  # terminate IDLE command gracefully
288            if socket is None:
289                return
290
291            socket.settimeout(self.read_timeout)
292            socket.write(b"DONE\r\n")  # important! Can't query IMAP again otherwise
293            try:
294                response = socket.read(4096).decode("ascii")
295            except socket_error:
296                raise imaplib.IMAP4.abort("Server didn't respond to 'DONE' in time")
297
298            # sometimes, more messages come in between reading and DONEing; so read them again:
299            if response.startswith("* "):
300                try:
301                    response = socket.read(4096).decode("ascii")
302                except socket_error:
303                    raise imaplib.IMAP4.abort(
304                        "Server sent more continuations, but no 'DONE' ack"
305                    )
306
307            expected_response = (command_tag + b" OK").decode("ascii")
308            if not response.lower().startswith(expected_response.lower()):
309                raise imaplib.IMAP4.abort("While terminating IDLE: " + response)
310
311    def _get_mail_count(self):
312        retry_counter = 0
313        retry_max = 3
314        while True:
315            try:
316                if self.connection is None:
317                    self._connect()
318                if self.connection.state == "NONAUTH":
319                    if self.client_secret:
320                        # Authenticate using OAUTH
321                        auth_string = "user={}\1auth=Bearer {}\1\1".format(
322                            self.user, self.creds.token
323                        )
324                        self.connection.authenticate("XOAUTH2", lambda x: auth_string)
325                    else:
326                        # Login with user and password
327                        self.connection.login(self.user, self.password)
328
329                tmp_mail_count = 0
330                directories = self.mailbox.split(",")
331
332                for directory in directories:
333                    self.connection.select(directory)
334                    unseen_response = self.connection.search(None, self.criterion)
335                    mails = unseen_response[1][0].split()
336                    tmp_mail_count += len(mails)
337
338                self.mail_count = tmp_mail_count
339                self.network_error = None
340
341                if self.use_idle:
342                    self.py3.update()
343                    self._idle()
344                    retry_counter = 0
345                else:
346                    return
347            except (socket_error, imaplib.IMAP4.abort, imaplib.IMAP4.readonly) as e:
348                if "didn't respond to 'DONE'" in str(e) or isinstance(e, socket_error):
349                    self.network_error = str(e)
350                    error_type = "Network"
351                else:
352                    error_type = "Recoverable"
353                    # Note: we don't reset network_error, as we want this to persist
354                    # until we either run into a permanent error or successfully receive
355                    # another response from the IMAP server.
356
357                if self.debug:
358                    self.py3.log(
359                        f"{error_type} error - {e}", level=self.py3.LOG_WARNING,
360                    )
361                self._disconnect()
362
363                retry_counter += 1
364                if retry_counter <= retry_max:
365                    if self.debug:
366                        self.py3.log(
367                            f"Retrying ({retry_counter}/{retry_max})",
368                            level=self.py3.LOG_INFO,
369                        )
370                    continue
371                break
372            except (imaplib.IMAP4.error, Exception) as e:
373                self.mail_error = f"Fatal error - {e}"
374                self._disconnect()
375                self.mail_count = None
376
377                retry_counter += 1
378                if retry_counter <= retry_max:
379                    if self.debug:
380                        self.py3.log(
381                            "Will retry after 60 seconds ({}/{})".format(
382                                retry_counter, retry_max
383                            ),
384                            level=self.py3.LOG_INFO,
385                        )
386                    sleep(60)
387                    continue
388                break
389            finally:
390                self.py3.update()  # to propagate mail_error
391
392
393if __name__ == "__main__":
394    """
395    Run module in test mode.
396    """
397    from py3status.module_test import module_test
398
399    module_test(Py3status)
400