1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /*****************************************************************************
20  * $Id: slavecomm.cpp 1839 2013-02-01 17:42:03Z pleblanc $
21  *
22  ****************************************************************************/
23 #include <unistd.h>
24 #include <iostream>
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <cstdio>
29 #include <ctime>
30 #ifdef _MSC_VER
31 #include <io.h>
32 #include <psapi.h>
33 #endif
34 
35 #include "messagequeue.h"
36 #include "bytestream.h"
37 #include "socketclosed.h"
38 #include "configcpp.h"
39 #include "IDBDataFile.h"
40 #include "IDBPolicy.h"
41 
42 #define SLAVECOMM_DLLEXPORT
43 #include "slavecomm.h"
44 #undef SLAVECOMM_DLLEXPORT
45 
46 #include "installdir.h"
47 
48 using namespace std;
49 using namespace messageqcpp;
50 using namespace idbdatafile;
51 
52 namespace
53 {
54 #ifdef USE_VERY_COMPLEX_DROP_CACHES
timespec_sub(const struct timespec & tv1,const struct timespec & tv2,double & tm)55 void timespec_sub(const struct timespec& tv1,
56                   const struct timespec& tv2,
57                   double& tm)
58 {
59     tm = (double)(tv2.tv_sec - tv1.tv_sec) + 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);
60 }
61 #endif
62 }
63 
64 namespace BRM
65 {
66 
SlaveComm(string hostname,SlaveDBRMNode * s)67 SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
68     slave(s), currentSaveFile(NULL), journalh(NULL)
69 #ifdef _MSC_VER
70     , fPids(0), fMaxPids(64)
71 #endif
72 {
73     config::Config* config = config::Config::makeConfig();
74     string tmp;
75 
76     bool tellUser = true;
77 
78     for (;;)
79     {
80         try
81         {
82             server = new MessageQueueServer(hostname);
83             break;
84         }
85         catch (runtime_error& re)
86         {
87             string what = re.what();
88 
89             if (what.find("Address already in use") != string::npos)
90             {
91                 if (tellUser)
92                 {
93                     cerr << "Address already in use, retrying..." << endl;
94                     tellUser = false;
95                 }
96 
97                 sleep(5);
98             }
99             else
100             {
101                 throw;
102             }
103         }
104     }
105 
106     string tmpDir = startup::StartUp::tmpDir();
107 
108     /* NOTE: this string has to match whatever is designated as the first slave */
109     if (hostname == "DBRM_Worker1")
110     {
111         try
112         {
113             savefile = config->getConfig("SystemConfig", "DBRMRoot");
114         }
115         catch (exception& e)
116         {
117             savefile = tmpDir + "/BRM_SaveFiles";
118         }
119 
120         if (savefile == "")
121             savefile = tmpDir + "/BRM_SaveFiles";
122 
123         tmp = "";
124 
125         try
126         {
127             tmp = config->getConfig("SystemConfig", "DBRMSnapshotInterval");
128         }
129         catch (exception& e) { }
130 
131         if (tmp == "")
132             snapshotInterval = 100000;
133         else
134             snapshotInterval = config->fromText(tmp);
135 
136         journalCount = 0;
137 
138         firstSlave = true;
139         journalName = savefile + "_journal";
140         const char* filename = journalName.c_str();
141 
142         journalh = IDBDataFile::open(
143                        IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "a", 0);
144         if (journalh == NULL)
145             throw runtime_error("Could not open the BRM journal for writing!");
146     }
147     else
148     {
149         savefile = "";
150         firstSlave = false;
151     }
152 
153     takeSnapshot = false;
154     doSaveDelta = false;
155     saveFileToggle = true;	// start with the suffix "A" rather than "B".  Arbitrary.
156     release = false;
157     die = false;
158     standalone = false;
159     printOnly = false;
160     //@Bug 2258 DBRMTimeOut is default to 20 seconds
161     //@BUG 3189 set timeout to 1 second, don't use config setting
162 //	std::string retStr = config->getConfig("SystemConfig", "DBRMTimeOut");
163 //	int secondsToWait = config->fromText(retStr);
164     MSG_TIMEOUT.tv_nsec = 0;
165 //	if ( secondsToWait > 0 )
166 //		MSG_TIMEOUT.tv_sec = secondsToWait;
167 //	else
168     MSG_TIMEOUT.tv_sec = 1;
169 }
170 
SlaveComm()171 SlaveComm::SlaveComm()
172     : currentSaveFile(NULL), journalh(NULL)
173 #ifdef _MSC_VER
174     , fPids(0), fMaxPids(64)
175 #endif
176 {
177     config::Config* config = config::Config::makeConfig();
178 
179     string tmpDir = startup::StartUp::tmpDir();
180 
181     try
182     {
183         savefile = config->getConfig("SystemConfig", "DBRMRoot");
184     }
185     catch (exception& e)
186     {
187         savefile = tmpDir + "/BRM_SaveFiles";
188     }
189 
190     if (savefile == "")
191         savefile = tmpDir + "/BRM_SaveFiles";
192 
193     journalName = savefile + "_journal";
194 
195     takeSnapshot = false;
196     doSaveDelta = false;
197     saveFileToggle = true;	// start with the suffix "A" rather than "B".  Arbitrary.
198     release = false;
199     die = false;
200     firstSlave = false;
201     server = NULL;
202     standalone = true;
203     printOnly = false;
204     slave = new SlaveDBRMNode();
205 }
206 
~SlaveComm()207 SlaveComm::~SlaveComm()
208 {
209     delete server;
210     server = NULL;
211 
212     if (firstSlave)
213     {
214         delete currentSaveFile;
215         currentSaveFile = NULL;
216     }
217 
218     delete journalh;
219     journalh = NULL;
220 }
221 
stop()222 void SlaveComm::stop()
223 {
224     die = true;
225 }
226 
reset()227 void SlaveComm::reset()
228 {
229     release = true;
230 }
231 
run()232 void SlaveComm::run()
233 {
234     ByteStream msg;
235 
236     while (!die)
237     {
238 #ifdef BRM_VERBOSE
239 //		cerr << "WorkerComm: waiting for a connection" << endl;
240 #endif
241         master = server->accept(&MSG_TIMEOUT);
242 
243         while (!die && master.isOpen())
244         {
245             try
246             {
247                 msg = master.read(&MSG_TIMEOUT);
248             }
249             catch (SocketClosed& e)
250             {
251 #ifdef BRM_VERBOSE
252                 cerr << "WorkerComm: remote closed" << endl;
253 #endif
254                 break;
255             }
256             catch (...)
257             {
258 #ifdef BRM_VERBOSE
259                 cerr << "WorkerComm: read failed, closing connection" << endl;
260 #endif
261                 break;
262             }
263 
264             if (release)
265                 break;
266 
267             if (die)   // || msg.length() == 0)
268                 break;
269 
270             if (msg.length() == 0)
271                 continue;
272 
273 #ifdef BRM_VERBOSE
274             cerr << "WorkerComm: got a command" << endl;
275 #endif
276 
277             try
278             {
279                 processCommand(msg);
280             }
281             catch (exception& e)
282             {
283                 /*
284                  * The error is either that msg was too short (really slow sender possibly),
285                  * there was a bigger communication failure, or there was a file IO
286                  * error.  Closing the connection for now.
287                  */
288                 cerr << e.what() << endl;
289                 do_undo();
290                 master.close();
291             }
292         }
293 
294         release = false;
295         master.close();
296     }
297 
298 #ifdef BRM_VERBOSE
299     cerr << "WorkerComm: exiting..." << endl;
300 #endif
301 }
302 
processCommand(ByteStream & msg)303 void SlaveComm::processCommand(ByteStream& msg)
304 {
305     uint8_t cmd;
306 
307     if (firstSlave)
308     {
309         msg.peek(cmd);
310 
311         if (cmd != CONFIRM)
312             delta = msg;
313     }
314 
315     msg >> cmd;
316 #ifdef BRM_VERBOSE
317     cerr << "WorkerComm: command " << (int) cmd << endl;
318 #endif
319 
320     switch (cmd)
321     {
322         case CREATE_STRIPE_COLUMN_EXTENTS:
323             do_createStripeColumnExtents(msg);
324             break;
325 
326         case CREATE_COLUMN_EXTENT_DBROOT:
327             do_createColumnExtent_DBroot(msg);
328             break;
329 
330         case CREATE_COLUMN_EXTENT_EXACT_FILE:
331             do_createColumnExtentExactFile(msg);
332             break;
333 
334         case CREATE_DICT_STORE_EXTENT:
335             do_createDictStoreExtent(msg);
336             break;
337 
338         case ROLLBACK_COLUMN_EXTENTS_DBROOT:
339             do_rollbackColumnExtents_DBroot(msg);
340             break;
341 
342         case ROLLBACK_DICT_STORE_EXTENTS_DBROOT:
343             do_rollbackDictStoreExtents_DBroot(msg);
344             break;
345 
346         case DELETE_EMPTY_COL_EXTENTS:
347             do_deleteEmptyColExtents(msg);
348             break;
349 
350         case DELETE_EMPTY_DICT_STORE_EXTENTS:
351             do_deleteEmptyDictStoreExtents(msg);
352             break;
353 
354         case DELETE_OID:
355             do_deleteOID(msg);
356             break;
357 
358         case DELETE_OIDS:
359             do_deleteOIDs(msg);
360             break;
361 
362         case SET_LOCAL_HWM:
363             do_setLocalHWM(msg);
364             break;
365 
366         case BULK_SET_HWM:
367             do_bulkSetHWM(msg);
368             break;
369 
370         case BULK_SET_HWM_AND_CP:
371             do_bulkSetHWMAndCP(msg);
372             break;
373 
374         case WRITE_VB_ENTRY:
375             do_writeVBEntry(msg);
376             break;
377 
378         case BULK_WRITE_VB_ENTRY:
379             do_bulkWriteVBEntry(msg);
380             break;
381 
382         case BEGIN_VB_COPY:
383             do_beginVBCopy(msg);
384             break;
385 
386         case END_VB_COPY:
387             do_endVBCopy(msg);
388             break;
389 
390         case VB_ROLLBACK1:
391             do_vbRollback1(msg);
392             break;
393 
394         case VB_ROLLBACK2:
395             do_vbRollback2(msg);
396             break;
397 
398         case VB_COMMIT:
399             do_vbCommit(msg);
400             break;
401 
402         case BRM_UNDO:
403             do_undo();
404             break;
405 
406         case CONFIRM:
407             do_confirm();
408             break;
409 
410         case FLUSH_INODE_CACHES:
411             do_flushInodeCache();
412             break;
413 
414         case BRM_CLEAR:
415             do_clear();
416             break;
417 
418         case MARKEXTENTINVALID:
419             do_markInvalid(msg);
420             break;
421 
422         case MARKMANYEXTENTSINVALID:
423             do_markManyExtentsInvalid(msg);
424             break;
425 
426         case SETEXTENTMAXMIN:
427             do_setExtentMaxMin(msg);
428             break;
429 
430         case SETMANYEXTENTSMAXMIN:
431             do_setExtentsMaxMin(msg);
432             break;
433 
434         case TAKE_SNAPSHOT:
435             do_takeSnapshot();
436             break;
437 
438         case MERGEMANYEXTENTSMAXMIN:
439             do_mergeExtentsMaxMin(msg);
440             break;
441 
442         case DELETE_PARTITION:
443             do_deletePartition(msg);
444             break;
445 
446         case MARK_PARTITION_FOR_DELETION:
447             do_markPartitionForDeletion(msg);
448             break;
449 
450         case MARK_ALL_PARTITION_FOR_DELETION:
451             do_markAllPartitionForDeletion(msg);
452             break;
453 
454         case RESTORE_PARTITION:
455             do_restorePartition(msg);
456             break;
457 
458         case OWNER_CHECK:
459             do_ownerCheck(msg);
460             break;
461 
462         case LOCK_LBID_RANGES:
463             do_dmlLockLBIDRanges(msg);
464             break;
465 
466         case RELEASE_LBID_RANGES:
467             do_dmlReleaseLBIDRanges(msg);
468             break;
469 
470         case DELETE_DBROOT:
471             do_deleteDBRoot(msg);
472             break;
473 
474         case BULK_UPDATE_DBROOT:
475             do_bulkUpdateDBRoot(msg);
476             break;
477 
478         default:
479             cerr << "WorkerComm: unknown command " << (int) cmd << endl;
480     }
481 }
482 
483 //------------------------------------------------------------------------------
484 // Process a request to create a column extent for a specific OID and DBRoot.
485 //------------------------------------------------------------------------------
do_createStripeColumnExtents(ByteStream & msg)486 void SlaveComm::do_createStripeColumnExtents(ByteStream& msg)
487 {
488     int        err;
489     uint16_t   tmp16;
490     uint16_t   tmp32;
491     uint16_t   dbRoot;
492     uint32_t   partitionNum;
493     uint16_t   segmentNum;
494     std::vector<CreateStripeColumnExtentsArgIn>  cols;
495     std::vector<CreateStripeColumnExtentsArgOut> extents;
496     ByteStream reply;
497 
498 #ifdef BRM_VERBOSE
499     cerr << "WorkerComm: do_createStripeColumnExtents()" << endl;
500 #endif
501 
502     deserializeInlineVector(msg, cols);
503     msg >> tmp16;
504     dbRoot = tmp16;
505     msg >> tmp32;
506     partitionNum = tmp32;
507 
508     if (printOnly)
509     {
510         cout << "createStripeColumnExtents().  " <<
511              "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl;
512 
513         for (uint32_t i = 0; i < cols.size(); i++)
514             cout << "StripeColExt arg "    << i + 1 <<
515                  ": oid="  << cols[i].oid   <<
516                  " width=" << cols[i].width << endl;
517 
518         return;
519     }
520 
521     err = slave->createStripeColumnExtents(cols, dbRoot,
522                                            partitionNum, segmentNum, extents);
523     reply << (uint8_t) err;
524 
525     if (err == ERR_OK)
526     {
527         reply << partitionNum;
528         reply << segmentNum;
529         serializeInlineVector( reply, extents );
530     }
531 
532 #ifdef BRM_VERBOSE
533     cerr << "WorkerComm: do_createStripeColumnExtents() err code is " <<
534          err << endl;
535 #endif
536 
537     if (!standalone)
538         master.write(reply);
539 
540     // see bug 3596.  Need to make sure a snapshot file exists.
541     if ((cols.size() > 0) && (cols[0].oid < 3000))
542         takeSnapshot = true;
543     else
544         doSaveDelta = true;
545 }
546 
547 //------------------------------------------------------------------------------
548 // Process a request to create a column extent for a specific OID and DBRoot.
549 //------------------------------------------------------------------------------
do_createColumnExtent_DBroot(ByteStream & msg)550 void SlaveComm::do_createColumnExtent_DBroot(ByteStream& msg)
551 {
552     int allocdSize, err;
553     uint8_t    tmp8;
554     uint16_t   tmp16;
555     uint32_t   tmp32;
556     OID_t      oid;
557     uint32_t   colWidth;
558     uint16_t   dbRoot;
559     uint32_t   partitionNum;
560     uint16_t   segmentNum;
561     LBID_t     lbid;
562     uint32_t   startBlockOffset;
563     ByteStream reply;
564     execplan::CalpontSystemCatalog::ColDataType colDataType;
565 #ifdef BRM_VERBOSE
566     cerr << "WorkerComm: do_createColumnExtent_DBroot()" << endl;
567 #endif
568 
569     msg >> tmp32;
570     oid = tmp32;
571     msg >> tmp32;
572     colWidth = tmp32;
573     msg >> tmp16;
574     dbRoot = tmp16;
575     msg >> tmp32;
576     partitionNum = tmp32;
577     msg >> tmp16;
578     segmentNum = tmp16;
579     msg >> tmp8;
580     colDataType = (execplan::CalpontSystemCatalog::ColDataType)tmp8;
581 
582     if (printOnly)
583     {
584         cout << "createColumnExtent_DBroot: oid=" << oid <<
585              " colWidth=" << colWidth <<
586              " dbRoot="   << dbRoot   <<
587              " partitionNum=" << partitionNum <<
588              " segmentNum=" << segmentNum << endl;
589         return;
590     }
591 
592     err = slave->createColumnExtent_DBroot(oid, colWidth, dbRoot, colDataType,
593                                            partitionNum, segmentNum, lbid, allocdSize, startBlockOffset);
594     reply << (uint8_t) err;
595 
596     if (err == ERR_OK)
597     {
598         reply << partitionNum;
599         reply << segmentNum;
600         reply << (uint64_t) lbid;
601         reply << (uint32_t) allocdSize;
602         reply << (uint32_t) startBlockOffset;
603     }
604 
605 #ifdef BRM_VERBOSE
606     cerr << "WorkerComm: do_createColumnExtent_DBroot() err code is " <<
607          err << endl;
608 #endif
609 
610     if (!standalone)
611         master.write(reply);
612 
613     if (oid < 3000)  // see bug 3596.  Need to make sure a snapshot file exists.
614         takeSnapshot = true;
615     else
616         doSaveDelta = true;
617 }
618 
619 //------------------------------------------------------------------------------
620 // Process a request to create a column extent for the exact segment file
621 // specified by the requested OID, DBRoot, partition, and segment.
622 //------------------------------------------------------------------------------
do_createColumnExtentExactFile(ByteStream & msg)623 void SlaveComm::do_createColumnExtentExactFile(ByteStream& msg)
624 {
625     int allocdSize, err;
626     uint8_t    tmp8;
627     uint16_t   tmp16;
628     uint32_t   tmp32;
629     OID_t      oid;
630     uint32_t   colWidth;
631     uint16_t   dbRoot;
632     uint32_t   partitionNum;
633     uint16_t   segmentNum;
634     LBID_t     lbid;
635     uint32_t   startBlockOffset;
636     ByteStream reply;
637     execplan::CalpontSystemCatalog::ColDataType colDataType;
638 #ifdef BRM_VERBOSE
639     cerr << "WorkerComm: do_createColumnExtentExactFile()" << endl;
640 #endif
641 
642     msg >> tmp32;
643     oid = tmp32;
644     msg >> tmp32;
645     colWidth = tmp32;
646     msg >> tmp16;
647     dbRoot = tmp16;
648     msg >> tmp32;
649     partitionNum = tmp32;
650     msg >> tmp16;
651     segmentNum = tmp16;
652     msg >> tmp8;
653     colDataType = (execplan::CalpontSystemCatalog::ColDataType)tmp8;
654 
655     if (printOnly)
656     {
657         cout << "createColumnExtentExactFile: oid=" << oid <<
658              " colWidth=" << colWidth <<
659              " dbRoot="   << dbRoot   <<
660              " partitionNum=" << partitionNum <<
661              " segmentNum=" << segmentNum << endl;
662         return;
663     }
664 
665     err = slave->createColumnExtentExactFile(oid, colWidth, dbRoot,
666             partitionNum, segmentNum, colDataType, lbid, allocdSize, startBlockOffset);
667     reply << (uint8_t) err;
668 
669     if (err == ERR_OK)
670     {
671         reply << partitionNum;
672         reply << segmentNum;
673         reply << (uint64_t) lbid;
674         reply << (uint32_t) allocdSize;
675         reply << (uint32_t) startBlockOffset;
676     }
677 
678 #ifdef BRM_VERBOSE
679     cerr << "WorkerComm: do_createColumnExtentExactFile() err code is " <<
680          err << endl;
681 #endif
682 
683     if (!standalone)
684         master.write(reply);
685 
686     if (oid < 3000)  // see bug 3596.  Need to make sure a snapshot file exists.
687         takeSnapshot = true;
688     else
689         doSaveDelta = true;
690 }
691 
692 //------------------------------------------------------------------------------
693 // Process a request to create a dictionary store extent.
694 //------------------------------------------------------------------------------
do_createDictStoreExtent(ByteStream & msg)695 void SlaveComm::do_createDictStoreExtent(ByteStream& msg)
696 {
697     int allocdSize, err;
698     uint16_t   tmp16;
699     uint32_t   tmp32;
700     OID_t      oid;
701     uint16_t   dbRoot;
702     uint32_t   partitionNum;
703     uint16_t   segmentNum;
704     LBID_t     lbid;
705     ByteStream reply;
706 #ifdef BRM_VERBOSE
707     cerr << "WorkerComm: do_createDictStoreExtent()" << endl;
708 #endif
709 
710     msg >> tmp32;
711     oid = tmp32;
712     msg >> tmp16;
713     dbRoot = tmp16;
714     msg >> tmp32;
715     partitionNum = tmp32;
716     msg >> tmp16;
717     segmentNum = tmp16;
718 
719     if (printOnly)
720     {
721         cout << "createDictStoreExtent: oid=" << oid << " dbRoot=" << dbRoot <<
722              " partitionNum=" << partitionNum << " segmentNum=" << segmentNum << endl;
723         return;
724     }
725 
726     err = slave->createDictStoreExtent(oid, dbRoot,
727                                        partitionNum, segmentNum, lbid, allocdSize);
728     reply << (uint8_t) err;
729 
730     if (err == ERR_OK)
731     {
732         reply << (uint64_t) lbid;
733         reply << (uint32_t) allocdSize;
734     }
735 
736 #ifdef BRM_VERBOSE
737     cerr << "WorkerComm: do_createDictStoreExtent() err code is " << err << endl;
738 #endif
739 
740     if (!standalone)
741         master.write(reply);
742 
743     doSaveDelta = true;
744 }
745 
746 //------------------------------------------------------------------------------
747 // Process a request to rollback (delete) a set of column extents.
748 // for a given OID and DBRoot.
749 //------------------------------------------------------------------------------
do_rollbackColumnExtents_DBroot(ByteStream & msg)750 void SlaveComm::do_rollbackColumnExtents_DBroot(ByteStream& msg)
751 {
752     int        err;
753     OID_t      oid;
754     bool       bDeleteAll;
755     uint32_t   partitionNum;
756     uint16_t   segmentNum;
757     uint16_t   dbRoot;
758     HWM_t      hwm;
759     uint8_t    tmp8;
760     uint16_t   tmp16;
761     uint32_t   tmp32;
762     ByteStream reply;
763 
764 #ifdef BRM_VERBOSE
765     cerr << "WorkerComm: do_rollbackColumnExtents_DBroot()" << endl;
766 #endif
767 
768     msg >> tmp32;
769     oid = tmp32;
770     msg >> tmp8;
771     bDeleteAll = tmp8;
772     msg >> tmp16;
773     dbRoot = tmp16;
774     msg >> tmp32;
775     partitionNum = tmp32;
776     msg >> tmp16;
777     segmentNum = tmp16;
778     msg >> tmp32;
779     hwm = tmp32;
780 
781     if (printOnly)
782     {
783         cout << "rollbackColumnExtents_DBroot: oid=" << oid <<
784              " bDeleteAll=" << bDeleteAll << " dbRoot=" << dbRoot <<
785              " partitionNum=" << partitionNum <<
786              " segmentNum=" << segmentNum << " hwm=" << hwm << endl;
787         return;
788     }
789 
790     err = slave->rollbackColumnExtents_DBroot(
791               oid, bDeleteAll, dbRoot, partitionNum, segmentNum, hwm);
792     reply << (uint8_t) err;
793 
794 #ifdef BRM_VERBOSE
795     cerr << "WorkerComm: do_rollbackColumnExtents_DBroot() err code is " <<
796          err << endl;
797 #endif
798 
799     if (!standalone)
800         master.write(reply);
801 
802     doSaveDelta = true;
803 }
804 
805 //------------------------------------------------------------------------------
806 // Process a request to rollback (delete) a set of column extents.
807 // for a given OID and DBRoot.
808 //------------------------------------------------------------------------------
do_rollbackDictStoreExtents_DBroot(ByteStream & msg)809 void SlaveComm::do_rollbackDictStoreExtents_DBroot(ByteStream& msg)
810 {
811     int        err;
812     OID_t      oid;
813     uint32_t   partitionNum;
814     uint16_t   dbRoot;
815     uint32_t   tmp32;
816     uint16_t   tmp16;
817     ByteStream reply;
818     vector<uint16_t> segNums;
819     vector<HWM_t> hwms;
820 
821 #ifdef BRM_VERBOSE
822     cerr << "WorkerComm: do_rollbackDictStoreExtents()" << endl;
823 #endif
824 
825     msg >> tmp32;
826     oid = tmp32;
827     msg >> tmp16;
828     dbRoot = tmp16;
829     msg >> tmp32;
830     partitionNum = tmp32;
831     deserializeVector(msg, segNums);
832     deserializeVector(msg, hwms);
833 
834     if (printOnly)
835     {
836         cout << "rollbackDictStore: oid=" << oid <<
837              " dbRoot=" << dbRoot <<
838              " partitionNum=" << partitionNum <<
839              " hwms..." << endl;
840 
841         for (uint32_t i = 0; i < hwms.size(); i++)
842             cout << "   " << i << ": " << hwms[i] << endl;
843 
844         return;
845     }
846 
847 
848     err = slave->rollbackDictStoreExtents_DBroot(
849               oid, dbRoot, partitionNum, segNums, hwms);
850     reply << (uint8_t) err;
851 
852 #ifdef BRM_VERBOSE
853     cerr << "WorkerComm: do_rollbackDictStoreExtents() err code is " <<
854          err << endl;
855 #endif
856 
857     if (!standalone)
858         master.write(reply);
859 
860     doSaveDelta = true;
861 }
862 
do_deleteEmptyColExtents(messageqcpp::ByteStream & msg)863 void SlaveComm::do_deleteEmptyColExtents(messageqcpp::ByteStream& msg)
864 {
865     OID_t oid;
866     uint32_t tmp1;
867     uint16_t tmp2;
868     int err;
869     ByteStream reply;
870     uint32_t size;
871     ExtentsInfoMap_t extentsInfoMap;
872 
873 #ifdef BRM_VERBOSE
874     cerr << "WorkerComm: do_deleteEmptyColExtents()" << endl;
875 #endif
876 
877     msg >> size;
878 
879     if (printOnly)
880         cout << "deleteEmptyColExtents: size=" << size << " extentsInfoMap..." << endl;
881 
882     for ( unsigned i = 0; i < size; i++ )
883     {
884         msg >> tmp1;
885         oid = tmp1;
886         extentsInfoMap[oid].oid = oid;
887         msg >> tmp1;
888         extentsInfoMap[oid].partitionNum = tmp1;
889         msg >> tmp2;
890         extentsInfoMap[oid].segmentNum = tmp2;
891         msg >> tmp2;
892         extentsInfoMap[oid].dbRoot = tmp2;
893         msg >> tmp1;
894         extentsInfoMap[oid].hwm = tmp1;
895 
896         if (printOnly)
897         {
898             cout << "   oid=" << oid << " partitionNum=" << extentsInfoMap[oid].partitionNum
899                  << " segmentNum=" << extentsInfoMap[oid].segmentNum << " dbRoot=" <<
900                  extentsInfoMap[oid].dbRoot << " hwm=" << extentsInfoMap[oid].hwm << endl;
901         }
902     }
903 
904     if (printOnly)
905         return;
906 
907     err = slave->deleteEmptyColExtents(extentsInfoMap);
908     reply << (uint8_t) err;
909 #ifdef BRM_VERBOSE
910     cerr << "WorkerComm: do_deleteEmptyColExtents() err code is " << err << endl;
911 #endif
912 
913     if (!standalone)
914         master.write(reply);
915 
916     doSaveDelta = true;
917 }
918 
do_deleteEmptyDictStoreExtents(messageqcpp::ByteStream & msg)919 void SlaveComm::do_deleteEmptyDictStoreExtents(messageqcpp::ByteStream& msg)
920 {
921     OID_t oid;
922     uint32_t tmp1;
923     uint16_t tmp2;
924     uint8_t  tmp3;
925     int err;
926     ByteStream reply;
927     uint32_t size;
928     ExtentsInfoMap_t extentsInfoMap;
929 
930 #ifdef BRM_VERBOSE
931     cerr << "WorkerComm: do_deleteEmptyDictStoreExtents()" << endl;
932 #endif
933 
934     msg >> size;
935 
936     if (printOnly)
937         cout << "deleteEmptyDictStoreExtents: size=" << size << " extentsInfoMap..." << endl;
938 
939     for ( unsigned i = 0; i < size; i++ )
940     {
941         msg >> tmp1;
942         oid = tmp1;
943         extentsInfoMap[oid].oid = oid;
944         msg >> tmp1;
945         extentsInfoMap[oid].partitionNum = tmp1;
946         msg >> tmp2;
947         extentsInfoMap[oid].segmentNum = tmp2;
948         msg >> tmp2;
949         extentsInfoMap[oid].dbRoot = tmp2;
950         msg >> tmp1;
951         extentsInfoMap[oid].hwm = tmp1;
952         msg >> tmp3;
953         extentsInfoMap[oid].newFile = (bool) tmp3;
954 
955         if (printOnly)
956         {
957             cout << "  oid=" << oid << " partitionNum=" << extentsInfoMap[oid].partitionNum <<
958                  " segmentNum=" << extentsInfoMap[oid].segmentNum << " dbRoot=" <<
959                  extentsInfoMap[oid].dbRoot << " hwm=" << extentsInfoMap[oid].hwm <<
960                  " newFile=" << (int) extentsInfoMap[oid].newFile << endl;
961         }
962     }
963 
964     if (printOnly)
965         return;
966 
967     err = slave->deleteEmptyDictStoreExtents(extentsInfoMap);
968     reply << (uint8_t) err;
969 #ifdef BRM_VERBOSE
970     cerr << "WorkerComm: do_deleteEmptyDictStoreExtents() err code is " << err << endl;
971 #endif
972 
973     if (!standalone)
974         master.write(reply);
975 
976     doSaveDelta = true;
977 }
978 
do_deleteOID(ByteStream & msg)979 void SlaveComm::do_deleteOID(ByteStream& msg)
980 {
981     OID_t oid;
982     uint32_t tmp;
983     int err;
984     ByteStream reply;
985 
986 #ifdef BRM_VERBOSE
987     cerr << "WorkerComm: do_deleteOID()" << endl;
988 #endif
989 
990     msg >> tmp;
991     oid = tmp;
992 
993     if (printOnly)
994     {
995         cout << "deleteOID: oid=" << oid << endl;
996         return;
997     }
998 
999     err = slave->deleteOID(oid);
1000     reply << (uint8_t) err;
1001 #ifdef BRM_VERBOSE
1002     cerr << "WorkerComm: do_deleteOID() err code is " << err << endl;
1003 #endif
1004 
1005     if (!standalone)
1006         master.write(reply);
1007 
1008     doSaveDelta = true;
1009 }
1010 
do_deleteOIDs(ByteStream & msg)1011 void SlaveComm::do_deleteOIDs(ByteStream& msg)
1012 {
1013     OID_t oid;
1014     uint32_t tmp;
1015     int err;
1016     ByteStream reply;
1017     uint32_t size;
1018     std::vector<OID_t> Oids;
1019     OidsMap_t oidsMap;
1020 #ifdef BRM_VERBOSE
1021     cerr << "WorkerComm: do_deleteOIDs()" << endl;
1022 #endif
1023 
1024     msg >> size;
1025 
1026     if (printOnly)
1027         cout << "deleteOIDs: size=" << size << endl;
1028 
1029     for ( unsigned i = 0; i < size; i++ )
1030     {
1031         msg >> tmp;
1032         oid = tmp;
1033         oidsMap[oid] = oid;
1034 
1035         if (printOnly)
1036             cout << "  oid=" << oid << endl;
1037     }
1038 
1039     if (printOnly)
1040         return;
1041 
1042     err = slave->deleteOIDs(oidsMap);
1043     reply << (uint8_t) err;
1044 #ifdef BRM_VERBOSE
1045     cerr << "WorkerComm: do_deleteOIDs() err code is " << err << endl;
1046 #endif
1047 
1048     if (!standalone)
1049         master.write(reply);
1050 
1051     doSaveDelta = true;
1052 }
1053 
1054 //------------------------------------------------------------------------------
1055 // Process a request to set the local HWM relative to a specific OID, partition,
1056 // and segment number.
1057 //------------------------------------------------------------------------------
do_setLocalHWM(ByteStream & msg)1058 void SlaveComm::do_setLocalHWM(ByteStream& msg)
1059 {
1060     OID_t      oid;
1061     HWM_t      hwm;
1062     uint32_t   partitionNum;
1063     uint16_t   segmentNum;
1064     int        err;
1065     uint16_t   tmp16;
1066     uint32_t   tmp32;
1067     ByteStream reply;
1068 
1069 #ifdef BRM_VERBOSE
1070     cerr << "WorkerComm: do_setLocalHWM()" << endl;
1071 #endif
1072 
1073     msg >> tmp32;
1074     oid = tmp32;
1075     msg >> tmp32;
1076     partitionNum = tmp32;
1077     msg >> tmp16;
1078     segmentNum = tmp16;
1079     msg >> tmp32;
1080     hwm = tmp32;
1081 
1082     if (printOnly)
1083     {
1084         cout << "setLocalHWM: oid=" << oid << " partitionNum=" << partitionNum <<
1085              " segmentNum=" << segmentNum << " hwm=" << hwm << endl;
1086         return;
1087     }
1088 
1089     err = slave->setLocalHWM(oid, partitionNum, segmentNum, hwm, firstSlave);
1090     reply << (uint8_t) err;
1091 #ifdef BRM_VERBOSE
1092     cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1093 #endif
1094 
1095     if (!standalone)
1096         master.write(reply);
1097 
1098     doSaveDelta = true;
1099 }
1100 
do_bulkSetHWM(ByteStream & msg)1101 void SlaveComm::do_bulkSetHWM(ByteStream& msg)
1102 {
1103     vector<BulkSetHWMArg> args;
1104     int        err;
1105     VER_t transID;
1106     uint32_t tmp32;
1107     ByteStream reply;
1108 
1109 #ifdef BRM_VERBOSE
1110     cerr << "WorkerComm: do_setLocalHWM()" << endl;
1111 #endif
1112 
1113     deserializeInlineVector(msg, args);
1114     msg >> tmp32;
1115     transID = tmp32;
1116 
1117     if (printOnly)
1118     {
1119         cout << "bulkSetHWM().  TransID = " << transID << endl;
1120 
1121         for (uint32_t i = 0; i < args.size(); i++)
1122             cout << "bulkSetHWM arg " << i + 1 << ": oid=" << args[i].oid << " partitionNum=" << args[i].partNum <<
1123                  " segmentNum=" << args[i].segNum << " hwm=" << args[i].hwm << endl;
1124 
1125         return;
1126     }
1127 
1128     err = slave->bulkSetHWM(args, transID, firstSlave);
1129     reply << (uint8_t) err;
1130 #ifdef BRM_VERBOSE
1131     cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1132 #endif
1133 
1134     if (!standalone)
1135         master.write(reply);
1136 
1137     doSaveDelta = true;
1138 }
1139 
do_bulkSetHWMAndCP(ByteStream & msg)1140 void SlaveComm::do_bulkSetHWMAndCP(ByteStream& msg)
1141 {
1142     vector<BulkSetHWMArg> hwmArgs;
1143     vector<CPInfo> setCPDataArgs;
1144     vector<CPInfoMerge> mergeCPDataArgs;
1145     int        err;
1146     VER_t transID;
1147     uint32_t tmp32;
1148     ByteStream reply;
1149 
1150 #ifdef BRM_VERBOSE
1151     cerr << "WorkerComm: do_setLocalHWM()" << endl;
1152 #endif
1153 
1154     deserializeInlineVector(msg, hwmArgs);
1155     deserializeInlineVector(msg, setCPDataArgs);
1156     deserializeInlineVector(msg, mergeCPDataArgs);
1157     msg >> tmp32;
1158     transID = tmp32;
1159 
1160 #if 0
1161 
1162     if (printOnly)
1163     {
1164         cout << "bulkSetHWM().  TransID = " << transID << endl;
1165 
1166         for (uint32_t i = 0; i < args.size(); i++)
1167             cout << "bulkSetHWM arg " << i + 1 << ": oid=" << args[i].oid << " partitionNum=" << args[i].partNum <<
1168                  " segmentNum=" << args[i].segNum << " hwm=" << args[i].hwm << endl;
1169 
1170         return;
1171     }
1172 
1173 #endif
1174 
1175     err = slave->bulkSetHWMAndCP(hwmArgs, setCPDataArgs, mergeCPDataArgs, transID, firstSlave);
1176     reply << (uint8_t) err;
1177 #ifdef BRM_VERBOSE
1178     cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1179 #endif
1180 
1181     if (!standalone)
1182         master.write(reply);
1183 
1184     doSaveDelta = true;
1185 }
1186 
do_bulkUpdateDBRoot(ByteStream & msg)1187 void SlaveComm::do_bulkUpdateDBRoot(ByteStream& msg)
1188 {
1189     vector<BulkUpdateDBRootArg> args;
1190     ByteStream reply;
1191     int err;
1192 
1193     deserializeInlineVector(msg, args);
1194     err = slave->bulkUpdateDBRoot(args);
1195     reply << (uint8_t) err;
1196 
1197     if (!standalone)
1198         master.write(reply);
1199 
1200     doSaveDelta = true;
1201 }
1202 
do_markInvalid(ByteStream & msg)1203 void SlaveComm::do_markInvalid(ByteStream& msg)
1204 {
1205     LBID_t lbid;
1206     uint32_t colDataType;
1207     int err;
1208     ByteStream reply;
1209 
1210 #ifdef BRM_VERBOSE
1211     cerr << "WorkerComm: do_markInvalid()" << endl;
1212 #endif
1213 
1214     msg >> lbid;
1215     msg >> colDataType;
1216 
1217     if (printOnly)
1218     {
1219         cout << "markExtentInvalid: lbid=" << lbid << "colDataType=" << colDataType << endl;
1220         return;
1221     }
1222 
1223     err = slave->markExtentInvalid(lbid, (execplan::CalpontSystemCatalog::ColDataType)colDataType);
1224     reply << (uint8_t)err;
1225 #ifdef BRM_VERBOSE
1226     cerr << "WorkerComm: do_markInvalid() err code is " << err << endl;
1227 #endif
1228 
1229     if (!standalone)
1230         master.write(reply);
1231 
1232     doSaveDelta = true;
1233 }
1234 
do_markManyExtentsInvalid(ByteStream & msg)1235 void SlaveComm::do_markManyExtentsInvalid(ByteStream& msg)
1236 {
1237     uint64_t tmp64;
1238     uint32_t colDataType;
1239     int err;
1240     ByteStream reply;
1241     vector<LBID_t> lbids;
1242     vector<execplan::CalpontSystemCatalog::ColDataType> colDataTypes;
1243     uint32_t size, i;
1244 
1245 #ifdef BRM_VERBOSE
1246     cerr << "WorkerComm: do_markManyExtentsInvalid()" << endl;
1247 #endif
1248 
1249     msg >> size;
1250 
1251     if (printOnly)
1252         cout << "markManyExtentsInvalid: size=" << size << " lbids..." << endl;
1253 
1254     for (i = 0; i < size; ++i)
1255     {
1256         msg >> tmp64;
1257         msg >> colDataType;
1258         lbids.push_back(tmp64);
1259         colDataTypes.push_back((execplan::CalpontSystemCatalog::ColDataType)colDataType);
1260 
1261         if (printOnly)
1262             cout << "   " << tmp64 << " " << colDataType << endl;
1263     }
1264 
1265     if (printOnly)
1266         return;
1267 
1268     err = slave->markExtentsInvalid(lbids, colDataTypes);
1269     reply << (uint8_t)err;
1270 #ifdef BRM_VERBOSE
1271     cerr << "WorkerComm: do_markManyExtentsInvalid() err code is " << err << endl;
1272 #endif
1273 
1274     if (!standalone)
1275         master.write(reply);
1276 
1277     doSaveDelta = true;
1278 }
1279 
do_setExtentMaxMin(ByteStream & msg)1280 void SlaveComm::do_setExtentMaxMin(ByteStream& msg)
1281 {
1282     LBID_t lbid;
1283     int64_t max;
1284     int64_t min;
1285     int32_t sequence;
1286     uint64_t tmp64;
1287     uint32_t tmp32;
1288     int err;
1289     ByteStream reply;
1290 
1291 #ifdef BRM_VERBOSE
1292     cerr << "WorkerComm: do_setExtentMaxMin()" << endl;
1293 #endif
1294 
1295     msg >> tmp64;
1296     lbid = tmp64;
1297 
1298     msg >> tmp64;
1299     max = tmp64;
1300 
1301     msg >> tmp64;
1302     min = tmp64;
1303 
1304     msg >> tmp32;
1305     sequence = tmp32;
1306 
1307     if (printOnly)
1308     {
1309         cout << "setExtentMaxMin: lbid=" << lbid << " max=" << max << " min=" << min <<
1310              " sequence=" << sequence << endl;
1311         return;
1312     }
1313 
1314     err = slave->setExtentMaxMin(lbid, max, min, sequence, firstSlave);
1315     reply << (uint8_t)err;
1316 #ifdef BRM_VERBOSE
1317     cerr << "WorkerComm: do_setExtentMaxMin() err code is " << err << endl;
1318 #endif
1319 
1320     if (!standalone)
1321         master.write(reply);
1322 
1323     doSaveDelta = true;
1324 }
1325 
1326 // @bug 1970 - added do_setExtentsMaxMin to set multiple extents CP info.
do_setExtentsMaxMin(ByteStream & msg)1327 void SlaveComm::do_setExtentsMaxMin(ByteStream& msg)
1328 {
1329     LBID_t lbid;
1330     uint64_t tmp64;
1331     uint32_t tmp32;
1332     int err;
1333     ByteStream reply;
1334     int32_t updateCount;
1335 
1336 #ifdef BRM_VERBOSE
1337     cerr << "WorkerComm: do_setExtentsMaxMin()" << endl;
1338 #endif
1339 
1340     msg >> tmp32;
1341     updateCount = tmp32;
1342     CPMaxMinMap_t cpMap;
1343     CPMaxMin cpMaxMin;
1344 
1345     if (printOnly)
1346         cout << "setExtentsMaxMin: size=" << updateCount << " CPdata..." << endl;
1347 
1348     // Loop through extents and add each one to a map.
1349     for (int64_t i = 0; i < updateCount; i++)
1350     {
1351         msg >> tmp64;
1352         lbid = tmp64;
1353 
1354         msg >> tmp64;
1355         cpMaxMin.max = tmp64;
1356 
1357         msg >> tmp64;
1358         cpMaxMin.min = tmp64;
1359 
1360         msg >> tmp32;
1361         cpMaxMin.seqNum = tmp32;
1362 
1363         cpMap[lbid] = cpMaxMin;
1364 
1365         if (printOnly)
1366             cout << "   lbid=" << lbid << " max=" << cpMaxMin.max << " min=" <<
1367                  cpMaxMin.min << " sequenceNum=" << cpMaxMin.seqNum << endl;
1368     }
1369 
1370     if (printOnly)
1371         return;
1372 
1373     err = slave->setExtentsMaxMin(cpMap, firstSlave);
1374     reply << (uint8_t)err;
1375 #ifdef BRM_VERBOSE
1376     cerr << "WorkerComm: do_setExtentsMaxMin() err code is " << err << endl;
1377 #endif
1378 
1379     if (!standalone)
1380         master.write(reply);
1381 
1382     doSaveDelta = true;
1383 }
1384 
1385 //------------------------------------------------------------------------------
1386 // @bug 2117 - added do_mergeExtentsMaxMin to merge multiple extents CP info.
1387 //------------------------------------------------------------------------------
do_mergeExtentsMaxMin(ByteStream & msg)1388 void SlaveComm::do_mergeExtentsMaxMin(ByteStream& msg)
1389 {
1390     LBID_t     startLbid;
1391     uint64_t   tmp64;
1392     uint32_t   tmp32;
1393     int        err;
1394     ByteStream reply;
1395     int32_t    mergeCount;
1396 
1397 #ifdef BRM_VERBOSE
1398     cerr << "WorkerComm: do_mergeExtentsMaxMin()" << endl;
1399 #endif
1400 
1401     msg >> tmp32;
1402     mergeCount = tmp32;
1403     CPMaxMinMergeMap_t cpMap;
1404     CPMaxMinMerge      cpMaxMin;
1405 
1406     if (printOnly)
1407         cout << "mergeExtentsMaxMin: size=" << mergeCount << " CPdata..." << endl;
1408 
1409     // Loop through extents and add each one to a map.
1410     for (int64_t i = 0; i < mergeCount; i++)
1411     {
1412         msg >> tmp64;
1413         startLbid = tmp64;
1414 
1415         msg >> tmp64;
1416         cpMaxMin.max = tmp64;
1417 
1418         msg >> tmp64;
1419         cpMaxMin.min = tmp64;
1420 
1421         msg >> tmp32;
1422         cpMaxMin.seqNum = tmp32;
1423 
1424         msg >> tmp32;
1425         cpMaxMin.type = (execplan::CalpontSystemCatalog::ColDataType)tmp32;
1426 
1427         msg >> tmp32;
1428         cpMaxMin.newExtent = tmp32;
1429 
1430         cpMap[startLbid] = cpMaxMin;
1431 
1432         if (printOnly)
1433             cout << "   startLBID=" << startLbid << " max=" << cpMaxMin.max << " min=" <<
1434                  cpMaxMin.min << " sequenceNum=" << cpMaxMin.seqNum << " type=" << (int)
1435                  cpMaxMin.type << " newExtent=" << (int) cpMaxMin.newExtent << endl;
1436     }
1437 
1438     if (printOnly)
1439         return;
1440 
1441     err = slave->mergeExtentsMaxMin(cpMap);
1442     reply << (uint8_t)err;
1443 #ifdef BRM_VERBOSE
1444     cerr << "WorkerComm: do_mergeExtentsMaxMin() err code is " << err << endl;
1445 #endif
1446 
1447     if (!standalone)
1448         master.write(reply);
1449 
1450     doSaveDelta = true;
1451 }
1452 
1453 //------------------------------------------------------------------------------
1454 // Delete all extents for the specified OID(s) and partition number.
1455 //------------------------------------------------------------------------------
do_deletePartition(ByteStream & msg)1456 void SlaveComm::do_deletePartition(ByteStream& msg)
1457 {
1458     OID_t      oid;
1459     uint32_t   tmp32;
1460     int        err;
1461     ByteStream reply;
1462     uint32_t   size;
1463     std::set<OID_t> oids;
1464 
1465 #ifdef BRM_VERBOSE
1466     cerr << "WorkerComm: do_deletePartition()" << endl;
1467 #endif
1468 
1469     set<LogicalPartition> partitionNums;
1470     deserializeSet<LogicalPartition>(msg, partitionNums);
1471 
1472     msg >> size;
1473 
1474     if (printOnly)
1475     {
1476         cout << "deletePartition: partitionNum: ";
1477         set<LogicalPartition>::const_iterator it;
1478 
1479         for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1480             cout << (*it) << " ";
1481 
1482         cout << "\nsize=" << size << " oids..." << endl;
1483     }
1484 
1485     for (unsigned i = 0; i < size; i++)
1486     {
1487         msg >> tmp32;
1488         oid = tmp32;
1489         oids.insert( oid );
1490 
1491         if (printOnly)
1492             cout << "   " << oid << endl;
1493     }
1494 
1495     if (printOnly)
1496         return;
1497 
1498     string emsg;
1499     err = slave->deletePartition(oids, partitionNums, emsg);
1500     reply << (uint8_t) err;
1501 
1502     if (err != 0)
1503         reply << emsg;
1504 
1505 #ifdef BRM_VERBOSE
1506     cerr << "WorkerComm: do_deletePartition() err code is " << err << endl;
1507 #endif
1508 
1509     if (!standalone)
1510         master.write(reply);
1511 
1512     doSaveDelta = true;
1513 }
1514 
1515 //------------------------------------------------------------------------------
1516 // Mark all extents as out of service, for the specified OID(s) and partition
1517 // number.
1518 //------------------------------------------------------------------------------
do_markPartitionForDeletion(ByteStream & msg)1519 void SlaveComm::do_markPartitionForDeletion(ByteStream& msg)
1520 {
1521     OID_t      oid;
1522     uint32_t   tmp32;
1523     int        err;
1524     ByteStream reply;
1525     uint32_t   size;
1526     std::set<OID_t> oids;
1527 
1528 #ifdef BRM_VERBOSE
1529     cerr << "WorkerComm: do_markPartitionForDeletion()" << endl;
1530 #endif
1531 
1532     set<LogicalPartition> partitionNums;
1533     deserializeSet<LogicalPartition>(msg, partitionNums);
1534     msg >> size;
1535 
1536     if (printOnly)
1537     {
1538         cout << "markPartitionForDeletion: partitionNum: ";
1539         set<LogicalPartition>::const_iterator it;
1540 
1541         for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1542             cout << (*it) << " ";
1543 
1544         cout << "\nsize=" << size << " oids..." << endl;
1545     }
1546 
1547     for (unsigned i = 0; i < size; i++)
1548     {
1549         msg >> tmp32;
1550         oid = tmp32;
1551         oids.insert( oid );
1552 
1553         if (printOnly)
1554             cout << "   " << oid << endl;
1555     }
1556 
1557     if (printOnly)
1558         return;
1559 
1560     string emsg;
1561     err = slave->markPartitionForDeletion(oids, partitionNums, emsg);
1562     reply << (uint8_t) err;
1563 
1564     if (err != 0)
1565         reply << emsg;
1566 
1567 #ifdef BRM_VERBOSE
1568     cerr << "WorkerComm: do_markPartitionforDeletion() err code is " <<
1569          err << endl;
1570 #endif
1571 
1572     if (!standalone)
1573         master.write(reply);
1574 
1575     doSaveDelta = true;
1576 }
1577 
1578 //------------------------------------------------------------------------------
1579 // Mark all extents as out of service, for the specified OID(s) and partition
1580 // number.
1581 //------------------------------------------------------------------------------
do_markAllPartitionForDeletion(ByteStream & msg)1582 void SlaveComm::do_markAllPartitionForDeletion(ByteStream& msg)
1583 {
1584     OID_t      oid;
1585     uint32_t   tmp32;
1586     int        err;
1587     ByteStream reply;
1588     uint32_t   size;
1589     std::set<OID_t> oids;
1590 
1591 #ifdef BRM_VERBOSE
1592     cerr << "WorkerComm: do_markAllPartitionForDeletion()" << endl;
1593 #endif
1594     msg >> size;
1595 
1596     if (printOnly)
1597         cout << "markAllPartitionForDeletion: size="
1598              << size << " oids..." << endl;
1599 
1600     for (unsigned i = 0; i < size; i++)
1601     {
1602         msg >> tmp32;
1603         oid = tmp32;
1604         oids.insert( oid );
1605 
1606         if (printOnly)
1607             cout << "   " << oid << endl;
1608     }
1609 
1610     if (printOnly)
1611         return;
1612 
1613     err = slave->markAllPartitionForDeletion(oids);
1614     reply << (uint8_t) err;
1615 #ifdef BRM_VERBOSE
1616     cerr << "WorkerComm: do_markAllPartitionforDeletion() err code is " <<
1617          err << endl;
1618 #endif
1619 
1620     if (!standalone)
1621         master.write(reply);
1622 
1623     doSaveDelta = true;
1624 }
1625 
1626 //------------------------------------------------------------------------------
1627 // Restore all extents for the specified OID(s) and partition number.
1628 //------------------------------------------------------------------------------
do_restorePartition(ByteStream & msg)1629 void SlaveComm::do_restorePartition(ByteStream& msg)
1630 {
1631     OID_t      oid;
1632     uint32_t   tmp32;
1633     int        err;
1634     ByteStream reply;
1635     uint32_t   size;
1636     std::set<OID_t> oids;
1637 
1638 #ifdef BRM_VERBOSE
1639     cerr << "WorkerComm: do_restorePartition()" << endl;
1640 #endif
1641 
1642     set<LogicalPartition> partitionNums;
1643     deserializeSet<LogicalPartition>(msg, partitionNums);
1644 
1645     msg >> size;
1646 
1647     if (printOnly)
1648     {
1649         cout << "restorePartition: partitionNum: ";
1650         set<LogicalPartition>::const_iterator it;
1651 
1652         for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1653             cout << (*it) << " ";
1654 
1655         cout << "\nsize=" << size << " oids..." << endl;
1656     }
1657 
1658     for (unsigned i = 0; i < size; i++)
1659     {
1660         msg >> tmp32;
1661         oid = tmp32;
1662         oids.insert( oid );
1663 
1664         if (printOnly)
1665             cout << "   " << oid << endl;
1666     }
1667 
1668     if (printOnly)
1669         return;
1670 
1671     string emsg;
1672     err = slave->restorePartition(oids, partitionNums, emsg);
1673     reply << (uint8_t) err;
1674 
1675     if (err != 0)
1676         reply << emsg;
1677 
1678 #ifdef BRM_VERBOSE
1679     cerr << "WorkerComm: do_restorePartition() err code is " << err << endl;
1680 #endif
1681 
1682     if (!standalone)
1683         master.write(reply);
1684 
1685     doSaveDelta = true;
1686 }
1687 
1688 //------------------------------------------------------------------------------
1689 // Delete all extents for the specified dbroot
1690 //------------------------------------------------------------------------------
do_deleteDBRoot(ByteStream & msg)1691 void SlaveComm::do_deleteDBRoot(ByteStream& msg)
1692 {
1693     int        err;
1694     ByteStream reply;
1695     uint32_t   q;
1696     uint16_t   dbroot;
1697 
1698 #ifdef BRM_VERBOSE
1699     cerr << "WorkerComm: do_deleteDBroot()" << endl;
1700 #endif
1701 
1702     msg >> q;
1703     dbroot = static_cast<uint16_t>(q);
1704 
1705     if (printOnly)
1706     {
1707         cout << "deleteDBRoot: " << dbroot << endl;
1708         return;
1709     }
1710 
1711     err = slave->deleteDBRoot(dbroot);
1712     reply << (uint8_t) err;
1713 
1714 #ifdef BRM_VERBOSE
1715     cerr << "WorkerComm: do_deleteDBRoot() err code is " << err << endl;
1716 #endif
1717 
1718     if (!standalone)
1719         master.write(reply);
1720 
1721     doSaveDelta = true;
1722 }
1723 
do_writeVBEntry(ByteStream & msg)1724 void SlaveComm::do_writeVBEntry(ByteStream& msg)
1725 {
1726     VER_t transID;
1727     LBID_t lbid;
1728     OID_t vbOID;
1729     uint32_t vbFBO, tmp;
1730     uint64_t tmp64;
1731     int err;
1732     ByteStream reply;
1733 
1734 #ifdef BRM_VERBOSE
1735     cerr << "WorkerComm: do_writeVBEntry()" << endl;
1736 #endif
1737 
1738     msg >> tmp;
1739     transID = tmp;
1740     msg >> tmp64;
1741     lbid = tmp64;
1742     msg >> tmp;
1743     vbOID = tmp;
1744     msg >> vbFBO;
1745 
1746     if (printOnly)
1747     {
1748         cout << "writeVBEntry: transID=" << transID << " lbid=" << lbid << " vbOID=" <<
1749              vbOID << " vbFBO=" << vbFBO << endl;
1750         return;
1751     }
1752 
1753     err = slave->writeVBEntry(transID, lbid, vbOID, vbFBO);
1754     reply << (uint8_t) err;
1755 #ifdef BRM_VERBOSE
1756     cerr << "WorkerComm: do_writeVBEntry() err code is " << err << endl;
1757 #endif
1758 
1759     if (!standalone)
1760         master.write(reply);
1761 
1762     doSaveDelta = true;
1763 }
1764 
do_bulkWriteVBEntry(ByteStream & msg)1765 void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg)
1766 {
1767     VER_t transID;
1768     std::vector<BRM::LBID_t> lbids;
1769     OID_t vbOID;
1770     std::vector<uint32_t> vbFBOs;
1771     uint32_t tmp;
1772     int err;
1773     ByteStream reply;
1774 
1775 #ifdef BRM_VERBOSE
1776     cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl;
1777 #endif
1778 
1779     msg >> tmp;
1780     transID = tmp;
1781     deserializeInlineVector(msg, lbids);
1782     msg >> tmp;
1783     vbOID = tmp;
1784     deserializeInlineVector(msg, vbFBOs);
1785 
1786     if (printOnly)
1787     {
1788         cout << "bulkWriteVBEntry: transID=" << transID << endl;
1789 
1790         for (size_t i = 0; i < lbids.size(); i++)
1791             cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" <<
1792                  vbOID << " vbFBO=" << vbFBOs[i] << endl;
1793         return;
1794     }
1795 
1796     err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs);
1797     reply << (uint8_t) err;
1798 #ifdef BRM_VERBOSE
1799     cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl;
1800 #endif
1801 
1802     if (!standalone)
1803         master.write(reply);
1804 
1805     doSaveDelta = true;
1806 }
1807 
do_beginVBCopy(ByteStream & msg)1808 void SlaveComm::do_beginVBCopy(ByteStream& msg)
1809 {
1810     VER_t transID;
1811     LBIDRange_v ranges;
1812     VBRange_v freeList;
1813     uint32_t tmp32;
1814     uint16_t dbRoot;
1815     int err;
1816     ByteStream reply;
1817 
1818 #ifdef BRM_VERBOSE
1819     cerr << "WorkerComm: do_beginVBCopy()" << endl;
1820 #endif
1821 
1822     msg >> tmp32;
1823     transID = tmp32;
1824     msg >> dbRoot;
1825     deserializeVector(msg, ranges);
1826 
1827     if (printOnly)
1828     {
1829         cout << "beginVBCopy: transID=" << transID << " dbRoot=" << dbRoot << " size="
1830              << ranges.size() <<	" ranges..." << endl;
1831 
1832         for (uint32_t i = 0; i < ranges.size(); i++)
1833             cout << "   start=" << ranges[i].start << " size=" << ranges[i].size << endl;
1834 
1835         return;
1836     }
1837 
1838     err = slave->beginVBCopy(transID, dbRoot, ranges, freeList, firstSlave && !standalone);
1839     reply << (uint8_t) err;
1840 
1841     if (err == ERR_OK)
1842         serializeVector(reply, freeList);
1843 
1844 #ifdef BRM_VERBOSE
1845     cerr << "WorkerComm: do_beginVBCopy() err code is " << err << endl;
1846 #endif
1847 
1848     if (!standalone)
1849         master.write(reply);
1850 
1851     doSaveDelta = true;
1852 }
1853 
do_endVBCopy(ByteStream & msg)1854 void SlaveComm::do_endVBCopy(ByteStream& msg)
1855 {
1856     VER_t transID;
1857     LBIDRange_v ranges;
1858     uint32_t tmp;
1859     int err;
1860     ByteStream reply;
1861 
1862 #ifdef BRM_VERBOSE
1863     cerr << "WorkerComm: do_endVBCopy()" << endl;
1864 #endif
1865 
1866     msg >> tmp;
1867     transID = tmp;
1868     deserializeVector(msg, ranges);
1869 
1870     if (printOnly)
1871     {
1872         cout << "endVBCopy: transID=" << transID << " size=" << ranges.size() <<
1873              " ranges..." << endl;
1874 
1875         for (uint32_t i = 0; i < ranges.size(); i++)
1876             cout << "   start=" << ranges[i].start << " size=" << ranges[i].size << endl;
1877 
1878         return;
1879     }
1880 
1881     err = slave->endVBCopy(transID, ranges);
1882     reply << (uint8_t) err;
1883 #ifdef BRM_VERBOSE
1884     cerr << "WorkerComm: do_endVBCopy() err code is " << err << endl;
1885 #endif
1886 
1887     if (!standalone)
1888         master.write(reply);
1889 
1890     doSaveDelta = true;
1891 }
1892 
do_vbRollback1(ByteStream & msg)1893 void SlaveComm::do_vbRollback1(ByteStream& msg)
1894 {
1895     VER_t transID;
1896     LBIDRange_v lbidList;
1897     uint32_t tmp;
1898     int err;
1899     ByteStream reply;
1900 
1901 #ifdef BRM_VERBOSE
1902     cerr << "WorkerComm: do_vbRollback1()" << endl;
1903 #endif
1904 
1905     msg >> tmp;
1906     transID = tmp;
1907     deserializeVector(msg, lbidList);
1908 
1909     if (printOnly)
1910     {
1911         cout << "vbRollback1: transID=" << transID << " size=" << lbidList.size() <<
1912              " lbids..." << endl;
1913 
1914         for (uint32_t i = 0; i < lbidList.size(); i++)
1915             cout << "   start=" << lbidList[i].start << " size=" << lbidList[i].size << endl;
1916 
1917         return;
1918     }
1919 
1920     err = slave->vbRollback(transID, lbidList, firstSlave && !standalone);
1921     reply << (uint8_t) err;
1922 #ifdef BRM_VERBOSE
1923     cerr << "WorkerComm: do_vbRollback1() err code is " << err << endl;
1924 #endif
1925 
1926     if (!standalone)
1927         master.write(reply);
1928 
1929     doSaveDelta = true;
1930 }
1931 
do_vbRollback2(ByteStream & msg)1932 void SlaveComm::do_vbRollback2(ByteStream& msg)
1933 {
1934     VER_t transID;
1935     vector<LBID_t> lbidList;
1936     uint32_t tmp;
1937     int err;
1938     ByteStream reply;
1939 
1940 #ifdef BRM_VERBOSE
1941     cerr << "WorkerComm: do_vbRollback2()" << endl;
1942 #endif
1943 
1944     msg >> tmp;
1945     transID = tmp;
1946     deserializeVector(msg, lbidList);
1947 
1948     if (printOnly)
1949     {
1950         cout << "vbRollback2: transID=" << transID << " size=" << lbidList.size() <<
1951              " lbids..." << endl;
1952 
1953         for (uint32_t i = 0; i < lbidList.size(); i++)
1954             cout << "   " << lbidList[i] << endl;
1955 
1956         return;
1957     }
1958 
1959     err = slave->vbRollback(transID, lbidList, firstSlave && !standalone);
1960     reply << (uint8_t) err;
1961 #ifdef BRM_VERBOSE
1962     cerr << "WorkerComm: do_vbRollback2() err code is " << err << endl;
1963 #endif
1964 
1965     if (!standalone)
1966         master.write(reply);
1967 
1968     doSaveDelta = true;
1969 }
1970 
do_vbCommit(ByteStream & msg)1971 void SlaveComm::do_vbCommit(ByteStream& msg)
1972 {
1973     VER_t transID;
1974     uint32_t tmp;
1975     int err;
1976     ByteStream reply;
1977 
1978 #ifdef BRM_VERBOSE
1979     cerr << "WorkerComm: do_vbCommit()" << endl;
1980 #endif
1981 
1982     msg >> tmp;
1983     transID = tmp;
1984 
1985     if (printOnly)
1986     {
1987         cout << "vbCommit: transID=" << transID << endl;
1988         return;
1989     }
1990 
1991     err = slave->vbCommit(transID);
1992     reply << (uint8_t) err;
1993 #ifdef BRM_VERBOSE
1994     cerr << "WorkerComm: do_vbCommit() err code is " << err << endl;
1995 #endif
1996 
1997     if (!standalone)
1998         master.write(reply);
1999 
2000     doSaveDelta = true;
2001 }
2002 
do_undo()2003 void SlaveComm::do_undo()
2004 {
2005 #ifdef BRM_VERBOSE
2006     cerr << "WorkerComm: do_undo()" << endl;
2007 #endif
2008 
2009     if (printOnly)
2010     {
2011         cout << "undoChanges" << endl;
2012         return;
2013     }
2014 
2015     slave->undoChanges();
2016     takeSnapshot = false;
2017     doSaveDelta = false;
2018 }
2019 
do_confirm()2020 void SlaveComm::do_confirm()
2021 {
2022 #ifdef BRM_VERBOSE
2023     cerr << "WorkerComm: do_confirm()" << endl;
2024 #endif
2025 
2026     if (printOnly)
2027     {
2028         cout << "confirmChanges" << endl;
2029         return;
2030     }
2031 
2032     if (firstSlave && doSaveDelta && (journalCount < snapshotInterval || snapshotInterval < 0))
2033     {
2034         doSaveDelta = false;
2035         saveDelta();
2036     }
2037 
2038     slave->confirmChanges();
2039 
2040     string tmp = savefile + "_current";
2041 
2042     if (firstSlave && (takeSnapshot ||
2043                        (journalCount >= snapshotInterval && snapshotInterval >= 0)))
2044     {
2045         if (!currentSaveFile)
2046         {
2047             currentSaveFile = IDBDataFile::open(
2048                                   IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0);
2049         }
2050 
2051         if (currentSaveFile == NULL)
2052         {
2053             ostringstream os;
2054             os << "WorkerComm: failed to open the current savefile. errno: "
2055                << strerror(errno);
2056             log(os.str());
2057             throw runtime_error(os.str());
2058         }
2059 
2060 
2061         tmp = savefile + (saveFileToggle ? 'A' : 'B');
2062         slave->saveState(tmp);
2063 #ifndef _MSC_VER
2064         tmp += '\n';
2065 #endif
2066         int err = 0;
2067 
2068         // MCOL-1558.  Make the _current file relative to DBRMRoot.
2069         string relative = tmp.substr(tmp.find_last_of('/') + 1);
2070         err = currentSaveFile->write(relative.c_str(), relative.length());
2071 
2072         if (err < (int) relative.length())
2073         {
2074             ostringstream os;
2075             os  << "WorkerComm: currentfile write() returned " << err
2076                 << " file pointer is " << currentSaveFile;
2077 
2078             if (err < 0)
2079                 os << " errno: " << strerror(errno);
2080 
2081             log(os.str());
2082         }
2083 
2084         currentSaveFile->flush();
2085         delete currentSaveFile;
2086         currentSaveFile = NULL;
2087         saveFileToggle = !saveFileToggle;
2088 
2089         delete journalh;
2090         journalh = IDBDataFile::open(
2091                        IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), journalName.c_str(), "w+b", 0);
2092 
2093         if (!journalh)
2094             throw runtime_error("Could not open the BRM journal for writing!");
2095 
2096         takeSnapshot = false;
2097         doSaveDelta = false;
2098         journalCount = 0;
2099     }
2100 }
2101 
do_flushInodeCache()2102 void SlaveComm::do_flushInodeCache()
2103 {
2104     ByteStream reply;
2105 
2106 #ifdef BRM_VERBOSE
2107     cerr << "WorkerComm: do_flushInodeCache()" << endl;
2108 #endif
2109 
2110     if (printOnly)
2111     {
2112         cout << "flushInodeCache" << endl;
2113         return;
2114     }
2115 
2116 #ifdef __linux__
2117 #ifdef USE_VERY_COMPLEX_DROP_CACHES
2118     double elapsedTime = 0.0;
2119     char   msgChString[100];
2120     struct timespec tm1, tm2;
2121     clock_gettime(CLOCK_REALTIME, &tm1);
2122 
2123     int fd;
2124     fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
2125 
2126     if (fd >= 0)
2127     {
2128         ssize_t writeCnt = write(fd, "3\n", 2);
2129         clock_gettime(CLOCK_REALTIME, &tm2);
2130         timespec_sub(tm1, tm2, elapsedTime);
2131 
2132         if (writeCnt == 2)
2133         {
2134             snprintf(msgChString, sizeof(msgChString),
2135                      "WorkerNode updating drop_caches to flush cache; "
2136                      "elapsedTime-%f sec", elapsedTime);
2137             log(string(msgChString),
2138                 logging::LOG_TYPE_DEBUG);
2139         }
2140         else
2141         {
2142             snprintf(msgChString, sizeof(msgChString),
2143                      "WorkerNode unable to update drop_caches and flush cache; "
2144                      "elapsedTime-%f sec", elapsedTime);
2145             log_errno(string(msgChString),
2146                       logging::LOG_TYPE_WARNING);
2147         }
2148 
2149         close(fd);
2150     }
2151     else
2152     {
2153         clock_gettime(CLOCK_REALTIME, &tm2);
2154         timespec_sub(tm1, tm2, elapsedTime);
2155         snprintf(msgChString, sizeof(msgChString),
2156                  "WorkerNode unable to open drop_caches and flush cache; "
2157                  "elapsedTime-%f sec", elapsedTime);
2158         log_errno(string(msgChString),
2159                   logging::LOG_TYPE_WARNING);
2160     }
2161 
2162 #else
2163     int fd = -1;
2164 
2165     if ((fd = open("/proc/sys/vm/drop_caches", O_WRONLY)) >= 0)
2166     {
2167         ssize_t written = write(fd, "3\n", 2);
2168         int rc = close(fd);
2169         if ( !written || rc )
2170         {
2171             std::cerr << "Could not write into or close /proc/sys/vm/drop_caches" << std::endl;
2172         }
2173     }
2174 
2175 #endif
2176 #endif
2177     reply << (uint8_t) ERR_OK;
2178 
2179     if (!standalone)
2180         master.write(reply);
2181 }
2182 
do_clear()2183 void SlaveComm::do_clear()
2184 {
2185     int err;
2186     ByteStream reply;
2187 
2188 #ifdef BRM_VERBOSE
2189     cerr << "WorkerComm: do_clear()" << endl;
2190 #endif
2191 
2192     if (printOnly)
2193     {
2194         cout << "clear" << endl;
2195         return;
2196     }
2197 
2198     err = slave->clear();
2199 
2200     if (err)
2201         throw runtime_error("Clear failed.");
2202 
2203     /* This doesn't get confirmed, so we have to save its delta here */
2204     if (firstSlave)
2205         saveDelta();
2206 
2207     reply << (uint8_t) (err == 0 ? ERR_OK : ERR_FAILURE);
2208 
2209     if (!standalone)
2210         master.write(reply);
2211 }
2212 
replayJournal(string prefix)2213 int SlaveComm::replayJournal(string prefix)
2214 {
2215     ByteStream cmd;
2216     uint32_t len;
2217     int ret = 0;
2218 
2219     // @Bug 2667+
2220     // Fix for issue where load_brm was using the journal file from DBRMRoot instead of the one from the command line
2221     // argument.
2222 
2223     // If A or B files are being loaded, strip off the A or B for the journal file name as there is only one journal file.
2224     // For example, if prefix is "/usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_savesA" the journal file name will be
2225     // "/usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves_journal".
2226 
2227     string tmp = prefix.substr(prefix.length() - 1);
2228     string fName;
2229 
2230     if ((tmp.compare("A") == 0) || (tmp.compare("B") == 0))
2231     {
2232         fName = prefix.substr(0, prefix.length() - 1) + "_journal";
2233     }
2234 
2235 #ifdef _MSC_VER
2236     else if (tmp == "a" || tmp == "b")
2237         fName = prefix.substr(0, prefix.length() - 1) + "_journal";
2238 
2239 #endif
2240     else
2241     {
2242         fName = prefix + "_journal";
2243     }
2244 
2245     const char* filename = fName.c_str();
2246 
2247     IDBDataFile* journalf = IDBDataFile::open(
2248                                 IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
2249 
2250     if (!journalf)
2251     {
2252         cout << "Error opening journal file " << fName << endl;
2253         return -1;
2254     }
2255 
2256     if (journalf->size() == 0)  // empty file, nothing to replay
2257         return 0;
2258 
2259     ssize_t readIn = 0;
2260 
2261     do
2262     {
2263         readIn = journalf->read((char*) &len, sizeof(len));
2264 
2265         if (readIn > 0)
2266         {
2267 
2268             cmd.needAtLeast(len);
2269             readIn = journalf->read((char*) cmd.getInputPtr(), len);
2270             cmd.advanceInputPtr(len);
2271 
2272             try
2273             {
2274                 processCommand(cmd);
2275             }
2276             catch (exception& e)
2277             {
2278                 cout << e.what() << "  Journal replay was incomplete." << endl;
2279                 slave->undoChanges();
2280                 return -1;
2281             }
2282 
2283             slave->confirmChanges();
2284             cmd.restart();
2285             ret++;
2286         }
2287     } while (readIn > 0);
2288 
2289     return ret;
2290 }
2291 
do_takeSnapshot()2292 void SlaveComm::do_takeSnapshot()
2293 {
2294     ByteStream reply;
2295 
2296     if (printOnly)
2297     {
2298         cout << "takeSnapshot" << endl;
2299         return;
2300     }
2301 
2302     takeSnapshot = true;
2303     do_confirm();
2304     reply << (uint8_t) 0;
2305 
2306     if (!standalone)
2307         master.write(reply);
2308 }
2309 
saveDelta()2310 void SlaveComm::saveDelta()
2311 {
2312     try
2313     {
2314         uint32_t len = delta.length();
2315 
2316         journalh->seek(0, SEEK_END);
2317         journalh->write((const char*) &len, sizeof(len));
2318         journalh->write((const char*) delta.buf(), delta.length());
2319         journalh->flush();
2320         journalCount++;
2321     }
2322     catch (exception& e)
2323     {
2324         cerr << "Journal write error: " << e.what() << endl;
2325         throw;
2326     }
2327 }
2328 
printJournal(string prefix)2329 int SlaveComm::printJournal(string prefix)
2330 {
2331     int ret;
2332     printOnly = true;
2333     ret = replayJournal(prefix);
2334     printOnly = false;
2335     return ret;
2336 }
2337 
do_ownerCheck(ByteStream & msg)2338 void SlaveComm::do_ownerCheck(ByteStream& msg)
2339 {
2340     string processName;
2341     uint32_t pid;
2342     ByteStream::byte rb = 0;
2343 
2344     msg >> processName >> pid;
2345     idbassert(msg.length() == 0);
2346 
2347     if (standalone)
2348         return;
2349 
2350     if (processExists(pid, processName))
2351         rb++;
2352 
2353     ByteStream reply;
2354     reply << rb;
2355     master.write(reply);
2356 }
2357 
2358 //FIXME: needs to be refactored along with SessionManagerServer::lookupProcessStatus()
2359 #if defined(__linux__)
processExists(const uint32_t pid,const string & pname)2360 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2361 {
2362     string stat;
2363     ostringstream procFileName;
2364     ostringstream statProcessField;
2365     ifstream in;
2366     string::size_type pos;
2367     ByteStream reply;
2368     char buf[2048];
2369 
2370     procFileName << "/proc/" << pid << "/stat";
2371     in.open(procFileName.str().c_str());
2372 
2373     if (!in)
2374     {
2375         return false;
2376     }
2377 
2378     statProcessField << "(" << pname << ")";
2379 
2380     in.getline(buf, 1024);
2381     stat = buf;
2382     pos = stat.find(statProcessField.str());
2383 
2384     if (pos == string::npos)
2385     {
2386         return false;
2387     }
2388 
2389     return true;
2390 }
2391 
2392 
2393 #elif defined(_MSC_VER)
2394 //FIXME
processExists(const uint32_t pid,const string & pname)2395 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2396 {
2397     boost::mutex::scoped_lock lk(fPidMemLock);
2398 
2399     if (!fPids)
2400         fPids = (DWORD*)malloc(fMaxPids * sizeof(DWORD));
2401 
2402     DWORD needed = 0;
2403 
2404     if (EnumProcesses(fPids, fMaxPids * sizeof(DWORD), &needed) == 0)
2405         return false;
2406 
2407     while (needed == fMaxPids * sizeof(DWORD))
2408     {
2409         fMaxPids *= 2;
2410         fPids = (DWORD*)realloc(fPids, fMaxPids * sizeof(DWORD));
2411 
2412         if (EnumProcesses(fPids, fMaxPids * sizeof(DWORD), &needed) == 0)
2413             return false;
2414     }
2415 
2416     DWORD numPids = needed / sizeof(DWORD);
2417 
2418     for (DWORD i = 0; i < numPids; i++)
2419     {
2420         if (fPids[i] == pid)
2421         {
2422             TCHAR szProcessName[MAX_PATH] = TEXT("<unknown>");
2423 
2424             // Get a handle to the process.
2425             HANDLE hProcess = OpenProcess(PROCESS_QUERY_INFORMATION |
2426                                           PROCESS_VM_READ,
2427                                           FALSE, fPids[i]);
2428 
2429             // Get the process name.
2430             if (hProcess != NULL)
2431             {
2432                 HMODULE hMod;
2433                 DWORD cbNeeded;
2434 
2435                 if (EnumProcessModules(hProcess, &hMod, sizeof(hMod), &cbNeeded))
2436                     GetModuleBaseName(hProcess, hMod, szProcessName,
2437                                       sizeof(szProcessName) / sizeof(TCHAR));
2438 
2439                 CloseHandle(hProcess);
2440 
2441                 if (pname == szProcessName)
2442                     return true;
2443             }
2444         }
2445     }
2446 
2447     return false;
2448 }
2449 #elif defined(__FreeBSD__)
2450 //FIXME
processExists(const uint32_t pid,const string & pname)2451 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2452 {
2453     return false;
2454 }
2455 #else
2456 #error Need to port processExists()
2457 #endif
2458 
do_dmlLockLBIDRanges(ByteStream & msg)2459 void SlaveComm::do_dmlLockLBIDRanges(ByteStream& msg)
2460 {
2461     ByteStream reply;
2462     vector<LBIDRange> ranges;
2463     int txnID;
2464     uint32_t tmp32;
2465     int err;
2466 
2467     deserializeVector<LBIDRange>(msg, ranges);
2468     msg >> tmp32;
2469     assert(msg.length() == 0);
2470     txnID = (int) tmp32;
2471 
2472     if (printOnly)
2473     {
2474         cout << "dmlLockLBIDRanges: transID=" << txnID << " size="
2475              << ranges.size() <<	" ranges..." << endl;
2476 
2477         for (uint32_t i = 0; i < ranges.size(); i++)
2478             cout << "   start=" << ranges[i].start << " size=" << ranges[i].size << endl;
2479 
2480         return;
2481     }
2482 
2483     err = slave->dmlLockLBIDRanges(ranges, txnID);
2484 
2485     reply << (uint8_t) err;
2486 #ifdef BRM_VERBOSE
2487     cerr << "WorkerComm: do_dmlLockLBIDRanges() err code is " << err << endl;
2488 #endif
2489 
2490     if (!standalone)
2491         master.write(reply);
2492 
2493     doSaveDelta = true;
2494 }
2495 
do_dmlReleaseLBIDRanges(ByteStream & msg)2496 void SlaveComm::do_dmlReleaseLBIDRanges(ByteStream& msg)
2497 {
2498     ByteStream reply;
2499     vector<LBIDRange> ranges;
2500     int err;
2501 
2502     deserializeVector<LBIDRange>(msg, ranges);
2503 
2504     if (printOnly)
2505     {
2506         cout << "dmlLockLBIDRanges: size=" << ranges.size() << " ranges..." << endl;
2507 
2508         for (uint32_t i = 0; i < ranges.size(); i++)
2509             cout << "   start=" << ranges[i].start << " size=" << ranges[i].size << endl;
2510 
2511         return;
2512     }
2513 
2514     err = slave->dmlReleaseLBIDRanges(ranges);
2515 
2516     reply << (uint8_t) err;
2517 #ifdef BRM_VERBOSE
2518     cerr << "WorkerComm: do_dmlReleaseLBIDRanges() err code is " << err << endl;
2519 #endif
2520 
2521     if (!standalone)
2522         master.write(reply);
2523 
2524     doSaveDelta = true;
2525 }
2526 
2527 }
2528 
2529 // vim:ts=4 sw=4:
2530