-- Copyright (C) 2017 Alexey Kopytov <akopytov@gmail.com>

-- This program is free software; you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation; either version 2 of the License, or
-- (at your option) any later version.

-- This program is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU General Public License for more details.

-- You should have received a copy of the GNU General Public License
-- along with this program; if not, write to the Free Software
-- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

-- ----------------------------------------------------------------------
-- SQL API
-- ----------------------------------------------------------------------

-- NOTE: 'ffi' is deliberately assigned as a global here, not a local
ffi = require("ffi")

-- Namespace table for everything exported by this file
sysbench.sql = {}

-- C declarations for the types and functions used below via ffi.C
ffi.cdef[[
/*
  The following definitions have been copied with modifications from db_driver.h
*/

typedef enum
{
  DB_ERROR_NONE,                /* no error(s) */
  DB_ERROR_IGNORABLE,           /* error should be ignored as defined by command
                                   line arguments or a custom error handler */
  DB_ERROR_FATAL                /* non-ignorable error */
} sql_error_t;

typedef struct
{
  const char *sname;            /* short name */
  const char *lname;            /* long name */

  const char opaque[?];
} sql_driver;

typedef struct {
  uint32_t len;                 /* Value length */
  const char *ptr;              /* Value string */
} sql_value;

/* Result set row definition */

typedef struct
{
  void *ptr;                    /* Driver-specific row data */
  sql_value *values;            /* Array of column values */
} sql_row;

/* Query type for statistics */

typedef enum
{
  SB_CNT_OTHER,
  SB_CNT_READ,
  SB_CNT_WRITE,
  SB_CNT_TRX,
  SB_CNT_ERROR,
  SB_CNT_RECONNECT,
  SB_CNT_MAX
} sb_counter_type;

typedef struct
{
  sql_error_t error;            /* Driver-independent error code */
  int sql_errno;                /* Driver-specific error code */
  const char *sql_state;        /* Database-specific SQL state */
  const char *sql_errmsg;       /* Database-specific error message */
  sql_driver *driver;           /* DB driver for this connection */

  const char opaque[?];
} sql_connection;

typedef struct
{
  sql_connection *connection;

  const char opaque[?];
} sql_statement;

/* Result set definition */

typedef struct
{
  sb_counter_type counter;      /* Statistical counter type */
  uint32_t nrows;               /* Number of affected rows */
  uint32_t nfields;             /* Number of fields */
  sql_statement *statement;     /* Pointer to prepared statement (if used) */
  void *ptr;                    /* Pointer to driver-specific data */
  sql_row row;                  /* Last fetched row */
} sql_result;

typedef enum
{
  SQL_TYPE_NONE,
  SQL_TYPE_TINYINT,
  SQL_TYPE_SMALLINT,
  SQL_TYPE_INT,
  SQL_TYPE_BIGINT,
  SQL_TYPE_FLOAT,
  SQL_TYPE_DOUBLE,
  SQL_TYPE_TIME,
  SQL_TYPE_DATE,
  SQL_TYPE_DATETIME,
  SQL_TYPE_TIMESTAMP,
  SQL_TYPE_CHAR,
  SQL_TYPE_VARCHAR
} sql_bind_type_t;

typedef struct
{
  sql_bind_type_t type;
  void *buffer;
  unsigned long *data_len;
  unsigned long max_len;
  char *is_null;
} sql_bind;

sql_driver *db_create(const char *);
int db_destroy(sql_driver *drv);

sql_connection *db_connection_create(sql_driver * drv);
int db_connection_close(sql_connection *con);
int db_connection_reconnect(sql_connection *con);
void db_connection_free(sql_connection *con);

int db_bulk_insert_init(sql_connection *, const char *, size_t);
int db_bulk_insert_next(sql_connection *, const char *, size_t);
int db_bulk_insert_done(sql_connection *);

sql_result *db_query(sql_connection *con, const char *query, size_t len);

sql_row *db_fetch_row(sql_result *rs);

sql_statement *db_prepare(sql_connection *con, const char *query, size_t len);
int db_bind_param(sql_statement *stmt, sql_bind *params, size_t len);
int db_bind_result(sql_statement *stmt, sql_bind *results, size_t len);
sql_result *db_execute(sql_statement *stmt);
int db_close(sql_statement *stmt);

int db_free_results(sql_result *);
]]

-- Cached ctype handles for the declarations above
local sql_driver = ffi.typeof('sql_driver *')
local sql_connection = ffi.typeof('sql_connection *')
local sql_statement = ffi.typeof('sql_statement *')
local sql_bind = ffi.typeof('sql_bind')
local sql_result = ffi.typeof('sql_result')
local sql_value = ffi.typeof('sql_value')
local sql_row = ffi.typeof('sql_row')

