1 #pragma once
2 
3 #include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
4 #include "providers/twitch/PubsubActions.hpp"
5 #include "providers/twitch/TwitchAccount.hpp"
6 #include "providers/twitch/TwitchIrcServer.hpp"
7 
8 #include <rapidjson/document.h>
9 #include <QString>
10 #include <pajlada/signals/signal.hpp>
11 #include <websocketpp/client.hpp>
12 #include <websocketpp/config/asio_client.hpp>
13 #include <websocketpp/extensions/permessage_deflate/disabled.hpp>
14 #include <websocketpp/logger/basic.hpp>
15 
16 #include <atomic>
17 #include <chrono>
18 #include <map>
19 #include <memory>
20 #include <set>
21 #include <thread>
22 #include <unordered_map>
23 #include <vector>
24 
25 namespace chatterino {
26 
27 struct chatterinoconfig : public websocketpp::config::asio_tls_client {
28     typedef websocketpp::log::chatterinowebsocketpplogger<
29         concurrency_type, websocketpp::log::elevel>
30         elog_type;
31     typedef websocketpp::log::chatterinowebsocketpplogger<
32         concurrency_type, websocketpp::log::alevel>
33         alog_type;
34 
35     struct permessage_deflate_config {
36     };
37 
38     typedef websocketpp::extensions::permessage_deflate::disabled<
39         permessage_deflate_config>
40         permessage_deflate_type;
41 };
42 
43 using WebsocketClient = websocketpp::client<chatterinoconfig>;
44 using WebsocketHandle = websocketpp::connection_hdl;
45 using WebsocketErrorCode = websocketpp::lib::error_code;
46 
47 #define MAX_PUBSUB_LISTENS 50
48 #define MAX_PUBSUB_CONNECTIONS 10
49 
50 struct RequestMessage {
51     QString payload;
52     int topicCount;
53 };
54 
55 namespace detail {
56 
57     struct Listener {
58         QString topic;
59         bool authed;
60         bool persistent;
61         bool confirmed = false;
62     };
63 
64     class PubSubClient : public std::enable_shared_from_this<PubSubClient>
65     {
66     public:
67         PubSubClient(WebsocketClient &_websocketClient,
68                      WebsocketHandle _handle);
69 
70         void start();
71         void stop();
72 
73         bool listen(rapidjson::Document &message);
74         void unlistenPrefix(const QString &prefix);
75 
76         void handlePong();
77 
78         bool isListeningToTopic(const QString &topic);
79 
80     private:
81         void ping();
82         bool send(const char *payload);
83 
84         WebsocketClient &websocketClient_;
85         WebsocketHandle handle_;
86         uint16_t numListens_ = 0;
87 
88         std::vector<Listener> listeners_;
89 
90         std::atomic<bool> awaitingPong_{false};
91         std::atomic<bool> started_{false};
92     };
93 
94 }  // namespace detail
95 
96 class PubSub
97 {
98     using WebsocketMessagePtr =
99         websocketpp::config::asio_tls_client::message_type::ptr;
100     using WebsocketContextPtr =
101         websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
102 
103     template <typename T>
104     using Signal =
105         pajlada::Signals::Signal<T>;  // type-id is vector<T, Alloc<T>>
106 
107     WebsocketClient websocketClient;
108     std::unique_ptr<std::thread> mainThread;
109 
110 public:
111     PubSub();
112 
113     ~PubSub() = delete;
114 
115     enum class State {
116         Connected,
117         Disconnected,
118     };
119 
120     void start();
121 
isConnected() const122     bool isConnected() const
123     {
124         return this->state == State::Connected;
125     }
126 
127     pajlada::Signals::NoArgSignal connected;
128 
129     struct {
130         struct {
131             Signal<ClearChatAction> chatCleared;
132             Signal<DeleteAction> messageDeleted;
133             Signal<ModeChangedAction> modeChanged;
134             Signal<ModerationStateAction> moderationStateChanged;
135 
136             Signal<BanAction> userBanned;
137             Signal<UnbanAction> userUnbanned;
138 
139             Signal<AutomodAction> automodMessage;
140             Signal<AutomodUserAction> automodUserMessage;
141             Signal<AutomodInfoAction> automodInfoMessage;
142         } moderation;
143 
144         struct {
145             // Parsing should be done in PubSubManager as well,
146             // but for now we just send the raw data
147             Signal<const rapidjson::Value &> received;
148             Signal<const rapidjson::Value &> sent;
149         } whisper;
150 
151         struct {
152             Signal<rapidjson::Value &> redeemed;
153         } pointReward;
154     } signals_;
155 
156     void listenToWhispers(std::shared_ptr<TwitchAccount> account);
157 
158     void unlistenAllModerationActions();
159 
160     void listenToChannelModerationActions(
161         const QString &channelID, std::shared_ptr<TwitchAccount> account);
162     void listenToAutomod(const QString &channelID,
163                          std::shared_ptr<TwitchAccount> account);
164 
165     void listenToChannelPointRewards(const QString &channelID,
166                                      std::shared_ptr<TwitchAccount> account);
167 
168     std::vector<std::unique_ptr<rapidjson::Document>> requests;
169 
170 private:
171     void listenToTopic(const QString &topic,
172                        std::shared_ptr<TwitchAccount> account);
173 
174     void listen(rapidjson::Document &&msg);
175     bool tryListen(rapidjson::Document &msg);
176 
177     bool isListeningToTopic(const QString &topic);
178 
179     void addClient();
180     std::atomic<bool> addingClient{false};
181 
182     State state = State::Connected;
183 
184     std::map<WebsocketHandle, std::shared_ptr<detail::PubSubClient>,
185              std::owner_less<WebsocketHandle>>
186         clients;
187 
188     std::unordered_map<
189         QString, std::function<void(const rapidjson::Value &, const QString &)>>
190         moderationActionHandlers;
191 
192     std::unordered_map<
193         QString, std::function<void(const rapidjson::Value &, const QString &)>>
194         channelTermsActionHandlers;
195 
196     void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
197     void onConnectionOpen(websocketpp::connection_hdl hdl);
198     void onConnectionClose(websocketpp::connection_hdl hdl);
199     WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
200 
201     void handleResponse(const rapidjson::Document &msg);
202     void handleListenResponse(const RequestMessage &msg, bool failed);
203     void handleUnlistenResponse(const RequestMessage &msg, bool failed);
204     void handleMessageResponse(const rapidjson::Value &data);
205 
206     void runThread();
207 };
208 
209 }  // namespace chatterino
210