1# © 2020 James R. Barlow: github.com/jbarlow83
2#
3# This Source Code Form is subject to the terms of the Mozilla Public
4# License, v. 2.0. If a copy of the MPL was not distributed with this
5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7
8"""Wrappers to manage subprocess calls"""
9
10import logging
11import os
12import re
13import sys
14from collections.abc import Mapping
15from contextlib import suppress
16from distutils.version import LooseVersion, Version
17from functools import lru_cache
18from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen
19from subprocess import run as subprocess_run
20from typing import Callable, Optional, Type, Union
21
22from ocrmypdf.exceptions import MissingDependencyError
23
24# pylint: disable=logging-format-interpolation
25
26log = logging.getLogger(__name__)
27
28
def run(
    args, *, env=None, logs_errors_to_stdout: bool = False, **kwargs
) -> CompletedProcess:
    """Run a subprocess through :py:func:`subprocess.run` and log its errors.

    This wrapper exists so that subprocess output is logged in an orderly
    fashion that identifies the responsible subprocess: the captured error
    stream is written, at DEBUG level, to a child logger named after the
    program being run. The arguments are first normalized by
    ``_fix_process_args``, which on Windows also adjusts them to help locate
    dependencies that are not on the system PATH.

    All arguments are forwarded to ``subprocess.run`` unchanged, except for
    the following:

    Arguments:
        logs_errors_to_stdout: If True, the process is known to write its
            error messages to stdout rather than stderr, so the captured
            stdout is what gets logged. If False, the captured stderr is
            logged. Useful together with ``stderr=STDOUT, stdout=PIPE``.

    Returns:
        The ``CompletedProcess`` produced by ``subprocess.run``.

    Raises:
        CalledProcessError: Propagated unchanged from ``subprocess.run`` (for
            example when ``check=True`` is passed); the error stream attached
            to the exception is still logged before it propagates.
    """
    args, env, process_log, _text = _fix_process_args(args, env, kwargs)

    # Decide up front which captured stream represents "errors" and how the
    # log line should be labelled.
    if logs_errors_to_stdout:
        stream_attr, log_format = 'stdout', "stdout/stderr = %s"
    else:
        stream_attr, log_format = 'stderr', "stderr = %s"

    captured = None
    try:
        proc = subprocess_run(args, env=env, **kwargs)
        captured = getattr(proc, stream_attr, None)
    except CalledProcessError as exc:
        # Keep the failing process's output so it is logged below, then let
        # the caller deal with the failure.
        captured = getattr(exc, stream_attr, None)
        raise
    finally:
        if process_log.isEnabledFor(logging.DEBUG) and captured:
            # Captured output is bytes in binary mode and str in text mode;
            # str has no .decode(), in which case it is logged as-is.
            try:
                captured = captured.decode('utf-8', 'replace')
            except (AttributeError, UnicodeDecodeError):
                pass
            process_log.debug(log_format, captured)
    return proc
67
68
def run_polling_stderr(
    args, *, callback: Callable[[str], None], check: bool = False, env=None, **kwargs
) -> CompletedProcess:
    """Run a process like ``ocrmypdf.subprocess.run``, streaming its stderr.

    Every line the process writes to stderr is forwarded to ``callback`` as
    soon as it is read. The intended use is monitoring progress of
    subprocesses that output their own progress indicators. In addition, each
    line is logged if debug logging is enabled.

    Requires stderr to be opened in text mode (``text=True``) for ease of
    handling errors. In addition the expected ``encoding=`` and ``errors=``
    arguments should be set. Note that if stdout is already set up, it need
    not be binary. stdout is never read here; the returned object's
    ``stdout`` is always None.

    Arguments:
        args: Command line to run, as for ``subprocess.Popen``.
        callback: Called once per stderr line, with the line (including its
            trailing newline, if any) as the only argument.
        check: If True, raise ``CalledProcessError`` when the process exits
            with a non-zero return code.
        env: Environment for the child; a falsy value means ``os.environ``.
        **kwargs: Forwarded to ``subprocess.Popen``.

    Returns:
        A ``CompletedProcess`` whose ``stderr`` is the concatenation of all
        lines read (an empty string if stderr was not piped).

    Raises:
        CalledProcessError: If ``check`` is True and the return code is
            non-zero.
    """
    args, env, process_log, text = _fix_process_args(args, env, kwargs)
    assert text, "Must use text=True"

    with Popen(args, env=env, **kwargs) as proc:
        lines = []
        # Read stderr until EOF rather than only while poll() reports the
        # process as running: a child that exits before the first poll()
        # would otherwise have its buffered output dropped, and a child
        # without a stderr pipe would cause a busy-wait loop.
        if proc.stderr is not None:
            # In text mode readline() returns '' only at end of file.
            for msg in iter(proc.stderr.readline, ''):
                if process_log.isEnabledFor(logging.DEBUG):
                    process_log.debug(msg.strip())
                callback(msg)
                lines.append(msg)
        # Block until the child has exited so that returncode is populated.
        proc.wait()
        stderr = ''.join(lines)

        if check and proc.returncode != 0:
            raise CalledProcessError(proc.returncode, args, output=None, stderr=stderr)
        return CompletedProcess(args, proc.returncode, None, stderr=stderr)
