1 /*
2  *  Copyright (C) 2016-2018 Team Kodi
3  *  This file is part of Kodi - https://kodi.tv
4  *
5  *  SPDX-License-Identifier: GPL-2.0-or-later
6  *  See LICENSES/README.md for more information.
7  */
8 
9 #pragma once
10 
11 #include "EventStreamDetail.h"
12 #include "JobManager.h"
13 #include "threads/CriticalSection.h"
14 #include "threads/SingleLock.h"
15 
16 #include <algorithm>
17 #include <memory>
18 #include <vector>
19 
20 
21 template<typename Event>
22 class CEventStream
23 {
24 public:
25 
26   template<typename A>
Subscribe(A * owner,void (A::* fn)(const Event &))27   void Subscribe(A* owner, void (A::*fn)(const Event&))
28   {
29     auto subscription = std::make_shared<detail::CSubscription<Event, A>>(owner, fn);
30     CSingleLock lock(m_criticalSection);
31     m_subscriptions.emplace_back(std::move(subscription));
32   }
33 
34   template<typename A>
Unsubscribe(A * obj)35   void Unsubscribe(A* obj)
36   {
37     std::vector<std::shared_ptr<detail::ISubscription<Event>>> toCancel;
38     {
39       CSingleLock lock(m_criticalSection);
40       auto it = m_subscriptions.begin();
41       while (it != m_subscriptions.end())
42       {
43         if ((*it)->IsOwnedBy(obj))
44         {
45           toCancel.push_back(*it);
46           it = m_subscriptions.erase(it);
47         }
48         else
49         {
50           ++it;
51         }
52       }
53     }
54     for (auto& subscription : toCancel)
55       subscription->Cancel();
56   }
57 
58 protected:
59   std::vector<std::shared_ptr<detail::ISubscription<Event>>> m_subscriptions;
60   CCriticalSection m_criticalSection;
61 };
62 
63 
64 template<typename Event>
65 class CEventSource : public CEventStream<Event>
66 {
67 public:
CEventSource()68   explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {};
69 
70   template<typename A>
Publish(A event)71   void Publish(A event)
72   {
73     CSingleLock lock(this->m_criticalSection);
74     auto& subscriptions = this->m_subscriptions;
75     auto task = [subscriptions, event](){
76       for (auto& s: subscriptions)
77         s->HandleEvent(event);
78     };
79     lock.Leave();
80     m_queue.Submit(std::move(task));
81   }
82 
83 private:
84   CJobQueue m_queue;
85 };
86 
87 template<typename Event>
88 class CBlockingEventSource : public CEventStream<Event>
89 {
90 public:
91   template<typename A>
HandleEvent(A event)92   void HandleEvent(A event)
93   {
94     CSingleLock lock(this->m_criticalSection);
95     for (const auto& subscription : this->m_subscriptions)
96     {
97       subscription->HandleEvent(event);
98     }
99   }
100 };
101