# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from io import BytesIO
import time
from random import uniform
import colorama
import requests
from knack.util import CLIError
from knack.log import get_logger
from msrestazure.azure_exceptions import CloudError
from azure.common import AzureHttpError
from azure.cli.core.profiles import ResourceType, get_sdk
from ._azure_utils import get_blob_info
from ._constants import ACR_RUN_DEFAULT_TIMEOUT_IN_SEC

logger = get_logger(__name__)

# Number of bytes requested from the log blob per ranged download.
DEFAULT_CHUNK_SIZE = 1024 * 4


def stream_logs(cmd, client,
                run_id,
                registry_name,
                resource_group_name,
                timeout=ACR_RUN_DEFAULT_TIMEOUT_IN_SEC,
                no_format=False,
                raise_error_on_failure=False):
    """Print the logs of a run to stdout.

    The log location is obtained from ``client.get_log_sas_url``. If the
    response carries a ``log_artifact_link`` the log is fetched over HTTP in
    one pass; otherwise ``log_link`` is treated as an append blob and polled
    until it is marked complete.

    :param cmd: Command object; only ``cmd.cli_ctx`` is used, to resolve the storage SDK.
    :param client: Object exposing ``get_log_sas_url(resource_group_name, registry_name, run_id)``.
    :param run_id: ID of the run whose logs are streamed.
    :param registry_name: Name of the registry, forwarded to ``client``.
    :param resource_group_name: Name of the resource group, forwarded to ``client``.
    :param timeout: Cumulative seconds of idle polling tolerated before giving up (blob logs
        only). A falsy value falls back to ``ACR_RUN_DEFAULT_TIMEOUT_IN_SEC``.
    :param no_format: When False, ``colorama.init()`` is called before printing.
    :param raise_error_on_failure: When True, raise ``CLIError`` if the final status recorded
        in the blob metadata is a failure, timeout or cancellation (blob logs only).
    :raises CLIError: If the log URL cannot be obtained or is empty, or on streaming errors.
    """
    log_file_sas = None
    artifact = False
    error_msg = "Could not get logs for ID: {}".format(run_id)

    try:
        response = client.get_log_sas_url(
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            run_id=run_id)
        if not response.log_artifact_link:
            log_file_sas = response.log_link
        else:
            log_file_sas = response.log_artifact_link
            artifact = True
    except (AttributeError, CloudError) as e:
        logger.debug("%s Exception: %s", error_msg, e)
        raise CLIError(error_msg)

    if not log_file_sas:
        logger.debug("%s Empty SAS URL.", error_msg)
        raise CLIError(error_msg)

    if not artifact:
        account_name, endpoint_suffix, container_name, blob_name, sas_token = get_blob_info(
            log_file_sas)
        AppendBlobService = get_sdk(cmd.cli_ctx, ResourceType.DATA_STORAGE, 'blob#AppendBlobService')
        if not timeout:
            timeout = ACR_RUN_DEFAULT_TIMEOUT_IN_SEC
        _stream_logs(no_format,
                     DEFAULT_CHUNK_SIZE,
                     timeout,
                     AppendBlobService(
                         account_name=account_name,
                         sas_token=sas_token,
                         endpoint_suffix=endpoint_suffix),
                     container_name,
                     blob_name,
                     raise_error_on_failure)
    else:
        _stream_artifact_logs(log_file_sas,
                              no_format)


