# Copyright (C) 2019 Jelmer Vernooij <jelmer@jelmer.uk>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License or
# (at your option) a later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""Mercurial foreign branch support.

Currently only tells the user that Mercurial is not supported.
"""

from ... import (
    controldir,
    errors,
    )

from ... import version_info  # noqa: F401


class MercurialUnsupportedError(errors.UnsupportedFormatError):
    """Error raised by the support check of the Mercurial formats below."""

    _fmt = ('Mercurial branches are not yet supported. '
            'To interoperate with Mercurial, use the fastimport format.')


class LocalHgDirFormat(controldir.ControlDirFormat):
    """Mercurial directory format."""

    def get_converter(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError(self.get_converter)

    def get_format_description(self):
        """Return a human-readable name for this format."""
        return "Local Mercurial control directory"

    def initialize_on_transport(self, transport):
        """Refuse to create a new control directory.

        :raises errors.UninitializableFormat: always.
        """
        raise errors.UninitializableFormat(self)

    def is_supported(self):
        """Return False: this format is detected but not supported."""
        return False

    def supports_transport(self, transport):
        """Return False for every transport."""
        return False

    def check_support_status(self, allow_unsupported, recommend_upgrade=True,
                             basedir=None):
        """Report that Mercurial is unsupported.

        All arguments are ignored.

        :raises MercurialUnsupportedError: always.
        """
        raise MercurialUnsupportedError()

    def open(self, transport):
        """Probe the transport, then fail because opening is unimplemented.

        :raises NotImplementedError: when the probe succeeds.
        """
        # Raise NotBranchError if there is nothing there
        LocalHgProber().probe_transport(transport)
        raise NotImplementedError(self.open)


class LocalHgProber(controldir.Prober):
    """Prober that looks for Mercurial files directly on the transport."""

    @classmethod
    def priority(klass, transport):
        """Return the probing priority of this prober (always 100)."""
        return 100

    @staticmethod
    def _has_hg_dumb_repository(transport):
        """Check the transport for well-known Mercurial repository files.

        :param transport: Transport to check
        :return: The result of ``transport.has_any`` for '.hg/requires' and
            '.hg/00changelog.i', or False if that lookup raises NoSuchFile,
            PermissionDenied or InvalidHttpResponse.
        """
        try:
            return transport.has_any([".hg/requires", ".hg/00changelog.i"])
        except (errors.NoSuchFile, errors.PermissionDenied,
                errors.InvalidHttpResponse):
            return False

    @classmethod
    def probe_transport(klass, transport):
        """Return a LocalHgDirFormat if Mercurial files are found.

        The format is considered present when the transport reports
        '.hg/requires' or '.hg/00changelog.i'.

        :param transport: Transport to probe
        :raises errors.NotBranchError: if neither file is reported.
        """
        if klass._has_hg_dumb_repository(transport):
            return LocalHgDirFormat()
        raise errors.NotBranchError(path=transport.base)

    @classmethod
    def known_formats(cls):
        """Return the formats this prober can report."""
        return [LocalHgDirFormat()]


class SmartHgDirFormat(controldir.ControlDirFormat):
    """Mercurial directory format, as detected over a smart HTTP server."""

    def get_converter(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError(self.get_converter)

    def get_format_description(self):
        """Return a human-readable name for this format."""
        return "Smart Mercurial control directory"

    def initialize_on_transport(self, transport):
        """Refuse to create a new control directory.

        :raises errors.UninitializableFormat: always.
        """
        raise errors.UninitializableFormat(self)

    def is_supported(self):
        """Return False: this format is detected but not supported."""
        return False

    def supports_transport(self, transport):
        """Return False for every transport."""
        return False

    def check_support_status(self, allow_unsupported, recommend_upgrade=True,
                             basedir=None):
        """Report that Mercurial is unsupported.

        All arguments are ignored.

        :raises MercurialUnsupportedError: always.
        """
        raise MercurialUnsupportedError()

    def open(self, transport):
        """Probe the transport, then fail because opening is unimplemented.

        :raises NotImplementedError: when the probe succeeds.
        """
        # Raise NotBranchError if there is nothing there
        SmartHgProber().probe_transport(transport)
        raise NotImplementedError(self.open)


class SmartHgProber(controldir.Prober):
    """Prober that asks an HTTP(S) server whether it speaks Mercurial."""

    # Perhaps retrieve list from mercurial.hg.schemes ?
    _supported_schemes = ["http", "https"]

    @classmethod
    def priority(klass, transport):
        """Return the probing priority for the given transport.

        :return: 90 when 'hg' appears in the transport's base URL,
            99 otherwise.
        """
        if 'hg' in transport.base:
            return 90
        # hgweb repositories are prone to return *a* page for every possible
        # URL, making probing hard for other formats so use 99 here rather
        # than 100.
        return 99

    @staticmethod
    def _has_hg_http_smart_server(transport, external_url):
        """Check if there is a Mercurial smart server at the remote location.

        Sends a GET request for ``external_url`` with its query replaced by
        ``cmd=capabilities`` and inspects the response.

        :param transport: Transport to check
        :param external_url: External URL for transport
        :return: Boolean indicating whether transport is backed onto hg
        """
        from breezy.urlutils import urlparse
        capabilities_url = urlparse.urlunparse(
            urlparse.urlparse(external_url)._replace(
                query='cmd=capabilities'))
        resp = transport.request(
            'GET', capabilities_url,
            headers={'Accept': 'application/mercurial-0.1'})
        # 404: nothing there.  406: the server told us that it can't handle
        # our Accept header.
        if resp.status in (404, 406):
            return False
        ct = resp.getheader("Content-Type")
        if ct is None:
            return False
        return ct.startswith("application/mercurial")

    @classmethod
    def probe_transport(klass, transport):
        """Return a SmartHgDirFormat if a Mercurial smart server answers.

        :param transport: Transport to probe
        :raises errors.NotBranchError: if the transport is in-process, its
            scheme is not in ``_supported_schemes``, or no Mercurial smart
            server is detected.
        """
        try:
            external_url = transport.external_url()
        except errors.InProcessTransport:
            raise errors.NotBranchError(path=transport.base)
        scheme = external_url.split(":")[0]
        if scheme not in klass._supported_schemes:
            raise errors.NotBranchError(path=transport.base)
        from breezy import urlutils
        external_url = urlutils.strip_segment_parameters(external_url)
        # Only HTTP(S) URLs are probed for a smart server; anything else
        # falls through to NotBranchError.
        if external_url.startswith(("http:", "https:")):
            if klass._has_hg_http_smart_server(transport, external_url):
                return SmartHgDirFormat()
        raise errors.NotBranchError(path=transport.base)

    @classmethod
    def known_formats(cls):
        """Return the formats this prober can report."""
        return [SmartHgDirFormat()]


controldir.ControlDirFormat.register_prober(LocalHgProber)
controldir.ControlDirFormat.register_prober(SmartHgProber)