/*
 * MDCacheD multi-client benchmark
 * (c) 2008.-2013. Ivan Voras <ivoras@gmail.com>
 * $Id: bench_mdcached_async.c 493 2013-05-15 15:39:23Z ivoras $
 */
#include <sys/types.h>
#include <sys/fcntl.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <time.h>

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <paths.h>
#include <syslog.h>
#include <assert.h>
#include <err.h>

#include "mc_protocol.h"
#include "mc_client.h"
#include "common_utils.h"
#include "bench_utils.h"
#include "minilibevent.h"
33 
34 char *server_host = MC_DEFAULT_UNIX_SOCKET;
35 int server_port = MC_DEFAULT_INET_PORT;
36 int is_unix_socket = TRUE;
37 int percent_read = 90; 	/* reader:writer ratio */
38 float scale = 1;
39 
/* One generated test record: a key/value pair plus a flag tracking whether it was sent. */
struct ddata {
	char *key;		/* key bytes, filled in by gen_data() */
	unsigned int key_size;	/* length of key */
	char *data;		/* value bytes, filled in by gen_data() */
	unsigned int data_size;	/* length of data */
	int been_put;		/* set to TRUE once a PUT for this record has been issued */
};

/* Array of N_DATA records, allocated and populated in main(). */
struct ddata *gendata;
49 
int n_clients = 10;		/* number of clients (connections) */
struct mc_connection **conns;	/* master connection list */

/* Per-connection benchmark state. */
struct client {
	struct mc_connection *conn;		/* connection this client drives */
	unsigned int reqno;			/* zeroed in main() */
	enum { REQ_PUT, REQ_GET } reqtype;	/* kind of the request currently awaiting a reply */
};
58 
/* One worker thread together with the clients assigned to it. */
struct bench_thread {
	pthread_t thread;		/* handle filled in by pthread_create() */
	unsigned int n_clients;		/* number of entries in clients, as read by client_thread() */
	unsigned int clients_count;	/* running count while main() distributes the clients */
	struct client *clients;		/* array of this thread's clients */
};

