1 //
2
3 #include <iostream>
4 #include <iomanip>
5 #include <getopt.h>
6 #include <errno.h>
7 #include <boost/thread.hpp>
8 #define NDEBUG
9 #include <cassert>
10 #include "writeengine.h"
11
12 using namespace std;
13
/**
 * Computes the elapsed time between two timestamps: diff = tv2 - tv1.
 *
 * @param tv1  start time
 * @param tv2  end time (expected to be at or after tv1)
 * @param diff receives tv2 - tv1; tv_nsec is normalized into [0, 1e9)
 *
 * The result is built in locals first, so diff may safely alias tv1 or tv2.
 */
void timespec_sub(const struct timespec& tv1,
                  const struct timespec& tv2,
                  struct timespec& diff)
{
    const long kNanosPerSec = 1000000000L;

    time_t sec = tv2.tv_sec - tv1.tv_sec;
    long nsec = tv2.tv_nsec - tv1.tv_nsec;

    if (nsec < 0)
    {
        // Borrow one second so the nanosecond part stays non-negative.
        --sec;
        nsec += kNanosPerSec;
    }

    diff.tv_sec = sec;
    diff.tv_nsec = nsec;
}
29
30 // TODO:
31 // add threads for reading a file
32 // add threads for reading multiple files
33
34 struct readThr
35 {
36 public:
readThrreadThr37 readThr(const int oid, const int rSize)
38 {
39 fblockSize = 8192;
40 fpageSize = getpagesize();
41 freadBlocks = rSize;
42 freadSize = rSize * fblockSize; // read size ib bytes
43 freadBufferSz = freadSize + fpageSize;
44 falignedbuff = 0;
45 foid = oid;
46 facc = 0;
47 memset(fname, 0, sizeof(fname));
48 memset((char*)&ftm, 0, sizeof(ftm));
49 memset((char*)&ftm2, 0, sizeof(ftm2));
50 memset((char*)&ftm3, 0, sizeof(ftm3));
51 memset((char*)&fstarttm, 0, sizeof(fstarttm));
52 memset((char*)&fendtm, 0, sizeof(fendtm));
53 memset((char*)&ftottm, 0, sizeof(ftottm));
54 fodirect = true;
55 fd = 0;
56 cout << "o: " << foid << " r: " << freadBlocks << " b: " << freadBufferSz << endl;
57 }
58
operator ()readThr59 void operator() ()
60 {
61
62 WriteEngine::FileOp fFileOp;
63 char frealbuff[freadBufferSz];
64 memset(frealbuff, 0, freadBufferSz);
65
66 if (frealbuff == 0)
67 {
68 cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
69 return;
70 }
71
72 if (fFileOp.getFileName(foid, fname) != WriteEngine::NO_ERROR)
73 {
74 fname[0] = 0;
75 throw std::runtime_error("fileOp.getFileName failed");
76 }
77 else
78 {
79 cout << "Reading oid: " << foid << " od: " << fodirect << " file: " << fname << endl;
80 }
81
82 #if __LP64__
83 falignedbuff = (char*)((((ptrdiff_t)frealbuff >> 12) << 12) + fpageSize);
84 #else
85 falignedbuff = (char*)(((((ptrdiff_t)frealbuff >> 12) << 12) & 0xffffffff) + fpageSize);
86 #endif
87 idbassert(((ptrdiff_t)falignedbuff - (ptrdiff_t)frealbuff) < (ptrdiff_t)fpageSize);
88 idbassert(((ptrdiff_t)falignedbuff % fpageSize) == 0);
89
90 if (fodirect)
91 fd = open(fname, O_RDONLY | O_DIRECT | O_LARGEFILE | O_NOATIME);
92 else
93 fd = open(fname, O_RDONLY | O_LARGEFILE | O_NOATIME);
94
95 if (fd < 0)
96 {
97 cerr << "Open failed" << endl;
98 perror("open");
99 throw runtime_error("Error opening file");
100 }
101
102 uint64_t i = 1;
103 uint64_t rCnt = 0;
104
105 clock_gettime(CLOCK_REALTIME, &fstarttm);
106
107 while (i > 0)
108 {
109 clock_gettime(CLOCK_REALTIME, &ftm);
110 i = pread(fd, falignedbuff, freadSize, facc);
111 clock_gettime(CLOCK_REALTIME, &ftm2);
112
113 idbassert(i == 0 || i == freadSize);
114 idbassert(i % fpageSize == 0);
115 idbassert(facc % fpageSize == 0);
116
117 if (i < 0 && errno == EINTR)
118 {
119 timespec_sub(ftm, ftm2, ftm3);
120 cout << "* "
121 << i << " "
122 << right << setw(2) << setfill(' ') << ftm3.tv_sec << "."
123 << right << setw(9) << setfill('0') << ftm3.tv_nsec
124 << endl;
125 continue;
126 }
127 else if (i < 0)
128 {
129 timespec_sub(ftm, ftm2, ftm3);
130 cout << "* i: "
131 << i << " sz: " << freadSize << " acc: " << facc
132 << right << setw(2) << setfill(' ') << ftm3.tv_sec << " "
133 << right << ftm3.tv_nsec
134 << endl;
135 perror("pread");
136 }
137
138 facc += i;
139
140 if (i > 0)
141 rCnt++;
142
143 /**
144 timespec_sub(ftm, ftm2, ftm3);
145 cout
146 << rCnt << " " << facc/(1024*1024)
147 << right << setw(2) << setfill(' ') << ftm3.tv_sec << "."
148 << right << ftm3.tv_nsec << " i: " << i/(1024*1024)
149 << endl;
150 **/
151
152 } // while(acc...
153
154 clock_gettime(CLOCK_REALTIME, &fendtm);
155 timespec_sub(fstarttm, fendtm, ftottm);
156
157 cout << "Total reads: " << rCnt
158 << " sz: " << facc / (1024 * 1024) << "MB"
159 << " tm: " << ftottm.tv_sec << "secs "
160 << ftottm.tv_nsec << "ns"
161 << endl;
162
163 facc = 0;
164 close(fd);
165 } // operator()
166
167 public:
168 uint64_t facc;
169 uint64_t freadBlocks; // read size ib blocks
170 uint64_t freadSize; // read size ib bytes
171 uint64_t freadBufferSz;
172 uint64_t fblockSize;
173 char* falignedbuff;
174 unsigned fpageSize ;
175 BRM::OID_t foid;
176 char fname[256];
177 struct timespec ftm;
178 struct timespec ftm2;
179 struct timespec ftm3;
180 struct timespec fstarttm;
181 struct timespec fendtm;
182 struct timespec ftottm;
183 bool fodirect;
184 int fd;
185
186 }; // struct readThr
187
188 //
/**
 * Prints the one-line command synopsis to stderr.
 */
