1# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et: 2 3# Copyright 2015-2021 Florian Bruhin (The Compiler) <mail@qutebrowser.org> 4# 5# This file is part of qutebrowser. 6# 7# qutebrowser is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# qutebrowser is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>. 19 20"""Management of sessions - saved tabs/windows.""" 21 22import os 23import os.path 24import itertools 25import urllib 26import shutil 27import pathlib 28from typing import Any, Iterable, MutableMapping, MutableSequence, Optional, Union, cast 29 30from PyQt5.QtCore import Qt, QUrl, QObject, QPoint, QTimer, QDateTime 31import yaml 32 33from qutebrowser.utils import (standarddir, objreg, qtutils, log, message, 34 utils, usertypes, version) 35from qutebrowser.api import cmdutils 36from qutebrowser.config import config, configfiles 37from qutebrowser.completion.models import miscmodels 38from qutebrowser.mainwindow import mainwindow 39from qutebrowser.qt import sip 40from qutebrowser.misc import objects, throttle 41 42 43_JsonType = MutableMapping[str, Any] 44 45 46class Sentinel: 47 48 """Sentinel value for default argument.""" 49 50 51default = Sentinel() 52session_manager = cast('SessionManager', None) 53 54ArgType = Union[str, Sentinel] 55 56 57def init(parent=None): 58 """Initialize sessions. 59 60 Args: 61 parent: The parent to use for the SessionManager. 62 """ 63 base_path = pathlib.Path(standarddir.data()) / 'sessions' 64 65 # WORKAROUND for https://github.com/qutebrowser/qutebrowser/issues/5359 66 backup_path = base_path / 'before-qt-515' 67 68 if objects.backend == usertypes.Backend.QtWebEngine: 69 webengine_version = version.qtwebengine_versions().webengine 70 do_backup = webengine_version >= utils.VersionNumber(5, 15) 71 else: 72 do_backup = False 73 74 if base_path.exists() and not backup_path.exists() and do_backup: 75 backup_path.mkdir() 76 for path in base_path.glob('*.yml'): 77 shutil.copy(path, backup_path) 78 79 base_path.mkdir(exist_ok=True) 80 81 global session_manager 82 session_manager = SessionManager(str(base_path), parent) 83 84 85def shutdown(session: Optional[ArgType], last_window: bool) -> None: 86 """Handle a shutdown by saving sessions and removing the autosave file.""" 87 if session_manager is None: 88 return # type: ignore[unreachable] 89 90 try: 91 if session is not None: 92 session_manager.save(session, last_window=last_window, 93 load_next_time=True) 94 elif config.val.auto_save.session: 95 session_manager.save(default, last_window=last_window, 96 load_next_time=True) 97 except SessionError as e: 98 log.sessions.error("Failed to save session: {}".format(e)) 99 100 session_manager.delete_autosave() 101 102 103class SessionError(Exception): 104 105 """Exception raised when a session failed to load/save.""" 106 107 108class SessionNotFoundError(SessionError): 109 110 """Exception raised when a session to be loaded was not found.""" 111 112 113class TabHistoryItem: 114 115 """A single item in the tab history. 116 117 Attributes: 118 url: The QUrl of this item. 119 original_url: The QUrl of this item which was originally requested. 120 title: The title as string of this item. 121 active: Whether this item is the item currently navigated to. 122 user_data: The user data for this item. 123 """ 124 125 def __init__(self, url, title, *, original_url=None, active=False, 126 user_data=None, last_visited=None): 127 self.url = url 128 if original_url is None: 129 self.original_url = url 130 else: 131 self.original_url = original_url 132 self.title = title 133 self.active = active 134 self.user_data = user_data 135 self.last_visited = last_visited 136 137 def __repr__(self): 138 return utils.get_repr(self, constructor=True, url=self.url, 139 original_url=self.original_url, title=self.title, 140 active=self.active, user_data=self.user_data, 141 last_visited=self.last_visited) 142 143 144class SessionManager(QObject): 145 146 """Manager for sessions. 147 148 Attributes: 149 _base_path: The path to store sessions under. 150 _last_window_session: The session data of the last window which was 151 closed. 152 current: The name of the currently loaded session, or None. 153 did_load: Set when a session was loaded. 154 """ 155 156 def __init__(self, base_path, parent=None): 157 super().__init__(parent) 158 self.current: Optional[str] = None 159 self._base_path = base_path 160 self._last_window_session = None 161 self.did_load = False 162 # throttle autosaves to one minute apart 163 self.save_autosave = throttle.Throttle(self._save_autosave, 60 * 1000) 164 165 def _get_session_path(self, name, check_exists=False): 166 """Get the session path based on a session name or absolute path. 167 168 Args: 169 name: The name of the session. 170 check_exists: Whether it should also be checked if the session 171 exists. 172 """ 173 path = os.path.expanduser(name) 174 if os.path.isabs(path) and ((not check_exists) or 175 os.path.exists(path)): 176 return path 177 else: 178 path = os.path.join(self._base_path, name + '.yml') 179 if check_exists and not os.path.exists(path): 180 raise SessionNotFoundError(path) 181 return path 182 183 def exists(self, name): 184 """Check if a named session exists.""" 185 try: 186 self._get_session_path(name, check_exists=True) 187 except SessionNotFoundError: 188 return False 189 else: 190 return True 191 192 def _save_tab_item(self, tab, idx, item): 193 """Save a single history item in a tab. 194 195 Args: 196 tab: The tab to save. 197 idx: The index of the current history item. 198 item: The history item. 199 200 Return: 201 A dict with the saved data for this item. 202 """ 203 data: _JsonType = { 204 'url': bytes(item.url().toEncoded()).decode('ascii'), 205 } 206 207 if item.title(): 208 data['title'] = item.title() 209 else: 210 # https://github.com/qutebrowser/qutebrowser/issues/879 211 if tab.history.current_idx() == idx: 212 data['title'] = tab.title() 213 else: 214 data['title'] = data['url'] 215 216 if item.originalUrl() != item.url(): 217 encoded = item.originalUrl().toEncoded() 218 data['original-url'] = bytes(encoded).decode('ascii') 219 220 if tab.history.current_idx() == idx: 221 data['active'] = True 222 223 try: 224 user_data = item.userData() 225 except AttributeError: 226 # QtWebEngine 227 user_data = None 228 229 data['last_visited'] = item.lastVisited().toString(Qt.ISODate) 230 231 if tab.history.current_idx() == idx: 232 pos = tab.scroller.pos_px() 233 data['zoom'] = tab.zoom.factor() 234 data['scroll-pos'] = {'x': pos.x(), 'y': pos.y()} 235 elif user_data is not None: 236 if 'zoom' in user_data: 237 data['zoom'] = user_data['zoom'] 238 if 'scroll-pos' in user_data: 239 pos = user_data['scroll-pos'] 240 data['scroll-pos'] = {'x': pos.x(), 'y': pos.y()} 241 242 data['pinned'] = tab.data.pinned 243 244 return data 245 246 def _save_tab(self, tab, active): 247 """Get a dict with data for a single tab. 248 249 Args: 250 tab: The WebView to save. 251 active: Whether the tab is currently active. 252 """ 253 data: _JsonType = {'history': []} 254 if active: 255 data['active'] = True 256 for idx, item in enumerate(tab.history): 257 qtutils.ensure_valid(item) 258 item_data = self._save_tab_item(tab, idx, item) 259 if item.url().scheme() == 'qute' and item.url().host() == 'back': 260 # don't add qute://back to the session file 261 if item_data.get('active', False) and data['history']: 262 # mark entry before qute://back as active 263 data['history'][-1]['active'] = True 264 else: 265 data['history'].append(item_data) 266 return data 267 268 def _save_all(self, *, only_window=None, with_private=False): 269 """Get a dict with data for all windows/tabs.""" 270 data: _JsonType = {'windows': []} 271 if only_window is not None: 272 winlist: Iterable[int] = [only_window] 273 else: 274 winlist = objreg.window_registry 275 276 for win_id in sorted(winlist): 277 tabbed_browser = objreg.get('tabbed-browser', scope='window', 278 window=win_id) 279 main_window = objreg.get('main-window', scope='window', 280 window=win_id) 281 282 # We could be in the middle of destroying a window here 283 if sip.isdeleted(main_window): 284 continue 285 286 if tabbed_browser.is_private and not with_private: 287 continue 288 289 win_data: _JsonType = {} 290 active_window = objects.qapp.activeWindow() 291 if getattr(active_window, 'win_id', None) == win_id: 292 win_data['active'] = True 293 win_data['geometry'] = bytes(main_window.saveGeometry()) 294 win_data['tabs'] = [] 295 if tabbed_browser.is_private: 296 win_data['private'] = True 297 for i, tab in enumerate(tabbed_browser.widgets()): 298 active = i == tabbed_browser.widget.currentIndex() 299 win_data['tabs'].append(self._save_tab(tab, active)) 300 data['windows'].append(win_data) 301 return data 302 303 def _get_session_name(self, name): 304 """Helper for save to get the name to save the session to. 305 306 Args: 307 name: The name of the session to save, or the 'default' sentinel 308 object. 309 """ 310 if name is default: 311 name = config.val.session.default_name 312 if name is None: 313 if self.current is not None: 314 name = self.current 315 else: 316 name = 'default' 317 return name 318 319 def save(self, name, last_window=False, load_next_time=False, 320 only_window=None, with_private=False): 321 """Save a named session. 322 323 Args: 324 name: The name of the session to save, or the 'default' sentinel 325 object. 326 last_window: If set, saves the saved self._last_window_session 327 instead of the currently open state. 328 load_next_time: If set, prepares this session to be load next time. 329 only_window: If set, only tabs in the specified window is saved. 330 with_private: Include private windows. 331 332 Return: 333 The name of the saved session. 334 """ 335 name = self._get_session_name(name) 336 path = self._get_session_path(name) 337 338 log.sessions.debug("Saving session {} to {}...".format(name, path)) 339 if last_window: 340 data = self._last_window_session 341 if data is None: 342 log.sessions.error("last_window_session is None while saving!") 343 return None 344 else: 345 data = self._save_all(only_window=only_window, 346 with_private=with_private) 347 log.sessions.vdebug( # type: ignore[attr-defined] 348 "Saving data: {}".format(data)) 349 try: 350 with qtutils.savefile_open(path) as f: 351 utils.yaml_dump(data, f) 352 except (OSError, UnicodeEncodeError, yaml.YAMLError) as e: 353 raise SessionError(e) 354 355 if load_next_time: 356 configfiles.state['general']['session'] = name 357 return name 358 359 def _save_autosave(self): 360 """Save the autosave session.""" 361 try: 362 self.save('_autosave') 363 except SessionError as e: 364 log.sessions.error("Failed to save autosave session: {}".format(e)) 365 366 def delete_autosave(self): 367 """Delete the autosave session.""" 368 # cancel any in-flight saves 369 self.save_autosave.cancel() 370 try: 371 self.delete('_autosave') 372 except SessionNotFoundError: 373 # Exiting before the first load finished 374 pass 375 except SessionError as e: 376 log.sessions.error("Failed to delete autosave session: {}" 377 .format(e)) 378 379 def save_last_window_session(self): 380 """Temporarily save the session for the last closed window.""" 381 self._last_window_session = self._save_all() 382 383 def _load_tab(self, new_tab, data): # noqa: C901 384 """Load yaml data into a newly opened tab.""" 385 entries = [] 386 lazy_load: MutableSequence[_JsonType] = [] 387 # use len(data['history']) 388 # -> dropwhile empty if not session.lazy_session 389 lazy_index = len(data['history']) 390 gen = itertools.chain( 391 itertools.takewhile(lambda _: not lazy_load, 392 enumerate(data['history'])), 393 enumerate(lazy_load), 394 itertools.dropwhile(lambda i: i[0] < lazy_index, 395 enumerate(data['history']))) 396 397 for i, histentry in gen: 398 user_data = {} 399 400 if 'zoom' in data: 401 # The zoom was accidentally stored in 'data' instead of per-tab 402 # earlier. 403 # See https://github.com/qutebrowser/qutebrowser/issues/728 404 user_data['zoom'] = data['zoom'] 405 elif 'zoom' in histentry: 406 user_data['zoom'] = histentry['zoom'] 407 408 if 'scroll-pos' in data: 409 # The scroll position was accidentally stored in 'data' instead 410 # of per-tab earlier. 411 # See https://github.com/qutebrowser/qutebrowser/issues/728 412 pos = data['scroll-pos'] 413 user_data['scroll-pos'] = QPoint(pos['x'], pos['y']) 414 elif 'scroll-pos' in histentry: 415 pos = histentry['scroll-pos'] 416 user_data['scroll-pos'] = QPoint(pos['x'], pos['y']) 417 418 if 'pinned' in histentry: 419 new_tab.data.pinned = histentry['pinned'] 420 421 if (config.val.session.lazy_restore and 422 histentry.get('active', False) and 423 not histentry['url'].startswith('qute://back')): 424 # remove "active" mark and insert back page marked as active 425 lazy_index = i + 1 426 lazy_load.append({ 427 'title': histentry['title'], 428 'url': 429 'qute://back#' + 430 urllib.parse.quote(histentry['title']), 431 'active': True 432 }) 433 histentry['active'] = False 434 435 active = histentry.get('active', False) 436 url = QUrl.fromEncoded(histentry['url'].encode('ascii')) 437 438 if 'original-url' in histentry: 439 orig_url = QUrl.fromEncoded( 440 histentry['original-url'].encode('ascii')) 441 else: 442 orig_url = url 443 444 if histentry.get("last_visited"): 445 last_visited: Optional[QDateTime] = QDateTime.fromString( 446 histentry.get("last_visited"), 447 Qt.ISODate, 448 ) 449 else: 450 last_visited = None 451 452 entry = TabHistoryItem(url=url, original_url=orig_url, 453 title=histentry['title'], active=active, 454 user_data=user_data, 455 last_visited=last_visited) 456 entries.append(entry) 457 if active: 458 new_tab.title_changed.emit(histentry['title']) 459 460 try: 461 new_tab.history.private_api.load_items(entries) 462 except ValueError as e: 463 raise SessionError(e) 464 465 def _load_window(self, win): 466 """Turn yaml data into windows.""" 467 window = mainwindow.MainWindow(geometry=win['geometry'], 468 private=win.get('private', None)) 469 window.show() 470 tabbed_browser = objreg.get('tabbed-browser', scope='window', 471 window=window.win_id) 472 tab_to_focus = None 473 for i, tab in enumerate(win['tabs']): 474 new_tab = tabbed_browser.tabopen(background=False) 475 self._load_tab(new_tab, tab) 476 if tab.get('active', False): 477 tab_to_focus = i 478 if new_tab.data.pinned: 479 new_tab.set_pinned(True) 480 if tab_to_focus is not None: 481 tabbed_browser.widget.setCurrentIndex(tab_to_focus) 482 if win.get('active', False): 483 QTimer.singleShot(0, tabbed_browser.widget.activateWindow) 484 485 def load(self, name, temp=False): 486 """Load a named session. 487 488 Args: 489 name: The name of the session to load. 490 temp: If given, don't set the current session. 491 """ 492 path = self._get_session_path(name, check_exists=True) 493 try: 494 with open(path, encoding='utf-8') as f: 495 data = utils.yaml_load(f) 496 except (OSError, UnicodeDecodeError, yaml.YAMLError) as e: 497 raise SessionError(e) 498 499 log.sessions.debug("Loading session {} from {}...".format(name, path)) 500 if data is None: 501 raise SessionError("Got empty session file") 502 503 if qtutils.is_single_process(): 504 if any(win.get('private') for win in data['windows']): 505 raise SessionError("Can't load a session with private windows " 506 "in single process mode.") 507 508 for win in data['windows']: 509 self._load_window(win) 510 511 if data['windows']: 512 self.did_load = True 513 if not name.startswith('_') and not temp: 514 self.current = name 515 516 def delete(self, name): 517 """Delete a session.""" 518 path = self._get_session_path(name, check_exists=True) 519 try: 520 os.remove(path) 521 except OSError as e: 522 raise SessionError(e) 523 524 def list_sessions(self): 525 """Get a list of all session names.""" 526 sessions = [] 527 for filename in os.listdir(self._base_path): 528 base, ext = os.path.splitext(filename) 529 if ext == '.yml': 530 sessions.append(base) 531 return sorted(sessions) 532 533 534@cmdutils.register() 535@cmdutils.argument('name', completion=miscmodels.session) 536def session_load(name: str, *, 537 clear: bool = False, 538 temp: bool = False, 539 force: bool = False, 540 delete: bool = False) -> None: 541 """Load a session. 542 543 Args: 544 name: The name of the session. 545 clear: Close all existing windows. 546 temp: Don't set the current session for :session-save. 547 force: Force loading internal sessions (starting with an underline). 548 delete: Delete the saved session once it has loaded. 549 """ 550 if name.startswith('_') and not force: 551 raise cmdutils.CommandError("{} is an internal session, use --force " 552 "to load anyways.".format(name)) 553 old_windows = list(objreg.window_registry.values()) 554 try: 555 session_manager.load(name, temp=temp) 556 except SessionNotFoundError: 557 raise cmdutils.CommandError("Session {} not found!".format(name)) 558 except SessionError as e: 559 raise cmdutils.CommandError("Error while loading session: {}" 560 .format(e)) 561 else: 562 if clear: 563 for win in old_windows: 564 win.close() 565 if delete: 566 try: 567 session_manager.delete(name) 568 except SessionError as e: 569 log.sessions.exception("Error while deleting session!") 570 raise cmdutils.CommandError("Error while deleting session: {}" 571 .format(e)) 572 else: 573 log.sessions.debug("Loaded & deleted session {}.".format(name)) 574 575 576@cmdutils.register() 577@cmdutils.argument('name', completion=miscmodels.session) 578@cmdutils.argument('win_id', value=cmdutils.Value.win_id) 579@cmdutils.argument('with_private', flag='p') 580def session_save(name: ArgType = default, *, 581 current: bool = False, 582 quiet: bool = False, 583 force: bool = False, 584 only_active_window: bool = False, 585 with_private: bool = False, 586 win_id: int = None) -> None: 587 """Save a session. 588 589 Args: 590 name: The name of the session. If not given, the session configured in 591 session.default_name is saved. 592 current: Save the current session instead of the default. 593 quiet: Don't show confirmation message. 594 force: Force saving internal sessions (starting with an underline). 595 only_active_window: Saves only tabs of the currently active window. 596 with_private: Include private windows. 597 """ 598 if not isinstance(name, Sentinel) and name.startswith('_') and not force: 599 raise cmdutils.CommandError("{} is an internal session, use --force " 600 "to save anyways.".format(name)) 601 if current: 602 if session_manager.current is None: 603 raise cmdutils.CommandError("No session loaded currently!") 604 name = session_manager.current 605 assert not name.startswith('_') 606 try: 607 if only_active_window: 608 name = session_manager.save(name, only_window=win_id, 609 with_private=True) 610 else: 611 name = session_manager.save(name, with_private=with_private) 612 except SessionError as e: 613 raise cmdutils.CommandError("Error while saving session: {}".format(e)) 614 else: 615 if quiet: 616 log.sessions.debug("Saved session {}.".format(name)) 617 else: 618 message.info("Saved session {}.".format(name)) 619 620 621@cmdutils.register() 622@cmdutils.argument('name', completion=miscmodels.session) 623def session_delete(name: str, *, force: bool = False) -> None: 624 """Delete a session. 625 626 Args: 627 name: The name of the session. 628 force: Force deleting internal sessions (starting with an underline). 629 """ 630 if name.startswith('_') and not force: 631 raise cmdutils.CommandError("{} is an internal session, use --force " 632 "to delete anyways.".format(name)) 633 try: 634 session_manager.delete(name) 635 except SessionNotFoundError: 636 raise cmdutils.CommandError("Session {} not found!".format(name)) 637 except SessionError as e: 638 log.sessions.exception("Error while deleting session!") 639 raise cmdutils.CommandError("Error while deleting session: {}" 640 .format(e)) 641 else: 642 log.sessions.debug("Deleted session {}.".format(name)) 643 644 645def load_default(name): 646 """Load the default session. 647 648 Args: 649 name: The name of the session to load, or None to read state file. 650 """ 651 if name is None and session_manager.exists('_autosave'): 652 name = '_autosave' 653 elif name is None: 654 try: 655 name = configfiles.state['general']['session'] 656 except KeyError: 657 # No session given as argument and none in the session file -> 658 # start without loading a session 659 return 660 661 try: 662 session_manager.load(name) 663 except SessionNotFoundError: 664 message.error("Session {} not found!".format(name)) 665 except SessionError as e: 666 message.error("Failed to load session {}: {}".format(name, e)) 667 try: 668 del configfiles.state['general']['session'] 669 except KeyError: 670 pass 671 # If this was a _restart session, delete it. 672 if name == '_restart': 673 session_manager.delete('_restart') 674