1 /*
2    Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2017 Skytechnology sp. z o.o..
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 
21 #include "event_loop.h"
22 
23 #include <atomic>
24 #include <list>
25 #include <sys/time.h>
26 #include <unistd.h>
27 
28 #include "common/cfg.h"
29 #include "common/exception.h"
30 #include "common/massert.h"
31 
32 #if defined(_WIN32)
33   #include "common/sockets.h"
34 #endif
35 
36 ExitingStatus gExitingStatus = ExitingStatus::kRunning;
37 bool gReloadRequested = false;
38 static bool nextPollNonblocking = false;
39 
/// Pair of callbacks stored by eventloop_pollregister().
struct pollentry {
	/// Called before every poll with the (mutable) descriptor vector.
	void (*desc)(std::vector<pollfd> &);
	/// Called after a poll that did not fail, with the same descriptor vector.
	void (*serve)(const std::vector<pollfd> &);
};
44 
45 namespace {
46 std::list<pollentry> gPollEntries;
47 }
48 
/// One scheduled callback kept in the timer list.
struct timeentry {
	using fun_t = void (*)();

	timeentry(uint64_t ne, uint64_t sec, uint64_t off, int mod, fun_t f, bool ms)
		: nextevent{ne},
		  period{sec},
		  offset{off},
		  mode{mod},
		  fun{f},
		  millisecond_precision{ms} {
	}

	/// Time of the next run: milliseconds for millisecond-precision entries,
	/// seconds otherwise.
	uint64_t nextevent;
	/// Interval between runs, in the same unit as nextevent.
	uint64_t period;
	/// Shift added to each period boundary (second-precision entries).
	uint64_t offset;
	/// TIMEMODE_RUN_LATE or TIMEMODE_SKIP_LATE.
	int      mode;
	/// Callback to invoke.
	fun_t    fun;
	/// True when the entry is scheduled with millisecond precision.
	bool     millisecond_precision;
};
61 
62 typedef std::list<timeentry> TimeEntries;
63 namespace {
64 TimeEntries gTimeEntries;
65 }
66 
// Wall-clock time cached by eventloop_updatetime(): whole seconds and
// microseconds respectively.
static std::atomic<uint32_t> now;
static std::atomic<uint64_t> usecnow;

using FunctionEntry = void (*)();
using CanExitEntry = int (*)();
using EntryList = std::list<FunctionEntry>;
using CanExitEntryList = std::list<CanExitEntry>;

// Callback lists filled by the eventloop_*register() functions.
static EntryList gDestructEntries;
static CanExitEntryList gCanExitEntries;
static EntryList gWantExitEntries;
static EntryList gReloadEntries;
static EntryList gEachLoopEntries;
79 
eventloop_make_next_poll_nonblocking()80 void eventloop_make_next_poll_nonblocking() {
81 	nextPollNonblocking = true;
82 }
83 
eventloop_destructregister(FunctionEntry fun)84 void eventloop_destructregister (FunctionEntry fun) {
85 	gDestructEntries.push_front(fun);
86 }
87 
eventloop_canexitregister(CanExitEntry fun)88 void eventloop_canexitregister (CanExitEntry fun) {
89 	gCanExitEntries.push_front(fun);
90 }
91 
eventloop_wantexitregister(FunctionEntry fun)92 void eventloop_wantexitregister (FunctionEntry fun) {
93 	gWantExitEntries.push_front(fun);
94 }
95 
eventloop_reloadregister(FunctionEntry fun)96 void eventloop_reloadregister (FunctionEntry fun) {
97 	gReloadEntries.push_front(fun);
98 }
99 
eventloop_pollregister(void (* desc)(std::vector<pollfd> &),void (* serve)(const std::vector<pollfd> &))100 void eventloop_pollregister(void (*desc)(std::vector<pollfd>&),void (*serve)(const std::vector<pollfd>&)) {
101 	gPollEntries.push_back({desc,serve});
102 }
103 
eventloop_eachloopregister(FunctionEntry fun)104 void eventloop_eachloopregister (FunctionEntry fun) {
105 	gEachLoopEntries.push_front(fun);
106 }
107 
eventloop_timeregister(int mode,uint64_t seconds,uint64_t offset,FunctionEntry fun)108 void *eventloop_timeregister(int mode, uint64_t seconds, uint64_t offset, FunctionEntry fun) {
109 	if (seconds == 0 || offset >= seconds) {
110 		return NULL;
111 	}
112 
113 	uint64_t nextevent = ((now + seconds) / seconds) * seconds + offset;
114 
115 	gTimeEntries.push_front(timeentry(nextevent, seconds, offset, mode, fun, false));
116 	return &gTimeEntries.front();
117 }
118 
eventloop_timeregister_ms(uint64_t period,FunctionEntry fun)119 void *eventloop_timeregister_ms(uint64_t period, FunctionEntry fun) {
120 	if (period == 0) {
121 		return NULL;
122 	}
123 
124 	uint64_t nextevent = usecnow / 1000 + period;
125 
126 	gTimeEntries.push_front(timeentry(nextevent, period, 0, TIMEMODE_RUN_LATE, fun, true));
127 	return &gTimeEntries.front();
128 }
129 
eventloop_timeunregister(void * handler)130 void eventloop_timeunregister(void* handler) {
131 	for (TimeEntries::iterator it = gTimeEntries.begin(); it != gTimeEntries.end(); ++it) {
132 		if (&(*it) == handler) {
133 			gTimeEntries.erase(it);
134 			return;
135 		}
136 	}
137 	mabort("unregistering unknown handle from time table");
138 }
139 
eventloop_timechange(void * handle,int mode,uint64_t seconds,uint64_t offset)140 int eventloop_timechange(void* handle, int mode, uint64_t seconds, uint64_t offset) {
141 	timeentry *aux = (timeentry*)handle;
142 	if (seconds == 0 || offset >= seconds) {
143 		return -1;
144 	}
145 	aux->nextevent = ((now + seconds) / seconds) * seconds + offset;
146 	aux->period = seconds;
147 	aux->offset = offset;
148 	aux->mode = mode;
149 	return 0;
150 }
151 
eventloop_timechange_ms(void * handle,uint64_t period)152 int eventloop_timechange_ms(void* handle, uint64_t period) {
153 	timeentry *aux = (timeentry*)handle;
154 	if (period == 0) {
155 		return -1;
156 	}
157 	aux->nextevent = ((usecnow / 1000 + period) / period) * period;
158 	aux->period = period;
159 	return 0;
160 }
161 
eventloop_destruct()162 void eventloop_destruct() {
163 	for (const FunctionEntry &fun : gDestructEntries) {
164 		try {
165 			fun();
166 		} catch (Exception& ex) {
167 			lzfs_pretty_syslog(LOG_WARNING, "term error: %s", ex.what());
168 		}
169 	}
170 }
171 
eventloop_release_resources(void)172 void eventloop_release_resources(void) {
173 	gDestructEntries.clear();
174 	gCanExitEntries.clear();
175 	gWantExitEntries.clear();
176 	gReloadEntries.clear();
177 	gEachLoopEntries.clear();
178 	gPollEntries.clear();
179 	gTimeEntries.clear();
180 }
181 
182 /* internal */
canexit()183 bool canexit() {
184 	for (const CanExitEntry &fun : gCanExitEntries) {
185 		if (fun() == 0) {
186 			return false;
187 		}
188 	}
189 	return true;
190 }
191 
eventloop_time()192 uint32_t eventloop_time() {
193 	return now;
194 }
195 
eventloop_utime()196 uint64_t eventloop_utime() {
197 	return usecnow;
198 }
199 
eventloop_want_to_terminate()200 uint8_t eventloop_want_to_terminate() {
201 	if (gExitingStatus == ExitingStatus::kRunning) {
202 		gExitingStatus = ExitingStatus::kWantExit;
203 		lzfs_pretty_syslog(LOG_INFO, "Exiting on internal request.");
204 		return LIZARDFS_STATUS_OK;
205 	} else {
206 		lzfs_pretty_syslog(LOG_ERR, "Unable to exit on internal request.");
207 		return LIZARDFS_ERROR_NOTPOSSIBLE;
208 	}
209 }
210 
eventloop_want_to_reload()211 void eventloop_want_to_reload() {
212 	gReloadRequested = true;
213 }
214 
eventloop_run()215 void eventloop_run() {
216 	uint32_t prevtime  = 0;
217 	uint64_t prevmtime = 0;
218 	std::vector<pollfd> pdesc;
219 	int i;
220 
221 	while (gExitingStatus != ExitingStatus::kDoExit) {
222 		pdesc.clear();
223 		for (auto &pollit: gPollEntries) {
224 			pollit.desc(pdesc);
225 		}
226 #if defined(_WIN32)
227 		i = tcppoll(pdesc, nextPollNonblocking ? 0 : 50);
228 #else
229 		i = poll(pdesc.data(),pdesc.size(), nextPollNonblocking ? 0 : 50);
230 #endif
231 		nextPollNonblocking = false;
232 		eventloop_updatetime();
233 		if (i<0) {
234 			if (errno==EAGAIN) {
235 				lzfs_pretty_syslog(LOG_WARNING,"poll returned EAGAIN");
236 				usleep(100000);
237 				continue;
238 			}
239 			if (errno!=EINTR) {
240 				lzfs_pretty_syslog(LOG_WARNING,"poll error: %s",strerr(errno));
241 				break;
242 			}
243 		} else {
244 			for (auto &pollit : gPollEntries) {
245 				pollit.serve(pdesc);
246 			}
247 		}
248 		for (const FunctionEntry &fun : gEachLoopEntries) {
249 			fun();
250 		}
251 
252 		uint64_t msecnow = usecnow / 1000;
253 
254 		if (msecnow < prevmtime) {
255 			// time went backward - recalculate next event time
256 			for (timeentry& timeit : gTimeEntries) {
257 				if (!timeit.millisecond_precision) {
258 					continue;
259 				}
260 
261 				uint64_t previous_time_to_run = timeit.nextevent - prevmtime;
262 				previous_time_to_run = std::min(previous_time_to_run, timeit.period);
263 				timeit.nextevent = msecnow + previous_time_to_run;
264 			}
265 		}
266 
267 		if (now<prevtime) {
268 			// time went backward !!! - recalculate "nextevent" time
269 			// adding previous_time_to_run prevents from running next event too soon.
270 			for (timeentry& timeit : gTimeEntries) {
271 				if (timeit.millisecond_precision) {
272 					continue;
273 				}
274 
275 				uint64_t previous_time_to_run = timeit.nextevent - prevtime;
276 				previous_time_to_run = std::min(previous_time_to_run, timeit.period);
277 				timeit.nextevent = ((now + previous_time_to_run + timeit.period)
278 				                   / timeit.period) * timeit.period + timeit.offset;
279 			}
280 		} else if (now>prevtime+3600) {
281 			// time went forward !!! - just recalculate "nextevent" time
282 			for (timeentry& timeit : gTimeEntries) {
283 				if (timeit.millisecond_precision) {
284 					timeit.nextevent = msecnow + timeit.period;
285 					continue;
286 				}
287 
288 				timeit.nextevent = ((now + timeit.period) / timeit.period)
289 				                    * timeit.period + timeit.offset;
290 			}
291 		}
292 
293 		for (timeentry& timeit : gTimeEntries) {
294 			if (timeit.millisecond_precision) {
295 				if (msecnow >= timeit.nextevent) {
296 					timeit.nextevent = msecnow + timeit.period;
297 					timeit.fun();
298 				}
299 				continue;
300 			}
301 
302 			if (now >= timeit.nextevent) {
303 				if (timeit.mode == TIMEMODE_RUN_LATE) {
304 					timeit.fun();
305 				} else { /* timeit.mode == TIMEMODE_SKIP_LATE */
306 					if (now == timeit.nextevent) {
307 						timeit.fun();
308 					}
309 				}
310 				timeit.nextevent += ((now - timeit.nextevent + timeit.period)
311 				                    / timeit.period) * timeit.period;
312 			}
313 		}
314 		prevtime  = now;
315 		prevmtime = usecnow / 1000;
316 		if (gExitingStatus == ExitingStatus::kRunning && gReloadRequested) {
317 			cfg_reload();
318 			for (const FunctionEntry &fun : gReloadEntries) {
319 				try {
320 					fun();
321 				} catch (Exception& ex) {
322 					lzfs_pretty_syslog(LOG_WARNING, "reload error: %s", ex.what());
323 				}
324 			}
325 			gReloadRequested = false;
326 		}
327 		if (gExitingStatus == ExitingStatus::kWantExit) {
328 			for (const FunctionEntry &fun : gWantExitEntries) {
329 				fun();
330 			}
331 			gExitingStatus = ExitingStatus::kCanExit;
332 		}
333 		if (gExitingStatus == ExitingStatus::kCanExit) {
334 			if (canexit()) {
335 				gExitingStatus = ExitingStatus::kDoExit;
336 			}
337 		}
338 	}
339 }
340 
eventloop_updatetime()341 void eventloop_updatetime() {
342 	struct timeval tv;
343 
344 	gettimeofday(&tv,NULL);
345 	usecnow = tv.tv_sec * uint64_t(1000000) + tv.tv_usec;
346 	now = tv.tv_sec;
347 }
348