def _stream_logs(no_format,  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
                 byte_size,
                 timeout_in_seconds,
                 blob_service,
                 container_name,
                 blob_name,
                 raise_error_on_failure):
    """Poll a log blob and print its content as it grows.

    The blob is downloaded in ranges of ``byte_size`` bytes. Downloaded bytes
    are buffered and printed up to the last ``\\r\\n`` found in the newly read
    data. Polling continues until the blob metadata contains a ``complete``
    key (case-insensitive) and all available bytes have been read, or until
    the accumulated idle sleep time exceeds ``timeout_in_seconds``.

    :param no_format: When False, ``colorama.init()`` is called before printing.
    :param byte_size: Size in bytes of each ranged download.
    :param timeout_in_seconds: Maximum accumulated sleep time without new content.
    :param blob_service: Storage client providing ``exists``, ``get_blob_properties``
        and ``get_blob_to_stream``.
    :param container_name: Container holding the log blob.
    :param blob_name: Name of the log blob.
    :param raise_error_on_failure: When True, raise ``CLIError`` for a final status of
        ``internalerror``, ``failed``, ``timedout`` or ``canceled``.
    :raises CLIError: On storage errors other than HTTP 404, on unexpected errors while
        polling properties, or on a failed run when ``raise_error_on_failure`` is set.
    """
    if not no_format:
        colorama.init()

    log_exist = False
    stream = BytesIO()
    metadata = {}
    start = 0
    end = byte_size - 1
    available = 0
    sleep_time = 1
    max_sleep_time = 15
    num_fails = 0
    num_fails_for_backoff = 3
    consecutive_sleep_in_sec = 0

    # Try to get the initial properties so there's no waiting.
    # If the storage call fails, we'll just sleep and try again after.
    try:
        # Need to call "exists" API to prevent storage SDK logging BlobNotFound error
        log_exist = blob_service.exists(
            container_name=container_name, blob_name=blob_name)

        if log_exist:
            props = blob_service.get_blob_properties(
                container_name=container_name, blob_name=blob_name)
            metadata = props.metadata
            available = props.properties.content_length
        else:
            # Wait a little bit before checking the existence again
            time.sleep(1)
    except (AttributeError, AzureHttpError):
        pass

    while (_blob_is_not_complete(metadata) or start < available):
        while start < available:
            # Success! Reset our polling backoff.
            sleep_time = 1
            num_fails = 0
            consecutive_sleep_in_sec = 0

            try:
                old_byte_size = len(stream.getvalue())
                blob_service.get_blob_to_stream(
                    container_name=container_name,
                    blob_name=blob_name,
                    start_range=start,
                    end_range=end,
                    stream=stream)

                curr_bytes = stream.getvalue()
                new_byte_size = len(curr_bytes)
                amount_read = new_byte_size - old_byte_size
                start += amount_read
                end = start + byte_size - 1

                # Only scan what's newly read. If nothing is read, default to 0.
                min_scan_range = max(new_byte_size - amount_read - 1, 0)
                for i in range(new_byte_size - 1, min_scan_range, -1):
                    if curr_bytes[i - 1:i + 1] == b'\r\n':
                        flush = curr_bytes[:i]  # won't print \n
                        # Keep only the bytes after the line break buffered for the next pass.
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        print(flush.decode('utf-8', errors='ignore'))
                        break
            except AzureHttpError as ae:
                # A 404 is tolerated here; any other storage error aborts streaming.
                if ae.status_code != 404:
                    raise CLIError(ae)
            except KeyboardInterrupt:
                curr_bytes = stream.getvalue()
                if curr_bytes:
                    print(curr_bytes.decode('utf-8', errors='ignore'))
                return

        try:
            if log_exist:
                props = blob_service.get_blob_properties(
                    container_name=container_name, blob_name=blob_name)
                metadata = props.metadata
                available = props.properties.content_length
            else:
                log_exist = blob_service.exists(
                    container_name=container_name, blob_name=blob_name)
        except AzureHttpError as ae:
            if ae.status_code != 404:
                raise CLIError(ae)
        except KeyboardInterrupt:
            # Re-read the buffer instead of reusing a variable from the download loop:
            # that variable is unbound if no chunk has been downloaded yet, and after a
            # flush it still holds bytes that were already printed.
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))
            return
        except Exception as err:
            raise CLIError(err)

        if consecutive_sleep_in_sec > timeout_in_seconds:
            # Flush anything remaining in the buffer - this would be the case
            # if the file has expired and we weren't able to detect any \r\n
            curr_bytes = stream.getvalue()
            if curr_bytes:
                print(curr_bytes.decode('utf-8', errors='ignore'))

            logger.warning("Failed to find any new logs in %d seconds. Client will stop polling for additional logs.",
                           consecutive_sleep_in_sec)
            return

        # If no new data available but not complete, sleep before trying to process additional data.
        if (_blob_is_not_complete(metadata) and start >= available):
            num_fails += 1

            logger.debug(
                "Failed to find new content %d times in a row", num_fails)
            if num_fails >= num_fails_for_backoff:
                num_fails = 0
                sleep_time = min(sleep_time * 2, max_sleep_time)
                logger.debug("Resetting failure count to %d", num_fails)

            rnd = uniform(1, 2)  # 1.0 <= x < 2.0
            total_sleep_time = sleep_time + rnd
            consecutive_sleep_in_sec += total_sleep_time
            # The delay values are floats; %d would truncate the jitter to 1.
            logger.debug("Base sleep time: %d, random delay: %.2f, total: %.2f, consecutive: %.2f",
                         sleep_time, rnd, total_sleep_time, consecutive_sleep_in_sec)
            time.sleep(total_sleep_time)

    # One final check to see if there's anything in the buffer to flush
    # E.g., metadata has been set and start == available, but the log file
    # didn't end in \r\n, so we were unable to flush out the final contents.
    curr_bytes = stream.getvalue()
    if curr_bytes:
        print(curr_bytes.decode('utf-8', errors='ignore'))

    build_status = _get_run_status(metadata).lower()
    logger.debug("status was: '%s'", build_status)

    if raise_error_on_failure:
        if build_status in ('internalerror', 'failed'):
            raise CLIError("Run failed")
        if build_status == 'timedout':
            raise CLIError("Run timed out")
        if build_status == 'canceled':
            raise CLIError("Run was canceled")


def _stream_artifact_logs(log_file_sas,
                          no_format):
    """Fetch a log over HTTP and print it line by line.

    :param log_file_sas: URL of the log to download.
    :param no_format: When False, ``colorama.init()`` is called before printing.
    :raises CLIError: If the request fails or returns an error status.
    """
    if not no_format:
        colorama.init()

    try:
        # NOTE(review): verify=False disables TLS certificate verification for this
        # download. Kept as-is to preserve existing behavior - confirm whether it is
        # still required.
        response = requests.get(log_file_sas, timeout=3, verify=False, stream=True)
        response.raise_for_status()
    except KeyboardInterrupt:
        return
    except Exception as err:
        raise CLIError(err)

    try:
        for line in response.iter_lines():
            print(line.decode('utf-8', errors='ignore'))
    finally:
        # With stream=True the connection stays open until the body is consumed
        # or the response is closed, so release it explicitly.
        response.close()


def _blob_is_not_complete(metadata):
    """Return True unless ``metadata`` has a key equal to ``complete`` (case-insensitive).

    Empty or missing metadata counts as not complete.
    """
    if not metadata:
        return True
    return not any(key.lower() == 'complete' for key in metadata)


def _get_run_status(metadata):
    """Return the value stored under the ``complete`` metadata key (case-insensitive).

    Returns ``'inprogress'`` when ``metadata`` is None or has no such key.
    """
    if metadata is None:
        return 'inprogress'
    for key in metadata:
        if key.lower() == 'complete':
            return metadata[key]
    return 'inprogress'