1 
2 
3 #ifdef _WIN32
4 #define _WIN32_WINNT 0x0500  // per CreateJobObject e affini
5 #endif
6 
7 #include "tsystem.h"
8 #include "mainwindow.h"
9 #include "batches.h"
10 #include "tthreadmessage.h"
11 #include "tfarmexecutor.h"
12 #include "tstream.h"
13 #include "tfarmstuff.h"
14 #include "tenv.h"
15 #include "toutputproperties.h"
16 #include "trasterfx.h"
17 #include "tasksviewer.h"
18 #include "tapp.h"
19 #include "filebrowserpopup.h"
20 #include "tmsgcore.h"
21 #include "toonz/toonzscene.h"
22 #include "toonz/sceneproperties.h"
23 #include "toonz/preferences.h"
24 
25 #include "toonzqt/gutil.h"
26 
27 #include <QString>
28 #include <QProcess>
29 #include <QHostInfo>
30 
31 namespace {
32 
33 class NotifyMessage final : public TThread::Message {
34 public:
NotifyMessage()35   NotifyMessage() {}
36 
onDeliver()37   void onDeliver() override { BatchesController::instance()->update(); }
38 
clone() const39   TThread::Message *clone() const override { return new NotifyMessage(*this); }
40 };
41 
42 }  // namespace
43 
44 //------------------------------------------------------------------------------------------------------------------
45 //------------------------------------------------------------------------------------------------------------------
46 //----------------------------------------------------------------------------------------------------------------
47 //----------------------------------------------------------------------------------------------------------------
48 
// Builds the "Load Task List" file browser: only "tnzbat" files are listed
// and the confirmation button reads "Load".
LoadTaskListPopup::LoadTaskListPopup()
    : FileBrowserPopup(tr("Load Task List")) {
  addFilterType("tnzbat");
  setOkText(tr("Load"));
}
54 
execute()55 bool LoadTaskListPopup::execute() {
56   if (m_selectedPaths.empty()) return false;
57 
58   const TFilePath &fp = *m_selectedPaths.begin();
59 
60   if (!TSystem::doesExistFileOrLevel(fp)) {
61     DVGui::error(toQString(fp) + tr(" does not exist."));
62     return false;
63   } else if (fp.getType() != "tnzbat") {
64     DVGui::error(toQString(fp) +
65                  tr("It is possible to load only TNZBAT files."));
66     return false;
67   }
68 
69   BatchesController::instance()->doLoad(fp);
70   return true;
71 }
72 
73 //----------------------------------------------------------------------------------------------------------------
74 
// Builds the "add task" file browser. The window title is left empty here
// and set by open(); "tnz" files are always listed and the popup starts in
// render-task mode.
LoadTaskPopup::LoadTaskPopup() : FileBrowserPopup(""), m_isRenderTask(true) {
  addFilterType("tnz");
  setOkText(tr("Add"));
}
79 
open(bool isRenderTask)80 void LoadTaskPopup::open(bool isRenderTask) {
81   m_isRenderTask = isRenderTask;
82   setWindowTitle(isRenderTask ? tr("Add Render Task to Batch List")
83                               : tr("Add Cleanup Task to Batch List"));
84 
85   if (isRenderTask)
86     removeFilterType("cln");
87   else
88     addFilterType("cln");
89 
90   exec();
91 }
92 
execute()93 bool LoadTaskPopup::execute() {
94   if (m_selectedPaths.empty()) return false;
95 
96   const TFilePath &fp = *m_selectedPaths.begin();
97 
98   if (!TSystem::doesExistFileOrLevel(fp)) {
99     DVGui::error(toQString(fp) + tr(" does not exist."));
100     return false;
101   } else if (m_isRenderTask && fp.getType() != "tnz") {
102     DVGui::error(toQString(fp) +
103                  tr(" you can load only TNZ files for render task."));
104     return false;
105   } else if (!m_isRenderTask && fp.getType() != "tnz" &&
106              fp.getType() != "cln") {
107     DVGui::error(toQString(fp) +
108                  tr(" you can load only TNZ or CLN files for cleanup task."));
109     return false;
110   }
111 
112   if (m_isRenderTask)
113     BatchesController::instance()->addComposerTask(fp);
114   else
115     BatchesController::instance()->addCleanupTask(fp);
116 
117   return true;
118 }
119 
120 //=============================================================================
121 
// Builds the "Save Task List" file browser with a "Save" confirmation
// button.
SaveTaskListPopup::SaveTaskListPopup()
    : FileBrowserPopup(tr("Save Task List")) {
  setOkText(tr("Save"));
}
126 
execute()127 bool SaveTaskListPopup::execute() {
128   if (m_selectedPaths.empty()) return false;
129 
130   const TFilePath &fp = *m_selectedPaths.begin();
131 
132   BatchesController::instance()->doSave(fp.withType("tnzbat"));
133   return true;
134 }
135 
136 //------------------------------------------------------------------------------
// Locked around every access to RunningTasks in this file.
QMutex TasksMutex;
// Processes launched by TaskRunner::doRun that may still be running, keyed
// by task id. stop()/stopAll() use it to kill the process of a task.
std::map<QString, QProcess *> RunningTasks;
139 
// Runnable that executes one task (and, recursively, its dependencies and
// subtasks) by launching the task's command line as an external process.
class TaskRunner final : public TThread::Runnable {
public:
  // Stores the task to run and the local controller port; the task is not
  // copied.
  TaskRunner(TFarmTask *task, int localControllerPort)
      : m_task(task), m_localControllerPort(localControllerPort) {}

  // Entry point: runs m_task through doRun().
  void run() override;

