1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <stdarg.h>
29 #include <time.h>
30 #include <sys/time.h>
31 #include <errno.h>
32 #include <pthread.h>
33
34 #include "oplog.h"
35
// size in bytes of the opbuff ring buffer; stream positions are reduced modulo this value
#define OPBUFFSIZE 0x1000000
// size of the on-stack buffer used by oplog_printf to format a single log line
#define LINELENG 1000
// how far behind writepos a history handle (oplog_newhandle with hflag!=0) starts reading
#define MAXHISTORYSIZE 0xF00000
39
// State of one open log reader; entries form a singly linked list headed by fhhead.
typedef struct _fhentry {
	// handle id, assigned from nextfh in oplog_newhandle
	unsigned long fh;
	// absolute stream position of the next byte to hand out
	// (compared with writepos; taken modulo OPBUFFSIZE to index opbuff)
	uint64_t readpos;
	// set to 1 by oplog_newhandle, incremented by oplog_getdata, decremented by
	// oplog_releasehandle and oplog_releasedata; the entry is freed when it reaches 0
	uint32_t refcount;
//	uint8_t dotsent;
	// next entry on the list (NULL terminates it)
	struct _fhentry *next;
} fhentry;
47
// next handle id to give out; starts at 1, so 0 is never assigned to an entry
static unsigned long nextfh=1;
// head of the list of open reader handles
static fhentry *fhhead=NULL;

// ring buffer holding the most recent log bytes
static uint8_t opbuff[OPBUFFSIZE];
// total number of bytes ever appended; writepos%OPBUFFSIZE is the next write offset in opbuff
static uint64_t writepos=0;
// set by oplog_getdata before it waits on 'nodata'; cleared by oplog_put after broadcasting
static uint8_t waiting=0;
// taken by oplog_put, oplog_newhandle, oplog_releasehandle and oplog_getdata;
// oplog_getdata returns with it still locked and oplog_releasedata unlocks it
static pthread_mutex_t opbufflock = PTHREAD_MUTEX_INITIALIZER;
// signalled by oplog_put when new data has been appended while a reader was waiting
static pthread_cond_t nodata = PTHREAD_COND_INITIALIZER;

