/*-
 * Copyright (c) 2014-2018 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

11 /*
12  * __async_get_format --
13  *	Find or allocate the uri/config/format structure.
14  */
15 static int
__async_get_format(WT_CONNECTION_IMPL * conn,const char * uri,const char * config,WT_ASYNC_OP_IMPL * op)16 __async_get_format(WT_CONNECTION_IMPL *conn, const char *uri,
17     const char *config, WT_ASYNC_OP_IMPL *op)
18 {
19 	WT_ASYNC *async;
20 	WT_ASYNC_FORMAT *af;
21 	WT_CURSOR *c;
22 	WT_DECL_RET;
23 	WT_SESSION *wt_session;
24 	WT_SESSION_IMPL *session;
25 	uint64_t cfg_hash, uri_hash;
26 
27 	async = conn->async;
28 	c = NULL;
29 	op->format = NULL;
30 
31 	if (uri != NULL)
32 		uri_hash = __wt_hash_city64(uri, strlen(uri));
33 	else
34 		uri_hash = 0;
35 	if (config != NULL)
36 		cfg_hash = __wt_hash_city64(config, strlen(config));
37 	else
38 		cfg_hash = 0;
39 
40 	/*
41 	 * We don't need to hold a lock around this walk.  The list is
42 	 * permanent and always valid.  We might race an insert and there
43 	 * is a possibility a duplicate entry might be inserted, but
44 	 * that is not harmful.
45 	 */
46 	TAILQ_FOREACH(af, &async->formatqh, q) {
47 		if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash)
48 			goto setup;
49 	}
50 	/*
51 	 * We didn't find one in the cache.  Allocate and initialize one.
52 	 * Insert it at the head expecting LRU usage.  We need a real session
53 	 * for the cursor.
54 	 */
55 	WT_RET(__wt_open_internal_session(
56 	    conn, "async-cursor", true, 0, &session));
57 	__wt_spin_lock(session, &async->ops_lock);
58 	WT_ERR(__wt_calloc_one(session, &af));
59 	WT_ERR(__wt_strdup(session, uri, &af->uri));
60 	WT_ERR(__wt_strdup(session, config, &af->config));
61 	af->uri_hash = uri_hash;
62 	af->cfg_hash = cfg_hash;
63 	/*
64 	 * Get the key_format and value_format for this URI and store
65 	 * it in the structure so that async->set_key/value work.
66 	 */
67 	wt_session = &session->iface;
68 	WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c));
69 	WT_ERR(__wt_strdup(session, c->key_format, &af->key_format));
70 	WT_ERR(__wt_strdup(session, c->value_format, &af->value_format));
71 	WT_ERR(c->close(c));
72 	c = NULL;
73 
74 	TAILQ_INSERT_HEAD(&async->formatqh, af, q);
75 	__wt_spin_unlock(session, &async->ops_lock);
76 	WT_ERR(wt_session->close(wt_session, NULL));
77 
78 setup:	op->format = af;
79 	/*
80 	 * Copy the pointers for the formats.  Items in the async format
81 	 * queue remain there until the connection is closed.  We must
82 	 * initialize the format fields in the async_op, which are publicly
83 	 * visible, and its internal cursor used by internal key/value
84 	 * functions.
85 	 */
86 	op->iface.c.key_format = op->iface.key_format = af->key_format;
87 	op->iface.c.value_format = op->iface.value_format = af->value_format;
88 	return (0);
89 
90 err:
91 	if (c != NULL)
92 		WT_TRET(c->close(c));
93 	__wt_free(session, af->uri);
94 	__wt_free(session, af->config);
95 	__wt_free(session, af->key_format);
96 	__wt_free(session, af->value_format);
97 	__wt_free(session, af);
98 	return (ret);
99 }
100 
101 /*
102  * __async_new_op_alloc --
103  *	Find and allocate the next available async op handle.
104  */
105 static int
__async_new_op_alloc(WT_SESSION_IMPL * session,const char * uri,const char * config,WT_ASYNC_OP_IMPL ** opp)106 __async_new_op_alloc(WT_SESSION_IMPL *session, const char *uri,
107     const char *config, WT_ASYNC_OP_IMPL **opp)
108 {
109 	WT_ASYNC *async;
110 	WT_ASYNC_OP_IMPL *op;
111 	WT_CONNECTION_IMPL *conn;
112 	uint32_t i, save_i, view;
113 
114 	*opp = NULL;
115 
116 	conn = S2C(session);
117 	async = conn->async;
118 	WT_STAT_CONN_INCR(session, async_op_alloc);
119 
120 retry:
121 	op = NULL;
122 	WT_ORDERED_READ(save_i, async->ops_index);
123 	/*
124 	 * Look after the last one allocated for a free one.  We'd expect
125 	 * ops to be freed mostly FIFO so we should quickly find one.
126 	 */
127 	for (view = 1, i = save_i; i < conn->async_size; i++, view++) {
128 		op = &async->async_ops[i];
129 		if (op->state == WT_ASYNCOP_FREE)
130 			break;
131 	}
132 
133 	/*
134 	 * Loop around back to the beginning if we need to.
135 	 */
136 	if (op == NULL || op->state != WT_ASYNCOP_FREE)
137 		for (i = 0; i < save_i; i++, view++) {
138 			op = &async->async_ops[i];
139 			if (op->state == WT_ASYNCOP_FREE)
140 				break;
141 		}
142 
143 	/*
144 	 * We still haven't found one.  Return an error.
145 	 */
146 	if (op == NULL || op->state != WT_ASYNCOP_FREE) {
147 		WT_STAT_CONN_INCR(session, async_full);
148 		return (__wt_set_return(session, EBUSY));
149 	}
150 	/*
151 	 * Set the state of this op handle as READY for the user to use.
152 	 * If we can set the state then the op entry is ours.
153 	 * Start the next search at the next entry after this one.
154 	 */
155 	if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) {
156 		WT_STAT_CONN_INCR(session, async_alloc_race);
157 		goto retry;
158 	}
159 	WT_STAT_CONN_INCRV(session, async_alloc_view, view);
160 	WT_RET(__async_get_format(conn, uri, config, op));
161 	op->unique_id = __wt_atomic_add64(&async->op_id, 1);
162 	op->optype = WT_AOP_NONE;
163 	(void)__wt_atomic_store32(
164 	    &async->ops_index, (i + 1) % conn->async_size);
165 	*opp = op;
166 	return (0);
167 }
168 
169 /*
170  * __async_config --
171  *	Parse and setup the async API options.
172  */
173 static int
__async_config(WT_SESSION_IMPL * session,WT_CONNECTION_IMPL * conn,const char ** cfg,bool * runp)174 __async_config(WT_SESSION_IMPL *session,
175     WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp)
176 {
177 	WT_CONFIG_ITEM cval;
178 
179 	/*
180 	 * The async configuration is off by default.
181 	 */
182 	WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
183 	*runp = cval.val != 0;
184 
185 	/*
186 	 * Even if async is turned off, we want to parse and store the default
187 	 * values so that reconfigure can just enable them.
188 	 *
189 	 * Bound the minimum maximum operations at 10.
190 	 */
191 	WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
192 	conn->async_size = (uint32_t)WT_MAX(cval.val, 10);
193 
194 	WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
195 	conn->async_workers = (uint32_t)cval.val;
196 	/* Sanity check that api_data.py is in sync with async.h */
197 	WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
198 
199 	return (0);
200 }
201 
202 /*
203  * __wt_async_stats_update --
204  *	Update the async stats for return to the application.
205  */
206 void
__wt_async_stats_update(WT_SESSION_IMPL * session)207 __wt_async_stats_update(WT_SESSION_IMPL *session)
208 {
209 	WT_ASYNC *async;
210 	WT_CONNECTION_IMPL *conn;
211 	WT_CONNECTION_STATS **stats;
212 
213 	conn = S2C(session);
214 	async = conn->async;
215 	if (async == NULL)
216 		return;
217 	stats = conn->stats;
218 	WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue);
219 	WT_STAT_SET(session, stats, async_max_queue, async->max_queue);
220 }
221 
222 /*
223  * __async_start --
224  *	Start the async subsystem.  All configuration processing has
225  *	already been done by the caller.
226  */
227 static int
__async_start(WT_SESSION_IMPL * session)228 __async_start(WT_SESSION_IMPL *session)
229 {
230 	WT_ASYNC *async;
231 	WT_CONNECTION_IMPL *conn;
232 	uint32_t i, session_flags;
233 
234 	conn = S2C(session);
235 	conn->async_cfg = true;
236 	/*
237 	 * Async is on, allocate the WT_ASYNC structure and initialize the ops.
238 	 */
239 	WT_RET(__wt_calloc_one(session, &conn->async));
240 	async = conn->async;
241 	TAILQ_INIT(&async->formatqh);
242 	WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
243 	WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond));
244 	WT_RET(__wt_async_op_init(session));
245 
246 	/*
247 	 * Start up the worker threads.
248 	 */
249 	F_SET(conn, WT_CONN_SERVER_ASYNC);
250 	for (i = 0; i < conn->async_workers; i++) {
251 		/*
252 		 * Each worker has its own session.  We set both a general
253 		 * server flag in the connection and an individual flag
254 		 * in the session.  The user may reconfigure the number of
255 		 * workers and we may want to selectively stop some workers
256 		 * while leaving the rest running.
257 		 */
258 		session_flags = WT_SESSION_SERVER_ASYNC;
259 		WT_RET(__wt_open_internal_session(conn, "async-worker",
260 		    true, session_flags, &async->worker_sessions[i]));
261 	}
262 	for (i = 0; i < conn->async_workers; i++) {
263 		/*
264 		 * Start the threads.
265 		 */
266 		WT_RET(__wt_thread_create(session, &async->worker_tids[i],
267 		    __wt_async_worker, async->worker_sessions[i]));
268 	}
269 	__wt_async_stats_update(session);
270 	return (0);
271 }
272 
273 /*
274  * __wt_async_create --
275  *	Start the async subsystem and worker threads.
276  */
277 int
__wt_async_create(WT_SESSION_IMPL * session,const char * cfg[])278 __wt_async_create(WT_SESSION_IMPL *session, const char *cfg[])
279 {
280 	WT_CONNECTION_IMPL *conn;
281 	bool run;
282 
283 	conn = S2C(session);
284 
285 	/* Handle configuration. */
286 	run = false;
287 	WT_RET(__async_config(session, conn, cfg, &run));
288 
289 	/* If async is not configured, we're done. */
290 	if (!run)
291 		return (0);
292 	return (__async_start(session));
293 }
294 
/*
 * __wt_async_reconfig --
 *	Reconfigure the async subsystem: toggle it on or off, or change
 *	the number of running worker threads.
 */
