1import asyncio
2import socket
3import sys
4from typing import Optional
5
6import async_timeout
7
8from ..._errors import ProxyConnectionError, ProxyTimeoutError
9from ..._proto.http_async import HttpProto
10from ..._proto.socks4_async import Socks4Proto
11from ..._proto.socks5_async import Socks5Proto
12from ._stream import AsyncioSocketStream
13from ._resolver import Resolver
14
15from ._connect import connect_tcp
16from ... import _abc as abc
17
18DEFAULT_TIMEOUT = 60
19
20
21class AsyncioProxy(abc.AsyncProxy):
22    _stream: Optional[abc.AsyncSocketStream]
23
24    def __init__(
25        self,
26        proxy_host: str,
27        proxy_port: int,
28        loop: asyncio.AbstractEventLoop = None,
29    ):
30
31        if loop is None:
32            loop = asyncio.get_event_loop()
33
34        self._loop = loop
35
36        self._proxy_host = proxy_host
37        self._proxy_port = proxy_port
38
39        self._dest_host = None
40        self._dest_port = None
41        self._timeout = None
42
43        self._stream = None
44        self._resolver = Resolver(loop=loop)
45
46    async def connect(
47        self,
48        dest_host: str,
49        dest_port: int,
50        timeout: float = None,
51        _socket=None,
52    ) -> socket.socket:
53
54        if timeout is None:
55            timeout = DEFAULT_TIMEOUT
56
57        self._dest_host = dest_host
58        self._dest_port = dest_port
59        self._timeout = timeout
60
61        try:
62            return await self._connect(_socket=_socket)
63        except asyncio.TimeoutError as e:
64            raise ProxyTimeoutError('Proxy connection timed out: {}'.format(self._timeout)) from e
65
66    async def _connect(self, _socket=None) -> socket.socket:
67        async with async_timeout.timeout(self._timeout):
68            try:
69                if _socket is None:
70                    _socket = await connect_tcp(
71                        host=self._proxy_host,
72                        port=self._proxy_port,
73                        loop=self._loop,
74                    )
75
76                self._stream = AsyncioSocketStream(sock=_socket, loop=self._loop)
77                await self._negotiate()
78                return _socket
79            except OSError as e:
80                await self._close()
81                msg = 'Could not connect to proxy {}:{} [{}]'.format(
82                    self._proxy_host,
83                    self._proxy_port,
84                    e.strerror,
85                )
86                raise ProxyConnectionError(e.errno, msg) from e
87            except asyncio.CancelledError:  # pragma: no cover
88                # https://bugs.python.org/issue30064
89                # https://bugs.python.org/issue34795
90                if self._can_be_closed_safely():
91                    await self._close()
92                raise
93            except Exception:  # pragma: no cover
94                await self._close()
95                raise
96
97    def _can_be_closed_safely(self):  # pragma: no cover
98        def is_proactor_event_loop():
99            try:
100                from asyncio import ProactorEventLoop  # noqa
101            except ImportError:
102                return False
103            return isinstance(self._loop, ProactorEventLoop)
104
105        def is_uvloop_event_loop():
106            try:
107                from uvloop import Loop  # noqa
108            except ImportError:
109                return False
110            return isinstance(self._loop, Loop)
111
112        return sys.version_info[:2] >= (3, 8) or is_proactor_event_loop() or is_uvloop_event_loop()
113
114    async def _negotiate(self):
115        raise NotImplementedError()  # pragma: no cover
116
117    async def _close(self):
118        if self._stream is not None:
119            await self._stream.close()
120
121    @property
122    def proxy_host(self):
123        return self._proxy_host
124
125    @property
126    def proxy_port(self):
127        return self._proxy_port
128
129
130class Socks5Proxy(AsyncioProxy):
131    def __init__(
132        self,
133        proxy_host,
134        proxy_port,
135        username=None,
136        password=None,
137        rdns=None,
138        loop: asyncio.AbstractEventLoop = None,
139    ):
140        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
141        self._username = username
142        self._password = password
143        self._rdns = rdns
144
145    async def _negotiate(self):
146        proto = Socks5Proto(
147            stream=self._stream,
148            resolver=self._resolver,
149            dest_host=self._dest_host,
150            dest_port=self._dest_port,
151            username=self._username,
152            password=self._password,
153            rdns=self._rdns,
154        )
155        await proto.negotiate()
156
157
158class Socks4Proxy(AsyncioProxy):
159    def __init__(
160        self,
161        proxy_host,
162        proxy_port,
163        user_id=None,
164        rdns=None,
165        loop: asyncio.AbstractEventLoop = None,
166    ):
167        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
168        self._user_id = user_id
169        self._rdns = rdns
170
171    async def _negotiate(self):
172        proto = Socks4Proto(
173            stream=self._stream,
174            resolver=self._resolver,
175            dest_host=self._dest_host,
176            dest_port=self._dest_port,
177            user_id=self._user_id,
178            rdns=self._rdns,
179        )
180        await proto.negotiate()
181
182
183class HttpProxy(AsyncioProxy):
184    def __init__(
185        self,
186        proxy_host,
187        proxy_port,
188        username=None,
189        password=None,
190        loop: asyncio.AbstractEventLoop = None,
191    ):
192        super().__init__(proxy_host=proxy_host, proxy_port=proxy_port, loop=loop)
193        self._username = username
194        self._password = password
195
196    async def _negotiate(self):
197        proto = HttpProto(
198            stream=self._stream,
199            dest_host=self._dest_host,
200            dest_port=self._dest_port,
201            username=self._username,
202            password=self._password,
203        )
204        await proto.negotiate()
205