1 /*
2 Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include "lgman.hpp"
20 #include "diskpage.hpp"
21 #include <signaldata/FsRef.hpp>
22 #include <signaldata/FsConf.hpp>
23 #include <signaldata/FsOpenReq.hpp>
24 #include <signaldata/FsCloseReq.hpp>
25 #include <signaldata/CreateFilegroupImpl.hpp>
26 #include <signaldata/DropFilegroupImpl.hpp>
27 #include <signaldata/FsReadWriteReq.hpp>
28 #include <signaldata/LCP.hpp>
29 #include <signaldata/SumaImpl.hpp>
30 #include <signaldata/LgmanContinueB.hpp>
31 #include <signaldata/GetTabInfo.hpp>
32 #include "ndbfs/Ndbfs.hpp"
33 #include "dbtup/Dbtup.hpp"
34
35 #include <EventLogger.hpp>
36 extern EventLogger g_eventLogger;
37
38 #include <record_types.hpp>
39
40 /**
41 * ---<a>-----<b>-----<c>-----<d>---> (time)
42 *
43 * <a> = start of lcp 1
44 * <b> = stop of lcp 1
45 * <c> = start of lcp 2
46 * <d> = stop of lcp 2
47 *
48 * If ndb crashes before <d>
49 * the entire undo log from crash point until <a> has to be applied
50 *
51 * at <d> the undo log can be cut til <c>
52 */
53
54 #define DEBUG_UNDO_EXECUTION 0
55 #define DEBUG_SEARCH_LOG_HEAD 0
56
Lgman(Block_context & ctx)57 Lgman::Lgman(Block_context & ctx) :
58 SimulatedBlock(LGMAN, ctx),
59 m_logfile_group_list(m_logfile_group_pool),
60 m_logfile_group_hash(m_logfile_group_pool)
61 {
62 BLOCK_CONSTRUCTOR(Lgman);
63
64 // Add received signals
65 addRecSignal(GSN_STTOR, &Lgman::execSTTOR);
66 addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
67 addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
68 addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
69
70 addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ);
71 addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ);
72
73 addRecSignal(GSN_DROP_FILE_REQ, &Lgman::execDROP_FILE_REQ);
74 addRecSignal(GSN_DROP_FILEGROUP_REQ, &Lgman::execDROP_FILEGROUP_REQ);
75
76 addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ);
77 addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true);
78 addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF);
79
80 addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true);
81 addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF);
82
83 addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF);
84
85 addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true);
86 addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF);
87
88 addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD);
89 addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ);
90 addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
91 addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
92
93 addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF);
94
95 addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ);
96
97 m_last_lsn = 1;
98 m_logfile_group_hash.setSize(10);
99 }
100
~Lgman()101 Lgman::~Lgman()
102 {
103 }
104
BLOCK_FUNCTIONS(Lgman)105 BLOCK_FUNCTIONS(Lgman)
106
107 void
108 Lgman::execREAD_CONFIG_REQ(Signal* signal)
109 {
110 jamEntry();
111
112 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
113
114 Uint32 ref = req->senderRef;
115 Uint32 senderData = req->senderData;
116
117 const ndb_mgm_configuration_iterator * p =
118 m_ctx.m_config.getOwnConfigIterator();
119 ndbrequire(p != 0);
120
121 Pool_context pc;
122 pc.m_block = this;
123 m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
124 m_file_pool.init(RT_LGMAN_FILE, pc);
125 m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc);
126 m_data_buffer_pool.setSize(10);
127
128 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
129 conf->senderRef = reference();
130 conf->senderData = senderData;
131 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
132 ReadConfigConf::SignalLength, JBB);
133 }
134
135 void
execSTTOR(Signal * signal)136 Lgman::execSTTOR(Signal* signal)
137 {
138 jamEntry();
139 sendSTTORRY(signal);
140
141 return;
142 }//Lgman::execNDB_STTOR()
143
144 void
sendSTTORRY(Signal * signal)145 Lgman::sendSTTORRY(Signal* signal)
146 {
147 signal->theData[0] = 0;
148 signal->theData[3] = 1;
149 signal->theData[4] = 2;
150 signal->theData[5] = 3;
151 signal->theData[6] = 4;
152 signal->theData[7] = 5;
153 signal->theData[8] = 6;
154 signal->theData[9] = 255; // No more start phases from missra
155 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
156 }
157
158 void
execCONTINUEB(Signal * signal)159 Lgman::execCONTINUEB(Signal* signal){
160 jamEntry();
161
162 Uint32 type= signal->theData[0];
163 Uint32 ptrI = signal->theData[1];
164 switch(type){
165 case LgmanContinueB::FILTER_LOG:
166 jam();
167 break;
168 case LgmanContinueB::CUT_LOG_TAIL:
169 {
170 jam();
171 Ptr<Logfile_group> ptr;
172 m_logfile_group_pool.getPtr(ptr, ptrI);
173 cut_log_tail(signal, ptr);
174 return;
175 }
176 case LgmanContinueB::FLUSH_LOG:
177 {
178 jam();
179 Ptr<Logfile_group> ptr;
180 m_logfile_group_pool.getPtr(ptr, ptrI);
181 flush_log(signal, ptr, signal->theData[2]);
182 return;
183 }
184 case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
185 {
186 jam();
187 Ptr<Logfile_group> ptr;
188 m_logfile_group_pool.getPtr(ptr, ptrI);
189 process_log_buffer_waiters(signal, ptr);
190 return;
191 }
192 case LgmanContinueB::FIND_LOG_HEAD:
193 jam();
194 Ptr<Logfile_group> ptr;
195 if(ptrI != RNIL)
196 {
197 m_logfile_group_pool.getPtr(ptr, ptrI);
198 find_log_head(signal, ptr);
199 }
200 else
201 {
202 init_run_undo_log(signal);
203 }
204 return;
205 case LgmanContinueB::EXECUTE_UNDO_RECORD:
206 jam();
207 execute_undo_record(signal);
208 return;
209 case LgmanContinueB::STOP_UNDO_LOG:
210 jam();
211 stop_run_undo_log(signal);
212 return;
213 case LgmanContinueB::READ_UNDO_LOG:
214 {
215 jam();
216 Ptr<Logfile_group> ptr;
217 m_logfile_group_pool.getPtr(ptr, ptrI);
218 read_undo_log(signal, ptr);
219 return;
220 }
221 case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
222 {
223 jam();
224 Ptr<Logfile_group> ptr;
225 m_logfile_group_pool.getPtr(ptr, ptrI);
226 process_log_sync_waiters(signal, ptr);
227 return;
228 }
229 case LgmanContinueB::FORCE_LOG_SYNC:
230 {
231 jam();
232 Ptr<Logfile_group> ptr;
233 m_logfile_group_pool.getPtr(ptr, ptrI);
234 force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
235 return;
236 }
237 case LgmanContinueB::DROP_FILEGROUP:
238 {
239 jam();
240 Ptr<Logfile_group> ptr;
241 m_logfile_group_pool.getPtr(ptr, ptrI);
242 if (ptr.p->m_state & Logfile_group::LG_THREAD_MASK)
243 {
244 jam();
245 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
246 signal->length());
247 return;
248 }
249 Uint32 ref = signal->theData[2];
250 Uint32 data = signal->theData[3];
251 drop_filegroup_drop_files(signal, ptr, ref, data);
252 return;
253 }
254 }
255 }
256
257 void
execDUMP_STATE_ORD(Signal * signal)258 Lgman::execDUMP_STATE_ORD(Signal* signal){
259 jamEntry();
260 if(signal->theData[0] == 12001)
261 {
262 Ptr<Logfile_group> ptr;
263 m_logfile_group_list.first(ptr);
264 while(!ptr.isNull())
265 {
266 infoEvent("lfg %d state: %x fs: %d lsn "
267 "[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d",
268 ptr.p->m_logfile_group_id, ptr.p->m_state,
269 ptr.p->m_outstanding_fs,
270 ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
271 ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
272 !ptr.p->m_log_buffer_waiters.isEmpty(),
273 !ptr.p->m_log_sync_waiters.isEmpty());
274 if (!ptr.p->m_log_buffer_waiters.isEmpty())
275 {
276 Ptr<Log_waiter> waiter;
277 Local_log_waiter_list
278 list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
279 list.first(waiter);
280 infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d",
281 ptr.p->m_free_buffer_words,
282 waiter.p->m_size,
283 2*File_formats::UNDO_PAGE_WORDS);
284 }
285 if (!ptr.p->m_log_sync_waiters.isEmpty())
286 {
287 Ptr<Log_waiter> waiter;
288 Local_log_waiter_list
289 list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
290 list.first(waiter);
291 infoEvent(" m_last_synced_lsn: %lld head(waiters %x).m_sync_lsn: %lld",
292 ptr.p->m_last_synced_lsn,
293 waiter.i,
294 waiter.p->m_sync_lsn);
295
296 while(!waiter.isNull())
297 {
298 ndbout_c("ptr: %x %p lsn: %lld next: %x",
299 waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
300 list.next(waiter);
301 }
302 }
303 m_logfile_group_list.next(ptr);
304 }
305 }
306 }
307
308 void
execCREATE_FILEGROUP_REQ(Signal * signal)309 Lgman::execCREATE_FILEGROUP_REQ(Signal* signal){
310 jamEntry();
311 CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr();
312
313 Uint32 senderRef = req->senderRef;
314 Uint32 senderData = req->senderData;
315
316 Ptr<Logfile_group> ptr;
317 CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError;
318 do {
319 if (m_logfile_group_hash.find(ptr, req->filegroup_id))
320 {
321 jam();
322 err = CreateFilegroupImplRef::FilegroupAlreadyExists;
323 break;
324 }
325
326 if (!m_logfile_group_list.isEmpty())
327 {
328 jam();
329 err = CreateFilegroupImplRef::OneLogfileGroupLimit;
330 break;
331 }
332
333 if (!m_logfile_group_pool.seize(ptr))
334 {
335 jam();
336 err = CreateFilegroupImplRef::OutOfFilegroupRecords;
337 break;
338 }
339
340 new (ptr.p) Logfile_group(req);
341
342 if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size))
343 {
344 jam();
345 err= CreateFilegroupImplRef::OutOfLogBufferMemory;
346 m_logfile_group_pool.release(ptr);
347 break;
348 }
349
350 m_logfile_group_hash.add(ptr);
351 m_logfile_group_list.add(ptr);
352
353 if (getNodeState().getNodeRestartInProgress() ||
354 getNodeState().getSystemRestartInProgress())
355 {
356 ptr.p->m_state = Logfile_group::LG_STARTING;
357 }
358
359 CreateFilegroupImplConf* conf=
360 (CreateFilegroupImplConf*)signal->getDataPtr();
361 conf->senderData = senderData;
362 conf->senderRef = reference();
363 sendSignal(senderRef, GSN_CREATE_FILEGROUP_CONF, signal,
364 CreateFilegroupImplConf::SignalLength, JBB);
365
366 return;
367 } while(0);
368
369 CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr();
370 ref->senderData = senderData;
371 ref->senderRef = reference();
372 ref->errorCode = err;
373 sendSignal(senderRef, GSN_CREATE_FILEGROUP_REF, signal,
374 CreateFilegroupImplRef::SignalLength, JBB);
375 }
376
377 void
execDROP_FILEGROUP_REQ(Signal * signal)378 Lgman::execDROP_FILEGROUP_REQ(Signal* signal)
379 {
380 jamEntry();
381
382 Uint32 errorCode = 0;
383 DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr();
384 do
385 {
386 Ptr<Logfile_group> ptr;
387 if (!m_logfile_group_hash.find(ptr, req.filegroup_id))
388 {
389 errorCode = DropFilegroupImplRef::NoSuchFilegroup;
390 break;
391 }
392
393 if (ptr.p->m_version != req.filegroup_version)
394 {
395 errorCode = DropFilegroupImplRef::InvalidFilegroupVersion;
396 break;
397 }
398
399 switch(req.requestInfo){
400 case DropFilegroupImplReq::Prepare:
401 break;
402 case DropFilegroupImplReq::Commit:
403 m_logfile_group_list.remove(ptr);
404 ptr.p->m_state |= Logfile_group::LG_DROPPING;
405 signal->theData[0] = LgmanContinueB::DROP_FILEGROUP;
406 signal->theData[1] = ptr.i;
407 signal->theData[2] = req.senderRef;
408 signal->theData[3] = req.senderData;
409 sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
410 return;
411 case DropFilegroupImplReq::Abort:
412 break;
413 default:
414 ndbrequire(false);
415 }
416 } while(0);
417
418 if (errorCode)
419 {
420 DropFilegroupImplRef* ref =
421 (DropFilegroupImplRef*)signal->getDataPtrSend();
422 ref->senderRef = reference();
423 ref->senderData = req.senderData;
424 ref->errorCode = errorCode;
425 sendSignal(req.senderRef, GSN_DROP_FILEGROUP_REF, signal,
426 DropFilegroupImplRef::SignalLength, JBB);
427 }
428 else
429 {
430 DropFilegroupImplConf* conf =
431 (DropFilegroupImplConf*)signal->getDataPtrSend();
432 conf->senderRef = reference();
433 conf->senderData = req.senderData;
434 sendSignal(req.senderRef, GSN_DROP_FILEGROUP_CONF, signal,
435 DropFilegroupImplConf::SignalLength, JBB);
436 }
437 }
438
439 void
drop_filegroup_drop_files(Signal * signal,Ptr<Logfile_group> ptr,Uint32 ref,Uint32 data)440 Lgman::drop_filegroup_drop_files(Signal* signal,
441 Ptr<Logfile_group> ptr,
442 Uint32 ref, Uint32 data)
443 {
444 jam();
445 ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
446 ndbrequire(ptr.p->m_outstanding_fs == 0);
447
448 Local_undofile_list list(m_file_pool, ptr.p->m_files);
449 Ptr<Undofile> file_ptr;
450
451 if (list.first(file_ptr))
452 {
453 jam();
454 ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING));
455 file_ptr.p->m_create.m_senderRef = ref;
456 file_ptr.p->m_create.m_senderData = data;
457 create_file_abort(signal, ptr, file_ptr);
458 return;
459 }
460
461 Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files);
462 if (metalist.first(file_ptr))
463 {
464 jam();
465 metalist.remove(file_ptr);
466 list.add(file_ptr);
467 file_ptr.p->m_create.m_senderRef = ref;
468 file_ptr.p->m_create.m_senderData = data;
469 create_file_abort(signal, ptr, file_ptr);
470 return;
471 }
472
473 free_logbuffer_memory(ptr);
474 m_logfile_group_hash.release(ptr);
475 DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr();
476 conf->senderData = data;
477 conf->senderRef = reference();
478 sendSignal(ref, GSN_DROP_FILEGROUP_CONF, signal,
479 DropFilegroupImplConf::SignalLength, JBB);
480 }
481
482 void
execCREATE_FILE_REQ(Signal * signal)483 Lgman::execCREATE_FILE_REQ(Signal* signal)
484 {
485 jamEntry();
486 CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr();
487
488 Uint32 senderRef = req->senderRef;
489 Uint32 senderData = req->senderData;
490 Uint32 requestInfo = req->requestInfo;
491
492 Ptr<Logfile_group> ptr;
493 CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError;
494 do {
495 if (!m_logfile_group_hash.find(ptr, req->filegroup_id))
496 {
497 jam();
498 err = CreateFileImplRef::InvalidFilegroup;
499 break;
500 }
501
502 if (ptr.p->m_version != req->filegroup_version)
503 {
504 jam();
505 err = CreateFileImplRef::InvalidFilegroupVersion;
506 break;
507 }
508
509 Ptr<Undofile> file_ptr;
510 switch(requestInfo){
511 case CreateFileImplReq::Commit:
512 {
513 jam();
514 ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id));
515 file_ptr.p->m_create.m_senderRef = req->senderRef;
516 file_ptr.p->m_create.m_senderData = req->senderData;
517 create_file_commit(signal, ptr, file_ptr);
518 return;
519 }
520 case CreateFileImplReq::Abort:
521 {
522 Uint32 senderRef = req->senderRef;
523 Uint32 senderData = req->senderData;
524 if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id))
525 {
526 jam();
527 file_ptr.p->m_create.m_senderRef = senderRef;
528 file_ptr.p->m_create.m_senderData = senderData;
529 create_file_abort(signal, ptr, file_ptr);
530 }
531 else
532 {
533 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
534 jam();
535 conf->senderData = senderData;
536 conf->senderRef = reference();
537 sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
538 CreateFileImplConf::SignalLength, JBB);
539 }
540 return;
541 }
542 default: // prepare
543 break;
544 }
545
546 if (!m_file_pool.seize(file_ptr))
547 {
548 jam();
549 err = CreateFileImplRef::OutOfFileRecords;
550 break;
551 }
552
553 if(ERROR_INSERTED(15000) ||
554 (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
555 {
556 jam();
557 if(signal->getNoOfSections())
558 releaseSections(signal);
559
560 CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
561 ref->senderData = senderData;
562 ref->senderRef = reference();
563 ref->errorCode = CreateFileImplRef::FileSizeTooLarge;
564 sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
565 CreateFileImplRef::SignalLength, JBB);
566 return;
567 }
568
569 new (file_ptr.p) Undofile(req, ptr.i);
570
571 Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
572 tmp.add(file_ptr);
573
574 open_file(signal, file_ptr, req->requestInfo);
575 return;
576 } while(0);
577
578 CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
579 ref->senderData = senderData;
580 ref->senderRef = reference();
581 ref->errorCode = err;
582 sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
583 CreateFileImplRef::SignalLength, JBB);
584 }
585
586 void
open_file(Signal * signal,Ptr<Undofile> ptr,Uint32 requestInfo)587 Lgman::open_file(Signal* signal, Ptr<Undofile> ptr, Uint32 requestInfo)
588 {
589 FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend();
590 req->userReference = reference();
591 req->userPointer = ptr.i;
592
593 memset(req->fileNumber, 0, sizeof(req->fileNumber));
594 FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename
595
596 req->fileFlags = 0;
597 req->fileFlags |= FsOpenReq::OM_READWRITE;
598 req->fileFlags |= FsOpenReq::OM_DIRECT;
599 req->fileFlags |= FsOpenReq::OM_SYNC;
600 switch(requestInfo){
601 case CreateFileImplReq::Create:
602 req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
603 req->fileFlags |= FsOpenReq::OM_INIT;
604 ptr.p->m_state = Undofile::FS_CREATING;
605 break;
606 case CreateFileImplReq::CreateForce:
607 req->fileFlags |= FsOpenReq::OM_CREATE;
608 req->fileFlags |= FsOpenReq::OM_INIT;
609 ptr.p->m_state = Undofile::FS_CREATING;
610 break;
611 case CreateFileImplReq::Open:
612 req->fileFlags |= FsOpenReq::OM_CHECK_SIZE;
613 ptr.p->m_state = Undofile::FS_OPENING;
614 break;
615 default:
616 ndbrequire(false);
617 }
618
619 req->page_size = File_formats::NDB_PAGE_SIZE;
620 Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE;
621 req->file_size_hi = size >> 32;
622 req->file_size_lo = size & 0xFFFFFFFF;
623
624 // Forward filename
625 sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB);
626 }
627
628 void
execFSWRITEREQ(Signal * signal)629 Lgman::execFSWRITEREQ(Signal* signal)
630 {
631 jamEntry();
632 Ptr<Undofile> ptr;
633 Ptr<GlobalPage> page_ptr;
634 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
635
636 m_file_pool.getPtr(ptr, req->userPointer);
637 m_global_page_pool.getPtr(page_ptr, req->data.pageData[0]);
638
639 if (req->varIndex == 0)
640 {
641 jam();
642 File_formats::Undofile::Zero_page* page =
643 (File_formats::Undofile::Zero_page*)page_ptr.p;
644 page->m_page_header.init(File_formats::FT_Undofile,
645 getOwnNodeId(),
646 ndbGetOwnVersion(),
647 time(0));
648 page->m_file_id = ptr.p->m_file_id;
649 page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id;
650 page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version;
651 page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page
652 }
653 else
654 {
655 jam();
656 File_formats::Undofile::Undo_page* page =
657 (File_formats::Undofile::Undo_page*)page_ptr.p;
658 page->m_page_header.m_page_lsn_hi = 0;
659 page->m_page_header.m_page_lsn_lo = 0;
660 page->m_page_header.m_page_type = File_formats::PT_Undopage;
661 page->m_words_used = 0;
662 }
663 }
664
665 void
execFSOPENREF(Signal * signal)666 Lgman::execFSOPENREF(Signal* signal)
667 {
668 jamEntry();
669
670 Ptr<Undofile> ptr;
671 Ptr<Logfile_group> lg_ptr;
672 FsRef* ref = (FsRef*)signal->getDataPtr();
673
674 Uint32 errCode = ref->errorCode;
675 Uint32 osErrCode = ref->osErrorCode;
676
677 m_file_pool.getPtr(ptr, ref->userPointer);
678 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
679
680 {
681 CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
682 ref->senderData = ptr.p->m_create.m_senderData;
683 ref->senderRef = reference();
684 ref->errorCode = CreateFileImplRef::FileError;
685 ref->fsErrCode = errCode;
686 ref->osErrCode = osErrCode;
687 sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_REF, signal,
688 CreateFileImplRef::SignalLength, JBB);
689 }
690
691 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
692 meta.release(ptr);
693 }
694
695 #define HEAD 0
696 #define TAIL 1
697
698 void
execFSOPENCONF(Signal * signal)699 Lgman::execFSOPENCONF(Signal* signal)
700 {
701 jamEntry();
702 Ptr<Undofile> ptr;
703
704 FsConf* conf = (FsConf*)signal->getDataPtr();
705
706 Uint32 fd = conf->filePointer;
707 m_file_pool.getPtr(ptr, conf->userPointer);
708
709 ptr.p->m_fd = fd;
710
711 {
712 Uint32 senderRef = ptr.p->m_create.m_senderRef;
713 Uint32 senderData = ptr.p->m_create.m_senderData;
714
715 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
716 conf->senderData = senderData;
717 conf->senderRef = reference();
718 sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
719 CreateFileImplConf::SignalLength, JBB);
720 }
721 }
722
723 bool
find_file_by_id(Ptr<Undofile> & ptr,Local_undofile_list::Head & head,Uint32 id)724 Lgman::find_file_by_id(Ptr<Undofile>& ptr,
725 Local_undofile_list::Head& head, Uint32 id)
726 {
727 Local_undofile_list list(m_file_pool, head);
728 for(list.first(ptr); !ptr.isNull(); list.next(ptr))
729 if(ptr.p->m_file_id == id)
730 return true;
731 return false;
732 }
733
734 void
create_file_commit(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)735 Lgman::create_file_commit(Signal* signal,
736 Ptr<Logfile_group> lg_ptr,
737 Ptr<Undofile> ptr)
738 {
739 Uint32 senderRef = ptr.p->m_create.m_senderRef;
740 Uint32 senderData = ptr.p->m_create.m_senderData;
741
742 bool first= false;
743 if(ptr.p->m_state == Undofile::FS_CREATING &&
744 (lg_ptr.p->m_state & Logfile_group::LG_ONLINE))
745 {
746 jam();
747 Local_undofile_list free(m_file_pool, lg_ptr.p->m_files);
748 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
749 first= free.isEmpty();
750 meta.remove(ptr);
751 if(!first)
752 {
753 /**
754 * Add log file next after current head
755 */
756 Ptr<Undofile> curr;
757 m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i);
758 if(free.next(curr))
759 free.insert(ptr, curr); // inserts before (that's why the extra next)
760 else
761 free.add(ptr);
762
763 ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY;
764 }
765 else
766 {
767 /**
768 * First file isn't empty as it can be written to at any time
769 */
770 free.add(ptr);
771 ptr.p->m_state = Undofile::FS_ONLINE;
772 lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
773 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
774 signal->theData[1] = lg_ptr.i;
775 signal->theData[2] = 0;
776 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
777 }
778 }
779 else
780 {
781 ptr.p->m_state = Undofile::FS_SORTING;
782 }
783
784 ptr.p->m_online.m_lsn = 0;
785 ptr.p->m_online.m_outstanding = 0;
786
787 Uint64 add= ptr.p->m_file_size - 1;
788 lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS;
789
790 if(first)
791 {
792 jam();
793
794 Buffer_idx tmp= { ptr.i, 0 };
795 lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp;
796
797 /**
798 * Init log tail pointer
799 */
800 lg_ptr.p->m_tail_pos[0] = tmp;
801 lg_ptr.p->m_tail_pos[1] = tmp;
802 lg_ptr.p->m_tail_pos[2] = tmp;
803 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
804 }
805
806 validate_logfile_group(lg_ptr, "create_file_commit");
807
808 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
809 conf->senderData = senderData;
810 conf->senderRef = reference();
811 sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
812 CreateFileImplConf::SignalLength, JBB);
813 }
814
815 void
create_file_abort(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)816 Lgman::create_file_abort(Signal* signal,
817 Ptr<Logfile_group> lg_ptr,
818 Ptr<Undofile> ptr)
819 {
820 if (ptr.p->m_fd == RNIL)
821 {
822 ((FsConf*)signal->getDataPtr())->userPointer = ptr.i;
823 execFSCLOSECONF(signal);
824 return;
825 }
826
827 FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend();
828 req->filePointer = ptr.p->m_fd;
829 req->userReference = reference();
830 req->userPointer = ptr.i;
831 req->fileFlag = 0;
832 FsCloseReq::setRemoveFileFlag(req->fileFlag, true);
833
834 sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
835 FsCloseReq::SignalLength, JBB);
836 }
837
838 void
execFSCLOSECONF(Signal * signal)839 Lgman::execFSCLOSECONF(Signal* signal)
840 {
841 Ptr<Undofile> ptr;
842 Ptr<Logfile_group> lg_ptr;
843 Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer;
844 m_file_pool.getPtr(ptr, ptrI);
845
846 Uint32 senderRef = ptr.p->m_create.m_senderRef;
847 Uint32 senderData = ptr.p->m_create.m_senderData;
848
849 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
850
851 if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING)
852 {
853 jam();
854 {
855 Local_undofile_list list(m_file_pool, lg_ptr.p->m_files);
856 list.release(ptr);
857 }
858 drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData);
859 }
860 else
861 {
862 jam();
863 Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files);
864 list.release(ptr);
865
866 CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
867 conf->senderData = senderData;
868 conf->senderRef = reference();
869 sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
870 CreateFileImplConf::SignalLength, JBB);
871 }
872 }
873
874 void
execDROP_FILE_REQ(Signal * signal)875 Lgman::execDROP_FILE_REQ(Signal* signal)
876 {
877 jamEntry();
878 ndbrequire(false);
879 }
880
881 #define CONSUMER 0
882 #define PRODUCER 1
883
Logfile_group(const CreateFilegroupImplReq * req)884 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
885 {
886 m_logfile_group_id = req->filegroup_id;
887 m_version = req->filegroup_version;
888 m_state = LG_ONLINE;
889 m_outstanding_fs = 0;
890 m_next_reply_ptr_i = RNIL;
891
892 m_last_lsn = 0;
893 m_last_synced_lsn = 0;
894 m_last_sync_req_lsn = 0;
895 m_max_sync_req_lsn = 0;
896 m_last_read_lsn = 0;
897 m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL;
898
899 m_free_file_words = 0;
900 m_free_buffer_words = 0;
901 m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
902 m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
903 m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
904 m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
905
906 m_tail_pos[2].m_ptr_i= RNIL;
907 m_tail_pos[2].m_idx= ~0;
908
909 m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2];
910 }
911
912 bool
alloc_logbuffer_memory(Ptr<Logfile_group> ptr,Uint32 bytes)913 Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
914 {
915 Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1)
916 / File_formats::NDB_PAGE_SIZE_WORDS;
917 Uint32 requested= pages;
918 {
919 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
920 while(pages)
921 {
922 Uint32 ptrI;
923 Uint32 cnt = pages > 64 ? 64 : pages;
924 m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
925 if (cnt)
926 {
927 Buffer_idx range;
928 range.m_ptr_i= ptrI;
929 range.m_idx = cnt;
930
931 ndbrequire(map.append((Uint32*)&range, 2));
932 pages -= range.m_idx;
933 }
934 else
935 {
936 break;
937 }
938 }
939 }
940
941 if(2*pages > requested)
942 {
943 // less than half allocated
944 free_logbuffer_memory(ptr);
945 return false;
946 }
947
948 if(pages != 0)
949 {
950 warningEvent("Allocated %d pages for log buffer space, logfile_group: %d"
951 " , requested %d pages",
952 (requested-pages), ptr.p->m_logfile_group_id, requested);
953 }
954
955 init_logbuffer_pointers(ptr);
956 return true;
957 }
958
959 void
init_logbuffer_pointers(Ptr<Logfile_group> ptr)960 Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr)
961 {
962 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
963 Page_map::Iterator it;
964 union {
965 Uint32 tmp[2];
966 Buffer_idx range;
967 };
968
969 map.first(it);
970 tmp[0] = *it.data;
971 ndbrequire(map.next(it));
972 tmp[1] = *it.data;
973
974 ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0; // Index in page map
975 ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
976 ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
977 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0; // Page pos
978
979 ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0; // Index in page map
980 ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
981 ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
982 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; // Page pos
983
984 Uint32 pages= range.m_idx;
985 while(map.next(it))
986 {
987 tmp[0] = *it.data;
988 ndbrequire(map.next(it));
989 tmp[1] = *it.data;
990 pages += range.m_idx;
991 }
992
993 ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS;
994 }
995
996 Uint32
compute_free_file_pages(Ptr<Logfile_group> ptr)997 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr)
998 {
999 Buffer_idx head= ptr.p->m_file_pos[HEAD];
1000 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1001 Uint32 pages = 0;
1002 if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)
1003 {
1004 pages += tail.m_idx - head.m_idx;
1005 }
1006 else
1007 {
1008 Ptr<Undofile> file;
1009 m_file_pool.getPtr(file, head.m_ptr_i);
1010 Local_undofile_list list(m_file_pool, ptr.p->m_files);
1011
1012 do
1013 {
1014 pages += (file.p->m_file_size - head.m_idx - 1);
1015 if(!list.next(file))
1016 list.first(file);
1017 head.m_idx = 0;
1018 } while(file.i != tail.m_ptr_i);
1019
1020 pages += tail.m_idx - head.m_idx;
1021 }
1022 return pages;
1023 }
1024
1025 void
free_logbuffer_memory(Ptr<Logfile_group> ptr)1026 Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
1027 {
1028 union {
1029 Uint32 tmp[2];
1030 Buffer_idx range;
1031 };
1032
1033 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1034
1035 Page_map::Iterator it;
1036 map.first(it);
1037 while(!it.isNull())
1038 {
1039 tmp[0] = *it.data;
1040 ndbrequire(map.next(it));
1041 tmp[1] = *it.data;
1042
1043 m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
1044 map.next(it);
1045 }
1046 map.release();
1047 }
1048
Undofile(const struct CreateFileImplReq * req,Uint32 ptrI)1049 Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI)
1050 {
1051 m_fd = RNIL;
1052 m_file_id = req->file_id;
1053 m_logfile_group_ptr_i= ptrI;
1054
1055 Uint64 pages = req->file_size_hi;
1056 pages = (pages << 32) | req->file_size_lo;
1057 pages /= GLOBAL_PAGE_SIZE;
1058 m_file_size = pages;
1059
1060 m_create.m_senderRef = req->senderRef; // During META
1061 m_create.m_senderData = req->senderData; // During META
1062 m_create.m_logfile_group_id = req->filegroup_id;
1063 }
1064
Logfile_client(SimulatedBlock * block,Lgman * lgman,Uint32 logfile_group_id)1065 Logfile_client::Logfile_client(SimulatedBlock* block,
1066 Lgman* lgman, Uint32 logfile_group_id)
1067 {
1068 m_block= block->number();
1069 m_lgman= lgman;
1070 m_logfile_group_id= logfile_group_id;
1071 }
1072
1073 int
sync_lsn(Signal * signal,Uint64 lsn,Request * req,Uint32 flags)1074 Logfile_client::sync_lsn(Signal* signal,
1075 Uint64 lsn, Request* req, Uint32 flags)
1076 {
1077 Ptr<Lgman::Logfile_group> ptr;
1078 if(m_lgman->m_logfile_group_list.first(ptr))
1079 {
1080 if(ptr.p->m_last_synced_lsn >= lsn)
1081 {
1082 return 1;
1083 }
1084
1085 bool empty= false;
1086 Ptr<Lgman::Log_waiter> wait;
1087 {
1088 Lgman::Local_log_waiter_list
1089 list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1090
1091 empty= list.isEmpty();
1092 if(!list.seize(wait))
1093 return -1;
1094
1095 wait.p->m_block= m_block;
1096 wait.p->m_sync_lsn= lsn;
1097 memcpy(&wait.p->m_callback, &req->m_callback,
1098 sizeof(SimulatedBlock::Callback));
1099
1100 ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ?
1101 lsn : ptr.p->m_max_sync_req_lsn;
1102 }
1103
1104 if(ptr.p->m_last_sync_req_lsn < lsn &&
1105 ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD))
1106 {
1107 ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1108 signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1109 signal->theData[1] = ptr.i;
1110 signal->theData[2] = lsn >> 32;
1111 signal->theData[3] = lsn & 0xFFFFFFFF;
1112 m_lgman->sendSignalWithDelay(m_lgman->reference(),
1113 GSN_CONTINUEB, signal, 10, 4);
1114 }
1115 return 0;
1116 }
1117 return -1;
1118 }
1119
1120 void
force_log_sync(Signal * signal,Ptr<Logfile_group> ptr,Uint32 lsn_hi,Uint32 lsn_lo)1121 Lgman::force_log_sync(Signal* signal,
1122 Ptr<Logfile_group> ptr,
1123 Uint32 lsn_hi, Uint32 lsn_lo)
1124 {
1125 Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1126 Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
1127
1128 if(ptr.p->m_last_sync_req_lsn < force_lsn)
1129 {
1130 /**
1131 * Do force
1132 */
1133 Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos;
1134 GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1135
1136 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1137 if(pos.m_idx) // don't flush empty page...
1138 {
1139 Uint64 lsn= ptr.p->m_last_lsn - 1;
1140
1141 File_formats::Undofile::Undo_page* undo=
1142 (File_formats::Undofile::Undo_page*)page;
1143 undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
1144 undo->m_page_header.m_page_lsn_hi = lsn >> 32;
1145 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1146
1147 /**
1148 * Update free space with extra NOOP
1149 */
1150 ndbrequire(ptr.p->m_free_file_words >= free);
1151 ndbrequire(ptr.p->m_free_buffer_words > free);
1152 ptr.p->m_free_file_words -= free;
1153 ptr.p->m_free_buffer_words -= free;
1154
1155 validate_logfile_group(ptr, "force_log_sync");
1156
1157 next_page(ptr.p, PRODUCER);
1158 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1159 }
1160 }
1161
1162
1163
1164 Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn;
1165 if(max_req_lsn > force_lsn &&
1166 max_req_lsn > ptr.p->m_last_sync_req_lsn)
1167 {
1168 ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
1169 signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1170 signal->theData[1] = ptr.i;
1171 signal->theData[2] = max_req_lsn >> 32;
1172 signal->theData[3] = max_req_lsn & 0xFFFFFFFF;
1173 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4);
1174 }
1175 else
1176 {
1177 ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1178 }
1179 }
1180
1181 void
process_log_sync_waiters(Signal * signal,Ptr<Logfile_group> ptr)1182 Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1183 {
1184 Local_log_waiter_list
1185 list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1186
1187 if(list.isEmpty())
1188 {
1189 return;
1190 }
1191
1192 bool removed= false;
1193 Ptr<Log_waiter> waiter;
1194 list.first(waiter);
1195 Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1196
1197 if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
1198 {
1199 removed= true;
1200 Uint32 block = waiter.p->m_block;
1201 SimulatedBlock* b = globalData.getBlock(block);
1202 b->execute(signal, waiter.p->m_callback, logfile_group_id);
1203
1204 list.releaseFirst(waiter);
1205 }
1206
1207 if(removed && !list.isEmpty())
1208 {
1209 ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD;
1210 signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS;
1211 signal->theData[1] = ptr.i;
1212 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1213 }
1214 else
1215 {
1216 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD;
1217 }
1218 }
1219
1220
1221 Uint32*
get_log_buffer(Ptr<Logfile_group> ptr,Uint32 sz)1222 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
1223 {
1224 GlobalPage *page;
1225 page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
1226
1227 Uint32 total_free= ptr.p->m_free_buffer_words;
1228 assert(total_free >= sz);
1229 Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
1230 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
1231
1232 if(sz <= free)
1233 {
1234 next:
1235 // fits this page wo/ problem
1236 ndbrequire(total_free > sz);
1237 ptr.p->m_free_buffer_words = total_free - sz;
1238 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
1239 return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
1240 }
1241
1242 /**
1243 * It didn't fit page...fill page with a NOOP log entry
1244 */
1245 Uint64 lsn= ptr.p->m_last_lsn - 1;
1246 File_formats::Undofile::Undo_page* undo=
1247 (File_formats::Undofile::Undo_page*)page;
1248 undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
1249 undo->m_page_header.m_page_lsn_hi = lsn >> 32;
1250 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1251
1252 /**
1253 * Update free space with extra NOOP
1254 */
1255 ndbrequire(ptr.p->m_free_file_words >= free);
1256 ptr.p->m_free_file_words -= free;
1257
1258 validate_logfile_group(ptr, "get_log_buffer");
1259
1260 pos= 0;
1261 assert(total_free >= free);
1262 total_free -= free;
1263 page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER));
1264 goto next;
1265 }
1266
1267 Uint32
next_page(Logfile_group * ptrP,Uint32 i)1268 Lgman::next_page(Logfile_group* ptrP, Uint32 i)
1269 {
1270 Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i;
1271 Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx;
1272 if(left_in_range > 0)
1273 {
1274 ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1;
1275 ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1;
1276 return page_ptr_i + 1;
1277 }
1278 else
1279 {
1280 Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages);
1281 Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize();
1282 Lgman::Page_map::Iterator it;
1283 map.position(it, pos);
1284
1285 union {
1286 Uint32 tmp[2];
1287 Lgman::Buffer_idx range;
1288 };
1289
1290 tmp[0] = *it.data; map.next(it);
1291 tmp[1] = *it.data;
1292
1293 ptrP->m_pos[i].m_current_page.m_ptr_i = pos; // New index in map
1294 ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages
1295 ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i; // Current page
1296 // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func
1297 return range.m_ptr_i;
1298 }
1299 }
1300
1301 int
get_log_buffer(Signal * signal,Uint32 sz,SimulatedBlock::Callback * callback)1302 Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
1303 SimulatedBlock::Callback* callback)
1304 {
1305 sz += 2; // lsn
1306 Lgman::Logfile_group key;
1307 key.m_logfile_group_id= m_logfile_group_id;
1308 Ptr<Lgman::Logfile_group> ptr;
1309 if(m_lgman->m_logfile_group_hash.find(ptr, key))
1310 {
1311 if(ptr.p->m_free_buffer_words >= (sz + 2*File_formats::UNDO_PAGE_WORDS)&&
1312 ptr.p->m_log_buffer_waiters.isEmpty())
1313 {
1314 return 1;
1315 }
1316
1317 bool empty= false;
1318 {
1319 Ptr<Lgman::Log_waiter> wait;
1320 Lgman::Local_log_waiter_list
1321 list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1322
1323 empty= list.isEmpty();
1324 if(!list.seize(wait))
1325 {
1326 return -1;
1327 }
1328
1329 wait.p->m_size= sz;
1330 wait.p->m_block= m_block;
1331 memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback));
1332 }
1333
1334 return 0;
1335 }
1336 return -1;
1337 }
1338
1339 NdbOut&
operator <<(NdbOut & out,const Lgman::Buffer_idx & pos)1340 operator<<(NdbOut& out, const Lgman::Buffer_idx& pos)
1341 {
1342 out << "[ "
1343 << pos.m_ptr_i << " "
1344 << pos.m_idx << " ]";
1345 return out;
1346 }
1347
1348 NdbOut&
operator <<(NdbOut & out,const Lgman::Logfile_group::Position & pos)1349 operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
1350 {
1351 out << "[ ("
1352 << pos.m_current_page.m_ptr_i << " "
1353 << pos.m_current_page.m_idx << ") ("
1354 << pos.m_current_pos.m_ptr_i << " "
1355 << pos.m_current_pos.m_idx << ") ]";
1356 return out;
1357 }
1358
1359 void
flush_log(Signal * signal,Ptr<Logfile_group> ptr,Uint32 force)1360 Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
1361 {
1362 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
1363 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
1364
1365 jamEntry();
1366
1367 if(consumer.m_current_page == producer.m_current_page)
1368 {
1369
1370 #if 0
1371 if (force)
1372 {
1373 ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
1374 force, ptr.p->m_file_pos[HEAD].m_ptr_i);
1375 ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
1376 consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
1377 producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
1378 }
1379 #endif
1380 if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1381 {
1382 jam();
1383
1384 if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs)
1385 {
1386 force = 0;
1387 }
1388
1389 if (force < 2)
1390 {
1391 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1392 signal->theData[1] = ptr.i;
1393 signal->theData[2] = force + 1;
1394 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
1395 force ? 10 : 100, 3);
1396 return;
1397 }
1398 else
1399 {
1400 Buffer_idx pos= producer.m_current_pos;
1401 GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1402
1403 Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1404
1405 ndbout_c("force flush %d %d", pos.m_idx, ptr.p->m_free_buffer_words);
1406
1407 ndbrequire(pos.m_idx); // don't flush empty page...
1408 Uint64 lsn= ptr.p->m_last_lsn - 1;
1409
1410 File_formats::Undofile::Undo_page* undo=
1411 (File_formats::Undofile::Undo_page*)page;
1412 undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
1413 undo->m_page_header.m_page_lsn_hi = lsn >> 32;
1414 undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1415
1416 /**
1417 * Update free space with extra NOOP
1418 */
1419 ndbrequire(ptr.p->m_free_file_words >= free);
1420 ndbrequire(ptr.p->m_free_buffer_words > free);
1421 ptr.p->m_free_file_words -= free;
1422 ptr.p->m_free_buffer_words -= free;
1423
1424 validate_logfile_group(ptr, "force_log_flush");
1425
1426 next_page(ptr.p, PRODUCER);
1427 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1428 producer = ptr.p->m_pos[PRODUCER];
1429 // break through
1430 }
1431 }
1432 else
1433 {
1434 jam();
1435 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1436 return;
1437 }
1438 }
1439
1440 bool full= false;
1441 Uint32 tot= 0;
1442 while(!(consumer.m_current_page == producer.m_current_page) && !full)
1443 {
1444 validate_logfile_group(ptr, "before flush log");
1445
1446 Uint32 cnt; // pages written
1447 Uint32 page= consumer.m_current_pos.m_ptr_i;
1448 if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
1449 {
1450 if(consumer.m_current_page.m_idx > producer.m_current_page.m_idx)
1451 {
1452 jam();
1453 Uint32 tmp=
1454 consumer.m_current_page.m_idx - producer.m_current_page.m_idx;
1455 cnt= write_log_pages(signal, ptr, page, tmp);
1456 assert(cnt <= tmp);
1457
1458 consumer.m_current_pos.m_ptr_i += cnt;
1459 consumer.m_current_page.m_idx -= cnt;
1460 full= (tmp > cnt);
1461 }
1462 else
1463 {
1464 // Only 1 chunk
1465 ndbrequire(ptr.p->m_buffer_pages.getSize() == 2);
1466 Uint32 tmp= consumer.m_current_page.m_idx + 1;
1467 cnt= write_log_pages(signal, ptr, page, tmp);
1468 assert(cnt <= tmp);
1469
1470 if(cnt == tmp)
1471 {
1472 jam();
1473 /**
1474 * Entire chunk is written
1475 * move to next
1476 */
1477 ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1478 next_page(ptr.p, CONSUMER);
1479 consumer = ptr.p->m_pos[CONSUMER];
1480 }
1481 else
1482 {
1483 jam();
1484 /**
1485 * Failed to write entire chunk...
1486 */
1487 full= true;
1488 consumer.m_current_page.m_idx -= cnt;
1489 consumer.m_current_pos.m_ptr_i += cnt;
1490 }
1491 }
1492 }
1493 else
1494 {
1495 Uint32 tmp= consumer.m_current_page.m_idx + 1;
1496 cnt= write_log_pages(signal, ptr, page, tmp);
1497 assert(cnt <= tmp);
1498
1499 if(cnt == tmp)
1500 {
1501 jam();
1502 /**
1503 * Entire chunk is written
1504 * move to next
1505 */
1506 ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
1507 next_page(ptr.p, CONSUMER);
1508 consumer = ptr.p->m_pos[CONSUMER];
1509 }
1510 else
1511 {
1512 jam();
1513 /**
1514 * Failed to write entire chunk...
1515 */
1516 full= true;
1517 consumer.m_current_page.m_idx -= cnt;
1518 consumer.m_current_pos.m_ptr_i += cnt;
1519 }
1520 }
1521
1522 tot += cnt;
1523 if(cnt)
1524 validate_logfile_group(ptr, " after flush_log");
1525 }
1526
1527 ptr.p->m_pos[CONSUMER]= consumer;
1528
1529 if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
1530 {
1531 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
1532 signal->theData[1] = ptr.i;
1533 signal->theData[2] = 0;
1534 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
1535 }
1536 else
1537 {
1538 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
1539 }
1540 }
1541
1542 void
process_log_buffer_waiters(Signal * signal,Ptr<Logfile_group> ptr)1543 Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1544 {
1545 Uint32 free_buffer= ptr.p->m_free_buffer_words;
1546 Local_log_waiter_list
1547 list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1548
1549 if(list.isEmpty())
1550 {
1551 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1552 return;
1553 }
1554
1555 bool removed= false;
1556 Ptr<Log_waiter> waiter;
1557 list.first(waiter);
1558 Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1559 if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer)
1560 {
1561 removed= true;
1562 Uint32 block = waiter.p->m_block;
1563 SimulatedBlock* b = globalData.getBlock(block);
1564 b->execute(signal, waiter.p->m_callback, logfile_group_id);
1565
1566 list.releaseFirst(waiter);
1567 }
1568
1569 if(removed && !list.isEmpty())
1570 {
1571 ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD;
1572 signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS;
1573 signal->theData[1] = ptr.i;
1574 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1575 }
1576 else
1577 {
1578 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1579 }
1580 }
1581
1582 #define REALLY_SLOW_FS 0
1583
1584 Uint32
write_log_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 in_pages)1585 Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr,
1586 Uint32 pageId, Uint32 in_pages)
1587 {
1588 assert(in_pages);
1589 Ptr<Undofile> filePtr;
1590 Buffer_idx head= ptr.p->m_file_pos[HEAD];
1591 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1592 m_file_pool.getPtr(filePtr, head.m_ptr_i);
1593
1594 if(filePtr.p->m_online.m_outstanding > 0)
1595 {
1596 jam();
1597 return 0;
1598 }
1599
1600 Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
1601 Uint32 max, pages= in_pages;
1602
1603 if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx))
1604 {
1605 max= sz - head.m_idx;
1606 }
1607 else
1608 {
1609 max= tail.m_idx - head.m_idx;
1610 }
1611
1612 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
1613 req->filePointer = filePtr.p->m_fd;
1614 req->userReference = reference();
1615 req->userPointer = filePtr.i;
1616 req->varIndex = 1+head.m_idx; // skip zero page
1617 req->numberOfPages = pages;
1618 req->data.pageData[0] = pageId;
1619 req->operationFlag = 0;
1620 FsReadWriteReq::setFormatFlag(req->operationFlag,
1621 FsReadWriteReq::fsFormatSharedPage);
1622
1623 if(max > pages)
1624 {
1625 jam();
1626 max= pages;
1627 head.m_idx += max;
1628 ptr.p->m_file_pos[HEAD] = head;
1629
1630 if (REALLY_SLOW_FS)
1631 sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
1632 FsReadWriteReq::FixedLength + 1);
1633 else
1634 sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
1635 FsReadWriteReq::FixedLength + 1, JBA);
1636
1637 ptr.p->m_outstanding_fs++;
1638 filePtr.p->m_online.m_outstanding = max;
1639 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
1640
1641 File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
1642 m_shared_page_pool.getPtr(pageId + max - 1);
1643 Uint64 lsn = 0;
1644 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
1645 lsn += page->m_page_header.m_page_lsn_lo;
1646
1647 filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
1648 ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
1649 }
1650 else
1651 {
1652 jam();
1653 req->numberOfPages = max;
1654 FsReadWriteReq::setSyncFlag(req->operationFlag, 1);
1655
1656 if (REALLY_SLOW_FS)
1657 sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
1658 FsReadWriteReq::FixedLength + 1);
1659 else
1660 sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
1661 FsReadWriteReq::FixedLength + 1, JBA);
1662
1663 ptr.p->m_outstanding_fs++;
1664 filePtr.p->m_online.m_outstanding = max;
1665 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
1666
1667 File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
1668 m_shared_page_pool.getPtr(pageId + max - 1);
1669 Uint64 lsn = 0;
1670 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
1671 lsn += page->m_page_header.m_page_lsn_lo;
1672
1673 filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file
1674 ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
1675
1676 Ptr<Undofile> next = filePtr;
1677 Local_undofile_list files(m_file_pool, ptr.p->m_files);
1678 if(!files.next(next))
1679 {
1680 jam();
1681 files.first(next);
1682 }
1683 ndbout_c("changing file from %d to %d", filePtr.i, next.i);
1684 filePtr.p->m_state |= Undofile::FS_MOVE_NEXT;
1685 next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY;
1686
1687 head.m_idx= 0;
1688 head.m_ptr_i= next.i;
1689 ptr.p->m_file_pos[HEAD] = head;
1690 if(max < pages)
1691 max += write_log_pages(signal, ptr, pageId + max, pages - max);
1692 }
1693
1694 assert(max);
1695 return max;
1696 }
1697
1698 void
execFSWRITEREF(Signal * signal)1699 Lgman::execFSWRITEREF(Signal* signal)
1700 {
1701 jamEntry();
1702 SimulatedBlock::execFSWRITEREF(signal);
1703 ndbrequire(false);
1704 }
1705
1706 void
execFSWRITECONF(Signal * signal)1707 Lgman::execFSWRITECONF(Signal* signal)
1708 {
1709 jamEntry();
1710 FsConf * conf = (FsConf*)signal->getDataPtr();
1711 Ptr<Undofile> ptr;
1712 m_file_pool.getPtr(ptr, conf->userPointer);
1713
1714 ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
1715 ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
1716
1717 Ptr<Logfile_group> lg_ptr;
1718 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
1719
1720 Uint32 cnt= lg_ptr.p->m_outstanding_fs;
1721 ndbrequire(cnt);
1722
1723 if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
1724 {
1725 Uint32 tot= 0;
1726 Uint64 lsn = 0;
1727 {
1728 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
1729 while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
1730 {
1731 Uint32 state= ptr.p->m_state;
1732 Uint32 pages= ptr.p->m_online.m_outstanding;
1733 ndbrequire(pages);
1734 ptr.p->m_online.m_outstanding= 0;
1735 ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
1736 tot += pages;
1737 cnt--;
1738
1739 lsn = ptr.p->m_online.m_lsn;
1740
1741 if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
1742 files.first(ptr);
1743 }
1744 }
1745
1746 ndbassert(tot);
1747 lg_ptr.p->m_outstanding_fs = cnt;
1748 lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
1749 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
1750 lg_ptr.p->m_last_synced_lsn = lsn;
1751
1752 if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD))
1753 {
1754 process_log_sync_waiters(signal, lg_ptr);
1755 }
1756
1757 if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
1758 {
1759 process_log_buffer_waiters(signal, lg_ptr);
1760 }
1761 }
1762 else
1763 {
1764 ndbout_c("miss matched writes");
1765 }
1766
1767 return;
1768 }
1769
1770 void
execLCP_FRAG_ORD(Signal * signal)1771 Lgman::execLCP_FRAG_ORD(Signal* signal)
1772 {
1773 jamEntry();
1774
1775 LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
1776 Uint32 lcp_id= ord->lcpId;
1777 Uint32 frag_id = ord->fragmentId;
1778 Uint32 table_id = ord->tableId;
1779
1780 Ptr<Logfile_group> ptr;
1781 m_logfile_group_list.first(ptr);
1782
1783 Uint32 entry= lcp_id == m_latest_lcp ?
1784 File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST;
1785 if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD))
1786 {
1787 jam();
1788 ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
1789 signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
1790 signal->theData[1] = ptr.i;
1791 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1792 }
1793
1794 if(!ptr.isNull() && ptr.p->m_last_lsn)
1795 {
1796 Uint32 undo[3];
1797 undo[0] = lcp_id;
1798 undo[1] = (table_id << 16) | frag_id;
1799 undo[2] = (entry << 16 ) | (sizeof(undo) >> 2);
1800
1801 Uint64 last_lsn= m_last_lsn;
1802
1803 if(ptr.p->m_last_lsn == last_lsn
1804 #ifdef VM_TRACE
1805 && ((rand() % 100) > 50)
1806 #endif
1807 )
1808 {
1809 undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
1810 Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
1811 memcpy(dst, undo, sizeof(undo));
1812 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
1813 ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
1814 }
1815 else
1816 {
1817 Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
1818 * dst++ = last_lsn >> 32;
1819 * dst++ = last_lsn & 0xFFFFFFFF;
1820 memcpy(dst, undo, sizeof(undo));
1821 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
1822 ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
1823 }
1824 ptr.p->m_last_lcp_lsn = last_lsn;
1825 m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
1826
1827 validate_logfile_group(ptr, "execLCP_FRAG_ORD");
1828 }
1829
1830 while(!ptr.isNull())
1831 {
1832 if (ptr.p->m_last_lsn)
1833 {
1834 /**
1835 * First LCP_FRAGORD for each LCP, sets tail pos
1836 */
1837 if(m_latest_lcp != lcp_id)
1838 {
1839 ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1];
1840 ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2];
1841 ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD];
1842 }
1843
1844 if(0)
1845 ndbout_c
1846 ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld",
1847 ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx,
1848 ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx,
1849 ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx,
1850 (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS));
1851 }
1852 m_logfile_group_list.next(ptr);
1853 }
1854
1855 m_latest_lcp = lcp_id;
1856 }
1857
1858 void
execEND_LCP_REQ(Signal * signal)1859 Lgman::execEND_LCP_REQ(Signal* signal)
1860 {
1861 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
1862 ndbrequire(m_latest_lcp == req->backupId);
1863
1864 Ptr<Logfile_group> ptr;
1865 m_logfile_group_list.first(ptr);
1866 bool wait= false;
1867 while(!ptr.isNull())
1868 {
1869 Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn;
1870 if(ptr.p->m_last_synced_lsn < lcp_lsn)
1871 {
1872 wait= true;
1873 if(signal->getSendersBlockRef() != reference())
1874 {
1875 Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
1876 Logfile_client::Request req;
1877 req.m_callback.m_callbackData = ptr.i;
1878 req.m_callback.m_callbackFunction = safe_cast(&Lgman::endlcp_callback);
1879 ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0);
1880 }
1881 }
1882 else
1883 {
1884 ptr.p->m_last_lcp_lsn = 0;
1885 }
1886 m_logfile_group_list.next(ptr);
1887 }
1888
1889 if(wait)
1890 {
1891 return;
1892 }
1893
1894 signal->theData[0] = 0;
1895 sendSignal(DBLQH_REF, GSN_END_LCP_CONF, signal, 1, JBB);
1896 }
1897
1898 void
endlcp_callback(Signal * signal,Uint32 ptr,Uint32 res)1899 Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res)
1900 {
1901 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
1902 req->backupId = m_latest_lcp;
1903 execEND_LCP_REQ(signal);
1904 }
1905
1906 void
cut_log_tail(Signal * signal,Ptr<Logfile_group> ptr)1907 Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
1908 {
1909 bool done= true;
1910 if (likely(ptr.p->m_last_lsn))
1911 {
1912 Buffer_idx tmp= ptr.p->m_tail_pos[0];
1913 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1914
1915 Ptr<Undofile> filePtr;
1916 m_file_pool.getPtr(filePtr, tail.m_ptr_i);
1917
1918 if(!(tmp == tail))
1919 {
1920 Uint32 free;
1921 if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx)
1922 {
1923 free= tmp.m_idx - tail.m_idx;
1924 ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
1925 ptr.p->m_file_pos[TAIL] = tmp;
1926 }
1927 else
1928 {
1929 free= filePtr.p->m_file_size - tail.m_idx - 1;
1930 ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
1931
1932 Ptr<Undofile> next = filePtr;
1933 Local_undofile_list files(m_file_pool, ptr.p->m_files);
1934 while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
1935 ndbrequire(next.i != filePtr.i);
1936 if(next.isNull())
1937 {
1938 jam();
1939 files.first(next);
1940 while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
1941 ndbrequire(next.i != filePtr.i);
1942 }
1943
1944 tmp.m_idx= 0;
1945 tmp.m_ptr_i= next.i;
1946 ptr.p->m_file_pos[TAIL] = tmp;
1947 done= false;
1948 }
1949 }
1950
1951 validate_logfile_group(ptr, "cut log");
1952 }
1953
1954 if (done)
1955 {
1956 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD;
1957 m_logfile_group_list.next(ptr);
1958 }
1959
1960 if(!done || !ptr.isNull())
1961 {
1962 ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
1963 signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
1964 signal->theData[1] = ptr.i;
1965 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1966 }
1967 }
1968
1969 void
execSUB_GCP_COMPLETE_REP(Signal * signal)1970 Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
1971 {
1972 jamEntry();
1973
1974 Ptr<Logfile_group> ptr;
1975 m_logfile_group_list.first(ptr);
1976
1977 /**
1978 * Filter all logfile groups in parallell
1979 */
1980 return; // NOT IMPLETMENT YET
1981
1982 signal->theData[0] = LgmanContinueB::FILTER_LOG;
1983 while(!ptr.isNull())
1984 {
1985 signal->theData[1] = ptr.i;
1986 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1987 m_logfile_group_list.next(ptr);
1988 }
1989 }
1990
1991 int
alloc_log_space(Uint32 ref,Uint32 words)1992 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
1993 {
1994 ndbrequire(words);
1995 words += 2; // lsn
1996 Logfile_group key;
1997 key.m_logfile_group_id= ref;
1998 Ptr<Logfile_group> ptr;
1999 if(m_logfile_group_hash.find(ptr, key) &&
2000 ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS)))
2001 {
2002 ptr.p->m_free_file_words -= words;
2003 validate_logfile_group(ptr, "alloc_log_space");
2004 return 0;
2005 }
2006
2007 if(ptr.isNull())
2008 {
2009 return -1;
2010 }
2011
2012 return 1501;
2013 }
2014
2015 int
free_log_space(Uint32 ref,Uint32 words)2016 Lgman::free_log_space(Uint32 ref, Uint32 words)
2017 {
2018 ndbrequire(words);
2019 Logfile_group key;
2020 key.m_logfile_group_id= ref;
2021 Ptr<Logfile_group> ptr;
2022 if(m_logfile_group_hash.find(ptr, key))
2023 {
2024 ptr.p->m_free_file_words += (words + 2);
2025 validate_logfile_group(ptr, "free_log_space");
2026 return 0;
2027 }
2028 ndbrequire(false);
2029 return -1;
2030 }
2031
2032 Uint64
add_entry(const Change * src,Uint32 cnt)2033 Logfile_client::add_entry(const Change* src, Uint32 cnt)
2034 {
2035 Uint32 i, tot= 0;
2036 for(i= 0; i<cnt; i++)
2037 {
2038 tot += src[i].len;
2039 }
2040
2041 Uint32 *dst;
2042 Uint64 last_lsn= m_lgman->m_last_lsn;
2043 {
2044 Lgman::Logfile_group key;
2045 key.m_logfile_group_id= m_logfile_group_id;
2046 Ptr<Lgman::Logfile_group> ptr;
2047 if(m_lgman->m_logfile_group_hash.find(ptr, key))
2048 {
2049 Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
2050 if(last_lsn_filegroup == last_lsn
2051 #ifdef VM_TRACE
2052 && ((rand() % 100) > 50)
2053 #endif
2054 )
2055 {
2056 dst= m_lgman->get_log_buffer(ptr, tot);
2057 for(i= 0; i<cnt; i++)
2058 {
2059 memcpy(dst, src[i].ptr, 4*src[i].len);
2060 dst += src[i].len;
2061 }
2062 * (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2063 ptr.p->m_free_file_words += 2;
2064 ptr.p->m_free_buffer_words += 2;
2065 m_lgman->validate_logfile_group(ptr);
2066 }
2067 else
2068 {
2069 dst= m_lgman->get_log_buffer(ptr, tot + 2);
2070 * dst++ = last_lsn >> 32;
2071 * dst++ = last_lsn & 0xFFFFFFFF;
2072 for(i= 0; i<cnt; i++)
2073 {
2074 memcpy(dst, src[i].ptr, 4*src[i].len);
2075 dst += src[i].len;
2076 }
2077 }
2078 }
2079
2080 m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2081
2082 return last_lsn;
2083 }
2084 }
2085
2086 void
execSTART_RECREQ(Signal * signal)2087 Lgman::execSTART_RECREQ(Signal* signal)
2088 {
2089 m_latest_lcp = signal->theData[0];
2090
2091 Ptr<Logfile_group> ptr;
2092 m_logfile_group_list.first(ptr);
2093
2094 if(ptr.i != RNIL)
2095 {
2096 infoEvent("Applying undo to LCP: %d", m_latest_lcp);
2097 ndbout_c("Applying undo to LCP: %d", m_latest_lcp);
2098 find_log_head(signal, ptr);
2099 return;
2100 }
2101
2102 signal->theData[0] = reference();
2103 sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2104 }
2105
2106 void
find_log_head(Signal * signal,Ptr<Logfile_group> ptr)2107 Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
2108 {
2109 ndbrequire(ptr.p->m_state &
2110 (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING));
2111
2112 if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty())
2113 {
2114 jam();
2115 /**
2116 * Logfile_group wo/ any files
2117 */
2118 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING;
2119 ptr.p->m_state |= Logfile_group::LG_ONLINE;
2120 m_logfile_group_list.next(ptr);
2121 signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2122 signal->theData[1] = ptr.i;
2123 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2124 return;
2125 }
2126
2127 ptr.p->m_state = Logfile_group::LG_SORTING;
2128
2129 /**
2130 * Read first page from each undofile (1 file at a time...)
2131 */
2132 Local_undofile_list files(m_file_pool, ptr.p->m_meta_files);
2133 Ptr<Undofile> file_ptr;
2134 files.first(file_ptr);
2135
2136 if(!file_ptr.isNull())
2137 {
2138 /**
2139 * Use log buffer memory when reading
2140 */
2141 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2142 file_ptr.p->m_online.m_outstanding= page_id;
2143
2144 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2145 req->filePointer = file_ptr.p->m_fd;
2146 req->userReference = reference();
2147 req->userPointer = file_ptr.i;
2148 req->varIndex = 1; // skip zero page
2149 req->numberOfPages = 1;
2150 req->data.pageData[0] = page_id;
2151 req->operationFlag = 0;
2152 FsReadWriteReq::setFormatFlag(req->operationFlag,
2153 FsReadWriteReq::fsFormatSharedPage);
2154
2155 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2156 FsReadWriteReq::FixedLength + 1, JBA);
2157
2158 ptr.p->m_outstanding_fs++;
2159 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2160 return;
2161 }
2162 else
2163 {
2164 /**
2165 * All files have read first page
2166 * and m_files is sorted acording to lsn
2167 */
2168 ndbrequire(!ptr.p->m_files.isEmpty());
2169 Local_undofile_list read_files(m_file_pool, ptr.p->m_files);
2170 read_files.last(file_ptr);
2171
2172
2173 /**
2174 * Init binary search
2175 */
2176 ptr.p->m_state = Logfile_group::LG_SEARCHING;
2177 file_ptr.p->m_state = Undofile::FS_SEARCHING;
2178 ptr.p->m_file_pos[TAIL].m_idx = 1; // left page
2179 ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
2180 ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;
2181
2182 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2183 file_ptr.p->m_online.m_outstanding= page_id;
2184
2185 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2186 req->filePointer = file_ptr.p->m_fd;
2187 req->userReference = reference();
2188 req->userPointer = file_ptr.i;
2189 req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i;
2190 req->numberOfPages = 1;
2191 req->data.pageData[0] = page_id;
2192 req->operationFlag = 0;
2193 FsReadWriteReq::setFormatFlag(req->operationFlag,
2194 FsReadWriteReq::fsFormatSharedPage);
2195
2196 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2197 FsReadWriteReq::FixedLength + 1, JBA);
2198
2199 ptr.p->m_outstanding_fs++;
2200 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2201 return;
2202 }
2203 }
2204
2205 void
execFSREADCONF(Signal * signal)2206 Lgman::execFSREADCONF(Signal* signal)
2207 {
2208 jamEntry();
2209
2210 Ptr<Undofile> ptr;
2211 Ptr<Logfile_group> lg_ptr;
2212 FsConf* conf = (FsConf*)signal->getDataPtr();
2213
2214 m_file_pool.getPtr(ptr, conf->userPointer);
2215 m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2216
2217 ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2218 ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2219
2220 Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2221 ndbrequire(cnt);
2222
2223 if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
2224 {
2225 jam();
2226
2227 if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2228 {
2229 Uint32 tot= 0;
2230 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2231 while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2232 {
2233 Uint32 state= ptr.p->m_state;
2234 Uint32 pages= ptr.p->m_online.m_outstanding;
2235 ndbrequire(pages);
2236 ptr.p->m_online.m_outstanding= 0;
2237 ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2238 tot += pages;
2239 cnt--;
2240
2241 if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr))
2242 files.last(ptr);
2243 }
2244
2245 lg_ptr.p->m_outstanding_fs = cnt;
2246 lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
2247 lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2248 }
2249 return;
2250 }
2251
2252 lg_ptr.p->m_outstanding_fs = cnt - 1;
2253
2254 Ptr<GlobalPage> page_ptr;
2255 m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
2256 ptr.p->m_online.m_outstanding= 0;
2257
2258 File_formats::Undofile::Undo_page* page =
2259 (File_formats::Undofile::Undo_page*)page_ptr.p;
2260
2261 Uint64 lsn = 0;
2262 lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2263 lsn += page->m_page_header.m_page_lsn_lo;
2264
2265 switch(ptr.p->m_state){
2266 case Undofile::FS_SORTING:
2267 jam();
2268 break;
2269 case Undofile::FS_SEARCHING:
2270 jam();
2271 find_log_head_in_file(signal, lg_ptr, ptr, lsn);
2272 return;
2273 default:
2274 case Undofile::FS_EXECUTING:
2275 case Undofile::FS_CREATING:
2276 case Undofile::FS_DROPPING:
2277 case Undofile::FS_ONLINE:
2278 case Undofile::FS_OPENING:
2279 case Undofile::FS_EMPTY:
2280 jam();
2281 ndbrequire(false);
2282 }
2283
2284 /**
2285 * Prepare for execution
2286 */
2287 ptr.p->m_state = Undofile::FS_EXECUTING;
2288 ptr.p->m_online.m_lsn = lsn;
2289
2290 /**
2291 * Insert into m_files
2292 */
2293 {
2294 Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
2295 Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2296 meta.remove(ptr);
2297
2298 Ptr<Undofile> loop;
2299 files.first(loop);
2300 while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn)
2301 files.next(loop);
2302
2303 if(loop.isNull())
2304 {
2305 /**
2306 * File has highest lsn, add last
2307 */
2308 jam();
2309 files.add(ptr);
2310 }
2311 else
2312 {
2313 /**
2314 * Insert file in correct position in file list
2315 */
2316 files.insert(ptr, loop);
2317 }
2318 }
2319 find_log_head(signal, lg_ptr);
2320 }
2321
2322 void
execFSREADREF(Signal * signal)2323 Lgman::execFSREADREF(Signal* signal)
2324 {
2325 jamEntry();
2326 SimulatedBlock::execFSREADREF(signal);
2327 ndbrequire(false);
2328 }
2329
2330 void
find_log_head_in_file(Signal * signal,Ptr<Logfile_group> ptr,Ptr<Undofile> file_ptr,Uint64 last_lsn)2331 Lgman::find_log_head_in_file(Signal* signal,
2332 Ptr<Logfile_group> ptr,
2333 Ptr<Undofile> file_ptr,
2334 Uint64 last_lsn)
2335 {
2336 // a b
2337 // 3 4 5 0 1
2338 Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i;
2339 Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
2340 Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
2341
2342 ndbrequire(head > tail);
2343 Uint32 diff = head - tail;
2344
2345 if(DEBUG_SEARCH_LOG_HEAD)
2346 printf("tail: %d(%lld) head: %d last: %d(%lld) -> ",
2347 tail, file_ptr.p->m_online.m_lsn,
2348 head, curr, last_lsn);
2349 if(last_lsn > file_ptr.p->m_online.m_lsn)
2350 {
2351 if(DEBUG_SEARCH_LOG_HEAD)
2352 printf("moving tail ");
2353
2354 file_ptr.p->m_online.m_lsn = last_lsn;
2355 ptr.p->m_file_pos[TAIL].m_idx = tail = curr;
2356 }
2357 else
2358 {
2359 if(DEBUG_SEARCH_LOG_HEAD)
2360 printf("moving head ");
2361
2362 ptr.p->m_file_pos[HEAD].m_idx = head = curr;
2363 }
2364
2365 if(diff > 1)
2366 {
2367 // We need to find more pages to be sure...
2368 ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1);
2369
2370 if(DEBUG_SEARCH_LOG_HEAD)
2371 ndbout_c("-> new search tail: %d(%lld) head: %d -> %d",
2372 tail, file_ptr.p->m_online.m_lsn,
2373 head, curr);
2374
2375 Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2376 file_ptr.p->m_online.m_outstanding= page_id;
2377
2378 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2379 req->filePointer = file_ptr.p->m_fd;
2380 req->userReference = reference();
2381 req->userPointer = file_ptr.i;
2382 req->varIndex = curr;
2383 req->numberOfPages = 1;
2384 req->data.pageData[0] = page_id;
2385 req->operationFlag = 0;
2386 FsReadWriteReq::setFormatFlag(req->operationFlag,
2387 FsReadWriteReq::fsFormatSharedPage);
2388
2389 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2390 FsReadWriteReq::FixedLength + 1, JBA);
2391
2392 ptr.p->m_outstanding_fs++;
2393 file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2394 return;
2395 }
2396
2397 ndbrequire(diff == 1);
2398 if(DEBUG_SEARCH_LOG_HEAD)
2399 ndbout_c("-> found last page: %d", tail);
2400
2401 ptr.p->m_state = 0;
2402 file_ptr.p->m_state = Undofile::FS_EXECUTING;
2403 ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn;
2404 ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn;
2405 ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn;
2406 m_last_lsn = file_ptr.p->m_online.m_lsn;
2407
2408 /**
2409 * Set HEAD position
2410 */
2411 ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i;
2412 ptr.p->m_file_pos[HEAD].m_idx = tail;
2413
2414 ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i;
2415 ptr.p->m_file_pos[TAIL].m_idx = tail - 1;
2416 ptr.p->m_next_reply_ptr_i = file_ptr.i;
2417
2418 {
2419 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2420 if(tail == 1)
2421 {
2422 /**
2423 * HEAD is first page in a file...
2424 * -> PREV should be in previous file
2425 */
2426 Ptr<Undofile> prev = file_ptr;
2427 if(!files.prev(prev))
2428 {
2429 files.last(prev);
2430 }
2431 ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i;
2432 ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1;
2433 ptr.p->m_next_reply_ptr_i = prev.i;
2434 }
2435
2436 SimulatedBlock* fs = globalData.getBlock(NDBFS);
2437 infoEvent("Undo head - %s page: %d lsn: %lld",
2438 fs->get_filename(file_ptr.p->m_fd),
2439 tail, file_ptr.p->m_online.m_lsn);
2440 g_eventLogger.info("Undo head - %s page: %d lsn: %lld",
2441 fs->get_filename(file_ptr.p->m_fd),
2442 tail, file_ptr.p->m_online.m_lsn);
2443
2444 for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr))
2445 {
2446 infoEvent(" - next - %s(%lld)",
2447 fs->get_filename(file_ptr.p->m_fd),
2448 file_ptr.p->m_online.m_lsn);
2449
2450 g_eventLogger.info(" - next - %s(%lld)",
2451 fs->get_filename(file_ptr.p->m_fd),
2452 file_ptr.p->m_online.m_lsn);
2453 }
2454 }
2455
2456 /**
2457 * Start next logfile group
2458 */
2459 m_logfile_group_list.next(ptr);
2460 signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2461 signal->theData[1] = ptr.i;
2462 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2463 }
2464
2465 void
init_run_undo_log(Signal * signal)2466 Lgman::init_run_undo_log(Signal* signal)
2467 {
2468 /**
2469 * Perform initial sorting of logfile groups
2470 */
2471 Ptr<Logfile_group> group;
2472 Logfile_group_list& list= m_logfile_group_list;
2473 Logfile_group_list tmp(m_logfile_group_pool);
2474
2475 list.first(group);
2476 while(!group.isNull())
2477 {
2478 Ptr<Logfile_group> ptr= group;
2479 list.next(group);
2480 list.remove(ptr);
2481
2482 {
2483 /**
2484 * Init buffer pointers
2485 */
2486 ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
2487 ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
2488 ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read
2489
2490 Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2491 File_formats::Undofile::Undo_page* pageP =
2492 (File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page);
2493
2494 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
2495 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1;
2496 ptr.p->m_last_read_lsn++;
2497 }
2498
2499 /**
2500 * Start producer thread
2501 */
2502 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2503 signal->theData[1] = ptr.i;
2504 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2505
2506 /**
2507 * Insert in correct position in list of logfile_group's
2508 */
2509 Ptr<Logfile_group> pos;
2510 for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
2511 if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
2512 break;
2513
2514 if(pos.isNull())
2515 tmp.add(ptr);
2516 else
2517 tmp.insert(ptr, pos);
2518
2519 ptr.p->m_state =
2520 Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD;
2521 }
2522 list = tmp;
2523
2524 execute_undo_record(signal);
2525 }
2526
2527 void
read_undo_log(Signal * signal,Ptr<Logfile_group> ptr)2528 Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
2529 {
2530 Uint32 cnt, free= ptr.p->m_free_buffer_words;
2531
2532 if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD))
2533 {
2534 jam();
2535 /**
2536 * Logfile_group is done...
2537 */
2538 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD;
2539 stop_run_undo_log(signal);
2540 return;
2541 }
2542
2543 if(free <= File_formats::UNDO_PAGE_WORDS)
2544 {
2545 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2546 signal->theData[1] = ptr.i;
2547 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2548 return;
2549 }
2550
2551 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2552 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2553
2554 if(producer.m_current_page.m_idx == 0)
2555 {
2556 /**
2557 * zero pages left in range -> switch range
2558 */
2559 Lgman::Page_map::Iterator it;
2560 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2561 Uint32 sz = map.getSize();
2562 Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
2563 map.position(it, pos);
2564 union {
2565 Uint32 _tmp[2];
2566 Lgman::Buffer_idx range;
2567 };
2568 _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2569 producer.m_current_page.m_ptr_i = pos;
2570 producer.m_current_page.m_idx = range.m_idx;
2571 producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx;
2572 }
2573
2574 if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
2575 producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
2576 {
2577 Uint32 max=
2578 producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
2579 ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2580 cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2581 ndbrequire(cnt <= max);
2582 producer.m_current_pos.m_ptr_i -= cnt;
2583 producer.m_current_page.m_idx -= cnt;
2584 }
2585 else
2586 {
2587 Uint32 max= producer.m_current_page.m_idx;
2588 ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2589 cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2590 ndbrequire(cnt <= max);
2591 producer.m_current_pos.m_ptr_i -= cnt;
2592 producer.m_current_page.m_idx -= cnt;
2593 }
2594
2595 ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
2596 free -= (cnt * File_formats::UNDO_PAGE_WORDS);
2597 ptr.p->m_free_buffer_words = free;
2598 ptr.p->m_pos[PRODUCER] = producer;
2599
2600 signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2601 signal->theData[1] = ptr.i;
2602
2603 if(free > File_formats::UNDO_PAGE_WORDS)
2604 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2605 else
2606 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2607 }
2608
2609 Uint32
read_undo_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 pages)2610 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
2611 Uint32 pageId, Uint32 pages)
2612 {
2613 ndbrequire(pages);
2614 Ptr<Undofile> filePtr;
2615 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2616 m_file_pool.getPtr(filePtr, tail.m_ptr_i);
2617
2618 if(filePtr.p->m_online.m_outstanding > 0)
2619 {
2620 jam();
2621 return 0;
2622 }
2623
2624 Uint32 max= tail.m_idx;
2625
2626 FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2627 req->filePointer = filePtr.p->m_fd;
2628 req->userReference = reference();
2629 req->userPointer = filePtr.i;
2630 req->operationFlag = 0;
2631 FsReadWriteReq::setFormatFlag(req->operationFlag,
2632 FsReadWriteReq::fsFormatSharedPage);
2633
2634
2635 if(max > pages)
2636 {
2637 jam();
2638 tail.m_idx -= pages;
2639
2640 req->varIndex = 1 + tail.m_idx;
2641 req->numberOfPages = pages;
2642 req->data.pageData[0] = pageId - pages;
2643 ptr.p->m_file_pos[TAIL] = tail;
2644
2645 if(DEBUG_UNDO_EXECUTION)
2646 ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)",
2647 ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1,
2648 pageId - pages, pageId - 1);
2649
2650 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2651 FsReadWriteReq::FixedLength + 1, JBA);
2652
2653 ptr.p->m_outstanding_fs++;
2654 filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
2655 filePtr.p->m_online.m_outstanding = pages;
2656 max = pages;
2657 }
2658 else
2659 {
2660 jam();
2661
2662 ndbrequire(tail.m_idx - max == 0);
2663 req->varIndex = 1;
2664 req->numberOfPages = max;
2665 req->data.pageData[0] = pageId - max;
2666
2667 if(DEBUG_UNDO_EXECUTION)
2668 ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)",
2669 ptr.i, 1 , 1+max-1,
2670 pageId - max, pageId - 1);
2671
2672 sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2673 FsReadWriteReq::FixedLength + 1, JBA);
2674
2675 ptr.p->m_outstanding_fs++;
2676 filePtr.p->m_online.m_outstanding = max;
2677 filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT;
2678
2679 Ptr<Undofile> prev = filePtr;
2680 {
2681 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2682 if(!files.prev(prev))
2683 {
2684 jam();
2685 files.last(prev);
2686 }
2687 }
2688 if(DEBUG_UNDO_EXECUTION)
2689 ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
2690
2691 tail.m_idx= prev.p->m_file_size - 1;
2692 tail.m_ptr_i= prev.i;
2693 ptr.p->m_file_pos[TAIL] = tail;
2694 if(max < pages && filePtr.i != prev.i)
2695 max += read_undo_pages(signal, ptr, pageId - max, pages - max);
2696 }
2697
2698 return max;
2699
2700 }
2701
2702 void
execute_undo_record(Signal * signal)2703 Lgman::execute_undo_record(Signal* signal)
2704 {
2705 Uint64 lsn;
2706 const Uint32* ptr;
2707 Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
2708 if((ptr = get_next_undo_record(&lsn)))
2709 {
2710 Uint32 len= (* ptr) & 0xFFFF;
2711 Uint32 type= (* ptr) >> 16;
2712 Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
2713 switch(mask){
2714 case File_formats::Undofile::UNDO_END:
2715 stop_run_undo_log(signal);
2716 return;
2717 case File_formats::Undofile::UNDO_LCP:
2718 case File_formats::Undofile::UNDO_LCP_FIRST:
2719 {
2720 Uint32 lcp = * (ptr - len + 1);
2721 if(m_latest_lcp && lcp > m_latest_lcp)
2722 {
2723 if (0)
2724 {
2725 const Uint32 * base = ptr - len + 1;
2726 Uint32 lcp = base[0];
2727 Uint32 tableId = base[1] >> 16;
2728 Uint32 fragId = base[1] & 0xFFFF;
2729
2730 ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
2731 lcp, tableId, fragId);
2732 }
2733 }
2734
2735 if(m_latest_lcp == 0 ||
2736 lcp < m_latest_lcp ||
2737 (lcp == m_latest_lcp &&
2738 mask == File_formats::Undofile::UNDO_LCP_FIRST))
2739 {
2740 stop_run_undo_log(signal);
2741 return;
2742 }
2743 // Fallthrough
2744 }
2745 case File_formats::Undofile::UNDO_TUP_ALLOC:
2746 case File_formats::Undofile::UNDO_TUP_UPDATE:
2747 case File_formats::Undofile::UNDO_TUP_FREE:
2748 case File_formats::Undofile::UNDO_TUP_CREATE:
2749 case File_formats::Undofile::UNDO_TUP_DROP:
2750 case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
2751 case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
2752 tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
2753 return;
2754 default:
2755 ndbrequire(false);
2756 }
2757 }
2758 signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
2759 sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
2760
2761 return;
2762 }
2763
2764 const Uint32*
get_next_undo_record(Uint64 * this_lsn)2765 Lgman::get_next_undo_record(Uint64 * this_lsn)
2766 {
2767 Ptr<Logfile_group> ptr;
2768 m_logfile_group_list.first(ptr);
2769
2770 Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2771 Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2772 if(producer.m_current_pos.m_idx < 2)
2773 {
2774 jam();
2775 /**
2776 * Wait for fetching pages...
2777 */
2778 return 0;
2779 }
2780
2781 Uint32 pos = consumer.m_current_pos.m_idx;
2782 Uint32 page = consumer.m_current_pos.m_ptr_i;
2783
2784 File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*)
2785 m_shared_page_pool.getPtr(page);
2786
2787 if(pos == 0)
2788 {
2789 /**
2790 * End of log
2791 */
2792 pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ;
2793 pageP->m_page_header.m_page_lsn_hi = 0;
2794 pageP->m_page_header.m_page_lsn_lo = 0;
2795 pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1;
2796 this_lsn = 0;
2797 return pageP->m_data;
2798 }
2799
2800 Uint32 *record= pageP->m_data + pos - 1;
2801 Uint32 len= (* record) & 0xFFFF;
2802 ndbrequire(len);
2803 Uint32 *prev= record - len;
2804 Uint64 lsn = 0;
2805
2806 // Same page
2807 if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
2808 {
2809 lsn = ptr.p->m_last_read_lsn - 1;
2810 ndbrequire((Int64)lsn >= 0);
2811 }
2812 else
2813 {
2814 ndbrequire(pos >= 3);
2815 lsn += * (prev - 1); lsn <<= 32;
2816 lsn += * (prev - 0);
2817 len += 2;
2818 ndbrequire((Int64)lsn >= 0);
2819 }
2820
2821
2822 ndbrequire(pos >= len);
2823
2824 if(pos == len)
2825 {
2826 /**
2827 * Switching page
2828 */
2829 ndbrequire(producer.m_current_pos.m_idx);
2830 ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
2831
2832 if(consumer.m_current_page.m_idx)
2833 {
2834 consumer.m_current_page.m_idx--; // left in range
2835 consumer.m_current_pos.m_ptr_i --; // page
2836 }
2837 else
2838 {
2839 // 0 pages left in range...switch range
2840 Lgman::Page_map::Iterator it;
2841 Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2842 Uint32 sz = map.getSize();
2843 Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
2844
2845 map.position(it, tmp);
2846 union {
2847 Uint32 _tmp[2];
2848 Lgman::Buffer_idx range;
2849 };
2850
2851 _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2852
2853 consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
2854 consumer.m_current_page.m_ptr_i = tmp; // pos in map
2855
2856 consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
2857 }
2858
2859 if(DEBUG_UNDO_EXECUTION)
2860 ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i);
2861
2862 pageP=(File_formats::Undofile::Undo_page*)
2863 m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
2864
2865 pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
2866
2867 Uint64 tmp = 0;
2868 tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32;
2869 tmp += pageP->m_page_header.m_page_lsn_lo;
2870
2871 prev = pageP->m_data + pos - 1;
2872
2873 if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
2874 ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
2875
2876 ptr.p->m_pos[CONSUMER] = consumer;
2877 ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
2878 }
2879 else
2880 {
2881 ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
2882 }
2883
2884 * this_lsn = ptr.p->m_last_read_lsn = lsn;
2885
2886 /**
2887 * Re-sort log file groups
2888 */
2889 Ptr<Logfile_group> sort = ptr;
2890 if(m_logfile_group_list.next(sort))
2891 {
2892 while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
2893 m_logfile_group_list.next(sort);
2894
2895 if(sort.i != ptr.p->nextList)
2896 {
2897 m_logfile_group_list.remove(ptr);
2898 if(sort.isNull())
2899 m_logfile_group_list.add(ptr);
2900 else
2901 m_logfile_group_list.insert(ptr, sort);
2902 }
2903 }
2904 return record;
2905 }
2906
2907 void
stop_run_undo_log(Signal * signal)2908 Lgman::stop_run_undo_log(Signal* signal)
2909 {
2910 bool running = false, outstanding = false;
2911 Ptr<Logfile_group> ptr;
2912 m_logfile_group_list.first(ptr);
2913 while(!ptr.isNull())
2914 {
2915 /**
2916 * Mark exec thread as completed
2917 */
2918 ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD;
2919
2920 if(ptr.p->m_state & Logfile_group::LG_READ_THREAD)
2921 {
2922 /**
2923 * Thread is still running...wait for it to complete
2924 */
2925 running = true;
2926 }
2927 else if(ptr.p->m_outstanding_fs)
2928 {
2929 outstanding = true; // a FSREADREQ is outstanding...wait for it
2930 }
2931 else if(ptr.p->m_state != Logfile_group::LG_ONLINE)
2932 {
2933 /**
2934 * Fix log TAIL
2935 */
2936 ndbrequire(ptr.p->m_state == 0);
2937 ptr.p->m_state = Logfile_group::LG_ONLINE;
2938 Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2939 Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
2940
2941 while(pages)
2942 {
2943 Ptr<Undofile> file;
2944 m_file_pool.getPtr(file, tail.m_ptr_i);
2945 Uint32 page= tail.m_idx;
2946 Uint32 size= file.p->m_file_size;
2947 ndbrequire(size >= page);
2948 Uint32 diff= size - page;
2949
2950 if(pages >= diff)
2951 {
2952 pages -= diff;
2953 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2954 if(!files.next(file))
2955 files.first(file);
2956 tail.m_idx = 1;
2957 tail.m_ptr_i= file.i;
2958 }
2959 else
2960 {
2961 tail.m_idx += pages;
2962 pages= 0;
2963 }
2964 }
2965 ptr.p->m_tail_pos[0] = tail;
2966 ptr.p->m_tail_pos[1] = tail;
2967 ptr.p->m_tail_pos[2] = tail;
2968 ptr.p->m_file_pos[TAIL] = tail;
2969
2970 init_logbuffer_pointers(ptr);
2971
2972 {
2973 Buffer_idx head= ptr.p->m_file_pos[HEAD];
2974 Ptr<Undofile> file;
2975 m_file_pool.getPtr(file, head.m_ptr_i);
2976 if (head.m_idx == file.p->m_file_size - 1)
2977 {
2978 Local_undofile_list files(m_file_pool, ptr.p->m_files);
2979 if(!files.next(file))
2980 {
2981 jam();
2982 files.first(file);
2983 }
2984 head.m_idx = 0;
2985 head.m_ptr_i = file.i;
2986 ptr.p->m_file_pos[HEAD] = head;
2987 }
2988 }
2989
2990 ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS *
2991 (Uint64)compute_free_file_pages(ptr);
2992 ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i;
2993
2994 ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
2995 signal->theData[0] = LgmanContinueB::FLUSH_LOG;
2996 signal->theData[1] = ptr.i;
2997 signal->theData[2] = 0;
2998 sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
2999
3000 if(1)
3001 {
3002 SimulatedBlock* fs = globalData.getBlock(NDBFS);
3003 Ptr<Undofile> hf, tf;
3004 m_file_pool.getPtr(tf, tail.m_ptr_i);
3005 m_file_pool.getPtr(hf, ptr.p->m_file_pos[HEAD].m_ptr_i);
3006 infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id);
3007 g_eventLogger.info("Logfile group: %d ", ptr.p->m_logfile_group_id);
3008 infoEvent(" head: %s page: %d",
3009 fs->get_filename(hf.p->m_fd),
3010 ptr.p->m_file_pos[HEAD].m_idx);
3011 g_eventLogger.info(" head: %s page: %d",
3012 fs->get_filename(hf.p->m_fd),
3013 ptr.p->m_file_pos[HEAD].m_idx);
3014 infoEvent(" tail: %s page: %d",
3015 fs->get_filename(tf.p->m_fd), tail.m_idx);
3016 g_eventLogger.info(" tail: %s page: %d",
3017 fs->get_filename(tf.p->m_fd), tail.m_idx);
3018 }
3019 }
3020
3021 m_logfile_group_list.next(ptr);
3022 }
3023
3024 if(running)
3025 {
3026 jam();
3027 return;
3028 }
3029
3030 if(outstanding)
3031 {
3032 jam();
3033 signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG;
3034 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1);
3035 return;
3036 }
3037
3038 infoEvent("Flushing page cache after undo completion");
3039 g_eventLogger.info("Flushing page cache after undo completion");
3040
3041 /**
3042 * Start flushing pages (local, LCP)
3043 */
3044 LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
3045 ord->lcpId = m_latest_lcp;
3046 sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal,
3047 LcpFragOrd::SignalLength, JBB);
3048
3049 EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
3050 req->senderRef = reference();
3051 sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
3052 EndLcpReq::SignalLength, JBB);
3053 }
3054
3055 void
execEND_LCP_CONF(Signal * signal)3056 Lgman::execEND_LCP_CONF(Signal* signal)
3057 {
3058 Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
3059 tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
3060
3061 /**
3062 * pgman has completed flushing all pages
3063 *
3064 * insert "fake" LCP record preventing undo to be "rerun"
3065 */
3066 Uint32 undo[3];
3067 undo[0] = m_latest_lcp;
3068 undo[1] = (0 << 16) | 0;
3069 undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 )
3070 | (sizeof(undo) >> 2);
3071
3072 Ptr<Logfile_group> ptr;
3073 ndbrequire(m_logfile_group_list.first(ptr));
3074
3075 Uint64 last_lsn= m_last_lsn;
3076 if(ptr.p->m_last_lsn == last_lsn
3077 #ifdef VM_TRACE
3078 && ((rand() % 100) > 50)
3079 #endif
3080 )
3081 {
3082 undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
3083 Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
3084 memcpy(dst, undo, sizeof(undo));
3085 ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
3086 ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
3087 }
3088 else
3089 {
3090 Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
3091 * dst++ = last_lsn >> 32;
3092 * dst++ = last_lsn & 0xFFFFFFFF;
3093 memcpy(dst, undo, sizeof(undo));
3094 ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
3095 ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
3096 }
3097 m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
3098
3099 ptr.p->m_last_synced_lsn = last_lsn;
3100 while(m_logfile_group_list.next(ptr))
3101 ptr.p->m_last_synced_lsn = last_lsn;
3102
3103 infoEvent("Flushing complete");
3104 g_eventLogger.info("Flushing complete");
3105
3106 signal->theData[0] = reference();
3107 sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
3108 }
3109
3110 #ifdef VM_TRACE
3111 void
validate_logfile_group(Ptr<Logfile_group> ptr,const char * heading)3112 Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
3113 {
3114 do
3115 {
3116 if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
3117 break;
3118
3119 Uint32 pages = compute_free_file_pages(ptr);
3120
3121 Uint32 group_pages =
3122 ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1)/ File_formats::UNDO_PAGE_WORDS) ;
3123 Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS;
3124
3125 if(! (pages >= group_pages))
3126 {
3127 ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL]
3128 << " Head: " << ptr.p->m_file_pos[HEAD]
3129 << " free: " << group_pages << "(" << last << ")"
3130 << " found: " << pages;
3131 for(Uint32 i = 0; i<3; i++)
3132 {
3133 ndbout << " - " << ptr.p->m_tail_pos[i];
3134 }
3135 ndbout << endl;
3136
3137 ndbrequire(pages >= group_pages);
3138 }
3139 } while(0);
3140 }
3141 #endif
3142
execGET_TABINFOREQ(Signal * signal)3143 void Lgman::execGET_TABINFOREQ(Signal* signal)
3144 {
3145 jamEntry();
3146
3147 if(!assembleFragments(signal))
3148 {
3149 return;
3150 }
3151
3152 GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
3153
3154 const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf);
3155 BlockReference retRef= req->senderRef;
3156 Uint32 senderData= req->senderData;
3157 Uint32 tableId= req->tableId;
3158
3159 if(reqType == GetTabInfoReq::RequestByName){
3160 jam();
3161 if(signal->getNoOfSections())
3162 releaseSections(signal);
3163
3164 sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName);
3165 return;
3166 }
3167
3168 Logfile_group key;
3169 key.m_logfile_group_id= tableId;
3170 Ptr<Logfile_group> ptr;
3171 m_logfile_group_hash.find(ptr, key);
3172
3173 if(ptr.p->m_logfile_group_id != tableId)
3174 {
3175 jam();
3176 if(signal->getNoOfSections())
3177 releaseSections(signal);
3178
3179 sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId);
3180 return;
3181 }
3182
3183
3184 GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0];
3185
3186 conf->senderData= senderData;
3187 conf->tableId= tableId;
3188 conf->freeWordsHi= ptr.p->m_free_file_words >> 32;
3189 conf->freeWordsLo= ptr.p->m_free_file_words & 0xFFFFFFFF;
3190 conf->tableType= DictTabInfo::LogfileGroup;
3191 conf->senderRef= reference();
3192 sendSignal(retRef, GSN_GET_TABINFO_CONF, signal,
3193 GetTabInfoConf::SignalLength, JBB);
3194 }
3195
sendGET_TABINFOREF(Signal * signal,GetTabInfoReq * req,GetTabInfoRef::ErrorCode errorCode)3196 void Lgman::sendGET_TABINFOREF(Signal* signal,
3197 GetTabInfoReq * req,
3198 GetTabInfoRef::ErrorCode errorCode)
3199 {
3200 jamEntry();
3201 GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0];
3202 /**
3203 * The format of GetTabInfo Req/Ref is the same
3204 */
3205 BlockReference retRef = req->senderRef;
3206 ref->errorCode = errorCode;
3207
3208 sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB);
3209 }
3210