import pycurl
from io import BytesIO
from threading import Thread, Lock
import itertools
from queue import Queue
import collections

from .exception import FuzzExceptBadOptions, FuzzExceptNetError

from .factories.reqresp_factory import ReqRespRequestFactory

# See https://curl.haxx.se/libcurl/c/libcurl-errors.html
UNRECOVERABLE_PYCURL_EXCEPTIONS = [
    28,  # Operation timeout. The specified time-out period was reached according to the conditions.
    7,  # Failed to connect() to host or proxy.
    6,  # Couldn't resolve host. The given remote host was not resolved.
    5,  # Couldn't resolve proxy. The given proxy host could not be resolved.
]

# Other common pycurl exceptions:
# Exception in perform (35, 'error:0B07C065:x509 certificate routines:X509_STORE_add_cert:cert already in hash table')
# Exception in perform (18, 'SSL read: error:0B07C065:x509 certificate routines:X509_STORE_add_cert:cert already in hash table, errno 11')


class HttpPool:
    """Concurrent HTTP request pool built on pycurl's CurlMulti interface.

    Requests are enqueued per "pool id" (one pool per registered consumer);
    a single background thread drives the CurlMulti stack and pushes each
    finished/failed result into that pool's result queue.
    """

    HTTPAUTH_BASIC, HTTPAUTH_NTLM, HTTPAUTH_DIGEST = ("basic", "ntlm", "digest")
    # Class-level counter shared by all instances so pool ids are unique.
    newid = itertools.count(0)

    def __init__(self, options):
        # Number of requests fully processed (success or error).
        self.processed = 0

        # Signals the worker thread to drain and shut down.
        self.exit_job = False
        # Protects self.processed and self._registered.
        self.mutex_stats = Lock()

        self.m = None
        # Curl handles not currently attached to the multi stack.
        self.curlh_freelist = []
        # Pending (fuzzres, poolid) pairs; deque for O(1) popleft.
        self._request_list = collections.deque()
        self.handles = []

        self.ths = None

        # poolid -> {"queue": Queue of results, "proxy": proxy iterator or None}
        self.pool_map = {}

        self.options = options

        # Count of registered consumers; cleanup happens when it drops to 0.
        self._registered = 0

    def _initialize(self):
        """Create the CurlMulti stack, the curl handle pool and the worker thread."""
        # pycurl Connection pool
        self.m = pycurl.CurlMulti()
        self.handles = []

        for i in range(self.options.get("concurrent")):
            curl_h = pycurl.Curl()
            self.handles.append(curl_h)
            self.curlh_freelist.append(curl_h)

        # create threads
        self.ths = []

        for fn in ("_read_multi_stack",):
            # NOTE: Thread.setName() is deprecated since Python 3.10; pass
            # the name to the constructor instead.
            th = Thread(target=getattr(self, fn), name=fn)
            self.ths.append(th)
            th.start()

    def job_stats(self):
        """Return a snapshot of processed/registered counters (thread-safe)."""
        with self.mutex_stats:
            dic = {
                "http_processed": self.processed,
                "http_registered": self._registered,
            }
        return dic

    # internal http pool control

    def iter_results(self, poolid):
        """Yield at most one result from the pool's queue (blocking).

        A ``None`` item is the shutdown sentinel pushed by _stop_to_pools
        and terminates the iteration without yielding.
        """
        item = self.pool_map[poolid]["queue"].get()

        if not item:
            return

        yield item

    def _new_pool(self):
        """Allocate a new pool id with its own result queue and proxy rotation."""
        poolid = next(self.newid)
        self.pool_map[poolid] = {}
        self.pool_map[poolid]["queue"] = Queue()
        self.pool_map[poolid]["proxy"] = None

        if self.options.get("proxies"):
            self.pool_map[poolid]["proxy"] = self._get_next_proxy(
                self.options.get("proxies")
            )

        return poolid

    def _prepare_curl_h(self, curl_h, fuzzres, poolid):
        """Configure a curl handle for fuzzres and attach response buffers to it."""
        new_curl_h = ReqRespRequestFactory.to_http_object(
            self.options, fuzzres.history, curl_h
        )
        new_curl_h = self._set_extra_options(new_curl_h, fuzzres, poolid)

        # (body buffer, header buffer, originating result, pool id) travels
        # with the handle so _read_multi_stack can route the response.
        new_curl_h.response_queue = (BytesIO(), BytesIO(), fuzzres, poolid)
        new_curl_h.setopt(pycurl.WRITEFUNCTION, new_curl_h.response_queue[0].write)
        new_curl_h.setopt(pycurl.HEADERFUNCTION, new_curl_h.response_queue[1].write)

        return new_curl_h

    def enqueue(self, fuzzres, poolid):
        """Queue a request for execution; silently dropped during shutdown."""
        if self.exit_job:
            return

        self._request_list.append((fuzzres, poolid))

    def _stop_to_pools(self):
        """Push the None shutdown sentinel into every pool's result queue."""
        for p in list(self.pool_map.keys()):
            self.pool_map[p]["queue"].put(None)

    def cleanup(self):
        """Ask the worker thread to stop and wait for it to finish."""
        self.exit_job = True
        for th in self.ths:
            th.join()

    def register(self):
        """Register a consumer; lazily initializes the pool on first use.

        Returns the new pool id for the caller.
        """
        with self.mutex_stats:
            self._registered += 1

            if not self.pool_map:
                self._initialize()

            return self._new_pool()

    def deregister(self):
        """Deregister a consumer; shuts the pool down when none remain."""
        with self.mutex_stats:
            self._registered -= 1

        if self._registered <= 0:
            self.cleanup()

    def _get_next_proxy(self, proxy_list):
        """Return an infinite round-robin iterator over proxy_list."""
        # itertools.cycle yields proxy_list[0], [1], ... wrapping forever,
        # exactly like the previous hand-rolled index loop.
        return itertools.cycle(proxy_list)

    def _set_extra_options(self, c, fuzzres, poolid):
        """Apply per-pool proxy settings and the configured timeouts to handle c.

        Raises FuzzExceptBadOptions for an unknown proxy type.
        """
        if self.pool_map[poolid]["proxy"]:
            ip, port, ptype = next(self.pool_map[poolid]["proxy"])

            fuzzres.history.wf_proxy = (("%s:%s" % (ip, port)), ptype)

            if ptype == "SOCKS5":
                c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
                c.setopt(pycurl.PROXY, "%s:%s" % (ip, port))
            elif ptype == "SOCKS4":
                c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
                c.setopt(pycurl.PROXY, "%s:%s" % (ip, port))
            elif ptype == "HTTP":
                c.setopt(pycurl.PROXY, "%s:%s" % (ip, port))
            else:
                raise FuzzExceptBadOptions(
                    "Bad proxy type specified, correct values are HTTP, SOCKS4 or SOCKS5."
                )
        else:
            # Explicitly clear any proxy a recycled handle may still carry.
            c.setopt(pycurl.PROXY, "")

        mdelay = self.options.get("req_delay")
        if mdelay is not None:
            c.setopt(pycurl.TIMEOUT, mdelay)

        cdelay = self.options.get("conn_delay")
        if cdelay is not None:
            c.setopt(pycurl.CONNECTTIMEOUT, cdelay)

        return c

    def _process_curl_handle(self, curl_h):
        """Parse a successfully completed handle and deliver the result."""
        buff_body, buff_header, res, poolid = curl_h.response_queue

        try:
            ReqRespRequestFactory.from_http_object(
                self.options,
                res.history,
                curl_h,
                buff_header.getvalue(),
                buff_body.getvalue(),
            )
        except Exception as e:
            self.pool_map[poolid]["queue"].put(res.update(exception=e))
        else:
            # reset type to result otherwise backfeed items will enter an infinite loop
            self.pool_map[poolid]["queue"].put(res.update())

        with self.mutex_stats:
            self.processed += 1

    def _process_curl_should_retry(self, res, errno, poolid):
        """Re-enqueue res after a recoverable curl error.

        Returns True when the request was re-queued for another attempt.
        """
        if errno not in UNRECOVERABLE_PYCURL_EXCEPTIONS:
            res.history.wf_retries += 1

            if res.history.wf_retries < self.options.get("retries"):
                self._request_list.append((res, poolid))
                return True

        return False

    def _process_curl_handle_error(self, res, errno, errmsg, poolid):
        """Wrap a curl failure in FuzzExceptNetError and deliver it to the pool."""
        e = FuzzExceptNetError("Pycurl error %d: %s" % (errno, errmsg))
        res.history.totaltime = 0
        self.pool_map[poolid]["queue"].put(res.update(exception=e))

        with self.mutex_stats:
            self.processed += 1

    def _read_multi_stack(self):
        """Worker loop: drive CurlMulti, recycle finished handles, feed new requests."""
        # Check for curl objects which have terminated, and add them to the curlh_freelist
        while not self.exit_job:
            while not self.exit_job:
                ret, num_handles = self.m.perform()
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break

            num_q, ok_list, err_list = self.m.info_read()
            for curl_h in ok_list:
                self._process_curl_handle(curl_h)
                self.m.remove_handle(curl_h)
                self.curlh_freelist.append(curl_h)

            for curl_h, errno, errmsg in err_list:
                buff_body, buff_header, res, poolid = curl_h.response_queue

                # Either retry (recoverable) or surface the error to the pool.
                if not self._process_curl_should_retry(res, errno, poolid):
                    self._process_curl_handle_error(res, errno, errmsg, poolid)

                self.m.remove_handle(curl_h)
                self.curlh_freelist.append(curl_h)

            # Pair idle handles with pending requests.
            while self.curlh_freelist and self._request_list:
                curl_h = self.curlh_freelist.pop()
                fuzzres, poolid = self._request_list.popleft()

                self.m.add_handle(self._prepare_curl_h(curl_h, fuzzres, poolid))

        self._stop_to_pools()

        # cleanup multi stack
        for c in self.handles:
            c.close()
            self.curlh_freelist.append(c)
        self.m.close()