1 /*
2  * Copyright 2013 Vitaly Valtman
3  * Copyright 2014 Canonical Ltd.
4  * Authors:
5  *      Roberto Mier
6  *      Tiago Herrmann
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; version 3.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
19  *
20  */
21 
22 #include "session.h"
23 #include <openssl/rand.h>
24 #include <openssl/sha.h>
25 #include <QDateTime>
26 #include "util/tlvalues.h"
27 #include "telegram/coretypes.h"
28 
29 Q_LOGGING_CATEGORY(TG_CORE_SESSION, "tg.core.session")
30 
31 qint64 Session::m_clientLastMsgId = 0;
32 
33 
Session(DC * dc,Settings * settings,CryptoUtils * crypto,QObject * parent)34 Session::Session(DC *dc, Settings *settings, CryptoUtils *crypto, QObject *parent) :
35     Connection(dc->host(), dc->port(), parent),
36     mSettings(settings),
37     mCrypto(crypto),
38     m_sessionId(0),
39     m_serverSalt(0),
40     mTimeDifference(0),
41     m_seqNo(0),
42     m_dc(dc),
43     m_initConnectionNeeded(true) {
44     // ensure dc has, at least, the shared key created
45     Q_ASSERT(dc->state() >= DC::authKeyCreated);
46     // copy calculated values for timeDifference and serverSalt when created shared key.
47     // This copy is needed because we could have several sessions against same dc with different values
48     mTimeDifference = m_dc->timeDifference();
49     m_serverSalt = m_dc->serverSalt();
50     // create session id
51     RAND_pseudo_bytes((uchar *) &m_sessionId, 8);
52     qCDebug(TG_CORE_SESSION) << "created session with id" << QString::number(m_sessionId, 16);
53 }
54 
~Session()55 Session::~Session() {
56     qCDebug(TG_CORE_SESSION) << "destroyed session with id" << QString::number(m_sessionId, 16);
57 }
58 
close()59 void Session::close() {
60     if (this->state() != QAbstractSocket::UnconnectedState) {
61         connect(this, SIGNAL(disconnected()), SLOT(onDisconnected()));
62         this->disconnectFromHost();
63     } else {
64         Q_EMIT sessionClosed(m_sessionId);
65     }
66 }
67 
onDisconnected()68 void Session::onDisconnected() {
69     if(error() == QAbstractSocket::RemoteHostClosedError) {
70         return; // Trying to reconnect...
71     }
72 
73     Q_EMIT sessionClosed(m_sessionId);
74 }
75 
release()76 void Session::release() {
77     Q_EMIT sessionReleased(m_sessionId);
78 }
79 
dc()80 DC *Session::dc() {
81     return m_dc;
82 }
83 
generatePlainNextMsgId()84 qint64 Session::generatePlainNextMsgId() {
85     return (qint64) ((QDateTime::currentDateTime().toTime_t() - mTimeDifference) * (1LL << 32)) & -4;
86 }
87 
generateNextMsgId()88 qint64 Session::generateNextMsgId() {
89     qint64 nextId = generatePlainNextMsgId();
90     if (nextId <= m_clientLastMsgId) {
91         nextId = m_clientLastMsgId += 4;
92     } else {
93         m_clientLastMsgId = nextId;
94     }
95     return nextId;
96 }
97 
processConnected()98 void Session::processConnected() {
99     Q_EMIT sessionReady(m_dc);
100 }
101 
processRpcAnswer(QByteArray response)102 void Session::processRpcAnswer(QByteArray response) {
103     qint32 len = response.length();
104 
105     qCDebug(TG_CORE_SESSION) << "connection #" << socketDescriptor() << "received rpc answer with" << len << "content bytes by session" << QString::number(m_sessionId, 16);
106 
107     InboundPkt p(response.data(), len);
108     processRpcMessage(p);
109 }
110 
processRpcMessage(InboundPkt & inboundPkt)111 void Session::processRpcMessage(InboundPkt &inboundPkt) {
112 
113     EncryptedMsg *enc = (EncryptedMsg *)inboundPkt.buffer();
114     qint32 len = inboundPkt.length();
115 
116     const qint32 MINSZ = offsetof (EncryptedMsg, message);
117     const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
118     qCDebug(TG_CORE_SESSION) << "processRpcMessage(), len=" << len;
119 
120     if(len < MINSZ || (len & 15) != (UNENCSZ & 15))
121         return;
122 
123     Q_ASSERT(m_dc->authKeyId());
124     mAsserter.check(enc->authKeyId == m_dc->authKeyId());
125     //msg_key is used to compute AES key and to decrypt the received message
126     mCrypto->initAESAuth(m_dc->authKey() + 8, enc->msgKey, AES_DECRYPT);
127     qint32 l = mCrypto->padAESDecrypt((char *)&enc->serverSalt, len - UNENCSZ, (char *)&enc->serverSalt, len - UNENCSZ);
128     Q_UNUSED(l);
129     Q_ASSERT(l == len - UNENCSZ);
130     if( !(!(enc->msgLen & 3) && enc->msgLen > 0 && enc->msgLen <= len - MINSZ && len - MINSZ - enc->msgLen <= 12) )
131         return;
132 
133     //check msg_key is indeed equal to SHA1 of the plaintext obtained after decription (without final padding bytes).
134     static uchar sha1Buffer[20];
135     SHA1((uchar *)&enc->serverSalt, enc->msgLen + (MINSZ - UNENCSZ), sha1Buffer);
136     if(memcmp (&enc->msgKey, sha1Buffer + 4, 16))
137         return;
138 
139     if (m_dc->serverSalt() != enc->serverSalt) {
140         m_dc->setServerSalt(enc->serverSalt);
141     }
142 
143     // check time synchronization
144     qint32 serverTime = enc->msgId >> 32LL;
145     qint32 clientTime = QDateTime::currentDateTime().toTime_t() - mTimeDifference;
146     if (clientTime <= serverTime - 30 || clientTime >= serverTime + 300) {
147         qCDebug(TG_CORE_SESSION) << "salt =" << enc->serverSalt << ", sessionId =" << QString::number(enc->sessionId, 16) << ", msgId =" << QString::number(enc->msgId, 16) << ", seqNo =" << enc->seqNo << ", serverTime =" << serverTime << ", clientTime =" << clientTime;
148         QString alert("Received message has too large difference between client and server dates - ");
149         if (clientTime <= serverTime -30) {
150             alert.append("the message has a date at least 30 seconds later in time than current date");
151         } else {
152             alert.append("the message was sent at least 300 seconds ago");
153         }
154         qCWarning(TG_CORE_SESSION) << alert;
155     }
156 
157     inboundPkt.setInPtr(enc->message);
158     inboundPkt.setInEnd(inboundPkt.inPtr() + (enc->msgLen / 4));
159 
160     qCDebug(TG_CORE_SESSION) << "received message id" << QString::number(enc->msgId, 16);
161 
162     Q_ASSERT(l >= (MINSZ - UNENCSZ) + 8);
163 
164     if (enc->msgId & 1) {
165         addToPendingAcks(enc->msgId);
166     }
167 
168     mAsserter.check(m_sessionId == enc->sessionId);
169     rpcExecuteAnswer(inboundPkt, enc->msgId);
170     mAsserter.check(inboundPkt.inPtr() == inboundPkt.inEnd());
171 }
172 
rpcExecuteAnswer(InboundPkt & inboundPkt,qint64 msgId)173 void Session::rpcExecuteAnswer(InboundPkt &inboundPkt, qint64 msgId) {
174     qint32 op = inboundPkt.prefetchInt();
175     qCDebug(TG_CORE_SESSION) << "rpcExecuteAnswer(), op =" << QString::number(op, 16);
176     switch (op) {
177     case TL_MsgContainer:
178         workContainer(inboundPkt, msgId);
179         return;
180     case TL_NewSessionCreated:
181         workNewSessionCreated(inboundPkt, msgId);
182         return;
183     case TL_MsgsAck:
184         workMsgsAck(inboundPkt, msgId);
185         return;
186     case TL_RpcResult:
187         workRpcResult(inboundPkt, msgId);
188         return;
189     case UpdatesType::typeUpdateShort:
190         workUpdateShort(inboundPkt, msgId);
191         return;
192     case UpdatesType::typeUpdatesCombined:
193         workUpdatesCombined(inboundPkt, msgId);
194     case UpdatesType::typeUpdates:
195         workUpdates(inboundPkt, msgId);
196         return;
197     case UpdatesType::typeUpdateShortMessage:
198         workUpdateShortMessage(inboundPkt, msgId);
199         return;
200     case UpdatesType::typeUpdateShortChatMessage:
201         workUpdateShortChatMessage(inboundPkt, msgId);
202         return;
203     case TL_GZipPacked:
204         workPacked(inboundPkt, msgId);
205         return;
206     case TL_BadServerSalt:
207         workBadServerSalt(inboundPkt, msgId);
208         return;
209     case TL_Pong:
210         workPong(inboundPkt, msgId);
211         return;
212     case TL_MsgDetailedInfo:
213         workDetailedInfo(inboundPkt, msgId);
214         return;
215     case TL_MsgNewDetailedInfo:
216         workNewDetailedInfo(inboundPkt, msgId);
217         return;
218     case UpdatesType::typeUpdatesTooLong:
219         workUpdatesTooLong(inboundPkt, msgId);
220         return;
221     case TL_BadMsgNotification:
222         workBadMsgNotification(inboundPkt, msgId);
223         return;
224     }
225     qCWarning(TG_CORE_SESSION) << "Unknown rpc response message";
226     inboundPkt.setInPtr(inboundPkt.inEnd());
227 }
228 
workContainer(InboundPkt & inboundPkt,qint64 msgId)229 void Session::workContainer (InboundPkt &inboundPkt, qint64 msgId) {
230     qCDebug(TG_CORE_SESSION) << "workContainer: msgId =" << QString::number(msgId, 16);
231     mAsserter.check(inboundPkt.fetchInt() == TL_MsgContainer);
232     qint32 n = inboundPkt.fetchInt();
233     for (qint32 i = 0; i < n; i++) { // message
234         qint64 id = inboundPkt.fetchLong (); // msg_id
235         if (id & 1) {
236            addToPendingAcks(id);
237         }
238         inboundPkt.fetchInt (); // seq_no
239         qint32 bytes = inboundPkt.fetchInt ();
240         qint32 *t = inboundPkt.inEnd();
241         inboundPkt.setInEnd(inboundPkt.inPtr() + (bytes / 4));
242         rpcExecuteAnswer(inboundPkt, id);
243         Q_ASSERT (inboundPkt.inPtr() == inboundPkt.inEnd());
244         inboundPkt.setInEnd(t);
245     }
246 }
247 
workNewSessionCreated(InboundPkt & inboundPkt,qint64 msgId)248 void Session::workNewSessionCreated(InboundPkt &inboundPkt, qint64 msgId) {
249     qCDebug(TG_CORE_SESSION) << "workNewSessionCreated: msgId =" << QString::number(msgId, 16);
250     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_NewSessionCreated);
251     inboundPkt.fetchLong(); // first_msg_id; //XXX set is as m_clientLastMsgId??
252     inboundPkt.fetchLong (); // unique_id
253     m_dc->setServerSalt(inboundPkt.fetchLong()); // server_salt
254 }
255 
workMsgsAck(InboundPkt & inboundPkt,qint64 msgId)256 void Session::workMsgsAck(InboundPkt &inboundPkt, qint64 msgId) {
257     qCDebug(TG_CORE_SESSION) << "workMsgsAck: msgId =" << QString::number(msgId, 16);
258     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgsAck);
259     mAsserter.check(inboundPkt.fetchInt () == (qint32)CoreTypes::typeVector);
260     qint32 n = inboundPkt.fetchInt();
261     for (qint32 i = 0; i < n; i++) {
262         qint64 id = inboundPkt.fetchLong ();
263         Query *q = m_pendingQueries.value(id);
264         if(!q)
265             return;
266 
267         Q_ASSERT(q);
268         q->setAcked(true);
269     }
270 }
271 
workRpcResult(InboundPkt & inboundPkt,qint64 msgId)272 void Session::workRpcResult(InboundPkt &inboundPkt, qint64 msgId) {
273     qCDebug(TG_CORE_SESSION) << "workRpcResult: msgId =" << QString::number(msgId, 16);
274     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_RpcResult);
275     qint64 id = inboundPkt.fetchLong();
276     qint32 op = inboundPkt.prefetchInt();
277     if (op == (qint32)TL_RpcError) {
278         queryOnError(inboundPkt, id);
279     } else {
280         queryOnResult(inboundPkt, id);
281     }
282 }
283 
workUpdateShort(InboundPkt & inboundPkt,qint64 msgId)284 void Session::workUpdateShort(InboundPkt &inboundPkt, qint64 msgId) {
285     qCDebug(TG_CORE_SESSION) << "workUpdateShort: msgId =" << QString::number(msgId, 16);
286     UpdatesType upd(&inboundPkt);
287     Q_EMIT updateShort(upd.update(), upd.date());
288 }
289 
workUpdatesCombined(InboundPkt & inboundPkt,qint64 msgId)290 void Session::workUpdatesCombined(InboundPkt &inboundPkt, qint64 msgId) {
291     qCDebug(TG_CORE_SESSION) << "workUpdatesCombined: msgId =" << QString::number(msgId, 16);
292     UpdatesType upd(&inboundPkt);
293     Q_EMIT updatesCombined(upd.updates(), upd.users(), upd.chats(), upd.date(), upd.seqStart(), upd.seq());
294 }
295 
workUpdates(InboundPkt & inboundPkt,qint64 msgId)296 void Session::workUpdates(InboundPkt &inboundPkt, qint64 msgId) {
297     qCDebug(TG_CORE_SESSION) << "workUpdates: msgId =" << QString::number(msgId, 16);
298     UpdatesType upd(&inboundPkt);
299     Q_EMIT updates(upd.updates(), upd.users(), upd.chats(), upd.date(), upd.seq());
300 }
301 
workUpdateShortMessage(InboundPkt & inboundPkt,qint64 msgId)302 void Session::workUpdateShortMessage(InboundPkt &inboundPkt, qint64 msgId) {
303     qCDebug(TG_CORE_SESSION) << "workUpdateShortMessage: msgId =" << QString::number(msgId, 16);
304     Q_UNUSED(msgId)
305     UpdatesType upd(&inboundPkt);
306     bool unread = (upd.flags() & 0x1);
307     bool out = (upd.flags() & 0x2);
308     Q_EMIT updateShortMessage(upd.id(), upd.userId(), upd.message(), upd.pts(), upd.ptsCount(), upd.date(), upd.fwdFromId(), upd.fwdDate(), upd.replyToMsgId(), unread, out);
309 }
310 
workUpdateShortChatMessage(InboundPkt & inboundPkt,qint64 msgId)311 void Session::workUpdateShortChatMessage(InboundPkt &inboundPkt, qint64 msgId) {
312     qCDebug(TG_CORE_SESSION) << "workUpdateShortChatMessage: msgId =" << QString::number(msgId, 16);
313     Q_UNUSED(msgId)
314     UpdatesType upd(&inboundPkt);
315     bool unread = (upd.flags() & 0x1);
316     bool out = (upd.flags() & 0x2);
317     Q_EMIT updateShortChatMessage(upd.id(), upd.fromId(), upd.chatId(), upd.message(), upd.pts(), upd.ptsCount(), upd.date(), upd.fwdFromId(), upd.date(), upd.replyToMsgId(), unread, out);
318 }
319 
workPacked(InboundPkt & inboundPkt,qint64 msgId)320 void Session::workPacked(InboundPkt &inboundPkt, qint64 msgId) {
321     qCDebug(TG_CORE_SESSION) << "workPacked: msgId =" << QString::number(msgId, 16);
322     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_GZipPacked);
323     static qint32 buf[MAX_PACKED_SIZE >> 2];
324     qint32 l = inboundPkt.prefetchStrlen();
325     char *s = inboundPkt.fetchStr(l);
326 
327     qint32 totalOut = Utils::tinflate(s, l, buf, MAX_PACKED_SIZE);
328     qint32 *inPtr = inboundPkt.inPtr();
329     qint32 *inEnd = inboundPkt.inEnd();
330     inboundPkt.setInPtr(buf);
331     inboundPkt.setInEnd(inboundPkt.inPtr() + totalOut / 4);
332     qCDebug(TG_CORE_SESSION) << "Unzipped data";
333     rpcExecuteAnswer(inboundPkt, msgId);
334     inboundPkt.setInPtr(inPtr); //TODO Not sure about this operations of setting inPtr and inEnd after execute answer completion
335     inboundPkt.setInEnd(inEnd);
336 }
337 
workBadServerSalt(InboundPkt & inboundPkt,qint64 msgId)338 void Session::workBadServerSalt(InboundPkt &inboundPkt, qint64 msgId) {
339     Q_UNUSED(msgId)
340     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_BadServerSalt);
341     qint64 badMsgId = inboundPkt.fetchLong();
342     qint32 badMsgSeqNo = inboundPkt.fetchInt();
343     qint32 errorCode = inboundPkt.fetchInt();
344     qCDebug(TG_CORE_SESSION) << "workBadServerSalt: badMsgId =" << QString::number(badMsgId, 16)
345             << ", badMsgSeqNo =" << badMsgSeqNo << ", errorCode =" << errorCode;
346     m_dc->setServerSalt(inboundPkt.fetchLong()); // new server_salt
347     Query *q = m_pendingQueries.take(badMsgId);
348     qint64 newMsgId = recomposeAndSendQuery(q);
349     if (newMsgId != 0) {
350         Q_EMIT updateMessageId(badMsgId, newMsgId);
351     }
352 }
353 
workPong(InboundPkt & inboundPkt,qint64 msgId)354 void Session::workPong(InboundPkt &inboundPkt, qint64 msgId) {
355     qCDebug(TG_CORE_SESSION) << "workPong: msgId =" << QString::number(msgId, 16);
356     mAsserter.check (inboundPkt.fetchInt() == (qint32)TL_Pong);
357     inboundPkt.fetchLong(); // msg_id
358     inboundPkt.fetchLong(); // ping_id
359 }
360 
workDetailedInfo(InboundPkt & inboundPkt,qint64 msgId)361 void Session::workDetailedInfo(InboundPkt &inboundPkt, qint64 msgId) {
362     qCDebug(TG_CORE_SESSION) << "workDetailedInfo: msgId =" << QString::number(msgId, 16);
363     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgDetailedInfo);
364     inboundPkt.fetchLong(); // msg_id
365     inboundPkt.fetchLong(); // answer_msg_id
366     inboundPkt.fetchInt(); // bytes
367     inboundPkt.fetchInt(); // status
368 }
369 
workNewDetailedInfo(InboundPkt & inboundPkt,qint64 msgId)370 void Session::workNewDetailedInfo(InboundPkt &inboundPkt, qint64 msgId) {
371     qCDebug(TG_CORE_SESSION) << "workNewDetailedInfo: msgId =" << QString::number(msgId, 16);
372     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_MsgNewDetailedInfo);
373     inboundPkt.fetchLong(); // answer_msg_id
374     inboundPkt.fetchInt(); // bytes
375     inboundPkt.fetchInt(); // status
376 }
377 
workUpdatesTooLong(InboundPkt & inboundPkt,qint64 msgId)378 void Session::workUpdatesTooLong(InboundPkt &inboundPkt, qint64 msgId) {
379     qCDebug(TG_CORE_SESSION) << "workUpdatesTooLong: msgId =" << QString::number(msgId, 16);
380     UpdatesType upd(&inboundPkt);
381     Q_UNUSED(upd)
382     Q_EMIT updatesTooLong();
383 }
384 
workBadMsgNotification(InboundPkt & inboundPkt,qint64 msgId)385 void Session::workBadMsgNotification(InboundPkt &inboundPkt, qint64 msgId) {
386     mAsserter.check(inboundPkt.fetchInt() == (qint32)TL_BadMsgNotification);
387     qint64 badMsgId = inboundPkt.fetchLong();
388     qint32 badMsgSeqNo = inboundPkt.fetchInt();
389     qint32 errorCode = inboundPkt.fetchInt();
390     qCWarning(TG_CORE_SESSION) << "workBadMsgNotification: badMsgId =" << QString::number(badMsgId, 16) <<
391             ", badMsgSeqNo =" << badMsgSeqNo << ", errorCode =" << errorCode;
392     switch (errorCode) {
393     case 16:
394     case 17:
395     case 19:
396     case 32:
397     case 33:
398     case 64:
399         // update time sync difference and reset msgIds counter
400         qint32 serverTime = msgId >> 32LL;
401         mTimeDifference = QDateTime::currentDateTime().toTime_t() - serverTime;
402 
403         qint64 nextId = generatePlainNextMsgId();
404         if (!m_pendingQueries.contains(nextId)) {
405             m_clientLastMsgId = 0;
406         }
407 
408         // read removing from pending queries, recompose and send the last query
409         Query *q = m_pendingQueries.take(badMsgId);
410         qint64 newMsgId = recomposeAndSendQuery(q);
411         if (newMsgId != 0) {
412             Q_EMIT updateMessageId(badMsgId, newMsgId);
413         }
414         break;
415     }
416 }
417 
initEncryptedMessage(qint32 useful)418 Session::EncryptedMsg *Session::initEncryptedMessage(qint32 useful) {
419     EncryptedMsg *encMsg = new EncryptedMsg;
420     Q_ASSERT(m_dc->authKeyId());
421     encMsg->authKeyId = m_dc->authKeyId();
422     Q_ASSERT(m_dc->serverSalt());
423     encMsg->serverSalt = m_dc->serverSalt();
424     Q_ASSERT(m_sessionId);
425     encMsg->sessionId = m_sessionId;
426     encMsg->msgId = generateNextMsgId();
427     encMsg->seqNo = m_seqNo;
428     if (useful) {
429         encMsg->seqNo |= 1;
430     }
431     m_seqNo += 2;
432     return encMsg;
433 }
434 
aesEncryptMessage(EncryptedMsg * encMsg)435 qint32 Session::aesEncryptMessage (EncryptedMsg *encMsg) {
436     uchar sha1Buffer[20];
437     const qint32 MINSZ = offsetof (EncryptedMsg, message);
438     const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
439     qint32 encLen = (MINSZ - UNENCSZ) + encMsg->msgLen;
440     Q_ASSERT (encMsg->msgLen >= 0 && encMsg->msgLen <= MAX_MESSAGE_INTS * 4 - 16 && !(encMsg->msgLen & 3));
441     SHA1 ((uchar *) &encMsg->serverSalt, encLen, sha1Buffer);
442     qCDebug(TG_CORE_SESSION) << "sending message with sha1" << QString::number(*(qint32 *)sha1Buffer, 8);
443 
444     memcpy (encMsg->msgKey, sha1Buffer + 4, 16);
445     mCrypto->initAESAuth(m_dc->authKey(), encMsg->msgKey, AES_ENCRYPT);
446     return mCrypto->padAESEncrypt((char *) &encMsg->serverSalt, encLen, (char *) &encMsg->serverSalt, MAX_MESSAGE_INTS * 4 + (MINSZ - UNENCSZ));
447 }
448 
encryptSendMessage(qint32 * msg,qint32 msgInts,qint32 useful)449 qint64 Session::encryptSendMessage(qint32 *msg, qint32 msgInts, qint32 useful) {
450     const qint32 UNENCSZ = offsetof (EncryptedMsg, serverSalt);
451     if (msgInts <= 0 || msgInts > MAX_MESSAGE_INTS - 4) {
452       return -1;
453     }
454     EncryptedMsg *encMsg = initEncryptedMessage(useful);
455     if (msg) {
456       memcpy (encMsg->message, msg, msgInts * 4);
457       encMsg->msgLen = msgInts * 4;
458     } else if ((encMsg->msgLen & 0x80000003) || encMsg->msgLen > MAX_MESSAGE_INTS * 4 - 16) {
459       delete encMsg;
460       return -1;
461     }
462 
463     qint32 l = aesEncryptMessage(encMsg);
464     Q_ASSERT(l > 0);
465 
466     if(!rpcSendMessage(encMsg, l + UNENCSZ))
467         return -1;
468 
469     delete encMsg;
470     return m_clientLastMsgId;
471 }
472 
rpcSendMessage(void * data,qint32 len)473 bool Session::rpcSendMessage(void *data, qint32 len) {
474     qCDebug(TG_CORE_SESSION) << "rpcSendMessage()," << len;
475 
476     qint32 written;
477     Q_UNUSED(written);
478 
479     mAsserter.check(len > 0 && !(len & 0xfc000003));
480     qint32 totalLen = len >> 2;
481 
482     if (totalLen < 0x7f) {
483         written = writeOut(&totalLen, 1);
484         if(written != 1)
485             return false;
486     } else {
487         totalLen = (totalLen << 8) | 0x7f;
488         written = writeOut(&totalLen, 4);
489         if(written != 4)
490             return false;
491     }
492 
493     written = writeOut(data, len);
494     if(written != len)
495         return false;
496 
497     return true;
498 }
499 
500 
501 //### query management
sendQuery(OutboundPkt & outboundPkt,QueryMethods * methods,const QVariant & extra,const QString & name)502 qint64 Session::sendQuery(OutboundPkt &outboundPkt, QueryMethods *methods, const QVariant &extra, const QString &name) {
503     Q_ASSERT (m_sessionId);
504     Q_ASSERT (m_dc->authKeyId());
505     qint32 *data = outboundPkt.buffer();
506     qint32 ints = outboundPkt.length();
507 
508     qCDebug(TG_CORE_SESSION) << "Sending query of size" << 4 * ints << "to DC" << m_dc->id() << "at" << peerName() << ":" << peerPort() << "by session" << QString::number(m_sessionId, 16);
509 
510     Query *q = new Query(this);
511     q->setData(data, ints);
512     q->setMsgId(encryptSendMessage(data, ints, 1));
513     q->setSeqNo(m_seqNo - 1);
514     qCDebug(TG_CORE_SESSION) << "msgId is" << QString::number(q->msgId(), 16);
515     q->setMethods(methods);
516     q->setAcked(false);
517     q->setExtra(extra);
518     q->setName(name);
519 
520     if (mSettings->resendQueries()) {
521         connect(q, SIGNAL(timeout(Query*)), this, SLOT(resendQuery(Query*)), Qt::UniqueConnection);
522         q->startTimer(QUERY_TIMEOUT);
523     }
524 
525     m_pendingQueries.insert(q->msgId(), q);
526 
527     return q->msgId();
528 }
529 
recomposeAndSendQuery(Query * q)530 qint64 Session::recomposeAndSendQuery(Query *q) {
531     if(!q)
532         return 0;
533 
534     qCDebug(TG_CORE_SESSION) << "Resending query with previous msgId" << QString::number(q->msgId(), 16);
535     q->setMsgId(encryptSendMessage((qint32 *)q->data(), q->dataLength(), 1));
536     q->setSeqNo(m_seqNo - 1);
537     qCDebug(TG_CORE_SESSION) << "new msgId is" << QString::number(q->msgId(), 16);
538     q->setAcked(false);
539     m_pendingQueries.insert(q->msgId(), q);
540     return q->msgId();
541 }
542 
resendQuery(Query * q)543 void Session::resendQuery(Query *q) {
544     Q_ASSERT(q);
545     //avoid resending if resend numbers is less than zero
546     if (q->decreaseResends() < 0) {
547         qCDebug(TG_CORE_SESSION) << "Max resend numbers reached for query with msgId" << QString::number(q->msgId(), 16) << ",query discarded";
548         m_pendingQueries.remove(q->msgId());
549         delete q;
550     } else {
551         qCDebug(TG_CORE_SESSION) << "Resending query with msgId" << QString::number(q->msgId(), 16);
552         OutboundPkt p(mSettings);
553         p.appendInt(TL_MsgContainer);
554         p.appendInt(1);
555         p.appendLong(q->msgId());
556         p.appendInt(q->seqNo());
557         p.appendInt(4 * q->dataLength());
558         p.appendInts((qint32 *)q->data(), q->dataLength());
559         encryptSendMessage(p.buffer(), p.length(), 0);
560     }
561 }
562 
queryOnResult(InboundPkt & inboundPkt,qint64 msgId)563 void Session::queryOnResult(InboundPkt &inboundPkt, qint64 msgId) {
564     qCDebug(TG_CORE_SESSION) << "result for query" << QString::number(msgId, 16);
565     qint32 op = inboundPkt.prefetchInt();
566     qint32 *inPtr = 0;
567     qint32 *inEnd = 0;
568     if (op == (qint32)TL_GZipPacked) {
569         inboundPkt.fetchInt();
570         qint32 l = inboundPkt.prefetchStrlen();
571         char *s = inboundPkt.fetchStr(l);
572         static qint32 packedBuffer[MAX_PACKED_SIZE / 4];
573         qint32 totalOut = Utils::tinflate (s, l, packedBuffer, MAX_PACKED_SIZE);
574         inPtr = inboundPkt.inPtr();
575         inEnd = inboundPkt.inEnd();
576         inboundPkt.setInPtr(packedBuffer);
577         inboundPkt.setInEnd(inboundPkt.inPtr() + totalOut / 4);
578         qCDebug(TG_CORE_SESSION) << "unzipped data";
579     }
580 
581     Query *q = m_pendingQueries.take(msgId);
582     if (!q) {
583         qCWarning(TG_CORE_SESSION) << "No such query";
584         inboundPkt.setInPtr(inboundPkt.inEnd());
585     } else {
586         qCDebug(TG_CORE_SESSION) << "acked query with msgId" << QString::number(msgId, 16) << ",pendingQueries:" << m_pendingQueries.size();
587         q->setAcked(true);
588         Q_EMIT resultReceived(q, inboundPkt);
589     }
590 
591     if (inPtr) {
592         inboundPkt.setInPtr(inPtr);
593         inboundPkt.setInEnd(inEnd);
594     }
595 }
596 
queryOnError(InboundPkt & inboundPkt,qint64 msgId)597 void Session::queryOnError(InboundPkt &inboundPkt, qint64 msgId) {
598     mAsserter.check(inboundPkt.fetchInt() == TL_RpcError);
599     qint32 errorCode = inboundPkt.fetchInt();
600     QString errorText = inboundPkt.fetchQString();
601     qCDebug(TG_CORE_SESSION) << "error for query" << QString::number(msgId, 16) << " :" << errorCode << " :" << errorText;
602 
603     Query *q = m_pendingQueries.take(msgId);
604     if (!q) {
605         qCWarning(TG_CORE_SESSION) << "No such query";
606     } else {
607         q->setAcked(true);
608         Q_EMIT errorReceived(q, errorCode, errorText);
609     }
610 }
611 
612 
addToPendingAcks(qint64 msgId)613 void Session::addToPendingAcks(qint64 msgId) {
614     EventTimer *t = new EventTimer(msgId, ACK_TIMEOUT, this);
615     connect(t, SIGNAL(timerTimeout(qint64)), this, SLOT(ack(qint64)));
616     t->start(); //timeout of 60 secs
617     m_pendingAcks[msgId] = t;
618     if (m_pendingAcks.size() > MAX_PENDING_ACKS) {
619         ackAll();
620     }
621 }
622 
ack(qint64 msgId)623 void Session::ack(qint64 msgId) {
624     QList<qint64> idsList;
625     idsList.append(msgId);
626     sendAcks(idsList);
627 }
628 
ackAll()629 void Session::ackAll() {
630     sendAcks(m_pendingAcks.keys());
631 }
632 
sendAcks(const QList<qint64> & msgIds)633 void Session::sendAcks(const QList<qint64> &msgIds) {
634     OutboundPkt p(mSettings);
635     p.appendInt(TL_MsgsAck);
636     p.appendInt(CoreTypes::typeVector);
637     int n = msgIds.length();
638     p.appendInt(n);
639     Q_FOREACH (qint64 msgId, msgIds) {
640         p.appendLong(msgId);
641         qCDebug(TG_CORE_SESSION) << "Ack for msgId" << msgId;
642         EventTimer* t = m_pendingAcks.take(msgId);
643         if (t) {
644             t->stop();
645             delete t;
646         }
647     }
648     qint64 sentAcksId = encryptSendMessage(p.buffer(), p.length(), 0);
649     qCDebug(TG_CORE_SESSION) << "Sent Acks with id:" << QString::number(sentAcksId, 16);
650 }
651