1 /*
2    Copyright (c) 2005, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
17 */
18 
19 #include "lgman.hpp"
20 #include "diskpage.hpp"
21 #include <signaldata/FsRef.hpp>
22 #include <signaldata/FsConf.hpp>
23 #include <signaldata/FsOpenReq.hpp>
24 #include <signaldata/FsCloseReq.hpp>
25 #include <signaldata/CreateFilegroupImpl.hpp>
26 #include <signaldata/DropFilegroupImpl.hpp>
27 #include <signaldata/FsReadWriteReq.hpp>
28 #include <signaldata/LCP.hpp>
29 #include <signaldata/SumaImpl.hpp>
30 #include <signaldata/LgmanContinueB.hpp>
31 #include <signaldata/GetTabInfo.hpp>
32 #include "ndbfs/Ndbfs.hpp"
33 #include "dbtup/Dbtup.hpp"
34 
35 #include <EventLogger.hpp>
36 extern EventLogger g_eventLogger;
37 
38 #include <record_types.hpp>
39 
40 /**
41  * ---<a>-----<b>-----<c>-----<d>---> (time)
42  *
43  * <a> = start of lcp 1
44  * <b> = stop of lcp 1
45  * <c> = start of lcp 2
46  * <d> = stop of lcp 2
47  *
48  * If ndb crashes before <d>
49  *   the entire undo log from crash point until <a> has to be applied
50  *
 * at <d> the undo log can be cut up to <c>
52  */
53 
54 #define DEBUG_UNDO_EXECUTION 0
55 #define DEBUG_SEARCH_LOG_HEAD 0
56 
Lgman(Block_context & ctx)57 Lgman::Lgman(Block_context & ctx) :
58   SimulatedBlock(LGMAN, ctx),
59   m_logfile_group_list(m_logfile_group_pool),
60   m_logfile_group_hash(m_logfile_group_pool)
61 {
62   BLOCK_CONSTRUCTOR(Lgman);
63 
64   // Add received signals
65   addRecSignal(GSN_STTOR, &Lgman::execSTTOR);
66   addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
67   addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
68   addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
69 
70   addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ);
71   addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ);
72 
73   addRecSignal(GSN_DROP_FILE_REQ, &Lgman::execDROP_FILE_REQ);
74   addRecSignal(GSN_DROP_FILEGROUP_REQ, &Lgman::execDROP_FILEGROUP_REQ);
75 
76   addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ);
77   addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true);
78   addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF);
79 
80   addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true);
81   addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF);
82 
83   addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF);
84 
85   addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true);
86   addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF);
87 
88   addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD);
89   addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ);
90   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP);
91   addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ);
92 
93   addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF);
94 
95   addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ);
96 
97   m_last_lsn = 1;
98   m_logfile_group_hash.setSize(10);
99 }
100 
~Lgman()101 Lgman::~Lgman()
102 {
103 }
104 
// Expands the BLOCK_FUNCTIONS macro for Lgman. The macro is defined outside
// this file; presumably it supplies the per-block boilerplate -- confirm
// against its definition.
BLOCK_FUNCTIONS(Lgman)
106 
107 void
108 Lgman::execREAD_CONFIG_REQ(Signal* signal)
109 {
110   jamEntry();
111 
112   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
113 
114   Uint32 ref = req->senderRef;
115   Uint32 senderData = req->senderData;
116 
117   const ndb_mgm_configuration_iterator * p =
118     m_ctx.m_config.getOwnConfigIterator();
119   ndbrequire(p != 0);
120 
121   Pool_context pc;
122   pc.m_block = this;
123   m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc);
124   m_file_pool.init(RT_LGMAN_FILE, pc);
125   m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc);
126   m_data_buffer_pool.setSize(10);
127 
128   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
129   conf->senderRef = reference();
130   conf->senderData = senderData;
131   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
132 	     ReadConfigConf::SignalLength, JBB);
133 }
134 
135 void
execSTTOR(Signal * signal)136 Lgman::execSTTOR(Signal* signal)
137 {
138   jamEntry();
139   sendSTTORRY(signal);
140 
141   return;
142 }//Lgman::execNDB_STTOR()
143 
144 void
sendSTTORRY(Signal * signal)145 Lgman::sendSTTORRY(Signal* signal)
146 {
147   signal->theData[0] = 0;
148   signal->theData[3] = 1;
149   signal->theData[4] = 2;
150   signal->theData[5] = 3;
151   signal->theData[6] = 4;
152   signal->theData[7] = 5;
153   signal->theData[8] = 6;
154   signal->theData[9] = 255; // No more start phases from missra
155   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
156 }
157 
158 void
execCONTINUEB(Signal * signal)159 Lgman::execCONTINUEB(Signal* signal){
160   jamEntry();
161 
162   Uint32 type= signal->theData[0];
163   Uint32 ptrI = signal->theData[1];
164   switch(type){
165   case LgmanContinueB::FILTER_LOG:
166     jam();
167     break;
168   case LgmanContinueB::CUT_LOG_TAIL:
169   {
170     jam();
171     Ptr<Logfile_group> ptr;
172     m_logfile_group_pool.getPtr(ptr, ptrI);
173     cut_log_tail(signal, ptr);
174     return;
175   }
176   case LgmanContinueB::FLUSH_LOG:
177   {
178     jam();
179     Ptr<Logfile_group> ptr;
180     m_logfile_group_pool.getPtr(ptr, ptrI);
181     flush_log(signal, ptr, signal->theData[2]);
182     return;
183   }
184   case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
185   {
186     jam();
187     Ptr<Logfile_group> ptr;
188     m_logfile_group_pool.getPtr(ptr, ptrI);
189     process_log_buffer_waiters(signal, ptr);
190     return;
191   }
192   case LgmanContinueB::FIND_LOG_HEAD:
193     jam();
194     Ptr<Logfile_group> ptr;
195     if(ptrI != RNIL)
196     {
197       m_logfile_group_pool.getPtr(ptr, ptrI);
198       find_log_head(signal, ptr);
199     }
200     else
201     {
202       init_run_undo_log(signal);
203     }
204     return;
205   case LgmanContinueB::EXECUTE_UNDO_RECORD:
206     jam();
207     execute_undo_record(signal);
208     return;
209   case LgmanContinueB::STOP_UNDO_LOG:
210     jam();
211     stop_run_undo_log(signal);
212     return;
213   case LgmanContinueB::READ_UNDO_LOG:
214   {
215     jam();
216     Ptr<Logfile_group> ptr;
217     m_logfile_group_pool.getPtr(ptr, ptrI);
218     read_undo_log(signal, ptr);
219     return;
220   }
221   case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS:
222   {
223     jam();
224     Ptr<Logfile_group> ptr;
225     m_logfile_group_pool.getPtr(ptr, ptrI);
226     process_log_sync_waiters(signal, ptr);
227     return;
228   }
229   case LgmanContinueB::FORCE_LOG_SYNC:
230   {
231     jam();
232     Ptr<Logfile_group> ptr;
233     m_logfile_group_pool.getPtr(ptr, ptrI);
234     force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]);
235     return;
236   }
237   case LgmanContinueB::DROP_FILEGROUP:
238   {
239     jam();
240     Ptr<Logfile_group> ptr;
241     m_logfile_group_pool.getPtr(ptr, ptrI);
242     if (ptr.p->m_state & Logfile_group::LG_THREAD_MASK)
243     {
244       jam();
245       sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
246 			  signal->length());
247       return;
248     }
249     Uint32 ref = signal->theData[2];
250     Uint32 data = signal->theData[3];
251     drop_filegroup_drop_files(signal, ptr, ref, data);
252     return;
253   }
254   }
255 }
256 
257 void
execDUMP_STATE_ORD(Signal * signal)258 Lgman::execDUMP_STATE_ORD(Signal* signal){
259   jamEntry();
260   if(signal->theData[0] == 12001)
261   {
262     Ptr<Logfile_group> ptr;
263     m_logfile_group_list.first(ptr);
264     while(!ptr.isNull())
265     {
266       infoEvent("lfg %d state: %x fs: %d lsn "
267 		"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d",
268 		ptr.p->m_logfile_group_id, ptr.p->m_state,
269 		ptr.p->m_outstanding_fs,
270 		ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
271 		ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
272 		!ptr.p->m_log_buffer_waiters.isEmpty(),
273 		!ptr.p->m_log_sync_waiters.isEmpty());
274       if (!ptr.p->m_log_buffer_waiters.isEmpty())
275       {
276 	Ptr<Log_waiter> waiter;
277 	Local_log_waiter_list
278 	  list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
279 	list.first(waiter);
280 	infoEvent("  free_buffer_words: %d head(waiters).sz: %d %d",
281 		  ptr.p->m_free_buffer_words,
282 		  waiter.p->m_size,
283 		  2*File_formats::UNDO_PAGE_WORDS);
284       }
285       if (!ptr.p->m_log_sync_waiters.isEmpty())
286       {
287 	Ptr<Log_waiter> waiter;
288 	Local_log_waiter_list
289 	  list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
290 	list.first(waiter);
291 	infoEvent("  m_last_synced_lsn: %lld head(waiters %x).m_sync_lsn: %lld",
292 		  ptr.p->m_last_synced_lsn,
293 		  waiter.i,
294 		  waiter.p->m_sync_lsn);
295 
296 	while(!waiter.isNull())
297 	{
298 	  ndbout_c("ptr: %x %p lsn: %lld next: %x",
299 		   waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList);
300 	  list.next(waiter);
301 	}
302       }
303       m_logfile_group_list.next(ptr);
304     }
305   }
306 }
307 
308 void
execCREATE_FILEGROUP_REQ(Signal * signal)309 Lgman::execCREATE_FILEGROUP_REQ(Signal* signal){
310   jamEntry();
311   CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr();
312 
313   Uint32 senderRef = req->senderRef;
314   Uint32 senderData = req->senderData;
315 
316   Ptr<Logfile_group> ptr;
317   CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError;
318   do {
319     if (m_logfile_group_hash.find(ptr, req->filegroup_id))
320     {
321       jam();
322       err = CreateFilegroupImplRef::FilegroupAlreadyExists;
323       break;
324     }
325 
326     if (!m_logfile_group_list.isEmpty())
327     {
328       jam();
329       err = CreateFilegroupImplRef::OneLogfileGroupLimit;
330       break;
331     }
332 
333     if (!m_logfile_group_pool.seize(ptr))
334     {
335       jam();
336       err = CreateFilegroupImplRef::OutOfFilegroupRecords;
337       break;
338     }
339 
340     new (ptr.p) Logfile_group(req);
341 
342     if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size))
343     {
344       jam();
345       err= CreateFilegroupImplRef::OutOfLogBufferMemory;
346       m_logfile_group_pool.release(ptr);
347       break;
348     }
349 
350     m_logfile_group_hash.add(ptr);
351     m_logfile_group_list.add(ptr);
352 
353     if (getNodeState().getNodeRestartInProgress() ||
354         getNodeState().getSystemRestartInProgress())
355     {
356       ptr.p->m_state = Logfile_group::LG_STARTING;
357     }
358 
359     CreateFilegroupImplConf* conf=
360       (CreateFilegroupImplConf*)signal->getDataPtr();
361     conf->senderData = senderData;
362     conf->senderRef = reference();
363     sendSignal(senderRef, GSN_CREATE_FILEGROUP_CONF, signal,
364 	       CreateFilegroupImplConf::SignalLength, JBB);
365 
366     return;
367   } while(0);
368 
369   CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr();
370   ref->senderData = senderData;
371   ref->senderRef = reference();
372   ref->errorCode = err;
373   sendSignal(senderRef, GSN_CREATE_FILEGROUP_REF, signal,
374 	     CreateFilegroupImplRef::SignalLength, JBB);
375 }
376 
377 void
execDROP_FILEGROUP_REQ(Signal * signal)378 Lgman::execDROP_FILEGROUP_REQ(Signal* signal)
379 {
380   jamEntry();
381 
382   Uint32 errorCode = 0;
383   DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr();
384   do
385   {
386     Ptr<Logfile_group> ptr;
387     if (!m_logfile_group_hash.find(ptr, req.filegroup_id))
388     {
389       errorCode = DropFilegroupImplRef::NoSuchFilegroup;
390       break;
391     }
392 
393     if (ptr.p->m_version != req.filegroup_version)
394     {
395       errorCode = DropFilegroupImplRef::InvalidFilegroupVersion;
396       break;
397     }
398 
399     switch(req.requestInfo){
400     case DropFilegroupImplReq::Prepare:
401       break;
402     case DropFilegroupImplReq::Commit:
403       m_logfile_group_list.remove(ptr);
404       ptr.p->m_state |= Logfile_group::LG_DROPPING;
405       signal->theData[0] = LgmanContinueB::DROP_FILEGROUP;
406       signal->theData[1] = ptr.i;
407       signal->theData[2] = req.senderRef;
408       signal->theData[3] = req.senderData;
409       sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
410       return;
411     case DropFilegroupImplReq::Abort:
412       break;
413     default:
414       ndbrequire(false);
415     }
416   } while(0);
417 
418   if (errorCode)
419   {
420     DropFilegroupImplRef* ref =
421       (DropFilegroupImplRef*)signal->getDataPtrSend();
422     ref->senderRef = reference();
423     ref->senderData = req.senderData;
424     ref->errorCode = errorCode;
425     sendSignal(req.senderRef, GSN_DROP_FILEGROUP_REF, signal,
426 	       DropFilegroupImplRef::SignalLength, JBB);
427   }
428   else
429   {
430     DropFilegroupImplConf* conf =
431       (DropFilegroupImplConf*)signal->getDataPtrSend();
432     conf->senderRef = reference();
433     conf->senderData = req.senderData;
434     sendSignal(req.senderRef, GSN_DROP_FILEGROUP_CONF, signal,
435 	       DropFilegroupImplConf::SignalLength, JBB);
436   }
437 }
438 
439 void
drop_filegroup_drop_files(Signal * signal,Ptr<Logfile_group> ptr,Uint32 ref,Uint32 data)440 Lgman::drop_filegroup_drop_files(Signal* signal,
441 				 Ptr<Logfile_group> ptr,
442 				 Uint32 ref, Uint32 data)
443 {
444   jam();
445   ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
446   ndbrequire(ptr.p->m_outstanding_fs == 0);
447 
448   Local_undofile_list list(m_file_pool, ptr.p->m_files);
449   Ptr<Undofile> file_ptr;
450 
451   if (list.first(file_ptr))
452   {
453     jam();
454     ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING));
455     file_ptr.p->m_create.m_senderRef = ref;
456     file_ptr.p->m_create.m_senderData = data;
457     create_file_abort(signal, ptr, file_ptr);
458     return;
459   }
460 
461   Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files);
462   if (metalist.first(file_ptr))
463   {
464     jam();
465     metalist.remove(file_ptr);
466     list.add(file_ptr);
467     file_ptr.p->m_create.m_senderRef = ref;
468     file_ptr.p->m_create.m_senderData = data;
469     create_file_abort(signal, ptr, file_ptr);
470     return;
471   }
472 
473   free_logbuffer_memory(ptr);
474   m_logfile_group_hash.release(ptr);
475   DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr();
476   conf->senderData = data;
477   conf->senderRef = reference();
478   sendSignal(ref, GSN_DROP_FILEGROUP_CONF, signal,
479 	     DropFilegroupImplConf::SignalLength, JBB);
480 }
481 
482 void
execCREATE_FILE_REQ(Signal * signal)483 Lgman::execCREATE_FILE_REQ(Signal* signal)
484 {
485   jamEntry();
486   CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr();
487 
488   Uint32 senderRef = req->senderRef;
489   Uint32 senderData = req->senderData;
490   Uint32 requestInfo = req->requestInfo;
491 
492   Ptr<Logfile_group> ptr;
493   CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError;
494   do {
495     if (!m_logfile_group_hash.find(ptr, req->filegroup_id))
496     {
497       jam();
498       err = CreateFileImplRef::InvalidFilegroup;
499       break;
500     }
501 
502     if (ptr.p->m_version != req->filegroup_version)
503     {
504       jam();
505       err = CreateFileImplRef::InvalidFilegroupVersion;
506       break;
507     }
508 
509     Ptr<Undofile> file_ptr;
510     switch(requestInfo){
511     case CreateFileImplReq::Commit:
512     {
513       jam();
514       ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id));
515       file_ptr.p->m_create.m_senderRef = req->senderRef;
516       file_ptr.p->m_create.m_senderData = req->senderData;
517       create_file_commit(signal, ptr, file_ptr);
518       return;
519     }
520     case CreateFileImplReq::Abort:
521     {
522       Uint32 senderRef = req->senderRef;
523       Uint32 senderData = req->senderData;
524       if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id))
525       {
526         jam();
527 	file_ptr.p->m_create.m_senderRef = senderRef;
528 	file_ptr.p->m_create.m_senderData = senderData;
529 	create_file_abort(signal, ptr, file_ptr);
530       }
531       else
532       {
533 	CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
534         jam();
535 	conf->senderData = senderData;
536 	conf->senderRef = reference();
537 	sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
538 		   CreateFileImplConf::SignalLength, JBB);
539       }
540       return;
541     }
542     default: // prepare
543       break;
544     }
545 
546     if (!m_file_pool.seize(file_ptr))
547     {
548       jam();
549       err = CreateFileImplRef::OutOfFileRecords;
550       break;
551     }
552 
553     if(ERROR_INSERTED(15000) ||
554        (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
555     {
556       jam();
557       if(signal->getNoOfSections())
558         releaseSections(signal);
559 
560       CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
561       ref->senderData = senderData;
562       ref->senderRef = reference();
563       ref->errorCode = CreateFileImplRef::FileSizeTooLarge;
564       sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
565                  CreateFileImplRef::SignalLength, JBB);
566       return;
567     }
568 
569     new (file_ptr.p) Undofile(req, ptr.i);
570 
571     Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
572     tmp.add(file_ptr);
573 
574     open_file(signal, file_ptr, req->requestInfo);
575     return;
576   } while(0);
577 
578   CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
579   ref->senderData = senderData;
580   ref->senderRef = reference();
581   ref->errorCode = err;
582   sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
583 	     CreateFileImplRef::SignalLength, JBB);
584 }
585 
586 void
open_file(Signal * signal,Ptr<Undofile> ptr,Uint32 requestInfo)587 Lgman::open_file(Signal* signal, Ptr<Undofile> ptr, Uint32 requestInfo)
588 {
589   FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend();
590   req->userReference = reference();
591   req->userPointer = ptr.i;
592 
593   memset(req->fileNumber, 0, sizeof(req->fileNumber));
594   FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename
595 
596   req->fileFlags = 0;
597   req->fileFlags |= FsOpenReq::OM_READWRITE;
598   req->fileFlags |= FsOpenReq::OM_DIRECT;
599   req->fileFlags |= FsOpenReq::OM_SYNC;
600   switch(requestInfo){
601   case CreateFileImplReq::Create:
602     req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
603     req->fileFlags |= FsOpenReq::OM_INIT;
604     ptr.p->m_state = Undofile::FS_CREATING;
605     break;
606   case CreateFileImplReq::CreateForce:
607     req->fileFlags |= FsOpenReq::OM_CREATE;
608     req->fileFlags |= FsOpenReq::OM_INIT;
609     ptr.p->m_state = Undofile::FS_CREATING;
610     break;
611   case CreateFileImplReq::Open:
612     req->fileFlags |= FsOpenReq::OM_CHECK_SIZE;
613     ptr.p->m_state = Undofile::FS_OPENING;
614     break;
615   default:
616     ndbrequire(false);
617   }
618 
619   req->page_size = File_formats::NDB_PAGE_SIZE;
620   Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE;
621   req->file_size_hi = size >> 32;
622   req->file_size_lo = size & 0xFFFFFFFF;
623 
624   // Forward filename
625   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB);
626 }
627 
628 void
execFSWRITEREQ(Signal * signal)629 Lgman::execFSWRITEREQ(Signal* signal)
630 {
631   jamEntry();
632   Ptr<Undofile> ptr;
633   Ptr<GlobalPage> page_ptr;
634   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
635 
636   m_file_pool.getPtr(ptr, req->userPointer);
637   m_global_page_pool.getPtr(page_ptr, req->data.pageData[0]);
638 
639   if (req->varIndex == 0)
640   {
641     jam();
642     File_formats::Undofile::Zero_page* page =
643       (File_formats::Undofile::Zero_page*)page_ptr.p;
644     page->m_page_header.init(File_formats::FT_Undofile,
645 			     getOwnNodeId(),
646 			     ndbGetOwnVersion(),
647 			     time(0));
648     page->m_file_id = ptr.p->m_file_id;
649     page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id;
650     page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version;
651     page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page
652   }
653   else
654   {
655     jam();
656     File_formats::Undofile::Undo_page* page =
657       (File_formats::Undofile::Undo_page*)page_ptr.p;
658     page->m_page_header.m_page_lsn_hi = 0;
659     page->m_page_header.m_page_lsn_lo = 0;
660     page->m_page_header.m_page_type = File_formats::PT_Undopage;
661     page->m_words_used = 0;
662   }
663 }
664 
665 void
execFSOPENREF(Signal * signal)666 Lgman::execFSOPENREF(Signal* signal)
667 {
668   jamEntry();
669 
670   Ptr<Undofile> ptr;
671   Ptr<Logfile_group> lg_ptr;
672   FsRef* ref = (FsRef*)signal->getDataPtr();
673 
674   Uint32 errCode = ref->errorCode;
675   Uint32 osErrCode = ref->osErrorCode;
676 
677   m_file_pool.getPtr(ptr, ref->userPointer);
678   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
679 
680   {
681     CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
682     ref->senderData = ptr.p->m_create.m_senderData;
683     ref->senderRef = reference();
684     ref->errorCode = CreateFileImplRef::FileError;
685     ref->fsErrCode = errCode;
686     ref->osErrCode = osErrCode;
687     sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_REF, signal,
688 	       CreateFileImplRef::SignalLength, JBB);
689   }
690 
691   Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
692   meta.release(ptr);
693 }
694 
695 #define HEAD 0
696 #define TAIL 1
697 
698 void
execFSOPENCONF(Signal * signal)699 Lgman::execFSOPENCONF(Signal* signal)
700 {
701   jamEntry();
702   Ptr<Undofile> ptr;
703 
704   FsConf* conf = (FsConf*)signal->getDataPtr();
705 
706   Uint32 fd = conf->filePointer;
707   m_file_pool.getPtr(ptr, conf->userPointer);
708 
709   ptr.p->m_fd = fd;
710 
711   {
712     Uint32 senderRef = ptr.p->m_create.m_senderRef;
713     Uint32 senderData = ptr.p->m_create.m_senderData;
714 
715     CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
716     conf->senderData = senderData;
717     conf->senderRef = reference();
718     sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
719 	       CreateFileImplConf::SignalLength, JBB);
720   }
721 }
722 
723 bool
find_file_by_id(Ptr<Undofile> & ptr,Local_undofile_list::Head & head,Uint32 id)724 Lgman::find_file_by_id(Ptr<Undofile>& ptr,
725 		       Local_undofile_list::Head& head, Uint32 id)
726 {
727   Local_undofile_list list(m_file_pool, head);
728   for(list.first(ptr); !ptr.isNull(); list.next(ptr))
729     if(ptr.p->m_file_id == id)
730       return true;
731   return false;
732 }
733 
734 void
create_file_commit(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)735 Lgman::create_file_commit(Signal* signal,
736 			  Ptr<Logfile_group> lg_ptr,
737 			  Ptr<Undofile> ptr)
738 {
739   Uint32 senderRef = ptr.p->m_create.m_senderRef;
740   Uint32 senderData = ptr.p->m_create.m_senderData;
741 
742   bool first= false;
743   if(ptr.p->m_state == Undofile::FS_CREATING &&
744      (lg_ptr.p->m_state & Logfile_group::LG_ONLINE))
745   {
746     jam();
747     Local_undofile_list free(m_file_pool, lg_ptr.p->m_files);
748     Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
749     first= free.isEmpty();
750     meta.remove(ptr);
751     if(!first)
752     {
753       /**
754        * Add log file next after current head
755        */
756       Ptr<Undofile> curr;
757       m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i);
758       if(free.next(curr))
759 	free.insert(ptr, curr); // inserts before (that's why the extra next)
760       else
761 	free.add(ptr);
762 
763       ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY;
764     }
765     else
766     {
767       /**
768        * First file isn't empty as it can be written to at any time
769        */
770       free.add(ptr);
771       ptr.p->m_state = Undofile::FS_ONLINE;
772       lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
773       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
774       signal->theData[1] = lg_ptr.i;
775       signal->theData[2] = 0;
776       sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
777     }
778   }
779   else
780   {
781     ptr.p->m_state = Undofile::FS_SORTING;
782   }
783 
784   ptr.p->m_online.m_lsn = 0;
785   ptr.p->m_online.m_outstanding = 0;
786 
787   Uint64 add= ptr.p->m_file_size - 1;
788   lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS;
789 
790   if(first)
791   {
792     jam();
793 
794     Buffer_idx tmp= { ptr.i, 0 };
795     lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp;
796 
797     /**
798      * Init log tail pointer
799      */
800     lg_ptr.p->m_tail_pos[0] = tmp;
801     lg_ptr.p->m_tail_pos[1] = tmp;
802     lg_ptr.p->m_tail_pos[2] = tmp;
803     lg_ptr.p->m_next_reply_ptr_i = ptr.i;
804   }
805 
806   validate_logfile_group(lg_ptr, "create_file_commit");
807 
808   CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
809   conf->senderData = senderData;
810   conf->senderRef = reference();
811   sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
812 	     CreateFileImplConf::SignalLength, JBB);
813 }
814 
815 void
create_file_abort(Signal * signal,Ptr<Logfile_group> lg_ptr,Ptr<Undofile> ptr)816 Lgman::create_file_abort(Signal* signal,
817 			 Ptr<Logfile_group> lg_ptr,
818 			 Ptr<Undofile> ptr)
819 {
820   if (ptr.p->m_fd == RNIL)
821   {
822     ((FsConf*)signal->getDataPtr())->userPointer = ptr.i;
823     execFSCLOSECONF(signal);
824     return;
825   }
826 
827   FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend();
828   req->filePointer = ptr.p->m_fd;
829   req->userReference = reference();
830   req->userPointer = ptr.i;
831   req->fileFlag = 0;
832   FsCloseReq::setRemoveFileFlag(req->fileFlag, true);
833 
834   sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
835 	     FsCloseReq::SignalLength, JBB);
836 }
837 
838 void
execFSCLOSECONF(Signal * signal)839 Lgman::execFSCLOSECONF(Signal* signal)
840 {
841   Ptr<Undofile> ptr;
842   Ptr<Logfile_group> lg_ptr;
843   Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer;
844   m_file_pool.getPtr(ptr, ptrI);
845 
846   Uint32 senderRef = ptr.p->m_create.m_senderRef;
847   Uint32 senderData = ptr.p->m_create.m_senderData;
848 
849   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
850 
851   if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING)
852   {
853     jam();
854     {
855       Local_undofile_list list(m_file_pool, lg_ptr.p->m_files);
856       list.release(ptr);
857     }
858     drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData);
859   }
860   else
861   {
862     jam();
863     Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files);
864     list.release(ptr);
865 
866     CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr();
867     conf->senderData = senderData;
868     conf->senderRef = reference();
869     sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal,
870 	       CreateFileImplConf::SignalLength, JBB);
871   }
872 }
873 
874 void
execDROP_FILE_REQ(Signal * signal)875 Lgman::execDROP_FILE_REQ(Signal* signal)
876 {
877   jamEntry();
878   ndbrequire(false);
879 }
880 
881 #define CONSUMER 0
882 #define PRODUCER 1
883 
Logfile_group(const CreateFilegroupImplReq * req)884 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req)
885 {
886   m_logfile_group_id = req->filegroup_id;
887   m_version = req->filegroup_version;
888   m_state = LG_ONLINE;
889   m_outstanding_fs = 0;
890   m_next_reply_ptr_i = RNIL;
891 
892   m_last_lsn = 0;
893   m_last_synced_lsn = 0;
894   m_last_sync_req_lsn = 0;
895   m_max_sync_req_lsn = 0;
896   m_last_read_lsn = 0;
897   m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL;
898 
899   m_free_file_words = 0;
900   m_free_buffer_words = 0;
901   m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
902   m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
903   m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx }
904   m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used}
905 
906   m_tail_pos[2].m_ptr_i= RNIL;
907   m_tail_pos[2].m_idx= ~0;
908 
909   m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2];
910 }
911 
912 bool
alloc_logbuffer_memory(Ptr<Logfile_group> ptr,Uint32 bytes)913 Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
914 {
915   Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1)
916     / File_formats::NDB_PAGE_SIZE_WORDS;
917   Uint32 requested= pages;
918   {
919     Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
920     while(pages)
921     {
922       Uint32 ptrI;
923       Uint32 cnt = pages > 64 ? 64 : pages;
924       m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
925       if (cnt)
926       {
927 	Buffer_idx range;
928 	range.m_ptr_i= ptrI;
929 	range.m_idx = cnt;
930 
931 	ndbrequire(map.append((Uint32*)&range, 2));
932 	pages -= range.m_idx;
933       }
934       else
935       {
936 	break;
937       }
938     }
939   }
940 
941   if(2*pages > requested)
942   {
943     // less than half allocated
944     free_logbuffer_memory(ptr);
945     return false;
946   }
947 
948   if(pages != 0)
949   {
950     warningEvent("Allocated %d pages for log buffer space, logfile_group: %d"
951 		 " , requested %d pages",
952 		 (requested-pages), ptr.p->m_logfile_group_id, requested);
953   }
954 
955   init_logbuffer_pointers(ptr);
956   return true;
957 }
958 
959 void
init_logbuffer_pointers(Ptr<Logfile_group> ptr)960 Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr)
961 {
962   Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
963   Page_map::Iterator it;
964   union {
965     Uint32 tmp[2];
966     Buffer_idx range;
967   };
968 
969   map.first(it);
970   tmp[0] = *it.data;
971   ndbrequire(map.next(it));
972   tmp[1] = *it.data;
973 
974   ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0;      // Index in page map
975   ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range
976   ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
977   ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0;               // Page pos
978 
979   ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0;      // Index in page map
980   ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range
981   ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page
982   ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;               // Page pos
983 
984   Uint32 pages= range.m_idx;
985   while(map.next(it))
986   {
987     tmp[0] = *it.data;
988     ndbrequire(map.next(it));
989     tmp[1] = *it.data;
990     pages += range.m_idx;
991   }
992 
993   ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS;
994 }
995 
996 Uint32
compute_free_file_pages(Ptr<Logfile_group> ptr)997 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr)
998 {
999   Buffer_idx head= ptr.p->m_file_pos[HEAD];
1000   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
1001   Uint32 pages = 0;
1002   if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)
1003   {
1004     pages += tail.m_idx - head.m_idx;
1005   }
1006   else
1007   {
1008     Ptr<Undofile> file;
1009     m_file_pool.getPtr(file, head.m_ptr_i);
1010     Local_undofile_list list(m_file_pool, ptr.p->m_files);
1011 
1012     do
1013     {
1014       pages += (file.p->m_file_size - head.m_idx - 1);
1015       if(!list.next(file))
1016 	list.first(file);
1017       head.m_idx = 0;
1018     } while(file.i != tail.m_ptr_i);
1019 
1020     pages += tail.m_idx - head.m_idx;
1021   }
1022   return pages;
1023 }
1024 
1025 void
free_logbuffer_memory(Ptr<Logfile_group> ptr)1026 Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
1027 {
1028   union {
1029     Uint32 tmp[2];
1030     Buffer_idx range;
1031   };
1032 
1033   Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
1034 
1035   Page_map::Iterator it;
1036   map.first(it);
1037   while(!it.isNull())
1038   {
1039     tmp[0] = *it.data;
1040     ndbrequire(map.next(it));
1041     tmp[1] = *it.data;
1042 
1043     m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
1044     map.next(it);
1045   }
1046   map.release();
1047 }
1048 
Undofile(const struct CreateFileImplReq * req,Uint32 ptrI)1049 Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI)
1050 {
1051   m_fd = RNIL;
1052   m_file_id = req->file_id;
1053   m_logfile_group_ptr_i= ptrI;
1054 
1055   Uint64 pages = req->file_size_hi;
1056   pages = (pages << 32) | req->file_size_lo;
1057   pages /= GLOBAL_PAGE_SIZE;
1058   m_file_size = pages;
1059 
1060   m_create.m_senderRef = req->senderRef; // During META
1061   m_create.m_senderData = req->senderData; // During META
1062   m_create.m_logfile_group_id = req->filegroup_id;
1063 }
1064 
Logfile_client(SimulatedBlock * block,Lgman * lgman,Uint32 logfile_group_id)1065 Logfile_client::Logfile_client(SimulatedBlock* block,
1066 			       Lgman* lgman, Uint32 logfile_group_id)
1067 {
1068   m_block= block->number();
1069   m_lgman= lgman;
1070   m_logfile_group_id= logfile_group_id;
1071 }
1072 
1073 int
sync_lsn(Signal * signal,Uint64 lsn,Request * req,Uint32 flags)1074 Logfile_client::sync_lsn(Signal* signal,
1075 			 Uint64 lsn, Request* req, Uint32 flags)
1076 {
1077   Ptr<Lgman::Logfile_group> ptr;
1078   if(m_lgman->m_logfile_group_list.first(ptr))
1079   {
1080     if(ptr.p->m_last_synced_lsn >= lsn)
1081     {
1082       return 1;
1083     }
1084 
1085     bool empty= false;
1086     Ptr<Lgman::Log_waiter> wait;
1087     {
1088       Lgman::Local_log_waiter_list
1089 	list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1090 
1091       empty= list.isEmpty();
1092       if(!list.seize(wait))
1093 	return -1;
1094 
1095       wait.p->m_block= m_block;
1096       wait.p->m_sync_lsn= lsn;
1097       memcpy(&wait.p->m_callback, &req->m_callback,
1098 	     sizeof(SimulatedBlock::Callback));
1099 
1100       ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ?
1101 	lsn : ptr.p->m_max_sync_req_lsn;
1102     }
1103 
1104     if(ptr.p->m_last_sync_req_lsn < lsn &&
1105        ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD))
1106     {
1107       ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1108       signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1109       signal->theData[1] = ptr.i;
1110       signal->theData[2] = lsn >> 32;
1111       signal->theData[3] = lsn & 0xFFFFFFFF;
1112       m_lgman->sendSignalWithDelay(m_lgman->reference(),
1113 				   GSN_CONTINUEB, signal, 10, 4);
1114     }
1115     return 0;
1116   }
1117   return -1;
1118 }
1119 
1120 void
force_log_sync(Signal * signal,Ptr<Logfile_group> ptr,Uint32 lsn_hi,Uint32 lsn_lo)1121 Lgman::force_log_sync(Signal* signal,
1122 		      Ptr<Logfile_group> ptr,
1123 		      Uint32 lsn_hi, Uint32 lsn_lo)
1124 {
1125   Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1126   Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo;
1127 
1128   if(ptr.p->m_last_sync_req_lsn < force_lsn)
1129   {
1130     /**
1131      * Do force
1132      */
1133     Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos;
1134     GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);
1135 
1136     Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
1137     if(pos.m_idx) // don't flush empty page...
1138     {
1139       Uint64 lsn= ptr.p->m_last_lsn - 1;
1140 
1141       File_formats::Undofile::Undo_page* undo=
1142 	(File_formats::Undofile::Undo_page*)page;
1143       undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
1144       undo->m_page_header.m_page_lsn_hi = lsn >> 32;
1145       undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1146 
1147       /**
1148        * Update free space with extra NOOP
1149        */
1150       ndbrequire(ptr.p->m_free_file_words >= free);
1151       ndbrequire(ptr.p->m_free_buffer_words > free);
1152       ptr.p->m_free_file_words -= free;
1153       ptr.p->m_free_buffer_words -= free;
1154 
1155       validate_logfile_group(ptr, "force_log_sync");
1156 
1157       next_page(ptr.p, PRODUCER);
1158       ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
1159     }
1160   }
1161 
1162 
1163 
1164   Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn;
1165   if(max_req_lsn > force_lsn &&
1166      max_req_lsn > ptr.p->m_last_sync_req_lsn)
1167   {
1168     ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
1169     signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
1170     signal->theData[1] = ptr.i;
1171     signal->theData[2] = max_req_lsn >> 32;
1172     signal->theData[3] = max_req_lsn & 0xFFFFFFFF;
1173     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4);
1174   }
1175   else
1176   {
1177     ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD;
1178   }
1179 }
1180 
1181 void
process_log_sync_waiters(Signal * signal,Ptr<Logfile_group> ptr)1182 Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1183 {
1184   Local_log_waiter_list
1185     list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
1186 
1187   if(list.isEmpty())
1188   {
1189     return;
1190   }
1191 
1192   bool removed= false;
1193   Ptr<Log_waiter> waiter;
1194   list.first(waiter);
1195   Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1196 
1197   if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
1198   {
1199     removed= true;
1200     Uint32 block = waiter.p->m_block;
1201     SimulatedBlock* b = globalData.getBlock(block);
1202     b->execute(signal, waiter.p->m_callback, logfile_group_id);
1203 
1204     list.releaseFirst(waiter);
1205   }
1206 
1207   if(removed && !list.isEmpty())
1208   {
1209     ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD;
1210     signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS;
1211     signal->theData[1] = ptr.i;
1212     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1213   }
1214   else
1215   {
1216     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD;
1217   }
1218 }
1219 
1220 
1221 Uint32*
get_log_buffer(Ptr<Logfile_group> ptr,Uint32 sz)1222 Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
1223 {
1224   GlobalPage *page;
1225   page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i);
1226 
1227   Uint32 total_free= ptr.p->m_free_buffer_words;
1228   assert(total_free >= sz);
1229   Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
1230   Uint32 free= File_formats::UNDO_PAGE_WORDS - pos;
1231 
1232   if(sz <= free)
1233   {
1234 next:
1235     // fits this page wo/ problem
1236     ndbrequire(total_free > sz);
1237     ptr.p->m_free_buffer_words = total_free - sz;
1238     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
1239     return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
1240   }
1241 
1242   /**
1243    * It didn't fit page...fill page with a NOOP log entry
1244    */
1245   Uint64 lsn= ptr.p->m_last_lsn - 1;
1246   File_formats::Undofile::Undo_page* undo=
1247     (File_formats::Undofile::Undo_page*)page;
1248   undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
1249   undo->m_page_header.m_page_lsn_hi = lsn >> 32;
1250   undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
1251 
1252   /**
1253    * Update free space with extra NOOP
1254    */
1255   ndbrequire(ptr.p->m_free_file_words >= free);
1256   ptr.p->m_free_file_words -= free;
1257 
1258   validate_logfile_group(ptr, "get_log_buffer");
1259 
1260   pos= 0;
1261   assert(total_free >= free);
1262   total_free -= free;
1263   page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER));
1264   goto next;
1265 }
1266 
1267 Uint32
next_page(Logfile_group * ptrP,Uint32 i)1268 Lgman::next_page(Logfile_group* ptrP, Uint32 i)
1269 {
1270   Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i;
1271   Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx;
1272   if(left_in_range > 0)
1273   {
1274     ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1;
1275     ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1;
1276     return page_ptr_i + 1;
1277   }
1278   else
1279   {
1280     Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages);
1281     Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize();
1282     Lgman::Page_map::Iterator it;
1283     map.position(it, pos);
1284 
1285     union {
1286       Uint32 tmp[2];
1287       Lgman::Buffer_idx range;
1288     };
1289 
1290     tmp[0] = *it.data; map.next(it);
1291     tmp[1] = *it.data;
1292 
1293     ptrP->m_pos[i].m_current_page.m_ptr_i = pos;           // New index in map
1294     ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages
1295     ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i;  // Current page
1296     // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func
1297     return range.m_ptr_i;
1298   }
1299 }
1300 
1301 int
get_log_buffer(Signal * signal,Uint32 sz,SimulatedBlock::Callback * callback)1302 Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
1303 			       SimulatedBlock::Callback* callback)
1304 {
1305   sz += 2; // lsn
1306   Lgman::Logfile_group key;
1307   key.m_logfile_group_id= m_logfile_group_id;
1308   Ptr<Lgman::Logfile_group> ptr;
1309   if(m_lgman->m_logfile_group_hash.find(ptr, key))
1310   {
1311     if(ptr.p->m_free_buffer_words >= (sz + 2*File_formats::UNDO_PAGE_WORDS)&&
1312        ptr.p->m_log_buffer_waiters.isEmpty())
1313     {
1314       return 1;
1315     }
1316 
1317     bool empty= false;
1318     {
1319       Ptr<Lgman::Log_waiter> wait;
1320       Lgman::Local_log_waiter_list
1321 	list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1322 
1323       empty= list.isEmpty();
1324       if(!list.seize(wait))
1325       {
1326 	return -1;
1327       }
1328 
1329       wait.p->m_size= sz;
1330       wait.p->m_block= m_block;
1331       memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback));
1332     }
1333 
1334     return 0;
1335   }
1336   return -1;
1337 }
1338 
1339 NdbOut&
operator <<(NdbOut & out,const Lgman::Buffer_idx & pos)1340 operator<<(NdbOut& out, const Lgman::Buffer_idx& pos)
1341 {
1342   out << "[ "
1343       << pos.m_ptr_i << " "
1344       << pos.m_idx << " ]";
1345   return out;
1346 }
1347 
1348 NdbOut&
operator <<(NdbOut & out,const Lgman::Logfile_group::Position & pos)1349 operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
1350 {
1351   out << "[ ("
1352       << pos.m_current_page.m_ptr_i << " "
1353       << pos.m_current_page.m_idx << ") ("
1354       << pos.m_current_pos.m_ptr_i << " "
1355       << pos.m_current_pos.m_idx << ") ]";
1356   return out;
1357 }
1358 
/**
 * One round of the log flush loop for a logfile group (driven by
 * CONTINUEB FLUSH_LOG).
 *
 * Buffer pages between the CONSUMER and the PRODUCER position are handed
 * to write_log_pages().  When both positions are on the same page there is
 * no complete page to write; the function then either reschedules itself
 * with a delay, or - once it has been idle with force >= 2 while buffer
 * waiters exist and no file write is outstanding - closes the partially
 * filled producer page so that it can be written.
 *
 * @param signal  signal object, reused for CONTINUEB and file requests
 * @param ptr     the logfile group to flush
 * @param force   idle-round counter carried in the CONTINUEB signal
 *
 * Unless the group is being dropped (LG_DROPPING) the function always
 * reschedules itself; when dropping it clears LG_FLUSH_THREAD instead.
 */
