1# encoding: utf-8
2"""
3An application for managing IPython history.
4
5To be invoked as the `ipython history` subcommand.
6"""
7
8import os
9import sqlite3
10
11from traitlets.config.application import Application
12from .application import BaseIPythonApplication
13from traitlets import Bool, Int, Dict
14from ..utils.io import ask_yes_no
15
16trim_hist_help = """Trim the IPython history database to the last 1000 entries.
17
18This actually copies the last 1000 entries to a new database, and then replaces
19the old file with the new. Use the `--keep=` argument to specify a number
20other than 1000.
21"""
22
23clear_hist_help = """Clear the IPython history database, deleting all entries.
24
25Because this is a destructive operation, IPython will prompt the user if they
26really want to do this. Passing a `-f` flag will force clearing without a
27prompt.
28
29This is an handy alias to `ipython history trim --keep=0`
30"""
31
32
33class HistoryTrim(BaseIPythonApplication):
34    description = trim_hist_help
35
36    backup = Bool(False,
37        help="Keep the old history file as history.sqlite.<N>"
38        ).tag(config=True)
39
40    keep = Int(1000,
41        help="Number of recent lines to keep in the database."
42        ).tag(config=True)
43
44    flags = Dict(dict(
45        backup = ({'HistoryTrim' : {'backup' : True}},
46            backup.help
47        )
48    ))
49
50    aliases=Dict(dict(
51        keep = 'HistoryTrim.keep'
52    ))
53
54    def start(self):
55        profile_dir = self.profile_dir.location
56        hist_file = os.path.join(profile_dir, 'history.sqlite')
57        con = sqlite3.connect(hist_file)
58
59        # Grab the recent history from the current database.
60        inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
61                                'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
62        if len(inputs) <= self.keep:
63            print("There are already at most %d entries in the history database." % self.keep)
64            print("Not doing anything. Use --keep= argument to keep fewer entries")
65            return
66
67        print("Trimming history to the most recent %d entries." % self.keep)
68
69        inputs.pop() # Remove the extra element we got to check the length.
70        inputs.reverse()
71        if inputs:
72            first_session = inputs[0][0]
73            outputs = list(con.execute('SELECT session, line, output FROM '
74                                       'output_history WHERE session >= ?', (first_session,)))
75            sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
76                                        'sessions WHERE session >= ?', (first_session,)))
77        con.close()
78
79        # Create the new history database.
80        new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
81        i = 0
82        while os.path.exists(new_hist_file):
83            # Make sure we don't interfere with an existing file.
84            i += 1
85            new_hist_file = os.path.join(profile_dir, 'history.sqlite.new'+str(i))
86        new_db = sqlite3.connect(new_hist_file)
87        new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
88                            primary key autoincrement, start timestamp,
89                            end timestamp, num_cmds integer, remark text)""")
90        new_db.execute("""CREATE TABLE IF NOT EXISTS history
91                        (session integer, line integer, source text, source_raw text,
92                        PRIMARY KEY (session, line))""")
93        new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
94                        (session integer, line integer, output text,
95                        PRIMARY KEY (session, line))""")
96        new_db.commit()
97
98
99        if inputs:
100            with new_db:
101                # Add the recent history into the new database.
102                new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
103                new_db.executemany('insert into history values (?,?,?,?)', inputs)
104                new_db.executemany('insert into output_history values (?,?,?)', outputs)
105        new_db.close()
106
107        if self.backup:
108            i = 1
109            backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
110            while os.path.exists(backup_hist_file):
111                i += 1
112                backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
113            os.rename(hist_file, backup_hist_file)
114            print("Backed up longer history file to", backup_hist_file)
115        else:
116            os.remove(hist_file)
117
118        os.rename(new_hist_file, hist_file)
119
120class HistoryClear(HistoryTrim):
121    description = clear_hist_help
122    keep = Int(0,
123        help="Number of recent lines to keep in the database.")
124
125    force = Bool(False,
126        help="Don't prompt user for confirmation"
127        ).tag(config=True)
128
129    flags = Dict(dict(
130        force = ({'HistoryClear' : {'force' : True}},
131            force.help),
132        f = ({'HistoryTrim' : {'force' : True}},
133            force.help
134        )
135    ))
136    aliases = Dict()
137
138    def start(self):
139        if self.force or ask_yes_no("Really delete all ipython history? ",
140                default="no", interrupt="no"):
141            HistoryTrim.start(self)
142
143class HistoryApp(Application):
144    name = u'ipython-history'
145    description = "Manage the IPython history database."
146
147    subcommands = Dict(dict(
148        trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
149        clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
150    ))
151
152    def start(self):
153        if self.subapp is None:
154            print("No subcommand specified. Must specify one of: %s" % \
155                                                    (self.subcommands.keys()))
156            print()
157            self.print_description()
158            self.print_subcommands()
159            self.exit(1)
160        else:
161            return self.subapp.start()
162