1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #ifndef EVENTQUEUE_H
4 #define EVENTQUEUE_H
5 
6 #include "remote/httphandler.hpp"
7 #include "base/object.hpp"
8 #include "config/expression.hpp"
9 #include <boost/asio/deadline_timer.hpp>
10 #include <boost/asio/spawn.hpp>
11 #include <condition_variable>
12 #include <cstddef>
13 #include <cstdint>
14 #include <mutex>
15 #include <set>
16 #include <map>
17 #include <deque>
18 #include <queue>
19 
20 namespace icinga
21 {
22 
23 class EventQueue final : public Object
24 {
25 public:
26 	DECLARE_PTR_TYPEDEFS(EventQueue);
27 
28 	EventQueue(String name);
29 
30 	bool CanProcessEvent(const String& type) const;
31 	void ProcessEvent(const Dictionary::Ptr& event);
32 	void AddClient(void *client);
33 	void RemoveClient(void *client);
34 
35 	void SetTypes(const std::set<String>& types);
36 	void SetFilter(std::unique_ptr<Expression> filter);
37 
38 	Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
39 
40 	static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
41 	static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
42 
43 	static EventQueue::Ptr GetByName(const String& name);
44 	static void Register(const String& name, const EventQueue::Ptr& function);
45 	static void Unregister(const String& name);
46 
47 private:
48 	String m_Name;
49 
50 	mutable std::mutex m_Mutex;
51 	std::condition_variable m_CV;
52 
53 	std::set<String> m_Types;
54 	std::unique_ptr<Expression> m_Filter;
55 
56 	std::map<void *, std::deque<Dictionary::Ptr> > m_Events;
57 };
58 
59 /**
60  * A registry for API event queues.
61  *
62  * @ingroup base
63  */
64 class EventQueueRegistry : public Registry<EventQueueRegistry, EventQueue::Ptr>
65 {
66 public:
67 	static EventQueueRegistry *GetInstance();
68 };
69 
70 enum class EventType : uint_fast8_t
71 {
72 	AcknowledgementCleared,
73 	AcknowledgementSet,
74 	CheckResult,
75 	CommentAdded,
76 	CommentRemoved,
77 	DowntimeAdded,
78 	DowntimeRemoved,
79 	DowntimeStarted,
80 	DowntimeTriggered,
81 	Flapping,
82 	Notification,
83 	StateChange,
84 	ObjectCreated,
85 	ObjectDeleted,
86 	ObjectModified
87 };
88 
89 class EventsInbox : public Object
90 {
91 public:
92 	DECLARE_PTR_TYPEDEFS(EventsInbox);
93 
94 	EventsInbox(String filter, const String& filterSource);
95 	EventsInbox(const EventsInbox&) = delete;
96 	EventsInbox(EventsInbox&&) = delete;
97 	EventsInbox& operator=(const EventsInbox&) = delete;
98 	EventsInbox& operator=(EventsInbox&&) = delete;
99 	~EventsInbox();
100 
101 	const Expression::Ptr& GetFilter();
102 
103 	void Push(Dictionary::Ptr event);
104 	Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
105 
106 private:
107 	struct Filter
108 	{
109 		std::size_t Refs;
110 		Expression::Ptr Expr;
111 	};
112 
113 	static std::mutex m_FiltersMutex;
114 	static std::map<String, Filter> m_Filters;
115 
116 	std::mutex m_Mutex;
117 	decltype(m_Filters.begin()) m_Filter;
118 	std::queue<Dictionary::Ptr> m_Queue;
119 	boost::asio::deadline_timer m_Timer;
120 };
121 
122 class EventsSubscriber
123 {
124 public:
125 	EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
126 	EventsSubscriber(const EventsSubscriber&) = delete;
127 	EventsSubscriber(EventsSubscriber&&) = delete;
128 	EventsSubscriber& operator=(const EventsSubscriber&) = delete;
129 	EventsSubscriber& operator=(EventsSubscriber&&) = delete;
130 	~EventsSubscriber();
131 
132 	const EventsInbox::Ptr& GetInbox();
133 
134 private:
135 	std::set<EventType> m_Types;
136 	EventsInbox::Ptr m_Inbox;
137 };
138 
139 class EventsFilter
140 {
141 public:
142 	EventsFilter(std::map<Expression::Ptr, std::set<EventsInbox::Ptr>> inboxes);
143 
144 	operator bool();
145 
146 	void Push(Dictionary::Ptr event);
147 
148 private:
149 	std::map<Expression::Ptr, std::set<EventsInbox::Ptr>> m_Inboxes;
150 };
151 
152 class EventsRouter
153 {
154 public:
155 	static EventsRouter& GetInstance();
156 
157 	void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
158 	void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
159 	EventsFilter GetInboxes(EventType type);
160 
161 private:
162 	static EventsRouter m_Instance;
163 
164 	EventsRouter() = default;
165 	EventsRouter(const EventsRouter&) = delete;
166 	EventsRouter(EventsRouter&&) = delete;
167 	EventsRouter& operator=(const EventsRouter&) = delete;
168 	EventsRouter& operator=(EventsRouter&&) = delete;
169 	~EventsRouter() = default;
170 
171 	std::mutex m_Mutex;
172 	std::map<EventType, std::map<Expression::Ptr, std::set<EventsInbox::Ptr>>> m_Subscribers;
173 };
174 
175 }
176 
177 #endif /* EVENTQUEUE_H */
178