1from __future__ import annotations
2
3import atexit
4import contextlib
5import functools
6import io
7import logging
8import os
9import sys
10from itertools import zip_longest
11from tempfile import mktemp
12from typing import Any, Callable, Iterator, List, Optional, Sequence, Union
13
14import click
15from click._compat import strip_ansi
16
17from pdm._vendor import colorama, halo
18from pdm._vendor.log_symbols.symbols import is_supported as supports_unicode
19
# Module-level logger; UI.logging() yields this object to its callers.
logger = logging.getLogger(__name__)
# Accept records of every severity at the logger level.
logger.setLevel(logging.DEBUG)
# Keep a no-op handler in slot 0: UI.logging() only replaces ``handlers[1:]``.
logger.addHandler(logging.NullHandler())
23
24
def ljust(text: str, length: int) -> str:
    """Left-justify ``text`` to ``length`` visible columns.

    Works like str.ljust(), except that ANSI control sequences are not counted
    when measuring how wide ``text`` is.
    """
    visible_width = len(strip_ansi(text))
    padding = " " * (length - visible_width)
    return text + padding
28
29
def rjust(text: str, length: int) -> str:
    """Right-justify ``text`` to ``length`` visible columns.

    Works like str.rjust(), except that ANSI control sequences are not counted
    when measuring how wide ``text`` is.
    """
    visible_width = len(strip_ansi(text))
    padding = " " * (length - visible_width)
    return padding + text
33
34
def centerize(text: str, length: int) -> str:
    """Center ``text`` within ``length`` visible columns.

    ANSI control sequences are not counted when measuring ``text``. When the
    padding cannot be split evenly, the extra space goes to the right side.
    """
    total_padding = length - len(strip_ansi(text))
    left_padding = total_padding // 2
    right_padding = total_padding - left_padding
    return "".join((" " * left_padding, text, " " * right_padding))
40
41
def supports_ansi() -> bool:
    """Tell whether ANSI escape sequences should be written to stdout.

    :returns: False when the ``CI`` environment variable is set (non-empty) or
        ``sys.stdout`` has no ``fileno`` attribute. On Windows, True only if one
        of the ``ANSICON``, ``WT_SESSION``, ``ConEmuANSI`` (== "ON") or ``Term``
        (== "xterm") environment variables says so. Elsewhere, whether stdout
        is attached to a TTY.
    """
    if os.getenv("CI") or not hasattr(sys.stdout, "fileno"):
        return False
    if sys.platform == "win32":
        return (
            os.getenv("ANSICON") is not None
            or os.getenv("WT_SESSION") is not None
            or "ON" == os.getenv("ConEmuANSI")
            or "xterm" == os.getenv("Term")
        )

    try:
        return os.isatty(sys.stdout.fileno())
    except (OSError, ValueError):
        # io.UnsupportedOperation derives from both OSError and ValueError, so it
        # is still covered here. A stream that has already been closed raises a
        # plain ValueError from fileno(); treat that as "no ANSI" as well rather
        # than crashing the capability probe.
        return False
57
58
# Export some style shortcut helpers.
# Each one is click.style() with a single keyword pre-filled, so for example
# green("ok") is the same call as click.style("ok", fg="green").
green = functools.partial(click.style, fg="green")
red = functools.partial(click.style, fg="red")
yellow = functools.partial(click.style, fg="yellow")
cyan = functools.partial(click.style, fg="cyan")
blue = functools.partial(click.style, fg="blue")
bold = functools.partial(click.style, bold=True)
66
# Verbosity levels, in increasing order. UI.echo() prints a message only when
# the UI's verbosity is greater than or equal to the message's level.
NORMAL = 0
# From DETAIL upwards UI.logging() uses a stream handler instead of a log file
# and UI.open_spinner() returns the plain-text DummySpinner.
DETAIL = 1
DEBUG = 2
71
72
class DummySpinner:
    """Plain-text stand-in for a spinner.

    Exposes the spinner methods this module needs, but just echoes the text to
    the screen instead of animating anything.
    """

    def start(self, text: str) -> None:
        """Print ``text`` on its own line."""
        click.echo(text)

    # Reporting success or failure is the same operation as start().
    succeed = start
    fail = start

    def stop_and_persist(self, symbol: str = " ", text: Optional[str] = None) -> None:
        """Print ``symbol``, one space, then ``text`` (empty when not given)."""
        message = text if text else ""
        click.echo(symbol + " " + message)

    @property
    def text(self) -> str:
        """Always read back as an empty string; assigning prints the value."""
        return ""

    @text.setter
    def text(self, value: str) -> None:
        click.echo(value)

    def __enter__(self) -> DummySpinner:
        """Enter the context and hand back this spinner."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Leave the context; there is nothing to tear down."""
        return None
