1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "future.hpp"
18
19 #include "external.hpp"
20 #include "prepared.hpp"
21 #include "request_handler.hpp"
22 #include "result_response.hpp"
23 #include "scoped_ptr.hpp"
24
25 using namespace datastax;
26 using namespace datastax::internal;
27 using namespace datastax::internal::core;
28
29 extern "C" {
30
cass_future_free(CassFuture * future)31 void cass_future_free(CassFuture* future) {
32 // Futures can be deleted without being waited on
33 // because they'll be cleaned up by the notifying thread
34 future->dec_ref();
35 }
36
cass_future_set_callback(CassFuture * future,CassFutureCallback callback,void * data)37 CassError cass_future_set_callback(CassFuture* future, CassFutureCallback callback, void* data) {
38 if (!future->set_callback(callback, data)) {
39 return CASS_ERROR_LIB_CALLBACK_ALREADY_SET;
40 }
41 return CASS_OK;
42 }
43
cass_future_ready(CassFuture * future)44 cass_bool_t cass_future_ready(CassFuture* future) {
45 return static_cast<cass_bool_t>(future->ready());
46 }
47
cass_future_wait(CassFuture * future)48 void cass_future_wait(CassFuture* future) { future->wait(); }
49
cass_future_wait_timed(CassFuture * future,cass_duration_t wait_us)50 cass_bool_t cass_future_wait_timed(CassFuture* future, cass_duration_t wait_us) {
51 return static_cast<cass_bool_t>(future->wait_for(wait_us));
52 }
53
cass_future_get_result(CassFuture * future)54 const CassResult* cass_future_get_result(CassFuture* future) {
55 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
56 return NULL;
57 }
58
59 Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
60 if (!response || response->opcode() == CQL_OPCODE_ERROR) {
61 return NULL;
62 }
63
64 response->inc_ref();
65 return CassResult::to(static_cast<ResultResponse*>(response.get()));
66 }
67
cass_future_get_prepared(CassFuture * future)68 const CassPrepared* cass_future_get_prepared(CassFuture* future) {
69 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
70 return NULL;
71 }
72 ResponseFuture* response_future = static_cast<ResponseFuture*>(future->from());
73
74 SharedRefPtr<ResultResponse> result(response_future->response());
75 if (!result || result->kind() != CASS_RESULT_KIND_PREPARED) {
76 return NULL;
77 }
78
79 Prepared* prepared =
80 new Prepared(result, response_future->prepare_request, *response_future->schema_metadata);
81 prepared->inc_ref();
82 return CassPrepared::to(prepared);
83 }
84
cass_future_get_error_result(CassFuture * future)85 const CassErrorResult* cass_future_get_error_result(CassFuture* future) {
86 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
87 return NULL;
88 }
89
90 Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
91 if (!response || response->opcode() != CQL_OPCODE_ERROR) {
92 return NULL;
93 }
94
95 response->inc_ref();
96 return CassErrorResult::to(static_cast<ErrorResponse*>(response.get()));
97 }
98
cass_future_error_code(CassFuture * future)99 CassError cass_future_error_code(CassFuture* future) {
100 const Future::Error* error = future->error();
101 if (error != NULL) {
102 return error->code;
103 } else {
104 return CASS_OK;
105 }
106 }
107
cass_future_error_message(CassFuture * future,const char ** message,size_t * message_length)108 void cass_future_error_message(CassFuture* future, const char** message, size_t* message_length) {
109 const Future::Error* error = future->error();
110 if (error != NULL) {
111 const String& m = error->message;
112 *message = m.data();
113 *message_length = m.length();
114 } else {
115 *message = "";
116 *message_length = 0;
117 }
118 }
119
cass_future_tracing_id(CassFuture * future,CassUuid * tracing_id)120 CassError cass_future_tracing_id(CassFuture* future, CassUuid* tracing_id) {
121 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
122 return CASS_ERROR_LIB_INVALID_FUTURE_TYPE;
123 }
124
125 Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
126 if (!response || !response->has_tracing_id()) {
127 return CASS_ERROR_LIB_NO_TRACING_ID;
128 }
129
130 *tracing_id = response->tracing_id();
131
132 return CASS_OK;
133 }
134
cass_future_custom_payload_item_count(CassFuture * future)135 size_t cass_future_custom_payload_item_count(CassFuture* future) {
136 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
137 return 0;
138 }
139 Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
140 if (!response) return 0;
141 return response->custom_payload().size();
142 }
143
cass_future_custom_payload_item(CassFuture * future,size_t index,const char ** name,size_t * name_length,const cass_byte_t ** value,size_t * value_size)144 CassError cass_future_custom_payload_item(CassFuture* future, size_t index, const char** name,
145 size_t* name_length, const cass_byte_t** value,
146 size_t* value_size) {
147 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
148 return CASS_ERROR_LIB_INVALID_FUTURE_TYPE;
149 }
150 Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
151 if (!response) return CASS_ERROR_LIB_NO_CUSTOM_PAYLOAD;
152
153 const CustomPayloadVec& custom_payload = response->custom_payload();
154 if (index >= custom_payload.size()) {
155 return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
156 }
157
158 const CustomPayloadItem& item = custom_payload[index];
159 *name = item.name.data();
160 *name_length = item.name.size();
161 *value = reinterpret_cast<const cass_byte_t*>(item.value.data());
162 *value_size = item.value.size();
163 return CASS_OK;
164 }
165
cass_future_coordinator(CassFuture * future)166 const CassNode* cass_future_coordinator(CassFuture* future) {
167 if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
168 return NULL;
169 }
170 const Address& node = static_cast<ResponseFuture*>(future->from())->address();
171 return node.is_valid() ? CassNode::to(&node) : NULL;
172 }
173
174 } // extern "C"
175
set_callback(Future::Callback callback,void * data)176 bool Future::set_callback(Future::Callback callback, void* data) {
177 ScopedMutex lock(&mutex_);
178 if (callback_) {
179 return false; // Callback is already set
180 }
181 callback_ = callback;
182 data_ = data;
183 if (is_set_) {
184 // Run the callback if the future is already set
185 lock.unlock();
186 callback(CassFuture::to(this), data);
187 }
188 return true;
189 }
190
internal_set(ScopedMutex & lock)191 void Future::internal_set(ScopedMutex& lock) {
192 is_set_ = true;
193 if (callback_) {
194 Callback callback = callback_;
195 void* data = data_;
196 lock.unlock();
197 callback(CassFuture::to(this), data);
198 lock.lock();
199 }
200 // Broadcast after we've run the callback so that threads waiting
201 // on this future see the side effects of the callback.
202 uv_cond_broadcast(&cond_);
203 }
204