1# -------------------------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for license information. 4# -------------------------------------------------------------------------------------------- 5 6import re 7import logging 8from msrest.service_client import ServiceClient 9from msrest import Configuration, Deserializer 10from msrest.exceptions import HttpOperationError 11 12from ..user.user_manager import UserManager 13from . import models 14 15 16class OrganizationManager(): 17 """ Manage DevOps organizations 18 19 Create or list existing organizations 20 21 Attributes: 22 config: url configuration 23 client: authentication client 24 dserialize: deserializer to process http responses into python classes 25 """ 26 27 def __init__(self, base_url='https://app.vssps.visualstudio.com', creds=None, 28 create_organization_url='https://app.vsaex.visualstudio.com'): 29 """Inits OrganizationManager""" 30 self._creds = creds 31 self._config = Configuration(base_url=base_url) 32 self._client = ServiceClient(creds, self._config) 33 #need to make a secondary client for the creating organization as it uses a different base url 34 self._create_organization_config = Configuration(base_url=create_organization_url) 35 self._create_organization_client = ServiceClient(creds, self._create_organization_config) 36 37 self._list_region_config = Configuration(base_url='https://aex.dev.azure.com') 38 self._list_region_client = ServiceClient(creds, self._create_organization_config) 39 client_models = {k: v for k, v in models.__dict__.items() if isinstance(v, type)} 40 self._deserialize = Deserializer(client_models) 41 self._user_mgr = UserManager(creds=self._creds) 42 43 def validate_organization_name(self, organization_name): 44 """Validate an organization name by checking it does not already exist and that it fits name restrictions""" 45 if organization_name is None: 46 return models.ValidateAccountName(valid=False, message="The organization_name cannot be None") 47 48 if re.search("[^0-9A-Za-z-]", organization_name): 49 return models.ValidateAccountName(valid=False, message="""The name supplied contains forbidden characters. 50 Only alphanumeric characters and dashes are allowed. 51 Please try another organization name.""") 52 #construct url 53 url = '/_AzureSpsAccount/ValidateAccountName' 54 55 #construct query parameters 56 query_paramters = {} 57 query_paramters['accountName'] = organization_name 58 59 #construct header parameters 60 header_paramters = {} 61 header_paramters['Accept'] = 'application/json' 62 63 request = self._client.get(url, params=query_paramters) 64 response = self._client.send(request, headers=header_paramters) 65 66 # Handle Response 67 deserialized = None 68 if response.status_code // 100 != 2: 69 logging.error("GET %s", request.url) 70 logging.error("response: %s", response.status_code) 71 logging.error(response.text) 72 raise HttpOperationError(self._deserialize, response) 73 else: 74 deserialized = self._deserialize('ValidateAccountName', response) 75 76 return deserialized 77 78 def list_organizations(self): 79 """List what organizations this user is part of""" 80 81 if not self._user_mgr.is_msa_account(): 82 # Only need to do the one request as ids are the same 83 organizations = self._list_organizations_request(self._user_mgr.aad_id) 84 else: 85 # Need to do a request for each of the ids and then combine them (disabled) 86 #organizations_aad = self._list_organizations_request(self._user_mgr.aad_id, msa=False) 87 #organizations_msa = self._list_organizations_request(self._user_mgr.msa_id, msa=True) 88 #organizations = organizations_msa 89 90 # Overwrite merge aad organizations with msa organizations 91 #duplicated_aad_orgs = [] 92 #for msa_org in organizations_msa.value: 93 # duplicated_aad_orgs.extend([ 94 # o for o in organizations_aad.value if o.accountName == msa_org.accountName 95 # ]) 96 #filtered_organizations_aad = [o for o in organizations_aad.value if (o not in duplicated_aad_orgs)] 97 98 #organizations.value += list(filtered_organizations_aad) 99 #organizations.count = len(organizations.value) 100 organizations = self._list_organizations_request(self._user_mgr.msa_id, msa=True) 101 102 return organizations 103 104 def _list_organizations_request(self, member_id, msa=False): 105 url = '/_apis/Commerce/Subscription' 106 107 query_paramters = {} 108 query_paramters['memberId'] = member_id 109 query_paramters['includeMSAAccounts'] = True 110 query_paramters['queryOnlyOwnerAccounts'] = True 111 query_paramters['inlcudeDisabledAccounts'] = False 112 query_paramters['providerNamespaceId'] = 'VisualStudioOnline' 113 114 #construct header parameters 115 header_parameters = {} 116 header_parameters['X-VSS-ForceMsaPassThrough'] = 'true' if msa else 'false' 117 header_parameters['Accept'] = 'application/json' 118 119 request = self._client.get(url, params=query_paramters) 120 response = self._client.send(request, headers=header_parameters) 121 122 # Handle Response 123 deserialized = None 124 if response.status_code // 100 != 2: 125 logging.error("GET %s", request.url) 126 logging.error("response: %s", response.status_code) 127 logging.error(response.text) 128 raise HttpOperationError(self._deserialize, response) 129 else: 130 deserialized = self._deserialize('Organizations', response) 131 132 return deserialized 133 134 def create_organization(self, region_code, organization_name): 135 """Create a new organization for user""" 136 url = '/_apis/HostAcquisition/collections' 137 138 #construct query parameters 139 query_paramters = {} 140 query_paramters['collectionName'] = organization_name 141 query_paramters['preferredRegion'] = region_code 142 query_paramters['api-version'] = '4.0-preview.1' 143 144 #construct header parameters 145 header_paramters = {} 146 header_paramters['Accept'] = 'application/json' 147 header_paramters['Content-Type'] = 'application/json' 148 if self._user_mgr.is_msa_account(): 149 header_paramters['X-VSS-ForceMsaPassThrough'] = 'true' 150 151 #construct the payload 152 payload = {} 153 payload['VisualStudio.Services.HostResolution.UseCodexDomainForHostCreation'] = 'true' 154 155 request = self._create_organization_client.post(url=url, params=query_paramters, content=payload) 156 response = self._create_organization_client.send(request, headers=header_paramters) 157 158 # Handle Response 159 deserialized = None 160 if response.status_code // 100 != 2: 161 logging.error("GET %s", request.url) 162 logging.error("response: %s", response.status_code) 163 logging.error(response.text) 164 raise HttpOperationError(self._deserialize, response) 165 else: 166 deserialized = self._deserialize('NewOrganization', response) 167 168 return deserialized 169 170 def list_regions(self): 171 """List what regions organizations can exist in""" 172 173 # Construct URL 174 url = '/_apis/hostacquisition/regions' 175 176 #construct header parameters 177 header_paramters = {} 178 header_paramters['Accept'] = 'application/json' 179 180 # Construct and send request 181 request = self._list_region_client.get(url, headers=header_paramters) 182 response = self._list_region_client.send(request) 183 184 # Handle Response 185 deserialized = None 186 if response.status_code // 100 != 2: 187 logging.error("GET %s", request.url) 188 logging.error("response: %s", response.status_code) 189 logging.error(response.text) 190 raise HttpOperationError(self._deserialize, response) 191 else: 192 deserialized = self._deserialize('Regions', response) 193 194 return deserialized 195 196 def close_connection(self): 197 """Close the sessions""" 198 self._client.close() 199 self._create_organization_client.close() 200