1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <stdarg.h>
29 #include <time.h>
30 #include <sys/time.h>
31 #include <errno.h>
32 #include <pthread.h>
33 
34 #include "oplog.h"
35 
/* Size in bytes of the circular operation-log buffer (opbuff). */
#define OPBUFFSIZE 0x1000000
/* Size of the on-stack buffer used to format a single log line (trailing newline included). */
#define LINELENG 1000
/* How far behind writepos (in bytes) a "history" handle starts reading; smaller than OPBUFFSIZE. */
#define MAXHISTORYSIZE 0xF00000
39 
/*
 * One open reader of the operation log, kept on the singly linked list
 * that starts at fhhead.
 */
typedef struct _fhentry {
	unsigned long fh;	// handle id assigned from nextfh in oplog_newhandle
	uint64_t readpos;	// absolute stream offset (same counter as writepos) of the next byte to hand out
	uint32_t refcount;	// 1 on creation, +1 in oplog_getdata, -1 in oplog_releasedata/oplog_releasehandle; entry is freed at 0
//	uint8_t dotsent;
	struct _fhentry *next;	// next entry on the list, NULL at the end
} fhentry;
47 
/* Handle bookkeeping: next id to hand out and the list of open handles. */
static unsigned long nextfh=1;
static fhentry *fhhead=NULL;

/*
 * Circular log buffer. writepos is the total number of bytes ever appended
 * (it only grows); the position inside opbuff is writepos%OPBUFFSIZE.
 * waiting is set by oplog_getdata before it waits on nodata and cleared by
 * oplog_put after it broadcasts. The functions in this file access these
 * variables, and the handle list above, with opbufflock held.
 */
static uint8_t opbuff[OPBUFFSIZE];
static uint64_t writepos=0;
static uint8_t waiting=0;
static pthread_mutex_t opbufflock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t nodata = PTHREAD_COND_INITIALIZER;

/*
 * Cached time conversion: convts is the start of the current 900-second
 * window and convtm is localtime_r(convts). Both are guarded by timelock.
 */
