1-- Copyright (C) 2017 Alexey Kopytov <akopytov@gmail.com>
2
3-- This program is free software; you can redistribute it and/or modify
4-- it under the terms of the GNU General Public License as published by
5-- the Free Software Foundation; either version 2 of the License, or
6-- (at your option) any later version.
7
8-- This program is distributed in the hope that it will be useful,
9-- but WITHOUT ANY WARRANTY; without even the implied warranty of
10-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11-- GNU General Public License for more details.
12
13-- You should have received a copy of the GNU General Public License
14-- along with this program; if not, write to the Free Software
15-- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17-- ----------------------------------------------------------------------
18-- SQL API
19-- ----------------------------------------------------------------------
20
-- Handle to the LuaJIT FFI library. It is assigned without 'local', i.e. it
-- is stored in a global variable rather than being private to this file.
ffi = require("ffi")

-- Namespace table holding everything exported by the SQL API
sysbench.sql = {}
24
-- C declarations (types and db_*() function prototypes) registered with the
-- FFI so that the rest of this file can reach them through ffi.C. According
-- to the note inside the block they are modified copies of the definitions in
-- db_driver.h. The text between the long brackets is passed to the FFI
-- declaration parser as is, so it contains C comments, not Lua comments.
ffi.cdef[[
/*
  The following definitions have been copied with modifications from db_driver.h
*/

typedef enum
{
  DB_ERROR_NONE,                /* no error(s) */
  DB_ERROR_IGNORABLE,           /* error should be ignored as defined by command
                                line arguments or a custom error handler */
  DB_ERROR_FATAL                /* non-ignorable error */
} sql_error_t;

typedef struct
{
  const char      *sname;    /* short name */
  const char      *lname;    /* long name */

  const char      opaque[?];
} sql_driver;

typedef struct {
  uint32_t        len;         /* Value length */
  const char      *ptr;        /* Value string */
} sql_value;

/* Result set row definition */

typedef struct
{
  void            *ptr;        /* Driver-specific row data */
  sql_value       *values;     /* Array of column values */
} sql_row;

/* Query type for statistics */

typedef enum
{
  SB_CNT_OTHER,
  SB_CNT_READ,
  SB_CNT_WRITE,
  SB_CNT_TRX,
  SB_CNT_ERROR,
  SB_CNT_RECONNECT,
  SB_CNT_MAX
} sb_counter_type;

typedef struct
{
  sql_error_t     error;             /* Driver-independent error code */
  int             sql_errno;         /* Driver-specific error code */
  const char      *sql_state;        /* Database-specific SQL state */
  const char      *sql_errmsg;       /* Database-specific error message */
  sql_driver      *driver;           /* DB driver for this connection */

  const char      opaque[?];
} sql_connection;

typedef struct
{
  sql_connection  *connection;

  const char      opaque[?];
} sql_statement;

/* Result set definition */

typedef struct
{
  sb_counter_type   counter;     /* Statistical counter type */
  uint32_t       nrows;         /* Number of affected rows */
  uint32_t       nfields;       /* Number of fields */
  sql_statement  *statement;    /* Pointer to prepared statement (if used) */
  void           *ptr;          /* Pointer to driver-specific data */
  sql_row        row;           /* Last fetched row */
} sql_result;

typedef enum
{
  SQL_TYPE_NONE,
  SQL_TYPE_TINYINT,
  SQL_TYPE_SMALLINT,
  SQL_TYPE_INT,
  SQL_TYPE_BIGINT,
  SQL_TYPE_FLOAT,
  SQL_TYPE_DOUBLE,
  SQL_TYPE_TIME,
  SQL_TYPE_DATE,
  SQL_TYPE_DATETIME,
  SQL_TYPE_TIMESTAMP,
  SQL_TYPE_CHAR,
  SQL_TYPE_VARCHAR
} sql_bind_type_t;

typedef struct
{
  sql_bind_type_t   type;
  void             *buffer;
  unsigned long    *data_len;
  unsigned long    max_len;
  char             *is_null;
} sql_bind;

sql_driver *db_create(const char *);
int db_destroy(sql_driver *drv);

sql_connection *db_connection_create(sql_driver * drv);
int db_connection_close(sql_connection *con);
int db_connection_reconnect(sql_connection *con);
void db_connection_free(sql_connection *con);

int db_bulk_insert_init(sql_connection *, const char *, size_t);
int db_bulk_insert_next(sql_connection *, const char *, size_t);
int db_bulk_insert_done(sql_connection *);

sql_result *db_query(sql_connection *con, const char *query, size_t len);

sql_row *db_fetch_row(sql_result *rs);

sql_statement *db_prepare(sql_connection *con, const char *query, size_t len);
int db_bind_param(sql_statement *stmt, sql_bind *params, size_t len);
int db_bind_result(sql_statement *stmt, sql_bind *results, size_t len);
sql_result *db_execute(sql_statement *stmt);
int db_close(sql_statement *stmt);

int db_free_results(sql_result *);
]]
152
-- Pointer ctypes for the handle structures declared in the cdef block
local sql_driver, sql_connection, sql_statement =
   ffi.typeof('sql_driver *'),
   ffi.typeof('sql_connection *'),
   ffi.typeof('sql_statement *')

