1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include "Ndbfs.hpp"
28 #include "AsyncFile.hpp"
29
30 #ifdef _WIN32
31 #include "Win32AsyncFile.hpp"
32 #else
33 #include "PosixAsyncFile.hpp"
34 #endif
35
36 #include <signaldata/FsOpenReq.hpp>
37 #include <signaldata/FsCloseReq.hpp>
38 #include <signaldata/FsReadWriteReq.hpp>
39 #include <signaldata/FsAppendReq.hpp>
40 #include <signaldata/FsRemoveReq.hpp>
41 #include <signaldata/FsConf.hpp>
42 #include <signaldata/FsRef.hpp>
43 #include <signaldata/NdbfsContinueB.hpp>
44 #include <signaldata/DumpStateOrd.hpp>
45 #include <signaldata/AllocMem.hpp>
46 #include <signaldata/BuildIndxImpl.hpp>
47
48 #include <RefConvert.hpp>
49 #include <portlib/NdbDir.hpp>
50 #include <NdbOut.hpp>
51 #include <Configuration.hpp>
52
53 #include <EventLogger.hpp>
54
55 #define JAM_FILE_ID 393
56
57 extern EventLogger * g_eventLogger;
58 /**
59 * NDBFS has two types of async IO file threads : Bound and non-bound.
60 * These threads are kept in two distinct idle pools.
61 * Requests to be executed by any thread in an idle pool are queued onto
62 * a shared queue, which all of the idle threads in the pool attempt to
63 * dequeue work from. Work items are processed, which may take some
64 * time, and then once the outcome is known, a response is queued on a
65 * reply queue, which the NDBFS signal execution thread polls
66 * periodically.
67 *
68 * Bound IO threads have the ability to remove themselves from the idle
69 * pool. This happens as part of processing an OPEN request, where the
70 * thread is 'attached' to a particular file.
71 * As part of being 'attached' to a file, the thread no longer attempts
72 * to dequeue work from the shared queue, but rather starts dequeuing
73 * work just from a private queue associated with the file.
74 *
75 * This removes the thread from general use and dedicates it to servicing
76 * requests on the attached file until a CLOSE request arrives, which
77 * will cause the thread to be detached from the file and return to the
78 * idle Bound threads pool, where it will attempt to dequeue work from
79 * the Shared queue again.
80 *
81 * Non-bound IO threads are created at startup, they are not associated
82 * with a particular file and process one request at a time to
83 * completion. They always dequeue work from the non-bound shared queue.
84
85 * Some request types use Bound IO threads in a non-bound way, where a
86 * single request is processed to completion by a single thread, which
87 * then continues to dequeue work from the shared bound
88 * queue. Examples: build index, allocate memory, remove file.
89 * In these cases, the bound IO thread pool is being used as it
90 * effectively offers a concurrent thread for each concurrent request,
91 * and these use cases exist to get thread concurrency.
92 *
93 * Pool sizing
94 *
95 * The non-bound thread pool size is set by the DiskIoThreadPool config
96 * variable at node start, and does not change after.
97 *
98 * The bound thread pool size is set by the InitialNoOfOpenFiles
99 * config variable at node start and can grow dynamically afterwards.
 * There is currently no mechanism for releasing IO threads.
 * The size of the bound pool is bounded by MaxNoOfOpenFiles.
102 *
103 * Bound thread pool growth
104 *
105 * When receiving a request which requires the use of a Bound thread pool
106 * thread, the NDBFS block checks whether there are sufficient threads
107 * to ensure a quick execution of the request. If there are not then
108 * it creates an extra thread prior to enqueuing the request on the
109 * shared bound thread pool queue.
110 *
111 *
112 * The Bound IO thread pool exists to supply enough thread concurrency to
113 * match the concurrency of requests submitted to it. Assumed goals are :
114 * 1) Avoid excessive thread creation
115 * since each thread has a memory and resource cost and
116 * currently they are never released until the process exits.
117 * 2) Avoid bound requests sitting on the shared bound queue for any
118 * significant amount of time.
119 */
120
121 inline
pageSize(const NewVARIABLE * baseAddrRef)122 int pageSize( const NewVARIABLE* baseAddrRef )
123 {
124 int log_psize;
125 int log_qsize = baseAddrRef->bits.q;
126 int log_vsize = baseAddrRef->bits.v;
127 if (log_vsize < 3)
128 log_vsize = 3;
129 log_psize = log_qsize + log_vsize - 3;
130 return (1 << log_psize);
131 }
132
Ndbfs(Block_context & ctx)133 Ndbfs::Ndbfs(Block_context& ctx) :
134 SimulatedBlock(NDBFS, ctx),
135 scanningInProgress(false),
136 theLastId(0),
137 theRequestPool(0),
138 m_maxOpenedFiles(0),
139 m_bound_threads_cnt(0),
140 m_unbounds_threads_cnt(0),
141 m_active_bound_threads_cnt(0)
142 {
143 BLOCK_CONSTRUCTOR(Ndbfs);
144
145 // Set received signals
146 addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
147 addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
148 addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
149 addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
150 addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
151 addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
152 addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
153 addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
154 addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
155 addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
156 addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
157 addRecSignal(GSN_ALLOC_MEM_REQ, &Ndbfs::execALLOC_MEM_REQ);
158 addRecSignal(GSN_SEND_PACKED, &Ndbfs::execSEND_PACKED, true);
159 addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Ndbfs::execBUILD_INDX_IMPL_REQ);
160 // Set send signals
161 addRecSignal(GSN_FSSUSPENDORD, &Ndbfs::execFSSUSPENDORD);
162
163 theRequestPool = new Pool<Request>;
164 }
165
~Ndbfs()166 Ndbfs::~Ndbfs()
167 {
168 /**
169 * Stop all unbound threads
170 */
171
172 /**
173 * Post enought Request::end to saturate all unbound threads
174 */
175 Request request;
176 request.action = Request::end;
177 for (unsigned i = 0; i < theThreads.size(); i++)
178 {
179 theToBoundThreads.writeChannel(&request);
180 theToUnboundThreads.writeChannel(&request);
181 }
182
183 for (unsigned i = 0; i < theThreads.size(); i++)
184 {
185 AsyncIoThread * thr = theThreads[i];
186 thr->shutdown();
187 }
188
189 /**
190 * delete all threads
191 */
192 for (unsigned i = 0; i < theThreads.size(); i++)
193 {
194 AsyncIoThread * thr = theThreads[i];
195 delete thr;
196 theThreads[i] = 0;
197 }
198 theThreads.clear();
199
200 /**
201 * Delete all files
202 */
203 for (unsigned i = 0; i < theFiles.size(); i++){
204 AsyncFile* file = theFiles[i];
205 delete file;
206 theFiles[i] = NULL;
207 }//for
208 theFiles.clear();
209
210 if (theRequestPool)
211 delete theRequestPool;
212 }
213
214 static
215 bool
do_mkdir(const char * path)216 do_mkdir(const char * path)
217 {
218 return NdbDir::create(path,
219 NdbDir::u_rwx() | NdbDir::g_r() | NdbDir::g_x(),
220 true /* ignore_existing */);
221 }
222
223 static
224 void
add_path(BaseString & dst,const char * add)225 add_path(BaseString& dst, const char * add)
226 {
227 const char * tmp = dst.c_str();
228 unsigned len = dst.length();
229 unsigned dslen = (unsigned)strlen(DIR_SEPARATOR);
230
231 if (len > dslen && strcmp(tmp+(len - dslen), DIR_SEPARATOR) != 0)
232 dst.append(DIR_SEPARATOR);
233 dst.append(add);
234 }
235
236 static
237 bool
validate_path(BaseString & dst,const char * path)238 validate_path(BaseString & dst,
239 const char * path)
240 {
241 char buf2[PATH_MAX];
242 memset(buf2, 0,sizeof(buf2));
243 #ifdef _WIN32
244 CreateDirectory(path, 0);
245 char* szFilePart;
246 if(!GetFullPathName(path, sizeof(buf2), buf2, &szFilePart) ||
247 (GetFileAttributes(buf2) & FILE_ATTRIBUTE_READONLY))
248 return false;
249 #else
250 if (::realpath(path, buf2) == NULL ||
251 ::access(buf2, W_OK) != 0)
252 return false;
253 #endif
254 dst.assign(buf2);
255 add_path(dst, "");
256 return true;
257 }
258
259 const BaseString&
get_base_path(Uint32 no) const260 Ndbfs::get_base_path(Uint32 no) const
261 {
262 if (no < NDB_ARRAY_SIZE(m_base_path) &&
263 strlen(m_base_path[no].c_str()) > 0)
264 {
265 jam();
266 return m_base_path[no];
267 }
268
269 return m_base_path[FsOpenReq::BP_FS];
270 }
271
272 void
execREAD_CONFIG_REQ(Signal * signal)273 Ndbfs::execREAD_CONFIG_REQ(Signal* signal)
274 {
275 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
276
277 Uint32 ref = req->senderRef;
278 Uint32 senderData = req->senderData;
279
280 const ndb_mgm_configuration_iterator * p =
281 m_ctx.m_config.getOwnConfigIterator();
282 ndbrequire(p != 0);
283 BaseString tmp;
284 tmp.assfmt("ndb_%u_fs%s", getOwnNodeId(), DIR_SEPARATOR);
285 m_base_path[FsOpenReq::BP_FS].assfmt("%s%s",
286 m_ctx.m_config.fileSystemPath(),
287 tmp.c_str());
288 m_base_path[FsOpenReq::BP_BACKUP].assign(m_ctx.m_config.backupFilePath());
289
290 const char * ddpath = 0;
291 ndb_mgm_get_string_parameter(p, CFG_DB_DD_FILESYSTEM_PATH, &ddpath);
292
293 {
294 const char * datapath = ddpath;
295 ndb_mgm_get_string_parameter(p, CFG_DB_DD_DATAFILE_PATH, &datapath);
296 if (datapath)
297 {
298 /**
299 * Only set BP_DD_DF if either FileSystemPathDataFiles or FileSystemPathDD
300 * is set...otherwise get_base_path(FsOpenReq::BP_DD_DF) will
301 * return BP_FS (see get_base_path)
302 */
303 BaseString path;
304 add_path(path, datapath);
305 do_mkdir(path.c_str());
306 add_path(path, tmp.c_str());
307 do_mkdir(path.c_str());
308 if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.c_str()))
309 {
310 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
311 m_base_path[FsOpenReq::BP_DD_DF].c_str(),
312 "FileSystemPathDataFiles");
313 }
314 }
315 else
316 {
317 BaseString path;
318 add_path(path, m_base_path[FsOpenReq::BP_FS].c_str());
319 do_mkdir(path.c_str());
320 BaseString tmpTS;
321 tmpTS.assfmt("TS%s", DIR_SEPARATOR);
322 add_path(path, tmpTS.c_str());
323 do_mkdir(path.c_str());
324 if (!validate_path(m_base_path[FsOpenReq::BP_DD_DF], path.c_str()))
325 {
326 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
327 m_base_path[FsOpenReq::BP_DD_DF].c_str(),
328 "FileSystemPathDataFiles");
329 }
330 }
331 }
332
333 {
334 const char * undopath = ddpath;
335 ndb_mgm_get_string_parameter(p, CFG_DB_DD_UNDOFILE_PATH, &undopath);
336 if (undopath)
337 {
338 /**
339 * Only set BP_DD_DF if either FileSystemPathUndoFiles or FileSystemPathDD
340 * is set...otherwise get_base_path(FsOpenReq::BP_DD_UF) will
341 * return BP_FS (see get_base_path)
342 */
343 BaseString path;
344 add_path(path, undopath);
345 do_mkdir(path.c_str());
346 add_path(path, tmp.c_str());
347 do_mkdir(path.c_str());
348
349 if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.c_str()))
350 {
351 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
352 m_base_path[FsOpenReq::BP_DD_UF].c_str(),
353 "FileSystemPathUndoFiles");
354 }
355 }
356 else
357 {
358 BaseString path;
359 add_path(path, m_base_path[FsOpenReq::BP_FS].c_str());
360 do_mkdir(path.c_str());
361 BaseString tmpLG;
362 tmpLG.assfmt("LG%s", DIR_SEPARATOR);
363 add_path(path, tmpLG.c_str());
364 do_mkdir(path.c_str());
365 if (!validate_path(m_base_path[FsOpenReq::BP_DD_UF], path.c_str()))
366 {
367 ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH,
368 m_base_path[FsOpenReq::BP_DD_UF].c_str(),
369 "FileSystemPathUndoFiles");
370 }
371 }
372 }
373
374 m_maxFiles = 0;
375 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
376 Uint32 noIdleFiles = 27;
377
378 ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
379
380 {
381 /**
382 * each logpart keeps up to 3 logfiles open at any given time...
383 * (bound)
384 * make sure noIdleFiles is atleast 4 times #logparts
385 * In addition the LCP execution can have up to 4 files open in each
386 * LDM thread. In the LCP prepare phase we can have up to 2 files
387 * (2 CTL files first, then 1 CTL file and finally 1 CTL file and
388 * 1 data file). The LCP execution runs in parallel and can also
389 * have 2 open files (1 CTL file and 1 data file). With the
390 * introduction of Partial LCP the execution phase could even have
391 * 9 files open at a time and thus we can have up to 11 threads open
392 * at any time per LDM thread only to handle LCP execution.
393 *
394 * In addition ensure that we have at least 10 more open files
395 * available for the remainder of the tasks we need to handle.
396 */
397 Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
398 ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
399 Uint32 logfiles = 4 * logParts;
400 Uint32 numLDMthreads = getLqhWorkers();
401 if (numLDMthreads == 0)
402 {
403 jam();
404 numLDMthreads = 1;
405 }
406 logfiles += ((numLDMthreads * 11) + 10);
407 if (noIdleFiles < logfiles)
408 {
409 jam();
410 noIdleFiles = logfiles;
411 }
412 }
413
414 // Make sure at least "noIdleFiles" more files can be created
415 if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
416 {
417 const Uint32 newMax = theFiles.size() + noIdleFiles + 1;
418 g_eventLogger->info("Resetting MaxNoOfOpenFiles %u to %u",
419 m_maxFiles, newMax);
420 m_maxFiles = newMax;
421 }
422
423 // Create idle AsyncFiles
424 for (Uint32 i = 0; i < noIdleFiles; i++)
425 {
426 theIdleFiles.push_back(createAsyncFile());
427 AsyncIoThread * thr = createIoThread(/* bound */ true);
428 if (thr)
429 {
430 theThreads.push_back(thr);
431 }
432 }
433
434 Uint32 threadpool = 2;
435 ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
436
437 // Create IoThreads
438 for (Uint32 i = 0; i < threadpool; i++)
439 {
440 AsyncIoThread * thr = createIoThread(/* bound */ false);
441 if (thr)
442 {
443 jam();
444 theThreads.push_back(thr);
445 }
446 else
447 {
448 jam();
449 break;
450 }
451 }
452
453 setup_wakeup();
454
455 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
456 conf->senderRef = reference();
457 conf->senderData = senderData;
458 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
459 ReadConfigConf::SignalLength, JBB);
460
461 // start scanning
462 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
463 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
464 }
465
466 /* Received a restart signal.
467 * Answer it like any other block
468 * PR0 : StartCase
469 * DR0 : StartPhase
470 * DR1 : ?
471 * DR2 : ?
472 * DR3 : ?
473 * DR4 : ?
474 * DR5 : SignalKey
475 */
476 void
execSTTOR(Signal * signal)477 Ndbfs::execSTTOR(Signal* signal)
478 {
479 jamEntry();
480
481 if(signal->theData[1] == 0){ // StartPhase 0
482 jam();
483
484 if (ERROR_INSERTED(2000) || ERROR_INSERTED(2001))
485 {
486 // Save(2000) or restore(2001) FileSystemPath/ndb_XX_fs/
487 BaseString& fs_path = m_base_path[FsOpenReq::BP_FS];
488 unsigned i = fs_path.length() - strlen(DIR_SEPARATOR);
489 BaseString saved_path(fs_path.c_str(), i);
490 const char * ending_separator = fs_path.c_str() + i;
491 ndbrequire(strcmp(ending_separator, DIR_SEPARATOR)==0);
492 saved_path.append(".saved");
493 saved_path.append(ending_separator);
494 BaseString& from_dir = (ERROR_INSERTED(2000) ? fs_path : saved_path);
495 BaseString& to_dir = (ERROR_INSERTED(2000) ? saved_path : fs_path);
496
497 const bool only_contents = true;
498 if (NdbDir::remove_recursive(to_dir.c_str(), !only_contents))
499 {
500 g_eventLogger->info("Cleaned destination file system at %s", to_dir.c_str());
501 }
502 else
503 {
504 g_eventLogger->warning("Failed cleaning file system at %s", to_dir.c_str());
505 }
506 if (access(to_dir.c_str(), F_OK) == 0 || errno != ENOENT)
507 {
508 g_eventLogger->error("Destination file system at %s should not be there (errno %d)!",
509 to_dir.c_str(),
510 errno);
511 ndbrequire(!"Destination file system already there during file system saving or restoring");
512 }
513 if (rename(from_dir.c_str(), to_dir.c_str()) == -1)
514 {
515 g_eventLogger->error("Failed renaming %s file system to %s while %s (errno %d)",
516 from_dir.c_str(),
517 to_dir.c_str(),
518 (ERROR_INSERTED(2000) ? "saving" : "restoring"),
519 errno);
520 ndbrequire(!"Failed renaming file system while saving ot restoring");
521 }
522 SET_ERROR_INSERT_VALUE2(ERROR_INSERT_EXTRA, 0);
523 }
524 do_mkdir(m_base_path[FsOpenReq::BP_FS].c_str());
525
526 // close all open files
527 ndbrequire(theOpenFiles.size() == 0);
528
529 signal->theData[3] = 255;
530 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
531 return;
532 }
533 ndbabort();
534 }
535
536 int
forward(AsyncFile * file,Request * request)537 Ndbfs::forward( AsyncFile * file, Request* request)
538 {
539 jam();
540 request->m_startTime = getHighResTimer();
541
542 AsyncIoThread* thr = file->getThread();
543 if (thr) // bound
544 {
545 thr->dispatch(request);
546 }
547 else if (request->m_do_bind)
548 {
549 theToBoundThreads.writeChannel(request);
550 }
551 else
552 {
553 theToUnboundThreads.writeChannel(request);
554 }
555 return 1;
556 }
557
558 void
execFSOPENREQ(Signal * signal)559 Ndbfs::execFSOPENREQ(Signal* signal)
560 {
561 jamEntry();
562 const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
563 const BlockReference userRef = fsOpenReq->userReference;
564
565 bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
566 AsyncFile* file = getIdleFile(bound);
567 ndbrequire(file != NULL);
568
569 Uint32 userPointer = fsOpenReq->userPointer;
570
571 SectionHandle handle(this, signal);
572 SegmentedSectionPtr ptr; ptr.setNull();
573 if (handle.m_cnt)
574 {
575 jam();
576 handle.getSection(ptr, FsOpenReq::FILENAME);
577 }
578 file->theFileName.set(this, userRef, fsOpenReq->fileNumber, false, ptr);
579 releaseSections(handle);
580
581 if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
582 {
583 jam();
584 Uint32 cnt = 16; // 512k
585 Ptr<GlobalPage> page_ptr;
586 m_ctx.m_mm.alloc_pages(RT_NDBFS_INIT_FILE_PAGE, &page_ptr.i, &cnt, 1);
587 if(cnt == 0)
588 {
589 file->m_page_ptr.setNull();
590 file->m_page_cnt = 0;
591
592 FsRef * const fsRef = (FsRef *)&signal->theData[0];
593 fsRef->userPointer = userPointer;
594 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
595 fsRef->osErrorCode = ~0; // Indicate local error
596 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
597 return;
598 }
599 m_shared_page_pool.getPtr(page_ptr);
600 file->set_buffer(RT_NDBFS_INIT_FILE_PAGE, page_ptr, cnt);
601 }
602 else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
603 {
604 jam();
605 Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
606 Ptr<GlobalPage> page_ptr;
607 m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
608 if (cnt == 0)
609 {
610 jam();
611 file->m_page_ptr.setNull();
612 file->m_page_cnt = 0;
613
614 FsRef * const fsRef = (FsRef *)&signal->theData[0];
615 fsRef->userPointer = userPointer;
616 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
617 fsRef->osErrorCode = ~0; // Indicate local error
618 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
619 return;
620 }
621 m_shared_page_pool.getPtr(page_ptr);
622 file->set_buffer(RT_FILE_BUFFER, page_ptr, cnt);
623 }
624 else
625 {
626 ndbassert(file->m_page_ptr.isNull());
627 file->m_page_ptr.setNull();
628 file->m_page_cnt = 0;
629 }
630
631 if (getenv("NDB_TRACE_OPEN"))
632 ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
633
634 Request* request = theRequestPool->get();
635 request->action = Request::open;
636 request->error = 0;
637 request->set(userRef, userPointer, newId() );
638 request->file = file;
639 request->theTrace = signal->getTrace();
640 request->par.open.flags = fsOpenReq->fileFlags;
641 request->par.open.page_size = fsOpenReq->page_size;
642 request->par.open.file_size = fsOpenReq->file_size_hi;
643 request->par.open.file_size <<= 32;
644 request->par.open.file_size |= fsOpenReq->file_size_lo;
645 request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
646 request->m_do_bind = bound;
647
648 ndbrequire(forward(file, request));
649 }
650
651 void
execFSREMOVEREQ(Signal * signal)652 Ndbfs::execFSREMOVEREQ(Signal* signal)
653 {
654 jamEntry();
655 const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
656 const BlockReference userRef = req->userReference;
657 bool bound = true;
658 AsyncFile* file = getIdleFile(bound);
659 ndbrequire(file != NULL);
660
661 SectionHandle handle(this, signal);
662 SegmentedSectionPtr ptr; ptr.setNull();
663 if(handle.m_cnt)
664 {
665 jam();
666 handle.getSection(ptr, FsOpenReq::FILENAME);
667 }
668
669 file->theFileName.set(this, userRef, req->fileNumber, req->directory, ptr);
670 releaseSections(handle);
671
672 Uint32 version = FsOpenReq::getVersion(req->fileNumber);
673 Uint32 bp = FsOpenReq::v5_getLcpNo(req->fileNumber);
674
675 Request* request = theRequestPool->get();
676 request->action = Request::rmrf;
677 request->par.rmrf.directory = req->directory;
678 request->par.rmrf.own_directory = req->ownDirectory;
679 request->error = 0;
680 request->set(userRef, req->userPointer, newId() );
681 request->file = file;
682 request->theTrace = signal->getTrace();
683 request->m_do_bind = bound;
684
685 if (version == 6)
686 {
687 ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
688 if (strlen(m_base_path[bp].c_str()) == 0)
689 {
690 goto ignore;
691 }
692 }
693
694 ndbrequire(forward(file, request));
695 return;
696 ignore:
697 report(request, signal);
698 }
699
700 /*
701 * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
702 * remove file
703 */
704 void
execFSCLOSEREQ(Signal * signal)705 Ndbfs::execFSCLOSEREQ(Signal * signal)
706 {
707 jamEntry();
708 const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
709 const BlockReference userRef = fsCloseReq->userReference;
710 const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
711 const UintR userPointer = fsCloseReq->userPointer;
712
713 AsyncFile* openFile = theOpenFiles.find(filePointer);
714 if (openFile == NULL) {
715 // The file was not open, send error back to sender
716 jam();
717 // Initialise FsRef signal
718 FsRef * const fsRef = (FsRef *)&signal->theData[0];
719 fsRef->userPointer = userPointer;
720 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
721 fsRef->osErrorCode = ~0; // Indicate local error
722 sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
723
724 g_eventLogger->warning("Trying to close unknown file!! %u", userPointer);
725 g_eventLogger->warning("Dumping files");
726 signal->theData[0] = 405;
727 execDUMP_STATE_ORD(signal);
728 return;
729 }
730
731 if (getenv("NDB_TRACE_OPEN"))
732 ndbout_c("close(%s)", openFile->theFileName.c_str());
733
734 Request *request = theRequestPool->get();
735 if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
736 jam();
737 request->action = Request::closeRemove;
738 } else {
739 jam();
740 request->action = Request::close;
741 }
742 request->set(userRef, fsCloseReq->userPointer, filePointer);
743 request->file = openFile;
744 request->error = 0;
745 request->theTrace = signal->getTrace();
746 request->m_do_bind = false;
747
748 ndbrequire(forward(openFile, request));
749 }
750
751 void
readWriteRequest(int action,Signal * signal)752 Ndbfs::readWriteRequest(int action, Signal * signal)
753 {
754 Uint32 theData[25 + 2 * NDB_FS_RW_PAGES];
755 memcpy(theData, signal->theData, 4 * signal->getLength());
756 SectionHandle handle(this, signal);
757 if (handle.m_cnt > 0)
758 {
759 jam();
760 SegmentedSectionPtr secPtr;
761 ndbrequire(handle.getSection(secPtr, 0));
762 ndbrequire(signal->getLength() + secPtr.sz < NDB_ARRAY_SIZE(theData));
763 copy(theData + signal->getLength(), secPtr);
764 releaseSections(handle);
765 }
766
767 const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)theData;
768 Uint16 filePointer = (Uint16)fsRWReq->filePointer;
769 const UintR userPointer = fsRWReq->userPointer;
770 const BlockReference userRef = fsRWReq->userReference;
771 const BlockNumber blockNumber = refToMain(userRef);
772 const Uint32 instanceNumber = refToInstance(userRef);
773
774 AsyncFile* openFile = theOpenFiles.find(filePointer);
775
776 const NewVARIABLE *myBaseAddrRef =
777 &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
778 UintPtr tPageSize;
779 UintPtr tClusterSize;
780 UintPtr tNRR;
781 UintPtr tPageOffset;
782 char* tWA;
783 FsRef::NdbfsErrorCodeType errorCode;
784
785 Request *request = theRequestPool->get();
786 request->error = 0;
787 request->set(userRef, userPointer, filePointer);
788 request->file = openFile;
789 request->action = (Request::Action) action;
790 request->theTrace = signal->getTrace();
791 request->m_do_bind = false;
792
793 Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
794
795 if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
796 jam();
797 errorCode = FsRef::fsErrInvalidParameters;
798 goto error;
799 }
800
801 if(format != FsReadWriteReq::fsFormatGlobalPage &&
802 format != FsReadWriteReq::fsFormatSharedPage)
803 {
804 if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
805 jam();// Ensure that a valid variable is used
806 errorCode = FsRef::fsErrInvalidParameters;
807 goto error;
808 }
809 if (myBaseAddrRef == NULL) {
810 jam(); // Ensure that a valid variable is used
811 errorCode = FsRef::fsErrInvalidParameters;
812 goto error;
813 }
814 if (openFile == NULL) {
815 jam(); //file not open
816 errorCode = FsRef::fsErrFileDoesNotExist;
817 goto error;
818 }
819 tPageSize = pageSize(myBaseAddrRef);
820 tClusterSize = myBaseAddrRef->ClusterSize;
821 tNRR = myBaseAddrRef->nrr;
822 tWA = (char*)myBaseAddrRef->WA;
823
824 switch (format) {
825
826 // List of memory and file pages pairs
827 case FsReadWriteReq::fsFormatListOfPairs: {
828 jam();
829 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
830 jam();
831 const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex;
832 const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
833 if (varIndex >= tNRR) {
834 jam();
835 errorCode = FsRef::fsErrInvalidParameters;
836 goto error;
837 }//if
838 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
839 request->par.readWrite.pages[i].size = tPageSize;
840 request->par.readWrite.pages[i].offset = (off_t)(fileOffset*tPageSize);
841 }//for
842 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
843 break;
844 }//case
845
846 // Range of memory page with one file page
847 case FsReadWriteReq::fsFormatArrayOfPages: {
848 if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
849 jam();
850 errorCode = FsRef::fsErrInvalidParameters;
851 goto error;
852 }//if
853 const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex;
854 const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
855
856 request->par.readWrite.pages[0].offset = (off_t)(fileOffset * tPageSize);
857 request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
858 request->par.readWrite.numberOfPages = 1;
859 request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
860 break;
861 }//case
862
863 // List of memory pages followed by one file page
864 case FsReadWriteReq::fsFormatListOfMemPages: {
865
866 tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
867 tPageOffset *= tPageSize;
868
869 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
870 jam();
871 UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
872
873 if (varIndex >= tNRR) {
874 jam();
875 errorCode = FsRef::fsErrInvalidParameters;
876 goto error;
877 }//if
878 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
879 request->par.readWrite.pages[i].size = tPageSize;
880 request->par.readWrite.pages[i].offset = (off_t)
881 (tPageOffset + (i*tPageSize));
882 }//for
883 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
884 break;
885 // make it a writev or readv
886 }//case
887
888 case FsReadWriteReq::fsFormatMemAddress:
889 {
890 jam();
891 const Uint32 memoryOffset = fsRWReq->data.memoryAddress.memoryOffset;
892 const Uint32 fileOffset = fsRWReq->data.memoryAddress.fileOffset;
893 const Uint32 sz = fsRWReq->data.memoryAddress.size;
894
895 request->par.readWrite.pages[0].buf = &tWA[memoryOffset];
896 request->par.readWrite.pages[0].size = sz;
897 request->par.readWrite.pages[0].offset = (off_t)(fileOffset);
898 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
899 break;
900 }
901 default: {
902 jam();
903 errorCode = FsRef::fsErrInvalidParameters;
904 goto error;
905 }//default
906 }//switch
907 }
908 else if (format == FsReadWriteReq::fsFormatGlobalPage)
909 {
910 Ptr<GlobalPage> ptr;
911 m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
912 request->par.readWrite.pages[0].buf = (char*)ptr.p;
913 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
914 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
915 request->par.readWrite.numberOfPages = 1;
916 }
917 else
918 {
919 ndbrequire(format == FsReadWriteReq::fsFormatSharedPage);
920 Ptr<GlobalPage> ptr;
921 m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]);
922 request->par.readWrite.pages[0].buf = (char*)ptr.p;
923 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages;
924 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex;
925 request->par.readWrite.numberOfPages = 1;
926 }
927
928 ndbrequire(forward(openFile, request));
929 return;
930
931 error:
932 theRequestPool->put(request);
933 FsRef * const fsRef = (FsRef *)&signal->theData[0];
934 fsRef->userPointer = userPointer;
935 fsRef->setErrorCode(fsRef->errorCode, errorCode);
936 fsRef->osErrorCode = ~0; // Indicate local error
937 switch (action) {
938 case Request:: write:
939 case Request:: writeSync: {
940 jam();
941 sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
942 break;
943 }//case
944 case Request:: readPartial:
945 case Request:: read: {
946 jam();
947 sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
948 }//case
949 }//switch
950 return;
951 }
952
953 /*
954 PR0: File Pointer , theData[0]
955 DR0: User reference, theData[1]
956 DR1: User Pointer, etc.
957 DR2: Flag
958 DR3: Var number
959 DR4: amount of pages
960 DR5->: Memory Page id and File page id according to Flag
961 */
962 void
execFSWRITEREQ(Signal * signal)963 Ndbfs::execFSWRITEREQ(Signal* signal)
964 {
965 jamEntry();
966 const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
967
968 if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
969 jam();
970 readWriteRequest( Request::writeSync, signal );
971 } else {
972 jam();
973 readWriteRequest( Request::write, signal );
974 }
975 }
976
977 /*
978 PR0: File Pointer
979 DR0: User reference
980 DR1: User Pointer
981 DR2: Flag
982 DR3: Var number
983 DR4: amount of pages
984 DR5->: Memory Page id and File page id according to Flag
985 */
986 void
execFSREADREQ(Signal * signal)987 Ndbfs::execFSREADREQ(Signal* signal)
988 {
989 jamEntry();
990 FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr();
991 if (FsReadWriteReq::getPartialReadFlag(req->operationFlag))
992 {
993 jam();
994 readWriteRequest( Request::readPartial, signal );
995 }
996 else
997 {
998 jam();
999 readWriteRequest( Request::read, signal );
1000 }
1001 }
1002
1003 /*
1004 * PR0: File Pointer DR0: User reference DR1: User Pointer
1005 */
1006 void
execFSSYNCREQ(Signal * signal)1007 Ndbfs::execFSSYNCREQ(Signal * signal)
1008 {
1009 jamEntry();
1010 Uint16 filePointer = (Uint16)signal->theData[0];
1011 BlockReference userRef = signal->theData[1];
1012 const UintR userPointer = signal->theData[2];
1013 AsyncFile* openFile = theOpenFiles.find(filePointer);
1014
1015 if (openFile == NULL) {
1016 jam(); //file not open
1017 FsRef * const fsRef = (FsRef *)&signal->theData[0];
1018 fsRef->userPointer = userPointer;
1019 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
1020 fsRef->osErrorCode = ~0; // Indicate local error
1021 sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
1022 return;
1023 }
1024
1025 Request *request = theRequestPool->get();
1026 request->error = 0;
1027 request->action = Request::sync;
1028 request->set(userRef, userPointer, filePointer);
1029 request->file = openFile;
1030 request->theTrace = signal->getTrace();
1031 request->m_do_bind = false;
1032
1033 ndbrequire(forward(openFile,request));
1034 }
1035
1036 /*
1037 * PR0: File Pointer DR0: User reference DR1: User Pointer
1038 */
1039 void
execFSSUSPENDORD(Signal * signal)1040 Ndbfs::execFSSUSPENDORD(Signal * signal)
1041 {
1042 jamEntry();
1043 Uint16 filePointer = (Uint16)signal->theData[0];
1044 Uint32 millis = signal->theData[1];
1045 AsyncFile* openFile = theOpenFiles.find(filePointer);
1046
1047 if (openFile == NULL)
1048 {
1049 jam(); //file not open
1050 return;
1051 }
1052
1053 Request *request = theRequestPool->get();
1054 request->error = 0;
1055 request->action = Request::suspend;
1056 request->set(0, 0, filePointer);
1057 request->file = openFile;
1058 request->theTrace = signal->getTrace();
1059 request->par.suspend.milliseconds = millis;
1060 request->m_do_bind = false;
1061
1062 ndbrequire(forward(openFile,request));
1063 }
1064
1065 void
execFSAPPENDREQ(Signal * signal)1066 Ndbfs::execFSAPPENDREQ(Signal * signal)
1067 {
1068 const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
1069 const Uint16 filePointer = (Uint16)fsReq->filePointer;
1070 const UintR userPointer = fsReq->userPointer;
1071 const BlockReference userRef = fsReq->userReference;
1072 const BlockNumber blockNumber = refToMain(userRef);
1073 const Uint32 instanceNumber = refToInstance(userRef);
1074
1075 FsRef::NdbfsErrorCodeType errorCode;
1076
1077 AsyncFile* openFile = theOpenFiles.find(filePointer);
1078 const NewVARIABLE *myBaseAddrRef =
1079 &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
1080
1081 const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
1082 const Uint32 tSz = myBaseAddrRef->nrr;
1083 const Uint32 offset = fsReq->offset;
1084 const Uint32 size = fsReq->size;
1085 const Uint32 synch_flag = fsReq->synch_flag;
1086 Request *request = theRequestPool->get();
1087
1088 if (openFile == NULL) {
1089 jam();
1090 errorCode = FsRef::fsErrFileDoesNotExist;
1091 goto error;
1092 }
1093
1094 if (myBaseAddrRef == NULL) {
1095 jam(); // Ensure that a valid variable is used
1096 errorCode = FsRef::fsErrInvalidParameters;
1097 goto error;
1098 }
1099
1100 if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
1101 jam();// Ensure that a valid variable is used
1102 errorCode = FsRef::fsErrInvalidParameters;
1103 goto error;
1104 }
1105
1106 if(offset + size > tSz){
1107 jam(); // Ensure that a valid variable is used
1108 errorCode = FsRef::fsErrInvalidParameters;
1109 goto error;
1110 }
1111
1112 request->error = 0;
1113 request->set(userRef, userPointer, filePointer);
1114 request->file = openFile;
1115 request->theTrace = signal->getTrace();
1116
1117 request->par.append.buf = (const char *)(tWA + offset);
1118 request->par.append.size = size << 2;
1119
1120 if (!synch_flag)
1121 request->action = Request::append;
1122 else
1123 request->action = Request::append_synch;
1124 request->m_do_bind = false;
1125 ndbrequire(forward(openFile, request));
1126 return;
1127
1128 error:
1129 jam();
1130 theRequestPool->put(request);
1131 FsRef * const fsRef = (FsRef *)&signal->theData[0];
1132 fsRef->userPointer = userPointer;
1133 fsRef->setErrorCode(fsRef->errorCode, errorCode);
1134 fsRef->osErrorCode = ~0; // Indicate local error
1135
1136 jam();
1137 sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
1138 return;
1139 }
1140
1141 void
execALLOC_MEM_REQ(Signal * signal)1142 Ndbfs::execALLOC_MEM_REQ(Signal* signal)
1143 {
1144 jamEntry();
1145
1146 AllocMemReq* req = (AllocMemReq*)signal->getDataPtr();
1147
1148 bool bound = true;
1149 AsyncFile* file = getIdleFile(bound);
1150 ndbrequire(file != NULL);
1151
1152 Request *request = theRequestPool->get();
1153
1154 request->error = 0;
1155 request->set(req->senderRef, req->senderData, 0);
1156 request->file = file;
1157 request->theTrace = signal->getTrace();
1158
1159 request->par.alloc.ctx = &m_ctx;
1160 request->par.alloc.requestInfo = req->requestInfo;
1161 request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
1162 request->action = Request::allocmem;
1163 request->m_do_bind = bound;
1164 ndbrequire(forward(file, request));
1165 }
1166
1167 void
execBUILD_INDX_IMPL_REQ(Signal * signal)1168 Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* signal)
1169 {
1170 jamEntry();
1171 mt_BuildIndxReq * req = (mt_BuildIndxReq*)signal->getDataPtr();
1172
1173 bool bound = true;
1174 AsyncFile* file = getIdleFile(bound);
1175 ndbrequire(file != NULL);
1176
1177 Request *request = theRequestPool->get();
1178 request->error = 0;
1179 request->set(req->senderRef, req->senderData, 0);
1180 request->file = file;
1181 request->theTrace = signal->getTrace();
1182
1183 Uint32 cnt = (req->buffer_size + 32768 - 1) / 32768;
1184 Uint32 save = cnt;
1185 Ptr<GlobalPage> page_ptr;
1186 m_ctx.m_mm.alloc_pages(RT_NDBFS_BUILD_INDEX_PAGE, &page_ptr.i, &cnt, cnt);
1187 if(cnt == 0)
1188 {
1189 file->m_page_ptr.setNull();
1190 file->m_page_cnt = 0;
1191
1192 ndbabort(); // TODO
1193 return;
1194 }
1195
1196 ndbrequire(cnt == save);
1197
1198 m_shared_page_pool.getPtr(page_ptr);
1199 file->set_buffer(RT_NDBFS_BUILD_INDEX_PAGE, page_ptr, cnt);
1200
1201 memcpy(&request->par.build.m_req, req, sizeof(* req));
1202 request->action = Request::buildindx;
1203 request->m_do_bind = bound;
1204 ndbrequire(forward(file, request));
1205 }
1206
1207 Uint16
newId()1208 Ndbfs::newId()
1209 {
1210 // finds a new key, eg a new filepointer
1211 for (int i = 1; i < SHRT_MAX; i++)
1212 {
1213 if (theLastId == SHRT_MAX) {
1214 jam();
1215 theLastId = 1;
1216 } else {
1217 jam();
1218 theLastId++;
1219 }
1220
1221 if(theOpenFiles.find(theLastId) == NULL) {
1222 jam();
1223 return theLastId;
1224 }
1225 }
1226 ndbrequire(1 == 0);
1227 // The program will not reach this point
1228 return 0;
1229 }
1230
1231 AsyncFile*
createAsyncFile()1232 Ndbfs::createAsyncFile()
1233 {
1234 // Check limit of open files
1235 if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
1236 {
1237 // Print info about all open files
1238 for (unsigned i = 0; i < theFiles.size(); i++){
1239 AsyncFile* file = theFiles[i];
1240 ndbout_c("%2d (0x%lx): %s",
1241 i,
1242 (long) file,
1243 file->isOpen() ?"OPEN" : "CLOSED");
1244 }
1245 ndbout_c("m_maxFiles: %u, theFiles.size() = %u",
1246 m_maxFiles, theFiles.size());
1247 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile: creating more than MaxNoOfOpenFiles");
1248 }
1249
1250 #ifdef _WIN32
1251 AsyncFile* file = new Win32AsyncFile(* this);
1252 #else
1253 AsyncFile* file = new PosixAsyncFile(* this);
1254 #endif
1255 int err = file->init();
1256 if (err == -1)
1257 {
1258 ERROR_SET(fatal, NDBD_EXIT_AFS_ZLIB_INIT_FAIL, "", " Ndbfs::createAsyncFile: Zlib init failure");
1259 }
1260 else if(err)
1261 {
1262 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
1263 }
1264
1265 theFiles.push_back(file);
1266 return file;
1267 }
1268
1269 void
pushIdleFile(AsyncFile * file)1270 Ndbfs::pushIdleFile(AsyncFile* file)
1271 {
1272 assert(file->getThread() == 0);
1273 if (file->thread_bound())
1274 {
1275 m_active_bound_threads_cnt--;
1276 file->set_thread_bound(false);
1277 }
1278 theIdleFiles.push_back(file);
1279 }
1280
1281 AsyncIoThread*
createIoThread(bool bound)1282 Ndbfs::createIoThread(bool bound)
1283 {
1284 AsyncIoThread* thr = new AsyncIoThread(*this, bound);
1285 if (thr)
1286 {
1287 #ifdef VM_TRACE
1288 ndbout_c("NDBFS: Created new file thread %d", theThreads.size());
1289 #endif
1290
1291 struct NdbThread* thrptr = thr->doStart();
1292 globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
1293 thr->set_real_time(
1294 globalEmulatorData.theConfiguration->get_io_real_time());
1295
1296 if (bound)
1297 m_bound_threads_cnt++;
1298 else
1299 m_unbounds_threads_cnt++;
1300 }
1301
1302 return thr;
1303 }
1304
1305 AsyncFile*
getIdleFile(bool bound)1306 Ndbfs::getIdleFile(bool bound)
1307 {
1308 AsyncFile* file = 0;
1309 Uint32 sz = theIdleFiles.size();
1310 if (sz)
1311 {
1312 file = theIdleFiles[sz - 1];
1313 theIdleFiles.erase(sz - 1);
1314 }
1315 else
1316 {
1317 file = createAsyncFile();
1318 }
1319
1320 if (bound)
1321 {
1322 /**
1323 * Check if we should create thread
1324 */
1325 if (m_active_bound_threads_cnt == m_bound_threads_cnt)
1326 {
1327 AsyncIoThread * thr = createIoThread(true);
1328 if (thr)
1329 {
1330 theThreads.push_back(thr);
1331 }
1332 }
1333
1334 file->set_thread_bound(true);
1335 m_active_bound_threads_cnt++;
1336 }
1337 return file;
1338 }
1339
/*
 * Send the reply for a completed Request back to its requester.
 *
 * Called from scanIPC() for each Request read from theFromThreads. The
 * signal's trace is temporarily set to the request's trace and restored
 * on exit.
 *
 * Buffer pages attached to the file are released when an open failed,
 * on close/closeRemove, and after buildindx.
 *
 * On error (request->error != 0) an FsRef-style *REF is sent. Errors with
 * FS_ERR_BIT set are NDBFS codes passed through as-is; anything else is
 * treated as an OS error, translated with translateErrno() and also
 * reported raw in osErrorCode.
 * On success the matching *CONF is sent.
 * In both cases, file objects that are finished with (failed open, rmrf,
 * allocmem, buildindx, and successful close) go back to the idle list.
 */
