1 #include "providers/twitch/PubsubClient.hpp"
2
3 #include "providers/twitch/PubsubActions.hpp"
4 #include "providers/twitch/PubsubHelpers.hpp"
5 #include "singletons/Settings.hpp"
6 #include "util/DebugCount.hpp"
7 #include "util/Helpers.hpp"
8 #include "util/RapidjsonHelpers.hpp"
9
10 #include <rapidjson/error/en.h>
11
12 #include <exception>
13 #include <iostream>
14 #include <thread>
15 #include "common/QLogging.hpp"
16
17 #define TWITCH_PUBSUB_URL "wss://pubsub-edge.twitch.tv"
18
19 using websocketpp::lib::bind;
20 using websocketpp::lib::placeholders::_1;
21 using websocketpp::lib::placeholders::_2;
22
23 namespace chatterino {
24
25 static const char *pingPayload = "{\"type\":\"PING\"}";
26
27 static std::map<QString, RequestMessage> sentListens;
28 static std::map<QString, RequestMessage> sentUnlistens;
29
30 namespace detail {
31
PubSubClient(WebsocketClient & websocketClient,WebsocketHandle handle)32 PubSubClient::PubSubClient(WebsocketClient &websocketClient,
33 WebsocketHandle handle)
34 : websocketClient_(websocketClient)
35 , handle_(handle)
36 {
37 }
38
start()39 void PubSubClient::start()
40 {
41 assert(!this->started_);
42
43 this->started_ = true;
44
45 this->ping();
46 }
47
stop()48 void PubSubClient::stop()
49 {
50 assert(this->started_);
51
52 this->started_ = false;
53 }
54
listen(rapidjson::Document & message)55 bool PubSubClient::listen(rapidjson::Document &message)
56 {
57 int numRequestedListens = message["data"]["topics"].Size();
58
59 if (this->numListens_ + numRequestedListens > MAX_PUBSUB_LISTENS)
60 {
61 // This PubSubClient is already at its peak listens
62 return false;
63 }
64 this->numListens_ += numRequestedListens;
65 DebugCount::increase("PubSub topic pending listens",
66 numRequestedListens);
67
68 for (const auto &topic : message["data"]["topics"].GetArray())
69 {
70 this->listeners_.emplace_back(
71 Listener{topic.GetString(), false, false, false});
72 }
73
74 auto nonce = generateUuid();
75 rj::set(message, "nonce", nonce);
76
77 QString payload = rj::stringify(message);
78 sentListens[nonce] = RequestMessage{payload, numRequestedListens};
79
80 this->send(payload.toUtf8());
81
82 return true;
83 }
84
unlistenPrefix(const QString & prefix)85 void PubSubClient::unlistenPrefix(const QString &prefix)
86 {
87 std::vector<QString> topics;
88
89 for (auto it = this->listeners_.begin(); it != this->listeners_.end();)
90 {
91 const auto &listener = *it;
92 if (listener.topic.startsWith(prefix))
93 {
94 topics.push_back(listener.topic);
95 it = this->listeners_.erase(it);
96 }
97 else
98 {
99 ++it;
100 }
101 }
102
103 if (topics.empty())
104 {
105 return;
106 }
107
108 int numRequestedUnlistens = topics.size();
109
110 this->numListens_ -= numRequestedUnlistens;
111 DebugCount::increase("PubSub topic pending unlistens",
112 numRequestedUnlistens);
113
114 auto message = createUnlistenMessage(topics);
115
116 auto nonce = generateUuid();
117 rj::set(message, "nonce", nonce);
118
119 QString payload = rj::stringify(message);
120 sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens};
121
122 this->send(payload.toUtf8());
123 }
124
handlePong()125 void PubSubClient::handlePong()
126 {
127 assert(this->awaitingPong_);
128
129 this->awaitingPong_ = false;
130 }
131
isListeningToTopic(const QString & topic)132 bool PubSubClient::isListeningToTopic(const QString &topic)
133 {
134 for (const auto &listener : this->listeners_)
135 {
136 if (listener.topic == topic)
137 {
138 return true;
139 }
140 }
141
142 return false;
143 }
144
ping()145 void PubSubClient::ping()
146 {
147 assert(this->started_);
148
149 if (!this->send(pingPayload))
150 {
151 return;
152 }
153
154 this->awaitingPong_ = true;
155
156 auto self = this->shared_from_this();
157
158 runAfter(this->websocketClient_.get_io_service(),
159 std::chrono::seconds(15), [self](auto timer) {
160 if (!self->started_)
161 {
162 return;
163 }
164
165 if (self->awaitingPong_)
166 {
167 qCDebug(chatterinoPubsub)
168 << "No pong response, disconnect!";
169 // TODO(pajlada): Label this connection as "disconnect me"
170 }
171 });
172
173 runAfter(this->websocketClient_.get_io_service(),
174 std::chrono::minutes(5), [self](auto timer) {
175 if (!self->started_)
176 {
177 return;
178 }
179
180 self->ping();
181 });
182 }
183
send(const char * payload)184 bool PubSubClient::send(const char *payload)
185 {
186 WebsocketErrorCode ec;
187 this->websocketClient_.send(this->handle_, payload,
188 websocketpp::frame::opcode::text, ec);
189
190 if (ec)
191 {
192 qCDebug(chatterinoPubsub) << "Error sending message" << payload
193 << ":" << ec.message().c_str();
194 // TODO(pajlada): Check which error code happened and maybe
195 // gracefully handle it
196
197 return false;
198 }
199
200 return true;
201 }
202
203 } // namespace detail
204
PubSub()205 PubSub::PubSub()
206 {
207 qCDebug(chatterinoPubsub) << "init PubSub";
208
209 this->moderationActionHandlers["clear"] = [this](const auto &data,
210 const auto &roomID) {
211 ClearChatAction action(data, roomID);
212
213 this->signals_.moderation.chatCleared.invoke(action);
214 };
215
216 this->moderationActionHandlers["slowoff"] = [this](const auto &data,
217 const auto &roomID) {
218 ModeChangedAction action(data, roomID);
219
220 action.mode = ModeChangedAction::Mode::Slow;
221 action.state = ModeChangedAction::State::Off;
222
223 this->signals_.moderation.modeChanged.invoke(action);
224 };
225
226 this->moderationActionHandlers["slow"] = [this](const auto &data,
227 const auto &roomID) {
228 ModeChangedAction action(data, roomID);
229
230 action.mode = ModeChangedAction::Mode::Slow;
231 action.state = ModeChangedAction::State::On;
232
233 if (!data.HasMember("args"))
234 {
235 qCDebug(chatterinoPubsub) << "Missing required args member";
236 return;
237 }
238
239 const auto &args = data["args"];
240
241 if (!args.IsArray())
242 {
243 qCDebug(chatterinoPubsub) << "args member must be an array";
244 return;
245 }
246
247 if (args.Size() == 0)
248 {
249 qCDebug(chatterinoPubsub)
250 << "Missing duration argument in slowmode on";
251 return;
252 }
253
254 const auto &durationArg = args[0];
255
256 if (!durationArg.IsString())
257 {
258 qCDebug(chatterinoPubsub) << "Duration arg must be a string";
259 return;
260 }
261
262 bool ok;
263
264 action.duration = QString(durationArg.GetString()).toUInt(&ok, 10);
265
266 this->signals_.moderation.modeChanged.invoke(action);
267 };
268
269 this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data,
270 const auto &roomID) {
271 ModeChangedAction action(data, roomID);
272
273 action.mode = ModeChangedAction::Mode::R9K;
274 action.state = ModeChangedAction::State::Off;
275
276 this->signals_.moderation.modeChanged.invoke(action);
277 };
278
279 this->moderationActionHandlers["r9kbeta"] = [this](const auto &data,
280 const auto &roomID) {
281 ModeChangedAction action(data, roomID);
282
283 action.mode = ModeChangedAction::Mode::R9K;
284 action.state = ModeChangedAction::State::On;
285
286 this->signals_.moderation.modeChanged.invoke(action);
287 };
288
289 this->moderationActionHandlers["subscribersoff"] =
290 [this](const auto &data, const auto &roomID) {
291 ModeChangedAction action(data, roomID);
292
293 action.mode = ModeChangedAction::Mode::SubscribersOnly;
294 action.state = ModeChangedAction::State::Off;
295
296 this->signals_.moderation.modeChanged.invoke(action);
297 };
298
299 this->moderationActionHandlers["subscribers"] = [this](const auto &data,
300 const auto &roomID) {
301 ModeChangedAction action(data, roomID);
302
303 action.mode = ModeChangedAction::Mode::SubscribersOnly;
304 action.state = ModeChangedAction::State::On;
305
306 this->signals_.moderation.modeChanged.invoke(action);
307 };
308
309 this->moderationActionHandlers["emoteonlyoff"] =
310 [this](const auto &data, const auto &roomID) {
311 ModeChangedAction action(data, roomID);
312
313 action.mode = ModeChangedAction::Mode::EmoteOnly;
314 action.state = ModeChangedAction::State::Off;
315
316 this->signals_.moderation.modeChanged.invoke(action);
317 };
318
319 this->moderationActionHandlers["emoteonly"] = [this](const auto &data,
320 const auto &roomID) {
321 ModeChangedAction action(data, roomID);
322
323 action.mode = ModeChangedAction::Mode::EmoteOnly;
324 action.state = ModeChangedAction::State::On;
325
326 this->signals_.moderation.modeChanged.invoke(action);
327 };
328
329 this->moderationActionHandlers["unmod"] = [this](const auto &data,
330 const auto &roomID) {
331 ModerationStateAction action(data, roomID);
332
333 getTargetUser(data, action.target);
334
335 try
336 {
337 const auto &args = getArgs(data);
338
339 if (args.Size() < 1)
340 {
341 return;
342 }
343
344 if (!rj::getSafe(args[0], action.target.login))
345 {
346 return;
347 }
348 }
349 catch (const std::runtime_error &ex)
350 {
351 qCDebug(chatterinoPubsub)
352 << "Error parsing moderation action:" << ex.what();
353 }
354
355 action.modded = false;
356
357 this->signals_.moderation.moderationStateChanged.invoke(action);
358 };
359
360 this->moderationActionHandlers["mod"] = [this](const auto &data,
361 const auto &roomID) {
362 ModerationStateAction action(data, roomID);
363 action.modded = true;
364
365 QString innerType;
366 if (rj::getSafe(data, "type", innerType) &&
367 innerType == "chat_login_moderation")
368 {
369 // Don't display the old message type
370 return;
371 }
372
373 if (!getTargetUser(data, action.target))
374 {
375 qCDebug(chatterinoPubsub)
376 << "Error parsing moderation action mod: Unable to get "
377 "target_user_id";
378 return;
379 }
380
381 // Load target name from message.data.target_user_login
382 if (!getTargetUserName(data, action.target))
383 {
384 qCDebug(chatterinoPubsub)
385 << "Error parsing moderation action mod: Unable to get "
386 "target_user_name";
387 return;
388 }
389
390 this->signals_.moderation.moderationStateChanged.invoke(action);
391 };
392
393 this->moderationActionHandlers["timeout"] = [this](const auto &data,
394 const auto &roomID) {
395 BanAction action(data, roomID);
396
397 getCreatedByUser(data, action.source);
398 getTargetUser(data, action.target);
399
400 try
401 {
402 const auto &args = getArgs(data);
403
404 if (args.Size() < 2)
405 {
406 return;
407 }
408
409 if (!rj::getSafe(args[0], action.target.login))
410 {
411 return;
412 }
413
414 QString durationString;
415 if (!rj::getSafe(args[1], durationString))
416 {
417 return;
418 }
419 bool ok;
420 action.duration = durationString.toUInt(&ok, 10);
421
422 if (args.Size() >= 3)
423 {
424 if (!rj::getSafe(args[2], action.reason))
425 {
426 return;
427 }
428 }
429
430 this->signals_.moderation.userBanned.invoke(action);
431 }
432 catch (const std::runtime_error &ex)
433 {
434 qCDebug(chatterinoPubsub)
435 << "Error parsing moderation action:" << ex.what();
436 }
437 };
438
439 this->moderationActionHandlers["delete"] = [this](const auto &data,
440 const auto &roomID) {
441 DeleteAction action(data, roomID);
442
443 getCreatedByUser(data, action.source);
444 getTargetUser(data, action.target);
445
446 try
447 {
448 const auto &args = getArgs(data);
449
450 if (args.Size() < 3)
451 {
452 return;
453 }
454
455 if (!rj::getSafe(args[0], action.target.login))
456 {
457 return;
458 }
459
460 if (!rj::getSafe(args[1], action.messageText))
461 {
462 return;
463 }
464
465 if (!rj::getSafe(args[2], action.messageId))
466 {
467 return;
468 }
469
470 this->signals_.moderation.messageDeleted.invoke(action);
471 }
472 catch (const std::runtime_error &ex)
473 {
474 qCDebug(chatterinoPubsub)
475 << "Error parsing moderation action:" << ex.what();
476 }
477 };
478
479 this->moderationActionHandlers["ban"] = [this](const auto &data,
480 const auto &roomID) {
481 BanAction action(data, roomID);
482
483 getCreatedByUser(data, action.source);
484 getTargetUser(data, action.target);
485
486 try
487 {
488 const auto &args = getArgs(data);
489
490 if (args.Size() < 1)
491 {
492 return;
493 }
494
495 if (!rj::getSafe(args[0], action.target.login))
496 {
497 return;
498 }
499
500 if (args.Size() >= 2)
501 {
502 if (!rj::getSafe(args[1], action.reason))
503 {
504 return;
505 }
506 }
507
508 this->signals_.moderation.userBanned.invoke(action);
509 }
510 catch (const std::runtime_error &ex)
511 {
512 qCDebug(chatterinoPubsub)
513 << "Error parsing moderation action:" << ex.what();
514 }
515 };
516
517 this->moderationActionHandlers["unban"] = [this](const auto &data,
518 const auto &roomID) {
519 UnbanAction action(data, roomID);
520
521 getCreatedByUser(data, action.source);
522 getTargetUser(data, action.target);
523
524 action.previousState = UnbanAction::Banned;
525
526 try
527 {
528 const auto &args = getArgs(data);
529
530 if (args.Size() < 1)
531 {
532 return;
533 }
534
535 if (!rj::getSafe(args[0], action.target.login))
536 {
537 return;
538 }
539
540 this->signals_.moderation.userUnbanned.invoke(action);
541 }
542 catch (const std::runtime_error &ex)
543 {
544 qCDebug(chatterinoPubsub)
545 << "Error parsing moderation action:" << ex.what();
546 }
547 };
548
549 this->moderationActionHandlers["untimeout"] = [this](const auto &data,
550 const auto &roomID) {
551 UnbanAction action(data, roomID);
552
553 getCreatedByUser(data, action.source);
554 getTargetUser(data, action.target);
555
556 action.previousState = UnbanAction::TimedOut;
557
558 try
559 {
560 const auto &args = getArgs(data);
561
562 if (args.Size() < 1)
563 {
564 return;
565 }
566
567 if (!rj::getSafe(args[0], action.target.login))
568 {
569 return;
570 }
571
572 this->signals_.moderation.userUnbanned.invoke(action);
573 }
574 catch (const std::runtime_error &ex)
575 {
576 qCDebug(chatterinoPubsub)
577 << "Error parsing moderation action:" << ex.what();
578 }
579 };
580
581 this->moderationActionHandlers["automod_rejected"] =
582 [this](const auto &data, const auto &roomID) {
583 // Display the automod message and prompt the allow/deny
584 AutomodAction action(data, roomID);
585
586 getCreatedByUser(data, action.source);
587 getTargetUser(data, action.target);
588
589 try
590 {
591 const auto &args = getArgs(data);
592 const auto &msgID = getMsgID(data);
593
594 if (args.Size() < 1)
595 {
596 return;
597 }
598
599 if (!rj::getSafe(args[0], action.target.login))
600 {
601 return;
602 }
603
604 if (args.Size() >= 2)
605 {
606 if (!rj::getSafe(args[1], action.message))
607 {
608 return;
609 }
610 }
611
612 if (args.Size() >= 3)
613 {
614 if (!rj::getSafe(args[2], action.reason))
615 {
616 return;
617 }
618 }
619
620 if (!rj::getSafe(msgID, action.msgID))
621 {
622 return;
623 }
624
625 this->signals_.moderation.automodMessage.invoke(action);
626 }
627 catch (const std::runtime_error &ex)
628 {
629 qCDebug(chatterinoPubsub)
630 << "Error parsing moderation action:" << ex.what();
631 }
632 };
633
634 this->moderationActionHandlers["automod_message_rejected"] =
635 [this](const auto &data, const auto &roomID) {
636 AutomodInfoAction action(data, roomID);
637 action.type = AutomodInfoAction::OnHold;
638 this->signals_.moderation.automodInfoMessage.invoke(action);
639 };
640
641 this->moderationActionHandlers["automod_message_denied"] =
642 [this](const auto &data, const auto &roomID) {
643 AutomodInfoAction action(data, roomID);
644 action.type = AutomodInfoAction::Denied;
645 this->signals_.moderation.automodInfoMessage.invoke(action);
646 };
647
648 this->moderationActionHandlers["automod_message_approved"] =
649 [this](const auto &data, const auto &roomID) {
650 AutomodInfoAction action(data, roomID);
651 action.type = AutomodInfoAction::Approved;
652 this->signals_.moderation.automodInfoMessage.invoke(action);
653 };
654
655 this->channelTermsActionHandlers["add_permitted_term"] =
656 [this](const auto &data, const auto &roomID) {
657 // This term got a pass through automod
658 AutomodUserAction action(data, roomID);
659 getCreatedByUser(data, action.source);
660
661 try
662 {
663 action.type = AutomodUserAction::AddPermitted;
664 if (!rj::getSafe(data, "text", action.message))
665 {
666 return;
667 }
668
669 if (!rj::getSafe(data, "requester_login", action.source.login))
670 {
671 return;
672 }
673
674 this->signals_.moderation.automodUserMessage.invoke(action);
675 }
676 catch (const std::runtime_error &ex)
677 {
678 qCDebug(chatterinoPubsub)
679 << "Error parsing channel terms action:" << ex.what();
680 }
681 };
682
683 this->channelTermsActionHandlers["add_blocked_term"] =
684 [this](const auto &data, const auto &roomID) {
685 // A term has been added
686 AutomodUserAction action(data, roomID);
687 getCreatedByUser(data, action.source);
688
689 try
690 {
691 action.type = AutomodUserAction::AddBlocked;
692 if (!rj::getSafe(data, "text", action.message))
693 {
694 return;
695 }
696
697 if (!rj::getSafe(data, "requester_login", action.source.login))
698 {
699 return;
700 }
701
702 this->signals_.moderation.automodUserMessage.invoke(action);
703 }
704 catch (const std::runtime_error &ex)
705 {
706 qCDebug(chatterinoPubsub)
707 << "Error parsing channel terms action:" << ex.what();
708 }
709 };
710
711 this->moderationActionHandlers["delete_permitted_term"] =
712 [this](const auto &data, const auto &roomID) {
713 // This term got deleted
714 AutomodUserAction action(data, roomID);
715 getCreatedByUser(data, action.source);
716
717 try
718 {
719 const auto &args = getArgs(data);
720 action.type = AutomodUserAction::RemovePermitted;
721
722 if (args.Size() < 1)
723 {
724 return;
725 }
726
727 if (!rj::getSafe(args[0], action.message))
728 {
729 return;
730 }
731
732 this->signals_.moderation.automodUserMessage.invoke(action);
733 }
734 catch (const std::runtime_error &ex)
735 {
736 qCDebug(chatterinoPubsub)
737 << "Error parsing moderation action:" << ex.what();
738 }
739 };
740 this->channelTermsActionHandlers["delete_permitted_term"] =
741 [this](const auto &data, const auto &roomID) {
742 // This term got deleted
743 AutomodUserAction action(data, roomID);
744 getCreatedByUser(data, action.source);
745
746 try
747 {
748 action.type = AutomodUserAction::RemovePermitted;
749 if (!rj::getSafe(data, "text", action.message))
750 {
751 return;
752 }
753
754 if (!rj::getSafe(data, "requester_login", action.source.login))
755 {
756 return;
757 }
758
759 this->signals_.moderation.automodUserMessage.invoke(action);
760 }
761 catch (const std::runtime_error &ex)
762 {
763 qCDebug(chatterinoPubsub)
764 << "Error parsing channel terms action:" << ex.what();
765 }
766 };
767
768 this->moderationActionHandlers["delete_blocked_term"] =
769 [this](const auto &data, const auto &roomID) {
770 // This term got deleted
771 AutomodUserAction action(data, roomID);
772
773 getCreatedByUser(data, action.source);
774
775 try
776 {
777 const auto &args = getArgs(data);
778 action.type = AutomodUserAction::RemoveBlocked;
779
780 if (args.Size() < 1)
781 {
782 return;
783 }
784
785 if (!rj::getSafe(args[0], action.message))
786 {
787 return;
788 }
789
790 this->signals_.moderation.automodUserMessage.invoke(action);
791 }
792 catch (const std::runtime_error &ex)
793 {
794 qCDebug(chatterinoPubsub)
795 << "Error parsing moderation action:" << ex.what();
796 }
797 };
798 this->channelTermsActionHandlers["delete_blocked_term"] =
799 [this](const auto &data, const auto &roomID) {
800 // This term got deleted
801 AutomodUserAction action(data, roomID);
802
803 getCreatedByUser(data, action.source);
804
805 try
806 {
807 action.type = AutomodUserAction::RemoveBlocked;
808 if (!rj::getSafe(data, "text", action.message))
809 {
810 return;
811 }
812
813 if (!rj::getSafe(data, "requester_login", action.source.login))
814 {
815 return;
816 }
817
818 this->signals_.moderation.automodUserMessage.invoke(action);
819 }
820 catch (const std::runtime_error &ex)
821 {
822 qCDebug(chatterinoPubsub)
823 << "Error parsing channel terms action:" << ex.what();
824 }
825 };
826
827 // We don't get this one anymore or anything similiar
828 // We need some new topic so we can listen
829 //
830 //this->moderationActionHandlers["modified_automod_properties"] =
831 // [this](const auto &data, const auto &roomID) {
832 // // The automod settings got modified
833 // AutomodUserAction action(data, roomID);
834 // getCreatedByUser(data, action.source);
835 // action.type = AutomodUserAction::Properties;
836 // this->signals_.moderation.automodUserMessage.invoke(action);
837 // };
838
839 this->moderationActionHandlers["denied_automod_message"] =
840 [](const auto &data, const auto &roomID) {
841 // This message got denied by a moderator
842 // qCDebug(chatterinoPubsub) << rj::stringify(data);
843 };
844
845 this->moderationActionHandlers["approved_automod_message"] =
846 [](const auto &data, const auto &roomID) {
847 // This message got approved by a moderator
848 // qCDebug(chatterinoPubsub) << rj::stringify(data);
849 };
850
851 #ifdef DEBUG_OFF
852 this->websocketClient.set_access_channels(websocketpp::log::alevel::none);
853 #else
854 this->websocketClient.set_access_channels(websocketpp::log::alevel::all);
855 #endif
856 this->websocketClient.clear_access_channels(
857 websocketpp::log::alevel::frame_payload |
858 websocketpp::log::alevel::frame_header);
859
860 this->websocketClient.init_asio();
861
862 // SSL Handshake
863 this->websocketClient.set_tls_init_handler(
864 bind(&PubSub::onTLSInit, this, ::_1));
865
866 this->websocketClient.set_message_handler(
867 bind(&PubSub::onMessage, this, ::_1, ::_2));
868 this->websocketClient.set_open_handler(
869 bind(&PubSub::onConnectionOpen, this, ::_1));
870 this->websocketClient.set_close_handler(
871 bind(&PubSub::onConnectionClose, this, ::_1));
872
873 // Add an initial client
874 this->addClient();
875 }
876
addClient()877 void PubSub::addClient()
878 {
879 if (this->addingClient)
880 {
881 return;
882 }
883
884 this->addingClient = true;
885
886 websocketpp::lib::error_code ec;
887 auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);
888
889 if (ec)
890 {
891 qCDebug(chatterinoPubsub)
892 << "Unable to establish connection:" << ec.message().c_str();
893 return;
894 }
895
896 this->websocketClient.connect(con);
897 }
898
start()899 void PubSub::start()
900 {
901 this->mainThread.reset(
902 new std::thread(std::bind(&PubSub::runThread, this)));
903 }
904
listenToWhispers(std::shared_ptr<TwitchAccount> account)905 void PubSub::listenToWhispers(std::shared_ptr<TwitchAccount> account)
906 {
907 static const QString topicFormat("whispers.%1");
908
909 assert(account != nullptr);
910
911 auto userID = account->getUserId();
912
913 qCDebug(chatterinoPubsub) << "Connection open!";
914 websocketpp::lib::error_code ec;
915
916 std::vector<QString> topics({topicFormat.arg(userID)});
917
918 this->listen(createListenMessage(topics, account));
919
920 if (ec)
921 {
922 qCDebug(chatterinoPubsub)
923 << "Unable to send message to websocket server:"
924 << ec.message().c_str();
925 return;
926 }
927 }
928
unlistenAllModerationActions()929 void PubSub::unlistenAllModerationActions()
930 {
931 for (const auto &p : this->clients)
932 {
933 const auto &client = p.second;
934 client->unlistenPrefix("chat_moderator_actions.");
935 }
936 }
937
listenToChannelModerationActions(const QString & channelID,std::shared_ptr<TwitchAccount> account)938 void PubSub::listenToChannelModerationActions(
939 const QString &channelID, std::shared_ptr<TwitchAccount> account)
940 {
941 static const QString topicFormat("chat_moderator_actions.%1.%2");
942 assert(!channelID.isEmpty());
943 assert(account != nullptr);
944 QString userID = account->getUserId();
945 if (userID.isEmpty())
946 return;
947
948 auto topic = topicFormat.arg(userID, channelID);
949
950 if (this->isListeningToTopic(topic))
951 {
952 return;
953 }
954
955 qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
956
957 this->listenToTopic(topic, account);
958 }
959
listenToAutomod(const QString & channelID,std::shared_ptr<TwitchAccount> account)960 void PubSub::listenToAutomod(const QString &channelID,
961 std::shared_ptr<TwitchAccount> account)
962 {
963 static const QString topicFormat("automod-queue.%1.%2");
964 assert(!channelID.isEmpty());
965 assert(account != nullptr);
966 QString userID = account->getUserId();
967 if (userID.isEmpty())
968 return;
969
970 auto topic = topicFormat.arg(userID, channelID);
971
972 if (this->isListeningToTopic(topic))
973 {
974 return;
975 }
976
977 qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
978
979 this->listenToTopic(topic, account);
980 }
981
listenToChannelPointRewards(const QString & channelID,std::shared_ptr<TwitchAccount> account)982 void PubSub::listenToChannelPointRewards(const QString &channelID,
983 std::shared_ptr<TwitchAccount> account)
984 {
985 static const QString topicFormat("community-points-channel-v1.%1");
986 assert(!channelID.isEmpty());
987 assert(account != nullptr);
988
989 auto topic = topicFormat.arg(channelID);
990
991 if (this->isListeningToTopic(topic))
992 {
993 return;
994 }
995 qCDebug(chatterinoPubsub) << "Listen to topic" << topic;
996
997 this->listenToTopic(topic, account);
998 }
999
listenToTopic(const QString & topic,std::shared_ptr<TwitchAccount> account)1000 void PubSub::listenToTopic(const QString &topic,
1001 std::shared_ptr<TwitchAccount> account)
1002 {
1003 auto message = createListenMessage({topic}, account);
1004
1005 this->listen(std::move(message));
1006 }
1007
listen(rapidjson::Document && msg)1008 void PubSub::listen(rapidjson::Document &&msg)
1009 {
1010 if (this->tryListen(msg))
1011 {
1012 return;
1013 }
1014
1015 this->addClient();
1016
1017 this->requests.emplace_back(
1018 std::make_unique<rapidjson::Document>(std::move(msg)));
1019
1020 DebugCount::increase("PubSub topic backlog");
1021 }
1022
tryListen(rapidjson::Document & msg)1023 bool PubSub::tryListen(rapidjson::Document &msg)
1024 {
1025 for (const auto &p : this->clients)
1026 {
1027 const auto &client = p.second;
1028 if (client->listen(msg))
1029 {
1030 return true;
1031 }
1032 }
1033
1034 return false;
1035 }
1036
isListeningToTopic(const QString & topic)1037 bool PubSub::isListeningToTopic(const QString &topic)
1038 {
1039 for (const auto &p : this->clients)
1040 {
1041 const auto &client = p.second;
1042 if (client->isListeningToTopic(topic))
1043 {
1044 return true;
1045 }
1046 }
1047
1048 return false;
1049 }
1050
onMessage(websocketpp::connection_hdl hdl,WebsocketMessagePtr websocketMessage)1051 void PubSub::onMessage(websocketpp::connection_hdl hdl,
1052 WebsocketMessagePtr websocketMessage)
1053 {
1054 const auto &payload =
1055 QString::fromStdString(websocketMessage->get_payload());
1056
1057 rapidjson::Document msg;
1058
1059 rapidjson::ParseResult res = msg.Parse(payload.toUtf8());
1060
1061 if (!res)
1062 {
1063 qCDebug(chatterinoPubsub)
1064 << QString("Error parsing message '%1' from PubSub: %2")
1065 .arg(payload, rapidjson::GetParseError_En(res.Code()));
1066 return;
1067 }
1068
1069 if (!msg.IsObject())
1070 {
1071 qCDebug(chatterinoPubsub)
1072 << QString("Error parsing message '%1' from PubSub. Root object is "
1073 "not an object")
1074 .arg(payload);
1075 return;
1076 }
1077
1078 QString type;
1079
1080 if (!rj::getSafe(msg, "type", type))
1081 {
1082 qCDebug(chatterinoPubsub)
1083 << "Missing required string member `type` in message root";
1084 return;
1085 }
1086
1087 if (type == "RESPONSE")
1088 {
1089 this->handleResponse(msg);
1090 }
1091 else if (type == "MESSAGE")
1092 {
1093 if (!msg.HasMember("data"))
1094 {
1095 qCDebug(chatterinoPubsub)
1096 << "Missing required object member `data` in message root";
1097 return;
1098 }
1099
1100 const auto &data = msg["data"];
1101
1102 if (!data.IsObject())
1103 {
1104 qCDebug(chatterinoPubsub) << "Member `data` must be an object";
1105 return;
1106 }
1107
1108 this->handleMessageResponse(data);
1109 }
1110 else if (type == "PONG")
1111 {
1112 auto clientIt = this->clients.find(hdl);
1113
1114 // If this assert goes off, there's something wrong with the connection
1115 // creation/preserving code KKona
1116 assert(clientIt != this->clients.end());
1117
1118 auto &client = *clientIt;
1119
1120 client.second->handlePong();
1121 }
1122 else
1123 {
1124 qCDebug(chatterinoPubsub) << "Unknown message type:" << type;
1125 }
1126 }
1127
onConnectionOpen(WebsocketHandle hdl)1128 void PubSub::onConnectionOpen(WebsocketHandle hdl)
1129 {
1130 DebugCount::increase("PubSub connections");
1131 this->addingClient = false;
1132
1133 auto client =
1134 std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);
1135
1136 // We separate the starting from the constructor because we will want to use
1137 // shared_from_this
1138 client->start();
1139
1140 this->clients.emplace(hdl, client);
1141
1142 this->connected.invoke();
1143
1144 for (auto it = this->requests.begin(); it != this->requests.end();)
1145 {
1146 const auto &request = *it;
1147 if (client->listen(*request))
1148 {
1149 DebugCount::decrease("PubSub topic backlog");
1150 it = this->requests.erase(it);
1151 }
1152 else
1153 {
1154 ++it;
1155 }
1156 }
1157
1158 if (!this->requests.empty())
1159 {
1160 this->addClient();
1161 }
1162 }
1163
onConnectionClose(WebsocketHandle hdl)1164 void PubSub::onConnectionClose(WebsocketHandle hdl)
1165 {
1166 DebugCount::decrease("PubSub connections");
1167 auto clientIt = this->clients.find(hdl);
1168
1169 // If this assert goes off, there's something wrong with the connection
1170 // creation/preserving code KKona
1171 assert(clientIt != this->clients.end());
1172
1173 auto &client = clientIt->second;
1174
1175 client->stop();
1176
1177 this->clients.erase(clientIt);
1178
1179 this->connected.invoke();
1180 }
1181
onTLSInit(websocketpp::connection_hdl hdl)1182 PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
1183 {
1184 WebsocketContextPtr ctx(
1185 new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
1186
1187 try
1188 {
1189 ctx->set_options(boost::asio::ssl::context::default_workarounds |
1190 boost::asio::ssl::context::no_sslv2 |
1191 boost::asio::ssl::context::single_dh_use);
1192 }
1193 catch (const std::exception &e)
1194 {
1195 qCDebug(chatterinoPubsub)
1196 << "Exception caught in OnTLSInit:" << e.what();
1197 }
1198
1199 return ctx;
1200 }
1201
handleResponse(const rapidjson::Document & msg)1202 void PubSub::handleResponse(const rapidjson::Document &msg)
1203 {
1204 QString error;
1205
1206 if (!rj::getSafe(msg, "error", error))
1207 return;
1208
1209 QString nonce;
1210 rj::getSafe(msg, "nonce", nonce);
1211
1212 const bool failed = !error.isEmpty();
1213
1214 if (failed)
1215 {
1216 qCDebug(chatterinoPubsub)
1217 << QString("Error %1 on nonce %2").arg(error, nonce);
1218 }
1219
1220 if (auto it = sentListens.find(nonce); it != sentListens.end())
1221 {
1222 this->handleListenResponse(it->second, failed);
1223 return;
1224 }
1225
1226 if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end())
1227 {
1228 this->handleUnlistenResponse(it->second, failed);
1229 return;
1230 }
1231
1232 qCDebug(chatterinoPubsub)
1233 << "Response on unused" << nonce << "client/topic listener mismatch?";
1234 }
1235
handleListenResponse(const RequestMessage & msg,bool failed)1236 void PubSub::handleListenResponse(const RequestMessage &msg, bool failed)
1237 {
1238 DebugCount::decrease("PubSub topic pending listens", msg.topicCount);
1239 if (failed)
1240 {
1241 DebugCount::increase("PubSub topic failed listens", msg.topicCount);
1242 }
1243 else
1244 {
1245 DebugCount::increase("PubSub topic listening", msg.topicCount);
1246 }
1247 }
1248
handleUnlistenResponse(const RequestMessage & msg,bool failed)1249 void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed)
1250 {
1251 DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount);
1252 if (failed)
1253 {
1254 DebugCount::increase("PubSub topic failed unlistens", msg.topicCount);
1255 }
1256 else
1257 {
1258 DebugCount::decrease("PubSub topic listening", msg.topicCount);
1259 }
1260 }
1261
handleMessageResponse(const rapidjson::Value & outerData)1262 void PubSub::handleMessageResponse(const rapidjson::Value &outerData)
1263 {
1264 QString topic;
1265 qCDebug(chatterinoPubsub) << rj::stringify(outerData);
1266
1267 if (!rj::getSafe(outerData, "topic", topic))
1268 {
1269 qCDebug(chatterinoPubsub)
1270 << "Missing required string member `topic` in outerData";
1271 return;
1272 }
1273
1274 QString payload;
1275
1276 if (!rj::getSafe(outerData, "message", payload))
1277 {
1278 qCDebug(chatterinoPubsub) << "Expected string message in outerData";
1279 return;
1280 }
1281
1282 rapidjson::Document msg;
1283
1284 rapidjson::ParseResult res = msg.Parse(payload.toUtf8());
1285
1286 if (!res)
1287 {
1288 qCDebug(chatterinoPubsub)
1289 << QString("Error parsing message '%1' from PubSub: %2")
1290 .arg(payload, rapidjson::GetParseError_En(res.Code()));
1291 return;
1292 }
1293
1294 if (topic.startsWith("whispers."))
1295 {
1296 QString whisperType;
1297
1298 if (!rj::getSafe(msg, "type", whisperType))
1299 {
1300 qCDebug(chatterinoPubsub) << "Bad whisper data";
1301 return;
1302 }
1303
1304 if (whisperType == "whisper_received")
1305 {
1306 this->signals_.whisper.received.invoke(msg);
1307 }
1308 else if (whisperType == "whisper_sent")
1309 {
1310 this->signals_.whisper.sent.invoke(msg);
1311 }
1312 else if (whisperType == "thread")
1313 {
1314 // Handle thread?
1315 }
1316 else
1317 {
1318 qCDebug(chatterinoPubsub) << "Invalid whisper type:" << whisperType;
1319 return;
1320 }
1321 }
1322 else if (topic.startsWith("chat_moderator_actions."))
1323 {
1324 auto topicParts = topic.split(".");
1325 assert(topicParts.length() == 3);
1326 const auto &data = msg["data"];
1327
1328 QString moderationEventType;
1329
1330 if (!rj::getSafe(msg, "type", moderationEventType))
1331 {
1332 qCDebug(chatterinoPubsub) << "Bad moderator event data";
1333 return;
1334 }
1335 if (moderationEventType == "moderation_action")
1336 {
1337 QString moderationAction;
1338
1339 if (!rj::getSafe(data, "moderation_action", moderationAction))
1340 {
1341 qCDebug(chatterinoPubsub)
1342 << "Missing moderation action in data:"
1343 << rj::stringify(data);
1344 return;
1345 }
1346
1347 auto handlerIt =
1348 this->moderationActionHandlers.find(moderationAction);
1349
1350 if (handlerIt == this->moderationActionHandlers.end())
1351 {
1352 qCDebug(chatterinoPubsub)
1353 << "No handler found for moderation action"
1354 << moderationAction;
1355 return;
1356 }
1357 // Invoke handler function
1358 handlerIt->second(data, topicParts[2]);
1359 }
1360 else if (moderationEventType == "channel_terms_action")
1361 {
1362 QString channelTermsAction;
1363
1364 if (!rj::getSafe(data, "type", channelTermsAction))
1365 {
1366 qCDebug(chatterinoPubsub)
1367 << "Missing channel terms action in data:"
1368 << rj::stringify(data);
1369 return;
1370 }
1371
1372 auto handlerIt =
1373 this->channelTermsActionHandlers.find(channelTermsAction);
1374
1375 if (handlerIt == this->channelTermsActionHandlers.end())
1376 {
1377 qCDebug(chatterinoPubsub)
1378 << "No handler found for channel terms action"
1379 << channelTermsAction;
1380 return;
1381 }
1382 // Invoke handler function
1383 handlerIt->second(data, topicParts[2]);
1384 }
1385 }
1386 else if (topic.startsWith("community-points-channel-v1."))
1387 {
1388 QString pointEventType;
1389 if (!rj::getSafe(msg, "type", pointEventType))
1390 {
1391 qCDebug(chatterinoPubsub) << "Bad channel point event data";
1392 return;
1393 }
1394
1395 if (pointEventType == "reward-redeemed")
1396 {
1397 if (!rj::getSafeObject(msg, "data", msg))
1398 {
1399 qCDebug(chatterinoPubsub)
1400 << "No data found for redeemed reward";
1401 return;
1402 }
1403 if (!rj::getSafeObject(msg, "redemption", msg))
1404 {
1405 qCDebug(chatterinoPubsub)
1406 << "No redemption info found for redeemed reward";
1407 return;
1408 }
1409 this->signals_.pointReward.redeemed.invoke(msg);
1410 }
1411 else
1412 {
1413 qCDebug(chatterinoPubsub)
1414 << "Invalid point event type:" << pointEventType;
1415 }
1416 }
1417 else if (topic.startsWith("automod-queue."))
1418 {
1419 auto topicParts = topic.split(".");
1420 assert(topicParts.length() == 3);
1421 auto &data = msg["data"];
1422
1423 QString automodEventType;
1424 if (!rj::getSafe(msg, "type", automodEventType))
1425 {
1426 qCDebug(chatterinoPubsub) << "Bad automod event data";
1427 return;
1428 }
1429
1430 if (automodEventType == "automod_caught_message")
1431 {
1432 QString status;
1433 if (!rj::getSafe(data, "status", status))
1434 {
1435 qCDebug(chatterinoPubsub) << "Failed to get status";
1436 return;
1437 }
1438 if (status == "PENDING")
1439 {
1440 AutomodAction action(data, topicParts[2]);
1441 rapidjson::Value classification;
1442 if (!rj::getSafeObject(data, "content_classification",
1443 classification))
1444 {
1445 qCDebug(chatterinoPubsub)
1446 << "Failed to get content_classification";
1447 return;
1448 }
1449
1450 QString contentCategory;
1451 if (!rj::getSafe(classification, "category", contentCategory))
1452 {
1453 qCDebug(chatterinoPubsub)
1454 << "Failed to get content category";
1455 return;
1456 }
1457 int contentLevel;
1458 if (!rj::getSafe(classification, "level", contentLevel))
1459 {
1460 qCDebug(chatterinoPubsub) << "Failed to get content level";
1461 return;
1462 }
1463 action.reason = QString("%1 level %2")
1464 .arg(contentCategory)
1465 .arg(contentLevel);
1466
1467 rapidjson::Value messageData;
1468 if (!rj::getSafeObject(data, "message", messageData))
1469 {
1470 qCDebug(chatterinoPubsub) << "Failed to get message data";
1471 return;
1472 }
1473
1474 rapidjson::Value messageContent;
1475 if (!rj::getSafeObject(messageData, "content", messageContent))
1476 {
1477 qCDebug(chatterinoPubsub)
1478 << "Failed to get message content";
1479 return;
1480 }
1481 if (!rj::getSafe(messageData, "id", action.msgID))
1482 {
1483 qCDebug(chatterinoPubsub) << "Failed to get message id";
1484 return;
1485 }
1486
1487 if (!rj::getSafe(messageContent, "text", action.message))
1488 {
1489 qCDebug(chatterinoPubsub) << "Failed to get message text";
1490 return;
1491 }
1492
1493 // this message also contains per-word automod data, which could be implemented
1494
1495 // extract sender data manually because twitch loves not being consistent
1496 rapidjson::Value senderData;
1497 if (!rj::getSafeObject(messageData, "sender", senderData))
1498 {
1499 qCDebug(chatterinoPubsub) << "Failed to get sender";
1500 return;
1501 }
1502 QString senderId;
1503 if (!rj::getSafe(senderData, "user_id", senderId))
1504 {
1505 qCDebug(chatterinoPubsub) << "Failed to get sender user id";
1506 return;
1507 }
1508 QString senderLogin;
1509 if (!rj::getSafe(senderData, "login", senderLogin))
1510 {
1511 qCDebug(chatterinoPubsub) << "Failed to get sender login";
1512 return;
1513 }
1514 QString senderDisplayName = senderLogin;
1515 bool hasLocalizedName = false;
1516 if (rj::getSafe(senderData, "display_name", senderDisplayName))
1517 {
1518 // check for non-ascii display names
1519 if (QString::compare(senderLogin, senderDisplayName,
1520 Qt::CaseInsensitive) != 0)
1521 {
1522 hasLocalizedName = true;
1523 }
1524 }
1525 QColor senderColor;
1526 QString senderColor_;
1527 if (rj::getSafe(senderData, "chat_color", senderColor_))
1528 {
1529 senderColor = QColor(senderColor_);
1530 }
1531 else if (getSettings()->colorizeNicknames)
1532 {
1533 // color may be not present if user is a grey-name
1534 senderColor = getRandomColor(senderId);
1535 }
1536 // handle username style based on prefered setting
1537 switch (getSettings()->usernameDisplayMode.getValue())
1538 {
1539 case UsernameDisplayMode::Username: {
1540 if (hasLocalizedName)
1541 {
1542 senderDisplayName = senderLogin;
1543 }
1544 break;
1545 }
1546 case UsernameDisplayMode::LocalizedName: {
1547 break;
1548 }
1549 case UsernameDisplayMode::UsernameAndLocalizedName: {
1550 if (hasLocalizedName)
1551 {
1552 senderDisplayName = QString("%1(%2)").arg(
1553 senderLogin, senderDisplayName);
1554 }
1555 break;
1556 }
1557 }
1558
1559 action.target = ActionUser{senderId, senderLogin,
1560 senderDisplayName, senderColor};
1561 this->signals_.moderation.automodMessage.invoke(action);
1562 }
1563 // "ALLOWED" and "DENIED" statuses remain unimplemented
1564 // They are versions of automod_message_(denied|approved) but for mods.
1565 }
1566 }
1567 else
1568 {
1569 qCDebug(chatterinoPubsub) << "Unknown topic:" << topic;
1570 return;
1571 }
1572 }
1573
runThread()1574 void PubSub::runThread()
1575 {
1576 qCDebug(chatterinoPubsub) << "Start pubsub manager thread";
1577 this->websocketClient.run();
1578 qCDebug(chatterinoPubsub) << "Done with pubsub manager thread";
1579 }
1580
1581 } // namespace chatterino
1582