-- By-value ctypes for the plain data structures
local sql_bind, sql_result, sql_value, sql_row =
   ffi.typeof('sql_bind'),
   ffi.typeof('sql_result'),
   ffi.typeof('sql_value'),
   ffi.typeof('sql_row')
160
-- Bind type constants exported to scripts. Each entry NAME holds the value of
-- the SQL_TYPE_<NAME> member of the sql_bind_type_t enum declared with
-- ffi.cdef.
do
   local names = {
      "NONE", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE",
      "TIME", "DATE", "DATETIME", "TIMESTAMP", "CHAR", "VARCHAR"
   }

   local types = {}
   for _, name in ipairs(names) do
      types[name] = ffi.C["SQL_TYPE_" .. name]
   end

   sysbench.sql.type = types
end
177
-- Create a handle for the SQL driver called driver_name; the handle is what
-- connections are created from. Omitting the argument (a nil driver name)
-- selects the default driver, i.e. the one specified with --db-driver on the
-- command line. An error is raised if db_create() returns NULL.
function sysbench.sql.driver(driver_name)
   local handle = ffi.C.db_create(driver_name)

   if handle ~= nil then
      -- Register db_destroy() as the finalizer of the returned handle
      return ffi.gc(handle, ffi.C.db_destroy)
   end

   error("failed to initialize the DB driver", 2)
end
188
-- Methods available on sql_driver objects
local driver_methods = {}

-- Create a connection object through this driver. The returned object has
-- db_connection_free() registered as its finalizer. An error is raised if
-- db_connection_create() returns NULL.
function driver_methods.connect(self)
   local conn = ffi.C.db_connection_create(self)

   if conn ~= nil then
      return ffi.gc(conn, ffi.C.db_connection_free)
   end

   error("connection creation failed", 2)
end
199
-- Return the short name of the driver (its 'sname' field) as a Lua string
function driver_methods.name(self)
   local short_name = self.sname
   return ffi.string(short_name)
end
203
-- Metatable associated with the sql_driver C type: method lookup goes through
-- driver_methods, db_destroy() is the __gc handler and tostring() yields a
-- fixed tag.
local driver_mt = {}
driver_mt.__index = driver_methods
driver_mt.__gc = ffi.C.db_destroy
driver_mt.__tostring = function() return '<sql_driver>' end

ffi.metatype("sql_driver", driver_mt)
211
-- sql_connection methods
local connection_methods = {}

-- Close the connection by calling db_connection_close().
-- Returns true on success. Raises an error naming the failed call when
-- db_connection_close() returns a non-zero code, in line with the messages
-- used by the other wrappers in this file (e.g. the bulk_insert_*() methods).
function connection_methods.disconnect(self)
   assert(ffi.C.db_connection_close(self) == 0,
          "db_connection_close() failed")
   return true
end
218
-- Re-establish the connection by calling db_connection_reconnect().
-- Returns true on success. Raises an error naming the failed call when
-- db_connection_reconnect() returns a non-zero code, in line with the messages
-- used by the other wrappers in this file (e.g. the bulk_insert_*() methods).
function connection_methods.reconnect(self)
   assert(ffi.C.db_connection_reconnect(self) == 0,
          "db_connection_reconnect() failed")
   return true
