1 /*
2 * MDCacheD multi-client benchmark
3 * (c) 2008.-2013. Ivan Voras <ivoras@gmail.com>
4 * $Id: bench_mdcached_async.c 493 2013-05-15 15:39:23Z ivoras $
5 */
6 #include <sys/types.h>
7 #include <sys/fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/socket.h>
10 #include <sys/time.h>
11 #include <sys/un.h>
12 #include <netinet/in.h>
13 #include <netinet/tcp.h>
14 #include <netdb.h>
15 #include <time.h>
16
17 #include <unistd.h>
18 #include <stdlib.h>
19 #include <stdio.h>
20 #include <pthread.h>
21 #include <errno.h>
22 #include <string.h>
23 #include <paths.h>
24 #include <syslog.h>
25 #include <assert.h>
26 #include <err.h>
27
28 #include "mc_protocol.h"
29 #include "mc_client.h"
30 #include "common_utils.h"
31 #include "bench_utils.h"
32 #include "minilibevent.h"
33
34 char *server_host = MC_DEFAULT_UNIX_SOCKET;
35 int server_port = MC_DEFAULT_INET_PORT;
36 int is_unix_socket = TRUE;
37 int percent_read = 90; /* reader:writer ratio */
38 float scale = 1;
39
/*
 * One pre-generated key/value record used as benchmark payload.
 */
struct ddata {
	char *key;               /* key bytes, produced by gen_data() */
	unsigned int key_size;   /* length of key */
	char *data;              /* value bytes, produced by gen_data() */
	unsigned int data_size;  /* length of data */
	int been_put;            /* set to TRUE once a PUT for this record has been issued */
};

/* Table of N_DATA records; allocated and filled in main(). */
struct ddata *gendata;
49
/* Total number of client connections to open (overridable with -n). */
int n_clients = 10;

/* Master list of all connections, n_clients entries, allocated in main(). */
struct mc_connection **conns;
52
/*
 * Per-connection benchmark state.
 */
struct client {
	struct mc_connection *conn;          /* the connection this client drives */
	unsigned int reqno;                  /* initialised to 0 in main() */
	enum { REQ_PUT, REQ_GET } reqtype;   /* kind of the request currently in flight */
};
58
/*
 * One worker thread together with the clients it services.
 */
struct bench_thread {
	pthread_t thread;            /* handle filled in by pthread_create() */
	unsigned int n_clients;      /* number of entries in clients[]; mirrors clients_count */
	unsigned int clients_count;  /* running count used while main() distributes clients */
	struct client *clients;      /* array of this thread's clients, grown with realloc() */
};

/* Array of n_threads worker descriptors, allocated in main(). */
struct bench_thread *threads;

