1 // This file is part of Desktop App Toolkit,
2 // a set of libraries for developing nice desktop applications.
3 //
4 // For license and copyright information please follow this link:
5 // https://github.com/desktop-app/legal/blob/master/LEGAL
6 //
7 #pragma once
8 
9 #include <vector>
10 #include <deque>
11 #include <rpl/producer.h>
12 #include "base/type_traits.h"
13 
14 namespace base {
15 namespace internal {
16 
17 using ObservableCallHandlers = Fn<void()>;
18 void RegisterPendingObservable(ObservableCallHandlers *handlers);
19 void UnregisterActiveObservable(ObservableCallHandlers *handlers);
20 void UnregisterObservable(ObservableCallHandlers *handlers);
21 
22 template <typename EventType>
23 struct SubscriptionHandlerHelper {
24 	using type = Fn<void(parameter_type<EventType>)>;
25 };
26 
27 template <>
28 struct SubscriptionHandlerHelper<void> {
29 	using type = Fn<void()>;
30 };
31 
32 template <typename EventType>
33 using SubscriptionHandler = typename SubscriptionHandlerHelper<EventType>::type;
34 
35 class BaseObservableData {
36 };
37 
38 template <typename EventType, typename Handler>
39 class CommonObservableData;
40 
41 template <typename EventType, typename Handler>
42 class ObservableData;
43 
44 } // namespace internal
45 
46 class Subscription {
47 public:
48 	Subscription() = default;
49 	Subscription(const Subscription &) = delete;
50 	Subscription &operator=(const Subscription &) = delete;
51 	Subscription(Subscription &&other) : _node(base::take(other._node)), _removeAndDestroyMethod(other._removeAndDestroyMethod) {
52 	}
53 	Subscription &operator=(Subscription &&other) {
54 		qSwap(_node, other._node);
55 		qSwap(_removeAndDestroyMethod, other._removeAndDestroyMethod);
56 		return *this;
57 	}
58 	explicit operator bool() const {
59 		return (_node != nullptr);
60 	}
61 	void destroy() {
62 		if (_node) {
63 			(*_removeAndDestroyMethod)(base::take(_node));
64 		}
65 	}
66 	~Subscription() {
67 		destroy();
68 	}
69 
70 private:
71 	struct Node {
72 		Node(const std::shared_ptr<internal::BaseObservableData> &observable)
73 		: observable(observable) {
74 		}
75 		Node *next = nullptr;
76 		Node *prev = nullptr;
77 		std::weak_ptr<internal::BaseObservableData> observable;
78 	};
79 	using RemoveAndDestroyMethod = void(*)(Node*);
80 	Subscription(Node *node, RemoveAndDestroyMethod removeAndDestroyMethod)
81 	: _node(node)
82 	, _removeAndDestroyMethod(removeAndDestroyMethod) {
83 	}
84 
85 	Node *_node = nullptr;
86 	RemoveAndDestroyMethod _removeAndDestroyMethod;
87 
88 	template <typename EventType, typename Handler>
89 	friend class internal::CommonObservableData;
90 
91 	template <typename EventType, typename Handler>
92 	friend class internal::ObservableData;
93 
94 };
95 
96 namespace internal {
97 
98 template <typename EventType, typename Handler, bool EventTypeIsSimple>
99 class BaseObservable;
100 
101 template <typename EventType, typename Handler>
102 class CommonObservable {
103 public:
104 	Subscription add_subscription(Handler &&handler) {
105 		if (!_data) {
106 			_data = std::make_shared<ObservableData<EventType, Handler>>(this);
107 		}
108 		return _data->append(std::move(handler));
109 	}
110 
111 private:
112 	std::shared_ptr<ObservableData<EventType, Handler>> _data;
113 
114 	friend class CommonObservableData<EventType, Handler>;
115 	friend class BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value>;
116 
117 };
118 
119 template <typename EventType, typename Handler>
120 class BaseObservable<EventType, Handler, true> : public internal::CommonObservable<EventType, Handler> {
121 public:
122 	void notify(EventType event, bool sync = false) {
123 		if (this->_data) {
124 			this->_data->notify(std::move(event), sync);
125 		}
126 	}
127 
128 };
129 
130 template <typename EventType, typename Handler>
131 class BaseObservable<EventType, Handler, false> : public internal::CommonObservable<EventType, Handler> {
132 public:
133 	void notify(EventType &&event, bool sync = false) {
134 		if (this->_data) {
135 			this->_data->notify(std::move(event), sync);
136 		}
137 	}
138 	void notify(const EventType &event, bool sync = false) {
139 		if (this->_data) {
140 			auto event_copy = event;
141 			this->_data->notify(std::move(event_copy), sync);
142 		}
143 	}
144 
145 };
146 
147 } // namespace internal
148 
149 namespace internal {
150 
151 template <typename EventType, typename Handler>
152 class CommonObservableData : public BaseObservableData {
153 public:
154 	CommonObservableData(CommonObservable<EventType, Handler> *observable) : _observable(observable) {
155 	}
156 
157 	Subscription append(Handler &&handler) {
158 		auto node = new Node(_observable->_data, std::move(handler));
159 		if (_begin) {
160 			_end->next = node;
161 			node->prev = _end;
162 			_end = node;
163 		} else {
164 			_begin = _end = node;
165 		}
166 		return { _end, &CommonObservableData::removeAndDestroyNode };
167 	}
168 
169 	bool empty() const {
170 		return !_begin;
171 	}
172 
173 private:
174 	struct Node : public Subscription::Node {
175 		Node(
176 			const std::shared_ptr<BaseObservableData> &observer,
177 			Handler &&handler)
178 		: Subscription::Node(observer)
179 		, handler(std::move(handler)) {
180 		}
181 		Handler handler;
182 	};
183 
184 	void remove(Subscription::Node *node) {
185 		if (node->prev) {
186 			node->prev->next = node->next;
187 		}
188 		if (node->next) {
189 			node->next->prev = node->prev;
190 		}
191 		if (_begin == node) {
192 			_begin = static_cast<Node*>(node->next);
193 		}
194 		if (_end == node) {
195 			_end = static_cast<Node*>(node->prev);
196 		}
197 		if (_current == node) {
198 			_current = static_cast<Node*>(node->prev);
199 		} else if (!_begin) {
200 			_observable->_data.reset();
201 		}
202 	}
203 
204 	static void removeAndDestroyNode(Subscription::Node *node) {
205 		if (const auto that = node->observable.lock()) {
206 			static_cast<CommonObservableData*>(that.get())->remove(node);
207 		}
208 		delete static_cast<Node*>(node);
209 	}
210 
211 	template <typename CallCurrent>
212 	void notifyEnumerate(CallCurrent callCurrent) {
213 		_current = _begin;
214 		do {
215 			callCurrent();
216 			if (_current) {
217 				_current = static_cast<Node*>(_current->next);
218 			} else if (_begin) {
219 				_current = _begin;
220 			} else {
221 				break;
222 			}
223 		} while (_current);
224 	}
225 
226 	bool destroyMeIfEmpty() const {
227 		if (empty()) {
228 			_observable->_data.reset();
229 			return true;
230 		}
231 		return false;
232 	}
233 
234 	CommonObservable<EventType, Handler> *_observable = nullptr;
235 	Node *_begin = nullptr;
236 	Node *_current = nullptr;
237 	Node *_end = nullptr;
238 	ObservableCallHandlers _callHandlers;
239 
240 	friend class ObservableData<EventType, Handler>;
241 
242 };
243 
244 template <typename EventType, typename Handler>
245 class ObservableData : public CommonObservableData<EventType, Handler> {
246 public:
247 	using CommonObservableData<EventType, Handler>::CommonObservableData;
248 
249 	void notify(EventType &&event, bool sync) {
250 		if (_handling) {
251 			sync = false;
252 		}
253 		if (sync) {
254 			_events.push_back(std::move(event));
255 			callHandlers();
256 		} else {
257 			if (!this->_callHandlers) {
258 				this->_callHandlers = [this]() {
259 					callHandlers();
260 				};
261 			}
262 			if (_events.empty()) {
263 				RegisterPendingObservable(&this->_callHandlers);
264 			}
265 			_events.push_back(std::move(event));
266 		}
267 	}
268 
269 	~ObservableData() {
270 		UnregisterObservable(&this->_callHandlers);
271 	}
272 
273 private:
274 	void callHandlers() {
275 		_handling = true;
276 		auto events = base::take(_events);
277 		for (auto &event : events) {
278 			this->notifyEnumerate([this, &event]() {
279 				this->_current->handler(event);
280 			});
281 			if (this->destroyMeIfEmpty()) {
282 				return;
283 			}
284 		}
285 		_handling = false;
286 		UnregisterActiveObservable(&this->_callHandlers);
287 	}
288 
289 	std::deque<EventType> _events;
290 	bool _handling = false;
291 
292 };
293 
294 template <class Handler>
295 class ObservableData<void, Handler> : public CommonObservableData<void, Handler> {
296 public:
297 	using CommonObservableData<void, Handler>::CommonObservableData;
298 
299 	void notify(bool sync) {
300 		if (_handling) {
301 			sync = false;
302 		}
303 		if (sync) {
304 			++_eventsCount;
305 			callHandlers();
306 		} else {
307 			if (!this->_callHandlers) {
308 				this->_callHandlers = [this]() {
309 					callHandlers();
310 				};
311 			}
312 			if (!_eventsCount) {
313 				RegisterPendingObservable(&this->_callHandlers);
314 			}
315 			++_eventsCount;
316 		}
317 	}
318 
319 	~ObservableData() {
320 		UnregisterObservable(&this->_callHandlers);
321 	}
322 
323 private:
324 	void callHandlers() {
325 		_handling = true;
326 		auto eventsCount = base::take(_eventsCount);
327 		for (int i = 0; i != eventsCount; ++i) {
328 			this->notifyEnumerate([this]() {
329 				this->_current->handler();
330 			});
331 			if (this->destroyMeIfEmpty()) {
332 				return;
333 			}
334 		}
335 		_handling = false;
336 		UnregisterActiveObservable(&this->_callHandlers);
337 	}
338 
339 	int _eventsCount = 0;
340 	bool _handling = false;
341 
342 };
343 
344 template <typename Handler>
345 class BaseObservable<void, Handler, base::type_traits<void>::is_fast_copy_type::value> : public internal::CommonObservable<void, Handler> {
346 public:
347 	void notify(bool sync = false) {
348 		if (this->_data) {
349 			this->_data->notify(sync);
350 		}
351 	}
352 
353 };
354 
355 } // namespace internal
356 
357 template <typename EventType, typename Handler = internal::SubscriptionHandler<EventType>>
358 class Observable : public internal::BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value> {
359 public:
360 	Observable() = default;
361 	Observable(const Observable &other) = delete;
362 	Observable(Observable &&other) = default;
363 	Observable &operator=(const Observable &other) = delete;
364 	Observable &operator=(Observable &&other) = default;
365 
366 };
367 
368 template <typename Type>
369 class Variable {
370 public:
371 	Variable(parameter_type<Type> startValue = Type()) : _value(startValue) {
372 	}
373 	Variable(Variable &&other) = default;
374 	Variable &operator=(Variable &&other) = default;
375 
376 	parameter_type<Type> value() const {
377 		return _value;
378 	}
379 
380 	void setForced(parameter_type<Type> newValue, bool sync = false) {
381 		_value = newValue;
382 		changed().notify(_value, sync);
383 	}
384 
385 	void set(parameter_type<Type> newValue, bool sync = false) {
386 		if (_value != newValue) {
387 			setForced(newValue, sync);
388 		}
389 	}
390 
391 	template <typename Callback>
392 	void process(Callback callback, bool sync = false) {
393 		callback(_value);
394 		changed().notify(_value, sync);
395 	}
396 
397 	Observable<Type> &changed() const {
398 		return _changed;
399 	}
400 
401 private:
402 	Type _value;
403 	mutable Observable<Type> _changed;
404 
405 };
406 
407 class Subscriber {
408 protected:
409 	template <typename EventType, typename Handler, typename Lambda>
410 	int subscribe(base::Observable<EventType, Handler> &observable, Lambda &&handler) {
411 		_subscriptions.push_back(observable.add_subscription(std::forward<Lambda>(handler)));
412 		return _subscriptions.size();
413 	}
414 
415 	template <typename EventType, typename Handler, typename Lambda>
416 	int subscribe(base::Observable<EventType, Handler> *observable, Lambda &&handler) {
417 		return subscribe(*observable, std::forward<Lambda>(handler));
418 	}
419 
420 	template <typename Type, typename Lambda>
421 	int subscribe(const base::Variable<Type> &variable, Lambda &&handler) {
422 		return subscribe(variable.changed(), std::forward<Lambda>(handler));
423 	}
424 
425 	template <typename Type, typename Lambda>
426 	int subscribe(const base::Variable<Type> *variable, Lambda &&handler) {
427 		return subscribe(variable->changed(), std::forward<Lambda>(handler));
428 	}
429 
430 	void unsubscribe(int index) {
431 		if (!index) return;
432 		auto count = static_cast<int>(_subscriptions.size());
433 		Assert(index > 0 && index <= count);
434 		_subscriptions[index - 1].destroy();
435 		if (index == count) {
436 			while (index > 0 && !_subscriptions[--index]) {
437 				_subscriptions.pop_back();
438 			}
439 		}
440 	}
441 
442 	~Subscriber() {
443 		auto subscriptions = base::take(_subscriptions);
444 		for (auto &subscription : subscriptions) {
445 			subscription.destroy();
446 		}
447 	}
448 
449 private:
450 	std::vector<base::Subscription> _subscriptions;
451 
452 };
453 
454 void InitObservables(void(*HandleDelayed)());
455 void HandleObservables();
456 
457 template <
458 	typename Type,
459 	typename = std::enable_if_t<!std::is_same_v<Type, void>>>
460 inline auto ObservableViewer(base::Observable<Type> &observable) {
461 	return rpl::make_producer<Type>([&observable](
462 			const auto &consumer) {
463 		auto lifetime = rpl::lifetime();
464 		lifetime.make_state<base::Subscription>(
465 			observable.add_subscription([consumer](auto &&update) {
466 				consumer.put_next_forward(
467 					std::forward<decltype(update)>(update));
468 			}));
469 		return lifetime;
470 	});
471 }
472 
473 rpl::producer<> ObservableViewer(base::Observable<void> &observable);
474 
475 } // namespace base
476