1 /*
2   message.cpp
3 
4   This file is part of GammaRay, the Qt application inspection and
5   manipulation tool.
6 
7   Copyright (C) 2013-2021 Klarälvdalens Datakonsult AB, a KDAB Group company, info@kdab.com
8   Author: Volker Krause <volker.krause@kdab.com>
9 
10   Licensees holding valid commercial KDAB GammaRay licenses may use this file in
11   accordance with GammaRay Commercial License Agreement provided with the Software.
12 
13   Contact info@kdab.com if any conditions of this licensing are not clear to you.
14 
15   This program is free software; you can redistribute it and/or modify
16   it under the terms of the GNU General Public License as published by
17   the Free Software Foundation, either version 2 of the License, or
18   (at your option) any later version.
19 
20   This program is distributed in the hope that it will be useful,
21   but WITHOUT ANY WARRANTY; without even the implied warranty of
22   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23   GNU General Public License for more details.
24 
25   You should have received a copy of the GNU General Public License
26   along with this program.  If not, see <http://www.gnu.org/licenses/>.
27 */
28 
#include "message.h"

#include "sharedpool.h"
#include "lz4/lz4.h" // 3rdparty

#include <QBuffer>
#include <QDebug>
#include <qendian.h>

#include <cstring>
#include <limits>
37 
compress(const QByteArray & src,QByteArray & dst)38 inline void compress(const QByteArray &src, QByteArray &dst)
39 {
40     const qint32 srcSz = src.size();
41 
42     dst.resize(LZ4_compressBound(srcSz + sizeof(srcSz)));
43     *(qint32 *)dst.data() = srcSz; // save the source size
44 
45     const int sz
46         = LZ4_compress_default(src.constData(), dst.data() + sizeof(int), srcSz, dst.size());
47     dst.resize(sz + sizeof(srcSz));
48 }
49 
uncompress(const QByteArray & src,QByteArray & dst)50 inline void uncompress(const QByteArray &src, QByteArray &dst)
51 {
52     const qint32 dstSz = *(const qint32 *)src.constData(); // get the dest size
53     dst.resize(dstSz);
54     const int sz = LZ4_decompress_safe(src.constData() + sizeof(dstSz), dst.data(),
55                                        src.size()- sizeof(dstSz), dstSz);
56     if (sz <= 0)
57         dst.resize(0);
58     else
59         dst.resize(sz);
60 }
61 
62 static quint8 s_streamVersion = GammaRay::Message::lowestSupportedDataVersion();
63 static const int minimumUncompressedSize = 32;
64 
readNumber(QIODevice * device)65 template<typename T> static T readNumber(QIODevice *device)
66 {
67     T buffer;
68     const int readSize = device->read((char *)&buffer, sizeof(T));
69     Q_UNUSED(readSize);
70     Q_ASSERT(readSize == sizeof(T));
71     return qFromBigEndian(buffer);
72 }
73 
writeNumber(QIODevice * device,T value)74 template<typename T> static void writeNumber(QIODevice *device, T value)
75 {
76     value = qToBigEndian(value);
77     const int writeSize = device->write((char *)&value, sizeof(T));
78     Q_UNUSED(writeSize);
79     Q_ASSERT(writeSize == sizeof(T));
80 }
81 
82 using namespace GammaRay;
83 
84 class MessageBuffer
85 {
86 public:
MessageBuffer()87     MessageBuffer()
88         : stream(&data)
89     {
90         data.open(QIODevice::ReadWrite);
91 
92         // explicitly reserve memory so a resize() won't shed it
93         data.buffer().reserve(32);
94         scratchSpace.reserve(32);
95     }
96 
97     ~MessageBuffer() = default;
98 
clear()99     void clear()
100     {
101         data.buffer().resize(0);
102         resetStatus();
103     }
104 
resetStatus()105     void resetStatus()
106     {
107         data.seek(0);
108         scratchSpace.resize(0);
109         stream.resetStatus();
110     }
111 
112     QBuffer data;
113     QByteArray scratchSpace;
114     QDataStream stream;
115 };
116 
117 Q_GLOBAL_STATIC_WITH_ARGS(SharedPool<MessageBuffer>, s_sharedMessageBufferPool, (5))
118 
Message()119 Message::Message()
120     : m_objectAddress(Protocol::InvalidObjectAddress)
121     , m_messageType(Protocol::InvalidMessageType)
122     , m_buffer(s_sharedMessageBufferPool()->acquire())
123 {
124     m_buffer->clear();
125     m_buffer->stream.setVersion(s_streamVersion);
126 }
127 
Message(Protocol::ObjectAddress objectAddress,Protocol::MessageType type)128 Message::Message(Protocol::ObjectAddress objectAddress, Protocol::MessageType type)
129     : m_objectAddress(objectAddress)
130     , m_messageType(type)
131     , m_buffer(s_sharedMessageBufferPool()->acquire())
132 {
133     m_buffer->clear();
134     m_buffer->stream.setVersion(s_streamVersion);
135 }
136 
Message(Message && other)137 Message::Message(Message &&other) Q_DECL_NOEXCEPT
138     : m_objectAddress(other.m_objectAddress)
139     , m_messageType(other.m_messageType)
140     , m_buffer(std::move(other.m_buffer))
141 {
142 }
143 
144 Message::~Message() = default;
145 
address() const146 Protocol::ObjectAddress Message::address() const
147 {
148     return m_objectAddress;
149 }
150 
type() const151 Protocol::MessageType Message::type() const
152 {
153     return m_messageType;
154 }
155 
payload() const156 QDataStream &Message::payload() const
157 {
158     return m_buffer->stream;
159 }
160 
canReadMessage(QIODevice * device)161 bool Message::canReadMessage(QIODevice *device)
162 {
163     if (!device)
164         return false;
165 
166     static const int minimumSize = sizeof(Protocol::PayloadSize) + sizeof(Protocol::ObjectAddress)
167                                    + sizeof(Protocol::MessageType);
168     if (device->bytesAvailable() < minimumSize)
169         return false;
170 
171     Protocol::PayloadSize payloadSize;
172     const int peekSize = device->peek((char *)&payloadSize, sizeof(Protocol::PayloadSize));
173     if (peekSize < (int)sizeof(Protocol::PayloadSize))
174         return false;
175 
176     if (payloadSize == -1 && !device->isSequential()) // input end on shared memory
177         return false;
178 
179     payloadSize = abs(qFromBigEndian(payloadSize));
180     return device->bytesAvailable() >= payloadSize + minimumSize;
181 }
182 
readMessage(QIODevice * device)183 Message Message::readMessage(QIODevice *device)
184 {
185     Message msg;
186 
187     Protocol::PayloadSize payloadSize = readNumber<qint32>(device);
188 
189     msg.m_objectAddress = readNumber<Protocol::ObjectAddress>(device);
190     msg.m_messageType = readNumber<Protocol::MessageType>(device);
191     Q_ASSERT(msg.m_messageType != Protocol::InvalidMessageType);
192     Q_ASSERT(msg.m_objectAddress != Protocol::InvalidObjectAddress);
193     if (payloadSize < 0) {
194         payloadSize = abs(payloadSize);
195         auto& uncompressedData = msg.m_buffer->scratchSpace;
196         uncompressedData.resize(payloadSize);
197         device->read(uncompressedData.data(), payloadSize);
198         uncompress(uncompressedData, msg.m_buffer->data.buffer());
199         Q_ASSERT(payloadSize == uncompressedData.size());
200     } else {
201         if (payloadSize > 0) {
202             msg.m_buffer->data.buffer() = device->read(payloadSize);
203             Q_ASSERT(payloadSize == msg.m_buffer->data.size());
204         }
205     }
206 
207     msg.m_buffer->resetStatus();
208 
209     return msg;
210 }
211 
lowestSupportedDataVersion()212 quint8 Message::lowestSupportedDataVersion()
213 {
214     return QDataStream::Qt_5_5;
215 }
216 
highestSupportedDataVersion()217 quint8 Message::highestSupportedDataVersion()
218 {
219     return QDataStream::Qt_DefaultCompiledVersion;
220 }
221 
negotiatedDataVersion()222 quint8 Message::negotiatedDataVersion()
223 {
224     return s_streamVersion;
225 }
226 
setNegotiatedDataVersion(quint8 version)227 void Message::setNegotiatedDataVersion(quint8 version)
228 {
229     s_streamVersion = version;
230 }
231 
resetNegotiatedDataVersion()232 void Message::resetNegotiatedDataVersion()
233 {
234     s_streamVersion = lowestSupportedDataVersion();
235 }
236 
write(QIODevice * device) const237 void Message::write(QIODevice *device) const
238 {
239     Q_ASSERT(m_objectAddress != Protocol::InvalidObjectAddress);
240     Q_ASSERT(m_messageType != Protocol::InvalidMessageType);
241     static const bool compressionEnabled = qgetenv("GAMMARAY_DISABLE_LZ4") != "1";
242     const int buffSize = m_buffer->data.size();
243     auto& compressedData = m_buffer->scratchSpace;
244     if (buffSize > minimumUncompressedSize && compressionEnabled)
245         compress(m_buffer->data.buffer(), compressedData);
246 
247     const bool isCompressed = compressedData.size() && compressedData.size() < buffSize;
248     if (isCompressed)
249         writeNumber<Protocol::PayloadSize>(device, -compressedData.size()); // send compressed Buffer
250     else
251         writeNumber<Protocol::PayloadSize>(device, buffSize);   // send uncompressed Buffer
252 
253     writeNumber(device, m_objectAddress);
254     writeNumber(device, m_messageType);
255 
256     if (buffSize) {
257         if (isCompressed) {
258             const int s = device->write(compressedData);
259             Q_ASSERT(s == compressedData.size());
260             Q_UNUSED(s);
261         } else {
262             const int s = device->write(m_buffer->data.buffer());
263             Q_ASSERT(s == m_buffer->data.size());
264             Q_UNUSED(s);
265         }
266     }
267 }
268 
size() const269 int Message::size() const
270 {
271     return m_buffer->data.size();
272 }
273