1# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
2
3# Copyright 2015-2021 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
4#
5# This file is part of qutebrowser.
6#
7# qutebrowser is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# qutebrowser is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with qutebrowser.  If not, see <https://www.gnu.org/licenses/>.
19
20"""Management of sessions - saved tabs/windows."""
21
22import os
23import os.path
24import itertools
25import urllib
26import shutil
27import pathlib
28from typing import Any, Iterable, MutableMapping, MutableSequence, Optional, Union, cast
29
30from PyQt5.QtCore import Qt, QUrl, QObject, QPoint, QTimer, QDateTime
31import yaml
32
33from qutebrowser.utils import (standarddir, objreg, qtutils, log, message,
34                               utils, usertypes, version)
35from qutebrowser.api import cmdutils
36from qutebrowser.config import config, configfiles
37from qutebrowser.completion.models import miscmodels
38from qutebrowser.mainwindow import mainwindow
39from qutebrowser.qt import sip
40from qutebrowser.misc import objects, throttle
41
42
43_JsonType = MutableMapping[str, Any]
44
45
46class Sentinel:
47
48    """Sentinel value for default argument."""
49
50
51default = Sentinel()
52session_manager = cast('SessionManager', None)
53
54ArgType = Union[str, Sentinel]
55
56
57def init(parent=None):
58    """Initialize sessions.
59
60    Args:
61        parent: The parent to use for the SessionManager.
62    """
63    base_path = pathlib.Path(standarddir.data()) / 'sessions'
64
65    # WORKAROUND for https://github.com/qutebrowser/qutebrowser/issues/5359
66    backup_path = base_path / 'before-qt-515'
67
68    if objects.backend == usertypes.Backend.QtWebEngine:
69        webengine_version = version.qtwebengine_versions().webengine
70        do_backup = webengine_version >= utils.VersionNumber(5, 15)
71    else:
72        do_backup = False
73
74    if base_path.exists() and not backup_path.exists() and do_backup:
75        backup_path.mkdir()
76        for path in base_path.glob('*.yml'):
77            shutil.copy(path, backup_path)
78
79    base_path.mkdir(exist_ok=True)
80
81    global session_manager
82    session_manager = SessionManager(str(base_path), parent)
83
84
85def shutdown(session: Optional[ArgType], last_window: bool) -> None:
86    """Handle a shutdown by saving sessions and removing the autosave file."""
87    if session_manager is None:
88        return  # type: ignore[unreachable]
89
90    try:
91        if session is not None:
92            session_manager.save(session, last_window=last_window,
93                                 load_next_time=True)
94        elif config.val.auto_save.session:
95            session_manager.save(default, last_window=last_window,
96                                 load_next_time=True)
97    except SessionError as e:
98        log.sessions.error("Failed to save session: {}".format(e))
99
100    session_manager.delete_autosave()
101
102
103class SessionError(Exception):
104
105    """Exception raised when a session failed to load/save."""
106
107
108class SessionNotFoundError(SessionError):
109
110    """Exception raised when a session to be loaded was not found."""
111
112
113class TabHistoryItem:
114
115    """A single item in the tab history.
116
117    Attributes:
118        url: The QUrl of this item.
119        original_url: The QUrl of this item which was originally requested.
120        title: The title as string of this item.
121        active: Whether this item is the item currently navigated to.
122        user_data: The user data for this item.
123    """
124
125    def __init__(self, url, title, *, original_url=None, active=False,
126                 user_data=None, last_visited=None):
127        self.url = url
128        if original_url is None:
129            self.original_url = url
130        else:
131            self.original_url = original_url
132        self.title = title
133        self.active = active
134        self.user_data = user_data
135        self.last_visited = last_visited
136
137    def __repr__(self):
138        return utils.get_repr(self, constructor=True, url=self.url,
139                              original_url=self.original_url, title=self.title,
140                              active=self.active, user_data=self.user_data,
141                              last_visited=self.last_visited)
142
143
144class SessionManager(QObject):
145
146    """Manager for sessions.
147
148    Attributes:
149        _base_path: The path to store sessions under.
150        _last_window_session: The session data of the last window which was
151                              closed.
152        current: The name of the currently loaded session, or None.
153        did_load: Set when a session was loaded.
154    """
155
156    def __init__(self, base_path, parent=None):
157        super().__init__(parent)
158        self.current: Optional[str] = None
159        self._base_path = base_path
160        self._last_window_session = None
161        self.did_load = False
162        # throttle autosaves to one minute apart
163        self.save_autosave = throttle.Throttle(self._save_autosave, 60 * 1000)
164
165    def _get_session_path(self, name, check_exists=False):
166        """Get the session path based on a session name or absolute path.
167
168        Args:
169            name: The name of the session.
170            check_exists: Whether it should also be checked if the session
171                          exists.
172        """
173        path = os.path.expanduser(name)
174        if os.path.isabs(path) and ((not check_exists) or
175                                    os.path.exists(path)):
176            return path
177        else:
178            path = os.path.join(self._base_path, name + '.yml')
179            if check_exists and not os.path.exists(path):
180                raise SessionNotFoundError(path)
181            return path
182
183    def exists(self, name):
184        """Check if a named session exists."""
185        try:
186            self._get_session_path(name, check_exists=True)
187        except SessionNotFoundError:
188            return False
189        else:
190            return True
191
192    def _save_tab_item(self, tab, idx, item):
193        """Save a single history item in a tab.
194
195        Args:
196            tab: The tab to save.
197            idx: The index of the current history item.
198            item: The history item.
199
200        Return:
201            A dict with the saved data for this item.
202        """
203        data: _JsonType = {
204            'url': bytes(item.url().toEncoded()).decode('ascii'),
205        }
206
207        if item.title():
208            data['title'] = item.title()
209        else:
210            # https://github.com/qutebrowser/qutebrowser/issues/879
211            if tab.history.current_idx() == idx:
212                data['title'] = tab.title()
213            else:
214                data['title'] = data['url']
215
216        if item.originalUrl() != item.url():
217            encoded = item.originalUrl().toEncoded()
218            data['original-url'] = bytes(encoded).decode('ascii')
219
220        if tab.history.current_idx() == idx:
221            data['active'] = True
222
223        try:
224            user_data = item.userData()
225        except AttributeError:
226            # QtWebEngine
227            user_data = None
228
229        data['last_visited'] = item.lastVisited().toString(Qt.ISODate)
230
231        if tab.history.current_idx() == idx:
232            pos = tab.scroller.pos_px()
233            data['zoom'] = tab.zoom.factor()
234            data['scroll-pos'] = {'x': pos.x(), 'y': pos.y()}
235        elif user_data is not None:
236            if 'zoom' in user_data:
237                data['zoom'] = user_data['zoom']
238            if 'scroll-pos' in user_data:
239                pos = user_data['scroll-pos']
240                data['scroll-pos'] = {'x': pos.x(), 'y': pos.y()}
241
242        data['pinned'] = tab.data.pinned
243
244        return data
245
246    def _save_tab(self, tab, active):
247        """Get a dict with data for a single tab.
248
249        Args:
250            tab: The WebView to save.
251            active: Whether the tab is currently active.
252        """
253        data: _JsonType = {'history': []}
254        if active:
255            data['active'] = True
256        for idx, item in enumerate(tab.history):
257            qtutils.ensure_valid(item)
258            item_data = self._save_tab_item(tab, idx, item)
259            if item.url().scheme() == 'qute' and item.url().host() == 'back':
260                # don't add qute://back to the session file
261                if item_data.get('active', False) and data['history']:
262                    # mark entry before qute://back as active
263                    data['history'][-1]['active'] = True
264            else:
265                data['history'].append(item_data)
266        return data
267
268    def _save_all(self, *, only_window=None, with_private=False):
269        """Get a dict with data for all windows/tabs."""
270        data: _JsonType = {'windows': []}
271        if only_window is not None:
272            winlist: Iterable[int] = [only_window]
273        else:
274            winlist = objreg.window_registry
275
276        for win_id in sorted(winlist):
277            tabbed_browser = objreg.get('tabbed-browser', scope='window',
278                                        window=win_id)
279            main_window = objreg.get('main-window', scope='window',
280                                     window=win_id)
281
282            # We could be in the middle of destroying a window here
283            if sip.isdeleted(main_window):
284                continue
285
286            if tabbed_browser.is_private and not with_private:
287                continue
288
289            win_data: _JsonType = {}
290            active_window = objects.qapp.activeWindow()
291            if getattr(active_window, 'win_id', None) == win_id:
292                win_data['active'] = True
293            win_data['geometry'] = bytes(main_window.saveGeometry())
294            win_data['tabs'] = []
295            if tabbed_browser.is_private:
296                win_data['private'] = True
297            for i, tab in enumerate(tabbed_browser.widgets()):
298                active = i == tabbed_browser.widget.currentIndex()
299                win_data['tabs'].append(self._save_tab(tab, active))
300            data['windows'].append(win_data)
301        return data
302
303    def _get_session_name(self, name):
304        """Helper for save to get the name to save the session to.
305
306        Args:
307            name: The name of the session to save, or the 'default' sentinel
308                  object.
309        """
310        if name is default:
311            name = config.val.session.default_name
312            if name is None:
313                if self.current is not None:
314                    name = self.current
315                else:
316                    name = 'default'
317        return name
318
319    def save(self, name, last_window=False, load_next_time=False,
320             only_window=None, with_private=False):
321        """Save a named session.
322
323        Args:
324            name: The name of the session to save, or the 'default' sentinel
325                  object.
326            last_window: If set, saves the saved self._last_window_session
327                         instead of the currently open state.
328            load_next_time: If set, prepares this session to be load next time.
329            only_window: If set, only tabs in the specified window is saved.
330            with_private: Include private windows.
331
332        Return:
333            The name of the saved session.
334        """
335        name = self._get_session_name(name)
336        path = self._get_session_path(name)
337
338        log.sessions.debug("Saving session {} to {}...".format(name, path))
339        if last_window:
340            data = self._last_window_session
341            if data is None:
342                log.sessions.error("last_window_session is None while saving!")
343                return None
344        else:
345            data = self._save_all(only_window=only_window,
346                                  with_private=with_private)
347        log.sessions.vdebug(  # type: ignore[attr-defined]
348            "Saving data: {}".format(data))
349        try:
350            with qtutils.savefile_open(path) as f:
351                utils.yaml_dump(data, f)
352        except (OSError, UnicodeEncodeError, yaml.YAMLError) as e:
353            raise SessionError(e)
354
355        if load_next_time:
356            configfiles.state['general']['session'] = name
357        return name
358
359    def _save_autosave(self):
360        """Save the autosave session."""
361        try:
362            self.save('_autosave')
363        except SessionError as e:
364            log.sessions.error("Failed to save autosave session: {}".format(e))
365
366    def delete_autosave(self):
367        """Delete the autosave session."""
368        # cancel any in-flight saves
369        self.save_autosave.cancel()
370        try:
371            self.delete('_autosave')
372        except SessionNotFoundError:
373            # Exiting before the first load finished
374            pass
375        except SessionError as e:
376            log.sessions.error("Failed to delete autosave session: {}"
377                               .format(e))
378
379    def save_last_window_session(self):
380        """Temporarily save the session for the last closed window."""
381        self._last_window_session = self._save_all()
382
383    def _load_tab(self, new_tab, data):  # noqa: C901
384        """Load yaml data into a newly opened tab."""
385        entries = []
386        lazy_load: MutableSequence[_JsonType] = []
387        # use len(data['history'])
388        # -> dropwhile empty if not session.lazy_session
389        lazy_index = len(data['history'])
390        gen = itertools.chain(
391            itertools.takewhile(lambda _: not lazy_load,
392                                enumerate(data['history'])),
393            enumerate(lazy_load),
394            itertools.dropwhile(lambda i: i[0] < lazy_index,
395                                enumerate(data['history'])))
396
397        for i, histentry in gen:
398            user_data = {}
399
400            if 'zoom' in data:
401                # The zoom was accidentally stored in 'data' instead of per-tab
402                # earlier.
403                # See https://github.com/qutebrowser/qutebrowser/issues/728
404                user_data['zoom'] = data['zoom']
405            elif 'zoom' in histentry:
406                user_data['zoom'] = histentry['zoom']
407
408            if 'scroll-pos' in data:
409                # The scroll position was accidentally stored in 'data' instead
410                # of per-tab earlier.
411                # See https://github.com/qutebrowser/qutebrowser/issues/728
412                pos = data['scroll-pos']
413                user_data['scroll-pos'] = QPoint(pos['x'], pos['y'])
414            elif 'scroll-pos' in histentry:
415                pos = histentry['scroll-pos']
416                user_data['scroll-pos'] = QPoint(pos['x'], pos['y'])
417
418            if 'pinned' in histentry:
419                new_tab.data.pinned = histentry['pinned']
420
421            if (config.val.session.lazy_restore and
422                    histentry.get('active', False) and
423                    not histentry['url'].startswith('qute://back')):
424                # remove "active" mark and insert back page marked as active
425                lazy_index = i + 1
426                lazy_load.append({
427                    'title': histentry['title'],
428                    'url':
429                        'qute://back#' +
430                        urllib.parse.quote(histentry['title']),
431                    'active': True
432                })
433                histentry['active'] = False
434
435            active = histentry.get('active', False)
436            url = QUrl.fromEncoded(histentry['url'].encode('ascii'))
437
438            if 'original-url' in histentry:
439                orig_url = QUrl.fromEncoded(
440                    histentry['original-url'].encode('ascii'))
441            else:
442                orig_url = url
443
444            if histentry.get("last_visited"):
445                last_visited: Optional[QDateTime] = QDateTime.fromString(
446                    histentry.get("last_visited"),
447                    Qt.ISODate,
448                )
449            else:
450                last_visited = None
451
452            entry = TabHistoryItem(url=url, original_url=orig_url,
453                                   title=histentry['title'], active=active,
454                                   user_data=user_data,
455                                   last_visited=last_visited)
456            entries.append(entry)
457            if active:
458                new_tab.title_changed.emit(histentry['title'])
459
460        try:
461            new_tab.history.private_api.load_items(entries)
462        except ValueError as e:
463            raise SessionError(e)
464
465    def _load_window(self, win):
466        """Turn yaml data into windows."""
467        window = mainwindow.MainWindow(geometry=win['geometry'],
468                                       private=win.get('private', None))
469        window.show()
470        tabbed_browser = objreg.get('tabbed-browser', scope='window',
471                                    window=window.win_id)
472        tab_to_focus = None
473        for i, tab in enumerate(win['tabs']):
474            new_tab = tabbed_browser.tabopen(background=False)
475            self._load_tab(new_tab, tab)
476            if tab.get('active', False):
477                tab_to_focus = i
478            if new_tab.data.pinned:
479                new_tab.set_pinned(True)
480        if tab_to_focus is not None:
481            tabbed_browser.widget.setCurrentIndex(tab_to_focus)
482        if win.get('active', False):
483            QTimer.singleShot(0, tabbed_browser.widget.activateWindow)
484
485    def load(self, name, temp=False):
486        """Load a named session.
487
488        Args:
489            name: The name of the session to load.
490            temp: If given, don't set the current session.
491        """
492        path = self._get_session_path(name, check_exists=True)
493        try:
494            with open(path, encoding='utf-8') as f:
495                data = utils.yaml_load(f)
496        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
497            raise SessionError(e)
498
499        log.sessions.debug("Loading session {} from {}...".format(name, path))
500        if data is None:
501            raise SessionError("Got empty session file")
502
503        if qtutils.is_single_process():
504            if any(win.get('private') for win in data['windows']):
505                raise SessionError("Can't load a session with private windows "
506                                   "in single process mode.")
507
508        for win in data['windows']:
509            self._load_window(win)
510
511        if data['windows']:
512            self.did_load = True
513        if not name.startswith('_') and not temp:
514            self.current = name
515
516    def delete(self, name):
517        """Delete a session."""
518        path = self._get_session_path(name, check_exists=True)
519        try:
520            os.remove(path)
521        except OSError as e:
522            raise SessionError(e)
523
524    def list_sessions(self):
525        """Get a list of all session names."""
526        sessions = []
527        for filename in os.listdir(self._base_path):
528            base, ext = os.path.splitext(filename)
529            if ext == '.yml':
530                sessions.append(base)
531        return sorted(sessions)
532
533
534@cmdutils.register()
535@cmdutils.argument('name', completion=miscmodels.session)
536def session_load(name: str, *,
537                 clear: bool = False,
538                 temp: bool = False,
539                 force: bool = False,
540                 delete: bool = False) -> None:
541    """Load a session.
542
543    Args:
544        name: The name of the session.
545        clear: Close all existing windows.
546        temp: Don't set the current session for :session-save.
547        force: Force loading internal sessions (starting with an underline).
548        delete: Delete the saved session once it has loaded.
549    """
550    if name.startswith('_') and not force:
551        raise cmdutils.CommandError("{} is an internal session, use --force "
552                                    "to load anyways.".format(name))
553    old_windows = list(objreg.window_registry.values())
554    try:
555        session_manager.load(name, temp=temp)
556    except SessionNotFoundError:
557        raise cmdutils.CommandError("Session {} not found!".format(name))
558    except SessionError as e:
559        raise cmdutils.CommandError("Error while loading session: {}"
560                                    .format(e))
561    else:
562        if clear:
563            for win in old_windows:
564                win.close()
565        if delete:
566            try:
567                session_manager.delete(name)
568            except SessionError as e:
569                log.sessions.exception("Error while deleting session!")
570                raise cmdutils.CommandError("Error while deleting session: {}"
571                                            .format(e))
572            else:
573                log.sessions.debug("Loaded & deleted session {}.".format(name))
574
575
576@cmdutils.register()
577@cmdutils.argument('name', completion=miscmodels.session)
578@cmdutils.argument('win_id', value=cmdutils.Value.win_id)
579@cmdutils.argument('with_private', flag='p')
580def session_save(name: ArgType = default, *,
581                 current: bool = False,
582                 quiet: bool = False,
583                 force: bool = False,
584                 only_active_window: bool = False,
585                 with_private: bool = False,
586                 win_id: int = None) -> None:
587    """Save a session.
588
589    Args:
590        name: The name of the session. If not given, the session configured in
591              session.default_name is saved.
592        current: Save the current session instead of the default.
593        quiet: Don't show confirmation message.
594        force: Force saving internal sessions (starting with an underline).
595        only_active_window: Saves only tabs of the currently active window.
596        with_private: Include private windows.
597    """
598    if not isinstance(name, Sentinel) and name.startswith('_') and not force:
599        raise cmdutils.CommandError("{} is an internal session, use --force "
600                                    "to save anyways.".format(name))
601    if current:
602        if session_manager.current is None:
603            raise cmdutils.CommandError("No session loaded currently!")
604        name = session_manager.current
605        assert not name.startswith('_')
606    try:
607        if only_active_window:
608            name = session_manager.save(name, only_window=win_id,
609                                        with_private=True)
610        else:
611            name = session_manager.save(name, with_private=with_private)
612    except SessionError as e:
613        raise cmdutils.CommandError("Error while saving session: {}".format(e))
614    else:
615        if quiet:
616            log.sessions.debug("Saved session {}.".format(name))
617        else:
618            message.info("Saved session {}.".format(name))
619
620
621@cmdutils.register()
622@cmdutils.argument('name', completion=miscmodels.session)
623def session_delete(name: str, *, force: bool = False) -> None:
624    """Delete a session.
625
626    Args:
627        name: The name of the session.
628        force: Force deleting internal sessions (starting with an underline).
629    """
630    if name.startswith('_') and not force:
631        raise cmdutils.CommandError("{} is an internal session, use --force "
632                                    "to delete anyways.".format(name))
633    try:
634        session_manager.delete(name)
635    except SessionNotFoundError:
636        raise cmdutils.CommandError("Session {} not found!".format(name))
637    except SessionError as e:
638        log.sessions.exception("Error while deleting session!")
639        raise cmdutils.CommandError("Error while deleting session: {}"
640                                    .format(e))
641    else:
642        log.sessions.debug("Deleted session {}.".format(name))
643
644
645def load_default(name):
646    """Load the default session.
647
648    Args:
649        name: The name of the session to load, or None to read state file.
650    """
651    if name is None and session_manager.exists('_autosave'):
652        name = '_autosave'
653    elif name is None:
654        try:
655            name = configfiles.state['general']['session']
656        except KeyError:
657            # No session given as argument and none in the session file ->
658            # start without loading a session
659            return
660
661    try:
662        session_manager.load(name)
663    except SessionNotFoundError:
664        message.error("Session {} not found!".format(name))
665    except SessionError as e:
666        message.error("Failed to load session {}: {}".format(name, e))
667    try:
668        del configfiles.state['general']['session']
669    except KeyError:
670        pass
671    # If this was a _restart session, delete it.
672    if name == '_restart':
673        session_manager.delete('_restart')
674