/* Number of worker threads (overridable with -t). */
int n_threads = 2;
68
mkconn()69 struct mc_connection *mkconn()
70 {
71 char *err;
72 struct mc_connection *conn;
73
74 if (is_unix_socket)
75 conn = mc_connect_local(server_host, TRUE, &err);
76 else
77 conn = mc_connect_tcp(server_host, server_port, TRUE, &err);
78 if (conn == NULL)
79 errx(1, "connection error: %s", err);
80 mc_set_connection_async(conn);
81 return conn;
82 }
83
84
85 /**
86 * Initiate the next request on the client. It can be a PUT or a GET.
87 */
next_request(struct client * cl)88 static void next_request(struct client *cl)
89 {
90 int is_write = (rand() % 100) >= percent_read; /* is it a read or a write request? */
91 int r = rand() % N_DATA;
92 int res;
93 char *err = NULL;
94
95 if (is_write) {
96 res = mc_put_simple_async(cl->conn, gendata[r].key, gendata[r].key_size, gendata[r].data, gendata[r].data_size, 3600, &err);
97 if (res == MC_RESULT_PENDING)
98 while ((res = mc_async_sendpoll(cl->conn)) == MC_RESULT_PENDING) {}
99 if (res != MC_RESULT_OK)
100 errx(1, "Error in mc_put_simple_async: %s", err);
101 gendata[r].been_put = TRUE;
102 cl->reqtype = REQ_PUT;
103 } else {
104 res = mc_get_simple_async(cl->conn, gendata[r].key, gendata[r].key_size, &err);
105 if (res == MC_RESULT_PENDING)
106 while ((res = mc_async_sendpoll(cl->conn)) == MC_RESULT_PENDING) {}
107 if (res != MC_RESULT_OK)
108 errx(1, "Error in mc_get_simple_async: %s (%s)", err, mc_result_to_string(res));
109 cl->reqtype = REQ_GET;
110 }
111 }
112
client_thread(void * arg)113 void *client_thread(void *arg)
114 {
115 struct bench_thread *thr = arg;
116 unsigned int i, max_requests, n_requests, n_events, n_iter;
117 char *err;
118 ev_queue evq;
119 ev_list evl;
120
121 printf("Created thread %p (including handshake) with %d connections\n", thr, thr->n_clients);
122
123 /* Create IO queue */
124 evq = ev_create_queue();
125 evl = ev_allocate_list(thr->n_clients);
126
127 for (i = 0; i < thr->n_clients; i++) {
128 assert(thr->clients);
129 assert(thr->clients[i].conn);
130 ev_set_list_element(evl, i, thr->clients[i].conn->sock, EV_READ, &thr->clients[i]);
131 }
132 ev_enqueue_list(evq, evl, thr->n_clients);
133
134 /* Get a first round of requests going - this will start the async chain of events */
135 for (i = 0; i < thr->n_clients; i++) {
136 int r;
137 if ((r = mc_put_simple_async(thr->clients[i].conn, gendata[i].key, gendata[i].key_size, gendata[i].data, gendata[i].data_size, 3600, &err)) != MC_RESULT_OK)
138 errx(1, "Error sending initial data to client #%d on thread %p: %s, %d", i, thr, err, r);
139 thr->clients[i].reqtype = REQ_PUT;
140 }
141 max_requests = thr->n_clients * N_REQUESTS;
142 n_requests = n_events = n_iter = 0;
143
144 while (1) {
145 int r, err;
146 struct client *cl;
147 struct mc_header *pkt = NULL;
148
149 r = ev_get_new_events(evq, evl, thr->n_clients, NULL);
150 n_events += r;
151
152 for (i = 0; i < r; i++) {
153 cl = evl[i].udata;
154 if (evl[i].filter == EV_READ) {
155 if ((err = mc_async_recvpoll(cl->conn, &pkt)) == MC_RESULT_OK) {
156 if (cl->reqtype == REQ_PUT) {
157 struct mc_resp_status *presp = (struct mc_resp_status*)pkt;
158 if (presp->status != MCSTATUS_OK)
159 errx(1, "Error in received status");
160 free(pkt);
161 /* We have just completed the first step of PUT(command) -> GET(data) chain of events. */
162 /* We need to initiate a new request now. The new request will be randomly chosen. */
163 next_request(cl);
164 n_requests++;
165 } else if (cl->reqtype == REQ_GET) {
166 struct mc_resp_data *pget = (struct mc_resp_data*)pkt;
167 if (pget->header.mcmd == MCRESP_DATA) {
168 /* all is well. XXX: check if received data is valid */
169 } else if (pget->header.mcmd == MCRESP_STATUS) {
170 struct mc_resp_status *status = (struct mc_resp_status*)pkt;
171 if (status->status == MCSTATUS_ERROR && status->data == MCREASON_NOTFOUND)
172 {
173 /* all is well. XXX: check if the data should have been found. */
174 } else
175 errx(1, "Unknown status: %d,%d", status->status, status->data);
176 }
177 free(pkt);
178 /* We have just completed the second step of PUT(command) -> GET(data) chain of events. */
179 /* We need to initiate a new request now. The new request will be randomly chosen. */
180 next_request(cl);
181 n_requests++;
182 }
183 } else if (err != MC_RESULT_PENDING) {
184 fprintf(stderr, "async_recvpoll returned %d (%s)\n", err, mc_result_to_string(err));
185 printf("conn type: %d, state: %d\n", cl->conn->type, cl->conn->async_state);
186 abort();
187 }
188 }
189 }
190 if (n_requests > max_requests)
191 break;
192 n_iter++;
193 }
194 printf("Thread %p ends: %u events in %u iterations: %0.1f events/iteration\n", thr, n_events, n_iter, (float)n_events/n_iter);
195 return NULL;
196 }
197
198
199
main(int argc,char ** argv)200 int main(int argc, char **argv)
201 {
202 int ch;
203 double t1, t2;
204 int i, count;
205 unsigned int mem_key_size = 0, mem_data_size = 0;
206 unsigned int min_data_size = 10000000, max_data_size = 0;
207
208 if (RAND_MAX < N_DATA)
209 errx(1, "RAND_MAX is too small.");
210 srand(334455); /* we want repeatable runs */
211
212 while ((ch = getopt(argc, argv, "c:t:n:s:p:r:h")) != -1) {
213 switch (ch) {
214 case 'h':
215 printf("usage: %s [-c server] [-p port] [-s #] [-h] [-r %%read]\n", argv[0]);
216 printf(" -c server Server to connect to (default = local unix socket)\n");
217 printf(" -s # Scale of tests (larger # -> test larger data)\n");
218 printf(" -p port TCP port (usable only with -c)\n");
219 printf(" -r %%read Percentage of read requests (1-99)\n");
220 printf(" -n n_clients Number of clients to connect\n");
221 printf(" -t n_threads Number of threads to spawn (clients will be distributed across threads)\n");
222 exit(0);
223 break;
224 case 'c':
225 server_host = optarg;
226 is_unix_socket = FALSE;
227 break;
228 case 'p':
229 server_port = atoi(optarg);
230 if (server_port < 1 || server_port > 65535)
231 errx(1, "Invalid port %d", server_port);
232 break;
233 case 'r':
234 percent_read = atoi(optarg);
235 if (percent_read < 1 || percent_read > 99)
236 errx(1, "Percentage invalid: %d", percent_read);
237 break;
238 case 't':
239 n_threads = atoi(optarg);
240 if (n_threads < 1)
241 errx(1, "Invalid number of threads: %d", n_threads);
242 break;
243 case 'n':
244 n_clients = atoi(optarg);
245 if (n_clients < 1)
246 errx(1, "Invalid number of clients: %d", n_clients);
247 break;
248 case 's':
249 scale = atof(optarg);
250 break;
251 default:
252 fprintf(stderr, "Unknown argument: %c\n", ch);
253 exit(1);
254 }
255 }
256
257 printf("Generating %u data records.\n", N_DATA);
258
259 gendata = calloc(N_DATA, sizeof *gendata);
260 for (i = 0; i < N_DATA; i++) {
261 gen_data(i, &gendata[i].key, &gendata[i].key_size, &gendata[i].data, &gendata[i].data_size, scale);
262 mem_key_size += gendata[i].key_size;
263 mem_data_size += gendata[i].data_size;
264 if (gendata[i].data_size < min_data_size)
265 min_data_size = gendata[i].data_size;
266 if (gendata[i].data_size > max_data_size)
267 max_data_size = gendata[i].data_size;
268 }
269
270 printf("Generated %u kB (%u kB keys, %u kB data). Starting %d connections in %d threads.\n", (mem_key_size + mem_data_size) / 1024, mem_key_size / 1024, mem_data_size / 1024, n_clients, n_threads);
271 printf("Minimum data size: %d, Maximum data size: %d\n", min_data_size, max_data_size);
272 printf("Average key size: %u bytes.\n", mem_key_size / N_DATA);
273 printf("Average data size: %u bytes.\n", mem_data_size / N_DATA);
274
275 threads = calloc(n_threads, sizeof(*threads));
276 conns = calloc(n_clients, sizeof(*conns));
277 for (i = 0; i < n_clients; i++) {
278 struct bench_thread *thr;
279 int cl;
280
281 conns[i] = mkconn();
282 assert(conns[i]);
283 thr = &threads[i % n_threads];
284
285 cl = thr->clients_count++;
286 thr->clients = realloc(thr->clients, thr->clients_count * sizeof(*thr->clients));
287 assert(thr->clients);
288
289 thr->clients[cl].conn = conns[i];
290 thr->clients[cl].reqno = 0;
291 thr->n_clients = thr->clients_count;
292 assert(thr->clients[cl].conn);
293 }
294
295 for (i = 0; i < n_threads; i++)
296 pthread_create(&threads[i].thread, NULL, client_thread, &threads[i]);
297
298 t1 = now();
299 for (i = 0; i < n_threads; i++)
300 pthread_join(threads[i].thread, NULL);
301 t2 = now();
302
303 printf("%d clients * %u requests = %u requests. That's %0.1lf requests/s\n",
304 n_clients, N_REQUESTS, n_clients * N_REQUESTS, (n_clients * N_REQUESTS) / (t2 - t1));
305
306 count = 0;
307 for (i = 0; i < N_DATA; i++)
308 if (gendata[i].been_put)
309 count++;
310 printf("%d records have been put.\n", count);
311
312 return 0;
313 }
314