1import pycurl
2from io import BytesIO
3from threading import Thread, Lock
4import itertools
5from queue import Queue
6import collections
7
8from .exception import FuzzExceptBadOptions, FuzzExceptNetError
9
10from .factories.reqresp_factory import ReqRespRequestFactory
11
# See https://curl.haxx.se/libcurl/c/libcurl-errors.html
# libcurl error codes for which a failed request is never re-queued
# (checked by HttpPool._process_curl_should_retry).
UNRECOVERABLE_PYCURL_EXCEPTIONS = [
    28,  # Operation timeout. The specified time-out period was reached according to the conditions.
    7,  # Failed to connect() to host or proxy.
    6,  # Couldn't resolve host. The given remote host was not resolved.
    5,  # Couldn't resolve proxy. The given proxy host could not be resolved.
]
19
20# Other common pycurl exceptions:
21# Exception in perform (35, 'error:0B07C065:x509 certificate routines:X509_STORE_add_cert:cert already in hash table')
22# Exception in perform (18, 'SSL read: error:0B07C065:x509 certificate routines:X509_STORE_add_cert:cert already in hash table, errno 11')
23
24
class HttpPool:
    """Pool of pycurl easy handles driven by a CurlMulti stack on a worker thread.

    Consumers call register() to obtain a pool id with a private result
    queue, enqueue() to submit requests, and iter_results() to read
    results back.  When the last consumer deregisters, the worker thread
    is stopped and every curl handle is closed.
    """

    HTTPAUTH_BASIC, HTTPAUTH_NTLM, HTTPAUTH_DIGEST = ("basic", "ntlm", "digest")
    # Class-level counter shared across instances so pool ids stay unique.
    newid = itertools.count(0)

    def __init__(self, options):
        # Total number of requests fully processed (success or error).
        self.processed = 0

        # Set to True to make the worker thread leave its loop.
        self.exit_job = False
        # Guards self.processed and self._registered.
        self.mutex_stats = Lock()

        # pycurl.CurlMulti instance, created lazily by _initialize().
        self.m = None
        # Easy handles currently detached from the multi stack.
        self.curlh_freelist = []
        # Pending (fuzzres, poolid) pairs waiting for a free handle.
        self._request_list = collections.deque()
        # Every easy handle created, kept for the final close().
        self.handles = []

        # Worker thread list, created by _initialize().
        self.ths = None

        # poolid -> {"queue": result Queue, "proxy": proxy iterator or None}
        self.pool_map = {}

        self.options = options

        # Count of registered consumers; reaching zero triggers cleanup().
        self._registered = 0

    def _initialize(self):
        """Create the multi stack, the handle free-list and the worker thread."""
        # pycurl connection pool
        self.m = pycurl.CurlMulti()
        self.handles = []

        for _ in range(self.options.get("concurrent")):
            curl_h = pycurl.Curl()
            self.handles.append(curl_h)
            self.curlh_freelist.append(curl_h)

        # create the worker thread(s) that drive the multi stack
        self.ths = []

        for fn in ("_read_multi_stack",):
            th = Thread(target=getattr(self, fn))
            # Thread.setName() is deprecated since Python 3.10; assign directly.
            th.name = fn
            self.ths.append(th)
            th.start()

    def job_stats(self):
        """Return a consistent snapshot of the processed/registered counters."""
        with self.mutex_stats:
            dic = {
                "http_processed": self.processed,
                "http_registered": self._registered,
            }
        return dic

    # internal http pool control

    def iter_results(self, poolid):
        """Yield the next available result for *poolid*, blocking until one arrives."""
        item = self.pool_map[poolid]["queue"].get()

        # None is the shutdown sentinel pushed by _stop_to_pools().
        if not item:
            return

        yield item

    def _new_pool(self):
        """Allocate a fresh pool id with its own result queue and proxy iterator."""
        poolid = next(self.newid)
        self.pool_map[poolid] = {
            "queue": Queue(),
            "proxy": None,
        }

        if self.options.get("proxies"):
            self.pool_map[poolid]["proxy"] = self._get_next_proxy(
                self.options.get("proxies")
            )

        return poolid

    def _prepare_curl_h(self, curl_h, fuzzres, poolid):
        """Configure *curl_h* for *fuzzres* and attach fresh response buffers."""
        new_curl_h = ReqRespRequestFactory.to_http_object(
            self.options, fuzzres.history, curl_h
        )
        new_curl_h = self._set_extra_options(new_curl_h, fuzzres, poolid)

        # (body buffer, header buffer, originating result, owning pool id)
        new_curl_h.response_queue = (BytesIO(), BytesIO(), fuzzres, poolid)
        new_curl_h.setopt(pycurl.WRITEFUNCTION, new_curl_h.response_queue[0].write)
        new_curl_h.setopt(pycurl.HEADERFUNCTION, new_curl_h.response_queue[1].write)

        return new_curl_h

    def enqueue(self, fuzzres, poolid):
        """Queue *fuzzres* for execution unless the pool is shutting down."""
        if self.exit_job:
            return

        self._request_list.append((fuzzres, poolid))

    def _stop_to_pools(self):
        # Wake every consumer blocked on its result queue with the None sentinel.
        for p in list(self.pool_map.keys()):
            self.pool_map[p]["queue"].put(None)

    def cleanup(self):
        """Signal the worker thread(s) to stop and wait for them to finish."""
        self.exit_job = True
        for th in self.ths:
            th.join()

    def register(self):
        """Register a consumer and return its new pool id."""
        with self.mutex_stats:
            self._registered += 1

        # The first consumer spins up the curl machinery.
        if not self.pool_map:
            self._initialize()

        return self._new_pool()

    def deregister(self):
        """Detach a consumer, shutting the pool down when the last one leaves."""
        with self.mutex_stats:
            self._registered -= 1
            last_one = self._registered <= 0

        # cleanup() joins the worker thread, and that thread acquires
        # mutex_stats while processing results; calling cleanup() while
        # still holding the lock could therefore deadlock.
        if last_one:
            self.cleanup()

    def _get_next_proxy(self, proxy_list):
        """Return an iterator cycling endlessly over *proxy_list*."""
        return itertools.cycle(proxy_list)

    def _set_extra_options(self, c, fuzzres, poolid):
        """Apply per-pool proxy settings and configured timeouts to handle *c*."""
        if self.pool_map[poolid]["proxy"]:
            ip, port, ptype = next(self.pool_map[poolid]["proxy"])

            fuzzres.history.wf_proxy = (("%s:%s" % (ip, port)), ptype)

            if ptype == "SOCKS5":
                c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
            elif ptype == "SOCKS4":
                c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
            elif ptype != "HTTP":
                raise FuzzExceptBadOptions(
                    "Bad proxy type specified, correct values are HTTP, SOCKS4 or SOCKS5."
                )
            c.setopt(pycurl.PROXY, "%s:%s" % (ip, port))
        else:
            # Clear any proxy left over from a previous use of this handle.
            c.setopt(pycurl.PROXY, "")

        mdelay = self.options.get("req_delay")
        if mdelay is not None:
            c.setopt(pycurl.TIMEOUT, mdelay)

        cdelay = self.options.get("conn_delay")
        if cdelay is not None:
            c.setopt(pycurl.CONNECTTIMEOUT, cdelay)

        return c

    def _process_curl_handle(self, curl_h):
        """Parse a completed transfer and deliver the result to its pool queue."""
        buff_body, buff_header, res, poolid = curl_h.response_queue

        try:
            ReqRespRequestFactory.from_http_object(
                self.options,
                res.history,
                curl_h,
                buff_header.getvalue(),
                buff_body.getvalue(),
            )
        except Exception as e:
            self.pool_map[poolid]["queue"].put(res.update(exception=e))
        else:
            # reset type to result otherwise backfeed items will enter an infinite loop
            self.pool_map[poolid]["queue"].put(res.update())

        with self.mutex_stats:
            self.processed += 1

    def _process_curl_should_retry(self, res, errno, poolid):
        """Re-queue *res* after a recoverable error; return True when retried."""
        if errno not in UNRECOVERABLE_PYCURL_EXCEPTIONS:
            res.history.wf_retries += 1

            if res.history.wf_retries < self.options.get("retries"):
                self._request_list.append((res, poolid))
                return True

        return False

    def _process_curl_handle_error(self, res, errno, errmsg, poolid):
        """Deliver a pycurl failure to the pool queue wrapped as FuzzExceptNetError."""
        e = FuzzExceptNetError("Pycurl error %d: %s" % (errno, errmsg))
        res.history.totaltime = 0
        self.pool_map[poolid]["queue"].put(res.update(exception=e))

        with self.mutex_stats:
            self.processed += 1

    def _read_multi_stack(self):
        """Worker loop: drive the multi stack and recycle finished handles."""
        # Check for curl objects which have terminated, and add them to the curlh_freelist
        while not self.exit_job:
            while not self.exit_job:
                ret, num_handles = self.m.perform()
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break

            num_q, ok_list, err_list = self.m.info_read()
            for curl_h in ok_list:
                self._process_curl_handle(curl_h)
                self.m.remove_handle(curl_h)
                self.curlh_freelist.append(curl_h)

            for curl_h, errno, errmsg in err_list:
                buff_body, buff_header, res, poolid = curl_h.response_queue

                if not self._process_curl_should_retry(res, errno, poolid):
                    self._process_curl_handle_error(res, errno, errmsg, poolid)

                self.m.remove_handle(curl_h)
                self.curlh_freelist.append(curl_h)

            # Hand queued requests to whatever handles are now free.
            while self.curlh_freelist and self._request_list:
                curl_h = self.curlh_freelist.pop()
                fuzzres, poolid = self._request_list.popleft()

                self.m.add_handle(self._prepare_curl_h(curl_h, fuzzres, poolid))

        # Unblock consumers before tearing the stack down.
        self._stop_to_pools()

        # cleanup multi stack
        for c in self.handles:
            c.close()
            self.curlh_freelist.append(c)
        self.m.close()
253