1 /*
2 This is free and unencumbered software released into the public domain.
3
4 Anyone is free to copy, modify, publish, use, compile, sell, or
5 distribute this software, either in source code form or as a compiled
6 binary, for any purpose, commercial or non-commercial, and by any
7 means.
8
9 In jurisdictions that recognize copyright laws, the author or authors
10 of this software dedicate any and all copyright interest in the
11 software to the public domain. We make this dedication for the benefit
12 of the public at large and to the detriment of our heirs and
13 successors. We intend this dedication to be an overt act of
14 relinquishment in perpetuity of all present and future rights to this
15 software under copyright law.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20 IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21 OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 OTHER DEALINGS IN THE SOFTWARE.
24
25 For more information, please refer to <http://unlicense.org/>
26 */
27
28 #include <assert.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32
33 #include <uv.h>
34
35 #include "cassandra.h"
36
/* Protects exit_flag; locked by both wait_exit() and signal_exit(). */
uv_mutex_t mutex;
/* Condition that wait_exit() blocks on and signal_exit() signals. */
uv_cond_t cond;
/* 0 while the callback chain is running; set to 1 by signal_exit(). */
int exit_flag = 0;
/* UUID generator created in main() and used by on_create_table() for the row key. */
CassUuidGen* uuid_gen = NULL;
41
wait_exit()42 void wait_exit() {
43 uv_mutex_lock(&mutex);
44 while (exit_flag == 0) {
45 uv_cond_wait(&cond, &mutex);
46 }
47 uv_mutex_unlock(&mutex);
48 }
49
signal_exit()50 void signal_exit() {
51 uv_mutex_lock(&mutex);
52 exit_flag = 1;
53 uv_cond_signal(&cond);
54 uv_mutex_unlock(&mutex);
55 }
56
/*
 * Forward declarations for the future callbacks. Each callback starts the
 * next request and names the following callback, so several are referenced
 * before their definitions appear in this file.
 */
void on_set_keyspace(CassFuture* future, void* data);
void on_create_keyspace(CassFuture* future, void* data);
void on_create_table(CassFuture* future, void* data);

void on_insert(CassFuture* future, void* data);
void on_select(CassFuture* future, void* data);

void on_session_connect(CassFuture* future, void* data);
65
/*
 * Print the error message attached to `future` on stderr, prefixed with
 * "Error: " and followed by a newline.
 *
 * The text is printed with an explicit precision taken from the length the
 * driver reports, so at most that many characters are written.
 */
void print_error(CassFuture* future) {
  const char* text;
  size_t text_length;

  cass_future_error_message(future, &text, &text_length);
  fprintf(stderr, "Error: %.*s\n", (int)text_length, text);
}
72
create_cluster(const char * hosts)73 CassCluster* create_cluster(const char* hosts) {
74 CassCluster* cluster = cass_cluster_new();
75 cass_cluster_set_contact_points(cluster, hosts);
76 return cluster;
77 }
78
/*
 * Start connecting `session` to `cluster` and register `callback` on the
 * resulting future.
 *
 * The session pointer is passed to the callback as its `data` argument.
 * This function does not wait for the connection; it only drops its own
 * handle on the future before returning.
 */
void connect_session(CassSession* session, const CassCluster* cluster,
                     CassFutureCallback callback) {
  CassFuture* connect_future = cass_session_connect(session, cluster);

  cass_future_set_callback(connect_future, callback, session);
  cass_future_free(connect_future);
}
85
/*
 * Execute the parameterless CQL text `query` on `session` and register
 * `callback` on the resulting future.
 *
 * The session pointer is passed to the callback as its `data` argument.
 * Both the statement and this function's handle on the future are released
 * before returning; the function does not wait for the query to finish.
 */
void execute_query(CassSession* session, const char* query, CassFutureCallback callback) {
  /* Second argument is the bind-parameter count: none for these queries. */
  CassStatement* stmt = cass_statement_new(query, 0);
  CassFuture* pending = cass_session_execute(session, stmt);

  cass_future_set_callback(pending, callback, session);

  cass_statement_free(stmt);
  cass_future_free(pending);
}
93
/*
 * Callback for the connect future; `data` is the CassSession that
 * connect_session() registered.
 *
 * On success the CREATE KEYSPACE request is issued with on_create_keyspace
 * as the next callback. On failure the error is printed and the main thread
 * is released through signal_exit().
 */
void on_session_connect(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) == CASS_OK) {
    execute_query(session,
                  "CREATE KEYSPACE examples WITH replication = { "
                  "'class': 'SimpleStrategy', 'replication_factor': '3' };",
                  on_create_keyspace);
    return;
  }

  /* Without a connection nothing else can run, so end the example. */
  print_error(future);
  signal_exit();
}
109
/*
 * Callback for the CREATE KEYSPACE request; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain: the "USE examples"
 * request is issued either way, with on_set_keyspace as the next callback.
 * NOTE(review): presumably this tolerates the keyspace already existing
 * on a re-run; confirm that is the intent.
 */
