# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from datetime import datetime, timedelta

import uuid
import re
import json
import isodate

from knack.log import get_logger
logger = get_logger(__name__)

# Gate used by parse_duration to decide whether a string is handed to isodate.
# NOTE(review): the '.' in '(\\d+.)?' is unescaped, so it matches any character;
# presumably it was meant for fractional seconds - confirm before tightening.
iso8601pattern = re.compile("^P(?!$)(\\d+Y)?(\\d+M)?(\\d+W)?(\\d+D)?(T(?=\\d)(\\d+H)?(\\d+M)?(\\d+.)?(\\d+S)?)?$")


def _gen_guid():
    """Return a new random (version 4) ``uuid.UUID``."""
    return uuid.uuid4()


def _is_guid(guid):
    """Return True when ``guid`` can be parsed by ``uuid.UUID``, False otherwise.

    Non-string input (for example ``None``) is reported as "not a GUID"
    instead of letting the TypeError/AttributeError from ``uuid.UUID`` escape.
    """
    try:
        uuid.UUID(guid)
    except (ValueError, TypeError, AttributeError):
        return False
    return True


def parse_duration(str_duration):
    """Parse a duration string.

    Strings matching ``iso8601pattern`` are parsed with
    ``isodate.parse_duration``; any other non-empty string is passed to
    ``parse_timedelta``. Returns None for empty or None input.
    """
    if not str_duration:
        return None

    if iso8601pattern.match(str_duration):
        return isodate.parse_duration(str_duration)

    return parse_timedelta(str_duration)


def parse_timedelta(str_duration):
    """Convert an ``HH:MM:SS`` string into a ``timedelta``.

    Returns None for empty or None input. ``datetime.strptime`` raises
    ValueError when the string does not match ``%H:%M:%S`` (which also means
    the hour component must be in the 0-23 range).
    """
    if not str_duration:
        return None

    parsed = datetime.strptime(str_duration, '%H:%M:%S')
    return timedelta(hours=parsed.hour,
                     minutes=parsed.minute,
                     seconds=parsed.second)


def camel_to_snake(name):
    """Convert a camelCase / PascalCase identifier to snake_case."""
    # First pass splits before a capitalized word ("FooBar" -> "Foo_Bar"),
    # second pass splits a lowercase letter/digit followed by a capital.
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()


def snake_to_camel_case(snake_str):
    """Convert a snake_case identifier to camelCase.

    The first component is kept as-is; each following component is title-cased.
    """
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])


class JsonBytearrayEncoder(json.JSONEncoder):
    """JSON encoder with fallbacks for datetimes, bytearrays and plain objects."""

    DATE_FORMAT = "%Y-%m-%d"
    TIME_FORMAT = "%H:%M:%S"

    def default(self, obj):  # pylint: disable=E0202,W0221
        """Return a JSON-serializable representation of ``obj``.

        * ``datetime`` -> string formatted as ``DATE_FORMAT TIME_FORMAT``.
        * ``bytearray`` -> UTF-8 decoded text, undecodable bytes ignored.
        * anything else -> ``obj.toJSON()`` when that call succeeds, otherwise
          a dict of the object's attributes with camelCase keys and without
          ``additional_properties``.
        """
        if isinstance(obj, datetime):
            return obj.strftime("%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT))

        if isinstance(obj, bytearray):
            return bytes(obj).decode('utf-8', 'ignore')

        try:
            return obj.toJSON()
        except Exception:  # pylint: disable=W0703
            # vars() returns the object's live __dict__; work on a shallow copy
            # so that serializing never renames or removes attributes on the
            # object being encoded.
            attributes = dict(vars(obj))
            attributes.pop('additional_properties', None)
            for key in list(attributes):
                attributes[snake_to_camel_case(key)] = attributes.pop(key)
            return attributes


def create_ip_range(resource_name, ip):
    """Build an ``IPRange`` model from an address with an optional ``/prefix``.

    ``ip`` may be a bare address or ``address/prefix``. The prefix, when
    present, is passed through as the string that followed the slash; without
    a slash ``subnet_prefix_length`` is None. The range is named
    ``<resource_name>_<address>``.
    """
    from azure.mgmt.media.models import IPRange
    subnet_prefix_length = None
    if '/' in ip:
        ip_parts = ip.split('/')
        ip = ip_parts[0]
        subnet_prefix_length = ip_parts[1]
    return IPRange(name="{}_{}".format(resource_name, ip),
                   address=ip,
                   subnet_prefix_length=subnet_prefix_length)


# The Media Services SDK does not throw an exception for 404s.
# This helper will make our 'not founds' look like other services.
def show_resource_not_found_message(group, account, ams_type, name):
    """Log a 'resource not found' error for the resource and exit with status 3."""
    import sys
    message = build_resource_not_found_message(group, account, ams_type, name)
    from azure.cli.core.azlogging import CommandLoggerContext
    with CommandLoggerContext(logger):
        logger.error(message)
        sys.exit(3)


def build_resource_not_found_message(group, account, ams_type, name):
    """Return the 'not found' message for ``account/ams_type/name`` in ``group``."""
    message = "The Resource 'Microsoft.Media/mediaServices/{}/{}/{}' under resource group '{}' was not found." \
        .format(account, ams_type, name, group)
    return message


def show_child_resource_not_found_message(group, account, ams_type, name, ams_child_type, child_name):
    """Log a 'resource not found' error for a child resource and exit with status 3."""
    import sys
    # Arguments must follow the builder's parameter order (group first);
    # otherwise every field in the message is shifted by one position.
    message = build_child_resource_not_found_message(group, account, ams_type, name, ams_child_type, child_name)
    from azure.cli.core.azlogging import CommandLoggerContext
    with CommandLoggerContext(logger):
        logger.error(message)
        sys.exit(3)


def build_child_resource_not_found_message(group, account, ams_type, name, ams_child_type, child_name):
    """Return the 'not found' message for a child resource.

    The resource path is ``account/ams_type/name/ams_child_type/child_name``
    and the resource group is ``group``.
    """
    message = "The Resource 'Microsoft.Media/mediaServices/{}/{}/{}/{}/{}' under resource group '{}' was not found." \
        .format(account, ams_type, name, ams_child_type, child_name, group)
    return message