1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <my_global.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <unistd.h>
43 
44 #include "portability/toku_assert.h"
45 #include "portability/memory.h"
46 
47 #include "ft/ft-internal.h"
48 #include "ft/serialize/ft_node-serialize.h"
49 #include "loader/dbufio.h"
50 #include "loader/loader-internal.h"
51 
// Instrumentation keys handed to toku_mutex_init, toku_cond_init and
// toku_pthread_create in create_dbufio_fileset, which dereferences all three.
// NOTE(review): nothing in the visible code assigns these pointers, so they
// are presumably set by instrumentation setup elsewhere before the first
// fileset is created -- confirm.
toku_instr_key *bfs_mutex_key;
toku_instr_key *bfs_cond_key;
toku_instr_key *io_thread_key;
55 
// State for one input file of a dbufio_fileset.  Each file has two buffers:
// consumers copy data out of buf[0] while the i/o thread fills buf[1].
struct dbufio_file {
    // i/o thread owns these
    int fd; // descriptor the data is read from (copied from fds[] at creation)

    // consumers own these
    size_t offset_in_buf; // number of bytes of buf[0] already copied out to the consumer
    toku_off_t  offset_in_uncompressed_file; // running total of bytes handed to the consumer for this file

    // need the mutex to modify these
    struct dbufio_file *next; // link in the fileset's list of files waiting for i/o
    bool   second_buf_ready; // if true, the i/o thread is not touching anything.

    // consumers own [0], i/o thread owns [1], they are swapped by the consumer only when the condition mutex is held and second_buf_ready is true.
    char *buf[2];       // the two data buffers, each allocated with the fileset's bufsize
    size_t n_in_buf[2]; // number of valid bytes in the corresponding buf[]
    int    error_code[2]; // includes errno or eof. [0] is the error code associated with buf[0], [1] is the code for buf[1]

    // True once the reader has decided that no further reads are needed for
    // this file (for example after end-of-file).  The consumer does not put the
    // file back on the i/o list when this is set.
    bool io_done;
};
75 
76 
/* A dbufio_fileset is a group of N files that are read with double buffering:
 * one i/o thread (see io_thread) fills the second buffer of each file while
 * consumers read from the first one through dbufio_fileset_read. */
struct dbufio_fileset {
    // The mutex/condition variables protect
    //   the singly-linked list of files that need I/O (head/tail in the fileset, and next in each file)
    //   in each file:
    //     the second_buf_ready boolean (which says the second buffer is full of data).
    //     the swapping of the buf[], n_in_buf[], and error_code[] values.
    toku_mutex_t mutex;
    toku_cond_t  cond; // broadcast whenever a buffer becomes ready, a file is queued for i/o, or the fileset panics
    int N; // How many files.  This is constant once established.
    int n_not_done; // how many of the files require more I/O?  Owned by the i/o thread.
    struct dbufio_file *files;     // an array of length N.
    struct dbufio_file *head, *tail; // must have the mutex to fiddle with these.
    size_t bufsize; // the bufsize is the constant (the same for all buffers).

