1 /*
2 * Copyright 2013 Vitaly Valtman
3 * Copyright 2014 Canonical Ltd.
4 * Authors:
5 * Roberto Mier
6 * Tiago Herrmann
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; version 3.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 *
20 */
21
22 #include "session.h"
23 #include <openssl/rand.h>
24 #include <openssl/sha.h>
25 #include <QDateTime>
26 #include "util/tlvalues.h"
27 #include "telegram/coretypes.h"
28
29 Q_LOGGING_CATEGORY(TG_CORE_SESSION, "tg.core.session")
30
31 qint64 Session::m_clientLastMsgId = 0;
32
33
Session(DC * dc,Settings * settings,CryptoUtils * crypto,QObject * parent)34 Session::Session(DC *dc, Settings *settings, CryptoUtils *crypto, QObject *parent) :
35 Connection(dc->host(), dc->port(), parent),
36 mSettings(settings),
37 mCrypto(crypto),
38 m_sessionId(0),
39 m_serverSalt(0),
40 mTimeDifference(0),
41 m_seqNo(0),
42 m_dc(dc),
43 m_initConnectionNeeded(true) {
44 // ensure dc has, at least, the shared key created
45 Q_ASSERT(dc->state() >= DC::authKeyCreated);
46 // copy calculated values for timeDifference and serverSalt when created shared key.
47 // This copy is needed because we could have several sessions against same dc with different values
48 mTimeDifference = m_dc->timeDifference();
49 m_serverSalt = m_dc->serverSalt();
50 // create session id
51 RAND_pseudo_bytes((uchar *) &m_sessionId, 8);
52 qCDebug(TG_CORE_SESSION) << "created session with id" << QString::number(m_sessionId, 16);
53 }
54
~Session()55 Session::~Session() {
56 qCDebug(TG_CORE_SESSION) << "destroyed session with id" << QString::number(m_sessionId, 16);
57 }
58
close()59 void Session::close() {
60 if (this->state() != QAbstractSocket::UnconnectedState) {
61 connect(this, SIGNAL(disconnected()), SLOT(onDisconnected()));
62 this->disconnectFromHost();
63 } else {
64 Q_EMIT sessionClosed(m_sessionId);
65 }
66 }
67
onDisconnected()68 void Session::onDisconnected() {
69 if(error() == QAbstractSocket::RemoteHostClosedError) {
70 return; // Trying to reconnect...
71 }
72
73 Q_EMIT sessionClosed(m_sessionId);
74 }
75
release()76 void Session::release() {
77 Q_EMIT sessionReleased(m_sessionId);
78 }
79
dc()80 DC *Session::dc() {
81 return m_dc;
82 }
83
generatePlainNextMsgId()84 qint64 Session::generatePlainNextMsgId() {
85 return (qint64) ((QDateTime::currentDateTime().toTime_t() - mTimeDifference) * (1LL << 32)) & -4;
86 }
87
generateNextMsgId()88 qint64 Session::generateNextMsgId() {
89 qint64 nextId = generatePlainNextMsgId();
90 if (nextId <= m_clientLastMsgId) {
91 nextId = m_clientLastMsgId += 4;
92 } else {
93 m_clientLastMsgId = nextId;
94 }
95 return nextId;
96 }
97
processConnected()98 void Session::processConnected() {
99 Q_EMIT sessionReady(m_dc);
100 }
101
processRpcAnswer(QByteArray response)102 void Session::processRpcAnswer(QByteArray response) {
103 qint32 len = response.length();
104
105 qCDebug(TG_CORE_SESSION) << "connection #" << socketDescriptor() << "received rpc answer with" << len << "content bytes by session" << QString::number(m_sessionId, 16);
106
107 InboundPkt p(response.data(), len);
108 processRpcMessage(p);
109 }
110
processRpcMessage(InboundPkt & inboundPkt)111 void Session::processRpcMessage(InboundPkt &inboundPkt) {
112
113 EncryptedMsg *enc = (EncryptedMsg *)inboundPkt.buffer();
114 qint32 len = inboundPkt.length();
115
116 const qint32 MINSZ = offsetof (EncryptedMsg, message);
117 const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
118 qCDebug(TG_CORE_SESSION) << "processRpcMessage(), len=" << len;
119
120 if(len < MINSZ || (len & 15) != (UNENCSZ & 15))
121 return;
122
123 Q_ASSERT(m_dc->authKeyId());
124 mAsserter.check(enc->authKeyId == m_dc->authKeyId());
125 //msg_key is used to compute AES key and to decrypt the received message
126 mCrypto->initAESAuth(m_dc->authKey() + 8, enc->msgKey, AES_DECRYPT);
127 qint32 l = mCrypto->padAESDecrypt((char *)&enc->serverSalt, len - UNENCSZ, (char *)&enc->serverSalt, len - UNENCSZ);
128 Q_UNUSED(l);
129 Q_ASSERT(l == len - UNENCSZ);
130 if( !(!(enc->msgLen & 3) && enc->msgLen > 0 && enc->msgLen <= len - MINSZ && len - MINSZ - enc->msgLen <= 12) )
131 return;
132
133 //check msg_key is indeed equal to SHA1 of the plaintext obtained after decription (without final padding bytes).
134 static uchar sha1Buffer[20];
135 SHA1((uchar *)&enc->serverSalt, enc->msgLen + (MINSZ - UNENCSZ), sha1Buffer);
136 if(memcmp (&enc->msgKey, sha1Buffer + 4, 16))
137 return;
138
139 if (m_dc->serverSalt() != enc->serverSalt) {
140 m_dc->setServerSalt(enc->serverSalt);
141 }
142
143 // check time synchronization
144 qint32 serverTime = enc->msgId >> 32LL;
145 qint32 clientTime = QDateTime::currentDateTime().toTime_t() - mTimeDifference;
146 if (clientTime <= serverTime - 30 || clientTime >= serverTime + 300) {
147 qCDebug(TG_CORE_SESSION) << "salt =" << enc->serverSalt << ", sessionId =" << QString::number(enc->sessionId, 16) << ", msgId =" << QString::number(enc->msgId, 16) << ", seqNo =" << enc->seqNo << ", serverTime =" << serverTime << ", clientTime =" << clientTime;
148 QString alert("Received message has too large difference between client and server dates - ");
149 if (clientTime <= serverTime -30) {
150 alert.append("the message has a date at least 30 seconds later in time than current date");
151 } else {
152 alert.append("the message was sent at least 300 seconds ago");
153 }
154 qCWarning(TG_CORE_SESSION) << alert;
155 }
156
157 inboundPkt.setInPtr(enc->message);
158 inboundPkt.setInEnd(inboundPkt.inPtr() + (enc->msgLen / 4));
159
160 qCDebug(TG_CORE_SESSION) << "received message id" << QString::number(enc->msgId, 16);
161
162 Q_ASSERT(l >= (MINSZ - UNENCSZ) + 8);
163
164 if (enc->msgId & 1) {
165 addToPendingAcks(enc->msgId);
166 }
167
168 mAsserter.check(m_sessionId == enc->sessionId);
169 rpcExecuteAnswer(inboundPkt, enc->msgId);
170 mAsserter.check(inboundPkt.inPtr() == inboundPkt.inEnd());
171 }
172
rpcExecuteAnswer(InboundPkt & inboundPkt,qint64 msgId)173 void Session::rpcExecuteAnswer(InboundPkt &inboundPkt, qint64 msgId) {
174 qint32 op = inboundPkt.prefetchInt();
175 qCDebug(TG_CORE_SESSION) << "rpcExecuteAnswer(), op =" << QString::number(op, 16);
176 switch (op) {
177 case TL_MsgContainer:
178 workContainer(inboundPkt, msgId);
179 return;
180 case TL_NewSessionCreated:
181 workNewSessionCreated(inboundPkt, msgId);
182 return;
183 case TL_MsgsAck:
184 workMsgsAck(inboundPkt, msgId);
185 return;
186 case TL_RpcResult:
187 workRpcResult(inboundPkt, msgId);
188 return;
189 case UpdatesType::typeUpdateShort:
190 workUpdateShort(inboundPkt, msgId);
191 return;
192 case UpdatesType::typeUpdatesCombined:
193 workUpdatesCombined(inboundPkt, msgId);
194 case UpdatesType::typeUpdates:
195 workUpdates(inboundPkt, msgId);
196 return;
197 case UpdatesType::typeUpdateShortMessage:
198 workUpdateShortMessage(inboundPkt, msgId);
199 return;
200 case UpdatesType::typeUpdateShortChatMessage:
201 workUpdateShortChatMessage(inboundPkt, msgId);
202 return;
203 case TL_GZipPacked:
204 workPacked(inboundPkt, msgId);
205 return;
206 case TL_BadServerSalt:
207 workBadServerSalt(inboundPkt, msgId);
208 return;
209 case TL_Pong:
210 workPong(inboundPkt, msgId);
211 return;
212 case TL_MsgDetailedInfo:
213 workDetailedInfo(inboundPkt, msgId);
214 return;
215 case TL_MsgNewDetailedInfo:
216 workNewDetailedInfo(inboundPkt, msgId);
217 return;
218 case UpdatesType::typeUpdatesTooLong:
219 workUpdatesTooLong(inboundPkt, msgId);
220 return;
221 case TL_BadMsgNotification:
222 workBadMsgNotification(inboundPkt, msgId);
223 return;
224 }
225 qCWarning(TG_CORE_SESSION) << "Unknown rpc response message";
226 inboundPkt.setInPtr(inboundPkt.inEnd());
227 }
228
workContainer(InboundPkt & inboundPkt,qint64 msgId)229 void Session::workContainer (InboundPkt &inboundPkt, qint64 msgId) {
230 qCDebug(TG_CORE_SESSION) << "workContainer: msgId =" << QString::number(msgId, 16);
231 mAsserter.check(inboundPkt.fetchInt() == TL_MsgContainer);
232 qint32 n = inboundPkt.fetchInt();
233 for (qint32 i = 0; i < n; i++) { // message
234 qint64 id = inboundPkt.fetchLong (); // msg_id
235 if (id & 1) {
236 addToPendingAcks(id);
237 }
238 inboundPkt.fetchInt (); // seq_no
239 qint32 bytes = inboundPkt.fetchInt ();
240 qint32 *t = inboundPkt.inEnd();
241 inboundPkt.setInEnd(inboundPkt.inPtr() + (bytes / 4));
242 rpcExecuteAnswer(inboundPkt, id);
243 Q_ASSERT (inboundPkt.inPtr() == inboundPkt.inEnd());
244 inboundPkt.setInEnd(t);
245 }
246 }
247
workNewSessionCreated(InboundPkt & inboundPkt,qint64 msgId)248 void Session::workNewSessionCreated(InboundPkt &inboundPkt, qint64 msgId) {
249 qCDebug(TG_CORE_SESSION) << "workNewSessionCreated: msgId =" << QString::number(msgId, 16);
250 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_NewSessionCreated);
251 inboundPkt.fetchLong(); // first_msg_id; //XXX set is as m_clientLastMsgId??
252 inboundPkt.fetchLong (); // unique_id
253 m_dc->setServerSalt(inboundPkt.fetchLong()); // server_salt
254 }
255
workMsgsAck(InboundPkt & inboundPkt,qint64 msgId)256 void Session::workMsgsAck(InboundPkt &inboundPkt, qint64 msgId) {
257 qCDebug(TG_CORE_SESSION) << "workMsgsAck: msgId =" << QString::number(msgId, 16);
258 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgsAck);
259 mAsserter.check(inboundPkt.fetchInt () == (qint32)CoreTypes::typeVector);
260 qint32 n = inboundPkt.fetchInt();
261 for (qint32 i = 0; i < n; i++) {
262 qint64 id = inboundPkt.fetchLong ();
263 Query *q = m_pendingQueries.value(id);
264 if(!q)
265 return;
266
267 Q_ASSERT(q);
268 q->setAcked(true);
269 }
270 }
271
workRpcResult(InboundPkt & inboundPkt,qint64 msgId)272 void Session::workRpcResult(InboundPkt &inboundPkt, qint64 msgId) {
273 qCDebug(TG_CORE_SESSION) << "workRpcResult: msgId =" << QString::number(msgId, 16);
274 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_RpcResult);
275 qint64 id = inboundPkt.fetchLong();
276 qint32 op = inboundPkt.prefetchInt();
277 if (op == (qint32)TL_RpcError) {
278 queryOnError(inboundPkt, id);
279 } else {
280 queryOnResult(inboundPkt, id);
281 }
282 }
283
workUpdateShort(InboundPkt & inboundPkt,qint64 msgId)284 void Session::workUpdateShort(InboundPkt &inboundPkt, qint64 msgId) {
285 qCDebug(TG_CORE_SESSION) << "workUpdateShort: msgId =" << QString::number(msgId, 16);
286 UpdatesType upd(&inboundPkt);
287 Q_EMIT updateShort(upd.update(), upd.date());
288 }
289
workUpdatesCombined(InboundPkt & inboundPkt,qint64 msgId)290 void Session::workUpdatesCombined(InboundPkt &inboundPkt, qint64 msgId) {
291 qCDebug(TG_CORE_SESSION) << "workUpdatesCombined: msgId =" << QString::number(msgId, 16);
292 UpdatesType upd(&inboundPkt);
293 Q_EMIT updatesCombined(upd.updates(), upd.users(), upd.chats(), upd.date(), upd.seqStart(), upd.seq());
294 }
295
workUpdates(InboundPkt & inboundPkt,qint64 msgId)296 void Session::workUpdates(InboundPkt &inboundPkt, qint64 msgId) {
297 qCDebug(TG_CORE_SESSION) << "workUpdates: msgId =" << QString::number(msgId, 16);
298 UpdatesType upd(&inboundPkt);
299 Q_EMIT updates(upd.updates(), upd.users(), upd.chats(), upd.date(), upd.seq());
300 }
301
workUpdateShortMessage(InboundPkt & inboundPkt,qint64 msgId)302 void Session::workUpdateShortMessage(InboundPkt &inboundPkt, qint64 msgId) {
303 qCDebug(TG_CORE_SESSION) << "workUpdateShortMessage: msgId =" << QString::number(msgId, 16);
304 Q_UNUSED(msgId)
305 UpdatesType upd(&inboundPkt);
306 bool unread = (upd.flags() & 0x1);
307 bool out = (upd.flags() & 0x2);
308 Q_EMIT updateShortMessage(upd.id(), upd.userId(), upd.message(), upd.pts(), upd.ptsCount(), upd.date(), upd.fwdFromId(), upd.fwdDate(), upd.replyToMsgId(), unread, out);
309 }
310
workUpdateShortChatMessage(InboundPkt & inboundPkt,qint64 msgId)311 void Session::workUpdateShortChatMessage(InboundPkt &inboundPkt, qint64 msgId) {
312 qCDebug(TG_CORE_SESSION) << "workUpdateShortChatMessage: msgId =" << QString::number(msgId, 16);
313 Q_UNUSED(msgId)
314 UpdatesType upd(&inboundPkt);
315 bool unread = (upd.flags() & 0x1);
316 bool out = (upd.flags() & 0x2);
317 Q_EMIT updateShortChatMessage(upd.id(), upd.fromId(), upd.chatId(), upd.message(), upd.pts(), upd.ptsCount(), upd.date(), upd.fwdFromId(), upd.date(), upd.replyToMsgId(), unread, out);
318 }
319
workPacked(InboundPkt & inboundPkt,qint64 msgId)320 void Session::workPacked(InboundPkt &inboundPkt, qint64 msgId) {
321 qCDebug(TG_CORE_SESSION) << "workPacked: msgId =" << QString::number(msgId, 16);
322 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_GZipPacked);
323 static qint32 buf[MAX_PACKED_SIZE >> 2];
324 qint32 l = inboundPkt.prefetchStrlen();
325 char *s = inboundPkt.fetchStr(l);
326
327 qint32 totalOut = Utils::tinflate(s, l, buf, MAX_PACKED_SIZE);
328 qint32 *inPtr = inboundPkt.inPtr();
329 qint32 *inEnd = inboundPkt.inEnd();
330 inboundPkt.setInPtr(buf);
331 inboundPkt.setInEnd(inboundPkt.inPtr() + totalOut / 4);
332 qCDebug(TG_CORE_SESSION) << "Unzipped data";
333 rpcExecuteAnswer(inboundPkt, msgId);
334 inboundPkt.setInPtr(inPtr); //TODO Not sure about this operations of setting inPtr and inEnd after execute answer completion
335 inboundPkt.setInEnd(inEnd);
336 }
337
workBadServerSalt(InboundPkt & inboundPkt,qint64 msgId)338 void Session::workBadServerSalt(InboundPkt &inboundPkt, qint64 msgId) {
339 Q_UNUSED(msgId)
340 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_BadServerSalt);
341 qint64 badMsgId = inboundPkt.fetchLong();
342 qint32 badMsgSeqNo = inboundPkt.fetchInt();
343 qint32 errorCode = inboundPkt.fetchInt();
344 qCDebug(TG_CORE_SESSION) << "workBadServerSalt: badMsgId =" << QString::number(badMsgId, 16)
345 << ", badMsgSeqNo =" << badMsgSeqNo << ", errorCode =" << errorCode;
346 m_dc->setServerSalt(inboundPkt.fetchLong()); // new server_salt
347 Query *q = m_pendingQueries.take(badMsgId);
348 qint64 newMsgId = recomposeAndSendQuery(q);
349 if (newMsgId != 0) {
350 Q_EMIT updateMessageId(badMsgId, newMsgId);
351 }
352 }
353
workPong(InboundPkt & inboundPkt,qint64 msgId)354 void Session::workPong(InboundPkt &inboundPkt, qint64 msgId) {
355 qCDebug(TG_CORE_SESSION) << "workPong: msgId =" << QString::number(msgId, 16);
356 mAsserter.check (inboundPkt.fetchInt() == (qint32)TL_Pong);
357 inboundPkt.fetchLong(); // msg_id
358 inboundPkt.fetchLong(); // ping_id
359 }
360
workDetailedInfo(InboundPkt & inboundPkt,qint64 msgId)361 void Session::workDetailedInfo(InboundPkt &inboundPkt, qint64 msgId) {
362 qCDebug(TG_CORE_SESSION) << "workDetailedInfo: msgId =" << QString::number(msgId, 16);
363 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgDetailedInfo);
364 inboundPkt.fetchLong(); // msg_id
365 inboundPkt.fetchLong(); // answer_msg_id
366 inboundPkt.fetchInt(); // bytes
367 inboundPkt.fetchInt(); // status
368 }
369
workNewDetailedInfo(InboundPkt & inboundPkt,qint64 msgId)370 void Session::workNewDetailedInfo(InboundPkt &inboundPkt, qint64 msgId) {
371 qCDebug(TG_CORE_SESSION) << "workNewDetailedInfo: msgId =" << QString::number(msgId, 16);
372 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgNewDetailedInfo);
373 inboundPkt.fetchLong(); // answer_msg_id
374 inboundPkt.fetchInt(); // bytes
375 inboundPkt.fetchInt(); // status
376 }
377
workUpdatesTooLong(InboundPkt & inboundPkt,qint64 msgId)378 void Session::workUpdatesTooLong(InboundPkt &inboundPkt, qint64 msgId) {
379 qCDebug(TG_CORE_SESSION) << "workUpdatesTooLong: msgId =" << QString::number(msgId, 16);
380 UpdatesType upd(&inboundPkt);
381 Q_UNUSED(upd)
382 Q_EMIT updatesTooLong();
383 }
384
workBadMsgNotification(InboundPkt & inboundPkt,qint64 msgId)385 void Session::workBadMsgNotification(InboundPkt &inboundPkt, qint64 msgId) {
386 mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_BadMsgNotification);
387 qint64 badMsgId = inboundPkt.fetchLong();
388 qint32 badMsgSeqNo = inboundPkt.fetchInt();
389 qint32 errorCode = inboundPkt.fetchInt();
390 qCWarning(TG_CORE_SESSION) << "workBadMsgNotification: badMsgId =" << QString::number(badMsgId, 16) <<
391 ", badMsgSeqNo =" << badMsgSeqNo << ", errorCode =" << errorCode;
392 switch (errorCode) {
393 case 16:
394 case 17:
395 case 19:
396 case 32:
397 case 33:
398 case 64:
399 // update time sync difference and reset msgIds counter
400 qint32 serverTime = msgId >> 32LL;
401 mTimeDifference = QDateTime::currentDateTime().toTime_t() - serverTime;
402
403 qint64 nextId = generatePlainNextMsgId();
404 if (!m_pendingQueries.contains(nextId)) {
405 m_clientLastMsgId = 0;
406 }
407
408 // read removing from pending queries, recompose and send the last query
409 Query *q = m_pendingQueries.take(badMsgId);
410 qint64 newMsgId = recomposeAndSendQuery(q);
411 if (newMsgId != 0) {
412 Q_EMIT updateMessageId(badMsgId, newMsgId);
413 }
414 break;
415 }
416 }
417
initEncryptedMessage(qint32 useful)418 Session::EncryptedMsg *Session::initEncryptedMessage(qint32 useful) {
419 EncryptedMsg *encMsg = new EncryptedMsg;
420 Q_ASSERT(m_dc->authKeyId());
421 encMsg->authKeyId = m_dc->authKeyId();
422 Q_ASSERT(m_dc->serverSalt());
423 encMsg->serverSalt = m_dc->serverSalt();
424 Q_ASSERT(m_sessionId);
425 encMsg->sessionId = m_sessionId;
426 encMsg->msgId = generateNextMsgId();
427 encMsg->seqNo = m_seqNo;
428 if (useful) {
429 encMsg->seqNo |= 1;
430 }
431 m_seqNo += 2;
432 return encMsg;
433 }
434
aesEncryptMessage(EncryptedMsg * encMsg)435 qint32 Session::aesEncryptMessage (EncryptedMsg *encMsg) {
436 uchar sha1Buffer[20];
437 const qint32 MINSZ = offsetof (EncryptedMsg, message);
438 const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
439 qint32 encLen = (MINSZ - UNENCSZ) + encMsg->msgLen;
440 Q_ASSERT (encMsg->msgLen >= 0 && encMsg->msgLen <= MAX_MESSAGE_INTS * 4 - 16 && !(encMsg->msgLen & 3));
441 SHA1 ((uchar *) &encMsg->serverSalt, encLen, sha1Buffer);
442 qCDebug(TG_CORE_SESSION) << "sending message with sha1" << QString::number(*(qint32 *)sha1Buffer, 8);
443
444 memcpy (encMsg->msgKey, sha1Buffer + 4, 16);
445 mCrypto->initAESAuth(m_dc->authKey(), encMsg->msgKey, AES_ENCRYPT);
446 return mCrypto->padAESEncrypt((char *) &encMsg->serverSalt, encLen, (char *) &encMsg->serverSalt, MAX_MESSAGE_INTS * 4 + (MINSZ - UNENCSZ));
447 }
448
encryptSendMessage(qint32 * msg,qint32 msgInts,qint32 useful)449 qint64 Session::encryptSendMessage(qint32 *msg, qint32 msgInts, qint32 useful) {
450 const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
451 if (msgInts <= 0 || msgInts > MAX_MESSAGE_INTS - 4) {
452 return -1;
453 }
454 EncryptedMsg *encMsg = initEncryptedMessage(useful);
455 if (msg) {
456 memcpy (encMsg->message, msg, msgInts * 4);
457 encMsg->msgLen = msgInts * 4;
458 } else if ((encMsg->msgLen & 0x80000003) || encMsg->msgLen > MAX_MESSAGE_INTS * 4 - 16) {
459 delete encMsg;
460 return -1;
461 }
462
463 qint32 l = aesEncryptMessage(encMsg);
464 Q_ASSERT(l > 0);
465
466 if(!rpcSendMessage(encMsg, l + UNENCSZ))
467 return -1;
468
469 delete encMsg;
470 return m_clientLastMsgId;
471 }
472
rpcSendMessage(void * data,qint32 len)473 bool Session::rpcSendMessage(void *data, qint32 len) {
474 qCDebug(TG_CORE_SESSION) << "rpcSendMessage()," << len;
475
476 qint32 written;
477 Q_UNUSED(written);
478
479 mAsserter.check(len > 0 && !(len & 0xfc000003));
480 qint32 totalLen = len >> 2;
481
482 if (totalLen < 0x7f) {
483 written = writeOut(&totalLen, 1);
484 if(written != 1)
485 return false;
486 } else {
487 totalLen = (totalLen << 8) | 0x7f;
488 written = writeOut(&totalLen, 4);
489 if(written != 4)
490 return false;
491 }
492
493 written = writeOut(data, len);
494 if(written != len)
495 return false;
496
497 return true;
498 }
499
500
501 //### query management
sendQuery(OutboundPkt & outboundPkt,QueryMethods * methods,const QVariant & extra,const QString & name)502 qint64 Session::sendQuery(OutboundPkt &outboundPkt, QueryMethods *methods, const QVariant &extra, const QString &name) {
503 Q_ASSERT (m_sessionId);
504 Q_ASSERT (m_dc->authKeyId());
505 qint32 *data = outboundPkt.buffer();
506 qint32 ints = outboundPkt.length();
507
508 qCDebug(TG_CORE_SESSION) << "Sending query of size" << 4 * ints << "to DC" << m_dc->id() << "at" << peerName() << ":" << peerPort() << "by session" << QString::number(m_sessionId, 16);
509
510 Query *q = new Query(this);
511 q->setData(data, ints);
512 q->setMsgId(encryptSendMessage(data, ints, 1));
513 q->setSeqNo(m_seqNo - 1);
514 qCDebug(TG_CORE_SESSION) << "msgId is" << QString::number(q->msgId(), 16);
515 q->setMethods(methods);
516 q->setAcked(false);
517 q->setExtra(extra);
518 q->setName(name);
519
520 if (mSettings->resendQueries()) {
521 connect(q, SIGNAL(timeout(Query*)), this, SLOT(resendQuery(Query*)), Qt::UniqueConnection);
522 q->startTimer(QUERY_TIMEOUT);
523 }
524
525 m_pendingQueries.insert(q->msgId(), q);
526
527 return q->msgId();
528 }
529
recomposeAndSendQuery(Query * q)530 qint64 Session::recomposeAndSendQuery(Query *q) {
531 if(!q)
532 return 0;
533
534 qCDebug(TG_CORE_SESSION) << "Resending query with previous msgId" << QString::number(q->msgId(), 16);
535 q->setMsgId(encryptSendMessage((qint32 *)q->data(), q->dataLength(), 1));
536 q->setSeqNo(m_seqNo - 1);
537 qCDebug(TG_CORE_SESSION) << "new msgId is" << QString::number(q->msgId(), 16);
538 q->setAcked(false);
539 m_pendingQueries.insert(q->msgId(), q);
540 return q->msgId();
541 }
542
resendQuery(Query * q)543 void Session::resendQuery(Query *q) {
544 Q_ASSERT(q);
545 //avoid resending if resend numbers is less than zero
546 if (q->decreaseResends() < 0) {
547 qCDebug(TG_CORE_SESSION) << "Max resend numbers reached for query with msgId" << QString::number(q->msgId(), 16) << ",query discarded";
548 m_pendingQueries.remove(q->msgId());
549 delete q;
550 } else {
551 qCDebug(TG_CORE_SESSION) << "Resending query with msgId" << QString::number(q->msgId(), 16);
552 OutboundPkt p(mSettings);
553 p.appendInt(TL_MsgContainer);
554 p.appendInt(1);
555 p.appendLong(q->msgId());
556 p.appendInt(q->seqNo());
557 p.appendInt(4 * q->dataLength());
558 p.appendInts((qint32 *)q->data(), q->dataLength());
559 encryptSendMessage(p.buffer(), p.length(), 0);
560 }
561 }
562
queryOnResult(InboundPkt & inboundPkt,qint64 msgId)563 void Session::queryOnResult(InboundPkt &inboundPkt, qint64 msgId) {
564 qCDebug(TG_CORE_SESSION) << "result for query" << QString::number(msgId, 16);
565 qint32 op = inboundPkt.prefetchInt();
566 qint32 *inPtr = 0;
567 qint32 *inEnd = 0;
568 if (op == (qint32)TL_GZipPacked) {
569 inboundPkt.fetchInt();
570 qint32 l = inboundPkt.prefetchStrlen();
571 char *s = inboundPkt.fetchStr(l);
572 static qint32 packedBuffer[MAX_PACKED_SIZE / 4];
573 qint32 totalOut = Utils::tinflate (s, l, packedBuffer, MAX_PACKED_SIZE);
574 inPtr = inboundPkt.inPtr();
575 inEnd = inboundPkt.inEnd();
576 inboundPkt.setInPtr(packedBuffer);
577 inboundPkt.setInEnd(inboundPkt.inPtr() + totalOut / 4);
578 qCDebug(TG_CORE_SESSION) << "unzipped data";
579 }
580
581 Query *q = m_pendingQueries.take(msgId);
582 if (!q) {
583 qCWarning(TG_CORE_SESSION) << "No such query";
584 inboundPkt.setInPtr(inboundPkt.inEnd());
585 } else {
586 qCDebug(TG_CORE_SESSION) << "acked query with msgId" << QString::number(msgId, 16) << ",pendingQueries:" << m_pendingQueries.size();
587 q->setAcked(true);
588 Q_EMIT resultReceived(q, inboundPkt);
589 }
590
591 if (inPtr) {
592 inboundPkt.setInPtr(inPtr);
593 inboundPkt.setInEnd(inEnd);
594 }
595 }
596
queryOnError(InboundPkt & inboundPkt,qint64 msgId)597 void Session::queryOnError(InboundPkt &inboundPkt, qint64 msgId) {
598 mAsserter.check(inboundPkt.fetchInt() == TL_RpcError);
599 qint32 errorCode = inboundPkt.fetchInt();
600 QString errorText = inboundPkt.fetchQString();
601 qCDebug(TG_CORE_SESSION) << "error for query" << QString::number(msgId, 16) << " :" << errorCode << " :" << errorText;
602
603 Query *q = m_pendingQueries.take(msgId);
604 if (!q) {
605 qCWarning(TG_CORE_SESSION) << "No such query";
606 } else {
607 q->setAcked(true);
608 Q_EMIT errorReceived(q, errorCode, errorText);
609 }
610 }
611
612
addToPendingAcks(qint64 msgId)613 void Session::addToPendingAcks(qint64 msgId) {
614 EventTimer *t = new EventTimer(msgId, ACK_TIMEOUT, this);
615 connect(t, SIGNAL(timerTimeout(qint64)), this, SLOT(ack(qint64)));
616 t->start(); //timeout of 60 secs
617 m_pendingAcks[msgId] = t;
618 if (m_pendingAcks.size() > MAX_PENDING_ACKS) {
619 ackAll();
620 }
621 }
622
ack(qint64 msgId)623 void Session::ack(qint64 msgId) {
624 QList<qint64> idsList;
625 idsList.append(msgId);
626 sendAcks(idsList);
627 }
628
ackAll()629 void Session::ackAll() {
630 sendAcks(m_pendingAcks.keys());
631 }
632
sendAcks(const QList<qint64> & msgIds)633 void Session::sendAcks(const QList<qint64> &msgIds) {
634 OutboundPkt p(mSettings);
635 p.appendInt(TL_MsgsAck);
636 p.appendInt(CoreTypes::typeVector);
637 int n = msgIds.length();
638 p.appendInt(n);
639 Q_FOREACH (qint64 msgId, msgIds) {
640 p.appendLong(msgId);
641 qCDebug(TG_CORE_SESSION) << "Ack for msgId" << msgId;
642 EventTimer* t = m_pendingAcks.take(msgId);
643 if (t) {
644 t->stop();
645 delete t;
646 }
647 }
648 qint64 sentAcksId = encryptSendMessage(p.buffer(), p.length(), 0);
649 qCDebug(TG_CORE_SESSION) << "Sent Acks with id:" << QString::number(sentAcksId, 16);
650 }
651