import gzip
import logging
import os
import platform
import stat
import subprocess
import sys
from contextlib import contextmanager
from fnmatch import fnmatch

import six
from patch_ng import fromfile, fromstring

from conans.client.output import ConanOutput
from conans.errors import ConanException
from conans.util.fallbacks import default_output
from conans.util.files import (_generic_algorithm_sum, load, save)

UNIT_SIZE = 1000.0
# Library extensions supported by collect_libs
VALID_LIB_EXTENSIONS = (".so", ".lib", ".a", ".dylib", ".bc")


@contextmanager
def chdir(newdir):
    """ Context manager that changes the working directory to 'newdir' and restores the
    previous working directory on exit, also when the body raises.
    :param newdir: Directory to switch to
    """
    old_path = os.getcwd()
    os.chdir(newdir)
    try:
        yield
    finally:
        os.chdir(old_path)


def human_size(size_bytes):
    """
    format a size in bytes into a 'human' file size, e.g. B, KB, MB, GB, TB, PB
    Note that bytes will be reported in whole numbers but KB and above will have
    greater precision.  e.g. 43 B, 443 KB, 4.3 MB, 4.43 GB, etc
    Sizes of 1000 PB and above are still reported in PB.
    :param size_bytes: Size in bytes (anything accepted by float())
    :return: String with the formatted number immediately followed by its suffix
    """

    suffixes_table = [('B', 0), ('KB', 1), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]

    num = float(size_bytes)
    last_index = len(suffixes_table) - 1
    for index, (suffix, precision) in enumerate(suffixes_table):
        # Never divide past the largest suffix, otherwise 1000 PB would be printed as "1.0PB"
        if num < UNIT_SIZE or index == last_index:
            break
        num /= UNIT_SIZE

    if precision == 0:
        formatted_size = "%d" % num
    else:
        formatted_size = str(round(num, ndigits=precision))

    return "%s%s" % (formatted_size, suffix)


def _get_root_folder(names, archive_kind):
    """ Returns the single root folder shared by all the entries of an archive, used by
    the 'strip_root' feature of unzip() and untargz()
    :param names: Entry names of the archive, already using "/" as separator
    :param archive_kind: Word used in the error messages ("zip" or "tgz")
    :return: Name of the common root folder
    :raises ConanException: If the entries do not live under one single root folder
    """
    common_folder = os.path.commonprefix(names).split("/", 1)[0]
    if not common_folder and len(names) > 1:
        raise ConanException("The %s file contains more than 1 folder in the root"
                             % archive_kind)
    if len(names) == 1 and len(names[0].split("/", 1)) == 1:
        raise ConanException("The %s file contains a file in the root" % archive_kind)
    # os.path.commonprefix() works character by character, so roots like "abc/" and "abd/"
    # produce a bogus common folder "ab". Check that every entry really is that folder
    # or lives inside it, instead of silently merging different roots.
    prefix = common_folder + "/"
    if any(n != common_folder and not n.startswith(prefix) for n in names):
        raise ConanException("The %s file contains more than 1 folder in the root"
                             % archive_kind)
    return common_folder


