1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "future.hpp"
18 
19 #include "external.hpp"
20 #include "prepared.hpp"
21 #include "request_handler.hpp"
22 #include "result_response.hpp"
23 #include "scoped_ptr.hpp"
24 
25 using namespace datastax;
26 using namespace datastax::internal;
27 using namespace datastax::internal::core;
28 
29 extern "C" {
30 
cass_future_free(CassFuture * future)31 void cass_future_free(CassFuture* future) {
32   // Futures can be deleted without being waited on
33   // because they'll be cleaned up by the notifying thread
34   future->dec_ref();
35 }
36 
cass_future_set_callback(CassFuture * future,CassFutureCallback callback,void * data)37 CassError cass_future_set_callback(CassFuture* future, CassFutureCallback callback, void* data) {
38   if (!future->set_callback(callback, data)) {
39     return CASS_ERROR_LIB_CALLBACK_ALREADY_SET;
40   }
41   return CASS_OK;
42 }
43 
cass_future_ready(CassFuture * future)44 cass_bool_t cass_future_ready(CassFuture* future) {
45   return static_cast<cass_bool_t>(future->ready());
46 }
47 
cass_future_wait(CassFuture * future)48 void cass_future_wait(CassFuture* future) { future->wait(); }
49 
cass_future_wait_timed(CassFuture * future,cass_duration_t wait_us)50 cass_bool_t cass_future_wait_timed(CassFuture* future, cass_duration_t wait_us) {
51   return static_cast<cass_bool_t>(future->wait_for(wait_us));
52 }
53 
cass_future_get_result(CassFuture * future)54 const CassResult* cass_future_get_result(CassFuture* future) {
55   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
56     return NULL;
57   }
58 
59   Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
60   if (!response || response->opcode() == CQL_OPCODE_ERROR) {
61     return NULL;
62   }
63 
64   response->inc_ref();
65   return CassResult::to(static_cast<ResultResponse*>(response.get()));
66 }
67 
cass_future_get_prepared(CassFuture * future)68 const CassPrepared* cass_future_get_prepared(CassFuture* future) {
69   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
70     return NULL;
71   }
72   ResponseFuture* response_future = static_cast<ResponseFuture*>(future->from());
73 
74   SharedRefPtr<ResultResponse> result(response_future->response());
75   if (!result || result->kind() != CASS_RESULT_KIND_PREPARED) {
76     return NULL;
77   }
78 
79   Prepared* prepared =
80       new Prepared(result, response_future->prepare_request, *response_future->schema_metadata);
81   prepared->inc_ref();
82   return CassPrepared::to(prepared);
83 }
84 
cass_future_get_error_result(CassFuture * future)85 const CassErrorResult* cass_future_get_error_result(CassFuture* future) {
86   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
87     return NULL;
88   }
89 
90   Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
91   if (!response || response->opcode() != CQL_OPCODE_ERROR) {
92     return NULL;
93   }
94 
95   response->inc_ref();
96   return CassErrorResult::to(static_cast<ErrorResponse*>(response.get()));
97 }
98 
cass_future_error_code(CassFuture * future)99 CassError cass_future_error_code(CassFuture* future) {
100   const Future::Error* error = future->error();
101   if (error != NULL) {
102     return error->code;
103   } else {
104     return CASS_OK;
105   }
106 }
107 
cass_future_error_message(CassFuture * future,const char ** message,size_t * message_length)108 void cass_future_error_message(CassFuture* future, const char** message, size_t* message_length) {
109   const Future::Error* error = future->error();
110   if (error != NULL) {
111     const String& m = error->message;
112     *message = m.data();
113     *message_length = m.length();
114   } else {
115     *message = "";
116     *message_length = 0;
117   }
118 }
119 
cass_future_tracing_id(CassFuture * future,CassUuid * tracing_id)120 CassError cass_future_tracing_id(CassFuture* future, CassUuid* tracing_id) {
121   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
122     return CASS_ERROR_LIB_INVALID_FUTURE_TYPE;
123   }
124 
125   Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
126   if (!response || !response->has_tracing_id()) {
127     return CASS_ERROR_LIB_NO_TRACING_ID;
128   }
129 
130   *tracing_id = response->tracing_id();
131 
132   return CASS_OK;
133 }
134 
cass_future_custom_payload_item_count(CassFuture * future)135 size_t cass_future_custom_payload_item_count(CassFuture* future) {
136   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
137     return 0;
138   }
139   Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
140   if (!response) return 0;
141   return response->custom_payload().size();
142 }
143 
cass_future_custom_payload_item(CassFuture * future,size_t index,const char ** name,size_t * name_length,const cass_byte_t ** value,size_t * value_size)144 CassError cass_future_custom_payload_item(CassFuture* future, size_t index, const char** name,
145                                           size_t* name_length, const cass_byte_t** value,
146                                           size_t* value_size) {
147   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
148     return CASS_ERROR_LIB_INVALID_FUTURE_TYPE;
149   }
150   Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
151   if (!response) return CASS_ERROR_LIB_NO_CUSTOM_PAYLOAD;
152 
153   const CustomPayloadVec& custom_payload = response->custom_payload();
154   if (index >= custom_payload.size()) {
155     return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
156   }
157 
158   const CustomPayloadItem& item = custom_payload[index];
159   *name = item.name.data();
160   *name_length = item.name.size();
161   *value = reinterpret_cast<const cass_byte_t*>(item.value.data());
162   *value_size = item.value.size();
163   return CASS_OK;
164 }
165 
cass_future_coordinator(CassFuture * future)166 const CassNode* cass_future_coordinator(CassFuture* future) {
167   if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
168     return NULL;
169   }
170   const Address& node = static_cast<ResponseFuture*>(future->from())->address();
171   return node.is_valid() ? CassNode::to(&node) : NULL;
172 }
173 
174 } // extern "C"
175 
set_callback(Future::Callback callback,void * data)176 bool Future::set_callback(Future::Callback callback, void* data) {
177   ScopedMutex lock(&mutex_);
178   if (callback_) {
179     return false; // Callback is already set
180   }
181   callback_ = callback;
182   data_ = data;
183   if (is_set_) {
184     // Run the callback if the future is already set
185     lock.unlock();
186     callback(CassFuture::to(this), data);
187   }
188   return true;
189 }
190 
internal_set(ScopedMutex & lock)191 void Future::internal_set(ScopedMutex& lock) {
192   is_set_ = true;
193   if (callback_) {
194     Callback callback = callback_;
195     void* data = data_;
196     lock.unlock();
197     callback(CassFuture::to(this), data);
198     lock.lock();
199   }
200   // Broadcast after we've run the callback so that threads waiting
201   // on this future see the side effects of the callback.
202   uv_cond_broadcast(&cond_);
203 }
204