1 #include "AbstractIrcServer.hpp"
2
3 #include "common/Channel.hpp"
4 #include "common/Common.hpp"
5 #include "common/QLogging.hpp"
6 #include "messages/LimitedQueueSnapshot.hpp"
7 #include "messages/Message.hpp"
8 #include "messages/MessageBuilder.hpp"
9
10 #include <QCoreApplication>
11
12 namespace chatterino {
13
// Base interval between reconnect attempts.
// NOTE(review): presumably milliseconds (2 s), judging by the "60*2 seconds"
// remark below - confirm. Not referenced in the visible part of this file.
constexpr int RECONNECT_BASE_INTERVAL = 2000;
// 60 falloff counter means it will try to reconnect at most every 60*2 seconds
constexpr int MAX_FALLOFF_COUNTER = 60;

// Ratelimits for joinBucket_
// Both values are handed to the RatelimitBucket constructor as budget and
// cooldown; the cooldown is presumably in milliseconds - TODO confirm.
constexpr int JOIN_RATELIMIT_BUDGET = 18;
constexpr int JOIN_RATELIMIT_COOLDOWN = 10500;
21
AbstractIrcServer()22 AbstractIrcServer::AbstractIrcServer()
23 {
24 // Initialize the connections
25 // XXX: don't create write connection if there is no separate write connection.
26 this->writeConnection_.reset(new IrcConnection);
27 this->writeConnection_->moveToThread(
28 QCoreApplication::instance()->thread());
29
30 // Apply a leaky bucket rate limiting to JOIN messages
31 auto actuallyJoin = [&](QString message) {
32 if (!this->channels.contains(message))
33 {
34 return;
35 }
36 this->readConnection_->sendRaw("JOIN #" + message);
37 };
38 this->joinBucket_.reset(new RatelimitBucket(
39 JOIN_RATELIMIT_BUDGET, JOIN_RATELIMIT_COOLDOWN, actuallyJoin, this));
40
41 QObject::connect(this->writeConnection_.get(),
42 &Communi::IrcConnection::messageReceived, this,
43 [this](auto msg) {
44 this->writeConnectionMessageReceived(msg);
45 });
46 QObject::connect(this->writeConnection_.get(),
47 &Communi::IrcConnection::connected, this, [this] {
48 this->onWriteConnected(this->writeConnection_.get());
49 });
50 this->writeConnection_->connectionLost.connect([this](bool timeout) {
51 qCDebug(chatterinoIrc)
52 << "Write connection reconnect requested. Timeout:" << timeout;
53 this->writeConnection_->smartReconnect.invoke();
54 });
55
56 // Listen to read connection message signals
57 this->readConnection_.reset(new IrcConnection);
58 this->readConnection_->moveToThread(QCoreApplication::instance()->thread());
59
60 QObject::connect(this->readConnection_.get(),
61 &Communi::IrcConnection::messageReceived, this,
62 [this](auto msg) {
63 this->readConnectionMessageReceived(msg);
64 });
65 QObject::connect(this->readConnection_.get(),
66 &Communi::IrcConnection::privateMessageReceived, this,
67 [this](auto msg) {
68 this->privateMessageReceived(msg);
69 });
70 QObject::connect(this->readConnection_.get(),
71 &Communi::IrcConnection::connected, this, [this] {
72 this->onReadConnected(this->readConnection_.get());
73 });
74 QObject::connect(this->readConnection_.get(),
75 &Communi::IrcConnection::disconnected, this, [this] {
76 this->onDisconnected();
77 });
78 this->readConnection_->connectionLost.connect([this](bool timeout) {
79 qCDebug(chatterinoIrc)
80 << "Read connection reconnect requested. Timeout:" << timeout;
81 if (timeout)
82 {
83 // Show additional message since this is going to interrupt a
84 // connection that is still "connected"
85 this->addGlobalSystemMessage(
86 "Server connection timed out, reconnecting");
87 }
88 this->readConnection_->smartReconnect.invoke();
89 });
90 }
91
initializeIrc()92 void AbstractIrcServer::initializeIrc()
93 {
94 assert(!this->initialized_);
95
96 if (this->hasSeparateWriteConnection())
97 {
98 this->initializeConnectionSignals(this->writeConnection_.get(),
99 ConnectionType::Write);
100 this->initializeConnectionSignals(this->readConnection_.get(),
101 ConnectionType::Read);
102 }
103 else
104 {
105 this->initializeConnectionSignals(this->readConnection_.get(),
106 ConnectionType::Both);
107 }
108
109 this->initialized_ = true;
110 }
111
connect()112 void AbstractIrcServer::connect()
113 {
114 assert(this->initialized_);
115
116 this->disconnect();
117
118 if (this->hasSeparateWriteConnection())
119 {
120 this->initializeConnection(this->writeConnection_.get(), Write);
121 this->initializeConnection(this->readConnection_.get(), Read);
122 }
123 else
124 {
125 this->initializeConnection(this->readConnection_.get(), Both);
126 }
127 }
128
open(ConnectionType type)129 void AbstractIrcServer::open(ConnectionType type)
130 {
131 std::lock_guard<std::mutex> lock(this->connectionMutex_);
132
133 if (type == Write)
134 {
135 this->writeConnection_->open();
136 }
137 if (type & Read)
138 {
139 this->readConnection_->open();
140 }
141 }
142
addGlobalSystemMessage(const QString & messageText)143 void AbstractIrcServer::addGlobalSystemMessage(const QString &messageText)
144 {
145 std::lock_guard<std::mutex> lock(this->channelMutex);
146
147 MessageBuilder b(systemMessage, messageText);
148 auto message = b.release();
149
150 for (std::weak_ptr<Channel> &weak : this->channels.values())
151 {
152 std::shared_ptr<Channel> chan = weak.lock();
153 if (!chan)
154 {
155 continue;
156 }
157
158 chan->addMessage(message);
159 }
160 }
161
disconnect()162 void AbstractIrcServer::disconnect()
163 {
164 std::lock_guard<std::mutex> locker(this->connectionMutex_);
165
166 this->readConnection_->close();
167 if (this->hasSeparateWriteConnection())
168 {
169 this->writeConnection_->close();
170 }
171 }
172
sendMessage(const QString & channelName,const QString & message)173 void AbstractIrcServer::sendMessage(const QString &channelName,
174 const QString &message)
175 {
176 this->sendRawMessage("PRIVMSG #" + channelName + " :" + message);
177 }
178
sendRawMessage(const QString & rawMessage)179 void AbstractIrcServer::sendRawMessage(const QString &rawMessage)
180 {
181 std::lock_guard<std::mutex> locker(this->connectionMutex_);
182
183 if (this->hasSeparateWriteConnection())
184 {
185 this->writeConnection_->sendRaw(rawMessage);
186 }
187 else
188 {
189 this->readConnection_->sendRaw(rawMessage);
190 }
191 }
192
writeConnectionMessageReceived(Communi::IrcMessage * message)193 void AbstractIrcServer::writeConnectionMessageReceived(
194 Communi::IrcMessage *message)
195 {
196 (void)message;
197 }
198
getOrAddChannel(const QString & dirtyChannelName)199 ChannelPtr AbstractIrcServer::getOrAddChannel(const QString &dirtyChannelName)
200 {
201 auto channelName = this->cleanChannelName(dirtyChannelName);
202
203 // try get channel
204 ChannelPtr chan = this->getChannelOrEmpty(channelName);
205 if (chan != Channel::getEmpty())
206 {
207 return chan;
208 }
209
210 std::lock_guard<std::mutex> lock(this->channelMutex);
211
212 // value doesn't exist
213 chan = this->createChannel(channelName);
214 if (!chan)
215 {
216 return Channel::getEmpty();
217 }
218
219 this->channels.insert(channelName, chan);
220 this->connections_.emplace_back(
221 chan->destroyed.connect([this, channelName] {
222 // fourtf: issues when the server itself is destroyed
223
224 qCDebug(chatterinoIrc) << "[AbstractIrcServer::addChannel]"
225 << channelName << "was destroyed";
226 this->channels.remove(channelName);
227
228 if (this->readConnection_)
229 {
230 this->readConnection_->sendRaw("PART #" + channelName);
231 }
232 }));
233
234 // join irc channel
235 {
236 std::lock_guard<std::mutex> lock2(this->connectionMutex_);
237
238 if (this->readConnection_)
239 {
240 if (this->readConnection_->isConnected())
241 {
242 this->joinBucket_->send(channelName);
243 }
244 }
245 }
246
247 return chan;
248 }
249
getChannelOrEmpty(const QString & dirtyChannelName)250 ChannelPtr AbstractIrcServer::getChannelOrEmpty(const QString &dirtyChannelName)
251 {
252 auto channelName = this->cleanChannelName(dirtyChannelName);
253
254 std::lock_guard<std::mutex> lock(this->channelMutex);
255
256 // try get special channel
257 ChannelPtr chan = this->getCustomChannel(channelName);
258 if (chan)
259 {
260 return chan;
261 }
262
263 // value exists
264 auto it = this->channels.find(channelName);
265 if (it != this->channels.end())
266 {
267 chan = it.value().lock();
268
269 if (chan)
270 {
271 return chan;
272 }
273 }
274
275 return Channel::getEmpty();
276 }
277
getChannels()278 std::vector<std::weak_ptr<Channel>> AbstractIrcServer::getChannels()
279 {
280 std::lock_guard lock(this->channelMutex);
281 std::vector<std::weak_ptr<Channel>> channels;
282
283 for (auto &&weak : this->channels.values())
284 {
285 channels.push_back(weak);
286 }
287
288 return channels;
289 }
290
onReadConnected(IrcConnection * connection)291 void AbstractIrcServer::onReadConnected(IrcConnection *connection)
292 {
293 (void)connection;
294
295 std::lock_guard lock(this->channelMutex);
296
297 // join channels
298 for (auto &&weak : this->channels)
299 {
300 if (auto channel = weak.lock())
301 {
302 this->joinBucket_->send(channel->getName());
303 }
304 }
305
306 // connected/disconnected message
307 auto connectedMsg = makeSystemMessage("connected");
308 connectedMsg->flags.set(MessageFlag::ConnectedMessage);
309 auto reconnected = makeSystemMessage("reconnected");
310 reconnected->flags.set(MessageFlag::ConnectedMessage);
311
312 for (std::weak_ptr<Channel> &weak : this->channels.values())
313 {
314 std::shared_ptr<Channel> chan = weak.lock();
315 if (!chan)
316 {
317 continue;
318 }
319
320 LimitedQueueSnapshot<MessagePtr> snapshot = chan->getMessageSnapshot();
321
322 bool replaceMessage =
323 snapshot.size() > 0 && snapshot[snapshot.size() - 1]->flags.has(
324 MessageFlag::DisconnectedMessage);
325
326 if (replaceMessage)
327 {
328 chan->replaceMessage(snapshot[snapshot.size() - 1], reconnected);
329 continue;
330 }
331
332 chan->addMessage(connectedMsg);
333 }
334
335 this->falloffCounter_ = 1;
336 }
337
onWriteConnected(IrcConnection * connection)338 void AbstractIrcServer::onWriteConnected(IrcConnection *connection)
339 {
340 (void)connection;
341 }
342
onDisconnected()343 void AbstractIrcServer::onDisconnected()
344 {
345 std::lock_guard<std::mutex> lock(this->channelMutex);
346
347 MessageBuilder b(systemMessage, "disconnected");
348 b->flags.set(MessageFlag::DisconnectedMessage);
349 auto disconnectedMsg = b.release();
350
351 for (std::weak_ptr<Channel> &weak : this->channels.values())
352 {
353 std::shared_ptr<Channel> chan = weak.lock();
354 if (!chan)
355 {
356 continue;
357 }
358
359 chan->addMessage(disconnectedMsg);
360 }
361 }
362
getCustomChannel(const QString & channelName)363 std::shared_ptr<Channel> AbstractIrcServer::getCustomChannel(
364 const QString &channelName)
365 {
366 (void)channelName;
367 return nullptr;
368 }
369
// Normalizes a channel name by dropping a single leading '#', if present.
// Names without that prefix are returned unchanged.
QString AbstractIrcServer::cleanChannelName(const QString &dirtyChannelName)
{
    return dirtyChannelName.startsWith('#') ? dirtyChannelName.mid(1)
                                            : dirtyChannelName;
}
377
addFakeMessage(const QString & data)378 void AbstractIrcServer::addFakeMessage(const QString &data)
379 {
380 auto fakeMessage = Communi::IrcMessage::fromData(
381 data.toUtf8(), this->readConnection_.get());
382
383 if (fakeMessage->command() == "PRIVMSG")
384 {
385 this->privateMessageReceived(
386 static_cast<Communi::IrcPrivateMessage *>(fakeMessage));
387 }
388 else
389 {
390 this->readConnectionMessageReceived(fakeMessage);
391 }
392 }
393
privateMessageReceived(Communi::IrcPrivateMessage * message)394 void AbstractIrcServer::privateMessageReceived(
395 Communi::IrcPrivateMessage *message)
396 {
397 (void)message;
398 }
399
readConnectionMessageReceived(Communi::IrcMessage * message)400 void AbstractIrcServer::readConnectionMessageReceived(
401 Communi::IrcMessage *message)
402 {
403 }
404
forEachChannel(std::function<void (ChannelPtr)> func)405 void AbstractIrcServer::forEachChannel(std::function<void(ChannelPtr)> func)
406 {
407 std::lock_guard<std::mutex> lock(this->channelMutex);
408
409 for (std::weak_ptr<Channel> &weak : this->channels.values())
410 {
411 ChannelPtr chan = weak.lock();
412 if (!chan)
413 {
414 continue;
415 }
416
417 func(chan);
418 }
419 }
420
421 } // namespace chatterino
422