1"""Manages state database used for checksum caching."""
2
3from __future__ import unicode_literals
4
5import os
6import sqlite3
7
8import dvc.logger as logger
9from dvc.config import Config
10from dvc.utils import file_md5, remove, current_timestamp
11from dvc.exceptions import DvcException
12from dvc.utils.fs import get_mtime_and_size, get_inode
13
14
class StateVersionTooNewError(DvcException):
    """Thrown when dvc version is older than the state database version.

    Args:
        dvc_version: version of the running dvc, quoted in the message.
        expected: state file version that this dvc uses.
        actual: state file version that is used in the repo.
    """

    def __init__(self, dvc_version, expected, actual):
        # Build the user-facing text first, then hand it to the base class.
        template = (
            "you are using an old version '{dvc_version}' of dvc that is "
            "using state file version '{expected}' which is not compatible "
            "with the state file version '{actual}' that is used in this "
            "repo. Please upgrade right now!"
        )
        message = template.format(
            dvc_version=dvc_version, expected=expected, actual=actual
        )
        super(StateVersionTooNewError, self).__init__(message)
27
28
def _file_metadata_changed(actual_mtime, mtime, actual_size, size):
    """Tell whether the current mtime/size differ from the recorded ones.

    Args:
        actual_mtime: mtime currently observed for the path.
        mtime: mtime stored in the state record.
        actual_size: size currently observed for the path.
        size: size stored in the state record.

    Returns:
        True if either pair is unequal, False when both pairs match.
    """
    # The mtime pair is checked first; sizes are only compared when the
    # mtimes are equal.
    if actual_mtime != mtime:
        return True
    return actual_size != size