93
94
class UI:
    """Terminal UI object.

    Bundles the output helpers of this module: messages are filtered by a
    verbosity threshold, prefixed with the current indentation and coloured
    only when ANSI output is enabled.
    """

    def __init__(self, verbosity: int = NORMAL, no_ansi: Optional[bool] = None) -> None:
        """Create the UI.

        :param verbosity: initial verbosity threshold (NORMAL, DETAIL or DEBUG).
        :param no_ansi: True/False force ANSI output off/on; None (the default)
            auto-detects it with supports_ansi().
        """
        self.verbosity = verbosity
        self._indent = ""
        self.supports_ansi = not no_ansi if no_ansi is not None else supports_ansi()
        # NOTE(review): colorama is initialised only when ANSI is NOT supported,
        # presumably so that it filters/translates escape sequences -- confirm
        # against the vendored colorama.
        if not self.supports_ansi:
            colorama.init()
        else:
            colorama.deinit()

    def set_verbosity(self, verbosity: int) -> None:
        """Replace the verbosity threshold used by echo() and friends."""
        self.verbosity = verbosity

    def echo(
        self,
        message: str = "",
        err: bool = False,
        verbosity: int = NORMAL,
        **kwargs: Any,
    ) -> None:
        """Print ``message`` if the UI is at least as verbose as ``verbosity``.

        :param message: the text to print; it is converted with str() and
            prefixed with the current indentation.
        :param err: forwarded to click.secho() as ``err``.
        :param verbosity: minimum UI verbosity required for the message to show.
        :param kwargs: extra keyword arguments forwarded to click.secho().
        """
        if self.verbosity >= verbosity:
            click.secho(
                self._indent + str(message), err=err, color=self.supports_ansi, **kwargs
            )

    def display_columns(
        self, rows: Sequence[Sequence[str]], header: Optional[List[str]] = None
    ) -> None:
        """Print rows in aligned columns.

        :param rows: a rows of data to be displayed.
        :param header: a list of header strings. A header may start with ``>``,
            ``^`` or ``<`` to right-align, center or left-align its column; the
            marker is not printed. Columns are left-aligned by default.

        The ``header`` list passed in is left untouched.
        """
        alignment_markers = {">": rjust, "^": centerize, "<": ljust}

        # Split every header into (aligner, visible title) on a private copy, so
        # the caller's list is not modified and can be reused for another call.
        titles: List[str] = []
        aligners: List[Callable] = []
        for head in header or []:
            # head[:1] is "" for an empty header, which is simply "no marker".
            marker = head[:1]
            if marker in alignment_markers:
                aligners.append(alignment_markers[marker])
                titles.append(head[1:])
            else:
                aligners.append(ljust)
                titles.append(head)

        # Measure after the markers are stripped: they are never printed, so they
        # must not widen the column.
        sizes = [
            max(len(strip_ansi(cell)) for cell in column)
            for column in zip_longest(titles, *rows, fillvalue="")
        ]
        # Columns beyond the header (or all of them without a header) are
        # left-aligned instead of being dropped by zip().
        aligners.extend([ljust] * (len(sizes) - len(aligners)))

        if header:
            self.echo(
                " ".join(
                    aligner(title, size)
                    for aligner, title, size in zip(aligners, titles, sizes)
                ).rstrip()
            )
            # Print a separator
            self.echo(" ".join("-" * size for size in sizes))
        for row in rows:
            self.echo(
                " ".join(
                    aligner(item, size)
                    for aligner, item, size in zip(aligners, row, sizes)
                ).rstrip()
            )

    @contextlib.contextmanager
    def indent(self, prefix: str) -> Iterator[None]:
        """Indent the following lines with a prefix.

        The previous indentation is restored when the ``with`` block ends, even
        if it ends with an exception.
        """
        previous = self._indent
        self._indent += prefix
        try:
            yield
        finally:
            self._indent = previous

    @contextlib.contextmanager
    def logging(self, type_: str = "install") -> Iterator[logging.Logger]:
        """A context manager that opens a file for logging when verbosity is NORMAL or
        print to the stdout otherwise.

        :param type_: a tag embedded in the temporary log file name.
        :returns: yields the module-level logger with the handler attached.

        If the body raises while logging to a file, the traceback is written to
        that file and its path is shown to the user; the file is then kept.
        After a successful run the file is removed at interpreter exit.
        """
        # Local import: mkstemp() creates the file atomically, unlike the
        # race-prone mktemp() that only reserves a name.
        from tempfile import mkstemp

        file_name: Optional[str] = None
        if self.verbosity >= DETAIL:
            handler = logging.StreamHandler()
        else:
            fd, file_name = mkstemp(suffix=".log", prefix=f"pdm-{type_}-")
            # FileHandler reopens the path itself; only the name is needed.
            os.close(fd)
            handler = logging.FileHandler(file_name, encoding="utf-8")
        handler.setLevel(logging.DEBUG)
        # Slot 0 holds the module's NullHandler; replace everything after it.
        logger.handlers[1:] = [handler]
        pip_logger = logging.getLogger("pip.subprocessor")
        pip_logger.handlers[:] = [handler]

        def cleanup() -> None:
            try:
                os.unlink(file_name)
            except OSError:
                pass

        try:
            yield logger
        except Exception:
            if file_name is not None:
                logger.exception("Error occurs")
                self.echo(yellow(f"See {file_name} for detailed debug log."), err=True)
            raise
        else:
            # Nothing to delete when the records went to a stream.
            if file_name is not None:
                atexit.register(cleanup)
        finally:
            # removeHandler() is a no-op if the handler was already replaced, so
            # it cannot mask the original exception the way list.remove() could.
            logger.removeHandler(handler)
            pip_logger.removeHandler(handler)
            # Release the log file handle instead of leaking it until exit.
            handler.close()

    def open_spinner(
        self, title: str, spinner: str = "dots"
    ) -> Union[DummySpinner, halo.Halo]:
        """Open a spinner as a context manager.

        :param title: the text passed to the spinner.
        :param spinner: the spinner style name passed to halo.
        :returns: a DummySpinner when verbosity is DETAIL or higher or ANSI is
            not supported, otherwise a halo.Halo indented like the UI.
        """
        if self.verbosity >= DETAIL or not self.supports_ansi:
            return DummySpinner()
        else:
            return halo.Halo(  # type: ignore
                title, spinner=spinner, indent=self._indent
            )
222
223
class Emoji:
    """A collection of emoji characters used in terminal output"""

    # NOTE(review): both literals were unreadable U+FFFD replacement characters
    # in the reviewed copy. They are restored as explicit escapes so they survive
    # any further re-encoding -- confirm the intended emoji against upstream.
    if supports_unicode():  # type: ignore
        SUCC = "\U0001F389"  # party popper
        LOCK = "\U0001F512"  # lock
    else:
        # Fall back to empty strings when supports_unicode() reports no support.
        SUCC = ""
        LOCK = ""
233