1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include <toku_portability.h>
40 #include <unistd.h>
41 #include <errno.h>
42 #include <toku_assert.h>
43 #include <stdio.h>
44 #include <string.h>
45 #include <dirent.h>
46 #include <sys/types.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49
50 #include "memory.h"
51 #include "toku_time.h"
52 #include "toku_path.h"
53 #include <portability/toku_atomic.h>
54
55 toku_instr_key *tokudb_file_data_key;
56
57 static int toku_assert_on_write_enospc = 0;
58 static const int toku_write_enospc_sleep = 1;
59 static uint64_t toku_write_enospc_last_report; // timestamp of most recent
60 // report to error log
61 static time_t toku_write_enospc_last_time; // timestamp of most recent ENOSPC
62 static uint32_t toku_write_enospc_current; // number of threads currently blocked on ENOSPC
63 static uint64_t toku_write_enospc_total; // total number of times ENOSPC was returned from an attempt to write
64
toku_set_assert_on_write_enospc(int do_assert)65 void toku_set_assert_on_write_enospc(int do_assert) {
66 toku_assert_on_write_enospc = do_assert;
67 }
68
toku_fs_get_write_info(time_t * enospc_last_time,uint64_t * enospc_current,uint64_t * enospc_total)69 void toku_fs_get_write_info(time_t *enospc_last_time, uint64_t *enospc_current, uint64_t *enospc_total) {
70 *enospc_last_time = toku_write_enospc_last_time;
71 *enospc_current = toku_write_enospc_current;
72 *enospc_total = toku_write_enospc_total;
73 }
74
75 //Print any necessary errors
76 //Return whether we should try the write again.
77 static void
try_again_after_handling_write_error(int fd,size_t len,ssize_t r_write)78 try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
79 int try_again = 0;
80
81 assert(r_write < 0);
82 int errno_write = get_error_errno();
83 switch (errno_write) {
84 case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
85 char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
86 snprintf(err_msg, sizeof(err_msg), "Write of [%" PRIu64 "] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
87 perror(err_msg);
88 fflush(stderr);
89 try_again = 1;
90 break;
91 }
92 case ENOSPC: {
93 if (toku_assert_on_write_enospc) {
94 char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
95 snprintf(err_msg, sizeof(err_msg), "Failed write of [%" PRIu64 "] bytes to fd=[%d].", (uint64_t)len, fd);
96 perror(err_msg);
97 fflush(stderr);
98 int out_of_disk_space = 1;
99 assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
100 } else {
101 toku_sync_fetch_and_add(&toku_write_enospc_total, 1);
102 toku_sync_fetch_and_add(&toku_write_enospc_current, 1);
103
104 time_t tnow = time(0);
105 toku_write_enospc_last_time = tnow;
106 if (toku_write_enospc_last_report == 0 || tnow - toku_write_enospc_last_report >= 60) {
107 toku_write_enospc_last_report = tnow;
108
109 const int tstr_length = 26;
110 char tstr[tstr_length];
111 time_t t = time(0);
112 ctime_r(&t, tstr);
113
114 const int MY_MAX_PATH = 256;
115 char fname[MY_MAX_PATH], symname[MY_MAX_PATH+1];
116 sprintf(fname, "/proc/%d/fd/%d", getpid(), fd);
117 ssize_t n = readlink(fname, symname, MY_MAX_PATH);
118
119 if ((int)n == -1)
120 fprintf(stderr, "%.24s PerconaFT No space when writing %" PRIu64 " bytes to fd=%d ", tstr, (uint64_t) len, fd);
121 else {
122 tstr[n] = 0; // readlink doesn't append a NUL to the end of the buffer.
123 fprintf(stderr, "%.24s PerconaFT No space when writing %" PRIu64 " bytes to %*s ", tstr, (uint64_t) len, (int) n, symname);
124 }
125 fprintf(stderr, "retry in %d second%s\n", toku_write_enospc_sleep, toku_write_enospc_sleep > 1 ? "s" : "");
126 fflush(stderr);
127 }
128 sleep(toku_write_enospc_sleep);
129 try_again = 1;
130 toku_sync_fetch_and_sub(&toku_write_enospc_current, 1);
131 break;
132 }
133 }
134 default:
135 break;
136 }
137 assert(try_again);
138 errno = errno_write;
139 }
140
// Optional replacements for the libc I/O entry points used by the wrappers
// in this file.  Each pointer is installed by the corresponding
// toku_set_func_*() setter; while a pointer is null the wrapper that
// consults it calls the libc function directly.
static ssize_t (*t_write)(int, const void *, size_t);
static ssize_t (*t_full_write)(int, const void *, size_t);
static ssize_t (*t_pwrite)(int, const void *, size_t, off_t);
static ssize_t (*t_full_pwrite)(int, const void *, size_t, off_t);
static FILE * (*t_fdopen)(int, const char *);
static FILE * (*t_fopen)(const char *, const char *);
static int (*t_open)(const char *, int, int);
static int (*t_fclose)(FILE *);
static ssize_t (*t_read)(int, void *, size_t);
static ssize_t (*t_pread)(int, void *, size_t, off_t);
static size_t (*os_fwrite_fun)(const void *, size_t, size_t, FILE *) = nullptr;
152
toku_set_func_fwrite(size_t (* fwrite_fun)(const void *,size_t,size_t,FILE *))153 void toku_set_func_fwrite(
154 size_t (*fwrite_fun)(const void *, size_t, size_t, FILE *)) {
155 os_fwrite_fun = fwrite_fun;
156 }
157
toku_set_func_write(ssize_t (* write_fun)(int,const void *,size_t))158 void toku_set_func_write(ssize_t (*write_fun)(int, const void *, size_t)) {
159 t_write = write_fun;
160 }
161
toku_set_func_full_write(ssize_t (* write_fun)(int,const void *,size_t))162 void toku_set_func_full_write (ssize_t (*write_fun)(int, const void *, size_t)) {
163 t_full_write = write_fun;
164 }
165
toku_set_func_pwrite(ssize_t (* pwrite_fun)(int,const void *,size_t,off_t))166 void toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
167 t_pwrite = pwrite_fun;
168 }
169
toku_set_func_full_pwrite(ssize_t (* pwrite_fun)(int,const void *,size_t,off_t))170 void toku_set_func_full_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
171 t_full_pwrite = pwrite_fun;
172 }
173
toku_set_func_fdopen(FILE * (* fdopen_fun)(int,const char *))174 void toku_set_func_fdopen(FILE * (*fdopen_fun)(int, const char *)) {
175 t_fdopen = fdopen_fun;
176 }
177
toku_set_func_fopen(FILE * (* fopen_fun)(const char *,const char *))178 void toku_set_func_fopen(FILE * (*fopen_fun)(const char *, const char *)) {
179 t_fopen = fopen_fun;
180 }
181
toku_set_func_open(int (* open_fun)(const char *,int,int))182 void toku_set_func_open(int (*open_fun)(const char *, int, int)) {
183 t_open = open_fun;
184 }
185
toku_set_func_fclose(int (* fclose_fun)(FILE *))186 void toku_set_func_fclose(int (*fclose_fun)(FILE*)) {
187 t_fclose = fclose_fun;
188 }
189
toku_set_func_read(ssize_t (* read_fun)(int,void *,size_t))190 void toku_set_func_read (ssize_t (*read_fun)(int, void *, size_t)) {
191 t_read = read_fun;
192 }
193
toku_set_func_pread(ssize_t (* pread_fun)(int,void *,size_t,off_t))194 void toku_set_func_pread (ssize_t (*pread_fun)(int, void *, size_t, off_t)) {
195 t_pread = pread_fun;
196 }
197
toku_os_delete_with_source_location(const char * name,const char * src_file,uint src_line)198 int toku_os_delete_with_source_location(const char *name,
199 const char *src_file,
200 uint src_line) {
201
202 toku_io_instrumentation io_annotation;
203 toku_instr_file_name_close_begin(io_annotation,
204 *tokudb_file_data_key,
205 toku_instr_file_op::file_delete,
206 name,
207 src_file,
208 src_line);
209 const int result = unlink(name);
210
211 /* Register the result value with the instrumentation system */
212 toku_instr_file_close_end(io_annotation, result);
213
214 return result;
215 }
216
toku_os_rename_with_source_location(const char * old_name,const char * new_name,const char * src_file,uint src_line)217 int toku_os_rename_with_source_location(const char *old_name,
218 const char *new_name,
219 const char *src_file,
220 uint src_line) {
221 int result;
222
223 toku_io_instrumentation io_annotation;
224 toku_instr_file_name_io_begin(io_annotation,
225 *tokudb_file_data_key,
226 toku_instr_file_op::file_rename,
227 new_name,
228 0,
229 src_file,
230 src_line);
231
232 result = rename(old_name, new_name);
233 /* Regsiter the result value with the instrumentation system */
234 toku_instr_file_io_end(io_annotation, 0);
235
236 return result;
237 }
238
toku_os_full_write_with_source_location(int fd,const void * buf,size_t len,const char * src_file,uint src_line)239 void toku_os_full_write_with_source_location(int fd,
240 const void *buf,
241 size_t len,
242 const char *src_file,
243 uint src_line) {
244 const char *bp = (const char *)buf;
245 size_t bytes_written = len;
246
247 toku_io_instrumentation io_annotation;
248 toku_instr_file_io_begin(io_annotation,
249 toku_instr_file_op::file_write,
250 fd,
251 len,
252 src_file,
253 src_line);
254
255 while (len > 0) {
256 ssize_t r;
257 if (t_full_write) {
258 r = t_full_write(fd, bp, len);
259 } else {
260 r = write(fd, bp, len);
261 }
262 if (r > 0) {
263 len -= r;
264 bp += r;
265 }
266 else {
267 try_again_after_handling_write_error(fd, len, r);
268 }
269 }
270 assert(len == 0);
271
272 /* Register the result value with the instrumentaion system */
273 toku_instr_file_io_end(io_annotation, bytes_written);
274 }
275
toku_os_write_with_source_location(int fd,const void * buf,size_t len,const char * src_file,uint src_line)276 int toku_os_write_with_source_location(int fd,
277 const void *buf,
278 size_t len,
279 const char *src_file,
280 uint src_line) {
281 const char *bp = (const char *)buf;
282 int result = 0;
283 ssize_t r;
284
285 size_t bytes_written = len;
286 toku_io_instrumentation io_annotation;
287 toku_instr_file_io_begin(io_annotation,
288 toku_instr_file_op::file_write,
289 fd,
290 len,
291 src_file,
292 src_line);
293
294 while (len > 0) {
295 if (t_write) {
296 r = t_write(fd, bp, len);
297 } else {
298 r = write(fd, bp, len);
299 }
300 if (r < 0) {
301 result = errno;
302 break;
303 }
304 len -= r;
305 bp += r;
306 }
307 /* Register the result value with the instrumentation system */
308 toku_instr_file_io_end(io_annotation, bytes_written - len);
309
310 return result;
311 }
312
toku_os_full_pwrite_with_source_location(int fd,const void * buf,size_t len,toku_off_t off,const char * src_file,uint src_line)313 void toku_os_full_pwrite_with_source_location(int fd,
314 const void *buf,
315 size_t len,
316 toku_off_t off,
317 const char *src_file,
318 uint src_line) {
319 assert(0 == ((long long)buf) % 512);
320 assert((len % 512 == 0) && (off % 512) == 0); // to make pwrite work.
321 const char *bp = (const char *)buf;
322
323 size_t bytes_written = len;
324 toku_io_instrumentation io_annotation;
325 toku_instr_file_io_begin(io_annotation,
326 toku_instr_file_op::file_write,
327 fd,
328 len,
329 src_file,
330 src_line);
331 while (len > 0) {
332 ssize_t r;
333 if (t_full_pwrite) {
334 r = t_full_pwrite(fd, bp, len, off);
335 } else {
336 r = pwrite(fd, bp, len, off);
337 }
338 if (r > 0) {
339 len -= r;
340 bp += r;
341 off += r;
342 }
343 else {
344 try_again_after_handling_write_error(fd, len, r);
345 }
346 }
347 assert(len == 0);
348
349 /* Register the result value with the instrumentation system */
350 toku_instr_file_io_end(io_annotation, bytes_written);
351 }
352
toku_os_pwrite_with_source_location(int fd,const void * buf,size_t len,toku_off_t off,const char * src_file,uint src_line)353 ssize_t toku_os_pwrite_with_source_location(int fd,
354 const void *buf,
355 size_t len,
356 toku_off_t off,
357 const char *src_file,
358 uint src_line) {
359 assert(0 ==
360 ((long long)buf) %
361 512); // these asserts are to ensure that direct I/O will work.
362 assert(0 == len % 512);
363 assert(0 == off % 512);
364 const char *bp = (const char *)buf;
365 ssize_t result = 0;
366 ssize_t r;
367
368 size_t bytes_written = len;
369 toku_io_instrumentation io_annotation;
370 toku_instr_file_io_begin(io_annotation,
371 toku_instr_file_op::file_write,
372 fd,
373 len,
374 src_file,
375 src_line);
376 while (len > 0) {
377 r = (t_pwrite) ? t_pwrite(fd, bp, len, off) : pwrite(fd, bp, len, off);
378
379 if (r < 0) {
380 result = errno;
381 break;
382 }
383 len -= r;
384 bp += r;
385 off += r;
386 }
387 /* Register the result value with the instrumentation system */
388 toku_instr_file_io_end(io_annotation, bytes_written - len);
389
390 return result;
391 }
392
toku_os_fwrite_with_source_location(const void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream,const char * src_file,uint src_line)393 int toku_os_fwrite_with_source_location(const void *ptr,
394 size_t size,
395 size_t nmemb,
396 TOKU_FILE *stream,
397 const char *src_file,
398 uint src_line) {
399 int result = 0;
400 size_t bytes_written;
401
402 toku_io_instrumentation io_annotation;
403 toku_instr_file_stream_io_begin(io_annotation,
404 toku_instr_file_op::file_write,
405 *stream,
406 nmemb,
407 src_file,
408 src_line);
409
410 if (os_fwrite_fun) {
411 bytes_written = os_fwrite_fun(ptr, size, nmemb, stream->file);
412 } else {
413 bytes_written = fwrite(ptr, size, nmemb, stream->file);
414 }
415
416 if (bytes_written != nmemb) {
417 if (os_fwrite_fun) // if using hook to induce artificial errors (for
418 // testing) ...
419 result = get_maybe_error_errno(); // ... then there is no error in
420 // the stream, but there is one
421 // in errno
422 else
423 result = ferror(stream->file);
424 invariant(result != 0); // Should we assert here?
425 }
426 /* Register the result value with the instrumentation system */
427 toku_instr_file_io_end(io_annotation, bytes_written);
428
429 return result;
430 }
431
toku_os_fread_with_source_location(void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream,const char * src_file,uint src_line)432 int toku_os_fread_with_source_location(void *ptr,
433 size_t size,
434 size_t nmemb,
435 TOKU_FILE *stream,
436 const char *src_file,
437 uint src_line) {
438 int result = 0;
439 size_t bytes_read;
440
441 toku_io_instrumentation io_annotation;
442 toku_instr_file_stream_io_begin(io_annotation,
443 toku_instr_file_op::file_read,
444 *stream,
445 nmemb,
446 src_file,
447 src_line);
448
449 if ((bytes_read = fread(ptr, size, nmemb, stream->file)) != nmemb) {
450 if ((feof(stream->file)))
451 result = EOF;
452 else
453 result = ferror(stream->file);
454 invariant(result != 0); // Should we assert here?
455 }
456 /* Register the result value with the instrumentation system */
457 toku_instr_file_io_end(io_annotation, bytes_read);
458
459 return result;
460 }
461
toku_os_fdopen_with_source_location(int fildes,const char * mode,const char * filename,const toku_instr_key & instr_key,const char * src_file,uint src_line)462 TOKU_FILE *toku_os_fdopen_with_source_location(int fildes,
463 const char *mode,
464 const char *filename,
465 const toku_instr_key &instr_key,
466 const char *src_file,
467 uint src_line) {
468 TOKU_FILE *XMALLOC(rval);
469 if (FT_LIKELY(rval != nullptr)) {
470 toku_io_instrumentation io_annotation;
471 toku_instr_file_open_begin(io_annotation,
472 instr_key,
473 toku_instr_file_op::file_stream_open,
474 filename,
475 src_file,
476 src_line);
477
478 rval->file = (t_fdopen) ? t_fdopen(fildes, mode) : fdopen(fildes, mode);
479 toku_instr_file_stream_open_end(io_annotation, *rval);
480
481 if (FT_UNLIKELY(rval->file == nullptr)) {
482 toku_free(rval);
483 rval = nullptr;
484 }
485 }
486 return rval;
487 }
488
toku_os_fopen_with_source_location(const char * filename,const char * mode,const toku_instr_key & instr_key,const char * src_file,uint src_line)489 TOKU_FILE *toku_os_fopen_with_source_location(const char *filename,
490 const char *mode,
491 const toku_instr_key &instr_key,
492 const char *src_file,
493 uint src_line) {
494 TOKU_FILE *XMALLOC(rval);
495 if (FT_UNLIKELY(rval == nullptr))
496 return nullptr;
497
498 toku_io_instrumentation io_annotation;
499 toku_instr_file_open_begin(io_annotation,
500 instr_key,
501 toku_instr_file_op::file_stream_open,
502 filename,
503 src_file,
504 src_line);
505 rval->file = t_fopen ? t_fopen(filename, mode) : fopen(filename, mode);
506 /* Register the returning "file" value with the system */
507 toku_instr_file_stream_open_end(io_annotation, *rval);
508
509 if (FT_UNLIKELY(rval->file == nullptr)) {
510 toku_free(rval);
511 rval = nullptr;
512 }
513 return rval;
514 }
515
toku_os_open_with_source_location(const char * path,int oflag,int mode,const toku_instr_key & instr_key,const char * src_file,uint src_line)516 int toku_os_open_with_source_location(const char *path,
517 int oflag,
518 int mode,
519 const toku_instr_key &instr_key,
520 const char *src_file,
521 uint src_line) {
522 int fd;
523 toku_io_instrumentation io_annotation;
524 /* register a file open or creation depending on "oflag" */
525 toku_instr_file_open_begin(
526 io_annotation,
527 instr_key,
528 ((oflag & O_CREAT) ? toku_instr_file_op::file_create
529 : toku_instr_file_op::file_open),
530 path,
531 src_file,
532 src_line);
533 if (t_open)
534 fd = t_open(path, oflag, mode);
535 else
536 fd = open(path, oflag, mode);
537
538 toku_instr_file_open_end(io_annotation, fd);
539 return fd;
540 }
541
toku_os_open_direct(const char * path,int oflag,int mode,const toku_instr_key & instr_key)542 int toku_os_open_direct(const char *path,
543 int oflag,
544 int mode,
545 const toku_instr_key &instr_key) {
546 int rval;
547 #if defined(HAVE_O_DIRECT)
548 rval = toku_os_open(path, oflag | O_DIRECT, mode, instr_key);
549 #elif defined(HAVE_F_NOCACHE)
550 rval = toku_os_open(path, oflag, mode, instr_key);
551 if (rval >= 0) {
552 int r = fcntl(rval, F_NOCACHE, 1);
553 if (r == -1) {
554 perror("setting F_NOCACHE");
555 }
556 }
557 #else
558 # error "No direct I/O implementation found."
559 #endif
560 return rval;
561 }
562
toku_os_fclose_with_source_location(TOKU_FILE * stream,const char * src_file,uint src_line)563 int toku_os_fclose_with_source_location(TOKU_FILE *stream,
564 const char *src_file,
565 uint src_line) {
566 int rval = -1;
567 if (FT_LIKELY(stream != nullptr)) {
568 /* register a file stream close " */
569 toku_io_instrumentation io_annotation;
570 toku_instr_file_stream_close_begin(
571 io_annotation,
572 toku_instr_file_op::file_stream_close,
573 *stream,
574 src_file,
575 src_line);
576
577 if (t_fclose)
578 rval = t_fclose(stream->file);
579 else { // if EINTR, retry until success
580 while (rval != 0) {
581 rval = fclose(stream->file);
582 if (rval && (errno != EINTR))
583 break;
584 }
585 }
586 /* Register the returning "rval" value with the system */
587 toku_instr_file_close_end(io_annotation, rval);
588 toku_free(stream);
589 stream = nullptr;
590 }
591 return rval;
592 }
593
toku_os_close_with_source_location(int fd,const char * src_file,uint src_line)594 int toku_os_close_with_source_location(
595 int fd,
596 const char *src_file,
597 uint src_line) { // if EINTR, retry until success
598 /* register the file close */
599 int r = -1;
600
601 /* register a file descriptor close " */
602 toku_io_instrumentation io_annotation;
603 toku_instr_file_fd_close_begin(
604 io_annotation, toku_instr_file_op::file_close, fd, src_file, src_line);
605 while (r != 0) {
606 r = close(fd);
607 if (r) {
608 int rr = errno;
609 if (rr != EINTR)
610 printf("rr=%d (%s)\n", rr, strerror(rr));
611 assert(rr == EINTR);
612 }
613 }
614
615 /* Regsiter the returning value with the system */
616 toku_instr_file_close_end(io_annotation, r);
617
618 return r;
619 }
620
toku_os_read_with_source_location(int fd,void * buf,size_t count,const char * src_file,uint src_line)621 ssize_t toku_os_read_with_source_location(int fd,
622 void *buf,
623 size_t count,
624 const char *src_file,
625 uint src_line) {
626 ssize_t bytes_read;
627
628 toku_io_instrumentation io_annotation;
629 toku_instr_file_io_begin(io_annotation,
630 toku_instr_file_op::file_read,
631 fd,
632 count,
633 src_file,
634 src_line);
635
636 bytes_read = (t_read) ? t_read(fd, buf, count) : read(fd, buf, count);
637
638 toku_instr_file_io_end(io_annotation, bytes_read);
639
640 return bytes_read;
641 }
642
inline_toku_os_pread_with_source_location(int fd,void * buf,size_t count,off_t offset,const char * src_file,uint src_line)643 ssize_t inline_toku_os_pread_with_source_location(int fd,
644 void *buf,
645 size_t count,
646 off_t offset,
647 const char *src_file,
648 uint src_line) {
649 assert(0 == ((long long)buf) % 512);
650 assert(0 == count % 512);
651 assert(0 == offset % 512);
652 ssize_t bytes_read;
653
654 toku_io_instrumentation io_annotation;
655 toku_instr_file_io_begin(io_annotation,
656 toku_instr_file_op::file_read,
657 fd,
658 count,
659 src_file,
660 src_line);
661 if (t_pread) {
662 bytes_read = t_pread(fd, buf, count, offset);
663 } else {
664 bytes_read = pread(fd, buf, count, offset);
665 }
666 toku_instr_file_io_end(io_annotation, bytes_read);
667
668 return bytes_read;
669 }
670
toku_os_recursive_delete(const char * path)671 void toku_os_recursive_delete(const char *path) {
672 char buf[TOKU_PATH_MAX + sizeof("rm -rf ")];
673 strcpy(buf, "rm -rf ");
674 strncat(buf, path, TOKU_PATH_MAX);
675 int r = system(buf);
676 assert_zero(r);
677 }
678
// fsync logic:

