1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /*****************************************************************************
20 * $Id: slavecomm.cpp 1839 2013-02-01 17:42:03Z pleblanc $
21 *
22 ****************************************************************************/
23 #include <unistd.h>
24 #include <iostream>
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <cstdio>
29 #include <ctime>
30 #ifdef _MSC_VER
31 #include <io.h>
32 #include <psapi.h>
33 #endif
34
35 #include "messagequeue.h"
36 #include "bytestream.h"
37 #include "socketclosed.h"
38 #include "configcpp.h"
39 #include "IDBDataFile.h"
40 #include "IDBPolicy.h"
41
42 #define SLAVECOMM_DLLEXPORT
43 #include "slavecomm.h"
44 #undef SLAVECOMM_DLLEXPORT
45
46 #include "installdir.h"
47
48 using namespace std;
49 using namespace messageqcpp;
50 using namespace idbdatafile;
51
52 namespace
53 {
54 #ifdef USE_VERY_COMPLEX_DROP_CACHES
// Stores in tm the elapsed time, in seconds, going from tv1 to tv2
// (tv2 - tv1). The nanosecond difference may be negative; it is simply
// scaled and added to the whole-second difference.
void timespec_sub(const struct timespec& tv1,
                  const struct timespec& tv2,
                  double& tm)
{
    const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
    const double fraction = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);

    tm = wholeSeconds + fraction;
}
61 #endif
62 }
63
64 namespace BRM
65 {
66
SlaveComm(string hostname,SlaveDBRMNode * s)67 SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
68 slave(s), currentSaveFile(NULL), journalh(NULL)
69 #ifdef _MSC_VER
70 , fPids(0), fMaxPids(64)
71 #endif
72 {
73 config::Config* config = config::Config::makeConfig();
74 string tmp;
75
76 bool tellUser = true;
77
78 for (;;)
79 {
80 try
81 {
82 server = new MessageQueueServer(hostname);
83 break;
84 }
85 catch (runtime_error& re)
86 {
87 string what = re.what();
88
89 if (what.find("Address already in use") != string::npos)
90 {
91 if (tellUser)
92 {
93 cerr << "Address already in use, retrying..." << endl;
94 tellUser = false;
95 }
96
97 sleep(5);
98 }
99 else
100 {
101 throw;
102 }
103 }
104 }
105
106 string tmpDir = startup::StartUp::tmpDir();
107
108 /* NOTE: this string has to match whatever is designated as the first slave */
109 if (hostname == "DBRM_Worker1")
110 {
111 try
112 {
113 savefile = config->getConfig("SystemConfig", "DBRMRoot");
114 }
115 catch (exception& e)
116 {
117 savefile = tmpDir + "/BRM_SaveFiles";
118 }
119
120 if (savefile == "")
121 savefile = tmpDir + "/BRM_SaveFiles";
122
123 tmp = "";
124
125 try
126 {
127 tmp = config->getConfig("SystemConfig", "DBRMSnapshotInterval");
128 }
129 catch (exception& e) { }
130
131 if (tmp == "")
132 snapshotInterval = 100000;
133 else
134 snapshotInterval = config->fromText(tmp);
135
136 journalCount = 0;
137
138 firstSlave = true;
139 journalName = savefile + "_journal";
140 const char* filename = journalName.c_str();
141
142 journalh = IDBDataFile::open(
143 IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "a", 0);
144 if (journalh == NULL)
145 throw runtime_error("Could not open the BRM journal for writing!");
146 }
147 else
148 {
149 savefile = "";
150 firstSlave = false;
151 }
152
153 takeSnapshot = false;
154 doSaveDelta = false;
155 saveFileToggle = true; // start with the suffix "A" rather than "B". Arbitrary.
156 release = false;
157 die = false;
158 standalone = false;
159 printOnly = false;
160 //@Bug 2258 DBRMTimeOut is default to 20 seconds
161 //@BUG 3189 set timeout to 1 second, don't use config setting
162 // std::string retStr = config->getConfig("SystemConfig", "DBRMTimeOut");
163 // int secondsToWait = config->fromText(retStr);
164 MSG_TIMEOUT.tv_nsec = 0;
165 // if ( secondsToWait > 0 )
166 // MSG_TIMEOUT.tv_sec = secondsToWait;
167 // else
168 MSG_TIMEOUT.tv_sec = 1;
169 }
170
SlaveComm()171 SlaveComm::SlaveComm()
172 : currentSaveFile(NULL), journalh(NULL)
173 #ifdef _MSC_VER
174 , fPids(0), fMaxPids(64)
175 #endif
176 {
177 config::Config* config = config::Config::makeConfig();
178
179 string tmpDir = startup::StartUp::tmpDir();
180
181 try
182 {
183 savefile = config->getConfig("SystemConfig", "DBRMRoot");
184 }
185 catch (exception& e)
186 {
187 savefile = tmpDir + "/BRM_SaveFiles";
188 }
189
190 if (savefile == "")
191 savefile = tmpDir + "/BRM_SaveFiles";
192
193 journalName = savefile + "_journal";
194
195 takeSnapshot = false;
196 doSaveDelta = false;
197 saveFileToggle = true; // start with the suffix "A" rather than "B". Arbitrary.
198 release = false;
199 die = false;
200 firstSlave = false;
201 server = NULL;
202 standalone = true;
203 printOnly = false;
204 slave = new SlaveDBRMNode();
205 }
206
~SlaveComm()207 SlaveComm::~SlaveComm()
208 {
209 delete server;
210 server = NULL;
211
212 if (firstSlave)
213 {
214 delete currentSaveFile;
215 currentSaveFile = NULL;
216 }
217
218 delete journalh;
219 journalh = NULL;
220 }
221
stop()222 void SlaveComm::stop()
223 {
224 die = true;
225 }
226
reset()227 void SlaveComm::reset()
228 {
229 release = true;
230 }
231
run()232 void SlaveComm::run()
233 {
234 ByteStream msg;
235
236 while (!die)
237 {
238 #ifdef BRM_VERBOSE
239 // cerr << "WorkerComm: waiting for a connection" << endl;
240 #endif
241 master = server->accept(&MSG_TIMEOUT);
242
243 while (!die && master.isOpen())
244 {
245 try
246 {
247 msg = master.read(&MSG_TIMEOUT);
248 }
249 catch (SocketClosed& e)
250 {
251 #ifdef BRM_VERBOSE
252 cerr << "WorkerComm: remote closed" << endl;
253 #endif
254 break;
255 }
256 catch (...)
257 {
258 #ifdef BRM_VERBOSE
259 cerr << "WorkerComm: read failed, closing connection" << endl;
260 #endif
261 break;
262 }
263
264 if (release)
265 break;
266
267 if (die) // || msg.length() == 0)
268 break;
269
270 if (msg.length() == 0)
271 continue;
272
273 #ifdef BRM_VERBOSE
274 cerr << "WorkerComm: got a command" << endl;
275 #endif
276
277 try
278 {
279 processCommand(msg);
280 }
281 catch (exception& e)
282 {
283 /*
284 * The error is either that msg was too short (really slow sender possibly),
285 * there was a bigger communication failure, or there was a file IO
286 * error. Closing the connection for now.
287 */
288 cerr << e.what() << endl;
289 do_undo();
290 master.close();
291 }
292 }
293
294 release = false;
295 master.close();
296 }
297
298 #ifdef BRM_VERBOSE
299 cerr << "WorkerComm: exiting..." << endl;
300 #endif
301 }
302
processCommand(ByteStream & msg)303 void SlaveComm::processCommand(ByteStream& msg)
304 {
305 uint8_t cmd;
306
307 if (firstSlave)
308 {
309 msg.peek(cmd);
310
311 if (cmd != CONFIRM)
312 delta = msg;
313 }
314
315 msg >> cmd;
316 #ifdef BRM_VERBOSE
317 cerr << "WorkerComm: command " << (int) cmd << endl;
318 #endif
319
320 switch (cmd)
321 {
322 case CREATE_STRIPE_COLUMN_EXTENTS:
323 do_createStripeColumnExtents(msg);
324 break;
325
326 case CREATE_COLUMN_EXTENT_DBROOT:
327 do_createColumnExtent_DBroot(msg);
328 break;
329
330 case CREATE_COLUMN_EXTENT_EXACT_FILE:
331 do_createColumnExtentExactFile(msg);
332 break;
333
334 case CREATE_DICT_STORE_EXTENT:
335 do_createDictStoreExtent(msg);
336 break;
337
338 case ROLLBACK_COLUMN_EXTENTS_DBROOT:
339 do_rollbackColumnExtents_DBroot(msg);
340 break;
341
342 case ROLLBACK_DICT_STORE_EXTENTS_DBROOT:
343 do_rollbackDictStoreExtents_DBroot(msg);
344 break;
345
346 case DELETE_EMPTY_COL_EXTENTS:
347 do_deleteEmptyColExtents(msg);
348 break;
349
350 case DELETE_EMPTY_DICT_STORE_EXTENTS:
351 do_deleteEmptyDictStoreExtents(msg);
352 break;
353
354 case DELETE_OID:
355 do_deleteOID(msg);
356 break;
357
358 case DELETE_OIDS:
359 do_deleteOIDs(msg);
360 break;
361
362 case SET_LOCAL_HWM:
363 do_setLocalHWM(msg);
364 break;
365
366 case BULK_SET_HWM:
367 do_bulkSetHWM(msg);
368 break;
369
370 case BULK_SET_HWM_AND_CP:
371 do_bulkSetHWMAndCP(msg);
372 break;
373
374 case WRITE_VB_ENTRY:
375 do_writeVBEntry(msg);
376 break;
377
378 case BULK_WRITE_VB_ENTRY:
379 do_bulkWriteVBEntry(msg);
380 break;
381
382 case BEGIN_VB_COPY:
383 do_beginVBCopy(msg);
384 break;
385
386 case END_VB_COPY:
387 do_endVBCopy(msg);
388 break;
389
390 case VB_ROLLBACK1:
391 do_vbRollback1(msg);
392 break;
393
394 case VB_ROLLBACK2:
395 do_vbRollback2(msg);
396 break;
397
398 case VB_COMMIT:
399 do_vbCommit(msg);
400 break;
401
402 case BRM_UNDO:
403 do_undo();
404 break;
405
406 case CONFIRM:
407 do_confirm();
408 break;
409
410 case FLUSH_INODE_CACHES:
411 do_flushInodeCache();
412 break;
413
414 case BRM_CLEAR:
415 do_clear();
416 break;
417
418 case MARKEXTENTINVALID:
419 do_markInvalid(msg);
420 break;
421
422 case MARKMANYEXTENTSINVALID:
423 do_markManyExtentsInvalid(msg);
424 break;
425
426 case SETEXTENTMAXMIN:
427 do_setExtentMaxMin(msg);
428 break;
429
430 case SETMANYEXTENTSMAXMIN:
431 do_setExtentsMaxMin(msg);
432 break;
433
434 case TAKE_SNAPSHOT:
435 do_takeSnapshot();
436 break;
437
438 case MERGEMANYEXTENTSMAXMIN:
439 do_mergeExtentsMaxMin(msg);
440 break;
441
442 case DELETE_PARTITION:
443 do_deletePartition(msg);
444 break;
445
446 case MARK_PARTITION_FOR_DELETION:
447 do_markPartitionForDeletion(msg);
448 break;
449
450 case MARK_ALL_PARTITION_FOR_DELETION:
451 do_markAllPartitionForDeletion(msg);
452 break;
453
454 case RESTORE_PARTITION:
455 do_restorePartition(msg);
456 break;
457
458 case OWNER_CHECK:
459 do_ownerCheck(msg);
460 break;
461
462 case LOCK_LBID_RANGES:
463 do_dmlLockLBIDRanges(msg);
464 break;
465
466 case RELEASE_LBID_RANGES:
467 do_dmlReleaseLBIDRanges(msg);
468 break;
469
470 case DELETE_DBROOT:
471 do_deleteDBRoot(msg);
472 break;
473
474 case BULK_UPDATE_DBROOT:
475 do_bulkUpdateDBRoot(msg);
476 break;
477
478 default:
479 cerr << "WorkerComm: unknown command " << (int) cmd << endl;
480 }
481 }
482
483 //------------------------------------------------------------------------------
// Process a request to create a "stripe" of column extents: one extent for
// each column in the supplied list, on the given DBRoot and partition.
485 //------------------------------------------------------------------------------
do_createStripeColumnExtents(ByteStream & msg)486 void SlaveComm::do_createStripeColumnExtents(ByteStream& msg)
487 {
488 int err;
489 uint16_t tmp16;
490 uint16_t tmp32;
491 uint16_t dbRoot;
492 uint32_t partitionNum;
493 uint16_t segmentNum;
494 std::vector<CreateStripeColumnExtentsArgIn> cols;
495 std::vector<CreateStripeColumnExtentsArgOut> extents;
496 ByteStream reply;
497
498 #ifdef BRM_VERBOSE
499 cerr << "WorkerComm: do_createStripeColumnExtents()" << endl;
500 #endif
501
502 deserializeInlineVector(msg, cols);
503 msg >> tmp16;
504 dbRoot = tmp16;
505 msg >> tmp32;
506 partitionNum = tmp32;
507
508 if (printOnly)
509 {
510 cout << "createStripeColumnExtents(). " <<
511 "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl;
512
513 for (uint32_t i = 0; i < cols.size(); i++)
514 cout << "StripeColExt arg " << i + 1 <<
515 ": oid=" << cols[i].oid <<
516 " width=" << cols[i].width << endl;
517
518 return;
519 }
520
521 err = slave->createStripeColumnExtents(cols, dbRoot,
522 partitionNum, segmentNum, extents);
523 reply << (uint8_t) err;
524
525 if (err == ERR_OK)
526 {
527 reply << partitionNum;
528 reply << segmentNum;
529 serializeInlineVector( reply, extents );
530 }
531
532 #ifdef BRM_VERBOSE
533 cerr << "WorkerComm: do_createStripeColumnExtents() err code is " <<
534 err << endl;
535 #endif
536
537 if (!standalone)
538 master.write(reply);
539
540 // see bug 3596. Need to make sure a snapshot file exists.
541 if ((cols.size() > 0) && (cols[0].oid < 3000))
542 takeSnapshot = true;
543 else
544 doSaveDelta = true;
545 }
546
547 //------------------------------------------------------------------------------
548 // Process a request to create a column extent for a specific OID and DBRoot.
549 //------------------------------------------------------------------------------
do_createColumnExtent_DBroot(ByteStream & msg)550 void SlaveComm::do_createColumnExtent_DBroot(ByteStream& msg)
551 {
552 int allocdSize, err;
553 uint8_t tmp8;
554 uint16_t tmp16;
555 uint32_t tmp32;
556 OID_t oid;
557 uint32_t colWidth;
558 uint16_t dbRoot;
559 uint32_t partitionNum;
560 uint16_t segmentNum;
561 LBID_t lbid;
562 uint32_t startBlockOffset;
563 ByteStream reply;
564 execplan::CalpontSystemCatalog::ColDataType colDataType;
565 #ifdef BRM_VERBOSE
566 cerr << "WorkerComm: do_createColumnExtent_DBroot()" << endl;
567 #endif
568
569 msg >> tmp32;
570 oid = tmp32;
571 msg >> tmp32;
572 colWidth = tmp32;
573 msg >> tmp16;
574 dbRoot = tmp16;
575 msg >> tmp32;
576 partitionNum = tmp32;
577 msg >> tmp16;
578 segmentNum = tmp16;
579 msg >> tmp8;
580 colDataType = (execplan::CalpontSystemCatalog::ColDataType)tmp8;
581
582 if (printOnly)
583 {
584 cout << "createColumnExtent_DBroot: oid=" << oid <<
585 " colWidth=" << colWidth <<
586 " dbRoot=" << dbRoot <<
587 " partitionNum=" << partitionNum <<
588 " segmentNum=" << segmentNum << endl;
589 return;
590 }
591
592 err = slave->createColumnExtent_DBroot(oid, colWidth, dbRoot, colDataType,
593 partitionNum, segmentNum, lbid, allocdSize, startBlockOffset);
594 reply << (uint8_t) err;
595
596 if (err == ERR_OK)
597 {
598 reply << partitionNum;
599 reply << segmentNum;
600 reply << (uint64_t) lbid;
601 reply << (uint32_t) allocdSize;
602 reply << (uint32_t) startBlockOffset;
603 }
604
605 #ifdef BRM_VERBOSE
606 cerr << "WorkerComm: do_createColumnExtent_DBroot() err code is " <<
607 err << endl;
608 #endif
609
610 if (!standalone)
611 master.write(reply);
612
613 if (oid < 3000) // see bug 3596. Need to make sure a snapshot file exists.
614 takeSnapshot = true;
615 else
616 doSaveDelta = true;
617 }
618
619 //------------------------------------------------------------------------------
620 // Process a request to create a column extent for the exact segment file
621 // specified by the requested OID, DBRoot, partition, and segment.
622 //------------------------------------------------------------------------------
do_createColumnExtentExactFile(ByteStream & msg)623 void SlaveComm::do_createColumnExtentExactFile(ByteStream& msg)
624 {
625 int allocdSize, err;
626 uint8_t tmp8;
627 uint16_t tmp16;
628 uint32_t tmp32;
629 OID_t oid;
630 uint32_t colWidth;
631 uint16_t dbRoot;
632 uint32_t partitionNum;
633 uint16_t segmentNum;
634 LBID_t lbid;
635 uint32_t startBlockOffset;
636 ByteStream reply;
637 execplan::CalpontSystemCatalog::ColDataType colDataType;
638 #ifdef BRM_VERBOSE
639 cerr << "WorkerComm: do_createColumnExtentExactFile()" << endl;
640 #endif
641
642 msg >> tmp32;
643 oid = tmp32;
644 msg >> tmp32;
645 colWidth = tmp32;
646 msg >> tmp16;
647 dbRoot = tmp16;
648 msg >> tmp32;
649 partitionNum = tmp32;
650 msg >> tmp16;
651 segmentNum = tmp16;
652 msg >> tmp8;
653 colDataType = (execplan::CalpontSystemCatalog::ColDataType)tmp8;
654
655 if (printOnly)
656 {
657 cout << "createColumnExtentExactFile: oid=" << oid <<
658 " colWidth=" << colWidth <<
659 " dbRoot=" << dbRoot <<
660 " partitionNum=" << partitionNum <<
661 " segmentNum=" << segmentNum << endl;
662 return;
663 }
664
665 err = slave->createColumnExtentExactFile(oid, colWidth, dbRoot,
666 partitionNum, segmentNum, colDataType, lbid, allocdSize, startBlockOffset);
667 reply << (uint8_t) err;
668
669 if (err == ERR_OK)
670 {
671 reply << partitionNum;
672 reply << segmentNum;
673 reply << (uint64_t) lbid;
674 reply << (uint32_t) allocdSize;
675 reply << (uint32_t) startBlockOffset;
676 }
677
678 #ifdef BRM_VERBOSE
679 cerr << "WorkerComm: do_createColumnExtentExactFile() err code is " <<
680 err << endl;
681 #endif
682
683 if (!standalone)
684 master.write(reply);
685
686 if (oid < 3000) // see bug 3596. Need to make sure a snapshot file exists.
687 takeSnapshot = true;
688 else
689 doSaveDelta = true;
690 }
691
692 //------------------------------------------------------------------------------
693 // Process a request to create a dictionary store extent.
694 //------------------------------------------------------------------------------
do_createDictStoreExtent(ByteStream & msg)695 void SlaveComm::do_createDictStoreExtent(ByteStream& msg)
696 {
697 int allocdSize, err;
698 uint16_t tmp16;
699 uint32_t tmp32;
700 OID_t oid;
701 uint16_t dbRoot;
702 uint32_t partitionNum;
703 uint16_t segmentNum;
704 LBID_t lbid;
705 ByteStream reply;
706 #ifdef BRM_VERBOSE
707 cerr << "WorkerComm: do_createDictStoreExtent()" << endl;
708 #endif
709
710 msg >> tmp32;
711 oid = tmp32;
712 msg >> tmp16;
713 dbRoot = tmp16;
714 msg >> tmp32;
715 partitionNum = tmp32;
716 msg >> tmp16;
717 segmentNum = tmp16;
718
719 if (printOnly)
720 {
721 cout << "createDictStoreExtent: oid=" << oid << " dbRoot=" << dbRoot <<
722 " partitionNum=" << partitionNum << " segmentNum=" << segmentNum << endl;
723 return;
724 }
725
726 err = slave->createDictStoreExtent(oid, dbRoot,
727 partitionNum, segmentNum, lbid, allocdSize);
728 reply << (uint8_t) err;
729
730 if (err == ERR_OK)
731 {
732 reply << (uint64_t) lbid;
733 reply << (uint32_t) allocdSize;
734 }
735
736 #ifdef BRM_VERBOSE
737 cerr << "WorkerComm: do_createDictStoreExtent() err code is " << err << endl;
738 #endif
739
740 if (!standalone)
741 master.write(reply);
742
743 doSaveDelta = true;
744 }
745
746 //------------------------------------------------------------------------------
747 // Process a request to rollback (delete) a set of column extents.
748 // for a given OID and DBRoot.
749 //------------------------------------------------------------------------------
do_rollbackColumnExtents_DBroot(ByteStream & msg)750 void SlaveComm::do_rollbackColumnExtents_DBroot(ByteStream& msg)
751 {
752 int err;
753 OID_t oid;
754 bool bDeleteAll;
755 uint32_t partitionNum;
756 uint16_t segmentNum;
757 uint16_t dbRoot;
758 HWM_t hwm;
759 uint8_t tmp8;
760 uint16_t tmp16;
761 uint32_t tmp32;
762 ByteStream reply;
763
764 #ifdef BRM_VERBOSE
765 cerr << "WorkerComm: do_rollbackColumnExtents_DBroot()" << endl;
766 #endif
767
768 msg >> tmp32;
769 oid = tmp32;
770 msg >> tmp8;
771 bDeleteAll = tmp8;
772 msg >> tmp16;
773 dbRoot = tmp16;
774 msg >> tmp32;
775 partitionNum = tmp32;
776 msg >> tmp16;
777 segmentNum = tmp16;
778 msg >> tmp32;
779 hwm = tmp32;
780
781 if (printOnly)
782 {
783 cout << "rollbackColumnExtents_DBroot: oid=" << oid <<
784 " bDeleteAll=" << bDeleteAll << " dbRoot=" << dbRoot <<
785 " partitionNum=" << partitionNum <<
786 " segmentNum=" << segmentNum << " hwm=" << hwm << endl;
787 return;
788 }
789
790 err = slave->rollbackColumnExtents_DBroot(
791 oid, bDeleteAll, dbRoot, partitionNum, segmentNum, hwm);
792 reply << (uint8_t) err;
793
794 #ifdef BRM_VERBOSE
795 cerr << "WorkerComm: do_rollbackColumnExtents_DBroot() err code is " <<
796 err << endl;
797 #endif
798
799 if (!standalone)
800 master.write(reply);
801
802 doSaveDelta = true;
803 }
804
805 //------------------------------------------------------------------------------
// Process a request to rollback (delete) a set of dictionary store extents
// for a given OID, DBRoot, and partition.
808 //------------------------------------------------------------------------------
do_rollbackDictStoreExtents_DBroot(ByteStream & msg)809 void SlaveComm::do_rollbackDictStoreExtents_DBroot(ByteStream& msg)
810 {
811 int err;
812 OID_t oid;
813 uint32_t partitionNum;
814 uint16_t dbRoot;
815 uint32_t tmp32;
816 uint16_t tmp16;
817 ByteStream reply;
818 vector<uint16_t> segNums;
819 vector<HWM_t> hwms;
820
821 #ifdef BRM_VERBOSE
822 cerr << "WorkerComm: do_rollbackDictStoreExtents()" << endl;
823 #endif
824
825 msg >> tmp32;
826 oid = tmp32;
827 msg >> tmp16;
828 dbRoot = tmp16;
829 msg >> tmp32;
830 partitionNum = tmp32;
831 deserializeVector(msg, segNums);
832 deserializeVector(msg, hwms);
833
834 if (printOnly)
835 {
836 cout << "rollbackDictStore: oid=" << oid <<
837 " dbRoot=" << dbRoot <<
838 " partitionNum=" << partitionNum <<
839 " hwms..." << endl;
840
841 for (uint32_t i = 0; i < hwms.size(); i++)
842 cout << " " << i << ": " << hwms[i] << endl;
843
844 return;
845 }
846
847
848 err = slave->rollbackDictStoreExtents_DBroot(
849 oid, dbRoot, partitionNum, segNums, hwms);
850 reply << (uint8_t) err;
851
852 #ifdef BRM_VERBOSE
853 cerr << "WorkerComm: do_rollbackDictStoreExtents() err code is " <<
854 err << endl;
855 #endif
856
857 if (!standalone)
858 master.write(reply);
859
860 doSaveDelta = true;
861 }
862
do_deleteEmptyColExtents(messageqcpp::ByteStream & msg)863 void SlaveComm::do_deleteEmptyColExtents(messageqcpp::ByteStream& msg)
864 {
865 OID_t oid;
866 uint32_t tmp1;
867 uint16_t tmp2;
868 int err;
869 ByteStream reply;
870 uint32_t size;
871 ExtentsInfoMap_t extentsInfoMap;
872
873 #ifdef BRM_VERBOSE
874 cerr << "WorkerComm: do_deleteEmptyColExtents()" << endl;
875 #endif
876
877 msg >> size;
878
879 if (printOnly)
880 cout << "deleteEmptyColExtents: size=" << size << " extentsInfoMap..." << endl;
881
882 for ( unsigned i = 0; i < size; i++ )
883 {
884 msg >> tmp1;
885 oid = tmp1;
886 extentsInfoMap[oid].oid = oid;
887 msg >> tmp1;
888 extentsInfoMap[oid].partitionNum = tmp1;
889 msg >> tmp2;
890 extentsInfoMap[oid].segmentNum = tmp2;
891 msg >> tmp2;
892 extentsInfoMap[oid].dbRoot = tmp2;
893 msg >> tmp1;
894 extentsInfoMap[oid].hwm = tmp1;
895
896 if (printOnly)
897 {
898 cout << " oid=" << oid << " partitionNum=" << extentsInfoMap[oid].partitionNum
899 << " segmentNum=" << extentsInfoMap[oid].segmentNum << " dbRoot=" <<
900 extentsInfoMap[oid].dbRoot << " hwm=" << extentsInfoMap[oid].hwm << endl;
901 }
902 }
903
904 if (printOnly)
905 return;
906
907 err = slave->deleteEmptyColExtents(extentsInfoMap);
908 reply << (uint8_t) err;
909 #ifdef BRM_VERBOSE
910 cerr << "WorkerComm: do_deleteEmptyColExtents() err code is " << err << endl;
911 #endif
912
913 if (!standalone)
914 master.write(reply);
915
916 doSaveDelta = true;
917 }
918
do_deleteEmptyDictStoreExtents(messageqcpp::ByteStream & msg)919 void SlaveComm::do_deleteEmptyDictStoreExtents(messageqcpp::ByteStream& msg)
920 {
921 OID_t oid;
922 uint32_t tmp1;
923 uint16_t tmp2;
924 uint8_t tmp3;
925 int err;
926 ByteStream reply;
927 uint32_t size;
928 ExtentsInfoMap_t extentsInfoMap;
929
930 #ifdef BRM_VERBOSE
931 cerr << "WorkerComm: do_deleteEmptyDictStoreExtents()" << endl;
932 #endif
933
934 msg >> size;
935
936 if (printOnly)
937 cout << "deleteEmptyDictStoreExtents: size=" << size << " extentsInfoMap..." << endl;
938
939 for ( unsigned i = 0; i < size; i++ )
940 {
941 msg >> tmp1;
942 oid = tmp1;
943 extentsInfoMap[oid].oid = oid;
944 msg >> tmp1;
945 extentsInfoMap[oid].partitionNum = tmp1;
946 msg >> tmp2;
947 extentsInfoMap[oid].segmentNum = tmp2;
948 msg >> tmp2;
949 extentsInfoMap[oid].dbRoot = tmp2;
950 msg >> tmp1;
951 extentsInfoMap[oid].hwm = tmp1;
952 msg >> tmp3;
953 extentsInfoMap[oid].newFile = (bool) tmp3;
954
955 if (printOnly)
956 {
957 cout << " oid=" << oid << " partitionNum=" << extentsInfoMap[oid].partitionNum <<
958 " segmentNum=" << extentsInfoMap[oid].segmentNum << " dbRoot=" <<
959 extentsInfoMap[oid].dbRoot << " hwm=" << extentsInfoMap[oid].hwm <<
960 " newFile=" << (int) extentsInfoMap[oid].newFile << endl;
961 }
962 }
963
964 if (printOnly)
965 return;
966
967 err = slave->deleteEmptyDictStoreExtents(extentsInfoMap);
968 reply << (uint8_t) err;
969 #ifdef BRM_VERBOSE
970 cerr << "WorkerComm: do_deleteEmptyDictStoreExtents() err code is " << err << endl;
971 #endif
972
973 if (!standalone)
974 master.write(reply);
975
976 doSaveDelta = true;
977 }
978
do_deleteOID(ByteStream & msg)979 void SlaveComm::do_deleteOID(ByteStream& msg)
980 {
981 OID_t oid;
982 uint32_t tmp;
983 int err;
984 ByteStream reply;
985
986 #ifdef BRM_VERBOSE
987 cerr << "WorkerComm: do_deleteOID()" << endl;
988 #endif
989
990 msg >> tmp;
991 oid = tmp;
992
993 if (printOnly)
994 {
995 cout << "deleteOID: oid=" << oid << endl;
996 return;
997 }
998
999 err = slave->deleteOID(oid);
1000 reply << (uint8_t) err;
1001 #ifdef BRM_VERBOSE
1002 cerr << "WorkerComm: do_deleteOID() err code is " << err << endl;
1003 #endif
1004
1005 if (!standalone)
1006 master.write(reply);
1007
1008 doSaveDelta = true;
1009 }
1010
do_deleteOIDs(ByteStream & msg)1011 void SlaveComm::do_deleteOIDs(ByteStream& msg)
1012 {
1013 OID_t oid;
1014 uint32_t tmp;
1015 int err;
1016 ByteStream reply;
1017 uint32_t size;
1018 std::vector<OID_t> Oids;
1019 OidsMap_t oidsMap;
1020 #ifdef BRM_VERBOSE
1021 cerr << "WorkerComm: do_deleteOIDs()" << endl;
1022 #endif
1023
1024 msg >> size;
1025
1026 if (printOnly)
1027 cout << "deleteOIDs: size=" << size << endl;
1028
1029 for ( unsigned i = 0; i < size; i++ )
1030 {
1031 msg >> tmp;
1032 oid = tmp;
1033 oidsMap[oid] = oid;
1034
1035 if (printOnly)
1036 cout << " oid=" << oid << endl;
1037 }
1038
1039 if (printOnly)
1040 return;
1041
1042 err = slave->deleteOIDs(oidsMap);
1043 reply << (uint8_t) err;
1044 #ifdef BRM_VERBOSE
1045 cerr << "WorkerComm: do_deleteOIDs() err code is " << err << endl;
1046 #endif
1047
1048 if (!standalone)
1049 master.write(reply);
1050
1051 doSaveDelta = true;
1052 }
1053
1054 //------------------------------------------------------------------------------
1055 // Process a request to set the local HWM relative to a specific OID, partition,
1056 // and segment number.
1057 //------------------------------------------------------------------------------
do_setLocalHWM(ByteStream & msg)1058 void SlaveComm::do_setLocalHWM(ByteStream& msg)
1059 {
1060 OID_t oid;
1061 HWM_t hwm;
1062 uint32_t partitionNum;
1063 uint16_t segmentNum;
1064 int err;
1065 uint16_t tmp16;
1066 uint32_t tmp32;
1067 ByteStream reply;
1068
1069 #ifdef BRM_VERBOSE
1070 cerr << "WorkerComm: do_setLocalHWM()" << endl;
1071 #endif
1072
1073 msg >> tmp32;
1074 oid = tmp32;
1075 msg >> tmp32;
1076 partitionNum = tmp32;
1077 msg >> tmp16;
1078 segmentNum = tmp16;
1079 msg >> tmp32;
1080 hwm = tmp32;
1081
1082 if (printOnly)
1083 {
1084 cout << "setLocalHWM: oid=" << oid << " partitionNum=" << partitionNum <<
1085 " segmentNum=" << segmentNum << " hwm=" << hwm << endl;
1086 return;
1087 }
1088
1089 err = slave->setLocalHWM(oid, partitionNum, segmentNum, hwm, firstSlave);
1090 reply << (uint8_t) err;
1091 #ifdef BRM_VERBOSE
1092 cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1093 #endif
1094
1095 if (!standalone)
1096 master.write(reply);
1097
1098 doSaveDelta = true;
1099 }
1100
do_bulkSetHWM(ByteStream & msg)1101 void SlaveComm::do_bulkSetHWM(ByteStream& msg)
1102 {
1103 vector<BulkSetHWMArg> args;
1104 int err;
1105 VER_t transID;
1106 uint32_t tmp32;
1107 ByteStream reply;
1108
1109 #ifdef BRM_VERBOSE
1110 cerr << "WorkerComm: do_setLocalHWM()" << endl;
1111 #endif
1112
1113 deserializeInlineVector(msg, args);
1114 msg >> tmp32;
1115 transID = tmp32;
1116
1117 if (printOnly)
1118 {
1119 cout << "bulkSetHWM(). TransID = " << transID << endl;
1120
1121 for (uint32_t i = 0; i < args.size(); i++)
1122 cout << "bulkSetHWM arg " << i + 1 << ": oid=" << args[i].oid << " partitionNum=" << args[i].partNum <<
1123 " segmentNum=" << args[i].segNum << " hwm=" << args[i].hwm << endl;
1124
1125 return;
1126 }
1127
1128 err = slave->bulkSetHWM(args, transID, firstSlave);
1129 reply << (uint8_t) err;
1130 #ifdef BRM_VERBOSE
1131 cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1132 #endif
1133
1134 if (!standalone)
1135 master.write(reply);
1136
1137 doSaveDelta = true;
1138 }
1139
do_bulkSetHWMAndCP(ByteStream & msg)1140 void SlaveComm::do_bulkSetHWMAndCP(ByteStream& msg)
1141 {
1142 vector<BulkSetHWMArg> hwmArgs;
1143 vector<CPInfo> setCPDataArgs;
1144 vector<CPInfoMerge> mergeCPDataArgs;
1145 int err;
1146 VER_t transID;
1147 uint32_t tmp32;
1148 ByteStream reply;
1149
1150 #ifdef BRM_VERBOSE
1151 cerr << "WorkerComm: do_setLocalHWM()" << endl;
1152 #endif
1153
1154 deserializeInlineVector(msg, hwmArgs);
1155 deserializeInlineVector(msg, setCPDataArgs);
1156 deserializeInlineVector(msg, mergeCPDataArgs);
1157 msg >> tmp32;
1158 transID = tmp32;
1159
1160 #if 0
1161
1162 if (printOnly)
1163 {
1164 cout << "bulkSetHWM(). TransID = " << transID << endl;
1165
1166 for (uint32_t i = 0; i < args.size(); i++)
1167 cout << "bulkSetHWM arg " << i + 1 << ": oid=" << args[i].oid << " partitionNum=" << args[i].partNum <<
1168 " segmentNum=" << args[i].segNum << " hwm=" << args[i].hwm << endl;
1169
1170 return;
1171 }
1172
1173 #endif
1174
1175 err = slave->bulkSetHWMAndCP(hwmArgs, setCPDataArgs, mergeCPDataArgs, transID, firstSlave);
1176 reply << (uint8_t) err;
1177 #ifdef BRM_VERBOSE
1178 cerr << "WorkerComm: do_setLocalHWM() err code is " << err << endl;
1179 #endif
1180
1181 if (!standalone)
1182 master.write(reply);
1183
1184 doSaveDelta = true;
1185 }
1186
do_bulkUpdateDBRoot(ByteStream & msg)1187 void SlaveComm::do_bulkUpdateDBRoot(ByteStream& msg)
1188 {
1189 vector<BulkUpdateDBRootArg> args;
1190 ByteStream reply;
1191 int err;
1192
1193 deserializeInlineVector(msg, args);
1194 err = slave->bulkUpdateDBRoot(args);
1195 reply << (uint8_t) err;
1196
1197 if (!standalone)
1198 master.write(reply);
1199
1200 doSaveDelta = true;
1201 }
1202
do_markInvalid(ByteStream & msg)1203 void SlaveComm::do_markInvalid(ByteStream& msg)
1204 {
1205 LBID_t lbid;
1206 uint32_t colDataType;
1207 int err;
1208 ByteStream reply;
1209
1210 #ifdef BRM_VERBOSE
1211 cerr << "WorkerComm: do_markInvalid()" << endl;
1212 #endif
1213
1214 msg >> lbid;
1215 msg >> colDataType;
1216
1217 if (printOnly)
1218 {
1219 cout << "markExtentInvalid: lbid=" << lbid << "colDataType=" << colDataType << endl;
1220 return;
1221 }
1222
1223 err = slave->markExtentInvalid(lbid, (execplan::CalpontSystemCatalog::ColDataType)colDataType);
1224 reply << (uint8_t)err;
1225 #ifdef BRM_VERBOSE
1226 cerr << "WorkerComm: do_markInvalid() err code is " << err << endl;
1227 #endif
1228
1229 if (!standalone)
1230 master.write(reply);
1231
1232 doSaveDelta = true;
1233 }
1234
do_markManyExtentsInvalid(ByteStream & msg)1235 void SlaveComm::do_markManyExtentsInvalid(ByteStream& msg)
1236 {
1237 uint64_t tmp64;
1238 uint32_t colDataType;
1239 int err;
1240 ByteStream reply;
1241 vector<LBID_t> lbids;
1242 vector<execplan::CalpontSystemCatalog::ColDataType> colDataTypes;
1243 uint32_t size, i;
1244
1245 #ifdef BRM_VERBOSE
1246 cerr << "WorkerComm: do_markManyExtentsInvalid()" << endl;
1247 #endif
1248
1249 msg >> size;
1250
1251 if (printOnly)
1252 cout << "markManyExtentsInvalid: size=" << size << " lbids..." << endl;
1253
1254 for (i = 0; i < size; ++i)
1255 {
1256 msg >> tmp64;
1257 msg >> colDataType;
1258 lbids.push_back(tmp64);
1259 colDataTypes.push_back((execplan::CalpontSystemCatalog::ColDataType)colDataType);
1260
1261 if (printOnly)
1262 cout << " " << tmp64 << " " << colDataType << endl;
1263 }
1264
1265 if (printOnly)
1266 return;
1267
1268 err = slave->markExtentsInvalid(lbids, colDataTypes);
1269 reply << (uint8_t)err;
1270 #ifdef BRM_VERBOSE
1271 cerr << "WorkerComm: do_markManyExtentsInvalid() err code is " << err << endl;
1272 #endif
1273
1274 if (!standalone)
1275 master.write(reply);
1276
1277 doSaveDelta = true;
1278 }
1279
do_setExtentMaxMin(ByteStream & msg)1280 void SlaveComm::do_setExtentMaxMin(ByteStream& msg)
1281 {
1282 LBID_t lbid;
1283 int64_t max;
1284 int64_t min;
1285 int32_t sequence;
1286 uint64_t tmp64;
1287 uint32_t tmp32;
1288 int err;
1289 ByteStream reply;
1290
1291 #ifdef BRM_VERBOSE
1292 cerr << "WorkerComm: do_setExtentMaxMin()" << endl;
1293 #endif
1294
1295 msg >> tmp64;
1296 lbid = tmp64;
1297
1298 msg >> tmp64;
1299 max = tmp64;
1300
1301 msg >> tmp64;
1302 min = tmp64;
1303
1304 msg >> tmp32;
1305 sequence = tmp32;
1306
1307 if (printOnly)
1308 {
1309 cout << "setExtentMaxMin: lbid=" << lbid << " max=" << max << " min=" << min <<
1310 " sequence=" << sequence << endl;
1311 return;
1312 }
1313
1314 err = slave->setExtentMaxMin(lbid, max, min, sequence, firstSlave);
1315 reply << (uint8_t)err;
1316 #ifdef BRM_VERBOSE
1317 cerr << "WorkerComm: do_setExtentMaxMin() err code is " << err << endl;
1318 #endif
1319
1320 if (!standalone)
1321 master.write(reply);
1322
1323 doSaveDelta = true;
1324 }
1325
1326 // @bug 1970 - added do_setExtentsMaxMin to set multiple extents CP info.
do_setExtentsMaxMin(ByteStream & msg)1327 void SlaveComm::do_setExtentsMaxMin(ByteStream& msg)
1328 {
1329 LBID_t lbid;
1330 uint64_t tmp64;
1331 uint32_t tmp32;
1332 int err;
1333 ByteStream reply;
1334 int32_t updateCount;
1335
1336 #ifdef BRM_VERBOSE
1337 cerr << "WorkerComm: do_setExtentsMaxMin()" << endl;
1338 #endif
1339
1340 msg >> tmp32;
1341 updateCount = tmp32;
1342 CPMaxMinMap_t cpMap;
1343 CPMaxMin cpMaxMin;
1344
1345 if (printOnly)
1346 cout << "setExtentsMaxMin: size=" << updateCount << " CPdata..." << endl;
1347
1348 // Loop through extents and add each one to a map.
1349 for (int64_t i = 0; i < updateCount; i++)
1350 {
1351 msg >> tmp64;
1352 lbid = tmp64;
1353
1354 msg >> tmp64;
1355 cpMaxMin.max = tmp64;
1356
1357 msg >> tmp64;
1358 cpMaxMin.min = tmp64;
1359
1360 msg >> tmp32;
1361 cpMaxMin.seqNum = tmp32;
1362
1363 cpMap[lbid] = cpMaxMin;
1364
1365 if (printOnly)
1366 cout << " lbid=" << lbid << " max=" << cpMaxMin.max << " min=" <<
1367 cpMaxMin.min << " sequenceNum=" << cpMaxMin.seqNum << endl;
1368 }
1369
1370 if (printOnly)
1371 return;
1372
1373 err = slave->setExtentsMaxMin(cpMap, firstSlave);
1374 reply << (uint8_t)err;
1375 #ifdef BRM_VERBOSE
1376 cerr << "WorkerComm: do_setExtentsMaxMin() err code is " << err << endl;
1377 #endif
1378
1379 if (!standalone)
1380 master.write(reply);
1381
1382 doSaveDelta = true;
1383 }
1384
1385 //------------------------------------------------------------------------------
1386 // @bug 2117 - added do_mergeExtentsMaxMin to merge multiple extents CP info.
1387 //------------------------------------------------------------------------------
do_mergeExtentsMaxMin(ByteStream & msg)1388 void SlaveComm::do_mergeExtentsMaxMin(ByteStream& msg)
1389 {
1390 LBID_t startLbid;
1391 uint64_t tmp64;
1392 uint32_t tmp32;
1393 int err;
1394 ByteStream reply;
1395 int32_t mergeCount;
1396
1397 #ifdef BRM_VERBOSE
1398 cerr << "WorkerComm: do_mergeExtentsMaxMin()" << endl;
1399 #endif
1400
1401 msg >> tmp32;
1402 mergeCount = tmp32;
1403 CPMaxMinMergeMap_t cpMap;
1404 CPMaxMinMerge cpMaxMin;
1405
1406 if (printOnly)
1407 cout << "mergeExtentsMaxMin: size=" << mergeCount << " CPdata..." << endl;
1408
1409 // Loop through extents and add each one to a map.
1410 for (int64_t i = 0; i < mergeCount; i++)
1411 {
1412 msg >> tmp64;
1413 startLbid = tmp64;
1414
1415 msg >> tmp64;
1416 cpMaxMin.max = tmp64;
1417
1418 msg >> tmp64;
1419 cpMaxMin.min = tmp64;
1420
1421 msg >> tmp32;
1422 cpMaxMin.seqNum = tmp32;
1423
1424 msg >> tmp32;
1425 cpMaxMin.type = (execplan::CalpontSystemCatalog::ColDataType)tmp32;
1426
1427 msg >> tmp32;
1428 cpMaxMin.newExtent = tmp32;
1429
1430 cpMap[startLbid] = cpMaxMin;
1431
1432 if (printOnly)
1433 cout << " startLBID=" << startLbid << " max=" << cpMaxMin.max << " min=" <<
1434 cpMaxMin.min << " sequenceNum=" << cpMaxMin.seqNum << " type=" << (int)
1435 cpMaxMin.type << " newExtent=" << (int) cpMaxMin.newExtent << endl;
1436 }
1437
1438 if (printOnly)
1439 return;
1440
1441 err = slave->mergeExtentsMaxMin(cpMap);
1442 reply << (uint8_t)err;
1443 #ifdef BRM_VERBOSE
1444 cerr << "WorkerComm: do_mergeExtentsMaxMin() err code is " << err << endl;
1445 #endif
1446
1447 if (!standalone)
1448 master.write(reply);
1449
1450 doSaveDelta = true;
1451 }
1452
1453 //------------------------------------------------------------------------------
1454 // Delete all extents for the specified OID(s) and partition number.
1455 //------------------------------------------------------------------------------
do_deletePartition(ByteStream & msg)1456 void SlaveComm::do_deletePartition(ByteStream& msg)
1457 {
1458 OID_t oid;
1459 uint32_t tmp32;
1460 int err;
1461 ByteStream reply;
1462 uint32_t size;
1463 std::set<OID_t> oids;
1464
1465 #ifdef BRM_VERBOSE
1466 cerr << "WorkerComm: do_deletePartition()" << endl;
1467 #endif
1468
1469 set<LogicalPartition> partitionNums;
1470 deserializeSet<LogicalPartition>(msg, partitionNums);
1471
1472 msg >> size;
1473
1474 if (printOnly)
1475 {
1476 cout << "deletePartition: partitionNum: ";
1477 set<LogicalPartition>::const_iterator it;
1478
1479 for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1480 cout << (*it) << " ";
1481
1482 cout << "\nsize=" << size << " oids..." << endl;
1483 }
1484
1485 for (unsigned i = 0; i < size; i++)
1486 {
1487 msg >> tmp32;
1488 oid = tmp32;
1489 oids.insert( oid );
1490
1491 if (printOnly)
1492 cout << " " << oid << endl;
1493 }
1494
1495 if (printOnly)
1496 return;
1497
1498 string emsg;
1499 err = slave->deletePartition(oids, partitionNums, emsg);
1500 reply << (uint8_t) err;
1501
1502 if (err != 0)
1503 reply << emsg;
1504
1505 #ifdef BRM_VERBOSE
1506 cerr << "WorkerComm: do_deletePartition() err code is " << err << endl;
1507 #endif
1508
1509 if (!standalone)
1510 master.write(reply);
1511
1512 doSaveDelta = true;
1513 }
1514
1515 //------------------------------------------------------------------------------
1516 // Mark all extents as out of service, for the specified OID(s) and partition
1517 // number.
1518 //------------------------------------------------------------------------------
do_markPartitionForDeletion(ByteStream & msg)1519 void SlaveComm::do_markPartitionForDeletion(ByteStream& msg)
1520 {
1521 OID_t oid;
1522 uint32_t tmp32;
1523 int err;
1524 ByteStream reply;
1525 uint32_t size;
1526 std::set<OID_t> oids;
1527
1528 #ifdef BRM_VERBOSE
1529 cerr << "WorkerComm: do_markPartitionForDeletion()" << endl;
1530 #endif
1531
1532 set<LogicalPartition> partitionNums;
1533 deserializeSet<LogicalPartition>(msg, partitionNums);
1534 msg >> size;
1535
1536 if (printOnly)
1537 {
1538 cout << "markPartitionForDeletion: partitionNum: ";
1539 set<LogicalPartition>::const_iterator it;
1540
1541 for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1542 cout << (*it) << " ";
1543
1544 cout << "\nsize=" << size << " oids..." << endl;
1545 }
1546
1547 for (unsigned i = 0; i < size; i++)
1548 {
1549 msg >> tmp32;
1550 oid = tmp32;
1551 oids.insert( oid );
1552
1553 if (printOnly)
1554 cout << " " << oid << endl;
1555 }
1556
1557 if (printOnly)
1558 return;
1559
1560 string emsg;
1561 err = slave->markPartitionForDeletion(oids, partitionNums, emsg);
1562 reply << (uint8_t) err;
1563
1564 if (err != 0)
1565 reply << emsg;
1566
1567 #ifdef BRM_VERBOSE
1568 cerr << "WorkerComm: do_markPartitionforDeletion() err code is " <<
1569 err << endl;
1570 #endif
1571
1572 if (!standalone)
1573 master.write(reply);
1574
1575 doSaveDelta = true;
1576 }
1577
1578 //------------------------------------------------------------------------------
1579 // Mark all extents as out of service, for the specified OID(s) and partition
1580 // number.
1581 //------------------------------------------------------------------------------
do_markAllPartitionForDeletion(ByteStream & msg)1582 void SlaveComm::do_markAllPartitionForDeletion(ByteStream& msg)
1583 {
1584 OID_t oid;
1585 uint32_t tmp32;
1586 int err;
1587 ByteStream reply;
1588 uint32_t size;
1589 std::set<OID_t> oids;
1590
1591 #ifdef BRM_VERBOSE
1592 cerr << "WorkerComm: do_markAllPartitionForDeletion()" << endl;
1593 #endif
1594 msg >> size;
1595
1596 if (printOnly)
1597 cout << "markAllPartitionForDeletion: size="
1598 << size << " oids..." << endl;
1599
1600 for (unsigned i = 0; i < size; i++)
1601 {
1602 msg >> tmp32;
1603 oid = tmp32;
1604 oids.insert( oid );
1605
1606 if (printOnly)
1607 cout << " " << oid << endl;
1608 }
1609
1610 if (printOnly)
1611 return;
1612
1613 err = slave->markAllPartitionForDeletion(oids);
1614 reply << (uint8_t) err;
1615 #ifdef BRM_VERBOSE
1616 cerr << "WorkerComm: do_markAllPartitionforDeletion() err code is " <<
1617 err << endl;
1618 #endif
1619
1620 if (!standalone)
1621 master.write(reply);
1622
1623 doSaveDelta = true;
1624 }
1625
1626 //------------------------------------------------------------------------------
1627 // Restore all extents for the specified OID(s) and partition number.
1628 //------------------------------------------------------------------------------
do_restorePartition(ByteStream & msg)1629 void SlaveComm::do_restorePartition(ByteStream& msg)
1630 {
1631 OID_t oid;
1632 uint32_t tmp32;
1633 int err;
1634 ByteStream reply;
1635 uint32_t size;
1636 std::set<OID_t> oids;
1637
1638 #ifdef BRM_VERBOSE
1639 cerr << "WorkerComm: do_restorePartition()" << endl;
1640 #endif
1641
1642 set<LogicalPartition> partitionNums;
1643 deserializeSet<LogicalPartition>(msg, partitionNums);
1644
1645 msg >> size;
1646
1647 if (printOnly)
1648 {
1649 cout << "restorePartition: partitionNum: ";
1650 set<LogicalPartition>::const_iterator it;
1651
1652 for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
1653 cout << (*it) << " ";
1654
1655 cout << "\nsize=" << size << " oids..." << endl;
1656 }
1657
1658 for (unsigned i = 0; i < size; i++)
1659 {
1660 msg >> tmp32;
1661 oid = tmp32;
1662 oids.insert( oid );
1663
1664 if (printOnly)
1665 cout << " " << oid << endl;
1666 }
1667
1668 if (printOnly)
1669 return;
1670
1671 string emsg;
1672 err = slave->restorePartition(oids, partitionNums, emsg);
1673 reply << (uint8_t) err;
1674
1675 if (err != 0)
1676 reply << emsg;
1677
1678 #ifdef BRM_VERBOSE
1679 cerr << "WorkerComm: do_restorePartition() err code is " << err << endl;
1680 #endif
1681
1682 if (!standalone)
1683 master.write(reply);
1684
1685 doSaveDelta = true;
1686 }
1687
1688 //------------------------------------------------------------------------------
1689 // Delete all extents for the specified dbroot
1690 //------------------------------------------------------------------------------
do_deleteDBRoot(ByteStream & msg)1691 void SlaveComm::do_deleteDBRoot(ByteStream& msg)
1692 {
1693 int err;
1694 ByteStream reply;
1695 uint32_t q;
1696 uint16_t dbroot;
1697
1698 #ifdef BRM_VERBOSE
1699 cerr << "WorkerComm: do_deleteDBroot()" << endl;
1700 #endif
1701
1702 msg >> q;
1703 dbroot = static_cast<uint16_t>(q);
1704
1705 if (printOnly)
1706 {
1707 cout << "deleteDBRoot: " << dbroot << endl;
1708 return;
1709 }
1710
1711 err = slave->deleteDBRoot(dbroot);
1712 reply << (uint8_t) err;
1713
1714 #ifdef BRM_VERBOSE
1715 cerr << "WorkerComm: do_deleteDBRoot() err code is " << err << endl;
1716 #endif
1717
1718 if (!standalone)
1719 master.write(reply);
1720
1721 doSaveDelta = true;
1722 }
1723
do_writeVBEntry(ByteStream & msg)1724 void SlaveComm::do_writeVBEntry(ByteStream& msg)
1725 {
1726 VER_t transID;
1727 LBID_t lbid;
1728 OID_t vbOID;
1729 uint32_t vbFBO, tmp;
1730 uint64_t tmp64;
1731 int err;
1732 ByteStream reply;
1733
1734 #ifdef BRM_VERBOSE
1735 cerr << "WorkerComm: do_writeVBEntry()" << endl;
1736 #endif
1737
1738 msg >> tmp;
1739 transID = tmp;
1740 msg >> tmp64;
1741 lbid = tmp64;
1742 msg >> tmp;
1743 vbOID = tmp;
1744 msg >> vbFBO;
1745
1746 if (printOnly)
1747 {
1748 cout << "writeVBEntry: transID=" << transID << " lbid=" << lbid << " vbOID=" <<
1749 vbOID << " vbFBO=" << vbFBO << endl;
1750 return;
1751 }
1752
1753 err = slave->writeVBEntry(transID, lbid, vbOID, vbFBO);
1754 reply << (uint8_t) err;
1755 #ifdef BRM_VERBOSE
1756 cerr << "WorkerComm: do_writeVBEntry() err code is " << err << endl;
1757 #endif
1758
1759 if (!standalone)
1760 master.write(reply);
1761
1762 doSaveDelta = true;
1763 }
1764
do_bulkWriteVBEntry(ByteStream & msg)1765 void SlaveComm::do_bulkWriteVBEntry(ByteStream& msg)
1766 {
1767 VER_t transID;
1768 std::vector<BRM::LBID_t> lbids;
1769 OID_t vbOID;
1770 std::vector<uint32_t> vbFBOs;
1771 uint32_t tmp;
1772 int err;
1773 ByteStream reply;
1774
1775 #ifdef BRM_VERBOSE
1776 cerr << "WorkerComm: do_bulkWriteVBEntry()" << endl;
1777 #endif
1778
1779 msg >> tmp;
1780 transID = tmp;
1781 deserializeInlineVector(msg, lbids);
1782 msg >> tmp;
1783 vbOID = tmp;
1784 deserializeInlineVector(msg, vbFBOs);
1785
1786 if (printOnly)
1787 {
1788 cout << "bulkWriteVBEntry: transID=" << transID << endl;
1789
1790 for (size_t i = 0; i < lbids.size(); i++)
1791 cout << "bulkWriteVBEntry arg " << i + 1 << ": lbid=" << lbids[i] << " vbOID=" <<
1792 vbOID << " vbFBO=" << vbFBOs[i] << endl;
1793 return;
1794 }
1795
1796 err = slave->bulkWriteVBEntry(transID, lbids, vbOID, vbFBOs);
1797 reply << (uint8_t) err;
1798 #ifdef BRM_VERBOSE
1799 cerr << "WorkerComm: do_bulkWriteVBEntry() err code is " << err << endl;
1800 #endif
1801
1802 if (!standalone)
1803 master.write(reply);
1804
1805 doSaveDelta = true;
1806 }
1807
do_beginVBCopy(ByteStream & msg)1808 void SlaveComm::do_beginVBCopy(ByteStream& msg)
1809 {
1810 VER_t transID;
1811 LBIDRange_v ranges;
1812 VBRange_v freeList;
1813 uint32_t tmp32;
1814 uint16_t dbRoot;
1815 int err;
1816 ByteStream reply;
1817
1818 #ifdef BRM_VERBOSE
1819 cerr << "WorkerComm: do_beginVBCopy()" << endl;
1820 #endif
1821
1822 msg >> tmp32;
1823 transID = tmp32;
1824 msg >> dbRoot;
1825 deserializeVector(msg, ranges);
1826
1827 if (printOnly)
1828 {
1829 cout << "beginVBCopy: transID=" << transID << " dbRoot=" << dbRoot << " size="
1830 << ranges.size() << " ranges..." << endl;
1831
1832 for (uint32_t i = 0; i < ranges.size(); i++)
1833 cout << " start=" << ranges[i].start << " size=" << ranges[i].size << endl;
1834
1835 return;
1836 }
1837
1838 err = slave->beginVBCopy(transID, dbRoot, ranges, freeList, firstSlave && !standalone);
1839 reply << (uint8_t) err;
1840
1841 if (err == ERR_OK)
1842 serializeVector(reply, freeList);
1843
1844 #ifdef BRM_VERBOSE
1845 cerr << "WorkerComm: do_beginVBCopy() err code is " << err << endl;
1846 #endif
1847
1848 if (!standalone)
1849 master.write(reply);
1850
1851 doSaveDelta = true;
1852 }
1853
do_endVBCopy(ByteStream & msg)1854 void SlaveComm::do_endVBCopy(ByteStream& msg)
1855 {
1856 VER_t transID;
1857 LBIDRange_v ranges;
1858 uint32_t tmp;
1859 int err;
1860 ByteStream reply;
1861
1862 #ifdef BRM_VERBOSE
1863 cerr << "WorkerComm: do_endVBCopy()" << endl;
1864 #endif
1865
1866 msg >> tmp;
1867 transID = tmp;
1868 deserializeVector(msg, ranges);
1869
1870 if (printOnly)
1871 {
1872 cout << "endVBCopy: transID=" << transID << " size=" << ranges.size() <<
1873 " ranges..." << endl;
1874
1875 for (uint32_t i = 0; i < ranges.size(); i++)
1876 cout << " start=" << ranges[i].start << " size=" << ranges[i].size << endl;
1877
1878 return;
1879 }
1880
1881 err = slave->endVBCopy(transID, ranges);
1882 reply << (uint8_t) err;
1883 #ifdef BRM_VERBOSE
1884 cerr << "WorkerComm: do_endVBCopy() err code is " << err << endl;
1885 #endif
1886
1887 if (!standalone)
1888 master.write(reply);
1889
1890 doSaveDelta = true;
1891 }
1892
do_vbRollback1(ByteStream & msg)1893 void SlaveComm::do_vbRollback1(ByteStream& msg)
1894 {
1895 VER_t transID;
1896 LBIDRange_v lbidList;
1897 uint32_t tmp;
1898 int err;
1899 ByteStream reply;
1900
1901 #ifdef BRM_VERBOSE
1902 cerr << "WorkerComm: do_vbRollback1()" << endl;
1903 #endif
1904
1905 msg >> tmp;
1906 transID = tmp;
1907 deserializeVector(msg, lbidList);
1908
1909 if (printOnly)
1910 {
1911 cout << "vbRollback1: transID=" << transID << " size=" << lbidList.size() <<
1912 " lbids..." << endl;
1913
1914 for (uint32_t i = 0; i < lbidList.size(); i++)
1915 cout << " start=" << lbidList[i].start << " size=" << lbidList[i].size << endl;
1916
1917 return;
1918 }
1919
1920 err = slave->vbRollback(transID, lbidList, firstSlave && !standalone);
1921 reply << (uint8_t) err;
1922 #ifdef BRM_VERBOSE
1923 cerr << "WorkerComm: do_vbRollback1() err code is " << err << endl;
1924 #endif
1925
1926 if (!standalone)
1927 master.write(reply);
1928
1929 doSaveDelta = true;
1930 }
1931
do_vbRollback2(ByteStream & msg)1932 void SlaveComm::do_vbRollback2(ByteStream& msg)
1933 {
1934 VER_t transID;
1935 vector<LBID_t> lbidList;
1936 uint32_t tmp;
1937 int err;
1938 ByteStream reply;
1939
1940 #ifdef BRM_VERBOSE
1941 cerr << "WorkerComm: do_vbRollback2()" << endl;
1942 #endif
1943
1944 msg >> tmp;
1945 transID = tmp;
1946 deserializeVector(msg, lbidList);
1947
1948 if (printOnly)
1949 {
1950 cout << "vbRollback2: transID=" << transID << " size=" << lbidList.size() <<
1951 " lbids..." << endl;
1952
1953 for (uint32_t i = 0; i < lbidList.size(); i++)
1954 cout << " " << lbidList[i] << endl;
1955
1956 return;
1957 }
1958
1959 err = slave->vbRollback(transID, lbidList, firstSlave && !standalone);
1960 reply << (uint8_t) err;
1961 #ifdef BRM_VERBOSE
1962 cerr << "WorkerComm: do_vbRollback2() err code is " << err << endl;
1963 #endif
1964
1965 if (!standalone)
1966 master.write(reply);
1967
1968 doSaveDelta = true;
1969 }
1970
do_vbCommit(ByteStream & msg)1971 void SlaveComm::do_vbCommit(ByteStream& msg)
1972 {
1973 VER_t transID;
1974 uint32_t tmp;
1975 int err;
1976 ByteStream reply;
1977
1978 #ifdef BRM_VERBOSE
1979 cerr << "WorkerComm: do_vbCommit()" << endl;
1980 #endif
1981
1982 msg >> tmp;
1983 transID = tmp;
1984
1985 if (printOnly)
1986 {
1987 cout << "vbCommit: transID=" << transID << endl;
1988 return;
1989 }
1990
1991 err = slave->vbCommit(transID);
1992 reply << (uint8_t) err;
1993 #ifdef BRM_VERBOSE
1994 cerr << "WorkerComm: do_vbCommit() err code is " << err << endl;
1995 #endif
1996
1997 if (!standalone)
1998 master.write(reply);
1999
2000 doSaveDelta = true;
2001 }
2002
do_undo()2003 void SlaveComm::do_undo()
2004 {
2005 #ifdef BRM_VERBOSE
2006 cerr << "WorkerComm: do_undo()" << endl;
2007 #endif
2008
2009 if (printOnly)
2010 {
2011 cout << "undoChanges" << endl;
2012 return;
2013 }
2014
2015 slave->undoChanges();
2016 takeSnapshot = false;
2017 doSaveDelta = false;
2018 }
2019
do_confirm()2020 void SlaveComm::do_confirm()
2021 {
2022 #ifdef BRM_VERBOSE
2023 cerr << "WorkerComm: do_confirm()" << endl;
2024 #endif
2025
2026 if (printOnly)
2027 {
2028 cout << "confirmChanges" << endl;
2029 return;
2030 }
2031
2032 if (firstSlave && doSaveDelta && (journalCount < snapshotInterval || snapshotInterval < 0))
2033 {
2034 doSaveDelta = false;
2035 saveDelta();
2036 }
2037
2038 slave->confirmChanges();
2039
2040 string tmp = savefile + "_current";
2041
2042 if (firstSlave && (takeSnapshot ||
2043 (journalCount >= snapshotInterval && snapshotInterval >= 0)))
2044 {
2045 if (!currentSaveFile)
2046 {
2047 currentSaveFile = IDBDataFile::open(
2048 IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0);
2049 }
2050
2051 if (currentSaveFile == NULL)
2052 {
2053 ostringstream os;
2054 os << "WorkerComm: failed to open the current savefile. errno: "
2055 << strerror(errno);
2056 log(os.str());
2057 throw runtime_error(os.str());
2058 }
2059
2060
2061 tmp = savefile + (saveFileToggle ? 'A' : 'B');
2062 slave->saveState(tmp);
2063 #ifndef _MSC_VER
2064 tmp += '\n';
2065 #endif
2066 int err = 0;
2067
2068 // MCOL-1558. Make the _current file relative to DBRMRoot.
2069 string relative = tmp.substr(tmp.find_last_of('/') + 1);
2070 err = currentSaveFile->write(relative.c_str(), relative.length());
2071
2072 if (err < (int) relative.length())
2073 {
2074 ostringstream os;
2075 os << "WorkerComm: currentfile write() returned " << err
2076 << " file pointer is " << currentSaveFile;
2077
2078 if (err < 0)
2079 os << " errno: " << strerror(errno);
2080
2081 log(os.str());
2082 }
2083
2084 currentSaveFile->flush();
2085 delete currentSaveFile;
2086 currentSaveFile = NULL;
2087 saveFileToggle = !saveFileToggle;
2088
2089 delete journalh;
2090 journalh = IDBDataFile::open(
2091 IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), journalName.c_str(), "w+b", 0);
2092
2093 if (!journalh)
2094 throw runtime_error("Could not open the BRM journal for writing!");
2095
2096 takeSnapshot = false;
2097 doSaveDelta = false;
2098 journalCount = 0;
2099 }
2100 }
2101
do_flushInodeCache()2102 void SlaveComm::do_flushInodeCache()
2103 {
2104 ByteStream reply;
2105
2106 #ifdef BRM_VERBOSE
2107 cerr << "WorkerComm: do_flushInodeCache()" << endl;
2108 #endif
2109
2110 if (printOnly)
2111 {
2112 cout << "flushInodeCache" << endl;
2113 return;
2114 }
2115
2116 #ifdef __linux__
2117 #ifdef USE_VERY_COMPLEX_DROP_CACHES
2118 double elapsedTime = 0.0;
2119 char msgChString[100];
2120 struct timespec tm1, tm2;
2121 clock_gettime(CLOCK_REALTIME, &tm1);
2122
2123 int fd;
2124 fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
2125
2126 if (fd >= 0)
2127 {
2128 ssize_t writeCnt = write(fd, "3\n", 2);
2129 clock_gettime(CLOCK_REALTIME, &tm2);
2130 timespec_sub(tm1, tm2, elapsedTime);
2131
2132 if (writeCnt == 2)
2133 {
2134 snprintf(msgChString, sizeof(msgChString),
2135 "WorkerNode updating drop_caches to flush cache; "
2136 "elapsedTime-%f sec", elapsedTime);
2137 log(string(msgChString),
2138 logging::LOG_TYPE_DEBUG);
2139 }
2140 else
2141 {
2142 snprintf(msgChString, sizeof(msgChString),
2143 "WorkerNode unable to update drop_caches and flush cache; "
2144 "elapsedTime-%f sec", elapsedTime);
2145 log_errno(string(msgChString),
2146 logging::LOG_TYPE_WARNING);
2147 }
2148
2149 close(fd);
2150 }
2151 else
2152 {
2153 clock_gettime(CLOCK_REALTIME, &tm2);
2154 timespec_sub(tm1, tm2, elapsedTime);
2155 snprintf(msgChString, sizeof(msgChString),
2156 "WorkerNode unable to open drop_caches and flush cache; "
2157 "elapsedTime-%f sec", elapsedTime);
2158 log_errno(string(msgChString),
2159 logging::LOG_TYPE_WARNING);
2160 }
2161
2162 #else
2163 int fd = -1;
2164
2165 if ((fd = open("/proc/sys/vm/drop_caches", O_WRONLY)) >= 0)
2166 {
2167 ssize_t written = write(fd, "3\n", 2);
2168 int rc = close(fd);
2169 if ( !written || rc )
2170 {
2171 std::cerr << "Could not write into or close /proc/sys/vm/drop_caches" << std::endl;
2172 }
2173 }
2174
2175 #endif
2176 #endif
2177 reply << (uint8_t) ERR_OK;
2178
2179 if (!standalone)
2180 master.write(reply);
2181 }
2182
do_clear()2183 void SlaveComm::do_clear()
2184 {
2185 int err;
2186 ByteStream reply;
2187
2188 #ifdef BRM_VERBOSE
2189 cerr << "WorkerComm: do_clear()" << endl;
2190 #endif
2191
2192 if (printOnly)
2193 {
2194 cout << "clear" << endl;
2195 return;
2196 }
2197
2198 err = slave->clear();
2199
2200 if (err)
2201 throw runtime_error("Clear failed.");
2202
2203 /* This doesn't get confirmed, so we have to save its delta here */
2204 if (firstSlave)
2205 saveDelta();
2206
2207 reply << (uint8_t) (err == 0 ? ERR_OK : ERR_FAILURE);
2208
2209 if (!standalone)
2210 master.write(reply);
2211 }
2212
replayJournal(string prefix)2213 int SlaveComm::replayJournal(string prefix)
2214 {
2215 ByteStream cmd;
2216 uint32_t len;
2217 int ret = 0;
2218
2219 // @Bug 2667+
2220 // Fix for issue where load_brm was using the journal file from DBRMRoot instead of the one from the command line
2221 // argument.
2222
2223 // If A or B files are being loaded, strip off the A or B for the journal file name as there is only one journal file.
2224 // For example, if prefix is "/usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_savesA" the journal file name will be
2225 // "/usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves_journal".
2226
2227 string tmp = prefix.substr(prefix.length() - 1);
2228 string fName;
2229
2230 if ((tmp.compare("A") == 0) || (tmp.compare("B") == 0))
2231 {
2232 fName = prefix.substr(0, prefix.length() - 1) + "_journal";
2233 }
2234
2235 #ifdef _MSC_VER
2236 else if (tmp == "a" || tmp == "b")
2237 fName = prefix.substr(0, prefix.length() - 1) + "_journal";
2238
2239 #endif
2240 else
2241 {
2242 fName = prefix + "_journal";
2243 }
2244
2245 const char* filename = fName.c_str();
2246
2247 IDBDataFile* journalf = IDBDataFile::open(
2248 IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
2249
2250 if (!journalf)
2251 {
2252 cout << "Error opening journal file " << fName << endl;
2253 return -1;
2254 }
2255
2256 if (journalf->size() == 0) // empty file, nothing to replay
2257 return 0;
2258
2259 ssize_t readIn = 0;
2260
2261 do
2262 {
2263 readIn = journalf->read((char*) &len, sizeof(len));
2264
2265 if (readIn > 0)
2266 {
2267
2268 cmd.needAtLeast(len);
2269 readIn = journalf->read((char*) cmd.getInputPtr(), len);
2270 cmd.advanceInputPtr(len);
2271
2272 try
2273 {
2274 processCommand(cmd);
2275 }
2276 catch (exception& e)
2277 {
2278 cout << e.what() << " Journal replay was incomplete." << endl;
2279 slave->undoChanges();
2280 return -1;
2281 }
2282
2283 slave->confirmChanges();
2284 cmd.restart();
2285 ret++;
2286 }
2287 } while (readIn > 0);
2288
2289 return ret;
2290 }
2291
do_takeSnapshot()2292 void SlaveComm::do_takeSnapshot()
2293 {
2294 ByteStream reply;
2295
2296 if (printOnly)
2297 {
2298 cout << "takeSnapshot" << endl;
2299 return;
2300 }
2301
2302 takeSnapshot = true;
2303 do_confirm();
2304 reply << (uint8_t) 0;
2305
2306 if (!standalone)
2307 master.write(reply);
2308 }
2309
saveDelta()2310 void SlaveComm::saveDelta()
2311 {
2312 try
2313 {
2314 uint32_t len = delta.length();
2315
2316 journalh->seek(0, SEEK_END);
2317 journalh->write((const char*) &len, sizeof(len));
2318 journalh->write((const char*) delta.buf(), delta.length());
2319 journalh->flush();
2320 journalCount++;
2321 }
2322 catch (exception& e)
2323 {
2324 cerr << "Journal write error: " << e.what() << endl;
2325 throw;
2326 }
2327 }
2328
printJournal(string prefix)2329 int SlaveComm::printJournal(string prefix)
2330 {
2331 int ret;
2332 printOnly = true;
2333 ret = replayJournal(prefix);
2334 printOnly = false;
2335 return ret;
2336 }
2337
do_ownerCheck(ByteStream & msg)2338 void SlaveComm::do_ownerCheck(ByteStream& msg)
2339 {
2340 string processName;
2341 uint32_t pid;
2342 ByteStream::byte rb = 0;
2343
2344 msg >> processName >> pid;
2345 idbassert(msg.length() == 0);
2346
2347 if (standalone)
2348 return;
2349
2350 if (processExists(pid, processName))
2351 rb++;
2352
2353 ByteStream reply;
2354 reply << rb;
2355 master.write(reply);
2356 }
2357
2358 //FIXME: needs to be refactored along with SessionManagerServer::lookupProcessStatus()
2359 #if defined(__linux__)
processExists(const uint32_t pid,const string & pname)2360 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2361 {
2362 string stat;
2363 ostringstream procFileName;
2364 ostringstream statProcessField;
2365 ifstream in;
2366 string::size_type pos;
2367 ByteStream reply;
2368 char buf[2048];
2369
2370 procFileName << "/proc/" << pid << "/stat";
2371 in.open(procFileName.str().c_str());
2372
2373 if (!in)
2374 {
2375 return false;
2376 }
2377
2378 statProcessField << "(" << pname << ")";
2379
2380 in.getline(buf, 1024);
2381 stat = buf;
2382 pos = stat.find(statProcessField.str());
2383
2384 if (pos == string::npos)
2385 {
2386 return false;
2387 }
2388
2389 return true;
2390 }
2391
2392
2393 #elif defined(_MSC_VER)
2394 //FIXME
processExists(const uint32_t pid,const string & pname)2395 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2396 {
2397 boost::mutex::scoped_lock lk(fPidMemLock);
2398
2399 if (!fPids)
2400 fPids = (DWORD*)malloc(fMaxPids * sizeof(DWORD));
2401
2402 DWORD needed = 0;
2403
2404 if (EnumProcesses(fPids, fMaxPids * sizeof(DWORD), &needed) == 0)
2405 return false;
2406
2407 while (needed == fMaxPids * sizeof(DWORD))
2408 {
2409 fMaxPids *= 2;
2410 fPids = (DWORD*)realloc(fPids, fMaxPids * sizeof(DWORD));
2411
2412 if (EnumProcesses(fPids, fMaxPids * sizeof(DWORD), &needed) == 0)
2413 return false;
2414 }
2415
2416 DWORD numPids = needed / sizeof(DWORD);
2417
2418 for (DWORD i = 0; i < numPids; i++)
2419 {
2420 if (fPids[i] == pid)
2421 {
2422 TCHAR szProcessName[MAX_PATH] = TEXT("<unknown>");
2423
2424 // Get a handle to the process.
2425 HANDLE hProcess = OpenProcess(PROCESS_QUERY_INFORMATION |
2426 PROCESS_VM_READ,
2427 FALSE, fPids[i]);
2428
2429 // Get the process name.
2430 if (hProcess != NULL)
2431 {
2432 HMODULE hMod;
2433 DWORD cbNeeded;
2434
2435 if (EnumProcessModules(hProcess, &hMod, sizeof(hMod), &cbNeeded))
2436 GetModuleBaseName(hProcess, hMod, szProcessName,
2437 sizeof(szProcessName) / sizeof(TCHAR));
2438
2439 CloseHandle(hProcess);
2440
2441 if (pname == szProcessName)
2442 return true;
2443 }
2444 }
2445 }
2446
2447 return false;
2448 }
2449 #elif defined(__FreeBSD__)
2450 //FIXME
processExists(const uint32_t pid,const string & pname)2451 bool SlaveComm::processExists(const uint32_t pid, const string& pname)
2452 {
2453 return false;
2454 }
2455 #else
2456 #error Need to port processExists()
2457 #endif
2458
do_dmlLockLBIDRanges(ByteStream & msg)2459 void SlaveComm::do_dmlLockLBIDRanges(ByteStream& msg)
2460 {
2461 ByteStream reply;
2462 vector<LBIDRange> ranges;
2463 int txnID;
2464 uint32_t tmp32;
2465 int err;
2466
2467 deserializeVector<LBIDRange>(msg, ranges);
2468 msg >> tmp32;
2469 assert(msg.length() == 0);
2470 txnID = (int) tmp32;
2471
2472 if (printOnly)
2473 {
2474 cout << "dmlLockLBIDRanges: transID=" << txnID << " size="
2475 << ranges.size() << " ranges..." << endl;
2476
2477 for (uint32_t i = 0; i < ranges.size(); i++)
2478 cout << " start=" << ranges[i].start << " size=" << ranges[i].size << endl;
2479
2480 return;
2481 }
2482
2483 err = slave->dmlLockLBIDRanges(ranges, txnID);
2484
2485 reply << (uint8_t) err;
2486 #ifdef BRM_VERBOSE
2487 cerr << "WorkerComm: do_dmlLockLBIDRanges() err code is " << err << endl;
2488 #endif
2489
2490 if (!standalone)
2491 master.write(reply);
2492
2493 doSaveDelta = true;
2494 }
2495
do_dmlReleaseLBIDRanges(ByteStream & msg)2496 void SlaveComm::do_dmlReleaseLBIDRanges(ByteStream& msg)
2497 {
2498 ByteStream reply;
2499 vector<LBIDRange> ranges;
2500 int err;
2501
2502 deserializeVector<LBIDRange>(msg, ranges);
2503
2504 if (printOnly)
2505 {
2506 cout << "dmlLockLBIDRanges: size=" << ranges.size() << " ranges..." << endl;
2507
2508 for (uint32_t i = 0; i < ranges.size(); i++)
2509 cout << " start=" << ranges[i].start << " size=" << ranges[i].size << endl;
2510
2511 return;
2512 }
2513
2514 err = slave->dmlReleaseLBIDRanges(ranges);
2515
2516 reply << (uint8_t) err;
2517 #ifdef BRM_VERBOSE
2518 cerr << "WorkerComm: do_dmlReleaseLBIDRanges() err code is " << err << endl;
2519 #endif
2520
2521 if (!standalone)
2522 master.write(reply);
2523
2524 doSaveDelta = true;
2525 }
2526
2527 }
2528
2529 // vim:ts=4 sw=4:
2530