def unzip(filename, destination=".", keep_permissions=False, pattern=None, output=None,
          strip_root=False):
    """
    Unzip a zipped file
    :param filename: Path to the zip file
    :param destination: Destination folder (or file for .gz files)
    :param keep_permissions: Keep the zip permissions. WARNING: Can be
    dangerous if the zip was not created in a NIX system, the bits could
    produce undefined permission schema. Use this option only if you are sure
    that the zip was created correctly. It is ignored on Windows.
    :param pattern: Extract only paths matching the pattern. This should be a
    Unix shell-style wildcard, see fnmatch documentation for more details.
    :param output: output
    :param strip_root: If all the contents are in a single dir, flat that directory.
    :return: None
    :raises ConanException: For XZ files in Python 2, or if 'strip_root' is requested and
    the contents are not inside one single root folder
    """
    output = default_output(output, 'conans.client.tools.files.unzip')

    if (filename.endswith(".tar.gz") or filename.endswith(".tgz") or
            filename.endswith(".tbz2") or filename.endswith(".tar.bz2") or
            filename.endswith(".tar")):
        return untargz(filename, destination, pattern, strip_root)
    if filename.endswith(".gz"):
        # Plain gzip (not a tarball): a single file is decompressed
        with gzip.open(filename, 'rb') as f:
            file_content = f.read()
        target_name = filename[:-3] if destination == "." else destination
        save(target_name, file_content)
        return
    if filename.endswith(".tar.xz") or filename.endswith(".txz"):
        if six.PY2:
            raise ConanException("XZ format not supported in Python 2. Use Python 3 instead")
        return untargz(filename, destination, pattern, strip_root)

    import zipfile
    full_path = os.path.normpath(os.path.join(os.getcwd(), destination))

    if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
        def print_progress(the_size, uncomp_size):
            the_size = (the_size * 100.0 / uncomp_size) if uncomp_size != 0 else 0
            txt_msg = "Unzipping %d %%"
            if the_size > print_progress.last_size + 1:
                output.rewrite_line(txt_msg % the_size)
                print_progress.last_size = the_size
                if int(the_size) == 99:
                    output.rewrite_line(txt_msg % 100)
    else:
        def print_progress(_, __):
            pass

    with zipfile.ZipFile(filename, "r") as z:
        zip_info = z.infolist()
        if pattern:
            zip_info = [zi for zi in zip_info if fnmatch(zi.filename, pattern)]
        if strip_root:
            names = [n.replace("\\", "/") for n in z.namelist()]
            common_folder = _get_root_folder(names, "zip")
            # Remove the directory entry if present
            # Note: The "zip" format contains the "/" at the end if it is a directory
            zip_info = [m for m in zip_info if m.filename != (common_folder + "/")]
            for member in zip_info:
                name = member.filename.replace("\\", "/")
                member.filename = name.split("/", 1)[1]

        uncompress_size = sum((file_.file_size for file_ in zip_info))
        if uncompress_size > 100000:
            output.info("Unzipping %s, this can take a while" % human_size(uncompress_size))
        else:
            output.info("Unzipping %s" % human_size(uncompress_size))
        extracted_size = 0

        print_progress.last_size = -1
        # Permissions are never restored on Windows; resolve the platform check only once
        # instead of doing it for each zipped file
        restore_permissions = keep_permissions and platform.system() != "Windows"
        for file_ in zip_info:
            extracted_size += file_.file_size
            print_progress(extracted_size, uncompress_size)
            try:
                z.extract(file_, full_path)
                if restore_permissions:
                    # Could be dangerous if the ZIP has been created in a non nix system
                    # https://bugs.python.org/issue15795
                    perm = file_.external_attr >> 16 & 0xFFF
                    os.chmod(os.path.join(full_path, file_.filename), perm)
            except Exception as e:
                # Best effort: a failing entry is reported and the extraction continues
                output.error("Error extract %s\n%s" % (file_.filename, str(e)))
        output.writeln("")


def untargz(filename, destination=".", pattern=None, strip_root=False):
    """ Extracts a tar file (the compression is auto-detected by tarfile, mode 'r:*')
    :param filename: Path to the tar file
    :param destination: Destination folder
    :param pattern: Extract only the members whose name matches this fnmatch pattern.
    When combined with 'strip_root', the names are matched after stripping the root
    :param strip_root: If all the contents are in a single dir, flat that directory.
    :raises ConanException: If 'strip_root' is requested and the contents are not inside
    one single root folder
    """
    # NOTE(review): extractall() is used without any member filtering, confirm that the
    # archives handled here are trusted
    import tarfile
    with tarfile.TarFile.open(filename, 'r:*') as tarredgzippedFile:
        if not pattern and not strip_root:
            tarredgzippedFile.extractall(destination)
        else:
            members = tarredgzippedFile.getmembers()

            if strip_root:
                names = [n.replace("\\", "/") for n in tarredgzippedFile.getnames()]
                common_folder = _get_root_folder(names, "tgz")
                # Remove the directory entry if present
                members = [m for m in members if m.name != common_folder]
                for member in members:
                    name = member.name.replace("\\", "/")
                    member.name = name.split("/", 1)[1]
                    member.path = member.name
            if pattern:
                # Filter the (maybe stripped) list, so the root directory entry removed
                # above is not added back
                members = [m for m in members if fnmatch(m.name, pattern)]
            tarredgzippedFile.extractall(destination, members=members)


