1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "retry_policy.hpp"
18
19 #include "external.hpp"
20 #include "logger.hpp"
21 #include "request.hpp"
22
23 using namespace datastax::internal;
24 using namespace datastax::internal::core;
25
26 extern "C" {
27
cass_retry_policy_default_new()28 CassRetryPolicy* cass_retry_policy_default_new() {
29 RetryPolicy* policy = new DefaultRetryPolicy();
30 policy->inc_ref();
31 return CassRetryPolicy::to(policy);
32 }
33
cass_retry_policy_downgrading_consistency_new()34 CassRetryPolicy* cass_retry_policy_downgrading_consistency_new() {
35 RetryPolicy* policy = new DowngradingConsistencyRetryPolicy();
36 policy->inc_ref();
37 return CassRetryPolicy::to(policy);
38 }
39
cass_retry_policy_fallthrough_new()40 CassRetryPolicy* cass_retry_policy_fallthrough_new() {
41 RetryPolicy* policy = new FallthroughRetryPolicy();
42 policy->inc_ref();
43 return CassRetryPolicy::to(policy);
44 }
45
cass_retry_policy_logging_new(CassRetryPolicy * child_retry_policy)46 CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* child_retry_policy) {
47 if (child_retry_policy->type() == RetryPolicy::LOGGING) {
48 return NULL;
49 }
50 RetryPolicy* policy = new LoggingRetryPolicy(SharedRefPtr<RetryPolicy>(child_retry_policy));
51 policy->inc_ref();
52 return CassRetryPolicy::to(policy);
53 }
54
cass_retry_policy_free(CassRetryPolicy * policy)55 void cass_retry_policy_free(CassRetryPolicy* policy) { policy->dec_ref(); }
56
57 } // extern "C"
58
max_likely_to_work(int received)59 inline RetryPolicy::RetryDecision max_likely_to_work(int received) {
60 if (received >= 3) {
61 return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_THREE);
62 } else if (received == 2) {
63 return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_TWO);
64 } else if (received == 1) {
65 return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_ONE);
66 } else {
67 return RetryPolicy::RetryDecision::return_error();
68 }
69 }
70
71 // Default retry policy
72
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const73 RetryPolicy::RetryDecision DefaultRetryPolicy::on_read_timeout(const Request* request,
74 CassConsistency cl, int received,
75 int required, bool data_recevied,
76 int num_retries) const {
77 if (num_retries != 0) {
78 return RetryDecision::return_error();
79 }
80
81 if (received >= required && !data_recevied) {
82 return RetryDecision::retry(cl);
83 } else {
84 return RetryDecision::return_error();
85 }
86 }
87
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const88 RetryPolicy::RetryDecision DefaultRetryPolicy::on_write_timeout(const Request* request,
89 CassConsistency cl, int received,
90 int required,
91 CassWriteType write_type,
92 int num_retries) const {
93 if (num_retries != 0) {
94 return RetryDecision::return_error();
95 }
96
97 if (write_type == CASS_WRITE_TYPE_BATCH_LOG) {
98 return RetryDecision::retry(cl);
99 } else {
100 return RetryDecision::return_error();
101 }
102 }
103
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const104 RetryPolicy::RetryDecision DefaultRetryPolicy::on_unavailable(const Request* request,
105 CassConsistency cl, int required,
106 int alive, int num_retries) const {
107 if (num_retries == 0) {
108 return RetryDecision::retry_next_host(cl);
109 } else {
110 return RetryDecision::return_error();
111 }
112 }
113
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const114 RetryPolicy::RetryDecision DefaultRetryPolicy::on_request_error(const Request* request,
115 CassConsistency cl,
116 const ErrorResponse* error,
117 int num_retries) const {
118 return RetryDecision::retry_next_host(cl);
119 }
120
121 // Downgrading retry policy
122
123 RetryPolicy::RetryDecision
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const124 DowngradingConsistencyRetryPolicy::on_read_timeout(const Request* request, CassConsistency cl,
125 int received, int required, bool data_recevied,
126 int num_retries) const {
127 if (num_retries != 0) {
128 return RetryDecision::return_error();
129 }
130
131 if (cl == CASS_CONSISTENCY_SERIAL || cl == CASS_CONSISTENCY_LOCAL_SERIAL) {
132 return RetryDecision::return_error();
133 }
134
135 if (received < required) {
136 return max_likely_to_work(received);
137 }
138
139 if (!data_recevied) {
140 return RetryDecision::retry(cl);
141 } else {
142 return RetryDecision::return_error();
143 }
144 }
145
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const146 RetryPolicy::RetryDecision DowngradingConsistencyRetryPolicy::on_write_timeout(
147 const Request* request, CassConsistency cl, int received, int required,
148 CassWriteType write_type, int num_retries) const {
149 if (num_retries != 0) {
150 return RetryDecision::return_error();
151 }
152
153 switch (write_type) {
154 case CASS_WRITE_TYPE_SIMPLE:
155 case CASS_WRITE_TYPE_BATCH: // Fallthrough intended
156 if (received > 0) {
157 return RetryDecision::ignore();
158 } else {
159 return RetryDecision::return_error();
160 }
161
162 case CASS_WRITE_TYPE_UNLOGGED_BATCH:
163 return max_likely_to_work(received);
164
165 case CASS_WRITE_TYPE_BATCH_LOG:
166 return RetryDecision::retry(cl);
167
168 default:
169 return RetryDecision::return_error();
170 }
171 }
172
173 RetryPolicy::RetryDecision
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const174 DowngradingConsistencyRetryPolicy::on_unavailable(const Request* request, CassConsistency cl,
175 int required, int alive, int num_retries) const {
176 if (num_retries != 0) {
177 return RetryDecision::return_error();
178 }
179 return max_likely_to_work(alive);
180 }
181
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const182 RetryPolicy::RetryDecision DowngradingConsistencyRetryPolicy::on_request_error(
183 const Request* request, CassConsistency cl, const ErrorResponse* error, int num_retries) const {
184 return RetryDecision::retry_next_host(cl);
185 }
186
187 // Fallthrough retry policy
188
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const189 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_read_timeout(const Request* request,
190 CassConsistency cl, int received,
191 int required, bool data_recevied,
192 int num_retries) const {
193 return RetryDecision::return_error();
194 }
195
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const196 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_write_timeout(const Request* request,
197 CassConsistency cl,
198 int received, int required,
199 CassWriteType write_type,
200 int num_retries) const {
201 return RetryDecision::return_error();
202 }
203
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const204 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_unavailable(const Request* request,
205 CassConsistency cl, int required,
206 int alive,
207 int num_retries) const {
208 return RetryDecision::return_error();
209 }
210
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const211 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_request_error(const Request* request,
212 CassConsistency cl,
213 const ErrorResponse* error,
214 int num_retries) const {
215 return RetryDecision::return_error();
216 }
217
218 // Logging retry policy
219
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const220 RetryPolicy::RetryDecision LoggingRetryPolicy::on_read_timeout(const Request* request,
221 CassConsistency cl, int received,
222 int required, bool data_recevied,
223 int num_retries) const {
224 RetryDecision decision =
225 retry_policy_->on_read_timeout(request, cl, received, required, data_recevied, num_retries);
226
227 switch (decision.type()) {
228 case RetryDecision::IGNORE:
229 LOG_INFO("Ignoring read timeout (initial consistency: %s, required responses: %d, received "
230 "responses: %d, data retrieved: %s, retries: %d)",
231 cass_consistency_string(cl), required, received, data_recevied ? "true" : "false",
232 num_retries);
233 break;
234
235 case RetryDecision::RETRY:
236 LOG_INFO("Retrying on read timeout at consistency %s (initial consistency: %s, required "
237 "responses: %d, received responses: %d, data retrieved: %s, retries: %d)",
238 cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
239 required, received, data_recevied ? "true" : "false", num_retries);
240 break;
241
242 default:
243 break;
244 }
245
246 return decision;
247 }
248
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const249 RetryPolicy::RetryDecision LoggingRetryPolicy::on_write_timeout(const Request* request,
250 CassConsistency cl, int received,
251 int required,
252 CassWriteType write_type,
253 int num_retries) const {
254 RetryDecision decision =
255 retry_policy_->on_write_timeout(request, cl, received, required, write_type, num_retries);
256
257 switch (decision.type()) {
258 case RetryDecision::IGNORE:
259 LOG_INFO("Ignoring write timeout (initial consistency: %s, required acknowledgments: %d, "
260 "received acknowledgments: %d, write type: %s, retries: %d)",
261 cass_consistency_string(cl), required, received, cass_write_type_string(write_type),
262 num_retries);
263 break;
264
265 case RetryDecision::RETRY:
266 LOG_INFO("Retrying on write timeout at consistency %s (initial consistency: %s, required "
267 "acknowledgments: %d, received acknowledgments: %d, write type: %s, retries: %d)",
268 cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
269 required, received, cass_write_type_string(write_type), num_retries);
270 break;
271
272 default:
273 break;
274 }
275
276 return decision;
277 }
278
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const279 RetryPolicy::RetryDecision LoggingRetryPolicy::on_unavailable(const Request* request,
280 CassConsistency cl, int required,
281 int alive, int num_retries) const {
282 RetryDecision decision = retry_policy_->on_unavailable(request, cl, required, alive, num_retries);
283
284 switch (decision.type()) {
285 case RetryDecision::IGNORE:
286 LOG_INFO("Ignoring unavailable error (initial consistency: %s, required replica: %d, alive "
287 "replica: %d, retries: %d)",
288 cass_consistency_string(cl), required, alive, num_retries);
289 break;
290
291 case RetryDecision::RETRY:
292 LOG_INFO("Retrying on unavailable error at consistency %s (initial consistency: %s, required "
293 "replica: %d, alive replica: %d, retries: %d)",
294 cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
295 required, alive, num_retries);
296 break;
297
298 default:
299 break;
300 }
301
302 return decision;
303 }
304
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const305 RetryPolicy::RetryDecision LoggingRetryPolicy::on_request_error(const Request* request,
306 CassConsistency cl,
307 const ErrorResponse* error,
308 int num_retries) const {
309 RetryDecision decision = retry_policy_->on_request_error(request, cl, error, num_retries);
310
311 switch (decision.type()) {
312 case RetryDecision::IGNORE:
313 LOG_INFO("Ignoring request error (initial consistency: %s, error: %s, retries: %d)",
314 cass_consistency_string(cl), error->message().to_string().c_str(), num_retries);
315 break;
316
317 case RetryDecision::RETRY:
318 LOG_INFO("Retrying on request error at consistency %s (initial consistency: %s, error: %s, "
319 "retries: %d)",
320 cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
321 error->message().to_string().c_str(), num_retries);
322 break;
323
324 default:
325 break;
326 }
327
328 return decision;
329 }
330