1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*****************************************************************************
19 * $Id: sessionmonitor.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20 *
21 ****************************************************************************/
22
23 using namespace std;
24
25 #include <sys/types.h>
26 #include <sys/ipc.h>
27 #include <sys/sem.h>
28 #include <sys/shm.h>
29 #include <sys/types.h>
30 #include <time.h>
31 #include <fcntl.h>
32 #include <errno.h>
33 #include <unistd.h>
34
35 #include <iostream>
36 #include <fstream>
37 #include <string>
38 #include <stdexcept>
39 #include <ios>
40 #include <vector>
41
42 #include "calpontsystemcatalog.h"
43 #include "sessionmonitor.h"
44 #include "configcpp.h"
45
46 #include "vendordmlstatement.h"
47 #include "calpontdmlpackage.h"
48 #include "calpontdmlfactory.h"
49 using namespace dmlpackage;
50
51 #include "bytestream.h"
52 #include "messagequeue.h"
53 using namespace messageqcpp;
54
55 using namespace BRM;
56
57 #include "installdir.h"
58
59
60 namespace execplan
61 {
62
SessionMonitor()63 SessionMonitor::SessionMonitor()
64 {
65 config::Config* conf;
66 int madeSems;
67 string stmp;
68
69 fuid = getuid();
70 conf = config::Config::makeConfig();
71
72 try
73 {
74 stmp = conf->getConfig("SessionManager", "MaxConcurrentTransactions");
75 }
76 catch (exception& e)
77 {
78 cout << e.what() << endl;
79 stmp.empty();
80 }
81
82 int tmp = static_cast<int>(config::Config::fromText(stmp));
83
84 if (tmp <= 0)
85 fMaxTxns = 1000;
86 else
87 fMaxTxns = tmp;
88
89 stmp.clear();
90
91 try
92 {
93 stmp = conf->getConfig("SessionMonitor", "SharedMemoryTmpFile");
94 }
95 catch (exception& e)
96 {
97 cout << e.what() << endl;
98 stmp.empty();
99 }
100
101 // Instantiate/delete a SessionManager to make sure it's shared memory segments are present.
102 SessionManager* mgr = new SessionManager();
103 delete mgr;
104
105 if (stmp != "")
106 fSegmentFilename = strdup(stmp.c_str());
107 else
108 {
109 string tmpdir = "/var/lib/columnstore/CalpontSessionMonitorShm";
110
111 fSegmentFilename = strdup(tmpdir);
112 }
113
114 try
115 {
116 madeSems = getSems();
117 fHaveSemaphores = true;
118 }
119 catch (...)
120 {
121 fHaveSemaphores = false;
122 }
123
124 stmp.clear();
125
126 try
127 {
128 stmp = conf->getConfig("SessionMonitor", "TransactionAgeLimit");
129 }
130 catch (exception& e)
131 {
132 cerr << e.what() << endl;
133 stmp.empty();
134 }
135
136 tmp = static_cast<int>(config::Config::fromText(stmp));
137
138 if (tmp <= 0)
139 fAgeLimit = fDefaultAgeLimit;
140 else
141 fAgeLimit = tmp;
142
143 fIsAttached = false;
144 getSharedData();
145 unlock();
146 fCurrentSegment = NULL;
147 fPrevSegment.activeTxns = NULL;
148
149 if (haveSharedMemory())
150 copyCurrentSegment();
151
152 if (haveSemaphores())
153 copyPreviousSegment();
154 }
155
SessionMonitor(const SessionMonitor & sm)156 SessionMonitor::SessionMonitor(const SessionMonitor& sm)
157 {
158 }
159
~SessionMonitor()160 SessionMonitor::~SessionMonitor()
161 {
162 saveAsMonitorData(segmentFilename());
163
164 if (fSessionManagerData != NULL)
165 {
166 lock();
167 detachSegment();
168 unlock();
169 }
170
171 //delete [] fCurrentSegment;
172 //delete [] fSessionMonitorData.activeTxns;
173 //delete [] fPrevSegment.activeTxns;
174 }
175
detachSegment()176 void SessionMonitor::detachSegment()
177 {
178 //delete [] fSessionManagerData;
179
180 fIsAttached = false;
181 }
182
183 //returns 1 if it attached to pre-existing semaphores, else it throws exception.
getSems()184 int SessionMonitor::getSems()
185 {
186 return 1;
187 }
188
lock()189 void SessionMonitor::lock()
190 {
191 }
192
unlock()193 void SessionMonitor::unlock()
194 {
195 }
196
printTxns(const MonSIDTIDEntry & txn) const197 void SessionMonitor::printTxns(const MonSIDTIDEntry& txn) const
198 {
199 cout << "sessionid " << txn.sessionid
200 << " txnid " << txn.txnid.id
201 << " valid " << (txn.txnid.valid == true ? "TRUE" : "FALSE")
202 << " time_t " << txn.txnid.firstrecord
203 << " tdiff " << time(NULL) - txn.txnid.firstrecord
204 << " ctime " << ctime(&txn.txnid.firstrecord);
205 }
206
207 #ifdef SM_DEBUG
printTxns(const SessionManager::SIDTIDEntry & txn) const208 void SessionMonitor::printTxns(const SessionManager::SIDTIDEntry& txn) const
209 {
210 cout << "sessionid " << txn.sessionid
211 << " txnid " << txn.txnid.id
212 << " valid " << (txn.txnid.valid == true ? "TRUE" : "FALSE")
213 << endl;
214 }
215
printMonitorData(const int len) const216 void SessionMonitor::printMonitorData(const int len) const
217 {
218 MonSIDTIDEntry* txns = fPrevSegment.activeTxns;
219
220 cout << "Monitor txnCount " << fPrevSegment.txnCount << " verID " << fPrevSegment.verID << endl;
221
222 for (int idx = 0; txns && idx < len; idx++)
223 {
224 printTxns(txns[idx]);
225 }
226
227 cout << "==" << endl;
228 }
229
printSegment(const SessionManagerData_t * seg,const int len) const230 void SessionMonitor::printSegment(const SessionManagerData_t* seg, const int len) const
231 {
232 if (seg == NULL)
233 {
234 cerr << "No SessionManagerData" << endl;
235 return;
236 }
237
238 cout << "Manager txnCount " << seg->txnCount << " verID " << seg->verID << endl;
239
240 for (int idx = 0; idx < len; idx++)
241 {
242 printTxns(seg->activeTxns[idx]);
243 }
244 }
245 #endif
246
getSharedData()247 void SessionMonitor::getSharedData()
248 {
249 int len;
250
251 fSessionManagerData = reinterpret_cast<SessionManagerData_t*>(sm.getShmContents(len));
252
253 if (fSessionManagerData == NULL)
254 {
255 cerr << "SessionMonitor::getSharedData(): getShmContents() failed" << endl;
256 throw runtime_error("SessionMonitor::getSharedData(): getShmContents() failed. Check the error log.");
257 }
258
259 fIsAttached = true;
260 }
261
initSegment(SessionMonitorData_t * seg)262 void SessionMonitor::initSegment(SessionMonitorData_t* seg)
263 {
264 if (!seg)
265 return;
266
267 seg->txnCount = 0;
268 seg->verID = 0;
269 int size = maxTxns() * sizeof(MonSIDTIDEntry_t);
270
271 if (seg->activeTxns)
272 memset(seg->activeTxns, 0, size);
273 }
274
initSegment(SessionManagerData_t * seg)275 void SessionMonitor::initSegment(SessionManagerData_t* seg)
276 {
277 if (!seg)
278 return;
279
280 int size = maxTxns() * sizeof(SIDTIDEntry_t);
281 seg->txnCount = 0;
282 seg->verID = 0;
283 memset(seg->activeTxns, 0, size);
284 }
285
copyPreviousSegment()286 void SessionMonitor::copyPreviousSegment()
287 {
288 try
289 {
290 bool loadSuccessful = readMonitorDataFromFile(segmentFilename());
291
292 if (!loadSuccessful)
293 {
294 saveAsMonitorData(segmentFilename());
295 readMonitorDataFromFile(segmentFilename());
296 }
297 }
298 catch (...)
299 {
300 saveAsMonitorData(segmentFilename());
301 readMonitorDataFromFile(segmentFilename());
302 }
303 }
304
readMonitorDataFromFile(const std::string filename)305 bool SessionMonitor::readMonitorDataFromFile(const std::string filename)
306 {
307 int err = 0;
308 uint32_t headerSize = 2 * sizeof(int);
309 char* data = reinterpret_cast<char*>(&fPrevSegment);
310 int fd = open(filename.c_str(), O_RDONLY);
311
312 if (fd < 0)
313 {
314 perror("SessionMonitor::readMonitorDataFromFile(): open");
315 return false;
316 }
317
318 err = read(fd, data, headerSize);
319
320 if (err < 0)
321 {
322 if (errno != EINTR)
323 {
324 perror("SessionMonitor::readMonitorDataFromFile(): read");
325 }
326 }
327 else if (err == 0)
328 {
329 close(fd);
330 return false;
331 }
332
333 int dataSize = maxTxns() * sizeof(MonSIDTIDEntry_t);
334 delete [] fPrevSegment.activeTxns;
335
336 fPrevSegment.activeTxns = new MonSIDTIDEntry[maxTxns()];
337 data = reinterpret_cast<char*>(fPrevSegment.activeTxns);
338 memset(data, 0, sizeof(MonSIDTIDEntry)*maxTxns());
339 err = read(fd, data, dataSize);
340
341 if (err < 0)
342 {
343 if (errno != EINTR)
344 {
345 perror("SessionMonitor::readMonitorDataFromFile(): read");
346 }
347 }
348 else if (err == 0)
349 {
350 close(fd);
351 perror("SessionMonitor::readMonitorDataFromFile(): read 0");
352 return false;
353 }
354
355 close(fd);
356
357 return true;
358 }
359
saveAsMonitorData(const std::string)360 void SessionMonitor::saveAsMonitorData(const std::string)
361 {
362 int fd;
363 int err = 0;
364 char* data = reinterpret_cast<char*>(&fSessionMonitorData);
365
366 if (!fSessionMonitorData.activeTxns)
367 {
368 fSessionMonitorData.activeTxns = new MonSIDTIDEntry[maxTxns()];
369 }
370
371 initSegment(&fSessionMonitorData);
372
373 // get the most recent SessionManagerData
374 copyCurrentSegment();
375 fSessionMonitorData.txnCount = fCurrentSegment->txnCount;
376 fSessionMonitorData.verID = fCurrentSegment->verID;
377
378 for (int idx = 0; idx < maxTxns(); idx++)
379 {
380 // is this a new txns or previously existing
381 MonSIDTIDEntry* monitor = &fSessionMonitorData.activeTxns[idx];
382 MonSIDTIDEntry* prevMonitor = (fPrevSegment.activeTxns ? &fPrevSegment.activeTxns[idx] : NULL);
383 SIDTIDEntry* manager = &fCurrentSegment->activeTxns[idx];
384
385 if (prevMonitor)
386 {
387 if (!isEqualSIDTID(*manager, *prevMonitor))
388 {
389 monitor->txnid.firstrecord = time(NULL);
390 }
391 else if (isEqualSIDTID(*manager, *prevMonitor) && isUsed(*prevMonitor))
392 {
393 if (prevMonitor && prevMonitor->txnid.firstrecord == 0)
394 monitor->txnid.firstrecord = time(NULL);
395 else
396 monitor->txnid.firstrecord = prevMonitor->txnid.firstrecord;
397 }
398 else if (manager->txnid.valid == false && monitor->txnid.id == manager->txnid.id)
399 monitor->txnid.firstrecord = 0;
400 }
401 else
402 {
403 if (manager->txnid.valid && manager->txnid.id)
404 {
405 monitor->txnid.firstrecord = time(NULL);
406 }
407 else
408 monitor->txnid.firstrecord = 0;
409 }
410
411 monitor->sessionid = manager->sessionid;
412 monitor->txnid.id = manager->txnid.id;
413 monitor->txnid.valid = manager->txnid.valid;
414 }
415
416 // Always write to a new empty file
417 fd = open(segmentFilename(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
418
419 if (fd < 0)
420 {
421 perror("SessionMonitor::saveAsMonitorData(): open");
422 throw ios_base::failure("SessionMonitor::saveAsMonitorData(): open failed. Check the error log.");
423 }
424
425 int headerSize = 2 * (sizeof(int));
426 err = write(fd, data, headerSize);
427
428 if (err < 0)
429 {
430 if (errno != EINTR)
431 perror("SessionMonitor::saveAsMonitorData(): write");
432 }
433
434 int dataSize = maxTxns() * sizeof(MonSIDTIDEntry);
435 data = reinterpret_cast<char*>(fSessionMonitorData.activeTxns);
436 err = write(fd, data, dataSize);
437
438 if (err < 0)
439 {
440 if (errno != EINTR)
441 perror("SessionMonitor::saveAsMonitorData(): write");
442 }
443
444 close(fd);
445 }
446
copyCurrentSegment()447 void SessionMonitor::copyCurrentSegment()
448 {
449 const int cpsize = 4 * sizeof(int) + 2 * sizeof(boost::interprocess::interprocess_semaphore) + maxTxns() * sizeof(SIDTIDEntry);
450
451 delete [] reinterpret_cast<char*>(fCurrentSegment);
452
453 fCurrentSegment = reinterpret_cast<SessionManagerData_t*>(new char[cpsize]);
454 lock();
455 memcpy(fCurrentSegment, fSessionManagerData, cpsize);
456 unlock();
457 }
458
isUsed(const MonSIDTIDEntry & e) const459 bool SessionMonitor::isUsed( const MonSIDTIDEntry& e) const
460 {
461 if (e.sessionid == 0 && e.txnid.id == 0 && e.txnid.valid == false && e.txnid.firstrecord == 0)
462 return false;
463
464 return true;
465 }
466
isUsedSIDTID(const SIDTIDEntry & e) const467 bool SessionMonitor::isUsedSIDTID( const SIDTIDEntry& e) const
468 {
469 if (e.sessionid == 0 && e.txnid.id == 0 && e.txnid.valid == false)
470 return false;
471
472 return true;
473 }
474
isStaleSIDTID(const SIDTIDEntry & a,const MonSIDTIDEntry & b) const475 bool SessionMonitor::isStaleSIDTID( const SIDTIDEntry& a, const MonSIDTIDEntry& b) const
476 {
477 if (b.txnid.valid && isEqualSIDTID(a, b) && b.txnid.firstrecord && (time(NULL) - b.txnid.firstrecord) > fAgeLimit)
478 return true;
479
480 return false;
481 }
482
isEqualSIDTID(const SIDTIDEntry & a,const MonSIDTIDEntry & b) const483 bool SessionMonitor::isEqualSIDTID( const SIDTIDEntry& a, const MonSIDTIDEntry& b) const
484 {
485 if (a.sessionid == b.sessionid &&
486 a.txnid.id == b.txnid.id &&
487 a.txnid.valid == b.txnid.valid)
488 return true;
489
490 return false;
491 }
492
timedOutTxns()493 vector<SessionMonitor::MonSIDTIDEntry_t*> SessionMonitor::timedOutTxns()
494 {
495 vector<MonSIDTIDEntry_t*> txnsVec;
496
497 copyCurrentSegment();
498
499 if (fCurrentSegment && fPrevSegment.activeTxns != NULL)
500 {
501 for (int idx = 0; fCurrentSegment->activeTxns && fPrevSegment.activeTxns && idx < maxTxns(); idx++)
502 {
503 if (isUsedSIDTID(fCurrentSegment->activeTxns[idx]) &&
504 isStaleSIDTID(fCurrentSegment->activeTxns[idx], fPrevSegment.activeTxns[idx]))
505 {
506 txnsVec.push_back(&fPrevSegment.activeTxns[idx]);
507 }
508 }
509
510 sort(txnsVec.begin(), txnsVec.end(), lessMonSIDTIDEntry());
511 }
512
513 return txnsVec;
514 }
515
txnCount() const516 const int SessionMonitor::txnCount() const
517 {
518 return fSessionMonitorData.txnCount;
519 }
520
521 } //namespace
522