import re

from conans.errors import ConanException
from conans.model.ref import ConanFileReference
from conans.search.search import search_recipes

# A chunk of the form "loose=True" / "include_prerelease=False".
re_param = re.compile(r"^(?P<function>include_prerelease|loose)\s*=\s*(?P<value>True|False)$")
# A chunk made only of version-expression characters that does not contain
# one of the parameter names above.
re_version = re.compile(r"^((?!(include_prerelease|loose))[a-zA-Z0-9_+.\-~<>=|*^\s])*$")


def _parse_versionexpr(versionexpr, result):
    """Split a version range expression into its range and its options.

    The expression is a comma separated list of at most 4 chunks. Version
    chunks are only accepted in positions 0 and 1, option chunks
    (``loose=...``, ``include_prerelease=...``) only in positions 1, 2 and 3.

    :param versionexpr: the raw expression string.
    :param result: list of output messages; a deprecation warning is appended
        to it when more than one version chunk is found.
    :return: tuple ``(version_range, loose, include_prerelease)`` where
        ``version_range`` is the version chunks joined by a single space and
        the other two items are booleans (defaults: ``True`` and ``False``).
    :raises ConanException: if there are too many chunks, a chunk is neither a
        version nor an option, or a chunk appears in a position where its kind
        is not allowed.
    """
    expression = [it.strip() for it in versionexpr.split(",")]
    if len(expression) > 4:
        raise ConanException("Invalid expression for version_range '{}'".format(versionexpr))

    include_prerelease = False
    loose = True
    version_range = []

    for i, expr in enumerate(expression):
        match_param = re_param.match(expr)
        match_version = re_version.match(expr)

        # The two patterns are mutually exclusive, so the matches can only
        # compare equal when both are None: the chunk is not recognized.
        if match_param == match_version:
            raise ConanException("Invalid version range '{}', failed in "
                                 "chunk '{}'".format(versionexpr, expr))

        if match_version and i not in [0, 1]:
            raise ConanException("Invalid version range '{}'".format(versionexpr))

        if match_param and i not in [1, 2, 3]:
            raise ConanException("Invalid version range '{}'".format(versionexpr))

        if match_version:
            version_range.append(expr)

        if match_param:
            if match_param.group('function') == 'loose':
                loose = match_param.group('value') == "True"
            elif match_param.group('function') == 'include_prerelease':
                include_prerelease = match_param.group('value') == "True"
            else:
                raise ConanException("Unexpected version range "
                                     "parameter '{}'".format(match_param.group(1)))

    if len(version_range) > 1:
        result.append("WARN: Commas as separator in version '%s' range are deprecated "
                      "and will be removed in Conan 2.0" % str(versionexpr))

    version_range = " ".join(map(str, version_range))
    return version_range, loose, include_prerelease


def satisfying(list_versions, versionexpr, result):
    """ returns the maximum version that satisfies the expression
    if some version cannot be converted to loose SemVer, it is discarded with a msg
    This provides some workaround for failing comparisons like "2.1" not matching "<=2.1"

    :param list_versions: iterable of candidate versions.
    :param versionexpr: the version range expression to satisfy.
    :param result: list of output messages; warnings are appended to it.
    :return: the matching item of ``list_versions``, or None if there is none.
    :raises ConanException: if the expression is not a valid range.
    """
    from semver import SemVer, Range, max_satisfying
    version_range, loose, include_prerelease = _parse_versionexpr(versionexpr, result)

    # Check version range expression
    try:
        act_range = Range(version_range, loose)
    except ValueError:
        raise ConanException("version range expression '%s' is not valid" % version_range)

    # Validate all versions, keeping a map from the parsed version back to the
    # original item so the caller gets back exactly what it passed in
    candidates = {}
    for v in list_versions:
        try:
            ver = SemVer(v, loose=loose)
            candidates[ver] = v
        except (ValueError, AttributeError):
            result.append("WARN: Version '%s' is not semver, cannot be compared with a range"
                          % str(v))

    # Search best matching version in range
    best_match = max_satisfying(candidates, act_range, loose=loose,
                                include_prerelease=include_prerelease)
    return candidates.get(best_match)


