1-- Copyright (C) 2006-2018 Alexey Kopytov <akopytov@gmail.com>
2
3-- This program is free software; you can redistribute it and/or modify
4-- it under the terms of the GNU General Public License as published by
5-- the Free Software Foundation; either version 2 of the License, or
6-- (at your option) any later version.
7
8-- This program is distributed in the hope that it will be useful,
9-- but WITHOUT ANY WARRANTY; without even the implied warranty of
10-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11-- GNU General Public License for more details.
12
13-- You should have received a copy of the GNU General Public License
14-- along with this program; if not, write to the Free Software
15-- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17-- -----------------------------------------------------------------------------
18-- Common code for OLTP benchmarks.
19-- -----------------------------------------------------------------------------
20
-- Guard against running this file on its own: it expects the including
-- script to have defined a global `event`, and raises an error otherwise.
function init()
   local message = "this script is meant to be included by other OLTP " ..
      "scripts and should not be called directly."

   assert(event ~= nil, message)
end
26
-- Refuse to load when no command was given on the command line
if sysbench.cmdline.command == nil then
   local supported = "prepare, prewarm, run, cleanup, help"

   error("Command is required. Supported commands: " .. supported)
end
31
-- Command line options.
-- Each entry is a list starting with a description string. The optional
-- second element is presumably the option's default value
-- (NOTE(review): confirm against the sysbench cmdline API).
sysbench.cmdline.options = {
   table_size = {
      "Number of rows per table",
      10000
   },
   range_size = {
      "Range size for range SELECT queries",
      100
   },
   tables = {
      "Number of tables",
      1
   },
   point_selects = {
      "Number of point SELECT queries per transaction",
      10
   },
   simple_ranges = {
      "Number of simple range SELECT queries per transaction",
      1
   },
   sum_ranges = {
      "Number of SELECT SUM() queries per transaction",
      1
   },
   order_ranges = {
      "Number of SELECT ORDER BY queries per transaction",
      1
   },
   distinct_ranges = {
      "Number of SELECT DISTINCT queries per transaction",
      1
   },
   index_updates = {
      "Number of UPDATE index queries per transaction",
      1
   },
   non_index_updates = {
      "Number of UPDATE non-index queries per transaction",
      1
   },
   delete_inserts = {
      "Number of DELETE/INSERT combinations per transaction",
      1
   },
   range_selects = {
      "Enable/disable all range SELECT queries",
      true
   },
   auto_inc = {
      "Use AUTO_INCREMENT column as Primary Key (for MySQL), " ..
         "or its alternatives in other DBMS. When disabled, use " ..
         "client-generated IDs",
      true
   },
   skip_trx = {
      "Don't start explicit transactions and execute all queries " ..
         "in the AUTOCOMMIT mode",
      false
   },
   secondary = {
      "Use a secondary index in place of the PRIMARY KEY",
      false
   },
   create_secondary = {
      "Create a secondary index in addition to the PRIMARY KEY",
      true
   },
   mysql_storage_engine = {
      "Storage engine, if MySQL is used",
      "innodb"
   },
   pgsql_variant = {
      "Use this PostgreSQL variant when running with the " ..
         "PostgreSQL driver. The only currently supported " ..
         "variant is 'redshift'. When enabled, " ..
         "create_secondary is automatically disabled, and " ..
         "delete_inserts is set to 0"
   }
}
78
-- Prepare the dataset. This command supports parallel execution, i.e. will
-- benefit from executing with --threads > 1 as long as --tables > 1
--
-- Tables are split between threads by stride: the thread with ID
-- sysbench.tid starts at table (tid % threads + 1) and then handles every
-- threads-th table up to sysbench.opt.tables.
function cmd_prepare()
   local drv = sysbench.sql.driver()
   local con = drv:connect()
   local nthreads = sysbench.opt.threads

   for i = sysbench.tid % nthreads + 1, sysbench.opt.tables, nthreads do
      create_table(drv, con, i)
   end

   -- Close the connection opened above once this thread's tables are done
   con:disconnect()
