1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "retry_policy.hpp"
18 
19 #include "external.hpp"
20 #include "logger.hpp"
21 #include "request.hpp"
22 
23 using namespace datastax::internal;
24 using namespace datastax::internal::core;
25 
26 extern "C" {
27 
cass_retry_policy_default_new()28 CassRetryPolicy* cass_retry_policy_default_new() {
29   RetryPolicy* policy = new DefaultRetryPolicy();
30   policy->inc_ref();
31   return CassRetryPolicy::to(policy);
32 }
33 
cass_retry_policy_downgrading_consistency_new()34 CassRetryPolicy* cass_retry_policy_downgrading_consistency_new() {
35   RetryPolicy* policy = new DowngradingConsistencyRetryPolicy();
36   policy->inc_ref();
37   return CassRetryPolicy::to(policy);
38 }
39 
cass_retry_policy_fallthrough_new()40 CassRetryPolicy* cass_retry_policy_fallthrough_new() {
41   RetryPolicy* policy = new FallthroughRetryPolicy();
42   policy->inc_ref();
43   return CassRetryPolicy::to(policy);
44 }
45 
cass_retry_policy_logging_new(CassRetryPolicy * child_retry_policy)46 CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* child_retry_policy) {
47   if (child_retry_policy->type() == RetryPolicy::LOGGING) {
48     return NULL;
49   }
50   RetryPolicy* policy = new LoggingRetryPolicy(SharedRefPtr<RetryPolicy>(child_retry_policy));
51   policy->inc_ref();
52   return CassRetryPolicy::to(policy);
53 }
54 
cass_retry_policy_free(CassRetryPolicy * policy)55 void cass_retry_policy_free(CassRetryPolicy* policy) { policy->dec_ref(); }
56 
57 } // extern "C"
58 
max_likely_to_work(int received)59 inline RetryPolicy::RetryDecision max_likely_to_work(int received) {
60   if (received >= 3) {
61     return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_THREE);
62   } else if (received == 2) {
63     return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_TWO);
64   } else if (received == 1) {
65     return RetryPolicy::RetryDecision::retry(CASS_CONSISTENCY_ONE);
66   } else {
67     return RetryPolicy::RetryDecision::return_error();
68   }
69 }
70 
71 // Default retry policy
72 
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const73 RetryPolicy::RetryDecision DefaultRetryPolicy::on_read_timeout(const Request* request,
74                                                                CassConsistency cl, int received,
75                                                                int required, bool data_recevied,
76                                                                int num_retries) const {
77   if (num_retries != 0) {
78     return RetryDecision::return_error();
79   }
80 
81   if (received >= required && !data_recevied) {
82     return RetryDecision::retry(cl);
83   } else {
84     return RetryDecision::return_error();
85   }
86 }
87 
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const88 RetryPolicy::RetryDecision DefaultRetryPolicy::on_write_timeout(const Request* request,
89                                                                 CassConsistency cl, int received,
90                                                                 int required,
91                                                                 CassWriteType write_type,
92                                                                 int num_retries) const {
93   if (num_retries != 0) {
94     return RetryDecision::return_error();
95   }
96 
97   if (write_type == CASS_WRITE_TYPE_BATCH_LOG) {
98     return RetryDecision::retry(cl);
99   } else {
100     return RetryDecision::return_error();
101   }
102 }
103 
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const104 RetryPolicy::RetryDecision DefaultRetryPolicy::on_unavailable(const Request* request,
105                                                               CassConsistency cl, int required,
106                                                               int alive, int num_retries) const {
107   if (num_retries == 0) {
108     return RetryDecision::retry_next_host(cl);
109   } else {
110     return RetryDecision::return_error();
111   }
112 }
113 
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const114 RetryPolicy::RetryDecision DefaultRetryPolicy::on_request_error(const Request* request,
115                                                                 CassConsistency cl,
116                                                                 const ErrorResponse* error,
117                                                                 int num_retries) const {
118   return RetryDecision::retry_next_host(cl);
119 }
120 
121 // Downgrading retry policy
122 
123 RetryPolicy::RetryDecision
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const124 DowngradingConsistencyRetryPolicy::on_read_timeout(const Request* request, CassConsistency cl,
125                                                    int received, int required, bool data_recevied,
126                                                    int num_retries) const {
127   if (num_retries != 0) {
128     return RetryDecision::return_error();
129   }
130 
131   if (cl == CASS_CONSISTENCY_SERIAL || cl == CASS_CONSISTENCY_LOCAL_SERIAL) {
132     return RetryDecision::return_error();
133   }
134 
135   if (received < required) {
136     return max_likely_to_work(received);
137   }
138 
139   if (!data_recevied) {
140     return RetryDecision::retry(cl);
141   } else {
142     return RetryDecision::return_error();
143   }
144 }
145 
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const146 RetryPolicy::RetryDecision DowngradingConsistencyRetryPolicy::on_write_timeout(
147     const Request* request, CassConsistency cl, int received, int required,
148     CassWriteType write_type, int num_retries) const {
149   if (num_retries != 0) {
150     return RetryDecision::return_error();
151   }
152 
153   switch (write_type) {
154     case CASS_WRITE_TYPE_SIMPLE:
155     case CASS_WRITE_TYPE_BATCH: // Fallthrough intended
156       if (received > 0) {
157         return RetryDecision::ignore();
158       } else {
159         return RetryDecision::return_error();
160       }
161 
162     case CASS_WRITE_TYPE_UNLOGGED_BATCH:
163       return max_likely_to_work(received);
164 
165     case CASS_WRITE_TYPE_BATCH_LOG:
166       return RetryDecision::retry(cl);
167 
168     default:
169       return RetryDecision::return_error();
170   }
171 }
172 
173 RetryPolicy::RetryDecision
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const174 DowngradingConsistencyRetryPolicy::on_unavailable(const Request* request, CassConsistency cl,
175                                                   int required, int alive, int num_retries) const {
176   if (num_retries != 0) {
177     return RetryDecision::return_error();
178   }
179   return max_likely_to_work(alive);
180 }
181 
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const182 RetryPolicy::RetryDecision DowngradingConsistencyRetryPolicy::on_request_error(
183     const Request* request, CassConsistency cl, const ErrorResponse* error, int num_retries) const {
184   return RetryDecision::retry_next_host(cl);
185 }
186 
187 // Fallthrough retry policy
188 
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const189 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_read_timeout(const Request* request,
190                                                                    CassConsistency cl, int received,
191                                                                    int required, bool data_recevied,
192                                                                    int num_retries) const {
193   return RetryDecision::return_error();
194 }
195 
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const196 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_write_timeout(const Request* request,
197                                                                     CassConsistency cl,
198                                                                     int received, int required,
199                                                                     CassWriteType write_type,
200                                                                     int num_retries) const {
201   return RetryDecision::return_error();
202 }
203 
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const204 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_unavailable(const Request* request,
205                                                                   CassConsistency cl, int required,
206                                                                   int alive,
207                                                                   int num_retries) const {
208   return RetryDecision::return_error();
209 }
210 
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const211 RetryPolicy::RetryDecision FallthroughRetryPolicy::on_request_error(const Request* request,
212                                                                     CassConsistency cl,
213                                                                     const ErrorResponse* error,
214                                                                     int num_retries) const {
215   return RetryDecision::return_error();
216 }
217 
218 // Logging retry policy
219 
on_read_timeout(const Request * request,CassConsistency cl,int received,int required,bool data_recevied,int num_retries) const220 RetryPolicy::RetryDecision LoggingRetryPolicy::on_read_timeout(const Request* request,
221                                                                CassConsistency cl, int received,
222                                                                int required, bool data_recevied,
223                                                                int num_retries) const {
224   RetryDecision decision =
225       retry_policy_->on_read_timeout(request, cl, received, required, data_recevied, num_retries);
226 
227   switch (decision.type()) {
228     case RetryDecision::IGNORE:
229       LOG_INFO("Ignoring read timeout (initial consistency: %s, required responses: %d, received "
230                "responses: %d, data retrieved: %s, retries: %d)",
231                cass_consistency_string(cl), required, received, data_recevied ? "true" : "false",
232                num_retries);
233       break;
234 
235     case RetryDecision::RETRY:
236       LOG_INFO("Retrying on read timeout at consistency %s (initial consistency: %s, required "
237                "responses: %d, received responses: %d, data retrieved: %s, retries: %d)",
238                cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
239                required, received, data_recevied ? "true" : "false", num_retries);
240       break;
241 
242     default:
243       break;
244   }
245 
246   return decision;
247 }
248 
on_write_timeout(const Request * request,CassConsistency cl,int received,int required,CassWriteType write_type,int num_retries) const249 RetryPolicy::RetryDecision LoggingRetryPolicy::on_write_timeout(const Request* request,
250                                                                 CassConsistency cl, int received,
251                                                                 int required,
252                                                                 CassWriteType write_type,
253                                                                 int num_retries) const {
254   RetryDecision decision =
255       retry_policy_->on_write_timeout(request, cl, received, required, write_type, num_retries);
256 
257   switch (decision.type()) {
258     case RetryDecision::IGNORE:
259       LOG_INFO("Ignoring write timeout (initial consistency: %s, required acknowledgments: %d, "
260                "received acknowledgments: %d, write type: %s, retries: %d)",
261                cass_consistency_string(cl), required, received, cass_write_type_string(write_type),
262                num_retries);
263       break;
264 
265     case RetryDecision::RETRY:
266       LOG_INFO("Retrying on write timeout at consistency %s (initial consistency: %s, required "
267                "acknowledgments: %d, received acknowledgments: %d, write type: %s, retries: %d)",
268                cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
269                required, received, cass_write_type_string(write_type), num_retries);
270       break;
271 
272     default:
273       break;
274   }
275 
276   return decision;
277 }
278 
on_unavailable(const Request * request,CassConsistency cl,int required,int alive,int num_retries) const279 RetryPolicy::RetryDecision LoggingRetryPolicy::on_unavailable(const Request* request,
280                                                               CassConsistency cl, int required,
281                                                               int alive, int num_retries) const {
282   RetryDecision decision = retry_policy_->on_unavailable(request, cl, required, alive, num_retries);
283 
284   switch (decision.type()) {
285     case RetryDecision::IGNORE:
286       LOG_INFO("Ignoring unavailable error (initial consistency: %s, required replica: %d, alive "
287                "replica: %d, retries: %d)",
288                cass_consistency_string(cl), required, alive, num_retries);
289       break;
290 
291     case RetryDecision::RETRY:
292       LOG_INFO("Retrying on unavailable error at consistency %s (initial consistency: %s, required "
293                "replica: %d, alive replica: %d, retries: %d)",
294                cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
295                required, alive, num_retries);
296       break;
297 
298     default:
299       break;
300   }
301 
302   return decision;
303 }
304 
on_request_error(const Request * request,CassConsistency cl,const ErrorResponse * error,int num_retries) const305 RetryPolicy::RetryDecision LoggingRetryPolicy::on_request_error(const Request* request,
306                                                                 CassConsistency cl,
307                                                                 const ErrorResponse* error,
308                                                                 int num_retries) const {
309   RetryDecision decision = retry_policy_->on_request_error(request, cl, error, num_retries);
310 
311   switch (decision.type()) {
312     case RetryDecision::IGNORE:
313       LOG_INFO("Ignoring request error (initial consistency: %s, error: %s, retries: %d)",
314                cass_consistency_string(cl), error->message().to_string().c_str(), num_retries);
315       break;
316 
317     case RetryDecision::RETRY:
318       LOG_INFO("Retrying on request error at consistency %s (initial consistency: %s, error: %s, "
319                "retries: %d)",
320                cass_consistency_string(decision.retry_consistency()), cass_consistency_string(cl),
321                error->message().to_string().c_str(), num_retries);
322       break;
323 
324     default:
325       break;
326   }
327 
328   return decision;
329 }
330