import asyncio
import socket
import sys
from typing import Optional

import async_timeout

from ..._errors import ProxyConnectionError, ProxyTimeoutError
from ..._proto.http_async import HttpProto
from ..._proto.socks4_async import Socks4Proto
from ..._proto.socks5_async import Socks5Proto
from ._stream import AsyncioSocketStream
from ._resolver import Resolver

from ._connect import connect_tcp
from ... import _abc as abc

# Timeout applied by connect() when the caller passes ``timeout=None``.
# The value is handed to ``async_timeout.timeout`` unchanged.
DEFAULT_TIMEOUT = 60


class AsyncioProxy(abc.AsyncProxy):
    """Base class for asyncio proxy connectors.

    Opens a TCP connection to the proxy (or reuses a caller-supplied
    socket), wraps it in an ``AsyncioSocketStream`` and delegates the
    protocol handshake to ``_negotiate``, which subclasses must implement.
    """

    _stream: Optional[abc.AsyncSocketStream]

    def __init__(
        self,
        proxy_host: str,
        proxy_port: int,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Store the proxy address and the event loop to use.

        Args:
            proxy_host: Host of the proxy server.
            proxy_port: Port of the proxy server.
            loop: Event loop to use; when ``None`` the result of
                ``asyncio.get_event_loop()`` is used.
        """
        if loop is None:
            loop = asyncio.get_event_loop()

        self._loop = loop

        self._proxy_host = proxy_host
        self._proxy_port = proxy_port

        # Per-connect state, filled in by connect().
        self._dest_host = None
        self._dest_port = None
        self._timeout = None

        self._stream = None
        self._resolver = Resolver(loop=loop)

    async def connect(
        self,
        dest_host: str,
        dest_port: int,
        timeout: Optional[float] = None,
        _socket=None,
    ) -> socket.socket:
        """Connect to the destination through the proxy.

        Args:
            dest_host: Destination host passed to the protocol handshake.
            dest_port: Destination port passed to the protocol handshake.
            timeout: Overall timeout for connecting and negotiating;
                ``DEFAULT_TIMEOUT`` is used when ``None``.
            _socket: Optional already-created socket to use instead of
                opening a new TCP connection to the proxy.

        Returns:
            The socket the handshake was performed on.

        Raises:
            ProxyTimeoutError: If ``asyncio.TimeoutError`` is raised while
                connecting or negotiating.
            ProxyConnectionError: If an ``OSError`` occurs while connecting
                or negotiating.
        """
        if timeout is None:
            timeout = DEFAULT_TIMEOUT

        self._dest_host = dest_host
        self._dest_port = dest_port
        self._timeout = timeout

        try:
            return await self._connect(_socket=_socket)
        except asyncio.TimeoutError as e:
            raise ProxyTimeoutError('Proxy connection timed out: {}'.format(self._timeout)) from e

    async def _connect(self, _socket=None) -> socket.socket:
        """Open the stream, run the handshake and return the socket.

        On any failure the stream created for this attempt is closed
        before the exception propagates. ``OSError`` is translated into
        ``ProxyConnectionError``.
        """
        # Forget any stream left over from a previous connect() call on this
        # instance. Otherwise a failure that happens before a new stream is
        # assigned (e.g. connect_tcp raising) would make _close() close the
        # stream of an earlier, successful connection.
        self._stream = None

        async with async_timeout.timeout(self._timeout):
            try:
                if _socket is None:
                    _socket = await connect_tcp(
                        host=self._proxy_host,
                        port=self._proxy_port,
                        loop=self._loop,
                    )

                self._stream = AsyncioSocketStream(sock=_socket, loop=self._loop)
                await self._negotiate()
                return _socket
            except OSError as e:
                await self._close()
                msg = 'Could not connect to proxy {}:{} [{}]'.format(
                    self._proxy_host,
                    self._proxy_port,
                    e.strerror,
                )
                raise ProxyConnectionError(e.errno, msg) from e
            except asyncio.CancelledError:  # pragma: no cover
                # https://bugs.python.org/issue30064
                # https://bugs.python.org/issue34795
                # Only close on cancellation where the check below allows it.
                if self._can_be_closed_safely():
                    await self._close()
                raise
            except Exception:  # pragma: no cover
                await self._close()
                raise

    def _can_be_closed_safely(self):  # pragma: no cover
        """Tell whether the stream may be closed after a cancellation.

        Returns ``True`` on Python 3.8+, or when the loop is an
        ``asyncio.ProactorEventLoop`` or a ``uvloop.Loop``.
        """

        def is_proactor_event_loop():
            # ProactorEventLoop is not importable on every platform.
            try:
                from asyncio import ProactorEventLoop  # noqa
            except ImportError:
                return False
            return isinstance(self._loop, ProactorEventLoop)

        def is_uvloop_event_loop():
            # uvloop is optional; treat its absence as "not uvloop".
            try:
                from uvloop import Loop  # noqa
            except ImportError:
                return False
            return isinstance(self._loop, Loop)

        return (
            sys.version_info[:2] >= (3, 8)
            or is_proactor_event_loop()
            or is_uvloop_event_loop()
        )

    async def _negotiate(self):
        """Perform the proxy protocol handshake; implemented by subclasses."""
        raise NotImplementedError()  # pragma: no cover

    async def _close(self):
        """Close the current stream, if one has been created."""
        if self._stream is not None:
            await self._stream.close()

    @property
    def proxy_host(self):
        """Host of the proxy server given at construction."""
        return self._proxy_host

    @property
    def proxy_port(self):
        """Port of the proxy server given at construction."""
        return self._proxy_port


class Socks5Proxy(AsyncioProxy):
    """Proxy connector whose handshake is performed by ``Socks5Proto``."""

    def __init__(
        self,
        proxy_host,
        proxy_port,
        username=None,
        password=None,
        rdns=None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Store proxy address plus the options forwarded to ``Socks5Proto``.

        ``username``, ``password`` and ``rdns`` are passed through to
        ``Socks5Proto`` unchanged.
        """
        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
        self._username = username
        self._password = password
        self._rdns = rdns

    async def _negotiate(self):
        """Run the ``Socks5Proto`` negotiation over the current stream."""
        proto = Socks5Proto(
            stream=self._stream,
            resolver=self._resolver,
            dest_host=self._dest_host,
            dest_port=self._dest_port,
            username=self._username,
            password=self._password,
            rdns=self._rdns,
        )
        await proto.negotiate()


class Socks4Proxy(AsyncioProxy):
    """Proxy connector whose handshake is performed by ``Socks4Proto``."""

    def __init__(
        self,
        proxy_host,
        proxy_port,
        user_id=None,
        rdns=None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Store proxy address plus the options forwarded to ``Socks4Proto``.

        ``user_id`` and ``rdns`` are passed through to ``Socks4Proto``
        unchanged.
        """
        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
        self._user_id = user_id
        self._rdns = rdns

    async def _negotiate(self):
        """Run the ``Socks4Proto`` negotiation over the current stream."""
        proto = Socks4Proto(
            stream=self._stream,
            resolver=self._resolver,
            dest_host=self._dest_host,
            dest_port=self._dest_port,
            user_id=self._user_id,
            rdns=self._rdns,
        )
        await proto.negotiate()


class HttpProxy(AsyncioProxy):
    """Proxy connector whose handshake is performed by ``HttpProto``."""

    def __init__(
        self,
        proxy_host,
        proxy_port,
        username=None,
        password=None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Store proxy address plus the credentials forwarded to ``HttpProto``."""
        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
        self._username = username
        self._password = password

    async def _negotiate(self):
        """Run the ``HttpProto`` negotiation over the current stream."""
        proto = HttpProto(
            stream=self._stream,
            dest_host=self._dest_host,
            dest_port=self._dest_port,
            username=self._username,
            password=self._password,
        )
        await proto.negotiate()