end
90
-- Preload the dataset into the server cache. This command supports parallel
-- execution, i.e. will benefit from executing with --threads > 1 as long as
-- --tables > 1
--
-- PS. Currently, this command is only meaningful for MySQL/InnoDB benchmarks
--
-- Raises an error when the driver is not "mysql". Tables are split between
-- threads the same way as in cmd_prepare().
function cmd_prewarm()
   local drv = sysbench.sql.driver()

   -- Check the driver before connecting, so that an unsupported driver does
   -- not leave an open connection behind
   assert(drv:name() == "mysql", "prewarm is currently MySQL only")

   local con = drv:connect()

   -- Do not create on disk tables for subsequent queries
   con:query("SET tmp_table_size=2*1024*1024*1024")
   con:query("SET max_heap_table_size=2*1024*1024*1024")

   local nthreads = sysbench.opt.threads
   local table_size = sysbench.opt.table_size

   for i = sysbench.tid % nthreads + 1, sysbench.opt.tables, nthreads do
      local t = "sbtest" .. i
      print("Prewarming table " .. t)
      con:query("ANALYZE TABLE " .. t)
      -- Read up to table_size rows through the primary key
      con:query(string.format(
                   "SELECT AVG(id) FROM " ..
                      "(SELECT * FROM %s FORCE KEY (PRIMARY) " ..
                      "LIMIT %u) t",
                   t, table_size))
      -- Read up to table_size rows filtered on column k
      con:query(string.format(
                   "SELECT COUNT(*) FROM " ..
                      "(SELECT * FROM %s WHERE k LIKE '%%0%%' LIMIT %u) t",
                   t, table_size))
   end

   -- Close the connection opened above
   con:disconnect()
end
122
-- Register prepare and prewarm as custom commands, both flagged with
-- sysbench.cmdline.PARALLEL_COMMAND
sysbench.cmdline.commands = {
   prepare = {
      cmd_prepare,
      sysbench.cmdline.PARALLEL_COMMAND
   },
   prewarm = {
      cmd_prewarm,
      sysbench.cmdline.PARALLEL_COMMAND
   }
}
128
129
-- Template strings of random digits with 11-digit groups separated by dashes

-- One group: 11 '#' placeholder characters
local digit_group = string.rep("#", 11)

-- 10 groups, 119 characters
local c_value_template = string.rep(digit_group .. "-", 9) .. digit_group

-- 5 groups, 59 characters
local pad_value_template = string.rep(digit_group .. "-", 4) .. digit_group

-- Return whatever sysbench.rand.string() produces for the 10-group template
function get_c_value()
   return sysbench.rand.string(c_value_template)
end

-- Return whatever sysbench.rand.string() produces for the 5-group template
function get_pad_value()
   return sysbench.rand.string(pad_value_template)
