1 /*
2   This is free and unencumbered software released into the public domain.
3 
4   Anyone is free to copy, modify, publish, use, compile, sell, or
5   distribute this software, either in source code form or as a compiled
6   binary, for any purpose, commercial or non-commercial, and by any
7   means.
8 
9   In jurisdictions that recognize copyright laws, the author or authors
10   of this software dedicate any and all copyright interest in the
11   software to the public domain. We make this dedication for the benefit
12   of the public at large and to the detriment of our heirs and
13   successors. We intend this dedication to be an overt act of
14   relinquishment in perpetuity of all present and future rights to this
15   software under copyright law.
16 
17   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19   MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20   IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21   OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22   ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23   OTHER DEALINGS IN THE SOFTWARE.
24 
25   For more information, please refer to <http://unlicense.org/>
26 */
27 
28 #include <assert.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 
33 #include <uv.h>
34 
35 #include "cassandra.h"
36 
37 uv_mutex_t mutex;
38 uv_cond_t cond;
39 int exit_flag = 0;
40 CassUuidGen* uuid_gen = NULL;
41 
wait_exit()42 void wait_exit() {
43   uv_mutex_lock(&mutex);
44   while (exit_flag == 0) {
45     uv_cond_wait(&cond, &mutex);
46   }
47   uv_mutex_unlock(&mutex);
48 }
49 
signal_exit()50 void signal_exit() {
51   uv_mutex_lock(&mutex);
52   exit_flag = 1;
53   uv_cond_signal(&cond);
54   uv_mutex_unlock(&mutex);
55 }
56 
57 void on_set_keyspace(CassFuture* future, void* data);
58 void on_create_keyspace(CassFuture* future, void* data);
59 void on_create_table(CassFuture* future, void* data);
60 
61 void on_insert(CassFuture* future, void* data);
62 void on_select(CassFuture* future, void* data);
63 
64 void on_session_connect(CassFuture* future, void* data);
65 
/*
 * Writes the error message attached to `future` to stderr in the form
 * "Error: <message>" followed by a newline.
 */
void print_error(CassFuture* future) {
  const char* text;
  size_t text_length;

  cass_future_error_message(future, &text, &text_length);

  /* The driver reports an explicit length, so print with a precision. */
  fprintf(stderr, "Error: %.*s\n", (int)text_length, text);
}
72 
create_cluster(const char * hosts)73 CassCluster* create_cluster(const char* hosts) {
74   CassCluster* cluster = cass_cluster_new();
75   cass_cluster_set_contact_points(cluster, hosts);
76   return cluster;
77 }
78 
/*
 * Starts connecting `session` to `cluster` and registers `callback` on the
 * resulting future. The session pointer is passed to the callback as its
 * `data` argument.
 */
void connect_session(CassSession* session, const CassCluster* cluster,
                     CassFutureCallback callback) {
  CassFuture* connect_future = cass_session_connect(session, cluster);

  cass_future_set_callback(connect_future, callback, session);

  /* The local future handle is released as soon as the callback is set. */
  cass_future_free(connect_future);
}
85 
/*
 * Executes `query` (a statement with no bound parameters) on `session` and
 * registers `callback` on the resulting future. The session pointer is
 * passed to the callback as its `data` argument.
 */
void execute_query(CassSession* session, const char* query, CassFutureCallback callback) {
  CassStatement* stmt = cass_statement_new(query, 0);
  CassFuture* pending = cass_session_execute(session, stmt);

  cass_future_set_callback(pending, callback, session);

  /* Release the local handles: future first, then the statement. */
  cass_future_free(pending);
  cass_statement_free(stmt);
}
93 
/*
 * Callback for the session-connect future; `data` is the CassSession.
 *
 * On success it issues the CREATE KEYSPACE query, continuing in
 * on_create_keyspace(). On failure it prints the error and signals exit.
 */
void on_session_connect(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) == CASS_OK) {
    execute_query(session,
                  "CREATE KEYSPACE examples WITH replication = { "
                  "'class': 'SimpleStrategy', 'replication_factor': '3' };",
                  on_create_keyspace);
    return;
  }

  /* Connection failed: nothing else can run, so report and stop. */
  print_error(future);
  signal_exit();
}
109 
/*
 * Callback for the CREATE KEYSPACE query; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain: the "USE examples" query
 * is issued in every case, continuing in on_set_keyspace().
 */
