1"""Support code for working without a supported CI provider."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import os
6import platform
7import random
8import re
9
10from .. import types as t
11
12from ..config import (
13    CommonConfig,
14    TestConfig,
15)
16
17from ..io import (
18    read_text_file,
19)
20
21from ..git import (
22    Git,
23)
24
25from ..util import (
26    ApplicationError,
27    display,
28    is_binary_file,
29    SubprocessError,
30)
31
32from . import (
33    AuthContext,
34    CIProvider,
35)
36
# Provider code for the local (non-CI) provider; returned by the Local.code property.
CODE = ''  # not really a CI provider, so use an empty string for the code
38
39
class Local(CIProvider):
    """CI provider implementation used when running outside of a CI system."""
    # NOTE(review): presumably a large value so real CI providers are considered first -- confirm against CIProvider.
    priority = 1000

    @staticmethod
    def is_supported():  # type: () -> bool
        """Return True if this provider is supported in the current running environment (always the case here)."""
        return True

    @property
    def code(self):  # type: () -> str
        """Return a unique code representing this provider (the module level CODE value)."""
        return CODE

    @property
    def name(self):  # type: () -> str
        """Return descriptive name for this provider."""
        return 'Local'

    def generate_resource_prefix(self):  # type: () -> str
        """Return a resource prefix built from a random 8 digit number and the short host name."""
        number = random.randint(10000000, 99999999)
        # keep only the part of the node name before the first dot
        short_hostname = platform.node().partition('.')[0]

        return 'ansible-test-%d-%s' % (number, short_hostname)

    def get_base_branch(self):  # type: () -> str
        """Return the base branch or an empty string (always empty for this provider)."""
        return ''

    def detect_changes(self, args):  # type: (TestConfig) -> t.Optional[t.List[str]]
        """Detect local changes and return a sorted list of the selected paths.

        A warning is shown for each category of change which was found but is excluded by the options in args.
        When args.metadata.changes is empty, it is populated from the diff and from the untracked files.
        """
        changes = LocalChanges(args)

        summary = (changes.current_branch, changes.fork_branch, changes.fork_point)
        display.info('Detected branch %s forked from %s at commit %s' % summary)

        # attribute name shared by LocalChanges and the config, paired with the warning shown when it is ignored
        ignored_messages = (
            ('untracked', 'Ignored %s untracked file(s). Use --untracked to include them.'),
            ('committed', 'Ignored %s committed change(s). Omit --ignore-committed to include them.'),
            ('staged', 'Ignored %s staged change(s). Omit --ignore-staged to include them.'),
            ('unstaged', 'Ignored %s unstaged change(s). Omit --ignore-unstaged to include them.'),
        )

        for category, template in ignored_messages:
            found = getattr(changes, category)

            if found and not getattr(args, category):
                display.warning(template % len(found))

        # collect the paths from every category enabled in the config
        selected = set()

        for category in ('tracked', 'untracked', 'committed', 'staged', 'unstaged'):
            if getattr(args, category):
                selected.update(getattr(changes, category))

        if not args.metadata.changes:
            args.metadata.populate_changes(changes.diff)

            # untracked files are recorded separately from the diff of tracked files
            for path in changes.untracked:
                if is_binary_file(path):
                    # NOTE(review): presumably (0, 0) marks a file without line information -- confirm
                    args.metadata.changes[path] = ((0, 0),)
                else:
                    line_count = len(read_text_file(path).splitlines())
                    # NOTE(review): presumably a (first, last) line range covering the whole file -- confirm
                    args.metadata.changes[path] = ((1, line_count),)

        return sorted(selected)

    def supports_core_ci_auth(self, context):  # type: (AuthContext) -> bool
        """Return True if Ansible Core CI is supported, which requires the key file to exist."""
        return os.path.exists(self._get_aci_key_path())

    def prepare_core_ci_auth(self, context):  # type: (AuthContext) -> t.Dict[str, t.Any]
        """Return authentication details for Ansible Core CI, using the key read from the key file."""
        auth_key = read_text_file(self._get_aci_key_path()).strip()

        return {
            'remote': {
                'key': auth_key,
                'nonce': None,
            },
        }

    def get_git_details(self, args):  # type: (CommonConfig) -> t.Optional[t.Dict[str, t.Any]]
        """Return details about git in the current environment."""
        # not yet implemented for local
        return None

    def _get_aci_key_path(self):  # type: () -> str
        """Return the path to the Ansible Core CI key file in the home directory of the current user."""
        return os.path.expanduser('~/.ansible-core-ci.key')
150
151
class InvalidBranch(ApplicationError):
    """Error raised for an invalid branch specification."""
    def __init__(self, branch, reason):  # type: (str, str) -> None
        """Build the error message from the branch name and reason, and keep the branch on the instance."""
        super(InvalidBranch, self).__init__('Invalid branch: %s\n%s' % (branch, reason))

        self.branch = branch
160
161
class LocalChanges:
    """Change information for local work, gathered from git when the instance is created."""
    def __init__(self, args):  # type: (TestConfig) -> None
        """Collect branch, fork point and file change details for the current git checkout.

        Raises InvalidBranch when the current branch is an official branch.
        Raises ApplicationError when no fork point could be determined from the official local branches.
        """
        self.args = args
        self.git = Git()

        branch = self.git.get_branch()
        self.current_branch = branch

        if self.is_official_branch(branch):
            raise InvalidBranch(branch=branch, reason='Current branch is not a feature branch.')

        self.fork_branch = None
        self.fork_point = None

        self.local_branches = sorted(self.git.get_branches())
        self.official_branches = sorted(filter(self.is_official_branch, self.local_branches))

        # use the first official branch (in sorted order) for which git can report a fork point
        for candidate in self.official_branches:
            self.fork_branch = candidate

            try:
                self.fork_point = self.git.get_branch_fork_point(candidate)
            except SubprocessError:
                continue

            break

        if self.fork_point is None:
            raise ApplicationError('Unable to auto-detect fork branch and fork point.')

        git = self.git

        # tracked files (including unchanged)
        self.tracked = sorted(git.get_file_names(['--cached']))
        # untracked files (except ignored)
        self.untracked = sorted(git.get_file_names(['--others', '--exclude-standard']))
        # tracked changes (including deletions) committed since the branch was forked
        self.committed = sorted(git.get_diff_names([self.fork_point, 'HEAD']))
        # tracked changes (including deletions) which are staged
        self.staged = sorted(git.get_diff_names(['--cached']))
        # tracked changes (including deletions) which are not staged
        self.unstaged = sorted(git.get_diff_names([]))
        # diff of all tracked files from fork point to working copy
        self.diff = git.get_diff([self.fork_point])

    def is_official_branch(self, name):  # type: (str) -> bool
        """Return True if the given branch name is an official branch for development or releases.

        When a base branch is configured in args, only that branch is considered official.
        Otherwise 'devel' and branches named like 'stable-X.Y' are official.
        """
        base_branch = self.args.base_branch

        if base_branch:
            return name == base_branch

        return name == 'devel' or re.match(r'^stable-[0-9]+\.[0-9]+$', name) is not None
215