1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "apr_redis.h"
18 #include "apr_poll.h"
19 #include "apr_version.h"
20 #include <stdlib.h>
21 #include <string.h>
22
/* Maximum number of bytes of a reply line kept in a connection's buffer. */
#define BUFFER_SIZE     512
/* Size of the small stack buffers used to format RESP length prefixes. */
#define LILBUFF_SIZE    64
25 struct apr_redis_conn_t
26 {
27 char *buffer;
28 apr_size_t blen;
29 apr_pool_t *p;
30 apr_pool_t *tp;
31 apr_socket_t *sock;
32 apr_bucket_brigade *bb;
33 apr_bucket_brigade *tb;
34 apr_redis_server_t *rs;
35 };
36
/*
 * Strings for Client Commands
 *
 * Each RESP fragment is paired with a *_LEN macro holding its length
 * without the terminating NUL.
 */

/* Line terminator and blank separator */
#define RC_EOL              "\r\n"
#define RC_EOL_LEN          (sizeof(RC_EOL)-1)
#define RC_WS               " "
#define RC_WS_LEN           (sizeof(RC_WS)-1)

/* Array headers for commands made of 1 to 4 bulk strings */
#define RC_RESP_1           "*1\r\n"
#define RC_RESP_1_LEN       (sizeof(RC_RESP_1)-1)
#define RC_RESP_2           "*2\r\n"
#define RC_RESP_2_LEN       (sizeof(RC_RESP_2)-1)
#define RC_RESP_3           "*3\r\n"
#define RC_RESP_3_LEN       (sizeof(RC_RESP_3)-1)
#define RC_RESP_4           "*4\r\n"
#define RC_RESP_4_LEN       (sizeof(RC_RESP_4)-1)

/* GET: command name and its bulk-string length prefix */
#define RC_GET              "GET\r\n"
#define RC_GET_LEN          (sizeof(RC_GET)-1)
#define RC_GET_SIZE         "$3\r\n"
#define RC_GET_SIZE_LEN     (sizeof(RC_GET_SIZE)-1)

/* SET */
#define RC_SET              "SET\r\n"
#define RC_SET_LEN          (sizeof(RC_SET)-1)
#define RC_SET_SIZE         "$3\r\n"
#define RC_SET_SIZE_LEN     (sizeof(RC_SET_SIZE)-1)

/* SETEX */
#define RC_SETEX            "SETEX\r\n"
#define RC_SETEX_LEN        (sizeof(RC_SETEX)-1)
#define RC_SETEX_SIZE       "$5\r\n"
#define RC_SETEX_SIZE_LEN   (sizeof(RC_SETEX_SIZE)-1)

/* DEL */
#define RC_DEL              "DEL\r\n"
#define RC_DEL_LEN          (sizeof(RC_DEL)-1)
#define RC_DEL_SIZE         "$3\r\n"
#define RC_DEL_SIZE_LEN     (sizeof(RC_DEL_SIZE)-1)

/* QUIT */
#define RC_QUIT             "QUIT\r\n"
#define RC_QUIT_LEN         (sizeof(RC_QUIT)-1)
#define RC_QUIT_SIZE        "$4\r\n"
#define RC_QUIT_SIZE_LEN    (sizeof(RC_QUIT_SIZE)-1)

/* PING */
#define RC_PING             "PING\r\n"
#define RC_PING_LEN         (sizeof(RC_PING)-1)
#define RC_PING_SIZE        "$4\r\n"
#define RC_PING_SIZE_LEN    (sizeof(RC_PING_SIZE)-1)

/* INFO */
#define RC_INFO             "INFO\r\n"
#define RC_INFO_LEN         (sizeof(RC_INFO)-1)
#define RC_INFO_SIZE        "$4\r\n"
#define RC_INFO_SIZE_LEN    (sizeof(RC_INFO_SIZE)-1)
98
/*
 * Strings for Server Replies
 *
 * Each reply prefix is paired with a *_LEN macro holding its length
 * without the terminating NUL.
 */

/* Replies to a store command */
#define RS_STORED               "+OK"
#define RS_STORED_LEN           (sizeof(RS_STORED)-1)
#define RS_NOT_STORED           "$-1"
#define RS_NOT_STORED_LEN       (sizeof(RS_NOT_STORED)-1)

/* Replies to a delete command */
#define RS_DELETED              ":1"
#define RS_DELETED_LEN          (sizeof(RS_DELETED)-1)
#define RS_NOT_FOUND_DEL        ":0"
#define RS_NOT_FOUND_DEL_LEN    (sizeof(RS_NOT_FOUND_DEL)-1)

/* Reply to a get command for a missing key */
#define RS_NOT_FOUND_GET        "$-1"
#define RS_NOT_FOUND_GET_LEN    (sizeof(RS_NOT_FOUND_GET)-1)