def check_with_algorithm_sum(algorithm_name, file_path, signature):
    """ Checks that the checksum of a file matches the expected one
    :param algorithm_name: Name of the hash algorithm, passed to _generic_algorithm_sum
    :param file_path: Path of the file to check
    :param signature: Expected checksum, it is lowercased before comparing
    :raises ConanException: If the computed checksum is different
    """
    real_signature = _generic_algorithm_sum(file_path, algorithm_name)
    if real_signature != signature.lower():
        raise ConanException("%s signature failed for '%s' file. \n"
                             " Provided signature: %s  \n"
                             " Computed signature: %s" % (algorithm_name,
                                                          os.path.basename(file_path),
                                                          signature,
                                                          real_signature))


def check_sha1(file_path, signature):
    """ Checks the sha1 of a file, see check_with_algorithm_sum """
    check_with_algorithm_sum("sha1", file_path, signature)


def check_md5(file_path, signature):
    """ Checks the md5 of a file, see check_with_algorithm_sum """
    check_with_algorithm_sum("md5", file_path, signature)


def check_sha256(file_path, signature):
    """ Checks the sha256 of a file, see check_with_algorithm_sum """
    check_with_algorithm_sum("sha256", file_path, signature)


def patch(base_path=None, patch_file=None, patch_string=None, strip=0, output=None, fuzz=False):
    """ Applies a diff from file (patch_file) or string (patch_string)
        in base_path directory or current dir if None
    :param base_path: Base path where the patch should be applied.
    :param patch_file: Patch file that should be applied.
    :param patch_string: Patch string that should be applied.
    :param strip: Number of folders to be stripped from the path.
    :param output: Stream object.
    :param fuzz: Should accept fuzzy patches.
    :raises ConanException: If the patch cannot be parsed or cannot be applied
    """

    class PatchLogHandler(logging.Handler):
        """ Forwards the records logged by patch_ng to the Conan output """
        def __init__(self):
            logging.Handler.__init__(self, logging.DEBUG)
            self.output = output or ConanOutput(sys.stdout, sys.stderr, color=True)
            self.patchname = patch_file if patch_file else "patch_ng"

        def emit(self, record):
            logstr = self.format(record)
            if record.levelno == logging.WARN:
                self.output.warn("%s: %s" % (self.patchname, logstr))
            else:
                self.output.info("%s: %s" % (self.patchname, logstr))

    patchlog = logging.getLogger("patch_ng")
    if patchlog:
        # Replace the handlers installed by previous calls, do not accumulate them
        patchlog.handlers = []
        patchlog.addHandler(PatchLogHandler())

    if not patch_file and not patch_string:
        return
    if patch_file:
        patchset = fromfile(patch_file)
    else:
        patchset = fromstring(patch_string.encode())

    if not patchset:
        raise ConanException("Failed to parse patch: %s" % (patch_file if patch_file else "string"))

    if not patchset.apply(root=base_path, strip=strip, fuzz=fuzz):
        # Same naming as the parse error, do not print "None" for string patches
        raise ConanException("Failed to apply patch: %s"
                             % (patch_file if patch_file else "string"))