int
__wt_async_reconfig(WT_SESSION_IMPL *session, const char *cfg[])
{
	WT_ASYNC *async;
	WT_CONNECTION_IMPL *conn, tmp_conn;
	WT_DECL_RET;
	WT_SESSION *wt_session;
	uint32_t i, session_flags;
	bool run;

	conn = S2C(session);
	async = conn->async;
	/*
	 * Parse the new configuration into a scratch connection structure
	 * so the live connection's values are untouched until we've decided
	 * what actually changed.
	 */
	memset(&tmp_conn, 0, sizeof(tmp_conn));
	tmp_conn.async_cfg = conn->async_cfg;
	tmp_conn.async_workers = conn->async_workers;
	tmp_conn.async_size = conn->async_size;

	/* Handle configuration. */
	run = conn->async_cfg;
	WT_RET(__async_config(session, &tmp_conn, cfg, &run));

	/*
	 * There are some restrictions on the live reconfiguration of async.
	 * Unlike other subsystems where we simply destroy anything existing
	 * and restart with the new configuration, async is not so easy.
	 * If the user is just changing the number of workers, we want to
	 * allow the existing op handles and other information to remain in
	 * existence.  So we must handle various combinations of changes
	 * individually.
	 *
	 * One restriction is that if async is currently on, the user cannot
	 * change the number of async op handles available.  The user can try
	 * but we do nothing with it.  However we must allow the ops_max config
	 * string so that a user can completely start async via reconfigure.
	 */

	/*
	 * Easy cases:
	 * 1. If async is on and the user wants it off, shut it down.
	 * 2. If async is off, and the user wants it on, start it.
	 * 3. If not a toggle and async is off, we're done.
	 */
	if (conn->async_cfg && !run) {			/* Case 1 */
		/* Drain queued work before tearing the subsystem down. */
		WT_TRET(__wt_async_flush(session));
		ret = __wt_async_destroy(session);
		conn->async_cfg = false;
		return (ret);
	}
	/*
	 * NOTE(review): case 2 starts async with the values previously
	 * stored on conn (parsed at create time); the ops_max/threads just
	 * parsed into tmp_conn are not copied back first — confirm this is
	 * the intended behavior when enabling via reconfigure.
	 */
	if (!conn->async_cfg && run)			/* Case 2 */
		return (__async_start(session));
	if (!conn->async_cfg)				/* Case 3 */
		return (0);

	/*
	 * Running async worker modification cases:
	 * 4. If number of workers didn't change, we're done.
	 * 5. If more workers, start new ones.
	 * 6. If fewer workers, kill some.
	 */
	if (conn->async_workers == tmp_conn.async_workers)
		/* No change in the number of workers. */
		return (0);
	if (conn->async_workers < tmp_conn.async_workers) {
		/* Case 5 */
		/*
		 * The worker_sessions array is allocated for the maximum
		 * allowed number of workers, so starting more is easy.
		 */
		for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
			/*
			 * Each worker has its own session.
			 */
			session_flags = WT_SESSION_SERVER_ASYNC;
			WT_RET(__wt_open_internal_session(conn, "async-worker",
			    true, session_flags, &async->worker_sessions[i]));
		}
		for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
			/*
			 * Start the threads.
			 */
			WT_RET(__wt_thread_create(session,
			    &async->worker_tids[i], __wt_async_worker,
			    async->worker_sessions[i]));
		}
		conn->async_workers = tmp_conn.async_workers;
	}
	if (conn->async_workers > tmp_conn.async_workers) {
		/* Case 6 */
		/*
		 * Stopping an individual async worker is the most complex case.
		 * We clear the session async flag on the targeted worker thread
		 * so that only that thread stops, and the others keep running.
		 *
		 * NOTE(review): i is unsigned, so this loop assumes
		 * tmp_conn.async_workers >= 1 (presumably guaranteed by the
		 * configuration minimum); if it could be 0, i-- would wrap
		 * at 0 — confirm against api_data.py.
		 */
		for (i = conn->async_workers - 1;
		    i >= tmp_conn.async_workers; i--) {
			/*
			 * Join any worker we're stopping.
			 * After the thread is stopped, close its session.
			 */
			WT_ASSERT(session, async->worker_tids[i].created);
			WT_ASSERT(session, async->worker_sessions[i] != NULL);
			F_CLR(async->worker_sessions[i],
			    WT_SESSION_SERVER_ASYNC);
			WT_TRET(__wt_thread_join(
			    session, &async->worker_tids[i]));
			wt_session = &async->worker_sessions[i]->iface;
			WT_TRET(wt_session->close(wt_session, NULL));
			async->worker_sessions[i] = NULL;
		}
		conn->async_workers = tmp_conn.async_workers;
	}

	return (0);
}
413 
/*
 * __wt_async_destroy --
 *	Destroy the async worker threads and async subsystem.
 *
 *	The ordering matters: clear the server flag so workers exit their
 *	loops, join the threads, then release the resources they used.
 *	Note the caller is responsible for clearing conn->async_cfg.
 */
