1 /* Redis benchmark utility.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "fmacros.h"
32
33 #include <stdio.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <unistd.h>
37 #include <errno.h>
38 #include <time.h>
39 #include <sys/time.h>
40 #include <signal.h>
41 #include <assert.h>
42 #include <math.h>
43 #include <pthread.h>
44
45 #include <sds.h> /* Use hiredis sds. */
46 #include "ae.h"
47 #include "hiredis.h"
48 #include "adlist.h"
49 #include "dict.h"
50 #include "zmalloc.h"
51 #include "atomicvar.h"
52 #include "crc16_slottable.h"
53 #include "mt19937-64.h"
54
/* Mark an expression as intentionally unused. The argument is parenthesized
 * so that compound expressions (e.g. UNUSED(a + b)) expand correctly. */
#define UNUSED(V) ((void) (V))
/* Initial capacity of the per-client randptr and stagptr arrays. */
#define RANDPTR_INITIAL_SIZE 8
/* Largest number of decimal digits used when printing latencies. */
#define MAX_LATENCY_PRECISION 3
#define MAX_THREADS 500
/* Size of the per-node slots array. */
#define CLUSTER_SLOTS 16384

/* Event loop serving client 'c': the loop of its benchmark thread when the
 * client has a non-negative thread_id, the global config.el otherwise. */
#define CLIENT_GET_EVENTLOOP(c) \
    ((c)->thread_id >= 0 ? config.threads[(c)->thread_id]->el : config.el)
63
/* Forward declarations: struct config below only stores pointers to these
 * types, which are fully defined later in the file. */
struct benchmarkThread;
struct clusterNode;
struct redisConfig;
67
/* Global benchmark state: the options of the current run plus the counters
 * shared by every client (and by every thread in multi-thread mode). */
static struct config {
    aeEventLoop *el;            /* Event loop used by clients with thread_id < 0. */
    const char *hostip;         /* Address used for TCP connections. */
    int hostport;               /* Port used for TCP connections. */
    const char *hostsocket;     /* Unix socket path; when non-NULL it is used
                                 * instead of hostip:hostport (except by
                                 * cluster clients). */
    int numclients;             /* Target number of parallel clients. */
    int liveclients;            /* Clients currently alive. */
    int requests;               /* Total number of requests to perform. */
    int requests_issued;        /* Requests started so far. */
    int requests_finished;      /* Replies accounted so far. */
    int keysize;
    int datasize;               /* Reported as "bytes payload". */
    int randomkeys;             /* Non-zero: rewrite __rand_int__ placeholders. */
    int randomkeys_keyspacelen; /* Modulus applied to the random key number. */
    int keepalive;              /* Non-zero: reuse a client after each request. */
    int pipeline;               /* Commands queued in each request buffer. */
    int showerrors;             /* Non-zero: print error replies. */
    long long start;            /* Benchmark start time, from mstime(). */
    long long totlatency;       /* Benchmark duration in milliseconds. */
    long long *latency;         /* Per-request latencies, from ustime(). */
    const char *title;          /* Title of the running test. */
    list *clients;              /* All the live clients. */
    int quiet;                  /* With csv, selects the report format. */
    int csv;
    int loop;
    int idlemode;               /* Non-zero: clients never register the
                                 * write handler. */
    int dbnum;                  /* DB selected with a SELECT prefix if != 0. */
    sds dbnumstr;               /* dbnum as the string sent with SELECT. */
    char *tests;
    char *auth;                 /* Password sent with AUTH, or NULL. */
    const char *user;           /* Optional user name sent with AUTH. */
    int precision;              /* Decimal digits used to print latencies. */
    int num_threads;            /* 0 means single-threaded. */
    struct benchmarkThread **threads; /* num_threads entries. */
    int cluster_mode;
    int cluster_node_count;     /* Length of cluster_nodes. */
    struct clusterNode **cluster_nodes;
    struct redisConfig *redis_config; /* Host config shown in the report
                                       * (non-cluster mode). */
    int is_fetching_slots;
    int is_updating_slots;      /* Read by setClusterKeyHashTag() to wait for
                                 * a slots update in progress. */
    int slots_last_update;      /* Copied into each client. */
    int enable_tracking;        /* Non-zero: prefix CLIENT TRACKING on. */
    /* Thread mutexes to be used as fallbacks by atomicvar.h */
    pthread_mutex_t requests_issued_mutex;
    pthread_mutex_t requests_finished_mutex;
    pthread_mutex_t liveclients_mutex;
    pthread_mutex_t is_fetching_slots_mutex;
    pthread_mutex_t is_updating_slots_mutex;
    pthread_mutex_t updating_slots_mutex;
    pthread_mutex_t slots_last_update_mutex;
} config;
119
/* State of a single benchmark connection. Note that 'client' is a pointer
 * type. */
typedef struct _client {
    redisContext *context;  /* hiredis connection to the server */
    sds obuf;               /* Output buffer: prefix commands followed by the
                               pipelined benchmark commands */
    char **randptr;         /* Pointers to :rand: strings inside the command buf */
    size_t randlen;         /* Number of pointers in client->randptr */
    size_t randfree;        /* Number of unused pointers in client->randptr */
    char **stagptr;         /* Pointers to slot hashtags (cluster mode only) */
    size_t staglen;         /* Number of pointers in client->stagptr */
    size_t stagfree;        /* Number of unused pointers in client->stagptr */
    size_t written;         /* Bytes of 'obuf' already written */
    long long start;        /* Start time of a request */
    long long latency;      /* Request latency */
    int pending;            /* Number of pending requests (replies to consume) */
    int prefix_pending;     /* If non-zero, number of pending prefix commands. Commands
                               such as auth and select are prefixed to the pipeline of
                               benchmark commands and discarded after the first send. */
    int prefixlen;          /* Size in bytes of the pending prefix commands */
    int thread_id;          /* Index into config.threads, or negative when the
                               client is served by config.el */
    struct clusterNode *cluster_node; /* Node this client talks to, or NULL */
    int slots_last_update;  /* Copy of config.slots_last_update taken at
                               creation and at the start of each request */
} *client;
141
142 /* Threads. */
143
/* One worker thread of a multi-threaded benchmark. */
typedef struct benchmarkThread {
    int index;          /* Position of this thread inside config.threads */
    pthread_t thread;   /* Handle filled by pthread_create() */
    aeEventLoop *el;    /* Event loop executed by the thread */
} benchmarkThread;
149
150 /* Cluster. */
/* A node of the cluster under test. */
typedef struct clusterNode {
    char *ip;               /* Node address */
    int port;               /* Node port */
    sds name;
    int flags;
    sds replicate;  /* Master ID if node is a slave */
    int *slots;             /* Array with room for CLUSTER_SLOTS entries */
    int slots_count;        /* Number of entries used in 'slots' */
    int current_slot_index; /* Index in 'slots' of the slot whose hash tag is
                             * written by setClusterKeyHashTag() */
    int *updated_slots;         /* Used by updateClusterSlotsConfiguration */
    int updated_slots_count;    /* Used by updateClusterSlotsConfiguration */
    int replicas_count;
    sds *migrating; /* An array of sds where even strings are slots and odd
                     * strings are the destination node IDs. */
    sds *importing; /* An array of sds where even strings are slots and odd
                     * strings are the source node IDs. */
    int migrating_count; /* Length of the migrating array (migrating slots*2) */
    int importing_count; /* Length of the importing array (importing slots*2) */
    struct redisConfig *redis_config; /* Node configuration shown in the
                                       * latency report, may be NULL */
} clusterNode;
171
/* Subset of the server configuration fetched by getRedisConfig(). */
typedef struct redisConfig {
    sds save;       /* Value returned by CONFIG GET save */
    sds appendonly; /* Value returned by CONFIG GET appendonly */
} redisConfig;
176
177 /* Prototypes */
178 static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
179 static void createMissingClients(client c);
180 static benchmarkThread *createBenchmarkThread(int index);
181 static void freeBenchmarkThread(benchmarkThread *thread);
182 static void freeBenchmarkThreads();
183 static void *execBenchmarkThread(void *ptr);
184 static clusterNode *createClusterNode(char *ip, int port);
185 static redisConfig *getRedisConfig(const char *ip, int port,
186 const char *hostsocket);
187 static redisContext *getRedisContext(const char *ip, int port,
188 const char *hostsocket);
189 static void freeRedisConfig(redisConfig *cfg);
190 static int fetchClusterSlotsConfiguration(client c);
191 static void updateClusterSlotsConfiguration();
192 int showThroughput(struct aeEventLoop *eventLoop, long long id,
193 void *clientData);
194
195 /* Dict callbacks */
196 static uint64_t dictSdsHash(const void *key);
197 static int dictSdsKeyCompare(void *privdata, const void *key1,
198 const void *key2);
199
200 /* Implementation */
/* Return the current time reported by gettimeofday() expressed in
 * microseconds. */
static long long ustime(void) {
    struct timeval tv;
    long long ust;

    gettimeofday(&tv, NULL);
    /* Widen to long long before multiplying: where 'long' is 32 bits wide
     * seconds*1000000 does not fit and the multiplication would overflow. */
    ust = ((long long)tv.tv_sec)*1000000;
    ust += tv.tv_usec;
    return ust;
}
210
/* Return the current time reported by gettimeofday() expressed in
 * milliseconds. */
