1 /*
2 ** 2014-12-11
3 **
4 ** The author disclaims copyright to this source code.  In place of
5 ** a legal notice, here is a blessing:
6 **
7 **    May you do good and not evil.
8 **    May you find forgiveness for yourself and forgive others.
9 **    May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file implements a simple standalone program used to stress the
13 ** SQLite library when accessing the same set of databases simultaneously
14 ** from multiple threads in shared-cache mode.
15 **
16 ** This test program runs on unix-like systems only.  It uses pthreads.
17 ** To compile:
18 **
19 **     gcc -g -Wall -I. threadtest4.c sqlite3.c -ldl -lpthread
20 **
21 ** To run:
22 **
23 **     ./a.out 10
24 **
25 ** The argument is the number of threads.  There are also options, such
26 ** as -wal and -multithread and -serialized.
27 **
28 ** Consider also compiling with clang instead of gcc and adding the
29 ** -fsanitize=thread option.
30 */
31 #include "sqlite3.h"
32 #include <pthread.h>
33 #include <sched.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <unistd.h>
38 #include <stdarg.h>
39 
40 /*
41 ** An instance of the following structure is passed into each worker
42 ** thread.
43 */
44 typedef struct WorkerInfo WorkerInfo;
45 struct WorkerInfo {
46   int tid;                    /* Thread ID */
47   int nWorker;                /* Total number of workers */
48   unsigned wkrFlags;          /* Flags */
49   sqlite3 *mainDb;            /* Database connection of the main thread */
50   sqlite3 *db;                /* Database connection of this thread */
51   int nErr;                   /* Number of errors seen by this thread */
52   int nTest;                  /* Number of tests run by this thread */
53   char *zMsg;                 /* Message returned by this thread */
54   pthread_t id;               /* Thread id */
55   pthread_mutex_t *pWrMutex;  /* Hold this mutex while writing */
56 };
57 
58 /*
59 ** Allowed values for WorkerInfo.wkrFlags
60 */
61 #define TT4_SERIALIZED    0x0000001   /* The --serialized option is used */
62 #define TT4_WAL           0x0000002   /* WAL mode in use */
63 #define TT4_TRACE         0x0000004   /* Trace activity */
64 
65 
66 /*
67 ** Report an OOM error and die if the argument is NULL
68 */
check_oom(void * x)69 static void check_oom(void *x){
70   if( x==0 ){
71     fprintf(stderr, "out of memory\n");
72     exit(1);
73   }
74 }
75 
76 /*
77 ** Allocate memory.  If the allocation fails, print an error message and
78 ** kill the process.
79 */
safe_malloc(int sz)80 static void *safe_malloc(int sz){
81   void *x = sqlite3_malloc(sz>0?sz:1);
82   check_oom(x);
83   return x;
84 }
85 
86 /*
87 ** Print a trace message for a worker
88 */
worker_trace(WorkerInfo * p,const char * zFormat,...)89 static void worker_trace(WorkerInfo *p, const char *zFormat, ...){
90   va_list ap;
91   char *zMsg;
92   if( (p->wkrFlags & TT4_TRACE)==0 ) return;
93   va_start(ap, zFormat);
94   zMsg = sqlite3_vmprintf(zFormat, ap);
95   check_oom(zMsg);
96   va_end(ap);
97   fprintf(stderr, "TRACE(%02d): %s\n", p->tid, zMsg);
98   sqlite3_free(zMsg);
99 }
100 
101 /*
102 ** Prepare a single SQL query
103 */
prep_sql(sqlite3 * db,const char * zFormat,...)104 static sqlite3_stmt *prep_sql(sqlite3 *db, const char *zFormat, ...){
105   va_list ap;
106   char *zSql;
107   int rc;
108   sqlite3_stmt *pStmt = 0;
109 
110   va_start(ap, zFormat);
111   zSql = sqlite3_vmprintf(zFormat, ap);
112   va_end(ap);
113   check_oom(zSql);
114   rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
115   if( rc!=SQLITE_OK ){
116     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
117             rc, sqlite3_extended_errcode(db), sqlite3_errmsg(db), zSql);
118     exit(1);
119   }
120   sqlite3_free(zSql);
121   return pStmt;
122 }
123 
124 /*
125 ** Run a SQL statements.  Panic if unable.
126 */
run_sql(WorkerInfo * p,const char * zFormat,...)127 static void run_sql(WorkerInfo *p, const char *zFormat, ...){
128   va_list ap;
129   char *zSql;
130   int rc;
131   sqlite3_stmt *pStmt = 0;
132   int nRetry = 0;
133 
134   va_start(ap, zFormat);
135   zSql = sqlite3_vmprintf(zFormat, ap);
136   va_end(ap);
137   check_oom(zSql);
138   rc = sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0);
139   if( rc!=SQLITE_OK ){
140     fprintf(stderr, "SQL error (%d,%d): %s\nWhile preparing: [%s]\n",
141             rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
142     exit(1);
143   }
144   worker_trace(p, "running [%s]", zSql);
145   while( (rc = sqlite3_step(pStmt))!=SQLITE_DONE ){
146     if( (rc&0xff)==SQLITE_BUSY || (rc&0xff)==SQLITE_LOCKED ){
147       sqlite3_reset(pStmt);
148       nRetry++;
149       if( nRetry<10 ){
150         worker_trace(p, "retry %d for [%s]", nRetry, zSql);
151         sched_yield();
152         continue;
153       }else{
154         fprintf(stderr, "Deadlock in thread %d while running [%s]\n",
155                 p->tid, zSql);
156         exit(1);
157       }
158     }
159     if( rc!=SQLITE_ROW ){
160       fprintf(stderr, "SQL error (%d,%d): %s\nWhile running [%s]\n",
161               rc, sqlite3_extended_errcode(p->db), sqlite3_errmsg(p->db), zSql);
162       exit(1);
163     }
164   }
165   sqlite3_free(zSql);
166   sqlite3_finalize(pStmt);
167 }
168 
169 
170 /*
171 ** Open the database connection for WorkerInfo.  The order in which
172 ** the files are opened is a function of the tid value.
173 */
worker_open_connection(WorkerInfo * p,int iCnt)174 static void worker_open_connection(WorkerInfo *p, int iCnt){
175   char *zFile;
176   int x;
177   int rc;
178   static const unsigned char aOrder[6][3] = {
179     { 1, 2, 3},
180     { 1, 3, 2},
181     { 2, 1, 3},
182     { 2, 3, 1},
183     { 3, 1, 2},
184     { 3, 2, 1}
185   };
186   x = (p->tid + iCnt) % 6;
187   zFile = sqlite3_mprintf("tt4-test%d.db", aOrder[x][0]);
188   check_oom(zFile);
189   worker_trace(p, "open %s", zFile);
190   rc = sqlite3_open_v2(zFile, &p->db,
191                        SQLITE_OPEN_READWRITE|SQLITE_OPEN_SHAREDCACHE, 0);
192   if( rc!=SQLITE_OK ){
193     fprintf(stderr, "sqlite_open_v2(%s) failed on thread %d\n",
194             zFile, p->tid);
195     exit(1);
196   }
197   sqlite3_free(zFile);
198   run_sql(p, "PRAGMA read_uncommitted=ON;");
199   sqlite3_busy_timeout(p->db, 10000);
200   run_sql(p, "PRAGMA synchronous=OFF;");
201   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux1", aOrder[x][1]);
202   run_sql(p, "ATTACH 'tt4-test%d.db' AS aux2", aOrder[x][2]);
203 }
204 
205 /*
206 ** Close the worker database connection
207 */
worker_close_connection(WorkerInfo * p)208 static void worker_close_connection(WorkerInfo *p){
209   if( p->db ){
210     worker_trace(p, "close");
211     sqlite3_close(p->db);
212     p->db = 0;
213   }
214 }
215 
216 /*
217 ** Delete all content in the three databases associated with a
218 ** single thread.  Make this happen all in a single transaction if
219 ** inTrans is true, or separately for each database if inTrans is
220 ** false.
221 */
worker_delete_all_content(WorkerInfo * p,int inTrans)222 static void worker_delete_all_content(WorkerInfo *p, int inTrans){
223   if( inTrans ){
224     pthread_mutex_lock(p->pWrMutex);
225     run_sql(p, "BEGIN");
226     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
227     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
228     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
229     run_sql(p, "COMMIT");
230     pthread_mutex_unlock(p->pWrMutex);
231     p->nTest++;
232   }else{
233     pthread_mutex_lock(p->pWrMutex);
234     run_sql(p, "DELETE FROM t1 WHERE tid=%d", p->tid);
235     pthread_mutex_unlock(p->pWrMutex);
236     p->nTest++;
237     pthread_mutex_lock(p->pWrMutex);
238     run_sql(p, "DELETE FROM t2 WHERE tid=%d", p->tid);
239     pthread_mutex_unlock(p->pWrMutex);
240     p->nTest++;
241     pthread_mutex_lock(p->pWrMutex);
242     run_sql(p, "DELETE FROM t3 WHERE tid=%d", p->tid);
243     pthread_mutex_unlock(p->pWrMutex);
244     p->nTest++;
245   }
246 }
247 
248 /*
249 ** Create rows mn through mx in table iTab for the given worker
250 */
worker_add_content(WorkerInfo * p,int mn,int mx,int iTab)251 static void worker_add_content(WorkerInfo *p, int mn, int mx, int iTab){
252   char *zTabDef;
253   switch( iTab ){
254     case 1:  zTabDef = "t1(tid,sp,a,b,c)";  break;
255     case 2:  zTabDef = "t2(tid,sp,d,e,f)";  break;
256     case 3:  zTabDef = "t3(tid,sp,x,y,z)";  break;
257   }
258   pthread_mutex_lock(p->pWrMutex);
259   run_sql(p,
260      "WITH RECURSIVE\n"
261      " c(i) AS (VALUES(%d) UNION ALL SELECT i+1 FROM c WHERE i<%d)\n"
262      "INSERT INTO %s SELECT %d, zeroblob(3000), i, printf('%%d',i), i FROM c;",
263      mn, mx, zTabDef, p->tid
264   );
265   pthread_mutex_unlock(p->pWrMutex);
266   p->nTest++;
267 }
268 
269 /*
270 ** Set an error message on a worker
271 */
worker_error(WorkerInfo * p,const char * zFormat,...)272 static void worker_error(WorkerInfo *p, const char *zFormat, ...){
273   va_list ap;
274   p->nErr++;
275   sqlite3_free(p->zMsg);
276   va_start(ap, zFormat);
277   p->zMsg = sqlite3_vmprintf(zFormat, ap);
278   va_end(ap);
279 }
280 
281 /*
282 ** Each thread runs the following function.
283 */
worker_thread(void * pArg)284 static void *worker_thread(void *pArg){
285   WorkerInfo *p = (WorkerInfo*)pArg;
286   int iOuter;
287   int i;
288   int rc;
289   sqlite3_stmt *pStmt;
290 
291   printf("worker %d startup\n", p->tid);  fflush(stdout);
292   for(iOuter=1; iOuter<=p->nWorker; iOuter++){
293     worker_open_connection(p, iOuter);
294     for(i=0; i<4; i++){
295       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter)%3 + 1);
296       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+1)%3 + 1);
297       worker_add_content(p, i*100+1, (i+1)*100, (p->tid+iOuter+2)%3 + 1);
298     }
299 
300     pStmt = prep_sql(p->db, "SELECT count(a) FROM t1 WHERE tid=%d", p->tid);
301     worker_trace(p, "query [%s]", sqlite3_sql(pStmt));
302     rc = sqlite3_step(pStmt);
303     if( rc!=SQLITE_ROW ){
304       worker_error(p, "Failed to step: %s", sqlite3_sql(pStmt));
305     }else if( sqlite3_column_int(pStmt, 0)!=400 ){
306       worker_error(p, "Wrong result: %d", sqlite3_column_int(pStmt,0));
307     }
308     sqlite3_finalize(pStmt);
309     if( p->nErr ) break;
310 
311     if( ((iOuter+p->tid)%3)==0 ){
312       sqlite3_db_release_memory(p->db);
313       p->nTest++;
314     }
315 
316     pthread_mutex_lock(p->pWrMutex);
317     run_sql(p, "BEGIN;");
318     run_sql(p, "UPDATE t1 SET c=NULL WHERE a=55");
319     run_sql(p, "UPDATE t2 SET f=NULL WHERE d=42");
320     run_sql(p, "UPDATE t3 SET z=NULL WHERE x=31");
321     run_sql(p, "ROLLBACK;");
322     p->nTest++;
323     pthread_mutex_unlock(p->pWrMutex);
324 
325 
326     if( iOuter==p->tid ){
327       pthread_mutex_lock(p->pWrMutex);
328       run_sql(p, "VACUUM");
329       pthread_mutex_unlock(p->pWrMutex);
330     }
331 
332     pStmt = prep_sql(p->db,
333        "SELECT t1.rowid, t2.rowid, t3.rowid"
334        "  FROM t1, t2, t3"
335        " WHERE t1.tid=%d AND t2.tid=%d AND t3.tid=%d"
336        "   AND t1.a<>t2.d AND t2.d<>t3.x"
337        " ORDER BY 1, 2, 3"
338        ,p->tid, p->tid, p->tid);
339     worker_trace(p, "query [%s]", sqlite3_sql(pStmt));
340     for(i=0; i<p->nWorker; i++){
341       rc = sqlite3_step(pStmt);
342       if( rc!=SQLITE_ROW ){
343         worker_error(p, "Failed to step: %s", sqlite3_sql(pStmt));
344         break;
345       }
346       sched_yield();
347     }
348     sqlite3_finalize(pStmt);
349     if( p->nErr ) break;
350 
351     worker_delete_all_content(p, (p->tid+iOuter)%2);
352     worker_close_connection(p);
353     p->db = 0;
354   }
355   worker_close_connection(p);
356   printf("worker %d finished\n", p->tid); fflush(stdout);
357   return 0;
358 }
359 
main(int argc,char ** argv)360 int main(int argc, char **argv){
361   int nWorker = 0;         /* Number of worker threads */
362   int i;                   /* Loop counter */
363   WorkerInfo *aInfo;       /* Information for each worker */
364   unsigned wkrFlags = 0;   /* Default worker flags */
365   int nErr = 0;            /* Number of errors */
366   int nTest = 0;           /* Number of tests */
367   int rc;                  /* Return code */
368   sqlite3 *db = 0;         /* Main database connection */
369   pthread_mutex_t wrMutex; /* The write serialization mutex */
370   WorkerInfo infoTop;      /* WorkerInfo for the main thread */
371   WorkerInfo *p;           /* Pointer to infoTop */
372 
373   sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
374   for(i=1; i<argc; i++){
375     const char *z = argv[i];
376     if( z[0]=='-' ){
377       if( z[1]=='-' && z[2]!=0 ) z++;
378       if( strcmp(z,"-multithread")==0 ){
379         sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
380         wkrFlags &= ~TT4_SERIALIZED;
381       }else if( strcmp(z,"-serialized")==0 ){
382         sqlite3_config(SQLITE_CONFIG_SERIALIZED);
383         wkrFlags |= TT4_SERIALIZED;
384       }else if( strcmp(z,"-wal")==0 ){
385         wkrFlags |= TT4_WAL;
386       }else if( strcmp(z,"-trace")==0 ){
387         wkrFlags |= TT4_TRACE;
388       }else{
389         fprintf(stderr, "unknown command-line option: %s\n", argv[i]);
390         exit(1);
391       }
392     }else if( z[0]>='1' && z[0]<='9' && nWorker==0 ){
393       nWorker = atoi(z);
394       if( nWorker<2 ){
395         fprintf(stderr, "minimum of 2 threads\n");
396         exit(1);
397       }
398     }else{
399       fprintf(stderr, "extra command-line argument: \"%s\"\n", argv[i]);
400       exit(1);
401     }
402   }
403   if( nWorker==0 ){
404     fprintf(stderr,
405        "usage:  %s ?OPTIONS? N\n"
406        "N is the number of threads and must be at least 2.\n"
407        "Options:\n"
408        "  --serialized\n"
409        "  --multithread\n"
410        "  --wal\n"
411        "  --trace\n"
412        ,argv[0]
413     );
414     exit(1);
415   }
416   if( !sqlite3_threadsafe() ){
417     fprintf(stderr, "requires a threadsafe build of SQLite\n");
418     exit(1);
419   }
420   sqlite3_initialize();
421   sqlite3_enable_shared_cache(1);
422   pthread_mutex_init(&wrMutex, 0);
423 
424   /* Initialize the test database files */
425   (void)unlink("tt4-test1.db");
426   (void)unlink("tt4-test2.db");
427   (void)unlink("tt4-test3.db");
428   rc = sqlite3_open("tt4-test1.db", &db);
429   if( rc!=SQLITE_OK ){
430     fprintf(stderr, "Unable to open test database: tt4-test2.db\n");
431     exit(1);
432   }
433   memset(&infoTop, 0, sizeof(infoTop));
434   infoTop.db = db;
435   infoTop.wkrFlags = wkrFlags;
436   p = &infoTop;
437   if( wkrFlags & TT4_WAL ){
438     run_sql(p, "PRAGMA journal_mode=WAL");
439   }
440   run_sql(p, "PRAGMA synchronous=OFF");
441   run_sql(p, "CREATE TABLE IF NOT EXISTS t1(tid INTEGER, sp, a, b, c)");
442   run_sql(p, "CREATE INDEX t1tid ON t1(tid)");
443   run_sql(p, "CREATE INDEX t1ab ON t1(a,b)");
444   run_sql(p, "ATTACH 'tt4-test2.db' AS 'test2'");
445   run_sql(p, "CREATE TABLE IF NOT EXISTS test2.t2(tid INTEGER, sp, d, e, f)");
446   run_sql(p, "CREATE INDEX test2.t2tid ON t2(tid)");
447   run_sql(p, "CREATE INDEX test2.t2de ON t2(d,e)");
448   run_sql(p, "ATTACH 'tt4-test3.db' AS 'test3'");
449   run_sql(p, "CREATE TABLE IF NOT EXISTS test3.t3(tid INTEGER, sp, x, y, z)");
450   run_sql(p, "CREATE INDEX test3.t3tid ON t3(tid)");
451   run_sql(p, "CREATE INDEX test3.t3xy ON t3(x,y)");
452   aInfo = safe_malloc( sizeof(*aInfo)*nWorker );
453   memset(aInfo, 0, sizeof(*aInfo)*nWorker);
454   for(i=0; i<nWorker; i++){
455     aInfo[i].tid = i+1;
456     aInfo[i].nWorker = nWorker;
457     aInfo[i].wkrFlags = wkrFlags;
458     aInfo[i].mainDb = db;
459     aInfo[i].pWrMutex = &wrMutex;
460     rc = pthread_create(&aInfo[i].id, 0, worker_thread, &aInfo[i]);
461     if( rc!=0 ){
462       fprintf(stderr, "thread creation failed for thread %d\n", i+1);
463       exit(1);
464     }
465     sched_yield();
466   }
467   for(i=0; i<nWorker; i++){
468     pthread_join(aInfo[i].id, 0);
469     printf("Joined thread %d: %d errors in %d tests",
470            aInfo[i].tid, aInfo[i].nErr, aInfo[i].nTest);
471     if( aInfo[i].zMsg ){
472       printf(": %s\n", aInfo[i].zMsg);
473     }else{
474       printf("\n");
475     }
476     nErr += aInfo[i].nErr;
477     nTest += aInfo[i].nTest;
478     fflush(stdout);
479   }
480   sqlite3_close(db);
481   sqlite3_free(aInfo);
482   printf("Total %d errors in %d tests\n", nErr, nTest);
483   return nErr;
484 }
485