1 /*
2 This is free and unencumbered software released into the public domain.
3
4 Anyone is free to copy, modify, publish, use, compile, sell, or
5 distribute this software, either in source code form or as a compiled
6 binary, for any purpose, commercial or non-commercial, and by any
7 means.
8
9 In jurisdictions that recognize copyright laws, the author or authors
10 of this software dedicate any and all copyright interest in the
11 software to the public domain. We make this dedication for the benefit
12 of the public at large and to the detriment of our heirs and
13 successors. We intend this dedication to be an overt act of
14 relinquishment in perpetuity of all present and future rights to this
15 software under copyright law.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20 IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21 OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 OTHER DEALINGS IN THE SOFTWARE.
24
25 For more information, please refer to <http://unlicense.org/>
26 */
27
28 #include <stdio.h>
29 #include <string.h>
30
31 #include "cassandra.h"
32
33 #define CONCURRENCY_LEVEL 32
34 #define NUM_REQUESTS 10000
35
36 CassSession* session = NULL;
37 const CassPrepared* prepared = NULL;
38 CassUuidGen* uuid_gen = NULL;
39
40 int num_requests = 0;
41 int num_outstanding_requests = 0;
42
print_error(CassFuture * future)43 void print_error(CassFuture* future) {
44 const char* message;
45 size_t message_length;
46 cass_future_error_message(future, &message, &message_length);
47 fprintf(stderr, "Error: %.*s\n", (int)message_length, message);
48 }
49
create_cluster(const char * hosts)50 CassCluster* create_cluster(const char* hosts) {
51 CassCluster* cluster = cass_cluster_new();
52 cass_cluster_set_contact_points(cluster, hosts);
53 return cluster;
54 }
55
connect_session(CassSession * session,const CassCluster * cluster)56 CassError connect_session(CassSession* session, const CassCluster* cluster) {
57 CassError rc = CASS_OK;
58 CassFuture* future = cass_session_connect(session, cluster);
59
60 cass_future_wait(future);
61 rc = cass_future_error_code(future);
62 if (rc != CASS_OK) {
63 print_error(future);
64 }
65 cass_future_free(future);
66
67 return rc;
68 }
69
execute_query(CassSession * session,const char * query)70 CassError execute_query(CassSession* session, const char* query) {
71 CassError rc = CASS_OK;
72 CassStatement* statement = cass_statement_new(query, 0);
73
74 CassFuture* future = cass_session_execute(session, statement);
75 cass_future_wait(future);
76
77 rc = cass_future_error_code(future);
78 if (rc != CASS_OK) {
79 print_error(future);
80 }
81
82 cass_statement_free(statement);
83 cass_future_free(future);
84 return rc;
85 }
86
prepare_insert(CassSession * session,const CassPrepared ** prepared)87 CassError prepare_insert(CassSession* session, const CassPrepared** prepared) {
88 CassError rc = CASS_OK;
89 const char* query = "INSERT INTO examples.concurrent_executions (id, value) VALUES (?, ?);";
90
91 CassFuture* future = cass_session_prepare(session, query);
92 cass_future_wait(future);
93
94 rc = cass_future_error_code(future);
95 if (rc != CASS_OK) {
96 print_error(future);
97 } else {
98 *prepared = cass_future_get_prepared(future);
99 }
100
101 cass_future_free(future);
102
103 return rc;
104 }
105
insert_into_concurrent_executions()106 void insert_into_concurrent_executions() {
107 CassFuture* futures[CONCURRENCY_LEVEL];
108 int num_requests = NUM_REQUESTS;
109
110 while (num_requests > 0) {
111 int i;
112 int num_outstanding_requests = CONCURRENCY_LEVEL;
113 if (num_requests < num_outstanding_requests) {
114 num_outstanding_requests = num_requests;
115 }
116 num_requests -= num_outstanding_requests;
117
118 for (i = 0; i < num_outstanding_requests; ++i) {
119 CassUuid uuid;
120 char value_buffer[16];
121 CassStatement* statement = cass_prepared_bind(prepared);
122 cass_statement_set_is_idempotent(statement, cass_true);
123 cass_uuid_gen_random(uuid_gen, &uuid);
124 cass_statement_bind_uuid_by_name(statement, "id", uuid);
125 sprintf(value_buffer, "%d", i);
126 cass_statement_bind_string_by_name(statement, "value", value_buffer);
127
128 futures[i] = cass_session_execute(session, statement);
129 cass_statement_free(statement);
130 }
131
132 for (i = 0; i < num_outstanding_requests; ++i) {
133 CassFuture* future = futures[i];
134 CassError rc = cass_future_error_code(future);
135 if (rc != CASS_OK) {
136 print_error(future);
137 }
138 cass_future_free(future);
139 }
140 }
141 }
142
main(int argc,char * argv[])143 int main(int argc, char* argv[]) {
144 CassCluster* cluster = NULL;
145 char* hosts = "127.0.0.1";
146 if (argc > 1) {
147 hosts = argv[1];
148 }
149 session = cass_session_new();
150 uuid_gen = cass_uuid_gen_new();
151 cluster = create_cluster(hosts);
152
153 if (connect_session(session, cluster) != CASS_OK) {
154 cass_uuid_gen_free(uuid_gen);
155 cass_cluster_free(cluster);
156 cass_session_free(session);
157 return -1;
158 }
159
160 execute_query(session, "CREATE KEYSPACE IF NOT EXISTS examples WITH replication = { \
161 'class': 'SimpleStrategy', \
162 'replication_factor': '1' }");
163 execute_query(session, "CREATE TABLE IF NOT EXISTS examples.concurrent_executions ( \
164 id uuid, \
165 value text, \
166 PRIMARY KEY (id))");
167
168 if (prepare_insert(session, &prepared) == CASS_OK) {
169 insert_into_concurrent_executions();
170 cass_prepared_free(prepared);
171 }
172
173 cass_uuid_gen_free(uuid_gen);
174 cass_cluster_free(cluster);
175 cass_session_free(session);
176
177 return 0;
178 }
179