# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from io import BytesIO
import time
from random import uniform
import colorama
from knack.util import CLIError
from knack.log import get_logger
from msrestazure.azure_exceptions import CloudError
from azure.storage.blob import AppendBlobService
from azure.common import AzureHttpError
from ._azure_utils import get_blob_info

logger = get_logger(__name__)

# Number of bytes requested from the log blob per download call.
DEFAULT_CHUNK_SIZE = 1024 * 4
# Give up polling once this many seconds have been slept without new content.
DEFAULT_LOG_TIMEOUT_IN_SEC = 60 * 30  # 30 minutes


def stream_logs(client,
                run_id,
                registry_name,
                resource_group_name,
                no_format=False,
                raise_error_on_failure=False):
    """Print the logs of a run to stdout as they become available.

    The log SAS URL is obtained from ``client.get_log_sas_url(...).log_link``,
    split into its parts with ``get_blob_info`` and then streamed from the
    blob through an ``AppendBlobService``.

    :param client: Object exposing ``get_log_sas_url``.
    :param run_id: ID of the run whose logs are streamed.
    :param registry_name: Passed through to ``get_log_sas_url``.
    :param resource_group_name: Passed through to ``get_log_sas_url``.
    :param no_format: When False, ``colorama.init()`` is called before streaming.
    :param raise_error_on_failure: When True, raise ``CLIError`` if the final
        run status is failed, internal error, timed out or canceled.
    :raises CLIError: If the SAS URL cannot be retrieved or is empty, or as
        described for ``raise_error_on_failure``.
    """
    log_file_sas = None
    error_msg = "Could not get logs for ID: {}".format(run_id)

    try:
        log_file_sas = client.get_log_sas_url(
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            run_id=run_id).log_link
    except (AttributeError, CloudError) as e:
        logger.debug("%s Exception: %s", error_msg, e)
        raise CLIError(error_msg)

    if not log_file_sas:
        logger.debug("%s Empty SAS URL.", error_msg)
        raise CLIError(error_msg)

    account_name, endpoint_suffix, container_name, blob_name, sas_token = get_blob_info(
        log_file_sas)

    _stream_logs(no_format,
                 DEFAULT_CHUNK_SIZE,
                 DEFAULT_LOG_TIMEOUT_IN_SEC,
                 AppendBlobService(
                     account_name=account_name,
                     sas_token=sas_token,
                     endpoint_suffix=endpoint_suffix),
                 container_name,
                 blob_name,
                 raise_error_on_failure)


def _flush_remaining(stream):
    """Print whatever is still buffered in ``stream``; print nothing if it is empty."""
    remaining = stream.getvalue()
    if remaining:
        print(remaining.decode('utf-8', errors='ignore'))


