1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include "Ndbfs.hpp"
28 #include "AsyncFile.hpp"
29 
30 #ifdef _WIN32
31 #include "Win32AsyncFile.hpp"
32 #else
33 #include "PosixAsyncFile.hpp"
34 #endif
35 
36 #include <signaldata/FsOpenReq.hpp>
37 #include <signaldata/FsCloseReq.hpp>
38 #include <signaldata/FsReadWriteReq.hpp>
39 #include <signaldata/FsAppendReq.hpp>
40 #include <signaldata/FsRemoveReq.hpp>
41 #include <signaldata/FsConf.hpp>
42 #include <signaldata/FsRef.hpp>
43 #include <signaldata/NdbfsContinueB.hpp>
44 #include <signaldata/DumpStateOrd.hpp>
45 #include <signaldata/AllocMem.hpp>
46 #include <signaldata/BuildIndxImpl.hpp>
47 
48 #include <RefConvert.hpp>
49 #include <portlib/NdbDir.hpp>
50 #include <NdbOut.hpp>
51 #include <Configuration.hpp>
52 
53 #include <EventLogger.hpp>
54 
55 #define JAM_FILE_ID 393
56 
57 extern EventLogger * g_eventLogger;
58 /**
59  * NDBFS has two types of async IO file threads : Bound and non-bound.
60  * These threads are kept in two distinct idle pools.
61  * Requests to be executed by any thread in an idle pool are queued onto
62  * a shared queue, which all of the idle threads in the pool attempt to
63  * dequeue work from.  Work items are processed, which may take some
64  * time, and then once the outcome is known, a response is queued on a
65  * reply queue, which the NDBFS signal execution thread polls
66  * periodically.
67  *
68  * Bound IO threads have the ability to remove themselves from the idle
69  * pool.  This happens as part of processing an OPEN request, where the
70  * thread is 'attached' to a particular file.
71  * As part of being 'attached' to a file, the thread no longer attempts
72  * to dequeue work from the shared queue, but rather starts dequeuing
73  * work just from a private queue associated with the file.
74  *
75  * This removes the thread from general use and dedicates it to servicing
76  * requests on the attached file until a CLOSE request arrives, which
77  * will cause the thread to be detached from the file and return to the
78  * idle Bound threads pool, where it will attempt to dequeue work from
79  * the Shared queue again.
80  *
81  * Non-bound IO threads are created at startup, they are not associated
82  * with a particular file and process one request at a time to
83  * completion.  They always dequeue work from the non-bound shared queue.
 *
85  * Some request types use Bound IO threads in a non-bound way, where a
86  * single request is processed to completion by a single thread, which
87  * then continues to dequeue work from the shared bound
88  * queue.  Examples: build index, allocate memory, remove file.
89  * In these cases, the bound IO thread pool is being used as it
90  * effectively offers a concurrent thread for each concurrent request,
91  * and these use cases exist to get thread concurrency.
92  *
93  * Pool sizing
94  *
95  * The non-bound thread pool size is set by the DiskIoThreadPool config
96  * variable at node start, and does not change after.
97  *
 * The bound thread pool size is set by the InitialNoOfOpenFiles
 * config variable at node start and can grow dynamically afterwards,
 * up to the limit given by MaxNoOfOpenFiles.
 * There is currently no mechanism for releasing IO threads.
102  *
103  * Bound thread pool growth
104  *
105  * When receiving a request which requires the use of a Bound thread pool
106  * thread, the NDBFS block checks whether there are sufficient threads
107  * to ensure a quick execution of the request.  If there are not then
108  * it creates an extra thread prior to enqueuing the request on the
109  * shared bound thread pool queue.
110  *
111  *
112  * The Bound IO thread pool exists to supply enough thread concurrency to
113  * match the concurrency of requests submitted to it. Assumed goals are :
114  *  1) Avoid excessive thread creation
115  *     since each thread has a memory and resource cost and
116  *     currently they are never released until the process exits.
117  *  2) Avoid bound requests sitting on the shared bound queue for any
118  *     significant amount of time.
119 */
120 
121 inline
pageSize(const NewVARIABLE * baseAddrRef)122 int pageSize( const NewVARIABLE* baseAddrRef )
123 {
124    int log_psize;
125    int log_qsize = baseAddrRef->bits.q;
126    int log_vsize = baseAddrRef->bits.v;
127    if (log_vsize < 3)
128       log_vsize = 3;
129    log_psize = log_qsize + log_vsize - 3;
130    return (1 << log_psize);
131 }
132 
Ndbfs(Block_context & ctx)133 Ndbfs::Ndbfs(Block_context& ctx) :
134   SimulatedBlock(NDBFS, ctx),
135   scanningInProgress(false),
136   theLastId(0),
137   theRequestPool(0),
138   m_maxOpenedFiles(0),
139   m_bound_threads_cnt(0),
140   m_unbounds_threads_cnt(0),
141   m_active_bound_threads_cnt(0)
142 {
143   BLOCK_CONSTRUCTOR(Ndbfs);
144 
145   // Set received signals
146   addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
147   addRecSignal(GSN_DUMP_STATE_ORD,  &Ndbfs::execDUMP_STATE_ORD);
148   addRecSignal(GSN_STTOR,  &Ndbfs::execSTTOR);
149   addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
150   addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
151   addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
152   addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
153   addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
154   addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
155   addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
156   addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
157   addRecSignal(GSN_ALLOC_MEM_REQ, &Ndbfs::execALLOC_MEM_REQ);
158   addRecSignal(GSN_SEND_PACKED, &Ndbfs::execSEND_PACKED, true);
159   addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Ndbfs::execBUILD_INDX_IMPL_REQ);
160    // Set send signals
161   addRecSignal(GSN_FSSUSPENDORD, &Ndbfs::execFSSUSPENDORD);
162 
163   theRequestPool = new Pool<Request>;
164 }
165 
~Ndbfs()166 Ndbfs::~Ndbfs()
167 {
168   /**
169    * Stop all unbound threads
170    */
171 
172   /**
173    * Post enought Request::end to saturate all unbound threads
174    */
175   Request request;
176   request.action = Request::end;
177   for (unsigned i = 0; i < theThreads.size(); i++)
178   {
179     theToBoundThreads.writeChannel(&request);
180     theToUnboundThreads.writeChannel(&request);
181   }
182 
183   for (unsigned i = 0; i < theThreads.size(); i++)
184   {
185     AsyncIoThread * thr = theThreads[i];
186     thr->shutdown();
187   }
188 
189   /**
190    * delete all threads
191    */
192   for (unsigned i = 0; i < theThreads.size(); i++)
193   {
194     AsyncIoThread * thr = theThreads[i];
195     delete thr;
196     theThreads[i] = 0;
197   }
198   theThreads.clear();
199 
200   /**
201    * Delete all files
202    */
203   for (unsigned i = 0; i < theFiles.size(); i++){
204     AsyncFile* file = theFiles[i];
205     delete file;
206     theFiles[i] = NULL;
207   }//for
208   theFiles.clear();
209 
210   if (theRequestPool)
211     delete theRequestPool;
212 }
213 
214 static
215 bool
do_mkdir(const char * path)216 do_mkdir(const char * path)
217 {
218   return NdbDir::create(path,
219                         NdbDir::u_rwx() | NdbDir::g_r() | NdbDir::g_x(),
220                         true /* ignore_existing */);
221 }
222 
223 static
224 void
add_path(BaseString & dst,const char * add)225 add_path(BaseString& dst, const char * add)
226 {
227   const char * tmp = dst.c_str();
228   unsigned len = dst.length();
229   unsigned dslen = (unsigned)strlen(DIR_SEPARATOR);
230 
231   if (len > dslen && strcmp(tmp+(len - dslen), DIR_SEPARATOR) != 0)
232     dst.append(DIR_SEPARATOR);
233   dst.append(add);
234 }
235 
236 static
237 bool
validate_path(BaseString & dst,const char * path)238 validate_path(BaseString & dst,
239               const char * path)
240 {
241   char buf2[PATH_MAX];
242   memset(buf2, 0,sizeof(buf2));
243 #ifdef _WIN32
244   CreateDirectory(path, 0);
245   char* szFilePart;
246   if(!GetFullPathName(path, sizeof(buf2), buf2, &szFilePart) ||
247      (GetFileAttributes(buf2) & FILE_ATTRIBUTE_READONLY))
248     return false;
249 #else
250   if (::realpath(path, buf2) == NULL ||
251       ::access(buf2, W_OK) != 0)
252     return false;
253 #endif
254   dst.assign(buf2);
255   add_path(dst, "");
256   return true;
257 }
258 
259 const BaseString&
get_base_path(Uint32 no) const260 Ndbfs::get_base_path(Uint32 no) const
261 {
262   if (no < NDB_ARRAY_SIZE(m_base_path) &&
263       strlen(m_base_path[no].c_str()) > 0)
264   {
265     jam();
266     return m_base_path[no];
267   }
268 
269   return m_base_path[FsOpenReq::BP_FS];
270 }
271 
272 void
execREAD_CONFIG_REQ(Signal * signal)273 Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
274 {
275   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
276 
277   Uint32 ref = req->senderRef;
278   Uint32 senderData = req->senderData;
279 
280   const ndb_mgm_configuration_iterator * p =
281     m_ctx.m_config.getOwnConfigIterator();
282   ndbrequire(p != 0);
283   BaseString tmp;
284   tmp.assfmt("ndb_%u_fs%s", getOwnNodeId(), DIR_SEPARATOR);
285   m_base_path[FsOpenReq::BP_FS].assfmt("%s%s",
286                                        m_ctx.m_config.fileSystemPath(),
287                                        tmp.c_str());
288   m_base_path[FsOpenReq::BP_BACKUP].assign(m_ctx.m_config.backupFilePath());
289 
290   const char * ddpath = 0;
291   ndb_mgm_get_string_parameter(p, CFG_DB_DD_FILESYSTEM_PATH, &ddpath);
292 
293   {
294     const char * datapath = ddpath;
295     ndb_mgm_get_string_parameter(p, CFG_DB_DD_DATAFILE_PATH, &datapath);
296     if (datapath)
297     {
298       /**
299        * Only set BP_DD_DF if either FileSystemPathDataFiles or FileSystemPathDD
300        *   is set...otherwise get_base_path(FsOpenReq::BP_DD_DF) will
301        *   return BP_FS (see get_base_path)
302        */
303       BaseString path;
304       add_path(path, datapath);
305       do_mkdir(path.c_str());
306       add_path(path, tmp.c_str());
307       do_mkdir(path.c_str());
308       if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.c_str()))
309       {
310         ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
311                   m_base_path[FsOpenReq::BP_DD_DF].c_str(),
312                   "FileSystemPathDataFiles");
313       }
314     }
315     else
316     {
317       BaseString path;
318       add_path(path, m_base_path[FsOpenReq::BP_FS].c_str());
319       do_mkdir(path.c_str());
320       BaseString tmpTS;
321       tmpTS.assfmt("TS%s", DIR_SEPARATOR);
322       add_path(path, tmpTS.c_str());
323       do_mkdir(path.c_str());
324       if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.c_str()))
325       {
326         ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
327                   m_base_path[FsOpenReq::BP_DD_DF].c_str(),
328                   "FileSystemPathDataFiles");
329       }
330     }
331   }
332 
333   {
334     const char * undopath = ddpath;
335     ndb_mgm_get_string_parameter(p, CFG_DB_DD_UNDOFILE_PATH, &undopath);
336     if (undopath)
337     {
338       /**
339        * Only set BP_DD_DF if either FileSystemPathUndoFiles or FileSystemPathDD
340        *   is set...otherwise get_base_path(FsOpenReq::BP_DD_UF) will
341        *   return BP_FS (see get_base_path)
342        */
343       BaseString path;
344       add_path(path, undopath);
345       do_mkdir(path.c_str());
346       add_path(path, tmp.c_str());
347       do_mkdir(path.c_str());
348 
349       if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.c_str()))
350       {
351         ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
352                   m_base_path[FsOpenReq::BP_DD_UF].c_str(),
353                   "FileSystemPathUndoFiles");
354       }
355     }
356     else
357     {
358       BaseString path;
359       add_path(path, m_base_path[FsOpenReq::BP_FS].c_str());
360       do_mkdir(path.c_str());
361       BaseString tmpLG;
362       tmpLG.assfmt("LG%s", DIR_SEPARATOR);
363       add_path(path, tmpLG.c_str());
364       do_mkdir(path.c_str());
365       if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.c_str()))
366       {
367         ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
368                   m_base_path[FsOpenReq::BP_DD_UF].c_str(),
369                   "FileSystemPathUndoFiles");
370       }
371     }
372   }
373 
374   m_maxFiles = 0;
375   ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
376   Uint32 noIdleFiles = 27;
377 
378   ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
379 
380   {
381     /**
382      * each logpart keeps up to 3 logfiles open at any given time...
383      *   (bound)
384      * make sure noIdleFiles is atleast 4 times #logparts
385      * In addition the LCP execution can have up to 4 files open in each
386      * LDM thread. In the LCP prepare phase we can have up to 2 files
387      * (2 CTL files first, then 1 CTL file and finally 1 CTL file and
388      *  1 data file). The LCP execution runs in parallel and can also
389      * have 2 open files (1 CTL file and 1 data file). With the
390      * introduction of Partial LCP the execution phase could even have
391      * 9 files open at a time and thus we can have up to 11 threads open
392      * at any time per LDM thread only to handle LCP execution.
393      *
394      * In addition ensure that we have at least 10 more open files
395      * available for the remainder of the tasks we need to handle.
396      */
397     Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
398     ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
399     Uint32 logfiles = 4 * logParts;
400     Uint32 numLDMthreads = getLqhWorkers();
401     if (numLDMthreads == 0)
402     {
403       jam();
404       numLDMthreads = 1;
405     }
406     logfiles += ((numLDMthreads * 11) + 10);
407     if (noIdleFiles < logfiles)
408     {
409       jam();
410       noIdleFiles = logfiles;
411     }
412   }
413 
414   // Make sure at least "noIdleFiles" more files can be created
415   if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
416   {
417     const Uint32 newMax = theFiles.size() + noIdleFiles + 1;
418     g_eventLogger->info("Resetting MaxNoOfOpenFiles %u to %u",
419                         m_maxFiles, newMax);
420     m_maxFiles = newMax;
421   }
422 
423   // Create idle AsyncFiles
424   for (Uint32 i = 0; i < noIdleFiles; i++)
425   {
426     theIdleFiles.push_back(createAsyncFile());
427     AsyncIoThread * thr = createIoThread(/* bound */ true);
428     if (thr)
429     {
430       theThreads.push_back(thr);
431     }
432   }
433 
434   Uint32 threadpool = 2;
435   ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
436 
437   // Create IoThreads
438   for (Uint32 i = 0; i < threadpool; i++)
439   {
440     AsyncIoThread * thr = createIoThread(/* bound */ false);
441     if (thr)
442     {
443       jam();
444       theThreads.push_back(thr);
445     }
446     else
447     {
448       jam();
449       break;
450     }
451   }
452 
453   setup_wakeup();
454 
455   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
456   conf->senderRef = reference();
457   conf->senderData = senderData;
458   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
459 	     ReadConfigConf::SignalLength, JBB);
460 
461   // start scanning
462   signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
463   sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
464 }
465 
466 /* Received a restart signal.
467  * Answer it like any other block
468  * PR0  : StartCase
469  * DR0  : StartPhase
470  * DR1  : ?
471  * DR2  : ?
472  * DR3  : ?
473  * DR4  : ?
474  * DR5  : SignalKey
475  */
476 void
execSTTOR(Signal * signal)477 Ndbfs::execSTTOR(Signal* signal)
478 {
479   jamEntry();
480 
481   if(signal->theData[1] == 0){ // StartPhase 0
482     jam();
483 
484     if (ERROR_INSERTED(2000) || ERROR_INSERTED(2001))
485     {
486       // Save(2000) or restore(2001) FileSystemPath/ndb_XX_fs/
487       BaseString& fs_path = m_base_path[FsOpenReq::BP_FS];
488       unsigned i = fs_path.length() - strlen(DIR_SEPARATOR);
489       BaseString saved_path(fs_path.c_str(), i);
490       const char * ending_separator = fs_path.c_str() + i;
491       ndbrequire(strcmp(ending_separator, DIR_SEPARATOR)==0);
492       saved_path.append(".saved");
493       saved_path.append(ending_separator);
494       BaseString& from_dir = (ERROR_INSERTED(2000) ? fs_path : saved_path);
495       BaseString& to_dir = (ERROR_INSERTED(2000) ? saved_path : fs_path);
496 
497       const bool only_contents = true;
498       if (NdbDir::remove_recursive(to_dir.c_str(), !only_contents))
499       {
500         g_eventLogger->info("Cleaned destination file system at %s", to_dir.c_str());
501       }
502       else
503       {
504         g_eventLogger->warning("Failed cleaning file system at %s", to_dir.c_str());
505       }
506       if (access(to_dir.c_str(), F_OK) == 0 || errno != ENOENT)
507       {
508         g_eventLogger->error("Destination file system at %s should not be there (errno %d)!",
509                              to_dir.c_str(),
510                              errno);
511         ndbrequire(!"Destination file system already there during file system saving or restoring");
512       }
513       if (rename(from_dir.c_str(), to_dir.c_str()) == -1)
514       {
515         g_eventLogger->error("Failed renaming %s file system to %s while %s (errno %d)",
516           from_dir.c_str(),
517           to_dir.c_str(),
518           (ERROR_INSERTED(2000) ? "saving" : "restoring"),
519           errno);
520         ndbrequire(!"Failed renaming file system while saving ot restoring");
521       }
522       SET_ERROR_INSERT_VALUE2(ERROR_INSERT_EXTRA, 0);
523     }
524     do_mkdir(m_base_path[FsOpenReq::BP_FS].c_str());
525 
526     // close all open files
527     ndbrequire(theOpenFiles.size() == 0);
528 
529     signal->theData[3] = 255;
530     sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
531     return;
532   }
533   ndbabort();
534 }
535 
536 int
forward(AsyncFile * file,Request * request)537 Ndbfs::forward( AsyncFile * file, Request* request)
538 {
539   jam();
540   request->m_startTime = getHighResTimer();
541 
542   AsyncIoThread* thr = file->getThread();
543   if (thr) // bound
544   {
545     thr->dispatch(request);
546   }
547   else if (request->m_do_bind)
548   {
549     theToBoundThreads.writeChannel(request);
550   }
551   else
552   {
553     theToUnboundThreads.writeChannel(request);
554   }
555   return 1;
556 }
557 
558 void
execFSOPENREQ(Signal * signal)559 Ndbfs::execFSOPENREQ(Signal* signal)
560 {
561   jamEntry();
562   const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
563   const BlockReference userRef = fsOpenReq->userReference;
564 
565   bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
566   AsyncFile* file = getIdleFile(bound);
567   ndbrequire(file != NULL);
568 
569   Uint32 userPointer = fsOpenReq->userPointer;
570 
571   SectionHandle handle(this, signal);
572   SegmentedSectionPtr ptr; ptr.setNull();
573   if (handle.m_cnt)
574   {
575     jam();
576     handle.getSection(ptr, FsOpenReq::FILENAME);
577   }
578   file->theFileName.set(this, userRef, fsOpenReq->fileNumber, false, ptr);
579   releaseSections(handle);
580 
581   if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
582   {
583     jam();
584     Uint32 cnt = 16; // 512k
585     Ptr<GlobalPage> page_ptr;
586     m_ctx.m_mm.alloc_pages(RT_NDBFS_INIT_FILE_PAGE, &page_ptr.i, &cnt, 1);
587     if(cnt == 0)
588     {
589       file->m_page_ptr.setNull();
590       file->m_page_cnt = 0;
591 
592       FsRef * const fsRef = (FsRef *)&signal->theData[0];
593       fsRef->userPointer  = userPointer;
594       fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
595       fsRef->osErrorCode  = ~0; // Indicate local error
596       sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
597       return;
598     }
599     m_shared_page_pool.getPtr(page_ptr);
600     file->set_buffer(RT_NDBFS_INIT_FILE_PAGE, page_ptr, cnt);
601   }
602   else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
603   {
604     jam();
605     Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
606     Ptr<GlobalPage> page_ptr;
607     m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
608     if (cnt == 0)
609     {
610       jam();
611       file->m_page_ptr.setNull();
612       file->m_page_cnt = 0;
613 
614       FsRef * const fsRef = (FsRef *)&signal->theData[0];
615       fsRef->userPointer  = userPointer;
616       fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
617       fsRef->osErrorCode  = ~0; // Indicate local error
618       sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
619       return;
620     }
621     m_shared_page_pool.getPtr(page_ptr);
622     file->set_buffer(RT_FILE_BUFFER, page_ptr, cnt);
623   }
624   else
625   {
626     ndbassert(file->m_page_ptr.isNull());
627     file->m_page_ptr.setNull();
628     file->m_page_cnt = 0;
629   }
630 
631   if (getenv("NDB_TRACE_OPEN"))
632     ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
633 
634   Request* request = theRequestPool->get();
635   request->action = Request::open;
636   request->error = 0;
637   request->set(userRef, userPointer, newId() );
638   request->file = file;
639   request->theTrace = signal->getTrace();
640   request->par.open.flags = fsOpenReq->fileFlags;
641   request->par.open.page_size = fsOpenReq->page_size;
642   request->par.open.file_size = fsOpenReq->file_size_hi;
643   request->par.open.file_size <<= 32;
644   request->par.open.file_size |= fsOpenReq->file_size_lo;
645   request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
646   request->m_do_bind = bound;
647 
648   ndbrequire(forward(file, request));
649 }
650 
651 void
execFSREMOVEREQ(Signal * signal)652 Ndbfs::execFSREMOVEREQ(Signal* signal)
653 {
654   jamEntry();
655   const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
656   const BlockReference userRef = req->userReference;
657   bool bound = true;
658   AsyncFile* file = getIdleFile(bound);
659   ndbrequire(file != NULL);
660 
661   SectionHandle handle(this, signal);
662   SegmentedSectionPtr ptr; ptr.setNull();
663   if(handle.m_cnt)
664   {
665     jam();
666     handle.getSection(ptr, FsOpenReq::FILENAME);
667   }
668 
669   file->theFileName.set(this, userRef, req->fileNumber, req->directory, ptr);
670   releaseSections(handle);
671 
672   Uint32 version = FsOpenReq::getVersion(req->fileNumber);
673   Uint32 bp = FsOpenReq::v5_getLcpNo(req->fileNumber);
674 
675   Request* request = theRequestPool->get();
676   request->action = Request::rmrf;
677   request->par.rmrf.directory = req->directory;
678   request->par.rmrf.own_directory = req->ownDirectory;
679   request->error = 0;
680   request->set(userRef, req->userPointer, newId() );
681   request->file = file;
682   request->theTrace = signal->getTrace();
683   request->m_do_bind = bound;
684 
685   if (version == 6)
686   {
687     ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
688     if (strlen(m_base_path[bp].c_str()) == 0)
689     {
690       goto ignore;
691     }
692   }
693 
694   ndbrequire(forward(file, request));
695   return;
696 ignore:
697   report(request, signal);
698 }
699 
700 /*
701  * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
702  * remove file
703  */
704 void
execFSCLOSEREQ(Signal * signal)705 Ndbfs::execFSCLOSEREQ(Signal * signal)
706 {
707   jamEntry();
708   const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
709   const BlockReference userRef = fsCloseReq->userReference;
710   const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
711   const UintR userPointer = fsCloseReq->userPointer;
712 
713   AsyncFile* openFile = theOpenFiles.find(filePointer);
714   if (openFile == NULL) {
715     // The file was not open, send error back to sender
716     jam();
717     // Initialise FsRef signal
718     FsRef * const fsRef = (FsRef *)&signal->theData[0];
719     fsRef->userPointer  = userPointer;
720     fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
721     fsRef->osErrorCode  = ~0; // Indicate local error
722     sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
723 
724     g_eventLogger->warning("Trying to close unknown file!! %u", userPointer);
725     g_eventLogger->warning("Dumping files");
726     signal->theData[0] = 405;
727     execDUMP_STATE_ORD(signal);
728     return;
729   }
730 
731   if (getenv("NDB_TRACE_OPEN"))
732     ndbout_c("close(%s)", openFile->theFileName.c_str());
733 
734   Request *request = theRequestPool->get();
735   if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
736      jam();
737      request->action = Request::closeRemove;
738   } else {
739      jam();
740      request->action = Request::close;
741   }
742   request->set(userRef, fsCloseReq->userPointer, filePointer);
743   request->file = openFile;
744   request->error = 0;
745   request->theTrace = signal->getTrace();
746   request->m_do_bind = false;
747 
748   ndbrequire(forward(openFile, request));
749 }
750 
751 void
readWriteRequest(int action,Signal * signal)752 Ndbfs::readWriteRequest(int action, Signal * signal)
753 {
754   Uint32 theData[25 + 2 * NDB_FS_RW_PAGES];
755   memcpy(theData, signal->theData, 4 * signal->getLength());
756   SectionHandle handle(this, signal);
757   if (handle.m_cnt > 0)
758   {
759     jam();
760     SegmentedSectionPtr secPtr;
761     ndbrequire(handle.getSection(secPtr, 0));
762     ndbrequire(signal->getLength() + secPtr.sz < NDB_ARRAY_SIZE(theData));
763     copy(theData + signal->getLength(), secPtr);
764     releaseSections(handle);
765   }
766 
767   const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)theData;
768   Uint16 filePointer =  (Uint16)fsRWReq->filePointer;
769   const UintR userPointer = fsRWReq->userPointer;
770   const BlockReference userRef = fsRWReq->userReference;
771   const BlockNumber blockNumber = refToMain(userRef);
772   const Uint32 instanceNumber = refToInstance(userRef);
773 
774   AsyncFile* openFile = theOpenFiles.find(filePointer);
775 
776   const NewVARIABLE *myBaseAddrRef =
777     &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
778   UintPtr tPageSize;
779   UintPtr tClusterSize;
780   UintPtr tNRR;
781   UintPtr tPageOffset;
782   char*        tWA;
783   FsRef::NdbfsErrorCodeType errorCode;
784 
785   Request *request = theRequestPool->get();
786   request->error = 0;
787   request->set(userRef, userPointer, filePointer);
788   request->file = openFile;
789   request->action = (Request::Action) action;
790   request->theTrace = signal->getTrace();
791   request->m_do_bind = false;
792 
793   Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
794 
795   if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
796     jam();
797     errorCode = FsRef::fsErrInvalidParameters;
798     goto error;
799   }
800 
801   if(format != FsReadWriteReq::fsFormatGlobalPage &&
802      format != FsReadWriteReq::fsFormatSharedPage)
803   {
804     if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
805       jam();// Ensure that a valid variable is used
806       errorCode = FsRef::fsErrInvalidParameters;
807       goto error;
808     }
809     if (myBaseAddrRef == NULL) {
810       jam(); // Ensure that a valid variable is used
811       errorCode = FsRef::fsErrInvalidParameters;
812       goto error;
813     }
814     if (openFile == NULL) {
815       jam(); //file not open
816       errorCode = FsRef::fsErrFileDoesNotExist;
817       goto error;
818     }
819     tPageSize = pageSize(myBaseAddrRef);
820     tClusterSize = myBaseAddrRef->ClusterSize;
821     tNRR = myBaseAddrRef->nrr;
822     tWA = (char*)myBaseAddrRef->WA;
823 
824     switch (format) {
825 
826       // List of memory and file pages pairs
827     case FsReadWriteReq::fsFormatListOfPairs: {
828       jam();
829       for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
830 	jam();
831 	const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
832 	const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
833 	if (varIndex >= tNRR) {
834 	  jam();
835 	  errorCode = FsRef::fsErrInvalidParameters;
836 	  goto error;
837 	}//if
838 	request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
839 	request->par.readWrite.pages[i].size = tPageSize;
840 	request->par.readWrite.pages[i].offset = (off_t)(fileOffset*tPageSize);
841       }//for
842       request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
843       break;
844     }//case
845 
846       // Range of memory page with one file page
847     case FsReadWriteReq::fsFormatArrayOfPages: {
848       if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
849         jam();
850         errorCode = FsRef::fsErrInvalidParameters;
851         goto error;
852       }//if
853       const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
854       const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
855 
856       request->par.readWrite.pages[0].offset = (off_t)(fileOffset * tPageSize);
857       request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
858       request->par.readWrite.numberOfPages = 1;
859       request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
860       break;
861     }//case
862 
863       // List of memory pages followed by one file page
864     case FsReadWriteReq::fsFormatListOfMemPages: {
865 
866       tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
867       tPageOffset *= tPageSize;
868 
869       for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
870 	jam();
871 	UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
872 
873 	if (varIndex >= tNRR) {
874 	  jam();
875 	  errorCode = FsRef::fsErrInvalidParameters;
876 	  goto error;
877 	}//if
878 	request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
879 	request->par.readWrite.pages[i].size = tPageSize;
880 	request->par.readWrite.pages[i].offset = (off_t)
881           (tPageOffset + (i*tPageSize));
882       }//for
883       request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
884       break;
885       // make it a writev or readv
886     }//case
887 
888     case FsReadWriteReq::fsFormatMemAddress:
889     {
890       jam();
891       const Uint32 memoryOffset = fsRWReq->data.memoryAddress.memoryOffset;
892       const Uint32 fileOffset = fsRWReq->data.memoryAddress.fileOffset;
893       const Uint32 sz = fsRWReq->data.memoryAddress.size;
894 
895       request->par.readWrite.pages[0].buf = &tWA[memoryOffset];
896       request->par.readWrite.pages[0].size = sz;
897       request->par.readWrite.pages[0].offset = (off_t)(fileOffset);
898       request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
899       break;
900     }
901     default: {
902       jam();
903       errorCode = FsRef::fsErrInvalidParameters;
904       goto error;
905     }//default
906     }//switch
907   }
908   else if (format == FsReadWriteReq::fsFormatGlobalPage)
909   {
910     Ptr<GlobalPage> ptr;
911     m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
912     request->par.readWrite.pages[0].buf = (char*)ptr.p;
913     request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
914     request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
915     request->par.readWrite.numberOfPages = 1;
916   }
917   else
918   {
919     ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
920     Ptr<GlobalPage> ptr;
921     m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
922     request->par.readWrite.pages[0].buf = (char*)ptr.p;
923     request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
924     request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
925     request->par.readWrite.numberOfPages = 1;
926   }
927 
928   ndbrequire(forward(openFile, request));
929   return;
930 
931 error:
932   theRequestPool->put(request);
933   FsRef * const fsRef = (FsRef *)&signal->theData[0];
934   fsRef->userPointer = userPointer;
935   fsRef->setErrorCode(fsRef->errorCode, errorCode);
936   fsRef->osErrorCode = ~0; // Indicate local error
937   switch (action) {
938   case Request:: write:
939   case Request:: writeSync: {
940     jam();
941     sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
942     break;
943   }//case
944   case Request:: readPartial:
945   case Request:: read: {
946     jam();
947     sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
948   }//case
949   }//switch
950   return;
951 }
952 
953 /*
954     PR0: File Pointer , theData[0]
955     DR0: User reference, theData[1]
956     DR1: User Pointer,   etc.
957     DR2: Flag
958     DR3: Var number
959     DR4: amount of pages
960     DR5->: Memory Page id and File page id according to Flag
961 */
962 void
execFSWRITEREQ(Signal * signal)963 Ndbfs::execFSWRITEREQ(Signal* signal)
964 {
965   jamEntry();
966   const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
967 
968   if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
969     jam();
970     readWriteRequest( Request::writeSync, signal );
971   } else {
972     jam();
973     readWriteRequest( Request::write, signal );
974   }
975 }
976 
977 /*
978     PR0: File Pointer
979     DR0: User reference
980     DR1: User Pointer
981     DR2: Flag
982     DR3: Var number
983     DR4: amount of pages
984     DR5->: Memory Page id and File page id according to Flag
985 */
986 void
execFSREADREQ(Signal * signal)987 Ndbfs::execFSREADREQ(Signal* signal)
988 {
989   jamEntry();
990   FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
991   if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
992   {
993     jam();
994     readWriteRequest( Request::readPartial, signal );
995   }
996   else
997   {
998     jam();
999     readWriteRequest( Request::read, signal );
1000   }
1001 }
1002 
1003 /*
1004  * PR0: File Pointer DR0: User reference DR1: User Pointer
1005  */
1006 void
execFSSYNCREQ(Signal * signal)1007 Ndbfs::execFSSYNCREQ(Signal * signal)
1008 {
1009   jamEntry();
1010   Uint16 filePointer =  (Uint16)signal->theData[0];
1011   BlockReference userRef = signal->theData[1];
1012   const UintR userPointer = signal->theData[2];
1013   AsyncFile* openFile = theOpenFiles.find(filePointer);
1014 
1015   if (openFile == NULL) {
1016      jam(); //file not open
1017      FsRef * const fsRef = (FsRef *)&signal->theData[0];
1018      fsRef->userPointer = userPointer;
1019      fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
1020      fsRef->osErrorCode = ~0; // Indicate local error
1021      sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
1022      return;
1023   }
1024 
1025   Request *request = theRequestPool->get();
1026   request->error = 0;
1027   request->action = Request::sync;
1028   request->set(userRef, userPointer, filePointer);
1029   request->file = openFile;
1030   request->theTrace = signal->getTrace();
1031   request->m_do_bind = false;
1032 
1033   ndbrequire(forward(openFile,request));
1034 }
1035 
/*
 * theData[0]: File Pointer, theData[1]: suspend time in milliseconds.
 * The handler below uses 0 for both user reference and user pointer.
 */
