1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*****************************************************************************
19  * $Id: sessionmonitor.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20  *
21  ****************************************************************************/
22 
23 using namespace std;
24 
25 #include <sys/types.h>
26 #include <sys/ipc.h>
27 #include <sys/sem.h>
28 #include <sys/shm.h>
29 #include <sys/types.h>
30 #include <time.h>
31 #include <fcntl.h>
32 #include <errno.h>
33 #include <unistd.h>
34 
35 #include <iostream>
36 #include <fstream>
37 #include <string>
38 #include <stdexcept>
39 #include <ios>
40 #include <vector>
41 
42 #include "calpontsystemcatalog.h"
43 #include "sessionmonitor.h"
44 #include "configcpp.h"
45 
46 #include "vendordmlstatement.h"
47 #include "calpontdmlpackage.h"
48 #include "calpontdmlfactory.h"
49 using namespace dmlpackage;
50 
51 #include "bytestream.h"
52 #include "messagequeue.h"
53 using namespace messageqcpp;
54 
55 using namespace BRM;
56 
57 #include "installdir.h"
58 
59 
60 namespace execplan
61 {
62 
SessionMonitor()63 SessionMonitor::SessionMonitor()
64 {
65     config::Config* conf;
66     int madeSems;
67     string stmp;
68 
69     fuid = getuid();
70     conf = config::Config::makeConfig();
71 
72     try
73     {
74         stmp = conf->getConfig("SessionManager", "MaxConcurrentTransactions");
75     }
76     catch (exception& e)
77     {
78         cout << e.what() << endl;
79         stmp.empty();
80     }
81 
82     int tmp = static_cast<int>(config::Config::fromText(stmp));
83 
84     if (tmp <= 0)
85         fMaxTxns = 1000;
86     else
87         fMaxTxns = tmp;
88 
89     stmp.clear();
90 
91     try
92     {
93         stmp = conf->getConfig("SessionMonitor", "SharedMemoryTmpFile");
94     }
95     catch (exception& e)
96     {
97         cout << e.what() << endl;
98         stmp.empty();
99     }
100 
101     // Instantiate/delete a SessionManager to make sure it's shared memory segments are present.
102     SessionManager* mgr = new SessionManager();
103     delete mgr;
104 
105     if (stmp != "")
106         fSegmentFilename = strdup(stmp.c_str());
107     else
108     {
109 		string tmpdir = "/var/lib/columnstore/CalpontSessionMonitorShm";
110 
111         fSegmentFilename = strdup(tmpdir);
112 	}
113 
114     try
115     {
116         madeSems = getSems();
117         fHaveSemaphores = true;
118     }
119     catch (...)
120     {
121         fHaveSemaphores = false;
122     }
123 
124     stmp.clear();
125 
126     try
127     {
128         stmp = conf->getConfig("SessionMonitor", "TransactionAgeLimit");
129     }
130     catch (exception& e)
131     {
132         cerr << e.what() << endl;
133         stmp.empty();
134     }
135 
136     tmp = static_cast<int>(config::Config::fromText(stmp));
137 
138     if (tmp <= 0)
139         fAgeLimit = fDefaultAgeLimit;
140     else
141         fAgeLimit = tmp;
142 
143     fIsAttached = false;
144     getSharedData();
145     unlock();
146     fCurrentSegment = NULL;
147     fPrevSegment.activeTxns = NULL;
148 
149     if (haveSharedMemory())
150         copyCurrentSegment();
151 
152     if (haveSemaphores())
153         copyPreviousSegment();
154 }
155 
// Copy constructor.
//
// The body is intentionally empty: nothing is copied from the argument and
// there is no member-initializer list.
// NOTE(review): an object built this way does not share or duplicate the
// source's buffers - confirm whether copying is meant to be supported at all.
SessionMonitor::SessionMonitor(const SessionMonitor& sm)
{
    // No state is transferred from sm.
}
159 
~SessionMonitor()160 SessionMonitor::~SessionMonitor()
161 {
162     saveAsMonitorData(segmentFilename());
163 
164     if (fSessionManagerData != NULL)
165     {
166         lock();
167         detachSegment();
168         unlock();
169     }
170 
171     //delete [] fCurrentSegment;
172     //delete [] fSessionMonitorData.activeTxns;
173     //delete [] fPrevSegment.activeTxns;
174 }
175 
detachSegment()176 void SessionMonitor::detachSegment()
177 {
178     //delete [] fSessionManagerData;
179 
180     fIsAttached = false;
181 }
182 
183 //returns 1 if it attached to pre-existing semaphores, else it throws exception.
getSems()184 int SessionMonitor::getSems()
185 {
186     return 1;
187 }
188 
lock()189 void SessionMonitor::lock()
190 {
191 }
192 
unlock()193 void SessionMonitor::unlock()
194 {
195 }
196 
printTxns(const MonSIDTIDEntry & txn) const197 void SessionMonitor::printTxns(const MonSIDTIDEntry& txn) const
198 {
199     cout << "sessionid " << txn.sessionid
200          << " txnid " << txn.txnid.id
201          << " valid " << (txn.txnid.valid == true ? "TRUE" : "FALSE")
202          << " time_t " << txn.txnid.firstrecord
203          << " tdiff " << time(NULL) - txn.txnid.firstrecord
204          << " ctime " << ctime(&txn.txnid.firstrecord);
205 }
206 
207 #ifdef SM_DEBUG
printTxns(const SessionManager::SIDTIDEntry & txn) const208 void SessionMonitor::printTxns(const SessionManager::SIDTIDEntry& txn) const
209 {
210     cout << "sessionid " << txn.sessionid
211          << " txnid " << txn.txnid.id
212          << " valid " << (txn.txnid.valid == true ? "TRUE" : "FALSE")
213          << endl;
214 }
215 
printMonitorData(const int len) const216 void SessionMonitor::printMonitorData(const int len) const
217 {
218     MonSIDTIDEntry* txns = fPrevSegment.activeTxns;
219 
220     cout << "Monitor txnCount " << fPrevSegment.txnCount << " verID " << fPrevSegment.verID << endl;
221 
222     for (int idx = 0; txns && idx < len; idx++)
223     {
224         printTxns(txns[idx]);
225     }
226 
227     cout << "==" << endl;
228 }
229 
printSegment(const SessionManagerData_t * seg,const int len) const230 void SessionMonitor::printSegment(const SessionManagerData_t* seg, const int len) const
231 {
232     if (seg == NULL)
233     {
234         cerr << "No SessionManagerData" << endl;
235         return;
236     }
237 
238     cout << "Manager txnCount " << seg->txnCount << " verID " << seg->verID << endl;
239 
240     for (int idx = 0; idx < len; idx++)
241     {
242         printTxns(seg->activeTxns[idx]);
243     }
244 }
245 #endif
246 
getSharedData()247 void SessionMonitor::getSharedData()
248 {
249     int len;
250 
251     fSessionManagerData = reinterpret_cast<SessionManagerData_t*>(sm.getShmContents(len));
252 
253     if (fSessionManagerData == NULL)
254     {
255         cerr << "SessionMonitor::getSharedData(): getShmContents() failed" << endl;
256         throw runtime_error("SessionMonitor::getSharedData(): getShmContents() failed.  Check the error log.");
257     }
258 
259     fIsAttached = true;
260 }
261 
initSegment(SessionMonitorData_t * seg)262 void SessionMonitor::initSegment(SessionMonitorData_t* seg)
263 {
264     if (!seg)
265         return;
266 
267     seg->txnCount = 0;
268     seg->verID = 0;
269     int size = maxTxns() * sizeof(MonSIDTIDEntry_t);
270 
271     if (seg->activeTxns)
272         memset(seg->activeTxns, 0, size);
273 }
274 
initSegment(SessionManagerData_t * seg)275 void SessionMonitor::initSegment(SessionManagerData_t* seg)
276 {
277     if (!seg)
278         return;
279 
280     int size = maxTxns() * sizeof(SIDTIDEntry_t);
281     seg->txnCount = 0;
282     seg->verID = 0;
283     memset(seg->activeTxns, 0, size);
284 }
285 
copyPreviousSegment()286 void SessionMonitor::copyPreviousSegment()
287 {
288     try
289     {
290         bool loadSuccessful = readMonitorDataFromFile(segmentFilename());
291 
292         if (!loadSuccessful)
293         {
294             saveAsMonitorData(segmentFilename());
295             readMonitorDataFromFile(segmentFilename());
296         }
297     }
298     catch (...)
299     {
300         saveAsMonitorData(segmentFilename());
301         readMonitorDataFromFile(segmentFilename());
302     }
303 }
304 
readMonitorDataFromFile(const std::string filename)305 bool SessionMonitor::readMonitorDataFromFile(const std::string filename)
306 {
307     int err = 0;
308     uint32_t headerSize = 2 * sizeof(int);
309     char* data = reinterpret_cast<char*>(&fPrevSegment);
310     int fd = open(filename.c_str(), O_RDONLY);
311 
312     if (fd < 0)
313     {
314         perror("SessionMonitor::readMonitorDataFromFile(): open");
315         return false;
316     }
317 
318     err = read(fd, data, headerSize);
319 
320     if (err < 0)
321     {
322         if (errno != EINTR)
323         {
324             perror("SessionMonitor::readMonitorDataFromFile(): read");
325         }
326     }
327     else if (err == 0)
328     {
329         close(fd);
330         return false;
331     }
332 
333     int dataSize = maxTxns() * sizeof(MonSIDTIDEntry_t);
334     delete [] fPrevSegment.activeTxns;
335 
336     fPrevSegment.activeTxns = new MonSIDTIDEntry[maxTxns()];
337     data = reinterpret_cast<char*>(fPrevSegment.activeTxns);
338     memset(data, 0, sizeof(MonSIDTIDEntry)*maxTxns());
339     err = read(fd, data, dataSize);
340 
341     if (err < 0)
342     {
343         if (errno != EINTR)
344         {
345             perror("SessionMonitor::readMonitorDataFromFile(): read");
346         }
347     }
348     else if (err == 0)
349     {
350         close(fd);
351         perror("SessionMonitor::readMonitorDataFromFile(): read 0");
352         return false;
353     }
354 
355     close(fd);
356 
357     return true;
358 }
359 
saveAsMonitorData(const std::string)360 void SessionMonitor::saveAsMonitorData(const std::string)
361 {
362     int fd;
363     int err = 0;
364     char* data = reinterpret_cast<char*>(&fSessionMonitorData);
365 
366     if (!fSessionMonitorData.activeTxns)
367     {
368         fSessionMonitorData.activeTxns = new MonSIDTIDEntry[maxTxns()];
369     }
370 
371     initSegment(&fSessionMonitorData);
372 
373     // get the most recent SessionManagerData
374     copyCurrentSegment();
375     fSessionMonitorData.txnCount = fCurrentSegment->txnCount;
376     fSessionMonitorData.verID = fCurrentSegment->verID;
377 
378     for (int idx = 0; idx < maxTxns(); idx++)
379     {
380         // is this a new txns or previously existing
381         MonSIDTIDEntry* monitor = &fSessionMonitorData.activeTxns[idx];
382         MonSIDTIDEntry* prevMonitor = (fPrevSegment.activeTxns ? &fPrevSegment.activeTxns[idx] : NULL);
383         SIDTIDEntry* manager = &fCurrentSegment->activeTxns[idx];
384 
385         if (prevMonitor)
386         {
387             if (!isEqualSIDTID(*manager, *prevMonitor))
388             {
389                 monitor->txnid.firstrecord = time(NULL);
390             }
391             else if (isEqualSIDTID(*manager, *prevMonitor) && isUsed(*prevMonitor))
392             {
393                 if (prevMonitor && prevMonitor->txnid.firstrecord == 0)
394                     monitor->txnid.firstrecord = time(NULL);
395                 else
396                     monitor->txnid.firstrecord = prevMonitor->txnid.firstrecord;
397             }
398             else if (manager->txnid.valid == false && monitor->txnid.id == manager->txnid.id)
399                 monitor->txnid.firstrecord = 0;
400         }
401         else
402         {
403             if (manager->txnid.valid && manager->txnid.id)
404             {
405                 monitor->txnid.firstrecord = time(NULL);
406             }
407             else
408                 monitor->txnid.firstrecord = 0;
409         }
410 
411         monitor->sessionid = manager->sessionid;
412         monitor->txnid.id = manager->txnid.id;
413         monitor->txnid.valid = manager->txnid.valid;
414     }
415 
416     // Always write to a new empty file
417     fd = open(segmentFilename(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
418 
419     if (fd < 0)
420     {
421         perror("SessionMonitor::saveAsMonitorData(): open");
422         throw ios_base::failure("SessionMonitor::saveAsMonitorData(): open failed.	Check the error log.");
423     }
424 
425     int headerSize = 2 * (sizeof(int));
426     err = write(fd, data, headerSize);
427 
428     if (err < 0)
429     {
430         if (errno != EINTR)
431             perror("SessionMonitor::saveAsMonitorData(): write");
432     }
433 
434     int dataSize = maxTxns() * sizeof(MonSIDTIDEntry);
435     data = reinterpret_cast<char*>(fSessionMonitorData.activeTxns);
436     err = write(fd, data, dataSize);
437 
438     if (err < 0)
439     {
440         if (errno != EINTR)
441             perror("SessionMonitor::saveAsMonitorData(): write");
442     }
443 
444     close(fd);
445 }
446 
copyCurrentSegment()447 void SessionMonitor::copyCurrentSegment()
448 {
449     const int cpsize = 4 * sizeof(int) + 2 * sizeof(boost::interprocess::interprocess_semaphore) + maxTxns() * sizeof(SIDTIDEntry);
450 
451     delete [] reinterpret_cast<char*>(fCurrentSegment);
452 
453     fCurrentSegment = reinterpret_cast<SessionManagerData_t*>(new char[cpsize]);
454     lock();
455     memcpy(fCurrentSegment, fSessionManagerData, cpsize);
456     unlock();
457 }
458 
isUsed(const MonSIDTIDEntry & e) const459 bool SessionMonitor::isUsed( const MonSIDTIDEntry& e) const
460 {
461     if (e.sessionid == 0 && e.txnid.id == 0 && e.txnid.valid == false && e.txnid.firstrecord == 0)
462         return false;
463 
464     return true;
465 }
466 
isUsedSIDTID(const SIDTIDEntry & e) const467 bool SessionMonitor::isUsedSIDTID( const SIDTIDEntry& e) const
468 {
469     if (e.sessionid == 0 && e.txnid.id == 0 && e.txnid.valid == false)
470         return false;
471 
472     return true;
473 }
474 
isStaleSIDTID(const SIDTIDEntry & a,const MonSIDTIDEntry & b) const475 bool SessionMonitor::isStaleSIDTID( const SIDTIDEntry& a,  const MonSIDTIDEntry& b) const
476 {
477     if (b.txnid.valid && isEqualSIDTID(a, b) && b.txnid.firstrecord && (time(NULL) - b.txnid.firstrecord) > fAgeLimit)
478         return true;
479 
480     return false;
481 }
482 
isEqualSIDTID(const SIDTIDEntry & a,const MonSIDTIDEntry & b) const483 bool SessionMonitor::isEqualSIDTID( const SIDTIDEntry& a,  const MonSIDTIDEntry& b) const
484 {
485     if (a.sessionid == b.sessionid &&
486             a.txnid.id == b.txnid.id &&
487             a.txnid.valid == b.txnid.valid)
488         return true;
489 
490     return false;
491 }
492 
timedOutTxns()493 vector<SessionMonitor::MonSIDTIDEntry_t*> SessionMonitor::timedOutTxns()
494 {
495     vector<MonSIDTIDEntry_t*> txnsVec;
496 
497     copyCurrentSegment();
498 
499     if (fCurrentSegment && fPrevSegment.activeTxns != NULL)
500     {
501         for (int idx = 0; fCurrentSegment->activeTxns  && fPrevSegment.activeTxns && idx < maxTxns(); idx++)
502         {
503             if (isUsedSIDTID(fCurrentSegment->activeTxns[idx]) &&
504                     isStaleSIDTID(fCurrentSegment->activeTxns[idx], fPrevSegment.activeTxns[idx]))
505             {
506                 txnsVec.push_back(&fPrevSegment.activeTxns[idx]);
507             }
508         }
509 
510         sort(txnsVec.begin(), txnsVec.end(), lessMonSIDTIDEntry());
511     }
512 
513     return txnsVec;
514 }
515 
txnCount() const516 const int SessionMonitor::txnCount() const
517 {
518     return fSessionMonitorData.txnCount;
519 }
520 
521 }  //namespace
522