1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*****************************************************************************
19  * $Id: dbrm.cpp 1878 2013-05-02 15:17:12Z dcathey $
20  *
21  ****************************************************************************/
22 
23 #include <iostream>
24 #include <sys/types.h>
25 #include <vector>
26 #ifdef __linux__
27 #include <values.h>
28 #endif
29 #include <boost/thread.hpp>
30 //#define NDEBUG
31 #include <cassert>
32 
33 #include "oamcache.h"
34 #include "rwlock.h"
35 #include "mastersegmenttable.h"
36 #include "extentmap.h"
37 #include "copylocks.h"
38 #include "vss.h"
39 #include "vbbm.h"
40 #include "socketclosed.h"
41 #include "configcpp.h"
42 #include "sessionmanagerserver.h"
43 #include "messagequeuepool.h"
44 #define DBRM_DLLEXPORT
45 #include "dbrm.h"
46 #undef DBRM_DLLEXPORT
47 
// CHECK_EMPTY(x): in BRM_DEBUG builds, throws std::logic_error when x still
// reports a non-zero length(); in every other build it expands to nothing.
// The argument is parenthesized so that an expression argument binds to
// .length() as a whole, and the exception type is fully qualified so the
// expansion does not depend on a using-directive being in effect.
// The expansion keeps its own trailing semicolon, as before.
#ifdef BRM_DEBUG
#define CHECK_EMPTY(x) \
	if ((x).length() != 0) \
		throw std::logic_error("DBRM: got a message of the wrong size");
#else
#define CHECK_EMPTY(x)
#endif
55 
// DO_ERR_NETWORK: hands msgClient back to the MessageQueueClientPool, clears
// the pointer, unlocks `mutex` and returns ERR_NETWORK from the enclosing
// function.  The statements are wrapped in braces so the macro behaves as one
// compound statement even when it is the body of an unbraced if.
#define DO_ERR_NETWORK \
	{ \
		MessageQueueClientPool::releaseInstance(msgClient); \
		msgClient = NULL; \
		mutex.unlock(); \
		return ERR_NETWORK; \
	}