static long long mstime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return ((long long)now.tv_sec)*1000 + now.tv_usec/1000;
}
220
/* Dict hash callback for sds keys: hash all the bytes of the string. */
static uint64_t dictSdsHash(const void *key) {
    char *s = (char*)key;
    return dictGenHashFunction((unsigned char*)s, sdslen(s));
}
224
dictSdsKeyCompare(void * privdata,const void * key1,const void * key2)225 static int dictSdsKeyCompare(void *privdata, const void *key1,
226 const void *key2)
227 {
228 int l1,l2;
229 DICT_NOTUSED(privdata);
230
231 l1 = sdslen((sds)key1);
232 l2 = sdslen((sds)key2);
233 if (l1 != l2) return 0;
234 return memcmp(key1, key2, l1) == 0;
235 }
236
237 /* _serverAssert is needed by dict */
/* Report a failed assertion on stderr, then crash on purpose by writing to
 * an invalid address. Each message is terminated by a newline so that the
 * two lines do not run together in the output. */
void _serverAssert(const char *estr, const char *file, int line) {
    fprintf(stderr, "=== ASSERTION FAILED ===\n");
    fprintf(stderr, "==> %s:%d '%s' is not true\n",file,line,estr);
    *((char*)-1) = 'x';
}
243
getRedisContext(const char * ip,int port,const char * hostsocket)244 static redisContext *getRedisContext(const char *ip, int port,
245 const char *hostsocket)
246 {
247 redisContext *ctx = NULL;
248 redisReply *reply = NULL;
249 if (hostsocket == NULL)
250 ctx = redisConnect(ip, port);
251 else
252 ctx = redisConnectUnix(hostsocket);
253 if (ctx == NULL || ctx->err) {
254 fprintf(stderr,"Could not connect to Redis at ");
255 char *err = (ctx != NULL ? ctx->errstr : "");
256 if (hostsocket == NULL)
257 fprintf(stderr,"%s:%d: %s\n",ip,port,err);
258 else
259 fprintf(stderr,"%s: %s\n",hostsocket,err);
260 goto cleanup;
261 }
262 if (config.auth == NULL)
263 return ctx;
264 if (config.user == NULL)
265 reply = redisCommand(ctx,"AUTH %s", config.auth);
266 else
267 reply = redisCommand(ctx,"AUTH %s %s", config.user, config.auth);
268 if (reply != NULL) {
269 if (reply->type == REDIS_REPLY_ERROR) {
270 if (hostsocket == NULL)
271 fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
272 else
273 fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
274 goto cleanup;
275 }
276 freeReplyObject(reply);
277 return ctx;
278 }
279 fprintf(stderr, "ERROR: failed to fetch reply from ");
280 if (hostsocket == NULL)
281 fprintf(stderr, "%s:%d\n", ip, port);
282 else
283 fprintf(stderr, "%s\n", hostsocket);
284 cleanup:
285 freeReplyObject(reply);
286 redisFree(ctx);
287 return NULL;
288 }
289
getRedisConfig(const char * ip,int port,const char * hostsocket)290 static redisConfig *getRedisConfig(const char *ip, int port,
291 const char *hostsocket)
292 {
293 redisConfig *cfg = zcalloc(sizeof(*cfg));
294 if (!cfg) return NULL;
295 redisContext *c = NULL;
296 redisReply *reply = NULL, *sub_reply = NULL;
297 c = getRedisContext(ip, port, hostsocket);
298 if (c == NULL) {
299 freeRedisConfig(cfg);
300 return NULL;
301 }
302 redisAppendCommand(c, "CONFIG GET %s", "save");
303 redisAppendCommand(c, "CONFIG GET %s", "appendonly");
304 int i = 0;
305 void *r = NULL;
306 for (; i < 2; i++) {
307 int res = redisGetReply(c, &r);
308 if (reply) freeReplyObject(reply);
309 reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
310 if (res != REDIS_OK || !r) goto fail;
311 if (reply->type == REDIS_REPLY_ERROR) {
312 fprintf(stderr, "ERROR: %s\n", reply->str);
313 goto fail;
314 }
315 if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail;
316 sub_reply = reply->element[1];
317 char *value = sub_reply->str;
318 if (!value) value = "";
319 switch (i) {
320 case 0: cfg->save = sdsnew(value); break;
321 case 1: cfg->appendonly = sdsnew(value); break;
322 }
323 }
324 freeReplyObject(reply);
325 redisFree(c);
326 return cfg;
327 fail:
328 fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
329 if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
330 else fprintf(stderr, "%s\n", hostsocket);
331 freeReplyObject(reply);
332 redisFree(c);
333 freeRedisConfig(cfg);
334 return NULL;
335 }
/* Release a redisConfig together with the strings it owns. */
static void freeRedisConfig(redisConfig *cfg) {
    if (cfg->save != NULL) {
        sdsfree(cfg->save);
    }
    if (cfg->appendonly != NULL) {
        sdsfree(cfg->appendonly);
    }
    zfree(cfg);
}
341
/* Destroy a client: unregister its file events, close the connection,
 * release its buffers and remove it from config.clients.
 *
 * For threaded clients the event loop of the thread is stopped once the
 * configured number of requests has been completed. */
static void freeClient(client c) {
    aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
    listNode *ln;
    aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
    aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
    if (c->thread_id >= 0) {
        int requests_finished = 0;
        atomicGet(config.requests_finished, requests_finished);
        if (requests_finished >= config.requests) {
            aeStop(el);
        }
    }
    redisFree(c->context);
    sdsfree(c->obuf);
    zfree(c->randptr);
    zfree(c->stagptr);
    if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
    config.liveclients--;
    ln = listSearchKey(config.clients,c);
    assert(ln != NULL);
    listDelNode(config.clients,ln);
    if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
    /* Release the client only after it has been unlinked from the list:
     * the pointer must not be used, not even as a search key, once the
     * memory it refers to has been freed. */
    zfree(c);
}
366
freeAllClients(void)367 static void freeAllClients(void) {
368 listNode *ln = config.clients->head, *next;
369
370 while(ln) {
371 next = ln->next;
372 freeClient(ln->value);
373 ln = next;
374 }
375 }
376
/* Prepare a client for sending its request buffer again: rewind the write
 * offset, expect config.pipeline replies, and listen only for writability. */
static void resetClient(client c) {
    aeEventLoop *loop = CLIENT_GET_EVENTLOOP(c);
    int fd = c->context->fd;

    c->written = 0;
    c->pending = config.pipeline;
    aeDeleteFileEvent(loop,fd,AE_WRITABLE);
    aeDeleteFileEvent(loop,fd,AE_READABLE);
    aeCreateFileEvent(loop,fd,AE_WRITABLE,writeHandler,c);
}
385
/* Overwrite each of the 12-byte placeholders referenced by c->randptr with
 * a zero-padded decimal number. The number is random() modulo
 * config.randomkeys_keyspacelen, or 0 when the keyspace length is 0. */
static void randomizeClientKey(client c) {
    size_t idx;

    for (idx = 0; idx < c->randlen; idx++) {
        char *field = c->randptr[idx];
        size_t value = 0;
        int pos;

        if (config.randomkeys_keyspacelen != 0)
            value = random() % config.randomkeys_keyspacelen;

        /* Fill the digits from the least significant one backwards. */
        for (pos = 11; pos >= 0; pos--) {
            field[pos] = '0'+value%10;
            value /= 10;
        }
    }
}
403
/* Write, inside every "{tag}" placeholder referenced by c->stagptr, the
 * hash tag that crc16_slot_table associates with the current slot of the
 * client's node. Tags shorter than three characters are padded with '}'. */
static void setClusterKeyHashTag(client c) {
    assert(c->thread_id >= 0);
    clusterNode *node = c->cluster_node;
    assert(node);
    assert(node->current_slot_index < node->slots_count);
    int updating = 0;
    atomicGet(config.is_updating_slots, updating);
    /* While another thread is updating the slots array, call
     * updateClusterSlotsConfiguration in order to block this thread, since
     * the mutex is locked. Once the thread actually performing the update
     * is done, this call is not expected to do any further work. */
    if (updating) updateClusterSlotsConfiguration();
    const char *tag = crc16_slot_table[node->slots[node->current_slot_index]];
    int taglen = strlen(tag);
    char second = (taglen >= 2 ? tag[1] : '}');
    char third = (taglen == 3 ? tag[2] : '}');
    size_t idx;
    for (idx = 0; idx < c->staglen; idx++) {
        /* Skip the opening brace of the placeholder. */
        char *dst = c->stagptr[idx] + 1;
        dst[0] = tag[0];
        dst[1] = second;
        dst[2] = third;
    }
}
429
/* Called by readHandler() once a client has consumed all the replies of its
 * current request buffer. Depending on the benchmark progress the client is
 * freed, reused, or replaced by a new one. */
