"""
Display number of unread messages from IMAP account.

Configuration parameters:
    allow_urgent: display urgency on unread messages (default False)
    auth_scope: scope to use with OAuth2 (default 'https://mail.google.com/')
    auth_token: path to where the pickled access/refresh token will be saved
        after successful credential authorization.
        (default '~/.config/py3status/imap_auth_token.pickle')
    cache_timeout: refresh interval for this module (default 60)
    client_secret: the path to the client secret file with OAuth 2.0
        credentials (if None then OAuth not used) (default None)
    criterion: status of emails to check for (default 'UNSEEN')
    debug: log warnings (default False)
    degraded_when_stale: color as degraded when updating failed (default True)
    format: display format for this module (default 'Mail: {unseen}')
    hide_if_zero: hide this module when no new mail (default False)
    mailbox: name of the mailbox to check (default 'INBOX')
    password: login password (default None)
    port: number to use (default '993')
    read_timeout: timeout for read(2) syscalls (default 5)
    security: login authentication method: 'ssl' or 'starttls'
        (starttls needs python 3.2 or later) (default 'ssl')
    server: server to connect (default None)
    use_idle: use IMAP4 IDLE instead of polling; requires compatible
        server; uses cache_timeout for IDLE's timeout; will auto detect
        when set to None (default None)
    user: login user (default None)

Format placeholders:
    {unseen} number of unread emails

Color options:
    color_new_mail: use color when new mail arrives, default to color_good

OAuth:
    OAuth2 will be used for authentication instead of a password if the
    client_secret path is set.

    To create a client_secret for your Google account, visit
    https://console.developers.google.com/ and create an "OAuth client ID" from
    the credentials tab.

    This client secret enables the app (in this case, the IMAP py3status module)
    to request access to a user's email. Therefore the client secret doesn't
    have to be for the same Google account as the email account being accessed.

    When the IMAP module first tries to access your email account a browser
    window will open asking for authorization to access your email.
    After authorization is complete, an access/refresh token will be saved to
    the path configured in auth_token.

    Requires: Using OAuth requires the google-auth and google-auth-oauthlib
    libraries to be installed.

    Note: the same client secret file can be used as with the py3status Google
    Calendar module.

@author obb, girst

SAMPLE OUTPUT
{'full_text': 'Mail: 36', 'color': '#00FF00'}
"""
import imaplib
from threading import Thread
from time import sleep
from ssl import create_default_context
from socket import setdefaulttimeout, error as socket_error
from pathlib import Path

# placeholder shown instead of a number when the count could not be fetched
STRING_UNAVAILABLE = "N/A"
# sentinel for "no successful query has completed yet"
NO_DATA_YET = -1


