1 #include <cpp-pcp-client/protocol/v1/message.hpp>
2 #include <cpp-pcp-client/protocol/v1/schemas.hpp>
3 
4 #define LEATHERMAN_LOGGING_NAMESPACE CPP_PCP_CLIENT_LOGGING_PREFIX".message"
5 
6 #include <leatherman/logging/logging.hpp>
7 
8 #include <leatherman/locale/locale.hpp>
9 
10 #include <algorithm>  // find
11 
12 // TODO(ale): disable assert() once we're confident with the code...
13 // To disable assert()
14 // #define NDEBUG
15 #include <cassert>
16 
17 namespace PCPClient {
18 namespace v1 {
19 
20 namespace lth_jc  = leatherman::json_container;
21 namespace lth_loc = leatherman::locale;
22 
23 //
24 // Constants
25 //
26 
27 // TODO(ale): use a greater min value based on the expected json entries
28 // Min size of the envelope chunk data [byte]
29 static const size_t MIN_ENVELOPE_SIZE { 6 };
30 
31 // Size of descriptor and size fields [byte]
32 static const size_t CHUNK_METADATA_SIZE { 5 };
33 
34 // Size of version field [byte]
35 static const size_t VERSION_FIELD_SIZE { 1 };
36 
37 // Ordered list of supported protocol versions; the last entry should
38 // be used when creating new messages
39 static std::vector<uint8_t> SUPPORTED_VERSIONS { 1 };
40 
41 //
42 // Message
43 //
44 
45 // Constructors
46 
Message(const std::string & transport_msg)47 Message::Message(const std::string& transport_msg) : version_ {},
48                                                      envelope_chunk_ {},
49                                                      data_chunk_ {},
50                                                      debug_chunks_ {} {
51     parseMessage(transport_msg);
52 }
53 
Message(MessageChunk envelope_chunk)54 Message::Message(MessageChunk envelope_chunk)
55         : version_ { SUPPORTED_VERSIONS.back() },
56           envelope_chunk_ { envelope_chunk },
57           data_chunk_ {},
58           debug_chunks_ {} {
59     validateChunk(envelope_chunk);
60 }
61 
Message(MessageChunk envelope_chunk,MessageChunk data_chunk)62 Message::Message(MessageChunk envelope_chunk, MessageChunk data_chunk)
63         : version_ { SUPPORTED_VERSIONS.back() },
64           envelope_chunk_ { envelope_chunk },
65           data_chunk_ { data_chunk },
66           debug_chunks_ {} {
67     validateChunk(envelope_chunk);
68     validateChunk(data_chunk);
69 }
70 
Message(MessageChunk envelope_chunk,MessageChunk data_chunk,MessageChunk debug_chunk)71 Message::Message(MessageChunk envelope_chunk, MessageChunk data_chunk,
72                  MessageChunk debug_chunk)
73         : version_ { SUPPORTED_VERSIONS.back() },
74           envelope_chunk_ { envelope_chunk },
75           data_chunk_ { data_chunk },
76           debug_chunks_ { debug_chunk } {
77     validateChunk(envelope_chunk);
78     validateChunk(data_chunk);
79     validateChunk(debug_chunk);
80 }
81 
82 // Add chunks
83 
setDataChunk(MessageChunk data_chunk)84 void Message::setDataChunk(MessageChunk data_chunk) {
85     validateChunk(data_chunk);
86 
87     if (hasData()) {
88         LOG_WARNING("Resetting data chunk");
89     }
90 
91     data_chunk_ = data_chunk;
92 }
93 
addDebugChunk(MessageChunk debug_chunk)94 void Message::addDebugChunk(MessageChunk debug_chunk) {
95     validateChunk(debug_chunk);
96     debug_chunks_.push_back(debug_chunk);
97 }
98 
99 // Getters
100 
getVersion() const101 uint8_t Message::getVersion() const {
102     return version_;
103 }
104 
getEnvelopeChunk() const105 MessageChunk Message::getEnvelopeChunk() const {
106     return envelope_chunk_;
107 }
108 
getDataChunk() const109 MessageChunk Message::getDataChunk() const {
110     return data_chunk_;
111 }
112 
getDebugChunks() const113 std::vector<MessageChunk> Message::getDebugChunks() const {
114     return debug_chunks_;
115 }
116 
117 // Inspectors
118 
hasData() const119 bool Message::hasData() const {
120     return data_chunk_.descriptor != 0;
121 }
122 
hasDebug() const123 bool Message::hasDebug() const {
124     return !debug_chunks_.empty();
125 }
126 
127 // Get the serialized message
128 
getSerialized() const129 SerializedMessage Message::getSerialized() const {
130     SerializedMessage buffer;
131 
132     // Version (mandatory)
133     serialize<uint8_t>(version_, 1, buffer);
134 
135     // Envelope (mandatory)
136     envelope_chunk_.serializeOn(buffer);
137 
138     if (hasData()) {
139         // Data (optional)
140         data_chunk_.serializeOn(buffer);
141     }
142 
143     if (hasDebug()) {
144         for (const auto& d_c : debug_chunks_)
145             // Debug (optional; mutiple)
146             d_c.serializeOn(buffer);
147     }
148 
149     return buffer;
150 }
151 
152 // Parse JSON, validate schema, and return the content of chunks
153 
getParsedChunks(const Validator & validator) const154 ParsedChunks Message::getParsedChunks(const Validator& validator) const {
155     // Envelope
156     lth_jc::JsonContainer envelope_content { envelope_chunk_.content };
157     validator.validate(envelope_content, Protocol::ENVELOPE_SCHEMA_NAME);
158     auto msg_id = envelope_content.get<std::string>("id");
159 
160     // Debug
161     std::vector<lth_jc::JsonContainer> debug_content {};
162     unsigned int num_invalid_debug { 0 };
163     for (const auto& d_c : debug_chunks_) {
164         try {
165             // Parse the JSON text
166             lth_jc::JsonContainer parsed_debug { d_c.content };
167 
168             // Validate entire content (array)
169             validator.validate(parsed_debug, Protocol::DEBUG_SCHEMA_NAME);
170 
171             // Validate each hop entry
172             for (auto &hop : parsed_debug.get<std::vector<lth_jc::JsonContainer>>("hops")) {
173                 validator.validate(hop, Protocol::DEBUG_ITEM_SCHEMA_NAME);
174             }
175 
176             debug_content.push_back(parsed_debug);
177         } catch (leatherman::json_container::data_parse_error& e) {
178             num_invalid_debug++;
179             LOG_DEBUG("Invalid debug in message {1}: {2}", msg_id, e.what());
180         } catch (validator_error& e) {
181             num_invalid_debug++;
182             LOG_DEBUG("Invalid debug in message {1}: {2}", msg_id, e.what());
183         }
184     }
185 
186     // Data
187     if (hasData()) {
188         auto message_type = envelope_content.get<std::string>("message_type");
189         auto content_type = validator.getSchemaContentType(message_type);
190 
191         if (content_type == ContentType::Json) {
192             std::string err_msg {};
193             try {
194                 lth_jc::JsonContainer data_content_json { data_chunk_.content };
195                 validator.validate(data_content_json, message_type);
196 
197                 // Valid JSON data content
198                 return ParsedChunks { envelope_content,
199                                       data_content_json,
200                                       debug_content,
201                                       num_invalid_debug };
202             } catch (leatherman::json_container::data_parse_error& e) {
203                 err_msg = e.what();
204             } catch (validator_error& e) {
205                 err_msg = e.what();
206             }
207 
208             LOG_DEBUG("Invalid data in message {1}: {2}", msg_id, err_msg);
209 
210             // Bad JSON data content
211             return ParsedChunks { envelope_content,
212                                   true,
213                                   debug_content,
214                                   num_invalid_debug };
215         } else if (content_type == ContentType::Binary) {
216             auto data_content_binary = data_chunk_.content;
217 
218             // Binary data content
219             return ParsedChunks { envelope_content,
220                                   data_content_binary,
221                                   debug_content,
222                                   num_invalid_debug };
223         } else {
224             assert(false);
225         }
226     }
227 
228     return ParsedChunks { envelope_content, debug_content, num_invalid_debug };
229 }
230 
231 // toString
232 
toString() const233 std::string Message::toString() const {
234     auto s = std::to_string(version_) + envelope_chunk_.toString();
235 
236     if (hasData()) {
237         s += data_chunk_.toString();
238     }
239 
240     for (const auto& debug_chunk : debug_chunks_) {
241         s += debug_chunk.toString();
242     }
243 
244     return s;
245 }
246 
247 //
248 // Message - private interface
249 //
250 
parseMessage(const std::string & transport_msg)251 void Message::parseMessage(const std::string& transport_msg) {
252     auto msg_size = transport_msg.size();
253 
254     if (msg_size < MIN_ENVELOPE_SIZE) {
255         LOG_ERROR("Invalid msg; envelope is too small");
256         LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
257         throw message_serialization_error {
258             lth_loc::translate("invalid msg: envelope too small") };
259     }
260 
261     // Serialization buffer
262 
263     auto buffer = SerializedMessage(transport_msg.begin(), transport_msg.end());
264     SerializedMessage::const_iterator next_itr { buffer.begin() };
265 
266     // Version
267 
268     auto msg_version = deserialize<uint8_t>(1, next_itr);
269     validateVersion(msg_version);
270 
271     // Envelope (mandatory chunk)
272 
273     auto envelope_desc = deserialize<uint8_t>(1, next_itr);
274     auto envelope_desc_bit = envelope_desc & ChunkDescriptor::TYPE_MASK;
275     if (envelope_desc_bit != ChunkDescriptor::ENVELOPE) {
276         LOG_ERROR("Invalid msg; missing envelope descriptor");
277         LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
278         throw message_serialization_error {
279             lth_loc::translate("invalid msg: no envelope descriptor") };
280     }
281 
282     auto envelope_size = deserialize<uint32_t>(4, next_itr);
283     if (envelope_size > UINT_MAX - (VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE)) {
284         LOG_ERROR("Invalid msg; envelope size is too large for 32-bit systems");
285         LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
286         throw message_serialization_error {
287             lth_loc::translate("invalid msg: size too large") };
288     }
289 
290     if (msg_size < VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE + envelope_size) {
291         LOG_ERROR("Invalid msg; missing envelope content");
292         LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
293         throw message_serialization_error {
294             lth_loc::translate("invalid msg: no envelope") };
295     }
296 
297     auto envelope_content = deserialize<std::string>(envelope_size, next_itr);
298 
299     // Data and debug (optional chunks)
300 
301     auto still_to_parse =  msg_size - (VERSION_FIELD_SIZE + CHUNK_METADATA_SIZE
302                                        + envelope_size);
303 
304     while (still_to_parse > CHUNK_METADATA_SIZE) {
305         auto chunk_desc = deserialize<uint8_t>(1, next_itr);
306         auto chunk_desc_bit = chunk_desc & ChunkDescriptor::TYPE_MASK;
307 
308         if (chunk_desc_bit != ChunkDescriptor::DATA
309                 && chunk_desc_bit != ChunkDescriptor::DEBUG) {
310             LOG_ERROR("Invalid msg; invalid chunk descriptor {1}",
311                       static_cast<int>(chunk_desc));
312             LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
313             throw message_serialization_error {
314                 lth_loc::translate("invalid msg: invalid chunk descriptor") };
315         }
316 
317         auto chunk_size = deserialize<uint32_t>(4, next_itr);
318         auto missing_bytes = still_to_parse - CHUNK_METADATA_SIZE;
319         if (chunk_size > missing_bytes) {
320             // TODO(ale): deal with locale & plural
321             assert(chunk_size > 1);
322             if (missing_bytes == 1) {
323                 LOG_ERROR("Invalid msg; missing part of the {1} chunk content ({2} "
324                           "bytes declared - missing {3} byte)",
325                           ChunkDescriptor::names[chunk_desc_bit], chunk_size,
326                           missing_bytes);
327             } else {
328                 LOG_ERROR("Invalid msg; missing part of the {1} chunk content ({2} "
329                           "bytes declared - missing {3} bytes)",
330                           ChunkDescriptor::names[chunk_desc_bit], chunk_size,
331                           missing_bytes);
332             }
333             LOG_TRACE("Invalid msg content (unserialized): '{1}'", transport_msg);
334             throw message_serialization_error {
335                 lth_loc::translate("invalid msg: missing chunk content") };
336         }
337 
338         auto chunk_content = deserialize<std::string>(chunk_size, next_itr);
339         MessageChunk chunk { chunk_desc, chunk_size, chunk_content };
340 
341         if (chunk_desc_bit == ChunkDescriptor::DATA) {
342             if (hasData()) {
343                 LOG_ERROR("Invalid msg; multiple data chunks");
344                 LOG_TRACE("Invalid msg content (unserialized): '{1}'",
345                           transport_msg);
346                 throw message_serialization_error {
347                     lth_loc::translate("invalid msg: multiple data chunks") };
348             }
349 
350             data_chunk_ = chunk;
351         } else {
352             debug_chunks_.push_back(chunk);
353         }
354 
355         still_to_parse -= CHUNK_METADATA_SIZE + chunk_size;
356     }
357 
358     if (still_to_parse > 0) {
359         // TODO(ale): deal with locale & plural
360         if (still_to_parse == 1) {
361             LOG_ERROR("Failed to parse the entire msg (ignoring last {1} byte); "
362                       "the msg will be processed anyway", still_to_parse);
363         } else {
364             LOG_ERROR("Failed to parse the entire msg (ignoring last {1} bytes); "
365                       "the msg will be processed anyway", still_to_parse);
366         }
367         LOG_TRACE("Msg content (unserialized): '{1}'", transport_msg);
368     }
369 
370     version_ = msg_version;
371     envelope_chunk_ = MessageChunk  { envelope_desc,
372                                       envelope_size,
373                                       envelope_content };
374 }
375 
validateVersion(const uint8_t & version) const376 void Message::validateVersion(const uint8_t& version) const {
377     auto found = std::find(SUPPORTED_VERSIONS.begin(), SUPPORTED_VERSIONS.end(),
378                            version);
379     if (found == SUPPORTED_VERSIONS.end()) {
380         auto version_num = static_cast<int>(version);
381         LOG_ERROR("Unsupported message version: {1}", version_num);
382         throw unsupported_version_error {
383             lth_loc::format("unsupported message version: {1}", version_num) };
384     }
385 }
386 
validateChunk(const MessageChunk & chunk) const387 void Message::validateChunk(const MessageChunk& chunk) const {
388     auto desc_bit = chunk.descriptor & ChunkDescriptor::TYPE_MASK;
389 
390     if (ChunkDescriptor::names.find(desc_bit) == ChunkDescriptor::names.end()) {
391         LOG_ERROR("Unknown chunk descriptor: {1}",
392                   static_cast<int>(chunk.descriptor));
393         throw invalid_chunk_error { lth_loc::translate("unknown descriptor") };
394     }
395 
396     if (chunk.size != static_cast<uint32_t>(chunk.content.size())) {
397         // TODO(ale): deal with locale and plural
398         if (chunk.size == 1) {
399             assert(chunk.content.size() != 1);
400             LOG_ERROR("Incorrect size for {1} chunk; declared {2} byte, got {3} bytes",
401                       ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
402         } else {
403             if (chunk.content.size() == 1) {
404                 LOG_ERROR("Incorrect size for {1} chunk; declared {2} bytes, got {3} byte",
405                           ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
406             } else {
407                 LOG_ERROR("Incorrect size for {1} chunk; declared {2} bytes, got {3} bytes",
408                           ChunkDescriptor::names[desc_bit], chunk.size, chunk.content.size());
409             }
410         }
411         throw invalid_chunk_error { lth_loc::translate("invalid size") };
412     }
413 }
414 
415 }  // namespace v1
416 }  // namespace PCPClient
417