    bool panic;       // once set, the i/o thread exits the next time it checks (see io_thread)
    bool compressed;  // if true, files are read with dbf_read_compressed instead of a plain toku_os_read
    int  panic_errno; // the error passed to the first panic_dbufio_fileset call
    toku_pthread_t iothread; // the thread running io_thread; joined by destroy_dbufio_fileset
};
97 
98 
enq(DBUFIO_FILESET bfs,struct dbufio_file * f)99 static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) {
100     if (bfs->tail==NULL) {
101 	bfs->head = f;
102     } else {
103 	bfs->tail->next = f;
104     }
105     bfs->tail = f;
106     f->next = NULL;
107 }
108 
panic(DBUFIO_FILESET bfs,int r)109 static void panic (DBUFIO_FILESET bfs, int r) {
110     if (bfs->panic) return;
111     bfs->panic_errno = r; // Don't really care about a race on this variable...  Writes to it are atomic, so at least one good panic reason will be stored.
112     bfs->panic = true;
113     return;
114 }
115 
paniced(DBUFIO_FILESET bfs)116 static bool paniced (DBUFIO_FILESET bfs) {
117     return bfs->panic;
118 }
119 
dbf_read_some_compressed(struct dbufio_file * dbf,char * buf,size_t bufsize)120 static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
121     ssize_t ret;
122     invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
123     unsigned char *raw_block = NULL;
124 
125     // deserialize the sub block header
126 
127     // total_size
128     // num_sub_blocks
129     // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times)
130     ssize_t readcode;
131     const uint32_t header_size = sizeof(uint32_t);
132     char header[header_size];
133 
134     readcode = toku_os_read(dbf->fd, &header, header_size);
135     if (readcode < 0) {
136         ret = -1;
137         goto exit;
138     }
139     if (readcode == 0) {
140         ret = 0;
141         goto exit;
142     }
143     if (readcode < (ssize_t) header_size) {
144         errno = TOKUDB_NO_DATA;
145         ret = -1;
146         goto exit;
147     }
148     uint32_t total_size;
149     {
150         uint32_t *p = (uint32_t *) &header[0];
151         total_size = toku_dtoh32(p[0]);
152     }
153     if (total_size == 0 || total_size > (1<<30)) {
154         errno = toku_db_badformat();
155         ret = -1;
156         goto exit;
157     }
158 
159     //Cannot use XMALLOC
160     MALLOC_N(total_size, raw_block);
161     if (raw_block == nullptr) {
162         errno = ENOMEM;
163         ret = -1;
164         goto exit;
165     }
166     readcode = toku_os_read(dbf->fd, raw_block, total_size);
167     if (readcode < 0) {
168         ret = -1;
169         goto exit;
170     }
171     if (readcode < (ssize_t) total_size) {
172         errno = TOKUDB_NO_DATA;
173         ret = -1;
174         goto exit;
175     }
176 
177     struct sub_block sub_block[max_sub_blocks];
178     uint32_t *sub_block_header;
179     sub_block_header = (uint32_t *) &raw_block[0];
180     int32_t n_sub_blocks;
181     n_sub_blocks = toku_dtoh32(sub_block_header[0]);
182     sub_block_header++;
183     size_t size_subblock_header;
184     size_subblock_header = sub_block_header_size(n_sub_blocks);
185     if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) {
186         errno = toku_db_badformat();
187         ret = -1;
188         goto exit;
189     }
190     for (int i = 0; i < n_sub_blocks; i++) {
191         sub_block_init(&sub_block[i]);
192         sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
193         sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
194         sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
195         sub_block_header += 3;
196     }
197 
198     // verify sub block sizes
199     size_t total_compressed_size;
200     total_compressed_size = 0;
201     for (int i = 0; i < n_sub_blocks; i++) {
202         uint32_t compressed_size = sub_block[i].compressed_size;
203         if (compressed_size<=0   || compressed_size>(1<<30)) {
204             errno = toku_db_badformat();
205             ret = -1;
206             goto exit;
207         }
208 
209         uint32_t uncompressed_size = sub_block[i].uncompressed_size;
210         if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
211             errno = toku_db_badformat();
212             ret = -1;
213             goto exit;
214         }
215         total_compressed_size += compressed_size;
216     }
217     if (total_size != total_compressed_size + size_subblock_header) {
218         errno = toku_db_badformat();
219         ret = -1;
220         goto exit;
221     }
222 
223     // sum up the uncompressed size of the sub blocks
224     size_t uncompressed_size;
225     uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
226     if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) {
227         errno = toku_db_badformat();
228         ret = -1;
229         goto exit;
230     }
231 
232     unsigned char *uncompressed_data;
233     uncompressed_data = (unsigned char *)buf;
234 
235     // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
236     unsigned char *compressed_data;
237     compressed_data = raw_block + size_subblock_header;
238 
239     // decompress all the compressed sub blocks into the uncompressed buffer
240     {
241         int r;
242         r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool());
243         if (r != 0) {
244             fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size);
245             dump_bad_block(raw_block, total_size);
246             errno = r;
247             ret = -1;
248             goto exit;
249         }
250     }
251     ret = uncompressed_size;
252 exit:
253     if (raw_block) {
254         toku_free(raw_block);
255     }
256     return ret;
257 }
258 
dbf_read_compressed(struct dbufio_file * dbf,char * buf,size_t bufsize)259 static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
260     invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
261     size_t count = 0;
262 
263     while (count + MAX_UNCOMPRESSED_BUF <= bufsize) {
264         ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count);
265         if (readcode < 0) {
266             return readcode;
267         }
268         count += readcode;
269         if (readcode == 0) {
270             break;
271         }
272     }
273     return count;
274 }
275 
io_thread(void * v)276 static void* io_thread (void *v)
277 // The dbuf_thread does all the asynchronous I/O.
278 {
279     DBUFIO_FILESET bfs = (DBUFIO_FILESET)v;
280     toku_mutex_lock(&bfs->mutex);
281     //printf("%s:%d Locked\n", __FILE__, __LINE__);
282     while (1) {
283         if (paniced(bfs)) {
284             toku_mutex_unlock(&bfs->mutex);  // ignore any error
285             toku_instr_delete_current_thread();
286             return toku_pthread_done(nullptr);
287         }
288         // printf("n_not_done=%d\n", bfs->n_not_done);
289         if (bfs->n_not_done == 0) {
290             // all done (meaning we stored EOF (or another error) in
291             // error_code[0] for the file.
292             // printf("unlocked\n");
293             toku_mutex_unlock(&bfs->mutex);
294             toku_instr_delete_current_thread();
295             return toku_pthread_done(nullptr);
296         }
297 
298         struct dbufio_file *dbf = bfs->head;
299         if (dbf == NULL) {
300             // No I/O needs to be done yet.
301             // Wait until something happens that will wake us up.
302             toku_cond_wait(&bfs->cond, &bfs->mutex);
303             if (paniced(bfs)) {
304                 toku_mutex_unlock(&bfs->mutex);  // ignore any error
305                 toku_instr_delete_current_thread();
306                 return toku_pthread_done(nullptr);
307             }
308             // Have the lock so go around.
309         } else {
310             // Some I/O needs to be done.
311             // printf("%s:%d Need I/O\n", __FILE__, __LINE__);
312             assert(dbf->second_buf_ready == false);
313             assert(!dbf->io_done);
314             bfs->head = dbf->next;
315             if (bfs->head == NULL)
316                 bfs->tail = NULL;
317 
318             // Unlock the mutex now that we have ownership of dbf to allow
319             // consumers to get the mutex and perform swaps.  They won't swap
320             // this buffer because second_buf_ready is false.
321             toku_mutex_unlock(&bfs->mutex);
322 	    //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd);
323 	    {
324 		ssize_t readcode;
325                 if (bfs->compressed) {
326                     readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize);
327                 }
328                 else {
329                     readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
330                 }
331 		//printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode);
332 		if (readcode==-1) {
333 		    // a real error.  Save the real error.
334                     int the_errno = get_error_errno();
335                     fprintf(stderr, "%s:%d dbf=%p fd=%d errno=%d\n", __FILE__, __LINE__, dbf, dbf->fd, the_errno);
336 		    dbf->error_code[1] = the_errno;
337 		    dbf->n_in_buf[1] = 0;
338 		} else if (readcode==0) {
339 		    // End of file.  Save it.
340 		    dbf->error_code[1] = EOF;
341 		    dbf->n_in_buf[1] = 0;
342 		    dbf->io_done = true;
343 
344 		} else {
345 		    dbf->error_code[1] = 0;
346 		    dbf->n_in_buf[1] = readcode;
347 		}
348 
349 		//printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode);
350 		{
351             toku_mutex_lock(&bfs->mutex);
352             if (paniced(bfs)) {
353                 toku_mutex_unlock(&bfs->mutex);  // ignore any error
354                 toku_instr_delete_current_thread();
355                 return toku_pthread_done(nullptr);
356             }
357         }
358         // Now that we have the mutex, we can decrement n_not_done (if
359         // applicable) and set second_buf_ready
360         if (readcode<=0) {
361 		    bfs->n_not_done--;
362 		}
363 		//printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done);
364 		dbf->second_buf_ready = true;
365                 toku_cond_broadcast(&bfs->cond);
366 		//printf("%s:%d did broadcast=%d\n", __FILE__, __LINE__, bfs->n_not_done);
367 		// Still have the lock so go around the loop
368 	    }
369 	}
370     }
371 }
372 
create_dbufio_fileset(DBUFIO_FILESET * bfsp,int N,int fds[],size_t bufsize,bool compressed)373 int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
374     //printf("%s:%d here\n", __FILE__, __LINE__);
375     int result = 0;
376     DBUFIO_FILESET CALLOC(bfs);
377     if (bfs==0) { result = get_error_errno(); }
378 
379     bfs->compressed = compressed;
380 
381     bool mutex_inited = false, cond_inited = false;
382     if (result==0) {
383 	CALLOC_N(N, bfs->files);
384 	if (bfs->files==NULL) { result = get_error_errno(); }
385 	else {
386 	    for (int i=0; i<N; i++) {
387 		bfs->files[i].buf[0] = bfs->files[i].buf[1] = NULL;
388 	    }
389 	}
390     }
391     // printf("%s:%d here\n", __FILE__, __LINE__);
392     if (result == 0) {
393         toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr);
394         mutex_inited = true;
395     }
396     if (result == 0) {
397         toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr);
398         cond_inited = true;
399     }
400     if (result == 0) {
401         bfs->N = N;
402         bfs->n_not_done = N;
403         bfs->head = bfs->tail = NULL;
404         for (int i = 0; i < N; i++) {
405             bfs->files[i].fd = fds[i];
406             bfs->files[i].offset_in_buf = 0;
407             bfs->files[i].offset_in_uncompressed_file = 0;
408             bfs->files[i].next = NULL;
409             bfs->files[i].second_buf_ready = false;
410             for (int j = 0; j < 2; j++) {
411                 if (result == 0) {
412                     MALLOC_N(bufsize, bfs->files[i].buf[j]);
413                     if (bfs->files[i].buf[j] == NULL) {
414                         result = get_error_errno();
415                     }
416                 }
417                 bfs->files[i].n_in_buf[j] = 0;
418                 bfs->files[i].error_code[j] = 0;
419             }
420             bfs->files[i].io_done = false;
421             ssize_t r;
422             if (bfs->compressed) {
423                 r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize);
424             } else {
425 		r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
426             }
427             {
428 		if (r<0) {
429 		    result=get_error_errno();
430 		    break;
431                 } else if (r==0) {
432 		    // it's EOF
433 		    bfs->files[i].io_done = true;
434 		    bfs->n_not_done--;
435 		    bfs->files[i].error_code[0] = EOF;
436 		} else {
437 		    bfs->files[i].n_in_buf[0] = r;
438 		    //printf("%s:%d enq [%d]\n", __FILE__, __LINE__, i);
439 		    enq(bfs, &bfs->files[i]);
440 		}
441 	    }
442 	}
443 	bfs->bufsize = bufsize;
444 	bfs->panic = false;
445 	bfs->panic_errno = 0;
446     }
447     // printf("Creating IO thread\n");
448     if (result == 0) {
449         result = toku_pthread_create(*io_thread_key,
450                                      &bfs->iothread,
451                                      nullptr,
452                                      io_thread,
453                                      static_cast<void *>(bfs));
454     }
455     if (result == 0) {
456         *bfsp = bfs;
457         return 0;
458     }
459     // Now undo everything.
460     // If we got here, there is no thread (either result was zero before the
461     // thread was created, or else the thread creation itself failed.
462     if (bfs) {
463 	if (bfs->files) {
464 	    // the files were allocated, so we have to free all the bufs.
465 	    for (int i=0; i<N; i++) {
466 		for (int j=0; j<2; j++) {
467 		    if (bfs->files[i].buf[j])
468 			toku_free(bfs->files[i].buf[j]);
469 		    bfs->files[i].buf[j]=NULL;
470 		}
471 	    }
472 	    toku_free(bfs->files);
473 	    bfs->files=NULL;
474 	}
475 	if (cond_inited) {
476 	    toku_cond_destroy(&bfs->cond);  // don't check error status
477 	}
478 	if (mutex_inited) {
479 	    toku_mutex_destroy(&bfs->mutex); // don't check error status
480 	}
481 	toku_free(bfs);
482     }
483     return result;
484 }
485 
panic_dbufio_fileset(DBUFIO_FILESET bfs,int error)486 int panic_dbufio_fileset(DBUFIO_FILESET bfs, int error) {
487     toku_mutex_lock(&bfs->mutex);
488     panic(bfs, error);
489     toku_cond_broadcast(&bfs->cond);
490     toku_mutex_unlock(&bfs->mutex);
491     return 0;
492 }
493 
destroy_dbufio_fileset(DBUFIO_FILESET bfs)494 int destroy_dbufio_fileset (DBUFIO_FILESET bfs) {
495     int result = 0;
496     {
497 	void *retval;
498 	int r = toku_pthread_join(bfs->iothread, &retval);
499 	assert(r==0);
500 	assert(retval==NULL);
501     }
502     {
503 	toku_mutex_destroy(&bfs->mutex);
504     }
505     {
506 	toku_cond_destroy(&bfs->cond);
507     }
508     if (bfs->files) {
509 	for (int i=0; i<bfs->N; i++) {
510 	    for (int j=0; j<2; j++) {
511 		//printf("%s:%d free([%d][%d]=%p\n", __FILE__, __LINE__, i,j, bfs->files[i].buf[j]);
512 		toku_free(bfs->files[i].buf[j]);
513 	    }
514 	}
515 	toku_free(bfs->files);
516     }
517     toku_free(bfs);
518     return result;
519 }
520 
dbufio_fileset_read(DBUFIO_FILESET bfs,int filenum,void * buf_v,size_t count,size_t * n_read)521 int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) {
522     char *buf = (char*)buf_v;
523     struct dbufio_file *dbf = &bfs->files[filenum];
524     if (dbf->error_code[0]!=0) return dbf->error_code[0];
525     if (dbf->offset_in_buf + count <= dbf->n_in_buf[0]) {
526 	// Enough data is present to do it all now
527 	memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count);
528 	dbf->offset_in_buf += count;
529 	dbf->offset_in_uncompressed_file += count;
530 	*n_read = count;
531 	return 0;
532     } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) {
533 	// There is something in buf[0]
534 	size_t this_count = dbf->n_in_buf[0]-dbf->offset_in_buf;
535 	assert(dbf->offset_in_buf + this_count <= bfs->bufsize);
536 	memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count);
537 	dbf->offset_in_buf += this_count;
538 	dbf->offset_in_uncompressed_file += this_count;
539 	size_t sub_n_read;
540 	int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read);
541 	if (r==0) {
542 	    *n_read = this_count + sub_n_read;
543 	    return 0;
544 	} else {
545 	    // The error code will have been saved.  We got some data so return that
546 	    *n_read = this_count;
547 	    return 0;
548 	}
549     } else {
550 	// There is nothing in buf[0].  So we need to swap buffers
551 	toku_mutex_lock(&bfs->mutex);
552 	while (1) {
553 	    if (dbf->second_buf_ready) {
554 		dbf->n_in_buf[0] = dbf->n_in_buf[1];
555 		{
556 		    char *tmp = dbf->buf[0];
557 		    dbf->buf[0]      = dbf->buf[1];
558 		    dbf->buf[1]      = tmp;
559 		}
560 		dbf->error_code[0] = dbf->error_code[1];
561 		dbf->second_buf_ready = false;
562 		dbf->offset_in_buf = 0;
563 		if (!dbf->io_done) {
564 		    // Don't enqueue it if the I/O is all done.
565 		    //printf("%s:%d enq [%ld]\n", __FILE__, __LINE__, dbf-&bfs->files[0]);
566 		    enq(bfs, dbf);
567 		}
568 		toku_cond_broadcast(&bfs->cond);
569 		toku_mutex_unlock(&bfs->mutex);
570 		if (dbf->error_code[0]==0) {
571 		    assert(dbf->n_in_buf[0]>0);
572 		    return dbufio_fileset_read(bfs, filenum, buf_v, count, n_read);
573 		} else {
574 		    *n_read = 0;
575 		    return dbf->error_code[0];
576 		}
577 	    } else {
578 		toku_cond_wait(&bfs->cond, &bfs->mutex);
579 	    }
580 	}
581 	assert(0); // cannot get here.
582     }
583 }
584 
585 void
dbufio_print(DBUFIO_FILESET bfs)586 dbufio_print(DBUFIO_FILESET bfs) {
587     fprintf(stderr, "%s:%d bfs=%p", __FILE__, __LINE__, bfs);
588     if (bfs->panic)
589         fprintf(stderr, " panic=%d", bfs->panic_errno);
590     fprintf(stderr, " N=%d %d %" PRIuMAX, bfs->N, bfs->n_not_done, (uintmax_t) bfs->bufsize);
591     for (int i = 0; i < bfs->N; i++) {
592         struct dbufio_file *dbf = &bfs->files[i];
593         if (dbf->error_code[0] || dbf->error_code[1])
594             fprintf(stderr, " %d=[%d,%d]", i, dbf->error_code[0], dbf->error_code[1]);
595     }
596     fprintf(stderr, "\n");
597 
598 }
599