# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from io import BytesIO
import time
from random import uniform
import colorama
from knack.util import CLIError
from knack.log import get_logger
from msrestazure.azure_exceptions import CloudError
from azure.storage.blob import AppendBlobService
from azure.common import AzureHttpError
from ._azure_utils import get_blob_info

logger = get_logger(__name__)

DEFAULT_CHUNK_SIZE = 1024 * 4
DEFAULT_LOG_TIMEOUT_IN_SEC = 60 * 30  # 30 minutes


def stream_logs(client,
                run_id,
                registry_name,
                resource_group_name,
                no_format=False,
                raise_error_on_failure=False):
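    """Stream the logs of an ACR run to stdout.

    Resolves the run's log SAS URL through the given client, then tails the
    backing append blob until the run reports completion via blob metadata.
    Raises CLIError if the SAS URL cannot be obtained or, when
    raise_error_on_failure is set, if the run ends in a failed, timed-out,
    or canceled state.
    """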
    log_file_sas = None
    error_msg = "Could not get logs for ID: {}".format(run_id)

    try:
        log_file_sas = client.get_log_sas_url(
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            run_id=run_id).log_link
    except (AttributeError, CloudError) as e:
        logger.debug("%s Exception: %s", error_msg, e)
        raise CLIError(error_msg)

    if not log_file_sas:
        logger.debug("%s Empty SAS URL.", error_msg)
        raise CLIError(error_msg)

    account_name, endpoint_suffix, container_name, blob_name, sas_token = get_blob_info(
        log_file_sas)

    _stream_logs(no_format,
                 DEFAULT_CHUNK_SIZE,
                 DEFAULT_LOG_TIMEOUT_IN_SEC,
                 AppendBlobService(
                     account_name=account_name,
                     sas_token=sas_token,
                     endpoint_suffix=endpoint_suffix),
                 container_name,
                 blob_name,
                 raise_error_on_failure)


def _stream_logs(no_format,  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
                 byte_size,
                 timeout_in_seconds,
                 blob_service,
                 container_name,
                 blob_name,
                 raise_error_on_failure):
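    """Tail an append blob and print complete lines until the run finishes.

    Repeatedly downloads newly appended byte ranges of byte_size bytes,
    buffers them, and flushes everything up to the last CRLF to stdout.
    Polling backs off exponentially (with jitter) while no new data arrives
    and gives up after timeout_in_seconds of consecutive idle time.
    """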

    if not no_format:
        colorama.init()

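    # 'stream' buffers bytes that have not yet been flushed as complete lines;
    # [start, end] is the next byte range to request and 'available' is the
    # blob's last known content length. The remaining counters drive the
    # exponential backoff and the idle-timeout check below.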
    stream = BytesIO()
    metadata = {}
    start = 0
    end = byte_size - 1
    available = 0
    sleep_time = 1
    max_sleep_time = 15
    num_fails = 0
    num_fails_for_backoff = 3
    consecutive_sleep_in_sec = 0

    # Try to get the initial properties so there's no waiting.
    # If the storage call fails, we'll just sleep and try again after.
    try:
        props = blob_service.get_blob_properties(
            container_name=container_name, blob_name=blob_name)
        metadata = props.metadata
        available = props.properties.content_length
    except (AttributeError, AzureHttpError):
        pass

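    # Main polling loop: keep reading newly appended byte ranges until the
    # blob is marked complete and every available byte has been consumed.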
    while _blob_is_not_complete(metadata) or start < available:
        while start < available:
            # Success! Reset our polling backoff.
            sleep_time = 1
            num_fails = 0
            consecutive_sleep_in_sec = 0

            try:
                old_byte_size = len(stream.getvalue())
                blob_service.get_blob_to_stream(
                    container_name=container_name,
                    blob_name=blob_name,
                    start_range=start,
                    end_range=end,
                    stream=stream)

                curr_bytes = stream.getvalue()
                new_byte_size = len(curr_bytes)
                amount_read = new_byte_size - old_byte_size
                start += amount_read
                end = start + byte_size - 1

                # Only scan what's newly read. If nothing is read, default to 0.
                min_scan_range = max(new_byte_size - amount_read - 1, 0)
                for i in range(new_byte_size - 1, min_scan_range, -1):
                    if curr_bytes[i - 1:i + 1] == b'\r\n':
                        flush = curr_bytes[:i]  # won't print \n
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        print(flush.decode('utf-8', errors='ignore'))
                        break
            except AzureHttpError as ae:
                if ae.status_code != 404:
                    raise CLIError(ae)
            except KeyboardInterrupt:
                curr_bytes = stream.getvalue()
                if curr_bytes:
                    print(curr_bytes.decode('utf-8', errors='ignore'))
                return

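        # Refresh blob properties to learn about newly appended content and
        # the completion marker in metadata.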
        try:
            props = blob_service.get_blob_properties(
                container_name=container_name, blob_name=blob_name)
            metadata = props.metadata
            available = props.properties.content_length
        except AzureHttpError as ae:
            if ae.status_code != 404:
                raise CLIError(ae)
        except KeyboardInterrupt:
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))
            return
        except Exception as err:  # pylint: disable=broad-except
            raise CLIError(err)

        if consecutive_sleep_in_sec > timeout_in_seconds:
            # Flush anything remaining in the buffer - this would be the case
            # if the file has expired and we weren't able to detect any \r\n
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))

            logger.warning("Failed to find any new logs in %d seconds. Client will stop polling for additional logs.",
                           consecutive_sleep_in_sec)
            return

        # If no new data is available but the blob is not complete, sleep
        # before polling for additional data.
        if _blob_is_not_complete(metadata) and start >= available:
            num_fails += 1

            logger.debug(
                "Failed to find new content %d times in a row", num_fails)
            if num_fails >= num_fails_for_backoff:
                num_fails = 0
                sleep_time = min(sleep_time * 2, max_sleep_time)
                logger.debug("Resetting failure count to %d", num_fails)

            rnd = uniform(1, 2)  # 1.0 <= x < 2.0
            total_sleep_time = sleep_time + rnd
            consecutive_sleep_in_sec += total_sleep_time
            logger.debug("Base sleep time: %d, random delay: %f, total: %f, consecutive: %f",
                         sleep_time, rnd, total_sleep_time, consecutive_sleep_in_sec)
            time.sleep(total_sleep_time)

    # One final check to see if there's anything in the buffer to flush
    # E.g., metadata has been set and start == available, but the log file
    # didn't end in \r\n, so we were unable to flush out the final contents.
    curr_bytes = stream.getvalue()
    if curr_bytes:
        print(curr_bytes.decode('utf-8', errors='ignore'))

    build_status = _get_run_status(metadata).lower()
    logger.debug("status was: '%s'", build_status)

    if raise_error_on_failure:
        if build_status in ('internalerror', 'failed'):
            raise CLIError("Run failed")
        elif build_status == 'timedout':
            raise CLIError("Run timed out")
        elif build_status == 'canceled':
            raise CLIError("Run was canceled")


def _blob_is_not_complete(metadata):
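    """Return True while the blob's metadata does not yet contain a 'complete' key."""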
    if not metadata:
        return True
    for key in metadata:
        if key.lower() == 'complete':
            return False
    return True


def _get_run_status(metadata):
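    """Return the value stored under the 'complete' metadata key, or 'inprogress' if absent."""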
    if metadata is None:
        return 'inprogress'
    for key in metadata:
        if key.lower() == 'complete':
            return metadata[key]
    return 'inprogress'