1 /*
2    Copyright 2013-2014 EditShare, 2013-2015 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 #include "metarestore/merger.h"
21 
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <syslog.h>
27 
28 #include "protocol/MFSCommunication.h"
29 #include "common/lizardfs_error_codes.h"
30 #include "common/slogger.h"
31 #include "master/restore.h"
32 
// Size in bytes of the per-file line buffer passed to fgets().
#define BSIZE 200000

// State of one changelog file taking part in the merge.
typedef struct _hentry {
	FILE *fd;         // stream obtained from fopen(); NULL if the file could not be opened
	char *filename;   // strdup()'ed copy of the file name (used in log messages and by restore())
	char *buff;       // BSIZE-byte buffer holding the line most recently read from fd
	char *ptr;        // points into buff, just past the numeric id parsed by strtoull()
	uint64_t nextid;  // id parsed from the current line; 0 means "no valid entry available"
} hentry;

// Array with one slot per input file, kept as a binary min-heap ordered by nextid.
static hentry *heap;
// Number of live entries currently stored in heap.
static uint32_t heapsize;
// Exclusive upper bound on the gap between two consecutive ids read from the same file.
static uint64_t maxidhole;

// Binary-heap index arithmetic: PARENT(x) is the parent of node x,
// CHILD(x) is its left child (the right child is CHILD(x)+1).
#define PARENT(x) (((x)-1)/2)
#define CHILD(x) (((x)*2)+1)
49 
merger_heap_sort_down(void)50 void merger_heap_sort_down(void) {
51 	uint32_t l,r,m;
52 	uint32_t pos=0;
53 	hentry x;
54 	while (pos<heapsize) {
55 		l = CHILD(pos);
56 		r = l+1;
57 		if (l>=heapsize) {
58 			return;
59 		}
60 		m = l;
61 		if (r<heapsize && heap[r].nextid < heap[l].nextid) {
62 			m = r;
63 		}
64 		if (heap[pos].nextid <= heap[m].nextid) {
65 			return;
66 		}
67 		x = heap[pos];
68 		heap[pos] = heap[m];
69 		heap[m] = x;
70 		pos = m;
71 	}
72 }
73 
merger_heap_sort_up(void)74 void merger_heap_sort_up(void) {
75 	uint32_t pos=heapsize-1;
76 	uint32_t p;
77 	hentry x;
78 	while (pos>0) {
79 		p = PARENT(pos);
80 		if (heap[pos].nextid >= heap[p].nextid) {
81 			return;
82 		}
83 		x = heap[pos];
84 		heap[pos] = heap[p];
85 		heap[p] = x;
86 		pos = p;
87 	}
88 }
89 
90 
merger_nextentry(uint32_t pos)91 void merger_nextentry(uint32_t pos) {
92 	if (fgets(heap[pos].buff,BSIZE,heap[pos].fd)) {
93 		uint64_t nextid = strtoull(heap[pos].buff,&(heap[pos].ptr),10);
94 		if (heap[pos].nextid==0 || (nextid>heap[pos].nextid && nextid<heap[pos].nextid+maxidhole)) {
95 			heap[pos].nextid = nextid;
96 		} else {
97 			lzfs_pretty_syslog(LOG_ERR, "found garbage at the end of file: %s (last correct id: %" PRIu64 ")",
98 					heap[pos].filename, heap[pos].nextid);
99 			heap[pos].nextid = 0;
100 		}
101 	} else {
102 		heap[pos].nextid = 0;
103 	}
104 }
105 
merger_delete_entry(void)106 void merger_delete_entry(void) {
107 	if (heap[heapsize].fd) {
108 		fclose(heap[heapsize].fd);
109 	}
110 	if (heap[heapsize].filename) {
111 		free(heap[heapsize].filename);
112 	}
113 	if (heap[heapsize].buff) {
114 		free(heap[heapsize].buff);
115 	}
116 }
117 
merger_new_entry(const char * filename)118 void merger_new_entry(const char *filename) {
119 	// printf("add file: %s\n",filename);
120 	if ((heap[heapsize].fd = fopen(filename,"r"))!=NULL) {
121 		heap[heapsize].filename = strdup(filename);
122 		heap[heapsize].buff = (char*) malloc(BSIZE);
123 		heap[heapsize].ptr = NULL;
124 		heap[heapsize].nextid = 0;
125 		merger_nextentry(heapsize);
126 	} else {
127 		lzfs_pretty_syslog(LOG_ERR, "can't open changelog file: %s", filename);
128 		heap[heapsize].filename = NULL;
129 		heap[heapsize].buff = NULL;
130 		heap[heapsize].ptr = NULL;
131 		heap[heapsize].nextid = 0;
132 	}
133 }
134 
merger_start(const std::vector<std::string> & filenames,uint64_t maxhole)135 int merger_start(const std::vector<std::string>& filenames, uint64_t maxhole) {
136 	heapsize = 0;
137 	heap = (hentry*)malloc(sizeof(hentry)*filenames.size());
138 	if (heap==NULL) {
139 		return -1;
140 	}
141 	for (const auto& filename : filenames) {
142 		merger_new_entry(filename.c_str());
143 		if (heap[heapsize].nextid==0) {
144 			merger_delete_entry();
145 		} else {
146 			heapsize++;
147 			merger_heap_sort_up();
148 		}
149 	}
150 	maxidhole = maxhole;
151 	return 0;
152 }
153 
merger_loop(void)154 uint8_t merger_loop(void) {
155 	uint8_t status;
156 	hentry h;
157 
158 	while (heapsize) {
159 //              lzfs_pretty_syslog(LOG_DEBUG, "current id: %" PRIu64 " / %s",heap[0].nextid,heap[0].ptr);
160 		if ((status=restore(heap[0].filename, heap[0].nextid, heap[0].ptr,
161 				RestoreRigor::kIgnoreParseErrors)) != LIZARDFS_STATUS_OK) {
162 			while (heapsize) {
163 				heapsize--;
164 				merger_delete_entry();
165 			}
166 			return status;
167 		}
168 		merger_nextentry(0);
169 		if (heap[0].nextid==0) {
170 			heapsize--;
171 			h = heap[0];
172 			heap[0] = heap[heapsize];
173 			heap[heapsize] = h;
174 			merger_delete_entry();
175 		}
176 		merger_heap_sort_down();
177 	}
178 	return 0;
179 }
180