static void clientDone(client c) {
    int requests_finished = 0;
    atomicGet(config.requests_finished, requests_finished);
    /* All the requests are done: drop the client and, in single-threaded
     * mode, stop the main event loop. */
    if (requests_finished >= config.requests) {
        freeClient(c);
        if (!config.num_threads && config.el) aeStop(config.el);
        return;
    }
    if (config.keepalive) {
        /* Reuse the same connection for the next request. */
        resetClient(c);
    } else {
        if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
        /* Temporarily lower the counter so that createMissingClients()
         * creates a replacement cloned from 'c', then restore it:
         * freeClient() below performs the real decrement. */
        config.liveclients--;
        createMissingClients(c);
        config.liveclients++;
        if (config.num_threads)
            pthread_mutex_unlock(&(config.liveclients_mutex));
        freeClient(c);
    }
}
450
/* Readable event handler of a client ('privdata' is the client). Reads the
 * data available on the socket and consumes the complete replies: records
 * the latency, reacts to cluster redirection errors, discards the replies
 * of the prefix commands and calls clientDone() once no reply is pending.
 * Read or protocol errors terminate the process. */
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    client c = privdata;
    void *reply = NULL;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(mask);

    /* Calculate latency only for the first read event. This means that the
     * server already sent the reply and we need to parse it. Parsing overhead
     * is not part of the latency, so calculate it only once, here. */
    if (c->latency < 0) c->latency = ustime()-(c->start);

    if (redisBufferRead(c->context) != REDIS_OK) {
        fprintf(stderr,"Error: %s\n",c->context->errstr);
        exit(1);
    } else {
        while(c->pending) {
            if (redisGetReply(c->context,&reply) != REDIS_OK) {
                fprintf(stderr,"Error: %s\n",c->context->errstr);
                exit(1);
            }
            /* A NULL reply means that no complete reply is available yet:
             * the loop is left and resumed at the next readable event. */
            if (reply != NULL) {
                /* NOTE(review): this compares the reply pointer itself with
                 * the REDIS_REPLY_ERROR constant - confirm it is intended. */
                if (reply == (void*)REDIS_REPLY_ERROR) {
                    fprintf(stderr,"Unexpected error reply, exiting...\n");
                    exit(1);
                }
                redisReply *r = reply;
                int is_err = (r->type == REDIS_REPLY_ERROR);

                /* Print error replies, at most once per second. */
                if (is_err && config.showerrors) {
                    /* TODO: static lasterr_time not thread-safe */
                    static time_t lasterr_time = 0;
                    time_t now = time(NULL);
                    if (lasterr_time != now) {
                        lasterr_time = now;
                        if (c->cluster_node) {
                            printf("Error from server %s:%d: %s\n",
                                   c->cluster_node->ip,
                                   c->cluster_node->port,
                                   r->str);
                        } else printf("Error from server: %s\n", r->str);
                    }
                }

                /* Try to update slots configuration if reply error is
                 * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
                 * contain(s) the slot hash tag. */
                if (is_err && c->cluster_node && c->staglen) {
                    int fetch_slots = 0, do_wait = 0;
                    if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
                        fetch_slots = 1;
                    else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
                        /* Usually the cluster is able to recover itself after
                         * a CLUSTERDOWN error, so try to sleep one second
                         * before requesting the new configuration. */
                        fetch_slots = 1;
                        do_wait = 1;
                        printf("Error from server %s:%d: %s\n",
                               c->cluster_node->ip,
                               c->cluster_node->port,
                               r->str);
                    }
                    if (do_wait) sleep(1);
                    /* A failed slots refresh is fatal. */
                    if (fetch_slots && !fetchClusterSlotsConfiguration(c))
                        exit(1);
                }

                freeReplyObject(reply);
                /* This is an OK for prefix commands such as auth and select.*/
                if (c->prefix_pending > 0) {
                    c->prefix_pending--;
                    c->pending--;
                    /* Discard prefix commands on first response.*/
                    if (c->prefixlen > 0) {
                        size_t j;
                        sdsrange(c->obuf, c->prefixlen, -1);
                        /* We also need to fix the pointers to the strings
                         * we need to randomize. */
                        for (j = 0; j < c->randlen; j++)
                            c->randptr[j] -= c->prefixlen;
                        /* Fix the pointers to the slot hash tags */
                        for (j = 0; j < c->staglen; j++)
                            c->stagptr[j] -= c->prefixlen;
                        c->prefixlen = 0;
                    }
                    /* Prefix replies are not counted as benchmark requests. */
                    continue;
                }
                /* requests_finished receives the counter value fetched by
                 * atomicGetIncr and is used as the index of the latency
                 * sample, stored only while it is below config.requests. */
                int requests_finished = 0;
                atomicGetIncr(config.requests_finished, requests_finished, 1);
                if (requests_finished < config.requests)
                    config.latency[requests_finished] = c->latency;
                c->pending--;
                if (c->pending == 0) {
                    /* clientDone() may free 'c': leave the loop without
                     * touching the client again. */
                    clientDone(c);
                    break;
                }
            } else {
                break;
            }
        }
    }
}
553
/* Writable event handler of a client ('privdata' is the client). Starts a
 * new request when nothing has been written yet, then writes as much of the
 * output buffer as the socket accepts. Once the whole buffer has been sent
 * the client switches to waiting for readable events. */
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    client c = privdata;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(mask);

    if (c->written == 0) {
        /* Start of a new request: enforce the upper bound to the number of
         * requests before doing anything else. */
        int issued = 0;
        atomicGetIncr(config.requests_issued, issued, 1);
        if (issued >= config.requests) {
            freeClient(c);
            return;
        }

        /* Randomize keys, set the hash tags and take the start time. */
        if (config.randomkeys) randomizeClientKey(c);
        if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
        atomicGet(config.slots_last_update, c->slots_last_update);
        c->start = ustime();
        c->latency = -1;
    }

    size_t total = sdslen(c->obuf);
    if (total <= c->written) return;

    ssize_t nwritten = write(c->context->fd,c->obuf+c->written,total-c->written);
    if (nwritten == -1) {
        if (errno != EPIPE)
            fprintf(stderr, "Writing to socket: %s\n", strerror(errno));
        freeClient(c);
        return;
    }
    c->written += nwritten;
    if (total == c->written) {
        /* Everything was sent: wait for the replies. */
        aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
        aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
    }
}
593
594 /* Create a benchmark client, configured to send the command passed as 'cmd' of
595 * 'len' bytes.
596 *
597 * The command is copied N times in the client output buffer (that is reused
598 * again and again to send the request to the server) accordingly to the configured
599 * pipeline size.
600 *
601 * Also an initial SELECT command is prepended in order to make sure the right
602 * database is selected, if needed. The initial SELECT will be discarded as soon
603 * as the first reply is received.
604 *
605 * To create a client from scratch, the 'from' pointer is set to NULL. If instead
606 * we want to create a client using another client as reference, the 'from' pointer
607 * points to the client to use as reference. In such a case the following
608 * information is take from the 'from' client:
609 *
610 * 1) The command line to use.
611 * 2) The offsets of the __rand_int__ elements inside the command line, used
612 * for arguments randomization.
613 *
614 * Even when cloning another client, prefix commands are applied if needed.*/
createClient(char * cmd,size_t len,client from,int thread_id)615 static client createClient(char *cmd, size_t len, client from, int thread_id) {
616 int j;
617 int is_cluster_client = (config.cluster_mode && thread_id >= 0);
618 client c = zmalloc(sizeof(struct _client));
619
620 const char *ip = NULL;
621 int port = 0;
622 c->cluster_node = NULL;
623 if (config.hostsocket == NULL || is_cluster_client) {
624 if (!is_cluster_client) {
625 ip = config.hostip;
626 port = config.hostport;
627 } else {
628 int node_idx = 0;
629 if (config.num_threads < config.cluster_node_count)
630 node_idx = config.liveclients % config.cluster_node_count;
631 else
632 node_idx = thread_id % config.cluster_node_count;
633 clusterNode *node = config.cluster_nodes[node_idx];
634 assert(node != NULL);
635 ip = (const char *) node->ip;
636 port = node->port;
637 c->cluster_node = node;
638 }
639 c->context = redisConnectNonBlock(ip,port);
640 } else {
641 c->context = redisConnectUnixNonBlock(config.hostsocket);
642 }
643 if (c->context->err) {
644 fprintf(stderr,"Could not connect to Redis at ");
645 if (config.hostsocket == NULL || is_cluster_client)
646 fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
647 else
648 fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
649 exit(1);
650 }
651 c->thread_id = thread_id;
652 /* Suppress hiredis cleanup of unused buffers for max speed. */
653 c->context->reader->maxbuf = 0;
654
655 /* Build the request buffer:
656 * Queue N requests accordingly to the pipeline size, or simply clone
657 * the example client buffer. */
658 c->obuf = sdsempty();
659 /* Prefix the request buffer with AUTH and/or SELECT commands, if applicable.
660 * These commands are discarded after the first response, so if the client is
661 * reused the commands will not be used again. */
662 c->prefix_pending = 0;
663 if (config.auth) {
664 char *buf = NULL;
665 int len;
666 if (config.user == NULL)
667 len = redisFormatCommand(&buf, "AUTH %s", config.auth);
668 else
669 len = redisFormatCommand(&buf, "AUTH %s %s",
670 config.user, config.auth);
671 c->obuf = sdscatlen(c->obuf, buf, len);
672 free(buf);
673 c->prefix_pending++;
674 }
675
676 if (config.enable_tracking) {
677 char *buf = NULL;
678 int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
679 c->obuf = sdscatlen(c->obuf, buf, len);
680 free(buf);
681 c->prefix_pending++;
682 }
683
684 /* If a DB number different than zero is selected, prefix our request
685 * buffer with the SELECT command, that will be discarded the first
686 * time the replies are received, so if the client is reused the
687 * SELECT command will not be used again. */
688 if (config.dbnum != 0 && !is_cluster_client) {
689 c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
690 (int)sdslen(config.dbnumstr),config.dbnumstr);
691 c->prefix_pending++;
692 }
693 c->prefixlen = sdslen(c->obuf);
694 /* Append the request itself. */
695 if (from) {
696 c->obuf = sdscatlen(c->obuf,
697 from->obuf+from->prefixlen,
698 sdslen(from->obuf)-from->prefixlen);
699 } else {
700 for (j = 0; j < config.pipeline; j++)
701 c->obuf = sdscatlen(c->obuf,cmd,len);
702 }
703
704 c->written = 0;
705 c->pending = config.pipeline+c->prefix_pending;
706 c->randptr = NULL;
707 c->randlen = 0;
708 c->stagptr = NULL;
709 c->staglen = 0;
710
711 /* Find substrings in the output buffer that need to be randomized. */
712 if (config.randomkeys) {
713 if (from) {
714 c->randlen = from->randlen;
715 c->randfree = 0;
716 c->randptr = zmalloc(sizeof(char*)*c->randlen);
717 /* copy the offsets. */
718 for (j = 0; j < (int)c->randlen; j++) {
719 c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf);
720 /* Adjust for the different select prefix length. */
721 c->randptr[j] += c->prefixlen - from->prefixlen;
722 }
723 } else {
724 char *p = c->obuf;
725
726 c->randlen = 0;
727 c->randfree = RANDPTR_INITIAL_SIZE;
728 c->randptr = zmalloc(sizeof(char*)*c->randfree);
729 while ((p = strstr(p,"__rand_int__")) != NULL) {
730 if (c->randfree == 0) {
731 c->randptr = zrealloc(c->randptr,sizeof(char*)*c->randlen*2);
732 c->randfree += c->randlen;
733 }
734 c->randptr[c->randlen++] = p;
735 c->randfree--;
736 p += 12; /* 12 is strlen("__rand_int__). */
737 }
738 }
739 }
740 /* If cluster mode is enabled, set slot hashtags pointers. */
741 if (config.cluster_mode) {
742 if (from) {
743 c->staglen = from->staglen;
744 c->stagfree = 0;
745 c->stagptr = zmalloc(sizeof(char*)*c->staglen);
746 /* copy the offsets. */
747 for (j = 0; j < (int)c->staglen; j++) {
748 c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
749 /* Adjust for the different select prefix length. */
750 c->stagptr[j] += c->prefixlen - from->prefixlen;
751 }
752 } else {
753 char *p = c->obuf;
754
755 c->staglen = 0;
756 c->stagfree = RANDPTR_INITIAL_SIZE;
757 c->stagptr = zmalloc(sizeof(char*)*c->stagfree);
758 while ((p = strstr(p,"{tag}")) != NULL) {
759 if (c->stagfree == 0) {
760 c->stagptr = zrealloc(c->stagptr,
761 sizeof(char*) * c->staglen*2);
762 c->stagfree += c->staglen;
763 }
764 c->stagptr[c->staglen++] = p;
765 c->stagfree--;
766 p += 5; /* 5 is strlen("{tag}"). */
767 }
768 }
769 }
770 aeEventLoop *el = NULL;
771 if (thread_id < 0) el = config.el;
772 else {
773 benchmarkThread *thread = config.threads[thread_id];
774 el = thread->el;
775 }
776 if (config.idlemode == 0)
777 aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
778 listAddNodeTail(config.clients,c);
779 atomicIncr(config.liveclients, 1);
780 atomicGet(config.slots_last_update, c->slots_last_update);
781 return c;
782 }
783
/* Create clients cloned from 'c' until config.liveclients reaches
 * config.numclients. In multi-thread mode the thread of each new client is
 * chosen as the live clients count modulo the number of threads. */