void
Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
{
  // Work on copies; the consumer copy is stored back after the write loop
  Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
  Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];

  jamEntry();

  // Same buffer page on both sides: nothing complete to write
  if(consumer.m_current_page == producer.m_current_page)
  {

#if 0
    if (force)
    {
      ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
	       force, ptr.p->m_file_pos[HEAD].m_ptr_i);
      ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
	       consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
	       producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
    }
#endif
    if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
    {
      jam();

      // Forcing is only considered while somebody waits for buffer space
      // and no file write is in flight; otherwise the counter restarts
      if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs)
      {
	force =  0;
      }

      if (force < 2)
      {
	// Come back later with the counter increased; the delay is
	// shorter (10 instead of 100) once force is non-zero
	signal->theData[0] = LgmanContinueB::FLUSH_LOG;
	signal->theData[1] = ptr.i;
	signal->theData[2] = force + 1;
	sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
			    force ? 10 : 100, 3);
	return;
      }
      else
      {
	// Close the partially filled producer page so it becomes writable
	Buffer_idx pos= producer.m_current_pos;
	GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i);

	Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;

	ndbout_c("force flush %d %d", pos.m_idx, ptr.p->m_free_buffer_words);

	ndbrequire(pos.m_idx); // don't flush empty page...
	Uint64 lsn= ptr.p->m_last_lsn - 1;

	File_formats::Undofile::Undo_page* undo=
	  (File_formats::Undofile::Undo_page*)page;
	undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
	undo->m_page_header.m_page_lsn_hi = lsn >> 32;
	undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;

	/**
	 * Update free space with extra NOOP
	 */
	ndbrequire(ptr.p->m_free_file_words >= free);
	ndbrequire(ptr.p->m_free_buffer_words > free);
	ptr.p->m_free_file_words -= free;
	ptr.p->m_free_buffer_words -= free;

	validate_logfile_group(ptr, "force_log_flush");

	next_page(ptr.p, PRODUCER);
	ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
	producer = ptr.p->m_pos[PRODUCER];
	// break through
      }
    }
    else
    {
      // Group is being dropped: stop the flush loop
      jam();
      ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
      return;
    }
  }

  // Write pages until the consumer catches up with the producer or
  // write_log_pages() accepts fewer pages than offered ("full")
  bool full= false;
  Uint32 tot= 0; // accumulated below but not read anywhere in this function
  while(!(consumer.m_current_page == producer.m_current_page) && !full)
  {
    validate_logfile_group(ptr, "before flush log");

    Uint32 cnt; // pages written
    Uint32 page= consumer.m_current_pos.m_ptr_i;
    if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i)
    {
      // Both positions are in the same page range of the buffer
      if(consumer.m_current_page.m_idx > producer.m_current_page.m_idx)
      {
	// Consumer has more pages left in the range than the producer,
	// i.e. it is behind: write the pages in between
	jam();
	Uint32 tmp=
	  consumer.m_current_page.m_idx - producer.m_current_page.m_idx;
	cnt= write_log_pages(signal, ptr, page, tmp);
	assert(cnt <= tmp);

	consumer.m_current_pos.m_ptr_i += cnt;
	consumer.m_current_page.m_idx -= cnt;
	full= (tmp > cnt);
      }
      else
      {
	// Only 1 chunk
	ndbrequire(ptr.p->m_buffer_pages.getSize() == 2);
	// Write from the consumer to the end of the range
	Uint32 tmp= consumer.m_current_page.m_idx + 1;
	cnt= write_log_pages(signal, ptr, page, tmp);
	assert(cnt <= tmp);

	if(cnt == tmp)
	{
	  jam();
	  /**
	   * Entire chunk is written
	   *   move to next
	   */
	  // Zero pages left makes next_page() switch to the next range
	  ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
	  next_page(ptr.p, CONSUMER);
	  consumer = ptr.p->m_pos[CONSUMER];
 	}
	else
	{
	  jam();
	  /**
	   * Failed to write entire chunk...
	   */
	  full= true;
	  consumer.m_current_page.m_idx -= cnt;
	  consumer.m_current_pos.m_ptr_i += cnt;
	}
      }
    }
    else
    {
      // Producer is in another range: write the rest of the consumer's range
      Uint32 tmp= consumer.m_current_page.m_idx + 1;
      cnt= write_log_pages(signal, ptr, page, tmp);
      assert(cnt <= tmp);

      if(cnt == tmp)
      {
	jam();
	/**
	 * Entire chunk is written
	 *   move to next
	 */
	ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0;
	next_page(ptr.p, CONSUMER);
	consumer = ptr.p->m_pos[CONSUMER];
      }
      else
      {
	jam();
	/**
	 * Failed to write entire chunk...
	 */
	full= true;
	consumer.m_current_page.m_idx -= cnt;
	consumer.m_current_pos.m_ptr_i += cnt;
      }
    }

    tot += cnt;
    if(cnt)
      validate_logfile_group(ptr, " after flush_log");
  }

  // Publish how far the consumer got
  ptr.p->m_pos[CONSUMER]= consumer;

  if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
  {
    // Next round immediately, with the idle counter reset
    signal->theData[0] = LgmanContinueB::FLUSH_LOG;
    signal->theData[1] = ptr.i;
    signal->theData[2] = 0;
    sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
  }
  else
  {
    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
  }
}
1541 
1542 void
process_log_buffer_waiters(Signal * signal,Ptr<Logfile_group> ptr)1543 Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
1544 {
1545   Uint32 free_buffer= ptr.p->m_free_buffer_words;
1546   Local_log_waiter_list
1547     list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
1548 
1549   if(list.isEmpty())
1550   {
1551     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1552     return;
1553   }
1554 
1555   bool removed= false;
1556   Ptr<Log_waiter> waiter;
1557   list.first(waiter);
1558   Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
1559   if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer)
1560   {
1561     removed= true;
1562     Uint32 block = waiter.p->m_block;
1563     SimulatedBlock* b = globalData.getBlock(block);
1564     b->execute(signal, waiter.p->m_callback, logfile_group_id);
1565 
1566     list.releaseFirst(waiter);
1567   }
1568 
1569   if(removed && !list.isEmpty())
1570   {
1571     ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD;
1572     signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS;
1573     signal->theData[1] = ptr.i;
1574     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1575   }
1576   else
1577   {
1578     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
1579   }
1580 }
1581 
1582 #define REALLY_SLOW_FS 0
1583 
/**
 * Issue FSWRITEREQ(s) for in_pages consecutive buffer pages starting at
 * pageId, writing them at the HEAD file position of the logfile group.
 *
 * @param signal    signal object used to build and send the file request
 * @param ptr       the logfile group
 * @param pageId    id of the first buffer page to write
 * @param in_pages  number of pages offered for writing (must be non-zero)
 *
 * @return number of pages for which a write was issued; 0 if the head file
 *         already has a write outstanding
 *
 * If the pages do not fit in what is left of the head file, the part that
 * fits is written with the sync flag set, HEAD moves to the start of the
 * next file (wrapping to the first) and the function calls itself for the
 * remaining pages.
 */
