1import copy
2import os
3from shutil import which
4
5from .vcs_base import VcsClientBase
6from ..util import rmtree
7
8
class BzrClient(VcsClientBase):
    """Client running Bazaar (``bzr``) commands for one working directory.

    Every public command method returns a result dictionary.  The results
    built directly in this class use the keys ``cmd``, ``cwd``, ``output``
    and ``returncode``; results coming from ``self._run_command`` (defined
    in the base class, not visible here) are indexed with the same
    ``output`` / ``returncode`` keys.
    """

    type = 'bzr'
    # Path of the 'bzr' executable, shared by all instances.  ``None``
    # means the executable is unknown; see ``_check_executable``.
    _executable = None

    @staticmethod
    def is_repository(path):
        """Return True if ``path`` contains a ``.bzr`` directory."""
        return os.path.isdir(os.path.join(path, '.bzr'))

    def __init__(self, path):
        """Create a client operating on the working directory ``path``."""
        super().__init__(path)

    def branch(self, command):
        """Report the parent branch of the working copy.

        Listing all branches (``command.all``) is not supported; that case
        is delegated to ``self._not_applicable``.
        """
        if command.all:
            return self._not_applicable(
                command,
                message='at least with the option to list all branches')

        self._check_executable()
        return self._get_parent_branch()

    def custom(self, command):
        """Run ``bzr`` with the arbitrary arguments in ``command.args``."""
        self._check_executable()
        cmd = [BzrClient._executable] + command.args
        return self._run_command(cmd)

    def diff(self, _command):
        """Run ``bzr diff``."""
        self._check_executable()
        cmd = [BzrClient._executable, 'diff']
        return self._run_command(cmd)

    def import_(self, command):
        """Branch ``command.url`` into ``self.path`` or pull an existing copy.

        If ``self.path`` already holds a Bazaar repository whose parent
        branch equals ``command.url`` it is updated with ``bzr pull``.  If
        it holds a different repository the import fails unless
        ``command.force`` is set, in which case the path is removed and
        branched anew.  ``command.version`` (if set) is passed as ``-r`` to
        ``bzr branch`` only; it is not applied when pulling.

        Returns a result dictionary; ``returncode`` is non-zero on failure.
        """
        if not command.url:
            return {
                'cmd': '',
                'cwd': self.path,
                'output': "Repository data lacks the 'url' value",
                'returncode': 1
            }

        self._check_executable()
        if BzrClient.is_repository(self.path):
            # verify that existing repository is the same
            result_parent_branch = self._get_parent_branch()
            if result_parent_branch['returncode']:
                return result_parent_branch
            parent_branch = result_parent_branch['output']
            if parent_branch != command.url:
                if not command.force:
                    return {
                        'cmd': '',
                        'cwd': self.path,
                        'output':
                            'Path already exists and contains a different '
                            'repository',
                        'returncode': 1
                    }
                # forced import: discard whatever is at the path; fall back
                # to removing it as a single file if the tree removal fails
                try:
                    rmtree(self.path)
                except OSError:
                    os.remove(self.path)

        # a truthy value from _create_path is returned as the failure
        # result (helper defined in the base class)
        not_exist = self._create_path()
        if not_exist:
            return not_exist

        if BzrClient.is_repository(self.path):
            # pull updates for existing repo
            cmd_pull = [BzrClient._executable, 'pull']
            return self._run_command(cmd_pull, retry=command.retry)

        cmd_branch = [BzrClient._executable, 'branch']
        if command.version:
            cmd_branch += ['-r', command.version]
        cmd_branch += [command.url, '.']
        result_branch = self._run_command(cmd_branch, retry=command.retry)
        if result_branch['returncode']:
            result_branch['output'] = \
                "Could not branch repository '%s': %s" % \
                (command.url, result_branch['output'])
        return result_branch

    def log(self, command):
        """Run ``bzr log``, optionally restricted to revisions after a tag.

        When ``command.limit_tag`` is set, only revisions after that tag
        are shown.  Otherwise, when ``command.limit_untagged`` is set, the
        tag is taken from the output of ``bzr tags --sort=time``.  A
        non-zero ``command.limit`` is forwarded as ``--limit``.

        Returns a result dictionary; ``returncode`` is non-zero on failure.
        """
        self._check_executable()
        if command.limit_tag or command.limit_untagged:
            tag = None
            if command.limit_tag:
                tag = command.limit_tag
            else:
                # determine nearest tag
                cmd_tag = [BzrClient._executable, 'tags', '--sort=time']
                result_tag = self._run_command(cmd_tag)
                if result_tag['returncode']:
                    return result_tag
                # no 'break': the last matching line of the listing wins
                for line in result_tag['output'].splitlines():
                    parts = line.split(' ', 2)
                    # lines without a second field cannot be classified,
                    # skip them instead of failing with an IndexError
                    if len(parts) > 1 and parts[1] != '?':
                        tag = parts[0]
                if not tag:
                    result_tag['output'] = 'Could not determine latest tag'
                    result_tag['returncode'] = 1
                    return result_tag
            # determine revision number of tag
            cmd_tag_rev = [
                BzrClient._executable, 'revno', '--rev', 'tag:' + tag]
            result_tag_rev = self._run_command(cmd_tag_rev)
            if result_tag_rev['returncode']:
                if command.limit_tag:
                    result_tag_rev['output'] = \
                        "Repository lacks the tag '%s'" % tag
                return result_tag_rev
            try:
                tag_rev = int(result_tag_rev['output'])
                tag_next_rev = tag_rev + 1
            except ValueError:
                # non-integer revision output: use it verbatim, so the
                # log range starts at the tagged revision itself
                tag_rev = result_tag_rev['output']
                tag_next_rev = tag_rev
            # determine revision number of HEAD
            cmd_head_rev = [BzrClient._executable, 'revno']
            result_head_rev = self._run_command(cmd_head_rev)
            if result_head_rev['returncode']:
                return result_head_rev
            try:
                head_rev = int(result_head_rev['output'])
            except ValueError:
                head_rev = result_head_rev['output']
            # output log since nearest tag
            cmd_log = [
                BzrClient._executable, 'log',
                '--rev', 'revno:%s..' % str(tag_next_rev)]
            if tag_rev == head_rev:
                # the tag is on HEAD: nothing newer to show, so report an
                # empty log without invoking bzr
                return {
                    'cmd': ' '.join(cmd_log),
                    'cwd': self.path,
                    'output': '',
                    'returncode': 0
                }
            if command.limit != 0:
                cmd_log += ['--limit', '%d' % command.limit]
            return self._run_command(cmd_log)
        cmd = [BzrClient._executable, 'log']
        if command.limit != 0:
            cmd += ['--limit', '%d' % command.limit]
        return self._run_command(cmd)

    def pull(self, _command):
        """Run ``bzr pull``."""
        self._check_executable()
        cmd = [BzrClient._executable, 'pull']
        return self._run_command(cmd)

    def push(self, _command):
        """Run ``bzr push``."""
        self._check_executable()
        cmd = [BzrClient._executable, 'push']
        return self._run_command(cmd)

    def remotes(self, _command):
        """Report the parent branch as the only known remote."""
        self._check_executable()
        return self._get_parent_branch()

    def status(self, _command):
        """Run ``bzr status``."""
        self._check_executable()
        cmd = [BzrClient._executable, 'status']
        return self._run_command(cmd)

    def _get_parent_branch(self):
        """Extract the parent branch from the output of ``bzr info``.

        Returns the result dictionary of the ``bzr info`` call with
        ``output`` replaced by the parent branch, or with an error message
        and ``returncode`` 1 if no parent branch line was found.
        """
        cmd = [BzrClient._executable, 'info']
        # parsing the text output requires enforcing language
        # os.environ.copy() gives a plain dict, so overriding LANG here
        # cannot leak into the environment of the running process
        env = os.environ.copy()
        env['LANG'] = 'en_US.UTF-8'
        result = self._run_command(cmd, env)
        if result['returncode']:
            return result
        branch = None
        prefix = '  parent branch: '
        for line in result['output'].splitlines():
            if line.startswith(prefix):
                branch = line[len(prefix):]
                break
        if not branch:
            result['output'] = 'Could not determine parent branch'
            result['returncode'] = 1
            return result
        result['output'] = branch
        return result

    def _check_executable(self):
        """Ensure the ``bzr`` executable is known.

        Raises:
            AssertionError: if ``BzrClient._executable`` is ``None``.
        """
        # raised explicitly (instead of an 'assert' statement) so the check
        # is still performed when Python runs with optimizations enabled
        if BzrClient._executable is None:
            raise AssertionError("Could not find 'bzr' executable")
200
201
# Look up the 'bzr' executable once at import time unless a value has
# already been set; shutil.which() returns None when nothing is found.
BzrClient._executable = BzrClient._executable or which('bzr')
204