static void createMissingClients(client c) {
    int burst = 0;

    while(config.liveclients < config.numclients) {
        int thread_id = config.num_threads ?
                        config.liveclients % config.num_threads : -1;
        createClient(NULL,0,c,thread_id);

        /* Listen backlog is quite limited on most systems: take a short
         * pause after each batch of connections. */
        burst++;
        if (burst > 64) {
            usleep(50000);
            burst = 0;
        }
    }
}
799
/* qsort() comparison callback for long long latencies, ascending order.
 *
 * Returns -1, 0 or 1. The values are compared instead of subtracted: the
 * difference of two long long values does not always fit the int return
 * type, and its truncation could report a wrong sign or a false equality. */
static int compareLatency(const void *a, const void *b) {
    long long x = *(const long long*)a;
    long long y = *(const long long*)b;
    return (x > y) - (x < y);
}
803
/* Integer power computed by repeated squaring: returns 'base' raised to
 * 'exp' for non-negative exponents. */
static int ipow(int base, int exp) {
    int acc = 1;

    for (; exp != 0; exp /= 2, base *= base) {
        if (exp & 1) acc *= base;
    }
    return acc;
}
813
/* Print the report of the test that just finished. Depending on
 * config.quiet and config.csv the output is the full report (run
 * parameters, cumulative latency distribution, throughput), a CSV line, or
 * a single summary line. */
static void showLatencyReport(void) {
    int i, curlat = 0;
    /* Width of a latency bucket, in the same unit as config.latency. */
    int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision);
    float perc, reqpersec;

    /* config.totlatency is in milliseconds, hence the division by 1000. */
    reqpersec = (float)config.requests_finished/((float)config.totlatency/1000);
    if (!config.quiet && !config.csv) {
        printf("====== %s ======\n", config.title);
        printf(" %d requests completed in %.2f seconds\n", config.requests_finished,
            (float)config.totlatency/1000);
        printf(" %d parallel clients\n", config.numclients);
        printf(" %d bytes payload\n", config.datasize);
        printf(" keep alive: %d\n", config.keepalive);
        if (config.cluster_mode) {
            printf(" cluster mode: yes (%d masters)\n",
                   config.cluster_node_count);
            int m ;
            for (m = 0; m < config.cluster_node_count; m++) {
                clusterNode *node = config.cluster_nodes[m];
                redisConfig *cfg = node->redis_config;
                /* Nodes without a fetched configuration are skipped. */
                if (cfg == NULL) continue;
                printf(" node [%d] configuration:\n",m );
                printf(" save: %s\n",
                    sdslen(cfg->save) ? cfg->save : "NONE");
                printf(" appendonly: %s\n", cfg->appendonly);
            }
        } else {
            if (config.redis_config) {
                printf(" host configuration \"save\": %s\n",
                       config.redis_config->save);
                printf(" host configuration \"appendonly\": %s\n",
                       config.redis_config->appendonly);
            }
        }
        printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no"));
        if (config.num_threads)
            printf(" threads: %d\n", config.num_threads);

        printf("\n");

        /* Sort the samples, then print one line each time the latency
         * bucket changes, and one for the last sample. */
        qsort(config.latency,config.requests,sizeof(long long),compareLatency);
        for (i = 0; i < config.requests; i++) {
            if (config.latency[i]/usbetweenlat != curlat ||
                i == (config.requests-1))
            {
                /* After the 2 milliseconds latency to have percentages split
                 * by decimals will just add a lot of noise to the output. */
                if (config.latency[i] >= 2000) {
                    /* Note: this changes the global precision setting. */
                    config.precision = 0;
                    usbetweenlat = ipow(10,
                                        MAX_LATENCY_PRECISION-config.precision);
                }

                curlat = config.latency[i]/usbetweenlat;
                /* Percentage of samples up to and including this one. */
                perc = ((float)(i+1)*100)/config.requests;
                printf("%.2f%% <= %.*f milliseconds\n", perc, config.precision,
                    curlat/pow(10.0, config.precision));
            }
        }
        printf("%.2f requests per second\n\n", reqpersec);
    } else if (config.csv) {
        printf("\"%s\",\"%.2f\"\n", config.title, reqpersec);
    } else {
        printf("%s: %.2f requests per second\n", config.title, reqpersec);
    }
}
880
initBenchmarkThreads()881 static void initBenchmarkThreads() {
882 int i;
883 if (config.threads) freeBenchmarkThreads();
884 config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
885 for (i = 0; i < config.num_threads; i++) {
886 benchmarkThread *thread = createBenchmarkThread(i);
887 config.threads[i] = thread;
888 }
889 }
890
startBenchmarkThreads()891 static void startBenchmarkThreads() {
892 int i;
893 for (i = 0; i < config.num_threads; i++) {
894 benchmarkThread *t = config.threads[i];
895 if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
896 fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
897 exit(1);
898 }
899 }
900 for (i = 0; i < config.num_threads; i++)
901 pthread_join(config.threads[i]->thread, NULL);
902 }
903
benchmark(char * title,char * cmd,int len)904 static void benchmark(char *title, char *cmd, int len) {
905 client c;
906
907 config.title = title;
908 config.requests_issued = 0;
909 config.requests_finished = 0;
910
911 if (config.num_threads) initBenchmarkThreads();
912
913 int thread_id = config.num_threads > 0 ? 0 : -1;
914 c = createClient(cmd,len,NULL,thread_id);
915 createMissingClients(c);
916
917 config.start = mstime();
918 if (!config.num_threads) aeMain(config.el);
919 else startBenchmarkThreads();
920 config.totlatency = mstime()-config.start;
921
922 showLatencyReport();
923 freeAllClients();
924 if (config.threads) freeBenchmarkThreads();
925 }
926
927 /* Thread functions. */
928
createBenchmarkThread(int index)929 static benchmarkThread *createBenchmarkThread(int index) {
930 benchmarkThread *thread = zmalloc(sizeof(*thread));
931 if (thread == NULL) return NULL;
932 thread->index = index;
933 thread->el = aeCreateEventLoop(1024*10);
934 aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL);
935 return thread;
936 }
937
/* Release a thread descriptor and its event loop, if any. */
static void freeBenchmarkThread(benchmarkThread *thread) {
    if (thread->el != NULL) {
        aeDeleteEventLoop(thread->el);
    }
    zfree(thread);
}
942
freeBenchmarkThreads()943 static void freeBenchmarkThreads() {
944 int i = 0;
945 for (; i < config.num_threads; i++) {
946 benchmarkThread *thread = config.threads[i];
947 if (thread) freeBenchmarkThread(thread);
948 }
949 zfree(config.threads);
950 config.threads = NULL;
951 }
952
execBenchmarkThread(void * ptr)953 static void *execBenchmarkThread(void *ptr) {
954 benchmarkThread *thread = (benchmarkThread *) ptr;
955 aeMain(thread->el);
956 return NULL;
957 }
958
959 /* Cluster helper functions. */
960
createClusterNode(char * ip,int port)961 static clusterNode *createClusterNode(char *ip, int port) {
962 clusterNode *node = zmalloc(sizeof(*node));
963 if (!node) return NULL;
964 node->ip = ip;
965 node->port = port;
966 node->name = NULL;
967 node->flags = 0;
968 node->replicate = NULL;
969 node->replicas_count = 0;
970 node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int));
971 node->slots_count = 0;
972 node->current_slot_index = 0;
973 node->updated_slots = NULL;
974 node->updated_slots_count = 0;
975 node->migrating = NULL;
976 node->importing = NULL;
977 node->migrating_count = 0;
978 node->importing_count = 0;
979 node->redis_config = NULL;
980 return node;
981 }
982
/* Release a cluster node and everything it owns: name, replicate id, the
 * migrating/importing lists (arrays of sds strings), the fetched server
 * configuration, the slots array and, when owned, the ip string. */
