1 /*
2 Copyright 2016-2017 Skytechnology sp. z o.o.
3
4 This file is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20
21 #include "master/task_manager.h"
22
23 #include "common/loop_watchdog.h"
24 #include "master/filesystem_metadata.h"
25 #include "master/filesystem_node.h"
26 #include "protocol/MFSCommunication.h"
27
finalize(int status)28 void TaskManager::Job::finalize(int status) {
29 if (finish_callback_) {
30 finish_callback_(status);
31 }
32 tasks_.clear_and_dispose([](Task *ptr) { delete ptr; });
33 }
34
finalizeTask(TaskIterator itask,int status)35 void TaskManager::Job::finalizeTask(TaskIterator itask, int status) {
36 if (status || (itask->isFinished() && tasks_.size() <= 1)) {
37 finalize(status);
38 } else if (itask->isFinished()) {
39 tasks_.erase_and_dispose(itask, [](Task *ptr) { delete ptr; });
40 }
41 }
42
processTask(uint32_t ts)43 void TaskManager::Job::processTask(uint32_t ts) {
44 if (!tasks_.empty()) {
45 auto i_front = tasks_.begin();
46 int status = i_front->execute(ts, tasks_);
47 finalizeTask(i_front, status);
48 }
49 }
50
getInfo() const51 JobInfo TaskManager::Job::getInfo() const {
52 return { id_, description_ };
53 }
54
submitTask(uint32_t taskid,uint32_t ts,int initial_batch_size,Task * task,const std::string & description,const std::function<void (int)> & callback)55 int TaskManager::submitTask(uint32_t taskid, uint32_t ts, int initial_batch_size, Task *task,
56 const std::string &description, const std::function<void(int)> &callback) {
57 Job new_job(taskid, description);
58
59 int done = 0;
60 int status = LIZARDFS_STATUS_OK;
61
62 new_job.setFinishCallback([&done, &status](int code) {
63 status = code;
64 done = 1;
65 });
66
67 new_job.addTask(task);
68 for (int i = 0; i < initial_batch_size; i++) {
69 new_job.processTask(ts);
70 if (new_job.isFinished()) {
71 break;
72 }
73 }
74
75 if (done) {
76 assert(new_job.isFinished());
77 return status;
78 }
79
80 new_job.setFinishCallback(callback);
81 job_list_.push_back(std::move(new_job));
82
83 return LIZARDFS_ERROR_WAITING;
84 }
85
submitTask(uint32_t ts,int initial_batch_size,Task * task,const std::string & description,const std::function<void (int)> & callback)86 int TaskManager::submitTask(uint32_t ts, int initial_batch_size, Task *task,
87 const std::string &description, const std::function<void(int)> &callback) {
88 return submitTask(reserveJobId(), ts, initial_batch_size, task, description, callback);
89 }
90
processJobs(uint32_t ts,int number_of_tasks)91 void TaskManager::processJobs(uint32_t ts, int number_of_tasks) {
92 SignalLoopWatchdog watchdog;
93 JobIterator it = job_list_.begin();
94 watchdog.start();
95 for (int i = 0; i < number_of_tasks; ++i) {
96 if (it == job_list_.end() || watchdog.expired()) {
97 break;
98 }
99 if (it->isFinished()) {
100 it = job_list_.erase(it);
101 } else {
102 it->processTask(ts);
103 ++it;
104 }
105 if (it == job_list_.end()) {
106 it = job_list_.begin();
107 }
108 }
109 }
110
getCurrentJobsInfo() const111 TaskManager::JobsInfoContainer TaskManager::getCurrentJobsInfo() const {
112 JobsInfoContainer info;
113 info.reserve(job_list_.size());
114 for (const Job &j : job_list_) {
115 info.push_back(j.getInfo());
116 }
117 return info;
118 }
119
cancelJob(uint32_t job_id)120 bool TaskManager::cancelJob(uint32_t job_id) {
121 for (auto it = job_list_.begin(); it != job_list_.end(); ++it) {
122 if (it->getId() == job_id) {
123 it->finalize(LIZARDFS_ERROR_NOTDONE);
124 return true;
125 }
126 }
127 return false;
128 }
129