end
222
-- Examine the outcome of a driver call made on this connection and raise a
-- Lua error if the call has failed.
--
-- Arguments:
--   rs    - value returned by the driver call (nil/NULL indicates a failure)
--   query - query text to be stored in the error descriptor
--
-- Returns rs unchanged if it is not nil, or if the connection's 'error' field
-- is NONE. Otherwise one of the following errors is raised:
--   * the string "SQL API error" (level 3) if the driver provided no SQL
--     state or no error message;
--   * an error descriptor table (level 3) with errcode set to
--     sysbench.error.RESTART_EVENT if the error is ignorable;
--   * a formatted "SQL error, ..." string (level 2) for fatal errors.
function connection_methods.check_error(self, rs, query)
   if rs ~= nil or self.error == sysbench.sql.error.NONE then
      return rs
   end

   if self.sql_state == nil or self.sql_errmsg == nil then
      -- It must be an API error, don't bother trying to downgrade it to an
      -- ignorable error
      error("SQL API error", 3)
   end

   local sql_state = ffi.string(self.sql_state)
   local sql_errmsg = ffi.string(self.sql_errmsg)

   -- Create an error descriptor containing connection, failed query, SQL error
   -- number, state and error message provided by the SQL driver. It must be a
   -- local: assigning it without 'local' would create (and keep overwriting) a
   -- global variable named 'errdesc' on every SQL error.
   local errdesc = {
      connection = self,
      query = query,
      sql_errno = self.sql_errno,
      sql_state = sql_state,
      sql_errmsg = sql_errmsg
   }

   -- Check if the error has already been marked as ignorable by the driver, or
   -- there is an error hook that allows downgrading it to IGNORABLE
   if (self.error == sysbench.sql.error.FATAL and
          type(sysbench.hooks.sql_error_ignorable) == "function" and
          sysbench.hooks.sql_error_ignorable(errdesc)) or
      self.error == sysbench.sql.error.IGNORABLE
   then
      -- Throw a 'restart event' exception that can be caught by the user script
      -- to do some extra steps to restart a transaction (e.g. reprepare
      -- statements after a reconnect). Otherwise it will be caught by
      -- thread_run() in sysbench.lua, in which case the entire current event
      -- will be restarted without extra processing.
      errdesc.errcode = sysbench.error.RESTART_EVENT
      error(errdesc, 3)
   end

   -- Just throw a regular error message on a fatal error
   error(string.format("SQL error, errno = %d, state = '%s': %s",
                       self.sql_errno, sql_state, sql_errmsg), 2)
end
267
-- Execute a query on this connection with db_query() and pass its outcome
-- through check_error(), which either returns the result set or raises an
-- error.
function connection_methods.query(self, query)
   local query_len = #query
   local result = ffi.C.db_query(self, query, query_len)
   return self:check_error(result, query)