class Py3status:
    """py3status module that reports the number of matching IMAP messages."""

    # available configuration parameters
    allow_urgent = False
    auth_scope = "https://mail.google.com/"
    auth_token = "~/.config/py3status/imap_auth_token.pickle"
    cache_timeout = 60
    client_secret = None
    criterion = "UNSEEN"
    debug = False
    degraded_when_stale = True
    format = "Mail: {unseen}"
    hide_if_zero = False
    mailbox = "INBOX"
    password = None
    port = "993"
    read_timeout = 5
    security = "ssl"
    server = None
    use_idle = None
    user = None

    class Meta:
        deprecated = {
            "rename": [
                {
                    "param": "new_mail_color",
                    "new": "color_new_mail",
                    "msg": "obsolete parameter use `color_new_mail`",
                },
                {
                    "param": "imap_server",
                    "new": "server",
                    "msg": "obsolete parameter use `server`",
                },
            ]
        }

    def post_config_hook(self):
        """Initialise runtime state and validate the configuration.

        Raises:
            ValueError: if `security` is neither 'ssl' nor 'starttls'.
        """
        # class variables:
        self.mail_count = NO_DATA_YET
        self.connection = None
        self.mail_error = None  # cannot throw self.py3.error from thread
        self.network_error = None
        # IMAP commands are tagged, so responses can be matched up to requests
        self.command_tag = 0
        # never-started placeholder so that is_alive() is False on first use
        self.idle_thread = Thread()

        if self.client_secret:
            self.client_secret = Path(self.client_secret).expanduser()
        self.auth_token = Path(self.auth_token).expanduser()

        if self.security not in ["ssl", "starttls"]:
            raise ValueError("Unknown security protocol")

    def imap(self):
        """Build the module output for the current mail count.

        Unless `use_idle` is False, the count is maintained by a background
        daemon thread which is (re)started here whenever it is not alive;
        otherwise the mailbox is polled synchronously.

        Returns:
            dict with `cached_until` and `full_text`, plus `color` and
            `urgent` where applicable.
        """
        # I -- acquire mail_count
        if self.use_idle is not False:
            if not self.idle_thread.is_alive():
                # rate-limit thread-restarting (when network is offline)
                sleep(self.read_timeout)
                self.idle_thread = Thread(target=self._get_mail_count)
                self.idle_thread.daemon = True
                self.idle_thread.start()
        else:
            self._get_mail_count()

        response = {"cached_until": self.py3.time_in(self.cache_timeout)}

        if self.mail_error is not None:
            # py3.error() raises, so the pending error has to be cleared
            # *before* reporting it; otherwise it would be reported forever.
            error_message, self.mail_error = self.mail_error, None
            self.py3.log(error_message, level=self.py3.LOG_ERROR)
            self.py3.error(error_message)

        # II -- format response
        response["full_text"] = self.py3.safe_format(
            self.format, {"unseen": self.mail_count}
        )

        if self.mail_count is None:
            # a fatal error reset the count: show the placeholder in "bad" color
            response["color"] = self.py3.COLOR_BAD
            response["full_text"] = self.py3.safe_format(
                self.format, {"unseen": STRING_UNAVAILABLE}
            )
        elif self.mail_count == NO_DATA_YET:
            response["full_text"] = ""
        elif self.mail_count == 0 and self.hide_if_zero:
            response["full_text"] = ""
        elif self.mail_count > 0:
            response["color"] = self.py3.COLOR_NEW_MAIL or self.py3.COLOR_GOOD
            response["urgent"] = self.allow_urgent

        # a stale count (last update failed) overrides any color chosen above
        if self.network_error is not None and self.degraded_when_stale:
            response["color"] = self.py3.COLOR_DEGRADED

        return response

    def _check_if_idle(self, connection):
        """Auto-detect IDLE support and store the result in `use_idle`.

        Args:
            connection: an imaplib connection whose `capabilities` are
                inspected for "IDLE".
        """
        self.use_idle = "IDLE" in connection.capabilities
        self.py3.log("Will use {}".format("idling" if self.use_idle else "polling"))

    def _get_creds(self):
        """Load, refresh or interactively obtain OAuth2 credentials.

        The result is stored in `self.creds`; newly obtained or refreshed
        credentials are pickled to `auth_token`.

        Raises:
            socket_error: when google-auth reports a TransportError, so that
                callers can treat it like any other network failure.
        """
        from google_auth_oauthlib.flow import InstalledAppFlow
        from google.auth.transport.requests import Request
        from google.auth.exceptions import TransportError
        import pickle

        self.creds = None

        # Open pickle file with access and refresh tokens if it exists
        # NOTE(security): unpickling executes arbitrary code from the file;
        # this is only safe as long as auth_token is a trusted, user-owned file.
        if self.auth_token.exists():
            with self.auth_token.open("rb") as token:
                self.creds = pickle.load(token)

        if not self.creds or not self.creds.valid:
            try:
                if self.creds and self.creds.expired and self.creds.refresh_token:
                    # Credentials expired but contain refresh token
                    self.creds.refresh(Request())
                else:
                    # No valid credentials so open authorisation URL in browser
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.client_secret, [self.auth_scope]
                    )
                    self.creds = flow.run_local_server(port=0)
                # Save the credentials for the next run
                with self.auth_token.open("wb") as token:
                    pickle.dump(self.creds, token)
            except TransportError as e:
                # Treat the same as a socket_error
                raise socket_error(e) from e

    def _connection_ssl(self):
        """Open an IMAP4_SSL connection (fetching OAuth credentials first)."""
        if self.client_secret:
            # Use OAUTH
            self._get_creds()
        setdefaulttimeout(self.read_timeout)
        connection = imaplib.IMAP4_SSL(self.server, int(self.port))
        return connection

    def _connection_starttls(self):
        """Open a plain IMAP4 connection and upgrade it with STARTTLS."""
        setdefaulttimeout(self.read_timeout)
        connection = imaplib.IMAP4(self.server, int(self.port))
        connection.starttls(create_default_context())
        return connection

    def _connect(self):
        """Establish `self.connection` according to `security`."""
        if self.security == "ssl":
            self.connection = self._connection_ssl()
        elif self.security == "starttls":
            self.connection = self._connection_starttls()
        if self.use_idle is None:
            self._check_if_idle(self.connection)

        # trigger a socket.timeout if any IMAP request isn't completed in time:
        self.connection.socket().settimeout(self.read_timeout)

    def _disconnect(self):
        """Close and log out of the connection, ignoring any failure."""
        try:
            if self.connection is not None:
                if self.connection.state == "SELECTED":
                    self.connection.close()
                self.connection.logout()
        except Exception:
            # best effort: the connection may already be broken
            pass
        finally:
            self.connection = None

    def _idle(self):
        """
        since imaplib doesn't support IMAP4r1 IDLE, we'll do it by hand

        Blocks until the server reports a change or `cache_timeout` elapses,
        then terminates the IDLE command.

        Raises:
            imaplib.IMAP4.abort: when the server does not answer the IDLE or
                DONE exchange as expected. Errors raised while selecting the
                mailbox propagate unchanged.
        """
        sock = None

        try:
            # build a new command tag (Xnnn) as bytes:
            self.command_tag = (self.command_tag + 1) % 1000
            command_tag = b"X" + bytes(str(self.command_tag).zfill(3), "ascii")

            # make sure we have selected anything before idling:
            directories = self.mailbox.split(",")
            self.connection.select(directories[0])

            sock = self.connection.socket()

            # send IDLE command and check response:
            sock.write(command_tag + b" IDLE\r\n")
            try:
                response = sock.read(4096).decode("ascii")
            except socket_error:
                raise imaplib.IMAP4.abort("Server didn't respond to 'IDLE' in time")
            # Dovecot will respond with "+ idling", courier will return "+ entering idle mode"
            # RFC 2177 (https://tools.ietf.org/html/rfc2177) only requires the "+" character.
            if not response.lower().startswith("+"):
                raise imaplib.IMAP4.abort(f"While initializing IDLE: {response}")

            # wait for changes (EXISTS, EXPUNGE, etc.):
            sock.settimeout(self.cache_timeout)
            while True:
                try:
                    response = sock.read(4096).decode("ascii")
                    if response.upper().startswith("* OK"):
                        continue  # ignore '* OK Still here'
                    else:
                        break
                except socket_error:  # IDLE timed out
                    break

        finally:  # terminate IDLE command gracefully
            # Only send DONE when the socket was obtained. No `return` here:
            # returning from `finally` would swallow an in-flight exception
            # (e.g. a failed select) and hide it from the caller's retry logic.
            if sock is not None:
                sock.settimeout(self.read_timeout)
                sock.write(b"DONE\r\n")  # important! Can't query IMAP again otherwise
                try:
                    response = sock.read(4096).decode("ascii")
                except socket_error:
                    raise imaplib.IMAP4.abort("Server didn't respond to 'DONE' in time")

                # sometimes, more messages come in between reading and DONEing; so read them again:
                if response.startswith("* "):
                    try:
                        response = sock.read(4096).decode("ascii")
                    except socket_error:
                        raise imaplib.IMAP4.abort(
                            "Server sent more continuations, but no 'DONE' ack"
                        )

                expected_response = (command_tag + b" OK").decode("ascii")
                if not response.lower().startswith(expected_response.lower()):
                    raise imaplib.IMAP4.abort("While terminating IDLE: " + response)

    def _get_mail_count(self):
        """Fetch the message count, retrying on failure.

        When idling is enabled this loops: count, IDLE, count again. It
        returns after one successful count when polling, or once the retry
        budget is exhausted. Results are published through `mail_count`,
        `network_error` and `mail_error`.
        """
        retry_counter = 0
        retry_max = 3
        while True:
            try:
                if self.connection is None:
                    self._connect()
                if self.connection.state == "NONAUTH":
                    if self.client_secret:
                        # Authenticate using OAUTH
                        auth_string = "user={}\1auth=Bearer {}\1\1".format(
                            self.user, self.creds.token
                        )
                        self.connection.authenticate("XOAUTH2", lambda x: auth_string)
                    else:
                        # Login with user and password
                        self.connection.login(self.user, self.password)

                tmp_mail_count = 0
                directories = self.mailbox.split(",")

                for directory in directories:
                    self.connection.select(directory)
                    unseen_response = self.connection.search(None, self.criterion)
                    mails = unseen_response[1][0].split()
                    tmp_mail_count += len(mails)

                self.mail_count = tmp_mail_count
                self.network_error = None

                if self.use_idle:
                    self.py3.update()
                    self._idle()
                    retry_counter = 0
                else:
                    return
            except (socket_error, imaplib.IMAP4.abort, imaplib.IMAP4.readonly) as e:
                if "didn't respond to 'DONE'" in str(e) or isinstance(e, socket_error):
                    self.network_error = str(e)
                    error_type = "Network"
                else:
                    error_type = "Recoverable"
                    # Note: we don't reset network_error, as we want this to persist
                    # until we either run into a permanent error or successfully receive
                    # another response from the IMAP server.

                if self.debug:
                    self.py3.log(
                        f"{error_type} error - {e}", level=self.py3.LOG_WARNING,
                    )
                self._disconnect()

                retry_counter += 1
                if retry_counter <= retry_max:
                    if self.debug:
                        self.py3.log(
                            f"Retrying ({retry_counter}/{retry_max})",
                            level=self.py3.LOG_INFO,
                        )
                    continue
                break
            except Exception as e:
                # anything else (including imaplib.IMAP4.error) is fatal
                self.mail_error = f"Fatal error - {e}"
                self._disconnect()
                self.mail_count = None

                retry_counter += 1
                if retry_counter <= retry_max:
                    if self.debug:
                        self.py3.log(
                            "Will retry after 60 seconds ({}/{})".format(
                                retry_counter, retry_max
                            ),
                            level=self.py3.LOG_INFO,
                        )
                    sleep(60)
                    continue
                break
            finally:
                self.py3.update()  # to propagate mail_error


if __name__ == "__main__":
    """
    Run module in test mode.
    """
    from py3status.module_test import module_test

    module_test(Py3status)