1 /*==============================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 * =========================================================================== */
23 
24 #include <kfs/file.h> /* KFileRelease */
25 
26 #include <klib/time.h> /* KSleep */
27 #include <klib/status.h> /* KStsLevelGet */
28 
29 #include <strtol.h> /* strtou64 */
30 
31 #include "PrfMain.h"
32 #include "PrfRetrier.h"
33 
/* Store the current read position and, if a failure had been recorded,
 * put the retrier back into its initial retry settings.
 *
 * self - retrier to update
 * pos  - position saved into self->_pos
 */
void PrfRetrierReset(PrfRetrier * self, uint64_t pos) {
    self->_pos = pos;

    /* no failure recorded: nothing to undo */
    if (!self->_failed)
        return;

    /* forget the failure: first retry step, full buffer size, no sleep */
    self->_failed  = false;
    self->_sleepTO = 0;
    self->curSize  = self->_bsize;
    self->_state   = eRSJustRetry;

    /* the recovery is reported only when the status level is at least 1 */
    if (KStsLevelGet() < 1)
        return;

    PLOGERR(klogErr, (klogErr, 0,
        "KFileRead success: '$(name)':$(pos)/$(sz)",
        "name=%S,pos=%\'lu,sz=%\'lu",
        self->_src, self->_pos, self->_size));
}
51 
/* Initialize a retrier for reading one file.
 *
 * self  - retrier to initialize; it is zeroed first
 * mane  - source of the buffer size (bsize) and the manager (kns)
 * path  - path of the file; the pointer is stored, not copied
 * src   - name of the file; the pointer is stored, not copied
 * isUri - stored in self->_isUri
 * f     - address of the caller's file pointer; must point to a non-NULL
 *         file. The address itself is stored in the retrier.
 * size  - stored in self->_size
 * pos   - starting position, handed to PrfRetrierReset
 */
void PrfRetrierInit(PrfRetrier * self, const PrfMain * mane,
    const struct VPath * path, const struct String * src, bool isUri,
    const struct KFile ** f, size_t size, uint64_t pos)
{
    /* mane is dereferenced below, so it is part of the precondition too */
    assert(self && mane && f && *f && src);

    memset(self, 0, sizeof *self);

    self->_mgr = mane->kns;
    self->_bsize = mane->bsize;
    self->curSize = self->_bsize; /* start with the full buffer size */

    self->_path = path;
    self->_src = src;
    self->_isUri = isUri;
    self->_size = size;
    self->_f = f;

    /* _failed is false after memset, so this only records pos */
    PrfRetrierReset(self, pos);
}
72 
PrfRetrierReopenRemote(PrfRetrier * self)73 static rc_t PrfRetrierReopenRemote(PrfRetrier * self) {
74     rc_t rc = 0;
75 
76     uint32_t timeout = 1;
77 
78     KFileRelease(*self->_f);
79     *self->_f = NULL;
80 
81     while (timeout < 10 * 60 /* 10 minutes */) {
82         rc = _KFileOpenRemote(self->_f, self->_mgr, self->_path,
83             self->_src, !self->_isUri);
84         if (rc == 0)
85             break;
86 
87         KSleep(timeout);
88         timeout *= 2;
89     }
90 
91     return rc;
92 }
93 
/* Halve the current read size, but never go below the minimum.
 *
 * self - retrier whose curSize is reduced
 * rc   - not used
 *
 * Returns true when curSize was reduced, false when it is already at (or
 * below) the minimum and cannot be reduced any further.
 */
static bool PrfRetrierDecBufSz(PrfRetrier * self, rc_t rc) {
    /* 3768 is usually returned by KStreamRead (size of TLS buffer?) */
    const size_t minSize = 3768;

    assert(self);

    (void)rc; /* unused */

    /* "<=" rather than "==": a size that starts below the minimum must not
       be raised to the minimum by a step that is meant to decrease it */
    if (self->curSize <= minSize)
        return false;

    self->curSize /= 2;
    if (self->curSize < minSize)
        self->curSize = minSize;

    return true;
}
109 
/* Advance the sleep timeout to its next value: 1, 2, 4, 8, 15, 30, 60, ...
 *
 * Returns true while the new timeout does not exceed 20 minutes,
 * false once it does. The timeout is updated in both cases.
 */
