1 /*
2 * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26
27 // "ngs_common/protocol_protobuf.h" has to come before boost includes, because of build
28 // issue in Solaris (unqualified map used, which clashes with some other map defined
29 // in Solaris headers)
30 #include "ngs_common/protocol_protobuf.h"
31 #include "ngs_common/connection_vio.h"
32
33 #include "ngs/protocol/buffer.h"
34 #include "ngs/protocol/output_buffer.h"
35 #include "ngs/protocol/protocol_config.h"
36 #include "ngs/protocol_encoder.h"
37 #include "ngs/protocol_monitor.h"
38 #include "ngs/log.h"
39
40 #undef ERROR // Needed to avoid conflict with ERROR in mysqlx.pb.h
41
42
43 using namespace ngs;
44
45 const Pool_config Protocol_encoder::m_default_pool_config = { 0, 5, BUFFER_PAGE_SIZE };
46
Protocol_encoder(const ngs::shared_ptr<Connection_vio> & socket,Error_handler ehandler,Protocol_monitor_interface & pmon)47 Protocol_encoder::Protocol_encoder(const ngs::shared_ptr<Connection_vio> &socket,
48 Error_handler ehandler,
49 Protocol_monitor_interface &pmon)
50 : m_pool(m_default_pool_config),
51 m_socket(socket),
52 m_error_handler(ehandler),
53 m_protocol_monitor(&pmon)
54 {
55 m_buffer.reset(ngs::allocate_object<Output_buffer>(ngs::ref(m_pool)));
56 }
57
~Protocol_encoder()58 Protocol_encoder::~Protocol_encoder()
59 {
60 }
61
start_row()62 void Protocol_encoder::start_row()
63 {
64 m_row_builder.start_row(get_buffer());
65 }
66
abort_row()67 void Protocol_encoder::abort_row()
68 {
69 m_row_builder.abort_row();
70 }
71
send_row()72 bool Protocol_encoder::send_row()
73 {
74 m_row_builder.end_row();
75 get_protocol_monitor().on_row_send();
76
77 return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_ROW);
78 }
79
send_result(const Error_code & result)80 bool Protocol_encoder::send_result(const Error_code &result)
81 {
82 if (result.error == 0)
83 {
84 Mysqlx::Ok ok;
85 if (!result.message.empty())
86 ok.set_msg(result.message);
87 return send_message(Mysqlx::ServerMessages::OK, ok);
88 }
89 else
90 {
91 if (result.severity == ngs::Error_code::FATAL)
92 get_protocol_monitor().on_fatal_error_send();
93 else
94 get_protocol_monitor().on_error_send();
95
96 Mysqlx::Error error;
97 error.set_code(result.error);
98 error.set_msg(result.message);
99 error.set_sql_state(result.sql_state);
100 error.set_severity(result.severity == Error_code::FATAL ? Mysqlx::Error::FATAL : Mysqlx::Error::ERROR);
101 return send_message(Mysqlx::ServerMessages::ERROR, error);
102 }
103 }
104
105
send_ok()106 bool Protocol_encoder::send_ok()
107 {
108 return send_message(Mysqlx::ServerMessages::OK, Mysqlx::Ok());
109 }
110
111
send_ok(const std::string & message)112 bool Protocol_encoder::send_ok(const std::string &message)
113 {
114 Mysqlx::Ok ok;
115
116 if (!message.empty())
117 ok.set_msg(message);
118
119 return send_message(Mysqlx::ServerMessages::OK, ok);
120 }
121
122
send_init_error(const Error_code & error_code)123 bool Protocol_encoder::send_init_error(const Error_code& error_code)
124 {
125 m_protocol_monitor->on_init_error_send();
126
127 Mysqlx::Error error;
128
129 error.set_code(error_code.error);
130 error.set_msg(error_code.message);
131 error.set_sql_state(error_code.sql_state);
132 error.set_severity(Mysqlx::Error::FATAL);
133
134 return send_message(Mysqlx::ServerMessages::ERROR, error);
135 }
136
137
send_local_notice(Notice_type type,const std::string & data,bool force_flush)138 void Protocol_encoder::send_local_notice(Notice_type type,
139 const std::string &data,
140 bool force_flush)
141 {
142 get_protocol_monitor().on_notice_other_send();
143
144 send_notice(type, data, FRAME_SCOPE_LOCAL, force_flush);
145 }
146
147 /*
148 NOTE: Commented for coverage. Uncomment when needed.
149
150 void Protocol_encoder::send_global_notice(Notice_type type, const std::string &data)
151 {
152 get_protocol_monitor().on_notice_other_send();
153
154 send_notice(type, data, FRAME_SCOPE_GLOBAL, true);
155 }
156 */
157
send_local_warning(const std::string & data,bool force_flush)158 void Protocol_encoder::send_local_warning(const std::string &data, bool force_flush)
159 {
160 get_protocol_monitor().on_notice_warning_send();
161
162 send_notice(k_notice_warning, data, FRAME_SCOPE_LOCAL, force_flush);
163 }
164
165
send_auth_ok(const std::string & data)166 void Protocol_encoder::send_auth_ok(const std::string &data)
167 {
168 Mysqlx::Session::AuthenticateOk msg;
169
170 msg.set_auth_data(data);
171
172 send_message(Mysqlx::ServerMessages::SESS_AUTHENTICATE_OK, msg);
173 }
174
send_auth_continue(const std::string & data)175 void Protocol_encoder::send_auth_continue(const std::string &data)
176 {
177 Mysqlx::Session::AuthenticateContinue msg;
178
179 msg.set_auth_data(data);
180
181 send_message(Mysqlx::ServerMessages::SESS_AUTHENTICATE_CONTINUE, msg);
182 }
183
send_empty_message(uint8_t message_id)184 bool Protocol_encoder::send_empty_message(uint8_t message_id)
185 {
186 log_raw_message_send(message_id);
187
188 m_empty_msg_builder.encode_empty_message(m_buffer.get(), message_id);
189
190 return enqueue_buffer(message_id);
191 }
192
send_exec_ok()193 bool Protocol_encoder::send_exec_ok()
194 {
195 return send_empty_message(Mysqlx::ServerMessages::SQL_STMT_EXECUTE_OK);
196 }
197
198
send_result_fetch_done()199 bool Protocol_encoder::send_result_fetch_done()
200 {
201 return send_empty_message(Mysqlx::ServerMessages::RESULTSET_FETCH_DONE);
202 }
203
204
send_result_fetch_done_more_results()205 bool Protocol_encoder::send_result_fetch_done_more_results()
206 {
207 return send_empty_message(Mysqlx::ServerMessages::RESULTSET_FETCH_DONE_MORE_RESULTSETS);
208 }
209
210
get_protocol_monitor()211 Protocol_monitor_interface &Protocol_encoder::get_protocol_monitor()
212 {
213 return *m_protocol_monitor;
214 }
215
send_message(int8_t type,const Message & message,bool force_buffer_flush)216 bool Protocol_encoder::send_message(int8_t type, const Message &message, bool force_buffer_flush)
217 {
218 const size_t header_size = 5;
219
220 log_message_send(&message);
221
222 if (Memory_allocated != m_buffer->reserve(header_size + message.ByteSize()))
223 {
224 on_error(ENOMEM);
225 return true;
226 }
227 if (!message.IsInitialized())
228 {
229 log_warning("Message is not properly initialized: %s", message.InitializationErrorString().c_str());
230 }
231
232 // header
233 m_buffer->add_int32(message.ByteSize() + 1);
234 m_buffer->add_int8(type);
235
236 message.SerializeToZeroCopyStream(m_buffer.get());
237
238 return enqueue_buffer(type, force_buffer_flush);
239 }
240
241
on_error(int error)242 void Protocol_encoder::on_error(int error)
243 {
244 m_error_handler(error);
245 }
246
247
log_protobuf(const char * direction_name,Request & request)248 void Protocol_encoder::log_protobuf(const char *direction_name, Request &request)
249 {
250 const Message *message = request.message();
251
252 if (NULL == message)
253 {
254 log_protobuf(request.get_type());
255 return;
256 }
257
258 log_protobuf(direction_name, message);
259 }
260
261
log_protobuf(const char * direction_name,const Message * message)262 void Protocol_encoder::log_protobuf(const char *direction_name, const Message *message)
263 {
264 #ifdef USE_MYSQLX_FULL_PROTO
265 std::string text_message;
266
267 if (message)
268 google::protobuf::TextFormat::PrintToString(*message, &text_message);
269
270 if (text_message.length())
271 {
272 const std::size_t index_of_last_enter = text_message.find_last_of("\n");
273
274 text_message.resize(index_of_last_enter);
275
276 log_debug("%s: Type: %s, Payload:\n%s", direction_name, message->GetTypeName().c_str(), text_message.c_str());
277 }
278 else
279 {
280 log_debug("%s: Type: ??, Payload: (none)", direction_name);
281 }
282 #else
283 log_debug("%s: Type: %s", direction_name, message->GetTypeName().c_str());
284 #endif
285 }
286
287 // for message sent as raw buffer only logging its type tag now
log_protobuf(int8_t type)288 void Protocol_encoder::log_protobuf(int8_t type)
289 {
290 log_debug("SEND RAW: Type: %d", type);
291 }
292
293
send_notice(uint32_t type,const std::string & data,Frame_scope scope,bool force_flush)294 void Protocol_encoder::send_notice(uint32_t type, const std::string &data,
295 Frame_scope scope, bool force_flush)
296 {
297 int iscope = (scope == FRAME_SCOPE_GLOBAL) ? static_cast<int>(Mysqlx::Notice::Frame_Scope_GLOBAL) :
298 static_cast<int>(Mysqlx::Notice::Frame_Scope_LOCAL);
299
300 log_raw_message_send(Mysqlx::ServerMessages::NOTICE);
301
302 m_notice_builder.encode_frame(m_buffer.get(), type, data, iscope);
303 enqueue_buffer(Mysqlx::ServerMessages::NOTICE, force_flush);
304 }
305
send_rows_affected(uint64_t value)306 void Protocol_encoder::send_rows_affected(uint64_t value)
307 {
308 get_protocol_monitor().on_notice_other_send();
309 log_raw_message_send(Mysqlx::ServerMessages::NOTICE);
310
311 m_notice_builder.encode_rows_affected(m_buffer.get(), value);
312 enqueue_buffer(Mysqlx::ServerMessages::NOTICE);
313 }
314
send_column_metadata(const std::string & catalog,const std::string & db_name,const std::string & table_name,const std::string & org_table_name,const std::string & col_name,const std::string & org_col_name,uint64_t collation,int type,int decimals,uint32_t flags,uint32_t length,uint32_t content_type)315 bool Protocol_encoder::send_column_metadata(const std::string &catalog,
316 const std::string &db_name,
317 const std::string &table_name, const std::string &org_table_name,
318 const std::string &col_name, const std::string &org_col_name,
319 uint64_t collation, int type, int decimals,
320 uint32_t flags, uint32_t length, uint32_t content_type)
321 {
322 m_metadata_builder.encode_metadata(m_buffer.get(),
323 catalog, db_name, table_name, org_table_name,
324 col_name, org_col_name, collation, type, decimals,
325 flags, length, content_type);
326
327 return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA);
328 }
329
send_column_metadata(uint64_t collation,int type,int decimals,uint32_t flags,uint32_t length,uint32_t content_type)330 bool Protocol_encoder::send_column_metadata(uint64_t collation, int type, int decimals,
331 uint32_t flags, uint32_t length, uint32_t content_type)
332 {
333 m_metadata_builder.encode_metadata(m_buffer.get(),
334 collation, type, decimals,
335 flags, length, content_type);
336
337 return send_raw_buffer(Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA);
338 }
339
340
flush_buffer()341 bool Protocol_encoder::flush_buffer()
342 {
343 const bool is_valid_socket = INVALID_SOCKET != m_socket->get_socket_id();
344
345 if (is_valid_socket)
346 {
347 const ssize_t result = m_socket->write(m_buffer->get_buffers());
348 if (result <= 0)
349 {
350 log_info("Error writing to client: %s (%i)", strerror(errno), errno);
351 on_error(errno);
352 return false;
353 }
354
355 m_protocol_monitor->on_send(static_cast<long>(result));
356 }
357
358 m_buffer->reset();
359
360 return true;
361 }
362
363
send_raw_buffer(int8_t type)364 bool Protocol_encoder::send_raw_buffer(int8_t type)
365 {
366 log_raw_message_send(type);
367
368 return enqueue_buffer(type);
369 }
370
371
enqueue_buffer(int8_t type,bool force_flush)372 bool Protocol_encoder::enqueue_buffer(int8_t type, bool force_flush)
373 {
374
375 bool can_buffer = (!force_flush) &&
376 (
377 (type == Mysqlx::ServerMessages::RESULTSET_COLUMN_META_DATA) ||
378 (type == Mysqlx::ServerMessages::RESULTSET_ROW) ||
379 (type == Mysqlx::ServerMessages::NOTICE) ||
380 (type == Mysqlx::ServerMessages::RESULTSET_FETCH_DONE)
381 );
382
383 // todo: more testing for this thold
384 if (!can_buffer || (m_buffer->ByteCount() > BUFFER_PAGE_SIZE * 4))
385 {
386 return flush_buffer();
387 }
388
389 return true;
390 }
391