static void freeClusterNode(clusterNode *node) {
    int i;
    if (node->name) sdsfree(node->name);
    if (node->replicate) sdsfree(node->replicate);
    if (node->migrating != NULL) {
        for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
        zfree(node->migrating);
    }
    if (node->importing != NULL) {
        for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
        zfree(node->importing);
    }
    /* The reference node borrows the config.hostip pointer itself, while
     * every other node gets an ip allocated by fetchClusterConfiguration.
     * Ownership is therefore decided by pointer identity: comparing the
     * text would leak the ip of any other node that happens to share the
     * same address as config.hostip (e.g. a local cluster where nodes only
     * differ by port). */
    if (node->ip && node->ip != config.hostip) sdsfree(node->ip);
    if (node->redis_config != NULL) freeRedisConfig(node->redis_config);
    zfree(node->slots);
    zfree(node);
}
1003
freeClusterNodes()1004 static void freeClusterNodes() {
1005 int i = 0;
1006 for (; i < config.cluster_node_count; i++) {
1007 clusterNode *n = config.cluster_nodes[i];
1008 if (n) freeClusterNode(n);
1009 }
1010 zfree(config.cluster_nodes);
1011 config.cluster_nodes = NULL;
1012 }
1013
/* Append `node` to the config.cluster_nodes array, growing it by one entry.
 *
 * Returns the (possibly moved) array on success. Returns NULL if the array
 * could not be grown; in that case the existing array and count are left
 * untouched and `node` is not registered. */
