1 /*
2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 #ifndef PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_
26 #define PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_
27 
28 #include <google/protobuf/wire_format_lite.h>
29 #include <cassert>
30 #include <cstdint>
31 #include <string>
32 
33 #include "my_dbug.h"
34 
35 #include "plugin/x/protocol/encoders/encoding_protobuf.h"
36 
37 namespace protocol {
38 
/**
  Selects the layout of a compression frame.

  Within this file the value is consumed by
  XProtocol_encoder::begin_compression: for k_single and k_multiple the id
  of the compressed message is written into the frame's `server_messages`
  field, for k_group that field is not written.
*/
enum class Compression_type { k_single, k_multiple, k_group };
40 
/**
  Interface of an object that transforms the content of one encoding buffer
  into another encoding buffer.

  In this file it is used by XProtocol_encoder::end_compression, which hands
  it the buffer holding the not yet compressed messages as input and the
  original output buffer as destination.
*/
class Compression_buffer_interface {
 public:
  virtual ~Compression_buffer_interface() = default;

  // NOTE(review): presumably clears the counters that are reported through
  // get_processed_data - confirm against the implementations.
  virtual void reset_counters() = 0;

  /**
    Process the data held in `input_buffer` and place the result in
    `output_buffer`.

    @param output_buffer  buffer that receives the processed data
    @param input_buffer   buffer with the data to process

    @return XProtocol_encoder::end_compression treats `false` as a failure
            and gives up finishing the compression frame.
  */
  virtual bool process(Encoding_buffer *output_buffer,
                       const Encoding_buffer *input_buffer) = 0;

