1 #include "providers/twitch/PubsubClient.hpp"
2 
3 #include "providers/twitch/PubsubActions.hpp"
4 #include "providers/twitch/PubsubHelpers.hpp"
5 #include "singletons/Settings.hpp"
6 #include "util/DebugCount.hpp"
7 #include "util/Helpers.hpp"
8 #include "util/RapidjsonHelpers.hpp"
9 
10 #include <rapidjson/error/en.h>
11 
12 #include <exception>
13 #include <iostream>
14 #include <thread>
15 #include "common/QLogging.hpp"
16 
17 #define TWITCH_PUBSUB_URL "wss://pubsub-edge.twitch.tv"
18 
19 using websocketpp::lib::bind;
20 using websocketpp::lib::placeholders::_1;
21 using websocketpp::lib::placeholders::_2;
22 
23 namespace chatterino {
24 
25 static const char *pingPayload = "{\"type\":\"PING\"}";
26 
27 static std::map<QString, RequestMessage> sentListens;
28 static std::map<QString, RequestMessage> sentUnlistens;
29 
30 namespace detail {
31 
PubSubClient(WebsocketClient & websocketClient,WebsocketHandle handle)32     PubSubClient::PubSubClient(WebsocketClient &websocketClient,
33                                WebsocketHandle handle)
34         : websocketClient_(websocketClient)
35         , handle_(handle)
36     {
37     }
38 
start()39     void PubSubClient::start()
40     {
41         assert(!this->started_);
42 
43         this->started_ = true;
44 
45         this->ping();
46     }
47 
stop()48     void PubSubClient::stop()
49     {
50         assert(this->started_);
51 
52         this->started_ = false;
53     }
54 
listen(rapidjson::Document & message)55     bool PubSubClient::listen(rapidjson::Document &message)
56     {
57         int numRequestedListens = message["data"]["topics"].Size();
58 
59         if (this->numListens_ + numRequestedListens > MAX_PUBSUB_LISTENS)
60         {
61             // This PubSubClient is already at its peak listens
62             return false;
63         }
64         this->numListens_ += numRequestedListens;
65         DebugCount::increase("PubSub topic pending listens",
66                              numRequestedListens);
67 
68         for (const auto &topic : message["data"]["topics"].GetArray())
69         {
70             this->listeners_.emplace_back(
71                 Listener{topic.GetString(), false, false, false});
72         }
73 
74         auto nonce = generateUuid();
75         rj::set(message, "nonce", nonce);
76 
77         QString payload = rj::stringify(message);
78         sentListens[nonce] = RequestMessage{payload, numRequestedListens};
79 
80         this->send(payload.toUtf8());
81 
82         return true;
83     }
84 
unlistenPrefix(const QString & prefix)85     void PubSubClient::unlistenPrefix(const QString &prefix)
86     {
87         std::vector<QString> topics;
88 
89         for (auto it = this->listeners_.begin(); it != this->listeners_.end();)
90         {
91             const auto &listener = *it;
92             if (listener.topic.startsWith(prefix))
93             {
94                 topics.push_back(listener.topic);
95                 it = this->listeners_.erase(it);
96             }
97             else
98             {
99                 ++it;
100             }
101         }
102 
103         if (topics.empty())
104         {
105             return;
106         }
107 
108         int numRequestedUnlistens = topics.size();
109 
110         this->numListens_ -= numRequestedUnlistens;
111         DebugCount::increase("PubSub topic pending unlistens",
112                              numRequestedUnlistens);
113 
114         auto message = createUnlistenMessage(topics);
115 
116         auto nonce = generateUuid();
117         rj::set(message, "nonce", nonce);
118 
119         QString payload = rj::stringify(message);
120         sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens};
121 
122         this->send(payload.toUtf8());
123     }
124 
handlePong()125     void PubSubClient::handlePong()
126     {
127         assert(this->awaitingPong_);
128 
129         this->awaitingPong_ = false;
130     }
131 
isListeningToTopic(const QString & topic)132     bool PubSubClient::isListeningToTopic(const QString &topic)
133     {
134         for (const auto &listener : this->listeners_)
135         {
136             if (listener.topic == topic)
137             {
138                 return true;
139             }
140         }
141 
142         return false;
143     }
144 
ping()145     void PubSubClient::ping()
146     {
147         assert(this->started_);
148 
149         if (!this->send(pingPayload))
150         {
151             return;
152         }
153 
154         this->awaitingPong_ = true;
155 
156         auto self = this->shared_from_this();
157 
158         runAfter(this->websocketClient_.get_io_service(),
159                  std::chrono::seconds(15), [self](auto timer) {
160                      if (!self->started_)
161                      {
162                          return;
163                      }
164 
165                      if (self->awaitingPong_)
166                      {
167                          qCDebug(chatterinoPubsub)
168                              << "No pong response, disconnect!";
169                          // TODO(pajlada): Label this connection as "disconnect me"
170                      }
171                  });
172 
173         runAfter(this->websocketClient_.get_io_service(),
174                  std::chrono::minutes(5), [self](auto timer) {
175                      if (!self->started_)
176                      {
177                          return;
178                      }
179 
180                      self->ping();
181                  });
182     }
183 
send(const char * payload)184     bool PubSubClient::send(const char *payload)
185     {
186         WebsocketErrorCode ec;
187         this->websocketClient_.send(this->handle_, payload,
188                                     websocketpp::frame::opcode::text, ec);
189 
190         if (ec)
191         {
192             qCDebug(chatterinoPubsub) << "Error sending message" << payload
193                                       << ":" << ec.message().c_str();
194             // TODO(pajlada): Check which error code happened and maybe
195             // gracefully handle it
196 
197             return false;
198         }
199 
200         return true;
201     }
202 
203 }  // namespace detail
204 
PubSub()205 PubSub::PubSub()
206 {
207     qCDebug(chatterinoPubsub) << "init PubSub";
208 
209     this->moderationActionHandlers["clear"] = [this](const auto &data,
210                                                      const auto &roomID) {
211         ClearChatAction action(data, roomID);
212 
213         this->signals_.moderation.chatCleared.invoke(action);
214     };
215 
216     this->moderationActionHandlers["slowoff"] = [this](const auto &data,
217                                                        const auto &roomID) {
218         ModeChangedAction action(data, roomID);
219 
220         action.mode = ModeChangedAction::Mode::Slow;
221         action.state = ModeChangedAction::State::Off;
222 
223         this->signals_.moderation.modeChanged.invoke(action);
224     };
225 
226     this->moderationActionHandlers["slow"] = [this](const auto &data,
227                                                     const auto &roomID) {
228         ModeChangedAction action(data, roomID);
229 
230         action.mode = ModeChangedAction::Mode::Slow;
231         action.state = ModeChangedAction::State::On;
232 
233         if (!data.HasMember("args"))
234         {
235             qCDebug(chatterinoPubsub) << "Missing required args member";
236             return;
237         }
238 
239         const auto &args = data["args"];
240 
241         if (!args.IsArray())
242         {
243             qCDebug(chatterinoPubsub) << "args member must be an array";
244             return;
245         }
246 
247         if (args.Size() == 0)
248         {
249             qCDebug(chatterinoPubsub)
250                 << "Missing duration argument in slowmode on";
251             return;
252         }
253 
254         const auto &durationArg = args[0];
255 
256         if (!durationArg.IsString())
257         {
258             qCDebug(chatterinoPubsub) << "Duration arg must be a string";
259             return;
260         }
261 
262         bool ok;
263 
264         action.duration = QString(durationArg.GetString()).toUInt(&ok, 10);
265 
266         this->signals_.moderation.modeChanged.invoke(action);
267     };
268 
269     this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data,
270                                                           const auto &roomID) {
271         ModeChangedAction action(data, roomID);
272 
273         action.mode = ModeChangedAction::Mode::R9K;
274         action.state = ModeChangedAction::State::Off;
275 
276         this->signals_.moderation.modeChanged.invoke(action);
277     };
278 
279     this->moderationActionHandlers["r9kbeta"] = [this](const auto &data,
280                                                        const auto &roomID) {
281         ModeChangedAction action(data, roomID);
282 
283         action.mode = ModeChangedAction::Mode::R9K;
284         action.state = ModeChangedAction::State::On;
285 
286         this->signals_.moderation.modeChanged.invoke(action);
287     };
288 
289     this->moderationActionHandlers["subscribersoff"] =
290         [this](const auto &data, const auto &roomID) {
291             ModeChangedAction action(data, roomID);
292 
293             action.mode = ModeChangedAction::Mode::SubscribersOnly;
294             action.state = ModeChangedAction::State::Off;
295 
296             this->signals_.moderation.modeChanged.invoke(action);
297         };
298 
299     this->moderationActionHandlers["subscribers"] = [this](const auto &data,
300                                                            const auto &roomID) {
301         ModeChangedAction action(data, roomID);
302 
303         action.mode = ModeChangedAction::Mode::SubscribersOnly;
304         action.state = ModeChangedAction::State::On;
305 
306         this->signals_.moderation.modeChanged.invoke(action);
307     };
308 
309     this->moderationActionHandlers["emoteonlyoff"] =
310         [this](const auto &data, const auto &roomID) {
311             ModeChangedAction action(data, roomID);
312 
313             action.mode = ModeChangedAction::Mode::EmoteOnly;
314             action.state = ModeChangedAction::State::Off;
315 
316             this->signals_.moderation.modeChanged.invoke(action);
317         };
318 
319     this->moderationActionHandlers["emoteonly"] = [this](const auto &data,
320                                                          const auto &roomID) {
321         ModeChangedAction action(data, roomID);
322 
323         action.mode = ModeChangedAction::Mode::EmoteOnly;
324         action.state = ModeChangedAction::State::On;
325 
326         this->signals_.moderation.modeChanged.invoke(action);
327     };
328 
329     this->moderationActionHandlers["unmod"] = [this](const auto &data,
330                                                      const auto &roomID) {
331         ModerationStateAction action(data, roomID);
332 
333         getTargetUser(data, action.target);
334 
335         try
336         {
337             const auto &args = getArgs(data);
338 
339             if (args.Size() < 1)
340             {
341                 return;
342             }
343 
344             if (!rj::getSafe(args[0], action.target.login))
345             {
346                 return;
347             }
348         }
349         catch (const std::runtime_error &ex)
350         {
351             qCDebug(chatterinoPubsub)
352                 << "Error parsing moderation action:" << ex.what();
353         }
354 
355         action.modded = false;
356 
357         this->signals_.moderation.moderationStateChanged.invoke(action);
358     };
359 
360     this->moderationActionHandlers["mod"] = [this](const auto &data,
361                                                    const auto &roomID) {
362         ModerationStateAction action(data, roomID);
363         action.modded = true;
364 
365         QString innerType;
366         if (rj::getSafe(data, "type", innerType) &&
367             innerType == "chat_login_moderation")
368         {
369             // Don't display the old message type
370             return;
371         }
372 
373         if (!getTargetUser(data, action.target))
374         {
375             qCDebug(chatterinoPubsub)
376                 << "Error parsing moderation action mod: Unable to get "
377                    "target_user_id";
378             return;
379         }
380 
381         // Load target name from message.data.target_user_login
382         if (!getTargetUserName(data, action.target))
383         {
384             qCDebug(chatterinoPubsub)
385                 << "Error parsing moderation action mod: Unable to get "
386                    "target_user_name";
387             return;
388         }
389 
390         this->signals_.moderation.moderationStateChanged.invoke(action);
391     };
392 
393     this->moderationActionHandlers["timeout"] = [this](const auto &data,
394                                                        const auto &roomID) {
395         BanAction action(data, roomID);
396 
397         getCreatedByUser(data, action.source);
398         getTargetUser(data, action.target);
399 
400         try
401         {
402             const auto &args = getArgs(data);
403 
404             if (args.Size() < 2)
405             {
406                 return;
407             }
408 
409             if (!rj::getSafe(args[0], action.target.login))
410             {
411                 return;
412             }
413 
414             QString durationString;
415             if (!rj::getSafe(args[1], durationString))
416             {
417                 return;
418             }
419             bool ok;
420             action.duration = durationString.toUInt(&ok, 10);
421 
422             if (args.Size() >= 3)
423             {
424                 if (!rj::getSafe(args[2], action.reason))
425                 {
426                     return;
427                 }
428             }
429 
430             this->signals_.moderation.userBanned.invoke(action);
431         }
432         catch (const std::runtime_error &ex)
433         {
434             qCDebug(chatterinoPubsub)
435                 << "Error parsing moderation action:" << ex.what();
436         }
437     };
438 
439     this->moderationActionHandlers["delete"] = [this](const auto &data,
440                                                       const auto &roomID) {
441         DeleteAction action(data, roomID);
442 
443         getCreatedByUser(data, action.source);
444         getTargetUser(data, action.target);
445 
446         try
447         {
448             const auto &args = getArgs(data);
449 
450             if (args.Size() < 3)
451             {
452                 return;
453             }
454 
455             if (!rj::getSafe(args[0], action.target.login))
456             {
457                 return;
458             }
459 
460             if (!rj::getSafe(args[1], action.messageText))
461             {
462                 return;
463             }
464 
465             if (!rj::getSafe(args[2], action.messageId))
466             {
467                 return;
468             }
469 
470             this->signals_.moderation.messageDeleted.invoke(action);
471         }
472         catch (const std::runtime_error &ex)
473         {
474             qCDebug(chatterinoPubsub)
475                 << "Error parsing moderation action:" << ex.what();
476         }
477     };
478 
479     this->moderationActionHandlers["ban"] = [this](const auto &data,
480                                                    const auto &roomID) {
481         BanAction action(data, roomID);
482 
483         getCreatedByUser(data, action.source);
484         getTargetUser(data, action.target);
485 
486         try
487         {
488             const auto &args = getArgs(data);
489 
490             if (args.Size() < 1)
491             {
492                 return;
493             }
494 
495             if (!rj::getSafe(args[0], action.target.login))
496             {
497                 return;
498             }
499 
500             if (args.Size() >= 2)
501             {
502                 if (!rj::getSafe(args[1], action.reason))
503                 {
504                     return;
505                 }
506             }
507 
508             this->signals_.moderation.userBanned.invoke(action);
509         }
510         catch (const std::runtime_error &ex)
511         {
512             qCDebug(chatterinoPubsub)
513                 << "Error parsing moderation action:" << ex.what();
514         }
515     };
516 
517     this->moderationActionHandlers["unban"] = [this](const auto &data,
518                                                      const auto &roomID) {
519         UnbanAction action(data, roomID);
520 
521         getCreatedByUser(data, action.source);
522         getTargetUser(data, action.target);
523 
524         action.previousState = UnbanAction::Banned;
525 
526         try
527         {
528             const auto &args = getArgs(data);
529 
530             if (args.Size() < 1)
531             {
532                 return;
533             }
534 
535             if (!rj::getSafe(args[0], action.target.login))
536             {
537                 return;
538             }
539 
540             this->signals_.moderation.userUnbanned.invoke(action);
541         }
542         catch (const std::runtime_error &ex)
543         {
544             qCDebug(chatterinoPubsub)
545                 << "Error parsing moderation action:" << ex.what();
546         }
547     };
548 
549     this->moderationActionHandlers["untimeout"] = [this](const auto &data,
550                                                          const auto &roomID) {
551         UnbanAction action(data, roomID);
552 
553         getCreatedByUser(data, action.source);
554         getTargetUser(data, action.target);
555 
556         action.previousState = UnbanAction::TimedOut;
557 
558         try
559         {
560             const auto &args = getArgs(data);
561 
562             if (args.Size() < 1)
563             {
564                 return;
565             }
566 
567             if (!rj::getSafe(args[0], action.target.login))
568             {
569                 return;
570             }
571 
572             this->signals_.moderation.userUnbanned.invoke(action);
573         }
574         catch (const std::runtime_error &ex)
575         {
576             qCDebug(chatterinoPubsub)
577                 << "Error parsing moderation action:" << ex.what();
578         }
579     };
580 
581     this->moderationActionHandlers["automod_rejected"] =
582         [this](const auto &data, const auto &roomID) {
583             // Display the automod message and prompt the allow/deny
584             AutomodAction action(data, roomID);
585 
586             getCreatedByUser(data, action.source);
587             getTargetUser(data, action.target);
588 
589             try
590             {
591                 const auto &args = getArgs(data);
592                 const auto &msgID = getMsgID(data);
593 
594                 if (args.Size() < 1)
595                 {
596                     return;
597                 }
598 
599                 if (!rj::getSafe(args[0], action.target.login))
600                 {
601                     return;
602                 }
603 
604                 if (args.Size() >= 2)
605                 {
606                     if (!rj::getSafe(args[1], action.message))
607                     {
608                         return;
609                     }
610                 }
611 
612                 if (args.Size() >= 3)
613                 {
614                     if (!rj::getSafe(args[2], action.reason))
615                     {
616                         return;
617                     }
618                 }
619 
620                 if (!rj::getSafe(msgID, action.msgID))
621                 {
622                     return;
623                 }
624 
625                 this->signals_.moderation.automodMessage.invoke(action);
626             }
627             catch (const std::runtime_error &ex)
628             {
629                 qCDebug(chatterinoPubsub)
630                     << "Error parsing moderation action:" << ex.what();
631             }
632         };
633 
634     this->moderationActionHandlers["automod_message_rejected"] =
635         [this](const auto &data, const auto &roomID) {
636             AutomodInfoAction action(data, roomID);
637             action.type = AutomodInfoAction::OnHold;
638             this->signals_.moderation.automodInfoMessage.invoke(action);
639         };
640 
641     this->moderationActionHandlers["automod_message_denied"] =
642         [this](const auto &data, const auto &roomID) {
643             AutomodInfoAction action(data, roomID);
644             action.type = AutomodInfoAction::Denied;
645             this->signals_.moderation.automodInfoMessage.invoke(action);
646         };
647 
648     this->moderationActionHandlers["automod_message_approved"] =
649         [this](const auto &data, const auto &roomID) {
650             AutomodInfoAction action(data, roomID);
651             action.type = AutomodInfoAction::Approved;
652             this->signals_.moderation.automodInfoMessage.invoke(action);
653         };
654 
655     this->channelTermsActionHandlers["add_permitted_term"] =
656         [this](const auto &data, const auto &roomID) {
657             // This term got a pass through automod
658             AutomodUserAction action(data, roomID);
659             getCreatedByUser(data, action.source);
660 
661             try
662             {
663                 action.type = AutomodUserAction::AddPermitted;
664                 if (!rj::getSafe(data, "text", action.message))
665                 {
666                     return;
667                 }
668 
669                 if (!rj::getSafe(data, "requester_login", action.source.login))
670                 {
671                     return;
672                 }
673 
674                 this->signals_.moderation.automodUserMessage.invoke(action);
675             }
676             catch (const std::runtime_error &ex)
677             {
678                 qCDebug(chatterinoPubsub)
679                     << "Error parsing channel terms action:" << ex.what();
680             }
681         };
682 
683     this->channelTermsActionHandlers["add_blocked_term"] =
684         [this](const auto &data, const auto &roomID) {
685             // A term has been added
686             AutomodUserAction action(data, roomID);
687             getCreatedByUser(data, action.source);
688 
689             try
690             {
691                 action.type = AutomodUserAction::AddBlocked;
692                 if (!rj::getSafe(data, "text", action.message))
693                 {
694                     return;
695                 }
696 
697                 if (!rj::getSafe(data, "requester_login", action.source.login))
698                 {
699                     return;
700                 }
701 
702                 this->signals_.moderation.automodUserMessage.invoke(action);
703             }
704             catch (const std::runtime_error &ex)
705             {
706                 qCDebug(chatterinoPubsub)
707                     << "Error parsing channel terms action:" << ex.what();
708             }
709         };
710 
711     this->moderationActionHandlers["delete_permitted_term"] =
712         [this](const auto &data, const auto &roomID) {
713             // This term got deleted
714             AutomodUserAction action(data, roomID);
715             getCreatedByUser(data, action.source);
716 
717             try
718             {
719                 const auto &args = getArgs(data);
720                 action.type = AutomodUserAction::RemovePermitted;
721 
722                 if (args.Size() < 1)
723                 {
724                     return;
725                 }
726 
727                 if (!rj::getSafe(args[0], action.message))
728                 {
729                     return;
730                 }
731 
732                 this->signals_.moderation.automodUserMessage.invoke(action);
733             }
734             catch (const std::runtime_error &ex)
735             {
736                 qCDebug(chatterinoPubsub)
737                     << "Error parsing moderation action:" << ex.what();
738             }
739         };
740     this->channelTermsActionHandlers["delete_permitted_term"] =
741         [this](const auto &data, const auto &roomID) {
742             // This term got deleted
743             AutomodUserAction action(data, roomID);
744             getCreatedByUser(data, action.source);
745 
746             try
747             {
748                 action.type = AutomodUserAction::RemovePermitted;
749                 if (!rj::getSafe(data, "text", action.message))
750                 {
751                     return;
752                 }
753 
754                 if (!rj::getSafe(data, "requester_login", action.source.login))
755                 {
756                     return;
757                 }
758 
759                 this->signals_.moderation.automodUserMessage.invoke(action);
760             }
761             catch (const std::runtime_error &ex)
762             {
763                 qCDebug(chatterinoPubsub)
764                     << "Error parsing channel terms action:" << ex.what();
765             }
766         };
767 
768     this->moderationActionHandlers["delete_blocked_term"] =
769         [this](const auto &data, const auto &roomID) {
770             // This term got deleted
771             AutomodUserAction action(data, roomID);
772 
773             getCreatedByUser(data, action.source);
774 
775             try
776             {
777                 const auto &args = getArgs(data);
778                 action.type = AutomodUserAction::RemoveBlocked;
779 
780                 if (args.Size() < 1)
781                 {
782                     return;
783                 }
784 
785                 if (!rj::getSafe(args[0], action.message))
786                 {
787                     return;
788                 }
789 
790                 this->signals_.moderation.automodUserMessage.invoke(action);
791             }
792             catch (const std::runtime_error &ex)
793             {
794                 qCDebug(chatterinoPubsub)
795                     << "Error parsing moderation action:" << ex.what();
796             }
797         };
798     this->channelTermsActionHandlers["delete_blocked_term"] =
799         [this](const auto &data, const auto &roomID) {
800             // This term got deleted
801             AutomodUserAction action(data, roomID);
802 
803             getCreatedByUser(data, action.source);
804 
805             try
806             {
807                 action.type = AutomodUserAction::RemoveBlocked;
808                 if (!rj::getSafe(data, "text", action.message))
809                 {
810                     return;
811                 }
812 
813                 if (!rj::getSafe(data, "requester_login", action.source.login))
814                 {
815                     return;
816                 }
817 
818                 this->signals_.moderation.automodUserMessage.invoke(action);
819             }
820             catch (const std::runtime_error &ex)
821             {
822                 qCDebug(chatterinoPubsub)
823                     << "Error parsing channel terms action:" << ex.what();
824             }
825         };
826 
827     // We don't get this one anymore or anything similiar
828     // We need some new topic so we can listen
829     //
830     //this->moderationActionHandlers["modified_automod_properties"] =
831     //    [this](const auto &data, const auto &roomID) {
832     //        // The automod settings got modified
833     //        AutomodUserAction action(data, roomID);
834     //        getCreatedByUser(data, action.source);
835     //        action.type = AutomodUserAction::Properties;
836     //        this->signals_.moderation.automodUserMessage.invoke(action);
837     //    };
838 
839     this->moderationActionHandlers["denied_automod_message"] =
840         [](const auto &data, const auto &roomID) {
841             // This message got denied by a moderator
842             // qCDebug(chatterinoPubsub) << rj::stringify(data);
843         };
844 
845     this->moderationActionHandlers["approved_automod_message"] =
846         [](const auto &data, const auto &roomID) {
847             // This message got approved by a moderator
848             // qCDebug(chatterinoPubsub) << rj::stringify(data);
849         };
850 
851 #ifdef DEBUG_OFF
852     this->websocketClient.set_access_channels(websocketpp::log::alevel::none);
853 #else
854     this->websocketClient.set_access_channels(websocketpp::log::alevel::all);
855 #endif
856     this->websocketClient.clear_access_channels(
857         websocketpp::log::alevel::frame_payload |
858         websocketpp::log::alevel::frame_header);
859 
860     this->websocketClient.init_asio();
861 
862     // SSL Handshake
863     this->websocketClient.set_tls_init_handler(
864         bind(&PubSub::onTLSInit, this, ::_1));
865 
866     this->websocketClient.set_message_handler(
867         bind(&PubSub::onMessage, this, ::_1, ::_2));
868     this->websocketClient.set_open_handler(
869         bind(&PubSub::onConnectionOpen, this, ::_1));
870     this->websocketClient.set_close_handler(
871         bind(&PubSub::onConnectionClose, this, ::_1));
872 
873     // Add an initial client
874     this->addClient();
875 }
876 
addClient()877 void PubSub::addClient()
878 {
879     if (this->addingClient)
880     {
881         return;
882     }
883 
884     this->addingClient = true;
885 
886     websocketpp::lib::error_code ec;
887     auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);
888 
889     if (ec)
890     {
891         qCDebug(chatterinoPubsub)
892             << "Unable to establish connection:" << ec.message().c_str();
893         return;
894     }
895 
896     this->websocketClient.connect(con);
897 }
898 
start()899 void PubSub::start()
900 {
901     this->mainThread.reset(
902         new std::thread(std::bind(&PubSub::runThread, this)));
903 }
904 
listenToWhispers(std::shared_ptr<TwitchAccount> account)905 void PubSub::listenToWhispers(std::shared_ptr<TwitchAccount> account)
906 {
907     static const QString topicFormat("whispers.%1");
908 
909     assert(account != nullptr);
910 
911     auto userID = account->getUserId();
912 
913     qCDebug(chatterinoPubsub) << "Connection open!";
914     websocketpp::lib::error_code ec;
915 
916     std::vector<QString> topics({topicFormat.arg(userID)});
917 
918     this->listen(createListenMessage(topics, account));
919 
920     if (ec)
921     {
922         qCDebug(chatterinoPubsub)
923             << "Unable to send message to websocket server:"
924             << ec.message().c_str();
925         return;
926     }
927 }
928 
unlistenAllModerationActions()929 void PubSub::unlistenAllModerationActions()
930 {
931     for (const auto &p : this->clients)
932     {
933         const auto &client = p.second;
934         client->unlistenPrefix("chat_moderator_actions.");
935     }
936 }
937 
listenToChannelModerationActions(const QString & channelID,std::shared_ptr<TwitchAccount> account)938 void PubSub::listenToChannelModerationActions(
939     const QString &channelID, std::shared_ptr<TwitchAccount> account)
940 {
941     static const QString topicFormat("chat_moderator_actions.%1.%2");
942     assert(!channelID.isEmpty());
943     assert(account != nullptr);
944     QString userID = account->getUserId();
945     if (userID.isEmpty())
946         return;
947 
948     auto topic = topicFormat.arg(userID, channelID);
949 
950     if (this->isListeningToTopic(topic))
951     {
952         return;
953     }
954 
955     qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
956 
957     this->listenToTopic(topic, account);
958 }
959 
listenToAutomod(const QString & channelID,std::shared_ptr<TwitchAccount> account)960 void PubSub::listenToAutomod(const QString &channelID,
961                              std::shared_ptr<TwitchAccount> account)
962 {
963     static const QString topicFormat("automod-queue.%1.%2");
964     assert(!channelID.isEmpty());
965     assert(account != nullptr);
966     QString userID = account->getUserId();
967     if (userID.isEmpty())
968         return;
969 
970     auto topic = topicFormat.arg(userID, channelID);
971 
972     if (this->isListeningToTopic(topic))
973     {
974         return;
975     }
976 
977     qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
978 
979     this->listenToTopic(topic, account);
980 }
981 
listenToChannelPointRewards(const QString & channelID,std::shared_ptr<TwitchAccount> account)982 void PubSub::listenToChannelPointRewards(const QString &channelID,
983                                          std::shared_ptr<TwitchAccount> account)
984 {
985     static const QString topicFormat("community-points-channel-v1.%1");
986     assert(!channelID.isEmpty());
987     assert(account != nullptr);
988 
989     auto topic = topicFormat.arg(channelID);
990 
991     if (this->isListeningToTopic(topic))
992     {
993         return;
994     }
995     qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
996 
997     this->listenToTopic(topic, account);
998 }
999 
listenToTopic(const QString & topic,std::shared_ptr<TwitchAccount> account)1000 void PubSub::listenToTopic(const QString &topic,
1001                            std::shared_ptr<TwitchAccount> account)
1002 {
1003     auto message = createListenMessage({topic}, account);
1004 
1005     this->listen(std::move(message));
1006 }
1007 
listen(rapidjson::Document && msg)1008 void PubSub::listen(rapidjson::Document &&msg)
1009 {
1010     if (this->tryListen(msg))
1011     {
1012         return;
1013     }
1014 
1015     this->addClient();
1016 
1017     this->requests.emplace_back(
1018         std::make_unique<rapidjson::Document>(std::move(msg)));
1019 
1020     DebugCount::increase("PubSub topic backlog");
1021 }
1022 
tryListen(rapidjson::Document & msg)1023 bool PubSub::tryListen(rapidjson::Document &msg)
1024 {
1025     for (const auto &p : this->clients)
1026     {
1027         const auto &client = p.second;
1028         if (client->listen(msg))
1029         {
1030             return true;
1031         }
1032     }
1033 
1034     return false;
1035 }
1036 
isListeningToTopic(const QString & topic)1037 bool PubSub::isListeningToTopic(const QString &topic)
1038 {
1039     for (const auto &p : this->clients)
1040     {
1041         const auto &client = p.second;
1042         if (client->isListeningToTopic(topic))
1043         {
1044             return true;
1045         }
1046     }
1047 
1048     return false;
1049 }
1050 
onMessage(websocketpp::connection_hdl hdl,WebsocketMessagePtr websocketMessage)1051 void PubSub::onMessage(websocketpp::connection_hdl hdl,
1052                        WebsocketMessagePtr websocketMessage)
1053 {
1054     const auto &payload =
1055         QString::fromStdString(websocketMessage->get_payload());
1056 
1057     rapidjson::Document msg;
1058 
1059     rapidjson::ParseResult res = msg.Parse(payload.toUtf8());
1060 
1061     if (!res)
1062     {
1063         qCDebug(chatterinoPubsub)
1064             << QString("Error parsing message '%1' from PubSub: %2")
1065                    .arg(payload, rapidjson::GetParseError_En(res.Code()));
1066         return;
1067     }
1068 
1069     if (!msg.IsObject())
1070     {
1071         qCDebug(chatterinoPubsub)
1072             << QString("Error parsing message '%1' from PubSub. Root object is "
1073                        "not an object")
1074                    .arg(payload);
1075         return;
1076     }
1077 
1078     QString type;
1079 
1080     if (!rj::getSafe(msg, "type", type))
1081     {
1082         qCDebug(chatterinoPubsub)
1083             << "Missing required string member `type` in message root";
1084         return;
1085     }
1086 
1087     if (type == "RESPONSE")
1088     {
1089         this->handleResponse(msg);
1090     }
1091     else if (type == "MESSAGE")
1092     {
1093         if (!msg.HasMember("data"))
1094         {
1095             qCDebug(chatterinoPubsub)
1096                 << "Missing required object member `data` in message root";
1097             return;
1098         }
1099 
1100         const auto &data = msg["data"];
1101 
1102         if (!data.IsObject())
1103         {
1104             qCDebug(chatterinoPubsub) << "Member `data` must be an object";
1105             return;
1106         }
1107 
1108         this->handleMessageResponse(data);
1109     }
1110     else if (type == "PONG")
1111     {
1112         auto clientIt = this->clients.find(hdl);
1113 
1114         // If this assert goes off, there's something wrong with the connection
1115         // creation/preserving code KKona
1116         assert(clientIt != this->clients.end());
1117 
1118         auto &client = *clientIt;
1119 
1120         client.second->handlePong();
1121     }
1122     else
1123     {
1124         qCDebug(chatterinoPubsub) << "Unknown message type:" << type;
1125     }
1126 }
1127 
onConnectionOpen(WebsocketHandle hdl)1128 void PubSub::onConnectionOpen(WebsocketHandle hdl)
1129 {
1130     DebugCount::increase("PubSub connections");
1131     this->addingClient = false;
1132 
1133     auto client =
1134         std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);
1135 
1136     // We separate the starting from the constructor because we will want to use
1137     // shared_from_this
1138     client->start();
1139 
1140     this->clients.emplace(hdl, client);
1141 
1142     this->connected.invoke();
1143 
1144     for (auto it = this->requests.begin(); it != this->requests.end();)
1145     {
1146         const auto &request = *it;
1147         if (client->listen(*request))
1148         {
1149             DebugCount::decrease("PubSub topic backlog");
1150             it = this->requests.erase(it);
1151         }
1152         else
1153         {
1154             ++it;
1155         }
1156     }
1157 
1158     if (!this->requests.empty())
1159     {
1160         this->addClient();
1161     }
1162 }
1163 
onConnectionClose(WebsocketHandle hdl)1164 void PubSub::onConnectionClose(WebsocketHandle hdl)
1165 {
1166     DebugCount::decrease("PubSub connections");
1167     auto clientIt = this->clients.find(hdl);
1168 
1169     // If this assert goes off, there's something wrong with the connection
1170     // creation/preserving code KKona
1171     assert(clientIt != this->clients.end());
1172 
1173     auto &client = clientIt->second;
1174 
1175     client->stop();
1176 
1177     this->clients.erase(clientIt);
1178 
1179     this->connected.invoke();
1180 }
1181 
onTLSInit(websocketpp::connection_hdl hdl)1182 PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
1183 {
1184     WebsocketContextPtr ctx(
1185         new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
1186 
1187     try
1188     {
1189         ctx->set_options(boost::asio::ssl::context::default_workarounds |
1190                          boost::asio::ssl::context::no_sslv2 |
1191                          boost::asio::ssl::context::single_dh_use);
1192     }
1193     catch (const std::exception &e)
1194     {
1195         qCDebug(chatterinoPubsub)
1196             << "Exception caught in OnTLSInit:" << e.what();
1197     }
1198 
1199     return ctx;
1200 }
1201 
handleResponse(const rapidjson::Document & msg)1202 void PubSub::handleResponse(const rapidjson::Document &msg)
1203 {
1204     QString error;
1205 
1206     if (!rj::getSafe(msg, "error", error))
1207         return;
1208 
1209     QString nonce;
1210     rj::getSafe(msg, "nonce", nonce);
1211 
1212     const bool failed = !error.isEmpty();
1213 
1214     if (failed)
1215     {
1216         qCDebug(chatterinoPubsub)
1217             << QString("Error %1 on nonce %2").arg(error, nonce);
1218     }
1219 
1220     if (auto it = sentListens.find(nonce); it != sentListens.end())
1221     {
1222         this->handleListenResponse(it->second, failed);
1223         return;
1224     }
1225 
1226     if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end())
1227     {
1228         this->handleUnlistenResponse(it->second, failed);
1229         return;
1230     }
1231 
1232     qCDebug(chatterinoPubsub)
1233         << "Response on unused" << nonce << "client/topic listener mismatch?";
1234 }
1235 
handleListenResponse(const RequestMessage & msg,bool failed)1236 void PubSub::handleListenResponse(const RequestMessage &msg, bool failed)
1237 {
1238     DebugCount::decrease("PubSub topic pending listens", msg.topicCount);
1239     if (failed)
1240     {
1241         DebugCount::increase("PubSub topic failed listens", msg.topicCount);
1242     }
1243     else
1244     {
1245         DebugCount::increase("PubSub topic listening", msg.topicCount);
1246     }
1247 }
1248 
handleUnlistenResponse(const RequestMessage & msg,bool failed)1249 void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed)
1250 {
1251     DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount);
1252     if (failed)
1253     {
1254         DebugCount::increase("PubSub topic failed unlistens", msg.topicCount);
1255     }
1256     else
1257     {
1258         DebugCount::decrease("PubSub topic listening", msg.topicCount);
1259     }
1260 }
1261 
handleMessageResponse(const rapidjson::Value & outerData)1262 void PubSub::handleMessageResponse(const rapidjson::Value &outerData)
1263 {
1264     QString topic;
1265     qCDebug(chatterinoPubsub) << rj::stringify(outerData);
1266 
1267     if (!rj::getSafe(outerData, "topic", topic))
1268     {
1269         qCDebug(chatterinoPubsub)
1270             << "Missing required string member `topic` in outerData";
1271         return;
1272     }
1273 
1274     QString payload;
1275 
1276     if (!rj::getSafe(outerData, "message", payload))
1277     {
1278         qCDebug(chatterinoPubsub) << "Expected string message in outerData";
1279         return;
1280     }
1281 
1282     rapidjson::Document msg;
1283 
1284     rapidjson::ParseResult res = msg.Parse(payload.toUtf8());
1285 
1286     if (!res)
1287     {
1288         qCDebug(chatterinoPubsub)
1289             << QString("Error parsing message '%1' from PubSub: %2")
1290                    .arg(payload, rapidjson::GetParseError_En(res.Code()));
1291         return;
1292     }
1293 
1294     if (topic.startsWith("whispers."))
1295     {
1296         QString whisperType;
1297 
1298         if (!rj::getSafe(msg, "type", whisperType))
1299         {
1300             qCDebug(chatterinoPubsub) << "Bad whisper data";
1301             return;
1302         }
1303 
1304         if (whisperType == "whisper_received")
1305         {
1306             this->signals_.whisper.received.invoke(msg);
1307         }
1308         else if (whisperType == "whisper_sent")
1309         {
1310             this->signals_.whisper.sent.invoke(msg);
1311         }
1312         else if (whisperType == "thread")
1313         {
1314             // Handle thread?
1315         }
1316         else
1317         {
1318             qCDebug(chatterinoPubsub) << "Invalid whisper type:" << whisperType;
1319             return;
1320         }
1321     }
1322     else if (topic.startsWith("chat_moderator_actions."))
1323     {
1324         auto topicParts = topic.split(".");
1325         assert(topicParts.length() == 3);
1326         const auto &data = msg["data"];
1327 
1328         QString moderationEventType;
1329 
1330         if (!rj::getSafe(msg, "type", moderationEventType))
1331         {
1332             qCDebug(chatterinoPubsub) << "Bad moderator event data";
1333             return;
1334         }
1335         if (moderationEventType == "moderation_action")
1336         {
1337             QString moderationAction;
1338 
1339             if (!rj::getSafe(data, "moderation_action", moderationAction))
1340             {
1341                 qCDebug(chatterinoPubsub)
1342                     << "Missing moderation action in data:"
1343                     << rj::stringify(data);
1344                 return;
1345             }
1346 
1347             auto handlerIt =
1348                 this->moderationActionHandlers.find(moderationAction);
1349 
1350             if (handlerIt == this->moderationActionHandlers.end())
1351             {
1352                 qCDebug(chatterinoPubsub)
1353                     << "No handler found for moderation action"
1354                     << moderationAction;
1355                 return;
1356             }
1357             // Invoke handler function
1358             handlerIt->second(data, topicParts[2]);
1359         }
1360         else if (moderationEventType == "channel_terms_action")
1361         {
1362             QString channelTermsAction;
1363 
1364             if (!rj::getSafe(data, "type", channelTermsAction))
1365             {
1366                 qCDebug(chatterinoPubsub)
1367                     << "Missing channel terms action in data:"
1368                     << rj::stringify(data);
1369                 return;
1370             }
1371 
1372             auto handlerIt =
1373                 this->channelTermsActionHandlers.find(channelTermsAction);
1374 
1375             if (handlerIt == this->channelTermsActionHandlers.end())
1376             {
1377                 qCDebug(chatterinoPubsub)
1378                     << "No handler found for channel terms action"
1379                     << channelTermsAction;
1380                 return;
1381             }
1382             // Invoke handler function
1383             handlerIt->second(data, topicParts[2]);
1384         }
1385     }
1386     else if (topic.startsWith("community-points-channel-v1."))
1387     {
1388         QString pointEventType;
1389         if (!rj::getSafe(msg, "type", pointEventType))
1390         {
1391             qCDebug(chatterinoPubsub) << "Bad channel point event data";
1392             return;
1393         }
1394 
1395         if (pointEventType == "reward-redeemed")
1396         {
1397             if (!rj::getSafeObject(msg, "data", msg))
1398             {
1399                 qCDebug(chatterinoPubsub)
1400                     << "No data found for redeemed reward";
1401                 return;
1402             }
1403             if (!rj::getSafeObject(msg, "redemption", msg))
1404             {
1405                 qCDebug(chatterinoPubsub)
1406                     << "No redemption info found for redeemed reward";
1407                 return;
1408             }
1409             this->signals_.pointReward.redeemed.invoke(msg);
1410         }
1411         else
1412         {
1413             qCDebug(chatterinoPubsub)
1414                 << "Invalid point event type:" << pointEventType;
1415         }
1416     }
1417     else if (topic.startsWith("automod-queue."))
1418     {
1419         auto topicParts = topic.split(".");
1420         assert(topicParts.length() == 3);
1421         auto &data = msg["data"];
1422 
1423         QString automodEventType;
1424         if (!rj::getSafe(msg, "type", automodEventType))
1425         {
1426             qCDebug(chatterinoPubsub) << "Bad automod event data";
1427             return;
1428         }
1429 
1430         if (automodEventType == "automod_caught_message")
1431         {
1432             QString status;
1433             if (!rj::getSafe(data, "status", status))
1434             {
1435                 qCDebug(chatterinoPubsub) << "Failed to get status";
1436                 return;
1437             }
1438             if (status == "PENDING")
1439             {
1440                 AutomodAction action(data, topicParts[2]);
1441                 rapidjson::Value classification;
1442                 if (!rj::getSafeObject(data, "content_classification",
1443                                        classification))
1444                 {
1445                     qCDebug(chatterinoPubsub)
1446                         << "Failed to get content_classification";
1447                     return;
1448                 }
1449 
1450                 QString contentCategory;
1451                 if (!rj::getSafe(classification, "category", contentCategory))
1452                 {
1453                     qCDebug(chatterinoPubsub)
1454                         << "Failed to get content category";
1455                     return;
1456                 }
1457                 int contentLevel;
1458                 if (!rj::getSafe(classification, "level", contentLevel))
1459                 {
1460                     qCDebug(chatterinoPubsub) << "Failed to get content level";
1461                     return;
1462                 }
1463                 action.reason = QString("%1 level %2")
1464                                     .arg(contentCategory)
1465                                     .arg(contentLevel);
1466 
1467                 rapidjson::Value messageData;
1468                 if (!rj::getSafeObject(data, "message", messageData))
1469                 {
1470                     qCDebug(chatterinoPubsub) << "Failed to get message data";
1471                     return;
1472                 }
1473 
1474                 rapidjson::Value messageContent;
1475                 if (!rj::getSafeObject(messageData, "content", messageContent))
1476                 {
1477                     qCDebug(chatterinoPubsub)
1478                         << "Failed to get message content";
1479                     return;
1480                 }
1481                 if (!rj::getSafe(messageData, "id", action.msgID))
1482                 {
1483                     qCDebug(chatterinoPubsub) << "Failed to get message id";
1484                     return;
1485                 }
1486 
1487                 if (!rj::getSafe(messageContent, "text", action.message))
1488                 {
1489                     qCDebug(chatterinoPubsub) << "Failed to get message text";
1490                     return;
1491                 }
1492 
1493                 // this message also contains per-word automod data, which could be implemented
1494 
1495                 // extract sender data manually because twitch loves not being consistent
1496                 rapidjson::Value senderData;
1497                 if (!rj::getSafeObject(messageData, "sender", senderData))
1498                 {
1499                     qCDebug(chatterinoPubsub) << "Failed to get sender";
1500                     return;
1501                 }
1502                 QString senderId;
1503                 if (!rj::getSafe(senderData, "user_id", senderId))
1504                 {
1505                     qCDebug(chatterinoPubsub) << "Failed to get sender user id";
1506                     return;
1507                 }
1508                 QString senderLogin;
1509                 if (!rj::getSafe(senderData, "login", senderLogin))
1510                 {
1511                     qCDebug(chatterinoPubsub) << "Failed to get sender login";
1512                     return;
1513                 }
1514                 QString senderDisplayName = senderLogin;
1515                 bool hasLocalizedName = false;
1516                 if (rj::getSafe(senderData, "display_name", senderDisplayName))
1517                 {
1518                     // check for non-ascii display names
1519                     if (QString::compare(senderLogin, senderDisplayName,
1520                                          Qt::CaseInsensitive) != 0)
1521                     {
1522                         hasLocalizedName = true;
1523                     }
1524                 }
1525                 QColor senderColor;
1526                 QString senderColor_;
1527                 if (rj::getSafe(senderData, "chat_color", senderColor_))
1528                 {
1529                     senderColor = QColor(senderColor_);
1530                 }
1531                 else if (getSettings()->colorizeNicknames)
1532                 {
1533                     // color may be not present if user is a grey-name
1534                     senderColor = getRandomColor(senderId);
1535                 }
1536                 // handle username style based on prefered setting
1537                 switch (getSettings()->usernameDisplayMode.getValue())
1538                 {
1539                     case UsernameDisplayMode::Username: {
1540                         if (hasLocalizedName)
1541                         {
1542                             senderDisplayName = senderLogin;
1543                         }
1544                         break;
1545                     }
1546                     case UsernameDisplayMode::LocalizedName: {
1547                         break;
1548                     }
1549                     case UsernameDisplayMode::UsernameAndLocalizedName: {
1550                         if (hasLocalizedName)
1551                         {
1552                             senderDisplayName = QString("%1(%2)").arg(
1553                                 senderLogin, senderDisplayName);
1554                         }
1555                         break;
1556                     }
1557                 }
1558 
1559                 action.target = ActionUser{senderId, senderLogin,
1560                                            senderDisplayName, senderColor};
1561                 this->signals_.moderation.automodMessage.invoke(action);
1562             }
1563             // "ALLOWED" and "DENIED" statuses remain unimplemented
1564             // They are versions of automod_message_(denied|approved) but for mods.
1565         }
1566     }
1567     else
1568     {
1569         qCDebug(chatterinoPubsub) << "Unknown topic:" << topic;
1570         return;
1571     }
1572 }
1573 
runThread()1574 void PubSub::runThread()
1575 {
1576     qCDebug(chatterinoPubsub) << "Start pubsub manager thread";
1577     this->websocketClient.run();
1578     qCDebug(chatterinoPubsub) << "Done with pubsub manager thread";
1579 }
1580 
1581 }  // namespace chatterino
1582