1 /* batch_fsync.c --- efficiently fsync multiple targets
2 *
3 * ====================================================================
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 * ====================================================================
21 */
22
23 #include <apr_thread_pool.h>
24 #include <apr_thread_cond.h>
25
26 #include "batch_fsync.h"
27 #include "svn_pools.h"
28 #include "svn_hash.h"
29 #include "svn_dirent_uri.h"
30 #include "svn_private_config.h"
31
32 #include "private/svn_atomic.h"
33 #include "private/svn_dep_compat.h"
34 #include "private/svn_mutex.h"
35 #include "private/svn_subr_private.h"
36
/* Handy macro to check APR function results and turn them into
 * svn_error_t upon failure: if X evaluates to a non-zero apr_status_t,
 * return from the calling function with that status wrapped into an
 * svn_error_t carrying MSG.
 *
 * The body is wrapped in do { ... } while (0) so that the macro behaves
 * like a single statement, e.g. as the body of an unbraced if / else. */
#define WRAP_APR_ERR(x,msg)                               \
  do                                                      \
    {                                                     \
      apr_status_t status_ = (x);                         \
      if (status_)                                        \
        return svn_error_wrap_apr(status_, msg);          \
    }                                                     \
  while (0)
45
46
/* A simple SVN-wrapper around the apr_thread_cond_* API.
 * Without thread support, a plain int serves as a placeholder type so
 * that the wrapper functions below keep a uniform signature. */
#if !APR_HAS_THREADS
typedef int svn_thread_cond__t;
#else
typedef apr_thread_cond_t svn_thread_cond__t;
#endif
53
54 static svn_error_t *
svn_thread_cond__create(svn_thread_cond__t ** cond,apr_pool_t * result_pool)55 svn_thread_cond__create(svn_thread_cond__t **cond,
56 apr_pool_t *result_pool)
57 {
58 #if APR_HAS_THREADS
59
60 WRAP_APR_ERR(apr_thread_cond_create(cond, result_pool),
61 _("Can't create condition variable"));
62
63 #else
64
65 *cond = apr_pcalloc(result_pool, sizeof(**cond));
66
67 #endif
68
69 return SVN_NO_ERROR;
70 }
71
72 static svn_error_t *
svn_thread_cond__broadcast(svn_thread_cond__t * cond)73 svn_thread_cond__broadcast(svn_thread_cond__t *cond)
74 {
75 #if APR_HAS_THREADS
76
77 WRAP_APR_ERR(apr_thread_cond_broadcast(cond),
78 _("Can't broadcast condition variable"));
79
80 #endif
81
82 return SVN_NO_ERROR;
83 }
84
85 static svn_error_t *
svn_thread_cond__wait(svn_thread_cond__t * cond,svn_mutex__t * mutex)86 svn_thread_cond__wait(svn_thread_cond__t *cond,
87 svn_mutex__t *mutex)
88 {
89 #if APR_HAS_THREADS
90
91 WRAP_APR_ERR(apr_thread_cond_wait(cond, svn_mutex__get(mutex)),
92 _("Can't broadcast condition variable"));
93
94 #endif
95
96 return SVN_NO_ERROR;
97 }
98
/* Utility construct: Clients can efficiently wait for the encapsulated
 * counter to reach a certain value.  Currently, only increments (and a
 * reset to 0) have been implemented.  This whole structure can be opaque
 * to the API users.
 *
 * After creation, VALUE is only read or modified while holding MUTEX;
 * COND is broadcast whenever VALUE changes.
 */
