1 /*
2   This is free and unencumbered software released into the public domain.
3 
4   Anyone is free to copy, modify, publish, use, compile, sell, or
5   distribute this software, either in source code form or as a compiled
6   binary, for any purpose, commercial or non-commercial, and by any
7   means.
8 
9   In jurisdictions that recognize copyright laws, the author or authors
10   of this software dedicate any and all copyright interest in the
11   software to the public domain. We make this dedication for the benefit
12   of the public at large and to the detriment of our heirs and
13   successors. We intend this dedication to be an overt act of
14   relinquishment in perpetuity of all present and future rights to this
15   software under copyright law.
16 
17   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19   MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20   IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21   OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22   ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23   OTHER DEALINGS IN THE SOFTWARE.
24 
25   For more information, please refer to <http://unlicense.org/>
26 */
27 
28 #include <stdio.h>
29 #include <string.h>
30 
31 #include "cassandra.h"
32 
33 #define CONCURRENCY_LEVEL 32
34 #define NUM_REQUESTS 10000
35 
/*
 * File-scope driver handles. main() creates and frees `session` and
 * `uuid_gen`, and fills `prepared` via prepare_insert();
 * insert_into_concurrent_executions() reads all three.
 */
36 CassSession* session = NULL;
37 const CassPrepared* prepared = NULL;
38 CassUuidGen* uuid_gen = NULL;
39 
/*
 * NOTE(review): these two counters are never read in the visible code;
 * insert_into_concurrent_executions() declares locals with the same names
 * that shadow them. Confirm whether anything outside this file uses them.
 */
40 int num_requests = 0;
41 int num_outstanding_requests = 0;
42 
/*
 * Writes the error message attached to `future` to stderr in the form
 * "Error: <message>" followed by a newline.
 */
void print_error(CassFuture* future) {
  const char* text;
  size_t text_length;

  cass_future_error_message(future, &text, &text_length);

  /* The driver hands back a pointer plus a length, so print with "%.*s". */
  fprintf(stderr, "Error: %.*s\n", (int)text_length, text);
}
49 
create_cluster(const char * hosts)50 CassCluster* create_cluster(const char* hosts) {
51   CassCluster* cluster = cass_cluster_new();
52   cass_cluster_set_contact_points(cluster, hosts);
53   return cluster;
54 }
55 
connect_session(CassSession * session,const CassCluster * cluster)56 CassError connect_session(CassSession* session, const CassCluster* cluster) {
57   CassError rc = CASS_OK;
58   CassFuture* future = cass_session_connect(session, cluster);
59 
60   cass_future_wait(future);
61   rc = cass_future_error_code(future);
62   if (rc != CASS_OK) {
63     print_error(future);
64   }
65   cass_future_free(future);
66 
67   return rc;
68 }
69 
execute_query(CassSession * session,const char * query)70 CassError execute_query(CassSession* session, const char* query) {
71   CassError rc = CASS_OK;
72   CassStatement* statement = cass_statement_new(query, 0);
73 
74   CassFuture* future = cass_session_execute(session, statement);
75   cass_future_wait(future);
76 
77   rc = cass_future_error_code(future);
78   if (rc != CASS_OK) {
79     print_error(future);
80   }
81 
82   cass_statement_free(statement);
83   cass_future_free(future);
84   return rc;
85 }
86 
prepare_insert(CassSession * session,const CassPrepared ** prepared)87 CassError prepare_insert(CassSession* session, const CassPrepared** prepared) {
88   CassError rc = CASS_OK;
89   const char* query = "INSERT INTO examples.concurrent_executions (id, value) VALUES (?, ?);";
90 
91   CassFuture* future = cass_session_prepare(session, query);
92   cass_future_wait(future);
93 
94   rc = cass_future_error_code(future);
95   if (rc != CASS_OK) {
96     print_error(future);
97   } else {
98     *prepared = cass_future_get_prepared(future);
99   }
100 
101   cass_future_free(future);
102 
103   return rc;
104 }
105 
insert_into_concurrent_executions()106 void insert_into_concurrent_executions() {
107   CassFuture* futures[CONCURRENCY_LEVEL];
108   int num_requests = NUM_REQUESTS;
109 
110   while (num_requests > 0) {
111     int i;
112     int num_outstanding_requests = CONCURRENCY_LEVEL;
113     if (num_requests < num_outstanding_requests) {
114       num_outstanding_requests = num_requests;
115     }
116     num_requests -= num_outstanding_requests;
117 
118     for (i = 0; i < num_outstanding_requests; ++i) {
119       CassUuid uuid;
120       char value_buffer[16];
121       CassStatement* statement = cass_prepared_bind(prepared);
122       cass_statement_set_is_idempotent(statement, cass_true);
123       cass_uuid_gen_random(uuid_gen, &uuid);
124       cass_statement_bind_uuid_by_name(statement, "id", uuid);
125       sprintf(value_buffer, "%d", i);
126       cass_statement_bind_string_by_name(statement, "value", value_buffer);
127 
128       futures[i] = cass_session_execute(session, statement);
129       cass_statement_free(statement);
130     }
131 
132     for (i = 0; i < num_outstanding_requests; ++i) {
133       CassFuture* future = futures[i];
134       CassError rc = cass_future_error_code(future);
135       if (rc != CASS_OK) {
136         print_error(future);
137       }
138       cass_future_free(future);
139     }
140   }
141 }
142 
main(int argc,char * argv[])143 int main(int argc, char* argv[]) {
144   CassCluster* cluster = NULL;
145   char* hosts = "127.0.0.1";
146   if (argc > 1) {
147     hosts = argv[1];
148   }
149   session = cass_session_new();
150   uuid_gen = cass_uuid_gen_new();
151   cluster = create_cluster(hosts);
152 
153   if (connect_session(session, cluster) != CASS_OK) {
154     cass_uuid_gen_free(uuid_gen);
155     cass_cluster_free(cluster);
156     cass_session_free(session);
157     return -1;
158   }
159 
160   execute_query(session, "CREATE KEYSPACE IF NOT EXISTS examples WITH replication = { \
161                                                         'class': 'SimpleStrategy', \
162                                                         'replication_factor': '1' }");
163   execute_query(session, "CREATE TABLE IF NOT EXISTS examples.concurrent_executions ( \
164                                                      id uuid, \
165                                                      value text, \
166                                                      PRIMARY KEY (id))");
167 
168   if (prepare_insert(session, &prepared) == CASS_OK) {
169     insert_into_concurrent_executions();
170     cass_prepared_free(prepared);
171   }
172 
173   cass_uuid_gen_free(uuid_gen);
174   cass_cluster_free(cluster);
175   cass_session_free(session);
176 
177   return 0;
178 }
179