1 /* 2 Copyright 2016-2017 Skytechnology sp. z o.o. 3 4 This file is part of LizardFS. 5 6 LizardFS is free software: you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation, version 3. 9 10 LizardFS is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>. 17 */ 18 19 #pragma once 20 21 #include "common/platform.h" 22 23 #include <functional> 24 #include <list> 25 #include <memory> 26 #include <string> 27 28 #include "common/intrusive_list.h" 29 #include "common/job_info.h" 30 31 /*! \brief Implementation of class for storing and executing tasks 32 * 33 * This class is responsible for managing execution of tasks. 34 * Submitting a task creates a new Job object. Job represents 35 * the task itself + all subtasks it creates during its execution. 36 */ 37 class TaskManager { 38 public: 39 /*! \brief Class representing a single task to be executed by Task Manager*/ 40 class Task : public intrusive_list_base_hook { 41 public: ~Task()42 virtual ~Task() { 43 } 44 /*! \brief Pure virtual function that represents execution of task. 45 * \param ts current time stamp. 46 * \param work_queue a list to which this task adds newly created tasks. 47 */ 48 virtual int execute(uint32_t ts, intrusive_list<Task> &work_queue) = 0; 49 50 virtual bool isFinished() const = 0; 51 }; 52 53 typedef typename intrusive_list<Task>::iterator TaskIterator; 54 55 /*! \brief Class representing the original task and all subtasks it created during execution*/ 56 class Job { 57 public: Job(uint32_t id,const std::string & description)58 Job(uint32_t id, const std::string &description) : 59 id_(id), description_(description), 60 finish_callback_(), tasks_() { 61 } 62 Job(Job && other)63 Job(Job &&other) : id_(std::move(other.id_)), 64 description_(std::move(other.description_)), 65 finish_callback_(std::move(other.finish_callback_)), 66 tasks_(std::move(other.tasks_)) { 67 } 68 ~Job()69 ~Job() { 70 tasks_.clear_and_dispose([](Task *ptr) { delete ptr; }); 71 } 72 73 void finalize(int status); 74 75 /*! \brief Function finalizes processing of single task. 76 * \param itask iterator to a task that was executed. 77 * \param status indicates whether task execution was successful. 78 */ 79 void finalizeTask(TaskIterator itask, int status); 80 81 /*! \brief Function executes and finalizes the first task from the list of tasks. 82 * \param ts current time stamp. 83 */ 84 void processTask(uint32_t ts); 85 setFinishCallback(const std::function<void (int)> & finish_callback)86 void setFinishCallback(const std::function<void(int)> &finish_callback) { 87 finish_callback_ = finish_callback; 88 } 89 setFinishCallback(std::function<void (int)> && finish_callback)90 void setFinishCallback(std::function<void(int)> &&finish_callback) { 91 finish_callback_ = std::move(finish_callback); 92 } 93 isFinished()94 bool isFinished() const { 95 return tasks_.empty(); 96 } 97 addTask(Task * task)98 void addTask(Task *task) { 99 tasks_.push_back(*task); 100 } 101 getId()102 uint32_t getId() { 103 return id_; 104 } 105 106 JobInfo getInfo() const; 107 108 private: 109 uint32_t id_; 110 std::string description_; 111 std::function<void(int)> finish_callback_; /*!< Callback function called when all tasks 112 that belong to this Job are done. */ 113 114 intrusive_list<Task> tasks_; /*!< List of tasks that belong to this Job*/ 115 }; 116 117 typedef typename std::list<Job> JobContainer; 118 typedef typename JobContainer::iterator JobIterator; 119 typedef typename std::vector<JobInfo> JobsInfoContainer; 120 121 public: TaskManager()122 TaskManager() : job_list_(), next_job_id_(0) { 123 } 124 125 /*! \brief Submit task to be enqueued and executed by TaskManager. 126 * 127 * Submitting task creates Job object which contains data of original 128 * task and all tasks that were created during execution. 129 * \param job_id id of the created Job 130 * \param ts current time stamp. 131 * \param initial_batch_size initial number of tasks to be processed 132 * before putting the Job on the list 133 * \param task Task to be submitted. 134 * \param callback Callback function that will be called when all the 135 * tasks related to this sumbmission are finished. 136 * This function is called only if submitted work is not 137 * finished after completing the 'initial_batch_size' 138 * number of tasks. 139 * \return Value representing the status. 140 */ 141 int submitTask(uint32_t job_id, uint32_t ts, int initial_batch_size, Task *task, 142 const std::string &description, const std::function<void(int)> &callback); 143 144 /*! \brief Submit task to be enqueued and executed by TaskManager. 145 * 146 * Calls submitTask declared above without specifying job_id. 147 * This version is used by Tasks which do not support cancelling 148 * their execution. 149 */ 150 int submitTask(uint32_t ts, int initial_batch_size, Task *task, 151 const std::string &description, const std::function<void(int)> &callback); 152 153 /*! \brief Iterate over Jobs and execute tasks. 154 * 155 * This function goes through the list of Jobs over and over again, 156 * executing one task each time it processes a Job. 157 * \param ts current time stamp. 158 * \param number_of_tasks maximum number of tasks to be processed. 159 */ 160 void processJobs(uint32_t ts, int number_of_tasks); 161 162 /*! \brief Get information about all currently executed Job. */ 163 JobsInfoContainer getCurrentJobsInfo() const; 164 165 /*! \brief Stop execution of a Job specified by given id. */ 166 bool cancelJob(uint32_t job_id); 167 workAvailable()168 bool workAvailable() const { 169 return !job_list_.empty(); 170 } 171 reserveJobId()172 uint32_t reserveJobId() { 173 return next_job_id_++; 174 } 175 private: 176 JobContainer job_list_; /*!< List with Jobs to execute. */ 177 uint32_t next_job_id_; 178 }; 179