def _stream_logs(no_format,  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
                 byte_size,
                 timeout_in_seconds,
                 blob_service,
                 container_name,
                 blob_name,
                 raise_error_on_failure):
    """Download a log blob chunk by chunk and print complete lines as they arrive.

    Content is read ``byte_size`` bytes at a time into an in-memory buffer.
    Whenever the newly read bytes contain ``\\r\\n``, everything up to the last
    such terminator is printed and the rest stays buffered. The loop ends when
    the blob metadata contains a ``complete`` key and all available bytes were
    read, when the user interrupts, or when more than ``timeout_in_seconds``
    seconds were slept in a row without any new content.

    :param no_format: When False, ``colorama.init()`` is called first.
    :param byte_size: Size of each ranged download request, in bytes.
    :param timeout_in_seconds: Maximum accumulated idle sleep before giving up.
    :param blob_service: Object exposing ``get_blob_properties`` and
        ``get_blob_to_stream``.
    :param container_name: Container holding the log blob.
    :param blob_name: Name of the log blob.
    :param raise_error_on_failure: When True, raise ``CLIError`` for a final
        status of ``failed``, ``internalerror``, ``timedout`` or ``canceled``.
    :raises CLIError: On a non-404 storage error, on any other error while
        polling blob properties, or as described above.
    """
    if not no_format:
        colorama.init()

    stream = BytesIO()
    metadata = {}
    start = 0
    end = byte_size - 1
    available = 0
    sleep_time = 1
    max_sleep_time = 15
    num_fails = 0
    num_fails_for_backoff = 3
    consecutive_sleep_in_sec = 0

    # Try to get the initial properties so there's no waiting.
    # If the storage call fails, we'll just sleep and try again after.
    try:
        props = blob_service.get_blob_properties(
            container_name=container_name, blob_name=blob_name)
        metadata = props.metadata
        available = props.properties.content_length
    except (AttributeError, AzureHttpError):
        pass

    while (_blob_is_not_complete(metadata) or start < available):
        while start < available:
            # Success! Reset our polling backoff.
            sleep_time = 1
            num_fails = 0
            consecutive_sleep_in_sec = 0

            try:
                old_byte_size = len(stream.getvalue())
                blob_service.get_blob_to_stream(
                    container_name=container_name,
                    blob_name=blob_name,
                    start_range=start,
                    end_range=end,
                    stream=stream)

                curr_bytes = stream.getvalue()
                new_byte_size = len(curr_bytes)
                amount_read = new_byte_size - old_byte_size
                start += amount_read
                end = start + byte_size - 1

                # Only scan what's newly read. If nothing is read, default to 0.
                # The scan starts one byte before the new data so a \r\n pair
                # split across two chunks is still detected.
                min_scan_range = max(new_byte_size - amount_read - 1, 0)
                for i in range(new_byte_size - 1, min_scan_range, -1):
                    if curr_bytes[i - 1:i + 1] == b'\r\n':
                        flush = curr_bytes[:i]  # won't print \n
                        # Keep only the bytes after the terminator buffered.
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        print(flush.decode('utf-8', errors='ignore'))
                        break
            except AzureHttpError as ae:
                # NOTE(review): a 404 is ignored here and the download is retried
                # right away without sleeping - confirm this cannot spin.
                if ae.status_code != 404:
                    raise CLIError(ae)
            except KeyboardInterrupt:
                _flush_remaining(stream)
                return

        try:
            props = blob_service.get_blob_properties(
                container_name=container_name, blob_name=blob_name)
            metadata = props.metadata
            available = props.properties.content_length
        except AzureHttpError as ae:
            if ae.status_code != 404:
                raise CLIError(ae)
        except KeyboardInterrupt:
            # Read the live buffer rather than a leftover local: nothing may
            # have been downloaded yet, and after a flush the previous snapshot
            # would repeat lines that were already printed.
            _flush_remaining(stream)
            return
        except Exception as err:
            raise CLIError(err)

        if consecutive_sleep_in_sec > timeout_in_seconds:
            # Flush anything remaining in the buffer - this would be the case
            # if the file has expired and we weren't able to detect any \r\n
            _flush_remaining(stream)

            logger.warning("Failed to find any new logs in %d seconds. Client will stop polling for additional logs.",
                           consecutive_sleep_in_sec)
            return

        # If no new data available but not complete, sleep before trying to process additional data.
        if (_blob_is_not_complete(metadata) and start >= available):
            num_fails += 1

            logger.debug(
                "Failed to find new content %d times in a row", num_fails)
            if num_fails >= num_fails_for_backoff:
                num_fails = 0
                sleep_time = min(sleep_time * 2, max_sleep_time)
                logger.debug("Resetting failure count to %d", num_fails)

            rnd = uniform(1, 2)  # 1.0 <= x < 2.0
            total_sleep_time = sleep_time + rnd
            consecutive_sleep_in_sec += total_sleep_time
            logger.debug("Base sleep time: %d, random delay: %d, total: %d, consecutive: %d",
                         sleep_time, rnd, total_sleep_time, consecutive_sleep_in_sec)
            time.sleep(total_sleep_time)

    # One final check to see if there's anything in the buffer to flush
    # E.g., metadata has been set and start == available, but the log file
    # didn't end in \r\n, so we were unable to flush out the final contents.
    _flush_remaining(stream)

    build_status = _get_run_status(metadata).lower()
    logger.debug("status was: '%s'", build_status)

    if raise_error_on_failure:
        if build_status == 'internalerror' or build_status == 'failed':
            raise CLIError("Run failed")
        elif build_status == 'timedout':
            raise CLIError("Run timed out")
        elif build_status == 'canceled':
            raise CLIError("Run was canceled")


def _blob_is_not_complete(metadata):
    """Return False once ``metadata`` has a key equal to ``complete`` (case-insensitive).

    Empty or missing metadata counts as not complete.
    """
    if not metadata:
        return True
    for key in metadata:
        if key.lower() == 'complete':
            return False
    return True


def _get_run_status(metadata):
    """Return the value stored under the ``complete`` key (case-insensitive).

    Returns ``'inprogress'`` when ``metadata`` is None or has no such key.
    """
    if metadata is None:
        return 'inprogress'
    for key in metadata:
        if key.lower() == 'complete':
            return metadata[key]
    return 'inprogress'