static clusterNode **addClusterNode(clusterNode *node) {
    int count = config.cluster_node_count + 1;
    /* The array holds pointers to nodes, so each entry is pointer sized.
     * Grow through a temporary so that a failed reallocation does not lose
     * the only reference to the current array. */
    clusterNode **nodes = zrealloc(config.cluster_nodes,
                                   count * sizeof(*nodes));
    if (!nodes) return NULL;
    config.cluster_nodes = nodes;
    config.cluster_nodes[config.cluster_node_count++] = node;
    return config.cluster_nodes;
}
1022
fetchClusterConfiguration()1023 static int fetchClusterConfiguration() {
1024 int success = 1;
1025 redisContext *ctx = NULL;
1026 redisReply *reply = NULL;
1027 ctx = getRedisContext(config.hostip, config.hostport, config.hostsocket);
1028 if (ctx == NULL) {
1029 exit(1);
1030 }
1031 clusterNode *firstNode = createClusterNode((char *) config.hostip,
1032 config.hostport);
1033 if (!firstNode) {success = 0; goto cleanup;}
1034 reply = redisCommand(ctx, "CLUSTER NODES");
1035 success = (reply != NULL);
1036 if (!success) goto cleanup;
1037 success = (reply->type != REDIS_REPLY_ERROR);
1038 if (!success) {
1039 if (config.hostsocket == NULL) {
1040 fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
1041 config.hostip, config.hostport, reply->str);
1042 } else {
1043 fprintf(stderr, "Cluster node %s replied with error:\n%s\n",
1044 config.hostsocket, reply->str);
1045 }
1046 goto cleanup;
1047 }
1048 char *lines = reply->str, *p, *line;
1049 while ((p = strstr(lines, "\n")) != NULL) {
1050 *p = '\0';
1051 line = lines;
1052 lines = p + 1;
1053 char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL;
1054 int i = 0;
1055 while ((p = strchr(line, ' ')) != NULL) {
1056 *p = '\0';
1057 char *token = line;
1058 line = p + 1;
1059 switch(i++){
1060 case 0: name = token; break;
1061 case 1: addr = token; break;
1062 case 2: flags = token; break;
1063 case 3: master_id = token; break;
1064 }
1065 if (i == 8) break; // Slots
1066 }
1067 if (!flags) {
1068 fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n");
1069 success = 0;
1070 goto cleanup;
1071 }
1072 int myself = (strstr(flags, "myself") != NULL);
1073 int is_replica = (strstr(flags, "slave") != NULL ||
1074 (master_id != NULL && master_id[0] != '-'));
1075 if (is_replica) continue;
1076 if (addr == NULL) {
1077 fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
1078 success = 0;
1079 goto cleanup;
1080 }
1081 clusterNode *node = NULL;
1082 char *ip = NULL;
1083 int port = 0;
1084 char *paddr = strchr(addr, ':');
1085 if (paddr != NULL) {
1086 *paddr = '\0';
1087 ip = addr;
1088 addr = paddr + 1;
1089 /* If internal bus is specified, then just drop it. */
1090 if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
1091 port = atoi(addr);
1092 }
1093 if (myself) {
1094 node = firstNode;
1095 if (node->ip == NULL && ip != NULL) {
1096 node->ip = ip;
1097 node->port = port;
1098 }
1099 } else {
1100 node = createClusterNode(sdsnew(ip), port);
1101 }
1102 if (node == NULL) {
1103 success = 0;
1104 goto cleanup;
1105 }
1106 if (name != NULL) node->name = sdsnew(name);
1107 if (i == 8) {
1108 int remaining = strlen(line);
1109 while (remaining > 0) {
1110 p = strchr(line, ' ');
1111 if (p == NULL) p = line + remaining;
1112 remaining -= (p - line);
1113
1114 char *slotsdef = line;
1115 *p = '\0';
1116 if (remaining) {
1117 line = p + 1;
1118 remaining--;
1119 } else line = p;
1120 char *dash = NULL;
1121 if (slotsdef[0] == '[') {
1122 slotsdef++;
1123 if ((p = strstr(slotsdef, "->-"))) { // Migrating
1124 *p = '\0';
1125 p += 3;
1126 char *closing_bracket = strchr(p, ']');
1127 if (closing_bracket) *closing_bracket = '\0';
1128 sds slot = sdsnew(slotsdef);
1129 sds dst = sdsnew(p);
1130 node->migrating_count += 2;
1131 node->migrating =
1132 zrealloc(node->migrating,
1133 (node->migrating_count * sizeof(sds)));
1134 node->migrating[node->migrating_count - 2] =
1135 slot;
1136 node->migrating[node->migrating_count - 1] =
1137 dst;
1138 } else if ((p = strstr(slotsdef, "-<-"))) {//Importing
1139 *p = '\0';
1140 p += 3;
1141 char *closing_bracket = strchr(p, ']');
1142 if (closing_bracket) *closing_bracket = '\0';
1143 sds slot = sdsnew(slotsdef);
1144 sds src = sdsnew(p);
1145 node->importing_count += 2;
1146 node->importing = zrealloc(node->importing,
1147 (node->importing_count * sizeof(sds)));
1148 node->importing[node->importing_count - 2] =
1149 slot;
1150 node->importing[node->importing_count - 1] =
1151 src;
1152 }
1153 } else if ((dash = strchr(slotsdef, '-')) != NULL) {
1154 p = dash;
1155 int start, stop;
1156 *p = '\0';
1157 start = atoi(slotsdef);
1158 stop = atoi(p + 1);
1159 while (start <= stop) {
1160 int slot = start++;
1161 node->slots[node->slots_count++] = slot;
1162 }
1163 } else if (p > slotsdef) {
1164 int slot = atoi(slotsdef);
1165 node->slots[node->slots_count++] = slot;
1166 }
1167 }
1168 }
1169 if (node->slots_count == 0) {
1170 printf("WARNING: master node %s:%d has no slots, skipping...\n",
1171 node->ip, node->port);
1172 continue;
1173 }
1174 if (!addClusterNode(node)) {
1175 success = 0;
1176 goto cleanup;
1177 }
1178 }
1179 cleanup:
1180 if (ctx) redisFree(ctx);
1181 if (!success) {
1182 if (config.cluster_nodes) freeClusterNodes();
1183 }
1184 if (reply) freeReplyObject(reply);
1185 return success;
1186 }
1187
1188 /* Request the current cluster slots configuration by calling CLUSTER SLOTS
1189 * and atomically update the slots after a successful reply. */
fetchClusterSlotsConfiguration(client c)1190 static int fetchClusterSlotsConfiguration(client c) {
1191 UNUSED(c);
1192 int success = 1, is_fetching_slots = 0, last_update = 0;
1193 size_t i;
1194 atomicGet(config.slots_last_update, last_update);
1195 if (c->slots_last_update < last_update) {
1196 c->slots_last_update = last_update;
1197 return -1;
1198 }
1199 redisReply *reply = NULL;
1200 atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
1201 if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
1202 atomicSet(config.is_fetching_slots, 1);
1203 if (config.showerrors)
1204 printf("Cluster slots configuration changed, fetching new one...\n");
1205 const char *errmsg = "Failed to update cluster slots configuration";
1206 static dictType dtype = {
1207 dictSdsHash, /* hash function */
1208 NULL, /* key dup */
1209 NULL, /* val dup */
1210 dictSdsKeyCompare, /* key compare */
1211 NULL, /* key destructor */
1212 NULL /* val destructor */
1213 };
1214 /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
1215 dict *masters = dictCreate(&dtype, NULL);
1216 redisContext *ctx = NULL;
1217 for (i = 0; i < (size_t) config.cluster_node_count; i++) {
1218 clusterNode *node = config.cluster_nodes[i];
1219 assert(node->ip != NULL);
1220 assert(node->name != NULL);
1221 assert(node->port);
1222 /* Use first node as entry point to connect to. */
1223 if (ctx == NULL) {
1224 ctx = getRedisContext(node->ip, node->port, NULL);
1225 if (!ctx) {
1226 success = 0;
1227 goto cleanup;
1228 }
1229 }
1230 if (node->updated_slots != NULL)
1231 zfree(node->updated_slots);
1232 node->updated_slots = NULL;
1233 node->updated_slots_count = 0;
1234 dictReplace(masters, node->name, node) ;
1235 }
1236 reply = redisCommand(ctx, "CLUSTER SLOTS");
1237 if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
1238 success = 0;
1239 if (reply)
1240 fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str);
1241 goto cleanup;
1242 }
1243 assert(reply->type == REDIS_REPLY_ARRAY);
1244 for (i = 0; i < reply->elements; i++) {
1245 redisReply *r = reply->element[i];
1246 assert(r->type == REDIS_REPLY_ARRAY);
1247 assert(r->elements >= 3);
1248 int from, to, slot;
1249 from = r->element[0]->integer;
1250 to = r->element[1]->integer;
1251 redisReply *nr = r->element[2];
1252 assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
1253 assert(nr->element[2]->str != NULL);
1254 sds name = sdsnew(nr->element[2]->str);
1255 dictEntry *entry = dictFind(masters, name);
1256 if (entry == NULL) {
1257 success = 0;
1258 fprintf(stderr, "%s: could not find node with ID %s in current "
1259 "configuration.\n", errmsg, name);
1260 if (name) sdsfree(name);
1261 goto cleanup;
1262 }
1263 sdsfree(name);
1264 clusterNode *node = dictGetVal(entry);
1265 if (node->updated_slots == NULL)
1266 node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
1267 for (slot = from; slot <= to; slot++)
1268 node->updated_slots[node->updated_slots_count++] = slot;
1269 }
1270 updateClusterSlotsConfiguration();
1271 cleanup:
1272 freeReplyObject(reply);
1273 redisFree(ctx);
1274 dictRelease(masters);
1275 atomicSet(config.is_fetching_slots, 0);
1276 return success;
1277 }
1278
1279 /* Atomically update the new slots configuration. */
updateClusterSlotsConfiguration()1280 static void updateClusterSlotsConfiguration() {
1281 pthread_mutex_lock(&config.is_updating_slots_mutex);
1282 atomicSet(config.is_updating_slots, 1);
1283 int i;
1284 for (i = 0; i < config.cluster_node_count; i++) {
1285 clusterNode *node = config.cluster_nodes[i];
1286 if (node->updated_slots != NULL) {
1287 int *oldslots = node->slots;
1288 node->slots = node->updated_slots;
1289 node->slots_count = node->updated_slots_count;
1290 node->current_slot_index = 0;
1291 node->updated_slots = NULL;
1292 node->updated_slots_count = 0;
1293 zfree(oldslots);
1294 }
1295 }
1296 atomicSet(config.is_updating_slots, 0);
1297 atomicIncr(config.slots_last_update, 1);
1298 pthread_mutex_unlock(&config.is_updating_slots_mutex);
1299 }
1300
/* Generate random data for redis benchmark. See #7196.
 *
 * Writes `count` bytes into `data` (no terminator is added). Every byte is
 * '0' plus a 6-bit value taken from a linear congruential generator whose
 * state persists across calls, so successive calls continue the same
 * sequence. */