end
149
-- Create table sbtest<table_num>, bulk-insert sysbench.opt.table_size rows
-- into it and, when sysbench.opt.create_secondary is set, add an index on
-- column k.
--
-- drv        SQL driver object; its name selects the DDL dialect
-- con        connection used to run every statement
-- table_num  numeric suffix of the table name
--
-- Raises an error for drivers other than mysql, attachsql, drizzle and pgsql.
function create_table(drv, con, table_num)
   local id_index_def, id_def
   local engine_def = ""
   local extra_table_options = ""
   local query
   local drv_name = drv:name()

   if sysbench.opt.secondary then
      id_index_def = "KEY xid"
   else
      id_index_def = "PRIMARY KEY"
   end

   if drv_name == "mysql" or drv_name == "attachsql" or
      drv_name == "drizzle"
   then
      if sysbench.opt.auto_inc then
         id_def = "INTEGER NOT NULL AUTO_INCREMENT"
      else
         id_def = "INTEGER NOT NULL"
      end
      engine_def = "/*! ENGINE = " .. sysbench.opt.mysql_storage_engine .. " */"
      -- NOTE(review): mysql_table_options is read as a bare global and is
      -- not among the options declared in this file; presumably it is set
      -- elsewhere -- confirm
      extra_table_options = mysql_table_options or ""
   elseif drv_name == "pgsql"
   then
      if not sysbench.opt.auto_inc then
         id_def = "INTEGER NOT NULL"
      -- Read the variant from sysbench.opt like every other option here,
      -- rather than from a bare global
      elseif sysbench.opt.pgsql_variant == 'redshift' then
         id_def = "INTEGER IDENTITY(1,1)"
      else
         id_def = "SERIAL"
      end
   else
      error("Unsupported database driver:" .. drv_name)
   end

   print(string.format("Creating table 'sbtest%d'...", table_num))

   query = string.format([[
CREATE TABLE sbtest%d(
  id %s,
  k INTEGER DEFAULT '0' NOT NULL,
  c CHAR(120) DEFAULT '' NOT NULL,
  pad CHAR(60) DEFAULT '' NOT NULL,
  %s (id)
) %s %s]],
      table_num, id_def, id_index_def, engine_def, extra_table_options)

   con:query(query)

   if (sysbench.opt.table_size > 0) then
      print(string.format("Inserting %d records into 'sbtest%d'",
                          sysbench.opt.table_size, table_num))
   end

   -- With auto_inc the id column is left out of the INSERT column list;
   -- otherwise the loop index below is sent as the id
   if sysbench.opt.auto_inc then
      query = "INSERT INTO sbtest" .. table_num .. "(k, c, pad) VALUES"
   else
      query = "INSERT INTO sbtest" .. table_num .. "(id, k, c, pad) VALUES"
   end

   con:bulk_insert_init(query)

   local c_val
   local pad_val

   for i = 1, sysbench.opt.table_size do

      c_val = get_c_value()
      pad_val = get_pad_value()

      if (sysbench.opt.auto_inc) then
         query = string.format("(%d, '%s', '%s')",
                               sb_rand(1, sysbench.opt.table_size), c_val,
                               pad_val)
      else
         query = string.format("(%d, %d, '%s', '%s')",
                               i, sb_rand(1, sysbench.opt.table_size), c_val,
                               pad_val)
      end

      con:bulk_insert_next(query)
   end

   con:bulk_insert_done()

   if sysbench.opt.create_secondary then
      print(string.format("Creating a secondary index on 'sbtest%d'...",
                          table_num))
      con:query(string.format("CREATE INDEX k_%d ON sbtest%d(k)",
                              table_num, table_num))
   end
end
242
-- Statement definitions, keyed by query kind. Element 1 is the SQL text
-- (%u is replaced with the table number by prepare_for_each_table()); the
-- remaining elements describe the bind parameters in order, either as a
-- bare SQL type or as a {type, length} pair.
local t = sysbench.sql.type
local stmt_defs = {
   -- Single-row lookup by id
   point_selects = {
      "SELECT c FROM sbtest%u WHERE id=?",
      t.INT
   },

   -- Range queries over id BETWEEN ? AND ?
   simple_ranges = {
      "SELECT c FROM sbtest%u WHERE id BETWEEN ? AND ?",
      t.INT, t.INT
   },
   sum_ranges = {
      "SELECT SUM(k) FROM sbtest%u WHERE id BETWEEN ? AND ?",
      t.INT, t.INT
   },
   order_ranges = {
      "SELECT c FROM sbtest%u WHERE id BETWEEN ? AND ? ORDER BY c",
      t.INT, t.INT
   },
   distinct_ranges = {
      "SELECT DISTINCT c FROM sbtest%u WHERE id BETWEEN ? AND ? ORDER BY c",
      t.INT, t.INT
   },

   -- Single-row writes by id
   index_updates = {
      "UPDATE sbtest%u SET k=k+1 WHERE id=?",
      t.INT
   },
   non_index_updates = {
      "UPDATE sbtest%u SET c=? WHERE id=?",
      {t.CHAR, 120}, t.INT
   },
   deletes = {
      "DELETE FROM sbtest%u WHERE id=?",
      t.INT
   },
   inserts = {
      "INSERT INTO sbtest%u (id, k, c, pad) VALUES (?, ?, ?, ?)",
      t.INT, t.INT, {t.CHAR, 120}, {t.CHAR, 60}
   },
}
273
-- Prepare "BEGIN" on the global connection `con` and store the statement in
-- stmt.begin
function prepare_begin()
   local begin_stmt = con:prepare("BEGIN")

   stmt.begin = begin_stmt
end

-- Prepare "COMMIT" on the global connection `con` and store the statement in
-- stmt.commit
function prepare_commit()
   local commit_stmt = con:prepare("COMMIT")

   stmt.commit = commit_stmt
