1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "apr_redis.h"
18 #include "apr_poll.h"
19 #include "apr_version.h"
20 #include <stdlib.h>
21 #include <string.h>
22 
23 #define BUFFER_SIZE  512
24 #define LILBUFF_SIZE 64
25 struct apr_redis_conn_t
26 {
27     char *buffer;
28     apr_size_t blen;
29     apr_pool_t *p;
30     apr_pool_t *tp;
31     apr_socket_t *sock;
32     apr_bucket_brigade *bb;
33     apr_bucket_brigade *tb;
34     apr_redis_server_t *rs;
35 };
36 
37 /* Strings for Client Commands */
38 
39 #define RC_EOL "\r\n"
40 #define RC_EOL_LEN (sizeof(RC_EOL)-1)
41 
42 #define RC_WS " "
43 #define RC_WS_LEN (sizeof(RC_WS)-1)
44 
45 #define RC_RESP_1 "*1\r\n"
46 #define RC_RESP_1_LEN (sizeof(RC_RESP_1)-1)
47 
48 #define RC_RESP_2 "*2\r\n"
49 #define RC_RESP_2_LEN (sizeof(RC_RESP_2)-1)
50 
51 #define RC_RESP_3 "*3\r\n"
52 #define RC_RESP_3_LEN (sizeof(RC_RESP_3)-1)
53 
54 #define RC_RESP_4 "*4\r\n"
55 #define RC_RESP_4_LEN (sizeof(RC_RESP_4)-1)
56 
57 #define RC_GET "GET\r\n"
58 #define RC_GET_LEN (sizeof(RC_GET)-1)
59 
60 #define RC_GET_SIZE "$3\r\n"
61 #define RC_GET_SIZE_LEN (sizeof(RC_GET_SIZE)-1)
62 
63 #define RC_SET "SET\r\n"
64 #define RC_SET_LEN (sizeof(RC_SET)-1)
65 
66 #define RC_SET_SIZE "$3\r\n"
67 #define RC_SET_SIZE_LEN (sizeof(RC_SET_SIZE)-1)
68 
69 #define RC_SETEX "SETEX\r\n"
70 #define RC_SETEX_LEN (sizeof(RC_SETEX)-1)
71 
72 #define RC_SETEX_SIZE "$5\r\n"
73 #define RC_SETEX_SIZE_LEN (sizeof(RC_SETEX_SIZE)-1)
74 
75 #define RC_DEL "DEL\r\n"
76 #define RC_DEL_LEN (sizeof(RC_DEL)-1)
77 
78 #define RC_DEL_SIZE "$3\r\n"
79 #define RC_DEL_SIZE_LEN (sizeof(RC_DEL_SIZE)-1)
80 
81 #define RC_QUIT "QUIT\r\n"
82 #define RC_QUIT_LEN (sizeof(RC_QUIT)-1)
83 
84 #define RC_QUIT_SIZE "$4\r\n"
85 #define RC_QUIT_SIZE_LEN (sizeof(RC_QUIT_SIZE)-1)
86 
87 #define RC_PING "PING\r\n"
88 #define RC_PING_LEN (sizeof(RC_PING)-1)
89 
90 #define RC_PING_SIZE "$4\r\n"
91 #define RC_PING_SIZE_LEN (sizeof(RC_PING_SIZE)-1)
92 
93 #define RC_INFO "INFO\r\n"
94 #define RC_INFO_LEN (sizeof(RC_INFO)-1)
95 
96 #define RC_INFO_SIZE "$4\r\n"
97 #define RC_INFO_SIZE_LEN (sizeof(RC_INFO_SIZE)-1)
98 
99 /* Strings for Server Replies */
100 
101 #define RS_STORED "+OK"
102 #define RS_STORED_LEN (sizeof(RS_STORED)-1)
103 
104 #define RS_NOT_STORED "$-1"
105 #define RS_NOT_STORED_LEN (sizeof(RS_NOT_STORED)-1)
106 
107 #define RS_DELETED ":1"
108 #define RS_DELETED_LEN (sizeof(RS_DELETED)-1)
109 
110 #define RS_NOT_FOUND_GET "$-1"
111 #define RS_NOT_FOUND_GET_LEN (sizeof(RS_NOT_FOUND_GET)-1)
112 
113 #define RS_NOT_FOUND_DEL ":0"
114 #define RS_NOT_FOUND_DEL_LEN (sizeof(RS_NOT_FOUND_DEL)-1)
115 
116 #define RS_TYPE_STRING "$"
117 #define RS_TYPE_STRING_LEN (sizeof(RS_TYPE_STRING)-1)
118 
119 #define RS_END "\r\n"
120 #define RS_END_LEN (sizeof(RS_END)-1)
121 
make_server_dead(apr_redis_t * rc,apr_redis_server_t * rs)122 static apr_status_t make_server_dead(apr_redis_t *rc,
123                                      apr_redis_server_t *rs)
124 {
125 #if APR_HAS_THREADS
126     apr_thread_mutex_lock(rs->lock);
127 #endif
128     rs->status = APR_RC_SERVER_DEAD;
129     rs->btime = apr_time_now();
130 #if APR_HAS_THREADS
131     apr_thread_mutex_unlock(rs->lock);
132 #endif
133     return APR_SUCCESS;
134 }
135 
make_server_live(apr_redis_t * rc,apr_redis_server_t * rs)136 static apr_status_t make_server_live(apr_redis_t *rc,
137                                      apr_redis_server_t *rs)
138 {
139     rs->status = APR_RC_SERVER_LIVE;
140     return APR_SUCCESS;
141 }
142 
apr_redis_add_server(apr_redis_t * rc,apr_redis_server_t * rs)143 APU_DECLARE(apr_status_t) apr_redis_add_server(apr_redis_t *rc,
144                                                apr_redis_server_t *rs)
145 {
146     apr_status_t rv = APR_SUCCESS;
147 
148     if (rc->ntotal >= rc->nalloc) {
149         return APR_ENOMEM;
150     }
151     rc->live_servers[rc->ntotal] = rs;
152     rc->ntotal++;
153     make_server_live(rc, rs);
154     return rv;
155 }
156 
157 APU_DECLARE(apr_redis_server_t *)
apr_redis_find_server_hash(apr_redis_t * rc,const apr_uint32_t hash)158 apr_redis_find_server_hash(apr_redis_t *rc, const apr_uint32_t hash)
159 {
160     if (rc->server_func) {
161         return rc->server_func(rc->server_baton, rc, hash);
162     }
163     else {
164         return apr_redis_find_server_hash_default(NULL, rc, hash);
165     }
166 }
167 
168 APU_DECLARE(apr_redis_server_t *)
apr_redis_find_server_hash_default(void * baton,apr_redis_t * rc,const apr_uint32_t hash)169 apr_redis_find_server_hash_default(void *baton, apr_redis_t *rc,
170                                    const apr_uint32_t hash)
171 {
172     apr_redis_server_t *rs = NULL;
173     apr_uint32_t h = hash ? hash : 1;
174     apr_uint32_t i = 0;
175     apr_time_t curtime = 0;
176 
177     if (rc->ntotal == 0) {
178         return NULL;
179     }
180 
181     do {
182         rs = rc->live_servers[h % rc->ntotal];
183         if (rs->status == APR_RC_SERVER_LIVE) {
184             break;
185         }
186         else {
187             if (curtime == 0) {
188                 curtime = apr_time_now();
189             }
190 #if APR_HAS_THREADS
191             apr_thread_mutex_lock(rs->lock);
192 #endif
193             /* Try the dead server, every 5 seconds */
194             if (curtime - rs->btime > apr_time_from_sec(5)) {
195                 rs->btime = curtime;
196                 if (apr_redis_ping(rs) == APR_SUCCESS) {
197                     make_server_live(rc, rs);
198 #if APR_HAS_THREADS
199                     apr_thread_mutex_unlock(rs->lock);
200 #endif
201                     break;
202                 }
203             }
204 #if APR_HAS_THREADS
205             apr_thread_mutex_unlock(rs->lock);
206 #endif
207         }
208         h++;
209         i++;
210     } while (i < rc->ntotal);
211 
212     if (i == rc->ntotal) {
213         rs = NULL;
214     }
215 
216     return rs;
217 }
218 
apr_redis_find_server(apr_redis_t * rc,const char * host,apr_port_t port)219 APU_DECLARE(apr_redis_server_t *) apr_redis_find_server(apr_redis_t *rc,
220                                                         const char *host,
221                                                         apr_port_t port)
222 {
223     int i;
224 
225     for (i = 0; i < rc->ntotal; i++) {
226         if (strcmp(rc->live_servers[i]->host, host) == 0
227             && rc->live_servers[i]->port == port) {
228 
229             return rc->live_servers[i];
230         }
231     }
232 
233     return NULL;
234 }
235 
rs_find_conn(apr_redis_server_t * rs,apr_redis_conn_t ** conn)236 static apr_status_t rs_find_conn(apr_redis_server_t *rs,
237                                  apr_redis_conn_t ** conn)
238 {
239     apr_status_t rv;
240     apr_bucket_alloc_t *balloc;
241     apr_bucket *e;
242 
243 #if APR_HAS_THREADS
244     rv = apr_reslist_acquire(rs->conns, (void **) conn);
245 #else
246     *conn = rs->conn;
247     rv = APR_SUCCESS;
248 #endif
249 
250     if (rv != APR_SUCCESS) {
251         return rv;
252     }
253 
254     balloc = apr_bucket_alloc_create((*conn)->tp);
255     (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
256     (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
257 
258     e = apr_bucket_socket_create((*conn)->sock, balloc);
259     APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
260 
261     return rv;
262 }
263 
rs_bad_conn(apr_redis_server_t * rs,apr_redis_conn_t * conn)264 static apr_status_t rs_bad_conn(apr_redis_server_t *rs,
265                                 apr_redis_conn_t *conn)
266 {
267 #if APR_HAS_THREADS
268     return apr_reslist_invalidate(rs->conns, conn);
269 #else
270     return APR_SUCCESS;
271 #endif
272 }
273 
rs_release_conn(apr_redis_server_t * rs,apr_redis_conn_t * conn)274 static apr_status_t rs_release_conn(apr_redis_server_t *rs,
275                                     apr_redis_conn_t *conn)
276 {
277     apr_pool_clear(conn->tp);
278 #if APR_HAS_THREADS
279     return apr_reslist_release(rs->conns, conn);
280 #else
281     return APR_SUCCESS;
282 #endif
283 }
284 
apr_redis_enable_server(apr_redis_t * rc,apr_redis_server_t * rs)285 APU_DECLARE(apr_status_t) apr_redis_enable_server(apr_redis_t *rc,
286                                                   apr_redis_server_t *rs)
287 {
288     apr_status_t rv = APR_SUCCESS;
289 
290     if (rs->status == APR_RC_SERVER_LIVE) {
291         return rv;
292     }
293     rv = make_server_live(rc, rs);
294     return rv;
295 }
296 
apr_redis_disable_server(apr_redis_t * rc,apr_redis_server_t * rs)297 APU_DECLARE(apr_status_t) apr_redis_disable_server(apr_redis_t *rc,
298                                                    apr_redis_server_t *rs)
299 {
300     return make_server_dead(rc, rs);
301 }
302 
conn_connect(apr_redis_conn_t * conn)303 static apr_status_t conn_connect(apr_redis_conn_t *conn)
304 {
305     apr_status_t rv = APR_SUCCESS;
306     apr_sockaddr_t *sa;
307 #if APR_HAVE_SOCKADDR_UN
308     apr_int32_t family = conn->rs->host[0] != '/' ? APR_INET : APR_UNIX;
309 #else
310     apr_int32_t family = APR_INET;
311 #endif
312 
313     rv = apr_sockaddr_info_get(&sa, conn->rs->host, family, conn->rs->port, 0,
314                                conn->p);
315     if (rv != APR_SUCCESS) {
316         return rv;
317     }
318 
319     rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
320     if (rv != APR_SUCCESS) {
321         return rv;
322     }
323 
324     rv = apr_socket_connect(conn->sock, sa);
325     if (rv != APR_SUCCESS) {
326         return rv;
327     }
328 
329     rv = apr_socket_timeout_set(conn->sock,
330                                 conn->rs->rwto * APR_USEC_PER_SEC);
331     if (rv != APR_SUCCESS) {
332         return rv;
333     }
334 
335     return rv;
336 }
337 
338 static apr_status_t
rc_conn_construct(void ** conn_,void * params,apr_pool_t * pool)339 rc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
340 {
341     apr_status_t rv = APR_SUCCESS;
342     apr_redis_conn_t *conn;
343     apr_pool_t *np;
344     apr_pool_t *tp;
345     apr_redis_server_t *rs = params;
346 #if APR_HAVE_SOCKADDR_UN
347     apr_int32_t family = rs->host[0] != '/' ? APR_INET : APR_UNIX;
348 #else
349     apr_int32_t family = APR_INET;
350 #endif
351 
352     rv = apr_pool_create(&np, pool);
353     if (rv != APR_SUCCESS) {
354         return rv;
355     }
356 
357     rv = apr_pool_create(&tp, np);
358     if (rv != APR_SUCCESS) {
359         apr_pool_destroy(np);
360         return rv;
361     }
362 
363     conn = apr_palloc(np, sizeof(apr_redis_conn_t));
364 
365     conn->p = np;
366     conn->tp = tp;
367 
368     rv = apr_socket_create(&conn->sock, family, SOCK_STREAM, 0, np);
369 
370     if (rv != APR_SUCCESS) {
371         apr_pool_destroy(np);
372         return rv;
373     }
374 
375     conn->buffer = apr_palloc(conn->p, BUFFER_SIZE + 1);
376     conn->blen = 0;
377     conn->rs = rs;
378 
379     rv = conn_connect(conn);
380     if (rv != APR_SUCCESS) {
381         apr_pool_destroy(np);
382     }
383     else {
384         *conn_ = conn;
385     }
386 
387     return rv;
388 }
389 
390 #if APR_HAS_THREADS
391 static apr_status_t
rc_conn_destruct(void * conn_,void * params,apr_pool_t * pool)392 rc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
393 {
394     apr_redis_conn_t *conn = (apr_redis_conn_t *) conn_;
395     struct iovec vec[3];
396     apr_size_t written;
397 
398     /* send a quit message to the Redis server to be nice about it. */
399 
400     /*
401      * RESP Command:
402      *   *1
403      *   $4
404      *   QUIT
405      */
406     vec[0].iov_base = RC_RESP_1;
407     vec[0].iov_len = RC_RESP_1_LEN;
408 
409     vec[1].iov_base = RC_QUIT_SIZE;
410     vec[1].iov_len = RC_QUIT_SIZE_LEN;
411 
412     vec[2].iov_base = RC_QUIT;
413     vec[2].iov_len = RC_QUIT_LEN;
414 
415     /* Return values not checked, since we just want to make it go away. */
416     apr_socket_sendv(conn->sock, vec, 3, &written);
417     apr_socket_close(conn->sock);
418 
419     apr_pool_destroy(conn->p);
420 
421     return APR_SUCCESS;
422 }
423 #endif
424 
apr_redis_server_create(apr_pool_t * p,const char * host,apr_port_t port,apr_uint32_t min,apr_uint32_t smax,apr_uint32_t max,apr_uint32_t ttl,apr_uint32_t rwto,apr_redis_server_t ** rs)425 APU_DECLARE(apr_status_t) apr_redis_server_create(apr_pool_t *p,
426                                                   const char *host,
427                                                   apr_port_t port,
428                                                   apr_uint32_t min,
429                                                   apr_uint32_t smax,
430                                                   apr_uint32_t max,
431                                                   apr_uint32_t ttl,
432                                                   apr_uint32_t rwto,
433                                                   apr_redis_server_t **rs)
434 {
435     apr_status_t rv = APR_SUCCESS;
436     apr_redis_server_t *server;
437     apr_pool_t *np;
438 
439     rv = apr_pool_create(&np, p);
440 
441     server = apr_palloc(np, sizeof(apr_redis_server_t));
442 
443     server->p = np;
444     server->host = apr_pstrdup(np, host);
445     server->port = port;
446     server->status = APR_RC_SERVER_DEAD;
447     server->rwto = rwto;
448     server->version.major = 0;
449     server->version.minor = 0;
450     server->version.patch = 0;
451 
452 #if APR_HAS_THREADS
453     rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
454     if (rv != APR_SUCCESS) {
455         return rv;
456     }
457 
458     rv = apr_reslist_create(&server->conns,
459                             min,        /* hard minimum */
460                             smax,       /* soft maximum */
461                             max,        /* hard maximum */
462                             ttl,        /* Time to live */
463                             rc_conn_construct,  /* Make a New Connection */
464                             rc_conn_destruct,   /* Kill Old Connection */
465                             server, np);
466     if (rv != APR_SUCCESS) {
467         return rv;
468     }
469 
470     apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
471 #else
472     rv = rc_conn_construct((void **) &(server->conn), server, np);
473     if (rv != APR_SUCCESS) {
474         return rv;
475     }
476 #endif
477 
478     *rs = server;
479 
480     return rv;
481 }
482 
apr_redis_create(apr_pool_t * p,apr_uint16_t max_servers,apr_uint32_t flags,apr_redis_t ** redis)483 APU_DECLARE(apr_status_t) apr_redis_create(apr_pool_t *p,
484                                            apr_uint16_t max_servers,
485                                            apr_uint32_t flags,
486                                            apr_redis_t **redis)
487 {
488     apr_status_t rv = APR_SUCCESS;
489     apr_redis_t *rc;
490 
491     rc = apr_palloc(p, sizeof(apr_redis_t));
492     rc->p = p;
493     rc->nalloc = max_servers;
494     rc->ntotal = 0;
495     rc->live_servers =
496         apr_palloc(p, rc->nalloc * sizeof(struct apr_redis_server_t *));
497     rc->hash_func = NULL;
498     rc->hash_baton = NULL;
499     rc->server_func = NULL;
500     rc->server_baton = NULL;
501     *redis = rc;
502     return rv;
503 }
504 
505 
506 /* The crc32 functions and data was originally written by Spencer
507  * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
508  * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
509  * src/usr.bin/cksum/crc32.c.
510  */
511 
512 static const apr_uint32_t crc32tab[256] = {
513     0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
514     0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
515     0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
516     0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
517     0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
518     0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
519     0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
520     0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
521     0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
522     0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
523     0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
524     0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
525     0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
526     0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
527     0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
528     0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
529     0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
530     0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
531     0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
532     0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
533     0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
534     0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
535     0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
536     0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
537     0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
538     0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
539     0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
540     0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
541     0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
542     0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
543     0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
544     0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
545     0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
546     0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
547     0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
548     0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
549     0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
550     0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
551     0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
552     0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
553     0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
554     0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
555     0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
556     0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
557     0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
558     0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
559     0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
560     0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
561     0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
562     0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
563     0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
564     0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
565     0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
566     0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
567     0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
568     0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
569     0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
570     0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
571     0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
572     0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
573     0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
574     0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
575     0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
576     0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
577 };
578 
apr_redis_hash_crc32(void * baton,const char * data,const apr_size_t data_len)579 APU_DECLARE(apr_uint32_t) apr_redis_hash_crc32(void *baton,
580                                                const char *data,
581                                                const apr_size_t data_len)
582 {
583     apr_uint32_t i;
584     apr_uint32_t crc;
585     crc = ~0;
586 
587     for (i = 0; i < data_len; i++)
588         crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
589 
590     return ~crc;
591 }
592 
apr_redis_hash_default(void * baton,const char * data,const apr_size_t data_len)593 APU_DECLARE(apr_uint32_t) apr_redis_hash_default(void *baton,
594                                                  const char *data,
595                                                  const apr_size_t data_len)
596 {
597     /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
598      * like this....
599      */
600     return ((apr_redis_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
601 }
602 
apr_redis_hash(apr_redis_t * rc,const char * data,const apr_size_t data_len)603 APU_DECLARE(apr_uint32_t) apr_redis_hash(apr_redis_t *rc,
604                                          const char *data,
605                                          const apr_size_t data_len)
606 {
607     if (rc->hash_func) {
608         return rc->hash_func(rc->hash_baton, data, data_len);
609     }
610     else {
611         return apr_redis_hash_default(NULL, data, data_len);
612     }
613 }
614 
get_server_line(apr_redis_conn_t * conn)615 static apr_status_t get_server_line(apr_redis_conn_t *conn)
616 {
617     apr_size_t bsize = BUFFER_SIZE;
618     apr_status_t rv = APR_SUCCESS;
619 
620     rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ,
621             BUFFER_SIZE);
622 
623     if (rv != APR_SUCCESS) {
624         return rv;
625     }
626 
627     rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
628 
629     if (rv != APR_SUCCESS) {
630         return rv;
631     }
632 
633     conn->blen = bsize;
634     conn->buffer[bsize] = '\0';
635 
636     return apr_brigade_cleanup(conn->tb);
637 }
638 
apr_redis_set(apr_redis_t * rc,const char * key,char * data,const apr_size_t data_size,apr_uint16_t flags)639 APU_DECLARE(apr_status_t) apr_redis_set(apr_redis_t *rc,
640                                         const char *key,
641                                         char *data,
642                                         const apr_size_t data_size,
643                                         apr_uint16_t flags)
644 {
645     apr_uint32_t hash;
646     apr_redis_server_t *rs;
647     apr_redis_conn_t *conn;
648     apr_status_t rv;
649     apr_size_t written;
650     struct iovec vec[9];
651     char keysize_str[LILBUFF_SIZE];
652     char datasize_str[LILBUFF_SIZE];
653     apr_size_t len, klen;
654 
655     klen = strlen(key);
656     hash = apr_redis_hash(rc, key, klen);
657 
658     rs = apr_redis_find_server_hash(rc, hash);
659 
660     if (rs == NULL)
661         return APR_NOTFOUND;
662 
663     rv = rs_find_conn(rs, &conn);
664 
665     if (rv != APR_SUCCESS) {
666         apr_redis_disable_server(rc, rs);
667         return rv;
668     }
669 
670     /*
671      * RESP Command:
672      *   *3
673      *   $3
674      *   SET
675      *   $<keylen>
676      *   key
677      *   $<datalen>
678      *   data
679      */
680 
681     vec[0].iov_base = RC_RESP_3;
682     vec[0].iov_len = RC_RESP_3_LEN;
683 
684     vec[1].iov_base = RC_SET_SIZE;
685     vec[1].iov_len = RC_SET_SIZE_LEN;
686 
687     vec[2].iov_base = RC_SET;
688     vec[2].iov_len = RC_SET_LEN;
689 
690     len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
691     vec[3].iov_base = keysize_str;
692     vec[3].iov_len = len;
693 
694     vec[4].iov_base = (void *) key;
695     vec[4].iov_len = klen;
696 
697     vec[5].iov_base = RC_EOL;
698     vec[5].iov_len = RC_EOL_LEN;
699 
700     len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
701                      data_size);
702     vec[6].iov_base = datasize_str;
703     vec[6].iov_len = len;
704 
705     vec[7].iov_base = data;
706     vec[7].iov_len = data_size;
707 
708     vec[8].iov_base = RC_EOL;
709     vec[8].iov_len = RC_EOL_LEN;
710 
711     rv = apr_socket_sendv(conn->sock, vec, 9, &written);
712 
713     if (rv != APR_SUCCESS) {
714         rs_bad_conn(rs, conn);
715         apr_redis_disable_server(rc, rs);
716         return rv;
717     }
718 
719     rv = get_server_line(conn);
720     if (rv != APR_SUCCESS) {
721         rs_bad_conn(rs, conn);
722         apr_redis_disable_server(rc, rs);
723         return rv;
724     }
725 
726     if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
727         rv = APR_SUCCESS;
728     }
729     else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
730         rv = APR_EEXIST;
731     }
732     else {
733         rv = APR_EGENERAL;
734     }
735 
736     rs_release_conn(rs, conn);
737     return rv;
738 }
739 
apr_redis_setex(apr_redis_t * rc,const char * key,char * data,const apr_size_t data_size,apr_uint32_t timeout,apr_uint16_t flags)740 APU_DECLARE(apr_status_t) apr_redis_setex(apr_redis_t *rc,
741                                           const char *key,
742                                           char *data,
743                                           const apr_size_t data_size,
744                                           apr_uint32_t timeout,
745                                           apr_uint16_t flags)
746 {
747     apr_uint32_t hash;
748     apr_redis_server_t *rs;
749     apr_redis_conn_t *conn;
750     apr_status_t rv;
751     apr_size_t written;
752     struct iovec vec[11];
753     char keysize_str[LILBUFF_SIZE];
754     char expire_str[LILBUFF_SIZE];
755     char expiresize_str[LILBUFF_SIZE];
756     char datasize_str[LILBUFF_SIZE];
757     apr_size_t len, klen, expire_len;
758 
759 
760     klen = strlen(key);
761     hash = apr_redis_hash(rc, key, klen);
762 
763     rs = apr_redis_find_server_hash(rc, hash);
764 
765     if (rs == NULL)
766         return APR_NOTFOUND;
767 
768     rv = rs_find_conn(rs, &conn);
769 
770     if (rv != APR_SUCCESS) {
771         apr_redis_disable_server(rc, rs);
772         return rv;
773     }
774 
775     /*
776      * RESP Command:
777      *   *4
778      *   $5
779      *   SETEX
780      *   $<keylen>
781      *   key
782      *   $<expirelen>
783      *   expirey
784      *   $<datalen>
785      *   data
786      */
787 
788     vec[0].iov_base = RC_RESP_4;
789     vec[0].iov_len = RC_RESP_4_LEN;
790 
791     vec[1].iov_base = RC_SETEX_SIZE;
792     vec[1].iov_len = RC_SETEX_SIZE_LEN;
793 
794     vec[2].iov_base = RC_SETEX;
795     vec[2].iov_len = RC_SETEX_LEN;
796 
797     len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
798     vec[3].iov_base = keysize_str;
799     vec[3].iov_len = len;
800 
801     vec[4].iov_base = (void *) key;
802     vec[4].iov_len = klen;
803 
804     vec[5].iov_base = RC_EOL;
805     vec[5].iov_len = RC_EOL_LEN;
806 
807     expire_len = apr_snprintf(expire_str, LILBUFF_SIZE, "%u\r\n", timeout);
808     len = apr_snprintf(expiresize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
809                      expire_len - 2);
810     vec[6].iov_base = (void *) expiresize_str;
811     vec[6].iov_len = len;
812 
813     vec[7].iov_base = (void *) expire_str;
814     vec[7].iov_len = expire_len;
815 
816     len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
817                      data_size);
818     vec[8].iov_base = datasize_str;
819     vec[8].iov_len = len;
820 
821     vec[9].iov_base = data;
822     vec[9].iov_len = data_size;
823 
824     vec[10].iov_base = RC_EOL;
825     vec[10].iov_len = RC_EOL_LEN;
826 
827     rv = apr_socket_sendv(conn->sock, vec, 11, &written);
828 
829     if (rv != APR_SUCCESS) {
830         rs_bad_conn(rs, conn);
831         apr_redis_disable_server(rc, rs);
832         return rv;
833     }
834 
835     rv = get_server_line(conn);
836     if (rv != APR_SUCCESS) {
837         rs_bad_conn(rs, conn);
838         apr_redis_disable_server(rc, rs);
839         return rv;
840     }
841 
842     if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
843         rv = APR_SUCCESS;
844     }
845     else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
846         rv = APR_EEXIST;
847     }
848     else {
849         rv = APR_EGENERAL;
850     }
851 
852     rs_release_conn(rs, conn);
853     return rv;
854 }
855 
grab_bulk_resp(apr_redis_server_t * rs,apr_redis_t * rc,apr_redis_conn_t * conn,apr_pool_t * p,char ** baton,apr_size_t * new_length)856 static apr_status_t grab_bulk_resp(apr_redis_server_t *rs, apr_redis_t *rc,
857                                    apr_redis_conn_t *conn, apr_pool_t *p,
858                                    char **baton, apr_size_t *new_length)
859 {
860     char *length;
861     char *last;
862     apr_status_t rv;
863     apr_size_t len = 0;
864     *new_length = 0;
865 
866     length = apr_strtok(conn->buffer + 1, " ", &last);
867     if (length) {
868         len = strtol(length, (char **) NULL, 10);
869     }
870 
871     if (len == 0) {
872         *new_length = 0;
873         *baton = NULL;
874     }
875     else {
876         apr_bucket_brigade *bbb;
877         apr_bucket *e;
878 
879         /* eat the trailing \r\n */
880         rv = apr_brigade_partition(conn->bb, len + 2, &e);
881 
882         if (rv != APR_SUCCESS) {
883             rs_bad_conn(rs, conn);
884             if (rc)
885                 apr_redis_disable_server(rc, rs);
886             return rv;
887         }
888 
889         bbb = apr_brigade_split(conn->bb, e);
890 
891         rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
892 
893         if (rv != APR_SUCCESS) {
894             rs_bad_conn(rs, conn);
895             if (rc)
896                 apr_redis_disable_server(rc, rs);
897             return rv;
898         }
899 
900         rv = apr_brigade_destroy(conn->bb);
901         if (rv != APR_SUCCESS) {
902             rs_bad_conn(rs, conn);
903             if (rc)
904                 apr_redis_disable_server(rc, rs);
905             return rv;
906         }
907 
908         conn->bb = bbb;
909 
910         *new_length = len - 2;
911         (*baton)[*new_length] = '\0';
912     }
913     return APR_SUCCESS;
914 
915 }
916 
apr_redis_getp(apr_redis_t * rc,apr_pool_t * p,const char * key,char ** baton,apr_size_t * new_length,apr_uint16_t * flags)917 APU_DECLARE(apr_status_t) apr_redis_getp(apr_redis_t *rc,
918                                          apr_pool_t *p,
919                                          const char *key,
920                                          char **baton,
921                                          apr_size_t *new_length,
922                                          apr_uint16_t *flags)
923 {
924     apr_status_t rv;
925     apr_redis_server_t *rs;
926     apr_redis_conn_t *conn;
927     apr_uint32_t hash;
928     apr_size_t written;
929     apr_size_t len, klen;
930     struct iovec vec[6];
931     char keysize_str[LILBUFF_SIZE];
932 
933     klen = strlen(key);
934     hash = apr_redis_hash(rc, key, klen);
935     rs = apr_redis_find_server_hash(rc, hash);
936 
937     if (rs == NULL)
938         return APR_NOTFOUND;
939 
940     rv = rs_find_conn(rs, &conn);
941 
942     if (rv != APR_SUCCESS) {
943         apr_redis_disable_server(rc, rs);
944         return rv;
945     }
946 
947     /*
948      * RESP Command:
949      *   *2
950      *   $3
951      *   GET
952      *   $<keylen>
953      *   key
954      */
955     vec[0].iov_base = RC_RESP_2;
956     vec[0].iov_len = RC_RESP_2_LEN;
957 
958     vec[1].iov_base = RC_GET_SIZE;
959     vec[1].iov_len = RC_GET_SIZE_LEN;
960 
961     vec[2].iov_base = RC_GET;
962     vec[2].iov_len = RC_GET_LEN;
963 
964     len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
965                      klen);
966     vec[3].iov_base = keysize_str;
967     vec[3].iov_len = len;
968 
969     vec[4].iov_base = (void *) key;
970     vec[4].iov_len = klen;
971 
972     vec[5].iov_base = RC_EOL;
973     vec[5].iov_len = RC_EOL_LEN;
974 
975     rv = apr_socket_sendv(conn->sock, vec, 6, &written);
976 
977 
978     if (rv != APR_SUCCESS) {
979         rs_bad_conn(rs, conn);
980         apr_redis_disable_server(rc, rs);
981         return rv;
982     }
983 
984     rv = get_server_line(conn);
985     if (rv != APR_SUCCESS) {
986         rs_bad_conn(rs, conn);
987         apr_redis_disable_server(rc, rs);
988         return rv;
989     }
990     if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
991         rv = APR_NOTFOUND;
992     }
993     else if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
994         rv = grab_bulk_resp(rs, rc, conn, p, baton, new_length);
995     }
996     else {
997         rv = APR_EGENERAL;
998     }
999 
1000     rs_release_conn(rs, conn);
1001     return rv;
1002 }
1003 
1004 APU_DECLARE(apr_status_t)
apr_redis_delete(apr_redis_t * rc,const char * key,apr_uint32_t timeout)1005     apr_redis_delete(apr_redis_t *rc, const char *key, apr_uint32_t timeout)
1006 {
1007     apr_status_t rv;
1008     apr_redis_server_t *rs;
1009     apr_redis_conn_t *conn;
1010     apr_uint32_t hash;
1011     apr_size_t written;
1012     struct iovec vec[6];
1013     apr_size_t len, klen;
1014     char keysize_str[LILBUFF_SIZE];
1015 
1016     klen = strlen(key);
1017     hash = apr_redis_hash(rc, key, klen);
1018     rs = apr_redis_find_server_hash(rc, hash);
1019     if (rs == NULL)
1020         return APR_NOTFOUND;
1021 
1022     rv = rs_find_conn(rs, &conn);
1023 
1024     if (rv != APR_SUCCESS) {
1025         apr_redis_disable_server(rc, rs);
1026         return rv;
1027     }
1028 
1029     /*
1030      * RESP Command:
1031      *   *2
1032      *   $3
1033      *   DEL
1034      *   $<keylen>
1035      *   key
1036      */
1037     vec[0].iov_base = RC_RESP_2;
1038     vec[0].iov_len = RC_RESP_2_LEN;
1039 
1040     vec[1].iov_base = RC_DEL_SIZE;
1041     vec[1].iov_len = RC_DEL_SIZE_LEN;
1042 
1043     vec[2].iov_base = RC_DEL;
1044     vec[2].iov_len = RC_DEL_LEN;
1045 
1046     len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1047                      klen);
1048     vec[3].iov_base = keysize_str;
1049     vec[3].iov_len = len;
1050 
1051     vec[4].iov_base = (void *) key;
1052     vec[4].iov_len = klen;
1053 
1054     vec[5].iov_base = RC_EOL;
1055     vec[5].iov_len = RC_EOL_LEN;
1056 
1057     rv = apr_socket_sendv(conn->sock, vec, 6, &written);
1058 
1059     if (rv != APR_SUCCESS) {
1060         rs_bad_conn(rs, conn);
1061         apr_redis_disable_server(rc, rs);
1062         return rv;
1063     }
1064 
1065     rv = get_server_line(conn);
1066     if (rv != APR_SUCCESS) {
1067         rs_bad_conn(rs, conn);
1068         apr_redis_disable_server(rc, rs);
1069         return rv;
1070     }
1071 
1072     if (strncmp(RS_DELETED, conn->buffer, RS_DELETED_LEN) == 0) {
1073         rv = APR_SUCCESS;
1074     }
1075     else if (strncmp(RS_NOT_FOUND_DEL, conn->buffer, RS_NOT_FOUND_DEL_LEN) == 0) {
1076         rv = APR_NOTFOUND;
1077     }
1078     else {
1079         rv = APR_EGENERAL;
1080     }
1081 
1082     rs_release_conn(rs, conn);
1083     return rv;
1084 }
1085 
1086 APU_DECLARE(apr_status_t)
apr_redis_ping(apr_redis_server_t * rs)1087 apr_redis_ping(apr_redis_server_t *rs)
1088 {
1089     apr_status_t rv;
1090     apr_size_t written;
1091     struct iovec vec[3];
1092     apr_redis_conn_t *conn;
1093 
1094     rv = rs_find_conn(rs, &conn);
1095 
1096     if (rv != APR_SUCCESS) {
1097         return rv;
1098     }
1099 
1100     /*
1101      * RESP Command:
1102      *   *1
1103      *   $4
1104      *   PING
1105      */
1106     vec[0].iov_base = RC_RESP_1;
1107     vec[0].iov_len = RC_RESP_1_LEN;
1108 
1109     vec[1].iov_base = RC_PING_SIZE;
1110     vec[1].iov_len = RC_PING_SIZE_LEN;
1111 
1112     vec[2].iov_base = RC_PING;
1113     vec[2].iov_len = RC_PING_LEN;
1114 
1115     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1116 
1117     if (rv != APR_SUCCESS) {
1118         rs_bad_conn(rs, conn);
1119         return rv;
1120     }
1121 
1122     rv = get_server_line(conn);
1123     if (rv == APR_SUCCESS) {
1124         /* we got *something*. Was it Redis? */
1125         if (strncmp(conn->buffer, "+PONG", sizeof("+PONG")-1) != 0) {
1126             rv = APR_EGENERAL;
1127         }
1128     }
1129     rs_release_conn(rs, conn);
1130     return rv;
1131 }
1132 
1133 APU_DECLARE(apr_status_t)
apr_redis_info(apr_redis_server_t * rs,apr_pool_t * p,char ** baton)1134 apr_redis_info(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1135 {
1136     apr_status_t rv;
1137     apr_redis_conn_t *conn;
1138     apr_size_t written;
1139     struct iovec vec[3];
1140 
1141     rv = rs_find_conn(rs, &conn);
1142 
1143     if (rv != APR_SUCCESS) {
1144         return rv;
1145     }
1146 
1147     /*
1148      * RESP Command:
1149      *   *1
1150      *   $4
1151      *   INFO
1152      */
1153     vec[0].iov_base = RC_RESP_1;
1154     vec[0].iov_len = RC_RESP_1_LEN;
1155 
1156     vec[1].iov_base = RC_INFO_SIZE;
1157     vec[1].iov_len = RC_INFO_SIZE_LEN;
1158 
1159     vec[2].iov_base = RC_INFO;
1160     vec[2].iov_len = RC_INFO_LEN;
1161 
1162     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1163 
1164     if (rv != APR_SUCCESS) {
1165         rs_bad_conn(rs, conn);
1166         return rv;
1167     }
1168 
1169     rv = get_server_line(conn);
1170     if (rv != APR_SUCCESS) {
1171         rs_bad_conn(rs, conn);
1172         return rv;
1173     }
1174 
1175     if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
1176         apr_size_t nl;
1177         rv = grab_bulk_resp(rs, NULL, conn, p, baton, &nl);
1178     } else {
1179         rs_bad_conn(rs, conn);
1180         rv = APR_EGENERAL;
1181     }
1182 
1183     rs_release_conn(rs, conn);
1184     return rv;
1185 }
1186 
1187 #define RV_FIELD "redis_version:"
1188 APU_DECLARE(apr_status_t)
apr_redis_version(apr_redis_server_t * rs,apr_pool_t * p,char ** baton)1189 apr_redis_version(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1190 {
1191     apr_status_t rv;
1192     char *ptr, *eptr;
1193     apr_pool_t *subpool;
1194 
1195     /* Have we already obtained the version number? */
1196     if (rs->version.minor != 0) {
1197         if (baton)
1198             *baton = apr_pstrdup(p, rs->version.number);
1199         return APR_SUCCESS;
1200     }
1201     if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1202         /* well, we tried */
1203         subpool = p;
1204     }
1205     rv = apr_redis_info(rs, subpool, baton);
1206 
1207     if (rv != APR_SUCCESS) {
1208         if (subpool != p) {
1209             apr_pool_destroy(subpool);
1210         }
1211         return rv;
1212     }
1213 
1214     ptr = strstr(*baton, RV_FIELD);
1215     if (ptr) {
1216         rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1217         ptr = eptr + 1;
1218         rs->version.minor = strtol(ptr, &eptr, 10);
1219         ptr = eptr + 1;
1220         rs->version.patch = strtol(ptr, &eptr, 10);
1221         rs->version.number = apr_psprintf(rs->p, "%d.%d.%d",
1222                 rs->version.major, rs->version.minor,
1223                 rs->version.patch);
1224     }
1225     if (baton)
1226         *baton = apr_pstrdup(p, rs->version.number);
1227     if (subpool != p) {
1228         apr_pool_destroy(subpool);
1229     }
1230     return APR_SUCCESS;
1231 }
1232 
plus_minus(apr_redis_t * rc,int incr,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1233 static apr_status_t plus_minus(apr_redis_t *rc,
1234                     int incr,
1235                     const char *key,
1236                     apr_int32_t inc,
1237                     apr_uint32_t *new_value)
1238 {
1239     apr_status_t rv;
1240     apr_redis_server_t *rs;
1241     apr_redis_conn_t *conn;
1242     apr_uint32_t hash;
1243     apr_size_t written;
1244     apr_size_t len, klen;
1245     struct iovec vec[12];
1246     char keysize_str[LILBUFF_SIZE];
1247     char inc_str[LILBUFF_SIZE];
1248     char inc_str_len[LILBUFF_SIZE];
1249     int i = 0;
1250 
1251     klen = strlen(key);
1252     hash = apr_redis_hash(rc, key, klen);
1253     rs = apr_redis_find_server_hash(rc, hash);
1254     if (rs == NULL)
1255         return APR_NOTFOUND;
1256 
1257     rv = rs_find_conn(rs, &conn);
1258 
1259     if (rv != APR_SUCCESS) {
1260         apr_redis_disable_server(rc, rs);
1261         return rv;
1262     }
1263 
1264     /*
1265      * RESP Command:
1266      *   *2|*3
1267      *   $4|$6
1268      *   INCR/DECR|INCRBY/DECRBY
1269      *   $<keylen>
1270      *   key
1271      *   <:inc>
1272      */
1273     if (inc == 1) {
1274         vec[i].iov_base = RC_RESP_2;
1275         vec[i].iov_len = RC_RESP_2_LEN;
1276         i++;
1277 
1278         vec[i].iov_base = "$4\r\n";
1279         vec[i].iov_len = sizeof("$4\r\n")-1;
1280         i++;
1281 
1282         if (incr)
1283             vec[i].iov_base = "INCR\r\n";
1284         else
1285             vec[i].iov_base = "DECR\r\n";
1286         vec[i].iov_len = sizeof("INCR\r\n")-1;
1287         i++;
1288     }
1289     else {
1290         vec[i].iov_base = RC_RESP_3;
1291         vec[i].iov_len = RC_RESP_3_LEN;
1292         i++;
1293 
1294         vec[i].iov_base = "$6\r\n";
1295         vec[i].iov_len = sizeof("$6\r\n")-1;
1296         i++;
1297 
1298        if (incr)
1299             vec[i].iov_base = "INCRBY\r\n";
1300         else
1301             vec[i].iov_base = "DECRBY\r\n";
1302         vec[i].iov_len = sizeof("INCRBY\r\n")-1;
1303         i++;
1304     }
1305 
1306     len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1307                      klen);
1308     vec[i].iov_base = keysize_str;
1309     vec[i].iov_len = len;
1310     i++;
1311 
1312     vec[i].iov_base = (void *) key;
1313     vec[i].iov_len = klen;
1314     i++;
1315 
1316     vec[i].iov_base = RC_EOL;
1317     vec[i].iov_len = RC_EOL_LEN;
1318     i++;
1319 
1320     if (inc != 1) {
1321         len = apr_snprintf(inc_str, LILBUFF_SIZE, "%d\r\n", inc);
1322         klen = apr_snprintf(inc_str_len, LILBUFF_SIZE, "$%d\r\n", (int)(len-2));
1323         vec[i].iov_base = inc_str_len;
1324         vec[i].iov_len = klen;
1325         i++;
1326 
1327         vec[i].iov_base = inc_str;
1328         vec[i].iov_len = len;
1329         i++;
1330 
1331         vec[i].iov_base = RC_EOL;
1332         vec[i].iov_len = RC_EOL_LEN;
1333         i++;
1334     }
1335 
1336     rv = apr_socket_sendv(conn->sock, vec, i, &written);
1337 
1338     if (rv != APR_SUCCESS) {
1339         rs_bad_conn(rs, conn);
1340         apr_redis_disable_server(rc, rs);
1341         return rv;
1342     }
1343 
1344     rv = get_server_line(conn);
1345     if (rv != APR_SUCCESS) {
1346         rs_bad_conn(rs, conn);
1347         apr_redis_disable_server(rc, rs);
1348         return rv;
1349     }
1350     if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
1351         rv = APR_NOTFOUND;
1352     }
1353     else if (*conn->buffer == ':') {
1354         *new_value = atoi((const char *)(conn->buffer + 1));
1355         rv = APR_SUCCESS;
1356     }
1357     else {
1358         rv = APR_EGENERAL;
1359     }
1360     rs_release_conn(rs, conn);
1361     return rv;
1362 }
1363 
1364 APU_DECLARE(apr_status_t)
apr_redis_incr(apr_redis_t * rc,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1365 apr_redis_incr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1366 {
1367     return plus_minus(rc, 1, key, inc, new_value);
1368 }
1369 
1370 APU_DECLARE(apr_status_t)
apr_redis_decr(apr_redis_t * rc,const char * key,apr_int32_t inc,apr_uint32_t * new_value)1371 apr_redis_decr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1372 {
1373     return plus_minus(rc, 0, key, inc, new_value);
1374 }
1375 
1376 APU_DECLARE(apr_status_t)
apr_redis_multgetp(apr_redis_t * rc,apr_pool_t * temp_pool,apr_pool_t * data_pool,apr_hash_t * values)1377 apr_redis_multgetp(apr_redis_t *rc,
1378                    apr_pool_t *temp_pool,
1379                    apr_pool_t *data_pool,
1380                    apr_hash_t *values)
1381 {
1382     return APR_ENOTIMPL;
1383 }
1384 
1385 /**
1386  * Define all of the strings for stats
1387  */
1388 
1389 #define STAT_process_id "process_id:"
1390 #define STAT_process_id_LEN (sizeof(STAT_process_id)-1)
1391 
1392 #define STAT_uptime_in_seconds "uptime_in_seconds:"
1393 #define STAT_uptime_in_seconds_LEN (sizeof(STAT_uptime_in_seconds)-1)
1394 
1395 #define STAT_arch_bits "arch_bits:"
1396 #define STAT_arch_bits_LEN (sizeof(STAT_arch_bits)-1)
1397 
1398 #define STAT_connected_clients "connected_clients:"
1399 #define STAT_connected_clients_LEN (sizeof(STAT_connected_clients)-1)
1400 
1401 #define STAT_blocked_clients "blocked_clients:"
1402 #define STAT_blocked_clients_LEN (sizeof(STAT_blocked_clients)-1)
1403 
1404 #define STAT_maxmemory "maxmemory:"
1405 #define STAT_maxmemory_LEN (sizeof(STAT_maxmemory)-1)
1406 
1407 #define STAT_used_memory "used_memory:"
1408 #define STAT_used_memory_LEN (sizeof(STAT_used_memory)-1)
1409 
1410 #define STAT_total_system_memory "total_system_memory:"
1411 #define STAT_total_system_memory_LEN (sizeof(STAT_total_system_memory)-1)
1412 
1413 #define STAT_total_connections_received "total_connections_received:"
1414 #define STAT_total_connections_received_LEN (sizeof(STAT_total_connections_received)-1)
1415 
1416 #define STAT_total_commands_processed "total_commands_processed:"
1417 #define STAT_total_commands_processed_LEN (sizeof(STAT_total_commands_processed)-1)
1418 
1419 #define STAT_rejected_connections "rejected_connections:"
1420 #define STAT_rejected_connections_LEN (sizeof(STAT_rejected_connections)-1)
1421 
1422 #define STAT_total_net_input_bytes "total_net_input_bytes:"
1423 #define STAT_total_net_input_bytes_LEN (sizeof(STAT_total_net_input_bytes)-1)
1424 
1425 #define STAT_total_net_output_bytes "total_net_output_bytes:"
1426 #define STAT_total_net_output_bytes_LEN (sizeof(STAT_total_net_output_bytes)-1)
1427 
1428 #define STAT_keyspace_hits "keyspace_hits:"
1429 #define STAT_keyspace_hits_LEN (sizeof(STAT_keyspace_hits)-1)
1430 
1431 #define STAT_keyspace_misses "keyspace_misses:"
1432 #define STAT_keyspace_misses_LEN (sizeof(STAT_keyspace_misses)-1)
1433 
1434 #define STAT_connected_slaves "connected_slaves:"
1435 #define STAT_connected_slaves_LEN (sizeof(STAT_connected_slaves)-1)
1436 
1437 #define STAT_used_cpu_sys "used_cpu_sys:"
1438 #define STAT_used_cpu_sys_LEN (sizeof(STAT_used_cpu_sys)-1)
1439 
1440 #define STAT_used_cpu_user "used_cpu_user:"
1441 #define STAT_used_cpu_user_LEN (sizeof(STAT_used_cpu_user)-1)
1442 
1443 #define STAT_cluster_enabled "cluster_enabled:"
1444 #define STAT_cluster_enabled_LEN (sizeof(STAT_cluster_enabled)-1)
1445 
stat_read_uint32(char * buf)1446 static apr_uint32_t stat_read_uint32( char *buf)
1447 {
1448     return atoi(buf);
1449 }
1450 
stat_read_uint64(char * buf)1451 static apr_uint64_t stat_read_uint64(char *buf)
1452 {
1453     return apr_atoi64(buf);
1454 }
1455 
1456 #define rc_do_stat(name, type) \
1457     if ((ptr = strstr(info , STAT_ ## name )) != NULL ) { \
1458         char *str = ptr + (STAT_ ## name ## _LEN ); \
1459         stats-> name = stat_read_ ## type (str); \
1460     }
1461 
update_stats(char * info,apr_redis_stats_t * stats)1462 static void update_stats(char *info, apr_redis_stats_t *stats)
1463 {
1464     char *ptr;
1465 
1466     rc_do_stat(process_id, uint32);
1467     rc_do_stat(uptime_in_seconds, uint32);
1468     rc_do_stat(arch_bits, uint32);
1469     rc_do_stat(connected_clients, uint32);
1470     rc_do_stat(blocked_clients, uint32);
1471     rc_do_stat(maxmemory, uint64);
1472     rc_do_stat(used_memory, uint64);
1473     rc_do_stat(total_system_memory, uint64);
1474     rc_do_stat(total_connections_received, uint64);
1475     rc_do_stat(total_commands_processed, uint64);
1476     rc_do_stat(rejected_connections, uint64);
1477     rc_do_stat(total_net_input_bytes, uint64);
1478     rc_do_stat(total_net_output_bytes, uint64);
1479     rc_do_stat(keyspace_hits, uint64);
1480     rc_do_stat(keyspace_misses, uint64);
1481     rc_do_stat(connected_slaves, uint32);
1482     rc_do_stat(used_cpu_sys, uint32);
1483     rc_do_stat(used_cpu_user, uint32);
1484     rc_do_stat(cluster_enabled, uint32);
1485 }
1486 
1487 APU_DECLARE(apr_status_t)
apr_redis_stats(apr_redis_server_t * rs,apr_pool_t * p,apr_redis_stats_t ** stats)1488 apr_redis_stats(apr_redis_server_t *rs,
1489                 apr_pool_t *p,
1490                 apr_redis_stats_t **stats)
1491 {
1492     apr_status_t rv;
1493     char *info;
1494     apr_pool_t *subpool;
1495     apr_redis_stats_t *ret;
1496     char *ptr;
1497 
1498     if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1499         /* well, we tried */
1500         subpool = p;
1501     }
1502     rv = apr_redis_info(rs, subpool, &info);
1503 
1504     if (rv != APR_SUCCESS) {
1505         if (subpool != p) {
1506             apr_pool_destroy(subpool);
1507         }
1508         return rv;
1509     }
1510     ret = apr_pcalloc(p, sizeof(apr_redis_stats_t));
1511     /* Get the bulk of the stats */
1512     update_stats(info, ret);
1513 
1514     /* Now the version number */
1515     if (rs->version.major != 0) {
1516         ret->major = rs->version.major;
1517         ret->minor = rs->version.minor;
1518         ret->patch = rs->version.patch;
1519     }
1520     else {
1521         char *eptr;
1522         ptr = strstr(info, RV_FIELD);
1523         if (ptr) {
1524             ret->major = rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1525             ptr = eptr + 1;
1526             ret->minor = rs->version.minor = strtol(ptr, &eptr, 10);
1527             ptr = eptr + 1;
1528             ret->patch = rs->version.patch = strtol(ptr, &eptr, 10);
1529         }
1530     }
1531 
1532     /* Finally, the role */
1533     ptr = strstr(info, "role:");
1534     if (!ptr) {
1535         ret->role = APR_RS_SERVER_UNKNOWN;
1536     }
1537     else if (!strncmp("master", ptr + sizeof("role:") - 1, sizeof("master")-1)) {
1538         ret->role = APR_RS_SERVER_MASTER;
1539     }
1540     else {
1541         ret->role = APR_RS_SERVER_SLAVE;
1542     }
1543     if (stats) {
1544         *stats = ret;
1545     }
1546 
1547     return APR_SUCCESS;
1548 }
1549