void
Ndbfs::report(Request * request, Signal* signal)
{
  const Uint32 orgTrace = signal->getTrace();
  signal->setTrace(request->theTrace);
  const BlockReference ref = request->theUserReference;

  // Release buffer pages once the file no longer needs them.
  if (request->file->has_buffer())
  {
    if ((request->action == Request::open && request->error) ||
        request->action == Request::close ||
        request->action == Request::closeRemove ||
        request->action == Request::buildindx)
    {
      Uint32 rg;
      Uint32 cnt;
      Ptr<GlobalPage> ptr;
      request->file->clear_buffer(rg, ptr, cnt);
      m_ctx.m_mm.release_pages(rg, ptr.i, cnt);
    }
  }

  if (request->error) {
    jam();
    // Initialise FsRef signal
    FsRef * const fsRef = (FsRef *)&signal->theData[0];
    fsRef->userPointer = request->theUserPointer;
    if(request->error & FsRef::FS_ERR_BIT)
    {
      // Already an NDBFS error code: pass through, no OS error.
      fsRef->errorCode = request->error;
      fsRef->osErrorCode = 0;
    }
    else
    {
      // OS error number: map to an NDBFS code and keep the raw value.
      fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
      fsRef->osErrorCode = request->error;
    }
    switch (request->action) {
    case Request:: open: {
      jam();
      // Put the file back in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: closeRemove:
    case Request:: close: {
      jam();
      // NOTE(review): on close failure the file is neither removed from
      // theOpenFiles nor returned to the idle list; presumably intentional.
      sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);

      g_eventLogger->warning("Error closing file: %s %u/%u",
                             request->file->theFileName.c_str(),
                             fsRef->errorCode,
                             fsRef->osErrorCode);
      g_eventLogger->warning("Dumping files");
      // Reuse the DUMP 405 handler to print every file object.
      signal->theData[0] = 405;
      execDUMP_STATE_ORD(signal);
      break;
    }
    case Request:: writeSync:
    case Request:: writevSync:
    case Request:: write:
    case Request:: writev: {
      jam();
      sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: read:
    case Request:: readPartial:
    case Request:: readv: {
      jam();
      sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request:: sync: {
      jam();
      sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request::append:
    case Request::append_synch:
    {
      jam();
      sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
      break;
    }
    case Request::rmrf: {
      jam();
      // Put the file back in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
      break;
    }

    // 'suspend' label sits inside the 'end' braces; both just break.
    case Request:: end: {
    case Request:: suspend:
      // Report nothing
      break;
    }
    case Request::allocmem: {
      jam();
      // ALLOC_MEM_REF uses its own layout and overwrites the FsRef above.
      AllocMemRef* rep = (AllocMemRef*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      rep->errorCode = request->error;
      sendSignal(ref, GSN_ALLOC_MEM_REF, signal,
                 AllocMemRef::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    case Request::buildindx: {
      jam();
      BuildIndxImplRef* rep = (BuildIndxImplRef*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      rep->errorCode = (BuildIndxImplRef::ErrorCode)request->error;
      sendSignal(ref, GSN_BUILD_INDX_IMPL_REF, signal,
                 BuildIndxImplRef::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    }//switch
  } else {
    jam();
    FsConf * const fsConf = (FsConf *)&signal->theData[0];
    fsConf->userPointer = request->theUserPointer;
    switch (request->action) {
    case Request:: open: {
      jam();
      // The file pointer allocated for this open now refers to the file.
      theOpenFiles.insert(request->file, request->theFilePointer);

      // Keep track on max number of opened files
      if (theOpenFiles.size() > m_maxOpenedFiles)
        m_maxOpenedFiles = theOpenFiles.size();

      fsConf->filePointer = request->theFilePointer;
      fsConf->fileInfo = request->m_fileinfo;
      fsConf->file_size_hi = request->m_file_size_hi;
      fsConf->file_size_lo = request->m_file_size_lo;
      sendSignal(ref, GSN_FSOPENCONF, signal, 5, JBA);
      break;
    }
    case Request:: closeRemove:
    case Request:: close: {
      jam();
      // removes the file from OpenFiles list
      theOpenFiles.erase(request->theFilePointer);
      // Put the file in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBA);
      break;
    }
    case Request:: writeSync:
    case Request:: writevSync:
    case Request:: write:
    case Request:: writev: {
      jam();
      sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBA);
      break;
    }
    case Request:: read:
    case Request:: readv: {
      jam();
      sendSignal(ref, GSN_FSREADCONF, signal, 1, JBA);
      break;
    }
    case Request:: readPartial: {
      jam();
      // Presumably the I/O thread stores the actual byte count read in
      // pages[0].size -- verify in AsyncFile/AsyncIoThread.
      fsConf->bytes_read = Uint32(request->par.readWrite.pages[0].size);
      sendSignal(ref, GSN_FSREADCONF, signal, 2, JBA);
      break;
    }
    case Request:: sync: {
      jam();
      sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBA);
      break;
    }//case
    case Request::append:
    case Request::append_synch:
    {
      jam();
      // Second word of the CONF carries the appended length in bytes.
      signal->theData[1] = Uint32(request->par.append.size);
      sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBA);
      break;
    }
    case Request::rmrf: {
      jam();
      // Put the file in idle files list
      pushIdleFile(request->file);
      sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBA);
      break;
    }
    // 'suspend' label sits inside the 'end' braces; both just break.
    case Request:: end: {
    case Request:: suspend:
      // Report nothing
      break;
    }
    case Request::allocmem: {
      jam();
      AllocMemConf* conf = (AllocMemConf*)signal->getDataPtrSend();
      conf->senderRef = reference();
      conf->senderData = request->theUserPointer;
      conf->bytes_hi = Uint32(request->par.alloc.bytes >> 32);
      conf->bytes_lo = Uint32(request->par.alloc.bytes);
      sendSignal(ref, GSN_ALLOC_MEM_CONF, signal,
                 AllocMemConf::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    case Request::buildindx: {
      jam();
      BuildIndxImplConf* rep = (BuildIndxImplConf*)signal->getDataPtrSend();
      rep->senderRef = reference();
      rep->senderData = request->theUserPointer;
      sendSignal(ref, GSN_BUILD_INDX_IMPL_CONF, signal,
                 BuildIndxImplConf::SignalLength, JBB);
      pushIdleFile(request->file);
      break;
    }
    }
  }//if
  signal->setTrace(orgTrace);
}
1563
1564
1565 bool
scanIPC(Signal * signal)1566 Ndbfs::scanIPC(Signal* signal)
1567 {
1568 Request* request = theFromThreads.tryReadChannel();
1569 jamDebug();
1570 if (request) {
1571 jam();
1572 report(request, signal);
1573 theRequestPool->put(request);
1574 return true;
1575 }
1576 return false;
1577 }
1578
1579 #ifdef _WIN32
/*
 * Windows build: map a Win32 error code (ERROR_*), or the NDB-specific
 * ERR_ReadUnderflow, to an FsRef NDBFS error code.
 * Codes not listed map to FsRef::fsErrUnknown.
 */
Uint32 Ndbfs::translateErrno(int aErrno)
{
  switch (aErrno)
  {
    //permission denied
  case ERROR_ACCESS_DENIED:

    return FsRef::fsErrPermissionDenied;
    //temporarily not accessible
  case ERROR_PATH_BUSY:
  case ERROR_NO_MORE_SEARCH_HANDLES:

    return FsRef::fsErrTemporaryNotAccessible;
    //no space left on device
  case ERROR_HANDLE_DISK_FULL:
  case ERROR_DISK_FULL:

    return FsRef::fsErrNoSpaceLeftOnDevice;
    //invalid parameters
  case ERROR_INVALID_HANDLE:
  case ERROR_INVALID_DRIVE:
  case ERROR_INVALID_ACCESS:
  case ERROR_HANDLE_EOF:
  case ERROR_BUFFER_OVERFLOW:

    return FsRef::fsErrInvalidParameters;
    //environment error
  case ERROR_CRC:
  case ERROR_ARENA_TRASHED:
  case ERROR_BAD_ENVIRONMENT:
  case ERROR_INVALID_BLOCK:
  case ERROR_WRITE_FAULT:
  case ERROR_READ_FAULT:
  case ERROR_OPEN_FAILED:

    return FsRef::fsErrEnvironmentError;

    //no more process resources
  case ERROR_TOO_MANY_OPEN_FILES:
  case ERROR_NOT_ENOUGH_MEMORY:
  case ERROR_OUTOFMEMORY:
    return FsRef::fsErrNoMoreResources;
    //no such file
  case ERROR_FILE_NOT_FOUND:
    return FsRef::fsErrFileDoesNotExist;

    //short read (NDB-specific code, not a Win32 error)
  case ERR_ReadUnderflow:
    return FsRef::fsErrReadUnderflow;

  default:
    return FsRef::fsErrUnknown;
  }
}
1633 #else
translateErrno(int aErrno)1634 Uint32 Ndbfs::translateErrno(int aErrno)
1635 {
1636 switch (aErrno)
1637 {
1638 //permission denied
1639 case EACCES:
1640 case EROFS:
1641 case ENXIO:
1642 return FsRef::fsErrPermissionDenied;
1643 //temporary not accessible
1644 case EAGAIN:
1645 case ETIMEDOUT:
1646 case ENOLCK:
1647 case EINTR:
1648 case EIO:
1649 return FsRef::fsErrTemporaryNotAccessible;
1650 //no space left on device
1651 case ENFILE:
1652 case EDQUOT:
1653 #ifdef ENOSR
1654 case ENOSR:
1655 #endif
1656 case ENOSPC:
1657 case EFBIG:
1658 return FsRef::fsErrNoSpaceLeftOnDevice;
1659 //none valid parameters
1660 case EINVAL:
1661 case EBADF:
1662 case ENAMETOOLONG:
1663 case EFAULT:
1664 case EISDIR:
1665 case ENOTDIR:
1666 case EEXIST:
1667 case ETXTBSY:
1668 return FsRef::fsErrInvalidParameters;
1669 //environment error
1670 case ELOOP:
1671 #ifdef ENOLINK
1672 case ENOLINK:
1673 #endif
1674 #ifdef EMULTIHOP
1675 case EMULTIHOP:
1676 #endif
1677 #ifdef EOPNOTSUPP
1678 case EOPNOTSUPP:
1679 #endif
1680 #ifdef ESPIPE
1681 case ESPIPE:
1682 #endif
1683 case EPIPE:
1684 return FsRef::fsErrEnvironmentError;
1685
1686 //no more process resources
1687 case EMFILE:
1688 case ENOMEM:
1689 return FsRef::fsErrNoMoreResources;
1690 //no file
1691 case ENOENT:
1692 return FsRef::fsErrFileDoesNotExist;
1693
1694 case ERR_ReadUnderflow:
1695 return FsRef::fsErrReadUnderflow;
1696
1697 default:
1698 return FsRef::fsErrUnknown;
1699 }
1700 }
1701 #endif
1702
1703
1704
1705 void
execCONTINUEB(Signal * signal)1706 Ndbfs::execCONTINUEB(Signal* signal)
1707 {
1708 jamEntry();
1709 if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
1710 jam();
1711
1712 // Also send CONTINUEB to ourself in order to scan for
1713 // incoming answers from AsyncFile on MemoryChannel theFromThreads
1714 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
1715 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
1716 if (scanningInProgress == true) {
1717 jam();
1718 return;
1719 }
1720 }
1721 if (scanIPC(signal))
1722 {
1723 jam();
1724 scanningInProgress = true;
1725 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1726 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1727 }
1728 else
1729 {
1730 jam();
1731 scanningInProgress = false;
1732 }
1733 return;
1734 }
1735
1736 void
execSEND_PACKED(Signal * signal)1737 Ndbfs::execSEND_PACKED(Signal* signal)
1738 {
1739 jamEntryDebug();
1740 if (scanIPC(signal))
1741 {
1742 if (scanningInProgress == false)
1743 {
1744 jam();
1745 scanningInProgress = true;
1746 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
1747 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
1748 }
1749 signal->theData[0] = 1;
1750 return;
1751 }
1752 if (scanningInProgress == false)
1753 signal->theData[0] = 0;
1754 else
1755 signal->theData[0] = 1;
1756 }
1757
1758 void
execDUMP_STATE_ORD(Signal * signal)1759 Ndbfs::execDUMP_STATE_ORD(Signal* signal)
1760 {
1761 if(signal->theData[0] == 19){
1762 return;
1763 }
1764 if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
1765 infoEvent("NDBFS: Files: %d Open files: %d",
1766 theFiles.size(),
1767 theOpenFiles.size());
1768 infoEvent(" Idle files: %u Max opened files: %d",
1769 theIdleFiles.size(),
1770 m_maxOpenedFiles);
1771 infoEvent(" Bound Threads: %u (active %u) Unbound threads: %u",
1772 m_bound_threads_cnt,
1773 m_active_bound_threads_cnt,
1774 m_unbounds_threads_cnt);
1775 infoEvent(" Max files: %d",
1776 m_maxFiles);
1777 infoEvent(" Requests: %d",
1778 theRequestPool->size());
1779
1780 return;
1781 }
1782 if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
1783 infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
1784
1785 for (unsigned i = 0; i < theOpenFiles.size(); i++){
1786 AsyncFile* file = theOpenFiles.getFile(i);
1787 infoEvent("%2d (0x%lx): %s thr: %lx", i,
1788 (long)file,
1789 file->theFileName.c_str(),
1790 (long)file->getThread());
1791 }
1792 return;
1793 }
1794 if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
1795 infoEvent("NDBFS: Dump all files: %d", theFiles.size());
1796
1797 for (unsigned i = 0; i < theFiles.size(); i++){
1798 AsyncFile* file = theFiles[i];
1799 infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1800 }
1801 return;
1802 }
1803 if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
1804 infoEvent("NDBFS: Dump idle files: %u",
1805 theIdleFiles.size());
1806
1807 for (unsigned i = 0; i < theIdleFiles.size(); i++){
1808 AsyncFile* file = theIdleFiles[i];
1809 infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
1810 }
1811
1812 return;
1813 }
1814 if(signal->theData[0] == DumpStateOrd::NdbfsDumpRequests)
1815 {
1816 g_eventLogger->info("NDBFS: Dump requests: %u",
1817 theRequestPool->inuse());
1818 for (unsigned ridx=0; ridx < theRequestPool->inuse(); ridx++)
1819 {
1820 const Request* req = theRequestPool->peekInuseItem(ridx);
1821 Uint64 duration = 0;
1822
1823 if (NdbTick_IsValid(req->m_startTime))
1824 {
1825 duration = NdbTick_Elapsed(req->m_startTime,
1826 getHighResTimer()).microSec();
1827 }
1828
1829 g_eventLogger->info("Request %u action %u %s userRef 0x%x "
1830 "userPtr %u filePtr %u bind %u "
1831 "duration(us) %llu filename %s",
1832 ridx,
1833 req->action,
1834 Request::actionName(req->action),
1835 req->theUserReference,
1836 req->theUserPointer,
1837 req->theFilePointer,
1838 req->m_do_bind,
1839 duration,
1840 (req->file?
1841 req->file->theFileName.c_str():
1842 "NO FILE"));
1843 }
1844 return;
1845 }
1846
1847
1848 if(signal->theData[0] == 404)
1849 {
1850 #if 0
1851 ndbrequire(signal->getLength() == 2);
1852 Uint32 file= signal->theData[1];
1853 AsyncFile* openFile = theOpenFiles.find(file);
1854 ndbrequire(openFile != 0);
1855 ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile);
1856 Request* curr = openFile->m_current_request;
1857 Request* last = openFile->m_last_request;
1858 if(curr)
1859 ndbout << "Current request: " << *curr << endl;
1860 if(last)
1861 ndbout << "Last request: " << *last << endl;
1862
1863 ndbout << "theReportTo " << *openFile->theReportTo << endl;
1864 ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl;
1865
1866 ndbout << "All files: " << endl;
1867 for (unsigned i = 0; i < theFiles.size(); i++){
1868 AsyncFile* file = theFiles[i];
1869 ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
1870 }
1871 #endif
1872 }
1873
1874 if(signal->theData[0] == 405)
1875 {
1876 for (unsigned i = 0; i < theFiles.size(); i++)
1877 {
1878 AsyncFile* file = theFiles[i];
1879 if (file == 0)
1880 continue;
1881 ndbout_c("%u : %s %s fileInfo=%08x", i,
1882 file->theFileName.c_str() ? file->theFileName.c_str() : "",
1883 file->isOpen() ? "OPEN" : "CLOSED",
1884 file->get_fileinfo());
1885 }
1886 }
1887 }//Ndbfs::execDUMP_STATE_ORD()
1888
1889 const char*
get_filename(Uint32 fd) const1890 Ndbfs::get_filename(Uint32 fd) const
1891 {
1892 jamNoBlock();
1893 const AsyncFile* openFile = theOpenFiles.find(fd);
1894 if(openFile)
1895 return openFile->theFileName.get_base_name();
1896 return "";
1897 }
1898
1899
// Block boilerplate for Ndbfs (macro defined elsewhere -- presumably
// generates the block's standard function definitions; see its definition).
BLOCK_FUNCTIONS(Ndbfs)

// Explicit template instantiations for the container/channel/pool types
// this block uses.
template class Vector<AsyncFile*>;
template class Vector<AsyncIoThread*>;
template class Vector<OpenFiles::OpenFileItem>;
template class MemoryChannel<Request>;
template class Pool<Request>;
template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
1908