101
102
def _fix_process_args(args, env, kwargs):
    """Normalize subprocess arguments shared by the wrappers in this module.

    Arguments:
        args: Command line; ``args[0]`` is taken as the program.
        env: Environment mapping; a falsy value is replaced by ``os.environ``.
        kwargs: Keyword arguments destined for ``subprocess``. This dict is
            modified in place on Python older than 3.7.

    Returns:
        A tuple ``(args, env, process_log, text)`` where ``process_log`` is a
        child logger named after the program's basename and ``text`` is the
        value of the caller's ``text`` keyword (False if absent).
    """
    assert 'universal_newlines' not in kwargs, "Use text= instead of universal_newlines"

    env = env or os.environ
    on_windows = os.name == 'nt'

    # Search in spoof path if necessary
    program = str(args[0])
    if on_windows:
        from ocrmypdf.subprocess._windows import fix_windows_args

        args = fix_windows_args(program, args, env)

    log.debug("Running: %s", args)
    process_log = log.getChild(os.path.basename(program))

    # Read text= before it is possibly renamed below.
    text = kwargs.get('text', False)

    legacy_python = sys.version_info < (3, 7)
    if legacy_python and on_windows:
        # Can't use close_fds=True on Windows with Python 3.6 or older
        # https://bugs.python.org/issue19575, etc.
        kwargs['close_fds'] = False
    if legacy_python and 'text' in kwargs:
        # Convert run(...text=) to run(...universal_newlines=) for Python 3.6
        kwargs['universal_newlines'] = kwargs.pop('text')

    return args, env, process_log, text
130
131
@lru_cache(maxsize=None)
def get_version(
    program: str, *, version_arg: str = '--version', regex=r'(\d+(\.\d+)*)', env=None
):
    """Get the version of the specified program

    The program is run once with ``version_arg``; its stdout and stderr are
    captured together and ``regex`` is applied (with :py:func:`re.match`, i.e.
    anchored at the start) to the stripped output.

    Results are memoized with ``lru_cache``, so every argument, including
    ``env``, must be hashable.

    Arguments:
        program: The program to version check.
        version_arg: The argument needed to ask for its version, e.g. ``--version``.
        regex: A regular expression to parse the program's output and obtain the
            version. Its first capture group is returned.
        env: Custom ``os.environ`` in which to run program.

    Returns:
        The text matched by the first capture group of ``regex``.

    Raises:
        MissingDependencyError: If the program cannot be found, exits with an
            error, or produces output that ``regex`` does not match.
    """
    args_prog = [program, version_arg]
    try:
        proc = run(
            args_prog,
            close_fds=True,
            text=True,
            stdout=PIPE,
            stderr=STDOUT,
            check=True,
            env=env,
        )
    except FileNotFoundError as e:
        raise MissingDependencyError(
            f"Could not find program '{program}' on the PATH"
        ) from e
    except CalledProcessError as e:
        # With check=True, CalledProcessError is only raised for a non-zero
        # exit status, so there is no "returncode == 0" case to handle here.
        raise MissingDependencyError(
            f"Ran program '{program}' but it exited with an error:\n{e.output}"
        ) from e

    output = proc.stdout
    match = re.match(regex, output.strip())
    if not match:
        raise MissingDependencyError(
            f"The program '{program}' did not report its version. "
            f"Message was:\n{output}"
        )
    return match.group(1)
179
180
# Message templates used by the _error_* helpers in this module. Each one is
# filled in with str.format(); the placeholders each template expects are
# listed above it.

# Placeholders: {program}. Logged by _error_missing_program when the program
# is neither recommended nor tied to specific arguments.
missing_program = '''
The program '{program}' could not be executed or was not found on your
system PATH.
'''

# Placeholders: {program}, {required_for}. Logged by _error_missing_program
# when the program is only needed for certain arguments.
missing_optional_program = '''
The program '{program}' could not be executed or was not found on your
system PATH.  This program is required when you use the
{required_for} arguments.  You could try omitting these arguments, or install
the package.
'''

# Placeholders: {program}, {required_for}. Logged (as a warning) by
# _error_missing_program when the program is merely recommended.
missing_recommend_program = '''
The program '{program}' could not be executed or was not found on your
system PATH.  This program is recommended when using the {required_for} arguments,
but not required, so we will proceed.  For best results, install the program.
'''

# Placeholders: {program}, {need_version}, {found_version}. Logged by
# _error_old_version when no required_for is given.
old_version = '''
OCRmyPDF requires '{program}' {need_version} or higher.  Your system appears
to have {found_version}.  Please update this program.
'''

# Placeholders: {program}, {need_version}, {required_for}. Logged by
# _error_old_version when required_for is given. Note that this template
# does not report the version that was found.
old_version_required_for = '''
OCRmyPDF requires '{program}' {need_version} or higher when run with the
{required_for} arguments.  If you omit these arguments, OCRmyPDF may be able to
proceed.  For best results, install the program.
'''