31
32
class State(object):  # pylint: disable=too-many-instance-attributes
    """Class for the state database.

    The state is an sqlite database that caches, per inode, the mtime, size
    and md5 that were last recorded for a path, and that keeps a list of the
    links created by dvc.

    Args:
        repo (dvc.repo.Repo): repo instance that this state belongs to.
        config (configobj.ConfigObj): config for the state.

    Raises:
        StateVersionTooNewError: thrown when dvc version is older than the
            state database version.
    """

    # Schema version; compared against sqlite's `PRAGMA user_version`.
    VERSION = 3
    # Name of the database file inside the dvc directory.
    STATE_FILE = "state"
    STATE_TABLE = "state"
    STATE_TABLE_LAYOUT = (
        "inode INTEGER PRIMARY KEY, "
        "mtime TEXT NOT NULL, "
        "size TEXT NOT NULL, "
        "md5 TEXT NOT NULL, "
        "timestamp TEXT NOT NULL"
    )

    STATE_INFO_TABLE = "state_info"
    STATE_INFO_TABLE_LAYOUT = "count INTEGER"
    # rowid of the state_info row that holds the row counter.
    STATE_INFO_ROW = 1

    LINK_STATE_TABLE = "link_state"
    LINK_STATE_TABLE_LAYOUT = (
        "path TEXT PRIMARY KEY, "
        "inode INTEGER NOT NULL, "
        "mtime TEXT NOT NULL"
    )

    # Defaults used when the config does not provide its own values: the
    # row count above which dump() prunes the state table, and the extra
    # percentage of the row limit that is removed during that pruning.
    STATE_ROW_LIMIT = 100000000
    STATE_ROW_CLEANUP_QUOTA = 50

    # Bounds used by _to_sqlite()/_from_sqlite() when folding unsigned
    # values into sqlite's signed integers.
    MAX_INT = 2 ** 63 - 1
    MAX_UINT = 2 ** 64 - 2

    def __init__(self, repo, config):
        self.repo = repo
        self.dvc_dir = repo.dvc_dir
        self.root_dir = repo.root_dir

        state_config = config.get(Config.SECTION_STATE, {})
        self.row_limit = state_config.get(
            Config.SECTION_STATE_ROW_LIMIT, self.STATE_ROW_LIMIT
        )
        self.row_cleanup_quota = state_config.get(
            Config.SECTION_STATE_ROW_CLEANUP_QUOTA,
            self.STATE_ROW_CLEANUP_QUOTA,
        )

        # Connection bookkeeping is set up before the early return below so
        # that every instance carries the same set of attributes.
        self.database = None
        self.cursor = None
        self.inserts = 0

        if not self.dvc_dir:
            self.state_file = None
            self.temp_files = []
            return

        self.state_file = os.path.join(self.dvc_dir, self.STATE_FILE)

        # https://www.sqlite.org/tempfiles.html
        self.temp_files = [
            self.state_file + "-journal",
            self.state_file + "-wal",
        ]

    def __enter__(self):
        """Load the database and return this instance for `with ... as`."""
        self.load()
        return self

    def __exit__(self, typ, value, tbck):
        """Save and close the database when the `with` block is left."""
        self.dump()

    def _collect(self, path):
        """Compute the checksum for `path` without consulting the database.

        Directories are delegated to the local cache's `collect_dir_cache`;
        for anything else the first element of `file_md5(path)` is paired
        with None. Callers unpack the result as `(md5, info)`.
        """
        if os.path.isdir(path):
            return self.repo.cache.local.collect_dir_cache(path)
        return (file_md5(path)[0], None)

    def changed(self, path, md5):
        """Check if file/directory differs from the expected md5.

        Args:
            path (str): path to the file/directory to check.
            md5 (str): expected md5.

        Returns:
            bool: True if the checksum obtained for the path differs from
            the expected one, or if either of the two is missing (for
            example because the path does not exist); False if they match.
        """
        actual = self.update(path)

        msg = "File '{}', md5 '{}', actual '{}'"
        logger.debug(msg.format(path, md5, actual))

        if not md5 or not actual:
            return True

        # Only the part before the first "." of each checksum is compared.
        return actual.split(".")[0] != md5.split(".")[0]

    def _execute(self, cmd, parameters=()):
        """Log and run one SQL statement on the open cursor.

        Args:
            cmd (str): SQL statement, optionally with `?` placeholders.
            parameters (tuple): values bound to the placeholders. Free-form
                text such as file paths should be passed here instead of
                being formatted into `cmd`.

        Returns:
            the result of `cursor.execute`.
        """
        if not parameters:
            logger.debug(cmd)
            return self.cursor.execute(cmd)

        logger.debug("{} (parameters: {})".format(cmd, parameters))
        return self.cursor.execute(cmd, parameters)

    def _fetchall(self):
        """Fetch, log and return all rows of the last executed statement."""
        ret = self.cursor.fetchall()
        logger.debug("fetched: {}".format(ret))
        return ret

    def _to_sqlite(self, num):
        """Convert a non-negative integer into the value stored in sqlite.

        Values up to MAX_INT are stored unchanged; larger ones are stored as
        the negative of their distance from MAX_INT. `_from_sqlite` is the
        inverse.
        """
        assert num >= 0
        assert num < self.MAX_UINT
        # NOTE: sqlite stores uint as signed ints, so maximum uint is 2^63-1
        # see http://jakegoulding.com/blog/2011/02/06/sqlite-64-bit-integers/
        if num > self.MAX_INT:
            ret = -(num - self.MAX_INT)
        else:
            ret = num
        # Guard the round trip so that a stored value can always be decoded.
        assert self._from_sqlite(ret) == num
        return ret

    def _from_sqlite(self, num):
        """Convert a value read from sqlite back into the original integer.

        Negative values decode to `abs(num) + MAX_INT`; non-negative values
        are returned unchanged.
        """
        assert abs(num) <= self.MAX_INT
        if num < 0:
            return abs(num) + self.MAX_INT
        assert num < self.MAX_UINT
        assert num >= 0
        return num

    def _prepare_db(self, empty=False):
        """Check the schema version and make sure all tables exist.

        Args:
            empty (bool): True when the state file did not exist before the
                connection was opened; the stored version is then not
                checked.

        Raises:
            StateVersionTooNewError: if the stored version is newer than
                `VERSION`.
        """
        from dvc import VERSION

        if not empty:
            cmd = "PRAGMA user_version;"
            self._execute(cmd)
            ret = self._fetchall()
            assert len(ret) == 1
            assert len(ret[0]) == 1
            assert isinstance(ret[0][0], int)
            version = ret[0][0]

            if version > self.VERSION:
                raise StateVersionTooNewError(VERSION, self.VERSION, version)
            elif version < self.VERSION:
                # An older layout is not migrated: its tables are dropped
                # and recreated empty below.
                msg = (
                    "State file version '{}' is too old. "
                    "Reformatting to the current version '{}'."
                )
                logger.warning(msg.format(version, self.VERSION))
                cmd = "DROP TABLE IF EXISTS {};"
                self._execute(cmd.format(self.STATE_TABLE))
                self._execute(cmd.format(self.STATE_INFO_TABLE))
                self._execute(cmd.format(self.LINK_STATE_TABLE))

        # Check that the state file is indeed a database
        cmd = "CREATE TABLE IF NOT EXISTS {} ({})"
        self._execute(cmd.format(self.STATE_TABLE, self.STATE_TABLE_LAYOUT))
        self._execute(
            cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_TABLE_LAYOUT)
        )
        self._execute(
            cmd.format(self.LINK_STATE_TABLE, self.LINK_STATE_TABLE_LAYOUT)
        )

        # Seed the counter row only when the info table has no rows yet.
        cmd = (
            "INSERT OR IGNORE INTO {} (count) SELECT 0 "
            "WHERE NOT EXISTS (SELECT * FROM {})"
        )
        self._execute(cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_TABLE))

        cmd = "PRAGMA user_version = {};"
        self._execute(cmd.format(self.VERSION))

    def load(self):
        """Loads state database.

        Connects to the state file and prepares its tables. If that fails
        with `sqlite3.DatabaseError`, the state file is deleted and the load
        is retried once; a second failure is re-raised.
        """
        retries = 1
        while True:
            assert self.database is None
            assert self.cursor is None
            assert self.inserts == 0
            empty = not os.path.exists(self.state_file)
            self.database = sqlite3.connect(self.state_file)
            self.cursor = self.database.cursor()

            # Try loading once to check that the file is indeed a database
            # and reformat it if it is not.
            try:
                self._prepare_db(empty=empty)
                return
            except sqlite3.DatabaseError:
                self.cursor.close()
                self.database.close()
                self.database = None
                self.cursor = None
                self.inserts = 0
                if retries > 0:
                    os.unlink(self.state_file)
                    retries -= 1
                else:
                    raise

    def _vacuum(self):
        """Run VACUUM with the connection's isolation level switched off."""
        # NOTE: see https://bugs.python.org/issue28518
        self.database.isolation_level = None
        self._execute("VACUUM")
        self.database.isolation_level = ""

    def dump(self):
        """Saves state database.

        Prunes the state table when the row counter exceeds the row limit,
        stores the updated counter, records the cache directory state,
        commits and closes the connection.
        """
        assert self.database is not None

        cmd = "SELECT count from {} WHERE rowid={}"
        self._execute(cmd.format(self.STATE_INFO_TABLE, self.STATE_INFO_ROW))
        ret = self._fetchall()
        assert len(ret) == 1
        assert len(ret[0]) == 1
        # Stored counter plus the rows inserted during this session.
        count = self._from_sqlite(ret[0][0]) + self.inserts

        if count > self.row_limit:
            msg = "cleaning up state, this might take a while."
            logger.warning(msg)

            # Remove the overflow plus the configured percentage of the
            # limit, taking the rows with the oldest timestamps first.
            delete = count - self.row_limit
            delete += int(self.row_limit * (self.row_cleanup_quota / 100.0))
            cmd = (
                "DELETE FROM {} WHERE timestamp IN ("
                "SELECT timestamp FROM {} ORDER BY timestamp ASC LIMIT {});"
            )
            self._execute(
                cmd.format(self.STATE_TABLE, self.STATE_TABLE, delete)
            )

            self._vacuum()

            # Re-read the real row count after the cleanup.
            cmd = "SELECT COUNT(*) FROM {}"

            self._execute(cmd.format(self.STATE_TABLE))
            ret = self._fetchall()
            assert len(ret) == 1
            assert len(ret[0]) == 1
            count = ret[0][0]

        cmd = "UPDATE {} SET count = {} WHERE rowid = {}"
        self._execute(
            cmd.format(
                self.STATE_INFO_TABLE,
                self._to_sqlite(count),
                self.STATE_INFO_ROW,
            )
        )

        self._update_cache_directory_state()

        self.database.commit()
        self.cursor.close()
        self.database.close()
        self.database = None
        self.cursor = None
        self.inserts = 0

    def _do_update(self, path, known_checksum=None):
        """
        Make sure the stored info for the given path is up to date.

        Args:
            path (str): path to look up.
            known_checksum (str): checksum to store instead of computing
                one, if given.

        Returns:
            tuple: `(md5, info)`; `(None, None)` if the path does not exist.
        """
        if not os.path.exists(path):
            return None, None

        actual_mtime, actual_size = get_mtime_and_size(path)
        actual_inode = get_inode(path)

        existing_record = self.get_state_record_for_inode(actual_inode)

        if existing_record:
            md5, info = self._update_existing_state_record(
                path,
                actual_inode,
                actual_mtime,
                actual_size,
                existing_record,
                known_checksum,
            )
        else:
            md5, info = self._insert_new_state_record(
                path, actual_inode, actual_mtime, actual_size, known_checksum
            )

        return md5, info

    def _update_existing_state_record(
        self,
        path,
        actual_inode,
        actual_mtime,
        actual_size,
        existing_record,
        known_checksum=None,
    ):
        """Refresh the record that already exists for `actual_inode`.

        If mtime or size differ from the stored values, the record is
        rewritten with a fresh checksum; otherwise only its timestamp is
        bumped and the stored md5 is returned with `info` set to None.
        """
        mtime, size, md5, _ = existing_record
        if _file_metadata_changed(actual_mtime, mtime, actual_size, size):
            md5, info = self._update_state_for_path_changed(
                path, actual_inode, actual_mtime, actual_size, known_checksum
            )
        else:
            info = None
            self._update_state_record_timestamp_for_inode(actual_inode)
        return md5, info

    def _update_state_record_timestamp_for_inode(self, actual_inode):
        """Set the timestamp of the inode's record to the current time."""
        cmd = 'UPDATE {} SET timestamp = "{}" WHERE inode = {}'
        self._execute(
            cmd.format(
                self.STATE_TABLE,
                current_timestamp(),
                self._to_sqlite(actual_inode),
            )
        )

    def _update_state_for_path_changed(
        self,
        path,
        actual_inode,
        actual_mtime,
        actual_size,
        known_checksum=None,
    ):
        """Rewrite the inode's record with new mtime, size and checksum.

        The checksum is `known_checksum` when given, otherwise it is
        computed with `_collect`. Returns `(md5, info)`.
        """
        if known_checksum:
            md5, info = known_checksum, None
        else:
            md5, info = self._collect(path)
        cmd = (
            "UPDATE {} SET "
            'mtime = "{}", size = "{}", '
            'md5 = "{}", timestamp = "{}" '
            "WHERE inode = {}"
        )
        self._execute(
            cmd.format(
                self.STATE_TABLE,
                actual_mtime,
                actual_size,
                md5,
                current_timestamp(),
                self._to_sqlite(actual_inode),
            )
        )
        return md5, info

    def _insert_new_state_record(
        self, path, actual_inode, actual_mtime, actual_size, known_checksum
    ):
        """Insert a record for an inode that has none yet.

        The checksum is `known_checksum` when given, otherwise it is
        computed with `_collect`. Counts the insert for `dump` and returns
        `(md5, info)`.
        """
        if known_checksum:
            md5, info = known_checksum, None
        else:
            md5, info = self._collect(path)
        cmd = (
            "INSERT INTO {}(inode, mtime, size, md5, timestamp) "
            'VALUES ({}, "{}", "{}", "{}", "{}")'
        )
        self._execute(
            cmd.format(
                self.STATE_TABLE,
                self._to_sqlite(actual_inode),
                actual_mtime,
                actual_size,
                md5,
                current_timestamp(),
            )
        )
        self.inserts += 1
        return md5, info

    def get_state_record_for_inode(self, inode):
        """Look up the state record stored for an inode.

        Args:
            inode (int): inode as reported by the file system (not yet
                converted for sqlite).

        Returns:
            tuple: `(mtime, size, md5, timestamp)` row, or None if there is
            no record for the inode.
        """
        # Records are written with the inode passed through _to_sqlite(), so
        # the lookup has to apply the same conversion; otherwise inodes
        # above MAX_INT would never be found.
        cmd = "SELECT mtime, size, md5, timestamp from {} WHERE inode={}"
        cmd = cmd.format(self.STATE_TABLE, self._to_sqlite(inode))
        self._execute(cmd)
        results = self._fetchall()
        if results:
            # uniqueness constraint on inode
            assert len(results) == 1
            return results[0]
        return None

    def update(self, path, known_checksum=None):
        """Gets the checksum for the specified path. Checksum will be
        retrieved from the state database if available, otherwise it will be
        computed and cached in the state database for the further use.

        Args:
            path (str): path to get the checksum for.
            known_checksum (str): checksum to cache for the path instead of
                computing one, if given.

        Returns:
            str: checksum for the specified path, or None if the path does
            not exist.
        """
        return self._do_update(path, known_checksum)[0]

    def update_info(self, path):
        """Gets the checksum and the directory info (if applicable) for the
        specified path.

        Args:
            path (str): path to get the checksum and the directory info for.

        Returns:
            tuple: checksum for the specified path along with a directory info
            (list of {relative_path: checksum} entries for each file in the
            directory) if applicable, otherwise None.
        """
        md5, info = self._do_update(path)
        if not info:
            # Nothing was collected during the update, so ask the local
            # cache for the info that belongs to this checksum.
            info = self.repo.cache.local.load_dir_cache(md5)
        return (md5, info)

    def update_link(self, path):
        """Adds the specified path to the list of links created by dvc. This
        list is later used on `dvc checkout` to cleanup old links.

        Args:
            path (str): path to add to the list of links.
        """
        if not os.path.exists(path):
            return

        mtime, _ = get_mtime_and_size(path)
        inode = get_inode(path)
        relpath = os.path.relpath(path, self.root_dir)

        # The path is bound as a parameter instead of being formatted into
        # the statement, so quotes in file names cannot break the SQL.
        cmd = "REPLACE INTO {}(path, inode, mtime) VALUES (?, ?, ?)".format(
            self.LINK_STATE_TABLE
        )
        self._execute(cmd, (relpath, self._to_sqlite(inode), mtime))

    def remove_unused_links(self, used):
        """Removes all saved links except the ones that are used.

        A saved link is removed from the workspace only if it still exists
        and its inode and mtime match the values that were recorded for it.

        Args:
            used (list): list of used links that should not be removed.
        """
        unused = []

        self._execute("SELECT * FROM {}".format(self.LINK_STATE_TABLE))
        for row in self.cursor:
            relpath, inode, mtime = row
            inode = self._from_sqlite(inode)
            path = os.path.join(self.root_dir, relpath)

            if path in used:
                continue

            if not os.path.exists(path):
                continue

            actual_inode = get_inode(path)
            actual_mtime, _ = get_mtime_and_size(path)

            if inode == actual_inode and mtime == actual_mtime:
                logger.debug("Removing '{}' as unused link.".format(path))
                remove(path)
                unused.append(relpath)

        # Bound parameter: a double-quoted value that happens to match a
        # column name (e.g. a file called "path") would be treated by sqlite
        # as that column and match every row.
        cmd = "DELETE FROM {} WHERE path = ?".format(self.LINK_STATE_TABLE)
        for relpath in unused:
            self._execute(cmd, (relpath,))

    def _update_cache_directory_state(self):
        """Store the cache directory's inode, size and mtime in the state
        table, with an empty md5 and the current timestamp."""
        cache_path = self.repo.cache.local.cache_dir
        mtime, size = get_mtime_and_size(cache_path)
        inode = get_inode(cache_path)

        cmd = (
            "INSERT OR REPLACE INTO {}(inode, size, mtime, timestamp, md5) "
            'VALUES ({}, "{}", "{}", "{}", "")'.format(
                self.STATE_TABLE,
                self._to_sqlite(inode),
                size,
                mtime,
                current_timestamp(),
            )
        )
        self._execute(cmd)
520