1 #include "AbstractIrcServer.hpp"
2 
3 #include "common/Channel.hpp"
4 #include "common/Common.hpp"
5 #include "common/QLogging.hpp"
6 #include "messages/LimitedQueueSnapshot.hpp"
7 #include "messages/Message.hpp"
8 #include "messages/MessageBuilder.hpp"
9 
10 #include <QCoreApplication>
11 
12 namespace chatterino {
13 
// Base interval between reconnect attempts. The falloff note below treats
// this as 2 seconds, so the value is presumably in milliseconds.
constexpr int RECONNECT_BASE_INTERVAL = 2000;
// With a falloff counter capped at 60, a reconnect is attempted at most
// every 60*2 seconds
constexpr int MAX_FALLOFF_COUNTER = 60;

// Rate limits used for joinBucket_
constexpr int JOIN_RATELIMIT_BUDGET = 18;
// NOTE(review): presumably milliseconds - confirm against RatelimitBucket
constexpr int JOIN_RATELIMIT_COOLDOWN = 10500;
21 
AbstractIrcServer()22 AbstractIrcServer::AbstractIrcServer()
23 {
24     // Initialize the connections
25     // XXX: don't create write connection if there is no separate write connection.
26     this->writeConnection_.reset(new IrcConnection);
27     this->writeConnection_->moveToThread(
28         QCoreApplication::instance()->thread());
29 
30     // Apply a leaky bucket rate limiting to JOIN messages
31     auto actuallyJoin = [&](QString message) {
32         if (!this->channels.contains(message))
33         {
34             return;
35         }
36         this->readConnection_->sendRaw("JOIN #" + message);
37     };
38     this->joinBucket_.reset(new RatelimitBucket(
39         JOIN_RATELIMIT_BUDGET, JOIN_RATELIMIT_COOLDOWN, actuallyJoin, this));
40 
41     QObject::connect(this->writeConnection_.get(),
42                      &Communi::IrcConnection::messageReceived, this,
43                      [this](auto msg) {
44                          this->writeConnectionMessageReceived(msg);
45                      });
46     QObject::connect(this->writeConnection_.get(),
47                      &Communi::IrcConnection::connected, this, [this] {
48                          this->onWriteConnected(this->writeConnection_.get());
49                      });
50     this->writeConnection_->connectionLost.connect([this](bool timeout) {
51         qCDebug(chatterinoIrc)
52             << "Write connection reconnect requested. Timeout:" << timeout;
53         this->writeConnection_->smartReconnect.invoke();
54     });
55 
56     // Listen to read connection message signals
57     this->readConnection_.reset(new IrcConnection);
58     this->readConnection_->moveToThread(QCoreApplication::instance()->thread());
59 
60     QObject::connect(this->readConnection_.get(),
61                      &Communi::IrcConnection::messageReceived, this,
62                      [this](auto msg) {
63                          this->readConnectionMessageReceived(msg);
64                      });
65     QObject::connect(this->readConnection_.get(),
66                      &Communi::IrcConnection::privateMessageReceived, this,
67                      [this](auto msg) {
68                          this->privateMessageReceived(msg);
69                      });
70     QObject::connect(this->readConnection_.get(),
71                      &Communi::IrcConnection::connected, this, [this] {
72                          this->onReadConnected(this->readConnection_.get());
73                      });
74     QObject::connect(this->readConnection_.get(),
75                      &Communi::IrcConnection::disconnected, this, [this] {
76                          this->onDisconnected();
77                      });
78     this->readConnection_->connectionLost.connect([this](bool timeout) {
79         qCDebug(chatterinoIrc)
80             << "Read connection reconnect requested. Timeout:" << timeout;
81         if (timeout)
82         {
83             // Show additional message since this is going to interrupt a
84             // connection that is still "connected"
85             this->addGlobalSystemMessage(
86                 "Server connection timed out, reconnecting");
87         }
88         this->readConnection_->smartReconnect.invoke();
89     });
90 }
91 
initializeIrc()92 void AbstractIrcServer::initializeIrc()
93 {
94     assert(!this->initialized_);
95 
96     if (this->hasSeparateWriteConnection())
97     {
98         this->initializeConnectionSignals(this->writeConnection_.get(),
99                                           ConnectionType::Write);
100         this->initializeConnectionSignals(this->readConnection_.get(),
101                                           ConnectionType::Read);
102     }
103     else
104     {
105         this->initializeConnectionSignals(this->readConnection_.get(),
106                                           ConnectionType::Both);
107     }
108 
109     this->initialized_ = true;
110 }
111 
connect()112 void AbstractIrcServer::connect()
113 {
114     assert(this->initialized_);
115 
116     this->disconnect();
117 
118     if (this->hasSeparateWriteConnection())
119     {
120         this->initializeConnection(this->writeConnection_.get(), Write);
121         this->initializeConnection(this->readConnection_.get(), Read);
122     }
123     else
124     {
125         this->initializeConnection(this->readConnection_.get(), Both);
126     }
127 }
128 
open(ConnectionType type)129 void AbstractIrcServer::open(ConnectionType type)
130 {
131     std::lock_guard<std::mutex> lock(this->connectionMutex_);
132 
133     if (type == Write)
134     {
135         this->writeConnection_->open();
136     }
137     if (type & Read)
138     {
139         this->readConnection_->open();
140     }
141 }
142 
addGlobalSystemMessage(const QString & messageText)143 void AbstractIrcServer::addGlobalSystemMessage(const QString &messageText)
144 {
145     std::lock_guard<std::mutex> lock(this->channelMutex);
146 
147     MessageBuilder b(systemMessage, messageText);
148     auto message = b.release();
149 
150     for (std::weak_ptr<Channel> &weak : this->channels.values())
151     {
152         std::shared_ptr<Channel> chan = weak.lock();
153         if (!chan)
154         {
155             continue;
156         }
157 
158         chan->addMessage(message);
159     }
160 }
161 
disconnect()162 void AbstractIrcServer::disconnect()
163 {
164     std::lock_guard<std::mutex> locker(this->connectionMutex_);
165 
166     this->readConnection_->close();
167     if (this->hasSeparateWriteConnection())
168     {
169         this->writeConnection_->close();
170     }
171 }
172 
sendMessage(const QString & channelName,const QString & message)173 void AbstractIrcServer::sendMessage(const QString &channelName,
174                                     const QString &message)
175 {
176     this->sendRawMessage("PRIVMSG #" + channelName + " :" + message);
177 }
178 
sendRawMessage(const QString & rawMessage)179 void AbstractIrcServer::sendRawMessage(const QString &rawMessage)
180 {
181     std::lock_guard<std::mutex> locker(this->connectionMutex_);
182 
183     if (this->hasSeparateWriteConnection())
184     {
185         this->writeConnection_->sendRaw(rawMessage);
186     }
187     else
188     {
189         this->readConnection_->sendRaw(rawMessage);
190     }
191 }
192 
writeConnectionMessageReceived(Communi::IrcMessage * message)193 void AbstractIrcServer::writeConnectionMessageReceived(
194     Communi::IrcMessage *message)
195 {
196     (void)message;
197 }
198 
getOrAddChannel(const QString & dirtyChannelName)199 ChannelPtr AbstractIrcServer::getOrAddChannel(const QString &dirtyChannelName)
200 {
201     auto channelName = this->cleanChannelName(dirtyChannelName);
202 
203     // try get channel
204     ChannelPtr chan = this->getChannelOrEmpty(channelName);
205     if (chan != Channel::getEmpty())
206     {
207         return chan;
208     }
209 
210     std::lock_guard<std::mutex> lock(this->channelMutex);
211 
212     // value doesn't exist
213     chan = this->createChannel(channelName);
214     if (!chan)
215     {
216         return Channel::getEmpty();
217     }
218 
219     this->channels.insert(channelName, chan);
220     this->connections_.emplace_back(
221         chan->destroyed.connect([this, channelName] {
222             // fourtf: issues when the server itself is destroyed
223 
224             qCDebug(chatterinoIrc) << "[AbstractIrcServer::addChannel]"
225                                    << channelName << "was destroyed";
226             this->channels.remove(channelName);
227 
228             if (this->readConnection_)
229             {
230                 this->readConnection_->sendRaw("PART #" + channelName);
231             }
232         }));
233 
234     // join irc channel
235     {
236         std::lock_guard<std::mutex> lock2(this->connectionMutex_);
237 
238         if (this->readConnection_)
239         {
240             if (this->readConnection_->isConnected())
241             {
242                 this->joinBucket_->send(channelName);
243             }
244         }
245     }
246 
247     return chan;
248 }
249 
getChannelOrEmpty(const QString & dirtyChannelName)250 ChannelPtr AbstractIrcServer::getChannelOrEmpty(const QString &dirtyChannelName)
251 {
252     auto channelName = this->cleanChannelName(dirtyChannelName);
253 
254     std::lock_guard<std::mutex> lock(this->channelMutex);
255 
256     // try get special channel
257     ChannelPtr chan = this->getCustomChannel(channelName);
258     if (chan)
259     {
260         return chan;
261     }
262 
263     // value exists
264     auto it = this->channels.find(channelName);
265     if (it != this->channels.end())
266     {
267         chan = it.value().lock();
268 
269         if (chan)
270         {
271             return chan;
272         }
273     }
274 
275     return Channel::getEmpty();
276 }
277 
getChannels()278 std::vector<std::weak_ptr<Channel>> AbstractIrcServer::getChannels()
279 {
280     std::lock_guard lock(this->channelMutex);
281     std::vector<std::weak_ptr<Channel>> channels;
282 
283     for (auto &&weak : this->channels.values())
284     {
285         channels.push_back(weak);
286     }
287 
288     return channels;
289 }
290 
onReadConnected(IrcConnection * connection)291 void AbstractIrcServer::onReadConnected(IrcConnection *connection)
292 {
293     (void)connection;
294 
295     std::lock_guard lock(this->channelMutex);
296 
297     // join channels
298     for (auto &&weak : this->channels)
299     {
300         if (auto channel = weak.lock())
301         {
302             this->joinBucket_->send(channel->getName());
303         }
304     }
305 
306     // connected/disconnected message
307     auto connectedMsg = makeSystemMessage("connected");
308     connectedMsg->flags.set(MessageFlag::ConnectedMessage);
309     auto reconnected = makeSystemMessage("reconnected");
310     reconnected->flags.set(MessageFlag::ConnectedMessage);
311 
312     for (std::weak_ptr<Channel> &weak : this->channels.values())
313     {
314         std::shared_ptr<Channel> chan = weak.lock();
315         if (!chan)
316         {
317             continue;
318         }
319 
320         LimitedQueueSnapshot<MessagePtr> snapshot = chan->getMessageSnapshot();
321 
322         bool replaceMessage =
323             snapshot.size() > 0 && snapshot[snapshot.size() - 1]->flags.has(
324                                        MessageFlag::DisconnectedMessage);
325 
326         if (replaceMessage)
327         {
328             chan->replaceMessage(snapshot[snapshot.size() - 1], reconnected);
329             continue;
330         }
331 
332         chan->addMessage(connectedMsg);
333     }
334 
335     this->falloffCounter_ = 1;
336 }
337 
onWriteConnected(IrcConnection * connection)338 void AbstractIrcServer::onWriteConnected(IrcConnection *connection)
339 {
340     (void)connection;
341 }
342 
onDisconnected()343 void AbstractIrcServer::onDisconnected()
344 {
345     std::lock_guard<std::mutex> lock(this->channelMutex);
346 
347     MessageBuilder b(systemMessage, "disconnected");
348     b->flags.set(MessageFlag::DisconnectedMessage);
349     auto disconnectedMsg = b.release();
350 
351     for (std::weak_ptr<Channel> &weak : this->channels.values())
352     {
353         std::shared_ptr<Channel> chan = weak.lock();
354         if (!chan)
355         {
356             continue;
357         }
358 
359         chan->addMessage(disconnectedMsg);
360     }
361 }
362 
getCustomChannel(const QString & channelName)363 std::shared_ptr<Channel> AbstractIrcServer::getCustomChannel(
364     const QString &channelName)
365 {
366     (void)channelName;
367     return nullptr;
368 }
369 
// Normalizes a channel name by removing a single leading '#', if present.
// Names without that prefix are returned unchanged.
QString AbstractIrcServer::cleanChannelName(const QString &dirtyChannelName)
{
    const bool hasHashPrefix = dirtyChannelName.startsWith('#');
    return hasHashPrefix ? dirtyChannelName.mid(1) : dirtyChannelName;
}
377 
addFakeMessage(const QString & data)378 void AbstractIrcServer::addFakeMessage(const QString &data)
379 {
380     auto fakeMessage = Communi::IrcMessage::fromData(
381         data.toUtf8(), this->readConnection_.get());
382 
383     if (fakeMessage->command() == "PRIVMSG")
384     {
385         this->privateMessageReceived(
386             static_cast<Communi::IrcPrivateMessage *>(fakeMessage));
387     }
388     else
389     {
390         this->readConnectionMessageReceived(fakeMessage);
391     }
392 }
393 
privateMessageReceived(Communi::IrcPrivateMessage * message)394 void AbstractIrcServer::privateMessageReceived(
395     Communi::IrcPrivateMessage *message)
396 {
397     (void)message;
398 }
399 
readConnectionMessageReceived(Communi::IrcMessage * message)400 void AbstractIrcServer::readConnectionMessageReceived(
401     Communi::IrcMessage *message)
402 {
403 }
404 
forEachChannel(std::function<void (ChannelPtr)> func)405 void AbstractIrcServer::forEachChannel(std::function<void(ChannelPtr)> func)
406 {
407     std::lock_guard<std::mutex> lock(this->channelMutex);
408 
409     for (std::weak_ptr<Channel> &weak : this->channels.values())
410     {
411         ChannelPtr chan = weak.lock();
412         if (!chan)
413         {
414             continue;
415         }
416 
417         func(chan);
418     }
419 }
420 
421 }  // namespace chatterino
422