1 /*
2 * lftp - file transfer program
3 *
4 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <time.h>
23 #include "trio.h"
24 #ifdef TIME_WITH_SYS_TIME
25 # include <sys/types.h>
26 # include <sys/time.h>
27 #endif
28
29 #include "SMTask.h"
30 #include "Timer.h"
31 #include "misc.h"
32
33 #ifdef TASK_DEBUG
34 # define DEBUG(x) do{printf x;fflush(stdout);}while(0)
35 #else
36 # define DEBUG(x) do{}while(0)
37 #endif
38
39 xlist_head<SMTask> SMTask::all_tasks;
40 xlist_head<SMTask> SMTask::ready_tasks;
41 xlist_head<SMTask> SMTask::new_tasks;
42 xlist_head<SMTask> SMTask::deleted_tasks;
43
44 SMTask *SMTask::current;
45
46 SMTask *SMTask::stack[SMTASK_MAX_DEPTH];
47 int SMTask::stack_ptr;
48
49 PollVec SMTask::block;
50 TimeDate SMTask::now;
51 time_t SMTask::last_block;
52
53 static SMTask *init_task=new SMTaskInit;
54
SMTask()55 SMTask::SMTask()
56 : all_tasks_node(this), ready_tasks_node(this),
57 new_tasks_node(this), deleted_tasks_node(this)
58 {
59 // insert in the chain
60 all_tasks.add(all_tasks_node);
61
62 suspended=false;
63 suspended_slave=false;
64 running=0;
65 ref_count=0;
66 deleting=false;
67 new_tasks.add(new_tasks_node);
68 DEBUG(("new SMTask %p (count=%d)\n",this,all_tasks.count()));
69 }
70
Suspend()71 void SMTask::Suspend()
72 {
73 if(suspended)
74 return;
75 DEBUG(("Suspend(%p) from %p\n",this,current));
76 if(!IsSuspended())
77 SuspendInternal();
78 suspended=true;
79 }
Resume()80 void SMTask::Resume()
81 {
82 if(!suspended)
83 return;
84 suspended=false;
85 if(!IsSuspended())
86 ResumeInternal();
87 }
SuspendSlave()88 void SMTask::SuspendSlave()
89 {
90 if(suspended_slave)
91 return;
92 DEBUG(("SuspendSlave(%p) from %p\n",this,current));
93 if(!IsSuspended())
94 SuspendInternal();
95 suspended_slave=true;
96 }
ResumeSlave()97 void SMTask::ResumeSlave()
98 {
99 if(!suspended_slave)
100 return;
101 suspended_slave=false;
102 if(!IsSuspended())
103 ResumeInternal();
104 }
ResumeInternal()105 void SMTask::ResumeInternal()
106 {
107 if(!new_tasks_node.listed() && !ready_tasks_node.listed())
108 new_tasks.add_tail(new_tasks_node);
109 }
110
~SMTask()111 SMTask::~SMTask()
112 {
113 DEBUG(("delete SMTask %p (count=%d)\n",this,all_tasks.count()));
114 assert(!running);
115 assert(!ref_count);
116 assert(deleting);
117
118 if(ready_tasks_node.listed())
119 ready_tasks_node.remove();
120 if(new_tasks_node.listed())
121 new_tasks_node.remove();
122 assert(!deleted_tasks_node.listed());
123
124 // remove from the chain
125 all_tasks_node.remove();
126 }
127
DeleteLater()128 void SMTask::DeleteLater()
129 {
130 if(deleting)
131 return;
132 deleting=true;
133 deleted_tasks.add_tail(deleted_tasks_node);
134 PrepareToDie();
135 }
Delete(SMTask * task)136 void SMTask::Delete(SMTask *task)
137 {
138 if(!task)
139 return;
140 task->DeleteLater();
141 // CollectGarbage will delete the task gracefully
142 }
_SetRef(SMTask * task,SMTask * new_task)143 SMTask *SMTask::_SetRef(SMTask *task,SMTask *new_task)
144 {
145 _DeleteRef(task);
146 _MakeRef(new_task);
147 return new_task;
148 }
149
Enter(SMTask * task)150 void SMTask::Enter(SMTask *task)
151 {
152 assert(stack_ptr<SMTASK_MAX_DEPTH);
153 stack[stack_ptr++]=current;
154 current=task;
155 current->running++;
156 }
Leave(SMTask * task)157 void SMTask::Leave(SMTask *task)
158 {
159 assert(current==task);
160 current->running--;
161 assert(stack_ptr>0);
162 current=stack[--stack_ptr];
163 }
164
Roll(SMTask * task)165 int SMTask::Roll(SMTask *task)
166 {
167 int m=STALL;
168 if(task->running || task->deleting)
169 return m;
170 Enter(task);
171 while(!task->deleting && task->Do()==MOVED)
172 m=MOVED;
173 Leave(task);
174 return m;
175 }
176
RollAll(const TimeInterval & max_time)177 void SMTask::RollAll(const TimeInterval &max_time)
178 {
179 Timer limit_timer(max_time);
180 do { Schedule(); }
181 while(block.WillNotBlock() && !limit_timer.Stopped());
182 }
183
CollectGarbage()184 int SMTask::CollectGarbage()
185 {
186 int count=0;
187 xlist_for_each_safe(SMTask,deleted_tasks,node,task,next)
188 {
189 if(task->running || task->ref_count)
190 continue;
191 node->remove();
192 delete task;
193 count++;
194 }
195 return count;
196 }
197
ScheduleThis()198 int SMTask::ScheduleThis()
199 {
200 assert(ready_tasks_node.listed());
201 if(running)
202 return STALL;
203 if(deleting || IsSuspended())
204 {
205 ready_tasks_node.remove();
206 return STALL;
207 }
208 Enter(); // mark it current and running.
209 int res=Do(); // let it run.
210 Leave(); // unmark it running and change current.
211 return res;
212 }
213
ScheduleNew()214 int SMTask::ScheduleNew()
215 {
216 int res=STALL;
217 xlist_for_each_safe(SMTask,new_tasks,node,task,next)
218 {
219 task->new_tasks_node.remove();
220 ready_tasks.add(task->ready_tasks_node);
221 SMTask *next_task=next->get_obj();
222 if(next_task) // protect next from deleting
223 next_task->IncRefCount();
224 res|=task->ScheduleThis();
225 if(next_task)
226 next_task->DecRefCount();
227 }
228 return res;
229 }
230
Schedule()231 void SMTask::Schedule()
232 {
233 block.Empty();
234
235 // get time once and assume Do() don't take much time
236 UpdateNow();
237
238 timeval timer_timeout=Timer::GetTimeoutTV();
239 if(timer_timeout.tv_sec>=0)
240 block.SetTimeout(timer_timeout);
241
242 int res=ScheduleNew();
243 xlist_for_each_safe(SMTask,ready_tasks,node,task,next)
244 {
245 SMTask *next_task=next->get_obj();
246 if(next_task) // protect next from deleting
247 next_task->IncRefCount();
248 res|=task->ScheduleThis();
249 res|=ScheduleNew(); // run just created tasks immediately
250 if(next_task)
251 next_task->DecRefCount();
252 }
253 CollectGarbage();
254 if(res)
255 block.NoWait();
256 }
257
Block()258 void SMTask::Block()
259 {
260 // use timer to force periodic select to find out which FDs are ready.
261 if(block.WillNotBlock() && last_block==now.UnixTime())
262 return;
263 block.Block();
264 last_block=now.UnixTime();
265 }
266
Do()267 int SMTaskInit::Do()
268 {
269 return STALL;
270 }
SMTaskInit()271 SMTaskInit::SMTaskInit()
272 {
273 Enter();
274 }
~SMTaskInit()275 SMTaskInit::~SMTaskInit()
276 {
277 Leave();
278 }
279
TaskCount()280 int SMTask::TaskCount()
281 {
282 return all_tasks.count();
283 }
284
Cleanup()285 void SMTask::Cleanup()
286 {
287 CollectGarbage();
288 Delete(init_task);
289 CollectGarbage();
290 }
291
292 #include <errno.h>
293 #include "ResMgr.h"
294 ResDecl enospc_fatal ("xfer:disk-full-fatal","no",ResMgr::BoolValidate,ResMgr::NoClosure);
NonFatalError(int err)295 bool SMTask::NonFatalError(int err)
296 {
297 if(E_RETRY(err))
298 return true;
299
300 current->TimeoutS(1);
301 if(err==ENFILE || err==EMFILE)
302 return true;
303 #ifdef ENOBUFS
304 if(err==ENOBUFS)
305 return true;
306 #endif
307 #ifdef ENOSR
308 if(err==ENOSR)
309 return true;
310 #endif
311 #ifdef ENOSPC
312 if(err==ENOSPC)
313 return !enospc_fatal.QueryBool(0);
314 #endif
315 #ifdef EDQUOT
316 if(err==EDQUOT)
317 return !enospc_fatal.QueryBool(0);
318 #endif
319
320 current->Timeout(0);
321 return false; /* fatal error */
322 }
323
PrintTasks()324 void SMTask::PrintTasks()
325 {
326 xlist_for_each(SMTask,all_tasks,node,scan)
327 {
328 const char *c=scan->GetLogContext();
329 if(!c) c="";
330 printf("%p\t%c%c%c\t%d\t%s\n",scan,scan->running?'R':' ',
331 scan->suspended?'S':' ',scan->deleting?'D':' ',scan->ref_count,c);
332 }
333 }
334