int
__wt_async_destroy(WT_SESSION_IMPL *session)
{
	WT_ASYNC *async;
	WT_ASYNC_FORMAT *af;
	WT_ASYNC_OP *op;
	WT_CONNECTION_IMPL *conn;
	WT_DECL_RET;
	WT_SESSION *wt_session;
	uint32_t i;

	conn = S2C(session);
	async = conn->async;

	/* Nothing to do if async was never configured. */
	if (!conn->async_cfg)
		return (0);

	/* Signal the workers to exit, then wait for each of them. */
	F_CLR(conn, WT_CONN_SERVER_ASYNC);
	for (i = 0; i < conn->async_workers; i++)
		WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
	__wt_cond_destroy(session, &async->flush_cond);

	/* Close the server threads' sessions. */
	for (i = 0; i < conn->async_workers; i++)
		if (async->worker_sessions[i] != NULL) {
			wt_session = &async->worker_sessions[i]->iface;
			WT_TRET(wt_session->close(wt_session, NULL));
			async->worker_sessions[i] = NULL;
		}
	/* Free any op key/value buffers. */
	for (i = 0; i < conn->async_size; i++) {
		op = (WT_ASYNC_OP *)&async->async_ops[i];
		if (op->c.key.data != NULL)
			__wt_buf_free(session, &op->c.key);
		if (op->c.value.data != NULL)
			__wt_buf_free(session, &op->c.value);
	}

	/* Free format resources */
	while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) {
		TAILQ_REMOVE(&async->formatqh, af, q);
		__wt_free(session, af->uri);
		__wt_free(session, af->config);
		__wt_free(session, af->key_format);
		__wt_free(session, af->value_format);
		__wt_free(session, af);
	}
	/* Finally, free the async structures themselves. */
	__wt_free(session, async->async_queue);
	__wt_free(session, async->async_ops);
	__wt_spin_destroy(session, &async->ops_lock);
	__wt_free(session, conn->async);

	return (ret);
}
472 
/*
 * __wt_async_flush --
 *	Implementation of the WT_CONN->async_flush method.
 *
 *	Blocks until all async operations enqueued before the flush have
 *	been processed by the worker threads.
 */