def _manage_text_not_found(search, file_path, strict, function_name, output):
    """ Reports that 'search' was not found in 'file_path': raises a ConanException if
    'strict', otherwise warns in 'output' and returns False
    """
    message = "%s didn't find pattern '%s' in '%s' file." % (function_name, search, file_path)
    if strict:
        raise ConanException(message)
    else:
        output.warn(message)
        return False


@contextmanager
def _add_write_permissions(file_path):
    """ Context manager that makes sure the file has the stat.S_IWRITE bit while the body
    runs, restoring the original mode afterwards if it had to be changed
    """
    # Assumes the file already exist in disk
    write = stat.S_IWRITE
    saved_permissions = os.stat(file_path).st_mode
    if saved_permissions & write == write:
        yield
        return
    try:
        os.chmod(file_path, saved_permissions | write)
        yield
    finally:
        os.chmod(file_path, saved_permissions)


def replace_in_file(file_path, search, replace, strict=True, output=None, encoding=None):
    """ Replaces every occurrence of 'search' by 'replace' in a file
    :param file_path: File to modify
    :param search: Text to look for
    :param replace: Replacement text
    :param strict: Raise a ConanException if 'search' is not found (otherwise only warn)
    :param output: output
    :param encoding: Encoding used to load and save the file. By default it is loaded
    with "auto" and saved as "utf-8"
    """
    output = default_output(output, 'conans.client.tools.files.replace_in_file')

    encoding_in = encoding or "auto"
    encoding_out = encoding or "utf-8"
    content = load(file_path, encoding=encoding_in)
    if -1 == content.find(search):
        _manage_text_not_found(search, file_path, strict, "replace_in_file", output=output)
    content = content.replace(search, replace)
    content = content.encode(encoding_out)
    with _add_write_permissions(file_path):
        save(file_path, content, only_if_modified=False, encoding=encoding_out)


def replace_path_in_file(file_path, search, replace, strict=True, windows_paths=None, output=None,
                         encoding=None):
    """ Replaces a path in a file. With Windows paths, the match ignores the case and the
    kind of slash ("\\" or "/"), otherwise it behaves as replace_in_file
    :param file_path: File to modify
    :param search: Path to look for
    :param replace: Replacement text
    :param strict: Raise a ConanException if 'search' is not found (otherwise only warn)
    :param windows_paths: True/False to force the behavior, None to use Windows paths only
    when running in Windows
    :param output: output
    :param encoding: Encoding used to load and save the file. By default it is loaded
    with "auto" and saved as "utf-8"
    :return: True if something was replaced and False if not found (and not strict). The
    result of replace_in_file when not using Windows paths
    """
    output = default_output(output, 'conans.client.tools.files.replace_path_in_file')

    if windows_paths is False or (windows_paths is None and platform.system() != "Windows"):
        return replace_in_file(file_path, search, replace, strict=strict, output=output,
                               encoding=encoding)

    def normalized_text(text):
        return text.replace("\\", "/").lower()

    encoding_in = encoding or "auto"
    encoding_out = encoding or "utf-8"
    content = load(file_path, encoding=encoding_in)
    normalized_content = normalized_text(content)
    normalized_search = normalized_text(search)
    index = normalized_content.find(normalized_search)
    if index == -1:
        return _manage_text_not_found(search, file_path, strict, "replace_path_in_file",
                                      output=output)

    while index != -1:
        content = content[:index] + replace + content[index + len(search):]
        normalized_content = normalized_text(content)
        # Continue after the inserted text: searching again from the beginning never
        # finishes when 'replace' contains 'search' (e.g. appending a subfolder to a path)
        index = normalized_content.find(normalized_search, index + len(replace))

    content = content.encode(encoding_out)
    with _add_write_permissions(file_path):
        save(file_path, content, only_if_modified=False, encoding=encoding_out)

    return True


