1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <stdarg.h>
29 #include <time.h>
30 #include <sys/time.h>
31 #include <errno.h>
32 #include <pthread.h>
33 
34 #include "oplog.h"
35 
36 #define OPBUFFSIZE 0x1000000
37 #define LINELENG 1000
38 #define MAXHISTORYSIZE 0xF00000
39 
40 typedef struct _fhentry {
41 	unsigned long fh;
42 	uint64_t readpos;
43 	uint32_t refcount;
44 //	uint8_t dotsent;
45 	struct _fhentry *next;
46 } fhentry;
47 
48 static unsigned long nextfh=1;
49 static fhentry *fhhead=NULL;
50 
51 static uint8_t opbuff[OPBUFFSIZE];
52 static uint64_t writepos=0;
53 static uint8_t waiting=0;
54 static pthread_mutex_t opbufflock = PTHREAD_MUTEX_INITIALIZER;
55 static pthread_cond_t nodata = PTHREAD_COND_INITIALIZER;
56 
57 static time_t convts=0;
58 static struct tm convtm;
59 static pthread_mutex_t timelock = PTHREAD_MUTEX_INITIALIZER;
60 //static pthread_mutex_t bufflock = PTHREAD_MUTEX_INITIALIZER;
61 
oplog_put(uint8_t * buff,uint32_t leng)62 static inline void oplog_put(uint8_t *buff,uint32_t leng) {
63 	uint32_t bpos;
64 	if (leng>OPBUFFSIZE) {	// just in case
65 		buff+=leng-OPBUFFSIZE;
66 		leng=OPBUFFSIZE;
67 	}
68 	pthread_mutex_lock(&opbufflock);
69 	bpos = writepos%OPBUFFSIZE;
70 	writepos+=leng;
71 	if (bpos+leng>OPBUFFSIZE) {
72 		memcpy(opbuff+bpos,buff,OPBUFFSIZE-bpos);
73 		buff+=OPBUFFSIZE-bpos;
74 		leng-=OPBUFFSIZE-bpos;
75 		bpos = 0;
76 	}
77 	memcpy(opbuff+bpos,buff,leng);
78 	if (waiting) {
79 		pthread_cond_broadcast(&nodata);
80 		waiting=0;
81 	}
82 	pthread_mutex_unlock(&opbufflock);
83 }
84 
oplog_printf(const struct fuse_ctx * ctx,const char * format,...)85 void oplog_printf(const struct fuse_ctx *ctx,const char *format,...) {
86 	va_list ap;
87 	char buff[LINELENG];
88 	uint32_t leng;
89 	struct timeval tv;
90 	struct tm ltime;
91 
92 	pthread_mutex_lock(&timelock);
93 	gettimeofday(&tv,NULL);
94 	if (convts/900!=tv.tv_sec/900) {
95 		convts=tv.tv_sec/900;
96 		convts*=900;
97 		localtime_r(&convts,&convtm);
98 	}
99 	ltime = convtm;
100 	leng = tv.tv_sec - convts;
101 	ltime.tm_sec += leng%60;
102 	ltime.tm_min += leng/60;
103 	pthread_mutex_unlock(&timelock);
104 //	pthread_mutex_lock(&bufflock);
105 	leng = snprintf(buff,LINELENG,"%02u.%02u %02u:%02u:%02u.%06u: uid:%u gid:%u pid:%u cmd:",ltime.tm_mon+1,ltime.tm_mday,ltime.tm_hour,ltime.tm_min,ltime.tm_sec,(unsigned)(tv.tv_usec),(unsigned)(ctx->uid),(unsigned)(ctx->gid),(unsigned)(ctx->pid));
106 	if (leng<LINELENG) {
107 		va_start(ap,format);
108 		leng += vsnprintf(buff+leng,LINELENG-leng,format,ap);
109 		va_end(ap);
110 	}
111 	if (leng>=LINELENG) {
112 		leng=LINELENG-1;
113 	}
114 	buff[leng++]='\n';
115 	oplog_put((uint8_t*)buff,leng);
116 //	pthread_mutex_unlock(&bufflock);
117 }
118 
119 
oplog_newhandle(int hflag)120 unsigned long oplog_newhandle(int hflag) {
121 	fhentry *fhptr;
122 	uint32_t bpos;
123 
124 	pthread_mutex_lock(&opbufflock);
125 	fhptr = malloc(sizeof(fhentry));
126 	fhptr->fh = nextfh++;
127 	fhptr->refcount = 1;
128 //	fhptr->dotsent = 0;
129 	if (hflag) {
130 		if (writepos<MAXHISTORYSIZE) {
131 			fhptr->readpos = 0;
132 		} else {
133 			fhptr->readpos = writepos - MAXHISTORYSIZE;
134 			bpos = fhptr->readpos%OPBUFFSIZE;
135 			while (fhptr->readpos < writepos) {
136 				if (opbuff[bpos]=='\n') {
137 					break;
138 				}
139 				bpos++;
140 				bpos%=OPBUFFSIZE;
141 				fhptr->readpos++;
142 			}
143 			if (fhptr->readpos<writepos) {
144 				fhptr->readpos++;
145 			}
146 		}
147 	} else {
148 		fhptr->readpos = writepos;
149 	}
150 	fhptr->next = fhhead;
151 	fhhead = fhptr;
152 	pthread_mutex_unlock(&opbufflock);
153 	return fhptr->fh;
154 }
155 
oplog_releasehandle(unsigned long fh)156 void oplog_releasehandle(unsigned long fh) {
157 	fhentry **fhpptr,*fhptr;
158 	pthread_mutex_lock(&opbufflock);
159 	fhpptr = &fhhead;
160 	while ((fhptr = *fhpptr)) {
161 		if (fhptr->fh==fh) {
162 			fhptr->refcount--;
163 			if (fhptr->refcount==0) {
164 				*fhpptr = fhptr->next;
165 				free(fhptr);
166 			} else {
167 				fhpptr = &(fhptr->next);
168 			}
169 		} else {
170 			fhpptr = &(fhptr->next);
171 		}
172 	}
173 	pthread_mutex_unlock(&opbufflock);
174 }
175 
oplog_getdata(unsigned long fh,uint8_t ** buff,uint32_t * leng,uint32_t maxleng)176 void oplog_getdata(unsigned long fh,uint8_t **buff,uint32_t *leng,uint32_t maxleng) {
177 	fhentry *fhptr;
178 	uint32_t bpos;
179 	struct timeval tv;
180 	struct timespec ts;
181 
182 	pthread_mutex_lock(&opbufflock);
183 	for (fhptr=fhhead ; fhptr && fhptr->fh != fh ; fhptr=fhptr->next ) {
184 	}
185 	if (fhptr==NULL) {
186 		*buff = NULL;
187 		*leng = 0;
188 		return;
189 	}
190 	fhptr->refcount++;
191 	while (fhptr->readpos>=writepos) {
192 		gettimeofday(&tv,NULL);
193 		ts.tv_sec = tv.tv_sec+1;
194 		ts.tv_nsec = tv.tv_usec*1000;
195 		waiting=1;
196 		if (pthread_cond_timedwait(&nodata,&opbufflock,&ts)==ETIMEDOUT) {
197 //			fhptr->dotsent=1;
198 			*buff = (uint8_t*)"#\n";
199 			*leng = 2;
200 			return;
201 		}
202 	}
203 //	if (fhptr->dotsent) {
204 //		fhptr->dotsent=0;
205 //		*buff = (uint8_t*)"\n";
206 //		*leng = 1;
207 //		return;
208 //	}
209 	bpos = fhptr->readpos%OPBUFFSIZE;
210 	*leng = (writepos-(fhptr->readpos));
211 	*buff = opbuff+bpos;
212 	if ((*leng)>(OPBUFFSIZE-bpos)) {
213 		(*leng) = (OPBUFFSIZE-bpos);
214 	}
215 	if ((*leng)>maxleng) {
216 		(*leng) = maxleng;
217 	}
218 	fhptr->readpos+=(*leng);
219 }
220 
oplog_releasedata(unsigned long fh)221 void oplog_releasedata(unsigned long fh) {
222 	fhentry **fhpptr,*fhptr;
223 	fhpptr = &fhhead;
224 	while ((fhptr = *fhpptr)) {
225 		if (fhptr->fh==fh) {
226 			fhptr->refcount--;
227 			if (fhptr->refcount==0) {
228 				*fhpptr = fhptr->next;
229 				free(fhptr);
230 			} else {
231 				fhpptr = &(fhptr->next);
232 			}
233 		} else {
234 			fhpptr = &(fhptr->next);
235 		}
236 	}
237 	pthread_mutex_unlock(&opbufflock);
238 }
239