# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
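"""Helpers for streaming the logs of an ACR run, either from an append blob or a log artifact link."""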
from io import BytesIO
import time
from random import uniform
import colorama
import requests
from knack.util import CLIError
from knack.log import get_logger
from msrestazure.azure_exceptions import CloudError
from azure.common import AzureHttpError
from azure.cli.core.profiles import ResourceType, get_sdk
from ._azure_utils import get_blob_info
from ._constants import ACR_RUN_DEFAULT_TIMEOUT_IN_SEC

logger = get_logger(__name__)

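# Size in bytes of each ranged read against the log blob (4 KiB).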
DEFAULT_CHUNK_SIZE = 1024 * 4


def stream_logs(cmd, client,
                run_id,
                registry_name,
                resource_group_name,
                timeout=ACR_RUN_DEFAULT_TIMEOUT_IN_SEC,
                no_format=False,
                raise_error_on_failure=False):
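    """Stream the logs of an ACR run to stdout.

    Requests a SAS URL for the run's log; when the service returns a log
    artifact link, the artifact is streamed over HTTP, otherwise the log is
    tailed from an append blob as it is written.
    """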
    log_file_sas = None
    artifact = False
    error_msg = "Could not get logs for ID: {}".format(run_id)

    try:
        response = client.get_log_sas_url(
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            run_id=run_id)
        if not response.log_artifact_link:
            log_file_sas = response.log_link
        else:
            log_file_sas = response.log_artifact_link
            artifact = True
    except (AttributeError, CloudError) as e:
        logger.debug("%s Exception: %s", error_msg, e)
        raise CLIError(error_msg)

    if not log_file_sas:
        logger.debug("%s Empty SAS URL.", error_msg)
        raise CLIError(error_msg)

    if not artifact:
        account_name, endpoint_suffix, container_name, blob_name, sas_token = get_blob_info(
            log_file_sas)
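        # Resolve the AppendBlobService class for the active storage SDK profile.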
        AppendBlobService = get_sdk(cmd.cli_ctx, ResourceType.DATA_STORAGE, 'blob#AppendBlobService')
        if not timeout:
            timeout = ACR_RUN_DEFAULT_TIMEOUT_IN_SEC
        _stream_logs(no_format,
                     DEFAULT_CHUNK_SIZE,
                     timeout,
                     AppendBlobService(
                         account_name=account_name,
                         sas_token=sas_token,
                         endpoint_suffix=endpoint_suffix),
                     container_name,
                     blob_name,
                     raise_error_on_failure)
    else:
        _stream_artifact_logs(log_file_sas,
                              no_format)


def _stream_logs(no_format,  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
                 byte_size,
                 timeout_in_seconds,
                 blob_service,
                 container_name,
                 blob_name,
                 raise_error_on_failure):
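    """Tail an append blob, printing complete lines as they arrive.

    Reads the blob in byte_size chunks, flushing everything up to the last
    CRLF, and polls with jittered exponential backoff until the blob's
    metadata marks the run complete or no new content arrives for
    timeout_in_seconds.
    """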

    if not no_format:
        colorama.init()

    log_exist = False
    stream = BytesIO()
    metadata = {}
    start = 0
    end = byte_size - 1
    available = 0
    sleep_time = 1
    max_sleep_time = 15
    num_fails = 0
    num_fails_for_backoff = 3
    consecutive_sleep_in_sec = 0
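    # Backoff policy: after num_fails_for_backoff consecutive polls with no
    # new content, double sleep_time (capped at max_sleep_time). Each sleep
    # also gets 1-2 seconds of random jitter, and consecutive_sleep_in_sec
    # accumulates idle time so polling stops after timeout_in_seconds.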

    # Try to get the initial properties so there's no waiting.
    # If the storage call fails, we'll just sleep and try again later.
    try:
        # Call the "exists" API first so the storage SDK doesn't log a BlobNotFound error
        log_exist = blob_service.exists(
            container_name=container_name, blob_name=blob_name)

        if log_exist:
            props = blob_service.get_blob_properties(
                container_name=container_name, blob_name=blob_name)
            metadata = props.metadata
            available = props.properties.content_length
        else:
            # Wait a little bit before checking for existence again
            time.sleep(1)
    except (AttributeError, AzureHttpError):
        pass

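    # Outer loop: keep going until the blob is marked complete and every
    # available byte has been printed; the inner loop drains whatever is available.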
    while _blob_is_not_complete(metadata) or start < available:
        while start < available:
            # Success! Reset our polling backoff.
            sleep_time = 1
            num_fails = 0
            consecutive_sleep_in_sec = 0

            try:
                old_byte_size = len(stream.getvalue())
                blob_service.get_blob_to_stream(
                    container_name=container_name,
                    blob_name=blob_name,
                    start_range=start,
                    end_range=end,
                    stream=stream)

                curr_bytes = stream.getvalue()
                new_byte_size = len(curr_bytes)
                amount_read = new_byte_size - old_byte_size
                start += amount_read
                end = start + byte_size - 1

                # Only scan what's newly read. If nothing is read, default to 0.
                min_scan_range = max(new_byte_size - amount_read - 1, 0)
                for i in range(new_byte_size - 1, min_scan_range, -1):
                    if curr_bytes[i - 1:i + 1] == b'\r\n':
                        flush = curr_bytes[:i]  # won't print \n
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        print(flush.decode('utf-8', errors='ignore'))
                        break
            except AzureHttpError as ae:
                if ae.status_code != 404:
                    raise CLIError(ae)
            except KeyboardInterrupt:
                curr_bytes = stream.getvalue()
                if curr_bytes:
                    print(curr_bytes.decode('utf-8', errors='ignore'))
                return

        try:
            if log_exist:
                props = blob_service.get_blob_properties(
                    container_name=container_name, blob_name=blob_name)
                metadata = props.metadata
                available = props.properties.content_length
            else:
                log_exist = blob_service.exists(
                    container_name=container_name, blob_name=blob_name)
        except AzureHttpError as ae:
            if ae.status_code != 404:
                raise CLIError(ae)
        except KeyboardInterrupt:
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))
            return
        except Exception as err:
            raise CLIError(err)

        if consecutive_sleep_in_sec > timeout_in_seconds:
            # Flush anything remaining in the buffer - this would be the case
            # if the file has expired and we weren't able to detect any \r\n
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))

            logger.warning("Failed to find any new logs in %d seconds. Client will stop polling for additional logs.",
                           consecutive_sleep_in_sec)
            return

        # If no new data is available but the run is not complete, sleep before trying again.
        if _blob_is_not_complete(metadata) and start >= available:
            num_fails += 1

            logger.debug(
                "Failed to find new content %d times in a row", num_fails)
            if num_fails >= num_fails_for_backoff:
                num_fails = 0
                sleep_time = min(sleep_time * 2, max_sleep_time)
                logger.debug("Resetting failure count to %d", num_fails)

            rnd = uniform(1, 2)  # 1.0 <= x < 2.0
            total_sleep_time = sleep_time + rnd
            consecutive_sleep_in_sec += total_sleep_time
            logger.debug("Base sleep time: %d, random delay: %.2f, total: %.2f, consecutive: %.2f",
                         sleep_time, rnd, total_sleep_time, consecutive_sleep_in_sec)
            time.sleep(total_sleep_time)
    # One final check to see if there's anything in the buffer to flush.
    # E.g., metadata has been set and start == available, but the log file
    # didn't end in \r\n, so we were unable to flush out the final contents.
    curr_bytes = stream.getvalue()
    if curr_bytes:
        print(curr_bytes.decode('utf-8', errors='ignore'))

    build_status = _get_run_status(metadata).lower()
    logger.debug("status was: '%s'", build_status)

    if raise_error_on_failure:
        if build_status in ('internalerror', 'failed'):
            raise CLIError("Run failed")
        if build_status == 'timedout':
            raise CLIError("Run timed out")
        if build_status == 'canceled':
            raise CLIError("Run was canceled")


def _stream_artifact_logs(log_file_sas,
                          no_format):
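    """Stream a log artifact from its SAS URL and print it line by line."""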

    if not no_format:
        colorama.init()

    try:
        response = requests.get(log_file_sas, timeout=3, verify=False, stream=True)
        response.raise_for_status()
    except KeyboardInterrupt:
        return
    except Exception as err:
        raise CLIError(err)

    for line in response.iter_lines():
        print(line.decode('utf-8', errors='ignore'))


def _blob_is_not_complete(metadata):
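    """Return True while the blob's metadata lacks a 'complete' key."""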
    if not metadata:
        return True
    for key in metadata:
        if key.lower() == 'complete':
            return False
    return True


def _get_run_status(metadata):
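    """Return the run status stored under the 'complete' metadata key, or 'inprogress'."""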
    if metadata is None:
        return 'inprogress'
    for key in metadata:
        if key.lower() == 'complete':
            return metadata[key]
    return 'inprogress'