1# Copyright (C) 2019 Jelmer Vernooij <jelmer@jelmer.uk>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; version 3 of the License or
6# (at your option) a later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17"""Mercurial foreign branch support.
18
19Currently only tells the user that Mercurial is not supported.
20"""
21
22from ... import (
23    controldir,
24    errors,
25    )
26
27from ... import version_info  # noqa: F401
28
29
class MercurialUnsupportedError(errors.UnsupportedFormatError):
    """Error reporting that Mercurial branches cannot be used.

    The message points the user at the fastimport format as the way to
    interoperate with Mercurial.
    """

    # Message template; presumably rendered by the base error class
    # (TODO confirm against errors.UnsupportedFormatError).
    _fmt = (
        'Mercurial branches are not yet supported. To interoperate with '
        'Mercurial, use the fastimport format.'
    )
34
35
class LocalHgDirFormat(controldir.ControlDirFormat):
    """Placeholder format for local Mercurial control directories.

    The format declares itself unsupported: it cannot be initialized,
    converted or opened, and checking its support status always raises
    MercurialUnsupportedError.
    """

    # -- Descriptive queries -------------------------------------------

    def get_format_description(self):
        """Return the human-readable name of this format."""
        return "Local Mercurial control directory"

    def is_supported(self):
        """Return False: this format is never supported."""
        return False

    def supports_transport(self, transport):
        """Return False for every transport."""
        return False

    # -- Operations that always fail -----------------------------------

    def check_support_status(
            self, allow_unsupported, recommend_upgrade=True, basedir=None):
        """Unconditionally raise MercurialUnsupportedError.

        None of the arguments are consulted.
        """
        raise MercurialUnsupportedError()

    def initialize_on_transport(self, transport):
        """Refuse to create a Mercurial control directory.

        :raises errors.UninitializableFormat: always.
        """
        raise errors.UninitializableFormat(self)

    def get_converter(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError(self.get_converter)

    def open(self, transport):
        """Probe ``transport`` and then refuse to open it.

        :raises errors.NotBranchError: from the probe, when no Mercurial
            repository is found at ``transport``.
        :raises NotImplementedError: when a repository is found, since
            opening is not implemented.
        """
        # The probe is run only for its NotBranchError side effect; its
        # return value is deliberately discarded.
        prober = LocalHgProber()
        prober.probe_transport(transport)
        raise NotImplementedError(self.open)
62
63
class LocalHgProber(controldir.Prober):
    """Prober that recognises Mercurial repositories by their marker files."""

    @classmethod
    def known_formats(cls):
        """Return the formats this prober can report."""
        return [LocalHgDirFormat()]

    @classmethod
    def priority(cls, transport):
        """Return the fixed priority of this prober (100)."""
        return 100

    @staticmethod
    def _has_hg_dumb_repository(transport):
        """Tell whether ``transport`` holds a Mercurial marker file.

        :param transport: Transport to inspect
        :return: Result of ``transport.has_any`` for ``.hg/requires`` and
            ``.hg/00changelog.i``; False when the lookup fails with
            NoSuchFile, PermissionDenied or InvalidHttpResponse.
        """
        marker_files = [".hg/requires", ".hg/00changelog.i"]
        try:
            found = transport.has_any(marker_files)
        except (errors.NoSuchFile, errors.PermissionDenied,
                errors.InvalidHttpResponse):
            # A failed lookup is treated the same as "not a repository".
            return False
        return found

    @classmethod
    def probe_transport(cls, transport):
        """Return a LocalHgDirFormat if a Mercurial repository is present.

        :param transport: Transport to probe
        :raises errors.NotBranchError: if neither marker file is found.
        """
        if not cls._has_hg_dumb_repository(transport):
            raise errors.NotBranchError(path=transport.base)
        return LocalHgDirFormat()
88
89
class SmartHgDirFormat(controldir.ControlDirFormat):
    """Placeholder format for Mercurial smart-server control directories.

    The format declares itself unsupported: it cannot be initialized,
    converted or opened, and checking its support status always raises
    MercurialUnsupportedError.
    """

    # -- Descriptive queries -------------------------------------------

    def get_format_description(self):
        """Return the human-readable name of this format."""
        return "Smart Mercurial control directory"

    def is_supported(self):
        """Return False: this format is never supported."""
        return False

    def supports_transport(self, transport):
        """Return False for every transport."""
        return False

    # -- Operations that always fail -----------------------------------

    def check_support_status(
            self, allow_unsupported, recommend_upgrade=True, basedir=None):
        """Unconditionally raise MercurialUnsupportedError.

        None of the arguments are consulted.
        """
        raise MercurialUnsupportedError()

    def initialize_on_transport(self, transport):
        """Refuse to create a Mercurial control directory.

        :raises errors.UninitializableFormat: always.
        """
        raise errors.UninitializableFormat(self)

    def get_converter(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError(self.get_converter)

    def open(self, transport):
        """Probe ``transport`` and then refuse to open it.

        :raises errors.NotBranchError: from the probe, when no Mercurial
            smart server is found at ``transport``.
        :raises NotImplementedError: when a server is found, since
            opening is not implemented.
        """
        # The probe is run only for its NotBranchError side effect; its
        # return value is deliberately discarded.
        prober = SmartHgProber()
        prober.probe_transport(transport)
        raise NotImplementedError(self.open)
116
117
class SmartHgProber(controldir.Prober):
    """Prober that detects Mercurial smart servers reachable over HTTP(S)."""

    # Perhaps retrieve list from mercurial.hg.schemes ?
    _supported_schemes = ["http", "https"]

    @classmethod
    def known_formats(cls):
        """Return the formats this prober can report."""
        return [SmartHgDirFormat()]

    @classmethod
    def priority(cls, transport):
        """Return 90 when 'hg' occurs in the transport base, else 99."""
        # hgweb repositories are prone to return *a* page for every possible
        # URL, making probing hard for other formats so use 99 here rather
        # than 100.
        return 90 if 'hg' in transport.base else 99

    @staticmethod
    def _has_hg_http_smart_server(transport, external_url):
        """Check if there is a Mercurial smart server at the remote location.

        Issues a GET for ``external_url`` with its query replaced by
        ``cmd=capabilities`` and inspects the response.

        :param transport: Transport to check
        :param external_url: External URL for transport
        :return: Boolean indicating whether transport is backed onto hg
        """
        from breezy.urlutils import urlparse
        capabilities_url = urlparse.urlunparse(
            urlparse.urlparse(external_url)._replace(
                query='cmd=capabilities'))
        response = transport.request(
            'GET', capabilities_url,
            headers={'Accept': 'application/mercurial-0.1'})
        # 404: nothing at that URL; 406: the server told us that it can't
        # handle our Accept header.
        if response.status in (404, 406):
            return False
        content_type = response.getheader("Content-Type")
        return (content_type is not None and
                content_type.startswith("application/mercurial"))

    @classmethod
    def probe_transport(cls, transport):
        """Return a SmartHgDirFormat if ``transport`` is a hg smart server.

        :param transport: Transport to probe
        :raises errors.NotBranchError: if the transport is in-process, its
            URL scheme is not in ``_supported_schemes``, or no Mercurial
            smart server answers at its URL.
        """
        try:
            external_url = transport.external_url()
        except errors.InProcessTransport:
            raise errors.NotBranchError(path=transport.base)
        scheme, _, _ = external_url.partition(":")
        if scheme not in cls._supported_schemes:
            raise errors.NotBranchError(path=transport.base)
        from breezy import urlutils
        external_url = urlutils.strip_segment_parameters(external_url)
        # Only HTTP(S) URLs are queried for a smart server; anything else
        # falls through to NotBranchError.
        is_http = external_url.startswith(("http:", "https:"))
        if is_http and cls._has_hg_http_smart_server(transport, external_url):
            return SmartHgDirFormat()
        raise errors.NotBranchError(path=transport.base)
177
178
# Register both Mercurial probers, local first, in the original order.
for _prober_class in (LocalHgProber, SmartHgProber):
    controldir.ControlDirFormat.register_prober(_prober_class)
# Drop the loop variable so no extra name is left in the module namespace.
del _prober_class
181