typedef struct waitable_counter_t
{
  /* Current value, initialized to 0. */
  int value;

  /* Synchronization objects.  MUTEX protects VALUE; COND is what waiters
   * block on until VALUE changes. */
  svn_thread_cond__t *cond;
  svn_mutex__t *mutex;
} waitable_counter_t;
112
113 /* Set *COUNTER_P to a new waitable_counter_t instance allocated in
114 * RESULT_POOL. The initial counter value is 0. */
115 static svn_error_t *
waitable_counter__create(waitable_counter_t ** counter_p,apr_pool_t * result_pool)116 waitable_counter__create(waitable_counter_t **counter_p,
117 apr_pool_t *result_pool)
118 {
119 waitable_counter_t *counter = apr_pcalloc(result_pool, sizeof(*counter));
120 counter->value = 0;
121
122 SVN_ERR(svn_thread_cond__create(&counter->cond, result_pool));
123 SVN_ERR(svn_mutex__init(&counter->mutex, TRUE, result_pool));
124
125 *counter_p = counter;
126
127 return SVN_NO_ERROR;
128 }
129
130 /* Increment the value in COUNTER by 1. */
131 static svn_error_t *
waitable_counter__increment(waitable_counter_t * counter)132 waitable_counter__increment(waitable_counter_t *counter)
133 {
134 SVN_ERR(svn_mutex__lock(counter->mutex));
135 counter->value++;
136
137 SVN_ERR(svn_thread_cond__broadcast(counter->cond));
138 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
139
140 return SVN_NO_ERROR;
141 }
142
143 /* Efficiently wait for COUNTER to assume VALUE. */
144 static svn_error_t *
waitable_counter__wait_for(waitable_counter_t * counter,int value)145 waitable_counter__wait_for(waitable_counter_t *counter,
146 int value)
147 {
148 svn_boolean_t done = FALSE;
149
150 /* This loop implicitly handles spurious wake-ups. */
151 do
152 {
153 SVN_ERR(svn_mutex__lock(counter->mutex));
154
155 if (counter->value == value)
156 done = TRUE;
157 else
158 SVN_ERR(svn_thread_cond__wait(counter->cond, counter->mutex));
159
160 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
161 }
162 while (!done);
163
164 return SVN_NO_ERROR;
165 }
166
167 /* Set the value in COUNTER to 0. */
168 static svn_error_t *
waitable_counter__reset(waitable_counter_t * counter)169 waitable_counter__reset(waitable_counter_t *counter)
170 {
171 SVN_ERR(svn_mutex__lock(counter->mutex));
172 counter->value = 0;
173 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
174
175 SVN_ERR(svn_thread_cond__broadcast(counter->cond));
176
177 return SVN_NO_ERROR;
178 }
179
/* Entry type for the svn_fs_x__batch_fsync_t collection.  There is one
 * instance per file handle.  The instance itself is allocated in POOL.
 */
typedef struct to_sync_t
{
  /* Open handle of the file / directory to fsync. */
  apr_file_t *file;

  /* Pool to use with FILE.  It is private to FILE such that it can be
   * used safely together with FILE in a separate thread.  Destroying it
   * also releases this struct. */
  apr_pool_t *pool;

  /* Result of the latest flush / fsync attempt on FILE, or of failing to
   * schedule it.  SVN_NO_ERROR until then. */
  svn_error_t *result;

  /* Counter to increment when we completed the task.  This is the
   * owning batch's counter. */
  waitable_counter_t *counter;
} to_sync_t;
198
/* The actual collection object. */
struct svn_fs_x__batch_fsync_t
{
  /* Maps open file handles: C-string path to to_sync_t *.
   * The path keys are copied into the hash's own pool. */
  apr_hash_t *files;

  /* Counts the number of completed fsync tasks.  Every to_sync_t in FILES
   * points to this same counter. */
  waitable_counter_t *counter;

  /* Perform fsyncs only if this flag has been set.  Otherwise, running
   * the batch only flushes APR buffers and closes the files. */
  svn_boolean_t flush_to_disk;
};
211
/* Data structures for concurrent fsync execution are only available if
 * we have threading support.
 */
#if APR_HAS_THREADS

/* Number of microseconds that an unused thread remains in the pool before
 * being terminated (1000000 us, i.e. one second).
 *
 * Higher values are useful if clients frequently send small requests and
 * you want to minimize the latency for those.
 */
#define THREADPOOL_THREAD_IDLE_LIMIT 1000000

/* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
 * fsync concurrently throughout the process. */
#define MAX_THREADS 16

/* Process-wide thread pool to execute the fsync tasks.  Set by
 * create_thread_pool() and reset to NULL by thread_pool_pre_cleanup(). */
static apr_thread_pool_t *thread_pool = NULL;

#endif
233
/* Keep track of whether we already created the THREAD_POOL.  Passed to
 * svn_atomic__init_once() by svn_fs_x__batch_fsync_init(). */