static bool PrfRetrierIncSleepTO(PrfRetrier * self) {
    if (self->_sleepTO != 0)
        self->_sleepTO *= 2;
    else
        self->_sleepTO = 1;

    /* 16 becomes 15, so further doubling gives 30, 60, 120, ... */
    if (self->_sleepTO == 16)
        self->_sleepTO = 15;

    return self->_sleepTO <= 20 * 60 /* 20 minutes */;
}
124 
/* Decide whether a failed read should be retried and prepare the retry.
 *
 * self - retrier describing the file being read
 * rc   - error returned by the failed read
 * pos  - position of the failed read
 *
 * The environment variable NCBI_VDB_PREFETCH_RETRY is parsed once, on the
 * first call:
 *   0            - never retry;
 *   other number - in the "increase sleep timeout" step, stop retrying when
 *                  more than this much time (in KTimeStamp units) has passed
 *                  since the first recorded failure;
 *   unset, empty or not a number - no time limit.
 *
 * When pos is not past the position stored in the retrier, the step selected
 * by self->_state is performed: plain retry, reopen the remote file, reduce
 * the read size, or increase the sleep timeout. If the read size cannot be
 * reduced any more, the sleep timeout is increased instead.
 *
 * When a retry is allowed, the function sleeps for self->_sleepTO seconds
 * (if it is not 0) and advances self->_state to the next step, wrapping
 * from the last step back to eRSReopen.
 *
 * Returns 0 when the caller should retry the read, otherwise the rc that was
 * passed in.
 */
rc_t PrfRetrierAgain(PrfRetrier * self, rc_t rc, uint64_t pos) {
    bool retry = true;

    static bool INITED = false;
    static KTime_t D_T = ~0; /* ~0 means "no time limit" */
    if (!INITED) {
        const char * str = getenv("NCBI_VDB_PREFETCH_RETRY");
        if (str != NULL) {
            char *end = NULL;
            D_T = strtou64(str, &end, 0);
            /* an empty value (no digits consumed) or trailing garbage is
               not a number: keep the default instead of treating an empty
               value as 0, which would disable retrying */
            if (end == str || end[0] != 0)
                D_T = ~0;
        }
        INITED = true;
    }

    assert(self);

    if (D_T == 0)
        retry = false;
    else if (pos <= self->_pos) {
        switch (self->_state) {
        case eRSJustRetry:
            break;
        case eRSReopen:
            if (PrfRetrierReopenRemote(self) != 0)
                retry = false;
            break;
        case eRSDecBuf:
            if (PrfRetrierDecBufSz(self, rc))
                break;
            /* the read size cannot be reduced:
               increase the sleep timeout instead */
            self->_state = eRSIncTO;
            /* fallthrough */
        case eRSIncTO:
            if (!PrfRetrierIncSleepTO(self))
                retry = false;
            else if (D_T != ~0 && KTimeStamp() - self->_tFailed > D_T)
                retry = false;
            break;
        default: assert(0); break;
        }
    }

    /* remember when the first failure of this series happened */
    if (!self->_failed) {
        self->_failed = true;
        self->_tFailed = KTimeStamp();
    }

    if (retry) {
        KStsLevel lvl = KStsLevelGet();
        if (lvl > 0) {
            if (lvl == 1)
                PLOGERR(klogErr, (klogErr, rc,
                    "Cannot KFileRead: retrying '$(name)'...",
                    "name=%S", self->_src));
            else
                switch (self->_state) {
                case eRSJustRetry:
                    PLOGERR(klogErr, (klogErr, rc,
                        "Cannot KFileRead: retrying '$(name)':$(pos)...",
                        "name=%S,pos=%lu", self->_src, self->_pos));
                    break;
                case eRSReopen:
                    PLOGERR(klogErr, (klogErr, rc,
                        "Cannot KFileRead: reopened, retrying '$(name)'...",
                        "name=%S", self->_src));
                    break;
                case eRSDecBuf:
                    PLOGERR(klogErr, (klogErr, rc, "Cannot KFileRead: "
                        "buffer size = $(sz), retrying '$(name)'...",
                        "sz=%zu,name=%S", self->curSize, self->_src));
                    break;
                case eRSIncTO:
                    PLOGERR(klogErr, (klogErr, rc, "Cannot KFileRead: "
                        "sleep TO = $(to)s, retrying '$(name)'...",
                        "to=%u,name=%S", self->_sleepTO, self->_src));
                    break;
                default: assert(0); break;
                }
        }

        /* tell the caller to retry */
        rc = 0;
    }
    else
        PLOGERR(klogErr, (klogErr, rc,
            "Cannot KFileRead '$(name)': sz=$(sz), to=$(to)",
            "name=%S,sz=%zu,to=%u",
            self->_src, self->curSize, self->_sleepTO));

    if (rc == 0) {
        if (self->_sleepTO > 0) {
            STSMSG(2, ("Sleeping %us...", self->_sleepTO));
#ifndef TESTING_FAILURES
            KSleep(self->_sleepTO);
#endif
        }

        /* move on to the next step; after the last one start again
           from reopening the file */
        if (++self->_state == eRSMax)
            self->_state = eRSReopen;
    }

    return rc;
}
227 
228