def replace_prefix_in_pc_file(pc_file, new_prefix):
    """ Rewrites the lines starting with "prefix=" of a file so they define 'new_prefix'
    :param pc_file: File to modify
    :param new_prefix: New value for the prefix
    """
    content = load(pc_file)
    lines = []
    for line in content.splitlines():
        if line.startswith("prefix="):
            lines.append('prefix=%s' % new_prefix)
        else:
            lines.append(line)
    with _add_write_permissions(pc_file):
        save(pc_file, "\n".join(lines))


def _path_equals(path1, path2):
    """ Compares two paths after normalizing them. In Windows the comparison ignores the
    case and considers "sysnative" and "system32" the same
    """
    path1 = os.path.normpath(path1)
    path2 = os.path.normpath(path2)
    if platform.system() == "Windows":
        path1 = path1.lower().replace("sysnative", "system32")
        path2 = path2.lower().replace("sysnative", "system32")
    return path1 == path2


def collect_libs(conanfile, folder=None):
    """ Collects the names of the libraries found in the package
    :param conanfile: Conanfile, its package_folder, cpp_info.libdirs and output are used
    :param folder: Folder (relative to the package folder) to look into. By default, the
    folders declared in conanfile.cpp_info.libdirs
    :return: Sorted list of library names, without extension and without the "lib"
    prefix (the prefix is kept for ".lib" files). Empty list if there is no package folder
    """
    if not conanfile.package_folder:
        return []
    if folder:
        lib_folders = [os.path.join(conanfile.package_folder, folder)]
    else:
        lib_folders = [os.path.join(conanfile.package_folder, libdir)
                       for libdir in conanfile.cpp_info.libdirs]
    result = []
    for lib_folder in lib_folders:
        if not os.path.exists(lib_folder):
            conanfile.output.warn("Lib folder doesn't exist, can't collect libraries: "
                                  "{0}".format(lib_folder))
            continue
        files = os.listdir(lib_folder)
        for f in files:
            name, ext = os.path.splitext(f)
            if ext in VALID_LIB_EXTENSIONS:
                if ext != ".lib" and name.startswith("lib"):
                    name = name[3:]
                if name in result:
                    conanfile.output.warn("Library '%s' was either already found in a previous "
                                          "'conanfile.cpp_info.libdirs' folder or appears several "
                                          "times with a different file extension" % name)
                else:
                    result.append(name)
    result.sort()
    return result


def which(filename):
    """ same affect as posix which command or shutil.which from python3
    :param filename: Name of the executable to look for
    :return: Absolute path of the first executable file found in the PATH, None if not found
    """
    # FIXME: Replace with shutil.which in Conan 2.0
    def verify(file_abspath):
        return os.path.isfile(file_abspath) and os.access(file_abspath, os.X_OK)

    def _get_possible_filenames(fname):
        if platform.system() != "Windows":
            extensions = [".sh", ""]
        else:
            if "." in fname:  # File comes with extension already
                extensions = [""]
            else:
                pathext = os.getenv("PATHEXT", ".COM;.EXE;.BAT;.CMD").split(";")
                extensions = [extension.lower() for extension in pathext]
                extensions.insert(1, "")  # No extension
        return ["%s%s" % (fname, extension) for extension in extensions]

    possible_names = _get_possible_filenames(filename)
    # PATH might not be defined at all: use os.defpath (as shutil.which does), not KeyError
    for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
        for name in possible_names:
            filepath = os.path.abspath(os.path.join(path, name))
            if verify(filepath):
                return filepath
            if platform.system() == "Windows":
                filepath = filepath.lower()
                if "system32" in filepath:
                    # python return False for os.path.exists of exes in System32 but with SysNative
                    trick_path = filepath.replace("system32", "sysnative")
                    if verify(trick_path):
                        return trick_path

    return None


def _replace_with_separator(filepath, sep):
    """ Rewrites the file using 'sep' as line separator, keeping a final separator only
    if the loaded contents end with a newline
    """
    tmp = load(filepath)
    ret = sep.join(tmp.splitlines())
    if tmp.endswith("\n"):
        ret += sep
    save(filepath, ret)


