1 // Copyright (c) 2018 IIS (The Internet Foundation in Sweden) 2 // Written by Göran Andersson <initgoran@gmail.com> 3 4 // This class manages Task objects and their timers. It also manages 5 // all network connections through an Engine object. The Engine is an 6 // "inner event loop" used to manage low-level network events. 7 // 8 // Your code must create an EventLoop object, add one or more Task objects 9 // to it, and then run the event loop, either "forever" using the method 10 // runUntilComplete() 11 // or by regularly calling the method 12 // run(double timeout_s) 13 // 14 15 #pragma once 16 17 #include <set> 18 #include <map> 19 20 #include "logger.h" 21 #include "engine.h" 22 #ifdef USE_THREADS 23 #include "msgqueue.h" 24 #endif 25 26 class Task; 27 class SocketConnection; 28 class ServerSocket; 29 class WorkerProcess; 30 31 class EventLoop : public Logger { 32 public: 33 EventLoop(std::string log_label = "MainLoop") : Logger(log_label)34 Logger(log_label), 35 engine("NetworkEngine"), 36 name(log_label) { 37 #ifdef USE_THREADS 38 do_init(nullptr); 39 #else 40 do_init(); 41 #endif 42 } 43 44 void removeAllTasks(); 45 46 ~EventLoop(); 47 48 // Add a task to be managed by the EventLoop. The value of the task 49 // parameter must be an object of a subclass of Task. You _must_ create 50 // the object with new; it cannot be an object on the stack. 51 // The EventLoop will take ownership of the object and will delete it 52 // when the task has finished. Before it is deleted, the parent's 53 // taskFinished method will be called unless the parent is nullptr. 54 void addTask(Task *task, Task *parent = nullptr); 55 56 // Run for at most timeout_s seconds. 57 // Returns false if all done, otherwise true: 58 bool run(double timeout_s); 59 60 // Run until all task are done. 61 void runUntilComplete(); 62 63 #ifdef USE_THREADS 64 // Creates an EventLoop object that runs the task until it's finished. 65 // You cannot use this if you have created your own EventLoop object. 66 67 // The "parent" parameter is used if the main thread (the parent) should 68 // be notified when the thread running the task is finished. 69 static void runTask(Task *task, const std::string &name = "MainLoop", 70 std::ostream *log_file = nullptr, 71 EventLoop *parent = nullptr); 72 #else 73 static void runTask(Task *task, const std::string &name = "MainLoop", 74 std::ostream *log_file = nullptr); 75 #endif 76 77 void waitForThreadsToFinish(); 78 79 // Remove the given task 80 void abortTask(Task *task); 81 82 // Remove all tasks abort()83 void abort() { 84 interrupt(); 85 do_abort = true; 86 } 87 88 // Get all tasks ith the given parent 89 void getChildTasks(std::set<Task *> &tset, Task *parent) const; 90 91 void abortChildTasks(Task *parent); 92 93 // Restart idle connections: 94 void wakeUpTask(Task *t); 95 wakeUpConnection(SocketConnection * s)96 bool wakeUpConnection(SocketConnection *s) { 97 return engine.wakeUpConnection(s); 98 } 99 cancelConnection(SocketConnection * s)100 void cancelConnection(SocketConnection *s) { 101 engine.cancelConnection(s); 102 } 103 104 std::set<Socket *> findConnByTask(const Task *t) const; 105 106 // Return true if conn still exists: isActive(const Socket * conn)107 bool isActive(const Socket *conn) const { 108 return engine.connActive(conn); 109 } 110 111 // Remove previous timer, run after s seconds instead. 112 // If s = 0, run timer immediately. If s < 0, remove timer. 113 void resetTimer(Task *task, double s); 114 115 // Create a new socket connection, and add it to the loop. 116 // Returns false (and deletes conn) on failure. 117 // On success, returns true and calls connAdded on owner task. 118 // A connection to the server will be initiated. When connected, the 119 // connected() method will be called on conn to get initial state. 120 bool addConnection(SocketConnection *conn); 121 122 // Use this if conn contains a socket that has already been connected. 123 // Returns false (and deletes conn) on failure. 124 // On success, returns true and calls connAdded on owner task, 125 // then calls connected() on conn to get initial state. 126 bool addConnected(SocketConnection *conn); 127 128 // Returns false (and deletes conn) on failure. 129 // On success, returns true and calls serverAdded on owner task. 130 bool addServer(ServerSocket *conn); 131 #ifdef USE_GNUTLS tlsSetKey(ServerSocket * conn,const std::string & crt_path,const std::string & key_path,const std::string & password)132 bool tlsSetKey(ServerSocket *conn, const std::string &crt_path, 133 const std::string &key_path, const std::string &password) { 134 return engine.tlsSetKey(conn, crt_path, key_path, password); 135 } setCABundle(const std::string & path)136 bool setCABundle(const std::string &path) { 137 return engine.setCABundle(path); 138 } 139 #endif 140 141 // Returns true if task is running 142 bool running(Task *task); 143 144 void taskDeleted(Task *task); 145 146 // Call this to make the network engine yield control 147 // to me (the task supervisor): interrupt()148 static void interrupt() { 149 Engine::yield(); 150 } 151 152 #ifdef USE_THREADS 153 // Create a new thread and run task in its own loop in that thread. 154 void spawnThread(Task *task, const std::string &name="ThreadLoop", 155 std::ostream *log_file = nullptr, 156 Task *parent = nullptr); 157 void waitForThreadsToComplete(); 158 #endif 159 #ifdef _WIN32 externalCommand(Task * owner,const char * const argv[])160 int externalCommand(Task *owner, const char *const argv[]) { 161 // Not implemented 162 exit(1); 163 } 164 #else 165 // Asynchronously execute an external command. 166 // Return false on immediate failure. 167 int externalCommand(Task *owner, const char *const argv[]); 168 static void daemonize(); 169 170 // Create a child process. Return child's PID. Channels can be 171 // used to pass sockets and messages between parent and child. 172 WorkerProcess *createWorker(Task *parent, std::ostream *log_file, 173 unsigned int channels, unsigned int wno); 174 175 WorkerProcess *createWorker(Task *parent, const std::string &log_file_name, 176 unsigned int channels, unsigned int wno); 177 178 // Send signal to all child processes 179 void killChildProcesses(int signum); 180 setLogFilename(const std::string & filename)181 static void setLogFilename(const std::string &filename) { 182 openFileOnSIGHUP = filename; 183 } 184 #endif 185 notifyTaskFinished(Task * task)186 void notifyTaskFinished(Task *task) { 187 auto ret = finishedTasks.insert(task); 188 if (ret.second) 189 engine.yield(); 190 } notifyTaskMessage(Task * task)191 void notifyTaskMessage(Task *task) { 192 auto ret = messageTasks.insert(task); 193 if (ret.second) 194 engine.yield(); 195 } 196 aborted()197 bool aborted() const { 198 return do_abort; 199 } 200 201 void addSignalHandler(int signum, void (*handler)(int, EventLoop &)); 202 static void signalHandler(int signum); 203 204 // Enable events from task "from" to task "to". I.e. "from" will be able to 205 // call executeHandler with "to" as a parameter. If "to" dies before 206 // "from", "from" will be notified through a call to taskFinished. 207 // Will return false unless both tasks still exist. 208 bool startObserving(Task *from, Task *to); 209 210 // Return true if observer is observing task. isObserving(Task * observer,Task * task)211 bool isObserving(Task *observer, Task *task) const { 212 auto p = observed_by.find(observer); 213 return p != observed_by.end() && 214 p->second.find(task) != p->second.end(); 215 } 216 217 private: 218 #ifndef _WIN32 219 #ifdef USE_THREADS 220 thread_local 221 #endif 222 static std::map<int, int> terminatedPIDs; 223 std::map<int, Task *> pidOwner; 224 #ifdef USE_THREADS 225 thread_local 226 #endif 227 static std::string openFileOnSIGHUP; 228 #endif 229 #ifdef USE_THREADS 230 void do_init(EventLoop *parent); 231 EventLoop(std::string log_label,EventLoop * parent)232 EventLoop(std::string log_label, EventLoop *parent) : 233 Logger(log_label), 234 engine("NetworkEngine"), 235 name(log_label) { 236 do_init(parent); 237 } 238 std::map<Task *, std::thread> threads; 239 static MsgQueue<Task *> finished_threads; 240 std::map<Task *, Task *> threadTaskObserver; 241 void collect_thread(Task *t); 242 EventLoop *parent_loop; 243 #else 244 void do_init(); 245 #endif 246 247 #ifdef USE_THREADS 248 thread_local 249 #endif 250 static volatile int got_signal; 251 #ifdef USE_THREADS 252 thread_local 253 #endif 254 static volatile int terminatedPIDtmp[100]; 255 256 void check_finished(); 257 Task *nextTimerToExecute(); 258 void _removeTimer(Task *task); 259 void _removeTask(Task *task, bool killed = false); 260 Engine engine; 261 262 // Map each task to its parent (or, if it has no parent, to nullptr) 263 std::map<Task *, Task *> tasks; 264 265 // These are used to keep track of "observation" between tasks. 266 // E.g. a parent task is observing its children. 267 std::map<Task *, std::set<Task *> > observed_by; 268 std::map<Task *, std::set<Task *> > observing; 269 270 std::set<Task *> finishedTasks, messageTasks; 271 std::multimap<int, void (*)(int, EventLoop &)> userSignalHandler; 272 std::multimap<TimePoint, Task *> timer_queue; 273 std::string name; 274 bool do_abort = false; 275 }; 276