void on_create_keyspace(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  execute_query(session, "USE examples", on_set_keyspace);
}
118 
/*
 * Callback for the "USE examples" query; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain: the CREATE TABLE query is
 * issued in every case, continuing in on_create_table().
 */
void on_set_keyspace(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  execute_query(session,
                "CREATE TABLE callbacks "
                "(key timeuuid PRIMARY KEY, value bigint)",
                on_create_table);
}
130 
/*
 * Callback for the CREATE TABLE query; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain. A single row is then
 * inserted: the key is a time-based UUID from the global generator and the
 * value is that UUID's timestamp. Continues in on_insert().
 */
void on_create_table(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;
  CassStatement* insert_statement = NULL;
  CassFuture* pending = NULL;
  CassUuid row_key;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  /* Two bind markers: index 0 is the key, index 1 is the value. */
  insert_statement = cass_statement_new("INSERT INTO callbacks (key, value) "
                                        "VALUES (?, ?)",
                                        2);

  cass_uuid_gen_time(uuid_gen, &row_key);
  cass_statement_bind_uuid(insert_statement, 0, row_key);
  cass_statement_bind_int64(insert_statement, 1, cass_uuid_timestamp(row_key));

  pending = cass_session_execute(session, insert_statement);
  cass_future_set_callback(pending, on_insert, data);

  /* Release the local handles: statement first, then the future. */
  cass_statement_free(insert_statement);
  cass_future_free(pending);
}
156 
/*
 * Callback for the INSERT query; `data` is the CassSession.
 *
 * On failure it prints the error and signals exit. On success it issues
 * "SELECT * FROM callbacks", continuing in on_select().
 */
void on_insert(CassFuture* future, void* data) {
  CassStatement* select_statement = NULL;
  CassFuture* pending = NULL;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
    signal_exit();
    return;
  }

  select_statement = cass_statement_new("SELECT * FROM callbacks", 0);
  pending = cass_session_execute((CassSession*)data, select_statement);

  cass_future_set_callback(pending, on_select, data);

  /* Release the local handles: statement first, then the future. */
  cass_statement_free(select_statement);
  cass_future_free(pending);
}
173 
/*
 * Callback for the SELECT query; last step of the chain.
 *
 * On success every returned row is printed to stdout as "<uuid>, <value>".
 * On failure the error is printed. Exit is signalled in both cases.
 */
void on_select(CassFuture* future, void* data) {
  if (cass_future_error_code(future) == CASS_OK) {
    const CassResult* rows = cass_future_get_result(future);
    CassIterator* cursor = cass_iterator_from_result(rows);

    while (cass_iterator_next(cursor)) {
      const CassRow* row = cass_iterator_get_row(cursor);
      char key_text[CASS_UUID_STRING_LENGTH];
      cass_uint64_t value = 0;
      CassUuid key;

      /* Column 0 is the UUID key, column 1 the 64-bit value. */
      cass_value_get_uuid(cass_row_get_column(row, 0), &key);
      cass_uuid_string(key, key_text);
      cass_value_get_int64(cass_row_get_column(row, 1), (cass_int64_t*)&value);

      printf("%s, %llu\n", key_text, (unsigned long long)value);
    }

    cass_iterator_free(cursor);
    cass_result_free(rows);
  } else {
    print_error(future);
  }

  /* Wake main() whether or not the query succeeded. */
  signal_exit();
}
200 
main(int argc,char * argv[])201 int main(int argc, char* argv[]) {
202   CassCluster* cluster = NULL;
203   CassSession* session = cass_session_new();
204   char* hosts = "127.0.0.1";
205   if (argc > 1) {
206     hosts = argv[1];
207   }
208   cluster = create_cluster(hosts);
209 
210   uuid_gen = cass_uuid_gen_new();
211 
212   uv_mutex_init(&mutex);
213   uv_cond_init(&cond);
214 
215   connect_session(session, cluster, on_session_connect);
216 
217   /* Code running in parallel with queries */
218 
219   wait_exit();
220 
221   uv_cond_destroy(&cond);
222   uv_mutex_destroy(&mutex);
223 
224   cass_cluster_free(cluster);
225   cass_uuid_gen_free(uuid_gen);
226   cass_session_free(session);
227 
228   return 0;
229 }
230