end
281
-- Prepare the statement described by stmt_defs[key] once per table and
-- create its bind parameters.
--
-- For each table number N the prepared statement is stored in stmt[N][key].
-- When the definition lists parameters, their bind objects are stored in
-- param[N][key] (in definition order) and bound to the statement.
function prepare_for_each_table(key)
   for tnum = 1, sysbench.opt.tables do
      local def = stmt_defs[key]
      local statement = con:prepare(string.format(def[1], tnum))

      stmt[tnum][key] = statement

      -- Everything after the SQL text is a parameter description
      local nparam = #def - 1

      if nparam > 0 then
         local binds = {}
         param[tnum][key] = binds

         for p = 1, nparam do
            local spec = def[p + 1]
            local btype, len = spec, nil

            -- A {type, length} pair is split into its two parts
            if type(spec) == "table" then
               btype, len = spec[1], spec[2]
            end

            local sql_type = sysbench.sql.type
            local is_string = (btype == sql_type.VARCHAR or
                                  btype == sql_type.CHAR)

            -- Only string types pass the length on to bind_create()
            if is_string then
               binds[p] = statement:bind_create(btype, len)
            else
               binds[p] = statement:bind_create(btype)
            end
         end

         statement:bind_param(unpack(binds))
      end
   end
end
313
-- Thin wrappers around prepare_for_each_table(), one per query kind

-- Point SELECT statements
function prepare_point_selects() prepare_for_each_table("point_selects") end

-- Simple range SELECT statements
function prepare_simple_ranges() prepare_for_each_table("simple_ranges") end

-- SELECT SUM() range statements
function prepare_sum_ranges() prepare_for_each_table("sum_ranges") end

-- SELECT ... ORDER BY range statements
function prepare_order_ranges() prepare_for_each_table("order_ranges") end

-- SELECT DISTINCT range statements
function prepare_distinct_ranges()
   prepare_for_each_table("distinct_ranges")
end

-- UPDATE statements that modify column k
function prepare_index_updates() prepare_for_each_table("index_updates") end

-- UPDATE statements that modify column c
function prepare_non_index_updates()
   prepare_for_each_table("non_index_updates")
end

-- DELETE statements first, then INSERT statements
function prepare_delete_inserts()
   for _, key in ipairs({"deletes", "inserts"}) do
      prepare_for_each_table(key)
   end
end
346
-- Set up the globals used by the rest of this file (drv, con, stmt, param)
-- and then call prepare_statements().
function thread_init()
   drv = sysbench.sql.driver()
   con = drv:connect()

   -- Global nested tables for prepared statements and their parameters: one
   -- statement and one parameter set per connection/table/query combination
   stmt, param = {}, {}

   for tnum = 1, sysbench.opt.tables do
      stmt[tnum], param[tnum] = {}, {}
   end

   -- 'Callback' supplied by the individual benchmark scripts
   prepare_statements()
end
365
-- Close prepared statements: every entry of stmt[N] for each table number N,
-- followed by stmt.begin and stmt.commit when they exist
function close_statements()
   for tnum = 1, sysbench.opt.tables do
      for _, statement in pairs(stmt[tnum]) do
         statement:close()
      end
   end

   for _, name in ipairs({"begin", "commit"}) do
      local statement = stmt[name]

      if statement ~= nil then
         statement:close()
      end
   end
end

-- Close all prepared statements, then disconnect the global connection
function thread_done()
   close_statements()

   local connection = con
   connection:disconnect()
end
385
-- Drop tables sbtest1 .. sbtest<sysbench.opt.tables> over a connection
-- opened for this purpose, then close that connection.
function cleanup()
   local drv = sysbench.sql.driver()
   local con = drv:connect()

   for i = 1, sysbench.opt.tables do
      print(string.format("Dropping table 'sbtest%d'...", i))
      con:query("DROP TABLE IF EXISTS sbtest" .. i )
   end

   -- Close the connection opened above
   con:disconnect()
end
395
-- Table number to operate on: the result of sysbench.rand.uniform() called
-- with 1 and the configured number of tables
local function get_table_num()
   local first, last = 1, sysbench.opt.tables

   return sysbench.rand.uniform(first, last)
