1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1997, 2013 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 #include "ex_sql_utils.h"
9 
10 /*
11  * Create dozens of writer threads to insert data in parallel.
12  * DBSQL will not call the busy callback. It blocks instead.
13  */
14 
15 typedef struct {
16 	const char* db_name;        /* The filename of db. */
17 	int num_of_records;   /* The number of records to insert. */
18 	int thread_sn;              /* Serial number of thread. */
19 } thread_attr;
20 
21 /*
22  * Define the writer thread's workload.
23  * The writer would insert 5000 records in its thread. Commit if succeeded
24  * and rollback if failed.
25  */
26 static void*
writer(arg)27 writer(arg)
28 	void *arg;
29 {
30 	const char* sql;
31 	int txn_begin;
32 	int num_of_records;
33 	int thread_sn;
34 	int i, rc;
35 	sqlite3* db;
36 	sqlite3_stmt* stmt;
37 
38 	txn_begin = 0; /* Mark that explicit txn does not begin yet. */
39 
40 	/* Open database. */
41 	sqlite3_open(((thread_attr *)arg)->db_name, &db);
42 	error_handler(db);
43 
44 	/* Fetch attributes. */
45 	num_of_records = ((thread_attr *)arg)->num_of_records;
46 	thread_sn = ((thread_attr *)arg)->thread_sn;
47 
48 	/* Prepare the statement for use, many times over. */
49 	sql = "INSERT INTO university VALUES"
50 	      "(147, 'Tsinghua University China', 'tsinghua.edu.cn',"
51 	      "'cn', 'Asia', 237,63,432,303);";
52 	rc = sqlite3_prepare_v2(db, sql, (int)strlen(sql), &stmt, NULL);
53 	if (rc != SQLITE_OK)
54 		goto cleanup;
55 
56 	/*
57 	 * When we insert data many times over, we shall use explicit
58 	 * transaction to speed up the operations.
59 	 */
60 	rc = sqlite3_exec(db, "BEGIN TRANSACTION", NULL, 0, NULL);
61 	if (rc != SQLITE_OK)
62 		goto cleanup;
63 	txn_begin = 1; /* Mark that explicit txn began. */
64 
65 	for (i = 0; i < num_of_records; i++) {
66 		rc = sqlite3_step(stmt);
67 		/*
68 		 * Even if we encounter errors, the statement still has
69 		 * to be reset. Otherwise following rollback always
70 		 * hits SQLITE_BUSY
71 		 */
72 		sqlite3_reset(stmt);
73 		if (rc != SQLITE_DONE) {
74 			/* We can not return here. Rollback is required. */
75 			goto cleanup;
76 			break;
77 		}
78 	}
79 
80 	/* Commit if no errors. */
81 	rc = sqlite3_exec(db, "COMMIT TRANSACTION", NULL, 0, NULL);
82 	if (rc != SQLITE_OK)
83 		goto cleanup;
84 
85 cleanup:
86 	/* Error handle. */
87 	if (rc != SQLITE_OK && rc != SQLITE_DONE) {
88 		fprintf(stderr, "ERROR: %s. ERRCODE: %d.\n",
89 			sqlite3_errmsg(db), rc);
90 		/* Rollback if explict txn had begined. */
91 		if (txn_begin)
92 			sqlite3_exec(db, "ROLLBACK TRANSACTION", NULL, 0, NULL);
93 	}
94 
95 	/* Final cleanup. */
96 	sqlite3_finalize(stmt);
97 
98 	sqlite3_close(db);
99 	return NULL;
100 }
101 
102 /* Example body. */
103 static int
ex_sql_multi_thread(db,db_name)104 ex_sql_multi_thread(db, db_name)
105 	db_handle *db;
106 	const char* db_name;
107 {
108 	const char* sql;
109 	int nthreads;
110 	int ninsert;
111 	int i;
112 	thread_attr attr;
113 	os_thread_t pid;
114 
115 	nthreads = 20;
116 	ninsert  = 5000;
117 
118 	/* Display current status. */
119 	echo_info("Check existing record number of the table");
120 	sql = "SELECT count(*) FROM university;";
121 	exec_sql(db, sql);
122 
123 	/*
124 	 * Create n threads and write in parallel.
125 	 */
126 	echo_info("Now we begin to insert records by multi-writers.");
127 	attr.db_name = db_name;
128 	attr.num_of_records = ninsert;
129 
130 	for (i = 0; i < nthreads; i++) {
131 		attr.thread_sn = i;
132 		if (os_thread_create(&pid, writer, (void *)&attr)) {
133 			register_thread_id(pid);
134 			printf("%02dth writer starts to write %d rows\n",
135 				i + 1, ninsert);
136 			sqlite3_sleep(20);	/* Milliseconds. */
137 		} else {
138 			fprintf(stderr, "Failed to create thread\n");
139 		}
140 	}
141 	join_threads();
142 
143 	/* Display result. */
144 	echo_info("Check existing record number of the table");
145 	sql = "SELECT count(*) FROM university;";
146 	exec_sql(db, sql);
147 
148 	return 0;
149 }
150 
151 int
main()152 main()
153 {
154 	db_handle *db;
155 	const char* db_name = "ex_sql_multi_thread.db";
156 
157 	/* Check if current lib is threadsafe. */
158 	if(!sqlite3_threadsafe()) {
159 		fprintf(stderr,
160 			"ERROR: The libsqlite version is NOT threadsafe!\n");
161 		exit(EXIT_FAILURE);
162 	}
163 
164 	/* Setup environment and preload data. */
165 	db = setup(db_name);
166 	load_table_from_file(db, university_sample_data, 1/* Silent */);
167 
168 	/* Run example. */
169 	ex_sql_multi_thread(db, db_name);
170 
171 	/* End. */
172 	cleanup(db);
173 	return 0;
174 }
175 
176