  /**
    Report the amount of data handled so far.

    @param[out] out_uncompressed  NOTE(review): presumably number of bytes
                                  consumed as input - confirm
    @param[out] out_compressed    NOTE(review): presumably number of bytes
                                  produced as output - confirm
  */
  virtual void get_processed_data(uint32_t *out_uncompressed,
                                  uint32_t *out_compressed) = 0;
};
52 
/**
  This class wraps a protobuf payload with an X Protocol header.

  It generates X Protocol headers for protobuf messages and for
  compressed messages.
  Additionally, it supplies the protobuf sub-field (delimited field)
  functionality, because a similar mechanism is used for generating
  protobuf fields and X headers.
*/
62 class XProtocol_encoder : public Protobuf_encoder {
63  private:
64   constexpr static uint32_t k_xmessage_header_length = 5;
65 
66   enum class Header_configuration { k_full, k_size_only, k_none };
67 
68   Header_configuration m_header_configuration = Header_configuration::k_full;
69   uint32_t m_header_size = header_size(m_header_configuration);
70 
set_header_config(const Header_configuration config)71   void set_header_config(const Header_configuration config) {
72     m_header_configuration = config;
73     m_header_size = header_size(m_header_configuration);
74   }
75 
header_size(const Header_configuration config)76   static uint32_t header_size(const Header_configuration config) {
77     switch (config) {
78       case Header_configuration::k_full:
79         return 5;
80       case Header_configuration::k_none:
81         return 0;
82       case Header_configuration::k_size_only:
83         return 4;
84       default:
85         assert(false && "Not allowed value");
86         return 0;
87     }
88   }
89 
90  public:
XProtocol_encoder(Encoding_buffer * buffer)91   explicit XProtocol_encoder(Encoding_buffer *buffer)
92       : Protobuf_encoder(buffer) {
93     ensure_buffer_size<1>();
94   }
95 
96   struct Position {
97     Page *m_page;
98     uint8_t *m_position;
99 
get_positionPosition100     uint8_t *get_position() const { return m_position; }
101 
bytes_until_pagePosition102     uint32_t bytes_until_page(Page *current_page) const {
103       uint32_t size = m_page->m_current_data - m_position;
104 
105       if (current_page == m_page) {
106         return size;
107       }
108 
109       Page *i = m_page->m_next_page;
110       for (;;) {
111         assert(nullptr != i);
112         size += i->get_used_bytes();
113 
114         if (i == current_page) {
115           assert(nullptr == i->m_next_page);
116           break;
117         }
118 
119         i = i->m_next_page;
120       }
121 
122       return size;
123     }
124   };
125 
126   template <uint32_t delimiter_length>
127   struct Field_delimiter : Position {};
128 
129   struct Compression_position : Position {
130     Encoding_buffer *m_compressed_buffer;
131     Compression_type m_compression_type;
132     Delayed_fixed_varuint32 m_uncompressed_size;
133     Field_delimiter<5> m_payload;
134     uint8_t m_msg_id;
135   };
136 
137   template <uint32_t id>
empty_xmessage()138   void empty_xmessage() {
139     ensure_buffer_size<k_xmessage_header_length>();
140 
141     if (Header_configuration::k_full == m_header_configuration) {
142       DBUG_LOG("debug", "empty_msg_full_header");
143       primitives::base::Fixint_length<4>::encode<1>(m_page->m_current_data);
144       primitives::base::Fixint_length<1>::encode<id>(m_page->m_current_data);
145     } else if (Header_configuration::k_size_only == m_header_configuration) {
146       DBUG_LOG("debug", "empty_msg_size_only");
147       primitives::base::Fixint_length<4>::encode<0>(m_page->m_current_data);
148     }
149   }
150 
begin_compression(const uint8_t msg_id,const Compression_type type,Encoding_buffer * to_compress)151   Compression_position begin_compression(const uint8_t msg_id,
152                                          const Compression_type type,
153                                          Encoding_buffer *to_compress) {
154     Compression_position result;
155 
156     result.m_msg_id = msg_id;
157     begin_xmessage<tags::Compression::server_id, 100>(&result);
158 
159     switch (type) {
160       case Compression_type::k_single:
161       case Compression_type::k_multiple:
162         set_header_config(Header_configuration::k_full);
163         encode_field_var_uint32<tags::Compression::server_messages>(msg_id);
164         break;
165       case Compression_type::k_group:
166         set_header_config(Header_configuration::k_full);
167         break;
168     }
169 
170     result.m_uncompressed_size =
171         encode_field_fixed_uint32<tags::Compression::uncompressed_size>();
172     begin_delimited_field<tags::Compression::payload>(&result.m_payload);
173 
174     DBUG_ASSERT(to_compress->m_current == to_compress->m_front);
175     DBUG_ASSERT(to_compress->m_current->m_begin_data ==
176                 to_compress->m_current->m_current_data);
177     result.m_compressed_buffer = m_buffer;
178     result.m_compression_type = type;
179     // Reset buffer, and initialize the 'handy' data hold inside
180     // 'Encoder_primitives'
181     buffer_set(to_compress);
182 
183     return result;
184   }
185 
end_compression(const Compression_position & position,Compression_buffer_interface * compress)186   bool end_compression(const Compression_position &position,
187                        Compression_buffer_interface *compress) {
188     Position before_compression{m_buffer->m_front,
189                                 m_buffer->m_front->m_begin_data};
190     const auto before_compression_size =
191         before_compression.bytes_until_page(m_page);
192 
193     position.m_uncompressed_size.encode(before_compression_size);
194 
195     if (!compress->process(position.m_compressed_buffer, m_buffer))
196       return false;
197 
198     auto ptr = position.m_position;
199     const auto message_size =
200         position.bytes_until_page(position.m_compressed_buffer->m_current);
201 
202     // Lets discard data inside new/compression buffer
203     // in case when 'compress' call didn't do that.
204     m_buffer->reset();
205 
206     // and now we restore original buffer
207     buffer_set(position.m_compressed_buffer);
208     end_delimited_field(position.m_payload);
209     primitives::base::Fixint_length<4>::encode_value(ptr, message_size - 4);
210 
211     set_header_config(Header_configuration::k_full);
212 
213     return true;
214   }
215 
216   template <uint32_t id, uint32_t needed_buffer_size>
begin_xmessage()217   Position begin_xmessage() {
218     Position result;
219 
220     begin_xmessage<id, needed_buffer_size>(&result);
221 
222     return result;
223   }
224 
225   template <uint32_t needed_buffer_size>
begin_xmessage(const uint32_t id)226   Position begin_xmessage(const uint32_t id) {
227     Position result;
228 
229     ensure_buffer_size<needed_buffer_size + k_xmessage_header_length>();
230 
231     auto xmsg_start = m_page->m_current_data;
232     if (Header_configuration::k_full == m_header_configuration) {
233       auto xmsg_type = xmsg_start + 4;
234       primitives::base::Fixint_length<1>::encode_value(xmsg_type, id);
235     }
236     result.m_page = m_page;
237     result.m_position = xmsg_start;
238 
239     m_page->m_current_data += m_header_size;
240 
241     return result;
242   }
243 
244   template <uint32_t id, uint32_t needed_buffer_size>
begin_xmessage(Position * position)245   void begin_xmessage(Position *position) {
246     ensure_buffer_size<needed_buffer_size + k_xmessage_header_length>();
247 
248     auto xmsg_start = m_page->m_current_data;
249     if (Header_configuration::k_full == m_header_configuration) {
250       auto xmsg_type = xmsg_start + 4;
251       primitives::base::Fixint_length<1>::encode<id>(xmsg_type);
252     }
253     position->m_page = m_page;
254     position->m_position = xmsg_start;
255 
256     m_page->m_current_data += m_header_size;
257   }
258 
end_xmessage(const Position & position)259   void end_xmessage(const Position &position) {
260     auto ptr = position.get_position();
261 
262     if (Header_configuration::k_none != m_header_configuration) {
263       primitives::base::Fixint_length<4>::encode_value(
264           ptr, position.bytes_until_page(m_page) - 4);
265     }
266   }
267 
abort_xmessage(const Position & position)268   void abort_xmessage(const Position &position) {
269     auto page = position.m_page->m_next_page;
270 
271     m_buffer->remove_page_list(page);
272 
273     m_page = position.m_page;
274     m_page->m_current_data = position.m_position;
275   }
276 
abort_compression(const Compression_position & position)277   void abort_compression(const Compression_position &position) {
278     // Lets discard data inside new/compression buffer
279     // in case when 'compress' call didn't do that.
280     m_buffer->reset();
281 
282     // and now we restore original buffer
283     buffer_set(position.m_compressed_buffer);
284 
285     set_header_config(Header_configuration::k_full);
286 
287     abort_xmessage(position);
288   }
289 
290   template <uint32_t id, uint32_t delimiter_length = 1>
begin_delimited_field()291   Field_delimiter<delimiter_length> begin_delimited_field() {
292     Field_delimiter<delimiter_length> result;
293 
294     begin_delimited_field<id>(&result);
295 
296     return result;
297   }
298 
299   template <uint32_t id, uint32_t delimiter_length = 1>
begin_delimited_field(Field_delimiter<delimiter_length> * position)300   void begin_delimited_field(Field_delimiter<delimiter_length> *position) {
301     encode_field_delimited_header<id>();
302     position->m_position = m_page->m_current_data;
303     position->m_page = m_page;
304 
305     m_page->m_current_data += delimiter_length;
306   }
307 
308   template <uint32_t delimiter_length>
end_delimited_field(const Field_delimiter<delimiter_length> & position)309   void end_delimited_field(const Field_delimiter<delimiter_length> &position) {
310     auto ptr = position.get_position();
311     primitives::base::Varint_length<delimiter_length>::encode(
312         ptr, position.bytes_until_page(m_page) - delimiter_length);
313   }
314 };
315 
316 }  // namespace protocol
317 
318 #endif  // PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_
319