/* Leading byte of a bulk string reply, and the line terminator */
#define RS_TYPE_STRING          "$"
#define RS_TYPE_STRING_LEN      (sizeof(RS_TYPE_STRING)-1)
#define RS_END                  "\r\n"
#define RS_END_LEN              (sizeof(RS_END)-1)
121
make_server_dead(apr_redis_t * rc,apr_redis_server_t * rs)122 static apr_status_t make_server_dead(apr_redis_t *rc,
123 apr_redis_server_t *rs)
124 {
125 #if APR_HAS_THREADS
126 apr_thread_mutex_lock(rs->lock);
127 #endif
128 rs->status = APR_RC_SERVER_DEAD;
129 rs->btime = apr_time_now();
130 #if APR_HAS_THREADS
131 apr_thread_mutex_unlock(rs->lock);
132 #endif
133 return APR_SUCCESS;
134 }
135
make_server_live(apr_redis_t * rc,apr_redis_server_t * rs)136 static apr_status_t make_server_live(apr_redis_t *rc,
137 apr_redis_server_t *rs)
138 {
139 rs->status = APR_RC_SERVER_LIVE;
140 return APR_SUCCESS;
141 }
142
apr_redis_add_server(apr_redis_t * rc,apr_redis_server_t * rs)143 APU_DECLARE(apr_status_t) apr_redis_add_server(apr_redis_t *rc,
144 apr_redis_server_t *rs)
145 {
146 apr_status_t rv = APR_SUCCESS;
147
148 if (rc->ntotal >= rc->nalloc) {
149 return APR_ENOMEM;
150 }
151 rc->live_servers[rc->ntotal] = rs;
152 rc->ntotal++;
153 make_server_live(rc, rs);
154 return rv;
155 }
156
157 APU_DECLARE(apr_redis_server_t *)
apr_redis_find_server_hash(apr_redis_t * rc,const apr_uint32_t hash)158 apr_redis_find_server_hash(apr_redis_t *rc, const apr_uint32_t hash)
159 {
160 if (rc->server_func) {
161 return rc->server_func(rc->server_baton, rc, hash);
162 }
163 else {
164 return apr_redis_find_server_hash_default(NULL, rc, hash);
165 }
166 }
167
168 APU_DECLARE(apr_redis_server_t *)
apr_redis_find_server_hash_default(void * baton,apr_redis_t * rc,const apr_uint32_t hash)169 apr_redis_find_server_hash_default(void *baton, apr_redis_t *rc,
170 const apr_uint32_t hash)
171 {
172 apr_redis_server_t *rs = NULL;
173 apr_uint32_t h = hash ? hash : 1;
174 apr_uint32_t i = 0;
175 apr_time_t curtime = 0;
176
177 if (rc->ntotal == 0) {
178 return NULL;
179 }
180
181 do {
182 rs = rc->live_servers[h % rc->ntotal];
183 if (rs->status == APR_RC_SERVER_LIVE) {
184 break;
185 }
186 else {
187 if (curtime == 0) {
188 curtime = apr_time_now();
189 }
190 #if APR_HAS_THREADS
191 apr_thread_mutex_lock(rs->lock);
192 #endif
193 /* Try the dead server, every 5 seconds */
194 if (curtime - rs->btime > apr_time_from_sec(5)) {
195 rs->btime = curtime;
196 if (apr_redis_ping(rs) == APR_SUCCESS) {
197 make_server_live(rc, rs);
198 #if APR_HAS_THREADS
199 apr_thread_mutex_unlock(rs->lock);
200 #endif
201 break;
202 }
203 }
204 #if APR_HAS_THREADS
205 apr_thread_mutex_unlock(rs->lock);
206 #endif
207 }
208 h++;
209 i++;
210 } while (i < rc->ntotal);
211
212 if (i == rc->ntotal) {
213 rs = NULL;
214 }
215
216 return rs;
217 }
218
apr_redis_find_server(apr_redis_t * rc,const char * host,apr_port_t port)219 APU_DECLARE(apr_redis_server_t *) apr_redis_find_server(apr_redis_t *rc,
220 const char *host,
221 apr_port_t port)
222 {
223 int i;
224
225 for (i = 0; i < rc->ntotal; i++) {
226 if (strcmp(rc->live_servers[i]->host, host) == 0
227 && rc->live_servers[i]->port == port) {
228
229 return rc->live_servers[i];
230 }
231 }
232
233 return NULL;
234 }
235
rs_find_conn(apr_redis_server_t * rs,apr_redis_conn_t ** conn)236 static apr_status_t rs_find_conn(apr_redis_server_t *rs,
237 apr_redis_conn_t ** conn)
238 {
239 apr_status_t rv;
240 apr_bucket_alloc_t *balloc;
241 apr_bucket *e;
242
243 #if APR_HAS_THREADS
244 rv = apr_reslist_acquire(rs->conns, (void **) conn);
245 #else
246 *conn = rs->conn;
247 rv = APR_SUCCESS;
248 #endif
249
250 if (rv != APR_SUCCESS) {
251 return rv;
252 }
253
254 balloc = apr_bucket_alloc_create((*conn)->tp);
255 (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
256 (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
257
258 e = apr_bucket_socket_create((*conn)->sock, balloc);
259 APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
260
261 return rv;
262 }
263
rs_bad_conn(apr_redis_server_t * rs,apr_redis_conn_t * conn)264 static apr_status_t rs_bad_conn(apr_redis_server_t *rs,
265 apr_redis_conn_t *conn)
266 {
267 #if APR_HAS_THREADS
268 return apr_reslist_invalidate(rs->conns, conn);
269 #else
270 return APR_SUCCESS;
271 #endif
272 }
273
rs_release_conn(apr_redis_server_t * rs,apr_redis_conn_t * conn)274 static apr_status_t rs_release_conn(apr_redis_server_t *rs,
275 apr_redis_conn_t *conn)
276 {
277 apr_pool_clear(conn->tp);
278 #if APR_HAS_THREADS
279 return apr_reslist_release(rs->conns, conn);
280 #else
281 return APR_SUCCESS;
282 #endif
283 }
284
apr_redis_enable_server(apr_redis_t * rc,apr_redis_server_t * rs)285 APU_DECLARE(apr_status_t) apr_redis_enable_server(apr_redis_t *rc,
286 apr_redis_server_t *rs)
287 {
288 apr_status_t rv = APR_SUCCESS;
289
290 if (rs->status == APR_RC_SERVER_LIVE) {
291 return rv;
292 }
293 rv = make_server_live(rc, rs);
294 return rv;
295 }
296
apr_redis_disable_server(apr_redis_t * rc,apr_redis_server_t * rs)297 APU_DECLARE(apr_status_t) apr_redis_disable_server(apr_redis_t *rc,
298 apr_redis_server_t *rs)
299 {
300 return make_server_dead(rc, rs);
301 }
302
conn_connect(apr_redis_conn_t * conn)303 static apr_status_t conn_connect(apr_redis_conn_t *conn)
304 {
305 apr_status_t rv = APR_SUCCESS;
306 apr_sockaddr_t *sa;
307 #if APR_HAVE_SOCKADDR_UN
308 apr_int32_t family = conn->rs->host[0] != '/' ? APR_INET : APR_UNIX;
309 #else
310 apr_int32_t family = APR_INET;
311 #endif
312
313 rv = apr_sockaddr_info_get(&sa, conn->rs->host, family, conn->rs->port, 0,
314 conn->p);
315 if (rv != APR_SUCCESS) {
316 return rv;
317 }
318
319 rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
320 if (rv != APR_SUCCESS) {
321 return rv;
322 }
323
324 rv = apr_socket_connect(conn->sock, sa);
325 if (rv != APR_SUCCESS) {
326 return rv;
327 }
328
329 rv = apr_socket_timeout_set(conn->sock,
330 conn->rs->rwto * APR_USEC_PER_SEC);
331 if (rv != APR_SUCCESS) {
332 return rv;
333 }
334
335 return rv;
336 }
337
338 static apr_status_t
rc_conn_construct(void ** conn_,void * params,apr_pool_t * pool)339 rc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
340 {
341 apr_status_t rv = APR_SUCCESS;
342 apr_redis_conn_t *conn;
343 apr_pool_t *np;
344 apr_pool_t *tp;
345 apr_redis_server_t *rs = params;
346 #if APR_HAVE_SOCKADDR_UN
347 apr_int32_t family = rs->host[0] != '/' ? APR_INET : APR_UNIX;
348 #else
349 apr_int32_t family = APR_INET;
350 #endif
351
352 rv = apr_pool_create(&np, pool);
353 if (rv != APR_SUCCESS) {
354 return rv;
355 }
356
357 rv = apr_pool_create(&tp, np);
358 if (rv != APR_SUCCESS) {
359 apr_pool_destroy(np);
360 return rv;
361 }
362
363 conn = apr_palloc(np, sizeof(apr_redis_conn_t));
364
365 conn->p = np;
366 conn->tp = tp;
367
368 rv = apr_socket_create(&conn->sock, family, SOCK_STREAM, 0, np);
369
370 if (rv != APR_SUCCESS) {
371 apr_pool_destroy(np);
372 return rv;
373 }
374
375 conn->buffer = apr_palloc(conn->p, BUFFER_SIZE + 1);
376 conn->blen = 0;
377 conn->rs = rs;
378
379 rv = conn_connect(conn);
380 if (rv != APR_SUCCESS) {
381 apr_pool_destroy(np);
382 }
383 else {
384 *conn_ = conn;
385 }
386
387 return rv;
388 }
389
390 #if APR_HAS_THREADS
391 static apr_status_t
rc_conn_destruct(void * conn_,void * params,apr_pool_t * pool)392 rc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
393 {
394 apr_redis_conn_t *conn = (apr_redis_conn_t *) conn_;
395 struct iovec vec[3];
396 apr_size_t written;
397
398 /* send a quit message to the Redis server to be nice about it. */
399
400 /*
401 * RESP Command:
402 * *1
403 * $4
404 * QUIT
405 */
406 vec[0].iov_base = RC_RESP_1;
407 vec[0].iov_len = RC_RESP_1_LEN;
408
409 vec[1].iov_base = RC_QUIT_SIZE;
410 vec[1].iov_len = RC_QUIT_SIZE_LEN;
411
412 vec[2].iov_base = RC_QUIT;
413 vec[2].iov_len = RC_QUIT_LEN;
414
415 /* Return values not checked, since we just want to make it go away. */
416 apr_socket_sendv(conn->sock, vec, 3, &written);
417 apr_socket_close(conn->sock);
418
419 apr_pool_destroy(conn->p);
420
421 return APR_SUCCESS;
422 }
423 #endif
424
apr_redis_server_create(apr_pool_t * p,const char * host,apr_port_t port,apr_uint32_t min,apr_uint32_t smax,apr_uint32_t max,apr_uint32_t ttl,apr_uint32_t rwto,apr_redis_server_t ** rs)425 APU_DECLARE(apr_status_t) apr_redis_server_create(apr_pool_t *p,
426 const char *host,
427 apr_port_t port,
428 apr_uint32_t min,
429 apr_uint32_t smax,
430 apr_uint32_t max,
431 apr_uint32_t ttl,
432 apr_uint32_t rwto,
433 apr_redis_server_t **rs)
434 {
435 apr_status_t rv = APR_SUCCESS;
436 apr_redis_server_t *server;
437 apr_pool_t *np;
438
439 rv = apr_pool_create(&np, p);
440
441 server = apr_palloc(np, sizeof(apr_redis_server_t));
442
443 server->p = np;
444 server->host = apr_pstrdup(np, host);
445 server->port = port;
446 server->status = APR_RC_SERVER_DEAD;
447 server->rwto = rwto;
448 server->version.major = 0;
449 server->version.minor = 0;
450 server->version.patch = 0;
451
452 #if APR_HAS_THREADS
453 rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
454 if (rv != APR_SUCCESS) {
455 return rv;
456 }
457
458 rv = apr_reslist_create(&server->conns,
459 min, /* hard minimum */
460 smax, /* soft maximum */
461 max, /* hard maximum */
462 ttl, /* Time to live */
463 rc_conn_construct, /* Make a New Connection */
464 rc_conn_destruct, /* Kill Old Connection */
465 server, np);
466 if (rv != APR_SUCCESS) {
467 return rv;
468 }
469
470 apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
471 #else
472 rv = rc_conn_construct((void **) &(server->conn), server, np);
473 if (rv != APR_SUCCESS) {
474 return rv;
475 }
476 #endif
477
478 *rs = server;
479
480 return rv;
481 }
482
apr_redis_create(apr_pool_t * p,apr_uint16_t max_servers,apr_uint32_t flags,apr_redis_t ** redis)483 APU_DECLARE(apr_status_t) apr_redis_create(apr_pool_t *p,
484 apr_uint16_t max_servers,
485 apr_uint32_t flags,
486 apr_redis_t **redis)
487 {
488 apr_status_t rv = APR_SUCCESS;
489 apr_redis_t *rc;
490
491 rc = apr_palloc(p, sizeof(apr_redis_t));
492 rc->p = p;
493 rc->nalloc = max_servers;
494 rc->ntotal = 0;
495 rc->live_servers =
496 apr_palloc(p, rc->nalloc * sizeof(struct apr_redis_server_t *));
497 rc->hash_func = NULL;
498 rc->hash_baton = NULL;
499 rc->server_func = NULL;
500 rc->server_baton = NULL;
501 *redis = rc;
502 return rv;
503 }
504
505
/* The crc32 functions and data were originally written by Spencer
 * Garrett <srg@quick.com> and were gleaned from the PostgreSQL source
 * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
 * src/usr.bin/cksum/crc32.c.
 */
511
512 static const apr_uint32_t crc32tab[256] = {
513 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
514 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
515 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
516 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
517 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
518 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
519 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
520 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
521 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
522 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
523 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
524 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
525 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
526 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
527 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
528 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
529 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
530 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
531 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
532 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
533 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
534 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
535 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
536 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
537 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
538 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
539 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
540 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
541 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
542 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
543 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
544 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
545 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
546 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
547 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
548 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
549 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
550 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
551 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
552 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
553 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
554 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
555 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
556 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
557 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
558 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
559 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
560 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
561 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
562 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
563 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
564 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
565 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
566 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
567 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
568 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
569 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
570 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
571 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
572 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
573 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
574 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
575 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
576 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
577 };
578
apr_redis_hash_crc32(void * baton,const char * data,const apr_size_t data_len)579 APU_DECLARE(apr_uint32_t) apr_redis_hash_crc32(void *baton,
580 const char *data,
581 const apr_size_t data_len)
582 {
583 apr_uint32_t i;
584 apr_uint32_t crc;
585 crc = ~0;
586
587 for (i = 0; i < data_len; i++)
588 crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
589
590 return ~crc;
591 }
592
apr_redis_hash_default(void * baton,const char * data,const apr_size_t data_len)593 APU_DECLARE(apr_uint32_t) apr_redis_hash_default(void *baton,
594 const char *data,
595 const apr_size_t data_len)
596 {
597 /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
598 * like this....
599 */
600 return ((apr_redis_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
601 }
602
apr_redis_hash(apr_redis_t * rc,const char * data,const apr_size_t data_len)603 APU_DECLARE(apr_uint32_t) apr_redis_hash(apr_redis_t *rc,
604 const char *data,
605 const apr_size_t data_len)
606 {
607 if (rc->hash_func) {
608 return rc->hash_func(rc->hash_baton, data, data_len);
609 }
610 else {
611 return apr_redis_hash_default(NULL, data, data_len);
612 }
613 }
614
get_server_line(apr_redis_conn_t * conn)615 static apr_status_t get_server_line(apr_redis_conn_t *conn)
616 {
617 apr_size_t bsize = BUFFER_SIZE;
618 apr_status_t rv = APR_SUCCESS;
619
620 rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ,
621 BUFFER_SIZE);
622
623 if (rv != APR_SUCCESS) {
624 return rv;
625 }
626
627 rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
628
629 if (rv != APR_SUCCESS) {
630 return rv;
631 }
632
633 conn->blen = bsize;
634 conn->buffer[bsize] = '\0';
635
636 return apr_brigade_cleanup(conn->tb);
637 }
638
apr_redis_set(apr_redis_t * rc,const char * key,char * data,const apr_size_t data_size,apr_uint16_t flags)639 APU_DECLARE(apr_status_t) apr_redis_set(apr_redis_t *rc,
640 const char *key,
641 char *data,
642 const apr_size_t data_size,
643 apr_uint16_t flags)
644 {
645 apr_uint32_t hash;
646 apr_redis_server_t *rs;
647 apr_redis_conn_t *conn;
648 apr_status_t rv;
649 apr_size_t written;
650 struct iovec vec[9];
651 char keysize_str[LILBUFF_SIZE];
652 char datasize_str[LILBUFF_SIZE];
653 apr_size_t len, klen;
654
655 klen = strlen(key);
656 hash = apr_redis_hash(rc, key, klen);
657
658 rs = apr_redis_find_server_hash(rc, hash);
659
660 if (rs == NULL)
661 return APR_NOTFOUND;
662
663 rv = rs_find_conn(rs, &conn);
664
665 if (rv != APR_SUCCESS) {
666 apr_redis_disable_server(rc, rs);
667 return rv;
668 }
669
670 /*
671 * RESP Command:
672 * *3
673 * $3
674 * SET
675 * $<keylen>
676 * key
677 * $<datalen>
678 * data
679 */
680
681 vec[0].iov_base = RC_RESP_3;
682 vec[0].iov_len = RC_RESP_3_LEN;
683
684 vec[1].iov_base = RC_SET_SIZE;
685 vec[1].iov_len = RC_SET_SIZE_LEN;
686
687 vec[2].iov_base = RC_SET;
688 vec[2].iov_len = RC_SET_LEN;
689
690 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
691 vec[3].iov_base = keysize_str;
692 vec[3].iov_len = len;
693
694 vec[4].iov_base = (void *) key;
695 vec[4].iov_len = klen;
696
697 vec[5].iov_base = RC_EOL;
698 vec[5].iov_len = RC_EOL_LEN;
699
700 len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
701 data_size);
702 vec[6].iov_base = datasize_str;
703 vec[6].iov_len = len;
704
705 vec[7].iov_base = data;
706 vec[7].iov_len = data_size;
707
708 vec[8].iov_base = RC_EOL;
709 vec[8].iov_len = RC_EOL_LEN;
710
711 rv = apr_socket_sendv(conn->sock, vec, 9, &written);
712
713 if (rv != APR_SUCCESS) {
714 rs_bad_conn(rs, conn);
715 apr_redis_disable_server(rc, rs);
716 return rv;
717 }
718
719 rv = get_server_line(conn);
720 if (rv != APR_SUCCESS) {
721 rs_bad_conn(rs, conn);
722 apr_redis_disable_server(rc, rs);
723 return rv;
724 }
725
726 if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
727 rv = APR_SUCCESS;
728 }
729 else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
730 rv = APR_EEXIST;
731 }
732 else {
733 rv = APR_EGENERAL;
734 }
735
736 rs_release_conn(rs, conn);
737 return rv;
738 }
739
apr_redis_setex(apr_redis_t * rc,const char * key,char * data,const apr_size_t data_size,apr_uint32_t timeout,apr_uint16_t flags)740 APU_DECLARE(apr_status_t) apr_redis_setex(apr_redis_t *rc,
741 const char *key,
742 char *data,
743 const apr_size_t data_size,
744 apr_uint32_t timeout,
745 apr_uint16_t flags)
746 {
747 apr_uint32_t hash;
748 apr_redis_server_t *rs;
749 apr_redis_conn_t *conn;
750 apr_status_t rv;
751 apr_size_t written;
752 struct iovec vec[11];
753 char keysize_str[LILBUFF_SIZE];
754 char expire_str[LILBUFF_SIZE];
755 char expiresize_str[LILBUFF_SIZE];
756 char datasize_str[LILBUFF_SIZE];
757 apr_size_t len, klen, expire_len;
758
759
760 klen = strlen(key);
761 hash = apr_redis_hash(rc, key, klen);
762
763 rs = apr_redis_find_server_hash(rc, hash);
764
765 if (rs == NULL)
766 return APR_NOTFOUND;
767
768 rv = rs_find_conn(rs, &conn);
769
770 if (rv != APR_SUCCESS) {
771 apr_redis_disable_server(rc, rs);
772 return rv;
773 }
774
775 /*
776 * RESP Command:
777 * *4
778 * $5
779 * SETEX
780 * $<keylen>
781 * key
782 * $<expirelen>
783 * expirey
784 * $<datalen>
785 * data
786 */
787
788 vec[0].iov_base = RC_RESP_4;
789 vec[0].iov_len = RC_RESP_4_LEN;
790
791 vec[1].iov_base = RC_SETEX_SIZE;
792 vec[1].iov_len = RC_SETEX_SIZE_LEN;
793
794 vec[2].iov_base = RC_SETEX;
795 vec[2].iov_len = RC_SETEX_LEN;
796
797 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
798 vec[3].iov_base = keysize_str;
799 vec[3].iov_len = len;
800
801 vec[4].iov_base = (void *) key;
802 vec[4].iov_len = klen;
803
804 vec[5].iov_base = RC_EOL;
805 vec[5].iov_len = RC_EOL_LEN;
806
807 expire_len = apr_snprintf(expire_str, LILBUFF_SIZE, "%u\r\n", timeout);
808 len = apr_snprintf(expiresize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
809 expire_len - 2);
810 vec[6].iov_base = (void *) expiresize_str;
811 vec[6].iov_len = len;
812
813 vec[7].iov_base = (void *) expire_str;
814 vec[7].iov_len = expire_len;
815
816 len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
817 data_size);
818 vec[8].iov_base = datasize_str;
819 vec[8].iov_len = len;
820
821 vec[9].iov_base = data;
822 vec[9].iov_len = data_size;
823
824 vec[10].iov_base = RC_EOL;
825 vec[10].iov_len = RC_EOL_LEN;
826
827 rv = apr_socket_sendv(conn->sock, vec, 11, &written);
828
829 if (rv != APR_SUCCESS) {
830 rs_bad_conn(rs, conn);
831 apr_redis_disable_server(rc, rs);
832 return rv;
833 }
834
835 rv = get_server_line(conn);
836 if (rv != APR_SUCCESS) {
837 rs_bad_conn(rs, conn);
838 apr_redis_disable_server(rc, rs);
839 return rv;
840 }
841
842 if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
843 rv = APR_SUCCESS;
844 }
845 else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
846 rv = APR_EEXIST;
847 }
848 else {
849 rv = APR_EGENERAL;
850 }
851
852 rs_release_conn(rs, conn);
853 return rv;
854 }
855
grab_bulk_resp(apr_redis_server_t * rs,apr_redis_t * rc,apr_redis_conn_t * conn,apr_pool_t * p,char ** baton,apr_size_t * new_length)856 static apr_status_t grab_bulk_resp(apr_redis_server_t *rs, apr_redis_t *rc,
857 apr_redis_conn_t *conn, apr_pool_t *p,
858 char **baton, apr_size_t *new_length)
859 {
860 char *length;
861 char *last;
862 apr_status_t rv;
863 apr_size_t len = 0;
864 *new_length = 0;
865
866 length = apr_strtok(conn->buffer + 1, " ", &last);
867 if (length) {
868 len = strtol(length, (char **) NULL, 10);
869 }
870
871 if (len == 0) {
872 *new_length = 0;
873 *baton = NULL;
874 }
875 else {
876 apr_bucket_brigade *bbb;
877 apr_bucket *e;
878
879 /* eat the trailing \r\n */
880 rv = apr_brigade_partition(conn->bb, len + 2, &e);
881
882 if (rv != APR_SUCCESS) {
883 rs_bad_conn(rs, conn);
884 if (rc)
885 apr_redis_disable_server(rc, rs);
886 return rv;
887 }
888
889 bbb = apr_brigade_split(conn->bb, e);
890
891 rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
892
893 if (rv != APR_SUCCESS) {
894 rs_bad_conn(rs, conn);
895 if (rc)
896 apr_redis_disable_server(rc, rs);
897 return rv;
898 }
899
900 rv = apr_brigade_destroy(conn->bb);
901 if (rv != APR_SUCCESS) {
902 rs_bad_conn(rs, conn);
903 if (rc)
904 apr_redis_disable_server(rc, rs);
905 return rv;
906 }
907
908 conn->bb = bbb;
909
910 *new_length = len - 2;
911 (*baton)[*new_length] = '\0';
912 }
913 return APR_SUCCESS;
914
915 }
916
apr_redis_getp(apr_redis_t * rc,apr_pool_t * p,const char * key,char ** baton,apr_size_t * new_length,apr_uint16_t * flags)917 APU_DECLARE(apr_status_t) apr_redis_getp(apr_redis_t *rc,
918 apr_pool_t *p,
919 const char *key,
920 char **baton,
921 apr_size_t *new_length,
922 apr_uint16_t *flags)
923 {
924 apr_status_t rv;
925 apr_redis_server_t *rs;
926 apr_redis_conn_t *conn;
927 apr_uint32_t hash;
928 apr_size_t written;
929 apr_size_t len, klen;
930 struct iovec vec[6];
931 char keysize_str[LILBUFF_SIZE];
932
933 klen = strlen(key);
934 hash = apr_redis_hash(rc, key, klen);
935 rs = apr_redis_find_server_hash(rc, hash);
936
937 if (rs == NULL)
938 return APR_NOTFOUND;
939
940 rv = rs_find_conn(rs, &conn);
941
942 if (rv != APR_SUCCESS) {
943 apr_redis_disable_server(rc, rs);
944 return rv;
945 }
946
947 /*
948 * RESP Command:
949 * *2
950 * $3
951 * GET
952 * $<keylen>
953 * key
954 */
955 vec[0].iov_base = RC_RESP_2;
956 vec[0].iov_len = RC_RESP_2_LEN;
957
958 vec[1].iov_base = RC_GET_SIZE;
959 vec[1].iov_len = RC_GET_SIZE_LEN;
960
961 vec[2].iov_base = RC_GET;
962 vec[2].iov_len = RC_GET_LEN;
963
964 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
965 klen);
966 vec[3].iov_base = keysize_str;
967 vec[3].iov_len = len;
968
969 vec[4].iov_base = (void *) key;
970 vec[4].iov_len = klen;
971
972 vec[5].iov_base = RC_EOL;
973 vec[5].iov_len = RC_EOL_LEN;
974
975 rv = apr_socket_sendv(conn->sock, vec, 6, &written);
976
977
978 if (rv != APR_SUCCESS) {
979 rs_bad_conn(rs, conn);
980 apr_redis_disable_server(rc, rs);
981 return rv;
982 }
983
984 rv = get_server_line(conn);
985 if (rv != APR_SUCCESS) {
986 rs_bad_conn(rs, conn);
987 apr_redis_disable_server(rc, rs);
988 return rv;
989 }
990 if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
991 rv = APR_NOTFOUND;
992 }
993 else if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
994 rv = grab_bulk_resp(rs, rc, conn, p, baton, new_length);
995 }
996 else {
997 rv = APR_EGENERAL;
998 }
999
1000 rs_release_conn(rs, conn);
1001 return rv;
1002 }
1003
1004 APU_DECLARE(apr_status_t)
apr_redis_delete(apr_redis_t * rc,const char * key,apr_uint32_t timeout)1005 apr_redis_delete(apr_redis_t *rc, const char *key, apr_uint32_t timeout)
1006 {
1007 apr_status_t rv;
1008 apr_redis_server_t *rs;
1009 apr_redis_conn_t *conn;
1010 apr_uint32_t hash;
1011 apr_size_t written;
1012 struct iovec vec[6];
1013 apr_size_t len, klen;
1014 char keysize_str[LILBUFF_SIZE];
1015
1016 klen = strlen(key);
1017 hash = apr_redis_hash(rc, key, klen);
1018 rs = apr_redis_find_server_hash(rc, hash);
1019 if (rs == NULL)
1020 return APR_NOTFOUND;
1021
1022 rv = rs_find_conn(rs, &conn);
1023
1024 if (rv != APR_SUCCESS) {
1025 apr_redis_disable_server(rc, rs);
1026 return rv;
1027 }
1028
1029 /*
1030 * RESP Command:
1031 * *2
1032 * $3
1033 * DEL
1034 * $<keylen>
1035 * key
1036 */
1037 vec[0].iov_base = RC_RESP_2;
1038 vec[0].iov_len = RC_RESP_2_LEN;
1039
1040 vec[1].iov_base = RC_DEL_SIZE;
1041 vec[1].iov_len = RC_DEL_SIZE_LEN;
1042
1043 vec[2].iov_base = RC_DEL;
1044 vec[2].iov_len = RC_DEL_LEN;
1045
1046 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1047 klen);
1048 vec[3].iov_base = keysize_str;
1049 vec[3].iov_len = len;
1050
1051 vec[4].iov_base = (void *) key;
1052 vec[4].iov_len = klen;
1053
1054 vec[5].iov_base = RC_EOL;
1055 vec[5].iov_len = RC_EOL_LEN;
1056
1057 rv = apr_socket_sendv(conn->sock, vec, 6, &written);
1058
1059 if (rv != APR_SUCCESS) {
1060 rs_bad_conn(rs, conn);
1061 apr_redis_disable_server(rc, rs);
1062 return rv;
1063 }
1064
1065 rv = get_server_line(conn);
1066 if (rv != APR_SUCCESS) {
1067 rs_bad_conn(rs, conn);
1068 apr_redis_disable_server(rc, rs);
1069 return rv;
1070 }
1071
1072 if (strncmp(RS_DELETED, conn->buffer, RS_DELETED_LEN) == 0) {
1073 rv = APR_SUCCESS;
1074 }
1075 else if (strncmp(RS_NOT_FOUND_DEL, conn->buffer, RS_NOT_FOUND_DEL_LEN) == 0) {
1076 rv = APR_NOTFOUND;
1077 }
1078 else {
1079 rv = APR_EGENERAL;
1080 }
1081
1082 rs_release_conn(rs, conn);
1083 return rv;
1084 }
1085
1086 APU_DECLARE(apr_status_t)
apr_redis_ping(apr_redis_server_t * rs)1087 apr_redis_ping(apr_redis_server_t *rs)
1088 {
1089 apr_status_t rv;
1090 apr_size_t written;
1091 struct iovec vec[3];
1092 apr_redis_conn_t *conn;
1093
1094 rv = rs_find_conn(rs, &conn);
1095
1096 if (rv != APR_SUCCESS) {
1097 return rv;
1098 }
1099
1100 /*
1101 * RESP Command:
1102 * *1
1103 * $4
1104 * PING
1105 */
1106 vec[0].iov_base = RC_RESP_1;
1107 vec[0].iov_len = RC_RESP_1_LEN;
1108
1109 vec[1].iov_base = RC_PING_SIZE;
1110 vec[1].iov_len = RC_PING_SIZE_LEN;
1111
1112 vec[2].iov_base = RC_PING;
1113 vec[2].iov_len = RC_PING_LEN;
1114
1115 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1116
1117 if (rv != APR_SUCCESS) {
1118 rs_bad_conn(rs, conn);
1119 return rv;
1120 }
1121
1122 rv = get_server_line(conn);
1123 if (rv == APR_SUCCESS) {
1124 /* we got *something*. Was it Redis? */
1125 if (strncmp(conn->buffer, "+PONG", sizeof("+PONG")-1) != 0) {
1126 rv = APR_EGENERAL;
1127 }
1128 }
1129 rs_release_conn(rs, conn);
1130 return rv;
1131 }
1132
1133 APU_DECLARE(apr_status_t)
apr_redis_info(apr_redis_server_t * rs,apr_pool_t * p,char ** baton)1134 apr_redis_info(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1135 {
1136 apr_status_t rv;
1137 apr_redis_conn_t *conn;
1138 apr_size_t written;
1139 struct iovec vec[3];
1140
1141 rv = rs_find_conn(rs, &conn);
1142
1143 if (rv != APR_SUCCESS) {
1144 return rv;
1145 }
1146
1147 /*
1148 * RESP Command:
1149 * *1
1150 * $4
1151 * INFO
1152 */
1153 vec[0].iov_base = RC_RESP_1;
1154 vec[0].iov_len = RC_RESP_1_LEN;
1155
1156 vec[1].iov_base = RC_INFO_SIZE;
1157 vec[1].iov_len = RC_INFO_SIZE_LEN;
1158
1159 vec[2].iov_base = RC_INFO;
1160 vec[2].iov_len = RC_INFO_LEN;
1161
1162 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1163
1164 if (rv != APR_SUCCESS) {
1165 rs_bad_conn(rs, conn);
1166 return rv;
1167 }
1168
1169 rv = get_server_line(conn);
1170 if (rv != APR_SUCCESS) {
1171 rs_bad_conn(rs, conn);
1172 return rv;
1173 }
1174
1175 if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
1176 apr_size_t nl;
1177 rv = grab_bulk_resp(rs, NULL, conn, p, baton, &nl);
1178 } else {
1179 rs_bad_conn(rs, conn);
1180 rv = APR_EGENERAL;
1181 }
1182
1183 rs_release_conn(rs, conn);
1184 return rv;
1185 }
1186
1187 #define RV_FIELD "redis_version:"
1188 APU_DECLARE(apr_status_t)
apr_redis_version(apr_redis_server_t * rs,apr_pool_t * p,char ** baton)1189 apr_redis_version(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1190 {
1191 apr_status_t rv;
1192 char *ptr, *eptr;
1193 apr_pool_t *subpool;
1194
1195 /* Have we already obtained the version number? */
1196 if (rs->version.minor != 0) {
1197 if (baton)
1198 *baton = apr_pstrdup(p, rs->version.number);
1199 return APR_SUCCESS;
1200 }
1201 if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1202 /* well, we tried */
1203 subpool = p;
1204 }
1205 rv = apr_redis_info(rs, subpool, baton);
1206
1207 if (rv != APR_SUCCESS) {
1208 if (subpool != p) {
1209 apr_pool_destroy(subpool);
1210 }
1211 return rv;
1212 }
1213
1214 ptr = strstr(*baton, RV_FIELD);
1215 if (ptr) {
1216 rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1217 ptr = eptr + 1;
1218 rs->version.minor = strtol(ptr, &eptr, 10);
1219 ptr = eptr + 1;
1220 rs->version.patch = strtol(ptr, &eptr, 10);
1221 rs->version.number = apr_psprintf(rs->p, "%d.%d.%d",
1222 rs->version.major, rs->version.minor,
1223 rs->version.patch);
1224 }
1225 if (baton)
1226 *baton = apr_pstrdup(p, rs->version.number);
1227 if (subpool != p) {
1228 apr_pool_destroy(subpool);
1229 }
1230 return APR_SUCCESS;
1231 }
1232
plus_minus(apr_redis_t * rc,int incr,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1233 static apr_status_t plus_minus(apr_redis_t *rc,
1234 int incr,
1235 const char *key,
1236 apr_int32_t inc,
1237 apr_uint32_t *new_value)
1238 {
1239 apr_status_t rv;
1240 apr_redis_server_t *rs;
1241 apr_redis_conn_t *conn;
1242 apr_uint32_t hash;
1243 apr_size_t written;
1244 apr_size_t len, klen;
1245 struct iovec vec[12];
1246 char keysize_str[LILBUFF_SIZE];
1247 char inc_str[LILBUFF_SIZE];
1248 char inc_str_len[LILBUFF_SIZE];
1249 int i = 0;
1250
1251 klen = strlen(key);
1252 hash = apr_redis_hash(rc, key, klen);
1253 rs = apr_redis_find_server_hash(rc, hash);
1254 if (rs == NULL)
1255 return APR_NOTFOUND;
1256
1257 rv = rs_find_conn(rs, &conn);
1258
1259 if (rv != APR_SUCCESS) {
1260 apr_redis_disable_server(rc, rs);
1261 return rv;
1262 }
1263
1264 /*
1265 * RESP Command:
1266 * *2|*3
1267 * $4|$6
1268 * INCR/DECR|INCRBY/DECRBY
1269 * $<keylen>
1270 * key
1271 * <:inc>
1272 */
1273 if (inc == 1) {
1274 vec[i].iov_base = RC_RESP_2;
1275 vec[i].iov_len = RC_RESP_2_LEN;
1276 i++;
1277
1278 vec[i].iov_base = "$4\r\n";
1279 vec[i].iov_len = sizeof("$4\r\n")-1;
1280 i++;
1281
1282 if (incr)
1283 vec[i].iov_base = "INCR\r\n";
1284 else
1285 vec[i].iov_base = "DECR\r\n";
1286 vec[i].iov_len = sizeof("INCR\r\n")-1;
1287 i++;
1288 }
1289 else {
1290 vec[i].iov_base = RC_RESP_3;
1291 vec[i].iov_len = RC_RESP_3_LEN;
1292 i++;
1293
1294 vec[i].iov_base = "$6\r\n";
1295 vec[i].iov_len = sizeof("$6\r\n")-1;
1296 i++;
1297
1298 if (incr)
1299 vec[i].iov_base = "INCRBY\r\n";
1300 else
1301 vec[i].iov_base = "DECRBY\r\n";
1302 vec[i].iov_len = sizeof("INCRBY\r\n")-1;
1303 i++;
1304 }
1305
1306 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1307 klen);
1308 vec[i].iov_base = keysize_str;
1309 vec[i].iov_len = len;
1310 i++;
1311
1312 vec[i].iov_base = (void *) key;
1313 vec[i].iov_len = klen;
1314 i++;
1315
1316 vec[i].iov_base = RC_EOL;
1317 vec[i].iov_len = RC_EOL_LEN;
1318 i++;
1319
1320 if (inc != 1) {
1321 len = apr_snprintf(inc_str, LILBUFF_SIZE, "%d\r\n", inc);
1322 klen = apr_snprintf(inc_str_len, LILBUFF_SIZE, "$%d\r\n", (int)(len-2));
1323 vec[i].iov_base = inc_str_len;
1324 vec[i].iov_len = klen;
1325 i++;
1326
1327 vec[i].iov_base = inc_str;
1328 vec[i].iov_len = len;
1329 i++;
1330
1331 vec[i].iov_base = RC_EOL;
1332 vec[i].iov_len = RC_EOL_LEN;
1333 i++;
1334 }
1335
1336 rv = apr_socket_sendv(conn->sock, vec, i, &written);
1337
1338 if (rv != APR_SUCCESS) {
1339 rs_bad_conn(rs, conn);
1340 apr_redis_disable_server(rc, rs);
1341 return rv;
1342 }
1343
1344 rv = get_server_line(conn);
1345 if (rv != APR_SUCCESS) {
1346 rs_bad_conn(rs, conn);
1347 apr_redis_disable_server(rc, rs);
1348 return rv;
1349 }
1350 if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
1351 rv = APR_NOTFOUND;
1352 }
1353 else if (*conn->buffer == ':') {
1354 *new_value = atoi((const char *)(conn->buffer + 1));
1355 rv = APR_SUCCESS;
1356 }
1357 else {
1358 rv = APR_EGENERAL;
1359 }
1360 rs_release_conn(rs, conn);
1361 return rv;
1362 }
1363
1364 APU_DECLARE(apr_status_t)
apr_redis_incr(apr_redis_t * rc,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1365 apr_redis_incr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1366 {
1367 return plus_minus(rc, 1, key, inc, new_value);
1368 }
1369
1370 APU_DECLARE(apr_status_t)
apr_redis_decr(apr_redis_t * rc,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1371 apr_redis_decr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1372 {
1373 return plus_minus(rc, 0, key, inc, new_value);
1374 }
1375
1376 APU_DECLARE(apr_status_t)
apr_redis_multgetp(apr_redis_t * rc,apr_pool_t * temp_pool,apr_pool_t * data_pool,apr_hash_t * values)1377 apr_redis_multgetp(apr_redis_t *rc,
1378 apr_pool_t *temp_pool,
1379 apr_pool_t *data_pool,
1380 apr_hash_t *values)
1381 {
1382 return APR_ENOTIMPL;
1383 }
1384
/**
 * Field names looked up in the INFO reply text when filling in the stats.
 *
 * Every key keeps its trailing ':' so that the value starts immediately
 * after the match, and each key is paired with a _LEN macro giving its
 * length without the terminating NUL.
 */

#define STAT_process_id                      "process_id:"
#define STAT_process_id_LEN                  (sizeof(STAT_process_id) - 1)

#define STAT_uptime_in_seconds               "uptime_in_seconds:"
#define STAT_uptime_in_seconds_LEN           (sizeof(STAT_uptime_in_seconds) - 1)

#define STAT_arch_bits                       "arch_bits:"
#define STAT_arch_bits_LEN                   (sizeof(STAT_arch_bits) - 1)

#define STAT_connected_clients               "connected_clients:"
#define STAT_connected_clients_LEN           (sizeof(STAT_connected_clients) - 1)

#define STAT_blocked_clients                 "blocked_clients:"
#define STAT_blocked_clients_LEN             (sizeof(STAT_blocked_clients) - 1)

#define STAT_maxmemory                       "maxmemory:"
#define STAT_maxmemory_LEN                   (sizeof(STAT_maxmemory) - 1)

#define STAT_used_memory                     "used_memory:"
#define STAT_used_memory_LEN                 (sizeof(STAT_used_memory) - 1)

#define STAT_total_system_memory             "total_system_memory:"
#define STAT_total_system_memory_LEN         (sizeof(STAT_total_system_memory) - 1)

#define STAT_total_connections_received      "total_connections_received:"
#define STAT_total_connections_received_LEN  (sizeof(STAT_total_connections_received) - 1)

#define STAT_total_commands_processed        "total_commands_processed:"
#define STAT_total_commands_processed_LEN    (sizeof(STAT_total_commands_processed) - 1)

#define STAT_rejected_connections            "rejected_connections:"
#define STAT_rejected_connections_LEN        (sizeof(STAT_rejected_connections) - 1)

#define STAT_total_net_input_bytes           "total_net_input_bytes:"
#define STAT_total_net_input_bytes_LEN       (sizeof(STAT_total_net_input_bytes) - 1)

#define STAT_total_net_output_bytes          "total_net_output_bytes:"
#define STAT_total_net_output_bytes_LEN      (sizeof(STAT_total_net_output_bytes) - 1)

#define STAT_keyspace_hits                   "keyspace_hits:"
#define STAT_keyspace_hits_LEN               (sizeof(STAT_keyspace_hits) - 1)

#define STAT_keyspace_misses                 "keyspace_misses:"
#define STAT_keyspace_misses_LEN             (sizeof(STAT_keyspace_misses) - 1)

#define STAT_connected_slaves                "connected_slaves:"
#define STAT_connected_slaves_LEN            (sizeof(STAT_connected_slaves) - 1)

#define STAT_used_cpu_sys                    "used_cpu_sys:"
#define STAT_used_cpu_sys_LEN                (sizeof(STAT_used_cpu_sys) - 1)

#define STAT_used_cpu_user                   "used_cpu_user:"
#define STAT_used_cpu_user_LEN               (sizeof(STAT_used_cpu_user) - 1)

#define STAT_cluster_enabled                 "cluster_enabled:"
#define STAT_cluster_enabled_LEN             (sizeof(STAT_cluster_enabled) - 1)
1445
stat_read_uint32(char * buf)1446 static apr_uint32_t stat_read_uint32( char *buf)
1447 {
1448 return atoi(buf);
1449 }
1450
stat_read_uint64(char * buf)1451 static apr_uint64_t stat_read_uint64(char *buf)
1452 {
1453 return apr_atoi64(buf);
1454 }
1455
/*
 * rc_do_stat(name, type): if the key STAT_<name> occurs in `info`, parse the
 * text right after it with stat_read_<type>() and store it in stats-><name>.
 *
 * Relies on three identifiers at the expansion site: `info` (text to search),
 * `ptr` (scratch char pointer) and `stats` (destination structure).
 *
 * Wrapped in do { } while (0) so that one invocation followed by ';' is
 * exactly one statement and is safe under an unbraced if/else.
 */
#define rc_do_stat(name, type)                                  \
    do {                                                        \
        if ((ptr = strstr(info, STAT_ ## name)) != NULL) {      \
            char *str = ptr + (STAT_ ## name ## _LEN);          \
            stats->name = stat_read_ ## type(str);              \
        }                                                       \
    } while (0)
1461
/**
 * Fill in the numeric members of stats from the INFO text.
 *
 * Each rc_do_stat() line searches info for one key and, when it is present,
 * stores the parsed value in the member of the same name; members whose key
 * is missing are left untouched.  The lookups are independent of each other
 * and are grouped here by the parser they use.
 */
static void update_stats(char *info, apr_redis_stats_t *stats)
{
    char *ptr;  /* scratch pointer used inside rc_do_stat() */

    /* Values read with the 32-bit parser. */
    rc_do_stat(process_id, uint32);
    rc_do_stat(uptime_in_seconds, uint32);
    rc_do_stat(arch_bits, uint32);
    rc_do_stat(connected_clients, uint32);
    rc_do_stat(blocked_clients, uint32);
    rc_do_stat(connected_slaves, uint32);
    rc_do_stat(used_cpu_sys, uint32);
    rc_do_stat(used_cpu_user, uint32);
    rc_do_stat(cluster_enabled, uint32);

    /* Values read with the 64-bit parser. */
    rc_do_stat(maxmemory, uint64);
    rc_do_stat(used_memory, uint64);
    rc_do_stat(total_system_memory, uint64);
    rc_do_stat(total_connections_received, uint64);
    rc_do_stat(total_commands_processed, uint64);
    rc_do_stat(rejected_connections, uint64);
    rc_do_stat(total_net_input_bytes, uint64);
    rc_do_stat(total_net_output_bytes, uint64);
    rc_do_stat(keyspace_hits, uint64);
    rc_do_stat(keyspace_misses, uint64);
}
1486
1487 APU_DECLARE(apr_status_t)
apr_redis_stats(apr_redis_server_t * rs,apr_pool_t * p,apr_redis_stats_t ** stats)1488 apr_redis_stats(apr_redis_server_t *rs,
1489 apr_pool_t *p,
1490 apr_redis_stats_t **stats)
1491 {
1492 apr_status_t rv;
1493 char *info;
1494 apr_pool_t *subpool;
1495 apr_redis_stats_t *ret;
1496 char *ptr;
1497
1498 if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1499 /* well, we tried */
1500 subpool = p;
1501 }
1502 rv = apr_redis_info(rs, subpool, &info);
1503
1504 if (rv != APR_SUCCESS) {
1505 if (subpool != p) {
1506 apr_pool_destroy(subpool);
1507 }
1508 return rv;
1509 }
1510 ret = apr_pcalloc(p, sizeof(apr_redis_stats_t));
1511 /* Get the bulk of the stats */
1512 update_stats(info, ret);
1513
1514 /* Now the version number */
1515 if (rs->version.major != 0) {
1516 ret->major = rs->version.major;
1517 ret->minor = rs->version.minor;
1518 ret->patch = rs->version.patch;
1519 }
1520 else {
1521 char *eptr;
1522 ptr = strstr(info, RV_FIELD);
1523 if (ptr) {
1524 ret->major = rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1525 ptr = eptr + 1;
1526 ret->minor = rs->version.minor = strtol(ptr, &eptr, 10);
1527 ptr = eptr + 1;
1528 ret->patch = rs->version.patch = strtol(ptr, &eptr, 10);
1529 }
1530 }
1531
1532 /* Finally, the role */
1533 ptr = strstr(info, "role:");
1534 if (!ptr) {
1535 ret->role = APR_RS_SERVER_UNKNOWN;
1536 }
1537 else if (!strncmp("master", ptr + sizeof("role:") - 1, sizeof("master")-1)) {
1538 ret->role = APR_RS_SERVER_MASTER;
1539 }
1540 else {
1541 ret->role = APR_RS_SERVER_SLAVE;
1542 }
1543 if (stats) {
1544 *stats = ret;
1545 }
1546
1547 return APR_SUCCESS;
1548 }
1549