-- Bind type constants, exposed to scripts under short names that map to the
-- corresponding SQL_TYPE_* members of sql_bind_type_t
do
   local C = ffi.C

   sysbench.sql.type = {
      NONE      = C.SQL_TYPE_NONE,
      TINYINT   = C.SQL_TYPE_TINYINT,
      SMALLINT  = C.SQL_TYPE_SMALLINT,
      INT       = C.SQL_TYPE_INT,
      BIGINT    = C.SQL_TYPE_BIGINT,
      FLOAT     = C.SQL_TYPE_FLOAT,
      DOUBLE    = C.SQL_TYPE_DOUBLE,
      TIME      = C.SQL_TYPE_TIME,
      DATE      = C.SQL_TYPE_DATE,
      DATETIME  = C.SQL_TYPE_DATETIME,
      TIMESTAMP = C.SQL_TYPE_TIMESTAMP,
      CHAR      = C.SQL_TYPE_CHAR,
      VARCHAR   = C.SQL_TYPE_VARCHAR
   }
end

-- Initialize a given SQL driver and return a handle to it to create
-- connections. A nil driver name (i.e. no function argument) initializes the
-- default driver, i.e. the one specified with --db-driver on the command line.
function sysbench.sql.driver(driver_name)
   local drv = ffi.C.db_create(driver_name)
   if drv == nil then
      -- Level 2: report the failure at the caller's position
      error("failed to initialize the DB driver", 2)
   end
   -- db_destroy() is attached as the finalizer of the returned handle
   return ffi.gc(drv, ffi.C.db_destroy)
end

-- sql_driver methods
local driver_methods = {}

-- Create a new connection object for this driver. Raises an error if
-- db_connection_create() returns NULL. db_connection_free() is attached as
-- the finalizer of the returned connection handle.
function driver_methods.connect(self)
   local con = ffi.C.db_connection_create(self)
   if con == nil then
      error("connection creation failed", 2)
   end
   return ffi.gc(con, ffi.C.db_connection_free)
end

-- Return the short name of the driver (the 'sname' field) as a Lua string
function driver_methods.name(self)
   return ffi.string(self.sname)
end

-- sql_driver metatable
local driver_mt = {
   __index = driver_methods,
   __gc = ffi.C.db_destroy,
   __tostring = function() return '<sql_driver>' end,
}
ffi.metatype("sql_driver", driver_mt)

-- sql_connection methods
local connection_methods = {}

-- Close the connection. Raises an error if db_connection_close() returns a
-- non-zero status.
function connection_methods.disconnect(self)
   return assert(ffi.C.db_connection_close(self) == 0,
                 "db_connection_close() failed")
end

-- Re-establish the connection. Raises an error if db_connection_reconnect()
-- returns a non-zero status.
function connection_methods.reconnect(self)
   return assert(ffi.C.db_connection_reconnect(self) == 0,
                 "db_connection_reconnect() failed")
end

-- Validate the outcome of a query or a prepared statement execution.
--
-- Arguments:
--   rs    - result set returned by the driver call, or nil
--   query - text of the query, stored in the error descriptor
--
-- Returns rs unchanged when it is not nil or when the connection's error
-- field equals sysbench.sql.error.NONE. Otherwise raises one of:
--   * the string "SQL API error" when the connection carries no SQL state or
--     no error message;
--   * an error descriptor table with errcode set to
--     sysbench.error.RESTART_EVENT when the error is ignorable;
--   * a formatted "SQL error, ..." string on a fatal error.
function connection_methods.check_error(self, rs, query)
   if rs ~= nil or self.error == sysbench.sql.error.NONE then
      return rs
   end

   if self.sql_state == nil or self.sql_errmsg == nil then
      -- It must be an API error, don't bother trying to downgrade it to an
      -- ignorable error
      error("SQL API error", 3)
   end

   local sql_state = ffi.string(self.sql_state)
   local sql_errmsg = ffi.string(self.sql_errmsg)

   -- Create an error descriptor containing connection, failed query, SQL error
   -- number, state and error message provided by the SQL driver. It is kept
   -- local so that a failed query does not leave a global variable behind.
   local errdesc = {
      connection = self,
      query = query,
      sql_errno = self.sql_errno,
      sql_state = sql_state,
      sql_errmsg = sql_errmsg
   }

   -- Check if the error has already been marked as ignorable by the driver, or
   -- there is an error hook that allows downgrading it to IGNORABLE
   if (self.error == sysbench.sql.error.FATAL and
          type(sysbench.hooks.sql_error_ignorable) == "function" and
          sysbench.hooks.sql_error_ignorable(errdesc)) or
      self.error == sysbench.sql.error.IGNORABLE
   then
      -- Throw a 'restart event' exception that can be caught by the user script
      -- to do some extra steps to restart a transaction (e.g. reprepare
      -- statements after a reconnect). Otherwise it will be caught by
      -- thread_run() in sysbench.lua, in which case the entire current event
      -- will be restarted without extra processing.
      errdesc.errcode = sysbench.error.RESTART_EVENT
      error(errdesc, 3)
   end

   -- Just throw a regular error message on a fatal error
   error(string.format("SQL error, errno = %d, state = '%s': %s",
                       self.sql_errno, sql_state, sql_errmsg), 2)
