1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdio.h>
26 #include <stdarg.h>
27 #include <syslog.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <sys/stat.h>
33
34 #include "changelog.h"
35 #include "metadata.h"
36 #include "massert.h"
37 #include "bgsaver.h"
38 #include "main.h"
39 #include "slogger.h"
40 #include "matomlserv.h"
41 #include "cfg.h"
42
43 #define MAXLOGLINESIZE 200000U
44 #define MAXLOGNUMBER 1000U
45 static uint32_t BackLogsNumber;
46 static FILE *currentfd;
47
48
49 #define OLD_CHANGES_BLOCK_SIZE 5000
50
51 typedef struct old_changes_entry {
52 uint64_t version;
53 uint32_t length;
54 uint8_t *data;
55 } old_changes_entry;
56
57 typedef struct old_changes_block {
58 old_changes_entry old_changes_block [OLD_CHANGES_BLOCK_SIZE];
59 uint32_t entries;
60 uint32_t mintimestamp;
61 uint64_t minversion;
62 struct old_changes_block *next;
63 } old_changes_block;
64
65 static old_changes_block *old_changes_head=NULL;
66 static old_changes_block *old_changes_current=NULL;
67
68 static uint16_t ChangelogSecondsToRemember;
69
70 static uint8_t ChangelogSaveMode;
71
72 #define SAVEMODE_BACKGROUND 0
73 #define SAVEMODE_ASYNC 1
74 #define SAVEMODE_SYNC 2
75
changelog_old_changes_free_block(old_changes_block * oc)76 static inline void changelog_old_changes_free_block(old_changes_block *oc) {
77 uint32_t i;
78 for (i=0 ; i<oc->entries ; i++) {
79 free(oc->old_changes_block[i].data);
80 }
81 free(oc);
82 }
83
changelog_store_logstring(uint64_t version,uint8_t * logstr,uint32_t logstrsize)84 static inline void changelog_store_logstring(uint64_t version,uint8_t *logstr,uint32_t logstrsize) {
85 old_changes_block *oc;
86 old_changes_entry *oce;
87 uint32_t ts;
88
89 matomlserv_broadcast_logstring(version,logstr,logstrsize);
90 // matomaserv_broadcast_logstring(version,(uint8_t*)printbuff,leng);
91
92 if (ChangelogSecondsToRemember==0) {
93 while (old_changes_head) {
94 oc = old_changes_head->next;
95 changelog_old_changes_free_block(old_changes_head);
96 old_changes_head = oc;
97 }
98 return;
99 }
100 if (old_changes_current==NULL || old_changes_head==NULL || old_changes_current->entries>=OLD_CHANGES_BLOCK_SIZE) {
101 oc = malloc(sizeof(old_changes_block));
102 passert(oc);
103 ts = main_time();
104 oc->entries = 0;
105 oc->minversion = version;
106 oc->mintimestamp = ts;
107 oc->next = NULL;
108 if (old_changes_current==NULL || old_changes_head==NULL) {
109 old_changes_head = old_changes_current = oc;
110 } else {
111 old_changes_current->next = oc;
112 old_changes_current = oc;
113 }
114 while (old_changes_head && old_changes_head->next && old_changes_head->next->mintimestamp+ChangelogSecondsToRemember<ts) {
115 oc = old_changes_head->next;
116 changelog_old_changes_free_block(old_changes_head);
117 old_changes_head = oc;
118 }
119 }
120 oc = old_changes_current;
121 oce = oc->old_changes_block + oc->entries;
122 oce->version = version;
123 oce->length = logstrsize;
124 oce->data = malloc(logstrsize);
125 passert(oce->data);
126 memcpy(oce->data,logstr,logstrsize);
127 oc->entries++;
128 }
129
changelog_get_old_changes(uint64_t version,void (* sendfn)(void *,uint64_t,uint8_t *,uint32_t),void * userdata,uint32_t limit)130 uint32_t changelog_get_old_changes(uint64_t version,void (*sendfn)(void *,uint64_t,uint8_t *,uint32_t),void *userdata,uint32_t limit) {
131 old_changes_block *oc;
132 old_changes_entry *oce;
133 uint8_t start=0;
134 uint32_t i,j;
135
136 j=0;
137 for (oc=old_changes_head ; oc ; oc=oc->next) {
138 if (oc->minversion<=version && (oc->next==NULL || oc->next->minversion>version)) {
139 start=1;
140 }
141 if (start) {
142 for (i=0 ; i<oc->entries ; i++) {
143 oce = oc->old_changes_block + i;
144 if (version<=oce->version) {
145 if (j<limit) {
146 sendfn(userdata,oce->version,oce->data,oce->length);
147 j++;
148 } else {
149 return j;
150 }
151 }
152 }
153 }
154 }
155 return j;
156 }
157
changelog_get_minversion(void)158 uint64_t changelog_get_minversion(void) {
159 if (old_changes_head==NULL) {
160 return meta_version();
161 }
162 return old_changes_head->minversion;
163 }
164
changelog_rotate()165 void changelog_rotate() {
166 if (ChangelogSaveMode==0) {
167 bgsaver_rotatelog();
168 } else {
169 char logname1[100],logname2[100];
170 uint32_t i;
171 if (currentfd) {
172 if (ChangelogSaveMode==2) {
173 fsync(fileno(currentfd));
174 }
175 fclose(currentfd);
176 currentfd=NULL;
177 }
178 if (BackLogsNumber>0) {
179 for (i=BackLogsNumber ; i>0 ; i--) {
180 snprintf(logname1,100,"changelog.%"PRIu32".mfs",i);
181 snprintf(logname2,100,"changelog.%"PRIu32".mfs",i-1);
182 rename(logname2,logname1);
183 }
184 } else {
185 unlink("changelog.0.mfs");
186 }
187 }
188 matomlserv_broadcast_logrotate();
189 }
190
changelog_mr(uint64_t version,const char * data)191 void changelog_mr(uint64_t version,const char *data) {
192 if (ChangelogSaveMode==0) {
193 bgsaver_changelog(version,data);
194 } else {
195 if (currentfd==NULL) {
196 currentfd = fopen("changelog.0.mfs","a");
197 if (!currentfd) {
198 syslog(LOG_NOTICE,"lost MFS change %"PRIu64": %s",version,data);
199 }
200 }
201
202 if (currentfd) {
203 fprintf(currentfd,"%"PRIu64": %s\n",version,data);
204 fflush(currentfd);
205 if (ChangelogSaveMode==2) {
206 fsync(fileno(currentfd));
207 }
208 }
209 }
210 }
211
changelog(const char * format,...)212 void changelog(const char *format,...) {
213 static char printbuff[MAXLOGLINESIZE];
214 va_list ap;
215 uint32_t leng;
216
217 uint64_t version = meta_version_inc();
218
219 va_start(ap,format);
220 leng = vsnprintf(printbuff,MAXLOGLINESIZE,format,ap);
221 va_end(ap);
222 if (leng>=MAXLOGLINESIZE) {
223 printbuff[MAXLOGLINESIZE-1]='\0';
224 leng=MAXLOGLINESIZE;
225 } else {
226 leng++;
227 }
228
229 changelog_mr(version,printbuff);
230 changelog_store_logstring(version,(uint8_t*)printbuff,leng);
231 }
232
changelog_generate_gids(uint32_t gids,uint32_t * gid)233 char* changelog_generate_gids(uint32_t gids,uint32_t *gid) {
234 static char *gidstr = NULL;
235 static uint32_t gidstr_size = 0;
236 uint32_t i,l;
237
238 i = ((gids/32)+1)*32;
239 i *= 11;
240 i += 10;
241 if (i>gidstr_size || gidstr==NULL) {
242 if (gidstr!=NULL) {
243 free(gidstr);
244 }
245 gidstr = malloc(i);
246 passert(gidstr);
247 gidstr_size = i;
248 }
249 l = 0;
250 gidstr[l++] = '[';
251 for (i=0 ; i<gids ; i++) {
252 if (l<gidstr_size) {
253 l += snprintf(gidstr+l,gidstr_size-l,"%"PRIu32,gid[i]);
254 }
255 if (l<gidstr_size) {
256 gidstr[l++] = (i+1<gids)?',':']';
257 }
258 }
259 if (l<gidstr_size) {
260 gidstr[l++]='\0';
261 }
262 return gidstr;
263 }
264
changelog_escape_name(uint32_t nleng,const uint8_t * name)265 char* changelog_escape_name(uint32_t nleng,const uint8_t *name) {
266 static char *escname[2]={NULL,NULL};
267 static uint32_t escnamesize[2]={0,0};
268 static uint8_t buffid=0;
269 char *currescname=NULL;
270 uint32_t i;
271 uint8_t c;
272 buffid = 1-buffid;
273 i = nleng;
274 i = i*3+1;
275 if (i>escnamesize[buffid] || i==0) {
276 escnamesize[buffid] = ((i/1000)+1)*1000;
277 if (escname[buffid]!=NULL) {
278 free(escname[buffid]);
279 }
280 escname[buffid] = malloc(escnamesize[buffid]);
281 passert(escname[buffid]);
282 }
283 i = 0;
284 currescname = escname[buffid];
285 passert(currescname);
286 while (nleng>0) {
287 c = *name;
288 if (c<32 || c>=127 || c==',' || c=='%' || c=='(' || c==')') {
289 currescname[i++]='%';
290 currescname[i++]="0123456789ABCDEF"[(c>>4)&0xF];
291 currescname[i++]="0123456789ABCDEF"[c&0xF];
292 } else {
293 currescname[i++]=c;
294 }
295 name++;
296 nleng--;
297 }
298 currescname[i]=0;
299 return currescname;
300 }
301
302
changelog_reload(void)303 void changelog_reload(void) {
304 BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
305 if (BackLogsNumber>MAXLOGNUMBER) {
306 mfs_syslog(LOG_WARNING,"BACK_LOGS value too big !!!");
307 BackLogsNumber = MAXLOGNUMBER;
308 }
309 ChangelogSecondsToRemember = cfg_getuint16("CHANGELOG_PRESERVE_SECONDS",1800);
310 if (ChangelogSecondsToRemember>15000) {
311 mfs_arg_syslog(LOG_WARNING,"Number of seconds of change logs to be preserved in master is too big (%"PRIu16") - decreasing to 15000 seconds",ChangelogSecondsToRemember);
312 ChangelogSecondsToRemember=15000;
313 }
314 ChangelogSaveMode = cfg_getuint8("CHANGELOG_SAVE_MODE",0);
315 if (ChangelogSaveMode>2) {
316 mfs_syslog(LOG_WARNING,"CHANGELOG_SAVE_MODE - wrong value - using 0 (write in background)");
317 ChangelogSaveMode = 0;
318 }
319 }
320
changelog_init(void)321 int changelog_init(void) {
322 changelog_reload();
323 main_reload_register(changelog_reload);
324 currentfd = NULL;
325 return 0;
326 }
327
changelog_findfirstversion(const char * fname)328 uint64_t changelog_findfirstversion(const char *fname) {
329 uint8_t buff[50];
330 int32_t s,p;
331 uint64_t fv;
332 int fd;
333
334 fd = open(fname,O_RDONLY);
335 if (fd<0) {
336 return 0;
337 }
338 s = read(fd,buff,50);
339 close(fd);
340 if (s<=0) {
341 return 0;
342 }
343 fv = 0;
344 p = 0;
345 while (p<s && buff[p]>='0' && buff[p]<='9') {
346 fv *= 10;
347 fv += buff[p]-'0';
348 p++;
349 }
350 if (p>=s || buff[p]!=':') {
351 return 0;
352 }
353 return fv;
354 }
355
changelog_findlastversion(const char * fname)356 uint64_t changelog_findlastversion(const char *fname) {
357 struct stat st;
358 uint8_t buff[32800]; // 32800 = 32768 + 32
359 uint64_t size;
360 uint32_t buffpos;
361 uint64_t lastnewline,lv;
362 int fd;
363
364 fd = open(fname,O_RDONLY);
365 if (fd<0) {
366 return 0;
367 }
368 fstat(fd,&st);
369 size = st.st_size;
370 memset(buff,0,32);
371 lastnewline = 0;
372 while (size>0 && size+200000>(uint64_t)(st.st_size)) {
373 if (size>32768) {
374 memcpy(buff+32768,buff,32);
375 size-=32768;
376 lseek(fd,size,SEEK_SET);
377 if (read(fd,buff,32768)!=32768) {
378 close(fd);
379 return 0;
380 }
381 buffpos = 32768;
382 } else {
383 memmove(buff+size,buff,32);
384 lseek(fd,0,SEEK_SET);
385 if (read(fd,buff,size)!=(ssize_t)size) {
386 close(fd);
387 return 0;
388 }
389 buffpos = size;
390 size = 0;
391 }
392 // size = position in file of first byte in buff
393 // buffpos = position of last byte in buff to search
394 while (buffpos>0) {
395 buffpos--;
396 if (buff[buffpos]=='\n' || (size + buffpos)==0) {
397 if (lastnewline==0) {
398 lastnewline = size + buffpos;
399 } else {
400 if (lastnewline+1 != (uint64_t)(st.st_size)) { // garbage at the end of file
401 close(fd);
402 return 0;
403 }
404 if ((size + buffpos)>0) {
405 buffpos++;
406 }
407 lv = 0;
408 while (buffpos<32800 && buff[buffpos]>='0' && buff[buffpos]<='9') {
409 lv *= 10;
410 lv += buff[buffpos]-'0';
411 buffpos++;
412 }
413 if (buffpos==32800 || buff[buffpos]!=':') {
414 lv = 0;
415 }
416 close(fd);
417 return lv;
418 }
419 }
420 }
421 }
422 close(fd);
423 return 0;
424 }
425
changelog_checkname(const char * fname)426 int changelog_checkname(const char *fname) {
427 const char *ptr = fname;
428 if (strncmp(ptr,"changelog.",10)==0) {
429 ptr+=10;
430 if (*ptr>='0' && *ptr<='9') {
431 while (*ptr>='0' && *ptr<='9') {
432 ptr++;
433 }
434 if (strcmp(ptr,".mfs")==0) {
435 return 1;
436 }
437 }
438 } else if (strncmp(ptr,"changelog_ml.",13)==0) {
439 ptr+=13;
440 if (*ptr>='0' && *ptr<='9') {
441 while (*ptr>='0' && *ptr<='9') {
442 ptr++;
443 }
444 if (strcmp(ptr,".mfs")==0) {
445 return 1;
446 }
447 }
448 } else if (strncmp(ptr,"changelog_ml_back.",18)==0) {
449 ptr+=18;
450 if (*ptr>='0' && *ptr<='9') {
451 while (*ptr>='0' && *ptr<='9') {
452 ptr++;
453 }
454 if (strcmp(ptr,".mfs")==0) {
455 return 1;
456 }
457 }
458 }
459 return 0;
460 }
461