1from __future__ import unicode_literals
2
3import os
4
5import dvc.logger as logger
6from dvc.exceptions import ReproductionError
7
8
9def _reproduce_stage(stages, node, force, dry, interactive, no_commit):
10    stage = stages[node]
11
12    if stage.locked:
13        logger.warning(
14            "DVC file '{path}' is locked. Its dependencies are"
15            " not going to be reproduced.".format(path=stage.relpath)
16        )
17
18    stage = stage.reproduce(
19        force=force, dry=dry, interactive=interactive, no_commit=no_commit
20    )
21    if not stage:
22        return []
23
24    if not dry:
25        stage.dump()
26
27    return [stage]
28
29
30def reproduce(
31    self,
32    target=None,
33    recursive=True,
34    force=False,
35    dry=False,
36    interactive=False,
37    pipeline=False,
38    all_pipelines=False,
39    ignore_build_cache=False,
40    no_commit=False,
41):
42    from dvc.stage import Stage
43
44    if not target and not all_pipelines:
45        raise ValueError()
46
47    if not interactive:
48        config = self.config
49        core = config.config[config.SECTION_CORE]
50        interactive = core.get(config.SECTION_CORE_INTERACTIVE, False)
51
52    targets = []
53    if pipeline or all_pipelines:
54        if pipeline:
55            stage = Stage.load(self, target)
56            node = os.path.relpath(stage.path, self.root_dir)
57            pipelines = [self._get_pipeline(node)]
58        else:
59            pipelines = self.pipelines()
60
61        for G in pipelines:
62            for node in G.nodes():
63                if G.in_degree(node) == 0:
64                    targets.append(os.path.join(self.root_dir, node))
65    else:
66        targets.append(target)
67
68    self.files_to_git_add = []
69
70    ret = []
71    with self.state:
72        for target in targets:
73            stages = _reproduce(
74                self,
75                target,
76                recursive=recursive,
77                force=force,
78                dry=dry,
79                interactive=interactive,
80                ignore_build_cache=ignore_build_cache,
81                no_commit=no_commit,
82            )
83            ret.extend(stages)
84
85    self.remind_to_git_add()
86
87    return ret
88
89
90def _reproduce(
91    self,
92    target,
93    recursive=True,
94    force=False,
95    dry=False,
96    interactive=False,
97    ignore_build_cache=False,
98    no_commit=False,
99):
100    import networkx as nx
101    from dvc.stage import Stage
102
103    stage = Stage.load(self, target)
104    G = self.graph()[1]
105    stages = nx.get_node_attributes(G, "stage")
106    node = os.path.relpath(stage.path, self.root_dir)
107
108    if recursive:
109        ret = _reproduce_stages(
110            G,
111            stages,
112            node,
113            force,
114            dry,
115            interactive,
116            ignore_build_cache,
117            no_commit,
118        )
119    else:
120        ret = _reproduce_stage(
121            stages, node, force, dry, interactive, no_commit
122        )
123
124    return ret
125
126
127def _reproduce_stages(
128    G, stages, node, force, dry, interactive, ignore_build_cache, no_commit
129):
130    import networkx as nx
131
132    result = []
133    for n in nx.dfs_postorder_nodes(G, node):
134        try:
135            ret = _reproduce_stage(
136                stages, n, force, dry, interactive, no_commit
137            )
138
139            if len(ret) != 0 and ignore_build_cache:
140                # NOTE: we are walking our pipeline from the top to the
141                # bottom. If one stage is changed, it will be reproduced,
142                # which tells us that we should force reproducing all of
143                # the other stages down below, even if their direct
144                # dependencies didn't change.
145                force = True
146
147            result += ret
148        except Exception as ex:
149            raise ReproductionError(stages[n].relpath, ex)
150    return result
151