1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <toku_portability.h>
40 #include <unistd.h>
41 #include <errno.h>
42 #include <toku_assert.h>
43 #include <stdio.h>
44 #include <string.h>
45 #include <dirent.h>
46 #include <sys/types.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 
50 #include "memory.h"
51 #include "toku_time.h"
52 #include "toku_path.h"
53 #include <portability/toku_atomic.h>
54 
55 toku_instr_key *tokudb_file_data_key;
56 
57 static int toku_assert_on_write_enospc = 0;
58 static const int toku_write_enospc_sleep = 1;
59 static uint64_t toku_write_enospc_last_report;      // timestamp of most recent
60                                                     // report to error log
61 static time_t   toku_write_enospc_last_time;        // timestamp of most recent ENOSPC
62 static uint32_t toku_write_enospc_current;          // number of threads currently blocked on ENOSPC
63 static uint64_t toku_write_enospc_total;            // total number of times ENOSPC was returned from an attempt to write
64 
toku_set_assert_on_write_enospc(int do_assert)65 void toku_set_assert_on_write_enospc(int do_assert) {
66     toku_assert_on_write_enospc = do_assert;
67 }
68 
toku_fs_get_write_info(time_t * enospc_last_time,uint64_t * enospc_current,uint64_t * enospc_total)69 void toku_fs_get_write_info(time_t *enospc_last_time, uint64_t *enospc_current, uint64_t *enospc_total) {
70     *enospc_last_time = toku_write_enospc_last_time;
71     *enospc_current = toku_write_enospc_current;
72     *enospc_total = toku_write_enospc_total;
73 }
74 
75 //Print any necessary errors
76 //Return whether we should try the write again.
77 static void
try_again_after_handling_write_error(int fd,size_t len,ssize_t r_write)78 try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
79     int try_again = 0;
80 
81     assert(r_write < 0);
82     int errno_write = get_error_errno();
83     switch (errno_write) {
84     case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
85 	char err_msg[sizeof("Write of [] bytes to fd=[] interrupted.  Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
86 	snprintf(err_msg, sizeof(err_msg), "Write of [%" PRIu64 "] bytes to fd=[%d] interrupted.  Retrying.", (uint64_t)len, fd);
87 	perror(err_msg);
88 	fflush(stderr);
89 	try_again = 1;
90 	break;
91     }
92     case ENOSPC: {
93         if (toku_assert_on_write_enospc) {
94             char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
95             snprintf(err_msg, sizeof(err_msg), "Failed write of [%" PRIu64 "] bytes to fd=[%d].", (uint64_t)len, fd);
96             perror(err_msg);
97             fflush(stderr);
98             int out_of_disk_space = 1;
99             assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
100         } else {
101             toku_sync_fetch_and_add(&toku_write_enospc_total, 1);
102             toku_sync_fetch_and_add(&toku_write_enospc_current, 1);
103 
104             time_t tnow = time(0);
105             toku_write_enospc_last_time = tnow;
106             if (toku_write_enospc_last_report == 0 || tnow - toku_write_enospc_last_report >= 60) {
107                 toku_write_enospc_last_report = tnow;
108 
109                 const int tstr_length = 26;
110                 char tstr[tstr_length];
111                 time_t t = time(0);
112                 ctime_r(&t, tstr);
113 
114                 const int MY_MAX_PATH = 256;
115                 char fname[MY_MAX_PATH], symname[MY_MAX_PATH+1];
116                 sprintf(fname, "/proc/%d/fd/%d", getpid(), fd);
117                 ssize_t n = readlink(fname, symname, MY_MAX_PATH);
118 
119                 if ((int)n == -1)
120                     fprintf(stderr, "%.24s PerconaFT No space when writing %" PRIu64 " bytes to fd=%d ", tstr, (uint64_t) len, fd);
121                 else {
122 		    tstr[n] = 0; // readlink doesn't append a NUL to the end of the buffer.
123                     fprintf(stderr, "%.24s PerconaFT No space when writing %" PRIu64 " bytes to %*s ", tstr, (uint64_t) len, (int) n, symname);
124 		}
125                 fprintf(stderr, "retry in %d second%s\n", toku_write_enospc_sleep, toku_write_enospc_sleep > 1 ? "s" : "");
126                 fflush(stderr);
127             }
128             sleep(toku_write_enospc_sleep);
129             try_again = 1;
130             toku_sync_fetch_and_sub(&toku_write_enospc_current, 1);
131             break;
132         }
133     }
134     default:
135 	break;
136     }
137     assert(try_again);
138     errno = errno_write;
139 }
140 
141 static ssize_t (*t_write)(int, const void *, size_t);
142 static ssize_t (*t_full_write)(int, const void *, size_t);
143 static ssize_t (*t_pwrite)(int, const void *, size_t, off_t);
144 static ssize_t (*t_full_pwrite)(int, const void *, size_t, off_t);
145 static FILE *  (*t_fdopen)(int, const char *);
146 static FILE *  (*t_fopen)(const char *, const char *);
147 static int     (*t_open)(const char *, int, int);
148 static int (*t_fclose)(FILE *);
149 static ssize_t (*t_read)(int, void *, size_t);
150 static ssize_t (*t_pread)(int, void *, size_t, off_t);
151 static size_t (*os_fwrite_fun)(const void *, size_t, size_t, FILE *) = nullptr;
152 
toku_set_func_fwrite(size_t (* fwrite_fun)(const void *,size_t,size_t,FILE *))153 void toku_set_func_fwrite(
154     size_t (*fwrite_fun)(const void *, size_t, size_t, FILE *)) {
155     os_fwrite_fun = fwrite_fun;
156 }
157 
toku_set_func_write(ssize_t (* write_fun)(int,const void *,size_t))158 void toku_set_func_write(ssize_t (*write_fun)(int, const void *, size_t)) {
159     t_write = write_fun;
160 }
161 
toku_set_func_full_write(ssize_t (* write_fun)(int,const void *,size_t))162 void toku_set_func_full_write (ssize_t (*write_fun)(int, const void *, size_t)) {
163     t_full_write = write_fun;
164 }
165 
toku_set_func_pwrite(ssize_t (* pwrite_fun)(int,const void *,size_t,off_t))166 void toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
167     t_pwrite = pwrite_fun;
168 }
169 
toku_set_func_full_pwrite(ssize_t (* pwrite_fun)(int,const void *,size_t,off_t))170 void toku_set_func_full_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
171     t_full_pwrite = pwrite_fun;
172 }
173 
toku_set_func_fdopen(FILE * (* fdopen_fun)(int,const char *))174 void toku_set_func_fdopen(FILE * (*fdopen_fun)(int, const char *)) {
175     t_fdopen = fdopen_fun;
176 }
177 
toku_set_func_fopen(FILE * (* fopen_fun)(const char *,const char *))178 void toku_set_func_fopen(FILE * (*fopen_fun)(const char *, const char *)) {
179     t_fopen = fopen_fun;
180 }
181 
toku_set_func_open(int (* open_fun)(const char *,int,int))182 void toku_set_func_open(int (*open_fun)(const char *, int, int)) {
183     t_open = open_fun;
184 }
185 
toku_set_func_fclose(int (* fclose_fun)(FILE *))186 void toku_set_func_fclose(int (*fclose_fun)(FILE*)) {
187     t_fclose = fclose_fun;
188 }
189 
toku_set_func_read(ssize_t (* read_fun)(int,void *,size_t))190 void toku_set_func_read (ssize_t (*read_fun)(int, void *, size_t)) {
191     t_read = read_fun;
192 }
193 
toku_set_func_pread(ssize_t (* pread_fun)(int,void *,size_t,off_t))194 void toku_set_func_pread (ssize_t (*pread_fun)(int, void *, size_t, off_t)) {
195     t_pread = pread_fun;
196 }
197 
toku_os_delete_with_source_location(const char * name,const char * src_file,uint src_line)198 int toku_os_delete_with_source_location(const char *name,
199                                         const char *src_file,
200                                         uint src_line) {
201 
202     toku_io_instrumentation io_annotation;
203     toku_instr_file_name_close_begin(io_annotation,
204                                      *tokudb_file_data_key,
205                                      toku_instr_file_op::file_delete,
206                                      name,
207                                      src_file,
208                                      src_line);
209     const int result = unlink(name);
210 
211     /* Register the result value with the instrumentation system */
212     toku_instr_file_close_end(io_annotation, result);
213 
214     return result;
215 }
216 
toku_os_rename_with_source_location(const char * old_name,const char * new_name,const char * src_file,uint src_line)217 int toku_os_rename_with_source_location(const char *old_name,
218                                         const char *new_name,
219                                         const char *src_file,
220                                         uint src_line) {
221     int result;
222 
223     toku_io_instrumentation io_annotation;
224     toku_instr_file_name_io_begin(io_annotation,
225                                   *tokudb_file_data_key,
226                                   toku_instr_file_op::file_rename,
227                                   new_name,
228                                   0,
229                                   src_file,
230                                   src_line);
231 
232     result = rename(old_name, new_name);
233     /* Regsiter the result value with the instrumentation system */
234     toku_instr_file_io_end(io_annotation, 0);
235 
236     return result;
237 }
238 
toku_os_full_write_with_source_location(int fd,const void * buf,size_t len,const char * src_file,uint src_line)239 void toku_os_full_write_with_source_location(int fd,
240                                              const void *buf,
241                                              size_t len,
242                                              const char *src_file,
243                                              uint src_line) {
244     const char *bp = (const char *)buf;
245     size_t bytes_written = len;
246 
247     toku_io_instrumentation io_annotation;
248     toku_instr_file_io_begin(io_annotation,
249                              toku_instr_file_op::file_write,
250                              fd,
251                              len,
252                              src_file,
253                              src_line);
254 
255     while (len > 0) {
256         ssize_t r;
257         if (t_full_write) {
258             r = t_full_write(fd, bp, len);
259         } else {
260             r = write(fd, bp, len);
261         }
262         if (r > 0) {
263             len           -= r;
264             bp            += r;
265         }
266         else {
267             try_again_after_handling_write_error(fd, len, r);
268         }
269     }
270     assert(len == 0);
271 
272     /* Register the result value with the instrumentaion system */
273     toku_instr_file_io_end(io_annotation, bytes_written);
274 }
275 
toku_os_write_with_source_location(int fd,const void * buf,size_t len,const char * src_file,uint src_line)276 int toku_os_write_with_source_location(int fd,
277                                        const void *buf,
278                                        size_t len,
279                                        const char *src_file,
280                                        uint src_line) {
281     const char *bp = (const char *)buf;
282     int result = 0;
283     ssize_t r;
284 
285     size_t bytes_written = len;
286     toku_io_instrumentation io_annotation;
287     toku_instr_file_io_begin(io_annotation,
288                              toku_instr_file_op::file_write,
289                              fd,
290                              len,
291                              src_file,
292                              src_line);
293 
294     while (len > 0) {
295         if (t_write) {
296             r = t_write(fd, bp, len);
297         } else {
298             r = write(fd, bp, len);
299         }
300         if (r < 0) {
301             result = errno;
302             break;
303         }
304         len -= r;
305         bp += r;
306     }
307     /* Register the result value with the instrumentation system */
308     toku_instr_file_io_end(io_annotation, bytes_written - len);
309 
310     return result;
311 }
312 
toku_os_full_pwrite_with_source_location(int fd,const void * buf,size_t len,toku_off_t off,const char * src_file,uint src_line)313 void toku_os_full_pwrite_with_source_location(int fd,
314                                               const void *buf,
315                                               size_t len,
316                                               toku_off_t off,
317                                               const char *src_file,
318                                               uint src_line) {
319     assert(0 == ((long long)buf) % 512);
320     assert((len % 512 == 0) && (off % 512) == 0);  // to make pwrite work.
321     const char *bp = (const char *)buf;
322 
323     size_t bytes_written = len;
324     toku_io_instrumentation io_annotation;
325     toku_instr_file_io_begin(io_annotation,
326                              toku_instr_file_op::file_write,
327                              fd,
328                              len,
329                              src_file,
330                              src_line);
331     while (len > 0) {
332         ssize_t r;
333         if (t_full_pwrite) {
334             r = t_full_pwrite(fd, bp, len, off);
335         } else {
336             r = pwrite(fd, bp, len, off);
337         }
338         if (r > 0) {
339             len           -= r;
340             bp            += r;
341             off           += r;
342         }
343         else {
344             try_again_after_handling_write_error(fd, len, r);
345         }
346     }
347     assert(len == 0);
348 
349     /* Register the result value with the instrumentation system */
350     toku_instr_file_io_end(io_annotation, bytes_written);
351 }
352 
toku_os_pwrite_with_source_location(int fd,const void * buf,size_t len,toku_off_t off,const char * src_file,uint src_line)353 ssize_t toku_os_pwrite_with_source_location(int fd,
354                                             const void *buf,
355                                             size_t len,
356                                             toku_off_t off,
357                                             const char *src_file,
358                                             uint src_line) {
359     assert(0 ==
360            ((long long)buf) %
361                512);  // these asserts are to ensure that direct I/O will work.
362     assert(0 == len % 512);
363     assert(0 == off % 512);
364     const char *bp = (const char *)buf;
365     ssize_t result = 0;
366     ssize_t r;
367 
368     size_t bytes_written = len;
369     toku_io_instrumentation io_annotation;
370     toku_instr_file_io_begin(io_annotation,
371                              toku_instr_file_op::file_write,
372                              fd,
373                              len,
374                              src_file,
375                              src_line);
376     while (len > 0) {
377         r = (t_pwrite) ? t_pwrite(fd, bp, len, off) : pwrite(fd, bp, len, off);
378 
379         if (r < 0) {
380             result = errno;
381             break;
382         }
383         len           -= r;
384         bp += r;
385         off += r;
386     }
387     /* Register the result value with the instrumentation system */
388     toku_instr_file_io_end(io_annotation, bytes_written - len);
389 
390     return result;
391 }
392 
toku_os_fwrite_with_source_location(const void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream,const char * src_file,uint src_line)393 int toku_os_fwrite_with_source_location(const void *ptr,
394                                         size_t size,
395                                         size_t nmemb,
396                                         TOKU_FILE *stream,
397                                         const char *src_file,
398                                         uint src_line) {
399     int result = 0;
400     size_t bytes_written;
401 
402     toku_io_instrumentation io_annotation;
403     toku_instr_file_stream_io_begin(io_annotation,
404                                     toku_instr_file_op::file_write,
405                                     *stream,
406                                     nmemb,
407                                     src_file,
408                                     src_line);
409 
410     if (os_fwrite_fun) {
411         bytes_written = os_fwrite_fun(ptr, size, nmemb, stream->file);
412     } else {
413         bytes_written = fwrite(ptr, size, nmemb, stream->file);
414     }
415 
416     if (bytes_written != nmemb) {
417         if (os_fwrite_fun)  // if using hook to induce artificial errors (for
418                             // testing) ...
419             result = get_maybe_error_errno();  // ... then there is no error in
420                                                // the stream, but there is one
421                                                // in errno
422         else
423             result = ferror(stream->file);
424         invariant(result != 0);  // Should we assert here?
425     }
426     /* Register the result value with the instrumentation system */
427     toku_instr_file_io_end(io_annotation, bytes_written);
428 
429     return result;
430 }
431 
toku_os_fread_with_source_location(void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream,const char * src_file,uint src_line)432 int toku_os_fread_with_source_location(void *ptr,
433                                        size_t size,
434                                        size_t nmemb,
435                                        TOKU_FILE *stream,
436                                        const char *src_file,
437                                        uint src_line) {
438     int result = 0;
439     size_t bytes_read;
440 
441     toku_io_instrumentation io_annotation;
442     toku_instr_file_stream_io_begin(io_annotation,
443                                     toku_instr_file_op::file_read,
444                                     *stream,
445                                     nmemb,
446                                     src_file,
447                                     src_line);
448 
449     if ((bytes_read = fread(ptr, size, nmemb, stream->file)) != nmemb) {
450         if ((feof(stream->file)))
451             result = EOF;
452         else
453             result = ferror(stream->file);
454         invariant(result != 0);  // Should we assert here?
455     }
456     /* Register the result value with the instrumentation system */
457     toku_instr_file_io_end(io_annotation, bytes_read);
458 
459     return result;
460 }
461 
toku_os_fdopen_with_source_location(int fildes,const char * mode,const char * filename,const toku_instr_key & instr_key,const char * src_file,uint src_line)462 TOKU_FILE *toku_os_fdopen_with_source_location(int fildes,
463                                                const char *mode,
464                                                const char *filename,
465                                                const toku_instr_key &instr_key,
466                                                const char *src_file,
467                                                uint src_line) {
468     TOKU_FILE *XMALLOC(rval);
469     if (FT_LIKELY(rval != nullptr)) {
470         toku_io_instrumentation io_annotation;
471         toku_instr_file_open_begin(io_annotation,
472                                    instr_key,
473                                    toku_instr_file_op::file_stream_open,
474                                    filename,
475                                    src_file,
476                                    src_line);
477 
478         rval->file = (t_fdopen) ? t_fdopen(fildes, mode) : fdopen(fildes, mode);
479         toku_instr_file_stream_open_end(io_annotation, *rval);
480 
481         if (FT_UNLIKELY(rval->file == nullptr)) {
482             toku_free(rval);
483             rval = nullptr;
484         }
485     }
486     return rval;
487 }
488 
toku_os_fopen_with_source_location(const char * filename,const char * mode,const toku_instr_key & instr_key,const char * src_file,uint src_line)489 TOKU_FILE *toku_os_fopen_with_source_location(const char *filename,
490                                               const char *mode,
491                                               const toku_instr_key &instr_key,
492                                               const char *src_file,
493                                               uint src_line) {
494     TOKU_FILE *XMALLOC(rval);
495     if (FT_UNLIKELY(rval == nullptr))
496         return nullptr;
497 
498     toku_io_instrumentation io_annotation;
499     toku_instr_file_open_begin(io_annotation,
500                                instr_key,
501                                toku_instr_file_op::file_stream_open,
502                                filename,
503                                src_file,
504                                src_line);
505     rval->file = t_fopen ? t_fopen(filename, mode) : fopen(filename, mode);
506     /* Register the returning "file" value with the system */
507     toku_instr_file_stream_open_end(io_annotation, *rval);
508 
509     if (FT_UNLIKELY(rval->file == nullptr)) {
510         toku_free(rval);
511         rval = nullptr;
512     }
513     return rval;
514 }
515 
toku_os_open_with_source_location(const char * path,int oflag,int mode,const toku_instr_key & instr_key,const char * src_file,uint src_line)516 int toku_os_open_with_source_location(const char *path,
517                                       int oflag,
518                                       int mode,
519                                       const toku_instr_key &instr_key,
520                                       const char *src_file,
521                                       uint src_line) {
522     int fd;
523     toku_io_instrumentation io_annotation;
524     /* register a file open or creation depending on "oflag" */
525     toku_instr_file_open_begin(
526         io_annotation,
527         instr_key,
528         ((oflag & O_CREAT) ? toku_instr_file_op::file_create
529                            : toku_instr_file_op::file_open),
530         path,
531         src_file,
532         src_line);
533     if (t_open)
534         fd = t_open(path, oflag, mode);
535     else
536         fd = open(path, oflag, mode);
537 
538     toku_instr_file_open_end(io_annotation, fd);
539     return fd;
540 }
541 
toku_os_open_direct(const char * path,int oflag,int mode,const toku_instr_key & instr_key)542 int toku_os_open_direct(const char *path,
543                         int oflag,
544                         int mode,
545                         const toku_instr_key &instr_key) {
546     int rval;
547 #if defined(HAVE_O_DIRECT)
548     rval = toku_os_open(path, oflag | O_DIRECT, mode, instr_key);
549 #elif defined(HAVE_F_NOCACHE)
550     rval = toku_os_open(path, oflag, mode, instr_key);
551     if (rval >= 0) {
552         int r = fcntl(rval, F_NOCACHE, 1);
553         if (r == -1) {
554             perror("setting F_NOCACHE");
555         }
556     }
557 #else
558 # error "No direct I/O implementation found."
559 #endif
560     return rval;
561 }
562 
toku_os_fclose_with_source_location(TOKU_FILE * stream,const char * src_file,uint src_line)563 int toku_os_fclose_with_source_location(TOKU_FILE *stream,
564                                         const char *src_file,
565                                         uint src_line) {
566     int rval = -1;
567     if (FT_LIKELY(stream != nullptr)) {
568         /* register a file stream close " */
569         toku_io_instrumentation io_annotation;
570         toku_instr_file_stream_close_begin(
571             io_annotation,
572             toku_instr_file_op::file_stream_close,
573             *stream,
574             src_file,
575             src_line);
576 
577         if (t_fclose)
578             rval = t_fclose(stream->file);
579         else {  // if EINTR, retry until success
580             while (rval != 0) {
581                 rval = fclose(stream->file);
582                 if (rval && (errno != EINTR))
583                     break;
584             }
585         }
586         /* Register the returning "rval" value with the system */
587         toku_instr_file_close_end(io_annotation, rval);
588         toku_free(stream);
589         stream = nullptr;
590     }
591     return rval;
592 }
593 
toku_os_close_with_source_location(int fd,const char * src_file,uint src_line)594 int toku_os_close_with_source_location(
595     int fd,
596     const char *src_file,
597     uint src_line) {  // if EINTR, retry until success
598     /* register the file close */
599     int r = -1;
600 
601     /* register a file descriptor close " */
602     toku_io_instrumentation io_annotation;
603     toku_instr_file_fd_close_begin(
604         io_annotation, toku_instr_file_op::file_close, fd, src_file, src_line);
605     while (r != 0) {
606         r = close(fd);
607         if (r) {
608             int rr = errno;
609             if (rr != EINTR)
610                 printf("rr=%d (%s)\n", rr, strerror(rr));
611             assert(rr == EINTR);
612         }
613     }
614 
615     /* Regsiter the returning value with the system */
616     toku_instr_file_close_end(io_annotation, r);
617 
618     return r;
619 }
620 
toku_os_read_with_source_location(int fd,void * buf,size_t count,const char * src_file,uint src_line)621 ssize_t toku_os_read_with_source_location(int fd,
622                                           void *buf,
623                                           size_t count,
624                                           const char *src_file,
625                                           uint src_line) {
626     ssize_t bytes_read;
627 
628     toku_io_instrumentation io_annotation;
629     toku_instr_file_io_begin(io_annotation,
630                              toku_instr_file_op::file_read,
631                              fd,
632                              count,
633                              src_file,
634                              src_line);
635 
636     bytes_read = (t_read) ? t_read(fd, buf, count) : read(fd, buf, count);
637 
638     toku_instr_file_io_end(io_annotation, bytes_read);
639 
640     return bytes_read;
641 }
642 
inline_toku_os_pread_with_source_location(int fd,void * buf,size_t count,off_t offset,const char * src_file,uint src_line)643 ssize_t inline_toku_os_pread_with_source_location(int fd,
644                                                   void *buf,
645                                                   size_t count,
646                                                   off_t offset,
647                                                   const char *src_file,
648                                                   uint src_line) {
649     assert(0 == ((long long)buf) % 512);
650     assert(0 == count % 512);
651     assert(0 == offset % 512);
652     ssize_t bytes_read;
653 
654     toku_io_instrumentation io_annotation;
655     toku_instr_file_io_begin(io_annotation,
656                              toku_instr_file_op::file_read,
657                              fd,
658                              count,
659                              src_file,
660                              src_line);
661     if (t_pread) {
662         bytes_read = t_pread(fd, buf, count, offset);
663     } else {
664         bytes_read = pread(fd, buf, count, offset);
665     }
666     toku_instr_file_io_end(io_annotation, bytes_read);
667 
668     return bytes_read;
669 }
670 
toku_os_recursive_delete(const char * path)671 void toku_os_recursive_delete(const char *path) {
672     char buf[TOKU_PATH_MAX + sizeof("rm -rf ")];
673     strcpy(buf, "rm -rf ");
674     strncat(buf, path, TOKU_PATH_MAX);
675     int r = system(buf);
676     assert_zero(r);
677 }
678 
679 // fsync logic:
680 
681 // t_fsync exists for testing purposes only
682 static int (*t_fsync)(int) = 0;
683 static uint64_t toku_fsync_count;
684 static uint64_t toku_fsync_time;
685 static uint64_t toku_long_fsync_threshold = 1000000;
686 static uint64_t toku_long_fsync_count;
687 static uint64_t toku_long_fsync_time;
688 static uint64_t toku_long_fsync_eintr_count;
689 static int toku_fsync_debug = 0;
690 
toku_set_func_fsync(int (* fsync_function)(int))691 void toku_set_func_fsync(int (*fsync_function)(int)) {
692     t_fsync = fsync_function;
693 }
694 
695 // keep trying if fsync fails because of EINTR
file_fsync_internal_with_source_location(int fd,const char * src_file,uint src_line)696 void file_fsync_internal_with_source_location(int fd,
697                                               const char *src_file,
698                                               uint src_line) {
699     uint64_t tstart = toku_current_time_microsec();
700     int r = -1;
701     uint64_t eintr_count = 0;
702 
703     toku_io_instrumentation io_annotation;
704     toku_instr_file_io_begin(io_annotation,
705                              toku_instr_file_op::file_sync,
706                              fd,
707                              0,
708                              src_file,
709                              src_line);
710 
711     while (r != 0) {
712         if (t_fsync) {
713             r = t_fsync(fd);
714         } else {
715 	    r = fsync(fd);
716         }
717 	if (r) {
718             assert(get_error_errno() == EINTR);
719             eintr_count++;
720 	}
721     }
722     toku_sync_fetch_and_add(&toku_fsync_count, 1);
723     uint64_t duration = toku_current_time_microsec() - tstart;
724     toku_sync_fetch_and_add(&toku_fsync_time, duration);
725 
726     toku_instr_file_io_end(io_annotation, 0);
727 
728     if (duration >= toku_long_fsync_threshold) {
729         toku_sync_fetch_and_add(&toku_long_fsync_count, 1);
730         toku_sync_fetch_and_add(&toku_long_fsync_time, duration);
731         toku_sync_fetch_and_add(&toku_long_fsync_eintr_count, eintr_count);
732         if (toku_fsync_debug) {
733             const int tstr_length = 26;
734             char tstr[tstr_length];
735             time_t t = time(0);
736 #if __linux__
737             char fdname[256];
738             snprintf(fdname, sizeof fdname, "/proc/%d/fd/%d", getpid(), fd);
739             char lname[256];
740             ssize_t s = readlink(fdname, lname, sizeof lname);
741             if (0 < s && s < (ssize_t) sizeof lname)
742                 lname[s] = 0;
743             fprintf(stderr, "%.24s toku_file_fsync %s fd=%d %s duration=%" PRIu64 " usec eintr=%" PRIu64 "\n",
744                     ctime_r(&t, tstr), __FUNCTION__, fd, s > 0 ? lname : "?", duration, eintr_count);
745 #else
746             fprintf(stderr, "%.24s toku_file_fsync  %s fd=%d duration=%" PRIu64 " usec eintr=%" PRIu64 "\n",
747                     ctime_r(&t, tstr), __FUNCTION__, fd, duration, eintr_count);
748 #endif
749             fflush(stderr);
750         }
751     }
752 }
753 
toku_file_fsync_without_accounting(int fd)754 void toku_file_fsync_without_accounting(int fd) {
755     file_fsync_internal(fd);
756 }
757 
toku_fsync_dirfd_without_accounting(DIR * dir)758 void toku_fsync_dirfd_without_accounting(DIR *dir) {
759     int fd = dirfd(dir);
760     toku_file_fsync_without_accounting(fd);
761 }
762 
toku_fsync_dir_by_name_without_accounting(const char * dir_name)763 int toku_fsync_dir_by_name_without_accounting(const char *dir_name) {
764     int r = 0;
765     DIR * dir = opendir(dir_name);
766     if (!dir) {
767         r = get_error_errno();
768     } else {
769         toku_fsync_dirfd_without_accounting(dir);
770         r = closedir(dir);
771         if (r != 0) {
772             r = get_error_errno();
773         }
774     }
775     return r;
776 }
777 
778 // include fsync in scheduling accounting
toku_file_fsync(int fd)779 void toku_file_fsync(int fd) {
780     file_fsync_internal (fd);
781 }
782 
783 // for real accounting
toku_get_fsync_times(uint64_t * fsync_count,uint64_t * fsync_time,uint64_t * long_fsync_threshold,uint64_t * long_fsync_count,uint64_t * long_fsync_time)784 void toku_get_fsync_times(uint64_t *fsync_count, uint64_t *fsync_time, uint64_t *long_fsync_threshold, uint64_t *long_fsync_count, uint64_t *long_fsync_time) {
785     *fsync_count = toku_fsync_count;
786     *fsync_time = toku_fsync_time;
787     *long_fsync_threshold = toku_long_fsync_threshold;
788     *long_fsync_count = toku_long_fsync_count;
789     *long_fsync_time = toku_long_fsync_time;
790 }
791 
toku_fsync_directory(const char * fname)792 int toku_fsync_directory(const char *fname) {
793     int result = 0;
794 
795     // extract dirname from fname
796     const char *sp = strrchr(fname, '/');
797     size_t len;
798     char *dirname = NULL;
799     if (sp) {
800         resource_assert(sp >= fname);
801         len = sp - fname + 1;
802         MALLOC_N(len+1, dirname);
803         if (dirname == NULL) {
804             result = get_error_errno();
805         } else {
806             strncpy(dirname, fname, len);
807             dirname[len] = 0;
808         }
809     } else {
810         dirname = toku_strdup(".");
811         if (dirname == NULL) {
812             result = get_error_errno();
813         }
814     }
815 
816     if (result == 0) {
817         result = toku_fsync_dir_by_name_without_accounting(dirname);
818     }
819     toku_free(dirname);
820     return result;
821 }
822