Uint32
Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr,
		       Uint32 pageId, Uint32 in_pages)
{
  assert(in_pages);
  Ptr<Undofile> filePtr;
  Buffer_idx head= ptr.p->m_file_pos[HEAD];
  Buffer_idx tail= ptr.p->m_file_pos[TAIL];
  m_file_pool.getPtr(filePtr, head.m_ptr_i);

  // Only one write at a time per file
  if(filePtr.p->m_online.m_outstanding > 0)
  {
    jam();
    return 0;
  }

  Uint32 sz= filePtr.p->m_file_size - 1; // skip zero
  Uint32 max, pages= in_pages;

  // max = pages writable in this file: up to the end of the file, or up to
  // the tail when the tail is ahead of the head in the same file
  if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx))
  {
    max= sz - head.m_idx;
  }
  else
  {
    max= tail.m_idx - head.m_idx;
  }

  // Prepare the request for the common case (all pages in this file)
  FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
  req->filePointer = filePtr.p->m_fd;
  req->userReference = reference();
  req->userPointer = filePtr.i;
  req->varIndex = 1+head.m_idx; // skip zero page
  req->numberOfPages = pages;
  req->data.pageData[0] = pageId;
  req->operationFlag = 0;
  FsReadWriteReq::setFormatFlag(req->operationFlag,
				FsReadWriteReq::fsFormatSharedPage);

  if(max > pages)
  {
    // Everything fits with room to spare: write it and advance the head
    jam();
    max= pages;
    head.m_idx += max;
    ptr.p->m_file_pos[HEAD] = head;

    if (REALLY_SLOW_FS)
      sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
			  FsReadWriteReq::FixedLength + 1);
    else
      sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
		 FsReadWriteReq::FixedLength + 1, JBA);

    ptr.p->m_outstanding_fs++;
    filePtr.p->m_online.m_outstanding = max;
    filePtr.p->m_state |= Undofile::FS_OUTSTANDING;

    // The lsn in the header of the last page written is the lsn this
    // write request covers
    File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
      m_shared_page_pool.getPtr(pageId + max - 1);
    Uint64 lsn = 0;
    lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
    lsn += page->m_page_header.m_page_lsn_lo;

    filePtr.p->m_online.m_lsn = lsn;  // Store last writereq lsn on file
    ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group
  }
  else
  {
    // The write uses up the rest of this file: write max pages with the
    // sync flag set, then continue in the next file
    jam();
    req->numberOfPages = max;
    FsReadWriteReq::setSyncFlag(req->operationFlag, 1);

    if (REALLY_SLOW_FS)
      sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS,
			  FsReadWriteReq::FixedLength + 1);
    else
      sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
		 FsReadWriteReq::FixedLength + 1, JBA);

    ptr.p->m_outstanding_fs++;
    filePtr.p->m_online.m_outstanding = max;
    filePtr.p->m_state |= Undofile::FS_OUTSTANDING;

    File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*)
      m_shared_page_pool.getPtr(pageId + max - 1);
    Uint64 lsn = 0;
    lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
    lsn += page->m_page_header.m_page_lsn_lo;

    filePtr.p->m_online.m_lsn = lsn;  // Store last writereq lsn on file
    ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group

    // Find the next file, wrapping to the first at the end of the list
    Ptr<Undofile> next = filePtr;
    Local_undofile_list files(m_file_pool, ptr.p->m_files);
    if(!files.next(next))
    {
      jam();
      files.first(next);
    }
    ndbout_c("changing file from %d to %d", filePtr.i, next.i);
    // FS_MOVE_NEXT tells execFSWRITECONF to step to the next file after
    // this reply; the next file is no longer empty
    filePtr.p->m_state |= Undofile::FS_MOVE_NEXT;
    next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY;

    head.m_idx= 0;
    head.m_ptr_i= next.i;
    ptr.p->m_file_pos[HEAD] = head;
    // Remaining pages go to the new head file
    if(max < pages)
      max += write_log_pages(signal, ptr, pageId + max, pages - max);
  }

  assert(max);
  return max;
}
1697 
1698 void
execFSWRITEREF(Signal * signal)1699 Lgman::execFSWRITEREF(Signal* signal)
1700 {
1701   jamEntry();
1702   SimulatedBlock::execFSWRITEREF(signal);
1703   ndbrequire(false);
1704 }
1705 
/**
 * A file write issued by write_log_pages() has completed.
 *
 * The file is found through conf->userPointer and its FS_OUTSTANDING flag
 * is cleared.  Completions are consumed in the order the writes were
 * issued: only when the file is the one the logfile group expects the next
 * reply from (m_next_reply_ptr_i) are completed writes accounted for.  In
 * that case the buffer words of the written pages become free again,
 * m_last_synced_lsn is updated and the sync and buffer waiters are
 * processed.  A reply for any other file is only logged here; since its
 * FS_OUTSTANDING flag is now clear, the loop below can pick it up when the
 * expected reply arrives.
 */