  // Load contributed by this runnable; always 0 (see the definition).
  int taskLoad() override;

  // Executes the given task synchronously; see the definition for details.
  void doRun(TFarmTask *task);

  // Notifies the BatchesController observers.
  void runCleanup() { BatchesController::instance()->notify(); }

  TFarmTask *m_task;          // task executed by run()
  int m_localControllerPort;  // only referenced by commented-out code in doRun
};
156 
// Runs the task this runner was created for.
void TaskRunner::run() { doRun(m_task); }
158 
taskLoad()159 int TaskRunner::taskLoad() {
160   // We assume that CPU control is under the user's responsibility about
161   // tcomposer tasks - so they don't figure in CPU consumption calculations.
162   return 0;
163 }
164 
165 /*QString getFarmAddress()
166 {
167 if (BatchesController::instance()->getController())
168   {
169   QString dummystr, address;
170   int dummyint;
171   TFarmStuff::getControllerData(dummystr, address, dummyint);
172 
173   return address;
174   }
175 else return  "127.0.0.1"; //localHost
176 }*/
177 
doRun(TFarmTask * task)178 void TaskRunner::doRun(TFarmTask *task) {
179   // Suspended tasks are not executed. (Should only waiting ones be executed?)
180   if (task->m_status == Suspended) return;
181 
182   if (task->m_dependencies && task->m_dependencies->getTaskCount() != 0) {
183     task->m_status = Waiting;
184     int depCount   = task->m_dependencies->getTaskCount();
185     for (int i = 0; i < depCount; ++i) {
186       QString depTaskId  = task->m_dependencies->getTaskId(i);
187       TFarmTask *depTask = BatchesController::instance()->getTask(depTaskId);
188       if (depTask) {
189         doRun(depTask);
190       }
191     }
192   }
193 
194   task->m_successfullSteps = task->m_failedSteps = 0;
195   task->m_completionDate                         = QDateTime();
196 
197   task->m_status    = Running;
198   task->m_server    = TSystem::getHostName();
199   task->m_startDate = QDateTime::currentDateTime();
200 
201   NotifyMessage().send();
202 
203   int count = task->getTaskCount();
204   if (count > 1) {
205     // Launch each subtask
206     for (int i = 0; i < task->getTaskCount(); i++) {
207       if (task->m_status == Suspended) break;
208 
209       doRun(task->getTask(i));
210     }
211 
212     // Check their status
213     task->m_status           = Completed;
214     task->m_successfullSteps = task->m_to - task->m_from + 1;
215     task->m_completionDate   = QDateTime::currentDateTime();
216     for (int i = 0; i < task->getTaskCount(); i++) {
217       if (task->getTask(i)->m_status == Aborted) task->m_status = Aborted;
218     }
219 
220     NotifyMessage().send();
221 
222     return;
223   }
224 
225   // int exitCode;
226 
227   /*QString commandline = task->getCommandLine();
228 
229 commandline += " -farm ";
230 commandline += QString::number(m_localControllerPort) + "@localhost";
231 commandline += " -id " + task->m_id;*/
232 
233   QProcess *process = new QProcess();
234   std::map<QString, QProcess *>::iterator it;
235 
236   {
237     QMutexLocker sl(&TasksMutex);
238     if ((it = RunningTasks.find(task->m_id)) != RunningTasks.end()) {
239       it->second->kill();
240       RunningTasks.erase(task->m_id);
241     }
242 
243     RunningTasks[task->m_id] = process;
244   }
245 
246   process->start(task->getCommandLine());
247   process->waitForFinished(-1);
248 
249   {
250     QMutexLocker sl(&TasksMutex);
251     RunningTasks.erase(task->m_id);
252   }
253   // exitCode = QProcess::execute(task->getCommandLine());
254 
255   int error     = process->error();
256   bool statusOK = (error == QProcess::UnknownError) &&
257                   (process->exitCode() == 0 ||
258                    task->m_successfullSteps == task->m_stepCount);
259 
260   if (statusOK) {
261     task->m_status           = Completed;
262     task->m_successfullSteps = task->m_to - task->m_from + 1;
263   } else {
264     task->m_status      = Aborted;
265     task->m_failedSteps = task->m_to - task->m_from + 1;
266   }
267 
268   task->m_completionDate = QDateTime::currentDateTime();
269 
270   NotifyMessage().send();
271 }
272 
273 //------------------------------------------------------------------------------
274 //------------------------------------------------------------------------------
275 
BatchesController()276 BatchesController::BatchesController()
277     : m_controller(0), m_tasksTree(0), m_filepath(), m_dirtyFlag(false) {}
278 
279 //------------------------------------------------------------------------------
280 
// Nothing is released here; in particular the tasks stored in m_tasks are
// not deleted.
BatchesController::~BatchesController() {}
282 
283 //------------------------------------------------------------------------------
284 
// Stores the tasks tree model pointer; the model is not copied.
void BatchesController::setTasksTree(TaskTreeModel *tree) {
  m_tasksTree = tree;
}
288 
289 //------------------------------------------------------------------------------
290 
// Returns true when \p type is one of the movie file types "mov", "avi" or
// "3gp". The comparison is exact (case-sensitive).
inline bool isMovieType(std::string type) {
  if (type == "mov") return true;
  if (type == "avi") return true;
  return type == "3gp";
}
294 
295 //------------------------------------------------------------------------------
296 
// Next candidate for a numeric task id; post-incremented by addCleanupTask
// and addComposerTask until an id not present in the task map is found.
static int TaskCounter = 1;
298 
299 //------------------------------------------------------------------------------
300 
addCleanupTask(const TFilePath & taskFilePath)301 void BatchesController::addCleanupTask(const TFilePath &taskFilePath) {
302   setDirtyFlag(true);
303   QString id = QString::number(TaskCounter++);
304   while (BatchesController::instance()->getTask(id))
305     id = QString::number(TaskCounter++);
306 
307   TFarmTaskGroup *taskGroup = new TFarmTaskGroup(
308       id,                                              // id
309       QString::fromStdString(taskFilePath.getName()),  // name
310       TSystem::getUserName(),                          // user
311       TSystem::getHostName(),                          // host
312       1,                                               // stepCount
313       50,                                              // priority
314       taskFilePath,                                    // taskFilePath
315       Overwrite_Off,                                   // Overwrite
316       false);                                          // onlyvisible
317 
318   try {
319     BatchesController::instance()->addTask(id, taskGroup);
320   } catch (TException &) {
321   }
322 }
323 
324 //------------------------------------------------------------------------------
325 
addComposerTask(const TFilePath & _taskFilePath)326 void BatchesController::addComposerTask(const TFilePath &_taskFilePath) {
327   setDirtyFlag(true);
328   TFilePath taskFilePath;
329   try {
330     taskFilePath = TSystem::toUNC(_taskFilePath);
331   } catch (TException &) {
332   }
333 
334   ToonzScene scene;
335   try {
336     scene.loadNoResources(taskFilePath);
337   } catch (...) {
338     return;
339   }
340 
341   TSceneProperties *sprop      = scene.getProperties();
342   const TOutputProperties &out = *sprop->getOutputProperties();
343   const TRenderSettings &rs    = out.getRenderSettings();
344 
345   QString name = QString::fromStdString(taskFilePath.getName());
346   int r0, r1, step;
347   out.getRange(r0, r1, step);
348 
349   int sceneFrameCount = scene.getFrameCount();
350   if (r0 < 0) r0      = 0;
351   if (r1 >= sceneFrameCount)
352     r1 = sceneFrameCount - 1;
353   else if (r1 < r0)
354     r1 = sceneFrameCount - 1;
355 
356   r0++, r1++;
357   int shrink = rs.m_shrinkX;
358 
359   int multimedia       = out.getMultimediaRendering();
360   int threadsIndex     = out.getThreadIndex();
361   int maxTileSizeIndex = out.getMaxTileSizeIndex();
362 
363   TFilePath outputPath =
364       scene.decodeFilePath(sprop->getOutputProperties()->getPath());
365   if (outputPath.getWideName() == L"")
366     outputPath = outputPath.withName(scene.getScenePath().getName());
367   int taskChunkSize;
368   if (isMovieType(outputPath.getType()))
369     taskChunkSize = sceneFrameCount;
370   else {
371     int size = Preferences::instance()->getDefaultTaskChunkSize();
372     if (r1 - r0 + 1 < size)
373       taskChunkSize = r1 - r0 + 1;
374     else
375       taskChunkSize = size;
376   }
377 
378   QString id = QString::number(TaskCounter++);
379   while (BatchesController::instance()->getTask(id))
380     id = QString::number(TaskCounter++);
381 
382   TFarmTaskGroup *taskGroup = new TFarmTaskGroup(
383       id, name, TSystem::getUserName(), TSystem::getHostName(), sceneFrameCount,
384       50, taskFilePath, outputPath, r0, r1, step, shrink, multimedia,
385       taskChunkSize, threadsIndex, maxTileSizeIndex);
386 
387   try {
388     BatchesController::instance()->addTask(id, taskGroup);
389   } catch (TException &) {
390     // TMessage::error(toString(e.getMessage()));
391   }
392   // m_data->m_scene.setProject( mainprogramProj);
393   // TModalPopup::closePopup();
394 }
395 
396 //-------------------------------------------------------------------------------------------------------
397 
398 /*--- id はタスクを追加するごとにインクリメントされる数字 ---*/
addTask(const QString & id,TFarmTask * task,bool doUpdate)399 void BatchesController::addTask(const QString &id, TFarmTask *task,
400                                 bool doUpdate) {
401 #ifdef _DEBUG
402   std::map<QString, TFarmTask *>::iterator it = m_tasks.find(id);
403 // assert(it == m_tasks.end());
404 #endif
405   if (task->m_id.isEmpty()) task->m_id = id;
406 
407   task->m_status         = Suspended;
408   task->m_submissionDate = QDateTime::currentDateTime();
409 
410   m_tasks.insert(std::make_pair(id, task));
411 
412   int count = task->getTaskCount();
413   if (count == 1 && task->getTask(0) == task) return;
414 
415   for (int i = 0; i < count; ++i) {
416     TFarmTask *subtask = task->getTask(i);
417     assert(subtask);
418     addTask(subtask->m_id, subtask);
419   }
420   if (doUpdate) update();
421 }
422 
423 //------------------------------------------------------------------------------
424 
taskBusyStr()425 QString BatchesController::taskBusyStr() {
426   return tr(
427       "The %1 task is currently active.\nStop it or wait for its completion "
428       "before removing it.");
429 }
430 
431 //------------------------------------------------------------------------------
432 
removeTask(const QString & id)433 void BatchesController::removeTask(const QString &id) {
434   std::map<QString, TFarmTask *>::iterator it = m_tasks.find(id);
435   if (it == m_tasks.end()) return;
436 
437   TFarmTask *task = it->second;
438   if (task->m_status == Running || task->m_status == Waiting) {
439     DVGui::warning(taskBusyStr().arg(task->m_name));
440     return;
441   }
442 
443   setDirtyFlag(true);
444 
445   if (!task->m_parentId.isEmpty()) {
446     it                     = m_tasks.find(task->m_parentId);
447     TFarmTaskGroup *parent = dynamic_cast<TFarmTaskGroup *>(it->second);
448     assert(parent);
449     parent->removeTask(task);
450   }
451 
452   TFarmTaskGroup *taskGroup = dynamic_cast<TFarmTaskGroup *>(task);
453   if (taskGroup) {
454     if (m_controller) {
455       // si sta usando la farm
456       std::map<QString, QString>::iterator it = m_farmIdsTable.find(id);
457       if (it != m_farmIdsTable.end()) {
458         m_controller->suspendTask(it->second);
459       }
460     }
461 
462     // rimuove i subtask
463     int taskCount = taskGroup->getTaskCount();
464     for (int i = 0; i < taskCount; ++i) {
465       TFarmTask *subTask = taskGroup->getTask(i);
466       assert(subTask);
467       m_tasks.erase(subTask->m_id);
468     }
469   }
470 
471   m_tasks.erase(id);
472   delete task;
473 }
474 
475 //------------------------------------------------------------------------------
476 
477 namespace {
DeleteTask(const std::pair<QString,TFarmTask * > & mapItem)478 void DeleteTask(const std::pair<QString, TFarmTask *> &mapItem) {
479   if (mapItem.second->m_parentId.isEmpty()) delete mapItem.second;
480 }
481 }
482 
removeAllTasks()483 void BatchesController::removeAllTasks() {
484   std::map<QString, TFarmTask *>::iterator tt, tEnd(m_tasks.end());
485   for (tt = m_tasks.begin(); tt != tEnd; ++tt) {
486     TFarmTask *task = tt->second;
487     if (task->m_status == Running || task->m_status == Waiting) {
488       DVGui::warning(taskBusyStr().arg(task->m_name));
489       return;
490     }
491   }
492 
493   m_filepath = TFilePath();
494   setDirtyFlag(false);
495 
496   m_tasks.clear();
497   notify();
498 }
499 
500 //------------------------------------------------------------------------------
501 
getTaskCount() const502 int BatchesController::getTaskCount() const { return m_tasks.size(); }
503 
504 //------------------------------------------------------------------------------
505 
getTaskId(int index) const506 QString BatchesController::getTaskId(int index) const {
507   assert(index >= 0 && index < (int)m_tasks.size());
508   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
509   std::advance(it, index);
510   return it->first;
511 }
512 
513 //------------------------------------------------------------------------------
514 
getTask(int index) const515 TFarmTask *BatchesController::getTask(int index) const {
516   assert(index >= 0 && index < (int)m_tasks.size());
517   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
518   std::advance(it, index);
519   return it->second;
520 }
521 
522 //------------------------------------------------------------------------------
523 
getTask(const QString & id) const524 TFarmTask *BatchesController::getTask(const QString &id) const {
525   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(id);
526   if (it != m_tasks.end()) return it->second;
527   return 0;
528 }
529 
530 //------------------------------------------------------------------------------
531 
getTaskInfo(const QString & id,QString & parent,QString & name,TaskState & status)532 bool BatchesController::getTaskInfo(const QString &id, QString &parent,
533                                     QString &name, TaskState &status) {
534   TFarmTask *task = getTask(id);
535   if (task) {
536     parent = task->m_parentId;
537     name   = task->m_name;
538     status = task->m_status;
539   }
540 
541   return task != 0;
542 }
543 
544 //------------------------------------------------------------------------------
545 
getTaskStatus(const QString & id) const546 TaskState BatchesController::getTaskStatus(const QString &id) const {
547   TFarmTask *task = getTask(id);
548   if (task)
549     return task->m_status;
550   else
551     return TaskUnknown;
552 }
553 
setDirtyFlag(bool state)554 void BatchesController::setDirtyFlag(bool state) {
555   static bool FirstTime = true;
556 
557   if (FirstTime) {
558     FirstTime = false;
559     bool ret  = connect(TApp::instance()->getMainWindow(), SIGNAL(exit(bool &)),
560                        SLOT(onExit(bool &)));
561     assert(ret);
562   }
563 
564   if (state == m_dirtyFlag) return;
565 
566   m_dirtyFlag = state;
567   update();
568 }
569 
570 //------------------------------------------------------------------------------
571 
getTasks(const QString & parentId,std::vector<QString> & tasks) const572 void BatchesController::getTasks(const QString &parentId,
573                                  std::vector<QString> &tasks) const {
574   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
575   for (; it != m_tasks.end(); ++it) {
576     TFarmTask *task = it->second;
577     assert(task);
578     if (task->m_parentId == parentId) tasks.push_back(task->m_id);
579   }
580 }
581 
582 //------------------------------------------------------------------------------
583 
startAll()584 void BatchesController::startAll() {
585   // viene usata la farm <==> m_controller != 0
586   if (!m_controller) {
587     // uso una multimap per ordinare rispetto alla priority
588     std::multimap<int, TFarmTask *> tasksByPriority;
589 
590     std::map<QString, TFarmTask *>::reverse_iterator it = m_tasks.rbegin();
591     // uso il reverse iterator anche qui altrimenti se ho task con priorita'
592     // uguale li esegue dall'ultimo al primo
593     for (; it != m_tasks.rend(); ++it) {
594       TFarmTask *task = it->second;
595       tasksByPriority.insert(std::make_pair(task->m_priority, task));
596     }
597 
598     std::multimap<int, TFarmTask *>::reverse_iterator it2 =
599         tasksByPriority.rbegin();
600     for (; it2 != tasksByPriority.rend(); ++it2) {
601       TFarmTask *task = it2->second;
602       assert(task);
603       task->m_status = Waiting;
604       if (task->m_parentId.isEmpty())
605         m_localExecutor.addTask(
606             new TaskRunner(task, m_localControllerPortNumber));
607     }
608   } else {
609     std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
610     for (; it != m_tasks.end(); ++it) {
611       TFarmTask *task = it->second;
612       assert(task);
613       if (task->m_parentId.isEmpty()) start(task->m_id);
614     }
615   }
616 
617   notify();
618 }
619 
620 //------------------------------------------------------------------------------
621 
start(const QString & taskId)622 void BatchesController::start(const QString &taskId) {
623   QString computerName = QHostInfo().localHostName();
624 
625   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(taskId);
626   assert(it != m_tasks.end());
627   TFarmTask *task = it->second;
628   assert(task);
629   task->m_status      = Waiting;
630   task->m_failedSteps = task->m_successfullSteps = 0;
631   task->m_completionDate = task->m_startDate = QDateTime();
632   task->m_callerMachineName                  = computerName;
633   int count                                  = task->getTaskCount();
634   if (count > 1)
635     for (int i = 0; i < count; ++i) {
636       TFarmTask *subtask = task->getTask(i);
637       assert(subtask);
638       subtask->m_status      = Waiting;
639       subtask->m_failedSteps = subtask->m_successfullSteps = 0;
640       subtask->m_completionDate = subtask->m_startDate = QDateTime();
641       subtask->m_callerMachineName                     = computerName;
642     }
643 
644   // viene usata la farm <==> m_controller != 0
645   if (!m_controller) {
646     m_localExecutor.addTask(new TaskRunner(task, m_localControllerPortNumber));
647   } else {
648     std::map<QString, QString>::iterator ktor = m_farmIdsTable.find(taskId);
649     if (ktor != m_farmIdsTable.end()) {
650       QString farmTaskId = ktor->second;
651       m_controller->restartTask(farmTaskId);
652 
653       if (dynamic_cast<TFarmTaskGroup *>(task)) {
654         std::vector<QString> subtasks;
655         m_controller->getTasks(farmTaskId, subtasks);
656 
657         if (!subtasks.empty()) {
658           assert((int)subtasks.size() == task->getTaskCount());
659 
660           std::vector<QString>::iterator jtor = subtasks.begin();
661           for (int i = 0; i < task->getTaskCount(); ++i, ++jtor)
662             m_farmIdsTable.insert(
663                 std::make_pair(task->getTask(i)->m_id, *jtor));
664         }
665       }
666     } else {
667       bool suspended = false;
668       QString id     = m_controller->addTask(*task, suspended);
669       m_farmIdsTable.insert(std::make_pair(task->m_id, id));
670 
671       if (dynamic_cast<TFarmTaskGroup *>(task)) {
672         std::vector<QString> subtasks;
673         m_controller->getTasks(id, subtasks);
674         assert((int)subtasks.size() == task->getTaskCount());
675 
676         std::vector<QString>::iterator jtor = subtasks.begin();
677         for (int i = 0; i < task->getTaskCount(); ++i, ++jtor)
678           m_farmIdsTable.insert(std::make_pair(task->getTask(i)->m_id, *jtor));
679       }
680     }
681   }
682   notify();
683 }
684 
685 //------------------------------------------------------------------------------
686 
stop(const QString & taskId)687 void BatchesController::stop(const QString &taskId) {
688   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.find(taskId);
689   if (it == m_tasks.end()) return;
690 
691   TFarmTask *task = it->second;
692   assert(task);
693 
694   // Update the task status
695   if (task->m_status == Waiting) task->m_status = Suspended;
696 
697   // Local farm <==> m_controller != 0
698   if (!m_controller) {
699     QMutexLocker sl(&TasksMutex);
700     std::map<QString, QProcess *>::iterator it;
701 
702     // Kill the associated process if any
703     if ((it = RunningTasks.find(task->m_id)) != RunningTasks.end()) {
704       it->second->kill();
705       RunningTasks.erase(task->m_id);
706     }
707 
708     // Do the same for child tasks
709     int count = task->getTaskCount();
710     if (count > 1) {
711       for (int i = 0; i < count; ++i) {
712         TFarmTask *subtask                                  = task->getTask(i);
713         if (subtask->m_status == Waiting) subtask->m_status = Suspended;
714         if ((it = RunningTasks.find(subtask->m_id)) != RunningTasks.end()) {
715           it->second->kill();
716           RunningTasks.erase(subtask->m_id);
717         }
718       }
719     }
720   } else {
721     std::map<QString, QString>::iterator it = m_farmIdsTable.find(task->m_id);
722     if (it != m_farmIdsTable.end()) {
723       QString farmId = it->second;
724       m_controller->suspendTask(farmId);
725     }
726   }
727   notify();
728 }
729 
730 //------------------------------------------------------------------------------
731 
stopAll()732 void BatchesController::stopAll() {
733   if (!m_controller) {
734     // Stop all running QProcesses
735     QMutexLocker sl(&TasksMutex);
736 
737     std::map<QString, QProcess *>::iterator it;
738     for (it = RunningTasks.begin(); it != RunningTasks.end(); ++it)
739       it->second->kill();
740 
741     RunningTasks.clear();
742     m_localExecutor.cancelAll();
743   } else {
744     // Suspend all farmIds
745     std::map<QString, QString>::iterator it;
746     for (it = m_farmIdsTable.begin(); it != m_farmIdsTable.end(); ++it) {
747       QString farmId = it->second;
748       m_controller->suspendTask(farmId);
749     }
750   }
751 
752   // Update all waiting task status
753   std::map<QString, TFarmTask *>::iterator jt;
754   for (jt = m_tasks.begin(); jt != m_tasks.end(); ++jt) {
755     if (jt->second->m_status == Waiting) jt->second->m_status = Suspended;
756   }
757 
758   notify();
759 }
760 
761 //------------------------------------------------------------------------------
762 
onExit(bool & ret)763 void BatchesController::onExit(bool &ret) {
764   int answer = 0;
765 
766   if (m_dirtyFlag)
767     answer =
768         DVGui::MsgBox(QString(tr("The current task list has been modified.\nDo "
769                                  "you want to save your changes?")),
770                       tr("Save"), tr("Discard"), tr("Cancel"), 1);
771 
772   ret = true;
773   if (answer == 3)
774     ret = false;
775   else if (answer == 1)
776     save();
777 }
778 
779 //------------------------------------------------------------------------------
780 
load()781 void BatchesController::load() {
782   static LoadTaskListPopup *popup = 0;
783 
784   if (!popup) popup = new LoadTaskListPopup();
785   popup->exec();
786 }
787 
788 //------------------------------------------------------------------------------
loadTask(bool isRenderTask)789 void BatchesController::loadTask(bool isRenderTask) {
790   static LoadTaskPopup *popup = new LoadTaskPopup();
791   popup->open(isRenderTask);
792 }
793 
794 //------------------------------------------------------------------------------
795 
doLoad(const TFilePath & fp)796 void BatchesController::doLoad(const TFilePath &fp) {
797   if (m_dirtyFlag) {
798     int ret =
799         DVGui::MsgBox(QString(tr("The current task list has been modified.\nDo "
800                                  "you want to save your changes?")),
801                       tr("Save"), tr("Discard"), tr("Cancel"));
802     if (ret == 1)
803       save();
804     else if (ret == 3 || ret == 0)
805       return;
806   }
807 
808   setDirtyFlag(false);
809 
810   m_tasks.clear();
811   m_farmIdsTable.clear();
812 
813   try {
814     TIStream is(fp);
815     if (is) {
816     } else
817       throw TException(fp.getWideString() + L": Can't open file");
818 
819     m_filepath = fp;
820 
821     std::string tagName = "";
822     if (!is.matchTag(tagName)) throw TException("Bad file format");
823 
824     if (tagName == "tnzbatches") {
825       std::string rootTagName = tagName;
826       std::string v           = is.getTagAttribute("version");
827       while (is.matchTag(tagName)) {
828         if (tagName == "generator") {
829           std::string program = is.getString();
830         } else if (tagName == "batch") {
831           while (!is.eos()) {
832             TPersist *p = 0;
833             is >> p;
834             TFarmTask *task = dynamic_cast<TFarmTask *>(p);
835             if (task) addTask(task->m_id, task, false);
836           }
837         } else {
838           throw TException(tagName + " : unexpected tag");
839         }
840 
841         if (!is.matchEndTag()) throw TException(tagName + " : missing end tag");
842       }
843       if (!is.matchEndTag())
844         throw TException(rootTagName + " : missing end tag");
845     } else {
846       throw TException("Bad file format");
847     }
848   } catch (TException &e) {
849     throw e;
850   } catch (...) {
851     throw TException("Loading error.");
852   }
853   update();
854   notify();
855 }
856 
857 //------------------------------------------------------------------------------
858 
getListName() const859 QString BatchesController::getListName() const {
860   QString str = (m_filepath == TFilePath()
861                      ? tr("Tasks")
862                      : QString::fromStdString(m_filepath.getName()));
863   return str + (m_dirtyFlag ? "*" : "");
864 }
865 
866 //------------------------------------------------------------------------------
867 
saveas()868 void BatchesController::saveas() {
869   if (getTaskCount() == 0) {
870     DVGui::warning(tr("The Task List is empty!"));
871     return;
872   }
873 
874   static SaveTaskListPopup *popup = 0;
875   if (!popup) popup               = new SaveTaskListPopup();
876 
877   popup->exec();
878 }
879 
save()880 void BatchesController::save() {
881   if (m_filepath.isEmpty())
882     saveas();
883   else
884     doSave();
885 }
886 
doSave(const TFilePath & _fp)887 void BatchesController::doSave(const TFilePath &_fp) {
888   setDirtyFlag(false);
889 
890   TFilePath fp;
891   if (_fp == TFilePath())
892     fp = m_filepath;
893   else
894     fp = m_filepath = _fp;
895 
896   TOStream os(fp);
897 
898   std::map<std::string, std::string> attr;
899   attr["version"] = "1.0";
900 
901   os.openChild("tnzbatches", attr);
902   os.child("generator") << TEnv::getApplicationFullName();
903 
904   os.openChild("batch");
905   std::map<QString, TFarmTask *>::const_iterator it = m_tasks.begin();
906   for (; it != m_tasks.end(); ++it) {
907     TFarmTask *task = it->second;
908     assert(task);
909     os << task;
910   }
911   os.closeChild();  // "batch"
912   os.closeChild();  // "tnzbatches"
913 
914   update();
915 }
916 
917 //------------------------------------------------------------------------------
918 
attach(BatchesController::Observer * obs)919 void BatchesController::attach(BatchesController::Observer *obs) {
920   m_observers.insert(obs);
921 }
922 
923 //------------------------------------------------------------------------------
924 
detach(BatchesController::Observer * obs)925 void BatchesController::detach(BatchesController::Observer *obs) {
926   m_observers.erase(obs);
927 }
928 
929 //------------------------------------------------------------------------------
930 
931 namespace {
932 
notifyObserver(BatchesController::Observer * obs)933 void notifyObserver(BatchesController::Observer *obs) { obs->update(); }
934 }
935 
notify()936 void BatchesController::notify() {
937   std::for_each(m_observers.begin(), m_observers.end(), notifyObserver);
938 }
939 
940 //------------------------------------------------------------------------------
941 
setController(TFarmController * controller)942 void BatchesController::setController(TFarmController *controller) {
943   m_controller = controller;
944 }
945 
946 //------------------------------------------------------------------------------
947 
getController() const948 TFarmController *BatchesController::getController() const {
949   return m_controller;
950 }
951 
952 //------------------------------------------------------------------------------
953 
954 namespace {
955 
956 class ControllerFailureMsg final : public TThread::Message {
957 public:
ControllerFailureMsg(const TException & e)958   ControllerFailureMsg(const TException &e) : m_e(e) {}
959 
onDeliver()960   void onDeliver() override {
961     // throw m_e;
962   }
963 
clone() const964   TThread::Message *clone() const override {
965     return new ControllerFailureMsg(m_e);
966   }
967 
968   TException m_e;
969 };
970 
971 }  // namespace
972 
973 //------------------------------------------------------------------------------
974 
update()975 void BatchesController::update() {
976   if (m_controller) {
977     try {
978       std::map<QString, QString>::iterator it = m_farmIdsTable.begin();
979       for (; it != m_farmIdsTable.end(); ++it) {
980         QString farmTaskId    = it->second;
981         QString batchesTaskId = it->first;
982 
983         TFarmTask *batchesTask = getTask(batchesTaskId);
984         TFarmTask farmTask     = *batchesTask;
985 
986         if (batchesTask) {
987           QString batchesTaskParentId = batchesTask->m_parentId;
988           m_controller->queryTaskInfo(farmTaskId, farmTask);
989           int chunkSize            = batchesTask->m_chunkSize;
990           *batchesTask             = farmTask;
991           batchesTask->m_chunkSize = chunkSize;
992           batchesTask->m_id        = batchesTaskId;
993           batchesTask->m_parentId  = batchesTaskParentId;
994         }
995       }
996     } catch (TException &e) {
997       ControllerFailureMsg(e).send();
998     }
999   }
1000 
1001   std::map<QString, TFarmTask *>::iterator jtor = m_tasks.begin();
1002   for (; jtor != m_tasks.end(); ++jtor) {
1003     TFarmTask *task = jtor->second;
1004     assert(task);
1005     if (task->m_status == Completed) m_farmIdsTable.erase(task->m_id);
1006   }
1007 
1008   assert(m_tasksTree);
1009   m_tasksTree->setupModelData();
1010 
1011   notify();
1012 }
1013 
1014 //------------------------------------------------------------------------------
1015 
1016 BatchesController *BatchesController::m_instance;
1017 
1018 //------------------------------------------------------------------------------
1019 //------------------------------------------------------------------------------
1020 
~Observer()1021 BatchesController::Observer::~Observer() {}
1022 
1023 //------------------------------------------------------------------------------
1024 //------------------------------------------------------------------------------
1025 //------------------------------------------------------------------------------
1026 //------------------------------------------------------------------------------
1027 
1028 namespace {
1029 
1030 class MyLocalController final : public TFarmController, public TFarmExecutor {
1031 public:
MyLocalController(int port)1032   MyLocalController(int port) : TFarmExecutor(port) {}
1033 
1034   QString execute(const std::vector<QString> &argv) override;
1035 
1036   QString addTask(const TFarmTask &task, bool suspended) override;
1037   void removeTask(const QString &id) override;
1038   void suspendTask(const QString &id) override;
1039   void activateTask(const QString &id) override;
1040   void restartTask(const QString &id) override;
1041 
1042   void getTasks(std::vector<QString> &tasks) override;
1043   void getTasks(const QString &parentId, std::vector<QString> &tasks) override;
1044   void getTasks(const QString &parentId,
1045                 std::vector<TaskShortInfo> &tasks) override;
1046 
1047   void queryTaskInfo(const QString &id, TFarmTask &task) override;
1048 
1049   void queryTaskShortInfo(const QString &id, QString &parentId, QString &name,
1050                           TaskState &status) override;
1051 
attachServer(const QString & name,const QString & addr,int port)1052   void attachServer(const QString &name, const QString &addr,
1053                     int port) override {
1054     assert(false);
1055   }
1056 
detachServer(const QString & name,const QString & addr,int port)1057   void detachServer(const QString &name, const QString &addr,
1058                     int port) override {
1059     assert(false);
1060   }
1061 
taskSubmissionError(const QString & taskId,int errCode)1062   void taskSubmissionError(const QString &taskId, int errCode) override {
1063     assert(false);
1064   }
1065 
1066   void taskProgress(const QString &taskId, int step, int stepCount,
1067                     int frameNumber, FrameState state) override;
1068 
1069   void taskCompleted(const QString &taskId, int exitCode) override;
1070 
getServers(std::vector<ServerIdentity> & servers)1071   void getServers(std::vector<ServerIdentity> &servers) override {
1072     assert(false);
1073   }
1074 
queryServerState2(const QString & id)1075   ServerState queryServerState2(const QString &id) override {
1076     assert(false);
1077     return ServerUnknown;
1078   }
1079 
queryServerInfo(const QString & id,ServerInfo & info)1080   void queryServerInfo(const QString &id, ServerInfo &info) override {
1081     assert(false);
1082   }
1083 
activateServer(const QString & id)1084   void activateServer(const QString &id) override { assert(false); }
1085 
deactivateServer(const QString & id,bool completeRunningTasks)1086   void deactivateServer(const QString &id, bool completeRunningTasks) override {
1087     assert(false);
1088   }
1089 
1090 private:
1091   std::map<QString, TFarmTask> m_tasks;
1092 };
1093 
execute(const std::vector<QString> & argv)1094 QString MyLocalController::execute(const std::vector<QString> &argv) {
1095   if (argv.size() > 5 && argv[0] == "taskProgress") {
1096     int step, stepCount, frameNumber;
1097 
1098     fromStr(step, argv[2]);
1099     fromStr(stepCount, argv[3]);
1100     fromStr(frameNumber, argv[4]);
1101 
1102     FrameState state;
1103     fromStr((int &)state, argv[5]);
1104     taskProgress(argv[1], step, stepCount, frameNumber, state);
1105     return "";
1106   } else if (argv.size() > 2 && argv[0] == "taskCompleted") {
1107     QString taskId = argv[1];
1108     int exitCode;
1109     fromStr(exitCode, argv[2]);
1110 
1111     taskCompleted(taskId, exitCode);
1112     return "";
1113   }
1114 
1115   return "";
1116 }
1117 
addTask(const TFarmTask & task,bool suspended)1118 QString MyLocalController::addTask(const TFarmTask &task, bool suspended) {
1119   assert(false);
1120   return "";
1121 }
1122 
removeTask(const QString & id)1123 void MyLocalController::removeTask(const QString &id) { assert(false); }
1124 
suspendTask(const QString & id)1125 void MyLocalController::suspendTask(const QString &id) { assert(false); }
1126 
activateTask(const QString & id)1127 void MyLocalController::activateTask(const QString &id) { assert(false); }
1128 
restartTask(const QString & id)1129 void MyLocalController::restartTask(const QString &id) { assert(false); }
1130 
getTasks(std::vector<QString> & tasks)1131 void MyLocalController::getTasks(std::vector<QString> &tasks) { assert(false); }
1132 
getTasks(const QString & parentId,std::vector<QString> & tasks)1133 void MyLocalController::getTasks(const QString &parentId,
1134                                  std::vector<QString> &tasks) {
1135   assert(false);
1136 }
1137 
getTasks(const QString & parentId,std::vector<TaskShortInfo> & tasks)1138 void MyLocalController::getTasks(const QString &parentId,
1139                                  std::vector<TaskShortInfo> &tasks) {
1140   assert(false);
1141 }
1142 
queryTaskInfo(const QString & id,TFarmTask & task)1143 void MyLocalController::queryTaskInfo(const QString &id, TFarmTask &task) {
1144   assert(false);
1145 }
1146 
queryTaskShortInfo(const QString & id,QString & parentId,QString & name,TaskState & status)1147 void MyLocalController::queryTaskShortInfo(const QString &id, QString &parentId,
1148                                            QString &name, TaskState &status) {
1149   assert(false);
1150 }
1151 
taskProgress(const QString & taskId,int step,int stepCount,int frameNumber,FrameState state)1152 void MyLocalController::taskProgress(const QString &taskId, int step,
1153                                      int stepCount, int frameNumber,
1154                                      FrameState state) {
1155   TFarmTask *task = BatchesController::instance()->getTask(taskId);
1156   assert(task);
1157 
1158   if (state == FrameDone)
1159     ++task->m_successfullSteps;
1160   else
1161     ++task->m_failedSteps;
1162 
1163   if (task->m_parentId != "") {
1164     TFarmTask *taskParent =
1165         BatchesController::instance()->getTask(task->m_parentId);
1166     assert(taskParent);
1167     if (state == FrameDone)
1168       ++taskParent->m_successfullSteps;
1169     else
1170       ++taskParent->m_failedSteps;
1171   }
1172 
1173   NotifyMessage().send();
1174 }
1175 
taskCompleted(const QString & taskId,int exitCode)1176 void MyLocalController::taskCompleted(const QString &taskId, int exitCode) {}
1177 
1178 class MyLocalControllerController final : public TThread::Runnable {
1179   MyLocalController *m_controller;
1180 
1181 public:
MyLocalControllerController(int port)1182   MyLocalControllerController(int port)
1183       : m_controller(new MyLocalController(port)) {
1184     TThread::Executor executor;
1185     executor.addTask(this);
1186     connect(this, SIGNAL(finished(TThread::RunnableP)), this,
1187             SLOT(onFinished(TThread::RunnableP)));
1188     connect(this, SIGNAL(exception(TThread::RunnableP)), this,
1189             SLOT(onFinished(TThread::RunnableP)));
1190   }
1191 
run()1192   void run() override { m_controller->run(); }
1193 
1194 public slots:
1195 
onFinished(TThread::RunnableP thisTask)1196   void onFinished(TThread::RunnableP thisTask) override {
1197     BatchesController::instance()->notify();
1198   }
1199 };
1200 
1201 }  // anonymous namespace
1202 
1203 //------------------------------------------------------------------------------
1204 //------------------------------------------------------------------------------
1205 
instance()1206 BatchesController *BatchesController::instance() {
1207   if (!m_instance) {
1208     m_instance = new BatchesController;
1209 
1210     // scandisce un range di porte fino a troverne una libera da utilizzare come
1211     // porta del local controller
1212 
1213     const int portRangeSize = 100;
1214 
1215     int i          = 0;
1216     int portNumber = cPortNumber;
1217     for (; i < portRangeSize; ++i, ++portNumber) {
1218       bool portBusy = TFarmStuff::testConnection("localhost", portNumber);
1219       if (!portBusy) break;
1220     }
1221 
1222     if (i < portRangeSize) {
1223       m_instance->m_localControllerPortNumber = portNumber;
1224 
1225       //      static MyLocalControllerController *TheLocalController =
1226       new MyLocalControllerController(portNumber);
1227     }
1228   }
1229   return m_instance;
1230 }
1231