1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <inttypes.h>
25 
26 #include "slogger.h"
27 #include "sharedpointer.h"
28 #include "restore.h"
29 #include "clocks.h"
30 
/* Size in bytes of the line buffer allocated for every open changelog file. */
#define BSIZE 200000

/*
 * One open changelog file taking part in the merge.
 * (The struct tag deliberately has no leading underscore: such names are
 * reserved for the implementation at file scope.)
 */
typedef struct hentry {
	FILE *fd;		// open stream of the changelog file (NULL if it could not be opened)
	void *shfilename;	// shared pointer (see sharedpointer.h) holding a copy of the file name
	char *buff;		// line buffer of BSIZE bytes
	char *ptr;		// points into buff, just past the leading id (and the ": " separator if present)
	int64_t nextid;		// id parsed from the current line; negative means "no usable entry"
} hentry;
40 
41 static hentry *heap;
42 static uint32_t heapsize;
43 static int64_t maxidhole;
44 static uint64_t firstlv,lastlv;
45 
/* Index arithmetic for a binary heap kept in an array with its root at index 0. */
#define CHILD(x) ((2*(x))+1)	// left child; the right child is CHILD(x)+1
#define PARENT(x) (((x)-1)/2)
48 
merger_heap_sort_down(void)49 void merger_heap_sort_down(void) {
50 	uint32_t l,r,m;
51 	uint32_t pos=0;
52 	hentry x;
53 	while (pos<heapsize) {
54 		l = CHILD(pos);
55 		r = l+1;
56 		if (l>=heapsize) {
57 			return;
58 		}
59 		m = l;
60 		if (r<heapsize && heap[r].nextid < heap[l].nextid) {
61 			m = r;
62 		}
63 		if (heap[pos].nextid <= heap[m].nextid) {
64 			return;
65 		}
66 		x = heap[pos];
67 		heap[pos] = heap[m];
68 		heap[m] = x;
69 		pos = m;
70 	}
71 }
72 
merger_heap_sort_up(void)73 void merger_heap_sort_up(void) {
74 	uint32_t pos=heapsize-1;
75 	uint32_t p;
76 	hentry x;
77 	while (pos>0) {
78 		p = PARENT(pos);
79 		if (heap[pos].nextid >= heap[p].nextid) {
80 			return;
81 		}
82 		x = heap[pos];
83 		heap[pos] = heap[p];
84 		heap[p] = x;
85 		pos = p;
86 	}
87 }
88 
89 
merger_nextentry(uint32_t pos)90 void merger_nextentry(uint32_t pos) {
91 	if (fgets(heap[pos].buff,BSIZE,heap[pos].fd)) {
92 		int64_t nextid = strtoll(heap[pos].buff,&(heap[pos].ptr),10);
93 		if (heap[pos].ptr[0]==':' && heap[pos].ptr[1]==' ') {
94 			heap[pos].ptr += 2;
95 		}
96 		if (heap[pos].nextid<0 || (nextid>heap[pos].nextid && nextid<heap[pos].nextid+maxidhole)) {
97 			heap[pos].nextid = nextid;
98 		} else {
99 			mfs_arg_syslog(LOG_WARNING,"found garbage at the end of file: %s (last correct id: %"PRIu64")\n",(char*)shp_get(heap[pos].shfilename),heap[pos].nextid);
100 			heap[pos].nextid = INT64_C(-1);
101 		}
102 	} else {
103 		heap[pos].nextid = INT64_C(-1);
104 	}
105 }
106 
merger_delete_entry(void)107 void merger_delete_entry(void) {
108 	if (heap[heapsize].fd) {
109 		fclose(heap[heapsize].fd);
110 	}
111 	if (heap[heapsize].shfilename!=NULL) {
112 		shp_dec(heap[heapsize].shfilename);
113 	}
114 	if (heap[heapsize].buff) {
115 		free(heap[heapsize].buff);
116 	}
117 }
118 
merger_new_entry(const char * filename)119 void merger_new_entry(const char *filename) {
120 	// printf("add file: %s\n",filename);
121 	if ((heap[heapsize].fd = fopen(filename,"r"))!=NULL) {
122 		heap[heapsize].shfilename = shp_new(strdup(filename),free);
123 		heap[heapsize].buff = malloc(BSIZE);
124 		heap[heapsize].ptr = NULL;
125 		heap[heapsize].nextid = INT64_C(-1);
126 		merger_nextentry(heapsize);
127 	} else {
128 		mfs_arg_syslog(LOG_WARNING,"can't open changelog file: %s\n",filename);
129 		heap[heapsize].shfilename = NULL;
130 		heap[heapsize].buff = NULL;
131 		heap[heapsize].ptr = NULL;
132 		heap[heapsize].nextid = INT64_C(-1);
133 	}
134 }
135 
merger_start(uint32_t files,char ** filenames,uint64_t maxhole,uint64_t minid,uint64_t maxid)136 int merger_start(uint32_t files,char **filenames,uint64_t maxhole,uint64_t minid,uint64_t maxid) {
137 	uint32_t i;
138 	heapsize = 0;
139 	heap = (hentry*)malloc(sizeof(hentry)*files);
140 	if (heap==NULL) {
141 		return -1;
142 	}
143 	for (i=0 ; i<files ; i++) {
144 		merger_new_entry(filenames[i]);
145 //		printf("file: %s / firstid: %"PRIu64"\n",filenames[i],heap[heapsize].nextid);
146 		if (heap[heapsize].nextid<0) {
147 			merger_delete_entry();
148 		} else {
149 			heapsize++;
150 			merger_heap_sort_up();
151 		}
152 	}
153 	maxidhole = maxhole;
154 	firstlv = minid;
155 	lastlv = maxid;
156 //	for (i=0 ; i<heapsize ; i++) {
157 //		printf("heap: %u / firstid: %"PRIu64"\n",i,heap[i].nextid);
158 //	}
159 	return 0;
160 }
161 
merger_loop(uint8_t verblevel)162 int merger_loop(uint8_t verblevel) {
163 	int status;
164 	uint8_t perc,etaok;
165 	uint32_t eta;
166 	uint64_t st,cu;
167 	hentry h;
168 
169 	//perc = 0;
170 	//eta = 0;
171 	//etaok = 0;
172 	st = monotonic_useconds();
173 	while (heapsize) {
174 		if ((heap[0].nextid%2497)==0 && lastlv>firstlv) {
175 			if (heap[0].nextid<(int64_t)firstlv) {
176 				perc = 0;
177 				eta = 0;
178 				etaok = 0;
179 				st = monotonic_useconds();
180 			} else if (heap[0].nextid>(int64_t)lastlv) {
181 				perc = 100;
182 				eta = 0;
183 				etaok = 1;
184 			} else {
185 				cu = monotonic_useconds();
186 				perc = (heap[0].nextid - firstlv) * 100 / (lastlv - firstlv);
187 				eta = ((lastlv - heap[0].nextid) * (cu - st) / (heap[0].nextid - firstlv)) / 1000000U;
188 				etaok = 1;
189 			}
190 			printf("progress: current change: %"PRIu64" (first:%"PRIu64" - last:%"PRIu64" - %"PRIu8"%%",heap[0].nextid,firstlv,lastlv,perc);
191 			if (etaok) {
192 				printf(" - ETA:%02u:%02us)\r",(unsigned int)(eta/60),(unsigned int)(eta%60));
193 			} else {
194 				printf(" - ETA:__:__s)\r");
195 			}
196 			fflush(stdout);
197 		}
198 //		printf("current id: %"PRIu64" / %s\n",heap[0].nextid,heap[0].ptr);
199 		if ((status=restore_file(heap[0].shfilename,heap[0].nextid,heap[0].ptr,verblevel))<0) {
200 			while (heapsize) {
201 				heapsize--;
202 				merger_delete_entry();
203 			}
204 			printf("\n");
205 			return status;
206 		}
207 		merger_nextentry(0);
208 		if (heap[0].nextid<0) {
209 			heapsize--;
210 			h = heap[0];
211 			heap[0] = heap[heapsize];
212 			heap[heapsize] = h;
213 			merger_delete_entry();
214 		}
215 		merger_heap_sort_down();
216 	}
217 	printf("progress: current change: %"PRIu64" (first:%"PRIu64" - last:%"PRIu64" - 100%% - ETA:finished)\n",lastlv,firstlv,lastlv);
218 	return 0;
219 }
220