1 /* 2 * Copyright (C) 2016-2018 Team Kodi 3 * This file is part of Kodi - https://kodi.tv 4 * 5 * SPDX-License-Identifier: GPL-2.0-or-later 6 * See LICENSES/README.md for more information. 7 */ 8 9 #pragma once 10 11 #include "EventStreamDetail.h" 12 #include "JobManager.h" 13 #include "threads/CriticalSection.h" 14 #include "threads/SingleLock.h" 15 16 #include <algorithm> 17 #include <memory> 18 #include <vector> 19 20 21 template<typename Event> 22 class CEventStream 23 { 24 public: 25 26 template<typename A> Subscribe(A * owner,void (A::* fn)(const Event &))27 void Subscribe(A* owner, void (A::*fn)(const Event&)) 28 { 29 auto subscription = std::make_shared<detail::CSubscription<Event, A>>(owner, fn); 30 CSingleLock lock(m_criticalSection); 31 m_subscriptions.emplace_back(std::move(subscription)); 32 } 33 34 template<typename A> Unsubscribe(A * obj)35 void Unsubscribe(A* obj) 36 { 37 std::vector<std::shared_ptr<detail::ISubscription<Event>>> toCancel; 38 { 39 CSingleLock lock(m_criticalSection); 40 auto it = m_subscriptions.begin(); 41 while (it != m_subscriptions.end()) 42 { 43 if ((*it)->IsOwnedBy(obj)) 44 { 45 toCancel.push_back(*it); 46 it = m_subscriptions.erase(it); 47 } 48 else 49 { 50 ++it; 51 } 52 } 53 } 54 for (auto& subscription : toCancel) 55 subscription->Cancel(); 56 } 57 58 protected: 59 std::vector<std::shared_ptr<detail::ISubscription<Event>>> m_subscriptions; 60 CCriticalSection m_criticalSection; 61 }; 62 63 64 template<typename Event> 65 class CEventSource : public CEventStream<Event> 66 { 67 public: CEventSource()68 explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {}; 69 70 template<typename A> Publish(A event)71 void Publish(A event) 72 { 73 CSingleLock lock(this->m_criticalSection); 74 auto& subscriptions = this->m_subscriptions; 75 auto task = [subscriptions, event](){ 76 for (auto& s: subscriptions) 77 s->HandleEvent(event); 78 }; 79 lock.Leave(); 80 m_queue.Submit(std::move(task)); 81 } 82 83 private: 84 CJobQueue m_queue; 85 }; 86 87 template<typename Event> 88 class CBlockingEventSource : public CEventStream<Event> 89 { 90 public: 91 template<typename A> HandleEvent(A event)92 void HandleEvent(A event) 93 { 94 CSingleLock lock(this->m_criticalSection); 95 for (const auto& subscription : this->m_subscriptions) 96 { 97 subscription->HandleEvent(event); 98 } 99 } 100 }; 101