# Placeholders: {package}. Logged by _error_trailer on the 'darwin' platform.
osx_install_advice = '''
If you have homebrew installed, try these command to install the missing
package:
    brew install {package}
'''

# Placeholders: {package}, {program}. Logged by _error_trailer on the 'linux'
# platform.
linux_install_advice = '''
On systems with the aptitude package manager (Debian, Ubuntu), try these
commands:
    sudo apt-get update
    sudo apt-get install {package}

On RPM-based systems (Red Hat, Fedora), search for instructions on
installing the RPM for {program}.
'''

# Placeholders: {package}. Logged by _error_trailer on the 'windows' platform.
windows_install_advice = '''
If not already installed, install the Chocolatey package manager. Then use
a command prompt to install the missing package:
    choco install {package}
'''
231
232
def _get_platform():
    """Return a normalized platform name derived from ``sys.platform``.

    Platforms whose ``sys.platform`` starts with ``freebsd``, ``linux`` or
    ``win`` are reported as ``'freebsd'``, ``'linux'`` and ``'windows'``
    respectively; any other value of ``sys.platform`` is returned unchanged.
    """
    prefix_to_name = (
        ('freebsd', 'freebsd'),
        ('linux', 'linux'),
        ('win', 'windows'),
    )
    for prefix, name in prefix_to_name:
        if sys.platform.startswith(prefix):
            return name
    return sys.platform
241
242
def _error_trailer(program, package, **kwargs):
    """Log platform-specific advice on how to install ``package``.

    Arguments:
        program: Name of the program; used as the fallback package name and
            in the Linux advice text.
        package: Either a package name, or a mapping from platform name (as
            returned by ``_get_platform``) to package name. When a mapping
            has no entry for the current platform, ``program`` is used.
        **kwargs: Accepted and ignored, so that callers can pass along their
            full set of fields.

    Nothing is logged on platforms other than darwin, linux and windows.
    """
    platform = _get_platform()
    if isinstance(package, Mapping):
        package = package.get(platform, program)

    advice_by_platform = {
        'darwin': osx_install_advice,
        'linux': linux_install_advice,
        'windows': windows_install_advice,
    }
    advice = advice_by_platform.get(platform)
    if advice is not None:
        log.info(advice.format(program=program, package=package))
253
254
def _error_missing_program(program, package, required_for, recommended):
    """Log that ``program`` could not be run, then log install advice.

    A warning is logged if the program is only ``recommended``; otherwise an
    error is logged, worded according to whether ``required_for`` is set.
    """
    fields = dict(
        program=program,
        package=package,
        required_for=required_for,
        recommended=recommended,
    )

    if recommended:
        emit, template = log.warning, missing_recommend_program
    elif required_for:
        emit, template = log.error, missing_optional_program
    else:
        emit, template = log.error, missing_program

    emit(template.format(**fields))
    _error_trailer(**fields)
263
264
def _error_old_version(program, package, need_version, found_version, required_for):
    """Log that ``program`` is older than ``need_version``, then install advice.

    The wording depends on whether the requirement applies only when certain
    arguments (``required_for``) are used.
    """
    fields = dict(
        program=program,
        package=package,
        need_version=need_version,
        found_version=found_version,
        required_for=required_for,
    )
    template = old_version_required_for if required_for else old_version
    log.error(template.format(**fields))
    _error_trailer(**fields)
271
272
def check_external_program(
    *,
    program: str,
    package: str,
    version_checker: Union[str, Callable],
    need_version: str,
    required_for: Optional[str] = None,
    recommended=False,
    version_parser: Type[Version] = LooseVersion,
):
    """Check for required version of external program and raise exception if not.

    Args:
        program: The name of the program to test.
        package: The name of a software package that typically supplies this program.
            Usually the same as program.
        version_checker: Either the installed version as a string, or a callable
            without arguments that retrieves the installed version of program.
        need_version: The minimum required version.
        required_for: The name of an argument or feature that requires this program.
        recommended: If this external program is recommended, instead of raising
            an exception, log a warning and allow execution to continue.
        version_parser: A class that should be used to parse and compare version
            numbers. Used when version numbers do not follow standard conventions.

    Raises:
        MissingDependencyError: If the program is missing or older than
            ``need_version`` and ``recommended`` is false.
    """

    try:
        if callable(version_checker):
            found_version = version_checker()
        else:
            found_version = version_checker
    except (CalledProcessError, FileNotFoundError, MissingDependencyError) as e:
        _error_missing_program(program, package, required_for, recommended)
        if not recommended:
            raise MissingDependencyError(program) from e
        return

    def remove_leading_v(s):
        if s.startswith('v'):
            return s[1:]
        return s

    # An empty or None version means "could not be determined"; the version
    # comparison below is skipped for it, so it must not be string-processed
    # here either.
    if found_version:
        found_version = remove_leading_v(found_version)
    need_version = remove_leading_v(need_version)

    if found_version and version_parser(found_version) < version_parser(need_version):
        _error_old_version(program, package, need_version, found_version, required_for)
        if not recommended:
            raise MissingDependencyError(program)

    log.debug('Found %s %s', program, found_version)
324