1 // Copyright (c) 2018 IIS (The Internet Foundation in Sweden)
2 // Written by Göran Andersson <initgoran@gmail.com>
3 
4 // This class manages Task objects and their timers. It also manages
5 // all network connections through an Engine object. The Engine is an
6 // "inner event loop" used to manage low-level network events.
7 //
8 // Your code must create an EventLoop object, add one or more Task objects
9 // to it, and then run the event loop, either "forever" using the method
10 //     runUntilComplete()
11 // or by regularly calling the method
12 //     run(double timeout_s)
13 //
14 
15 #pragma once
16 
17 #include <set>
18 #include <map>
19 
20 #include "logger.h"
21 #include "engine.h"
22 #ifdef USE_THREADS
23 #include "msgqueue.h"
24 #endif
25 
26 class Task;
27 class SocketConnection;
28 class ServerSocket;
29 class WorkerProcess;
30 
31 class EventLoop : public Logger {
32 public:
33     EventLoop(std::string log_label = "MainLoop") :
Logger(log_label)34         Logger(log_label),
35         engine("NetworkEngine"),
36         name(log_label) {
37 #ifdef USE_THREADS
38         do_init(nullptr);
39 #else
40         do_init();
41 #endif
42     }
43 
44     void removeAllTasks();
45 
46     ~EventLoop();
47 
48     // Add a task to be managed by the EventLoop. The value of the task
49     // parameter must be an object of a subclass of Task. You _must_ create
50     // the object with new; it cannot be an object on the stack.
51     // The EventLoop will take ownership of the object and will delete it
52     // when the task has finished. Before it is deleted, the parent's
53     // taskFinished method will be called unless the parent is nullptr.
54     void addTask(Task *task, Task *parent = nullptr);
55 
56     // Run for at most timeout_s seconds.
57     // Returns false if all done, otherwise true:
58     bool run(double timeout_s);
59 
60     // Run until all task are done.
61     void runUntilComplete();
62 
63 #ifdef USE_THREADS
64     // Creates an EventLoop object that runs the task until it's finished.
65     // You cannot use this if you have created your own EventLoop object.
66 
67     // The "parent" parameter is used if the main thread (the parent) should
68     // be notified when the thread running the task is finished.
69     static void runTask(Task *task, const std::string &name = "MainLoop",
70                         std::ostream *log_file = nullptr,
71                         EventLoop *parent = nullptr);
72 #else
73     static void runTask(Task *task, const std::string &name = "MainLoop",
74                         std::ostream *log_file = nullptr);
75 #endif
76 
77     void waitForThreadsToFinish();
78 
79     // Remove the given task
80     void abortTask(Task *task);
81 
82     // Remove all tasks
abort()83     void abort() {
84         interrupt();
85         do_abort = true;
86     }
87 
88     // Get all tasks ith the given parent
89     void getChildTasks(std::set<Task *> &tset, Task *parent) const;
90 
91     void abortChildTasks(Task *parent);
92 
93     // Restart idle connections:
94     void wakeUpTask(Task *t);
95 
wakeUpConnection(SocketConnection * s)96     bool wakeUpConnection(SocketConnection *s) {
97         return engine.wakeUpConnection(s);
98     }
99 
cancelConnection(SocketConnection * s)100     void cancelConnection(SocketConnection *s) {
101         engine.cancelConnection(s);
102     }
103 
104     std::set<Socket *> findConnByTask(const Task *t) const;
105 
106     // Return true if conn still exists:
isActive(const Socket * conn)107     bool isActive(const Socket *conn) const {
108         return engine.connActive(conn);
109     }
110 
111     // Remove previous timer, run after s seconds instead.
112     // If s = 0, run timer immediately. If s < 0, remove timer.
113     void resetTimer(Task *task, double s);
114 
115     // Create a new socket connection, and add it to the loop.
116     // Returns false (and deletes conn) on failure.
117     // On success, returns true and calls connAdded on owner task.
118     // A connection to the server will be initiated. When connected, the
119     // connected() method will be called on conn to get initial state.
120     bool addConnection(SocketConnection *conn);
121 
122     // Use this if conn contains a socket that has already been connected.
123     // Returns false (and deletes conn) on failure.
124     // On success, returns true and calls connAdded on owner task,
125     // then calls connected() on conn to get initial state.
126     bool addConnected(SocketConnection *conn);
127 
128     // Returns false (and deletes conn) on failure.
129     // On success, returns true and calls serverAdded on owner task.
130     bool addServer(ServerSocket *conn);
131 #ifdef USE_GNUTLS
tlsSetKey(ServerSocket * conn,const std::string & crt_path,const std::string & key_path,const std::string & password)132     bool tlsSetKey(ServerSocket *conn, const std::string &crt_path,
133                    const std::string &key_path, const std::string &password) {
134         return engine.tlsSetKey(conn, crt_path, key_path, password);
135     }
setCABundle(const std::string & path)136     bool setCABundle(const std::string &path) {
137         return engine.setCABundle(path);
138     }
139 #endif
140 
141     // Returns true if task is running
142     bool running(Task *task);
143 
144     void taskDeleted(Task *task);
145 
146     // Call this to make the network engine yield control
147     // to me (the task supervisor):
interrupt()148     static void interrupt() {
149         Engine::yield();
150     }
151 
152 #ifdef USE_THREADS
153     // Create a new thread and run task in its own loop in that thread.
154     void spawnThread(Task *task, const std::string &name="ThreadLoop",
155                      std::ostream *log_file = nullptr,
156                      Task *parent = nullptr);
157     void waitForThreadsToComplete();
158 #endif
159 #ifdef _WIN32
externalCommand(Task * owner,const char * const argv[])160     int externalCommand(Task *owner, const char *const argv[]) {
161         // Not implemented
162         exit(1);
163     }
164 #else
165     // Asynchronously execute an external command.
166     // Return false on immediate failure.
167     int externalCommand(Task *owner, const char *const argv[]);
168     static void daemonize();
169 
170     // Create a child process. Return child's PID. Channels can be
171     // used to pass sockets and messages between parent and child.
172     WorkerProcess *createWorker(Task *parent, std::ostream *log_file,
173                                 unsigned int channels, unsigned int wno);
174 
175     WorkerProcess *createWorker(Task *parent, const std::string &log_file_name,
176                                 unsigned int channels, unsigned int wno);
177 
178     // Send signal to all child processes
179     void killChildProcesses(int signum);
180 
setLogFilename(const std::string & filename)181     static void setLogFilename(const std::string &filename) {
182         openFileOnSIGHUP = filename;
183     }
184 #endif
185 
notifyTaskFinished(Task * task)186     void notifyTaskFinished(Task *task) {
187         auto ret = finishedTasks.insert(task);
188         if (ret.second)
189             engine.yield();
190     }
notifyTaskMessage(Task * task)191     void notifyTaskMessage(Task *task) {
192         auto ret = messageTasks.insert(task);
193         if (ret.second)
194             engine.yield();
195     }
196 
aborted()197     bool aborted() const {
198         return do_abort;
199     }
200 
201     void addSignalHandler(int signum, void (*handler)(int, EventLoop &));
202     static void signalHandler(int signum);
203 
204     // Enable events from task "from" to task "to". I.e. "from" will be able to
205     // call executeHandler with "to" as a parameter. If  "to" dies before
206     // "from", "from" will be notified through a call to taskFinished.
207     // Will return false unless both tasks still exist.
208     bool startObserving(Task *from, Task *to);
209 
210     // Return true if observer is observing task.
isObserving(Task * observer,Task * task)211     bool isObserving(Task *observer, Task *task) const {
212         auto p = observed_by.find(observer);
213         return p != observed_by.end() &&
214             p->second.find(task) != p->second.end();
215     }
216 
217 private:
218 #ifndef _WIN32
219 #ifdef USE_THREADS
220     thread_local
221 #endif
222     static std::map<int, int> terminatedPIDs;
223     std::map<int, Task *> pidOwner;
224 #ifdef USE_THREADS
225     thread_local
226 #endif
227     static std::string openFileOnSIGHUP;
228 #endif
229 #ifdef USE_THREADS
230     void do_init(EventLoop *parent);
231 
EventLoop(std::string log_label,EventLoop * parent)232     EventLoop(std::string log_label, EventLoop *parent) :
233         Logger(log_label),
234         engine("NetworkEngine"),
235         name(log_label) {
236         do_init(parent);
237     }
238     std::map<Task *, std::thread> threads;
239     static MsgQueue<Task *> finished_threads;
240     std::map<Task *, Task *> threadTaskObserver;
241     void collect_thread(Task *t);
242     EventLoop *parent_loop;
243 #else
244     void do_init();
245 #endif
246 
247 #ifdef USE_THREADS
248     thread_local
249 #endif
250     static volatile int got_signal;
251 #ifdef USE_THREADS
252     thread_local
253 #endif
254     static volatile int terminatedPIDtmp[100];
255 
256     void check_finished();
257     Task *nextTimerToExecute();
258     void _removeTimer(Task *task);
259     void _removeTask(Task *task, bool killed = false);
260     Engine engine;
261 
262     // Map each task to its parent (or, if it has no parent, to nullptr)
263     std::map<Task *, Task *> tasks;
264 
265     // These are used to keep track of "observation" between tasks.
266     // E.g. a parent task is observing its children.
267     std::map<Task *, std::set<Task *> > observed_by;
268     std::map<Task *, std::set<Task *> > observing;
269 
270     std::set<Task *> finishedTasks, messageTasks;
271     std::multimap<int, void (*)(int, EventLoop &)> userSignalHandler;
272     std::multimap<TimePoint, Task *> timer_queue;
273     std::string name;
274     bool do_abort = false;
275 };
276