1 // This file is part of Desktop App Toolkit, 2 // a set of libraries for developing nice desktop applications. 3 // 4 // For license and copyright information please follow this link: 5 // https://github.com/desktop-app/legal/blob/master/LEGAL 6 // 7 #pragma once 8 9 #include <vector> 10 #include <deque> 11 #include <rpl/producer.h> 12 #include "base/type_traits.h" 13 14 namespace base { 15 namespace internal { 16 17 using ObservableCallHandlers = Fn<void()>; 18 void RegisterPendingObservable(ObservableCallHandlers *handlers); 19 void UnregisterActiveObservable(ObservableCallHandlers *handlers); 20 void UnregisterObservable(ObservableCallHandlers *handlers); 21 22 template <typename EventType> 23 struct SubscriptionHandlerHelper { 24 using type = Fn<void(parameter_type<EventType>)>; 25 }; 26 27 template <> 28 struct SubscriptionHandlerHelper<void> { 29 using type = Fn<void()>; 30 }; 31 32 template <typename EventType> 33 using SubscriptionHandler = typename SubscriptionHandlerHelper<EventType>::type; 34 35 class BaseObservableData { 36 }; 37 38 template <typename EventType, typename Handler> 39 class CommonObservableData; 40 41 template <typename EventType, typename Handler> 42 class ObservableData; 43 44 } // namespace internal 45 46 class Subscription { 47 public: 48 Subscription() = default; 49 Subscription(const Subscription &) = delete; 50 Subscription &operator=(const Subscription &) = delete; 51 Subscription(Subscription &&other) : _node(base::take(other._node)), _removeAndDestroyMethod(other._removeAndDestroyMethod) { 52 } 53 Subscription &operator=(Subscription &&other) { 54 qSwap(_node, other._node); 55 qSwap(_removeAndDestroyMethod, other._removeAndDestroyMethod); 56 return *this; 57 } 58 explicit operator bool() const { 59 return (_node != nullptr); 60 } 61 void destroy() { 62 if (_node) { 63 (*_removeAndDestroyMethod)(base::take(_node)); 64 } 65 } 66 ~Subscription() { 67 destroy(); 68 } 69 70 private: 71 struct Node { 72 Node(const std::shared_ptr<internal::BaseObservableData> &observable) 73 : observable(observable) { 74 } 75 Node *next = nullptr; 76 Node *prev = nullptr; 77 std::weak_ptr<internal::BaseObservableData> observable; 78 }; 79 using RemoveAndDestroyMethod = void(*)(Node*); 80 Subscription(Node *node, RemoveAndDestroyMethod removeAndDestroyMethod) 81 : _node(node) 82 , _removeAndDestroyMethod(removeAndDestroyMethod) { 83 } 84 85 Node *_node = nullptr; 86 RemoveAndDestroyMethod _removeAndDestroyMethod; 87 88 template <typename EventType, typename Handler> 89 friend class internal::CommonObservableData; 90 91 template <typename EventType, typename Handler> 92 friend class internal::ObservableData; 93 94 }; 95 96 namespace internal { 97 98 template <typename EventType, typename Handler, bool EventTypeIsSimple> 99 class BaseObservable; 100 101 template <typename EventType, typename Handler> 102 class CommonObservable { 103 public: 104 Subscription add_subscription(Handler &&handler) { 105 if (!_data) { 106 _data = std::make_shared<ObservableData<EventType, Handler>>(this); 107 } 108 return _data->append(std::move(handler)); 109 } 110 111 private: 112 std::shared_ptr<ObservableData<EventType, Handler>> _data; 113 114 friend class CommonObservableData<EventType, Handler>; 115 friend class BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value>; 116 117 }; 118 119 template <typename EventType, typename Handler> 120 class BaseObservable<EventType, Handler, true> : public internal::CommonObservable<EventType, Handler> { 121 public: 122 void notify(EventType event, bool sync = false) { 123 if (this->_data) { 124 this->_data->notify(std::move(event), sync); 125 } 126 } 127 128 }; 129 130 template <typename EventType, typename Handler> 131 class BaseObservable<EventType, Handler, false> : public internal::CommonObservable<EventType, Handler> { 132 public: 133 void notify(EventType &&event, bool sync = false) { 134 if (this->_data) { 135 this->_data->notify(std::move(event), sync); 136 } 137 } 138 void notify(const EventType &event, bool sync = false) { 139 if (this->_data) { 140 auto event_copy = event; 141 this->_data->notify(std::move(event_copy), sync); 142 } 143 } 144 145 }; 146 147 } // namespace internal 148 149 namespace internal { 150 151 template <typename EventType, typename Handler> 152 class CommonObservableData : public BaseObservableData { 153 public: 154 CommonObservableData(CommonObservable<EventType, Handler> *observable) : _observable(observable) { 155 } 156 157 Subscription append(Handler &&handler) { 158 auto node = new Node(_observable->_data, std::move(handler)); 159 if (_begin) { 160 _end->next = node; 161 node->prev = _end; 162 _end = node; 163 } else { 164 _begin = _end = node; 165 } 166 return { _end, &CommonObservableData::removeAndDestroyNode }; 167 } 168 169 bool empty() const { 170 return !_begin; 171 } 172 173 private: 174 struct Node : public Subscription::Node { 175 Node( 176 const std::shared_ptr<BaseObservableData> &observer, 177 Handler &&handler) 178 : Subscription::Node(observer) 179 , handler(std::move(handler)) { 180 } 181 Handler handler; 182 }; 183 184 void remove(Subscription::Node *node) { 185 if (node->prev) { 186 node->prev->next = node->next; 187 } 188 if (node->next) { 189 node->next->prev = node->prev; 190 } 191 if (_begin == node) { 192 _begin = static_cast<Node*>(node->next); 193 } 194 if (_end == node) { 195 _end = static_cast<Node*>(node->prev); 196 } 197 if (_current == node) { 198 _current = static_cast<Node*>(node->prev); 199 } else if (!_begin) { 200 _observable->_data.reset(); 201 } 202 } 203 204 static void removeAndDestroyNode(Subscription::Node *node) { 205 if (const auto that = node->observable.lock()) { 206 static_cast<CommonObservableData*>(that.get())->remove(node); 207 } 208 delete static_cast<Node*>(node); 209 } 210 211 template <typename CallCurrent> 212 void notifyEnumerate(CallCurrent callCurrent) { 213 _current = _begin; 214 do { 215 callCurrent(); 216 if (_current) { 217 _current = static_cast<Node*>(_current->next); 218 } else if (_begin) { 219 _current = _begin; 220 } else { 221 break; 222 } 223 } while (_current); 224 } 225 226 bool destroyMeIfEmpty() const { 227 if (empty()) { 228 _observable->_data.reset(); 229 return true; 230 } 231 return false; 232 } 233 234 CommonObservable<EventType, Handler> *_observable = nullptr; 235 Node *_begin = nullptr; 236 Node *_current = nullptr; 237 Node *_end = nullptr; 238 ObservableCallHandlers _callHandlers; 239 240 friend class ObservableData<EventType, Handler>; 241 242 }; 243 244 template <typename EventType, typename Handler> 245 class ObservableData : public CommonObservableData<EventType, Handler> { 246 public: 247 using CommonObservableData<EventType, Handler>::CommonObservableData; 248 249 void notify(EventType &&event, bool sync) { 250 if (_handling) { 251 sync = false; 252 } 253 if (sync) { 254 _events.push_back(std::move(event)); 255 callHandlers(); 256 } else { 257 if (!this->_callHandlers) { 258 this->_callHandlers = [this]() { 259 callHandlers(); 260 }; 261 } 262 if (_events.empty()) { 263 RegisterPendingObservable(&this->_callHandlers); 264 } 265 _events.push_back(std::move(event)); 266 } 267 } 268 269 ~ObservableData() { 270 UnregisterObservable(&this->_callHandlers); 271 } 272 273 private: 274 void callHandlers() { 275 _handling = true; 276 auto events = base::take(_events); 277 for (auto &event : events) { 278 this->notifyEnumerate([this, &event]() { 279 this->_current->handler(event); 280 }); 281 if (this->destroyMeIfEmpty()) { 282 return; 283 } 284 } 285 _handling = false; 286 UnregisterActiveObservable(&this->_callHandlers); 287 } 288 289 std::deque<EventType> _events; 290 bool _handling = false; 291 292 }; 293 294 template <class Handler> 295 class ObservableData<void, Handler> : public CommonObservableData<void, Handler> { 296 public: 297 using CommonObservableData<void, Handler>::CommonObservableData; 298 299 void notify(bool sync) { 300 if (_handling) { 301 sync = false; 302 } 303 if (sync) { 304 ++_eventsCount; 305 callHandlers(); 306 } else { 307 if (!this->_callHandlers) { 308 this->_callHandlers = [this]() { 309 callHandlers(); 310 }; 311 } 312 if (!_eventsCount) { 313 RegisterPendingObservable(&this->_callHandlers); 314 } 315 ++_eventsCount; 316 } 317 } 318 319 ~ObservableData() { 320 UnregisterObservable(&this->_callHandlers); 321 } 322 323 private: 324 void callHandlers() { 325 _handling = true; 326 auto eventsCount = base::take(_eventsCount); 327 for (int i = 0; i != eventsCount; ++i) { 328 this->notifyEnumerate([this]() { 329 this->_current->handler(); 330 }); 331 if (this->destroyMeIfEmpty()) { 332 return; 333 } 334 } 335 _handling = false; 336 UnregisterActiveObservable(&this->_callHandlers); 337 } 338 339 int _eventsCount = 0; 340 bool _handling = false; 341 342 }; 343 344 template <typename Handler> 345 class BaseObservable<void, Handler, base::type_traits<void>::is_fast_copy_type::value> : public internal::CommonObservable<void, Handler> { 346 public: 347 void notify(bool sync = false) { 348 if (this->_data) { 349 this->_data->notify(sync); 350 } 351 } 352 353 }; 354 355 } // namespace internal 356 357 template <typename EventType, typename Handler = internal::SubscriptionHandler<EventType>> 358 class Observable : public internal::BaseObservable<EventType, Handler, base::type_traits<EventType>::is_fast_copy_type::value> { 359 public: 360 Observable() = default; 361 Observable(const Observable &other) = delete; 362 Observable(Observable &&other) = default; 363 Observable &operator=(const Observable &other) = delete; 364 Observable &operator=(Observable &&other) = default; 365 366 }; 367 368 template <typename Type> 369 class Variable { 370 public: 371 Variable(parameter_type<Type> startValue = Type()) : _value(startValue) { 372 } 373 Variable(Variable &&other) = default; 374 Variable &operator=(Variable &&other) = default; 375 376 parameter_type<Type> value() const { 377 return _value; 378 } 379 380 void setForced(parameter_type<Type> newValue, bool sync = false) { 381 _value = newValue; 382 changed().notify(_value, sync); 383 } 384 385 void set(parameter_type<Type> newValue, bool sync = false) { 386 if (_value != newValue) { 387 setForced(newValue, sync); 388 } 389 } 390 391 template <typename Callback> 392 void process(Callback callback, bool sync = false) { 393 callback(_value); 394 changed().notify(_value, sync); 395 } 396 397 Observable<Type> &changed() const { 398 return _changed; 399 } 400 401 private: 402 Type _value; 403 mutable Observable<Type> _changed; 404 405 }; 406 407 class Subscriber { 408 protected: 409 template <typename EventType, typename Handler, typename Lambda> 410 int subscribe(base::Observable<EventType, Handler> &observable, Lambda &&handler) { 411 _subscriptions.push_back(observable.add_subscription(std::forward<Lambda>(handler))); 412 return _subscriptions.size(); 413 } 414 415 template <typename EventType, typename Handler, typename Lambda> 416 int subscribe(base::Observable<EventType, Handler> *observable, Lambda &&handler) { 417 return subscribe(*observable, std::forward<Lambda>(handler)); 418 } 419 420 template <typename Type, typename Lambda> 421 int subscribe(const base::Variable<Type> &variable, Lambda &&handler) { 422 return subscribe(variable.changed(), std::forward<Lambda>(handler)); 423 } 424 425 template <typename Type, typename Lambda> 426 int subscribe(const base::Variable<Type> *variable, Lambda &&handler) { 427 return subscribe(variable->changed(), std::forward<Lambda>(handler)); 428 } 429 430 void unsubscribe(int index) { 431 if (!index) return; 432 auto count = static_cast<int>(_subscriptions.size()); 433 Assert(index > 0 && index <= count); 434 _subscriptions[index - 1].destroy(); 435 if (index == count) { 436 while (index > 0 && !_subscriptions[--index]) { 437 _subscriptions.pop_back(); 438 } 439 } 440 } 441 442 ~Subscriber() { 443 auto subscriptions = base::take(_subscriptions); 444 for (auto &subscription : subscriptions) { 445 subscription.destroy(); 446 } 447 } 448 449 private: 450 std::vector<base::Subscription> _subscriptions; 451 452 }; 453 454 void InitObservables(void(*HandleDelayed)()); 455 void HandleObservables(); 456 457 template < 458 typename Type, 459 typename = std::enable_if_t<!std::is_same_v<Type, void>>> 460 inline auto ObservableViewer(base::Observable<Type> &observable) { 461 return rpl::make_producer<Type>([&observable]( 462 const auto &consumer) { 463 auto lifetime = rpl::lifetime(); 464 lifetime.make_state<base::Subscription>( 465 observable.add_subscription([consumer](auto &&update) { 466 consumer.put_next_forward( 467 std::forward<decltype(update)>(update)); 468 })); 469 return lifetime; 470 }); 471 } 472 473 rpl::producer<> ObservableViewer(base::Observable<void> &observable); 474 475 } // namespace base 476