class RangeResolver(object):
    """Resolves the version range of a requirement to a concrete reference,
    looking in the local cache and in the remotes, and collecting the
    messages produced along the way.
    """

    def __init__(self, cache, remote_manager):
        self._cache = cache
        self._remote_manager = remote_manager
        # search reference -> (references found, remote name), to avoid
        # repeating the same remote search
        self._cached_remote_found = {}
        # Accumulated output messages
        self._result = []

    @property
    def output(self):
        """The list of messages accumulated so far."""
        return self._result

    def clear_output(self):
        """Discard the accumulated messages."""
        self._result = []

    def resolve(self, require, base_conanref, update, remotes):
        """Resolve the version range of ``require``, if it has one.

        If the requirement is already resolved, only check that its reference
        satisfies the range. Otherwise look for the best matching version and
        assign it to ``require.ref``. With ``update`` the remotes are searched
        first and the local cache is the fallback; without it the order is
        reversed.

        :raises ConanException: if an already resolved reference does not
            satisfy the range, or if no matching version is found.
        """
        version_range = require.version_range
        if version_range is None:
            return

        if require.is_resolved:
            ref = require.ref
            resolved_ref = self._resolve_version(version_range, [ref])
            if not resolved_ref:
                raise ConanException("Version range '%s' required by '%s' not valid for "
                                     "downstream requirement '%s'"
                                     % (version_range, base_conanref, str(ref)))
            else:
                self._result.append("Version range '%s' required by '%s' valid for "
                                    "downstream requirement '%s'"
                                    % (version_range, base_conanref, str(ref)))
            return

        ref = require.ref
        # The search pattern must be a string
        search_ref = ConanFileReference(ref.name, "*", ref.user, ref.channel)

        if update:
            resolved_ref, remote_name = self._resolve_remote(search_ref, version_range, remotes)
            if not resolved_ref:
                remote_name = None
                resolved_ref = self._resolve_local(search_ref, version_range)
        else:
            remote_name = None
            resolved_ref = self._resolve_local(search_ref, version_range)
            if not resolved_ref:
                resolved_ref, remote_name = self._resolve_remote(search_ref, version_range, remotes)

        origin = ("remote '%s'" % remote_name) if remote_name else "local cache"
        if resolved_ref:
            self._result.append("Version range '%s' required by '%s' resolved to '%s' in %s"
                                % (version_range, base_conanref, str(resolved_ref), origin))
            require.ref = resolved_ref
        else:
            raise ConanException("Version range '%s' from requirement '%s' required by '%s' "
                                 "could not be resolved in %s"
                                 % (version_range, require, base_conanref, origin))

    def _resolve_local(self, search_ref, version_range):
        """Return the best match among the recipes in the local cache that
        have the same user and channel as ``search_ref``, or None.
        """
        local_found = search_recipes(self._cache, search_ref)
        local_found = [ref for ref in local_found
                       if ref.user == search_ref.user and
                       ref.channel == search_ref.channel]
        if local_found:
            return self._resolve_version(version_range, local_found)

    def _search_remotes(self, search_ref, remotes):
        """Search the remotes (only the selected one, if any) in order and
        return ``(references, remote name)`` for the first remote with results
        for the same user and channel, or ``(None, None)``.
        """
        pattern = str(search_ref)
        for remote in remotes.values():
            if not remotes.selected or remote == remotes.selected:
                # We should use ignorecase=False, we want the exact case!
                result = self._remote_manager.search_recipes(remote, pattern, ignorecase=False)
                result = [ref for ref in result
                          if ref.user == search_ref.user and ref.channel == search_ref.channel]
                if result:
                    return result, remote.name
        return None, None

    def _resolve_remote(self, search_ref, version_range, remotes):
        """Return ``(best matching reference, remote name)`` from the remotes,
        or ``(None, None)`` if nothing was found. Search results are cached
        per ``search_ref``.
        """
        found_refs, remote_name = self._cached_remote_found.get(search_ref, (None, None))
        if found_refs is None:
            # Searching for just the name is much faster in remotes like Artifactory
            found_refs, remote_name = self._search_remotes(search_ref, remotes)
            if found_refs:
                self._result.append("%s versions found in '%s' remote" % (search_ref, remote_name))
            else:
                self._result.append("%s versions not found in remotes" % search_ref)
            # We don't want here to resolve the revision that should be done in the proxy
            # as any other regular flow
            # Empty list, just in case it returns None
            found_refs = [ref.copy_clear_rev() for ref in found_refs or []]
            self._cached_remote_found[search_ref] = found_refs, remote_name
        if found_refs:
            return self._resolve_version(version_range, found_refs), remote_name
        return None, None

    def _resolve_version(self, version_range, refs_found):
        """Return the reference in ``refs_found`` whose version is the best
        match for ``version_range``, or None.
        """
        versions = {ref.version: ref for ref in refs_found}
        result = satisfying(versions, version_range, self._result)
        return versions.get(result)