// start of the current 900-second bucket for which convtm was computed
static time_t convts=0;
// local broken-down time corresponding to convts (filled by localtime_r in oplog_printf)
static struct tm convtm;
// protects convts and convtm
static pthread_mutex_t timelock = PTHREAD_MUTEX_INITIALIZER;
//static pthread_mutex_t bufflock = PTHREAD_MUTEX_INITIALIZER;
61
oplog_put(uint8_t * buff,uint32_t leng)62 static inline void oplog_put(uint8_t *buff,uint32_t leng) {
63 uint32_t bpos;
64 if (leng>OPBUFFSIZE) { // just in case
65 buff+=leng-OPBUFFSIZE;
66 leng=OPBUFFSIZE;
67 }
68 pthread_mutex_lock(&opbufflock);
69 bpos = writepos%OPBUFFSIZE;
70 writepos+=leng;
71 if (bpos+leng>OPBUFFSIZE) {
72 memcpy(opbuff+bpos,buff,OPBUFFSIZE-bpos);
73 buff+=OPBUFFSIZE-bpos;
74 leng-=OPBUFFSIZE-bpos;
75 bpos = 0;
76 }
77 memcpy(opbuff+bpos,buff,leng);
78 if (waiting) {
79 pthread_cond_broadcast(&nodata);
80 waiting=0;
81 }
82 pthread_mutex_unlock(&opbufflock);
83 }
84
oplog_printf(const struct fuse_ctx * ctx,const char * format,...)85 void oplog_printf(const struct fuse_ctx *ctx,const char *format,...) {
86 va_list ap;
87 char buff[LINELENG];
88 uint32_t leng;
89 struct timeval tv;
90 struct tm ltime;
91
92 pthread_mutex_lock(&timelock);
93 gettimeofday(&tv,NULL);
94 if (convts/900!=tv.tv_sec/900) {
95 convts=tv.tv_sec/900;
96 convts*=900;
97 localtime_r(&convts,&convtm);
98 }
99 ltime = convtm;
100 leng = tv.tv_sec - convts;
101 ltime.tm_sec += leng%60;
102 ltime.tm_min += leng/60;
103 pthread_mutex_unlock(&timelock);
104 // pthread_mutex_lock(&bufflock);
105 leng = snprintf(buff,LINELENG,"%02u.%02u %02u:%02u:%02u.%06u: uid:%u gid:%u pid:%u cmd:",ltime.tm_mon+1,ltime.tm_mday,ltime.tm_hour,ltime.tm_min,ltime.tm_sec,(unsigned)(tv.tv_usec),(unsigned)(ctx->uid),(unsigned)(ctx->gid),(unsigned)(ctx->pid));
106 if (leng<LINELENG) {
107 va_start(ap,format);
108 leng += vsnprintf(buff+leng,LINELENG-leng,format,ap);
109 va_end(ap);
110 }
111 if (leng>=LINELENG) {
112 leng=LINELENG-1;
113 }
114 buff[leng++]='\n';
115 oplog_put((uint8_t*)buff,leng);
116 // pthread_mutex_unlock(&bufflock);
117 }
118
119
oplog_newhandle(int hflag)120 unsigned long oplog_newhandle(int hflag) {
121 fhentry *fhptr;
122 uint32_t bpos;
123
124 pthread_mutex_lock(&opbufflock);
125 fhptr = malloc(sizeof(fhentry));
126 fhptr->fh = nextfh++;
127 fhptr->refcount = 1;
128 // fhptr->dotsent = 0;
129 if (hflag) {
130 if (writepos<MAXHISTORYSIZE) {
131 fhptr->readpos = 0;
132 } else {
133 fhptr->readpos = writepos - MAXHISTORYSIZE;
134 bpos = fhptr->readpos%OPBUFFSIZE;
135 while (fhptr->readpos < writepos) {
136 if (opbuff[bpos]=='\n') {
137 break;
138 }
139 bpos++;
140 bpos%=OPBUFFSIZE;
141 fhptr->readpos++;
142 }
143 if (fhptr->readpos<writepos) {
144 fhptr->readpos++;
145 }
146 }
147 } else {
148 fhptr->readpos = writepos;
149 }
150 fhptr->next = fhhead;
151 fhhead = fhptr;
152 pthread_mutex_unlock(&opbufflock);
153 return fhptr->fh;
154 }
155
oplog_releasehandle(unsigned long fh)156 void oplog_releasehandle(unsigned long fh) {
157 fhentry **fhpptr,*fhptr;
158 pthread_mutex_lock(&opbufflock);
159 fhpptr = &fhhead;
160 while ((fhptr = *fhpptr)) {
161 if (fhptr->fh==fh) {
162 fhptr->refcount--;
163 if (fhptr->refcount==0) {
164 *fhpptr = fhptr->next;
165 free(fhptr);
166 } else {
167 fhpptr = &(fhptr->next);
168 }
169 } else {
170 fhpptr = &(fhptr->next);
171 }
172 }
173 pthread_mutex_unlock(&opbufflock);
174 }
175
oplog_getdata(unsigned long fh,uint8_t ** buff,uint32_t * leng,uint32_t maxleng)176 void oplog_getdata(unsigned long fh,uint8_t **buff,uint32_t *leng,uint32_t maxleng) {
177 fhentry *fhptr;
178 uint32_t bpos;
179 struct timeval tv;
180 struct timespec ts;
181
182 pthread_mutex_lock(&opbufflock);
183 for (fhptr=fhhead ; fhptr && fhptr->fh != fh ; fhptr=fhptr->next ) {
184 }
185 if (fhptr==NULL) {
186 *buff = NULL;
187 *leng = 0;
188 return;
189 }
190 fhptr->refcount++;
191 while (fhptr->readpos>=writepos) {
192 gettimeofday(&tv,NULL);
193 ts.tv_sec = tv.tv_sec+1;
194 ts.tv_nsec = tv.tv_usec*1000;
195 waiting=1;
196 if (pthread_cond_timedwait(&nodata,&opbufflock,&ts)==ETIMEDOUT) {
197 // fhptr->dotsent=1;
198 *buff = (uint8_t*)"#\n";
199 *leng = 2;
200 return;
201 }
202 }
203 // if (fhptr->dotsent) {
204 // fhptr->dotsent=0;
205 // *buff = (uint8_t*)"\n";
206 // *leng = 1;
207 // return;
208 // }
209 bpos = fhptr->readpos%OPBUFFSIZE;
210 *leng = (writepos-(fhptr->readpos));
211 *buff = opbuff+bpos;
212 if ((*leng)>(OPBUFFSIZE-bpos)) {
213 (*leng) = (OPBUFFSIZE-bpos);
214 }
215 if ((*leng)>maxleng) {
216 (*leng) = maxleng;
217 }
218 fhptr->readpos+=(*leng);
219 }
220
oplog_releasedata(unsigned long fh)221 void oplog_releasedata(unsigned long fh) {
222 fhentry **fhpptr,*fhptr;
223 fhpptr = &fhhead;
224 while ((fhptr = *fhpptr)) {
225 if (fhptr->fh==fh) {
226 fhptr->refcount--;
227 if (fhptr->refcount==0) {
228 *fhpptr = fhptr->next;
229 free(fhptr);
230 } else {
231 fhpptr = &(fhptr->next);
232 }
233 } else {
234 fhpptr = &(fhptr->next);
235 }
236 }
237 pthread_mutex_unlock(&opbufflock);
238 }
239