1from __future__ import unicode_literals 2 3import os 4 5import dvc.logger as logger 6from dvc.exceptions import ReproductionError 7 8 9def _reproduce_stage(stages, node, force, dry, interactive, no_commit): 10 stage = stages[node] 11 12 if stage.locked: 13 logger.warning( 14 "DVC file '{path}' is locked. Its dependencies are" 15 " not going to be reproduced.".format(path=stage.relpath) 16 ) 17 18 stage = stage.reproduce( 19 force=force, dry=dry, interactive=interactive, no_commit=no_commit 20 ) 21 if not stage: 22 return [] 23 24 if not dry: 25 stage.dump() 26 27 return [stage] 28 29 30def reproduce( 31 self, 32 target=None, 33 recursive=True, 34 force=False, 35 dry=False, 36 interactive=False, 37 pipeline=False, 38 all_pipelines=False, 39 ignore_build_cache=False, 40 no_commit=False, 41): 42 from dvc.stage import Stage 43 44 if not target and not all_pipelines: 45 raise ValueError() 46 47 if not interactive: 48 config = self.config 49 core = config.config[config.SECTION_CORE] 50 interactive = core.get(config.SECTION_CORE_INTERACTIVE, False) 51 52 targets = [] 53 if pipeline or all_pipelines: 54 if pipeline: 55 stage = Stage.load(self, target) 56 node = os.path.relpath(stage.path, self.root_dir) 57 pipelines = [self._get_pipeline(node)] 58 else: 59 pipelines = self.pipelines() 60 61 for G in pipelines: 62 for node in G.nodes(): 63 if G.in_degree(node) == 0: 64 targets.append(os.path.join(self.root_dir, node)) 65 else: 66 targets.append(target) 67 68 self.files_to_git_add = [] 69 70 ret = [] 71 with self.state: 72 for target in targets: 73 stages = _reproduce( 74 self, 75 target, 76 recursive=recursive, 77 force=force, 78 dry=dry, 79 interactive=interactive, 80 ignore_build_cache=ignore_build_cache, 81 no_commit=no_commit, 82 ) 83 ret.extend(stages) 84 85 self.remind_to_git_add() 86 87 return ret 88 89 90def _reproduce( 91 self, 92 target, 93 recursive=True, 94 force=False, 95 dry=False, 96 interactive=False, 97 ignore_build_cache=False, 98 no_commit=False, 99): 100 import networkx as nx 101 from dvc.stage import Stage 102 103 stage = Stage.load(self, target) 104 G = self.graph()[1] 105 stages = nx.get_node_attributes(G, "stage") 106 node = os.path.relpath(stage.path, self.root_dir) 107 108 if recursive: 109 ret = _reproduce_stages( 110 G, 111 stages, 112 node, 113 force, 114 dry, 115 interactive, 116 ignore_build_cache, 117 no_commit, 118 ) 119 else: 120 ret = _reproduce_stage( 121 stages, node, force, dry, interactive, no_commit 122 ) 123 124 return ret 125 126 127def _reproduce_stages( 128 G, stages, node, force, dry, interactive, ignore_build_cache, no_commit 129): 130 import networkx as nx 131 132 result = [] 133 for n in nx.dfs_postorder_nodes(G, node): 134 try: 135 ret = _reproduce_stage( 136 stages, n, force, dry, interactive, no_commit 137 ) 138 139 if len(ret) != 0 and ignore_build_cache: 140 # NOTE: we are walking our pipeline from the top to the 141 # bottom. If one stage is changed, it will be reproduced, 142 # which tells us that we should force reproducing all of 143 # the other stages down below, even if their direct 144 # dependencies didn't change. 145 force = True 146 147 result += ret 148 except Exception as ex: 149 raise ReproductionError(stages[n].relpath, ex) 150 return result 151