1"""Manages state database used for checksum caching.""" 2 3from __future__ import unicode_literals 4 5import os 6import sqlite3 7 8import dvc.logger as logger 9from dvc.config import Config 10from dvc.utils import file_md5, remove, current_timestamp 11from dvc.exceptions import DvcException 12from dvc.utils.fs import get_mtime_and_size, get_inode 13 14 15class StateVersionTooNewError(DvcException): 16 """Thrown when dvc version is older than the state database version.""" 17 18 def __init__(self, dvc_version, expected, actual): 19 super(StateVersionTooNewError, self).__init__( 20 "you are using an old version '{dvc_version}' of dvc that is " 21 "using state file version '{expected}' which is not compatible " 22 "with the state file version '{actual}' that is used in this " 23 "repo. Please upgrade right now!".format( 24 dvc_version=dvc_version, expected=expected, actual=actual 25 ) 26 ) 27 28 29def _file_metadata_changed(actual_mtime, mtime, actual_size, size): 30 return actual_mtime != mtime or actual_size != size 31 32 33class State(object): # pylint: disable=too-many-instance-attributes 34 """Class for the state database. 35 36 Args: 37 repo (dvc.repo.Repo): repo instance that this state belongs to. 38 config (configobj.ConfigObj): config for the state. 39 40 Raises: 41 StateVersionTooNewError: thrown when dvc version is older than the 42 state database version. 43 """ 44 45 VERSION = 3 46 STATE_FILE = "state" 47 STATE_TABLE = "state" 48 STATE_TABLE_LAYOUT = ( 49 "inode INTEGER PRIMARY KEY, " 50 "mtime TEXT NOT NULL, " 51 "size TEXT NOT NULL, " 52 "md5 TEXT NOT NULL, " 53 "timestamp TEXT NOT NULL" 54 ) 55 56 STATE_INFO_TABLE = "state_info" 57 STATE_INFO_TABLE_LAYOUT = "count INTEGER" 58 STATE_INFO_ROW = 1 59 60 LINK_STATE_TABLE = "link_state" 61 LINK_STATE_TABLE_LAYOUT = ( 62 "path TEXT PRIMARY KEY, " 63 "inode INTEGER NOT NULL, " 64 "mtime TEXT NOT NULL" 65 ) 66 67 STATE_ROW_LIMIT = 100000000 68 STATE_ROW_CLEANUP_QUOTA = 50 69 70 MAX_INT = 2 ** 63 - 1 71 MAX_UINT = 2 ** 64 - 2 72 73 def __init__(self, repo, config): 74 self.repo = repo 75 self.dvc_dir = repo.dvc_dir 76 self.root_dir = repo.root_dir 77 78 self.row_limit = 100 79 self.row_cleanup_quota = 50 80 81 state_config = config.get(Config.SECTION_STATE, {}) 82 self.row_limit = state_config.get( 83 Config.SECTION_STATE_ROW_LIMIT, self.STATE_ROW_LIMIT 84 ) 85 self.row_cleanup_quota = state_config.get( 86 Config.SECTION_STATE_ROW_CLEANUP_QUOTA, 87 self.STATE_ROW_CLEANUP_QUOTA, 88 ) 89 90 if not self.dvc_dir: 91 self.state_file = None 92 return 93 94 self.state_file = os.path.join(self.dvc_dir, self.STATE_FILE) 95 96 # https://www.sqlite.org/tempfiles.html 97 self.temp_files = [ 98 self.state_file + "-journal", 99 self.state_file + "-wal", 100 ] 101 102 self.database = None 103 self.cursor = None 104 self.inserts = 0 105 106 def __enter__(self): 107 self.load() 108 109 def __exit__(self, typ, value, tbck): 110 self.dump() 111 112 def _collect(self, path): 113 if os.path.isdir(path): 114 return self.repo.cache.local.collect_dir_cache(path) 115 return (file_md5(path)[0], None) 116 117 def changed(self, path, md5): 118 """Check if file/directory has the expected md5. 119 120 Args: 121 path (str): path to the file/directory to check. 122 md5 (str): expected md5. 123 124 Returns: 125 bool: True if path has the expected md5, False otherwise. 126 """ 127 actual = self.update(path) 128 129 msg = "File '{}', md5 '{}', actual '{}'" 130 logger.debug(msg.format(path, md5, actual)) 131 132 if not md5 or not actual: 133 return True 134 135 return actual.split(".")[0] != md5.split(".")[0] 136 137 def _execute(self, cmd): 138 logger.debug(cmd) 139 return self.cursor.execute(cmd) 140 141 def _fetchall(self): 142 ret = self.cursor.fetchall() 143 logger.debug("fetched: {}".format(ret)) 144 return ret 145 146 def _to_sqlite(self, num): 147 assert num >= 0 148 assert num < self.MAX_UINT 149 # NOTE: sqlite stores unit as signed ints, so maximum uint is 2^63-1 150 # see http://jakegoulding.com/blog/2011/02/06/sqlite-64-bit-integers/ 151 if num > self.MAX_INT: 152 ret = -(num - self.MAX_INT) 153 else: 154 ret = num 155 assert self._from_sqlite(ret) == num 156 return ret 157 158 def _from_sqlite(self, num): 159 assert abs(num) <= self.MAX_INT 160 if num < 0: 161 return abs(num) + self.MAX_INT 162 assert num < self.MAX_UINT 163 assert num >= 0 164 return num 165 166 def _prepare_db(self, empty=False): 167 from dvc import VERSION 168 169 if not empty: 170 cmd = "PRAGMA user_version;" 171 self._execute(cmd) 172 ret = self._fetchall() 173 assert len(ret) == 1 174 assert len(ret[0]) == 1 175 assert isinstance(ret[0][0], int) 176 version = ret[0][0] 177 178 if version > self.VERSION: 179 raise StateVersionTooNewError(VERSION, self.VERSION, version) 180 elif version < self.VERSION: 181 msg = ( 182 "State file version '{}' is too old. " 183 "Reformatting to the current version '{}'." 184 ) 185 logger.warning(msg.format(version, self.VERSION)) 186 cmd = "DROP TABLE IF EXISTS {};" 187 self._execute(cmd.format(self.STATE_TABLE)) 188 self._execute(cmd.format(self.STATE_INFO_TABLE)) 189 self._execute(cmd.format(self.LINK_STATE_TABLE)) 190 191 # Check that the state file is indeed a database 192 cmd = "CREATE TABLE IF NOT EXISTS {} ({})" 193 self._execute(cmd.format(self.STATE_TABLE, self.STATE_TABLE_LAYOUT)) 194 self._execute( 195 cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_TABLE_LAYOUT) 196 ) 197 self._execute( 198 cmd.format(self.LINK_STATE_TABLE, self.LINK_STATE_TABLE_LAYOUT) 199 ) 200 201 cmd = ( 202 "INSERT OR IGNORE INTO {} (count) SELECT 0 " 203 "WHERE NOT EXISTS (SELECT * FROM {})" 204 ) 205 self._execute(cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_TABLE)) 206 207 cmd = "PRAGMA user_version = {};" 208 self._execute(cmd.format(self.VERSION)) 209 210 def load(self): 211 """Loads state database.""" 212 retries = 1 213 while True: 214 assert self.database is None 215 assert self.cursor is None 216 assert self.inserts == 0 217 empty = not os.path.exists(self.state_file) 218 self.database = sqlite3.connect(self.state_file) 219 self.cursor = self.database.cursor() 220 221 # Try loading once to check that the file is indeed a database 222 # and reformat it if it is not. 223 try: 224 self._prepare_db(empty=empty) 225 return 226 except sqlite3.DatabaseError: 227 self.cursor.close() 228 self.database.close() 229 self.database = None 230 self.cursor = None 231 self.inserts = 0 232 if retries > 0: 233 os.unlink(self.state_file) 234 retries -= 1 235 else: 236 raise 237 238 def _vacuum(self): 239 # NOTE: see https://bugs.python.org/issue28518 240 self.database.isolation_level = None 241 self._execute("VACUUM") 242 self.database.isolation_level = "" 243 244 def dump(self): 245 """Saves state database.""" 246 assert self.database is not None 247 248 cmd = "SELECT count from {} WHERE rowid={}" 249 self._execute(cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_ROW)) 250 ret = self._fetchall() 251 assert len(ret) == 1 252 assert len(ret[0]) == 1 253 count = self._from_sqlite(ret[0][0]) + self.inserts 254 255 if count > self.row_limit: 256 msg = "cleaning up state, this might take a while." 257 logger.warning(msg) 258 259 delete = count - self.row_limit 260 delete += int(self.row_limit * (self.row_cleanup_quota / 100.0)) 261 cmd = ( 262 "DELETE FROM {} WHERE timestamp IN (" 263 "SELECT timestamp FROM {} ORDER BY timestamp ASC LIMIT {});" 264 ) 265 self._execute( 266 cmd.format(self.STATE_TABLE, self.STATE_TABLE, delete) 267 ) 268 269 self._vacuum() 270 271 cmd = "SELECT COUNT(*) FROM {}" 272 273 self._execute(cmd.format(self.STATE_TABLE)) 274 ret = self._fetchall() 275 assert len(ret) == 1 276 assert len(ret[0]) == 1 277 count = ret[0][0] 278 279 cmd = "UPDATE {} SET count = {} WHERE rowid = {}" 280 self._execute( 281 cmd.format( 282 self.STATE_INFO_TABLE, 283 self._to_sqlite(count), 284 self.STATE_INFO_ROW, 285 ) 286 ) 287 288 self._update_cache_directory_state() 289 290 self.database.commit() 291 self.cursor.close() 292 self.database.close() 293 self.database = None 294 self.cursor = None 295 self.inserts = 0 296 297 def _do_update(self, path, known_checksum=None): 298 """ 299 Make sure the stored info for the given path is up to date. 300 """ 301 if not os.path.exists(path): 302 return None, None 303 304 actual_mtime, actual_size = get_mtime_and_size(path) 305 actual_inode = get_inode(path) 306 307 existing_record = self.get_state_record_for_inode(actual_inode) 308 309 if existing_record: 310 md5, info = self._update_existing_state_record( 311 path, 312 actual_inode, 313 actual_mtime, 314 actual_size, 315 existing_record, 316 known_checksum, 317 ) 318 else: 319 md5, info = self._insert_new_state_record( 320 path, actual_inode, actual_mtime, actual_size, known_checksum 321 ) 322 323 return md5, info 324 325 def _update_existing_state_record( 326 self, 327 path, 328 actual_inode, 329 actual_mtime, 330 actual_size, 331 existing_record, 332 known_checksum=None, 333 ): 334 335 mtime, size, md5, _ = existing_record 336 if _file_metadata_changed(actual_mtime, mtime, actual_size, size): 337 md5, info = self._update_state_for_path_changed( 338 path, actual_inode, actual_mtime, actual_size, known_checksum 339 ) 340 else: 341 info = None 342 self._update_state_record_timestamp_for_inode(actual_inode) 343 return md5, info 344 345 def _update_state_record_timestamp_for_inode(self, actual_inode): 346 cmd = 'UPDATE {} SET timestamp = "{}" WHERE inode = {}' 347 self._execute( 348 cmd.format( 349 self.STATE_TABLE, 350 current_timestamp(), 351 self._to_sqlite(actual_inode), 352 ) 353 ) 354 355 def _update_state_for_path_changed( 356 self, 357 path, 358 actual_inode, 359 actual_mtime, 360 actual_size, 361 known_checksum=None, 362 ): 363 if known_checksum: 364 md5, info = known_checksum, None 365 else: 366 md5, info = self._collect(path) 367 cmd = ( 368 "UPDATE {} SET " 369 'mtime = "{}", size = "{}", ' 370 'md5 = "{}", timestamp = "{}" ' 371 "WHERE inode = {}" 372 ) 373 self._execute( 374 cmd.format( 375 self.STATE_TABLE, 376 actual_mtime, 377 actual_size, 378 md5, 379 current_timestamp(), 380 self._to_sqlite(actual_inode), 381 ) 382 ) 383 return md5, info 384 385 def _insert_new_state_record( 386 self, path, actual_inode, actual_mtime, actual_size, known_checksum 387 ): 388 if known_checksum: 389 md5, info = known_checksum, None 390 else: 391 md5, info = self._collect(path) 392 cmd = ( 393 "INSERT INTO {}(inode, mtime, size, md5, timestamp) " 394 'VALUES ({}, "{}", "{}", "{}", "{}")' 395 ) 396 self._execute( 397 cmd.format( 398 self.STATE_TABLE, 399 self._to_sqlite(actual_inode), 400 actual_mtime, 401 actual_size, 402 md5, 403 current_timestamp(), 404 ) 405 ) 406 self.inserts += 1 407 return md5, info 408 409 def get_state_record_for_inode(self, inode): 410 cmd = "SELECT mtime, size, md5, timestamp from {} " "WHERE inode={}" 411 cmd = cmd.format(self.STATE_TABLE, inode) 412 self._execute(cmd) 413 results = self._fetchall() 414 if results: 415 # uniquness constrain on inode 416 assert len(results) == 1 417 return results[0] 418 return None 419 420 def update(self, path, known_checksum=None): 421 """Gets the checksum for the specified path. Checksum will be 422 retrieved from the state database if available, otherwise it will be 423 computed and cached in the state database for the further use. 424 425 Args: 426 path (str): path to get the checksum for. 427 428 Returns: 429 str: checksum for the specified path. 430 """ 431 return self._do_update(path, known_checksum)[0] 432 433 def update_info(self, path): 434 """Gets the checksum and the directory info (if applicable) for the 435 specified path. 436 437 Args: 438 path (str): path to get the checksum and the directory info for. 439 440 Returns: 441 tuple: checksum for the specified path along with a directory info 442 (list of {relative_path: checksum} entries for each file in the 443 directory) if applicable, otherwise None. 444 """ 445 md5, info = self._do_update(path) 446 if not info: 447 info = self.repo.cache.local.load_dir_cache(md5) 448 return (md5, info) 449 450 def update_link(self, path): 451 """Adds the specified path to the list of links created by dvc. This 452 list is later used on `dvc checkout` to cleanup old links. 453 454 Args: 455 path (str): path to add to the list of links. 456 """ 457 if not os.path.exists(path): 458 return 459 460 mtime, _ = get_mtime_and_size(path) 461 inode = get_inode(path) 462 relpath = os.path.relpath(path, self.root_dir) 463 464 cmd = ( 465 "REPLACE INTO {}(path, inode, mtime) " 466 'VALUES ("{}", {}, "{}")'.format( 467 self.LINK_STATE_TABLE, relpath, self._to_sqlite(inode), mtime 468 ) 469 ) 470 self._execute(cmd) 471 472 def remove_unused_links(self, used): 473 """Removes all saved links except the ones that are used. 474 475 Args: 476 used (list): list of used links that should not be removed. 477 """ 478 unused = [] 479 480 self._execute("SELECT * FROM {}".format(self.LINK_STATE_TABLE)) 481 for row in self.cursor: 482 relpath, inode, mtime = row 483 inode = self._from_sqlite(inode) 484 path = os.path.join(self.root_dir, relpath) 485 486 if path in used: 487 continue 488 489 if not os.path.exists(path): 490 continue 491 492 actual_inode = get_inode(path) 493 actual_mtime, _ = get_mtime_and_size(path) 494 495 if inode == actual_inode and mtime == actual_mtime: 496 logger.debug("Removing '{}' as unused link.".format(path)) 497 remove(path) 498 unused.append(relpath) 499 500 for relpath in unused: 501 cmd = 'DELETE FROM {} WHERE path = "{}"' 502 self._execute(cmd.format(self.LINK_STATE_TABLE, relpath)) 503 504 def _update_cache_directory_state(self): 505 cache_path = self.repo.cache.local.cache_dir 506 mtime, size = get_mtime_and_size(cache_path) 507 inode = get_inode(cache_path) 508 509 cmd = ( 510 "INSERT OR REPLACE INTO {}(inode, size, mtime, timestamp, md5) " 511 'VALUES ({}, "{}", "{}", "{}", "")'.format( 512 self.STATE_TABLE, 513 self._to_sqlite(inode), 514 size, 515 mtime, 516 current_timestamp(), 517 ) 518 ) 519 self._execute(cmd) 520