1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 
19 #include "Ndbfs.hpp"
20 #include "AsyncFile.hpp"
21 #include "Filename.hpp"
22 
23 #include <signaldata/FsOpenReq.hpp>
24 #include <signaldata/FsCloseReq.hpp>
25 #include <signaldata/FsReadWriteReq.hpp>
26 #include <signaldata/FsAppendReq.hpp>
27 #include <signaldata/FsRemoveReq.hpp>
28 #include <signaldata/FsConf.hpp>
29 #include <signaldata/FsRef.hpp>
30 #include <signaldata/NdbfsContinueB.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32 
33 #include <RefConvert.hpp>
34 #include <NdbSleep.h>
35 #include <NdbOut.hpp>
36 #include <Configuration.hpp>
37 
// Debug trace helper: streams `x` to ndbout behind an "FS::" prefix.
// The body is wrapped in do { } while (0) so that `DEBUG(x);` expands to
// exactly one statement and stays safe inside an unbraced if/else.
#define DEBUG(x) do { ndbout << "FS::" << x << endl; } while (0)
39 
40 inline
pageSize(const NewVARIABLE * baseAddrRef)41 int pageSize( const NewVARIABLE* baseAddrRef )
42 {
43    int log_psize;
44    int log_qsize = baseAddrRef->bits.q;
45    int log_vsize = baseAddrRef->bits.v;
46    if (log_vsize < 3)
47       log_vsize = 3;
48    log_psize = log_qsize + log_vsize - 3;
49    return (1 << log_psize);
50 }
51 
52 
Ndbfs(Block_context & ctx)53 Ndbfs::Ndbfs(Block_context& ctx) :
54   SimulatedBlock(NDBFS, ctx),
55   scanningInProgress(false),
56   theLastId(0),
57   theRequestPool(0),
58   m_maxOpenedFiles(0)
59 {
60   BLOCK_CONSTRUCTOR(Ndbfs);
61 
62   // Set received signals
63   addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
64   addRecSignal(GSN_DUMP_STATE_ORD,  &Ndbfs::execDUMP_STATE_ORD);
65   addRecSignal(GSN_STTOR,  &Ndbfs::execSTTOR);
66   addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
67   addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
68   addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
69   addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
70   addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
71   addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
72   addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
73   addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
74    // Set send signals
75 }
76 
~Ndbfs()77 Ndbfs::~Ndbfs()
78 {
79   // Delete all files
80   // AsyncFile destuctor will take care of deleting
81   // the thread it has created
82   for (unsigned i = 0; i < theFiles.size(); i++){
83     AsyncFile* file = theFiles[i];
84     delete file;
85     theFiles[i] = NULL;
86   }//for
87   theFiles.clear();
88   if (theRequestPool)
89     delete theRequestPool;
90 }
91 
92 void
execREAD_CONFIG_REQ(Signal * signal)93 Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
94 {
95   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
96 
97   Uint32 ref = req->senderRef;
98   Uint32 senderData = req->senderData;
99 
100   const ndb_mgm_configuration_iterator * p =
101     m_ctx.m_config.getOwnConfigIterator();
102   ndbrequire(p != 0);
103   theFileSystemPath.assfmt("%sndb_%u_fs%s", m_ctx.m_config.fileSystemPath(),
104 			   getOwnNodeId(), DIR_SEPARATOR);
105   theBackupFilePath.assign(m_ctx.m_config.backupFilePath());
106 
107   theRequestPool = new Pool<Request>;
108 
109   m_maxFiles = 0;
110   ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
111   Uint32 noIdleFiles = 27;
112   ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
113   if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
114     m_maxFiles = noIdleFiles;
115   // Create idle AsyncFiles
116   for (Uint32 i = 0; i < noIdleFiles; i++){
117     theIdleFiles.push_back(createAsyncFile());
118   }
119 
120   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
121   conf->senderRef = reference();
122   conf->senderData = senderData;
123   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
124 	     ReadConfigConf::SignalLength, JBB);
125 }
126 
127 /* Received a restart signal.
128  * Answer it like any other block
129  * PR0  : StartCase
130  * DR0  : StartPhase
131  * DR1  : ?
132  * DR2  : ?
133  * DR3  : ?
134  * DR4  : ?
135  * DR5  : SignalKey
136  */
137 void
execSTTOR(Signal * signal)138 Ndbfs::execSTTOR(Signal* signal)
139 {
140   jamEntry();
141 
142   if(signal->theData[1] == 0){ // StartPhase 0
143     jam();
144 
145     {
146 #ifdef NDB_WIN32
147       CreateDirectory(theFileSystemPath.c_str(), 0);
148 #else
149       mkdir(theFileSystemPath.c_str(),
150 	    S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
151 #endif
152     }
153 
154     cownref = NDBFS_REF;
155     // close all open files
156     ndbrequire(theOpenFiles.size() == 0);
157 
158     scanningInProgress = false;
159 
160     signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
161     sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1);
162 
163     signal->theData[3] = 255;
164     sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
165     return;
166   }
167   ndbrequire(0);
168 }
169 
170 int
forward(AsyncFile * file,Request * request)171 Ndbfs::forward( AsyncFile * file, Request* request)
172 {
173   jam();
174   file->execute(request);
175   return 1;
176 }
177 
178 void
execFSOPENREQ(Signal * signal)179 Ndbfs::execFSOPENREQ(Signal* signal)
180 {
181   jamEntry();
182   const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
183   const BlockReference userRef = fsOpenReq->userReference;
184   AsyncFile* file = getIdleFile();
185   ndbrequire(file != NULL);
186   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
187 
188   Uint32 userPointer = fsOpenReq->userPointer;
189 
190   if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
191   {
192     Ptr<GlobalPage> page_ptr;
193     if(m_global_page_pool.seize(page_ptr) == false)
194     {
195       FsRef * const fsRef = (FsRef *)&signal->theData[0];
196       fsRef->userPointer  = userPointer;
197       fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
198       fsRef->osErrorCode  = ~0; // Indicate local error
199       sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
200       return;
201     }
202     file->m_page_ptr = page_ptr;
203   }
204   else
205   {
206     ndbassert(file->m_page_ptr.isNull());
207     file->m_page_ptr.setNull();
208   }
209 
210   if(signal->getNoOfSections() == 0){
211     jam();
212     file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
213   } else {
214     jam();
215     SegmentedSectionPtr ptr;
216     signal->getSection(ptr, FsOpenReq::FILENAME);
217     file->theFileName.set(spec, ptr, g_sectionSegmentPool);
218     releaseSections(signal);
219   }
220   file->reportTo(&theFromThreads);
221   if (getenv("NDB_TRACE_OPEN"))
222     ndbout_c("open(%s)", file->theFileName.c_str());
223 
224   Request* request = theRequestPool->get();
225   request->action = Request::open;
226   request->error = 0;
227   request->set(userRef, userPointer, newId() );
228   request->file = file;
229   request->theTrace = signal->getTrace();
230   request->par.open.flags = fsOpenReq->fileFlags;
231   request->par.open.page_size = fsOpenReq->page_size;
232   request->par.open.file_size = fsOpenReq->file_size_hi;
233   request->par.open.file_size <<= 32;
234   request->par.open.file_size |= fsOpenReq->file_size_lo;
235   request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
236 
237   ndbrequire(forward(file, request));
238 }
239 
240 void
execFSREMOVEREQ(Signal * signal)241 Ndbfs::execFSREMOVEREQ(Signal* signal)
242 {
243   jamEntry();
244   const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
245   const BlockReference userRef = req->userReference;
246   AsyncFile* file = getIdleFile();
247   ndbrequire(file != NULL);
248 
249   Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
250   file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
251   file->reportTo(&theFromThreads);
252 
253   Request* request = theRequestPool->get();
254   request->action = Request::rmrf;
255   request->par.rmrf.directory = req->directory;
256   request->par.rmrf.own_directory = req->ownDirectory;
257   request->error = 0;
258   request->set(userRef, req->userPointer, newId() );
259   request->file = file;
260   request->theTrace = signal->getTrace();
261 
262   ndbrequire(forward(file, request));
263 }
264 
265 /*
266  * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
267  * remove file
268  */
269 void
execFSCLOSEREQ(Signal * signal)270 Ndbfs::execFSCLOSEREQ(Signal * signal)
271 {
272   jamEntry();
273   const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
274   const BlockReference userRef = fsCloseReq->userReference;
275   const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
276   const UintR userPointer = fsCloseReq->userPointer;
277 
278   AsyncFile* openFile = theOpenFiles.find(filePointer);
279   if (openFile == NULL) {
280     // The file was not open, send error back to sender
281     jam();
282     // Initialise FsRef signal
283     FsRef * const fsRef = (FsRef *)&signal->theData[0];
284     fsRef->userPointer  = userPointer;
285     fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
286     fsRef->osErrorCode  = ~0; // Indicate local error
287     sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
288     return;
289   }
290 
291   Request *request = theRequestPool->get();
292   if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
293      jam();
294      request->action = Request::closeRemove;
295   } else {
296      jam();
297      request->action = Request::close;
298   }
299   request->set(userRef, fsCloseReq->userPointer, filePointer);
300   request->file = openFile;
301   request->error = 0;
302   request->theTrace = signal->getTrace();
303 
304   ndbrequire(forward(openFile, request));
305 }
306 
307 void
readWriteRequest(int action,Signal * signal)308 Ndbfs::readWriteRequest(int action, Signal * signal)
309 {
310   const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0];
311   Uint16 filePointer =  (Uint16)fsRWReq->filePointer;
312   const UintR userPointer = fsRWReq->userPointer;
313   const BlockReference userRef = fsRWReq->userReference;
314   const BlockNumber blockNumber = refToBlock(userRef);
315 
316   AsyncFile* openFile = theOpenFiles.find(filePointer);
317 
318   const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
319   UintPtr tPageSize;
320   UintPtr tClusterSize;
321   UintPtr tNRR;
322   UintPtr tPageOffset;
323   char*        tWA;
324   FsRef::NdbfsErrorCodeType errorCode;
325 
326   Request *request = theRequestPool->get();
327   request->error = 0;
328   request->set(userRef, userPointer, filePointer);
329   request->file = openFile;
330   request->action = (Request::Action) action;
331   request->theTrace = signal->getTrace();
332 
333   Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
334 
335   if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
336     jam();
337     errorCode = FsRef::fsErrInvalidParameters;
338     goto error;
339   }
340 
341   if(format != FsReadWriteReq::fsFormatGlobalPage &&
342      format != FsReadWriteReq::fsFormatSharedPage)
343   {
344     if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
345       jam();// Ensure that a valid variable is used
346       errorCode = FsRef::fsErrInvalidParameters;
347       goto error;
348     }
349     if (myBaseAddrRef == NULL) {
350       jam(); // Ensure that a valid variable is used
351       errorCode = FsRef::fsErrInvalidParameters;
352       goto error;
353     }
354     if (openFile == NULL) {
355       jam(); //file not open
356       errorCode = FsRef::fsErrFileDoesNotExist;
357       goto error;
358     }
359     tPageSize = pageSize(myBaseAddrRef);
360     tClusterSize = myBaseAddrRef->ClusterSize;
361     tNRR = myBaseAddrRef->nrr;
362     tWA = (char*)myBaseAddrRef->WA;
363 
364     switch (format) {
365 
366       // List of memory and file pages pairs
367     case FsReadWriteReq::fsFormatListOfPairs: {
368       jam();
369       for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
370 	jam();
371 	const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
372 	const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
373 	if (varIndex >= tNRR) {
374 	  jam();
375 	  errorCode = FsRef::fsErrInvalidParameters;
376 	  goto error;
377 	}//if
378 	request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
379 	request->par.readWrite.pages[i].size = tPageSize;
380 	request->par.readWrite.pages[i].offset = fileOffset * tPageSize;
381       }//for
382       request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
383       break;
384     }//case
385 
386       // Range of memory page with one file page
387     case FsReadWriteReq::fsFormatArrayOfPages: {
388       if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
389         jam();
390         errorCode = FsRef::fsErrInvalidParameters;
391         goto error;
392       }//if
393       const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
394       const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
395 
396       request->par.readWrite.pages[0].offset = fileOffset * tPageSize;
397       request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
398       request->par.readWrite.numberOfPages = 1;
399       request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
400       break;
401     }//case
402 
403       // List of memory pages followed by one file page
404     case FsReadWriteReq::fsFormatListOfMemPages: {
405 
406       tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
407       tPageOffset *= tPageSize;
408 
409       for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
410 	jam();
411 	UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
412 
413 	if (varIndex >= tNRR) {
414 	  jam();
415 	  errorCode = FsRef::fsErrInvalidParameters;
416 	  goto error;
417 	}//if
418 	request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
419 	request->par.readWrite.pages[i].size = tPageSize;
420 	request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize);
421       }//for
422       request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
423       break;
424       // make it a writev or readv
425     }//case
426 
427     default: {
428       jam();
429       errorCode = FsRef::fsErrInvalidParameters;
430       goto error;
431     }//default
432     }//switch
433   }
434   else if (format == FsReadWriteReq::fsFormatGlobalPage)
435   {
436     Ptr<GlobalPage> ptr;
437     m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
438     request->par.readWrite.pages[0].buf = (char*)ptr.p;
439     request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
440     request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
441     request->par.readWrite.numberOfPages = 1;
442   }
443   else
444   {
445     ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
446     Ptr<GlobalPage> ptr;
447     m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
448     request->par.readWrite.pages[0].buf = (char*)ptr.p;
449     request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
450     request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
451     request->par.readWrite.numberOfPages = 1;
452   }
453 
454   ndbrequire(forward(openFile, request));
455   return;
456 
457 error:
458   theRequestPool->put(request);
459   FsRef * const fsRef = (FsRef *)&signal->theData[0];
460   fsRef->userPointer = userPointer;
461   fsRef->setErrorCode(fsRef->errorCode, errorCode);
462   fsRef->osErrorCode = ~0; // Indicate local error
463   switch (action) {
464   case Request:: write:
465   case Request:: writeSync: {
466     jam();
467     sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
468     break;
469   }//case
470   case Request:: readPartial:
471   case Request:: read: {
472     jam();
473     sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
474   }//case
475   }//switch
476   return;
477 }
478 
479 /*
480     PR0: File Pointer , theData[0]
481     DR0: User reference, theData[1]
482     DR1: User Pointer,   etc.
483     DR2: Flag
484     DR3: Var number
485     DR4: amount of pages
486     DR5->: Memory Page id and File page id according to Flag
487 */
488 void
execFSWRITEREQ(Signal * signal)489 Ndbfs::execFSWRITEREQ(Signal* signal)
490 {
491   jamEntry();
492   const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
493 
494   if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
495     jam();
496     readWriteRequest( Request::writeSync, signal );
497   } else {
498     jam();
499     readWriteRequest( Request::write, signal );
500   }
501 }
502 
503 /*
504     PR0: File Pointer
505     DR0: User reference
506     DR1: User Pointer
507     DR2: Flag
508     DR3: Var number
509     DR4: amount of pages
510     DR5->: Memory Page id and File page id according to Flag
511 */
512 void
execFSREADREQ(Signal * signal)513 Ndbfs::execFSREADREQ(Signal* signal)
514 {
515   jamEntry();
516   FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
517   if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
518     readWriteRequest( Request::readPartial, signal );
519   else
520     readWriteRequest( Request::read, signal );
521 }
522 
523 /*
524  * PR0: File Pointer DR0: User reference DR1: User Pointer
525  */
526 void
execFSSYNCREQ(Signal * signal)527 Ndbfs::execFSSYNCREQ(Signal * signal)
528 {
529   jamEntry();
530   Uint16 filePointer =  (Uint16)signal->theData[0];
531   BlockReference userRef = signal->theData[1];
532   const UintR userPointer = signal->theData[2];
533   AsyncFile* openFile = theOpenFiles.find(filePointer);
534 
535   if (openFile == NULL) {
536      jam(); //file not open
537      FsRef * const fsRef = (FsRef *)&signal->theData[0];
538      fsRef->userPointer = userPointer;
539      fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
540      fsRef->osErrorCode = ~0; // Indicate local error
541      sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
542      return;
543   }
544 
545   Request *request = theRequestPool->get();
546   request->error = 0;
547   request->action = Request::sync;
548   request->set(userRef, userPointer, filePointer);
549   request->file = openFile;
550   request->theTrace = signal->getTrace();
551 
552   ndbrequire(forward(openFile,request));
553 }
554 
555 void
execFSAPPENDREQ(Signal * signal)556 Ndbfs::execFSAPPENDREQ(Signal * signal)
557 {
558   const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
559   const Uint16 filePointer =  (Uint16)fsReq->filePointer;
560   const UintR userPointer = fsReq->userPointer;
561   const BlockReference userRef = fsReq->userReference;
562   const BlockNumber blockNumber = refToBlock(userRef);
563 
564   FsRef::NdbfsErrorCodeType errorCode;
565 
566   AsyncFile* openFile = theOpenFiles.find(filePointer);
567   const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
568 
569   const Uint32* tWA   = (const Uint32*)myBaseAddrRef->WA;
570   const Uint32  tSz   = myBaseAddrRef->nrr;
571   const Uint32 offset = fsReq->offset;
572   const Uint32 size   = fsReq->size;
573   const Uint32 synch_flag = fsReq->synch_flag;
574   Request *request = theRequestPool->get();
575 
576   if (openFile == NULL) {
577     jam();
578     errorCode = FsRef::fsErrFileDoesNotExist;
579     goto error;
580   }
581 
582   if (myBaseAddrRef == NULL) {
583     jam(); // Ensure that a valid variable is used
584     errorCode = FsRef::fsErrInvalidParameters;
585     goto error;
586   }
587 
588   if (fsReq->varIndex >= getBatSize(blockNumber)) {
589     jam();// Ensure that a valid variable is used
590     errorCode = FsRef::fsErrInvalidParameters;
591     goto error;
592   }
593 
594   if(offset + size > tSz){
595     jam(); // Ensure that a valid variable is used
596     errorCode = FsRef::fsErrInvalidParameters;
597     goto error;
598   }
599 
600   request->error = 0;
601   request->set(userRef, userPointer, filePointer);
602   request->file = openFile;
603   request->theTrace = signal->getTrace();
604 
605   request->par.append.buf = (const char *)(tWA + offset);
606   request->par.append.size = size << 2;
607 
608   if (!synch_flag)
609     request->action = Request::append;
610   else
611     request->action = Request::append_synch;
612   ndbrequire(forward(openFile, request));
613   return;
614 
615 error:
616   jam();
617   theRequestPool->put(request);
618   FsRef * const fsRef = (FsRef *)&signal->theData[0];
619   fsRef->userPointer = userPointer;
620   fsRef->setErrorCode(fsRef->errorCode, errorCode);
621   fsRef->osErrorCode = ~0; // Indicate local error
622 
623   jam();
624   sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
625   return;
626 }
627 
628 Uint16
newId()629 Ndbfs::newId()
630 {
631   // finds a new key, eg a new filepointer
632   for (int i = 1; i < SHRT_MAX; i++)
633   {
634     if (theLastId == SHRT_MAX) {
635       jam();
636       theLastId = 1;
637     } else {
638       jam();
639       theLastId++;
640     }
641 
642     if(theOpenFiles.find(theLastId) == NULL) {
643       jam();
644       return theLastId;
645     }
646   }
647   ndbrequire(1 == 0);
648   // The program will not reach this point
649   return 0;
650 }
651 
652 AsyncFile*
createAsyncFile()653 Ndbfs::createAsyncFile(){
654 
655   // Check limit of open files
656   if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles) {
657     // Print info about all open files
658     for (unsigned i = 0; i < theFiles.size(); i++){
659       AsyncFile* file = theFiles[i];
660       ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
661     }
662     ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
663   }
664 
665   AsyncFile* file = new AsyncFile(* this);
666   file->doStart();
667 
668   // Put the file in list of all files
669   theFiles.push_back(file);
670 
671 #ifdef VM_TRACE
672   infoEvent("NDBFS: Created new file thread %d", theFiles.size());
673 #endif
674 
675   return file;
676 }
677 
678 AsyncFile*
getIdleFile()679 Ndbfs::getIdleFile(){
680   AsyncFile* file;
681   if (theIdleFiles.size() > 0){
682     file = theIdleFiles[0];
683     theIdleFiles.erase(0);
684   } else {
685     file = createAsyncFile();
686   }
687   return file;
688 }
689 
690 
691 
692 void
report(Request * request,Signal * signal)693 Ndbfs::report(Request * request, Signal* signal)
694 {
695   const Uint32 orgTrace = signal->getTrace();
696   signal->setTrace(request->theTrace);
697   const BlockReference ref = request->theUserReference;
698 
699   if(!request->file->m_page_ptr.isNull())
700   {
701     m_global_page_pool.release(request->file->m_page_ptr);
702     request->file->m_page_ptr.setNull();
703   }
704 
705   if (request->error) {
706     jam();
707     // Initialise FsRef signal
708     FsRef * const fsRef = (FsRef *)&signal->theData[0];
709     fsRef->userPointer = request->theUserPointer;
710     if(request->error & FsRef::FS_ERR_BIT)
711     {
712       fsRef->errorCode = request->error;
713       fsRef->osErrorCode = 0;
714     }
715     else
716     {
717       fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
718       fsRef->osErrorCode = request->error;
719     }
720     switch (request->action) {
721     case Request:: open: {
722       jam();
723       // Put the file back in idle files list
724       theIdleFiles.push_back(request->file);
725       sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
726       break;
727     }
728     case Request:: closeRemove:
729     case Request:: close: {
730       jam();
731       sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
732       break;
733     }
734     case Request:: writeSync:
735     case Request:: writevSync:
736     case Request:: write:
737     case Request:: writev: {
738       jam();
739       sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
740       break;
741     }
742     case Request:: read:
743     case Request:: readPartial:
744     case Request:: readv: {
745       jam();
746       sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
747       break;
748     }
749     case Request:: sync: {
750       jam();
751       sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
752       break;
753     }
754     case Request::append:
755     case Request::append_synch:
756     {
757       jam();
758       sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
759       break;
760     }
761     case Request::rmrf: {
762       jam();
763       // Put the file back in idle files list
764       theIdleFiles.push_back(request->file);
765       sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
766       break;
767     }
768 
769     case Request:: end: {
770       // Report nothing
771       break;
772     }
773     }//switch
774   } else {
775     jam();
776     FsConf * const fsConf = (FsConf *)&signal->theData[0];
777     fsConf->userPointer = request->theUserPointer;
778     switch (request->action) {
779     case Request:: open: {
780       jam();
781       theOpenFiles.insert(request->file, request->theFilePointer);
782 
783       // Keep track on max number of opened files
784       if (theOpenFiles.size() > m_maxOpenedFiles)
785 	m_maxOpenedFiles = theOpenFiles.size();
786 
787       fsConf->filePointer = request->theFilePointer;
788       sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB);
789       break;
790     }
791     case Request:: closeRemove:
792     case Request:: close: {
793       jam();
794       // removes the file from OpenFiles list
795       theOpenFiles.erase(request->theFilePointer);
796       // Put the file in idle files list
797       theIdleFiles.push_back(request->file);
798       sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
799       break;
800     }
801     case Request:: writeSync:
802     case Request:: writevSync:
803     case Request:: write:
804     case Request:: writev: {
805       jam();
806       sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB);
807       break;
808     }
809     case Request:: read:
810     case Request:: readv: {
811       jam();
812       sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB);
813       break;
814     }
815     case Request:: readPartial: {
816       jam();
817       fsConf->bytes_read = request->par.readWrite.pages[0].size;
818       sendSignal(ref, GSN_FSREADCONF, signal, 2, JBB);
819       break;
820     }
821     case Request:: sync: {
822       jam();
823       sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
824       break;
825     }//case
826     case Request::append:
827     case Request::append_synch:
828     {
829       jam();
830       signal->theData[1] = request->par.append.size;
831       sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
832       break;
833     }
834     case Request::rmrf: {
835       jam();
836       // Put the file in idle files list
837       theIdleFiles.push_back(request->file);
838       sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
839       break;
840     }
841     case Request:: end: {
842       // Report nothing
843       break;
844     }
845     }
846   }//if
847   signal->setTrace(orgTrace);
848 }
849 
850 
851 bool
scanIPC(Signal * signal)852 Ndbfs::scanIPC(Signal* signal)
853 {
854    Request* request = theFromThreads.tryReadChannel();
855    jam();
856    if (request) {
857       jam();
858       report(request, signal);
859       theRequestPool->put(request);
860       return true;
861    }
862    return false;
863 }
864 
865 #if defined NDB_WIN32
translateErrno(int aErrno)866 Uint32 Ndbfs::translateErrno(int aErrno)
867 {
868   switch (aErrno)
869     {
870       //permission denied
871     case ERROR_ACCESS_DENIED:
872 
873       return FsRef::fsErrPermissionDenied;
874       //temporary not accessible
875     case ERROR_PATH_BUSY:
876     case ERROR_NO_MORE_SEARCH_HANDLES:
877 
878       return FsRef::fsErrTemporaryNotAccessible;
879       //no space left on device
880     case ERROR_HANDLE_DISK_FULL:
881     case ERROR_DISK_FULL:
882 
883       return FsRef::fsErrNoSpaceLeftOnDevice;
884       //none valid parameters
885     case ERROR_INVALID_HANDLE:
886     case ERROR_INVALID_DRIVE:
887     case ERROR_INVALID_ACCESS:
888     case ERROR_HANDLE_EOF:
889     case ERROR_BUFFER_OVERFLOW:
890 
891       return FsRef::fsErrInvalidParameters;
892       //environment error
893     case ERROR_CRC:
894     case ERROR_ARENA_TRASHED:
895     case ERROR_BAD_ENVIRONMENT:
896     case ERROR_INVALID_BLOCK:
897     case ERROR_WRITE_FAULT:
898     case ERROR_READ_FAULT:
899     case ERROR_OPEN_FAILED:
900 
901       return FsRef::fsErrEnvironmentError;
902 
903       //no more process resources
904     case ERROR_TOO_MANY_OPEN_FILES:
905     case ERROR_NOT_ENOUGH_MEMORY:
906     case ERROR_OUTOFMEMORY:
907       return FsRef::fsErrNoMoreResources;
908       //no file
909     case ERROR_FILE_NOT_FOUND:
910       return FsRef::fsErrFileDoesNotExist;
911 
912     case ERR_ReadUnderflow:
913       return FsRef::fsErrReadUnderflow;
914 
915     default:
916       return FsRef::fsErrUnknown;
917     }
918 }
919 #else
translateErrno(int aErrno)920 Uint32 Ndbfs::translateErrno(int aErrno)
921 {
922   switch (aErrno)
923     {
924       //permission denied
925     case EACCES:
926     case EROFS:
927     case ENXIO:
928       return FsRef::fsErrPermissionDenied;
929       //temporary not accessible
930     case EAGAIN:
931     case ETIMEDOUT:
932     case ENOLCK:
933     case EINTR:
934     case EIO:
935       return FsRef::fsErrTemporaryNotAccessible;
936       //no space left on device
937     case ENFILE:
938     case EDQUOT:
939 #ifdef ENOSR
940     case ENOSR:
941 #endif
942     case ENOSPC:
943     case EFBIG:
944       return FsRef::fsErrNoSpaceLeftOnDevice;
945       //none valid parameters
946     case EINVAL:
947     case EBADF:
948     case ENAMETOOLONG:
949     case EFAULT:
950     case EISDIR:
951     case ENOTDIR:
952     case EEXIST:
953     case ETXTBSY:
954       return FsRef::fsErrInvalidParameters;
955       //environment error
956     case ELOOP:
957 #ifdef ENOLINK
958     case ENOLINK:
959 #endif
960 #ifdef EMULTIHOP
961     case EMULTIHOP:
962 #endif
963 #ifdef EOPNOTSUPP
964     case EOPNOTSUPP:
965 #endif
966 #ifdef ESPIPE
967     case ESPIPE:
968 #endif
969     case EPIPE:
970       return FsRef::fsErrEnvironmentError;
971 
972       //no more process resources
973     case EMFILE:
974     case ENOMEM:
975       return FsRef::fsErrNoMoreResources;
976       //no file
977     case ENOENT:
978       return FsRef::fsErrFileDoesNotExist;
979 
980     case ERR_ReadUnderflow:
981       return FsRef::fsErrReadUnderflow;
982 
983     default:
984       return FsRef::fsErrUnknown;
985     }
986 }
987 #endif
988 
989 
990 
991 void
execCONTINUEB(Signal * signal)992 Ndbfs::execCONTINUEB(Signal* signal)
993 {
994   jamEntry();
995   if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
996     jam();
997 
998     // Also send CONTINUEB to ourself in order to scan for
999     // incoming answers from AsyncFile on MemoryChannel theFromThreads
1000     signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1001     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
1002     if (scanningInProgress == true) {
1003       jam();
1004       return;
1005     }
1006   }
1007   if (scanIPC(signal)) {
1008     jam();
1009     scanningInProgress = true;
1010     signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1011     sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1012    } else {
1013     jam();
1014     scanningInProgress = false;
1015    }
1016    return;
1017 }
1018 
1019 void
execDUMP_STATE_ORD(Signal * signal)1020 Ndbfs::execDUMP_STATE_ORD(Signal* signal)
1021 {
1022   if(signal->theData[0] == 19){
1023     return;
1024   }
1025   if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1026     infoEvent("NDBFS: Files: %d Open files: %d",
1027 	      theFiles.size(),
1028 	      theOpenFiles.size());
1029     infoEvent(" Idle files: %d Max opened files: %d",
1030 	       theIdleFiles.size(),
1031 	       m_maxOpenedFiles);
1032     infoEvent(" Max files: %d",
1033 	      m_maxFiles);
1034     infoEvent(" Requests: %d",
1035 	      theRequestPool->size());
1036 
1037     return;
1038   }
1039   if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1040     infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
1041 
1042     for (unsigned i = 0; i < theOpenFiles.size(); i++){
1043       AsyncFile* file = theOpenFiles.getFile(i);
1044       infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str());
1045     }
1046     return;
1047   }
1048   if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1049     infoEvent("NDBFS: Dump all files: %d", theFiles.size());
1050 
1051     for (unsigned i = 0; i < theFiles.size(); i++){
1052       AsyncFile* file = theFiles[i];
1053       infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
1054     }
1055     return;
1056   }
1057   if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1058     infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
1059 
1060     for (unsigned i = 0; i < theIdleFiles.size(); i++){
1061       AsyncFile* file = theIdleFiles[i];
1062       infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
1063     }
1064     return;
1065   }
1066 
1067   if(signal->theData[0] == 404)
1068   {
1069     ndbrequire(signal->getLength() == 2);
1070     Uint32 file= signal->theData[1];
1071     AsyncFile* openFile = theOpenFiles.find(file);
1072     ndbrequire(openFile != 0);
1073     ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
1074     Request* curr = openFile->m_current_request;
1075     Request* last = openFile->m_last_request;
1076     if(curr)
1077       ndbout << "Current request: " << *curr << endl;
1078     if(last)
1079        ndbout << "Last request: " << *last << endl;
1080 
1081     ndbout << "theReportTo " << *openFile->theReportTo << endl;
1082     ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1083 
1084     ndbout << "All files: " << endl;
1085     for (unsigned i = 0; i < theFiles.size(); i++){
1086       AsyncFile* file = theFiles[i];
1087       ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1088     }
1089   }
1090 }//Ndbfs::execDUMP_STATE_ORD()
1091 
1092 const char*
get_filename(Uint32 fd) const1093 Ndbfs::get_filename(Uint32 fd) const
1094 {
1095   jamEntry();
1096   const AsyncFile* openFile = theOpenFiles.find(fd);
1097   if(openFile)
1098     return openFile->theFileName.get_base_name();
1099   return "";
1100 }
1101 
1102 
1103 BLOCK_FUNCTIONS(Ndbfs)
1104 
1105 template class Vector<AsyncFile*>;
1106 template class Vector<OpenFiles::OpenFileItem>;
1107 template class MemoryChannel<Request>;
1108 template class Pool<Request>;
1109 template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
1110