void
Lgman::execFSWRITECONF(Signal* signal)
{
  jamEntry();
  FsConf * conf = (FsConf*)signal->getDataPtr();
  Ptr<Undofile> ptr;
  m_file_pool.getPtr(ptr, conf->userPointer);

  ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
  ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;

  Ptr<Logfile_group> lg_ptr;
  m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);

  // Number of write requests the logfile group still has in flight
  Uint32 cnt= lg_ptr.p->m_outstanding_fs;
  ndbrequire(cnt);

  if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
  {
    Uint32 tot= 0;  // pages confirmed in this round
    Uint64 lsn = 0; // lsn of the last write confirmed in this round
    {
      Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
      // Consume this reply and any later ones that have already arrived
      while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
      {
	Uint32 state= ptr.p->m_state;
	Uint32 pages= ptr.p->m_online.m_outstanding;
	ndbrequire(pages);
	ptr.p->m_online.m_outstanding= 0;
	ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
	tot += pages;
	cnt--;

	lsn = ptr.p->m_online.m_lsn;

	// The write filled the file: the following write went to the
	// next file (wrapping to the first one)
	if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
	  files.first(ptr);
      }
    }

    ndbassert(tot);
    lg_ptr.p->m_outstanding_fs = cnt;
    lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
    lg_ptr.p->m_next_reply_ptr_i = ptr.i;
    lg_ptr.p->m_last_synced_lsn = lsn;

    // Wake waiters unless their CONTINUEB loop is already running
    if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD))
    {
      process_log_sync_waiters(signal, lg_ptr);
    }

    if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
    {
      process_log_buffer_waiters(signal, lg_ptr);
    }
  }
  else
  {
    // Reply arrived before the one for the expected file
    ndbout_c("miss matched writes");
  }

  return;
}
1769 
1770 void
execLCP_FRAG_ORD(Signal * signal)1771 Lgman::execLCP_FRAG_ORD(Signal* signal)
1772 {
1773   jamEntry();
1774 
1775   LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
1776   Uint32 lcp_id= ord->lcpId;
1777   Uint32 frag_id = ord->fragmentId;
1778   Uint32 table_id = ord->tableId;
1779 
1780   Ptr<Logfile_group> ptr;
1781   m_logfile_group_list.first(ptr);
1782 
1783   Uint32 entry= lcp_id == m_latest_lcp ?
1784     File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST;
1785   if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD))
1786   {
1787     jam();
1788     ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
1789     signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
1790     signal->theData[1] = ptr.i;
1791     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1792   }
1793 
1794   if(!ptr.isNull() && ptr.p->m_last_lsn)
1795   {
1796     Uint32 undo[3];
1797     undo[0] = lcp_id;
1798     undo[1] = (table_id << 16) | frag_id;
1799     undo[2] = (entry << 16 ) | (sizeof(undo) >> 2);
1800 
1801     Uint64 last_lsn= m_last_lsn;
1802 
1803     if(ptr.p->m_last_lsn == last_lsn
1804 #ifdef VM_TRACE
1805        && ((rand() % 100) > 50)
1806 #endif
1807        )
1808     {
1809       undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
1810       Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
1811       memcpy(dst, undo, sizeof(undo));
1812       ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
1813       ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
1814     }
1815     else
1816     {
1817       Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
1818       * dst++ = last_lsn >> 32;
1819       * dst++ = last_lsn & 0xFFFFFFFF;
1820       memcpy(dst, undo, sizeof(undo));
1821       ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
1822       ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
1823     }
1824     ptr.p->m_last_lcp_lsn = last_lsn;
1825     m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
1826 
1827     validate_logfile_group(ptr, "execLCP_FRAG_ORD");
1828   }
1829 
1830   while(!ptr.isNull())
1831   {
1832     if (ptr.p->m_last_lsn)
1833     {
1834       /**
1835        * First LCP_FRAGORD for each LCP, sets tail pos
1836        */
1837       if(m_latest_lcp != lcp_id)
1838       {
1839 	ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1];
1840 	ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2];
1841 	ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD];
1842       }
1843 
1844       if(0)
1845 	ndbout_c
1846 	  ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld",
1847 	   ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx,
1848 	   ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx,
1849 	   ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx,
1850 	   (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS));
1851     }
1852     m_logfile_group_list.next(ptr);
1853   }
1854 
1855   m_latest_lcp = lcp_id;
1856 }
1857 
1858 void
execEND_LCP_REQ(Signal * signal)1859 Lgman::execEND_LCP_REQ(Signal* signal)
1860 {
1861   EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
1862   ndbrequire(m_latest_lcp == req->backupId);
1863 
1864   Ptr<Logfile_group> ptr;
1865   m_logfile_group_list.first(ptr);
1866   bool wait= false;
1867   while(!ptr.isNull())
1868   {
1869     Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn;
1870     if(ptr.p->m_last_synced_lsn < lcp_lsn)
1871     {
1872       wait= true;
1873       if(signal->getSendersBlockRef() != reference())
1874       {
1875 	Logfile_client tmp(this, this, ptr.p->m_logfile_group_id);
1876 	Logfile_client::Request req;
1877 	req.m_callback.m_callbackData = ptr.i;
1878 	req.m_callback.m_callbackFunction = safe_cast(&Lgman::endlcp_callback);
1879 	ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0);
1880       }
1881     }
1882     else
1883     {
1884       ptr.p->m_last_lcp_lsn = 0;
1885     }
1886     m_logfile_group_list.next(ptr);
1887   }
1888 
1889   if(wait)
1890   {
1891     return;
1892   }
1893 
1894   signal->theData[0] = 0;
1895   sendSignal(DBLQH_REF, GSN_END_LCP_CONF, signal, 1, JBB);
1896 }
1897 
1898 void
endlcp_callback(Signal * signal,Uint32 ptr,Uint32 res)1899 Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res)
1900 {
1901   EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
1902   req->backupId = m_latest_lcp;
1903   execEND_LCP_REQ(signal);
1904 }
1905 
/**
 * One step of the CUT_LOG_TAIL loop: move the file TAIL of a logfile group
 * towards m_tail_pos[0] and give the freed pages back to
 * m_free_file_words.
 *
 * If the target is further on in the same file, the tail jumps there and
 * the group is done.  Otherwise the rest of the tail's file is freed, the
 * tail moves to the start of the next non-empty file and another step is
 * scheduled for the same group.  When a group is done its
 * LG_CUT_LOG_THREAD flag is cleared and the loop continues with the next
 * logfile group in the list, if any.
 */
