1 /*
2    Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2015 Skytechnology sp. z o.o..
3 
4    This file was part of MooseFS and is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 #include "mount/oplog.h"
21 
22 #include <errno.h>
23 #include <pthread.h>
24 #include <stdarg.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/time.h>
29 #include <time.h>
30 
/* Capacity of the circular operation-log buffer in bytes (16 MiB, 0x1000000). */
#define OPBUFFSIZE (16 * 1024 * 1024)
/* Size in bytes of the stack buffer used to format one log line. */
#define LINELENG 1000
/* Maximum number of bytes (15 MiB, 0xF00000) a history handle may start behind the write position. */
#define MAXHISTORYSIZE (15 * 1024 * 1024)
34 
/*
 * Reader state of one open operation-log handle.
 * Entries are chained into a singly linked list through `next`.
 */
struct _fhentry {
	unsigned long fh;       /* handle id, compared against the fh argument of the oplog_* calls */
	uint64_t readpos;       /* absolute log position of the next byte this handle will read */
	uint32_t refcount;      /* the entry is unlinked and freed when this drops to zero */
	struct _fhentry *next;  /* following entry in the list, or NULL at the end */
};

typedef struct _fhentry fhentry;
41 
42 static unsigned long nextfh=1;
43 static fhentry *fhhead=NULL;
44 
45 static uint8_t opbuff[OPBUFFSIZE];
46 static uint64_t writepos=0;
47 static uint8_t waiting=0;
48 static pthread_mutex_t opbufflock = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t nodata = PTHREAD_COND_INITIALIZER;
50 
51 static time_t gConvTmHour = std::numeric_limits<time_t>::max(); // enforce update on first read
52 static struct tm gConvTm;
53 static pthread_mutex_t timelock = PTHREAD_MUTEX_INITIALIZER;
54 
oplog_put(uint8_t * buff,uint32_t leng)55 static inline void oplog_put(uint8_t *buff,uint32_t leng) {
56 	uint32_t bpos;
57 	if (leng>OPBUFFSIZE) {  // just in case
58 		buff+=leng-OPBUFFSIZE;
59 		leng=OPBUFFSIZE;
60 	}
61 	pthread_mutex_lock(&opbufflock);
62 	bpos = writepos%OPBUFFSIZE;
63 	writepos+=leng;
64 	if (bpos+leng>OPBUFFSIZE) {
65 		memcpy(opbuff+bpos,buff,OPBUFFSIZE-bpos);
66 		buff+=OPBUFFSIZE-bpos;
67 		leng-=OPBUFFSIZE-bpos;
68 		bpos = 0;
69 	}
70 	memcpy(opbuff+bpos,buff,leng);
71 	if (waiting) {
72 		pthread_cond_broadcast(&nodata);
73 		waiting=0;
74 	}
75 	pthread_mutex_unlock(&opbufflock);
76 }
77 
get_time(timeval & tv,tm & ltime)78 static void get_time(timeval &tv, tm &ltime) {
79 	gettimeofday(&tv, nullptr);
80 	static constexpr time_t secs_per_hour = 60 * 60;
81 	time_t hour = tv.tv_sec / secs_per_hour;
82 	unsigned secs_this_hour = tv.tv_sec % secs_per_hour;
83 
84 	pthread_mutex_lock(&timelock);
85 	if (hour != gConvTmHour) {
86 		gConvTmHour = hour;
87 		time_t convts = hour * secs_per_hour;
88 		localtime_r(&convts, &gConvTm);
89 	}
90 	ltime = gConvTm;
91 	pthread_mutex_unlock(&timelock);
92 
93 	assert(ltime.tm_sec == 0);
94 	assert(ltime.tm_min == 0);
95 
96 	ltime.tm_sec = secs_this_hour % 60;
97 	ltime.tm_min = secs_this_hour / 60;
98 }
99 
oplog_printf(const struct LizardClient::Context & ctx,const char * format,...)100 void oplog_printf(const struct LizardClient::Context &ctx,const char *format,...) {
101 	struct timeval tv;
102 	struct tm ltime;
103 	va_list ap;
104 	int r, leng = 0;
105 	char buff[LINELENG];
106 
107 	get_time(tv, ltime);
108 	r  = snprintf(buff, LINELENG, "%llu %02u.%02u %02u:%02u:%02u.%06u: uid:%u gid:%u pid:%u cmd:",
109 		(unsigned long long)tv.tv_sec, ltime.tm_mon + 1, ltime.tm_mday, ltime.tm_hour, ltime.tm_min, ltime.tm_sec, (unsigned)tv.tv_usec,
110 		(unsigned)ctx.uid, (unsigned)ctx.gid, (unsigned)ctx.pid);
111 	if (r < 0) {
112 		return;
113 	}
114 	leng = std::min(LINELENG - 1, r);
115 
116 	va_start(ap, format);
117 	r = vsnprintf(buff + leng, LINELENG - leng, format, ap);
118 	va_end(ap);
119 	if (r < 0) {
120 		return;
121 	}
122 	leng += r;
123 
124 	leng = std::min(LINELENG - 1, leng);
125 	buff[leng++] = '\n';
126 	oplog_put((uint8_t*)buff, leng);
127 }
128 
oplog_printf(const char * format,...)129 void oplog_printf(const char *format, ...) {
130 	struct timeval tv;
131 	struct tm ltime;
132 	va_list ap;
133 	int r, leng = 0;
134 	char buff[LINELENG];
135 
136 	get_time(tv, ltime);
137 	r = snprintf(buff, LINELENG, "%llu %02u.%02u %02u:%02u:%02u.%06u: cmd:",
138 		(unsigned long long)tv.tv_sec, ltime.tm_mon + 1, ltime.tm_mday, ltime.tm_hour, ltime.tm_min, ltime.tm_sec, (unsigned)tv.tv_usec);
139 	if (r < 0) {
140 		return;
141 	}
142 	leng = std::min(LINELENG - 1, r);
143 
144 	va_start(ap, format);
145 	r = vsnprintf(buff + leng, LINELENG - leng, format, ap);
146 	va_end(ap);
147 	if (r < 0) {
148 		return;
149 	}
150 	leng += r;
151 
152 	leng = std::min(LINELENG - 1, leng);
153 	buff[leng++] = '\n';
154 	oplog_put((uint8_t*)buff, leng);
155 }
156 
oplog_newhandle(int hflag)157 unsigned long oplog_newhandle(int hflag) {
158 	fhentry *fhptr;
159 	uint32_t bpos;
160 
161 	pthread_mutex_lock(&opbufflock);
162 	fhptr = (fhentry*) malloc(sizeof(fhentry));
163 	fhptr->fh = nextfh++;
164 	fhptr->refcount = 1;
165 	if (hflag) {
166 		if (writepos<MAXHISTORYSIZE) {
167 			fhptr->readpos = 0;
168 		} else {
169 			fhptr->readpos = writepos - MAXHISTORYSIZE;
170 			bpos = fhptr->readpos%OPBUFFSIZE;
171 			while (fhptr->readpos < writepos) {
172 				if (opbuff[bpos]=='\n') {
173 					break;
174 				}
175 				bpos++;
176 				bpos%=OPBUFFSIZE;
177 				fhptr->readpos++;
178 			}
179 			if (fhptr->readpos<writepos) {
180 				fhptr->readpos++;
181 			}
182 		}
183 	} else {
184 		fhptr->readpos = writepos;
185 	}
186 	fhptr->next = fhhead;
187 	fhhead = fhptr;
188 	pthread_mutex_unlock(&opbufflock);
189 	return fhptr->fh;
190 }
191 
oplog_releasehandle(unsigned long fh)192 void oplog_releasehandle(unsigned long fh) {
193 	fhentry **fhpptr,*fhptr;
194 	pthread_mutex_lock(&opbufflock);
195 	fhpptr = &fhhead;
196 	while ((fhptr = *fhpptr)) {
197 		if (fhptr->fh==fh) {
198 			fhptr->refcount--;
199 			if (fhptr->refcount==0) {
200 				*fhpptr = fhptr->next;
201 				free(fhptr);
202 			} else {
203 				fhpptr = &(fhptr->next);
204 			}
205 		} else {
206 			fhpptr = &(fhptr->next);
207 		}
208 	}
209 	pthread_mutex_unlock(&opbufflock);
210 }
211 
oplog_getdata(unsigned long fh,uint8_t ** buff,uint32_t * leng,uint32_t maxleng)212 void oplog_getdata(unsigned long fh,uint8_t **buff,uint32_t *leng,uint32_t maxleng) {
213 	fhentry *fhptr;
214 	uint32_t bpos;
215 	struct timeval tv;
216 	struct timespec ts;
217 
218 	pthread_mutex_lock(&opbufflock);
219 	for (fhptr=fhhead ; fhptr && fhptr->fh != fh ; fhptr=fhptr->next) {
220 	}
221 	if (fhptr==NULL) {
222 		*buff = NULL;
223 		*leng = 0;
224 		return;
225 	}
226 	fhptr->refcount++;
227 	while (fhptr->readpos>=writepos) {
228 		gettimeofday(&tv,NULL);
229 		ts.tv_sec = tv.tv_sec+1;
230 		ts.tv_nsec = tv.tv_usec*1000;
231 		waiting=1;
232 		if (pthread_cond_timedwait(&nodata,&opbufflock,&ts)==ETIMEDOUT) {
233 			*buff = (uint8_t*)"#\n";
234 			*leng = 2;
235 			return;
236 		}
237 	}
238 	bpos = fhptr->readpos%OPBUFFSIZE;
239 	*leng = (writepos-(fhptr->readpos));
240 	*buff = opbuff+bpos;
241 	if ((*leng)>(OPBUFFSIZE-bpos)) {
242 		(*leng) = (OPBUFFSIZE-bpos);
243 	}
244 	if ((*leng)>maxleng) {
245 		(*leng) = maxleng;
246 	}
247 	fhptr->readpos+=(*leng);
248 }
249 
oplog_releasedata(unsigned long fh)250 void oplog_releasedata(unsigned long fh) {
251 	fhentry **fhpptr,*fhptr;
252 	fhpptr = &fhhead;
253 	while ((fhptr = *fhpptr)) {
254 		if (fhptr->fh==fh) {
255 			fhptr->refcount--;
256 			if (fhptr->refcount==0) {
257 				*fhpptr = fhptr->next;
258 				free(fhptr);
259 			} else {
260 				fhpptr = &(fhptr->next);
261 			}
262 		} else {
263 			fhpptr = &(fhptr->next);
264 		}
265 	}
266 	pthread_mutex_unlock(&opbufflock);
267 }
268