1 #pragma once 2 3 #include "providers/twitch/ChatterinoWebSocketppLogger.hpp" 4 #include "providers/twitch/PubsubActions.hpp" 5 #include "providers/twitch/TwitchAccount.hpp" 6 #include "providers/twitch/TwitchIrcServer.hpp" 7 8 #include <rapidjson/document.h> 9 #include <QString> 10 #include <pajlada/signals/signal.hpp> 11 #include <websocketpp/client.hpp> 12 #include <websocketpp/config/asio_client.hpp> 13 #include <websocketpp/extensions/permessage_deflate/disabled.hpp> 14 #include <websocketpp/logger/basic.hpp> 15 16 #include <atomic> 17 #include <chrono> 18 #include <map> 19 #include <memory> 20 #include <set> 21 #include <thread> 22 #include <unordered_map> 23 #include <vector> 24 25 namespace chatterino { 26 27 struct chatterinoconfig : public websocketpp::config::asio_tls_client { 28 typedef websocketpp::log::chatterinowebsocketpplogger< 29 concurrency_type, websocketpp::log::elevel> 30 elog_type; 31 typedef websocketpp::log::chatterinowebsocketpplogger< 32 concurrency_type, websocketpp::log::alevel> 33 alog_type; 34 35 struct permessage_deflate_config { 36 }; 37 38 typedef websocketpp::extensions::permessage_deflate::disabled< 39 permessage_deflate_config> 40 permessage_deflate_type; 41 }; 42 43 using WebsocketClient = websocketpp::client<chatterinoconfig>; 44 using WebsocketHandle = websocketpp::connection_hdl; 45 using WebsocketErrorCode = websocketpp::lib::error_code; 46 47 #define MAX_PUBSUB_LISTENS 50 48 #define MAX_PUBSUB_CONNECTIONS 10 49 50 struct RequestMessage { 51 QString payload; 52 int topicCount; 53 }; 54 55 namespace detail { 56 57 struct Listener { 58 QString topic; 59 bool authed; 60 bool persistent; 61 bool confirmed = false; 62 }; 63 64 class PubSubClient : public std::enable_shared_from_this<PubSubClient> 65 { 66 public: 67 PubSubClient(WebsocketClient &_websocketClient, 68 WebsocketHandle _handle); 69 70 void start(); 71 void stop(); 72 73 bool listen(rapidjson::Document &message); 74 void unlistenPrefix(const QString &prefix); 75 76 void handlePong(); 77 78 bool isListeningToTopic(const QString &topic); 79 80 private: 81 void ping(); 82 bool send(const char *payload); 83 84 WebsocketClient &websocketClient_; 85 WebsocketHandle handle_; 86 uint16_t numListens_ = 0; 87 88 std::vector<Listener> listeners_; 89 90 std::atomic<bool> awaitingPong_{false}; 91 std::atomic<bool> started_{false}; 92 }; 93 94 } // namespace detail 95 96 class PubSub 97 { 98 using WebsocketMessagePtr = 99 websocketpp::config::asio_tls_client::message_type::ptr; 100 using WebsocketContextPtr = 101 websocketpp::lib::shared_ptr<boost::asio::ssl::context>; 102 103 template <typename T> 104 using Signal = 105 pajlada::Signals::Signal<T>; // type-id is vector<T, Alloc<T>> 106 107 WebsocketClient websocketClient; 108 std::unique_ptr<std::thread> mainThread; 109 110 public: 111 PubSub(); 112 113 ~PubSub() = delete; 114 115 enum class State { 116 Connected, 117 Disconnected, 118 }; 119 120 void start(); 121 isConnected() const122 bool isConnected() const 123 { 124 return this->state == State::Connected; 125 } 126 127 pajlada::Signals::NoArgSignal connected; 128 129 struct { 130 struct { 131 Signal<ClearChatAction> chatCleared; 132 Signal<DeleteAction> messageDeleted; 133 Signal<ModeChangedAction> modeChanged; 134 Signal<ModerationStateAction> moderationStateChanged; 135 136 Signal<BanAction> userBanned; 137 Signal<UnbanAction> userUnbanned; 138 139 Signal<AutomodAction> automodMessage; 140 Signal<AutomodUserAction> automodUserMessage; 141 Signal<AutomodInfoAction> automodInfoMessage; 142 } moderation; 143 144 struct { 145 // Parsing should be done in PubSubManager as well, 146 // but for now we just send the raw data 147 Signal<const rapidjson::Value &> received; 148 Signal<const rapidjson::Value &> sent; 149 } whisper; 150 151 struct { 152 Signal<rapidjson::Value &> redeemed; 153 } pointReward; 154 } signals_; 155 156 void listenToWhispers(std::shared_ptr<TwitchAccount> account); 157 158 void unlistenAllModerationActions(); 159 160 void listenToChannelModerationActions( 161 const QString &channelID, std::shared_ptr<TwitchAccount> account); 162 void listenToAutomod(const QString &channelID, 163 std::shared_ptr<TwitchAccount> account); 164 165 void listenToChannelPointRewards(const QString &channelID, 166 std::shared_ptr<TwitchAccount> account); 167 168 std::vector<std::unique_ptr<rapidjson::Document>> requests; 169 170 private: 171 void listenToTopic(const QString &topic, 172 std::shared_ptr<TwitchAccount> account); 173 174 void listen(rapidjson::Document &&msg); 175 bool tryListen(rapidjson::Document &msg); 176 177 bool isListeningToTopic(const QString &topic); 178 179 void addClient(); 180 std::atomic<bool> addingClient{false}; 181 182 State state = State::Connected; 183 184 std::map<WebsocketHandle, std::shared_ptr<detail::PubSubClient>, 185 std::owner_less<WebsocketHandle>> 186 clients; 187 188 std::unordered_map< 189 QString, std::function<void(const rapidjson::Value &, const QString &)>> 190 moderationActionHandlers; 191 192 std::unordered_map< 193 QString, std::function<void(const rapidjson::Value &, const QString &)>> 194 channelTermsActionHandlers; 195 196 void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg); 197 void onConnectionOpen(websocketpp::connection_hdl hdl); 198 void onConnectionClose(websocketpp::connection_hdl hdl); 199 WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl); 200 201 void handleResponse(const rapidjson::Document &msg); 202 void handleListenResponse(const RequestMessage &msg, bool failed); 203 void handleUnlistenResponse(const RequestMessage &msg, bool failed); 204 void handleMessageResponse(const rapidjson::Value &data); 205 206 void runThread(); 207 }; 208 209 } // namespace chatterino 210