1 /*
2    Copyright 2016-2017 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18 
19 #pragma once
20 
#include "common/platform.h"

#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "common/intrusive_list.h"
#include "common/job_info.h"
30 
31 /*! \brief Implementation of class for storing and executing tasks
32  *
33  * This class is responsible for managing execution of tasks.
34  * Submitting a task creates a new Job object. Job represents
35  * the task itself + all subtasks it creates during its execution.
36  */
37 class TaskManager {
38 public:
39 	/*! \brief Class representing a single task to be executed by Task Manager*/
40 	class Task : public intrusive_list_base_hook {
41 	public:
~Task()42 		virtual ~Task() {
43 		}
44 		/*! \brief Pure virtual function that represents execution of task.
45 		 * \param ts current time stamp.
46 		 * \param work_queue a list to which this task adds newly created tasks.
47 		 */
48 		virtual int execute(uint32_t ts, intrusive_list<Task> &work_queue) = 0;
49 
50 		virtual bool isFinished() const = 0;
51 	};
52 
53 	typedef typename intrusive_list<Task>::iterator TaskIterator;
54 
55 	/*! \brief Class representing the original task and all subtasks it created during execution*/
56 	class Job {
57 	public:
Job(uint32_t id,const std::string & description)58 		Job(uint32_t id, const std::string &description) :
59 		    id_(id), description_(description),
60 		    finish_callback_(), tasks_() {
61 		}
62 
Job(Job && other)63 		Job(Job &&other) : id_(std::move(other.id_)),
64 				   description_(std::move(other.description_)),
65 				   finish_callback_(std::move(other.finish_callback_)),
66 				   tasks_(std::move(other.tasks_)) {
67 		}
68 
~Job()69 		~Job() {
70 			tasks_.clear_and_dispose([](Task *ptr) { delete ptr; });
71 		}
72 
73 		void finalize(int status);
74 
75 		/*! \brief Function finalizes processing of single task.
76 		 * \param itask iterator to a task that was executed.
77 		 * \param status indicates whether task execution was successful.
78 		 */
79 		void finalizeTask(TaskIterator itask, int status);
80 
81 		/*! \brief Function executes and finalizes the first task from the list of tasks.
82 		 * \param ts current time stamp.
83 		 */
84 		void processTask(uint32_t ts);
85 
setFinishCallback(const std::function<void (int)> & finish_callback)86 		void setFinishCallback(const std::function<void(int)> &finish_callback) {
87 			finish_callback_ = finish_callback;
88 		}
89 
setFinishCallback(std::function<void (int)> && finish_callback)90 		void setFinishCallback(std::function<void(int)> &&finish_callback) {
91 			finish_callback_ = std::move(finish_callback);
92 		}
93 
isFinished()94 		bool isFinished() const {
95 			return tasks_.empty();
96 		}
97 
addTask(Task * task)98 		void addTask(Task *task) {
99 			tasks_.push_back(*task);
100 		}
101 
getId()102 		uint32_t getId() {
103 			return id_;
104 		}
105 
106 		JobInfo getInfo() const;
107 
108 	private:
109 		uint32_t id_;
110 		std::string description_;
111 		std::function<void(int)> finish_callback_; /*!< Callback function called when all tasks
112 		                                                that belong to this Job are done. */
113 
114 		intrusive_list<Task> tasks_; /*!< List of tasks that belong to this Job*/
115 	};
116 
117 	typedef typename std::list<Job> JobContainer;
118 	typedef typename JobContainer::iterator JobIterator;
119 	typedef typename std::vector<JobInfo> JobsInfoContainer;
120 
121 public:
TaskManager()122 	TaskManager() : job_list_(), next_job_id_(0) {
123 	}
124 
125 	/*! \brief Submit task to be enqueued and executed by TaskManager.
126 	 *
127 	 * Submitting task creates Job object which contains data of original
128 	 * task and all tasks that were created during execution.
129 	 * \param job_id id of the created Job
130 	 * \param ts current time stamp.
131 	 * \param initial_batch_size initial number of tasks to be processed
132 	 *                           before putting the Job on the list
133 	 * \param task Task to be submitted.
134 	 * \param callback Callback function that will be called when all the
135 	 *                 tasks related to this sumbmission are finished.
136 	 *                 This function is called only if submitted work is not
137 	 *                 finished after completing the 'initial_batch_size'
138 	 *                 number of tasks.
139 	 * \return Value representing the status.
140 	 */
141 	int submitTask(uint32_t job_id, uint32_t ts, int initial_batch_size, Task *task,
142 		       const std::string &description, const std::function<void(int)> &callback);
143 
144 	/*! \brief Submit task to be enqueued and executed by TaskManager.
145 	 *
146 	 * Calls submitTask declared above without specifying job_id.
147 	 * This version is used by Tasks which do not support cancelling
148 	 * their execution.
149 	 */
150 	int submitTask(uint32_t ts, int initial_batch_size, Task *task,
151 		       const std::string &description, const std::function<void(int)> &callback);
152 
153 	/*! \brief Iterate over Jobs and execute tasks.
154 	 *
155 	 * This function goes through the list of Jobs over and over again,
156 	 * executing one task each time it processes a Job.
157 	 * \param ts current time stamp.
158 	 * \param number_of_tasks maximum number of tasks to be processed.
159 	 */
160 	void processJobs(uint32_t ts, int number_of_tasks);
161 
162 	/*! \brief Get information about all currently executed Job. */
163 	JobsInfoContainer getCurrentJobsInfo() const;
164 
165 	/*! \brief Stop execution of a Job specified by given id. */
166 	bool cancelJob(uint32_t job_id);
167 
workAvailable()168 	bool workAvailable() const {
169 		return !job_list_.empty();
170 	}
171 
reserveJobId()172 	uint32_t reserveJobId() {
173 		return next_job_id_++;
174 	}
175 private:
176 	JobContainer job_list_; /*!< List with Jobs to execute. */
177 	uint32_t next_job_id_;
178 };
179