void
Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
{
  bool done= true;
  if (likely(ptr.p->m_last_lsn))
  {
    Buffer_idx tmp= ptr.p->m_tail_pos[0];       // where the tail should end up
    Buffer_idx tail= ptr.p->m_file_pos[TAIL];   // where it is now

    Ptr<Undofile> filePtr;
    m_file_pool.getPtr(filePtr, tail.m_ptr_i);

    if(!(tmp == tail))
    {
      Uint32 free;
      if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx)
      {
	// Target is ahead in the same file: free the pages in between
	free= tmp.m_idx - tail.m_idx;
	ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;
	ptr.p->m_file_pos[TAIL] = tmp;
      }
      else
      {
	// Free the rest of this file and continue in a later step
	free= filePtr.p->m_file_size - tail.m_idx - 1;
	ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS;

	// Skip forward over files marked FS_EMPTY; coming back to the
	// starting file is treated as fatal
	Ptr<Undofile> next = filePtr;
	Local_undofile_list files(m_file_pool, ptr.p->m_files);
	while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
	  ndbrequire(next.i != filePtr.i);
	if(next.isNull())
	{
	  // Ran off the end of the list: continue from the first file
	  jam();
	  files.first(next);
	  while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
	    ndbrequire(next.i != filePtr.i);
	}

	tmp.m_idx= 0;
	tmp.m_ptr_i= next.i;
	ptr.p->m_file_pos[TAIL] = tmp;
	done= false;
      }
    }

    validate_logfile_group(ptr, "cut log");
  }

  if (done)
  {
    // This group is finished; move on to the next one
    ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD;
    m_logfile_group_list.next(ptr);
  }

  // Continue with the same group (!done) or with the next group, if any
  if(!done || !ptr.isNull())
  {
    ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD;
    signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL;
    signal->theData[1] = ptr.i;
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
  }
}
1968 
/**
 * Handler for SUB_GCP_COMPLETE_REP.
 *
 * Currently a no-op: the function returns before the FILTER_LOG code
 * below, which is therefore unreachable.
 */
