/* batch_fsync.c --- efficiently fsync multiple targets
 *
 * ====================================================================
 *    Licensed to the Apache Software Foundation (ASF) under one
 *    or more contributor license agreements.  See the NOTICE file
 *    distributed with this work for additional information
 *    regarding copyright ownership.  The ASF licenses this file
 *    to you under the Apache License, Version 2.0 (the
 *    "License"); you may not use this file except in compliance
 *    with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing,
 *    software distributed under the License is distributed on an
 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *    KIND, either express or implied.  See the License for the
 *    specific language governing permissions and limitations
 *    under the License.
 * ====================================================================
 */

#include <apr_thread_pool.h>
#include <apr_thread_cond.h>

#include "batch_fsync.h"
#include "svn_pools.h"
#include "svn_hash.h"
#include "svn_dirent_uri.h"
#include "svn_private_config.h"

#include "private/svn_atomic.h"
#include "private/svn_dep_compat.h"
#include "private/svn_mutex.h"
#include "private/svn_subr_private.h"

/* Handy macro to check APR function results and turn them into
 * svn_error_t upon failure. */
#define WRAP_APR_ERR(x,msg)                     \
  {                                             \
    apr_status_t status_ = (x);                 \
    if (status_)                                \
      return svn_error_wrap_apr(status_, msg);  \
  }
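
/* Illustrative sketch (not part of the build):  WRAP_APR_ERR would wrap
 * any APR call returning an apr_status_t, e.g.
 *
 *   WRAP_APR_ERR(apr_file_flush(file), _("Can't flush file"));
 *
 * Since the macro issues a `return`, it may only be used in functions
 * that return svn_error_t *. */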


/* A simple SVN-wrapper around the apr_thread_cond_* API */
#if APR_HAS_THREADS
typedef apr_thread_cond_t svn_thread_cond__t;
#else
typedef int svn_thread_cond__t;
#endif

static svn_error_t *
svn_thread_cond__create(svn_thread_cond__t **cond,
                        apr_pool_t *result_pool)
{
#if APR_HAS_THREADS

  WRAP_APR_ERR(apr_thread_cond_create(cond, result_pool),
               _("Can't create condition variable"));

#else

  *cond = apr_pcalloc(result_pool, sizeof(**cond));

#endif

  return SVN_NO_ERROR;
}

static svn_error_t *
svn_thread_cond__broadcast(svn_thread_cond__t *cond)
{
#if APR_HAS_THREADS

  WRAP_APR_ERR(apr_thread_cond_broadcast(cond),
               _("Can't broadcast condition variable"));

#endif

  return SVN_NO_ERROR;
}

static svn_error_t *
svn_thread_cond__wait(svn_thread_cond__t *cond,
                      svn_mutex__t *mutex)
{
#if APR_HAS_THREADS

  WRAP_APR_ERR(apr_thread_cond_wait(cond, svn_mutex__get(mutex)),
               _("Can't wait on condition variable"));

#endif

  return SVN_NO_ERROR;
}

/* Utility construct:  Clients can efficiently wait for the encapsulated
 * counter to reach a certain value.  Currently, only increments have been
 * implemented.  This whole structure can be opaque to the API users.
 */
typedef struct waitable_counter_t
{
  /* Current value, initialized to 0. */
  int value;

  /* Synchronization objects. */
  svn_thread_cond__t *cond;
  svn_mutex__t *mutex;
} waitable_counter_t;

/* Set *COUNTER_P to a new waitable_counter_t instance allocated in
 * RESULT_POOL.  The initial counter value is 0. */
static svn_error_t *
waitable_counter__create(waitable_counter_t **counter_p,
                         apr_pool_t *result_pool)
{
  waitable_counter_t *counter = apr_pcalloc(result_pool, sizeof(*counter));
  counter->value = 0;

  SVN_ERR(svn_thread_cond__create(&counter->cond, result_pool));
  SVN_ERR(svn_mutex__init(&counter->mutex, TRUE, result_pool));

  *counter_p = counter;

  return SVN_NO_ERROR;
}

/* Increment the value in COUNTER by 1. */
static svn_error_t *
waitable_counter__increment(waitable_counter_t *counter)
{
  SVN_ERR(svn_mutex__lock(counter->mutex));
  counter->value++;

  SVN_ERR(svn_thread_cond__broadcast(counter->cond));
  SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));

  return SVN_NO_ERROR;
}

/* Efficiently wait for COUNTER to assume VALUE. */
static svn_error_t *
waitable_counter__wait_for(waitable_counter_t *counter,
                           int value)
{
  svn_boolean_t done = FALSE;

  /* This loop implicitly handles spurious wake-ups. */
  do
    {
      SVN_ERR(svn_mutex__lock(counter->mutex));

      if (counter->value == value)
        done = TRUE;
      else
        SVN_ERR(svn_thread_cond__wait(counter->cond, counter->mutex));

      SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
    }
  while (!done);

  return SVN_NO_ERROR;
}

/* Set the value in COUNTER to 0. */
static svn_error_t *
waitable_counter__reset(waitable_counter_t *counter)
{
  SVN_ERR(svn_mutex__lock(counter->mutex));
  counter->value = 0;
  SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));

  SVN_ERR(svn_thread_cond__broadcast(counter->cond));

  return SVN_NO_ERROR;
}
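
/* Illustrative sketch (not compiled):  the intended producer / consumer
 * pattern.  Each worker calls
 *
 *   SVN_ERR(waitable_counter__increment(counter));
 *
 * once its task completes, while the coordinating thread resets the
 * counter before scheduling the tasks and then blocks until all of them
 * have finished:
 *
 *   SVN_ERR(waitable_counter__reset(counter));
 *   ... schedule TASK_COUNT tasks ...
 *   SVN_ERR(waitable_counter__wait_for(counter, task_count));
 *
 * TASK_COUNT is a hypothetical stand-in for however many tasks the
 * caller pushed. */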

/* Entry type for the svn_fs_x__batch_fsync_t collection.  There is one
 * instance per file handle.
 */
typedef struct to_sync_t
{
  /* Open handle of the file / directory to fsync. */
  apr_file_t *file;

  /* Pool to use with FILE.  It is private to FILE such that it can be
   * used safely together with FILE in a separate thread. */
  apr_pool_t *pool;

  /* Result of the file operations. */
  svn_error_t *result;

  /* Counter to increment when we completed the task. */
  waitable_counter_t *counter;
} to_sync_t;

/* The actual collection object. */
struct svn_fs_x__batch_fsync_t
{
  /* Maps open file handles: C-string path to to_sync_t *. */
  apr_hash_t *files;

  /* Counts the number of completed fsync tasks. */
  waitable_counter_t *counter;

  /* Perform fsyncs only if this flag has been set. */
  svn_boolean_t flush_to_disk;
};

/* Data structures for concurrent fsync execution are only available if
 * we have threading support.
 */
#if APR_HAS_THREADS

/* Number of microseconds that an unused thread remains in the pool before
 * being terminated.
 *
 * Higher values are useful if clients frequently send small requests and
 * you want to minimize the latency for those.
 */
#define THREADPOOL_THREAD_IDLE_LIMIT 1000000

/* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
 * fsync concurrently throughout the process. */
#define MAX_THREADS 16

/* Thread pool to execute the fsync tasks. */
static apr_thread_pool_t *thread_pool = NULL;

#endif

/* Keep track of whether we have already created the THREAD_POOL. */
static svn_atomic_t thread_pool_initialized = FALSE;

/* We open non-directory files with these flags. */
#define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)

#if APR_HAS_THREADS

/* Destructor function that implicitly cleans up any running threads
   in the THREAD_POOL *once*.

   Must be run as a pre-cleanup hook.
 */
static apr_status_t
thread_pool_pre_cleanup(void *data)
{
  apr_thread_pool_t *tp = thread_pool;
  if (!thread_pool)
    return APR_SUCCESS;

  thread_pool = NULL;
  thread_pool_initialized = FALSE;

  return apr_thread_pool_destroy(tp);
}

#endif

/* Core implementation of svn_fs_x__batch_fsync_init. */
static svn_error_t *
create_thread_pool(void *baton,
                   apr_pool_t *owning_pool)
{
#if APR_HAS_THREADS
  /* The thread-pool must be allocated from a thread-safe pool.
     OWNING_POOL may be single-threaded, though. */
  apr_pool_t *pool = svn_pool_create(NULL);

  /* The thread pool gets destroyed through the pre-cleanup hooks
     registered below; no ordinary cleanup callback is needed. */
  WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
               _("Can't create fsync thread pool in FSX"));

  /* Work around an APR bug:  The cleanup must happen in the pre-cleanup
     hook instead of the normal cleanup hook.  Otherwise, the sub-pools
     containing the thread objects would already be invalid. */
  apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
  apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);

  /* let idle threads linger for a while in case more requests are
     coming in */
  apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);

  /* don't queue requests unless we reached the worker thread limit */
  apr_thread_pool_threshold_set(thread_pool, 0);

#endif

  return SVN_NO_ERROR;
}

svn_error_t *
svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
{
  /* Protect against multiple calls. */
  return svn_error_trace(svn_atomic__init_once(&thread_pool_initialized,
                                               create_thread_pool,
                                               NULL, owning_pool));
}
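
/* Illustrative sketch (not compiled):  callers are expected to run this
 * once during startup, e.g.
 *
 *   SVN_ERR(svn_fs_x__batch_fsync_init(common_pool));
 *
 * COMMON_POOL is a hypothetical long-lived pool that must outlive all
 * batch fsync activity.  Later calls are cheap no-ops thanks to the
 * svn_atomic__init_once() guard. */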

/* Destructor for svn_fs_x__batch_fsync_t.  Releases all global pool memory
 * and closes all open file handles. */
static apr_status_t
fsync_batch_cleanup(void *data)
{
  svn_fs_x__batch_fsync_t *batch = data;
  apr_hash_index_t *hi;

  /* Close all files (implicitly) and release memory. */
  for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);
      svn_pool_destroy(to_sync->pool);
    }

  return APR_SUCCESS;
}

svn_error_t *
svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
                             svn_boolean_t flush_to_disk,
                             apr_pool_t *result_pool)
{
  svn_fs_x__batch_fsync_t *result = apr_pcalloc(result_pool, sizeof(*result));
  result->files = svn_hash__make(result_pool);
  result->flush_to_disk = flush_to_disk;

  SVN_ERR(waitable_counter__create(&result->counter, result_pool));
  apr_pool_cleanup_register(result_pool, result, fsync_batch_cleanup,
                            apr_pool_cleanup_null);

  *result_p = result;

  return SVN_NO_ERROR;
}

/* If BATCH does not yet contain a handle for PATH, create one with FLAGS
 * and add it to BATCH.  Set *FILE to the open file handle.
 * Use SCRATCH_POOL for temporaries.
 */
static svn_error_t *
internal_open_file(apr_file_t **file,
                   svn_fs_x__batch_fsync_t *batch,
                   const char *path,
                   apr_int32_t flags,
                   apr_pool_t *scratch_pool)
{
  svn_error_t *err;
  apr_pool_t *pool;
  to_sync_t *to_sync;
#ifdef SVN_ON_POSIX
  svn_boolean_t is_new_file;
#endif

  /* If we already have a handle for PATH, return that. */
  to_sync = svn_hash_gets(batch->files, path);
  if (to_sync)
    {
      *file = to_sync->file;
      return SVN_NO_ERROR;
    }

  /* Calling fsync on PATH is going to be expensive in any case, so we can
   * allow for some extra overhead figuring out whether the file already
   * exists.  If it doesn't, be sure to schedule parent folder updates, if
   * required on this platform.
   *
   * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
   * needed at all. */

#ifdef SVN_ON_POSIX

  is_new_file = FALSE;
  if (flags & APR_CREATE)
    {
      svn_node_kind_t kind;
      /* We might actually be about to create a new file.
       * Check whether the file already exists. */
      SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
      is_new_file = kind == svn_node_none;
    }

#endif

  /* To be able to process each file in a separate thread, they must use
   * separate, thread-safe pools.  Allocating a sub-pool from the standard
   * memory pool achieves exactly that. */
  pool = svn_pool_create(NULL);
  err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
  if (err)
    {
      svn_pool_destroy(pool);
      return svn_error_trace(err);
    }

  to_sync = apr_pcalloc(pool, sizeof(*to_sync));
  to_sync->file = *file;
  to_sync->pool = pool;
  to_sync->result = SVN_NO_ERROR;
  to_sync->counter = batch->counter;

  svn_hash_sets(batch->files,
                apr_pstrdup(apr_hash_pool_get(batch->files), path),
                to_sync);

  /* If we just created a new file, schedule any additional necessary fsyncs.
   * Note that this can only recurse once since the parent folder already
   * exists on disk. */
#ifdef SVN_ON_POSIX

  if (is_new_file)
    SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));

#endif

  return SVN_NO_ERROR;
}

svn_error_t *
svn_fs_x__batch_fsync_open_file(apr_file_t **file,
                                svn_fs_x__batch_fsync_t *batch,
                                const char *filename,
                                apr_pool_t *scratch_pool)
{
  apr_off_t offset = 0;

  SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
                             scratch_pool));
  SVN_ERR(svn_io_file_seek(*file, APR_SET, &offset, scratch_pool));

  return SVN_NO_ERROR;
}
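
/* Illustrative sketch (not compiled):  a typical caller opens a file
 * through the batch, writes to it and leaves the actual fsync to
 * svn_fs_x__batch_fsync_run():
 *
 *   apr_file_t *file;
 *   SVN_ERR(svn_fs_x__batch_fsync_open_file(&file, batch, file_path,
 *                                           scratch_pool));
 *   SVN_ERR(svn_io_file_write_full(file, data, len, NULL, scratch_pool));
 *
 * FILE_PATH, DATA and LEN are hypothetical.  Note that the handle is
 * rewound to offset 0 on every call, so re-opening an already scheduled
 * file starts writing from the beginning again. */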

svn_error_t *
svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
                               const char *path,
                               apr_pool_t *scratch_pool)
{
  apr_file_t *file;

#ifdef SVN_ON_POSIX

  /* On POSIX, we need to sync the parent directory because it contains
   * the name for the file / folder given by PATH. */
  path = svn_dirent_dirname(path, scratch_pool);
  SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));

#else

  svn_node_kind_t kind;

  /* On non-POSIX systems, we assume that sync'ing the given PATH is the
   * right thing to do.  Also, we assume that only files may be sync'ed. */
  SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
  if (kind == svn_node_file)
    SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
                               scratch_pool));

#endif

  return SVN_NO_ERROR;
}
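
/* Illustrative sketch (not compiled):  after creating a new file or
 * directory on POSIX, a caller would schedule the parent directory for
 * fsync so the new directory entry itself becomes durable:
 *
 *   SVN_ERR(svn_io_dir_make(new_dir, APR_OS_DEFAULT, scratch_pool));
 *   SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, new_dir, scratch_pool));
 *
 * NEW_DIR is hypothetical. */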

/* Thread-pool task:  flush the to_sync_t instance given by DATA. */
static void * APR_THREAD_FUNC
flush_task(apr_thread_t *tid,
           void *data)
{
  to_sync_t *to_sync = data;

  to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
                                        (to_sync->file, to_sync->pool));

  /* As soon as the increment call returns, TO_SYNC may be invalid
     (the main thread may have woken up and released the struct).

     Therefore, we cannot chain this error into TO_SYNC->RESULT.
     OTOH, the main thread will probably deadlock anyway if we got
     an error here, thus there is no point in trying to tell the
     main thread what the problem was. */
  svn_error_clear(waitable_counter__increment(to_sync->counter));

  return NULL;
}

svn_error_t *
svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
                          apr_pool_t *scratch_pool)
{
  apr_hash_index_t *hi;

  /* Number of tasks sent to the thread pool. */
  int tasks = 0;

  /* Because we allocated the open files from our global pool, don't bail
   * out on the first error.  Instead, process all files but accumulate
   * the errors in this chain.
   */
  svn_error_t *chain = SVN_NO_ERROR;

  /* First, flush APR-internal buffers.  This should minimize / prevent the
   * introduction of additional meta-data changes during the next phase.
   * We might otherwise issue redundant fsyncs.
   */
  for (hi = apr_hash_first(scratch_pool, batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);
      to_sync->result = svn_error_trace(svn_io_file_flush
                                           (to_sync->file, to_sync->pool));
    }

  /* Make sure the task completion counter is set to 0. */
  chain = svn_error_compose_create(chain,
                                   waitable_counter__reset(batch->counter));

  /* Start the actual fsyncing process. */
  if (batch->flush_to_disk)
    {
      for (hi = apr_hash_first(scratch_pool, batch->files);
           hi;
           hi = apr_hash_next(hi))
        {
          to_sync_t *to_sync = apr_hash_this_val(hi);

#if APR_HAS_THREADS

          /* Forgot to call _init() or cleaned up the owning pool too early?
           */
          SVN_ERR_ASSERT(thread_pool);

          /* If there are multiple fsyncs to perform, run them in parallel.
           * Otherwise, skip the thread-pool and synchronization overhead. */
          if (apr_hash_count(batch->files) > 1)
            {
              apr_status_t status = APR_SUCCESS;
              status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
                                            0, NULL);
              if (status)
                to_sync->result = svn_error_wrap_apr(status,
                                                     _("Can't push task"));
              else
                tasks++;
            }
          else

#endif

            {
              to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
                                                  (to_sync->file,
                                                   to_sync->pool));
            }
        }
    }

  /* Wait for all outstanding flush operations to complete. */
  chain = svn_error_compose_create(chain,
                                   waitable_counter__wait_for(batch->counter,
                                                              tasks));

  /* Collect the results, close all files and release memory. */
  for (hi = apr_hash_first(scratch_pool, batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);
      if (batch->flush_to_disk)
        chain = svn_error_compose_create(chain, to_sync->result);

      chain = svn_error_compose_create(chain,
                                       svn_io_file_close(to_sync->file,
                                                         scratch_pool));
      svn_pool_destroy(to_sync->pool);
    }

  /* Don't process any file / folder twice. */
  apr_hash_clear(batch->files);

  /* Report the errors that we encountered. */
  return svn_error_trace(chain);
}
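
/* Illustrative end-to-end sketch (not compiled), assuming
 * svn_fs_x__batch_fsync_init() already ran at startup:
 *
 *   svn_fs_x__batch_fsync_t *batch;
 *   apr_file_t *file;
 *
 *   SVN_ERR(svn_fs_x__batch_fsync_create(&batch, TRUE, result_pool));
 *   SVN_ERR(svn_fs_x__batch_fsync_open_file(&file, batch, path,
 *                                           scratch_pool));
 *   SVN_ERR(svn_io_file_write_full(file, data, len, NULL, scratch_pool));
 *   SVN_ERR(svn_fs_x__batch_fsync_run(batch, scratch_pool));
 *
 * PATH, DATA and LEN are hypothetical.  After _run() returns, all
 * scheduled files have been flushed, fsync'ed (if FLUSH_TO_DISK was TRUE)
 * and closed. */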