61 
62 using namespace std;
63 using namespace messageqcpp;
64 using namespace oam;
65 
66 #ifdef BRM_INFO
67 #include "tracer.h"
68 #endif
69 
70 namespace BRM
71 {
72 
DBRM(bool noBRMinit)73 DBRM::DBRM(bool noBRMinit) : fDebug(false)
74 {
75     if (!noBRMinit)
76     {
77         mst.reset(new MasterSegmentTable());
78         em.reset(new ExtentMap());
79         vss.reset(new VSS());
80         vbbm.reset(new VBBM());
81         copylocks.reset(new CopyLocks());
82 
83         em->setReadOnly();
84         vss->setReadOnly();
85         vbbm->setReadOnly();
86     }
87 
88     msgClient = NULL;
89     masterName = "DBRM_Controller";
90     config = config::Config::makeConfig();
91 #ifdef BRM_INFO
92     fDebug = ("Y" == config->getConfig("DBRM", "Debug"));
93 #endif
94 }
95 
// Copy construction is not supported: the body unconditionally throws
// logic_error and never reads brm.
DBRM::DBRM(const DBRM& brm)
{
    const char* const complaint = "DBRM: Don't use the copy constructor.";
    throw logic_error(complaint);
}
100 
~DBRM()101 DBRM::~DBRM() throw()
102 {
103     if (msgClient != NULL)
104         MessageQueueClientPool::releaseInstance(msgClient);
105 }
106 
// Assignment is not supported: the body unconditionally throws logic_error
// and never reads brm, so no value is ever returned.
DBRM& DBRM::operator=(const DBRM& brm)
{
    const char* const complaint = "DBRM: Don't use the = operator.";
    throw logic_error(complaint);
}
111 
112 
saveState()113 int DBRM::saveState() throw()
114 {
115 #ifdef BRM_INFO
116 
117     if (fDebug) TRACER_WRITENOW("saveState()");
118 
119 #endif
120     string prefix = config->getConfig("SystemConfig", "DBRMRoot");
121 
122     if (prefix.length() == 0)
123     {
124         cerr << "Error: Need a valid Calpont configuation file" << endl;
125         exit(1);
126     }
127 
128     int rc = saveState(prefix);
129 
130     return rc;
131 }
132 
saveState(string filename)133 int DBRM::saveState(string filename) throw()
134 {
135 #ifdef BRM_INFO
136 
137     if (fDebug)
138     {
139         TRACER_WRITELATER("saveState(filename)");
140         TRACER_ADDSTRINPUT(filename);
141         TRACER_WRITE;
142     }
143 
144 #endif
145     string emFilename = filename + "_em";
146     string vssFilename = filename + "_vss";
147     string vbbmFilename = filename + "_vbbm";
148     bool locked[3] = { false, false, false };
149 
150     try
151     {
152         vbbm->lock(VBBM::READ);
153         locked[0] = true;
154         vss->lock(VSS::READ);
155         locked[1] = true;
156         copylocks->lock(CopyLocks::READ);
157         locked[2] = true;
158 
159         saveExtentMap(emFilename);
160         vbbm->save(vbbmFilename);
161         vss->save(vssFilename);
162 
163         copylocks->release(CopyLocks::READ);
164         locked[2] = false;
165         vss->release(VSS::READ);
166         locked[1] = false;
167         vbbm->release(VBBM::READ);
168         locked[0] = false;
169     }
170     catch (exception& e)
171     {
172         if (locked[2])
173             copylocks->release(CopyLocks::READ);
174 
175         if (locked[1])
176             vss->release(VSS::READ);
177 
178         if (locked[0])
179             vbbm->release(VBBM::READ);
180 
181         return -1;
182     }
183 
184     return 0;
185 }
186 
saveExtentMap(const string & filename)187 int DBRM::saveExtentMap(const string& filename) throw()
188 {
189 #ifdef BRM_INFO
190 
191     if (fDebug)
192     {
193         TRACER_WRITELATER("saveExtentMap");
194         TRACER_ADDSTRINPUT(filename);
195         TRACER_WRITE;
196     }
197 
198 #endif
199 
200     try
201     {
202         em->save(filename);
203     }
204     catch (exception& e)
205     {
206         cerr << e.what() << endl;
207         return -1;
208     }
209 
210     return 0;
211 }
212 
213 // @bug 1055+.  New functions added for multiple files per OID enhancement.
lookupLocal(LBID_t lbid,VER_t verid,bool vbFlag,OID_t & oid,uint16_t & dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,uint32_t & fileBlockOffset)214 int DBRM::lookupLocal(LBID_t lbid, VER_t verid, bool vbFlag, OID_t& oid,
215                       uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, uint32_t& fileBlockOffset) throw()
216 {
217 #ifdef BRM_INFO
218 
219     if (fDebug)
220     {
221         TRACER_WRITELATER("lookupLocal(lbid,ver,..)");
222         TRACER_ADDINPUT(lbid);
223         TRACER_ADDINPUT(verid);
224         TRACER_ADDBOOLINPUT(vbFlag);
225         TRACER_ADDOUTPUT(oid);
226         TRACER_ADDSHORTOUTPUT(dbRoot);
227         TRACER_ADDOUTPUT(partitionNum);
228         TRACER_ADDSHORTOUTPUT(segmentNum);
229         TRACER_ADDOUTPUT(fileBlockOffset);
230         TRACER_WRITE;
231     }
232 
233 #endif
234     bool locked[2] = {false, false};
235     int ret;
236     bool tooOld = false;
237 
238     try
239     {
240         if (!vbFlag)
241             return em->lookupLocal(lbid, (int&)oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
242         else
243         {
244             vbbm->lock(VBBM::READ);
245             locked[0] = true;
246             ret = vbbm->lookup(lbid, verid, oid, fileBlockOffset);
247             vbbm->release(VBBM::READ);
248             locked[0] = false;
249 
250             if (ret < 0)
251             {
252                 vss->lock(VSS::READ);
253                 locked[1] = true;
254                 tooOld = vss->isTooOld(lbid, verid);
255                 vss->release(VSS::READ);
256                 locked[1] = false;
257 
258                 if (tooOld)
259                     return ERR_SNAPSHOT_TOO_OLD;
260             }
261 
262             return ret;
263         }
264     }
265     catch (exception& e)
266     {
267         if (locked[1])
268             vss->release(VSS::READ);
269 
270         if (locked[0])
271             vbbm->release(VBBM::READ);
272 
273         cerr << e.what() << endl;
274         return -1;
275     }
276 }
277 
lookupLocal(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)278 int DBRM::lookupLocal(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw()
279 {
280 #ifdef BRM_INFO
281 
282     if (fDebug)
283     {
284         TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
285         TRACER_ADDINPUT(oid);
286         TRACER_ADDINPUT(partitionNum);
287         TRACER_ADDSHORTINPUT(segmentNum);
288         TRACER_ADDINPUT(fileBlockOffset);
289         TRACER_ADDOUTPUT(lbid);
290         TRACER_WRITE;
291     }
292 
293 #endif
294 
295     try
296     {
297         return em->lookupLocal(oid, partitionNum, segmentNum, fileBlockOffset, lbid);
298     }
299     catch (exception& e)
300     {
301         cerr << e.what() << endl;
302         return -1;
303     }
304 }
305 
lookupLocal_DBroot(OID_t oid,uint32_t dbroot,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)306 int DBRM::lookupLocal_DBroot(OID_t oid, uint32_t dbroot, uint32_t partitionNum, uint16_t segmentNum,
307                              uint32_t fileBlockOffset, LBID_t& lbid) throw()
308 {
309 #ifdef BRM_INFO
310 
311     if (fDebug)
312     {
313         TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
314         TRACER_ADDINPUT(oid);
315         TRACER_ADDINPUT(partitionNum);
316         TRACER_ADDSHORTINPUT(segmentNum);
317         TRACER_ADDINPUT(fileBlockOffset);
318         TRACER_ADDOUTPUT(lbid);
319         TRACER_WRITE;
320     }
321 
322 #endif
323 
324     try
325     {
326         return em->lookupLocal_DBroot(oid, dbroot, partitionNum, segmentNum, fileBlockOffset, lbid);
327     }
328     catch (exception& e)
329     {
330         cerr << e.what() << endl;
331         return -1;
332     }
333 }
334 
335 // @bug 1055-
336 
337 //------------------------------------------------------------------------------
338 // Lookup/return starting LBID for the specified OID, partition, segment, and
339 // file block offset.
340 //------------------------------------------------------------------------------
lookupLocalStartLbid(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,uint32_t fileBlockOffset,LBID_t & lbid)341 int DBRM::lookupLocalStartLbid(OID_t    oid,
342                                uint32_t partitionNum,
343                                uint16_t segmentNum,
344                                uint32_t fileBlockOffset,
345                                LBID_t&  lbid) throw()
346 {
347 #ifdef BRM_INFO
348 
349     if (fDebug)
350     {
351         TRACER_WRITELATER("lookupLocalStartLbid(oid,fbo,..)");
352         TRACER_ADDINPUT(oid);
353         TRACER_ADDINPUT(partitionNum);
354         TRACER_ADDSHORTINPUT(segmentNum);
355         TRACER_ADDINPUT(fileBlockOffset);
356         TRACER_ADDOUTPUT(lbid);
357         TRACER_WRITE;
358     }
359 
360 #endif
361 
362     try
363     {
364         return em->lookupLocalStartLbid(oid, partitionNum, segmentNum,
365                                         fileBlockOffset, lbid);
366     }
367     catch (exception& e)
368     {
369         cerr << e.what() << endl;
370         return -1;
371     }
372 }
373 
lookup(OID_t oid,LBIDRange_v & lbidList)374 int DBRM::lookup(OID_t oid, LBIDRange_v& lbidList) throw()
375 {
376 #ifdef BRM_INFO
377 
378     if (fDebug)
379     {
380         TRACER_WRITELATER("lookup(oid,range)");
381         TRACER_ADDINPUT(oid);
382         TRACER_WRITE;
383     }
384 
385 #endif
386 
387     try
388     {
389         em->lookup(oid, lbidList);
390         return 0;
391     }
392     catch (exception& e)
393     {
394         cerr << e.what() << endl;
395         return -1;
396     }
397 }
398 
399 // Casual Partitioning support
markExtentInvalid(const LBID_t lbid,execplan::CalpontSystemCatalog::ColDataType colDataType)400 int DBRM::markExtentInvalid(const LBID_t lbid,
401                             execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW
402 {
403 #ifdef BRM_INFO
404 
405     if (fDebug)
406     {
407         TRACER_WRITELATER("markExtentInvalid");
408         TRACER_ADDINPUT(lbid);
409         TRACER_WRITE;
410     }
411 
412 #endif
413 
414     ByteStream command, response;
415     uint8_t err;
416 
417     command << MARKEXTENTINVALID << (uint64_t)lbid << (uint32_t)colDataType;
418     err = send_recv(command, response);
419 
420     if (err != ERR_OK)
421         return err;
422 
423     if (response.length() == 0)
424         return ERR_NETWORK;
425 
426     response >> err;
427     CHECK_EMPTY(response);
428     return err;
429 }
430 
markExtentsInvalid(const vector<LBID_t> & lbids,const std::vector<execplan::CalpontSystemCatalog::ColDataType> & colDataTypes)431 int DBRM::markExtentsInvalid(const vector<LBID_t>& lbids,
432                              const std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes) DBRM_THROW
433 {
434 #ifdef BRM_INFO
435 
436     if (fDebug) TRACER_WRITENOW("markExtentsInvalid");
437 
438 #endif
439     ByteStream command, response;
440     uint8_t err;
441     uint32_t size = lbids.size(), i;
442 
443     command << MARKMANYEXTENTSINVALID << size;
444 
445     for (i = 0; i < size; i++)
446     {
447         command << (uint64_t) lbids[i];
448         command << (uint32_t) colDataTypes[i];
449     }
450 
451     err = send_recv(command, response);
452 
453     if (err != ERR_OK)
454         return err;
455 
456     if (response.length() == 0)
457         return ERR_NETWORK;
458 
459     response >> err;
460     CHECK_EMPTY(response);
461     return err;
462 }
463 
getExtentMaxMin(const LBID_t lbid,int64_t & max,int64_t & min,int32_t & seqNum)464 int DBRM::getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw()
465 {
466 #ifdef BRM_INFO
467 
468     if (fDebug)
469     {
470         TRACER_WRITELATER("getExtentMaxMin");
471         TRACER_ADDINPUT(lbid);
472         TRACER_ADDOUTPUT(max);
473         TRACER_ADDOUTPUT(min);
474         TRACER_ADDOUTPUT(seqNum);
475         TRACER_WRITE;
476     }
477 
478 #endif
479 
480     try
481     {
482         int ret = em->getMaxMin(lbid, max, min, seqNum);
483         return ret;
484     }
485     catch (exception& e)
486     {
487         cerr << e.what() << endl;
488         return false;
489     }
490 }
491 
setExtentMaxMin(const LBID_t lbid,const int64_t max,const int64_t min,const int32_t seqNum)492 int DBRM::setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min, const int32_t seqNum) DBRM_THROW
493 {
494 #ifdef BRM_INFO
495 
496     if (fDebug)
497     {
498         TRACER_WRITELATER("setExtentMaxMin");
499         TRACER_ADDINPUT(lbid);
500         TRACER_ADDINPUT(max);
501         TRACER_ADDINPUT(min);
502         TRACER_ADDINPUT(seqNum);
503         TRACER_WRITE;
504     }
505 
506 #endif
507     ByteStream command, response;
508     uint8_t err;
509 
510     command << SETEXTENTMAXMIN << (uint64_t)lbid << (uint64_t)max << (uint64_t)min << (uint64_t)seqNum;
511     err = send_recv(command, response);
512 
513     if (err != ERR_OK)
514         return err;
515 
516     if (response.length() == 0)
517         return ERR_NETWORK;
518 
519     response >> err;
520     CHECK_EMPTY(response);
521     return err;
522 
523 }
524 
525 // @bug 1970 - Added function below to set multiple extents casual partition info in one call.
setExtentsMaxMin(const CPInfoList_t & cpInfos)526 int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
527 {
528     CPInfoList_t::const_iterator it;
529 #ifdef BRM_INFO
530 
531     if (fDebug)
532     {
533         TRACER_WRITELATER("setExtentsMaxMin");
534 
535         for (it = cpInfos.begin(); it != cpInfos.end(); it++)
536         {
537             TRACER_ADDINPUT(it->firstLbid);
538             TRACER_ADDINPUT(it->max);
539             TRACER_ADDINPUT(it->min);
540             TRACER_ADDINPUT(it->seqNum);
541             TRACER_WRITE;
542         }
543     }
544 
545 #endif
546     ByteStream command, response;
547     uint8_t err = 0;
548 
549     if (cpInfos.size() == 0)
550         return err;
551 
552     if (cpInfos.empty())
553         return ERR_OK;
554 
555     command << SETMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
556 
557     for (it = cpInfos.begin(); it != cpInfos.end(); it++)
558     {
559         command << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum;
560     }
561 
562     err = send_recv(command, response);
563 
564     if (err != ERR_OK)
565         return err;
566 
567     if (response.length() == 0)
568         return ERR_NETWORK;
569 
570     response >> err;
571     CHECK_EMPTY(response);
572     return err;
573 }
574 
575 //------------------------------------------------------------------------------
576 // @bug 2117 - Add function to merge Casual Partition info with current extent
577 // map information.
578 //------------------------------------------------------------------------------
mergeExtentsMaxMin(const CPInfoMergeList_t & cpInfos)579 int DBRM::mergeExtentsMaxMin(const CPInfoMergeList_t& cpInfos) DBRM_THROW
580 {
581     CPInfoMergeList_t::const_iterator it;
582 #ifdef BRM_INFO
583 
584     if (fDebug)
585     {
586         TRACER_WRITELATER("updateExtentsMaxMin");
587 
588         for (it = cpInfos.begin(); it != cpInfos.end(); it++)
589         {
590             TRACER_ADDINPUT(it->startLbid);
591             TRACER_ADDINPUT(it->max);
592             TRACER_ADDINPUT(it->min);
593             TRACER_ADDINPUT(it->seqNum);
594             TRACER_ADDINPUT(it->type);
595             TRACER_ADDINPUT(it->newExtent);
596             TRACER_WRITE;
597         }
598     }
599 
600 #endif
601     ByteStream command, response;
602     uint8_t err;
603 
604     command << MERGEMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
605 
606     for (it = cpInfos.begin(); it != cpInfos.end(); it++)
607     {
608         command << (uint64_t)it->startLbid <<
609                 (uint64_t)it->max       <<
610                 (uint64_t)it->min       <<
611                 (uint32_t)it->seqNum    <<
612                 (uint32_t)it->type      <<
613                 (uint32_t)it->newExtent;
614     }
615 
616     err = send_recv(command, response);
617 
618     if (err != ERR_OK)
619         return err;
620 
621     if (response.length() == 0)
622         return ERR_NETWORK;
623 
624     response >> err;
625     CHECK_EMPTY(response);
626     return err;
627 }
628 
vssLookup(LBID_t lbid,const QueryContext & verInfo,VER_t txnID,VER_t * outVer,bool * vbFlag,bool vbOnly)629 int DBRM::vssLookup(LBID_t lbid, const QueryContext& verInfo, VER_t txnID, VER_t* outVer,
630                     bool* vbFlag, bool vbOnly) throw()
631 {
632 #ifdef BRM_INFO
633 
634     if (fDebug)
635     {
636         TRACER_WRITELATER("vssLookup");
637         TRACER_ADDINPUT(lbid);
638         TRACER_ADDINPUT(verInfo);
639         TRACER_ADDINPUT(txnID);
640         TRACER_ADDBOOLINPUT(vbOnly);
641         TRACER_WRITE;
642     }
643 
644 #endif
645 
646     if (!vbOnly && vss->isEmpty())
647     {
648         *outVer = 0;
649         *vbFlag = false;
650         return -1;
651     }
652 
653     bool locked = false;
654 
655     try
656     {
657         int rc = 0;
658         vss->lock(VSS::READ);
659         locked = true;
660         rc = vss->lookup(lbid, verInfo, txnID, outVer, vbFlag, vbOnly);
661         vss->release(VSS::READ);
662         return rc;
663     }
664     catch (exception& e)
665     {
666         if (locked)
667             vss->release(VSS::READ);
668 
669         cerr << e.what() << endl;
670         return -1;
671     }
672 }
673 
bulkVSSLookup(const std::vector<LBID_t> & lbids,const QueryContext_vss & verInfo,VER_t txnID,std::vector<VSSData> * out)674 int DBRM::bulkVSSLookup(const std::vector<LBID_t>& lbids, const QueryContext_vss& verInfo,
675                         VER_t txnID, std::vector<VSSData>* out)
676 {
677     uint32_t i;
678     bool locked = false;
679 
680     try
681     {
682         out->resize(lbids.size());
683         vss->lock(VSS::READ);
684         locked = true;
685 
686         if (vss->isEmpty(false))
687         {
688             for (i = 0; i < lbids.size(); i++)
689             {
690                 VSSData& vd = (*out)[i];
691                 vd.verID = 0;
692                 vd.vbFlag = false;
693                 vd.returnCode = -1;
694             }
695         }
696         else
697         {
698             for (i = 0; i < lbids.size(); i++)
699             {
700                 VSSData& vd = (*out)[i];
701                 vd.returnCode = vss->lookup(lbids[i], verInfo, txnID, &vd.verID, &vd.vbFlag, false);
702             }
703         }
704 
705         vss->release(VSS::READ);
706         return 0;
707     }
708     catch (exception& e)
709     {
710         cerr << e.what() << endl;
711     }
712     catch (...)
713     {
714         cerr << "bulkVSSLookup: caught an exception" << endl;
715     }
716 
717     if (locked)
718         vss->release(VSS::READ);
719 
720     out->clear();
721     return -1;
722 }
723 
getCurrentVersion(LBID_t lbid,bool * isLocked) const724 VER_t DBRM::getCurrentVersion(LBID_t lbid, bool* isLocked) const
725 {
726     bool locked = false;
727     VER_t ret = 0;
728 
729     try
730     {
731         vss->lock(VSS::READ);
732         locked = true;
733         ret = vss->getCurrentVersion(lbid, isLocked);
734         vss->release(VSS::READ);
735         locked = false;
736     }
737     catch (exception& e)
738     {
739         cerr << e.what() << endl;
740 
741         if (locked)
742             vss->release(VSS::READ);
743 
744         throw;
745     }
746 
747     return ret;
748 }
749 
bulkGetCurrentVersion(const vector<LBID_t> & lbids,vector<VER_t> * versions,vector<bool> * isLocked) const750 int DBRM::bulkGetCurrentVersion(const vector<LBID_t>& lbids, vector<VER_t>* versions,
751                                 vector<bool>* isLocked) const
752 {
753     bool locked = false;
754 
755     versions->resize(lbids.size());
756 
757     if (isLocked != NULL)
758         isLocked->resize(lbids.size());
759 
760     try
761     {
762         vss->lock(VSS::READ);
763         locked = true;
764 
765         if (isLocked != NULL)
766         {
767             bool tmp = false;
768 
769             for (uint32_t i = 0; i < lbids.size(); i++)
770             {
771                 (*versions)[i] = vss->getCurrentVersion(lbids[i], &tmp);
772                 (*isLocked)[i] = tmp;
773             }
774         }
775         else
776             for (uint32_t i = 0; i < lbids.size(); i++)
777                 (*versions)[i] = vss->getCurrentVersion(lbids[i], NULL);
778 
779         vss->release(VSS::READ);
780         locked = false;
781         return 0;
782     }
783     catch (exception& e)
784     {
785         versions->clear();
786         cerr << e.what() << endl;
787 
788         if (locked)
789             vss->release(VSS::READ);
790 
791         return -1;
792     }
793 }
794 
795 
getHighestVerInVB(LBID_t lbid,VER_t max) const796 VER_t DBRM::getHighestVerInVB(LBID_t lbid, VER_t max) const
797 {
798     bool locked = false;
799     VER_t ret = -1;
800 
801     try
802     {
803         vss->lock(VSS::READ);
804         locked = true;
805         ret = vss->getHighestVerInVB(lbid, max);
806         vss->release(VSS::READ);
807         locked = false;
808     }
809     catch (exception& e)
810     {
811         cerr << e.what() << endl;
812 
813         if (locked)
814             vss->release(VSS::READ);
815 
816         throw;
817     }
818 
819     return ret;
820 }
821 
isVersioned(LBID_t lbid,VER_t ver) const822 bool DBRM::isVersioned(LBID_t lbid, VER_t ver) const
823 {
824     bool ret = false;
825     bool locked = false;
826 
827     try
828     {
829         vss->lock(VSS::READ);
830         locked = true;
831         ret = vss->isVersioned(lbid, ver);
832         vss->release(VSS::READ);
833         locked = false;
834     }
835     catch (exception& e)
836     {
837         cerr << e.what() << endl;
838 
839         if (locked)
840             vss->release(VSS::READ);
841 
842         throw;
843     }
844 
845     return ret;
846 }
847 
send_recv(const ByteStream & in,ByteStream & out)848 int8_t DBRM::send_recv(const ByteStream& in, ByteStream& out) throw()
849 {
850 #ifdef BRM_INFO
851 
852     if (fDebug) TRACER_WRITENOW("send_recv");
853 
854 #endif
855     uint8_t attempt = 1;
856 
857     mutex.lock();
858 
859 reconnect:
860 
861     if (msgClient == NULL)
862         try
863         {
864             msgClient = MessageQueueClientPool::getInstance(masterName);
865         }
866         catch (exception& e)
867         {
868             cerr << "class DBRM failed to create a MessageQueueClient: " <<
869                  e.what() << endl;
870             msgClient = NULL;
871             mutex.unlock();
872             return ERR_NETWORK;
873         }
874 
875     try
876     {
877         msgClient->write(in);
878         out = msgClient->read();
879     }
880     /* If we add a timeout to the read() call, uncomment this clause
881     catch (SocketClosed &e) {
882     	cerr << "DBRM::send_recv: controller node closed the connection" << endl;
883     	DO_ERR_NETWORK;
884     }
885     */
886     catch (exception& e)
887     {
888         cerr << "DBRM::send_recv caught: " << e.what() << endl;
889 
890         if (attempt++ == 1)
891         {
892             MessageQueueClientPool::releaseInstance(msgClient);
893             msgClient = NULL;
894             goto reconnect;
895         }
896 
897         DO_ERR_NETWORK;
898     }
899 
900     if (out.length() == 0)
901     {
902         cerr << "DBRM::send_recv: controller node closed the connection" << endl;
903 
904         if (attempt <= 2)
905         {
906             MessageQueueClientPool::releaseInstance(msgClient);
907             msgClient = NULL;
908             if (attempt++ > 1)
909             {
910                 sleep(3);
911             }
912             goto reconnect;
913         }
914 
915         DO_ERR_NETWORK;
916     }
917 
918     mutex.unlock();
919     return ERR_OK;
920 }
921 
922 //------------------------------------------------------------------------------
923 // Send a request to create a "stripe" of column extents for the specified
924 // column OIDs and DBRoot.
925 //------------------------------------------------------------------------------
createStripeColumnExtents(const std::vector<CreateStripeColumnExtentsArgIn> & cols,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,std::vector<CreateStripeColumnExtentsArgOut> & extents)926 int DBRM::createStripeColumnExtents(
927     const std::vector<CreateStripeColumnExtentsArgIn>& cols,
928     uint16_t  dbRoot,
929     uint32_t& partitionNum,
930     uint16_t& segmentNum,
931     std::vector<CreateStripeColumnExtentsArgOut>& extents) DBRM_THROW
932 {
933 #ifdef BRM_INFO
934 
935     if (fDebug)
936     {
937         TRACER_WRITELATER("createStripeColumnExtents");
938         TRACER_WRITE;
939     }
940 
941 #endif
942 
943     ByteStream command, response;
944     uint8_t  err;
945     uint16_t tmp16;
946     uint32_t tmp32;
947 
948     command << CREATE_STRIPE_COLUMN_EXTENTS;
949     serializeInlineVector(command, cols);
950     command << dbRoot << partitionNum;
951 
952     err = send_recv(command, response);
953 
954     if (err != ERR_OK)
955         return err;
956 
957     if (response.length() == 0)
958         return ERR_NETWORK;
959 
960     try
961     {
962         response >> err;
963 
964         if (err != 0)
965             return (int) err;
966 
967         response >> tmp32;
968         partitionNum = tmp32;
969         response >> tmp16;
970         segmentNum = tmp16;
971         deserializeInlineVector(response, extents);
972     }
973     catch (exception& e)
974     {
975         cerr << e.what() << endl;
976         return ERR_FAILURE;
977     }
978 
979     CHECK_EMPTY(response);
980     return 0;
981 }
982 
983 //------------------------------------------------------------------------------
984 // Send a request to create a column extent for the specified OID and DBRoot.
985 //------------------------------------------------------------------------------
createColumnExtent_DBroot(OID_t oid,uint32_t colWidth,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,execplan::CalpontSystemCatalog::ColDataType colDataType,LBID_t & lbid,int & allocdSize,uint32_t & startBlockOffset)986 int DBRM::createColumnExtent_DBroot(OID_t oid,
987                                     uint32_t  colWidth,
988                                     uint16_t  dbRoot,
989                                     uint32_t& partitionNum,
990                                     uint16_t& segmentNum,
991                                     execplan::CalpontSystemCatalog::ColDataType colDataType,
992                                     LBID_t&    lbid,
993                                     int&       allocdSize,
994                                     uint32_t& startBlockOffset) DBRM_THROW
995 {
996 #ifdef BRM_INFO
997 
998     if (fDebug)
999     {
1000         TRACER_WRITELATER("createColumnExtent_DBroot");
1001         TRACER_ADDINPUT(oid);
1002         TRACER_ADDINPUT(colWidth);
1003         TRACER_ADDSHORTINPUT(dbRoot);
1004         TRACER_ADDOUTPUT(partitionNum);
1005         TRACER_ADDSHORTOUTPUT(segmentNum);
1006         TRACER_ADDINT64OUTPUT(lbid);
1007         TRACER_ADDOUTPUT(allocdSize);
1008         TRACER_ADDOUTPUT(startBlockOffset);
1009         TRACER_WRITE;
1010     }
1011 
1012 #endif
1013 
1014     ByteStream command, response;
1015     uint8_t  err;
1016     uint32_t tmp8 = (uint8_t)colDataType;
1017     uint16_t tmp16;
1018     uint32_t tmp32;
1019     uint64_t tmp64;
1020 
1021     command << CREATE_COLUMN_EXTENT_DBROOT << (ByteStream::quadbyte) oid <<
1022             colWidth << dbRoot << partitionNum << segmentNum << tmp8;
1023     err = send_recv(command, response);
1024 
1025     if (err != ERR_OK)
1026         return err;
1027 
1028     if (response.length() == 0)
1029         return ERR_NETWORK;
1030 
1031     try
1032     {
1033         response >> err;
1034 
1035         if (err != 0)
1036             return (int) err;
1037 
1038         response >> tmp32;
1039         partitionNum = tmp32;
1040         response >> tmp16;
1041         segmentNum = tmp16;
1042         response >> tmp64;
1043         lbid = (int64_t)tmp64;
1044         response >> tmp32;
1045         allocdSize = (int32_t)tmp32;
1046         response >> tmp32;
1047         startBlockOffset = (int32_t)tmp32;
1048     }
1049     catch (exception& e)
1050     {
1051         cerr << e.what() << endl;
1052         return ERR_FAILURE;
1053     }
1054 
1055     CHECK_EMPTY(response);
1056     return 0;
1057 }
1058 
1059 //------------------------------------------------------------------------------
1060 // Send a request to create a column extent for the exact segment file
1061 // specified by the requested OID, DBRoot, partition, and segment.
1062 //------------------------------------------------------------------------------
createColumnExtentExactFile(OID_t oid,uint32_t colWidth,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,execplan::CalpontSystemCatalog::ColDataType colDataType,LBID_t & lbid,int & allocdSize,uint32_t & startBlockOffset)1063 int DBRM::createColumnExtentExactFile(OID_t oid,
1064                                       uint32_t  colWidth,
1065                                       uint16_t  dbRoot,
1066                                       uint32_t partitionNum,
1067                                       uint16_t segmentNum,
1068                                       execplan::CalpontSystemCatalog::ColDataType colDataType,
1069                                       LBID_t&    lbid,
1070                                       int&       allocdSize,
1071                                       uint32_t& startBlockOffset) DBRM_THROW
1072 {
1073 #ifdef BRM_INFO
1074 
1075     if (fDebug)
1076     {
1077         TRACER_WRITELATER("createColumnExtentExactFile");
1078         TRACER_ADDINPUT(oid);
1079         TRACER_ADDINPUT(colWidth);
1080         TRACER_ADDSHORTINPUT(dbRoot);
1081         TRACER_ADDOUTPUT(partitionNum);
1082         TRACER_ADDSHORTOUTPUT(segmentNum);
1083         TRACER_ADDINT64OUTPUT(lbid);
1084         TRACER_ADDOUTPUT(allocdSize);
1085         TRACER_ADDOUTPUT(startBlockOffset);
1086         TRACER_WRITE;
1087     }
1088 
1089 #endif
1090 
1091     ByteStream command, response;
1092     uint8_t  err;
1093     uint8_t  tmp8;
1094     uint16_t tmp16;
1095     uint32_t tmp32;
1096     uint64_t tmp64;
1097 
1098     tmp8 = (uint8_t)colDataType;
1099     command << CREATE_COLUMN_EXTENT_EXACT_FILE << (ByteStream::quadbyte) oid <<
1100             colWidth << dbRoot << partitionNum << segmentNum << tmp8;
1101     err = send_recv(command, response);
1102 
1103     if (err != ERR_OK)
1104         return err;
1105 
1106     if (response.length() == 0)
1107         return ERR_NETWORK;
1108 
1109     try
1110     {
1111         response >> err;
1112 
1113         if (err != 0)
1114             return (int) err;
1115 
1116         response >> tmp32;
1117         partitionNum = tmp32;
1118         response >> tmp16;
1119         segmentNum = tmp16;
1120         response >> tmp64;
1121         lbid = (int64_t)tmp64;
1122         response >> tmp32;
1123         allocdSize = (int32_t)tmp32;
1124         response >> tmp32;
1125         startBlockOffset = (int32_t)tmp32;
1126     }
1127     catch (exception& e)
1128     {
1129         cerr << e.what() << endl;
1130         return ERR_FAILURE;
1131     }
1132 
1133     CHECK_EMPTY(response);
1134     return 0;
1135 }
1136 
1137 //------------------------------------------------------------------------------
1138 // Send a request to create a dictionary store extent.
1139 //------------------------------------------------------------------------------
createDictStoreExtent(OID_t oid,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,LBID_t & lbid,int & allocdSize)1140 int DBRM::createDictStoreExtent(OID_t oid,
1141                                 uint16_t  dbRoot,
1142                                 uint32_t  partitionNum,
1143                                 uint16_t  segmentNum,
1144                                 LBID_t&    lbid,
1145                                 int&       allocdSize) DBRM_THROW
1146 {
1147 #ifdef BRM_INFO
1148 
1149     if (fDebug)
1150     {
1151         TRACER_WRITELATER("createDictStoreExtent");
1152         TRACER_ADDINPUT(oid);
1153         TRACER_ADDSHORTINPUT(dbRoot);
1154         TRACER_ADDINPUT(partitionNum);
1155         TRACER_ADDSHORTINPUT(segmentNum);
1156         TRACER_ADDINT64OUTPUT(lbid);
1157         TRACER_ADDOUTPUT(allocdSize);
1158         TRACER_WRITE;
1159     }
1160 
1161 #endif
1162 
1163     ByteStream command, response;
1164     uint8_t  err;
1165     uint32_t tmp32;
1166     uint64_t tmp64;
1167 
1168     command << CREATE_DICT_STORE_EXTENT << (ByteStream::quadbyte) oid << dbRoot <<
1169             partitionNum << segmentNum;
1170     err = send_recv(command, response);
1171 
1172     if (err != ERR_OK)
1173         return err;
1174 
1175     if (response.length() == 0)
1176         return ERR_NETWORK;
1177 
1178     try
1179     {
1180         response >> err;
1181 
1182         if (err != 0)
1183             return (int) err;
1184 
1185         response >> tmp64;
1186         lbid = (int64_t)tmp64;
1187         response >> tmp32;
1188         allocdSize = (int32_t)tmp32;
1189     }
1190     catch (exception& e)
1191     {
1192         cerr << e.what() << endl;
1193         return ERR_FAILURE;
1194     }
1195 
1196     CHECK_EMPTY(response);
1197     return 0;
1198 }
1199 
1200 //------------------------------------------------------------------------------
1201 // Send a request to delete a set extents for the specified column OID and
1202 // DBRoot, and to return the extents to the free list.  HWMs for the last
1203 // stripe of extents in the specified DBRoot are updated accordingly.
1204 //------------------------------------------------------------------------------
rollbackColumnExtents_DBroot(OID_t oid,bool bDeleteAll,uint16_t dbRoot,uint32_t partitionNum,uint16_t segmentNum,HWM_t hwm)1205 int DBRM::rollbackColumnExtents_DBroot(OID_t oid,
1206                                        bool       bDeleteAll,
1207                                        uint16_t  dbRoot,
1208                                        uint32_t  partitionNum,
1209                                        uint16_t  segmentNum,
1210                                        HWM_t      hwm) DBRM_THROW
1211 {
1212 #ifdef BRM_INFO
1213 
1214     if (fDebug)
1215     {
1216         TRACER_WRITELATER("rollbackColumnExtents");
1217         TRACER_ADDINPUT(oid);
1218         TRACER_ADDBOOLINPUT(bDeleteAll);
1219         TRACER_ADDSHORTINPUT(dbRoot);
1220         TRACER_ADDINPUT(partitionNum);
1221         TRACER_ADDSHORTINPUT(segmentNum);
1222         TRACER_ADDINPUT(hwm);
1223         TRACER_WRITE;
1224     }
1225 
1226 #endif
1227 
1228     ByteStream command, response;
1229     uint8_t err;
1230 
1231     command << ROLLBACK_COLUMN_EXTENTS_DBROOT << (ByteStream::quadbyte) oid <<
1232             (uint8_t)bDeleteAll << dbRoot << partitionNum <<
1233             segmentNum << hwm;
1234     err = send_recv(command, response);
1235 
1236     if (err != ERR_OK)
1237         return err;
1238 
1239     if (response.length() != 1)
1240         return ERR_NETWORK;
1241 
1242     response >> err;
1243     CHECK_EMPTY(response);
1244     return err;
1245 }
1246 
1247 //------------------------------------------------------------------------------
1248 // Send a request to delete a set of extents for the specified dictionary store
1249 // OID and DBRoot, and to return the extents to the free list.  HWMs for the
1250 // last stripe of extents are updated accordingly.
1251 //------------------------------------------------------------------------------
rollbackDictStoreExtents_DBroot(OID_t oid,uint16_t dbRoot,uint32_t partitionNum,const vector<uint16_t> & segNums,const vector<HWM_t> & hwms)1252 int DBRM::rollbackDictStoreExtents_DBroot(OID_t oid,
1253         uint16_t            dbRoot,
1254         uint32_t            partitionNum,
1255         const vector<uint16_t>& segNums,
1256         const vector<HWM_t>& hwms) DBRM_THROW
1257 {
1258 #ifdef BRM_INFO
1259 
1260     if (fDebug)
1261     {
1262         TRACER_WRITELATER("rollbackDictStoreExtents");
1263         TRACER_ADDINPUT(oid);
1264         TRACER_ADDSHORTINPUT(dbRoot);
1265         TRACER_ADDINPUT(partitionNum);
1266         TRACER_WRITE;
1267     }
1268 
1269 #endif
1270 
1271     ByteStream command, response;
1272     uint8_t err;
1273 
1274     command << ROLLBACK_DICT_STORE_EXTENTS_DBROOT <<
1275             (ByteStream::quadbyte) oid <<
1276             dbRoot << partitionNum;
1277     serializeVector(command, segNums);
1278     serializeVector(command, hwms);
1279     err = send_recv(command, response);
1280 
1281     if (err != ERR_OK)
1282         return err;
1283 
1284     if (response.length() != 1)
1285         return ERR_NETWORK;
1286 
1287     response >> err;
1288     CHECK_EMPTY(response);
1289     return err;
1290 }
1291 
deleteEmptyColExtents(const std::vector<ExtentInfo> & extentsInfo)1292 int DBRM::deleteEmptyColExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
1293 {
1294 #ifdef BRM_INFO
1295 
1296     if (fDebug)
1297     {
1298         TRACER_WRITELATER("deleteEmptyColExtents");
1299         TRACER_WRITE;
1300     }
1301 
1302 #endif
1303 
1304     ByteStream command, response;
1305     uint8_t err;
1306     uint32_t size = extentsInfo.size();
1307     command << DELETE_EMPTY_COL_EXTENTS;
1308     command << size;
1309 
1310     for ( unsigned i = 0; i < extentsInfo.size(); i++)
1311     {
1312         command << (ByteStream::quadbyte) extentsInfo[i].oid;
1313         command << extentsInfo[i].partitionNum;
1314         command << extentsInfo[i].segmentNum;
1315         command << extentsInfo[i].dbRoot;
1316         command << extentsInfo[i].hwm;
1317     }
1318 
1319     err = send_recv(command, response);
1320 
1321     if (err != ERR_OK)
1322         return err;
1323 
1324     if (response.length() != 1)
1325         return ERR_NETWORK;
1326 
1327     response >> err;
1328     CHECK_EMPTY(response);
1329     return err;
1330 }
1331 
deleteEmptyDictStoreExtents(const std::vector<ExtentInfo> & extentsInfo)1332 int DBRM::deleteEmptyDictStoreExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
1333 {
1334 #ifdef BRM_INFO
1335 
1336     if (fDebug)
1337     {
1338         TRACER_WRITELATER("deleteEmptyDictStoreExtents");
1339         TRACER_WRITE;
1340     }
1341 
1342 #endif
1343 
1344     ByteStream command, response;
1345     uint8_t err;
1346     uint32_t size = extentsInfo.size();
1347     command << DELETE_EMPTY_DICT_STORE_EXTENTS;
1348     command << size;
1349 
1350     for ( unsigned i = 0; i < extentsInfo.size(); i++)
1351     {
1352         command << (ByteStream::quadbyte) extentsInfo[i].oid;
1353         command << extentsInfo[i].partitionNum;
1354         command << extentsInfo[i].segmentNum;
1355         command << extentsInfo[i].dbRoot;
1356         command << extentsInfo[i].hwm;
1357         command << (uint8_t)extentsInfo[i].newFile;
1358     }
1359 
1360     err = send_recv(command, response);
1361 
1362     if (err != ERR_OK)
1363         return err;
1364 
1365     if (response.length() != 1)
1366         return ERR_NETWORK;
1367 
1368     response >> err;
1369     CHECK_EMPTY(response);
1370     return err;
1371 }
1372 
deleteOID(OID_t oid)1373 int DBRM::deleteOID(OID_t oid) DBRM_THROW
1374 {
1375 #ifdef BRM_INFO
1376 
1377     if (fDebug)
1378     {
1379         TRACER_WRITELATER("deleteOID");
1380         TRACER_ADDINPUT(oid);
1381         TRACER_WRITE;
1382     }
1383 
1384 #endif
1385 
1386     ByteStream command, response;
1387     uint8_t err;
1388 
1389     command << DELETE_OID << (ByteStream::quadbyte) oid;
1390     err = send_recv(command, response);
1391 
1392     if (err != ERR_OK)
1393         return err;
1394 
1395     if (response.length() != 1)
1396         return ERR_NETWORK;
1397 
1398     response >> err;
1399     CHECK_EMPTY(response);
1400 
1401     try
1402     {
1403         deleteAISequence(oid);
1404     }
1405     catch (...) { }   // an error here means a network problem, will be caught elsewhere
1406 
1407     return err;
1408 }
1409 
deleteOIDs(const std::vector<OID_t> & oids)1410 int DBRM::deleteOIDs(const std::vector<OID_t>& oids) DBRM_THROW
1411 {
1412 #ifdef BRM_INFO
1413 
1414     if (fDebug)
1415     {
1416         TRACER_WRITELATER("deleteOIDs");
1417         TRACER_WRITE;
1418     }
1419 
1420 #endif
1421 
1422     ByteStream command, response;
1423     uint8_t err;
1424     uint32_t size = oids.size();
1425     command << DELETE_OIDS;
1426     command << size;
1427 
1428     for ( unsigned i = 0; i < oids.size(); i++)
1429     {
1430         command << (ByteStream::quadbyte) oids[i];
1431     }
1432 
1433     err = send_recv(command, response);
1434 
1435     if (err != ERR_OK)
1436         return err;
1437 
1438     if (response.length() != 1)
1439         return ERR_NETWORK;
1440 
1441     response >> err;
1442     CHECK_EMPTY(response);
1443 
1444     try
1445     {
1446         for (uint32_t i = 0; i < oids.size(); i++)
1447             deleteAISequence(oids[i]);
1448     }
1449     catch (...) { }   // an error here means a network problem, will be caught elsewhere
1450 
1451     return err;
1452 }
1453 
1454 //------------------------------------------------------------------------------
1455 // Return the last local HWM for the specified OID and DBroot. The corresponding
1456 // partition number, and segment number are returned as well.  This function
1457 // can be used by cpimport for example to find out where the current "end-of-
1458 // data" is, so that cpimport will know where to begin adding new rows.
1459 // If no available or outOfService extent is found, then bFound is returned
1460 // as false.
1461 //------------------------------------------------------------------------------
getLastHWM_DBroot(int oid,uint16_t dbRoot,uint32_t & partitionNum,uint16_t & segmentNum,HWM_t & hwm,int & status,bool & bFound)1462 int DBRM::getLastHWM_DBroot(int oid, uint16_t dbRoot, uint32_t& partitionNum,
1463                             uint16_t& segmentNum, HWM_t& hwm,
1464                             int& status, bool& bFound) throw()
1465 {
1466 #ifdef BRM_INFO
1467 
1468     if (fDebug)
1469     {
1470         TRACER_WRITELATER("getLastHWM_DBroot");
1471         TRACER_ADDINPUT(oid);
1472         TRACER_ADDSHORTOUTPUT(dbRoot);
1473         TRACER_ADDOUTPUT(partitionNum);
1474         TRACER_ADDSHORTOUTPUT(segmentNum);
1475         TRACER_ADDOUTPUT(hwm);
1476         TRACER_ADDOUTPUT(status);
1477         TRACER_WRITE;
1478     }
1479 
1480 #endif
1481 
1482     try
1483     {
1484         hwm = em->getLastHWM_DBroot(oid, dbRoot, partitionNum, segmentNum,
1485                                     status, bFound);
1486     }
1487     catch (exception& e)
1488     {
1489         return ERR_FAILURE;
1490     }
1491 
1492     return ERR_OK;
1493 }
1494 
1495 //------------------------------------------------------------------------------
1496 // Return the HWM for the specified OID, partition number, and segment number.
1497 // This is used to get the HWM for a particular dictionary segment store file,
1498 // or a specific column segment file.
1499 //------------------------------------------------------------------------------
getLocalHWM(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,HWM_t & hwm,int & status)1500 int DBRM::getLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
1501                       HWM_t& hwm, int& status) throw()
1502 {
1503 #ifdef BRM_INFO
1504 
1505     if (fDebug)
1506     {
1507         TRACER_WRITELATER("getLocalHWM");
1508         TRACER_ADDINPUT(oid);
1509         TRACER_ADDINPUT(partitionNum);
1510         TRACER_ADDSHORTINPUT(segmentNum);
1511         TRACER_ADDOUTPUT(hwm);
1512         TRACER_ADDOUTPUT(status);
1513         TRACER_WRITE;
1514     }
1515 
1516 #endif
1517 
1518     try
1519     {
1520         hwm = em->getLocalHWM(oid, partitionNum, segmentNum, status);
1521     }
1522     catch (exception& e)
1523     {
1524         cerr << e.what() << endl;
1525         return ERR_FAILURE;
1526     }
1527 
1528     return ERR_OK;
1529 }
1530 
1531 //------------------------------------------------------------------------------
1532 // Set the local HWM for the file referenced by the specified OID, partition
1533 // number, and segment number.
1534 //------------------------------------------------------------------------------
setLocalHWM(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,HWM_t hwm)1535 int DBRM::setLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
1536                       HWM_t hwm) DBRM_THROW
1537 {
1538 #ifdef BRM_INFO
1539 
1540     if (fDebug)
1541     {
1542         TRACER_WRITELATER("setLocalHWM");
1543         TRACER_ADDINPUT(oid);
1544         TRACER_ADDINPUT(partitionNum);
1545         TRACER_ADDSHORTINPUT(segmentNum);
1546         TRACER_ADDINPUT(hwm);
1547         TRACER_WRITE;
1548     }
1549 
1550 #endif
1551 
1552     ByteStream command, response;
1553     uint8_t err;
1554 
1555     command << SET_LOCAL_HWM << (ByteStream::quadbyte) oid << partitionNum << segmentNum << hwm;
1556     err = send_recv(command, response);
1557 
1558     if (err != ERR_OK)
1559         return err;
1560 
1561     if (response.length() != 1)
1562         return ERR_NETWORK;
1563 
1564     response >> err;
1565     CHECK_EMPTY(response);
1566     return err;
1567 }
1568 
bulkSetHWM(const vector<BulkSetHWMArg> & v,VER_t transID)1569 int DBRM::bulkSetHWM(const vector<BulkSetHWMArg>& v, VER_t transID) DBRM_THROW
1570 {
1571 #ifdef BRM_INFO
1572 
1573     if (fDebug)
1574     {
1575         TRACER_WRITELATER("bulkSetHWM");
1576         TRACER_WRITE;
1577     }
1578 
1579 #endif
1580 
1581     ByteStream command, response;
1582     uint8_t err;
1583 
1584     command << BULK_SET_HWM;
1585     serializeInlineVector(command, v);
1586     command << (uint32_t) transID;
1587     err = send_recv(command, response);
1588 
1589     if (err != ERR_OK)
1590         return err;
1591 
1592     if (response.length() != 1)
1593         return ERR_NETWORK;
1594 
1595     response >> err;
1596     CHECK_EMPTY(response);
1597     return err;
1598 }
1599 
bulkSetHWMAndCP(const vector<BulkSetHWMArg> & v,const vector<CPInfo> & setCPDataArgs,const vector<CPInfoMerge> & mergeCPDataArgs,VER_t transID)1600 int DBRM::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& v, const vector<CPInfo>& setCPDataArgs,
1601                           const vector<CPInfoMerge>& mergeCPDataArgs, VER_t transID) DBRM_THROW
1602 {
1603 #ifdef BRM_INFO
1604 
1605     if (fDebug)
1606     {
1607         TRACER_WRITELATER("bulkSetHWMAndCP");
1608         TRACER_WRITE;
1609     }
1610 
1611 #endif
1612 
1613     ByteStream command, response;
1614     uint8_t err;
1615 
1616     command << BULK_SET_HWM_AND_CP;
1617     serializeInlineVector(command, v);
1618     serializeInlineVector(command, setCPDataArgs);
1619     serializeInlineVector(command, mergeCPDataArgs);
1620     command << (uint32_t) transID;
1621     err = send_recv(command, response);
1622 
1623     if (err != ERR_OK)
1624         return err;
1625 
1626     if (response.length() != 1)
1627         return ERR_NETWORK;
1628 
1629     response >> err;
1630     CHECK_EMPTY(response);
1631     return err;
1632 }
1633 
bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg> & args)1634 int DBRM::bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg>& args)
1635 {
1636 #ifdef BRM_INFO
1637 
1638     if (fDebug)
1639     {
1640         TRACER_WRITELATER("bulkUpdateDBRoot");
1641         TRACER_WRITE;
1642     }
1643 
1644 #endif
1645 
1646     ByteStream command, response;
1647     uint8_t err;
1648 
1649     command << BULK_UPDATE_DBROOT;
1650     serializeInlineVector(command, args);
1651     err = send_recv(command, response);
1652 
1653     if (err != ERR_OK)
1654         return err;
1655 
1656     if (response.length() != 1)
1657         return ERR_NETWORK;
1658 
1659     response >> err;
1660     CHECK_EMPTY(response);
1661     return err;
1662 }
1663 
1664 
1665 //------------------------------------------------------------------------------
1666 // For the specified OID and PM number, this function will return a vector
1667 // of objects carrying HWM info (for the last segment file) and block count
1668 // information about each DBRoot assigned to the specified PM.
1669 //------------------------------------------------------------------------------
getDbRootHWMInfo(OID_t oid,uint16_t pmNumber,EmDbRootHWMInfo_v & emDBRootHwmInfos)1670 int DBRM::getDbRootHWMInfo(OID_t oid, uint16_t pmNumber,
1671                            EmDbRootHWMInfo_v& emDBRootHwmInfos) throw()
1672 {
1673 #ifdef BRM_INFO
1674 
1675     if (fDebug)
1676     {
1677         TRACER_WRITELATER("getDbRootHWMInfo");
1678         TRACER_ADDINPUT(oid);
1679         TRACER_ADDSHORTINPUT(pmNumber);
1680         TRACER_WRITE;
1681     }
1682 
1683 #endif
1684 
1685     try
1686     {
1687         em->getDbRootHWMInfo(oid, pmNumber, emDBRootHwmInfos);
1688     }
1689     catch (exception& e)
1690     {
1691         cerr << e.what() << endl;
1692         return ERR_FAILURE;
1693     }
1694 
1695     return ERR_OK;
1696 }
1697 
1698 //------------------------------------------------------------------------------
1699 // Return the status or state of the extents in the segment file specified
1700 // by the arguments: oid, partitionNum, and segment Num.
1701 //------------------------------------------------------------------------------
getExtentState(OID_t oid,uint32_t partitionNum,uint16_t segmentNum,bool & bFound,int & status)1702 int DBRM::getExtentState(OID_t oid, uint32_t partitionNum,
1703                          uint16_t segmentNum, bool& bFound, int& status) throw()
1704 {
1705 #ifdef BRM_INFO
1706 
1707     if (fDebug)
1708     {
1709         TRACER_WRITELATER("getExtentState");
1710         TRACER_ADDINPUT(oid);
1711         TRACER_ADDINPUT(partitionNum);
1712         TRACER_ADDSHORTINPUT(segmentNum);
1713         TRACER_ADDOUTPUT(status);
1714         TRACER_WRITE;
1715     }
1716 
1717 #endif
1718 
1719     try
1720     {
1721         em->getExtentState(oid, partitionNum,
1722                            segmentNum, bFound, status);
1723     }
1724     catch (exception& e)
1725     {
1726         cerr << e.what() << endl;
1727         return ERR_FAILURE;
1728     }
1729 
1730     return ERR_OK;
1731 }
1732 
1733 // dmc-should eventually deprecate
getExtentSize()1734 int DBRM::getExtentSize() throw()
1735 {
1736 #ifdef BRM_INFO
1737 
1738     if (fDebug) TRACER_WRITENOW("getExtentSize");
1739 
1740 #endif
1741     return em->getExtentSize();
1742 }
1743 
getExtentRows()1744 unsigned DBRM::getExtentRows() throw()
1745 {
1746 #ifdef BRM_INFO
1747 
1748     if (fDebug) TRACER_WRITENOW("getExtentRows");
1749 
1750 #endif
1751     return em->getExtentRows();
1752 }
1753 
getExtents(int OID,std::vector<struct EMEntry> & entries,bool sorted,bool notFoundErr,bool incOutOfService)1754 int DBRM::getExtents(int OID, std::vector<struct EMEntry>& entries,
1755                      bool sorted, bool notFoundErr, bool incOutOfService)
1756 {
1757 #ifdef BRM_INFO
1758 
1759     if (fDebug)
1760     {
1761         TRACER_WRITELATER("getExtents");
1762         TRACER_ADDINPUT(OID);
1763         TRACER_WRITE;
1764     }
1765 
1766 #endif
1767 
1768     try
1769     {
1770         em->getExtents(OID, entries, sorted, notFoundErr, incOutOfService);
1771     }
1772     catch (exception& e)
1773     {
1774         cerr << e.what() << endl;
1775         return -1;
1776     }
1777 
1778     return 0;
1779 }
1780 
getExtents_dbroot(int OID,std::vector<struct EMEntry> & entries,const uint16_t dbroot)1781 int DBRM::getExtents_dbroot(int OID, std::vector<struct EMEntry>& entries,
1782                             const uint16_t dbroot) throw()
1783 {
1784 #ifdef BRM_INFO
1785 
1786     if (fDebug)
1787     {
1788         TRACER_WRITELATER("getExtents_dbroot");
1789         TRACER_ADDINPUT(OID);
1790         TRACER_WRITE;
1791     }
1792 
1793 #endif
1794 
1795     try
1796     {
1797         em->getExtents_dbroot(OID, entries, dbroot);
1798     }
1799     catch (exception& e)
1800     {
1801         cerr << e.what() << endl;
1802         return -1;
1803     }
1804 
1805     return 0;
1806 }
1807 
1808 //------------------------------------------------------------------------------
1809 // Return the number of extents for the specified OID and DBRoot.
1810 // Any out-of-service extents can optionally be included or excluded.
1811 //------------------------------------------------------------------------------
getExtentCount_dbroot(int OID,uint16_t dbroot,bool incOutOfService,uint64_t & numExtents)1812 int DBRM::getExtentCount_dbroot(int OID, uint16_t dbroot,
1813                                 bool incOutOfService, uint64_t& numExtents) throw()
1814 {
1815 #ifdef BRM_INFO
1816 
1817     if (fDebug)
1818     {
1819         TRACER_WRITELATER("getExtentCount_dbroot");
1820         TRACER_ADDINPUT(OID);
1821         TRACER_WRITE;
1822     }
1823 
1824 #endif
1825 
1826     try
1827     {
1828         em->getExtentCount_dbroot(OID, dbroot, incOutOfService, numExtents);
1829     }
1830     catch (exception& e)
1831     {
1832         cerr << e.what() << endl;
1833         return -1;
1834     }
1835 
1836     return 0;
1837 }
1838 
1839 //------------------------------------------------------------------------------
1840 // Gets the DBRoot for the specified system catalog OID.
1841 // Function assumes the specified System Catalog OID is fully contained on
1842 // a single DBRoot, as the function only searches for and returns the first
1843 // DBRoot entry that is found in the extent map.
1844 //------------------------------------------------------------------------------
getSysCatDBRoot(OID_t oid,uint16_t & dbRoot)1845 int DBRM::getSysCatDBRoot(OID_t oid, uint16_t& dbRoot) throw()
1846 {
1847 #ifdef BRM_INFO
1848 
1849     if (fDebug)
1850     {
1851         TRACER_WRITELATER("getSysCatDBRoot");
1852         TRACER_ADDINPUT(oid);
1853         TRACER_ADDSHORTOUTPUT(dbRoot);
1854         TRACER_WRITE;
1855     }
1856 
1857 #endif
1858 
1859     try
1860     {
1861         em->getSysCatDBRoot(oid, dbRoot);
1862     }
1863     catch (exception& e)
1864     {
1865         cerr << e.what() << endl;
1866         return -1;
1867     }
1868 
1869     return 0;
1870 }
1871 
1872 //------------------------------------------------------------------------------
1873 // Delete all extents for the specified OID(s) and partition number.
1874 //------------------------------------------------------------------------------
deletePartition(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)1875 int DBRM::deletePartition(const std::vector<OID_t>& oids,
1876                           const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
1877 {
1878 #ifdef BRM_INFO
1879 
1880     if (fDebug)
1881     {
1882         TRACER_WRITENOW("deletePartition");
1883         std::ostringstream oss;
1884 		oss << "partitionNum: ";
1885             std::set<LogicalPartition>::const_iterator partIt;
1886 
1887         for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
1888 			oss << (*partIt) << " ";
1889                 oss << "; OIDS: ";
1890 
1891         std::vector<OID_t>::const_iterator it;
1892 
1893         for (it = oids.begin(); it != oids.end(); ++it)
1894         {
1895             oss << (*it) << ", ";
1896         }
1897 
1898         TRACER_WRITEDIRECT(oss.str());
1899     }
1900 
1901 #endif
1902 
1903     ByteStream command, response;
1904     uint8_t err;
1905     command << DELETE_PARTITION;
1906     serializeSet<LogicalPartition>(command, partitionNums);
1907     uint32_t oidSize = oids.size();
1908     command << oidSize;
1909 
1910     for ( unsigned i = 0; i < oidSize; i++)
1911         command << (ByteStream::quadbyte) oids[i];
1912 
1913     err = send_recv(command, response);
1914 
1915     if (err != ERR_OK)
1916         return err;
1917 
1918     if (response.length() == 0)
1919         return ERR_NETWORK;
1920 
1921     response >> err;
1922 
1923     if (err != 0)
1924         response >> emsg;
1925 
1926     CHECK_EMPTY(response);
1927     return err;
1928 }
1929 
1930 //------------------------------------------------------------------------------
1931 // Mark all extents as out of service, for the specified OID(s) and partition
1932 // number.
1933 //------------------------------------------------------------------------------
markPartitionForDeletion(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)1934 int DBRM::markPartitionForDeletion(const std::vector<OID_t>& oids,
1935                                    const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
1936 {
1937 #ifdef BRM_INFO
1938 
1939     if (fDebug)
1940     {
1941         TRACER_WRITENOW("markPartitionForDeletion");
1942         std::ostringstream oss;
1943 		oss << "partitionNum: ";
1944             std::set<LogicalPartition>::const_iterator partIt;
1945 
1946         for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
1947 			oss << (*partIt) << " ";
1948                 oss << "; OIDS: ";
1949 
1950         std::vector<OID_t>::const_iterator it;
1951 
1952         for (it = oids.begin(); it != oids.end(); ++it)
1953         {
1954             oss << (*it) << ", ";
1955         }
1956 
1957         TRACER_WRITEDIRECT(oss.str());
1958     }
1959 
1960 #endif
1961 
1962     ByteStream command, response;
1963     uint8_t err;
1964     command << MARK_PARTITION_FOR_DELETION;
1965     serializeSet<LogicalPartition>(command, partitionNums);
1966     uint32_t oidSize = oids.size();
1967     command << oidSize;
1968 
1969     for ( unsigned i = 0; i < oidSize; i++)
1970         command << (ByteStream::quadbyte) oids[i];
1971 
1972     err = send_recv(command, response);
1973 
1974     if (err != ERR_OK)
1975         return err;
1976 
1977     if (response.length() == 0)
1978         return ERR_NETWORK;
1979 
1980     response >> err;
1981 
1982     if (err)
1983         response >> emsg;
1984 
1985     CHECK_EMPTY(response);
1986     return err;
1987 }
1988 
1989 //------------------------------------------------------------------------------
1990 // Mark all extents as out of service, for the specified OID(s)
1991 //------------------------------------------------------------------------------
markAllPartitionForDeletion(const std::vector<OID_t> & oids)1992 int DBRM::markAllPartitionForDeletion(const std::vector<OID_t>& oids) DBRM_THROW
1993 {
1994 #ifdef BRM_INFO
1995 
1996     if (fDebug)
1997     {
1998         TRACER_WRITENOW("markAllPartitionForDeletion");
1999         std::ostringstream oss;
2000         oss << "OIDS: ";
2001         std::vector<OID_t>::const_iterator it;
2002 
2003         for (it = oids.begin(); it != oids.end(); ++it)
2004         {
2005             oss << (*it) << ", ";
2006         }
2007 
2008         TRACER_WRITEDIRECT(oss.str());
2009     }
2010 
2011 #endif
2012 
2013     ByteStream command, response;
2014     uint8_t err;
2015     uint32_t size = oids.size();
2016 
2017     command << MARK_ALL_PARTITION_FOR_DELETION << size;
2018 
2019     for ( unsigned i = 0; i < size; i++)
2020     {
2021         command << (ByteStream::quadbyte) oids[i];
2022     }
2023 
2024     err = send_recv(command, response);
2025 
2026     if (err != ERR_OK)
2027         return err;
2028 
2029     if (response.length() != 1)
2030         return ERR_NETWORK;
2031 
2032     response >> err;
2033     CHECK_EMPTY(response);
2034     return err;
2035 }
2036 
2037 //------------------------------------------------------------------------------
2038 // Restore all extents for the specified OID(s) and partition number.
2039 //------------------------------------------------------------------------------
restorePartition(const std::vector<OID_t> & oids,const std::set<LogicalPartition> & partitionNums,string & emsg)2040 int DBRM::restorePartition(const std::vector<OID_t>& oids,
2041                            const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
2042 {
2043 #ifdef BRM_INFO
2044 
2045     if (fDebug)
2046     {
2047         TRACER_WRITENOW("restorePartition");
2048         std::ostringstream oss;
2049 		oss << "partitionNum: ";
2050             std::set<LogicalPartition>::const_iterator partIt;
2051 
2052         for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
2053 			oss << (*partIt) << " ";
2054                 oss << "; OIDS: ";
2055 
2056         std::vector<OID_t>::const_iterator it;
2057 
2058         for (it = oids.begin(); it != oids.end(); ++it)
2059         {
2060             oss << (*it) << ", ";
2061         }
2062 
2063         TRACER_WRITEDIRECT(oss.str());
2064     }
2065 
2066 #endif
2067 
2068     ByteStream command, response;
2069     uint8_t err;
2070 
2071     command << RESTORE_PARTITION;
2072     serializeSet<LogicalPartition>(command, partitionNums);
2073     uint32_t oidSize = oids.size();
2074     command << oidSize;
2075 
2076     for ( unsigned i = 0; i < oidSize; i++)
2077         command << (ByteStream::quadbyte) oids[i];
2078 
2079     err = send_recv(command, response);
2080 
2081     if (err != ERR_OK)
2082         return err;
2083 
2084     if (response.length() == 0)
2085         return ERR_NETWORK;
2086 
2087     response >> err;
2088 
2089     if (err)
2090         response >> emsg;
2091 
2092     CHECK_EMPTY(response);
2093     return err;
2094 }
2095 
2096 //------------------------------------------------------------------------------
2097 // Return all the out-of-service partitions for the specified OID.
2098 //------------------------------------------------------------------------------
getOutOfServicePartitions(OID_t oid,std::set<LogicalPartition> & partitionNums)2099 int DBRM::getOutOfServicePartitions(OID_t oid,
2100                                     std::set<LogicalPartition>& partitionNums) throw()
2101 {
2102 #ifdef BRM_INFO
2103 
2104     if (fDebug)
2105     {
2106         TRACER_WRITELATER("getOutOfServicePartitions");
2107         TRACER_ADDINPUT(oid);
2108         TRACER_WRITE;
2109     }
2110 
2111 #endif
2112 
2113     try
2114     {
2115         em->getOutOfServicePartitions(oid, partitionNums);
2116     }
2117     catch (exception& e)
2118     {
2119         cerr << e.what() << endl;
2120         return -1;
2121     }
2122 
2123     return ERR_OK;
2124 }
2125 
2126 //------------------------------------------------------------------------------
2127 // Delete all extents for the specified DBRoot
2128 //------------------------------------------------------------------------------
deleteDBRoot(uint16_t dbroot)2129 int DBRM::deleteDBRoot(uint16_t dbroot) DBRM_THROW
2130 {
2131 #ifdef BRM_INFO
2132 
2133     if (fDebug)
2134     {
2135         TRACER_WRITENOW("deleteDBRoot");
2136         std::ostringstream oss;
2137         oss << "DBRoot: " << dbroot;
2138         TRACER_WRITEDIRECT(oss.str());
2139     }
2140 
2141 #endif
2142 
2143     ByteStream command, response;
2144     uint8_t err;
2145     command << DELETE_DBROOT;
2146     uint32_t q = static_cast<uint32_t>(dbroot);
2147     command << q;
2148     err = send_recv(command, response);
2149 
2150     if (err != ERR_OK)
2151         return err;
2152 
2153     if (response.length() == 0)
2154         return ERR_NETWORK;
2155 
2156     response >> err;
2157     CHECK_EMPTY(response);
2158     return err;
2159 }
2160 
2161 //------------------------------------------------------------------------------
2162 // Does the specified DBRoot have any extents.
2163 // Returns an error if extentmap shared memory is not loaded.
2164 //------------------------------------------------------------------------------
isDBRootEmpty(uint16_t dbroot,bool & isEmpty,std::string & errMsg)2165 int DBRM::isDBRootEmpty(uint16_t dbroot,
2166                         bool& isEmpty, std::string& errMsg) throw()
2167 {
2168 #ifdef BRM_INFO
2169 
2170     if (fDebug)
2171     {
2172         TRACER_WRITELATER("isDBRootEmpty");
2173         TRACER_ADDINPUT(dbroot);
2174         TRACER_WRITE;
2175     }
2176 
2177 #endif
2178 
2179     errMsg.clear();
2180 
2181     try
2182     {
2183         isEmpty = em->isDBRootEmpty(dbroot);
2184     }
2185     catch (exception& e)
2186     {
2187         cerr << e.what() << endl;
2188         errMsg = e.what();
2189         return ERR_FAILURE;
2190     }
2191 
2192     return ERR_OK;
2193 }
2194 
writeVBEntry(VER_t transID,LBID_t lbid,OID_t vbOID,uint32_t vbFBO)2195 int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
2196                        uint32_t vbFBO) DBRM_THROW
2197 {
2198 
2199 #ifdef BRM_INFO
2200 
2201     if (fDebug)
2202     {
2203         TRACER_WRITELATER("writeVBEntry");
2204         TRACER_ADDINPUT(transID);
2205         TRACER_ADDINPUT(lbid);
2206         TRACER_ADDINPUT(vbOID);
2207         TRACER_ADDINPUT(vbFBO);
2208         TRACER_WRITE;
2209     }
2210 
2211 #endif
2212 
2213     ByteStream command, response;
2214     uint8_t err;
2215 
2216     command << WRITE_VB_ENTRY << (uint32_t) transID << (uint64_t) lbid << (uint32_t) vbOID << vbFBO;
2217     err = send_recv(command, response);
2218 
2219     if (err != ERR_OK)
2220         return err;
2221 
2222     if (response.length() != 1)
2223         return ERR_NETWORK;
2224 
2225     response >> err;
2226     CHECK_EMPTY(response);
2227     return err;
2228 }
2229 
bulkWriteVBEntry(VER_t transID,const std::vector<BRM::LBID_t> & lbids,OID_t vbOID,const std::vector<uint32_t> & vbFBOs)2230 int DBRM::bulkWriteVBEntry(VER_t transID,
2231                            const std::vector<BRM::LBID_t>& lbids,
2232                            OID_t vbOID,
2233                            const std::vector<uint32_t>& vbFBOs) DBRM_THROW
2234 {
2235 
2236 #ifdef BRM_INFO
2237 
2238     if (fDebug)
2239     {
2240         TRACER_WRITELATER("bulkWriteVBEntry");
2241         TRACER_WRITE;
2242     }
2243 
2244 #endif
2245 
2246     ByteStream command, response;
2247     uint8_t err;
2248 
2249     command << BULK_WRITE_VB_ENTRY << (uint32_t) transID;
2250     serializeInlineVector(command, lbids);
2251     command << (uint32_t) vbOID;
2252     serializeInlineVector(command, vbFBOs);
2253     err = send_recv(command, response);
2254 
2255     if (err != ERR_OK)
2256         return err;
2257 
2258     if (response.length() != 1)
2259         return ERR_NETWORK;
2260 
2261     response >> err;
2262     CHECK_EMPTY(response);
2263     return err;
2264 }
2265 
2266 struct _entry
2267 {
_entryBRM::_entry2268     _entry(LBID_t l) : lbid(l) { };
2269     LBID_t lbid;
operator <BRM::_entry2270     inline bool operator<(const _entry& e) const
2271     {
2272         return ((e.lbid >> 10) < (lbid >> 10));
2273     }
2274 };
2275 
getDBRootsForRollback(VER_t transID,vector<uint16_t> * dbroots)2276 int DBRM::getDBRootsForRollback(VER_t transID, vector<uint16_t>* dbroots) throw()
2277 {
2278 #ifdef BRM_INFO
2279 
2280     if (fDebug)
2281     {
2282         TRACER_WRITELATER("getDBRootsForRollback");
2283         TRACER_ADDINPUT(transID);
2284         TRACER_WRITE;
2285     }
2286 
2287 #endif
2288     bool locked[2] = {false, false};
2289     set<OID_t> vbOIDs;
2290     set<OID_t>::iterator vbIt;
2291     vector<LBID_t> lbidList;
2292     uint32_t i, size;
2293     uint32_t tmp32;
2294     OID_t vbOID;
2295     int err;
2296 
2297     set<_entry> lbidPruner;
2298     set<_entry>::iterator it;
2299 
2300     try
2301     {
2302         vbbm->lock(VBBM::READ);
2303         locked[0] = true;
2304         vss->lock(VSS::READ);
2305         locked[1] = true;
2306 
2307         vss->getUncommittedLBIDs(transID, lbidList);
2308 
2309         // prune the list; will leave at most 1 entry per 1024-lbid range
2310         for (i = 0, size = lbidList.size(); i < size; i++)
2311             lbidPruner.insert(_entry(lbidList[i]));
2312 
2313         // get the VB oids
2314         for (it = lbidPruner.begin(); it != lbidPruner.end(); ++it)
2315         {
2316             err = vbbm->lookup(it->lbid, transID, vbOID, tmp32);
2317 
2318             if (err)   // this error will be caught by DML; more appropriate to handle it there
2319                 continue;
2320 
2321             vbOIDs.insert(vbOID);
2322         }
2323 
2324         // get the dbroots
2325         for (vbIt = vbOIDs.begin(); vbIt != vbOIDs.end(); ++vbIt)
2326         {
2327             err = getDBRootOfVBOID(*vbIt);
2328 
2329             if (err)
2330             {
2331                 ostringstream os;
2332                 os << "DBRM::getDBRootOfVBOID() returned an error looking for vbOID " << *vbIt;
2333                 log(os.str());
2334                 return ERR_FAILURE;
2335             }
2336 
2337             dbroots->push_back((uint16_t) err);
2338         }
2339 
2340         vss->release(VSS::READ);
2341         locked[1] = false;
2342         vbbm->release(VBBM::READ);
2343         locked[0] = false;
2344 
2345         return ERR_OK;
2346     }
2347     catch (exception& e)
2348     {
2349         if (locked[0])
2350             vbbm->release(VBBM::READ);
2351 
2352         if (locked[1])
2353             vss->release(VSS::READ);
2354 
2355         return -1;
2356     }
2357 
2358 }
2359 
getUncommittedLBIDs(VER_t transID,vector<LBID_t> & lbidList)2360 int DBRM::getUncommittedLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
2361 {
2362 #ifdef BRM_INFO
2363 
2364     if (fDebug)
2365     {
2366         TRACER_WRITELATER("getUncommittedLBIDs");
2367         TRACER_ADDINPUT(transID);
2368         TRACER_WRITE;
2369     }
2370 
2371 #endif
2372     bool locked = false;
2373 
2374     try
2375     {
2376         vss->lock(VSS::READ);
2377         locked = true;
2378 
2379         vss->getUncommittedLBIDs(transID, lbidList);
2380 
2381         vss->release(VSS::READ);
2382         locked = false;
2383         return 0;
2384     }
2385     catch (exception& e)
2386     {
2387         if (locked)
2388             vss->release(VSS::READ);
2389 
2390         return -1;
2391     }
2392 }
2393 
2394 // @bug 1509.  New function that returns one LBID per extent touched as part of the transaction.  Used to get a list
2395 // of blocks to use for updating casual partitioning when the transaction is committed.
getUncommittedExtentLBIDs(VER_t transID,vector<LBID_t> & lbidList)2396 int DBRM::getUncommittedExtentLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
2397 {
2398 #ifdef BRM_INFO
2399 
2400     if (fDebug)
2401     {
2402         TRACER_WRITELATER("getUncommittedExtentLBIDs");
2403         TRACER_ADDINPUT(transID);
2404         TRACER_WRITE;
2405     }
2406 
2407 #endif
2408     bool locked = false;
2409     vector<LBID_t>::iterator lbidIt;
2410     typedef pair<int64_t, int64_t> range_t;
2411     range_t range;
2412     vector<range_t> ranges;
2413     vector<range_t>::iterator rangeIt;
2414 
2415     try
2416     {
2417         vss->lock(VSS::READ);
2418         locked = true;
2419 
2420         // Get a full list of uncommitted LBIDs related to this transactin.
2421         vss->getUncommittedLBIDs(transID, lbidList);
2422 
2423         vss->release(VSS::READ);
2424         locked = false;
2425 
2426         if (lbidList.size() > 0)
2427         {
2428 
2429             // Sort the vector.
2430             std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
2431 
2432             // Get the LBID range for the first block in the list.
2433             lbidIt = lbidList.begin();
2434 
2435             if (em->lookup(*lbidIt, range.first, range.second) < 0)
2436             {
2437                 return -1;
2438             }
2439 
2440             ranges.push_back(range);
2441 
2442             // Loop through the LBIDs and add the new ranges.
2443             ++lbidIt;
2444 
2445             while (lbidIt != lbidList.end())
2446             {
2447                 if (*lbidIt > range.second)
2448                 {
2449                     if (em->lookup(*lbidIt, range.first, range.second) < 0)
2450                     {
2451                         return -1;
2452                     }
2453 
2454                     ranges.push_back(range);
2455                 }
2456 
2457                 ++lbidIt;
2458             }
2459 
2460             // Reset the lbidList and return only the first LBID in each extent that was changed
2461             // in the transaction.
2462             lbidList.clear();
2463 
2464             for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++)
2465             {
2466                 lbidList.push_back(rangeIt->first);
2467             }
2468         }
2469 
2470         return 0;
2471     }
2472     catch (exception& e)
2473     {
2474         if (locked)
2475             vss->release(VSS::READ);
2476 
2477         return -1;
2478     }
2479 }
2480 
2481 
beginVBCopy(VER_t transID,uint16_t dbRoot,const LBIDRange_v & ranges,VBRange_v & freeList)2482 int DBRM::beginVBCopy(VER_t transID, uint16_t dbRoot, const LBIDRange_v& ranges,
2483                       VBRange_v& freeList) DBRM_THROW
2484 {
2485 #ifdef BRM_INFO
2486 
2487     if (fDebug)
2488     {
2489         TRACER_WRITELATER("beginVBCopy");
2490         TRACER_ADDINPUT(transID);
2491         TRACER_WRITE;
2492     }
2493 
2494 #endif
2495 
2496     ByteStream command, response;
2497     uint8_t err;
2498 
2499     command << BEGIN_VB_COPY << (ByteStream::quadbyte) transID << dbRoot;
2500     serializeVector<LBIDRange>(command, ranges);
2501     err = send_recv(command, response);
2502 
2503     if (err != ERR_OK)
2504         return err;
2505 
2506     if (response.length() == 0)
2507         return ERR_NETWORK;
2508 
2509     try
2510     {
2511         response >> err;
2512 
2513         if (err != 0)
2514             return err;
2515 
2516         deserializeVector(response, freeList);
2517     }
2518     catch (exception& e)
2519     {
2520         cerr << e.what() << endl;
2521         return ERR_NETWORK;
2522     }
2523 
2524     CHECK_EMPTY(response);
2525     return 0;
2526 }
2527 
endVBCopy(VER_t transID,const LBIDRange_v & ranges)2528 int DBRM::endVBCopy(VER_t transID, const LBIDRange_v& ranges)
2529 DBRM_THROW
2530 {
2531 #ifdef BRM_INFO
2532 
2533     if (fDebug)
2534     {
2535         TRACER_WRITELATER("endVBCopy");
2536         TRACER_ADDINPUT(transID);
2537         TRACER_WRITE;
2538     }
2539 
2540 #endif
2541 
2542     ByteStream command, response;
2543     uint8_t err;
2544 
2545     command << END_VB_COPY << (ByteStream::quadbyte) transID;
2546     serializeVector(command, ranges);
2547     err = send_recv(command, response);
2548 
2549     if (response.length() != 1)
2550         return ERR_NETWORK;
2551 
2552     response >> err;
2553     CHECK_EMPTY(response);
2554     return err;
2555 }
2556 
vbCommit(VER_t transID)2557 int DBRM::vbCommit(VER_t transID) DBRM_THROW
2558 {
2559 #ifdef BRM_INFO
2560 
2561     if (fDebug)
2562     {
2563         TRACER_WRITELATER("vbCommit");
2564         TRACER_ADDINPUT(transID);
2565         TRACER_WRITE;
2566     }
2567 
2568 #endif
2569 
2570     ByteStream command, response;
2571     uint8_t err;
2572 
2573     command << VB_COMMIT << (ByteStream::quadbyte) transID;
2574     err = send_recv(command, response);
2575 
2576     if (err != ERR_OK)
2577         return err;
2578 
2579     if (response.length() != 1)
2580         return ERR_NETWORK;
2581 
2582     response >> err;
2583     CHECK_EMPTY(response);
2584     return err;
2585 }
2586 
vbRollback(VER_t transID,const LBIDRange_v & lbidList)2587 int DBRM::vbRollback(VER_t transID, const LBIDRange_v& lbidList) DBRM_THROW
2588 {
2589 
2590 #ifdef BRM_INFO
2591 
2592     if (fDebug)
2593     {
2594         TRACER_WRITELATER("vbRollback ");
2595         TRACER_ADDINPUT(transID);
2596         TRACER_WRITE;
2597     }
2598 
2599 #endif
2600 
2601     ByteStream command, response;
2602     uint8_t err;
2603 
2604     command << VB_ROLLBACK1 << (ByteStream::quadbyte) transID;
2605     serializeVector(command, lbidList);
2606     err = send_recv(command, response);
2607 
2608     if (err != ERR_OK)
2609         return err;
2610 
2611     if (response.length() != 1)
2612         return ERR_NETWORK;
2613 
2614     response >> err;
2615     CHECK_EMPTY(response);
2616     return err;
2617 }
2618 
vbRollback(VER_t transID,const vector<LBID_t> & lbidList)2619 int DBRM::vbRollback(VER_t transID, const vector<LBID_t>& lbidList) DBRM_THROW
2620 {
2621 #ifdef BRM_INFO
2622 
2623     if (fDebug)
2624     {
2625         TRACER_WRITELATER("vbRollback");
2626         TRACER_ADDINPUT(transID);
2627         TRACER_WRITE;
2628     }
2629 
2630 #endif
2631 
2632     ByteStream command, response;
2633     uint8_t err;
2634 
2635     command << VB_ROLLBACK2 << (ByteStream::quadbyte) transID;
2636     serializeVector(command, lbidList);
2637     err = send_recv(command, response);
2638 
2639     if (err != ERR_OK)
2640         return err;
2641 
2642     if (response.length() != 1)
2643         return ERR_NETWORK;
2644 
2645     response >> err;
2646     CHECK_EMPTY(response);
2647     return err;
2648 }
2649 
halt()2650 int DBRM::halt() DBRM_THROW
2651 {
2652 #ifdef BRM_INFO
2653 
2654     if (fDebug) TRACER_WRITENOW("halt");
2655 
2656 #endif
2657     ByteStream command, response;
2658     uint8_t err;
2659 
2660     command << HALT;
2661     err = send_recv(command, response);
2662 
2663     if (err != ERR_OK)
2664         return err;
2665 
2666     if (response.length() != 1)
2667         return ERR_NETWORK;
2668 
2669     response >> err;
2670     CHECK_EMPTY(response);
2671     return err;
2672 }
2673 
resume()2674 int DBRM::resume() DBRM_THROW
2675 {
2676 #ifdef BRM_INFO
2677 
2678     if (fDebug) TRACER_WRITENOW("resume");
2679 
2680 #endif
2681     ByteStream command, response;
2682     uint8_t err;
2683 
2684     command << RESUME;
2685     err = send_recv(command, response);
2686 
2687     if (err != ERR_OK)
2688         return err;
2689 
2690     if (response.length() != 1)
2691         return ERR_NETWORK;
2692 
2693     response >> err;
2694     CHECK_EMPTY(response);
2695     return err;
2696 }
2697 
forceReload()2698 int DBRM::forceReload() DBRM_THROW
2699 {
2700 #ifdef BRM_INFO
2701 
2702     if (fDebug) TRACER_WRITENOW("forceReload");
2703 
2704 #endif
2705     ByteStream command, response;
2706     uint8_t err;
2707 
2708     command << RELOAD;
2709     err = send_recv(command, response);
2710 
2711     if (err != ERR_OK)
2712         return err;
2713 
2714     if (response.length() != 1)
2715         return ERR_NETWORK;
2716 
2717     response >> err;
2718     CHECK_EMPTY(response);
2719     return err;
2720 }
2721 
setReadOnly(bool b)2722 int DBRM::setReadOnly(bool b) DBRM_THROW
2723 {
2724 #ifdef BRM_INFO
2725 
2726     if (fDebug)
2727     {
2728         TRACER_WRITELATER("setReadOnly");
2729         TRACER_ADDBOOLINPUT(b);
2730         TRACER_WRITE;
2731     }
2732 
2733 #endif
2734 
2735     ByteStream command, response;
2736     uint8_t err;
2737 
2738     command << (b ? SETREADONLY : SETREADWRITE);
2739     err = send_recv(command, response);
2740 
2741     if (err != ERR_OK)
2742         return err;
2743 
2744     if (response.length() != 1)
2745         return ERR_NETWORK;
2746 
2747     response >> err;
2748     CHECK_EMPTY(response);
2749     return err;
2750 }
2751 
isReadWrite()2752 int DBRM::isReadWrite() throw()
2753 {
2754 #ifdef BRM_INFO
2755 
2756     if (fDebug)  TRACER_WRITENOW("isReadWrite");
2757 
2758 #endif
2759     ByteStream command, response;
2760     uint8_t err;
2761 
2762     command << GETREADONLY;
2763     err = send_recv(command, response);
2764 
2765     if (err != ERR_OK)
2766         return err;
2767 
2768     if (response.length() != 1)
2769         return ERR_NETWORK;
2770 
2771     response >> err;
2772     //CHECK_EMPTY(response);
2773     return (err == 0 ? ERR_OK : ERR_READONLY);
2774 }
2775 
dmlLockLBIDRanges(const vector<LBIDRange> & ranges,int txnID)2776 int DBRM::dmlLockLBIDRanges(const vector<LBIDRange>& ranges, int txnID)
2777 {
2778 #ifdef BRM_INFO
2779 
2780     if (fDebug) TRACER_WRITENOW("clear");
2781 
2782 #endif
2783     ByteStream command, response;
2784     uint8_t err;
2785 
2786     command << LOCK_LBID_RANGES;
2787     serializeVector<LBIDRange>(command, ranges);
2788     command << (uint32_t) txnID;
2789     err = send_recv(command, response);
2790 
2791     if (err != ERR_OK)
2792         return err;
2793 
2794     if (response.length() != 1)
2795         return ERR_NETWORK;
2796 
2797     response >> err;
2798     CHECK_EMPTY(response);
2799     return err;
2800 }
2801 
dmlReleaseLBIDRanges(const vector<LBIDRange> & ranges)2802 int DBRM::dmlReleaseLBIDRanges(const vector<LBIDRange>& ranges)
2803 {
2804 #ifdef BRM_INFO
2805 
2806     if (fDebug) TRACER_WRITENOW("clear");
2807 
2808 #endif
2809     ByteStream command, response;
2810     uint8_t err;
2811 
2812     command << RELEASE_LBID_RANGES;
2813     serializeVector<LBIDRange>(command, ranges);
2814     err = send_recv(command, response);
2815 
2816     if (err != ERR_OK)
2817         return err;
2818 
2819     if (response.length() != 1)
2820         return ERR_NETWORK;
2821 
2822     response >> err;
2823     CHECK_EMPTY(response);
2824     return err;
2825 }
2826 
clear()2827 int DBRM::clear() DBRM_THROW
2828 {
2829     ByteStream command, response;
2830     uint8_t err;
2831 
2832     command << BRM_CLEAR;
2833     err = send_recv(command, response);
2834 
2835     if (err != ERR_OK)
2836         return err;
2837 
2838     if (response.length() != 1)
2839         return ERR_NETWORK;
2840 
2841     response >> err;
2842     CHECK_EMPTY(response);
2843     return err;
2844 }
2845 
checkConsistency()2846 int DBRM::checkConsistency() throw()
2847 {
2848 #ifdef BRM_INFO
2849 
2850     if (fDebug) TRACER_WRITENOW("checkConsistency");
2851 
2852 #endif
2853     bool locked[2] = {false, false};
2854 
2855     try
2856     {
2857         em->checkConsistency();
2858     }
2859     catch (exception& e)
2860     {
2861         cerr << e.what() << endl;
2862         return -1;
2863     }
2864 
2865     try
2866     {
2867         vbbm->lock(VBBM::READ);
2868         locked[0] = true;
2869         vss->lock(VSS::READ);
2870         locked[1] = true;
2871         vss->checkConsistency(*vbbm, *em);
2872         vss->release(VSS::READ);
2873         locked[1] = false;
2874         vbbm->release(VBBM::READ);
2875         locked[0] = false;
2876     }
2877     catch (exception& e)
2878     {
2879         cerr << e.what() << endl;
2880 
2881         if (locked[1])
2882             vss->release(VSS::READ);
2883 
2884         if (locked[0])
2885             vbbm->release(VBBM::READ);
2886 
2887         return -1;
2888     }
2889 
2890     try
2891     {
2892         vbbm->lock(VBBM::READ);
2893         vbbm->checkConsistency();
2894         vbbm->release(VBBM::READ);
2895     }
2896     catch (exception& e)
2897     {
2898         cerr << e.what() << endl;
2899         vbbm->release(VBBM::READ);
2900         return -1;
2901     }
2902 
2903     return 0;
2904 }
2905 
getCurrentTxnIDs(set<VER_t> & txnList)2906 int DBRM::getCurrentTxnIDs(set<VER_t>& txnList) throw()
2907 {
2908 #ifdef BRM_INFO
2909 
2910     if (fDebug) TRACER_WRITENOW("getCurrentTxnIDs");
2911 
2912 #endif
2913     bool locked[2] = { false, false };
2914 
2915     try
2916     {
2917         txnList.clear();
2918         vss->lock(VSS::READ);
2919         locked[0] = true;
2920         copylocks->lock(CopyLocks::READ);
2921         locked[1] = true;
2922         copylocks->getCurrentTxnIDs(txnList);
2923         vss->getCurrentTxnIDs(txnList);
2924         copylocks->release(CopyLocks::READ);
2925         locked[1] = false;
2926         vss->release(VSS::READ);
2927         locked[0] = false;
2928     }
2929     catch (exception& e)
2930     {
2931         if (locked[1])
2932             copylocks->release(CopyLocks::READ);
2933 
2934         if (locked[0])
2935             vss->release(VSS::READ);
2936 
2937         cerr << e.what() << endl;
2938         return -1;
2939     }
2940 
2941     return 0;
2942 }
2943 
verID()2944 const QueryContext DBRM::verID()
2945 {
2946 #ifdef BRM_INFO
2947 
2948     if (fDebug) TRACER_WRITENOW("verID");
2949 
2950 #endif
2951     ByteStream command, response;
2952     uint8_t err;
2953     QueryContext ret;
2954 
2955     command << VER_ID;
2956     err = send_recv(command, response);
2957 
2958     if (err != ERR_OK)
2959     {
2960         cerr << "DBRM: SessionManager::verID(): network error" << endl;
2961         ret.currentScn = -1;
2962         return ret;
2963     }
2964 
2965     try
2966     {
2967         response >> err;
2968         response >> ret;
2969         CHECK_EMPTY(response);
2970     }
2971     catch (exception& e)
2972     {
2973         cerr << "DBRM: SessionManager::verID(): bad response" << endl;
2974         log("DBRM: SessionManager::verID(): bad response", logging::LOG_TYPE_WARNING);
2975         ret.currentScn = -1;
2976     }
2977 
2978     return ret;
2979 }
2980 
sysCatVerID()2981 const QueryContext DBRM::sysCatVerID()
2982 {
2983 #ifdef BRM_INFO
2984 
2985     if (fDebug) TRACER_WRITENOW("sysCatVerID");
2986 
2987 #endif
2988     ByteStream command, response;
2989     uint8_t err;
2990     QueryContext ret;
2991 
2992     command << SYSCAT_VER_ID;
2993     err = send_recv(command, response);
2994 
2995     if (err != ERR_OK)
2996     {
2997         cerr << "DBRM: SessionManager::sysCatVerID(): network error" << endl;
2998         ret.currentScn = -1;
2999         return ret;
3000     }
3001 
3002     try
3003     {
3004         response >> err;
3005         response >> ret;
3006         CHECK_EMPTY(response);
3007     }
3008     catch (exception& e)
3009     {
3010         cerr << "DBRM: SessionManager::sysCatVerID(): bad response" << endl;
3011         log("DBRM: SessionManager::sysCatVerID(): bad response", logging::LOG_TYPE_WARNING);
3012         ret.currentScn = -1;
3013     }
3014 
3015     return ret;
3016 }
3017 
3018 
newTxnID(const SessionManagerServer::SID session,bool block,bool isDDL)3019 const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block,
3020                            bool isDDL)
3021 {
3022 #ifdef BRM_INFO
3023 
3024     if (fDebug)
3025     {
3026         TRACER_WRITELATER("newTxnID");
3027         TRACER_ADDINPUT(session);
3028         TRACER_ADDBOOLINPUT(block);
3029         TRACER_WRITE;
3030     }
3031 
3032 #endif
3033 
3034     ByteStream command, response;
3035     uint8_t err, tmp;
3036     uint32_t tmp32;
3037     TxnID ret;
3038 
3039     command << NEW_TXN_ID << session << (uint8_t) block << (uint8_t)isDDL;
3040     err = send_recv(command, response);
3041 
3042     if (err != ERR_OK)
3043     {
3044         log("DBRM: SessionManager::newTxnID(): network error");
3045         ret.valid = false;
3046         return ret;
3047     }
3048 
3049     if (response.length() != 6)
3050     {
3051         log("DBRM: SessionManager::newTxnID(): bad response");
3052         ret.valid = false;
3053         return ret;
3054     }
3055 
3056     response >> err;
3057     response >> tmp32;
3058     ret.id = tmp32;
3059     response >> tmp;
3060     ret.valid = (tmp == 0 ? false : true);
3061     CHECK_EMPTY(response);
3062     return ret;
3063 }
3064 
committed(TxnID & txnid)3065 void DBRM::committed(TxnID& txnid)
3066 {
3067 #ifdef BRM_INFO
3068 
3069     if (fDebug)
3070     {
3071         TRACER_WRITELATER("committed");
3072         TRACER_ADDINPUT(txnid);
3073         TRACER_WRITE;
3074     }
3075 
3076 #endif
3077 
3078     ByteStream command, response;
3079     uint8_t err;
3080 
3081     command << COMMITTED << (uint32_t) txnid.id << (uint8_t) txnid.valid;
3082     err = send_recv(command, response);
3083     txnid.valid = false;
3084 
3085     if (err != ERR_OK)
3086         log("DBRM: error: SessionManager::committed() failed");
3087     else if (response.length() != 1)
3088         log("DBRM: error: SessionManager::committed() failed (bad response)",
3089             logging::LOG_TYPE_ERROR);
3090 
3091     response >> err;
3092 
3093     if (err != ERR_OK)
3094         log("DBRM: error: SessionManager::committed() failed (valid error code)",
3095             logging::LOG_TYPE_ERROR);
3096 
3097 }
3098 
3099 
rolledback(TxnID & txnid)3100 void DBRM::rolledback(TxnID& txnid)
3101 {
3102 #ifdef BRM_INFO
3103 
3104     if (fDebug)
3105     {
3106         TRACER_WRITELATER("rolledback");
3107         TRACER_ADDINPUT(txnid);
3108         TRACER_WRITE;
3109     }
3110 
3111 #endif
3112 
3113     ByteStream command, response;
3114     uint8_t err, tmp;
3115 
3116     command << ROLLED_BACK << (uint32_t) txnid.id << (uint8_t) txnid.valid;
3117     err = send_recv(command, response);
3118     txnid.valid = false;
3119 
3120     if (err != ERR_OK)
3121         log("DBRM: error: SessionManager::rolledback() failed (network)");
3122     else if (response.length() != 1)
3123         log("DBRM: error: SessionManager::rolledback() failed (bad response)",
3124             logging::LOG_TYPE_ERROR);
3125 
3126     response >> tmp;
3127 
3128     if (tmp != ERR_OK)
3129     {
3130         if (getSystemReady() != 0 )
3131             log("DBRM: error: SessionManager::rolledback() failed (valid error code)",
3132                 logging::LOG_TYPE_ERROR);
3133     }
3134 }
3135 
getUnlockedLBIDs(BlockList_t * list)3136 int DBRM::getUnlockedLBIDs(BlockList_t* list) DBRM_THROW
3137 {
3138     bool locked = false;
3139 
3140     list->clear();
3141 
3142     try
3143     {
3144         vss->lock(VSS::READ);
3145         locked = true;
3146         vss->getUnlockedLBIDs(*list);
3147         vss->release(VSS::READ);
3148         locked = false;
3149         return 0;
3150     }
3151     catch (exception& e)
3152     {
3153         if (locked)
3154             vss->release(VSS::READ);
3155 
3156         cerr << e.what() << endl;
3157         return -1;
3158     }
3159 }
3160 
3161 
getTxnID(const SessionManagerServer::SID session)3162 const TxnID DBRM::getTxnID
3163 (const SessionManagerServer::SID session)
3164 {
3165 #ifdef BRM_INFO
3166 
3167     if (fDebug)
3168     {
3169         TRACER_WRITELATER("getTxnID");
3170         TRACER_ADDINPUT(session);
3171         TRACER_WRITE;
3172     }
3173 
3174 #endif
3175 
3176     ByteStream command, response;
3177     uint8_t err, tmp8;
3178     uint32_t tmp32;
3179     TxnID ret;
3180 
3181     command << GET_TXN_ID << (uint32_t) session;
3182     err = send_recv(command, response);
3183 
3184     if (err != ERR_OK)
3185     {
3186         log("DBRM: error: SessionManager::getTxnID() failed (network)", logging::LOG_TYPE_ERROR);
3187         ret.valid = false;
3188         return ret;
3189     }
3190 
3191     response >> err;
3192 
3193     if (err != ERR_OK)
3194     {
3195         log("DBRM: error: SessionManager::getTxnID() failed (got an error)",
3196             logging::LOG_TYPE_ERROR);
3197         ret.valid = false;
3198         return ret;
3199     }
3200 
3201     response >> tmp32 >> tmp8;
3202     ret.id = tmp32;
3203     ret.valid = tmp8;
3204     return ret;
3205 }
3206 
SIDTIDMap(int & len)3207 boost::shared_array<SIDTIDEntry> DBRM::SIDTIDMap(int& len)
3208 {
3209 #ifdef BRM_INFO
3210 
3211     if (fDebug)
3212     {
3213         TRACER_WRITELATER("SIDTIDMap");
3214         TRACER_ADDOUTPUT(len);
3215         TRACER_WRITE;
3216     }
3217 
3218 #endif
3219 
3220     ByteStream command, response;
3221     uint8_t err, tmp8;
3222     uint32_t tmp32;
3223     int i;
3224     boost::shared_array<SIDTIDEntry> ret;
3225 
3226     command << SID_TID_MAP;
3227     err = send_recv(command, response);
3228 
3229     if (err != ERR_OK)
3230     {
3231         log("DBRM: error: SessionManager::SIDTIDEntry() failed (network)");
3232         return ret;
3233     }
3234 
3235     response >> err;
3236 
3237     if (err != ERR_OK)
3238     {
3239         log("DBRM: error: SessionManager::SIDTIDEntry() failed (valid error code)",
3240             logging::LOG_TYPE_ERROR);
3241         return ret;
3242     }
3243 
3244     response >> tmp32;
3245     len = (int) tmp32;
3246     ret.reset(new SIDTIDEntry[len]);
3247 
3248     for (i = 0; i < len; i++)
3249     {
3250         response >> tmp32 >> tmp8;
3251         ret[i].txnid.id = tmp32;
3252         ret[i].txnid.valid = (tmp8 == 0 ? false : true);
3253         response >> tmp32;
3254         ret[i].sessionid = tmp32;
3255     }
3256 
3257     CHECK_EMPTY(response);
3258     return ret;
3259 }
3260 
getUnique32()3261 uint32_t DBRM::getUnique32()
3262 {
3263 #ifdef BRM_INFO
3264 
3265     if (fDebug) TRACER_WRITENOW("getUnique32");
3266 
3267 #endif
3268 
3269     ByteStream command, response;
3270     uint8_t err;
3271     uint32_t ret;
3272 
3273     command << GET_UNIQUE_UINT32;
3274     err = send_recv(command, response);
3275 
3276     if (err != ERR_OK)
3277     {
3278         cerr << "DBRM: getUnique32() failed (network)\n";
3279         log("DBRM: getUnique32() failed (network)", logging::LOG_TYPE_ERROR);
3280         throw runtime_error("DBRM: getUnique32() failed check the controllernode");
3281         return 0;
3282     }
3283 
3284     /* Some jobsteps don't need the connection after this so close it to free up
3285     resources on the controller node */
3286     /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
3287     remove the client. Plus, it may cause weird network issue when the socket is being
3288     released and re-established very quickly*/
3289     //pthread_mutex_lock(&mutex);
3290     //delete msgClient;
3291     //msgClient = NULL;
3292     //pthread_mutex_unlock(&mutex);
3293 
3294     response >> err;
3295 
3296     if (err != ERR_OK)
3297     {
3298         cerr << "DBRM: getUnique32() failed (got an error)\n";
3299         log("DBRM: getUnique32() failed (got an error)",
3300             logging::LOG_TYPE_ERROR);
3301         throw runtime_error("DBRM: getUnique32() failed check the controllernode");
3302         return 0;
3303     }
3304 
3305     response >> ret;
3306 // 	cerr << "DBRM returning " << ret << endl;
3307     return ret;
3308 }
3309 
getUnique64()3310 uint64_t DBRM::getUnique64()
3311 {
3312 #ifdef BRM_INFO
3313 
3314     if (fDebug) TRACER_WRITENOW("getUnique64");
3315 
3316 #endif
3317 
3318     ByteStream command, response;
3319     uint8_t err;
3320     uint64_t ret;
3321 
3322     command << GET_UNIQUE_UINT64;
3323     err = send_recv(command, response);
3324 
3325     if (err != ERR_OK)
3326     {
3327         cerr << "DBRM: getUnique64() failed (network)\n";
3328         log("DBRM: getUnique64() failed (network)", logging::LOG_TYPE_ERROR);
3329         throw runtime_error("DBRM: getUnique64() failed check the controllernode");
3330         return 0;
3331     }
3332 
3333     /* Some jobsteps don't need the connection after this so close it to free up
3334     resources on the controller node */
3335     /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
3336     remove the client. Plus, it may cause weird network issue when the socket is being
3337     released and re-established very quickly*/
3338     //pthread_mutex_lock(&mutex);
3339     //delete msgClient;
3340     //msgClient = NULL;
3341     //pthread_mutex_unlock(&mutex);
3342 
3343     response >> err;
3344 
3345     if (err != ERR_OK)
3346     {
3347         cerr << "DBRM: getUnique64() failed (got an error)\n";
3348         log("DBRM: getUnique64() failed (got an error)",
3349             logging::LOG_TYPE_ERROR);
3350         throw runtime_error("DBRM: getUnique64() failed check the controllernode");
3351         return 0;
3352     }
3353 
3354     response >> ret;
3355 // 	cerr << "DBRM returning " << ret << endl;
3356     return ret;
3357 }
3358 
sessionmanager_reset()3359 void DBRM::sessionmanager_reset()
3360 {
3361     ByteStream command, response;
3362     command << SM_RESET;
3363     send_recv(command, response);
3364 }
3365 
isEMEmpty()3366 bool DBRM::isEMEmpty() throw()
3367 {
3368     bool res = false;
3369 
3370     try
3371     {
3372         res = em->empty();
3373     }
3374     catch (...)
3375     {
3376         res = false;
3377     }
3378 
3379     return res;
3380 }
3381 
getEMFreeListEntries()3382 vector<InlineLBIDRange> DBRM::getEMFreeListEntries() throw()
3383 {
3384     vector<InlineLBIDRange> res;
3385 
3386     try
3387     {
3388         res = em->getFreeListEntries();
3389     }
3390     catch (...)
3391     {
3392         res.clear();
3393     }
3394 
3395     return res;
3396 }
3397 
takeSnapshot()3398 int DBRM::takeSnapshot() throw ()
3399 {
3400     return 0;   // don't know why, but we're calling this all the time.  Need to take most/all of those calls out, it's very wasteful.
3401 
3402     ByteStream command, response;
3403     uint8_t  err;
3404 
3405     command << TAKE_SNAPSHOT;
3406     err = send_recv(command, response);
3407 
3408     if (err != ERR_OK)
3409         return err;
3410 
3411     if (response.length() == 0)
3412         return ERR_NETWORK;
3413 
3414     return 0;
3415 }
3416 
getSystemReady()3417 int DBRM::getSystemReady() throw()
3418 {
3419     uint32_t stateFlags;
3420 
3421     if (getSystemState(stateFlags) < 0)
3422     {
3423         return -1;
3424     }
3425 
3426     return (stateFlags & SessionManagerServer::SS_READY);
3427 }
3428 
getSystemQueryReady()3429 int DBRM::getSystemQueryReady() throw()
3430 {
3431     uint32_t stateFlags;
3432 
3433     if (getSystemState(stateFlags) < 0)
3434     {
3435         return -1;
3436     }
3437 
3438     return (stateFlags & SessionManagerServer::SS_QUERY_READY);
3439 }
3440 
getSystemSuspended()3441 int DBRM::getSystemSuspended() throw()
3442 {
3443     uint32_t stateFlags;
3444 
3445     if (getSystemState(stateFlags) < 0)
3446     {
3447         return -1;
3448     }
3449 
3450     return (stateFlags & SessionManagerServer::SS_SUSPENDED);
3451 }
3452 
getSystemSuspendPending(bool & bRollback)3453 int DBRM::getSystemSuspendPending(bool& bRollback) throw()
3454 {
3455     uint32_t stateFlags;
3456 
3457     if (getSystemState(stateFlags) < 0)
3458     {
3459         return -1;
3460     }
3461 
3462     bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
3463 
3464     return (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING);
3465 }
3466 
getSystemShutdownPending(bool & bRollback,bool & bForce)3467 int DBRM::getSystemShutdownPending(bool& bRollback, bool& bForce) throw()
3468 {
3469     uint32_t stateFlags;
3470 
3471     if (getSystemState(stateFlags) < 0)
3472     {
3473         return -1;
3474     }
3475 
3476     bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
3477     bForce = stateFlags & SessionManagerServer::SS_FORCE;
3478 
3479     return (stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING);
3480 }
3481 
setSystemReady(bool bReady)3482 int DBRM::setSystemReady(bool bReady) throw()
3483 {
3484     if (bReady)
3485     {
3486         return setSystemState(SessionManagerServer::SS_READY);
3487     }
3488     else
3489     {
3490         return clearSystemState(SessionManagerServer::SS_READY);
3491     }
3492 }
3493 
setSystemQueryReady(bool bReady)3494 int DBRM::setSystemQueryReady(bool bReady) throw()
3495 {
3496     if (bReady)
3497     {
3498         return setSystemState(SessionManagerServer::SS_QUERY_READY);
3499     }
3500     else
3501     {
3502         return clearSystemState(SessionManagerServer::SS_QUERY_READY);
3503     }
3504 }
3505 
setSystemSuspended(bool bSuspended)3506 int DBRM::setSystemSuspended(bool bSuspended) throw()
3507 {
3508     uint32_t stateFlags = 0;
3509 
3510     if (bSuspended)
3511     {
3512         if (setSystemState(SessionManagerServer::SS_SUSPENDED) < 0)
3513         {
3514             return -1;
3515         }
3516     }
3517     else
3518     {
3519         stateFlags = SessionManagerServer::SS_SUSPENDED;
3520     }
3521 
3522     // In either case, we need to clear the pending and rollback flags
3523     stateFlags |= SessionManagerServer::SS_SUSPEND_PENDING;
3524     stateFlags |= SessionManagerServer::SS_ROLLBACK;
3525     return clearSystemState(stateFlags);
3526 }
3527 
setSystemSuspendPending(bool bPending,bool bRollback)3528 int DBRM::setSystemSuspendPending(bool bPending, bool bRollback) throw()
3529 {
3530     uint32_t stateFlags = SessionManagerServer::SS_SUSPEND_PENDING;
3531 
3532     if (bPending)
3533     {
3534         if (bRollback)
3535         {
3536             stateFlags |= SessionManagerServer::SS_ROLLBACK;
3537         }
3538 
3539         return setSystemState(stateFlags);
3540     }
3541     else
3542     {
3543         stateFlags |= SessionManagerServer::SS_ROLLBACK;
3544         return clearSystemState(stateFlags);
3545     }
3546 }
3547 
setSystemShutdownPending(bool bPending,bool bRollback,bool bForce)3548 int DBRM::setSystemShutdownPending(bool bPending, bool bRollback, bool bForce) throw()
3549 {
3550     int rtn = 0;
3551     uint32_t stateFlags = SessionManagerServer::SS_SHUTDOWN_PENDING;
3552 
3553     if (bPending)
3554     {
3555         if (bForce)
3556         {
3557             stateFlags |= SessionManagerServer::SS_FORCE;
3558         }
3559         else if (bRollback)
3560         {
3561             stateFlags |= SessionManagerServer::SS_ROLLBACK;
3562         }
3563 
3564         rtn = setSystemState(stateFlags);
3565     }
3566     else
3567     {
3568         stateFlags |= SessionManagerServer::SS_ROLLBACK;
3569         stateFlags |= SessionManagerServer::SS_FORCE;
3570         rtn = clearSystemState(stateFlags);		// Clears the flags that are turned on in stateFlags
3571     }
3572 
3573     return rtn;
3574 }
3575 
3576 /* Return the shm stateflags
3577  */
getSystemState(uint32_t & stateFlags)3578 int DBRM::getSystemState(uint32_t& stateFlags) throw()
3579 {
3580     try
3581     {
3582 #ifdef BRM_INFO
3583 
3584         if (fDebug)
3585         {
3586             TRACER_WRITELATER("getSystemState");
3587             TRACER_WRITE;
3588         }
3589 
3590 #endif
3591         ByteStream command, response;
3592         uint8_t err;
3593 
3594         command << GET_SYSTEM_STATE;
3595         err = send_recv(command, response);
3596 
3597         if (err != ERR_OK)
3598         {
3599             std::ostringstream oss;
3600             oss << "DBRM: error: SessionManager::getSystemState() failed (network)";
3601             log(oss.str(), logging::LOG_TYPE_ERROR);
3602             return -1;
3603         }
3604 
3605         response >> err;
3606 
3607         if (err != ERR_OK)
3608         {
3609             std::ostringstream oss;
3610             oss << "DBRM: error: SessionManager::getSystemState() failed (error " << err << ")";
3611             log(oss.str(), logging::LOG_TYPE_ERROR);
3612             return -1;
3613         }
3614 
3615         response >> stateFlags;
3616         return 1;
3617     }
3618     catch (...)
3619     {
3620     }
3621 
3622     return -1;
3623 }
3624 
3625 /* Set the shm stateflags that are set in the parameter
3626  */
setSystemState(uint32_t stateFlags)3627 int DBRM::setSystemState(uint32_t stateFlags) throw()
3628 {
3629     try
3630     {
3631 #ifdef BRM_INFO
3632 
3633         if (fDebug)
3634         {
3635             TRACER_WRITELATER("setSystemState");
3636             TRACER_WRITE;
3637         }
3638 
3639 #endif
3640 
3641         ByteStream command, response;
3642         uint8_t err;
3643 
3644         command << SET_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
3645         err = send_recv(command, response);
3646 
3647         if (err != ERR_OK)
3648         {
3649             std::ostringstream oss;
3650             oss << "DBRM: error: SessionManager::setSystemState() failed (network)";
3651             log(oss.str(), logging::LOG_TYPE_ERROR);
3652             stateFlags = 0;
3653             return -1;
3654         }
3655 
3656         response >> err;
3657 
3658         if (err != ERR_OK)
3659         {
3660             std::ostringstream oss;
3661             oss << "DBRM: error: SessionManager::setSystemState() failed (got an error)";
3662             log(oss.str(), logging::LOG_TYPE_ERROR);
3663             stateFlags = 0;
3664             return -1;
3665         }
3666 
3667         return 1;
3668     }
3669     catch (...)
3670     {
3671     }
3672 
3673     stateFlags = 0;
3674     return -1;
3675 }
3676 
3677 /* Clear the shm stateflags that are set in the parameter
3678  */
clearSystemState(uint32_t stateFlags)3679 int DBRM::clearSystemState(uint32_t stateFlags) throw()
3680 {
3681     try
3682     {
3683 #ifdef BRM_INFO
3684 
3685         if (fDebug)
3686         {
3687             TRACER_WRITELATER("clearSystemState");
3688             TRACER_WRITE;
3689         }
3690 
3691 #endif
3692 
3693         ByteStream command, response;
3694         uint8_t err;
3695 
3696         command << CLEAR_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
3697         err = send_recv(command, response);
3698 
3699         if (err != ERR_OK)
3700         {
3701             std::ostringstream oss;
3702             oss << "DBRM: error: SessionManager::clearSystemState() failed (network)";
3703             log(oss.str(), logging::LOG_TYPE_ERROR);
3704             return -1;
3705         }
3706 
3707         response >> err;
3708 
3709         if (err != ERR_OK)
3710         {
3711             std::ostringstream oss;
3712             oss << "DBRM: error: SessionManager::clearSystemState() failed (got an error)";
3713             log(oss.str(), logging::LOG_TYPE_ERROR);
3714             return -1;
3715         }
3716 
3717         return 1;
3718     }
3719     catch (...)
3720     {
3721     }
3722 
3723     return -1;
3724 }
3725 
3726 /* Ping the controller node. Don't print anything.
3727  */
isDBRMReady()3728 bool DBRM::isDBRMReady() throw()
3729 {
3730 #ifdef BRM_INFO
3731 
3732     if (fDebug) TRACER_WRITENOW("isDBRMReady");
3733 
3734 #endif
3735     boost::mutex::scoped_lock scoped(mutex);
3736 
3737     try
3738     {
3739         for (int attempt = 0; attempt < 2; ++attempt)
3740         {
3741             try
3742             {
3743                 if (msgClient == NULL)
3744                 {
3745                     msgClient = MessageQueueClientPool::getInstance(masterName);
3746                 }
3747 
3748                 if (msgClient->connect())
3749                 {
3750                     return true;
3751                 }
3752             }
3753             catch (...)
3754             {
3755             }
3756 
3757             MessageQueueClientPool::releaseInstance(msgClient);
3758             msgClient = NULL;
3759             sleep(1);
3760         }
3761     }
3762     catch (...)
3763     {
3764     }
3765 
3766     return false;
3767 }
3768 
3769 /* This waits for the lock up to 30 sec.  After 30 sec, the assumption is something
3770  * bad happened, and this will fix the lock state so that primproc can keep
3771  * running.  These prevent a non-critical problem anyway.
3772  */
lockLBIDRange(LBID_t start,uint32_t count)3773 void DBRM::lockLBIDRange(LBID_t start, uint32_t count)
3774 {
3775     bool locked = false, lockedRange = false;
3776     LBIDRange range;
3777     const uint32_t waitInterval = 50000;  // in usec
3778     const uint32_t maxRetries = 30000000 / waitInterval; // 30 secs
3779     uint32_t retries = 0;
3780 
3781     range.start = start;
3782     range.size = count;
3783 
3784     try
3785     {
3786         copylocks->lock(CopyLocks::WRITE);
3787         locked = true;
3788 
3789         while (copylocks->isLocked(range) && retries < maxRetries)
3790         {
3791             copylocks->release(CopyLocks::WRITE);
3792             locked = false;
3793             usleep(waitInterval);
3794             retries++;
3795             copylocks->lock(CopyLocks::WRITE);
3796             locked = true;
3797         }
3798 
3799         if (retries >= maxRetries)
3800             copylocks->forceRelease(range);
3801 
3802         copylocks->lockRange(range, -1);
3803         lockedRange = true;
3804         copylocks->confirmChanges();
3805         copylocks->release(CopyLocks::WRITE);
3806         locked = false;
3807     }
3808     catch (...)
3809     {
3810         if (lockedRange)
3811             copylocks->releaseRange(range);
3812 
3813         if (locked)
3814         {
3815             copylocks->confirmChanges();
3816             copylocks->release(CopyLocks::WRITE);
3817         }
3818 
3819         throw;
3820     }
3821 }
3822 
releaseLBIDRange(LBID_t start,uint32_t count)3823 void DBRM::releaseLBIDRange(LBID_t start, uint32_t count)
3824 {
3825     bool locked = false;
3826     LBIDRange range;
3827     range.start = start;
3828     range.size = count;
3829 
3830     try
3831     {
3832         copylocks->lock(CopyLocks::WRITE);
3833         locked = true;
3834         copylocks->releaseRange(range);
3835         copylocks->confirmChanges();
3836         copylocks->release(CopyLocks::WRITE);
3837         locked = false;
3838     }
3839     catch (...)
3840     {
3841         if (locked)
3842         {
3843             copylocks->confirmChanges();
3844             copylocks->release(CopyLocks::WRITE);
3845         }
3846 
3847         throw;
3848     }
3849 }
3850 
3851 /* OID Manager section */
3852 
allocOIDs(int num)3853 int DBRM::allocOIDs(int num)
3854 {
3855 #ifdef BRM_INFO
3856 
3857     if (fDebug) TRACER_WRITENOW("allocOID");
3858 
3859 #endif
3860     ByteStream command, response;
3861     uint8_t err;
3862     uint32_t ret;
3863 
3864     command << ALLOC_OIDS;
3865     command << (uint32_t) num;
3866     err = send_recv(command, response);
3867 
3868     if (err != ERR_OK)
3869     {
3870         cerr << "DBRM: OIDManager::allocOIDs(): network error" << endl;
3871         log("DBRM: OIDManager::allocOIDs(): network error", logging::LOG_TYPE_CRITICAL);
3872         return -1;
3873     }
3874 
3875     try
3876     {
3877         response >> err;
3878 
3879         if (err != ERR_OK)
3880             return -1;
3881 
3882         response >> ret;
3883         CHECK_EMPTY(response);
3884         return (int) ret;
3885     }
3886     catch (...)
3887     {
3888         log("DBRM: OIDManager::allocOIDs(): bad response", logging::LOG_TYPE_CRITICAL);
3889         return -1;
3890     }
3891 }
3892 
returnOIDs(int start,int end)3893 void DBRM::returnOIDs(int start, int end)
3894 {
3895     ByteStream command, response;
3896     uint8_t err;
3897 
3898     command << RETURN_OIDS;
3899     command << (uint32_t) start;
3900     command << (uint32_t) end;
3901     err = send_recv(command, response);
3902 
3903     if (err == ERR_NETWORK)
3904     {
3905         cerr << "DBRM: OIDManager::returnOIDs(): network error" << endl;
3906         log("DBRM: OIDManager::returnOIDs(): network error", logging::LOG_TYPE_CRITICAL);
3907         throw runtime_error("DBRM: OIDManager::returnOIDs(): network error");
3908     }
3909 
3910     try
3911     {
3912         response >> err;
3913         CHECK_EMPTY(response);
3914     }
3915     catch (...)
3916     {
3917         err = ERR_FAILURE;
3918     }
3919 
3920     if (err != ERR_OK)
3921     {
3922         log("DBRM: OIDManager::returnOIDs() failed", logging::LOG_TYPE_CRITICAL);
3923         throw runtime_error("DBRM: OIDManager::returnOIDs() failed");
3924     }
3925 }
3926 
oidm_size()3927 int DBRM::oidm_size()
3928 {
3929     ByteStream command, response;
3930     uint8_t err;
3931     uint32_t ret;
3932 
3933     command << OIDM_SIZE;
3934     err = send_recv(command, response);
3935 
3936     if (err != ERR_OK)
3937     {
3938         cerr << "DBRM: OIDManager::size(): network error" << endl;
3939         log("DBRM: OIDManager::size(): network error", logging::LOG_TYPE_CRITICAL);
3940         throw runtime_error("DBRM: OIDManager::size(): network error");
3941     }
3942 
3943     try
3944     {
3945         response >> err;
3946 
3947         if (err == ERR_OK)
3948         {
3949             response >> ret;
3950             CHECK_EMPTY(response);
3951             return ret;
3952         }
3953 
3954         CHECK_EMPTY(response);
3955         return -1;
3956     }
3957     catch (...)
3958     {
3959         log("DBRM: OIDManager::size(): bad response", logging::LOG_TYPE_CRITICAL);
3960         throw runtime_error("DBRM: OIDManager::size(): bad response");
3961     }
3962 }
3963 
allocVBOID(uint32_t dbroot)3964 int DBRM::allocVBOID(uint32_t dbroot)
3965 {
3966     ByteStream command, response;
3967     uint8_t err;
3968     uint32_t ret;
3969 
3970     command << ALLOC_VBOID << (uint32_t) dbroot;
3971     err = send_recv(command, response);
3972 
3973     if (err != ERR_OK)
3974     {
3975         cerr << "DBRM: OIDManager::allocVBOID(): network error" << endl;
3976         log("DBRM: OIDManager::allocVBOID(): network error", logging::LOG_TYPE_CRITICAL);
3977         return -1;
3978     }
3979 
3980     try
3981     {
3982         response >> err;
3983 
3984         if (err == ERR_OK)
3985         {
3986             response >> ret;
3987             CHECK_EMPTY(response);
3988             return ret;
3989         }
3990 
3991         CHECK_EMPTY(response);
3992         return -1;
3993     }
3994     catch (...)
3995     {
3996         log("DBRM: OIDManager::allocVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
3997         return -1;
3998     }
3999 }
4000 
getDBRootOfVBOID(uint32_t vbOID)4001 int DBRM::getDBRootOfVBOID(uint32_t vbOID)
4002 {
4003     ByteStream command, response;
4004     uint8_t err;
4005     uint32_t ret;
4006 
4007     command << GETDBROOTOFVBOID << (uint32_t) vbOID;
4008     err = send_recv(command, response);
4009 
4010     if (err != ERR_OK)
4011     {
4012         cerr << "DBRM: OIDManager::getDBRootOfVBOID(): network error" << endl;
4013         log("DBRM: OIDManager::getDBRootOfVBOID(): network error", logging::LOG_TYPE_CRITICAL);
4014         return -1;
4015     }
4016 
4017     try
4018     {
4019         response >> err;
4020 
4021         if (err == ERR_OK)
4022         {
4023             response >> ret;
4024             CHECK_EMPTY(response);
4025             return (int) ret;
4026         }
4027 
4028         CHECK_EMPTY(response);
4029         return -1;
4030     }
4031     catch (...)
4032     {
4033         log("DBRM: OIDManager::getDBRootOfVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
4034         return -1;
4035     }
4036 }
4037 
getVBOIDToDBRootMap()4038 vector<uint16_t> DBRM::getVBOIDToDBRootMap()
4039 {
4040     ByteStream command, response;
4041     uint8_t err;
4042     vector<uint16_t> ret;
4043 
4044     command << GETVBOIDTODBROOTMAP;
4045     err = send_recv(command, response);
4046 
4047     if (err != ERR_OK)
4048     {
4049         log("DBRM: OIDManager::getVBOIDToDBRootMap(): network error", logging::LOG_TYPE_CRITICAL);
4050         throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): network error");
4051     }
4052 
4053     try
4054     {
4055         response >> err;
4056 
4057         if (err != ERR_OK)
4058         {
4059             log("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error", logging::LOG_TYPE_CRITICAL);
4060             throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error");
4061         }
4062 
4063         deserializeInlineVector<uint16_t>(response, ret);
4064         CHECK_EMPTY(response);
4065         return ret;
4066     }
4067     catch (...)
4068     {
4069         log("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response", logging::LOG_TYPE_CRITICAL);
4070         throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response");
4071     }
4072 }
4073 
getTableLock(const vector<uint32_t> & pmList,uint32_t tableOID,string * ownerName,uint32_t * ownerPID,int32_t * ownerSessionID,int32_t * ownerTxnID,LockState state)4074 uint64_t DBRM::getTableLock(const vector<uint32_t>& pmList, uint32_t tableOID,
4075                             string* ownerName, uint32_t* ownerPID, int32_t* ownerSessionID, int32_t* ownerTxnID, LockState state)
4076 {
4077     ByteStream command, response;
4078     uint8_t err;
4079     uint64_t ret;
4080     TableLockInfo tli;
4081     uint32_t tmp32;
4082     vector<uint32_t> dbRootsList;
4083     OamCache* oamcache = OamCache::makeOamCache();
4084     OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap();
4085     int moduleId = 0;
4086 
4087     for (uint32_t i = 0; i < pmList.size(); i++)
4088     {
4089         moduleId = pmList[i];
4090         vector<int> dbroots = (*pmDbroots)[moduleId];
4091 
4092         for (uint32_t j = 0; j < dbroots.size(); j++)
4093             dbRootsList.push_back((uint32_t)dbroots[j]);
4094     }
4095 
4096     tli.id = 0;
4097     tli.ownerName = *ownerName;
4098     tli.ownerPID = *ownerPID;
4099     tli.ownerSessionID = *ownerSessionID;
4100     tli.ownerTxnID = *ownerTxnID;
4101     tli.dbrootList = dbRootsList;
4102     tli.state = state;
4103     tli.tableOID = tableOID;
4104     tli.creationTime = time(NULL);
4105 
4106     command << GET_TABLE_LOCK << tli;
4107     err = send_recv(command, response);
4108 
4109     if (err != ERR_OK)
4110     {
4111         log("DBRM: getTableLock(): network error", logging::LOG_TYPE_CRITICAL);
4112         throw runtime_error("DBRM: getTableLock(): network error");
4113     }
4114 
4115     response >> err;
4116 
4117     /* TODO: this means a save failure, need a specific exception type */
4118     if (err != ERR_OK)
4119         throw runtime_error("Table lock save file failure");
4120 
4121     response >> ret;
4122 
4123     if (ret == 0)
4124     {
4125         response >> *ownerPID;
4126         response >> *ownerName;
4127         response >> tmp32;
4128         *ownerSessionID = tmp32;
4129         response >> tmp32;
4130         *ownerTxnID = tmp32;
4131     }
4132 
4133     idbassert(response.length() == 0);
4134     return ret;
4135 }
4136 
releaseTableLock(uint64_t id)4137 bool DBRM::releaseTableLock(uint64_t id)
4138 {
4139     ByteStream command, response;
4140     uint8_t err;
4141 
4142     command << RELEASE_TABLE_LOCK << id;
4143     err = send_recv(command, response);
4144 
4145     if (err != ERR_OK)
4146     {
4147         log("DBRM: releaseTableLock(): network error", logging::LOG_TYPE_CRITICAL);
4148         throw runtime_error("DBRM: releaseTableLock(): network error");
4149     }
4150 
4151     response >> err;
4152 
4153     /* TODO: this means a save failure, need a specific exception type */
4154     if (err != ERR_OK)
4155         throw runtime_error("Table lock save file failure");
4156 
4157     response >> err;
4158     idbassert(response.length() == 0);
4159 
4160     return (bool) err;
4161 }
4162 
changeState(uint64_t id,LockState state)4163 bool DBRM::changeState(uint64_t id, LockState state)
4164 {
4165     ByteStream command, response;
4166     uint8_t err;
4167 
4168     command << CHANGE_TABLE_LOCK_STATE << id << (uint32_t) state;
4169     err = send_recv(command, response);
4170 
4171     if (err != ERR_OK)
4172     {
4173         log("DBRM: changeState(): network error", logging::LOG_TYPE_CRITICAL);
4174         throw runtime_error("DBRM: changeState(): network error");
4175     }
4176 
4177     response >> err;
4178 
4179     /* TODO: this means a save failure, need a specific exception type */
4180     if (err != ERR_OK)
4181         throw runtime_error("Table lock save file failure");
4182 
4183     response >> err;
4184     idbassert(response.length() == 0);
4185 
4186     return (bool) err;
4187 }
4188 
changeOwner(uint64_t id,const string & ownerName,uint32_t ownerPID,int32_t ownerSessionID,int32_t ownerTxnID)4189 bool DBRM::changeOwner(uint64_t id, const string& ownerName, uint32_t ownerPID, int32_t ownerSessionID,
4190                        int32_t ownerTxnID)
4191 {
4192     ByteStream command, response;
4193     uint8_t err;
4194 
4195     command << CHANGE_TABLE_LOCK_OWNER << id << ownerName << ownerPID <<
4196             (uint32_t) ownerSessionID << (uint32_t) ownerTxnID;
4197     err = send_recv(command, response);
4198 
4199     if (err != ERR_OK)
4200     {
4201         log("DBRM: changeOwner(): network error", logging::LOG_TYPE_CRITICAL);
4202         throw runtime_error("DBRM: changeOwner(): network error");
4203     }
4204 
4205     response >> err;
4206 
4207     /* TODO: this means a save failure, need a specific exception type */
4208     if (err != ERR_OK)
4209         throw runtime_error("Table lock save file failure");
4210 
4211     response >> err;
4212     idbassert(response.length() == 0);
4213     return (bool) err;
4214 }
4215 
checkOwner(uint64_t id)4216 bool DBRM::checkOwner(uint64_t id)
4217 {
4218     ByteStream command, response;
4219     uint8_t err;
4220 
4221     command << OWNER_CHECK << id;
4222     err = send_recv(command, response);
4223 
4224     if (err != ERR_OK)
4225     {
4226         log("DBRM: ownerCheck(): network error", logging::LOG_TYPE_CRITICAL);
4227         throw runtime_error("DBRM: ownerCheck(): network error");
4228     }
4229 
4230     response >> err;
4231 
4232     /* TODO: this means a save failure, need a specific exception type */
4233     if (err != ERR_OK)
4234         throw runtime_error("Table lock save file failure");
4235 
4236     response >> err;
4237     idbassert(response.length() == 0);
4238     return (bool) err;  // Return true means the owner is valid
4239 }
4240 
getAllTableLocks()4241 vector<TableLockInfo> DBRM::getAllTableLocks()
4242 {
4243     ByteStream command, response;
4244     uint8_t err;
4245     vector<TableLockInfo> ret;
4246 
4247     command << GET_ALL_TABLE_LOCKS;
4248     err = send_recv(command, response);
4249 
4250     if (err != ERR_OK)
4251     {
4252         log("DBRM: getAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
4253         throw runtime_error("DBRM: getAllTableLocks(): network error");
4254     }
4255 
4256     response >> err;
4257 
4258     if (err != ERR_OK)
4259     {
4260         log("DBRM: getAllTableLocks(): processing error", logging::LOG_TYPE_CRITICAL);
4261         throw runtime_error("DBRM: getAllTableLocks(): processing error");
4262     }
4263 
4264     deserializeVector<TableLockInfo>(response, ret);
4265     idbassert(response.length() == 0);
4266     return ret;
4267 }
4268 
releaseAllTableLocks()4269 void DBRM::releaseAllTableLocks()
4270 {
4271     ByteStream command, response;
4272     uint8_t err;
4273 
4274     command << RELEASE_ALL_TABLE_LOCKS;
4275     err = send_recv(command, response);
4276 
4277     if (err != ERR_OK)
4278     {
4279         log("DBRM: releaseAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
4280         throw runtime_error("DBRM: releaseAllTableLocks(): network error");
4281     }
4282 
4283     response >> err;
4284     idbassert(response.length() == 0);
4285 
4286     if (err != ERR_OK)
4287         throw runtime_error("DBRM: releaseAllTableLocks(): processing error");
4288 }
4289 
getTableLockInfo(uint64_t id,TableLockInfo * tli)4290 bool DBRM::getTableLockInfo(uint64_t id, TableLockInfo* tli)
4291 {
4292     ByteStream command, response;
4293     uint8_t err;
4294 
4295     command << GET_TABLE_LOCK_INFO << id;
4296     err = send_recv(command, response);
4297 
4298     if (err != ERR_OK)
4299     {
4300         log("DBRM: getTableLockInfo(): network error", logging::LOG_TYPE_CRITICAL);
4301         throw runtime_error("DBRM: getTableLockInfo(): network error");
4302     }
4303 
4304     response >> err;
4305 
4306     if (err != ERR_OK)
4307         throw runtime_error("DBRM: getTableLockInfo() processing error");
4308 
4309     response >> err;
4310 
4311     if (err)
4312         response >> *tli;
4313 
4314     return (bool) err;
4315 }
4316 
startAISequence(uint32_t OID,uint64_t firstNum,uint32_t colWidth,execplan::CalpontSystemCatalog::ColDataType colDataType)4317 void DBRM::startAISequence(uint32_t OID, uint64_t firstNum, uint32_t colWidth,
4318                            execplan::CalpontSystemCatalog::ColDataType colDataType)
4319 {
4320     ByteStream command, response;
4321     uint8_t err;
4322     uint8_t tmp8 = colDataType;
4323 
4324     command << START_AI_SEQUENCE << OID << firstNum << colWidth << tmp8;
4325     err = send_recv(command, response);
4326 
4327     if (err != ERR_OK)
4328     {
4329         log("DBRM: startAISequence(): network error", logging::LOG_TYPE_CRITICAL);
4330         throw runtime_error("DBRM: startAISequence(): network error");
4331     }
4332 
4333     response >> err;
4334     idbassert(response.length() == 0);
4335 
4336     if (err != ERR_OK)
4337     {
4338         log("DBRM: startAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
4339         throw runtime_error("DBRM: startAISequence(): processing error");
4340     }
4341 }
4342 
getAIRange(uint32_t OID,uint32_t count,uint64_t * firstNum)4343 bool DBRM::getAIRange(uint32_t OID, uint32_t count, uint64_t* firstNum)
4344 {
4345     ByteStream command, response;
4346     uint8_t err;
4347 
4348     command << GET_AI_RANGE << OID << count;
4349     err = send_recv(command, response);
4350 
4351     if (err != ERR_OK)
4352     {
4353         log("DBRM: getAIRange(): network error", logging::LOG_TYPE_CRITICAL);
4354         throw runtime_error("DBRM: getAIRange(): network error");
4355     }
4356 
4357     response >> err;
4358 
4359     if (err != ERR_OK)
4360     {
4361         log("DBRM: getAIRange(): processing error",	logging::LOG_TYPE_CRITICAL);
4362         throw runtime_error("DBRM: getAIRange(): processing error");
4363     }
4364 
4365     response >> err;
4366 
4367     if (err == 0)
4368         return false;
4369 
4370     response >> *firstNum;
4371     idbassert(response.length() == 0);
4372     return true;
4373 }
4374 
getAIValue(uint32_t OID,uint64_t * value)4375 bool DBRM::getAIValue(uint32_t OID, uint64_t* value)
4376 {
4377     return getAIRange(OID, 0, value);
4378 }
4379 
resetAISequence(uint32_t OID,uint64_t value)4380 void DBRM::resetAISequence(uint32_t OID, uint64_t value)
4381 {
4382     ByteStream command, response;
4383     uint8_t err;
4384 
4385     command << RESET_AI_SEQUENCE << OID << value;
4386     err = send_recv(command, response);
4387 
4388     if (err != ERR_OK)
4389     {
4390         log("DBRM: resetAISequence(): network error", logging::LOG_TYPE_CRITICAL);
4391         throw runtime_error("DBRM: resetAISequence(): network error");
4392     }
4393 
4394     response >> err;
4395     idbassert(response.length() == 0);
4396 
4397     if (err != ERR_OK)
4398     {
4399         log("DBRM: resetAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
4400         throw runtime_error("DBRM: resetAISequence(): processing error");
4401     }
4402 }
4403 
getAILock(uint32_t OID)4404 void DBRM::getAILock(uint32_t OID)
4405 {
4406     ByteStream command, response;
4407     uint8_t err;
4408 
4409     command << GET_AI_LOCK << OID;
4410     err = send_recv(command, response);
4411 
4412     if (err != ERR_OK)
4413     {
4414         log("DBRM: getAILock(): network error", logging::LOG_TYPE_CRITICAL);
4415         throw runtime_error("DBRM: getAILock(): network error");
4416     }
4417 
4418     response >> err;
4419     idbassert(response.length() == 0);
4420 
4421     if (err != ERR_OK)
4422     {
4423         log("DBRM: getAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4424         throw runtime_error("DBRM: getAILock(): processing error");
4425     }
4426 }
4427 
releaseAILock(uint32_t OID)4428 void DBRM::releaseAILock(uint32_t OID)
4429 {
4430     ByteStream command, response;
4431     uint8_t err;
4432 
4433     command << RELEASE_AI_LOCK << OID;
4434     err = send_recv(command, response);
4435 
4436     if (err != ERR_OK)
4437     {
4438         log("DBRM: releaseAILock(): network error", logging::LOG_TYPE_CRITICAL);
4439         throw runtime_error("DBRM: releaseAILock(): network error");
4440     }
4441 
4442     response >> err;
4443     idbassert(response.length() == 0);
4444 
4445     if (err != ERR_OK)
4446     {
4447         log("DBRM: releaseAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4448         throw runtime_error("DBRM: releaseAILock(): processing error");
4449     }
4450 }
4451 
deleteAISequence(uint32_t OID)4452 void DBRM::deleteAISequence(uint32_t OID)
4453 {
4454     ByteStream command, response;
4455     uint8_t err;
4456 
4457     command << DELETE_AI_SEQUENCE << OID;
4458     err = send_recv(command, response);
4459 
4460     if (err != ERR_OK)
4461     {
4462         log("DBRM:deleteAILock(): network error", logging::LOG_TYPE_CRITICAL);
4463         throw runtime_error("DBRM: deleteAILock(): network error");
4464     }
4465 
4466     response >> err;
4467     idbassert(response.length() == 0);
4468 
4469     if (err != ERR_OK)
4470     {
4471         log("DBRM: deleteAILock(): processing error", logging::LOG_TYPE_CRITICAL);
4472         throw runtime_error("DBRM: deleteAILock(): processing error");
4473     }
4474 }
4475 
invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid,vector<LBID_t> * plbidList)4476 void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid, vector<LBID_t>* plbidList)
4477 {
4478     // Here we want to minimize the number of calls to dbrm
4479     // Given that, and the fact that we need to know the column type
4480     // in order to set the invalid min and max correctly in the extents,
4481     // We do the following:
4482     // 1) Maintain a vector of all extents we've looked at.
4483     // 2) Get the list of uncommitted lbids for the transaction.
4484     // 3) Look in that list to see if we've already looked at this extent.
4485     // 4) If not,
4486     //    a) lookup the min and max lbid for the extent it belongs to
4487     //    b) lookup the column oid for that lbid
4488     //    c) add to the vector of extents
4489     // 5) Create a list of CPInfo structures with the first lbid and col type of each extent
4490     // 6) Lookup the column type for each retrieved oid.
4491     // 7) mark each extent invalid, just like we would during update. This sets the proper
4492     //    min and max (and set the state to CP_UPDATING.
4493     // 6) Call setExtentsMaxMin to set the state to CP_INVALID.
4494 
4495     vector<LBID_t> localLBIDList;
4496 
4497     boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
4498     CPInfoList_t cpInfos;
4499     CPInfo aInfo;
4500     int oid;
4501     uint16_t dbRoot;
4502     uint32_t partitionNum;
4503     uint16_t segmentNum;
4504     uint32_t fileBlockOffset;
4505 
4506     // 2) Get the list of uncommitted lbids for the transaction, if we weren't given one.
4507     if (plbidList == NULL)
4508     {
4509         getUncommittedExtentLBIDs(static_cast<VER_t>(txnid), localLBIDList);
4510         plbidList = &localLBIDList;
4511     }
4512 
4513     if (plbidList->size() == 0)
4514     {
4515         return; // Nothing to do.
4516     }
4517 
4518     vector<LBID_t>::const_iterator iter = plbidList->begin();
4519     vector<LBID_t>::const_iterator end = plbidList->end();
4520     csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog();
4521 
4522     for (; iter != end; ++iter)
4523     {
4524         LBID_t lbid = *iter;
4525         aInfo.firstLbid = lbid;
4526 
4527         // lookup the column oid for that lbid (all we care about is oid here)
4528         if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0)
4529         {
4530             if (execplan::isUnsigned(csc->colType(oid).colDataType))
4531             {
4532                 aInfo.max = 0;
4533                 aInfo.min = numeric_limits<uint64_t>::max();
4534             }
4535             else
4536             {
4537                 aInfo.max = numeric_limits<int64_t>::min();
4538                 aInfo.min = numeric_limits<int64_t>::max();
4539             }
4540         }
4541         else
4542         {
4543             // We have a problem, but we need to put something in. This should never happen.
4544             aInfo.max = numeric_limits<int64_t>::min();
4545             aInfo.min = numeric_limits<int64_t>::max();
4546         }
4547 
4548         aInfo.seqNum = -2;
4549         cpInfos.push_back(aInfo);
4550     }
4551 
4552     // Call setExtentsMaxMin to invalidate and set the proper max/min in each extent
4553     setExtentsMaxMin(cpInfos);
4554 }
4555 
4556 }   //namespace
4557