1
2 #include <unistd.h>
3 #include <sys/time.h>
4 #include <atomic>
5 #include <algorithm>
6
7 #define LOCAL_DEBUG
8 #include "debug.h"
9
10 #include "config.h"
11 #include "acfg.h"
12 #include "dlcon.h"
13
14 #include "fileitem.h"
15 #include "fileio.h"
16 #include "sockio.h"
17
18 #ifdef HAVE_LINUX_EVENTFD
19 #include <sys/eventfd.h>
20 #endif
21
22 using namespace std;
23
24 // evil hack to simulate random disconnects
25 //#define DISCO_FAILURE
26
27 #define MAX_RETRY 11
28
29 namespace acng
30 {
31
32 static cmstring sGenericError("567 Unknown download error occured");
33
34 // those are not allowed to be forwarded
35 static const auto taboo =
36 {
37 string("Host"), string("Cache-Control"),
38 string("Proxy-Authorization"), string("Accept"),
39 string("User-Agent")
40 };
41
42 std::atomic_uint g_nDlCons(0);
43
dlcon(bool bManualExecution,string * xff,IDlConFactory * pConFactory)44 dlcon::dlcon(bool bManualExecution, string *xff, IDlConFactory *pConFactory) :
45 m_pConFactory(pConFactory), m_bStopASAP(false), m_bManualMode(bManualExecution),
46 m_nTempPipelineDisable(0),
47 m_bProxyTot(false)
48 {
49 LOGSTART("dlcon::dlcon");
50 #ifdef HAVE_LINUX_EVENTFD
51 m_wakeventfd=eventfd(0, 0);
52 if(m_wakeventfd>=0)
53 set_nb(m_wakeventfd);
54 #else
55 if (0 == pipe(m_wakepipe))
56 {
57 set_nb(m_wakepipe[0]);
58 set_nb(m_wakepipe[1]);
59 }
60 #endif
61 if (xff)
62 m_sXForwardedFor = *xff;
63 g_nDlCons++;
64 }
65
66 struct tDlJob
67 {
68 tFileItemPtr m_pStorage;
69 mstring sErrorMsg;
70 dlcon &m_parent;
71
HasBrokenStorageacng::tDlJob72 inline bool HasBrokenStorage()
73 {
74 return (!m_pStorage || m_pStorage->GetStatus() > fileitem::FIST_COMPLETE);
75 }
76
77 #define HINT_MORE 0
78 #define HINT_DONE 1
79 #define HINT_DISCON 2
80 #define EFLAG_JOB_BROKEN 4
81 #define EFLAG_MIRROR_BROKEN 8
82 #define EFLAG_STORE_COLLISION 16
83 #define HINT_SWITCH 32
84 #define EFLAG_LOST_CON 64
85 #define HINT_KILL_LAST_FILE 128
86 #define HINT_TGTCHANGE 256
87
88 const cfg::tRepoData * m_pRepoDesc=nullptr;
89
90 /*!
91 * Returns a reference to http url where host and port and protocol match the current host
92 * Other fields in that member have undefined contents. ;-)
93 */
GetPeerHostacng::tDlJob94 inline const tHttpUrl& GetPeerHost()
95 {
96 return m_pCurBackend ? *m_pCurBackend : m_remoteUri;
97 }
98
GetConnStateTrackeracng::tDlJob99 inline cfg::tRepoData::IHookHandler * GetConnStateTracker()
100 {
101 return m_pRepoDesc ? m_pRepoDesc->m_pHooks : nullptr;
102 }
103
104 typedef enum : char
105 {
106 STATE_GETHEADER, STATE_REGETHEADER, STATE_PROCESS_DATA,
107 STATE_GETCHUNKHEAD, STATE_PROCESS_CHUNKDATA, STATE_GET_CHUNKTRAILER,
108 STATE_FINISHJOB
109 } tDlState;
110
111 string m_extraHeaders;
112
113 tHttpUrl m_remoteUri;
114 const tHttpUrl *m_pCurBackend=nullptr;
115
116 uint_fast8_t m_eReconnectASAP =0;
117 bool m_bBackendMode=false;
118
119 off_t m_nRest =0;
120
121 tDlState m_DlState = STATE_GETHEADER;
122
123 int m_nRedirRemaining;
124
tDlJobacng::tDlJob125 inline tDlJob(dlcon *p, tFileItemPtr pFi,
126 const tHttpUrl *pUri,
127 const cfg::tRepoData * pRepoData,
128 const std::string *psPath,
129 int redirmax) :
130 m_pStorage(pFi),
131 m_parent(*p),
132 m_pRepoDesc(pRepoData),
133 m_nRedirRemaining(redirmax)
134 {
135 LOGSTART("tDlJob::tDlJob");
136 ldbg("uri: " << (pUri ? pUri->ToURI(false) : sEmptyString )
137 << ", " << "restpath: " << (psPath?*psPath:sEmptyString)
138 << "repo: " << uintptr_t(pRepoData)
139 );
140 if (m_pStorage)
141 m_pStorage->IncDlRefCount();
142 if(pUri)
143 m_remoteUri=*pUri;
144 else
145 {
146 m_remoteUri.sPath=*psPath;
147 m_bBackendMode=true;
148 }
149 }
150
~tDlJobacng::tDlJob151 ~tDlJob()
152 {
153 if (m_pStorage)
154 m_pStorage->DecDlRefCount(sErrorMsg.empty() ? sGenericError : sErrorMsg);
155 }
156
ExtractCustomHeadersacng::tDlJob157 inline void ExtractCustomHeaders(LPCSTR reqHead)
158 {
159 if(!reqHead)
160 return;
161 header h;
162 bool forbidden=false;
163 h.Load(reqHead, (unsigned) std::numeric_limits<int>::max(),
164 [this, &forbidden](cmstring& key, cmstring& rest)
165 {
166 // heh, continuation of ignored stuff or without start?
167 if(key.empty() && (m_extraHeaders.empty() || forbidden))
168 return;
169 forbidden = taboo.end() != std::find_if(taboo.begin(), taboo.end(),
170 [&key](cmstring &x){return scaseequals(x,key);});
171 if(!forbidden)
172 m_extraHeaders += key + rest;
173 }
174 );
175 }
176
RemoteUriacng::tDlJob177 inline string RemoteUri(bool bUrlEncoded)
178 {
179 if(m_pCurBackend)
180 return m_pCurBackend->ToURI(bUrlEncoded) +
181 ( bUrlEncoded ? UrlEscape(m_remoteUri.sPath)
182 : m_remoteUri.sPath);
183
184 return m_remoteUri.ToURI(bUrlEncoded);
185 }
186
RewriteSourceacng::tDlJob187 inline bool RewriteSource(const char *pNewUrl)
188 {
189 LOGSTART("tDlJob::RewriteSource");
190 if (--m_nRedirRemaining <= 0)
191 {
192 sErrorMsg = "500 Bad redirection (loop)";
193 return false;
194 }
195
196 if (!pNewUrl || !*pNewUrl)
197 {
198 sErrorMsg = "500 Bad redirection (empty)";
199 return false;
200 }
201
202 // start modifying the target URL, point of no return
203 m_pCurBackend = nullptr;
204 bool bWasBeMode = m_bBackendMode;
205 m_bBackendMode = false;
206 sErrorMsg = "500 Bad redirection (path)";
207
208 auto sLocationDecoded = UrlUnescape(pNewUrl);
209
210 tHttpUrl newUri;
211 if (newUri.SetHttpUrl(sLocationDecoded, false))
212 {
213 dbgline;
214 m_remoteUri = newUri;
215 return true;
216 }
217 // ok, some protocol-relative crap? let it parse the hostname but keep the protocol
218 if (startsWithSz(sLocationDecoded, "//"))
219 {
220 stripPrefixChars(sLocationDecoded, "/");
221 return m_remoteUri.SetHttpUrl(
222 m_remoteUri.GetProtoPrefix() + sLocationDecoded);
223 }
224
225 // recreate the full URI descriptor matching the last download
226 if(bWasBeMode)
227 {
228 if(!m_pCurBackend)
229 return false;
230 auto sPathBackup=m_remoteUri.sPath;
231 m_remoteUri=*m_pCurBackend;
232 m_remoteUri.sPath+=sPathBackup;
233 }
234
235 if (startsWithSz(sLocationDecoded, "/"))
236 {
237 m_remoteUri.sPath = sLocationDecoded;
238 return true;
239 }
240 // ok, must be relative
241 m_remoteUri.sPath+=(sPathSepUnix+sLocationDecoded);
242 return true;
243 }
244
SetupJobConfigacng::tDlJob245 bool SetupJobConfig(mstring& sReasonMsg, decltype(dlcon::m_blacklist) &blacklist)
246 {
247 LOGSTART("dlcon::SetupJobConfig");
248
249 // using backends? Find one which is not blacklisted
250 if (m_bBackendMode)
251 {
252 // keep the existing one if possible
253 if (m_pCurBackend)
254 {
255 LOG(
256 "Checking [" << m_pCurBackend->sHost << "]:" << m_pCurBackend->GetPort());
257 const auto bliter = blacklist.find(
258 make_pair(m_pCurBackend->sHost, m_pCurBackend->GetPort()));
259 if (bliter == blacklist.end())
260 return true;
261 }
262
263 // look in the constant list, either it's usable or it was blacklisted before
264 for (const auto& bend : m_pRepoDesc->m_backends)
265 {
266 const auto bliter = blacklist.find(make_pair(bend.sHost, bend.GetPort()));
267 if (bliter == blacklist.end())
268 {
269 m_pCurBackend = &bend;
270 return true;
271 }
272
273 // uh, blacklisted, remember the last reason
274 sReasonMsg = bliter->second;
275 }
276 return false;
277 }
278
279 // ok, not backend mode. Check the mirror data (vs. blacklist)
280 auto bliter = blacklist.find(
281 make_pair(GetPeerHost().sHost, GetPeerHost().GetPort()));
282 if (bliter == blacklist.end())
283 return true;
284
285 sReasonMsg = bliter->second;
286 return false;
287 }
288
289 // needs connectedHost, blacklist, output buffer from the parent, proxy mode?
AppendRequestacng::tDlJob290 inline void AppendRequest(tSS &head, cmstring &xff, const tHttpUrl *proxy)
291 {
292 LOGSTART("tDlJob::AppendRequest");
293
294 head << (m_pStorage->m_bHeadOnly ? "HEAD " : "GET ");
295
296 if (proxy)
297 head << RemoteUri(true);
298 else // only absolute path without scheme
299 {
300 if (m_pCurBackend) // base dir from backend definition
301 head << UrlEscape(m_pCurBackend->sPath);
302
303 head << UrlEscape(m_remoteUri.sPath);
304 }
305
306 ldbg(RemoteUri(true));
307
308 head << " HTTP/1.1\r\n" << cfg::agentheader << "Host: " << GetPeerHost().sHost << "\r\n";
309
310 if (proxy) // proxy stuff, and add authorization if there is any
311 {
312 ldbg("using proxy");
313 if(!proxy->sUserPass.empty())
314 {
315 head << "Proxy-Authorization: Basic "
316 << EncodeBase64Auth(proxy->sUserPass) << "\r\n";
317 }
318 // Proxy-Connection is a non-sensical copy of Connection but some proxy
319 // might listen only to this one so better add it
320 head << (cfg::persistoutgoing ? "Proxy-Connection: keep-alive\r\n"
321 : "Proxy-Connection: close\r\n");
322 }
323
324 const auto& pSourceHost = GetPeerHost();
325 if(!pSourceHost.sUserPass.empty())
326 {
327 head << "Authorization: Basic "
328 << EncodeBase64Auth(pSourceHost.sUserPass) << "\r\n";
329 }
330
331 // either by backend or by host in file uri, never both
332 //XXX: still needed? Checked while inserting already.
333 // ASSERT( (m_pCurBackend && m_fileUri.sHost.empty()) || (!m_pCurBackend && !m_fileUri.sHost.empty()));
334
335 if (m_pStorage->m_nSizeSeen > 0 || m_pStorage->m_nRangeLimit >=0)
336 {
337 bool bSetRange(false), bSetIfRange(false);
338
339 lockguard g(m_pStorage.get());
340 const header &pHead = m_pStorage->GetHeaderUnlocked();
341
342 if (m_pStorage->m_bCheckFreshness)
343 {
344 if (pHead.h[header::LAST_MODIFIED])
345 {
346 if (cfg::vrangeops > 0)
347 {
348 bSetIfRange = true;
349 bSetRange = true;
350 }
351 else if(cfg::vrangeops == 0)
352 {
353 head << "If-Modified-Since: " << pHead.h[header::LAST_MODIFIED] << "\r\n";
354 }
355 }
356 }
357 else
358 {
359 /////////////// this was protection against broken stuff in the pool ////
360 // static file type, date does not matter. check known content length, not risking "range not satisfiable" result
361 //
362 //off_t nContLen=atol(h.get("Content-Length"));
363 //if (nContLen>0 && j->m_pStorage->m_nFileSize < nContLen)
364 bSetRange = true;
365 }
366
367 /*
368 if(m_pStorage->m_nSizeSeen >0 && m_pStorage->m_nRangeLimit>=0)
369 {
370 bool bSaneRange=m_pStorage->m_nRangeLimit >=m_pStorage->m_nSizeSeen;
371 // just to be sure
372 ASSERT(bSaneRange);
373 }
374 if(m_pStorage->m_nRangeLimit < m_pStorage->m_nSizeSeen)
375 bSetRange = bSetIfRange = false;
376 */
377
378 /* use APT's old trick - set the starting position one byte lower -
379 * this way the server has to send at least one byte if the assumed
380 * position is correct, and we never get a 416 error (one byte
381 * waste is acceptable).
382 * */
383 if (bSetRange)
384 {
385 head << "Range: bytes=" << std::max(off_t(0), m_pStorage->m_nSizeSeen - 1) << "-";
386 if(m_pStorage->m_nRangeLimit>=0)
387 head << m_pStorage->m_nRangeLimit;
388 head << "\r\n";
389 }
390
391 if (bSetIfRange)
392 head << "If-Range: " << pHead.h[header::LAST_MODIFIED] << "\r\n";
393 }
394
395 if (m_pStorage->m_bCheckFreshness)
396 head << "Cache-Control: no-store,no-cache,max-age=0\r\n";
397
398 if (cfg::exporigin && !xff.empty())
399 head << "X-Forwarded-For: " << xff << "\r\n";
400
401 head << cfg::requestapx
402 << m_extraHeaders
403 << "Accept: application/octet-stream\r\n"
404 "Accept-Encoding: identity\r\n"
405 "Connection: "
406 << (cfg::persistoutgoing ? "keep-alive\r\n\r\n" : "close\r\n\r\n");
407
408 #ifdef SPAM
409 //head.syswrite(2);
410 #endif
411
412 }
413
NewDataHandleracng::tDlJob414 inline uint_fast8_t NewDataHandler(acbuf & inBuf)
415 {
416 LOGSTART("tDlJob::NewDataHandler");
417 while (true)
418 {
419 off_t nToStore = min((off_t) inBuf.size(), m_nRest);
420 ldbg("To store: " <<nToStore);
421 if (0 == nToStore)
422 break;
423
424 if (!m_pStorage->StoreFileData(inBuf.rptr(), nToStore))
425 {
426 dbgline;
427 sErrorMsg = "502 Could not store data";
428 return HINT_DISCON | EFLAG_JOB_BROKEN;
429 }
430
431 m_nRest -= nToStore;
432 inBuf.drop(nToStore);
433 }
434
435 ldbg("Rest: " << m_nRest);
436
437 if (m_nRest != 0)
438 return HINT_MORE; // will come back
439
440 m_DlState = (STATE_PROCESS_DATA == m_DlState) ? STATE_FINISHJOB : STATE_GETCHUNKHEAD;
441 return HINT_SWITCH;
442 }
443
444 /*!
445 *
446 * Process new incoming data and write it down to disk or other receivers.
447 */
ProcessIncommingacng::tDlJob448 unsigned ProcessIncomming(acbuf & inBuf, bool bOnlyRedirectionActivity)
449 {
450 LOGSTART("tDlJob::ProcessIncomming");
451 if (!m_pStorage)
452 {
453 sErrorMsg = "502 Bad cache descriptor";
454 return HINT_DISCON | EFLAG_JOB_BROKEN;
455 }
456
457 for (;;) // returned by explicit error (or get-more) return
458 {
459 ldbg("switch: " << (int)m_DlState);
460
461 if (STATE_GETHEADER == m_DlState || STATE_REGETHEADER == m_DlState)
462 {
463 ldbg("STATE_GETHEADER");
464 header h;
465 if (inBuf.size() == 0)
466 return HINT_MORE;
467
468 bool bHotItem = (m_DlState == STATE_REGETHEADER);
469 dbgline;
470
471 auto hDataLen = h.Load(inBuf.rptr(), inBuf.size(),
472 [&h](cmstring& key, cmstring& rest)
473 { if(scaseequals(key, "Content-Location"))
474 h.frontLine = "HTTP/1.1 500 Apt-Cacher NG does not like that data";
475 });
476
477 if (0 == hDataLen)
478 return HINT_MORE;
479 if (hDataLen<0)
480 {
481 dbgline;
482 sErrorMsg = "500 Invalid header";
483 // can be followed by any junk... drop that mirror, previous file could also contain bad data
484 return EFLAG_MIRROR_BROKEN | HINT_DISCON | HINT_KILL_LAST_FILE;
485 }
486
487 ldbg("contents: " << std::string(inBuf.rptr(), hDataLen));
488 inBuf.drop(hDataLen);
489 if (h.type != header::ANSWER)
490 {
491 dbgline;
492 sErrorMsg = "500 Unexpected response type";
493 // smells fatal...
494 return EFLAG_MIRROR_BROKEN | HINT_DISCON;
495 }
496 ldbg("GOT, parsed: " << h.frontLine);
497
498 int st = h.getStatus();
499
500 if (cfg::redirmax) // internal redirection might be disabled
501 {
502 if (IS_REDIRECT(st))
503 {
504 if (!RewriteSource(h.h[header::LOCATION]))
505 return EFLAG_JOB_BROKEN;
506
507 // drop the redirect page contents if possible so the outer loop
508 // can scan other headers
509 off_t contLen = atoofft(h.h[header::CONTENT_LENGTH], 0);
510 if (contLen <= inBuf.size())
511 inBuf.drop(contLen);
512 return HINT_TGTCHANGE; // no other flags, caller will evaluate the state
513 }
514
515 // for non-redirection responses process as usual
516
517 // unless it's a probe run from the outer loop, in this case we
518 // should go no further
519 if (bOnlyRedirectionActivity)
520 return EFLAG_LOST_CON | HINT_DISCON;
521 }
522
523 // explicitly blacklist mirror if key file is missing
524 if (st >= 400 && m_pRepoDesc && m_remoteUri.sHost.empty())
525 {
526 for (const auto& kfile : m_pRepoDesc->m_keyfiles)
527 {
528 if (endsWith(m_remoteUri.sPath, kfile))
529 {
530 sErrorMsg = "500 Keyfile missing, mirror blacklisted";
531 return HINT_DISCON | EFLAG_MIRROR_BROKEN;
532 }
533 }
534 }
535
536 auto pCon = h.h[header::CONNECTION];
537 if(!pCon)
538 pCon = h.h[header::PROXY_CONNECTION];
539
540 if (pCon && 0 == strcasecmp(pCon, "close"))
541 {
542 ldbg("Peer wants to close connection after request");
543 m_eReconnectASAP = HINT_DISCON;
544 }
545
546 if (m_pStorage->m_bHeadOnly)
547 {
548 m_DlState = STATE_FINISHJOB;
549 }
550 // the only case where we expect a 304
551 else if(st == 304 && cfg::vrangeops == 0)
552 {
553 m_pStorage->SetupComplete();
554 m_DlState = STATE_FINISHJOB;
555 }
556 else if (h.h[header::TRANSFER_ENCODING]
557 && 0 == strcasecmp(h.h[header::TRANSFER_ENCODING], "chunked"))
558 {
559 m_DlState = STATE_GETCHUNKHEAD;
560 h.del(header::TRANSFER_ENCODING); // don't care anymore
561 }
562 else
563 {
564 dbgline;
565 if (!h.h[header::CONTENT_LENGTH])
566 {
567 sErrorMsg = "500 Missing Content-Length";
568 return HINT_DISCON | EFLAG_JOB_BROKEN;
569 }
570 // may support such endless stuff in the future but that's too unreliable for now
571 m_nRest = atoofft(h.h[header::CONTENT_LENGTH]);
572 m_DlState = STATE_PROCESS_DATA;
573 }
574
575 // ok, can pass the data to the file handler
576 auto sremote = RemoteUri(false);
577 h.set(header::XORIG, sremote);
578 bool bDoRetry(false);
579
580 // detect bad auto-redirectors (auth-pages, etc.) by the mime-type of their target
581 if(cfg::redirmax
582 && !cfg::badredmime.empty()
583 && cfg::redirmax != m_nRedirRemaining
584 && h.h[header::CONTENT_TYPE]
585 && strstr(h.h[header::CONTENT_TYPE], cfg::badredmime.c_str())
586 && h.getStatus() < 300) // contains the final data/response
587 {
588 if(m_pStorage->m_bCheckFreshness)
589 {
590 // volatile... this is still ok, just make sure time check works next time
591 h.set(header::LAST_MODIFIED, FAKEDATEMARK);
592 }
593 else
594 {
595 // this was redirected and the destination is BAD!
596 h.frontLine="HTTP/1.1 501 Redirected to invalid target";
597 void DropDnsCache();
598 DropDnsCache();
599 }
600 }
601
602 if(!m_pStorage->DownloadStartedStoreHeader(h, hDataLen,
603 inBuf.rptr(), bHotItem, bDoRetry))
604 {
605 if(bDoRetry)
606 return EFLAG_LOST_CON | HINT_DISCON; // recoverable
607
608 ldbg("Item dl'ed by others or in error state --> drop it, reconnect");
609 m_DlState = STATE_PROCESS_DATA;
610 sErrorMsg = "502 Cache descriptor busy";
611 /* header xh = m_pStorage->GetHeader();
612 if(xh.frontLine.length() > 12)
613 sErrorMsg = sErrorMsg + " (" + xh.frontLine.substr(12) + ")";
614 */
615 return HINT_DISCON | EFLAG_JOB_BROKEN | EFLAG_STORE_COLLISION;
616 }
617 }
618 else if (m_DlState == STATE_PROCESS_CHUNKDATA || m_DlState == STATE_PROCESS_DATA)
619 {
620 // similar states, just handled differently afterwards
621 ldbg("STATE_GETDATA");
622 auto res = NewDataHandler(inBuf);
623 if (HINT_SWITCH != res)
624 return res;
625 }
626 else if (m_DlState == STATE_FINISHJOB)
627 {
628 ldbg("STATE_FINISHJOB");
629 m_DlState = STATE_GETHEADER;
630 m_pStorage->StoreFileData(nullptr, 0);
631 return HINT_DONE | m_eReconnectASAP;
632 }
633 else if (m_DlState == STATE_GETCHUNKHEAD)
634 {
635 ldbg("STATE_GETCHUNKHEAD");
636 // came back from reading, drop remaining newlines?
637 while (inBuf.size() > 0)
638 {
639 char c = *(inBuf.rptr());
640 if (c != '\r' && c != '\n')
641 break;
642 inBuf.drop(1);
643 }
644 const char *crlf(0), *pStart(inBuf.c_str());
645 if (!inBuf.size() || nullptr == (crlf = strstr(pStart, "\r\n")))
646 {
647 inBuf.move();
648 return HINT_MORE;
649 }
650 unsigned len(0);
651 if (1 != sscanf(pStart, "%x", &len))
652 {
653 sErrorMsg = "500 Invalid data stream";
654 return EFLAG_JOB_BROKEN; // hm...?
655 }
656 inBuf.drop(crlf + 2 - pStart);
657 if (len > 0)
658 {
659 m_nRest = len;
660 m_DlState = STATE_PROCESS_CHUNKDATA;
661 }
662 else
663 m_DlState = STATE_GET_CHUNKTRAILER;
664 }
665 else if (m_DlState == STATE_GET_CHUNKTRAILER)
666 {
667 if (inBuf.size() < 2)
668 return HINT_MORE;
669 const char *pStart(inBuf.c_str());
670 const char *crlf(strstr(pStart, "\r\n"));
671 if (!crlf)
672 return HINT_MORE;
673
674 if (crlf == pStart) // valid empty line -> done here
675 {
676 inBuf.drop(2);
677 m_DlState = STATE_FINISHJOB;
678 }
679 else
680 inBuf.drop(crlf + 2 - pStart); // drop line and watch for others
681 }
682 }
683 ASSERT(!"unreachable");
684 sErrorMsg = "502 Invalid state";
685 return EFLAG_JOB_BROKEN;
686 }
687
IsRecoverableStateacng::tDlJob688 inline bool IsRecoverableState()
689 {
690 return (m_DlState == STATE_GETHEADER || m_DlState == STATE_REGETHEADER);
691 // XXX: In theory, could also easily recover from STATE_FINISH but that's
692 // unlikely to happen
693 }
694
695 private:
696 // not to be copied ever
697 tDlJob(const tDlJob&);
698 tDlJob & operator=(const tDlJob&);
699 };
700
wake()701 void dlcon::wake()
702 {
703 if (fdWakeWrite<0)
704 return;
705 #ifdef HAVE_LINUX_EVENTFD
706 while(eventfd_write(fdWakeWrite, 1)<0) ;
707 #else
708 POKE(fdWakeWrite);
709 #endif
710 }
711
AddJob(tFileItemPtr m_pItem,const tHttpUrl * pForcedUrl,const cfg::tRepoData * pBackends,cmstring * sPatSuffix,LPCSTR reqHead,int nMaxRedirection)712 bool dlcon::AddJob(tFileItemPtr m_pItem, const tHttpUrl *pForcedUrl,
713 const cfg::tRepoData *pBackends,
714 cmstring *sPatSuffix, LPCSTR reqHead,
715 int nMaxRedirection)
716 {
717 if(!pForcedUrl)
718 {
719 if(!pBackends || pBackends->m_backends.empty())
720 return false;
721 if(!sPatSuffix || sPatSuffix->empty())
722 return false;
723 }
724 setLockGuard;
725 /*
726 ASSERT(
727 todo->m_pStorage->m_nRangeLimit < 0
728 || todo->m_pStorage->m_nRangeLimit >= todo->m_pStorage->m_nSizeSeen);
729
730 LOGSTART2("dlcon::EnqJob", todo->m_remoteUri.ToURI(false));
731 */
732 m_qNewjobs.emplace_back(
733 make_shared<tDlJob>(this, m_pItem, pForcedUrl, pBackends, sPatSuffix,nMaxRedirection));
734
735 m_qNewjobs.back()->ExtractCustomHeaders(reqHead);
736
737 wake();
738 return true;
739 }
740
SignalStop()741 void dlcon::SignalStop()
742 {
743 LOGSTART("dlcon::SignalStop");
744 setLockGuard;
745
746 // stop all activity as soon as possible
747 m_bStopASAP=true;
748 m_qNewjobs.clear();
749
750 wake();
751 }
752
~dlcon()753 dlcon::~dlcon()
754 {
755 LOGSTART("dlcon::~dlcon, Destroying dlcon");
756 #ifdef HAVE_LINUX_EVENTFD
757 checkforceclose(m_wakeventfd);
758 #else
759 checkforceclose(m_wakepipe[0]);
760 checkforceclose(m_wakepipe[1]);
761 #endif
762 g_nDlCons--;
763 }
764
ExchangeData(mstring & sErrorMsg,tDlStreamHandle & con,tDljQueue & inpipe)765 inline unsigned dlcon::ExchangeData(mstring &sErrorMsg, tDlStreamHandle &con, tDljQueue &inpipe)
766 {
767 LOGSTART2("dlcon::ExchangeData",
768 "qsize: " << inpipe.size() << ", sendbuf size: "
769 << m_sendBuf.size() << ", inbuf size: " << m_inBuf.size());
770
771 fd_set rfds, wfds;
772 struct timeval tv;
773 int r = 0;
774 int fd = con ? con->GetFD() : -1;
775 FD_ZERO(&rfds);
776 FD_ZERO(&wfds);
777
778 if(inpipe.empty())
779 m_inBuf.clear(); // better be sure about dirty buffer from previous connection
780
781 // no socket operation needed in this case but just process old buffer contents
782 bool bReEntered=!m_inBuf.empty();
783
784 loop_again:
785
786 for (;;)
787 {
788 FD_SET(fdWakeRead, &rfds);
789 int nMaxFd = fdWakeRead;
790
791 if (fd>=0)
792 {
793 FD_SET(fd, &rfds);
794 nMaxFd = std::max(fd, nMaxFd);
795
796 if (!m_sendBuf.empty())
797 {
798 ldbg("Needs to send " << m_sendBuf.size() << " bytes");
799 FD_SET(fd, &wfds);
800 }
801 #ifdef HAVE_SSL
802 else if(con->GetBIO() && BIO_should_write(con->GetBIO()))
803 {
804 ldbg("NOTE: OpenSSL wants to write although send buffer is empty!");
805 FD_SET(fd, &wfds);
806 }
807 #endif
808 }
809
810 ldbg("select dlcon");
811 tv.tv_sec = cfg::nettimeout;
812 tv.tv_usec = 0;
813
814 // jump right into data processing but only once
815 if(bReEntered)
816 {
817 bReEntered=false;
818 goto proc_data;
819 }
820
821 r=select(nMaxFd + 1, &rfds, &wfds, nullptr, &tv);
822 ldbg("returned: " << r << ", errno: " << errno);
823
824 if (r < 0)
825 {
826 if (EINTR == errno)
827 continue;
828 #ifdef MINIBUILD
829 string fer("select failed");
830 #else
831 tErrnoFmter fer("FAILURE: select, ");
832 LOG(fer);
833 #endif
834 sErrorMsg = string("500 Internal malfunction, ") + fer;
835 return HINT_DISCON|EFLAG_JOB_BROKEN|EFLAG_MIRROR_BROKEN;
836 }
837 else if (r == 0) // looks like a timeout
838 {
839 sErrorMsg = "500 Connection timeout";
840 // was there anything to do at all?
841 if(inpipe.empty())
842 return HINT_SWITCH;
843
844 if(inpipe.front()->IsRecoverableState())
845 return EFLAG_LOST_CON;
846 else
847 return (HINT_DISCON|EFLAG_JOB_BROKEN);
848 }
849
850 if (FD_ISSET(fdWakeRead, &rfds))
851 {
852 dbgline;
853 #ifdef HAVE_LINUX_EVENTFD
854 eventfd_t xtmp;
855 int tmp;
856 do {
857 tmp = eventfd_read(fdWakeRead, &xtmp);
858 } while (tmp < 0 && (errno == EINTR || errno == EAGAIN));
859
860 #else
861 for (int tmp; read(m_wakepipe[0], &tmp, 1) > 0;)
862 ;
863 #endif
864 return HINT_SWITCH;
865 }
866
867 if (fd >= 0)
868 {
869 if (FD_ISSET(fd, &wfds))
870 {
871 FD_CLR(fd, &wfds);
872
873 #ifdef HAVE_SSL
874 if (con->GetBIO())
875 {
876 int s = BIO_write(con->GetBIO(), m_sendBuf.rptr(),
877 m_sendBuf.size());
878 ldbg(
879 "tried to write to SSL, " << m_sendBuf.size() << " bytes, result: " << s);
880 if (s > 0)
881 m_sendBuf.drop(s);
882 }
883 else
884 {
885 #endif
886 ldbg("Sending data...\n" << m_sendBuf);
887 int s = ::send(fd, m_sendBuf.data(), m_sendBuf.length(),
888 MSG_NOSIGNAL);
889 ldbg(
890 "Sent " << s << " bytes from " << m_sendBuf.length() << " to " << con.get());
891 if (s < 0)
892 {
893 // EAGAIN is weird but let's retry later, otherwise reconnect
894 if (errno != EAGAIN && errno != EINTR)
895 {
896 sErrorMsg = "502 Send failed";
897 return EFLAG_LOST_CON;
898 }
899 }
900 else
901 m_sendBuf.drop(s);
902
903 }
904 #ifdef HAVE_SSL
905 }
906 #endif
907 }
908
909 if (fd >=0 && (FD_ISSET(fd, &rfds)
910 #ifdef HAVE_SSL
911 || (con->GetBIO() && BIO_should_read(con->GetBIO()))
912 #endif
913 ))
914 {
915 if(cfg::maxdlspeed != cfg::RESERVED_DEFVAL)
916 {
917 auto nCntNew=g_nDlCons.load();
918 if(m_nLastDlCount != nCntNew)
919 {
920 m_nLastDlCount=nCntNew;
921
922 // well, split the bandwidth
923 auto nSpeedNowKib = uint(cfg::maxdlspeed) / nCntNew;
924 auto nTakesPerSec = nSpeedNowKib / 32;
925 if(!nTakesPerSec)
926 nTakesPerSec=1;
927 m_nSpeedLimitMaxPerTake = nSpeedNowKib*1024/nTakesPerSec;
928 auto nIntervalUS=1000000 / nTakesPerSec;
929 auto nIntervalUS_copy = nIntervalUS;
930 // creating a bitmask
931 for(m_nSpeedLimiterRoundUp=1,nIntervalUS/=2;nIntervalUS;nIntervalUS>>=1)
932 m_nSpeedLimiterRoundUp = (m_nSpeedLimiterRoundUp<<1)|1;
933 m_nSpeedLimitMaxPerTake = uint(double(m_nSpeedLimitMaxPerTake) * double(m_nSpeedLimiterRoundUp) / double(nIntervalUS_copy));
934 }
935 // waiting for the next time slice to get data from buffer
936 timeval tv;
937 if(0==gettimeofday(&tv, nullptr))
938 {
939 auto usNext = tv.tv_usec | m_nSpeedLimiterRoundUp;
940 usleep(usNext-tv.tv_usec);
941 }
942 }
943 #ifdef HAVE_SSL
944 if(con->GetBIO())
945 {
946 r=BIO_read(con->GetBIO(), m_inBuf.wptr(),
947 std::min(m_nSpeedLimitMaxPerTake, m_inBuf.freecapa()));
948 if(r>0)
949 m_inBuf.got(r);
950 else // <=0 doesn't mean an error, only a double check can tell
951 r=BIO_should_read(con->GetBIO()) ? 1 : -errno;
952 }
953 else
954 #endif
955 {
956 r = m_inBuf.sysread(fd, m_nSpeedLimitMaxPerTake);
957 }
958
959 #ifdef DISCO_FAILURE
960 #warning hej
961 static int fakeFail=-123;
962 if(fakeFail == -123)
963 {
964 srand(getpid());
965 fakeFail = rand()%123;
966 }
967 if( fakeFail-- < 0)
968 {
969 // LOGLVL(log::LOG_DEBUG, "\n#################\nFAKING A FAILURE\n###########\n");
970 r=0;
971 fakeFail=rand()%123;
972 errno = EROFS;
973 //r = -errno;
974 shutdown(con.get()->GetFD(), SHUT_RDWR);
975 }
976 #endif
977
978 if(r == -EAGAIN || r == -EWOULDBLOCK)
979 {
980 ldbg("why EAGAIN/EINTR after getting it from select?");
981 // timespec sleeptime = { 0, 432000000 };
982 // nanosleep(&sleeptime, nullptr);
983 goto loop_again;
984 }
985 else if (r == 0)
986 {
987 dbgline;
988 sErrorMsg = "502 Connection closed";
989 return EFLAG_LOST_CON;
990 }
991 else if (r < 0) // other error, might reconnect
992 {
993 dbgline;
994 #ifdef MINIBUILD
995 sErrorMsg = "502 EPIC FAIL";
996 #else
997 // pickup the error code for later and kill current connection ASAP
998 sErrorMsg = tErrnoFmter("502 ");
999 #endif
1000 return EFLAG_LOST_CON;
1001 }
1002
1003 proc_data:
1004
1005 if(inpipe.empty())
1006 {
1007 ldbg("FIXME: unexpected data returned?");
1008 sErrorMsg = "500 Unexpected data";
1009 return EFLAG_LOST_CON;
1010 }
1011
1012 while(!m_inBuf.empty())
1013 {
1014
1015 ldbg("Processing job for " << inpipe.front()->RemoteUri(false));
1016 unsigned res = inpipe.front()->ProcessIncomming(m_inBuf, false);
1017 ldbg(
1018 "... incoming data processing result: " << res
1019 << ", emsg: " << inpipe.front()->sErrorMsg);
1020
1021 if(res&EFLAG_MIRROR_BROKEN)
1022 {
1023 ldbg("###### BROKEN MIRROR ####### on " << con.get());
1024 }
1025
1026 if (HINT_MORE == res)
1027 goto loop_again;
1028
1029 if (HINT_DONE & res)
1030 {
1031 // just in case that server damaged the last response body
1032 con->KnowLastFile(WEAK_PTR<fileitem>(inpipe.front()->m_pStorage));
1033
1034 inpipe.pop_front();
1035 if (HINT_DISCON & res)
1036 return HINT_DISCON; // with cleaned flags
1037
1038 LOG(
1039 "job finished. Has more? " << inpipe.size()
1040 << ", remaining data? " << m_inBuf.size());
1041
1042 if (inpipe.empty())
1043 {
1044 LOG("Need more work");
1045 return HINT_SWITCH;
1046 }
1047
1048 LOG("Extract more responses");
1049 continue;
1050 }
1051
1052 if (HINT_TGTCHANGE & res)
1053 {
1054 /* If the target was modified for internal redirection then there might be
1055 * more responses of that kind in the queue. Apply the redirection handling
1056 * to the rest as well if possible without having side effects.
1057 */
1058 auto it = inpipe.begin();
1059 for(++it; it != inpipe.end(); ++it)
1060 {
1061 unsigned rr = (**it).ProcessIncomming(m_inBuf, true);
1062 // just the internal rewriting applied and nothing else?
1063 if( HINT_TGTCHANGE != rr )
1064 {
1065 // not internal redirection or some failure doing it
1066 m_nTempPipelineDisable=30;
1067 return (HINT_TGTCHANGE|HINT_DISCON);
1068 }
1069 }
1070 // processed all inpipe stuff but if the buffer is still not empty then better disconnect
1071 return HINT_TGTCHANGE | (m_inBuf.empty() ? 0 : HINT_DISCON);
1072 }
1073
1074 // else case: error handling, pass to main loop
1075 if(HINT_KILL_LAST_FILE & res)
1076 con->KillLastFile();
1077 setIfNotEmpty(sErrorMsg, inpipe.front()->sErrorMsg);
1078 return res;
1079 }
1080 return HINT_DONE; // input buffer consumed
1081 }
1082 }
1083
1084 ASSERT(!"Unreachable");
1085 sErrorMsg = "500 Internal failure";
1086 return EFLAG_JOB_BROKEN|HINT_DISCON;
1087 }
1088
WorkLoop()1089 void dlcon::WorkLoop()
1090 {
1091 LOGSTART("dlcon::WorkLoop");
1092 string sErrorMsg;
1093 m_inBuf.clear();
1094
1095 if (!m_inBuf.setsize(cfg::dlbufsize))
1096 {
1097 log::err("500 Out of memory");
1098 return;
1099 }
1100
1101 if(fdWakeRead<0 || fdWakeWrite<0)
1102 {
1103 log::err("Error creating pipe file descriptors");
1104 return;
1105 }
1106
1107 tDljQueue inpipe;
1108 tDlStreamHandle con;
1109 unsigned loopRes=0;
1110
1111 bool bStopRequesting=false; // hint to stop adding request headers until the connection is restarted
1112
1113 int nLostConTolerance=MAX_RETRY;
1114
1115 auto BlacklistMirror = [&](tDlJobPtr & job)
1116 {
1117 LOGSTART2("BlacklistMirror", "blacklisting " <<
1118 job->GetPeerHost().ToURI(false));
1119 m_blacklist[std::make_pair(job->GetPeerHost().sHost,
1120 job->GetPeerHost().GetPort())] = sErrorMsg;
1121 };
1122
1123 auto prefProxy = [&](tDlJobPtr& cjob) -> const tHttpUrl*
1124 {
1125 if(m_bProxyTot)
1126 return nullptr;
1127
1128 if(cjob->m_pRepoDesc && cjob->m_pRepoDesc->m_pProxy
1129 && !cjob->m_pRepoDesc->m_pProxy->sHost.empty())
1130 {
1131 return cjob->m_pRepoDesc->m_pProxy;
1132 }
1133 return cfg::GetProxyInfo();
1134 };
1135
1136 while(true) // outer loop: jobs, connection handling
1137 {
1138 // init state or transfer loop jumped out, what are the needed actions?
1139 {
1140 setLockGuard;
1141 LOG("New jobs: " << m_qNewjobs.size());
1142
1143 if(m_bStopASAP)
1144 {
1145 /* The no-more-users checking logic will purge orphaned items from the inpipe
1146 * queue. When the connection is dirty after that, it will be closed in the
1147 * ExchangeData() but if not then it can be assumed to be clean and reusable.
1148 */
1149 if(inpipe.empty())
1150 {
1151 if(con)
1152 m_pConFactory->RecycleIdleConnection(con);
1153 return;
1154 }
1155 }
1156
1157
1158 if(m_qNewjobs.empty())
1159 goto go_select; // parent will notify RSN
1160
1161 if(!con)
1162 {
1163 // cleanup after the last connection - send buffer, broken jobs, ...
1164 m_sendBuf.clear();
1165 m_inBuf.clear();
1166 inpipe.clear();
1167
1168 bStopRequesting=false;
1169
1170 for(tDljQueue::iterator it=m_qNewjobs.begin(); it!=m_qNewjobs.end();)
1171 {
1172 if((**it).SetupJobConfig(sErrorMsg, m_blacklist))
1173 ++it;
1174 else
1175 {
1176 setIfNotEmpty2( (**it).sErrorMsg, sErrorMsg,
1177 "500 Broken mirror or incorrect configuration");
1178 m_qNewjobs.erase(it++);
1179 }
1180 }
1181 if(m_qNewjobs.empty())
1182 {
1183 LOG("no jobs left, start waiting")
1184 goto go_select; // nothing left, might receive new jobs soon
1185 }
1186
1187 bool bUsed = false;
1188 ASSERT(!m_qNewjobs.empty());
1189
1190 auto doconnect = [&](const tHttpUrl& tgt, int timeout, bool fresh)
1191 {
1192 return m_pConFactory->CreateConnected(tgt.sHost,
1193 tgt.GetPort(),
1194 sErrorMsg,
1195 &bUsed,
1196 m_qNewjobs.front()->GetConnStateTracker(),
1197 IFSSLORFALSE(tgt.bSSL),
1198 timeout, fresh);
1199 } ;
1200
1201 auto& cjob = m_qNewjobs.front();
1202 auto proxy = prefProxy(cjob);
1203 auto& peerHost = cjob->GetPeerHost();
1204
1205 #ifdef HAVE_SSL
1206 if(peerHost.bSSL)
1207 {
1208 if(proxy)
1209 {
1210 con = doconnect(*proxy, cfg::optproxytimeout > 0 ?
1211 cfg::optproxytimeout : cfg::nettimeout, false);
1212 if(con)
1213 {
1214 if(!con->StartTunnel(peerHost, sErrorMsg, & proxy->sUserPass, true))
1215 con.reset();
1216 }
1217 }
1218 else
1219 con = doconnect(peerHost, cfg::nettimeout, false);
1220 }
1221 else
1222 #endif
1223 {
1224 if(proxy)
1225 {
1226 con = doconnect(*proxy, cfg::optproxytimeout > 0 ?
1227 cfg::optproxytimeout : cfg::nettimeout, false);
1228 }
1229 else
1230 con = doconnect(peerHost, cfg::nettimeout, false);
1231 }
1232
1233 if(!con && proxy && cfg::optproxytimeout>0)
1234 {
1235 ldbg("optional proxy broken, disable");
1236 m_bProxyTot = true;
1237 proxy = nullptr;
1238 cfg::MarkProxyFailure();
1239 con = doconnect(peerHost, cfg::nettimeout, false);
1240 }
1241
1242 ldbg("connection valid? " << bool(con) << " was fresh? " << !bUsed);
1243
1244 if(con)
1245 {
1246 ldbg("target? [" << con->GetHostname() << "]:" << con->GetPort());
1247
1248 // must test this connection, just be sure no crap is in the pipe
1249 if (bUsed && check_read_state(con->GetFD()))
1250 {
1251 ldbg("code: MoonWalker");
1252 con.reset();
1253 continue;
1254 }
1255 }
1256 else
1257 {
1258 BlacklistMirror(cjob);
1259 continue; // try the next backend
1260 }
1261 }
1262
1263 // connection should be stable now, prepare all jobs and/or move to pipeline
1264 while(!bStopRequesting
1265 && !m_qNewjobs.empty()
1266 && int(inpipe.size()) <= cfg::pipelinelen)
1267 {
1268 tDlJobPtr &cjob = m_qNewjobs.front();
1269
1270 if(!cjob->SetupJobConfig(sErrorMsg, m_blacklist))
1271 {
1272 // something weird happened to it, drop it and let the client care
1273 m_qNewjobs.pop_front();
1274 continue;
1275 }
1276
1277 auto& tgt=cjob->GetPeerHost();
1278 // good case, direct or tunneled connection
1279 bool match=(tgt.sHost == con->GetHostname() && tgt.GetPort() == con->GetPort());
1280 const tHttpUrl * proxy = nullptr; // to be set ONLY if PROXY mode is used
1281
1282 // if not exact and can be proxied, and is this the right proxy?
1283 if(!match)
1284 {
1285 proxy = prefProxy(cjob);
1286 if(proxy)
1287 {
1288 /*
1289 * SSL over proxy uses HTTP tunnels (CONNECT scheme) so the check
1290 * above should have matched before.
1291 */
1292 if(!tgt.bSSL)
1293 match=(proxy->sHost == con->GetHostname() && proxy->GetPort() == con->GetPort());
1294 }
1295 // else... host changed and not going through the same proxy -> fail
1296 }
1297
1298 if(!match)
1299 {
1300 LOG("host mismatch, new target: " << tgt.sHost << ":" << tgt.GetPort());
1301 bStopRequesting=true;
1302 break;
1303 }
1304
1305 cjob->AppendRequest(m_sendBuf, m_sXForwardedFor, proxy);
1306 LOG("request added to buffer");
1307 inpipe.emplace_back(cjob);
1308 m_qNewjobs.pop_front();
1309
1310 if (m_nTempPipelineDisable > 0)
1311 {
1312 bStopRequesting = true;
1313 --m_nTempPipelineDisable;
1314 break;
1315 }
1316 }
1317 }
1318
1319 ldbg("Request(s) cooked, buffer contents: " << m_sendBuf);
1320
1321 go_select:
1322
1323 if(inpipe.empty() && m_bManualMode)
1324 {
1325 return;
1326 }
1327
1328 // inner loop: plain communication until something happens. Maybe should use epoll here?
1329 loopRes=ExchangeData(sErrorMsg, con, inpipe);
1330 ldbg("loopRes: "<< loopRes);
1331
1332 /* check whether we have a pipeline stall. This may happen because a) we are done or
1333 * b) because of the remote hostname change or c) the client stopped sending tasks.
1334 * Anyhow, that's a reason to put the connection back into the shared pool so either we
1335 * get it back from the pool in the next workloop cycle or someone else gets it and we
1336 * get a new connection for the new host later.
1337 * */
1338 if (inpipe.empty())
1339 {
1340 // all requests have been processed (client done, or pipeline stall, who cares)
1341 dbgline;
1342
1343 // no matter what happened, that stop flag is now irrelevant
1344 bStopRequesting = false;
1345
1346 // no error bits set, not busy -> this connection is still good, recycle properly
1347 unsigned all_err = HINT_DISCON | EFLAG_JOB_BROKEN | EFLAG_LOST_CON | EFLAG_MIRROR_BROKEN;
1348 if (con && !(loopRes & all_err))
1349 {
1350 dbgline;
1351 m_pConFactory->RecycleIdleConnection(con);
1352 continue;
1353 }
1354 }
1355
1356 /*
1357 * Here we go if the inpipe is still not processed or there have been errors
1358 * needing special handling.
1359 */
1360
1361 if( (HINT_DISCON|EFLAG_LOST_CON) & loopRes)
1362 {
1363 dbgline;
1364 con.reset();
1365 m_inBuf.clear();
1366 m_sendBuf.clear();
1367 }
1368
1369 if ( loopRes & HINT_TGTCHANGE )
1370 {
1371 // short queue continues jobs with rewritten targets, so
1372 // reinsert them into the new task list and continue
1373
1374 // if conn was not reset above then it should be in good shape
1375 m_pConFactory->RecycleIdleConnection(con);
1376 goto move_jobs_back_to_q;
1377 }
1378
1379 if ((EFLAG_LOST_CON & loopRes) && !inpipe.empty())
1380 {
1381 // disconnected by OS... give it a chance, or maybe not...
1382 if (--nLostConTolerance <= 0)
1383 {
1384 BlacklistMirror(inpipe.front());
1385 nLostConTolerance=MAX_RETRY;
1386 }
1387
1388 con.reset();
1389
1390 timespec sleeptime = { 0, 325000000 };
1391 nanosleep(&sleeptime, nullptr);
1392
1393 // trying to resume that job secretly, unless user disabled the use of range (we
1394 // cannot resync the sending position ATM, throwing errors to user for now)
1395 if (cfg::vrangeops <= 0 && inpipe.front()->m_pStorage->m_bCheckFreshness)
1396 loopRes |= EFLAG_JOB_BROKEN;
1397 else
1398 inpipe.front()->m_DlState = tDlJob::STATE_REGETHEADER;
1399 }
1400
1401 if(loopRes & (HINT_DONE|HINT_MORE))
1402 {
1403 sErrorMsg.clear();
1404 continue;
1405 }
1406
1407 //
1408 // regular required post-processing done here, now handle special conditions
1409 //
1410
1411
1412 if(HINT_SWITCH == loopRes)
1413 continue;
1414
1415 // resolving the "fatal error" situation, push the pipelined job back to new, etc.
1416
1417 if( (EFLAG_MIRROR_BROKEN & loopRes) && !inpipe.empty())
1418 BlacklistMirror(inpipe.front());
1419
1420 if( (EFLAG_JOB_BROKEN & loopRes) && !inpipe.empty())
1421 {
1422 setIfNotEmpty(inpipe.front()->sErrorMsg, sErrorMsg);
1423
1424 inpipe.pop_front();
1425
1426 if(EFLAG_STORE_COLLISION & loopRes)
1427 {
1428 // stupid situation, both users downloading the same stuff - and most likely in the same order
1429 // if one downloader runs a step ahead (or usually many steps), drop all items
1430 // already processed by it and try to continue somewhere else.
1431 // This way, the overall number of collisions and reconnects is minimized
1432
1433 auto cleaner = [](tDljQueue &joblist)
1434 {
1435 for(auto it = joblist.begin(); it!= joblist.end();)
1436 {
1437 if(*it && (**it).m_pStorage
1438 && (**it).m_pStorage->GetStatus() >= fileitem::FIST_DLRECEIVING)
1439 {
1440 // someone else is doing it -> drop
1441 joblist.erase(it++);
1442 continue;
1443 }
1444 else
1445 ++it;
1446 }
1447 };
1448 cleaner(inpipe);
1449 setLockGuard;
1450 cleaner(m_qNewjobs);
1451 }
1452 }
1453
1454 move_jobs_back_to_q:
1455 // for the jobs that were not finished and/or dropped, move them back into the task queue
1456 {
1457 setLockGuard;
1458 m_qNewjobs.insert(m_qNewjobs.begin(), inpipe.begin(), inpipe.end());
1459 inpipe.clear();
1460 }
1461
1462 }
1463 }
1464
1465 }
1466