1 /*
2    Copyright 2016-2017 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18 
19 #include "common/platform.h"
20 
21 #include "master/task_manager.h"
22 
23 #include "common/loop_watchdog.h"
24 #include "master/filesystem_metadata.h"
25 #include "master/filesystem_node.h"
26 #include "protocol/MFSCommunication.h"
27 
finalize(int status)28 void TaskManager::Job::finalize(int status) {
29 	if (finish_callback_) {
30 		finish_callback_(status);
31 	}
32 	tasks_.clear_and_dispose([](Task *ptr) { delete ptr; });
33 }
34 
finalizeTask(TaskIterator itask,int status)35 void TaskManager::Job::finalizeTask(TaskIterator itask, int status) {
36 	if (status || (itask->isFinished() && tasks_.size() <= 1)) {
37 		finalize(status);
38 	} else if (itask->isFinished()) {
39 		tasks_.erase_and_dispose(itask, [](Task *ptr) { delete ptr; });
40 	}
41 }
42 
processTask(uint32_t ts)43 void TaskManager::Job::processTask(uint32_t ts) {
44 	if (!tasks_.empty()) {
45 		auto i_front = tasks_.begin();
46 		int status = i_front->execute(ts, tasks_);
47 		finalizeTask(i_front, status);
48 	}
49 }
50 
getInfo() const51 JobInfo TaskManager::Job::getInfo() const {
52 	return { id_, description_ };
53 }
54 
submitTask(uint32_t taskid,uint32_t ts,int initial_batch_size,Task * task,const std::string & description,const std::function<void (int)> & callback)55 int TaskManager::submitTask(uint32_t taskid, uint32_t ts, int initial_batch_size, Task *task,
56 	                    const std::string &description, const std::function<void(int)> &callback) {
57 	Job new_job(taskid, description);
58 
59 	int done = 0;
60 	int status = LIZARDFS_STATUS_OK;
61 
62 	new_job.setFinishCallback([&done, &status](int code) {
63 		status = code;
64 		done = 1;
65 	});
66 
67 	new_job.addTask(task);
68 	for (int i = 0; i < initial_batch_size; i++) {
69 		new_job.processTask(ts);
70 		if (new_job.isFinished()) {
71 			break;
72 		}
73 	}
74 
75 	if (done) {
76 		assert(new_job.isFinished());
77 		return status;
78 	}
79 
80 	new_job.setFinishCallback(callback);
81 	job_list_.push_back(std::move(new_job));
82 
83 	return LIZARDFS_ERROR_WAITING;
84 }
85 
submitTask(uint32_t ts,int initial_batch_size,Task * task,const std::string & description,const std::function<void (int)> & callback)86 int TaskManager::submitTask(uint32_t ts, int initial_batch_size, Task *task,
87 	                    const std::string &description, const std::function<void(int)> &callback) {
88 	return submitTask(reserveJobId(), ts, initial_batch_size, task, description, callback);
89 }
90 
processJobs(uint32_t ts,int number_of_tasks)91 void TaskManager::processJobs(uint32_t ts, int number_of_tasks) {
92 	SignalLoopWatchdog watchdog;
93 	JobIterator it = job_list_.begin();
94 	watchdog.start();
95 	for (int i = 0; i < number_of_tasks; ++i) {
96 		if (it == job_list_.end() || watchdog.expired()) {
97 			break;
98 		}
99 		if (it->isFinished()) {
100 			it = job_list_.erase(it);
101 		} else {
102 			it->processTask(ts);
103 			++it;
104 		}
105 		if (it == job_list_.end()) {
106 			it = job_list_.begin();
107 		}
108 	}
109 }
110 
getCurrentJobsInfo() const111 TaskManager::JobsInfoContainer TaskManager::getCurrentJobsInfo() const {
112 	JobsInfoContainer info;
113 	info.reserve(job_list_.size());
114 	for (const Job &j : job_list_) {
115 		info.push_back(j.getInfo());
116 	}
117 	return info;
118 }
119 
cancelJob(uint32_t job_id)120 bool TaskManager::cancelJob(uint32_t job_id) {
121 	for (auto it = job_list_.begin(); it != job_list_.end(); ++it) {
122 		if (it->getId() == job_id) {
123 			it->finalize(LIZARDFS_ERROR_NOTDONE);
124 			return true;
125 		}
126 	}
127 	return false;
128 }
129