1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <inttypes.h>
25
26 #include "slogger.h"
27 #include "sharedpointer.h"
28 #include "restore.h"
29 #include "clocks.h"
30
31 #define BSIZE 200000
32
33 typedef struct _hentry {
34 FILE *fd;
35 void *shfilename;
36 char *buff;
37 char *ptr;
38 int64_t nextid;
39 } hentry;
40
41 static hentry *heap;
42 static uint32_t heapsize;
43 static int64_t maxidhole;
44 static uint64_t firstlv,lastlv;
45
46 #define PARENT(x) (((x)-1)/2)
47 #define CHILD(x) (((x)*2)+1)
48
merger_heap_sort_down(void)49 void merger_heap_sort_down(void) {
50 uint32_t l,r,m;
51 uint32_t pos=0;
52 hentry x;
53 while (pos<heapsize) {
54 l = CHILD(pos);
55 r = l+1;
56 if (l>=heapsize) {
57 return;
58 }
59 m = l;
60 if (r<heapsize && heap[r].nextid < heap[l].nextid) {
61 m = r;
62 }
63 if (heap[pos].nextid <= heap[m].nextid) {
64 return;
65 }
66 x = heap[pos];
67 heap[pos] = heap[m];
68 heap[m] = x;
69 pos = m;
70 }
71 }
72
merger_heap_sort_up(void)73 void merger_heap_sort_up(void) {
74 uint32_t pos=heapsize-1;
75 uint32_t p;
76 hentry x;
77 while (pos>0) {
78 p = PARENT(pos);
79 if (heap[pos].nextid >= heap[p].nextid) {
80 return;
81 }
82 x = heap[pos];
83 heap[pos] = heap[p];
84 heap[p] = x;
85 pos = p;
86 }
87 }
88
89
merger_nextentry(uint32_t pos)90 void merger_nextentry(uint32_t pos) {
91 if (fgets(heap[pos].buff,BSIZE,heap[pos].fd)) {
92 int64_t nextid = strtoll(heap[pos].buff,&(heap[pos].ptr),10);
93 if (heap[pos].ptr[0]==':' && heap[pos].ptr[1]==' ') {
94 heap[pos].ptr += 2;
95 }
96 if (heap[pos].nextid<0 || (nextid>heap[pos].nextid && nextid<heap[pos].nextid+maxidhole)) {
97 heap[pos].nextid = nextid;
98 } else {
99 mfs_arg_syslog(LOG_WARNING,"found garbage at the end of file: %s (last correct id: %"PRIu64")\n",(char*)shp_get(heap[pos].shfilename),heap[pos].nextid);
100 heap[pos].nextid = INT64_C(-1);
101 }
102 } else {
103 heap[pos].nextid = INT64_C(-1);
104 }
105 }
106
merger_delete_entry(void)107 void merger_delete_entry(void) {
108 if (heap[heapsize].fd) {
109 fclose(heap[heapsize].fd);
110 }
111 if (heap[heapsize].shfilename!=NULL) {
112 shp_dec(heap[heapsize].shfilename);
113 }
114 if (heap[heapsize].buff) {
115 free(heap[heapsize].buff);
116 }
117 }
118
merger_new_entry(const char * filename)119 void merger_new_entry(const char *filename) {
120 // printf("add file: %s\n",filename);
121 if ((heap[heapsize].fd = fopen(filename,"r"))!=NULL) {
122 heap[heapsize].shfilename = shp_new(strdup(filename),free);
123 heap[heapsize].buff = malloc(BSIZE);
124 heap[heapsize].ptr = NULL;
125 heap[heapsize].nextid = INT64_C(-1);
126 merger_nextentry(heapsize);
127 } else {
128 mfs_arg_syslog(LOG_WARNING,"can't open changelog file: %s\n",filename);
129 heap[heapsize].shfilename = NULL;
130 heap[heapsize].buff = NULL;
131 heap[heapsize].ptr = NULL;
132 heap[heapsize].nextid = INT64_C(-1);
133 }
134 }
135
merger_start(uint32_t files,char ** filenames,uint64_t maxhole,uint64_t minid,uint64_t maxid)136 int merger_start(uint32_t files,char **filenames,uint64_t maxhole,uint64_t minid,uint64_t maxid) {
137 uint32_t i;
138 heapsize = 0;
139 heap = (hentry*)malloc(sizeof(hentry)*files);
140 if (heap==NULL) {
141 return -1;
142 }
143 for (i=0 ; i<files ; i++) {
144 merger_new_entry(filenames[i]);
145 // printf("file: %s / firstid: %"PRIu64"\n",filenames[i],heap[heapsize].nextid);
146 if (heap[heapsize].nextid<0) {
147 merger_delete_entry();
148 } else {
149 heapsize++;
150 merger_heap_sort_up();
151 }
152 }
153 maxidhole = maxhole;
154 firstlv = minid;
155 lastlv = maxid;
156 // for (i=0 ; i<heapsize ; i++) {
157 // printf("heap: %u / firstid: %"PRIu64"\n",i,heap[i].nextid);
158 // }
159 return 0;
160 }
161
merger_loop(uint8_t verblevel)162 int merger_loop(uint8_t verblevel) {
163 int status;
164 uint8_t perc,etaok;
165 uint32_t eta;
166 uint64_t st,cu;
167 hentry h;
168
169 //perc = 0;
170 //eta = 0;
171 //etaok = 0;
172 st = monotonic_useconds();
173 while (heapsize) {
174 if ((heap[0].nextid%2497)==0 && lastlv>firstlv) {
175 if (heap[0].nextid<(int64_t)firstlv) {
176 perc = 0;
177 eta = 0;
178 etaok = 0;
179 st = monotonic_useconds();
180 } else if (heap[0].nextid>(int64_t)lastlv) {
181 perc = 100;
182 eta = 0;
183 etaok = 1;
184 } else {
185 cu = monotonic_useconds();
186 perc = (heap[0].nextid - firstlv) * 100 / (lastlv - firstlv);
187 eta = ((lastlv - heap[0].nextid) * (cu - st) / (heap[0].nextid - firstlv)) / 1000000U;
188 etaok = 1;
189 }
190 printf("progress: current change: %"PRIu64" (first:%"PRIu64" - last:%"PRIu64" - %"PRIu8"%%",heap[0].nextid,firstlv,lastlv,perc);
191 if (etaok) {
192 printf(" - ETA:%02u:%02us)\r",(unsigned int)(eta/60),(unsigned int)(eta%60));
193 } else {
194 printf(" - ETA:__:__s)\r");
195 }
196 fflush(stdout);
197 }
198 // printf("current id: %"PRIu64" / %s\n",heap[0].nextid,heap[0].ptr);
199 if ((status=restore_file(heap[0].shfilename,heap[0].nextid,heap[0].ptr,verblevel))<0) {
200 while (heapsize) {
201 heapsize--;
202 merger_delete_entry();
203 }
204 printf("\n");
205 return status;
206 }
207 merger_nextentry(0);
208 if (heap[0].nextid<0) {
209 heapsize--;
210 h = heap[0];
211 heap[0] = heap[heapsize];
212 heap[heapsize] = h;
213 merger_delete_entry();
214 }
215 merger_heap_sort_down();
216 }
217 printf("progress: current change: %"PRIu64" (first:%"PRIu64" - last:%"PRIu64" - 100%% - ETA:finished)\n",lastlv,firstlv,lastlv);
218 return 0;
219 }
220