void on_create_keyspace(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  execute_query(session, "USE examples", on_set_keyspace);
}
118
/*
 * Callback for the "USE examples" request; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain: the CREATE TABLE request
 * is issued either way, with on_create_table as the next callback.
 */
void on_set_keyspace(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  execute_query(session,
                "CREATE TABLE callbacks "
                "(key timeuuid PRIMARY KEY, value bigint)",
                on_create_table);
}
130
/*
 * Callback for the CREATE TABLE request; `data` is the CassSession.
 *
 * An error is printed but does not stop the chain. A single row is then
 * inserted: the key is a time-based UUID from the global uuid_gen and the
 * value is the timestamp extracted from that same UUID. on_insert is
 * registered as the next callback and receives `data` unchanged.
 */
void on_create_table(CassFuture* future, void* data) {
  CassSession* session = (CassSession*)data;
  CassStatement* insert_stmt = NULL;
  CassFuture* pending = NULL;
  CassUuid row_key;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
  }

  /* Two bind markers: index 0 is the key, index 1 is the value. */
  insert_stmt = cass_statement_new("INSERT INTO callbacks (key, value) "
                                   "VALUES (?, ?)",
                                   2);

  cass_uuid_gen_time(uuid_gen, &row_key);
  cass_statement_bind_uuid(insert_stmt, 0, row_key);
  cass_statement_bind_int64(insert_stmt, 1, cass_uuid_timestamp(row_key));

  pending = cass_session_execute(session, insert_stmt);
  cass_future_set_callback(pending, on_insert, data);

  cass_statement_free(insert_stmt);
  cass_future_free(pending);
}
156
/*
 * Callback for the INSERT request; `data` is the CassSession.
 *
 * On failure the error is printed and the main thread is released through
 * signal_exit(). On success a "SELECT * FROM callbacks" request is issued
 * with on_select as the next callback, which receives `data` unchanged.
 */
void on_insert(CassFuture* future, void* data) {
  CassStatement* select_stmt = NULL;
  CassFuture* pending = NULL;

  if (cass_future_error_code(future) != CASS_OK) {
    print_error(future);
    signal_exit();
    return;
  }

  select_stmt = cass_statement_new("SELECT * FROM callbacks", 0);
  pending = cass_session_execute((CassSession*)data, select_stmt);

  cass_future_set_callback(pending, on_select, data);

  cass_statement_free(select_stmt);
  cass_future_free(pending);
}
173
/*
 * Callback for the SELECT request and the last step of the chain.
 *
 * On success every returned row is printed to stdout as "<uuid>, <value>",
 * with the value shown as an unsigned 64-bit number. On failure the error
 * is printed instead. In both cases signal_exit() is called so the main
 * thread can finish. `data` is not used here.
 */
void on_select(CassFuture* future, void* data) {
  if (cass_future_error_code(future) == CASS_OK) {
    const CassResult* rows = cass_future_get_result(future);
    CassIterator* cursor = cass_iterator_from_result(rows);

    while (cass_iterator_next(cursor)) {
      const CassRow* row = cass_iterator_get_row(cursor);
      char key_text[CASS_UUID_STRING_LENGTH];
      CassUuid key;
      /* Stays 0 if the driver does not store a value into it. */
      cass_int64_t stored = 0;

      cass_value_get_uuid(cass_row_get_column(row, 0), &key);
      cass_uuid_string(key, key_text);
      cass_value_get_int64(cass_row_get_column(row, 1), &stored);

      /* Convert to unsigned by value so the stored bits print unsigned. */
      printf("%s, %llu\n", key_text, (unsigned long long)(cass_uint64_t)stored);
    }

    cass_iterator_free(cursor);
    cass_result_free(rows);
  } else {
    print_error(future);
  }

  signal_exit();
}
200
main(int argc,char * argv[])201 int main(int argc, char* argv[]) {
202 CassCluster* cluster = NULL;
203 CassSession* session = cass_session_new();
204 char* hosts = "127.0.0.1";
205 if (argc > 1) {
206 hosts = argv[1];
207 }
208 cluster = create_cluster(hosts);
209
210 uuid_gen = cass_uuid_gen_new();
211
212 uv_mutex_init(&mutex);
213 uv_cond_init(&cond);
214
215 connect_session(session, cluster, on_session_connect);
216
217 /* Code running in parallel with queries */
218
219 wait_exit();
220
221 uv_cond_destroy(&cond);
222 uv_mutex_destroy(&mutex);
223
224 cass_cluster_free(cluster);
225 cass_uuid_gen_free(uuid_gen);
226 cass_session_free(session);
227
228 return 0;
229 }
230