void usage()
{
    static const char* const kSynopsis = "usage: mtread -o <oid> -s <read size in blocks>";

    std::cerr << kSynopsis << std::endl;
}
193
194 // main()
195 //
main(int argc,char ** argv)196 int main(int argc, char** argv)
197 {
198 int ch = 0;
199 int readBlocks = 0;
200 BRM::OID_t oid = 0;
201 std::vector<BRM::OID_t> oidList;
202
203 enum CLA_ENUM
204 {
205 OID = (int)0,
206 READSIZE
207 };
208
209 //longopt struct
210 //struct option {
211 //const char *name;
212 //int has_arg;
213 //int *flag;
214 //int val;
215 //};
216
217 static struct
218 option long_options[] =
219 {
220 //{const char *name, int has_arg, int *flag, int val},
221 {"oid", required_argument, NULL, OID},
222 {"rsize", required_argument, NULL, READSIZE},
223 {0, 0, 0, 0}
224 };
225
226 if (argc <= 1)
227 {
228 return -1;
229 }
230
231 // process command line arguments
232 while ( (ch = getopt_long_only(argc, argv, "o:s:", long_options, NULL)) != -1 )
233 {
234 //pid_t pidId = getpid();
235 switch (ch)
236 {
237
238 case OID:
239 case 'o':
240 oid = atoi(optarg);
241 oidList.push_back(oid);
242 cout << "oid: " << optarg << endl;
243 break;
244
245 case READSIZE:
246 case 's':
247 readBlocks = atoi(optarg);
248 cout << "read size: " << optarg << endl;
249
250 if (readBlocks <= 0)
251 readBlocks = 1;
252
253 break;
254
255 case '?':
256 default:
257 cout << "optarg " << optarg << endl;
258 usage();
259 break;
260
261 } // switch
262
263 } // while...
264
265
266 uint32_t idx = 0;
267 std::vector<boost::thread*> thrList;
268
269 while (idx < oidList.size())
270 {
271 struct readThr rdr(oidList[idx++], readBlocks);
272 boost::thread* thr = new boost::thread(rdr);
273 thrList.push_back(thr);
274 }
275
276 idx = 0;
277
278 while (idx < thrList.size())
279 {
280 boost::thread* thr = thrList[idx++];
281 thr->join();
282 delete thr;
283 }
284
285 thrList.clear();
286
287 } //main
288
289