1 /* 2 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License, version 2.0, 6 * as published by the Free Software Foundation. 7 * 8 * This program is also distributed with certain software (including 9 * but not limited to OpenSSL) that is licensed under separate terms, 10 * as designated in a particular file or component or in included license 11 * documentation. The authors of MySQL hereby grant you an additional 12 * permission to link the program and your derivative works with the 13 * separately licensed software that they have included with MySQL. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU General Public License, version 2.0, for more details. 19 * 20 * You should have received a copy of the GNU General Public License 21 * along with this program; if not, write to the Free Software 22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 #ifndef PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_ 26 #define PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_ 27 28 #include <google/protobuf/wire_format_lite.h> 29 #include <cassert> 30 #include <cstdint> 31 #include <string> 32 33 #include "my_dbug.h" 34 35 #include "plugin/x/protocol/encoders/encoding_protobuf.h" 36 37 namespace protocol { 38 39 enum class Compression_type { k_single, k_multiple, k_group }; 40 41 class Compression_buffer_interface { 42 public: 43 virtual ~Compression_buffer_interface() = default; 44 45 virtual void reset_counters() = 0; 46 virtual bool process(Encoding_buffer *output_buffer, 47 const Encoding_buffer *input_buffer) = 0; 48 49 virtual void get_processed_data(uint32_t *out_uncompressed, 50 uint32_t *out_compressed) = 0; 51 }; 52 53 /** 54 This class is wraps protobuf payload with X Protocol header 55 56 This class generates X Protocol headers for protobuf messages 57 and for compressed messages. 58 Additionally it supplies sub-field protobuf functionality, 59 because similar mechanism was used for generation for protobuf 60 fields and X headers. 61 */ 62 class XProtocol_encoder : public Protobuf_encoder { 63 private: 64 constexpr static uint32_t k_xmessage_header_length = 5; 65 66 enum class Header_configuration { k_full, k_size_only, k_none }; 67 68 Header_configuration m_header_configuration = Header_configuration::k_full; 69 uint32_t m_header_size = header_size(m_header_configuration); 70 set_header_config(const Header_configuration config)71 void set_header_config(const Header_configuration config) { 72 m_header_configuration = config; 73 m_header_size = header_size(m_header_configuration); 74 } 75 header_size(const Header_configuration config)76 static uint32_t header_size(const Header_configuration config) { 77 switch (config) { 78 case Header_configuration::k_full: 79 return 5; 80 case Header_configuration::k_none: 81 return 0; 82 case Header_configuration::k_size_only: 83 return 4; 84 default: 85 assert(false && "Not allowed value"); 86 return 0; 87 } 88 } 89 90 public: XProtocol_encoder(Encoding_buffer * buffer)91 explicit XProtocol_encoder(Encoding_buffer *buffer) 92 : Protobuf_encoder(buffer) { 93 ensure_buffer_size<1>(); 94 } 95 96 struct Position { 97 Page *m_page; 98 uint8_t *m_position; 99 get_positionPosition100 uint8_t *get_position() const { return m_position; } 101 bytes_until_pagePosition102 uint32_t bytes_until_page(Page *current_page) const { 103 uint32_t size = m_page->m_current_data - m_position; 104 105 if (current_page == m_page) { 106 return size; 107 } 108 109 Page *i = m_page->m_next_page; 110 for (;;) { 111 assert(nullptr != i); 112 size += i->get_used_bytes(); 113 114 if (i == current_page) { 115 assert(nullptr == i->m_next_page); 116 break; 117 } 118 119 i = i->m_next_page; 120 } 121 122 return size; 123 } 124 }; 125 126 template <uint32_t delimiter_length> 127 struct Field_delimiter : Position {}; 128 129 struct Compression_position : Position { 130 Encoding_buffer *m_compressed_buffer; 131 Compression_type m_compression_type; 132 Delayed_fixed_varuint32 m_uncompressed_size; 133 Field_delimiter<5> m_payload; 134 uint8_t m_msg_id; 135 }; 136 137 template <uint32_t id> empty_xmessage()138 void empty_xmessage() { 139 ensure_buffer_size<k_xmessage_header_length>(); 140 141 if (Header_configuration::k_full == m_header_configuration) { 142 DBUG_LOG("debug", "empty_msg_full_header"); 143 primitives::base::Fixint_length<4>::encode<1>(m_page->m_current_data); 144 primitives::base::Fixint_length<1>::encode<id>(m_page->m_current_data); 145 } else if (Header_configuration::k_size_only == m_header_configuration) { 146 DBUG_LOG("debug", "empty_msg_size_only"); 147 primitives::base::Fixint_length<4>::encode<0>(m_page->m_current_data); 148 } 149 } 150 begin_compression(const uint8_t msg_id,const Compression_type type,Encoding_buffer * to_compress)151 Compression_position begin_compression(const uint8_t msg_id, 152 const Compression_type type, 153 Encoding_buffer *to_compress) { 154 Compression_position result; 155 156 result.m_msg_id = msg_id; 157 begin_xmessage<tags::Compression::server_id, 100>(&result); 158 159 switch (type) { 160 case Compression_type::k_single: 161 case Compression_type::k_multiple: 162 set_header_config(Header_configuration::k_full); 163 encode_field_var_uint32<tags::Compression::server_messages>(msg_id); 164 break; 165 case Compression_type::k_group: 166 set_header_config(Header_configuration::k_full); 167 break; 168 } 169 170 result.m_uncompressed_size = 171 encode_field_fixed_uint32<tags::Compression::uncompressed_size>(); 172 begin_delimited_field<tags::Compression::payload>(&result.m_payload); 173 174 DBUG_ASSERT(to_compress->m_current == to_compress->m_front); 175 DBUG_ASSERT(to_compress->m_current->m_begin_data == 176 to_compress->m_current->m_current_data); 177 result.m_compressed_buffer = m_buffer; 178 result.m_compression_type = type; 179 // Reset buffer, and initialize the 'handy' data hold inside 180 // 'Encoder_primitives' 181 buffer_set(to_compress); 182 183 return result; 184 } 185 end_compression(const Compression_position & position,Compression_buffer_interface * compress)186 bool end_compression(const Compression_position &position, 187 Compression_buffer_interface *compress) { 188 Position before_compression{m_buffer->m_front, 189 m_buffer->m_front->m_begin_data}; 190 const auto before_compression_size = 191 before_compression.bytes_until_page(m_page); 192 193 position.m_uncompressed_size.encode(before_compression_size); 194 195 if (!compress->process(position.m_compressed_buffer, m_buffer)) 196 return false; 197 198 auto ptr = position.m_position; 199 const auto message_size = 200 position.bytes_until_page(position.m_compressed_buffer->m_current); 201 202 // Lets discard data inside new/compression buffer 203 // in case when 'compress' call didn't do that. 204 m_buffer->reset(); 205 206 // and now we restore original buffer 207 buffer_set(position.m_compressed_buffer); 208 end_delimited_field(position.m_payload); 209 primitives::base::Fixint_length<4>::encode_value(ptr, message_size - 4); 210 211 set_header_config(Header_configuration::k_full); 212 213 return true; 214 } 215 216 template <uint32_t id, uint32_t needed_buffer_size> begin_xmessage()217 Position begin_xmessage() { 218 Position result; 219 220 begin_xmessage<id, needed_buffer_size>(&result); 221 222 return result; 223 } 224 225 template <uint32_t needed_buffer_size> begin_xmessage(const uint32_t id)226 Position begin_xmessage(const uint32_t id) { 227 Position result; 228 229 ensure_buffer_size<needed_buffer_size + k_xmessage_header_length>(); 230 231 auto xmsg_start = m_page->m_current_data; 232 if (Header_configuration::k_full == m_header_configuration) { 233 auto xmsg_type = xmsg_start + 4; 234 primitives::base::Fixint_length<1>::encode_value(xmsg_type, id); 235 } 236 result.m_page = m_page; 237 result.m_position = xmsg_start; 238 239 m_page->m_current_data += m_header_size; 240 241 return result; 242 } 243 244 template <uint32_t id, uint32_t needed_buffer_size> begin_xmessage(Position * position)245 void begin_xmessage(Position *position) { 246 ensure_buffer_size<needed_buffer_size + k_xmessage_header_length>(); 247 248 auto xmsg_start = m_page->m_current_data; 249 if (Header_configuration::k_full == m_header_configuration) { 250 auto xmsg_type = xmsg_start + 4; 251 primitives::base::Fixint_length<1>::encode<id>(xmsg_type); 252 } 253 position->m_page = m_page; 254 position->m_position = xmsg_start; 255 256 m_page->m_current_data += m_header_size; 257 } 258 end_xmessage(const Position & position)259 void end_xmessage(const Position &position) { 260 auto ptr = position.get_position(); 261 262 if (Header_configuration::k_none != m_header_configuration) { 263 primitives::base::Fixint_length<4>::encode_value( 264 ptr, position.bytes_until_page(m_page) - 4); 265 } 266 } 267 abort_xmessage(const Position & position)268 void abort_xmessage(const Position &position) { 269 auto page = position.m_page->m_next_page; 270 271 m_buffer->remove_page_list(page); 272 273 m_page = position.m_page; 274 m_page->m_current_data = position.m_position; 275 } 276 abort_compression(const Compression_position & position)277 void abort_compression(const Compression_position &position) { 278 // Lets discard data inside new/compression buffer 279 // in case when 'compress' call didn't do that. 280 m_buffer->reset(); 281 282 // and now we restore original buffer 283 buffer_set(position.m_compressed_buffer); 284 285 set_header_config(Header_configuration::k_full); 286 287 abort_xmessage(position); 288 } 289 290 template <uint32_t id, uint32_t delimiter_length = 1> begin_delimited_field()291 Field_delimiter<delimiter_length> begin_delimited_field() { 292 Field_delimiter<delimiter_length> result; 293 294 begin_delimited_field<id>(&result); 295 296 return result; 297 } 298 299 template <uint32_t id, uint32_t delimiter_length = 1> begin_delimited_field(Field_delimiter<delimiter_length> * position)300 void begin_delimited_field(Field_delimiter<delimiter_length> *position) { 301 encode_field_delimited_header<id>(); 302 position->m_position = m_page->m_current_data; 303 position->m_page = m_page; 304 305 m_page->m_current_data += delimiter_length; 306 } 307 308 template <uint32_t delimiter_length> end_delimited_field(const Field_delimiter<delimiter_length> & position)309 void end_delimited_field(const Field_delimiter<delimiter_length> &position) { 310 auto ptr = position.get_position(); 311 primitives::base::Varint_length<delimiter_length>::encode( 312 ptr, position.bytes_until_page(m_page) - delimiter_length); 313 } 314 }; 315 316 } // namespace protocol 317 318 #endif // PLUGIN_X_PROTOCOL_ENCODERS_ENCODING_XPROTOCOL_H_ 319