void
Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
{
  jamEntry();

  Ptr<Logfile_group> ptr;
  m_logfile_group_list.first(ptr);

  /**
   * Filter all logfile groups in parallel
   */
  return; // NOT IMPLEMENTED YET

  // Unreachable: would send one CONTINUEB FILTER_LOG per logfile group
  signal->theData[0] = LgmanContinueB::FILTER_LOG;
  while(!ptr.isNull())
  {
    signal->theData[1] = ptr.i;
    sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
    m_logfile_group_list.next(ptr);
  }
}
1990 
1991 int
alloc_log_space(Uint32 ref,Uint32 words)1992 Lgman::alloc_log_space(Uint32 ref, Uint32 words)
1993 {
1994   ndbrequire(words);
1995   words += 2; // lsn
1996   Logfile_group key;
1997   key.m_logfile_group_id= ref;
1998   Ptr<Logfile_group> ptr;
1999   if(m_logfile_group_hash.find(ptr, key) &&
2000      ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS)))
2001   {
2002     ptr.p->m_free_file_words -= words;
2003     validate_logfile_group(ptr, "alloc_log_space");
2004     return 0;
2005   }
2006 
2007   if(ptr.isNull())
2008   {
2009     return -1;
2010   }
2011 
2012   return 1501;
2013 }
2014 
2015 int
free_log_space(Uint32 ref,Uint32 words)2016 Lgman::free_log_space(Uint32 ref, Uint32 words)
2017 {
2018   ndbrequire(words);
2019   Logfile_group key;
2020   key.m_logfile_group_id= ref;
2021   Ptr<Logfile_group> ptr;
2022   if(m_logfile_group_hash.find(ptr, key))
2023   {
2024     ptr.p->m_free_file_words += (words + 2);
2025     validate_logfile_group(ptr, "free_log_space");
2026     return 0;
2027   }
2028   ndbrequire(false);
2029   return -1;
2030 }
2031 
2032 Uint64
add_entry(const Change * src,Uint32 cnt)2033 Logfile_client::add_entry(const Change* src, Uint32 cnt)
2034 {
2035   Uint32 i, tot= 0;
2036   for(i= 0; i<cnt; i++)
2037   {
2038     tot += src[i].len;
2039   }
2040 
2041   Uint32 *dst;
2042   Uint64 last_lsn= m_lgman->m_last_lsn;
2043   {
2044     Lgman::Logfile_group key;
2045     key.m_logfile_group_id= m_logfile_group_id;
2046     Ptr<Lgman::Logfile_group> ptr;
2047     if(m_lgman->m_logfile_group_hash.find(ptr, key))
2048     {
2049       Uint64 last_lsn_filegroup= ptr.p->m_last_lsn;
2050       if(last_lsn_filegroup == last_lsn
2051 #ifdef VM_TRACE
2052 	 && ((rand() % 100) > 50)
2053 #endif
2054 	 )
2055       {
2056 	dst= m_lgman->get_log_buffer(ptr, tot);
2057 	for(i= 0; i<cnt; i++)
2058 	{
2059 	  memcpy(dst, src[i].ptr, 4*src[i].len);
2060 	  dst += src[i].len;
2061 	}
2062 	* (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
2063 	ptr.p->m_free_file_words += 2;
2064 	ptr.p->m_free_buffer_words += 2;
2065 	m_lgman->validate_logfile_group(ptr);
2066       }
2067       else
2068       {
2069 	dst= m_lgman->get_log_buffer(ptr, tot + 2);
2070 	* dst++ = last_lsn >> 32;
2071 	* dst++ = last_lsn & 0xFFFFFFFF;
2072 	for(i= 0; i<cnt; i++)
2073 	{
2074 	  memcpy(dst, src[i].ptr, 4*src[i].len);
2075 	  dst += src[i].len;
2076 	}
2077       }
2078     }
2079 
2080     m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
2081 
2082     return last_lsn;
2083   }
2084 }
2085 
2086 void
execSTART_RECREQ(Signal * signal)2087 Lgman::execSTART_RECREQ(Signal* signal)
2088 {
2089   m_latest_lcp = signal->theData[0];
2090 
2091   Ptr<Logfile_group> ptr;
2092   m_logfile_group_list.first(ptr);
2093 
2094   if(ptr.i != RNIL)
2095   {
2096     infoEvent("Applying undo to LCP: %d", m_latest_lcp);
2097     ndbout_c("Applying undo to LCP: %d", m_latest_lcp);
2098     find_log_head(signal, ptr);
2099     return;
2100   }
2101 
2102   signal->theData[0] = reference();
2103   sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
2104 }
2105 
2106 void
find_log_head(Signal * signal,Ptr<Logfile_group> ptr)2107 Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
2108 {
2109   ndbrequire(ptr.p->m_state &
2110              (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING));
2111 
2112   if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty())
2113   {
2114     jam();
2115     /**
2116      * Logfile_group wo/ any files
2117      */
2118     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING;
2119     ptr.p->m_state |= Logfile_group::LG_ONLINE;
2120     m_logfile_group_list.next(ptr);
2121     signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2122     signal->theData[1] = ptr.i;
2123     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2124     return;
2125   }
2126 
2127   ptr.p->m_state = Logfile_group::LG_SORTING;
2128 
2129   /**
2130    * Read first page from each undofile (1 file at a time...)
2131    */
2132   Local_undofile_list files(m_file_pool, ptr.p->m_meta_files);
2133   Ptr<Undofile> file_ptr;
2134   files.first(file_ptr);
2135 
2136   if(!file_ptr.isNull())
2137   {
2138     /**
2139      * Use log buffer memory when reading
2140      */
2141     Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2142     file_ptr.p->m_online.m_outstanding= page_id;
2143 
2144     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2145     req->filePointer = file_ptr.p->m_fd;
2146     req->userReference = reference();
2147     req->userPointer = file_ptr.i;
2148     req->varIndex = 1; // skip zero page
2149     req->numberOfPages = 1;
2150     req->data.pageData[0] = page_id;
2151     req->operationFlag = 0;
2152     FsReadWriteReq::setFormatFlag(req->operationFlag,
2153 				  FsReadWriteReq::fsFormatSharedPage);
2154 
2155     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2156 	       FsReadWriteReq::FixedLength + 1, JBA);
2157 
2158     ptr.p->m_outstanding_fs++;
2159     file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2160     return;
2161   }
2162   else
2163   {
2164     /**
2165      * All files have read first page
2166      *   and m_files is sorted acording to lsn
2167      */
2168     ndbrequire(!ptr.p->m_files.isEmpty());
2169     Local_undofile_list read_files(m_file_pool, ptr.p->m_files);
2170     read_files.last(file_ptr);
2171 
2172 
2173     /**
2174      * Init binary search
2175      */
2176     ptr.p->m_state = Logfile_group::LG_SEARCHING;
2177     file_ptr.p->m_state = Undofile::FS_SEARCHING;
2178     ptr.p->m_file_pos[TAIL].m_idx = 1;                   // left page
2179     ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size;
2180     ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1;
2181 
2182     Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2183     file_ptr.p->m_online.m_outstanding= page_id;
2184 
2185     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2186     req->filePointer = file_ptr.p->m_fd;
2187     req->userReference = reference();
2188     req->userPointer = file_ptr.i;
2189     req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i;
2190     req->numberOfPages = 1;
2191     req->data.pageData[0] = page_id;
2192     req->operationFlag = 0;
2193     FsReadWriteReq::setFormatFlag(req->operationFlag,
2194 				  FsReadWriteReq::fsFormatSharedPage);
2195 
2196     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2197 	       FsReadWriteReq::FixedLength + 1, JBA);
2198 
2199     ptr.p->m_outstanding_fs++;
2200     file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2201     return;
2202   }
2203 }
2204 
2205 void
execFSREADCONF(Signal * signal)2206 Lgman::execFSREADCONF(Signal* signal)
2207 {
2208   jamEntry();
2209 
2210   Ptr<Undofile> ptr;
2211   Ptr<Logfile_group> lg_ptr;
2212   FsConf* conf = (FsConf*)signal->getDataPtr();
2213 
2214   m_file_pool.getPtr(ptr, conf->userPointer);
2215   m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
2216 
2217   ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
2218   ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
2219 
2220   Uint32 cnt= lg_ptr.p->m_outstanding_fs;
2221   ndbrequire(cnt);
2222 
2223   if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
2224   {
2225     jam();
2226 
2227     if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
2228     {
2229       Uint32 tot= 0;
2230       Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2231       while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
2232       {
2233 	Uint32 state= ptr.p->m_state;
2234 	Uint32 pages= ptr.p->m_online.m_outstanding;
2235 	ndbrequire(pages);
2236 	ptr.p->m_online.m_outstanding= 0;
2237 	ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
2238 	tot += pages;
2239 	cnt--;
2240 
2241 	if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr))
2242 	  files.last(ptr);
2243       }
2244 
2245       lg_ptr.p->m_outstanding_fs = cnt;
2246       lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot;
2247       lg_ptr.p->m_next_reply_ptr_i = ptr.i;
2248     }
2249     return;
2250   }
2251 
2252   lg_ptr.p->m_outstanding_fs = cnt - 1;
2253 
2254   Ptr<GlobalPage> page_ptr;
2255   m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding);
2256   ptr.p->m_online.m_outstanding= 0;
2257 
2258   File_formats::Undofile::Undo_page* page =
2259     (File_formats::Undofile::Undo_page*)page_ptr.p;
2260 
2261   Uint64 lsn = 0;
2262   lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
2263   lsn += page->m_page_header.m_page_lsn_lo;
2264 
2265   switch(ptr.p->m_state){
2266   case Undofile::FS_SORTING:
2267     jam();
2268     break;
2269   case Undofile::FS_SEARCHING:
2270     jam();
2271     find_log_head_in_file(signal, lg_ptr, ptr, lsn);
2272     return;
2273   default:
2274   case Undofile::FS_EXECUTING:
2275   case Undofile::FS_CREATING:
2276   case Undofile::FS_DROPPING:
2277   case Undofile::FS_ONLINE:
2278   case Undofile::FS_OPENING:
2279   case Undofile::FS_EMPTY:
2280     jam();
2281     ndbrequire(false);
2282   }
2283 
2284   /**
2285    * Prepare for execution
2286    */
2287   ptr.p->m_state = Undofile::FS_EXECUTING;
2288   ptr.p->m_online.m_lsn = lsn;
2289 
2290   /**
2291    * Insert into m_files
2292    */
2293   {
2294     Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files);
2295     Local_undofile_list files(m_file_pool, lg_ptr.p->m_files);
2296     meta.remove(ptr);
2297 
2298     Ptr<Undofile> loop;
2299     files.first(loop);
2300     while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn)
2301       files.next(loop);
2302 
2303     if(loop.isNull())
2304     {
2305       /**
2306        * File has highest lsn, add last
2307        */
2308       jam();
2309       files.add(ptr);
2310     }
2311     else
2312     {
2313       /**
2314        * Insert file in correct position in file list
2315        */
2316       files.insert(ptr, loop);
2317     }
2318   }
2319   find_log_head(signal, lg_ptr);
2320 }
2321 
2322 void
execFSREADREF(Signal * signal)2323 Lgman::execFSREADREF(Signal* signal)
2324 {
2325   jamEntry();
2326   SimulatedBlock::execFSREADREF(signal);
2327   ndbrequire(false);
2328 }
2329 
2330 void
find_log_head_in_file(Signal * signal,Ptr<Logfile_group> ptr,Ptr<Undofile> file_ptr,Uint64 last_lsn)2331 Lgman::find_log_head_in_file(Signal* signal,
2332 			     Ptr<Logfile_group> ptr,
2333 			     Ptr<Undofile> file_ptr,
2334 			     Uint64 last_lsn)
2335 {
2336   //     a b
2337   // 3 4 5 0 1
2338   Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i;
2339   Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
2340   Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
2341 
2342   ndbrequire(head > tail);
2343   Uint32 diff = head - tail;
2344 
2345   if(DEBUG_SEARCH_LOG_HEAD)
2346     printf("tail: %d(%lld) head: %d last: %d(%lld) -> ",
2347 	   tail, file_ptr.p->m_online.m_lsn,
2348 	   head, curr, last_lsn);
2349   if(last_lsn > file_ptr.p->m_online.m_lsn)
2350   {
2351     if(DEBUG_SEARCH_LOG_HEAD)
2352       printf("moving tail ");
2353 
2354     file_ptr.p->m_online.m_lsn = last_lsn;
2355     ptr.p->m_file_pos[TAIL].m_idx = tail = curr;
2356   }
2357   else
2358   {
2359     if(DEBUG_SEARCH_LOG_HEAD)
2360       printf("moving head ");
2361 
2362     ptr.p->m_file_pos[HEAD].m_idx = head = curr;
2363   }
2364 
2365   if(diff > 1)
2366   {
2367     // We need to find more pages to be sure...
2368     ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1);
2369 
2370     if(DEBUG_SEARCH_LOG_HEAD)
2371       ndbout_c("-> new search tail: %d(%lld) head: %d -> %d",
2372 	       tail, file_ptr.p->m_online.m_lsn,
2373 	       head, curr);
2374 
2375     Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2376     file_ptr.p->m_online.m_outstanding= page_id;
2377 
2378     FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2379     req->filePointer = file_ptr.p->m_fd;
2380     req->userReference = reference();
2381     req->userPointer = file_ptr.i;
2382     req->varIndex = curr;
2383     req->numberOfPages = 1;
2384     req->data.pageData[0] = page_id;
2385     req->operationFlag = 0;
2386     FsReadWriteReq::setFormatFlag(req->operationFlag,
2387 				  FsReadWriteReq::fsFormatSharedPage);
2388 
2389     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2390 	       FsReadWriteReq::FixedLength + 1, JBA);
2391 
2392     ptr.p->m_outstanding_fs++;
2393     file_ptr.p->m_state |= Undofile::FS_OUTSTANDING;
2394     return;
2395   }
2396 
2397   ndbrequire(diff == 1);
2398   if(DEBUG_SEARCH_LOG_HEAD)
2399     ndbout_c("-> found last page: %d", tail);
2400 
2401   ptr.p->m_state = 0;
2402   file_ptr.p->m_state = Undofile::FS_EXECUTING;
2403   ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn;
2404   ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn;
2405   ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn;
2406   m_last_lsn = file_ptr.p->m_online.m_lsn;
2407 
2408   /**
2409    * Set HEAD position
2410    */
2411   ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i;
2412   ptr.p->m_file_pos[HEAD].m_idx = tail;
2413 
2414   ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i;
2415   ptr.p->m_file_pos[TAIL].m_idx = tail - 1;
2416   ptr.p->m_next_reply_ptr_i = file_ptr.i;
2417 
2418   {
2419     Local_undofile_list files(m_file_pool, ptr.p->m_files);
2420     if(tail == 1)
2421     {
2422       /**
2423        * HEAD is first page in a file...
2424        *   -> PREV should be in previous file
2425        */
2426       Ptr<Undofile> prev = file_ptr;
2427       if(!files.prev(prev))
2428       {
2429 	files.last(prev);
2430       }
2431       ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i;
2432       ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1;
2433       ptr.p->m_next_reply_ptr_i = prev.i;
2434     }
2435 
2436     SimulatedBlock* fs = globalData.getBlock(NDBFS);
2437     infoEvent("Undo head - %s page: %d lsn: %lld",
2438 	      fs->get_filename(file_ptr.p->m_fd),
2439 	      tail, file_ptr.p->m_online.m_lsn);
2440     g_eventLogger.info("Undo head - %s page: %d lsn: %lld",
2441 		       fs->get_filename(file_ptr.p->m_fd),
2442 		       tail, file_ptr.p->m_online.m_lsn);
2443 
2444     for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr))
2445     {
2446       infoEvent("   - next - %s(%lld)",
2447 		fs->get_filename(file_ptr.p->m_fd),
2448 		file_ptr.p->m_online.m_lsn);
2449 
2450       g_eventLogger.info("   - next - %s(%lld)",
2451 			 fs->get_filename(file_ptr.p->m_fd),
2452 			 file_ptr.p->m_online.m_lsn);
2453     }
2454   }
2455 
2456   /**
2457    * Start next logfile group
2458    */
2459   m_logfile_group_list.next(ptr);
2460   signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD;
2461   signal->theData[1] = ptr.i;
2462   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2463 }
2464 
2465 void
init_run_undo_log(Signal * signal)2466 Lgman::init_run_undo_log(Signal* signal)
2467 {
2468   /**
2469    * Perform initial sorting of logfile groups
2470    */
2471   Ptr<Logfile_group> group;
2472   Logfile_group_list& list= m_logfile_group_list;
2473   Logfile_group_list tmp(m_logfile_group_pool);
2474 
2475   list.first(group);
2476   while(!group.isNull())
2477   {
2478     Ptr<Logfile_group> ptr= group;
2479     list.next(group);
2480     list.remove(ptr);
2481 
2482     {
2483       /**
2484        * Init buffer pointers
2485        */
2486       ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS;
2487       ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read
2488       ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read
2489 
2490       Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i;
2491       File_formats::Undofile::Undo_page* pageP =
2492 	(File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page);
2493 
2494       ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used;
2495       ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1;
2496       ptr.p->m_last_read_lsn++;
2497     }
2498 
2499     /**
2500      * Start producer thread
2501      */
2502     signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2503     signal->theData[1] = ptr.i;
2504     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2505 
2506     /**
2507      * Insert in correct position in list of logfile_group's
2508      */
2509     Ptr<Logfile_group> pos;
2510     for(tmp.first(pos); !pos.isNull(); tmp.next(pos))
2511       if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn)
2512 	break;
2513 
2514     if(pos.isNull())
2515       tmp.add(ptr);
2516     else
2517       tmp.insert(ptr, pos);
2518 
2519     ptr.p->m_state =
2520       Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD;
2521   }
2522   list = tmp;
2523 
2524   execute_undo_record(signal);
2525 }
2526 
2527 void
read_undo_log(Signal * signal,Ptr<Logfile_group> ptr)2528 Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
2529 {
2530   Uint32 cnt, free= ptr.p->m_free_buffer_words;
2531 
2532   if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD))
2533   {
2534     jam();
2535     /**
2536      * Logfile_group is done...
2537      */
2538     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD;
2539     stop_run_undo_log(signal);
2540     return;
2541   }
2542 
2543   if(free <= File_formats::UNDO_PAGE_WORDS)
2544   {
2545     signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2546     signal->theData[1] = ptr.i;
2547     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2548     return;
2549   }
2550 
2551   Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2552   Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2553 
2554   if(producer.m_current_page.m_idx == 0)
2555   {
2556     /**
2557      * zero pages left in range -> switch range
2558      */
2559     Lgman::Page_map::Iterator it;
2560     Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2561     Uint32 sz = map.getSize();
2562     Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz;
2563     map.position(it, pos);
2564     union {
2565       Uint32 _tmp[2];
2566       Lgman::Buffer_idx range;
2567     };
2568     _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2569     producer.m_current_page.m_ptr_i = pos;
2570     producer.m_current_page.m_idx = range.m_idx;
2571     producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx;
2572   }
2573 
2574   if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i &&
2575      producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i)
2576   {
2577     Uint32 max=
2578       producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
2579     ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2580     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2581     ndbrequire(cnt <= max);
2582     producer.m_current_pos.m_ptr_i -= cnt;
2583     producer.m_current_page.m_idx -= cnt;
2584   }
2585   else
2586   {
2587     Uint32 max= producer.m_current_page.m_idx;
2588     ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
2589     cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
2590     ndbrequire(cnt <= max);
2591     producer.m_current_pos.m_ptr_i -= cnt;
2592     producer.m_current_page.m_idx -= cnt;
2593   }
2594 
2595   ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
2596   free -= (cnt * File_formats::UNDO_PAGE_WORDS);
2597   ptr.p->m_free_buffer_words = free;
2598   ptr.p->m_pos[PRODUCER] = producer;
2599 
2600   signal->theData[0] = LgmanContinueB::READ_UNDO_LOG;
2601   signal->theData[1] = ptr.i;
2602 
2603   if(free > File_formats::UNDO_PAGE_WORDS)
2604     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2605   else
2606     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
2607 }
2608 
2609 Uint32
read_undo_pages(Signal * signal,Ptr<Logfile_group> ptr,Uint32 pageId,Uint32 pages)2610 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
2611 		       Uint32 pageId, Uint32 pages)
2612 {
2613   ndbrequire(pages);
2614   Ptr<Undofile> filePtr;
2615   Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2616   m_file_pool.getPtr(filePtr, tail.m_ptr_i);
2617 
2618   if(filePtr.p->m_online.m_outstanding > 0)
2619   {
2620     jam();
2621     return 0;
2622   }
2623 
2624   Uint32 max= tail.m_idx;
2625 
2626   FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend();
2627   req->filePointer = filePtr.p->m_fd;
2628   req->userReference = reference();
2629   req->userPointer = filePtr.i;
2630   req->operationFlag = 0;
2631   FsReadWriteReq::setFormatFlag(req->operationFlag,
2632 				FsReadWriteReq::fsFormatSharedPage);
2633 
2634 
2635   if(max > pages)
2636   {
2637     jam();
2638     tail.m_idx -= pages;
2639 
2640     req->varIndex = 1 + tail.m_idx;
2641     req->numberOfPages = pages;
2642     req->data.pageData[0] = pageId - pages;
2643     ptr.p->m_file_pos[TAIL] = tail;
2644 
2645     if(DEBUG_UNDO_EXECUTION)
2646       ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)",
2647 	       ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1,
2648 	       pageId - pages, pageId - 1);
2649 
2650     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2651 	       FsReadWriteReq::FixedLength + 1, JBA);
2652 
2653     ptr.p->m_outstanding_fs++;
2654     filePtr.p->m_state |= Undofile::FS_OUTSTANDING;
2655     filePtr.p->m_online.m_outstanding = pages;
2656     max = pages;
2657   }
2658   else
2659   {
2660     jam();
2661 
2662     ndbrequire(tail.m_idx - max == 0);
2663     req->varIndex = 1;
2664     req->numberOfPages = max;
2665     req->data.pageData[0] = pageId - max;
2666 
2667     if(DEBUG_UNDO_EXECUTION)
2668       ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)",
2669 	       ptr.i, 1 , 1+max-1,
2670 	       pageId - max, pageId - 1);
2671 
2672     sendSignal(NDBFS_REF, GSN_FSREADREQ, signal,
2673 	       FsReadWriteReq::FixedLength + 1, JBA);
2674 
2675     ptr.p->m_outstanding_fs++;
2676     filePtr.p->m_online.m_outstanding = max;
2677     filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT;
2678 
2679     Ptr<Undofile> prev = filePtr;
2680     {
2681       Local_undofile_list files(m_file_pool, ptr.p->m_files);
2682       if(!files.prev(prev))
2683       {
2684 	jam();
2685 	files.last(prev);
2686       }
2687     }
2688     if(DEBUG_UNDO_EXECUTION)
2689       ndbout_c("changing file from %d to %d", filePtr.i, prev.i);
2690 
2691     tail.m_idx= prev.p->m_file_size - 1;
2692     tail.m_ptr_i= prev.i;
2693     ptr.p->m_file_pos[TAIL] = tail;
2694     if(max < pages && filePtr.i != prev.i)
2695       max += read_undo_pages(signal, ptr, pageId - max, pages - max);
2696   }
2697 
2698   return max;
2699 
2700 }
2701 
2702 void
execute_undo_record(Signal * signal)2703 Lgman::execute_undo_record(Signal* signal)
2704 {
2705   Uint64 lsn;
2706   const Uint32* ptr;
2707   Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
2708   if((ptr = get_next_undo_record(&lsn)))
2709   {
2710     Uint32 len= (* ptr) & 0xFFFF;
2711     Uint32 type= (* ptr) >> 16;
2712     Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN;
2713     switch(mask){
2714     case File_formats::Undofile::UNDO_END:
2715       stop_run_undo_log(signal);
2716       return;
2717     case File_formats::Undofile::UNDO_LCP:
2718     case File_formats::Undofile::UNDO_LCP_FIRST:
2719     {
2720       Uint32 lcp = * (ptr - len + 1);
2721       if(m_latest_lcp && lcp > m_latest_lcp)
2722       {
2723         if (0)
2724         {
2725           const Uint32 * base = ptr - len + 1;
2726           Uint32 lcp = base[0];
2727           Uint32 tableId = base[1] >> 16;
2728           Uint32 fragId = base[1] & 0xFFFF;
2729 
2730           ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u",
2731                    lcp, tableId, fragId);
2732         }
2733       }
2734 
2735       if(m_latest_lcp == 0 ||
2736 	 lcp < m_latest_lcp ||
2737 	 (lcp == m_latest_lcp &&
2738 	  mask == File_formats::Undofile::UNDO_LCP_FIRST))
2739       {
2740 	stop_run_undo_log(signal);
2741 	return;
2742       }
2743       // Fallthrough
2744     }
2745     case File_formats::Undofile::UNDO_TUP_ALLOC:
2746     case File_formats::Undofile::UNDO_TUP_UPDATE:
2747     case File_formats::Undofile::UNDO_TUP_FREE:
2748     case File_formats::Undofile::UNDO_TUP_CREATE:
2749     case File_formats::Undofile::UNDO_TUP_DROP:
2750     case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
2751     case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
2752       tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
2753       return;
2754     default:
2755       ndbrequire(false);
2756     }
2757   }
2758   signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD;
2759   sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB);
2760 
2761   return;
2762 }
2763 
2764 const Uint32*
get_next_undo_record(Uint64 * this_lsn)2765 Lgman::get_next_undo_record(Uint64 * this_lsn)
2766 {
2767   Ptr<Logfile_group> ptr;
2768   m_logfile_group_list.first(ptr);
2769 
2770   Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
2771   Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
2772   if(producer.m_current_pos.m_idx < 2)
2773   {
2774     jam();
2775     /**
2776      * Wait for fetching pages...
2777      */
2778     return 0;
2779   }
2780 
2781   Uint32 pos = consumer.m_current_pos.m_idx;
2782   Uint32 page = consumer.m_current_pos.m_ptr_i;
2783 
2784   File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*)
2785     m_shared_page_pool.getPtr(page);
2786 
2787   if(pos == 0)
2788   {
2789     /**
2790      * End of log
2791      */
2792     pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ;
2793     pageP->m_page_header.m_page_lsn_hi = 0;
2794     pageP->m_page_header.m_page_lsn_lo = 0;
2795     pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1;
2796     this_lsn = 0;
2797     return pageP->m_data;
2798   }
2799 
2800   Uint32 *record= pageP->m_data + pos - 1;
2801   Uint32 len= (* record) & 0xFFFF;
2802   ndbrequire(len);
2803   Uint32 *prev= record - len;
2804   Uint64 lsn = 0;
2805 
2806   // Same page
2807   if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
2808   {
2809     lsn = ptr.p->m_last_read_lsn - 1;
2810     ndbrequire((Int64)lsn >= 0);
2811   }
2812   else
2813   {
2814     ndbrequire(pos >= 3);
2815     lsn += * (prev - 1); lsn <<= 32;
2816     lsn += * (prev - 0);
2817     len += 2;
2818     ndbrequire((Int64)lsn >= 0);
2819   }
2820 
2821 
2822   ndbrequire(pos >= len);
2823 
2824   if(pos == len)
2825   {
2826     /**
2827      * Switching page
2828      */
2829     ndbrequire(producer.m_current_pos.m_idx);
2830     ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
2831 
2832     if(consumer.m_current_page.m_idx)
2833     {
2834       consumer.m_current_page.m_idx--;   // left in range
2835       consumer.m_current_pos.m_ptr_i --; // page
2836     }
2837     else
2838     {
2839       // 0 pages left in range...switch range
2840       Lgman::Page_map::Iterator it;
2841       Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages);
2842       Uint32 sz = map.getSize();
2843       Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz;
2844 
2845       map.position(it, tmp);
2846       union {
2847 	Uint32 _tmp[2];
2848 	Lgman::Buffer_idx range;
2849       };
2850 
2851       _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data;
2852 
2853       consumer.m_current_page.m_idx = range.m_idx - 1; // left in range
2854       consumer.m_current_page.m_ptr_i = tmp;           // pos in map
2855 
2856       consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page
2857     }
2858 
2859     if(DEBUG_UNDO_EXECUTION)
2860       ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i);
2861 
2862     pageP=(File_formats::Undofile::Undo_page*)
2863       m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i);
2864 
2865     pos= consumer.m_current_pos.m_idx= pageP->m_words_used;
2866 
2867     Uint64 tmp = 0;
2868     tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32;
2869     tmp += pageP->m_page_header.m_page_lsn_lo;
2870 
2871     prev = pageP->m_data + pos - 1;
2872 
2873     if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
2874       ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
2875 
2876     ptr.p->m_pos[CONSUMER] = consumer;
2877     ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
2878   }
2879   else
2880   {
2881     ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len;
2882   }
2883 
2884   * this_lsn = ptr.p->m_last_read_lsn = lsn;
2885 
2886   /**
2887    * Re-sort log file groups
2888    */
2889   Ptr<Logfile_group> sort = ptr;
2890   if(m_logfile_group_list.next(sort))
2891   {
2892     while(!sort.isNull() && sort.p->m_last_read_lsn > lsn)
2893       m_logfile_group_list.next(sort);
2894 
2895     if(sort.i != ptr.p->nextList)
2896     {
2897       m_logfile_group_list.remove(ptr);
2898       if(sort.isNull())
2899 	m_logfile_group_list.add(ptr);
2900       else
2901 	m_logfile_group_list.insert(ptr, sort);
2902     }
2903   }
2904   return record;
2905 }
2906 
2907 void
stop_run_undo_log(Signal * signal)2908 Lgman::stop_run_undo_log(Signal* signal)
2909 {
2910   bool running = false, outstanding = false;
2911   Ptr<Logfile_group> ptr;
2912   m_logfile_group_list.first(ptr);
2913   while(!ptr.isNull())
2914   {
2915     /**
2916      * Mark exec thread as completed
2917      */
2918     ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD;
2919 
2920     if(ptr.p->m_state & Logfile_group::LG_READ_THREAD)
2921     {
2922       /**
2923        * Thread is still running...wait for it to complete
2924        */
2925       running = true;
2926     }
2927     else if(ptr.p->m_outstanding_fs)
2928     {
2929       outstanding = true; // a FSREADREQ is outstanding...wait for it
2930     }
2931     else if(ptr.p->m_state != Logfile_group::LG_ONLINE)
2932     {
2933       /**
2934        * Fix log TAIL
2935        */
2936       ndbrequire(ptr.p->m_state == 0);
2937       ptr.p->m_state = Logfile_group::LG_ONLINE;
2938       Buffer_idx tail= ptr.p->m_file_pos[TAIL];
2939       Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
2940 
2941       while(pages)
2942       {
2943 	Ptr<Undofile> file;
2944 	m_file_pool.getPtr(file, tail.m_ptr_i);
2945 	Uint32 page= tail.m_idx;
2946 	Uint32 size= file.p->m_file_size;
2947 	ndbrequire(size >= page);
2948 	Uint32 diff= size - page;
2949 
2950 	if(pages >= diff)
2951 	{
2952 	  pages -= diff;
2953 	  Local_undofile_list files(m_file_pool, ptr.p->m_files);
2954 	  if(!files.next(file))
2955 	    files.first(file);
2956 	  tail.m_idx = 1;
2957 	  tail.m_ptr_i= file.i;
2958 	}
2959 	else
2960 	{
2961 	  tail.m_idx += pages;
2962 	  pages= 0;
2963 	}
2964       }
2965       ptr.p->m_tail_pos[0] = tail;
2966       ptr.p->m_tail_pos[1] = tail;
2967       ptr.p->m_tail_pos[2] = tail;
2968       ptr.p->m_file_pos[TAIL] = tail;
2969 
2970       init_logbuffer_pointers(ptr);
2971 
2972       {
2973 	Buffer_idx head= ptr.p->m_file_pos[HEAD];
2974 	Ptr<Undofile> file;
2975 	m_file_pool.getPtr(file, head.m_ptr_i);
2976 	if (head.m_idx == file.p->m_file_size - 1)
2977 	{
2978 	  Local_undofile_list files(m_file_pool, ptr.p->m_files);
2979 	  if(!files.next(file))
2980 	  {
2981 	    jam();
2982 	    files.first(file);
2983 	  }
2984 	  head.m_idx = 0;
2985 	  head.m_ptr_i = file.i;
2986 	  ptr.p->m_file_pos[HEAD] = head;
2987 	}
2988       }
2989 
2990       ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS *
2991 	(Uint64)compute_free_file_pages(ptr);
2992       ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i;
2993 
2994       ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
2995       signal->theData[0] = LgmanContinueB::FLUSH_LOG;
2996       signal->theData[1] = ptr.i;
2997       signal->theData[2] = 0;
2998       sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
2999 
3000       if(1)
3001       {
3002 	SimulatedBlock* fs = globalData.getBlock(NDBFS);
3003 	Ptr<Undofile> hf, tf;
3004 	m_file_pool.getPtr(tf, tail.m_ptr_i);
3005 	m_file_pool.getPtr(hf,  ptr.p->m_file_pos[HEAD].m_ptr_i);
3006 	infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id);
3007 	g_eventLogger.info("Logfile group: %d ", ptr.p->m_logfile_group_id);
3008 	infoEvent("  head: %s page: %d",
3009 		  fs->get_filename(hf.p->m_fd),
3010 		  ptr.p->m_file_pos[HEAD].m_idx);
3011 	g_eventLogger.info("  head: %s page: %d",
3012 			   fs->get_filename(hf.p->m_fd),
3013 			   ptr.p->m_file_pos[HEAD].m_idx);
3014 	infoEvent("  tail: %s page: %d",
3015 		  fs->get_filename(tf.p->m_fd), tail.m_idx);
3016 	g_eventLogger.info("  tail: %s page: %d",
3017 			   fs->get_filename(tf.p->m_fd), tail.m_idx);
3018       }
3019     }
3020 
3021     m_logfile_group_list.next(ptr);
3022   }
3023 
3024   if(running)
3025   {
3026     jam();
3027     return;
3028   }
3029 
3030   if(outstanding)
3031   {
3032     jam();
3033     signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG;
3034     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1);
3035     return;
3036   }
3037 
3038   infoEvent("Flushing page cache after undo completion");
3039   g_eventLogger.info("Flushing page cache after undo completion");
3040 
3041   /**
3042    * Start flushing pages (local, LCP)
3043    */
3044   LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr();
3045   ord->lcpId = m_latest_lcp;
3046   sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal,
3047 	     LcpFragOrd::SignalLength, JBB);
3048 
3049   EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
3050   req->senderRef = reference();
3051   sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
3052 	     EndLcpReq::SignalLength, JBB);
3053 }
3054 
3055 void
execEND_LCP_CONF(Signal * signal)3056 Lgman::execEND_LCP_CONF(Signal* signal)
3057 {
3058   Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
3059   tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
3060 
3061   /**
3062    * pgman has completed flushing all pages
3063    *
3064    *   insert "fake" LCP record preventing undo to be "rerun"
3065    */
3066   Uint32 undo[3];
3067   undo[0] = m_latest_lcp;
3068   undo[1] = (0 << 16) | 0;
3069   undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 )
3070     | (sizeof(undo) >> 2);
3071 
3072   Ptr<Logfile_group> ptr;
3073   ndbrequire(m_logfile_group_list.first(ptr));
3074 
3075   Uint64 last_lsn= m_last_lsn;
3076   if(ptr.p->m_last_lsn == last_lsn
3077 #ifdef VM_TRACE
3078      && ((rand() % 100) > 50)
3079 #endif
3080      )
3081   {
3082     undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
3083     Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
3084     memcpy(dst, undo, sizeof(undo));
3085     ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
3086     ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
3087   }
3088   else
3089   {
3090     Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2);
3091     * dst++ = last_lsn >> 32;
3092     * dst++ = last_lsn & 0xFFFFFFFF;
3093     memcpy(dst, undo, sizeof(undo));
3094     ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
3095     ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
3096   }
3097   m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
3098 
3099   ptr.p->m_last_synced_lsn = last_lsn;
3100   while(m_logfile_group_list.next(ptr))
3101     ptr.p->m_last_synced_lsn = last_lsn;
3102 
3103   infoEvent("Flushing complete");
3104   g_eventLogger.info("Flushing complete");
3105 
3106   signal->theData[0] = reference();
3107   sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB);
3108 }
3109 
3110 #ifdef VM_TRACE
3111 void
validate_logfile_group(Ptr<Logfile_group> ptr,const char * heading)3112 Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
3113 {
3114   do
3115   {
3116     if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
3117       break;
3118 
3119     Uint32 pages = compute_free_file_pages(ptr);
3120 
3121     Uint32 group_pages =
3122       ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1)/ File_formats::UNDO_PAGE_WORDS) ;
3123     Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS;
3124 
3125     if(! (pages >= group_pages))
3126     {
3127       ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL]
3128 	     << " Head: " << ptr.p->m_file_pos[HEAD]
3129 	     << " free: " << group_pages << "(" << last << ")"
3130 	     << " found: " << pages;
3131       for(Uint32 i = 0; i<3; i++)
3132       {
3133 	ndbout << " - " << ptr.p->m_tail_pos[i];
3134       }
3135       ndbout << endl;
3136 
3137       ndbrequire(pages >= group_pages);
3138     }
3139   } while(0);
3140 }
3141 #endif
3142 
execGET_TABINFOREQ(Signal * signal)3143 void Lgman::execGET_TABINFOREQ(Signal* signal)
3144 {
3145   jamEntry();
3146 
3147   if(!assembleFragments(signal))
3148   {
3149     return;
3150   }
3151 
3152   GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0];
3153 
3154   const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf);
3155   BlockReference retRef= req->senderRef;
3156   Uint32 senderData= req->senderData;
3157   Uint32 tableId= req->tableId;
3158 
3159   if(reqType == GetTabInfoReq::RequestByName){
3160     jam();
3161     if(signal->getNoOfSections())
3162       releaseSections(signal);
3163 
3164     sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName);
3165     return;
3166   }
3167 
3168   Logfile_group key;
3169   key.m_logfile_group_id= tableId;
3170   Ptr<Logfile_group> ptr;
3171   m_logfile_group_hash.find(ptr, key);
3172 
3173   if(ptr.p->m_logfile_group_id != tableId)
3174   {
3175     jam();
3176     if(signal->getNoOfSections())
3177       releaseSections(signal);
3178 
3179     sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId);
3180     return;
3181   }
3182 
3183 
3184   GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0];
3185 
3186   conf->senderData= senderData;
3187   conf->tableId= tableId;
3188   conf->freeWordsHi= ptr.p->m_free_file_words >> 32;
3189   conf->freeWordsLo= ptr.p->m_free_file_words & 0xFFFFFFFF;
3190   conf->tableType= DictTabInfo::LogfileGroup;
3191   conf->senderRef= reference();
3192   sendSignal(retRef, GSN_GET_TABINFO_CONF, signal,
3193 	     GetTabInfoConf::SignalLength, JBB);
3194 }
3195 
sendGET_TABINFOREF(Signal * signal,GetTabInfoReq * req,GetTabInfoRef::ErrorCode errorCode)3196 void Lgman::sendGET_TABINFOREF(Signal* signal,
3197 			       GetTabInfoReq * req,
3198 			       GetTabInfoRef::ErrorCode errorCode)
3199 {
3200   jamEntry();
3201   GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0];
3202   /**
3203    * The format of GetTabInfo Req/Ref is the same
3204    */
3205   BlockReference retRef = req->senderRef;
3206   ref->errorCode = errorCode;
3207 
3208   sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB);
3209 }
3210