1 #include <cpp-pcp-client/protocol/v1/message.hpp>
2 #include <cpp-pcp-client/protocol/v1/schemas.hpp>
3
4 #define LEATHERMAN_LOGGING_NAMESPACE CPP_PCP_CLIENT_LOGGING_PREFIX".message"
5
6 #include <leatherman/logging/logging.hpp>
7
8 #include <leatherman/locale/locale.hpp>
9
10 #include <algorithm> // find
11
12 // TODO(ale): disable assert() once we're confident with the code...
13 // To disable assert()
14 // #define NDEBUG
15 #include <cassert>
16
17 namespace PCPClient {
18 namespace v1 {
19
20 namespace lth_jc = leatherman::json_container;
21 namespace lth_loc = leatherman::locale;
22
23 //
24 // Constants
25 //
26
// TODO(ale): use a greater min value based on the expected json entries
// Min size of a serialized message [byte]; parseMessage() rejects any
// transport message shorter than this before reading any field
static const size_t MIN_ENVELOPE_SIZE { 6 };

// Size of a chunk's descriptor (1 byte) and size (4 bytes) fields [byte]
static const size_t CHUNK_METADATA_SIZE { 5 };

// Size of version field [byte]
static const size_t VERSION_FIELD_SIZE { 1 };

// Ordered list of supported protocol versions; the last entry should
// be used when creating new messages.
// Declared const: this is a read-only lookup table and nothing in this
// file needs to modify it at run time.
static const std::vector<uint8_t> SUPPORTED_VERSIONS { 1 };
40
41 //
42 // Message
43 //
44
45 // Constructors
46
Message(const std::string & transport_msg)47 Message::Message(const std::string& transport_msg) : version_ {},
48 envelope_chunk_ {},
49 data_chunk_ {},
50 debug_chunks_ {} {
51 parseMessage(transport_msg);
52 }
53
Message(MessageChunk envelope_chunk)54 Message::Message(MessageChunk envelope_chunk)
55 : version_ { SUPPORTED_VERSIONS.back() },
56 envelope_chunk_ { envelope_chunk },
57 data_chunk_ {},
58 debug_chunks_ {} {
59 validateChunk(envelope_chunk);
60 }
61
Message(MessageChunk envelope_chunk,MessageChunk data_chunk)62 Message::Message(MessageChunk envelope_chunk, MessageChunk data_chunk)
63 : version_ { SUPPORTED_VERSIONS.back() },
64 envelope_chunk_ { envelope_chunk },
65 data_chunk_ { data_chunk },
66 debug_chunks_ {} {
67 validateChunk(envelope_chunk);
68 validateChunk(data_chunk);
69 }
70
Message(MessageChunk envelope_chunk,MessageChunk data_chunk,MessageChunk debug_chunk)71 Message::Message(MessageChunk envelope_chunk, MessageChunk data_chunk,
72 MessageChunk debug_chunk)
73 : version_ { SUPPORTED_VERSIONS.back() },
74 envelope_chunk_ { envelope_chunk },
75 data_chunk_ { data_chunk },
76 debug_chunks_ { debug_chunk } {
77 validateChunk(envelope_chunk);
78 validateChunk(data_chunk);
79 validateChunk(debug_chunk);
80 }
81
82 // Add chunks
83
setDataChunk(MessageChunk data_chunk)84 void Message::setDataChunk(MessageChunk data_chunk) {
85 validateChunk(data_chunk);
86
87 if (hasData()) {
88 LOG_WARNING("Resetting data chunk");
89 }
90
91 data_chunk_ = data_chunk;
92 }
93
addDebugChunk(MessageChunk debug_chunk)94 void Message::addDebugChunk(MessageChunk debug_chunk) {
95 validateChunk(debug_chunk);
96 debug_chunks_.push_back(debug_chunk);
97 }
98
99 // Getters
100
getVersion() const101 uint8_t Message::getVersion() const {
102 return version_;
103 }
104
getEnvelopeChunk() const105 MessageChunk Message::getEnvelopeChunk() const {
106 return envelope_chunk_;
107 }
108
getDataChunk() const109 MessageChunk Message::getDataChunk() const {
110 return data_chunk_;
111 }
112
getDebugChunks() const113 std::vector<MessageChunk> Message::getDebugChunks() const {
114 return debug_chunks_;
115 }
116
117 // Inspectors
118
hasData() const119 bool Message::hasData() const {
120 return data_chunk_.descriptor != 0;
121 }
122
hasDebug() const123 bool Message::hasDebug() const {
124 return !debug_chunks_.empty();
125 }
126
127 // Get the serialized message
128
getSerialized() const129 SerializedMessage Message::getSerialized() const {
130 SerializedMessage buffer;
131
132 // Version (mandatory)
133 serialize<uint8_t>(version_, 1, buffer);
134
135 // Envelope (mandatory)
136 envelope_chunk_.serializeOn(buffer);
137
138 if (hasData()) {
139 // Data (optional)
140 data_chunk_.serializeOn(buffer);
141 }
142
143 if (hasDebug()) {
144 for (const auto& d_c : debug_chunks_)
145 // Debug (optional; mutiple)
146 d_c.serializeOn(buffer);
147 }
148
149 return buffer;
150 }
151
152 // Parse JSON, validate schema, and return the content of chunks
153
getParsedChunks(const Validator & validator) const154 ParsedChunks Message::getParsedChunks(const Validator& validator) const {
155 // Envelope
156 lth_jc::JsonContainer envelope_content { envelope_chunk_.content };
157 validator.validate(envelope_content, Protocol::ENVELOPE_SCHEMA_NAME);
158 auto msg_id = envelope_content.get<std::string>("id");
159
160 // Debug
161 std::vector<lth_jc::JsonContainer> debug_content {};
162 unsigned int num_invalid_debug { 0 };
163 for (const auto& d_c : debug_chunks_) {
164 try {
165 // Parse the JSON text
166 lth_jc::JsonContainer parsed_debug { d_c.content };
167
168 // Validate entire content (array)
169 validator.validate(parsed_debug, Protocol::DEBUG_SCHEMA_NAME);
170
171 // Validate each hop entry
172 for (auto &hop : parsed_debug.get<std::vector<lth_jc::JsonContainer>>("hops")) {
173 validator.validate(hop, Protocol::DEBUG_ITEM_SCHEMA_NAME);
174 }
175
176 debug_content.push_back(parsed_debug);
177 } catch (leatherman::json_container::data_parse_error& e) {
178 num_invalid_debug++;
179 LOG_DEBUG("Invalid debug in message {1}: {2}", msg_id, e.what());
180 } catch (validator_error& e) {
181 num_invalid_debug++;
182 LOG_DEBUG("Invalid debug in message {1}: {2}", msg_id, e.what());
183 }
184 }
185
186 // Data
187 if (hasData()) {
188 auto message_type = envelope_content.get<std::string>("message_type");
189 auto content_type = validator.getSchemaContentType(message_type);
190
191 if (content_type == ContentType::Json) {
192 std::string err_msg {};
193 try {
194 lth_jc::JsonContainer data_content_json { data_chunk_.content };
195 validator.validate(data_content_json, message_type);
196
197 // Valid JSON data content
198 return ParsedChunks { envelope_content,
199 data_content_json,
200 debug_content,
201 num_invalid_debug };
202 } catch (leatherman::json_container::data_parse_error& e) {
203 err_msg = e.what();
204 } catch (validator_error& e) {
205 err_msg = e.what();
206 }
207
208 LOG_DEBUG("Invalid data in message {1}: {2}", msg_id, err_msg);
209
210 // Bad JSON data content
211 return ParsedChunks { envelope_content,
212 true,
213 debug_content,
214 num_invalid_debug };
215 } else if (content_type == ContentType::Binary) {
216 auto data_content_binary = data_chunk_.content;
217
218 // Binary data content
219 return ParsedChunks { envelope_content,
220 data_content_binary,
221 debug_content,
222 num_invalid_debug };
223 } else {
224 assert(false);
225 }
226 }
227
228 return ParsedChunks { envelope_content, debug_content, num_invalid_debug };
229 }
230
231 // toString
232
toString() const233 std::string Message::toString() const {
234 auto s = std::to_string(version_) + envelope_chunk_.toString();
235
236 if (hasData()) {
237 s += data_chunk_.toString();
238 }
239
240 for (const auto& debug_chunk : debug_chunks_) {
241 s += debug_chunk.toString();
242 }
243
244 return s;
245 }
246
247 //
248 // Message - private interface
249 //
250
parseMessage(const std::string & transport_msg)251 void Message::parseMessage(const std::string& transport_msg) {
252 auto msg_size = transport_msg.size();
253
254 if (msg_size < MIN_ENVELOPE_SIZE) {
255 LOG_ERROR("Invalid msg; envelope is too small");
256 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
257 throw message_serialization_error {
258 lth_loc::translate("invalid msg: envelope too small") };
259 }
260
261 // Serialization buffer
262
263 auto buffer = SerializedMessage(transport_msg.begin(), transport_msg.end());
264 SerializedMessage::const_iterator next_itr { buffer.begin() };
265
266 // Version
267
268 auto msg_version = deserialize<uint8_t>(1, next_itr);
269 validateVersion(msg_version);
270
271 // Envelope (mandatory chunk)
272
273 auto envelope_desc = deserialize<uint8_t>(1, next_itr);
274 auto envelope_desc_bit = envelope_desc & ChunkDescriptor::TYPE_MASK;
275 if (envelope_desc_bit != ChunkDescriptor::ENVELOPE) {
276 LOG_ERROR("Invalid msg; missing envelope descriptor");
277 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
278 throw message_serialization_error {
279 lth_loc::translate("invalid msg: no envelope descriptor") };
280 }
281
282 auto envelope_size = deserialize<uint32_t>(4, next_itr);
283 if (envelope_size > UINT_MAX - (VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE)) {
284 LOG_ERROR("Invalid msg; envelope size is too large for 32-bit systems");
285 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
286 throw message_serialization_error {
287 lth_loc::translate("invalid msg: size too large") };
288 }
289
290 if (msg_size < VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE + envelope_size) {
291 LOG_ERROR("Invalid msg; missing envelope content");
292 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
293 throw message_serialization_error {
294 lth_loc::translate("invalid msg: no envelope") };
295 }
296
297 auto envelope_content = deserialize<std::string>(envelope_size, next_itr);
298
299 // Data and debug (optional chunks)
300
301 auto still_to_parse = msg_size - (VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE
302 + envelope_size);
303
304 while (still_to_parse > CHUNK_METADATA_SIZE) {
305 auto chunk_desc = deserialize<uint8_t>(1, next_itr);
306 auto chunk_desc_bit = chunk_desc & ChunkDescriptor::TYPE_MASK;
307
308 if (chunk_desc_bit != ChunkDescriptor::DATA
309 && chunk_desc_bit != ChunkDescriptor::DEBUG) {
310 LOG_ERROR("Invalid msg; invalid chunk descriptor {1}",
311 static_cast<int>(chunk_desc));
312 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
313 throw message_serialization_error {
314 lth_loc::translate("invalid msg: invalid chunk descriptor") };
315 }
316
317 auto chunk_size = deserialize<uint32_t>(4, next_itr);
318 auto missing_bytes = still_to_parse - CHUNK_METADATA_SIZE;
319 if (chunk_size > missing_bytes) {
320 // TODO(ale): deal with locale & plural
321 assert(chunk_size > 1);
322 if (missing_bytes == 1) {
323 LOG_ERROR("Invalid msg; missing part of the {1} chunk content ({2} "
324 "bytes declared - missing {3} byte)",
325 ChunkDescriptor::names[chunk_desc_bit], chunk_size,
326 missing_bytes);
327 } else {
328 LOG_ERROR("Invalid msg; missing part of the {1} chunk content ({2} "
329 "bytes declared - missing {3} bytes)",
330 ChunkDescriptor::names[chunk_desc_bit], chunk_size,
331 missing_bytes);
332 }
333 LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
334 throw message_serialization_error {
335 lth_loc::translate("invalid msg: missing chunk content") };
336 }
337
338 auto chunk_content = deserialize<std::string>(chunk_size, next_itr);
339 MessageChunk chunk { chunk_desc, chunk_size, chunk_content };
340
341 if (chunk_desc_bit == ChunkDescriptor::DATA) {
342 if (hasData()) {
343 LOG_ERROR("Invalid msg; multiple data chunks");
344 LOG_TRACE("Invalid msg content (unserialized): '{1}'",
345 transport_msg);
346 throw message_serialization_error {
347 lth_loc::translate("invalid msg: multiple data chunks") };
348 }
349
350 data_chunk_ = chunk;
351 } else {
352 debug_chunks_.push_back(chunk);
353 }
354
355 still_to_parse -= CHUNK_METADATA_SIZE + chunk_size;
356 }
357
358 if (still_to_parse > 0) {
359 // TODO(ale): deal with locale & plural
360 if (still_to_parse == 1) {
361 LOG_ERROR("Failed to parse the entire msg (ignoring last {1} byte); "
362 "the msg will be processed anyway", still_to_parse);
363 } else {
364 LOG_ERROR("Failed to parse the entire msg (ignoring last {1} bytes); "
365 "the msg will be processed anyway", still_to_parse);
366 }
367 LOG_TRACE("Msg content (unserialized): '{1}'", transport_msg);
368 }
369
370 version_ = msg_version;
371 envelope_chunk_ = MessageChunk { envelope_desc,
372 envelope_size,
373 envelope_content };
374 }
375
validateVersion(const uint8_t & version) const376 void Message::validateVersion(const uint8_t& version) const {
377 auto found = std::find(SUPPORTED_VERSIONS.begin(), SUPPORTED_VERSIONS.end(),
378 version);
379 if (found == SUPPORTED_VERSIONS.end()) {
380 auto version_num = static_cast<int>(version);
381 LOG_ERROR("Unsupported message version: {1}", version_num);
382 throw unsupported_version_error {
383 lth_loc::format("unsupported message version: {1}", version_num) };
384 }
385 }
386
validateChunk(const MessageChunk & chunk) const387 void Message::validateChunk(const MessageChunk& chunk) const {
388 auto desc_bit = chunk.descriptor & ChunkDescriptor::TYPE_MASK;
389
390 if (ChunkDescriptor::names.find(desc_bit) == ChunkDescriptor::names.end()) {
391 LOG_ERROR("Unknown chunk descriptor: {1}",
392 static_cast<int>(chunk.descriptor));
393 throw invalid_chunk_error { lth_loc::translate("unknown descriptor") };
394 }
395
396 if (chunk.size != static_cast<uint32_t>(chunk.content.size())) {
397 // TODO(ale): deal with locale and plural
398 if (chunk.size == 1) {
399 assert(chunk.content.size() != 1);
400 LOG_ERROR("Incorrect size for {1} chunk; declared {2} byte, got {3} bytes",
401 ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
402 } else {
403 if (chunk.content.size() == 1) {
404 LOG_ERROR("Incorrect size for {1} chunk; declared {2} bytes, got {3} byte",
405 ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
406 } else {
407 LOG_ERROR("Incorrect size for {1} chunk; declared {2} bytes, got {3} bytes",
408 ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
409 }
410 }
411 throw invalid_chunk_error { lth_loc::translate("invalid size") };
412 }
413 }
414
415 } // namespace v1
416 } // namespace PCPClient
417