1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14 
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19 
20 #include <stdio.h>
21 #include <stdlib.h>
22 
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28 
29 #include "server.h"
30 #include "s_async.h"
31 #include "log.h"
32 #include "filesys.h"
33 #include "porting.h"
34 #include "common/c_internal.h"
35 
36 /******************************************************************************/
AsyncEngine()37 AsyncEngine::AsyncEngine() :
38 	initDone(false),
39 	jobIdCounter(0)
40 {
41 }
42 
43 /******************************************************************************/
~AsyncEngine()44 AsyncEngine::~AsyncEngine()
45 {
46 
47 	// Request all threads to stop
48 	for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 			it != workerThreads.end(); it++) {
50 		(*it)->Stop();
51 	}
52 
53 
54 	// Wake up all threads
55 	for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56 			it != workerThreads.end(); it++) {
57 		jobQueueCounter.Post();
58 	}
59 
60 	// Wait for threads to finish
61 	for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62 			it != workerThreads.end(); it++) {
63 		(*it)->Wait();
64 	}
65 
66 	// Force kill all threads
67 	for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68 			it != workerThreads.end(); it++) {
69 		(*it)->Kill();
70 		delete *it;
71 	}
72 
73 	jobQueueMutex.Lock();
74 	jobQueue.clear();
75 	jobQueueMutex.Unlock();
76 	workerThreads.clear();
77 }
78 
79 /******************************************************************************/
registerFunction(const char * name,lua_CFunction func)80 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
81 {
82 	if (initDone) {
83 		return false;
84 	}
85 	functionList[name] = func;
86 	return true;
87 }
88 
89 /******************************************************************************/
initialize(unsigned int numEngines)90 void AsyncEngine::initialize(unsigned int numEngines)
91 {
92 	initDone = true;
93 
94 	for (unsigned int i = 0; i < numEngines; i++) {
95 		AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
96 		workerThreads.push_back(toAdd);
97 		toAdd->Start();
98 	}
99 }
100 
101 /******************************************************************************/
queueAsyncJob(std::string func,std::string params)102 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
103 {
104 	jobQueueMutex.Lock();
105 	LuaJobInfo toAdd;
106 	toAdd.id = jobIdCounter++;
107 	toAdd.serializedFunction = func;
108 	toAdd.serializedParams = params;
109 
110 	jobQueue.push_back(toAdd);
111 
112 	jobQueueCounter.Post();
113 
114 	jobQueueMutex.Unlock();
115 
116 	return toAdd.id;
117 }
118 
119 /******************************************************************************/
getJob()120 LuaJobInfo AsyncEngine::getJob()
121 {
122 	jobQueueCounter.Wait();
123 	jobQueueMutex.Lock();
124 
125 	LuaJobInfo retval;
126 	retval.valid = false;
127 
128 	if (!jobQueue.empty()) {
129 		retval = jobQueue.front();
130 		jobQueue.pop_front();
131 		retval.valid = true;
132 	}
133 	jobQueueMutex.Unlock();
134 
135 	return retval;
136 }
137 
138 /******************************************************************************/
putJobResult(LuaJobInfo result)139 void AsyncEngine::putJobResult(LuaJobInfo result)
140 {
141 	resultQueueMutex.Lock();
142 	resultQueue.push_back(result);
143 	resultQueueMutex.Unlock();
144 }
145 
146 /******************************************************************************/
step(lua_State * L,int errorhandler)147 void AsyncEngine::step(lua_State *L, int errorhandler)
148 {
149 	lua_getglobal(L, "core");
150 	resultQueueMutex.Lock();
151 	while (!resultQueue.empty()) {
152 		LuaJobInfo jobDone = resultQueue.front();
153 		resultQueue.pop_front();
154 
155 		lua_getfield(L, -1, "async_event_handler");
156 
157 		if (lua_isnil(L, -1)) {
158 			assert("Async event handler does not exist!" == 0);
159 		}
160 
161 		luaL_checktype(L, -1, LUA_TFUNCTION);
162 
163 		lua_pushinteger(L, jobDone.id);
164 		lua_pushlstring(L, jobDone.serializedResult.data(),
165 				jobDone.serializedResult.size());
166 
167 		if (lua_pcall(L, 2, 0, errorhandler)) {
168 			script_error(L);
169 		}
170 	}
171 	resultQueueMutex.Unlock();
172 	lua_pop(L, 1); // Pop core
173 }
174 
175 /******************************************************************************/
pushFinishedJobs(lua_State * L)176 void AsyncEngine::pushFinishedJobs(lua_State* L) {
177 	// Result Table
178 	resultQueueMutex.Lock();
179 
180 	unsigned int index = 1;
181 	lua_createtable(L, resultQueue.size(), 0);
182 	int top = lua_gettop(L);
183 
184 	while (!resultQueue.empty()) {
185 		LuaJobInfo jobDone = resultQueue.front();
186 		resultQueue.pop_front();
187 
188 		lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
189 		int top_lvl2 = lua_gettop(L);
190 
191 		lua_pushstring(L, "jobid");
192 		lua_pushnumber(L, jobDone.id);
193 		lua_settable(L, top_lvl2);
194 
195 		lua_pushstring(L, "retval");
196 		lua_pushlstring(L, jobDone.serializedResult.data(),
197 			jobDone.serializedResult.size());
198 		lua_settable(L, top_lvl2);
199 
200 		lua_rawseti(L, top, index++);
201 	}
202 
203 	resultQueueMutex.Unlock();
204 }
205 
206 /******************************************************************************/
prepareEnvironment(lua_State * L,int top)207 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
208 {
209 	for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
210 			it != functionList.end(); it++) {
211 		lua_pushstring(L, it->first.c_str());
212 		lua_pushcfunction(L, it->second);
213 		lua_settable(L, top);
214 	}
215 }
216 
217 /******************************************************************************/
AsyncWorkerThread(AsyncEngine * jobDispatcher,unsigned int threadNum)218 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
219 		unsigned int threadNum) :
220 	ScriptApiBase(),
221 	jobDispatcher(jobDispatcher),
222 	threadnum(threadNum)
223 {
224 	lua_State *L = getStack();
225 
226 	// Prepare job lua environment
227 	lua_getglobal(L, "core");
228 	int top = lua_gettop(L);
229 
230 	// Push builtin initialization type
231 	lua_pushstring(L, "async");
232 	lua_setglobal(L, "INIT");
233 
234 	jobDispatcher->prepareEnvironment(L, top);
235 }
236 
237 /******************************************************************************/
~AsyncWorkerThread()238 AsyncWorkerThread::~AsyncWorkerThread()
239 {
240 	assert(IsRunning() == false);
241 }
242 
243 /******************************************************************************/
Thread()244 void* AsyncWorkerThread::Thread()
245 {
246 	ThreadStarted();
247 
248 	// Register thread for error logging
249 	char number[21];
250 	snprintf(number, sizeof(number), "%d", threadnum);
251 	log_register_thread(std::string("AsyncWorkerThread_") + number);
252 
253 	porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
254 
255 	lua_State *L = getStack();
256 
257 	std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
258 	if (!loadScript(script)) {
259 		errorstream
260 			<< "AsyncWorkderThread execution of async base environment failed!"
261 			<< std::endl;
262 		abort();
263 	}
264 
265 	lua_getglobal(L, "core");
266 	if (lua_isnil(L, -1)) {
267 		errorstream << "Unable to find core within async environment!";
268 		abort();
269 	}
270 
271 	// Main loop
272 	while (!StopRequested()) {
273 		// Wait for job
274 		LuaJobInfo toProcess = jobDispatcher->getJob();
275 
276 		if (toProcess.valid == false || StopRequested()) {
277 			continue;
278 		}
279 
280 		lua_getfield(L, -1, "job_processor");
281 		if (lua_isnil(L, -1)) {
282 			errorstream << "Unable to get async job processor!" << std::endl;
283 			abort();
284 		}
285 
286 		luaL_checktype(L, -1, LUA_TFUNCTION);
287 
288 		// Call it
289 		lua_pushlstring(L,
290 				toProcess.serializedFunction.data(),
291 				toProcess.serializedFunction.size());
292 		lua_pushlstring(L,
293 				toProcess.serializedParams.data(),
294 				toProcess.serializedParams.size());
295 
296 		if (lua_pcall(L, 2, 1, m_errorhandler)) {
297 			scriptError();
298 			toProcess.serializedResult = "";
299 		} else {
300 			// Fetch result
301 			size_t length;
302 			const char *retval = lua_tolstring(L, -1, &length);
303 			toProcess.serializedResult = std::string(retval, length);
304 		}
305 
306 		lua_pop(L, 1);  // Pop retval
307 
308 		// Put job result
309 		jobDispatcher->putJobResult(toProcess);
310 	}
311 
312 	lua_pop(L, 1);  // Pop core
313 
314 	log_deregister_thread();
315 
316 	return 0;
317 }
318 
319