1 
2 #include <unistd.h>
3 #include <sys/time.h>
4 #include <atomic>
5 #include <algorithm>
6 
7 #define LOCAL_DEBUG
8 #include "debug.h"
9 
10 #include "config.h"
11 #include "acfg.h"
12 #include "dlcon.h"
13 
14 #include "fileitem.h"
15 #include "fileio.h"
16 #include "sockio.h"
17 
18 #ifdef HAVE_LINUX_EVENTFD
19 #include <sys/eventfd.h>
20 #endif
21 
22 using namespace std;
23 
24 // evil hack to simulate random disconnects
25 //#define DISCO_FAILURE
26 
27 #define MAX_RETRY 11
28 
29 namespace acng
30 {
31 
32 static cmstring sGenericError("567 Unknown download error occured");
33 
34 // those are not allowed to be forwarded
35 static const auto taboo =
36 {
37 	string("Host"), string("Cache-Control"),
38 	string("Proxy-Authorization"), string("Accept"),
39 	string("User-Agent")
40 };
41 
42 std::atomic_uint g_nDlCons(0);
43 
dlcon(bool bManualExecution,string * xff,IDlConFactory * pConFactory)44 dlcon::dlcon(bool bManualExecution, string *xff, IDlConFactory *pConFactory) :
45 		m_pConFactory(pConFactory), m_bStopASAP(false), m_bManualMode(bManualExecution),
46 		m_nTempPipelineDisable(0),
47 		m_bProxyTot(false)
48 {
49 	LOGSTART("dlcon::dlcon");
50 #ifdef HAVE_LINUX_EVENTFD
51 	m_wakeventfd=eventfd(0, 0);
52 	if(m_wakeventfd>=0)
53 		set_nb(m_wakeventfd);
54 #else
55 	if (0 == pipe(m_wakepipe))
56 	{
57 		set_nb(m_wakepipe[0]);
58 		set_nb(m_wakepipe[1]);
59 	}
60 #endif
61 	if (xff)
62 		m_sXForwardedFor = *xff;
63   g_nDlCons++;
64 }
65 
66 struct tDlJob
67 {
68 	tFileItemPtr m_pStorage;
69 	mstring sErrorMsg;
70 	dlcon &m_parent;
71 
HasBrokenStorageacng::tDlJob72 	inline bool HasBrokenStorage()
73 	{
74 		return (!m_pStorage || m_pStorage->GetStatus() > fileitem::FIST_COMPLETE);
75 	}
76 
77 #define HINT_MORE 0
78 #define HINT_DONE 1
79 #define HINT_DISCON 2
80 #define EFLAG_JOB_BROKEN 4
81 #define EFLAG_MIRROR_BROKEN 8
82 #define EFLAG_STORE_COLLISION 16
83 #define HINT_SWITCH 32
84 #define EFLAG_LOST_CON 64
85 #define HINT_KILL_LAST_FILE 128
86 #define HINT_TGTCHANGE 256
87 
88 	const cfg::tRepoData * m_pRepoDesc=nullptr;
89 
90 	/*!
91 	 * Returns a reference to http url where host and port and protocol match the current host
92 	 * Other fields in that member have undefined contents. ;-)
93 	 */
GetPeerHostacng::tDlJob94 	inline const tHttpUrl& GetPeerHost()
95 	{
96 		return m_pCurBackend ? *m_pCurBackend : m_remoteUri;
97 	}
98 
GetConnStateTrackeracng::tDlJob99 	inline cfg::tRepoData::IHookHandler * GetConnStateTracker()
100 	{
101 		return m_pRepoDesc ? m_pRepoDesc->m_pHooks : nullptr;
102 	}
103 
104 	typedef enum : char
105 	{
106 		STATE_GETHEADER, STATE_REGETHEADER, STATE_PROCESS_DATA,
107 		STATE_GETCHUNKHEAD, STATE_PROCESS_CHUNKDATA, STATE_GET_CHUNKTRAILER,
108 		STATE_FINISHJOB
109 	} tDlState;
110 
111 	string m_extraHeaders;
112 
113 	tHttpUrl m_remoteUri;
114 	const tHttpUrl *m_pCurBackend=nullptr;
115 
116 	uint_fast8_t m_eReconnectASAP =0;
117 	bool m_bBackendMode=false;
118 
119 	off_t m_nRest =0;
120 
121 	tDlState m_DlState = STATE_GETHEADER;
122 
123 	int m_nRedirRemaining;
124 
tDlJobacng::tDlJob125 	inline tDlJob(dlcon *p, tFileItemPtr pFi,
126 			const tHttpUrl *pUri,
127 			const cfg::tRepoData * pRepoData,
128 			const std::string *psPath,
129 			int redirmax) :
130 			m_pStorage(pFi),
131 			m_parent(*p),
132 			m_pRepoDesc(pRepoData),
133 			m_nRedirRemaining(redirmax)
134 	{
135 		LOGSTART("tDlJob::tDlJob");
136 		ldbg("uri: " << (pUri ? pUri->ToURI(false) :  sEmptyString )
137 				<< ", " << "restpath: " << (psPath?*psPath:sEmptyString)
138 		<< "repo: " << uintptr_t(pRepoData)
139 		);
140 		if (m_pStorage)
141 			m_pStorage->IncDlRefCount();
142 		if(pUri)
143 			m_remoteUri=*pUri;
144 		else
145 		{
146 			m_remoteUri.sPath=*psPath;
147 			m_bBackendMode=true;
148 		}
149 	}
150 
~tDlJobacng::tDlJob151 	~tDlJob()
152 	{
153 		if (m_pStorage)
154 			m_pStorage->DecDlRefCount(sErrorMsg.empty() ? sGenericError : sErrorMsg);
155 	}
156 
ExtractCustomHeadersacng::tDlJob157 	inline void ExtractCustomHeaders(LPCSTR reqHead)
158 	{
159 		if(!reqHead)
160 			return;
161 		header h;
162 		bool forbidden=false;
163 		h.Load(reqHead, (unsigned) std::numeric_limits<int>::max(),
164 				[this, &forbidden](cmstring& key, cmstring& rest)
165 				{
166 			// heh, continuation of ignored stuff or without start?
167 			if(key.empty() && (m_extraHeaders.empty() || forbidden))
168 				return;
169 			forbidden = taboo.end() != std::find_if(taboo.begin(), taboo.end(),
170 					[&key](cmstring &x){return scaseequals(x,key);});
171 			if(!forbidden)
172 				m_extraHeaders += key + rest;
173 				}
174 		);
175 	}
176 
RemoteUriacng::tDlJob177 	inline string RemoteUri(bool bUrlEncoded)
178 	{
179 		if(m_pCurBackend)
180 			return m_pCurBackend->ToURI(bUrlEncoded) +
181 					( bUrlEncoded ? UrlEscape(m_remoteUri.sPath)
182 							: m_remoteUri.sPath);
183 
184 		return m_remoteUri.ToURI(bUrlEncoded);
185 	}
186 
RewriteSourceacng::tDlJob187 	inline bool RewriteSource(const char *pNewUrl)
188 	{
189 		LOGSTART("tDlJob::RewriteSource");
190 		if (--m_nRedirRemaining <= 0)
191 		{
192 			sErrorMsg = "500 Bad redirection (loop)";
193 			return false;
194 		}
195 
196 		if (!pNewUrl || !*pNewUrl)
197 		{
198 			sErrorMsg = "500 Bad redirection (empty)";
199 			return false;
200 		}
201 
202 		// start modifying the target URL, point of no return
203 		m_pCurBackend = nullptr;
204 		bool bWasBeMode = m_bBackendMode;
205 		m_bBackendMode = false;
206 		sErrorMsg = "500 Bad redirection (path)";
207 
208 		auto sLocationDecoded = UrlUnescape(pNewUrl);
209 
210 		tHttpUrl newUri;
211 		if (newUri.SetHttpUrl(sLocationDecoded, false))
212 		{
213 			dbgline;
214 			m_remoteUri = newUri;
215 			return true;
216 		}
217 		// ok, some protocol-relative crap? let it parse the hostname but keep the protocol
218 		if (startsWithSz(sLocationDecoded, "//"))
219 		{
220 			stripPrefixChars(sLocationDecoded, "/");
221 			return m_remoteUri.SetHttpUrl(
222 					m_remoteUri.GetProtoPrefix() + sLocationDecoded);
223 		}
224 
225 		// recreate the full URI descriptor matching the last download
226 		if(bWasBeMode)
227 		{
228 			if(!m_pCurBackend)
229 				return false;
230 			auto sPathBackup=m_remoteUri.sPath;
231 			m_remoteUri=*m_pCurBackend;
232 			m_remoteUri.sPath+=sPathBackup;
233 		}
234 
235 		if (startsWithSz(sLocationDecoded, "/"))
236 		{
237 			m_remoteUri.sPath = sLocationDecoded;
238 			return true;
239 		}
240 		// ok, must be relative
241 		m_remoteUri.sPath+=(sPathSepUnix+sLocationDecoded);
242 		return true;
243 	}
244 
SetupJobConfigacng::tDlJob245 	bool SetupJobConfig(mstring& sReasonMsg, decltype(dlcon::m_blacklist) &blacklist)
246 	{
247 		LOGSTART("dlcon::SetupJobConfig");
248 
249 		// using backends? Find one which is not blacklisted
250 		if (m_bBackendMode)
251 		{
252 			// keep the existing one if possible
253 			if (m_pCurBackend)
254 			{
255 				LOG(
256 						"Checking [" << m_pCurBackend->sHost << "]:" << m_pCurBackend->GetPort());
257 				const auto bliter = blacklist.find(
258 						make_pair(m_pCurBackend->sHost, m_pCurBackend->GetPort()));
259 				if (bliter == blacklist.end())
260 					return true;
261 			}
262 
263 			// look in the constant list, either it's usable or it was blacklisted before
264 			for (const auto& bend : m_pRepoDesc->m_backends)
265 			{
266 				const auto bliter = blacklist.find(make_pair(bend.sHost, bend.GetPort()));
267 				if (bliter == blacklist.end())
268 				{
269 					m_pCurBackend = &bend;
270 					return true;
271 				}
272 
273 				// uh, blacklisted, remember the last reason
274 				sReasonMsg = bliter->second;
275 			}
276 			return false;
277 		}
278 
279 		// ok, not backend mode. Check the mirror data (vs. blacklist)
280 		auto bliter = blacklist.find(
281 				make_pair(GetPeerHost().sHost, GetPeerHost().GetPort()));
282 		if (bliter == blacklist.end())
283 			return true;
284 
285 		sReasonMsg = bliter->second;
286 		return false;
287 	}
288 
289 	// needs connectedHost, blacklist, output buffer from the parent, proxy mode?
AppendRequestacng::tDlJob290 	inline void AppendRequest(tSS &head, cmstring &xff, const tHttpUrl *proxy)
291 	{
292 		LOGSTART("tDlJob::AppendRequest");
293 
294 		head << (m_pStorage->m_bHeadOnly ? "HEAD " : "GET ");
295 
296 		if (proxy)
297 			head << RemoteUri(true);
298 		else // only absolute path without scheme
299 		{
300 			if (m_pCurBackend) // base dir from backend definition
301 				head << UrlEscape(m_pCurBackend->sPath);
302 
303 			head << UrlEscape(m_remoteUri.sPath);
304 		}
305 
306 		ldbg(RemoteUri(true));
307 
308 		head << " HTTP/1.1\r\n" << cfg::agentheader << "Host: " << GetPeerHost().sHost << "\r\n";
309 
310 		if (proxy) // proxy stuff, and add authorization if there is any
311 		{
312 			ldbg("using proxy");
313 			if(!proxy->sUserPass.empty())
314 			{
315 				head << "Proxy-Authorization: Basic "
316 						<< EncodeBase64Auth(proxy->sUserPass) << "\r\n";
317 			}
318 			// Proxy-Connection is a non-sensical copy of Connection but some proxy
319 			// might listen only to this one so better add it
320 			head << (cfg::persistoutgoing ? "Proxy-Connection: keep-alive\r\n"
321 									: "Proxy-Connection: close\r\n");
322 		}
323 
324 		const auto& pSourceHost = GetPeerHost();
325 		if(!pSourceHost.sUserPass.empty())
326 		{
327 			head << "Authorization: Basic "
328 				<< EncodeBase64Auth(pSourceHost.sUserPass) << "\r\n";
329 		}
330 
331 		// either by backend or by host in file uri, never both
332 		//XXX: still needed? Checked while inserting already.
333 		// ASSERT( (m_pCurBackend && m_fileUri.sHost.empty()) || (!m_pCurBackend && !m_fileUri.sHost.empty()));
334 
335 		if (m_pStorage->m_nSizeSeen > 0 || m_pStorage->m_nRangeLimit >=0)
336 		{
337 			bool bSetRange(false), bSetIfRange(false);
338 
339 			lockguard g(m_pStorage.get());
340 			const header &pHead = m_pStorage->GetHeaderUnlocked();
341 
342 			if (m_pStorage->m_bCheckFreshness)
343 			{
344 				if (pHead.h[header::LAST_MODIFIED])
345 				{
346 					if (cfg::vrangeops > 0)
347 					{
348 						bSetIfRange = true;
349 						bSetRange = true;
350 					}
351 					else if(cfg::vrangeops == 0)
352 					{
353 						head << "If-Modified-Since: " << pHead.h[header::LAST_MODIFIED] << "\r\n";
354 					}
355 				}
356 			}
357 			else
358 			{
359 				/////////////// this was protection against broken stuff in the pool ////
360 				// static file type, date does not matter. check known content length, not risking "range not satisfiable" result
361 				//
362 				//off_t nContLen=atol(h.get("Content-Length"));
363 				//if (nContLen>0 && j->m_pStorage->m_nFileSize < nContLen)
364 				bSetRange = true;
365 			}
366 
367 			/*
368 			if(m_pStorage->m_nSizeSeen >0 && m_pStorage->m_nRangeLimit>=0)
369 			{
370 				bool bSaneRange=m_pStorage->m_nRangeLimit >=m_pStorage->m_nSizeSeen;
371 				// just to be sure
372 				ASSERT(bSaneRange);
373 			}
374 			if(m_pStorage->m_nRangeLimit < m_pStorage->m_nSizeSeen)
375 				bSetRange = bSetIfRange = false;
376 */
377 
378 			/* use APT's old trick - set the starting position one byte lower -
379 			 * this way the server has to send at least one byte if the assumed
380 			 * position is correct, and we never get a 416 error (one byte
381 			 * waste is acceptable).
382 			 * */
383 			if (bSetRange)
384 			{
385 				head << "Range: bytes=" << std::max(off_t(0), m_pStorage->m_nSizeSeen - 1) << "-";
386 				if(m_pStorage->m_nRangeLimit>=0)
387 					head << m_pStorage->m_nRangeLimit;
388 				head << "\r\n";
389 			}
390 
391 			if (bSetIfRange)
392 				head << "If-Range: " << pHead.h[header::LAST_MODIFIED] << "\r\n";
393 		}
394 
395 		if (m_pStorage->m_bCheckFreshness)
396 			head << "Cache-Control: no-store,no-cache,max-age=0\r\n";
397 
398 		if (cfg::exporigin && !xff.empty())
399 			head << "X-Forwarded-For: " << xff << "\r\n";
400 
401 		head << cfg::requestapx
402 				<< m_extraHeaders
403 				<< "Accept: application/octet-stream\r\n"
404 				"Accept-Encoding: identity\r\n"
405 				"Connection: "
406 				<< (cfg::persistoutgoing ? "keep-alive\r\n\r\n" : "close\r\n\r\n");
407 
408 #ifdef SPAM
409 		//head.syswrite(2);
410 #endif
411 
412 	}
413 
NewDataHandleracng::tDlJob414 	inline uint_fast8_t NewDataHandler(acbuf & inBuf)
415 	{
416 		LOGSTART("tDlJob::NewDataHandler");
417 		while (true)
418 		{
419 			off_t nToStore = min((off_t) inBuf.size(), m_nRest);
420 			ldbg("To store: " <<nToStore);
421 			if (0 == nToStore)
422 				break;
423 
424 			if (!m_pStorage->StoreFileData(inBuf.rptr(), nToStore))
425 			{
426 				dbgline;
427 				sErrorMsg = "502 Could not store data";
428 				return HINT_DISCON | EFLAG_JOB_BROKEN;
429 			}
430 
431 			m_nRest -= nToStore;
432 			inBuf.drop(nToStore);
433 		}
434 
435 		ldbg("Rest: " << m_nRest);
436 
437 		if (m_nRest != 0)
438 			return HINT_MORE; // will come back
439 
440 		m_DlState = (STATE_PROCESS_DATA == m_DlState) ? STATE_FINISHJOB : STATE_GETCHUNKHEAD;
441 		return HINT_SWITCH;
442 	}
443 
444 	/*!
445 	 *
446 	 * Process new incoming data and write it down to disk or other receivers.
447 	 */
ProcessIncommingacng::tDlJob448 	unsigned ProcessIncomming(acbuf & inBuf, bool bOnlyRedirectionActivity)
449 	{
450 		LOGSTART("tDlJob::ProcessIncomming");
451 		if (!m_pStorage)
452 		{
453 			sErrorMsg = "502 Bad cache descriptor";
454 			return HINT_DISCON | EFLAG_JOB_BROKEN;
455 		}
456 
457 		for (;;) // returned by explicit error (or get-more) return
458 		{
459 			ldbg("switch: " << (int)m_DlState);
460 
461 			if (STATE_GETHEADER == m_DlState ||  STATE_REGETHEADER == m_DlState)
462 			{
463 				ldbg("STATE_GETHEADER");
464 				header h;
465 				if (inBuf.size() == 0)
466 					return HINT_MORE;
467 
468 				bool bHotItem = (m_DlState == STATE_REGETHEADER);
469 				dbgline;
470 
471 				auto hDataLen = h.Load(inBuf.rptr(), inBuf.size(),
472 						[&h](cmstring& key, cmstring& rest)
473 						{ if(scaseequals(key, "Content-Location"))
474 							h.frontLine = "HTTP/1.1 500 Apt-Cacher NG does not like that data";
475 						});
476 
477 				if (0 == hDataLen)
478 					return HINT_MORE;
479 				if (hDataLen<0)
480 				{
481 					dbgline;
482 					sErrorMsg = "500 Invalid header";
483 					// can be followed by any junk... drop that mirror, previous file could also contain bad data
484 					return EFLAG_MIRROR_BROKEN | HINT_DISCON | HINT_KILL_LAST_FILE;
485 				}
486 
487 				ldbg("contents: " << std::string(inBuf.rptr(), hDataLen));
488 				inBuf.drop(hDataLen);
489 				if (h.type != header::ANSWER)
490 				{
491 					dbgline;
492 					sErrorMsg = "500 Unexpected response type";
493 					// smells fatal...
494 					return EFLAG_MIRROR_BROKEN | HINT_DISCON;
495 				}
496 				ldbg("GOT, parsed: " << h.frontLine);
497 
498 				int st = h.getStatus();
499 
500 				if (cfg::redirmax) // internal redirection might be disabled
501 				{
502 					if (IS_REDIRECT(st))
503 					{
504 						if (!RewriteSource(h.h[header::LOCATION]))
505 							return EFLAG_JOB_BROKEN;
506 
507 						// drop the redirect page contents if possible so the outer loop
508 						// can scan other headers
509 						off_t contLen = atoofft(h.h[header::CONTENT_LENGTH], 0);
510 						if (contLen <= inBuf.size())
511 							inBuf.drop(contLen);
512 						return HINT_TGTCHANGE; // no other flags, caller will evaluate the state
513 					}
514 
515 					// for non-redirection responses process as usual
516 
517 					// unless it's a probe run from the outer loop, in this case we
518 					// should go no further
519 					if (bOnlyRedirectionActivity)
520 						return EFLAG_LOST_CON | HINT_DISCON;
521 				}
522 
523 				// explicitly blacklist mirror if key file is missing
524 				if (st >= 400 && m_pRepoDesc && m_remoteUri.sHost.empty())
525 				{
526 					for (const auto& kfile : m_pRepoDesc->m_keyfiles)
527 					{
528 						if (endsWith(m_remoteUri.sPath, kfile))
529 						{
530 							sErrorMsg = "500 Keyfile missing, mirror blacklisted";
531 							return HINT_DISCON | EFLAG_MIRROR_BROKEN;
532 						}
533 					}
534 				}
535 
536 				auto pCon = h.h[header::CONNECTION];
537 				if(!pCon)
538 					pCon = h.h[header::PROXY_CONNECTION];
539 
540 				if (pCon && 0 == strcasecmp(pCon, "close"))
541 				{
542 					ldbg("Peer wants to close connection after request");
543 					m_eReconnectASAP = HINT_DISCON;
544 				}
545 
546 				if (m_pStorage->m_bHeadOnly)
547 				{
548 					m_DlState = STATE_FINISHJOB;
549 				}
550 				// the only case where we expect a 304
551 				else if(st == 304 && cfg::vrangeops == 0)
552 				{
553 					m_pStorage->SetupComplete();
554 					m_DlState = STATE_FINISHJOB;
555 				}
556 				else if (h.h[header::TRANSFER_ENCODING]
557 						&& 0 == strcasecmp(h.h[header::TRANSFER_ENCODING], "chunked"))
558 				{
559 					m_DlState = STATE_GETCHUNKHEAD;
560 					h.del(header::TRANSFER_ENCODING); // don't care anymore
561 				}
562 				else
563 				{
564 					dbgline;
565 					if (!h.h[header::CONTENT_LENGTH])
566 					{
567 						sErrorMsg = "500 Missing Content-Length";
568 						return HINT_DISCON | EFLAG_JOB_BROKEN;
569 					}
570 					// may support such endless stuff in the future but that's too unreliable for now
571 					m_nRest = atoofft(h.h[header::CONTENT_LENGTH]);
572 					m_DlState = STATE_PROCESS_DATA;
573 				}
574 
575 				// ok, can pass the data to the file handler
576 				auto sremote = RemoteUri(false);
577 				h.set(header::XORIG, sremote);
578 				bool bDoRetry(false);
579 
580 				// detect bad auto-redirectors (auth-pages, etc.) by the mime-type of their target
581 				if(cfg::redirmax
582 						&& !cfg::badredmime.empty()
583 						&& cfg::redirmax != m_nRedirRemaining
584 						&& h.h[header::CONTENT_TYPE]
585 						&& strstr(h.h[header::CONTENT_TYPE], cfg::badredmime.c_str())
586 						&& h.getStatus() < 300) // contains the final data/response
587 				{
588 					if(m_pStorage->m_bCheckFreshness)
589 					{
590 						// volatile... this is still ok, just make sure time check works next time
591 						h.set(header::LAST_MODIFIED, FAKEDATEMARK);
592 					}
593 					else
594 					{
595 						// this was redirected and the destination is BAD!
596 						h.frontLine="HTTP/1.1 501 Redirected to invalid target";
597 						void DropDnsCache();
598 						DropDnsCache();
599 					}
600 				}
601 
602 				if(!m_pStorage->DownloadStartedStoreHeader(h, hDataLen,
603 						inBuf.rptr(), bHotItem, bDoRetry))
604 				{
605 					if(bDoRetry)
606 						return EFLAG_LOST_CON | HINT_DISCON; // recoverable
607 
608 					ldbg("Item dl'ed by others or in error state --> drop it, reconnect");
609 					m_DlState = STATE_PROCESS_DATA;
610 					sErrorMsg = "502 Cache descriptor busy";
611 /*					header xh = m_pStorage->GetHeader();
612 					if(xh.frontLine.length() > 12)
613 						sErrorMsg = sErrorMsg + " (" + xh.frontLine.substr(12) + ")";
614 						*/
615 					return HINT_DISCON | EFLAG_JOB_BROKEN | EFLAG_STORE_COLLISION;
616 				}
617 			}
618 			else if (m_DlState == STATE_PROCESS_CHUNKDATA || m_DlState ==  STATE_PROCESS_DATA)
619 			{
620 				// similar states, just handled differently afterwards
621 				ldbg("STATE_GETDATA");
622 				auto res = NewDataHandler(inBuf);
623 				if (HINT_SWITCH != res)
624 					return res;
625 			}
626 			else if (m_DlState == STATE_FINISHJOB)
627 			{
628 				ldbg("STATE_FINISHJOB");
629 				m_DlState = STATE_GETHEADER;
630 				m_pStorage->StoreFileData(nullptr, 0);
631 				return HINT_DONE | m_eReconnectASAP;
632 			}
633 			else if (m_DlState == STATE_GETCHUNKHEAD)
634 			{
635 				ldbg("STATE_GETCHUNKHEAD");
636 				// came back from reading, drop remaining newlines?
637 				while (inBuf.size() > 0)
638 				{
639 					char c = *(inBuf.rptr());
640 					if (c != '\r' && c != '\n')
641 						break;
642 					inBuf.drop(1);
643 				}
644 				const char *crlf(0), *pStart(inBuf.c_str());
645 				if (!inBuf.size() || nullptr == (crlf = strstr(pStart, "\r\n")))
646 				{
647 					inBuf.move();
648 					return HINT_MORE;
649 				}
650 				unsigned len(0);
651 				if (1 != sscanf(pStart, "%x", &len))
652 				{
653 					sErrorMsg = "500 Invalid data stream";
654 					return EFLAG_JOB_BROKEN; // hm...?
655 				}
656 				inBuf.drop(crlf + 2 - pStart);
657 				if (len > 0)
658 				{
659 					m_nRest = len;
660 					m_DlState = STATE_PROCESS_CHUNKDATA;
661 				}
662 				else
663 					m_DlState = STATE_GET_CHUNKTRAILER;
664 			}
665 			else if (m_DlState == STATE_GET_CHUNKTRAILER)
666 			{
667 				if (inBuf.size() < 2)
668 					return HINT_MORE;
669 				const char *pStart(inBuf.c_str());
670 				const char *crlf(strstr(pStart, "\r\n"));
671 				if (!crlf)
672 					return HINT_MORE;
673 
674 				if (crlf == pStart) // valid empty line -> done here
675 				{
676 					inBuf.drop(2);
677 					m_DlState = STATE_FINISHJOB;
678 				}
679 				else
680 					inBuf.drop(crlf + 2 - pStart); // drop line and watch for others
681 			}
682 		}
683 		ASSERT(!"unreachable");
684 		sErrorMsg = "502 Invalid state";
685 		return EFLAG_JOB_BROKEN;
686 	}
687 
IsRecoverableStateacng::tDlJob688 	inline bool IsRecoverableState()
689 	{
690 		return (m_DlState == STATE_GETHEADER || m_DlState == STATE_REGETHEADER);
691 		// XXX: In theory, could also easily recover from STATE_FINISH but that's
692 		// unlikely to happen
693 	}
694 
695 private:
696 	// not to be copied ever
697 	tDlJob(const tDlJob&);
698 	tDlJob & operator=(const tDlJob&);
699 };
700 
wake()701 void dlcon::wake()
702 {
703 	if (fdWakeWrite<0)
704 		return;
705 #ifdef HAVE_LINUX_EVENTFD
706 	while(eventfd_write(fdWakeWrite, 1)<0) ;
707 #else
708 	POKE(fdWakeWrite);
709 #endif
710 }
711 
AddJob(tFileItemPtr m_pItem,const tHttpUrl * pForcedUrl,const cfg::tRepoData * pBackends,cmstring * sPatSuffix,LPCSTR reqHead,int nMaxRedirection)712 bool dlcon::AddJob(tFileItemPtr m_pItem, const tHttpUrl *pForcedUrl,
713 		const cfg::tRepoData *pBackends,
714 		cmstring *sPatSuffix, LPCSTR reqHead,
715 		int nMaxRedirection)
716 {
717 	if(!pForcedUrl)
718 	{
719 		if(!pBackends || pBackends->m_backends.empty())
720 			return false;
721 		if(!sPatSuffix || sPatSuffix->empty())
722 			return false;
723 	}
724 	setLockGuard;
725 /*
726 	ASSERT(
727 			todo->m_pStorage->m_nRangeLimit < 0
728 					|| todo->m_pStorage->m_nRangeLimit >= todo->m_pStorage->m_nSizeSeen);
729 
730 	LOGSTART2("dlcon::EnqJob", todo->m_remoteUri.ToURI(false));
731 */
732 	m_qNewjobs.emplace_back(
733 			make_shared<tDlJob>(this, m_pItem, pForcedUrl, pBackends, sPatSuffix,nMaxRedirection));
734 
735 	m_qNewjobs.back()->ExtractCustomHeaders(reqHead);
736 
737 	wake();
738 	return true;
739 }
740 
SignalStop()741 void dlcon::SignalStop()
742 {
743 	LOGSTART("dlcon::SignalStop");
744 	setLockGuard;
745 
746 	// stop all activity as soon as possible
747 	m_bStopASAP=true;
748 	m_qNewjobs.clear();
749 
750 	wake();
751 }
752 
~dlcon()753 dlcon::~dlcon()
754 {
755 	LOGSTART("dlcon::~dlcon, Destroying dlcon");
756 #ifdef HAVE_LINUX_EVENTFD
757 	checkforceclose(m_wakeventfd);
758 #else
759 	checkforceclose(m_wakepipe[0]);
760 	checkforceclose(m_wakepipe[1]);
761 #endif
762   g_nDlCons--;
763 }
764 
ExchangeData(mstring & sErrorMsg,tDlStreamHandle & con,tDljQueue & inpipe)765 inline unsigned dlcon::ExchangeData(mstring &sErrorMsg, tDlStreamHandle &con, tDljQueue &inpipe)
766 {
767 	LOGSTART2("dlcon::ExchangeData",
768 			"qsize: " << inpipe.size() << ", sendbuf size: "
769 			<< m_sendBuf.size() << ", inbuf size: " << m_inBuf.size());
770 
771 	fd_set rfds, wfds;
772 	struct timeval tv;
773 	int r = 0;
774 	int fd = con ? con->GetFD() : -1;
775 	FD_ZERO(&rfds);
776 	FD_ZERO(&wfds);
777 
778 	if(inpipe.empty())
779 		m_inBuf.clear(); // better be sure about dirty buffer from previous connection
780 
781 	// no socket operation needed in this case but just process old buffer contents
782 	bool bReEntered=!m_inBuf.empty();
783 
784 	loop_again:
785 
786 	for (;;)
787 	{
788 		FD_SET(fdWakeRead, &rfds);
789 		int nMaxFd = fdWakeRead;
790 
791 		if (fd>=0)
792 		{
793 			FD_SET(fd, &rfds);
794 			nMaxFd = std::max(fd, nMaxFd);
795 
796 			if (!m_sendBuf.empty())
797 			{
798 				ldbg("Needs to send " << m_sendBuf.size() << " bytes");
799 				FD_SET(fd, &wfds);
800 			}
801 #ifdef HAVE_SSL
802 			else if(con->GetBIO() && BIO_should_write(con->GetBIO()))
803 			{
804 				ldbg("NOTE: OpenSSL wants to write although send buffer is empty!");
805 				FD_SET(fd, &wfds);
806 			}
807 #endif
808 		}
809 
810 		ldbg("select dlcon");
811 		tv.tv_sec = cfg::nettimeout;
812 		tv.tv_usec = 0;
813 
814 		// jump right into data processing but only once
815 		if(bReEntered)
816 		{
817 			bReEntered=false;
818 			goto proc_data;
819 		}
820 
821 		r=select(nMaxFd + 1, &rfds, &wfds, nullptr, &tv);
822 		ldbg("returned: " << r << ", errno: " << errno);
823 
824 		if (r < 0)
825 		{
826 			if (EINTR == errno)
827 				continue;
828 #ifdef MINIBUILD
829 			string fer("select failed");
830 #else
831 			tErrnoFmter fer("FAILURE: select, ");
832 			LOG(fer);
833 #endif
834 			sErrorMsg = string("500 Internal malfunction, ") + fer;
835 			return HINT_DISCON|EFLAG_JOB_BROKEN|EFLAG_MIRROR_BROKEN;
836 		}
837 		else if (r == 0) // looks like a timeout
838 		{
839 			sErrorMsg = "500 Connection timeout";
840 			// was there anything to do at all?
841 			if(inpipe.empty())
842 				return HINT_SWITCH;
843 
844 			if(inpipe.front()->IsRecoverableState())
845 				return EFLAG_LOST_CON;
846 			else
847 				return (HINT_DISCON|EFLAG_JOB_BROKEN);
848 		}
849 
850 		if (FD_ISSET(fdWakeRead, &rfds))
851 		{
852 			dbgline;
853 #ifdef HAVE_LINUX_EVENTFD
854 			eventfd_t xtmp;
855 			int tmp;
856 			do {
857 				tmp = eventfd_read(fdWakeRead, &xtmp);
858 			} while (tmp < 0 && (errno == EINTR || errno == EAGAIN));
859 
860 #else
861 			for (int tmp; read(m_wakepipe[0], &tmp, 1) > 0;)
862 				;
863 #endif
864 			return HINT_SWITCH;
865 		}
866 
867 		if (fd >= 0)
868 		{
869 			if (FD_ISSET(fd, &wfds))
870 			{
871 				FD_CLR(fd, &wfds);
872 
873 #ifdef HAVE_SSL
874 				if (con->GetBIO())
875 				{
876 					int s = BIO_write(con->GetBIO(), m_sendBuf.rptr(),
877 							m_sendBuf.size());
878 					ldbg(
879 							"tried to write to SSL, " << m_sendBuf.size() << " bytes, result: " << s);
880 					if (s > 0)
881 						m_sendBuf.drop(s);
882 				}
883 				else
884 				{
885 #endif
886 					ldbg("Sending data...\n" << m_sendBuf);
887 					int s = ::send(fd, m_sendBuf.data(), m_sendBuf.length(),
888 							MSG_NOSIGNAL);
889 					ldbg(
890 							"Sent " << s << " bytes from " << m_sendBuf.length() << " to " << con.get());
891 					if (s < 0)
892 					{
893 						// EAGAIN is weird but let's retry later, otherwise reconnect
894 						if (errno != EAGAIN && errno != EINTR)
895 						{
896 							sErrorMsg = "502 Send failed";
897 							return EFLAG_LOST_CON;
898 						}
899 					}
900 					else
901 						m_sendBuf.drop(s);
902 
903 				}
904 #ifdef HAVE_SSL
905 			}
906 #endif
907 		}
908 
909 		if (fd >=0 && (FD_ISSET(fd, &rfds)
910 #ifdef HAVE_SSL
911 				|| (con->GetBIO() && BIO_should_read(con->GetBIO()))
912 #endif
913 			))
914 		{
915        if(cfg::maxdlspeed != cfg::RESERVED_DEFVAL)
916        {
917           auto nCntNew=g_nDlCons.load();
918           if(m_nLastDlCount != nCntNew)
919           {
920              m_nLastDlCount=nCntNew;
921 
922              // well, split the bandwidth
923              auto nSpeedNowKib = uint(cfg::maxdlspeed) / nCntNew;
924              auto nTakesPerSec = nSpeedNowKib / 32;
925              if(!nTakesPerSec)
926                 nTakesPerSec=1;
927              m_nSpeedLimitMaxPerTake = nSpeedNowKib*1024/nTakesPerSec;
928              auto nIntervalUS=1000000 / nTakesPerSec;
929              auto nIntervalUS_copy = nIntervalUS;
930              // creating a bitmask
931              for(m_nSpeedLimiterRoundUp=1,nIntervalUS/=2;nIntervalUS;nIntervalUS>>=1)
932                 m_nSpeedLimiterRoundUp = (m_nSpeedLimiterRoundUp<<1)|1;
933              m_nSpeedLimitMaxPerTake = uint(double(m_nSpeedLimitMaxPerTake) * double(m_nSpeedLimiterRoundUp) / double(nIntervalUS_copy));
934           }
935           // waiting for the next time slice to get data from buffer
936           timeval tv;
937           if(0==gettimeofday(&tv, nullptr))
938           {
939              auto usNext = tv.tv_usec | m_nSpeedLimiterRoundUp;
940              usleep(usNext-tv.tv_usec);
941           }
942        }
943 #ifdef HAVE_SSL
944 			if(con->GetBIO())
945 			{
946 				r=BIO_read(con->GetBIO(), m_inBuf.wptr(),
947 						std::min(m_nSpeedLimitMaxPerTake, m_inBuf.freecapa()));
948 				if(r>0)
949 					m_inBuf.got(r);
950 				else // <=0 doesn't mean an error, only a double check can tell
951 					r=BIO_should_read(con->GetBIO()) ? 1 : -errno;
952 			}
953 			else
954 #endif
955 			{
956 				r = m_inBuf.sysread(fd, m_nSpeedLimitMaxPerTake);
957 			}
958 
959 #ifdef DISCO_FAILURE
960 #warning hej
961 			static int fakeFail=-123;
962 			if(fakeFail == -123)
963 			{
964 				srand(getpid());
965 				fakeFail = rand()%123;
966 			}
967 			if( fakeFail-- < 0)
968 			{
969 //				LOGLVL(log::LOG_DEBUG, "\n#################\nFAKING A FAILURE\n###########\n");
970 				r=0;
971 				fakeFail=rand()%123;
972 				errno = EROFS;
973 				//r = -errno;
974 				shutdown(con.get()->GetFD(), SHUT_RDWR);
975 			}
976 #endif
977 
978 			if(r == -EAGAIN || r == -EWOULDBLOCK)
979 			{
980 				ldbg("why EAGAIN/EINTR after getting it from select?");
981 //				timespec sleeptime = { 0, 432000000 };
982 //				nanosleep(&sleeptime, nullptr);
983 				goto loop_again;
984 			}
985 			else if (r == 0)
986 			{
987 				dbgline;
988 				sErrorMsg = "502 Connection closed";
989 				return EFLAG_LOST_CON;
990 			}
991 			else if (r < 0) // other error, might reconnect
992 			{
993 				dbgline;
994 #ifdef MINIBUILD
995 				sErrorMsg = "502 EPIC FAIL";
996 #else
997 				// pickup the error code for later and kill current connection ASAP
998 				sErrorMsg = tErrnoFmter("502 ");
999 #endif
1000 				return EFLAG_LOST_CON;
1001 			}
1002 
1003 			proc_data:
1004 
1005 			if(inpipe.empty())
1006 			{
1007 				ldbg("FIXME: unexpected data returned?");
1008 				sErrorMsg = "500 Unexpected data";
1009 				return EFLAG_LOST_CON;
1010 			}
1011 
1012 			while(!m_inBuf.empty())
1013 			{
1014 
1015 				ldbg("Processing job for " << inpipe.front()->RemoteUri(false));
1016 				unsigned res = inpipe.front()->ProcessIncomming(m_inBuf, false);
1017 				ldbg(
1018 						"... incoming data processing result: " << res
1019 						<< ", emsg: " << inpipe.front()->sErrorMsg);
1020 
1021 				if(res&EFLAG_MIRROR_BROKEN)
1022 				{
1023 					ldbg("###### BROKEN MIRROR ####### on " << con.get());
1024 				}
1025 
1026 				if (HINT_MORE == res)
1027 					goto loop_again;
1028 
1029 				if (HINT_DONE & res)
1030 				{
1031 					// just in case that server damaged the last response body
1032 					con->KnowLastFile(WEAK_PTR<fileitem>(inpipe.front()->m_pStorage));
1033 
1034 					inpipe.pop_front();
1035 					if (HINT_DISCON & res)
1036 						return HINT_DISCON; // with cleaned flags
1037 
1038 					LOG(
1039 							"job finished. Has more? " << inpipe.size()
1040 							<< ", remaining data? " << m_inBuf.size());
1041 
1042 					if (inpipe.empty())
1043 					{
1044 						LOG("Need more work");
1045 						return HINT_SWITCH;
1046 					}
1047 
1048 					LOG("Extract more responses");
1049 					continue;
1050 				}
1051 
1052 				if (HINT_TGTCHANGE & res)
1053 				{
1054 					/* If the target was modified for internal redirection then there might be
1055 					 * more responses of that kind in the queue. Apply the redirection handling
1056 					 * to the rest as well if possible without having side effects.
1057 					 */
1058 					auto it = inpipe.begin();
1059 					for(++it; it != inpipe.end(); ++it)
1060 					{
1061 						unsigned rr = (**it).ProcessIncomming(m_inBuf, true);
1062 						// just the internal rewriting applied and nothing else?
1063 						if( HINT_TGTCHANGE != rr )
1064 						{
1065 							// not internal redirection or some failure doing it
1066 							m_nTempPipelineDisable=30;
1067 							return (HINT_TGTCHANGE|HINT_DISCON);
1068 						}
1069 					}
1070 					// processed all inpipe stuff but if the buffer is still not empty then better disconnect
1071 					return HINT_TGTCHANGE | (m_inBuf.empty() ? 0 : HINT_DISCON);
1072 				}
1073 
1074 				// else case: error handling, pass to main loop
1075 				if(HINT_KILL_LAST_FILE & res)
1076 					con->KillLastFile();
1077 				setIfNotEmpty(sErrorMsg, inpipe.front()->sErrorMsg);
1078 				return res;
1079 			}
1080 			return HINT_DONE; // input buffer consumed
1081 		}
1082 	}
1083 
1084 	ASSERT(!"Unreachable");
1085 	sErrorMsg = "500 Internal failure";
1086 	return EFLAG_JOB_BROKEN|HINT_DISCON;
1087 }
1088 
WorkLoop()1089 void dlcon::WorkLoop()
1090 {
1091 	LOGSTART("dlcon::WorkLoop");
1092     string sErrorMsg;
1093     m_inBuf.clear();
1094 
1095 	if (!m_inBuf.setsize(cfg::dlbufsize))
1096 	{
1097 		log::err("500 Out of memory");
1098 		return;
1099 	}
1100 
1101 	if(fdWakeRead<0 || fdWakeWrite<0)
1102 	{
1103 		log::err("Error creating pipe file descriptors");
1104 		return;
1105 	}
1106 
1107 	tDljQueue inpipe;
1108 	tDlStreamHandle con;
1109 	unsigned loopRes=0;
1110 
1111 	bool bStopRequesting=false; // hint to stop adding request headers until the connection is restarted
1112 
1113 	int nLostConTolerance=MAX_RETRY;
1114 
1115 	auto BlacklistMirror = [&](tDlJobPtr & job)
1116 	{
1117 		LOGSTART2("BlacklistMirror", "blacklisting " <<
1118 				job->GetPeerHost().ToURI(false));
1119 		m_blacklist[std::make_pair(job->GetPeerHost().sHost,
1120 				job->GetPeerHost().GetPort())] = sErrorMsg;
1121 	};
1122 
1123 	auto prefProxy = [&](tDlJobPtr& cjob) -> const tHttpUrl*
1124 	{
1125 		if(m_bProxyTot)
1126 			return nullptr;
1127 
1128 		if(cjob->m_pRepoDesc && cjob->m_pRepoDesc->m_pProxy
1129 				&& !cjob->m_pRepoDesc->m_pProxy->sHost.empty())
1130 		{
1131 			return cjob->m_pRepoDesc->m_pProxy;
1132 		}
1133 		return cfg::GetProxyInfo();
1134 	};
1135 
1136 	while(true) // outer loop: jobs, connection handling
1137 	{
1138         // init state or transfer loop jumped out, what are the needed actions?
1139         {
1140         	setLockGuard;
1141         	LOG("New jobs: " << m_qNewjobs.size());
1142 
1143         	if(m_bStopASAP)
1144         	{
1145         		/* The no-more-users checking logic will purge orphaned items from the inpipe
1146         		 * queue. When the connection is dirty after that, it will be closed in the
1147         		 * ExchangeData() but if not then it can be assumed to be clean and reusable.
1148         		 */
1149         		if(inpipe.empty())
1150         		{
1151         			if(con)
1152         				m_pConFactory->RecycleIdleConnection(con);
1153         			return;
1154         		}
1155         	}
1156 
1157 
1158         	if(m_qNewjobs.empty())
1159         		goto go_select; // parent will notify RSN
1160 
1161         	if(!con)
1162         	{
1163         		// cleanup after the last connection - send buffer, broken jobs, ...
1164         		m_sendBuf.clear();
1165         		m_inBuf.clear();
1166         		inpipe.clear();
1167 
1168         		bStopRequesting=false;
1169 
1170         		for(tDljQueue::iterator it=m_qNewjobs.begin(); it!=m_qNewjobs.end();)
1171         		{
1172         			if((**it).SetupJobConfig(sErrorMsg, m_blacklist))
1173         				++it;
1174         			else
1175         			{
1176         				setIfNotEmpty2( (**it).sErrorMsg, sErrorMsg,
1177         						"500 Broken mirror or incorrect configuration");
1178         				m_qNewjobs.erase(it++);
1179         			}
1180         		}
1181         		if(m_qNewjobs.empty())
1182         		{
1183         			LOG("no jobs left, start waiting")
1184         			goto go_select; // nothing left, might receive new jobs soon
1185         		}
1186 
1187 				bool bUsed = false;
1188 				ASSERT(!m_qNewjobs.empty());
1189 
1190 				auto doconnect = [&](const tHttpUrl& tgt, int timeout, bool fresh)
1191 				{
1192 					return m_pConFactory->CreateConnected(tgt.sHost,
1193 							tgt.GetPort(),
1194 							sErrorMsg,
1195 							&bUsed,
1196 							m_qNewjobs.front()->GetConnStateTracker(),
1197 							IFSSLORFALSE(tgt.bSSL),
1198 							timeout, fresh);
1199 			}	;
1200 
1201 				auto& cjob = m_qNewjobs.front();
1202 				auto proxy = prefProxy(cjob);
1203 				auto& peerHost = cjob->GetPeerHost();
1204 
1205 #ifdef HAVE_SSL
1206 				if(peerHost.bSSL)
1207 				{
1208 					if(proxy)
1209 					{
1210 						con = doconnect(*proxy, cfg::optproxytimeout > 0 ?
1211 								cfg::optproxytimeout : cfg::nettimeout, false);
1212 						if(con)
1213 						{
1214 							if(!con->StartTunnel(peerHost, sErrorMsg, & proxy->sUserPass, true))
1215 								con.reset();
1216 						}
1217 					}
1218 					else
1219 						con = doconnect(peerHost, cfg::nettimeout, false);
1220 				}
1221 				else
1222 #endif
1223 				{
1224 					if(proxy)
1225 					{
1226 						con = doconnect(*proxy, cfg::optproxytimeout > 0 ?
1227 								cfg::optproxytimeout : cfg::nettimeout, false);
1228 					}
1229 					else
1230 						con = doconnect(peerHost, cfg::nettimeout, false);
1231 				}
1232 
1233         		if(!con && proxy && cfg::optproxytimeout>0)
1234         		{
1235         			ldbg("optional proxy broken, disable");
1236         			m_bProxyTot = true;
1237         			proxy = nullptr;
1238 				cfg::MarkProxyFailure();
1239         			con = doconnect(peerHost, cfg::nettimeout, false);
1240         		}
1241 
1242         		ldbg("connection valid? " << bool(con) << " was fresh? " << !bUsed);
1243 
1244         		if(con)
1245         		{
1246         			ldbg("target? [" << con->GetHostname() << "]:" << con->GetPort());
1247 
1248         			// must test this connection, just be sure no crap is in the pipe
1249         			if (bUsed && check_read_state(con->GetFD()))
1250         			{
1251         				ldbg("code: MoonWalker");
1252         				con.reset();
1253         				continue;
1254         			}
1255         		}
1256         		else
1257         		{
1258         			BlacklistMirror(cjob);
1259         			continue; // try the next backend
1260         		}
1261         	}
1262 
1263         	// connection should be stable now, prepare all jobs and/or move to pipeline
1264         	while(!bStopRequesting
1265         			&& !m_qNewjobs.empty()
1266         			&& int(inpipe.size()) <= cfg::pipelinelen)
1267         	{
1268    				tDlJobPtr &cjob = m_qNewjobs.front();
1269 
1270         		if(!cjob->SetupJobConfig(sErrorMsg, m_blacklist))
1271         		{
1272         			// something weird happened to it, drop it and let the client care
1273         			m_qNewjobs.pop_front();
1274         			continue;
1275         		}
1276 
1277         		auto& tgt=cjob->GetPeerHost();
1278         		// good case, direct or tunneled connection
1279         		bool match=(tgt.sHost == con->GetHostname() && tgt.GetPort() == con->GetPort());
1280         		const tHttpUrl * proxy = nullptr; // to be set ONLY if PROXY mode is used
1281 
1282         		// if not exact and can be proxied, and is this the right proxy?
1283         		if(!match)
1284         		{
1285         			proxy = prefProxy(cjob);
1286         			if(proxy)
1287         			{
1288         				/*
1289         				 * SSL over proxy uses HTTP tunnels (CONNECT scheme) so the check
1290         				 * above should have matched before.
1291         				 */
1292         				if(!tgt.bSSL)
1293         					match=(proxy->sHost == con->GetHostname() && proxy->GetPort() == con->GetPort());
1294         			}
1295         			// else... host changed and not going through the same proxy -> fail
1296         		}
1297 
1298         		if(!match)
1299         		{
1300         			LOG("host mismatch, new target: " << tgt.sHost << ":" << tgt.GetPort());
1301         			bStopRequesting=true;
1302         			break;
1303         		}
1304 
1305 				cjob->AppendRequest(m_sendBuf, m_sXForwardedFor, proxy);
1306 				LOG("request added to buffer");
1307 				inpipe.emplace_back(cjob);
1308 				m_qNewjobs.pop_front();
1309 
1310 				if (m_nTempPipelineDisable > 0)
1311 				{
1312 					bStopRequesting = true;
1313 					--m_nTempPipelineDisable;
1314 					break;
1315 				}
1316         	}
1317         }
1318 
1319 		ldbg("Request(s) cooked, buffer contents: " << m_sendBuf);
1320 
1321         go_select:
1322 
1323         if(inpipe.empty() && m_bManualMode)
1324         {
1325         	return;
1326         }
1327 
1328         // inner loop: plain communication until something happens. Maybe should use epoll here?
1329         loopRes=ExchangeData(sErrorMsg, con, inpipe);
1330         ldbg("loopRes: "<< loopRes);
1331 
1332         /* check whether we have a pipeline stall. This may happen because a) we are done or
1333          * b) because of the remote hostname change or c) the client stopped sending tasks.
1334          * Anyhow, that's a reason to put the connection back into the shared pool so either we
1335          * get it back from the pool in the next workloop cycle or someone else gets it and we
1336          * get a new connection for the new host later.
1337          * */
1338         if (inpipe.empty())
1339 		{
1340         	// all requests have been processed (client done, or pipeline stall, who cares)
1341 			dbgline;
1342 
1343 			// no matter what happened, that stop flag is now irrelevant
1344 			bStopRequesting = false;
1345 
1346 			// no error bits set, not busy -> this connection is still good, recycle properly
1347 			unsigned all_err = HINT_DISCON | EFLAG_JOB_BROKEN | EFLAG_LOST_CON | EFLAG_MIRROR_BROKEN;
1348 			if (con && !(loopRes & all_err))
1349 			{
1350 				dbgline;
1351 				m_pConFactory->RecycleIdleConnection(con);
1352 				continue;
1353 			}
1354 		}
1355 
1356         /*
1357          * Here we go if the inpipe is still not processed or there have been errors
1358          * needing special handling.
1359          */
1360 
1361         if( (HINT_DISCON|EFLAG_LOST_CON) & loopRes)
1362         {
1363         	dbgline;
1364         	con.reset();
1365         	m_inBuf.clear();
1366         	m_sendBuf.clear();
1367         }
1368 
1369         if ( loopRes & HINT_TGTCHANGE )
1370         {
1371         	// short queue continues jobs with rewritten targets, so
1372         	// reinsert them into the new task list and continue
1373 
1374         	// if conn was not reset above then it should be in good shape
1375         	m_pConFactory->RecycleIdleConnection(con);
1376         	goto move_jobs_back_to_q;
1377         }
1378 
1379         if ((EFLAG_LOST_CON & loopRes) && !inpipe.empty())
1380 		{
1381 			// disconnected by OS... give it a chance, or maybe not...
1382 			if (--nLostConTolerance <= 0)
1383 			{
1384 				BlacklistMirror(inpipe.front());
1385 				nLostConTolerance=MAX_RETRY;
1386 			}
1387 
1388 			con.reset();
1389 
1390 			timespec sleeptime = { 0, 325000000 };
1391 			nanosleep(&sleeptime, nullptr);
1392 
1393 			// trying to resume that job secretly, unless user disabled the use of range (we
1394 			// cannot resync the sending position ATM, throwing errors to user for now)
1395 			if (cfg::vrangeops <= 0 && inpipe.front()->m_pStorage->m_bCheckFreshness)
1396 				loopRes |= EFLAG_JOB_BROKEN;
1397 			else
1398 				inpipe.front()->m_DlState = tDlJob::STATE_REGETHEADER;
1399 		}
1400 
1401         if(loopRes & (HINT_DONE|HINT_MORE))
1402         {
1403         	sErrorMsg.clear();
1404         	continue;
1405         }
1406 
1407         //
1408         // regular required post-processing done here, now handle special conditions
1409         //
1410 
1411 
1412         if(HINT_SWITCH == loopRes)
1413         	continue;
1414 
1415         // resolving the "fatal error" situation, push the pipelined job back to new, etc.
1416 
1417         if( (EFLAG_MIRROR_BROKEN & loopRes) && !inpipe.empty())
1418         	BlacklistMirror(inpipe.front());
1419 
1420         if( (EFLAG_JOB_BROKEN & loopRes) && !inpipe.empty())
1421         {
1422         	setIfNotEmpty(inpipe.front()->sErrorMsg, sErrorMsg);
1423 
1424         	inpipe.pop_front();
1425 
1426         	if(EFLAG_STORE_COLLISION & loopRes)
1427         	{
1428 				// stupid situation, both users downloading the same stuff - and most likely in the same order
1429 				// if one downloader runs a step ahead (or usually many steps), drop all items
1430 				// already processed by it and try to continue somewhere else.
1431 				// This way, the overall number of collisions and reconnects is minimized
1432 
1433         		auto cleaner = [](tDljQueue &joblist)
1434         		{
1435         			for(auto it = joblist.begin(); it!= joblist.end();)
1436         			{
1437         				if(*it && (**it).m_pStorage
1438         						&& (**it).m_pStorage->GetStatus() >= fileitem::FIST_DLRECEIVING)
1439         				{
1440         					// someone else is doing it -> drop
1441         					joblist.erase(it++);
1442         					continue;
1443         				}
1444         				else
1445         					++it;
1446         			}
1447         		};
1448         		cleaner(inpipe);
1449         		setLockGuard;
1450         		cleaner(m_qNewjobs);
1451         	}
1452         }
1453 
1454         move_jobs_back_to_q:
1455         // for the jobs that were not finished and/or dropped, move them back into the task queue
1456         {
1457         	setLockGuard;
1458         	m_qNewjobs.insert(m_qNewjobs.begin(), inpipe.begin(), inpipe.end());
1459         	inpipe.clear();
1460         }
1461 
1462 	}
1463 }
1464 
1465 }
1466