int
__wt_async_flush(WT_SESSION_IMPL *session)
{
	WT_ASYNC *async;
	WT_CONNECTION_IMPL *conn;
	uint32_t i, workers;

	conn = S2C(session);
	if (!conn->async_cfg)
		return (0);

	async = conn->async;
	/*
	 * Only add a flush operation if there are workers who can process
	 * it.  Otherwise we will wait forever.
	 */
	workers = 0;
	for (i = 0; i < conn->async_workers; ++i)
		if (async->worker_tids[i].created)
			++workers;
	if (workers == 0)
		return (0);

	WT_STAT_CONN_INCR(session, async_flush);
	/*
	 * We have to do several things.  First we have to prevent
	 * other callers from racing with us so that only one
	 * flush is happening at a time.  Next we have to wait for
	 * the worker threads to notice the flush and indicate
	 * that the flush is complete on their side.  Then we
	 * clear the flush flags and return.
	 */
retry:
	while (async->flush_state != WT_ASYNC_FLUSH_NONE)
		/*
		 * We're racing an in-progress flush.  We need to wait
		 * our turn to start our own.  We need to convoy the
		 * racing calls because a later call may be waiting for
		 * specific enqueued ops to be complete before this returns.
		 */
		__wt_sleep(0, 100000);

	/* The CAS makes exactly one racing caller the flush owner. */
	if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE,
	    WT_ASYNC_FLUSH_IN_PROGRESS))
		goto retry;
	/*
	 * We're the owner of this flush operation.  Set the
	 * WT_ASYNC_FLUSH_IN_PROGRESS to block other callers.
	 * We're also preventing all worker threads from taking
	 * things off the work queue with the lock.
	 */
	async->flush_count = 0;
	(void)__wt_atomic_add64(&async->flush_gen, 1);
	WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE);
	async->flush_op.state = WT_ASYNCOP_READY;
	WT_RET(__wt_async_op_enqueue(session, &async->flush_op));
	/* Wait for the workers to signal completion of the flush op. */
	while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE)
		__wt_cond_wait(session, async->flush_cond, 100000, NULL);
	/*
	 * Flush is done.  Clear the flags.  The WT_PUBLISH write barrier
	 * makes the reset visible to the convoyed waiters above.
	 */
	async->flush_op.state = WT_ASYNCOP_FREE;
	WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE);
	return (0);
}
542 
543 /*
544  * __async_runtime_config --
545  *	Configure runtime fields at allocation.
546  */
547 static int
__async_runtime_config(WT_ASYNC_OP_IMPL * op,const char * cfg[])548 __async_runtime_config(WT_ASYNC_OP_IMPL *op, const char *cfg[])
549 {
550 	WT_ASYNC_OP *asyncop;
551 	WT_CONFIG_ITEM cval;
552 	WT_SESSION_IMPL *session;
553 
554 	session = O2S(op);
555 	asyncop = (WT_ASYNC_OP *)op;
556 	WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval));
557 	if (cval.val)
558 		F_SET(&asyncop->c, WT_CURSTD_APPEND);
559 	else
560 		F_CLR(&asyncop->c, WT_CURSTD_APPEND);
561 	WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval));
562 	if (cval.val)
563 		F_SET(&asyncop->c, WT_CURSTD_OVERWRITE);
564 	else
565 		F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE);
566 	WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval));
567 	if (cval.val)
568 		F_SET(&asyncop->c, WT_CURSTD_RAW);
569 	else
570 		F_CLR(&asyncop->c, WT_CURSTD_RAW);
571 	return (0);
572 
573 }
574 
575 /*
576  * __wt_async_new_op --
577  *	Implementation of the WT_CONN->async_new_op method.
578  */
579 int
__wt_async_new_op(WT_SESSION_IMPL * session,const char * uri,const char * config,const char * cfg[],WT_ASYNC_CALLBACK * cb,WT_ASYNC_OP_IMPL ** opp)580 __wt_async_new_op(WT_SESSION_IMPL *session, const char *uri,
581     const char *config, const char *cfg[], WT_ASYNC_CALLBACK *cb,
582     WT_ASYNC_OP_IMPL **opp)
583 {
584 	WT_ASYNC_OP_IMPL *op;
585 	WT_CONNECTION_IMPL *conn;
586 	WT_DECL_RET;
587 
588 	*opp = NULL;
589 
590 	conn = S2C(session);
591 	if (!conn->async_cfg)
592 		WT_RET_MSG(
593 		    session, ENOTSUP, "Asynchronous operations not configured");
594 
595 	op = NULL;
596 	WT_ERR(__async_new_op_alloc(session, uri, config, &op));
597 	WT_ERR(__async_runtime_config(op, cfg));
598 	op->cb = cb;
599 	*opp = op;
600 	return (0);
601 
602 err:
603 	/*
604 	 * If we get an error after allocating op, set its state to free.
605 	 */
606 	if (op != NULL)
607 		op->state = WT_ASYNCOP_FREE;
608 	return (ret);
609 }
610