1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 
27 // "ngs_common/protocol_protobuf.h" has to come before boost includes, because of build
28 // issue in Solaris (unqualified map used, which clashes with some other map defined
29 // in Solaris headers)
30 #include "ngs_common/protocol_protobuf.h"
31 #include "ngs_common/connection_vio.h"
32 
33 #include "ngs/protocol/buffer.h"
34 #include "ngs/protocol/output_buffer.h"
35 #include "ngs/protocol/protocol_config.h"
36 #include "ngs/protocol_encoder.h"
37 #include "ngs/protocol_monitor.h"
38 #include "ngs/log.h"
39 
40 #undef ERROR // Needed to avoid conflict with ERROR in mysqlx.pb.h
41 
42 
43 using namespace ngs;
44 
45 const Pool_config Protocol_encoder::m_default_pool_config = { 0, 5, BUFFER_PAGE_SIZE };
46 
Protocol_encoder(const ngs::shared_ptr<Connection_vio> & socket,Error_handler ehandler,Protocol_monitor_interface & pmon)47 Protocol_encoder::Protocol_encoder(const ngs::shared_ptr<Connection_vio> &socket,
48                                    Error_handler ehandler,
49                                    Protocol_monitor_interface &pmon)
50 : m_pool(m_default_pool_config),
51   m_socket(socket),
52   m_error_handler(ehandler),
53   m_protocol_monitor(&pmon)
54 {
55   m_buffer.reset(ngs::allocate_object<Output_buffer>(ngs::ref(m_pool)));
56 }
57 
~Protocol_encoder()58 Protocol_encoder::~Protocol_encoder()
59 {
60 }
61 
start_row()62 void Protocol_encoder::start_row()
63 {
64   m_row_builder.start_row(get_buffer());
65 }
66 
abort_row()67 void Protocol_encoder::abort_row()
68 {
69   m_row_builder.abort_row();
70 }
71 
send_row()72 bool Protocol_encoder::send_row()
73 {
74   m_row_builder.end_row();
75   get_protocol_monitor().on_row_send();
76 
77   return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_ROW);
78 }
79 
send_result(const Error_code & result)80 bool Protocol_encoder::send_result(const Error_code &result)
81 {
82   if (result.error == 0)
83   {
84     Mysqlx::Ok ok;
85     if (!result.message.empty())
86       ok.set_msg(result.message);
87     return send_message(Mysqlx::ServerMessages::OK, ok);
88   }
89   else
90   {
91     if (result.severity == ngs::Error_code::FATAL)
92       get_protocol_monitor().on_fatal_error_send();
93     else
94       get_protocol_monitor().on_error_send();
95 
96     Mysqlx::Error error;
97     error.set_code(result.error);
98     error.set_msg(result.message);
99     error.set_sql_state(result.sql_state);
100     error.set_severity(result.severity == Error_code::FATAL ? Mysqlx::Error::FATAL : Mysqlx::Error::ERROR);
101     return send_message(Mysqlx::ServerMessages::ERROR, error);
102   }
103 }
104 
105 
send_ok()106 bool Protocol_encoder::send_ok()
107 {
108   return send_message(Mysqlx::ServerMessages::OK, Mysqlx::Ok());
109 }
110 
111 
send_ok(const std::string & message)112 bool Protocol_encoder::send_ok(const std::string &message)
113 {
114   Mysqlx::Ok ok;
115 
116   if (!message.empty())
117     ok.set_msg(message);
118 
119   return send_message(Mysqlx::ServerMessages::OK, ok);
120 }
121 
122 
send_init_error(const Error_code & error_code)123 bool Protocol_encoder::send_init_error(const Error_code& error_code)
124 {
125   m_protocol_monitor->on_init_error_send();
126 
127   Mysqlx::Error error;
128 
129   error.set_code(error_code.error);
130   error.set_msg(error_code.message);
131   error.set_sql_state(error_code.sql_state);
132   error.set_severity(Mysqlx::Error::FATAL);
133 
134   return send_message(Mysqlx::ServerMessages::ERROR, error);
135 }
136 
137 
send_local_notice(Notice_type type,const std::string & data,bool force_flush)138 void Protocol_encoder::send_local_notice(Notice_type type,
139                                          const std::string &data,
140                                          bool force_flush)
141 {
142   get_protocol_monitor().on_notice_other_send();
143 
144   send_notice(type, data, FRAME_SCOPE_LOCAL, force_flush);
145 }
146 
147 /*
148 NOTE: Commented for coverage. Uncomment when needed.
149 
150 void Protocol_encoder::send_global_notice(Notice_type type, const std::string &data)
151 {
152   get_protocol_monitor().on_notice_other_send();
153 
154   send_notice(type, data, FRAME_SCOPE_GLOBAL, true);
155 }
156 */
157 
send_local_warning(const std::string & data,bool force_flush)158 void Protocol_encoder::send_local_warning(const std::string &data, bool force_flush)
159 {
160   get_protocol_monitor().on_notice_warning_send();
161 
162   send_notice(k_notice_warning, data, FRAME_SCOPE_LOCAL, force_flush);
163 }
164 
165 
send_auth_ok(const std::string & data)166 void Protocol_encoder::send_auth_ok(const std::string &data)
167 {
168   Mysqlx::Session::AuthenticateOk msg;
169 
170   msg.set_auth_data(data);
171 
172   send_message(Mysqlx::ServerMessages::SESS_AUTHENTICATE_OK, msg);
173 }
174 
send_auth_continue(const std::string & data)175 void Protocol_encoder::send_auth_continue(const std::string &data)
176 {
177   Mysqlx::Session::AuthenticateContinue msg;
178 
179   msg.set_auth_data(data);
180 
181   send_message(Mysqlx::ServerMessages::SESS_AUTHENTICATE_CONTINUE, msg);
182 }
183 
send_empty_message(uint8_t message_id)184 bool Protocol_encoder::send_empty_message(uint8_t message_id)
185 {
186   log_raw_message_send(message_id);
187 
188   m_empty_msg_builder.encode_empty_message(m_buffer.get(), message_id);
189 
190   return enqueue_buffer(message_id);
191 }
192 
send_exec_ok()193 bool Protocol_encoder::send_exec_ok()
194 {
195   return send_empty_message(Mysqlx::ServerMessages::SQL_STMT_EXECUTE_OK);
196 }
197 
198 
send_result_fetch_done()199 bool Protocol_encoder::send_result_fetch_done()
200 {
201   return send_empty_message(Mysqlx::ServerMessages::RESULTSET_FETCH_DONE);
202 }
203 
204 
send_result_fetch_done_more_results()205 bool Protocol_encoder::send_result_fetch_done_more_results()
206 {
207   return send_empty_message(Mysqlx::ServerMessages::RESULTSET_FETCH_DONE_MORE_RESULTSETS);
208 }
209 
210 
get_protocol_monitor()211 Protocol_monitor_interface &Protocol_encoder::get_protocol_monitor()
212 {
213   return *m_protocol_monitor;
214 }
215 
send_message(int8_t type,const Message & message,bool force_buffer_flush)216 bool Protocol_encoder::send_message(int8_t type, const Message &message, bool force_buffer_flush)
217 {
218   const size_t header_size = 5;
219 
220   log_message_send(&message);
221 
222   if (Memory_allocated != m_buffer->reserve(header_size + message.ByteSize()))
223   {
224     on_error(ENOMEM);
225     return true;
226   }
227   if (!message.IsInitialized())
228   {
229     log_warning("Message is not properly initialized: %s", message.InitializationErrorString().c_str());
230   }
231 
232   // header
233   m_buffer->add_int32(message.ByteSize() + 1);
234   m_buffer->add_int8(type);
235 
236   message.SerializeToZeroCopyStream(m_buffer.get());
237 
238   return enqueue_buffer(type, force_buffer_flush);
239 }
240 
241 
on_error(int error)242 void Protocol_encoder::on_error(int error)
243 {
244   m_error_handler(error);
245 }
246 
247 
log_protobuf(const char * direction_name,Request & request)248 void Protocol_encoder::log_protobuf(const char *direction_name, Request &request)
249 {
250   const Message *message = request.message();
251 
252   if (NULL == message)
253   {
254     log_protobuf(request.get_type());
255     return;
256   }
257 
258   log_protobuf(direction_name, message);
259 }
260 
261 
log_protobuf(const char * direction_name,const Message * message)262 void Protocol_encoder::log_protobuf(const char *direction_name, const Message *message)
263 {
264 #ifdef USE_MYSQLX_FULL_PROTO
265   std::string text_message;
266 
267   if (message)
268     google::protobuf::TextFormat::PrintToString(*message, &text_message);
269 
270   if (text_message.length())
271   {
272     const std::size_t index_of_last_enter = text_message.find_last_of("\n");
273 
274     text_message.resize(index_of_last_enter);
275 
276     log_debug("%s: Type: %s, Payload:\n%s", direction_name, message->GetTypeName().c_str(), text_message.c_str());
277   }
278   else
279   {
280     log_debug("%s: Type: ??, Payload: (none)", direction_name);
281   }
282 #else
283   log_debug("%s: Type: %s", direction_name, message->GetTypeName().c_str());
284 #endif
285 }
286 
287 // for message sent as raw buffer only logging its type tag now
log_protobuf(int8_t type)288 void Protocol_encoder::log_protobuf(int8_t type)
289 {
290   log_debug("SEND RAW: Type: %d", type);
291 }
292 
293 
send_notice(uint32_t type,const std::string & data,Frame_scope scope,bool force_flush)294 void Protocol_encoder::send_notice(uint32_t type, const std::string &data,
295   Frame_scope scope, bool force_flush)
296 {
297   int iscope = (scope == FRAME_SCOPE_GLOBAL) ? static_cast<int>(Mysqlx::Notice::Frame_Scope_GLOBAL) :
298     static_cast<int>(Mysqlx::Notice::Frame_Scope_LOCAL);
299 
300   log_raw_message_send(Mysqlx::ServerMessages::NOTICE);
301 
302   m_notice_builder.encode_frame(m_buffer.get(), type, data, iscope);
303   enqueue_buffer(Mysqlx::ServerMessages::NOTICE, force_flush);
304 }
305 
send_rows_affected(uint64_t value)306 void Protocol_encoder::send_rows_affected(uint64_t value)
307 {
308   get_protocol_monitor().on_notice_other_send();
309   log_raw_message_send(Mysqlx::ServerMessages::NOTICE);
310 
311   m_notice_builder.encode_rows_affected(m_buffer.get(), value);
312   enqueue_buffer(Mysqlx::ServerMessages::NOTICE);
313 }
314 
send_column_metadata(const std::string & catalog,const std::string & db_name,const std::string & table_name,const std::string & org_table_name,const std::string & col_name,const std::string & org_col_name,uint64_t collation,int type,int decimals,uint32_t flags,uint32_t length,uint32_t content_type)315 bool Protocol_encoder::send_column_metadata(const std::string &catalog,
316   const std::string &db_name,
317   const std::string &table_name, const std::string &org_table_name,
318   const std::string &col_name, const std::string &org_col_name,
319   uint64_t collation, int type, int decimals,
320   uint32_t flags, uint32_t length, uint32_t content_type)
321 {
322   m_metadata_builder.encode_metadata(m_buffer.get(),
323     catalog, db_name, table_name, org_table_name,
324     col_name, org_col_name, collation, type, decimals,
325     flags, length, content_type);
326 
327   return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA);
328 }
329 
send_column_metadata(uint64_t collation,int type,int decimals,uint32_t flags,uint32_t length,uint32_t content_type)330 bool Protocol_encoder::send_column_metadata(uint64_t collation, int type, int decimals,
331   uint32_t flags, uint32_t length, uint32_t content_type)
332 {
333   m_metadata_builder.encode_metadata(m_buffer.get(),
334     collation, type, decimals,
335     flags, length, content_type);
336 
337   return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA);
338 }
339 
340 
flush_buffer()341 bool Protocol_encoder::flush_buffer()
342 {
343   const bool is_valid_socket = INVALID_SOCKET != m_socket->get_socket_id();
344 
345   if (is_valid_socket)
346   {
347     const ssize_t result = m_socket->write(m_buffer->get_buffers());
348     if (result <= 0)
349     {
350       log_info("Error writing to client: %s (%i)", strerror(errno), errno);
351       on_error(errno);
352       return false;
353     }
354 
355     m_protocol_monitor->on_send(static_cast<long>(result));
356   }
357 
358   m_buffer->reset();
359 
360   return true;
361 }
362 
363 
send_raw_buffer(int8_t type)364 bool Protocol_encoder::send_raw_buffer(int8_t type)
365 {
366   log_raw_message_send(type);
367 
368   return enqueue_buffer(type);
369 }
370 
371 
enqueue_buffer(int8_t type,bool force_flush)372 bool Protocol_encoder::enqueue_buffer(int8_t type, bool force_flush)
373 {
374 
375   bool can_buffer = (!force_flush) &&
376     (
377     (type == Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA) ||
378     (type == Mysqlx::ServerMessages::RESULTSET_ROW) ||
379     (type == Mysqlx::ServerMessages::NOTICE) ||
380     (type == Mysqlx::ServerMessages::RESULTSET_FETCH_DONE)
381     );
382 
383   // todo: more testing for this thold
384   if (!can_buffer || (m_buffer->ByteCount() > BUFFER_PAGE_SIZE * 4))
385   {
386     return flush_buffer();
387   }
388 
389   return true;
390 }
391