def unix2dos(filepath):
    """ Converts the line endings of the file to CRLF """
    _replace_with_separator(filepath, "\r\n")


def dos2unix(filepath):
    """ Converts the line endings of the file to LF """
    _replace_with_separator(filepath, "\n")


def rename(src, dst):
    # FIXME: Deprecated, use new interface from conan.tools
    """
    rename a file or folder to avoid "Access is denied" error on Windows
    :param src: Source file or folder
    :param dst: Destination file or folder
    :raises ConanException: If 'dst' already exists or the rename fails
    """
    if os.path.exists(dst):
        raise ConanException("rename {} to {} failed, dst exists.".format(src, dst))

    if platform.system() == "Windows" and which("robocopy") and os.path.isdir(src):
        # /move Moves files and directories, and deletes them from the source after they are copied.
        # /e Copies subdirectories. Note that this option includes empty directories.
        # /ndl Specifies that directory names are not to be logged.
        # /nfl Specifies that file names are not to be logged.
        process = subprocess.Popen(["robocopy", "/move", "/e", "/ndl", "/nfl", src, dst],
                                   stdout=subprocess.PIPE)
        process.communicate()
        if process.returncode > 7:  # https://ss64.com/nt/robocopy-exit.html
            raise ConanException("rename {} to {} failed.".format(src, dst))
    else:
        try:
            os.rename(src, dst)
        except Exception as err:
            raise ConanException("rename {} to {} failed: {}".format(src, dst, err))


def remove_files_by_mask(directory, pattern):
    """ Removes, recursively, the files whose name matches the fnmatch 'pattern'
    :param directory: Folder to walk
    :param pattern: fnmatch pattern, checked against the file name (not the full path)
    :return: List of the removed files, as paths relative to 'directory'
    """
    removed_names = []
    for root, _, filenames in os.walk(directory):
        for filename in filenames:
            if fnmatch(filename, pattern):
                fullname = os.path.join(root, filename)
                os.unlink(fullname)
                removed_names.append(os.path.relpath(fullname, directory))
    return removed_names


def fix_symlinks(conanfile, raise_if_error=False):
    """ Fix the symlinks in the conanfile.package_folder: make symlinks relative and remove
        those links to files outside the package (it will print an error, or raise
        if 'raise_if_error' evaluates to true).
    """
    offending_files = []

    def work_on_element(dirpath, element, token):
        fullpath = os.path.join(dirpath, element)
        if not os.path.islink(fullpath):
            return

        link_target = os.readlink(fullpath)
        if link_target in ['/dev/null', ]:
            return

        link_abs_target = os.path.join(dirpath, link_target)
        link_rel_target = os.path.relpath(link_abs_target, conanfile.package_folder)
        if link_rel_target.startswith('..') or os.path.isabs(link_rel_target):
            offending_file = os.path.relpath(fullpath, conanfile.package_folder)
            offending_files.append(offending_file)
            conanfile.output.error("{token} '{item}' links to a {token} outside the package, "
                                   "it's been removed.".format(item=offending_file, token=token))
            os.unlink(fullpath)
        elif not os.path.exists(link_abs_target):
            # This is a broken symlink. Failure is controlled by config variable
            #  'general.skip_broken_symlinks_check'. Do not fail here.
            offending_file = os.path.relpath(fullpath, conanfile.package_folder)
            offending_files.append(offending_file)
            conanfile.output.error("{token} '{item}' links to a path that doesn't exist, it's"
                                   " been removed.".format(item=offending_file, token=token))
            os.unlink(fullpath)
        elif link_target != link_rel_target:
            os.unlink(fullpath)
            os.symlink(link_rel_target, fullpath)

    for (dirpath, dirnames, filenames) in os.walk(conanfile.package_folder):
        for filename in filenames:
            work_on_element(dirpath, filename, token="file")

        for dirname in dirnames:
            work_on_element(dirpath, dirname, token="directory")

    if offending_files and raise_if_error:
        raise ConanException("There are invalid symlinks in the package!")