1import gzip
2import logging
3import os
4import platform
5import stat
6import subprocess
7import sys
8from contextlib import contextmanager
9from fnmatch import fnmatch
10
11import six
12from patch_ng import fromfile, fromstring
13
14from conans.client.output import ConanOutput
15from conans.errors import ConanException
16from conans.util.fallbacks import default_output
17from conans.util.files import (_generic_algorithm_sum, load, save)
18
19UNIT_SIZE = 1000.0
20# Library extensions supported by collect_libs
21VALID_LIB_EXTENSIONS = (".so", ".lib", ".a", ".dylib", ".bc")
22
23
24@contextmanager
25def chdir(newdir):
26    old_path = os.getcwd()
27    os.chdir(newdir)
28    try:
29        yield
30    finally:
31        os.chdir(old_path)
32
33
34def human_size(size_bytes):
35    """
36    format a size in bytes into a 'human' file size, e.g. B, KB, MB, GB, TB, PB
37    Note that bytes will be reported in whole numbers but KB and above will have
38    greater precision.  e.g. 43 B, 443 KB, 4.3 MB, 4.43 GB, etc
39    """
40
41    suffixes_table = [('B', 0), ('KB', 1), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]
42
43    num = float(size_bytes)
44    for suffix, precision in suffixes_table:
45        if num < UNIT_SIZE:
46            break
47        num /= UNIT_SIZE
48
49    if precision == 0:
50        formatted_size = "%d" % num
51    else:
52        formatted_size = str(round(num, ndigits=precision))
53
54    return "%s%s" % (formatted_size, suffix)
55
56
57def unzip(filename, destination=".", keep_permissions=False, pattern=None, output=None,
58          strip_root=False):
59    """
60    Unzip a zipped file
61    :param filename: Path to the zip file
62    :param destination: Destination folder (or file for .gz files)
63    :param keep_permissions: Keep the zip permissions. WARNING: Can be
64    dangerous if the zip was not created in a NIX system, the bits could
65    produce undefined permission schema. Use this option only if you are sure
66    that the zip was created correctly.
67    :param pattern: Extract only paths matching the pattern. This should be a
68    Unix shell-style wildcard, see fnmatch documentation for more details.
69    :param output: output
70    :param flat: If all the contents are in a single dir, flat that directory.
71    :return:
72    """
73    output = default_output(output, 'conans.client.tools.files.unzip')
74
75    if (filename.endswith(".tar.gz") or filename.endswith(".tgz") or
76            filename.endswith(".tbz2") or filename.endswith(".tar.bz2") or
77            filename.endswith(".tar")):
78        return untargz(filename, destination, pattern, strip_root)
79    if filename.endswith(".gz"):
80        with gzip.open(filename, 'rb') as f:
81            file_content = f.read()
82        target_name = filename[:-3] if destination == "." else destination
83        save(target_name, file_content)
84        return
85    if filename.endswith(".tar.xz") or filename.endswith(".txz"):
86        if six.PY2:
87            raise ConanException("XZ format not supported in Python 2. Use Python 3 instead")
88        return untargz(filename, destination, pattern, strip_root)
89
90    import zipfile
91    full_path = os.path.normpath(os.path.join(os.getcwd(), destination))
92
93    if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
94        def print_progress(the_size, uncomp_size):
95            the_size = (the_size * 100.0 / uncomp_size) if uncomp_size != 0 else 0
96            txt_msg = "Unzipping %d %%"
97            if the_size > print_progress.last_size + 1:
98                output.rewrite_line(txt_msg % the_size)
99                print_progress.last_size = the_size
100                if int(the_size) == 99:
101                    output.rewrite_line(txt_msg % 100)
102    else:
103        def print_progress(_, __):
104            pass
105
106    with zipfile.ZipFile(filename, "r") as z:
107        zip_info = z.infolist()
108        if pattern:
109            zip_info = [zi for zi in zip_info if fnmatch(zi.filename, pattern)]
110        if strip_root:
111            names = [n.replace("\\", "/") for n in z.namelist()]
112            common_folder = os.path.commonprefix(names).split("/", 1)[0]
113            if not common_folder and len(names) > 1:
114                raise ConanException("The zip file contains more than 1 folder in the root")
115            if len(names) == 1 and len(names[0].split("/", 1)) == 1:
116                raise ConanException("The zip file contains a file in the root")
117            # Remove the directory entry if present
118            # Note: The "zip" format contains the "/" at the end if it is a directory
119            zip_info = [m for m in zip_info if m.filename != (common_folder + "/")]
120            for member in zip_info:
121                name = member.filename.replace("\\", "/")
122                member.filename = name.split("/", 1)[1]
123
124        uncompress_size = sum((file_.file_size for file_ in zip_info))
125        if uncompress_size > 100000:
126            output.info("Unzipping %s, this can take a while" % human_size(uncompress_size))
127        else:
128            output.info("Unzipping %s" % human_size(uncompress_size))
129        extracted_size = 0
130
131        print_progress.last_size = -1
132        if platform.system() == "Windows":
133            for file_ in zip_info:
134                extracted_size += file_.file_size
135                print_progress(extracted_size, uncompress_size)
136                try:
137                    z.extract(file_, full_path)
138                except Exception as e:
139                    output.error("Error extract %s\n%s" % (file_.filename, str(e)))
140        else:  # duplicated for, to avoid a platform check for each zipped file
141            for file_ in zip_info:
142                extracted_size += file_.file_size
143                print_progress(extracted_size, uncompress_size)
144                try:
145                    z.extract(file_, full_path)
146                    if keep_permissions:
147                        # Could be dangerous if the ZIP has been created in a non nix system
148                        # https://bugs.python.org/issue15795
149                        perm = file_.external_attr >> 16 & 0xFFF
150                        os.chmod(os.path.join(full_path, file_.filename), perm)
151                except Exception as e:
152                    output.error("Error extract %s\n%s" % (file_.filename, str(e)))
153        output.writeln("")
154
155
156def untargz(filename, destination=".", pattern=None, strip_root=False):
157    import tarfile
158    with tarfile.TarFile.open(filename, 'r:*') as tarredgzippedFile:
159        if not pattern and not strip_root:
160            tarredgzippedFile.extractall(destination)
161        else:
162            members = tarredgzippedFile.getmembers()
163
164            if strip_root:
165                names = [n.replace("\\", "/") for n in tarredgzippedFile.getnames()]
166                common_folder = os.path.commonprefix(names).split("/", 1)[0]
167                if not common_folder and len(names) > 1:
168                    raise ConanException("The tgz file contains more than 1 folder in the root")
169                if len(names) == 1 and len(names[0].split("/", 1)) == 1:
170                    raise ConanException("The tgz file contains a file in the root")
171                # Remove the directory entry if present
172                members = [m for m in members if m.name != common_folder]
173                for member in members:
174                    name = member.name.replace("\\", "/")
175                    member.name = name.split("/", 1)[1]
176                    member.path = member.name
177            if pattern:
178                members = list(filter(lambda m: fnmatch(m.name, pattern),
179                                      tarredgzippedFile.getmembers()))
180            tarredgzippedFile.extractall(destination, members=members)
181
182
183def check_with_algorithm_sum(algorithm_name, file_path, signature):
184    real_signature = _generic_algorithm_sum(file_path, algorithm_name)
185    if real_signature != signature.lower():
186        raise ConanException("%s signature failed for '%s' file. \n"
187                             " Provided signature: %s  \n"
188                             " Computed signature: %s" % (algorithm_name,
189                                                          os.path.basename(file_path),
190                                                          signature,
191                                                          real_signature))
192
193
194def check_sha1(file_path, signature):
195    check_with_algorithm_sum("sha1", file_path, signature)
196
197
198def check_md5(file_path, signature):
199    check_with_algorithm_sum("md5", file_path, signature)
200
201
202def check_sha256(file_path, signature):
203    check_with_algorithm_sum("sha256", file_path, signature)
204
205
206def patch(base_path=None, patch_file=None, patch_string=None, strip=0, output=None, fuzz=False):
207    """ Applies a diff from file (patch_file)  or string (patch_string)
208        in base_path directory or current dir if None
209    :param base_path: Base path where the patch should be applied.
210    :param patch_file: Patch file that should be applied.
211    :param patch_string: Patch string that should be applied.
212    :param strip: Number of folders to be stripped from the path.
213    :param output: Stream object.
214    :param fuzz: Should accept fuzzy patches.
215    """
216
217    class PatchLogHandler(logging.Handler):
218        def __init__(self):
219            logging.Handler.__init__(self, logging.DEBUG)
220            self.output = output or ConanOutput(sys.stdout, sys.stderr, color=True)
221            self.patchname = patch_file if patch_file else "patch_ng"
222
223        def emit(self, record):
224            logstr = self.format(record)
225            if record.levelno == logging.WARN:
226                self.output.warn("%s: %s" % (self.patchname, logstr))
227            else:
228                self.output.info("%s: %s" % (self.patchname, logstr))
229
230    patchlog = logging.getLogger("patch_ng")
231    if patchlog:
232        patchlog.handlers = []
233        patchlog.addHandler(PatchLogHandler())
234
235    if not patch_file and not patch_string:
236        return
237    if patch_file:
238        patchset = fromfile(patch_file)
239    else:
240        patchset = fromstring(patch_string.encode())
241
242    if not patchset:
243        raise ConanException("Failed to parse patch: %s" % (patch_file if patch_file else "string"))
244
245    if not patchset.apply(root=base_path, strip=strip, fuzz=fuzz):
246        raise ConanException("Failed to apply patch: %s" % patch_file)
247
248
249def _manage_text_not_found(search, file_path, strict, function_name, output):
250    message = "%s didn't find pattern '%s' in '%s' file." % (function_name, search, file_path)
251    if strict:
252        raise ConanException(message)
253    else:
254        output.warn(message)
255        return False
256
257
258@contextmanager
259def _add_write_permissions(file_path):
260    # Assumes the file already exist in disk
261    write = stat.S_IWRITE
262    saved_permissions = os.stat(file_path).st_mode
263    if saved_permissions & write == write:
264        yield
265        return
266    try:
267        os.chmod(file_path, saved_permissions | write)
268        yield
269    finally:
270        os.chmod(file_path, saved_permissions)
271
272
273def replace_in_file(file_path, search, replace, strict=True, output=None, encoding=None):
274    output = default_output(output, 'conans.client.tools.files.replace_in_file')
275
276    encoding_in = encoding or "auto"
277    encoding_out = encoding or "utf-8"
278    content = load(file_path, encoding=encoding_in)
279    if -1 == content.find(search):
280        _manage_text_not_found(search, file_path, strict, "replace_in_file", output=output)
281    content = content.replace(search, replace)
282    content = content.encode(encoding_out)
283    with _add_write_permissions(file_path):
284        save(file_path, content, only_if_modified=False, encoding=encoding_out)
285
286
287def replace_path_in_file(file_path, search, replace, strict=True, windows_paths=None, output=None,
288                         encoding=None):
289    output = default_output(output, 'conans.client.tools.files.replace_path_in_file')
290
291    if windows_paths is False or (windows_paths is None and platform.system() != "Windows"):
292        return replace_in_file(file_path, search, replace, strict=strict, output=output,
293                               encoding=encoding)
294
295    def normalized_text(text):
296        return text.replace("\\", "/").lower()
297
298    encoding_in = encoding or "auto"
299    encoding_out = encoding or "utf-8"
300    content = load(file_path, encoding=encoding_in)
301    normalized_content = normalized_text(content)
302    normalized_search = normalized_text(search)
303    index = normalized_content.find(normalized_search)
304    if index == -1:
305        return _manage_text_not_found(search, file_path, strict, "replace_path_in_file",
306                                      output=output)
307
308    while index != -1:
309        content = content[:index] + replace + content[index + len(search):]
310        normalized_content = normalized_text(content)
311        index = normalized_content.find(normalized_search)
312
313    content = content.encode(encoding_out)
314    with _add_write_permissions(file_path):
315        save(file_path, content, only_if_modified=False, encoding=encoding_out)
316
317    return True
318
319
320def replace_prefix_in_pc_file(pc_file, new_prefix):
321    content = load(pc_file)
322    lines = []
323    for line in content.splitlines():
324        if line.startswith("prefix="):
325            lines.append('prefix=%s' % new_prefix)
326        else:
327            lines.append(line)
328    with _add_write_permissions(pc_file):
329        save(pc_file, "\n".join(lines))
330
331
332def _path_equals(path1, path2):
333    path1 = os.path.normpath(path1)
334    path2 = os.path.normpath(path2)
335    if platform.system() == "Windows":
336        path1 = path1.lower().replace("sysnative", "system32")
337        path2 = path2.lower().replace("sysnative", "system32")
338    return path1 == path2
339
340
341def collect_libs(conanfile, folder=None):
342    if not conanfile.package_folder:
343        return []
344    if folder:
345        lib_folders = [os.path.join(conanfile.package_folder, folder)]
346    else:
347        lib_folders = [os.path.join(conanfile.package_folder, folder)
348                       for folder in conanfile.cpp_info.libdirs]
349    result = []
350    for lib_folder in lib_folders:
351        if not os.path.exists(lib_folder):
352            conanfile.output.warn("Lib folder doesn't exist, can't collect libraries: "
353                                  "{0}".format(lib_folder))
354            continue
355        files = os.listdir(lib_folder)
356        for f in files:
357            name, ext = os.path.splitext(f)
358            if ext in VALID_LIB_EXTENSIONS:
359                if ext != ".lib" and name.startswith("lib"):
360                    name = name[3:]
361                if name in result:
362                    conanfile.output.warn("Library '%s' was either already found in a previous "
363                                          "'conanfile.cpp_info.libdirs' folder or appears several "
364                                          "times with a different file extension" % name)
365                else:
366                    result.append(name)
367    result.sort()
368    return result
369
370
371def which(filename):
372    """ same affect as posix which command or shutil.which from python3 """
373    # FIXME: Replace with shutil.which in Conan 2.0
374    def verify(file_abspath):
375        return os.path.isfile(file_abspath) and os.access(file_abspath, os.X_OK)
376
377    def _get_possible_filenames(fname):
378        if platform.system() != "Windows":
379            extensions = [".sh", ""]
380        else:
381            if "." in filename:  # File comes with extension already
382                extensions = [""]
383            else:
384                pathext = os.getenv("PATHEXT", ".COM;.EXE;.BAT;.CMD").split(";")
385                extensions = [extension.lower() for extension in pathext]
386                extensions.insert(1, "")  # No extension
387        return ["%s%s" % (fname, extension) for extension in extensions]
388
389    possible_names = _get_possible_filenames(filename)
390    for path in os.environ["PATH"].split(os.pathsep):
391        for name in possible_names:
392            filepath = os.path.abspath(os.path.join(path, name))
393            if verify(filepath):
394                return filepath
395            if platform.system() == "Windows":
396                filepath = filepath.lower()
397                if "system32" in filepath:
398                    # python return False for os.path.exists of exes in System32 but with SysNative
399                    trick_path = filepath.replace("system32", "sysnative")
400                    if verify(trick_path):
401                        return trick_path
402
403    return None
404
405
406def _replace_with_separator(filepath, sep):
407    tmp = load(filepath)
408    ret = sep.join(tmp.splitlines())
409    if tmp.endswith("\n"):
410        ret += sep
411    save(filepath, ret)
412
413
414def unix2dos(filepath):
415    _replace_with_separator(filepath, "\r\n")
416
417
418def dos2unix(filepath):
419    _replace_with_separator(filepath, "\n")
420
421
422def rename(src, dst):
423    # FIXME: Deprecated, use new interface from conan.tools
424    """
425    rename a file or folder to avoid "Access is denied" error on Windows
426    :param src: Source file or folder
427    :param dst: Destination file or folder
428    """
429    if os.path.exists(dst):
430        raise ConanException("rename {} to {} failed, dst exists.".format(src, dst))
431
432    if platform.system() == "Windows" and which("robocopy") and os.path.isdir(src):
433        # /move Moves files and directories, and deletes them from the source after they are copied.
434        # /e Copies subdirectories. Note that this option includes empty directories.
435        # /ndl Specifies that directory names are not to be logged.
436        # /nfl Specifies that file names are not to be logged.
437        process = subprocess.Popen(["robocopy", "/move", "/e", "/ndl", "/nfl", src, dst],
438                                   stdout=subprocess.PIPE)
439        process.communicate()
440        if process.returncode > 7:  # https://ss64.com/nt/robocopy-exit.html
441            raise ConanException("rename {} to {} failed.".format(src, dst))
442    else:
443        try:
444            os.rename(src, dst)
445        except Exception as err:
446            raise ConanException("rename {} to {} failed: {}".format(src, dst, err))
447
448
449def remove_files_by_mask(directory, pattern):
450    removed_names = []
451    for root, _, filenames in os.walk(directory):
452        for filename in filenames:
453            if fnmatch(filename, pattern):
454                fullname = os.path.join(root, filename)
455                os.unlink(fullname)
456                removed_names.append(os.path.relpath(fullname, directory))
457    return removed_names
458
459
460def fix_symlinks(conanfile, raise_if_error=False):
461    """ Fix the symlinks in the conanfile.package_folder: make symlinks relative and remove
462        those links to files outside the package (it will print an error, or raise
463        if 'raise_if_error' evaluates to true).
464    """
465    offending_files = []
466
467    def work_on_element(dirpath, element, token):
468        fullpath = os.path.join(dirpath, element)
469        if not os.path.islink(fullpath):
470            return
471
472        link_target = os.readlink(fullpath)
473        if link_target in ['/dev/null', ]:
474            return
475
476        link_abs_target = os.path.join(dirpath, link_target)
477        link_rel_target = os.path.relpath(link_abs_target, conanfile.package_folder)
478        if link_rel_target.startswith('..') or os.path.isabs(link_rel_target):
479            offending_file = os.path.relpath(fullpath, conanfile.package_folder)
480            offending_files.append(offending_file)
481            conanfile.output.error("{token} '{item}' links to a {token} outside the package, "
482                                   "it's been removed.".format(item=offending_file, token=token))
483            os.unlink(fullpath)
484        elif not os.path.exists(link_abs_target):
485            # This is a broken symlink. Failure is controlled by config variable
486            #  'general.skip_broken_symlinks_check'. Do not fail here.
487            offending_file = os.path.relpath(fullpath, conanfile.package_folder)
488            offending_files.append(offending_file)
489            conanfile.output.error("{token} '{item}' links to a path that doesn't exist, it's"
490                                   " been removed.".format(item=offending_file, token=token))
491            os.unlink(fullpath)
492        elif link_target != link_rel_target:
493            os.unlink(fullpath)
494            os.symlink(link_rel_target, fullpath)
495
496    for (dirpath, dirnames, filenames) in os.walk(conanfile.package_folder):
497        for filename in filenames:
498            work_on_element(dirpath, filename, token="file")
499
500        for dirname in dirnames:
501            work_on_element(dirpath, dirname, token="directory")
502
503    if offending_files and raise_if_error:
504        raise ConanException("There are invalid symlinks in the package!")
505