1 //
2 
3 #include <iostream>
4 #include <iomanip>
5 #include <getopt.h>
6 #include <errno.h>
7 #include <boost/thread.hpp>
8 #define NDEBUG
9 #include <cassert>
10 #include "writeengine.h"
11 
12 using namespace std;
13 
/**
 * Compute the elapsed time between two timestamps: diff = tv2 - tv1.
 *
 * @param tv1  start time
 * @param tv2  end time; expected to be at or after tv1
 * @param diff receives the difference. When tv2 >= tv1 the nanosecond field
 *             is normalized into the range [0, 1000000000).
 */
void timespec_sub(const struct timespec& tv1,
                  const struct timespec& tv2,
                  struct timespec& diff)
{
    const long nsecPerSec = 1000000000L;

    if (tv2.tv_nsec < tv1.tv_nsec)
    {
        // Borrow one second so the nanosecond part stays non-negative:
        // (sec2 - sec1 - 1) seconds + (1e9 + nsec2 - nsec1) nanoseconds.
        diff.tv_sec = tv2.tv_sec - tv1.tv_sec - 1;
        diff.tv_nsec = nsecPerSec + tv2.tv_nsec - tv1.tv_nsec;
    }
    else
    {
        diff.tv_sec = tv2.tv_sec - tv1.tv_sec;
        diff.tv_nsec = tv2.tv_nsec - tv1.tv_nsec;
    }
}
29 
30 // TODO:
31 // add threads for reading a file
32 // add threads for reading multiple files
33 
34 struct readThr
35 {
36 public:
readThrreadThr37     readThr(const int oid, const int rSize)
38     {
39         fblockSize = 8192;
40         fpageSize = getpagesize();
41         freadBlocks = rSize;
42         freadSize = rSize * fblockSize; // read size ib bytes
43         freadBufferSz = freadSize + fpageSize;
44         falignedbuff = 0;
45         foid = oid;
46         facc = 0;
47         memset(fname, 0, sizeof(fname));
48         memset((char*)&ftm, 0, sizeof(ftm));
49         memset((char*)&ftm2, 0, sizeof(ftm2));
50         memset((char*)&ftm3, 0, sizeof(ftm3));
51         memset((char*)&fstarttm, 0, sizeof(fstarttm));
52         memset((char*)&fendtm, 0, sizeof(fendtm));
53         memset((char*)&ftottm, 0, sizeof(ftottm));
54         fodirect = true;
55         fd = 0;
56         cout << "o: " << foid << " r: " << freadBlocks << " b: " << freadBufferSz << endl;
57     }
58 
operator ()readThr59     void operator() ()
60     {
61 
62         WriteEngine::FileOp fFileOp;
63         char frealbuff[freadBufferSz];
64         memset(frealbuff, 0, freadBufferSz);
65 
66         if (frealbuff == 0)
67         {
68             cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
69             return;
70         }
71 
72         if (fFileOp.getFileName(foid, fname) != WriteEngine::NO_ERROR)
73         {
74             fname[0] = 0;
75             throw std::runtime_error("fileOp.getFileName failed");
76         }
77         else
78         {
79             cout << "Reading oid: " << foid << " od: " << fodirect << " file: " << fname << endl;
80         }
81 
82 #if __LP64__
83         falignedbuff = (char*)((((ptrdiff_t)frealbuff >> 12) << 12) + fpageSize);
84 #else
85         falignedbuff = (char*)(((((ptrdiff_t)frealbuff >> 12) << 12) & 0xffffffff) + fpageSize);
86 #endif
87         idbassert(((ptrdiff_t)falignedbuff - (ptrdiff_t)frealbuff) < (ptrdiff_t)fpageSize);
88         idbassert(((ptrdiff_t)falignedbuff % fpageSize) == 0);
89 
90         if (fodirect)
91             fd = open(fname, O_RDONLY | O_DIRECT | O_LARGEFILE | O_NOATIME);
92         else
93             fd = open(fname, O_RDONLY | O_LARGEFILE | O_NOATIME);
94 
95         if (fd < 0)
96         {
97             cerr << "Open failed" << endl;
98             perror("open");
99             throw runtime_error("Error opening file");
100         }
101 
102         uint64_t i = 1;
103         uint64_t rCnt = 0;
104 
105         clock_gettime(CLOCK_REALTIME, &fstarttm);
106 
107         while (i > 0)
108         {
109             clock_gettime(CLOCK_REALTIME, &ftm);
110             i = pread(fd, falignedbuff, freadSize, facc);
111             clock_gettime(CLOCK_REALTIME, &ftm2);
112 
113             idbassert(i == 0 || i == freadSize);
114             idbassert(i % fpageSize == 0);
115             idbassert(facc % fpageSize == 0);
116 
117             if (i < 0 && errno == EINTR)
118             {
119                 timespec_sub(ftm, ftm2, ftm3);
120                 cout << "* "
121                      << i << " "
122                      << right << setw(2) << setfill(' ') << ftm3.tv_sec << "."
123                      << right << setw(9) << setfill('0') << ftm3.tv_nsec
124                      << endl;
125                 continue;
126             }
127             else if (i < 0)
128             {
129                 timespec_sub(ftm, ftm2, ftm3);
130                 cout << "* i: "
131                      << i << " sz: " << freadSize << " acc: " << facc
132                      << right << setw(2) << setfill(' ') << ftm3.tv_sec << " "
133                      << right << ftm3.tv_nsec
134                      << endl;
135                 perror("pread");
136             }
137 
138             facc += i;
139 
140             if (i > 0)
141                 rCnt++;
142 
143             /**
144             timespec_sub(ftm, ftm2, ftm3);
145             cout
146             	<< rCnt << " " << facc/(1024*1024)
147             	<< right << setw(2) << setfill(' ') << ftm3.tv_sec << "."
148             	<< right << ftm3.tv_nsec << " i: " << i/(1024*1024)
149             	<< endl;
150             **/
151 
152         } // while(acc...
153 
154         clock_gettime(CLOCK_REALTIME, &fendtm);
155         timespec_sub(fstarttm, fendtm, ftottm);
156 
157         cout << "Total reads: " << rCnt
158              << " sz: " << facc / (1024 * 1024) << "MB"
159              << " tm: " << ftottm.tv_sec << "secs "
160              << ftottm.tv_nsec << "ns"
161              << endl;
162 
163         facc = 0;
164         close(fd);
165     } // operator()
166 
167 public:
168     uint64_t facc;
169     uint64_t freadBlocks; // read size ib blocks
170     uint64_t freadSize; // read size ib bytes
171     uint64_t freadBufferSz;
172     uint64_t fblockSize;
173     char* falignedbuff;
174     unsigned fpageSize ;
175     BRM::OID_t foid;
176     char fname[256];
177     struct timespec ftm;
178     struct timespec ftm2;
179     struct timespec ftm3;
180     struct timespec fstarttm;
181     struct timespec fendtm;
182     struct timespec ftottm;
183     bool fodirect;
184     int fd;
185 
186 }; // struct readThr
187 
188 //
// Print the command-line synopsis of this tool on standard error.
void usage()
{
    static const char* const synopsis =
        "usage: mtread -o <oid> -s <read size in blocks>";

    std::cerr << synopsis << std::endl;
}
193 
194 // main()
195 //
main(int argc,char ** argv)196 int main(int argc, char** argv)
197 {
198     int ch = 0;
199     int readBlocks = 0;
200     BRM::OID_t oid = 0;
201     std::vector<BRM::OID_t> oidList;
202 
203     enum CLA_ENUM
204     {
205         OID = (int)0,
206         READSIZE
207     };
208 
209     //longopt struct
210     //struct option {
211     //const char *name;
212     //int has_arg;
213     //int *flag;
214     //int val;
215     //};
216 
217     static struct
218         option long_options[] =
219     {
220         //{const char *name, 	int has_arg, 	int *flag,	int val},
221         {"oid", 		required_argument, 		NULL, OID},
222         {"rsize", 		required_argument, 		NULL, READSIZE},
223         {0, 					0, 				0, 			0}
224     };
225 
226     if (argc <= 1)
227     {
228         return -1;
229     }
230 
231     // process command line arguments
232     while ( (ch = getopt_long_only(argc, argv, "o:s:", long_options, NULL)) != -1 )
233     {
234         //pid_t pidId = getpid();
235         switch (ch)
236         {
237 
238             case OID:
239             case 'o':
240                 oid = atoi(optarg);
241                 oidList.push_back(oid);
242                 cout << "oid: " << optarg << endl;
243                 break;
244 
245             case READSIZE:
246             case 's':
247                 readBlocks = atoi(optarg);
248                 cout << "read size: " << optarg << endl;
249 
250                 if (readBlocks <= 0)
251                     readBlocks = 1;
252 
253                 break;
254 
255             case '?':
256             default:
257                 cout << "optarg " << optarg << endl;
258                 usage();
259                 break;
260 
261         } // switch
262 
263     } // while...
264 
265 
266     uint32_t idx = 0;
267     std::vector<boost::thread*> thrList;
268 
269     while (idx < oidList.size())
270     {
271         struct readThr rdr(oidList[idx++], readBlocks);
272         boost::thread* thr = new boost::thread(rdr);
273         thrList.push_back(thr);
274     }
275 
276     idx = 0;
277 
278     while (idx < thrList.size())
279     {
280         boost::thread* thr = thrList[idx++];
281         thr->join();
282         delete thr;
283     }
284 
285     thrList.clear();
286 
287 } //main
288 
289