1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18
19 #include "Ndbfs.hpp"
20 #include "AsyncFile.hpp"
21 #include "Filename.hpp"
22
23 #include <signaldata/FsOpenReq.hpp>
24 #include <signaldata/FsCloseReq.hpp>
25 #include <signaldata/FsReadWriteReq.hpp>
26 #include <signaldata/FsAppendReq.hpp>
27 #include <signaldata/FsRemoveReq.hpp>
28 #include <signaldata/FsConf.hpp>
29 #include <signaldata/FsRef.hpp>
30 #include <signaldata/NdbfsContinueB.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32
33 #include <RefConvert.hpp>
34 #include <NdbSleep.h>
35 #include <NdbOut.hpp>
36 #include <Configuration.hpp>
37
38 #define DEBUG(x) { ndbout << "FS::" << x << endl; }
39
40 inline
pageSize(const NewVARIABLE * baseAddrRef)41 int pageSize( const NewVARIABLE* baseAddrRef )
42 {
43 int log_psize;
44 int log_qsize = baseAddrRef->bits.q;
45 int log_vsize = baseAddrRef->bits.v;
46 if (log_vsize < 3)
47 log_vsize = 3;
48 log_psize = log_qsize + log_vsize - 3;
49 return (1 << log_psize);
50 }
51
52
Ndbfs(Block_context & ctx)53 Ndbfs::Ndbfs(Block_context& ctx) :
54 SimulatedBlock(NDBFS, ctx),
55 scanningInProgress(false),
56 theLastId(0),
57 theRequestPool(0),
58 m_maxOpenedFiles(0)
59 {
60 BLOCK_CONSTRUCTOR(Ndbfs);
61
62 // Set received signals
63 addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
64 addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
65 addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
66 addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
67 addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
68 addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
69 addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
70 addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
71 addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
72 addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
73 addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
74 // Set send signals
75 }
76
~Ndbfs()77 Ndbfs::~Ndbfs()
78 {
79 // Delete all files
80 // AsyncFile destuctor will take care of deleting
81 // the thread it has created
82 for (unsigned i = 0; i < theFiles.size(); i++){
83 AsyncFile* file = theFiles[i];
84 delete file;
85 theFiles[i] = NULL;
86 }//for
87 theFiles.clear();
88 if (theRequestPool)
89 delete theRequestPool;
90 }
91
92 void
execREAD_CONFIG_REQ(Signal * signal)93 Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
94 {
95 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
96
97 Uint32 ref = req->senderRef;
98 Uint32 senderData = req->senderData;
99
100 const ndb_mgm_configuration_iterator * p =
101 m_ctx.m_config.getOwnConfigIterator();
102 ndbrequire(p != 0);
103 theFileSystemPath.assfmt("%sndb_%u_fs%s", m_ctx.m_config.fileSystemPath(),
104 getOwnNodeId(), DIR_SEPARATOR);
105 theBackupFilePath.assign(m_ctx.m_config.backupFilePath());
106
107 theRequestPool = new Pool<Request>;
108
109 m_maxFiles = 0;
110 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
111 Uint32 noIdleFiles = 27;
112 ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
113 if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
114 m_maxFiles = noIdleFiles;
115 // Create idle AsyncFiles
116 for (Uint32 i = 0; i < noIdleFiles; i++){
117 theIdleFiles.push_back(createAsyncFile());
118 }
119
120 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
121 conf->senderRef = reference();
122 conf->senderData = senderData;
123 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
124 ReadConfigConf::SignalLength, JBB);
125 }
126
127 /* Received a restart signal.
128 * Answer it like any other block
129 * PR0 : StartCase
130 * DR0 : StartPhase
131 * DR1 : ?
132 * DR2 : ?
133 * DR3 : ?
134 * DR4 : ?
135 * DR5 : SignalKey
136 */
137 void
execSTTOR(Signal * signal)138 Ndbfs::execSTTOR(Signal* signal)
139 {
140 jamEntry();
141
142 if(signal->theData[1] == 0){ // StartPhase 0
143 jam();
144
145 {
146 #ifdef NDB_WIN32
147 CreateDirectory(theFileSystemPath.c_str(), 0);
148 #else
149 mkdir(theFileSystemPath.c_str(),
150 S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
151 #endif
152 }
153
154 cownref = NDBFS_REF;
155 // close all open files
156 ndbrequire(theOpenFiles.size() == 0);
157
158 scanningInProgress = false;
159
160 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
161 sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1);
162
163 signal->theData[3] = 255;
164 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
165 return;
166 }
167 ndbrequire(0);
168 }
169
170 int
forward(AsyncFile * file,Request * request)171 Ndbfs::forward( AsyncFile * file, Request* request)
172 {
173 jam();
174 file->execute(request);
175 return 1;
176 }
177
178 void
execFSOPENREQ(Signal * signal)179 Ndbfs::execFSOPENREQ(Signal* signal)
180 {
181 jamEntry();
182 const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
183 const BlockReference userRef = fsOpenReq->userReference;
184 AsyncFile* file = getIdleFile();
185 ndbrequire(file != NULL);
186 Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
187
188 Uint32 userPointer = fsOpenReq->userPointer;
189
190 if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
191 {
192 Ptr<GlobalPage> page_ptr;
193 if(m_global_page_pool.seize(page_ptr) == false)
194 {
195 FsRef * const fsRef = (FsRef *)&signal->theData[0];
196 fsRef->userPointer = userPointer;
197 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
198 fsRef->osErrorCode = ~0; // Indicate local error
199 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
200 return;
201 }
202 file->m_page_ptr = page_ptr;
203 }
204 else
205 {
206 ndbassert(file->m_page_ptr.isNull());
207 file->m_page_ptr.setNull();
208 }
209
210 if(signal->getNoOfSections() == 0){
211 jam();
212 file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
213 } else {
214 jam();
215 SegmentedSectionPtr ptr;
216 signal->getSection(ptr, FsOpenReq::FILENAME);
217 file->theFileName.set(spec, ptr, g_sectionSegmentPool);
218 releaseSections(signal);
219 }
220 file->reportTo(&theFromThreads);
221 if (getenv("NDB_TRACE_OPEN"))
222 ndbout_c("open(%s)", file->theFileName.c_str());
223
224 Request* request = theRequestPool->get();
225 request->action = Request::open;
226 request->error = 0;
227 request->set(userRef, userPointer, newId() );
228 request->file = file;
229 request->theTrace = signal->getTrace();
230 request->par.open.flags = fsOpenReq->fileFlags;
231 request->par.open.page_size = fsOpenReq->page_size;
232 request->par.open.file_size = fsOpenReq->file_size_hi;
233 request->par.open.file_size <<= 32;
234 request->par.open.file_size |= fsOpenReq->file_size_lo;
235 request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
236
237 ndbrequire(forward(file, request));
238 }
239
240 void
execFSREMOVEREQ(Signal * signal)241 Ndbfs::execFSREMOVEREQ(Signal* signal)
242 {
243 jamEntry();
244 const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
245 const BlockReference userRef = req->userReference;
246 AsyncFile* file = getIdleFile();
247 ndbrequire(file != NULL);
248
249 Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
250 file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
251 file->reportTo(&theFromThreads);
252
253 Request* request = theRequestPool->get();
254 request->action = Request::rmrf;
255 request->par.rmrf.directory = req->directory;
256 request->par.rmrf.own_directory = req->ownDirectory;
257 request->error = 0;
258 request->set(userRef, req->userPointer, newId() );
259 request->file = file;
260 request->theTrace = signal->getTrace();
261
262 ndbrequire(forward(file, request));
263 }
264
265 /*
266 * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
267 * remove file
268 */
269 void
execFSCLOSEREQ(Signal * signal)270 Ndbfs::execFSCLOSEREQ(Signal * signal)
271 {
272 jamEntry();
273 const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
274 const BlockReference userRef = fsCloseReq->userReference;
275 const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
276 const UintR userPointer = fsCloseReq->userPointer;
277
278 AsyncFile* openFile = theOpenFiles.find(filePointer);
279 if (openFile == NULL) {
280 // The file was not open, send error back to sender
281 jam();
282 // Initialise FsRef signal
283 FsRef * const fsRef = (FsRef *)&signal->theData[0];
284 fsRef->userPointer = userPointer;
285 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
286 fsRef->osErrorCode = ~0; // Indicate local error
287 sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
288 return;
289 }
290
291 Request *request = theRequestPool->get();
292 if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
293 jam();
294 request->action = Request::closeRemove;
295 } else {
296 jam();
297 request->action = Request::close;
298 }
299 request->set(userRef, fsCloseReq->userPointer, filePointer);
300 request->file = openFile;
301 request->error = 0;
302 request->theTrace = signal->getTrace();
303
304 ndbrequire(forward(openFile, request));
305 }
306
307 void
readWriteRequest(int action,Signal * signal)308 Ndbfs::readWriteRequest(int action, Signal * signal)
309 {
310 const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0];
311 Uint16 filePointer = (Uint16)fsRWReq->filePointer;
312 const UintR userPointer = fsRWReq->userPointer;
313 const BlockReference userRef = fsRWReq->userReference;
314 const BlockNumber blockNumber = refToBlock(userRef);
315
316 AsyncFile* openFile = theOpenFiles.find(filePointer);
317
318 const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
319 UintPtr tPageSize;
320 UintPtr tClusterSize;
321 UintPtr tNRR;
322 UintPtr tPageOffset;
323 char* tWA;
324 FsRef::NdbfsErrorCodeType errorCode;
325
326 Request *request = theRequestPool->get();
327 request->error = 0;
328 request->set(userRef, userPointer, filePointer);
329 request->file = openFile;
330 request->action = (Request::Action) action;
331 request->theTrace = signal->getTrace();
332
333 Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
334
335 if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
336 jam();
337 errorCode = FsRef::fsErrInvalidParameters;
338 goto error;
339 }
340
341 if(format != FsReadWriteReq::fsFormatGlobalPage &&
342 format != FsReadWriteReq::fsFormatSharedPage)
343 {
344 if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
345 jam();// Ensure that a valid variable is used
346 errorCode = FsRef::fsErrInvalidParameters;
347 goto error;
348 }
349 if (myBaseAddrRef == NULL) {
350 jam(); // Ensure that a valid variable is used
351 errorCode = FsRef::fsErrInvalidParameters;
352 goto error;
353 }
354 if (openFile == NULL) {
355 jam(); //file not open
356 errorCode = FsRef::fsErrFileDoesNotExist;
357 goto error;
358 }
359 tPageSize = pageSize(myBaseAddrRef);
360 tClusterSize = myBaseAddrRef->ClusterSize;
361 tNRR = myBaseAddrRef->nrr;
362 tWA = (char*)myBaseAddrRef->WA;
363
364 switch (format) {
365
366 // List of memory and file pages pairs
367 case FsReadWriteReq::fsFormatListOfPairs: {
368 jam();
369 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
370 jam();
371 const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
372 const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
373 if (varIndex >= tNRR) {
374 jam();
375 errorCode = FsRef::fsErrInvalidParameters;
376 goto error;
377 }//if
378 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
379 request->par.readWrite.pages[i].size = tPageSize;
380 request->par.readWrite.pages[i].offset = fileOffset * tPageSize;
381 }//for
382 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
383 break;
384 }//case
385
386 // Range of memory page with one file page
387 case FsReadWriteReq::fsFormatArrayOfPages: {
388 if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
389 jam();
390 errorCode = FsRef::fsErrInvalidParameters;
391 goto error;
392 }//if
393 const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
394 const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
395
396 request->par.readWrite.pages[0].offset = fileOffset * tPageSize;
397 request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
398 request->par.readWrite.numberOfPages = 1;
399 request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
400 break;
401 }//case
402
403 // List of memory pages followed by one file page
404 case FsReadWriteReq::fsFormatListOfMemPages: {
405
406 tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
407 tPageOffset *= tPageSize;
408
409 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
410 jam();
411 UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
412
413 if (varIndex >= tNRR) {
414 jam();
415 errorCode = FsRef::fsErrInvalidParameters;
416 goto error;
417 }//if
418 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
419 request->par.readWrite.pages[i].size = tPageSize;
420 request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize);
421 }//for
422 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
423 break;
424 // make it a writev or readv
425 }//case
426
427 default: {
428 jam();
429 errorCode = FsRef::fsErrInvalidParameters;
430 goto error;
431 }//default
432 }//switch
433 }
434 else if (format == FsReadWriteReq::fsFormatGlobalPage)
435 {
436 Ptr<GlobalPage> ptr;
437 m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
438 request->par.readWrite.pages[0].buf = (char*)ptr.p;
439 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
440 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
441 request->par.readWrite.numberOfPages = 1;
442 }
443 else
444 {
445 ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
446 Ptr<GlobalPage> ptr;
447 m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
448 request->par.readWrite.pages[0].buf = (char*)ptr.p;
449 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
450 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
451 request->par.readWrite.numberOfPages = 1;
452 }
453
454 ndbrequire(forward(openFile, request));
455 return;
456
457 error:
458 theRequestPool->put(request);
459 FsRef * const fsRef = (FsRef *)&signal->theData[0];
460 fsRef->userPointer = userPointer;
461 fsRef->setErrorCode(fsRef->errorCode, errorCode);
462 fsRef->osErrorCode = ~0; // Indicate local error
463 switch (action) {
464 case Request:: write:
465 case Request:: writeSync: {
466 jam();
467 sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
468 break;
469 }//case
470 case Request:: readPartial:
471 case Request:: read: {
472 jam();
473 sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
474 }//case
475 }//switch
476 return;
477 }
478
479 /*
480 PR0: File Pointer , theData[0]
481 DR0: User reference, theData[1]
482 DR1: User Pointer, etc.
483 DR2: Flag
484 DR3: Var number
485 DR4: amount of pages
486 DR5->: Memory Page id and File page id according to Flag
487 */
488 void
execFSWRITEREQ(Signal * signal)489 Ndbfs::execFSWRITEREQ(Signal* signal)
490 {
491 jamEntry();
492 const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
493
494 if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
495 jam();
496 readWriteRequest( Request::writeSync, signal );
497 } else {
498 jam();
499 readWriteRequest( Request::write, signal );
500 }
501 }
502
503 /*
504 PR0: File Pointer
505 DR0: User reference
506 DR1: User Pointer
507 DR2: Flag
508 DR3: Var number
509 DR4: amount of pages
510 DR5->: Memory Page id and File page id according to Flag
511 */
512 void
execFSREADREQ(Signal * signal)513 Ndbfs::execFSREADREQ(Signal* signal)
514 {
515 jamEntry();
516 FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
517 if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
518 readWriteRequest( Request::readPartial, signal );
519 else
520 readWriteRequest( Request::read, signal );
521 }
522
523 /*
524 * PR0: File Pointer DR0: User reference DR1: User Pointer
525 */
526 void
execFSSYNCREQ(Signal * signal)527 Ndbfs::execFSSYNCREQ(Signal * signal)
528 {
529 jamEntry();
530 Uint16 filePointer = (Uint16)signal->theData[0];
531 BlockReference userRef = signal->theData[1];
532 const UintR userPointer = signal->theData[2];
533 AsyncFile* openFile = theOpenFiles.find(filePointer);
534
535 if (openFile == NULL) {
536 jam(); //file not open
537 FsRef * const fsRef = (FsRef *)&signal->theData[0];
538 fsRef->userPointer = userPointer;
539 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
540 fsRef->osErrorCode = ~0; // Indicate local error
541 sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
542 return;
543 }
544
545 Request *request = theRequestPool->get();
546 request->error = 0;
547 request->action = Request::sync;
548 request->set(userRef, userPointer, filePointer);
549 request->file = openFile;
550 request->theTrace = signal->getTrace();
551
552 ndbrequire(forward(openFile,request));
553 }
554
555 void
execFSAPPENDREQ(Signal * signal)556 Ndbfs::execFSAPPENDREQ(Signal * signal)
557 {
558 const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
559 const Uint16 filePointer = (Uint16)fsReq->filePointer;
560 const UintR userPointer = fsReq->userPointer;
561 const BlockReference userRef = fsReq->userReference;
562 const BlockNumber blockNumber = refToBlock(userRef);
563
564 FsRef::NdbfsErrorCodeType errorCode;
565
566 AsyncFile* openFile = theOpenFiles.find(filePointer);
567 const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
568
569 const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
570 const Uint32 tSz = myBaseAddrRef->nrr;
571 const Uint32 offset = fsReq->offset;
572 const Uint32 size = fsReq->size;
573 const Uint32 synch_flag = fsReq->synch_flag;
574 Request *request = theRequestPool->get();
575
576 if (openFile == NULL) {
577 jam();
578 errorCode = FsRef::fsErrFileDoesNotExist;
579 goto error;
580 }
581
582 if (myBaseAddrRef == NULL) {
583 jam(); // Ensure that a valid variable is used
584 errorCode = FsRef::fsErrInvalidParameters;
585 goto error;
586 }
587
588 if (fsReq->varIndex >= getBatSize(blockNumber)) {
589 jam();// Ensure that a valid variable is used
590 errorCode = FsRef::fsErrInvalidParameters;
591 goto error;
592 }
593
594 if(offset + size > tSz){
595 jam(); // Ensure that a valid variable is used
596 errorCode = FsRef::fsErrInvalidParameters;
597 goto error;
598 }
599
600 request->error = 0;
601 request->set(userRef, userPointer, filePointer);
602 request->file = openFile;
603 request->theTrace = signal->getTrace();
604
605 request->par.append.buf = (const char *)(tWA + offset);
606 request->par.append.size = size << 2;
607
608 if (!synch_flag)
609 request->action = Request::append;
610 else
611 request->action = Request::append_synch;
612 ndbrequire(forward(openFile, request));
613 return;
614
615 error:
616 jam();
617 theRequestPool->put(request);
618 FsRef * const fsRef = (FsRef *)&signal->theData[0];
619 fsRef->userPointer = userPointer;
620 fsRef->setErrorCode(fsRef->errorCode, errorCode);
621 fsRef->osErrorCode = ~0; // Indicate local error
622
623 jam();
624 sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
625 return;
626 }
627
628 Uint16
newId()629 Ndbfs::newId()
630 {
631 // finds a new key, eg a new filepointer
632 for (int i = 1; i < SHRT_MAX; i++)
633 {
634 if (theLastId == SHRT_MAX) {
635 jam();
636 theLastId = 1;
637 } else {
638 jam();
639 theLastId++;
640 }
641
642 if(theOpenFiles.find(theLastId) == NULL) {
643 jam();
644 return theLastId;
645 }
646 }
647 ndbrequire(1 == 0);
648 // The program will not reach this point
649 return 0;
650 }
651
652 AsyncFile*
createAsyncFile()653 Ndbfs::createAsyncFile(){
654
655 // Check limit of open files
656 if (m_maxFiles !=0 && theFiles.size() == m_maxFiles) {
657 // Print info about all open files
658 for (unsigned i = 0; i < theFiles.size(); i++){
659 AsyncFile* file = theFiles[i];
660 ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
661 }
662 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
663 }
664
665 AsyncFile* file = new AsyncFile(* this);
666 file->doStart();
667
668 // Put the file in list of all files
669 theFiles.push_back(file);
670
671 #ifdef VM_TRACE
672 infoEvent("NDBFS: Created new file thread %d", theFiles.size());
673 #endif
674
675 return file;
676 }
677
678 AsyncFile*
getIdleFile()679 Ndbfs::getIdleFile(){
680 AsyncFile* file;
681 if (theIdleFiles.size() > 0){
682 file = theIdleFiles[0];
683 theIdleFiles.erase(0);
684 } else {
685 file = createAsyncFile();
686 }
687 return file;
688 }
689
690
691
692 void
report(Request * request,Signal * signal)693 Ndbfs::report(Request * request, Signal* signal)
694 {
695 const Uint32 orgTrace = signal->getTrace();
696 signal->setTrace(request->theTrace);
697 const BlockReference ref = request->theUserReference;
698
699 if(!request->file->m_page_ptr.isNull())
700 {
701 m_global_page_pool.release(request->file->m_page_ptr);
702 request->file->m_page_ptr.setNull();
703 }
704
705 if (request->error) {
706 jam();
707 // Initialise FsRef signal
708 FsRef * const fsRef = (FsRef *)&signal->theData[0];
709 fsRef->userPointer = request->theUserPointer;
710 if(request->error & FsRef::FS_ERR_BIT)
711 {
712 fsRef->errorCode = request->error;
713 fsRef->osErrorCode = 0;
714 }
715 else
716 {
717 fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
718 fsRef->osErrorCode = request->error;
719 }
720 switch (request->action) {
721 case Request:: open: {
722 jam();
723 // Put the file back in idle files list
724 theIdleFiles.push_back(request->file);
725 sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
726 break;
727 }
728 case Request:: closeRemove:
729 case Request:: close: {
730 jam();
731 sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
732 break;
733 }
734 case Request:: writeSync:
735 case Request:: writevSync:
736 case Request:: write:
737 case Request:: writev: {
738 jam();
739 sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
740 break;
741 }
742 case Request:: read:
743 case Request:: readPartial:
744 case Request:: readv: {
745 jam();
746 sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
747 break;
748 }
749 case Request:: sync: {
750 jam();
751 sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
752 break;
753 }
754 case Request::append:
755 case Request::append_synch:
756 {
757 jam();
758 sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
759 break;
760 }
761 case Request::rmrf: {
762 jam();
763 // Put the file back in idle files list
764 theIdleFiles.push_back(request->file);
765 sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
766 break;
767 }
768
769 case Request:: end: {
770 // Report nothing
771 break;
772 }
773 }//switch
774 } else {
775 jam();
776 FsConf * const fsConf = (FsConf *)&signal->theData[0];
777 fsConf->userPointer = request->theUserPointer;
778 switch (request->action) {
779 case Request:: open: {
780 jam();
781 theOpenFiles.insert(request->file, request->theFilePointer);
782
783 // Keep track on max number of opened files
784 if (theOpenFiles.size() > m_maxOpenedFiles)
785 m_maxOpenedFiles = theOpenFiles.size();
786
787 fsConf->filePointer = request->theFilePointer;
788 sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB);
789 break;
790 }
791 case Request:: closeRemove:
792 case Request:: close: {
793 jam();
794 // removes the file from OpenFiles list
795 theOpenFiles.erase(request->theFilePointer);
796 // Put the file in idle files list
797 theIdleFiles.push_back(request->file);
798 sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
799 break;
800 }
801 case Request:: writeSync:
802 case Request:: writevSync:
803 case Request:: write:
804 case Request:: writev: {
805 jam();
806 sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB);
807 break;
808 }
809 case Request:: read:
810 case Request:: readv: {
811 jam();
812 sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB);
813 break;
814 }
815 case Request:: readPartial: {
816 jam();
817 fsConf->bytes_read = request->par.readWrite.pages[0].size;
818 sendSignal(ref, GSN_FSREADCONF, signal, 2, JBB);
819 break;
820 }
821 case Request:: sync: {
822 jam();
823 sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
824 break;
825 }//case
826 case Request::append:
827 case Request::append_synch:
828 {
829 jam();
830 signal->theData[1] = request->par.append.size;
831 sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
832 break;
833 }
834 case Request::rmrf: {
835 jam();
836 // Put the file in idle files list
837 theIdleFiles.push_back(request->file);
838 sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
839 break;
840 }
841 case Request:: end: {
842 // Report nothing
843 break;
844 }
845 }
846 }//if
847 signal->setTrace(orgTrace);
848 }
849
850
851 bool
scanIPC(Signal * signal)852 Ndbfs::scanIPC(Signal* signal)
853 {
854 Request* request = theFromThreads.tryReadChannel();
855 jam();
856 if (request) {
857 jam();
858 report(request, signal);
859 theRequestPool->put(request);
860 return true;
861 }
862 return false;
863 }
864
865 #if defined NDB_WIN32
translateErrno(int aErrno)866 Uint32 Ndbfs::translateErrno(int aErrno)
867 {
868 switch (aErrno)
869 {
870 //permission denied
871 case ERROR_ACCESS_DENIED:
872
873 return FsRef::fsErrPermissionDenied;
874 //temporary not accessible
875 case ERROR_PATH_BUSY:
876 case ERROR_NO_MORE_SEARCH_HANDLES:
877
878 return FsRef::fsErrTemporaryNotAccessible;
879 //no space left on device
880 case ERROR_HANDLE_DISK_FULL:
881 case ERROR_DISK_FULL:
882
883 return FsRef::fsErrNoSpaceLeftOnDevice;
884 //none valid parameters
885 case ERROR_INVALID_HANDLE:
886 case ERROR_INVALID_DRIVE:
887 case ERROR_INVALID_ACCESS:
888 case ERROR_HANDLE_EOF:
889 case ERROR_BUFFER_OVERFLOW:
890
891 return FsRef::fsErrInvalidParameters;
892 //environment error
893 case ERROR_CRC:
894 case ERROR_ARENA_TRASHED:
895 case ERROR_BAD_ENVIRONMENT:
896 case ERROR_INVALID_BLOCK:
897 case ERROR_WRITE_FAULT:
898 case ERROR_READ_FAULT:
899 case ERROR_OPEN_FAILED:
900
901 return FsRef::fsErrEnvironmentError;
902
903 //no more process resources
904 case ERROR_TOO_MANY_OPEN_FILES:
905 case ERROR_NOT_ENOUGH_MEMORY:
906 case ERROR_OUTOFMEMORY:
907 return FsRef::fsErrNoMoreResources;
908 //no file
909 case ERROR_FILE_NOT_FOUND:
910 return FsRef::fsErrFileDoesNotExist;
911
912 case ERR_ReadUnderflow:
913 return FsRef::fsErrReadUnderflow;
914
915 default:
916 return FsRef::fsErrUnknown;
917 }
918 }
919 #else
translateErrno(int aErrno)920 Uint32 Ndbfs::translateErrno(int aErrno)
921 {
922 switch (aErrno)
923 {
924 //permission denied
925 case EACCES:
926 case EROFS:
927 case ENXIO:
928 return FsRef::fsErrPermissionDenied;
929 //temporary not accessible
930 case EAGAIN:
931 case ETIMEDOUT:
932 case ENOLCK:
933 case EINTR:
934 case EIO:
935 return FsRef::fsErrTemporaryNotAccessible;
936 //no space left on device
937 case ENFILE:
938 case EDQUOT:
939 #ifdef ENOSR
940 case ENOSR:
941 #endif
942 case ENOSPC:
943 case EFBIG:
944 return FsRef::fsErrNoSpaceLeftOnDevice;
945 //none valid parameters
946 case EINVAL:
947 case EBADF:
948 case ENAMETOOLONG:
949 case EFAULT:
950 case EISDIR:
951 case ENOTDIR:
952 case EEXIST:
953 case ETXTBSY:
954 return FsRef::fsErrInvalidParameters;
955 //environment error
956 case ELOOP:
957 #ifdef ENOLINK
958 case ENOLINK:
959 #endif
960 #ifdef EMULTIHOP
961 case EMULTIHOP:
962 #endif
963 #ifdef EOPNOTSUPP
964 case EOPNOTSUPP:
965 #endif
966 #ifdef ESPIPE
967 case ESPIPE:
968 #endif
969 case EPIPE:
970 return FsRef::fsErrEnvironmentError;
971
972 //no more process resources
973 case EMFILE:
974 case ENOMEM:
975 return FsRef::fsErrNoMoreResources;
976 //no file
977 case ENOENT:
978 return FsRef::fsErrFileDoesNotExist;
979
980 case ERR_ReadUnderflow:
981 return FsRef::fsErrReadUnderflow;
982
983 default:
984 return FsRef::fsErrUnknown;
985 }
986 }
987 #endif
988
989
990
991 void
execCONTINUEB(Signal * signal)992 Ndbfs::execCONTINUEB(Signal* signal)
993 {
994 jamEntry();
995 if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
996 jam();
997
998 // Also send CONTINUEB to ourself in order to scan for
999 // incoming answers from AsyncFile on MemoryChannel theFromThreads
1000 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1001 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
1002 if (scanningInProgress == true) {
1003 jam();
1004 return;
1005 }
1006 }
1007 if (scanIPC(signal)) {
1008 jam();
1009 scanningInProgress = true;
1010 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1011 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1012 } else {
1013 jam();
1014 scanningInProgress = false;
1015 }
1016 return;
1017 }
1018
1019 void
execDUMP_STATE_ORD(Signal * signal)1020 Ndbfs::execDUMP_STATE_ORD(Signal* signal)
1021 {
1022 if(signal->theData[0] == 19){
1023 return;
1024 }
1025 if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1026 infoEvent("NDBFS: Files: %d Open files: %d",
1027 theFiles.size(),
1028 theOpenFiles.size());
1029 infoEvent(" Idle files: %d Max opened files: %d",
1030 theIdleFiles.size(),
1031 m_maxOpenedFiles);
1032 infoEvent(" Max files: %d",
1033 m_maxFiles);
1034 infoEvent(" Requests: %d",
1035 theRequestPool->size());
1036
1037 return;
1038 }
1039 if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1040 infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
1041
1042 for (unsigned i = 0; i < theOpenFiles.size(); i++){
1043 AsyncFile* file = theOpenFiles.getFile(i);
1044 infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str());
1045 }
1046 return;
1047 }
1048 if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1049 infoEvent("NDBFS: Dump all files: %d", theFiles.size());
1050
1051 for (unsigned i = 0; i < theFiles.size(); i++){
1052 AsyncFile* file = theFiles[i];
1053 infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
1054 }
1055 return;
1056 }
1057 if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1058 infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
1059
1060 for (unsigned i = 0; i < theIdleFiles.size(); i++){
1061 AsyncFile* file = theIdleFiles[i];
1062 infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
1063 }
1064 return;
1065 }
1066
1067 if(signal->theData[0] == 404)
1068 {
1069 ndbrequire(signal->getLength() == 2);
1070 Uint32 file= signal->theData[1];
1071 AsyncFile* openFile = theOpenFiles.find(file);
1072 ndbrequire(openFile != 0);
1073 ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
1074 Request* curr = openFile->m_current_request;
1075 Request* last = openFile->m_last_request;
1076 if(curr)
1077 ndbout << "Current request: " << *curr << endl;
1078 if(last)
1079 ndbout << "Last request: " << *last << endl;
1080
1081 ndbout << "theReportTo " << *openFile->theReportTo << endl;
1082 ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1083
1084 ndbout << "All files: " << endl;
1085 for (unsigned i = 0; i < theFiles.size(); i++){
1086 AsyncFile* file = theFiles[i];
1087 ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1088 }
1089 }
1090 }//Ndbfs::execDUMP_STATE_ORD()
1091
1092 const char*
get_filename(Uint32 fd) const1093 Ndbfs::get_filename(Uint32 fd) const
1094 {
1095 jamEntry();
1096 const AsyncFile* openFile = theOpenFiles.find(fd);
1097 if(openFile)
1098 return openFile->theFileName.get_base_name();
1099 return "";
1100 }
1101
1102
1103 BLOCK_FUNCTIONS(Ndbfs)
1104
1105 template class Vector<AsyncFile*>;
1106 template class Vector<OpenFiles::OpenFileItem>;
1107 template class MemoryChannel<Request>;
1108 template class Pool<Request>;
1109 template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
1110