// Replacement for fsync(); exists for testing purposes only.
static int (*t_fsync)(int) = 0;
// Number of fsyncs performed and the total time spent in them (microseconds).
static uint64_t toku_fsync_count;
static uint64_t toku_fsync_time;
// An fsync lasting at least this many microseconds counts as "long".
static uint64_t toku_long_fsync_threshold = 1000000;
// Number of long fsyncs, the total time spent in them (microseconds), and
// the number of EINTR retries that occurred during them.
static uint64_t toku_long_fsync_count;
static uint64_t toku_long_fsync_time;
static uint64_t toku_long_fsync_eintr_count;
// When nonzero, every long fsync is logged to stderr.
static int toku_fsync_debug = 0;
690
toku_set_func_fsync(int (* fsync_function)(int))691 void toku_set_func_fsync(int (*fsync_function)(int)) {
692 t_fsync = fsync_function;
693 }
694
695 // keep trying if fsync fails because of EINTR
file_fsync_internal_with_source_location(int fd,const char * src_file,uint src_line)696 void file_fsync_internal_with_source_location(int fd,
697 const char *src_file,
698 uint src_line) {
699 uint64_t tstart = toku_current_time_microsec();
700 int r = -1;
701 uint64_t eintr_count = 0;
702
703 toku_io_instrumentation io_annotation;
704 toku_instr_file_io_begin(io_annotation,
705 toku_instr_file_op::file_sync,
706 fd,
707 0,
708 src_file,
709 src_line);
710
711 while (r != 0) {
712 if (t_fsync) {
713 r = t_fsync(fd);
714 } else {
715 r = fsync(fd);
716 }
717 if (r) {
718 assert(get_error_errno() == EINTR);
719 eintr_count++;
720 }
721 }
722 toku_sync_fetch_and_add(&toku_fsync_count, 1);
723 uint64_t duration = toku_current_time_microsec() - tstart;
724 toku_sync_fetch_and_add(&toku_fsync_time, duration);
725
726 toku_instr_file_io_end(io_annotation, 0);
727
728 if (duration >= toku_long_fsync_threshold) {
729 toku_sync_fetch_and_add(&toku_long_fsync_count, 1);
730 toku_sync_fetch_and_add(&toku_long_fsync_time, duration);
731 toku_sync_fetch_and_add(&toku_long_fsync_eintr_count, eintr_count);
732 if (toku_fsync_debug) {
733 const int tstr_length = 26;
734 char tstr[tstr_length];
735 time_t t = time(0);
736 #if __linux__
737 char fdname[256];
738 snprintf(fdname, sizeof fdname, "/proc/%d/fd/%d", getpid(), fd);
739 char lname[256];
740 ssize_t s = readlink(fdname, lname, sizeof lname);
741 if (0 < s && s < (ssize_t) sizeof lname)
742 lname[s] = 0;
743 fprintf(stderr, "%.24s toku_file_fsync %s fd=%d %s duration=%" PRIu64 " usec eintr=%" PRIu64 "\n",
744 ctime_r(&t, tstr), __FUNCTION__, fd, s > 0 ? lname : "?", duration, eintr_count);
745 #else
746 fprintf(stderr, "%.24s toku_file_fsync %s fd=%d duration=%" PRIu64 " usec eintr=%" PRIu64 "\n",
747 ctime_r(&t, tstr), __FUNCTION__, fd, duration, eintr_count);
748 #endif
749 fflush(stderr);
750 }
751 }
752 }
753
toku_file_fsync_without_accounting(int fd)754 void toku_file_fsync_without_accounting(int fd) {
755 file_fsync_internal(fd);
756 }
757
toku_fsync_dirfd_without_accounting(DIR * dir)758 void toku_fsync_dirfd_without_accounting(DIR *dir) {
759 int fd = dirfd(dir);
760 toku_file_fsync_without_accounting(fd);
761 }
762
toku_fsync_dir_by_name_without_accounting(const char * dir_name)763 int toku_fsync_dir_by_name_without_accounting(const char *dir_name) {
764 int r = 0;
765 DIR * dir = opendir(dir_name);
766 if (!dir) {
767 r = get_error_errno();
768 } else {
769 toku_fsync_dirfd_without_accounting(dir);
770 r = closedir(dir);
771 if (r != 0) {
772 r = get_error_errno();
773 }
774 }
775 return r;
776 }
777
778 // include fsync in scheduling accounting
toku_file_fsync(int fd)779 void toku_file_fsync(int fd) {
780 file_fsync_internal (fd);
781 }
782
783 // for real accounting
toku_get_fsync_times(uint64_t * fsync_count,uint64_t * fsync_time,uint64_t * long_fsync_threshold,uint64_t * long_fsync_count,uint64_t * long_fsync_time)784 void toku_get_fsync_times(uint64_t *fsync_count, uint64_t *fsync_time, uint64_t *long_fsync_threshold, uint64_t *long_fsync_count, uint64_t *long_fsync_time) {
785 *fsync_count = toku_fsync_count;
786 *fsync_time = toku_fsync_time;
787 *long_fsync_threshold = toku_long_fsync_threshold;
788 *long_fsync_count = toku_long_fsync_count;
789 *long_fsync_time = toku_long_fsync_time;
790 }
791
toku_fsync_directory(const char * fname)792 int toku_fsync_directory(const char *fname) {
793 int result = 0;
794
795 // extract dirname from fname
796 const char *sp = strrchr(fname, '/');
797 size_t len;
798 char *dirname = NULL;
799 if (sp) {
800 resource_assert(sp >= fname);
801 len = sp - fname + 1;
802 MALLOC_N(len+1, dirname);
803 if (dirname == NULL) {
804 result = get_error_errno();
805 } else {
806 strncpy(dirname, fname, len);
807 dirname[len] = 0;
808 }
809 } else {
810 dirname = toku_strdup(".");
811 if (dirname == NULL) {
812 result = get_error_errno();
813 }
814 }
815
816 if (result == 0) {
817 result = toku_fsync_dir_by_name_without_accounting(dirname);
818 }
819 toku_free(dirname);
820 return result;
821 }
822