/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /***************************************************************************** * $Id: dbrm.cpp 1878 2013-05-02 15:17:12Z dcathey $ * ****************************************************************************/ #include #include #include #ifdef __linux__ #include #endif #include //#define NDEBUG #include #include "oamcache.h" #include "rwlock.h" #include "mastersegmenttable.h" #include "extentmap.h" #include "copylocks.h" #include "vss.h" #include "vbbm.h" #include "socketclosed.h" #include "configcpp.h" #include "sessionmanagerserver.h" #include "messagequeuepool.h" #define DBRM_DLLEXPORT #include "dbrm.h" #undef DBRM_DLLEXPORT #ifdef BRM_DEBUG #define CHECK_EMPTY(x) \ if (x.length() != 0) \ throw logic_error("DBRM: got a message of the wrong size"); #else #define CHECK_EMPTY(x) #endif #define DO_ERR_NETWORK \ MessageQueueClientPool::releaseInstance(msgClient); \ msgClient = NULL; \ mutex.unlock(); \ return ERR_NETWORK; using namespace std; using namespace messageqcpp; using namespace oam; #ifdef BRM_INFO #include "tracer.h" #endif namespace BRM { DBRM::DBRM(bool noBRMinit) : fDebug(false) { if (!noBRMinit) { mst.reset(new MasterSegmentTable()); em.reset(new ExtentMap()); vss.reset(new VSS()); vbbm.reset(new VBBM()); copylocks.reset(new CopyLocks()); em->setReadOnly(); vss->setReadOnly(); 
vbbm->setReadOnly(); } msgClient = NULL; masterName = "DBRM_Controller"; config = config::Config::makeConfig(); #ifdef BRM_INFO fDebug = ("Y" == config->getConfig("DBRM", "Debug")); #endif } DBRM::DBRM(const DBRM& brm) { throw logic_error("DBRM: Don't use the copy constructor."); } DBRM::~DBRM() throw() { if (msgClient != NULL) MessageQueueClientPool::releaseInstance(msgClient); } DBRM& DBRM::operator=(const DBRM& brm) { throw logic_error("DBRM: Don't use the = operator."); } int DBRM::saveState() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("saveState()"); #endif string prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { cerr << "Error: Need a valid Calpont configuation file" << endl; exit(1); } int rc = saveState(prefix); return rc; } int DBRM::saveState(string filename) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("saveState(filename)"); TRACER_ADDSTRINPUT(filename); TRACER_WRITE; } #endif string emFilename = filename + "_em"; string vssFilename = filename + "_vss"; string vbbmFilename = filename + "_vbbm"; bool locked[3] = { false, false, false }; try { vbbm->lock(VBBM::READ); locked[0] = true; vss->lock(VSS::READ); locked[1] = true; copylocks->lock(CopyLocks::READ); locked[2] = true; saveExtentMap(emFilename); vbbm->save(vbbmFilename); vss->save(vssFilename); copylocks->release(CopyLocks::READ); locked[2] = false; vss->release(VSS::READ); locked[1] = false; vbbm->release(VBBM::READ); locked[0] = false; } catch (exception& e) { if (locked[2]) copylocks->release(CopyLocks::READ); if (locked[1]) vss->release(VSS::READ); if (locked[0]) vbbm->release(VBBM::READ); return -1; } return 0; } int DBRM::saveExtentMap(const string& filename) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("saveExtentMap"); TRACER_ADDSTRINPUT(filename); TRACER_WRITE; } #endif try { em->save(filename); } catch (exception& e) { cerr << e.what() << endl; return -1; } return 0; } // @bug 1055+. 
New functions added for multiple files per OID enhancement. int DBRM::lookupLocal(LBID_t lbid, VER_t verid, bool vbFlag, OID_t& oid, uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, uint32_t& fileBlockOffset) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("lookupLocal(lbid,ver,..)"); TRACER_ADDINPUT(lbid); TRACER_ADDINPUT(verid); TRACER_ADDBOOLINPUT(vbFlag); TRACER_ADDOUTPUT(oid); TRACER_ADDSHORTOUTPUT(dbRoot); TRACER_ADDOUTPUT(partitionNum); TRACER_ADDSHORTOUTPUT(segmentNum); TRACER_ADDOUTPUT(fileBlockOffset); TRACER_WRITE; } #endif bool locked[2] = {false, false}; int ret; bool tooOld = false; try { if (!vbFlag) return em->lookupLocal(lbid, (int&)oid, dbRoot, partitionNum, segmentNum, fileBlockOffset); else { vbbm->lock(VBBM::READ); locked[0] = true; ret = vbbm->lookup(lbid, verid, oid, fileBlockOffset); vbbm->release(VBBM::READ); locked[0] = false; if (ret < 0) { vss->lock(VSS::READ); locked[1] = true; tooOld = vss->isTooOld(lbid, verid); vss->release(VSS::READ); locked[1] = false; if (tooOld) return ERR_SNAPSHOT_TOO_OLD; } return ret; } } catch (exception& e) { if (locked[1]) vss->release(VSS::READ); if (locked[0]) vbbm->release(VBBM::READ); cerr << e.what() << endl; return -1; } } int DBRM::lookupLocal(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("lookupLocal(oid,fbo,..)"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINPUT(fileBlockOffset); TRACER_ADDOUTPUT(lbid); TRACER_WRITE; } #endif try { return em->lookupLocal(oid, partitionNum, segmentNum, fileBlockOffset, lbid); } catch (exception& e) { cerr << e.what() << endl; return -1; } } int DBRM::lookupLocal_DBroot(OID_t oid, uint32_t dbroot, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("lookupLocal(oid,fbo,..)"); 
TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINPUT(fileBlockOffset); TRACER_ADDOUTPUT(lbid); TRACER_WRITE; } #endif try { return em->lookupLocal_DBroot(oid, dbroot, partitionNum, segmentNum, fileBlockOffset, lbid); } catch (exception& e) { cerr << e.what() << endl; return -1; } } // @bug 1055- //------------------------------------------------------------------------------ // Lookup/return starting LBID for the specified OID, partition, segment, and // file block offset. //------------------------------------------------------------------------------ int DBRM::lookupLocalStartLbid(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("lookupLocalStartLbid(oid,fbo,..)"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINPUT(fileBlockOffset); TRACER_ADDOUTPUT(lbid); TRACER_WRITE; } #endif try { return em->lookupLocalStartLbid(oid, partitionNum, segmentNum, fileBlockOffset, lbid); } catch (exception& e) { cerr << e.what() << endl; return -1; } } int DBRM::lookup(OID_t oid, LBIDRange_v& lbidList) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("lookup(oid,range)"); TRACER_ADDINPUT(oid); TRACER_WRITE; } #endif try { em->lookup(oid, lbidList); return 0; } catch (exception& e) { cerr << e.what() << endl; return -1; } } // Casual Partitioning support int DBRM::markExtentInvalid(const LBID_t lbid, execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("markExtentInvalid"); TRACER_ADDINPUT(lbid); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << MARKEXTENTINVALID << (uint64_t)lbid << (uint32_t)colDataType; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int 
DBRM::markExtentsInvalid(const vector& lbids, const std::vector& colDataTypes) DBRM_THROW { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("markExtentsInvalid"); #endif ByteStream command, response; uint8_t err; uint32_t size = lbids.size(), i; command << MARKMANYEXTENTSINVALID << size; for (i = 0; i < size; i++) { command << (uint64_t) lbids[i]; command << (uint32_t) colDataTypes[i]; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getExtentMaxMin"); TRACER_ADDINPUT(lbid); TRACER_ADDOUTPUT(max); TRACER_ADDOUTPUT(min); TRACER_ADDOUTPUT(seqNum); TRACER_WRITE; } #endif try { int ret = em->getMaxMin(lbid, max, min, seqNum); return ret; } catch (exception& e) { cerr << e.what() << endl; return false; } } int DBRM::setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min, const int32_t seqNum) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("setExtentMaxMin"); TRACER_ADDINPUT(lbid); TRACER_ADDINPUT(max); TRACER_ADDINPUT(min); TRACER_ADDINPUT(seqNum); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << SETEXTENTMAXMIN << (uint64_t)lbid << (uint64_t)max << (uint64_t)min << (uint64_t)seqNum; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } // @bug 1970 - Added function below to set multiple extents casual partition info in one call. 
// Send a SETMANYEXTENTSMAXMIN request carrying every entry of cpInfos
// (firstLbid, max, min as 64-bit values; seqNum as a 32-bit value).
// An empty list is a no-op that returns 0 without contacting the controller.
// Otherwise returns the reply's error byte, the send_recv error, or
// ERR_NETWORK for an empty reply.
int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
{
    CPInfoList_t::const_iterator it;
#ifdef BRM_INFO
    if (fDebug)
    {
        TRACER_WRITELATER("setExtentsMaxMin");

        for (it = cpInfos.begin(); it != cpInfos.end(); it++)
        {
            TRACER_ADDINPUT(it->firstLbid);
            TRACER_ADDINPUT(it->max);
            TRACER_ADDINPUT(it->min);
            TRACER_ADDINPUT(it->seqNum);
            TRACER_WRITE;
        }
    }
#endif
    ByteStream command, response;
    uint8_t err = 0;

    // Single empty-input guard (the second, unreachable check was removed).
    // err is still 0 here, exactly what the original first check returned.
    if (cpInfos.empty())
        return err;

    command << SETMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();

    for (it = cpInfos.begin(); it != cpInfos.end(); it++)
    {
        command << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min
                << (uint32_t)it->seqNum;
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// @bug 2117 - Add function to merge Casual Partition info with current extent
// map information.
//------------------------------------------------------------------------------ int DBRM::mergeExtentsMaxMin(const CPInfoMergeList_t& cpInfos) DBRM_THROW { CPInfoMergeList_t::const_iterator it; #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("updateExtentsMaxMin"); for (it = cpInfos.begin(); it != cpInfos.end(); it++) { TRACER_ADDINPUT(it->startLbid); TRACER_ADDINPUT(it->max); TRACER_ADDINPUT(it->min); TRACER_ADDINPUT(it->seqNum); TRACER_ADDINPUT(it->type); TRACER_ADDINPUT(it->newExtent); TRACER_WRITE; } } #endif ByteStream command, response; uint8_t err; command << MERGEMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size(); for (it = cpInfos.begin(); it != cpInfos.end(); it++) { command << (uint64_t)it->startLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum << (uint32_t)it->type << (uint32_t)it->newExtent; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::vssLookup(LBID_t lbid, const QueryContext& verInfo, VER_t txnID, VER_t* outVer, bool* vbFlag, bool vbOnly) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("vssLookup"); TRACER_ADDINPUT(lbid); TRACER_ADDINPUT(verInfo); TRACER_ADDINPUT(txnID); TRACER_ADDBOOLINPUT(vbOnly); TRACER_WRITE; } #endif if (!vbOnly && vss->isEmpty()) { *outVer = 0; *vbFlag = false; return -1; } bool locked = false; try { int rc = 0; vss->lock(VSS::READ); locked = true; rc = vss->lookup(lbid, verInfo, txnID, outVer, vbFlag, vbOnly); vss->release(VSS::READ); return rc; } catch (exception& e) { if (locked) vss->release(VSS::READ); cerr << e.what() << endl; return -1; } } int DBRM::bulkVSSLookup(const std::vector& lbids, const QueryContext_vss& verInfo, VER_t txnID, std::vector* out) { uint32_t i; bool locked = false; try { out->resize(lbids.size()); vss->lock(VSS::READ); locked = true; if (vss->isEmpty(false)) { for (i = 0; i < lbids.size(); i++) { VSSData& vd = (*out)[i]; vd.verID = 0; 
vd.vbFlag = false; vd.returnCode = -1; } } else { for (i = 0; i < lbids.size(); i++) { VSSData& vd = (*out)[i]; vd.returnCode = vss->lookup(lbids[i], verInfo, txnID, &vd.verID, &vd.vbFlag, false); } } vss->release(VSS::READ); return 0; } catch (exception& e) { cerr << e.what() << endl; } catch (...) { cerr << "bulkVSSLookup: caught an exception" << endl; } if (locked) vss->release(VSS::READ); out->clear(); return -1; } VER_t DBRM::getCurrentVersion(LBID_t lbid, bool* isLocked) const { bool locked = false; VER_t ret = 0; try { vss->lock(VSS::READ); locked = true; ret = vss->getCurrentVersion(lbid, isLocked); vss->release(VSS::READ); locked = false; } catch (exception& e) { cerr << e.what() << endl; if (locked) vss->release(VSS::READ); throw; } return ret; } int DBRM::bulkGetCurrentVersion(const vector& lbids, vector* versions, vector* isLocked) const { bool locked = false; versions->resize(lbids.size()); if (isLocked != NULL) isLocked->resize(lbids.size()); try { vss->lock(VSS::READ); locked = true; if (isLocked != NULL) { bool tmp = false; for (uint32_t i = 0; i < lbids.size(); i++) { (*versions)[i] = vss->getCurrentVersion(lbids[i], &tmp); (*isLocked)[i] = tmp; } } else for (uint32_t i = 0; i < lbids.size(); i++) (*versions)[i] = vss->getCurrentVersion(lbids[i], NULL); vss->release(VSS::READ); locked = false; return 0; } catch (exception& e) { versions->clear(); cerr << e.what() << endl; if (locked) vss->release(VSS::READ); return -1; } } VER_t DBRM::getHighestVerInVB(LBID_t lbid, VER_t max) const { bool locked = false; VER_t ret = -1; try { vss->lock(VSS::READ); locked = true; ret = vss->getHighestVerInVB(lbid, max); vss->release(VSS::READ); locked = false; } catch (exception& e) { cerr << e.what() << endl; if (locked) vss->release(VSS::READ); throw; } return ret; } bool DBRM::isVersioned(LBID_t lbid, VER_t ver) const { bool ret = false; bool locked = false; try { vss->lock(VSS::READ); locked = true; ret = vss->isVersioned(lbid, ver); vss->release(VSS::READ); 
locked = false; } catch (exception& e) { cerr << e.what() << endl; if (locked) vss->release(VSS::READ); throw; } return ret; } int8_t DBRM::send_recv(const ByteStream& in, ByteStream& out) throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("send_recv"); #endif uint8_t attempt = 1; mutex.lock(); reconnect: if (msgClient == NULL) try { msgClient = MessageQueueClientPool::getInstance(masterName); } catch (exception& e) { cerr << "class DBRM failed to create a MessageQueueClient: " << e.what() << endl; msgClient = NULL; mutex.unlock(); return ERR_NETWORK; } try { msgClient->write(in); out = msgClient->read(); } /* If we add a timeout to the read() call, uncomment this clause catch (SocketClosed &e) { cerr << "DBRM::send_recv: controller node closed the connection" << endl; DO_ERR_NETWORK; } */ catch (exception& e) { cerr << "DBRM::send_recv caught: " << e.what() << endl; if (attempt++ == 1) { MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; goto reconnect; } DO_ERR_NETWORK; } if (out.length() == 0) { cerr << "DBRM::send_recv: controller node closed the connection" << endl; if (attempt <= 2) { MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; if (attempt++ > 1) { sleep(3); } goto reconnect; } DO_ERR_NETWORK; } mutex.unlock(); return ERR_OK; } //------------------------------------------------------------------------------ // Send a request to create a "stripe" of column extents for the specified // column OIDs and DBRoot. 
//------------------------------------------------------------------------------ int DBRM::createStripeColumnExtents( const std::vector& cols, uint16_t dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, std::vector& extents) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("createStripeColumnExtents"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint16_t tmp16; uint32_t tmp32; command << CREATE_STRIPE_COLUMN_EXTENTS; serializeInlineVector(command, cols); command << dbRoot << partitionNum; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; try { response >> err; if (err != 0) return (int) err; response >> tmp32; partitionNum = tmp32; response >> tmp16; segmentNum = tmp16; deserializeInlineVector(response, extents); } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } CHECK_EMPTY(response); return 0; } //------------------------------------------------------------------------------ // Send a request to create a column extent for the specified OID and DBRoot. 
//------------------------------------------------------------------------------ int DBRM::createColumnExtent_DBroot(OID_t oid, uint32_t colWidth, uint16_t dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, execplan::CalpontSystemCatalog::ColDataType colDataType, LBID_t& lbid, int& allocdSize, uint32_t& startBlockOffset) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("createColumnExtent_DBroot"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(colWidth); TRACER_ADDSHORTINPUT(dbRoot); TRACER_ADDOUTPUT(partitionNum); TRACER_ADDSHORTOUTPUT(segmentNum); TRACER_ADDINT64OUTPUT(lbid); TRACER_ADDOUTPUT(allocdSize); TRACER_ADDOUTPUT(startBlockOffset); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint32_t tmp8 = (uint8_t)colDataType; uint16_t tmp16; uint32_t tmp32; uint64_t tmp64; command << CREATE_COLUMN_EXTENT_DBROOT << (ByteStream::quadbyte) oid << colWidth << dbRoot << partitionNum << segmentNum << tmp8; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; try { response >> err; if (err != 0) return (int) err; response >> tmp32; partitionNum = tmp32; response >> tmp16; segmentNum = tmp16; response >> tmp64; lbid = (int64_t)tmp64; response >> tmp32; allocdSize = (int32_t)tmp32; response >> tmp32; startBlockOffset = (int32_t)tmp32; } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } CHECK_EMPTY(response); return 0; } //------------------------------------------------------------------------------ // Send a request to create a column extent for the exact segment file // specified by the requested OID, DBRoot, partition, and segment. 
//------------------------------------------------------------------------------ int DBRM::createColumnExtentExactFile(OID_t oid, uint32_t colWidth, uint16_t dbRoot, uint32_t partitionNum, uint16_t segmentNum, execplan::CalpontSystemCatalog::ColDataType colDataType, LBID_t& lbid, int& allocdSize, uint32_t& startBlockOffset) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("createColumnExtentExactFile"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(colWidth); TRACER_ADDSHORTINPUT(dbRoot); TRACER_ADDOUTPUT(partitionNum); TRACER_ADDSHORTOUTPUT(segmentNum); TRACER_ADDINT64OUTPUT(lbid); TRACER_ADDOUTPUT(allocdSize); TRACER_ADDOUTPUT(startBlockOffset); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint8_t tmp8; uint16_t tmp16; uint32_t tmp32; uint64_t tmp64; tmp8 = (uint8_t)colDataType; command << CREATE_COLUMN_EXTENT_EXACT_FILE << (ByteStream::quadbyte) oid << colWidth << dbRoot << partitionNum << segmentNum << tmp8; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; try { response >> err; if (err != 0) return (int) err; response >> tmp32; partitionNum = tmp32; response >> tmp16; segmentNum = tmp16; response >> tmp64; lbid = (int64_t)tmp64; response >> tmp32; allocdSize = (int32_t)tmp32; response >> tmp32; startBlockOffset = (int32_t)tmp32; } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } CHECK_EMPTY(response); return 0; } //------------------------------------------------------------------------------ // Send a request to create a dictionary store extent. 
//------------------------------------------------------------------------------ int DBRM::createDictStoreExtent(OID_t oid, uint16_t dbRoot, uint32_t partitionNum, uint16_t segmentNum, LBID_t& lbid, int& allocdSize) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("createDictStoreExtent"); TRACER_ADDINPUT(oid); TRACER_ADDSHORTINPUT(dbRoot); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINT64OUTPUT(lbid); TRACER_ADDOUTPUT(allocdSize); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint32_t tmp32; uint64_t tmp64; command << CREATE_DICT_STORE_EXTENT << (ByteStream::quadbyte) oid << dbRoot << partitionNum << segmentNum; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; try { response >> err; if (err != 0) return (int) err; response >> tmp64; lbid = (int64_t)tmp64; response >> tmp32; allocdSize = (int32_t)tmp32; } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } CHECK_EMPTY(response); return 0; } //------------------------------------------------------------------------------ // Send a request to delete a set extents for the specified column OID and // DBRoot, and to return the extents to the free list. HWMs for the last // stripe of extents in the specified DBRoot are updated accordingly. 
//------------------------------------------------------------------------------ int DBRM::rollbackColumnExtents_DBroot(OID_t oid, bool bDeleteAll, uint16_t dbRoot, uint32_t partitionNum, uint16_t segmentNum, HWM_t hwm) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("rollbackColumnExtents"); TRACER_ADDINPUT(oid); TRACER_ADDBOOLINPUT(bDeleteAll); TRACER_ADDSHORTINPUT(dbRoot); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINPUT(hwm); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << ROLLBACK_COLUMN_EXTENTS_DBROOT << (ByteStream::quadbyte) oid << (uint8_t)bDeleteAll << dbRoot << partitionNum << segmentNum << hwm; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Send a request to delete a set of extents for the specified dictionary store // OID and DBRoot, and to return the extents to the free list. HWMs for the // last stripe of extents are updated accordingly. 
//------------------------------------------------------------------------------ int DBRM::rollbackDictStoreExtents_DBroot(OID_t oid, uint16_t dbRoot, uint32_t partitionNum, const vector& segNums, const vector& hwms) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("rollbackDictStoreExtents"); TRACER_ADDINPUT(oid); TRACER_ADDSHORTINPUT(dbRoot); TRACER_ADDINPUT(partitionNum); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << ROLLBACK_DICT_STORE_EXTENTS_DBROOT << (ByteStream::quadbyte) oid << dbRoot << partitionNum; serializeVector(command, segNums); serializeVector(command, hwms); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::deleteEmptyColExtents(const std::vector& extentsInfo) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("deleteEmptyColExtents"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint32_t size = extentsInfo.size(); command << DELETE_EMPTY_COL_EXTENTS; command << size; for ( unsigned i = 0; i < extentsInfo.size(); i++) { command << (ByteStream::quadbyte) extentsInfo[i].oid; command << extentsInfo[i].partitionNum; command << extentsInfo[i].segmentNum; command << extentsInfo[i].dbRoot; command << extentsInfo[i].hwm; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::deleteEmptyDictStoreExtents(const std::vector& extentsInfo) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("deleteEmptyDictStoreExtents"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint32_t size = extentsInfo.size(); command << DELETE_EMPTY_DICT_STORE_EXTENTS; command << size; for ( unsigned i = 0; i < extentsInfo.size(); i++) { command << (ByteStream::quadbyte) extentsInfo[i].oid; command << extentsInfo[i].partitionNum; 
command << extentsInfo[i].segmentNum; command << extentsInfo[i].dbRoot; command << extentsInfo[i].hwm; command << (uint8_t)extentsInfo[i].newFile; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::deleteOID(OID_t oid) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("deleteOID"); TRACER_ADDINPUT(oid); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << DELETE_OID << (ByteStream::quadbyte) oid; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); try { deleteAISequence(oid); } catch (...) { } // an error here means a network problem, will be caught elsewhere return err; } int DBRM::deleteOIDs(const std::vector& oids) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("deleteOIDs"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; uint32_t size = oids.size(); command << DELETE_OIDS; command << size; for ( unsigned i = 0; i < oids.size(); i++) { command << (ByteStream::quadbyte) oids[i]; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); try { for (uint32_t i = 0; i < oids.size(); i++) deleteAISequence(oids[i]); } catch (...) { } // an error here means a network problem, will be caught elsewhere return err; } //------------------------------------------------------------------------------ // Return the last local HWM for the specified OID and DBroot. The corresponding // partition number, and segment number are returned as well. This function // can be used by cpimport for example to find out where the current "end-of- // data" is, so that cpimport will know where to begin adding new rows. 
// If no available or outOfService extent is found, then bFound is returned // as false. //------------------------------------------------------------------------------ int DBRM::getLastHWM_DBroot(int oid, uint16_t dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, HWM_t& hwm, int& status, bool& bFound) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getLastHWM_DBroot"); TRACER_ADDINPUT(oid); TRACER_ADDSHORTOUTPUT(dbRoot); TRACER_ADDOUTPUT(partitionNum); TRACER_ADDSHORTOUTPUT(segmentNum); TRACER_ADDOUTPUT(hwm); TRACER_ADDOUTPUT(status); TRACER_WRITE; } #endif try { hwm = em->getLastHWM_DBroot(oid, dbRoot, partitionNum, segmentNum, status, bFound); } catch (exception& e) { return ERR_FAILURE; } return ERR_OK; } //------------------------------------------------------------------------------ // Return the HWM for the specified OID, partition number, and segment number. // This is used to get the HWM for a particular dictionary segment store file, // or a specific column segment file. //------------------------------------------------------------------------------ int DBRM::getLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, HWM_t& hwm, int& status) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getLocalHWM"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDOUTPUT(hwm); TRACER_ADDOUTPUT(status); TRACER_WRITE; } #endif try { hwm = em->getLocalHWM(oid, partitionNum, segmentNum, status); } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } return ERR_OK; } //------------------------------------------------------------------------------ // Set the local HWM for the file referenced by the specified OID, partition // number, and segment number. 
//------------------------------------------------------------------------------ int DBRM::setLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, HWM_t hwm) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("setLocalHWM"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDINPUT(hwm); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << SET_LOCAL_HWM << (ByteStream::quadbyte) oid << partitionNum << segmentNum << hwm; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::bulkSetHWM(const vector& v, VER_t transID) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("bulkSetHWM"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << BULK_SET_HWM; serializeInlineVector(command, v); command << (uint32_t) transID; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::bulkSetHWMAndCP(const vector& v, const vector& setCPDataArgs, const vector& mergeCPDataArgs, VER_t transID) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("bulkSetHWMAndCP"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << BULK_SET_HWM_AND_CP; serializeInlineVector(command, v); serializeInlineVector(command, setCPDataArgs); serializeInlineVector(command, mergeCPDataArgs); command << (uint32_t) transID; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::bulkUpdateDBRoot(const vector& args) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("bulkUpdateDBRoot"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << BULK_UPDATE_DBROOT; 
serializeInlineVector(command, args); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // For the specified OID and PM number, this function will return a vector // of objects carrying HWM info (for the last segment file) and block count // information about each DBRoot assigned to the specified PM. //------------------------------------------------------------------------------ int DBRM::getDbRootHWMInfo(OID_t oid, uint16_t pmNumber, EmDbRootHWMInfo_v& emDBRootHwmInfos) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getDbRootHWMInfo"); TRACER_ADDINPUT(oid); TRACER_ADDSHORTINPUT(pmNumber); TRACER_WRITE; } #endif try { em->getDbRootHWMInfo(oid, pmNumber, emDBRootHwmInfos); } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } return ERR_OK; } //------------------------------------------------------------------------------ // Return the status or state of the extents in the segment file specified // by the arguments: oid, partitionNum, and segment Num. 
//------------------------------------------------------------------------------ int DBRM::getExtentState(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, bool& bFound, int& status) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getExtentState"); TRACER_ADDINPUT(oid); TRACER_ADDINPUT(partitionNum); TRACER_ADDSHORTINPUT(segmentNum); TRACER_ADDOUTPUT(status); TRACER_WRITE; } #endif try { em->getExtentState(oid, partitionNum, segmentNum, bFound, status); } catch (exception& e) { cerr << e.what() << endl; return ERR_FAILURE; } return ERR_OK; } // dmc-should eventually deprecate int DBRM::getExtentSize() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("getExtentSize"); #endif return em->getExtentSize(); } unsigned DBRM::getExtentRows() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("getExtentRows"); #endif return em->getExtentRows(); } int DBRM::getExtents(int OID, std::vector& entries, bool sorted, bool notFoundErr, bool incOutOfService) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getExtents"); TRACER_ADDINPUT(OID); TRACER_WRITE; } #endif try { em->getExtents(OID, entries, sorted, notFoundErr, incOutOfService); } catch (exception& e) { cerr << e.what() << endl; return -1; } return 0; } int DBRM::getExtents_dbroot(int OID, std::vector& entries, const uint16_t dbroot) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getExtents_dbroot"); TRACER_ADDINPUT(OID); TRACER_WRITE; } #endif try { em->getExtents_dbroot(OID, entries, dbroot); } catch (exception& e) { cerr << e.what() << endl; return -1; } return 0; } //------------------------------------------------------------------------------ // Return the number of extents for the specified OID and DBRoot. // Any out-of-service extents can optionally be included or excluded. 
//------------------------------------------------------------------------------ int DBRM::getExtentCount_dbroot(int OID, uint16_t dbroot, bool incOutOfService, uint64_t& numExtents) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getExtentCount_dbroot"); TRACER_ADDINPUT(OID); TRACER_WRITE; } #endif try { em->getExtentCount_dbroot(OID, dbroot, incOutOfService, numExtents); } catch (exception& e) { cerr << e.what() << endl; return -1; } return 0; } //------------------------------------------------------------------------------ // Gets the DBRoot for the specified system catalog OID. // Function assumes the specified System Catalog OID is fully contained on // a single DBRoot, as the function only searches for and returns the first // DBRoot entry that is found in the extent map. //------------------------------------------------------------------------------ int DBRM::getSysCatDBRoot(OID_t oid, uint16_t& dbRoot) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getSysCatDBRoot"); TRACER_ADDINPUT(oid); TRACER_ADDSHORTOUTPUT(dbRoot); TRACER_WRITE; } #endif try { em->getSysCatDBRoot(oid, dbRoot); } catch (exception& e) { cerr << e.what() << endl; return -1; } return 0; } //------------------------------------------------------------------------------ // Delete all extents for the specified OID(s) and partition number. 
//------------------------------------------------------------------------------ int DBRM::deletePartition(const std::vector& oids, const std::set& partitionNums, string& emsg) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITENOW("deletePartition"); std::ostringstream oss; oss << "partitionNum: "; std::set::const_iterator partIt; for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt) oss << (*partIt) << " "; oss << "; OIDS: "; std::vector::const_iterator it; for (it = oids.begin(); it != oids.end(); ++it) { oss << (*it) << ", "; } TRACER_WRITEDIRECT(oss.str()); } #endif ByteStream command, response; uint8_t err; command << DELETE_PARTITION; serializeSet(command, partitionNums); uint32_t oidSize = oids.size(); command << oidSize; for ( unsigned i = 0; i < oidSize; i++) command << (ByteStream::quadbyte) oids[i]; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; if (err != 0) response >> emsg; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Mark all extents as out of service, for the specified OID(s) and partition // number. 
//------------------------------------------------------------------------------ int DBRM::markPartitionForDeletion(const std::vector& oids, const std::set& partitionNums, string& emsg) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITENOW("markPartitionForDeletion"); std::ostringstream oss; oss << "partitionNum: "; std::set::const_iterator partIt; for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt) oss << (*partIt) << " "; oss << "; OIDS: "; std::vector::const_iterator it; for (it = oids.begin(); it != oids.end(); ++it) { oss << (*it) << ", "; } TRACER_WRITEDIRECT(oss.str()); } #endif ByteStream command, response; uint8_t err; command << MARK_PARTITION_FOR_DELETION; serializeSet(command, partitionNums); uint32_t oidSize = oids.size(); command << oidSize; for ( unsigned i = 0; i < oidSize; i++) command << (ByteStream::quadbyte) oids[i]; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; if (err) response >> emsg; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Mark all extents as out of service, for the specified OID(s) //------------------------------------------------------------------------------ int DBRM::markAllPartitionForDeletion(const std::vector& oids) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITENOW("markAllPartitionForDeletion"); std::ostringstream oss; oss << "OIDS: "; std::vector::const_iterator it; for (it = oids.begin(); it != oids.end(); ++it) { oss << (*it) << ", "; } TRACER_WRITEDIRECT(oss.str()); } #endif ByteStream command, response; uint8_t err; uint32_t size = oids.size(); command << MARK_ALL_PARTITION_FOR_DELETION << size; for ( unsigned i = 0; i < size; i++) { command << (ByteStream::quadbyte) oids[i]; } err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; 
CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Restore all extents for the specified OID(s) and partition number. //------------------------------------------------------------------------------ int DBRM::restorePartition(const std::vector& oids, const std::set& partitionNums, string& emsg) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITENOW("restorePartition"); std::ostringstream oss; oss << "partitionNum: "; std::set::const_iterator partIt; for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt) oss << (*partIt) << " "; oss << "; OIDS: "; std::vector::const_iterator it; for (it = oids.begin(); it != oids.end(); ++it) { oss << (*it) << ", "; } TRACER_WRITEDIRECT(oss.str()); } #endif ByteStream command, response; uint8_t err; command << RESTORE_PARTITION; serializeSet(command, partitionNums); uint32_t oidSize = oids.size(); command << oidSize; for ( unsigned i = 0; i < oidSize; i++) command << (ByteStream::quadbyte) oids[i]; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; if (err) response >> emsg; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Return all the out-of-service partitions for the specified OID. 
//------------------------------------------------------------------------------ int DBRM::getOutOfServicePartitions(OID_t oid, std::set& partitionNums) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getOutOfServicePartitions"); TRACER_ADDINPUT(oid); TRACER_WRITE; } #endif try { em->getOutOfServicePartitions(oid, partitionNums); } catch (exception& e) { cerr << e.what() << endl; return -1; } return ERR_OK; } //------------------------------------------------------------------------------ // Delete all extents for the specified DBRoot //------------------------------------------------------------------------------ int DBRM::deleteDBRoot(uint16_t dbroot) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITENOW("deleteDBRoot"); std::ostringstream oss; oss << "DBRoot: " << dbroot; TRACER_WRITEDIRECT(oss.str()); } #endif ByteStream command, response; uint8_t err; command << DELETE_DBROOT; uint32_t q = static_cast(dbroot); command << q; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } //------------------------------------------------------------------------------ // Does the specified DBRoot have any extents. // Returns an error if extentmap shared memory is not loaded. 
//------------------------------------------------------------------------------ int DBRM::isDBRootEmpty(uint16_t dbroot, bool& isEmpty, std::string& errMsg) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("isDBRootEmpty"); TRACER_ADDINPUT(dbroot); TRACER_WRITE; } #endif errMsg.clear(); try { isEmpty = em->isDBRootEmpty(dbroot); } catch (exception& e) { cerr << e.what() << endl; errMsg = e.what(); return ERR_FAILURE; } return ERR_OK; } int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("writeVBEntry"); TRACER_ADDINPUT(transID); TRACER_ADDINPUT(lbid); TRACER_ADDINPUT(vbOID); TRACER_ADDINPUT(vbFBO); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << WRITE_VB_ENTRY << (uint32_t) transID << (uint64_t) lbid << (uint32_t) vbOID << vbFBO; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::bulkWriteVBEntry(VER_t transID, const std::vector& lbids, OID_t vbOID, const std::vector& vbFBOs) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("bulkWriteVBEntry"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << BULK_WRITE_VB_ENTRY << (uint32_t) transID; serializeInlineVector(command, lbids); command << (uint32_t) vbOID; serializeInlineVector(command, vbFBOs); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } struct _entry { _entry(LBID_t l) : lbid(l) { }; LBID_t lbid; inline bool operator<(const _entry& e) const { return ((e.lbid >> 10) < (lbid >> 10)); } }; int DBRM::getDBRootsForRollback(VER_t transID, vector* dbroots) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getDBRootsForRollback"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif bool locked[2] = {false, 
false}; set vbOIDs; set::iterator vbIt; vector lbidList; uint32_t i, size; uint32_t tmp32; OID_t vbOID; int err; set<_entry> lbidPruner; set<_entry>::iterator it; try { vbbm->lock(VBBM::READ); locked[0] = true; vss->lock(VSS::READ); locked[1] = true; vss->getUncommittedLBIDs(transID, lbidList); // prune the list; will leave at most 1 entry per 1024-lbid range for (i = 0, size = lbidList.size(); i < size; i++) lbidPruner.insert(_entry(lbidList[i])); // get the VB oids for (it = lbidPruner.begin(); it != lbidPruner.end(); ++it) { err = vbbm->lookup(it->lbid, transID, vbOID, tmp32); if (err) // this error will be caught by DML; more appropriate to handle it there continue; vbOIDs.insert(vbOID); } // get the dbroots for (vbIt = vbOIDs.begin(); vbIt != vbOIDs.end(); ++vbIt) { err = getDBRootOfVBOID(*vbIt); if (err) { ostringstream os; os << "DBRM::getDBRootOfVBOID() returned an error looking for vbOID " << *vbIt; log(os.str()); return ERR_FAILURE; } dbroots->push_back((uint16_t) err); } vss->release(VSS::READ); locked[1] = false; vbbm->release(VBBM::READ); locked[0] = false; return ERR_OK; } catch (exception& e) { if (locked[0]) vbbm->release(VBBM::READ); if (locked[1]) vss->release(VSS::READ); return -1; } } int DBRM::getUncommittedLBIDs(VER_t transID, vector& lbidList) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getUncommittedLBIDs"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif bool locked = false; try { vss->lock(VSS::READ); locked = true; vss->getUncommittedLBIDs(transID, lbidList); vss->release(VSS::READ); locked = false; return 0; } catch (exception& e) { if (locked) vss->release(VSS::READ); return -1; } } // @bug 1509. New function that returns one LBID per extent touched as part of the transaction. Used to get a list // of blocks to use for updating casual partitioning when the transaction is committed. 
int DBRM::getUncommittedExtentLBIDs(VER_t transID, vector& lbidList) throw() { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getUncommittedExtentLBIDs"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif bool locked = false; vector::iterator lbidIt; typedef pair range_t; range_t range; vector ranges; vector::iterator rangeIt; try { vss->lock(VSS::READ); locked = true; // Get a full list of uncommitted LBIDs related to this transactin. vss->getUncommittedLBIDs(transID, lbidList); vss->release(VSS::READ); locked = false; if (lbidList.size() > 0) { // Sort the vector. std::sort::iterator>(lbidList.begin(), lbidList.end()); // Get the LBID range for the first block in the list. lbidIt = lbidList.begin(); if (em->lookup(*lbidIt, range.first, range.second) < 0) { return -1; } ranges.push_back(range); // Loop through the LBIDs and add the new ranges. ++lbidIt; while (lbidIt != lbidList.end()) { if (*lbidIt > range.second) { if (em->lookup(*lbidIt, range.first, range.second) < 0) { return -1; } ranges.push_back(range); } ++lbidIt; } // Reset the lbidList and return only the first LBID in each extent that was changed // in the transaction. 
lbidList.clear(); for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++) { lbidList.push_back(rangeIt->first); } } return 0; } catch (exception& e) { if (locked) vss->release(VSS::READ); return -1; } } int DBRM::beginVBCopy(VER_t transID, uint16_t dbRoot, const LBIDRange_v& ranges, VBRange_v& freeList) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("beginVBCopy"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << BEGIN_VB_COPY << (ByteStream::quadbyte) transID << dbRoot; serializeVector(command, ranges); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; try { response >> err; if (err != 0) return err; deserializeVector(response, freeList); } catch (exception& e) { cerr << e.what() << endl; return ERR_NETWORK; } CHECK_EMPTY(response); return 0; } int DBRM::endVBCopy(VER_t transID, const LBIDRange_v& ranges) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("endVBCopy"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << END_VB_COPY << (ByteStream::quadbyte) transID; serializeVector(command, ranges); err = send_recv(command, response); if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::vbCommit(VER_t transID) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("vbCommit"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << VB_COMMIT << (ByteStream::quadbyte) transID; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::vbRollback(VER_t transID, const LBIDRange_v& lbidList) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("vbRollback "); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif ByteStream command, 
response; uint8_t err; command << VB_ROLLBACK1 << (ByteStream::quadbyte) transID; serializeVector(command, lbidList); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::vbRollback(VER_t transID, const vector& lbidList) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("vbRollback"); TRACER_ADDINPUT(transID); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << VB_ROLLBACK2 << (ByteStream::quadbyte) transID; serializeVector(command, lbidList); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::halt() DBRM_THROW { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("halt"); #endif ByteStream command, response; uint8_t err; command << HALT; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::resume() DBRM_THROW { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("resume"); #endif ByteStream command, response; uint8_t err; command << RESUME; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::forceReload() DBRM_THROW { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("forceReload"); #endif ByteStream command, response; uint8_t err; command << RELOAD; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::setReadOnly(bool b) DBRM_THROW { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("setReadOnly"); TRACER_ADDBOOLINPUT(b); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << (b ? 
SETREADONLY : SETREADWRITE); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::isReadWrite() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("isReadWrite"); #endif ByteStream command, response; uint8_t err; command << GETREADONLY; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; //CHECK_EMPTY(response); return (err == 0 ? ERR_OK : ERR_READONLY); } int DBRM::dmlLockLBIDRanges(const vector& ranges, int txnID) { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("clear"); #endif ByteStream command, response; uint8_t err; command << LOCK_LBID_RANGES; serializeVector(command, ranges); command << (uint32_t) txnID; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::dmlReleaseLBIDRanges(const vector& ranges) { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("clear"); #endif ByteStream command, response; uint8_t err; command << RELEASE_LBID_RANGES; serializeVector(command, ranges); err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::clear() DBRM_THROW { ByteStream command, response; uint8_t err; command << BRM_CLEAR; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() != 1) return ERR_NETWORK; response >> err; CHECK_EMPTY(response); return err; } int DBRM::checkConsistency() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("checkConsistency"); #endif bool locked[2] = {false, false}; try { em->checkConsistency(); } catch (exception& e) { cerr << e.what() << endl; return -1; } try { vbbm->lock(VBBM::READ); locked[0] = true; vss->lock(VSS::READ); locked[1] = true; 
vss->checkConsistency(*vbbm, *em); vss->release(VSS::READ); locked[1] = false; vbbm->release(VBBM::READ); locked[0] = false; } catch (exception& e) { cerr << e.what() << endl; if (locked[1]) vss->release(VSS::READ); if (locked[0]) vbbm->release(VBBM::READ); return -1; } try { vbbm->lock(VBBM::READ); vbbm->checkConsistency(); vbbm->release(VBBM::READ); } catch (exception& e) { cerr << e.what() << endl; vbbm->release(VBBM::READ); return -1; } return 0; } int DBRM::getCurrentTxnIDs(set& txnList) throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("getCurrentTxnIDs"); #endif bool locked[2] = { false, false }; try { txnList.clear(); vss->lock(VSS::READ); locked[0] = true; copylocks->lock(CopyLocks::READ); locked[1] = true; copylocks->getCurrentTxnIDs(txnList); vss->getCurrentTxnIDs(txnList); copylocks->release(CopyLocks::READ); locked[1] = false; vss->release(VSS::READ); locked[0] = false; } catch (exception& e) { if (locked[1]) copylocks->release(CopyLocks::READ); if (locked[0]) vss->release(VSS::READ); cerr << e.what() << endl; return -1; } return 0; } const QueryContext DBRM::verID() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("verID"); #endif ByteStream command, response; uint8_t err; QueryContext ret; command << VER_ID; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: SessionManager::verID(): network error" << endl; ret.currentScn = -1; return ret; } try { response >> err; response >> ret; CHECK_EMPTY(response); } catch (exception& e) { cerr << "DBRM: SessionManager::verID(): bad response" << endl; log("DBRM: SessionManager::verID(): bad response", logging::LOG_TYPE_WARNING); ret.currentScn = -1; } return ret; } const QueryContext DBRM::sysCatVerID() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("sysCatVerID"); #endif ByteStream command, response; uint8_t err; QueryContext ret; command << SYSCAT_VER_ID; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: SessionManager::sysCatVerID(): network error" << endl; 
ret.currentScn = -1; return ret; } try { response >> err; response >> ret; CHECK_EMPTY(response); } catch (exception& e) { cerr << "DBRM: SessionManager::sysCatVerID(): bad response" << endl; log("DBRM: SessionManager::sysCatVerID(): bad response", logging::LOG_TYPE_WARNING); ret.currentScn = -1; } return ret; } const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("newTxnID"); TRACER_ADDINPUT(session); TRACER_ADDBOOLINPUT(block); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err, tmp; uint32_t tmp32; TxnID ret; command << NEW_TXN_ID << session << (uint8_t) block << (uint8_t)isDDL; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: SessionManager::newTxnID(): network error"); ret.valid = false; return ret; } if (response.length() != 6) { log("DBRM: SessionManager::newTxnID(): bad response"); ret.valid = false; return ret; } response >> err; response >> tmp32; ret.id = tmp32; response >> tmp; ret.valid = (tmp == 0 ? 
false : true); CHECK_EMPTY(response); return ret; } void DBRM::committed(TxnID& txnid) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("committed"); TRACER_ADDINPUT(txnid); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << COMMITTED << (uint32_t) txnid.id << (uint8_t) txnid.valid; err = send_recv(command, response); txnid.valid = false; if (err != ERR_OK) log("DBRM: error: SessionManager::committed() failed"); else if (response.length() != 1) log("DBRM: error: SessionManager::committed() failed (bad response)", logging::LOG_TYPE_ERROR); response >> err; if (err != ERR_OK) log("DBRM: error: SessionManager::committed() failed (valid error code)", logging::LOG_TYPE_ERROR); } void DBRM::rolledback(TxnID& txnid) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("rolledback"); TRACER_ADDINPUT(txnid); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err, tmp; command << ROLLED_BACK << (uint32_t) txnid.id << (uint8_t) txnid.valid; err = send_recv(command, response); txnid.valid = false; if (err != ERR_OK) log("DBRM: error: SessionManager::rolledback() failed (network)"); else if (response.length() != 1) log("DBRM: error: SessionManager::rolledback() failed (bad response)", logging::LOG_TYPE_ERROR); response >> tmp; if (tmp != ERR_OK) { if (getSystemReady() != 0 ) log("DBRM: error: SessionManager::rolledback() failed (valid error code)", logging::LOG_TYPE_ERROR); } } int DBRM::getUnlockedLBIDs(BlockList_t* list) DBRM_THROW { bool locked = false; list->clear(); try { vss->lock(VSS::READ); locked = true; vss->getUnlockedLBIDs(*list); vss->release(VSS::READ); locked = false; return 0; } catch (exception& e) { if (locked) vss->release(VSS::READ); cerr << e.what() << endl; return -1; } } const TxnID DBRM::getTxnID (const SessionManagerServer::SID session) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getTxnID"); TRACER_ADDINPUT(session); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err, tmp8; uint32_t tmp32; TxnID 
ret; command << GET_TXN_ID << (uint32_t) session; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: error: SessionManager::getTxnID() failed (network)", logging::LOG_TYPE_ERROR); ret.valid = false; return ret; } response >> err; if (err != ERR_OK) { log("DBRM: error: SessionManager::getTxnID() failed (got an error)", logging::LOG_TYPE_ERROR); ret.valid = false; return ret; } response >> tmp32 >> tmp8; ret.id = tmp32; ret.valid = tmp8; return ret; } boost::shared_array DBRM::SIDTIDMap(int& len) { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("SIDTIDMap"); TRACER_ADDOUTPUT(len); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err, tmp8; uint32_t tmp32; int i; boost::shared_array ret; command << SID_TID_MAP; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: error: SessionManager::SIDTIDEntry() failed (network)"); return ret; } response >> err; if (err != ERR_OK) { log("DBRM: error: SessionManager::SIDTIDEntry() failed (valid error code)", logging::LOG_TYPE_ERROR); return ret; } response >> tmp32; len = (int) tmp32; ret.reset(new SIDTIDEntry[len]); for (i = 0; i < len; i++) { response >> tmp32 >> tmp8; ret[i].txnid.id = tmp32; ret[i].txnid.valid = (tmp8 == 0 ? false : true); response >> tmp32; ret[i].sessionid = tmp32; } CHECK_EMPTY(response); return ret; } uint32_t DBRM::getUnique32() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("getUnique32"); #endif ByteStream command, response; uint8_t err; uint32_t ret; command << GET_UNIQUE_UINT32; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: getUnique32() failed (network)\n"; log("DBRM: getUnique32() failed (network)", logging::LOG_TYPE_ERROR); throw runtime_error("DBRM: getUnique32() failed check the controllernode"); return 0; } /* Some jobsteps don't need the connection after this so close it to free up resources on the controller node */ /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to remove the client. 
Plus, it may cause weird network issue when the socket is being released and re-established very quickly*/ //pthread_mutex_lock(&mutex); //delete msgClient; //msgClient = NULL; //pthread_mutex_unlock(&mutex); response >> err; if (err != ERR_OK) { cerr << "DBRM: getUnique32() failed (got an error)\n"; log("DBRM: getUnique32() failed (got an error)", logging::LOG_TYPE_ERROR); throw runtime_error("DBRM: getUnique32() failed check the controllernode"); return 0; } response >> ret; // cerr << "DBRM returning " << ret << endl; return ret; } uint64_t DBRM::getUnique64() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("getUnique64"); #endif ByteStream command, response; uint8_t err; uint64_t ret; command << GET_UNIQUE_UINT64; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: getUnique64() failed (network)\n"; log("DBRM: getUnique64() failed (network)", logging::LOG_TYPE_ERROR); throw runtime_error("DBRM: getUnique64() failed check the controllernode"); return 0; } /* Some jobsteps don't need the connection after this so close it to free up resources on the controller node */ /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to remove the client. Plus, it may cause weird network issue when the socket is being released and re-established very quickly*/ //pthread_mutex_lock(&mutex); //delete msgClient; //msgClient = NULL; //pthread_mutex_unlock(&mutex); response >> err; if (err != ERR_OK) { cerr << "DBRM: getUnique64() failed (got an error)\n"; log("DBRM: getUnique64() failed (got an error)", logging::LOG_TYPE_ERROR); throw runtime_error("DBRM: getUnique64() failed check the controllernode"); return 0; } response >> ret; // cerr << "DBRM returning " << ret << endl; return ret; } void DBRM::sessionmanager_reset() { ByteStream command, response; command << SM_RESET; send_recv(command, response); } bool DBRM::isEMEmpty() throw() { bool res = false; try { res = em->empty(); } catch (...) 
{ res = false; } return res; } vector DBRM::getEMFreeListEntries() throw() { vector res; try { res = em->getFreeListEntries(); } catch (...) { res.clear(); } return res; } int DBRM::takeSnapshot() throw () { return 0; // don't know why, but we're calling this all the time. Need to take most/all of those calls out, it's very wasteful. ByteStream command, response; uint8_t err; command << TAKE_SNAPSHOT; err = send_recv(command, response); if (err != ERR_OK) return err; if (response.length() == 0) return ERR_NETWORK; return 0; } int DBRM::getSystemReady() throw() { uint32_t stateFlags; if (getSystemState(stateFlags) < 0) { return -1; } return (stateFlags & SessionManagerServer::SS_READY); } int DBRM::getSystemQueryReady() throw() { uint32_t stateFlags; if (getSystemState(stateFlags) < 0) { return -1; } return (stateFlags & SessionManagerServer::SS_QUERY_READY); } int DBRM::getSystemSuspended() throw() { uint32_t stateFlags; if (getSystemState(stateFlags) < 0) { return -1; } return (stateFlags & SessionManagerServer::SS_SUSPENDED); } int DBRM::getSystemSuspendPending(bool& bRollback) throw() { uint32_t stateFlags; if (getSystemState(stateFlags) < 0) { return -1; } bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK; return (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING); } int DBRM::getSystemShutdownPending(bool& bRollback, bool& bForce) throw() { uint32_t stateFlags; if (getSystemState(stateFlags) < 0) { return -1; } bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK; bForce = stateFlags & SessionManagerServer::SS_FORCE; return (stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING); } int DBRM::setSystemReady(bool bReady) throw() { if (bReady) { return setSystemState(SessionManagerServer::SS_READY); } else { return clearSystemState(SessionManagerServer::SS_READY); } } int DBRM::setSystemQueryReady(bool bReady) throw() { if (bReady) { return setSystemState(SessionManagerServer::SS_QUERY_READY); } else { return 
clearSystemState(SessionManagerServer::SS_QUERY_READY); } } int DBRM::setSystemSuspended(bool bSuspended) throw() { uint32_t stateFlags = 0; if (bSuspended) { if (setSystemState(SessionManagerServer::SS_SUSPENDED) < 0) { return -1; } } else { stateFlags = SessionManagerServer::SS_SUSPENDED; } // In either case, we need to clear the pending and rollback flags stateFlags |= SessionManagerServer::SS_SUSPEND_PENDING; stateFlags |= SessionManagerServer::SS_ROLLBACK; return clearSystemState(stateFlags); } int DBRM::setSystemSuspendPending(bool bPending, bool bRollback) throw() { uint32_t stateFlags = SessionManagerServer::SS_SUSPEND_PENDING; if (bPending) { if (bRollback) { stateFlags |= SessionManagerServer::SS_ROLLBACK; } return setSystemState(stateFlags); } else { stateFlags |= SessionManagerServer::SS_ROLLBACK; return clearSystemState(stateFlags); } } int DBRM::setSystemShutdownPending(bool bPending, bool bRollback, bool bForce) throw() { int rtn = 0; uint32_t stateFlags = SessionManagerServer::SS_SHUTDOWN_PENDING; if (bPending) { if (bForce) { stateFlags |= SessionManagerServer::SS_FORCE; } else if (bRollback) { stateFlags |= SessionManagerServer::SS_ROLLBACK; } rtn = setSystemState(stateFlags); } else { stateFlags |= SessionManagerServer::SS_ROLLBACK; stateFlags |= SessionManagerServer::SS_FORCE; rtn = clearSystemState(stateFlags); // Clears the flags that are turned on in stateFlags } return rtn; } /* Return the shm stateflags */ int DBRM::getSystemState(uint32_t& stateFlags) throw() { try { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("getSystemState"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << GET_SYSTEM_STATE; err = send_recv(command, response); if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: SessionManager::getSystemState() failed (network)"; log(oss.str(), logging::LOG_TYPE_ERROR); return -1; } response >> err; if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: 
SessionManager::getSystemState() failed (error " << err << ")"; log(oss.str(), logging::LOG_TYPE_ERROR); return -1; } response >> stateFlags; return 1; } catch (...) { } return -1; } /* Set the shm stateflags that are set in the parameter */ int DBRM::setSystemState(uint32_t stateFlags) throw() { try { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("setSystemState"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << SET_SYSTEM_STATE << static_cast(stateFlags); err = send_recv(command, response); if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: SessionManager::setSystemState() failed (network)"; log(oss.str(), logging::LOG_TYPE_ERROR); stateFlags = 0; return -1; } response >> err; if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: SessionManager::setSystemState() failed (got an error)"; log(oss.str(), logging::LOG_TYPE_ERROR); stateFlags = 0; return -1; } return 1; } catch (...) { } stateFlags = 0; return -1; } /* Clear the shm stateflags that are set in the parameter */ int DBRM::clearSystemState(uint32_t stateFlags) throw() { try { #ifdef BRM_INFO if (fDebug) { TRACER_WRITELATER("clearSystemState"); TRACER_WRITE; } #endif ByteStream command, response; uint8_t err; command << CLEAR_SYSTEM_STATE << static_cast(stateFlags); err = send_recv(command, response); if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: SessionManager::clearSystemState() failed (network)"; log(oss.str(), logging::LOG_TYPE_ERROR); return -1; } response >> err; if (err != ERR_OK) { std::ostringstream oss; oss << "DBRM: error: SessionManager::clearSystemState() failed (got an error)"; log(oss.str(), logging::LOG_TYPE_ERROR); return -1; } return 1; } catch (...) { } return -1; } /* Ping the controller node. Don't print anything. 
*/ bool DBRM::isDBRMReady() throw() { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("isDBRMReady"); #endif boost::mutex::scoped_lock scoped(mutex); try { for (int attempt = 0; attempt < 2; ++attempt) { try { if (msgClient == NULL) { msgClient = MessageQueueClientPool::getInstance(masterName); } if (msgClient->connect()) { return true; } } catch (...) { } MessageQueueClientPool::releaseInstance(msgClient); msgClient = NULL; sleep(1); } } catch (...) { } return false; } /* This waits for the lock up to 30 sec. After 30 sec, the assumption is something * bad happened, and this will fix the lock state so that primproc can keep * running. These prevent a non-critical problem anyway. */ void DBRM::lockLBIDRange(LBID_t start, uint32_t count) { bool locked = false, lockedRange = false; LBIDRange range; const uint32_t waitInterval = 50000; // in usec const uint32_t maxRetries = 30000000 / waitInterval; // 30 secs uint32_t retries = 0; range.start = start; range.size = count; try { copylocks->lock(CopyLocks::WRITE); locked = true; while (copylocks->isLocked(range) && retries < maxRetries) { copylocks->release(CopyLocks::WRITE); locked = false; usleep(waitInterval); retries++; copylocks->lock(CopyLocks::WRITE); locked = true; } if (retries >= maxRetries) copylocks->forceRelease(range); copylocks->lockRange(range, -1); lockedRange = true; copylocks->confirmChanges(); copylocks->release(CopyLocks::WRITE); locked = false; } catch (...) { if (lockedRange) copylocks->releaseRange(range); if (locked) { copylocks->confirmChanges(); copylocks->release(CopyLocks::WRITE); } throw; } } void DBRM::releaseLBIDRange(LBID_t start, uint32_t count) { bool locked = false; LBIDRange range; range.start = start; range.size = count; try { copylocks->lock(CopyLocks::WRITE); locked = true; copylocks->releaseRange(range); copylocks->confirmChanges(); copylocks->release(CopyLocks::WRITE); locked = false; } catch (...) 
{ if (locked) { copylocks->confirmChanges(); copylocks->release(CopyLocks::WRITE); } throw; } } /* OID Manager section */ int DBRM::allocOIDs(int num) { #ifdef BRM_INFO if (fDebug) TRACER_WRITENOW("allocOID"); #endif ByteStream command, response; uint8_t err; uint32_t ret; command << ALLOC_OIDS; command << (uint32_t) num; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: OIDManager::allocOIDs(): network error" << endl; log("DBRM: OIDManager::allocOIDs(): network error", logging::LOG_TYPE_CRITICAL); return -1; } try { response >> err; if (err != ERR_OK) return -1; response >> ret; CHECK_EMPTY(response); return (int) ret; } catch (...) { log("DBRM: OIDManager::allocOIDs(): bad response", logging::LOG_TYPE_CRITICAL); return -1; } } void DBRM::returnOIDs(int start, int end) { ByteStream command, response; uint8_t err; command << RETURN_OIDS; command << (uint32_t) start; command << (uint32_t) end; err = send_recv(command, response); if (err == ERR_NETWORK) { cerr << "DBRM: OIDManager::returnOIDs(): network error" << endl; log("DBRM: OIDManager::returnOIDs(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::returnOIDs(): network error"); } try { response >> err; CHECK_EMPTY(response); } catch (...) { err = ERR_FAILURE; } if (err != ERR_OK) { log("DBRM: OIDManager::returnOIDs() failed", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::returnOIDs() failed"); } } int DBRM::oidm_size() { ByteStream command, response; uint8_t err; uint32_t ret; command << OIDM_SIZE; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: OIDManager::size(): network error" << endl; log("DBRM: OIDManager::size(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::size(): network error"); } try { response >> err; if (err == ERR_OK) { response >> ret; CHECK_EMPTY(response); return ret; } CHECK_EMPTY(response); return -1; } catch (...) 
{ log("DBRM: OIDManager::size(): bad response", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::size(): bad response"); } } int DBRM::allocVBOID(uint32_t dbroot) { ByteStream command, response; uint8_t err; uint32_t ret; command << ALLOC_VBOID << (uint32_t) dbroot; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: OIDManager::allocVBOID(): network error" << endl; log("DBRM: OIDManager::allocVBOID(): network error", logging::LOG_TYPE_CRITICAL); return -1; } try { response >> err; if (err == ERR_OK) { response >> ret; CHECK_EMPTY(response); return ret; } CHECK_EMPTY(response); return -1; } catch (...) { log("DBRM: OIDManager::allocVBOID(): bad response", logging::LOG_TYPE_CRITICAL); return -1; } } int DBRM::getDBRootOfVBOID(uint32_t vbOID) { ByteStream command, response; uint8_t err; uint32_t ret; command << GETDBROOTOFVBOID << (uint32_t) vbOID; err = send_recv(command, response); if (err != ERR_OK) { cerr << "DBRM: OIDManager::getDBRootOfVBOID(): network error" << endl; log("DBRM: OIDManager::getDBRootOfVBOID(): network error", logging::LOG_TYPE_CRITICAL); return -1; } try { response >> err; if (err == ERR_OK) { response >> ret; CHECK_EMPTY(response); return (int) ret; } CHECK_EMPTY(response); return -1; } catch (...) 
{ log("DBRM: OIDManager::getDBRootOfVBOID(): bad response", logging::LOG_TYPE_CRITICAL); return -1; } } vector DBRM::getVBOIDToDBRootMap() { ByteStream command, response; uint8_t err; vector ret; command << GETVBOIDTODBROOTMAP; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: OIDManager::getVBOIDToDBRootMap(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): network error"); } try { response >> err; if (err != ERR_OK) { log("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error"); } deserializeInlineVector(response, ret); CHECK_EMPTY(response); return ret; } catch (...) { log("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response"); } } uint64_t DBRM::getTableLock(const vector& pmList, uint32_t tableOID, string* ownerName, uint32_t* ownerPID, int32_t* ownerSessionID, int32_t* ownerTxnID, LockState state) { ByteStream command, response; uint8_t err; uint64_t ret; TableLockInfo tli; uint32_t tmp32; vector dbRootsList; OamCache* oamcache = OamCache::makeOamCache(); OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap(); int moduleId = 0; for (uint32_t i = 0; i < pmList.size(); i++) { moduleId = pmList[i]; vector dbroots = (*pmDbroots)[moduleId]; for (uint32_t j = 0; j < dbroots.size(); j++) dbRootsList.push_back((uint32_t)dbroots[j]); } tli.id = 0; tli.ownerName = *ownerName; tli.ownerPID = *ownerPID; tli.ownerSessionID = *ownerSessionID; tli.ownerTxnID = *ownerTxnID; tli.dbrootList = dbRootsList; tli.state = state; tli.tableOID = tableOID; tli.creationTime = time(NULL); command << GET_TABLE_LOCK << tli; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: getTableLock(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: 
getTableLock(): network error"); } response >> err; /* TODO: this means a save failure, need a specific exception type */ if (err != ERR_OK) throw runtime_error("Table lock save file failure"); response >> ret; if (ret == 0) { response >> *ownerPID; response >> *ownerName; response >> tmp32; *ownerSessionID = tmp32; response >> tmp32; *ownerTxnID = tmp32; } idbassert(response.length() == 0); return ret; } bool DBRM::releaseTableLock(uint64_t id) { ByteStream command, response; uint8_t err; command << RELEASE_TABLE_LOCK << id; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: releaseTableLock(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: releaseTableLock(): network error"); } response >> err; /* TODO: this means a save failure, need a specific exception type */ if (err != ERR_OK) throw runtime_error("Table lock save file failure"); response >> err; idbassert(response.length() == 0); return (bool) err; } bool DBRM::changeState(uint64_t id, LockState state) { ByteStream command, response; uint8_t err; command << CHANGE_TABLE_LOCK_STATE << id << (uint32_t) state; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: changeState(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: changeState(): network error"); } response >> err; /* TODO: this means a save failure, need a specific exception type */ if (err != ERR_OK) throw runtime_error("Table lock save file failure"); response >> err; idbassert(response.length() == 0); return (bool) err; } bool DBRM::changeOwner(uint64_t id, const string& ownerName, uint32_t ownerPID, int32_t ownerSessionID, int32_t ownerTxnID) { ByteStream command, response; uint8_t err; command << CHANGE_TABLE_LOCK_OWNER << id << ownerName << ownerPID << (uint32_t) ownerSessionID << (uint32_t) ownerTxnID; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: changeOwner(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: 
changeOwner(): network error"); } response >> err; /* TODO: this means a save failure, need a specific exception type */ if (err != ERR_OK) throw runtime_error("Table lock save file failure"); response >> err; idbassert(response.length() == 0); return (bool) err; } bool DBRM::checkOwner(uint64_t id) { ByteStream command, response; uint8_t err; command << OWNER_CHECK << id; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: ownerCheck(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: ownerCheck(): network error"); } response >> err; /* TODO: this means a save failure, need a specific exception type */ if (err != ERR_OK) throw runtime_error("Table lock save file failure"); response >> err; idbassert(response.length() == 0); return (bool) err; // Return true means the owner is valid } vector DBRM::getAllTableLocks() { ByteStream command, response; uint8_t err; vector ret; command << GET_ALL_TABLE_LOCKS; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: getAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAllTableLocks(): network error"); } response >> err; if (err != ERR_OK) { log("DBRM: getAllTableLocks(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAllTableLocks(): processing error"); } deserializeVector(response, ret); idbassert(response.length() == 0); return ret; } void DBRM::releaseAllTableLocks() { ByteStream command, response; uint8_t err; command << RELEASE_ALL_TABLE_LOCKS; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: releaseAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: releaseAllTableLocks(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) throw runtime_error("DBRM: releaseAllTableLocks(): processing error"); } bool DBRM::getTableLockInfo(uint64_t id, TableLockInfo* tli) { ByteStream command, response; uint8_t err; 
command << GET_TABLE_LOCK_INFO << id; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: getTableLockInfo(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getTableLockInfo(): network error"); } response >> err; if (err != ERR_OK) throw runtime_error("DBRM: getTableLockInfo() processing error"); response >> err; if (err) response >> *tli; return (bool) err; } void DBRM::startAISequence(uint32_t OID, uint64_t firstNum, uint32_t colWidth, execplan::CalpontSystemCatalog::ColDataType colDataType) { ByteStream command, response; uint8_t err; uint8_t tmp8 = colDataType; command << START_AI_SEQUENCE << OID << firstNum << colWidth << tmp8; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: startAISequence(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: startAISequence(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) { log("DBRM: startAISequence(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: startAISequence(): processing error"); } } bool DBRM::getAIRange(uint32_t OID, uint32_t count, uint64_t* firstNum) { ByteStream command, response; uint8_t err; command << GET_AI_RANGE << OID << count; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: getAIRange(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAIRange(): network error"); } response >> err; if (err != ERR_OK) { log("DBRM: getAIRange(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAIRange(): processing error"); } response >> err; if (err == 0) return false; response >> *firstNum; idbassert(response.length() == 0); return true; } bool DBRM::getAIValue(uint32_t OID, uint64_t* value) { return getAIRange(OID, 0, value); } void DBRM::resetAISequence(uint32_t OID, uint64_t value) { ByteStream command, response; uint8_t err; command << RESET_AI_SEQUENCE << OID << value; err = 
send_recv(command, response); if (err != ERR_OK) { log("DBRM: resetAISequence(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: resetAISequence(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) { log("DBRM: resetAISequence(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: resetAISequence(): processing error"); } } void DBRM::getAILock(uint32_t OID) { ByteStream command, response; uint8_t err; command << GET_AI_LOCK << OID; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: getAILock(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAILock(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) { log("DBRM: getAILock(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: getAILock(): processing error"); } } void DBRM::releaseAILock(uint32_t OID) { ByteStream command, response; uint8_t err; command << RELEASE_AI_LOCK << OID; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM: releaseAILock(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: releaseAILock(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) { log("DBRM: releaseAILock(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: releaseAILock(): processing error"); } } void DBRM::deleteAISequence(uint32_t OID) { ByteStream command, response; uint8_t err; command << DELETE_AI_SEQUENCE << OID; err = send_recv(command, response); if (err != ERR_OK) { log("DBRM:deleteAILock(): network error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: deleteAILock(): network error"); } response >> err; idbassert(response.length() == 0); if (err != ERR_OK) { log("DBRM: deleteAILock(): processing error", logging::LOG_TYPE_CRITICAL); throw runtime_error("DBRM: deleteAILock(): processing error"); } } void 
DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid, vector* plbidList) { // Here we want to minimize the number of calls to dbrm // Given that, and the fact that we need to know the column type // in order to set the invalid min and max correctly in the extents, // We do the following: // 1) Maintain a vector of all extents we've looked at. // 2) Get the list of uncommitted lbids for the transaction. // 3) Look in that list to see if we've already looked at this extent. // 4) If not, // a) lookup the min and max lbid for the extent it belongs to // b) lookup the column oid for that lbid // c) add to the vector of extents // 5) Create a list of CPInfo structures with the first lbid and col type of each extent // 6) Lookup the column type for each retrieved oid. // 7) mark each extent invalid, just like we would during update. This sets the proper // min and max (and set the state to CP_UPDATING. // 6) Call setExtentsMaxMin to set the state to CP_INVALID. vector localLBIDList; boost::shared_ptr csc; CPInfoList_t cpInfos; CPInfo aInfo; int oid; uint16_t dbRoot; uint32_t partitionNum; uint16_t segmentNum; uint32_t fileBlockOffset; // 2) Get the list of uncommitted lbids for the transaction, if we weren't given one. if (plbidList == NULL) { getUncommittedExtentLBIDs(static_cast(txnid), localLBIDList); plbidList = &localLBIDList; } if (plbidList->size() == 0) { return; // Nothing to do. 
} vector::const_iterator iter = plbidList->begin(); vector::const_iterator end = plbidList->end(); csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(); for (; iter != end; ++iter) { LBID_t lbid = *iter; aInfo.firstLbid = lbid; // lookup the column oid for that lbid (all we care about is oid here) if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0) { if (execplan::isUnsigned(csc->colType(oid).colDataType)) { aInfo.max = 0; aInfo.min = numeric_limits::max(); } else { aInfo.max = numeric_limits::min(); aInfo.min = numeric_limits::max(); } } else { // We have a problem, but we need to put something in. This should never happen. aInfo.max = numeric_limits::min(); aInfo.min = numeric_limits::max(); } aInfo.seqNum = -2; cpInfos.push_back(aInfo); } // Call setExtentsMaxMin to invalidate and set the proper max/min in each extent setExtentsMaxMin(cpInfos); } } //namespace