1039 void
execFSSUSPENDORD(Signal * signal)1040 Ndbfs::execFSSUSPENDORD(Signal * signal)
1041 {
1042   jamEntry();
1043   Uint16 filePointer =  (Uint16)signal->theData[0];
1044   Uint32 millis = signal->theData[1];
1045   AsyncFile* openFile = theOpenFiles.find(filePointer);
1046 
1047   if (openFile == NULL)
1048   {
1049     jam(); //file not open
1050     return;
1051   }
1052 
1053   Request *request = theRequestPool->get();
1054   request->error = 0;
1055   request->action = Request::suspend;
1056   request->set(0, 0, filePointer);
1057   request->file = openFile;
1058   request->theTrace = signal->getTrace();
1059   request->par.suspend.milliseconds = millis;
1060   request->m_do_bind = false;
1061 
1062   ndbrequire(forward(openFile,request));
1063 }
1064 
1065 void
execFSAPPENDREQ(Signal * signal)1066 Ndbfs::execFSAPPENDREQ(Signal * signal)
1067 {
1068   const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
1069   const Uint16 filePointer =  (Uint16)fsReq->filePointer;
1070   const UintR userPointer = fsReq->userPointer;
1071   const BlockReference userRef = fsReq->userReference;
1072   const BlockNumber blockNumber = refToMain(userRef);
1073   const Uint32 instanceNumber = refToInstance(userRef);
1074 
1075   FsRef::NdbfsErrorCodeType errorCode;
1076 
1077   AsyncFile* openFile = theOpenFiles.find(filePointer);
1078   const NewVARIABLE *myBaseAddrRef =
1079     &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
1080 
1081   const Uint32* tWA   = (const Uint32*)myBaseAddrRef->WA;
1082   const Uint32  tSz   = myBaseAddrRef->nrr;
1083   const Uint32 offset = fsReq->offset;
1084   const Uint32 size   = fsReq->size;
1085   const Uint32 synch_flag = fsReq->synch_flag;
1086   Request *request = theRequestPool->get();
1087 
1088   if (openFile == NULL) {
1089     jam();
1090     errorCode = FsRef::fsErrFileDoesNotExist;
1091     goto error;
1092   }
1093 
1094   if (myBaseAddrRef == NULL) {
1095     jam(); // Ensure that a valid variable is used
1096     errorCode = FsRef::fsErrInvalidParameters;
1097     goto error;
1098   }
1099 
1100   if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
1101     jam();// Ensure that a valid variable is used
1102     errorCode = FsRef::fsErrInvalidParameters;
1103     goto error;
1104   }
1105 
1106   if(offset + size > tSz){
1107     jam(); // Ensure that a valid variable is used
1108     errorCode = FsRef::fsErrInvalidParameters;
1109     goto error;
1110   }
1111 
1112   request->error = 0;
1113   request->set(userRef, userPointer, filePointer);
1114   request->file = openFile;
1115   request->theTrace = signal->getTrace();
1116 
1117   request->par.append.buf = (const char *)(tWA + offset);
1118   request->par.append.size = size << 2;
1119 
1120   if (!synch_flag)
1121     request->action = Request::append;
1122   else
1123     request->action = Request::append_synch;
1124   request->m_do_bind = false;
1125   ndbrequire(forward(openFile, request));
1126   return;
1127 
1128 error:
1129   jam();
1130   theRequestPool->put(request);
1131   FsRef * const fsRef = (FsRef *)&signal->theData[0];
1132   fsRef->userPointer = userPointer;
1133   fsRef->setErrorCode(fsRef->errorCode, errorCode);
1134   fsRef->osErrorCode = ~0; // Indicate local error
1135 
1136   jam();
1137   sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
1138   return;
1139 }
1140 
1141 void
execALLOC_MEM_REQ(Signal * signal)1142 Ndbfs::execALLOC_MEM_REQ(Signal* signal)
1143 {
1144   jamEntry();
1145 
1146   AllocMemReq* req = (AllocMemReq*)signal->getDataPtr();
1147 
1148   bool bound = true;
1149   AsyncFile* file = getIdleFile(bound);
1150   ndbrequire(file != NULL);
1151 
1152   Request *request = theRequestPool->get();
1153 
1154   request->error = 0;
1155   request->set(req->senderRef, req->senderData, 0);
1156   request->file = file;
1157   request->theTrace = signal->getTrace();
1158 
1159   request->par.alloc.ctx = &m_ctx;
1160   request->par.alloc.requestInfo = req->requestInfo;
1161   request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
1162   request->action = Request::allocmem;
1163   request->m_do_bind = bound;
1164   ndbrequire(forward(file, request));
1165 }
1166 
1167 void
execBUILD_INDX_IMPL_REQ(Signal * signal)1168 Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* signal)
1169 {
1170   jamEntry();
1171   mt_BuildIndxReq * req = (mt_BuildIndxReq*)signal->getDataPtr();
1172 
1173   bool bound = true;
1174   AsyncFile* file = getIdleFile(bound);
1175   ndbrequire(file != NULL);
1176 
1177   Request *request = theRequestPool->get();
1178   request->error = 0;
1179   request->set(req->senderRef, req->senderData, 0);
1180   request->file = file;
1181   request->theTrace = signal->getTrace();
1182 
1183   Uint32 cnt = (req->buffer_size + 32768 - 1) / 32768;
1184   Uint32 save = cnt;
1185   Ptr<GlobalPage> page_ptr;
1186   m_ctx.m_mm.alloc_pages(RT_NDBFS_BUILD_INDEX_PAGE, &page_ptr.i, &cnt, cnt);
1187   if(cnt == 0)
1188   {
1189     file->m_page_ptr.setNull();
1190     file->m_page_cnt = 0;
1191 
1192     ndbabort(); // TODO
1193     return;
1194   }
1195 
1196   ndbrequire(cnt == save);
1197 
1198   m_shared_page_pool.getPtr(page_ptr);
1199   file->set_buffer(RT_NDBFS_BUILD_INDEX_PAGE, page_ptr, cnt);
1200 
1201   memcpy(&request->par.build.m_req, req, sizeof(* req));
1202   request->action = Request::buildindx;
1203   request->m_do_bind = bound;
1204   ndbrequire(forward(file, request));
1205 }
1206 
1207 Uint16
newId()1208 Ndbfs::newId()
1209 {
1210   // finds a new key, eg a new filepointer
1211   for (int i = 1; i < SHRT_MAX; i++)
1212   {
1213     if (theLastId == SHRT_MAX) {
1214       jam();
1215       theLastId = 1;
1216     } else {
1217       jam();
1218       theLastId++;
1219     }
1220 
1221     if(theOpenFiles.find(theLastId) == NULL) {
1222       jam();
1223       return theLastId;
1224     }
1225   }
1226   ndbrequire(1 == 0);
1227   // The program will not reach this point
1228   return 0;
1229 }
1230 
1231 AsyncFile*
createAsyncFile()1232 Ndbfs::createAsyncFile()
1233 {
1234   // Check limit of open files
1235   if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles)
1236   {
1237     // Print info about all open files
1238     for (unsigned i = 0; i < theFiles.size(); i++){
1239       AsyncFile* file = theFiles[i];
1240       ndbout_c("%2d (0x%lx): %s",
1241                i,
1242                (long) file,
1243                file->isOpen() ?"OPEN" : "CLOSED");
1244     }
1245     ndbout_c("m_maxFiles: %u, theFiles.size() = %u",
1246               m_maxFiles, theFiles.size());
1247     ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile: creating more than MaxNoOfOpenFiles");
1248   }
1249 
1250 #ifdef _WIN32
1251   AsyncFile* file = new Win32AsyncFile(* this);
1252 #else
1253   AsyncFile* file = new PosixAsyncFile(* this);
1254 #endif
1255   int err = file->init();
1256   if (err == -1)
1257   {
1258     ERROR_SET(fatal, NDBD_EXIT_AFS_ZLIB_INIT_FAIL, "", " Ndbfs::createAsyncFile: Zlib init failure");
1259   }
1260   else if(err)
1261   {
1262     ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
1263   }
1264 
1265   theFiles.push_back(file);
1266   return file;
1267 }
1268 
1269 void
pushIdleFile(AsyncFile * file)1270 Ndbfs::pushIdleFile(AsyncFile* file)
1271 {
1272   assert(file->getThread() == 0);
1273   if (file->thread_bound())
1274   {
1275     m_active_bound_threads_cnt--;
1276     file->set_thread_bound(false);
1277   }
1278   theIdleFiles.push_back(file);
1279 }
1280 
1281 AsyncIoThread*
createIoThread(bool bound)1282 Ndbfs::createIoThread(bool bound)
1283 {
1284   AsyncIoThread* thr = new AsyncIoThread(*this, bound);
1285   if (thr)
1286   {
1287 #ifdef VM_TRACE
1288     ndbout_c("NDBFS: Created new file thread %d", theThreads.size());
1289 #endif
1290 
1291     struct NdbThread* thrptr = thr->doStart();
1292     globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
1293     thr->set_real_time(
1294       globalEmulatorData.theConfiguration->get_io_real_time());
1295 
1296     if (bound)
1297       m_bound_threads_cnt++;
1298     else
1299       m_unbounds_threads_cnt++;
1300   }
1301 
1302   return thr;
1303 }
1304 
1305 AsyncFile*
getIdleFile(bool bound)1306 Ndbfs::getIdleFile(bool bound)
1307 {
1308   AsyncFile* file = 0;
1309   Uint32 sz = theIdleFiles.size();
1310   if (sz)
1311   {
1312     file = theIdleFiles[sz - 1];
1313     theIdleFiles.erase(sz - 1);
1314   }
1315   else
1316   {
1317     file = createAsyncFile();
1318   }
1319 
1320   if (bound)
1321   {
1322     /**
1323      * Check if we should create thread
1324      */
1325     if (m_active_bound_threads_cnt == m_bound_threads_cnt)
1326     {
1327       AsyncIoThread * thr = createIoThread(true);
1328       if (thr)
1329       {
1330         theThreads.push_back(thr);
1331       }
1332     }
1333 
1334     file->set_thread_bound(true);
1335     m_active_bound_threads_cnt++;
1336   }
1337   return file;
1338 }
1339 
/**
 * Report the outcome of a finished request to the block that issued it.
 *
 * Depending on request->error either a REF (error) or a CONF (success)
 * signal matching request->action is sent to request->theUserReference.
 * Besides sending the reply this function also
 *  - releases the file's page buffer for failed opens, closes and
 *    index builds,
 *  - maintains theOpenFiles (insert on successful open, erase on
 *    successful close),
 *  - returns the file object to the idle list where the request is the
 *    last one for that file object (see the individual cases).
 *
 * The request itself is NOT returned to the request pool here.
 *
 * @param request  the completed request
 * @param signal   signal object used as send buffer; its trace id is
 *                 temporarily replaced by the one stored in the request
 */