static time_t convts=0;
static struct tm convtm;
static pthread_mutex_t timelock = PTHREAD_MUTEX_INITIALIZER;
//static pthread_mutex_t bufflock = PTHREAD_MUTEX_INITIALIZER;
61 
oplog_put(uint8_t * buff,uint32_t leng)62 static inline void oplog_put(uint8_t *buff,uint32_t leng) {
63 	uint32_t bpos;
64 	if (leng>OPBUFFSIZE) {	// just in case
65 		buff+=leng-OPBUFFSIZE;
66 		leng=OPBUFFSIZE;
67 	}
68 	pthread_mutex_lock(&opbufflock);
69 	bpos = writepos%OPBUFFSIZE;
70 	writepos+=leng;
71 	if (bpos+leng>OPBUFFSIZE) {
72 		memcpy(opbuff+bpos,buff,OPBUFFSIZE-bpos);
73 		buff+=OPBUFFSIZE-bpos;
74 		leng-=OPBUFFSIZE-bpos;
75 		bpos = 0;
76 	}
77 	memcpy(opbuff+bpos,buff,leng);
78 	if (waiting) {
79 		pthread_cond_broadcast(&nodata);
80 		waiting=0;
81 	}
82 	pthread_mutex_unlock(&opbufflock);
83 }
84 
oplog_printf(const struct fuse_ctx * ctx,const char * format,...)85 void oplog_printf(const struct fuse_ctx *ctx,const char *format,...) {
86 	va_list ap;
87 	char buff[LINELENG];
88 	uint32_t leng;
89 	struct timeval tv;
90 	struct tm ltime;
91 
92 	pthread_mutex_lock(&timelock);
93 	gettimeofday(&tv,NULL);
94 	if (convts/900!=tv.tv_sec/900) {
95 		convts=tv.tv_sec/900;
96 		convts*=900;
97 		localtime_r(&convts,&convtm);
98 	}
99 	ltime = convtm;
100 	leng = tv.tv_sec - convts;
101 	ltime.tm_sec += leng%60;
102 	ltime.tm_min += leng/60;
103 	pthread_mutex_unlock(&timelock);
104 //	pthread_mutex_lock(&bufflock);
105 	leng = snprintf(buff,LINELENG,"%02u.%02u %02u:%02u:%02u.%06u: uid:%u gid:%u pid:%u cmd:",ltime.tm_mon+1,ltime.tm_mday,ltime.tm_hour,ltime.tm_min,ltime.tm_sec,(unsigned)(tv.tv_usec),(unsigned)(ctx->uid),(unsigned)(ctx->gid),(unsigned)(ctx->pid));
106 	if (leng<LINELENG) {
107 		va_start(ap,format);
108 		leng += vsnprintf(buff+leng,LINELENG-leng,format,ap);
109 		va_end(ap);
110 	}
111 	if (leng>=LINELENG) {
112 		leng=LINELENG-1;
113 	}
114 	buff[leng++]='\n';
115 	oplog_put((uint8_t*)buff,leng);
116 //	pthread_mutex_unlock(&bufflock);
117 }
118 
oplog_msg(const char * format,...)119 void oplog_msg(const char *format,...) {
120 	va_list ap;
121 	char buff[LINELENG];
122 	uint32_t leng;
123 	struct timeval tv;
124 	struct tm ltime;
125 
126 	pthread_mutex_lock(&timelock);
127 	gettimeofday(&tv,NULL);
128 	if (convts/900!=tv.tv_sec/900) {
129 		convts=tv.tv_sec/900;
130 		convts*=900;
131 		localtime_r(&convts,&convtm);
132 	}
133 	ltime = convtm;
134 	leng = tv.tv_sec - convts;
135 	ltime.tm_sec += leng%60;
136 	ltime.tm_min += leng/60;
137 	pthread_mutex_unlock(&timelock);
138 //	pthread_mutex_lock(&bufflock);
139 	leng = snprintf(buff,LINELENG,"%02u.%02u %02u:%02u:%02u.%06u: msg:",ltime.tm_mon+1,ltime.tm_mday,ltime.tm_hour,ltime.tm_min,ltime.tm_sec,(unsigned)(tv.tv_usec));
140 	if (leng<LINELENG) {
141 		va_start(ap,format);
142 		leng += vsnprintf(buff+leng,LINELENG-leng,format,ap);
143 		va_end(ap);
144 	}
145 	if (leng>=LINELENG) {
146 		leng=LINELENG-1;
147 	}
148 	buff[leng++]='\n';
149 	oplog_put((uint8_t*)buff,leng);
150 //	pthread_mutex_unlock(&bufflock);
151 }
152 
oplog_newhandle(int hflag)153 unsigned long oplog_newhandle(int hflag) {
154 	fhentry *fhptr;
155 	uint32_t bpos;
156 
157 	pthread_mutex_lock(&opbufflock);
158 	fhptr = malloc(sizeof(fhentry));
159 	fhptr->fh = nextfh++;
160 	fhptr->refcount = 1;
161 //	fhptr->dotsent = 0;
162 	if (hflag) {
163 		if (writepos<MAXHISTORYSIZE) {
164 			fhptr->readpos = 0;
165 		} else {
166 			fhptr->readpos = writepos - MAXHISTORYSIZE;
167 			bpos = fhptr->readpos%OPBUFFSIZE;
168 			while (fhptr->readpos < writepos) {
169 				if (opbuff[bpos]=='\n') {
170 					break;
171 				}
172 				bpos++;
173 				bpos%=OPBUFFSIZE;
174 				fhptr->readpos++;
175 			}
176 			if (fhptr->readpos<writepos) {
177 				fhptr->readpos++;
178 			}
179 		}
180 	} else {
181 		fhptr->readpos = writepos;
182 	}
183 	fhptr->next = fhhead;
184 	fhhead = fhptr;
185 	pthread_mutex_unlock(&opbufflock);
186 	return fhptr->fh;
187 }
188 
oplog_releasehandle(unsigned long fh)189 void oplog_releasehandle(unsigned long fh) {
190 	fhentry **fhpptr,*fhptr;
191 	pthread_mutex_lock(&opbufflock);
192 	fhpptr = &fhhead;
193 	while ((fhptr = *fhpptr)) {
194 		if (fhptr->fh==fh) {
195 			fhptr->refcount--;
196 			if (fhptr->refcount==0) {
197 				*fhpptr = fhptr->next;
198 				free(fhptr);
199 			} else {
200 				fhpptr = &(fhptr->next);
201 			}
202 		} else {
203 			fhpptr = &(fhptr->next);
204 		}
205 	}
206 	pthread_mutex_unlock(&opbufflock);
207 }
208 
oplog_getdata(unsigned long fh,uint8_t ** buff,uint32_t * leng,uint32_t maxleng)209 void oplog_getdata(unsigned long fh,uint8_t **buff,uint32_t *leng,uint32_t maxleng) {
210 	fhentry *fhptr;
211 	uint32_t bpos;
212 	struct timeval tv;
213 	struct timespec ts;
214 
215 	pthread_mutex_lock(&opbufflock);
216 	for (fhptr=fhhead ; fhptr && fhptr->fh != fh ; fhptr=fhptr->next ) {
217 	}
218 	if (fhptr==NULL) {
219 		*buff = NULL;
220 		*leng = 0;
221 		return;
222 	}
223 	fhptr->refcount++;
224 	while (fhptr->readpos>=writepos) {
225 		gettimeofday(&tv,NULL);
226 		ts.tv_sec = tv.tv_sec+1;
227 		ts.tv_nsec = tv.tv_usec*1000;
228 		waiting=1;
229 		if (pthread_cond_timedwait(&nodata,&opbufflock,&ts)==ETIMEDOUT) {
230 //			fhptr->dotsent=1;
231 			*buff = (uint8_t*)"#\n";
232 			*leng = 2;
233 			return;
234 		}
235 	}
236 //	if (fhptr->dotsent) {
237 //		fhptr->dotsent=0;
238 //		*buff = (uint8_t*)"\n";
239 //		*leng = 1;
240 //		return;
241 //	}
242 	bpos = fhptr->readpos%OPBUFFSIZE;
243 	*leng = (writepos-(fhptr->readpos));
244 	*buff = opbuff+bpos;
245 	if ((*leng)>(OPBUFFSIZE-bpos)) {
246 		(*leng) = (OPBUFFSIZE-bpos);
247 	}
248 	if ((*leng)>maxleng) {
249 		(*leng) = maxleng;
250 	}
251 	fhptr->readpos+=(*leng);
252 }
253 
oplog_releasedata(unsigned long fh)254 void oplog_releasedata(unsigned long fh) {
255 	fhentry **fhpptr,*fhptr;
256 	fhpptr = &fhhead;
257 	while ((fhptr = *fhpptr)) {
258 		if (fhptr->fh==fh) {
259 			fhptr->refcount--;
260 			if (fhptr->refcount==0) {
261 				*fhpptr = fhptr->next;
262 				free(fhptr);
263 			} else {
264 				fhpptr = &(fhptr->next);
265 			}
266 		} else {
267 			fhpptr = &(fhptr->next);
268 		}
269 	}
270 	pthread_mutex_unlock(&opbufflock);
271 }
272