end
272
-- Start a bulk insert by passing the query text and its length to
-- db_bulk_insert_init(). Raises an error if the call returns a non-zero code.
function connection_methods.bulk_insert_init(self, query)
   local succeeded = ffi.C.db_bulk_insert_init(self, query, #query) == 0
   return assert(succeeded, "db_bulk_insert_init() failed")
end
277
-- Pass the next chunk of a bulk insert (val and its length) to
-- db_bulk_insert_next(). Raises an error if the call returns a non-zero code.
function connection_methods.bulk_insert_next(self, val)
   local succeeded = ffi.C.db_bulk_insert_next(self, val, #val) == 0
   return assert(succeeded, "db_bulk_insert_next() failed")
end
282
-- Finish a bulk insert by calling db_bulk_insert_done(). Raises an error if
-- the call returns a non-zero code.
function connection_methods.bulk_insert_done(self)
   local succeeded = ffi.C.db_bulk_insert_done(self) == 0
   return assert(succeeded, "db_bulk_insert_done() failed")
end
287
-- Prepare a statement for the given query with db_prepare(). When db_prepare()
-- returns NULL, check_error() is consulted and may raise an error; if it does
-- not, the NULL statement is returned to the caller as is.
function connection_methods.prepare(self, query)
   local statement = ffi.C.db_prepare(self, query, #query)

   if statement ~= nil then
      return statement
   end

   self:check_error(nil, query)
   return statement
end
295
-- A convenience wrapper around sql_connection:query() and
-- sql_result:fetch_row(). Executes the specified query and returns the values
-- of the first row from the result set as multiple return values (one per
-- field, rs.nfields in total), or nil if no row is available.
function connection_methods.query_row(self, query)
   local rs = self:query(query)

   if rs == nil or rs.nrows == 0 then
      return nil
   end

   -- fetch_row() returns nil when there is nothing to fetch; passing that nil
   -- straight to unpack() would raise a 'bad argument' error instead of
   -- producing the nil result promised above.
   local row = rs:fetch_row()
   if row == nil then
      return nil
   end

   -- An explicit upper bound is required, because NULL fields are represented
   -- by holes in the row table
   return unpack(row, 1, rs.nfields)
end
308
-- Metatable associated with the sql_connection C type: method lookup goes
-- through connection_methods, db_connection_free() is the __gc handler and
-- tostring() yields a fixed tag.
local connection_mt = {}
connection_mt.__index = connection_methods
connection_mt.__tostring = function() return '<sql_connection>' end
connection_mt.__gc = ffi.C.db_connection_free

ffi.metatype("sql_connection", connection_mt)
316
-- sql_param: methods (and, further below, metatable fields) of the parameter
-- objects created by sql_statement:bind_create()
local sql_param = {}

-- Store a value in the parameter buffers.
--
-- Arguments:
--   value - the value to store. nil marks the parameter as NULL and leaves
--           the data buffer untouched. For CHAR/VARCHAR parameters the value
--           is copied into the buffer, truncated to max_len bytes, and the
--           copied length is recorded in data_len.
--
-- Raises an error if the parameter type is not an integer, floating point or
-- character one.
function sql_param.set(self, value)
   local sql_type = sysbench.sql.type
   local btype = self.type

   if (value == nil) then
      self.is_null[0] = true
      return
   end

   self.is_null[0] = false

   if btype == sql_type.TINYINT or
      btype == sql_type.SMALLINT or
      btype == sql_type.INT or
      btype == sql_type.BIGINT
   then
      self.buffer[0] = value
   elseif btype == sql_type.FLOAT or
      btype == sql_type.DOUBLE
   then
      -- bind_create() allocates a one-element 'double[1]' buffer, so the value
      -- lives at index 0. Index 1 is past the end of the buffer: writing there
      -- would corrupt adjacent memory and leave the bound value unset.
      self.buffer[0] = value
   elseif btype == sql_type.CHAR or
      btype == sql_type.VARCHAR
   then
      local len = #value
      -- Never copy more than the buffer can hold
      len = self.max_len < len and self.max_len or len
      ffi.copy(self.buffer, value, len)
      self.data_len[0] = len
   else
      error("Unsupported argument type: " .. btype, 2)
   end
end
351
-- Fill a CHAR/VARCHAR parameter with a string generated by sb_rand_str() from
-- the format string 'fmt', and mark the parameter as not NULL. The length
-- recorded in data_len is the length of fmt, limited by max_len.
--
-- Raises an error if the parameter is not a CHAR or VARCHAR one.
function sql_param.set_rand_str(self, fmt)
   local sql_type = sysbench.sql.type
   local btype = self.type

   self.is_null[0] = false

   if btype == sql_type.CHAR or
      btype == sql_type.VARCHAR
   then
      local len = #fmt
      if self.max_len < len then
         -- sb_rand_str() takes no buffer size argument. NOTE(review): it
         -- presumably writes one byte per format character, so a format
         -- longer than the buffer would overflow it -- confirm against its
         -- C implementation. Pass only the part of the format that fits.
         len = self.max_len
         fmt = fmt:sub(1, len)
      end
      ffi.C.sb_rand_str(fmt, self.buffer)
      self.data_len[0] = len
   else
      error("Unsupported argument type: " .. btype, 2)
   end
end
369
-- sql_param doubles as the metatable of parameter objects: methods are looked
-- up in the table itself and tostring() yields a fixed tag
local function sql_param_tostring()
   return '<sql_param>'
end

sql_param.__index = sql_param
sql_param.__tostring = sql_param_tostring
372
-- Methods available on sql_statement objects
local statement_methods = {}

-- Create a parameter object (a Lua table using sql_param as its metatable)
-- with freshly allocated buffers for the requested bind type:
--   * all integer types are stored as BIGINT in an 'int64_t[1]' buffer;
--   * FLOAT and DOUBLE are stored as DOUBLE in a 'double[1]' buffer;
--   * CHAR and VARCHAR are stored as VARCHAR in a char buffer of max_len bytes.
-- Any other type results in an error. The max_len argument is only used for
-- the character types.
function statement_methods.bind_create(self, btype, max_len)
   local types = sysbench.sql.type
   local stored_type, buffer, capacity

   if btype == types.TINYINT or btype == types.SMALLINT or
      btype == types.INT or btype == types.BIGINT
   then
      stored_type = types.BIGINT
      buffer = ffi.new('int64_t[1]')
      capacity = 8
   elseif btype == types.FLOAT or btype == types.DOUBLE then
      stored_type = types.DOUBLE
      buffer = ffi.new('double[1]')
      capacity = 8
   elseif btype == types.CHAR or btype == types.VARCHAR then
      stored_type = types.VARCHAR
      buffer = ffi.new('char[?]', max_len)
      capacity = max_len
   else
      error("Unsupported argument type: " .. btype, 2)
   end

   return setmetatable({
      type = stored_type,
      buffer = buffer,
      max_len = capacity,
      data_len = ffi.new('unsigned long[1]'),
      is_null = ffi.new('char[1]'),
   }, sql_param)
end
410
-- Bind parameter objects to the statement.
--
-- Arguments:
--   ... - parameter objects providing the type, buffer, data_len, max_len and
--         is_null fields, e.g. the objects created by bind_create()
--
-- Returns nil if called without parameters, otherwise the return value of
-- db_bind_param(). Raises an error if any of the parameters is nil.
function statement_methods.bind_param(self, ...)
   local len = select('#', ...)
   if len < 1 then return nil end

   local params = {...}
   local binds = ffi.new("sql_bind[?]", len)

   -- Walk all 'len' arguments by index. ipairs() would stop at the first nil
   -- argument and silently leave the remaining (zero-filled) binds to be
   -- passed to the driver.
   for i = 1, len do
      local param = params[i]
      if param == nil then
         error("bind_param(): parameter #" .. i .. " is nil", 2)
      end

      local bind = binds[i - 1]   -- the C array is 0-based
      bind.type = param.type
      bind.buffer = param.buffer
      bind.data_len = param.data_len
      bind.max_len = param.max_len
      bind.is_null = param.is_null
   end

   return ffi.C.db_bind_param(self, binds, len)
end
428
-- Execute the prepared statement with db_execute() and pass its outcome
-- through check_error() of the owning connection, which either returns the
-- result set or raises an error. The query text reported on errors is the
-- fixed placeholder '<prepared statement>'.
function statement_methods.execute(self)
   local result = ffi.C.db_execute(self)
   local conn = self.connection
   return conn:check_error(result, '<prepared statement>')
end
433
-- Close the prepared statement; returns the code returned by db_close()
function statement_methods.close(self)
   local rc = ffi.C.db_close(self)
   return rc
end
437
-- Metatable associated with the sql_statement C type: method lookup goes
-- through statement_methods, db_close() is the __gc handler and tostring()
-- yields a fixed tag.
local statement_mt = {}
statement_mt.__index = statement_methods
statement_mt.__tostring = function() return '<sql_statement>' end
statement_mt.__gc = ffi.C.db_close

ffi.metatype("sql_statement", statement_mt)
445
-- Metatable associated with the sql_bind C type; it only provides a fixed
-- tostring() tag
local bind_mt = {}
bind_mt.__tostring = function() return '<sql_bind>' end

ffi.metatype("sql_bind", bind_mt)
450
-- Methods available on sql_result objects
local result_methods = {}

-- Fetch the next row from the result set. Returns nil when db_fetch_row()
-- has no row to return. Otherwise returns a table with the field values as
-- strings at numeric indexes starting from 1; NULL fields are left unset, so
-- use sql_result.nfields, not the length operator, to get the number of
-- fields.
function result_methods.fetch_row(self)
   local row = ffi.C.db_fetch_row(self)

   if row == nil then
      return nil
   end

   local values = row.values
   local fields = {}

   for col = 1, self.nfields do
      local value = values[col - 1]   -- the C array is 0-based
      if value.ptr ~= nil then -- not a NULL value
         fields[col] = ffi.string(value.ptr, tonumber(value.len))
      end
   end

   return fields
end
475
-- Release the result set by calling db_free_results(). Raises an error if the
-- call returns a non-zero code.
function result_methods.free(self)
   local freed = ffi.C.db_free_results(self) == 0
   return assert(freed, "db_free_results() failed")
end
479
-- Metatable associated with the sql_result C type: method lookup goes through
-- result_methods, db_free_results() is the __gc handler and tostring() yields
-- a fixed tag.
local result_mt = {}
result_mt.__index = result_methods
result_mt.__tostring = function() return '<sql_result>' end
result_mt.__gc = ffi.C.db_free_results

ffi.metatype("sql_result", result_mt)
487
-- Driver-independent error codes, mirroring the members of the sql_error_t
-- enum declared with ffi.cdef
sysbench.sql.error = {
   NONE = ffi.C.DB_ERROR_NONE,
   IGNORABLE = ffi.C.DB_ERROR_IGNORABLE,
   FATAL = ffi.C.DB_ERROR_FATAL
}
493