end

-- Row id to operate on: the result of sysbench.rand.default() called with 1
-- and the configured table size
local function get_id()
   local first, last = 1, sysbench.opt.table_size

   return sysbench.rand.default(first, last)
end
403
-- Execute the prepared statement stored in stmt.begin
function begin()
   local statement = stmt.begin

   statement:execute()
end

-- Execute the prepared statement stored in stmt.commit
function commit()
   local statement = stmt.commit

   statement:execute()
end
411
-- Run sysbench.opt.point_selects point SELECT queries against one table.
-- The table is chosen once per call; a new id is drawn for every query.
function execute_point_selects()
   local tnum = get_table_num()

   for _ = 1, sysbench.opt.point_selects do
      param[tnum].point_selects[1]:set(get_id())

      stmt[tnum].point_selects:execute()
   end
end
422
-- Run sysbench.opt[key] range queries of the given kind against one table.
-- Each query covers ids from a freshly drawn id up to
-- id + sysbench.opt.range_size - 1.
local function execute_range(key)
   local tnum = get_table_num()

   for _ = 1, sysbench.opt[key] do
      local first = get_id()
      local last = first + sysbench.opt.range_size - 1
      local bounds = param[tnum][key]

      bounds[1]:set(first)
      bounds[2]:set(last)

      stmt[tnum][key]:execute()
   end
end

-- Per-kind entry points, each delegating to execute_range()

function execute_simple_ranges() execute_range("simple_ranges") end

function execute_sum_ranges() execute_range("sum_ranges") end

function execute_order_ranges() execute_range("order_ranges") end

function execute_distinct_ranges() execute_range("distinct_ranges") end
451
-- Run sysbench.opt.index_updates "SET k=k+1" updates against one table,
-- drawing a new id for each
function execute_index_updates()
   local tnum = get_table_num()
   local remaining = sysbench.opt.index_updates

   while remaining >= 1 do
      local id_param = param[tnum].index_updates[1]

      id_param:set(get_id())
      stmt[tnum].index_updates:execute()

      remaining = remaining - 1
   end
end
461
-- Run sysbench.opt.non_index_updates updates of column c against one table.
-- Each update gets a new random string built from c_value_template and a
-- newly drawn id.
function execute_non_index_updates()
   local tnum = get_table_num()

   for _ = 1, sysbench.opt.non_index_updates do
      local binds = param[tnum].non_index_updates

      binds[1]:set_rand_str(c_value_template)
      binds[2]:set(get_id())

      stmt[tnum].non_index_updates:execute()
   end
end
472
-- Run sysbench.opt.delete_inserts DELETE/INSERT pairs against one table.
-- Each pair deletes the row with a drawn id and then inserts a row with the
-- same id, a separately drawn k, and random c and pad strings.
function execute_delete_inserts()
   local tnum = get_table_num()

   for _ = 1, sysbench.opt.delete_inserts do
      local row_id = get_id()
      local k_value = get_id()

      local delete_binds = param[tnum].deletes
      local insert_binds = param[tnum].inserts

      delete_binds[1]:set(row_id)

      insert_binds[1]:set(row_id)
      insert_binds[2]:set(k_value)
      insert_binds[3]:set_rand_str(c_value_template)
      insert_binds[4]:set_rand_str(pad_value_template)

      -- All parameters are set before either statement runs
      stmt[tnum].deletes:execute()
      stmt[tnum].inserts:execute()
   end
end
491
-- Re-prepare statements if we have reconnected, which is possible when some of
-- the listed error codes are in the --mysql-ignore-errors list
--
-- errdesc  error description; only its sql_errno field is read here
function sysbench.hooks.before_restart_event(errdesc)
   local errno = errdesc.sql_errno

   local connection_lost =
      errno == 2006 or -- CR_SERVER_GONE_ERROR
      errno == 2011 or -- CR_TCP_CONNECTION
      errno == 2013 or -- CR_SERVER_LOST
      errno == 2055    -- CR_SERVER_LOST_EXTENDED

   -- Any other error leaves the prepared statements untouched
   if not connection_lost then
      return
   end

   close_statements()
   prepare_statements()
end
504