void
Ndbfs::report(Request * request, Signal* signal)
{
  // Send the reply under the trace id saved in the request; the signal's
  // own trace id is restored at the end.
  const Uint32 orgTrace = signal->getTrace();
  signal->setTrace(request->theTrace);
  const BlockReference ref = request->theUserReference;

  // Hand the file's page buffer back to the memory manager when the file
  // will not be used any further: failed open, any close, index build.
  if (request->file->has_buffer())
  {
    if ((request->action == Request::open && request->error) ||
        request->action == Request::close ||
        request->action == Request::closeRemove ||
        request->action == Request::buildindx)
    {
      Uint32 rg;
      Uint32 cnt;
      Ptr<GlobalPage> ptr;
      request->file->clear_buffer(rg, ptr, cnt);
      m_ctx.m_mm.release_pages(rg, ptr.i, cnt);
    }
  }

  if (request->error) {
    jam();
    // Initialise FsRef signal
    FsRef * const fsRef = (FsRef *)&signal->theData[0];
    fsRef->userPointer = request->theUserPointer;
    if(request->error & FsRef::FS_ERR_BIT)
    {
      // The error already carries FS_ERR_BIT: pass it on unchanged, with
      // no OS error code.
      fsRef->errorCode = request->error;
      fsRef->osErrorCode = 0;
    }
    else
    {
      // Otherwise the error is run through translateErrno() and the raw
      // value is reported as osErrorCode.
      fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
      fsRef->osErrorCode = request->error;
    }
    switch (request->action) {
    case Request:: open: {
      jam();
      // Put the file back in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: closeRemove:
    case Request:: close: {
      jam();
      sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);

      // A failed close is logged, followed by a dump of all files. The
      // dump reuses the signal buffer, which is why it comes after the
      // REF has been sent and after fsRef has been read for the log line.
      g_eventLogger->warning("Error closing file: %s %u/%u",
                             request->file->theFileName.c_str(),
                             fsRef->errorCode,
                             fsRef->osErrorCode);
      g_eventLogger->warning("Dumping files");
      signal->theData[0] = 405;
      execDUMP_STATE_ORD(signal);
      break;
    }
    case Request:: writeSync:
    case Request:: writevSync:
    case Request:: write:
    case Request:: writev: {
      jam();
      sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: read:
    case Request:: readPartial:
    case Request:: readv: {
      jam();
      sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: sync: {
      jam();
      sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request::append:
    case Request::append_synch:
    {
      jam();
      sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request::rmrf: {
      jam();
      // Put the file back in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
      break;
    }

    case Request:: end: {
    case Request:: suspend:
      // Report nothing
      break;
    }
    case Request::allocmem: {
      jam();
      // This reply has its own layout (AllocMemRef) instead of FsRef.
      AllocMemRef* rep = (AllocMemRef*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      rep->errorCode = request->error;
      sendSignal(ref, GSN_ALLOC_MEM_REF, signal,
                 AllocMemRef::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    case Request::buildindx: {
      jam();
      // This reply has its own layout (BuildIndxImplRef) instead of FsRef.
      BuildIndxImplRef* rep = (BuildIndxImplRef*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      rep->errorCode = (BuildIndxImplRef::ErrorCode)request->error;
      sendSignal(ref, GSN_BUILD_INDX_IMPL_REF, signal,
                 BuildIndxImplRef::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    }//switch
  } else {
    jam();
    // Success: build the CONF in the signal buffer.
    FsConf * const fsConf = (FsConf *)&signal->theData[0];
    fsConf->userPointer = request->theUserPointer;
    switch (request->action) {
    case Request:: open: {
      jam();
      // Register the file under its file pointer so that later requests
      // can find it.
      theOpenFiles.insert(request->file, request->theFilePointer);

      // Keep track on max number of opened files
      if (theOpenFiles.size() > m_maxOpenedFiles)
	m_maxOpenedFiles = theOpenFiles.size();

      fsConf->filePointer = request->theFilePointer;
      fsConf->fileInfo = request->m_fileinfo;
      fsConf->file_size_hi = request->m_file_size_hi;
      fsConf->file_size_lo = request->m_file_size_lo;
      sendSignal(ref, GSN_FSOPENCONF, signal, 5, JBA);
      break;
    }
    case Request:: closeRemove:
    case Request:: close: {
      jam();
      // removes the file from OpenFiles list
      theOpenFiles.erase(request->theFilePointer);
      // Put the file in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBA);
      break;
    }
    case Request:: writeSync:
    case Request:: writevSync:
    case Request:: write:
    case Request:: writev: {
      jam();
      sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBA);
      break;
    }
    case Request:: read:
    case Request:: readv: {
      jam();
      sendSignal(ref, GSN_FSREADCONF, signal, 1, JBA);
      break;
    }
    case Request:: readPartial: {
      jam();
      // A partial read additionally reports the size recorded for the
      // first page entry as bytes_read (hence signal length 2).
      fsConf->bytes_read = Uint32(request->par.readWrite.pages[0].size);
      sendSignal(ref, GSN_FSREADCONF, signal, 2, JBA);
      break;
    }
    case Request:: sync: {
      jam();
      sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBA);
      break;
    }//case
    case Request::append:
    case Request::append_synch:
    {
      jam();
      // Second word of the CONF carries the append size from the request.
      signal->theData[1] = Uint32(request->par.append.size);
      sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBA);
      break;
    }
    case Request::rmrf: {
      jam();
      // Put the file in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBA);
      break;
    }
    case Request:: end: {
    case Request:: suspend:
      // Report nothing
      break;
    }
    case Request::allocmem: {
      jam();
      // The byte count stored in the request is reported back split into
      // two 32-bit halves.
      AllocMemConf* conf = (AllocMemConf*)signal->getDataPtrSend();
      conf->senderRef = reference();
      conf->senderData = request->theUserPointer;
      conf->bytes_hi = Uint32(request->par.alloc.bytes >> 32);
      conf->bytes_lo = Uint32(request->par.alloc.bytes);
      sendSignal(ref, GSN_ALLOC_MEM_CONF, signal,
                 AllocMemConf::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    case Request::buildindx: {
      jam();
      BuildIndxImplConf* rep = (BuildIndxImplConf*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      sendSignal(ref, GSN_BUILD_INDX_IMPL_CONF, signal,
                 BuildIndxImplConf::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    }
  }//if
  signal->setTrace(orgTrace);
}
1563 
1564 
1565 bool
scanIPC(Signal * signal)1566 Ndbfs::scanIPC(Signal* signal)
1567 {
1568    Request* request = theFromThreads.tryReadChannel();
1569    jamDebug();
1570    if (request) {
1571       jam();
1572       report(request, signal);
1573       theRequestPool->put(request);
1574       return true;
1575    }
1576    return false;
1577 }
1578 
1579 #ifdef _WIN32
translateErrno(int aErrno)1580 Uint32 Ndbfs::translateErrno(int aErrno)
1581 {
1582   switch (aErrno)
1583     {
1584       //permission denied
1585     case ERROR_ACCESS_DENIED:
1586 
1587       return FsRef::fsErrPermissionDenied;
1588       //temporary not accessible
1589     case ERROR_PATH_BUSY:
1590     case ERROR_NO_MORE_SEARCH_HANDLES:
1591 
1592       return FsRef::fsErrTemporaryNotAccessible;
1593       //no space left on device
1594     case ERROR_HANDLE_DISK_FULL:
1595     case ERROR_DISK_FULL:
1596 
1597       return FsRef::fsErrNoSpaceLeftOnDevice;
1598       //none valid parameters
1599     case ERROR_INVALID_HANDLE:
1600     case ERROR_INVALID_DRIVE:
1601     case ERROR_INVALID_ACCESS:
1602     case ERROR_HANDLE_EOF:
1603     case ERROR_BUFFER_OVERFLOW:
1604 
1605       return FsRef::fsErrInvalidParameters;
1606       //environment error
1607     case ERROR_CRC:
1608     case ERROR_ARENA_TRASHED:
1609     case ERROR_BAD_ENVIRONMENT:
1610     case ERROR_INVALID_BLOCK:
1611     case ERROR_WRITE_FAULT:
1612     case ERROR_READ_FAULT:
1613     case ERROR_OPEN_FAILED:
1614 
1615       return FsRef::fsErrEnvironmentError;
1616 
1617       //no more process resources
1618     case ERROR_TOO_MANY_OPEN_FILES:
1619     case ERROR_NOT_ENOUGH_MEMORY:
1620     case ERROR_OUTOFMEMORY:
1621       return FsRef::fsErrNoMoreResources;
1622       //no file
1623     case ERROR_FILE_NOT_FOUND:
1624       return FsRef::fsErrFileDoesNotExist;
1625 
1626     case ERR_ReadUnderflow:
1627       return FsRef::fsErrReadUnderflow;
1628 
1629     default:
1630       return FsRef::fsErrUnknown;
1631     }
1632 }
1633 #else
translateErrno(int aErrno)1634 Uint32 Ndbfs::translateErrno(int aErrno)
1635 {
1636   switch (aErrno)
1637     {
1638       //permission denied
1639     case EACCES:
1640     case EROFS:
1641     case ENXIO:
1642       return FsRef::fsErrPermissionDenied;
1643       //temporary not accessible
1644     case EAGAIN:
1645     case ETIMEDOUT:
1646     case ENOLCK:
1647     case EINTR:
1648     case EIO:
1649       return FsRef::fsErrTemporaryNotAccessible;
1650       //no space left on device
1651     case ENFILE:
1652     case EDQUOT:
1653 #ifdef ENOSR
1654     case ENOSR:
1655 #endif
1656     case ENOSPC:
1657     case EFBIG:
1658       return FsRef::fsErrNoSpaceLeftOnDevice;
1659       //none valid parameters
1660     case EINVAL:
1661     case EBADF:
1662     case ENAMETOOLONG:
1663     case EFAULT:
1664     case EISDIR:
1665     case ENOTDIR:
1666     case EEXIST:
1667     case ETXTBSY:
1668       return FsRef::fsErrInvalidParameters;
1669       //environment error
1670     case ELOOP:
1671 #ifdef ENOLINK
1672     case ENOLINK:
1673 #endif
1674 #ifdef EMULTIHOP
1675     case EMULTIHOP:
1676 #endif
1677 #ifdef EOPNOTSUPP
1678     case EOPNOTSUPP:
1679 #endif
1680 #ifdef ESPIPE
1681     case ESPIPE:
1682 #endif
1683     case EPIPE:
1684       return FsRef::fsErrEnvironmentError;
1685 
1686       //no more process resources
1687     case EMFILE:
1688     case ENOMEM:
1689       return FsRef::fsErrNoMoreResources;
1690       //no file
1691     case ENOENT:
1692       return FsRef::fsErrFileDoesNotExist;
1693 
1694     case ERR_ReadUnderflow:
1695       return FsRef::fsErrReadUnderflow;
1696 
1697     default:
1698       return FsRef::fsErrUnknown;
1699     }
1700 }
1701 #endif
1702 
1703 
1704 
1705 void
execCONTINUEB(Signal * signal)1706 Ndbfs::execCONTINUEB(Signal* signal)
1707 {
1708   jamEntry();
1709   if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
1710     jam();
1711 
1712     // Also send CONTINUEB to ourself in order to scan for
1713     // incoming answers from AsyncFile on MemoryChannel theFromThreads
1714     signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1715     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
1716     if (scanningInProgress == true) {
1717       jam();
1718       return;
1719     }
1720   }
1721   if (scanIPC(signal))
1722   {
1723     jam();
1724     scanningInProgress = true;
1725     signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1726     sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1727   }
1728   else
1729   {
1730     jam();
1731     scanningInProgress = false;
1732   }
1733   return;
1734 }
1735 
1736 void
execSEND_PACKED(Signal * signal)1737 Ndbfs::execSEND_PACKED(Signal* signal)
1738 {
1739   jamEntryDebug();
1740   if (scanIPC(signal))
1741   {
1742     if (scanningInProgress == false)
1743     {
1744       jam();
1745       scanningInProgress = true;
1746       signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1747       sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1748     }
1749     signal->theData[0] = 1;
1750     return;
1751   }
1752   if (scanningInProgress == false)
1753     signal->theData[0] = 0;
1754   else
1755     signal->theData[0] = 1;
1756 }
1757 
1758 void
execDUMP_STATE_ORD(Signal * signal)1759 Ndbfs::execDUMP_STATE_ORD(Signal* signal)
1760 {
1761   if(signal->theData[0] == 19){
1762     return;
1763   }
1764   if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1765     infoEvent("NDBFS: Files: %d Open files: %d",
1766 	      theFiles.size(),
1767 	      theOpenFiles.size());
1768     infoEvent(" Idle files: %u Max opened files: %d",
1769               theIdleFiles.size(),
1770               m_maxOpenedFiles);
1771     infoEvent(" Bound Threads: %u (active %u) Unbound threads: %u",
1772               m_bound_threads_cnt,
1773               m_active_bound_threads_cnt,
1774               m_unbounds_threads_cnt);
1775     infoEvent(" Max files: %d",
1776 	      m_maxFiles);
1777     infoEvent(" Requests: %d",
1778 	      theRequestPool->size());
1779 
1780     return;
1781   }
1782   if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1783     infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
1784 
1785     for (unsigned i = 0; i < theOpenFiles.size(); i++){
1786       AsyncFile* file = theOpenFiles.getFile(i);
1787       infoEvent("%2d (0x%lx): %s thr: %lx", i,
1788                 (long)file,
1789                 file->theFileName.c_str(),
1790                 (long)file->getThread());
1791     }
1792     return;
1793   }
1794   if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1795     infoEvent("NDBFS: Dump all files: %d", theFiles.size());
1796 
1797     for (unsigned i = 0; i < theFiles.size(); i++){
1798       AsyncFile* file = theFiles[i];
1799       infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1800     }
1801     return;
1802   }
1803   if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1804     infoEvent("NDBFS: Dump idle files: %u",
1805               theIdleFiles.size());
1806 
1807     for (unsigned i = 0; i < theIdleFiles.size(); i++){
1808       AsyncFile* file = theIdleFiles[i];
1809       infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1810     }
1811 
1812     return;
1813   }
1814   if(signal->theData[0] == DumpStateOrd::NdbfsDumpRequests)
1815   {
1816     g_eventLogger->info("NDBFS: Dump requests: %u",
1817                         theRequestPool->inuse());
1818     for (unsigned ridx=0; ridx < theRequestPool->inuse(); ridx++)
1819     {
1820       const Request* req = theRequestPool->peekInuseItem(ridx);
1821       Uint64 duration = 0;
1822 
1823       if (NdbTick_IsValid(req->m_startTime))
1824       {
1825         duration = NdbTick_Elapsed(req->m_startTime,
1826                                    getHighResTimer()).microSec();
1827       }
1828 
1829       g_eventLogger->info("Request %u action %u %s userRef 0x%x "
1830                           "userPtr %u filePtr %u bind %u "
1831                           "duration(us) %llu filename %s",
1832                           ridx,
1833                           req->action,
1834                           Request::actionName(req->action),
1835                           req->theUserReference,
1836                           req->theUserPointer,
1837                           req->theFilePointer,
1838                           req->m_do_bind,
1839                           duration,
1840                           (req->file?
1841                            req->file->theFileName.c_str():
1842                            "NO FILE"));
1843     }
1844     return;
1845   }
1846 
1847 
1848   if(signal->theData[0] == 404)
1849   {
1850 #if 0
1851     ndbrequire(signal->getLength() == 2);
1852     Uint32 file= signal->theData[1];
1853     AsyncFile* openFile = theOpenFiles.find(file);
1854     ndbrequire(openFile != 0);
1855     ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
1856     Request* curr = openFile->m_current_request;
1857     Request* last = openFile->m_last_request;
1858     if(curr)
1859       ndbout << "Current request: " << *curr << endl;
1860     if(last)
1861        ndbout << "Last request: " << *last << endl;
1862 
1863     ndbout << "theReportTo " << *openFile->theReportTo << endl;
1864     ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1865 
1866     ndbout << "All files: " << endl;
1867     for (unsigned i = 0; i < theFiles.size(); i++){
1868       AsyncFile* file = theFiles[i];
1869       ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1870     }
1871 #endif
1872   }
1873 
1874   if(signal->theData[0] == 405)
1875   {
1876     for (unsigned i = 0; i < theFiles.size(); i++)
1877     {
1878       AsyncFile* file = theFiles[i];
1879       if (file == 0)
1880         continue;
1881       ndbout_c("%u : %s %s fileInfo=%08x", i,
1882                file->theFileName.c_str() ? file->theFileName.c_str() : "",
1883                file->isOpen() ? "OPEN" : "CLOSED",
1884                file->get_fileinfo());
1885     }
1886   }
1887 }//Ndbfs::execDUMP_STATE_ORD()
1888 
1889 const char*
get_filename(Uint32 fd) const1890 Ndbfs::get_filename(Uint32 fd) const
1891 {
1892   jamNoBlock();
1893   const AsyncFile* openFile = theOpenFiles.find(fd);
1894   if(openFile)
1895     return openFile->theFileName.get_base_name();
1896   return "";
1897 }
1898 
1899 
// NOTE(review): BLOCK_FUNCTIONS is defined outside this file; presumably it
// emits the per-block boilerplate for Ndbfs - confirm against its definition.
BLOCK_FUNCTIONS(Ndbfs)

// Explicit instantiations of the container and channel templates used by
// this block, so that their code is generated in this translation unit.
template class Vector<AsyncFile*>;
template class Vector<AsyncIoThread*>;
template class Vector<OpenFiles::OpenFileItem>;
template class MemoryChannel<Request>;
template class Pool<Request>;
// Explicit instantiation of the stream output operator for the channel.
template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
1908