static svn_atomic_t thread_pool_initialized = FALSE;

/* We open non-directory files with these flags. */
#define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)
239
#if APR_HAS_THREADS

/* Destructor function that implicitly cleans up any running threads
   in the THREAD_POOL *once*.  DATA is unused; the global THREAD_POOL
   is operated on instead.  Later invocations find THREAD_POOL reset to
   NULL and do nothing, so this may be registered with several pools.

   THREAD_POOL_INITIALIZED is reset as well.  Presumably this lets a later
   svn_fs_x__batch_fsync_init() create a fresh pool (assuming
   svn_atomic__init_once() treats FALSE as "not initialized").

   Must be run as a pre-cleanup hook.
 */
static apr_status_t
thread_pool_pre_cleanup(void *data)
{
  apr_thread_pool_t *tp = thread_pool;
  if (!tp)
    return APR_SUCCESS;

  thread_pool = NULL;

  /* The flag is otherwise only manipulated via atomic operations inside
     svn_atomic__init_once(); don't mix in a plain store. */
  svn_atomic_set(&thread_pool_initialized, FALSE);

  return apr_thread_pool_destroy(tp);
}

#endif
261
262 /* Core implementation of svn_fs_x__batch_fsync_init. */
263 static svn_error_t *
create_thread_pool(void * baton,apr_pool_t * owning_pool)264 create_thread_pool(void *baton,
265 apr_pool_t *owning_pool)
266 {
267 #if APR_HAS_THREADS
268 /* The thread-pool must be allocated from a thread-safe pool.
269 GLOBAL_POOL may be single-threaded, though. */
270 apr_pool_t *pool = svn_pool_create(NULL);
271
272 /* This thread pool will get cleaned up automatically when GLOBAL_POOL
273 gets cleared. No additional cleanup callback is needed. */
274 WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
275 _("Can't create fsync thread pool in FSX"));
276
277 /* Work around an APR bug: The cleanup must happen in the pre-cleanup
278 hook instead of the normal cleanup hook. Otherwise, the sub-pools
279 containing the thread objects would already be invalid. */
280 apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
281 apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);
282
283 /* let idle threads linger for a while in case more requests are
284 coming in */
285 apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);
286
287 /* don't queue requests unless we reached the worker thread limit */
288 apr_thread_pool_threshold_set(thread_pool, 0);
289
290 #endif
291
292 return SVN_NO_ERROR;
293 }
294
295 svn_error_t *
svn_fs_x__batch_fsync_init(apr_pool_t * owning_pool)296 svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
297 {
298 /* Protect against multiple calls. */
299 return svn_error_trace(svn_atomic__init_once(&thread_pool_initialized,
300 create_thread_pool,
301 NULL, owning_pool));
302 }
303
304 /* Destructor for svn_fs_x__batch_fsync_t. Releases all global pool memory
305 * and closes all open file handles. */
306 static apr_status_t
fsync_batch_cleanup(void * data)307 fsync_batch_cleanup(void *data)
308 {
309 svn_fs_x__batch_fsync_t *batch = data;
310 apr_hash_index_t *hi;
311
312 /* Close all files (implicitly) and release memory. */
313 for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
314 hi;
315 hi = apr_hash_next(hi))
316 {
317 to_sync_t *to_sync = apr_hash_this_val(hi);
318 svn_pool_destroy(to_sync->pool);
319 }
320
321 return APR_SUCCESS;
322 }
323
324 svn_error_t *
svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t ** result_p,svn_boolean_t flush_to_disk,apr_pool_t * result_pool)325 svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
326 svn_boolean_t flush_to_disk,
327 apr_pool_t *result_pool)
328 {
329 svn_fs_x__batch_fsync_t *result = apr_pcalloc(result_pool, sizeof(*result));
330 result->files = svn_hash__make(result_pool);
331 result->flush_to_disk = flush_to_disk;
332
333 SVN_ERR(waitable_counter__create(&result->counter, result_pool));
334 apr_pool_cleanup_register(result_pool, result, fsync_batch_cleanup,
335 apr_pool_cleanup_null);
336
337 *result_p = result;
338
339 return SVN_NO_ERROR;
340 }
341
342 /* If BATCH does not contain a handle for PATH, yet, create one with FLAGS
343 * and add it to BATCH. Set *FILE to the open file handle.
344 * Use SCRATCH_POOL for temporaries.
345 */
346 static svn_error_t *
internal_open_file(apr_file_t ** file,svn_fs_x__batch_fsync_t * batch,const char * path,apr_int32_t flags,apr_pool_t * scratch_pool)347 internal_open_file(apr_file_t **file,
348 svn_fs_x__batch_fsync_t *batch,
349 const char *path,
350 apr_int32_t flags,
351 apr_pool_t *scratch_pool)
352 {
353 svn_error_t *err;
354 apr_pool_t *pool;
355 to_sync_t *to_sync;
356 #ifdef SVN_ON_POSIX
357 svn_boolean_t is_new_file;
358 #endif
359
360 /* If we already have a handle for PATH, return that. */
361 to_sync = svn_hash_gets(batch->files, path);
362 if (to_sync)
363 {
364 *file = to_sync->file;
365 return SVN_NO_ERROR;
366 }
367
368 /* Calling fsync in PATH is going to be expensive in any case, so we can
369 * allow for some extra overhead figuring out whether the file already
370 * exists. If it doesn't, be sure to schedule parent folder updates, if
371 * required on this platform.
372 *
373 * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
374 * needed at all. */
375
376 #ifdef SVN_ON_POSIX
377
378 is_new_file = FALSE;
379 if (flags & APR_CREATE)
380 {
381 svn_node_kind_t kind;
382 /* We might actually be about to create a new file.
383 * Check whether the file already exists. */
384 SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
385 is_new_file = kind == svn_node_none;
386 }
387
388 #endif
389
390 /* To be able to process each file in a separate thread, they must use
391 * separate, thread-safe pools. Allocating a sub-pool from the standard
392 * memory pool achieves exactly that. */
393 pool = svn_pool_create(NULL);
394 err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
395 if (err)
396 {
397 svn_pool_destroy(pool);
398 return svn_error_trace(err);
399 }
400
401 to_sync = apr_pcalloc(pool, sizeof(*to_sync));
402 to_sync->file = *file;
403 to_sync->pool = pool;
404 to_sync->result = SVN_NO_ERROR;
405 to_sync->counter = batch->counter;
406
407 svn_hash_sets(batch->files,
408 apr_pstrdup(apr_hash_pool_get(batch->files), path),
409 to_sync);
410
411 /* If we just created a new file, schedule any additional necessary fsyncs.
412 * Note that this can only recurse once since the parent folder already
413 * exists on disk. */
414 #ifdef SVN_ON_POSIX
415
416 if (is_new_file)
417 SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));
418
419 #endif
420
421 return SVN_NO_ERROR;
422 }
423
424 svn_error_t *
svn_fs_x__batch_fsync_open_file(apr_file_t ** file,svn_fs_x__batch_fsync_t * batch,const char * filename,apr_pool_t * scratch_pool)425 svn_fs_x__batch_fsync_open_file(apr_file_t **file,
426 svn_fs_x__batch_fsync_t *batch,
427 const char *filename,
428 apr_pool_t *scratch_pool)
429 {
430 apr_off_t offset = 0;
431
432 SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
433 scratch_pool));
434 SVN_ERR(svn_io_file_seek(*file, APR_SET, &offset, scratch_pool));
435
436 return SVN_NO_ERROR;
437 }
438
439 svn_error_t *
svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t * batch,const char * path,apr_pool_t * scratch_pool)440 svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
441 const char *path,
442 apr_pool_t *scratch_pool)
443 {
444 apr_file_t *file;
445
446 #ifdef SVN_ON_POSIX
447
448 /* On POSIX, we need to sync the parent directory because it contains
449 * the name for the file / folder given by PATH. */
450 path = svn_dirent_dirname(path, scratch_pool);
451 SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));
452
453 #else
454
455 svn_node_kind_t kind;
456
457 /* On non-POSIX systems, we assume that sync'ing the given PATH is the
458 * right thing to do. Also, we assume that only files may be sync'ed. */
459 SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
460 if (kind == svn_node_file)
461 SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
462 scratch_pool));
463
464 #endif
465
466 return SVN_NO_ERROR;
467 }
468
469 /* Thread-pool task Flush the to_sync_t instance given by DATA. */
470 static void * APR_THREAD_FUNC
flush_task(apr_thread_t * tid,void * data)471 flush_task(apr_thread_t *tid,
472 void *data)
473 {
474 to_sync_t *to_sync = data;
475
476 to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
477 (to_sync->file, to_sync->pool));
478
479 /* As soon as the increment call returns, TO_SYNC may be invalid
480 (the main thread may have woken up and released the struct.
481
482 Therefore, we cannot chain this error into TO_SYNC->RESULT.
483 OTOH, the main thread will probably deadlock anyway if we got
484 an error here, thus there is no point in trying to tell the
485 main thread what the problem was. */
486 svn_error_clear(waitable_counter__increment(to_sync->counter));
487
488 return NULL;
489 }
490
491 svn_error_t *
svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t * batch,apr_pool_t * scratch_pool)492 svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
493 apr_pool_t *scratch_pool)
494 {
495 apr_hash_index_t *hi;
496
497 /* Number of tasks sent to the thread pool. */
498 int tasks = 0;
499
500 /* Because we allocated the open files from our global pool, don't bail
501 * out on the first error. Instead, process all files and but accumulate
502 * the errors in this chain.
503 */
504 svn_error_t *chain = SVN_NO_ERROR;
505
506 /* First, flush APR-internal buffers. This should minimize / prevent the
507 * introduction of additional meta-data changes during the next phase.
508 * We might otherwise issue redundant fsyncs.
509 */
510 for (hi = apr_hash_first(scratch_pool, batch->files);
511 hi;
512 hi = apr_hash_next(hi))
513 {
514 to_sync_t *to_sync = apr_hash_this_val(hi);
515 to_sync->result = svn_error_trace(svn_io_file_flush
516 (to_sync->file, to_sync->pool));
517 }
518
519 /* Make sure the task completion counter is set to 0. */
520 chain = svn_error_compose_create(chain,
521 waitable_counter__reset(batch->counter));
522
523 /* Start the actual fsyncing process. */
524 if (batch->flush_to_disk)
525 {
526 for (hi = apr_hash_first(scratch_pool, batch->files);
527 hi;
528 hi = apr_hash_next(hi))
529 {
530 to_sync_t *to_sync = apr_hash_this_val(hi);
531
532 #if APR_HAS_THREADS
533
534 /* Forgot to call _init() or cleaned up the owning pool too early?
535 */
536 SVN_ERR_ASSERT(thread_pool);
537
538 /* If there are multiple fsyncs to perform, run them in parallel.
539 * Otherwise, skip the thread-pool and synchronization overhead. */
540 if (apr_hash_count(batch->files) > 1)
541 {
542 apr_status_t status = APR_SUCCESS;
543 status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
544 0, NULL);
545 if (status)
546 to_sync->result = svn_error_wrap_apr(status,
547 _("Can't push task"));
548 else
549 tasks++;
550 }
551 else
552
553 #endif
554
555 {
556 to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
557 (to_sync->file,
558 to_sync->pool));
559 }
560 }
561 }
562
563 /* Wait for all outstanding flush operations to complete. */
564 chain = svn_error_compose_create(chain,
565 waitable_counter__wait_for(batch->counter,
566 tasks));
567
568 /* Collect the results, close all files and release memory. */
569 for (hi = apr_hash_first(scratch_pool, batch->files);
570 hi;
571 hi = apr_hash_next(hi))
572 {
573 to_sync_t *to_sync = apr_hash_this_val(hi);
574 if (batch->flush_to_disk)
575 chain = svn_error_compose_create(chain, to_sync->result);
576
577 chain = svn_error_compose_create(chain,
578 svn_io_file_close(to_sync->file,
579 scratch_pool));
580 svn_pool_destroy(to_sync->pool);
581 }
582
583 /* Don't process any file / folder twice. */
584 apr_hash_clear(batch->files);
585
586 /* Report the errors that we encountered. */
587 return svn_error_trace(chain);
588 }
589