1 /*
2 Copyright 2013-2014 EditShare, 2013-2015 Skytechnology sp. z o.o.
3
4 This file is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "metarestore/merger.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <syslog.h>
27
28 #include "protocol/MFSCommunication.h"
29 #include "common/lizardfs_error_codes.h"
30 #include "common/slogger.h"
31 #include "master/restore.h"
32
// Size in bytes of the per-file line buffer handed to fgets().
#define BSIZE 200000

/*
 * One input changelog file taking part in the merge.
 * Entries are kept in a binary min-heap ordered by nextid.
 */
struct _hentry {
	FILE *fd;         // stream returned by fopen(); NULL if the file could not be opened
	char *filename;   // strdup()'ed copy of the file path
	char *buff;       // BSIZE-byte buffer holding the line most recently read by fgets()
	char *ptr;        // end pointer stored by strtoull(): the text following the id in buff
	uint64_t nextid;  // id parsed from the line in buff; 0 means "no (more) valid entry"
};
typedef struct _hentry hentry;
42
43 static hentry *heap;
44 static uint32_t heapsize;
45 static uint64_t maxidhole;
46
// Index arithmetic for a binary heap stored in a 0-based array:
// PARENT(x) is the index of the parent of node x (x must be > 0),
// CHILD(x) is the index of the left child; the right child is CHILD(x) + 1.
#define PARENT(x) (((x) - 1) / 2)
#define CHILD(x) (2 * (x) + 1)
49
merger_heap_sort_down(void)50 void merger_heap_sort_down(void) {
51 uint32_t l,r,m;
52 uint32_t pos=0;
53 hentry x;
54 while (pos<heapsize) {
55 l = CHILD(pos);
56 r = l+1;
57 if (l>=heapsize) {
58 return;
59 }
60 m = l;
61 if (r<heapsize && heap[r].nextid < heap[l].nextid) {
62 m = r;
63 }
64 if (heap[pos].nextid <= heap[m].nextid) {
65 return;
66 }
67 x = heap[pos];
68 heap[pos] = heap[m];
69 heap[m] = x;
70 pos = m;
71 }
72 }
73
merger_heap_sort_up(void)74 void merger_heap_sort_up(void) {
75 uint32_t pos=heapsize-1;
76 uint32_t p;
77 hentry x;
78 while (pos>0) {
79 p = PARENT(pos);
80 if (heap[pos].nextid >= heap[p].nextid) {
81 return;
82 }
83 x = heap[pos];
84 heap[pos] = heap[p];
85 heap[p] = x;
86 pos = p;
87 }
88 }
89
90
merger_nextentry(uint32_t pos)91 void merger_nextentry(uint32_t pos) {
92 if (fgets(heap[pos].buff,BSIZE,heap[pos].fd)) {
93 uint64_t nextid = strtoull(heap[pos].buff,&(heap[pos].ptr),10);
94 if (heap[pos].nextid==0 || (nextid>heap[pos].nextid && nextid<heap[pos].nextid+maxidhole)) {
95 heap[pos].nextid = nextid;
96 } else {
97 lzfs_pretty_syslog(LOG_ERR, "found garbage at the end of file: %s (last correct id: %" PRIu64 ")",
98 heap[pos].filename, heap[pos].nextid);
99 heap[pos].nextid = 0;
100 }
101 } else {
102 heap[pos].nextid = 0;
103 }
104 }
105
merger_delete_entry(void)106 void merger_delete_entry(void) {
107 if (heap[heapsize].fd) {
108 fclose(heap[heapsize].fd);
109 }
110 if (heap[heapsize].filename) {
111 free(heap[heapsize].filename);
112 }
113 if (heap[heapsize].buff) {
114 free(heap[heapsize].buff);
115 }
116 }
117
merger_new_entry(const char * filename)118 void merger_new_entry(const char *filename) {
119 // printf("add file: %s\n",filename);
120 if ((heap[heapsize].fd = fopen(filename,"r"))!=NULL) {
121 heap[heapsize].filename = strdup(filename);
122 heap[heapsize].buff = (char*) malloc(BSIZE);
123 heap[heapsize].ptr = NULL;
124 heap[heapsize].nextid = 0;
125 merger_nextentry(heapsize);
126 } else {
127 lzfs_pretty_syslog(LOG_ERR, "can't open changelog file: %s", filename);
128 heap[heapsize].filename = NULL;
129 heap[heapsize].buff = NULL;
130 heap[heapsize].ptr = NULL;
131 heap[heapsize].nextid = 0;
132 }
133 }
134
merger_start(const std::vector<std::string> & filenames,uint64_t maxhole)135 int merger_start(const std::vector<std::string>& filenames, uint64_t maxhole) {
136 heapsize = 0;
137 heap = (hentry*)malloc(sizeof(hentry)*filenames.size());
138 if (heap==NULL) {
139 return -1;
140 }
141 for (const auto& filename : filenames) {
142 merger_new_entry(filename.c_str());
143 if (heap[heapsize].nextid==0) {
144 merger_delete_entry();
145 } else {
146 heapsize++;
147 merger_heap_sort_up();
148 }
149 }
150 maxidhole = maxhole;
151 return 0;
152 }
153
merger_loop(void)154 uint8_t merger_loop(void) {
155 uint8_t status;
156 hentry h;
157
158 while (heapsize) {
159 // lzfs_pretty_syslog(LOG_DEBUG, "current id: %" PRIu64 " / %s",heap[0].nextid,heap[0].ptr);
160 if ((status=restore(heap[0].filename, heap[0].nextid, heap[0].ptr,
161 RestoreRigor::kIgnoreParseErrors)) != LIZARDFS_STATUS_OK) {
162 while (heapsize) {
163 heapsize--;
164 merger_delete_entry();
165 }
166 return status;
167 }
168 merger_nextentry(0);
169 if (heap[0].nextid==0) {
170 heapsize--;
171 h = heap[0];
172 heap[0] = heap[heapsize];
173 heap[heapsize] = h;
174 merger_delete_entry();
175 }
176 merger_heap_sort_down();
177 }
178 return 0;
179 }
180