# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar
import warnings

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
from azure.mgmt.core.exceptions import ARMErrorFormat

from ... import models as _models

T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class VpnLinkConnectionsOperations:
    """Async operation group for VPN link connections.

    This class is not meant to be instantiated directly. Create a Client instance instead; it
    builds the operation group and exposes it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.mgmt.network.v2020_03_01.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer) -> None:
        # Plain attribute storage; nothing is validated or called here.
        self._config = config
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer

    def list_by_vpn_connection(
        self,
        resource_group_name: str,
        gateway_name: str,
        connection_name: str,
        **kwargs
    ) -> AsyncIterable["_models.ListVpnSiteLinkConnectionsResult"]:
        """Retrieves all vpn site link connections for a particular virtual wan vpn gateway vpn
        connection.

        The call itself only assembles the pager; requests are issued by the returned
        AsyncItemPaged through the callbacks defined below.

        :param resource_group_name: The resource group name of the VpnGateway.
        :type resource_group_name: str
        :param gateway_name: The name of the gateway.
        :type gateway_name: str
        :param connection_name: The name of the vpn connection.
        :type connection_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: An iterator like instance of either ListVpnSiteLinkConnectionsResult or the result of cls(response)
        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.network.v2020_03_01.models.ListVpnSiteLinkConnectionsResult]
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.ListVpnSiteLinkConnectionsResult"]

        # Default status-code -> exception mapping; a caller-supplied 'error_map'
        # keyword is merged on top and therefore wins on conflicting codes.
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
        }
        error_map.update(kwargs.pop('error_map', {}))

        accept = "application/json"
        api_version = "2020-03-01"

        def build_request(next_link=None):
            # The Accept header is sent on the first request and on every continuation.
            headers = {
                'Accept': self._serialize.header("accept", accept, 'str'),
            }  # type: Dict[str, Any]

            if next_link:
                # Continuation: the supplied link is used as-is, with no extra query parameters.
                return self._client.get(next_link, {}, headers)

            # First page: expand the URL template stored in this method's metadata.
            url_template = self.list_by_vpn_connection.metadata['url']  # type: ignore
            path_arguments = {
                'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
                'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
                'gatewayName': self._serialize.url("gateway_name", gateway_name, 'str'),
                'connectionName': self._serialize.url("connection_name", connection_name, 'str'),
            }
            first_page_url = self._client.format_url(url_template, **path_arguments)

            query = {
                'api-version': self._serialize.query("api_version", api_version, 'str'),
            }  # type: Dict[str, Any]
            return self._client.get(first_page_url, query, headers)

        async def unpack_page(pipeline_response):
            # Deserialize one page and hand back (continuation link, items).
            page = self._deserialize('ListVpnSiteLinkConnectionsResult', pipeline_response)
            elements = page.value
            if cls:
                elements = cls(elements)
            # A falsy next_link is normalised to None.
            return page.next_link or None, AsyncList(elements)

        async def fetch_page(next_link=None):
            # Remaining keyword arguments are forwarded to the pipeline unchanged.
            pipeline_response = await self._client._pipeline.run(
                build_request(next_link), stream=False, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code in [200]:
                return pipeline_response

            # Any other status: map_error is consulted first with the status code and
            # error_map, then HttpResponseError is raised with ARM error formatting.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        return AsyncItemPaged(fetch_page, unpack_page)
    list_by_vpn_connection.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/vpnGateways/{gatewayName}/vpnConnections/{connectionName}/vpnLinkConnections'}  # type: ignore