/*-
 * Copyright (c) 2014-2018 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

11 /*
12 * __async_get_format --
13 * Find or allocate the uri/config/format structure.
14 */
15 static int
__async_get_format(WT_CONNECTION_IMPL * conn,const char * uri,const char * config,WT_ASYNC_OP_IMPL * op)16 __async_get_format(WT_CONNECTION_IMPL *conn, const char *uri,
17 const char *config, WT_ASYNC_OP_IMPL *op)
18 {
19 WT_ASYNC *async;
20 WT_ASYNC_FORMAT *af;
21 WT_CURSOR *c;
22 WT_DECL_RET;
23 WT_SESSION *wt_session;
24 WT_SESSION_IMPL *session;
25 uint64_t cfg_hash, uri_hash;
26
27 async = conn->async;
28 c = NULL;
29 op->format = NULL;
30
31 if (uri != NULL)
32 uri_hash = __wt_hash_city64(uri, strlen(uri));
33 else
34 uri_hash = 0;
35 if (config != NULL)
36 cfg_hash = __wt_hash_city64(config, strlen(config));
37 else
38 cfg_hash = 0;
39
40 /*
41 * We don't need to hold a lock around this walk. The list is
42 * permanent and always valid. We might race an insert and there
43 * is a possibility a duplicate entry might be inserted, but
44 * that is not harmful.
45 */
46 TAILQ_FOREACH(af, &async->formatqh, q) {
47 if (af->uri_hash == uri_hash && af->cfg_hash == cfg_hash)
48 goto setup;
49 }
50 /*
51 * We didn't find one in the cache. Allocate and initialize one.
52 * Insert it at the head expecting LRU usage. We need a real session
53 * for the cursor.
54 */
55 WT_RET(__wt_open_internal_session(
56 conn, "async-cursor", true, 0, &session));
57 __wt_spin_lock(session, &async->ops_lock);
58 WT_ERR(__wt_calloc_one(session, &af));
59 WT_ERR(__wt_strdup(session, uri, &af->uri));
60 WT_ERR(__wt_strdup(session, config, &af->config));
61 af->uri_hash = uri_hash;
62 af->cfg_hash = cfg_hash;
63 /*
64 * Get the key_format and value_format for this URI and store
65 * it in the structure so that async->set_key/value work.
66 */
67 wt_session = &session->iface;
68 WT_ERR(wt_session->open_cursor(wt_session, uri, NULL, NULL, &c));
69 WT_ERR(__wt_strdup(session, c->key_format, &af->key_format));
70 WT_ERR(__wt_strdup(session, c->value_format, &af->value_format));
71 WT_ERR(c->close(c));
72 c = NULL;
73
74 TAILQ_INSERT_HEAD(&async->formatqh, af, q);
75 __wt_spin_unlock(session, &async->ops_lock);
76 WT_ERR(wt_session->close(wt_session, NULL));
77
78 setup: op->format = af;
79 /*
80 * Copy the pointers for the formats. Items in the async format
81 * queue remain there until the connection is closed. We must
82 * initialize the format fields in the async_op, which are publicly
83 * visible, and its internal cursor used by internal key/value
84 * functions.
85 */
86 op->iface.c.key_format = op->iface.key_format = af->key_format;
87 op->iface.c.value_format = op->iface.value_format = af->value_format;
88 return (0);
89
90 err:
91 if (c != NULL)
92 WT_TRET(c->close(c));
93 __wt_free(session, af->uri);
94 __wt_free(session, af->config);
95 __wt_free(session, af->key_format);
96 __wt_free(session, af->value_format);
97 __wt_free(session, af);
98 return (ret);
99 }
100
101 /*
102 * __async_new_op_alloc --
103 * Find and allocate the next available async op handle.
104 */
105 static int
__async_new_op_alloc(WT_SESSION_IMPL * session,const char * uri,const char * config,WT_ASYNC_OP_IMPL ** opp)106 __async_new_op_alloc(WT_SESSION_IMPL *session, const char *uri,
107 const char *config, WT_ASYNC_OP_IMPL **opp)
108 {
109 WT_ASYNC *async;
110 WT_ASYNC_OP_IMPL *op;
111 WT_CONNECTION_IMPL *conn;
112 uint32_t i, save_i, view;
113
114 *opp = NULL;
115
116 conn = S2C(session);
117 async = conn->async;
118 WT_STAT_CONN_INCR(session, async_op_alloc);
119
120 retry:
121 op = NULL;
122 WT_ORDERED_READ(save_i, async->ops_index);
123 /*
124 * Look after the last one allocated for a free one. We'd expect
125 * ops to be freed mostly FIFO so we should quickly find one.
126 */
127 for (view = 1, i = save_i; i < conn->async_size; i++, view++) {
128 op = &async->async_ops[i];
129 if (op->state == WT_ASYNCOP_FREE)
130 break;
131 }
132
133 /*
134 * Loop around back to the beginning if we need to.
135 */
136 if (op == NULL || op->state != WT_ASYNCOP_FREE)
137 for (i = 0; i < save_i; i++, view++) {
138 op = &async->async_ops[i];
139 if (op->state == WT_ASYNCOP_FREE)
140 break;
141 }
142
143 /*
144 * We still haven't found one. Return an error.
145 */
146 if (op == NULL || op->state != WT_ASYNCOP_FREE) {
147 WT_STAT_CONN_INCR(session, async_full);
148 return (__wt_set_return(session, EBUSY));
149 }
150 /*
151 * Set the state of this op handle as READY for the user to use.
152 * If we can set the state then the op entry is ours.
153 * Start the next search at the next entry after this one.
154 */
155 if (!__wt_atomic_cas32(&op->state, WT_ASYNCOP_FREE, WT_ASYNCOP_READY)) {
156 WT_STAT_CONN_INCR(session, async_alloc_race);
157 goto retry;
158 }
159 WT_STAT_CONN_INCRV(session, async_alloc_view, view);
160 WT_RET(__async_get_format(conn, uri, config, op));
161 op->unique_id = __wt_atomic_add64(&async->op_id, 1);
162 op->optype = WT_AOP_NONE;
163 (void)__wt_atomic_store32(
164 &async->ops_index, (i + 1) % conn->async_size);
165 *opp = op;
166 return (0);
167 }
168
169 /*
170 * __async_config --
171 * Parse and setup the async API options.
172 */
173 static int
__async_config(WT_SESSION_IMPL * session,WT_CONNECTION_IMPL * conn,const char ** cfg,bool * runp)174 __async_config(WT_SESSION_IMPL *session,
175 WT_CONNECTION_IMPL *conn, const char **cfg, bool *runp)
176 {
177 WT_CONFIG_ITEM cval;
178
179 /*
180 * The async configuration is off by default.
181 */
182 WT_RET(__wt_config_gets(session, cfg, "async.enabled", &cval));
183 *runp = cval.val != 0;
184
185 /*
186 * Even if async is turned off, we want to parse and store the default
187 * values so that reconfigure can just enable them.
188 *
189 * Bound the minimum maximum operations at 10.
190 */
191 WT_RET(__wt_config_gets(session, cfg, "async.ops_max", &cval));
192 conn->async_size = (uint32_t)WT_MAX(cval.val, 10);
193
194 WT_RET(__wt_config_gets(session, cfg, "async.threads", &cval));
195 conn->async_workers = (uint32_t)cval.val;
196 /* Sanity check that api_data.py is in sync with async.h */
197 WT_ASSERT(session, conn->async_workers <= WT_ASYNC_MAX_WORKERS);
198
199 return (0);
200 }
201
202 /*
203 * __wt_async_stats_update --
204 * Update the async stats for return to the application.
205 */
206 void
__wt_async_stats_update(WT_SESSION_IMPL * session)207 __wt_async_stats_update(WT_SESSION_IMPL *session)
208 {
209 WT_ASYNC *async;
210 WT_CONNECTION_IMPL *conn;
211 WT_CONNECTION_STATS **stats;
212
213 conn = S2C(session);
214 async = conn->async;
215 if (async == NULL)
216 return;
217 stats = conn->stats;
218 WT_STAT_SET(session, stats, async_cur_queue, async->cur_queue);
219 WT_STAT_SET(session, stats, async_max_queue, async->max_queue);
220 }
221
222 /*
223 * __async_start --
224 * Start the async subsystem. All configuration processing has
225 * already been done by the caller.
226 */
227 static int
__async_start(WT_SESSION_IMPL * session)228 __async_start(WT_SESSION_IMPL *session)
229 {
230 WT_ASYNC *async;
231 WT_CONNECTION_IMPL *conn;
232 uint32_t i, session_flags;
233
234 conn = S2C(session);
235 conn->async_cfg = true;
236 /*
237 * Async is on, allocate the WT_ASYNC structure and initialize the ops.
238 */
239 WT_RET(__wt_calloc_one(session, &conn->async));
240 async = conn->async;
241 TAILQ_INIT(&async->formatqh);
242 WT_RET(__wt_spin_init(session, &async->ops_lock, "ops"));
243 WT_RET(__wt_cond_alloc(session, "async flush", &async->flush_cond));
244 WT_RET(__wt_async_op_init(session));
245
246 /*
247 * Start up the worker threads.
248 */
249 F_SET(conn, WT_CONN_SERVER_ASYNC);
250 for (i = 0; i < conn->async_workers; i++) {
251 /*
252 * Each worker has its own session. We set both a general
253 * server flag in the connection and an individual flag
254 * in the session. The user may reconfigure the number of
255 * workers and we may want to selectively stop some workers
256 * while leaving the rest running.
257 */
258 session_flags = WT_SESSION_SERVER_ASYNC;
259 WT_RET(__wt_open_internal_session(conn, "async-worker",
260 true, session_flags, &async->worker_sessions[i]));
261 }
262 for (i = 0; i < conn->async_workers; i++) {
263 /*
264 * Start the threads.
265 */
266 WT_RET(__wt_thread_create(session, &async->worker_tids[i],
267 __wt_async_worker, async->worker_sessions[i]));
268 }
269 __wt_async_stats_update(session);
270 return (0);
271 }
272
273 /*
274 * __wt_async_create --
275 * Start the async subsystem and worker threads.
276 */
277 int
__wt_async_create(WT_SESSION_IMPL * session,const char * cfg[])278 __wt_async_create(WT_SESSION_IMPL *session, const char *cfg[])
279 {
280 WT_CONNECTION_IMPL *conn;
281 bool run;
282
283 conn = S2C(session);
284
285 /* Handle configuration. */
286 run = false;
287 WT_RET(__async_config(session, conn, cfg, &run));
288
289 /* If async is not configured, we're done. */
290 if (!run)
291 return (0);
292 return (__async_start(session));
293 }
294
295 /*
296 * __wt_async_reconfig --
297 * Start the async subsystem and worker threads.
298 */
299 int
__wt_async_reconfig(WT_SESSION_IMPL * session,const char * cfg[])300 __wt_async_reconfig(WT_SESSION_IMPL *session, const char *cfg[])
301 {
302 WT_ASYNC *async;
303 WT_CONNECTION_IMPL *conn, tmp_conn;
304 WT_DECL_RET;
305 WT_SESSION *wt_session;
306 uint32_t i, session_flags;
307 bool run;
308
309 conn = S2C(session);
310 async = conn->async;
311 memset(&tmp_conn, 0, sizeof(tmp_conn));
312 tmp_conn.async_cfg = conn->async_cfg;
313 tmp_conn.async_workers = conn->async_workers;
314 tmp_conn.async_size = conn->async_size;
315
316 /* Handle configuration. */
317 run = conn->async_cfg;
318 WT_RET(__async_config(session, &tmp_conn, cfg, &run));
319
320 /*
321 * There are some restrictions on the live reconfiguration of async.
322 * Unlike other subsystems where we simply destroy anything existing
323 * and restart with the new configuration, async is not so easy.
324 * If the user is just changing the number of workers, we want to
325 * allow the existing op handles and other information to remain in
326 * existence. So we must handle various combinations of changes
327 * individually.
328 *
329 * One restriction is that if async is currently on, the user cannot
330 * change the number of async op handles available. The user can try
331 * but we do nothing with it. However we must allow the ops_max config
332 * string so that a user can completely start async via reconfigure.
333 */
334
335 /*
336 * Easy cases:
337 * 1. If async is on and the user wants it off, shut it down.
338 * 2. If async is off, and the user wants it on, start it.
339 * 3. If not a toggle and async is off, we're done.
340 */
341 if (conn->async_cfg && !run) { /* Case 1 */
342 WT_TRET(__wt_async_flush(session));
343 ret = __wt_async_destroy(session);
344 conn->async_cfg = false;
345 return (ret);
346 }
347 if (!conn->async_cfg && run) /* Case 2 */
348 return (__async_start(session));
349 if (!conn->async_cfg) /* Case 3 */
350 return (0);
351
352 /*
353 * Running async worker modification cases:
354 * 4. If number of workers didn't change, we're done.
355 * 5. If more workers, start new ones.
356 * 6. If fewer workers, kill some.
357 */
358 if (conn->async_workers == tmp_conn.async_workers)
359 /* No change in the number of workers. */
360 return (0);
361 if (conn->async_workers < tmp_conn.async_workers) {
362 /* Case 5 */
363 /*
364 * The worker_sessions array is allocated for the maximum
365 * allowed number of workers, so starting more is easy.
366 */
367 for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
368 /*
369 * Each worker has its own session.
370 */
371 session_flags = WT_SESSION_SERVER_ASYNC;
372 WT_RET(__wt_open_internal_session(conn, "async-worker",
373 true, session_flags, &async->worker_sessions[i]));
374 }
375 for (i = conn->async_workers; i < tmp_conn.async_workers; i++) {
376 /*
377 * Start the threads.
378 */
379 WT_RET(__wt_thread_create(session,
380 &async->worker_tids[i], __wt_async_worker,
381 async->worker_sessions[i]));
382 }
383 conn->async_workers = tmp_conn.async_workers;
384 }
385 if (conn->async_workers > tmp_conn.async_workers) {
386 /* Case 6 */
387 /*
388 * Stopping an individual async worker is the most complex case.
389 * We clear the session async flag on the targeted worker thread
390 * so that only that thread stops, and the others keep running.
391 */
392 for (i = conn->async_workers - 1;
393 i >= tmp_conn.async_workers; i--) {
394 /*
395 * Join any worker we're stopping.
396 * After the thread is stopped, close its session.
397 */
398 WT_ASSERT(session, async->worker_tids[i].created);
399 WT_ASSERT(session, async->worker_sessions[i] != NULL);
400 F_CLR(async->worker_sessions[i],
401 WT_SESSION_SERVER_ASYNC);
402 WT_TRET(__wt_thread_join(
403 session, &async->worker_tids[i]));
404 wt_session = &async->worker_sessions[i]->iface;
405 WT_TRET(wt_session->close(wt_session, NULL));
406 async->worker_sessions[i] = NULL;
407 }
408 conn->async_workers = tmp_conn.async_workers;
409 }
410
411 return (0);
412 }
413
414 /*
415 * __wt_async_destroy --
416 * Destroy the async worker threads and async subsystem.
417 */
418 int
__wt_async_destroy(WT_SESSION_IMPL * session)419 __wt_async_destroy(WT_SESSION_IMPL *session)
420 {
421 WT_ASYNC *async;
422 WT_ASYNC_FORMAT *af;
423 WT_ASYNC_OP *op;
424 WT_CONNECTION_IMPL *conn;
425 WT_DECL_RET;
426 WT_SESSION *wt_session;
427 uint32_t i;
428
429 conn = S2C(session);
430 async = conn->async;
431
432 if (!conn->async_cfg)
433 return (0);
434
435 F_CLR(conn, WT_CONN_SERVER_ASYNC);
436 for (i = 0; i < conn->async_workers; i++)
437 WT_TRET(__wt_thread_join(session, &async->worker_tids[i]));
438 __wt_cond_destroy(session, &async->flush_cond);
439
440 /* Close the server threads' sessions. */
441 for (i = 0; i < conn->async_workers; i++)
442 if (async->worker_sessions[i] != NULL) {
443 wt_session = &async->worker_sessions[i]->iface;
444 WT_TRET(wt_session->close(wt_session, NULL));
445 async->worker_sessions[i] = NULL;
446 }
447 /* Free any op key/value buffers. */
448 for (i = 0; i < conn->async_size; i++) {
449 op = (WT_ASYNC_OP *)&async->async_ops[i];
450 if (op->c.key.data != NULL)
451 __wt_buf_free(session, &op->c.key);
452 if (op->c.value.data != NULL)
453 __wt_buf_free(session, &op->c.value);
454 }
455
456 /* Free format resources */
457 while ((af = TAILQ_FIRST(&async->formatqh)) != NULL) {
458 TAILQ_REMOVE(&async->formatqh, af, q);
459 __wt_free(session, af->uri);
460 __wt_free(session, af->config);
461 __wt_free(session, af->key_format);
462 __wt_free(session, af->value_format);
463 __wt_free(session, af);
464 }
465 __wt_free(session, async->async_queue);
466 __wt_free(session, async->async_ops);
467 __wt_spin_destroy(session, &async->ops_lock);
468 __wt_free(session, conn->async);
469
470 return (ret);
471 }
472
473 /*
474 * __wt_async_flush --
475 * Implementation of the WT_CONN->async_flush method.
476 */
477 int
__wt_async_flush(WT_SESSION_IMPL * session)478 __wt_async_flush(WT_SESSION_IMPL *session)
479 {
480 WT_ASYNC *async;
481 WT_CONNECTION_IMPL *conn;
482 uint32_t i, workers;
483
484 conn = S2C(session);
485 if (!conn->async_cfg)
486 return (0);
487
488 async = conn->async;
489 /*
490 * Only add a flush operation if there are workers who can process
491 * it. Otherwise we will wait forever.
492 */
493 workers = 0;
494 for (i = 0; i < conn->async_workers; ++i)
495 if (async->worker_tids[i].created)
496 ++workers;
497 if (workers == 0)
498 return (0);
499
500 WT_STAT_CONN_INCR(session, async_flush);
501 /*
502 * We have to do several things. First we have to prevent
503 * other callers from racing with us so that only one
504 * flush is happening at a time. Next we have to wait for
505 * the worker threads to notice the flush and indicate
506 * that the flush is complete on their side. Then we
507 * clear the flush flags and return.
508 */
509 retry:
510 while (async->flush_state != WT_ASYNC_FLUSH_NONE)
511 /*
512 * We're racing an in-progress flush. We need to wait
513 * our turn to start our own. We need to convoy the
514 * racing calls because a later call may be waiting for
515 * specific enqueued ops to be complete before this returns.
516 */
517 __wt_sleep(0, 100000);
518
519 if (!__wt_atomic_cas32(&async->flush_state, WT_ASYNC_FLUSH_NONE,
520 WT_ASYNC_FLUSH_IN_PROGRESS))
521 goto retry;
522 /*
523 * We're the owner of this flush operation. Set the
524 * WT_ASYNC_FLUSH_IN_PROGRESS to block other callers.
525 * We're also preventing all worker threads from taking
526 * things off the work queue with the lock.
527 */
528 async->flush_count = 0;
529 (void)__wt_atomic_add64(&async->flush_gen, 1);
530 WT_ASSERT(session, async->flush_op.state == WT_ASYNCOP_FREE);
531 async->flush_op.state = WT_ASYNCOP_READY;
532 WT_RET(__wt_async_op_enqueue(session, &async->flush_op));
533 while (async->flush_state != WT_ASYNC_FLUSH_COMPLETE)
534 __wt_cond_wait(session, async->flush_cond, 100000, NULL);
535 /*
536 * Flush is done. Clear the flags.
537 */
538 async->flush_op.state = WT_ASYNCOP_FREE;
539 WT_PUBLISH(async->flush_state, WT_ASYNC_FLUSH_NONE);
540 return (0);
541 }
542
543 /*
544 * __async_runtime_config --
545 * Configure runtime fields at allocation.
546 */
547 static int
__async_runtime_config(WT_ASYNC_OP_IMPL * op,const char * cfg[])548 __async_runtime_config(WT_ASYNC_OP_IMPL *op, const char *cfg[])
549 {
550 WT_ASYNC_OP *asyncop;
551 WT_CONFIG_ITEM cval;
552 WT_SESSION_IMPL *session;
553
554 session = O2S(op);
555 asyncop = (WT_ASYNC_OP *)op;
556 WT_RET(__wt_config_gets_def(session, cfg, "append", 0, &cval));
557 if (cval.val)
558 F_SET(&asyncop->c, WT_CURSTD_APPEND);
559 else
560 F_CLR(&asyncop->c, WT_CURSTD_APPEND);
561 WT_RET(__wt_config_gets_def(session, cfg, "overwrite", 1, &cval));
562 if (cval.val)
563 F_SET(&asyncop->c, WT_CURSTD_OVERWRITE);
564 else
565 F_CLR(&asyncop->c, WT_CURSTD_OVERWRITE);
566 WT_RET(__wt_config_gets_def(session, cfg, "raw", 0, &cval));
567 if (cval.val)
568 F_SET(&asyncop->c, WT_CURSTD_RAW);
569 else
570 F_CLR(&asyncop->c, WT_CURSTD_RAW);
571 return (0);
572
573 }
574
575 /*
576 * __wt_async_new_op --
577 * Implementation of the WT_CONN->async_new_op method.
578 */
579 int
__wt_async_new_op(WT_SESSION_IMPL * session,const char * uri,const char * config,const char * cfg[],WT_ASYNC_CALLBACK * cb,WT_ASYNC_OP_IMPL ** opp)580 __wt_async_new_op(WT_SESSION_IMPL *session, const char *uri,
581 const char *config, const char *cfg[], WT_ASYNC_CALLBACK *cb,
582 WT_ASYNC_OP_IMPL **opp)
583 {
584 WT_ASYNC_OP_IMPL *op;
585 WT_CONNECTION_IMPL *conn;
586 WT_DECL_RET;
587
588 *opp = NULL;
589
590 conn = S2C(session);
591 if (!conn->async_cfg)
592 WT_RET_MSG(
593 session, ENOTSUP, "Asynchronous operations not configured");
594
595 op = NULL;
596 WT_ERR(__async_new_op_alloc(session, uri, config, &op));
597 WT_ERR(__async_runtime_config(op, cfg));
598 op->cb = cb;
599 *opp = op;
600 return (0);
601
602 err:
603 /*
604 * If we get an error after allocating op, set its state to free.
605 */
606 if (op != NULL)
607 op->state = WT_ASYNCOP_FREE;
608 return (ret);
609 }
610