1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28
29 #include "server.h"
30 #include "s_async.h"
31 #include "log.h"
32 #include "filesys.h"
33 #include "porting.h"
34 #include "common/c_internal.h"
35
36 /******************************************************************************/
AsyncEngine()37 AsyncEngine::AsyncEngine() :
38 initDone(false),
39 jobIdCounter(0)
40 {
41 }
42
43 /******************************************************************************/
~AsyncEngine()44 AsyncEngine::~AsyncEngine()
45 {
46
47 // Request all threads to stop
48 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 it != workerThreads.end(); it++) {
50 (*it)->Stop();
51 }
52
53
54 // Wake up all threads
55 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56 it != workerThreads.end(); it++) {
57 jobQueueCounter.Post();
58 }
59
60 // Wait for threads to finish
61 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62 it != workerThreads.end(); it++) {
63 (*it)->Wait();
64 }
65
66 // Force kill all threads
67 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68 it != workerThreads.end(); it++) {
69 (*it)->Kill();
70 delete *it;
71 }
72
73 jobQueueMutex.Lock();
74 jobQueue.clear();
75 jobQueueMutex.Unlock();
76 workerThreads.clear();
77 }
78
79 /******************************************************************************/
registerFunction(const char * name,lua_CFunction func)80 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
81 {
82 if (initDone) {
83 return false;
84 }
85 functionList[name] = func;
86 return true;
87 }
88
89 /******************************************************************************/
initialize(unsigned int numEngines)90 void AsyncEngine::initialize(unsigned int numEngines)
91 {
92 initDone = true;
93
94 for (unsigned int i = 0; i < numEngines; i++) {
95 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
96 workerThreads.push_back(toAdd);
97 toAdd->Start();
98 }
99 }
100
101 /******************************************************************************/
queueAsyncJob(std::string func,std::string params)102 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
103 {
104 jobQueueMutex.Lock();
105 LuaJobInfo toAdd;
106 toAdd.id = jobIdCounter++;
107 toAdd.serializedFunction = func;
108 toAdd.serializedParams = params;
109
110 jobQueue.push_back(toAdd);
111
112 jobQueueCounter.Post();
113
114 jobQueueMutex.Unlock();
115
116 return toAdd.id;
117 }
118
119 /******************************************************************************/
getJob()120 LuaJobInfo AsyncEngine::getJob()
121 {
122 jobQueueCounter.Wait();
123 jobQueueMutex.Lock();
124
125 LuaJobInfo retval;
126 retval.valid = false;
127
128 if (!jobQueue.empty()) {
129 retval = jobQueue.front();
130 jobQueue.pop_front();
131 retval.valid = true;
132 }
133 jobQueueMutex.Unlock();
134
135 return retval;
136 }
137
138 /******************************************************************************/
putJobResult(LuaJobInfo result)139 void AsyncEngine::putJobResult(LuaJobInfo result)
140 {
141 resultQueueMutex.Lock();
142 resultQueue.push_back(result);
143 resultQueueMutex.Unlock();
144 }
145
146 /******************************************************************************/
step(lua_State * L,int errorhandler)147 void AsyncEngine::step(lua_State *L, int errorhandler)
148 {
149 lua_getglobal(L, "core");
150 resultQueueMutex.Lock();
151 while (!resultQueue.empty()) {
152 LuaJobInfo jobDone = resultQueue.front();
153 resultQueue.pop_front();
154
155 lua_getfield(L, -1, "async_event_handler");
156
157 if (lua_isnil(L, -1)) {
158 assert("Async event handler does not exist!" == 0);
159 }
160
161 luaL_checktype(L, -1, LUA_TFUNCTION);
162
163 lua_pushinteger(L, jobDone.id);
164 lua_pushlstring(L, jobDone.serializedResult.data(),
165 jobDone.serializedResult.size());
166
167 if (lua_pcall(L, 2, 0, errorhandler)) {
168 script_error(L);
169 }
170 }
171 resultQueueMutex.Unlock();
172 lua_pop(L, 1); // Pop core
173 }
174
175 /******************************************************************************/
pushFinishedJobs(lua_State * L)176 void AsyncEngine::pushFinishedJobs(lua_State* L) {
177 // Result Table
178 resultQueueMutex.Lock();
179
180 unsigned int index = 1;
181 lua_createtable(L, resultQueue.size(), 0);
182 int top = lua_gettop(L);
183
184 while (!resultQueue.empty()) {
185 LuaJobInfo jobDone = resultQueue.front();
186 resultQueue.pop_front();
187
188 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
189 int top_lvl2 = lua_gettop(L);
190
191 lua_pushstring(L, "jobid");
192 lua_pushnumber(L, jobDone.id);
193 lua_settable(L, top_lvl2);
194
195 lua_pushstring(L, "retval");
196 lua_pushlstring(L, jobDone.serializedResult.data(),
197 jobDone.serializedResult.size());
198 lua_settable(L, top_lvl2);
199
200 lua_rawseti(L, top, index++);
201 }
202
203 resultQueueMutex.Unlock();
204 }
205
206 /******************************************************************************/
prepareEnvironment(lua_State * L,int top)207 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
208 {
209 for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
210 it != functionList.end(); it++) {
211 lua_pushstring(L, it->first.c_str());
212 lua_pushcfunction(L, it->second);
213 lua_settable(L, top);
214 }
215 }
216
217 /******************************************************************************/
AsyncWorkerThread(AsyncEngine * jobDispatcher,unsigned int threadNum)218 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
219 unsigned int threadNum) :
220 ScriptApiBase(),
221 jobDispatcher(jobDispatcher),
222 threadnum(threadNum)
223 {
224 lua_State *L = getStack();
225
226 // Prepare job lua environment
227 lua_getglobal(L, "core");
228 int top = lua_gettop(L);
229
230 // Push builtin initialization type
231 lua_pushstring(L, "async");
232 lua_setglobal(L, "INIT");
233
234 jobDispatcher->prepareEnvironment(L, top);
235 }
236
237 /******************************************************************************/
~AsyncWorkerThread()238 AsyncWorkerThread::~AsyncWorkerThread()
239 {
240 assert(IsRunning() == false);
241 }
242
243 /******************************************************************************/
Thread()244 void* AsyncWorkerThread::Thread()
245 {
246 ThreadStarted();
247
248 // Register thread for error logging
249 char number[21];
250 snprintf(number, sizeof(number), "%d", threadnum);
251 log_register_thread(std::string("AsyncWorkerThread_") + number);
252
253 porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
254
255 lua_State *L = getStack();
256
257 std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
258 if (!loadScript(script)) {
259 errorstream
260 << "AsyncWorkderThread execution of async base environment failed!"
261 << std::endl;
262 abort();
263 }
264
265 lua_getglobal(L, "core");
266 if (lua_isnil(L, -1)) {
267 errorstream << "Unable to find core within async environment!";
268 abort();
269 }
270
271 // Main loop
272 while (!StopRequested()) {
273 // Wait for job
274 LuaJobInfo toProcess = jobDispatcher->getJob();
275
276 if (toProcess.valid == false || StopRequested()) {
277 continue;
278 }
279
280 lua_getfield(L, -1, "job_processor");
281 if (lua_isnil(L, -1)) {
282 errorstream << "Unable to get async job processor!" << std::endl;
283 abort();
284 }
285
286 luaL_checktype(L, -1, LUA_TFUNCTION);
287
288 // Call it
289 lua_pushlstring(L,
290 toProcess.serializedFunction.data(),
291 toProcess.serializedFunction.size());
292 lua_pushlstring(L,
293 toProcess.serializedParams.data(),
294 toProcess.serializedParams.size());
295
296 if (lua_pcall(L, 2, 1, m_errorhandler)) {
297 scriptError();
298 toProcess.serializedResult = "";
299 } else {
300 // Fetch result
301 size_t length;
302 const char *retval = lua_tolstring(L, -1, &length);
303 toProcess.serializedResult = std::string(retval, length);
304 }
305
306 lua_pop(L, 1); // Pop retval
307
308 // Put job result
309 jobDispatcher->putJobResult(toProcess);
310 }
311
312 lua_pop(L, 1); // Pop core
313
314 log_deregister_thread();
315
316 return 0;
317 }
318
319