struct bench_thread *threads;	/* array of n_threads entries, allocated in main() */
int n_threads = 2;		/* number of worker threads */
68 
mkconn()69 struct mc_connection *mkconn()
70 {
71 	char *err;
72 	struct mc_connection *conn;
73 
74 	if (is_unix_socket)
75 		conn = mc_connect_local(server_host, TRUE, &err);
76 	else
77 		conn = mc_connect_tcp(server_host, server_port, TRUE, &err);
78 	if (conn == NULL)
79 		errx(1, "connection error: %s", err);
80 	mc_set_connection_async(conn);
81 	return conn;
82 }
83 
84 
85 /**
86  * Initiate the next request on the client. It can be a PUT or a GET.
87  */
next_request(struct client * cl)88 static void next_request(struct client *cl)
89 {
90 	int is_write = (rand() % 100) >= percent_read;	/* is it a read or a write request? */
91 	int r = rand() % N_DATA;
92 	int res;
93 	char *err = NULL;
94 
95 	if (is_write) {
96 		res = mc_put_simple_async(cl->conn, gendata[r].key, gendata[r].key_size, gendata[r].data, gendata[r].data_size, 3600, &err);
97 		if (res == MC_RESULT_PENDING)
98 			while ((res = mc_async_sendpoll(cl->conn)) == MC_RESULT_PENDING) {}
99 		if (res != MC_RESULT_OK)
100 			errx(1, "Error in mc_put_simple_async: %s", err);
101 		gendata[r].been_put = TRUE;
102 		cl->reqtype = REQ_PUT;
103 	} else {
104 		res = mc_get_simple_async(cl->conn, gendata[r].key, gendata[r].key_size, &err);
105 		if (res == MC_RESULT_PENDING)
106 			while ((res = mc_async_sendpoll(cl->conn)) == MC_RESULT_PENDING) {}
107 		if (res != MC_RESULT_OK)
108 			errx(1, "Error in mc_get_simple_async: %s (%s)", err, mc_result_to_string(res));
109 		cl->reqtype = REQ_GET;
110 	}
111 }
112 
client_thread(void * arg)113 void *client_thread(void *arg)
114 {
115 	struct bench_thread *thr = arg;
116 	unsigned int i, max_requests, n_requests, n_events, n_iter;
117 	char *err;
118 	ev_queue evq;
119 	ev_list evl;
120 
121 	printf("Created thread %p (including handshake) with %d connections\n", thr, thr->n_clients);
122 
123 	/* Create IO queue */
124 	evq = ev_create_queue();
125 	evl = ev_allocate_list(thr->n_clients);
126 
127 	for (i = 0; i < thr->n_clients; i++) {
128 		assert(thr->clients);
129 		assert(thr->clients[i].conn);
130 		ev_set_list_element(evl, i, thr->clients[i].conn->sock, EV_READ, &thr->clients[i]);
131 	}
132 	ev_enqueue_list(evq, evl, thr->n_clients);
133 
134 	/* Get a first round of requests going - this will start the async chain of events */
135 	for (i = 0; i < thr->n_clients; i++) {
136 		int r;
137 		if ((r = mc_put_simple_async(thr->clients[i].conn, gendata[i].key, gendata[i].key_size, gendata[i].data, gendata[i].data_size, 3600, &err)) != MC_RESULT_OK)
138 			errx(1, "Error sending initial data to client #%d on thread %p: %s, %d", i, thr, err, r);
139 		thr->clients[i].reqtype = REQ_PUT;
140 	}
141 	max_requests = thr->n_clients * N_REQUESTS;
142 	n_requests = n_events = n_iter = 0;
143 
144 	while (1) {
145 		int r, err;
146 		struct client *cl;
147 		struct mc_header *pkt = NULL;
148 
149 		r  = ev_get_new_events(evq, evl, thr->n_clients, NULL);
150 		n_events += r;
151 
152 		for (i = 0; i < r; i++) {
153 			cl = evl[i].udata;
154 			if (evl[i].filter == EV_READ) {
155 				if ((err = mc_async_recvpoll(cl->conn, &pkt)) == MC_RESULT_OK) {
156 					if (cl->reqtype == REQ_PUT) {
157 						struct mc_resp_status *presp = (struct mc_resp_status*)pkt;
158 						if (presp->status != MCSTATUS_OK)
159 							errx(1, "Error in received status");
160 						free(pkt);
161 						/* We have just completed the first step of PUT(command) -> GET(data) chain of events. */
162 						/* We need to initiate a new request now. The new request will be randomly chosen. */
163 						next_request(cl);
164 						n_requests++;
165 					} else if (cl->reqtype == REQ_GET) {
166 						struct mc_resp_data *pget = (struct mc_resp_data*)pkt;
167 						if (pget->header.mcmd == MCRESP_DATA) {
168 							/* all is well. XXX: check if received data is valid */
169 						} else if (pget->header.mcmd == MCRESP_STATUS) {
170 							struct mc_resp_status *status = (struct mc_resp_status*)pkt;
171 							if (status->status == MCSTATUS_ERROR && status->data == MCREASON_NOTFOUND)
172 							{
173 								/* all is well. XXX: check if the data should have been found. */
174 							} else
175 								errx(1, "Unknown status: %d,%d", status->status, status->data);
176 						}
177 						free(pkt);
178 						/* We have just completed the second step of PUT(command) -> GET(data) chain of events. */
179 						/* We need to initiate a new request now. The new request will be randomly chosen. */
180 						next_request(cl);
181 						n_requests++;
182 					}
183 				} else if (err != MC_RESULT_PENDING) {
184 					fprintf(stderr, "async_recvpoll returned %d (%s)\n", err, mc_result_to_string(err));
185 					printf("conn type: %d, state: %d\n", cl->conn->type, cl->conn->async_state);
186 					abort();
187 				}
188 			}
189 		}
190 		if (n_requests > max_requests)
191 			break;
192 		n_iter++;
193 	}
194 	printf("Thread %p ends: %u events in %u iterations: %0.1f events/iteration\n", thr, n_events, n_iter, (float)n_events/n_iter);
195 	return NULL;
196 }
197 
198 
199 
main(int argc,char ** argv)200 int main(int argc, char **argv)
201 {
202 	int ch;
203 	double t1, t2;
204 	int i, count;
205 	unsigned int mem_key_size = 0, mem_data_size = 0;
206 	unsigned int min_data_size = 10000000, max_data_size = 0;
207 
208 	if (RAND_MAX < N_DATA)
209 		errx(1, "RAND_MAX is too small.");
210 	srand(334455); /* we want repeatable runs */
211 
212 	while ((ch = getopt(argc, argv, "c:t:n:s:p:r:h")) != -1) {
213 		switch (ch) {
214 		case 'h':
215 			printf("usage: %s [-c server] [-p port] [-s #] [-h] [-r %%read]\n", argv[0]);
216 			printf("	-c server	Server to connect to (default = local unix socket)\n");
217 			printf("	-s #		Scale of tests (larger # -> test larger data)\n");
218 			printf("	-p port		TCP port (usable only with -c)\n");
219 			printf("	-r %%read	Percentage of read requests (1-99)\n");
220 			printf("	-n n_clients	Number of clients to connect\n");
221 			printf("	-t n_threads	Number of threads to spawn (clients will be distributed across threads)\n");
222 			exit(0);
223 			break;
224 		case 'c':
225 			server_host = optarg;
226 			is_unix_socket = FALSE;
227 			break;
228 		case 'p':
229 			server_port = atoi(optarg);
230 			if (server_port < 1 || server_port > 65535)
231 				errx(1, "Invalid port %d", server_port);
232 			break;
233 		case 'r':
234 			percent_read = atoi(optarg);
235 			if (percent_read < 1 || percent_read > 99)
236 				errx(1, "Percentage invalid: %d", percent_read);
237 			break;
238 		case 't':
239 			n_threads = atoi(optarg);
240 			if (n_threads < 1)
241 				errx(1, "Invalid number of threads: %d", n_threads);
242 			break;
243 		case 'n':
244 			n_clients = atoi(optarg);
245 			if (n_clients < 1)
246 				errx(1, "Invalid number of clients: %d", n_clients);
247 			break;
248 		case 's':
249 			scale = atof(optarg);
250 			break;
251 		default:
252 			fprintf(stderr, "Unknown argument: %c\n", ch);
253 			exit(1);
254 		}
255 	}
256 
257 	printf("Generating %u data records.\n", N_DATA);
258 
259 	gendata = calloc(N_DATA, sizeof *gendata);
260 	for (i = 0; i < N_DATA; i++) {
261 		gen_data(i, &gendata[i].key, &gendata[i].key_size, &gendata[i].data, &gendata[i].data_size, scale);
262 		mem_key_size += gendata[i].key_size;
263 		mem_data_size += gendata[i].data_size;
264 		if (gendata[i].data_size < min_data_size)
265 			min_data_size = gendata[i].data_size;
266 		if (gendata[i].data_size > max_data_size)
267 			max_data_size = gendata[i].data_size;
268 	}
269 
270 	printf("Generated %u kB (%u kB keys, %u kB data). Starting %d connections in %d threads.\n", (mem_key_size + mem_data_size) / 1024, mem_key_size / 1024, mem_data_size / 1024, n_clients, n_threads);
271 	printf("Minimum data size: %d, Maximum data size: %d\n", min_data_size, max_data_size);
272 	printf("Average key size: %u bytes.\n", mem_key_size / N_DATA);
273 	printf("Average data size: %u bytes.\n", mem_data_size / N_DATA);
274 
275 	threads = calloc(n_threads, sizeof(*threads));
276 	conns = calloc(n_clients, sizeof(*conns));
277 	for (i = 0; i < n_clients; i++) {
278 		struct bench_thread *thr;
279 		int cl;
280 
281 		conns[i] = mkconn();
282 		assert(conns[i]);
283 		thr = &threads[i % n_threads];
284 
285 		cl = thr->clients_count++;
286 		thr->clients = realloc(thr->clients, thr->clients_count * sizeof(*thr->clients));
287 		assert(thr->clients);
288 
289 		thr->clients[cl].conn = conns[i];
290 		thr->clients[cl].reqno = 0;
291 		thr->n_clients = thr->clients_count;
292 		assert(thr->clients[cl].conn);
293 	}
294 
295 	for (i = 0; i < n_threads; i++)
296 		pthread_create(&threads[i].thread, NULL, client_thread, &threads[i]);
297 
298 	t1 = now();
299 	for (i = 0; i < n_threads; i++)
300 		pthread_join(threads[i].thread, NULL);
301 	t2 = now();
302 
303 	printf("%d clients * %u requests = %u requests. That's %0.1lf requests/s\n",
304 		n_clients, N_REQUESTS, n_clients * N_REQUESTS, (n_clients * N_REQUESTS) / (t2 - t1));
305 
306 	count = 0;
307 	for (i = 0; i < N_DATA; i++)
308 		if (gendata[i].been_put)
309 			count++;
310 	printf("%d records have been put.\n", count);
311 
312 	return 0;
313 }
314