static void genBenchmarkRandomData(char *data, int count) {
    static uint32_t state = 1234;

    for (char *out = data; count--; out++) {
        state = state * 1103515245 + 12345;
        *out = '0' + ((state >> 16) & 63);
    }
}
1311
1312 /* Returns number of consumed options. */
parseOptions(int argc,const char ** argv)1313 int parseOptions(int argc, const char **argv) {
1314 int i;
1315 int lastarg;
1316 int exit_status = 1;
1317
1318 for (i = 1; i < argc; i++) {
1319 lastarg = (i == (argc-1));
1320
1321 if (!strcmp(argv[i],"-c")) {
1322 if (lastarg) goto invalid;
1323 config.numclients = atoi(argv[++i]);
1324 } else if (!strcmp(argv[i],"-n")) {
1325 if (lastarg) goto invalid;
1326 config.requests = atoi(argv[++i]);
1327 } else if (!strcmp(argv[i],"-k")) {
1328 if (lastarg) goto invalid;
1329 config.keepalive = atoi(argv[++i]);
1330 } else if (!strcmp(argv[i],"-h")) {
1331 if (lastarg) goto invalid;
1332 config.hostip = strdup(argv[++i]);
1333 } else if (!strcmp(argv[i],"-p")) {
1334 if (lastarg) goto invalid;
1335 config.hostport = atoi(argv[++i]);
1336 } else if (!strcmp(argv[i],"-s")) {
1337 if (lastarg) goto invalid;
1338 config.hostsocket = strdup(argv[++i]);
1339 } else if (!strcmp(argv[i],"-a") ) {
1340 if (lastarg) goto invalid;
1341 config.auth = strdup(argv[++i]);
1342 } else if (!strcmp(argv[i],"--user")) {
1343 if (lastarg) goto invalid;
1344 config.user = argv[++i];
1345 } else if (!strcmp(argv[i],"-d")) {
1346 if (lastarg) goto invalid;
1347 config.datasize = atoi(argv[++i]);
1348 if (config.datasize < 1) config.datasize=1;
1349 if (config.datasize > 1024*1024*1024) config.datasize = 1024*1024*1024;
1350 } else if (!strcmp(argv[i],"-P")) {
1351 if (lastarg) goto invalid;
1352 config.pipeline = atoi(argv[++i]);
1353 if (config.pipeline <= 0) config.pipeline=1;
1354 } else if (!strcmp(argv[i],"-r")) {
1355 if (lastarg) goto invalid;
1356 const char *next = argv[++i], *p = next;
1357 if (*p == '-') {
1358 p++;
1359 if (*p < '0' || *p > '9') goto invalid;
1360 }
1361 config.randomkeys = 1;
1362 config.randomkeys_keyspacelen = atoi(next);
1363 if (config.randomkeys_keyspacelen < 0)
1364 config.randomkeys_keyspacelen = 0;
1365 } else if (!strcmp(argv[i],"-q")) {
1366 config.quiet = 1;
1367 } else if (!strcmp(argv[i],"--csv")) {
1368 config.csv = 1;
1369 } else if (!strcmp(argv[i],"-l")) {
1370 config.loop = 1;
1371 } else if (!strcmp(argv[i],"-I")) {
1372 config.idlemode = 1;
1373 } else if (!strcmp(argv[i],"-e")) {
1374 config.showerrors = 1;
1375 } else if (!strcmp(argv[i],"-t")) {
1376 if (lastarg) goto invalid;
1377 /* We get the list of tests to run as a string in the form
1378 * get,set,lrange,...,test_N. Then we add a comma before and
1379 * after the string in order to make sure that searching
1380 * for ",testname," will always get a match if the test is
1381 * enabled. */
1382 config.tests = sdsnew(",");
1383 config.tests = sdscat(config.tests,(char*)argv[++i]);
1384 config.tests = sdscat(config.tests,",");
1385 sdstolower(config.tests);
1386 } else if (!strcmp(argv[i],"--dbnum")) {
1387 if (lastarg) goto invalid;
1388 config.dbnum = atoi(argv[++i]);
1389 config.dbnumstr = sdsfromlonglong(config.dbnum);
1390 } else if (!strcmp(argv[i],"--precision")) {
1391 if (lastarg) goto invalid;
1392 config.precision = atoi(argv[++i]);
1393 if (config.precision < 0) config.precision = 0;
1394 if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION;
1395 } else if (!strcmp(argv[i],"--threads")) {
1396 if (lastarg) goto invalid;
1397 config.num_threads = atoi(argv[++i]);
1398 if (config.num_threads > MAX_THREADS) {
1399 printf("WARNING: too many threads, limiting threads to %d.\n",
1400 MAX_THREADS);
1401 config.num_threads = MAX_THREADS;
1402 } else if (config.num_threads < 0) config.num_threads = 0;
1403 } else if (!strcmp(argv[i],"--cluster")) {
1404 config.cluster_mode = 1;
1405 } else if (!strcmp(argv[i],"--enable-tracking")) {
1406 config.enable_tracking = 1;
1407 } else if (!strcmp(argv[i],"--help")) {
1408 exit_status = 0;
1409 goto usage;
1410 } else {
1411 /* Assume the user meant to provide an option when the arg starts
1412 * with a dash. We're done otherwise and should use the remainder
1413 * as the command and arguments for running the benchmark. */
1414 if (argv[i][0] == '-') goto invalid;
1415 return i;
1416 }
1417 }
1418
1419 return i;
1420
1421 invalid:
1422 printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]);
1423
1424 usage:
1425 printf(
1426 "Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]\n\n"
1427 " -h <hostname> Server hostname (default 127.0.0.1)\n"
1428 " -p <port> Server port (default 6379)\n"
1429 " -s <socket> Server socket (overrides host and port)\n"
1430 " -a <password> Password for Redis Auth\n"
1431 " --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
1432 " -c <clients> Number of parallel connections (default 50)\n"
1433 " -n <requests> Total number of requests (default 100000)\n"
1434 " -d <size> Data size of SET/GET value in bytes (default 3)\n"
1435 " --dbnum <db> SELECT the specified db number (default 0)\n"
1436 " --threads <num> Enable multi-thread mode.\n"
1437 " --cluster Enable cluster mode.\n"
1438 " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
1439 " -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
1440 " -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD,\n"
1441 " random members and scores for ZADD.\n"
1442 " Using this option the benchmark will expand the string __rand_int__\n"
1443 " inside an argument with a 12 digits number in the specified range\n"
1444 " from 0 to keyspacelen-1. The substitution changes every time a command\n"
1445 " is executed. Default tests use this to hit random keys in the\n"
1446 " specified range.\n"
1447 " -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
1448 " -e If server replies with errors, show them on stdout.\n"
1449 " (no more than 1 error per second is displayed)\n"
1450 " -q Quiet. Just show query/sec values\n"
1451 " --precision Number of decimal places to display in latency output (default 0)\n"
1452 " --csv Output in CSV format\n"
1453 " -l Loop. Run the tests forever\n"
1454 " -t <tests> Only run the comma separated list of tests. The test\n"
1455 " names are the same as the ones produced as output.\n"
1456 " -I Idle mode. Just open N idle connections and wait.\n\n"
1457 "Examples:\n\n"
1458 " Run the benchmark with the default configuration against 127.0.0.1:6379:\n"
1459 " $ redis-benchmark\n\n"
1460 " Use 20 parallel clients, for a total of 100k requests, against 192.168.1.1:\n"
1461 " $ redis-benchmark -h 192.168.1.1 -p 6379 -n 100000 -c 20\n\n"
1462 " Fill 127.0.0.1:6379 with about 1 million keys only using the SET test:\n"
1463 " $ redis-benchmark -t set -n 1000000 -r 100000000\n\n"
1464 " Benchmark 127.0.0.1:6379 for a few commands producing CSV output:\n"
1465 " $ redis-benchmark -t ping,set,get -n 100000 --csv\n\n"
1466 " Benchmark a specific command line:\n"
1467 " $ redis-benchmark -r 10000 -n 10000 eval 'return redis.call(\"ping\")' 0\n\n"
1468 " Fill a list with 10000 random elements:\n"
1469 " $ redis-benchmark -r 10000 -n 10000 lpush mylist __rand_int__\n\n"
1470 " On user specified command lines __rand_int__ is replaced with a random integer\n"
1471 " with a range of values selected by the -r option.\n"
1472 );
1473 exit(exit_status);
1474 }
1475
showThroughput(struct aeEventLoop * eventLoop,long long id,void * clientData)1476 int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1477 UNUSED(eventLoop);
1478 UNUSED(id);
1479 UNUSED(clientData);
1480 int liveclients = 0;
1481 int requests_finished = 0;
1482 atomicGet(config.liveclients, liveclients);
1483 atomicGet(config.requests_finished, requests_finished);
1484
1485 if (liveclients == 0 && requests_finished != config.requests) {
1486 fprintf(stderr,"All clients disconnected... aborting.\n");
1487 exit(1);
1488 }
1489 if (config.num_threads && requests_finished >= config.requests) {
1490 aeStop(eventLoop);
1491 return AE_NOMORE;
1492 }
1493 if (config.csv) return 250;
1494 if (config.idlemode == 1) {
1495 printf("clients: %d\r", config.liveclients);
1496 fflush(stdout);
1497 return 250;
1498 }
1499 float dt = (float)(mstime()-config.start)/1000.0;
1500 float rps = (float)requests_finished/dt;
1501 printf("%s: %.2f\r", config.title, rps);
1502 fflush(stdout);
1503 return 250; /* every 250ms */
1504 }
1505
1506 /* Return true if the named test was selected using the -t command line
1507 * switch, or if all the tests are selected (no -t passed by user). */
test_is_selected(char * name)1508 int test_is_selected(char *name) {
1509 char buf[256];
1510 int l = strlen(name);
1511
1512 if (config.tests == NULL) return 1;
1513 buf[0] = ',';
1514 memcpy(buf+1,name,l);
1515 buf[l+1] = ',';
1516 buf[l+2] = '\0';
1517 return strstr(config.tests,buf) != NULL;
1518 }
1519
main(int argc,const char ** argv)1520 int main(int argc, const char **argv) {
1521 int i;
1522 char *data, *cmd, *tag;
1523 int len;
1524
1525 client c;
1526
1527 srandom(time(NULL) ^ getpid());
1528 init_genrand64(ustime() ^ getpid());
1529 signal(SIGHUP, SIG_IGN);
1530 signal(SIGPIPE, SIG_IGN);
1531
1532 config.numclients = 50;
1533 config.requests = 100000;
1534 config.liveclients = 0;
1535 config.el = aeCreateEventLoop(1024*10);
1536 aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
1537 config.keepalive = 1;
1538 config.datasize = 3;
1539 config.pipeline = 1;
1540 config.showerrors = 0;
1541 config.randomkeys = 0;
1542 config.randomkeys_keyspacelen = 0;
1543 config.quiet = 0;
1544 config.csv = 0;
1545 config.loop = 0;
1546 config.idlemode = 0;
1547 config.latency = NULL;
1548 config.clients = listCreate();
1549 config.hostip = "127.0.0.1";
1550 config.hostport = 6379;
1551 config.hostsocket = NULL;
1552 config.tests = NULL;
1553 config.dbnum = 0;
1554 config.auth = NULL;
1555 config.precision = 1;
1556 config.num_threads = 0;
1557 config.threads = NULL;
1558 config.cluster_mode = 0;
1559 config.cluster_node_count = 0;
1560 config.cluster_nodes = NULL;
1561 config.redis_config = NULL;
1562 config.is_fetching_slots = 0;
1563 config.is_updating_slots = 0;
1564 config.slots_last_update = 0;
1565 config.enable_tracking = 0;
1566
1567 i = parseOptions(argc,argv);
1568 argc -= i;
1569 argv += i;
1570
1571 config.latency = zmalloc(sizeof(long long)*config.requests);
1572
1573 tag = "";
1574
1575 if (config.cluster_mode) {
1576 // We only include the slot placeholder {tag} if cluster mode is enabled
1577 tag = ":{tag}";
1578
1579 /* Fetch cluster configuration. */
1580 if (!fetchClusterConfiguration() || !config.cluster_nodes) {
1581 if (!config.hostsocket) {
1582 fprintf(stderr, "Failed to fetch cluster configuration from "
1583 "%s:%d\n", config.hostip, config.hostport);
1584 } else {
1585 fprintf(stderr, "Failed to fetch cluster configuration from "
1586 "%s\n", config.hostsocket);
1587 }
1588 exit(1);
1589 }
1590 if (config.cluster_node_count <= 1) {
1591 fprintf(stderr, "Invalid cluster: %d node(s).\n",
1592 config.cluster_node_count);
1593 exit(1);
1594 }
1595 printf("Cluster has %d master nodes:\n\n", config.cluster_node_count);
1596 int i = 0;
1597 for (; i < config.cluster_node_count; i++) {
1598 clusterNode *node = config.cluster_nodes[i];
1599 if (!node) {
1600 fprintf(stderr, "Invalid cluster node #%d\n", i);
1601 exit(1);
1602 }
1603 printf("Master %d: ", i);
1604 if (node->name) printf("%s ", node->name);
1605 printf("%s:%d\n", node->ip, node->port);
1606 node->redis_config = getRedisConfig(node->ip, node->port, NULL);
1607 if (node->redis_config == NULL) {
1608 fprintf(stderr, "WARN: could not fetch node CONFIG %s:%d\n",
1609 node->ip, node->port);
1610 }
1611 }
1612 printf("\n");
1613 /* Automatically set thread number to node count if not specified
1614 * by the user. */
1615 if (config.num_threads == 0)
1616 config.num_threads = config.cluster_node_count;
1617 } else {
1618 config.redis_config =
1619 getRedisConfig(config.hostip, config.hostport, config.hostsocket);
1620 if (config.redis_config == NULL)
1621 fprintf(stderr, "WARN: could not fetch server CONFIG\n");
1622 }
1623
1624 if (config.num_threads > 0) {
1625 pthread_mutex_init(&(config.requests_issued_mutex), NULL);
1626 pthread_mutex_init(&(config.requests_finished_mutex), NULL);
1627 pthread_mutex_init(&(config.liveclients_mutex), NULL);
1628 pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
1629 pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
1630 pthread_mutex_init(&(config.updating_slots_mutex), NULL);
1631 pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
1632 }
1633
1634 if (config.keepalive == 0) {
1635 printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n");
1636 }
1637
1638 if (config.idlemode) {
1639 printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
1640 int thread_id = -1, use_threads = (config.num_threads > 0);
1641 if (use_threads) {
1642 thread_id = 0;
1643 initBenchmarkThreads();
1644 }
1645 c = createClient("",0,NULL,thread_id); /* will never receive a reply */
1646 createMissingClients(c);
1647 if (use_threads) startBenchmarkThreads();
1648 else aeMain(config.el);
1649 /* and will wait for every */
1650 }
1651
1652 /* Run benchmark with command in the remainder of the arguments. */
1653 if (argc) {
1654 sds title = sdsnew(argv[0]);
1655 for (i = 1; i < argc; i++) {
1656 title = sdscatlen(title, " ", 1);
1657 title = sdscatlen(title, (char*)argv[i], strlen(argv[i]));
1658 }
1659
1660 do {
1661 len = redisFormatCommandArgv(&cmd,argc,argv,NULL);
1662 benchmark(title,cmd,len);
1663 free(cmd);
1664 } while(config.loop);
1665
1666 if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
1667 return 0;
1668 }
1669
1670 /* Run default benchmark suite. */
1671 data = zmalloc(config.datasize+1);
1672 do {
1673 genBenchmarkRandomData(data, config.datasize);
1674 data[config.datasize] = '\0';
1675
1676 if (test_is_selected("ping_inline") || test_is_selected("ping"))
1677 benchmark("PING_INLINE","PING\r\n",6);
1678
1679 if (test_is_selected("ping_mbulk") || test_is_selected("ping")) {
1680 len = redisFormatCommand(&cmd,"PING");
1681 benchmark("PING_BULK",cmd,len);
1682 free(cmd);
1683 }
1684
1685 if (test_is_selected("set")) {
1686 len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data);
1687 benchmark("SET",cmd,len);
1688 free(cmd);
1689 }
1690
1691 if (test_is_selected("get")) {
1692 len = redisFormatCommand(&cmd,"GET key%s:__rand_int__",tag);
1693 benchmark("GET",cmd,len);
1694 free(cmd);
1695 }
1696
1697 if (test_is_selected("incr")) {
1698 len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag);
1699 benchmark("INCR",cmd,len);
1700 free(cmd);
1701 }
1702
1703 if (test_is_selected("lpush")) {
1704 len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
1705 benchmark("LPUSH",cmd,len);
1706 free(cmd);
1707 }
1708
1709 if (test_is_selected("rpush")) {
1710 len = redisFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data);
1711 benchmark("RPUSH",cmd,len);
1712 free(cmd);
1713 }
1714
1715 if (test_is_selected("lpop")) {
1716 len = redisFormatCommand(&cmd,"LPOP mylist%s",tag);
1717 benchmark("LPOP",cmd,len);
1718 free(cmd);
1719 }
1720
1721 if (test_is_selected("rpop")) {
1722 len = redisFormatCommand(&cmd,"RPOP mylist%s",tag);
1723 benchmark("RPOP",cmd,len);
1724 free(cmd);
1725 }
1726
1727 if (test_is_selected("sadd")) {
1728 len = redisFormatCommand(&cmd,
1729 "SADD myset%s element:__rand_int__",tag);
1730 benchmark("SADD",cmd,len);
1731 free(cmd);
1732 }
1733
1734 if (test_is_selected("hset")) {
1735 len = redisFormatCommand(&cmd,
1736 "HSET myhash%s element:__rand_int__ %s",tag,data);
1737 benchmark("HSET",cmd,len);
1738 free(cmd);
1739 }
1740
1741 if (test_is_selected("spop")) {
1742 len = redisFormatCommand(&cmd,"SPOP myset%s",tag);
1743 benchmark("SPOP",cmd,len);
1744 free(cmd);
1745 }
1746
1747 if (test_is_selected("zadd")) {
1748 char *score = "0";
1749 if (config.randomkeys) score = "__rand_int__";
1750 len = redisFormatCommand(&cmd,
1751 "ZADD myzset%s %s element:__rand_int__",tag,score);
1752 benchmark("ZADD",cmd,len);
1753 free(cmd);
1754 }
1755
1756 if (test_is_selected("zpopmin")) {
1757 len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s",tag);
1758 benchmark("ZPOPMIN",cmd,len);
1759 free(cmd);
1760 }
1761
1762 if (test_is_selected("lrange") ||
1763 test_is_selected("lrange_100") ||
1764 test_is_selected("lrange_300") ||
1765 test_is_selected("lrange_500") ||
1766 test_is_selected("lrange_600"))
1767 {
1768 len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
1769 benchmark("LPUSH (needed to benchmark LRANGE)",cmd,len);
1770 free(cmd);
1771 }
1772
1773 if (test_is_selected("lrange") || test_is_selected("lrange_100")) {
1774 len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag);
1775 benchmark("LRANGE_100 (first 100 elements)",cmd,len);
1776 free(cmd);
1777 }
1778
1779 if (test_is_selected("lrange") || test_is_selected("lrange_300")) {
1780 len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag);
1781 benchmark("LRANGE_300 (first 300 elements)",cmd,len);
1782 free(cmd);
1783 }
1784
1785 if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
1786 len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag);
1787 benchmark("LRANGE_500 (first 450 elements)",cmd,len);
1788 free(cmd);
1789 }
1790
1791 if (test_is_selected("lrange") || test_is_selected("lrange_600")) {
1792 len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag);
1793 benchmark("LRANGE_600 (first 600 elements)",cmd,len);
1794 free(cmd);
1795 }
1796
1797 if (test_is_selected("mset")) {
1798 const char *cmd_argv[21];
1799 cmd_argv[0] = "MSET";
1800 sds key_placeholder = sdscatprintf(sdsnew(""),"key%s:__rand_int__",tag);
1801 for (i = 1; i < 21; i += 2) {
1802 cmd_argv[i] = key_placeholder;
1803 cmd_argv[i+1] = data;
1804 }
1805 len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL);
1806 benchmark("MSET (10 keys)",cmd,len);
1807 free(cmd);
1808 sdsfree(key_placeholder);
1809 }
1810
1811 if (!config.csv) printf("\n");
1812 } while(config.loop);
1813
1814 if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
1815
1816 return 0;
1817 }
1818