1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*****************************************************************************
19 * $Id: dbrm.cpp 1878 2013-05-02 15:17:12Z dcathey $
20 *
21 ****************************************************************************/
22
23 #include <iostream>
24 #include <sys/types.h>
25 #include <vector>
26 #ifdef __linux__
27 #include <values.h>
28 #endif
29 #include <boost/thread.hpp>
30 //#define NDEBUG
31 #include <cassert>
32
33 #include "oamcache.h"
34 #include "rwlock.h"
35 #include "mastersegmenttable.h"
36 #include "extentmap.h"
37 #include "copylocks.h"
38 #include "vss.h"
39 #include "vbbm.h"
40 #include "socketclosed.h"
41 #include "configcpp.h"
42 #include "sessionmanagerserver.h"
43 #include "messagequeuepool.h"
44 #define DBRM_DLLEXPORT
45 #include "dbrm.h"
46 #undef DBRM_DLLEXPORT
47
48 #ifdef BRM_DEBUG
49 #define CHECK_EMPTY(x) \
50 if (x.length() != 0) \
51 throw logic_error("DBRM: got a message of the wrong size");
52 #else
53 #define CHECK_EMPTY(x)
54 #endif
55
56 #define DO_ERR_NETWORK \
57 MessageQueueClientPool::releaseInstance(msgClient); \
58 msgClient = NULL; \
59 mutex.unlock(); \
60 return ERR_NETWORK;
61
62 using namespace std;
63 using namespace messageqcpp;
64 using namespace oam;
65
66 #ifdef BRM_INFO
67 #include "tracer.h"
68 #endif
69
70 namespace BRM
71 {
72
DBRM(bool noBRMinit)73 DBRM::DBRM(bool noBRMinit) : fDebug(false)
74 {
75 if (!noBRMinit)
76 {
77 mst.reset(new MasterSegmentTable());
78 em.reset(new ExtentMap());
79 vss.reset(new VSS());
80 vbbm.reset(new VBBM());
81 copylocks.reset(new CopyLocks());
82
83 em->setReadOnly();
84 vss->setReadOnly();
85 vbbm->setReadOnly();
86 }
87
88 msgClient = NULL;
89 masterName = "DBRM_Controller";
90 config = config::Config::makeConfig();
91 #ifdef BRM_INFO
92 fDebug = ("Y" == config->getConfig("DBRM", "Debug"));
93 #endif
94 }
95
DBRM(const DBRM & brm)96 DBRM::DBRM(const DBRM& brm)
97 {
98 throw logic_error("DBRM: Don't use the copy constructor.");
99 }
100
~DBRM()101 DBRM::~DBRM() throw()
102 {
103 if (msgClient != NULL)
104 MessageQueueClientPool::releaseInstance(msgClient);
105 }
106
operator =(const DBRM & brm)107 DBRM& DBRM::operator=(const DBRM& brm)
108 {
109 throw logic_error("DBRM: Don't use the = operator.");
110 }
111
112
saveState()113 int DBRM::saveState() throw()
114 {
115 #ifdef BRM_INFO
116
117 if (fDebug) TRACER_WRITENOW("saveState()");
118
119 #endif
120 string prefix = config->getConfig("SystemConfig", "DBRMRoot");
121
122 if (prefix.length() == 0)
123 {
124 cerr << "Error: Need a valid Calpont configuation file" << endl;
125 exit(1);
126 }
127
128 int rc = saveState(prefix);
129
130 return rc;
131 }
132
saveState(string filename)133 int DBRM::saveState(string filename) throw()
134 {
135 #ifdef BRM_INFO
136
137 if (fDebug)
138 {
139 TRACER_WRITELATER("saveState(filename)");
140 TRACER_ADDSTRINPUT(filename);
141 TRACER_WRITE;
142 }
143
144 #endif
145 string emFilename = filename + "_em";
146 string vssFilename = filename + "_vss";
147 string vbbmFilename = filename + "_vbbm";
148 bool locked[3] = { false, false, false };
149
150 try
151 {
152 vbbm->lock(VBBM::READ);
153 locked[0] = true;
154 vss->lock(VSS::READ);
155 locked[1] = true;
156 copylocks->lock(CopyLocks::READ);
157 locked[2] = true;
158
159 saveExtentMap(emFilename);
160 vbbm->save(vbbmFilename);
161 vss->save(vssFilename);
162
163 copylocks->release(CopyLocks::READ);
164 locked[2] = false;
165 vss->release(VSS::READ);
166 locked[1] = false;
167 vbbm->release(VBBM::READ);
168 locked[0] = false;
169 }
170 catch (exception& e)
171 {
172 if (locked[2])
173 copylocks->release(CopyLocks::READ);
174
175 if (locked[1])
176 vss->release(VSS::READ);
177
178 if (locked[0])
179 vbbm->release(VBBM::READ);
180
181 return -1;
182 }
183
184 return 0;
185 }
186
saveExtentMap(const string & filename)187 int DBRM::saveExtentMap(const string& filename) throw()
188 {
189 #ifdef BRM_INFO
190
191 if (fDebug)
192 {
193 TRACER_WRITELATER("saveExtentMap");
194 TRACER_ADDSTRINPUT(filename);
195 TRACER_WRITE;
196 }
197
198 #endif
199
200 try
201 {
202 em->save(filename);
203 }
204 catch (exception& e)
205 {
206 cerr << e.what() << endl;
207 return -1;
208 }
209
210 return 0;
211 }
212
213 // @bug 1055+. New functions added for multiple files per OID enhancement.
lookupLocal(LBID_t lbid,VER_t verid,bool vbFlag,OID_t & oid,uint16_t & dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,uint32_t & fileBlockOffset)214 int DBRM::lookupLocal(LBID_t lbid, VER_t verid, bool vbFlag, OID_t& oid,
215 uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, uint32_t& fileBlockOffset) throw()
216 {
217 #ifdef BRM_INFO
218
219 if (fDebug)
220 {
221 TRACER_WRITELATER("lookupLocal(lbid,ver,..)");
222 TRACER_ADDINPUT(lbid);
223 TRACER_ADDINPUT(verid);
224 TRACER_ADDBOOLINPUT(vbFlag);
225 TRACER_ADDOUTPUT(oid);
226 TRACER_ADDSHORTOUTPUT(dbRoot);
227 TRACER_ADDOUTPUT(partitionNum);
228 TRACER_ADDSHORTOUTPUT(segmentNum);
229 TRACER_ADDOUTPUT(fileBlockOffset);
230 TRACER_WRITE;
231 }
232
233 #endif
234 bool locked[2] = {false, false};
235 int ret;
236 bool tooOld = false;
237
238 try
239 {
240 if (!vbFlag)
241 return em->lookupLocal(lbid, (int&)oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
242 else
243 {
244 vbbm->lock(VBBM::READ);
245 locked[0] = true;
246 ret = vbbm->lookup(lbid, verid, oid, fileBlockOffset);
247 vbbm->release(VBBM::READ);
248 locked[0] = false;
249
250 if (ret < 0)
251 {
252 vss->lock(VSS::READ);
253 locked[1] = true;
254 tooOld = vss->isTooOld(lbid, verid);
255 vss->release(VSS::READ);
256 locked[1] = false;
257
258 if (tooOld)
259 return ERR_SNAPSHOT_TOO_OLD;
260 }
261
262 return ret;
263 }
264 }
265 catch (exception& e)
266 {
267 if (locked[1])
268 vss->release(VSS::READ);
269
270 if (locked[0])
271 vbbm->release(VBBM::READ);
272
273 cerr << e.what() << endl;
274 return -1;
275 }
276 }
277
lookupLocal(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)278 int DBRM::lookupLocal(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw()
279 {
280 #ifdef BRM_INFO
281
282 if (fDebug)
283 {
284 TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
285 TRACER_ADDINPUT(oid);
286 TRACER_ADDINPUT(partitionNum);
287 TRACER_ADDSHORTINPUT(segmentNum);
288 TRACER_ADDINPUT(fileBlockOffset);
289 TRACER_ADDOUTPUT(lbid);
290 TRACER_WRITE;
291 }
292
293 #endif
294
295 try
296 {
297 return em->lookupLocal(oid, partitionNum, segmentNum, fileBlockOffset, lbid);
298 }
299 catch (exception& e)
300 {
301 cerr << e.what() << endl;
302 return -1;
303 }
304 }
305
lookupLocal_DBroot(OID_t oid,uint32_t dbroot,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)306 int DBRM::lookupLocal_DBroot(OID_t oid, uint32_t dbroot, uint32_t partitionNum, uint16_t segmentNum,
307 uint32_t fileBlockOffset, LBID_t& lbid) throw()
308 {
309 #ifdef BRM_INFO
310
311 if (fDebug)
312 {
313 TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
314 TRACER_ADDINPUT(oid);
315 TRACER_ADDINPUT(partitionNum);
316 TRACER_ADDSHORTINPUT(segmentNum);
317 TRACER_ADDINPUT(fileBlockOffset);
318 TRACER_ADDOUTPUT(lbid);
319 TRACER_WRITE;
320 }
321
322 #endif
323
324 try
325 {
326 return em->lookupLocal_DBroot(oid, dbroot, partitionNum, segmentNum, fileBlockOffset, lbid);
327 }
328 catch (exception& e)
329 {
330 cerr << e.what() << endl;
331 return -1;
332 }
333 }
334
335 // @bug 1055-
336
337 //------------------------------------------------------------------------------
338 // Lookup/return starting LBID for the specified OID, partition, segment, and
339 // file block offset.
340 //------------------------------------------------------------------------------
lookupLocalStartLbid(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)341 int DBRM::lookupLocalStartLbid(OID_t oid,
342 uint32_t partitionNum,
343 uint16_t segmentNum,
344 uint32_t fileBlockOffset,
345 LBID_t& lbid) throw()
346 {
347 #ifdef BRM_INFO
348
349 if (fDebug)
350 {
351 TRACER_WRITELATER("lookupLocalStartLbid(oid,fbo,..)");
352 TRACER_ADDINPUT(oid);
353 TRACER_ADDINPUT(partitionNum);
354 TRACER_ADDSHORTINPUT(segmentNum);
355 TRACER_ADDINPUT(fileBlockOffset);
356 TRACER_ADDOUTPUT(lbid);
357 TRACER_WRITE;
358 }
359
360 #endif
361
362 try
363 {
364 return em->lookupLocalStartLbid(oid, partitionNum, segmentNum,
365 fileBlockOffset, lbid);
366 }
367 catch (exception& e)
368 {
369 cerr << e.what() << endl;
370 return -1;
371 }
372 }
373
lookup(OID_t oid,LBIDRange_v & lbidList)374 int DBRM::lookup(OID_t oid, LBIDRange_v& lbidList) throw()
375 {
376 #ifdef BRM_INFO
377
378 if (fDebug)
379 {
380 TRACER_WRITELATER("lookup(oid,range)");
381 TRACER_ADDINPUT(oid);
382 TRACER_WRITE;
383 }
384
385 #endif
386
387 try
388 {
389 em->lookup(oid, lbidList);
390 return 0;
391 }
392 catch (exception& e)
393 {
394 cerr << e.what() << endl;
395 return -1;
396 }
397 }
398
399 // Casual Partitioning support
markExtentInvalid(const LBID_t lbid,execplan::CalpontSystemCatalog::ColDataType colDataType)400 int DBRM::markExtentInvalid(const LBID_t lbid,
401 execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW
402 {
403 #ifdef BRM_INFO
404
405 if (fDebug)
406 {
407 TRACER_WRITELATER("markExtentInvalid");
408 TRACER_ADDINPUT(lbid);
409 TRACER_WRITE;
410 }
411
412 #endif
413
414 ByteStream command, response;
415 uint8_t err;
416
417 command << MARKEXTENTINVALID << (uint64_t)lbid << (uint32_t)colDataType;
418 err = send_recv(command, response);
419
420 if (err != ERR_OK)
421 return err;
422
423 if (response.length() == 0)
424 return ERR_NETWORK;
425
426 response >> err;
427 CHECK_EMPTY(response);
428 return err;
429 }
430
markExtentsInvalid(const vector<LBID_t> & lbids,const std::vector<execplan::CalpontSystemCatalog::ColDataType> & colDataTypes)431 int DBRM::markExtentsInvalid(const vector<LBID_t>& lbids,
432 const std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes) DBRM_THROW
433 {
434 #ifdef BRM_INFO
435
436 if (fDebug) TRACER_WRITENOW("markExtentsInvalid");
437
438 #endif
439 ByteStream command, response;
440 uint8_t err;
441 uint32_t size = lbids.size(), i;
442
443 command << MARKMANYEXTENTSINVALID << size;
444
445 for (i = 0; i < size; i++)
446 {
447 command << (uint64_t) lbids[i];
448 command << (uint32_t) colDataTypes[i];
449 }
450
451 err = send_recv(command, response);
452
453 if (err != ERR_OK)
454 return err;
455
456 if (response.length() == 0)
457 return ERR_NETWORK;
458
459 response >> err;
460 CHECK_EMPTY(response);
461 return err;
462 }
463
getExtentMaxMin(const LBID_t lbid,int64_t & max,int64_t & min,int32_t & seqNum)464 int DBRM::getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw()
465 {
466 #ifdef BRM_INFO
467
468 if (fDebug)
469 {
470 TRACER_WRITELATER("getExtentMaxMin");
471 TRACER_ADDINPUT(lbid);
472 TRACER_ADDOUTPUT(max);
473 TRACER_ADDOUTPUT(min);
474 TRACER_ADDOUTPUT(seqNum);
475 TRACER_WRITE;
476 }
477
478 #endif
479
480 try
481 {
482 int ret = em->getMaxMin(lbid, max, min, seqNum);
483 return ret;
484 }
485 catch (exception& e)
486 {
487 cerr << e.what() << endl;
488 return false;
489 }
490 }
491
setExtentMaxMin(const LBID_t lbid,const int64_t max,const int64_t min,const int32_t seqNum)492 int DBRM::setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min, const int32_t seqNum) DBRM_THROW
493 {
494 #ifdef BRM_INFO
495
496 if (fDebug)
497 {
498 TRACER_WRITELATER("setExtentMaxMin");
499 TRACER_ADDINPUT(lbid);
500 TRACER_ADDINPUT(max);
501 TRACER_ADDINPUT(min);
502 TRACER_ADDINPUT(seqNum);
503 TRACER_WRITE;
504 }
505
506 #endif
507 ByteStream command, response;
508 uint8_t err;
509
510 command << SETEXTENTMAXMIN << (uint64_t)lbid << (uint64_t)max << (uint64_t)min << (uint64_t)seqNum;
511 err = send_recv(command, response);
512
513 if (err != ERR_OK)
514 return err;
515
516 if (response.length() == 0)
517 return ERR_NETWORK;
518
519 response >> err;
520 CHECK_EMPTY(response);
521 return err;
522
523 }
524
525 // @bug 1970 - Added function below to set multiple extents casual partition info in one call.
setExtentsMaxMin(const CPInfoList_t & cpInfos)526 int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
527 {
528 CPInfoList_t::const_iterator it;
529 #ifdef BRM_INFO
530
531 if (fDebug)
532 {
533 TRACER_WRITELATER("setExtentsMaxMin");
534
535 for (it = cpInfos.begin(); it != cpInfos.end(); it++)
536 {
537 TRACER_ADDINPUT(it->firstLbid);
538 TRACER_ADDINPUT(it->max);
539 TRACER_ADDINPUT(it->min);
540 TRACER_ADDINPUT(it->seqNum);
541 TRACER_WRITE;
542 }
543 }
544
545 #endif
546 ByteStream command, response;
547 uint8_t err = 0;
548
549 if (cpInfos.size() == 0)
550 return err;
551
552 if (cpInfos.empty())
553 return ERR_OK;
554
555 command << SETMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
556
557 for (it = cpInfos.begin(); it != cpInfos.end(); it++)
558 {
559 command << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum;
560 }
561
562 err = send_recv(command, response);
563
564 if (err != ERR_OK)
565 return err;
566
567 if (response.length() == 0)
568 return ERR_NETWORK;
569
570 response >> err;
571 CHECK_EMPTY(response);
572 return err;
573 }
574
575 //------------------------------------------------------------------------------
576 // @bug 2117 - Add function to merge Casual Partition info with current extent
577 // map information.
578 //------------------------------------------------------------------------------
mergeExtentsMaxMin(const CPInfoMergeList_t & cpInfos)579 int DBRM::mergeExtentsMaxMin(const CPInfoMergeList_t& cpInfos) DBRM_THROW
580 {
581 CPInfoMergeList_t::const_iterator it;
582 #ifdef BRM_INFO
583
584 if (fDebug)
585 {
586 TRACER_WRITELATER("updateExtentsMaxMin");
587
588 for (it = cpInfos.begin(); it != cpInfos.end(); it++)
589 {
590 TRACER_ADDINPUT(it->startLbid);
591 TRACER_ADDINPUT(it->max);
592 TRACER_ADDINPUT(it->min);
593 TRACER_ADDINPUT(it->seqNum);
594 TRACER_ADDINPUT(it->type);
595 TRACER_ADDINPUT(it->newExtent);
596 TRACER_WRITE;
597 }
598 }
599
600 #endif
601 ByteStream command, response;
602 uint8_t err;
603
604 command << MERGEMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
605
606 for (it = cpInfos.begin(); it != cpInfos.end(); it++)
607 {
608 command << (uint64_t)it->startLbid <<
609 (uint64_t)it->max <<
610 (uint64_t)it->min <<
611 (uint32_t)it->seqNum <<
612 (uint32_t)it->type <<
613 (uint32_t)it->newExtent;
614 }
615
616 err = send_recv(command, response);
617
618 if (err != ERR_OK)
619 return err;
620
621 if (response.length() == 0)
622 return ERR_NETWORK;
623
624 response >> err;
625 CHECK_EMPTY(response);
626 return err;
627 }
628
vssLookup(LBID_t lbid,const QueryContext & verInfo,VER_t txnID,VER_t * outVer,bool * vbFlag,bool vbOnly)629 int DBRM::vssLookup(LBID_t lbid, const QueryContext& verInfo, VER_t txnID, VER_t* outVer,
630 bool* vbFlag, bool vbOnly) throw()
631 {
632 #ifdef BRM_INFO
633
634 if (fDebug)
635 {
636 TRACER_WRITELATER("vssLookup");
637 TRACER_ADDINPUT(lbid);
638 TRACER_ADDINPUT(verInfo);
639 TRACER_ADDINPUT(txnID);
640 TRACER_ADDBOOLINPUT(vbOnly);
641 TRACER_WRITE;
642 }
643
644 #endif
645
646 if (!vbOnly && vss->isEmpty())
647 {
648 *outVer = 0;
649 *vbFlag = false;
650 return -1;
651 }
652
653 bool locked = false;
654
655 try
656 {
657 int rc = 0;
658 vss->lock(VSS::READ);
659 locked = true;
660 rc = vss->lookup(lbid, verInfo, txnID, outVer, vbFlag, vbOnly);
661 vss->release(VSS::READ);
662 return rc;
663 }
664 catch (exception& e)
665 {
666 if (locked)
667 vss->release(VSS::READ);
668
669 cerr << e.what() << endl;
670 return -1;
671 }
672 }
673
bulkVSSLookup(const std::vector<LBID_t> & lbids,const QueryContext_vss & verInfo,VER_t txnID,std::vector<VSSData> * out)674 int DBRM::bulkVSSLookup(const std::vector<LBID_t>& lbids, const QueryContext_vss& verInfo,
675 VER_t txnID, std::vector<VSSData>* out)
676 {
677 uint32_t i;
678 bool locked = false;
679
680 try
681 {
682 out->resize(lbids.size());
683 vss->lock(VSS::READ);
684 locked = true;
685
686 if (vss->isEmpty(false))
687 {
688 for (i = 0; i < lbids.size(); i++)
689 {
690 VSSData& vd = (*out)[i];
691 vd.verID = 0;
692 vd.vbFlag = false;
693 vd.returnCode = -1;
694 }
695 }
696 else
697 {
698 for (i = 0; i < lbids.size(); i++)
699 {
700 VSSData& vd = (*out)[i];
701 vd.returnCode = vss->lookup(lbids[i], verInfo, txnID, &vd.verID, &vd.vbFlag, false);
702 }
703 }
704
705 vss->release(VSS::READ);
706 return 0;
707 }
708 catch (exception& e)
709 {
710 cerr << e.what() << endl;
711 }
712 catch (...)
713 {
714 cerr << "bulkVSSLookup: caught an exception" << endl;
715 }
716
717 if (locked)
718 vss->release(VSS::READ);
719
720 out->clear();
721 return -1;
722 }
723
getCurrentVersion(LBID_t lbid,bool * isLocked) const724 VER_t DBRM::getCurrentVersion(LBID_t lbid, bool* isLocked) const
725 {
726 bool locked = false;
727 VER_t ret = 0;
728
729 try
730 {
731 vss->lock(VSS::READ);
732 locked = true;
733 ret = vss->getCurrentVersion(lbid, isLocked);
734 vss->release(VSS::READ);
735 locked = false;
736 }
737 catch (exception& e)
738 {
739 cerr << e.what() << endl;
740
741 if (locked)
742 vss->release(VSS::READ);
743
744 throw;
745 }
746
747 return ret;
748 }
749
bulkGetCurrentVersion(const vector<LBID_t> & lbids,vector<VER_t> * versions,vector<bool> * isLocked) const750 int DBRM::bulkGetCurrentVersion(const vector<LBID_t>& lbids, vector<VER_t>* versions,
751 vector<bool>* isLocked) const
752 {
753 bool locked = false;
754
755 versions->resize(lbids.size());
756
757 if (isLocked != NULL)
758 isLocked->resize(lbids.size());
759
760 try
761 {
762 vss->lock(VSS::READ);
763 locked = true;
764
765 if (isLocked != NULL)
766 {
767 bool tmp = false;
768
769 for (uint32_t i = 0; i < lbids.size(); i++)
770 {
771 (*versions)[i] = vss->getCurrentVersion(lbids[i], &tmp);
772 (*isLocked)[i] = tmp;
773 }
774 }
775 else
776 for (uint32_t i = 0; i < lbids.size(); i++)
777 (*versions)[i] = vss->getCurrentVersion(lbids[i], NULL);
778
779 vss->release(VSS::READ);
780 locked = false;
781 return 0;
782 }
783 catch (exception& e)
784 {
785 versions->clear();
786 cerr << e.what() << endl;
787
788 if (locked)
789 vss->release(VSS::READ);
790
791 return -1;
792 }
793 }
794
795
getHighestVerInVB(LBID_t lbid,VER_t max) const796 VER_t DBRM::getHighestVerInVB(LBID_t lbid, VER_t max) const
797 {
798 bool locked = false;
799 VER_t ret = -1;
800
801 try
802 {
803 vss->lock(VSS::READ);
804 locked = true;
805 ret = vss->getHighestVerInVB(lbid, max);
806 vss->release(VSS::READ);
807 locked = false;
808 }
809 catch (exception& e)
810 {
811 cerr << e.what() << endl;
812
813 if (locked)
814 vss->release(VSS::READ);
815
816 throw;
817 }
818
819 return ret;
820 }
821
isVersioned(LBID_t lbid,VER_t ver) const822 bool DBRM::isVersioned(LBID_t lbid, VER_t ver) const
823 {
824 bool ret = false;
825 bool locked = false;
826
827 try
828 {
829 vss->lock(VSS::READ);
830 locked = true;
831 ret = vss->isVersioned(lbid, ver);
832 vss->release(VSS::READ);
833 locked = false;
834 }
835 catch (exception& e)
836 {
837 cerr << e.what() << endl;
838
839 if (locked)
840 vss->release(VSS::READ);
841
842 throw;
843 }
844
845 return ret;
846 }
847
send_recv(const ByteStream & in,ByteStream & out)848 int8_t DBRM::send_recv(const ByteStream& in, ByteStream& out) throw()
849 {
850 #ifdef BRM_INFO
851
852 if (fDebug) TRACER_WRITENOW("send_recv");
853
854 #endif
855 uint8_t attempt = 1;
856
857 mutex.lock();
858
859 reconnect:
860
861 if (msgClient == NULL)
862 try
863 {
864 msgClient = MessageQueueClientPool::getInstance(masterName);
865 }
866 catch (exception& e)
867 {
868 cerr << "class DBRM failed to create a MessageQueueClient: " <<
869 e.what() << endl;
870 msgClient = NULL;
871 mutex.unlock();
872 return ERR_NETWORK;
873 }
874
875 try
876 {
877 msgClient->write(in);
878 out = msgClient->read();
879 }
880 /* If we add a timeout to the read() call, uncomment this clause
881 catch (SocketClosed &e) {
882 cerr << "DBRM::send_recv: controller node closed the connection" << endl;
883 DO_ERR_NETWORK;
884 }
885 */
886 catch (exception& e)
887 {
888 cerr << "DBRM::send_recv caught: " << e.what() << endl;
889
890 if (attempt++ == 1)
891 {
892 MessageQueueClientPool::releaseInstance(msgClient);
893 msgClient = NULL;
894 goto reconnect;
895 }
896
897 DO_ERR_NETWORK;
898 }
899
900 if (out.length() == 0)
901 {
902 cerr << "DBRM::send_recv: controller node closed the connection" << endl;
903
904 if (attempt <= 2)
905 {
906 MessageQueueClientPool::releaseInstance(msgClient);
907 msgClient = NULL;
908 if (attempt++ > 1)
909 {
910 sleep(3);
911 }
912 goto reconnect;
913 }
914
915 DO_ERR_NETWORK;
916 }
917
918 mutex.unlock();
919 return ERR_OK;
920 }
921
922 //------------------------------------------------------------------------------
923 // Send a request to create a "stripe" of column extents for the specified
924 // column OIDs and DBRoot.
925 //------------------------------------------------------------------------------
createStripeColumnExtents(const std::vector<CreateStripeColumnExtentsArgIn> & cols,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,std::vector<CreateStripeColumnExtentsArgOut> & extents)926 int DBRM::createStripeColumnExtents(
927 const std::vector<CreateStripeColumnExtentsArgIn>& cols,
928 uint16_t dbRoot,
929 uint32_t& partitionNum,
930 uint16_t& segmentNum,
931 std::vector<CreateStripeColumnExtentsArgOut>& extents) DBRM_THROW
932 {
933 #ifdef BRM_INFO
934
935 if (fDebug)
936 {
937 TRACER_WRITELATER("createStripeColumnExtents");
938 TRACER_WRITE;
939 }
940
941 #endif
942
943 ByteStream command, response;
944 uint8_t err;
945 uint16_t tmp16;
946 uint32_t tmp32;
947
948 command << CREATE_STRIPE_COLUMN_EXTENTS;
949 serializeInlineVector(command, cols);
950 command << dbRoot << partitionNum;
951
952 err = send_recv(command, response);
953
954 if (err != ERR_OK)
955 return err;
956
957 if (response.length() == 0)
958 return ERR_NETWORK;
959
960 try
961 {
962 response >> err;
963
964 if (err != 0)
965 return (int) err;
966
967 response >> tmp32;
968 partitionNum = tmp32;
969 response >> tmp16;
970 segmentNum = tmp16;
971 deserializeInlineVector(response, extents);
972 }
973 catch (exception& e)
974 {
975 cerr << e.what() << endl;
976 return ERR_FAILURE;
977 }
978
979 CHECK_EMPTY(response);
980 return 0;
981 }
982
983 //------------------------------------------------------------------------------
984 // Send a request to create a column extent for the specified OID and DBRoot.
985 //------------------------------------------------------------------------------
createColumnExtent_DBroot(OID_t oid,uint32_t colWidth,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,execplan::CalpontSystemCatalog::ColDataType colDataType,LBID_t & lbid,int & allocdSize,uint32_t & startBlockOffset)986 int DBRM::createColumnExtent_DBroot(OID_t oid,
987 uint32_t colWidth,
988 uint16_t dbRoot,
989 uint32_t& partitionNum,
990 uint16_t& segmentNum,
991 execplan::CalpontSystemCatalog::ColDataType colDataType,
992 LBID_t& lbid,
993 int& allocdSize,
994 uint32_t& startBlockOffset) DBRM_THROW
995 {
996 #ifdef BRM_INFO
997
998 if (fDebug)
999 {
1000 TRACER_WRITELATER("createColumnExtent_DBroot");
1001 TRACER_ADDINPUT(oid);
1002 TRACER_ADDINPUT(colWidth);
1003 TRACER_ADDSHORTINPUT(dbRoot);
1004 TRACER_ADDOUTPUT(partitionNum);
1005 TRACER_ADDSHORTOUTPUT(segmentNum);
1006 TRACER_ADDINT64OUTPUT(lbid);
1007 TRACER_ADDOUTPUT(allocdSize);
1008 TRACER_ADDOUTPUT(startBlockOffset);
1009 TRACER_WRITE;
1010 }
1011
1012 #endif
1013
1014 ByteStream command, response;
1015 uint8_t err;
1016 uint32_t tmp8 = (uint8_t)colDataType;
1017 uint16_t tmp16;
1018 uint32_t tmp32;
1019 uint64_t tmp64;
1020
1021 command << CREATE_COLUMN_EXTENT_DBROOT << (ByteStream::quadbyte) oid <<
1022 colWidth << dbRoot << partitionNum << segmentNum << tmp8;
1023 err = send_recv(command, response);
1024
1025 if (err != ERR_OK)
1026 return err;
1027
1028 if (response.length() == 0)
1029 return ERR_NETWORK;
1030
1031 try
1032 {
1033 response >> err;
1034
1035 if (err != 0)
1036 return (int) err;
1037
1038 response >> tmp32;
1039 partitionNum = tmp32;
1040 response >> tmp16;
1041 segmentNum = tmp16;
1042 response >> tmp64;
1043 lbid = (int64_t)tmp64;
1044 response >> tmp32;
1045 allocdSize = (int32_t)tmp32;
1046 response >> tmp32;
1047 startBlockOffset = (int32_t)tmp32;
1048 }
1049 catch (exception& e)
1050 {
1051 cerr << e.what() << endl;
1052 return ERR_FAILURE;
1053 }
1054
1055 CHECK_EMPTY(response);
1056 return 0;
1057 }
1058
1059 //------------------------------------------------------------------------------
1060 // Send a request to create a column extent for the exact segment file
1061 // specified by the requested OID, DBRoot, partition, and segment.
1062 //------------------------------------------------------------------------------
createColumnExtentExactFile(OID_t oid,uint32_t colWidth,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,execplan::CalpontSystemCatalog::ColDataType colDataType,LBID_t & lbid,int & allocdSize,uint32_t & startBlockOffset)1063 int DBRM::createColumnExtentExactFile(OID_t oid,
1064 uint32_t colWidth,
1065 uint16_t dbRoot,
1066 uint32_t partitionNum,
1067 uint16_t segmentNum,
1068 execplan::CalpontSystemCatalog::ColDataType colDataType,
1069 LBID_t& lbid,
1070 int& allocdSize,
1071 uint32_t& startBlockOffset) DBRM_THROW
1072 {
1073 #ifdef BRM_INFO
1074
1075 if (fDebug)
1076 {
1077 TRACER_WRITELATER("createColumnExtentExactFile");
1078 TRACER_ADDINPUT(oid);
1079 TRACER_ADDINPUT(colWidth);
1080 TRACER_ADDSHORTINPUT(dbRoot);
1081 TRACER_ADDOUTPUT(partitionNum);
1082 TRACER_ADDSHORTOUTPUT(segmentNum);
1083 TRACER_ADDINT64OUTPUT(lbid);
1084 TRACER_ADDOUTPUT(allocdSize);
1085 TRACER_ADDOUTPUT(startBlockOffset);
1086 TRACER_WRITE;
1087 }
1088
1089 #endif
1090
1091 ByteStream command, response;
1092 uint8_t err;
1093 uint8_t tmp8;
1094 uint16_t tmp16;
1095 uint32_t tmp32;
1096 uint64_t tmp64;
1097
1098 tmp8 = (uint8_t)colDataType;
1099 command << CREATE_COLUMN_EXTENT_EXACT_FILE << (ByteStream::quadbyte) oid <<
1100 colWidth << dbRoot << partitionNum << segmentNum << tmp8;
1101 err = send_recv(command, response);
1102
1103 if (err != ERR_OK)
1104 return err;
1105
1106 if (response.length() == 0)
1107 return ERR_NETWORK;
1108
1109 try
1110 {
1111 response >> err;
1112
1113 if (err != 0)
1114 return (int) err;
1115
1116 response >> tmp32;
1117 partitionNum = tmp32;
1118 response >> tmp16;
1119 segmentNum = tmp16;
1120 response >> tmp64;
1121 lbid = (int64_t)tmp64;
1122 response >> tmp32;
1123 allocdSize = (int32_t)tmp32;
1124 response >> tmp32;
1125 startBlockOffset = (int32_t)tmp32;
1126 }
1127 catch (exception& e)
1128 {
1129 cerr << e.what() << endl;
1130 return ERR_FAILURE;
1131 }
1132
1133 CHECK_EMPTY(response);
1134 return 0;
1135 }
1136
1137 //------------------------------------------------------------------------------
1138 // Send a request to create a dictionary store extent.
1139 //------------------------------------------------------------------------------
createDictStoreExtent(OID_t oid,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,LBID_t & lbid,int & allocdSize)1140 int DBRM::createDictStoreExtent(OID_t oid,
1141 uint16_t dbRoot,
1142 uint32_t partitionNum,
1143 uint16_t segmentNum,
1144 LBID_t& lbid,
1145 int& allocdSize) DBRM_THROW
1146 {
1147 #ifdef BRM_INFO
1148
1149 if (fDebug)
1150 {
1151 TRACER_WRITELATER("createDictStoreExtent");
1152 TRACER_ADDINPUT(oid);
1153 TRACER_ADDSHORTINPUT(dbRoot);
1154 TRACER_ADDINPUT(partitionNum);
1155 TRACER_ADDSHORTINPUT(segmentNum);
1156 TRACER_ADDINT64OUTPUT(lbid);
1157 TRACER_ADDOUTPUT(allocdSize);
1158 TRACER_WRITE;
1159 }
1160
1161 #endif
1162
1163 ByteStream command, response;
1164 uint8_t err;
1165 uint32_t tmp32;
1166 uint64_t tmp64;
1167
1168 command << CREATE_DICT_STORE_EXTENT << (ByteStream::quadbyte) oid << dbRoot <<
1169 partitionNum << segmentNum;
1170 err = send_recv(command, response);
1171
1172 if (err != ERR_OK)
1173 return err;
1174
1175 if (response.length() == 0)
1176 return ERR_NETWORK;
1177
1178 try
1179 {
1180 response >> err;
1181
1182 if (err != 0)
1183 return (int) err;
1184
1185 response >> tmp64;
1186 lbid = (int64_t)tmp64;
1187 response >> tmp32;
1188 allocdSize = (int32_t)tmp32;
1189 }
1190 catch (exception& e)
1191 {
1192 cerr << e.what() << endl;
1193 return ERR_FAILURE;
1194 }
1195
1196 CHECK_EMPTY(response);
1197 return 0;
1198 }
1199
1200 //------------------------------------------------------------------------------
1201 // Send a request to delete a set extents for the specified column OID and
1202 // DBRoot, and to return the extents to the free list. HWMs for the last
1203 // stripe of extents in the specified DBRoot are updated accordingly.
1204 //------------------------------------------------------------------------------
rollbackColumnExtents_DBroot(OID_t oid,bool bDeleteAll,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,HWM_t hwm)1205 int DBRM::rollbackColumnExtents_DBroot(OID_t oid,
1206 bool bDeleteAll,
1207 uint16_t dbRoot,
1208 uint32_t partitionNum,
1209 uint16_t segmentNum,
1210 HWM_t hwm) DBRM_THROW
1211 {
1212 #ifdef BRM_INFO
1213
1214 if (fDebug)
1215 {
1216 TRACER_WRITELATER("rollbackColumnExtents");
1217 TRACER_ADDINPUT(oid);
1218 TRACER_ADDBOOLINPUT(bDeleteAll);
1219 TRACER_ADDSHORTINPUT(dbRoot);
1220 TRACER_ADDINPUT(partitionNum);
1221 TRACER_ADDSHORTINPUT(segmentNum);
1222 TRACER_ADDINPUT(hwm);
1223 TRACER_WRITE;
1224 }
1225
1226 #endif
1227
1228 ByteStream command, response;
1229 uint8_t err;
1230
1231 command << ROLLBACK_COLUMN_EXTENTS_DBROOT << (ByteStream::quadbyte) oid <<
1232 (uint8_t)bDeleteAll << dbRoot << partitionNum <<
1233 segmentNum << hwm;
1234 err = send_recv(command, response);
1235
1236 if (err != ERR_OK)
1237 return err;
1238
1239 if (response.length() != 1)
1240 return ERR_NETWORK;
1241
1242 response >> err;
1243 CHECK_EMPTY(response);
1244 return err;
1245 }
1246
1247 //------------------------------------------------------------------------------
1248 // Send a request to delete a set of extents for the specified dictionary store
1249 // OID and DBRoot, and to return the extents to the free list. HWMs for the
1250 // last stripe of extents are updated accordingly.
1251 //------------------------------------------------------------------------------
rollbackDictStoreExtents_DBroot(OID_t oid,uint16_t dbRoot,uint32_t partitionNum,const vector<uint16_t> & segNums,const vector<HWM_t> & hwms)1252 int DBRM::rollbackDictStoreExtents_DBroot(OID_t oid,
1253 uint16_t dbRoot,
1254 uint32_t partitionNum,
1255 const vector<uint16_t>& segNums,
1256 const vector<HWM_t>& hwms) DBRM_THROW
1257 {
1258 #ifdef BRM_INFO
1259
1260 if (fDebug)
1261 {
1262 TRACER_WRITELATER("rollbackDictStoreExtents");
1263 TRACER_ADDINPUT(oid);
1264 TRACER_ADDSHORTINPUT(dbRoot);
1265 TRACER_ADDINPUT(partitionNum);
1266 TRACER_WRITE;
1267 }
1268
1269 #endif
1270
1271 ByteStream command, response;
1272 uint8_t err;
1273
1274 command << ROLLBACK_DICT_STORE_EXTENTS_DBROOT <<
1275 (ByteStream::quadbyte) oid <<
1276 dbRoot << partitionNum;
1277 serializeVector(command, segNums);
1278 serializeVector(command, hwms);
1279 err = send_recv(command, response);
1280
1281 if (err != ERR_OK)
1282 return err;
1283
1284 if (response.length() != 1)
1285 return ERR_NETWORK;
1286
1287 response >> err;
1288 CHECK_EMPTY(response);
1289 return err;
1290 }
1291
deleteEmptyColExtents(const std::vector<ExtentInfo> & extentsInfo)1292 int DBRM::deleteEmptyColExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
1293 {
1294 #ifdef BRM_INFO
1295
1296 if (fDebug)
1297 {
1298 TRACER_WRITELATER("deleteEmptyColExtents");
1299 TRACER_WRITE;
1300 }
1301
1302 #endif
1303
1304 ByteStream command, response;
1305 uint8_t err;
1306 uint32_t size = extentsInfo.size();
1307 command << DELETE_EMPTY_COL_EXTENTS;
1308 command << size;
1309
1310 for ( unsigned i = 0; i < extentsInfo.size(); i++)
1311 {
1312 command << (ByteStream::quadbyte) extentsInfo[i].oid;
1313 command << extentsInfo[i].partitionNum;
1314 command << extentsInfo[i].segmentNum;
1315 command << extentsInfo[i].dbRoot;
1316 command << extentsInfo[i].hwm;
1317 }
1318
1319 err = send_recv(command, response);
1320
1321 if (err != ERR_OK)
1322 return err;
1323
1324 if (response.length() != 1)
1325 return ERR_NETWORK;
1326
1327 response >> err;
1328 CHECK_EMPTY(response);
1329 return err;
1330 }
1331
deleteEmptyDictStoreExtents(const std::vector<ExtentInfo> & extentsInfo)1332 int DBRM::deleteEmptyDictStoreExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
1333 {
1334 #ifdef BRM_INFO
1335
1336 if (fDebug)
1337 {
1338 TRACER_WRITELATER("deleteEmptyDictStoreExtents");
1339 TRACER_WRITE;
1340 }
1341
1342 #endif
1343
1344 ByteStream command, response;
1345 uint8_t err;
1346 uint32_t size = extentsInfo.size();
1347 command << DELETE_EMPTY_DICT_STORE_EXTENTS;
1348 command << size;
1349
1350 for ( unsigned i = 0; i < extentsInfo.size(); i++)
1351 {
1352 command << (ByteStream::quadbyte) extentsInfo[i].oid;
1353 command << extentsInfo[i].partitionNum;
1354 command << extentsInfo[i].segmentNum;
1355 command << extentsInfo[i].dbRoot;
1356 command << extentsInfo[i].hwm;
1357 command << (uint8_t)extentsInfo[i].newFile;
1358 }
1359
1360 err = send_recv(command, response);
1361
1362 if (err != ERR_OK)
1363 return err;
1364
1365 if (response.length() != 1)
1366 return ERR_NETWORK;
1367
1368 response >> err;
1369 CHECK_EMPTY(response);
1370 return err;
1371 }
1372
deleteOID(OID_t oid)1373 int DBRM::deleteOID(OID_t oid) DBRM_THROW
1374 {
1375 #ifdef BRM_INFO
1376
1377 if (fDebug)
1378 {
1379 TRACER_WRITELATER("deleteOID");
1380 TRACER_ADDINPUT(oid);
1381 TRACER_WRITE;
1382 }
1383
1384 #endif
1385
1386 ByteStream command, response;
1387 uint8_t err;
1388
1389 command << DELETE_OID << (ByteStream::quadbyte) oid;
1390 err = send_recv(command, response);
1391
1392 if (err != ERR_OK)
1393 return err;
1394
1395 if (response.length() != 1)
1396 return ERR_NETWORK;
1397
1398 response >> err;
1399 CHECK_EMPTY(response);
1400
1401 try
1402 {
1403 deleteAISequence(oid);
1404 }
1405 catch (...) { } // an error here means a network problem, will be caught elsewhere
1406
1407 return err;
1408 }
1409
deleteOIDs(const std::vector<OID_t> & oids)1410 int DBRM::deleteOIDs(const std::vector<OID_t>& oids) DBRM_THROW
1411 {
1412 #ifdef BRM_INFO
1413
1414 if (fDebug)
1415 {
1416 TRACER_WRITELATER("deleteOIDs");
1417 TRACER_WRITE;
1418 }
1419
1420 #endif
1421
1422 ByteStream command, response;
1423 uint8_t err;
1424 uint32_t size = oids.size();
1425 command << DELETE_OIDS;
1426 command << size;
1427
1428 for ( unsigned i = 0; i < oids.size(); i++)
1429 {
1430 command << (ByteStream::quadbyte) oids[i];
1431 }
1432
1433 err = send_recv(command, response);
1434
1435 if (err != ERR_OK)
1436 return err;
1437
1438 if (response.length() != 1)
1439 return ERR_NETWORK;
1440
1441 response >> err;
1442 CHECK_EMPTY(response);
1443
1444 try
1445 {
1446 for (uint32_t i = 0; i < oids.size(); i++)
1447 deleteAISequence(oids[i]);
1448 }
1449 catch (...) { } // an error here means a network problem, will be caught elsewhere
1450
1451 return err;
1452 }
1453
1454 //------------------------------------------------------------------------------
1455 // Return the last local HWM for the specified OID and DBroot. The corresponding
1456 // partition number, and segment number are returned as well. This function
1457 // can be used by cpimport for example to find out where the current "end-of-
1458 // data" is, so that cpimport will know where to begin adding new rows.
1459 // If no available or outOfService extent is found, then bFound is returned
1460 // as false.
1461 //------------------------------------------------------------------------------
getLastHWM_DBroot(int oid,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,HWM_t & hwm,int & status,bool & bFound)1462 int DBRM::getLastHWM_DBroot(int oid, uint16_t dbRoot, uint32_t& partitionNum,
1463 uint16_t& segmentNum, HWM_t& hwm,
1464 int& status, bool& bFound) throw()
1465 {
1466 #ifdef BRM_INFO
1467
1468 if (fDebug)
1469 {
1470 TRACER_WRITELATER("getLastHWM_DBroot");
1471 TRACER_ADDINPUT(oid);
1472 TRACER_ADDSHORTOUTPUT(dbRoot);
1473 TRACER_ADDOUTPUT(partitionNum);
1474 TRACER_ADDSHORTOUTPUT(segmentNum);
1475 TRACER_ADDOUTPUT(hwm);
1476 TRACER_ADDOUTPUT(status);
1477 TRACER_WRITE;
1478 }
1479
1480 #endif
1481
1482 try
1483 {
1484 hwm = em->getLastHWM_DBroot(oid, dbRoot, partitionNum, segmentNum,
1485 status, bFound);
1486 }
1487 catch (exception& e)
1488 {
1489 return ERR_FAILURE;
1490 }
1491
1492 return ERR_OK;
1493 }
1494
1495 //------------------------------------------------------------------------------
1496 // Return the HWM for the specified OID, partition number, and segment number.
1497 // This is used to get the HWM for a particular dictionary segment store file,
1498 // or a specific column segment file.
1499 //------------------------------------------------------------------------------
getLocalHWM(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,HWM_t & hwm,int & status)1500 int DBRM::getLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
1501 HWM_t& hwm, int& status) throw()
1502 {
1503 #ifdef BRM_INFO
1504
1505 if (fDebug)
1506 {
1507 TRACER_WRITELATER("getLocalHWM");
1508 TRACER_ADDINPUT(oid);
1509 TRACER_ADDINPUT(partitionNum);
1510 TRACER_ADDSHORTINPUT(segmentNum);
1511 TRACER_ADDOUTPUT(hwm);
1512 TRACER_ADDOUTPUT(status);
1513 TRACER_WRITE;
1514 }
1515
1516 #endif
1517
1518 try
1519 {
1520 hwm = em->getLocalHWM(oid, partitionNum, segmentNum, status);
1521 }
1522 catch (exception& e)
1523 {
1524 cerr << e.what() << endl;
1525 return ERR_FAILURE;
1526 }
1527
1528 return ERR_OK;
1529 }
1530
1531 //------------------------------------------------------------------------------
1532 // Set the local HWM for the file referenced by the specified OID, partition
1533 // number, and segment number.
1534 //------------------------------------------------------------------------------
setLocalHWM(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,HWM_t hwm)1535 int DBRM::setLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
1536 HWM_t hwm) DBRM_THROW
1537 {
1538 #ifdef BRM_INFO
1539
1540 if (fDebug)
1541 {
1542 TRACER_WRITELATER("setLocalHWM");
1543 TRACER_ADDINPUT(oid);
1544 TRACER_ADDINPUT(partitionNum);
1545 TRACER_ADDSHORTINPUT(segmentNum);
1546 TRACER_ADDINPUT(hwm);
1547 TRACER_WRITE;
1548 }
1549
1550 #endif
1551
1552 ByteStream command, response;
1553 uint8_t err;
1554
1555 command << SET_LOCAL_HWM << (ByteStream::quadbyte) oid << partitionNum << segmentNum << hwm;
1556 err = send_recv(command, response);
1557
1558 if (err != ERR_OK)
1559 return err;
1560
1561 if (response.length() != 1)
1562 return ERR_NETWORK;
1563
1564 response >> err;
1565 CHECK_EMPTY(response);
1566 return err;
1567 }
1568
bulkSetHWM(const vector<BulkSetHWMArg> & v,VER_t transID)1569 int DBRM::bulkSetHWM(const vector<BulkSetHWMArg>& v, VER_t transID) DBRM_THROW
1570 {
1571 #ifdef BRM_INFO
1572
1573 if (fDebug)
1574 {
1575 TRACER_WRITELATER("bulkSetHWM");
1576 TRACER_WRITE;
1577 }
1578
1579 #endif
1580
1581 ByteStream command, response;
1582 uint8_t err;
1583
1584 command << BULK_SET_HWM;
1585 serializeInlineVector(command, v);
1586 command << (uint32_t) transID;
1587 err = send_recv(command, response);
1588
1589 if (err != ERR_OK)
1590 return err;
1591
1592 if (response.length() != 1)
1593 return ERR_NETWORK;
1594
1595 response >> err;
1596 CHECK_EMPTY(response);
1597 return err;
1598 }
1599
bulkSetHWMAndCP(const vector<BulkSetHWMArg> & v,const vector<CPInfo> & setCPDataArgs,const vector<CPInfoMerge> & mergeCPDataArgs,VER_t transID)1600 int DBRM::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& v, const vector<CPInfo>& setCPDataArgs,
1601 const vector<CPInfoMerge>& mergeCPDataArgs, VER_t transID) DBRM_THROW
1602 {
1603 #ifdef BRM_INFO
1604
1605 if (fDebug)
1606 {
1607 TRACER_WRITELATER("bulkSetHWMAndCP");
1608 TRACER_WRITE;
1609 }
1610
1611 #endif
1612
1613 ByteStream command, response;
1614 uint8_t err;
1615
1616 command << BULK_SET_HWM_AND_CP;
1617 serializeInlineVector(command, v);
1618 serializeInlineVector(command, setCPDataArgs);
1619 serializeInlineVector(command, mergeCPDataArgs);
1620 command << (uint32_t) transID;
1621 err = send_recv(command, response);
1622
1623 if (err != ERR_OK)
1624 return err;
1625
1626 if (response.length() != 1)
1627 return ERR_NETWORK;
1628
1629 response >> err;
1630 CHECK_EMPTY(response);
1631 return err;
1632 }
1633
bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg> & args)1634 int DBRM::bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg>& args)
1635 {
1636 #ifdef BRM_INFO
1637
1638 if (fDebug)
1639 {
1640 TRACER_WRITELATER("bulkUpdateDBRoot");
1641 TRACER_WRITE;
1642 }
1643
1644 #endif
1645
1646 ByteStream command, response;
1647 uint8_t err;
1648
1649 command << BULK_UPDATE_DBROOT;
1650 serializeInlineVector(command, args);
1651 err = send_recv(command, response);
1652
1653 if (err != ERR_OK)
1654 return err;
1655
1656 if (response.length() != 1)
1657 return ERR_NETWORK;
1658
1659 response >> err;
1660 CHECK_EMPTY(response);
1661 return err;
1662 }
1663
1664
1665 //------------------------------------------------------------------------------
1666 // For the specified OID and PM number, this function will return a vector
1667 // of objects carrying HWM info (for the last segment file) and block count
1668 // information about each DBRoot assigned to the specified PM.
1669 //------------------------------------------------------------------------------
getDbRootHWMInfo(OID_t oid,uint16_t pmNumber,EmDbRootHWMInfo_v & emDBRootHwmInfos)1670 int DBRM::getDbRootHWMInfo(OID_t oid, uint16_t pmNumber,
1671 EmDbRootHWMInfo_v& emDBRootHwmInfos) throw()
1672 {
1673 #ifdef BRM_INFO
1674
1675 if (fDebug)
1676 {
1677 TRACER_WRITELATER("getDbRootHWMInfo");
1678 TRACER_ADDINPUT(oid);
1679 TRACER_ADDSHORTINPUT(pmNumber);
1680 TRACER_WRITE;
1681 }
1682
1683 #endif
1684
1685 try
1686 {
1687 em->getDbRootHWMInfo(oid, pmNumber, emDBRootHwmInfos);
1688 }
1689 catch (exception& e)
1690 {
1691 cerr << e.what() << endl;
1692 return ERR_FAILURE;
1693 }
1694
1695 return ERR_OK;
1696 }
1697
1698 //------------------------------------------------------------------------------
1699 // Return the status or state of the extents in the segment file specified
1700 // by the arguments: oid, partitionNum, and segment Num.
1701 //------------------------------------------------------------------------------
getExtentState(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,bool & bFound,int & status)1702 int DBRM::getExtentState(OID_t oid, uint32_t partitionNum,
1703 uint16_t segmentNum, bool& bFound, int& status) throw()
1704 {
1705 #ifdef BRM_INFO
1706
1707 if (fDebug)
1708 {
1709 TRACER_WRITELATER("getExtentState");
1710 TRACER_ADDINPUT(oid);
1711 TRACER_ADDINPUT(partitionNum);
1712 TRACER_ADDSHORTINPUT(segmentNum);
1713 TRACER_ADDOUTPUT(status);
1714 TRACER_WRITE;
1715 }
1716
1717 #endif
1718
1719 try
1720 {
1721 em->getExtentState(oid, partitionNum,
1722 segmentNum, bFound, status);
1723 }
1724 catch (exception& e)
1725 {
1726 cerr << e.what() << endl;
1727 return ERR_FAILURE;
1728 }
1729
1730 return ERR_OK;
1731 }
1732
1733 // dmc-should eventually deprecate
getExtentSize()1734 int DBRM::getExtentSize() throw()
1735 {
1736 #ifdef BRM_INFO
1737
1738 if (fDebug) TRACER_WRITENOW("getExtentSize");
1739
1740 #endif
1741 return em->getExtentSize();
1742 }
1743
getExtentRows()1744 unsigned DBRM::getExtentRows() throw()
1745 {
1746 #ifdef BRM_INFO
1747
1748 if (fDebug) TRACER_WRITENOW("getExtentRows");
1749
1750 #endif
1751 return em->getExtentRows();
1752 }
1753
getExtents(int OID,std::vector<struct EMEntry> & entries,bool sorted,bool notFoundErr,bool incOutOfService)1754 int DBRM::getExtents(int OID, std::vector<struct EMEntry>& entries,
1755 bool sorted, bool notFoundErr, bool incOutOfService)
1756 {
1757 #ifdef BRM_INFO
1758
1759 if (fDebug)
1760 {
1761 TRACER_WRITELATER("getExtents");
1762 TRACER_ADDINPUT(OID);
1763 TRACER_WRITE;
1764 }
1765
1766 #endif
1767
1768 try
1769 {
1770 em->getExtents(OID, entries, sorted, notFoundErr, incOutOfService);
1771 }
1772 catch (exception& e)
1773 {
1774 cerr << e.what() << endl;
1775 return -1;
1776 }
1777
1778 return 0;
1779 }
1780
getExtents_dbroot(int OID,std::vector<struct EMEntry> & entries,const uint16_t dbroot)1781 int DBRM::getExtents_dbroot(int OID, std::vector<struct EMEntry>& entries,
1782 const uint16_t dbroot) throw()
1783 {
1784 #ifdef BRM_INFO
1785
1786 if (fDebug)
1787 {
1788 TRACER_WRITELATER("getExtents_dbroot");
1789 TRACER_ADDINPUT(OID);
1790 TRACER_WRITE;
1791 }
1792
1793 #endif
1794
1795 try
1796 {
1797 em->getExtents_dbroot(OID, entries, dbroot);
1798 }
1799 catch (exception& e)
1800 {
1801 cerr << e.what() << endl;
1802 return -1;
1803 }
1804
1805 return 0;
1806 }
1807
1808 //------------------------------------------------------------------------------
1809 // Return the number of extents for the specified OID and DBRoot.
1810 // Any out-of-service extents can optionally be included or excluded.
1811 //------------------------------------------------------------------------------
getExtentCount_dbroot(int OID,uint16_t dbroot,bool incOutOfService,uint64_t & numExtents)1812 int DBRM::getExtentCount_dbroot(int OID, uint16_t dbroot,
1813 bool incOutOfService, uint64_t& numExtents) throw()
1814 {
1815 #ifdef BRM_INFO
1816
1817 if (fDebug)
1818 {
1819 TRACER_WRITELATER("getExtentCount_dbroot");
1820 TRACER_ADDINPUT(OID);
1821 TRACER_WRITE;
1822 }
1823
1824 #endif
1825
1826 try
1827 {
1828 em->getExtentCount_dbroot(OID, dbroot, incOutOfService, numExtents);
1829 }
1830 catch (exception& e)
1831 {
1832 cerr << e.what() << endl;
1833 return -1;
1834 }
1835
1836 return 0;
1837 }
1838
1839 //------------------------------------------------------------------------------
1840 // Gets the DBRoot for the specified system catalog OID.
1841 // Function assumes the specified System Catalog OID is fully contained on
1842 // a single DBRoot, as the function only searches for and returns the first
1843 // DBRoot entry that is found in the extent map.
1844 //------------------------------------------------------------------------------
getSysCatDBRoot(OID_t oid,uint16_t & dbRoot)1845 int DBRM::getSysCatDBRoot(OID_t oid, uint16_t& dbRoot) throw()
1846 {
1847 #ifdef BRM_INFO
1848
1849 if (fDebug)
1850 {
1851 TRACER_WRITELATER("getSysCatDBRoot");
1852 TRACER_ADDINPUT(oid);
1853 TRACER_ADDSHORTOUTPUT(dbRoot);
1854 TRACER_WRITE;
1855 }
1856
1857 #endif
1858
1859 try
1860 {
1861 em->getSysCatDBRoot(oid, dbRoot);
1862 }
1863 catch (exception& e)
1864 {
1865 cerr << e.what() << endl;
1866 return -1;
1867 }
1868
1869 return 0;
1870 }
1871
1872 //------------------------------------------------------------------------------
1873 // Delete all extents for the specified OID(s) and partition number.
1874 //------------------------------------------------------------------------------
deletePartition(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)1875 int DBRM::deletePartition(const std::vector<OID_t>& oids,
1876 const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
1877 {
1878 #ifdef BRM_INFO
1879
1880 if (fDebug)
1881 {
1882 TRACER_WRITENOW("deletePartition");
1883 std::ostringstream oss;
1884 oss << "partitionNum: ";
1885 std::set<LogicalPartition>::const_iterator partIt;
1886
1887 for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
1888 oss << (*partIt) << " ";
1889 oss << "; OIDS: ";
1890
1891 std::vector<OID_t>::const_iterator it;
1892
1893 for (it = oids.begin(); it != oids.end(); ++it)
1894 {
1895 oss << (*it) << ", ";
1896 }
1897
1898 TRACER_WRITEDIRECT(oss.str());
1899 }
1900
1901 #endif
1902
1903 ByteStream command, response;
1904 uint8_t err;
1905 command << DELETE_PARTITION;
1906 serializeSet<LogicalPartition>(command, partitionNums);
1907 uint32_t oidSize = oids.size();
1908 command << oidSize;
1909
1910 for ( unsigned i = 0; i < oidSize; i++)
1911 command << (ByteStream::quadbyte) oids[i];
1912
1913 err = send_recv(command, response);
1914
1915 if (err != ERR_OK)
1916 return err;
1917
1918 if (response.length() == 0)
1919 return ERR_NETWORK;
1920
1921 response >> err;
1922
1923 if (err != 0)
1924 response >> emsg;
1925
1926 CHECK_EMPTY(response);
1927 return err;
1928 }
1929
1930 //------------------------------------------------------------------------------
1931 // Mark all extents as out of service, for the specified OID(s) and partition
1932 // number.
1933 //------------------------------------------------------------------------------
markPartitionForDeletion(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)1934 int DBRM::markPartitionForDeletion(const std::vector<OID_t>& oids,
1935 const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
1936 {
1937 #ifdef BRM_INFO
1938
1939 if (fDebug)
1940 {
1941 TRACER_WRITENOW("markPartitionForDeletion");
1942 std::ostringstream oss;
1943 oss << "partitionNum: ";
1944 std::set<LogicalPartition>::const_iterator partIt;
1945
1946 for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
1947 oss << (*partIt) << " ";
1948 oss << "; OIDS: ";
1949
1950 std::vector<OID_t>::const_iterator it;
1951
1952 for (it = oids.begin(); it != oids.end(); ++it)
1953 {
1954 oss << (*it) << ", ";
1955 }
1956
1957 TRACER_WRITEDIRECT(oss.str());
1958 }
1959
1960 #endif
1961
1962 ByteStream command, response;
1963 uint8_t err;
1964 command << MARK_PARTITION_FOR_DELETION;
1965 serializeSet<LogicalPartition>(command, partitionNums);
1966 uint32_t oidSize = oids.size();
1967 command << oidSize;
1968
1969 for ( unsigned i = 0; i < oidSize; i++)
1970 command << (ByteStream::quadbyte) oids[i];
1971
1972 err = send_recv(command, response);
1973
1974 if (err != ERR_OK)
1975 return err;
1976
1977 if (response.length() == 0)
1978 return ERR_NETWORK;
1979
1980 response >> err;
1981
1982 if (err)
1983 response >> emsg;
1984
1985 CHECK_EMPTY(response);
1986 return err;
1987 }
1988
1989 //------------------------------------------------------------------------------
1990 // Mark all extents as out of service, for the specified OID(s)
1991 //------------------------------------------------------------------------------
markAllPartitionForDeletion(const std::vector<OID_t> & oids)1992 int DBRM::markAllPartitionForDeletion(const std::vector<OID_t>& oids) DBRM_THROW
1993 {
1994 #ifdef BRM_INFO
1995
1996 if (fDebug)
1997 {
1998 TRACER_WRITENOW("markAllPartitionForDeletion");
1999 std::ostringstream oss;
2000 oss << "OIDS: ";
2001 std::vector<OID_t>::const_iterator it;
2002
2003 for (it = oids.begin(); it != oids.end(); ++it)
2004 {
2005 oss << (*it) << ", ";
2006 }
2007
2008 TRACER_WRITEDIRECT(oss.str());
2009 }
2010
2011 #endif
2012
2013 ByteStream command, response;
2014 uint8_t err;
2015 uint32_t size = oids.size();
2016
2017 command << MARK_ALL_PARTITION_FOR_DELETION << size;
2018
2019 for ( unsigned i = 0; i < size; i++)
2020 {
2021 command << (ByteStream::quadbyte) oids[i];
2022 }
2023
2024 err = send_recv(command, response);
2025
2026 if (err != ERR_OK)
2027 return err;
2028
2029 if (response.length() != 1)
2030 return ERR_NETWORK;
2031
2032 response >> err;
2033 CHECK_EMPTY(response);
2034 return err;
2035 }
2036
2037 //------------------------------------------------------------------------------
2038 // Restore all extents for the specified OID(s) and partition number.
2039 //------------------------------------------------------------------------------
restorePartition(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)2040 int DBRM::restorePartition(const std::vector<OID_t>& oids,
2041 const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
2042 {
2043 #ifdef BRM_INFO
2044
2045 if (fDebug)
2046 {
2047 TRACER_WRITENOW("restorePartition");
2048 std::ostringstream oss;
2049 oss << "partitionNum: ";
2050 std::set<LogicalPartition>::const_iterator partIt;
2051
2052 for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
2053 oss << (*partIt) << " ";
2054 oss << "; OIDS: ";
2055
2056 std::vector<OID_t>::const_iterator it;
2057
2058 for (it = oids.begin(); it != oids.end(); ++it)
2059 {
2060 oss << (*it) << ", ";
2061 }
2062
2063 TRACER_WRITEDIRECT(oss.str());
2064 }
2065
2066 #endif
2067
2068 ByteStream command, response;
2069 uint8_t err;
2070
2071 command << RESTORE_PARTITION;
2072 serializeSet<LogicalPartition>(command, partitionNums);
2073 uint32_t oidSize = oids.size();
2074 command << oidSize;
2075
2076 for ( unsigned i = 0; i < oidSize; i++)
2077 command << (ByteStream::quadbyte) oids[i];
2078
2079 err = send_recv(command, response);
2080
2081 if (err != ERR_OK)
2082 return err;
2083
2084 if (response.length() == 0)
2085 return ERR_NETWORK;
2086
2087 response >> err;
2088
2089 if (err)
2090 response >> emsg;
2091
2092 CHECK_EMPTY(response);
2093 return err;
2094 }
2095
2096 //------------------------------------------------------------------------------
2097 // Return all the out-of-service partitions for the specified OID.
2098 //------------------------------------------------------------------------------
getOutOfServicePartitions(OID_t oid,std::set<LogicalPartition> & partitionNums)2099 int DBRM::getOutOfServicePartitions(OID_t oid,
2100 std::set<LogicalPartition>& partitionNums) throw()
2101 {
2102 #ifdef BRM_INFO
2103
2104 if (fDebug)
2105 {
2106 TRACER_WRITELATER("getOutOfServicePartitions");
2107 TRACER_ADDINPUT(oid);
2108 TRACER_WRITE;
2109 }
2110
2111 #endif
2112
2113 try
2114 {
2115 em->getOutOfServicePartitions(oid, partitionNums);
2116 }
2117 catch (exception& e)
2118 {
2119 cerr << e.what() << endl;
2120 return -1;
2121 }
2122
2123 return ERR_OK;
2124 }
2125
2126 //------------------------------------------------------------------------------
2127 // Delete all extents for the specified DBRoot
2128 //------------------------------------------------------------------------------
deleteDBRoot(uint16_t dbroot)2129 int DBRM::deleteDBRoot(uint16_t dbroot) DBRM_THROW
2130 {
2131 #ifdef BRM_INFO
2132
2133 if (fDebug)
2134 {
2135 TRACER_WRITENOW("deleteDBRoot");
2136 std::ostringstream oss;
2137 oss << "DBRoot: " << dbroot;
2138 TRACER_WRITEDIRECT(oss.str());
2139 }
2140
2141 #endif
2142
2143 ByteStream command, response;
2144 uint8_t err;
2145 command << DELETE_DBROOT;
2146 uint32_t q = static_cast<uint32_t>(dbroot);
2147 command << q;
2148 err = send_recv(command, response);
2149
2150 if (err != ERR_OK)
2151 return err;
2152
2153 if (response.length() == 0)
2154 return ERR_NETWORK;
2155
2156 response >> err;
2157 CHECK_EMPTY(response);
2158 return err;
2159 }
2160
2161 //------------------------------------------------------------------------------
2162 // Does the specified DBRoot have any extents.
2163 // Returns an error if extentmap shared memory is not loaded.
2164 //------------------------------------------------------------------------------
isDBRootEmpty(uint16_t dbroot,bool & isEmpty,std::string & errMsg)2165 int DBRM::isDBRootEmpty(uint16_t dbroot,
2166 bool& isEmpty, std::string& errMsg) throw()
2167 {
2168 #ifdef BRM_INFO
2169
2170 if (fDebug)
2171 {
2172 TRACER_WRITELATER("isDBRootEmpty");
2173 TRACER_ADDINPUT(dbroot);
2174 TRACER_WRITE;
2175 }
2176
2177 #endif
2178
2179 errMsg.clear();
2180
2181 try
2182 {
2183 isEmpty = em->isDBRootEmpty(dbroot);
2184 }
2185 catch (exception& e)
2186 {
2187 cerr << e.what() << endl;
2188 errMsg = e.what();
2189 return ERR_FAILURE;
2190 }
2191
2192 return ERR_OK;
2193 }
2194
writeVBEntry(VER_t transID,LBID_t lbid,OID_t vbOID,uint32_t vbFBO)2195 int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
2196 uint32_t vbFBO) DBRM_THROW
2197 {
2198
2199 #ifdef BRM_INFO
2200
2201 if (fDebug)
2202 {
2203 TRACER_WRITELATER("writeVBEntry");
2204 TRACER_ADDINPUT(transID);
2205 TRACER_ADDINPUT(lbid);
2206 TRACER_ADDINPUT(vbOID);
2207 TRACER_ADDINPUT(vbFBO);
2208 TRACER_WRITE;
2209 }
2210
2211 #endif
2212
2213 ByteStream command, response;
2214 uint8_t err;
2215
2216 command << WRITE_VB_ENTRY << (uint32_t) transID << (uint64_t) lbid << (uint32_t) vbOID << vbFBO;
2217 err = send_recv(command, response);
2218
2219 if (err != ERR_OK)
2220 return err;
2221
2222 if (response.length() != 1)
2223 return ERR_NETWORK;
2224
2225 response >> err;
2226 CHECK_EMPTY(response);
2227 return err;
2228 }
2229
bulkWriteVBEntry(VER_t transID,const std::vector<BRM::LBID_t> & lbids,OID_t vbOID,const std::vector<uint32_t> & vbFBOs)2230 int DBRM::bulkWriteVBEntry(VER_t transID,
2231 const std::vector<BRM::LBID_t>& lbids,
2232 OID_t vbOID,
2233 const std::vector<uint32_t>& vbFBOs) DBRM_THROW
2234 {
2235
2236 #ifdef BRM_INFO
2237
2238 if (fDebug)
2239 {
2240 TRACER_WRITELATER("bulkWriteVBEntry");
2241 TRACER_WRITE;
2242 }
2243
2244 #endif
2245
2246 ByteStream command, response;
2247 uint8_t err;
2248
2249 command << BULK_WRITE_VB_ENTRY << (uint32_t) transID;
2250 serializeInlineVector(command, lbids);
2251 command << (uint32_t) vbOID;
2252 serializeInlineVector(command, vbFBOs);
2253 err = send_recv(command, response);
2254
2255 if (err != ERR_OK)
2256 return err;
2257
2258 if (response.length() != 1)
2259 return ERR_NETWORK;
2260
2261 response >> err;
2262 CHECK_EMPTY(response);
2263 return err;
2264 }
2265
2266 struct _entry
2267 {
_entryBRM::_entry2268 _entry(LBID_t l) : lbid(l) { };
2269 LBID_t lbid;
operator <BRM::_entry2270 inline bool operator<(const _entry& e) const
2271 {
2272 return ((e.lbid >> 10) < (lbid >> 10));
2273 }
2274 };
2275
getDBRootsForRollback(VER_t transID,vector<uint16_t> * dbroots)2276 int DBRM::getDBRootsForRollback(VER_t transID, vector<uint16_t>* dbroots) throw()
2277 {
2278 #ifdef BRM_INFO
2279
2280 if (fDebug)
2281 {
2282 TRACER_WRITELATER("getDBRootsForRollback");
2283 TRACER_ADDINPUT(transID);
2284 TRACER_WRITE;
2285 }
2286
2287 #endif
2288 bool locked[2] = {false, false};
2289 set<OID_t> vbOIDs;
2290 set<OID_t>::iterator vbIt;
2291 vector<LBID_t> lbidList;
2292 uint32_t i, size;
2293 uint32_t tmp32;
2294 OID_t vbOID;
2295 int err;
2296
2297 set<_entry> lbidPruner;
2298 set<_entry>::iterator it;
2299
2300 try
2301 {
2302 vbbm->lock(VBBM::READ);
2303 locked[0] = true;
2304 vss->lock(VSS::READ);
2305 locked[1] = true;
2306
2307 vss->getUncommittedLBIDs(transID, lbidList);
2308
2309 // prune the list; will leave at most 1 entry per 1024-lbid range
2310 for (i = 0, size = lbidList.size(); i < size; i++)
2311 lbidPruner.insert(_entry(lbidList[i]));
2312
2313 // get the VB oids
2314 for (it = lbidPruner.begin(); it != lbidPruner.end(); ++it)
2315 {
2316 err = vbbm->lookup(it->lbid, transID, vbOID, tmp32);
2317
2318 if (err) // this error will be caught by DML; more appropriate to handle it there
2319 continue;
2320
2321 vbOIDs.insert(vbOID);
2322 }
2323
2324 // get the dbroots
2325 for (vbIt = vbOIDs.begin(); vbIt != vbOIDs.end(); ++vbIt)
2326 {
2327 err = getDBRootOfVBOID(*vbIt);
2328
2329 if (err)
2330 {
2331 ostringstream os;
2332 os << "DBRM::getDBRootOfVBOID() returned an error looking for vbOID " << *vbIt;
2333 log(os.str());
2334 return ERR_FAILURE;
2335 }
2336
2337 dbroots->push_back((uint16_t) err);
2338 }
2339
2340 vss->release(VSS::READ);
2341 locked[1] = false;
2342 vbbm->release(VBBM::READ);
2343 locked[0] = false;
2344
2345 return ERR_OK;
2346 }
2347 catch (exception& e)
2348 {
2349 if (locked[0])
2350 vbbm->release(VBBM::READ);
2351
2352 if (locked[1])
2353 vss->release(VSS::READ);
2354
2355 return -1;
2356 }
2357
2358 }
2359
getUncommittedLBIDs(VER_t transID,vector<LBID_t> & lbidList)2360 int DBRM::getUncommittedLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
2361 {
2362 #ifdef BRM_INFO
2363
2364 if (fDebug)
2365 {
2366 TRACER_WRITELATER("getUncommittedLBIDs");
2367 TRACER_ADDINPUT(transID);
2368 TRACER_WRITE;
2369 }
2370
2371 #endif
2372 bool locked = false;
2373
2374 try
2375 {
2376 vss->lock(VSS::READ);
2377 locked = true;
2378
2379 vss->getUncommittedLBIDs(transID, lbidList);
2380
2381 vss->release(VSS::READ);
2382 locked = false;
2383 return 0;
2384 }
2385 catch (exception& e)
2386 {
2387 if (locked)
2388 vss->release(VSS::READ);
2389
2390 return -1;
2391 }
2392 }
2393
2394 // @bug 1509. New function that returns one LBID per extent touched as part of the transaction. Used to get a list
2395 // of blocks to use for updating casual partitioning when the transaction is committed.
getUncommittedExtentLBIDs(VER_t transID,vector<LBID_t> & lbidList)2396 int DBRM::getUncommittedExtentLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
2397 {
2398 #ifdef BRM_INFO
2399
2400 if (fDebug)
2401 {
2402 TRACER_WRITELATER("getUncommittedExtentLBIDs");
2403 TRACER_ADDINPUT(transID);
2404 TRACER_WRITE;
2405 }
2406
2407 #endif
2408 bool locked = false;
2409 vector<LBID_t>::iterator lbidIt;
2410 typedef pair<int64_t, int64_t> range_t;
2411 range_t range;
2412 vector<range_t> ranges;
2413 vector<range_t>::iterator rangeIt;
2414
2415 try
2416 {
2417 vss->lock(VSS::READ);
2418 locked = true;
2419
2420 // Get a full list of uncommitted LBIDs related to this transactin.
2421 vss->getUncommittedLBIDs(transID, lbidList);
2422
2423 vss->release(VSS::READ);
2424 locked = false;
2425
2426 if (lbidList.size() > 0)
2427 {
2428
2429 // Sort the vector.
2430 std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
2431
2432 // Get the LBID range for the first block in the list.
2433 lbidIt = lbidList.begin();
2434
2435 if (em->lookup(*lbidIt, range.first, range.second) < 0)
2436 {
2437 return -1;
2438 }
2439
2440 ranges.push_back(range);
2441
2442 // Loop through the LBIDs and add the new ranges.
2443 ++lbidIt;
2444
2445 while (lbidIt != lbidList.end())
2446 {
2447 if (*lbidIt > range.second)
2448 {
2449 if (em->lookup(*lbidIt, range.first, range.second) < 0)
2450 {
2451 return -1;
2452 }
2453
2454 ranges.push_back(range);
2455 }
2456
2457 ++lbidIt;
2458 }
2459
2460 // Reset the lbidList and return only the first LBID in each extent that was changed
2461 // in the transaction.
2462 lbidList.clear();
2463
2464 for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++)
2465 {
2466 lbidList.push_back(rangeIt->first);
2467 }
2468 }
2469
2470 return 0;
2471 }
2472 catch (exception& e)
2473 {
2474 if (locked)
2475 vss->release(VSS::READ);
2476
2477 return -1;
2478 }
2479 }
2480
2481
beginVBCopy(VER_t transID,uint16_t dbRoot,const LBIDRange_v & ranges,VBRange_v & freeList)2482 int DBRM::beginVBCopy(VER_t transID, uint16_t dbRoot, const LBIDRange_v& ranges,
2483 VBRange_v& freeList) DBRM_THROW
2484 {
2485 #ifdef BRM_INFO
2486
2487 if (fDebug)
2488 {
2489 TRACER_WRITELATER("beginVBCopy");
2490 TRACER_ADDINPUT(transID);
2491 TRACER_WRITE;
2492 }
2493
2494 #endif
2495
2496 ByteStream command, response;
2497 uint8_t err;
2498
2499 command << BEGIN_VB_COPY << (ByteStream::quadbyte) transID << dbRoot;
2500 serializeVector<LBIDRange>(command, ranges);
2501 err = send_recv(command, response);
2502
2503 if (err != ERR_OK)
2504 return err;
2505
2506 if (response.length() == 0)
2507 return ERR_NETWORK;
2508
2509 try
2510 {
2511 response >> err;
2512
2513 if (err != 0)
2514 return err;
2515
2516 deserializeVector(response, freeList);
2517 }
2518 catch (exception& e)
2519 {
2520 cerr << e.what() << endl;
2521 return ERR_NETWORK;
2522 }
2523
2524 CHECK_EMPTY(response);
2525 return 0;
2526 }
2527
endVBCopy(VER_t transID,const LBIDRange_v & ranges)2528 int DBRM::endVBCopy(VER_t transID, const LBIDRange_v& ranges)
2529 DBRM_THROW
2530 {
2531 #ifdef BRM_INFO
2532
2533 if (fDebug)
2534 {
2535 TRACER_WRITELATER("endVBCopy");
2536 TRACER_ADDINPUT(transID);
2537 TRACER_WRITE;
2538 }
2539
2540 #endif
2541
2542 ByteStream command, response;
2543 uint8_t err;
2544
2545 command << END_VB_COPY << (ByteStream::quadbyte) transID;
2546 serializeVector(command, ranges);
2547 err = send_recv(command, response);
2548
2549 if (response.length() != 1)
2550 return ERR_NETWORK;
2551
2552 response >> err;
2553 CHECK_EMPTY(response);
2554 return err;
2555 }
2556
vbCommit(VER_t transID)2557 int DBRM::vbCommit(VER_t transID) DBRM_THROW
2558 {
2559 #ifdef BRM_INFO
2560
2561 if (fDebug)
2562 {
2563 TRACER_WRITELATER("vbCommit");
2564 TRACER_ADDINPUT(transID);
2565 TRACER_WRITE;
2566 }
2567
2568 #endif
2569
2570 ByteStream command, response;
2571 uint8_t err;
2572
2573 command << VB_COMMIT << (ByteStream::quadbyte) transID;
2574 err = send_recv(command, response);
2575
2576 if (err != ERR_OK)
2577 return err;
2578
2579 if (response.length() != 1)
2580 return ERR_NETWORK;
2581
2582 response >> err;
2583 CHECK_EMPTY(response);
2584 return err;
2585 }
2586
vbRollback(VER_t transID,const LBIDRange_v & lbidList)2587 int DBRM::vbRollback(VER_t transID, const LBIDRange_v& lbidList) DBRM_THROW
2588 {
2589
2590 #ifdef BRM_INFO
2591
2592 if (fDebug)
2593 {
2594 TRACER_WRITELATER("vbRollback ");
2595 TRACER_ADDINPUT(transID);
2596 TRACER_WRITE;
2597 }
2598
2599 #endif
2600
2601 ByteStream command, response;
2602 uint8_t err;
2603
2604 command << VB_ROLLBACK1 << (ByteStream::quadbyte) transID;
2605 serializeVector(command, lbidList);
2606 err = send_recv(command, response);
2607
2608 if (err != ERR_OK)
2609 return err;
2610
2611 if (response.length() != 1)
2612 return ERR_NETWORK;
2613
2614 response >> err;
2615 CHECK_EMPTY(response);
2616 return err;
2617 }
2618
vbRollback(VER_t transID,const vector<LBID_t> & lbidList)2619 int DBRM::vbRollback(VER_t transID, const vector<LBID_t>& lbidList) DBRM_THROW
2620 {
2621 #ifdef BRM_INFO
2622
2623 if (fDebug)
2624 {
2625 TRACER_WRITELATER("vbRollback");
2626 TRACER_ADDINPUT(transID);
2627 TRACER_WRITE;
2628 }
2629
2630 #endif
2631
2632 ByteStream command, response;
2633 uint8_t err;
2634
2635 command << VB_ROLLBACK2 << (ByteStream::quadbyte) transID;
2636 serializeVector(command, lbidList);
2637 err = send_recv(command, response);
2638
2639 if (err != ERR_OK)
2640 return err;
2641
2642 if (response.length() != 1)
2643 return ERR_NETWORK;
2644
2645 response >> err;
2646 CHECK_EMPTY(response);
2647 return err;
2648 }
2649
halt()2650 int DBRM::halt() DBRM_THROW
2651 {
2652 #ifdef BRM_INFO
2653
2654 if (fDebug) TRACER_WRITENOW("halt");
2655
2656 #endif
2657 ByteStream command, response;
2658 uint8_t err;
2659
2660 command << HALT;
2661 err = send_recv(command, response);
2662
2663 if (err != ERR_OK)
2664 return err;
2665
2666 if (response.length() != 1)
2667 return ERR_NETWORK;
2668
2669 response >> err;
2670 CHECK_EMPTY(response);
2671 return err;
2672 }
2673
resume()2674 int DBRM::resume() DBRM_THROW
2675 {
2676 #ifdef BRM_INFO
2677
2678 if (fDebug) TRACER_WRITENOW("resume");
2679
2680 #endif
2681 ByteStream command, response;
2682 uint8_t err;
2683
2684 command << RESUME;
2685 err = send_recv(command, response);
2686
2687 if (err != ERR_OK)
2688 return err;
2689
2690 if (response.length() != 1)
2691 return ERR_NETWORK;
2692
2693 response >> err;
2694 CHECK_EMPTY(response);
2695 return err;
2696 }
2697
forceReload()2698 int DBRM::forceReload() DBRM_THROW
2699 {
2700 #ifdef BRM_INFO
2701
2702 if (fDebug) TRACER_WRITENOW("forceReload");
2703
2704 #endif
2705 ByteStream command, response;
2706 uint8_t err;
2707
2708 command << RELOAD;
2709 err = send_recv(command, response);
2710
2711 if (err != ERR_OK)
2712 return err;
2713
2714 if (response.length() != 1)
2715 return ERR_NETWORK;
2716
2717 response >> err;
2718 CHECK_EMPTY(response);
2719 return err;
2720 }
2721
setReadOnly(bool b)2722 int DBRM::setReadOnly(bool b) DBRM_THROW
2723 {
2724 #ifdef BRM_INFO
2725
2726 if (fDebug)
2727 {
2728 TRACER_WRITELATER("setReadOnly");
2729 TRACER_ADDBOOLINPUT(b);
2730 TRACER_WRITE;
2731 }
2732
2733 #endif
2734
2735 ByteStream command, response;
2736 uint8_t err;
2737
2738 command << (b ? SETREADONLY : SETREADWRITE);
2739 err = send_recv(command, response);
2740
2741 if (err != ERR_OK)
2742 return err;
2743
2744 if (response.length() != 1)
2745 return ERR_NETWORK;
2746
2747 response >> err;
2748 CHECK_EMPTY(response);
2749 return err;
2750 }
2751
isReadWrite()2752 int DBRM::isReadWrite() throw()
2753 {
2754 #ifdef BRM_INFO
2755
2756 if (fDebug) TRACER_WRITENOW("isReadWrite");
2757
2758 #endif
2759 ByteStream command, response;
2760 uint8_t err;
2761
2762 command << GETREADONLY;
2763 err = send_recv(command, response);
2764
2765 if (err != ERR_OK)
2766 return err;
2767
2768 if (response.length() != 1)
2769 return ERR_NETWORK;
2770
2771 response >> err;
2772 //CHECK_EMPTY(response);
2773 return (err == 0 ? ERR_OK : ERR_READONLY);
2774 }
2775
dmlLockLBIDRanges(const vector<LBIDRange> & ranges,int txnID)2776 int DBRM::dmlLockLBIDRanges(const vector<LBIDRange>& ranges, int txnID)
2777 {
2778 #ifdef BRM_INFO
2779
2780 if (fDebug) TRACER_WRITENOW("clear");
2781
2782 #endif
2783 ByteStream command, response;
2784 uint8_t err;
2785
2786 command << LOCK_LBID_RANGES;
2787 serializeVector<LBIDRange>(command, ranges);
2788 command << (uint32_t) txnID;
2789 err = send_recv(command, response);
2790
2791 if (err != ERR_OK)
2792 return err;
2793
2794 if (response.length() != 1)
2795 return ERR_NETWORK;
2796
2797 response >> err;
2798 CHECK_EMPTY(response);
2799 return err;
2800 }
2801
dmlReleaseLBIDRanges(const vector<LBIDRange> & ranges)2802 int DBRM::dmlReleaseLBIDRanges(const vector<LBIDRange>& ranges)
2803 {
2804 #ifdef BRM_INFO
2805
2806 if (fDebug) TRACER_WRITENOW("clear");
2807
2808 #endif
2809 ByteStream command, response;
2810 uint8_t err;
2811
2812 command << RELEASE_LBID_RANGES;
2813 serializeVector<LBIDRange>(command, ranges);
2814 err = send_recv(command, response);
2815
2816 if (err != ERR_OK)
2817 return err;
2818
2819 if (response.length() != 1)
2820 return ERR_NETWORK;
2821
2822 response >> err;
2823 CHECK_EMPTY(response);
2824 return err;
2825 }
2826
clear()2827 int DBRM::clear() DBRM_THROW
2828 {
2829 ByteStream command, response;
2830 uint8_t err;
2831
2832 command << BRM_CLEAR;
2833 err = send_recv(command, response);
2834
2835 if (err != ERR_OK)
2836 return err;
2837
2838 if (response.length() != 1)
2839 return ERR_NETWORK;
2840
2841 response >> err;
2842 CHECK_EMPTY(response);
2843 return err;
2844 }
2845
checkConsistency()2846 int DBRM::checkConsistency() throw()
2847 {
2848 #ifdef BRM_INFO
2849
2850 if (fDebug) TRACER_WRITENOW("checkConsistency");
2851
2852 #endif
2853 bool locked[2] = {false, false};
2854
2855 try
2856 {
2857 em->checkConsistency();
2858 }
2859 catch (exception& e)
2860 {
2861 cerr << e.what() << endl;
2862 return -1;
2863 }
2864
2865 try
2866 {
2867 vbbm->lock(VBBM::READ);
2868 locked[0] = true;
2869 vss->lock(VSS::READ);
2870 locked[1] = true;
2871 vss->checkConsistency(*vbbm, *em);
2872 vss->release(VSS::READ);
2873 locked[1] = false;
2874 vbbm->release(VBBM::READ);
2875 locked[0] = false;
2876 }
2877 catch (exception& e)
2878 {
2879 cerr << e.what() << endl;
2880
2881 if (locked[1])
2882 vss->release(VSS::READ);
2883
2884 if (locked[0])
2885 vbbm->release(VBBM::READ);
2886
2887 return -1;
2888 }
2889
2890 try
2891 {
2892 vbbm->lock(VBBM::READ);
2893 vbbm->checkConsistency();
2894 vbbm->release(VBBM::READ);
2895 }
2896 catch (exception& e)
2897 {
2898 cerr << e.what() << endl;
2899 vbbm->release(VBBM::READ);
2900 return -1;
2901 }
2902
2903 return 0;
2904 }
2905
getCurrentTxnIDs(set<VER_t> & txnList)2906 int DBRM::getCurrentTxnIDs(set<VER_t>& txnList) throw()
2907 {
2908 #ifdef BRM_INFO
2909
2910 if (fDebug) TRACER_WRITENOW("getCurrentTxnIDs");
2911
2912 #endif
2913 bool locked[2] = { false, false };
2914
2915 try
2916 {
2917 txnList.clear();
2918 vss->lock(VSS::READ);
2919 locked[0] = true;
2920 copylocks->lock(CopyLocks::READ);
2921 locked[1] = true;
2922 copylocks->getCurrentTxnIDs(txnList);
2923 vss->getCurrentTxnIDs(txnList);
2924 copylocks->release(CopyLocks::READ);
2925 locked[1] = false;
2926 vss->release(VSS::READ);
2927 locked[0] = false;
2928 }
2929 catch (exception& e)
2930 {
2931 if (locked[1])
2932 copylocks->release(CopyLocks::READ);
2933
2934 if (locked[0])
2935 vss->release(VSS::READ);
2936
2937 cerr << e.what() << endl;
2938 return -1;
2939 }
2940
2941 return 0;
2942 }
2943
verID()2944 const QueryContext DBRM::verID()
2945 {
2946 #ifdef BRM_INFO
2947
2948 if (fDebug) TRACER_WRITENOW("verID");
2949
2950 #endif
2951 ByteStream command, response;
2952 uint8_t err;
2953 QueryContext ret;
2954
2955 command << VER_ID;
2956 err = send_recv(command, response);
2957
2958 if (err != ERR_OK)
2959 {
2960 cerr << "DBRM: SessionManager::verID(): network error" << endl;
2961 ret.currentScn = -1;
2962 return ret;
2963 }
2964
2965 try
2966 {
2967 response >> err;
2968 response >> ret;
2969 CHECK_EMPTY(response);
2970 }
2971 catch (exception& e)
2972 {
2973 cerr << "DBRM: SessionManager::verID(): bad response" << endl;
2974 log("DBRM: SessionManager::verID(): bad response", logging::LOG_TYPE_WARNING);
2975 ret.currentScn = -1;
2976 }
2977
2978 return ret;
2979 }
2980
sysCatVerID()2981 const QueryContext DBRM::sysCatVerID()
2982 {
2983 #ifdef BRM_INFO
2984
2985 if (fDebug) TRACER_WRITENOW("sysCatVerID");
2986
2987 #endif
2988 ByteStream command, response;
2989 uint8_t err;
2990 QueryContext ret;
2991
2992 command << SYSCAT_VER_ID;
2993 err = send_recv(command, response);
2994
2995 if (err != ERR_OK)
2996 {
2997 cerr << "DBRM: SessionManager::sysCatVerID(): network error" << endl;
2998 ret.currentScn = -1;
2999 return ret;
3000 }
3001
3002 try
3003 {
3004 response >> err;
3005 response >> ret;
3006 CHECK_EMPTY(response);
3007 }
3008 catch (exception& e)
3009 {
3010 cerr << "DBRM: SessionManager::sysCatVerID(): bad response" << endl;
3011 log("DBRM: SessionManager::sysCatVerID(): bad response", logging::LOG_TYPE_WARNING);
3012 ret.currentScn = -1;
3013 }
3014
3015 return ret;
3016 }
3017
3018
newTxnID(const SessionManagerServer::SID session,bool block,bool isDDL)3019 const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block,
3020 bool isDDL)
3021 {
3022 #ifdef BRM_INFO
3023
3024 if (fDebug)
3025 {
3026 TRACER_WRITELATER("newTxnID");
3027 TRACER_ADDINPUT(session);
3028 TRACER_ADDBOOLINPUT(block);
3029 TRACER_WRITE;
3030 }
3031
3032 #endif
3033
3034 ByteStream command, response;
3035 uint8_t err, tmp;
3036 uint32_t tmp32;
3037 TxnID ret;
3038
3039 command << NEW_TXN_ID << session << (uint8_t) block << (uint8_t)isDDL;
3040 err = send_recv(command, response);
3041
3042 if (err != ERR_OK)
3043 {
3044 log("DBRM: SessionManager::newTxnID(): network error");
3045 ret.valid = false;
3046 return ret;
3047 }
3048
3049 if (response.length() != 6)
3050 {
3051 log("DBRM: SessionManager::newTxnID(): bad response");
3052 ret.valid = false;
3053 return ret;
3054 }
3055
3056 response >> err;
3057 response >> tmp32;
3058 ret.id = tmp32;
3059 response >> tmp;
3060 ret.valid = (tmp == 0 ? false : true);
3061 CHECK_EMPTY(response);
3062 return ret;
3063 }
3064
committed(TxnID & txnid)3065 void DBRM::committed(TxnID& txnid)
3066 {
3067 #ifdef BRM_INFO
3068
3069 if (fDebug)
3070 {
3071 TRACER_WRITELATER("committed");
3072 TRACER_ADDINPUT(txnid);
3073 TRACER_WRITE;
3074 }
3075
3076 #endif
3077
3078 ByteStream command, response;
3079 uint8_t err;
3080
3081 command << COMMITTED << (uint32_t) txnid.id << (uint8_t) txnid.valid;
3082 err = send_recv(command, response);
3083 txnid.valid = false;
3084
3085 if (err != ERR_OK)
3086 log("DBRM: error: SessionManager::committed() failed");
3087 else if (response.length() != 1)
3088 log("DBRM: error: SessionManager::committed() failed (bad response)",
3089 logging::LOG_TYPE_ERROR);
3090
3091 response >> err;
3092
3093 if (err != ERR_OK)
3094 log("DBRM: error: SessionManager::committed() failed (valid error code)",
3095 logging::LOG_TYPE_ERROR);
3096
3097 }
3098
3099
rolledback(TxnID & txnid)3100 void DBRM::rolledback(TxnID& txnid)
3101 {
3102 #ifdef BRM_INFO
3103
3104 if (fDebug)
3105 {
3106 TRACER_WRITELATER("rolledback");
3107 TRACER_ADDINPUT(txnid);
3108 TRACER_WRITE;
3109 }
3110
3111 #endif
3112
3113 ByteStream command, response;
3114 uint8_t err, tmp;
3115
3116 command << ROLLED_BACK << (uint32_t) txnid.id << (uint8_t) txnid.valid;
3117 err = send_recv(command, response);
3118 txnid.valid = false;
3119
3120 if (err != ERR_OK)
3121 log("DBRM: error: SessionManager::rolledback() failed (network)");
3122 else if (response.length() != 1)
3123 log("DBRM: error: SessionManager::rolledback() failed (bad response)",
3124 logging::LOG_TYPE_ERROR);
3125
3126 response >> tmp;
3127
3128 if (tmp != ERR_OK)
3129 {
3130 if (getSystemReady() != 0 )
3131 log("DBRM: error: SessionManager::rolledback() failed (valid error code)",
3132 logging::LOG_TYPE_ERROR);
3133 }
3134 }
3135
getUnlockedLBIDs(BlockList_t * list)3136 int DBRM::getUnlockedLBIDs(BlockList_t* list) DBRM_THROW
3137 {
3138 bool locked = false;
3139
3140 list->clear();
3141
3142 try
3143 {
3144 vss->lock(VSS::READ);
3145 locked = true;
3146 vss->getUnlockedLBIDs(*list);
3147 vss->release(VSS::READ);
3148 locked = false;
3149 return 0;
3150 }
3151 catch (exception& e)
3152 {
3153 if (locked)
3154 vss->release(VSS::READ);
3155
3156 cerr << e.what() << endl;
3157 return -1;
3158 }
3159 }
3160
3161
getTxnID(const SessionManagerServer::SID session)3162 const TxnID DBRM::getTxnID
3163 (const SessionManagerServer::SID session)
3164 {
3165 #ifdef BRM_INFO
3166
3167 if (fDebug)
3168 {
3169 TRACER_WRITELATER("getTxnID");
3170 TRACER_ADDINPUT(session);
3171 TRACER_WRITE;
3172 }
3173
3174 #endif
3175
3176 ByteStream command, response;
3177 uint8_t err, tmp8;
3178 uint32_t tmp32;
3179 TxnID ret;
3180
3181 command << GET_TXN_ID << (uint32_t) session;
3182 err = send_recv(command, response);
3183
3184 if (err != ERR_OK)
3185 {
3186 log("DBRM: error: SessionManager::getTxnID() failed (network)", logging::LOG_TYPE_ERROR);
3187 ret.valid = false;
3188 return ret;
3189 }
3190
3191 response >> err;
3192
3193 if (err != ERR_OK)
3194 {
3195 log("DBRM: error: SessionManager::getTxnID() failed (got an error)",
3196 logging::LOG_TYPE_ERROR);
3197 ret.valid = false;
3198 return ret;
3199 }
3200
3201 response >> tmp32 >> tmp8;
3202 ret.id = tmp32;
3203 ret.valid = tmp8;
3204 return ret;
3205 }
3206
SIDTIDMap(int & len)3207 boost::shared_array<SIDTIDEntry> DBRM::SIDTIDMap(int& len)
3208 {
3209 #ifdef BRM_INFO
3210
3211 if (fDebug)
3212 {
3213 TRACER_WRITELATER("SIDTIDMap");
3214 TRACER_ADDOUTPUT(len);
3215 TRACER_WRITE;
3216 }
3217
3218 #endif
3219
3220 ByteStream command, response;
3221 uint8_t err, tmp8;
3222 uint32_t tmp32;
3223 int i;
3224 boost::shared_array<SIDTIDEntry> ret;
3225
3226 command << SID_TID_MAP;
3227 err = send_recv(command, response);
3228
3229 if (err != ERR_OK)
3230 {
3231 log("DBRM: error: SessionManager::SIDTIDEntry() failed (network)");
3232 return ret;
3233 }
3234
3235 response >> err;
3236
3237 if (err != ERR_OK)
3238 {
3239 log("DBRM: error: SessionManager::SIDTIDEntry() failed (valid error code)",
3240 logging::LOG_TYPE_ERROR);
3241 return ret;
3242 }
3243
3244 response >> tmp32;
3245 len = (int) tmp32;
3246 ret.reset(new SIDTIDEntry[len]);
3247
3248 for (i = 0; i < len; i++)
3249 {
3250 response >> tmp32 >> tmp8;
3251 ret[i].txnid.id = tmp32;
3252 ret[i].txnid.valid = (tmp8 == 0 ? false : true);
3253 response >> tmp32;
3254 ret[i].sessionid = tmp32;
3255 }
3256
3257 CHECK_EMPTY(response);
3258 return ret;
3259 }
3260
getUnique32()3261 uint32_t DBRM::getUnique32()
3262 {
3263 #ifdef BRM_INFO
3264
3265 if (fDebug) TRACER_WRITENOW("getUnique32");
3266
3267 #endif
3268
3269 ByteStream command, response;
3270 uint8_t err;
3271 uint32_t ret;
3272
3273 command << GET_UNIQUE_UINT32;
3274 err = send_recv(command, response);
3275
3276 if (err != ERR_OK)
3277 {
3278 cerr << "DBRM: getUnique32() failed (network)\n";
3279 log("DBRM: getUnique32() failed (network)", logging::LOG_TYPE_ERROR);
3280 throw runtime_error("DBRM: getUnique32() failed check the controllernode");
3281 return 0;
3282 }
3283
3284 /* Some jobsteps don't need the connection after this so close it to free up
3285 resources on the controller node */
3286 /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
3287 remove the client. Plus, it may cause weird network issue when the socket is being
3288 released and re-established very quickly*/
3289 //pthread_mutex_lock(&mutex);
3290 //delete msgClient;
3291 //msgClient = NULL;
3292 //pthread_mutex_unlock(&mutex);
3293
3294 response >> err;
3295
3296 if (err != ERR_OK)
3297 {
3298 cerr << "DBRM: getUnique32() failed (got an error)\n";
3299 log("DBRM: getUnique32() failed (got an error)",
3300 logging::LOG_TYPE_ERROR);
3301 throw runtime_error("DBRM: getUnique32() failed check the controllernode");
3302 return 0;
3303 }
3304
3305 response >> ret;
3306 // cerr << "DBRM returning " << ret << endl;
3307 return ret;
3308 }
3309
getUnique64()3310 uint64_t DBRM::getUnique64()
3311 {
3312 #ifdef BRM_INFO
3313
3314 if (fDebug) TRACER_WRITENOW("getUnique64");
3315
3316 #endif
3317
3318 ByteStream command, response;
3319 uint8_t err;
3320 uint64_t ret;
3321
3322 command << GET_UNIQUE_UINT64;
3323 err = send_recv(command, response);
3324
3325 if (err != ERR_OK)
3326 {
3327 cerr << "DBRM: getUnique64() failed (network)\n";
3328 log("DBRM: getUnique64() failed (network)", logging::LOG_TYPE_ERROR);
3329 throw runtime_error("DBRM: getUnique64() failed check the controllernode");
3330 return 0;
3331 }
3332
3333 /* Some jobsteps don't need the connection after this so close it to free up
3334 resources on the controller node */
3335 /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
3336 remove the client. Plus, it may cause weird network issue when the socket is being
3337 released and re-established very quickly*/
3338 //pthread_mutex_lock(&mutex);
3339 //delete msgClient;
3340 //msgClient = NULL;
3341 //pthread_mutex_unlock(&mutex);
3342
3343 response >> err;
3344
3345 if (err != ERR_OK)
3346 {
3347 cerr << "DBRM: getUnique64() failed (got an error)\n";
3348 log("DBRM: getUnique64() failed (got an error)",
3349 logging::LOG_TYPE_ERROR);
3350 throw runtime_error("DBRM: getUnique64() failed check the controllernode");
3351 return 0;
3352 }
3353
3354 response >> ret;
3355 // cerr << "DBRM returning " << ret << endl;
3356 return ret;
3357 }
3358
sessionmanager_reset()3359 void DBRM::sessionmanager_reset()
3360 {
3361 ByteStream command, response;
3362 command << SM_RESET;
3363 send_recv(command, response);
3364 }
3365
isEMEmpty()3366 bool DBRM::isEMEmpty() throw()
3367 {
3368 bool res = false;
3369
3370 try
3371 {
3372 res = em->empty();
3373 }
3374 catch (...)
3375 {
3376 res = false;
3377 }
3378
3379 return res;
3380 }
3381
getEMFreeListEntries()3382 vector<InlineLBIDRange> DBRM::getEMFreeListEntries() throw()
3383 {
3384 vector<InlineLBIDRange> res;
3385
3386 try
3387 {
3388 res = em->getFreeListEntries();
3389 }
3390 catch (...)
3391 {
3392 res.clear();
3393 }
3394
3395 return res;
3396 }
3397
takeSnapshot()3398 int DBRM::takeSnapshot() throw ()
3399 {
3400 return 0; // don't know why, but we're calling this all the time. Need to take most/all of those calls out, it's very wasteful.
3401
3402 ByteStream command, response;
3403 uint8_t err;
3404
3405 command << TAKE_SNAPSHOT;
3406 err = send_recv(command, response);
3407
3408 if (err != ERR_OK)
3409 return err;
3410
3411 if (response.length() == 0)
3412 return ERR_NETWORK;
3413
3414 return 0;
3415 }
3416
getSystemReady()3417 int DBRM::getSystemReady() throw()
3418 {
3419 uint32_t stateFlags;
3420
3421 if (getSystemState(stateFlags) < 0)
3422 {
3423 return -1;
3424 }
3425
3426 return (stateFlags & SessionManagerServer::SS_READY);
3427 }
3428
getSystemQueryReady()3429 int DBRM::getSystemQueryReady() throw()
3430 {
3431 uint32_t stateFlags;
3432
3433 if (getSystemState(stateFlags) < 0)
3434 {
3435 return -1;
3436 }
3437
3438 return (stateFlags & SessionManagerServer::SS_QUERY_READY);
3439 }
3440
getSystemSuspended()3441 int DBRM::getSystemSuspended() throw()
3442 {
3443 uint32_t stateFlags;
3444
3445 if (getSystemState(stateFlags) < 0)
3446 {
3447 return -1;
3448 }
3449
3450 return (stateFlags & SessionManagerServer::SS_SUSPENDED);
3451 }
3452
getSystemSuspendPending(bool & bRollback)3453 int DBRM::getSystemSuspendPending(bool& bRollback) throw()
3454 {
3455 uint32_t stateFlags;
3456
3457 if (getSystemState(stateFlags) < 0)
3458 {
3459 return -1;
3460 }
3461
3462 bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
3463
3464 return (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING);
3465 }
3466
getSystemShutdownPending(bool & bRollback,bool & bForce)3467 int DBRM::getSystemShutdownPending(bool& bRollback, bool& bForce) throw()
3468 {
3469 uint32_t stateFlags;
3470
3471 if (getSystemState(stateFlags) < 0)
3472 {
3473 return -1;
3474 }
3475
3476 bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
3477 bForce = stateFlags & SessionManagerServer::SS_FORCE;
3478
3479 return (stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING);
3480 }
3481
setSystemReady(bool bReady)3482 int DBRM::setSystemReady(bool bReady) throw()
3483 {
3484 if (bReady)
3485 {
3486 return setSystemState(SessionManagerServer::SS_READY);
3487 }
3488 else
3489 {
3490 return clearSystemState(SessionManagerServer::SS_READY);
3491 }
3492 }
3493
setSystemQueryReady(bool bReady)3494 int DBRM::setSystemQueryReady(bool bReady) throw()
3495 {
3496 if (bReady)
3497 {
3498 return setSystemState(SessionManagerServer::SS_QUERY_READY);
3499 }
3500 else
3501 {
3502 return clearSystemState(SessionManagerServer::SS_QUERY_READY);
3503 }
3504 }
3505
setSystemSuspended(bool bSuspended)3506 int DBRM::setSystemSuspended(bool bSuspended) throw()
3507 {
3508 uint32_t stateFlags = 0;
3509
3510 if (bSuspended)
3511 {
3512 if (setSystemState(SessionManagerServer::SS_SUSPENDED) < 0)
3513 {
3514 return -1;
3515 }
3516 }
3517 else
3518 {
3519 stateFlags = SessionManagerServer::SS_SUSPENDED;
3520 }
3521
3522 // In either case, we need to clear the pending and rollback flags
3523 stateFlags |= SessionManagerServer::SS_SUSPEND_PENDING;
3524 stateFlags |= SessionManagerServer::SS_ROLLBACK;
3525 return clearSystemState(stateFlags);
3526 }
3527
setSystemSuspendPending(bool bPending,bool bRollback)3528 int DBRM::setSystemSuspendPending(bool bPending, bool bRollback) throw()
3529 {
3530 uint32_t stateFlags = SessionManagerServer::SS_SUSPEND_PENDING;
3531
3532 if (bPending)
3533 {
3534 if (bRollback)
3535 {
3536 stateFlags |= SessionManagerServer::SS_ROLLBACK;
3537 }
3538
3539 return setSystemState(stateFlags);
3540 }
3541 else
3542 {
3543 stateFlags |= SessionManagerServer::SS_ROLLBACK;
3544 return clearSystemState(stateFlags);
3545 }
3546 }
3547
setSystemShutdownPending(bool bPending,bool bRollback,bool bForce)3548 int DBRM::setSystemShutdownPending(bool bPending, bool bRollback, bool bForce) throw()
3549 {
3550 int rtn = 0;
3551 uint32_t stateFlags = SessionManagerServer::SS_SHUTDOWN_PENDING;
3552
3553 if (bPending)
3554 {
3555 if (bForce)
3556 {
3557 stateFlags |= SessionManagerServer::SS_FORCE;
3558 }
3559 else if (bRollback)
3560 {
3561 stateFlags |= SessionManagerServer::SS_ROLLBACK;
3562 }
3563
3564 rtn = setSystemState(stateFlags);
3565 }
3566 else
3567 {
3568 stateFlags |= SessionManagerServer::SS_ROLLBACK;
3569 stateFlags |= SessionManagerServer::SS_FORCE;
3570 rtn = clearSystemState(stateFlags); // Clears the flags that are turned on in stateFlags
3571 }
3572
3573 return rtn;
3574 }
3575
3576 /* Return the shm stateflags
3577 */
getSystemState(uint32_t & stateFlags)3578 int DBRM::getSystemState(uint32_t& stateFlags) throw()
3579 {
3580 try
3581 {
3582 #ifdef BRM_INFO
3583
3584 if (fDebug)
3585 {
3586 TRACER_WRITELATER("getSystemState");
3587 TRACER_WRITE;
3588 }
3589
3590 #endif
3591 ByteStream command, response;
3592 uint8_t err;
3593
3594 command << GET_SYSTEM_STATE;
3595 err = send_recv(command, response);
3596
3597 if (err != ERR_OK)
3598 {
3599 std::ostringstream oss;
3600 oss << "DBRM: error: SessionManager::getSystemState() failed (network)";
3601 log(oss.str(), logging::LOG_TYPE_ERROR);
3602 return -1;
3603 }
3604
3605 response >> err;
3606
3607 if (err != ERR_OK)
3608 {
3609 std::ostringstream oss;
3610 oss << "DBRM: error: SessionManager::getSystemState() failed (error " << err << ")";
3611 log(oss.str(), logging::LOG_TYPE_ERROR);
3612 return -1;
3613 }
3614
3615 response >> stateFlags;
3616 return 1;
3617 }
3618 catch (...)
3619 {
3620 }
3621
3622 return -1;
3623 }
3624
3625 /* Set the shm stateflags that are set in the parameter
3626 */
setSystemState(uint32_t stateFlags)3627 int DBRM::setSystemState(uint32_t stateFlags) throw()
3628 {
3629 try
3630 {
3631 #ifdef BRM_INFO
3632
3633 if (fDebug)
3634 {
3635 TRACER_WRITELATER("setSystemState");
3636 TRACER_WRITE;
3637 }
3638
3639 #endif
3640
3641 ByteStream command, response;
3642 uint8_t err;
3643
3644 command << SET_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
3645 err = send_recv(command, response);
3646
3647 if (err != ERR_OK)
3648 {
3649 std::ostringstream oss;
3650 oss << "DBRM: error: SessionManager::setSystemState() failed (network)";
3651 log(oss.str(), logging::LOG_TYPE_ERROR);
3652 stateFlags = 0;
3653 return -1;
3654 }
3655
3656 response >> err;
3657
3658 if (err != ERR_OK)
3659 {
3660 std::ostringstream oss;
3661 oss << "DBRM: error: SessionManager::setSystemState() failed (got an error)";
3662 log(oss.str(), logging::LOG_TYPE_ERROR);
3663 stateFlags = 0;
3664 return -1;
3665 }
3666
3667 return 1;
3668 }
3669 catch (...)
3670 {
3671 }
3672
3673 stateFlags = 0;
3674 return -1;
3675 }
3676
3677 /* Clear the shm stateflags that are set in the parameter
3678 */
clearSystemState(uint32_t stateFlags)3679 int DBRM::clearSystemState(uint32_t stateFlags) throw()
3680 {
3681 try
3682 {
3683 #ifdef BRM_INFO
3684
3685 if (fDebug)
3686 {
3687 TRACER_WRITELATER("clearSystemState");
3688 TRACER_WRITE;
3689 }
3690
3691 #endif
3692
3693 ByteStream command, response;
3694 uint8_t err;
3695
3696 command << CLEAR_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
3697 err = send_recv(command, response);
3698
3699 if (err != ERR_OK)
3700 {
3701 std::ostringstream oss;
3702 oss << "DBRM: error: SessionManager::clearSystemState() failed (network)";
3703 log(oss.str(), logging::LOG_TYPE_ERROR);
3704 return -1;
3705 }
3706
3707 response >> err;
3708
3709 if (err != ERR_OK)
3710 {
3711 std::ostringstream oss;
3712 oss << "DBRM: error: SessionManager::clearSystemState() failed (got an error)";
3713 log(oss.str(), logging::LOG_TYPE_ERROR);
3714 return -1;
3715 }
3716
3717 return 1;
3718 }
3719 catch (...)
3720 {
3721 }
3722
3723 return -1;
3724 }
3725
3726 /* Ping the controller node. Don't print anything.
3727 */
isDBRMReady()3728 bool DBRM::isDBRMReady() throw()
3729 {
3730 #ifdef BRM_INFO
3731
3732 if (fDebug) TRACER_WRITENOW("isDBRMReady");
3733
3734 #endif
3735 boost::mutex::scoped_lock scoped(mutex);
3736
3737 try
3738 {
3739 for (int attempt = 0; attempt < 2; ++attempt)
3740 {
3741 try
3742 {
3743 if (msgClient == NULL)
3744 {
3745 msgClient = MessageQueueClientPool::getInstance(masterName);
3746 }
3747
3748 if (msgClient->connect())
3749 {
3750 return true;
3751 }
3752 }
3753 catch (...)
3754 {
3755 }
3756
3757 MessageQueueClientPool::releaseInstance(msgClient);
3758 msgClient = NULL;
3759 sleep(1);
3760 }
3761 }
3762 catch (...)
3763 {
3764 }
3765
3766 return false;
3767 }
3768
3769 /* This waits for the lock up to 30 sec. After 30 sec, the assumption is something
3770 * bad happened, and this will fix the lock state so that primproc can keep
3771 * running. These prevent a non-critical problem anyway.
3772 */
lockLBIDRange(LBID_t start,uint32_t count)3773 void DBRM::lockLBIDRange(LBID_t start, uint32_t count)
3774 {
3775 bool locked = false, lockedRange = false;
3776 LBIDRange range;
3777 const uint32_t waitInterval = 50000; // in usec
3778 const uint32_t maxRetries = 30000000 / waitInterval; // 30 secs
3779 uint32_t retries = 0;
3780
3781 range.start = start;
3782 range.size = count;
3783
3784 try
3785 {
3786 copylocks->lock(CopyLocks::WRITE);
3787 locked = true;
3788
3789 while (copylocks->isLocked(range) && retries < maxRetries)
3790 {
3791 copylocks->release(CopyLocks::WRITE);
3792 locked = false;
3793 usleep(waitInterval);
3794 retries++;
3795 copylocks->lock(CopyLocks::WRITE);
3796 locked = true;
3797 }
3798
3799 if (retries >= maxRetries)
3800 copylocks->forceRelease(range);
3801
3802 copylocks->lockRange(range, -1);
3803 lockedRange = true;
3804 copylocks->confirmChanges();
3805 copylocks->release(CopyLocks::WRITE);
3806 locked = false;
3807 }
3808 catch (...)
3809 {
3810 if (lockedRange)
3811 copylocks->releaseRange(range);
3812
3813 if (locked)
3814 {
3815 copylocks->confirmChanges();
3816 copylocks->release(CopyLocks::WRITE);
3817 }
3818
3819 throw;
3820 }
3821 }
3822
releaseLBIDRange(LBID_t start,uint32_t count)3823 void DBRM::releaseLBIDRange(LBID_t start, uint32_t count)
3824 {
3825 bool locked = false;
3826 LBIDRange range;
3827 range.start = start;
3828 range.size = count;
3829
3830 try
3831 {
3832 copylocks->lock(CopyLocks::WRITE);
3833 locked = true;
3834 copylocks->releaseRange(range);
3835 copylocks->confirmChanges();
3836 copylocks->release(CopyLocks::WRITE);
3837 locked = false;
3838 }
3839 catch (...)
3840 {
3841 if (locked)
3842 {
3843 copylocks->confirmChanges();
3844 copylocks->release(CopyLocks::WRITE);
3845 }
3846
3847 throw;
3848 }
3849 }
3850
3851 /* OID Manager section */
3852
allocOIDs(int num)3853 int DBRM::allocOIDs(int num)
3854 {
3855 #ifdef BRM_INFO
3856
3857 if (fDebug) TRACER_WRITENOW("allocOID");
3858
3859 #endif
3860 ByteStream command, response;
3861 uint8_t err;
3862 uint32_t ret;
3863
3864 command << ALLOC_OIDS;
3865 command << (uint32_t) num;
3866 err = send_recv(command, response);
3867
3868 if (err != ERR_OK)
3869 {
3870 cerr << "DBRM: OIDManager::allocOIDs(): network error" << endl;
3871 log("DBRM: OIDManager::allocOIDs(): network error", logging::LOG_TYPE_CRITICAL);
3872 return -1;
3873 }
3874
3875 try
3876 {
3877 response >> err;
3878
3879 if (err != ERR_OK)
3880 return -1;
3881
3882 response >> ret;
3883 CHECK_EMPTY(response);
3884 return (int) ret;
3885 }
3886 catch (...)
3887 {
3888 log("DBRM: OIDManager::allocOIDs(): bad response", logging::LOG_TYPE_CRITICAL);
3889 return -1;
3890 }
3891 }
3892
returnOIDs(int start,int end)3893 void DBRM::returnOIDs(int start, int end)
3894 {
3895 ByteStream command, response;
3896 uint8_t err;
3897
3898 command << RETURN_OIDS;
3899 command << (uint32_t) start;
3900 command << (uint32_t) end;
3901 err = send_recv(command, response);
3902
3903 if (err == ERR_NETWORK)
3904 {
3905 cerr << "DBRM: OIDManager::returnOIDs(): network error" << endl;
3906 log("DBRM: OIDManager::returnOIDs(): network error", logging::LOG_TYPE_CRITICAL);
3907 throw runtime_error("DBRM: OIDManager::returnOIDs(): network error");
3908 }
3909
3910 try
3911 {
3912 response >> err;
3913 CHECK_EMPTY(response);
3914 }
3915 catch (...)
3916 {
3917 err = ERR_FAILURE;
3918 }
3919
3920 if (err != ERR_OK)
3921 {
3922 log("DBRM: OIDManager::returnOIDs() failed", logging::LOG_TYPE_CRITICAL);
3923 throw runtime_error("DBRM: OIDManager::returnOIDs() failed");
3924 }
3925 }
3926
oidm_size()3927 int DBRM::oidm_size()
3928 {
3929 ByteStream command, response;
3930 uint8_t err;
3931 uint32_t ret;
3932
3933 command << OIDM_SIZE;
3934 err = send_recv(command, response);
3935
3936 if (err != ERR_OK)
3937 {
3938 cerr << "DBRM: OIDManager::size(): network error" << endl;
3939 log("DBRM: OIDManager::size(): network error", logging::LOG_TYPE_CRITICAL);
3940 throw runtime_error("DBRM: OIDManager::size(): network error");
3941 }
3942
3943 try
3944 {
3945 response >> err;
3946
3947 if (err == ERR_OK)
3948 {
3949 response >> ret;
3950 CHECK_EMPTY(response);
3951 return ret;
3952 }
3953
3954 CHECK_EMPTY(response);
3955 return -1;
3956 }
3957 catch (...)
3958 {
3959 log("DBRM: OIDManager::size(): bad response", logging::LOG_TYPE_CRITICAL);
3960 throw runtime_error("DBRM: OIDManager::size(): bad response");
3961 }
3962 }
3963
allocVBOID(uint32_t dbroot)3964 int DBRM::allocVBOID(uint32_t dbroot)
3965 {
3966 ByteStream command, response;
3967 uint8_t err;
3968 uint32_t ret;
3969
3970 command << ALLOC_VBOID << (uint32_t) dbroot;
3971 err = send_recv(command, response);
3972
3973 if (err != ERR_OK)
3974 {
3975 cerr << "DBRM: OIDManager::allocVBOID(): network error" << endl;
3976 log("DBRM: OIDManager::allocVBOID(): network error", logging::LOG_TYPE_CRITICAL);
3977 return -1;
3978 }
3979
3980 try
3981 {
3982 response >> err;
3983
3984 if (err == ERR_OK)
3985 {
3986 response >> ret;
3987 CHECK_EMPTY(response);
3988 return ret;
3989 }
3990
3991 CHECK_EMPTY(response);
3992 return -1;
3993 }
3994 catch (...)
3995 {
3996 log("DBRM: OIDManager::allocVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
3997 return -1;
3998 }
3999 }
4000
getDBRootOfVBOID(uint32_t vbOID)4001 int DBRM::getDBRootOfVBOID(uint32_t vbOID)
4002 {
4003 ByteStream command, response;
4004 uint8_t err;
4005 uint32_t ret;
4006
4007 command << GETDBROOTOFVBOID << (uint32_t) vbOID;
4008 err = send_recv(command, response);
4009
4010 if (err != ERR_OK)
4011 {
4012 cerr << "DBRM: OIDManager::getDBRootOfVBOID(): network error" << endl;
4013 log("DBRM: OIDManager::getDBRootOfVBOID(): network error", logging::LOG_TYPE_CRITICAL);
4014 return -1;
4015 }
4016
4017 try
4018 {
4019 response >> err;
4020
4021 if (err == ERR_OK)
4022 {
4023 response >> ret;
4024 CHECK_EMPTY(response);
4025 return (int) ret;
4026 }
4027
4028 CHECK_EMPTY(response);
4029 return -1;
4030 }
4031 catch (...)
4032 {
4033 log("DBRM: OIDManager::getDBRootOfVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
4034 return -1;
4035 }
4036 }
4037
getVBOIDToDBRootMap()4038 vector<uint16_t> DBRM::getVBOIDToDBRootMap()
4039 {
4040 ByteStream command, response;
4041 uint8_t err;
4042 vector<uint16_t> ret;
4043
4044 command << GETVBOIDTODBROOTMAP;
4045 err = send_recv(command, response);
4046
4047 if (err != ERR_OK)
4048 {
4049 log("DBRM: OIDManager::getVBOIDToDBRootMap(): network error", logging::LOG_TYPE_CRITICAL);
4050 throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): network error");
4051 }
4052
4053 try
4054 {
4055 response >> err;
4056
4057 if (err != ERR_OK)
4058 {
4059 log("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error", logging::LOG_TYPE_CRITICAL);
4060 throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error");
4061 }
4062
4063 deserializeInlineVector<uint16_t>(response, ret);
4064 CHECK_EMPTY(response);
4065 return ret;
4066 }
4067 catch (...)
4068 {
4069 log("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response", logging::LOG_TYPE_CRITICAL);
4070 throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response");
4071 }
4072 }
4073
getTableLock(const vector<uint32_t> & pmList,uint32_t tableOID,string * ownerName,uint32_t * ownerPID,int32_t * ownerSessionID,int32_t * ownerTxnID,LockState state)4074 uint64_t DBRM::getTableLock(const vector<uint32_t>& pmList, uint32_t tableOID,
4075 string* ownerName, uint32_t* ownerPID, int32_t* ownerSessionID, int32_t* ownerTxnID, LockState state)
4076 {
4077 ByteStream command, response;
4078 uint8_t err;
4079 uint64_t ret;
4080 TableLockInfo tli;
4081 uint32_t tmp32;
4082 vector<uint32_t> dbRootsList;
4083 OamCache* oamcache = OamCache::makeOamCache();
4084 OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap();
4085 int moduleId = 0;
4086
4087 for (uint32_t i = 0; i < pmList.size(); i++)
4088 {
4089 moduleId = pmList[i];
4090 vector<int> dbroots = (*pmDbroots)[moduleId];
4091
4092 for (uint32_t j = 0; j < dbroots.size(); j++)
4093 dbRootsList.push_back((uint32_t)dbroots[j]);
4094 }
4095
4096 tli.id = 0;
4097 tli.ownerName = *ownerName;
4098 tli.ownerPID = *ownerPID;
4099 tli.ownerSessionID = *ownerSessionID;
4100 tli.ownerTxnID = *ownerTxnID;
4101 tli.dbrootList = dbRootsList;
4102 tli.state = state;
4103 tli.tableOID = tableOID;
4104 tli.creationTime = time(NULL);
4105
4106 command << GET_TABLE_LOCK << tli;
4107 err = send_recv(command, response);
4108
4109 if (err != ERR_OK)
4110 {
4111 log("DBRM: getTableLock(): network error", logging::LOG_TYPE_CRITICAL);
4112 throw runtime_error("DBRM: getTableLock(): network error");
4113 }
4114
4115 response >> err;
4116
4117 /* TODO: this means a save failure, need a specific exception type */
4118 if (err != ERR_OK)
4119 throw runtime_error("Table lock save file failure");
4120
4121 response >> ret;
4122
4123 if (ret == 0)
4124 {
4125 response >> *ownerPID;
4126 response >> *ownerName;
4127 response >> tmp32;
4128 *ownerSessionID = tmp32;
4129 response >> tmp32;
4130 *ownerTxnID = tmp32;
4131 }
4132
4133 idbassert(response.length() == 0);
4134 return ret;
4135 }
4136
releaseTableLock(uint64_t id)4137 bool DBRM::releaseTableLock(uint64_t id)
4138 {
4139 ByteStream command, response;
4140 uint8_t err;
4141
4142 command << RELEASE_TABLE_LOCK << id;
4143 err = send_recv(command, response);
4144
4145 if (err != ERR_OK)
4146 {
4147 log("DBRM: releaseTableLock(): network error", logging::LOG_TYPE_CRITICAL);
4148 throw runtime_error("DBRM: releaseTableLock(): network error");
4149 }
4150
4151 response >> err;
4152
4153 /* TODO: this means a save failure, need a specific exception type */
4154 if (err != ERR_OK)
4155 throw runtime_error("Table lock save file failure");
4156
4157 response >> err;
4158 idbassert(response.length() == 0);
4159
4160 return (bool) err;
4161 }
4162
changeState(uint64_t id,LockState state)4163 bool DBRM::changeState(uint64_t id, LockState state)
4164 {
4165 ByteStream command, response;
4166 uint8_t err;
4167
4168 command << CHANGE_TABLE_LOCK_STATE << id << (uint32_t) state;
4169 err = send_recv(command, response);
4170
4171 if (err != ERR_OK)
4172 {
4173 log("DBRM: changeState(): network error", logging::LOG_TYPE_CRITICAL);
4174 throw runtime_error("DBRM: changeState(): network error");
4175 }
4176
4177 response >> err;
4178
4179 /* TODO: this means a save failure, need a specific exception type */
4180 if (err != ERR_OK)
4181 throw runtime_error("Table lock save file failure");
4182
4183 response >> err;
4184 idbassert(response.length() == 0);
4185
4186 return (bool) err;
4187 }
4188
changeOwner(uint64_t id,const string & ownerName,uint32_t ownerPID,int32_t ownerSessionID,int32_t ownerTxnID)4189 bool DBRM::changeOwner(uint64_t id, const string& ownerName, uint32_t ownerPID, int32_t ownerSessionID,
4190 int32_t ownerTxnID)
4191 {
4192 ByteStream command, response;
4193 uint8_t err;
4194
4195 command << CHANGE_TABLE_LOCK_OWNER << id << ownerName << ownerPID <<
4196 (uint32_t) ownerSessionID << (uint32_t) ownerTxnID;
4197 err = send_recv(command, response);
4198
4199 if (err != ERR_OK)
4200 {
4201 log("DBRM: changeOwner(): network error", logging::LOG_TYPE_CRITICAL);
4202 throw runtime_error("DBRM: changeOwner(): network error");
4203 }
4204
4205 response >> err;
4206
4207 /* TODO: this means a save failure, need a specific exception type */
4208 if (err != ERR_OK)
4209 throw runtime_error("Table lock save file failure");
4210
4211 response >> err;
4212 idbassert(response.length() == 0);
4213 return (bool) err;
4214 }
4215
checkOwner(uint64_t id)4216 bool DBRM::checkOwner(uint64_t id)
4217 {
4218 ByteStream command, response;
4219 uint8_t err;
4220
4221 command << OWNER_CHECK << id;
4222 err = send_recv(command, response);
4223
4224 if (err != ERR_OK)
4225 {
4226 log("DBRM: ownerCheck(): network error", logging::LOG_TYPE_CRITICAL);
4227 throw runtime_error("DBRM: ownerCheck(): network error");
4228 }
4229
4230 response >> err;
4231
4232 /* TODO: this means a save failure, need a specific exception type */
4233 if (err != ERR_OK)
4234 throw runtime_error("Table lock save file failure");
4235
4236 response >> err;
4237 idbassert(response.length() == 0);
4238 return (bool) err; // Return true means the owner is valid
4239 }
4240
getAllTableLocks()4241 vector<TableLockInfo> DBRM::getAllTableLocks()
4242 {
4243 ByteStream command, response;
4244 uint8_t err;
4245 vector<TableLockInfo> ret;
4246
4247 command << GET_ALL_TABLE_LOCKS;
4248 err = send_recv(command, response);
4249
4250 if (err != ERR_OK)
4251 {
4252 log("DBRM: getAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
4253 throw runtime_error("DBRM: getAllTableLocks(): network error");
4254 }
4255
4256 response >> err;
4257
4258 if (err != ERR_OK)
4259 {
4260 log("DBRM: getAllTableLocks(): processing error", logging::LOG_TYPE_CRITICAL);
4261 throw runtime_error("DBRM: getAllTableLocks(): processing error");
4262 }
4263
4264 deserializeVector<TableLockInfo>(response, ret);
4265 idbassert(response.length() == 0);
4266 return ret;
4267 }
4268
releaseAllTableLocks()4269 void DBRM::releaseAllTableLocks()
4270 {
4271 ByteStream command, response;
4272 uint8_t err;
4273
4274 command << RELEASE_ALL_TABLE_LOCKS;
4275 err = send_recv(command, response);
4276
4277 if (err != ERR_OK)
4278 {
4279 log("DBRM: releaseAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
4280 throw runtime_error("DBRM: releaseAllTableLocks(): network error");
4281 }
4282
4283 response >> err;
4284 idbassert(response.length() == 0);
4285
4286 if (err != ERR_OK)
4287 throw runtime_error("DBRM: releaseAllTableLocks(): processing error");
4288 }
4289
getTableLockInfo(uint64_t id,TableLockInfo * tli)4290 bool DBRM::getTableLockInfo(uint64_t id, TableLockInfo* tli)
4291 {
4292 ByteStream command, response;
4293 uint8_t err;
4294
4295 command << GET_TABLE_LOCK_INFO << id;
4296 err = send_recv(command, response);
4297
4298 if (err != ERR_OK)
4299 {
4300 log("DBRM: getTableLockInfo(): network error", logging::LOG_TYPE_CRITICAL);
4301 throw runtime_error("DBRM: getTableLockInfo(): network error");
4302 }
4303
4304 response >> err;
4305
4306 if (err != ERR_OK)
4307 throw runtime_error("DBRM: getTableLockInfo() processing error");
4308
4309 response >> err;
4310
4311 if (err)
4312 response >> *tli;
4313
4314 return (bool) err;
4315 }
4316
startAISequence(uint32_t OID,uint64_t firstNum,uint32_t colWidth,execplan::CalpontSystemCatalog::ColDataType colDataType)4317 void DBRM::startAISequence(uint32_t OID, uint64_t firstNum, uint32_t colWidth,
4318 execplan::CalpontSystemCatalog::ColDataType colDataType)
4319 {
4320 ByteStream command, response;
4321 uint8_t err;
4322 uint8_t tmp8 = colDataType;
4323
4324 command << START_AI_SEQUENCE << OID << firstNum << colWidth << tmp8;
4325 err = send_recv(command, response);
4326
4327 if (err != ERR_OK)
4328 {
4329 log("DBRM: startAISequence(): network error", logging::LOG_TYPE_CRITICAL);
4330 throw runtime_error("DBRM: startAISequence(): network error");
4331 }
4332
4333 response >> err;
4334 idbassert(response.length() == 0);
4335
4336 if (err != ERR_OK)
4337 {
4338 log("DBRM: startAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
4339 throw runtime_error("DBRM: startAISequence(): processing error");
4340 }
4341 }
4342
getAIRange(uint32_t OID,uint32_t count,uint64_t * firstNum)4343 bool DBRM::getAIRange(uint32_t OID, uint32_t count, uint64_t* firstNum)
4344 {
4345 ByteStream command, response;
4346 uint8_t err;
4347
4348 command << GET_AI_RANGE << OID << count;
4349 err = send_recv(command, response);
4350
4351 if (err != ERR_OK)
4352 {
4353 log("DBRM: getAIRange(): network error", logging::LOG_TYPE_CRITICAL);
4354 throw runtime_error("DBRM: getAIRange(): network error");
4355 }
4356
4357 response >> err;
4358
4359 if (err != ERR_OK)
4360 {
4361 log("DBRM: getAIRange(): processing error", logging::LOG_TYPE_CRITICAL);
4362 throw runtime_error("DBRM: getAIRange(): processing error");
4363 }
4364
4365 response >> err;
4366
4367 if (err == 0)
4368 return false;
4369
4370 response >> *firstNum;
4371 idbassert(response.length() == 0);
4372 return true;
4373 }
4374
getAIValue(uint32_t OID,uint64_t * value)4375 bool DBRM::getAIValue(uint32_t OID, uint64_t* value)
4376 {
4377 return getAIRange(OID, 0, value);
4378 }
4379
resetAISequence(uint32_t OID,uint64_t value)4380 void DBRM::resetAISequence(uint32_t OID, uint64_t value)
4381 {
4382 ByteStream command, response;
4383 uint8_t err;
4384
4385 command << RESET_AI_SEQUENCE << OID << value;
4386 err = send_recv(command, response);
4387
4388 if (err != ERR_OK)
4389 {
4390 log("DBRM: resetAISequence(): network error", logging::LOG_TYPE_CRITICAL);
4391 throw runtime_error("DBRM: resetAISequence(): network error");
4392 }
4393
4394 response >> err;
4395 idbassert(response.length() == 0);
4396
4397 if (err != ERR_OK)
4398 {
4399 log("DBRM: resetAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
4400 throw runtime_error("DBRM: resetAISequence(): processing error");
4401 }
4402 }
4403
getAILock(uint32_t OID)4404 void DBRM::getAILock(uint32_t OID)
4405 {
4406 ByteStream command, response;
4407 uint8_t err;
4408
4409 command << GET_AI_LOCK << OID;
4410 err = send_recv(command, response);
4411
4412 if (err != ERR_OK)
4413 {
4414 log("DBRM: getAILock(): network error", logging::LOG_TYPE_CRITICAL);
4415 throw runtime_error("DBRM: getAILock(): network error");
4416 }
4417
4418 response >> err;
4419 idbassert(response.length() == 0);
4420
4421 if (err != ERR_OK)
4422 {
4423 log("DBRM: getAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4424 throw runtime_error("DBRM: getAILock(): processing error");
4425 }
4426 }
4427
releaseAILock(uint32_t OID)4428 void DBRM::releaseAILock(uint32_t OID)
4429 {
4430 ByteStream command, response;
4431 uint8_t err;
4432
4433 command << RELEASE_AI_LOCK << OID;
4434 err = send_recv(command, response);
4435
4436 if (err != ERR_OK)
4437 {
4438 log("DBRM: releaseAILock(): network error", logging::LOG_TYPE_CRITICAL);
4439 throw runtime_error("DBRM: releaseAILock(): network error");
4440 }
4441
4442 response >> err;
4443 idbassert(response.length() == 0);
4444
4445 if (err != ERR_OK)
4446 {
4447 log("DBRM: releaseAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4448 throw runtime_error("DBRM: releaseAILock(): processing error");
4449 }
4450 }
4451
deleteAISequence(uint32_t OID)4452 void DBRM::deleteAISequence(uint32_t OID)
4453 {
4454 ByteStream command, response;
4455 uint8_t err;
4456
4457 command << DELETE_AI_SEQUENCE << OID;
4458 err = send_recv(command, response);
4459
4460 if (err != ERR_OK)
4461 {
4462 log("DBRM:deleteAILock(): network error", logging::LOG_TYPE_CRITICAL);
4463 throw runtime_error("DBRM: deleteAILock(): network error");
4464 }
4465
4466 response >> err;
4467 idbassert(response.length() == 0);
4468
4469 if (err != ERR_OK)
4470 {
4471 log("DBRM: deleteAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4472 throw runtime_error("DBRM: deleteAILock(): processing error");
4473 }
4474 }
4475
invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid,vector<LBID_t> * plbidList)4476 void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid, vector<LBID_t>* plbidList)
4477 {
4478 // Here we want to minimize the number of calls to dbrm
4479 // Given that, and the fact that we need to know the column type
4480 // in order to set the invalid min and max correctly in the extents,
4481 // We do the following:
4482 // 1) Maintain a vector of all extents we've looked at.
4483 // 2) Get the list of uncommitted lbids for the transaction.
4484 // 3) Look in that list to see if we've already looked at this extent.
4485 // 4) If not,
4486 // a) lookup the min and max lbid for the extent it belongs to
4487 // b) lookup the column oid for that lbid
4488 // c) add to the vector of extents
4489 // 5) Create a list of CPInfo structures with the first lbid and col type of each extent
4490 // 6) Lookup the column type for each retrieved oid.
4491 // 7) mark each extent invalid, just like we would during update. This sets the proper
4492 // min and max (and set the state to CP_UPDATING.
4493 // 6) Call setExtentsMaxMin to set the state to CP_INVALID.
4494
4495 vector<LBID_t> localLBIDList;
4496
4497 boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
4498 CPInfoList_t cpInfos;
4499 CPInfo aInfo;
4500 int oid;
4501 uint16_t dbRoot;
4502 uint32_t partitionNum;
4503 uint16_t segmentNum;
4504 uint32_t fileBlockOffset;
4505
4506 // 2) Get the list of uncommitted lbids for the transaction, if we weren't given one.
4507 if (plbidList == NULL)
4508 {
4509 getUncommittedExtentLBIDs(static_cast<VER_t>(txnid), localLBIDList);
4510 plbidList = &localLBIDList;
4511 }
4512
4513 if (plbidList->size() == 0)
4514 {
4515 return; // Nothing to do.
4516 }
4517
4518 vector<LBID_t>::const_iterator iter = plbidList->begin();
4519 vector<LBID_t>::const_iterator end = plbidList->end();
4520 csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog();
4521
4522 for (; iter != end; ++iter)
4523 {
4524 LBID_t lbid = *iter;
4525 aInfo.firstLbid = lbid;
4526
4527 // lookup the column oid for that lbid (all we care about is oid here)
4528 if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0)
4529 {
4530 if (execplan::isUnsigned(csc->colType(oid).colDataType))
4531 {
4532 aInfo.max = 0;
4533 aInfo.min = numeric_limits<uint64_t>::max();
4534 }
4535 else
4536 {
4537 aInfo.max = numeric_limits<int64_t>::min();
4538 aInfo.min = numeric_limits<int64_t>::max();
4539 }
4540 }
4541 else
4542 {
4543 // We have a problem, but we need to put something in. This should never happen.
4544 aInfo.max = numeric_limits<int64_t>::min();
4545 aInfo.min = numeric_limits<int64_t>::max();
4546 }
4547
4548 aInfo.seqNum = -2;
4549 cpInfos.push_back(aInfo);
4550 }
4551
4552 // Call setExtentsMaxMin to invalidate and set the proper max/min in each extent
4553 setExtentsMaxMin(cpInfos);
4554 }
4555
4556 } //namespace
4557