1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6"""
7Logging for Azure CLI
8
- Loggers: The name of the parent logger is defined in the CLI_LOGGER_NAME variable. All the loggers used in the CLI
           must descend from it, or they won't benefit from the logger handlers, filters and level configuration.
11
- Handlers: Two default handlers are added to both the CLI parent logger and the root logger. One is a colorized
            stream handler for console output and the other is a file logger handler. The file logger can be enabled or
            disabled through the 'az configure' command. The log file is located at the path defined in AZ_LOGFILE_DIR.
15
16- Level: Based on the verbosity option given by users, the logging levels for root and CLI parent loggers are:
17
18               CLI Parent                  Root
19            Console     File        Console     File
20omitted     Warning     Debug       Critical    Debug
21--verbose   Info        Debug       Critical    Debug
22--debug     Debug       Debug       Debug       Debug
23
24"""
25
26import os
27import logging
28import datetime
29
30from azure.cli.core.commands.events import EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE
31
32from knack.events import EVENT_CLI_POST_EXECUTE
33from knack.log import CLILogging, get_logger, LOG_FILE_ENCODING
34from knack.util import ensure_dir
35
36
# Command name used in the command log file name when no command can be determined from the arguments.
_UNKNOWN_COMMAND = "unknown_command"
# Marker placed at the start of every record written to a command log file.
_CMD_LOG_LINE_PREFIX = "CMD-LOG-LINE-BEGIN"
39
40
class AzCliLogging(CLILogging):
    """Azure CLI logging that also records metadata about each command run in a dedicated log file.

    In addition to what ``CLILogging`` provides, a per-command log file is created in the ``commands`` folder of
    the CLI config directory, unless the command line is only being tab-completed or file logging is disabled in
    the configuration. The file receives the command line with argument values replaced by placeholders,
    extension details and the exit code.
    """
    COMMAND_METADATA_LOGGER = 'az_command_data_logger'

    def __init__(self, name, cli_ctx=None):
        """Set up the command log directory and subscribe to the events that start and stop command logging.

        :param name: Passed through to ``CLILogging``.
        :param cli_ctx: CLI context. Although it defaults to None, its ``config`` is read here, so a real
                        context is required.
        """
        super(AzCliLogging, self).__init__(name, cli_ctx)
        self.command_log_dir = os.path.join(cli_ctx.config.config_dir, 'commands')
        # Both are populated by _init_command_logfile_handlers once a command log file has been opened.
        self.command_logger_handler = None
        self.command_metadata_logger = None
        self.cli_ctx.register_event(EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE, AzCliLogging.init_command_file_logging)
        self.cli_ctx.register_event(EVENT_CLI_POST_EXECUTE, AzCliLogging.deinit_cmd_metadata_logging)

    def configure(self, args):
        """Configure logging from the command line arguments and silence the redacted HTTP logger on DEBUG.

        :param args: Command line arguments, passed through to ``CLILogging.configure``.
        """
        super(AzCliLogging, self).configure(args)
        from knack.log import CliLogLevel
        if self.log_level == CliLogLevel.DEBUG:
            # As azure.core.pipeline.policies.http_logging_policy is a redacted version of
            # azure.core.pipeline.policies._universal, disable azure.core.pipeline.policies.http_logging_policy
            # when debug log is shown.
            logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.CRITICAL)

    def get_command_log_dir(self):
        """Return the directory in which the per-command log files are stored."""
        return self.command_log_dir

    @staticmethod
    def init_command_file_logging(cli_ctx, **kwargs):
        """Event handler (EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE) that starts file logging for the current command.

        Nothing happens while tab completion is active or when file logging is disabled in the configuration.

        :param cli_ctx: CLI context whose ``logging`` attribute is the ``AzCliLogging`` instance to set up.
        :param kwargs: Event data; ``kwargs['args']`` holds the command line arguments.
        """
        def _delete_old_logs(log_dir):
            """
            Periodically delete the 5 oldest command log files, ensuring that only the history of the last
            25 commands (or less) are kept.
            """
            # File names start with the date and time, so a reverse name sort lists the newest file first.
            log_file_names = [file for file in os.listdir(log_dir) if file.endswith(".log")]
            sorted_files = sorted(log_file_names, reverse=True)

            if len(sorted_files) <= 25:
                return

            # too many files: delete the 5 last / oldest command log files.
            for file in sorted_files[-5:]:
                try:
                    os.remove(os.path.join(log_dir, file))
                except OSError:  # best effort, e.g. the file has already been removed
                    continue

        # if tab-completion and not command don't log to file.
        if cli_ctx.data.get('completer_active', False):
            return

        az_logging = cli_ctx.logging
        args = kwargs['args']
        metadata_logger = logging.getLogger(AzCliLogging.COMMAND_METADATA_LOGGER)

        # Overwrite the default of knack.log.CLILogging._is_file_log_enabled() to True
        az_logging.file_log_enabled = cli_ctx.config.getboolean('logging', 'enable_log_file', fallback=True)

        if az_logging.file_log_enabled:
            az_logging._init_command_logfile_handlers(metadata_logger, args)  # pylint: disable=protected-access
            _delete_old_logs(az_logging.command_log_dir)

    def _init_command_logfile_handlers(self, command_metadata_logger, args):
        """Create the log file of the current command and attach its handler to ``command_metadata_logger``.

        The file is named ``<date>.<HH-MM-SS>.<command>.<pid>.log``. No file is created for the ``feedback``
        command. The first record written is the command line with its argument values replaced by
        placeholders.

        :param command_metadata_logger: Logger that receives the file handler.
        :param args: Command line arguments of the current invocation.
        """
        ensure_dir(self.command_log_dir)
        command = self.cli_ctx.invocation._rudimentary_get_command(args) or _UNKNOWN_COMMAND  # pylint: disable=protected-access, line-too-long
        command_str = command.replace(" ", "_")
        if command_str.lower() == "feedback":
            return

        # Take a single timestamp so that the date and time parts cannot disagree around midnight.
        now = datetime.datetime.now()
        date_str = str(now.date())
        time_str = "{:02}-{:02}-{:02}".format(now.hour, now.minute, now.second)

        log_name = "{}.{}.{}.{}.{}".format(date_str, time_str, command_str, os.getpid(), "log")

        log_file_path = os.path.join(self.command_log_dir, log_name)
        get_logger(__name__).debug("metadata file logging enabled - writing logs to '%s'.", log_file_path)

        logfile_handler = logging.FileHandler(log_file_path, encoding=LOG_FILE_ENCODING)

        lfmt = logging.Formatter(_CMD_LOG_LINE_PREFIX + ' %(process)d | %(asctime)s | %(levelname)s | %(name)s | %(message)s')  # pylint: disable=line-too-long
        logfile_handler.setFormatter(lfmt)
        logfile_handler.setLevel(logging.DEBUG)
        # command_metadata_logger should always accept all levels regardless of the root logger level.
        command_metadata_logger.setLevel(logging.DEBUG)
        command_metadata_logger.addHandler(logfile_handler)

        self.command_logger_handler = logfile_handler
        self.command_metadata_logger = command_metadata_logger

        args = AzCliLogging._get_clean_args(command if command != _UNKNOWN_COMMAND else None, args)
        command_metadata_logger.info("command args: %s", " ".join(args))

    @staticmethod
    def _get_clean_args(command, args):  # TODO: add test for this function
        """Return a copy of ``args`` in which argument values are replaced by the ``{}`` placeholder.

        The leading tokens that make up the command and the names of options are kept; everything else is
        replaced, e.g. ``--name=foo`` becomes ``--name={}`` and ``-nfoo`` becomes ``-n{}``.

        :param command: The command (tokens separated by spaces), or a falsy value to guess it from ``args``.
        :param args: Command line arguments.
        :return: List of sanitized arguments, one entry per entry of ``args``.
        """
        # based on AzCliCommandInvoker._extract_parameter_names(args)
        # note: name start with more than 2 '-' will be treated as value e.g. certs in PEM format
        def _is_option(token):
            return token.startswith('-') and not token.startswith('---') and len(token) > 1

        # if no command provided, try to guess the intended command. This does not work for positionals
        if not command:
            command_list = []
            for arg in args:
                if _is_option(arg):
                    break
                command_list.append(arg)
            command = " ".join(command_list)

        command_length = len(command.split())
        placeholder = "{}"
        cleaned_args = []
        for i, arg in enumerate(args):
            if i < command_length and not _is_option(arg):
                # This token is part of the command, keep it.
                # Note: if 'command' is none first positional would be captured.
                # Option-like tokens are never kept verbatim: an argument containing whitespace makes the
                # command's token count larger than the number of arguments forming it, which would
                # otherwise let an option with an inline value (e.g. --password=...) through unredacted.
                cleaned_args.append(arg)
            elif not _is_option(arg):
                # positional or optional argument / value
                cleaned_args.append(placeholder)
            elif not arg.startswith("--"):
                # short option with or without "="
                opt = arg[:2]
                if len(arg) > 2 and arg[2] == "=":
                    opt += "="
                if (len(arg) > 2 and arg[2] != "=") or len(arg) > 3:
                    # a value is attached to the option name
                    opt += placeholder
                cleaned_args.append(opt)
            elif "=" in arg:
                # long option with "="
                cleaned_args.append(arg.split('=', 1)[0] + "=" + placeholder)
            else:
                # long option name on its own
                cleaned_args.append(arg)

        return cleaned_args

    def log_cmd_metadata_extension_info(self, extension_name, extension_version):
        """Record the extension name and version in the command log file, if command logging is active."""
        if self.command_metadata_logger:
            self.command_metadata_logger.info("extension name: %s", extension_name)
            self.command_metadata_logger.info("extension version: %s", extension_version)

    @staticmethod
    def deinit_cmd_metadata_logging(cli_ctx):
        """Event handler (EVENT_CLI_POST_EXECUTE) that ends command logging; exit code 128 if no result."""
        cli_ctx.logging.end_cmd_metadata_logging(cli_ctx.result.exit_code if cli_ctx.result else 128)

    def end_cmd_metadata_logging(self, exit_code):  # kept without '_' prefix so as not to break users
        """Record the exit code and detach and close the command log file handler.

        Does nothing when command logging is not active.

        :param exit_code: Exit code to write to the command log file.
        """
        if self.command_metadata_logger:
            self.command_metadata_logger.info("exit code: %s", exit_code)

            # We have finished metadata logging, remove the handler and set command_metadata_logger to None.
            # crucial to remove handler as in python logger objects are shared which can affect testing of this logger
            # we do not want duplicate handlers to be added in subsequent calls of _init_command_logfile_handlers
            # Detach before closing so that the shared logger never holds a closed handler.
            self.command_metadata_logger.removeHandler(self.command_logger_handler)
            self.command_logger_handler.close()
            self.command_metadata_logger = None
200
201
class CommandLoggerContext:
    """Context manager that mirrors a logger's error records into az_command_data_logger.

    While the context is active, every ``module_logger.error(...)`` call is also sent to the command metadata
    logger, for `az feedback` usage.
    """
    def __init__(self, module_logger):
        """Prepare (but do not yet apply) the replacement of ``module_logger.error``.

        :param module_logger: Logger whose ``error`` method is duplicated while the context is active.
        """
        from unittest import mock

        command_data_logger = logging.getLogger(AzCliLogging.COMMAND_METADATA_LOGGER)
        log_error = module_logger.error

        def error_duplicated(*args, **kwargs):
            # Emit through the logger's own error method first, then repeat for the metadata logger.
            log_error(*args, **kwargs)
            command_data_logger.error(*args, **kwargs)

        # The patch takes effect in __enter__ and is undone in __exit__.
        self.mock_cm = mock.patch.object(module_logger, 'error', new=error_duplicated)

    def __enter__(self):
        """Start duplicating error logs and return this context."""
        self.mock_cm.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop duplicating error logs by undoing the patch."""
        self.mock_cm.__exit__(exc_type, exc_val, exc_tb)
223