# encoding: utf-8
"""
An application for managing IPython history.

To be invoked as the `ipython history` subcommand.
"""

import os
import sqlite3

from traitlets.config.application import Application
from .application import BaseIPythonApplication
from traitlets import Bool, Int, Dict
from ..utils.io import ask_yes_no

trim_hist_help = """Trim the IPython history database to the last 1000 entries.

This actually copies the last 1000 entries to a new database, and then replaces
the old file with the new. Use the `--keep=` argument to specify a number
other than 1000.
"""

clear_hist_help = """Clear the IPython history database, deleting all entries.

Because this is a destructive operation, IPython will prompt the user if they
really want to do this. Passing a `-f` flag will force clearing without a
prompt.

This is an handy alias to `ipython history trim --keep=0`
"""


# Subcommand that rewrites history.sqlite in the profile directory so that it
# only contains the most recent `keep` input lines (plus the sessions and
# outputs belonging to the sessions those lines come from).
class HistoryTrim(BaseIPythonApplication):
    description = trim_hist_help

    backup = Bool(False,
        help="Keep the old history file as history.sqlite.old.<N>"
    ).tag(config=True)

    keep = Int(1000,
        help="Number of recent lines to keep in the database."
    ).tag(config=True)

    flags = Dict(dict(
        backup = ({'HistoryTrim' : {'backup' : True}},
            backup.help
        )
    ))

    aliases=Dict(dict(
        keep = 'HistoryTrim.keep'
    ))

    def start(self):
        """Trim the history database down to the last ``self.keep`` inputs.

        The recent rows are copied into a freshly created database next to
        the original, which then takes the place of ``history.sqlite``.  When
        ``self.backup`` is set, the original file is first renamed to
        ``history.sqlite.old.<N>`` (first unused N, starting at 1).

        Nothing is changed if the database already holds at most
        ``self.keep`` input lines.

        Raises
        ------
        ValueError
            If ``self.keep`` is negative.
        """
        if self.keep < 0:
            # A negative value would turn into a non-positive SQL LIMIT
            # (LIMIT 0, or "no limit" in SQLite) and either crash on the
            # pop() below or silently drop a single entry.
            raise ValueError(
                "keep must be a non-negative integer, got %d" % self.keep)

        profile_dir = self.profile_dir.location
        hist_file = os.path.join(profile_dir, 'history.sqlite')

        outputs = []
        sessions = []
        con = sqlite3.connect(hist_file)
        try:
            # Grab the recent history from the current database.  One extra
            # row is requested so we can tell whether there is anything to
            # trim at all.
            inputs = list(con.execute(
                'SELECT session, line, source, source_raw FROM '
                'history ORDER BY session DESC, line DESC LIMIT ?',
                (self.keep + 1,)))
            if len(inputs) <= self.keep:
                print("There are already at most %d entries in the history database." % self.keep)
                print("Not doing anything. Use --keep= argument to keep fewer entries")
                return

            print("Trimming history to the most recent %d entries." % self.keep)

            inputs.pop()  # Remove the extra element we got to check the length.
            inputs.reverse()  # Back to chronological order.
            if inputs:
                # Sessions and outputs are kept from the oldest surviving
                # session onwards.
                first_session = inputs[0][0]
                outputs = list(con.execute(
                    'SELECT session, line, output FROM '
                    'output_history WHERE session >= ?', (first_session,)))
                sessions = list(con.execute(
                    'SELECT session, start, end, num_cmds, remark FROM '
                    'sessions WHERE session >= ?', (first_session,)))
        finally:
            # Always release the handle on the old database, including on
            # the early return above and on query errors.
            con.close()

        # Create the new history database.
        new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
        i = 0
        while os.path.exists(new_hist_file):
            # Make sure we don't interfere with an existing file.
            i += 1
            new_hist_file = os.path.join(profile_dir, 'history.sqlite.new' + str(i))

        new_db = sqlite3.connect(new_hist_file)
        try:
            new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
                            primary key autoincrement, start timestamp,
                            end timestamp, num_cmds integer, remark text)""")
            new_db.execute("""CREATE TABLE IF NOT EXISTS history
                        (session integer, line integer, source text, source_raw text,
                        PRIMARY KEY (session, line))""")
            new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
                            (session integer, line integer, output text,
                            PRIMARY KEY (session, line))""")
            new_db.commit()

            if inputs:
                # The connection used as a context manager commits the
                # inserts on success and rolls them back on error.
                with new_db:
                    # Add the recent history into the new database.
                    new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
                    new_db.executemany('insert into history values (?,?,?,?)', inputs)
                    new_db.executemany('insert into output_history values (?,?,?)', outputs)
        finally:
            new_db.close()

        if self.backup:
            i = 1
            backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
            while os.path.exists(backup_hist_file):
                i += 1
                backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
            os.rename(hist_file, backup_hist_file)
            print("Backed up longer history file to", backup_hist_file)

        # os.replace overwrites an existing destination in a single step, so
        # without a backup there is no moment at which history.sqlite is
        # missing (unlike a remove followed by a rename).
        os.replace(new_hist_file, hist_file)


# Subcommand that empties the history database: a HistoryTrim with keep=0
# that asks for confirmation first unless `force` is set.
class HistoryClear(HistoryTrim):
    description = clear_hist_help
    keep = Int(0,
        help="Number of recent lines to keep in the database.")

    force = Bool(False,
        help="Don't prompt user for confirmation"
    ).tag(config=True)

    # Both spellings set the `force` trait declared on this class.
    flags = Dict(dict(
        force = ({'HistoryClear' : {'force' : True}},
            force.help),
        f = ({'HistoryClear' : {'force' : True}},
            force.help
        )
    ))
    aliases = Dict()

    def start(self):
        """Clear the history, prompting for confirmation unless ``force`` is set."""
        if self.force or ask_yes_no("Really delete all ipython history? ",
                default="no", interrupt="no"):
            HistoryTrim.start(self)


# Top-level `ipython history` application; dispatches to the subcommands.
class HistoryApp(Application):
    name = u'ipython-history'
    description = "Manage the IPython history database."

    subcommands = Dict(dict(
        trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
        clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
    ))

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1."""
        if self.subapp is None:
            # Format a plain list: a dict keys view would render as
            # "dict_keys([...])".
            print("No subcommand specified. Must specify one of: %s" %
                  (list(self.subcommands),))
            print()
            self.print_description()
            self.print_subcommands()
            self.exit(1)
        else:
            return self.subapp.start()