1
2
3 #ifdef _WIN32
4 #define _WIN32_WINNT 0x0500 // per CreateJobObject e affini
5 #endif
6
7 #include "tsystem.h"
8 #include "mainwindow.h"
9 #include "batches.h"
10 #include "tthreadmessage.h"
11 #include "tfarmexecutor.h"
12 #include "tstream.h"
13 #include "tfarmstuff.h"
14 #include "tenv.h"
15 #include "toutputproperties.h"
16 #include "trasterfx.h"
17 #include "tasksviewer.h"
18 #include "tapp.h"
19 #include "filebrowserpopup.h"
20 #include "tmsgcore.h"
21 #include "toonz/toonzscene.h"
22 #include "toonz/sceneproperties.h"
23 #include "toonz/preferences.h"
24
25 #include "toonzqt/gutil.h"
26
27 #include <QString>
28 #include <QProcess>
29 #include <QHostInfo>
30
31 namespace {
32
33 class NotifyMessage final : public TThread::Message {
34 public:
NotifyMessage()35 NotifyMessage() {}
36
onDeliver()37 void onDeliver() override { BatchesController::instance()->update(); }
38
clone() const39 TThread::Message *clone() const override { return new NotifyMessage(*this); }
40 };
41
42 } // namespace
43
44 //------------------------------------------------------------------------------------------------------------------
45 //------------------------------------------------------------------------------------------------------------------
46 //----------------------------------------------------------------------------------------------------------------
47 //----------------------------------------------------------------------------------------------------------------
48
LoadTaskListPopup()49 LoadTaskListPopup::LoadTaskListPopup()
50 : FileBrowserPopup(tr("Load Task List")) {
51 addFilterType("tnzbat");
52 setOkText(tr("Load"));
53 }
54
execute()55 bool LoadTaskListPopup::execute() {
56 if (m_selectedPaths.empty()) return false;
57
58 const TFilePath &fp = *m_selectedPaths.begin();
59
60 if (!TSystem::doesExistFileOrLevel(fp)) {
61 DVGui::error(toQString(fp) + tr(" does not exist."));
62 return false;
63 } else if (fp.getType() != "tnzbat") {
64 DVGui::error(toQString(fp) +
65 tr("It is possible to load only TNZBAT files."));
66 return false;
67 }
68
69 BatchesController::instance()->doLoad(fp);
70 return true;
71 }
72
73 //----------------------------------------------------------------------------------------------------------------
74
LoadTaskPopup()75 LoadTaskPopup::LoadTaskPopup() : FileBrowserPopup(""), m_isRenderTask(true) {
76 addFilterType("tnz");
77 setOkText(tr("Add"));
78 }
79
open(bool isRenderTask)80 void LoadTaskPopup::open(bool isRenderTask) {
81 m_isRenderTask = isRenderTask;
82 setWindowTitle(isRenderTask ? tr("Add Render Task to Batch List")
83 : tr("Add Cleanup Task to Batch List"));
84
85 if (isRenderTask)
86 removeFilterType("cln");
87 else
88 addFilterType("cln");
89
90 exec();
91 }
92
execute()93 bool LoadTaskPopup::execute() {
94 if (m_selectedPaths.empty()) return false;
95
96 const TFilePath &fp = *m_selectedPaths.begin();
97
98 if (!TSystem::doesExistFileOrLevel(fp)) {
99 DVGui::error(toQString(fp) + tr(" does not exist."));
100 return false;
101 } else if (m_isRenderTask && fp.getType() != "tnz") {
102 DVGui::error(toQString(fp) +
103 tr(" you can load only TNZ files for render task."));
104 return false;
105 } else if (!m_isRenderTask && fp.getType() != "tnz" &&
106 fp.getType() != "cln") {
107 DVGui::error(toQString(fp) +
108 tr(" you can load only TNZ or CLN files for cleanup task."));
109 return false;
110 }
111
112 if (m_isRenderTask)
113 BatchesController::instance()->addComposerTask(fp);
114 else
115 BatchesController::instance()->addCleanupTask(fp);
116
117 return true;
118 }
119
120 //=============================================================================
121
SaveTaskListPopup()122 SaveTaskListPopup::SaveTaskListPopup()
123 : FileBrowserPopup(tr("Save Task List")) {
124 setOkText(tr("Save"));
125 }
126
execute()127 bool SaveTaskListPopup::execute() {
128 if (m_selectedPaths.empty()) return false;
129
130 const TFilePath &fp = *m_selectedPaths.begin();
131
132 BatchesController::instance()->doSave(fp.withType("tnzbat"));
133 return true;
134 }
135
136 //------------------------------------------------------------------------------
137 QMutex TasksMutex;
138 std::map<QString, QProcess *> RunningTasks;
139
140 class TaskRunner final : public TThread::Runnable {
141 public:
TaskRunner(TFarmTask * task,int localControllerPort)142 TaskRunner(TFarmTask *task, int localControllerPort)
143 : m_task(task), m_localControllerPort(localControllerPort) {}
144
145 void run() override;
146
147 int taskLoad() override;
148
149 void doRun(TFarmTask *task);
150
runCleanup()151 void runCleanup() { BatchesController::instance()->notify(); }
152
153 TFarmTask *m_task;
154 int m_localControllerPort;
155 };
156
run()157 void TaskRunner::run() { doRun(m_task); }
158
taskLoad()159 int TaskRunner::taskLoad() {
160 // We assume that CPU control is under the user's responsibility about
161 // tcomposer tasks - so they don't figure in CPU consumption calculations.
162 return 0;
163 }
164
165 /*QString getFarmAddress()
166 {
167 if (BatchesController::instance()->getController())
168 {
169 QString dummystr, address;
170 int dummyint;
171 TFarmStuff::getControllerData(dummystr, address, dummyint);
172
173 return address;
174 }
175 else return "127.0.0.1"; //localHost
176 }*/
177
doRun(TFarmTask * task)178 void TaskRunner::doRun(TFarmTask *task) {
179 // Suspended tasks are not executed. (Should only waiting ones be executed?)
180 if (task->m_status == Suspended) return;
181
182 if (task->m_dependencies && task->m_dependencies->getTaskCount() != 0) {
183 task->m_status = Waiting;
184 int depCount = task->m_dependencies->getTaskCount();
185 for (int i = 0; i < depCount; ++i) {
186 QString depTaskId = task->m_dependencies->getTaskId(i);
187 TFarmTask *depTask = BatchesController::instance()->getTask(depTaskId);
188 if (depTask) {
189 doRun(depTask);
190 }
191 }
192 }
193
194 task->m_successfullSteps = task->m_failedSteps = 0;
195 task->m_completionDate = QDateTime();
196
197 task->m_status = Running;
198 task->m_server = TSystem::getHostName();
199 task->m_startDate = QDateTime::currentDateTime();
200
201 NotifyMessage().send();
202
203 int count = task->getTaskCount();
204 if (count > 1) {
205 // Launch each subtask
206 for (int i = 0; i < task->getTaskCount(); i++) {
207 if (task->m_status == Suspended) break;
208
209 doRun(task->getTask(i));
210 }
211
212 // Check their status
213 task->m_status = Completed;
214 task->m_successfullSteps = task->m_to - task->m_from + 1;
215 task->m_completionDate = QDateTime::currentDateTime();
216 for (int i = 0; i < task->getTaskCount(); i++) {
217 if (task->getTask(i)->m_status == Aborted) task->m_status = Aborted;
218 }
219
220 NotifyMessage().send();
221
222 return;
223 }
224
225 // int exitCode;
226
227 /*QString commandline = task->getCommandLine();
228
229 commandline += " -farm ";
230 commandline += QString::number(m_localControllerPort) + "@localhost";
231 commandline += " -id " + task->m_id;*/
232
233 QProcess *process = new QProcess();
234 std::map<QString, QProcess *>::iterator it;
235
236 {
237 QMutexLocker sl(&TasksMutex);
238 if ((it = RunningTasks.find(task->m_id)) != RunningTasks.end()) {
239 it->second->kill();
240 RunningTasks.erase(task->m_id);
241 }
242
243 RunningTasks[task->m_id] = process;
244 }
245
246 process->start(task->getCommandLine());
247 process->waitForFinished(-1);
248
249 {
250 QMutexLocker sl(&TasksMutex);
251 RunningTasks.erase(task->m_id);
252 }
253 // exitCode = QProcess::execute(task->getCommandLine());
254
255 int error = process->error();
256 bool statusOK = (error == QProcess::UnknownError) &&
257 (process->exitCode() == 0 ||
258 task->m_successfullSteps == task->m_stepCount);
259
260 if (statusOK) {
261 task->m_status = Completed;
262 task->m_successfullSteps = task->m_to - task->m_from + 1;
263 } else {
264 task->m_status = Aborted;
265 task->m_failedSteps = task->m_to - task->m_from + 1;
266 }
267
268 task->m_completionDate = QDateTime::currentDateTime();
269
270 NotifyMessage().send();
271 }
272
273 //------------------------------------------------------------------------------
274 //------------------------------------------------------------------------------
275
BatchesController()276 BatchesController::BatchesController()
277 : m_controller(0), m_tasksTree(0), m_filepath(), m_dirtyFlag(false) {}
278
279 //------------------------------------------------------------------------------
280
~BatchesController()281 BatchesController::~BatchesController() {}
282
283 //------------------------------------------------------------------------------
284
setTasksTree(TaskTreeModel * tree)285 void BatchesController::setTasksTree(TaskTreeModel *tree) {
286 m_tasksTree = tree;
287 }
288
289 //------------------------------------------------------------------------------
290
isMovieType(std::string type)291 inline bool isMovieType(std::string type) {
292 return (type == "mov" || type == "avi" || type == "3gp");
293 }
294
295 //------------------------------------------------------------------------------
296
297 static int TaskCounter = 1;
298
299 //------------------------------------------------------------------------------
300
addCleanupTask(const TFilePath & taskFilePath)301 void BatchesController::addCleanupTask(const TFilePath &taskFilePath) {
302 setDirtyFlag(true);
303 QString id = QString::number(TaskCounter++);
304 while (BatchesController::instance()->getTask(id))
305 id = QString::number(TaskCounter++);
306
307 TFarmTaskGroup *taskGroup = new TFarmTaskGroup(
308 id, // id
309 QString::fromStdString(taskFilePath.getName()), // name
310 TSystem::getUserName(), // user
311 TSystem::getHostName(), // host
312 1, // stepCount
313 50, // priority
314 taskFilePath, // taskFilePath
315 Overwrite_Off, // Overwrite
316 false); // onlyvisible
317
318 try {
319 BatchesController::instance()->addTask(id, taskGroup);
320 } catch (TException &) {
321 }
322 }
323
324 //------------------------------------------------------------------------------
325
addComposerTask(const TFilePath & _taskFilePath)326 void BatchesController::addComposerTask(const TFilePath &_taskFilePath) {
327 setDirtyFlag(true);
328 TFilePath taskFilePath;
329 try {
330 taskFilePath = TSystem::toUNC(_taskFilePath);
331 } catch (TException &) {
332 }
333
334 ToonzScene scene;
335 try {
336 scene.loadNoResources(taskFilePath);
337 } catch (...) {
338 return;
339 }
340
341 TSceneProperties *sprop = scene.getProperties();
342 const TOutputProperties &out = *sprop->getOutputProperties();
343 const TRenderSettings &rs = out.getRenderSettings();
344
345 QString name = QString::fromStdString(taskFilePath.getName());
346 int r0, r1, step;
347 out.getRange(r0, r1, step);
348
349 int sceneFrameCount = scene.getFrameCount();
350 if (r0 < 0) r0 = 0;
351 if (r1 >= sceneFrameCount)
352 r1 = sceneFrameCount - 1;
353 else if (r1 < r0)
354 r1 = sceneFrameCount - 1;
355
356 r0++, r1++;
357 int shrink = rs.m_shrinkX;
358
359 int multimedia = out.getMultimediaRendering();
360 int threadsIndex = out.getThreadIndex();
361 int maxTileSizeIndex = out.getMaxTileSizeIndex();
362
363 TFilePath outputPath =
364 scene.decodeFilePath(sprop->getOutputProperties()->getPath());
365 if (outputPath.getWideName() == L"")
366 outputPath = outputPath.withName(scene.getScenePath().getName());
367 int taskChunkSize;
368 if (isMovieType(outputPath.getType()))
369 taskChunkSize = sceneFrameCount;
370 else {
371 int size = Preferences::instance()->getDefaultTaskChunkSize();
372 if (r1 - r0 + 1 < size)
373 taskChunkSize = r1 - r0 + 1;
374 else
375 taskChunkSize = size;
376 }
377
378 QString id = QString::number(TaskCounter++);
379 while (BatchesController::instance()->getTask(id))
380 id = QString::number(TaskCounter++);
381
382 TFarmTaskGroup *taskGroup = new TFarmTaskGroup(
383 id, name, TSystem::getUserName(), TSystem::getHostName(), sceneFrameCount,
384 50, taskFilePath, outputPath, r0, r1, step, shrink, multimedia,
385 taskChunkSize, threadsIndex, maxTileSizeIndex);
386
387 try {
388 BatchesController::instance()->addTask(id, taskGroup);
389 } catch (TException &) {
390 // TMessage::error(toString(e.getMessage()));
391 }
392 // m_data->m_scene.setProject( mainprogramProj);
393 // TModalPopup::closePopup();
394 }
395
396 //-------------------------------------------------------------------------------------------------------
397
398 /*--- id はタスクを追加するごとにインクリメントされる数字 ---*/
addTask(const QString & id,TFarmTask * task,bool doUpdate)399 void BatchesController::addTask(const QString &id, TFarmTask *task,
400 bool doUpdate) {
401 #ifdef _DEBUG
402 std::map<QString, TFarmTask *>::iterator it = m_tasks.find(id);
403 // assert(it == m_tasks.end());
404 #endif
405 if (task->m_id.isEmpty()) task->m_id = id;
406
407 task->m_status = Suspended;
408 task->m_submissionDate = QDateTime::currentDateTime();
409
410 m_tasks.insert(std::make_pair(id, task));
411
412 int count = task->getTaskCount();
413 if (count == 1 && task->getTask(0) == task) return;
414
415 for (int i = 0; i < count; ++i) {
416 TFarmTask *subtask = task->getTask(i);
417 assert(subtask);
418 addTask(subtask->m_id, subtask);
419 }
420 if (doUpdate) update();
421 }
422
423 //------------------------------------------------------------------------------
424
taskBusyStr()425 QString BatchesController::taskBusyStr() {
426 return tr(
427 "The %1 task is currently active.\nStop it or wait for its completion "
428 "before removing it.");
429 }
430
431 //------------------------------------------------------------------------------
432
removeTask(const QString & id)433 void BatchesController::removeTask(const QString &id) {
434 std::map<QString, TFarmTask *>::iterator it = m_tasks.find(id);
435 if (it == m_tasks.end()) return;
436
437 TFarmTask *task = it->second;
438 if (task->m_status == Running || task->m_status == Waiting) {
439 DVGui::warning(taskBusyStr().arg(task->m_name));
440 return;
441 }
442
443 setDirtyFlag(true);
444
445 if (!task->m_parentId.isEmpty()) {
446 it = m_tasks.find(task->m_parentId);
447 TFarmTaskGroup *parent = dynamic_cast<TFarmTaskGroup *>(it->second);
448 assert(parent);
449 parent->removeTask(task);
450 }
451
452 TFarmTaskGroup *taskGroup = dynamic_cast<TFarmTaskGroup *>(task);
453 if (taskGroup) {
454 if (m_controller) {
455 // si sta usando la farm
456 std::map<QString, QString>::iterator it = m_farmIdsTable.find(id);
457 if (it != m_farmIdsTable.end()) {
458 m_controller->suspendTask(it->second);
459 }
460 }
461
462 // rimuove i subtask
463 int taskCount = taskGroup->getTaskCount();
464 for (int i = 0; i < taskCount; ++i) {
465 TFarmTask *subTask = taskGroup->getTask(i);
466 assert(subTask);
467 m_tasks.erase(subTask->m_id);
468 }
469 }
470
471 m_tasks.erase(id);
472 delete task;
473 }
474
475 //------------------------------------------------------------------------------
476
477 namespace {
DeleteTask(const std::pair<QString,TFarmTask * > & mapItem)478 void DeleteTask(const std::pair<QString, TFarmTask *> &mapItem) {
479 if (mapItem.second->m_parentId.isEmpty()) delete mapItem.second;
480 }
481 }
482
removeAllTasks()483 void BatchesController::removeAllTasks() {
484 std::map<QString, TFarmTask *>::iterator tt, tEnd(m_tasks.end());
485 for (tt = m_tasks.begin(); tt != tEnd; ++tt) {
486 TFarmTask *task = tt->second;
487 if (task->m_status == Running || task->m_status == Waiting) {
488 DVGui::warning(taskBusyStr().arg(task->m_name));
489 return;
490 }
491 }
492
493 m_filepath = TFilePath();
494 setDirtyFlag(false);
495
496 m_tasks.clear();
497 notify();
498 }
499
500 //------------------------------------------------------------------------------
501
getTaskCount() const502 int BatchesController::getTaskCount() const { return m_tasks.size(); }
503
504 //------------------------------------------------------------------------------
505
getTaskId(int index) const506 QString BatchesController::getTaskId(int index) const {
507 assert(index >= 0 && index < (int)m_tasks.size());
508 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
509 std::advance(it, index);
510 return it->first;
511 }
512
513 //------------------------------------------------------------------------------
514
getTask(int index) const515 TFarmTask *BatchesController::getTask(int index) const {
516 assert(index >= 0 && index < (int)m_tasks.size());
517 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
518 std::advance(it, index);
519 return it->second;
520 }
521
522 //------------------------------------------------------------------------------
523
getTask(const QString & id) const524 TFarmTask *BatchesController::getTask(const QString &id) const {
525 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(id);
526 if (it != m_tasks.end()) return it->second;
527 return 0;
528 }
529
530 //------------------------------------------------------------------------------
531
getTaskInfo(const QString & id,QString & parent,QString & name,TaskState & status)532 bool BatchesController::getTaskInfo(const QString &id, QString &parent,
533 QString &name, TaskState &status) {
534 TFarmTask *task = getTask(id);
535 if (task) {
536 parent = task->m_parentId;
537 name = task->m_name;
538 status = task->m_status;
539 }
540
541 return task != 0;
542 }
543
544 //------------------------------------------------------------------------------
545
getTaskStatus(const QString & id) const546 TaskState BatchesController::getTaskStatus(const QString &id) const {
547 TFarmTask *task = getTask(id);
548 if (task)
549 return task->m_status;
550 else
551 return TaskUnknown;
552 }
553
setDirtyFlag(bool state)554 void BatchesController::setDirtyFlag(bool state) {
555 static bool FirstTime = true;
556
557 if (FirstTime) {
558 FirstTime = false;
559 bool ret = connect(TApp::instance()->getMainWindow(), SIGNAL(exit(bool &)),
560 SLOT(onExit(bool &)));
561 assert(ret);
562 }
563
564 if (state == m_dirtyFlag) return;
565
566 m_dirtyFlag = state;
567 update();
568 }
569
570 //------------------------------------------------------------------------------
571
getTasks(const QString & parentId,std::vector<QString> & tasks) const572 void BatchesController::getTasks(const QString &parentId,
573 std::vector<QString> &tasks) const {
574 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
575 for (; it != m_tasks.end(); ++it) {
576 TFarmTask *task = it->second;
577 assert(task);
578 if (task->m_parentId == parentId) tasks.push_back(task->m_id);
579 }
580 }
581
582 //------------------------------------------------------------------------------
583
startAll()584 void BatchesController::startAll() {
585 // viene usata la farm <==> m_controller != 0
586 if (!m_controller) {
587 // uso una multimap per ordinare rispetto alla priority
588 std::multimap<int, TFarmTask *> tasksByPriority;
589
590 std::map<QString, TFarmTask *>::reverse_iterator it = m_tasks.rbegin();
591 // uso il reverse iterator anche qui altrimenti se ho task con priorita'
592 // uguale li esegue dall'ultimo al primo
593 for (; it != m_tasks.rend(); ++it) {
594 TFarmTask *task = it->second;
595 tasksByPriority.insert(std::make_pair(task->m_priority, task));
596 }
597
598 std::multimap<int, TFarmTask *>::reverse_iterator it2 =
599 tasksByPriority.rbegin();
600 for (; it2 != tasksByPriority.rend(); ++it2) {
601 TFarmTask *task = it2->second;
602 assert(task);
603 task->m_status = Waiting;
604 if (task->m_parentId.isEmpty())
605 m_localExecutor.addTask(
606 new TaskRunner(task, m_localControllerPortNumber));
607 }
608 } else {
609 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
610 for (; it != m_tasks.end(); ++it) {
611 TFarmTask *task = it->second;
612 assert(task);
613 if (task->m_parentId.isEmpty()) start(task->m_id);
614 }
615 }
616
617 notify();
618 }
619
620 //------------------------------------------------------------------------------
621
start(const QString & taskId)622 void BatchesController::start(const QString &taskId) {
623 QString computerName = QHostInfo().localHostName();
624
625 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(taskId);
626 assert(it != m_tasks.end());
627 TFarmTask *task = it->second;
628 assert(task);
629 task->m_status = Waiting;
630 task->m_failedSteps = task->m_successfullSteps = 0;
631 task->m_completionDate = task->m_startDate = QDateTime();
632 task->m_callerMachineName = computerName;
633 int count = task->getTaskCount();
634 if (count > 1)
635 for (int i = 0; i < count; ++i) {
636 TFarmTask *subtask = task->getTask(i);
637 assert(subtask);
638 subtask->m_status = Waiting;
639 subtask->m_failedSteps = subtask->m_successfullSteps = 0;
640 subtask->m_completionDate = subtask->m_startDate = QDateTime();
641 subtask->m_callerMachineName = computerName;
642 }
643
644 // viene usata la farm <==> m_controller != 0
645 if (!m_controller) {
646 m_localExecutor.addTask(new TaskRunner(task, m_localControllerPortNumber));
647 } else {
648 std::map<QString, QString>::iterator ktor = m_farmIdsTable.find(taskId);
649 if (ktor != m_farmIdsTable.end()) {
650 QString farmTaskId = ktor->second;
651 m_controller->restartTask(farmTaskId);
652
653 if (dynamic_cast<TFarmTaskGroup *>(task)) {
654 std::vector<QString> subtasks;
655 m_controller->getTasks(farmTaskId, subtasks);
656
657 if (!subtasks.empty()) {
658 assert((int)subtasks.size() == task->getTaskCount());
659
660 std::vector<QString>::iterator jtor = subtasks.begin();
661 for (int i = 0; i < task->getTaskCount(); ++i, ++jtor)
662 m_farmIdsTable.insert(
663 std::make_pair(task->getTask(i)->m_id, *jtor));
664 }
665 }
666 } else {
667 bool suspended = false;
668 QString id = m_controller->addTask(*task, suspended);
669 m_farmIdsTable.insert(std::make_pair(task->m_id, id));
670
671 if (dynamic_cast<TFarmTaskGroup *>(task)) {
672 std::vector<QString> subtasks;
673 m_controller->getTasks(id, subtasks);
674 assert((int)subtasks.size() == task->getTaskCount());
675
676 std::vector<QString>::iterator jtor = subtasks.begin();
677 for (int i = 0; i < task->getTaskCount(); ++i, ++jtor)
678 m_farmIdsTable.insert(std::make_pair(task->getTask(i)->m_id, *jtor));
679 }
680 }
681 }
682 notify();
683 }
684
685 //------------------------------------------------------------------------------
686
stop(const QString & taskId)687 void BatchesController::stop(const QString &taskId) {
688 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(taskId);
689 if (it == m_tasks.end()) return;
690
691 TFarmTask *task = it->second;
692 assert(task);
693
694 // Update the task status
695 if (task->m_status == Waiting) task->m_status = Suspended;
696
697 // Local farm <==> m_controller != 0
698 if (!m_controller) {
699 QMutexLocker sl(&TasksMutex);
700 std::map<QString, QProcess *>::iterator it;
701
702 // Kill the associated process if any
703 if ((it = RunningTasks.find(task->m_id)) != RunningTasks.end()) {
704 it->second->kill();
705 RunningTasks.erase(task->m_id);
706 }
707
708 // Do the same for child tasks
709 int count = task->getTaskCount();
710 if (count > 1) {
711 for (int i = 0; i < count; ++i) {
712 TFarmTask *subtask = task->getTask(i);
713 if (subtask->m_status == Waiting) subtask->m_status = Suspended;
714 if ((it = RunningTasks.find(subtask->m_id)) != RunningTasks.end()) {
715 it->second->kill();
716 RunningTasks.erase(subtask->m_id);
717 }
718 }
719 }
720 } else {
721 std::map<QString, QString>::iterator it = m_farmIdsTable.find(task->m_id);
722 if (it != m_farmIdsTable.end()) {
723 QString farmId = it->second;
724 m_controller->suspendTask(farmId);
725 }
726 }
727 notify();
728 }
729
730 //------------------------------------------------------------------------------
731
stopAll()732 void BatchesController::stopAll() {
733 if (!m_controller) {
734 // Stop all running QProcesses
735 QMutexLocker sl(&TasksMutex);
736
737 std::map<QString, QProcess *>::iterator it;
738 for (it = RunningTasks.begin(); it != RunningTasks.end(); ++it)
739 it->second->kill();
740
741 RunningTasks.clear();
742 m_localExecutor.cancelAll();
743 } else {
744 // Suspend all farmIds
745 std::map<QString, QString>::iterator it;
746 for (it = m_farmIdsTable.begin(); it != m_farmIdsTable.end(); ++it) {
747 QString farmId = it->second;
748 m_controller->suspendTask(farmId);
749 }
750 }
751
752 // Update all waiting task status
753 std::map<QString, TFarmTask *>::iterator jt;
754 for (jt = m_tasks.begin(); jt != m_tasks.end(); ++jt) {
755 if (jt->second->m_status == Waiting) jt->second->m_status = Suspended;
756 }
757
758 notify();
759 }
760
761 //------------------------------------------------------------------------------
762
onExit(bool & ret)763 void BatchesController::onExit(bool &ret) {
764 int answer = 0;
765
766 if (m_dirtyFlag)
767 answer =
768 DVGui::MsgBox(QString(tr("The current task list has been modified.\nDo "
769 "you want to save your changes?")),
770 tr("Save"), tr("Discard"), tr("Cancel"), 1);
771
772 ret = true;
773 if (answer == 3)
774 ret = false;
775 else if (answer == 1)
776 save();
777 }
778
779 //------------------------------------------------------------------------------
780
load()781 void BatchesController::load() {
782 static LoadTaskListPopup *popup = 0;
783
784 if (!popup) popup = new LoadTaskListPopup();
785 popup->exec();
786 }
787
788 //------------------------------------------------------------------------------
loadTask(bool isRenderTask)789 void BatchesController::loadTask(bool isRenderTask) {
790 static LoadTaskPopup *popup = new LoadTaskPopup();
791 popup->open(isRenderTask);
792 }
793
794 //------------------------------------------------------------------------------
795
doLoad(const TFilePath & fp)796 void BatchesController::doLoad(const TFilePath &fp) {
797 if (m_dirtyFlag) {
798 int ret =
799 DVGui::MsgBox(QString(tr("The current task list has been modified.\nDo "
800 "you want to save your changes?")),
801 tr("Save"), tr("Discard"), tr("Cancel"));
802 if (ret == 1)
803 save();
804 else if (ret == 3 || ret == 0)
805 return;
806 }
807
808 setDirtyFlag(false);
809
810 m_tasks.clear();
811 m_farmIdsTable.clear();
812
813 try {
814 TIStream is(fp);
815 if (is) {
816 } else
817 throw TException(fp.getWideString() + L": Can't open file");
818
819 m_filepath = fp;
820
821 std::string tagName = "";
822 if (!is.matchTag(tagName)) throw TException("Bad file format");
823
824 if (tagName == "tnzbatches") {
825 std::string rootTagName = tagName;
826 std::string v = is.getTagAttribute("version");
827 while (is.matchTag(tagName)) {
828 if (tagName == "generator") {
829 std::string program = is.getString();
830 } else if (tagName == "batch") {
831 while (!is.eos()) {
832 TPersist *p = 0;
833 is >> p;
834 TFarmTask *task = dynamic_cast<TFarmTask *>(p);
835 if (task) addTask(task->m_id, task, false);
836 }
837 } else {
838 throw TException(tagName + " : unexpected tag");
839 }
840
841 if (!is.matchEndTag()) throw TException(tagName + " : missing end tag");
842 }
843 if (!is.matchEndTag())
844 throw TException(rootTagName + " : missing end tag");
845 } else {
846 throw TException("Bad file format");
847 }
848 } catch (TException &e) {
849 throw e;
850 } catch (...) {
851 throw TException("Loading error.");
852 }
853 update();
854 notify();
855 }
856
857 //------------------------------------------------------------------------------
858
getListName() const859 QString BatchesController::getListName() const {
860 QString str = (m_filepath == TFilePath()
861 ? tr("Tasks")
862 : QString::fromStdString(m_filepath.getName()));
863 return str + (m_dirtyFlag ? "*" : "");
864 }
865
866 //------------------------------------------------------------------------------
867
saveas()868 void BatchesController::saveas() {
869 if (getTaskCount() == 0) {
870 DVGui::warning(tr("The Task List is empty!"));
871 return;
872 }
873
874 static SaveTaskListPopup *popup = 0;
875 if (!popup) popup = new SaveTaskListPopup();
876
877 popup->exec();
878 }
879
save()880 void BatchesController::save() {
881 if (m_filepath.isEmpty())
882 saveas();
883 else
884 doSave();
885 }
886
doSave(const TFilePath & _fp)887 void BatchesController::doSave(const TFilePath &_fp) {
888 setDirtyFlag(false);
889
890 TFilePath fp;
891 if (_fp == TFilePath())
892 fp = m_filepath;
893 else
894 fp = m_filepath = _fp;
895
896 TOStream os(fp);
897
898 std::map<std::string, std::string> attr;
899 attr["version"] = "1.0";
900
901 os.openChild("tnzbatches", attr);
902 os.child("generator") << TEnv::getApplicationFullName();
903
904 os.openChild("batch");
905 std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
906 for (; it != m_tasks.end(); ++it) {
907 TFarmTask *task = it->second;
908 assert(task);
909 os << task;
910 }
911 os.closeChild(); // "batch"
912 os.closeChild(); // "tnzbatches"
913
914 update();
915 }
916
917 //------------------------------------------------------------------------------
918
attach(BatchesController::Observer * obs)919 void BatchesController::attach(BatchesController::Observer *obs) {
920 m_observers.insert(obs);
921 }
922
923 //------------------------------------------------------------------------------
924
detach(BatchesController::Observer * obs)925 void BatchesController::detach(BatchesController::Observer *obs) {
926 m_observers.erase(obs);
927 }
928
929 //------------------------------------------------------------------------------
930
931 namespace {
932
notifyObserver(BatchesController::Observer * obs)933 void notifyObserver(BatchesController::Observer *obs) { obs->update(); }
934 }
935
notify()936 void BatchesController::notify() {
937 std::for_each(m_observers.begin(), m_observers.end(), notifyObserver);
938 }
939
940 //------------------------------------------------------------------------------
941
setController(TFarmController * controller)942 void BatchesController::setController(TFarmController *controller) {
943 m_controller = controller;
944 }
945
946 //------------------------------------------------------------------------------
947
getController() const948 TFarmController *BatchesController::getController() const {
949 return m_controller;
950 }
951
952 //------------------------------------------------------------------------------
953
954 namespace {
955
956 class ControllerFailureMsg final : public TThread::Message {
957 public:
ControllerFailureMsg(const TException & e)958 ControllerFailureMsg(const TException &e) : m_e(e) {}
959
onDeliver()960 void onDeliver() override {
961 // throw m_e;
962 }
963
clone() const964 TThread::Message *clone() const override {
965 return new ControllerFailureMsg(m_e);
966 }
967
968 TException m_e;
969 };
970
971 } // namespace
972
973 //------------------------------------------------------------------------------
974
update()975 void BatchesController::update() {
976 if (m_controller) {
977 try {
978 std::map<QString, QString>::iterator it = m_farmIdsTable.begin();
979 for (; it != m_farmIdsTable.end(); ++it) {
980 QString farmTaskId = it->second;
981 QString batchesTaskId = it->first;
982
983 TFarmTask *batchesTask = getTask(batchesTaskId);
984 TFarmTask farmTask = *batchesTask;
985
986 if (batchesTask) {
987 QString batchesTaskParentId = batchesTask->m_parentId;
988 m_controller->queryTaskInfo(farmTaskId, farmTask);
989 int chunkSize = batchesTask->m_chunkSize;
990 *batchesTask = farmTask;
991 batchesTask->m_chunkSize = chunkSize;
992 batchesTask->m_id = batchesTaskId;
993 batchesTask->m_parentId = batchesTaskParentId;
994 }
995 }
996 } catch (TException &e) {
997 ControllerFailureMsg(e).send();
998 }
999 }
1000
1001 std::map<QString, TFarmTask *>::iterator jtor = m_tasks.begin();
1002 for (; jtor != m_tasks.end(); ++jtor) {
1003 TFarmTask *task = jtor->second;
1004 assert(task);
1005 if (task->m_status == Completed) m_farmIdsTable.erase(task->m_id);
1006 }
1007
1008 assert(m_tasksTree);
1009 m_tasksTree->setupModelData();
1010
1011 notify();
1012 }
1013
1014 //------------------------------------------------------------------------------
1015
1016 BatchesController *BatchesController::m_instance;
1017
1018 //------------------------------------------------------------------------------
1019 //------------------------------------------------------------------------------
1020
~Observer()1021 BatchesController::Observer::~Observer() {}
1022
1023 //------------------------------------------------------------------------------
1024 //------------------------------------------------------------------------------
1025 //------------------------------------------------------------------------------
1026 //------------------------------------------------------------------------------
1027
1028 namespace {
1029
1030 class MyLocalController final : public TFarmController, public TFarmExecutor {
1031 public:
MyLocalController(int port)1032 MyLocalController(int port) : TFarmExecutor(port) {}
1033
1034 QString execute(const std::vector<QString> &argv) override;
1035
1036 QString addTask(const TFarmTask &task, bool suspended) override;
1037 void removeTask(const QString &id) override;
1038 void suspendTask(const QString &id) override;
1039 void activateTask(const QString &id) override;
1040 void restartTask(const QString &id) override;
1041
1042 void getTasks(std::vector<QString> &tasks) override;
1043 void getTasks(const QString &parentId, std::vector<QString> &tasks) override;
1044 void getTasks(const QString &parentId,
1045 std::vector<TaskShortInfo> &tasks) override;
1046
1047 void queryTaskInfo(const QString &id, TFarmTask &task) override;
1048
1049 void queryTaskShortInfo(const QString &id, QString &parentId, QString &name,
1050 TaskState &status) override;
1051
attachServer(const QString & name,const QString & addr,int port)1052 void attachServer(const QString &name, const QString &addr,
1053 int port) override {
1054 assert(false);
1055 }
1056
detachServer(const QString & name,const QString & addr,int port)1057 void detachServer(const QString &name, const QString &addr,
1058 int port) override {
1059 assert(false);
1060 }
1061
taskSubmissionError(const QString & taskId,int errCode)1062 void taskSubmissionError(const QString &taskId, int errCode) override {
1063 assert(false);
1064 }
1065
1066 void taskProgress(const QString &taskId, int step, int stepCount,
1067 int frameNumber, FrameState state) override;
1068
1069 void taskCompleted(const QString &taskId, int exitCode) override;
1070
getServers(std::vector<ServerIdentity> & servers)1071 void getServers(std::vector<ServerIdentity> &servers) override {
1072 assert(false);
1073 }
1074
queryServerState2(const QString & id)1075 ServerState queryServerState2(const QString &id) override {
1076 assert(false);
1077 return ServerUnknown;
1078 }
1079
queryServerInfo(const QString & id,ServerInfo & info)1080 void queryServerInfo(const QString &id, ServerInfo &info) override {
1081 assert(false);
1082 }
1083
activateServer(const QString & id)1084 void activateServer(const QString &id) override { assert(false); }
1085
deactivateServer(const QString & id,bool completeRunningTasks)1086 void deactivateServer(const QString &id, bool completeRunningTasks) override {
1087 assert(false);
1088 }
1089
1090 private:
1091 std::map<QString, TFarmTask> m_tasks;
1092 };
1093
execute(const std::vector<QString> & argv)1094 QString MyLocalController::execute(const std::vector<QString> &argv) {
1095 if (argv.size() > 5 && argv[0] == "taskProgress") {
1096 int step, stepCount, frameNumber;
1097
1098 fromStr(step, argv[2]);
1099 fromStr(stepCount, argv[3]);
1100 fromStr(frameNumber, argv[4]);
1101
1102 FrameState state;
1103 fromStr((int &)state, argv[5]);
1104 taskProgress(argv[1], step, stepCount, frameNumber, state);
1105 return "";
1106 } else if (argv.size() > 2 && argv[0] == "taskCompleted") {
1107 QString taskId = argv[1];
1108 int exitCode;
1109 fromStr(exitCode, argv[2]);
1110
1111 taskCompleted(taskId, exitCode);
1112 return "";
1113 }
1114
1115 return "";
1116 }
1117
addTask(const TFarmTask & task,bool suspended)1118 QString MyLocalController::addTask(const TFarmTask &task, bool suspended) {
1119 assert(false);
1120 return "";
1121 }
1122
removeTask(const QString & id)1123 void MyLocalController::removeTask(const QString &id) { assert(false); }
1124
suspendTask(const QString & id)1125 void MyLocalController::suspendTask(const QString &id) { assert(false); }
1126
activateTask(const QString & id)1127 void MyLocalController::activateTask(const QString &id) { assert(false); }
1128
restartTask(const QString & id)1129 void MyLocalController::restartTask(const QString &id) { assert(false); }
1130
getTasks(std::vector<QString> & tasks)1131 void MyLocalController::getTasks(std::vector<QString> &tasks) { assert(false); }
1132
getTasks(const QString & parentId,std::vector<QString> & tasks)1133 void MyLocalController::getTasks(const QString &parentId,
1134 std::vector<QString> &tasks) {
1135 assert(false);
1136 }
1137
getTasks(const QString & parentId,std::vector<TaskShortInfo> & tasks)1138 void MyLocalController::getTasks(const QString &parentId,
1139 std::vector<TaskShortInfo> &tasks) {
1140 assert(false);
1141 }
1142
queryTaskInfo(const QString & id,TFarmTask & task)1143 void MyLocalController::queryTaskInfo(const QString &id, TFarmTask &task) {
1144 assert(false);
1145 }
1146
queryTaskShortInfo(const QString & id,QString & parentId,QString & name,TaskState & status)1147 void MyLocalController::queryTaskShortInfo(const QString &id, QString &parentId,
1148 QString &name, TaskState &status) {
1149 assert(false);
1150 }
1151
taskProgress(const QString & taskId,int step,int stepCount,int frameNumber,FrameState state)1152 void MyLocalController::taskProgress(const QString &taskId, int step,
1153 int stepCount, int frameNumber,
1154 FrameState state) {
1155 TFarmTask *task = BatchesController::instance()->getTask(taskId);
1156 assert(task);
1157
1158 if (state == FrameDone)
1159 ++task->m_successfullSteps;
1160 else
1161 ++task->m_failedSteps;
1162
1163 if (task->m_parentId != "") {
1164 TFarmTask *taskParent =
1165 BatchesController::instance()->getTask(task->m_parentId);
1166 assert(taskParent);
1167 if (state == FrameDone)
1168 ++taskParent->m_successfullSteps;
1169 else
1170 ++taskParent->m_failedSteps;
1171 }
1172
1173 NotifyMessage().send();
1174 }
1175
taskCompleted(const QString & taskId,int exitCode)1176 void MyLocalController::taskCompleted(const QString &taskId, int exitCode) {}
1177
1178 class MyLocalControllerController final : public TThread::Runnable {
1179 MyLocalController *m_controller;
1180
1181 public:
MyLocalControllerController(int port)1182 MyLocalControllerController(int port)
1183 : m_controller(new MyLocalController(port)) {
1184 TThread::Executor executor;
1185 executor.addTask(this);
1186 connect(this, SIGNAL(finished(TThread::RunnableP)), this,
1187 SLOT(onFinished(TThread::RunnableP)));
1188 connect(this, SIGNAL(exception(TThread::RunnableP)), this,
1189 SLOT(onFinished(TThread::RunnableP)));
1190 }
1191
run()1192 void run() override { m_controller->run(); }
1193
1194 public slots:
1195
onFinished(TThread::RunnableP thisTask)1196 void onFinished(TThread::RunnableP thisTask) override {
1197 BatchesController::instance()->notify();
1198 }
1199 };
1200
1201 } // anonymous namespace
1202
1203 //------------------------------------------------------------------------------
1204 //------------------------------------------------------------------------------
1205
instance()1206 BatchesController *BatchesController::instance() {
1207 if (!m_instance) {
1208 m_instance = new BatchesController;
1209
1210 // scandisce un range di porte fino a troverne una libera da utilizzare come
1211 // porta del local controller
1212
1213 const int portRangeSize = 100;
1214
1215 int i = 0;
1216 int portNumber = cPortNumber;
1217 for (; i < portRangeSize; ++i, ++portNumber) {
1218 bool portBusy = TFarmStuff::testConnection("localhost", portNumber);
1219 if (!portBusy) break;
1220 }
1221
1222 if (i < portRangeSize) {
1223 m_instance->m_localControllerPortNumber = portNumber;
1224
1225 // static MyLocalControllerController *TheLocalController =
1226 new MyLocalControllerController(portNumber);
1227 }
1228 }
1229 return m_instance;
1230 }
1231