end

-- Execute a query and return its result set. Errors reported by the driver
-- are handled by check_error().
function connection_methods.query(self, query)
   local rs = ffi.C.db_query(self, query, #query)
   return self:check_error(rs, query)
end

-- Thin wrappers around the db_bulk_insert_*() calls. Each raises an error
-- naming the failed call when it returns a non-zero status.
function connection_methods.bulk_insert_init(self, query)
   return assert(ffi.C.db_bulk_insert_init(self, query, #query) == 0,
                 "db_bulk_insert_init() failed")
end

function connection_methods.bulk_insert_next(self, val)
   return assert(ffi.C.db_bulk_insert_next(self, val, #val) == 0,
                 "db_bulk_insert_next() failed")
end

function connection_methods.bulk_insert_done(self)
   return assert(ffi.C.db_bulk_insert_done(self) == 0,
                 "db_bulk_insert_done() failed")
end

-- Prepare a statement for the given query text. When db_prepare() returns
-- NULL the connection error state is passed through check_error(), which
-- raises if an error is recorded; otherwise the (possibly nil) statement
-- handle is returned.
function connection_methods.prepare(self, query)
   local stmt = ffi.C.db_prepare(self, query, #query)
   if stmt == nil then
      self:check_error(nil, query)
   end
   return stmt
end

-- A convenience wrapper around sql_connection:query() and
-- sql_result:fetch_row().
-- Executes the specified query and returns the first row from the result
-- set, if available, or nil otherwise. The row is returned as multiple
-- values, one per field.
function connection_methods.query_row(self, query)
   local rs = self:query(query)

   if rs == nil or rs.nrows == 0 then
      return nil
   end

   -- fetch_row() may still return nil; do not pass that to unpack()
   local row = rs:fetch_row()
   if row == nil then
      return nil
   end

   return unpack(row, 1, rs.nfields)
end

-- sql_connection metatable
local connection_mt = {
   __index = connection_methods,
   __tostring = function() return '<sql_connection>' end,
   __gc = ffi.C.db_connection_free,
}
ffi.metatype("sql_connection", connection_mt)

-- sql_param: Lua-side parameter objects created by
-- sql_statement:bind_create()
local sql_param = {}

-- Store a value into the parameter buffer. A nil value marks the parameter
-- as NULL. String values longer than max_len are truncated to max_len bytes.
-- Raises an error for parameter types other than integer, floating point and
-- character ones.
function sql_param.set(self, value)
   local sql_type = sysbench.sql.type
   local btype = self.type

   if (value == nil) then
      self.is_null[0] = true
      return
   end

   self.is_null[0] = false

   if btype == sql_type.TINYINT or
      btype == sql_type.SMALLINT or
      btype == sql_type.INT or
      btype == sql_type.BIGINT
   then
      self.buffer[0] = value
   elseif btype == sql_type.FLOAT or
      btype == sql_type.DOUBLE
   then
      -- The buffer is a one-element array (see bind_create()), so the value
      -- lives in slot 0; slot 1 would be past the end of the allocation
      self.buffer[0] = value
   elseif btype == sql_type.CHAR or
      btype == sql_type.VARCHAR
   then
      local len = #value
      len = self.max_len < len and self.max_len or len
      ffi.copy(self.buffer, value, len)
      self.data_len[0] = len
   else
      error("Unsupported argument type: " .. tostring(btype), 2)
   end
end

-- Fill a character parameter with a string generated by sb_rand_str() from
-- the given format. Raises an error for non-character parameter types.
function sql_param.set_rand_str(self, fmt)
   local sql_type = sysbench.sql.type
   local btype = self.type

   self.is_null[0] = false

   if btype == sql_type.CHAR or
      btype == sql_type.VARCHAR
   then
      local len = #fmt
      if self.max_len < len then
         -- Truncate the format as well as the reported length, so that the
         -- generator is never asked for more than the buffer can hold.
         -- NOTE(review): sb_rand_str() is declared outside this file;
         -- presumably it emits one byte per format character -- confirm.
         len = self.max_len
         fmt = string.sub(fmt, 1, len)
      end
      ffi.C.sb_rand_str(fmt, self.buffer)
      self.data_len[0] = len
   else
      error("Unsupported argument type: " .. tostring(btype), 2)
   end
end

sql_param.__index = sql_param
sql_param.__tostring = function () return '<sql_param>' end

-- sql_statement methods
local statement_methods = {}

-- Create a parameter object to be passed to bind_param().
--
-- Arguments:
--   btype   - one of the sysbench.sql.type constants
--   max_len - buffer size in bytes, used for CHAR/VARCHAR only
--
-- All integer types are stored as BIGINT in an int64_t buffer, FLOAT and
-- DOUBLE as DOUBLE in a double buffer, CHAR and VARCHAR as VARCHAR in a char
-- buffer of max_len bytes. Raises an error for any other type.
function statement_methods.bind_create(self, btype, max_len)
   local sql_type = sysbench.sql.type

   local param = setmetatable({}, sql_param)

   if btype == sql_type.TINYINT or
      btype == sql_type.SMALLINT or
      btype == sql_type.INT or
      btype == sql_type.BIGINT
   then
      param.type = sql_type.BIGINT
      param.buffer = ffi.new('int64_t[1]')
      param.max_len = 8
   elseif btype == sql_type.FLOAT or
      btype == sql_type.DOUBLE
   then
      param.type = sql_type.DOUBLE
      param.buffer = ffi.new('double[1]')
      param.max_len = 8
   elseif btype == sql_type.CHAR or
      btype == sql_type.VARCHAR
   then
      param.type = sql_type.VARCHAR
      param.buffer = ffi.new('char[?]', max_len)
      param.max_len = max_len
   else
      -- tostring() keeps the message intact even when btype is nil
      error("Unsupported argument type: " .. tostring(btype), 2)
   end

   param.data_len = ffi.new('unsigned long[1]')
   param.is_null = ffi.new('char[1]')

   return param
end

-- Bind the given parameter objects (created with bind_create()) to the
-- statement, in argument order. Returns nil when called without parameters,
-- otherwise the status returned by db_bind_param(). Raises an error if any
-- argument is nil.
function statement_methods.bind_param(self, ...)
   local len = select('#', ...)
   if len < 1 then return nil end

   local binds = ffi.new("sql_bind[?]", len)
   local params = {...}

   -- Iterate by count rather than with ipairs(), which would silently stop
   -- at the first nil argument and leave the remaining binds zeroed
   for i = 1, len do
      local param = params[i]
      if param == nil then
         error("bind_param(): parameter #" .. i .. " is nil", 2)
      end

      local bind = binds[i - 1]
      bind.type = param.type
      bind.buffer = param.buffer
      bind.data_len = param.data_len
      bind.max_len = param.max_len
      bind.is_null = param.is_null
   end
   return ffi.C.db_bind_param(self, binds, len)
end

-- Execute the prepared statement and return its result set. Errors reported
-- by the driver are handled by sql_connection:check_error().
function statement_methods.execute(self)
   local rs = ffi.C.db_execute(self)
   return self.connection:check_error(rs, '<prepared statement>')
end

-- Close the prepared statement and return the status of db_close()
function statement_methods.close(self)
   return ffi.C.db_close(self)
end

-- sql_statement metatable
local statement_mt = {
   __index = statement_methods,
   __tostring = function() return '<sql_statement>' end,
   __gc = ffi.C.db_close,
}
ffi.metatype("sql_statement", statement_mt)

local bind_mt = {
   __tostring = function() return '<sql_bind>' end,
}
ffi.metatype("sql_bind", bind_mt)

-- sql_result methods
local result_methods = {}

-- Returns the next row of values from a result set, or nil if there are no more
-- rows to fetch. Values are returned as an array, i.e. a table with numeric
-- indexes starting from 1. The total number of values (i.e. fields in a result
-- set) can be obtained from sql_result.nfields.
function result_methods.fetch_row(self)
   local row = ffi.C.db_fetch_row(self)
   if row == nil then
      return nil
   end

   -- Copy every non-NULL column into a Lua string; NULL columns are left
   -- unset in the returned table
   local fields = {}
   for col = 1, self.nfields do
      local val = row.values[col - 1]
      if val.ptr ~= nil then
         fields[col] = ffi.string(val.ptr, tonumber(val.len))
      end
   end

   return fields
end

-- Release the result set. Raises an error if db_free_results() returns a
-- non-zero status.
function result_methods.free(self)
   local status = ffi.C.db_free_results(self)
   return assert(status == 0, "db_free_results() failed")
end

-- sql_results metatable
local result_mt = {
   __index = result_methods,
   __tostring = function()
      return '<sql_result>'
   end,
   __gc = ffi.C.db_free_results
}
ffi.metatype("sql_result", result_mt)

-- error codes, mirroring the members of the sql_error_t enum
sysbench.sql.error = {
   NONE = ffi.C.DB_ERROR_NONE,
   IGNORABLE = ffi.C.DB_ERROR_IGNORABLE,
   FATAL = ffi.C.DB_ERROR_FATAL
}