1 /*
2 Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2015 Skytechnology sp. z o.o..
3
4 This file was part of MooseFS and is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "mount/oplog.h"
21
22 #include <errno.h>
23 #include <pthread.h>
24 #include <stdarg.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/time.h>
29 #include <time.h>
30
/* Size in bytes of the circular buffer (opbuff) that stores the log text. */
#define OPBUFFSIZE 0x1000000

/* Size in bytes of the on-stack buffer used to format a single log line. */
#define LINELENG 1000

/* Maximum number of bytes a history-enabled handle starts behind the write position. */
#define MAXHISTORYSIZE 0xF00000
34
/*
 * Per-reader state of the operation log.
 * Entries form a singly linked list whose head is fhhead.
 */
struct _fhentry {
	unsigned long fh;        /* identifier handed out to the caller */
	uint64_t readpos;        /* absolute stream offset of the next byte to return */
	uint32_t refcount;       /* the entry is freed when this drops to zero */
	struct _fhentry *next;   /* next entry in the list, or NULL at the end */
};
typedef struct _fhentry fhentry;
41
42 static unsigned long nextfh=1;
43 static fhentry *fhhead=NULL;
44
45 static uint8_t opbuff[OPBUFFSIZE];
46 static uint64_t writepos=0;
47 static uint8_t waiting=0;
48 static pthread_mutex_t opbufflock = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t nodata = PTHREAD_COND_INITIALIZER;
50
51 static time_t gConvTmHour = std::numeric_limits<time_t>::max(); // enforce update on first read
52 static struct tm gConvTm;
53 static pthread_mutex_t timelock = PTHREAD_MUTEX_INITIALIZER;
54
oplog_put(uint8_t * buff,uint32_t leng)55 static inline void oplog_put(uint8_t *buff,uint32_t leng) {
56 uint32_t bpos;
57 if (leng>OPBUFFSIZE) { // just in case
58 buff+=leng-OPBUFFSIZE;
59 leng=OPBUFFSIZE;
60 }
61 pthread_mutex_lock(&opbufflock);
62 bpos = writepos%OPBUFFSIZE;
63 writepos+=leng;
64 if (bpos+leng>OPBUFFSIZE) {
65 memcpy(opbuff+bpos,buff,OPBUFFSIZE-bpos);
66 buff+=OPBUFFSIZE-bpos;
67 leng-=OPBUFFSIZE-bpos;
68 bpos = 0;
69 }
70 memcpy(opbuff+bpos,buff,leng);
71 if (waiting) {
72 pthread_cond_broadcast(&nodata);
73 waiting=0;
74 }
75 pthread_mutex_unlock(&opbufflock);
76 }
77
get_time(timeval & tv,tm & ltime)78 static void get_time(timeval &tv, tm <ime) {
79 gettimeofday(&tv, nullptr);
80 static constexpr time_t secs_per_hour = 60 * 60;
81 time_t hour = tv.tv_sec / secs_per_hour;
82 unsigned secs_this_hour = tv.tv_sec % secs_per_hour;
83
84 pthread_mutex_lock(&timelock);
85 if (hour != gConvTmHour) {
86 gConvTmHour = hour;
87 time_t convts = hour * secs_per_hour;
88 localtime_r(&convts, &gConvTm);
89 }
90 ltime = gConvTm;
91 pthread_mutex_unlock(&timelock);
92
93 assert(ltime.tm_sec == 0);
94 assert(ltime.tm_min == 0);
95
96 ltime.tm_sec = secs_this_hour % 60;
97 ltime.tm_min = secs_this_hour / 60;
98 }
99
oplog_printf(const struct LizardClient::Context & ctx,const char * format,...)100 void oplog_printf(const struct LizardClient::Context &ctx,const char *format,...) {
101 struct timeval tv;
102 struct tm ltime;
103 va_list ap;
104 int r, leng = 0;
105 char buff[LINELENG];
106
107 get_time(tv, ltime);
108 r = snprintf(buff, LINELENG, "%llu %02u.%02u %02u:%02u:%02u.%06u: uid:%u gid:%u pid:%u cmd:",
109 (unsigned long long)tv.tv_sec, ltime.tm_mon + 1, ltime.tm_mday, ltime.tm_hour, ltime.tm_min, ltime.tm_sec, (unsigned)tv.tv_usec,
110 (unsigned)ctx.uid, (unsigned)ctx.gid, (unsigned)ctx.pid);
111 if (r < 0) {
112 return;
113 }
114 leng = std::min(LINELENG - 1, r);
115
116 va_start(ap, format);
117 r = vsnprintf(buff + leng, LINELENG - leng, format, ap);
118 va_end(ap);
119 if (r < 0) {
120 return;
121 }
122 leng += r;
123
124 leng = std::min(LINELENG - 1, leng);
125 buff[leng++] = '\n';
126 oplog_put((uint8_t*)buff, leng);
127 }
128
oplog_printf(const char * format,...)129 void oplog_printf(const char *format, ...) {
130 struct timeval tv;
131 struct tm ltime;
132 va_list ap;
133 int r, leng = 0;
134 char buff[LINELENG];
135
136 get_time(tv, ltime);
137 r = snprintf(buff, LINELENG, "%llu %02u.%02u %02u:%02u:%02u.%06u: cmd:",
138 (unsigned long long)tv.tv_sec, ltime.tm_mon + 1, ltime.tm_mday, ltime.tm_hour, ltime.tm_min, ltime.tm_sec, (unsigned)tv.tv_usec);
139 if (r < 0) {
140 return;
141 }
142 leng = std::min(LINELENG - 1, r);
143
144 va_start(ap, format);
145 r = vsnprintf(buff + leng, LINELENG - leng, format, ap);
146 va_end(ap);
147 if (r < 0) {
148 return;
149 }
150 leng += r;
151
152 leng = std::min(LINELENG - 1, leng);
153 buff[leng++] = '\n';
154 oplog_put((uint8_t*)buff, leng);
155 }
156
oplog_newhandle(int hflag)157 unsigned long oplog_newhandle(int hflag) {
158 fhentry *fhptr;
159 uint32_t bpos;
160
161 pthread_mutex_lock(&opbufflock);
162 fhptr = (fhentry*) malloc(sizeof(fhentry));
163 fhptr->fh = nextfh++;
164 fhptr->refcount = 1;
165 if (hflag) {
166 if (writepos<MAXHISTORYSIZE) {
167 fhptr->readpos = 0;
168 } else {
169 fhptr->readpos = writepos - MAXHISTORYSIZE;
170 bpos = fhptr->readpos%OPBUFFSIZE;
171 while (fhptr->readpos < writepos) {
172 if (opbuff[bpos]=='\n') {
173 break;
174 }
175 bpos++;
176 bpos%=OPBUFFSIZE;
177 fhptr->readpos++;
178 }
179 if (fhptr->readpos<writepos) {
180 fhptr->readpos++;
181 }
182 }
183 } else {
184 fhptr->readpos = writepos;
185 }
186 fhptr->next = fhhead;
187 fhhead = fhptr;
188 pthread_mutex_unlock(&opbufflock);
189 return fhptr->fh;
190 }
191
oplog_releasehandle(unsigned long fh)192 void oplog_releasehandle(unsigned long fh) {
193 fhentry **fhpptr,*fhptr;
194 pthread_mutex_lock(&opbufflock);
195 fhpptr = &fhhead;
196 while ((fhptr = *fhpptr)) {
197 if (fhptr->fh==fh) {
198 fhptr->refcount--;
199 if (fhptr->refcount==0) {
200 *fhpptr = fhptr->next;
201 free(fhptr);
202 } else {
203 fhpptr = &(fhptr->next);
204 }
205 } else {
206 fhpptr = &(fhptr->next);
207 }
208 }
209 pthread_mutex_unlock(&opbufflock);
210 }
211
oplog_getdata(unsigned long fh,uint8_t ** buff,uint32_t * leng,uint32_t maxleng)212 void oplog_getdata(unsigned long fh,uint8_t **buff,uint32_t *leng,uint32_t maxleng) {
213 fhentry *fhptr;
214 uint32_t bpos;
215 struct timeval tv;
216 struct timespec ts;
217
218 pthread_mutex_lock(&opbufflock);
219 for (fhptr=fhhead ; fhptr && fhptr->fh != fh ; fhptr=fhptr->next) {
220 }
221 if (fhptr==NULL) {
222 *buff = NULL;
223 *leng = 0;
224 return;
225 }
226 fhptr->refcount++;
227 while (fhptr->readpos>=writepos) {
228 gettimeofday(&tv,NULL);
229 ts.tv_sec = tv.tv_sec+1;
230 ts.tv_nsec = tv.tv_usec*1000;
231 waiting=1;
232 if (pthread_cond_timedwait(&nodata,&opbufflock,&ts)==ETIMEDOUT) {
233 *buff = (uint8_t*)"#\n";
234 *leng = 2;
235 return;
236 }
237 }
238 bpos = fhptr->readpos%OPBUFFSIZE;
239 *leng = (writepos-(fhptr->readpos));
240 *buff = opbuff+bpos;
241 if ((*leng)>(OPBUFFSIZE-bpos)) {
242 (*leng) = (OPBUFFSIZE-bpos);
243 }
244 if ((*leng)>maxleng) {
245 (*leng) = maxleng;
246 }
247 fhptr->readpos+=(*leng);
248 }
249
oplog_releasedata(unsigned long fh)250 void oplog_releasedata(unsigned long fh) {
251 fhentry **fhpptr,*fhptr;
252 fhpptr = &fhhead;
253 while ((fhptr = *fhpptr)) {
254 if (fhptr->fh==fh) {
255 fhptr->refcount--;
256 if (fhptr->refcount==0) {
257 *fhpptr = fhptr->next;
258 free(fhptr);
259 } else {
260 fhpptr = &(fhptr->next);
261 }
262 } else {
263 fhpptr = &(fhptr->next);
264 }
265 }
266 pthread_mutex_unlock(&opbufflock);
267 }
268